#!/usr/bin/env python3
"""
umbrella-bot: Matrix bot client for the umbrella server manager.

Connects to the umbrella Unix socket and bridges commands from a Matrix room
to umbrella units. Requires matrix-nio with encryption support.

Dependencies:
    pip install matrix-nio[e2e] aiofiles markdown

Setup:
    1. Create /etc/umbrella/bot.conf (see bot.conf.example)
    2. Run once with --setup to perform initial login and key verification
    3. Run normally or via systemd
"""

import argparse
import asyncio
import json
import logging
import os
import re
import signal
import socket
import struct
import sys
from datetime import datetime
from pathlib import Path
from typing import Optional

import markdown as md_lib
from nio import (
    AsyncClient,
    AsyncClientConfig,
    LoginResponse,
    MatrixRoom,
    RoomMessageText,
    SyncResponse,
    crypto,
)
from nio.store import SqliteStore

# ── Logging ───────────────────────────────────────────────────────────────────

logging.basicConfig(
    level=logging.INFO,
    format="[%(asctime)s] [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
log = logging.getLogger("umbrella-bot")

# ── Config ────────────────────────────────────────────────────────────────────

DEFAULT_CONF = "/etc/umbrella/bot.conf"
DEFAULT_STORE = "/etc/umbrella/bot-store"
SOCK_PATH = "/run/umbrella/umbrella.sock"
AUDIT_LOG = "/var/log/umbrella/bot-audit.log"

# Largest response payload (in bytes) accepted from the umbrella socket
PROTO_MAX = 65536

# Maximum lines to show in a !tail response
TAIL_MAX_LINES = 30

# The command list lives in a fenced code block: the reply is rendered through
# Markdown into HTML, where bare <placeholder> text would be treated as an
# HTML tag instead of being displayed.
HELP_TEXT = """\
Umbrella bot commands:

```
!status <unit>            - Show unit status
!cmd <unit> <command>     - Send a console command
!tail <unit>              - Show recent output from a unit
!restart <unit>           - Restart the unit's systemd service
!update <unit>            - Run the update action
!action <unit> <action>   - Run a named action
!broadcast <message>      - Send message to all running units
!units                    - List all units
!help                     - Show this message
```
"""

# ── Umbrella socket client ────────────────────────────────────────────────────


class UmbrellaClient:
    """
    Synchronous umbrella socket client.

    We use a plain blocking socket here since commands are short-lived and
    we don't need async for the request/response pattern. Every command opens
    a fresh connection, sends one request, reads one response and closes the
    socket again.

    Wire format (both directions): a 4-byte big-endian unsigned length
    followed by that many bytes of encoded JSON.

    All transport and protocol failures are reported as RuntimeError so that
    callers only need to handle a single exception type.
    """

    # Pre-compiled 4-byte big-endian length prefix.
    _HEADER = struct.Struct(">I")

    def __init__(self, sock_path: str = SOCK_PATH, timeout: float = 10):
        """
        Args:
            sock_path: Filesystem path of the umbrella Unix socket.
            timeout: Timeout in seconds applied to connect, send and receive.
        """
        self.sock_path = sock_path
        self.timeout = timeout

    def _connect(self) -> socket.socket:
        """
        Open a connection to the umbrella socket.

        Returns:
            A connected socket; the caller is responsible for closing it.

        Raises:
            RuntimeError: If the socket cannot be connected for any reason.
        """
        s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            s.settimeout(self.timeout)
            s.connect(self.sock_path)
        except OSError as e:
            # Never leak the descriptor when the connection attempt fails.
            s.close()
            if isinstance(e, FileNotFoundError):
                reason = (f"Umbrella socket not found: {self.sock_path} "
                          f"(is umbrelld running?)")
            elif isinstance(e, ConnectionRefusedError):
                reason = "Umbrella daemon is not running"
            else:
                # Timeouts, permission problems, etc.
                reason = (f"Cannot connect to umbrella socket "
                          f"{self.sock_path}: {e}")
            raise RuntimeError(reason) from e
        return s

    def _send(self, s: socket.socket, msg: dict) -> None:
        """Serialize *msg* as JSON and send it with its length prefix."""
        payload = json.dumps(msg).encode()
        s.sendall(self._HEADER.pack(len(payload)) + payload)

    @staticmethod
    def _recv_exact(s: socket.socket, count: int, eof_message: str) -> bytes:
        """
        Read exactly *count* bytes from *s*.

        Raises:
            RuntimeError: With *eof_message* if the peer closes the connection
                before *count* bytes have arrived.
        """
        buf = bytearray()
        while len(buf) < count:
            chunk = s.recv(count - len(buf))
            if not chunk:
                raise RuntimeError(eof_message)
            buf += chunk
        return bytes(buf)

    def _recv(self, s: socket.socket) -> dict:
        """
        Read one length-prefixed JSON message from *s*.

        Raises:
            RuntimeError: If the connection closes early, the announced length
                exceeds PROTO_MAX, or the payload is not a JSON object.
        """
        header = self._recv_exact(
            s, self._HEADER.size, "Connection closed by umbrella daemon"
        )
        (length,) = self._HEADER.unpack(header)
        if length > PROTO_MAX:
            raise RuntimeError(f"Message too large: {length}")
        payload = self._recv_exact(s, length, "Connection closed mid-message")
        try:
            decoded = json.loads(payload.decode())
        except ValueError as e:
            # Both JSONDecodeError and UnicodeDecodeError derive from ValueError.
            raise RuntimeError(
                f"Malformed response from umbrella daemon: {e}"
            ) from e
        if not isinstance(decoded, dict):
            # Callers use dict methods on the response unconditionally.
            raise RuntimeError("Unexpected response from umbrella daemon")
        return decoded

    def command(self, msg: dict) -> dict:
        """
        Send a command and return the response.

        Raises:
            RuntimeError: On any connection, I/O (including timeout) or
                protocol failure.
        """
        try:
            with self._connect() as s:
                self._send(s, msg)
                return self._recv(s)
        except OSError as e:
            # Socket timeouts and resets during send/recv.
            raise RuntimeError(f"Umbrella socket I/O failed: {e}") from e

    def list_units(self) -> list:
        """Return the "units" list of the "list" response, or [] if absent."""
        resp = self.command({"cmd": "list"})
        return resp.get("units", [])

    def status(self, unit: str) -> dict:
        """Request the status of *unit*."""
        return self.command({"cmd": "status", "unit": unit})

    def send_input(self, unit: str, data: str) -> dict:
        """Send *data* followed by a newline as input to *unit*."""
        return self.command({"cmd": "input", "unit": unit, "data": data + "\n"})

    def run_action(self, unit: str, action: str) -> dict:
        """Ask the daemon to run the named *action* for *unit*."""
        return self.command({"cmd": "action", "unit": unit, "action": action})

    def tail(self, unit: str) -> dict:
        """Request the buffered output of *unit*."""
        return self.command({"cmd": "tail", "unit": unit})

    def broadcast(self, message: str) -> dict:
        """Send a "broadcast" command carrying *message*."""
        return self.command({"cmd": "broadcast", "message": message})
# ── Audit logging ─────────────────────────────────────────────────────────────


def _escape_log_field(value: str) -> str:
    """Escape non-printable characters so one field cannot span log lines."""
    return "".join(
        ch if ch.isprintable() else ch.encode("unicode_escape").decode("ascii")
        for ch in value
    )


def audit_log(sender: str, unit: str, command: str) -> None:
    """
    Append a line to the bot audit log for !cmd invocations.

    Format: [YYYY-MM-DD HH:MM:SS] <sender> !cmd <unit> <command>

    Newlines and other non-printable characters in the fields are written in
    escaped form: the values originate from a chat message and must not be
    able to inject additional, forged log lines. Failure to write the log is
    reported as a warning and otherwise ignored.
    """
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    line = (
        f"[{timestamp}] {_escape_log_field(sender)} !cmd "
        f"{_escape_log_field(unit)} {_escape_log_field(command)}\n"
    )
    try:
        with open(AUDIT_LOG, "a", encoding="utf-8") as f:
            f.write(line)
    except OSError as e:
        log.warning(f"Could not write audit log: {e}")


# ── Power level check ─────────────────────────────────────────────────────────


async def get_user_power_level(client: AsyncClient, room_id: str, user_id: str) -> int:
    """
    Fetch the power level for a user in a room.

    Reads the room's m.room.power_levels state event and returns the user's
    explicit level, falling back to the event's "users_default" value.

    Returns 0 if the event cannot be read, has no usable content, or holds a
    value that is not convertible to int, so any failure denies access.
    """
    try:
        resp = await client.room_get_state_event(
            room_id, "m.room.power_levels", ""
        )
        content = getattr(resp, "content", None)
        if isinstance(content, dict):
            users = content.get("users") or {}
            default = content.get("users_default", 0)
            # Coerce so the caller's numeric comparison cannot raise.
            return int(users.get(user_id, default))
    except Exception as e:
        log.warning(f"Could not get power level for {user_id}: {e}")
    return 0


# ── Command dispatch ──────────────────────────────────────────────────────────


def _split_first(text: str) -> tuple:
    """Split *text* into (first word, remainder); missing parts are ""."""
    pieces = text.split(None, 1)
    head = pieces[0] if pieces else ""
    rest = pieces[1].strip() if len(pieces) > 1 else ""
    return head, rest


def _error_reply(resp: dict) -> str:
    """Format an error response, tolerating a missing "message" field."""
    return f"Error: {resp.get('message', 'unknown error')}"


def _player_counts(info: dict) -> Optional[str]:
    """Return "players/max_players" if both are non-negative ints, else None."""
    pl = info.get("players", -1)
    mx = info.get("max_players", -1)
    if isinstance(pl, int) and pl >= 0 and isinstance(mx, int) and mx >= 0:
        return f"{pl}/{mx}"
    return None


async def handle_command(
    client: AsyncClient,
    umbrella: UmbrellaClient,
    room: MatrixRoom,
    sender: str,
    text: str,
    config: dict,
) -> Optional[str]:
    """
    Parse and dispatch a bot command.

    Args:
        client: Matrix client used to look up the sender's power level.
        umbrella: Client for the umbrella socket.
        room: Room the message was received in.
        sender: Matrix user ID of the message author.
        text: Raw message body, starting with the command word.
        config: Bot configuration (currently not consulted here).

    Returns:
        A Markdown reply string, or None for empty input and for command
        words this bot does not know.

    Every command requires the sender to have power level >= 50 in the room.
    Failures are reported in the reply text rather than raised.

    NOTE: calls on *umbrella* are blocking socket operations.
    """
    cmd_word, rest = _split_first(text.strip())
    if not cmd_word:
        return None

    cmd = cmd_word.lower()

    # Check power level
    power = await get_user_power_level(client, room.room_id, sender)
    if power < 50:
        return f"Permission denied. Requires power level >= 50 (you have {power})."

    # Most commands take a unit as first argument. The remainder is kept
    # verbatim so console commands and messages keep their inner whitespace.
    unit, unit_args = _split_first(rest)

    try:
        if cmd == "!help":
            return HELP_TEXT

        elif cmd == "!units":
            units = umbrella.list_units()
            if not units:
                return "No units loaded."
            lines = ["**Units:**"]
            for u in units:
                counts = _player_counts(u)
                players = f" ({counts})" if counts else ""
                name = u.get("name", "?")
                lines.append(
                    f"  `{name}` - {u.get('display', name)} "
                    f"[{u.get('state', 'unknown')}{players}]"
                )
            return "\n".join(lines)

        elif cmd == "!status":
            if not unit:
                return "Usage: `!status <unit>`"
            resp = umbrella.status(unit)
            if resp.get("type") == "error":
                return _error_reply(resp)
            lines = [
                f"**{resp.get('display', unit)}**",
                f"State: `{resp.get('state', 'unknown')}`",
            ]
            counts = _player_counts(resp)
            if counts:
                lines.append(f"Players: {counts}")
            map_name = resp.get("map", "")
            if map_name:
                lines.append(f"Map: {map_name}")
            return "\n".join(lines)

        elif cmd == "!cmd":
            command = unit_args
            if not unit or not command:
                return "Usage: `!cmd <unit> <command>`"
            resp = umbrella.send_input(unit, command)
            if resp.get("type") == "error":
                return _error_reply(resp)
            audit_log(sender, unit, command)
            return f"Sent: `{command}`"

        elif cmd == "!tail":
            if not unit:
                return "Usage: `!tail <unit>`"
            resp = umbrella.tail(unit)
            if resp.get("type") == "error":
                return _error_reply(resp)
            data = resp.get("data", "")
            if not data or not data.strip():
                return f"No output buffered for `{unit}`."
            # Trim to last TAIL_MAX_LINES lines to avoid flooding the room
            lines = data.splitlines()
            if len(lines) > TAIL_MAX_LINES:
                lines = lines[-TAIL_MAX_LINES:]
                trimmed = f"(showing last {TAIL_MAX_LINES} lines)\n"
            else:
                trimmed = ""
            body = trimmed + "\n".join(lines)
            return f"```\n{body}\n```"

        elif cmd == "!broadcast":
            message = rest
            if not message:
                return "Usage: `!broadcast <message>`"
            resp = umbrella.broadcast(message)
            if resp.get("type") == "error":
                return _error_reply(resp)
            sent = resp.get("sent", 0)
            failed = resp.get("failed", 0)
            result = f"Broadcast sent to {sent} unit(s)"
            if failed:
                result += f", {failed} failed"
            return result

        elif cmd == "!restart":
            if not unit:
                return "Usage: `!restart <unit>`"
            resp = umbrella.run_action(unit, "restart")
            if resp.get("type") == "error":
                return _error_reply(resp)
            return f"Restart dispatched for `{unit}`"

        elif cmd == "!update":
            if not unit:
                return "Usage: `!update <unit>`"
            resp = umbrella.run_action(unit, "update")
            if resp.get("type") == "error":
                return _error_reply(resp)
            return f"Update dispatched for `{unit}`"

        elif cmd == "!action":
            # Only the first word after the unit names the action.
            action, _ = _split_first(unit_args)
            if not unit or not action:
                return "Usage: `!action <unit> <action>`"
            resp = umbrella.run_action(unit, action)
            if resp.get("type") == "error":
                return _error_reply(resp)
            return f"Action `{action}` dispatched for `{unit}`"

    except RuntimeError as e:
        return f"Umbrella error: {e}"
    except Exception as e:
        log.exception(f"Command error: {e}")
        return f"Internal error: {e}"

    return None


# ── Bot core ──────────────────────────────────────────────────────────────────


async def run_bot(config: dict):
    """
    Log in, then relay commands from the configured room until SIGTERM/SIGINT.

    Args:
        config: Parsed bot configuration. Uses "homeserver", "user_id",
            "password" and "room_id", plus the optional "store_path",
            "device_id" and "socket_path".

    The Matrix client is closed on every exit path, including a failed login.
    """
    store_path = config.get("store_path", DEFAULT_STORE)
    os.makedirs(store_path, exist_ok=True)

    client_config = AsyncClientConfig(
        max_limit_exceeded=0,
        max_timeouts=0,
        store_sync_tokens=True,
        encryption_enabled=True,
    )

    client = AsyncClient(
        homeserver=config["homeserver"],
        user=config["user_id"],
        device_id=config.get("device_id", "UMBRELLA_BOT"),
        store_path=store_path,
        config=client_config,
    )

    umbrella = UmbrellaClient(config.get("socket_path", SOCK_PATH))
    room_id = config["room_id"]

    # Seconds to wait before retrying after a failed sync, so a persistent
    # failure does not turn into a busy loop.
    retry_delay = 5

    try:
        # Login
        log.info(f"Logging in as {config['user_id']}...")
        resp = await client.login(
            password=config["password"],
            device_name="umbrella-bot",
        )
        if not isinstance(resp, LoginResponse):
            log.error(f"Login failed: {resp}")
            return

        log.info(f"Logged in. Device ID: {client.device_id}")

        # Load encryption keys
        if client.should_upload_keys:
            await client.keys_upload()
        if client.should_query_keys:
            await client.keys_query()

        # Message callback
        async def on_message(room: MatrixRoom, event: RoomMessageText):
            # Only handle messages in our configured room
            if room.room_id != room_id:
                return

            # Ignore our own messages
            if event.sender == config["user_id"]:
                return

            # Only handle messages that start with !
            body = event.body.strip()
            if not body.startswith("!"):
                return

            log.info(f"Command from {event.sender}: {body}")

            reply = await handle_command(
                client, umbrella, room, event.sender, body, config
            )
            if not reply:
                return

            try:
                await client.room_send(
                    room_id,
                    "m.room.message",
                    {
                        "msgtype": "m.text",
                        "body": reply,
                        "format": "org.matrix.custom.html",
                        "formatted_body": md_lib.markdown(
                            reply, extensions=["fenced_code", "nl2br"]
                        ),
                    },
                    ignore_unverified_devices=True,
                )
            except asyncio.CancelledError:
                raise
            except Exception:
                # A failed reply must not abort processing of the sync.
                log.exception("Could not send reply to %s", room_id)

        client.add_event_callback(on_message, RoomMessageText)

        # Graceful shutdown
        loop = asyncio.get_running_loop()
        stop_event = asyncio.Event()

        def _signal_handler():
            log.info("Shutting down...")
            stop_event.set()

        loop.add_signal_handler(signal.SIGTERM, _signal_handler)
        loop.add_signal_handler(signal.SIGINT, _signal_handler)

        log.info(f"Syncing, watching room {room_id}")

        # Initial sync
        await client.sync(timeout=10000)
        if client.should_query_keys:
            await client.keys_query()
        await client.joined_members(room_id)

        # Main sync loop
        async def sync_loop():
            while not stop_event.is_set():
                try:
                    resp = await client.sync(timeout=30000, full_state=False)
                    if isinstance(resp, SyncResponse):
                        if client.should_upload_keys:
                            await client.keys_upload()
                        if client.should_query_keys:
                            await client.keys_query()
                        continue
                    log.warning("Sync failed: %s", resp)
                except asyncio.CancelledError:
                    raise
                except Exception:
                    # Keep the task alive: if it died, the bot would sit idle
                    # waiting for a shutdown signal.
                    log.exception("Unexpected error during sync")
                await asyncio.sleep(retry_delay)

        sync_task = asyncio.create_task(sync_loop())

        await stop_event.wait()
        sync_task.cancel()
        try:
            await sync_task
        except asyncio.CancelledError:
            pass
    finally:
        await client.close()

    log.info("Bot stopped.")


# ── Setup mode ────────────────────────────────────────────────────────────────


async def run_setup(config: dict):
    """
    First-run setup: login, generate device keys, print device ID.
    Run this once before starting the bot normally.

    The Matrix client is closed on every exit path, including a failed login.
    """
    store_path = config.get("store_path", DEFAULT_STORE)
    os.makedirs(store_path, exist_ok=True)

    client = AsyncClient(
        homeserver=config["homeserver"],
        user=config["user_id"],
        device_id=config.get("device_id", "UMBRELLA_BOT"),
        store_path=store_path,
        config=AsyncClientConfig(encryption_enabled=True),
    )

    try:
        print(f"Logging in as {config['user_id']}...")
        resp = await client.login(
            password=config["password"],
            device_name="umbrella-bot",
        )
        if not isinstance(resp, LoginResponse):
            print(f"Login failed: {resp}")
            return

        print("Logged in successfully.")
        print(f"Device ID: {client.device_id}")
        print(f"Add this to your bot.conf: device_id = {client.device_id}")

        if client.should_upload_keys:
            await client.keys_upload()
            print("Encryption keys uploaded.")

        print("\nSetup complete. You can now run the bot normally.")
        print("Note: you may need to verify this device from another session")
        print("if your room requires verified devices.")
    finally:
        await client.close()


# ── Config loading ────────────────────────────────────────────────────────────


def load_config(path: str) -> dict:
    """
    Read a "key = value" configuration file.

    Blank lines, lines starting with "#", lines without "=" and lines with an
    empty key are ignored. Only the first "=" separates key and value, so
    values may themselves contain "=". Keys and values are stripped of
    surrounding whitespace.

    Raises:
        OSError: If the file cannot be opened (e.g. FileNotFoundError).
        ValueError: If a required key is missing or has an empty value.
    """
    config = {}
    with open(path, encoding="utf-8") as f:
        for raw in f:
            line = raw.strip()
            if not line or line.startswith("#"):
                continue
            key, sep, value = line.partition("=")
            key = key.strip()
            if not sep or not key:
                continue
            config[key] = value.strip()

    required = ["homeserver", "user_id", "password", "room_id"]
    # An empty value is as unusable as a missing key; report both up front.
    missing = [key for key in required if not config.get(key)]
    if missing:
        raise ValueError(
            f"Missing required config key: {', '.join(missing)}"
        )

    return config


# ── Entry point ───────────────────────────────────────────────────────────────


def main():
    """Parse arguments, load the configuration and run the bot or setup mode."""
    parser = argparse.ArgumentParser(description="Umbrella Matrix bot")
    parser.add_argument("-c", "--config", default=DEFAULT_CONF,
                        help=f"Config file (default: {DEFAULT_CONF})")
    parser.add_argument("--setup", action="store_true",
                        help="Run first-time setup and exit")
    args = parser.parse_args()

    try:
        config = load_config(args.config)
    except FileNotFoundError:
        print(f"Config file not found: {args.config}", file=sys.stderr)
        print("Create it based on bot.conf.example", file=sys.stderr)
        sys.exit(1)
    except OSError as e:
        # E.g. the file exists but is not readable by this user.
        print(f"Could not read config file {args.config}: {e}", file=sys.stderr)
        sys.exit(1)
    except ValueError as e:
        print(f"Config error: {e}", file=sys.stderr)
        sys.exit(1)

    if args.setup:
        asyncio.run(run_setup(config))
    else:
        asyncio.run(run_bot(config))


if __name__ == "__main__":
    main()