diff options
| author | auric <auric@japegames.com> | 2026-02-21 11:08:36 -0600 |
|---|---|---|
| committer | auric <auric@japegames.com> | 2026-02-21 11:08:36 -0600 |
| commit | 0d706ae72ceefd74053ad6cb0900ecce6cf1f085 (patch) | |
| tree | 6faf7d3919182b8838a6ae69ad1a2a0fac468740 /clients/umbrella-bot/umbrella-bot.py | |
Add Umbrella 0.1.5
Diffstat (limited to 'clients/umbrella-bot/umbrella-bot.py')
| -rwxr-xr-x | clients/umbrella-bot/umbrella-bot.py | 631 |
1 files changed, 631 insertions, 0 deletions
diff --git a/clients/umbrella-bot/umbrella-bot.py b/clients/umbrella-bot/umbrella-bot.py new file mode 100755 index 0000000..001b94c --- /dev/null +++ b/clients/umbrella-bot/umbrella-bot.py @@ -0,0 +1,631 @@ +#!/usr/bin/env python3 +""" +umbrella-bot: Matrix bot client for the umbrella server manager. + +Connects to the umbrella Unix socket and bridges commands from a Matrix +room to umbrella units. Requires matrix-nio with encryption support. + +Dependencies: + pip install matrix-nio[e2e] aiofiles + +Setup: + 1. Create /etc/umbrella/bot.conf (see bot.conf.example) + 2. Run once with --setup to perform initial login and key verification + 3. Run normally or via systemd +""" + +import asyncio +import json +import os +import re +import signal +import socket +import struct +import sys +import logging +import argparse +from pathlib import Path +from typing import Optional + +from nio import ( + AsyncClient, + AsyncClientConfig, + LoginResponse, + MatrixRoom, + RoomMessageText, + SyncResponse, + crypto, +) +from nio.store import SqliteStore + +# ── Logging ─────────────────────────────────────────────────────────────────── + +logging.basicConfig( + level=logging.INFO, + format="[%(asctime)s] [%(levelname)s] %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", +) +log = logging.getLogger("umbrella-bot") + +# ── Config ──────────────────────────────────────────────────────────────────── + +DEFAULT_CONF = "/etc/umbrella/bot.conf" +DEFAULT_STORE = "/etc/umbrella/bot-store" +SOCK_PATH = "/run/umbrella/umbrella.sock" +PROTO_MAX = 65536 + +HELP_TEXT = """\ +Umbrella bot commands: + + !status <unit> — Show unit status + !cmd <unit> <command> — Send a console command + !restart <unit> — Restart the unit's systemd service + !update <unit> — Run the update action + !action <unit> <action> — Run a named action + !attach <unit> — Stream live log output to this room + !detach <unit> — Stop streaming log output + !units — List all units + !help — Show this message +""" + +# ── Umbrella socket client ──────────────────────────────────────────────────── + +class UmbrellaClient: + """ + Synchronous umbrella socket client. + We use a plain blocking socket here since commands are short-lived + and we don't need async for the request/response pattern. + For attach (streaming), we use a separate async wrapper. + """ + + def __init__(self, sock_path: str = SOCK_PATH): + self.sock_path = sock_path + + def _connect(self) -> socket.socket: + s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + s.settimeout(10) + try: + s.connect(self.sock_path) + except FileNotFoundError: + raise RuntimeError(f"Umbrella socket not found: {self.sock_path} " + f"(is umbrelld running?)") + except ConnectionRefusedError: + raise RuntimeError("Umbrella daemon is not running") + return s + + def _send(self, s: socket.socket, msg: dict) -> None: + data = json.dumps(msg).encode() + header = struct.pack(">I", len(data)) + s.sendall(header + data) + + def _recv(self, s: socket.socket) -> dict: + header = b"" + while len(header) < 4: + chunk = s.recv(4 - len(header)) + if not chunk: + raise RuntimeError("Connection closed by umbrella daemon") + header += chunk + length = struct.unpack(">I", header)[0] + if length > PROTO_MAX: + raise RuntimeError(f"Message too large: {length}") + data = b"" + while len(data) < length: + chunk = s.recv(length - len(data)) + if not chunk: + raise RuntimeError("Connection closed mid-message") + data += chunk + return json.loads(data.decode()) + + def command(self, msg: dict) -> dict: + """Send a command and return the response.""" + s = self._connect() + try: + self._send(s, msg) + return self._recv(s) + finally: + s.close() + + def list_units(self) -> list: + resp = self.command({"cmd": "list"}) + return resp.get("units", []) + + def status(self, unit: str) -> dict: + return self.command({"cmd": "status", "unit": unit}) + + def send_input(self, unit: str, data: str) -> dict: + return self.command({"cmd": "input", "unit": unit, + "data": data + "\n"}) + + def run_action(self, unit: str, action: str) -> dict: + return self.command({"cmd": "action", "unit": unit, "action": action}) + + +# ── Attach session: streams output from umbrella to a Matrix room ───────────── + +class AttachSession: + """ + Maintains a persistent connection to umbrella for a specific unit, + forwarding live output to a Matrix room. + """ + + def __init__(self, unit: str, room_id: str, + matrix_client: "AsyncClient", + sock_path: str = SOCK_PATH): + self.unit = unit + self.room_id = room_id + self.matrix_client = matrix_client + self.sock_path = sock_path + self._task: Optional[asyncio.Task] = None + self._sock: Optional[socket.socket] = None + + async def start(self): + if self._task and not self._task.done(): + return # already running + self._task = asyncio.create_task(self._stream()) + + async def stop(self): + if self._task: + self._task.cancel() + try: + await self._task + except asyncio.CancelledError: + pass + if self._sock: + try: + self._sock.close() + except Exception: + pass + self._sock = None + + async def _stream(self): + loop = asyncio.get_event_loop() + + def connect_and_attach(): + s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + s.settimeout(10) + s.connect(self.sock_path) + # Send attach command + msg = json.dumps({"cmd": "attach", "unit": self.unit}).encode() + s.sendall(struct.pack(">I", len(msg)) + msg) + # Read ok response + hdr = b"" + while len(hdr) < 4: + hdr += s.recv(4 - len(hdr)) + length = struct.unpack(">I", hdr)[0] + resp_data = b"" + while len(resp_data) < length: + resp_data += s.recv(length - len(resp_data)) + s.settimeout(None) # non-blocking from here + s.setblocking(False) + return s + + try: + self._sock = await loop.run_in_executor(None, connect_and_attach) + except Exception as e: + log.error(f"attach: failed to connect for {self.unit}: {e}") + return + + buf = b"" + log.info(f"attach: streaming {self.unit} to room {self.room_id}") + + try: + while True: + # Read available data from socket + try: + chunk = await loop.run_in_executor( + None, lambda: self._recv_one_nonblock() + ) + if chunk is None: + await asyncio.sleep(0.05) + continue + if chunk == {}: + break # disconnected + except asyncio.CancelledError: + raise + except Exception as e: + log.error(f"attach: read error for {self.unit}: {e}") + break + + if chunk.get("type") == "output": + data = chunk.get("data", "") + history = chunk.get("history", False) + if data and not history: + # Send to Matrix room, wrapped in code block + await self.matrix_client.room_send( + self.room_id, + "m.room.message", + { + "msgtype": "m.text", + "body": f"```\n{data.rstrip()}\n```", + "format": "org.matrix.custom.html", + "formatted_body": f"<pre><code>{data.rstrip()}</code></pre>", + } + ) + except asyncio.CancelledError: + pass + finally: + if self._sock: + # Send detach + try: + msg = json.dumps({"cmd": "detach"}).encode() + self._sock.setblocking(True) + self._sock.sendall(struct.pack(">I", len(msg)) + msg) + except Exception: + pass + self._sock.close() + self._sock = None + + def _recv_one_nonblock(self) -> Optional[dict]: + """Try to read one message from the non-blocking socket. Returns None if no data yet.""" + try: + hdr = self._sock.recv(4) + if not hdr: + return {} # disconnected + if len(hdr) < 4: + return None + length = struct.unpack(">I", hdr)[0] + data = b"" + self._sock.setblocking(True) + while len(data) < length: + chunk = self._sock.recv(length - len(data)) + if not chunk: + return {} + data += chunk + self._sock.setblocking(False) + return json.loads(data.decode()) + except BlockingIOError: + return None + + +# ── Power level check ───────────────────────────────────────────────────────── + +async def get_user_power_level(client: AsyncClient, + room_id: str, user_id: str) -> int: + """ + Fetch the power level for a user in a room. + Returns 0 if the user is not found or on error. + """ + try: + resp = await client.room_get_state_event( + room_id, "m.room.power_levels", "" + ) + if hasattr(resp, "content"): + users = resp.content.get("users", {}) + default = resp.content.get("users_default", 0) + return users.get(user_id, default) + except Exception as e: + log.warning(f"Could not get power level for {user_id}: {e}") + return 0 + + +# ── Command dispatch ────────────────────────────────────────────────────────── + +async def handle_command( + client: AsyncClient, + umbrella: UmbrellaClient, + attach_sessions: dict, + room: MatrixRoom, + sender: str, + text: str, + config: dict, +) -> Optional[str]: + """ + Parse and dispatch a bot command. Returns a reply string or None. + """ + parts = text.strip().split(None, 3) + if not parts: + return None + + cmd = parts[0].lower() + + # Check power level + power = await get_user_power_level(client, room.room_id, sender) + if power < 50: + return f"❌ Permission denied. Requires power level ≥ 50 (you have {power})." + + try: + if cmd == "!help": + return HELP_TEXT + + elif cmd == "!units": + units = umbrella.list_units() + if not units: + return "No units loaded." + lines = ["**Units:**"] + for u in units: + lines.append(f" `{u['name']}` — {u['display']} [{u['state']}]") + return "\n".join(lines) + + elif cmd == "!status": + if len(parts) < 2: + return "Usage: `!status <unit>`" + unit = parts[1] + resp = umbrella.status(unit) + if resp.get("type") == "error": + return f"❌ {resp['message']}" + return ( + f"**{resp.get('display', unit)}**\n" + f"State: `{resp.get('state', 'unknown')}`" + ) + + elif cmd == "!cmd": + if len(parts) < 3: + return "Usage: `!cmd <unit> <command>`" + unit = parts[1] + command = " ".join(parts[2:]) + resp = umbrella.send_input(unit, command) + if resp.get("type") == "error": + return f"❌ {resp['message']}" + return f"✓ Sent: `{command}`" + + elif cmd == "!restart": + if len(parts) < 2: + return "Usage: `!restart <unit>`" + unit = parts[1] + resp = umbrella.run_action(unit, "restart") + if resp.get("type") == "error": + return f"❌ {resp['message']}" + return f"✓ Restart dispatched for `{unit}`" + + elif cmd == "!update": + if len(parts) < 2: + return "Usage: `!update <unit>`" + unit = parts[1] + resp = umbrella.run_action(unit, "update") + if resp.get("type") == "error": + return f"❌ {resp['message']}" + return f"✓ Update dispatched for `{unit}`" + + elif cmd == "!action": + if len(parts) < 3: + return "Usage: `!action <unit> <action>`" + unit = parts[1] + action = parts[2] + resp = umbrella.run_action(unit, action) + if resp.get("type") == "error": + return f"❌ {resp['message']}" + return f"✓ Action `{action}` dispatched for `{unit}`" + + elif cmd == "!attach": + if len(parts) < 2: + return "Usage: `!attach <unit>`" + unit = parts[1] + key = (room.room_id, unit) + if key in attach_sessions: + return f"Already streaming `{unit}` to this room." + session = AttachSession(unit, room.room_id, client) + attach_sessions[key] = session + await session.start() + return f"📡 Streaming `{unit}` output to this room. Use `!detach {unit}` to stop." + + elif cmd == "!detach": + if len(parts) < 2: + return "Usage: `!detach <unit>`" + unit = parts[1] + key = (room.room_id, unit) + if key not in attach_sessions: + return f"`{unit}` is not being streamed to this room." + await attach_sessions[key].stop() + del attach_sessions[key] + return f"⏹ Stopped streaming `{unit}`." + + except RuntimeError as e: + return f"❌ Umbrella error: {e}" + except Exception as e: + log.exception(f"Command error: {e}") + return f"❌ Internal error: {e}" + + return None + + +# ── Bot core ────────────────────────────────────────────────────────────────── + +async def run_bot(config: dict): + store_path = config.get("store_path", DEFAULT_STORE) + os.makedirs(store_path, exist_ok=True) + + client_config = AsyncClientConfig( + max_limit_exceeded=0, + max_timeouts=0, + store_sync_tokens=True, + encryption_enabled=True, + ) + + client = AsyncClient( + homeserver=config["homeserver"], + user=config["user_id"], + device_id=config.get("device_id", "UMBRELLA_BOT"), + store_path=store_path, + config=client_config, + ) + + umbrella = UmbrellaClient(config.get("socket_path", SOCK_PATH)) + attach_sessions = {} + room_id = config["room_id"] + + # Login + log.info(f"Logging in as {config['user_id']}...") + resp = await client.login( + password=config["password"], + device_name="umbrella-bot", + ) + if not isinstance(resp, LoginResponse): + log.error(f"Login failed: {resp}") + return + + log.info(f"Logged in. Device ID: {client.device_id}") + + # Load encryption keys + if client.should_upload_keys: + await client.keys_upload() + await client.keys_query() + + # Message callback + async def on_message(room: MatrixRoom, event: RoomMessageText): + # Only handle messages in our configured room + if room.room_id != room_id: + return + # Ignore our own messages + if event.sender == config["user_id"]: + return + # Only handle messages that start with ! + body = event.body.strip() + if not body.startswith("!"): + return + + log.info(f"Command from {event.sender}: {body}") + + reply = await handle_command( + client, umbrella, attach_sessions, + room, event.sender, body, config + ) + if reply: + await client.room_send( + room_id, + "m.room.message", + { + "msgtype": "m.text", + "body": reply, + "format": "org.matrix.custom.html", + "formatted_body": reply.replace("\n", "<br>"), + } + ) + + client.add_event_callback(on_message, RoomMessageText) + + # Graceful shutdown + loop = asyncio.get_event_loop() + stop_event = asyncio.Event() + + def _signal_handler(): + log.info("Shutting down...") + stop_event.set() + + loop.add_signal_handler(signal.SIGTERM, _signal_handler) + loop.add_signal_handler(signal.SIGINT, _signal_handler) + + log.info(f"Syncing, watching room {room_id}") + + # Initial sync + await client.sync(timeout=10000) + await client.keys_query() + + # Main sync loop + async def sync_loop(): + while not stop_event.is_set(): + resp = await client.sync(timeout=30000, full_state=False) + if isinstance(resp, SyncResponse): + # Upload any new keys + if client.should_upload_keys: + await client.keys_upload() + + sync_task = asyncio.create_task(sync_loop()) + + await stop_event.wait() + + sync_task.cancel() + try: + await sync_task + except asyncio.CancelledError: + pass + + # Stop all attach sessions + for session in attach_sessions.values(): + await session.stop() + + await client.close() + log.info("Bot stopped.") + + +# ── Setup mode ──────────────────────────────────────────────────────────────── + +async def run_setup(config: dict): + """ + First-run setup: login, generate device keys, print device ID. + Run this once before starting the bot normally. + """ + store_path = config.get("store_path", DEFAULT_STORE) + os.makedirs(store_path, exist_ok=True) + + client = AsyncClient( + homeserver=config["homeserver"], + user=config["user_id"], + device_id=config.get("device_id", "UMBRELLA_BOT"), + store_path=store_path, + config=AsyncClientConfig(encryption_enabled=True), + ) + + print(f"Logging in as {config['user_id']}...") + resp = await client.login( + password=config["password"], + device_name="umbrella-bot", + ) + if not isinstance(resp, LoginResponse): + print(f"Login failed: {resp}") + return + + print(f"Logged in successfully.") + print(f"Device ID: {client.device_id}") + print(f"Add this to your bot.conf: device_id = {client.device_id}") + + if client.should_upload_keys: + await client.keys_upload() + print("Encryption keys uploaded.") + + print("\nSetup complete. You can now run the bot normally.") + print("Note: you may need to verify this device from another session") + print("if your room requires verified devices.") + + await client.close() + + +# ── Config loading ──────────────────────────────────────────────────────────── + +def load_config(path: str) -> dict: + config = {} + with open(path) as f: + for line in f: + line = line.strip() + if not line or line.startswith("#"): + continue + if "=" not in line: + continue + key, _, value = line.partition("=") + config[key.strip()] = value.strip() + + required = ["homeserver", "user_id", "password", "room_id"] + for key in required: + if key not in config: + raise ValueError(f"Missing required config key: {key}") + + return config + + +# ── Entry point ─────────────────────────────────────────────────────────────── + +def main(): + parser = argparse.ArgumentParser(description="Umbrella Matrix bot") + parser.add_argument("-c", "--config", default=DEFAULT_CONF, + help=f"Config file (default: {DEFAULT_CONF})") + parser.add_argument("--setup", action="store_true", + help="Run first-time setup and exit") + args = parser.parse_args() + + try: + config = load_config(args.config) + except FileNotFoundError: + print(f"Config file not found: {args.config}") + print(f"Create it based on bot.conf.example") + sys.exit(1) + except ValueError as e: + print(f"Config error: {e}") + sys.exit(1) + + if args.setup: + asyncio.run(run_setup(config)) + else: + asyncio.run(run_bot(config)) + + +if __name__ == "__main__": + main() |
