diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/filter.c | 122 | ||||
| -rw-r--r-- | src/filter.h | 25 | ||||
| -rw-r--r-- | src/log_tail.c | 6 | ||||
| -rw-r--r-- | src/main.c | 13 | ||||
| -rw-r--r-- | src/umbrella.h | 7 | ||||
| -rw-r--r-- | src/unit.c | 9 |
6 files changed, 178 insertions, 4 deletions
diff --git a/src/filter.c b/src/filter.c new file mode 100644 index 0000000..7bc2a68 --- /dev/null +++ b/src/filter.c @@ -0,0 +1,122 @@ +#include "filter.h" +#include "log.h" + +#include <assert.h> +#include <errno.h> +#include <fcntl.h> +#include <poll.h> +#include <signal.h> +#include <string.h> +#include <unistd.h> + +#define FILTER_TIMEOUT_MS 250 + +/* ── filter_start ────────────────────────────────────────────────────────── */ + +void filter_start(Unit *u) { + assert(u != NULL); + assert(u->filter_in_fd == -1); + + if (!u->log_filter[0]) return; + + int to_filter[2], from_filter[2]; + if (pipe(to_filter) < 0) { + log_warn("filter: pipe() failed for %s: %s", u->name, strerror(errno)); + return; + } + if (pipe(from_filter) < 0) { + log_warn("filter: pipe() failed for %s: %s", u->name, strerror(errno)); + close(to_filter[0]); close(to_filter[1]); + return; + } + + pid_t pid = fork(); + if (pid < 0) { + log_warn("filter: fork() failed for %s: %s", u->name, strerror(errno)); + close(to_filter[0]); close(to_filter[1]); + close(from_filter[0]); close(from_filter[1]); + return; + } + + if (pid == 0) { + /* Child: wire pipes and exec the filter */ + if (dup2(to_filter[0], STDIN_FILENO) < 0) _exit(1); + if (dup2(from_filter[1], STDOUT_FILENO) < 0) _exit(1); + close(to_filter[0]); close(to_filter[1]); + close(from_filter[0]); close(from_filter[1]); + execl(u->log_filter, u->log_filter, (char *)NULL); + _exit(1); + } + + /* Parent: keep write end of to_filter, read end of from_filter */ + close(to_filter[0]); + close(from_filter[1]); + if (fcntl(from_filter[0], F_SETFL, O_NONBLOCK) < 0) + log_warn("filter: fcntl O_NONBLOCK failed for %s", u->name); + + u->filter_pid = pid; + u->filter_in_fd = to_filter[1]; + u->filter_out_fd = from_filter[0]; + log_info("filter: started '%s' (pid %d) for unit %s", + u->log_filter, (int)pid, u->name); +} + +/* ── filter_apply ────────────────────────────────────────────────────────── */ + +void filter_apply(Unit *u, char *buf, int bufsize, int *len) { + assert(u != NULL); + assert(buf != NULL); + assert(len != NULL); + + if (u->filter_in_fd < 0) return; + + ssize_t w = write(u->filter_in_fd, buf, (size_t)*len); + if (w < 0) { + log_warn("filter: write failed for %s: %s — stopping filter", + u->name, strerror(errno)); + filter_stop(u); + return; + } + + struct pollfd pfd = { u->filter_out_fd, POLLIN, 0 }; + int r = poll(&pfd, 1, FILTER_TIMEOUT_MS); + if (r < 0) { + log_warn("filter: poll failed for %s: %s", u->name, strerror(errno)); + return; /* pass through unchanged */ + } + if (r == 0) return; /* timeout — pass through unchanged */ + + ssize_t n = read(u->filter_out_fd, buf, bufsize - 1); + if (n < 0 && errno != EAGAIN) + log_warn("filter: read failed for %s: %s", u->name, strerror(errno)); + if (n > 0) { + buf[n] = '\0'; + *len = (int)n; + } else { + *len = 0; /* filter suppressed all output */ + } +} + +/* ── filter_stop / filter_stop_all ───────────────────────────────────────── */ + +void filter_stop(Unit *u) { + assert(u != NULL); + + if (u->filter_pid <= 0) return; + + close(u->filter_in_fd); + close(u->filter_out_fd); + kill(u->filter_pid, SIGTERM); + + u->filter_in_fd = -1; + u->filter_out_fd = -1; + u->filter_pid = 0; + log_info("filter: stopped filter for unit %s", u->name); +} + +void filter_stop_all(void) { + assert(g.unit_count >= 0); + + for (int i = 0; i < g.unit_count; i++) + filter_stop(&g.units[i]); +} diff --git a/src/filter.h b/src/filter.h new file mode 100644 index 0000000..60a176f --- /dev/null +++ b/src/filter.h @@ -0,0 +1,25 @@ +#ifndef FILTER_H +#define FILTER_H + +#include "umbrella.h" + +/* + * filter_start — spawn the unit's log_filter executable as a persistent + * subprocess with stdin/stdout pipes. No-op if log_filter + * is empty. Safe to call on every unit unconditionally. + * + * filter_apply — write buf to the filter's stdin, read back the + * transformed output into buf. Updates *len. If the filter + * suppresses all output (returns 0 bytes), *len is set to 0. + * Falls back to pass-through on timeout or error. + * + * filter_stop — signal and close the filter subprocess for one unit. + * filter_stop_all — stop filters for all loaded units. + */ + +void filter_start(Unit *u); +void filter_apply(Unit *u, char *buf, int bufsize, int *len); +void filter_stop(Unit *u); +void filter_stop_all(void); + +#endif /* FILTER_H */ diff --git a/src/log_tail.c b/src/log_tail.c index c238911..f943bbe 100644 --- a/src/log_tail.c +++ b/src/log_tail.c @@ -1,4 +1,5 @@ #include "log_tail.h" +#include "filter.h" #include "client.h" #include "unit.h" #include "log.h" @@ -121,7 +122,10 @@ void log_tail_handle(int inotify_fd) { ssize_t n; while ((n = read(w->log_fd, buf, sizeof(buf) - 1)) > 0) { buf[n] = '\0'; - ring_push(w->unit->output, buf, (int)n); + int nf = (int)n; + filter_apply(w->unit, buf, sizeof(buf), &nf); + if (nf <= 0) continue; + ring_push(w->unit->output, buf, nf); client_broadcast_output(w->unit->name, buf, 0); } @@ -4,6 +4,7 @@ #include "client.h" #include "log.h" #include "log_tail.h" +#include "filter.h" #include <stdio.h> #include <stdlib.h> @@ -60,9 +61,12 @@ static void handle_signal(void) { case SIGHUP: log_info("SIGHUP received — reloading units"); log_tail_cleanup(); + filter_stop_all(); /* Reload unit descriptors. Existing runtime state is preserved. */ g.unit_count = 0; unit_load_all(); + for (int i = 0; i < g.unit_count; i++) + filter_start(&g.units[i]); log_tail_init(); break; } @@ -96,7 +100,10 @@ static void handle_process_output(int fd) { } buf[n] = '\0'; - ring_push(u->output, buf, n); + int nf = (int)n; + filter_apply(u, buf, sizeof(buf), &nf); + if (nf <= 0) return; + ring_push(u->output, buf, nf); client_broadcast_output(u->name, buf, 0); } @@ -214,6 +221,9 @@ int main(int argc, char *argv[]) { return 1; } + for (int i = 0; i < g.unit_count; i++) + filter_start(&g.units[i]); + log_tail_init(); /* Set up listening socket */ @@ -235,6 +245,7 @@ int main(int argc, char *argv[]) { log_info("Shutting down"); log_tail_cleanup(); + filter_stop_all(); daemon_cleanup(); log_close(); return 0; diff --git a/src/umbrella.h b/src/umbrella.h index 2e9d440..ff1873f 100644 --- a/src/umbrella.h +++ b/src/umbrella.h @@ -91,12 +91,19 @@ typedef struct { Action actions[MAX_ACTIONS]; int action_count; + char log_filter[MAX_PATH]; /* path to filter executable, or empty */ + /* Runtime state (populated by process/rcon layer, not yaml) */ ProcessState state; pid_t pid; /* only for CONSOLE_STDIN */ int stdin_fd; /* only for CONSOLE_STDIN */ int stdout_fd; /* only for CONSOLE_STDIN */ RingBuffer *output; + + /* Log filter subprocess (populated at runtime if log_filter is set) */ + pid_t filter_pid; + int filter_in_fd; /* write end: daemon → filter stdin */ + int filter_out_fd; /* read end: filter stdout → daemon */ } Unit; /* ── Client: a connected socket client ───────────────────────────────────── */ @@ -93,8 +93,11 @@ typedef enum { int unit_load_file(const char *path, Unit *out) { memset(out, 0, sizeof(Unit)); out->state = STATE_STOPPED; - out->stdin_fd = -1; - out->stdout_fd = -1; + out->stdin_fd = -1; + out->stdout_fd = -1; + out->filter_pid = 0; + out->filter_in_fd = -1; + out->filter_out_fd = -1; out->health.timeout_ms = 5000; FILE *f = fopen(path, "r"); @@ -171,6 +174,8 @@ int unit_load_file(const char *path, Unit *out) { strncpy(out->service, val, MAX_NAME - 1); else if (strcmp(last_key, "broadcast_cmd") == 0) strncpy(out->broadcast_cmd, val, sizeof(out->broadcast_cmd) - 1); + else if (strcmp(last_key, "log_filter") == 0) + strncpy(out->log_filter, val, MAX_PATH - 1); break; case SECTION_CONSOLE: |
