pyro-mcp/tests/test_api.py
Thales Maciel c82f4629b2 Add workspace network policy and published ports
Replace the workspace-level boolean network toggle with explicit network policies and attach localhost TCP publication to workspace services.

Persist network_policy in workspace records, validate --publish requests, and run host-side proxy helpers that follow the service lifecycle so published ports are cleaned up on failure, stop, reset, and delete.

Update the CLI, SDK, MCP contract, docs, roadmap, and examples for the new policy model, add coverage for the proxy and manager edge cases, and validate with uv lock, UV_CACHE_DIR=.uv-cache make check, UV_CACHE_DIR=.uv-cache make dist-check, and a real guest-backed published-port probe smoke.
2026-03-12 18:12:57 -03:00

291 lines
9.8 KiB
Python

from __future__ import annotations
import asyncio
import time
from pathlib import Path
from typing import Any, cast
from pyro_mcp.api import Pyro
from pyro_mcp.vm_manager import VmManager
from pyro_mcp.vm_network import TapNetworkManager
def test_pyro_run_in_vm_delegates_to_manager(tmp_path: Path) -> None:
    """Run a one-shot command via ``Pyro.run_in_vm`` and check its result.

    The facade is backed by a ``VmManager`` using the ``"mock"`` backend with
    the TAP network manager disabled.
    """
    mock_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    pyro = Pyro(manager=mock_manager)

    result = pyro.run_in_vm(
        environment="debian:12-base",
        command="printf 'ok\\n'",
        vcpu_count=1,
        mem_mib=512,
        timeout_seconds=30,
        ttl_seconds=600,
        network=False,
        allow_host_compat=True,
    )

    exit_code = int(result["exit_code"])
    assert exit_code == 0
    stdout = str(result["stdout"])
    assert stdout == "ok\n"
def test_pyro_create_server_registers_vm_run(tmp_path: Path) -> None:
    """Check that the server from ``Pyro.create_server`` lists the expected tools."""
    mock_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    pyro = Pyro(manager=mock_manager)

    async def _list_tool_names() -> list[str]:
        # Listing tools is a coroutine on the server, hence the async wrapper.
        server = pyro.create_server()
        names = [tool.name for tool in await server.list_tools()]
        names.sort()
        return names

    tool_names = asyncio.run(_list_tool_names())

    # Checked in this order so the first missing tool is the one reported.
    expected_tools = (
        "vm_run",
        "vm_create",
        "workspace_create",
        "workspace_diff",
        "workspace_sync_push",
        "workspace_export",
        "snapshot_create",
        "snapshot_list",
        "snapshot_delete",
        "workspace_reset",
        "shell_open",
        "shell_read",
        "shell_write",
        "shell_signal",
        "shell_close",
        "service_start",
        "service_list",
        "service_status",
        "service_logs",
        "service_stop",
    )
    for expected in expected_tools:
        assert expected in tool_names
def test_pyro_vm_run_tool_executes(tmp_path: Path) -> None:
    """Invoke the ``vm_run`` tool through the server and expect exit code 0."""
    mock_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    pyro = Pyro(manager=mock_manager)

    def _extract_structured(raw_result: object) -> dict[str, Any]:
        """Return the second element of a 2-tuple result, which must be a dict."""
        if not (isinstance(raw_result, tuple) and len(raw_result) == 2):
            raise TypeError("unexpected call_tool result shape")
        structured = raw_result[1]
        if isinstance(structured, dict):
            return cast(dict[str, Any], structured)
        raise TypeError("expected structured dictionary result")

    async def _call_vm_run() -> dict[str, Any]:
        server = pyro.create_server()
        arguments = {
            "environment": "debian:12-base",
            "command": "printf 'ok\\n'",
            "network": False,
            "allow_host_compat": True,
        }
        raw = await server.call_tool("vm_run", arguments)
        return _extract_structured(raw)

    result = asyncio.run(_call_vm_run())
    assert int(result["exit_code"]) == 0
