pyro-mcp/tests/test_vm_manager.py
Thales Maciel fc72fcd3a1 Add guest-only workspace secrets
Add explicit workspace secrets across the CLI, SDK, and MCP, with create-time secret definitions and per-call secret-to-env mapping for exec, shell open, and service start. Persist only safe secret metadata in workspace records, materialize secret files under /run/pyro-secrets, and redact secret values from exec output, shell reads, service logs, and surfaced errors.

Fix the remaining real-guest shell gap by shipping bundled guest init alongside the guest agent and patching both into guest-backed workspace rootfs images before boot. The new init mounts devpts so PTY shells work on Firecracker guests, while reset continues to recreate the sandbox and re-materialize secrets from stored task-local secret material.

Validation: uv lock; UV_CACHE_DIR=.uv-cache make check; UV_CACHE_DIR=.uv-cache make dist-check; and a real guest-backed Firecracker smoke covering workspace create with secrets, secret-backed exec, shell, service, reset, and delete.
2026-03-12 15:43:34 -03:00

1924 lines
67 KiB
Python

from __future__ import annotations
import io
import json
import os
import signal
import subprocess
import tarfile
import time
from pathlib import Path
from typing import Any, cast
import pytest
import pyro_mcp.vm_manager as vm_manager_module
from pyro_mcp.runtime import RuntimeCapabilities, resolve_runtime_paths
from pyro_mcp.vm_manager import VmManager
from pyro_mcp.vm_network import NetworkConfig, TapNetworkManager
def test_vm_manager_lifecycle_and_auto_cleanup(tmp_path: Path) -> None:
    """Create, start, and exec a VM, then expect its status lookup to fail."""
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    create_payload = vm_manager.create_vm(
        environment="debian:12",
        vcpu_count=1,
        mem_mib=512,
        ttl_seconds=600,
        allow_host_compat=True,
    )
    vm_id = str(create_payload["vm_id"])

    start_payload = vm_manager.start_vm(vm_id)
    assert start_payload["state"] == "started"

    exec_payload = vm_manager.exec_vm(
        vm_id,
        command="printf 'git version 2.43.0\\n'",
        timeout_seconds=30,
    )
    assert exec_payload["exit_code"] == 0
    assert exec_payload["execution_mode"] == "host_compat"
    assert "git version" in str(exec_payload["stdout"])

    # After the exec the manager must no longer know this VM id.
    with pytest.raises(ValueError, match="does not exist"):
        vm_manager.status_vm(vm_id)
def test_vm_manager_exec_timeout(tmp_path: Path) -> None:
    """A command that outlives its timeout yields exit code 124 and a timeout message."""
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    create_payload = vm_manager.create_vm(
        environment="debian:12-base",
        vcpu_count=1,
        mem_mib=512,
        ttl_seconds=600,
        allow_host_compat=True,
    )
    vm_id = str(create_payload["vm_id"])
    vm_manager.start_vm(vm_id)

    # The command sleeps for longer than the one-second timeout allows.
    outcome = vm_manager.exec_vm(vm_id, command="sleep 2", timeout_seconds=1)

    assert outcome["exit_code"] == 124
    assert "timed out" in str(outcome["stderr"])
def test_vm_manager_stop_and_delete(tmp_path: Path) -> None:
    """Stopping a started VM reports ``stopped``; deleting it reports ``deleted``."""
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    create_payload = vm_manager.create_vm(
        environment="debian:12-base",
        vcpu_count=1,
        mem_mib=512,
        ttl_seconds=600,
        allow_host_compat=True,
    )
    vm_id = str(create_payload["vm_id"])
    vm_manager.start_vm(vm_id)

    stop_payload = vm_manager.stop_vm(vm_id)
    assert stop_payload["state"] == "stopped"

    delete_payload = vm_manager.delete_vm(vm_id)
    assert delete_payload["deleted"] is True
def test_vm_manager_reaps_expired(tmp_path: Path) -> None:
    """An expired, never-started VM is counted by ``reap_expired`` and then unknown."""
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    # Lower the minimum so a one-second TTL passes validation.
    vm_manager.MIN_TTL_SECONDS = 1
    create_payload = vm_manager.create_vm(
        environment="debian:12-base",
        vcpu_count=1,
        mem_mib=512,
        ttl_seconds=1,
        allow_host_compat=True,
    )
    vm_id = str(create_payload["vm_id"])

    # Force expiry by moving the deadline to the epoch.
    vm_manager._instances[vm_id].expires_at = 0.0  # noqa: SLF001

    reap_payload = vm_manager.reap_expired()
    assert reap_payload["count"] == 1
    with pytest.raises(ValueError):
        vm_manager.status_vm(vm_id)
def test_vm_manager_reaps_started_vm(tmp_path: Path) -> None:
    """An expired VM that was already started is still counted by ``reap_expired``."""
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    # Lower the minimum so a one-second TTL passes validation.
    vm_manager.MIN_TTL_SECONDS = 1
    create_payload = vm_manager.create_vm(
        environment="debian:12-base",
        vcpu_count=1,
        mem_mib=512,
        ttl_seconds=1,
        allow_host_compat=True,
    )
    vm_id = str(create_payload["vm_id"])
    vm_manager.start_vm(vm_id)

    # Force expiry by moving the deadline to the epoch.
    started_instance = vm_manager._instances[vm_id]  # noqa: SLF001
    started_instance.expires_at = 0.0

    reap_payload = vm_manager.reap_expired()
    assert reap_payload["count"] == 1
@pytest.mark.parametrize(
    ("kwargs", "msg"),
    [
        ({"vcpu_count": 0, "mem_mib": 512, "ttl_seconds": 600}, "vcpu_count must be between"),
        ({"vcpu_count": 1, "mem_mib": 64, "ttl_seconds": 600}, "mem_mib must be between"),
        ({"vcpu_count": 1, "mem_mib": 512, "ttl_seconds": 30}, "ttl_seconds must be between"),
    ],
)
def test_vm_manager_validates_limits(tmp_path: Path, kwargs: dict[str, Any], msg: str) -> None:
    """Each parametrized resource setting is rejected by ``create_vm`` with ``msg``."""
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )

    with pytest.raises(ValueError, match=msg):
        vm_manager.create_vm(environment="debian:12-base", **kwargs)
def test_vm_manager_max_active_limit(tmp_path: Path) -> None:
    """With ``max_active_vms=1`` the second ``create_vm`` call raises ``RuntimeError``."""
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        max_active_vms=1,
        network_manager=TapNetworkManager(enabled=False),
    )

    def _create_default_vm() -> Any:
        """Issue the same create request used for both the first and second VM."""
        return vm_manager.create_vm(
            environment="debian:12-base",
            vcpu_count=1,
            mem_mib=512,
            ttl_seconds=600,
            allow_host_compat=True,
        )

    # The first VM fills the only available slot.
    _create_default_vm()

    with pytest.raises(RuntimeError, match="max active VMs reached"):
        _create_default_vm()
def test_vm_manager_state_validation(tmp_path: Path) -> None:
    """Exec before start, a zero timeout, and a second start are each rejected."""
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    create_payload = vm_manager.create_vm(
        environment="debian:12-base",
        vcpu_count=1,
        mem_mib=512,
        ttl_seconds=600,
        allow_host_compat=True,
    )
    vm_id = str(create_payload["vm_id"])

    # The VM has been created but not started yet.
    with pytest.raises(RuntimeError, match="must be in 'started' state"):
        vm_manager.exec_vm(vm_id, command="echo hi", timeout_seconds=30)
    with pytest.raises(ValueError, match="must be positive"):
        vm_manager.exec_vm(vm_id, command="echo hi", timeout_seconds=0)

    vm_manager.start_vm(vm_id)

    # A second start on the same VM is refused.
    with pytest.raises(RuntimeError, match="cannot be started from state"):
        vm_manager.start_vm(vm_id)
def test_vm_manager_status_expired_raises(tmp_path: Path) -> None:
    """``status_vm`` on an expired VM raises the automatic-deletion ``RuntimeError``."""
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    # Lower the minimum so a one-second TTL passes validation.
    vm_manager.MIN_TTL_SECONDS = 1
    create_payload = vm_manager.create_vm(
        environment="debian:12-base",
        vcpu_count=1,
        mem_mib=512,
        ttl_seconds=1,
        allow_host_compat=True,
    )
    vm_id = str(create_payload["vm_id"])

    # Force expiry by moving the deadline to the epoch.
    expired_instance = vm_manager._instances[vm_id]  # noqa: SLF001
    expired_instance.expires_at = 0.0

    with pytest.raises(RuntimeError, match="expired and was automatically deleted"):
        vm_manager.status_vm(vm_id)
def test_vm_manager_invalid_backend(tmp_path: Path) -> None:
    """Constructing a manager with an unknown backend name raises ``ValueError``."""
    vm_root = tmp_path / "vms"
    disabled_network = TapNetworkManager(enabled=False)

    with pytest.raises(ValueError, match="invalid backend"):
        VmManager(backend_name="nope", base_dir=vm_root, network_manager=disabled_network)
def test_vm_manager_network_info(tmp_path: Path) -> None:
    """Status and network info both report networking as disabled for a new VM."""
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    create_payload = vm_manager.create_vm(
        environment="debian:12-base",
        vcpu_count=1,
        mem_mib=512,
        ttl_seconds=600,
        allow_host_compat=True,
    )
    vm_id = str(create_payload["vm_id"])

    status_payload = vm_manager.status_vm(vm_id)
    network_payload = vm_manager.network_info_vm(vm_id)

    assert status_payload["network_enabled"] is False
    assert status_payload["guest_ip"] is None
    assert network_payload["network_enabled"] is False
