pyro-mcp/tests/test_vm_manager.py
Thales Maciel 18b8fd2a7d Add workspace snapshots and full reset
Implement the 2.8.0 workspace milestone with named snapshots and full-sandbox reset across the CLI, Python SDK, and MCP server.

Persist the immutable baseline plus named snapshot archives under each workspace, add workspace reset metadata, and make reset recreate the sandbox while clearing command history, shells, and services without changing the workspace identity or diff baseline.

Refresh the 2.8.0 docs, roadmap, and Python example around reset-over-repair, then validate with uv lock, UV_CACHE_DIR=.uv-cache make check, UV_CACHE_DIR=.uv-cache make dist-check, and a real guest-backed create/snapshot/reset/diff smoke test outside the sandbox.
2026-03-12 12:41:11 -03:00

1715 lines
59 KiB
Python

from __future__ import annotations
import io
import json
import os
import signal
import subprocess
import tarfile
import time
from pathlib import Path
from typing import Any
import pytest
import pyro_mcp.vm_manager as vm_manager_module
from pyro_mcp.runtime import RuntimeCapabilities, resolve_runtime_paths
from pyro_mcp.vm_manager import VmManager
from pyro_mcp.vm_network import NetworkConfig, TapNetworkManager
def test_vm_manager_lifecycle_and_auto_cleanup(tmp_path: Path) -> None:
    """Create, start and exec a mock VM, then check it is gone after the exec."""
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    vm_options: dict[str, Any] = {
        "environment": "debian:12",
        "vcpu_count": 1,
        "mem_mib": 512,
        "ttl_seconds": 600,
        "allow_host_compat": True,
    }
    vm_id = str(vm_manager.create_vm(**vm_options)["vm_id"])

    start_payload = vm_manager.start_vm(vm_id)
    assert start_payload["state"] == "started"

    exec_payload = vm_manager.exec_vm(
        vm_id,
        command="printf 'git version 2.43.0\\n'",
        timeout_seconds=30,
    )
    assert exec_payload["exit_code"] == 0
    assert exec_payload["execution_mode"] == "host_compat"
    assert "git version" in str(exec_payload["stdout"])

    # After the exec, the VM must no longer be known to the manager.
    with pytest.raises(ValueError, match="does not exist"):
        vm_manager.status_vm(vm_id)
def test_vm_manager_exec_timeout(tmp_path: Path) -> None:
    """A command exceeding its timeout yields exit code 124 and a 'timed out' stderr."""
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    create_payload = vm_manager.create_vm(
        environment="debian:12-base",
        vcpu_count=1,
        mem_mib=512,
        ttl_seconds=600,
        allow_host_compat=True,
    )
    vm_id = str(create_payload["vm_id"])
    vm_manager.start_vm(vm_id)

    # The command sleeps for longer than the one-second timeout.
    outcome = vm_manager.exec_vm(vm_id, command="sleep 2", timeout_seconds=1)

    assert outcome["exit_code"] == 124
    assert "timed out" in str(outcome["stderr"])
def test_vm_manager_stop_and_delete(tmp_path: Path) -> None:
    """Stopping a started VM reports 'stopped'; deleting it reports deleted=True."""
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    create_payload = vm_manager.create_vm(
        environment="debian:12-base",
        vcpu_count=1,
        mem_mib=512,
        ttl_seconds=600,
        allow_host_compat=True,
    )
    vm_id = str(create_payload["vm_id"])
    vm_manager.start_vm(vm_id)

    stop_payload = vm_manager.stop_vm(vm_id)
    assert stop_payload["state"] == "stopped"

    delete_payload = vm_manager.delete_vm(vm_id)
    assert delete_payload["deleted"] is True
def test_vm_manager_reaps_expired(tmp_path: Path) -> None:
    """An expired, never-started VM is removed by ``reap_expired``.

    The VM's ``expires_at`` is forced into the past, one reap pass is run,
    and the test checks that exactly one VM was reaped and that a status
    lookup afterwards fails because the VM no longer exists.
    """
    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    # Instance-level override; presumably the lower bound checked for ttl_seconds,
    # which would otherwise reject ttl_seconds=1 - TODO confirm against VmManager.
    manager.MIN_TTL_SECONDS = 1
    vm_id = str(
        manager.create_vm(
            environment="debian:12-base",
            vcpu_count=1,
            mem_mib=512,
            ttl_seconds=1,
            allow_host_compat=True,
        )["vm_id"]
    )
    # Force expiry deterministically instead of sleeping past the TTL.
    instance = manager._instances[vm_id]  # noqa: SLF001
    instance.expires_at = 0.0

    result = manager.reap_expired()
    assert result["count"] == 1

    # Pin the message so an unrelated ValueError cannot satisfy the check.
    with pytest.raises(ValueError, match="does not exist"):
        manager.status_vm(vm_id)
def test_vm_manager_reaps_started_vm(tmp_path: Path) -> None:
    """A started VM whose expiry is in the past is counted by ``reap_expired``."""
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    vm_manager.MIN_TTL_SECONDS = 1
    create_payload = vm_manager.create_vm(
        environment="debian:12-base",
        vcpu_count=1,
        mem_mib=512,
        ttl_seconds=1,
        allow_host_compat=True,
    )
    vm_id = str(create_payload["vm_id"])
    vm_manager.start_vm(vm_id)

    # Backdate the expiry so the reaper treats the VM as expired.
    vm_manager._instances[vm_id].expires_at = 0.0  # noqa: SLF001

    reaped = vm_manager.reap_expired()
    assert reaped["count"] == 1
@pytest.mark.parametrize(
    ("kwargs", "msg"),
    [
        ({"vcpu_count": 0, "mem_mib": 512, "ttl_seconds": 600}, "vcpu_count must be between"),
        ({"vcpu_count": 1, "mem_mib": 64, "ttl_seconds": 600}, "mem_mib must be between"),
        ({"vcpu_count": 1, "mem_mib": 512, "ttl_seconds": 30}, "ttl_seconds must be between"),
    ],
)
def test_vm_manager_validates_limits(tmp_path: Path, kwargs: dict[str, Any], msg: str) -> None:
    """Each out-of-range sizing argument makes ``create_vm`` raise a matching ValueError."""
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    create_args: dict[str, Any] = {"environment": "debian:12-base", **kwargs}

    with pytest.raises(ValueError, match=msg):
        vm_manager.create_vm(**create_args)
def test_vm_manager_max_active_limit(tmp_path: Path) -> None:
    """With ``max_active_vms=1`` a second ``create_vm`` call is rejected."""
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        max_active_vms=1,
        network_manager=TapNetworkManager(enabled=False),
    )
    # Both create calls use the same arguments.
    vm_spec: dict[str, Any] = {
        "environment": "debian:12-base",
        "vcpu_count": 1,
        "mem_mib": 512,
        "ttl_seconds": 600,
        "allow_host_compat": True,
    }

    vm_manager.create_vm(**vm_spec)

    with pytest.raises(RuntimeError, match="max active VMs reached"):
        vm_manager.create_vm(**vm_spec)
def test_vm_manager_state_validation(tmp_path: Path) -> None:
    """Exec before start, a zero timeout, and a double start are all rejected."""
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    create_payload = vm_manager.create_vm(
        environment="debian:12-base",
        vcpu_count=1,
        mem_mib=512,
        ttl_seconds=600,
        allow_host_compat=True,
    )
    vm_id = str(create_payload["vm_id"])

    # The VM has been created but not started yet.
    with pytest.raises(RuntimeError, match="must be in 'started' state"):
        vm_manager.exec_vm(vm_id, command="echo hi", timeout_seconds=30)

    with pytest.raises(ValueError, match="must be positive"):
        vm_manager.exec_vm(vm_id, command="echo hi", timeout_seconds=0)

    vm_manager.start_vm(vm_id)

    # A second start on the same VM must fail.
    with pytest.raises(RuntimeError, match="cannot be started from state"):
        vm_manager.start_vm(vm_id)
def test_vm_manager_status_expired_raises(tmp_path: Path) -> None:
    """``status_vm`` on a VM past its expiry raises the 'expired' RuntimeError."""
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    vm_manager.MIN_TTL_SECONDS = 1
    create_payload = vm_manager.create_vm(
        environment="debian:12-base",
        vcpu_count=1,
        mem_mib=512,
        ttl_seconds=1,
        allow_host_compat=True,
    )
    vm_id = str(create_payload["vm_id"])

    # Backdate the expiry rather than waiting for the TTL to elapse.
    vm_manager._instances[vm_id].expires_at = 0.0  # noqa: SLF001

    with pytest.raises(RuntimeError, match="expired and was automatically deleted"):
        vm_manager.status_vm(vm_id)
