pyro-mcp/tests/test_api.py
Thales Maciel 3f8293ad24 Add persistent workspace shell sessions
Let agents inhabit a workspace across separate calls instead of only submitting one-shot execs.

Add workspace shell open/read/write/signal/close across the CLI, Python SDK, and MCP server, with persisted shell records, a local PTY-backed mock implementation, and guest-agent support for real Firecracker workspaces.

Mark the 2.5.0 roadmap milestone done, refresh docs/examples and the release metadata, and verify with uv lock, UV_CACHE_DIR=.uv-cache make check, and UV_CACHE_DIR=.uv-cache make dist-check.
2026-03-12 02:31:57 -03:00

162 lines
5.2 KiB
Python

from __future__ import annotations
import asyncio
import time
from pathlib import Path
from typing import Any, cast
from pyro_mcp.api import Pyro
from pyro_mcp.vm_manager import VmManager
from pyro_mcp.vm_network import TapNetworkManager
def test_pyro_run_in_vm_delegates_to_manager(tmp_path: Path) -> None:
    """Run a one-shot command via ``Pyro.run_in_vm`` on the mock backend.

    The command prints ``ok`` and the test expects exit code 0 together
    with that exact stdout in the returned mapping.
    """
    # Build the manager separately so the facade construction stays flat.
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    facade = Pyro(manager=vm_manager)

    outcome = facade.run_in_vm(
        environment="debian:12-base",
        command="printf 'ok\\n'",
        vcpu_count=1,
        mem_mib=512,
        timeout_seconds=30,
        ttl_seconds=600,
        network=False,
        allow_host_compat=True,
    )

    exit_code = int(outcome["exit_code"])
    stdout_text = str(outcome["stdout"])
    assert exit_code == 0
    assert stdout_text == "ok\n"
def test_pyro_create_server_registers_vm_run(tmp_path: Path) -> None:
    """Check that the server built by ``Pyro.create_server`` lists the expected tools.

    The tool names reported by ``list_tools`` must include the VM,
    workspace, and shell tools enumerated below.
    """
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    facade = Pyro(manager=vm_manager)

    async def _collect_tool_names() -> list[str]:
        """Return the sorted names of every tool the server lists."""
        listed = await facade.create_server().list_tools()
        names = [tool.name for tool in listed]
        names.sort()
        return names

    registered = asyncio.run(_collect_tool_names())

    # Checked one at a time, in this order, so a failure names the missing tool.
    expected_tools = (
        "vm_run",
        "vm_create",
        "workspace_create",
        "workspace_sync_push",
        "shell_open",
        "shell_read",
        "shell_write",
        "shell_signal",
        "shell_close",
    )
    for expected_tool in expected_tools:
        assert expected_tool in registered
def test_pyro_vm_run_tool_executes(tmp_path: Path) -> None:
    """Invoke the ``vm_run`` tool through the server and expect exit code 0.

    ``call_tool`` is expected to return a two-item tuple whose second item
    is a dictionary; any other shape fails the test with ``TypeError``.
    """
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    facade = Pyro(manager=vm_manager)

    def _structured_payload(raw_result: object) -> dict[str, Any]:
        """Return the dictionary held in the second slot of a ``call_tool`` result."""
        if not (isinstance(raw_result, tuple) and len(raw_result) == 2):
            raise TypeError("unexpected call_tool result shape")
        payload = raw_result[1]
        if isinstance(payload, dict):
            return cast(dict[str, Any], payload)
        raise TypeError("expected structured dictionary result")

    async def _invoke_vm_run() -> dict[str, Any]:
        """Call the ``vm_run`` tool and return its structured result."""
        server = facade.create_server()
        arguments: dict[str, Any] = {
            "environment": "debian:12-base",
            "command": "printf 'ok\\n'",
            "network": False,
            "allow_host_compat": True,
        }
        raw = await server.call_tool("vm_run", arguments)
        return _structured_payload(raw)

    outcome = asyncio.run(_invoke_vm_run())
    assert int(outcome["exit_code"]) == 0
def test_pyro_create_vm_defaults_sizing_and_host_compat(tmp_path: Path) -> None:
    """Create a VM without sizing arguments and check the reported values.

    Only the environment and ``allow_host_compat`` are passed; the created
    record must report 1 vCPU, 1024 MiB of memory, and host compat enabled.
    """
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    facade = Pyro(manager=vm_manager)

    vm_record = facade.create_vm(environment="debian:12-base", allow_host_compat=True)

    assert vm_record["vcpu_count"] == 1
    assert vm_record["mem_mib"] == 1024
    assert vm_record["allow_host_compat"] is True
def test_pyro_workspace_methods_delegate_to_manager(tmp_path: Path) -> None:
    """Exercise the workspace and shell methods of ``Pyro`` end to end.

    The test seeds a workspace from a directory, pushes a second directory
    into ``subdir``, runs one command, drives a shell session
    (open/write/read/signal/close), then queries status and logs and
    deletes the workspace. All assertions run after the calls complete.
    """
    pyro = Pyro(
        manager=VmManager(
            backend_name="mock",
            base_dir=tmp_path / "vms",
            network_manager=TapNetworkManager(enabled=False),
        )
    )

    # Seed content that the later `cat note.txt` exec is expected to print.
    source_dir = tmp_path / "seed"
    source_dir.mkdir()
    (source_dir / "note.txt").write_text("ok\n", encoding="utf-8")
    created = pyro.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
        seed_path=source_dir,
    )
    workspace_id = str(created["workspace_id"])

    # Second directory, pushed into the workspace under "subdir".
    updated_dir = tmp_path / "updated"
    updated_dir.mkdir()
    (updated_dir / "more.txt").write_text("more\n", encoding="utf-8")
    synced = pyro.push_workspace_sync(workspace_id, updated_dir, dest="subdir")

    executed = pyro.exec_workspace(workspace_id, command="cat note.txt")

    opened = pyro.open_shell(workspace_id)
    shell_id = str(opened["shell_id"])
    written = pyro.write_shell(workspace_id, shell_id, input="pwd")
    read = pyro.read_shell(workspace_id, shell_id)

    # Poll from cursor 0 until the `pwd` output appears or 5 seconds pass.
    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = time.monotonic() + 5
    while "/workspace" not in str(read["output"]) and time.monotonic() < deadline:
        read = pyro.read_shell(workspace_id, shell_id, cursor=0)
        time.sleep(0.05)

    signaled = pyro.signal_shell(workspace_id, shell_id)
    closed = pyro.close_shell(workspace_id, shell_id)
    status = pyro.status_workspace(workspace_id)
    logs = pyro.logs_workspace(workspace_id)
    deleted = pyro.delete_workspace(workspace_id)

    assert executed["stdout"] == "ok\n"
    assert created["workspace_seed"]["mode"] == "directory"
    assert synced["workspace_sync"]["destination"] == "/workspace/subdir"
    assert written["input_length"] == 3
    assert "/workspace" in read["output"]
    assert signaled["signal"] == "INT"
    assert closed["closed"] is True
    # NOTE(review): presumably only the single exec_workspace call is counted
    # and shell traffic is not - confirm against the manager implementation.
    assert status["command_count"] == 1
    assert logs["count"] == 1
    assert deleted["deleted"] is True