summaryrefslogtreecommitdiff
path: root/clients/umbrella-bot/umbrella-bot.py
diff options
context:
space:
mode:
Diffstat (limited to 'clients/umbrella-bot/umbrella-bot.py')
-rwxr-xr-xclients/umbrella-bot/umbrella-bot.py631
1 files changed, 631 insertions, 0 deletions
diff --git a/clients/umbrella-bot/umbrella-bot.py b/clients/umbrella-bot/umbrella-bot.py
new file mode 100755
index 0000000..001b94c
--- /dev/null
+++ b/clients/umbrella-bot/umbrella-bot.py
@@ -0,0 +1,631 @@
+#!/usr/bin/env python3
+"""
+umbrella-bot: Matrix bot client for the umbrella server manager.
+
+Connects to the umbrella Unix socket and bridges commands from a Matrix
+room to umbrella units. Requires matrix-nio with encryption support.
+
+Dependencies:
+ pip install matrix-nio[e2e] aiofiles
+
+Setup:
+ 1. Create /etc/umbrella/bot.conf (see bot.conf.example)
+ 2. Run once with --setup to perform initial login and key verification
+ 3. Run normally or via systemd
+"""
+
+import asyncio
+import json
+import os
+import re
+import signal
+import socket
+import struct
+import sys
+import logging
+import argparse
+from pathlib import Path
+from typing import Optional
+
+from nio import (
+ AsyncClient,
+ AsyncClientConfig,
+ LoginResponse,
+ MatrixRoom,
+ RoomMessageText,
+ SyncResponse,
+ crypto,
+)
+from nio.store import SqliteStore
+
+# ── Logging ───────────────────────────────────────────────────────────────────
+
+logging.basicConfig(
+ level=logging.INFO,
+ format="[%(asctime)s] [%(levelname)s] %(message)s",
+ datefmt="%Y-%m-%d %H:%M:%S",
+)
+log = logging.getLogger("umbrella-bot")
+
+# ── Config ────────────────────────────────────────────────────────────────────
+
+DEFAULT_CONF = "/etc/umbrella/bot.conf"
+DEFAULT_STORE = "/etc/umbrella/bot-store"
+SOCK_PATH = "/run/umbrella/umbrella.sock"
+PROTO_MAX = 65536
+
+HELP_TEXT = """\
+Umbrella bot commands:
+
+ !status <unit> — Show unit status
+ !cmd <unit> <command> — Send a console command
+ !restart <unit> — Restart the unit's systemd service
+ !update <unit> — Run the update action
+ !action <unit> <action> — Run a named action
+ !attach <unit> — Stream live log output to this room
+ !detach <unit> — Stop streaming log output
+ !units — List all units
+ !help — Show this message
+"""
+
+# ── Umbrella socket client ────────────────────────────────────────────────────
+
+class UmbrellaClient:
+ """
+ Synchronous umbrella socket client.
+ We use a plain blocking socket here since commands are short-lived
+ and we don't need async for the request/response pattern.
+ For attach (streaming), we use a separate async wrapper.
+ """
+
+ def __init__(self, sock_path: str = SOCK_PATH):
+ self.sock_path = sock_path
+
+ def _connect(self) -> socket.socket:
+ s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ s.settimeout(10)
+ try:
+ s.connect(self.sock_path)
+ except FileNotFoundError:
+ raise RuntimeError(f"Umbrella socket not found: {self.sock_path} "
+ f"(is umbrelld running?)")
+ except ConnectionRefusedError:
+ raise RuntimeError("Umbrella daemon is not running")
+ return s
+
+ def _send(self, s: socket.socket, msg: dict) -> None:
+ data = json.dumps(msg).encode()
+ header = struct.pack(">I", len(data))
+ s.sendall(header + data)
+
+ def _recv(self, s: socket.socket) -> dict:
+ header = b""
+ while len(header) < 4:
+ chunk = s.recv(4 - len(header))
+ if not chunk:
+ raise RuntimeError("Connection closed by umbrella daemon")
+ header += chunk
+ length = struct.unpack(">I", header)[0]
+ if length > PROTO_MAX:
+ raise RuntimeError(f"Message too large: {length}")
+ data = b""
+ while len(data) < length:
+ chunk = s.recv(length - len(data))
+ if not chunk:
+ raise RuntimeError("Connection closed mid-message")
+ data += chunk
+ return json.loads(data.decode())
+
+ def command(self, msg: dict) -> dict:
+ """Send a command and return the response."""
+ s = self._connect()
+ try:
+ self._send(s, msg)
+ return self._recv(s)
+ finally:
+ s.close()
+
+ def list_units(self) -> list:
+ resp = self.command({"cmd": "list"})
+ return resp.get("units", [])
+
+ def status(self, unit: str) -> dict:
+ return self.command({"cmd": "status", "unit": unit})
+
+ def send_input(self, unit: str, data: str) -> dict:
+ return self.command({"cmd": "input", "unit": unit,
+ "data": data + "\n"})
+
+ def run_action(self, unit: str, action: str) -> dict:
+ return self.command({"cmd": "action", "unit": unit, "action": action})
+
+
+# ── Attach session: streams output from umbrella to a Matrix room ─────────────
+
+class AttachSession:
+ """
+ Maintains a persistent connection to umbrella for a specific unit,
+ forwarding live output to a Matrix room.
+ """
+
+ def __init__(self, unit: str, room_id: str,
+ matrix_client: "AsyncClient",
+ sock_path: str = SOCK_PATH):
+ self.unit = unit
+ self.room_id = room_id
+ self.matrix_client = matrix_client
+ self.sock_path = sock_path
+ self._task: Optional[asyncio.Task] = None
+ self._sock: Optional[socket.socket] = None
+
+ async def start(self):
+ if self._task and not self._task.done():
+ return # already running
+ self._task = asyncio.create_task(self._stream())
+
+ async def stop(self):
+ if self._task:
+ self._task.cancel()
+ try:
+ await self._task
+ except asyncio.CancelledError:
+ pass
+ if self._sock:
+ try:
+ self._sock.close()
+ except Exception:
+ pass
+ self._sock = None
+
+ async def _stream(self):
+ loop = asyncio.get_event_loop()
+
+ def connect_and_attach():
+ s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ s.settimeout(10)
+ s.connect(self.sock_path)
+ # Send attach command
+ msg = json.dumps({"cmd": "attach", "unit": self.unit}).encode()
+ s.sendall(struct.pack(">I", len(msg)) + msg)
+ # Read ok response
+ hdr = b""
+ while len(hdr) < 4:
+ hdr += s.recv(4 - len(hdr))
+ length = struct.unpack(">I", hdr)[0]
+ resp_data = b""
+ while len(resp_data) < length:
+ resp_data += s.recv(length - len(resp_data))
+ s.settimeout(None) # non-blocking from here
+ s.setblocking(False)
+ return s
+
+ try:
+ self._sock = await loop.run_in_executor(None, connect_and_attach)
+ except Exception as e:
+ log.error(f"attach: failed to connect for {self.unit}: {e}")
+ return
+
+ buf = b""
+ log.info(f"attach: streaming {self.unit} to room {self.room_id}")
+
+ try:
+ while True:
+ # Read available data from socket
+ try:
+ chunk = await loop.run_in_executor(
+ None, lambda: self._recv_one_nonblock()
+ )
+ if chunk is None:
+ await asyncio.sleep(0.05)
+ continue
+ if chunk == {}:
+ break # disconnected
+ except asyncio.CancelledError:
+ raise
+ except Exception as e:
+ log.error(f"attach: read error for {self.unit}: {e}")
+ break
+
+ if chunk.get("type") == "output":
+ data = chunk.get("data", "")
+ history = chunk.get("history", False)
+ if data and not history:
+ # Send to Matrix room, wrapped in code block
+ await self.matrix_client.room_send(
+ self.room_id,
+ "m.room.message",
+ {
+ "msgtype": "m.text",
+ "body": f"```\n{data.rstrip()}\n```",
+ "format": "org.matrix.custom.html",
+ "formatted_body": f"<pre><code>{data.rstrip()}</code></pre>",
+ }
+ )
+ except asyncio.CancelledError:
+ pass
+ finally:
+ if self._sock:
+ # Send detach
+ try:
+ msg = json.dumps({"cmd": "detach"}).encode()
+ self._sock.setblocking(True)
+ self._sock.sendall(struct.pack(">I", len(msg)) + msg)
+ except Exception:
+ pass
+ self._sock.close()
+ self._sock = None
+
+ def _recv_one_nonblock(self) -> Optional[dict]:
+ """Try to read one message from the non-blocking socket. Returns None if no data yet."""
+ try:
+ hdr = self._sock.recv(4)
+ if not hdr:
+ return {} # disconnected
+ if len(hdr) < 4:
+ return None
+ length = struct.unpack(">I", hdr)[0]
+ data = b""
+ self._sock.setblocking(True)
+ while len(data) < length:
+ chunk = self._sock.recv(length - len(data))
+ if not chunk:
+ return {}
+ data += chunk
+ self._sock.setblocking(False)
+ return json.loads(data.decode())
+ except BlockingIOError:
+ return None
+
+
+# ── Power level check ─────────────────────────────────────────────────────────
+
+async def get_user_power_level(client: AsyncClient,
+ room_id: str, user_id: str) -> int:
+ """
+ Fetch the power level for a user in a room.
+ Returns 0 if the user is not found or on error.
+ """
+ try:
+ resp = await client.room_get_state_event(
+ room_id, "m.room.power_levels", ""
+ )
+ if hasattr(resp, "content"):
+ users = resp.content.get("users", {})
+ default = resp.content.get("users_default", 0)
+ return users.get(user_id, default)
+ except Exception as e:
+ log.warning(f"Could not get power level for {user_id}: {e}")
+ return 0
+
+
+# ── Command dispatch ──────────────────────────────────────────────────────────
+
+async def handle_command(
+ client: AsyncClient,
+ umbrella: UmbrellaClient,
+ attach_sessions: dict,
+ room: MatrixRoom,
+ sender: str,
+ text: str,
+ config: dict,
+) -> Optional[str]:
+ """
+ Parse and dispatch a bot command. Returns a reply string or None.
+ """
+ parts = text.strip().split(None, 3)
+ if not parts:
+ return None
+
+ cmd = parts[0].lower()
+
+ # Check power level
+ power = await get_user_power_level(client, room.room_id, sender)
+ if power < 50:
+ return f"❌ Permission denied. Requires power level ≥ 50 (you have {power})."
+
+ try:
+ if cmd == "!help":
+ return HELP_TEXT
+
+ elif cmd == "!units":
+ units = umbrella.list_units()
+ if not units:
+ return "No units loaded."
+ lines = ["**Units:**"]
+ for u in units:
+ lines.append(f" `{u['name']}` — {u['display']} [{u['state']}]")
+ return "\n".join(lines)
+
+ elif cmd == "!status":
+ if len(parts) < 2:
+ return "Usage: `!status <unit>`"
+ unit = parts[1]
+ resp = umbrella.status(unit)
+ if resp.get("type") == "error":
+ return f"❌ {resp['message']}"
+ return (
+ f"**{resp.get('display', unit)}**\n"
+ f"State: `{resp.get('state', 'unknown')}`"
+ )
+
+ elif cmd == "!cmd":
+ if len(parts) < 3:
+ return "Usage: `!cmd <unit> <command>`"
+ unit = parts[1]
+ command = " ".join(parts[2:])
+ resp = umbrella.send_input(unit, command)
+ if resp.get("type") == "error":
+ return f"❌ {resp['message']}"
+ return f"✓ Sent: `{command}`"
+
+ elif cmd == "!restart":
+ if len(parts) < 2:
+ return "Usage: `!restart <unit>`"
+ unit = parts[1]
+ resp = umbrella.run_action(unit, "restart")
+ if resp.get("type") == "error":
+ return f"❌ {resp['message']}"
+ return f"✓ Restart dispatched for `{unit}`"
+
+ elif cmd == "!update":
+ if len(parts) < 2:
+ return "Usage: `!update <unit>`"
+ unit = parts[1]
+ resp = umbrella.run_action(unit, "update")
+ if resp.get("type") == "error":
+ return f"❌ {resp['message']}"
+ return f"✓ Update dispatched for `{unit}`"
+
+ elif cmd == "!action":
+ if len(parts) < 3:
+ return "Usage: `!action <unit> <action>`"
+ unit = parts[1]
+ action = parts[2]
+ resp = umbrella.run_action(unit, action)
+ if resp.get("type") == "error":
+ return f"❌ {resp['message']}"
+ return f"✓ Action `{action}` dispatched for `{unit}`"
+
+ elif cmd == "!attach":
+ if len(parts) < 2:
+ return "Usage: `!attach <unit>`"
+ unit = parts[1]
+ key = (room.room_id, unit)
+ if key in attach_sessions:
+ return f"Already streaming `{unit}` to this room."
+ session = AttachSession(unit, room.room_id, client)
+ attach_sessions[key] = session
+ await session.start()
+ return f"📡 Streaming `{unit}` output to this room. Use `!detach {unit}` to stop."
+
+ elif cmd == "!detach":
+ if len(parts) < 2:
+ return "Usage: `!detach <unit>`"
+ unit = parts[1]
+ key = (room.room_id, unit)
+ if key not in attach_sessions:
+ return f"`{unit}` is not being streamed to this room."
+ await attach_sessions[key].stop()
+ del attach_sessions[key]
+ return f"⏹ Stopped streaming `{unit}`."
+
+ except RuntimeError as e:
+ return f"❌ Umbrella error: {e}"
+ except Exception as e:
+ log.exception(f"Command error: {e}")
+ return f"❌ Internal error: {e}"
+
+ return None
+
+
+# ── Bot core ──────────────────────────────────────────────────────────────────
+
+async def run_bot(config: dict):
+ store_path = config.get("store_path", DEFAULT_STORE)
+ os.makedirs(store_path, exist_ok=True)
+
+ client_config = AsyncClientConfig(
+ max_limit_exceeded=0,
+ max_timeouts=0,
+ store_sync_tokens=True,
+ encryption_enabled=True,
+ )
+
+ client = AsyncClient(
+ homeserver=config["homeserver"],
+ user=config["user_id"],
+ device_id=config.get("device_id", "UMBRELLA_BOT"),
+ store_path=store_path,
+ config=client_config,
+ )
+
+ umbrella = UmbrellaClient(config.get("socket_path", SOCK_PATH))
+ attach_sessions = {}
+ room_id = config["room_id"]
+
+ # Login
+ log.info(f"Logging in as {config['user_id']}...")
+ resp = await client.login(
+ password=config["password"],
+ device_name="umbrella-bot",
+ )
+ if not isinstance(resp, LoginResponse):
+ log.error(f"Login failed: {resp}")
+ return
+
+ log.info(f"Logged in. Device ID: {client.device_id}")
+
+ # Load encryption keys
+ if client.should_upload_keys:
+ await client.keys_upload()
+ await client.keys_query()
+
+ # Message callback
+ async def on_message(room: MatrixRoom, event: RoomMessageText):
+ # Only handle messages in our configured room
+ if room.room_id != room_id:
+ return
+ # Ignore our own messages
+ if event.sender == config["user_id"]:
+ return
+ # Only handle messages that start with !
+ body = event.body.strip()
+ if not body.startswith("!"):
+ return
+
+ log.info(f"Command from {event.sender}: {body}")
+
+ reply = await handle_command(
+ client, umbrella, attach_sessions,
+ room, event.sender, body, config
+ )
+ if reply:
+ await client.room_send(
+ room_id,
+ "m.room.message",
+ {
+ "msgtype": "m.text",
+ "body": reply,
+ "format": "org.matrix.custom.html",
+ "formatted_body": reply.replace("\n", "<br>"),
+ }
+ )
+
+ client.add_event_callback(on_message, RoomMessageText)
+
+ # Graceful shutdown
+ loop = asyncio.get_event_loop()
+ stop_event = asyncio.Event()
+
+ def _signal_handler():
+ log.info("Shutting down...")
+ stop_event.set()
+
+ loop.add_signal_handler(signal.SIGTERM, _signal_handler)
+ loop.add_signal_handler(signal.SIGINT, _signal_handler)
+
+ log.info(f"Syncing, watching room {room_id}")
+
+ # Initial sync
+ await client.sync(timeout=10000)
+ await client.keys_query()
+
+ # Main sync loop
+ async def sync_loop():
+ while not stop_event.is_set():
+ resp = await client.sync(timeout=30000, full_state=False)
+ if isinstance(resp, SyncResponse):
+ # Upload any new keys
+ if client.should_upload_keys:
+ await client.keys_upload()
+
+ sync_task = asyncio.create_task(sync_loop())
+
+ await stop_event.wait()
+
+ sync_task.cancel()
+ try:
+ await sync_task
+ except asyncio.CancelledError:
+ pass
+
+ # Stop all attach sessions
+ for session in attach_sessions.values():
+ await session.stop()
+
+ await client.close()
+ log.info("Bot stopped.")
+
+
+# ── Setup mode ────────────────────────────────────────────────────────────────
+
+async def run_setup(config: dict):
+ """
+ First-run setup: login, generate device keys, print device ID.
+ Run this once before starting the bot normally.
+ """
+ store_path = config.get("store_path", DEFAULT_STORE)
+ os.makedirs(store_path, exist_ok=True)
+
+ client = AsyncClient(
+ homeserver=config["homeserver"],
+ user=config["user_id"],
+ device_id=config.get("device_id", "UMBRELLA_BOT"),
+ store_path=store_path,
+ config=AsyncClientConfig(encryption_enabled=True),
+ )
+
+ print(f"Logging in as {config['user_id']}...")
+ resp = await client.login(
+ password=config["password"],
+ device_name="umbrella-bot",
+ )
+ if not isinstance(resp, LoginResponse):
+ print(f"Login failed: {resp}")
+ return
+
+ print(f"Logged in successfully.")
+ print(f"Device ID: {client.device_id}")
+ print(f"Add this to your bot.conf: device_id = {client.device_id}")
+
+ if client.should_upload_keys:
+ await client.keys_upload()
+ print("Encryption keys uploaded.")
+
+ print("\nSetup complete. You can now run the bot normally.")
+ print("Note: you may need to verify this device from another session")
+ print("if your room requires verified devices.")
+
+ await client.close()
+
+
+# ── Config loading ────────────────────────────────────────────────────────────
+
+def load_config(path: str) -> dict:
+ config = {}
+ with open(path) as f:
+ for line in f:
+ line = line.strip()
+ if not line or line.startswith("#"):
+ continue
+ if "=" not in line:
+ continue
+ key, _, value = line.partition("=")
+ config[key.strip()] = value.strip()
+
+ required = ["homeserver", "user_id", "password", "room_id"]
+ for key in required:
+ if key not in config:
+ raise ValueError(f"Missing required config key: {key}")
+
+ return config
+
+
+# ── Entry point ───────────────────────────────────────────────────────────────
+
+def main():
+ parser = argparse.ArgumentParser(description="Umbrella Matrix bot")
+ parser.add_argument("-c", "--config", default=DEFAULT_CONF,
+ help=f"Config file (default: {DEFAULT_CONF})")
+ parser.add_argument("--setup", action="store_true",
+ help="Run first-time setup and exit")
+ args = parser.parse_args()
+
+ try:
+ config = load_config(args.config)
+ except FileNotFoundError:
+ print(f"Config file not found: {args.config}")
+ print(f"Create it based on bot.conf.example")
+ sys.exit(1)
+ except ValueError as e:
+ print(f"Config error: {e}")
+ sys.exit(1)
+
+ if args.setup:
+ asyncio.run(run_setup(config))
+ else:
+ asyncio.run(run_bot(config))
+
+
+if __name__ == "__main__":
+ main()