pyro-mcp/tests/test_vm_manager.py

285 lines
8.9 KiB
Python

from __future__ import annotations
from pathlib import Path
from typing import Any
import pytest
import pyro_mcp.vm_manager as vm_manager_module
from pyro_mcp.runtime import resolve_runtime_paths
from pyro_mcp.vm_manager import VmManager
from pyro_mcp.vm_network import TapNetworkManager
def test_vm_manager_lifecycle_and_auto_cleanup(tmp_path: Path) -> None:
manager = VmManager(
backend_name="mock",
base_dir=tmp_path / "vms",
network_manager=TapNetworkManager(enabled=False),
)
created = manager.create_vm(
environment="debian:12",
vcpu_count=1,
mem_mib=512,
ttl_seconds=600,
)
vm_id = str(created["vm_id"])
started = manager.start_vm(vm_id)
assert started["state"] == "started"
executed = manager.exec_vm(vm_id, command="printf 'git version 2.43.0\\n'", timeout_seconds=30)
assert executed["exit_code"] == 0
assert executed["execution_mode"] == "host_compat"
assert "git version" in str(executed["stdout"])
with pytest.raises(ValueError, match="does not exist"):
manager.status_vm(vm_id)
def test_vm_manager_exec_timeout(tmp_path: Path) -> None:
manager = VmManager(
backend_name="mock",
base_dir=tmp_path / "vms",
network_manager=TapNetworkManager(enabled=False),
)
vm_id = str(
manager.create_vm(
environment="debian:12-base",
vcpu_count=1,
mem_mib=512,
ttl_seconds=600,
)["vm_id"]
)
manager.start_vm(vm_id)
result = manager.exec_vm(vm_id, command="sleep 2", timeout_seconds=1)
assert result["exit_code"] == 124
assert "timed out" in str(result["stderr"])
def test_vm_manager_stop_and_delete(tmp_path: Path) -> None:
manager = VmManager(
backend_name="mock",
base_dir=tmp_path / "vms",
network_manager=TapNetworkManager(enabled=False),
)
vm_id = str(
manager.create_vm(
environment="debian:12-base",
vcpu_count=1,
mem_mib=512,
ttl_seconds=600,
)["vm_id"]
)
manager.start_vm(vm_id)
stopped = manager.stop_vm(vm_id)
assert stopped["state"] == "stopped"
deleted = manager.delete_vm(vm_id)
assert deleted["deleted"] is True
def test_vm_manager_reaps_expired(tmp_path: Path) -> None:
manager = VmManager(
backend_name="mock",
base_dir=tmp_path / "vms",
network_manager=TapNetworkManager(enabled=False),
)
manager.MIN_TTL_SECONDS = 1
vm_id = str(
manager.create_vm(
environment="debian:12-base",
vcpu_count=1,
mem_mib=512,
ttl_seconds=1,
)["vm_id"]
)
instance = manager._instances[vm_id] # noqa: SLF001
instance.expires_at = 0.0
result = manager.reap_expired()
assert result["count"] == 1
with pytest.raises(ValueError):
manager.status_vm(vm_id)
def test_vm_manager_reaps_started_vm(tmp_path: Path) -> None:
manager = VmManager(
backend_name="mock",
base_dir=tmp_path / "vms",
network_manager=TapNetworkManager(enabled=False),
)
manager.MIN_TTL_SECONDS = 1
vm_id = str(
manager.create_vm(
environment="debian:12-base",
vcpu_count=1,
mem_mib=512,
ttl_seconds=1,
)["vm_id"]
)
manager.start_vm(vm_id)
manager._instances[vm_id].expires_at = 0.0 # noqa: SLF001
result = manager.reap_expired()
assert result["count"] == 1
@pytest.mark.parametrize(
("kwargs", "msg"),
[
({"vcpu_count": 0, "mem_mib": 512, "ttl_seconds": 600}, "vcpu_count must be between"),
({"vcpu_count": 1, "mem_mib": 64, "ttl_seconds": 600}, "mem_mib must be between"),
({"vcpu_count": 1, "mem_mib": 512, "ttl_seconds": 30}, "ttl_seconds must be between"),
],
)
def test_vm_manager_validates_limits(tmp_path: Path, kwargs: dict[str, Any], msg: str) -> None:
manager = VmManager(
backend_name="mock",
base_dir=tmp_path / "vms",
network_manager=TapNetworkManager(enabled=False),
)
with pytest.raises(ValueError, match=msg):
manager.create_vm(environment="debian:12-base", **kwargs)
def test_vm_manager_max_active_limit(tmp_path: Path) -> None:
manager = VmManager(
backend_name="mock",
base_dir=tmp_path / "vms",
max_active_vms=1,
network_manager=TapNetworkManager(enabled=False),
)
manager.create_vm(environment="debian:12-base", vcpu_count=1, mem_mib=512, ttl_seconds=600)
with pytest.raises(RuntimeError, match="max active VMs reached"):
manager.create_vm(environment="debian:12-base", vcpu_count=1, mem_mib=512, ttl_seconds=600)
def test_vm_manager_state_validation(tmp_path: Path) -> None:
manager = VmManager(
backend_name="mock",
base_dir=tmp_path / "vms",
network_manager=TapNetworkManager(enabled=False),
)
vm_id = str(
manager.create_vm(
environment="debian:12-base",
vcpu_count=1,
mem_mib=512,
ttl_seconds=600,
)["vm_id"]
)
with pytest.raises(RuntimeError, match="must be in 'started' state"):
manager.exec_vm(vm_id, command="echo hi", timeout_seconds=30)
with pytest.raises(ValueError, match="must be positive"):
manager.exec_vm(vm_id, command="echo hi", timeout_seconds=0)
manager.start_vm(vm_id)
with pytest.raises(RuntimeError, match="cannot be started from state"):
manager.start_vm(vm_id)
def test_vm_manager_status_expired_raises(tmp_path: Path) -> None:
manager = VmManager(
backend_name="mock",
base_dir=tmp_path / "vms",
network_manager=TapNetworkManager(enabled=False),
)
manager.MIN_TTL_SECONDS = 1
vm_id = str(
manager.create_vm(
environment="debian:12-base",
vcpu_count=1,
mem_mib=512,
ttl_seconds=1,
)["vm_id"]
)
manager._instances[vm_id].expires_at = 0.0 # noqa: SLF001
with pytest.raises(RuntimeError, match="expired and was automatically deleted"):
manager.status_vm(vm_id)
def test_vm_manager_invalid_backend(tmp_path: Path) -> None:
with pytest.raises(ValueError, match="invalid backend"):
VmManager(
backend_name="nope",
base_dir=tmp_path / "vms",
network_manager=TapNetworkManager(enabled=False),
)
def test_vm_manager_network_info(tmp_path: Path) -> None:
manager = VmManager(
backend_name="mock",
base_dir=tmp_path / "vms",
network_manager=TapNetworkManager(enabled=False),
)
created = manager.create_vm(
environment="debian:12-base",
vcpu_count=1,
mem_mib=512,
ttl_seconds=600,
)
vm_id = str(created["vm_id"])
status = manager.status_vm(vm_id)
info = manager.network_info_vm(vm_id)
assert status["network_enabled"] is False
assert status["guest_ip"] is None
assert info["network_enabled"] is False
def test_vm_manager_run_vm(tmp_path: Path) -> None:
manager = VmManager(
backend_name="mock",
base_dir=tmp_path / "vms",
network_manager=TapNetworkManager(enabled=False),
)
result = manager.run_vm(
environment="debian:12-base",
command="printf 'ok\\n'",
vcpu_count=1,
mem_mib=512,
timeout_seconds=30,
ttl_seconds=600,
network=False,
)
assert int(result["exit_code"]) == 0
assert str(result["stdout"]) == "ok\n"
def test_vm_manager_firecracker_backend_path(
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
class StubFirecrackerBackend:
def __init__(
self,
environment_store: Any,
firecracker_bin: Path,
jailer_bin: Path,
runtime_capabilities: Any,
network_manager: TapNetworkManager,
) -> None:
self.environment_store = environment_store
self.firecracker_bin = firecracker_bin
self.jailer_bin = jailer_bin
self.runtime_capabilities = runtime_capabilities
self.network_manager = network_manager
def create(self, instance: Any) -> None:
del instance
def start(self, instance: Any) -> None:
del instance
def exec(self, instance: Any, command: str, timeout_seconds: int) -> Any:
del instance, command, timeout_seconds
return None
def stop(self, instance: Any) -> None:
del instance
def delete(self, instance: Any) -> None:
del instance
monkeypatch.setattr(vm_manager_module, "FirecrackerBackend", StubFirecrackerBackend)
manager = VmManager(
backend_name="firecracker",
base_dir=tmp_path / "vms",
runtime_paths=resolve_runtime_paths(),
network_manager=TapNetworkManager(enabled=False),
)
assert manager._backend_name == "firecracker" # noqa: SLF001