def test_vm_manager_invalid_backend(tmp_path: Path) -> None:
    """Constructing a manager with an unknown backend name raises ValueError."""
    unknown_backend = "nope"

    with pytest.raises(ValueError, match="invalid backend"):
        VmManager(
            backend_name=unknown_backend,
            base_dir=tmp_path / "vms",
            network_manager=TapNetworkManager(enabled=False),
        )
def test_vm_manager_network_info(tmp_path: Path) -> None:
    """With networking disabled, status and network info both report no network."""
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    create_payload = vm_manager.create_vm(
        environment="debian:12-base",
        vcpu_count=1,
        mem_mib=512,
        ttl_seconds=600,
        allow_host_compat=True,
    )
    vm_id = str(create_payload["vm_id"])

    status_payload = vm_manager.status_vm(vm_id)
    network_payload = vm_manager.network_info_vm(vm_id)

    assert status_payload["network_enabled"] is False
    assert status_payload["guest_ip"] is None
    assert network_payload["network_enabled"] is False
def test_vm_manager_run_vm(tmp_path: Path) -> None:
    """``run_vm`` executes a command in one call and returns its exit code and stdout."""
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    run_options: dict[str, Any] = {
        "environment": "debian:12-base",
        "command": "printf 'ok\\n'",
        "vcpu_count": 1,
        "mem_mib": 512,
        "timeout_seconds": 30,
        "ttl_seconds": 600,
        "network": False,
        "allow_host_compat": True,
    }

    run_payload = vm_manager.run_vm(**run_options)

    assert int(run_payload["exit_code"]) == 0
    assert str(run_payload["stdout"]) == "ok\n"
def test_workspace_lifecycle_and_logs(tmp_path: Path) -> None:
    """Run two commands in a workspace, check status and logs, then delete it."""
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    create_payload = vm_manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
    )
    workspace_id = str(create_payload["workspace_id"])
    assert create_payload["state"] == "started"
    assert create_payload["workspace_path"] == "/workspace"

    # The second command reads the file written by the first.
    write_result = vm_manager.exec_workspace(
        workspace_id,
        command="printf 'hello\\n' > note.txt",
        timeout_seconds=30,
    )
    read_result = vm_manager.exec_workspace(
        workspace_id,
        command="cat note.txt",
        timeout_seconds=30,
    )
    assert write_result["exit_code"] == 0
    assert read_result["stdout"] == "hello\n"

    status_payload = vm_manager.status_workspace(workspace_id)
    assert status_payload["command_count"] == 2
    assert status_payload["last_command"] is not None

    log_payload = vm_manager.logs_workspace(workspace_id)
    assert log_payload["count"] == 2
    log_entries = log_payload["entries"]
    assert isinstance(log_entries, list)
    assert log_entries[1]["stdout"] == "hello\n"

    delete_payload = vm_manager.delete_workspace(workspace_id)
    assert delete_payload["deleted"] is True
    with pytest.raises(ValueError, match="does not exist"):
        vm_manager.status_workspace(workspace_id)
def test_workspace_create_seeds_directory_source_into_workspace(tmp_path: Path) -> None:
    """A directory seed is visible in the workspace and recorded as seed metadata."""
    seed_root = tmp_path / "seed"
    seed_root.mkdir()
    (seed_root / "note.txt").write_text("hello\n", encoding="utf-8")
    expected_seed_path = str(seed_root.resolve())

    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    create_payload = vm_manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
        seed_path=seed_root,
    )
    workspace_id = str(create_payload["workspace_id"])

    created_seed = create_payload["workspace_seed"]
    assert created_seed["mode"] == "directory"
    assert created_seed["seed_path"] == expected_seed_path

    cat_result = vm_manager.exec_workspace(
        workspace_id,
        command="cat note.txt",
        timeout_seconds=30,
    )
    assert cat_result["stdout"] == "hello\n"

    # The same seed metadata must be reported by a later status call.
    status_payload = vm_manager.status_workspace(workspace_id)
    assert status_payload["workspace_seed"]["mode"] == "directory"
    assert status_payload["workspace_seed"]["seed_path"] == expected_seed_path
def test_workspace_create_seeds_tar_archive_into_workspace(tmp_path: Path) -> None:
    """A gzip tar seed is unpacked into the workspace and reported as 'tar_archive'."""
    tarball_path = tmp_path / "seed.tgz"
    staging_dir = tmp_path / "src"
    staging_dir.mkdir()
    staged_note = staging_dir / "note.txt"
    staged_note.write_text("archive\n", encoding="utf-8")
    with tarfile.open(tarball_path, "w:gz") as tarball:
        # Store the file at the archive root, not under its staging directory.
        tarball.add(staged_note, arcname="note.txt")

    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    create_payload = vm_manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
        seed_path=tarball_path,
    )
    workspace_id = str(create_payload["workspace_id"])
    assert create_payload["workspace_seed"]["mode"] == "tar_archive"

    cat_result = vm_manager.exec_workspace(
        workspace_id,
        command="cat note.txt",
        timeout_seconds=30,
    )
    assert cat_result["stdout"] == "archive\n"
def test_workspace_sync_push_updates_started_workspace(tmp_path: Path) -> None:
    """Pushing a directory into a subdirectory makes its files readable in the workspace."""
    seed_root = tmp_path / "seed"
    seed_root.mkdir()
    (seed_root / "note.txt").write_text("hello\n", encoding="utf-8")

    push_root = tmp_path / "update"
    push_root.mkdir()
    (push_root / "more.txt").write_text("more\n", encoding="utf-8")

    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    create_payload = vm_manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
        seed_path=seed_root,
    )
    workspace_id = str(create_payload["workspace_id"])

    sync_payload = vm_manager.push_workspace_sync(
        workspace_id,
        source_path=push_root,
        dest="subdir",
    )
    sync_info = sync_payload["workspace_sync"]
    assert sync_info["mode"] == "directory"
    assert sync_info["destination"] == "/workspace/subdir"

    cat_result = vm_manager.exec_workspace(
        workspace_id,
        command="cat subdir/more.txt",
        timeout_seconds=30,
    )
    assert cat_result["stdout"] == "more\n"

    # Only the single exec is counted as a command; the seed metadata is unchanged.
    status_payload = vm_manager.status_workspace(workspace_id)
    assert status_payload["command_count"] == 1
    assert status_payload["workspace_seed"]["mode"] == "directory"
def test_workspace_sync_push_requires_started_workspace(tmp_path: Path) -> None:
    """A sync push into a workspace persisted as 'stopped' raises RuntimeError."""
    seed_root = tmp_path / "seed"
    seed_root.mkdir()
    (seed_root / "note.txt").write_text("hello\n", encoding="utf-8")

    push_root = tmp_path / "update"
    push_root.mkdir()
    (push_root / "more.txt").write_text("more\n", encoding="utf-8")

    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    create_payload = vm_manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
        seed_path=seed_root,
    )
    workspace_id = str(create_payload["workspace_id"])

    # Rewrite the persisted workspace record on disk so its state reads "stopped".
    record_path = tmp_path / "vms" / "workspaces" / workspace_id / "workspace.json"
    record = json.loads(record_path.read_text(encoding="utf-8"))
    record["state"] = "stopped"
    record_path.write_text(json.dumps(record, indent=2, sort_keys=True), encoding="utf-8")

    with pytest.raises(
        RuntimeError,
        match="must be in 'started' state before workspace_sync_push",
    ):
        vm_manager.push_workspace_sync(workspace_id, source_path=push_root)
def test_workspace_sync_push_rejects_destination_outside_workspace(tmp_path: Path) -> None:
    """A sync destination using '..' to leave /workspace raises ValueError."""
    push_root = tmp_path / "seed"
    push_root.mkdir()
    (push_root / "note.txt").write_text("hello\n", encoding="utf-8")

    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    create_payload = vm_manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
    )
    workspace_id = str(create_payload["workspace_id"])

    with pytest.raises(ValueError, match="workspace destination must stay inside /workspace"):
        vm_manager.push_workspace_sync(workspace_id, source_path=push_root, dest="../escape")
