from __future__ import annotations import asyncio import subprocess from pathlib import Path from typing import Any, cast import pytest import pyro_mcp.server as server_module from pyro_mcp.contract import ( PUBLIC_MCP_COLD_START_MODE_TOOLS, PUBLIC_MCP_REPRO_FIX_MODE_TOOLS, PUBLIC_MCP_VM_RUN_PROFILE_TOOLS, PUBLIC_MCP_WORKSPACE_CORE_PROFILE_TOOLS, ) from pyro_mcp.server import create_server from pyro_mcp.vm_manager import VmManager from pyro_mcp.vm_network import TapNetworkManager def _git(repo: Path, *args: str) -> str: result = subprocess.run( # noqa: S603 ["git", "-c", "commit.gpgsign=false", *args], cwd=repo, check=True, capture_output=True, text=True, ) return result.stdout.strip() def _make_repo(root: Path, *, content: str = "hello\n") -> Path: root.mkdir() _git(root, "init") _git(root, "config", "user.name", "Pyro Tests") _git(root, "config", "user.email", "pyro-tests@example.com") (root / "note.txt").write_text(content, encoding="utf-8") _git(root, "add", "note.txt") _git(root, "commit", "-m", "init") return root def test_create_server_registers_vm_tools(tmp_path: Path) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) async def _run() -> list[str]: server = create_server(manager=manager) tools = await server.list_tools() return sorted(tool.name for tool in tools) tool_names = asyncio.run(_run()) assert tuple(tool_names) == tuple(sorted(PUBLIC_MCP_WORKSPACE_CORE_PROFILE_TOOLS)) def test_create_server_vm_run_profile_registers_only_vm_run(tmp_path: Path) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) async def _run() -> list[str]: server = create_server(manager=manager, profile="vm-run") tools = await server.list_tools() return sorted(tool.name for tool in tools) assert tuple(asyncio.run(_run())) == PUBLIC_MCP_VM_RUN_PROFILE_TOOLS def test_create_server_workspace_core_profile_registers_expected_tools(tmp_path: Path) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) async def _run() -> list[str]: server = create_server(manager=manager, profile="workspace-core") tools = await server.list_tools() return sorted(tool.name for tool in tools) assert tuple(asyncio.run(_run())) == tuple(sorted(PUBLIC_MCP_WORKSPACE_CORE_PROFILE_TOOLS)) def test_create_server_repro_fix_mode_registers_expected_tools(tmp_path: Path) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) async def _run() -> list[str]: server = create_server(manager=manager, mode="repro-fix") tools = await server.list_tools() return sorted(tool.name for tool in tools) assert tuple(asyncio.run(_run())) == tuple(sorted(PUBLIC_MCP_REPRO_FIX_MODE_TOOLS)) def test_create_server_cold_start_mode_registers_expected_tools(tmp_path: Path) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) async def _run() -> list[str]: server = create_server(manager=manager, mode="cold-start") tools = await server.list_tools() return sorted(tool.name for tool in tools) assert tuple(asyncio.run(_run())) == tuple(sorted(PUBLIC_MCP_COLD_START_MODE_TOOLS)) def test_create_server_workspace_create_description_mentions_project_source(tmp_path: Path) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) repo = _make_repo(tmp_path / "repo") async def _run() -> dict[str, Any]: server = create_server(manager=manager, project_path=repo) tools = await server.list_tools() tool_map = {tool.name: tool.model_dump() for tool in tools} return tool_map["workspace_create"] workspace_create = asyncio.run(_run()) description = cast(str, workspace_create["description"]) assert "If `seed_path` is omitted" in description assert str(repo.resolve()) in description def test_create_server_project_path_seeds_workspace_when_seed_path_is_omitted( tmp_path: Path, ) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) repo = _make_repo(tmp_path / "repo", content="project-aware\n") def _extract_structured(raw_result: object) -> dict[str, Any]: if not isinstance(raw_result, tuple) or len(raw_result) != 2: raise TypeError("unexpected call_tool result shape") _, structured = raw_result if not isinstance(structured, dict): raise TypeError("expected structured dictionary result") return cast(dict[str, Any], structured) async def _run() -> tuple[dict[str, Any], dict[str, Any]]: server = create_server(manager=manager, project_path=repo) created = _extract_structured( await server.call_tool( "workspace_create", { "environment": "debian:12-base", "allow_host_compat": True, }, ) ) executed = _extract_structured( await server.call_tool( "workspace_exec", { "workspace_id": created["workspace_id"], "command": "cat note.txt", }, ) ) return created, executed created, executed = asyncio.run(_run()) assert created["workspace_seed"]["mode"] == "directory" assert created["workspace_seed"]["seed_path"] == str(repo.resolve()) assert created["workspace_seed"]["origin_kind"] == "project_path" assert created["workspace_seed"]["origin_ref"] == str(repo.resolve()) assert executed["stdout"] == "project-aware\n" def test_create_server_repo_url_seeds_workspace_when_seed_path_is_omitted(tmp_path: Path) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) repo = _make_repo(tmp_path / "repo", content="committed\n") (repo / "note.txt").write_text("dirty\n", encoding="utf-8") def _extract_structured(raw_result: object) -> dict[str, Any]: if not isinstance(raw_result, tuple) or len(raw_result) != 2: raise TypeError("unexpected call_tool result shape") _, structured = raw_result if not isinstance(structured, dict): raise TypeError("expected structured dictionary result") return cast(dict[str, Any], structured) async def _run() -> tuple[dict[str, Any], dict[str, Any]]: server = create_server(manager=manager, repo_url=str(repo.resolve())) created = _extract_structured( await server.call_tool( "workspace_create", { "environment": "debian:12-base", "allow_host_compat": True, }, ) ) executed = _extract_structured( await server.call_tool( "workspace_exec", { "workspace_id": created["workspace_id"], "command": "cat note.txt", }, ) ) return created, executed created, executed = asyncio.run(_run()) assert created["workspace_seed"]["mode"] == "directory" assert created["workspace_seed"]["seed_path"] is None assert created["workspace_seed"]["origin_kind"] == "repo_url" assert created["workspace_seed"]["origin_ref"] == str(repo.resolve()) assert executed["stdout"] == "committed\n" def test_vm_run_round_trip(tmp_path: Path) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) def _extract_structured(raw_result: object) -> dict[str, Any]: if not isinstance(raw_result, tuple) or len(raw_result) != 2: raise TypeError("unexpected call_tool result shape") _, structured = raw_result if not isinstance(structured, dict): raise TypeError("expected structured dictionary result") return cast(dict[str, Any], structured) async def _run() -> dict[str, Any]: server = create_server(manager=manager) executed = _extract_structured( await server.call_tool( "vm_run", { "environment": "debian:12", "command": "printf 'git version 2.0\\n'", "ttl_seconds": 600, "network": False, "allow_host_compat": True, }, ) ) return executed executed = asyncio.run(_run()) assert int(executed["exit_code"]) == 0 assert "git version" in str(executed["stdout"]) def test_vm_tools_status_stop_delete_and_reap(tmp_path: Path) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) manager.MIN_TTL_SECONDS = 1 def _extract_structured(raw_result: object) -> dict[str, Any]: if not isinstance(raw_result, tuple) or len(raw_result) != 2: raise TypeError("unexpected call_tool result shape") _, structured = raw_result if not isinstance(structured, dict): raise TypeError("expected structured dictionary result") return cast(dict[str, Any], structured) async def _run() -> tuple[ dict[str, Any], dict[str, Any], dict[str, Any], dict[str, Any], list[dict[str, object]], dict[str, Any], ]: server = create_server(manager=manager, profile="workspace-full") environments_raw = await server.call_tool("vm_list_environments", {}) if not isinstance(environments_raw, tuple) or len(environments_raw) != 2: raise TypeError("unexpected environments result") _, environments_structured = environments_raw if not isinstance(environments_structured, dict): raise TypeError("environments tool should return a dictionary") raw_environments = environments_structured.get("result") if not isinstance(raw_environments, list): raise TypeError("environments tool did not contain a result list") created = _extract_structured( await server.call_tool( "vm_create", { "environment": "debian:12-base", "ttl_seconds": 600, "allow_host_compat": True, }, ) ) vm_id = str(created["vm_id"]) await server.call_tool("vm_start", {"vm_id": vm_id}) status = _extract_structured(await server.call_tool("vm_status", {"vm_id": vm_id})) network = _extract_structured(await server.call_tool("vm_network_info", {"vm_id": vm_id})) stopped = _extract_structured(await server.call_tool("vm_stop", {"vm_id": vm_id})) deleted = _extract_structured(await server.call_tool("vm_delete", {"vm_id": vm_id})) expiring = _extract_structured( await server.call_tool( "vm_create", { "environment": "debian:12-base", "ttl_seconds": 1, "allow_host_compat": True, }, ) ) expiring_id = str(expiring["vm_id"]) manager._instances[expiring_id].expires_at = 0.0 # noqa: SLF001 reaped = _extract_structured(await server.call_tool("vm_reap_expired", {})) return ( status, network, stopped, deleted, cast(list[dict[str, object]], raw_environments), reaped, ) status, network, stopped, deleted, environments, reaped = asyncio.run(_run()) assert status["state"] == "started" assert network["network_enabled"] is False assert stopped["state"] == "stopped" assert bool(deleted["deleted"]) is True assert environments[0]["name"] == "debian:12" assert int(reaped["count"]) == 1 def test_server_main_runs_stdio_transport(monkeypatch: pytest.MonkeyPatch) -> None: called: dict[str, str] = {} class StubServer: def run(self, transport: str) -> None: called["transport"] = transport monkeypatch.setattr(server_module, "create_server", lambda: StubServer()) server_module.main() assert called == {"transport": "stdio"} def test_workspace_core_profile_round_trip(tmp_path: Path) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) source_dir = tmp_path / "seed" source_dir.mkdir() (source_dir / "note.txt").write_text("old\n", encoding="utf-8") def _extract_structured(raw_result: object) -> dict[str, Any]: if not isinstance(raw_result, tuple) or len(raw_result) != 2: raise TypeError("unexpected call_tool result shape") _, structured = raw_result if not isinstance(structured, dict): raise TypeError("expected structured dictionary result") return cast(dict[str, Any], structured) async def _run() -> tuple[dict[str, Any], ...]: server = create_server(manager=manager, profile="workspace-core") created = _extract_structured( await server.call_tool( "workspace_create", { "environment": "debian:12-base", "allow_host_compat": True, "seed_path": str(source_dir), "name": "chat-loop", "labels": {"issue": "123"}, }, ) ) workspace_id = str(created["workspace_id"]) written = _extract_structured( await server.call_tool( "workspace_file_write", { "workspace_id": workspace_id, "path": "note.txt", "text": "fixed\n", }, ) ) executed = _extract_structured( await server.call_tool( "workspace_exec", { "workspace_id": workspace_id, "command": "cat note.txt", }, ) ) diffed = _extract_structured( await server.call_tool("workspace_diff", {"workspace_id": workspace_id}) ) export_path = tmp_path / "exported-note.txt" exported = _extract_structured( await server.call_tool( "workspace_export", { "workspace_id": workspace_id, "path": "note.txt", "output_path": str(export_path), }, ) ) reset = _extract_structured( await server.call_tool("workspace_reset", {"workspace_id": workspace_id}) ) deleted = _extract_structured( await server.call_tool("workspace_delete", {"workspace_id": workspace_id}) ) return created, written, executed, diffed, exported, reset, deleted created, written, executed, diffed, exported, reset, deleted = asyncio.run(_run()) assert created["name"] == "chat-loop" assert created["labels"] == {"issue": "123"} assert written["bytes_written"] == len("fixed\n".encode("utf-8")) assert executed["stdout"] == "fixed\n" assert diffed["changed"] is True assert Path(str(exported["output_path"])).read_text(encoding="utf-8") == "fixed\n" assert reset["command_count"] == 0 assert deleted["deleted"] is True def test_workspace_tools_round_trip(tmp_path: Path) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) source_dir = tmp_path / "seed" source_dir.mkdir() (source_dir / "note.txt").write_text("ok\n", encoding="utf-8") secret_file = tmp_path / "token.txt" secret_file.write_text("from-file\n", encoding="utf-8") def _extract_structured(raw_result: object) -> dict[str, Any]: if not isinstance(raw_result, tuple) or len(raw_result) != 2: raise TypeError("unexpected call_tool result shape") _, structured = raw_result if not isinstance(structured, dict): raise TypeError("expected structured dictionary result") return cast(dict[str, Any], structured) async def _run() -> tuple[dict[str, Any], ...]: server = create_server(manager=manager, profile="workspace-full") created = _extract_structured( await server.call_tool( "workspace_create", { "environment": "debian:12-base", "allow_host_compat": True, "seed_path": str(source_dir), "name": "repro-fix", "labels": {"issue": "123"}, "secrets": [ {"name": "API_TOKEN", "value": "expected"}, {"name": "FILE_TOKEN", "file_path": str(secret_file)}, ], }, ) ) workspace_id = str(created["workspace_id"]) listed_before = _extract_structured(await server.call_tool("workspace_list", {})) updated = _extract_structured( await server.call_tool( "workspace_update", { "workspace_id": workspace_id, "labels": {"owner": "codex"}, "clear_labels": ["issue"], }, ) ) update_dir = tmp_path / "update" update_dir.mkdir() (update_dir / "more.txt").write_text("more\n", encoding="utf-8") synced = _extract_structured( await server.call_tool( "workspace_sync_push", { "workspace_id": workspace_id, "source_path": str(update_dir), "dest": "subdir", }, ) ) executed = _extract_structured( await server.call_tool( "workspace_exec", { "workspace_id": workspace_id, "command": 'sh -lc \'printf "%s\\n" "$API_TOKEN"\'', "secret_env": {"API_TOKEN": "API_TOKEN"}, }, ) ) listed_files = _extract_structured( await server.call_tool( "workspace_file_list", { "workspace_id": workspace_id, "path": "/workspace", "recursive": True, }, ) ) file_read = _extract_structured( await server.call_tool( "workspace_file_read", { "workspace_id": workspace_id, "path": "note.txt", "max_bytes": 4096, }, ) ) file_written = _extract_structured( await server.call_tool( "workspace_file_write", { "workspace_id": workspace_id, "path": "src/app.py", "text": "print('hello from file op')\n", }, ) ) patched = _extract_structured( await server.call_tool( "workspace_patch_apply", { "workspace_id": workspace_id, "patch": ( "--- a/note.txt\n" "+++ b/note.txt\n" "@@ -1 +1 @@\n" "-ok\n" "+patched\n" ), }, ) ) diffed = _extract_structured( await server.call_tool("workspace_diff", {"workspace_id": workspace_id}) ) snapshot = _extract_structured( await server.call_tool( "snapshot_create", {"workspace_id": workspace_id, "snapshot_name": "checkpoint"}, ) ) snapshots = _extract_structured( await server.call_tool("snapshot_list", {"workspace_id": workspace_id}) ) export_path = tmp_path / "exported-more.txt" exported = _extract_structured( await server.call_tool( "workspace_export", { "workspace_id": workspace_id, "path": "subdir/more.txt", "output_path": str(export_path), }, ) ) service = _extract_structured( await server.call_tool( "service_start", { "workspace_id": workspace_id, "service_name": "app", "command": ( 'sh -lc \'trap "exit 0" TERM; printf "%s\\n" "$API_TOKEN" >&2; ' 'touch .ready; while true; do sleep 60; done\'' ), "ready_file": ".ready", "secret_env": {"API_TOKEN": "API_TOKEN"}, }, ) ) services = _extract_structured( await server.call_tool("service_list", {"workspace_id": workspace_id}) ) service_status = _extract_structured( await server.call_tool( "service_status", { "workspace_id": workspace_id, "service_name": "app", }, ) ) service_logs = _extract_structured( await server.call_tool( "service_logs", { "workspace_id": workspace_id, "service_name": "app", "all": True, }, ) ) service_stopped = _extract_structured( await server.call_tool( "service_stop", { "workspace_id": workspace_id, "service_name": "app", }, ) ) summary = _extract_structured( await server.call_tool("workspace_summary", {"workspace_id": workspace_id}) ) reset = _extract_structured( await server.call_tool( "workspace_reset", {"workspace_id": workspace_id, "snapshot": "checkpoint"}, ) ) deleted_snapshot = _extract_structured( await server.call_tool( "snapshot_delete", {"workspace_id": workspace_id, "snapshot_name": "checkpoint"}, ) ) logs = _extract_structured( await server.call_tool("workspace_logs", {"workspace_id": workspace_id}) ) deleted = _extract_structured( await server.call_tool("workspace_delete", {"workspace_id": workspace_id}) ) return ( created, listed_before, updated, synced, executed, listed_files, file_read, file_written, patched, diffed, snapshot, snapshots, exported, service, services, service_status, service_logs, service_stopped, summary, reset, deleted_snapshot, logs, deleted, ) ( created, listed_before, updated, synced, executed, listed_files, file_read, file_written, patched, diffed, snapshot, snapshots, exported, service, services, service_status, service_logs, service_stopped, summary, reset, deleted_snapshot, logs, deleted, ) = asyncio.run(_run()) assert created["state"] == "started" assert created["name"] == "repro-fix" assert created["labels"] == {"issue": "123"} assert listed_before["count"] == 1 assert listed_before["workspaces"][0]["name"] == "repro-fix" assert updated["labels"] == {"owner": "codex"} assert created["workspace_seed"]["mode"] == "directory" assert created["secrets"] == [ {"name": "API_TOKEN", "source_kind": "literal"}, {"name": "FILE_TOKEN", "source_kind": "file"}, ] assert synced["workspace_sync"]["destination"] == "/workspace/subdir" assert executed["stdout"] == "[REDACTED]\n" assert any(entry["path"] == "/workspace/note.txt" for entry in listed_files["entries"]) assert file_read["content"] == "ok\n" assert file_written["path"] == "/workspace/src/app.py" assert patched["changed"] is True assert diffed["changed"] is True assert snapshot["snapshot"]["snapshot_name"] == "checkpoint" assert [entry["snapshot_name"] for entry in snapshots["snapshots"]] == [ "baseline", "checkpoint", ] assert exported["artifact_type"] == "file" assert Path(str(exported["output_path"])).read_text(encoding="utf-8") == "more\n" assert service["state"] == "running" assert services["count"] == 1 assert service_status["state"] == "running" assert service_logs["stderr"].count("[REDACTED]") >= 1 assert service_logs["tail_lines"] is None assert service_stopped["state"] == "stopped" assert summary["workspace_id"] == created["workspace_id"] assert summary["commands"]["total"] >= 1 assert summary["changes"]["available"] is True assert summary["artifacts"]["exports"][0]["workspace_path"] == "/workspace/subdir/more.txt" assert reset["workspace_reset"]["snapshot_name"] == "checkpoint" assert reset["secrets"] == created["secrets"] assert reset["command_count"] == 0 assert reset["service_count"] == 0 assert deleted_snapshot["deleted"] is True assert logs["count"] == 0 assert deleted["deleted"] is True