from __future__ import annotations import asyncio from pathlib import Path from typing import Any, cast import pytest import pyro_mcp.server as server_module from pyro_mcp.server import create_server from pyro_mcp.vm_manager import VmManager from pyro_mcp.vm_network import TapNetworkManager def test_create_server_registers_vm_tools(tmp_path: Path) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) async def _run() -> list[str]: server = create_server(manager=manager) tools = await server.list_tools() return sorted(tool.name for tool in tools) tool_names = asyncio.run(_run()) assert "vm_create" in tool_names assert "vm_exec" in tool_names assert "vm_list_environments" in tool_names assert "vm_network_info" in tool_names assert "vm_run" in tool_names assert "vm_status" in tool_names def test_vm_run_round_trip(tmp_path: Path) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) def _extract_structured(raw_result: object) -> dict[str, Any]: if not isinstance(raw_result, tuple) or len(raw_result) != 2: raise TypeError("unexpected call_tool result shape") _, structured = raw_result if not isinstance(structured, dict): raise TypeError("expected structured dictionary result") return cast(dict[str, Any], structured) async def _run() -> dict[str, Any]: server = create_server(manager=manager) executed = _extract_structured( await server.call_tool( "vm_run", { "environment": "debian:12", "command": "printf 'git version 2.0\\n'", "vcpu_count": 1, "mem_mib": 512, "ttl_seconds": 600, "network": False, }, ) ) return executed executed = asyncio.run(_run()) assert int(executed["exit_code"]) == 0 assert "git version" in str(executed["stdout"]) def test_vm_tools_status_stop_delete_and_reap(tmp_path: Path) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) manager.MIN_TTL_SECONDS = 1 def _extract_structured(raw_result: object) -> dict[str, Any]: if not isinstance(raw_result, tuple) or len(raw_result) != 2: raise TypeError("unexpected call_tool result shape") _, structured = raw_result if not isinstance(structured, dict): raise TypeError("expected structured dictionary result") return cast(dict[str, Any], structured) async def _run() -> tuple[ dict[str, Any], dict[str, Any], dict[str, Any], dict[str, Any], list[dict[str, object]], dict[str, Any], ]: server = create_server(manager=manager) environments_raw = await server.call_tool("vm_list_environments", {}) if not isinstance(environments_raw, tuple) or len(environments_raw) != 2: raise TypeError("unexpected environments result") _, environments_structured = environments_raw if not isinstance(environments_structured, dict): raise TypeError("environments tool should return a dictionary") raw_environments = environments_structured.get("result") if not isinstance(raw_environments, list): raise TypeError("environments tool did not contain a result list") created = _extract_structured( await server.call_tool( "vm_create", { "environment": "debian:12-base", "vcpu_count": 1, "mem_mib": 512, "ttl_seconds": 600, }, ) ) vm_id = str(created["vm_id"]) await server.call_tool("vm_start", {"vm_id": vm_id}) status = _extract_structured(await server.call_tool("vm_status", {"vm_id": vm_id})) network = _extract_structured(await server.call_tool("vm_network_info", {"vm_id": vm_id})) stopped = _extract_structured(await server.call_tool("vm_stop", {"vm_id": vm_id})) deleted = _extract_structured(await server.call_tool("vm_delete", {"vm_id": vm_id})) expiring = _extract_structured( await server.call_tool( "vm_create", { "environment": "debian:12-base", "vcpu_count": 1, "mem_mib": 512, "ttl_seconds": 1, }, ) ) expiring_id = str(expiring["vm_id"]) manager._instances[expiring_id].expires_at = 0.0 # noqa: SLF001 reaped = _extract_structured(await server.call_tool("vm_reap_expired", {})) return ( status, network, stopped, deleted, cast(list[dict[str, object]], raw_environments), reaped, ) status, network, stopped, deleted, environments, reaped = asyncio.run(_run()) assert status["state"] == "started" assert network["network_enabled"] is False assert stopped["state"] == "stopped" assert bool(deleted["deleted"]) is True assert environments[0]["name"] == "debian:12" assert int(reaped["count"]) == 1 def test_server_main_runs_stdio_transport(monkeypatch: pytest.MonkeyPatch) -> None: called: dict[str, str] = {} class StubServer: def run(self, transport: str) -> None: called["transport"] = transport monkeypatch.setattr(server_module, "create_server", lambda: StubServer()) server_module.main() assert called == {"transport": "stdio"}