def test_vm_manager_run_vm(tmp_path: Path) -> None:
    """``run_vm`` executes a one-shot command and returns its exit code and stdout."""
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )

    run_payload = vm_manager.run_vm(
        environment="debian:12-base",
        command="printf 'ok\\n'",
        vcpu_count=1,
        mem_mib=512,
        timeout_seconds=30,
        ttl_seconds=600,
        network=False,
        allow_host_compat=True,
    )

    exit_code = int(run_payload["exit_code"])
    assert exit_code == 0
    stdout_text = str(run_payload["stdout"])
    assert stdout_text == "ok\n"
def test_workspace_lifecycle_and_logs(tmp_path: Path) -> None:
    """Run two commands in a workspace, inspect status and logs, then delete it."""
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    create_payload = vm_manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
    )
    workspace_id = str(create_payload["workspace_id"])
    assert create_payload["state"] == "started"
    assert create_payload["workspace_path"] == "/workspace"

    # The second command reads back the file written by the first.
    write_result = vm_manager.exec_workspace(
        workspace_id,
        command="printf 'hello\\n' > note.txt",
        timeout_seconds=30,
    )
    read_result = vm_manager.exec_workspace(
        workspace_id,
        command="cat note.txt",
        timeout_seconds=30,
    )
    assert write_result["exit_code"] == 0
    assert read_result["stdout"] == "hello\n"

    status_payload = vm_manager.status_workspace(workspace_id)
    assert status_payload["command_count"] == 2
    assert status_payload["last_command"] is not None

    log_payload = vm_manager.logs_workspace(workspace_id)
    assert log_payload["count"] == 2
    log_entries = log_payload["entries"]
    assert isinstance(log_entries, list)
    assert log_entries[1]["stdout"] == "hello\n"

    delete_payload = vm_manager.delete_workspace(workspace_id)
    assert delete_payload["deleted"] is True
    with pytest.raises(ValueError, match="does not exist"):
        vm_manager.status_workspace(workspace_id)
def test_workspace_create_seeds_directory_source_into_workspace(tmp_path: Path) -> None:
    """A directory ``seed_path`` is reported as a directory seed and readable inside."""
    seed_dir = tmp_path / "seed"
    seed_dir.mkdir()
    (seed_dir / "note.txt").write_text("hello\n", encoding="utf-8")
    expected_seed_path = str(seed_dir.resolve())

    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    create_payload = vm_manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
        seed_path=seed_dir,
    )
    workspace_id = str(create_payload["workspace_id"])

    seed_info = create_payload["workspace_seed"]
    assert seed_info["mode"] == "directory"
    assert seed_info["seed_path"] == expected_seed_path

    cat_result = vm_manager.exec_workspace(
        workspace_id,
        command="cat note.txt",
        timeout_seconds=30,
    )
    assert cat_result["stdout"] == "hello\n"

    # The seed metadata is also surfaced by later status lookups.
    status_payload = vm_manager.status_workspace(workspace_id)
    assert status_payload["workspace_seed"]["mode"] == "directory"
    assert status_payload["workspace_seed"]["seed_path"] == expected_seed_path
def test_workspace_create_seeds_tar_archive_into_workspace(tmp_path: Path) -> None:
    """A gzip tarball ``seed_path`` is reported as ``tar_archive`` and readable inside."""
    seed_archive = tmp_path / "seed.tgz"
    staging_dir = tmp_path / "src"
    staging_dir.mkdir()
    staged_note = staging_dir / "note.txt"
    staged_note.write_text("archive\n", encoding="utf-8")
    # Store the note at the archive root rather than under its staging path.
    with tarfile.open(seed_archive, "w:gz") as tar:
        tar.add(staged_note, arcname="note.txt")

    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    create_payload = vm_manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
        seed_path=seed_archive,
    )
    workspace_id = str(create_payload["workspace_id"])
    assert create_payload["workspace_seed"]["mode"] == "tar_archive"

    cat_result = vm_manager.exec_workspace(
        workspace_id,
        command="cat note.txt",
        timeout_seconds=30,
    )
    assert cat_result["stdout"] == "archive\n"
def test_workspace_sync_push_updates_started_workspace(tmp_path: Path) -> None:
    """Pushing a directory into ``subdir`` makes its files readable in the workspace."""
    seed_dir = tmp_path / "seed"
    seed_dir.mkdir()
    (seed_dir / "note.txt").write_text("hello\n", encoding="utf-8")
    push_dir = tmp_path / "update"
    push_dir.mkdir()
    (push_dir / "more.txt").write_text("more\n", encoding="utf-8")

    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    create_payload = vm_manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
        seed_path=seed_dir,
    )
    workspace_id = str(create_payload["workspace_id"])

    sync_payload = vm_manager.push_workspace_sync(
        workspace_id,
        source_path=push_dir,
        dest="subdir",
    )
    sync_info = sync_payload["workspace_sync"]
    assert sync_info["mode"] == "directory"
    assert sync_info["destination"] == "/workspace/subdir"

    cat_result = vm_manager.exec_workspace(
        workspace_id,
        command="cat subdir/more.txt",
        timeout_seconds=30,
    )
    assert cat_result["stdout"] == "more\n"

    # Only the exec above is expected in the command count; the seed mode is unchanged.
    status_payload = vm_manager.status_workspace(workspace_id)
    assert status_payload["command_count"] == 1
    assert status_payload["workspace_seed"]["mode"] == "directory"
def test_workspace_sync_push_requires_started_workspace(tmp_path: Path) -> None:
    """A sync push is refused when the persisted workspace state is ``stopped``."""
    seed_dir = tmp_path / "seed"
    seed_dir.mkdir()
    (seed_dir / "note.txt").write_text("hello\n", encoding="utf-8")
    push_dir = tmp_path / "update"
    push_dir.mkdir()
    (push_dir / "more.txt").write_text("more\n", encoding="utf-8")

    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    create_payload = vm_manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
        seed_path=seed_dir,
    )
    workspace_id = str(create_payload["workspace_id"])

    # Rewrite the on-disk workspace record so it claims to be stopped.
    record_path = tmp_path / "vms" / "workspaces" / workspace_id / "workspace.json"
    record = json.loads(record_path.read_text(encoding="utf-8"))
    record["state"] = "stopped"
    record_path.write_text(json.dumps(record, indent=2, sort_keys=True), encoding="utf-8")

    expected_error = "must be in 'started' state before workspace_sync_push"
    with pytest.raises(RuntimeError, match=expected_error):
        vm_manager.push_workspace_sync(workspace_id, source_path=push_dir)
def test_workspace_sync_push_rejects_destination_outside_workspace(tmp_path: Path) -> None:
    """A ``dest`` that climbs out of ``/workspace`` is rejected with ``ValueError``."""
    push_dir = tmp_path / "seed"
    push_dir.mkdir()
    (push_dir / "note.txt").write_text("hello\n", encoding="utf-8")

    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    create_payload = vm_manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
    )
    workspace_id = str(create_payload["workspace_id"])

    with pytest.raises(ValueError, match="workspace destination must stay inside /workspace"):
        vm_manager.push_workspace_sync(workspace_id, source_path=push_dir, dest="../escape")
def test_workspace_diff_and_export_round_trip(tmp_path: Path) -> None:
    """A sync push shows up in the workspace diff and in an exported file."""
    seed_dir = tmp_path / "seed"
    seed_dir.mkdir()
    (seed_dir / "note.txt").write_text("hello\n", encoding="utf-8")
    push_dir = tmp_path / "update"
    push_dir.mkdir()
    (push_dir / "note.txt").write_text("hello from sync\n", encoding="utf-8")

    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    create_payload = vm_manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
        seed_path=seed_dir,
    )
    workspace_id = str(create_payload["workspace_id"])
    vm_manager.push_workspace_sync(workspace_id, source_path=push_dir)

    # The diff reports the pushed file as a single modified text entry.
    diff = vm_manager.diff_workspace(workspace_id)
    assert diff["workspace_id"] == workspace_id
    assert diff["changed"] is True
    assert diff["summary"]["modified"] == 1
    assert diff["summary"]["text_patched"] == 1
    patch_text = diff["patch"]
    assert "-hello\n" in patch_text
    assert "+hello from sync\n" in patch_text

    exported_file = tmp_path / "exported-note.txt"
    export = vm_manager.export_workspace(
        workspace_id,
        path="note.txt",
        output_path=exported_file,
    )
    assert export["workspace_id"] == workspace_id
    assert export["artifact_type"] == "file"
    assert exported_file.read_text(encoding="utf-8") == "hello from sync\n"

    # No exec was issued, so the command count and log count are both zero.
    status_payload = vm_manager.status_workspace(workspace_id)
    log_payload = vm_manager.logs_workspace(workspace_id)
    assert status_payload["command_count"] == 0
    assert log_payload["count"] == 0
