Bundle firecracker runtime and switch ollama demo to live logs

This commit is contained in:
Thales Maciel 2026-03-05 20:20:36 -03:00
parent ef0ddeaa11
commit 65f7c0d262
26 changed files with 1896 additions and 408 deletions

View file

@ -1,86 +1,71 @@
from __future__ import annotations
import asyncio
from collections.abc import Sequence
import json
from typing import Any
import pytest
from mcp.types import TextContent
import pyro_mcp.demo as demo_module
from pyro_mcp.demo import run_demo
from pyro_mcp.server import HELLO_STATIC_PAYLOAD
def test_run_demo_returns_static_payload() -> None:
payload = asyncio.run(run_demo())
assert payload == HELLO_STATIC_PAYLOAD
def test_run_demo_happy_path(monkeypatch: pytest.MonkeyPatch) -> None:
calls: list[tuple[str, dict[str, Any]]] = []
class StubManager:
def __init__(self) -> None:
pass
def test_run_demo_raises_for_non_text_blocks(monkeypatch: pytest.MonkeyPatch) -> None:
class StubServer:
async def call_tool(
def create_vm(
self,
name: str,
arguments: dict[str, Any],
) -> tuple[Sequence[int], dict[str, str]]:
assert name == "hello_static"
assert arguments == {}
return [123], HELLO_STATIC_PAYLOAD
*,
profile: str,
vcpu_count: int,
mem_mib: int,
ttl_seconds: int,
) -> dict[str, str]:
calls.append(
(
"create_vm",
{
"profile": profile,
"vcpu_count": vcpu_count,
"mem_mib": mem_mib,
"ttl_seconds": ttl_seconds,
},
)
)
return {"vm_id": "vm-1"}
monkeypatch.setattr(demo_module, "create_server", lambda: StubServer())
def start_vm(self, vm_id: str) -> dict[str, str]:
calls.append(("start_vm", {"vm_id": vm_id}))
return {"vm_id": vm_id}
with pytest.raises(TypeError, match="unexpected MCP content block output"):
asyncio.run(demo_module.run_demo())
def exec_vm(self, vm_id: str, *, command: str, timeout_seconds: int) -> dict[str, Any]:
calls.append(
(
"exec_vm",
{"vm_id": vm_id, "command": command, "timeout_seconds": timeout_seconds},
)
)
return {"vm_id": vm_id, "stdout": "git version 2.x", "exit_code": 0}
monkeypatch.setattr(demo_module, "VmManager", StubManager)
result = demo_module.run_demo()
assert result["exit_code"] == 0
assert calls[0][0] == "create_vm"
assert calls[1] == ("start_vm", {"vm_id": "vm-1"})
assert calls[2][0] == "exec_vm"
def test_run_demo_raises_for_non_dict_payload(monkeypatch: pytest.MonkeyPatch) -> None:
class StubServer:
async def call_tool(
self,
name: str,
arguments: dict[str, Any],
) -> tuple[list[TextContent], str]:
assert name == "hello_static"
assert arguments == {}
return [TextContent(type="text", text="x")], "bad"
monkeypatch.setattr(demo_module, "create_server", lambda: StubServer())
with pytest.raises(TypeError, match="expected a structured dictionary payload"):
asyncio.run(demo_module.run_demo())
def test_run_demo_raises_for_unexpected_payload(monkeypatch: pytest.MonkeyPatch) -> None:
class StubServer:
async def call_tool(
self,
name: str,
arguments: dict[str, Any],
) -> tuple[list[TextContent], dict[str, str]]:
assert name == "hello_static"
assert arguments == {}
return [TextContent(type="text", text="x")], {
"message": "different",
"status": "ok",
"version": "0.0.1",
}
monkeypatch.setattr(demo_module, "create_server", lambda: StubServer())
with pytest.raises(ValueError, match="static payload did not match expected value"):
asyncio.run(demo_module.run_demo())
def test_demo_main_prints_json(
monkeypatch: pytest.MonkeyPatch,
capsys: pytest.CaptureFixture[str],
def test_main_prints_json(
monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture[str]
) -> None:
async def fake_run_demo() -> dict[str, str]:
return HELLO_STATIC_PAYLOAD
monkeypatch.setattr(demo_module, "run_demo", fake_run_demo)
monkeypatch.setattr(
demo_module,
"run_demo",
lambda: {"stdout": "git version 2.x", "exit_code": 0},
)
demo_module.main()
output = capsys.readouterr().out
assert '"message": "hello from pyro_mcp"' in output
rendered = json.loads(capsys.readouterr().out)
assert rendered["exit_code"] == 0

27
tests/test_doctor.py Normal file
View file

@ -0,0 +1,27 @@
from __future__ import annotations
import argparse
import json
import pytest
import pyro_mcp.doctor as doctor_module
def test_doctor_main_prints_json(
    monkeypatch: pytest.MonkeyPatch,
    capsys: pytest.CaptureFixture[str],
) -> None:
    """main() should print the doctor report for the parsed platform as JSON."""

    class FixedArgsParser:
        # Stands in for the real argparse parser so main() always sees
        # a fixed platform selection.
        def parse_args(self) -> argparse.Namespace:
            return argparse.Namespace(platform="linux-x86_64")

    def fake_report(platform: str) -> dict[str, object]:
        return {"platform": platform, "runtime_ok": True, "issues": []}

    monkeypatch.setattr(doctor_module, "_build_parser", lambda: FixedArgsParser())
    monkeypatch.setattr(doctor_module, "doctor_report", fake_report)

    doctor_module.main()

    report = json.loads(capsys.readouterr().out)
    assert report["runtime_ok"] is True

View file

@ -4,84 +4,38 @@ import argparse
import json
import urllib.error
import urllib.request
from pathlib import Path
from typing import Any
import pytest
import pyro_mcp.ollama_demo as ollama_demo
from pyro_mcp.server import HELLO_STATIC_PAYLOAD
from pyro_mcp.vm_manager import VmManager as RealVmManager
def test_run_ollama_tool_demo_triggers_tool_and_returns_final_response(
monkeypatch: pytest.MonkeyPatch,
) -> None:
requests: list[dict[str, Any]] = []
@pytest.fixture(autouse=True)
def _mock_vm_manager_for_tests(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
class TestVmManager(RealVmManager):
def __init__(self) -> None:
super().__init__(backend_name="mock", base_dir=tmp_path / "vms")
def fake_post_chat_completion(base_url: str, payload: dict[str, Any]) -> dict[str, Any]:
assert base_url == "http://localhost:11434/v1"
requests.append(payload)
if len(requests) == 1:
return {
"choices": [
{
"message": {
"role": "assistant",
"content": "",
"tool_calls": [
{
"id": "call_1",
"type": "function",
"function": {"name": "hello_static", "arguments": "{}"},
}
],
}
}
]
}
monkeypatch.setattr(ollama_demo, "VmManager", TestVmManager)
def _stepwise_model_response(payload: dict[str, Any], step: int) -> dict[str, Any]:
if step == 1:
return {
"choices": [
{
"message": {
"role": "assistant",
"content": "Tool says hello from pyro_mcp.",
"content": "",
"tool_calls": [{"id": "1", "function": {"name": "vm_list_profiles"}}],
}
}
]
}
async def fake_run_demo() -> dict[str, str]:
return HELLO_STATIC_PAYLOAD
monkeypatch.setattr(ollama_demo, "_post_chat_completion", fake_post_chat_completion)
monkeypatch.setattr(ollama_demo, "run_demo", fake_run_demo)
result = ollama_demo.run_ollama_tool_demo()
assert result["tool_payload"] == HELLO_STATIC_PAYLOAD
assert result["final_response"] == "Tool says hello from pyro_mcp."
assert len(requests) == 2
assert requests[0]["tools"][0]["function"]["name"] == "hello_static"
tool_message = requests[1]["messages"][-1]
assert tool_message["role"] == "tool"
assert tool_message["tool_call_id"] == "call_1"
def test_run_ollama_tool_demo_raises_when_model_does_not_call_tool(
monkeypatch: pytest.MonkeyPatch,
) -> None:
def fake_post_chat_completion(base_url: str, payload: dict[str, Any]) -> dict[str, Any]:
del base_url, payload
return {"choices": [{"message": {"role": "assistant", "content": "No tool call."}}]}
monkeypatch.setattr(ollama_demo, "_post_chat_completion", fake_post_chat_completion)
with pytest.raises(RuntimeError, match="model did not trigger any tool call"):
ollama_demo.run_ollama_tool_demo()
def test_run_ollama_tool_demo_raises_on_unexpected_tool(monkeypatch: pytest.MonkeyPatch) -> None:
def fake_post_chat_completion(base_url: str, payload: dict[str, Any]) -> dict[str, Any]:
del base_url, payload
if step == 2:
return {
"choices": [
{
@ -90,22 +44,302 @@ def test_run_ollama_tool_demo_raises_on_unexpected_tool(monkeypatch: pytest.Monk
"content": "",
"tool_calls": [
{
"id": "call_1",
"type": "function",
"function": {"name": "unexpected_tool", "arguments": "{}"},
"id": "2",
"function": {
"name": "vm_create",
"arguments": json.dumps(
{"profile": "debian-git", "vcpu_count": 1, "mem_mib": 512}
),
},
}
],
}
}
]
}
if step == 3:
vm_id = json.loads(payload["messages"][-1]["content"])["vm_id"]
return {
"choices": [
{
"message": {
"role": "assistant",
"content": "",
"tool_calls": [
{
"id": "3",
"function": {
"name": "vm_start",
"arguments": json.dumps({"vm_id": vm_id}),
},
}
],
}
}
]
}
if step == 4:
vm_id = json.loads(payload["messages"][-1]["content"])["vm_id"]
return {
"choices": [
{
"message": {
"role": "assistant",
"content": "",
"tool_calls": [
{
"id": "4",
"function": {
"name": "vm_exec",
"arguments": json.dumps(
{
"vm_id": vm_id,
"command": "printf 'git version 2.44.0\\n'",
}
),
},
}
],
}
}
]
}
return {
"choices": [
{"message": {"role": "assistant", "content": "Executed git command in ephemeral VM."}}
]
}
def test_run_ollama_tool_demo_happy_path(monkeypatch: pytest.MonkeyPatch) -> None:
requests: list[dict[str, Any]] = []
def fake_post_chat_completion(base_url: str, payload: dict[str, Any]) -> dict[str, Any]:
assert base_url == "http://localhost:11434/v1"
requests.append(payload)
return _stepwise_model_response(payload, len(requests))
monkeypatch.setattr(ollama_demo, "_post_chat_completion", fake_post_chat_completion)
with pytest.raises(RuntimeError, match="unexpected tool requested by model"):
logs: list[str] = []
result = ollama_demo.run_ollama_tool_demo(log=logs.append)
assert result["fallback_used"] is False
assert "git version" in str(result["exec_result"]["stdout"])
assert result["final_response"] == "Executed git command in ephemeral VM."
assert len(result["tool_events"]) == 4
assert any("[tool] calling vm_exec" in line for line in logs)
def test_run_ollama_tool_demo_recovers_from_bad_vm_id(
monkeypatch: pytest.MonkeyPatch,
) -> None:
requests: list[dict[str, Any]] = []
def fake_post_chat_completion(base_url: str, payload: dict[str, Any]) -> dict[str, Any]:
assert base_url == "http://localhost:11434/v1"
requests.append(payload)
step = len(requests)
if step == 1:
return {
"choices": [
{
"message": {
"role": "assistant",
"tool_calls": [
{
"id": "1",
"function": {
"name": "vm_exec",
"arguments": json.dumps(
{
"vm_id": "vm_list_profiles",
"command": "git --version",
}
),
},
}
],
}
}
]
}
return _stepwise_model_response(payload, step - 1)
monkeypatch.setattr(ollama_demo, "_post_chat_completion", fake_post_chat_completion)
result = ollama_demo.run_ollama_tool_demo()
first_event = result["tool_events"][0]
assert first_event["tool_name"] == "vm_exec"
assert first_event["success"] is False
assert "does not exist" in str(first_event["result"]["error"])
assert int(result["exec_result"]["exit_code"]) == 0
def test_run_ollama_tool_demo_raises_without_vm_exec(monkeypatch: pytest.MonkeyPatch) -> None:
def fake_post_chat_completion(base_url: str, payload: dict[str, Any]) -> dict[str, Any]:
del base_url, payload
return {"choices": [{"message": {"role": "assistant", "content": "No tools"}}]}
monkeypatch.setattr(ollama_demo, "_post_chat_completion", fake_post_chat_completion)
with pytest.raises(RuntimeError, match="did not execute a successful vm_exec"):
ollama_demo.run_ollama_tool_demo()
def test_run_ollama_tool_demo_uses_fallback_when_not_strict(
monkeypatch: pytest.MonkeyPatch, tmp_path: Path
) -> None:
def fake_post_chat_completion(base_url: str, payload: dict[str, Any]) -> dict[str, Any]:
del base_url, payload
return {"choices": [{"message": {"role": "assistant", "content": "No tools"}}]}
class TestVmManager(RealVmManager):
def __init__(self) -> None:
super().__init__(backend_name="mock", base_dir=tmp_path / "vms")
monkeypatch.setattr(ollama_demo, "_post_chat_completion", fake_post_chat_completion)
monkeypatch.setattr(ollama_demo, "VmManager", TestVmManager)
logs: list[str] = []
result = ollama_demo.run_ollama_tool_demo(strict=False, log=logs.append)
assert result["fallback_used"] is True
assert int(result["exec_result"]["exit_code"]) == 0
assert any("[fallback]" in line for line in logs)
@pytest.mark.parametrize(
("tool_call", "error"),
[
(1, "invalid tool call entry"),
({"id": "", "function": {"name": "vm_list_profiles"}}, "valid call id"),
({"id": "1"}, "function metadata"),
({"id": "1", "function": {"name": 3}}, "name is invalid"),
],
)
def test_run_ollama_tool_demo_tool_call_validation(
monkeypatch: pytest.MonkeyPatch,
tool_call: Any,
error: str,
) -> None:
def fake_post_chat_completion(base_url: str, payload: dict[str, Any]) -> dict[str, Any]:
del base_url, payload
return {"choices": [{"message": {"role": "assistant", "tool_calls": [tool_call]}}]}
monkeypatch.setattr(ollama_demo, "_post_chat_completion", fake_post_chat_completion)
with pytest.raises(RuntimeError, match=error):
ollama_demo.run_ollama_tool_demo()
def test_run_ollama_tool_demo_max_rounds(monkeypatch: pytest.MonkeyPatch) -> None:
def fake_post_chat_completion(base_url: str, payload: dict[str, Any]) -> dict[str, Any]:
del base_url, payload
return {
"choices": [
{
"message": {
"role": "assistant",
"tool_calls": [{"id": "1", "function": {"name": "vm_list_profiles"}}],
}
}
]
}
monkeypatch.setattr(ollama_demo, "_post_chat_completion", fake_post_chat_completion)
with pytest.raises(RuntimeError, match="exceeded maximum rounds"):
ollama_demo.run_ollama_tool_demo()
@pytest.mark.parametrize(
("exec_result", "error"),
[
("bad", "result shape is invalid"),
({"exit_code": 1, "stdout": "git version 2"}, "expected exit_code=0"),
({"exit_code": 0, "stdout": "no git"}, "did not contain `git version`"),
],
)
def test_run_ollama_tool_demo_exec_result_validation(
monkeypatch: pytest.MonkeyPatch,
exec_result: Any,
error: str,
) -> None:
responses: list[dict[str, Any]] = [
{
"choices": [
{
"message": {
"role": "assistant",
"tool_calls": [
{"id": "1", "function": {"name": "vm_exec", "arguments": "{}"}}
],
}
}
]
},
{"choices": [{"message": {"role": "assistant", "content": "done"}}]},
]
def fake_post_chat_completion(base_url: str, payload: dict[str, Any]) -> dict[str, Any]:
del base_url, payload
return responses.pop(0)
def fake_dispatch(manager: Any, tool_name: str, arguments: dict[str, Any]) -> Any:
del manager, arguments
if tool_name == "vm_exec":
return exec_result
return {"ok": True}
monkeypatch.setattr(ollama_demo, "_post_chat_completion", fake_post_chat_completion)
monkeypatch.setattr(ollama_demo, "_dispatch_tool_call", fake_dispatch)
with pytest.raises(RuntimeError, match=error):
ollama_demo.run_ollama_tool_demo()
def test_dispatch_tool_call_coverage(tmp_path: Path) -> None:
manager = RealVmManager(backend_name="mock", base_dir=tmp_path / "vms")
profiles = ollama_demo._dispatch_tool_call(manager, "vm_list_profiles", {})
assert "profiles" in profiles
created = ollama_demo._dispatch_tool_call(
manager,
"vm_create",
{"profile": "debian-base", "vcpu_count": 1, "mem_mib": 512},
)
vm_id = str(created["vm_id"])
started = ollama_demo._dispatch_tool_call(manager, "vm_start", {"vm_id": vm_id})
assert started["state"] == "started"
status = ollama_demo._dispatch_tool_call(manager, "vm_status", {"vm_id": vm_id})
assert status["vm_id"] == vm_id
executed = ollama_demo._dispatch_tool_call(
manager, "vm_exec", {"vm_id": vm_id, "command": "printf 'git version\\n'"}
)
assert int(executed["exit_code"]) == 0
with pytest.raises(RuntimeError, match="unexpected tool requested by model"):
ollama_demo._dispatch_tool_call(manager, "nope", {})
def test_format_tool_error() -> None:
error = ValueError("bad args")
result = ollama_demo._format_tool_error("vm_exec", {"vm_id": "x"}, error)
assert result["ok"] is False
assert result["error_type"] == "ValueError"
@pytest.mark.parametrize(
("arguments", "error"),
[
({}, "must be a non-empty string"),
({"k": 3}, "must be a non-empty string"),
],
)
def test_require_str(arguments: dict[str, Any], error: str) -> None:
with pytest.raises(ValueError, match=error):
ollama_demo._require_str(arguments, "k")
def test_require_int_validation() -> None:
with pytest.raises(ValueError, match="must be an integer"):
ollama_demo._require_int({"k": "1"}, "k")
assert ollama_demo._require_int({"k": 1}, "k") == 1
def test_post_chat_completion_success(monkeypatch: pytest.MonkeyPatch) -> None:
class StubResponse:
def __enter__(self) -> StubResponse:
@ -118,32 +352,23 @@ def test_post_chat_completion_success(monkeypatch: pytest.MonkeyPatch) -> None:
return b'{"ok": true}'
def fake_urlopen(request: Any, timeout: int) -> StubResponse:
assert timeout == 60
assert timeout == 90
assert request.full_url == "http://localhost:11434/v1/chat/completions"
return StubResponse()
monkeypatch.setattr(urllib.request, "urlopen", fake_urlopen)
result = ollama_demo._post_chat_completion("http://localhost:11434/v1", {"x": 1})
assert result == {"ok": True}
assert ollama_demo._post_chat_completion("http://localhost:11434/v1", {"x": 1}) == {"ok": True}
def test_post_chat_completion_raises_for_ollama_connection_error(
monkeypatch: pytest.MonkeyPatch,
) -> None:
def fake_urlopen(request: Any, timeout: int) -> Any:
def test_post_chat_completion_errors(monkeypatch: pytest.MonkeyPatch) -> None:
def fake_urlopen_error(request: Any, timeout: int) -> Any:
del request, timeout
raise urllib.error.URLError("boom")
monkeypatch.setattr(urllib.request, "urlopen", fake_urlopen)
monkeypatch.setattr(urllib.request, "urlopen", fake_urlopen_error)
with pytest.raises(RuntimeError, match="failed to call Ollama"):
ollama_demo._post_chat_completion("http://localhost:11434/v1", {"x": 1})
def test_post_chat_completion_raises_for_non_object_response(
monkeypatch: pytest.MonkeyPatch,
) -> None:
class StubResponse:
def __enter__(self) -> StubResponse:
return self
@ -152,89 +377,53 @@ def test_post_chat_completion_raises_for_non_object_response(
del exc_type, exc, tb
def read(self) -> bytes:
return b'["not-an-object"]'
return b'["bad"]'
def fake_urlopen(request: Any, timeout: int) -> StubResponse:
def fake_urlopen_non_object(request: Any, timeout: int) -> StubResponse:
del request, timeout
return StubResponse()
monkeypatch.setattr(urllib.request, "urlopen", fake_urlopen)
monkeypatch.setattr(urllib.request, "urlopen", fake_urlopen_non_object)
with pytest.raises(TypeError, match="unexpected Ollama response shape"):
ollama_demo._post_chat_completion("http://localhost:11434/v1", {"x": 1})
@pytest.mark.parametrize(
("response", "expected_error"),
("raw", "expected"),
[(None, {}), ({}, {}), ("", {}), ('{"a":1}', {"a": 1})],
)
def test_parse_tool_arguments(raw: Any, expected: dict[str, Any]) -> None:
assert ollama_demo._parse_tool_arguments(raw) == expected
def test_parse_tool_arguments_invalid() -> None:
with pytest.raises(TypeError, match="decode to an object"):
ollama_demo._parse_tool_arguments("[]")
with pytest.raises(TypeError, match="dictionary or JSON object string"):
ollama_demo._parse_tool_arguments(3)
@pytest.mark.parametrize(
("response", "msg"),
[
({}, "did not contain completion choices"),
({"choices": [1]}, "unexpected completion choice format"),
({"choices": [{"message": "bad"}]}, "did not contain a message"),
],
)
def test_extract_message_validation_errors(
response: dict[str, Any],
expected_error: str,
) -> None:
with pytest.raises(RuntimeError, match=expected_error):
def test_extract_message_validation(response: dict[str, Any], msg: str) -> None:
with pytest.raises(RuntimeError, match=msg):
ollama_demo._extract_message(response)
def test_parse_tool_arguments_variants() -> None:
assert ollama_demo._parse_tool_arguments(None) == {}
assert ollama_demo._parse_tool_arguments({}) == {}
assert ollama_demo._parse_tool_arguments("") == {}
assert ollama_demo._parse_tool_arguments('{"a": 1}') == {"a": 1}
def test_build_parser_defaults() -> None:
parser = ollama_demo._build_parser()
args = parser.parse_args([])
assert args.model == ollama_demo.DEFAULT_OLLAMA_MODEL
assert args.base_url == ollama_demo.DEFAULT_OLLAMA_BASE_URL
def test_parse_tool_arguments_rejects_invalid_types() -> None:
with pytest.raises(TypeError, match="must decode to an object"):
ollama_demo._parse_tool_arguments("[]")
with pytest.raises(TypeError, match="must be a dictionary or JSON object string"):
ollama_demo._parse_tool_arguments(123)
@pytest.mark.parametrize(
("tool_call", "expected_error"),
[
(1, "invalid tool call entry"),
({"id": "c1"}, "did not include function metadata"),
(
{"id": "c1", "function": {"name": "hello_static", "arguments": '{"x": 1}'}},
"does not accept arguments",
),
(
{"id": "", "function": {"name": "hello_static", "arguments": "{}"}},
"did not provide a valid call id",
),
],
)
def test_run_ollama_tool_demo_validation_branches(
monkeypatch: pytest.MonkeyPatch,
tool_call: Any,
expected_error: str,
) -> None:
def fake_post_chat_completion(base_url: str, payload: dict[str, Any]) -> dict[str, Any]:
del base_url, payload
return {
"choices": [
{
"message": {
"role": "assistant",
"content": "",
"tool_calls": [tool_call],
}
}
]
}
monkeypatch.setattr(ollama_demo, "_post_chat_completion", fake_post_chat_completion)
with pytest.raises(RuntimeError, match=expected_error):
ollama_demo.run_ollama_tool_demo()
def test_main_uses_parser_and_prints_json(
def test_main_uses_parser_and_prints_logs(
monkeypatch: pytest.MonkeyPatch,
capsys: pytest.CaptureFixture[str],
) -> None:
@ -246,10 +435,12 @@ def test_main_uses_parser_and_prints_json(
monkeypatch.setattr(
ollama_demo,
"run_ollama_tool_demo",
lambda base_url, model: {"base_url": base_url, "model": model},
lambda base_url, model, strict=True, log=None: {
"exec_result": {"exit_code": 0, "stdout": "git version 2.44.0\n"},
"fallback_used": False,
},
)
ollama_demo.main()
output = json.loads(capsys.readouterr().out)
assert output == {"base_url": "http://x", "model": "m"}
output = capsys.readouterr().out
assert "[summary] exit_code=0 fallback_used=False" in output
assert "[summary] stdout=git version 2.44.0" in output

73
tests/test_runtime.py Normal file
View file

@ -0,0 +1,73 @@
from __future__ import annotations
import json
from pathlib import Path
import pytest
from pyro_mcp.runtime import doctor_report, resolve_runtime_paths
def test_resolve_runtime_paths_default_bundle() -> None:
    """The default bundle resolves with both binaries, artifacts, and manifest."""
    resolved = resolve_runtime_paths()
    assert resolved.firecracker_bin.exists()
    assert resolved.jailer_bin.exists()
    kernel_image = resolved.artifacts_dir / "debian-git" / "vmlinux"
    assert kernel_image.exists()
    assert resolved.manifest.get("platform") == "linux-x86_64"
def test_resolve_runtime_paths_missing_manifest(
    monkeypatch: pytest.MonkeyPatch, tmp_path: Path
) -> None:
    """Pointing at an empty bundle directory fails fast with a manifest error."""
    bare_bundle = tmp_path / "bundle"
    bare_bundle.mkdir(parents=True, exist_ok=True)
    monkeypatch.setenv("PYRO_RUNTIME_BUNDLE_DIR", str(bare_bundle))
    with pytest.raises(RuntimeError, match="manifest not found"):
        resolve_runtime_paths()
def test_resolve_runtime_paths_checksum_mismatch(
    monkeypatch: pytest.MonkeyPatch, tmp_path: Path
) -> None:
    """A tampered bundled binary must be rejected by checksum validation."""
    source = resolve_runtime_paths()
    bundle_root = tmp_path / "bundle"
    platform_root = bundle_root / "linux-x86_64"
    platform_root.mkdir(parents=True, exist_ok=True)

    # Copy NOTICE and manifest verbatim so only the binary content differs.
    (bundle_root / "NOTICE").write_text(
        source.notice_path.read_text(encoding="utf-8"), encoding="utf-8"
    )
    manifest = json.loads(source.manifest_path.read_text(encoding="utf-8"))
    (platform_root / "manifest.json").write_text(
        json.dumps(manifest, indent=2),
        encoding="utf-8",
    )

    bin_dir = platform_root / "bin"
    bin_dir.mkdir(parents=True, exist_ok=True)
    # Deliberately corrupt the firecracker binary; keep jailer intact.
    (bin_dir / "firecracker").write_text("tampered\n", encoding="utf-8")
    (bin_dir / "jailer").write_text(
        source.jailer_bin.read_text(encoding="utf-8"),
        encoding="utf-8",
    )

    # Mirror every profile's artifacts unchanged so only the binary mismatches.
    for profile in ("debian-base", "debian-git", "debian-build"):
        target_dir = platform_root / "profiles" / profile
        target_dir.mkdir(parents=True, exist_ok=True)
        for filename in ("vmlinux", "rootfs.ext4"):
            original = source.artifacts_dir / profile / filename
            (target_dir / filename).write_text(
                original.read_text(encoding="utf-8"), encoding="utf-8"
            )

    monkeypatch.setenv("PYRO_RUNTIME_BUNDLE_DIR", str(bundle_root))
    with pytest.raises(RuntimeError, match="checksum mismatch"):
        resolve_runtime_paths()
def test_doctor_report_has_runtime_fields() -> None:
    """doctor_report() always exposes runtime_ok/kvm; runtime details when healthy."""
    report = doctor_report()
    assert "runtime_ok" in report
    assert "kvm" in report
    if report["runtime_ok"]:
        runtime_section = report.get("runtime")
        assert isinstance(runtime_section, dict)
        assert "firecracker_bin" in runtime_section

View file

@ -1,36 +1,118 @@
from __future__ import annotations
import asyncio
from typing import Any
from pathlib import Path
from typing import Any, cast
import pytest
from mcp.types import TextContent
import pyro_mcp.server as server_module
from pyro_mcp.server import HELLO_STATIC_PAYLOAD, create_server
from pyro_mcp.server import create_server
from pyro_mcp.vm_manager import VmManager
def test_create_server_registers_static_tool() -> None:
def test_create_server_registers_vm_tools(tmp_path: Path) -> None:
manager = VmManager(backend_name="mock", base_dir=tmp_path / "vms")
async def _run() -> list[str]:
server = create_server()
server = create_server(manager=manager)
tools = await server.list_tools()
return [tool.name for tool in tools]
return sorted(tool.name for tool in tools)
tool_names = asyncio.run(_run())
assert "hello_static" in tool_names
assert "vm_create" in tool_names
assert "vm_exec" in tool_names
assert "vm_list_profiles" in tool_names
assert "vm_status" in tool_names
def test_hello_static_returns_expected_payload() -> None:
async def _run() -> tuple[list[TextContent], dict[str, Any]]:
server = create_server()
blocks, structured = await server.call_tool("hello_static", {})
assert isinstance(blocks, list)
assert all(isinstance(block, TextContent) for block in blocks)
assert isinstance(structured, dict)
return blocks, structured
def test_vm_tools_lifecycle_round_trip(tmp_path: Path) -> None:
manager = VmManager(backend_name="mock", base_dir=tmp_path / "vms")
_, structured_output = asyncio.run(_run())
assert structured_output == HELLO_STATIC_PAYLOAD
def _extract_structured(raw_result: object) -> dict[str, Any]:
if not isinstance(raw_result, tuple) or len(raw_result) != 2:
raise TypeError("unexpected call_tool result shape")
_, structured = raw_result
if not isinstance(structured, dict):
raise TypeError("expected structured dictionary result")
return cast(dict[str, Any], structured)
async def _run() -> dict[str, Any]:
server = create_server(manager=manager)
created = _extract_structured(
await server.call_tool(
"vm_create",
{"profile": "debian-git", "vcpu_count": 1, "mem_mib": 512, "ttl_seconds": 600},
)
)
vm_id = str(created["vm_id"])
await server.call_tool("vm_start", {"vm_id": vm_id})
executed = _extract_structured(
await server.call_tool(
"vm_exec", {"vm_id": vm_id, "command": "printf 'git version 2.0\\n'"}
)
)
return executed
executed = asyncio.run(_run())
assert int(executed["exit_code"]) == 0
assert "git version" in str(executed["stdout"])
def test_vm_tools_status_stop_delete_and_reap(tmp_path: Path) -> None:
manager = VmManager(backend_name="mock", base_dir=tmp_path / "vms")
manager.MIN_TTL_SECONDS = 1
def _extract_structured(raw_result: object) -> dict[str, Any]:
if not isinstance(raw_result, tuple) or len(raw_result) != 2:
raise TypeError("unexpected call_tool result shape")
_, structured = raw_result
if not isinstance(structured, dict):
raise TypeError("expected structured dictionary result")
return cast(dict[str, Any], structured)
async def _run() -> tuple[
dict[str, Any], dict[str, Any], dict[str, Any], list[dict[str, object]], dict[str, Any]
]:
server = create_server(manager=manager)
profiles_raw = await server.call_tool("vm_list_profiles", {})
if not isinstance(profiles_raw, tuple) or len(profiles_raw) != 2:
raise TypeError("unexpected profiles result")
_, profiles_structured = profiles_raw
if not isinstance(profiles_structured, dict):
raise TypeError("profiles tool should return a dictionary")
raw_profiles = profiles_structured.get("result")
if not isinstance(raw_profiles, list):
raise TypeError("profiles tool did not contain a result list")
created = _extract_structured(
await server.call_tool(
"vm_create",
{"profile": "debian-base", "vcpu_count": 1, "mem_mib": 512, "ttl_seconds": 600},
)
)
vm_id = str(created["vm_id"])
await server.call_tool("vm_start", {"vm_id": vm_id})
status = _extract_structured(await server.call_tool("vm_status", {"vm_id": vm_id}))
stopped = _extract_structured(await server.call_tool("vm_stop", {"vm_id": vm_id}))
deleted = _extract_structured(await server.call_tool("vm_delete", {"vm_id": vm_id}))
expiring = _extract_structured(
await server.call_tool(
"vm_create",
{"profile": "debian-base", "vcpu_count": 1, "mem_mib": 512, "ttl_seconds": 1},
)
)
expiring_id = str(expiring["vm_id"])
manager._instances[expiring_id].expires_at = 0.0 # noqa: SLF001
reaped = _extract_structured(await server.call_tool("vm_reap_expired", {}))
return status, stopped, deleted, cast(list[dict[str, object]], raw_profiles), reaped
status, stopped, deleted, profiles, reaped = asyncio.run(_run())
assert status["state"] == "started"
assert stopped["state"] == "stopped"
assert bool(deleted["deleted"]) is True
assert profiles[0]["name"] == "debian-base"
assert int(reaped["count"]) == 1
def test_server_main_runs_stdio_transport(monkeypatch: pytest.MonkeyPatch) -> None:
@ -42,5 +124,4 @@ def test_server_main_runs_stdio_transport(monkeypatch: pytest.MonkeyPatch) -> No
monkeypatch.setattr(server_module, "create_server", lambda: StubServer())
server_module.main()
assert called == {"transport": "stdio"}

164
tests/test_vm_manager.py Normal file
View file

@ -0,0 +1,164 @@
from __future__ import annotations
from pathlib import Path
from typing import Any
import pytest
import pyro_mcp.vm_manager as vm_manager_module
from pyro_mcp.runtime import resolve_runtime_paths
from pyro_mcp.vm_manager import VmManager
def test_vm_manager_lifecycle_and_auto_cleanup(tmp_path: Path) -> None:
    """create -> start -> exec succeeds and the VM is gone afterwards."""
    manager = VmManager(backend_name="mock", base_dir=tmp_path / "vms")
    create_result = manager.create_vm(
        profile="debian-git", vcpu_count=1, mem_mib=512, ttl_seconds=600
    )
    vm_id = str(create_result["vm_id"])
    assert manager.start_vm(vm_id)["state"] == "started"
    exec_result = manager.exec_vm(
        vm_id, command="printf 'git version 2.43.0\\n'", timeout_seconds=30
    )
    assert exec_result["exit_code"] == 0
    assert "git version" in str(exec_result["stdout"])
    # exec triggers automatic cleanup, so a follow-up status lookup fails.
    with pytest.raises(ValueError, match="does not exist"):
        manager.status_vm(vm_id)
def test_vm_manager_exec_timeout(tmp_path: Path) -> None:
    """A command exceeding its timeout returns exit code 124 with a stderr note."""
    manager = VmManager(backend_name="mock", base_dir=tmp_path / "vms")
    created = manager.create_vm(
        profile="debian-base", vcpu_count=1, mem_mib=512, ttl_seconds=600
    )
    vm_id = str(created["vm_id"])
    manager.start_vm(vm_id)
    outcome = manager.exec_vm(vm_id, command="sleep 2", timeout_seconds=1)
    assert outcome["exit_code"] == 124
    assert "timed out" in str(outcome["stderr"])
def test_vm_manager_stop_and_delete(tmp_path: Path) -> None:
    """A started VM can be stopped and then deleted explicitly."""
    manager = VmManager(backend_name="mock", base_dir=tmp_path / "vms")
    created = manager.create_vm(
        profile="debian-base", vcpu_count=1, mem_mib=512, ttl_seconds=600
    )
    vm_id = str(created["vm_id"])
    manager.start_vm(vm_id)
    assert manager.stop_vm(vm_id)["state"] == "stopped"
    assert manager.delete_vm(vm_id)["deleted"] is True
def test_vm_manager_reaps_expired(tmp_path: Path) -> None:
    """reap_expired() removes VMs past their TTL and forgets their ids."""
    manager = VmManager(backend_name="mock", base_dir=tmp_path / "vms")
    manager.MIN_TTL_SECONDS = 1
    vm_id = str(
        manager.create_vm(profile="debian-base", vcpu_count=1, mem_mib=512, ttl_seconds=1)["vm_id"]
    )
    instance = manager._instances[vm_id]  # noqa: SLF001
    instance.expires_at = 0.0  # force immediate expiry
    result = manager.reap_expired()
    assert result["count"] == 1
    # Pin the error message like the other deleted-VM assertions in this file,
    # instead of accepting any ValueError.
    with pytest.raises(ValueError, match="does not exist"):
        manager.status_vm(vm_id)
def test_vm_manager_reaps_started_vm(tmp_path: Path) -> None:
    """A running VM that passed its TTL is also reaped."""
    manager = VmManager(backend_name="mock", base_dir=tmp_path / "vms")
    manager.MIN_TTL_SECONDS = 1
    created = manager.create_vm(
        profile="debian-base", vcpu_count=1, mem_mib=512, ttl_seconds=1
    )
    vm_id = str(created["vm_id"])
    manager.start_vm(vm_id)
    manager._instances[vm_id].expires_at = 0.0  # noqa: SLF001
    assert manager.reap_expired()["count"] == 1
@pytest.mark.parametrize(
    ("kwargs", "msg"),
    [
        ({"vcpu_count": 0, "mem_mib": 512, "ttl_seconds": 600}, "vcpu_count must be between"),
        ({"vcpu_count": 1, "mem_mib": 64, "ttl_seconds": 600}, "mem_mib must be between"),
        ({"vcpu_count": 1, "mem_mib": 512, "ttl_seconds": 30}, "ttl_seconds must be between"),
    ],
)
def test_vm_manager_validates_limits(tmp_path: Path, kwargs: dict[str, int], msg: str) -> None:
    """Out-of-range resource requests are rejected at create time."""
    manager = VmManager(backend_name="mock", base_dir=tmp_path / "vms")
    with pytest.raises(ValueError, match=msg):
        manager.create_vm(profile="debian-base", **kwargs)
def test_vm_manager_max_active_limit(tmp_path: Path) -> None:
    """Creating beyond max_active_vms raises instead of over-provisioning."""
    manager = VmManager(backend_name="mock", base_dir=tmp_path / "vms", max_active_vms=1)
    manager.create_vm(profile="debian-base", vcpu_count=1, mem_mib=512, ttl_seconds=600)
    # Second create exceeds the configured cap of one active VM.
    with pytest.raises(RuntimeError, match="max active VMs reached"):
        manager.create_vm(profile="debian-base", vcpu_count=1, mem_mib=512, ttl_seconds=600)
def test_vm_manager_state_validation(tmp_path: Path) -> None:
    """exec requires a started VM and a positive timeout; start is not re-entrant."""
    manager = VmManager(backend_name="mock", base_dir=tmp_path / "vms")
    created = manager.create_vm(
        profile="debian-base", vcpu_count=1, mem_mib=512, ttl_seconds=600
    )
    vm_id = str(created["vm_id"])
    # exec before start is a state error.
    with pytest.raises(RuntimeError, match="must be in 'started' state"):
        manager.exec_vm(vm_id, command="echo hi", timeout_seconds=30)
    # Zero timeout is rejected even before the VM is started.
    with pytest.raises(ValueError, match="must be positive"):
        manager.exec_vm(vm_id, command="echo hi", timeout_seconds=0)
    manager.start_vm(vm_id)
    # Starting an already-started VM is rejected.
    with pytest.raises(RuntimeError, match="cannot be started from state"):
        manager.start_vm(vm_id)
def test_vm_manager_status_expired_raises(tmp_path: Path) -> None:
    """Querying an expired VM reports the expiry-driven deletion."""
    manager = VmManager(backend_name="mock", base_dir=tmp_path / "vms")
    manager.MIN_TTL_SECONDS = 1
    created = manager.create_vm(
        profile="debian-base", vcpu_count=1, mem_mib=512, ttl_seconds=1
    )
    vm_id = str(created["vm_id"])
    manager._instances[vm_id].expires_at = 0.0  # noqa: SLF001
    with pytest.raises(RuntimeError, match="expired and was automatically deleted"):
        manager.status_vm(vm_id)
def test_vm_manager_invalid_backend(tmp_path: Path) -> None:
    """Unknown backend names are rejected by the constructor."""
    with pytest.raises(ValueError, match="invalid backend"):
        VmManager(backend_name="nope", base_dir=tmp_path / "vms")
def test_vm_manager_firecracker_backend_path(
    tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
    """Selecting the firecracker backend wires it up without real VM work."""

    class NoopFirecrackerBackend:
        # Captures the constructor wiring; every lifecycle hook is a no-op
        # so no real firecracker process is ever launched.
        def __init__(self, artifacts_dir: Path, firecracker_bin: Path, jailer_bin: Path) -> None:
            self.artifacts_dir = artifacts_dir
            self.firecracker_bin = firecracker_bin
            self.jailer_bin = jailer_bin

        def create(self, instance: Any) -> None:
            del instance

        def start(self, instance: Any) -> None:
            del instance

        def exec(self, instance: Any, command: str, timeout_seconds: int) -> Any:
            del instance, command, timeout_seconds
            return None

        def stop(self, instance: Any) -> None:
            del instance

        def delete(self, instance: Any) -> None:
            del instance

    monkeypatch.setattr(vm_manager_module, "FirecrackerBackend", NoopFirecrackerBackend)
    manager = VmManager(
        backend_name="firecracker",
        base_dir=tmp_path / "vms",
        runtime_paths=resolve_runtime_paths(),
    )
    assert manager._backend_name == "firecracker"  # noqa: SLF001

24
tests/test_vm_profiles.py Normal file
View file

@ -0,0 +1,24 @@
from __future__ import annotations
from pathlib import Path
import pytest
from pyro_mcp.vm_profiles import get_profile, list_profiles, resolve_artifacts
def test_list_profiles_includes_expected_entries() -> None:
    """The built-in profile catalog contains all three debian variants."""
    profile_names = {str(entry["name"]) for entry in list_profiles()}
    assert {"debian-base", "debian-git", "debian-build"} <= profile_names
def test_get_profile_rejects_unknown() -> None:
    """Looking up an unregistered profile name raises a ValueError."""
    with pytest.raises(ValueError, match="unknown profile"):
        get_profile("does-not-exist")
def test_resolve_artifacts() -> None:
    """resolve_artifacts() places kernel and rootfs under the profile directory."""
    artifacts = resolve_artifacts(Path("/tmp/artifacts"), "debian-git")
    # Assert on Path.parts rather than a '/'-joined string suffix so the
    # check is independent of the platform's path separator.
    assert artifacts.kernel_image.parts[-2:] == ("debian-git", "vmlinux")
    assert artifacts.rootfs_image.parts[-2:] == ("debian-git", "rootfs.ext4")