From aa73b0d955b603f623a191cc745b0173f230f02e Mon Sep 17 00:00:00 2001 From: auric Date: Sat, 21 Feb 2026 14:20:29 -0600 Subject: Use active probing for unit state in list command Previously, `umbrella-cli list` reported each unit's cached internal state, while `umbrella-cli status ` used probe_rcon_state() to actively verify RCON units via systemd and a live network probe. This caused the list to show stale or inconsistent state compared to status. Move probe_rcon_state() before cmd_list() and use it there so both commands share the same state determination logic. Co-Authored-By: Claude Sonnet 4.6 --- src/client.c | 44 ++++++++++++++++++++++---------------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/src/client.c b/src/client.c index 034ea74..e1816f2 100644 --- a/src/client.c +++ b/src/client.c @@ -134,28 +134,6 @@ void client_broadcast_output(const char *unit_name, /* ── Command handlers ────────────────────────────────────────────────────── */ -static void cmd_list(Client *c) { - /* Build a JSON array of unit summaries */ - char buf[PROTO_MAX_MSG]; - int pos = 0; - - pos += snprintf(buf + pos, sizeof(buf) - pos, - "{\"type\":\"list\",\"units\":["); - - for (int i = 0; i < g.unit_count; i++) { - Unit *u = &g.units[i]; - if (i > 0) - pos += snprintf(buf + pos, sizeof(buf) - pos, ","); - pos += snprintf(buf + pos, sizeof(buf) - pos, - "{\"name\":\"%s\",\"display\":\"%s\"," - "\"state\":\"%s\"}", - u->name, u->display, unit_state_str(u->state)); - } - - snprintf(buf + pos, sizeof(buf) - pos, "]}"); - proto_send(c->fd, buf); -} - static const char *probe_rcon_state(Unit *u) { if (u->console.type != CONSOLE_RCON) return unit_state_str(u->state); @@ -177,6 +155,28 @@ static const char *probe_rcon_state(Unit *u) { return (r == 0) ? "running" : "unreachable"; } +static void cmd_list(Client *c) { + /* Build a JSON array of unit summaries, using active probing for state */ + char buf[PROTO_MAX_MSG]; + int pos = 0; + + pos += snprintf(buf + pos, sizeof(buf) - pos, + "{\"type\":\"list\",\"units\":["); + + for (int i = 0; i < g.unit_count; i++) { + Unit *u = &g.units[i]; + if (i > 0) + pos += snprintf(buf + pos, sizeof(buf) - pos, ","); + pos += snprintf(buf + pos, sizeof(buf) - pos, + "{\"name\":\"%s\",\"display\":\"%s\"," + "\"state\":\"%s\"}", + u->name, u->display, probe_rcon_state(u)); + } + + snprintf(buf + pos, sizeof(buf) - pos, "]}"); + proto_send(c->fd, buf); +} + static void cmd_status(Client *c, const char *unit_name) { Unit *u = unit_find(unit_name); if (!u) { proto_send_error(c->fd, "unit not found"); return; } -- cgit v1.2.3 From fc10d8a0818bb87001a64a72552ed28fe60931ee Mon Sep 17 00:00:00 2001 From: auric Date: Sat, 21 Feb 2026 14:59:07 -0600 Subject: Add A2S probing, sd-bus state, tail/broadcast, and bot audit log MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit State detection: - Add STATE_STOPPING to ProcessState enum - Replace system("systemctl is-active") with libsystemd sd-bus API for accurate starting/stopping/crashed state reporting; works for any unit type (RCON or STDIN) that declares a service: field - Implement real A2S_INFO UDP queries (src/console/a2s.c) for units with health.type = a2s (Valve games: TF2, GMod); differentiates running / hibernating (0 players) / changing_map (A2S down, RCON up) / unreachable; includes player count and map name in responses - Refactor probe_rcon_state() into probe_unit_state() returning a ProbeResult struct with state, players, max_players, map fields - status and list responses now include players/max_players/map fields New daemon commands: - tail : return ring buffer snapshot as a single response - broadcast : send broadcast_cmd-formatted message to all running units; works for both RCON and STDIN console types New YAML field: - broadcast_cmd: command template (e.g. "say {msg}") — opt-in per unit; units without it are skipped by broadcast CLI (umbrella-cli): - Add tail subcommand (non-interactive output snapshot) - Add broadcast subcommand - status shows Players and Map when available - list adds PLAYERS and MAP columns Bot (umbrella-bot): - Replace !attach / !detach with !tail (shows last 30 lines, no streaming) - Add !broadcast command - Write per-!cmd audit entries to /var/log/umbrella/bot-audit.log - !units and !status responses include player counts when available Co-Authored-By: Claude Sonnet 4.6 --- Makefile | 5 +- clients/umbrella-bot/umbrella-bot.py | 249 +++++++++------------------ clients/umbrella-cli/main.c | 145 +++++++++++++--- src/client.c | 325 +++++++++++++++++++++++++++++++---- src/console/a2s.c | 200 +++++++++++++++++++++ src/console/a2s.h | 27 +++ src/umbrella.h | 2 + src/unit.c | 3 + units/tf2-novemen.yaml.example | 1 + 9 files changed, 728 insertions(+), 229 deletions(-) create mode 100644 src/console/a2s.c create mode 100644 src/console/a2s.h diff --git a/Makefile b/Makefile index 7cfa4a1..e10a476 100644 --- a/Makefile +++ b/Makefile @@ -3,7 +3,7 @@ CC = gcc CFLAGS = -Wall -Wextra -Wpedantic -std=c11 -D_GNU_SOURCE \ -O2 -g -LDFLAGS = -lyaml +LDFLAGS = -lyaml -lsystemd DAEMON_SRCS = \ src/main.c \ @@ -13,7 +13,8 @@ DAEMON_SRCS = \ src/client.c \ src/log.c \ src/log_tail.c \ - src/console/rcon.c + src/console/rcon.c \ + src/console/a2s.c CLI_SRCS = clients/umbrella-cli/main.c diff --git a/clients/umbrella-bot/umbrella-bot.py b/clients/umbrella-bot/umbrella-bot.py index fed3bdb..6e26947 100755 --- a/clients/umbrella-bot/umbrella-bot.py +++ b/clients/umbrella-bot/umbrella-bot.py @@ -24,6 +24,7 @@ import struct import sys import logging import argparse +from datetime import datetime from pathlib import Path from typing import Optional @@ -52,18 +53,22 @@ log = logging.getLogger("umbrella-bot") DEFAULT_CONF = "/etc/umbrella/bot.conf" DEFAULT_STORE = "/etc/umbrella/bot-store" SOCK_PATH = "/run/umbrella/umbrella.sock" +AUDIT_LOG = "/var/log/umbrella/bot-audit.log" PROTO_MAX = 65536 +# Maximum lines to show in a !tail response +TAIL_MAX_LINES = 30 + HELP_TEXT = """\ Umbrella bot commands: !status — Show unit status !cmd — Send a console command + !tail — Show recent output from a unit !restart — Restart the unit's systemd service !update — Run the update action !action — Run a named action - !attach — Stream live log output to this room - !detach — Stop streaming log output + !broadcast — Send message to all running units !units — List all units !help — Show this message """ @@ -75,7 +80,6 @@ class UmbrellaClient: Synchronous umbrella socket client. We use a plain blocking socket here since commands are short-lived and we don't need async for the request/response pattern. - For attach (streaming), we use a separate async wrapper. """ def __init__(self, sock_path: str = SOCK_PATH): @@ -139,143 +143,27 @@ class UmbrellaClient: def run_action(self, unit: str, action: str) -> dict: return self.command({"cmd": "action", "unit": unit, "action": action}) + def tail(self, unit: str) -> dict: + return self.command({"cmd": "tail", "unit": unit}) -# ── Attach session: streams output from umbrella to a Matrix room ───────────── - -class AttachSession: - """ - Maintains a persistent connection to umbrella for a specific unit, - forwarding live output to a Matrix room. - """ + def broadcast(self, message: str) -> dict: + return self.command({"cmd": "broadcast", "message": message}) - def __init__(self, unit: str, room_id: str, - matrix_client: "AsyncClient", - sock_path: str = SOCK_PATH): - self.unit = unit - self.room_id = room_id - self.matrix_client = matrix_client - self.sock_path = sock_path - self._task: Optional[asyncio.Task] = None - self._sock: Optional[socket.socket] = None - - async def start(self): - if self._task and not self._task.done(): - return # already running - self._task = asyncio.create_task(self._stream()) - - async def stop(self): - if self._task: - self._task.cancel() - try: - await self._task - except asyncio.CancelledError: - pass - if self._sock: - try: - self._sock.close() - except Exception: - pass - self._sock = None - - async def _stream(self): - loop = asyncio.get_event_loop() - - def connect_and_attach(): - s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - s.settimeout(10) - s.connect(self.sock_path) - # Send attach command - msg = json.dumps({"cmd": "attach", "unit": self.unit}).encode() - s.sendall(struct.pack(">I", len(msg)) + msg) - # Read ok response - hdr = b"" - while len(hdr) < 4: - hdr += s.recv(4 - len(hdr)) - length = struct.unpack(">I", hdr)[0] - resp_data = b"" - while len(resp_data) < length: - resp_data += s.recv(length - len(resp_data)) - s.settimeout(None) # non-blocking from here - s.setblocking(False) - return s - try: - self._sock = await loop.run_in_executor(None, connect_and_attach) - except Exception as e: - log.error(f"attach: failed to connect for {self.unit}: {e}") - return +# ── Audit logging ───────────────────────────────────────────────────────────── - buf = b"" - log.info(f"attach: streaming {self.unit} to room {self.room_id}") - - try: - while True: - # Read available data from socket - try: - chunk = await loop.run_in_executor( - None, lambda: self._recv_one_nonblock() - ) - if chunk is None: - await asyncio.sleep(0.05) - continue - if chunk == {}: - break # disconnected - except asyncio.CancelledError: - raise - except Exception as e: - log.error(f"attach: read error for {self.unit}: {e}") - break - - if chunk.get("type") == "output": - data = chunk.get("data", "") - history = chunk.get("history", False) - if data and not history: - # Send to Matrix room, wrapped in code block - await self.matrix_client.room_send( - self.room_id, - "m.room.message", - { - "msgtype": "m.text", - "body": f"```\n{data.rstrip()}\n```", - "format": "org.matrix.custom.html", - "formatted_body": f"
{data.rstrip()}
", - }, - ignore_unverified_devices=True, - ) - except asyncio.CancelledError: - pass - finally: - if self._sock: - # Send detach - try: - msg = json.dumps({"cmd": "detach"}).encode() - self._sock.setblocking(True) - self._sock.sendall(struct.pack(">I", len(msg)) + msg) - except Exception: - pass - self._sock.close() - self._sock = None - - def _recv_one_nonblock(self) -> Optional[dict]: - """Try to read one message from the non-blocking socket. Returns None if no data yet.""" - try: - hdr = self._sock.recv(4) - if not hdr: - return {} # disconnected - if len(hdr) < 4: - return None - length = struct.unpack(">I", hdr)[0] - data = b"" - self._sock.setblocking(True) - while len(data) < length: - chunk = self._sock.recv(length - len(data)) - if not chunk: - return {} - data += chunk - self._sock.setblocking(False) - return json.loads(data.decode()) - except BlockingIOError: - return None +def audit_log(sender: str, unit: str, command: str) -> None: + """ + Append a line to the bot audit log for !cmd invocations. + Format: [YYYY-MM-DD HH:MM:SS] !cmd + """ + timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + line = f"[{timestamp}] {sender} !cmd {unit} {command}\n" + try: + with open(AUDIT_LOG, "a") as f: + f.write(line) + except OSError as e: + log.warning(f"Could not write audit log: {e}") # ── Power level check ───────────────────────────────────────────────────────── @@ -304,7 +192,6 @@ async def get_user_power_level(client: AsyncClient, async def handle_command( client: AsyncClient, umbrella: UmbrellaClient, - attach_sessions: dict, room: MatrixRoom, sender: str, text: str, @@ -334,7 +221,10 @@ async def handle_command( return "No units loaded." lines = ["**Units:**"] for u in units: - lines.append(f" `{u['name']}` — {u['display']} [{u['state']}]") + pl = u.get("players", -1) + mx = u.get("max_players", -1) + players = f" ({pl}/{mx})" if pl >= 0 and mx >= 0 else "" + lines.append(f" `{u['name']}` — {u['display']} [{u['state']}{players}]") return "\n".join(lines) elif cmd == "!status": @@ -344,10 +234,18 @@ async def handle_command( resp = umbrella.status(unit) if resp.get("type") == "error": return f"❌ {resp['message']}" - return ( - f"**{resp.get('display', unit)}**\n" - f"State: `{resp.get('state', 'unknown')}`" - ) + pl = resp.get("players", -1) + mx = resp.get("max_players", -1) + map_name = resp.get("map", "") + lines = [ + f"**{resp.get('display', unit)}**", + f"State: `{resp.get('state', 'unknown')}`", + ] + if isinstance(pl, int) and pl >= 0 and isinstance(mx, int) and mx >= 0: + lines.append(f"Players: {pl}/{mx}") + if map_name: + lines.append(f"Map: {map_name}") + return "\n".join(lines) elif cmd == "!cmd": if len(parts) < 3: @@ -357,8 +255,43 @@ async def handle_command( resp = umbrella.send_input(unit, command) if resp.get("type") == "error": return f"❌ {resp['message']}" + audit_log(sender, unit, command) return f"✓ Sent: `{command}`" + elif cmd == "!tail": + if len(parts) < 2: + return "Usage: `!tail `" + unit = parts[1] + resp = umbrella.tail(unit) + if resp.get("type") == "error": + return f"❌ {resp.get('message', 'unknown error')}" + data = resp.get("data", "") + if not data or not data.strip(): + return f"No output buffered for `{unit}`." + # Trim to last TAIL_MAX_LINES lines to avoid flooding the room + lines = data.splitlines() + if len(lines) > TAIL_MAX_LINES: + lines = lines[-TAIL_MAX_LINES:] + trimmed = f"(showing last {TAIL_MAX_LINES} lines)\n" + else: + trimmed = "" + body = trimmed + "\n".join(lines) + return f"```\n{body}\n```" + + elif cmd == "!broadcast": + if len(parts) < 2: + return "Usage: `!broadcast `" + message = " ".join(parts[1:]) + resp = umbrella.broadcast(message) + if resp.get("type") == "error": + return f"❌ {resp.get('message', 'unknown error')}" + sent = resp.get("sent", 0) + failed = resp.get("failed", 0) + result = f"✓ Broadcast sent to {sent} unit(s)" + if failed: + result += f", {failed} failed" + return result + elif cmd == "!restart": if len(parts) < 2: return "Usage: `!restart `" @@ -387,29 +320,6 @@ async def handle_command( return f"❌ {resp['message']}" return f"✓ Action `{action}` dispatched for `{unit}`" - elif cmd == "!attach": - if len(parts) < 2: - return "Usage: `!attach `" - unit = parts[1] - key = (room.room_id, unit) - if key in attach_sessions: - return f"Already streaming `{unit}` to this room." - session = AttachSession(unit, room.room_id, client) - attach_sessions[key] = session - await session.start() - return f"📡 Streaming `{unit}` output to this room. Use `!detach {unit}` to stop." - - elif cmd == "!detach": - if len(parts) < 2: - return "Usage: `!detach `" - unit = parts[1] - key = (room.room_id, unit) - if key not in attach_sessions: - return f"`{unit}` is not being streamed to this room." - await attach_sessions[key].stop() - del attach_sessions[key] - return f"⏹ Stopped streaming `{unit}`." - except RuntimeError as e: return f"❌ Umbrella error: {e}" except Exception as e: @@ -441,7 +351,6 @@ async def run_bot(config: dict): ) umbrella = UmbrellaClient(config.get("socket_path", SOCK_PATH)) - attach_sessions = {} room_id = config["room_id"] # Login @@ -478,7 +387,7 @@ async def run_bot(config: dict): log.info(f"Command from {event.sender}: {body}") reply = await handle_command( - client, umbrella, attach_sessions, + client, umbrella, room, event.sender, body, config ) if reply: @@ -525,7 +434,7 @@ async def run_bot(config: dict): await client.keys_upload() if client.should_query_keys: await client.keys_query() - + sync_task = asyncio.create_task(sync_loop()) await stop_event.wait() @@ -536,10 +445,6 @@ async def run_bot(config: dict): except asyncio.CancelledError: pass - # Stop all attach sessions - for session in attach_sessions.values(): - await session.stop() - await client.close() log.info("Bot stopped.") diff --git a/clients/umbrella-cli/main.c b/clients/umbrella-cli/main.c index 69357e4..8562e92 100644 --- a/clients/umbrella-cli/main.c +++ b/clients/umbrella-cli/main.c @@ -4,9 +4,11 @@ * Usage: * umbrella-cli list * umbrella-cli status - * umbrella-cli attach # interactive console session - * umbrella-cli input # send a single command + * umbrella-cli tail # print recent output, then exit + * umbrella-cli attach # interactive console session + * umbrella-cli input # send a single command * umbrella-cli action + * umbrella-cli broadcast */ #include @@ -124,30 +126,44 @@ static int cmd_list(int fd) { char buf[PROTO_MAX]; if (recv_msg(fd, buf, sizeof(buf)) <= 0) return 1; - /* Simple display: find all "name"/"display"/"state" triplets */ - printf("%-24s %-32s %s\n", "NAME", "DISPLAY", "STATE"); - printf("%-24s %-32s %s\n", + printf("%-24s %-32s %-14s %-8s %s\n", + "NAME", "DISPLAY", "STATE", "PLAYERS", "MAP"); + printf("%-24s %-32s %-14s %-8s %s\n", "────────────────────────", "────────────────────────────────", - "───────"); + "──────────────", + "────────", + "───────────────────"); - /* Walk the units array manually */ const char *p = buf; while ((p = strstr(p, "\"name\":")) != NULL) { char name[64] = {0}, display[128] = {0}, state[32] = {0}; + char players_s[16] = {0}, max_players_s[16] = {0}, map[64] = {0}; - /* Extract from this position forward */ char snippet[512]; strncpy(snippet, p, sizeof(snippet) - 1); - jget(snippet, "name", name, sizeof(name)); - jget(snippet, "display", display, sizeof(display)); - jget(snippet, "state", state, sizeof(state)); + jget(snippet, "name", name, sizeof(name)); + jget(snippet, "display", display, sizeof(display)); + jget(snippet, "state", state, sizeof(state)); + jget(snippet, "players", players_s, sizeof(players_s)); + jget(snippet, "max_players", max_players_s, sizeof(max_players_s)); + jget(snippet, "map", map, sizeof(map)); - if (name[0]) - printf("%-24s %-32s %s\n", name, display, state); + if (!name[0]) { p++; continue; } - p++; /* advance past current match */ + /* Format player count as "N/M" or "-" */ + char players_fmt[16] = "-"; + int pl = players_s[0] ? atoi(players_s) : -1; + int mx = max_players_s[0] ? atoi(max_players_s) : -1; + if (pl >= 0 && mx >= 0) + snprintf(players_fmt, sizeof(players_fmt), "%d/%d", pl, mx); + + printf("%-24s %-32s %-14s %-8s %s\n", + name, display, state, players_fmt, + map[0] ? map : "-"); + + p++; } return 0; } @@ -171,15 +187,88 @@ static int cmd_status(int fd, const char *unit) { } char display[128] = {0}, state[32] = {0}, pid[16] = {0}; - jget(buf, "display", display, sizeof(display)); - jget(buf, "state", state, sizeof(state)); - jget(buf, "pid", pid, sizeof(pid)); + char players_s[16] = {0}, max_players_s[16] = {0}, map[64] = {0}; + jget(buf, "display", display, sizeof(display)); + jget(buf, "state", state, sizeof(state)); + jget(buf, "pid", pid, sizeof(pid)); + jget(buf, "players", players_s, sizeof(players_s)); + jget(buf, "max_players", max_players_s, sizeof(max_players_s)); + jget(buf, "map", map, sizeof(map)); printf("Unit : %s\n", unit); printf("Display : %s\n", display); printf("State : %s\n", state); if (pid[0] && strcmp(pid, "0") != 0) printf("PID : %s\n", pid); + + int pl = players_s[0] ? atoi(players_s) : -1; + int mx = max_players_s[0] ? atoi(max_players_s) : -1; + if (pl >= 0 && mx >= 0) + printf("Players : %d/%d\n", pl, mx); + if (map[0]) + printf("Map : %s\n", map); + + return 0; +} + +static int cmd_tail(int fd, const char *unit) { + char msg[256]; + snprintf(msg, sizeof(msg), + "{\"cmd\":\"tail\",\"unit\":\"%s\"}", unit); + if (send_msg(fd, msg) != 0) return 1; + + char buf[PROTO_MAX]; + if (recv_msg(fd, buf, sizeof(buf)) <= 0) return 1; + + char type[32] = {0}; + jget(buf, "type", type, sizeof(type)); + if (strcmp(type, "error") == 0) { + char errmsg[256] = {0}; + jget(buf, "message", errmsg, sizeof(errmsg)); + fprintf(stderr, "Error: %s\n", errmsg); + return 1; + } + + char data[PROTO_MAX] = {0}; + jget(buf, "data", data, sizeof(data)); + if (data[0]) + printf("%s", data); + return 0; +} + +static int cmd_broadcast(int fd, const char *message) { + char msg[1024]; + /* Simple JSON escaping for the message */ + char escaped[900] = {0}; + int j = 0; + for (int i = 0; message[i] && j < (int)sizeof(escaped) - 2; i++) { + if (message[i] == '"') { escaped[j++] = '\\'; escaped[j++] = '"'; } + else if (message[i] == '\\') { escaped[j++] = '\\'; escaped[j++] = '\\'; } + else escaped[j++] = message[i]; + } + escaped[j] = '\0'; + + snprintf(msg, sizeof(msg), "{\"cmd\":\"broadcast\",\"message\":\"%s\"}", escaped); + if (send_msg(fd, msg) != 0) return 1; + + char buf[PROTO_MAX]; + if (recv_msg(fd, buf, sizeof(buf)) <= 0) return 1; + + char type[32] = {0}; + jget(buf, "type", type, sizeof(type)); + if (strcmp(type, "error") == 0) { + char errmsg[256] = {0}; + jget(buf, "message", errmsg, sizeof(errmsg)); + fprintf(stderr, "Error: %s\n", errmsg); + return 1; + } + + char sent_s[16] = {0}, failed_s[16] = {0}; + jget(buf, "sent", sent_s, sizeof(sent_s)); + jget(buf, "failed", failed_s, sizeof(failed_s)); + printf("Broadcast sent to %s unit(s), %s failed\n", + sent_s[0] ? sent_s : "0", + failed_s[0] ? failed_s : "0"); return 0; } @@ -363,17 +452,21 @@ static void usage(void) { "Usage: umbrella-cli [args]\n" "\n" "Commands:\n" - " list List all units\n" - " status Show unit status\n" - " attach Interactive console session\n" - " input Send a single command\n" - " action Run a named action\n" + " list List all units\n" + " status Show unit status\n" + " tail Print recent output and exit\n" + " attach Interactive console session\n" + " input Send a single command\n" + " action Run a named action\n" + " broadcast Send message to all running units\n" "\n" "Examples:\n" " umbrella-cli list\n" + " umbrella-cli tail tf2-novemen\n" " umbrella-cli attach tf2-novemen\n" " umbrella-cli input tf2-novemen \"say Server restarting soon\"\n" - " umbrella-cli action tf2-novemen update\n"); + " umbrella-cli action tf2-novemen update\n" + " umbrella-cli broadcast \"Server restart in 5 minutes\"\n"); } int main(int argc, char *argv[]) { @@ -390,6 +483,9 @@ int main(int argc, char *argv[]) { } else if (strcmp(cmd, "status") == 0) { if (argc < 3) { fprintf(stderr, "status: need unit name\n"); ret = 1; } else ret = cmd_status(fd, argv[2]); + } else if (strcmp(cmd, "tail") == 0) { + if (argc < 3) { fprintf(stderr, "tail: need unit name\n"); ret = 1; } + else ret = cmd_tail(fd, argv[2]); } else if (strcmp(cmd, "attach") == 0) { if (argc < 3) { fprintf(stderr, "attach: need unit name\n"); ret = 1; } else ret = cmd_attach(fd, argv[2]); @@ -399,6 +495,9 @@ int main(int argc, char *argv[]) { } else if (strcmp(cmd, "action") == 0) { if (argc < 4) { fprintf(stderr, "action: need unit and action name\n"); ret = 1; } else ret = cmd_action(fd, argv[2], argv[3]); + } else if (strcmp(cmd, "broadcast") == 0) { + if (argc < 3) { fprintf(stderr, "broadcast: need message\n"); ret = 1; } + else ret = cmd_broadcast(fd, argv[2]); } else { fprintf(stderr, "Unknown command: %s\n\n", cmd); usage(); diff --git a/src/client.c b/src/client.c index e1816f2..53b8e53 100644 --- a/src/client.c +++ b/src/client.c @@ -4,6 +4,7 @@ #include "log.h" #include "umbrella.h" #include "console/rcon.h" +#include "console/a2s.h" #include #include @@ -15,6 +16,7 @@ #include #include #include +#include /* ── Socket setup ────────────────────────────────────────────────────────── */ @@ -134,29 +136,174 @@ void client_broadcast_output(const char *unit_name, /* ── Command handlers ────────────────────────────────────────────────────── */ -static const char *probe_rcon_state(Unit *u) { - if (u->console.type != CONSOLE_RCON) - return unit_state_str(u->state); +/* + * systemd_active_state: query the ActiveState of a systemd unit via D-Bus. + * + * Returns a pointer to a static string: "active", "activating", + * "deactivating", "inactive", "failed", or "unknown" on error. + * + * Only called when u->service[0] is set. Works for any unit type + * (RCON or STDIN) that declares a systemd service. + */ +static const char *systemd_active_state(const char *service_name) +{ + sd_bus *bus = NULL; + sd_bus_message *reply = NULL; + sd_bus_error error = SD_BUS_ERROR_NULL; + const char *obj_path = NULL; + char *state_str = NULL; + const char *result = "unknown"; + + if (sd_bus_open_system(&bus) < 0) + goto out; + + /* GetUnit(service_name) → object path */ + if (sd_bus_call_method( + bus, + "org.freedesktop.systemd1", + "/org/freedesktop/systemd1", + "org.freedesktop.systemd1.Manager", + "GetUnit", + &error, &reply, + "s", service_name) < 0) + goto out; + + if (sd_bus_message_read(reply, "o", &obj_path) < 0 || !obj_path) + goto out; + + /* Query ActiveState property on the unit object */ + if (sd_bus_get_property_string( + bus, + "org.freedesktop.systemd1", + obj_path, + "org.freedesktop.systemd1.Unit", + "ActiveState", + &error, &state_str) < 0) + goto out; + + /* Map to one of our known strings */ + if (strcmp(state_str, "active") == 0) result = "active"; + else if (strcmp(state_str, "activating") == 0) result = "activating"; + else if (strcmp(state_str, "deactivating") == 0) result = "deactivating"; + else if (strcmp(state_str, "inactive") == 0) result = "inactive"; + else if (strcmp(state_str, "failed") == 0) result = "failed"; + +out: + free(state_str); + sd_bus_message_unref(reply); + sd_bus_error_free(&error); + sd_bus_unref(bus); + return result; +} + +/* + * ProbeResult: returned by probe_unit_state(). + * Carries richer information than a plain state string. + */ +typedef struct { + const char *state; /* pointer to static string */ + int players; /* -1 = not available (non-A2S unit) */ + int max_players; /* -1 = not available */ + char map[64]; /* empty = not available */ +} ProbeResult; + +/* + * probe_unit_state: determine current state of a unit using all available + * probing methods, honouring per-unit configuration. + * + * Decision tree: + * + * STDIN unit: + * → {state = internal process state} (tracked directly by daemon) + * + * Any unit with service[0] set → sd-bus D-Bus: + * "activating" → {state="starting"} + * "deactivating" → {state="stopping"} + * "inactive" → {state="stopped"} + * "failed" → {state="crashed"} + * "active" → continue to application-level probe + * + * RCON unit, health.type == HEALTH_A2S → A2S UDP query: + * success, players == 0 → {state="hibernating", players, max_players, map} + * success, players > 0 → {state="running", players, max_players, map} + * failure → RCON echo probe: + * success → {state="changing_map"} + * failure → {state="unreachable"} + * + * RCON unit, health.type == HEALTH_NONE → RCON echo probe: + * success → {state="running"} + * failure → {state="unreachable"} + */ +static ProbeResult probe_unit_state(Unit *u) +{ + ProbeResult r = { .state = "unknown", .players = -1, .max_players = -1 }; + r.map[0] = '\0'; + + /* STDIN units: state is maintained directly by the process layer */ + if (u->console.type != CONSOLE_RCON) { + r.state = unit_state_str(u->state); + return r; + } - /* Check systemd service state first -- cheap, no network */ + /* For any unit (RCON or STDIN) with a systemd service, check D-Bus first */ if (u->service[0]) { - char cmd[256]; - snprintf(cmd, sizeof(cmd), - "systemctl is-active --quiet %s 2>/dev/null", u->service); - if (system(cmd) != 0) - return "stopped"; + const char *sd = systemd_active_state(u->service); + if (strcmp(sd, "activating") == 0) { r.state = "starting"; return r; } + else if (strcmp(sd, "deactivating") == 0) { r.state = "stopping"; return r; } + else if (strcmp(sd, "inactive") == 0) { r.state = "stopped"; return r; } + else if (strcmp(sd, "failed") == 0) { r.state = "crashed"; return r; } + /* "active" or "unknown" → fall through to application probe */ } - /* Service is active -- probe RCON to confirm reachability */ + /* A2S health probe (Valve games: TF2, GMod, …) */ + if (u->health.type == HEALTH_A2S) { + const char *h_host = u->health.host[0] ? u->health.host : u->console.host; + uint16_t h_port = u->health.port ? u->health.port : u->console.port; + int h_tmo = u->health.timeout_ms > 0 ? u->health.timeout_ms : 5000; + + A2SInfo info; + if (a2s_query(h_host, h_port, h_tmo, &info) == 0) { + r.players = info.players; + r.max_players = info.max_players; + strncpy(r.map, info.map, sizeof(r.map) - 1); + r.state = (r.players == 0) ? "hibernating" : "running"; + return r; + } + + /* A2S failed — try RCON to distinguish changing_map from unreachable */ + char response[64] = {0}; + int ok = rcon_exec(u->console.host, u->console.port, + u->console.password, "echo umbrella_probe", + response, sizeof(response)); + r.state = (ok == 0) ? "changing_map" : "unreachable"; + return r; + } + + /* Default: plain RCON echo probe */ char response[64] = {0}; - int r = rcon_exec(u->console.host, u->console.port, - u->console.password, "echo umbrella_probe", - response, sizeof(response)); - return (r == 0) ? "running" : "unreachable"; + int ok = rcon_exec(u->console.host, u->console.port, + u->console.password, "echo umbrella_probe", + response, sizeof(response)); + r.state = (ok == 0) ? "running" : "unreachable"; + return r; +} + +/* ── JSON helpers ────────────────────────────────────────────────────────── */ + +/* Append probe result's optional fields to an in-progress JSON buffer. + * Call after the last fixed field; this adds players/max_players/map. */ +static int append_probe_fields(char *buf, int pos, int size, + const ProbeResult *r) +{ + pos += snprintf(buf + pos, size - pos, + ",\"players\":%d,\"max_players\":%d,\"map\":\"%s\"", + r->players, r->max_players, r->map); + return pos; } +/* ── cmd_list ─────────────────────────────────────────────────────────────── */ + static void cmd_list(Client *c) { - /* Build a JSON array of unit summaries, using active probing for state */ char buf[PROTO_MAX_MSG]; int pos = 0; @@ -165,31 +312,139 @@ static void cmd_list(Client *c) { for (int i = 0; i < g.unit_count; i++) { Unit *u = &g.units[i]; + ProbeResult r = probe_unit_state(u); + if (i > 0) pos += snprintf(buf + pos, sizeof(buf) - pos, ","); + pos += snprintf(buf + pos, sizeof(buf) - pos, - "{\"name\":\"%s\",\"display\":\"%s\"," - "\"state\":\"%s\"}", - u->name, u->display, probe_rcon_state(u)); + "{\"name\":\"%s\",\"display\":\"%s\",\"state\":\"%s\"", + u->name, u->display, r.state); + pos = append_probe_fields(buf, pos, sizeof(buf), &r); + pos += snprintf(buf + pos, sizeof(buf) - pos, "}"); } snprintf(buf + pos, sizeof(buf) - pos, "]}"); proto_send(c->fd, buf); } +/* ── cmd_status ──────────────────────────────────────────────────────────── */ + static void cmd_status(Client *c, const char *unit_name) { Unit *u = unit_find(unit_name); if (!u) { proto_send_error(c->fd, "unit not found"); return; } - const char *state = probe_rcon_state(u); + ProbeResult r = probe_unit_state(u); char buf[512]; + int pos = 0; + pos += snprintf(buf + pos, sizeof(buf) - pos, + "{\"type\":\"status\",\"unit\":\"%s\",\"display\":\"%s\"," + "\"state\":\"%s\",\"pid\":%d", + u->name, u->display, r.state, (int)u->pid); + pos = append_probe_fields(buf, pos, sizeof(buf), &r); + snprintf(buf + pos, sizeof(buf) - pos, "}"); + proto_send(c->fd, buf); +} + +/* ── cmd_tail ────────────────────────────────────────────────────────────── */ + +static void cmd_tail(Client *c, const char *unit_name) { + Unit *u = unit_find(unit_name); + if (!u) { proto_send_error(c->fd, "unit not found"); return; } + + /* Concatenate all ring buffer lines into a single payload */ + char data[PROTO_MAX_MSG - 256]; + int dpos = 0; + + RingBuffer *rb = u->output; + if (rb && rb->count > 0) { + int start = (rb->head - rb->count + RING_BUF_LINES) % RING_BUF_LINES; + for (int i = 0; i < rb->count && dpos < (int)sizeof(data) - 2; i++) { + int idx = (start + i) % RING_BUF_LINES; + const char *line = rb->lines[idx]; + int len = strlen(line); + if (dpos + len + 1 >= (int)sizeof(data)) + break; + memcpy(data + dpos, line, len); + dpos += len; + /* Ensure each line ends with a newline */ + if (dpos > 0 && data[dpos - 1] != '\n') + data[dpos++] = '\n'; + } + } + data[dpos] = '\0'; + + /* Build response — use proto_send_output with history flag */ + proto_send_output(c->fd, unit_name, data, 1); +} + +/* ── cmd_broadcast ───────────────────────────────────────────────────────── */ + +static void cmd_broadcast(Client *c, const char *message) { + int sent = 0, failed = 0; + + for (int i = 0; i < g.unit_count; i++) { + Unit *u = &g.units[i]; + + /* Skip units without a broadcast command configured */ + if (!u->broadcast_cmd[0]) + continue; + + /* Only broadcast to units the daemon considers running */ + if (u->state != STATE_RUNNING) + continue; + + /* Substitute {msg} with the message in the broadcast command */ + char cmd[256]; + const char *tmpl = u->broadcast_cmd; + const char *placeholder = strstr(tmpl, "{msg}"); + if (placeholder) { + int prefix_len = (int)(placeholder - tmpl); + snprintf(cmd, sizeof(cmd), "%.*s%s%s", + prefix_len, tmpl, + message, + placeholder + 5 /* strlen("{msg}") */); + } else { + /* No placeholder — append message after a space */ + snprintf(cmd, sizeof(cmd), "%s %s", tmpl, message); + } + + int ok = 0; + if (u->console.type == CONSOLE_RCON) { + char response[256] = {0}; + ok = (rcon_exec(u->console.host, u->console.port, + u->console.password, cmd, + response, sizeof(response)) == 0); + } else if (u->console.type == CONSOLE_STDIN) { + if (u->stdin_fd >= 0) { + /* Append newline so the server processes the command */ + size_t clen = strlen(cmd); + if (clen < sizeof(cmd) - 1) { + cmd[clen] = '\n'; + cmd[clen + 1] = '\0'; + ok = (write(u->stdin_fd, cmd, clen + 1) > 0); + } + } + } + + if (ok) { + log_info("broadcast: sent to %s: %s", u->name, cmd); + sent++; + } else { + log_warn("broadcast: failed for %s", u->name); + failed++; + } + } + + char buf[128]; snprintf(buf, sizeof(buf), - "{\"type\":\"status\",\"unit\":\"%s\",\"display\":\"%s\"," - "\"state\":\"%s\",\"pid\":%d}", - u->name, u->display, state, (int)u->pid); + "{\"type\":\"ok\",\"sent\":%d,\"failed\":%d}", sent, failed); proto_send(c->fd, buf); } + +/* ── cmd_attach ──────────────────────────────────────────────────────────── */ + static void cmd_attach(Client *c, const char *unit_name) { Unit *u = unit_find(unit_name); if (!u) { proto_send_error(c->fd, "unit not found"); return; } @@ -301,19 +556,25 @@ int client_handle(int fd) { char action[MAX_NAME] = {0}; char data[RING_BUF_LINE_MAX] = {0}; - json_get_str(buf, "cmd", cmd, sizeof(cmd)); - json_get_str(buf, "unit", unit, sizeof(unit)); - json_get_str(buf, "action", action, sizeof(action)); - json_get_str(buf, "data", data, sizeof(data)); + json_get_str(buf, "cmd", cmd, sizeof(cmd)); + json_get_str(buf, "unit", unit, sizeof(unit)); + json_get_str(buf, "action", action, sizeof(action)); + json_get_str(buf, "data", data, sizeof(data)); + + /* For broadcast, "message" maps to the data field */ + if (strcmp(cmd, "broadcast") == 0) + json_get_str(buf, "message", data, sizeof(data)); log_debug("Client fd=%d cmd=%s unit=%s", fd, cmd, unit); - if (strcmp(cmd, "list") == 0) cmd_list(c); - else if (strcmp(cmd, "status") == 0) cmd_status(c, unit); - else if (strcmp(cmd, "attach") == 0) cmd_attach(c, unit); - else if (strcmp(cmd, "detach") == 0) cmd_detach(c); - else if (strcmp(cmd, "input") == 0) cmd_input(c, unit, data); - else if (strcmp(cmd, "action") == 0) cmd_action(c, unit, action); + if (strcmp(cmd, "list") == 0) cmd_list(c); + else if (strcmp(cmd, "status") == 0) cmd_status(c, unit); + else if (strcmp(cmd, "tail") == 0) cmd_tail(c, unit); + else if (strcmp(cmd, "broadcast") == 0) cmd_broadcast(c, data); + else if (strcmp(cmd, "attach") == 0) cmd_attach(c, unit); + else if (strcmp(cmd, "detach") == 0) cmd_detach(c); + else if (strcmp(cmd, "input") == 0) cmd_input(c, unit, data); + else if (strcmp(cmd, "action") == 0) cmd_action(c, unit, action); else proto_send_error(fd, "unknown command"); return 0; diff --git a/src/console/a2s.c b/src/console/a2s.c new file mode 100644 index 0000000..96e556b --- /dev/null +++ b/src/console/a2s.c @@ -0,0 +1,200 @@ +#include "a2s.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +/* + * A2S_INFO — Valve Source Engine server query protocol (UDP). + * + * Reference: https://developer.valvesoftware.com/wiki/Server_queries + * + * Wire protocol (modern Source, post-Orange Box): + * + * Request: + * FF FF FF FF 54 "Source Engine Query" 00 + * + * Some servers issue a challenge before responding: + * FF FF FF FF 41 <4-byte challenge> + * In that case resend the request with the challenge appended. + * + * Response (type 0x49 = 'I'): + * FF FF FF FF 49 + * protocol + * name + * map ← we want this + * folder + * game + * appid + * players ← we want this + * max_players ← we want this + * ... (remaining fields not parsed) + */ + +/* ── helpers ─────────────────────────────────────────────────────────────── */ + +/* Read a null-terminated string from buf starting at *pos. + * Advances *pos past the null byte. + * Returns 0 on success, -1 if the buffer would be overrun. */ +static int read_string(const uint8_t *buf, int buf_len, + int *pos, char *out, int out_size) +{ + int start = *pos; + while (*pos < buf_len && buf[*pos] != '\0') + (*pos)++; + if (*pos >= buf_len) + return -1; /* no null terminator found */ + + int len = *pos - start; + if (len >= out_size) + len = out_size - 1; + memcpy(out, buf + start, len); + out[len] = '\0'; + + (*pos)++; /* skip null byte */ + return 0; +} + +/* ── a2s_query ───────────────────────────────────────────────────────────── */ + +int a2s_query(const char *host, uint16_t port, int timeout_ms, A2SInfo *out) +{ + memset(out, 0, sizeof(*out)); + out->players = -1; + out->max_players = -1; + + /* Resolve host */ + struct addrinfo hints, *res, *rp; + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_DGRAM; + + char port_str[8]; + snprintf(port_str, sizeof(port_str), "%u", port); + + if (getaddrinfo(host, port_str, &hints, &res) != 0) + return -1; + + int fd = -1; + for (rp = res; rp; rp = rp->ai_next) { + fd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); + if (fd < 0) continue; + if (connect(fd, rp->ai_addr, rp->ai_addrlen) == 0) break; + close(fd); + fd = -1; + } + freeaddrinfo(res); + if (fd < 0) return -1; + + /* Set receive timeout */ + struct timeval tv; + tv.tv_sec = timeout_ms / 1000; + tv.tv_usec = (timeout_ms % 1000) * 1000; + setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); + setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)); + + /* Build initial A2S_INFO request */ + static const uint8_t a2s_prefix[] = { + 0xFF, 0xFF, 0xFF, 0xFF, 0x54 + }; + static const char a2s_payload[] = "Source Engine Query"; + /* Full request: prefix + payload + null byte */ + uint8_t req[32]; + int req_len = 0; + memcpy(req + req_len, a2s_prefix, sizeof(a2s_prefix)); + req_len += sizeof(a2s_prefix); + memcpy(req + req_len, a2s_payload, sizeof(a2s_payload)); /* includes \0 */ + req_len += sizeof(a2s_payload); + + uint8_t resp[2048]; + int resp_len; + int attempts = 0; + +retry: + if (attempts++ >= 2) { + close(fd); + return -1; + } + + if (send(fd, req, req_len, 0) != req_len) { + close(fd); + return -1; + } + + resp_len = recv(fd, resp, sizeof(resp), 0); + if (resp_len < 5) { + close(fd); + return -1; + } + + /* Check for challenge response: FF FF FF FF 41 <4 bytes> */ + if (resp_len >= 9 && + resp[0] == 0xFF && resp[1] == 0xFF && + resp[2] == 0xFF && resp[3] == 0xFF && + resp[4] == 0x41) + { + /* Append the 4-byte challenge to the request and resend */ + if (req_len + 4 > (int)sizeof(req)) { + close(fd); + return -1; + } + memcpy(req + req_len, resp + 5, 4); + req_len += 4; + goto retry; + } + + /* Expect response type 0x49 ('I') */ + if (resp[0] != 0xFF || resp[1] != 0xFF || + resp[2] != 0xFF || resp[3] != 0xFF || + resp[4] != 0x49) + { + close(fd); + return -1; + } + + close(fd); + + /* Parse response body starting after the 5-byte header */ + int pos = 5; + + /* Skip protocol version (1 byte) */ + if (pos + 1 > resp_len) return -1; + pos++; + + /* Skip name string */ + char scratch[256]; + if (read_string(resp, resp_len, &pos, scratch, sizeof(scratch)) != 0) + return -1; + + /* Map string — this is what we want */ + if (read_string(resp, resp_len, &pos, out->map, sizeof(out->map)) != 0) + return -1; + + /* Skip folder string */ + if (read_string(resp, resp_len, &pos, scratch, sizeof(scratch)) != 0) + return -1; + + /* Skip game string */ + if (read_string(resp, resp_len, &pos, scratch, sizeof(scratch)) != 0) + return -1; + + /* Skip app ID (2 bytes) */ + if (pos + 2 > resp_len) return -1; + pos += 2; + + /* Players (1 byte) */ + if (pos + 1 > resp_len) return -1; + out->players = resp[pos++]; + + /* Max players (1 byte) */ + if (pos + 1 > resp_len) return -1; + out->max_players = resp[pos++]; + + return 0; +} diff --git a/src/console/a2s.h b/src/console/a2s.h new file mode 100644 index 0000000..c96e445 --- /dev/null +++ b/src/console/a2s.h @@ -0,0 +1,27 @@ +#ifndef A2S_H +#define A2S_H + +#include + +/* + * A2S_INFO query (Valve Source engine query protocol). + * Retrieves server information over UDP. + * + * Only used for units with health.type == HEALTH_A2S. + */ + +typedef struct { + int players; /* current player count */ + int max_players; /* server player limit */ + char map[64]; /* current map name */ +} A2SInfo; + +/* + * a2s_query: Send an A2S_INFO request and parse the response. + * + * Returns 0 on success with *out populated. + * Returns -1 on timeout, parse error, or any network failure. + */ +int a2s_query(const char *host, uint16_t port, int timeout_ms, A2SInfo *out); + +#endif /* A2S_H */ diff --git a/src/umbrella.h b/src/umbrella.h index e44d626..2e9d440 100644 --- a/src/umbrella.h +++ b/src/umbrella.h @@ -45,6 +45,7 @@ typedef enum { STATE_STARTING = 1, STATE_RUNNING = 2, STATE_CRASHED = 3, + STATE_STOPPING = 4, } ProcessState; /* ── Action: a named script ──────────────────────────────────────────────── */ @@ -82,6 +83,7 @@ typedef struct { char name[MAX_NAME]; char display[MAX_DISPLAY]; char service[MAX_NAME]; /* systemd unit name, informational */ + char broadcast_cmd[128]; /* command template for !broadcast, e.g. "say {msg}" */ ConsoleConfig console; HealthConfig health; char log_paths[4][MAX_PATH]; diff --git a/src/unit.c b/src/unit.c index 3dd9d26..5baea20 100644 --- a/src/unit.c +++ b/src/unit.c @@ -41,6 +41,7 @@ const char *unit_state_str(ProcessState state) { case STATE_STARTING: return "starting"; case STATE_RUNNING: return "running"; case STATE_CRASHED: return "crashed"; + case STATE_STOPPING: return "stopping"; default: return "unknown"; } } @@ -160,6 +161,8 @@ int unit_load_file(const char *path, Unit *out) { strncpy(out->display, val, MAX_DISPLAY - 1); else if (strcmp(last_key, "service") == 0) strncpy(out->service, val, MAX_NAME - 1); + else if (strcmp(last_key, "broadcast_cmd") == 0) + strncpy(out->broadcast_cmd, val, sizeof(out->broadcast_cmd) - 1); break; case SECTION_CONSOLE: diff --git a/units/tf2-novemen.yaml.example b/units/tf2-novemen.yaml.example index 76985ba..22ac274 100644 --- a/units/tf2-novemen.yaml.example +++ b/units/tf2-novemen.yaml.example @@ -4,6 +4,7 @@ name: tf2-novemen display: "TF2 — novemen" service: tf2-server.service +broadcast_cmd: "say {msg}" # sent to all units on !broadcast console: type: rcon -- cgit v1.2.3