summaryrefslogtreecommitdiff
path: root/clients/umbrella-bot/umbrella-bot.py
diff options
context:
space:
mode:
Diffstat (limited to 'clients/umbrella-bot/umbrella-bot.py')
-rwxr-xr-xclients/umbrella-bot/umbrella-bot.py249
1 files changed, 77 insertions, 172 deletions
diff --git a/clients/umbrella-bot/umbrella-bot.py b/clients/umbrella-bot/umbrella-bot.py
index fed3bdb..6e26947 100755
--- a/clients/umbrella-bot/umbrella-bot.py
+++ b/clients/umbrella-bot/umbrella-bot.py
@@ -24,6 +24,7 @@ import struct
import sys
import logging
import argparse
+from datetime import datetime
from pathlib import Path
from typing import Optional
@@ -52,18 +53,22 @@ log = logging.getLogger("umbrella-bot")
DEFAULT_CONF = "/etc/umbrella/bot.conf"
DEFAULT_STORE = "/etc/umbrella/bot-store"
SOCK_PATH = "/run/umbrella/umbrella.sock"
+AUDIT_LOG = "/var/log/umbrella/bot-audit.log"
PROTO_MAX = 65536
+# Maximum lines to show in a !tail response
+TAIL_MAX_LINES = 30
+
HELP_TEXT = """\
Umbrella bot commands:
!status <unit> — Show unit status
!cmd <unit> <command> — Send a console command
+ !tail <unit> — Show recent output from a unit
!restart <unit> — Restart the unit's systemd service
!update <unit> — Run the update action
!action <unit> <action> — Run a named action
- !attach <unit> — Stream live log output to this room
- !detach <unit> — Stop streaming log output
+ !broadcast <message> — Send message to all running units
!units — List all units
!help — Show this message
"""
@@ -75,7 +80,6 @@ class UmbrellaClient:
Synchronous umbrella socket client.
We use a plain blocking socket here since commands are short-lived
and we don't need async for the request/response pattern.
- For attach (streaming), we use a separate async wrapper.
"""
def __init__(self, sock_path: str = SOCK_PATH):
@@ -139,143 +143,27 @@ class UmbrellaClient:
def run_action(self, unit: str, action: str) -> dict:
return self.command({"cmd": "action", "unit": unit, "action": action})
+ def tail(self, unit: str) -> dict:
+ return self.command({"cmd": "tail", "unit": unit})
-# ── Attach session: streams output from umbrella to a Matrix room ─────────────
-
-class AttachSession:
- """
- Maintains a persistent connection to umbrella for a specific unit,
- forwarding live output to a Matrix room.
- """
+ def broadcast(self, message: str) -> dict:
+ return self.command({"cmd": "broadcast", "message": message})
- def __init__(self, unit: str, room_id: str,
- matrix_client: "AsyncClient",
- sock_path: str = SOCK_PATH):
- self.unit = unit
- self.room_id = room_id
- self.matrix_client = matrix_client
- self.sock_path = sock_path
- self._task: Optional[asyncio.Task] = None
- self._sock: Optional[socket.socket] = None
-
- async def start(self):
- if self._task and not self._task.done():
- return # already running
- self._task = asyncio.create_task(self._stream())
-
- async def stop(self):
- if self._task:
- self._task.cancel()
- try:
- await self._task
- except asyncio.CancelledError:
- pass
- if self._sock:
- try:
- self._sock.close()
- except Exception:
- pass
- self._sock = None
-
- async def _stream(self):
- loop = asyncio.get_event_loop()
-
- def connect_and_attach():
- s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- s.settimeout(10)
- s.connect(self.sock_path)
- # Send attach command
- msg = json.dumps({"cmd": "attach", "unit": self.unit}).encode()
- s.sendall(struct.pack(">I", len(msg)) + msg)
- # Read ok response
- hdr = b""
- while len(hdr) < 4:
- hdr += s.recv(4 - len(hdr))
- length = struct.unpack(">I", hdr)[0]
- resp_data = b""
- while len(resp_data) < length:
- resp_data += s.recv(length - len(resp_data))
- s.settimeout(None) # non-blocking from here
- s.setblocking(False)
- return s
- try:
- self._sock = await loop.run_in_executor(None, connect_and_attach)
- except Exception as e:
- log.error(f"attach: failed to connect for {self.unit}: {e}")
- return
+# ── Audit logging ─────────────────────────────────────────────────────────────
- buf = b""
- log.info(f"attach: streaming {self.unit} to room {self.room_id}")
-
- try:
- while True:
- # Read available data from socket
- try:
- chunk = await loop.run_in_executor(
- None, lambda: self._recv_one_nonblock()
- )
- if chunk is None:
- await asyncio.sleep(0.05)
- continue
- if chunk == {}:
- break # disconnected
- except asyncio.CancelledError:
- raise
- except Exception as e:
- log.error(f"attach: read error for {self.unit}: {e}")
- break
-
- if chunk.get("type") == "output":
- data = chunk.get("data", "")
- history = chunk.get("history", False)
- if data and not history:
- # Send to Matrix room, wrapped in code block
- await self.matrix_client.room_send(
- self.room_id,
- "m.room.message",
- {
- "msgtype": "m.text",
- "body": f"```\n{data.rstrip()}\n```",
- "format": "org.matrix.custom.html",
- "formatted_body": f"<pre><code>{data.rstrip()}</code></pre>",
- },
- ignore_unverified_devices=True,
- )
- except asyncio.CancelledError:
- pass
- finally:
- if self._sock:
- # Send detach
- try:
- msg = json.dumps({"cmd": "detach"}).encode()
- self._sock.setblocking(True)
- self._sock.sendall(struct.pack(">I", len(msg)) + msg)
- except Exception:
- pass
- self._sock.close()
- self._sock = None
-
- def _recv_one_nonblock(self) -> Optional[dict]:
- """Try to read one message from the non-blocking socket. Returns None if no data yet."""
- try:
- hdr = self._sock.recv(4)
- if not hdr:
- return {} # disconnected
- if len(hdr) < 4:
- return None
- length = struct.unpack(">I", hdr)[0]
- data = b""
- self._sock.setblocking(True)
- while len(data) < length:
- chunk = self._sock.recv(length - len(data))
- if not chunk:
- return {}
- data += chunk
- self._sock.setblocking(False)
- return json.loads(data.decode())
- except BlockingIOError:
- return None
+def audit_log(sender: str, unit: str, command: str) -> None:
+ """
+ Append a line to the bot audit log for !cmd invocations.
+ Format: [YYYY-MM-DD HH:MM:SS] <sender> !cmd <unit> <command>
+ """
+ timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+ line = f"[{timestamp}] {sender} !cmd {unit} {command}\n"
+ try:
+ with open(AUDIT_LOG, "a") as f:
+ f.write(line)
+ except OSError as e:
+ log.warning(f"Could not write audit log: {e}")
# ── Power level check ─────────────────────────────────────────────────────────
@@ -304,7 +192,6 @@ async def get_user_power_level(client: AsyncClient,
async def handle_command(
client: AsyncClient,
umbrella: UmbrellaClient,
- attach_sessions: dict,
room: MatrixRoom,
sender: str,
text: str,
@@ -334,7 +221,10 @@ async def handle_command(
return "No units loaded."
lines = ["**Units:**"]
for u in units:
- lines.append(f" `{u['name']}` — {u['display']} [{u['state']}]")
+ pl = u.get("players", -1)
+ mx = u.get("max_players", -1)
+ players = f" ({pl}/{mx})" if pl >= 0 and mx >= 0 else ""
+ lines.append(f" `{u['name']}` — {u['display']} [{u['state']}{players}]")
return "\n".join(lines)
elif cmd == "!status":
@@ -344,10 +234,18 @@ async def handle_command(
resp = umbrella.status(unit)
if resp.get("type") == "error":
return f"❌ {resp['message']}"
- return (
- f"**{resp.get('display', unit)}**\n"
- f"State: `{resp.get('state', 'unknown')}`"
- )
+ pl = resp.get("players", -1)
+ mx = resp.get("max_players", -1)
+ map_name = resp.get("map", "")
+ lines = [
+ f"**{resp.get('display', unit)}**",
+ f"State: `{resp.get('state', 'unknown')}`",
+ ]
+ if isinstance(pl, int) and pl >= 0 and isinstance(mx, int) and mx >= 0:
+ lines.append(f"Players: {pl}/{mx}")
+ if map_name:
+ lines.append(f"Map: {map_name}")
+ return "\n".join(lines)
elif cmd == "!cmd":
if len(parts) < 3:
@@ -357,8 +255,43 @@ async def handle_command(
resp = umbrella.send_input(unit, command)
if resp.get("type") == "error":
return f"❌ {resp['message']}"
+ audit_log(sender, unit, command)
return f"✓ Sent: `{command}`"
+ elif cmd == "!tail":
+ if len(parts) < 2:
+ return "Usage: `!tail <unit>`"
+ unit = parts[1]
+ resp = umbrella.tail(unit)
+ if resp.get("type") == "error":
+ return f"❌ {resp.get('message', 'unknown error')}"
+ data = resp.get("data", "")
+ if not data or not data.strip():
+ return f"No output buffered for `{unit}`."
+ # Trim to last TAIL_MAX_LINES lines to avoid flooding the room
+ lines = data.splitlines()
+ if len(lines) > TAIL_MAX_LINES:
+ lines = lines[-TAIL_MAX_LINES:]
+ trimmed = f"(showing last {TAIL_MAX_LINES} lines)\n"
+ else:
+ trimmed = ""
+ body = trimmed + "\n".join(lines)
+ return f"```\n{body}\n```"
+
+ elif cmd == "!broadcast":
+ if len(parts) < 2:
+ return "Usage: `!broadcast <message>`"
+ message = " ".join(parts[1:])
+ resp = umbrella.broadcast(message)
+ if resp.get("type") == "error":
+ return f"❌ {resp.get('message', 'unknown error')}"
+ sent = resp.get("sent", 0)
+ failed = resp.get("failed", 0)
+ result = f"✓ Broadcast sent to {sent} unit(s)"
+ if failed:
+ result += f", {failed} failed"
+ return result
+
elif cmd == "!restart":
if len(parts) < 2:
return "Usage: `!restart <unit>`"
@@ -387,29 +320,6 @@ async def handle_command(
return f"❌ {resp['message']}"
return f"✓ Action `{action}` dispatched for `{unit}`"
- elif cmd == "!attach":
- if len(parts) < 2:
- return "Usage: `!attach <unit>`"
- unit = parts[1]
- key = (room.room_id, unit)
- if key in attach_sessions:
- return f"Already streaming `{unit}` to this room."
- session = AttachSession(unit, room.room_id, client)
- attach_sessions[key] = session
- await session.start()
- return f"📡 Streaming `{unit}` output to this room. Use `!detach {unit}` to stop."
-
- elif cmd == "!detach":
- if len(parts) < 2:
- return "Usage: `!detach <unit>`"
- unit = parts[1]
- key = (room.room_id, unit)
- if key not in attach_sessions:
- return f"`{unit}` is not being streamed to this room."
- await attach_sessions[key].stop()
- del attach_sessions[key]
- return f"⏹ Stopped streaming `{unit}`."
-
except RuntimeError as e:
return f"❌ Umbrella error: {e}"
except Exception as e:
@@ -441,7 +351,6 @@ async def run_bot(config: dict):
)
umbrella = UmbrellaClient(config.get("socket_path", SOCK_PATH))
- attach_sessions = {}
room_id = config["room_id"]
# Login
@@ -478,7 +387,7 @@ async def run_bot(config: dict):
log.info(f"Command from {event.sender}: {body}")
reply = await handle_command(
- client, umbrella, attach_sessions,
+ client, umbrella,
room, event.sender, body, config
)
if reply:
@@ -525,7 +434,7 @@ async def run_bot(config: dict):
await client.keys_upload()
if client.should_query_keys:
await client.keys_query()
-
+
sync_task = asyncio.create_task(sync_loop())
await stop_event.wait()
@@ -536,10 +445,6 @@ async def run_bot(config: dict):
except asyncio.CancelledError:
pass
- # Stop all attach sessions
- for session in attach_sessions.values():
- await session.stop()
-
await client.close()
log.info("Bot stopped.")