pyro-mcp/tests/test_vm_manager.py
Thales Maciel ab02ae46c7 Add model-native workspace file operations
Remove shell-escaped file mutation from the stable workspace flow by adding explicit file and patch tools across the CLI, SDK, and MCP surfaces.

This adds workspace file list/read/write plus unified text patch application, backed by new guest and manager file primitives that stay scoped to started workspaces and /workspace only. Patch application is preflighted on the host, file writes stay text-only and bounded, and the existing diff/export/reset semantics remain intact.

The milestone also updates the 3.2.0 roadmap, public contract, docs, examples, and versioning, and includes focused coverage for the new helper module and dispatch paths.

Validation:
- uv lock
- UV_CACHE_DIR=.uv-cache make check
- UV_CACHE_DIR=.uv-cache make dist-check
- real guest-backed smoke for workspace file read, patch apply, exec, export, and delete
2026-03-12 22:03:25 -03:00

3079 lines
106 KiB
Python

from __future__ import annotations
import io
import json
import os
import signal
import subprocess
import tarfile
import time
from pathlib import Path
from typing import Any, cast
import pytest
import pyro_mcp.vm_manager as vm_manager_module
from pyro_mcp.runtime import RuntimeCapabilities, resolve_runtime_paths
from pyro_mcp.vm_manager import VmManager
from pyro_mcp.vm_network import NetworkConfig, TapNetworkManager
def _run_debugfs_write(rootfs_image: Path, command: str) -> None:
    """Execute a single ``debugfs`` request against *rootfs_image* in write mode.

    Raises:
        RuntimeError: when ``debugfs`` exits non-zero. The message is its
            stripped stderr, else its stripped stdout, else the request text.
    """
    argv = ["debugfs", "-w", "-R", command, str(rootfs_image)]
    completed = subprocess.run(  # noqa: S603
        argv,
        text=True,
        capture_output=True,
        check=False,
    )
    if completed.returncode == 0:
        return
    raise RuntimeError(completed.stderr.strip() or completed.stdout.strip() or command)
def _create_stopped_workspace_rootfs(tmp_path: Path) -> Path:
    """Build a 16 MiB ext4 image under *tmp_path* holding workspace fixture files.

    The image is formatted with ``mkfs.ext4`` and populated through
    ``debugfs``: a few directories, four small text files and one symlink
    (``/workspace/link`` -> ``note.txt``).

    Returns:
        Path of the created image.

    Raises:
        RuntimeError: when ``mkfs.ext4`` or any ``debugfs`` request fails.
    """
    image_path = tmp_path / "workspace-rootfs.ext4"
    with image_path.open("wb") as image_file:
        # Size the (sparse) file before formatting it.
        image_file.truncate(16 * 1024 * 1024)

    mkfs = subprocess.run(  # noqa: S603
        ["mkfs.ext4", "-F", str(image_path)],
        text=True,
        capture_output=True,
        check=False,
    )
    if mkfs.returncode != 0:
        raise RuntimeError(mkfs.stderr.strip() or mkfs.stdout.strip() or "mkfs.ext4 failed")

    guest_directories = (
        "/workspace",
        "/workspace/src",
        "/run",
        "/run/pyro-secrets",
        "/run/pyro-services",
    )
    for guest_directory in guest_directories:
        _run_debugfs_write(image_path, f"mkdir {guest_directory}")

    # (host file name, file content, destination path inside the image)
    fixtures = (
        ("note.txt", "hello from disk\n", "/workspace/note.txt"),
        ("child.txt", "nested child\n", "/workspace/src/child.txt"),
        ("secret.txt", "super-secret\n", "/run/pyro-secrets/TOKEN"),
        ("service.log", "service runtime\n", "/run/pyro-services/app.log"),
    )
    for host_name, content, _ in fixtures:
        (tmp_path / host_name).write_text(content, encoding="utf-8")

    write_requests = [
        f"write {tmp_path / host_name} {guest_path}" for host_name, _, guest_path in fixtures
    ]
    # The symlink is created right after the two /workspace files.
    requests = [
        *write_requests[:2],
        "symlink /workspace/link note.txt",
        *write_requests[2:],
    ]
    for request in requests:
        _run_debugfs_write(image_path, request)
    return image_path
def test_vm_manager_lifecycle_and_auto_cleanup(tmp_path: Path) -> None:
    """Create, start and exec a VM; afterwards ``status_vm`` must report it missing."""
    network = TapNetworkManager(enabled=False)
    manager = VmManager(backend_name="mock", base_dir=tmp_path / "vms", network_manager=network)

    creation = manager.create_vm(
        environment="debian:12",
        vcpu_count=1,
        mem_mib=512,
        ttl_seconds=600,
        allow_host_compat=True,
    )
    vm_id = str(creation["vm_id"])

    start_payload = manager.start_vm(vm_id)
    assert start_payload["state"] == "started"

    exec_payload = manager.exec_vm(
        vm_id,
        command="printf 'git version 2.43.0\\n'",
        timeout_seconds=30,
    )
    assert exec_payload["exit_code"] == 0
    assert exec_payload["execution_mode"] == "host_compat"
    assert "git version" in str(exec_payload["stdout"])

    # After the exec the VM is expected to be gone.
    with pytest.raises(ValueError, match="does not exist"):
        manager.status_vm(vm_id)
def test_vm_manager_exec_timeout(tmp_path: Path) -> None:
    """A command that outlives ``timeout_seconds`` yields exit code 124 and a timeout note."""
    network = TapNetworkManager(enabled=False)
    manager = VmManager(backend_name="mock", base_dir=tmp_path / "vms", network_manager=network)

    creation = manager.create_vm(
        environment="debian:12-base",
        vcpu_count=1,
        mem_mib=512,
        ttl_seconds=600,
        allow_host_compat=True,
    )
    vm_id = str(creation["vm_id"])
    manager.start_vm(vm_id)

    outcome = manager.exec_vm(vm_id, command="sleep 2", timeout_seconds=1)

    assert outcome["exit_code"] == 124
    assert "timed out" in str(outcome["stderr"])
def test_vm_manager_stop_and_delete(tmp_path: Path) -> None:
    """A started VM can be stopped and then deleted."""
    network = TapNetworkManager(enabled=False)
    manager = VmManager(backend_name="mock", base_dir=tmp_path / "vms", network_manager=network)

    creation = manager.create_vm(
        environment="debian:12-base",
        vcpu_count=1,
        mem_mib=512,
        ttl_seconds=600,
        allow_host_compat=True,
    )
    vm_id = str(creation["vm_id"])
    manager.start_vm(vm_id)

    stop_payload = manager.stop_vm(vm_id)
    assert stop_payload["state"] == "stopped"

    delete_payload = manager.delete_vm(vm_id)
    assert delete_payload["deleted"] is True
def test_vm_manager_reaps_expired(tmp_path: Path) -> None:
    """``reap_expired`` removes a created VM whose expiry lies in the past."""
    network = TapNetworkManager(enabled=False)
    manager = VmManager(backend_name="mock", base_dir=tmp_path / "vms", network_manager=network)
    # Lower the minimum on this instance so a 1-second TTL is accepted.
    manager.MIN_TTL_SECONDS = 1

    creation = manager.create_vm(
        environment="debian:12-base",
        vcpu_count=1,
        mem_mib=512,
        ttl_seconds=1,
        allow_host_compat=True,
    )
    vm_id = str(creation["vm_id"])

    # Force expiry without sleeping.
    manager._instances[vm_id].expires_at = 0.0  # noqa: SLF001

    reaped = manager.reap_expired()
    assert reaped["count"] == 1

    with pytest.raises(ValueError):
        manager.status_vm(vm_id)
def test_vm_manager_reaps_started_vm(tmp_path: Path) -> None:
    """``reap_expired`` also collects a VM that has already been started."""
    network = TapNetworkManager(enabled=False)
    manager = VmManager(backend_name="mock", base_dir=tmp_path / "vms", network_manager=network)
    # Lower the minimum on this instance so a 1-second TTL is accepted.
    manager.MIN_TTL_SECONDS = 1

    creation = manager.create_vm(
        environment="debian:12-base",
        vcpu_count=1,
        mem_mib=512,
        ttl_seconds=1,
        allow_host_compat=True,
    )
    vm_id = str(creation["vm_id"])
    manager.start_vm(vm_id)

    # Force expiry without sleeping.
    started_instance = manager._instances[vm_id]  # noqa: SLF001
    started_instance.expires_at = 0.0

    reaped = manager.reap_expired()
    assert reaped["count"] == 1
@pytest.mark.parametrize(
    ("kwargs", "msg"),
    [
        ({"vcpu_count": 0, "mem_mib": 512, "ttl_seconds": 600}, "vcpu_count must be between"),
        ({"vcpu_count": 1, "mem_mib": 64, "ttl_seconds": 600}, "mem_mib must be between"),
        ({"vcpu_count": 1, "mem_mib": 512, "ttl_seconds": 30}, "ttl_seconds must be between"),
    ],
)
def test_vm_manager_validates_limits(tmp_path: Path, kwargs: dict[str, Any], msg: str) -> None:
    """``create_vm`` rejects each out-of-range sizing value with a ``ValueError``."""
    network = TapNetworkManager(enabled=False)
    manager = VmManager(backend_name="mock", base_dir=tmp_path / "vms", network_manager=network)

    with pytest.raises(ValueError, match=msg):
        manager.create_vm(environment="debian:12-base", **kwargs)
def test_vm_manager_max_active_limit(tmp_path: Path) -> None:
    """With ``max_active_vms=1`` a second ``create_vm`` call raises ``RuntimeError``."""
    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        max_active_vms=1,
        network_manager=TapNetworkManager(enabled=False),
    )
    vm_spec: dict[str, Any] = {
        "environment": "debian:12-base",
        "vcpu_count": 1,
        "mem_mib": 512,
        "ttl_seconds": 600,
        "allow_host_compat": True,
    }

    # The first VM fills the only slot.
    manager.create_vm(**vm_spec)

    with pytest.raises(RuntimeError, match="max active VMs reached"):
        manager.create_vm(**vm_spec)
def test_vm_manager_state_validation(tmp_path: Path) -> None:
    """Exec before start, a zero timeout, and a double start are all rejected."""
    network = TapNetworkManager(enabled=False)
    manager = VmManager(backend_name="mock", base_dir=tmp_path / "vms", network_manager=network)

    creation = manager.create_vm(
        environment="debian:12-base",
        vcpu_count=1,
        mem_mib=512,
        ttl_seconds=600,
        allow_host_compat=True,
    )
    vm_id = str(creation["vm_id"])

    # The VM has been created but not started yet.
    with pytest.raises(RuntimeError, match="must be in 'started' state"):
        manager.exec_vm(vm_id, command="echo hi", timeout_seconds=30)

    with pytest.raises(ValueError, match="must be positive"):
        manager.exec_vm(vm_id, command="echo hi", timeout_seconds=0)

    manager.start_vm(vm_id)

    # Starting a second time is an invalid transition.
    with pytest.raises(RuntimeError, match="cannot be started from state"):
        manager.start_vm(vm_id)
def test_vm_manager_status_expired_raises(tmp_path: Path) -> None:
    """``status_vm`` on an expired VM raises a ``RuntimeError`` about automatic deletion."""
    network = TapNetworkManager(enabled=False)
    manager = VmManager(backend_name="mock", base_dir=tmp_path / "vms", network_manager=network)
    # Lower the minimum on this instance so a 1-second TTL is accepted.
    manager.MIN_TTL_SECONDS = 1

    creation = manager.create_vm(
        environment="debian:12-base",
        vcpu_count=1,
        mem_mib=512,
        ttl_seconds=1,
        allow_host_compat=True,
    )
    vm_id = str(creation["vm_id"])

    # Force expiry without sleeping.
    expiring_instance = manager._instances[vm_id]  # noqa: SLF001
    expiring_instance.expires_at = 0.0

    with pytest.raises(RuntimeError, match="expired and was automatically deleted"):
        manager.status_vm(vm_id)
def test_vm_manager_invalid_backend(tmp_path: Path) -> None:
    """Constructing a manager with an unknown backend name raises ``ValueError``."""
    vms_dir = tmp_path / "vms"
    network = TapNetworkManager(enabled=False)

    with pytest.raises(ValueError, match="invalid backend"):
        VmManager(backend_name="nope", base_dir=vms_dir, network_manager=network)
def test_vm_manager_network_info(tmp_path: Path) -> None:
    """With the tap network disabled, status and network info both report no networking."""
    network = TapNetworkManager(enabled=False)
    manager = VmManager(backend_name="mock", base_dir=tmp_path / "vms", network_manager=network)

    creation = manager.create_vm(
        environment="debian:12-base",
        vcpu_count=1,
        mem_mib=512,
        ttl_seconds=600,
        allow_host_compat=True,
    )
    vm_id = str(creation["vm_id"])

    vm_status = manager.status_vm(vm_id)
    network_info = manager.network_info_vm(vm_id)

    assert vm_status["network_enabled"] is False
    assert vm_status["guest_ip"] is None
    assert network_info["network_enabled"] is False
def test_vm_manager_run_vm(tmp_path: Path) -> None:
    """The one-shot ``run_vm`` call returns the command's exit code and stdout."""
    network = TapNetworkManager(enabled=False)
    manager = VmManager(backend_name="mock", base_dir=tmp_path / "vms", network_manager=network)

    run_payload = manager.run_vm(
        environment="debian:12-base",
        command="printf 'ok\\n'",
        vcpu_count=1,
        mem_mib=512,
        timeout_seconds=30,
        ttl_seconds=600,
        network=False,
        allow_host_compat=True,
    )

    exit_code = int(run_payload["exit_code"])
    stdout_text = str(run_payload["stdout"])
    assert exit_code == 0
    assert stdout_text == "ok\n"
def test_workspace_lifecycle_and_logs(tmp_path: Path) -> None:
    """Create a workspace, run two commands, inspect status and logs, then delete it."""
    network = TapNetworkManager(enabled=False)
    manager = VmManager(backend_name="mock", base_dir=tmp_path / "vms", network_manager=network)

    creation = manager.create_workspace(environment="debian:12-base", allow_host_compat=True)
    workspace_id = str(creation["workspace_id"])
    assert creation["state"] == "started"
    assert creation["workspace_path"] == "/workspace"

    write_result = manager.exec_workspace(
        workspace_id,
        command="printf 'hello\\n' > note.txt",
        timeout_seconds=30,
    )
    read_result = manager.exec_workspace(workspace_id, command="cat note.txt", timeout_seconds=30)
    assert write_result["exit_code"] == 0
    assert read_result["stdout"] == "hello\n"

    workspace_status = manager.status_workspace(workspace_id)
    assert workspace_status["command_count"] == 2
    assert workspace_status["last_command"] is not None

    log_payload = manager.logs_workspace(workspace_id)
    assert log_payload["count"] == 2
    log_entries = log_payload["entries"]
    assert isinstance(log_entries, list)
    # The second entry belongs to the ``cat`` command.
    assert log_entries[1]["stdout"] == "hello\n"

    delete_payload = manager.delete_workspace(workspace_id)
    assert delete_payload["deleted"] is True
    with pytest.raises(ValueError, match="does not exist"):
        manager.status_workspace(workspace_id)