def test_workspace_diff_and_export_round_trip(tmp_path: Path) -> None:
    """Diff a synced change against the seed, export the file, and check no commands logged."""
    seed_root = tmp_path / "seed"
    seed_root.mkdir()
    (seed_root / "note.txt").write_text("hello\n", encoding="utf-8")

    push_root = tmp_path / "update"
    push_root.mkdir()
    (push_root / "note.txt").write_text("hello from sync\n", encoding="utf-8")

    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    create_payload = vm_manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
        seed_path=seed_root,
    )
    workspace_id = str(create_payload["workspace_id"])

    # Overwrite note.txt through a sync push so the workspace differs from its seed.
    vm_manager.push_workspace_sync(workspace_id, source_path=push_root)

    diff_result = vm_manager.diff_workspace(workspace_id)
    assert diff_result["workspace_id"] == workspace_id
    assert diff_result["changed"] is True
    diff_summary = diff_result["summary"]
    assert diff_summary["modified"] == 1
    assert diff_summary["text_patched"] == 1
    patch_text = diff_result["patch"]
    assert "-hello\n" in patch_text
    assert "+hello from sync\n" in patch_text

    export_target = tmp_path / "exported-note.txt"
    export_result = vm_manager.export_workspace(
        workspace_id,
        path="note.txt",
        output_path=export_target,
    )
    assert export_result["workspace_id"] == workspace_id
    assert export_result["artifact_type"] == "file"
    assert export_target.read_text(encoding="utf-8") == "hello from sync\n"

    # Sync, diff and export must not show up as executed commands.
    status_payload = vm_manager.status_workspace(workspace_id)
    log_payload = vm_manager.logs_workspace(workspace_id)
    assert status_payload["command_count"] == 0
    assert log_payload["count"] == 0
def test_workspace_export_directory_uses_exact_output_path(tmp_path: Path) -> None:
    """Exporting a directory puts its contents directly in ``output_path``."""
    seed_root = tmp_path / "seed"
    seeded_src = seed_root / "src"
    seeded_src.mkdir(parents=True)
    (seeded_src / "note.txt").write_text("hello\n", encoding="utf-8")

    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    create_payload = vm_manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
        seed_path=seed_root,
    )
    workspace_id = str(create_payload["workspace_id"])

    export_target = tmp_path / "exported-src"
    export_result = vm_manager.export_workspace(
        workspace_id,
        path="src",
        output_path=export_target,
    )

    assert export_result["artifact_type"] == "directory"
    assert (export_target / "note.txt").read_text(encoding="utf-8") == "hello\n"
    # The exported directory must not be nested under its own name.
    assert not (export_target / "src").exists()
def test_workspace_diff_requires_create_time_baseline(tmp_path: Path) -> None:
    """``diff_workspace`` raises RuntimeError when the baseline archive is missing."""
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    create_payload = vm_manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
    )
    workspace_id = str(create_payload["workspace_id"])

    # Delete the baseline archive from the workspace directory on disk.
    workspace_root = tmp_path / "vms" / "workspaces" / workspace_id
    (workspace_root / "baseline" / "workspace.tar").unlink()

    with pytest.raises(RuntimeError, match="require[s]? a baseline snapshot"):
        vm_manager.diff_workspace(workspace_id)
def test_workspace_snapshots_and_reset_round_trip(tmp_path: Path) -> None:
    """Snapshot a workspace, reset to the snapshot and to the baseline, then delete it.

    Each reset is checked for restored file content and for cleared command
    and service counters in the returned payload.
    """
    seed_root = tmp_path / "seed"
    seed_root.mkdir()
    (seed_root / "note.txt").write_text("seed\n", encoding="utf-8")

    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    create_payload = vm_manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
        seed_path=seed_root,
    )
    workspace_id = str(create_payload["workspace_id"])

    # Change the note, then capture that state as the "checkpoint" snapshot.
    vm_manager.exec_workspace(
        workspace_id,
        command="printf 'checkpoint\\n' > note.txt",
        timeout_seconds=30,
    )
    snapshot_payload = vm_manager.create_snapshot(workspace_id, "checkpoint")
    assert snapshot_payload["snapshot"]["snapshot_name"] == "checkpoint"

    snapshot_listing = vm_manager.list_snapshots(workspace_id)
    assert snapshot_listing["count"] == 2
    listed_names = [item["snapshot_name"] for item in snapshot_listing["snapshots"]]
    assert listed_names == [
        "baseline",
        "checkpoint",
    ]

    # Move past the checkpoint and start a service before resetting.
    vm_manager.exec_workspace(
        workspace_id,
        command="printf 'after\\n' > note.txt",
        timeout_seconds=30,
    )
    vm_manager.start_service(
        workspace_id,
        "app",
        command="sh -lc 'touch .ready; while true; do sleep 60; done'",
        readiness={"type": "file", "path": ".ready"},
    )

    snapshot_reset = vm_manager.reset_workspace(workspace_id, snapshot="checkpoint")
    assert snapshot_reset["workspace_reset"]["snapshot_name"] == "checkpoint"
    assert snapshot_reset["reset_count"] == 1
    assert snapshot_reset["last_command"] is None
    assert snapshot_reset["command_count"] == 0
    assert snapshot_reset["service_count"] == 0
    assert snapshot_reset["running_service_count"] == 0

    after_snapshot_reset = vm_manager.exec_workspace(
        workspace_id,
        command="cat note.txt",
        timeout_seconds=30,
    )
    assert after_snapshot_reset["stdout"] == "checkpoint\n"
    # Only the `cat` issued after the reset is present in the logs.
    snapshot_reset_logs = vm_manager.logs_workspace(workspace_id)
    assert snapshot_reset_logs["count"] == 1

    # A reset with no snapshot argument is reported as a reset to "baseline".
    baseline_reset = vm_manager.reset_workspace(workspace_id)
    assert baseline_reset["workspace_reset"]["snapshot_name"] == "baseline"
    assert baseline_reset["reset_count"] == 2
    assert baseline_reset["command_count"] == 0
    assert baseline_reset["service_count"] == 0
    assert vm_manager.logs_workspace(workspace_id)["count"] == 0

    after_baseline_reset = vm_manager.exec_workspace(
        workspace_id,
        command="cat note.txt",
        timeout_seconds=30,
    )
    assert after_baseline_reset["stdout"] == "seed\n"
    diff_result = vm_manager.diff_workspace(workspace_id)
    assert diff_result["changed"] is False

    delete_payload = vm_manager.delete_snapshot(workspace_id, "checkpoint")
    assert delete_payload["deleted"] is True
    remaining = vm_manager.list_snapshots(workspace_id)
    remaining_names = [item["snapshot_name"] for item in remaining["snapshots"]]
    assert remaining_names == [
        "baseline"
    ]
def test_workspace_snapshot_and_reset_require_baseline(tmp_path: Path) -> None:
    """List, create, delete and reset all raise when the baseline archive is missing."""
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    create_payload = vm_manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
    )
    workspace_id = str(create_payload["workspace_id"])

    # Delete the baseline archive from the workspace directory on disk.
    workspace_root = tmp_path / "vms" / "workspaces" / workspace_id
    (workspace_root / "baseline" / "workspace.tar").unlink()

    baseline_error = "require[s]? a baseline snapshot"
    with pytest.raises(RuntimeError, match=baseline_error):
        vm_manager.list_snapshots(workspace_id)
    with pytest.raises(RuntimeError, match=baseline_error):
        vm_manager.create_snapshot(workspace_id, "checkpoint")
    with pytest.raises(RuntimeError, match=baseline_error):
        vm_manager.delete_snapshot(workspace_id, "checkpoint")
    with pytest.raises(RuntimeError, match=baseline_error):
        vm_manager.reset_workspace(workspace_id)
def test_workspace_delete_baseline_snapshot_is_rejected(tmp_path: Path) -> None:
    """Deleting the snapshot named 'baseline' raises ValueError."""
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    create_payload = vm_manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
    )
    workspace_id = str(create_payload["workspace_id"])

    with pytest.raises(ValueError, match="cannot delete the baseline snapshot"):
        vm_manager.delete_snapshot(workspace_id, "baseline")