def test_workspace_export_directory_uses_exact_output_path(tmp_path: Path) -> None:
    """An exported directory's contents land directly in ``output_path``."""
    seed_dir = tmp_path / "seed"
    seed_src_dir = seed_dir / "src"
    seed_src_dir.mkdir(parents=True)
    (seed_src_dir / "note.txt").write_text("hello\n", encoding="utf-8")

    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    create_payload = vm_manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
        seed_path=seed_dir,
    )
    workspace_id = str(create_payload["workspace_id"])

    export_dir = tmp_path / "exported-src"
    export = vm_manager.export_workspace(workspace_id, path="src", output_path=export_dir)

    assert export["artifact_type"] == "directory"
    exported_note = export_dir / "note.txt"
    assert exported_note.read_text(encoding="utf-8") == "hello\n"
    # The exported directory must not be nested under its own name.
    assert not (export_dir / "src").exists()
def test_workspace_diff_requires_create_time_baseline(tmp_path: Path) -> None:
    """``diff_workspace`` raises once the baseline tarball has been removed."""
    vm_root = tmp_path / "vms"
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=vm_root,
        network_manager=TapNetworkManager(enabled=False),
    )
    create_payload = vm_manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
    )
    workspace_id = str(create_payload["workspace_id"])

    # Delete the baseline archive from the workspace directory.
    workspace_root = vm_root / "workspaces" / workspace_id
    (workspace_root / "baseline" / "workspace.tar").unlink()

    with pytest.raises(RuntimeError, match="require[s]? a baseline snapshot"):
        vm_manager.diff_workspace(workspace_id)
def test_workspace_snapshots_and_reset_round_trip(tmp_path: Path) -> None:
    """Reset to a named snapshot, then to the baseline, then delete the snapshot."""
    seed_dir = tmp_path / "seed"
    seed_dir.mkdir()
    (seed_dir / "note.txt").write_text("seed\n", encoding="utf-8")

    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    create_payload = vm_manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
        seed_path=seed_dir,
    )
    workspace_id = str(create_payload["workspace_id"])

    # Change the note, then capture that state as the "checkpoint" snapshot.
    vm_manager.exec_workspace(
        workspace_id,
        command="printf 'checkpoint\\n' > note.txt",
        timeout_seconds=30,
    )
    snapshot_payload = vm_manager.create_snapshot(workspace_id, "checkpoint")
    assert snapshot_payload["snapshot"]["snapshot_name"] == "checkpoint"

    listing = vm_manager.list_snapshots(workspace_id)
    assert listing["count"] == 2
    snapshot_names = [item["snapshot_name"] for item in listing["snapshots"]]
    assert snapshot_names == ["baseline", "checkpoint"]

    # Change the note again and start a service before resetting.
    vm_manager.exec_workspace(
        workspace_id,
        command="printf 'after\\n' > note.txt",
        timeout_seconds=30,
    )
    vm_manager.start_service(
        workspace_id,
        "app",
        command="sh -lc 'touch .ready; while true; do sleep 60; done'",
        readiness={"type": "file", "path": ".ready"},
    )

    # Resetting to the checkpoint clears the command and service counters.
    checkpoint_reset = vm_manager.reset_workspace(workspace_id, snapshot="checkpoint")
    assert checkpoint_reset["workspace_reset"]["snapshot_name"] == "checkpoint"
    assert checkpoint_reset["reset_count"] == 1
    assert checkpoint_reset["last_command"] is None
    assert checkpoint_reset["command_count"] == 0
    assert checkpoint_reset["service_count"] == 0
    assert checkpoint_reset["running_service_count"] == 0

    checkpoint_cat = vm_manager.exec_workspace(
        workspace_id,
        command="cat note.txt",
        timeout_seconds=30,
    )
    assert checkpoint_cat["stdout"] == "checkpoint\n"
    checkpoint_logs = vm_manager.logs_workspace(workspace_id)
    assert checkpoint_logs["count"] == 1

    # Resetting without a snapshot name restores the baseline.
    baseline_reset = vm_manager.reset_workspace(workspace_id)
    assert baseline_reset["workspace_reset"]["snapshot_name"] == "baseline"
    assert baseline_reset["reset_count"] == 2
    assert baseline_reset["command_count"] == 0
    assert baseline_reset["service_count"] == 0
    assert vm_manager.logs_workspace(workspace_id)["count"] == 0

    baseline_cat = vm_manager.exec_workspace(
        workspace_id,
        command="cat note.txt",
        timeout_seconds=30,
    )
    assert baseline_cat["stdout"] == "seed\n"
    baseline_diff = vm_manager.diff_workspace(workspace_id)
    assert baseline_diff["changed"] is False

    delete_payload = vm_manager.delete_snapshot(workspace_id, "checkpoint")
    assert delete_payload["deleted"] is True
    remaining = vm_manager.list_snapshots(workspace_id)
    remaining_names = [item["snapshot_name"] for item in remaining["snapshots"]]
    assert remaining_names == ["baseline"]
def test_workspace_snapshot_and_reset_require_baseline(tmp_path: Path) -> None:
    """Snapshot list/create/delete and reset all raise once the baseline is missing."""
    vm_root = tmp_path / "vms"
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=vm_root,
        network_manager=TapNetworkManager(enabled=False),
    )
    create_payload = vm_manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
    )
    workspace_id = str(create_payload["workspace_id"])

    # Delete the baseline archive from the workspace directory.
    workspace_root = vm_root / "workspaces" / workspace_id
    (workspace_root / "baseline" / "workspace.tar").unlink()

    # Each operation below is expected to fail with the same baseline error.
    baseline_dependent_calls = (
        lambda: vm_manager.list_snapshots(workspace_id),
        lambda: vm_manager.create_snapshot(workspace_id, "checkpoint"),
        lambda: vm_manager.delete_snapshot(workspace_id, "checkpoint"),
        lambda: vm_manager.reset_workspace(workspace_id),
    )
    for invoke in baseline_dependent_calls:
        with pytest.raises(RuntimeError, match="require[s]? a baseline snapshot"):
            invoke()
def test_workspace_delete_baseline_snapshot_is_rejected(tmp_path: Path) -> None:
    """Deleting the snapshot named ``baseline`` raises ``ValueError``."""
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    create_payload = vm_manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
    )
    workspace_id = str(create_payload["workspace_id"])

    with pytest.raises(ValueError, match="cannot delete the baseline snapshot"):
        vm_manager.delete_snapshot(workspace_id, "baseline")
def test_workspace_reset_recreates_stopped_workspace(tmp_path: Path) -> None:
    """Resetting a workspace recorded as stopped returns it to ``started``."""
    seed_dir = tmp_path / "seed"
    seed_dir.mkdir()
    (seed_dir / "note.txt").write_text("seed\n", encoding="utf-8")

    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    create_payload = vm_manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
        seed_path=seed_dir,
    )
    workspace_id = str(create_payload["workspace_id"])

    # Persist a stopped record with no Firecracker pid, under the manager lock.
    with vm_manager._lock:  # noqa: SLF001
        record = vm_manager._load_workspace_locked(workspace_id)  # noqa: SLF001
        record.state = "stopped"
        record.firecracker_pid = None
        vm_manager._save_workspace_locked(record)  # noqa: SLF001

    reset = vm_manager.reset_workspace(workspace_id)
    assert reset["state"] == "started"
    assert reset["workspace_reset"]["snapshot_name"] == "baseline"

    cat_result = vm_manager.exec_workspace(
        workspace_id,
        command="cat note.txt",
        timeout_seconds=30,
    )
    assert cat_result["stdout"] == "seed\n"
def test_workspace_reset_failure_leaves_workspace_stopped(tmp_path: Path) -> None:
    """A failing archive import during reset leaves a stopped, un-reset workspace."""
    seed_dir = tmp_path / "seed"
    seed_dir.mkdir()
    (seed_dir / "note.txt").write_text("seed\n", encoding="utf-8")

    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    create_payload = vm_manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
        seed_path=seed_dir,
    )
    workspace_id = str(create_payload["workspace_id"])
    vm_manager.create_snapshot(workspace_id, "checkpoint")

    def _boom(*args: Any, **kwargs: Any) -> dict[str, Any]:
        """Stand in for the backend's ``import_archive`` and always fail."""
        del args, kwargs
        raise RuntimeError("boom")

    # Swap the backend's import hook so the reset fails.
    vm_manager._backend.import_archive = _boom  # type: ignore[method-assign]  # noqa: SLF001

    with pytest.raises(RuntimeError, match="boom"):
        vm_manager.reset_workspace(workspace_id, snapshot="checkpoint")

    with vm_manager._lock:  # noqa: SLF001
        record = vm_manager._load_workspace_locked(workspace_id)  # noqa: SLF001
    assert record.state == "stopped"
    assert record.firecracker_pid is None
    assert record.reset_count == 0

    # Both snapshots are still listed after the failed reset.
    listing = vm_manager.list_snapshots(workspace_id)
    snapshot_names = [item["snapshot_name"] for item in listing["snapshots"]]
    assert snapshot_names == ["baseline", "checkpoint"]
