pyro-mcp/tests/test_server.py
Thales Maciel 58df176148 Add persistent task workspace alpha
Start the first workspace milestone toward the task-oriented product without changing the existing one-shot vm_run/pyro run contract.

Add a disk-backed task registry in the manager, auto-started task workspaces rooted at /workspace, repeated non-cleaning exec, and persisted command journals exposed through task create/exec/status/logs/delete across the CLI, Python SDK, and MCP server.

Update the public contract, docs, examples, and version/catalog metadata for 2.1.0, and cover the new surface with manager, CLI, SDK, and MCP tests. Validation: UV_CACHE_DIR=.uv-cache make check and UV_CACHE_DIR=.uv-cache make dist-check.
2026-03-11 20:10:10 -03:00

212 lines
7.6 KiB
Python

from __future__ import annotations
import asyncio
from pathlib import Path
from typing import Any, cast
import pytest
import pyro_mcp.server as server_module
from pyro_mcp.server import create_server
from pyro_mcp.vm_manager import VmManager
from pyro_mcp.vm_network import TapNetworkManager
def test_create_server_registers_vm_tools(tmp_path: Path) -> None:
manager = VmManager(
backend_name="mock",
base_dir=tmp_path / "vms",
network_manager=TapNetworkManager(enabled=False),
)
async def _run() -> list[str]:
server = create_server(manager=manager)
tools = await server.list_tools()
return sorted(tool.name for tool in tools)
tool_names = asyncio.run(_run())
assert "vm_create" in tool_names
assert "vm_exec" in tool_names
assert "vm_list_environments" in tool_names
assert "vm_network_info" in tool_names
assert "vm_run" in tool_names
assert "vm_status" in tool_names
assert "task_create" in tool_names
assert "task_logs" in tool_names
def test_vm_run_round_trip(tmp_path: Path) -> None:
manager = VmManager(
backend_name="mock",
base_dir=tmp_path / "vms",
network_manager=TapNetworkManager(enabled=False),
)
def _extract_structured(raw_result: object) -> dict[str, Any]:
if not isinstance(raw_result, tuple) or len(raw_result) != 2:
raise TypeError("unexpected call_tool result shape")
_, structured = raw_result
if not isinstance(structured, dict):
raise TypeError("expected structured dictionary result")
return cast(dict[str, Any], structured)
async def _run() -> dict[str, Any]:
server = create_server(manager=manager)
executed = _extract_structured(
await server.call_tool(
"vm_run",
{
"environment": "debian:12",
"command": "printf 'git version 2.0\\n'",
"ttl_seconds": 600,
"network": False,
"allow_host_compat": True,
},
)
)
return executed
executed = asyncio.run(_run())
assert int(executed["exit_code"]) == 0
assert "git version" in str(executed["stdout"])
def test_vm_tools_status_stop_delete_and_reap(tmp_path: Path) -> None:
manager = VmManager(
backend_name="mock",
base_dir=tmp_path / "vms",
network_manager=TapNetworkManager(enabled=False),
)
manager.MIN_TTL_SECONDS = 1
def _extract_structured(raw_result: object) -> dict[str, Any]:
if not isinstance(raw_result, tuple) or len(raw_result) != 2:
raise TypeError("unexpected call_tool result shape")
_, structured = raw_result
if not isinstance(structured, dict):
raise TypeError("expected structured dictionary result")
return cast(dict[str, Any], structured)
async def _run() -> tuple[
dict[str, Any],
dict[str, Any],
dict[str, Any],
dict[str, Any],
list[dict[str, object]],
dict[str, Any],
]:
server = create_server(manager=manager)
environments_raw = await server.call_tool("vm_list_environments", {})
if not isinstance(environments_raw, tuple) or len(environments_raw) != 2:
raise TypeError("unexpected environments result")
_, environments_structured = environments_raw
if not isinstance(environments_structured, dict):
raise TypeError("environments tool should return a dictionary")
raw_environments = environments_structured.get("result")
if not isinstance(raw_environments, list):
raise TypeError("environments tool did not contain a result list")
created = _extract_structured(
await server.call_tool(
"vm_create",
{
"environment": "debian:12-base",
"ttl_seconds": 600,
"allow_host_compat": True,
},
)
)
vm_id = str(created["vm_id"])
await server.call_tool("vm_start", {"vm_id": vm_id})
status = _extract_structured(await server.call_tool("vm_status", {"vm_id": vm_id}))
network = _extract_structured(await server.call_tool("vm_network_info", {"vm_id": vm_id}))
stopped = _extract_structured(await server.call_tool("vm_stop", {"vm_id": vm_id}))
deleted = _extract_structured(await server.call_tool("vm_delete", {"vm_id": vm_id}))
expiring = _extract_structured(
await server.call_tool(
"vm_create",
{
"environment": "debian:12-base",
"ttl_seconds": 1,
"allow_host_compat": True,
},
)
)
expiring_id = str(expiring["vm_id"])
manager._instances[expiring_id].expires_at = 0.0 # noqa: SLF001
reaped = _extract_structured(await server.call_tool("vm_reap_expired", {}))
return (
status,
network,
stopped,
deleted,
cast(list[dict[str, object]], raw_environments),
reaped,
)
status, network, stopped, deleted, environments, reaped = asyncio.run(_run())
assert status["state"] == "started"
assert network["network_enabled"] is False
assert stopped["state"] == "stopped"
assert bool(deleted["deleted"]) is True
assert environments[0]["name"] == "debian:12"
assert int(reaped["count"]) == 1
def test_server_main_runs_stdio_transport(monkeypatch: pytest.MonkeyPatch) -> None:
called: dict[str, str] = {}
class StubServer:
def run(self, transport: str) -> None:
called["transport"] = transport
monkeypatch.setattr(server_module, "create_server", lambda: StubServer())
server_module.main()
assert called == {"transport": "stdio"}
def test_task_tools_round_trip(tmp_path: Path) -> None:
manager = VmManager(
backend_name="mock",
base_dir=tmp_path / "vms",
network_manager=TapNetworkManager(enabled=False),
)
def _extract_structured(raw_result: object) -> dict[str, Any]:
if not isinstance(raw_result, tuple) or len(raw_result) != 2:
raise TypeError("unexpected call_tool result shape")
_, structured = raw_result
if not isinstance(structured, dict):
raise TypeError("expected structured dictionary result")
return cast(dict[str, Any], structured)
async def _run() -> tuple[dict[str, Any], dict[str, Any], dict[str, Any], dict[str, Any]]:
server = create_server(manager=manager)
created = _extract_structured(
await server.call_tool(
"task_create",
{
"environment": "debian:12-base",
"allow_host_compat": True,
},
)
)
task_id = str(created["task_id"])
executed = _extract_structured(
await server.call_tool(
"task_exec",
{
"task_id": task_id,
"command": "printf 'ok\\n'",
},
)
)
logs = _extract_structured(await server.call_tool("task_logs", {"task_id": task_id}))
deleted = _extract_structured(await server.call_tool("task_delete", {"task_id": task_id}))
return created, executed, logs, deleted
created, executed, logs, deleted = asyncio.run(_run())
assert created["state"] == "started"
assert executed["stdout"] == "ok\n"
assert logs["count"] == 1
assert deleted["deleted"] is True