"""Public facade shared by the Python SDK and MCP server.""" from __future__ import annotations from pathlib import Path from typing import Any from mcp.server.fastmcp import FastMCP from pyro_mcp.vm_manager import ( DEFAULT_ALLOW_HOST_COMPAT, DEFAULT_MEM_MIB, DEFAULT_TIMEOUT_SECONDS, DEFAULT_TTL_SECONDS, DEFAULT_VCPU_COUNT, VmManager, ) class Pyro: """High-level facade over the ephemeral VM runtime.""" def __init__( self, manager: VmManager | None = None, *, backend_name: str | None = None, base_dir: Path | None = None, cache_dir: Path | None = None, max_active_vms: int = 4, ) -> None: self._manager = manager or VmManager( backend_name=backend_name, base_dir=base_dir, cache_dir=cache_dir, max_active_vms=max_active_vms, ) @property def manager(self) -> VmManager: return self._manager def list_environments(self) -> list[dict[str, object]]: return self._manager.list_environments() def pull_environment(self, environment: str) -> dict[str, object]: return self._manager.pull_environment(environment) def inspect_environment(self, environment: str) -> dict[str, object]: return self._manager.inspect_environment(environment) def prune_environments(self) -> dict[str, object]: return self._manager.prune_environments() def create_vm( self, *, environment: str, vcpu_count: int = DEFAULT_VCPU_COUNT, mem_mib: int = DEFAULT_MEM_MIB, ttl_seconds: int = DEFAULT_TTL_SECONDS, network: bool = False, allow_host_compat: bool = DEFAULT_ALLOW_HOST_COMPAT, ) -> dict[str, Any]: return self._manager.create_vm( environment=environment, vcpu_count=vcpu_count, mem_mib=mem_mib, ttl_seconds=ttl_seconds, network=network, allow_host_compat=allow_host_compat, ) def start_vm(self, vm_id: str) -> dict[str, Any]: return self._manager.start_vm(vm_id) def exec_vm(self, vm_id: str, *, command: str, timeout_seconds: int = 30) -> dict[str, Any]: return self._manager.exec_vm(vm_id, command=command, timeout_seconds=timeout_seconds) def stop_vm(self, vm_id: str) -> dict[str, Any]: return self._manager.stop_vm(vm_id) def delete_vm(self, vm_id: str) -> dict[str, Any]: return self._manager.delete_vm(vm_id) def status_vm(self, vm_id: str) -> dict[str, Any]: return self._manager.status_vm(vm_id) def network_info_vm(self, vm_id: str) -> dict[str, Any]: return self._manager.network_info_vm(vm_id) def reap_expired(self) -> dict[str, Any]: return self._manager.reap_expired() def run_in_vm( self, *, environment: str, command: str, vcpu_count: int = DEFAULT_VCPU_COUNT, mem_mib: int = DEFAULT_MEM_MIB, timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS, ttl_seconds: int = DEFAULT_TTL_SECONDS, network: bool = False, allow_host_compat: bool = DEFAULT_ALLOW_HOST_COMPAT, ) -> dict[str, Any]: return self._manager.run_vm( environment=environment, command=command, vcpu_count=vcpu_count, mem_mib=mem_mib, timeout_seconds=timeout_seconds, ttl_seconds=ttl_seconds, network=network, allow_host_compat=allow_host_compat, ) def create_server(self) -> FastMCP: server = FastMCP(name="pyro_mcp") @server.tool() async def vm_run( environment: str, command: str, vcpu_count: int = DEFAULT_VCPU_COUNT, mem_mib: int = DEFAULT_MEM_MIB, timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS, ttl_seconds: int = DEFAULT_TTL_SECONDS, network: bool = False, allow_host_compat: bool = DEFAULT_ALLOW_HOST_COMPAT, ) -> dict[str, Any]: """Create, start, execute, and clean up an ephemeral VM.""" return self.run_in_vm( environment=environment, command=command, vcpu_count=vcpu_count, mem_mib=mem_mib, timeout_seconds=timeout_seconds, ttl_seconds=ttl_seconds, network=network, allow_host_compat=allow_host_compat, ) @server.tool() async def vm_list_environments() -> list[dict[str, object]]: """List curated Linux environments and installation status.""" return self.list_environments() @server.tool() async def vm_create( environment: str, vcpu_count: int = DEFAULT_VCPU_COUNT, mem_mib: int = DEFAULT_MEM_MIB, ttl_seconds: int = DEFAULT_TTL_SECONDS, network: bool = False, allow_host_compat: bool = DEFAULT_ALLOW_HOST_COMPAT, ) -> dict[str, Any]: """Create an ephemeral VM record with environment and resource sizing.""" return self.create_vm( environment=environment, vcpu_count=vcpu_count, mem_mib=mem_mib, ttl_seconds=ttl_seconds, network=network, allow_host_compat=allow_host_compat, ) @server.tool() async def vm_start(vm_id: str) -> dict[str, Any]: """Start a created VM and transition it into a command-ready state.""" return self.start_vm(vm_id) @server.tool() async def vm_exec(vm_id: str, command: str, timeout_seconds: int = 30) -> dict[str, Any]: """Run one non-interactive command and auto-clean the VM.""" return self.exec_vm(vm_id, command=command, timeout_seconds=timeout_seconds) @server.tool() async def vm_stop(vm_id: str) -> dict[str, Any]: """Stop a running VM.""" return self.stop_vm(vm_id) @server.tool() async def vm_delete(vm_id: str) -> dict[str, Any]: """Delete a VM and its runtime artifacts.""" return self.delete_vm(vm_id) @server.tool() async def vm_status(vm_id: str) -> dict[str, Any]: """Get the current state and metadata for a VM.""" return self.status_vm(vm_id) @server.tool() async def vm_network_info(vm_id: str) -> dict[str, Any]: """Get the current network configuration assigned to a VM.""" return self.network_info_vm(vm_id) @server.tool() async def vm_reap_expired() -> dict[str, Any]: """Delete VMs whose TTL has expired.""" return self.reap_expired() return server