"""Public facade shared by the Python SDK and MCP server.""" from __future__ import annotations from pathlib import Path from typing import Any from mcp.server.fastmcp import FastMCP from pyro_mcp.vm_manager import ( DEFAULT_ALLOW_HOST_COMPAT, DEFAULT_MEM_MIB, DEFAULT_TIMEOUT_SECONDS, DEFAULT_TTL_SECONDS, DEFAULT_VCPU_COUNT, DEFAULT_WORKSPACE_NETWORK_POLICY, VmManager, ) class Pyro: """High-level facade over the ephemeral VM runtime.""" def __init__( self, manager: VmManager | None = None, *, backend_name: str | None = None, base_dir: Path | None = None, cache_dir: Path | None = None, max_active_vms: int = 4, ) -> None: self._manager = manager or VmManager( backend_name=backend_name, base_dir=base_dir, cache_dir=cache_dir, max_active_vms=max_active_vms, ) @property def manager(self) -> VmManager: return self._manager def list_environments(self) -> list[dict[str, object]]: return self._manager.list_environments() def pull_environment(self, environment: str) -> dict[str, object]: return self._manager.pull_environment(environment) def inspect_environment(self, environment: str) -> dict[str, object]: return self._manager.inspect_environment(environment) def prune_environments(self) -> dict[str, object]: return self._manager.prune_environments() def create_vm( self, *, environment: str, vcpu_count: int = DEFAULT_VCPU_COUNT, mem_mib: int = DEFAULT_MEM_MIB, ttl_seconds: int = DEFAULT_TTL_SECONDS, network: bool = False, allow_host_compat: bool = DEFAULT_ALLOW_HOST_COMPAT, ) -> dict[str, Any]: return self._manager.create_vm( environment=environment, vcpu_count=vcpu_count, mem_mib=mem_mib, ttl_seconds=ttl_seconds, network=network, allow_host_compat=allow_host_compat, ) def start_vm(self, vm_id: str) -> dict[str, Any]: return self._manager.start_vm(vm_id) def exec_vm(self, vm_id: str, *, command: str, timeout_seconds: int = 30) -> dict[str, Any]: return self._manager.exec_vm(vm_id, command=command, timeout_seconds=timeout_seconds) def create_workspace( self, *, environment: str, vcpu_count: int = DEFAULT_VCPU_COUNT, mem_mib: int = DEFAULT_MEM_MIB, ttl_seconds: int = DEFAULT_TTL_SECONDS, network_policy: str = DEFAULT_WORKSPACE_NETWORK_POLICY, allow_host_compat: bool = DEFAULT_ALLOW_HOST_COMPAT, seed_path: str | Path | None = None, secrets: list[dict[str, str]] | None = None, name: str | None = None, labels: dict[str, str] | None = None, ) -> dict[str, Any]: return self._manager.create_workspace( environment=environment, vcpu_count=vcpu_count, mem_mib=mem_mib, ttl_seconds=ttl_seconds, network_policy=network_policy, allow_host_compat=allow_host_compat, seed_path=seed_path, secrets=secrets, name=name, labels=labels, ) def list_workspaces(self) -> dict[str, Any]: return self._manager.list_workspaces() def update_workspace( self, workspace_id: str, *, name: str | None = None, clear_name: bool = False, labels: dict[str, str] | None = None, clear_labels: list[str] | None = None, ) -> dict[str, Any]: return self._manager.update_workspace( workspace_id, name=name, clear_name=clear_name, labels=labels, clear_labels=clear_labels, ) def exec_workspace( self, workspace_id: str, *, command: str, timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS, secret_env: dict[str, str] | None = None, ) -> dict[str, Any]: return self._manager.exec_workspace( workspace_id, command=command, timeout_seconds=timeout_seconds, secret_env=secret_env, ) def status_workspace(self, workspace_id: str) -> dict[str, Any]: return self._manager.status_workspace(workspace_id) def stop_workspace(self, workspace_id: str) -> dict[str, Any]: return self._manager.stop_workspace(workspace_id) def start_workspace(self, workspace_id: str) -> dict[str, Any]: return self._manager.start_workspace(workspace_id) def push_workspace_sync( self, workspace_id: str, source_path: str | Path, *, dest: str = "/workspace", ) -> dict[str, Any]: return self._manager.push_workspace_sync( workspace_id, source_path=source_path, dest=dest, ) def logs_workspace(self, workspace_id: str) -> dict[str, Any]: return self._manager.logs_workspace(workspace_id) def export_workspace( self, workspace_id: str, path: str, *, output_path: str | Path, ) -> dict[str, Any]: return self._manager.export_workspace( workspace_id, path=path, output_path=output_path, ) def diff_workspace(self, workspace_id: str) -> dict[str, Any]: return self._manager.diff_workspace(workspace_id) def list_workspace_files( self, workspace_id: str, *, path: str = "/workspace", recursive: bool = False, ) -> dict[str, Any]: return self._manager.list_workspace_files( workspace_id, path=path, recursive=recursive, ) def read_workspace_file( self, workspace_id: str, path: str, *, max_bytes: int = 65536, ) -> dict[str, Any]: return self._manager.read_workspace_file( workspace_id, path, max_bytes=max_bytes, ) def write_workspace_file( self, workspace_id: str, path: str, *, text: str, ) -> dict[str, Any]: return self._manager.write_workspace_file( workspace_id, path, text=text, ) def apply_workspace_patch( self, workspace_id: str, *, patch: str, ) -> dict[str, Any]: return self._manager.apply_workspace_patch( workspace_id, patch=patch, ) def export_workspace_disk( self, workspace_id: str, *, output_path: str | Path, ) -> dict[str, Any]: return self._manager.export_workspace_disk( workspace_id, output_path=output_path, ) def list_workspace_disk( self, workspace_id: str, *, path: str = "/workspace", recursive: bool = False, ) -> dict[str, Any]: return self._manager.list_workspace_disk( workspace_id, path=path, recursive=recursive, ) def read_workspace_disk( self, workspace_id: str, path: str, *, max_bytes: int = 65536, ) -> dict[str, Any]: return self._manager.read_workspace_disk( workspace_id, path=path, max_bytes=max_bytes, ) def create_snapshot(self, workspace_id: str, snapshot_name: str) -> dict[str, Any]: return self._manager.create_snapshot(workspace_id, snapshot_name) def list_snapshots(self, workspace_id: str) -> dict[str, Any]: return self._manager.list_snapshots(workspace_id) def delete_snapshot(self, workspace_id: str, snapshot_name: str) -> dict[str, Any]: return self._manager.delete_snapshot(workspace_id, snapshot_name) def reset_workspace( self, workspace_id: str, *, snapshot: str = "baseline", ) -> dict[str, Any]: return self._manager.reset_workspace(workspace_id, snapshot=snapshot) def open_shell( self, workspace_id: str, *, cwd: str = "/workspace", cols: int = 120, rows: int = 30, secret_env: dict[str, str] | None = None, ) -> dict[str, Any]: return self._manager.open_shell( workspace_id, cwd=cwd, cols=cols, rows=rows, secret_env=secret_env, ) def read_shell( self, workspace_id: str, shell_id: str, *, cursor: int = 0, max_chars: int = 65536, ) -> dict[str, Any]: return self._manager.read_shell( workspace_id, shell_id, cursor=cursor, max_chars=max_chars, ) def write_shell( self, workspace_id: str, shell_id: str, *, input: str, append_newline: bool = True, ) -> dict[str, Any]: return self._manager.write_shell( workspace_id, shell_id, input_text=input, append_newline=append_newline, ) def signal_shell( self, workspace_id: str, shell_id: str, *, signal_name: str = "INT", ) -> dict[str, Any]: return self._manager.signal_shell( workspace_id, shell_id, signal_name=signal_name, ) def close_shell(self, workspace_id: str, shell_id: str) -> dict[str, Any]: return self._manager.close_shell(workspace_id, shell_id) def start_service( self, workspace_id: str, service_name: str, *, command: str, cwd: str = "/workspace", readiness: dict[str, Any] | None = None, ready_timeout_seconds: int = 30, ready_interval_ms: int = 500, secret_env: dict[str, str] | None = None, published_ports: list[dict[str, int | None]] | None = None, ) -> dict[str, Any]: return self._manager.start_service( workspace_id, service_name, command=command, cwd=cwd, readiness=readiness, ready_timeout_seconds=ready_timeout_seconds, ready_interval_ms=ready_interval_ms, secret_env=secret_env, published_ports=published_ports, ) def list_services(self, workspace_id: str) -> dict[str, Any]: return self._manager.list_services(workspace_id) def status_service(self, workspace_id: str, service_name: str) -> dict[str, Any]: return self._manager.status_service(workspace_id, service_name) def logs_service( self, workspace_id: str, service_name: str, *, tail_lines: int = 200, all: bool = False, ) -> dict[str, Any]: return self._manager.logs_service( workspace_id, service_name, tail_lines=None if all else tail_lines, ) def stop_service(self, workspace_id: str, service_name: str) -> dict[str, Any]: return self._manager.stop_service(workspace_id, service_name) def delete_workspace(self, workspace_id: str) -> dict[str, Any]: return self._manager.delete_workspace(workspace_id) def stop_vm(self, vm_id: str) -> dict[str, Any]: return self._manager.stop_vm(vm_id) def delete_vm(self, vm_id: str) -> dict[str, Any]: return self._manager.delete_vm(vm_id) def status_vm(self, vm_id: str) -> dict[str, Any]: return self._manager.status_vm(vm_id) def network_info_vm(self, vm_id: str) -> dict[str, Any]: return self._manager.network_info_vm(vm_id) def reap_expired(self) -> dict[str, Any]: return self._manager.reap_expired() def run_in_vm( self, *, environment: str, command: str, vcpu_count: int = DEFAULT_VCPU_COUNT, mem_mib: int = DEFAULT_MEM_MIB, timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS, ttl_seconds: int = DEFAULT_TTL_SECONDS, network: bool = False, allow_host_compat: bool = DEFAULT_ALLOW_HOST_COMPAT, ) -> dict[str, Any]: return self._manager.run_vm( environment=environment, command=command, vcpu_count=vcpu_count, mem_mib=mem_mib, timeout_seconds=timeout_seconds, ttl_seconds=ttl_seconds, network=network, allow_host_compat=allow_host_compat, ) def create_server(self) -> FastMCP: server = FastMCP(name="pyro_mcp") @server.tool() async def vm_run( environment: str, command: str, vcpu_count: int = DEFAULT_VCPU_COUNT, mem_mib: int = DEFAULT_MEM_MIB, timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS, ttl_seconds: int = DEFAULT_TTL_SECONDS, network: bool = False, allow_host_compat: bool = DEFAULT_ALLOW_HOST_COMPAT, ) -> dict[str, Any]: """Create, start, execute, and clean up an ephemeral VM.""" return self.run_in_vm( environment=environment, command=command, vcpu_count=vcpu_count, mem_mib=mem_mib, timeout_seconds=timeout_seconds, ttl_seconds=ttl_seconds, network=network, allow_host_compat=allow_host_compat, ) @server.tool() async def vm_list_environments() -> list[dict[str, object]]: """List curated Linux environments and installation status.""" return self.list_environments() @server.tool() async def vm_create( environment: str, vcpu_count: int = DEFAULT_VCPU_COUNT, mem_mib: int = DEFAULT_MEM_MIB, ttl_seconds: int = DEFAULT_TTL_SECONDS, network: bool = False, allow_host_compat: bool = DEFAULT_ALLOW_HOST_COMPAT, ) -> dict[str, Any]: """Create an ephemeral VM record with environment and resource sizing.""" return self.create_vm( environment=environment, vcpu_count=vcpu_count, mem_mib=mem_mib, ttl_seconds=ttl_seconds, network=network, allow_host_compat=allow_host_compat, ) @server.tool() async def vm_start(vm_id: str) -> dict[str, Any]: """Start a created VM and transition it into a command-ready state.""" return self.start_vm(vm_id) @server.tool() async def vm_exec(vm_id: str, command: str, timeout_seconds: int = 30) -> dict[str, Any]: """Run one non-interactive command and auto-clean the VM.""" return self.exec_vm(vm_id, command=command, timeout_seconds=timeout_seconds) @server.tool() async def vm_stop(vm_id: str) -> dict[str, Any]: """Stop a running VM.""" return self.stop_vm(vm_id) @server.tool() async def vm_delete(vm_id: str) -> dict[str, Any]: """Delete a VM and its runtime artifacts.""" return self.delete_vm(vm_id) @server.tool() async def vm_status(vm_id: str) -> dict[str, Any]: """Get the current state and metadata for a VM.""" return self.status_vm(vm_id) @server.tool() async def vm_network_info(vm_id: str) -> dict[str, Any]: """Get the current network configuration assigned to a VM.""" return self.network_info_vm(vm_id) @server.tool() async def vm_reap_expired() -> dict[str, Any]: """Delete VMs whose TTL has expired.""" return self.reap_expired() @server.tool() async def workspace_create( environment: str, vcpu_count: int = DEFAULT_VCPU_COUNT, mem_mib: int = DEFAULT_MEM_MIB, ttl_seconds: int = DEFAULT_TTL_SECONDS, network_policy: str = DEFAULT_WORKSPACE_NETWORK_POLICY, allow_host_compat: bool = DEFAULT_ALLOW_HOST_COMPAT, seed_path: str | None = None, secrets: list[dict[str, str]] | None = None, name: str | None = None, labels: dict[str, str] | None = None, ) -> dict[str, Any]: """Create and start a persistent workspace.""" return self.create_workspace( environment=environment, vcpu_count=vcpu_count, mem_mib=mem_mib, ttl_seconds=ttl_seconds, network_policy=network_policy, allow_host_compat=allow_host_compat, seed_path=seed_path, secrets=secrets, name=name, labels=labels, ) @server.tool() async def workspace_list() -> dict[str, Any]: """List persisted workspaces with summary metadata.""" return self.list_workspaces() @server.tool() async def workspace_update( workspace_id: str, name: str | None = None, clear_name: bool = False, labels: dict[str, str] | None = None, clear_labels: list[str] | None = None, ) -> dict[str, Any]: """Update optional workspace name and labels.""" return self.update_workspace( workspace_id, name=name, clear_name=clear_name, labels=labels, clear_labels=clear_labels, ) @server.tool() async def workspace_exec( workspace_id: str, command: str, timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS, secret_env: dict[str, str] | None = None, ) -> dict[str, Any]: """Run one command inside an existing persistent workspace.""" return self.exec_workspace( workspace_id, command=command, timeout_seconds=timeout_seconds, secret_env=secret_env, ) @server.tool() async def workspace_sync_push( workspace_id: str, source_path: str, dest: str = "/workspace", ) -> dict[str, Any]: """Push host content into the persistent `/workspace` of a started workspace.""" return self.push_workspace_sync(workspace_id, source_path=source_path, dest=dest) @server.tool() async def workspace_status(workspace_id: str) -> dict[str, Any]: """Inspect workspace state and latest command metadata.""" return self.status_workspace(workspace_id) @server.tool() async def workspace_stop(workspace_id: str) -> dict[str, Any]: """Stop one persistent workspace without resetting `/workspace`.""" return self.stop_workspace(workspace_id) @server.tool() async def workspace_start(workspace_id: str) -> dict[str, Any]: """Start one stopped persistent workspace without resetting `/workspace`.""" return self.start_workspace(workspace_id) @server.tool() async def workspace_logs(workspace_id: str) -> dict[str, Any]: """Return persisted command history for one workspace.""" return self.logs_workspace(workspace_id) @server.tool() async def workspace_export( workspace_id: str, path: str, output_path: str, ) -> dict[str, Any]: """Export one file or directory from `/workspace` back to the host.""" return self.export_workspace(workspace_id, path, output_path=output_path) @server.tool() async def workspace_diff(workspace_id: str) -> dict[str, Any]: """Compare `/workspace` to the immutable create-time baseline.""" return self.diff_workspace(workspace_id) @server.tool() async def workspace_file_list( workspace_id: str, path: str = "/workspace", recursive: bool = False, ) -> dict[str, Any]: """List metadata for files and directories under one live workspace path.""" return self.list_workspace_files( workspace_id, path=path, recursive=recursive, ) @server.tool() async def workspace_file_read( workspace_id: str, path: str, max_bytes: int = 65536, ) -> dict[str, Any]: """Read one regular text file from a live workspace path.""" return self.read_workspace_file( workspace_id, path, max_bytes=max_bytes, ) @server.tool() async def workspace_file_write( workspace_id: str, path: str, text: str, ) -> dict[str, Any]: """Create or replace one regular text file under `/workspace`.""" return self.write_workspace_file( workspace_id, path, text=text, ) @server.tool() async def workspace_patch_apply( workspace_id: str, patch: str, ) -> dict[str, Any]: """Apply a unified text patch inside one live workspace.""" return self.apply_workspace_patch( workspace_id, patch=patch, ) @server.tool() async def workspace_disk_export( workspace_id: str, output_path: str, ) -> dict[str, Any]: """Export the raw stopped workspace rootfs image to one host path.""" return self.export_workspace_disk(workspace_id, output_path=output_path) @server.tool() async def workspace_disk_list( workspace_id: str, path: str = "/workspace", recursive: bool = False, ) -> dict[str, Any]: """Inspect one stopped workspace rootfs path without booting the guest.""" return self.list_workspace_disk( workspace_id, path=path, recursive=recursive, ) @server.tool() async def workspace_disk_read( workspace_id: str, path: str, max_bytes: int = 65536, ) -> dict[str, Any]: """Read one regular file from a stopped workspace rootfs without booting the guest.""" return self.read_workspace_disk( workspace_id, path, max_bytes=max_bytes, ) @server.tool() async def snapshot_create(workspace_id: str, snapshot_name: str) -> dict[str, Any]: """Create one named workspace snapshot from the current `/workspace` tree.""" return self.create_snapshot(workspace_id, snapshot_name) @server.tool() async def snapshot_list(workspace_id: str) -> dict[str, Any]: """List the baseline plus named snapshots for one workspace.""" return self.list_snapshots(workspace_id) @server.tool() async def snapshot_delete(workspace_id: str, snapshot_name: str) -> dict[str, Any]: """Delete one named snapshot from a workspace.""" return self.delete_snapshot(workspace_id, snapshot_name) @server.tool() async def workspace_reset( workspace_id: str, snapshot: str = "baseline", ) -> dict[str, Any]: """Recreate a workspace and restore `/workspace` from baseline or one named snapshot.""" return self.reset_workspace(workspace_id, snapshot=snapshot) @server.tool() async def shell_open( workspace_id: str, cwd: str = "/workspace", cols: int = 120, rows: int = 30, secret_env: dict[str, str] | None = None, ) -> dict[str, Any]: """Open a persistent interactive shell inside one workspace.""" return self.open_shell( workspace_id, cwd=cwd, cols=cols, rows=rows, secret_env=secret_env, ) @server.tool() async def shell_read( workspace_id: str, shell_id: str, cursor: int = 0, max_chars: int = 65536, ) -> dict[str, Any]: """Read merged PTY output from a workspace shell.""" return self.read_shell( workspace_id, shell_id, cursor=cursor, max_chars=max_chars, ) @server.tool() async def shell_write( workspace_id: str, shell_id: str, input: str, append_newline: bool = True, ) -> dict[str, Any]: """Write text input to a persistent workspace shell.""" return self.write_shell( workspace_id, shell_id, input=input, append_newline=append_newline, ) @server.tool() async def shell_signal( workspace_id: str, shell_id: str, signal_name: str = "INT", ) -> dict[str, Any]: """Send a signal to the shell process group.""" return self.signal_shell( workspace_id, shell_id, signal_name=signal_name, ) @server.tool() async def shell_close(workspace_id: str, shell_id: str) -> dict[str, Any]: """Close a persistent workspace shell.""" return self.close_shell(workspace_id, shell_id) @server.tool() async def service_start( workspace_id: str, service_name: str, command: str, cwd: str = "/workspace", ready_file: str | None = None, ready_tcp: str | None = None, ready_http: str | None = None, ready_command: str | None = None, ready_timeout_seconds: int = 30, ready_interval_ms: int = 500, secret_env: dict[str, str] | None = None, published_ports: list[dict[str, int | None]] | None = None, ) -> dict[str, Any]: """Start a named long-running service inside a workspace.""" readiness: dict[str, Any] | None = None if ready_file is not None: readiness = {"type": "file", "path": ready_file} elif ready_tcp is not None: readiness = {"type": "tcp", "address": ready_tcp} elif ready_http is not None: readiness = {"type": "http", "url": ready_http} elif ready_command is not None: readiness = {"type": "command", "command": ready_command} return self.start_service( workspace_id, service_name, command=command, cwd=cwd, readiness=readiness, ready_timeout_seconds=ready_timeout_seconds, ready_interval_ms=ready_interval_ms, secret_env=secret_env, published_ports=published_ports, ) @server.tool() async def service_list(workspace_id: str) -> dict[str, Any]: """List named services in one workspace.""" return self.list_services(workspace_id) @server.tool() async def service_status(workspace_id: str, service_name: str) -> dict[str, Any]: """Inspect one named workspace service.""" return self.status_service(workspace_id, service_name) @server.tool() async def service_logs( workspace_id: str, service_name: str, tail_lines: int = 200, all: bool = False, ) -> dict[str, Any]: """Read persisted stdout/stderr for one workspace service.""" return self.logs_service( workspace_id, service_name, tail_lines=tail_lines, all=all, ) @server.tool() async def service_stop(workspace_id: str, service_name: str) -> dict[str, Any]: """Stop one running service in a workspace.""" return self.stop_service(workspace_id, service_name) @server.tool() async def workspace_delete(workspace_id: str) -> dict[str, Any]: """Delete a persistent workspace and its backing sandbox.""" return self.delete_workspace(workspace_id) return server