def test_workspace_export_helpers_preserve_directory_symlinks(tmp_path: Path) -> None:
    """Archiving then extracting a directory keeps files, symlinks, and empty dirs."""
    source_root = tmp_path / "workspace"
    source_root.mkdir()
    (source_root / "note.txt").write_text("hello\n", encoding="utf-8")
    os.symlink("note.txt", source_root / "note-link")
    (source_root / "empty-dir").mkdir()

    export_archive = tmp_path / "workspace-export.tar"
    prepared = vm_manager_module._prepare_workspace_export_archive(  # noqa: SLF001
        workspace_dir=source_root,
        workspace_path=".",
        archive_path=export_archive,
    )
    assert prepared.artifact_type == "directory"

    target_root = tmp_path / "output"
    extraction = vm_manager_module._extract_workspace_export_archive(  # noqa: SLF001
        export_archive,
        output_path=target_root,
        artifact_type="directory",
    )
    assert extraction["artifact_type"] == "directory"
    assert (target_root / "note.txt").read_text(encoding="utf-8") == "hello\n"

    # The link must come back as a symlink with its original relative target.
    extracted_link = target_root / "note-link"
    assert extracted_link.is_symlink()
    assert os.readlink(extracted_link) == "note.txt"
    assert (target_root / "empty-dir").is_dir()
def test_workspace_export_helpers_validate_missing_path_and_existing_output(tmp_path: Path) -> None:
    """The helpers reject a missing workspace path and an already-existing output path."""
    source_root = tmp_path / "workspace"
    source_root.mkdir()
    (source_root / "note.txt").write_text("hello\n", encoding="utf-8")

    # Preparing an archive for a path that is not in the workspace must fail.
    with pytest.raises(RuntimeError, match="workspace path does not exist"):
        vm_manager_module._prepare_workspace_export_archive(  # noqa: SLF001
            workspace_dir=source_root,
            workspace_path="missing.txt",
            archive_path=tmp_path / "missing.tar",
        )

    note_archive = tmp_path / "note-export.tar"
    prepared = vm_manager_module._prepare_workspace_export_archive(  # noqa: SLF001
        workspace_dir=source_root,
        workspace_path="note.txt",
        archive_path=note_archive,
    )

    # Create the destination first so the extraction finds it already present.
    occupied_path = tmp_path / "note.txt"
    occupied_path.write_text("already here\n", encoding="utf-8")
    with pytest.raises(RuntimeError, match="output_path already exists"):
        vm_manager_module._extract_workspace_export_archive(  # noqa: SLF001
            note_archive,
            output_path=occupied_path,
            artifact_type=prepared.artifact_type,
        )
def test_diff_workspace_trees_reports_empty_binary_symlink_and_type_changes(tmp_path: Path) -> None:
    """Diff two trees covering text, binary, symlink, type-change, and empty-dir cases."""
    old_root = tmp_path / "baseline"
    new_root = tmp_path / "current"
    old_root.mkdir()
    new_root.mkdir()

    # Text files: one modified, one deleted, one added.
    text_fixtures = (
        (old_root, "modified.txt", "before\n"),
        (new_root, "modified.txt", "after\n"),
        (old_root, "deleted.txt", "gone\n"),
        (new_root, "added.txt", "new\n"),
    )
    for root, file_name, content in text_fixtures:
        (root / file_name).write_text(content, encoding="utf-8")

    # A binary file whose bytes differ between the trees.
    (old_root / "binary.bin").write_bytes(b"\x00before")
    (new_root / "binary.bin").write_bytes(b"\x00after")

    # A symlink whose target differs between the trees.
    os.symlink("link-target-old.txt", old_root / "link")
    os.symlink("link-target-new.txt", new_root / "link")

    # "swap" is a directory in the baseline and a regular file in the current tree.
    (old_root / "swap").mkdir()
    (new_root / "swap").write_text("type changed\n", encoding="utf-8")

    # Empty directories: one only in the baseline, one only in the current tree.
    (old_root / "removed-empty").mkdir()
    (new_root / "added-empty").mkdir()

    diff = vm_manager_module._diff_workspace_trees(old_root, new_root)  # noqa: SLF001

    assert diff["changed"] is True
    assert diff["summary"] == {
        "total": 8,
        "added": 2,
        "modified": 3,
        "deleted": 2,
        "type_changed": 1,
        "text_patched": 3,
        "non_text": 5,
    }

    patch_text = diff["patch"]
    assert "--- a/modified.txt" in patch_text
    assert "+++ b/modified.txt" in patch_text
    assert "--- /dev/null" in patch_text
    assert "+++ b/added.txt" in patch_text
    assert "--- a/deleted.txt" in patch_text
    assert "+++ /dev/null" in patch_text

    by_path = {entry["path"]: entry for entry in diff["entries"]}
    assert by_path["binary.bin"]["text_patch"] is None
    assert by_path["link"]["artifact_type"] == "symlink"
    assert by_path["swap"]["artifact_type"] == "file"
    assert by_path["removed-empty"]["artifact_type"] == "directory"
    assert by_path["added-empty"]["artifact_type"] == "directory"
def test_diff_workspace_trees_unchanged_returns_empty_summary(tmp_path: Path) -> None:
    """Two identical trees produce an all-zero summary, no entries, and no patch."""
    old_root = tmp_path / "baseline"
    new_root = tmp_path / "current"
    for root in (old_root, new_root):
        root.mkdir()
    for root in (old_root, new_root):
        (root / "note.txt").write_text("same\n", encoding="utf-8")

    diff = vm_manager_module._diff_workspace_trees(old_root, new_root)  # noqa: SLF001

    # Every summary counter is expected to be zero.
    summary_keys = (
        "total",
        "added",
        "modified",
        "deleted",
        "type_changed",
        "text_patched",
        "non_text",
    )
    expected = {
        "changed": False,
        "summary": dict.fromkeys(summary_keys, 0),
        "entries": [],
        "patch": "",
    }
    assert diff == expected
def test_workspace_shell_lifecycle_and_rehydration(tmp_path: Path) -> None:
    """Open shells from two managers on one workspace, then close and delete.

    A second ``VmManager`` built on the same ``base_dir`` opens its own shell
    on the workspace created by the first. Shell traffic is not expected to
    appear in the workspace command log, and closed or deleted shells must be
    reported as nonexistent.
    """

    def _read_until(
        owner: VmManager,
        target_workspace: str,
        target_shell: str,
        needle: str,
        timeout_seconds: float = 5.0,
    ) -> str:
        """Poll the shell from cursor 0 until ``needle`` appears or the timeout elapses.

        Returns the most recent output either way so the caller's assertion
        shows what was actually read.
        """
        # time.monotonic() is unaffected by wall-clock adjustments, so the
        # polling window cannot shrink or stretch mid-test.
        deadline = time.monotonic() + timeout_seconds
        while True:
            read = owner.read_shell(target_workspace, target_shell, cursor=0, max_chars=65536)
            output = str(read["output"])
            if needle in output or time.monotonic() >= deadline:
                return output
            time.sleep(0.05)

    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    created = manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
    )
    workspace_id = str(created["workspace_id"])

    opened = manager.open_shell(workspace_id)
    shell_id = str(opened["shell_id"])
    assert opened["state"] == "running"
    manager.write_shell(workspace_id, shell_id, input_text="pwd")
    assert "/workspace" in _read_until(manager, workspace_id, shell_id, "/workspace")

    # A fresh manager on the same base_dir must be able to use the workspace.
    manager_rehydrated = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    second_opened = manager_rehydrated.open_shell(workspace_id)
    second_shell_id = str(second_opened["shell_id"])
    assert second_shell_id != shell_id
    manager_rehydrated.write_shell(workspace_id, second_shell_id, input_text="printf 'ok\\n'")
    assert "ok" in _read_until(manager_rehydrated, workspace_id, second_shell_id, "ok")

    # Shell input is not recorded in the workspace command log.
    logs = manager.logs_workspace(workspace_id)
    assert logs["count"] == 0

    closed = manager.close_shell(workspace_id, shell_id)
    assert closed["closed"] is True
    with pytest.raises(ValueError, match="does not exist"):
        manager.read_shell(workspace_id, shell_id)

    # After the workspace is deleted, the second manager's shell is gone too.
    deleted = manager.delete_workspace(workspace_id)
    assert deleted["deleted"] is True
    with pytest.raises(ValueError, match="does not exist"):
        manager_rehydrated.read_shell(workspace_id, second_shell_id)
def test_workspace_create_rejects_unsafe_seed_archive(tmp_path: Path) -> None:
    """A seed archive with a ``../`` member is rejected and no workspace dir remains."""
    hostile_archive = tmp_path / "bad.tgz"
    content = b"bad\n"
    # The member name points one level above the extraction root.
    escaping_member = tarfile.TarInfo(name="../escape.txt")
    escaping_member.size = len(content)
    with tarfile.open(hostile_archive, "w:gz") as tar:
        tar.addfile(escaping_member, io.BytesIO(content))

    vm_root = tmp_path / "vms"
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=vm_root,
        network_manager=TapNetworkManager(enabled=False),
    )

    with pytest.raises(RuntimeError, match="unsafe archive member path"):
        vm_manager.create_workspace(
            environment="debian:12-base",
            allow_host_compat=True,
            seed_path=hostile_archive,
        )

    leftover_workspaces = list((vm_root / "workspaces").iterdir())
    assert leftover_workspaces == []
def test_workspace_create_rejects_archive_that_writes_through_symlink(tmp_path: Path) -> None:
    """A seed archive that places a file beneath a symlink member is rejected."""
    hostile_archive = tmp_path / "bad-symlink.tgz"

    # First member: a symlink named "linked" pointing at "outside".
    link_member = tarfile.TarInfo(name="linked")
    link_member.type = tarfile.SYMTYPE
    link_member.linkname = "outside"

    # Second member: a regular file whose path passes through that symlink.
    content = b"bad\n"
    nested_member = tarfile.TarInfo(name="linked/note.txt")
    nested_member.size = len(content)

    with tarfile.open(hostile_archive, "w:gz") as tar:
        tar.addfile(link_member)
        tar.addfile(nested_member, io.BytesIO(content))

    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )

    with pytest.raises(RuntimeError, match="traverse through a symlinked path"):
        vm_manager.create_workspace(
            environment="debian:12-base",
            allow_host_compat=True,
            seed_path=hostile_archive,
        )