def test_pyro_create_vm_defaults_sizing_and_host_compat(tmp_path: Path) -> None:
    """Create a VM without explicit sizing and check the reported values.

    Only ``environment`` and ``allow_host_compat`` are passed; the result is
    expected to report 1 vCPU, 1024 MiB of memory and the host-compat flag.
    """
    mock_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    pyro = Pyro(manager=mock_manager)

    vm_info = pyro.create_vm(environment="debian:12-base", allow_host_compat=True)

    assert vm_info["vcpu_count"] == 1
    assert vm_info["mem_mib"] == 1024
    assert vm_info["allow_host_compat"] is True
def test_pyro_workspace_network_policy_and_published_ports_delegate() -> None:
    """Check the keyword arguments ``Pyro`` forwards to its manager.

    A recording stub stands in for the manager, so the exact payloads passed
    to ``create_workspace`` and ``start_service`` can be compared.
    """
    calls: list[tuple[str, dict[str, Any]]] = []

    class StubManager:
        """Records each call and returns a minimal canned response."""

        def create_workspace(self, **kwargs: Any) -> dict[str, Any]:
            calls.append(("create_workspace", kwargs))
            return {"workspace_id": "workspace-123"}

        def start_service(
            self,
            workspace_id: str,
            service_name: str,
            **kwargs: Any,
        ) -> dict[str, Any]:
            # Fold the positional identifiers into the recorded payload.
            recorded: dict[str, Any] = {
                "workspace_id": workspace_id,
                "service_name": service_name,
            }
            recorded.update(kwargs)
            calls.append(("start_service", recorded))
            return {"workspace_id": workspace_id, "service_name": service_name, "state": "running"}

    pyro = Pyro(manager=cast(Any, StubManager()))

    pyro.create_workspace(
        environment="debian:12",
        network_policy="egress+published-ports",
    )
    pyro.start_service(
        "workspace-123",
        "web",
        command="python3 -m http.server 8080",
        published_ports=[{"guest_port": 8080, "host_port": 18080}],
    )

    expected_create_kwargs = {
        "environment": "debian:12",
        "vcpu_count": 1,
        "mem_mib": 1024,
        "ttl_seconds": 600,
        "network_policy": "egress+published-ports",
        "allow_host_compat": False,
        "seed_path": None,
        "secrets": None,
    }
    assert calls[0] == ("create_workspace", expected_create_kwargs)

    expected_service_kwargs = {
        "workspace_id": "workspace-123",
        "service_name": "web",
        "command": "python3 -m http.server 8080",
        "cwd": "/workspace",
        "readiness": None,
        "ready_timeout_seconds": 30,
        "ready_interval_ms": 500,
        "secret_env": None,
        "published_ports": [{"guest_port": 8080, "host_port": 18080}],
    }
    assert calls[1] == ("start_service", expected_service_kwargs)
