#!/usr/bin/env python3 """Minimal guest-side exec agent for pyro runtime bundles.""" from __future__ import annotations import json import socket import subprocess import time from typing import Any PORT = 5005 BUFFER_SIZE = 65536 def _read_request(conn: socket.socket) -> dict[str, Any]: chunks: list[bytes] = [] while True: data = conn.recv(BUFFER_SIZE) if data == b"": break chunks.append(data) if b"\n" in data: break payload = json.loads(b"".join(chunks).decode("utf-8").strip()) if not isinstance(payload, dict): raise RuntimeError("request must be a JSON object") return payload def _run_command(command: str, timeout_seconds: int) -> dict[str, Any]: started = time.monotonic() try: proc = subprocess.run( ["/bin/sh", "-lc", command], text=True, capture_output=True, timeout=timeout_seconds, check=False, ) return { "stdout": proc.stdout, "stderr": proc.stderr, "exit_code": proc.returncode, "duration_ms": int((time.monotonic() - started) * 1000), } except subprocess.TimeoutExpired: return { "stdout": "", "stderr": f"command timed out after {timeout_seconds}s", "exit_code": 124, "duration_ms": int((time.monotonic() - started) * 1000), } def main() -> None: family = getattr(socket, "AF_VSOCK", None) if family is None: raise SystemExit("AF_VSOCK is unavailable") with socket.socket(family, socket.SOCK_STREAM) as server: server.bind((socket.VMADDR_CID_ANY, PORT)) server.listen(1) while True: conn, _ = server.accept() with conn: request = _read_request(conn) command = str(request.get("command", "")) timeout_seconds = int(request.get("timeout_seconds", 30)) response = _run_command(command, timeout_seconds) conn.sendall((json.dumps(response) + "\n").encode("utf-8")) if __name__ == "__main__": main()