def test_workspace_reset_recreates_stopped_workspace(tmp_path: Path) -> None:
    """Resetting a workspace saved as 'stopped' returns it started with seed content."""
    seed_root = tmp_path / "seed"
    seed_root.mkdir()
    (seed_root / "note.txt").write_text("seed\n", encoding="utf-8")

    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    create_payload = vm_manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
        seed_path=seed_root,
    )
    workspace_id = str(create_payload["workspace_id"])

    # Save the workspace record as stopped, with no process id, via the manager's helpers.
    with vm_manager._lock:  # noqa: SLF001
        record = vm_manager._load_workspace_locked(workspace_id)  # noqa: SLF001
        record.state = "stopped"
        record.firecracker_pid = None
        vm_manager._save_workspace_locked(record)  # noqa: SLF001

    reset_result = vm_manager.reset_workspace(workspace_id)
    assert reset_result["state"] == "started"
    assert reset_result["workspace_reset"]["snapshot_name"] == "baseline"

    cat_result = vm_manager.exec_workspace(
        workspace_id,
        command="cat note.txt",
        timeout_seconds=30,
    )
    assert cat_result["stdout"] == "seed\n"
def test_workspace_reset_failure_leaves_workspace_stopped(tmp_path: Path) -> None:
    """A failing archive import during reset leaves the workspace stopped and uncounted."""
    seed_root = tmp_path / "seed"
    seed_root.mkdir()
    (seed_root / "note.txt").write_text("seed\n", encoding="utf-8")

    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    create_payload = vm_manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
        seed_path=seed_root,
    )
    workspace_id = str(create_payload["workspace_id"])
    vm_manager.create_snapshot(workspace_id, "checkpoint")

    def _raise_on_import(*args: Any, **kwargs: Any) -> dict[str, Any]:
        """Stand in for the backend's ``import_archive`` and always fail."""
        del args, kwargs
        raise RuntimeError("boom")

    # Replace the backend's import so the reset fails during the archive import.
    vm_manager._backend.import_archive = _raise_on_import  # type: ignore[method-assign]  # noqa: SLF001

    with pytest.raises(RuntimeError, match="boom"):
        vm_manager.reset_workspace(workspace_id, snapshot="checkpoint")

    with vm_manager._lock:  # noqa: SLF001
        record = vm_manager._load_workspace_locked(workspace_id)  # noqa: SLF001
    assert record.state == "stopped"
    assert record.firecracker_pid is None
    assert record.reset_count == 0

    # Both snapshots must still be listed after the failed reset.
    snapshot_listing = vm_manager.list_snapshots(workspace_id)
    listed_names = [item["snapshot_name"] for item in snapshot_listing["snapshots"]]
    assert listed_names == [
        "baseline",
        "checkpoint",
    ]
def test_workspace_export_helpers_preserve_directory_symlinks(tmp_path: Path) -> None:
    """Archiving then extracting a directory keeps files, symlinks and empty dirs."""
    source_root = tmp_path / "workspace"
    source_root.mkdir()
    (source_root / "note.txt").write_text("hello\n", encoding="utf-8")
    os.symlink("note.txt", source_root / "note-link")
    (source_root / "empty-dir").mkdir()

    tar_path = tmp_path / "workspace-export.tar"
    prepared = vm_manager_module._prepare_workspace_export_archive(  # noqa: SLF001
        workspace_dir=source_root,
        workspace_path=".",
        archive_path=tar_path,
    )
    assert prepared.artifact_type == "directory"

    extract_root = tmp_path / "output"
    extraction = vm_manager_module._extract_workspace_export_archive(  # noqa: SLF001
        tar_path,
        output_path=extract_root,
        artifact_type="directory",
    )
    assert extraction["artifact_type"] == "directory"

    assert (extract_root / "note.txt").read_text(encoding="utf-8") == "hello\n"
    # The link must come back as a symlink with its relative target intact.
    extracted_link = extract_root / "note-link"
    assert extracted_link.is_symlink()
    assert os.readlink(extracted_link) == "note.txt"
    assert (extract_root / "empty-dir").is_dir()
def test_workspace_export_helpers_validate_missing_path_and_existing_output(tmp_path: Path) -> None:
    """The export helpers reject a missing source path and an existing output path."""
    source_root = tmp_path / "workspace"
    source_root.mkdir()
    (source_root / "note.txt").write_text("hello\n", encoding="utf-8")

    # Case 1: the requested workspace path does not exist.
    with pytest.raises(RuntimeError, match="workspace path does not exist"):
        vm_manager_module._prepare_workspace_export_archive(  # noqa: SLF001
            workspace_dir=source_root,
            workspace_path="missing.txt",
            archive_path=tmp_path / "missing.tar",
        )

    # Case 2: a valid archive is refused when the extraction target already exists.
    tar_path = tmp_path / "note-export.tar"
    prepared = vm_manager_module._prepare_workspace_export_archive(  # noqa: SLF001
        workspace_dir=source_root,
        workspace_path="note.txt",
        archive_path=tar_path,
    )
    occupied_target = tmp_path / "note.txt"
    occupied_target.write_text("already here\n", encoding="utf-8")

    with pytest.raises(RuntimeError, match="output_path already exists"):
        vm_manager_module._extract_workspace_export_archive(  # noqa: SLF001
            tar_path,
            output_path=occupied_target,
            artifact_type=prepared.artifact_type,
        )
def test_diff_workspace_trees_reports_empty_binary_symlink_and_type_changes(tmp_path: Path) -> None:
    """Diff two trees differing in text, binary, symlink, type and empty-dir entries."""
    old_tree = tmp_path / "baseline"
    new_tree = tmp_path / "current"
    old_tree.mkdir()
    new_tree.mkdir()

    # Text file modified, one deleted, one added.
    (old_tree / "modified.txt").write_text("before\n", encoding="utf-8")
    (new_tree / "modified.txt").write_text("after\n", encoding="utf-8")
    (old_tree / "deleted.txt").write_text("gone\n", encoding="utf-8")
    (new_tree / "added.txt").write_text("new\n", encoding="utf-8")
    # Binary content change (leading NUL byte).
    (old_tree / "binary.bin").write_bytes(b"\x00before")
    (new_tree / "binary.bin").write_bytes(b"\x00after")
    # Symlink retargeted.
    os.symlink("link-target-old.txt", old_tree / "link")
    os.symlink("link-target-new.txt", new_tree / "link")
    # Directory replaced by a regular file of the same name.
    (old_tree / "swap").mkdir()
    (new_tree / "swap").write_text("type changed\n", encoding="utf-8")
    # Empty directories removed and added.
    (old_tree / "removed-empty").mkdir()
    (new_tree / "added-empty").mkdir()

    diff_result = vm_manager_module._diff_workspace_trees(  # noqa: SLF001
        old_tree,
        new_tree,
    )

    assert diff_result["changed"] is True
    expected_summary = {
        "total": 8,
        "added": 2,
        "modified": 3,
        "deleted": 2,
        "type_changed": 1,
        "text_patched": 3,
        "non_text": 5,
    }
    assert diff_result["summary"] == expected_summary

    patch_text = diff_result["patch"]
    assert "--- a/modified.txt" in patch_text
    assert "+++ b/modified.txt" in patch_text
    assert "--- /dev/null" in patch_text
    assert "+++ b/added.txt" in patch_text
    assert "--- a/deleted.txt" in patch_text
    assert "+++ /dev/null" in patch_text

    by_path = {item["path"]: item for item in diff_result["entries"]}
    assert by_path["binary.bin"]["text_patch"] is None
    assert by_path["link"]["artifact_type"] == "symlink"
    assert by_path["swap"]["artifact_type"] == "file"
    assert by_path["removed-empty"]["artifact_type"] == "directory"
    assert by_path["added-empty"]["artifact_type"] == "directory"
def test_diff_workspace_trees_unchanged_returns_empty_summary(tmp_path: Path) -> None:
    """Identical trees yield an all-zero summary, no entries and an empty patch."""
    old_tree = tmp_path / "baseline"
    new_tree = tmp_path / "current"
    for tree in (old_tree, new_tree):
        tree.mkdir()
    (old_tree / "note.txt").write_text("same\n", encoding="utf-8")
    (new_tree / "note.txt").write_text("same\n", encoding="utf-8")

    diff_result = vm_manager_module._diff_workspace_trees(  # noqa: SLF001
        old_tree,
        new_tree,
    )

    zero_summary = {
        "total": 0,
        "added": 0,
        "modified": 0,
        "deleted": 0,
        "type_changed": 0,
        "text_patched": 0,
        "non_text": 0,
    }
    expected = {
        "changed": False,
        "summary": zero_summary,
        "entries": [],
        "patch": "",
    }
    assert diff_result == expected