def test_workspace_create_seeds_directory_source_into_workspace(tmp_path: Path) -> None:
    """A directory ``seed_path`` is copied into the workspace and recorded in its metadata."""
    seed_root = tmp_path / "seed"
    seed_root.mkdir()
    (seed_root / "note.txt").write_text("hello\n", encoding="utf-8")
    expected_seed_path = str(seed_root.resolve())

    network = TapNetworkManager(enabled=False)
    manager = VmManager(backend_name="mock", base_dir=tmp_path / "vms", network_manager=network)
    creation = manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
        seed_path=seed_root,
    )
    workspace_id = str(creation["workspace_id"])

    seed_info = creation["workspace_seed"]
    assert seed_info["mode"] == "directory"
    assert seed_info["seed_path"] == expected_seed_path

    cat_result = manager.exec_workspace(workspace_id, command="cat note.txt", timeout_seconds=30)
    assert cat_result["stdout"] == "hello\n"

    # The seed description must also be visible through the status call.
    status_seed = manager.status_workspace(workspace_id)["workspace_seed"]
    assert status_seed["mode"] == "directory"
    assert status_seed["seed_path"] == expected_seed_path
def test_workspace_create_seeds_tar_archive_into_workspace(tmp_path: Path) -> None:
    """A gzip tar ``seed_path`` is unpacked into the workspace in ``tar_archive`` mode."""
    staging_dir = tmp_path / "src"
    staging_dir.mkdir()
    staged_note = staging_dir / "note.txt"
    staged_note.write_text("archive\n", encoding="utf-8")

    seed_archive = tmp_path / "seed.tgz"
    with tarfile.open(seed_archive, "w:gz") as tar:
        # Store the file at the archive root, not under ``src/``.
        tar.add(staged_note, arcname="note.txt")

    network = TapNetworkManager(enabled=False)
    manager = VmManager(backend_name="mock", base_dir=tmp_path / "vms", network_manager=network)
    creation = manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
        seed_path=seed_archive,
    )
    workspace_id = str(creation["workspace_id"])
    assert creation["workspace_seed"]["mode"] == "tar_archive"

    cat_result = manager.exec_workspace(workspace_id, command="cat note.txt", timeout_seconds=30)
    assert cat_result["stdout"] == "archive\n"
def test_workspace_sync_push_updates_started_workspace(tmp_path: Path) -> None:
    """Pushing a directory into ``subdir`` makes its files readable inside the workspace."""
    seed_root = tmp_path / "seed"
    seed_root.mkdir()
    (seed_root / "note.txt").write_text("hello\n", encoding="utf-8")

    push_root = tmp_path / "update"
    push_root.mkdir()
    (push_root / "more.txt").write_text("more\n", encoding="utf-8")

    network = TapNetworkManager(enabled=False)
    manager = VmManager(backend_name="mock", base_dir=tmp_path / "vms", network_manager=network)
    creation = manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
        seed_path=seed_root,
    )
    workspace_id = str(creation["workspace_id"])

    sync_payload = manager.push_workspace_sync(workspace_id, source_path=push_root, dest="subdir")
    sync_info = sync_payload["workspace_sync"]
    assert sync_info["mode"] == "directory"
    assert sync_info["destination"] == "/workspace/subdir"

    cat_result = manager.exec_workspace(
        workspace_id,
        command="cat subdir/more.txt",
        timeout_seconds=30,
    )
    assert cat_result["stdout"] == "more\n"

    workspace_status = manager.status_workspace(workspace_id)
    # Only the ``cat`` above counts as a command here.
    assert workspace_status["command_count"] == 1
    assert workspace_status["workspace_seed"]["mode"] == "directory"
def test_workspace_sync_push_requires_started_workspace(tmp_path: Path) -> None:
    """A sync push is refused once the persisted workspace state says ``stopped``."""
    seed_root = tmp_path / "seed"
    seed_root.mkdir()
    (seed_root / "note.txt").write_text("hello\n", encoding="utf-8")

    push_root = tmp_path / "update"
    push_root.mkdir()
    (push_root / "more.txt").write_text("more\n", encoding="utf-8")

    vms_dir = tmp_path / "vms"
    manager = VmManager(
        backend_name="mock",
        base_dir=vms_dir,
        network_manager=TapNetworkManager(enabled=False),
    )
    creation = manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
        seed_path=seed_root,
    )
    workspace_id = str(creation["workspace_id"])

    # Rewrite the on-disk record so the workspace appears stopped.
    record_path = vms_dir / "workspaces" / workspace_id / "workspace.json"
    record = json.loads(record_path.read_text(encoding="utf-8"))
    record["state"] = "stopped"
    serialized = json.dumps(record, indent=2, sort_keys=True)
    record_path.write_text(serialized, encoding="utf-8")

    expected_error = "must be in 'started' state before workspace_sync_push"
    with pytest.raises(RuntimeError, match=expected_error):
        manager.push_workspace_sync(workspace_id, source_path=push_root)
def test_workspace_sync_push_rejects_destination_outside_workspace(tmp_path: Path) -> None:
    """A ``dest`` that climbs out of ``/workspace`` is rejected with ``ValueError``."""
    push_root = tmp_path / "seed"
    push_root.mkdir()
    (push_root / "note.txt").write_text("hello\n", encoding="utf-8")

    network = TapNetworkManager(enabled=False)
    manager = VmManager(backend_name="mock", base_dir=tmp_path / "vms", network_manager=network)
    creation = manager.create_workspace(environment="debian:12-base", allow_host_compat=True)
    workspace_id = str(creation["workspace_id"])

    with pytest.raises(ValueError, match="workspace destination must stay inside /workspace"):
        manager.push_workspace_sync(workspace_id, source_path=push_root, dest="../escape")
def test_workspace_diff_and_export_round_trip(tmp_path: Path) -> None:
    """A synced change shows up in the diff and in an exported file, without logged commands."""
    seed_root = tmp_path / "seed"
    seed_root.mkdir()
    (seed_root / "note.txt").write_text("hello\n", encoding="utf-8")

    push_root = tmp_path / "update"
    push_root.mkdir()
    (push_root / "note.txt").write_text("hello from sync\n", encoding="utf-8")

    network = TapNetworkManager(enabled=False)
    manager = VmManager(backend_name="mock", base_dir=tmp_path / "vms", network_manager=network)
    creation = manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
        seed_path=seed_root,
    )
    workspace_id = str(creation["workspace_id"])
    manager.push_workspace_sync(workspace_id, source_path=push_root)

    diff = manager.diff_workspace(workspace_id)
    assert diff["workspace_id"] == workspace_id
    assert diff["changed"] is True
    diff_summary = diff["summary"]
    assert diff_summary["modified"] == 1
    assert diff_summary["text_patched"] == 1
    patch_text = diff["patch"]
    assert "-hello\n" in patch_text
    assert "+hello from sync\n" in patch_text

    export_target = tmp_path / "exported-note.txt"
    exported = manager.export_workspace(
        workspace_id,
        path="note.txt",
        output_path=export_target,
    )
    assert exported["workspace_id"] == workspace_id
    assert exported["artifact_type"] == "file"
    assert export_target.read_text(encoding="utf-8") == "hello from sync\n"

    # None of sync, diff or export is expected to register as a command.
    workspace_status = manager.status_workspace(workspace_id)
    log_payload = manager.logs_workspace(workspace_id)
    assert workspace_status["command_count"] == 0
    assert log_payload["count"] == 0
def test_workspace_file_ops_and_patch_round_trip(tmp_path: Path) -> None:
    """Exercise list/read/write/patch on workspace files and confirm via exec, diff, export."""
    seed_root = tmp_path / "seed"
    seed_root.mkdir()
    seed_src = seed_root / "src"
    seed_src.mkdir()
    (seed_src / "app.py").write_text('print("bug")\n', encoding="utf-8")

    network = TapNetworkManager(enabled=False)
    manager = VmManager(backend_name="mock", base_dir=tmp_path / "vms", network_manager=network)
    creation = manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
        seed_path=seed_root,
    )
    workspace_id = str(creation["workspace_id"])

    # --- list ---
    expected_entry = {
        "path": "/workspace/src/app.py",
        "artifact_type": "file",
        "size_bytes": 13,
        "link_target": None,
    }
    file_listing = manager.list_workspace_files(workspace_id, path="src", recursive=True)
    assert file_listing["entries"] == [expected_entry]

    # --- read ---
    file_read = manager.read_workspace_file(workspace_id, "src/app.py")
    assert file_read["content"] == 'print("bug")\n'

    # --- write ---
    file_write = manager.write_workspace_file(
        workspace_id,
        "src/generated/out.txt",
        text="generated\n",
    )
    assert file_write["bytes_written"] == 10

    # --- patch: modify one file and add another ---
    patch_text = (
        "--- a/src/app.py\n"
        "+++ b/src/app.py\n"
        "@@ -1 +1 @@\n"
        '-print("bug")\n'
        '+print("fixed")\n'
        "--- /dev/null\n"
        "+++ b/src/new.py\n"
        "@@ -0,0 +1 @@\n"
        '+print("new")\n'
    )
    patch_result = manager.apply_workspace_patch(workspace_id, patch=patch_text)
    assert patch_result["changed"] is True
    assert patch_result["summary"] == {
        "total": 2,
        "added": 1,
        "modified": 1,
        "deleted": 0,
    }

    # --- verify from inside the workspace ---
    exec_result = manager.exec_workspace(
        workspace_id,
        command="python3 src/app.py && cat src/new.py && cat src/generated/out.txt",
        timeout_seconds=30,
    )
    assert exec_result["stdout"] == 'fixed\nprint("new")\ngenerated\n'

    # --- diff ---
    diff = manager.diff_workspace(workspace_id)
    assert diff["changed"] is True
    diff_summary = diff["summary"]
    assert diff_summary["added"] == 2
    assert diff_summary["modified"] == 1

    # --- export ---
    export_target = tmp_path / "exported-app.py"
    exported = manager.export_workspace(
        workspace_id,
        path="src/app.py",
        output_path=export_target,
    )
    assert exported["artifact_type"] == "file"
    assert export_target.read_text(encoding="utf-8") == 'print("fixed")\n'
def test_workspace_export_directory_uses_exact_output_path(tmp_path: Path) -> None:
    """Exporting ``src`` puts its contents directly in ``output_path``, not in a nested ``src``."""
    seed_root = tmp_path / "seed"
    seed_src = seed_root / "src"
    seed_src.mkdir(parents=True)
    (seed_src / "note.txt").write_text("hello\n", encoding="utf-8")

    network = TapNetworkManager(enabled=False)
    manager = VmManager(backend_name="mock", base_dir=tmp_path / "vms", network_manager=network)
    creation = manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
        seed_path=seed_root,
    )
    workspace_id = str(creation["workspace_id"])

    export_root = tmp_path / "exported-src"
    exported = manager.export_workspace(workspace_id, path="src", output_path=export_root)

    assert exported["artifact_type"] == "directory"
    exported_note = export_root / "note.txt"
    assert exported_note.read_text(encoding="utf-8") == "hello\n"
    assert not (export_root / "src").exists()
def test_workspace_diff_requires_create_time_baseline(tmp_path: Path) -> None:
    """``diff_workspace`` fails once the baseline archive has been removed from disk."""
    vms_dir = tmp_path / "vms"
    manager = VmManager(
        backend_name="mock",
        base_dir=vms_dir,
        network_manager=TapNetworkManager(enabled=False),
    )
    creation = manager.create_workspace(environment="debian:12-base", allow_host_compat=True)
    workspace_id = str(creation["workspace_id"])

    # Remove the baseline archive behind the manager's back.
    baseline_archive = vms_dir / "workspaces" / workspace_id / "baseline" / "workspace.tar"
    baseline_archive.unlink()

    with pytest.raises(RuntimeError, match="require[s]? a baseline snapshot"):
        manager.diff_workspace(workspace_id)
def test_workspace_snapshots_and_reset_round_trip(tmp_path: Path) -> None:
    """Snapshot a workspace, then reset to the snapshot and to the baseline, then delete it."""
    seed_root = tmp_path / "seed"
    seed_root.mkdir()
    (seed_root / "note.txt").write_text("seed\n", encoding="utf-8")

    network = TapNetworkManager(enabled=False)
    manager = VmManager(backend_name="mock", base_dir=tmp_path / "vms", network_manager=network)
    creation = manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
        seed_path=seed_root,
    )
    workspace_id = str(creation["workspace_id"])

    def _run(command: str) -> dict[str, Any]:
        """Run *command* in the workspace with the test's standard timeout."""
        return manager.exec_workspace(workspace_id, command=command, timeout_seconds=30)

    def _snapshot_listing() -> tuple[Any, list[Any]]:
        """Return the reported snapshot count and the snapshot names, in listing order."""
        listing = manager.list_snapshots(workspace_id)
        return listing, [item["snapshot_name"] for item in listing["snapshots"]]

    # --- take a named snapshot of a modified workspace ---
    _run("printf 'checkpoint\\n' > note.txt")
    snapshot_payload = manager.create_snapshot(workspace_id, "checkpoint")
    assert snapshot_payload["snapshot"]["snapshot_name"] == "checkpoint"

    listing, names = _snapshot_listing()
    assert listing["count"] == 2
    assert names == [
        "baseline",
        "checkpoint",
    ]

    # --- diverge further and start a service, both to be discarded by reset ---
    _run("printf 'after\\n' > note.txt")
    manager.start_service(
        workspace_id,
        "app",
        command="sh -lc 'touch .ready; while true; do sleep 60; done'",
        readiness={"type": "file", "path": ".ready"},
    )

    # --- reset to the named snapshot ---
    snapshot_reset = manager.reset_workspace(workspace_id, snapshot="checkpoint")
    assert snapshot_reset["workspace_reset"]["snapshot_name"] == "checkpoint"
    assert snapshot_reset["reset_count"] == 1
    assert snapshot_reset["last_command"] is None
    assert snapshot_reset["command_count"] == 0
    assert snapshot_reset["service_count"] == 0
    assert snapshot_reset["running_service_count"] == 0

    after_snapshot_reset = _run("cat note.txt")
    assert after_snapshot_reset["stdout"] == "checkpoint\n"
    snapshot_reset_logs = manager.logs_workspace(workspace_id)
    assert snapshot_reset_logs["count"] == 1

    # --- reset to the baseline (no snapshot argument) ---
    baseline_reset = manager.reset_workspace(workspace_id)
    assert baseline_reset["workspace_reset"]["snapshot_name"] == "baseline"
    assert baseline_reset["reset_count"] == 2
    assert baseline_reset["command_count"] == 0
    assert baseline_reset["service_count"] == 0
    assert manager.logs_workspace(workspace_id)["count"] == 0

    after_baseline_reset = _run("cat note.txt")
    assert after_baseline_reset["stdout"] == "seed\n"
    diff = manager.diff_workspace(workspace_id)
    assert diff["changed"] is False

    # --- delete the named snapshot ---
    delete_payload = manager.delete_snapshot(workspace_id, "checkpoint")
    assert delete_payload["deleted"] is True
    _, remaining_names = _snapshot_listing()
    assert remaining_names == [
        "baseline"
    ]
