#!/usr/bin/env python3
"""
umbrella-bot: Matrix bot client for the umbrella server manager.

Connects to the umbrella Unix socket and bridges commands from a Matrix
room to umbrella units. Requires matrix-nio with encryption support.

Dependencies:
    pip install matrix-nio[e2e] aiofiles

Setup:
    1. Create /etc/umbrella/bot.conf (see bot.conf.example)
    2. Run once with --setup to perform initial login and key verification
    3. Run normally or via systemd
"""

import argparse
import asyncio
import html
import json
import logging
import os
import re
import signal
import socket
import struct
import sys
from pathlib import Path
from typing import Optional

from nio import (
    AsyncClient,
    AsyncClientConfig,
    LoginResponse,
    MatrixRoom,
    RoomMessageText,
    SyncResponse,
    crypto,
)
from nio.store import SqliteStore

# ── Logging ───────────────────────────────────────────────────────────────────

logging.basicConfig(
    level=logging.INFO,
    format="[%(asctime)s] [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
log = logging.getLogger("umbrella-bot")

# ── Config ────────────────────────────────────────────────────────────────────

DEFAULT_CONF = "/etc/umbrella/bot.conf"
DEFAULT_STORE = "/etc/umbrella/bot-store"
SOCK_PATH = "/run/umbrella/umbrella.sock"
PROTO_MAX = 65536

# Seconds to wait for the umbrella daemon to accept a connection or answer.
SOCKET_TIMEOUT = 10
# Seconds to wait before retrying after a failed Matrix sync.
SYNC_RETRY_DELAY = 5
# Minimum Matrix room power level required to use any bot command.
MIN_POWER_LEVEL = 50

HELP_TEXT = """\
Umbrella bot commands:
  !status <unit>            — Show unit status
  !cmd <unit> <command>     — Send a console command
  !restart <unit>           — Restart the unit's systemd service
  !update <unit>            — Run the update action
  !action <unit> <action>   — Run a named action
  !attach <unit>            — Stream live log output to this room
  !detach <unit>            — Stop streaming log output
  !units                    — List all units
  !help                     — Show this message
"""

# Commands the bot reacts to; anything else starting with "!" is ignored.
KNOWN_COMMANDS = frozenset({
    "!help", "!units", "!status", "!cmd", "!restart",
    "!update", "!action", "!attach", "!detach",
})


# ── Wire protocol helpers ─────────────────────────────────────────────────────

def _frame(msg: dict) -> bytes:
    """Encode *msg* as JSON prefixed with a 4-byte big-endian length."""
    payload = json.dumps(msg).encode()
    return struct.pack(">I", len(payload)) + payload


async def _read_frame(reader: asyncio.StreamReader) -> Optional[dict]:
    """
    Read one length-prefixed JSON message from *reader*.

    Returns the decoded message, or None if the peer closed the connection
    before a complete message arrived.

    Raises RuntimeError if the announced length exceeds PROTO_MAX.
    """
    try:
        header = await reader.readexactly(4)
        (length,) = struct.unpack(">I", header)
        if length > PROTO_MAX:
            raise RuntimeError(f"Message too large: {length}")
        payload = await reader.readexactly(length)
    except asyncio.IncompleteReadError:
        return None
    return json.loads(payload.decode())


async def _run_blocking(func, *args):
    """Run a blocking callable in the default executor, off the event loop."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, func, *args)


# ── Umbrella socket client ────────────────────────────────────────────────────

class UmbrellaClient:
    """
    Synchronous umbrella socket client.

    We use a plain blocking socket here since commands are short-lived and
    we don't need async for the request/response pattern. Callers running
    inside the event loop should dispatch these calls to a worker thread.
    For attach (streaming), see AttachSession.
    """

    def __init__(self, sock_path: str = SOCK_PATH):
        self.sock_path = sock_path

    def _connect(self) -> socket.socket:
        """
        Open a connection to the umbrella daemon.

        Raises RuntimeError if the socket file is missing or nothing is
        listening on it; other OSErrors propagate unchanged.
        """
        s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            s.settimeout(SOCKET_TIMEOUT)
            s.connect(self.sock_path)
        except OSError as err:
            # Do not leak the descriptor when the connection attempt fails.
            s.close()
            if isinstance(err, FileNotFoundError):
                raise RuntimeError(
                    f"Umbrella socket not found: {self.sock_path} "
                    f"(is umbrelld running?)"
                ) from err
            if isinstance(err, ConnectionRefusedError):
                raise RuntimeError("Umbrella daemon is not running") from err
            raise
        return s

    def _send(self, s: socket.socket, msg: dict) -> None:
        """Send one length-prefixed JSON message."""
        s.sendall(_frame(msg))

    @staticmethod
    def _recv_exact(s: socket.socket, count: int, eof_message: str) -> bytes:
        """Read exactly *count* bytes, raising RuntimeError(eof_message) on EOF."""
        chunks = []
        remaining = count
        while remaining:
            chunk = s.recv(remaining)
            if not chunk:
                raise RuntimeError(eof_message)
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)

    def _recv(self, s: socket.socket) -> dict:
        """
        Receive one length-prefixed JSON message.

        Raises RuntimeError if the connection closes early or the announced
        length exceeds PROTO_MAX.
        """
        header = self._recv_exact(s, 4, "Connection closed by umbrella daemon")
        (length,) = struct.unpack(">I", header)
        if length > PROTO_MAX:
            raise RuntimeError(f"Message too large: {length}")
        data = self._recv_exact(s, length, "Connection closed mid-message")
        return json.loads(data.decode())

    def command(self, msg: dict) -> dict:
        """Send a command and return the response."""
        with self._connect() as s:
            self._send(s, msg)
            return self._recv(s)

    def list_units(self) -> list:
        """Return the daemon's unit list (empty if the reply has none)."""
        resp = self.command({"cmd": "list"})
        return resp.get("units", [])

    def status(self, unit: str) -> dict:
        """Return the daemon's status reply for *unit*."""
        return self.command({"cmd": "status", "unit": unit})

    def send_input(self, unit: str, data: str) -> dict:
        """Send *data* plus a trailing newline to the unit's console."""
        return self.command({"cmd": "input", "unit": unit, "data": data + "\n"})

    def run_action(self, unit: str, action: str) -> dict:
        """Ask the daemon to run the named *action* on *unit*."""
        return self.command({"cmd": "action", "unit": unit, "action": action})


# ── Attach session: streams output from umbrella to a Matrix room ─────────────

class AttachSession:
    """
    Maintains a persistent connection to umbrella for a specific unit,
    forwarding live output to a Matrix room.

    The connection uses asyncio streams so that reads never block the event
    loop and partial frames are reassembled correctly.
    """

    def __init__(self, unit: str, room_id: str, matrix_client: "AsyncClient",
                 sock_path: str = SOCK_PATH):
        self.unit = unit
        self.room_id = room_id
        self.matrix_client = matrix_client
        self.sock_path = sock_path
        self._task: Optional[asyncio.Task] = None

    def is_running(self) -> bool:
        """Return True while the streaming task is alive."""
        return self._task is not None and not self._task.done()

    async def start(self):
        """
        Connect, send the attach command and start forwarding output.

        Does nothing if the session is already running. Raises RuntimeError
        if the daemon cannot be reached or rejects the attach request.
        """
        if self.is_running():
            return  # already running
        reader, writer = await self._open()
        self._task = asyncio.create_task(self._stream(reader, writer))

    async def stop(self):
        """Cancel the streaming task and wait for its cleanup to finish."""
        task, self._task = self._task, None
        if task is None:
            return
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
        except Exception as e:
            log.error(f"attach: {self.unit} stream ended with error: {e}")

    async def _open(self):
        """Open the socket and complete the attach handshake."""
        try:
            reader, writer = await asyncio.wait_for(
                asyncio.open_unix_connection(self.sock_path),
                timeout=SOCKET_TIMEOUT,
            )
        except FileNotFoundError as err:
            raise RuntimeError(
                f"Umbrella socket not found: {self.sock_path} "
                f"(is umbrelld running?)"
            ) from err
        except ConnectionRefusedError as err:
            raise RuntimeError("Umbrella daemon is not running") from err
        except asyncio.TimeoutError as err:
            raise RuntimeError("Timed out connecting to umbrella daemon") from err
        except OSError as err:
            raise RuntimeError(f"Cannot connect to umbrella daemon: {err}") from err

        try:
            writer.write(_frame({"cmd": "attach", "unit": self.unit}))
            await writer.drain()
            # The daemon acknowledges the attach before streaming output.
            ack = await asyncio.wait_for(_read_frame(reader),
                                         timeout=SOCKET_TIMEOUT)
            if ack is None:
                raise RuntimeError("Connection closed by umbrella daemon")
            if ack.get("type") == "error":
                raise RuntimeError(
                    ack.get("message", f"cannot attach to {self.unit}")
                )
        except asyncio.TimeoutError as err:
            writer.close()
            raise RuntimeError("Timed out waiting for umbrella daemon") from err
        except BaseException:
            # Never leak the connection when the handshake fails.
            writer.close()
            raise
        return reader, writer

    async def _stream(self, reader: asyncio.StreamReader,
                      writer: asyncio.StreamWriter):
        """Forward live output frames to the room until EOF, error or cancel."""
        log.info(f"attach: streaming {self.unit} to room {self.room_id}")
        try:
            while True:
                try:
                    message = await _read_frame(reader)
                except asyncio.CancelledError:
                    raise
                except Exception as e:
                    log.error(f"attach: read error for {self.unit}: {e}")
                    break
                if message is None:
                    log.info(f"attach: {self.unit} stream closed by daemon")
                    break  # disconnected
                if not isinstance(message, dict):
                    continue
                if message.get("type") != "output":
                    continue
                data = message.get("data", "")
                # Frames flagged as history are not forwarded to the room.
                if not data or message.get("history", False):
                    continue
                await self._forward(data.rstrip())
        finally:
            await self._close(writer)

    async def _forward(self, text: str):
        """Send *text* to the Matrix room, wrapped in a code block."""
        try:
            await self.matrix_client.room_send(
                self.room_id,
                "m.room.message",
                {
                    "msgtype": "m.text",
                    "body": f"```\n{text}\n```",
                    "format": "org.matrix.custom.html",
                    # Escape the text so it cannot inject HTML into the
                    # formatted message.
                    "formatted_body":
                        f"<pre><code>{html.escape(text, quote=False)}</code></pre>",
                },
                ignore_unverified_devices=True,
            )
        except asyncio.CancelledError:
            raise
        except Exception as e:
            # One failed send should not end the whole stream.
            log.error(
                f"attach: failed to send {self.unit} output "
                f"to {self.room_id}: {e}"
            )

    async def _close(self, writer: asyncio.StreamWriter):
        """Best-effort detach and close of the daemon connection."""
        try:
            writer.write(_frame({"cmd": "detach"}))
            await writer.drain()
        except Exception as e:
            log.debug(f"attach: detach for {self.unit} not delivered: {e}")
        writer.close()
        try:
            await writer.wait_closed()
        except Exception as e:
            log.debug(f"attach: error closing socket for {self.unit}: {e}")


# ── Power level check ─────────────────────────────────────────────────────────

async def get_user_power_level(client: AsyncClient, room_id: str,
                               user_id: str) -> int:
    """
    Fetch the power level for a user in a room.

    Returns 0 if the user is not found, the stored level is not an integer,
    or on error.
    """
    try:
        resp = await client.room_get_state_event(
            room_id, "m.room.power_levels", ""
        )
        content = getattr(resp, "content", None)
        if isinstance(content, dict):
            users = content.get("users", {})
            default = content.get("users_default", 0)
            # Coerce so the caller can always compare against an int.
            return int(users.get(user_id, default))
    except Exception as e:
        log.warning(f"Could not get power level for {user_id}: {e}")
    return 0


# ── Command dispatch ──────────────────────────────────────────────────────────

def _error_text(resp: dict) -> Optional[str]:
    """Return a user-facing error line if *resp* is an error reply, else None."""
    if resp.get("type") == "error":
        return f"❌ {resp.get('message', 'unknown error')}"
    return None


async def handle_command(
    client: AsyncClient,
    umbrella: UmbrellaClient,
    attach_sessions: dict,
    room: MatrixRoom,
    sender: str,
    text: str,
    config: dict,
) -> Optional[str]:
    """
    Parse and dispatch a bot command.

    *attach_sessions* maps (room_id, unit) to AttachSession and is updated
    in place by !attach / !detach. *config* is accepted for interface
    compatibility and is not read here.

    Returns a reply string, or None when the text is not a known command.
    """
    stripped = text.strip()
    parts = stripped.split(None, 3)
    if not parts:
        return None

    cmd = parts[0].lower()
    if cmd not in KNOWN_COMMANDS:
        # Not ours: stay silent and skip the power-level lookup.
        return None

    # Check power level
    power = await get_user_power_level(client, room.room_id, sender)
    if power < MIN_POWER_LEVEL:
        return (
            f"❌ Permission denied. Requires power level ≥ {MIN_POWER_LEVEL} "
            f"(you have {power})."
        )

    try:
        if cmd == "!help":
            return HELP_TEXT

        if cmd == "!units":
            units = await _run_blocking(umbrella.list_units)
            if not units:
                return "No units loaded."
            lines = ["**Units:**"]
            for u in units:
                name = u.get("name", "?")
                display = u.get("display", name)
                state = u.get("state", "unknown")
                lines.append(f"  `{name}` — {display} [{state}]")
            return "\n".join(lines)

        if cmd == "!status":
            if len(parts) < 2:
                return "Usage: `!status <unit>`"
            unit = parts[1]
            resp = await _run_blocking(umbrella.status, unit)
            return _error_text(resp) or (
                f"**{resp.get('display', unit)}**\n"
                f"State: `{resp.get('state', 'unknown')}`"
            )

        if cmd == "!cmd":
            if len(parts) < 3:
                return "Usage: `!cmd <unit> <command>`"
            unit = parts[1]
            # Re-split so the console command keeps its inner whitespace.
            command = stripped.split(None, 2)[2]
            resp = await _run_blocking(umbrella.send_input, unit, command)
            return _error_text(resp) or f"✓ Sent: `{command}`"

        if cmd == "!restart":
            if len(parts) < 2:
                return "Usage: `!restart <unit>`"
            unit = parts[1]
            resp = await _run_blocking(umbrella.run_action, unit, "restart")
            return _error_text(resp) or f"✓ Restart dispatched for `{unit}`"

        if cmd == "!update":
            if len(parts) < 2:
                return "Usage: `!update <unit>`"
            unit = parts[1]
            resp = await _run_blocking(umbrella.run_action, unit, "update")
            return _error_text(resp) or f"✓ Update dispatched for `{unit}`"

        if cmd == "!action":
            if len(parts) < 3:
                return "Usage: `!action <unit> <action>`"
            unit = parts[1]
            action = parts[2]
            resp = await _run_blocking(umbrella.run_action, unit, action)
            return _error_text(resp) or (
                f"✓ Action `{action}` dispatched for `{unit}`"
            )

        if cmd == "!attach":
            if len(parts) < 2:
                return "Usage: `!attach <unit>`"
            unit = parts[1]
            key = (room.room_id, unit)
            existing = attach_sessions.get(key)
            # A session whose stream already ended must not block re-attach.
            if existing is not None and existing.is_running():
                return f"Already streaming `{unit}` to this room."
            session = AttachSession(unit, room.room_id, client,
                                    umbrella.sock_path)
            await session.start()
            attach_sessions[key] = session
            return (
                f"📡 Streaming `{unit}` output to this room. "
                f"Use `!detach {unit}` to stop."
            )

        if cmd == "!detach":
            if len(parts) < 2:
                return "Usage: `!detach <unit>`"
            unit = parts[1]
            session = attach_sessions.pop((room.room_id, unit), None)
            if session is None:
                return f"`{unit}` is not being streamed to this room."
            await session.stop()
            return f"⏹ Stopped streaming `{unit}`."

    except RuntimeError as e:
        return f"❌ Umbrella error: {e}"
    except Exception as e:
        log.exception(f"Command error: {e}")
        return f"❌ Internal error: {e}"

    return None


# ── Bot core ──────────────────────────────────────────────────────────────────

async def run_bot(config: dict):
    """
    Log in, watch the configured room and serve commands until SIGTERM/SIGINT.

    *config* must contain homeserver, user_id, password and room_id; it may
    also contain store_path, device_id and socket_path.
    """
    store_path = config.get("store_path", DEFAULT_STORE)
    # The store holds encryption keys; keep a newly created directory private.
    os.makedirs(store_path, mode=0o700, exist_ok=True)

    client_config = AsyncClientConfig(
        max_limit_exceeded=0,
        max_timeouts=0,
        store_sync_tokens=True,
        encryption_enabled=True,
    )
    client = AsyncClient(
        homeserver=config["homeserver"],
        user=config["user_id"],
        device_id=config.get("device_id", "UMBRELLA_BOT"),
        store_path=store_path,
        config=client_config,
    )
    umbrella = UmbrellaClient(config.get("socket_path", SOCK_PATH))
    attach_sessions: dict = {}
    room_id = config["room_id"]

    # Message callback
    async def on_message(room: MatrixRoom, event: RoomMessageText):
        # Only handle messages in our configured room
        if room.room_id != room_id:
            return
        # Ignore our own messages
        if event.sender == config["user_id"]:
            return
        # Only handle messages that start with !
        body = event.body.strip()
        if not body.startswith("!"):
            return

        log.info(f"Command from {event.sender}: {body}")
        reply = await handle_command(
            client, umbrella, attach_sessions, room, event.sender, body, config
        )
        if reply:
            await client.room_send(
                room_id,
                "m.room.message",
                {
                    "msgtype": "m.text",
                    "body": reply,
                    "format": "org.matrix.custom.html",
                    # Escape first so placeholders such as <unit> survive
                    # HTML rendering, then turn newlines into line breaks.
                    "formatted_body":
                        html.escape(reply, quote=False).replace("\n", "<br>"),
                },
                ignore_unverified_devices=True,
            )

    # Main sync loop
    stop_event = asyncio.Event()

    async def sync_loop():
        while not stop_event.is_set():
            try:
                resp = await client.sync(timeout=30000, full_state=False)
                if isinstance(resp, SyncResponse):
                    if client.should_upload_keys:
                        await client.keys_upload()
                    if client.should_query_keys:
                        await client.keys_query()
                    continue
                log.warning(f"Sync returned an error: {resp}")
            except asyncio.CancelledError:
                raise
            except Exception as e:
                # Keep the bot alive through network errors and failures
                # raised by the message callback.
                log.exception(f"Sync failed: {e}")
            await asyncio.sleep(SYNC_RETRY_DELAY)

    try:
        # Login
        log.info(f"Logging in as {config['user_id']}...")
        resp = await client.login(
            password=config["password"],
            device_name="umbrella-bot",
        )
        if not isinstance(resp, LoginResponse):
            log.error(f"Login failed: {resp}")
            return
        log.info(f"Logged in. Device ID: {client.device_id}")

        # Load encryption keys
        if client.should_upload_keys:
            await client.keys_upload()
        if client.should_query_keys:
            await client.keys_query()

        # Graceful shutdown
        loop = asyncio.get_running_loop()

        def _signal_handler():
            log.info("Shutting down...")
            stop_event.set()

        loop.add_signal_handler(signal.SIGTERM, _signal_handler)
        loop.add_signal_handler(signal.SIGINT, _signal_handler)

        log.info(f"Syncing, watching room {room_id}")

        # Initial sync
        await client.sync(timeout=10000)
        if client.should_query_keys:
            await client.keys_query()
        await client.joined_members(room_id)

        # Register the callback only now, so commands already in the room's
        # backlog are not executed again at startup.
        client.add_event_callback(on_message, RoomMessageText)

        sync_task = asyncio.create_task(sync_loop())
        await stop_event.wait()

        sync_task.cancel()
        try:
            await sync_task
        except asyncio.CancelledError:
            pass

        log.info("Bot stopped.")
    finally:
        # Stop all attach sessions, then release the HTTP session.
        for session in list(attach_sessions.values()):
            await session.stop()
        attach_sessions.clear()
        await client.close()


# ── Setup mode ────────────────────────────────────────────────────────────────

async def run_setup(config: dict):
    """
    First-run setup: login, generate device keys, print device ID.
    Run this once before starting the bot normally.
    """
    store_path = config.get("store_path", DEFAULT_STORE)
    # The store holds encryption keys; keep a newly created directory private.
    os.makedirs(store_path, mode=0o700, exist_ok=True)

    client = AsyncClient(
        homeserver=config["homeserver"],
        user=config["user_id"],
        device_id=config.get("device_id", "UMBRELLA_BOT"),
        store_path=store_path,
        config=AsyncClientConfig(encryption_enabled=True),
    )

    try:
        print(f"Logging in as {config['user_id']}...")
        resp = await client.login(
            password=config["password"],
            device_name="umbrella-bot",
        )
        if not isinstance(resp, LoginResponse):
            print(f"Login failed: {resp}")
            return

        print("Logged in successfully.")
        print(f"Device ID: {client.device_id}")
        print(f"Add this to your bot.conf: device_id = {client.device_id}")

        if client.should_upload_keys:
            await client.keys_upload()
            print("Encryption keys uploaded.")

        print("\nSetup complete. You can now run the bot normally.")
        print("Note: you may need to verify this device from another session")
        print("if your room requires verified devices.")
    finally:
        # Close the HTTP session on every path, including failed login.
        await client.close()


# ── Config loading ────────────────────────────────────────────────────────────

def load_config(path: str) -> dict:
    """
    Parse a `key = value` config file.

    Blank lines, lines starting with `#` and lines without `=` are skipped.

    Raises ValueError if a required key is missing or empty; errors from
    opening the file propagate.
    """
    config = {}
    with open(path, encoding="utf-8") as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line or line.startswith("#"):
                continue
            key, sep, value = line.partition("=")
            if not sep:
                continue
            config[key.strip()] = value.strip()

    required = ["homeserver", "user_id", "password", "room_id"]
    missing = [key for key in required if not config.get(key)]
    if missing:
        raise ValueError(f"Missing required config key: {', '.join(missing)}")

    return config


# ── Entry point ───────────────────────────────────────────────────────────────

def main():
    """Parse arguments, load the config and run setup or the bot."""
    parser = argparse.ArgumentParser(description="Umbrella Matrix bot")
    parser.add_argument("-c", "--config", default=DEFAULT_CONF,
                        help=f"Config file (default: {DEFAULT_CONF})")
    parser.add_argument("--setup", action="store_true",
                        help="Run first-time setup and exit")
    args = parser.parse_args()

    try:
        config = load_config(args.config)
    except FileNotFoundError:
        print(f"Config file not found: {args.config}")
        print("Create it based on bot.conf.example")
        sys.exit(1)
    except OSError as e:
        # e.g. the file exists but is not readable by this user
        print(f"Cannot read config file {args.config}: {e}")
        sys.exit(1)
    except ValueError as e:
        print(f"Config error: {e}")
        sys.exit(1)

    if args.setup:
        asyncio.run(run_setup(config))
    else:
        asyncio.run(run_bot(config))


if __name__ == "__main__":
    main()