def test_workspace_shell_lifecycle_and_rehydration(tmp_path: Path) -> None:
    """Open, drive and close workspace shells from two managers on one base dir.

    A shell opened through the first manager must echo ``/workspace`` for
    ``pwd``. A second manager built on the same ``base_dir`` must be able to
    open its own, distinct shell in the same workspace. Shell traffic must
    not be counted in the workspace command logs, and closed or deleted
    shells must no longer be readable.
    """

    def _poll_shell_output(
        owner: VmManager,
        target_workspace_id: str,
        target_shell_id: str,
        needle: str,
    ) -> str:
        """Re-read the shell from cursor 0 until ``needle`` appears or ~5s pass.

        Returns the last output read, or an empty string if no read happened.
        """
        latest = ""
        # time.monotonic() cannot jump with wall-clock adjustments, so the
        # deadline is not shortened or stretched by a clock change mid-test.
        deadline = time.monotonic() + 5
        while time.monotonic() < deadline:
            read = owner.read_shell(
                target_workspace_id,
                target_shell_id,
                cursor=0,
                max_chars=65536,
            )
            latest = str(read["output"])
            if needle in latest:
                break
            time.sleep(0.05)
        return latest

    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    created = manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
    )
    workspace_id = str(created["workspace_id"])

    opened = manager.open_shell(workspace_id)
    shell_id = str(opened["shell_id"])
    assert opened["state"] == "running"
    manager.write_shell(workspace_id, shell_id, input_text="pwd")
    output = _poll_shell_output(manager, workspace_id, shell_id, "/workspace")
    assert "/workspace" in output

    # A second manager instance pointed at the same base_dir.
    manager_rehydrated = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    second_opened = manager_rehydrated.open_shell(workspace_id)
    second_shell_id = str(second_opened["shell_id"])
    assert second_shell_id != shell_id
    manager_rehydrated.write_shell(workspace_id, second_shell_id, input_text="printf 'ok\\n'")
    second_output = _poll_shell_output(manager_rehydrated, workspace_id, second_shell_id, "ok")
    assert "ok" in second_output

    # Shell input is not recorded as a workspace command.
    logs = manager.logs_workspace(workspace_id)
    assert logs["count"] == 0

    closed = manager.close_shell(workspace_id, shell_id)
    assert closed["closed"] is True
    with pytest.raises(ValueError, match="does not exist"):
        manager.read_shell(workspace_id, shell_id)

    deleted = manager.delete_workspace(workspace_id)
    assert deleted["deleted"] is True
    # After the workspace is deleted, the second manager's shell is gone too.
    with pytest.raises(ValueError, match="does not exist"):
        manager_rehydrated.read_shell(workspace_id, second_shell_id)
def test_workspace_create_rejects_unsafe_seed_archive(tmp_path: Path) -> None:
    """A seed archive with a '../' member is rejected and no workspace dir is left."""
    tarball_path = tmp_path / "bad.tgz"
    escape_bytes = b"bad\n"
    # Member name that points outside the extraction root.
    escape_member = tarfile.TarInfo(name="../escape.txt")
    escape_member.size = len(escape_bytes)
    with tarfile.open(tarball_path, "w:gz") as tarball:
        tarball.addfile(escape_member, io.BytesIO(escape_bytes))

    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )

    with pytest.raises(RuntimeError, match="unsafe archive member path"):
        vm_manager.create_workspace(
            environment="debian:12-base",
            allow_host_compat=True,
            seed_path=tarball_path,
        )

    workspaces_root = tmp_path / "vms" / "workspaces"
    assert list(workspaces_root.iterdir()) == []
def test_workspace_create_rejects_archive_that_writes_through_symlink(tmp_path: Path) -> None:
    """A seed archive that places a file beneath a symlink member is rejected."""
    tarball_path = tmp_path / "bad-symlink.tgz"
    note_bytes = b"bad\n"

    # First member: a symlink named "linked" pointing at "outside".
    link_member = tarfile.TarInfo(name="linked")
    link_member.type = tarfile.SYMTYPE
    link_member.linkname = "outside"
    # Second member: a regular file whose path goes through that symlink.
    note_member = tarfile.TarInfo(name="linked/note.txt")
    note_member.size = len(note_bytes)

    with tarfile.open(tarball_path, "w:gz") as tarball:
        tarball.addfile(link_member)
        tarball.addfile(note_member, io.BytesIO(note_bytes))

    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )

    with pytest.raises(RuntimeError, match="traverse through a symlinked path"):
        vm_manager.create_workspace(
            environment="debian:12-base",
            allow_host_compat=True,
            seed_path=tarball_path,
        )
def test_workspace_create_cleans_up_on_seed_failure(
    tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
    """If the seed import raises, the error propagates and no workspace dir remains."""
    seed_root = tmp_path / "seed"
    seed_root.mkdir()
    (seed_root / "note.txt").write_text("hello\n", encoding="utf-8")

    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )

    def _raise_on_import(*args: Any, **kwargs: Any) -> dict[str, Any]:
        """Stand in for the backend's ``import_archive`` and always fail."""
        del args, kwargs
        raise RuntimeError("seed import failed")

    monkeypatch.setattr(vm_manager._backend, "import_archive", _raise_on_import)  # noqa: SLF001

    with pytest.raises(RuntimeError, match="seed import failed"):
        vm_manager.create_workspace(
            environment="debian:12-base",
            allow_host_compat=True,
            seed_path=seed_root,
        )

    workspaces_root = tmp_path / "vms" / "workspaces"
    assert list(workspaces_root.iterdir()) == []
def test_workspace_rehydrates_across_manager_processes(tmp_path: Path) -> None:
    """A second manager on the same base dir can exec in and log a workspace."""
    shared_base = tmp_path / "vms"

    creator = VmManager(
        backend_name="mock",
        base_dir=shared_base,
        network_manager=TapNetworkManager(enabled=False),
    )
    create_payload = creator.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
    )
    workspace_id = str(create_payload["workspace_id"])

    # A fresh manager instance pointed at the same base_dir.
    second_manager = VmManager(
        backend_name="mock",
        base_dir=shared_base,
        network_manager=TapNetworkManager(enabled=False),
    )
    exec_result = second_manager.exec_workspace(
        workspace_id,
        command="printf 'ok\\n'",
        timeout_seconds=30,
    )
    assert exec_result["exit_code"] == 0
    assert exec_result["stdout"] == "ok\n"

    log_payload = second_manager.logs_workspace(workspace_id)
    assert log_payload["count"] == 1
def test_workspace_requires_started_state(tmp_path: Path) -> None:
    """Exec in a workspace whose persisted state is 'stopped' raises RuntimeError."""
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    create_payload = vm_manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
    )
    workspace_id = str(create_payload["workspace_id"])

    # Rewrite the persisted workspace record on disk so its state reads "stopped".
    record_path = tmp_path / "vms" / "workspaces" / workspace_id / "workspace.json"
    record = json.loads(record_path.read_text(encoding="utf-8"))
    record["state"] = "stopped"
    record_path.write_text(json.dumps(record, indent=2, sort_keys=True), encoding="utf-8")

    with pytest.raises(RuntimeError, match="must be in 'started' state"):
        vm_manager.exec_workspace(workspace_id, command="true", timeout_seconds=30)