def test_workspace_create_cleans_up_on_seed_failure(
    tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
    """When the seed import raises, no workspace directory is left behind."""
    seed_dir = tmp_path / "seed"
    seed_dir.mkdir()
    (seed_dir / "note.txt").write_text("hello\n", encoding="utf-8")

    vm_root = tmp_path / "vms"
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=vm_root,
        network_manager=TapNetworkManager(enabled=False),
    )

    def _raise_seed_failure(*args: Any, **kwargs: Any) -> dict[str, Any]:
        """Stand in for the backend's ``import_archive`` and always fail."""
        del args, kwargs
        raise RuntimeError("seed import failed")

    monkeypatch.setattr(
        vm_manager._backend,  # noqa: SLF001
        "import_archive",
        _raise_seed_failure,
    )

    with pytest.raises(RuntimeError, match="seed import failed"):
        vm_manager.create_workspace(
            environment="debian:12-base",
            allow_host_compat=True,
            seed_path=seed_dir,
        )

    leftover_workspaces = list((vm_root / "workspaces").iterdir())
    assert leftover_workspaces == []
def test_workspace_rehydrates_across_manager_processes(tmp_path: Path) -> None:
    """A second manager on the same ``base_dir`` can exec in and read logs of a workspace."""
    shared_root = tmp_path / "vms"
    first_manager = VmManager(
        backend_name="mock",
        base_dir=shared_root,
        network_manager=TapNetworkManager(enabled=False),
    )
    create_payload = first_manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
    )
    workspace_id = str(create_payload["workspace_id"])

    # The second manager did not create the workspace; it only shares base_dir.
    second_manager = VmManager(
        backend_name="mock",
        base_dir=shared_root,
        network_manager=TapNetworkManager(enabled=False),
    )
    exec_result = second_manager.exec_workspace(
        workspace_id,
        command="printf 'ok\\n'",
        timeout_seconds=30,
    )
    assert exec_result["exit_code"] == 0
    assert exec_result["stdout"] == "ok\n"

    log_payload = second_manager.logs_workspace(workspace_id)
    assert log_payload["count"] == 1
def test_workspace_requires_started_state(tmp_path: Path) -> None:
    """``exec_workspace`` is refused when the persisted state is ``stopped``."""
    vm_root = tmp_path / "vms"
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=vm_root,
        network_manager=TapNetworkManager(enabled=False),
    )
    create_payload = vm_manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
    )
    workspace_id = str(create_payload["workspace_id"])

    # Rewrite the on-disk workspace record so it claims to be stopped.
    record_path = vm_root / "workspaces" / workspace_id / "workspace.json"
    record = json.loads(record_path.read_text(encoding="utf-8"))
    record["state"] = "stopped"
    record_path.write_text(json.dumps(record, indent=2, sort_keys=True), encoding="utf-8")

    with pytest.raises(RuntimeError, match="must be in 'started' state"):
        vm_manager.exec_workspace(workspace_id, command="true", timeout_seconds=30)
def test_vm_manager_firecracker_backend_path(
    tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
    """Build a manager with ``backend_name="firecracker"`` against a stubbed backend.

    ``FirecrackerBackend`` is replaced on the module with a do-nothing stand-in
    before the manager is constructed; the manager's recorded backend name is
    then checked.
    """

    class StubFirecrackerBackend:
        """Stand-in backend: keeps its constructor arguments, ignores every call."""

        def __init__(
            self,
            environment_store: Any,
            firecracker_bin: Path,
            jailer_bin: Path,
            runtime_capabilities: Any,
            network_manager: TapNetworkManager,
        ) -> None:
            self.network_manager = network_manager
            self.runtime_capabilities = runtime_capabilities
            self.jailer_bin = jailer_bin
            self.firecracker_bin = firecracker_bin
            self.environment_store = environment_store

        def create(self, instance: Any) -> None:
            del instance

        def start(self, instance: Any) -> None:
            del instance

        def exec(self, instance: Any, command: str, timeout_seconds: int) -> Any:
            del instance, command, timeout_seconds
            return None

        def stop(self, instance: Any) -> None:
            del instance

        def delete(self, instance: Any) -> None:
            del instance

    monkeypatch.setattr(vm_manager_module, "FirecrackerBackend", StubFirecrackerBackend)

    firecracker_manager = VmManager(
        backend_name="firecracker",
        base_dir=tmp_path / "vms",
        runtime_paths=resolve_runtime_paths(),
        network_manager=TapNetworkManager(enabled=False),
    )

    assert firecracker_manager._backend_name == "firecracker"  # noqa: SLF001
def test_vm_manager_fails_closed_without_host_compat_opt_in(tmp_path: Path) -> None:
    """Starting a VM created without ``allow_host_compat`` raises a RuntimeError."""
    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    # Note: no allow_host_compat argument is passed here.
    created = manager.create_vm(environment="debian:12-base", ttl_seconds=600)
    vm_id = str(created["vm_id"])

    with pytest.raises(RuntimeError, match="guest boot is unavailable"):
        manager.start_vm(vm_id)
def test_vm_manager_uses_canonical_default_cache_dir(
    monkeypatch: pytest.MonkeyPatch, tmp_path: Path
) -> None:
    """``PYRO_ENVIRONMENT_CACHE_DIR`` determines the environment store's ``cache_dir``."""
    cache_dir = tmp_path / "cache"
    monkeypatch.setenv("PYRO_ENVIRONMENT_CACHE_DIR", str(cache_dir))

    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )

    store = manager._environment_store  # noqa: SLF001
    assert store.cache_dir == cache_dir
def test_vm_manager_helper_round_trips() -> None:
    """Exercise the module's private coercion, network round-trip and command-wrap helpers."""
    # _optional_int: None passes through, int-like values are coerced, others are rejected.
    assert vm_manager_module._optional_int(None) is None  # noqa: SLF001
    for raw, expected in ((True, 1), (7, 7), (7.2, 7), ("9", 9)):
        assert vm_manager_module._optional_int(raw) == expected  # noqa: SLF001
    with pytest.raises(TypeError, match="integer-compatible"):
        vm_manager_module._optional_int(object())  # noqa: SLF001

    # _optional_str / _optional_dict / _string_dict.
    assert vm_manager_module._optional_str(None) is None  # noqa: SLF001
    assert vm_manager_module._optional_str(1) == "1"  # noqa: SLF001
    assert vm_manager_module._optional_dict(None) is None  # noqa: SLF001
    assert vm_manager_module._optional_dict({"x": 1}) == {"x": 1}  # noqa: SLF001
    with pytest.raises(TypeError, match="dictionary payload"):
        vm_manager_module._optional_dict("bad")  # noqa: SLF001
    assert vm_manager_module._string_dict({"x": 1}) == {"x": "1"}  # noqa: SLF001
    assert vm_manager_module._string_dict("bad") == {}  # noqa: SLF001

    # Network config must survive serialize -> deserialize unchanged.
    original = NetworkConfig(
        vm_id="abc123",
        tap_name="tap0",
        guest_ip="172.29.1.2",
        gateway_ip="172.29.1.1",
        subnet_cidr="172.29.1.0/24",
        mac_address="06:00:aa:bb:cc:dd",
        dns_servers=("1.1.1.1", "8.8.8.8"),
    )
    as_payload = vm_manager_module._serialize_network(original)  # noqa: SLF001
    assert as_payload is not None
    assert vm_manager_module._deserialize_network(as_payload) == original  # noqa: SLF001
    assert vm_manager_module._deserialize_network(None) is None  # noqa: SLF001
    with pytest.raises(TypeError, match="dictionary payload"):
        vm_manager_module._deserialize_network("bad")  # noqa: SLF001

    # Without a cwd the command is returned untouched; with one, a cd is added.
    assert vm_manager_module._wrap_guest_command("echo hi") == "echo hi"  # noqa: SLF001
    with_cwd = vm_manager_module._wrap_guest_command("echo hi", cwd="/workspace")  # noqa: SLF001
    assert "cd /workspace" in with_cwd

    assert vm_manager_module._pid_is_running(None) is False  # noqa: SLF001
