from __future__ import annotations import io import json import os import signal import subprocess import tarfile import time from pathlib import Path from typing import Any, cast import pytest import pyro_mcp.vm_manager as vm_manager_module from pyro_mcp.runtime import RuntimeCapabilities, resolve_runtime_paths from pyro_mcp.vm_manager import VmManager from pyro_mcp.vm_network import NetworkConfig, TapNetworkManager def _run_debugfs_write(rootfs_image: Path, command: str) -> None: proc = subprocess.run( # noqa: S603 ["debugfs", "-w", "-R", command, str(rootfs_image)], text=True, capture_output=True, check=False, ) if proc.returncode != 0: message = proc.stderr.strip() or proc.stdout.strip() or command raise RuntimeError(message) def _create_stopped_workspace_rootfs(tmp_path: Path) -> Path: rootfs_image = tmp_path / "workspace-rootfs.ext4" with rootfs_image.open("wb") as handle: handle.truncate(16 * 1024 * 1024) proc = subprocess.run( # noqa: S603 ["mkfs.ext4", "-F", str(rootfs_image)], text=True, capture_output=True, check=False, ) if proc.returncode != 0: message = proc.stderr.strip() or proc.stdout.strip() or "mkfs.ext4 failed" raise RuntimeError(message) for directory in ( "/workspace", "/workspace/src", "/run", "/run/pyro-secrets", "/run/pyro-services", ): _run_debugfs_write(rootfs_image, f"mkdir {directory}") note_path = tmp_path / "note.txt" note_path.write_text("hello from disk\n", encoding="utf-8") child_path = tmp_path / "child.txt" child_path.write_text("nested child\n", encoding="utf-8") secret_path = tmp_path / "secret.txt" secret_path.write_text("super-secret\n", encoding="utf-8") service_path = tmp_path / "service.log" service_path.write_text("service runtime\n", encoding="utf-8") _run_debugfs_write(rootfs_image, f"write {note_path} /workspace/note.txt") _run_debugfs_write(rootfs_image, f"write {child_path} /workspace/src/child.txt") _run_debugfs_write(rootfs_image, "symlink /workspace/link note.txt") _run_debugfs_write(rootfs_image, f"write {secret_path} /run/pyro-secrets/TOKEN") _run_debugfs_write(rootfs_image, f"write {service_path} /run/pyro-services/app.log") return rootfs_image def test_vm_manager_lifecycle_and_auto_cleanup(tmp_path: Path) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) created = manager.create_vm( environment="debian:12", vcpu_count=1, mem_mib=512, ttl_seconds=600, allow_host_compat=True, ) vm_id = str(created["vm_id"]) started = manager.start_vm(vm_id) assert started["state"] == "started" executed = manager.exec_vm(vm_id, command="printf 'git version 2.43.0\\n'", timeout_seconds=30) assert executed["exit_code"] == 0 assert executed["execution_mode"] == "host_compat" assert "git version" in str(executed["stdout"]) with pytest.raises(ValueError, match="does not exist"): manager.status_vm(vm_id) def test_vm_manager_exec_timeout(tmp_path: Path) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) vm_id = str( manager.create_vm( environment="debian:12-base", vcpu_count=1, mem_mib=512, ttl_seconds=600, allow_host_compat=True, )["vm_id"] ) manager.start_vm(vm_id) result = manager.exec_vm(vm_id, command="sleep 2", timeout_seconds=1) assert result["exit_code"] == 124 assert "timed out" in str(result["stderr"]) def test_vm_manager_stop_and_delete(tmp_path: Path) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) vm_id = str( manager.create_vm( environment="debian:12-base", vcpu_count=1, mem_mib=512, ttl_seconds=600, allow_host_compat=True, )["vm_id"] ) manager.start_vm(vm_id) stopped = manager.stop_vm(vm_id) assert stopped["state"] == "stopped" deleted = manager.delete_vm(vm_id) assert deleted["deleted"] is True def test_vm_manager_reaps_expired(tmp_path: Path) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) manager.MIN_TTL_SECONDS = 1 vm_id = str( manager.create_vm( environment="debian:12-base", vcpu_count=1, mem_mib=512, ttl_seconds=1, allow_host_compat=True, )["vm_id"] ) instance = manager._instances[vm_id] # noqa: SLF001 instance.expires_at = 0.0 result = manager.reap_expired() assert result["count"] == 1 with pytest.raises(ValueError): manager.status_vm(vm_id) def test_vm_manager_reaps_started_vm(tmp_path: Path) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) manager.MIN_TTL_SECONDS = 1 vm_id = str( manager.create_vm( environment="debian:12-base", vcpu_count=1, mem_mib=512, ttl_seconds=1, allow_host_compat=True, )["vm_id"] ) manager.start_vm(vm_id) manager._instances[vm_id].expires_at = 0.0 # noqa: SLF001 result = manager.reap_expired() assert result["count"] == 1 @pytest.mark.parametrize( ("kwargs", "msg"), [ ({"vcpu_count": 0, "mem_mib": 512, "ttl_seconds": 600}, "vcpu_count must be between"), ({"vcpu_count": 1, "mem_mib": 64, "ttl_seconds": 600}, "mem_mib must be between"), ({"vcpu_count": 1, "mem_mib": 512, "ttl_seconds": 30}, "ttl_seconds must be between"), ], ) def test_vm_manager_validates_limits(tmp_path: Path, kwargs: dict[str, Any], msg: str) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) with pytest.raises(ValueError, match=msg): manager.create_vm(environment="debian:12-base", **kwargs) def test_vm_manager_max_active_limit(tmp_path: Path) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", max_active_vms=1, network_manager=TapNetworkManager(enabled=False), ) manager.create_vm( environment="debian:12-base", vcpu_count=1, mem_mib=512, ttl_seconds=600, allow_host_compat=True, ) with pytest.raises(RuntimeError, match="max active VMs reached"): manager.create_vm( environment="debian:12-base", vcpu_count=1, mem_mib=512, ttl_seconds=600, allow_host_compat=True, ) def test_vm_manager_state_validation(tmp_path: Path) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) vm_id = str( manager.create_vm( environment="debian:12-base", vcpu_count=1, mem_mib=512, ttl_seconds=600, allow_host_compat=True, )["vm_id"] ) with pytest.raises(RuntimeError, match="must be in 'started' state"): manager.exec_vm(vm_id, command="echo hi", timeout_seconds=30) with pytest.raises(ValueError, match="must be positive"): manager.exec_vm(vm_id, command="echo hi", timeout_seconds=0) manager.start_vm(vm_id) with pytest.raises(RuntimeError, match="cannot be started from state"): manager.start_vm(vm_id) def test_vm_manager_status_expired_raises(tmp_path: Path) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) manager.MIN_TTL_SECONDS = 1 vm_id = str( manager.create_vm( environment="debian:12-base", vcpu_count=1, mem_mib=512, ttl_seconds=1, allow_host_compat=True, )["vm_id"] ) manager._instances[vm_id].expires_at = 0.0 # noqa: SLF001 with pytest.raises(RuntimeError, match="expired and was automatically deleted"): manager.status_vm(vm_id) def test_vm_manager_invalid_backend(tmp_path: Path) -> None: with pytest.raises(ValueError, match="invalid backend"): VmManager( backend_name="nope", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) def test_vm_manager_network_info(tmp_path: Path) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) created = manager.create_vm( environment="debian:12-base", vcpu_count=1, mem_mib=512, ttl_seconds=600, allow_host_compat=True, ) vm_id = str(created["vm_id"]) status = manager.status_vm(vm_id) info = manager.network_info_vm(vm_id) assert status["network_enabled"] is False assert status["guest_ip"] is None assert info["network_enabled"] is False def test_vm_manager_run_vm(tmp_path: Path) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) result = manager.run_vm( environment="debian:12-base", command="printf 'ok\\n'", vcpu_count=1, mem_mib=512, timeout_seconds=30, ttl_seconds=600, network=False, allow_host_compat=True, ) assert int(result["exit_code"]) == 0 assert str(result["stdout"]) == "ok\n" def test_workspace_lifecycle_and_logs(tmp_path: Path) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) created = manager.create_workspace( environment="debian:12-base", allow_host_compat=True, ) workspace_id = str(created["workspace_id"]) assert created["state"] == "started" assert created["workspace_path"] == "/workspace" first = manager.exec_workspace( workspace_id, command="printf 'hello\\n' > note.txt", timeout_seconds=30, ) second = manager.exec_workspace(workspace_id, command="cat note.txt", timeout_seconds=30) assert first["exit_code"] == 0 assert second["stdout"] == "hello\n" status = manager.status_workspace(workspace_id) assert status["command_count"] == 2 assert status["last_command"] is not None logs = manager.logs_workspace(workspace_id) assert logs["count"] == 2 entries = logs["entries"] assert isinstance(entries, list) assert entries[1]["stdout"] == "hello\n" deleted = manager.delete_workspace(workspace_id) assert deleted["deleted"] is True with pytest.raises(ValueError, match="does not exist"): manager.status_workspace(workspace_id) def test_workspace_create_seeds_directory_source_into_workspace(tmp_path: Path) -> None: source_dir = tmp_path / "seed" source_dir.mkdir() (source_dir / "note.txt").write_text("hello\n", encoding="utf-8") manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) created = manager.create_workspace( environment="debian:12-base", allow_host_compat=True, seed_path=source_dir, ) workspace_id = str(created["workspace_id"]) workspace_seed = created["workspace_seed"] assert workspace_seed["mode"] == "directory" assert workspace_seed["seed_path"] == str(source_dir.resolve()) executed = manager.exec_workspace(workspace_id, command="cat note.txt", timeout_seconds=30) assert executed["stdout"] == "hello\n" status = manager.status_workspace(workspace_id) assert status["workspace_seed"]["mode"] == "directory" assert status["workspace_seed"]["seed_path"] == str(source_dir.resolve()) def test_workspace_create_seeds_tar_archive_into_workspace(tmp_path: Path) -> None: archive_path = tmp_path / "seed.tgz" nested_dir = tmp_path / "src" nested_dir.mkdir() (nested_dir / "note.txt").write_text("archive\n", encoding="utf-8") with tarfile.open(archive_path, "w:gz") as archive: archive.add(nested_dir / "note.txt", arcname="note.txt") manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) created = manager.create_workspace( environment="debian:12-base", allow_host_compat=True, seed_path=archive_path, ) workspace_id = str(created["workspace_id"]) assert created["workspace_seed"]["mode"] == "tar_archive" executed = manager.exec_workspace(workspace_id, command="cat note.txt", timeout_seconds=30) assert executed["stdout"] == "archive\n" def test_workspace_sync_push_updates_started_workspace(tmp_path: Path) -> None: source_dir = tmp_path / "seed" source_dir.mkdir() (source_dir / "note.txt").write_text("hello\n", encoding="utf-8") update_dir = tmp_path / "update" update_dir.mkdir() (update_dir / "more.txt").write_text("more\n", encoding="utf-8") manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) created = manager.create_workspace( environment="debian:12-base", allow_host_compat=True, seed_path=source_dir, ) workspace_id = str(created["workspace_id"]) synced = manager.push_workspace_sync(workspace_id, source_path=update_dir, dest="subdir") assert synced["workspace_sync"]["mode"] == "directory" assert synced["workspace_sync"]["destination"] == "/workspace/subdir" executed = manager.exec_workspace( workspace_id, command="cat subdir/more.txt", timeout_seconds=30, ) assert executed["stdout"] == "more\n" status = manager.status_workspace(workspace_id) assert status["command_count"] == 1 assert status["workspace_seed"]["mode"] == "directory" def test_workspace_sync_push_requires_started_workspace(tmp_path: Path) -> None: source_dir = tmp_path / "seed" source_dir.mkdir() (source_dir / "note.txt").write_text("hello\n", encoding="utf-8") update_dir = tmp_path / "update" update_dir.mkdir() (update_dir / "more.txt").write_text("more\n", encoding="utf-8") manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) created = manager.create_workspace( environment="debian:12-base", allow_host_compat=True, seed_path=source_dir, ) workspace_id = str(created["workspace_id"]) workspace_path = tmp_path / "vms" / "workspaces" / workspace_id / "workspace.json" payload = json.loads(workspace_path.read_text(encoding="utf-8")) payload["state"] = "stopped" workspace_path.write_text(json.dumps(payload, indent=2, sort_keys=True), encoding="utf-8") with pytest.raises( RuntimeError, match="must be in 'started' state before workspace_sync_push", ): manager.push_workspace_sync(workspace_id, source_path=update_dir) def test_workspace_sync_push_rejects_destination_outside_workspace(tmp_path: Path) -> None: source_dir = tmp_path / "seed" source_dir.mkdir() (source_dir / "note.txt").write_text("hello\n", encoding="utf-8") manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) workspace_id = str( manager.create_workspace( environment="debian:12-base", allow_host_compat=True, )["workspace_id"] ) with pytest.raises(ValueError, match="workspace destination must stay inside /workspace"): manager.push_workspace_sync(workspace_id, source_path=source_dir, dest="../escape") def test_workspace_metadata_list_update_and_last_activity(tmp_path: Path) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) created = manager.create_workspace( environment="debian:12-base", allow_host_compat=True, name="repro-fix", labels={"issue": "123", "owner": "codex"}, ) workspace_id = str(created["workspace_id"]) assert created["name"] == "repro-fix" assert created["labels"] == {"issue": "123", "owner": "codex"} created_activity = float(created["last_activity_at"]) listed = manager.list_workspaces() assert listed["count"] == 1 assert listed["workspaces"][0]["name"] == "repro-fix" assert listed["workspaces"][0]["labels"] == {"issue": "123", "owner": "codex"} time.sleep(0.01) updated = manager.update_workspace( workspace_id, name="retry-run", labels={"issue": "124"}, clear_labels=["owner"], ) assert updated["name"] == "retry-run" assert updated["labels"] == {"issue": "124"} updated_activity = float(updated["last_activity_at"]) assert updated_activity >= created_activity status_before_exec = manager.status_workspace(workspace_id) time.sleep(0.01) manager.exec_workspace(workspace_id, command="printf 'ok\\n'", timeout_seconds=30) status_after_exec = manager.status_workspace(workspace_id) assert float(status_before_exec["last_activity_at"]) == updated_activity assert float(status_after_exec["last_activity_at"]) > updated_activity reset = manager.reset_workspace(workspace_id) assert reset["name"] == "retry-run" assert reset["labels"] == {"issue": "124"} def test_workspace_list_loads_legacy_records_without_metadata_fields(tmp_path: Path) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) created = manager.create_workspace( environment="debian:12-base", allow_host_compat=True, ) workspace_id = str(created["workspace_id"]) record_path = tmp_path / "vms" / "workspaces" / workspace_id / "workspace.json" payload = json.loads(record_path.read_text(encoding="utf-8")) payload.pop("name", None) payload.pop("labels", None) payload.pop("last_activity_at", None) record_path.write_text(json.dumps(payload, indent=2, sort_keys=True), encoding="utf-8") listed = manager.list_workspaces() assert listed["count"] == 1 listed_workspace = listed["workspaces"][0] assert listed_workspace["workspace_id"] == workspace_id assert listed_workspace["name"] is None assert listed_workspace["labels"] == {} assert float(listed_workspace["last_activity_at"]) == float(created["created_at"]) def test_workspace_list_sorts_by_last_activity_and_skips_invalid_payload(tmp_path: Path) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) first = manager.create_workspace( environment="debian:12-base", allow_host_compat=True, name="first", ) second = manager.create_workspace( environment="debian:12-base", allow_host_compat=True, name="second", ) first_id = str(first["workspace_id"]) second_id = str(second["workspace_id"]) time.sleep(0.01) manager.exec_workspace(second_id, command="printf 'ok\\n'", timeout_seconds=30) invalid_dir = tmp_path / "vms" / "workspaces" / "invalid" invalid_dir.mkdir(parents=True) (invalid_dir / "workspace.json").write_text('"not-a-dict"', encoding="utf-8") listed = manager.list_workspaces() assert listed["count"] == 2 assert [item["workspace_id"] for item in listed["workspaces"]] == [second_id, first_id] def test_workspace_update_clear_name_and_rejects_noop(tmp_path: Path) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) created = manager.create_workspace( environment="debian:12-base", allow_host_compat=True, name="repro-fix", labels={"issue": "123"}, ) workspace_id = str(created["workspace_id"]) cleared = manager.update_workspace( workspace_id, clear_name=True, clear_labels=["issue"], ) assert cleared["name"] is None assert cleared["labels"] == {} with pytest.raises(ValueError, match="workspace update requested no effective metadata change"): manager.update_workspace(workspace_id, clear_name=True) with pytest.raises(ValueError, match="name and clear_name cannot be used together"): manager.update_workspace(workspace_id, name="retry-run", clear_name=True) def test_workspace_export_rejects_empty_output_path(tmp_path: Path) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) created = manager.create_workspace( environment="debian:12-base", allow_host_compat=True, ) with pytest.raises(ValueError, match="output_path must not be empty"): manager.export_workspace(str(created["workspace_id"]), path=".", output_path=" ") def test_workspace_diff_and_export_round_trip(tmp_path: Path) -> None: seed_dir = tmp_path / "seed" seed_dir.mkdir() (seed_dir / "note.txt").write_text("hello\n", encoding="utf-8") update_dir = tmp_path / "update" update_dir.mkdir() (update_dir / "note.txt").write_text("hello from sync\n", encoding="utf-8") manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) workspace_id = str( manager.create_workspace( environment="debian:12-base", allow_host_compat=True, seed_path=seed_dir, )["workspace_id"] ) manager.push_workspace_sync(workspace_id, source_path=update_dir) diff_payload = manager.diff_workspace(workspace_id) assert diff_payload["workspace_id"] == workspace_id assert diff_payload["changed"] is True assert diff_payload["summary"]["modified"] == 1 assert diff_payload["summary"]["text_patched"] == 1 assert "-hello\n" in diff_payload["patch"] assert "+hello from sync\n" in diff_payload["patch"] output_path = tmp_path / "exported-note.txt" export_payload = manager.export_workspace( workspace_id, path="note.txt", output_path=output_path, ) assert export_payload["workspace_id"] == workspace_id assert export_payload["artifact_type"] == "file" assert output_path.read_text(encoding="utf-8") == "hello from sync\n" status = manager.status_workspace(workspace_id) logs = manager.logs_workspace(workspace_id) assert status["command_count"] == 0 assert logs["count"] == 0 def test_workspace_summary_synthesizes_current_session(tmp_path: Path) -> None: seed_dir = tmp_path / "seed" seed_dir.mkdir() (seed_dir / "note.txt").write_text("hello\n", encoding="utf-8") update_dir = tmp_path / "update" update_dir.mkdir() (update_dir / "more.txt").write_text("more\n", encoding="utf-8") manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) workspace_id = str( manager.create_workspace( environment="debian:12-base", allow_host_compat=True, seed_path=seed_dir, name="review-eval", labels={"suite": "smoke"}, )["workspace_id"] ) manager.push_workspace_sync(workspace_id, source_path=update_dir) manager.write_workspace_file(workspace_id, "src/app.py", text="print('hello')\n") manager.apply_workspace_patch( workspace_id, patch=( "--- a/note.txt\n" "+++ b/note.txt\n" "@@ -1 +1 @@\n" "-hello\n" "+patched\n" ), ) manager.exec_workspace(workspace_id, command="cat note.txt", timeout_seconds=30) manager.create_snapshot(workspace_id, "checkpoint") export_path = tmp_path / "exported-note.txt" manager.export_workspace(workspace_id, "note.txt", output_path=export_path) manager.start_service( workspace_id, "app", command='sh -lc \'trap "exit 0" TERM; touch .ready; while true; do sleep 60; done\'', readiness={"type": "file", "path": ".ready"}, ) manager.stop_service(workspace_id, "app") summary = manager.summarize_workspace(workspace_id) assert summary["workspace_id"] == workspace_id assert summary["name"] == "review-eval" assert summary["labels"] == {"suite": "smoke"} assert summary["outcome"]["command_count"] == 1 assert summary["outcome"]["export_count"] == 1 assert summary["outcome"]["snapshot_count"] == 1 assert summary["commands"]["total"] == 1 assert summary["commands"]["recent"][0]["command"] == "cat note.txt" assert [event["event_kind"] for event in summary["edits"]["recent"]] == [ "patch_apply", "file_write", "sync_push", ] assert summary["changes"]["available"] is True assert summary["changes"]["changed"] is True assert summary["changes"]["summary"]["total"] == 4 assert summary["services"]["current"][0]["service_name"] == "app" assert [event["event_kind"] for event in summary["services"]["recent"]] == [ "service_stop", "service_start", ] assert summary["artifacts"]["exports"][0]["workspace_path"] == "/workspace/note.txt" assert summary["snapshots"]["named_count"] == 1 assert summary["snapshots"]["recent"][0]["snapshot_name"] == "checkpoint" def test_workspace_summary_degrades_gracefully_for_stopped_and_legacy_workspaces( tmp_path: Path, ) -> None: seed_dir = tmp_path / "seed" seed_dir.mkdir() (seed_dir / "note.txt").write_text("hello\n", encoding="utf-8") manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) stopped_workspace_id = str( manager.create_workspace( environment="debian:12-base", allow_host_compat=True, seed_path=seed_dir, )["workspace_id"] ) manager.exec_workspace(stopped_workspace_id, command="cat note.txt", timeout_seconds=30) manager.stop_workspace(stopped_workspace_id) stopped_summary = manager.summarize_workspace(stopped_workspace_id) assert stopped_summary["commands"]["total"] == 1 assert stopped_summary["changes"]["available"] is False assert "must be in 'started' state" in str(stopped_summary["changes"]["reason"]) legacy_workspace_id = str( manager.create_workspace( environment="debian:12-base", allow_host_compat=True, seed_path=seed_dir, )["workspace_id"] ) baseline_path = ( tmp_path / "vms" / "workspaces" / legacy_workspace_id / "baseline" / "workspace.tar" ) baseline_path.unlink() legacy_summary = manager.summarize_workspace(legacy_workspace_id) assert legacy_summary["changes"]["available"] is False assert "baseline snapshot" in str(legacy_summary["changes"]["reason"]) def test_workspace_file_ops_and_patch_round_trip(tmp_path: Path) -> None: seed_dir = tmp_path / "seed" seed_dir.mkdir() src_dir = seed_dir / "src" src_dir.mkdir() (src_dir / "app.py").write_text('print("bug")\n', encoding="utf-8") manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) workspace_id = str( manager.create_workspace( environment="debian:12-base", allow_host_compat=True, seed_path=seed_dir, )["workspace_id"] ) listing = manager.list_workspace_files(workspace_id, path="src", recursive=True) assert listing["entries"] == [ { "path": "/workspace/src/app.py", "artifact_type": "file", "size_bytes": 13, "link_target": None, } ] status_before_read = manager.status_workspace(workspace_id) read_payload = manager.read_workspace_file(workspace_id, "src/app.py") assert read_payload["content"] == 'print("bug")\n' status_after_read = manager.status_workspace(workspace_id) assert float(status_after_read["last_activity_at"]) == float( status_before_read["last_activity_at"] ) written = manager.write_workspace_file( workspace_id, "src/generated/out.txt", text="generated\n", ) assert written["bytes_written"] == 10 patch_payload = manager.apply_workspace_patch( workspace_id, patch=( "--- a/src/app.py\n" "+++ b/src/app.py\n" "@@ -1 +1 @@\n" '-print("bug")\n' '+print("fixed")\n' "--- /dev/null\n" "+++ b/src/new.py\n" "@@ -0,0 +1 @@\n" '+print("new")\n' ), ) assert patch_payload["changed"] is True assert patch_payload["summary"] == { "total": 2, "added": 1, "modified": 1, "deleted": 0, } executed = manager.exec_workspace( workspace_id, command="python3 src/app.py && cat src/new.py && cat src/generated/out.txt", timeout_seconds=30, ) assert executed["stdout"] == 'fixed\nprint("new")\ngenerated\n' diff_payload = manager.diff_workspace(workspace_id) assert diff_payload["changed"] is True assert diff_payload["summary"]["added"] == 2 assert diff_payload["summary"]["modified"] == 1 output_path = tmp_path / "exported-app.py" export_payload = manager.export_workspace( workspace_id, path="src/app.py", output_path=output_path, ) assert export_payload["artifact_type"] == "file" assert output_path.read_text(encoding="utf-8") == 'print("fixed")\n' def test_workspace_export_directory_uses_exact_output_path(tmp_path: Path) -> None: seed_dir = tmp_path / "seed" nested_dir = seed_dir / "src" nested_dir.mkdir(parents=True) (nested_dir / "note.txt").write_text("hello\n", encoding="utf-8") manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) workspace_id = str( manager.create_workspace( environment="debian:12-base", allow_host_compat=True, seed_path=seed_dir, )["workspace_id"] ) output_dir = tmp_path / "exported-src" payload = manager.export_workspace(workspace_id, path="src", output_path=output_dir) assert payload["artifact_type"] == "directory" assert (output_dir / "note.txt").read_text(encoding="utf-8") == "hello\n" assert not (output_dir / "src").exists() def test_workspace_diff_requires_create_time_baseline(tmp_path: Path) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) workspace_id = str( manager.create_workspace( environment="debian:12-base", allow_host_compat=True, )["workspace_id"] ) baseline_path = tmp_path / "vms" / "workspaces" / workspace_id / "baseline" / "workspace.tar" baseline_path.unlink() with pytest.raises(RuntimeError, match="require[s]? a baseline snapshot"): manager.diff_workspace(workspace_id) def test_workspace_snapshots_and_reset_round_trip(tmp_path: Path) -> None: seed_dir = tmp_path / "seed" seed_dir.mkdir() (seed_dir / "note.txt").write_text("seed\n", encoding="utf-8") manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) workspace_id = str( manager.create_workspace( environment="debian:12-base", allow_host_compat=True, seed_path=seed_dir, )["workspace_id"] ) manager.exec_workspace( workspace_id, command="printf 'checkpoint\\n' > note.txt", timeout_seconds=30, ) created_snapshot = manager.create_snapshot(workspace_id, "checkpoint") assert created_snapshot["snapshot"]["snapshot_name"] == "checkpoint" listed = manager.list_snapshots(workspace_id) assert listed["count"] == 2 assert [snapshot["snapshot_name"] for snapshot in listed["snapshots"]] == [ "baseline", "checkpoint", ] manager.exec_workspace( workspace_id, command="printf 'after\\n' > note.txt", timeout_seconds=30, ) manager.start_service( workspace_id, "app", command="sh -lc 'touch .ready; while true; do sleep 60; done'", readiness={"type": "file", "path": ".ready"}, ) reset_to_snapshot = manager.reset_workspace(workspace_id, snapshot="checkpoint") assert reset_to_snapshot["workspace_reset"]["snapshot_name"] == "checkpoint" assert reset_to_snapshot["reset_count"] == 1 assert reset_to_snapshot["last_command"] is None assert reset_to_snapshot["command_count"] == 0 assert reset_to_snapshot["service_count"] == 0 assert reset_to_snapshot["running_service_count"] == 0 checkpoint_result = manager.exec_workspace( workspace_id, command="cat note.txt", timeout_seconds=30, ) assert checkpoint_result["stdout"] == "checkpoint\n" logs_after_snapshot_reset = manager.logs_workspace(workspace_id) assert logs_after_snapshot_reset["count"] == 1 reset_to_baseline = manager.reset_workspace(workspace_id) assert reset_to_baseline["workspace_reset"]["snapshot_name"] == "baseline" assert reset_to_baseline["reset_count"] == 2 assert reset_to_baseline["command_count"] == 0 assert reset_to_baseline["service_count"] == 0 assert manager.logs_workspace(workspace_id)["count"] == 0 baseline_result = manager.exec_workspace( workspace_id, command="cat note.txt", timeout_seconds=30, ) assert baseline_result["stdout"] == "seed\n" diff_payload = manager.diff_workspace(workspace_id) assert diff_payload["changed"] is False deleted_snapshot = manager.delete_snapshot(workspace_id, "checkpoint") assert deleted_snapshot["deleted"] is True listed_after_delete = manager.list_snapshots(workspace_id) assert [snapshot["snapshot_name"] for snapshot in listed_after_delete["snapshots"]] == [ "baseline" ] def test_workspace_snapshot_and_reset_require_baseline(tmp_path: Path) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) workspace_id = str( manager.create_workspace( environment="debian:12-base", allow_host_compat=True, )["workspace_id"] ) baseline_path = tmp_path / "vms" / "workspaces" / workspace_id / "baseline" / "workspace.tar" baseline_path.unlink() with pytest.raises(RuntimeError, match="require[s]? a baseline snapshot"): manager.list_snapshots(workspace_id) with pytest.raises(RuntimeError, match="require[s]? a baseline snapshot"): manager.create_snapshot(workspace_id, "checkpoint") with pytest.raises(RuntimeError, match="require[s]? a baseline snapshot"): manager.delete_snapshot(workspace_id, "checkpoint") with pytest.raises(RuntimeError, match="require[s]? a baseline snapshot"): manager.reset_workspace(workspace_id) def test_workspace_delete_baseline_snapshot_is_rejected(tmp_path: Path) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) workspace_id = str( manager.create_workspace( environment="debian:12-base", allow_host_compat=True, )["workspace_id"] ) with pytest.raises(ValueError, match="cannot delete the baseline snapshot"): manager.delete_snapshot(workspace_id, "baseline") def test_workspace_reset_recreates_stopped_workspace(tmp_path: Path) -> None: seed_dir = tmp_path / "seed" seed_dir.mkdir() (seed_dir / "note.txt").write_text("seed\n", encoding="utf-8") manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) workspace_id = str( manager.create_workspace( environment="debian:12-base", allow_host_compat=True, seed_path=seed_dir, )["workspace_id"] ) with manager._lock: # noqa: SLF001 workspace = manager._load_workspace_locked(workspace_id) # noqa: SLF001 workspace.state = "stopped" workspace.firecracker_pid = None manager._save_workspace_locked(workspace) # noqa: SLF001 reset_payload = manager.reset_workspace(workspace_id) assert reset_payload["state"] == "started" assert reset_payload["workspace_reset"]["snapshot_name"] == "baseline" result = manager.exec_workspace(workspace_id, command="cat note.txt", timeout_seconds=30) assert result["stdout"] == "seed\n" def test_workspace_reset_failure_leaves_workspace_stopped(tmp_path: Path) -> None: seed_dir = tmp_path / "seed" seed_dir.mkdir() (seed_dir / "note.txt").write_text("seed\n", encoding="utf-8") manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) workspace_id = str( manager.create_workspace( environment="debian:12-base", allow_host_compat=True, seed_path=seed_dir, )["workspace_id"] ) manager.create_snapshot(workspace_id, "checkpoint") def _failing_import_archive(*args: Any, **kwargs: Any) -> dict[str, Any]: del args, kwargs raise RuntimeError("boom") manager._backend.import_archive = _failing_import_archive # type: ignore[method-assign] # noqa: SLF001 with pytest.raises(RuntimeError, match="boom"): manager.reset_workspace(workspace_id, snapshot="checkpoint") with manager._lock: # noqa: SLF001 workspace = manager._load_workspace_locked(workspace_id) # noqa: SLF001 assert workspace.state == "stopped" assert workspace.firecracker_pid is None assert workspace.reset_count == 0 listed = manager.list_snapshots(workspace_id) assert [snapshot["snapshot_name"] for snapshot in listed["snapshots"]] == [ "baseline", "checkpoint", ] def test_workspace_export_helpers_preserve_directory_symlinks(tmp_path: Path) -> None: workspace_dir = tmp_path / "workspace" workspace_dir.mkdir() (workspace_dir / "note.txt").write_text("hello\n", encoding="utf-8") os.symlink("note.txt", workspace_dir / "note-link") (workspace_dir / "empty-dir").mkdir() archive_path = tmp_path / "workspace-export.tar" exported = vm_manager_module._prepare_workspace_export_archive( # noqa: SLF001 workspace_dir=workspace_dir, workspace_path=".", archive_path=archive_path, ) assert exported.artifact_type == "directory" output_dir = tmp_path / "output" extracted = vm_manager_module._extract_workspace_export_archive( # noqa: SLF001 archive_path, output_path=output_dir, artifact_type="directory", ) assert extracted["artifact_type"] == "directory" assert (output_dir / "note.txt").read_text(encoding="utf-8") == "hello\n" assert (output_dir / "note-link").is_symlink() assert os.readlink(output_dir / "note-link") == "note.txt" assert (output_dir / "empty-dir").is_dir() def test_workspace_export_helpers_validate_missing_path_and_existing_output(tmp_path: Path) -> None: workspace_dir = tmp_path / "workspace" workspace_dir.mkdir() (workspace_dir / "note.txt").write_text("hello\n", encoding="utf-8") with pytest.raises(RuntimeError, match="workspace path does not exist"): vm_manager_module._prepare_workspace_export_archive( # noqa: SLF001 workspace_dir=workspace_dir, workspace_path="missing.txt", archive_path=tmp_path / "missing.tar", ) archive_path = tmp_path / "note-export.tar" exported = vm_manager_module._prepare_workspace_export_archive( # noqa: SLF001 workspace_dir=workspace_dir, workspace_path="note.txt", archive_path=archive_path, ) output_path = tmp_path / "note.txt" output_path.write_text("already here\n", encoding="utf-8") with pytest.raises(RuntimeError, match="output_path already exists"): vm_manager_module._extract_workspace_export_archive( # noqa: SLF001 archive_path, output_path=output_path, artifact_type=exported.artifact_type, ) def test_diff_workspace_trees_reports_empty_binary_symlink_and_type_changes(tmp_path: Path) -> None: baseline_dir = tmp_path / "baseline" current_dir = tmp_path / "current" baseline_dir.mkdir() current_dir.mkdir() (baseline_dir / "modified.txt").write_text("before\n", encoding="utf-8") (current_dir / "modified.txt").write_text("after\n", encoding="utf-8") (baseline_dir / "deleted.txt").write_text("gone\n", encoding="utf-8") (current_dir / "added.txt").write_text("new\n", encoding="utf-8") (baseline_dir / "binary.bin").write_bytes(b"\x00before") (current_dir / "binary.bin").write_bytes(b"\x00after") os.symlink("link-target-old.txt", baseline_dir / "link") os.symlink("link-target-new.txt", current_dir / "link") (baseline_dir / "swap").mkdir() (current_dir / "swap").write_text("type changed\n", encoding="utf-8") (baseline_dir / "removed-empty").mkdir() (current_dir / "added-empty").mkdir() diff_payload = vm_manager_module._diff_workspace_trees( # noqa: SLF001 baseline_dir, current_dir, ) assert diff_payload["changed"] is True assert diff_payload["summary"] == { "total": 8, "added": 2, "modified": 3, "deleted": 2, "type_changed": 1, "text_patched": 3, "non_text": 5, } assert "--- a/modified.txt" in diff_payload["patch"] assert "+++ b/modified.txt" in diff_payload["patch"] assert "--- /dev/null" in diff_payload["patch"] assert "+++ b/added.txt" in diff_payload["patch"] assert "--- a/deleted.txt" in diff_payload["patch"] assert "+++ /dev/null" in diff_payload["patch"] entries = {entry["path"]: entry for entry in diff_payload["entries"]} assert entries["binary.bin"]["text_patch"] is None assert entries["link"]["artifact_type"] == "symlink" assert entries["swap"]["artifact_type"] == "file" assert entries["removed-empty"]["artifact_type"] == "directory" assert entries["added-empty"]["artifact_type"] == "directory" def test_diff_workspace_trees_unchanged_returns_empty_summary(tmp_path: Path) -> None: baseline_dir = tmp_path / "baseline" current_dir = tmp_path / "current" baseline_dir.mkdir() current_dir.mkdir() (baseline_dir / "note.txt").write_text("same\n", encoding="utf-8") (current_dir / "note.txt").write_text("same\n", encoding="utf-8") diff_payload = vm_manager_module._diff_workspace_trees( # noqa: SLF001 baseline_dir, current_dir, ) assert diff_payload == { "changed": False, "summary": { "total": 0, "added": 0, "modified": 0, "deleted": 0, "type_changed": 0, "text_patched": 0, "non_text": 0, }, "entries": [], "patch": "", } def test_workspace_shell_lifecycle_and_rehydration(tmp_path: Path) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) created = manager.create_workspace( environment="debian:12-base", allow_host_compat=True, ) workspace_id = str(created["workspace_id"]) opened = manager.open_shell(workspace_id) shell_id = str(opened["shell_id"]) assert opened["state"] == "running" manager.write_shell(workspace_id, shell_id, input_text="pwd") output = "" deadline = time.time() + 5 while time.time() < deadline: read = manager.read_shell(workspace_id, shell_id, cursor=0, max_chars=65536) output = str(read["output"]) if "/workspace" in output: break time.sleep(0.05) assert "/workspace" in output manager_rehydrated = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) second_opened = manager_rehydrated.open_shell(workspace_id) second_shell_id = str(second_opened["shell_id"]) assert second_shell_id != shell_id manager_rehydrated.write_shell(workspace_id, second_shell_id, input_text="printf 'ok\\n'") second_output = "" deadline = time.time() + 5 while time.time() < deadline: read = manager_rehydrated.read_shell( workspace_id, second_shell_id, cursor=0, max_chars=65536, ) second_output = str(read["output"]) if "ok" in second_output: break time.sleep(0.05) assert "ok" in second_output logs = manager.logs_workspace(workspace_id) assert logs["count"] == 0 closed = manager.close_shell(workspace_id, shell_id) assert closed["closed"] is True with pytest.raises(ValueError, match="does not exist"): manager.read_shell(workspace_id, shell_id) deleted = manager.delete_workspace(workspace_id) assert deleted["deleted"] is True with pytest.raises(ValueError, match="does not exist"): manager_rehydrated.read_shell(workspace_id, second_shell_id) def test_workspace_read_shell_plain_renders_control_sequences( tmp_path: Path, monkeypatch: pytest.MonkeyPatch, ) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) created = manager.create_workspace( environment="debian:12-base", allow_host_compat=True, ) workspace_id = str(created["workspace_id"]) opened = manager.open_shell(workspace_id) shell_id = str(opened["shell_id"]) monkeypatch.setattr( manager._backend, # noqa: SLF001 "read_shell", lambda *args, **kwargs: { "shell_id": shell_id, "cwd": "/workspace", "cols": 120, "rows": 30, "state": "running", "started_at": 1.0, "ended_at": None, "exit_code": None, "execution_mode": "host_compat", "cursor": 0, "next_cursor": 15, "output": "hello\r\x1b[2Kbye\n", "truncated": False, }, ) read = manager.read_shell( workspace_id, shell_id, cursor=0, max_chars=1024, plain=True, ) assert read["output"] == "bye\n" assert read["plain"] is True assert read["wait_for_idle_ms"] is None def test_workspace_read_shell_wait_for_idle_batches_output( tmp_path: Path, monkeypatch: pytest.MonkeyPatch, ) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) created = manager.create_workspace( environment="debian:12-base", allow_host_compat=True, ) workspace_id = str(created["workspace_id"]) opened = manager.open_shell(workspace_id) shell_id = str(opened["shell_id"]) payloads = [ { "shell_id": shell_id, "cwd": "/workspace", "cols": 120, "rows": 30, "state": "running", "started_at": 1.0, "ended_at": None, "exit_code": None, "execution_mode": "host_compat", "cursor": 0, "next_cursor": 4, "output": "one\n", "truncated": False, }, { "shell_id": shell_id, "cwd": "/workspace", "cols": 120, "rows": 30, "state": "running", "started_at": 1.0, "ended_at": None, "exit_code": None, "execution_mode": "host_compat", "cursor": 4, "next_cursor": 8, "output": "two\n", "truncated": False, }, { "shell_id": shell_id, "cwd": "/workspace", "cols": 120, "rows": 30, "state": "running", "started_at": 1.0, "ended_at": None, "exit_code": None, "execution_mode": "host_compat", "cursor": 8, "next_cursor": 8, "output": "", "truncated": False, }, ] def fake_read_shell(*args: Any, **kwargs: Any) -> dict[str, Any]: del args, kwargs return payloads.pop(0) monotonic_values = iter([0.0, 0.05, 0.10, 0.41]) monkeypatch.setattr(manager._backend, "read_shell", fake_read_shell) # noqa: SLF001 monkeypatch.setattr(time, "monotonic", lambda: next(monotonic_values)) monkeypatch.setattr(time, "sleep", lambda _: None) read = manager.read_shell( workspace_id, shell_id, cursor=0, max_chars=1024, wait_for_idle_ms=300, ) assert read["output"] == "one\ntwo\n" assert read["next_cursor"] == 8 assert read["wait_for_idle_ms"] == 300 assert read["plain"] is False def test_workspace_create_rejects_unsafe_seed_archive(tmp_path: Path) -> None: archive_path = tmp_path / "bad.tgz" with tarfile.open(archive_path, "w:gz") as archive: payload = b"bad\n" info = tarfile.TarInfo(name="../escape.txt") info.size = len(payload) archive.addfile(info, io.BytesIO(payload)) manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) with pytest.raises(RuntimeError, match="unsafe archive member path"): manager.create_workspace( environment="debian:12-base", allow_host_compat=True, seed_path=archive_path, ) assert list((tmp_path / "vms" / "workspaces").iterdir()) == [] def test_workspace_create_rejects_archive_that_writes_through_symlink(tmp_path: Path) -> None: archive_path = tmp_path / "bad-symlink.tgz" with tarfile.open(archive_path, "w:gz") as archive: symlink_info = tarfile.TarInfo(name="linked") symlink_info.type = tarfile.SYMTYPE symlink_info.linkname = "outside" archive.addfile(symlink_info) payload = b"bad\n" file_info = tarfile.TarInfo(name="linked/note.txt") file_info.size = len(payload) archive.addfile(file_info, io.BytesIO(payload)) manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) with pytest.raises(RuntimeError, match="traverse through a symlinked path"): manager.create_workspace( environment="debian:12-base", allow_host_compat=True, seed_path=archive_path, ) def test_workspace_create_cleans_up_on_seed_failure( tmp_path: Path, monkeypatch: pytest.MonkeyPatch ) -> None: source_dir = tmp_path / "seed" source_dir.mkdir() (source_dir / "note.txt").write_text("hello\n", encoding="utf-8") manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) def _boom(*args: Any, **kwargs: Any) -> dict[str, Any]: del args, kwargs raise RuntimeError("seed import failed") monkeypatch.setattr(manager._backend, "import_archive", _boom) # noqa: SLF001 with pytest.raises(RuntimeError, match="seed import failed"): manager.create_workspace( environment="debian:12-base", allow_host_compat=True, seed_path=source_dir, ) assert list((tmp_path / "vms" / "workspaces").iterdir()) == [] def test_workspace_rehydrates_across_manager_processes(tmp_path: Path) -> None: base_dir = tmp_path / "vms" manager = VmManager( backend_name="mock", base_dir=base_dir, network_manager=TapNetworkManager(enabled=False), ) workspace_id = str( manager.create_workspace( environment="debian:12-base", allow_host_compat=True, )["workspace_id"] ) other = VmManager( backend_name="mock", base_dir=base_dir, network_manager=TapNetworkManager(enabled=False), ) executed = other.exec_workspace(workspace_id, command="printf 'ok\\n'", timeout_seconds=30) assert executed["exit_code"] == 0 assert executed["stdout"] == "ok\n" logs = other.logs_workspace(workspace_id) assert logs["count"] == 1 def test_workspace_requires_started_state(tmp_path: Path) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) workspace_id = str( manager.create_workspace( environment="debian:12-base", allow_host_compat=True, )["workspace_id"] ) workspace_path = tmp_path / "vms" / "workspaces" / workspace_id / "workspace.json" payload = json.loads(workspace_path.read_text(encoding="utf-8")) payload["state"] = "stopped" workspace_path.write_text(json.dumps(payload, indent=2, sort_keys=True), encoding="utf-8") with pytest.raises(RuntimeError, match="must be in 'started' state"): manager.exec_workspace(workspace_id, command="true", timeout_seconds=30) def test_vm_manager_firecracker_backend_path( tmp_path: Path, monkeypatch: pytest.MonkeyPatch ) -> None: class StubFirecrackerBackend: def __init__( self, environment_store: Any, firecracker_bin: Path, jailer_bin: Path, runtime_capabilities: Any, network_manager: TapNetworkManager, ) -> None: self.environment_store = environment_store self.firecracker_bin = firecracker_bin self.jailer_bin = jailer_bin self.runtime_capabilities = runtime_capabilities self.network_manager = network_manager def create(self, instance: Any) -> None: del instance def start(self, instance: Any) -> None: del instance def exec(self, instance: Any, command: str, timeout_seconds: int) -> Any: del instance, command, timeout_seconds return None def stop(self, instance: Any) -> None: del instance def delete(self, instance: Any) -> None: del instance monkeypatch.setattr(vm_manager_module, "FirecrackerBackend", StubFirecrackerBackend) manager = VmManager( backend_name="firecracker", base_dir=tmp_path / "vms", runtime_paths=resolve_runtime_paths(), network_manager=TapNetworkManager(enabled=False), ) assert manager._backend_name == "firecracker" # noqa: SLF001 def test_firecracker_backend_start_removes_stale_socket_files( tmp_path: Path, monkeypatch: pytest.MonkeyPatch, ) -> None: backend = cast(Any, object.__new__(vm_manager_module.FirecrackerBackend)) backend._environment_store = object() # noqa: SLF001 backend._firecracker_bin = tmp_path / "firecracker" # noqa: SLF001 backend._jailer_bin = tmp_path / "jailer" # noqa: SLF001 backend._runtime_capabilities = RuntimeCapabilities( # noqa: SLF001 supports_vm_boot=True, supports_guest_exec=True, supports_guest_network=False, reason=None, ) backend._network_manager = TapNetworkManager(enabled=False) # noqa: SLF001 backend._guest_exec_client = None # noqa: SLF001 backend._processes = {} # noqa: SLF001 backend._firecracker_bin.write_text("fc", encoding="utf-8") # noqa: SLF001 backend._jailer_bin.write_text("jailer", encoding="utf-8") # noqa: SLF001 kernel_image = tmp_path / "vmlinux" kernel_image.write_text("kernel", encoding="utf-8") rootfs_image = tmp_path / "rootfs.ext4" rootfs_image.write_bytes(b"rootfs") workdir = tmp_path / "runtime" workdir.mkdir() firecracker_socket = workdir / "firecracker.sock" vsock_socket = workdir / "vsock.sock" firecracker_socket.write_text("stale firecracker socket", encoding="utf-8") vsock_socket.write_text("stale vsock socket", encoding="utf-8") class DummyPopen: def __init__(self, *args: Any, **kwargs: Any) -> None: del args, kwargs self.pid = 4242 def poll(self) -> None: return None monkeypatch.setattr( cast(Any, vm_manager_module).subprocess, "run", lambda *args, **kwargs: subprocess.CompletedProcess( # noqa: ARG005 args=args[0], returncode=0, stdout="Firecracker v1.0.0\n", stderr="", ), ) monkeypatch.setattr(cast(Any, vm_manager_module).subprocess, "Popen", DummyPopen) instance = vm_manager_module.VmInstance( vm_id="abcd1234", environment="debian:12", vcpu_count=1, mem_mib=512, ttl_seconds=600, created_at=time.time(), expires_at=time.time() + 600, workdir=workdir, metadata={ "kernel_image": str(kernel_image), "rootfs_image": str(rootfs_image), }, ) backend.start(instance) assert instance.firecracker_pid == 4242 assert not firecracker_socket.exists() assert not vsock_socket.exists() def test_vm_manager_fails_closed_without_host_compat_opt_in(tmp_path: Path) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) vm_id = str( manager.create_vm( environment="debian:12-base", ttl_seconds=600, )["vm_id"] ) with pytest.raises(RuntimeError, match="guest boot is unavailable"): manager.start_vm(vm_id) def test_vm_manager_uses_canonical_default_cache_dir( monkeypatch: pytest.MonkeyPatch, tmp_path: Path ) -> None: monkeypatch.setenv("PYRO_ENVIRONMENT_CACHE_DIR", str(tmp_path / "cache")) manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) assert manager._environment_store.cache_dir == tmp_path / "cache" # noqa: SLF001 def test_vm_manager_helper_round_trips() -> None: network = NetworkConfig( vm_id="abc123", tap_name="tap0", guest_ip="172.29.1.2", gateway_ip="172.29.1.1", subnet_cidr="172.29.1.0/24", mac_address="06:00:aa:bb:cc:dd", dns_servers=("1.1.1.1", "8.8.8.8"), ) assert vm_manager_module._optional_int(None) is None # noqa: SLF001 assert vm_manager_module._optional_int(True) == 1 # noqa: SLF001 assert vm_manager_module._optional_int(7) == 7 # noqa: SLF001 assert vm_manager_module._optional_int(7.2) == 7 # noqa: SLF001 assert vm_manager_module._optional_int("9") == 9 # noqa: SLF001 with pytest.raises(TypeError, match="integer-compatible"): vm_manager_module._optional_int(object()) # noqa: SLF001 assert vm_manager_module._optional_str(None) is None # noqa: SLF001 assert vm_manager_module._optional_str(1) == "1" # noqa: SLF001 assert vm_manager_module._optional_dict(None) is None # noqa: SLF001 assert vm_manager_module._optional_dict({"x": 1}) == {"x": 1} # noqa: SLF001 with pytest.raises(TypeError, match="dictionary payload"): vm_manager_module._optional_dict("bad") # noqa: SLF001 assert vm_manager_module._string_dict({"x": 1}) == {"x": "1"} # noqa: SLF001 assert vm_manager_module._string_dict("bad") == {} # noqa: SLF001 serialized = vm_manager_module._serialize_network(network) # noqa: SLF001 assert serialized is not None restored = vm_manager_module._deserialize_network(serialized) # noqa: SLF001 assert restored == network assert vm_manager_module._deserialize_network(None) is None # noqa: SLF001 with pytest.raises(TypeError, match="dictionary payload"): vm_manager_module._deserialize_network("bad") # noqa: SLF001 assert vm_manager_module._wrap_guest_command("echo hi") == "echo hi" # noqa: SLF001 wrapped = vm_manager_module._wrap_guest_command("echo hi", cwd="/workspace") # noqa: SLF001 assert "cd /workspace" in wrapped assert vm_manager_module._pid_is_running(None) is False # noqa: SLF001 def test_copy_rootfs_falls_back_to_copy2( tmp_path: Path, monkeypatch: pytest.MonkeyPatch ) -> None: source = tmp_path / "rootfs.ext4" source.write_text("payload", encoding="utf-8") dest = tmp_path / "dest" / "rootfs.ext4" def _raise_oserror(*args: Any, **kwargs: Any) -> Any: del args, kwargs raise OSError("no cp") monkeypatch.setattr(subprocess, "run", _raise_oserror) clone_mode = vm_manager_module._copy_rootfs(source, dest) # noqa: SLF001 assert clone_mode == "copy2" assert dest.read_text(encoding="utf-8") == "payload" def test_workspace_create_cleans_up_on_start_failure( tmp_path: Path, monkeypatch: pytest.MonkeyPatch ) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) def _boom(instance: Any) -> None: del instance raise RuntimeError("boom") monkeypatch.setattr(manager._backend, "start", _boom) # noqa: SLF001 with pytest.raises(RuntimeError, match="boom"): manager.create_workspace(environment="debian:12-base", allow_host_compat=True) assert list((tmp_path / "vms" / "workspaces").iterdir()) == [] def test_exec_instance_wraps_guest_workspace_command(tmp_path: Path) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) manager._runtime_capabilities = RuntimeCapabilities( # noqa: SLF001 supports_vm_boot=True, supports_guest_exec=True, supports_guest_network=False, reason=None, ) captured: dict[str, Any] = {} class StubBackend: def exec( self, instance: Any, command: str, timeout_seconds: int, *, workdir: Path | None = None, ) -> vm_manager_module.VmExecResult: del instance, timeout_seconds captured["command"] = command captured["workdir"] = workdir return vm_manager_module.VmExecResult( stdout="", stderr="", exit_code=0, duration_ms=1, ) manager._backend = StubBackend() # type: ignore[assignment] # noqa: SLF001 instance = vm_manager_module.VmInstance( # noqa: SLF001 vm_id="vm-123", environment="debian:12-base", vcpu_count=1, mem_mib=512, ttl_seconds=600, created_at=time.time(), expires_at=time.time() + 600, workdir=tmp_path / "runtime", state="started", ) result, execution_mode = manager._exec_instance( # noqa: SLF001 instance, command="echo hi", timeout_seconds=30, guest_cwd="/workspace", ) assert result.exit_code == 0 assert execution_mode == "unknown" assert "cd /workspace" in str(captured["command"]) assert captured["workdir"] is None def test_status_workspace_marks_dead_backing_process_stopped(tmp_path: Path) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) workspace_id = str( manager.create_workspace( environment="debian:12-base", allow_host_compat=True, )["workspace_id"] ) workspace_path = tmp_path / "vms" / "workspaces" / workspace_id / "workspace.json" payload = json.loads(workspace_path.read_text(encoding="utf-8")) payload["metadata"]["execution_mode"] = "guest_vsock" payload["firecracker_pid"] = 999999 workspace_path.write_text(json.dumps(payload, indent=2, sort_keys=True), encoding="utf-8") status = manager.status_workspace(workspace_id) assert status["state"] == "stopped" updated_payload = json.loads(workspace_path.read_text(encoding="utf-8")) assert "backing guest process" in str(updated_payload.get("last_error", "")) def test_reap_expired_workspaces_removes_invalid_and_expired_records(tmp_path: Path) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) invalid_dir = tmp_path / "vms" / "workspaces" / "invalid" invalid_dir.mkdir(parents=True) (invalid_dir / "workspace.json").write_text("[]", encoding="utf-8") workspace_id = str( manager.create_workspace( environment="debian:12-base", allow_host_compat=True, )["workspace_id"] ) workspace_path = tmp_path / "vms" / "workspaces" / workspace_id / "workspace.json" payload = json.loads(workspace_path.read_text(encoding="utf-8")) payload["expires_at"] = 0.0 workspace_path.write_text(json.dumps(payload, indent=2, sort_keys=True), encoding="utf-8") with manager._lock: # noqa: SLF001 manager._reap_expired_workspaces_locked(time.time()) # noqa: SLF001 assert not invalid_dir.exists() assert not (tmp_path / "vms" / "workspaces" / workspace_id).exists() def test_workspace_service_lifecycle_and_status_counts(tmp_path: Path) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) workspace_id = str( manager.create_workspace( environment="debian:12-base", allow_host_compat=True, )["workspace_id"] ) started = manager.start_service( workspace_id, "app", command="sh -lc 'printf \"service ready\\n\"; touch .ready; while true; do sleep 60; done'", readiness={"type": "file", "path": ".ready"}, ) assert started["state"] == "running" listed = manager.list_services(workspace_id) assert listed["count"] == 1 assert listed["running_count"] == 1 status = manager.status_service(workspace_id, "app") assert status["state"] == "running" assert status["ready_at"] is not None logs = manager.logs_service(workspace_id, "app") assert "service ready" in str(logs["stdout"]) workspace_status = manager.status_workspace(workspace_id) assert workspace_status["service_count"] == 1 assert workspace_status["running_service_count"] == 1 stopped = manager.stop_service(workspace_id, "app") assert stopped["state"] == "stopped" assert stopped["stop_reason"] in {"sigterm", "sigkill"} deleted = manager.delete_workspace(workspace_id) assert deleted["deleted"] is True def test_workspace_create_serializes_network_policy(tmp_path: Path) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) manager._runtime_capabilities = RuntimeCapabilities( # noqa: SLF001 supports_vm_boot=True, supports_guest_exec=True, supports_guest_network=True, ) manager._ensure_workspace_guest_bootstrap_support = lambda instance: None # type: ignore[method-assign] # noqa: SLF001 created = manager.create_workspace( environment="debian:12-base", network_policy="egress", ) assert created["network_policy"] == "egress" workspace_id = str(created["workspace_id"]) workspace_path = tmp_path / "vms" / "workspaces" / workspace_id / "workspace.json" payload = json.loads(workspace_path.read_text(encoding="utf-8")) assert payload["network_policy"] == "egress" def test_workspace_service_start_serializes_published_ports( tmp_path: Path, monkeypatch: pytest.MonkeyPatch, ) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) manager._runtime_capabilities = RuntimeCapabilities( # noqa: SLF001 supports_vm_boot=True, supports_guest_exec=True, supports_guest_network=True, ) manager._ensure_workspace_guest_bootstrap_support = lambda instance: None # type: ignore[method-assign] # noqa: SLF001 created = manager.create_workspace( environment="debian:12-base", network_policy="egress+published-ports", allow_host_compat=True, ) workspace_id = str(created["workspace_id"]) workspace_path = tmp_path / "vms" / "workspaces" / workspace_id / "workspace.json" payload = json.loads(workspace_path.read_text(encoding="utf-8")) payload["network"] = { "vm_id": workspace_id, "tap_name": "tap-test0", "guest_ip": "172.29.1.2", "gateway_ip": "172.29.1.1", "subnet_cidr": "172.29.1.0/30", "mac_address": "06:00:ac:1d:01:02", "dns_servers": ["1.1.1.1"], } workspace_path.write_text(json.dumps(payload, indent=2, sort_keys=True), encoding="utf-8") monkeypatch.setattr( manager, "_start_workspace_service_published_ports", lambda **kwargs: [ vm_manager_module.WorkspacePublishedPortRecord( guest_port=8080, host_port=18080, host="127.0.0.1", protocol="tcp", proxy_pid=9999, ) ], ) monkeypatch.setattr( manager, "_refresh_workspace_liveness_locked", lambda workspace: None, ) started = manager.start_service( workspace_id, "web", command="sh -lc 'touch .ready && while true; do sleep 60; done'", readiness={"type": "file", "path": ".ready"}, published_ports=[{"guest_port": 8080, "host_port": 18080}], ) assert started["published_ports"] == [ { "host": "127.0.0.1", "host_port": 18080, "guest_port": 8080, "protocol": "tcp", } ] def test_workspace_service_start_rejects_published_ports_without_network_policy( tmp_path: Path, ) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) workspace_id = str( manager.create_workspace( environment="debian:12-base", allow_host_compat=True, )["workspace_id"] ) with pytest.raises( RuntimeError, match="published ports require workspace network_policy 'egress\\+published-ports'", ): manager.start_service( workspace_id, "web", command="sh -lc 'touch .ready && while true; do sleep 60; done'", readiness={"type": "file", "path": ".ready"}, published_ports=[{"guest_port": 8080}], ) def test_workspace_service_start_rejects_published_ports_without_active_network( tmp_path: Path, monkeypatch: pytest.MonkeyPatch, ) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) manager._runtime_capabilities = RuntimeCapabilities( # noqa: SLF001 supports_vm_boot=True, supports_guest_exec=True, supports_guest_network=True, ) manager._ensure_workspace_guest_bootstrap_support = lambda instance: None # type: ignore[method-assign] # noqa: SLF001 monkeypatch.setattr( manager, "_refresh_workspace_liveness_locked", lambda workspace: None, ) workspace_id = str( manager.create_workspace( environment="debian:12-base", network_policy="egress+published-ports", allow_host_compat=True, )["workspace_id"] ) with pytest.raises(RuntimeError, match="published ports require an active guest network"): manager.start_service( workspace_id, "web", command="sh -lc 'touch .ready && while true; do sleep 60; done'", readiness={"type": "file", "path": ".ready"}, published_ports=[{"guest_port": 8080}], ) def test_workspace_service_start_published_port_failure_marks_service_failed( tmp_path: Path, monkeypatch: pytest.MonkeyPatch, ) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) manager._runtime_capabilities = RuntimeCapabilities( # noqa: SLF001 supports_vm_boot=True, supports_guest_exec=True, supports_guest_network=True, ) manager._ensure_workspace_guest_bootstrap_support = lambda instance: None # type: ignore[method-assign] # noqa: SLF001 monkeypatch.setattr( manager, "_refresh_workspace_liveness_locked", lambda workspace: None, ) created = manager.create_workspace( environment="debian:12-base", network_policy="egress+published-ports", allow_host_compat=True, ) workspace_id = str(created["workspace_id"]) workspace_path = tmp_path / "vms" / "workspaces" / workspace_id / "workspace.json" payload = json.loads(workspace_path.read_text(encoding="utf-8")) payload["network"] = { "vm_id": workspace_id, "tap_name": "tap-test0", "guest_ip": "172.29.1.2", "gateway_ip": "172.29.1.1", "subnet_cidr": "172.29.1.0/30", "mac_address": "06:00:ac:1d:01:02", "dns_servers": ["1.1.1.1"], } workspace_path.write_text(json.dumps(payload, indent=2, sort_keys=True), encoding="utf-8") def _raise_proxy_failure( **kwargs: object, ) -> list[vm_manager_module.WorkspacePublishedPortRecord]: del kwargs raise RuntimeError("proxy boom") monkeypatch.setattr(manager, "_start_workspace_service_published_ports", _raise_proxy_failure) started = manager.start_service( workspace_id, "web", command="sh -lc 'touch .ready && while true; do sleep 60; done'", readiness={"type": "file", "path": ".ready"}, published_ports=[{"guest_port": 8080, "host_port": 18080}], ) assert started["state"] == "failed" assert started["stop_reason"] == "published_port_failed" assert started["published_ports"] == [] def test_workspace_service_cleanup_stops_published_port_proxies( tmp_path: Path, monkeypatch: pytest.MonkeyPatch, ) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) workspace_id = "workspace-cleanup" service = vm_manager_module.WorkspaceServiceRecord( workspace_id=workspace_id, service_name="web", command="sleep 60", cwd="/workspace", state="running", pid=1234, started_at=time.time(), ended_at=None, exit_code=None, execution_mode="host_compat", readiness=None, ready_at=None, stop_reason=None, published_ports=[ vm_manager_module.WorkspacePublishedPortRecord( guest_port=8080, host_port=18080, proxy_pid=9999, ) ], ) manager._save_workspace_service_locked(service) # noqa: SLF001 stopped: list[int | None] = [] monkeypatch.setattr( vm_manager_module, "_stop_workspace_published_port_proxy", lambda published_port: stopped.append(published_port.proxy_pid), ) manager._delete_workspace_service_artifacts_locked(workspace_id, "web") # noqa: SLF001 assert stopped == [9999] assert not manager._workspace_service_record_path(workspace_id, "web").exists() # noqa: SLF001 def test_workspace_refresh_workspace_service_counts_stops_published_ports_when_stopped( tmp_path: Path, monkeypatch: pytest.MonkeyPatch, ) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) workspace = vm_manager_module.WorkspaceRecord( workspace_id="workspace-counts", environment="debian:12-base", vcpu_count=1, mem_mib=1024, ttl_seconds=600, created_at=time.time(), expires_at=time.time() + 600, state="stopped", firecracker_pid=None, last_error=None, allow_host_compat=True, network_policy="off", metadata={}, command_count=0, last_command=None, workspace_seed={ "mode": "empty", "seed_path": None, "destination": "/workspace", "entry_count": 0, "bytes_written": 0, }, secrets=[], reset_count=0, last_reset_at=None, ) service = vm_manager_module.WorkspaceServiceRecord( workspace_id=workspace.workspace_id, service_name="web", command="sleep 60", cwd="/workspace", state="running", pid=1234, started_at=time.time(), ended_at=None, exit_code=None, execution_mode="host_compat", readiness=None, ready_at=None, stop_reason=None, published_ports=[ vm_manager_module.WorkspacePublishedPortRecord( guest_port=8080, host_port=18080, proxy_pid=9999, ) ], ) manager._save_workspace_service_locked(service) # noqa: SLF001 stopped: list[int | None] = [] monkeypatch.setattr( vm_manager_module, "_stop_workspace_published_port_proxy", lambda published_port: stopped.append(published_port.proxy_pid), ) manager._refresh_workspace_service_counts_locked(workspace) # noqa: SLF001 assert stopped == [9999] refreshed = manager._load_workspace_service_locked(workspace.workspace_id, "web") # noqa: SLF001 assert refreshed.state == "stopped" assert refreshed.stop_reason == "workspace_stopped" def test_workspace_published_port_proxy_helpers( tmp_path: Path, monkeypatch: pytest.MonkeyPatch, ) -> None: services_dir = tmp_path / "services" services_dir.mkdir(parents=True, exist_ok=True) class StubProcess: def __init__(self, pid: int, *, exited: bool = False) -> None: self.pid = pid self._exited = exited def poll(self) -> int | None: return 1 if self._exited else None def _fake_popen(command: list[str], **kwargs: object) -> StubProcess: del kwargs ready_file = Path(command[command.index("--ready-file") + 1]) ready_file.write_text( json.dumps( { "host": "127.0.0.1", "host_port": 18080, "target_host": "172.29.1.2", "target_port": 8080, "protocol": "tcp", } ), encoding="utf-8", ) return StubProcess(4242) monkeypatch.setattr(subprocess, "Popen", _fake_popen) record = vm_manager_module._start_workspace_published_port_proxy( # noqa: SLF001 services_dir=services_dir, service_name="web", workspace_id="workspace-proxy", guest_ip="172.29.1.2", spec=vm_manager_module.WorkspacePublishedPortSpec( guest_port=8080, host_port=18080, ), ) assert record.guest_port == 8080 assert record.host_port == 18080 assert record.proxy_pid == 4242 def test_workspace_published_port_proxy_timeout_and_stop( tmp_path: Path, monkeypatch: pytest.MonkeyPatch, ) -> None: services_dir = tmp_path / "services" services_dir.mkdir(parents=True, exist_ok=True) class StubProcess: pid = 4242 def poll(self) -> int | None: return None monkeypatch.setattr(subprocess, "Popen", lambda *args, **kwargs: StubProcess()) monotonic_values = iter([0.0, 0.0, 5.1]) monkeypatch.setattr(time, "monotonic", lambda: next(monotonic_values)) monkeypatch.setattr(time, "sleep", lambda _: None) stopped: list[int | None] = [] monkeypatch.setattr( vm_manager_module, "_stop_workspace_published_port_proxy", lambda published_port: stopped.append(published_port.proxy_pid), ) with pytest.raises(RuntimeError, match="timed out waiting for published port proxy readiness"): vm_manager_module._start_workspace_published_port_proxy( # noqa: SLF001 services_dir=services_dir, service_name="web", workspace_id="workspace-proxy", guest_ip="172.29.1.2", spec=vm_manager_module.WorkspacePublishedPortSpec( guest_port=8080, host_port=18080, ), ) assert stopped == [4242] def test_workspace_published_port_validation_and_stop_helper( monkeypatch: pytest.MonkeyPatch, ) -> None: spec = vm_manager_module._normalize_workspace_published_port( # noqa: SLF001 guest_port="8080", host_port="18080", ) assert spec.guest_port == 8080 assert spec.host_port == 18080 with pytest.raises(ValueError, match="published guest_port must be an integer"): vm_manager_module._normalize_workspace_published_port(guest_port=object()) # noqa: SLF001 with pytest.raises(ValueError, match="published host_port must be between 1025 and 65535"): vm_manager_module._normalize_workspace_published_port( # noqa: SLF001 guest_port=8080, host_port=80, ) signals: list[int] = [] monkeypatch.setattr(os, "killpg", lambda pid, sig: signals.append(sig)) running = iter([True, False]) monkeypatch.setattr(vm_manager_module, "_pid_is_running", lambda pid: next(running)) monkeypatch.setattr(time, "sleep", lambda _: None) vm_manager_module._stop_workspace_published_port_proxy( # noqa: SLF001 vm_manager_module.WorkspacePublishedPortRecord( guest_port=8080, host_port=18080, proxy_pid=9999, ) ) assert signals == [signal.SIGTERM] def test_workspace_network_policy_requires_guest_network_support(tmp_path: Path) -> None: manager = VmManager( backend_name="firecracker", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) manager._runtime_capabilities = RuntimeCapabilities( # noqa: SLF001 supports_vm_boot=False, supports_guest_exec=False, supports_guest_network=False, reason="no guest network", ) with pytest.raises(RuntimeError, match="workspace network_policy requires guest networking"): manager._require_workspace_network_policy_support( # noqa: SLF001 network_policy="egress" ) def test_prepare_workspace_seed_rejects_missing_and_invalid_paths(tmp_path: Path) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) with pytest.raises(ValueError, match="does not exist"): manager._prepare_workspace_seed(tmp_path / "missing") # noqa: SLF001 invalid_source = tmp_path / "seed.txt" invalid_source.write_text("seed", encoding="utf-8") with pytest.raises( ValueError, match="seed_path must be a directory or a .tar/.tar.gz/.tgz archive", ): manager._prepare_workspace_seed(invalid_source) # noqa: SLF001 def test_workspace_baseline_snapshot_requires_archive(tmp_path: Path) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) created = manager.create_workspace( environment="debian:12", vcpu_count=1, mem_mib=512, ttl_seconds=600, allow_host_compat=True, ) workspace_id = str(created["workspace_id"]) baseline_path = tmp_path / "vms" / "workspaces" / workspace_id / "baseline" / "workspace.tar" baseline_path.unlink() workspace = manager._load_workspace_locked(workspace_id) # noqa: SLF001 with pytest.raises(RuntimeError, match="baseline snapshot"): manager._workspace_baseline_snapshot_locked(workspace) # noqa: SLF001 def test_workspace_snapshot_and_service_loaders_handle_invalid_payloads(tmp_path: Path) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) workspace_id = "workspace-invalid" services_dir = tmp_path / "vms" / "workspaces" / workspace_id / "services" snapshots_dir = tmp_path / "vms" / "workspaces" / workspace_id / "snapshots" services_dir.mkdir(parents=True, exist_ok=True) snapshots_dir.mkdir(parents=True, exist_ok=True) (services_dir / "svc.json").write_text("[]", encoding="utf-8") (snapshots_dir / "snap.json").write_text("[]", encoding="utf-8") with pytest.raises(RuntimeError, match="service record"): manager._load_workspace_service_locked(workspace_id, "svc") # noqa: SLF001 with pytest.raises(RuntimeError, match="snapshot record"): manager._load_workspace_snapshot_locked(workspace_id, "snap") # noqa: SLF001 with pytest.raises(RuntimeError, match="snapshot record"): manager._load_workspace_snapshot_locked_optional(workspace_id, "snap") # noqa: SLF001 assert manager._load_workspace_snapshot_locked_optional(workspace_id, "missing") is None # noqa: SLF001 def test_workspace_shell_helpers_handle_missing_invalid_and_close_errors( tmp_path: Path, monkeypatch: pytest.MonkeyPatch, ) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) created = manager.create_workspace( environment="debian:12", vcpu_count=1, mem_mib=512, ttl_seconds=600, allow_host_compat=True, ) workspace_id = str(created["workspace_id"]) assert manager._list_workspace_shells_locked(workspace_id) == [] # noqa: SLF001 shells_dir = tmp_path / "vms" / "workspaces" / workspace_id / "shells" shells_dir.mkdir(parents=True, exist_ok=True) (shells_dir / "invalid.json").write_text("[]", encoding="utf-8") assert manager._list_workspace_shells_locked(workspace_id) == [] # noqa: SLF001 shell = vm_manager_module.WorkspaceShellRecord( workspace_id=workspace_id, shell_id="shell-1", cwd="/workspace", cols=120, rows=30, state="running", started_at=time.time(), ) manager._save_workspace_shell_locked(shell) # noqa: SLF001 workspace = manager._load_workspace_locked(workspace_id) # noqa: SLF001 instance = workspace.to_instance( workdir=tmp_path / "vms" / "workspaces" / workspace_id / "runtime" ) def _raise_close(**kwargs: object) -> dict[str, object]: del kwargs raise RuntimeError("shell close boom") monkeypatch.setattr(manager._backend, "close_shell", _raise_close) manager._close_workspace_shells_locked(workspace, instance) # noqa: SLF001 assert manager._list_workspace_shells_locked(workspace_id) == [] # noqa: SLF001 def test_workspace_refresh_service_helpers_cover_exit_and_started_refresh( tmp_path: Path, monkeypatch: pytest.MonkeyPatch, ) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) created = manager.create_workspace( environment="debian:12", vcpu_count=1, mem_mib=512, ttl_seconds=600, allow_host_compat=True, ) workspace_id = str(created["workspace_id"]) workspace_path = tmp_path / "vms" / "workspaces" / workspace_id / "workspace.json" payload = json.loads(workspace_path.read_text(encoding="utf-8")) payload["state"] = "started" payload["network"] = { "vm_id": workspace_id, "tap_name": "tap-test0", "guest_ip": "172.29.1.2", "gateway_ip": "172.29.1.1", "subnet_cidr": "172.29.1.0/30", "mac_address": "06:00:ac:1d:01:02", "dns_servers": ["1.1.1.1"], } workspace_path.write_text(json.dumps(payload, indent=2, sort_keys=True), encoding="utf-8") workspace = manager._load_workspace_locked(workspace_id) # noqa: SLF001 instance = workspace.to_instance( workdir=tmp_path / "vms" / "workspaces" / workspace_id / "runtime" ) service = vm_manager_module.WorkspaceServiceRecord( workspace_id=workspace_id, service_name="web", command="sleep 60", cwd="/workspace", state="running", started_at=time.time(), execution_mode="guest_vsock", published_ports=[ vm_manager_module.WorkspacePublishedPortRecord( guest_port=8080, host_port=18080, proxy_pid=9999, ) ], ) manager._save_workspace_service_locked(service) # noqa: SLF001 stopped: list[int | None] = [] monkeypatch.setattr( vm_manager_module, "_stop_workspace_published_port_proxy", lambda published_port: stopped.append(published_port.proxy_pid), ) monkeypatch.setattr( manager._backend, "status_service", lambda *args, **kwargs: { "service_name": "web", "command": "sleep 60", "cwd": "/workspace", "state": "exited", "started_at": service.started_at, "ended_at": service.started_at + 1, "exit_code": 0, "execution_mode": "guest_vsock", }, ) refreshed = manager._refresh_workspace_service_locked( # noqa: SLF001 workspace, instance, service, ) assert refreshed.state == "exited" assert refreshed.published_ports == [ vm_manager_module.WorkspacePublishedPortRecord( guest_port=8080, host_port=18080, proxy_pid=None, ) ] assert stopped == [9999] manager._save_workspace_service_locked(service) # noqa: SLF001 refreshed_calls: list[str] = [] monkeypatch.setattr(manager, "_require_workspace_service_support", lambda instance: None) def _refresh_services( workspace: vm_manager_module.WorkspaceRecord, instance: vm_manager_module.VmInstance, ) -> list[vm_manager_module.WorkspaceServiceRecord]: del instance refreshed_calls.append(workspace.workspace_id) return [] monkeypatch.setattr( manager, "_refresh_workspace_services_locked", _refresh_services, ) manager._refresh_workspace_service_counts_locked(workspace) # noqa: SLF001 assert refreshed_calls == [workspace_id] def test_workspace_start_published_ports_cleans_up_partial_failure( tmp_path: Path, monkeypatch: pytest.MonkeyPatch, ) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) created = manager.create_workspace( environment="debian:12", vcpu_count=1, mem_mib=512, ttl_seconds=600, allow_host_compat=True, ) workspace = manager._load_workspace_locked(str(created["workspace_id"])) # noqa: SLF001 service = vm_manager_module.WorkspaceServiceRecord( workspace_id=workspace.workspace_id, service_name="web", command="sleep 60", cwd="/workspace", state="running", started_at=time.time(), execution_mode="guest_vsock", ) started_record = vm_manager_module.WorkspacePublishedPortRecord( guest_port=8080, host_port=18080, proxy_pid=9999, ) calls: list[int] = [] def _start_proxy(**kwargs: object) -> vm_manager_module.WorkspacePublishedPortRecord: spec = cast(vm_manager_module.WorkspacePublishedPortSpec, kwargs["spec"]) if spec.guest_port == 8080: return started_record raise RuntimeError("proxy boom") monkeypatch.setattr(vm_manager_module, "_start_workspace_published_port_proxy", _start_proxy) monkeypatch.setattr( vm_manager_module, "_stop_workspace_published_port_proxy", lambda published_port: calls.append(published_port.proxy_pid or -1), ) with pytest.raises(RuntimeError, match="proxy boom"): manager._start_workspace_service_published_ports( # noqa: SLF001 workspace=workspace, service=service, guest_ip="172.29.1.2", published_ports=[ vm_manager_module.WorkspacePublishedPortSpec(guest_port=8080), vm_manager_module.WorkspacePublishedPortSpec(guest_port=9090), ], ) assert calls == [9999] def test_workspace_service_start_replaces_non_running_record(tmp_path: Path) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) workspace_id = str( manager.create_workspace( environment="debian:12-base", allow_host_compat=True, )["workspace_id"] ) failed = manager.start_service( workspace_id, "app", command="sh -lc 'exit 2'", readiness={"type": "file", "path": ".ready"}, ready_timeout_seconds=1, ready_interval_ms=50, ) assert failed["state"] == "failed" started = manager.start_service( workspace_id, "app", command="sh -lc 'touch .ready; while true; do sleep 60; done'", readiness={"type": "file", "path": ".ready"}, ) assert started["state"] == "running" manager.delete_workspace(workspace_id) def test_workspace_service_supports_command_readiness_and_helper_probes( tmp_path: Path, monkeypatch: pytest.MonkeyPatch ) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) workspace_id = str( manager.create_workspace( environment="debian:12-base", allow_host_compat=True, )["workspace_id"] ) command_started = manager.start_service( workspace_id, "command-ready", command="sh -lc 'touch command.ready; while true; do sleep 60; done'", readiness={"type": "command", "command": "test -f command.ready"}, ) assert command_started["state"] == "running" listed = manager.list_services(workspace_id) assert listed["count"] == 1 assert listed["running_count"] == 1 status = manager.status_workspace(workspace_id) assert status["service_count"] == 1 assert status["running_service_count"] == 1 assert manager.stop_service(workspace_id, "command-ready")["state"] == "stopped" workspace_dir = tmp_path / "vms" / "workspaces" / workspace_id / "workspace" ready_file = workspace_dir / "probe.ready" ready_file.write_text("ok\n", encoding="utf-8") assert vm_manager_module._service_ready_on_host( # noqa: SLF001 readiness={"type": "file", "path": "/workspace/probe.ready"}, workspace_dir=workspace_dir, cwd=workspace_dir, ) class StubSocket: def __enter__(self) -> StubSocket: return self def __exit__(self, *args: object) -> None: del args def settimeout(self, timeout: int) -> None: assert timeout == 1 def connect(self, address: tuple[str, int]) -> None: assert address == ("127.0.0.1", 8080) monkeypatch.setattr("pyro_mcp.vm_manager.socket.socket", lambda *args: StubSocket()) assert vm_manager_module._service_ready_on_host( # noqa: SLF001 readiness={"type": "tcp", "address": "127.0.0.1:8080"}, workspace_dir=workspace_dir, cwd=workspace_dir, ) class StubResponse: status = 204 def __enter__(self) -> StubResponse: return self def __exit__(self, *args: object) -> None: del args def _urlopen(request: object, timeout: int) -> StubResponse: del request assert timeout == 2 return StubResponse() monkeypatch.setattr("pyro_mcp.vm_manager.urllib.request.urlopen", _urlopen) assert vm_manager_module._service_ready_on_host( # noqa: SLF001 readiness={"type": "http", "url": "http://127.0.0.1:8080/"}, workspace_dir=workspace_dir, cwd=workspace_dir, ) def test_workspace_service_logs_tail_and_delete_cleanup(tmp_path: Path) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) workspace_id = str( manager.create_workspace( environment="debian:12-base", allow_host_compat=True, )["workspace_id"] ) manager.start_service( workspace_id, "logger", command=( "sh -lc 'printf \"one\\n\"; printf \"two\\n\"; " "touch .ready; while true; do sleep 60; done'" ), readiness={"type": "file", "path": ".ready"}, ) logs = manager.logs_service(workspace_id, "logger", tail_lines=1) assert logs["stdout"] == "two\n" assert logs["truncated"] is True services_dir = tmp_path / "vms" / "workspaces" / workspace_id / "services" assert services_dir.exists() deleted = manager.delete_workspace(workspace_id) assert deleted["deleted"] is True assert not services_dir.exists() def test_workspace_status_stops_service_counts_when_workspace_is_stopped(tmp_path: Path) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) workspace_id = str( manager.create_workspace( environment="debian:12-base", allow_host_compat=True, )["workspace_id"] ) manager.start_service( workspace_id, "app", command="sh -lc 'touch .ready; while true; do sleep 60; done'", readiness={"type": "file", "path": ".ready"}, ) service_path = tmp_path / "vms" / "workspaces" / workspace_id / "services" / "app.json" live_service_payload = json.loads(service_path.read_text(encoding="utf-8")) live_pid = int(live_service_payload["pid"]) try: workspace_path = tmp_path / "vms" / "workspaces" / workspace_id / "workspace.json" payload = json.loads(workspace_path.read_text(encoding="utf-8")) payload["state"] = "stopped" workspace_path.write_text(json.dumps(payload, indent=2, sort_keys=True), encoding="utf-8") status = manager.status_workspace(workspace_id) assert status["state"] == "stopped" assert status["service_count"] == 1 assert status["running_service_count"] == 0 service_payload = json.loads(service_path.read_text(encoding="utf-8")) assert service_payload["state"] == "stopped" assert service_payload["stop_reason"] == "workspace_stopped" finally: vm_manager_module._stop_process_group(live_pid) # noqa: SLF001 def test_workspace_service_readiness_validation_helpers() -> None: assert vm_manager_module._normalize_workspace_service_name("app-1") == "app-1" # noqa: SLF001 with pytest.raises(ValueError, match="service_name must not be empty"): vm_manager_module._normalize_workspace_service_name(" ") # noqa: SLF001 with pytest.raises(ValueError, match="service_name must match"): vm_manager_module._normalize_workspace_service_name("bad name") # noqa: SLF001 assert vm_manager_module._normalize_workspace_service_readiness( # noqa: SLF001 {"type": "file", "path": "subdir/.ready"} ) == {"type": "file", "path": "/workspace/subdir/.ready"} assert vm_manager_module._normalize_workspace_service_readiness( # noqa: SLF001 {"type": "tcp", "address": "127.0.0.1:8080"} ) == {"type": "tcp", "address": "127.0.0.1:8080"} assert vm_manager_module._normalize_workspace_service_readiness( # noqa: SLF001 {"type": "http", "url": "http://127.0.0.1:8080/"} ) == {"type": "http", "url": "http://127.0.0.1:8080/"} assert vm_manager_module._normalize_workspace_service_readiness( # noqa: SLF001 {"type": "command", "command": "test -f .ready"} ) == {"type": "command", "command": "test -f .ready"} with pytest.raises(ValueError, match="one of: file, tcp, http, command"): vm_manager_module._normalize_workspace_service_readiness({"type": "bogus"}) # noqa: SLF001 with pytest.raises(ValueError, match="required for file readiness"): vm_manager_module._normalize_workspace_service_readiness({"type": "file"}) # noqa: SLF001 with pytest.raises(ValueError, match="HOST:PORT format"): vm_manager_module._normalize_workspace_service_readiness( # noqa: SLF001 {"type": "tcp", "address": "127.0.0.1"} ) with pytest.raises(ValueError, match="required for http readiness"): vm_manager_module._normalize_workspace_service_readiness({"type": "http"}) # noqa: SLF001 with pytest.raises(ValueError, match="required for command readiness"): vm_manager_module._normalize_workspace_service_readiness({"type": "command"}) # noqa: SLF001 def test_workspace_service_text_and_exit_code_helpers(tmp_path: Path) -> None: status_path = tmp_path / "service.status" assert vm_manager_module._read_service_exit_code(status_path) is None # noqa: SLF001 status_path.write_text("", encoding="utf-8") assert vm_manager_module._read_service_exit_code(status_path) is None # noqa: SLF001 status_path.write_text("7\n", encoding="utf-8") assert vm_manager_module._read_service_exit_code(status_path) == 7 # noqa: SLF001 log_path = tmp_path / "service.log" assert vm_manager_module._tail_text(log_path, tail_lines=10) == ("", False) # noqa: SLF001 log_path.write_text("one\ntwo\nthree\n", encoding="utf-8") assert vm_manager_module._tail_text(log_path, tail_lines=None) == ( # noqa: SLF001 "one\ntwo\nthree\n", False, ) assert vm_manager_module._tail_text(log_path, tail_lines=5) == ( # noqa: SLF001 "one\ntwo\nthree\n", False, ) assert vm_manager_module._tail_text(log_path, tail_lines=1) == ("three\n", True) # noqa: SLF001 def test_workspace_service_process_group_helpers(monkeypatch: pytest.MonkeyPatch) -> None: def _missing(_pid: int, _signal: int) -> None: raise ProcessLookupError() monkeypatch.setattr("pyro_mcp.vm_manager.os.killpg", _missing) assert vm_manager_module._stop_process_group(123) == (False, False) # noqa: SLF001 kill_calls: list[int] = [] monotonic_values = iter([0.0, 0.0, 5.0, 5.0, 10.0]) running_states = iter([True, True, False]) def _killpg(_pid: int, signum: int) -> None: kill_calls.append(signum) def _monotonic() -> float: return next(monotonic_values) def _is_running(_pid: int | None) -> bool: return next(running_states) monkeypatch.setattr("pyro_mcp.vm_manager.os.killpg", _killpg) monkeypatch.setattr("pyro_mcp.vm_manager.time.monotonic", _monotonic) monkeypatch.setattr("pyro_mcp.vm_manager.time.sleep", lambda _seconds: None) monkeypatch.setattr("pyro_mcp.vm_manager._pid_is_running", _is_running) stopped, killed = vm_manager_module._stop_process_group(456, wait_seconds=5) # noqa: SLF001 assert (stopped, killed) == (True, True) assert kill_calls == [signal.SIGTERM, signal.SIGKILL] def test_pid_is_running_treats_zombies_as_stopped(monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.setattr(vm_manager_module, "_linux_process_state", lambda _pid: "Z") assert vm_manager_module._pid_is_running(123) is False # noqa: SLF001 def test_workspace_service_probe_and_refresh_helpers( tmp_path: Path, monkeypatch: pytest.MonkeyPatch ) -> None: assert vm_manager_module._run_service_probe_command(tmp_path, "exit 3") == 3 # noqa: SLF001 services_dir = tmp_path / "services" services_dir.mkdir() status_path = services_dir / "app.status" status_path.write_text("9\n", encoding="utf-8") running = vm_manager_module.WorkspaceServiceRecord( # noqa: SLF001 workspace_id="workspace-1", service_name="app", command="sleep 60", cwd="/workspace", state="running", started_at=time.time(), readiness=None, ready_at=None, ended_at=None, exit_code=None, pid=1234, execution_mode="host_compat", stop_reason=None, ) monkeypatch.setattr("pyro_mcp.vm_manager._pid_is_running", lambda _pid: False) refreshed = vm_manager_module._refresh_local_service_record( # noqa: SLF001 running, services_dir=services_dir, ) assert refreshed.state == "exited" assert refreshed.exit_code == 9 monkeypatch.setattr( "pyro_mcp.vm_manager._stop_process_group", lambda _pid: (True, False), ) stopped = vm_manager_module._stop_local_service( # noqa: SLF001 refreshed, services_dir=services_dir, ) assert stopped.state == "stopped" assert stopped.stop_reason == "sigterm" def test_workspace_secrets_redact_exec_shell_service_and_survive_reset(tmp_path: Path) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) secret_file = tmp_path / "token.txt" secret_file.write_text("from-file\n", encoding="utf-8") created = manager.create_workspace( environment="debian:12-base", allow_host_compat=True, secrets=[ {"name": "API_TOKEN", "value": "expected"}, {"name": "FILE_TOKEN", "file_path": str(secret_file)}, ], ) workspace_id = str(created["workspace_id"]) assert created["secrets"] == [ {"name": "API_TOKEN", "source_kind": "literal"}, {"name": "FILE_TOKEN", "source_kind": "file"}, ] no_secret = manager.exec_workspace( workspace_id, command='sh -lc \'printf "%s" "${API_TOKEN:-missing}"\'', timeout_seconds=30, ) assert no_secret["stdout"] == "missing" executed = manager.exec_workspace( workspace_id, command='sh -lc \'printf "%s\\n" "$API_TOKEN"\'', timeout_seconds=30, secret_env={"API_TOKEN": "API_TOKEN"}, ) assert executed["stdout"] == "[REDACTED]\n" logs = manager.logs_workspace(workspace_id) assert logs["entries"][-1]["stdout"] == "[REDACTED]\n" shell = manager.open_shell( workspace_id, secret_env={"API_TOKEN": "API_TOKEN"}, ) shell_id = str(shell["shell_id"]) manager.write_shell(workspace_id, shell_id, input_text='printf "%s\\n" "$API_TOKEN"') output = "" deadline = time.time() + 5 while time.time() < deadline: read = manager.read_shell(workspace_id, shell_id, cursor=0, max_chars=65536) output = str(read["output"]) if "[REDACTED]" in output: break time.sleep(0.05) assert "[REDACTED]" in output manager.close_shell(workspace_id, shell_id) started = manager.start_service( workspace_id, "app", command=( 'sh -lc \'trap "exit 0" TERM; printf "%s\\n" "$API_TOKEN" >&2; ' 'touch .ready; while true; do sleep 60; done\'' ), readiness={"type": "file", "path": ".ready"}, secret_env={"API_TOKEN": "API_TOKEN"}, ) assert started["state"] == "running" service_logs = manager.logs_service(workspace_id, "app", tail_lines=None) assert "[REDACTED]" in str(service_logs["stderr"]) reset = manager.reset_workspace(workspace_id) assert reset["secrets"] == created["secrets"] after_reset = manager.exec_workspace( workspace_id, command='sh -lc \'printf "%s\\n" "$API_TOKEN"\'', timeout_seconds=30, secret_env={"API_TOKEN": "API_TOKEN"}, ) assert after_reset["stdout"] == "[REDACTED]\n" def test_workspace_secret_validation_helpers(tmp_path: Path) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) assert vm_manager_module._normalize_workspace_secret_name("API_TOKEN") == "API_TOKEN" # noqa: SLF001 with pytest.raises(ValueError, match="secret name must match"): vm_manager_module._normalize_workspace_secret_name("bad-name") # noqa: SLF001 with pytest.raises(ValueError, match="must not be empty"): vm_manager_module._validate_workspace_secret_value("TOKEN", "") # noqa: SLF001 with pytest.raises(ValueError, match="duplicate secret name"): manager.create_workspace( environment="debian:12-base", allow_host_compat=True, secrets=[ {"name": "TOKEN", "value": "one"}, {"name": "TOKEN", "value": "two"}, ], ) def test_prepare_workspace_secrets_handles_file_inputs_and_validation_errors( tmp_path: Path, ) -> None: secrets_dir = tmp_path / "secrets" valid_file = tmp_path / "token.txt" valid_file.write_text("from-file\n", encoding="utf-8") invalid_utf8 = tmp_path / "invalid.bin" invalid_utf8.write_bytes(b"\xff\xfe") oversized = tmp_path / "oversized.txt" oversized.write_text( "x" * (vm_manager_module.WORKSPACE_SECRET_MAX_BYTES + 1), encoding="utf-8", ) records, values = vm_manager_module._prepare_workspace_secrets( # noqa: SLF001 [ {"name": "B_TOKEN", "value": "literal"}, {"name": "A_TOKEN", "file_path": str(valid_file)}, ], secrets_dir=secrets_dir, ) assert [record.name for record in records] == ["A_TOKEN", "B_TOKEN"] assert values == {"A_TOKEN": "from-file\n", "B_TOKEN": "literal"} assert (secrets_dir / "A_TOKEN.secret").read_text(encoding="utf-8") == "from-file\n" assert oct(secrets_dir.stat().st_mode & 0o777) == "0o700" assert oct((secrets_dir / "A_TOKEN.secret").stat().st_mode & 0o777) == "0o600" with pytest.raises(ValueError, match="must be a dictionary"): vm_manager_module._prepare_workspace_secrets( # noqa: SLF001 [cast(dict[str, str], "bad")], secrets_dir=tmp_path / "bad1", ) with pytest.raises(ValueError, match="missing 'name'"): vm_manager_module._prepare_workspace_secrets([{}], secrets_dir=tmp_path / "bad2") # noqa: SLF001 with pytest.raises(ValueError, match="exactly one of 'value' or 'file_path'"): vm_manager_module._prepare_workspace_secrets( # noqa: SLF001 [{"name": "TOKEN", "value": "x", "file_path": str(valid_file)}], secrets_dir=tmp_path / "bad3", ) with pytest.raises(ValueError, match="file_path must not be empty"): vm_manager_module._prepare_workspace_secrets( # noqa: SLF001 [{"name": "TOKEN", "file_path": " "}], secrets_dir=tmp_path / "bad4", ) with pytest.raises(ValueError, match="does not exist"): vm_manager_module._prepare_workspace_secrets( # noqa: SLF001 [{"name": "TOKEN", "file_path": str(tmp_path / "missing.txt")}], secrets_dir=tmp_path / "bad5", ) with pytest.raises(ValueError, match="must be valid UTF-8 text"): vm_manager_module._prepare_workspace_secrets( # noqa: SLF001 [{"name": "TOKEN", "file_path": str(invalid_utf8)}], secrets_dir=tmp_path / "bad6", ) with pytest.raises(ValueError, match="must be at most"): vm_manager_module._prepare_workspace_secrets( # noqa: SLF001 [{"name": "TOKEN", "file_path": str(oversized)}], secrets_dir=tmp_path / "bad7", ) def test_workspace_secrets_require_guest_exec_on_firecracker_runtime( tmp_path: Path, monkeypatch: pytest.MonkeyPatch, ) -> None: class StubFirecrackerBackend: def __init__(self, *args: Any, **kwargs: Any) -> None: del args, kwargs def create(self, instance: Any) -> None: del instance def start(self, instance: Any) -> None: del instance def stop(self, instance: Any) -> None: del instance def delete(self, instance: Any) -> None: del instance monkeypatch.setattr(vm_manager_module, "FirecrackerBackend", StubFirecrackerBackend) manager = VmManager( backend_name="firecracker", base_dir=tmp_path / "vms", runtime_paths=resolve_runtime_paths(), network_manager=TapNetworkManager(enabled=False), ) manager._runtime_capabilities = RuntimeCapabilities( # noqa: SLF001 supports_vm_boot=True, supports_guest_exec=False, supports_guest_network=False, reason="guest exec is unavailable", ) with pytest.raises(RuntimeError, match="workspace secrets require guest execution"): manager.create_workspace( environment="debian:12-base", allow_host_compat=True, secrets=[{"name": "TOKEN", "value": "expected"}], ) def test_workspace_stop_and_start_preserve_logs_and_clear_live_state(tmp_path: Path) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) seed_dir = tmp_path / "seed" seed_dir.mkdir() (seed_dir / "note.txt").write_text("hello from seed\n", encoding="utf-8") created = manager.create_workspace( environment="debian:12-base", allow_host_compat=True, seed_path=seed_dir, ) workspace_id = str(created["workspace_id"]) manager.exec_workspace(workspace_id, command="cat note.txt", timeout_seconds=30) shell = manager.open_shell(workspace_id) shell_id = str(shell["shell_id"]) started_service = manager.start_service( workspace_id, "app", command='sh -lc \'touch .ready && trap "exit 0" TERM; while true; do sleep 60; done\'', readiness={"type": "file", "path": ".ready"}, ) assert started_service["state"] == "running" stopped = manager.stop_workspace(workspace_id) assert stopped["state"] == "stopped" assert stopped["command_count"] == 1 assert stopped["service_count"] == 0 assert stopped["running_service_count"] == 0 assert manager.logs_workspace(workspace_id)["count"] == 1 with pytest.raises(RuntimeError, match="must be in 'started' state"): manager.read_shell(workspace_id, shell_id, cursor=0, max_chars=1024) restarted = manager.start_workspace(workspace_id) assert restarted["state"] == "started" assert restarted["command_count"] == 1 assert restarted["service_count"] == 0 rerun = manager.exec_workspace(workspace_id, command="cat note.txt", timeout_seconds=30) assert rerun["stdout"] == "hello from seed\n" def test_workspace_read_shell_rejects_invalid_wait_for_idle_ms(tmp_path: Path) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) created = manager.create_workspace( environment="debian:12-base", allow_host_compat=True, ) workspace_id = str(created["workspace_id"]) opened = manager.open_shell(workspace_id) shell_id = str(opened["shell_id"]) with pytest.raises(ValueError, match="wait_for_idle_ms must be between 1 and 10000"): manager.read_shell(workspace_id, shell_id, cursor=0, max_chars=1024, wait_for_idle_ms=0) def test_workspace_stop_flushes_guest_filesystem_before_stopping( tmp_path: Path, ) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) created = manager.create_workspace( environment="debian:12-base", allow_host_compat=True, ) workspace_id = str(created["workspace_id"]) workspace_path = tmp_path / "vms" / "workspaces" / workspace_id / "workspace.json" payload = json.loads(workspace_path.read_text(encoding="utf-8")) payload["state"] = "started" payload["firecracker_pid"] = os.getpid() payload["metadata"]["execution_mode"] = "guest_vsock" payload["metadata"]["rootfs_image"] = str(_create_stopped_workspace_rootfs(tmp_path)) workspace_path.write_text(json.dumps(payload, indent=2, sort_keys=True), encoding="utf-8") calls: list[tuple[str, str]] = [] class StubBackend: def exec( self, instance: Any, command: str, timeout_seconds: int, *, workdir: Path | None = None, env: dict[str, str] | None = None, ) -> vm_manager_module.VmExecResult: del instance, timeout_seconds, workdir, env calls.append(("exec", command)) return vm_manager_module.VmExecResult( stdout="", stderr="", exit_code=0, duration_ms=1, ) def stop(self, instance: Any) -> None: del instance calls.append(("stop", "instance")) manager._backend = StubBackend() # type: ignore[assignment] # noqa: SLF001 manager._backend_name = "firecracker" # noqa: SLF001 manager._runtime_capabilities = RuntimeCapabilities( # noqa: SLF001 supports_vm_boot=True, supports_guest_exec=True, supports_guest_network=False, reason=None, ) stopped = manager.stop_workspace(workspace_id) assert calls == [("exec", "sync"), ("stop", "instance")] assert stopped["state"] == "stopped" def test_workspace_disk_operations_scrub_runtime_only_paths_and_export( tmp_path: Path, ) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) rootfs_image = _create_stopped_workspace_rootfs(tmp_path) workspace_id = "workspace-disk-123" workspace = vm_manager_module.WorkspaceRecord( workspace_id=workspace_id, environment="debian:12-base", vcpu_count=1, mem_mib=512, ttl_seconds=600, created_at=time.time(), expires_at=time.time() + 600, state="stopped", network_policy="off", allow_host_compat=False, metadata={ "execution_mode": "guest_vsock", "rootfs_image": str(rootfs_image), "workspace_path": "/workspace", }, ) manager._save_workspace_locked(workspace) # noqa: SLF001 listed = manager.list_workspace_disk(workspace_id, path="/workspace", recursive=True) assert listed["path"] == "/workspace" listed_paths = {entry["path"] for entry in listed["entries"]} assert "/workspace/note.txt" in listed_paths assert "/workspace/src/child.txt" in listed_paths assert "/workspace/link" in listed_paths read_payload = manager.read_workspace_disk(workspace_id, path="note.txt", max_bytes=4096) assert read_payload["content"] == "hello from disk\n" assert read_payload["truncated"] is False run_listing = manager.list_workspace_disk(workspace_id, path="/run", recursive=True) run_paths = {entry["path"] for entry in run_listing["entries"]} assert "/run/pyro-secrets" not in run_paths assert "/run/pyro-services" not in run_paths exported_path = tmp_path / "workspace-copy.ext4" exported = manager.export_workspace_disk(workspace_id, output_path=exported_path) assert exported["disk_format"] == "ext4" assert exported_path.exists() assert exported_path.stat().st_size == int(exported["bytes_written"]) def test_workspace_disk_operations_reject_host_compat_workspaces(tmp_path: Path) -> None: manager = VmManager( backend_name="mock", base_dir=tmp_path / "vms", network_manager=TapNetworkManager(enabled=False), ) created = manager.create_workspace( environment="debian:12-base", allow_host_compat=True, ) workspace_id = str(created["workspace_id"]) manager.stop_workspace(workspace_id) with pytest.raises(RuntimeError, match="host_compat workspaces"): manager.export_workspace_disk(workspace_id, output_path=tmp_path / "workspace.ext4") with pytest.raises(RuntimeError, match="host_compat workspaces"): manager.list_workspace_disk(workspace_id) with pytest.raises(RuntimeError, match="host_compat workspaces"): manager.read_workspace_disk(workspace_id, path="note.txt")