From fc10d8a0818bb87001a64a72552ed28fe60931ee Mon Sep 17 00:00:00 2001 From: auric Date: Sat, 21 Feb 2026 14:59:07 -0600 Subject: Add A2S probing, sd-bus state, tail/broadcast, and bot audit log MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit State detection: - Add STATE_STOPPING to ProcessState enum - Replace system("systemctl is-active") with libsystemd sd-bus API for accurate starting/stopping/crashed state reporting; works for any unit type (RCON or STDIN) that declares a service: field - Implement real A2S_INFO UDP queries (src/console/a2s.c) for units with health.type = a2s (Valve games: TF2, GMod); differentiates running / hibernating (0 players) / changing_map (A2S down, RCON up) / unreachable; includes player count and map name in responses - Refactor probe_rcon_state() into probe_unit_state() returning a ProbeResult struct with state, players, max_players, map fields - status and list responses now include players/max_players/map fields New daemon commands: - tail : return ring buffer snapshot as a single response - broadcast : send broadcast_cmd-formatted message to all running units; works for both RCON and STDIN console types New YAML field: - broadcast_cmd: command template (e.g. "say {msg}") — opt-in per unit; units without it are skipped by broadcast CLI (umbrella-cli): - Add tail subcommand (non-interactive output snapshot) - Add broadcast subcommand - status shows Players and Map when available - list adds PLAYERS and MAP columns Bot (umbrella-bot): - Replace !attach / !detach with !tail (shows last 30 lines, no streaming) - Add !broadcast command - Write per-!cmd audit entries to /var/log/umbrella/bot-audit.log - !units and !status responses include player counts when available Co-Authored-By: Claude Sonnet 4.6 --- clients/umbrella-bot/umbrella-bot.py | 249 +++++++++++------------------------ 1 file changed, 77 insertions(+), 172 deletions(-) (limited to 'clients/umbrella-bot/umbrella-bot.py') diff --git a/clients/umbrella-bot/umbrella-bot.py b/clients/umbrella-bot/umbrella-bot.py index fed3bdb..6e26947 100755 --- a/clients/umbrella-bot/umbrella-bot.py +++ b/clients/umbrella-bot/umbrella-bot.py @@ -24,6 +24,7 @@ import struct import sys import logging import argparse +from datetime import datetime from pathlib import Path from typing import Optional @@ -52,18 +53,22 @@ log = logging.getLogger("umbrella-bot") DEFAULT_CONF = "/etc/umbrella/bot.conf" DEFAULT_STORE = "/etc/umbrella/bot-store" SOCK_PATH = "/run/umbrella/umbrella.sock" +AUDIT_LOG = "/var/log/umbrella/bot-audit.log" PROTO_MAX = 65536 +# Maximum lines to show in a !tail response +TAIL_MAX_LINES = 30 + HELP_TEXT = """\ Umbrella bot commands: !status — Show unit status !cmd — Send a console command + !tail — Show recent output from a unit !restart — Restart the unit's systemd service !update — Run the update action !action — Run a named action - !attach — Stream live log output to this room - !detach — Stop streaming log output + !broadcast — Send message to all running units !units — List all units !help — Show this message """ @@ -75,7 +80,6 @@ class UmbrellaClient: Synchronous umbrella socket client. We use a plain blocking socket here since commands are short-lived and we don't need async for the request/response pattern. - For attach (streaming), we use a separate async wrapper. """ def __init__(self, sock_path: str = SOCK_PATH): @@ -139,143 +143,27 @@ class UmbrellaClient: def run_action(self, unit: str, action: str) -> dict: return self.command({"cmd": "action", "unit": unit, "action": action}) + def tail(self, unit: str) -> dict: + return self.command({"cmd": "tail", "unit": unit}) -# ── Attach session: streams output from umbrella to a Matrix room ───────────── - -class AttachSession: - """ - Maintains a persistent connection to umbrella for a specific unit, - forwarding live output to a Matrix room. - """ + def broadcast(self, message: str) -> dict: + return self.command({"cmd": "broadcast", "message": message}) - def __init__(self, unit: str, room_id: str, - matrix_client: "AsyncClient", - sock_path: str = SOCK_PATH): - self.unit = unit - self.room_id = room_id - self.matrix_client = matrix_client - self.sock_path = sock_path - self._task: Optional[asyncio.Task] = None - self._sock: Optional[socket.socket] = None - - async def start(self): - if self._task and not self._task.done(): - return # already running - self._task = asyncio.create_task(self._stream()) - - async def stop(self): - if self._task: - self._task.cancel() - try: - await self._task - except asyncio.CancelledError: - pass - if self._sock: - try: - self._sock.close() - except Exception: - pass - self._sock = None - - async def _stream(self): - loop = asyncio.get_event_loop() - - def connect_and_attach(): - s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - s.settimeout(10) - s.connect(self.sock_path) - # Send attach command - msg = json.dumps({"cmd": "attach", "unit": self.unit}).encode() - s.sendall(struct.pack(">I", len(msg)) + msg) - # Read ok response - hdr = b"" - while len(hdr) < 4: - hdr += s.recv(4 - len(hdr)) - length = struct.unpack(">I", hdr)[0] - resp_data = b"" - while len(resp_data) < length: - resp_data += s.recv(length - len(resp_data)) - s.settimeout(None) # non-blocking from here - s.setblocking(False) - return s - try: - self._sock = await loop.run_in_executor(None, connect_and_attach) - except Exception as e: - log.error(f"attach: failed to connect for {self.unit}: {e}") - return +# ── Audit logging ───────────────────────────────────────────────────────────── - buf = b"" - log.info(f"attach: streaming {self.unit} to room {self.room_id}") - - try: - while True: - # Read available data from socket - try: - chunk = await loop.run_in_executor( - None, lambda: self._recv_one_nonblock() - ) - if chunk is None: - await asyncio.sleep(0.05) - continue - if chunk == {}: - break # disconnected - except asyncio.CancelledError: - raise - except Exception as e: - log.error(f"attach: read error for {self.unit}: {e}") - break - - if chunk.get("type") == "output": - data = chunk.get("data", "") - history = chunk.get("history", False) - if data and not history: - # Send to Matrix room, wrapped in code block - await self.matrix_client.room_send( - self.room_id, - "m.room.message", - { - "msgtype": "m.text", - "body": f"```\n{data.rstrip()}\n```", - "format": "org.matrix.custom.html", - "formatted_body": f"
{data.rstrip()}
", - }, - ignore_unverified_devices=True, - ) - except asyncio.CancelledError: - pass - finally: - if self._sock: - # Send detach - try: - msg = json.dumps({"cmd": "detach"}).encode() - self._sock.setblocking(True) - self._sock.sendall(struct.pack(">I", len(msg)) + msg) - except Exception: - pass - self._sock.close() - self._sock = None - - def _recv_one_nonblock(self) -> Optional[dict]: - """Try to read one message from the non-blocking socket. Returns None if no data yet.""" - try: - hdr = self._sock.recv(4) - if not hdr: - return {} # disconnected - if len(hdr) < 4: - return None - length = struct.unpack(">I", hdr)[0] - data = b"" - self._sock.setblocking(True) - while len(data) < length: - chunk = self._sock.recv(length - len(data)) - if not chunk: - return {} - data += chunk - self._sock.setblocking(False) - return json.loads(data.decode()) - except BlockingIOError: - return None +def audit_log(sender: str, unit: str, command: str) -> None: + """ + Append a line to the bot audit log for !cmd invocations. + Format: [YYYY-MM-DD HH:MM:SS] !cmd + """ + timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + line = f"[{timestamp}] {sender} !cmd {unit} {command}\n" + try: + with open(AUDIT_LOG, "a") as f: + f.write(line) + except OSError as e: + log.warning(f"Could not write audit log: {e}") # ── Power level check ───────────────────────────────────────────────────────── @@ -304,7 +192,6 @@ async def get_user_power_level(client: AsyncClient, async def handle_command( client: AsyncClient, umbrella: UmbrellaClient, - attach_sessions: dict, room: MatrixRoom, sender: str, text: str, @@ -334,7 +221,10 @@ async def handle_command( return "No units loaded." lines = ["**Units:**"] for u in units: - lines.append(f" `{u['name']}` — {u['display']} [{u['state']}]") + pl = u.get("players", -1) + mx = u.get("max_players", -1) + players = f" ({pl}/{mx})" if pl >= 0 and mx >= 0 else "" + lines.append(f" `{u['name']}` — {u['display']} [{u['state']}{players}]") return "\n".join(lines) elif cmd == "!status": @@ -344,10 +234,18 @@ async def handle_command( resp = umbrella.status(unit) if resp.get("type") == "error": return f"❌ {resp['message']}" - return ( - f"**{resp.get('display', unit)}**\n" - f"State: `{resp.get('state', 'unknown')}`" - ) + pl = resp.get("players", -1) + mx = resp.get("max_players", -1) + map_name = resp.get("map", "") + lines = [ + f"**{resp.get('display', unit)}**", + f"State: `{resp.get('state', 'unknown')}`", + ] + if isinstance(pl, int) and pl >= 0 and isinstance(mx, int) and mx >= 0: + lines.append(f"Players: {pl}/{mx}") + if map_name: + lines.append(f"Map: {map_name}") + return "\n".join(lines) elif cmd == "!cmd": if len(parts) < 3: @@ -357,8 +255,43 @@ async def handle_command( resp = umbrella.send_input(unit, command) if resp.get("type") == "error": return f"❌ {resp['message']}" + audit_log(sender, unit, command) return f"✓ Sent: `{command}`" + elif cmd == "!tail": + if len(parts) < 2: + return "Usage: `!tail `" + unit = parts[1] + resp = umbrella.tail(unit) + if resp.get("type") == "error": + return f"❌ {resp.get('message', 'unknown error')}" + data = resp.get("data", "") + if not data or not data.strip(): + return f"No output buffered for `{unit}`." + # Trim to last TAIL_MAX_LINES lines to avoid flooding the room + lines = data.splitlines() + if len(lines) > TAIL_MAX_LINES: + lines = lines[-TAIL_MAX_LINES:] + trimmed = f"(showing last {TAIL_MAX_LINES} lines)\n" + else: + trimmed = "" + body = trimmed + "\n".join(lines) + return f"```\n{body}\n```" + + elif cmd == "!broadcast": + if len(parts) < 2: + return "Usage: `!broadcast `" + message = " ".join(parts[1:]) + resp = umbrella.broadcast(message) + if resp.get("type") == "error": + return f"❌ {resp.get('message', 'unknown error')}" + sent = resp.get("sent", 0) + failed = resp.get("failed", 0) + result = f"✓ Broadcast sent to {sent} unit(s)" + if failed: + result += f", {failed} failed" + return result + elif cmd == "!restart": if len(parts) < 2: return "Usage: `!restart `" @@ -387,29 +320,6 @@ async def handle_command( return f"❌ {resp['message']}" return f"✓ Action `{action}` dispatched for `{unit}`" - elif cmd == "!attach": - if len(parts) < 2: - return "Usage: `!attach `" - unit = parts[1] - key = (room.room_id, unit) - if key in attach_sessions: - return f"Already streaming `{unit}` to this room." - session = AttachSession(unit, room.room_id, client) - attach_sessions[key] = session - await session.start() - return f"📡 Streaming `{unit}` output to this room. Use `!detach {unit}` to stop." - - elif cmd == "!detach": - if len(parts) < 2: - return "Usage: `!detach `" - unit = parts[1] - key = (room.room_id, unit) - if key not in attach_sessions: - return f"`{unit}` is not being streamed to this room." - await attach_sessions[key].stop() - del attach_sessions[key] - return f"⏹ Stopped streaming `{unit}`." - except RuntimeError as e: return f"❌ Umbrella error: {e}" except Exception as e: @@ -441,7 +351,6 @@ async def run_bot(config: dict): ) umbrella = UmbrellaClient(config.get("socket_path", SOCK_PATH)) - attach_sessions = {} room_id = config["room_id"] # Login @@ -478,7 +387,7 @@ async def run_bot(config: dict): log.info(f"Command from {event.sender}: {body}") reply = await handle_command( - client, umbrella, attach_sessions, + client, umbrella, room, event.sender, body, config ) if reply: @@ -525,7 +434,7 @@ async def run_bot(config: dict): await client.keys_upload() if client.should_query_keys: await client.keys_query() - + sync_task = asyncio.create_task(sync_loop()) await stop_event.wait() @@ -536,10 +445,6 @@ async def run_bot(config: dict): except asyncio.CancelledError: pass - # Stop all attach sessions - for session in attach_sessions.values(): - await session.stop() - await client.close() log.info("Bot stopped.") -- cgit v1.2.3