From 23527346905a2728c418170a155d0c24b10f5b25 Mon Sep 17 00:00:00 2001 From: auric Date: Sun, 22 Feb 2026 19:13:06 -0600 Subject: Add log_filter: pluggable per-unit output filter subprocess MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Each unit can specify log_filter: pointing to any executable that reads stdin and writes filtered output to stdout. The daemon spawns it once at startup (and on SIGHUP), pipes raw log data through it before ring-buffering and broadcasting to attached clients. filter.c handles spawn (fork/exec with pipes), per-chunk apply (write + poll with 250ms timeout, pass-through on silence/timeout), and stop (SIGTERM + fd cleanup). Filters are stopped and restarted cleanly on SIGHUP alongside log_tail. Bundled filters in filters/: source.py — TF2, GMod: strips server_cvar/stuck/path_goal spam, strips the L MM/DD/YYYY - HH:MM:SS: prefix minecraft.py — vanilla/Paper/Spigot: strips keepAlive, autosave, internal class logs, strips [HH:MM:SS] [thread] prefix terraria.py — vanilla/tModLoader: strips blank lines and mod loading noise during startup Any executable reading stdin/writing stdout works as a custom filter. Co-Authored-By: Claude Sonnet 4.6 --- src/filter.c | 122 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/filter.h | 25 ++++++++++++ src/log_tail.c | 6 ++- src/main.c | 13 +++++- src/umbrella.h | 7 ++++ src/unit.c | 9 ++++- 6 files changed, 178 insertions(+), 4 deletions(-) create mode 100644 src/filter.c create mode 100644 src/filter.h (limited to 'src') diff --git a/src/filter.c b/src/filter.c new file mode 100644 index 0000000..7bc2a68 --- /dev/null +++ b/src/filter.c @@ -0,0 +1,122 @@ +#include "filter.h" +#include "log.h" + +#include +#include +#include +#include +#include +#include +#include + +#define FILTER_TIMEOUT_MS 250 + +/* ── filter_start ────────────────────────────────────────────────────────── */ + +void filter_start(Unit *u) { + assert(u != NULL); + assert(u->filter_in_fd == -1); + + if (!u->log_filter[0]) return; + + int to_filter[2], from_filter[2]; + if (pipe(to_filter) < 0) { + log_warn("filter: pipe() failed for %s: %s", u->name, strerror(errno)); + return; + } + if (pipe(from_filter) < 0) { + log_warn("filter: pipe() failed for %s: %s", u->name, strerror(errno)); + close(to_filter[0]); close(to_filter[1]); + return; + } + + pid_t pid = fork(); + if (pid < 0) { + log_warn("filter: fork() failed for %s: %s", u->name, strerror(errno)); + close(to_filter[0]); close(to_filter[1]); + close(from_filter[0]); close(from_filter[1]); + return; + } + + if (pid == 0) { + /* Child: wire pipes and exec the filter */ + if (dup2(to_filter[0], STDIN_FILENO) < 0) _exit(1); + if (dup2(from_filter[1], STDOUT_FILENO) < 0) _exit(1); + close(to_filter[0]); close(to_filter[1]); + close(from_filter[0]); close(from_filter[1]); + execl(u->log_filter, u->log_filter, (char *)NULL); + _exit(1); + } + + /* Parent: keep write end of to_filter, read end of from_filter */ + close(to_filter[0]); + close(from_filter[1]); + if (fcntl(from_filter[0], F_SETFL, O_NONBLOCK) < 0) + log_warn("filter: fcntl O_NONBLOCK failed for %s", u->name); + + u->filter_pid = pid; + u->filter_in_fd = to_filter[1]; + u->filter_out_fd = from_filter[0]; + log_info("filter: started '%s' (pid %d) for unit %s", + u->log_filter, (int)pid, u->name); +} + +/* ── filter_apply ────────────────────────────────────────────────────────── */ + +void filter_apply(Unit *u, char *buf, int bufsize, int *len) { + assert(u != NULL); + assert(buf != NULL); + assert(len != NULL); + + if (u->filter_in_fd < 0) return; + + ssize_t w = write(u->filter_in_fd, buf, (size_t)*len); + if (w < 0) { + log_warn("filter: write failed for %s: %s — stopping filter", + u->name, strerror(errno)); + filter_stop(u); + return; + } + + struct pollfd pfd = { u->filter_out_fd, POLLIN, 0 }; + int r = poll(&pfd, 1, FILTER_TIMEOUT_MS); + if (r < 0) { + log_warn("filter: poll failed for %s: %s", u->name, strerror(errno)); + return; /* pass through unchanged */ + } + if (r == 0) return; /* timeout — pass through unchanged */ + + ssize_t n = read(u->filter_out_fd, buf, bufsize - 1); + if (n < 0 && errno != EAGAIN) + log_warn("filter: read failed for %s: %s", u->name, strerror(errno)); + if (n > 0) { + buf[n] = '\0'; + *len = (int)n; + } else { + *len = 0; /* filter suppressed all output */ + } +} + +/* ── filter_stop / filter_stop_all ───────────────────────────────────────── */ + +void filter_stop(Unit *u) { + assert(u != NULL); + + if (u->filter_pid <= 0) return; + + close(u->filter_in_fd); + close(u->filter_out_fd); + kill(u->filter_pid, SIGTERM); + + u->filter_in_fd = -1; + u->filter_out_fd = -1; + u->filter_pid = 0; + log_info("filter: stopped filter for unit %s", u->name); +} + +void filter_stop_all(void) { + assert(g.unit_count >= 0); + + for (int i = 0; i < g.unit_count; i++) + filter_stop(&g.units[i]); +} diff --git a/src/filter.h b/src/filter.h new file mode 100644 index 0000000..60a176f --- /dev/null +++ b/src/filter.h @@ -0,0 +1,25 @@ +#ifndef FILTER_H +#define FILTER_H + +#include "umbrella.h" + +/* + * filter_start — spawn the unit's log_filter executable as a persistent + * subprocess with stdin/stdout pipes. No-op if log_filter + * is empty. Safe to call on every unit unconditionally. + * + * filter_apply — write buf to the filter's stdin, read back the + * transformed output into buf. Updates *len. If the filter + * suppresses all output (returns 0 bytes), *len is set to 0. + * Falls back to pass-through on timeout or error. + * + * filter_stop — signal and close the filter subprocess for one unit. + * filter_stop_all — stop filters for all loaded units. + */ + +void filter_start(Unit *u); +void filter_apply(Unit *u, char *buf, int bufsize, int *len); +void filter_stop(Unit *u); +void filter_stop_all(void); + +#endif /* FILTER_H */ diff --git a/src/log_tail.c b/src/log_tail.c index c238911..f943bbe 100644 --- a/src/log_tail.c +++ b/src/log_tail.c @@ -1,4 +1,5 @@ #include "log_tail.h" +#include "filter.h" #include "client.h" #include "unit.h" #include "log.h" @@ -121,7 +122,10 @@ void log_tail_handle(int inotify_fd) { ssize_t n; while ((n = read(w->log_fd, buf, sizeof(buf) - 1)) > 0) { buf[n] = '\0'; - ring_push(w->unit->output, buf, (int)n); + int nf = (int)n; + filter_apply(w->unit, buf, sizeof(buf), &nf); + if (nf <= 0) continue; + ring_push(w->unit->output, buf, nf); client_broadcast_output(w->unit->name, buf, 0); } diff --git a/src/main.c b/src/main.c index 901ce74..af86375 100644 --- a/src/main.c +++ b/src/main.c @@ -4,6 +4,7 @@ #include "client.h" #include "log.h" #include "log_tail.h" +#include "filter.h" #include #include @@ -60,9 +61,12 @@ static void handle_signal(void) { case SIGHUP: log_info("SIGHUP received — reloading units"); log_tail_cleanup(); + filter_stop_all(); /* Reload unit descriptors. Existing runtime state is preserved. */ g.unit_count = 0; unit_load_all(); + for (int i = 0; i < g.unit_count; i++) + filter_start(&g.units[i]); log_tail_init(); break; } @@ -96,7 +100,10 @@ static void handle_process_output(int fd) { } buf[n] = '\0'; - ring_push(u->output, buf, n); + int nf = (int)n; + filter_apply(u, buf, sizeof(buf), &nf); + if (nf <= 0) return; + ring_push(u->output, buf, nf); client_broadcast_output(u->name, buf, 0); } @@ -214,6 +221,9 @@ int main(int argc, char *argv[]) { return 1; } + for (int i = 0; i < g.unit_count; i++) + filter_start(&g.units[i]); + log_tail_init(); /* Set up listening socket */ @@ -235,6 +245,7 @@ int main(int argc, char *argv[]) { log_info("Shutting down"); log_tail_cleanup(); + filter_stop_all(); daemon_cleanup(); log_close(); return 0; diff --git a/src/umbrella.h b/src/umbrella.h index 2e9d440..ff1873f 100644 --- a/src/umbrella.h +++ b/src/umbrella.h @@ -91,12 +91,19 @@ typedef struct { Action actions[MAX_ACTIONS]; int action_count; + char log_filter[MAX_PATH]; /* path to filter executable, or empty */ + /* Runtime state (populated by process/rcon layer, not yaml) */ ProcessState state; pid_t pid; /* only for CONSOLE_STDIN */ int stdin_fd; /* only for CONSOLE_STDIN */ int stdout_fd; /* only for CONSOLE_STDIN */ RingBuffer *output; + + /* Log filter subprocess (populated at runtime if log_filter is set) */ + pid_t filter_pid; + int filter_in_fd; /* write end: daemon → filter stdin */ + int filter_out_fd; /* read end: filter stdout → daemon */ } Unit; /* ── Client: a connected socket client ───────────────────────────────────── */ diff --git a/src/unit.c b/src/unit.c index 429c2ce..b2cdd4c 100644 --- a/src/unit.c +++ b/src/unit.c @@ -93,8 +93,11 @@ typedef enum { int unit_load_file(const char *path, Unit *out) { memset(out, 0, sizeof(Unit)); out->state = STATE_STOPPED; - out->stdin_fd = -1; - out->stdout_fd = -1; + out->stdin_fd = -1; + out->stdout_fd = -1; + out->filter_pid = 0; + out->filter_in_fd = -1; + out->filter_out_fd = -1; out->health.timeout_ms = 5000; FILE *f = fopen(path, "r"); @@ -171,6 +174,8 @@ int unit_load_file(const char *path, Unit *out) { strncpy(out->service, val, MAX_NAME - 1); else if (strcmp(last_key, "broadcast_cmd") == 0) strncpy(out->broadcast_cmd, val, sizeof(out->broadcast_cmd) - 1); + else if (strcmp(last_key, "log_filter") == 0) + strncpy(out->log_filter, val, MAX_PATH - 1); break; case SECTION_CONSOLE: -- cgit v1.2.3