from __future__ import annotations import asyncio import subprocess import time from pathlib import Path from typing import Any, cast import pytest from pyro_mcp.api import Pyro from pyro_mcp.contract import ( PUBLIC_MCP_COLD_START_MODE_TOOLS, PUBLIC_MCP_INSPECT_MODE_TOOLS, PUBLIC_MCP_REPRO_FIX_MODE_TOOLS, PUBLIC_MCP_REVIEW_EVAL_MODE_TOOLS, PUBLIC_MCP_VM_RUN_PROFILE_TOOLS, PUBLIC_MCP_WORKSPACE_CORE_PROFILE_TOOLS, PUBLIC_MCP_WORKSPACE_FULL_PROFILE_TOOLS, ) from pyro_mcp.vm_manager import VmManager from pyro_mcp.vm_network import TapNetworkManager def _git(repo: Path, *args: str) -> str: result = subprocess.run( # noqa: S603 ["git", "-c", "commit.gpgsign=false", *args], cwd=repo, check=True, capture_output=True, text=True, ) return result.stdout.strip() def _make_repo(root: Path, *, content: str = "hello\n") -> Path: root.mkdir() _git(root, "init") _git(root, "config", "user.name", "Pyro Tests") _git(root, "config", "user.email", "pyro-tests@example.com") (root / "note.txt").write_text(content, encoding="utf-8") _git(root, "add", "note.txt") _git(root, "commit", "-m", "init") return root def test_pyro_run_in_vm_delegates_to_manager(tmp_path: Path) -> None: pyro = Pyro( manager=VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) ) result = pyro.run_in_vm( environment="debian:12-base", command="printf 'ok\\n'", vcpu_count=1, mem_mib=512, timeout_seconds=30, ttl_seconds=600, network=False, allow_host_compat=True, ) assert int(result["exit_code"]) == 0 assert str(result["stdout"]) == "ok\n" def test_pyro_create_server_defaults_to_workspace_core_profile(tmp_path: Path) -> None: pyro = Pyro( manager=VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) ) async def _run() -> tuple[list[str], dict[str, dict[str, Any]]]: server = pyro.create_server() tools = await server.list_tools() tool_map = {tool.name: tool.model_dump() for tool in tools} return sorted(tool_map), tool_map tool_names, tool_map = asyncio.run(_run()) assert tuple(tool_names) == tuple(sorted(PUBLIC_MCP_WORKSPACE_CORE_PROFILE_TOOLS)) create_properties = tool_map["workspace_create"]["inputSchema"]["properties"] assert "network_policy" not in create_properties assert "secrets" not in create_properties exec_properties = tool_map["workspace_exec"]["inputSchema"]["properties"] assert "secret_env" not in exec_properties assert "shell_open" not in tool_map assert "service_start" not in tool_map assert "snapshot_create" not in tool_map assert "workspace_disk_export" not in tool_map def test_pyro_create_server_workspace_full_profile_keeps_shell_read_schema(tmp_path: Path) -> None: pyro = Pyro( manager=VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) ) async def _run() -> tuple[list[str], dict[str, dict[str, Any]]]: server = pyro.create_server(profile="workspace-full") tools = await server.list_tools() tool_map = {tool.name: tool.model_dump() for tool in tools} return sorted(tool_map), tool_map tool_names, tool_map = asyncio.run(_run()) assert tuple(tool_names) == tuple(sorted(PUBLIC_MCP_WORKSPACE_FULL_PROFILE_TOOLS)) shell_read_properties = tool_map["shell_read"]["inputSchema"]["properties"] assert "plain" in shell_read_properties assert "wait_for_idle_ms" in shell_read_properties def test_pyro_create_server_vm_run_profile_registers_only_vm_run(tmp_path: Path) -> None: pyro = Pyro( manager=VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) ) async def _run() -> list[str]: server = pyro.create_server(profile="vm-run") tools = await server.list_tools() return sorted(tool.name for tool in tools) assert tuple(asyncio.run(_run())) == PUBLIC_MCP_VM_RUN_PROFILE_TOOLS def test_pyro_create_server_workspace_core_profile_registers_expected_tools_and_schemas( tmp_path: Path, ) -> None: pyro = Pyro( manager=VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) ) async def _run() -> tuple[list[str], dict[str, dict[str, Any]]]: server = pyro.create_server(profile="workspace-core") tools = await server.list_tools() tool_map = {tool.name: tool.model_dump() for tool in tools} return sorted(tool_map), tool_map tool_names, tool_map = asyncio.run(_run()) assert tuple(tool_names) == tuple(sorted(PUBLIC_MCP_WORKSPACE_CORE_PROFILE_TOOLS)) create_properties = tool_map["workspace_create"]["inputSchema"]["properties"] assert "network_policy" not in create_properties assert "secrets" not in create_properties exec_properties = tool_map["workspace_exec"]["inputSchema"]["properties"] assert "secret_env" not in exec_properties assert "shell_open" not in tool_map assert "service_start" not in tool_map assert "snapshot_create" not in tool_map assert "workspace_disk_export" not in tool_map def test_pyro_create_server_repro_fix_mode_registers_expected_tools_and_schemas( tmp_path: Path, ) -> None: pyro = Pyro( manager=VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) ) async def _run() -> tuple[list[str], dict[str, dict[str, Any]]]: server = pyro.create_server(mode="repro-fix") tools = await server.list_tools() tool_map = {tool.name: tool.model_dump() for tool in tools} return sorted(tool_map), tool_map tool_names, tool_map = asyncio.run(_run()) assert tuple(tool_names) == tuple(sorted(PUBLIC_MCP_REPRO_FIX_MODE_TOOLS)) create_properties = tool_map["workspace_create"]["inputSchema"]["properties"] assert "network_policy" not in create_properties assert "secrets" not in create_properties exec_properties = tool_map["workspace_exec"]["inputSchema"]["properties"] assert "secret_env" not in exec_properties assert "service_start" not in tool_map assert "shell_open" not in tool_map assert "snapshot_create" not in tool_map assert "reproduce a failure" in str(tool_map["workspace_create"]["description"]) def test_pyro_create_server_cold_start_mode_registers_expected_tools_and_schemas( tmp_path: Path, ) -> None: pyro = Pyro( manager=VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) ) async def _run() -> tuple[list[str], dict[str, dict[str, Any]]]: server = pyro.create_server(mode="cold-start") tools = await server.list_tools() tool_map = {tool.name: tool.model_dump() for tool in tools} return sorted(tool_map), tool_map tool_names, tool_map = asyncio.run(_run()) assert tuple(tool_names) == tuple(sorted(PUBLIC_MCP_COLD_START_MODE_TOOLS)) assert "shell_open" not in tool_map assert "snapshot_create" not in tool_map service_start_properties = tool_map["service_start"]["inputSchema"]["properties"] assert "secret_env" not in service_start_properties assert "published_ports" not in service_start_properties create_properties = tool_map["workspace_create"]["inputSchema"]["properties"] assert "network_policy" not in create_properties def test_pyro_create_server_review_eval_mode_registers_expected_tools_and_schemas( tmp_path: Path, ) -> None: pyro = Pyro( manager=VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) ) async def _run() -> tuple[list[str], dict[str, dict[str, Any]]]: server = pyro.create_server(mode="review-eval") tools = await server.list_tools() tool_map = {tool.name: tool.model_dump() for tool in tools} return sorted(tool_map), tool_map tool_names, tool_map = asyncio.run(_run()) assert tuple(tool_names) == tuple(sorted(PUBLIC_MCP_REVIEW_EVAL_MODE_TOOLS)) assert "service_start" not in tool_map assert "shell_open" in tool_map assert "snapshot_create" in tool_map shell_open_properties = tool_map["shell_open"]["inputSchema"]["properties"] assert "secret_env" not in shell_open_properties def test_pyro_create_server_inspect_mode_registers_expected_tools(tmp_path: Path) -> None: pyro = Pyro( manager=VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) ) async def _run() -> list[str]: server = pyro.create_server(mode="inspect") tools = await server.list_tools() return sorted(tool.name for tool in tools) assert tuple(asyncio.run(_run())) == tuple(sorted(PUBLIC_MCP_INSPECT_MODE_TOOLS)) def test_pyro_create_server_rejects_mode_and_non_default_profile(tmp_path: Path) -> None: pyro = Pyro( manager=VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) ) with pytest.raises(ValueError, match="mutually exclusive"): pyro.create_server(profile="workspace-full", mode="repro-fix") def test_pyro_create_server_project_path_updates_workspace_create_description_and_default_seed( tmp_path: Path, ) -> None: repo = _make_repo(tmp_path / "repo", content="project-aware\n") pyro = Pyro( manager=VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) ) def _extract_structured(raw_result: object) -> dict[str, Any]: if not isinstance(raw_result, tuple) or len(raw_result) != 2: raise TypeError("unexpected call_tool result shape") _, structured = raw_result if not isinstance(structured, dict): raise TypeError("expected structured dictionary result") return cast(dict[str, Any], structured) async def _run() -> tuple[dict[str, Any], dict[str, Any], dict[str, Any]]: server = pyro.create_server(project_path=repo) tools = await server.list_tools() tool_map = {tool.name: tool.model_dump() for tool in tools} created = _extract_structured( await server.call_tool( "workspace_create", { "environment": "debian:12-base", "allow_host_compat": True, }, ) ) executed = _extract_structured( await server.call_tool( "workspace_exec", { "workspace_id": created["workspace_id"], "command": "cat note.txt", }, ) ) return tool_map["workspace_create"], created, executed workspace_create_tool, created, executed = asyncio.run(_run()) assert "If `seed_path` is omitted" in str(workspace_create_tool["description"]) assert str(repo.resolve()) in str(workspace_create_tool["description"]) assert created["workspace_seed"]["origin_kind"] == "project_path" assert created["workspace_seed"]["origin_ref"] == str(repo.resolve()) assert executed["stdout"] == "project-aware\n" def test_pyro_vm_run_tool_executes(tmp_path: Path) -> None: pyro = Pyro( manager=VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) ) def _extract_structured(raw_result: object) -> dict[str, Any]: if not isinstance(raw_result, tuple) or len(raw_result) != 2: raise TypeError("unexpected call_tool result shape") _, structured = raw_result if not isinstance(structured, dict): raise TypeError("expected structured dictionary result") return cast(dict[str, Any], structured) async def _run() -> dict[str, Any]: server = pyro.create_server() return _extract_structured( await server.call_tool( "vm_run", { "environment": "debian:12-base", "command": "printf 'ok\\n'", "network": False, "allow_host_compat": True, }, ) ) result = asyncio.run(_run()) assert int(result["exit_code"]) == 0 def test_pyro_create_vm_defaults_sizing_and_host_compat(tmp_path: Path) -> None: pyro = Pyro( manager=VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) ) created = pyro.create_vm( environment="debian:12-base", allow_host_compat=True, ) assert created["vcpu_count"] == 1 assert created["mem_mib"] == 1024 assert created["allow_host_compat"] is True def test_pyro_workspace_network_policy_and_published_ports_delegate() -> None: calls: list[tuple[str, dict[str, Any]]] = [] class StubManager: def create_workspace(self, **kwargs: Any) -> dict[str, Any]: calls.append(("create_workspace", kwargs)) return {"workspace_id": "workspace-123"} def list_workspaces(self) -> dict[str, Any]: calls.append(("list_workspaces", {})) return {"count": 1, "workspaces": [{"workspace_id": "workspace-123"}]} def update_workspace(self, workspace_id: str, **kwargs: Any) -> dict[str, Any]: calls.append(("update_workspace", {"workspace_id": workspace_id, **kwargs})) return {"workspace_id": workspace_id, "name": "repro-fix", "labels": {"owner": "codex"}} def start_service( self, workspace_id: str, service_name: str, **kwargs: Any, ) -> dict[str, Any]: calls.append( ( "start_service", { "workspace_id": workspace_id, "service_name": service_name, **kwargs, }, ) ) return {"workspace_id": workspace_id, "service_name": service_name, "state": "running"} pyro = Pyro(manager=cast(Any, StubManager())) pyro.create_workspace( environment="debian:12", network_policy="egress+published-ports", name="repro-fix", labels={"issue": "123"}, ) pyro.list_workspaces() pyro.update_workspace( "workspace-123", name="repro-fix", labels={"owner": "codex"}, clear_labels=["issue"], ) pyro.start_service( "workspace-123", "web", command="python3 -m http.server 8080", published_ports=[{"guest_port": 8080, "host_port": 18080}], ) assert calls[0] == ( "create_workspace", { "environment": "debian:12", "vcpu_count": 1, "mem_mib": 1024, "ttl_seconds": 600, "network_policy": "egress+published-ports", "allow_host_compat": False, "seed_path": None, "secrets": None, "name": "repro-fix", "labels": {"issue": "123"}, }, ) assert calls[1] == ( "list_workspaces", {}, ) assert calls[2] == ( "update_workspace", { "workspace_id": "workspace-123", "name": "repro-fix", "clear_name": False, "labels": {"owner": "codex"}, "clear_labels": ["issue"], }, ) assert calls[3] == ( "start_service", { "workspace_id": "workspace-123", "service_name": "web", "command": "python3 -m http.server 8080", "cwd": "/workspace", "readiness": None, "ready_timeout_seconds": 30, "ready_interval_ms": 500, "secret_env": None, "published_ports": [{"guest_port": 8080, "host_port": 18080}], }, ) def test_pyro_workspace_methods_delegate_to_manager(tmp_path: Path) -> None: pyro = Pyro( manager=VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) ) source_dir = tmp_path / "seed" source_dir.mkdir() (source_dir / "note.txt").write_text("ok\n", encoding="utf-8") secret_file = tmp_path / "token.txt" secret_file.write_text("from-file\n", encoding="utf-8") created = pyro.create_workspace( environment="debian:12-base", allow_host_compat=True, seed_path=source_dir, name="repro-fix", labels={"issue": "123"}, secrets=[ {"name": "API_TOKEN", "value": "expected"}, {"name": "FILE_TOKEN", "file_path": str(secret_file)}, ], ) workspace_id = str(created["workspace_id"]) listed_before = pyro.list_workspaces() updated_metadata = pyro.update_workspace( workspace_id, labels={"owner": "codex"}, clear_labels=["issue"], ) updated_dir = tmp_path / "updated" updated_dir.mkdir() (updated_dir / "more.txt").write_text("more\n", encoding="utf-8") synced = pyro.push_workspace_sync(workspace_id, updated_dir, dest="subdir") executed = pyro.exec_workspace( workspace_id, command='sh -lc \'printf "%s\\n" "$API_TOKEN"\'', secret_env={"API_TOKEN": "API_TOKEN"}, ) listed_files = pyro.list_workspace_files(workspace_id, path="/workspace", recursive=True) file_read = pyro.read_workspace_file(workspace_id, "note.txt") file_write = pyro.write_workspace_file( workspace_id, "src/app.py", text="print('hello from file op')\n", ) patch_result = pyro.apply_workspace_patch( workspace_id, patch=( "--- a/note.txt\n" "+++ b/note.txt\n" "@@ -1 +1 @@\n" "-ok\n" "+patched\n" ), ) diff_payload = pyro.diff_workspace(workspace_id) snapshot = pyro.create_snapshot(workspace_id, "checkpoint") snapshots = pyro.list_snapshots(workspace_id) export_path = tmp_path / "exported-note.txt" exported = pyro.export_workspace(workspace_id, "note.txt", output_path=export_path) shell = pyro.open_shell( workspace_id, secret_env={"API_TOKEN": "API_TOKEN"}, ) shell_id = str(shell["shell_id"]) pyro.write_shell(workspace_id, shell_id, input='printf "%s\\n" "$API_TOKEN"') shell_output: dict[str, Any] = {} deadline = time.time() + 5 while time.time() < deadline: shell_output = pyro.read_shell(workspace_id, shell_id, cursor=0, max_chars=65536) if "[REDACTED]" in str(shell_output.get("output", "")): break time.sleep(0.05) shell_closed = pyro.close_shell(workspace_id, shell_id) service = pyro.start_service( workspace_id, "app", command=( 'sh -lc \'trap "exit 0" TERM; printf "%s\\n" "$API_TOKEN" >&2; ' 'touch .ready; while true; do sleep 60; done\'' ), readiness={"type": "file", "path": ".ready"}, secret_env={"API_TOKEN": "API_TOKEN"}, ) services = pyro.list_services(workspace_id) service_status = pyro.status_service(workspace_id, "app") service_logs = pyro.logs_service(workspace_id, "app", all=True) summary = pyro.summarize_workspace(workspace_id) reset = pyro.reset_workspace(workspace_id, snapshot="checkpoint") deleted_snapshot = pyro.delete_snapshot(workspace_id, "checkpoint") status = pyro.status_workspace(workspace_id) logs = pyro.logs_workspace(workspace_id) deleted = pyro.delete_workspace(workspace_id) assert created["secrets"] == [ {"name": "API_TOKEN", "source_kind": "literal"}, {"name": "FILE_TOKEN", "source_kind": "file"}, ] assert created["name"] == "repro-fix" assert created["labels"] == {"issue": "123"} assert listed_before["count"] == 1 assert listed_before["workspaces"][0]["name"] == "repro-fix" assert updated_metadata["labels"] == {"owner": "codex"} assert executed["stdout"] == "[REDACTED]\n" assert any(entry["path"] == "/workspace/note.txt" for entry in listed_files["entries"]) assert file_read["content"] == "ok\n" assert file_write["path"] == "/workspace/src/app.py" assert file_write["bytes_written"] == len("print('hello from file op')\n".encode("utf-8")) assert patch_result["changed"] is True assert patch_result["entries"] == [{"path": "/workspace/note.txt", "status": "modified"}] assert created["workspace_seed"]["mode"] == "directory" assert synced["workspace_sync"]["destination"] == "/workspace/subdir" assert diff_payload["changed"] is True assert snapshot["snapshot"]["snapshot_name"] == "checkpoint" assert snapshots["count"] == 2 assert exported["output_path"] == str(export_path) assert export_path.read_text(encoding="utf-8") == "patched\n" assert shell_output["output"].count("[REDACTED]") >= 1 assert shell_closed["closed"] is True assert service["state"] == "running" assert services["count"] == 1 assert service_status["state"] == "running" assert service_logs["stderr"].count("[REDACTED]") >= 1 assert service_logs["tail_lines"] is None assert summary["workspace_id"] == workspace_id assert summary["commands"]["total"] >= 1 assert summary["changes"]["available"] is True assert reset["workspace_reset"]["snapshot_name"] == "checkpoint" assert reset["secrets"] == created["secrets"] assert deleted_snapshot["deleted"] is True assert status["command_count"] == 0 assert status["service_count"] == 0 assert logs["count"] == 0 assert deleted["deleted"] is True def test_pyro_workspace_disk_methods_delegate_to_manager() -> None: calls: list[tuple[str, dict[str, Any]]] = [] class StubManager: def stop_workspace(self, workspace_id: str) -> dict[str, Any]: calls.append(("stop_workspace", {"workspace_id": workspace_id})) return {"workspace_id": workspace_id, "state": "stopped"} def start_workspace(self, workspace_id: str) -> dict[str, Any]: calls.append(("start_workspace", {"workspace_id": workspace_id})) return {"workspace_id": workspace_id, "state": "started"} def export_workspace_disk(self, workspace_id: str, *, output_path: Path) -> dict[str, Any]: calls.append( ( "export_workspace_disk", {"workspace_id": workspace_id, "output_path": str(output_path)}, ) ) return {"workspace_id": workspace_id, "output_path": str(output_path)} def list_workspace_disk( self, workspace_id: str, *, path: str = "/workspace", recursive: bool = False, ) -> dict[str, Any]: calls.append( ( "list_workspace_disk", { "workspace_id": workspace_id, "path": path, "recursive": recursive, }, ) ) return {"workspace_id": workspace_id, "entries": []} def read_workspace_disk( self, workspace_id: str, *, path: str, max_bytes: int = 65536, ) -> dict[str, Any]: calls.append( ( "read_workspace_disk", { "workspace_id": workspace_id, "path": path, "max_bytes": max_bytes, }, ) ) return {"workspace_id": workspace_id, "content": ""} pyro = Pyro(manager=cast(Any, StubManager())) stopped = pyro.stop_workspace("workspace-123") started = pyro.start_workspace("workspace-123") exported = pyro.export_workspace_disk("workspace-123", output_path=Path("/tmp/workspace.ext4")) listed = pyro.list_workspace_disk("workspace-123", path="/workspace/src", recursive=True) read = pyro.read_workspace_disk("workspace-123", "note.txt", max_bytes=4096) assert stopped["state"] == "stopped" assert started["state"] == "started" assert exported["output_path"] == "/tmp/workspace.ext4" assert listed["entries"] == [] assert read["content"] == "" assert calls == [ ("stop_workspace", {"workspace_id": "workspace-123"}), ("start_workspace", {"workspace_id": "workspace-123"}), ( "export_workspace_disk", { "workspace_id": "workspace-123", "output_path": "/tmp/workspace.ext4", }, ), ( "list_workspace_disk", { "workspace_id": "workspace-123", "path": "/workspace/src", "recursive": True, }, ), ( "read_workspace_disk", { "workspace_id": "workspace-123", "path": "note.txt", "max_bytes": 4096, }, ), ] def test_pyro_create_server_workspace_disk_tools_delegate() -> None: calls: list[tuple[str, dict[str, Any]]] = [] class StubManager: def stop_workspace(self, workspace_id: str) -> dict[str, Any]: calls.append(("stop_workspace", {"workspace_id": workspace_id})) return {"workspace_id": workspace_id, "state": "stopped"} def start_workspace(self, workspace_id: str) -> dict[str, Any]: calls.append(("start_workspace", {"workspace_id": workspace_id})) return {"workspace_id": workspace_id, "state": "started"} def export_workspace_disk(self, workspace_id: str, *, output_path: str) -> dict[str, Any]: calls.append( ( "export_workspace_disk", {"workspace_id": workspace_id, "output_path": output_path}, ) ) return {"workspace_id": workspace_id, "output_path": output_path} def list_workspace_disk( self, workspace_id: str, *, path: str = "/workspace", recursive: bool = False, ) -> dict[str, Any]: calls.append( ( "list_workspace_disk", { "workspace_id": workspace_id, "path": path, "recursive": recursive, }, ) ) return {"workspace_id": workspace_id, "entries": []} def read_workspace_disk( self, workspace_id: str, *, path: str, max_bytes: int = 65536, ) -> dict[str, Any]: calls.append( ( "read_workspace_disk", { "workspace_id": workspace_id, "path": path, "max_bytes": max_bytes, }, ) ) return {"workspace_id": workspace_id, "content": ""} pyro = Pyro(manager=cast(Any, StubManager())) def _extract_structured(raw_result: object) -> dict[str, Any]: if not isinstance(raw_result, tuple) or len(raw_result) != 2: raise TypeError("unexpected call_tool result shape") _, structured = raw_result if not isinstance(structured, dict): raise TypeError("expected structured dictionary result") return cast(dict[str, Any], structured) async def _run() -> tuple[dict[str, Any], ...]: server = pyro.create_server(profile="workspace-full") stopped = _extract_structured( await server.call_tool("workspace_stop", {"workspace_id": "workspace-123"}) ) started = _extract_structured( await server.call_tool("workspace_start", {"workspace_id": "workspace-123"}) ) exported = _extract_structured( await server.call_tool( "workspace_disk_export", { "workspace_id": "workspace-123", "output_path": "/tmp/workspace.ext4", }, ) ) listed = _extract_structured( await server.call_tool( "workspace_disk_list", { "workspace_id": "workspace-123", "path": "/workspace/src", "recursive": True, }, ) ) read = _extract_structured( await server.call_tool( "workspace_disk_read", { "workspace_id": "workspace-123", "path": "note.txt", "max_bytes": 4096, }, ) ) return stopped, started, exported, listed, read stopped, started, exported, listed, read = asyncio.run(_run()) assert stopped["state"] == "stopped" assert started["state"] == "started" assert exported["output_path"] == "/tmp/workspace.ext4" assert listed["entries"] == [] assert read["content"] == "" assert calls == [ ("stop_workspace", {"workspace_id": "workspace-123"}), ("start_workspace", {"workspace_id": "workspace-123"}), ( "export_workspace_disk", { "workspace_id": "workspace-123", "output_path": "/tmp/workspace.ext4", }, ), ( "list_workspace_disk", { "workspace_id": "workspace-123", "path": "/workspace/src", "recursive": True, }, ), ( "read_workspace_disk", { "workspace_id": "workspace-123", "path": "note.txt", "max_bytes": 4096, }, ), ] def test_pyro_workspace_file_methods_delegate_to_manager() -> None: calls: list[tuple[str, dict[str, Any]]] = [] class StubManager: def list_workspace_files( self, workspace_id: str, *, path: str = "/workspace", recursive: bool = False, ) -> dict[str, Any]: calls.append( ( "list_workspace_files", { "workspace_id": workspace_id, "path": path, "recursive": recursive, }, ) ) return {"workspace_id": workspace_id, "entries": []} def read_workspace_file( self, workspace_id: str, path: str, *, max_bytes: int = 65536, ) -> dict[str, Any]: calls.append( ( "read_workspace_file", { "workspace_id": workspace_id, "path": path, "max_bytes": max_bytes, }, ) ) return {"workspace_id": workspace_id, "content": "hello\n"} def write_workspace_file( self, workspace_id: str, path: str, *, text: str, ) -> dict[str, Any]: calls.append( ( "write_workspace_file", { "workspace_id": workspace_id, "path": path, "text": text, }, ) ) return {"workspace_id": workspace_id, "bytes_written": len(text.encode("utf-8"))} def apply_workspace_patch( self, workspace_id: str, *, patch: str, ) -> dict[str, Any]: calls.append( ( "apply_workspace_patch", { "workspace_id": workspace_id, "patch": patch, }, ) ) return {"workspace_id": workspace_id, "changed": True} pyro = Pyro(manager=cast(Any, StubManager())) listed = pyro.list_workspace_files("workspace-123", path="/workspace/src", recursive=True) read = pyro.read_workspace_file("workspace-123", "note.txt", max_bytes=4096) written = pyro.write_workspace_file("workspace-123", "src/app.py", text="print('hi')\n") patched = pyro.apply_workspace_patch( "workspace-123", patch="--- a/note.txt\n+++ b/note.txt\n@@ -1 +1 @@\n-old\n+new\n", ) assert listed["entries"] == [] assert read["content"] == "hello\n" assert written["bytes_written"] == len("print('hi')\n".encode("utf-8")) assert patched["changed"] is True assert calls == [ ( "list_workspace_files", { "workspace_id": "workspace-123", "path": "/workspace/src", "recursive": True, }, ), ( "read_workspace_file", { "workspace_id": "workspace-123", "path": "note.txt", "max_bytes": 4096, }, ), ( "write_workspace_file", { "workspace_id": "workspace-123", "path": "src/app.py", "text": "print('hi')\n", }, ), ( "apply_workspace_patch", { "workspace_id": "workspace-123", "patch": "--- a/note.txt\n+++ b/note.txt\n@@ -1 +1 @@\n-old\n+new\n", }, ), ] def test_pyro_create_server_workspace_file_tools_delegate() -> None: calls: list[tuple[str, dict[str, Any]]] = [] class StubManager: def list_workspace_files( self, workspace_id: str, *, path: str = "/workspace", recursive: bool = False, ) -> dict[str, Any]: calls.append( ( "list_workspace_files", { "workspace_id": workspace_id, "path": path, "recursive": recursive, }, ) ) return {"workspace_id": workspace_id, "entries": []} def read_workspace_file( self, workspace_id: str, path: str, *, max_bytes: int = 65536, ) -> dict[str, Any]: calls.append( ( "read_workspace_file", { "workspace_id": workspace_id, "path": path, "max_bytes": max_bytes, }, ) ) return {"workspace_id": workspace_id, "content": "hello\n"} def write_workspace_file( self, workspace_id: str, path: str, *, text: str, ) -> dict[str, Any]: calls.append( ( "write_workspace_file", { "workspace_id": workspace_id, "path": path, "text": text, }, ) ) return {"workspace_id": workspace_id, "bytes_written": len(text.encode("utf-8"))} def apply_workspace_patch( self, workspace_id: str, *, patch: str, ) -> dict[str, Any]: calls.append( ( "apply_workspace_patch", { "workspace_id": workspace_id, "patch": patch, }, ) ) return {"workspace_id": workspace_id, "changed": True} pyro = Pyro(manager=cast(Any, StubManager())) def _extract_structured(raw_result: object) -> dict[str, Any]: if not isinstance(raw_result, tuple) or len(raw_result) != 2: raise TypeError("unexpected call_tool result shape") _, structured = raw_result if not isinstance(structured, dict): raise TypeError("expected structured dictionary result") return cast(dict[str, Any], structured) async def _run() -> tuple[dict[str, Any], ...]: server = pyro.create_server() listed = _extract_structured( await server.call_tool( "workspace_file_list", { "workspace_id": "workspace-123", "path": "/workspace/src", "recursive": True, }, ) ) read = _extract_structured( await server.call_tool( "workspace_file_read", { "workspace_id": "workspace-123", "path": "note.txt", "max_bytes": 4096, }, ) ) written = _extract_structured( await server.call_tool( "workspace_file_write", { "workspace_id": "workspace-123", "path": "src/app.py", "text": "print('hi')\n", }, ) ) patched = _extract_structured( await server.call_tool( "workspace_patch_apply", { "workspace_id": "workspace-123", "patch": "--- a/note.txt\n+++ b/note.txt\n@@ -1 +1 @@\n-old\n+new\n", }, ) ) return listed, read, written, patched listed, read, written, patched = asyncio.run(_run()) assert listed["entries"] == [] assert read["content"] == "hello\n" assert written["bytes_written"] == len("print('hi')\n".encode("utf-8")) assert patched["changed"] is True assert calls == [ ( "list_workspace_files", { "workspace_id": "workspace-123", "path": "/workspace/src", "recursive": True, }, ), ( "read_workspace_file", { "workspace_id": "workspace-123", "path": "note.txt", "max_bytes": 4096, }, ), ( "write_workspace_file", { "workspace_id": "workspace-123", "path": "src/app.py", "text": "print('hi')\n", }, ), ( "apply_workspace_patch", { "workspace_id": "workspace-123", "patch": "--- a/note.txt\n+++ b/note.txt\n@@ -1 +1 @@\n-old\n+new\n", }, ), ] def test_pyro_create_server_workspace_status_shell_and_service_delegate() -> None: calls: list[tuple[str, dict[str, Any]]] = [] class StubManager: def status_workspace(self, workspace_id: str) -> dict[str, Any]: calls.append(("status_workspace", {"workspace_id": workspace_id})) return {"workspace_id": workspace_id, "state": "started"} def logs_workspace(self, workspace_id: str) -> dict[str, Any]: calls.append(("logs_workspace", {"workspace_id": workspace_id})) return {"workspace_id": workspace_id, "count": 0, "entries": []} def summarize_workspace(self, workspace_id: str) -> dict[str, Any]: calls.append(("summarize_workspace", {"workspace_id": workspace_id})) return { "workspace_id": workspace_id, "state": "started", "changes": {"available": True, "changed": False, "summary": None, "entries": []}, } def open_shell( self, workspace_id: str, *, cwd: str = "/workspace", cols: int = 120, rows: int = 30, secret_env: dict[str, str] | None = None, ) -> dict[str, Any]: calls.append( ( "open_shell", { "workspace_id": workspace_id, "cwd": cwd, "cols": cols, "rows": rows, "secret_env": secret_env, }, ) ) return {"workspace_id": workspace_id, "shell_id": "shell-1", "state": "running"} def read_shell( self, workspace_id: str, shell_id: str, *, cursor: int = 0, max_chars: int = 65536, plain: bool = False, wait_for_idle_ms: int | None = None, ) -> dict[str, Any]: calls.append( ( "read_shell", { "workspace_id": workspace_id, "shell_id": shell_id, "cursor": cursor, "max_chars": max_chars, "plain": plain, "wait_for_idle_ms": wait_for_idle_ms, }, ) ) return {"workspace_id": workspace_id, "shell_id": shell_id, "output": ""} def write_shell( self, workspace_id: str, shell_id: str, *, input_text: str, append_newline: bool = True, ) -> dict[str, Any]: calls.append( ( "write_shell", { "workspace_id": workspace_id, "shell_id": shell_id, "input_text": input_text, "append_newline": append_newline, }, ) ) return { "workspace_id": workspace_id, "shell_id": shell_id, "input_length": len(input_text), } def signal_shell( self, workspace_id: str, shell_id: str, *, signal_name: str = "INT", ) -> dict[str, Any]: calls.append( ( "signal_shell", { "workspace_id": workspace_id, "shell_id": shell_id, "signal_name": signal_name, }, ) ) return {"workspace_id": workspace_id, "shell_id": shell_id, "signal": signal_name} def close_shell(self, workspace_id: str, shell_id: str) -> dict[str, Any]: calls.append( ("close_shell", {"workspace_id": workspace_id, "shell_id": shell_id}) ) return {"workspace_id": workspace_id, "shell_id": shell_id, "closed": True} def start_service( self, workspace_id: str, service_name: str, **kwargs: Any, ) -> dict[str, Any]: calls.append( ( "start_service", { "workspace_id": workspace_id, "service_name": service_name, **kwargs, }, ) ) return {"workspace_id": workspace_id, "service_name": service_name, "state": "running"} pyro = Pyro(manager=cast(Any, StubManager())) def _extract_structured(raw_result: object) -> dict[str, Any]: if not isinstance(raw_result, tuple) or len(raw_result) != 2: raise TypeError("unexpected call_tool result shape") _, structured = raw_result if not isinstance(structured, dict): raise TypeError("expected structured dictionary result") return cast(dict[str, Any], structured) async def _run() -> tuple[dict[str, Any], ...]: server = pyro.create_server(profile="workspace-full") status = _extract_structured( await server.call_tool("workspace_status", {"workspace_id": "workspace-123"}) ) summary = _extract_structured( await server.call_tool("workspace_summary", {"workspace_id": "workspace-123"}) ) logs = _extract_structured( await server.call_tool("workspace_logs", {"workspace_id": "workspace-123"}) ) opened = _extract_structured( await server.call_tool( "shell_open", { "workspace_id": "workspace-123", "cwd": "/workspace/src", "cols": 100, "rows": 20, "secret_env": {"TOKEN": "API_TOKEN"}, }, ) ) read = _extract_structured( await server.call_tool( "shell_read", { "workspace_id": "workspace-123", "shell_id": "shell-1", "cursor": 5, "max_chars": 1024, "plain": True, "wait_for_idle_ms": 300, }, ) ) wrote = _extract_structured( await server.call_tool( "shell_write", { "workspace_id": "workspace-123", "shell_id": "shell-1", "input": "pwd", "append_newline": False, }, ) ) signaled = _extract_structured( await server.call_tool( "shell_signal", { "workspace_id": "workspace-123", "shell_id": "shell-1", "signal_name": "TERM", }, ) ) closed = _extract_structured( await server.call_tool( "shell_close", {"workspace_id": "workspace-123", "shell_id": "shell-1"}, ) ) file_service = _extract_structured( await server.call_tool( "service_start", { "workspace_id": "workspace-123", "service_name": "file", "command": "run-file", "ready_file": ".ready", }, ) ) tcp_service = _extract_structured( await server.call_tool( "service_start", { "workspace_id": "workspace-123", "service_name": "tcp", "command": "run-tcp", "ready_tcp": "127.0.0.1:8080", }, ) ) http_service = _extract_structured( await server.call_tool( "service_start", { "workspace_id": "workspace-123", "service_name": "http", "command": "run-http", "ready_http": "http://127.0.0.1:8080/", }, ) ) command_service = _extract_structured( await server.call_tool( "service_start", { "workspace_id": "workspace-123", "service_name": "command", "command": "run-command", "ready_command": "test -f .ready", }, ) ) return ( status, summary, logs, opened, read, wrote, signaled, closed, file_service, tcp_service, http_service, command_service, ) results = asyncio.run(_run()) assert results[0]["state"] == "started" assert results[1]["workspace_id"] == "workspace-123" assert results[2]["count"] == 0 assert results[3]["shell_id"] == "shell-1" assert results[7]["closed"] is True assert results[8]["state"] == "running" assert results[11]["state"] == "running" assert calls == [ ("status_workspace", {"workspace_id": "workspace-123"}), ("summarize_workspace", {"workspace_id": "workspace-123"}), ("logs_workspace", {"workspace_id": "workspace-123"}), ( "open_shell", { "workspace_id": "workspace-123", "cwd": "/workspace/src", "cols": 100, "rows": 20, "secret_env": {"TOKEN": "API_TOKEN"}, }, ), ( "read_shell", { "workspace_id": "workspace-123", "shell_id": "shell-1", "cursor": 5, "max_chars": 1024, "plain": True, "wait_for_idle_ms": 300, }, ), ( "write_shell", { "workspace_id": "workspace-123", "shell_id": "shell-1", "input_text": "pwd", "append_newline": False, }, ), ( "signal_shell", { "workspace_id": "workspace-123", "shell_id": "shell-1", "signal_name": "TERM", }, ), ("close_shell", {"workspace_id": "workspace-123", "shell_id": "shell-1"}), ( "start_service", { "workspace_id": "workspace-123", "service_name": "file", "command": "run-file", "cwd": "/workspace", "readiness": {"type": "file", "path": ".ready"}, "ready_timeout_seconds": 30, "ready_interval_ms": 500, "secret_env": None, "published_ports": None, }, ), ( "start_service", { "workspace_id": "workspace-123", "service_name": "tcp", "command": "run-tcp", "cwd": "/workspace", "readiness": {"type": "tcp", "address": "127.0.0.1:8080"}, "ready_timeout_seconds": 30, "ready_interval_ms": 500, "secret_env": None, "published_ports": None, }, ), ( "start_service", { "workspace_id": "workspace-123", "service_name": "http", "command": "run-http", "cwd": "/workspace", "readiness": {"type": "http", "url": "http://127.0.0.1:8080/"}, "ready_timeout_seconds": 30, "ready_interval_ms": 500, "secret_env": None, "published_ports": None, }, ), ( "start_service", { "workspace_id": "workspace-123", "service_name": "command", "command": "run-command", "cwd": "/workspace", "readiness": {"type": "command", "command": "test -f .ready"}, "ready_timeout_seconds": 30, "ready_interval_ms": 500, "secret_env": None, "published_ports": None, }, ), ]