def test_copy_rootfs_falls_back_to_copy2(
    tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
    """``_copy_rootfs`` reports ``copy2`` and still copies when ``subprocess.run`` fails."""

    def _failing_run(*args: Any, **kwargs: Any) -> Any:
        del args, kwargs
        raise OSError("no cp")

    rootfs = tmp_path / "rootfs.ext4"
    rootfs.write_text("payload", encoding="utf-8")
    target = tmp_path / "dest" / "rootfs.ext4"

    monkeypatch.setattr(subprocess, "run", _failing_run)
    mode = vm_manager_module._copy_rootfs(rootfs, target)  # noqa: SLF001

    assert mode == "copy2"
    assert target.read_text(encoding="utf-8") == "payload"
def test_workspace_create_cleans_up_on_start_failure(
    tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
    """A backend ``start`` failure during creation leaves no workspace directory behind."""
    base_dir = tmp_path / "vms"
    manager = VmManager(
        backend_name="mock",
        base_dir=base_dir,
        network_manager=TapNetworkManager(enabled=False),
    )

    def _failing_start(instance: Any) -> None:
        del instance
        raise RuntimeError("boom")

    monkeypatch.setattr(manager._backend, "start", _failing_start)  # noqa: SLF001

    with pytest.raises(RuntimeError, match="boom"):
        manager.create_workspace(environment="debian:12-base", allow_host_compat=True)

    leftovers = list((base_dir / "workspaces").iterdir())
    assert leftovers == []
def test_exec_instance_wraps_guest_workspace_command(tmp_path: Path) -> None:
    """With guest exec reported as supported, ``_exec_instance`` sends a ``cd``-wrapped command.

    The backend is replaced by a recorder so the exact command and ``workdir``
    handed to it can be inspected.
    """
    seen: dict[str, Any] = {}

    class StubBackend:
        """Records the command and workdir it receives and reports success."""

        def exec(
            self,
            instance: Any,
            command: str,
            timeout_seconds: int,
            *,
            workdir: Path | None = None,
        ) -> vm_manager_module.VmExecResult:
            del instance, timeout_seconds
            seen["command"] = command
            seen["workdir"] = workdir
            return vm_manager_module.VmExecResult(
                stdout="",
                stderr="",
                exit_code=0,
                duration_ms=1,
            )

    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    # Pretend the runtime can boot a guest and execute inside it.
    manager._runtime_capabilities = RuntimeCapabilities(  # noqa: SLF001
        supports_vm_boot=True,
        supports_guest_exec=True,
        supports_guest_network=False,
        reason=None,
    )
    manager._backend = StubBackend()  # type: ignore[assignment]  # noqa: SLF001

    started_instance = vm_manager_module.VmInstance(  # noqa: SLF001
        vm_id="vm-123",
        environment="debian:12-base",
        vcpu_count=1,
        mem_mib=512,
        ttl_seconds=600,
        created_at=time.time(),
        expires_at=time.time() + 600,
        workdir=tmp_path / "runtime",
        state="started",
    )

    outcome, mode = manager._exec_instance(  # noqa: SLF001
        started_instance,
        command="echo hi",
        timeout_seconds=30,
        guest_cwd="/workspace",
    )

    assert outcome.exit_code == 0
    assert mode == "unknown"
    assert "cd /workspace" in str(seen["command"])
    assert seen["workdir"] is None
def test_status_workspace_marks_dead_backing_process_stopped(tmp_path: Path) -> None:
    """Status reports ``stopped`` and records an error when the recorded guest pid is gone.

    The persisted record is edited to claim ``guest_vsock`` execution with pid
    999999 (presumed not to be a live process on the test host).
    """
    base_dir = tmp_path / "vms"
    manager = VmManager(
        backend_name="mock",
        base_dir=base_dir,
        network_manager=TapNetworkManager(enabled=False),
    )
    created = manager.create_workspace(environment="debian:12-base", allow_host_compat=True)
    workspace_id = str(created["workspace_id"])

    record_path = base_dir / "workspaces" / workspace_id / "workspace.json"
    record = json.loads(record_path.read_text(encoding="utf-8"))
    record["metadata"]["execution_mode"] = "guest_vsock"
    record["firecracker_pid"] = 999999
    record_path.write_text(json.dumps(record, indent=2, sort_keys=True), encoding="utf-8")

    assert manager.status_workspace(workspace_id)["state"] == "stopped"

    # The status call is expected to persist the reason it flipped the state.
    persisted = json.loads(record_path.read_text(encoding="utf-8"))
    assert "backing guest process" in str(persisted.get("last_error", ""))
def test_reap_expired_workspaces_removes_invalid_and_expired_records(tmp_path: Path) -> None:
    """Reaping removes both an unparseable workspace record and an expired one."""
    workspaces_root = tmp_path / "vms" / "workspaces"
    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )

    # Record whose JSON is a list instead of the expected object.
    invalid_dir = workspaces_root / "invalid"
    invalid_dir.mkdir(parents=True)
    (invalid_dir / "workspace.json").write_text("[]", encoding="utf-8")

    # Valid record, back-dated so it is already expired.
    created = manager.create_workspace(environment="debian:12-base", allow_host_compat=True)
    workspace_id = str(created["workspace_id"])
    record_path = workspaces_root / workspace_id / "workspace.json"
    record = json.loads(record_path.read_text(encoding="utf-8"))
    record["expires_at"] = 0.0
    record_path.write_text(json.dumps(record, indent=2, sort_keys=True), encoding="utf-8")

    with manager._lock:  # noqa: SLF001
        manager._reap_expired_workspaces_locked(time.time())  # noqa: SLF001

    assert not invalid_dir.exists()
    assert not (workspaces_root / workspace_id).exists()
def test_workspace_service_lifecycle_and_status_counts(tmp_path: Path) -> None:
    """Walk one service through start, list, status, logs and stop, then delete the workspace.

    The workspace is deleted in a ``finally`` block so that a failing assertion
    does not skip cleanup while the long-running service command is still alive.
    """
    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    workspace_id = str(
        manager.create_workspace(
            environment="debian:12-base",
            allow_host_compat=True,
        )["workspace_id"]
    )
    try:
        started = manager.start_service(
            workspace_id,
            "app",
            command=(
                "sh -lc 'printf \"service ready\\n\"; touch .ready; "
                "while true; do sleep 60; done'"
            ),
            readiness={"type": "file", "path": ".ready"},
        )
        assert started["state"] == "running"

        listed = manager.list_services(workspace_id)
        assert listed["count"] == 1
        assert listed["running_count"] == 1

        status = manager.status_service(workspace_id, "app")
        assert status["state"] == "running"
        assert status["ready_at"] is not None

        logs = manager.logs_service(workspace_id, "app")
        assert "service ready" in str(logs["stdout"])

        workspace_status = manager.status_workspace(workspace_id)
        assert workspace_status["service_count"] == 1
        assert workspace_status["running_service_count"] == 1

        stopped = manager.stop_service(workspace_id, "app")
        assert stopped["state"] == "stopped"
        assert stopped["stop_reason"] in {"sigterm", "sigkill"}
    finally:
        # Runs on the failure path too. NOTE(review): presumably delete_workspace
        # also stops a service that is still running - confirm.
        deleted = manager.delete_workspace(workspace_id)
    assert deleted["deleted"] is True
def test_workspace_service_start_replaces_non_running_record(tmp_path: Path) -> None:
    """A service name whose previous start ended in ``failed`` can be started again.

    The workspace is deleted in a ``finally`` block so the second, long-running
    service command is not left behind when an assertion fails.
    """
    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    workspace_id = str(
        manager.create_workspace(
            environment="debian:12-base",
            allow_host_compat=True,
        )["workspace_id"]
    )
    try:
        # The command exits immediately, so the readiness file never appears.
        failed = manager.start_service(
            workspace_id,
            "app",
            command="sh -lc 'exit 2'",
            readiness={"type": "file", "path": ".ready"},
            ready_timeout_seconds=1,
            ready_interval_ms=50,
        )
        assert failed["state"] == "failed"

        started = manager.start_service(
            workspace_id,
            "app",
            command="sh -lc 'touch .ready; while true; do sleep 60; done'",
            readiness={"type": "file", "path": ".ready"},
        )
        assert started["state"] == "running"
    finally:
        # Runs on the failure path too. NOTE(review): presumably delete_workspace
        # also stops a service that is still running - confirm.
        manager.delete_workspace(workspace_id)
def test_workspace_service_supports_command_readiness_and_helper_probes(
    tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
    """Cover command-based readiness end to end, then the host-side file/tcp/http probes.

    The tcp and http probes run against stubbed ``socket.socket`` and
    ``urlopen`` so that no real network access happens.
    """
    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    created = manager.create_workspace(environment="debian:12-base", allow_host_compat=True)
    workspace_id = str(created["workspace_id"])

    service = manager.start_service(
        workspace_id,
        "command-ready",
        command="sh -lc 'touch command.ready; while true; do sleep 60; done'",
        readiness={"type": "command", "command": "test -f command.ready"},
    )
    assert service["state"] == "running"

    services = manager.list_services(workspace_id)
    assert services["count"] == 1
    assert services["running_count"] == 1

    workspace_status = manager.status_workspace(workspace_id)
    assert workspace_status["service_count"] == 1
    assert workspace_status["running_service_count"] == 1

    assert manager.stop_service(workspace_id, "command-ready")["state"] == "stopped"

    workspace_dir = tmp_path / "vms" / "workspaces" / workspace_id / "workspace"

    def _ready(readiness: dict[str, Any]) -> Any:
        # Same call for every probe kind; only the readiness spec differs.
        return vm_manager_module._service_ready_on_host(  # noqa: SLF001
            readiness=readiness,
            workspace_dir=workspace_dir,
            cwd=workspace_dir,
        )

    # File probe: the path is given in guest form (/workspace/...).
    (workspace_dir / "probe.ready").write_text("ok\n", encoding="utf-8")
    assert _ready({"type": "file", "path": "/workspace/probe.ready"})

    class StubSocket:
        """Socket double that checks the timeout and address it is given."""

        def __enter__(self) -> StubSocket:
            return self

        def __exit__(self, *args: object) -> None:
            del args

        def settimeout(self, timeout: int) -> None:
            assert timeout == 1

        def connect(self, address: tuple[str, int]) -> None:
            assert address == ("127.0.0.1", 8080)

    monkeypatch.setattr("pyro_mcp.vm_manager.socket.socket", lambda *args: StubSocket())
    assert _ready({"type": "tcp", "address": "127.0.0.1:8080"})

    class StubResponse:
        """HTTP response double carrying a 204 status."""

        status = 204

        def __enter__(self) -> StubResponse:
            return self

        def __exit__(self, *args: object) -> None:
            del args

    def _urlopen(request: object, timeout: int) -> StubResponse:
        del request
        assert timeout == 2
        return StubResponse()

    monkeypatch.setattr("pyro_mcp.vm_manager.urllib.request.urlopen", _urlopen)
    assert _ready({"type": "http", "url": "http://127.0.0.1:8080/"})
def test_workspace_service_logs_tail_and_delete_cleanup(tmp_path: Path) -> None:
    """Check ``tail_lines`` truncation of service logs and that delete removes the services dir.

    The workspace is deleted in a ``finally`` block so the long-running service
    command is not left behind when one of the log assertions fails.
    """
    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    workspace_id = str(
        manager.create_workspace(
            environment="debian:12-base",
            allow_host_compat=True,
        )["workspace_id"]
    )
    services_dir = tmp_path / "vms" / "workspaces" / workspace_id / "services"
    try:
        manager.start_service(
            workspace_id,
            "logger",
            command=(
                "sh -lc 'printf \"one\\n\"; printf \"two\\n\"; "
                "touch .ready; while true; do sleep 60; done'"
            ),
            readiness={"type": "file", "path": ".ready"},
        )
        # Two lines were printed; asking for one must return the last and flag truncation.
        logs = manager.logs_service(workspace_id, "logger", tail_lines=1)
        assert logs["stdout"] == "two\n"
        assert logs["truncated"] is True
        assert services_dir.exists()
    finally:
        # Runs on the failure path too, while the service is still running.
        deleted = manager.delete_workspace(workspace_id)
    assert deleted["deleted"] is True
    assert not services_dir.exists()
def test_workspace_status_stops_service_counts_when_workspace_is_stopped(tmp_path: Path) -> None:
    """Status of a stopped workspace reports zero running services and rewrites the record.

    The workspace record is flipped to ``stopped`` on disk while a service is
    live; the service's own record must then read ``stopped`` with reason
    ``workspace_stopped``.
    """
    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    created = manager.create_workspace(environment="debian:12-base", allow_host_compat=True)
    workspace_id = str(created["workspace_id"])
    manager.start_service(
        workspace_id,
        "app",
        command="sh -lc 'touch .ready; while true; do sleep 60; done'",
        readiness={"type": "file", "path": ".ready"},
    )

    workspace_home = tmp_path / "vms" / "workspaces" / workspace_id
    service_record_path = workspace_home / "services" / "app.json"
    # Remember the real pid so the process can be stopped whatever the test outcome.
    live_pid = int(json.loads(service_record_path.read_text(encoding="utf-8"))["pid"])
    try:
        record_path = workspace_home / "workspace.json"
        record = json.loads(record_path.read_text(encoding="utf-8"))
        record["state"] = "stopped"
        record_path.write_text(json.dumps(record, indent=2, sort_keys=True), encoding="utf-8")

        status = manager.status_workspace(workspace_id)
        assert status["state"] == "stopped"
        assert status["service_count"] == 1
        assert status["running_service_count"] == 0

        service_record = json.loads(service_record_path.read_text(encoding="utf-8"))
        assert service_record["state"] == "stopped"
        assert service_record["stop_reason"] == "workspace_stopped"
    finally:
        vm_manager_module._stop_process_group(live_pid)  # noqa: SLF001
def test_workspace_service_readiness_validation_helpers() -> None:
    """Check service-name and readiness-spec normalization, including the error messages."""
    normalize_name = vm_manager_module._normalize_workspace_service_name  # noqa: SLF001
    normalize_readiness = vm_manager_module._normalize_workspace_service_readiness  # noqa: SLF001

    assert normalize_name("app-1") == "app-1"
    for bad_name, message in (
        (" ", "service_name must not be empty"),
        ("bad name", "service_name must match"),
    ):
        with pytest.raises(ValueError, match=message):
            normalize_name(bad_name)

    # A relative file path comes back under /workspace; the other kinds come back unchanged.
    accepted = [
        (
            {"type": "file", "path": "subdir/.ready"},
            {"type": "file", "path": "/workspace/subdir/.ready"},
        ),
        (
            {"type": "tcp", "address": "127.0.0.1:8080"},
            {"type": "tcp", "address": "127.0.0.1:8080"},
        ),
        (
            {"type": "http", "url": "http://127.0.0.1:8080/"},
            {"type": "http", "url": "http://127.0.0.1:8080/"},
        ),
        (
            {"type": "command", "command": "test -f .ready"},
            {"type": "command", "command": "test -f .ready"},
        ),
    ]
    for spec, expected in accepted:
        assert normalize_readiness(spec) == expected

    rejected = [
        ({"type": "bogus"}, "one of: file, tcp, http, command"),
        ({"type": "file"}, "required for file readiness"),
        ({"type": "tcp", "address": "127.0.0.1"}, "HOST:PORT format"),
        ({"type": "http"}, "required for http readiness"),
        ({"type": "command"}, "required for command readiness"),
    ]
    for spec, message in rejected:
        with pytest.raises(ValueError, match=message):
            normalize_readiness(spec)
def test_workspace_service_text_and_exit_code_helpers(tmp_path: Path) -> None:
    """Cover ``_read_service_exit_code`` and ``_tail_text`` for missing, empty and filled files."""
    read_exit_code = vm_manager_module._read_service_exit_code  # noqa: SLF001
    tail_text = vm_manager_module._tail_text  # noqa: SLF001

    # Exit-code file: absent, then empty, then holding a number.
    status_path = tmp_path / "service.status"
    assert read_exit_code(status_path) is None
    status_path.write_text("", encoding="utf-8")
    assert read_exit_code(status_path) is None
    status_path.write_text("7\n", encoding="utf-8")
    assert read_exit_code(status_path) == 7

    # Log file: absent, then three lines read with various tail limits.
    log_path = tmp_path / "service.log"
    assert tail_text(log_path, tail_lines=10) == ("", False)

    whole_log = "one\ntwo\nthree\n"
    log_path.write_text(whole_log, encoding="utf-8")
    for limit in (None, 5):
        assert tail_text(log_path, tail_lines=limit) == (whole_log, False)
    assert tail_text(log_path, tail_lines=1) == ("three\n", True)
def test_workspace_service_process_group_helpers(monkeypatch: pytest.MonkeyPatch) -> None:
    """Cover ``_stop_process_group`` for a missing group and for a TERM-then-KILL stop.

    The second scenario scripts ``time.monotonic`` and ``_pid_is_running`` with
    fixed sequences, so the order of calls inside the helper matters.
    """

    def _missing(_pid: int, _signal: int) -> None:
        raise ProcessLookupError()

    monkeypatch.setattr("pyro_mcp.vm_manager.os.killpg", _missing)
    assert vm_manager_module._stop_process_group(123) == (False, False)  # noqa: SLF001

    sent_signals: list[int] = []
    clock_ticks = iter([0.0, 0.0, 5.0, 5.0, 10.0])
    alive_answers = iter([True, True, False])

    def _record_signal(_pid: int, signum: int) -> None:
        sent_signals.append(signum)

    def _scripted_clock() -> float:
        return next(clock_ticks)

    def _scripted_alive(_pid: int | None) -> bool:
        return next(alive_answers)

    monkeypatch.setattr("pyro_mcp.vm_manager.os.killpg", _record_signal)
    monkeypatch.setattr("pyro_mcp.vm_manager.time.monotonic", _scripted_clock)
    monkeypatch.setattr("pyro_mcp.vm_manager.time.sleep", lambda _seconds: None)
    monkeypatch.setattr("pyro_mcp.vm_manager._pid_is_running", _scripted_alive)

    outcome = vm_manager_module._stop_process_group(456, wait_seconds=5)  # noqa: SLF001
    stopped, killed = outcome

    assert (stopped, killed) == (True, True)
    assert sent_signals == [signal.SIGTERM, signal.SIGKILL]
def test_workspace_service_probe_and_refresh_helpers(
    tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
    """Cover the probe-command runner plus local service record refresh and stop helpers."""
    # The probe runner hands back the command's exit status.
    probe_exit = vm_manager_module._run_service_probe_command(tmp_path, "exit 3")  # noqa: SLF001
    assert probe_exit == 3

    services_dir = tmp_path / "services"
    services_dir.mkdir()
    (services_dir / "app.status").write_text("9\n", encoding="utf-8")

    record = vm_manager_module.WorkspaceServiceRecord(  # noqa: SLF001
        workspace_id="workspace-1",
        service_name="app",
        command="sleep 60",
        cwd="/workspace",
        state="running",
        started_at=time.time(),
        readiness=None,
        ready_at=None,
        ended_at=None,
        exit_code=None,
        pid=1234,
        execution_mode="host_compat",
        stop_reason=None,
    )

    # With the pid reported dead, refresh should pick up the code from app.status.
    monkeypatch.setattr("pyro_mcp.vm_manager._pid_is_running", lambda _pid: False)
    after_refresh = vm_manager_module._refresh_local_service_record(  # noqa: SLF001
        record,
        services_dir=services_dir,
    )
    assert after_refresh.state == "exited"
    assert after_refresh.exit_code == 9

    monkeypatch.setattr(
        "pyro_mcp.vm_manager._stop_process_group",
        lambda _pid: (True, False),
    )
    after_stop = vm_manager_module._stop_local_service(  # noqa: SLF001
        after_refresh,
        services_dir=services_dir,
    )
    assert after_stop.state == "stopped"
    assert after_stop.stop_reason == "sigterm"
def test_workspace_secrets_redact_exec_shell_service_and_survive_reset(tmp_path: Path) -> None:
    """Check secret redaction in exec, shell and service output, before and after a reset.

    A secret is only visible to a command when mapped through ``secret_env``,
    and its value must come back as ``[REDACTED]`` everywhere it is surfaced.

    Cleanup runs in ``finally`` blocks: the shell is closed and the workspace
    deleted even when an assertion fails, so no shell or service process is
    left behind. The shell polling deadline uses ``time.monotonic`` so that a
    wall-clock adjustment cannot shorten or extend the wait.
    """
    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    secret_file = tmp_path / "token.txt"
    secret_file.write_text("from-file\n", encoding="utf-8")
    created = manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
        secrets=[
            {"name": "API_TOKEN", "value": "expected"},
            {"name": "FILE_TOKEN", "file_path": str(secret_file)},
        ],
    )
    workspace_id = str(created["workspace_id"])
    try:
        # Only safe metadata is returned, never the secret values.
        assert created["secrets"] == [
            {"name": "API_TOKEN", "source_kind": "literal"},
            {"name": "FILE_TOKEN", "source_kind": "file"},
        ]

        # Without a secret_env mapping the variable is not set for the command.
        no_secret = manager.exec_workspace(
            workspace_id,
            command='sh -lc \'printf "%s" "${API_TOKEN:-missing}"\'',
            timeout_seconds=30,
        )
        assert no_secret["stdout"] == "missing"

        executed = manager.exec_workspace(
            workspace_id,
            command='sh -lc \'printf "%s\\n" "$API_TOKEN"\'',
            timeout_seconds=30,
            secret_env={"API_TOKEN": "API_TOKEN"},
        )
        assert executed["stdout"] == "[REDACTED]\n"
        logs = manager.logs_workspace(workspace_id)
        assert logs["entries"][-1]["stdout"] == "[REDACTED]\n"

        shell = manager.open_shell(
            workspace_id,
            secret_env={"API_TOKEN": "API_TOKEN"},
        )
        shell_id = str(shell["shell_id"])
        try:
            manager.write_shell(workspace_id, shell_id, input_text='printf "%s\\n" "$API_TOKEN"')
            # Shell output arrives asynchronously; poll for up to five seconds.
            output = ""
            deadline = time.monotonic() + 5
            while time.monotonic() < deadline:
                read = manager.read_shell(workspace_id, shell_id, cursor=0, max_chars=65536)
                output = str(read["output"])
                if "[REDACTED]" in output:
                    break
                time.sleep(0.05)
            assert "[REDACTED]" in output
        finally:
            manager.close_shell(workspace_id, shell_id)

        started = manager.start_service(
            workspace_id,
            "app",
            command=(
                'sh -lc \'trap "exit 0" TERM; printf "%s\\n" "$API_TOKEN" >&2; '
                'touch .ready; while true; do sleep 60; done\''
            ),
            readiness={"type": "file", "path": ".ready"},
            secret_env={"API_TOKEN": "API_TOKEN"},
        )
        assert started["state"] == "running"
        service_logs = manager.logs_service(workspace_id, "app", tail_lines=None)
        assert "[REDACTED]" in str(service_logs["stderr"])

        reset = manager.reset_workspace(workspace_id)
        assert reset["secrets"] == created["secrets"]

        # The secret must still be usable, and still redacted, after the reset.
        after_reset = manager.exec_workspace(
            workspace_id,
            command='sh -lc \'printf "%s\\n" "$API_TOKEN"\'',
            timeout_seconds=30,
            secret_env={"API_TOKEN": "API_TOKEN"},
        )
        assert after_reset["stdout"] == "[REDACTED]\n"
    finally:
        # NOTE(review): presumably delete_workspace also stops a service that is
        # still running when an assertion fails before the reset - confirm.
        manager.delete_workspace(workspace_id)
def test_workspace_secret_validation_helpers(tmp_path: Path) -> None:
    """Check secret name/value validation and rejection of duplicate secret names."""
    normalize_name = vm_manager_module._normalize_workspace_secret_name  # noqa: SLF001

    assert normalize_name("API_TOKEN") == "API_TOKEN"
    with pytest.raises(ValueError, match="secret name must match"):
        normalize_name("bad-name")

    with pytest.raises(ValueError, match="must not be empty"):
        vm_manager_module._validate_workspace_secret_value("TOKEN", "")  # noqa: SLF001

    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    # Two entries with the same name but different values.
    duplicated = [
        {"name": "TOKEN", "value": "one"},
        {"name": "TOKEN", "value": "two"},
    ]
    with pytest.raises(ValueError, match="duplicate secret name"):
        manager.create_workspace(
            environment="debian:12-base",
            allow_host_compat=True,
            secrets=duplicated,
        )
def test_prepare_workspace_secrets_handles_file_inputs_and_validation_errors(
    tmp_path: Path,
) -> None:
    """Cover ``_prepare_workspace_secrets`` for valid literal/file inputs and each rejection.

    The happy path checks record ordering, returned values, the materialized
    secret file and the permission bits on the directory and file. Every
    rejected payload gets its own ``bad<N>`` secrets directory.
    """
    valid_file = tmp_path / "token.txt"
    valid_file.write_text("from-file\n", encoding="utf-8")
    invalid_utf8 = tmp_path / "invalid.bin"
    invalid_utf8.write_bytes(b"\xff\xfe")
    oversized = tmp_path / "oversized.txt"
    oversized.write_text(
        "x" * (vm_manager_module.WORKSPACE_SECRET_MAX_BYTES + 1),
        encoding="utf-8",
    )

    secrets_dir = tmp_path / "secrets"
    records, values = vm_manager_module._prepare_workspace_secrets(  # noqa: SLF001
        [
            {"name": "B_TOKEN", "value": "literal"},
            {"name": "A_TOKEN", "file_path": str(valid_file)},
        ],
        secrets_dir=secrets_dir,
    )
    # Input order was B then A; the records come back A then B.
    assert [record.name for record in records] == ["A_TOKEN", "B_TOKEN"]
    assert values == {"A_TOKEN": "from-file\n", "B_TOKEN": "literal"}

    stored_secret = secrets_dir / "A_TOKEN.secret"
    assert stored_secret.read_text(encoding="utf-8") == "from-file\n"
    assert oct(secrets_dir.stat().st_mode & 0o777) == "0o700"
    assert oct(stored_secret.stat().st_mode & 0o777) == "0o600"

    rejected = [
        ([cast(dict[str, str], "bad")], "must be a dictionary"),
        ([{}], "missing 'name'"),
        (
            [{"name": "TOKEN", "value": "x", "file_path": str(valid_file)}],
            "exactly one of 'value' or 'file_path'",
        ),
        ([{"name": "TOKEN", "file_path": "   "}], "file_path must not be empty"),
        ([{"name": "TOKEN", "file_path": str(tmp_path / "missing.txt")}], "does not exist"),
        ([{"name": "TOKEN", "file_path": str(invalid_utf8)}], "must be valid UTF-8 text"),
        ([{"name": "TOKEN", "file_path": str(oversized)}], "must be at most"),
    ]
    for case_number, (payload, message) in enumerate(rejected, start=1):
        with pytest.raises(ValueError, match=message):
            vm_manager_module._prepare_workspace_secrets(  # noqa: SLF001
                payload,
                secrets_dir=tmp_path / f"bad{case_number}",
            )
def test_workspace_secrets_require_guest_exec_on_firecracker_runtime(
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Creating a workspace with secrets fails on firecracker when guest exec is unsupported."""

    class StubFirecrackerBackend:
        """Do-nothing backend that accepts any constructor arguments."""

        def __init__(self, *args: Any, **kwargs: Any) -> None:
            del args, kwargs

        def create(self, instance: Any) -> None:
            del instance

        def start(self, instance: Any) -> None:
            del instance

        def stop(self, instance: Any) -> None:
            del instance

        def delete(self, instance: Any) -> None:
            del instance

    monkeypatch.setattr(vm_manager_module, "FirecrackerBackend", StubFirecrackerBackend)

    manager = VmManager(
        backend_name="firecracker",
        base_dir=tmp_path / "vms",
        runtime_paths=resolve_runtime_paths(),
        network_manager=TapNetworkManager(enabled=False),
    )
    # The guest can boot but cannot execute commands.
    no_guest_exec = RuntimeCapabilities(
        supports_vm_boot=True,
        supports_guest_exec=False,
        supports_guest_network=False,
        reason="guest exec is unavailable",
    )
    manager._runtime_capabilities = no_guest_exec  # noqa: SLF001

    with pytest.raises(RuntimeError, match="workspace secrets require guest execution"):
        manager.create_workspace(
            environment="debian:12-base",
            allow_host_compat=True,
            secrets=[{"name": "TOKEN", "value": "expected"}],
        )