pyro-mcp/tests/test_api.py
Thales Maciel aa886b346e Add seeded task workspace creation
Current persistent tasks started with an empty workspace, which blocked the first useful host-to-task workflow in the task roadmap. This change lets task creation start from a host directory or tar archive without changing the one-shot VM surfaces.

Expose source_path on task create across the CLI, SDK, and MCP, add safe archive upload and extraction support for guest and host-compat backends, persist workspace_seed metadata, and patch the per-task rootfs with the bundled guest agent before boot so seeded guest tasks work without republishing environments. Also switch post-`--` command reconstruction to shlex.join() so documented sh -lc task examples preserve argument boundaries.

Validation:
- uv lock
- UV_CACHE_DIR=.uv-cache uv run pytest --no-cov tests/test_vm_guest.py tests/test_vm_manager.py tests/test_cli.py tests/test_api.py tests/test_server.py tests/test_public_contract.py
- UV_CACHE_DIR=.uv-cache make check
- UV_CACHE_DIR=.uv-cache make dist-check
- real guest-backed smoke: task create --source-path, task exec -- cat note.txt, task delete
2026-03-11 21:45:38 -03:00

136 lines
3.9 KiB
Python

from __future__ import annotations
import asyncio
from pathlib import Path
from typing import Any, cast
from pyro_mcp.api import Pyro
from pyro_mcp.vm_manager import VmManager
from pyro_mcp.vm_network import TapNetworkManager
def test_pyro_run_in_vm_delegates_to_manager(tmp_path: Path) -> None:
    """Run one command through ``Pyro.run_in_vm`` and check its result.

    The facade is built on a ``VmManager`` using the ``"mock"`` backend and a
    ``TapNetworkManager`` constructed with ``enabled=False``. The test expects
    an exit code of 0 and ``"ok\\n"`` on stdout.
    """
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    facade = Pyro(manager=vm_manager)

    outcome = facade.run_in_vm(
        environment="debian:12-base",
        command="printf 'ok\\n'",
        vcpu_count=1,
        mem_mib=512,
        timeout_seconds=30,
        ttl_seconds=600,
        network=False,
        allow_host_compat=True,
    )

    exit_code = int(outcome["exit_code"])
    assert exit_code == 0
    stdout_text = str(outcome["stdout"])
    assert stdout_text == "ok\n"
def test_pyro_create_server_registers_vm_run(tmp_path: Path) -> None:
    """Check that the server from ``Pyro.create_server`` lists the expected tools.

    The tool names returned by ``list_tools()`` must include ``vm_run``,
    ``vm_create`` and ``task_create``.
    """
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    facade = Pyro(manager=vm_manager)

    async def _list_tool_names() -> list[str]:
        # list_tools() is awaited, so it is driven through asyncio.run() below.
        mcp_server = facade.create_server()
        registered = await mcp_server.list_tools()
        names = [entry.name for entry in registered]
        names.sort()
        return names

    tool_names = asyncio.run(_list_tool_names())

    for expected_name in ("vm_run", "vm_create", "task_create"):
        assert expected_name in tool_names
def test_pyro_vm_run_tool_executes(tmp_path: Path) -> None:
    """Invoke the ``vm_run`` tool via ``call_tool`` and check the exit code.

    The raw ``call_tool`` result is required to be a 2-tuple whose second
    element is a dict; that dict's ``exit_code`` must convert to 0.
    """
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    facade = Pyro(manager=vm_manager)

    def _structured_payload(raw_result: object) -> dict[str, Any]:
        """Return the second element of a 2-tuple result, requiring a dict."""
        is_pair = isinstance(raw_result, tuple) and len(raw_result) == 2
        if not is_pair:
            raise TypeError("unexpected call_tool result shape")
        payload = cast(tuple[Any, Any], raw_result)[1]
        if isinstance(payload, dict):
            return cast(dict[str, Any], payload)
        raise TypeError("expected structured dictionary result")

    async def _invoke_vm_run() -> dict[str, Any]:
        mcp_server = facade.create_server()
        arguments = {
            "environment": "debian:12-base",
            "command": "printf 'ok\\n'",
            "network": False,
            "allow_host_compat": True,
        }
        raw = await mcp_server.call_tool("vm_run", arguments)
        return _structured_payload(raw)

    payload = asyncio.run(_invoke_vm_run())
    assert int(payload["exit_code"]) == 0
def test_pyro_create_vm_defaults_sizing_and_host_compat(tmp_path: Path) -> None:
    """Create a VM without sizing arguments and check the reported values.

    Only ``environment`` and ``allow_host_compat`` are passed; the result is
    expected to report ``vcpu_count`` 1, ``mem_mib`` 1024 and
    ``allow_host_compat`` as ``True``.
    """
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    facade = Pyro(manager=vm_manager)

    vm_info = facade.create_vm(environment="debian:12-base", allow_host_compat=True)

    assert vm_info["vcpu_count"] == 1
    assert vm_info["mem_mib"] == 1024
    assert vm_info["allow_host_compat"] is True
def test_pyro_task_methods_delegate_to_manager(tmp_path: Path) -> None:
    """Exercise create/exec/status/logs/delete for a task seeded from a directory.

    A host directory containing ``note.txt`` is passed as ``source_path``; the
    task then runs ``cat note.txt`` and the test checks the command output,
    the recorded seed mode, the command/log counts and the delete result.
    """
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    facade = Pyro(manager=vm_manager)

    # Prepare the seed directory on the host before creating the task.
    seed_root = tmp_path / "seed"
    seed_root.mkdir()
    note_file = seed_root / "note.txt"
    note_file.write_text("ok\n", encoding="utf-8")

    task_info = facade.create_task(
        environment="debian:12-base",
        allow_host_compat=True,
        source_path=seed_root,
    )
    task_id = str(task_info["task_id"])

    # All task calls run before any assertion, so delete happens even when
    # an earlier expectation would fail.
    exec_result = facade.exec_task(task_id, command="cat note.txt")
    status_info = facade.status_task(task_id)
    log_info = facade.logs_task(task_id)
    delete_info = facade.delete_task(task_id)

    assert exec_result["stdout"] == "ok\n"
    assert task_info["workspace_seed"]["mode"] == "directory"
    assert status_info["command_count"] == 1
    assert log_info["count"] == 1
    assert delete_info["deleted"] is True