Unify public UX around pyro CLI and Pyro facade
This commit is contained in:
parent
d16aadd03f
commit
23a2dfb330
19 changed files with 936 additions and 407 deletions
85
tests/test_api.py
Normal file
85
tests/test_api.py
Normal file
|
|
@ -0,0 +1,85 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from pathlib import Path
|
||||
from typing import Any, cast
|
||||
|
||||
from pyro_mcp.api import Pyro
|
||||
from pyro_mcp.vm_manager import VmManager
|
||||
from pyro_mcp.vm_network import TapNetworkManager
|
||||
|
||||
|
||||
def test_pyro_run_in_vm_delegates_to_manager(tmp_path: Path) -> None:
|
||||
pyro = Pyro(
|
||||
manager=VmManager(
|
||||
backend_name="mock",
|
||||
base_dir=tmp_path / "vms",
|
||||
network_manager=TapNetworkManager(enabled=False),
|
||||
)
|
||||
)
|
||||
result = pyro.run_in_vm(
|
||||
profile="debian-base",
|
||||
command="printf 'ok\\n'",
|
||||
vcpu_count=1,
|
||||
mem_mib=512,
|
||||
timeout_seconds=30,
|
||||
ttl_seconds=600,
|
||||
network=False,
|
||||
)
|
||||
assert int(result["exit_code"]) == 0
|
||||
assert str(result["stdout"]) == "ok\n"
|
||||
|
||||
|
||||
def test_pyro_create_server_registers_vm_run(tmp_path: Path) -> None:
|
||||
pyro = Pyro(
|
||||
manager=VmManager(
|
||||
backend_name="mock",
|
||||
base_dir=tmp_path / "vms",
|
||||
network_manager=TapNetworkManager(enabled=False),
|
||||
)
|
||||
)
|
||||
|
||||
async def _run() -> list[str]:
|
||||
server = pyro.create_server()
|
||||
tools = await server.list_tools()
|
||||
return sorted(tool.name for tool in tools)
|
||||
|
||||
tool_names = asyncio.run(_run())
|
||||
assert "vm_run" in tool_names
|
||||
assert "vm_create" in tool_names
|
||||
|
||||
|
||||
def test_pyro_vm_run_tool_executes(tmp_path: Path) -> None:
|
||||
pyro = Pyro(
|
||||
manager=VmManager(
|
||||
backend_name="mock",
|
||||
base_dir=tmp_path / "vms",
|
||||
network_manager=TapNetworkManager(enabled=False),
|
||||
)
|
||||
)
|
||||
|
||||
def _extract_structured(raw_result: object) -> dict[str, Any]:
|
||||
if not isinstance(raw_result, tuple) or len(raw_result) != 2:
|
||||
raise TypeError("unexpected call_tool result shape")
|
||||
_, structured = raw_result
|
||||
if not isinstance(structured, dict):
|
||||
raise TypeError("expected structured dictionary result")
|
||||
return cast(dict[str, Any], structured)
|
||||
|
||||
async def _run() -> dict[str, Any]:
|
||||
server = pyro.create_server()
|
||||
return _extract_structured(
|
||||
await server.call_tool(
|
||||
"vm_run",
|
||||
{
|
||||
"profile": "debian-base",
|
||||
"command": "printf 'ok\\n'",
|
||||
"vcpu_count": 1,
|
||||
"mem_mib": 512,
|
||||
"network": False,
|
||||
},
|
||||
)
|
||||
)
|
||||
|
||||
result = asyncio.run(_run())
|
||||
assert int(result["exit_code"]) == 0
|
||||
89
tests/test_cli.py
Normal file
89
tests/test_cli.py
Normal file
|
|
@ -0,0 +1,89 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
from typing import Any
|
||||
|
||||
import pytest
|
||||
|
||||
import pyro_mcp.cli as cli
|
||||
|
||||
|
||||
def test_cli_run_prints_json(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
capsys: pytest.CaptureFixture[str],
|
||||
) -> None:
|
||||
class StubPyro:
|
||||
def run_in_vm(self, **kwargs: Any) -> dict[str, Any]:
|
||||
assert kwargs["network"] is True
|
||||
assert kwargs["command"] == "echo hi"
|
||||
return {"exit_code": 0, "stdout": "hi\n"}
|
||||
|
||||
class StubParser:
|
||||
def parse_args(self) -> argparse.Namespace:
|
||||
return argparse.Namespace(
|
||||
command="run",
|
||||
profile="debian-git",
|
||||
vcpu_count=1,
|
||||
mem_mib=512,
|
||||
timeout_seconds=30,
|
||||
ttl_seconds=600,
|
||||
network=True,
|
||||
command_args=["--", "echo", "hi"],
|
||||
)
|
||||
|
||||
monkeypatch.setattr(cli, "_build_parser", lambda: StubParser())
|
||||
monkeypatch.setattr(cli, "Pyro", StubPyro)
|
||||
cli.main()
|
||||
output = json.loads(capsys.readouterr().out)
|
||||
assert output["exit_code"] == 0
|
||||
|
||||
|
||||
def test_cli_doctor_prints_json(
|
||||
monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture[str]
|
||||
) -> None:
|
||||
class StubParser:
|
||||
def parse_args(self) -> argparse.Namespace:
|
||||
return argparse.Namespace(command="doctor", platform="linux-x86_64")
|
||||
|
||||
monkeypatch.setattr(cli, "_build_parser", lambda: StubParser())
|
||||
monkeypatch.setattr(
|
||||
cli,
|
||||
"doctor_report",
|
||||
lambda platform: {"platform": platform, "runtime_ok": True},
|
||||
)
|
||||
cli.main()
|
||||
output = json.loads(capsys.readouterr().out)
|
||||
assert output["runtime_ok"] is True
|
||||
|
||||
|
||||
def test_cli_demo_ollama_prints_summary(
|
||||
monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture[str]
|
||||
) -> None:
|
||||
class StubParser:
|
||||
def parse_args(self) -> argparse.Namespace:
|
||||
return argparse.Namespace(
|
||||
command="demo",
|
||||
demo_command="ollama",
|
||||
base_url="http://localhost:11434/v1",
|
||||
model="llama3.2:3b",
|
||||
verbose=False,
|
||||
)
|
||||
|
||||
monkeypatch.setattr(cli, "_build_parser", lambda: StubParser())
|
||||
monkeypatch.setattr(
|
||||
cli,
|
||||
"run_ollama_tool_demo",
|
||||
lambda **kwargs: {
|
||||
"exec_result": {"exit_code": 0, "execution_mode": "guest_vsock", "stdout": "true\n"},
|
||||
"fallback_used": False,
|
||||
},
|
||||
)
|
||||
cli.main()
|
||||
output = capsys.readouterr().out
|
||||
assert "[summary] exit_code=0 fallback_used=False execution_mode=guest_vsock" in output
|
||||
|
||||
|
||||
def test_cli_requires_run_command() -> None:
|
||||
with pytest.raises(ValueError, match="command is required"):
|
||||
cli._require_command([])
|
||||
|
|
@ -11,56 +11,55 @@ import pyro_mcp.demo as demo_module
|
|||
def test_run_demo_happy_path(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
calls: list[tuple[str, dict[str, Any]]] = []
|
||||
|
||||
class StubManager:
|
||||
class StubPyro:
|
||||
def __init__(self) -> None:
|
||||
pass
|
||||
|
||||
def create_vm(
|
||||
def run_in_vm(
|
||||
self,
|
||||
*,
|
||||
profile: str,
|
||||
command: str,
|
||||
vcpu_count: int,
|
||||
mem_mib: int,
|
||||
timeout_seconds: int,
|
||||
ttl_seconds: int,
|
||||
) -> dict[str, str]:
|
||||
network: bool,
|
||||
) -> dict[str, Any]:
|
||||
calls.append(
|
||||
(
|
||||
"create_vm",
|
||||
"run_in_vm",
|
||||
{
|
||||
"profile": profile,
|
||||
"command": command,
|
||||
"vcpu_count": vcpu_count,
|
||||
"mem_mib": mem_mib,
|
||||
"timeout_seconds": timeout_seconds,
|
||||
"ttl_seconds": ttl_seconds,
|
||||
"network": network,
|
||||
},
|
||||
)
|
||||
)
|
||||
return {"vm_id": "vm-1"}
|
||||
return {"vm_id": "vm-1", "stdout": "git version 2.x", "exit_code": 0}
|
||||
|
||||
def start_vm(self, vm_id: str) -> dict[str, str]:
|
||||
calls.append(("start_vm", {"vm_id": vm_id}))
|
||||
return {"vm_id": vm_id}
|
||||
|
||||
def status_vm(self, vm_id: str) -> dict[str, Any]:
|
||||
calls.append(("status_vm", {"vm_id": vm_id}))
|
||||
return {"vm_id": vm_id, "network_enabled": False, "execution_mode": "host_compat"}
|
||||
|
||||
def exec_vm(self, vm_id: str, *, command: str, timeout_seconds: int) -> dict[str, Any]:
|
||||
calls.append(
|
||||
(
|
||||
"exec_vm",
|
||||
{"vm_id": vm_id, "command": command, "timeout_seconds": timeout_seconds},
|
||||
)
|
||||
)
|
||||
return {"vm_id": vm_id, "stdout": "git version 2.x", "exit_code": 0}
|
||||
|
||||
monkeypatch.setattr(demo_module, "VmManager", StubManager)
|
||||
monkeypatch.setattr(demo_module, "Pyro", StubPyro)
|
||||
result = demo_module.run_demo()
|
||||
|
||||
assert result["exit_code"] == 0
|
||||
assert calls[0][0] == "create_vm"
|
||||
assert calls[1] == ("start_vm", {"vm_id": "vm-1"})
|
||||
assert calls[2] == ("status_vm", {"vm_id": "vm-1"})
|
||||
assert calls[3][0] == "exec_vm"
|
||||
assert calls == [
|
||||
(
|
||||
"run_in_vm",
|
||||
{
|
||||
"profile": "debian-git",
|
||||
"command": "git --version",
|
||||
"vcpu_count": 1,
|
||||
"mem_mib": 512,
|
||||
"timeout_seconds": 30,
|
||||
"ttl_seconds": 600,
|
||||
"network": False,
|
||||
},
|
||||
)
|
||||
]
|
||||
|
||||
|
||||
def test_demo_command_prefers_network_probe_for_guest_vsock() -> None:
|
||||
|
|
@ -79,3 +78,20 @@ def test_main_prints_json(
|
|||
demo_module.main()
|
||||
rendered = json.loads(capsys.readouterr().out)
|
||||
assert rendered["exit_code"] == 0
|
||||
|
||||
|
||||
def test_run_demo_network_uses_probe(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
captured: dict[str, Any] = {}
|
||||
|
||||
class StubPyro:
|
||||
def __init__(self) -> None:
|
||||
pass
|
||||
|
||||
def run_in_vm(self, **kwargs: Any) -> dict[str, Any]:
|
||||
captured.update(kwargs)
|
||||
return {"exit_code": 0}
|
||||
|
||||
monkeypatch.setattr(demo_module, "Pyro", StubPyro)
|
||||
demo_module.run_demo(network=True)
|
||||
assert "https://example.com" in str(captured["command"])
|
||||
assert captured["network"] is True
|
||||
|
|
|
|||
|
|
@ -10,16 +10,17 @@ from typing import Any
|
|||
import pytest
|
||||
|
||||
import pyro_mcp.ollama_demo as ollama_demo
|
||||
from pyro_mcp.api import Pyro as RealPyro
|
||||
from pyro_mcp.vm_manager import VmManager as RealVmManager
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _mock_vm_manager_for_tests(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
|
||||
class TestVmManager(RealVmManager):
|
||||
class TestPyro(RealPyro):
|
||||
def __init__(self) -> None:
|
||||
super().__init__(backend_name="mock", base_dir=tmp_path / "vms")
|
||||
super().__init__(manager=RealVmManager(backend_name="mock", base_dir=tmp_path / "vms"))
|
||||
|
||||
monkeypatch.setattr(ollama_demo, "VmManager", TestVmManager)
|
||||
monkeypatch.setattr(ollama_demo, "Pyro", TestPyro)
|
||||
|
||||
|
||||
def _stepwise_model_response(payload: dict[str, Any], step: int) -> dict[str, Any]:
|
||||
|
|
@ -46,55 +47,14 @@ def _stepwise_model_response(payload: dict[str, Any], step: int) -> dict[str, An
|
|||
{
|
||||
"id": "2",
|
||||
"function": {
|
||||
"name": "vm_create",
|
||||
"arguments": json.dumps(
|
||||
{"profile": "debian-git", "vcpu_count": 1, "mem_mib": 512}
|
||||
),
|
||||
},
|
||||
}
|
||||
],
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
if step == 3:
|
||||
vm_id = json.loads(payload["messages"][-1]["content"])["vm_id"]
|
||||
return {
|
||||
"choices": [
|
||||
{
|
||||
"message": {
|
||||
"role": "assistant",
|
||||
"content": "",
|
||||
"tool_calls": [
|
||||
{
|
||||
"id": "3",
|
||||
"function": {
|
||||
"name": "vm_start",
|
||||
"arguments": json.dumps({"vm_id": vm_id}),
|
||||
},
|
||||
}
|
||||
],
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
if step == 4:
|
||||
vm_id = json.loads(payload["messages"][-1]["content"])["vm_id"]
|
||||
return {
|
||||
"choices": [
|
||||
{
|
||||
"message": {
|
||||
"role": "assistant",
|
||||
"content": "",
|
||||
"tool_calls": [
|
||||
{
|
||||
"id": "4",
|
||||
"function": {
|
||||
"name": "vm_exec",
|
||||
"name": "vm_run",
|
||||
"arguments": json.dumps(
|
||||
{
|
||||
"vm_id": vm_id,
|
||||
"profile": "debian-git",
|
||||
"command": "printf 'true\\n'",
|
||||
"vcpu_count": 1,
|
||||
"mem_mib": 512,
|
||||
"network": True,
|
||||
}
|
||||
),
|
||||
},
|
||||
|
|
@ -127,12 +87,12 @@ def test_run_ollama_tool_demo_happy_path(monkeypatch: pytest.MonkeyPatch) -> Non
|
|||
assert result["fallback_used"] is False
|
||||
assert str(result["exec_result"]["stdout"]).strip() == "true"
|
||||
assert result["final_response"] == "Executed git command in ephemeral VM."
|
||||
assert len(result["tool_events"]) == 4
|
||||
assert len(result["tool_events"]) == 2
|
||||
assert any(line == "[model] input user" for line in logs)
|
||||
assert any(line == "[model] output assistant" for line in logs)
|
||||
assert any("[model] tool_call vm_exec" in line for line in logs)
|
||||
assert any(line == "[tool] calling vm_exec" for line in logs)
|
||||
assert any(line == "[tool] result vm_exec" for line in logs)
|
||||
assert any("[model] tool_call vm_run" in line for line in logs)
|
||||
assert any(line == "[tool] calling vm_run" for line in logs)
|
||||
assert any(line == "[tool] result vm_run" for line in logs)
|
||||
|
||||
|
||||
def test_run_ollama_tool_demo_recovers_from_bad_vm_id(
|
||||
|
|
@ -256,12 +216,12 @@ def test_run_ollama_tool_demo_resolves_vm_id_placeholder(
|
|||
|
||||
|
||||
def test_dispatch_tool_call_vm_exec_autostarts_created_vm(tmp_path: Path) -> None:
|
||||
manager = RealVmManager(backend_name="mock", base_dir=tmp_path / "vms")
|
||||
created = manager.create_vm(profile="debian-base", vcpu_count=1, mem_mib=512, ttl_seconds=60)
|
||||
pyro = RealPyro(manager=RealVmManager(backend_name="mock", base_dir=tmp_path / "vms"))
|
||||
created = pyro.create_vm(profile="debian-base", vcpu_count=1, mem_mib=512, ttl_seconds=60)
|
||||
vm_id = str(created["vm_id"])
|
||||
|
||||
executed = ollama_demo._dispatch_tool_call(
|
||||
manager,
|
||||
pyro,
|
||||
"vm_exec",
|
||||
{"vm_id": vm_id, "command": "printf 'git version\\n'", "timeout_seconds": "30"},
|
||||
)
|
||||
|
|
@ -275,7 +235,7 @@ def test_run_ollama_tool_demo_raises_without_vm_exec(monkeypatch: pytest.MonkeyP
|
|||
return {"choices": [{"message": {"role": "assistant", "content": "No tools"}}]}
|
||||
|
||||
monkeypatch.setattr(ollama_demo, "_post_chat_completion", fake_post_chat_completion)
|
||||
with pytest.raises(RuntimeError, match="did not execute a successful vm_exec"):
|
||||
with pytest.raises(RuntimeError, match="did not execute a successful vm_run or vm_exec"):
|
||||
ollama_demo.run_ollama_tool_demo()
|
||||
|
||||
|
||||
|
|
@ -286,16 +246,16 @@ def test_run_ollama_tool_demo_uses_fallback_when_not_strict(
|
|||
del base_url, payload
|
||||
return {"choices": [{"message": {"role": "assistant", "content": "No tools"}}]}
|
||||
|
||||
class TestVmManager(RealVmManager):
|
||||
class TestPyro(RealPyro):
|
||||
def __init__(self) -> None:
|
||||
super().__init__(backend_name="mock", base_dir=tmp_path / "vms")
|
||||
super().__init__(manager=RealVmManager(backend_name="mock", base_dir=tmp_path / "vms"))
|
||||
|
||||
monkeypatch.setattr(ollama_demo, "_post_chat_completion", fake_post_chat_completion)
|
||||
monkeypatch.setattr(ollama_demo, "VmManager", TestVmManager)
|
||||
monkeypatch.setattr(ollama_demo, "Pyro", TestPyro)
|
||||
monkeypatch.setattr(
|
||||
ollama_demo,
|
||||
"_run_direct_lifecycle_fallback",
|
||||
lambda manager: {
|
||||
lambda pyro: {
|
||||
"vm_id": "vm-1",
|
||||
"command": ollama_demo.NETWORK_PROOF_COMMAND,
|
||||
"stdout": "true\n",
|
||||
|
|
@ -332,7 +292,7 @@ def test_run_ollama_tool_demo_verbose_logs_values(monkeypatch: pytest.MonkeyPatc
|
|||
assert str(result["exec_result"]["stdout"]).strip() == "true"
|
||||
assert any("[model] input user:" in line for line in logs)
|
||||
assert any("[model] tool_call vm_list_profiles args={}" in line for line in logs)
|
||||
assert any("[tool] result vm_exec " in line for line in logs)
|
||||
assert any("[tool] result vm_run " in line for line in logs)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
|
|
@ -397,7 +357,7 @@ def test_run_ollama_tool_demo_exec_result_validation(
|
|||
"message": {
|
||||
"role": "assistant",
|
||||
"tool_calls": [
|
||||
{"id": "1", "function": {"name": "vm_exec", "arguments": "{}"}}
|
||||
{"id": "1", "function": {"name": "vm_run", "arguments": "{}"}}
|
||||
],
|
||||
}
|
||||
}
|
||||
|
|
@ -410,9 +370,9 @@ def test_run_ollama_tool_demo_exec_result_validation(
|
|||
del base_url, payload
|
||||
return responses.pop(0)
|
||||
|
||||
def fake_dispatch(manager: Any, tool_name: str, arguments: dict[str, Any]) -> Any:
|
||||
del manager, arguments
|
||||
if tool_name == "vm_exec":
|
||||
def fake_dispatch(pyro: Any, tool_name: str, arguments: dict[str, Any]) -> Any:
|
||||
del pyro, arguments
|
||||
if tool_name == "vm_run":
|
||||
return exec_result
|
||||
return {"ok": True}
|
||||
|
||||
|
|
@ -423,27 +383,46 @@ def test_run_ollama_tool_demo_exec_result_validation(
|
|||
|
||||
|
||||
def test_dispatch_tool_call_coverage(tmp_path: Path) -> None:
|
||||
manager = RealVmManager(backend_name="mock", base_dir=tmp_path / "vms")
|
||||
profiles = ollama_demo._dispatch_tool_call(manager, "vm_list_profiles", {})
|
||||
pyro = RealPyro(manager=RealVmManager(backend_name="mock", base_dir=tmp_path / "vms"))
|
||||
profiles = ollama_demo._dispatch_tool_call(pyro, "vm_list_profiles", {})
|
||||
assert "profiles" in profiles
|
||||
created = ollama_demo._dispatch_tool_call(
|
||||
manager,
|
||||
pyro,
|
||||
"vm_create",
|
||||
{"profile": "debian-base", "vcpu_count": "1", "mem_mib": "512", "ttl_seconds": "60"},
|
||||
{
|
||||
"profile": "debian-base",
|
||||
"vcpu_count": "1",
|
||||
"mem_mib": "512",
|
||||
"ttl_seconds": "60",
|
||||
"network": False,
|
||||
},
|
||||
)
|
||||
vm_id = str(created["vm_id"])
|
||||
started = ollama_demo._dispatch_tool_call(manager, "vm_start", {"vm_id": vm_id})
|
||||
started = ollama_demo._dispatch_tool_call(pyro, "vm_start", {"vm_id": vm_id})
|
||||
assert started["state"] == "started"
|
||||
status = ollama_demo._dispatch_tool_call(manager, "vm_status", {"vm_id": vm_id})
|
||||
status = ollama_demo._dispatch_tool_call(pyro, "vm_status", {"vm_id": vm_id})
|
||||
assert status["vm_id"] == vm_id
|
||||
executed = ollama_demo._dispatch_tool_call(
|
||||
manager,
|
||||
pyro,
|
||||
"vm_exec",
|
||||
{"vm_id": vm_id, "command": "printf 'true\\n'", "timeout_seconds": "30"},
|
||||
)
|
||||
assert int(executed["exit_code"]) == 0
|
||||
executed_run = ollama_demo._dispatch_tool_call(
|
||||
pyro,
|
||||
"vm_run",
|
||||
{
|
||||
"profile": "debian-base",
|
||||
"command": "printf 'true\\n'",
|
||||
"vcpu_count": "1",
|
||||
"mem_mib": "512",
|
||||
"timeout_seconds": "30",
|
||||
"network": False,
|
||||
},
|
||||
)
|
||||
assert int(executed_run["exit_code"]) == 0
|
||||
with pytest.raises(RuntimeError, match="unexpected tool requested by model"):
|
||||
ollama_demo._dispatch_tool_call(manager, "nope", {})
|
||||
ollama_demo._dispatch_tool_call(pyro, "nope", {})
|
||||
|
||||
|
||||
def test_format_tool_error() -> None:
|
||||
|
|
@ -483,6 +462,16 @@ def test_require_int_validation(value: Any) -> None:
|
|||
ollama_demo._require_int({"k": value}, "k")
|
||||
|
||||
|
||||
@pytest.mark.parametrize(("arguments", "expected"), [({}, False), ({"k": True}, True)])
|
||||
def test_require_bool(arguments: dict[str, Any], expected: bool) -> None:
|
||||
assert ollama_demo._require_bool(arguments, "k", default=False) is expected
|
||||
|
||||
|
||||
def test_require_bool_validation() -> None:
|
||||
with pytest.raises(ValueError, match="must be a boolean"):
|
||||
ollama_demo._require_bool({"k": "true"}, "k")
|
||||
|
||||
|
||||
def test_post_chat_completion_success(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
class StubResponse:
|
||||
def __enter__(self) -> StubResponse:
|
||||
|
|
|
|||
|
|
@ -3,48 +3,38 @@ from __future__ import annotations
|
|||
import pytest
|
||||
|
||||
import pyro_mcp.runtime_network_check as runtime_network_check
|
||||
from pyro_mcp.vm_network import TapNetworkManager
|
||||
|
||||
|
||||
def test_network_check_uses_network_enabled_manager(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
observed: dict[str, object] = {}
|
||||
|
||||
class StubManager:
|
||||
class StubPyro:
|
||||
def __init__(self, **kwargs: object) -> None:
|
||||
observed.update(kwargs)
|
||||
|
||||
def create_vm(self, **kwargs: object) -> dict[str, object]:
|
||||
observed["create_kwargs"] = kwargs
|
||||
return {"vm_id": "vm123"}
|
||||
|
||||
def start_vm(self, vm_id: str) -> dict[str, object]:
|
||||
observed["started_vm_id"] = vm_id
|
||||
return {"state": "started"}
|
||||
|
||||
def status_vm(self, vm_id: str) -> dict[str, object]:
|
||||
observed["status_vm_id"] = vm_id
|
||||
return {"network_enabled": True}
|
||||
|
||||
def exec_vm(self, vm_id: str, *, command: str, timeout_seconds: int) -> dict[str, object]:
|
||||
observed["exec_vm_id"] = vm_id
|
||||
observed["command"] = command
|
||||
observed["timeout_seconds"] = timeout_seconds
|
||||
def run_in_vm(self, **kwargs: object) -> dict[str, object]:
|
||||
observed["run_kwargs"] = kwargs
|
||||
return {
|
||||
"vm_id": "vm123",
|
||||
"execution_mode": "guest_vsock",
|
||||
"exit_code": 0,
|
||||
"stdout": "true\n",
|
||||
"stderr": "",
|
||||
"cleanup": {"deleted": True, "vm_id": vm_id, "reason": "post_exec_cleanup"},
|
||||
"cleanup": {"deleted": True, "vm_id": "vm123", "reason": "post_exec_cleanup"},
|
||||
}
|
||||
|
||||
monkeypatch.setattr(runtime_network_check, "VmManager", StubManager)
|
||||
monkeypatch.setattr(runtime_network_check, "Pyro", StubPyro)
|
||||
result = runtime_network_check.run_network_check()
|
||||
|
||||
network_manager = observed["network_manager"]
|
||||
assert isinstance(network_manager, TapNetworkManager)
|
||||
assert network_manager.enabled is True
|
||||
assert observed["command"] == runtime_network_check.NETWORK_CHECK_COMMAND
|
||||
assert observed["timeout_seconds"] == 120
|
||||
assert observed["run_kwargs"] == {
|
||||
"profile": "debian-git",
|
||||
"command": runtime_network_check.NETWORK_CHECK_COMMAND,
|
||||
"vcpu_count": 1,
|
||||
"mem_mib": 1024,
|
||||
"timeout_seconds": 120,
|
||||
"ttl_seconds": 600,
|
||||
"network": True,
|
||||
}
|
||||
assert result.execution_mode == "guest_vsock"
|
||||
assert result.network_enabled is True
|
||||
assert result.exit_code == 0
|
||||
|
|
|
|||
|
|
@ -29,10 +29,11 @@ def test_create_server_registers_vm_tools(tmp_path: Path) -> None:
|
|||
assert "vm_exec" in tool_names
|
||||
assert "vm_list_profiles" in tool_names
|
||||
assert "vm_network_info" in tool_names
|
||||
assert "vm_run" in tool_names
|
||||
assert "vm_status" in tool_names
|
||||
|
||||
|
||||
def test_vm_tools_lifecycle_round_trip(tmp_path: Path) -> None:
|
||||
def test_vm_run_round_trip(tmp_path: Path) -> None:
|
||||
manager = VmManager(
|
||||
backend_name="mock",
|
||||
base_dir=tmp_path / "vms",
|
||||
|
|
@ -49,17 +50,17 @@ def test_vm_tools_lifecycle_round_trip(tmp_path: Path) -> None:
|
|||
|
||||
async def _run() -> dict[str, Any]:
|
||||
server = create_server(manager=manager)
|
||||
created = _extract_structured(
|
||||
await server.call_tool(
|
||||
"vm_create",
|
||||
{"profile": "debian-git", "vcpu_count": 1, "mem_mib": 512, "ttl_seconds": 600},
|
||||
)
|
||||
)
|
||||
vm_id = str(created["vm_id"])
|
||||
await server.call_tool("vm_start", {"vm_id": vm_id})
|
||||
executed = _extract_structured(
|
||||
await server.call_tool(
|
||||
"vm_exec", {"vm_id": vm_id, "command": "printf 'git version 2.0\\n'"}
|
||||
"vm_run",
|
||||
{
|
||||
"profile": "debian-git",
|
||||
"command": "printf 'git version 2.0\\n'",
|
||||
"vcpu_count": 1,
|
||||
"mem_mib": 512,
|
||||
"ttl_seconds": 600,
|
||||
"network": False,
|
||||
},
|
||||
)
|
||||
)
|
||||
return executed
|
||||
|
|
|
|||
|
|
@ -107,7 +107,7 @@ def test_vm_manager_reaps_started_vm(tmp_path: Path) -> None:
|
|||
({"vcpu_count": 1, "mem_mib": 512, "ttl_seconds": 30}, "ttl_seconds must be between"),
|
||||
],
|
||||
)
|
||||
def test_vm_manager_validates_limits(tmp_path: Path, kwargs: dict[str, int], msg: str) -> None:
|
||||
def test_vm_manager_validates_limits(tmp_path: Path, kwargs: dict[str, Any], msg: str) -> None:
|
||||
manager = VmManager(
|
||||
backend_name="mock",
|
||||
base_dir=tmp_path / "vms",
|
||||
|
|
@ -188,6 +188,25 @@ def test_vm_manager_network_info(tmp_path: Path) -> None:
|
|||
assert info["network_enabled"] is False
|
||||
|
||||
|
||||
def test_vm_manager_run_vm(tmp_path: Path) -> None:
|
||||
manager = VmManager(
|
||||
backend_name="mock",
|
||||
base_dir=tmp_path / "vms",
|
||||
network_manager=TapNetworkManager(enabled=False),
|
||||
)
|
||||
result = manager.run_vm(
|
||||
profile="debian-base",
|
||||
command="printf 'ok\\n'",
|
||||
vcpu_count=1,
|
||||
mem_mib=512,
|
||||
timeout_seconds=30,
|
||||
ttl_seconds=600,
|
||||
network=False,
|
||||
)
|
||||
assert int(result["exit_code"]) == 0
|
||||
assert str(result["stdout"]) == "ok\n"
|
||||
|
||||
|
||||
def test_vm_manager_firecracker_backend_path(
|
||||
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
|
||||
) -> None:
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue