from __future__ import annotations

from pathlib import Path
from typing import Any

import pytest

import pyro_mcp.vm_manager as vm_manager_module
from pyro_mcp.runtime import resolve_runtime_paths
from pyro_mcp.vm_manager import VmManager
from pyro_mcp.vm_network import TapNetworkManager


def test_vm_manager_lifecycle_and_auto_cleanup(tmp_path: Path) -> None:
    """Create, start and exec on a mock VM, then expect the VM to be gone."""
    vms_dir = tmp_path / "vms"
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=vms_dir,
        network_manager=TapNetworkManager(enabled=False),
    )
    vm_id = str(
        vm_manager.create_vm(
            environment="debian:12",
            vcpu_count=1,
            mem_mib=512,
            ttl_seconds=600,
        )["vm_id"]
    )

    start_result = vm_manager.start_vm(vm_id)
    assert start_result["state"] == "started"

    exec_result = vm_manager.exec_vm(
        vm_id,
        command="printf 'git version 2.43.0\\n'",
        timeout_seconds=30,
    )
    assert exec_result["exit_code"] == 0
    assert exec_result["execution_mode"] == "host_compat"
    assert "git version" in str(exec_result["stdout"])

    # After the exec the VM is expected to no longer be known to the manager.
    with pytest.raises(ValueError, match="does not exist"):
        vm_manager.status_vm(vm_id)


def test_vm_manager_exec_timeout(tmp_path: Path) -> None:
    """Expect exit code 124 and a 'timed out' stderr when the command overruns."""
    vms_dir = tmp_path / "vms"
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=vms_dir,
        network_manager=TapNetworkManager(enabled=False),
    )
    created = vm_manager.create_vm(
        environment="debian:12-base",
        vcpu_count=1,
        mem_mib=512,
        ttl_seconds=600,
    )
    vm_id = str(created["vm_id"])
    vm_manager.start_vm(vm_id)

    # The command sleeps for longer than the one-second timeout allows.
    outcome = vm_manager.exec_vm(vm_id, command="sleep 2", timeout_seconds=1)

    assert outcome["exit_code"] == 124
    assert "timed out" in str(outcome["stderr"])


def test_vm_manager_stop_and_delete(tmp_path: Path) -> None:
    """Stop a started VM and then delete it, checking both result payloads."""
    vms_dir = tmp_path / "vms"
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=vms_dir,
        network_manager=TapNetworkManager(enabled=False),
    )
    created = vm_manager.create_vm(
        environment="debian:12-base",
        vcpu_count=1,
        mem_mib=512,
        ttl_seconds=600,
    )
    vm_id = str(created["vm_id"])
    vm_manager.start_vm(vm_id)

    stop_result = vm_manager.stop_vm(vm_id)
    assert stop_result["state"] == "stopped"

    delete_result = vm_manager.delete_vm(vm_id)
    assert delete_result["deleted"] is True


def test_vm_manager_reaps_expired(tmp_path: Path) -> None:
    """Back-date a created VM's expiry and expect reap_expired() to remove it."""
    vms_dir = tmp_path / "vms"
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=vms_dir,
        network_manager=TapNetworkManager(enabled=False),
    )
    # Presumably lowers the minimum so that ttl_seconds=1 passes validation.
    vm_manager.MIN_TTL_SECONDS = 1
    created = vm_manager.create_vm(
        environment="debian:12-base",
        vcpu_count=1,
        mem_mib=512,
        ttl_seconds=1,
    )
    vm_id = str(created["vm_id"])
    vm_manager._instances[vm_id].expires_at = 0.0  # noqa: SLF001

    reaped = vm_manager.reap_expired()

    assert reaped["count"] == 1
    with pytest.raises(ValueError):
        vm_manager.status_vm(vm_id)


def test_vm_manager_reaps_started_vm(tmp_path: Path) -> None:
    """Same as the expired-VM reap test, but the VM has been started first."""
    vms_dir = tmp_path / "vms"
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=vms_dir,
        network_manager=TapNetworkManager(enabled=False),
    )
    # Presumably lowers the minimum so that ttl_seconds=1 passes validation.
    vm_manager.MIN_TTL_SECONDS = 1
    created = vm_manager.create_vm(
        environment="debian:12-base",
        vcpu_count=1,
        mem_mib=512,
        ttl_seconds=1,
    )
    vm_id = str(created["vm_id"])
    vm_manager.start_vm(vm_id)
    started_instance = vm_manager._instances[vm_id]  # noqa: SLF001
    started_instance.expires_at = 0.0

    reaped = vm_manager.reap_expired()

    assert reaped["count"] == 1


@pytest.mark.parametrize(
    ("kwargs", "msg"),
    [
        ({"vcpu_count": 0, "mem_mib": 512, "ttl_seconds": 600}, "vcpu_count must be between"),
        ({"vcpu_count": 1, "mem_mib": 64, "ttl_seconds": 600}, "mem_mib must be between"),
        ({"vcpu_count": 1, "mem_mib": 512, "ttl_seconds": 30}, "ttl_seconds must be between"),
    ],
)
def test_vm_manager_validates_limits(tmp_path: Path, kwargs: dict[str, Any], msg: str) -> None:
    """Each out-of-range sizing argument must make create_vm raise ValueError."""
    vms_dir = tmp_path / "vms"
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=vms_dir,
        network_manager=TapNetworkManager(enabled=False),
    )

    with pytest.raises(ValueError, match=msg):
        vm_manager.create_vm(environment="debian:12-base", **kwargs)


def test_vm_manager_max_active_limit(tmp_path: Path) -> None:
    """With max_active_vms=1, a second create_vm call must raise RuntimeError."""
    vms_dir = tmp_path / "vms"
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=vms_dir,
        max_active_vms=1,
        network_manager=TapNetworkManager(enabled=False),
    )
    vm_spec: dict[str, Any] = {
        "environment": "debian:12-base",
        "vcpu_count": 1,
        "mem_mib": 512,
        "ttl_seconds": 600,
    }

    # The first VM fills the only available slot.
    vm_manager.create_vm(**vm_spec)

    with pytest.raises(RuntimeError, match="max active VMs reached"):
        vm_manager.create_vm(**vm_spec)
def test_vm_manager_state_validation(tmp_path: Path) -> None:
    """Check the exec/start guards: VM state, timeout value and double start."""
    vms_dir = tmp_path / "vms"
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=vms_dir,
        network_manager=TapNetworkManager(enabled=False),
    )
    created = vm_manager.create_vm(
        environment="debian:12-base",
        vcpu_count=1,
        mem_mib=512,
        ttl_seconds=600,
    )
    vm_id = str(created["vm_id"])

    # Exec against a VM that has been created but not started.
    with pytest.raises(RuntimeError, match="must be in 'started' state"):
        vm_manager.exec_vm(vm_id, command="echo hi", timeout_seconds=30)

    # A zero timeout is expected to be rejected.
    with pytest.raises(ValueError, match="must be positive"):
        vm_manager.exec_vm(vm_id, command="echo hi", timeout_seconds=0)

    # Starting twice: the second call must fail.
    vm_manager.start_vm(vm_id)
    with pytest.raises(RuntimeError, match="cannot be started from state"):
        vm_manager.start_vm(vm_id)


def test_vm_manager_status_expired_raises(tmp_path: Path) -> None:
    """status_vm on a back-dated VM must raise the 'expired' RuntimeError."""
    vms_dir = tmp_path / "vms"
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=vms_dir,
        network_manager=TapNetworkManager(enabled=False),
    )
    # Presumably lowers the minimum so that ttl_seconds=1 passes validation.
    vm_manager.MIN_TTL_SECONDS = 1
    created = vm_manager.create_vm(
        environment="debian:12-base",
        vcpu_count=1,
        mem_mib=512,
        ttl_seconds=1,
    )
    vm_id = str(created["vm_id"])
    expired_instance = vm_manager._instances[vm_id]  # noqa: SLF001
    expired_instance.expires_at = 0.0

    with pytest.raises(RuntimeError, match="expired and was automatically deleted"):
        vm_manager.status_vm(vm_id)


def test_vm_manager_invalid_backend(tmp_path: Path) -> None:
    """Constructing VmManager with an unknown backend name must raise ValueError."""
    vms_dir = tmp_path / "vms"
    disabled_network = TapNetworkManager(enabled=False)

    with pytest.raises(ValueError, match="invalid backend"):
        VmManager(
            backend_name="nope",
            base_dir=vms_dir,
            network_manager=disabled_network,
        )


def test_vm_manager_network_info(tmp_path: Path) -> None:
    """With networking disabled, status and network info both report it as off."""
    vms_dir = tmp_path / "vms"
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=vms_dir,
        network_manager=TapNetworkManager(enabled=False),
    )
    vm_id = str(
        vm_manager.create_vm(
            environment="debian:12-base",
            vcpu_count=1,
            mem_mib=512,
            ttl_seconds=600,
        )["vm_id"]
    )

    vm_status = vm_manager.status_vm(vm_id)
    network_info = vm_manager.network_info_vm(vm_id)

    assert vm_status["network_enabled"] is False
    assert vm_status["guest_ip"] is None
    assert network_info["network_enabled"] is False


def test_vm_manager_run_vm(tmp_path: Path) -> None:
    """run_vm returns the command's exit code and stdout in a single call."""
    vms_dir = tmp_path / "vms"
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=vms_dir,
        network_manager=TapNetworkManager(enabled=False),
    )

    outcome = vm_manager.run_vm(
        environment="debian:12-base",
        command="printf 'ok\\n'",
        vcpu_count=1,
        mem_mib=512,
        timeout_seconds=30,
        ttl_seconds=600,
        network=False,
    )

    exit_code = int(outcome["exit_code"])
    stdout_text = str(outcome["stdout"])
    assert exit_code == 0
    assert stdout_text == "ok\n"


def test_vm_manager_firecracker_backend_path(
    tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
    """Selecting the 'firecracker' backend works once the backend class is stubbed."""

    class StubFirecrackerBackend:
        """Stand-in for FirecrackerBackend: stores its arguments, operations are no-ops."""

        def __init__(
            self,
            environment_store: Any,
            firecracker_bin: Path,
            jailer_bin: Path,
            runtime_capabilities: Any,
            network_manager: TapNetworkManager,
        ) -> None:
            self.environment_store = environment_store
            self.firecracker_bin = firecracker_bin
            self.jailer_bin = jailer_bin
            self.runtime_capabilities = runtime_capabilities
            self.network_manager = network_manager

        def create(self, instance: Any) -> None:
            del instance

        def start(self, instance: Any) -> None:
            del instance

        def exec(self, instance: Any, command: str, timeout_seconds: int) -> Any:
            del instance, command, timeout_seconds
            return None

        def stop(self, instance: Any) -> None:
            del instance

        def delete(self, instance: Any) -> None:
            del instance

    # Swap the class in the module namespace that this test patches.
    monkeypatch.setattr(vm_manager_module, "FirecrackerBackend", StubFirecrackerBackend)

    vms_dir = tmp_path / "vms"
    vm_manager = VmManager(
        backend_name="firecracker",
        base_dir=vms_dir,
        runtime_paths=resolve_runtime_paths(),
        network_manager=TapNetworkManager(enabled=False),
    )

    assert vm_manager._backend_name == "firecracker"  # noqa: SLF001