def test_workspace_snapshot_and_reset_require_baseline(tmp_path: Path) -> None:
    """List, create, delete and reset all fail once the baseline archive is missing."""
    vms_dir = tmp_path / "vms"
    manager = VmManager(
        backend_name="mock",
        base_dir=vms_dir,
        network_manager=TapNetworkManager(enabled=False),
    )
    creation = manager.create_workspace(environment="debian:12-base", allow_host_compat=True)
    workspace_id = str(creation["workspace_id"])

    # Remove the baseline archive behind the manager's back.
    baseline_archive = vms_dir / "workspaces" / workspace_id / "baseline" / "workspace.tar"
    baseline_archive.unlink()

    baseline_dependent_calls = (
        lambda: manager.list_snapshots(workspace_id),
        lambda: manager.create_snapshot(workspace_id, "checkpoint"),
        lambda: manager.delete_snapshot(workspace_id, "checkpoint"),
        lambda: manager.reset_workspace(workspace_id),
    )
    for call in baseline_dependent_calls:
        with pytest.raises(RuntimeError, match="require[s]? a baseline snapshot"):
            call()
def test_workspace_delete_baseline_snapshot_is_rejected(tmp_path: Path) -> None:
    """Deleting the snapshot named ``baseline`` raises ``ValueError``."""
    network = TapNetworkManager(enabled=False)
    manager = VmManager(backend_name="mock", base_dir=tmp_path / "vms", network_manager=network)
    creation = manager.create_workspace(environment="debian:12-base", allow_host_compat=True)
    workspace_id = str(creation["workspace_id"])

    with pytest.raises(ValueError, match="cannot delete the baseline snapshot"):
        manager.delete_snapshot(workspace_id, "baseline")
def test_workspace_reset_recreates_stopped_workspace(tmp_path: Path) -> None:
    """Resetting a workspace persisted as ``stopped`` brings it back in ``started`` state."""
    seed_root = tmp_path / "seed"
    seed_root.mkdir()
    (seed_root / "note.txt").write_text("seed\n", encoding="utf-8")

    network = TapNetworkManager(enabled=False)
    manager = VmManager(backend_name="mock", base_dir=tmp_path / "vms", network_manager=network)
    creation = manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
        seed_path=seed_root,
    )
    workspace_id = str(creation["workspace_id"])

    # Persist a "stopped" record through the manager's own locked helpers.
    with manager._lock:  # noqa: SLF001
        record = manager._load_workspace_locked(workspace_id)  # noqa: SLF001
        record.state = "stopped"
        record.firecracker_pid = None
        manager._save_workspace_locked(record)  # noqa: SLF001

    reset_result = manager.reset_workspace(workspace_id)
    assert reset_result["state"] == "started"
    assert reset_result["workspace_reset"]["snapshot_name"] == "baseline"

    cat_result = manager.exec_workspace(workspace_id, command="cat note.txt", timeout_seconds=30)
    assert cat_result["stdout"] == "seed\n"
def test_workspace_reset_failure_leaves_workspace_stopped(tmp_path: Path) -> None:
    """If the archive import fails during reset, the workspace ends up stopped and uncounted."""
    seed_root = tmp_path / "seed"
    seed_root.mkdir()
    (seed_root / "note.txt").write_text("seed\n", encoding="utf-8")

    network = TapNetworkManager(enabled=False)
    manager = VmManager(backend_name="mock", base_dir=tmp_path / "vms", network_manager=network)
    creation = manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
        seed_path=seed_root,
    )
    workspace_id = str(creation["workspace_id"])
    manager.create_snapshot(workspace_id, "checkpoint")

    def _failing_import_archive(*args: Any, **kwargs: Any) -> dict[str, Any]:
        """Stand-in for the backend import that always fails."""
        del args, kwargs
        raise RuntimeError("boom")

    # Swap the backend method on this manager only, so the reset cannot restore files.
    manager._backend.import_archive = _failing_import_archive  # type: ignore[method-assign]  # noqa: SLF001

    with pytest.raises(RuntimeError, match="boom"):
        manager.reset_workspace(workspace_id, snapshot="checkpoint")

    with manager._lock:  # noqa: SLF001
        record = manager._load_workspace_locked(workspace_id)  # noqa: SLF001
    assert record.state == "stopped"
    assert record.firecracker_pid is None
    assert record.reset_count == 0

    # Both snapshots must still be listed after the failed reset.
    snapshot_names = [
        item["snapshot_name"] for item in manager.list_snapshots(workspace_id)["snapshots"]
    ]
    assert snapshot_names == [
        "baseline",
        "checkpoint",
    ]
def test_workspace_export_helpers_preserve_directory_symlinks(tmp_path: Path) -> None:
    """Archive then extract a directory; files, symlinks and empty directories survive."""
    source_root = tmp_path / "workspace"
    source_root.mkdir()
    (source_root / "note.txt").write_text("hello\n", encoding="utf-8")
    os.symlink("note.txt", source_root / "note-link")
    (source_root / "empty-dir").mkdir()

    export_archive = tmp_path / "workspace-export.tar"
    prepared = vm_manager_module._prepare_workspace_export_archive(  # noqa: SLF001
        workspace_dir=source_root,
        workspace_path=".",
        archive_path=export_archive,
    )
    assert prepared.artifact_type == "directory"

    destination = tmp_path / "output"
    extraction = vm_manager_module._extract_workspace_export_archive(  # noqa: SLF001
        export_archive,
        output_path=destination,
        artifact_type="directory",
    )
    assert extraction["artifact_type"] == "directory"

    assert (destination / "note.txt").read_text(encoding="utf-8") == "hello\n"
    extracted_link = destination / "note-link"
    assert extracted_link.is_symlink()
    # The link target is kept verbatim, i.e. still relative.
    assert os.readlink(extracted_link) == "note.txt"
    assert (destination / "empty-dir").is_dir()
def test_workspace_export_helpers_validate_missing_path_and_existing_output(tmp_path: Path) -> None:
    """The export helpers reject a missing workspace path and an already-present output path."""
    source_root = tmp_path / "workspace"
    source_root.mkdir()
    (source_root / "note.txt").write_text("hello\n", encoding="utf-8")

    # Case 1: the requested path is not in the workspace directory.
    with pytest.raises(RuntimeError, match="workspace path does not exist"):
        vm_manager_module._prepare_workspace_export_archive(  # noqa: SLF001
            workspace_dir=source_root,
            workspace_path="missing.txt",
            archive_path=tmp_path / "missing.tar",
        )

    # Case 2: extraction must not overwrite an existing output file.
    export_archive = tmp_path / "note-export.tar"
    prepared = vm_manager_module._prepare_workspace_export_archive(  # noqa: SLF001
        workspace_dir=source_root,
        workspace_path="note.txt",
        archive_path=export_archive,
    )
    occupied_target = tmp_path / "note.txt"
    occupied_target.write_text("already here\n", encoding="utf-8")

    with pytest.raises(RuntimeError, match="output_path already exists"):
        vm_manager_module._extract_workspace_export_archive(  # noqa: SLF001
            export_archive,
            output_path=occupied_target,
            artifact_type=prepared.artifact_type,
        )
def test_diff_workspace_trees_reports_empty_binary_symlink_and_type_changes(tmp_path: Path) -> None:
    """Diff two trees that differ in text, binary, symlink, type and empty-directory entries."""
    before = tmp_path / "baseline"
    after = tmp_path / "current"
    before.mkdir()
    after.mkdir()

    # Text file modified on both sides.
    (before / "modified.txt").write_text("before\n", encoding="utf-8")
    (after / "modified.txt").write_text("after\n", encoding="utf-8")
    # Text file deleted / added.
    (before / "deleted.txt").write_text("gone\n", encoding="utf-8")
    (after / "added.txt").write_text("new\n", encoding="utf-8")
    # Binary file modified (leading NUL byte).
    (before / "binary.bin").write_bytes(b"\x00before")
    (after / "binary.bin").write_bytes(b"\x00after")
    # Symlink retargeted.
    os.symlink("link-target-old.txt", before / "link")
    os.symlink("link-target-new.txt", after / "link")
    # Directory replaced by a regular file.
    (before / "swap").mkdir()
    (after / "swap").write_text("type changed\n", encoding="utf-8")
    # Empty directory removed / added.
    (before / "removed-empty").mkdir()
    (after / "added-empty").mkdir()

    diff = vm_manager_module._diff_workspace_trees(before, after)  # noqa: SLF001

    assert diff["changed"] is True
    assert diff["summary"] == {
        "total": 8,
        "added": 2,
        "modified": 3,
        "deleted": 2,
        "type_changed": 1,
        "text_patched": 3,
        "non_text": 5,
    }

    patch_text = diff["patch"]
    assert "--- a/modified.txt" in patch_text
    assert "+++ b/modified.txt" in patch_text
    assert "--- /dev/null" in patch_text
    assert "+++ b/added.txt" in patch_text
    assert "--- a/deleted.txt" in patch_text
    assert "+++ /dev/null" in patch_text

    by_path = {item["path"]: item for item in diff["entries"]}
    assert by_path["binary.bin"]["text_patch"] is None
    assert by_path["link"]["artifact_type"] == "symlink"
    assert by_path["swap"]["artifact_type"] == "file"
    assert by_path["removed-empty"]["artifact_type"] == "directory"
    assert by_path["added-empty"]["artifact_type"] == "directory"
def test_diff_workspace_trees_unchanged_returns_empty_summary(tmp_path: Path) -> None:
    """Two identical trees produce an all-zero summary, no entries and an empty patch."""
    before = tmp_path / "baseline"
    after = tmp_path / "current"
    for tree in (before, after):
        tree.mkdir()
    (before / "note.txt").write_text("same\n", encoding="utf-8")
    (after / "note.txt").write_text("same\n", encoding="utf-8")

    diff = vm_manager_module._diff_workspace_trees(before, after)  # noqa: SLF001

    empty_summary = {
        "total": 0,
        "added": 0,
        "modified": 0,
        "deleted": 0,
        "type_changed": 0,
        "text_patched": 0,
        "non_text": 0,
    }
    assert diff == {
        "changed": False,
        "summary": empty_summary,
        "entries": [],
        "patch": "",
    }
def test_workspace_shell_lifecycle_and_rehydration(tmp_path: Path) -> None:
    """Open shells from two managers sharing one base dir, then close and delete.

    A second ``VmManager`` built on the same ``base_dir`` must be able to open
    its own shell in the workspace created by the first one. Shell traffic is
    not expected to appear in the workspace command log, and deleting the
    workspace must invalidate shells opened through either manager.
    """

    def _new_manager() -> VmManager:
        """Build a mock-backed manager rooted at the shared base directory."""
        return VmManager(
            backend_name="mock",
            base_dir=tmp_path / "vms",
            network_manager=TapNetworkManager(enabled=False),
        )

    def _wait_for_output(
        shell_manager: VmManager,
        target_workspace: str,
        target_shell: str,
        needle: str,
        timeout_seconds: float = 5.0,
    ) -> str:
        """Re-read the shell from cursor 0 until *needle* appears or the timeout elapses.

        Returns the last output read (empty string if nothing was read).
        """
        latest = ""
        # Use the monotonic clock: a wall-clock adjustment during the test
        # must neither cut the wait short nor extend it.
        deadline = time.monotonic() + timeout_seconds
        while time.monotonic() < deadline:
            chunk = shell_manager.read_shell(
                target_workspace,
                target_shell,
                cursor=0,
                max_chars=65536,
            )
            latest = str(chunk["output"])
            if needle in latest:
                break
            time.sleep(0.05)
        return latest

    manager = _new_manager()
    created = manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
    )
    workspace_id = str(created["workspace_id"])

    # --- first shell, through the creating manager ---
    opened = manager.open_shell(workspace_id)
    shell_id = str(opened["shell_id"])
    assert opened["state"] == "running"
    manager.write_shell(workspace_id, shell_id, input_text="pwd")
    output = _wait_for_output(manager, workspace_id, shell_id, "/workspace")
    assert "/workspace" in output

    # --- second shell, through a fresh manager on the same base dir ---
    manager_rehydrated = _new_manager()
    second_opened = manager_rehydrated.open_shell(workspace_id)
    second_shell_id = str(second_opened["shell_id"])
    assert second_shell_id != shell_id
    manager_rehydrated.write_shell(workspace_id, second_shell_id, input_text="printf 'ok\\n'")
    second_output = _wait_for_output(manager_rehydrated, workspace_id, second_shell_id, "ok")
    assert "ok" in second_output

    # Shell input is not recorded as workspace commands.
    logs = manager.logs_workspace(workspace_id)
    assert logs["count"] == 0

    closed = manager.close_shell(workspace_id, shell_id)
    assert closed["closed"] is True
    with pytest.raises(ValueError, match="does not exist"):
        manager.read_shell(workspace_id, shell_id)

    deleted = manager.delete_workspace(workspace_id)
    assert deleted["deleted"] is True
    with pytest.raises(ValueError, match="does not exist"):
        manager_rehydrated.read_shell(workspace_id, second_shell_id)
def test_workspace_create_rejects_unsafe_seed_archive(tmp_path: Path) -> None:
    """Reject a seed archive whose member path climbs out via ``..``.

    The failed create must also leave the ``workspaces`` directory empty.
    """
    vms_dir = tmp_path / "vms"
    bad_archive = tmp_path / "bad.tgz"

    escape_bytes = b"bad\n"
    escaping_member = tarfile.TarInfo(name="../escape.txt")
    escaping_member.size = len(escape_bytes)
    with tarfile.open(bad_archive, "w:gz") as tar:
        tar.addfile(escaping_member, io.BytesIO(escape_bytes))

    manager = VmManager(
        backend_name="mock",
        base_dir=vms_dir,
        network_manager=TapNetworkManager(enabled=False),
    )
    with pytest.raises(RuntimeError, match="unsafe archive member path"):
        manager.create_workspace(
            environment="debian:12-base",
            allow_host_compat=True,
            seed_path=bad_archive,
        )

    leftovers = list((vms_dir / "workspaces").iterdir())
    assert leftovers == []
def test_workspace_create_rejects_archive_that_writes_through_symlink(tmp_path: Path) -> None:
    """Reject a seed archive that places a regular file beneath a symlink member."""
    bad_archive = tmp_path / "bad-symlink.tgz"

    link_member = tarfile.TarInfo(name="linked")
    link_member.type = tarfile.SYMTYPE
    link_member.linkname = "outside"

    note_bytes = b"bad\n"
    nested_member = tarfile.TarInfo(name="linked/note.txt")
    nested_member.size = len(note_bytes)

    with tarfile.open(bad_archive, "w:gz") as tar:
        # The symlink entry is written first, followed by the file beneath it.
        tar.addfile(link_member)
        tar.addfile(nested_member, io.BytesIO(note_bytes))

    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    with pytest.raises(RuntimeError, match="traverse through a symlinked path"):
        manager.create_workspace(
            environment="debian:12-base",
            allow_host_compat=True,
            seed_path=bad_archive,
        )