def test_pyro_workspace_methods_delegate_to_manager(tmp_path: Path) -> None:
    """Drive the workspace API end to end against the mock backend.

    The test creates a seeded workspace with one literal and one file-backed
    secret, then calls sync push, exec, diff, snapshot create/list, export,
    shell open/write/read/close, service start/list/status/logs, reset,
    snapshot delete, status, logs and delete. All assertions run after the
    workspace has been deleted.
    """
    pyro = Pyro(
        manager=VmManager(
            backend_name="mock",
            base_dir=tmp_path / "vms",
            network_manager=TapNetworkManager(enabled=False),
        )
    )

    # Seed directory and a secret stored on disk.
    source_dir = tmp_path / "seed"
    source_dir.mkdir()
    (source_dir / "note.txt").write_text("ok\n", encoding="utf-8")
    secret_file = tmp_path / "token.txt"
    secret_file.write_text("from-file\n", encoding="utf-8")

    created = pyro.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
        seed_path=source_dir,
        secrets=[
            {"name": "API_TOKEN", "value": "expected"},
            {"name": "FILE_TOKEN", "file_path": str(secret_file)},
        ],
    )
    workspace_id = str(created["workspace_id"])

    # Push extra content, run a command that prints the secret, then snapshot.
    updated_dir = tmp_path / "updated"
    updated_dir.mkdir()
    (updated_dir / "more.txt").write_text("more\n", encoding="utf-8")
    synced = pyro.push_workspace_sync(workspace_id, updated_dir, dest="subdir")
    executed = pyro.exec_workspace(
        workspace_id,
        command='sh -lc \'printf "%s\\n" "$API_TOKEN"\'',
        secret_env={"API_TOKEN": "API_TOKEN"},
    )
    diff_payload = pyro.diff_workspace(workspace_id)
    snapshot = pyro.create_snapshot(workspace_id, "checkpoint")
    snapshots = pyro.list_snapshots(workspace_id)
    export_path = tmp_path / "exported-note.txt"
    exported = pyro.export_workspace(workspace_id, "note.txt", output_path=export_path)

    # Interactive shell: write a command and poll until redacted output shows.
    shell = pyro.open_shell(
        workspace_id,
        secret_env={"API_TOKEN": "API_TOKEN"},
    )
    shell_id = str(shell["shell_id"])
    pyro.write_shell(workspace_id, shell_id, input='printf "%s\\n" "$API_TOKEN"')
    shell_output: dict[str, Any] = {}
    # Use the monotonic clock so a wall-clock adjustment during the test can
    # neither cut the 5-second wait short nor stretch it.
    deadline = time.monotonic() + 5
    while time.monotonic() < deadline:
        shell_output = pyro.read_shell(workspace_id, shell_id, cursor=0, max_chars=65536)
        if "[REDACTED]" in str(shell_output.get("output", "")):
            break
        time.sleep(0.05)
    shell_closed = pyro.close_shell(workspace_id, shell_id)

    # Long-running service that writes the secret to stderr and then signals
    # readiness by creating the .ready file.
    service = pyro.start_service(
        workspace_id,
        "app",
        command=(
            'sh -lc \'trap "exit 0" TERM; printf "%s\\n" "$API_TOKEN" >&2; '
            'touch .ready; while true; do sleep 60; done\''
        ),
        readiness={"type": "file", "path": ".ready"},
        secret_env={"API_TOKEN": "API_TOKEN"},
    )
    services = pyro.list_services(workspace_id)
    service_status = pyro.status_service(workspace_id, "app")
    service_logs = pyro.logs_service(workspace_id, "app", all=True)

    # Reset to the snapshot, then tear everything down.
    reset = pyro.reset_workspace(workspace_id, snapshot="checkpoint")
    deleted_snapshot = pyro.delete_snapshot(workspace_id, "checkpoint")
    status = pyro.status_workspace(workspace_id)
    logs = pyro.logs_workspace(workspace_id)
    deleted = pyro.delete_workspace(workspace_id)

    # Secrets are reported by name and source kind only.
    assert created["secrets"] == [
        {"name": "API_TOKEN", "source_kind": "literal"},
        {"name": "FILE_TOKEN", "source_kind": "file"},
    ]
    assert executed["stdout"] == "[REDACTED]\n"
    assert created["workspace_seed"]["mode"] == "directory"
    assert synced["workspace_sync"]["destination"] == "/workspace/subdir"
    assert diff_payload["changed"] is True
    assert snapshot["snapshot"]["snapshot_name"] == "checkpoint"
    # NOTE(review): presumably "checkpoint" plus an implicit baseline snapshot
    # -- confirm against the manager implementation.
    assert snapshots["count"] == 2
    assert exported["output_path"] == str(export_path)
    assert export_path.read_text(encoding="utf-8") == "ok\n"
    assert shell_output["output"].count("[REDACTED]") >= 1
    assert shell_closed["closed"] is True
    assert service["state"] == "running"
    assert services["count"] == 1
    assert service_status["state"] == "running"
    assert service_logs["stderr"].count("[REDACTED]") >= 1
    assert service_logs["tail_lines"] is None
    assert reset["workspace_reset"]["snapshot_name"] == "checkpoint"
    assert reset["secrets"] == created["secrets"]
    assert deleted_snapshot["deleted"] is True
    # After the reset, the command, service and log counts are all zero.
    assert status["command_count"] == 0
    assert status["service_count"] == 0
    assert logs["count"] == 0
    assert deleted["deleted"] is True