def test_vm_manager_firecracker_backend_path(
    tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
    """A manager built with backend_name="firecracker" reports that backend name.

    ``FirecrackerBackend`` is replaced on the module with an inert stub so the
    test does not depend on the real backend class.
    """

    class StubFirecrackerBackend:
        """Stand-in that stores its constructor arguments and does nothing else."""

        def __init__(
            self,
            environment_store: Any,
            firecracker_bin: Path,
            jailer_bin: Path,
            runtime_capabilities: Any,
            network_manager: TapNetworkManager,
        ) -> None:
            self.network_manager = network_manager
            self.runtime_capabilities = runtime_capabilities
            self.jailer_bin = jailer_bin
            self.firecracker_bin = firecracker_bin
            self.environment_store = environment_store

        # The lifecycle hooks below accept their arguments and discard them.
        def create(self, instance: Any) -> None:
            del instance

        def start(self, instance: Any) -> None:
            del instance

        def exec(self, instance: Any, command: str, timeout_seconds: int) -> Any:
            del instance, command, timeout_seconds
            return None

        def stop(self, instance: Any) -> None:
            del instance

        def delete(self, instance: Any) -> None:
            del instance

    monkeypatch.setattr(vm_manager_module, "FirecrackerBackend", StubFirecrackerBackend)
    manager = VmManager(
        backend_name="firecracker",
        base_dir=tmp_path / "vms",
        runtime_paths=resolve_runtime_paths(),
        network_manager=TapNetworkManager(enabled=False),
    )
    selected_backend = manager._backend_name  # noqa: SLF001
    assert selected_backend == "firecracker"
def test_vm_manager_fails_closed_without_host_compat_opt_in(tmp_path: Path) -> None:
    """start_vm fails with "guest boot is unavailable" when allow_host_compat was not passed."""
    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    # Note: no allow_host_compat argument here, unlike the other tests in this file.
    created = manager.create_vm(environment="debian:12-base", ttl_seconds=600)
    vm_id = str(created["vm_id"])

    with pytest.raises(RuntimeError, match="guest boot is unavailable"):
        manager.start_vm(vm_id)
def test_vm_manager_uses_canonical_default_cache_dir(
    monkeypatch: pytest.MonkeyPatch, tmp_path: Path
) -> None:
    """The environment store's cache_dir matches the PYRO_ENVIRONMENT_CACHE_DIR variable."""
    expected_cache = tmp_path / "cache"
    monkeypatch.setenv("PYRO_ENVIRONMENT_CACHE_DIR", str(expected_cache))

    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )

    store = manager._environment_store  # noqa: SLF001
    assert store.cache_dir == expected_cache
def test_vm_manager_helper_round_trips() -> None:
    """Exercise the module-private coercion, network round-trip and command-wrapping helpers."""
    helpers = vm_manager_module

    # _optional_int: None passes through, int-like values are coerced, others are rejected.
    assert helpers._optional_int(None) is None  # noqa: SLF001
    for raw, expected in ((True, 1), (7, 7), (7.2, 7), ("9", 9)):
        assert helpers._optional_int(raw) == expected  # noqa: SLF001
    with pytest.raises(TypeError, match="integer-compatible"):
        helpers._optional_int(object())  # noqa: SLF001

    # _optional_str
    assert helpers._optional_str(None) is None  # noqa: SLF001
    assert helpers._optional_str(1) == "1"  # noqa: SLF001

    # _optional_dict
    assert helpers._optional_dict(None) is None  # noqa: SLF001
    assert helpers._optional_dict({"x": 1}) == {"x": 1}  # noqa: SLF001
    with pytest.raises(TypeError, match="dictionary payload"):
        helpers._optional_dict("bad")  # noqa: SLF001

    # _string_dict: values are stringified, a non-dict input yields an empty dict.
    assert helpers._string_dict({"x": 1}) == {"x": "1"}  # noqa: SLF001
    assert helpers._string_dict("bad") == {}  # noqa: SLF001

    # Network config survives a serialize/deserialize round trip unchanged.
    network = NetworkConfig(
        vm_id="abc123",
        tap_name="tap0",
        guest_ip="172.29.1.2",
        gateway_ip="172.29.1.1",
        subnet_cidr="172.29.1.0/24",
        mac_address="06:00:aa:bb:cc:dd",
        dns_servers=("1.1.1.1", "8.8.8.8"),
    )
    serialized = helpers._serialize_network(network)  # noqa: SLF001
    assert serialized is not None
    assert helpers._deserialize_network(serialized) == network  # noqa: SLF001
    assert helpers._deserialize_network(None) is None  # noqa: SLF001
    with pytest.raises(TypeError, match="dictionary payload"):
        helpers._deserialize_network("bad")  # noqa: SLF001

    # _wrap_guest_command: unchanged without cwd, prefixed with a cd when cwd is given.
    assert helpers._wrap_guest_command("echo hi") == "echo hi"  # noqa: SLF001
    with_cwd = helpers._wrap_guest_command("echo hi", cwd="/workspace")  # noqa: SLF001
    assert "cd /workspace" in with_cwd

    assert helpers._pid_is_running(None) is False  # noqa: SLF001