def test_workspace_create_cleans_up_on_seed_failure(
    tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
    """A failing backend ``import_archive`` propagates and leaves ``workspaces`` empty."""

    def _fail_import(*args: Any, **kwargs: Any) -> dict[str, Any]:
        del args, kwargs
        raise RuntimeError("seed import failed")

    vms_dir = tmp_path / "vms"
    seed_dir = tmp_path / "seed"
    seed_dir.mkdir()
    (seed_dir / "note.txt").write_text("hello\n", encoding="utf-8")

    manager = VmManager(
        backend_name="mock",
        base_dir=vms_dir,
        network_manager=TapNetworkManager(enabled=False),
    )
    monkeypatch.setattr(manager._backend, "import_archive", _fail_import)  # noqa: SLF001

    with pytest.raises(RuntimeError, match="seed import failed"):
        manager.create_workspace(
            environment="debian:12-base",
            allow_host_compat=True,
            seed_path=seed_dir,
        )

    leftovers = list((vms_dir / "workspaces").iterdir())
    assert leftovers == []
def test_workspace_rehydrates_across_manager_processes(tmp_path: Path) -> None:
    """A second manager over the same base dir can exec in an existing workspace."""
    base_dir = tmp_path / "vms"

    creator = VmManager(
        backend_name="mock",
        base_dir=base_dir,
        network_manager=TapNetworkManager(enabled=False),
    )
    created = creator.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
    )
    workspace_id = str(created["workspace_id"])

    # The second manager never created the workspace; it reads it from base_dir.
    second_manager = VmManager(
        backend_name="mock",
        base_dir=base_dir,
        network_manager=TapNetworkManager(enabled=False),
    )
    executed = second_manager.exec_workspace(
        workspace_id,
        command="printf 'ok\\n'",
        timeout_seconds=30,
    )
    assert executed["exit_code"] == 0
    assert executed["stdout"] == "ok\n"

    command_log = second_manager.logs_workspace(workspace_id)
    assert command_log["count"] == 1
def test_workspace_requires_started_state(tmp_path: Path) -> None:
    """``exec_workspace`` refuses a workspace whose persisted state is not ``started``."""
    vms_dir = tmp_path / "vms"
    manager = VmManager(
        backend_name="mock",
        base_dir=vms_dir,
        network_manager=TapNetworkManager(enabled=False),
    )
    created = manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
    )
    workspace_id = str(created["workspace_id"])

    # Flip the on-disk record to "stopped" behind the manager's back.
    record_path = vms_dir / "workspaces" / workspace_id / "workspace.json"
    record = json.loads(record_path.read_text(encoding="utf-8"))
    record["state"] = "stopped"
    record_path.write_text(json.dumps(record, indent=2, sort_keys=True), encoding="utf-8")

    with pytest.raises(RuntimeError, match="must be in 'started' state"):
        manager.exec_workspace(workspace_id, command="true", timeout_seconds=30)
def test_vm_manager_firecracker_backend_path(
    tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
    """Construct a ``VmManager`` with ``backend_name="firecracker"`` using a stub backend.

    ``vm_manager_module.FirecrackerBackend`` is replaced with a no-op stub,
    and the test then checks that the manager records ``"firecracker"`` as its
    backend name.
    """

    class StubFirecrackerBackend:
        """No-op stand-in patched over ``vm_manager_module.FirecrackerBackend``."""

        def __init__(
            self,
            environment_store: Any,
            firecracker_bin: Path,
            jailer_bin: Path,
            runtime_capabilities: Any,
            network_manager: TapNetworkManager,
        ) -> None:
            # Only record the constructor arguments; nothing is launched.
            self.environment_store = environment_store
            self.firecracker_bin = firecracker_bin
            self.jailer_bin = jailer_bin
            self.runtime_capabilities = runtime_capabilities
            self.network_manager = network_manager

        # The lifecycle methods below accept their arguments and discard them.
        def create(self, instance: Any) -> None:
            del instance

        def start(self, instance: Any) -> None:
            del instance

        def exec(self, instance: Any, command: str, timeout_seconds: int) -> Any:
            del instance, command, timeout_seconds
            return None

        def stop(self, instance: Any) -> None:
            del instance

        def delete(self, instance: Any) -> None:
            del instance

    monkeypatch.setattr(vm_manager_module, "FirecrackerBackend", StubFirecrackerBackend)
    manager = VmManager(
        backend_name="firecracker",
        base_dir=tmp_path / "vms",
        runtime_paths=resolve_runtime_paths(),
        network_manager=TapNetworkManager(enabled=False),
    )
    assert manager._backend_name == "firecracker"  # noqa: SLF001
def test_firecracker_backend_start_removes_stale_socket_files(
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """``FirecrackerBackend.start`` must remove leftover socket files in the workdir.

    Plain files named ``firecracker.sock`` and ``vsock.sock`` are planted in the
    instance workdir before ``start`` runs; afterwards neither may exist and the
    instance must carry the pid of the (stubbed) launched process.
    """
    # Allocate the backend without running __init__, then fill in the private
    # attributes by hand.
    backend = cast(Any, object.__new__(vm_manager_module.FirecrackerBackend))
    backend._environment_store = object()  # noqa: SLF001
    backend._firecracker_bin = tmp_path / "firecracker"  # noqa: SLF001
    backend._jailer_bin = tmp_path / "jailer"  # noqa: SLF001
    backend._runtime_capabilities = RuntimeCapabilities(  # noqa: SLF001
        supports_vm_boot=True,
        supports_guest_exec=True,
        supports_guest_network=False,
        reason=None,
    )
    backend._network_manager = TapNetworkManager(enabled=False)  # noqa: SLF001
    backend._guest_exec_client = None  # noqa: SLF001
    backend._processes = {}  # noqa: SLF001
    # Placeholder files so the binary, kernel, and rootfs paths exist on disk.
    backend._firecracker_bin.write_text("fc", encoding="utf-8")  # noqa: SLF001
    backend._jailer_bin.write_text("jailer", encoding="utf-8")  # noqa: SLF001
    kernel_image = tmp_path / "vmlinux"
    kernel_image.write_text("kernel", encoding="utf-8")
    rootfs_image = tmp_path / "rootfs.ext4"
    rootfs_image.write_bytes(b"rootfs")
    workdir = tmp_path / "runtime"
    workdir.mkdir()
    # Regular files stand in for the stale sockets that start() has to clear.
    firecracker_socket = workdir / "firecracker.sock"
    vsock_socket = workdir / "vsock.sock"
    firecracker_socket.write_text("stale firecracker socket", encoding="utf-8")
    vsock_socket.write_text("stale vsock socket", encoding="utf-8")

    class DummyPopen:
        """Fake process handle: fixed pid, ``poll()`` always returns ``None``."""

        def __init__(self, *args: Any, **kwargs: Any) -> None:
            del args, kwargs
            self.pid = 4242

        def poll(self) -> None:
            return None

    # Any subprocess.run issued by the module succeeds and prints a Firecracker
    # version banner (presumably consumed by a version probe — not shown here).
    monkeypatch.setattr(
        cast(Any, vm_manager_module).subprocess,
        "run",
        lambda *args, **kwargs: subprocess.CompletedProcess(  # noqa: ARG005
            args=args[0],
            returncode=0,
            stdout="Firecracker v1.0.0\n",
            stderr="",
        ),
    )
    monkeypatch.setattr(cast(Any, vm_manager_module).subprocess, "Popen", DummyPopen)
    instance = vm_manager_module.VmInstance(
        vm_id="abcd1234",
        environment="debian:12",
        vcpu_count=1,
        mem_mib=512,
        ttl_seconds=600,
        created_at=time.time(),
        expires_at=time.time() + 600,
        workdir=workdir,
        metadata={
            "kernel_image": str(kernel_image),
            "rootfs_image": str(rootfs_image),
        },
    )
    backend.start(instance)
    # The pid comes from DummyPopen; both planted socket files must be gone.
    assert instance.firecracker_pid == 4242
    assert not firecracker_socket.exists()
    assert not vsock_socket.exists()
def test_vm_manager_fails_closed_without_host_compat_opt_in(tmp_path: Path) -> None:
    """Starting a VM created without ``allow_host_compat`` raises instead of falling back."""
    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    created = manager.create_vm(
        environment="debian:12-base",
        ttl_seconds=600,
    )
    vm_id = str(created["vm_id"])

    with pytest.raises(RuntimeError, match="guest boot is unavailable"):
        manager.start_vm(vm_id)
def test_vm_manager_uses_canonical_default_cache_dir(
    monkeypatch: pytest.MonkeyPatch, tmp_path: Path
) -> None:
    """The environment store's cache dir follows ``PYRO_ENVIRONMENT_CACHE_DIR``."""
    expected_cache_dir = tmp_path / "cache"
    monkeypatch.setenv("PYRO_ENVIRONMENT_CACHE_DIR", str(expected_cache_dir))

    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )

    environment_store = manager._environment_store  # noqa: SLF001
    assert environment_store.cache_dir == expected_cache_dir
def test_vm_manager_helper_round_trips() -> None:
    """Spot-check the private coercion, network (de)serialisation, and wrap helpers."""
    helpers = vm_manager_module

    # _optional_int: None passes through; bool, int, float and numeric str coerce.
    assert helpers._optional_int(None) is None  # noqa: SLF001
    assert helpers._optional_int(True) == 1  # noqa: SLF001
    assert helpers._optional_int(7) == 7  # noqa: SLF001
    assert helpers._optional_int(7.2) == 7  # noqa: SLF001
    assert helpers._optional_int("9") == 9  # noqa: SLF001
    with pytest.raises(TypeError, match="integer-compatible"):
        helpers._optional_int(object())  # noqa: SLF001

    # _optional_str / _optional_dict / _string_dict
    assert helpers._optional_str(None) is None  # noqa: SLF001
    assert helpers._optional_str(1) == "1"  # noqa: SLF001
    assert helpers._optional_dict(None) is None  # noqa: SLF001
    assert helpers._optional_dict({"x": 1}) == {"x": 1}  # noqa: SLF001
    with pytest.raises(TypeError, match="dictionary payload"):
        helpers._optional_dict("bad")  # noqa: SLF001
    assert helpers._string_dict({"x": 1}) == {"x": "1"}  # noqa: SLF001
    assert helpers._string_dict("bad") == {}  # noqa: SLF001

    # Network config must survive a serialise -> deserialise round trip.
    network = NetworkConfig(
        vm_id="abc123",
        tap_name="tap0",
        guest_ip="172.29.1.2",
        gateway_ip="172.29.1.1",
        subnet_cidr="172.29.1.0/24",
        mac_address="06:00:aa:bb:cc:dd",
        dns_servers=("1.1.1.1", "8.8.8.8"),
    )
    serialized = helpers._serialize_network(network)  # noqa: SLF001
    assert serialized is not None
    restored = helpers._deserialize_network(serialized)  # noqa: SLF001
    assert restored == network
    assert helpers._deserialize_network(None) is None  # noqa: SLF001
    with pytest.raises(TypeError, match="dictionary payload"):
        helpers._deserialize_network("bad")  # noqa: SLF001

    # Without a cwd the command is returned untouched; with one, a cd is added.
    assert helpers._wrap_guest_command("echo hi") == "echo hi"  # noqa: SLF001
    wrapped = helpers._wrap_guest_command("echo hi", cwd="/workspace")  # noqa: SLF001
    assert "cd /workspace" in wrapped

    assert helpers._pid_is_running(None) is False  # noqa: SLF001
def test_copy_rootfs_falls_back_to_copy2(
    tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
    """When ``subprocess.run`` raises ``OSError``, ``_copy_rootfs`` reports ``copy2``."""

    def _cp_unavailable(*args: Any, **kwargs: Any) -> Any:
        del args, kwargs
        raise OSError("no cp")

    source = tmp_path / "rootfs.ext4"
    dest = tmp_path / "dest" / "rootfs.ext4"
    source.write_text("payload", encoding="utf-8")
    monkeypatch.setattr(subprocess, "run", _cp_unavailable)

    clone_mode = vm_manager_module._copy_rootfs(source, dest)  # noqa: SLF001

    assert clone_mode == "copy2"
    # The content still has to arrive at the destination.
    assert dest.read_text(encoding="utf-8") == "payload"
def test_workspace_create_cleans_up_on_start_failure(
    tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
    """A backend ``start`` failure propagates and leaves ``workspaces`` empty."""

    def _start_fails(instance: Any) -> None:
        del instance
        raise RuntimeError("boom")

    vms_dir = tmp_path / "vms"
    manager = VmManager(
        backend_name="mock",
        base_dir=vms_dir,
        network_manager=TapNetworkManager(enabled=False),
    )
    monkeypatch.setattr(manager._backend, "start", _start_fails)  # noqa: SLF001

    with pytest.raises(RuntimeError, match="boom"):
        manager.create_workspace(environment="debian:12-base", allow_host_compat=True)

    remaining = list((vms_dir / "workspaces").iterdir())
    assert remaining == []
def test_exec_instance_wraps_guest_workspace_command(tmp_path: Path) -> None:
    """``_exec_instance`` folds ``guest_cwd`` into the command rather than a host workdir.

    A recording stub backend captures what it is asked to run; the command must
    contain ``cd /workspace`` and the ``workdir`` keyword must stay ``None``.
    """
    seen: dict[str, Any] = {}

    class StubBackend:
        """Backend double that records the command and workdir it receives."""

        def exec(
            self,
            instance: Any,
            command: str,
            timeout_seconds: int,
            *,
            workdir: Path | None = None,
        ) -> vm_manager_module.VmExecResult:
            del instance, timeout_seconds
            seen["command"] = command
            seen["workdir"] = workdir
            return vm_manager_module.VmExecResult(
                stdout="",
                stderr="",
                exit_code=0,
                duration_ms=1,
            )

    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    # Pretend guest exec is available so the guest-side path is taken.
    manager._runtime_capabilities = RuntimeCapabilities(  # noqa: SLF001
        supports_vm_boot=True,
        supports_guest_exec=True,
        supports_guest_network=False,
        reason=None,
    )
    manager._backend = StubBackend()  # type: ignore[assignment]  # noqa: SLF001

    started_instance = vm_manager_module.VmInstance(  # noqa: SLF001
        vm_id="vm-123",
        environment="debian:12-base",
        vcpu_count=1,
        mem_mib=512,
        ttl_seconds=600,
        created_at=time.time(),
        expires_at=time.time() + 600,
        workdir=tmp_path / "runtime",
        state="started",
    )
    result, execution_mode = manager._exec_instance(  # noqa: SLF001
        started_instance,
        command="echo hi",
        timeout_seconds=30,
        guest_cwd="/workspace",
    )

    assert result.exit_code == 0
    assert execution_mode == "unknown"
    assert "cd /workspace" in str(seen["command"])
    assert seen["workdir"] is None
def test_status_workspace_marks_dead_backing_process_stopped(tmp_path: Path) -> None:
    """``status_workspace`` reports ``stopped`` and persists an error for a dead guest pid."""
    vms_dir = tmp_path / "vms"
    manager = VmManager(
        backend_name="mock",
        base_dir=vms_dir,
        network_manager=TapNetworkManager(enabled=False),
    )
    created = manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
    )
    workspace_id = str(created["workspace_id"])

    # Rewrite the record as a guest-backed workspace with the pid set to 999999.
    record_path = vms_dir / "workspaces" / workspace_id / "workspace.json"
    record = json.loads(record_path.read_text(encoding="utf-8"))
    record["metadata"]["execution_mode"] = "guest_vsock"
    record["firecracker_pid"] = 999999
    record_path.write_text(json.dumps(record, indent=2, sort_keys=True), encoding="utf-8")

    status = manager.status_workspace(workspace_id)
    assert status["state"] == "stopped"

    persisted = json.loads(record_path.read_text(encoding="utf-8"))
    last_error = str(persisted.get("last_error", ""))
    assert "backing guest process" in last_error
def test_reap_expired_workspaces_removes_invalid_and_expired_records(tmp_path: Path) -> None:
    """The reaper deletes both an unparsable record and an already-expired workspace."""
    workspaces_dir = tmp_path / "vms" / "workspaces"
    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )

    # A record whose JSON is a list rather than an object.
    invalid_dir = workspaces_dir / "invalid"
    invalid_dir.mkdir(parents=True)
    (invalid_dir / "workspace.json").write_text("[]", encoding="utf-8")

    # A real workspace whose expiry is forced to the epoch.
    created = manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
    )
    workspace_id = str(created["workspace_id"])
    record_path = workspaces_dir / workspace_id / "workspace.json"
    record = json.loads(record_path.read_text(encoding="utf-8"))
    record["expires_at"] = 0.0
    record_path.write_text(json.dumps(record, indent=2, sort_keys=True), encoding="utf-8")

    with manager._lock:  # noqa: SLF001
        manager._reap_expired_workspaces_locked(time.time())  # noqa: SLF001

    assert not invalid_dir.exists()
    assert not (workspaces_dir / workspace_id).exists()
