diff options
| author | auric <auric@japegames.com> | 2026-02-21 15:11:51 -0600 |
|---|---|---|
| committer | auric <auric@japegames.com> | 2026-02-21 15:11:51 -0600 |
| commit | 52f92ea70f74008d82d21fef5085fb7380314ea1 (patch) | |
| tree | 357ec1f0ec75779fc945d3b7460e976fe677ae31 /clients/umbrella-bot/umbrella-bot.py | |
| parent | af012ffe7594350021741c62bd1205b65dfec07f (diff) | |
| parent | fc10d8a0818bb87001a64a72552ed28fe60931ee (diff) | |
Merge pull request #2 from ihateamongus/claude/trusting-dirac
State probing overhaul, A2S queries, tail/broadcast, bot audit log
Diffstat (limited to 'clients/umbrella-bot/umbrella-bot.py')
| -rwxr-xr-x | clients/umbrella-bot/umbrella-bot.py | 249 |
1 files changed, 77 insertions, 172 deletions
diff --git a/clients/umbrella-bot/umbrella-bot.py b/clients/umbrella-bot/umbrella-bot.py index fed3bdb..6e26947 100755 --- a/clients/umbrella-bot/umbrella-bot.py +++ b/clients/umbrella-bot/umbrella-bot.py @@ -24,6 +24,7 @@ import struct import sys import logging import argparse +from datetime import datetime from pathlib import Path from typing import Optional @@ -52,18 +53,22 @@ log = logging.getLogger("umbrella-bot") DEFAULT_CONF = "/etc/umbrella/bot.conf" DEFAULT_STORE = "/etc/umbrella/bot-store" SOCK_PATH = "/run/umbrella/umbrella.sock" +AUDIT_LOG = "/var/log/umbrella/bot-audit.log" PROTO_MAX = 65536 +# Maximum lines to show in a !tail response +TAIL_MAX_LINES = 30 + HELP_TEXT = """\ Umbrella bot commands: !status <unit> — Show unit status !cmd <unit> <command> — Send a console command + !tail <unit> — Show recent output from a unit !restart <unit> — Restart the unit's systemd service !update <unit> — Run the update action !action <unit> <action> — Run a named action - !attach <unit> — Stream live log output to this room - !detach <unit> — Stop streaming log output + !broadcast <message> — Send message to all running units !units — List all units !help — Show this message """ @@ -75,7 +80,6 @@ class UmbrellaClient: Synchronous umbrella socket client. We use a plain blocking socket here since commands are short-lived and we don't need async for the request/response pattern. - For attach (streaming), we use a separate async wrapper. """ def __init__(self, sock_path: str = SOCK_PATH): @@ -139,143 +143,27 @@ class UmbrellaClient: def run_action(self, unit: str, action: str) -> dict: return self.command({"cmd": "action", "unit": unit, "action": action}) + def tail(self, unit: str) -> dict: + return self.command({"cmd": "tail", "unit": unit}) -# ── Attach session: streams output from umbrella to a Matrix room ───────────── - -class AttachSession: - """ - Maintains a persistent connection to umbrella for a specific unit, - forwarding live output to a Matrix room. - """ + def broadcast(self, message: str) -> dict: + return self.command({"cmd": "broadcast", "message": message}) - def __init__(self, unit: str, room_id: str, - matrix_client: "AsyncClient", - sock_path: str = SOCK_PATH): - self.unit = unit - self.room_id = room_id - self.matrix_client = matrix_client - self.sock_path = sock_path - self._task: Optional[asyncio.Task] = None - self._sock: Optional[socket.socket] = None - - async def start(self): - if self._task and not self._task.done(): - return # already running - self._task = asyncio.create_task(self._stream()) - - async def stop(self): - if self._task: - self._task.cancel() - try: - await self._task - except asyncio.CancelledError: - pass - if self._sock: - try: - self._sock.close() - except Exception: - pass - self._sock = None - - async def _stream(self): - loop = asyncio.get_event_loop() - - def connect_and_attach(): - s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - s.settimeout(10) - s.connect(self.sock_path) - # Send attach command - msg = json.dumps({"cmd": "attach", "unit": self.unit}).encode() - s.sendall(struct.pack(">I", len(msg)) + msg) - # Read ok response - hdr = b"" - while len(hdr) < 4: - hdr += s.recv(4 - len(hdr)) - length = struct.unpack(">I", hdr)[0] - resp_data = b"" - while len(resp_data) < length: - resp_data += s.recv(length - len(resp_data)) - s.settimeout(None) # non-blocking from here - s.setblocking(False) - return s - try: - self._sock = await loop.run_in_executor(None, connect_and_attach) - except Exception as e: - log.error(f"attach: failed to connect for {self.unit}: {e}") - return +# ── Audit logging ───────────────────────────────────────────────────────────── - buf = b"" - log.info(f"attach: streaming {self.unit} to room {self.room_id}") - - try: - while True: - # Read available data from socket - try: - chunk = await loop.run_in_executor( - None, lambda: self._recv_one_nonblock() - ) - if chunk is None: - await asyncio.sleep(0.05) - continue - if chunk == {}: - break # disconnected - except asyncio.CancelledError: - raise - except Exception as e: - log.error(f"attach: read error for {self.unit}: {e}") - break - - if chunk.get("type") == "output": - data = chunk.get("data", "") - history = chunk.get("history", False) - if data and not history: - # Send to Matrix room, wrapped in code block - await self.matrix_client.room_send( - self.room_id, - "m.room.message", - { - "msgtype": "m.text", - "body": f"```\n{data.rstrip()}\n```", - "format": "org.matrix.custom.html", - "formatted_body": f"<pre><code>{data.rstrip()}</code></pre>", - }, - ignore_unverified_devices=True, - ) - except asyncio.CancelledError: - pass - finally: - if self._sock: - # Send detach - try: - msg = json.dumps({"cmd": "detach"}).encode() - self._sock.setblocking(True) - self._sock.sendall(struct.pack(">I", len(msg)) + msg) - except Exception: - pass - self._sock.close() - self._sock = None - - def _recv_one_nonblock(self) -> Optional[dict]: - """Try to read one message from the non-blocking socket. Returns None if no data yet.""" - try: - hdr = self._sock.recv(4) - if not hdr: - return {} # disconnected - if len(hdr) < 4: - return None - length = struct.unpack(">I", hdr)[0] - data = b"" - self._sock.setblocking(True) - while len(data) < length: - chunk = self._sock.recv(length - len(data)) - if not chunk: - return {} - data += chunk - self._sock.setblocking(False) - return json.loads(data.decode()) - except BlockingIOError: - return None +def audit_log(sender: str, unit: str, command: str) -> None: + """ + Append a line to the bot audit log for !cmd invocations. + Format: [YYYY-MM-DD HH:MM:SS] <sender> !cmd <unit> <command> + """ + timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + line = f"[{timestamp}] {sender} !cmd {unit} {command}\n" + try: + with open(AUDIT_LOG, "a") as f: + f.write(line) + except OSError as e: + log.warning(f"Could not write audit log: {e}") # ── Power level check ───────────────────────────────────────────────────────── @@ -304,7 +192,6 @@ async def get_user_power_level(client: AsyncClient, async def handle_command( client: AsyncClient, umbrella: UmbrellaClient, - attach_sessions: dict, room: MatrixRoom, sender: str, text: str, @@ -334,7 +221,10 @@ async def handle_command( return "No units loaded." lines = ["**Units:**"] for u in units: - lines.append(f" `{u['name']}` — {u['display']} [{u['state']}]") + pl = u.get("players", -1) + mx = u.get("max_players", -1) + players = f" ({pl}/{mx})" if pl >= 0 and mx >= 0 else "" + lines.append(f" `{u['name']}` — {u['display']} [{u['state']}{players}]") return "\n".join(lines) elif cmd == "!status": @@ -344,10 +234,18 @@ async def handle_command( resp = umbrella.status(unit) if resp.get("type") == "error": return f"❌ {resp['message']}" - return ( - f"**{resp.get('display', unit)}**\n" - f"State: `{resp.get('state', 'unknown')}`" - ) + pl = resp.get("players", -1) + mx = resp.get("max_players", -1) + map_name = resp.get("map", "") + lines = [ + f"**{resp.get('display', unit)}**", + f"State: `{resp.get('state', 'unknown')}`", + ] + if isinstance(pl, int) and pl >= 0 and isinstance(mx, int) and mx >= 0: + lines.append(f"Players: {pl}/{mx}") + if map_name: + lines.append(f"Map: {map_name}") + return "\n".join(lines) elif cmd == "!cmd": if len(parts) < 3: @@ -357,8 +255,43 @@ async def handle_command( resp = umbrella.send_input(unit, command) if resp.get("type") == "error": return f"❌ {resp['message']}" + audit_log(sender, unit, command) return f"✓ Sent: `{command}`" + elif cmd == "!tail": + if len(parts) < 2: + return "Usage: `!tail <unit>`" + unit = parts[1] + resp = umbrella.tail(unit) + if resp.get("type") == "error": + return f"❌ {resp.get('message', 'unknown error')}" + data = resp.get("data", "") + if not data or not data.strip(): + return f"No output buffered for `{unit}`." + # Trim to last TAIL_MAX_LINES lines to avoid flooding the room + lines = data.splitlines() + if len(lines) > TAIL_MAX_LINES: + lines = lines[-TAIL_MAX_LINES:] + trimmed = f"(showing last {TAIL_MAX_LINES} lines)\n" + else: + trimmed = "" + body = trimmed + "\n".join(lines) + return f"```\n{body}\n```" + + elif cmd == "!broadcast": + if len(parts) < 2: + return "Usage: `!broadcast <message>`" + message = " ".join(parts[1:]) + resp = umbrella.broadcast(message) + if resp.get("type") == "error": + return f"❌ {resp.get('message', 'unknown error')}" + sent = resp.get("sent", 0) + failed = resp.get("failed", 0) + result = f"✓ Broadcast sent to {sent} unit(s)" + if failed: + result += f", {failed} failed" + return result + elif cmd == "!restart": if len(parts) < 2: return "Usage: `!restart <unit>`" @@ -387,29 +320,6 @@ async def handle_command( return f"❌ {resp['message']}" return f"✓ Action `{action}` dispatched for `{unit}`" - elif cmd == "!attach": - if len(parts) < 2: - return "Usage: `!attach <unit>`" - unit = parts[1] - key = (room.room_id, unit) - if key in attach_sessions: - return f"Already streaming `{unit}` to this room." - session = AttachSession(unit, room.room_id, client) - attach_sessions[key] = session - await session.start() - return f"📡 Streaming `{unit}` output to this room. Use `!detach {unit}` to stop." - - elif cmd == "!detach": - if len(parts) < 2: - return "Usage: `!detach <unit>`" - unit = parts[1] - key = (room.room_id, unit) - if key not in attach_sessions: - return f"`{unit}` is not being streamed to this room." - await attach_sessions[key].stop() - del attach_sessions[key] - return f"⏹ Stopped streaming `{unit}`." - except RuntimeError as e: return f"❌ Umbrella error: {e}" except Exception as e: @@ -441,7 +351,6 @@ async def run_bot(config: dict): ) umbrella = UmbrellaClient(config.get("socket_path", SOCK_PATH)) - attach_sessions = {} room_id = config["room_id"] # Login @@ -478,7 +387,7 @@ async def run_bot(config: dict): log.info(f"Command from {event.sender}: {body}") reply = await handle_command( - client, umbrella, attach_sessions, + client, umbrella, room, event.sender, body, config ) if reply: @@ -525,7 +434,7 @@ async def run_bot(config: dict): await client.keys_upload() if client.should_query_keys: await client.keys_query() - + sync_task = asyncio.create_task(sync_loop()) await stop_event.wait() @@ -536,10 +445,6 @@ async def run_bot(config: dict): except asyncio.CancelledError: pass - # Stop all attach sessions - for session in attach_sessions.values(): - await session.stop() - await client.close() log.info("Bot stopped.") |
