Add runtime capability scaffolding and align docs

This commit is contained in:
Thales Maciel 2026-03-05 22:57:09 -03:00
parent fb8b985049
commit cbf212bb7b
19 changed files with 1048 additions and 71 deletions

View file

@ -40,6 +40,10 @@ def test_run_demo_happy_path(monkeypatch: pytest.MonkeyPatch) -> None:
calls.append(("start_vm", {"vm_id": vm_id}))
return {"vm_id": vm_id}
def status_vm(self, vm_id: str) -> dict[str, Any]:
calls.append(("status_vm", {"vm_id": vm_id}))
return {"vm_id": vm_id, "network_enabled": False, "execution_mode": "host_compat"}
def exec_vm(self, vm_id: str, *, command: str, timeout_seconds: int) -> dict[str, Any]:
calls.append(
(
@ -55,7 +59,13 @@ def test_run_demo_happy_path(monkeypatch: pytest.MonkeyPatch) -> None:
assert result["exit_code"] == 0
assert calls[0][0] == "create_vm"
assert calls[1] == ("start_vm", {"vm_id": "vm-1"})
assert calls[2][0] == "exec_vm"
assert calls[2] == ("status_vm", {"vm_id": "vm-1"})
assert calls[3][0] == "exec_vm"
def test_demo_command_prefers_network_probe_for_guest_vsock() -> None:
status = {"network_enabled": True, "execution_mode": "guest_vsock"}
assert "https://example.com" in demo_module._demo_command(status)
def test_main_prints_json(

View file

@ -94,7 +94,7 @@ def _stepwise_model_response(payload: dict[str, Any], step: int) -> dict[str, An
"arguments": json.dumps(
{
"vm_id": vm_id,
"command": "printf 'git version 2.44.0\\n'",
"command": "printf 'true\\n'",
}
),
},
@ -125,14 +125,14 @@ def test_run_ollama_tool_demo_happy_path(monkeypatch: pytest.MonkeyPatch) -> Non
result = ollama_demo.run_ollama_tool_demo(log=logs.append)
assert result["fallback_used"] is False
assert "git version" in str(result["exec_result"]["stdout"])
assert str(result["exec_result"]["stdout"]).strip() == "true"
assert result["final_response"] == "Executed git command in ephemeral VM."
assert len(result["tool_events"]) == 4
assert any("[model] input user:" in line for line in logs)
assert any("[model] output assistant:" in line for line in logs)
assert any(line == "[model] input user" for line in logs)
assert any(line == "[model] output assistant" for line in logs)
assert any("[model] tool_call vm_exec" in line for line in logs)
assert any("[tool] calling vm_exec" in line for line in logs)
assert any("[tool] result vm_exec " in line for line in logs)
assert any(line == "[tool] calling vm_exec" for line in logs)
assert any(line == "[tool] result vm_exec" for line in logs)
def test_run_ollama_tool_demo_recovers_from_bad_vm_id(
@ -158,7 +158,7 @@ def test_run_ollama_tool_demo_recovers_from_bad_vm_id(
"arguments": json.dumps(
{
"vm_id": "vm_list_profiles",
"command": "git --version",
"command": ollama_demo.NETWORK_PROOF_COMMAND,
}
),
},
@ -219,7 +219,7 @@ def test_run_ollama_tool_demo_resolves_vm_id_placeholder(
"arguments": json.dumps(
{
"vm_id": "<vm_id_returned_by_vm_create>",
"command": "printf 'git version 2.44.0\\n'",
"command": "printf 'true\\n'",
"timeout_seconds": "300",
}
),
@ -292,14 +292,49 @@ def test_run_ollama_tool_demo_uses_fallback_when_not_strict(
monkeypatch.setattr(ollama_demo, "_post_chat_completion", fake_post_chat_completion)
monkeypatch.setattr(ollama_demo, "VmManager", TestVmManager)
monkeypatch.setattr(
ollama_demo,
"_run_direct_lifecycle_fallback",
lambda manager: {
"vm_id": "vm-1",
"command": ollama_demo.NETWORK_PROOF_COMMAND,
"stdout": "true\n",
"stderr": "",
"exit_code": 0,
"duration_ms": 5,
"execution_mode": "host_compat",
"cleanup": {"deleted": True, "reason": "post_exec_cleanup", "vm_id": "vm-1"},
},
)
logs: list[str] = []
result = ollama_demo.run_ollama_tool_demo(strict=False, log=logs.append)
assert result["fallback_used"] is True
assert int(result["exec_result"]["exit_code"]) == 0
assert any("[model] output assistant: No tools" in line for line in logs)
assert any(line == "[model] output assistant" for line in logs)
assert any("[fallback]" in line for line in logs)
def test_run_ollama_tool_demo_verbose_logs_values(monkeypatch: pytest.MonkeyPatch) -> None:
requests = 0
def fake_post_chat_completion(base_url: str, payload: dict[str, Any]) -> dict[str, Any]:
del base_url
nonlocal requests
requests += 1
return _stepwise_model_response(payload, requests)
monkeypatch.setattr(ollama_demo, "_post_chat_completion", fake_post_chat_completion)
logs: list[str] = []
result = ollama_demo.run_ollama_tool_demo(verbose=True, log=logs.append)
assert result["fallback_used"] is False
assert str(result["exec_result"]["stdout"]).strip() == "true"
assert any("[model] input user:" in line for line in logs)
assert any("[model] tool_call vm_list_profiles args={}" in line for line in logs)
assert any("[tool] result vm_exec " in line for line in logs)
@pytest.mark.parametrize(
("tool_call", "error"),
[
@ -346,8 +381,8 @@ def test_run_ollama_tool_demo_max_rounds(monkeypatch: pytest.MonkeyPatch) -> Non
("exec_result", "error"),
[
("bad", "result shape is invalid"),
({"exit_code": 1, "stdout": "git version 2"}, "expected exit_code=0"),
({"exit_code": 0, "stdout": "no git"}, "did not contain `git version`"),
({"exit_code": 1, "stdout": "true"}, "expected exit_code=0"),
({"exit_code": 0, "stdout": "false"}, "did not confirm repository clone success"),
],
)
def test_run_ollama_tool_demo_exec_result_validation(
@ -404,7 +439,7 @@ def test_dispatch_tool_call_coverage(tmp_path: Path) -> None:
executed = ollama_demo._dispatch_tool_call(
manager,
"vm_exec",
{"vm_id": vm_id, "command": "printf 'git version\\n'", "timeout_seconds": "30"},
{"vm_id": vm_id, "command": "printf 'true\\n'", "timeout_seconds": "30"},
)
assert int(executed["exit_code"]) == 0
with pytest.raises(RuntimeError, match="unexpected tool requested by model"):
@ -529,6 +564,13 @@ def test_build_parser_defaults() -> None:
args = parser.parse_args([])
assert args.model == ollama_demo.DEFAULT_OLLAMA_MODEL
assert args.base_url == ollama_demo.DEFAULT_OLLAMA_BASE_URL
assert args.verbose is False
def test_build_parser_verbose_flag() -> None:
parser = ollama_demo._build_parser()
args = parser.parse_args(["-v"])
assert args.verbose is True
def test_main_uses_parser_and_prints_logs(
@ -537,21 +579,51 @@ def test_main_uses_parser_and_prints_logs(
) -> None:
class StubParser:
def parse_args(self) -> argparse.Namespace:
return argparse.Namespace(base_url="http://x", model="m")
return argparse.Namespace(base_url="http://x", model="m", verbose=False)
monkeypatch.setattr(ollama_demo, "_build_parser", lambda: StubParser())
monkeypatch.setattr(
ollama_demo,
"run_ollama_tool_demo",
lambda base_url, model, strict=True, log=None: {
"exec_result": {"exit_code": 0, "stdout": "git version 2.44.0\n"},
lambda base_url, model, strict=True, verbose=False, log=None: {
"exec_result": {
"exit_code": 0,
"stdout": "true\n",
"execution_mode": "host_compat",
},
"fallback_used": False,
},
)
ollama_demo.main()
output = capsys.readouterr().out
assert "[summary] exit_code=0 fallback_used=False" in output
assert "[summary] stdout=git version 2.44.0" in output
assert "[summary] exit_code=0 fallback_used=False execution_mode=host_compat" in output
assert "[summary] stdout=" not in output
def test_main_verbose_prints_stdout(
monkeypatch: pytest.MonkeyPatch,
capsys: pytest.CaptureFixture[str],
) -> None:
class StubParser:
def parse_args(self) -> argparse.Namespace:
return argparse.Namespace(base_url="http://x", model="m", verbose=True)
monkeypatch.setattr(ollama_demo, "_build_parser", lambda: StubParser())
monkeypatch.setattr(
ollama_demo,
"run_ollama_tool_demo",
lambda base_url, model, strict=True, verbose=False, log=None: {
"exec_result": {
"exit_code": 0,
"stdout": "true\n",
"execution_mode": "host_compat",
},
"fallback_used": False,
},
)
ollama_demo.main()
output = capsys.readouterr().out
assert "[summary] stdout=true" in output
def test_main_logs_error_and_exits_nonzero(
@ -560,12 +632,18 @@ def test_main_logs_error_and_exits_nonzero(
) -> None:
class StubParser:
def parse_args(self) -> argparse.Namespace:
return argparse.Namespace(base_url="http://x", model="m")
return argparse.Namespace(base_url="http://x", model="m", verbose=False)
monkeypatch.setattr(ollama_demo, "_build_parser", lambda: StubParser())
def fake_run(base_url: str, model: str, strict: bool = True, log: Any = None) -> dict[str, Any]:
del base_url, model, strict, log
def fake_run(
base_url: str,
model: str,
strict: bool = True,
verbose: bool = False,
log: Any = None,
) -> dict[str, Any]:
del base_url, model, strict, verbose, log
raise RuntimeError("demo did not execute a successful vm_exec")
monkeypatch.setattr(ollama_demo, "run_ollama_tool_demo", fake_run)

View file

@ -5,7 +5,7 @@ from pathlib import Path
import pytest
from pyro_mcp.runtime import doctor_report, resolve_runtime_paths
from pyro_mcp.runtime import doctor_report, resolve_runtime_paths, runtime_capabilities
def test_resolve_runtime_paths_default_bundle() -> None:
@ -67,7 +67,19 @@ def test_doctor_report_has_runtime_fields() -> None:
report = doctor_report()
assert "runtime_ok" in report
assert "kvm" in report
assert "networking" in report
if report["runtime_ok"]:
runtime = report.get("runtime")
assert isinstance(runtime, dict)
assert "firecracker_bin" in runtime
networking = report["networking"]
assert isinstance(networking, dict)
assert "tun_available" in networking
def test_runtime_capabilities_reports_shim_bundle() -> None:
paths = resolve_runtime_paths()
capabilities = runtime_capabilities(paths)
assert capabilities.supports_vm_boot is False
assert capabilities.supports_guest_exec is False
assert capabilities.supports_guest_network is False

View file

@ -9,10 +9,15 @@ import pytest
import pyro_mcp.server as server_module
from pyro_mcp.server import create_server
from pyro_mcp.vm_manager import VmManager
from pyro_mcp.vm_network import TapNetworkManager
def test_create_server_registers_vm_tools(tmp_path: Path) -> None:
manager = VmManager(backend_name="mock", base_dir=tmp_path / "vms")
manager = VmManager(
backend_name="mock",
base_dir=tmp_path / "vms",
network_manager=TapNetworkManager(enabled=False),
)
async def _run() -> list[str]:
server = create_server(manager=manager)
@ -23,11 +28,16 @@ def test_create_server_registers_vm_tools(tmp_path: Path) -> None:
assert "vm_create" in tool_names
assert "vm_exec" in tool_names
assert "vm_list_profiles" in tool_names
assert "vm_network_info" in tool_names
assert "vm_status" in tool_names
def test_vm_tools_lifecycle_round_trip(tmp_path: Path) -> None:
manager = VmManager(backend_name="mock", base_dir=tmp_path / "vms")
manager = VmManager(
backend_name="mock",
base_dir=tmp_path / "vms",
network_manager=TapNetworkManager(enabled=False),
)
def _extract_structured(raw_result: object) -> dict[str, Any]:
if not isinstance(raw_result, tuple) or len(raw_result) != 2:
@ -60,7 +70,11 @@ def test_vm_tools_lifecycle_round_trip(tmp_path: Path) -> None:
def test_vm_tools_status_stop_delete_and_reap(tmp_path: Path) -> None:
manager = VmManager(backend_name="mock", base_dir=tmp_path / "vms")
manager = VmManager(
backend_name="mock",
base_dir=tmp_path / "vms",
network_manager=TapNetworkManager(enabled=False),
)
manager.MIN_TTL_SECONDS = 1
def _extract_structured(raw_result: object) -> dict[str, Any]:
@ -72,7 +86,12 @@ def test_vm_tools_status_stop_delete_and_reap(tmp_path: Path) -> None:
return cast(dict[str, Any], structured)
async def _run() -> tuple[
dict[str, Any], dict[str, Any], dict[str, Any], list[dict[str, object]], dict[str, Any]
dict[str, Any],
dict[str, Any],
dict[str, Any],
dict[str, Any],
list[dict[str, object]],
dict[str, Any],
]:
server = create_server(manager=manager)
profiles_raw = await server.call_tool("vm_list_profiles", {})
@ -93,6 +112,7 @@ def test_vm_tools_status_stop_delete_and_reap(tmp_path: Path) -> None:
vm_id = str(created["vm_id"])
await server.call_tool("vm_start", {"vm_id": vm_id})
status = _extract_structured(await server.call_tool("vm_status", {"vm_id": vm_id}))
network = _extract_structured(await server.call_tool("vm_network_info", {"vm_id": vm_id}))
stopped = _extract_structured(await server.call_tool("vm_stop", {"vm_id": vm_id}))
deleted = _extract_structured(await server.call_tool("vm_delete", {"vm_id": vm_id}))
@ -105,10 +125,18 @@ def test_vm_tools_status_stop_delete_and_reap(tmp_path: Path) -> None:
expiring_id = str(expiring["vm_id"])
manager._instances[expiring_id].expires_at = 0.0 # noqa: SLF001
reaped = _extract_structured(await server.call_tool("vm_reap_expired", {}))
return status, stopped, deleted, cast(list[dict[str, object]], raw_profiles), reaped
return (
status,
network,
stopped,
deleted,
cast(list[dict[str, object]], raw_profiles),
reaped,
)
status, stopped, deleted, profiles, reaped = asyncio.run(_run())
status, network, stopped, deleted, profiles, reaped = asyncio.run(_run())
assert status["state"] == "started"
assert network["network_enabled"] is False
assert stopped["state"] == "stopped"
assert bool(deleted["deleted"]) is True
assert profiles[0]["name"] == "debian-base"

View file

@ -0,0 +1,51 @@
from __future__ import annotations
import json
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
from pyro_mcp.vm_firecracker import build_launch_plan
from pyro_mcp.vm_network import NetworkConfig
@dataclass
class StubInstance:
vm_id: str
vcpu_count: int
mem_mib: int
workdir: Path
metadata: dict[str, str] = field(default_factory=dict)
network: Any = None
def test_build_launch_plan_writes_expected_files(tmp_path: Path) -> None:
instance = StubInstance(
vm_id="abcdef123456",
vcpu_count=2,
mem_mib=2048,
workdir=tmp_path,
metadata={
"kernel_image": "/bundle/profiles/debian-git/vmlinux",
"rootfs_image": "/bundle/profiles/debian-git/rootfs.ext4",
},
network=NetworkConfig(
vm_id="abcdef123456",
tap_name="pyroabcdef12",
guest_ip="172.29.100.2",
gateway_ip="172.29.100.1",
subnet_cidr="172.29.100.0/24",
mac_address="06:00:ab:cd:ef:12",
),
)
plan = build_launch_plan(instance)
assert plan.config_path.exists()
assert plan.guest_network_path.exists()
assert plan.guest_exec_path.exists()
rendered = json.loads(plan.config_path.read_text(encoding="utf-8"))
assert rendered["machine-config"]["vcpu_count"] == 2
assert rendered["network-interfaces"][0]["host_dev_name"] == "pyroabcdef12"
guest_exec = json.loads(plan.guest_exec_path.read_text(encoding="utf-8"))
assert guest_exec["transport"] == "vsock"

64
tests/test_vm_guest.py Normal file
View file

@ -0,0 +1,64 @@
from __future__ import annotations
import socket
import pytest
from pyro_mcp.vm_guest import VsockExecClient
class StubSocket:
def __init__(self, response: bytes) -> None:
self.response = response
self.connected: tuple[int, int] | None = None
self.sent = b""
self.timeout: int | None = None
self.closed = False
def settimeout(self, timeout: int) -> None:
self.timeout = timeout
def connect(self, address: tuple[int, int]) -> None:
self.connected = address
def sendall(self, data: bytes) -> None:
self.sent += data
def recv(self, size: int) -> bytes:
del size
if self.response == b"":
return b""
data, self.response = self.response, b""
return data
def close(self) -> None:
self.closed = True
def test_vsock_exec_client_round_trip(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(socket, "AF_VSOCK", 40, raising=False)
stub = StubSocket(
b'{"stdout":"ok\\n","stderr":"","exit_code":0,"duration_ms":7}'
)
def socket_factory(family: int, sock_type: int) -> StubSocket:
assert family == socket.AF_VSOCK
assert sock_type == socket.SOCK_STREAM
return stub
client = VsockExecClient(socket_factory=socket_factory)
response = client.exec(1234, 5005, "echo ok", 30)
assert response.exit_code == 0
assert response.stdout == "ok\n"
assert stub.connected == (1234, 5005)
assert b'"command": "echo ok"' in stub.sent
assert stub.closed is True
def test_vsock_exec_client_rejects_bad_json(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(socket, "AF_VSOCK", 40, raising=False)
stub = StubSocket(b"[]")
client = VsockExecClient(socket_factory=lambda family, sock_type: stub)
with pytest.raises(RuntimeError, match="JSON object"):
client.exec(1234, 5005, "echo ok", 30)

View file

@ -8,10 +8,15 @@ import pytest
import pyro_mcp.vm_manager as vm_manager_module
from pyro_mcp.runtime import resolve_runtime_paths
from pyro_mcp.vm_manager import VmManager
from pyro_mcp.vm_network import TapNetworkManager
def test_vm_manager_lifecycle_and_auto_cleanup(tmp_path: Path) -> None:
manager = VmManager(backend_name="mock", base_dir=tmp_path / "vms")
manager = VmManager(
backend_name="mock",
base_dir=tmp_path / "vms",
network_manager=TapNetworkManager(enabled=False),
)
created = manager.create_vm(profile="debian-git", vcpu_count=1, mem_mib=512, ttl_seconds=600)
vm_id = str(created["vm_id"])
started = manager.start_vm(vm_id)
@ -19,13 +24,18 @@ def test_vm_manager_lifecycle_and_auto_cleanup(tmp_path: Path) -> None:
executed = manager.exec_vm(vm_id, command="printf 'git version 2.43.0\\n'", timeout_seconds=30)
assert executed["exit_code"] == 0
assert executed["execution_mode"] == "host_compat"
assert "git version" in str(executed["stdout"])
with pytest.raises(ValueError, match="does not exist"):
manager.status_vm(vm_id)
def test_vm_manager_exec_timeout(tmp_path: Path) -> None:
manager = VmManager(backend_name="mock", base_dir=tmp_path / "vms")
manager = VmManager(
backend_name="mock",
base_dir=tmp_path / "vms",
network_manager=TapNetworkManager(enabled=False),
)
vm_id = str(
manager.create_vm(profile="debian-base", vcpu_count=1, mem_mib=512, ttl_seconds=600)[
"vm_id"
@ -38,7 +48,11 @@ def test_vm_manager_exec_timeout(tmp_path: Path) -> None:
def test_vm_manager_stop_and_delete(tmp_path: Path) -> None:
manager = VmManager(backend_name="mock", base_dir=tmp_path / "vms")
manager = VmManager(
backend_name="mock",
base_dir=tmp_path / "vms",
network_manager=TapNetworkManager(enabled=False),
)
vm_id = str(
manager.create_vm(profile="debian-base", vcpu_count=1, mem_mib=512, ttl_seconds=600)[
"vm_id"
@ -52,7 +66,11 @@ def test_vm_manager_stop_and_delete(tmp_path: Path) -> None:
def test_vm_manager_reaps_expired(tmp_path: Path) -> None:
manager = VmManager(backend_name="mock", base_dir=tmp_path / "vms")
manager = VmManager(
backend_name="mock",
base_dir=tmp_path / "vms",
network_manager=TapNetworkManager(enabled=False),
)
manager.MIN_TTL_SECONDS = 1
vm_id = str(
manager.create_vm(profile="debian-base", vcpu_count=1, mem_mib=512, ttl_seconds=1)["vm_id"]
@ -66,7 +84,11 @@ def test_vm_manager_reaps_expired(tmp_path: Path) -> None:
def test_vm_manager_reaps_started_vm(tmp_path: Path) -> None:
manager = VmManager(backend_name="mock", base_dir=tmp_path / "vms")
manager = VmManager(
backend_name="mock",
base_dir=tmp_path / "vms",
network_manager=TapNetworkManager(enabled=False),
)
manager.MIN_TTL_SECONDS = 1
vm_id = str(
manager.create_vm(profile="debian-base", vcpu_count=1, mem_mib=512, ttl_seconds=1)["vm_id"]
@ -86,20 +108,33 @@ def test_vm_manager_reaps_started_vm(tmp_path: Path) -> None:
],
)
def test_vm_manager_validates_limits(tmp_path: Path, kwargs: dict[str, int], msg: str) -> None:
manager = VmManager(backend_name="mock", base_dir=tmp_path / "vms")
manager = VmManager(
backend_name="mock",
base_dir=tmp_path / "vms",
network_manager=TapNetworkManager(enabled=False),
)
with pytest.raises(ValueError, match=msg):
manager.create_vm(profile="debian-base", **kwargs)
def test_vm_manager_max_active_limit(tmp_path: Path) -> None:
manager = VmManager(backend_name="mock", base_dir=tmp_path / "vms", max_active_vms=1)
manager = VmManager(
backend_name="mock",
base_dir=tmp_path / "vms",
max_active_vms=1,
network_manager=TapNetworkManager(enabled=False),
)
manager.create_vm(profile="debian-base", vcpu_count=1, mem_mib=512, ttl_seconds=600)
with pytest.raises(RuntimeError, match="max active VMs reached"):
manager.create_vm(profile="debian-base", vcpu_count=1, mem_mib=512, ttl_seconds=600)
def test_vm_manager_state_validation(tmp_path: Path) -> None:
manager = VmManager(backend_name="mock", base_dir=tmp_path / "vms")
manager = VmManager(
backend_name="mock",
base_dir=tmp_path / "vms",
network_manager=TapNetworkManager(enabled=False),
)
vm_id = str(
manager.create_vm(profile="debian-base", vcpu_count=1, mem_mib=512, ttl_seconds=600)[
"vm_id"
@ -115,7 +150,11 @@ def test_vm_manager_state_validation(tmp_path: Path) -> None:
def test_vm_manager_status_expired_raises(tmp_path: Path) -> None:
manager = VmManager(backend_name="mock", base_dir=tmp_path / "vms")
manager = VmManager(
backend_name="mock",
base_dir=tmp_path / "vms",
network_manager=TapNetworkManager(enabled=False),
)
manager.MIN_TTL_SECONDS = 1
vm_id = str(
manager.create_vm(profile="debian-base", vcpu_count=1, mem_mib=512, ttl_seconds=1)["vm_id"]
@ -127,17 +166,45 @@ def test_vm_manager_status_expired_raises(tmp_path: Path) -> None:
def test_vm_manager_invalid_backend(tmp_path: Path) -> None:
with pytest.raises(ValueError, match="invalid backend"):
VmManager(backend_name="nope", base_dir=tmp_path / "vms")
VmManager(
backend_name="nope",
base_dir=tmp_path / "vms",
network_manager=TapNetworkManager(enabled=False),
)
def test_vm_manager_network_info(tmp_path: Path) -> None:
manager = VmManager(
backend_name="mock",
base_dir=tmp_path / "vms",
network_manager=TapNetworkManager(enabled=False),
)
created = manager.create_vm(profile="debian-base", vcpu_count=1, mem_mib=512, ttl_seconds=600)
vm_id = str(created["vm_id"])
status = manager.status_vm(vm_id)
info = manager.network_info_vm(vm_id)
assert status["network_enabled"] is False
assert status["guest_ip"] is None
assert info["network_enabled"] is False
def test_vm_manager_firecracker_backend_path(
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
class StubFirecrackerBackend:
def __init__(self, artifacts_dir: Path, firecracker_bin: Path, jailer_bin: Path) -> None:
def __init__(
self,
artifacts_dir: Path,
firecracker_bin: Path,
jailer_bin: Path,
runtime_capabilities: Any,
network_manager: TapNetworkManager,
) -> None:
self.artifacts_dir = artifacts_dir
self.firecracker_bin = firecracker_bin
self.jailer_bin = jailer_bin
self.runtime_capabilities = runtime_capabilities
self.network_manager = network_manager
def create(self, instance: Any) -> None:
del instance
@ -160,5 +227,6 @@ def test_vm_manager_firecracker_backend_path(
backend_name="firecracker",
base_dir=tmp_path / "vms",
runtime_paths=resolve_runtime_paths(),
network_manager=TapNetworkManager(enabled=False),
)
assert manager._backend_name == "firecracker" # noqa: SLF001

60
tests/test_vm_network.py Normal file
View file

@ -0,0 +1,60 @@
from __future__ import annotations
import subprocess
from pathlib import Path
import pytest
from pyro_mcp.vm_network import NetworkConfig, TapNetworkManager
def test_tap_network_manager_allocation_disabled_by_default(
monkeypatch: pytest.MonkeyPatch,
) -> None:
monkeypatch.delenv("PYRO_VM_ENABLE_NETWORK", raising=False)
manager = TapNetworkManager()
config = manager.allocate("abcdef123456")
assert manager.enabled is False
assert config.tap_name == "pyroabcdef12"
assert config.guest_ip.startswith("172.29.")
metadata = manager.to_metadata(config)
assert metadata["network_enabled"] == "false"
def test_tap_network_manager_network_info() -> None:
manager = TapNetworkManager(enabled=False)
config = NetworkConfig(
vm_id="abcdef123456",
tap_name="pyroabcdef12",
guest_ip="172.29.100.2",
gateway_ip="172.29.100.1",
subnet_cidr="172.29.100.0/24",
mac_address="06:00:ab:cd:ef:12",
)
info = manager.network_info(config)
assert info["tap_name"] == "pyroabcdef12"
assert info["outbound_connectivity_expected"] is False
def test_tap_network_manager_enabled_runs_host_commands() -> None:
commands: list[list[str]] = []
def runner(command: list[str]) -> subprocess.CompletedProcess[str]:
commands.append(command)
return subprocess.CompletedProcess(command, 0, "", "")
manager = TapNetworkManager(enabled=True, runner=runner)
config = manager.allocate("abcdef123456")
manager.cleanup(config)
assert commands[0][:4] == ["ip", "tuntap", "add", "dev"]
assert commands[-1][:3] in (["ip", "link", "del"], ["nft", "delete", "table"])
def test_tap_network_manager_missing_host_support(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(Path, "exists", lambda self: False if str(self) == "/dev/net/tun" else True)
manager = TapNetworkManager(
enabled=True,
runner=lambda command: subprocess.CompletedProcess(command, 0, "", ""),
)
with pytest.raises(RuntimeError, match="/dev/net/tun"):
manager.allocate("abcdef123456")