def test_workspace_service_lifecycle_and_status_counts(tmp_path: Path) -> None:
    """Start, inspect, and stop one service, checking counts on service and workspace."""
    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    created = manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
    )
    workspace_id = str(created["workspace_id"])

    # The service prints a line, drops its readiness file, then idles.
    started = manager.start_service(
        workspace_id,
        "app",
        command="sh -lc 'printf \"service ready\\n\"; touch .ready; while true; do sleep 60; done'",
        readiness={"type": "file", "path": ".ready"},
    )
    assert started["state"] == "running"

    service_listing = manager.list_services(workspace_id)
    assert service_listing["count"] == 1
    assert service_listing["running_count"] == 1

    service_status = manager.status_service(workspace_id, "app")
    assert service_status["state"] == "running"
    assert service_status["ready_at"] is not None

    service_logs = manager.logs_service(workspace_id, "app")
    assert "service ready" in str(service_logs["stdout"])

    workspace_status = manager.status_workspace(workspace_id)
    assert workspace_status["service_count"] == 1
    assert workspace_status["running_service_count"] == 1

    stopped = manager.stop_service(workspace_id, "app")
    assert stopped["state"] == "stopped"
    assert stopped["stop_reason"] in {"sigterm", "sigkill"}

    deleted = manager.delete_workspace(workspace_id)
    assert deleted["deleted"] is True
def test_workspace_create_serializes_network_policy(tmp_path: Path) -> None:
    """The requested ``network_policy`` shows up in the response and in workspace.json."""
    vms_dir = tmp_path / "vms"
    manager = VmManager(
        backend_name="mock",
        base_dir=vms_dir,
        network_manager=TapNetworkManager(enabled=False),
    )
    # Advertise guest networking and bypass the guest bootstrap check.
    manager._runtime_capabilities = RuntimeCapabilities(  # noqa: SLF001
        supports_vm_boot=True,
        supports_guest_exec=True,
        supports_guest_network=True,
    )
    manager._ensure_workspace_guest_bootstrap_support = lambda instance: None  # type: ignore[method-assign]  # noqa: SLF001

    created = manager.create_workspace(
        environment="debian:12-base",
        network_policy="egress",
    )
    assert created["network_policy"] == "egress"

    workspace_id = str(created["workspace_id"])
    record_path = vms_dir / "workspaces" / workspace_id / "workspace.json"
    record = json.loads(record_path.read_text(encoding="utf-8"))
    assert record["network_policy"] == "egress"
def test_workspace_service_start_serializes_published_ports(
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """``start_service`` returns published ports without exposing ``proxy_pid``.

    Proxy start-up and liveness refresh are stubbed; the workspace record is
    edited on disk to carry a network block before the service is started.
    """

    def _fake_start_published_ports(
        **kwargs: object,
    ) -> list[vm_manager_module.WorkspacePublishedPortRecord]:
        del kwargs
        return [
            vm_manager_module.WorkspacePublishedPortRecord(
                guest_port=8080,
                host_port=18080,
                host="127.0.0.1",
                protocol="tcp",
                proxy_pid=9999,
            )
        ]

    def _skip_liveness_refresh(workspace: Any) -> None:
        del workspace

    vms_dir = tmp_path / "vms"
    manager = VmManager(
        backend_name="mock",
        base_dir=vms_dir,
        network_manager=TapNetworkManager(enabled=False),
    )
    manager._runtime_capabilities = RuntimeCapabilities(  # noqa: SLF001
        supports_vm_boot=True,
        supports_guest_exec=True,
        supports_guest_network=True,
    )
    manager._ensure_workspace_guest_bootstrap_support = lambda instance: None  # type: ignore[method-assign]  # noqa: SLF001

    created = manager.create_workspace(
        environment="debian:12-base",
        network_policy="egress+published-ports",
        allow_host_compat=True,
    )
    workspace_id = str(created["workspace_id"])

    # Inject a network block into the persisted record.
    record_path = vms_dir / "workspaces" / workspace_id / "workspace.json"
    record = json.loads(record_path.read_text(encoding="utf-8"))
    record["network"] = {
        "vm_id": workspace_id,
        "tap_name": "tap-test0",
        "guest_ip": "172.29.1.2",
        "gateway_ip": "172.29.1.1",
        "subnet_cidr": "172.29.1.0/30",
        "mac_address": "06:00:ac:1d:01:02",
        "dns_servers": ["1.1.1.1"],
    }
    record_path.write_text(json.dumps(record, indent=2, sort_keys=True), encoding="utf-8")

    monkeypatch.setattr(
        manager,
        "_start_workspace_service_published_ports",
        _fake_start_published_ports,
    )
    monkeypatch.setattr(manager, "_refresh_workspace_liveness_locked", _skip_liveness_refresh)

    started = manager.start_service(
        workspace_id,
        "web",
        command="sh -lc 'touch .ready && while true; do sleep 60; done'",
        readiness={"type": "file", "path": ".ready"},
        published_ports=[{"guest_port": 8080, "host_port": 18080}],
    )

    expected_ports = [
        {
            "host": "127.0.0.1",
            "host_port": 18080,
            "guest_port": 8080,
            "protocol": "tcp",
        }
    ]
    assert started["published_ports"] == expected_ports
def test_workspace_service_start_rejects_published_ports_without_network_policy(
    tmp_path: Path,
) -> None:
    """Publishing ports on a workspace created without a network policy is an error."""
    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    created = manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
    )
    workspace_id = str(created["workspace_id"])

    expected_message = "published ports require workspace network_policy 'egress\\+published-ports'"
    with pytest.raises(RuntimeError, match=expected_message):
        manager.start_service(
            workspace_id,
            "web",
            command="sh -lc 'touch .ready && while true; do sleep 60; done'",
            readiness={"type": "file", "path": ".ready"},
            published_ports=[{"guest_port": 8080}],
        )