def test_copy_rootfs_falls_back_to_copy2(
    tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
    """_copy_rootfs returns "copy2" and still copies the data when subprocess.run fails."""
    src_image = tmp_path / "rootfs.ext4"
    dst_image = tmp_path / "dest" / "rootfs.ext4"
    src_image.write_text("payload", encoding="utf-8")

    def _fail_run(*args: Any, **kwargs: Any) -> Any:
        # Stand-in for subprocess.run that always fails with OSError.
        del args, kwargs
        raise OSError("no cp")

    monkeypatch.setattr(subprocess, "run", _fail_run)

    assert vm_manager_module._copy_rootfs(src_image, dst_image) == "copy2"  # noqa: SLF001
    assert dst_image.read_text(encoding="utf-8") == "payload"
def test_workspace_create_cleans_up_on_start_failure(
    tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
    """If the backend's start raises, create_workspace leaves no workspace directory behind."""
    vms_root = tmp_path / "vms"
    manager = VmManager(
        backend_name="mock",
        base_dir=vms_root,
        network_manager=TapNetworkManager(enabled=False),
    )

    def _fail_start(instance: Any) -> None:
        del instance
        raise RuntimeError("boom")

    monkeypatch.setattr(manager._backend, "start", _fail_start)  # noqa: SLF001

    with pytest.raises(RuntimeError, match="boom"):
        manager.create_workspace(environment="debian:12-base", allow_host_compat=True)

    leftovers = list((vms_root / "workspaces").iterdir())
    assert leftovers == []
def test_exec_instance_wraps_guest_workspace_command(tmp_path: Path) -> None:
    """With guest exec enabled, _exec_instance passes a cd-wrapped command and no workdir."""
    seen_calls: list[tuple[str, Path | None]] = []

    class StubBackend:
        """Backend double that records every exec call and reports success."""

        def exec(
            self,
            instance: Any,
            command: str,
            timeout_seconds: int,
            *,
            workdir: Path | None = None,
        ) -> vm_manager_module.VmExecResult:
            del instance, timeout_seconds
            seen_calls.append((command, workdir))
            return vm_manager_module.VmExecResult(
                stdout="",
                stderr="",
                exit_code=0,
                duration_ms=1,
            )

    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    # Override the capabilities so the manager sees boot and guest exec as supported.
    manager._runtime_capabilities = RuntimeCapabilities(  # noqa: SLF001
        supports_vm_boot=True,
        supports_guest_exec=True,
        supports_guest_network=False,
        reason=None,
    )
    manager._backend = StubBackend()  # type: ignore[assignment]  # noqa: SLF001

    instance = vm_manager_module.VmInstance(  # noqa: SLF001
        vm_id="vm-123",
        environment="debian:12-base",
        vcpu_count=1,
        mem_mib=512,
        ttl_seconds=600,
        created_at=time.time(),
        expires_at=time.time() + 600,
        workdir=tmp_path / "runtime",
        state="started",
    )

    result, execution_mode = manager._exec_instance(  # noqa: SLF001
        instance,
        command="echo hi",
        timeout_seconds=30,
        guest_cwd="/workspace",
    )

    assert result.exit_code == 0
    assert execution_mode == "unknown"
    # Inspect what the most recent backend call actually received.
    last_command, last_workdir = seen_calls[-1]
    assert "cd /workspace" in str(last_command)
    assert last_workdir is None
def test_status_workspace_marks_dead_backing_process_stopped(tmp_path: Path) -> None:
    """status_workspace reports "stopped" and persists a last_error for a bogus guest pid."""
    vms_root = tmp_path / "vms"
    manager = VmManager(
        backend_name="mock",
        base_dir=vms_root,
        network_manager=TapNetworkManager(enabled=False),
    )
    created = manager.create_workspace(environment="debian:12-base", allow_host_compat=True)
    workspace_id = str(created["workspace_id"])
    record_file = vms_root / "workspaces" / workspace_id / "workspace.json"

    # Make the record claim a vsock guest backed by pid 999999, which is
    # presumably not a live process on the test host.
    record = json.loads(record_file.read_text(encoding="utf-8"))
    record["metadata"]["execution_mode"] = "guest_vsock"
    record["firecracker_pid"] = 999999
    record_file.write_text(json.dumps(record, indent=2, sort_keys=True), encoding="utf-8")

    assert manager.status_workspace(workspace_id)["state"] == "stopped"

    # The failure reason must have been written back to disk.
    persisted = json.loads(record_file.read_text(encoding="utf-8"))
    assert "backing guest process" in str(persisted.get("last_error", ""))
def test_reap_expired_workspaces_removes_invalid_and_expired_records(tmp_path: Path) -> None:
    """Reaping deletes both a malformed workspace record and one whose expiry has passed."""
    workspaces_root = tmp_path / "vms" / "workspaces"
    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )

    # Record 1: a workspace.json holding a JSON list instead of an object.
    invalid_dir = workspaces_root / "invalid"
    invalid_dir.mkdir(parents=True)
    (invalid_dir / "workspace.json").write_text("[]", encoding="utf-8")

    # Record 2: a real workspace whose expiry is forced to the epoch.
    created = manager.create_workspace(environment="debian:12-base", allow_host_compat=True)
    workspace_id = str(created["workspace_id"])
    expired_dir = workspaces_root / workspace_id
    record_file = expired_dir / "workspace.json"
    record = json.loads(record_file.read_text(encoding="utf-8"))
    record["expires_at"] = 0.0
    record_file.write_text(json.dumps(record, indent=2, sort_keys=True), encoding="utf-8")

    with manager._lock:  # noqa: SLF001
        manager._reap_expired_workspaces_locked(time.time())  # noqa: SLF001

    assert not invalid_dir.exists()
    assert not expired_dir.exists()
def test_workspace_service_lifecycle_and_status_counts(tmp_path: Path) -> None:
    """Start, inspect and stop a workspace service, checking the counts reported on the way.

    The service command loops forever, so the workspace is deleted in a
    ``finally`` block: a failing assertion must not leave the background
    process running after the test. On the success path the call sequence is
    unchanged (stop the service, then delete the workspace).
    """
    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    workspace_id = str(
        manager.create_workspace(
            environment="debian:12-base",
            allow_host_compat=True,
        )["workspace_id"]
    )
    try:
        started = manager.start_service(
            workspace_id,
            "app",
            command=(
                "sh -lc 'printf \"service ready\\n\"; touch .ready; "
                "while true; do sleep 60; done'"
            ),
            readiness={"type": "file", "path": ".ready"},
        )
        assert started["state"] == "running"

        listed = manager.list_services(workspace_id)
        assert listed["count"] == 1
        assert listed["running_count"] == 1

        status = manager.status_service(workspace_id, "app")
        assert status["state"] == "running"
        assert status["ready_at"] is not None

        logs = manager.logs_service(workspace_id, "app")
        assert "service ready" in str(logs["stdout"])

        # The workspace-level status mirrors the per-service counts.
        workspace_status = manager.status_workspace(workspace_id)
        assert workspace_status["service_count"] == 1
        assert workspace_status["running_service_count"] == 1

        stopped = manager.stop_service(workspace_id, "app")
        assert stopped["state"] == "stopped"
        assert stopped["stop_reason"] in {"sigterm", "sigkill"}
    finally:
        # Runs on failure too. Presumably delete_workspace also stops a service
        # that is still running, as sibling tests rely on it - TODO confirm.
        deleted = manager.delete_workspace(workspace_id)
    assert deleted["deleted"] is True
def test_workspace_service_start_replaces_non_running_record(tmp_path: Path) -> None:
    """Starting a service again under a name whose record is "failed" yields a running one.

    The second command loops forever, so the workspace is deleted in a
    ``finally`` block rather than as the last statement: a failing assertion
    must not leave the background process running after the test.
    """
    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    workspace_id = str(
        manager.create_workspace(
            environment="debian:12-base",
            allow_host_compat=True,
        )["workspace_id"]
    )
    try:
        # First attempt exits with status 2 and never creates the readiness file.
        failed = manager.start_service(
            workspace_id,
            "app",
            command="sh -lc 'exit 2'",
            readiness={"type": "file", "path": ".ready"},
            ready_timeout_seconds=1,
            ready_interval_ms=50,
        )
        assert failed["state"] == "failed"

        # Same service name again: the failed record must not block the new start.
        started = manager.start_service(
            workspace_id,
            "app",
            command="sh -lc 'touch .ready; while true; do sleep 60; done'",
            readiness={"type": "file", "path": ".ready"},
        )
        assert started["state"] == "running"
    finally:
        # Presumably delete_workspace stops the still-running service, which is
        # what the original trailing call was used for - TODO confirm.
        manager.delete_workspace(workspace_id)
def test_workspace_service_supports_command_readiness_and_helper_probes(
    tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
    """Cover command readiness end to end, then the file/tcp/http host probes in isolation."""
    vms_root = tmp_path / "vms"
    manager = VmManager(
        backend_name="mock",
        base_dir=vms_root,
        network_manager=TapNetworkManager(enabled=False),
    )
    created = manager.create_workspace(environment="debian:12-base", allow_host_compat=True)
    workspace_id = str(created["workspace_id"])

    # --- command readiness through the public service API ---
    command_started = manager.start_service(
        workspace_id,
        "command-ready",
        command="sh -lc 'touch command.ready; while true; do sleep 60; done'",
        readiness={"type": "command", "command": "test -f command.ready"},
    )
    assert command_started["state"] == "running"

    listed = manager.list_services(workspace_id)
    assert listed["count"] == 1
    assert listed["running_count"] == 1

    status = manager.status_workspace(workspace_id)
    assert status["service_count"] == 1
    assert status["running_service_count"] == 1

    assert manager.stop_service(workspace_id, "command-ready")["state"] == "stopped"

    # --- host-side probe helper, one readiness type at a time ---
    workspace_dir = vms_root / "workspaces" / workspace_id / "workspace"

    def _ready(readiness: dict[str, Any]) -> Any:
        return vm_manager_module._service_ready_on_host(  # noqa: SLF001
            readiness=readiness,
            workspace_dir=workspace_dir,
            cwd=workspace_dir,
        )

    # file: the probe is given a /workspace/... path while the file lives in workspace_dir.
    (workspace_dir / "probe.ready").write_text("ok\n", encoding="utf-8")
    assert _ready({"type": "file", "path": "/workspace/probe.ready"})

    class StubSocket:
        """Socket double that checks the timeout and address the probe uses."""

        def __enter__(self) -> StubSocket:
            return self

        def __exit__(self, *args: object) -> None:
            del args

        def settimeout(self, timeout: int) -> None:
            assert timeout == 1

        def connect(self, address: tuple[str, int]) -> None:
            assert address == ("127.0.0.1", 8080)

    # tcp: no real connection is made; the stub stands in for socket.socket.
    monkeypatch.setattr("pyro_mcp.vm_manager.socket.socket", lambda *args: StubSocket())
    assert _ready({"type": "tcp", "address": "127.0.0.1:8080"})

    class StubResponse:
        """HTTP response double carrying a 204 status."""

        status = 204

        def __enter__(self) -> StubResponse:
            return self

        def __exit__(self, *args: object) -> None:
            del args

    def _urlopen(request: object, timeout: int) -> StubResponse:
        del request
        assert timeout == 2
        return StubResponse()

    # http: urlopen is replaced, so the probe sees the stub's status.
    monkeypatch.setattr("pyro_mcp.vm_manager.urllib.request.urlopen", _urlopen)
    assert _ready({"type": "http", "url": "http://127.0.0.1:8080/"})
def test_workspace_service_logs_tail_and_delete_cleanup(tmp_path: Path) -> None:
    """logs_service honours tail_lines, and deleting the workspace removes its services dir.

    The service command loops forever, so the workspace deletion sits in a
    ``finally`` block: a failing log assertion must not leave the background
    process running after the test. On the success path the call sequence is
    unchanged.
    """
    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    workspace_id = str(
        manager.create_workspace(
            environment="debian:12-base",
            allow_host_compat=True,
        )["workspace_id"]
    )
    services_dir = tmp_path / "vms" / "workspaces" / workspace_id / "services"
    try:
        manager.start_service(
            workspace_id,
            "logger",
            command=(
                "sh -lc 'printf \"one\\n\"; printf \"two\\n\"; "
                "touch .ready; while true; do sleep 60; done'"
            ),
            readiness={"type": "file", "path": ".ready"},
        )
        # Two lines were printed; asking for one returns the last and flags truncation.
        logs = manager.logs_service(workspace_id, "logger", tail_lines=1)
        assert logs["stdout"] == "two\n"
        assert logs["truncated"] is True
        assert services_dir.exists()
    finally:
        # Runs on failure too. Presumably delete_workspace also stops the
        # still-running service - TODO confirm against VmManager.
        deleted = manager.delete_workspace(workspace_id)
    assert deleted["deleted"] is True
    assert not services_dir.exists()
def test_workspace_status_stops_service_counts_when_workspace_is_stopped(tmp_path: Path) -> None:
    """status_workspace on a stopped workspace marks its running service as stopped."""
    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    created = manager.create_workspace(environment="debian:12-base", allow_host_compat=True)
    workspace_id = str(created["workspace_id"])
    manager.start_service(
        workspace_id,
        "app",
        command="sh -lc 'touch .ready; while true; do sleep 60; done'",
        readiness={"type": "file", "path": ".ready"},
    )

    workspace_root = tmp_path / "vms" / "workspaces" / workspace_id
    service_path = workspace_root / "services" / "app.json"
    # Remember the real pid up front so the process can be reaped no matter what.
    live_pid = int(json.loads(service_path.read_text(encoding="utf-8"))["pid"])
    try:
        # Flip the persisted workspace state to "stopped" behind the manager's back.
        workspace_path = workspace_root / "workspace.json"
        record = json.loads(workspace_path.read_text(encoding="utf-8"))
        record["state"] = "stopped"
        workspace_path.write_text(
            json.dumps(record, indent=2, sort_keys=True), encoding="utf-8"
        )

        status = manager.status_workspace(workspace_id)
        assert status["state"] == "stopped"
        assert status["service_count"] == 1
        assert status["running_service_count"] == 0

        # The service record on disk must have been updated as well.
        service_record = json.loads(service_path.read_text(encoding="utf-8"))
        assert service_record["state"] == "stopped"
        assert service_record["stop_reason"] == "workspace_stopped"
    finally:
        vm_manager_module._stop_process_group(live_pid)  # noqa: SLF001
def test_workspace_service_readiness_validation_helpers() -> None:
    """Check service-name and readiness normalization for accepted and rejected inputs."""
    normalize_name = vm_manager_module._normalize_workspace_service_name  # noqa: SLF001
    normalize_readiness = (
        vm_manager_module._normalize_workspace_service_readiness  # noqa: SLF001
    )

    # Service names.
    assert normalize_name("app-1") == "app-1"
    with pytest.raises(ValueError, match="service_name must not be empty"):
        normalize_name(" ")
    with pytest.raises(ValueError, match="service_name must match"):
        normalize_name("bad name")

    # Accepted readiness specs: (input, expected normalized form). Only the
    # relative file path is rewritten; the other types come back unchanged.
    accepted: list[tuple[dict[str, Any], dict[str, Any]]] = [
        (
            {"type": "file", "path": "subdir/.ready"},
            {"type": "file", "path": "/workspace/subdir/.ready"},
        ),
        (
            {"type": "tcp", "address": "127.0.0.1:8080"},
            {"type": "tcp", "address": "127.0.0.1:8080"},
        ),
        (
            {"type": "http", "url": "http://127.0.0.1:8080/"},
            {"type": "http", "url": "http://127.0.0.1:8080/"},
        ),
        (
            {"type": "command", "command": "test -f .ready"},
            {"type": "command", "command": "test -f .ready"},
        ),
    ]
    for spec, expected in accepted:
        assert normalize_readiness(spec) == expected

    # Rejected readiness specs: (input, fragment of the expected error message).
    rejected: list[tuple[dict[str, Any], str]] = [
        ({"type": "bogus"}, "one of: file, tcp, http, command"),
        ({"type": "file"}, "required for file readiness"),
        ({"type": "tcp", "address": "127.0.0.1"}, "HOST:PORT format"),
        ({"type": "http"}, "required for http readiness"),
        ({"type": "command"}, "required for command readiness"),
    ]
    for spec, message in rejected:
        with pytest.raises(ValueError, match=message):
            normalize_readiness(spec)
def test_workspace_service_text_and_exit_code_helpers(tmp_path: Path) -> None:
    """Cover _read_service_exit_code and _tail_text for missing, empty and populated files."""
    read_exit_code = vm_manager_module._read_service_exit_code  # noqa: SLF001
    tail_text = vm_manager_module._tail_text  # noqa: SLF001

    # Exit code: missing file and empty file both give None; a number is parsed.
    status_file = tmp_path / "service.status"
    assert read_exit_code(status_file) is None
    status_file.write_text("", encoding="utf-8")
    assert read_exit_code(status_file) is None
    status_file.write_text("7\n", encoding="utf-8")
    assert read_exit_code(status_file) == 7

    # Tail: results are (text, truncated) pairs.
    log_file = tmp_path / "service.log"
    assert tail_text(log_file, tail_lines=10) == ("", False)

    full_text = "one\ntwo\nthree\n"
    log_file.write_text(full_text, encoding="utf-8")
    # No limit, or a limit above the line count, returns everything untruncated.
    assert tail_text(log_file, tail_lines=None) == (full_text, False)
    assert tail_text(log_file, tail_lines=5) == (full_text, False)
    # A limit below the line count keeps only the end and reports truncation.
    assert tail_text(log_file, tail_lines=1) == ("three\n", True)
def test_workspace_service_process_group_helpers(monkeypatch: pytest.MonkeyPatch) -> None:
    """_stop_process_group handles a missing group and escalates from SIGTERM to SIGKILL."""

    def _missing(_pid: int, _signal: int) -> None:
        raise ProcessLookupError()

    # Case 1: killpg reports that the group does not exist.
    monkeypatch.setattr("pyro_mcp.vm_manager.os.killpg", _missing)
    assert vm_manager_module._stop_process_group(123) == (False, False)  # noqa: SLF001

    # Case 2: scripted clock and liveness values; every signal sent is recorded.
    sent_signals: list[int] = []
    clock = iter([0.0, 0.0, 5.0, 5.0, 10.0])
    alive = iter([True, True, False])

    monkeypatch.setattr(
        "pyro_mcp.vm_manager.os.killpg",
        lambda _pid, signum: sent_signals.append(signum),
    )
    monkeypatch.setattr("pyro_mcp.vm_manager.time.monotonic", lambda: next(clock))
    monkeypatch.setattr("pyro_mcp.vm_manager.time.sleep", lambda _seconds: None)
    monkeypatch.setattr("pyro_mcp.vm_manager._pid_is_running", lambda _pid: next(alive))

    outcome = vm_manager_module._stop_process_group(456, wait_seconds=5)  # noqa: SLF001
    stopped, killed = outcome
    assert (stopped, killed) == (True, True)
    assert sent_signals == [signal.SIGTERM, signal.SIGKILL]
def test_workspace_service_probe_and_refresh_helpers(
    tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
    """Cover the probe-command exit code, record refresh and local stop helpers."""
    # The probe helper hands back the command's exit status.
    probe_exit = vm_manager_module._run_service_probe_command(tmp_path, "exit 3")  # noqa: SLF001
    assert probe_exit == 3

    # A status file holding "9" sits where the refresh helper is given its services_dir.
    services_dir = tmp_path / "services"
    services_dir.mkdir()
    status_path = services_dir / "app.status"
    status_path.write_text("9\n", encoding="utf-8")

    running = vm_manager_module.WorkspaceServiceRecord(  # noqa: SLF001
        workspace_id="workspace-1",
        service_name="app",
        command="sleep 60",
        cwd="/workspace",
        state="running",
        started_at=time.time(),
        readiness=None,
        ready_at=None,
        ended_at=None,
        exit_code=None,
        pid=1234,
        execution_mode="host_compat",
        stop_reason=None,
    )

    # With the pid reported dead, the record becomes "exited" with exit code 9.
    monkeypatch.setattr("pyro_mcp.vm_manager._pid_is_running", lambda _pid: False)
    refreshed = vm_manager_module._refresh_local_service_record(  # noqa: SLF001
        running,
        services_dir=services_dir,
    )
    assert (refreshed.state, refreshed.exit_code) == ("exited", 9)

    # With the stop helper stubbed to return (True, False), the reason recorded is "sigterm".
    monkeypatch.setattr(
        "pyro_mcp.vm_manager._stop_process_group",
        lambda _pid: (True, False),
    )
    stopped = vm_manager_module._stop_local_service(  # noqa: SLF001
        refreshed,
        services_dir=services_dir,
    )
    assert (stopped.state, stopped.stop_reason) == ("stopped", "sigterm")