def test_workspace_service_start_rejects_published_ports_without_active_network(
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """With the right policy but no network block on the workspace, publishing fails."""

    def _skip_liveness_refresh(workspace: Any) -> None:
        del workspace

    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    manager._runtime_capabilities = RuntimeCapabilities(  # noqa: SLF001
        supports_vm_boot=True,
        supports_guest_exec=True,
        supports_guest_network=True,
    )
    manager._ensure_workspace_guest_bootstrap_support = lambda instance: None  # type: ignore[method-assign]  # noqa: SLF001
    monkeypatch.setattr(manager, "_refresh_workspace_liveness_locked", _skip_liveness_refresh)

    created = manager.create_workspace(
        environment="debian:12-base",
        network_policy="egress+published-ports",
        allow_host_compat=True,
    )
    workspace_id = str(created["workspace_id"])

    with pytest.raises(RuntimeError, match="published ports require an active guest network"):
        manager.start_service(
            workspace_id,
            "web",
            command="sh -lc 'touch .ready && while true; do sleep 60; done'",
            readiness={"type": "file", "path": ".ready"},
            published_ports=[{"guest_port": 8080}],
        )
def test_workspace_service_start_published_port_failure_marks_service_failed(
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """If proxy start-up raises, the service is returned as ``failed`` with no ports."""

    def _skip_liveness_refresh(workspace: Any) -> None:
        del workspace

    def _raise_proxy_failure(
        **kwargs: object,
    ) -> list[vm_manager_module.WorkspacePublishedPortRecord]:
        del kwargs
        raise RuntimeError("proxy boom")

    vms_dir = tmp_path / "vms"
    manager = VmManager(
        backend_name="mock",
        base_dir=vms_dir,
        network_manager=TapNetworkManager(enabled=False),
    )
    manager._runtime_capabilities = RuntimeCapabilities(  # noqa: SLF001
        supports_vm_boot=True,
        supports_guest_exec=True,
        supports_guest_network=True,
    )
    manager._ensure_workspace_guest_bootstrap_support = lambda instance: None  # type: ignore[method-assign]  # noqa: SLF001
    monkeypatch.setattr(manager, "_refresh_workspace_liveness_locked", _skip_liveness_refresh)

    created = manager.create_workspace(
        environment="debian:12-base",
        network_policy="egress+published-ports",
        allow_host_compat=True,
    )
    workspace_id = str(created["workspace_id"])

    # Give the persisted workspace a network block so the port path is reached.
    record_path = vms_dir / "workspaces" / workspace_id / "workspace.json"
    record = json.loads(record_path.read_text(encoding="utf-8"))
    record["network"] = {
        "vm_id": workspace_id,
        "tap_name": "tap-test0",
        "guest_ip": "172.29.1.2",
        "gateway_ip": "172.29.1.1",
        "subnet_cidr": "172.29.1.0/30",
        "mac_address": "06:00:ac:1d:01:02",
        "dns_servers": ["1.1.1.1"],
    }
    record_path.write_text(json.dumps(record, indent=2, sort_keys=True), encoding="utf-8")

    monkeypatch.setattr(manager, "_start_workspace_service_published_ports", _raise_proxy_failure)

    started = manager.start_service(
        workspace_id,
        "web",
        command="sh -lc 'touch .ready && while true; do sleep 60; done'",
        readiness={"type": "file", "path": ".ready"},
        published_ports=[{"guest_port": 8080, "host_port": 18080}],
    )

    assert started["state"] == "failed"
    assert started["stop_reason"] == "published_port_failed"
    assert started["published_ports"] == []
def test_workspace_service_cleanup_stops_published_port_proxies(
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Deleting service artifacts stops each port proxy and removes the record file."""
    stopped: list[int | None] = []

    def _record_stop(published_port: Any) -> None:
        stopped.append(published_port.proxy_pid)

    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    workspace_id = "workspace-cleanup"
    proxy_record = vm_manager_module.WorkspacePublishedPortRecord(
        guest_port=8080,
        host_port=18080,
        proxy_pid=9999,
    )
    service = vm_manager_module.WorkspaceServiceRecord(
        workspace_id=workspace_id,
        service_name="web",
        command="sleep 60",
        cwd="/workspace",
        state="running",
        pid=1234,
        started_at=time.time(),
        ended_at=None,
        exit_code=None,
        execution_mode="host_compat",
        readiness=None,
        ready_at=None,
        stop_reason=None,
        published_ports=[proxy_record],
    )
    manager._save_workspace_service_locked(service)  # noqa: SLF001
    monkeypatch.setattr(vm_manager_module, "_stop_workspace_published_port_proxy", _record_stop)

    manager._delete_workspace_service_artifacts_locked(workspace_id, "web")  # noqa: SLF001

    assert stopped == [9999]
    record_path = manager._workspace_service_record_path(workspace_id, "web")  # noqa: SLF001
    assert not record_path.exists()
def test_workspace_refresh_workspace_service_counts_stops_published_ports_when_stopped(
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Refreshing counts on a stopped workspace stops a still-``running`` service.

    The service's port proxy must be stopped and the saved service record must
    flip to ``stopped`` with ``stop_reason == "workspace_stopped"``.
    """
    stopped: list[int | None] = []

    def _record_stop(published_port: Any) -> None:
        stopped.append(published_port.proxy_pid)

    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )

    empty_seed = {
        "mode": "empty",
        "seed_path": None,
        "destination": "/workspace",
        "entry_count": 0,
        "bytes_written": 0,
    }
    # An in-memory workspace record already in the "stopped" state.
    workspace = vm_manager_module.WorkspaceRecord(
        workspace_id="workspace-counts",
        environment="debian:12-base",
        vcpu_count=1,
        mem_mib=1024,
        ttl_seconds=600,
        created_at=time.time(),
        expires_at=time.time() + 600,
        state="stopped",
        firecracker_pid=None,
        last_error=None,
        allow_host_compat=True,
        network_policy="off",
        metadata={},
        command_count=0,
        last_command=None,
        workspace_seed=empty_seed,
        secrets=[],
        reset_count=0,
        last_reset_at=None,
    )

    # A persisted service that still claims to be running with one proxy.
    proxy_record = vm_manager_module.WorkspacePublishedPortRecord(
        guest_port=8080,
        host_port=18080,
        proxy_pid=9999,
    )
    service = vm_manager_module.WorkspaceServiceRecord(
        workspace_id=workspace.workspace_id,
        service_name="web",
        command="sleep 60",
        cwd="/workspace",
        state="running",
        pid=1234,
        started_at=time.time(),
        ended_at=None,
        exit_code=None,
        execution_mode="host_compat",
        readiness=None,
        ready_at=None,
        stop_reason=None,
        published_ports=[proxy_record],
    )
    manager._save_workspace_service_locked(service)  # noqa: SLF001
    monkeypatch.setattr(vm_manager_module, "_stop_workspace_published_port_proxy", _record_stop)

    manager._refresh_workspace_service_counts_locked(workspace)  # noqa: SLF001

    assert stopped == [9999]
    refreshed = manager._load_workspace_service_locked(workspace.workspace_id, "web")  # noqa: SLF001
    assert refreshed.state == "stopped"
    assert refreshed.stop_reason == "workspace_stopped"
def test_workspace_published_port_proxy_helpers(
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Start a port proxy against a fake ``Popen`` that writes the ready file itself."""
    services_dir = tmp_path / "services"
    services_dir.mkdir(parents=True, exist_ok=True)

    class StubProcess:
        """Process double with a fixed pid and a configurable ``poll()`` result."""

        def __init__(self, pid: int, *, exited: bool = False) -> None:
            self.pid = pid
            self._exited = exited

        def poll(self) -> int | None:
            return 1 if self._exited else None

    def _fake_popen(command: list[str], **kwargs: object) -> StubProcess:
        del kwargs
        # The ready-file path is the argument following "--ready-file".
        flag_position = command.index("--ready-file")
        ready_file = Path(command[flag_position + 1])
        ready_payload = {
            "host": "127.0.0.1",
            "host_port": 18080,
            "target_host": "172.29.1.2",
            "target_port": 8080,
            "protocol": "tcp",
        }
        ready_file.write_text(json.dumps(ready_payload), encoding="utf-8")
        return StubProcess(4242)

    monkeypatch.setattr(subprocess, "Popen", _fake_popen)

    port_spec = vm_manager_module.WorkspacePublishedPortSpec(
        guest_port=8080,
        host_port=18080,
    )
    record = vm_manager_module._start_workspace_published_port_proxy(  # noqa: SLF001
        services_dir=services_dir,
        service_name="web",
        workspace_id="workspace-proxy",
        guest_ip="172.29.1.2",
        spec=port_spec,
    )

    assert record.guest_port == 8080
    assert record.host_port == 18080
    assert record.proxy_pid == 4242
def test_workspace_published_port_proxy_timeout_and_stop(
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """A proxy that never writes its ready file times out and is then stopped.

    ``Popen`` is replaced by a process that stays alive and never signals
    readiness; the clock is faked so the wait expires without real sleeping.
    The helper must raise the readiness-timeout error and hand the proxy to
    ``_stop_workspace_published_port_proxy``.
    """
    services_dir = tmp_path / "services"
    services_dir.mkdir(parents=True, exist_ok=True)

    class StubProcess:
        """Process double that is always still running."""

        pid = 4242

        def poll(self) -> int | None:
            return None

    monkeypatch.setattr(subprocess, "Popen", lambda *args, **kwargs: StubProcess())

    # time.monotonic is patched process-wide. The first two reads return 0.0 and
    # every later read returns 5.1, so an additional clock read (by the code
    # under test or anything else running meanwhile) yields a timed-out clock
    # instead of raising StopIteration from an exhausted iterator.
    early_clock_reads = iter([0.0, 0.0])
    monkeypatch.setattr(time, "monotonic", lambda: next(early_clock_reads, 5.1))
    monkeypatch.setattr(time, "sleep", lambda _: None)

    stopped: list[int | None] = []
    monkeypatch.setattr(
        vm_manager_module,
        "_stop_workspace_published_port_proxy",
        lambda published_port: stopped.append(published_port.proxy_pid),
    )

    with pytest.raises(RuntimeError, match="timed out waiting for published port proxy readiness"):
        vm_manager_module._start_workspace_published_port_proxy(  # noqa: SLF001
            services_dir=services_dir,
            service_name="web",
            workspace_id="workspace-proxy",
            guest_ip="172.29.1.2",
            spec=vm_manager_module.WorkspacePublishedPortSpec(
                guest_port=8080,
                host_port=18080,
            ),
        )

    assert stopped == [4242]
def test_workspace_published_port_validation_and_stop_helper(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Check port normalisation errors and that stopping a proxy sends one SIGTERM."""
    normalize = vm_manager_module._normalize_workspace_published_port  # noqa: SLF001

    # String ports are coerced to integers.
    spec = normalize(guest_port="8080", host_port="18080")
    assert spec.guest_port == 8080
    assert spec.host_port == 18080

    with pytest.raises(ValueError, match="published guest_port must be an integer"):
        normalize(guest_port=object())
    with pytest.raises(ValueError, match="published host_port must be between 1025 and 65535"):
        normalize(guest_port=8080, host_port=80)

    signals: list[int] = []
    liveness = iter([True, False])

    def _record_killpg(pid: int, sig: int) -> None:
        del pid
        signals.append(sig)

    def _fake_pid_is_running(pid: Any) -> bool:
        del pid
        return next(liveness)

    monkeypatch.setattr(os, "killpg", _record_killpg)
    monkeypatch.setattr(vm_manager_module, "_pid_is_running", _fake_pid_is_running)
    monkeypatch.setattr(time, "sleep", lambda _: None)

    proxy_record = vm_manager_module.WorkspacePublishedPortRecord(
        guest_port=8080,
        host_port=18080,
        proxy_pid=9999,
    )
    vm_manager_module._stop_workspace_published_port_proxy(proxy_record)  # noqa: SLF001

    assert signals == [signal.SIGTERM]
def test_workspace_network_policy_requires_guest_network_support(tmp_path: Path) -> None:
    """An ``egress`` policy is refused when the runtime reports no guest networking."""
    manager = VmManager(
        backend_name="firecracker",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    no_guest_support = RuntimeCapabilities(
        supports_vm_boot=False,
        supports_guest_exec=False,
        supports_guest_network=False,
        reason="no guest network",
    )
    manager._runtime_capabilities = no_guest_support  # noqa: SLF001

    with pytest.raises(RuntimeError, match="workspace network_policy requires guest networking"):
        manager._require_workspace_network_policy_support(  # noqa: SLF001
            network_policy="egress"
        )
def test_prepare_workspace_seed_rejects_missing_and_invalid_paths(tmp_path: Path) -> None:
    """Seed preparation rejects a missing path and a plain non-archive file."""
    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )

    missing_source = tmp_path / "missing"
    with pytest.raises(ValueError, match="does not exist"):
        manager._prepare_workspace_seed(missing_source)  # noqa: SLF001

    # A regular .txt file is neither a directory nor a supported archive.
    text_source = tmp_path / "seed.txt"
    text_source.write_text("seed", encoding="utf-8")
    expected_message = "seed_path must be a directory or a .tar/.tar.gz/.tgz archive"
    with pytest.raises(ValueError, match=expected_message):
        manager._prepare_workspace_seed(text_source)  # noqa: SLF001
def test_workspace_baseline_snapshot_requires_archive(tmp_path: Path) -> None:
    """Loading the baseline snapshot fails once ``baseline/workspace.tar`` is removed."""
    vms_dir = tmp_path / "vms"
    manager = VmManager(
        backend_name="mock",
        base_dir=vms_dir,
        network_manager=TapNetworkManager(enabled=False),
    )
    created = manager.create_workspace(
        environment="debian:12",
        vcpu_count=1,
        mem_mib=512,
        ttl_seconds=600,
        allow_host_compat=True,
    )
    workspace_id = str(created["workspace_id"])

    # Delete the baseline archive that workspace creation left on disk.
    baseline_archive = vms_dir / "workspaces" / workspace_id / "baseline" / "workspace.tar"
    baseline_archive.unlink()

    workspace = manager._load_workspace_locked(workspace_id)  # noqa: SLF001
    with pytest.raises(RuntimeError, match="baseline snapshot"):
        manager._workspace_baseline_snapshot_locked(workspace)  # noqa: SLF001
def test_workspace_snapshot_and_service_loaders_handle_invalid_payloads(tmp_path: Path) -> None:
    """Record loaders raise on list-shaped JSON; the optional loader returns ``None`` if absent."""
    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    workspace_id = "workspace-invalid"
    workspace_dir = tmp_path / "vms" / "workspaces" / workspace_id

    # Plant one malformed record (a JSON list) in each directory.
    services_dir = workspace_dir / "services"
    snapshots_dir = workspace_dir / "snapshots"
    services_dir.mkdir(parents=True, exist_ok=True)
    snapshots_dir.mkdir(parents=True, exist_ok=True)
    (services_dir / "svc.json").write_text("[]", encoding="utf-8")
    (snapshots_dir / "snap.json").write_text("[]", encoding="utf-8")

    with pytest.raises(RuntimeError, match="service record"):
        manager._load_workspace_service_locked(workspace_id, "svc")  # noqa: SLF001
    with pytest.raises(RuntimeError, match="snapshot record"):
        manager._load_workspace_snapshot_locked(workspace_id, "snap")  # noqa: SLF001
    with pytest.raises(RuntimeError, match="snapshot record"):
        manager._load_workspace_snapshot_locked_optional(workspace_id, "snap")  # noqa: SLF001

    absent = manager._load_workspace_snapshot_locked_optional(workspace_id, "missing")  # noqa: SLF001
    assert absent is None
def test_workspace_shell_helpers_handle_missing_invalid_and_close_errors(
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Shell listing tolerates a missing dir and bad records; close errors are swallowed.

    After ``_close_workspace_shells_locked`` runs against a backend whose
    ``close_shell`` raises, the saved shell record must still be gone.
    """

    def _raise_close(**kwargs: object) -> dict[str, object]:
        del kwargs
        raise RuntimeError("shell close boom")

    vms_dir = tmp_path / "vms"
    manager = VmManager(
        backend_name="mock",
        base_dir=vms_dir,
        network_manager=TapNetworkManager(enabled=False),
    )
    created = manager.create_workspace(
        environment="debian:12",
        vcpu_count=1,
        mem_mib=512,
        ttl_seconds=600,
        allow_host_compat=True,
    )
    workspace_id = str(created["workspace_id"])
    workspace_dir = vms_dir / "workspaces" / workspace_id

    # Nothing has been opened yet.
    assert manager._list_workspace_shells_locked(workspace_id) == []  # noqa: SLF001

    # A malformed record (JSON list) is skipped rather than raising.
    shells_dir = workspace_dir / "shells"
    shells_dir.mkdir(parents=True, exist_ok=True)
    (shells_dir / "invalid.json").write_text("[]", encoding="utf-8")
    assert manager._list_workspace_shells_locked(workspace_id) == []  # noqa: SLF001

    shell_record = vm_manager_module.WorkspaceShellRecord(
        workspace_id=workspace_id,
        shell_id="shell-1",
        cwd="/workspace",
        cols=120,
        rows=30,
        state="running",
        started_at=time.time(),
    )
    manager._save_workspace_shell_locked(shell_record)  # noqa: SLF001

    workspace = manager._load_workspace_locked(workspace_id)  # noqa: SLF001
    instance = workspace.to_instance(workdir=workspace_dir / "runtime")

    monkeypatch.setattr(manager._backend, "close_shell", _raise_close)
    manager._close_workspace_shells_locked(workspace, instance)  # noqa: SLF001

    assert manager._list_workspace_shells_locked(workspace_id) == []  # noqa: SLF001
def test_workspace_refresh_service_helpers_cover_exit_and_started_refresh(
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Cover the exited-service refresh and the started-workspace service count refresh."""
    base_dir = tmp_path / "vms"
    manager = VmManager(
        backend_name="mock",
        base_dir=base_dir,
        network_manager=TapNetworkManager(enabled=False),
    )
    workspace_id = str(
        manager.create_workspace(
            environment="debian:12",
            vcpu_count=1,
            mem_mib=512,
            ttl_seconds=600,
            allow_host_compat=True,
        )["workspace_id"]
    )
    workspace_root = base_dir / "workspaces" / workspace_id

    # Edit the persisted record so the workspace loads as started with a network attached.
    record_path = workspace_root / "workspace.json"
    record = json.loads(record_path.read_text(encoding="utf-8"))
    record.update(
        {
            "state": "started",
            "network": {
                "vm_id": workspace_id,
                "tap_name": "tap-test0",
                "guest_ip": "172.29.1.2",
                "gateway_ip": "172.29.1.1",
                "subnet_cidr": "172.29.1.0/30",
                "mac_address": "06:00:ac:1d:01:02",
                "dns_servers": ["1.1.1.1"],
            },
        }
    )
    record_path.write_text(json.dumps(record, indent=2, sort_keys=True), encoding="utf-8")
    workspace = manager._load_workspace_locked(workspace_id)  # noqa: SLF001
    instance = workspace.to_instance(workdir=workspace_root / "runtime")

    live_port = vm_manager_module.WorkspacePublishedPortRecord(
        guest_port=8080,
        host_port=18080,
        proxy_pid=9999,
    )
    service = vm_manager_module.WorkspaceServiceRecord(
        workspace_id=workspace_id,
        service_name="web",
        command="sleep 60",
        cwd="/workspace",
        state="running",
        started_at=time.time(),
        execution_mode="guest_vsock",
        published_ports=[live_port],
    )
    manager._save_workspace_service_locked(service)  # noqa: SLF001

    stopped_pids: list[int | None] = []

    def _record_stopped_proxy(
        published_port: vm_manager_module.WorkspacePublishedPortRecord,
    ) -> None:
        stopped_pids.append(published_port.proxy_pid)

    def _status_service(*args: object, **kwargs: object) -> dict[str, object]:
        # The backend reports that the service has already exited cleanly.
        del args, kwargs
        return {
            "service_name": "web",
            "command": "sleep 60",
            "cwd": "/workspace",
            "state": "exited",
            "started_at": service.started_at,
            "ended_at": service.started_at + 1,
            "exit_code": 0,
            "execution_mode": "guest_vsock",
        }

    monkeypatch.setattr(
        vm_manager_module,
        "_stop_workspace_published_port_proxy",
        _record_stopped_proxy,
    )
    monkeypatch.setattr(manager._backend, "status_service", _status_service)

    refreshed = manager._refresh_workspace_service_locked(  # noqa: SLF001
        workspace,
        instance,
        service,
    )
    assert refreshed.state == "exited"
    # The published port is kept but no longer points at a proxy process.
    assert refreshed.published_ports == [
        vm_manager_module.WorkspacePublishedPortRecord(
            guest_port=8080,
            host_port=18080,
            proxy_pid=None,
        )
    ]
    assert stopped_pids == [9999]

    # Second half: the count refresh delegates to the bulk service refresh helper.
    manager._save_workspace_service_locked(service)  # noqa: SLF001
    refreshed_workspace_ids: list[str] = []

    def _refresh_services(
        workspace: vm_manager_module.WorkspaceRecord,
        instance: vm_manager_module.VmInstance,
    ) -> list[vm_manager_module.WorkspaceServiceRecord]:
        del instance
        refreshed_workspace_ids.append(workspace.workspace_id)
        return []

    monkeypatch.setattr(manager, "_require_workspace_service_support", lambda instance: None)
    monkeypatch.setattr(manager, "_refresh_workspace_services_locked", _refresh_services)
    manager._refresh_workspace_service_counts_locked(workspace)  # noqa: SLF001
    assert refreshed_workspace_ids == [workspace_id]
def test_workspace_start_published_ports_cleans_up_partial_failure(
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """When a later proxy fails to start, the proxy that did start is stopped again."""
    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    workspace_id = str(
        manager.create_workspace(
            environment="debian:12",
            vcpu_count=1,
            mem_mib=512,
            ttl_seconds=600,
            allow_host_compat=True,
        )["workspace_id"]
    )
    workspace = manager._load_workspace_locked(workspace_id)  # noqa: SLF001
    service = vm_manager_module.WorkspaceServiceRecord(
        workspace_id=workspace.workspace_id,
        service_name="web",
        command="sleep 60",
        cwd="/workspace",
        state="running",
        started_at=time.time(),
        execution_mode="guest_vsock",
    )
    started_record = vm_manager_module.WorkspacePublishedPortRecord(
        guest_port=8080,
        host_port=18080,
        proxy_pid=9999,
    )
    stopped_pids: list[int] = []

    def _start_proxy(**kwargs: object) -> vm_manager_module.WorkspacePublishedPortRecord:
        # Only the first requested port (8080) starts; any other port blows up.
        spec = cast(vm_manager_module.WorkspacePublishedPortSpec, kwargs["spec"])
        if spec.guest_port != 8080:
            raise RuntimeError("proxy boom")
        return started_record

    def _stop_proxy(published_port: vm_manager_module.WorkspacePublishedPortRecord) -> None:
        stopped_pids.append(published_port.proxy_pid or -1)

    monkeypatch.setattr(vm_manager_module, "_start_workspace_published_port_proxy", _start_proxy)
    monkeypatch.setattr(vm_manager_module, "_stop_workspace_published_port_proxy", _stop_proxy)

    requested_ports = [
        vm_manager_module.WorkspacePublishedPortSpec(guest_port=8080),
        vm_manager_module.WorkspacePublishedPortSpec(guest_port=9090),
    ]
    with pytest.raises(RuntimeError, match="proxy boom"):
        manager._start_workspace_service_published_ports(  # noqa: SLF001
            workspace=workspace,
            service=service,
            guest_ip="172.29.1.2",
            published_ports=requested_ports,
        )
    assert stopped_pids == [9999]
def test_workspace_service_start_replaces_non_running_record(tmp_path: Path) -> None:
    """Starting a service again under the same name replaces a failed record.

    The first start exits before its readiness file appears and is reported as
    ``failed``; the second start reuses the name and must come up ``running``.
    Cleanup runs in ``finally`` so a failed assertion does not skip deleting the
    workspace that holds the long-running service.
    """
    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    workspace_id = str(
        manager.create_workspace(
            environment="debian:12-base",
            allow_host_compat=True,
        )["workspace_id"]
    )
    try:
        failed = manager.start_service(
            workspace_id,
            "app",
            command="sh -lc 'exit 2'",
            readiness={"type": "file", "path": ".ready"},
            ready_timeout_seconds=1,
            ready_interval_ms=50,
        )
        assert failed["state"] == "failed"

        started = manager.start_service(
            workspace_id,
            "app",
            command="sh -lc 'touch .ready; while true; do sleep 60; done'",
            readiness={"type": "file", "path": ".ready"},
        )
        assert started["state"] == "running"
    finally:
        # Always delete the workspace, including when an assertion above fails.
        manager.delete_workspace(workspace_id)
def test_workspace_service_supports_command_readiness_and_helper_probes(
    tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
    """Command readiness starts a service; host probes accept file, tcp, and http checks."""
    base_dir = tmp_path / "vms"
    manager = VmManager(
        backend_name="mock",
        base_dir=base_dir,
        network_manager=TapNetworkManager(enabled=False),
    )
    created = manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
    )
    workspace_id = str(created["workspace_id"])

    service = manager.start_service(
        workspace_id,
        "command-ready",
        command="sh -lc 'touch command.ready; while true; do sleep 60; done'",
        readiness={"type": "command", "command": "test -f command.ready"},
    )
    assert service["state"] == "running"

    services = manager.list_services(workspace_id)
    assert services["count"] == 1
    assert services["running_count"] == 1

    workspace_status = manager.status_workspace(workspace_id)
    assert workspace_status["service_count"] == 1
    assert workspace_status["running_service_count"] == 1

    assert manager.stop_service(workspace_id, "command-ready")["state"] == "stopped"

    workspace_dir = base_dir / "workspaces" / workspace_id / "workspace"

    def _ready_on_host(readiness: dict[str, Any]) -> bool:
        # Every probe below runs against the same workspace directory and cwd.
        return bool(
            vm_manager_module._service_ready_on_host(  # noqa: SLF001
                readiness=readiness,
                workspace_dir=workspace_dir,
                cwd=workspace_dir,
            )
        )

    # File probe: the readiness path is given in guest form (/workspace/...).
    (workspace_dir / "probe.ready").write_text("ok\n", encoding="utf-8")
    assert _ready_on_host({"type": "file", "path": "/workspace/probe.ready"})

    class FakeSocket:
        def __enter__(self) -> FakeSocket:
            return self

        def __exit__(self, *args: object) -> None:
            del args

        def settimeout(self, timeout: int) -> None:
            assert timeout == 1

        def connect(self, address: tuple[str, int]) -> None:
            assert address == ("127.0.0.1", 8080)

    # TCP probe: the stub checks the timeout and the parsed (host, port) address.
    monkeypatch.setattr("pyro_mcp.vm_manager.socket.socket", lambda *args: FakeSocket())
    assert _ready_on_host({"type": "tcp", "address": "127.0.0.1:8080"})

    class FakeResponse:
        status = 204

        def __enter__(self) -> FakeResponse:
            return self

        def __exit__(self, *args: object) -> None:
            del args

    def _urlopen(request: object, timeout: int) -> FakeResponse:
        del request
        assert timeout == 2
        return FakeResponse()

    # HTTP probe: a 204 response from the stubbed urlopen counts as ready.
    monkeypatch.setattr("pyro_mcp.vm_manager.urllib.request.urlopen", _urlopen)
    assert _ready_on_host({"type": "http", "url": "http://127.0.0.1:8080/"})
def test_workspace_service_logs_tail_and_delete_cleanup(tmp_path: Path) -> None:
    """Service logs can be tailed, and deleting the workspace removes its services dir."""
    base_dir = tmp_path / "vms"
    manager = VmManager(
        backend_name="mock",
        base_dir=base_dir,
        network_manager=TapNetworkManager(enabled=False),
    )
    created = manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
    )
    workspace_id = str(created["workspace_id"])

    # The service prints two lines before signalling readiness.
    logger_command = (
        "sh -lc 'printf \"one\\n\"; printf \"two\\n\"; "
        "touch .ready; while true; do sleep 60; done'"
    )
    manager.start_service(
        workspace_id,
        "logger",
        command=logger_command,
        readiness={"type": "file", "path": ".ready"},
    )

    # Asking for a single line returns only the last one and flags truncation.
    tail = manager.logs_service(workspace_id, "logger", tail_lines=1)
    assert tail["stdout"] == "two\n"
    assert tail["truncated"] is True

    services_dir = base_dir / "workspaces" / workspace_id / "services"
    assert services_dir.exists()

    result = manager.delete_workspace(workspace_id)
    assert result["deleted"] is True
    assert not services_dir.exists()
def test_workspace_status_stops_service_counts_when_workspace_is_stopped(tmp_path: Path) -> None:
    """Status of a workspace recorded as stopped marks its running service as stopped."""
    base_dir = tmp_path / "vms"
    manager = VmManager(
        backend_name="mock",
        base_dir=base_dir,
        network_manager=TapNetworkManager(enabled=False),
    )
    created = manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
    )
    workspace_id = str(created["workspace_id"])
    manager.start_service(
        workspace_id,
        "app",
        command="sh -lc 'touch .ready; while true; do sleep 60; done'",
        readiness={"type": "file", "path": ".ready"},
    )

    workspace_root = base_dir / "workspaces" / workspace_id
    service_path = workspace_root / "services" / "app.json"
    # Capture the pid up front so the finally clause can stop the real process.
    live_pid = int(json.loads(service_path.read_text(encoding="utf-8"))["pid"])
    try:
        # Flip the persisted workspace state to "stopped" behind the manager's back.
        record_path = workspace_root / "workspace.json"
        record = json.loads(record_path.read_text(encoding="utf-8"))
        record["state"] = "stopped"
        record_path.write_text(json.dumps(record, indent=2, sort_keys=True), encoding="utf-8")

        status = manager.status_workspace(workspace_id)
        assert status["state"] == "stopped"
        assert status["service_count"] == 1
        assert status["running_service_count"] == 0

        service_record = json.loads(service_path.read_text(encoding="utf-8"))
        assert service_record["state"] == "stopped"
        assert service_record["stop_reason"] == "workspace_stopped"
    finally:
        vm_manager_module._stop_process_group(live_pid)  # noqa: SLF001
def test_workspace_service_readiness_validation_helpers() -> None:
    """Service name and readiness normalizers accept valid input and reject bad input."""
    normalize_name = vm_manager_module._normalize_workspace_service_name  # noqa: SLF001
    normalize_readiness = vm_manager_module._normalize_workspace_service_readiness  # noqa: SLF001

    assert normalize_name("app-1") == "app-1"
    for bad_name, message in (
        (" ", "service_name must not be empty"),
        ("bad name", "service_name must match"),
    ):
        with pytest.raises(ValueError, match=message):
            normalize_name(bad_name)

    # (input, expected) pairs: only the relative file path is rewritten under /workspace.
    accepted = [
        (
            {"type": "file", "path": "subdir/.ready"},
            {"type": "file", "path": "/workspace/subdir/.ready"},
        ),
        (
            {"type": "tcp", "address": "127.0.0.1:8080"},
            {"type": "tcp", "address": "127.0.0.1:8080"},
        ),
        (
            {"type": "http", "url": "http://127.0.0.1:8080/"},
            {"type": "http", "url": "http://127.0.0.1:8080/"},
        ),
        (
            {"type": "command", "command": "test -f .ready"},
            {"type": "command", "command": "test -f .ready"},
        ),
    ]
    for readiness, expected in accepted:
        assert normalize_readiness(readiness) == expected

    # (input, error message pattern) pairs for readiness specs that must be rejected.
    rejected = [
        ({"type": "bogus"}, "one of: file, tcp, http, command"),
        ({"type": "file"}, "required for file readiness"),
        ({"type": "tcp", "address": "127.0.0.1"}, "HOST:PORT format"),
        ({"type": "http"}, "required for http readiness"),
        ({"type": "command"}, "required for command readiness"),
    ]
    for readiness, message in rejected:
        with pytest.raises(ValueError, match=message):
            normalize_readiness(readiness)
def test_workspace_service_text_and_exit_code_helpers(tmp_path: Path) -> None:
    """Exit-code and tail helpers handle missing, empty, and populated files."""
    read_exit_code = vm_manager_module._read_service_exit_code  # noqa: SLF001
    tail_text = vm_manager_module._tail_text  # noqa: SLF001

    # Exit code: missing file, then empty file, then a real value.
    status_path = tmp_path / "service.status"
    assert read_exit_code(status_path) is None
    status_path.write_text("", encoding="utf-8")
    assert read_exit_code(status_path) is None
    status_path.write_text("7\n", encoding="utf-8")
    assert read_exit_code(status_path) == 7

    # Tail: a missing log yields empty, untruncated text.
    log_path = tmp_path / "service.log"
    assert tail_text(log_path, tail_lines=10) == ("", False)

    full_log = "one\ntwo\nthree\n"
    log_path.write_text(full_log, encoding="utf-8")
    # No limit, or a limit above the line count, returns the whole log untruncated.
    for limit in (None, 5):
        assert tail_text(log_path, tail_lines=limit) == (full_log, False)
    assert tail_text(log_path, tail_lines=1) == ("three\n", True)
def test_workspace_service_process_group_helpers(monkeypatch: pytest.MonkeyPatch) -> None:
    """``_stop_process_group`` reports a missing group and escalates SIGTERM to SIGKILL."""

    def _missing(_pid: int, _signal: int) -> None:
        raise ProcessLookupError()

    # A process group that no longer exists is reported as neither stopped nor killed.
    monkeypatch.setattr("pyro_mcp.vm_manager.os.killpg", _missing)
    assert vm_manager_module._stop_process_group(123) == (False, False)  # noqa: SLF001

    sent_signals: list[int] = []
    # Scripted clock readings and liveness answers consumed by the helper's wait loop.
    clock_readings = iter([0.0, 0.0, 5.0, 5.0, 10.0])
    liveness_answers = iter([True, True, False])

    def _killpg(_pid: int, signum: int) -> None:
        sent_signals.append(signum)

    def _is_running(_pid: int | None) -> bool:
        return next(liveness_answers)

    def _no_sleep(_seconds: float) -> None:
        return None

    monkeypatch.setattr("pyro_mcp.vm_manager.os.killpg", _killpg)
    monkeypatch.setattr("pyro_mcp.vm_manager.time.monotonic", lambda: next(clock_readings))
    monkeypatch.setattr("pyro_mcp.vm_manager.time.sleep", _no_sleep)
    monkeypatch.setattr("pyro_mcp.vm_manager._pid_is_running", _is_running)

    outcome = vm_manager_module._stop_process_group(456, wait_seconds=5)  # noqa: SLF001
    assert tuple(outcome) == (True, True)
    assert sent_signals == [signal.SIGTERM, signal.SIGKILL]
def test_workspace_service_probe_and_refresh_helpers(
    tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
    """Probe commands return their exit code; local refresh and stop update the record."""
    probe_exit_code = vm_manager_module._run_service_probe_command(  # noqa: SLF001
        tmp_path,
        "exit 3",
    )
    assert probe_exit_code == 3

    services_dir = tmp_path / "services"
    services_dir.mkdir()
    # Exit code file that the refresh picks up once the pid is reported as gone.
    (services_dir / "app.status").write_text("9\n", encoding="utf-8")

    running = vm_manager_module.WorkspaceServiceRecord(
        workspace_id="workspace-1",
        service_name="app",
        command="sleep 60",
        cwd="/workspace",
        state="running",
        started_at=time.time(),
        readiness=None,
        ready_at=None,
        ended_at=None,
        exit_code=None,
        pid=1234,
        execution_mode="host_compat",
        stop_reason=None,
    )

    monkeypatch.setattr("pyro_mcp.vm_manager._pid_is_running", lambda _pid: False)
    refreshed = vm_manager_module._refresh_local_service_record(  # noqa: SLF001
        running,
        services_dir=services_dir,
    )
    assert (refreshed.state, refreshed.exit_code) == ("exited", 9)

    # Pretend the process group stopped on the first signal without needing a kill.
    monkeypatch.setattr(
        "pyro_mcp.vm_manager._stop_process_group",
        lambda _pid: (True, False),
    )
    stopped = vm_manager_module._stop_local_service(  # noqa: SLF001
        refreshed,
        services_dir=services_dir,
    )
    assert (stopped.state, stopped.stop_reason) == ("stopped", "sigterm")
def test_workspace_secrets_redact_exec_shell_service_and_survive_reset(tmp_path: Path) -> None:
    """Secret values are redacted in exec, shell, and service output and survive a reset.

    The shell output is polled against a monotonic deadline so that wall-clock
    adjustments during the test cannot shorten or extend the wait.
    """
    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    secret_file = tmp_path / "token.txt"
    secret_file.write_text("from-file\n", encoding="utf-8")

    created = manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
        secrets=[
            {"name": "API_TOKEN", "value": "expected"},
            {"name": "FILE_TOKEN", "file_path": str(secret_file)},
        ],
    )
    workspace_id = str(created["workspace_id"])
    # Only names and source kinds are reported back, never the secret values.
    assert created["secrets"] == [
        {"name": "API_TOKEN", "source_kind": "literal"},
        {"name": "FILE_TOKEN", "source_kind": "file"},
    ]

    # Without secret_env the secret is not present in the command environment.
    no_secret = manager.exec_workspace(
        workspace_id,
        command='sh -lc \'printf "%s" "${API_TOKEN:-missing}"\'',
        timeout_seconds=30,
    )
    assert no_secret["stdout"] == "missing"

    # With secret_env the value is injected, but redacted in the result and the log.
    executed = manager.exec_workspace(
        workspace_id,
        command='sh -lc \'printf "%s\\n" "$API_TOKEN"\'',
        timeout_seconds=30,
        secret_env={"API_TOKEN": "API_TOKEN"},
    )
    assert executed["stdout"] == "[REDACTED]\n"
    logs = manager.logs_workspace(workspace_id)
    assert logs["entries"][-1]["stdout"] == "[REDACTED]\n"

    shell = manager.open_shell(
        workspace_id,
        secret_env={"API_TOKEN": "API_TOKEN"},
    )
    shell_id = str(shell["shell_id"])
    manager.write_shell(workspace_id, shell_id, input_text='printf "%s\\n" "$API_TOKEN"')

    # Shell output arrives asynchronously; poll for up to five seconds.
    output = ""
    deadline = time.monotonic() + 5
    while time.monotonic() < deadline:
        read = manager.read_shell(workspace_id, shell_id, cursor=0, max_chars=65536)
        output = str(read["output"])
        if "[REDACTED]" in output:
            break
        time.sleep(0.05)
    assert "[REDACTED]" in output
    manager.close_shell(workspace_id, shell_id)

    started = manager.start_service(
        workspace_id,
        "app",
        command=(
            'sh -lc \'trap "exit 0" TERM; printf "%s\\n" "$API_TOKEN" >&2; '
            'touch .ready; while true; do sleep 60; done\''
        ),
        readiness={"type": "file", "path": ".ready"},
        secret_env={"API_TOKEN": "API_TOKEN"},
    )
    assert started["state"] == "running"
    service_logs = manager.logs_service(workspace_id, "app", tail_lines=None)
    assert "[REDACTED]" in str(service_logs["stderr"])

    # The secret metadata and redaction behaviour are unchanged after a reset.
    reset = manager.reset_workspace(workspace_id)
    assert reset["secrets"] == created["secrets"]

    after_reset = manager.exec_workspace(
        workspace_id,
        command='sh -lc \'printf "%s\\n" "$API_TOKEN"\'',
        timeout_seconds=30,
        secret_env={"API_TOKEN": "API_TOKEN"},
    )
    assert after_reset["stdout"] == "[REDACTED]\n"
def test_workspace_secret_validation_helpers(tmp_path: Path) -> None:
    """Secret name/value validators and duplicate-name detection raise ``ValueError``."""
    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    normalize_name = vm_manager_module._normalize_workspace_secret_name  # noqa: SLF001
    validate_value = vm_manager_module._validate_workspace_secret_value  # noqa: SLF001

    assert normalize_name("API_TOKEN") == "API_TOKEN"
    with pytest.raises(ValueError, match="secret name must match"):
        normalize_name("bad-name")
    with pytest.raises(ValueError, match="must not be empty"):
        validate_value("TOKEN", "")

    # Two secrets sharing one name are rejected when the workspace is created.
    duplicated_secrets = [
        {"name": "TOKEN", "value": "one"},
        {"name": "TOKEN", "value": "two"},
    ]
    with pytest.raises(ValueError, match="duplicate secret name"):
        manager.create_workspace(
            environment="debian:12-base",
            allow_host_compat=True,
            secrets=duplicated_secrets,
        )
def test_prepare_workspace_secrets_handles_file_inputs_and_validation_errors(
    tmp_path: Path,
) -> None:
    """``_prepare_workspace_secrets`` stores valid secrets and rejects malformed specs."""
    prepare = vm_manager_module._prepare_workspace_secrets  # noqa: SLF001

    secrets_dir = tmp_path / "secrets"
    valid_file = tmp_path / "token.txt"
    valid_file.write_text("from-file\n", encoding="utf-8")
    invalid_utf8 = tmp_path / "invalid.bin"
    invalid_utf8.write_bytes(b"\xff\xfe")
    oversized = tmp_path / "oversized.txt"
    oversized.write_text(
        "x" * (vm_manager_module.WORKSPACE_SECRET_MAX_BYTES + 1),
        encoding="utf-8",
    )

    records, values = prepare(
        [
            {"name": "B_TOKEN", "value": "literal"},
            {"name": "A_TOKEN", "file_path": str(valid_file)},
        ],
        secrets_dir=secrets_dir,
    )
    # Records come back ordered by name, regardless of the input order.
    assert [record.name for record in records] == ["A_TOKEN", "B_TOKEN"]
    assert values == {"A_TOKEN": "from-file\n", "B_TOKEN": "literal"}

    stored_secret = secrets_dir / "A_TOKEN.secret"
    assert stored_secret.read_text(encoding="utf-8") == "from-file\n"
    # The directory and the secret file are restricted to their owner.
    assert oct(secrets_dir.stat().st_mode & 0o777) == "0o700"
    assert oct(stored_secret.stat().st_mode & 0o777) == "0o600"

    # (scratch directory name, expected error pattern, secret specs) for each rejection.
    rejected: list[tuple[str, str, list[dict[str, str]]]] = [
        ("bad1", "must be a dictionary", [cast(dict[str, str], "bad")]),
        ("bad2", "missing 'name'", [{}]),
        (
            "bad3",
            "exactly one of 'value' or 'file_path'",
            [{"name": "TOKEN", "value": "x", "file_path": str(valid_file)}],
        ),
        ("bad4", "file_path must not be empty", [{"name": "TOKEN", "file_path": " "}]),
        (
            "bad5",
            "does not exist",
            [{"name": "TOKEN", "file_path": str(tmp_path / "missing.txt")}],
        ),
        (
            "bad6",
            "must be valid UTF-8 text",
            [{"name": "TOKEN", "file_path": str(invalid_utf8)}],
        ),
        ("bad7", "must be at most", [{"name": "TOKEN", "file_path": str(oversized)}]),
    ]
    for dir_name, message, specs in rejected:
        with pytest.raises(ValueError, match=message):
            prepare(specs, secrets_dir=tmp_path / dir_name)
def test_workspace_secrets_require_guest_exec_on_firecracker_runtime(
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Creating a workspace with secrets fails when the runtime lacks guest exec."""

    class StubFirecrackerBackend:
        """Backend stand-in whose lifecycle methods all do nothing."""

        def __init__(self, *args: Any, **kwargs: Any) -> None:
            del args, kwargs

        def create(self, instance: Any) -> None:
            del instance

        def start(self, instance: Any) -> None:
            del instance

        def stop(self, instance: Any) -> None:
            del instance

        def delete(self, instance: Any) -> None:
            del instance

    monkeypatch.setattr(vm_manager_module, "FirecrackerBackend", StubFirecrackerBackend)
    manager = VmManager(
        backend_name="firecracker",
        base_dir=tmp_path / "vms",
        runtime_paths=resolve_runtime_paths(),
        network_manager=TapNetworkManager(enabled=False),
    )

    # Force capabilities that can boot a VM but cannot execute inside the guest.
    no_guest_exec = RuntimeCapabilities(
        supports_vm_boot=True,
        supports_guest_exec=False,
        supports_guest_network=False,
        reason="guest exec is unavailable",
    )
    manager._runtime_capabilities = no_guest_exec  # noqa: SLF001

    with pytest.raises(RuntimeError, match="workspace secrets require guest execution"):
        manager.create_workspace(
            environment="debian:12-base",
            allow_host_compat=True,
            secrets=[{"name": "TOKEN", "value": "expected"}],
        )
def test_workspace_stop_and_start_preserve_logs_and_clear_live_state(tmp_path: Path) -> None:
    """Stopping keeps the command log but drops shells and services; restart still works."""
    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    seed_dir = tmp_path / "seed"
    seed_dir.mkdir()
    (seed_dir / "note.txt").write_text("hello from seed\n", encoding="utf-8")

    workspace_id = str(
        manager.create_workspace(
            environment="debian:12-base",
            allow_host_compat=True,
            seed_path=seed_dir,
        )["workspace_id"]
    )

    # Build up live state: one logged command, one open shell, one running service.
    manager.exec_workspace(workspace_id, command="cat note.txt", timeout_seconds=30)
    shell_id = str(manager.open_shell(workspace_id)["shell_id"])
    service = manager.start_service(
        workspace_id,
        "app",
        command='sh -lc \'touch .ready && trap "exit 0" TERM; while true; do sleep 60; done\'',
        readiness={"type": "file", "path": ".ready"},
    )
    assert service["state"] == "running"

    after_stop = manager.stop_workspace(workspace_id)
    assert after_stop["state"] == "stopped"
    assert after_stop["command_count"] == 1
    assert after_stop["service_count"] == 0
    assert after_stop["running_service_count"] == 0
    assert manager.logs_workspace(workspace_id)["count"] == 1
    # Shell access is refused while the workspace is not started.
    with pytest.raises(RuntimeError, match="must be in 'started' state"):
        manager.read_shell(workspace_id, shell_id, cursor=0, max_chars=1024)

    after_start = manager.start_workspace(workspace_id)
    assert after_start["state"] == "started"
    assert after_start["command_count"] == 1
    assert after_start["service_count"] == 0

    # The seeded file is still readable after the stop/start cycle.
    rerun = manager.exec_workspace(workspace_id, command="cat note.txt", timeout_seconds=30)
    assert rerun["stdout"] == "hello from seed\n"
def test_workspace_stop_flushes_guest_filesystem_before_stopping(
    tmp_path: Path,
) -> None:
    """``stop_workspace`` runs ``sync`` in the guest before asking the backend to stop."""
    base_dir = tmp_path / "vms"
    manager = VmManager(
        backend_name="mock",
        base_dir=base_dir,
        network_manager=TapNetworkManager(enabled=False),
    )
    workspace_id = str(
        manager.create_workspace(
            environment="debian:12-base",
            allow_host_compat=True,
        )["workspace_id"]
    )

    # Rewrite the persisted record so it describes a started guest-backed workspace.
    record_path = base_dir / "workspaces" / workspace_id / "workspace.json"
    record = json.loads(record_path.read_text(encoding="utf-8"))
    record["state"] = "started"
    record["firecracker_pid"] = os.getpid()
    record["metadata"]["execution_mode"] = "guest_vsock"
    record["metadata"]["rootfs_image"] = str(_create_stopped_workspace_rootfs(tmp_path))
    record_path.write_text(json.dumps(record, indent=2, sort_keys=True), encoding="utf-8")

    backend_calls: list[tuple[str, str]] = []

    class StubBackend:
        """Backend stand-in that records the order of exec and stop calls."""

        def exec(
            self,
            instance: Any,
            command: str,
            timeout_seconds: int,
            *,
            workdir: Path | None = None,
            env: dict[str, str] | None = None,
        ) -> vm_manager_module.VmExecResult:
            del instance, timeout_seconds, workdir, env
            backend_calls.append(("exec", command))
            return vm_manager_module.VmExecResult(
                stdout="",
                stderr="",
                exit_code=0,
                duration_ms=1,
            )

        def stop(self, instance: Any) -> None:
            del instance
            backend_calls.append(("stop", "instance"))

    manager._backend = StubBackend()  # type: ignore[assignment] # noqa: SLF001
    manager._backend_name = "firecracker"  # noqa: SLF001
    manager._runtime_capabilities = RuntimeCapabilities(  # noqa: SLF001
        supports_vm_boot=True,
        supports_guest_exec=True,
        supports_guest_network=False,
        reason=None,
    )

    result = manager.stop_workspace(workspace_id)
    assert backend_calls == [("exec", "sync"), ("stop", "instance")]
    assert result["state"] == "stopped"
def test_workspace_disk_operations_scrub_runtime_only_paths_and_export(
    tmp_path: Path,
) -> None:
    """Disk list, read, and export work on a stopped guest workspace and hide /run state."""
    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    rootfs_image = _create_stopped_workspace_rootfs(tmp_path)
    workspace_id = "workspace-disk-123"

    # Persist a hand-built stopped record that points at the prepared rootfs image.
    manager._save_workspace_locked(  # noqa: SLF001
        vm_manager_module.WorkspaceRecord(
            workspace_id=workspace_id,
            environment="debian:12-base",
            vcpu_count=1,
            mem_mib=512,
            ttl_seconds=600,
            created_at=time.time(),
            expires_at=time.time() + 600,
            state="stopped",
            network_policy="off",
            allow_host_compat=False,
            metadata={
                "execution_mode": "guest_vsock",
                "rootfs_image": str(rootfs_image),
                "workspace_path": "/workspace",
            },
        )
    )

    workspace_listing = manager.list_workspace_disk(
        workspace_id,
        path="/workspace",
        recursive=True,
    )
    assert workspace_listing["path"] == "/workspace"
    workspace_paths = {entry["path"] for entry in workspace_listing["entries"]}
    for expected_path in ("/workspace/note.txt", "/workspace/src/child.txt", "/workspace/link"):
        assert expected_path in workspace_paths

    note = manager.read_workspace_disk(workspace_id, path="note.txt", max_bytes=4096)
    assert note["content"] == "hello from disk\n"
    assert note["truncated"] is False

    # Runtime-only directories under /run must not show up in the listing.
    run_listing = manager.list_workspace_disk(workspace_id, path="/run", recursive=True)
    run_paths = {entry["path"] for entry in run_listing["entries"]}
    for hidden_path in ("/run/pyro-secrets", "/run/pyro-services"):
        assert hidden_path not in run_paths

    exported_path = tmp_path / "workspace-copy.ext4"
    exported = manager.export_workspace_disk(workspace_id, output_path=exported_path)
    assert exported["disk_format"] == "ext4"
    assert exported_path.exists()
    assert exported_path.stat().st_size == int(exported["bytes_written"])
def test_workspace_disk_operations_reject_host_compat_workspaces(tmp_path: Path) -> None:
    """Disk export, list, and read raise ``RuntimeError`` for a host_compat workspace."""
    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    # The workspace is created with allow_host_compat=True on the mock backend.
    created = manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
    )
    workspace_id = str(created["workspace_id"])
    # Stop the workspace before exercising the disk operations.
    manager.stop_workspace(workspace_id)
    # Each disk operation must fail with an error message mentioning host_compat workspaces.
    with pytest.raises(RuntimeError, match="host_compat workspaces"):
        manager.export_workspace_disk(workspace_id, output_path=tmp_path / "workspace.ext4")
    with pytest.raises(RuntimeError, match="host_compat workspaces"):
        manager.list_workspace_disk(workspace_id)
    with pytest.raises(RuntimeError, match="host_compat workspaces"):
        manager.read_workspace_disk(workspace_id, path="note.txt")