Add workspace summary across the CLI, SDK, and MCP, and include it in the workspace-core profile so chat hosts can review one concise view of the current session. Persist lightweight review events for syncs, file edits, patch applies, exports, service lifecycle, and snapshot activity, then synthesize them with command history, current services, snapshot state, and current diff data since the last reset. Update the walkthroughs, use-case docs, public contract, changelog, and roadmap for 4.3.0, and make dist-check invoke the CLI module directly so local package reinstall quirks do not break the packaging gate. Validation: uv lock; ./.venv/bin/pytest --no-cov tests/test_vm_manager.py tests/test_cli.py tests/test_api.py tests/test_server.py tests/test_public_contract.py tests/test_workspace_use_case_smokes.py; UV_OFFLINE=1 UV_CACHE_DIR=.uv-cache make check; UV_OFFLINE=1 UV_CACHE_DIR=.uv-cache make dist-check; real guest-backed workspace create -> patch apply -> workspace summary --json -> delete smoke.
3510 lines
121 KiB
Python
3510 lines
121 KiB
Python
from __future__ import annotations
|
|
|
|
import io
|
|
import json
|
|
import os
|
|
import signal
|
|
import subprocess
|
|
import tarfile
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Any, cast
|
|
|
|
import pytest
|
|
|
|
import pyro_mcp.vm_manager as vm_manager_module
|
|
from pyro_mcp.runtime import RuntimeCapabilities, resolve_runtime_paths
|
|
from pyro_mcp.vm_manager import VmManager
|
|
from pyro_mcp.vm_network import NetworkConfig, TapNetworkManager
|
|
|
|
|
|
def _run_debugfs_write(rootfs_image: Path, command: str) -> None:
    """Run one ``debugfs`` request against *rootfs_image* with write access.

    Args:
        rootfs_image: Filesystem image handed to ``debugfs`` as its device.
        command: Request string passed through ``debugfs -R``.

    Raises:
        RuntimeError: If ``debugfs`` exits non-zero. The message is the
            stripped stderr, else the stripped stdout, else *command*.
    """
    argv = ["debugfs", "-w", "-R", command, str(rootfs_image)]
    completed = subprocess.run(  # noqa: S603
        argv,
        check=False,
        capture_output=True,
        text=True,
    )
    if completed.returncode == 0:
        return
    raise RuntimeError(completed.stderr.strip() or completed.stdout.strip() or command)
|
|
|
|
|
|
def _create_stopped_workspace_rootfs(tmp_path: Path) -> Path:
    """Build a 16 MiB ext4 image under *tmp_path* and fill it with fixture data.

    The image gets ``/workspace``, ``/workspace/src``, ``/run``,
    ``/run/pyro-secrets`` and ``/run/pyro-services`` directories, two workspace
    files, a ``/workspace/link`` symlink to ``note.txt``, a secret file and a
    service log.

    Returns:
        Path to the created image.

    Raises:
        RuntimeError: If ``mkfs.ext4`` or any ``debugfs`` request fails.
    """
    image = tmp_path / "workspace-rootfs.ext4"
    with image.open("wb") as raw:
        raw.truncate(16 * 1024 * 1024)

    mkfs = subprocess.run(  # noqa: S603
        ["mkfs.ext4", "-F", str(image)],
        check=False,
        capture_output=True,
        text=True,
    )
    if mkfs.returncode != 0:
        raise RuntimeError(mkfs.stderr.strip() or mkfs.stdout.strip() or "mkfs.ext4 failed")

    guest_directories = (
        "/workspace",
        "/workspace/src",
        "/run",
        "/run/pyro-secrets",
        "/run/pyro-services",
    )
    for directory in guest_directories:
        _run_debugfs_write(image, f"mkdir {directory}")

    # Stage every host-side fixture file before issuing any debugfs write.
    fixture_text = {
        "note.txt": "hello from disk\n",
        "child.txt": "nested child\n",
        "secret.txt": "super-secret\n",
        "service.log": "service runtime\n",
    }
    staged: dict[str, Path] = {}
    for file_name, text in fixture_text.items():
        host_file = tmp_path / file_name
        host_file.write_text(text, encoding="utf-8")
        staged[file_name] = host_file

    requests = (
        f"write {staged['note.txt']} /workspace/note.txt",
        f"write {staged['child.txt']} /workspace/src/child.txt",
        "symlink /workspace/link note.txt",
        f"write {staged['secret.txt']} /run/pyro-secrets/TOKEN",
        f"write {staged['service.log']} /run/pyro-services/app.log",
    )
    for request in requests:
        _run_debugfs_write(image, request)
    return image
|
|
|
|
|
|
def test_vm_manager_lifecycle_and_auto_cleanup(tmp_path: Path) -> None:
    """Create, start and exec a mock VM; after exec its status lookup must fail."""
    vms = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    creation = vms.create_vm(
        environment="debian:12",
        vcpu_count=1,
        mem_mib=512,
        ttl_seconds=600,
        allow_host_compat=True,
    )
    vm_id = str(creation["vm_id"])
    start_payload = vms.start_vm(vm_id)
    assert start_payload["state"] == "started"

    exec_payload = vms.exec_vm(vm_id, command="printf 'git version 2.43.0\\n'", timeout_seconds=30)
    assert exec_payload["exit_code"] == 0
    assert exec_payload["execution_mode"] == "host_compat"
    assert "git version" in str(exec_payload["stdout"])
    # The VM is expected to be unknown to the manager once exec has returned.
    with pytest.raises(ValueError, match="does not exist"):
        vms.status_vm(vm_id)
|
|
|
|
|
|
def test_vm_manager_exec_timeout(tmp_path: Path) -> None:
    """A command outliving its timeout yields exit code 124 and a timeout message."""
    vms = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    creation = vms.create_vm(
        environment="debian:12-base",
        vcpu_count=1,
        mem_mib=512,
        ttl_seconds=600,
        allow_host_compat=True,
    )
    vm_id = str(creation["vm_id"])
    vms.start_vm(vm_id)
    outcome = vms.exec_vm(vm_id, command="sleep 2", timeout_seconds=1)
    assert outcome["exit_code"] == 124
    assert "timed out" in str(outcome["stderr"])
|
|
|
|
|
|
def test_vm_manager_stop_and_delete(tmp_path: Path) -> None:
    """A started VM can be stopped and then deleted."""
    vms = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    creation = vms.create_vm(
        environment="debian:12-base",
        vcpu_count=1,
        mem_mib=512,
        ttl_seconds=600,
        allow_host_compat=True,
    )
    vm_id = str(creation["vm_id"])
    vms.start_vm(vm_id)
    stop_payload = vms.stop_vm(vm_id)
    assert stop_payload["state"] == "stopped"
    delete_payload = vms.delete_vm(vm_id)
    assert delete_payload["deleted"] is True
|
|
|
|
|
|
def test_vm_manager_reaps_expired(tmp_path: Path) -> None:
    """An expired VM is counted by ``reap_expired`` and is unknown afterwards.

    The minimum TTL is lowered on the manager instance so a one-second TTL is
    accepted, and expiry is forced by rewriting ``expires_at`` instead of
    sleeping.
    """
    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    manager.MIN_TTL_SECONDS = 1
    vm_id = str(
        manager.create_vm(
            environment="debian:12-base",
            vcpu_count=1,
            mem_mib=512,
            ttl_seconds=1,
            allow_host_compat=True,
        )["vm_id"]
    )
    instance = manager._instances[vm_id]  # noqa: SLF001
    instance.expires_at = 0.0
    result = manager.reap_expired()
    assert result["count"] == 1
    # Pin the message so an unrelated ValueError cannot satisfy the check;
    # "does not exist" is what the lifecycle test expects for a removed VM.
    with pytest.raises(ValueError, match="does not exist"):
        manager.status_vm(vm_id)
|
|
|
|
|
|
def test_vm_manager_reaps_started_vm(tmp_path: Path) -> None:
    """``reap_expired`` also counts a VM that was started before it expired."""
    vms = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    vms.MIN_TTL_SECONDS = 1
    creation = vms.create_vm(
        environment="debian:12-base",
        vcpu_count=1,
        mem_mib=512,
        ttl_seconds=1,
        allow_host_compat=True,
    )
    vm_id = str(creation["vm_id"])
    vms.start_vm(vm_id)
    # Force expiry without sleeping.
    vms._instances[vm_id].expires_at = 0.0  # noqa: SLF001
    reaped = vms.reap_expired()
    assert reaped["count"] == 1
|
|
|
|
|
|
@pytest.mark.parametrize(
    ("kwargs", "msg"),
    [
        ({"vcpu_count": 0, "mem_mib": 512, "ttl_seconds": 600}, "vcpu_count must be between"),
        ({"vcpu_count": 1, "mem_mib": 64, "ttl_seconds": 600}, "mem_mib must be between"),
        ({"vcpu_count": 1, "mem_mib": 512, "ttl_seconds": 30}, "ttl_seconds must be between"),
    ],
)
def test_vm_manager_validates_limits(tmp_path: Path, kwargs: dict[str, Any], msg: str) -> None:
    """Each parametrized sizing set must make ``create_vm`` raise ``ValueError`` matching *msg*."""
    vms = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    with pytest.raises(ValueError, match=msg):
        vms.create_vm(environment="debian:12-base", **kwargs)
|
|
|
|
|
|
def test_vm_manager_max_active_limit(tmp_path: Path) -> None:
    """With ``max_active_vms=1`` a second ``create_vm`` call is refused."""
    vms = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        max_active_vms=1,
        network_manager=TapNetworkManager(enabled=False),
    )
    # First VM fills the single available slot.
    vms.create_vm(
        environment="debian:12-base",
        vcpu_count=1,
        mem_mib=512,
        ttl_seconds=600,
        allow_host_compat=True,
    )
    with pytest.raises(RuntimeError, match="max active VMs reached"):
        vms.create_vm(
            environment="debian:12-base",
            vcpu_count=1,
            mem_mib=512,
            ttl_seconds=600,
            allow_host_compat=True,
        )
|
|
|
|
|
|
def test_vm_manager_state_validation(tmp_path: Path) -> None:
    """Exec before start, a zero timeout, and a second start are each rejected."""
    vms = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    creation = vms.create_vm(
        environment="debian:12-base",
        vcpu_count=1,
        mem_mib=512,
        ttl_seconds=600,
        allow_host_compat=True,
    )
    vm_id = str(creation["vm_id"])

    # The VM has been created but not started yet.
    with pytest.raises(RuntimeError, match="must be in 'started' state"):
        vms.exec_vm(vm_id, command="echo hi", timeout_seconds=30)
    with pytest.raises(ValueError, match="must be positive"):
        vms.exec_vm(vm_id, command="echo hi", timeout_seconds=0)

    vms.start_vm(vm_id)
    with pytest.raises(RuntimeError, match="cannot be started from state"):
        vms.start_vm(vm_id)
|
|
|
|
|
|
def test_vm_manager_status_expired_raises(tmp_path: Path) -> None:
    """``status_vm`` on an expired VM raises ``RuntimeError`` about automatic deletion."""
    vms = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    vms.MIN_TTL_SECONDS = 1
    creation = vms.create_vm(
        environment="debian:12-base",
        vcpu_count=1,
        mem_mib=512,
        ttl_seconds=1,
        allow_host_compat=True,
    )
    vm_id = str(creation["vm_id"])
    # Force expiry without sleeping.
    vms._instances[vm_id].expires_at = 0.0  # noqa: SLF001
    with pytest.raises(RuntimeError, match="expired and was automatically deleted"):
        vms.status_vm(vm_id)
|
|
|
|
|
|
def test_vm_manager_invalid_backend(tmp_path: Path) -> None:
    """Constructing a manager with the backend name ``"nope"`` raises ``ValueError``."""
    vm_root = tmp_path / "vms"
    disabled_network = TapNetworkManager(enabled=False)
    with pytest.raises(ValueError, match="invalid backend"):
        VmManager(
            backend_name="nope",
            base_dir=vm_root,
            network_manager=disabled_network,
        )
|
|
|
|
|
|
def test_vm_manager_network_info(tmp_path: Path) -> None:
    """With the TAP manager disabled, status and network info report no networking."""
    vms = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    creation = vms.create_vm(
        environment="debian:12-base",
        vcpu_count=1,
        mem_mib=512,
        ttl_seconds=600,
        allow_host_compat=True,
    )
    vm_id = str(creation["vm_id"])
    vm_status = vms.status_vm(vm_id)
    network_info = vms.network_info_vm(vm_id)
    assert vm_status["network_enabled"] is False
    assert vm_status["guest_ip"] is None
    assert network_info["network_enabled"] is False
|
|
|
|
|
|
def test_vm_manager_run_vm(tmp_path: Path) -> None:
    """``run_vm`` executes a one-shot command and returns its exit code and stdout."""
    vms = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    outcome = vms.run_vm(
        environment="debian:12-base",
        command="printf 'ok\\n'",
        vcpu_count=1,
        mem_mib=512,
        timeout_seconds=30,
        ttl_seconds=600,
        network=False,
        allow_host_compat=True,
    )
    exit_code = int(outcome["exit_code"])
    assert exit_code == 0
    stdout_text = str(outcome["stdout"])
    assert stdout_text == "ok\n"
|
|
|
|
|
|
def test_workspace_lifecycle_and_logs(tmp_path: Path) -> None:
    """Create a workspace, run two commands, inspect status and logs, then delete it."""
    vms = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )

    creation = vms.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
    )
    workspace_id = str(creation["workspace_id"])
    assert creation["state"] == "started"
    assert creation["workspace_path"] == "/workspace"

    write_run = vms.exec_workspace(
        workspace_id,
        command="printf 'hello\\n' > note.txt",
        timeout_seconds=30,
    )
    read_run = vms.exec_workspace(workspace_id, command="cat note.txt", timeout_seconds=30)

    assert write_run["exit_code"] == 0
    assert read_run["stdout"] == "hello\n"

    workspace_status = vms.status_workspace(workspace_id)
    assert workspace_status["command_count"] == 2
    assert workspace_status["last_command"] is not None

    log_payload = vms.logs_workspace(workspace_id)
    assert log_payload["count"] == 2
    log_entries = log_payload["entries"]
    assert isinstance(log_entries, list)
    assert log_entries[1]["stdout"] == "hello\n"

    removal = vms.delete_workspace(workspace_id)
    assert removal["deleted"] is True
    with pytest.raises(ValueError, match="does not exist"):
        vms.status_workspace(workspace_id)
|
|
|
|
|
|
def test_workspace_create_seeds_directory_source_into_workspace(tmp_path: Path) -> None:
    """A directory seed is copied into the workspace and recorded as seed metadata."""
    seed_root = tmp_path / "seed"
    seed_root.mkdir()
    (seed_root / "note.txt").write_text("hello\n", encoding="utf-8")

    vms = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )

    creation = vms.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
        seed_path=seed_root,
    )
    workspace_id = str(creation["workspace_id"])

    seed_info = creation["workspace_seed"]
    assert seed_info["mode"] == "directory"
    assert seed_info["seed_path"] == str(seed_root.resolve())
    cat_run = vms.exec_workspace(workspace_id, command="cat note.txt", timeout_seconds=30)
    assert cat_run["stdout"] == "hello\n"

    # The same seed metadata must be reported by a later status call.
    workspace_status = vms.status_workspace(workspace_id)
    assert workspace_status["workspace_seed"]["mode"] == "directory"
    assert workspace_status["workspace_seed"]["seed_path"] == str(seed_root.resolve())
|
|
|
|
|
|
def test_workspace_create_seeds_tar_archive_into_workspace(tmp_path: Path) -> None:
    """A gzipped tar seed is unpacked into the workspace with mode ``tar_archive``."""
    tarball = tmp_path / "seed.tgz"
    staging_dir = tmp_path / "src"
    staging_dir.mkdir()
    staged_note = staging_dir / "note.txt"
    staged_note.write_text("archive\n", encoding="utf-8")
    with tarfile.open(tarball, "w:gz") as tar:
        tar.add(staging_dir / "note.txt", arcname="note.txt")

    vms = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )

    creation = vms.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
        seed_path=tarball,
    )
    workspace_id = str(creation["workspace_id"])

    assert creation["workspace_seed"]["mode"] == "tar_archive"
    cat_run = vms.exec_workspace(workspace_id, command="cat note.txt", timeout_seconds=30)
    assert cat_run["stdout"] == "archive\n"
|
|
|
|
|
|
def test_workspace_sync_push_updates_started_workspace(tmp_path: Path) -> None:
    """Pushing a directory into ``subdir`` makes its files readable inside the workspace."""
    seed_root = tmp_path / "seed"
    seed_root.mkdir()
    (seed_root / "note.txt").write_text("hello\n", encoding="utf-8")
    push_root = tmp_path / "update"
    push_root.mkdir()
    (push_root / "more.txt").write_text("more\n", encoding="utf-8")

    vms = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )

    creation = vms.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
        seed_path=seed_root,
    )
    workspace_id = str(creation["workspace_id"])
    sync_payload = vms.push_workspace_sync(workspace_id, source_path=push_root, dest="subdir")

    assert sync_payload["workspace_sync"]["mode"] == "directory"
    assert sync_payload["workspace_sync"]["destination"] == "/workspace/subdir"

    cat_run = vms.exec_workspace(
        workspace_id,
        command="cat subdir/more.txt",
        timeout_seconds=30,
    )
    assert cat_run["stdout"] == "more\n"

    # Only the exec above counts as a command; the seed metadata is unchanged.
    workspace_status = vms.status_workspace(workspace_id)
    assert workspace_status["command_count"] == 1
    assert workspace_status["workspace_seed"]["mode"] == "directory"
|
|
|
|
|
|
def test_workspace_sync_push_requires_started_workspace(tmp_path: Path) -> None:
    """A sync push is refused when the persisted workspace record says ``stopped``."""
    seed_root = tmp_path / "seed"
    seed_root.mkdir()
    (seed_root / "note.txt").write_text("hello\n", encoding="utf-8")
    push_root = tmp_path / "update"
    push_root.mkdir()
    (push_root / "more.txt").write_text("more\n", encoding="utf-8")

    vms = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )

    creation = vms.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
        seed_path=seed_root,
    )
    workspace_id = str(creation["workspace_id"])
    # Rewrite the on-disk record so the workspace looks stopped.
    record_file = tmp_path / "vms" / "workspaces" / workspace_id / "workspace.json"
    record = json.loads(record_file.read_text(encoding="utf-8"))
    record["state"] = "stopped"
    serialized = json.dumps(record, indent=2, sort_keys=True)
    record_file.write_text(serialized, encoding="utf-8")

    with pytest.raises(
        RuntimeError,
        match="must be in 'started' state before workspace_sync_push",
    ):
        vms.push_workspace_sync(workspace_id, source_path=push_root)
|
|
|
|
|
|
def test_workspace_sync_push_rejects_destination_outside_workspace(tmp_path: Path) -> None:
    """A destination of ``../escape`` is rejected with ``ValueError``."""
    push_root = tmp_path / "seed"
    push_root.mkdir()
    (push_root / "note.txt").write_text("hello\n", encoding="utf-8")

    vms = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )

    creation = vms.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
    )
    workspace_id = str(creation["workspace_id"])

    with pytest.raises(ValueError, match="workspace destination must stay inside /workspace"):
        vms.push_workspace_sync(workspace_id, source_path=push_root, dest="../escape")
|
|
|
|
|
|
def test_workspace_metadata_list_update_and_last_activity(tmp_path: Path) -> None:
    """Name and labels survive list, update and reset; exec advances ``last_activity_at``."""
    vms = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )

    creation = vms.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
        name="repro-fix",
        labels={"issue": "123", "owner": "codex"},
    )
    workspace_id = str(creation["workspace_id"])
    assert creation["name"] == "repro-fix"
    assert creation["labels"] == {"issue": "123", "owner": "codex"}
    activity_at_create = float(creation["last_activity_at"])

    listing = vms.list_workspaces()
    assert listing["count"] == 1
    assert listing["workspaces"][0]["name"] == "repro-fix"
    assert listing["workspaces"][0]["labels"] == {"issue": "123", "owner": "codex"}

    time.sleep(0.01)
    update_payload = vms.update_workspace(
        workspace_id,
        name="retry-run",
        labels={"issue": "124"},
        clear_labels=["owner"],
    )
    assert update_payload["name"] == "retry-run"
    assert update_payload["labels"] == {"issue": "124"}
    activity_at_update = float(update_payload["last_activity_at"])
    assert activity_at_update >= activity_at_create

    # A status call alone must leave the timestamp alone; an exec must advance it.
    status_pre_exec = vms.status_workspace(workspace_id)
    time.sleep(0.01)
    vms.exec_workspace(workspace_id, command="printf 'ok\\n'", timeout_seconds=30)
    status_post_exec = vms.status_workspace(workspace_id)
    assert float(status_pre_exec["last_activity_at"]) == activity_at_update
    assert float(status_post_exec["last_activity_at"]) > activity_at_update
    reset_payload = vms.reset_workspace(workspace_id)
    assert reset_payload["name"] == "retry-run"
    assert reset_payload["labels"] == {"issue": "124"}
|
|
|
|
|
|
def test_workspace_list_loads_legacy_records_without_metadata_fields(tmp_path: Path) -> None:
    """Records lacking name, labels and last-activity fields still list with defaults."""
    vms = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    creation = vms.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
    )
    workspace_id = str(creation["workspace_id"])
    # Strip the metadata fields from the persisted record to mimic an older layout.
    record_file = tmp_path / "vms" / "workspaces" / workspace_id / "workspace.json"
    record = json.loads(record_file.read_text(encoding="utf-8"))
    for legacy_missing_key in ("name", "labels", "last_activity_at"):
        record.pop(legacy_missing_key, None)
    record_file.write_text(json.dumps(record, indent=2, sort_keys=True), encoding="utf-8")

    listing = vms.list_workspaces()
    assert listing["count"] == 1
    entry = listing["workspaces"][0]
    assert entry["workspace_id"] == workspace_id
    assert entry["name"] is None
    assert entry["labels"] == {}
    assert float(entry["last_activity_at"]) == float(creation["created_at"])
|
|
|
|
|
|
def test_workspace_list_sorts_by_last_activity_and_skips_invalid_payload(tmp_path: Path) -> None:
    """The most recently active workspace lists first; a non-dict record is ignored."""
    vms = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    older = vms.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
        name="first",
    )
    newer = vms.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
        name="second",
    )
    first_id = str(older["workspace_id"])
    second_id = str(newer["workspace_id"])
    time.sleep(0.01)
    vms.exec_workspace(second_id, command="printf 'ok\\n'", timeout_seconds=30)

    # Plant a record whose JSON payload is a string rather than an object.
    bogus_dir = tmp_path / "vms" / "workspaces" / "invalid"
    bogus_dir.mkdir(parents=True)
    (bogus_dir / "workspace.json").write_text('"not-a-dict"', encoding="utf-8")

    listing = vms.list_workspaces()
    assert listing["count"] == 2
    listed_ids = [entry["workspace_id"] for entry in listing["workspaces"]]
    assert listed_ids == [second_id, first_id]
|
|
|
|
|
|
def test_workspace_update_clear_name_and_rejects_noop(tmp_path: Path) -> None:
    """Clearing name and labels works once; no-op and conflicting updates are rejected."""
    vms = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    creation = vms.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
        name="repro-fix",
        labels={"issue": "123"},
    )
    workspace_id = str(creation["workspace_id"])

    after_clear = vms.update_workspace(
        workspace_id,
        clear_name=True,
        clear_labels=["issue"],
    )
    assert after_clear["name"] is None
    assert after_clear["labels"] == {}

    # The name is already cleared, so clearing it again changes nothing.
    with pytest.raises(ValueError, match="workspace update requested no effective metadata change"):
        vms.update_workspace(workspace_id, clear_name=True)

    with pytest.raises(ValueError, match="name and clear_name cannot be used together"):
        vms.update_workspace(workspace_id, name="retry-run", clear_name=True)
|
|
|
|
|
|
def test_workspace_export_rejects_empty_output_path(tmp_path: Path) -> None:
    """An ``output_path`` consisting only of a space is rejected with ``ValueError``."""
    vms = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    creation = vms.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
    )

    with pytest.raises(ValueError, match="output_path must not be empty"):
        vms.export_workspace(str(creation["workspace_id"]), path=".", output_path=" ")
|
|
|
|
|
|
def test_workspace_diff_and_export_round_trip(tmp_path: Path) -> None:
    """A sync push shows up in the diff and the export without counting as a command."""
    seed_root = tmp_path / "seed"
    seed_root.mkdir()
    (seed_root / "note.txt").write_text("hello\n", encoding="utf-8")
    push_root = tmp_path / "update"
    push_root.mkdir()
    (push_root / "note.txt").write_text("hello from sync\n", encoding="utf-8")

    vms = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )

    creation = vms.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
        seed_path=seed_root,
    )
    workspace_id = str(creation["workspace_id"])
    vms.push_workspace_sync(workspace_id, source_path=push_root)

    diff = vms.diff_workspace(workspace_id)
    assert diff["workspace_id"] == workspace_id
    assert diff["changed"] is True
    assert diff["summary"]["modified"] == 1
    assert diff["summary"]["text_patched"] == 1
    assert "-hello\n" in diff["patch"]
    assert "+hello from sync\n" in diff["patch"]

    exported_file = tmp_path / "exported-note.txt"
    export = vms.export_workspace(
        workspace_id,
        path="note.txt",
        output_path=exported_file,
    )
    assert export["workspace_id"] == workspace_id
    assert export["artifact_type"] == "file"
    assert exported_file.read_text(encoding="utf-8") == "hello from sync\n"

    # Sync, diff and export must not be recorded as commands.
    workspace_status = vms.status_workspace(workspace_id)
    log_payload = vms.logs_workspace(workspace_id)
    assert workspace_status["command_count"] == 0
    assert log_payload["count"] == 0
|
|
|
|
|
|
def test_workspace_summary_synthesizes_current_session(tmp_path: Path) -> None:
    """The workspace summary reflects the session's activity.

    Covers one sync push, a file write, a patch apply, one command, a named
    snapshot, an export and a service start/stop cycle.
    """
    seed_root = tmp_path / "seed"
    seed_root.mkdir()
    (seed_root / "note.txt").write_text("hello\n", encoding="utf-8")
    push_root = tmp_path / "update"
    push_root.mkdir()
    (push_root / "more.txt").write_text("more\n", encoding="utf-8")

    vms = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )

    creation = vms.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
        seed_path=seed_root,
        name="review-eval",
        labels={"suite": "smoke"},
    )
    workspace_id = str(creation["workspace_id"])

    # Session activity, in the order the summary assertions below rely on.
    vms.push_workspace_sync(workspace_id, source_path=push_root)
    vms.write_workspace_file(workspace_id, "src/app.py", text="print('hello')\n")
    vms.apply_workspace_patch(
        workspace_id,
        patch=(
            "--- a/note.txt\n"
            "+++ b/note.txt\n"
            "@@ -1 +1 @@\n"
            "-hello\n"
            "+patched\n"
        ),
    )
    vms.exec_workspace(workspace_id, command="cat note.txt", timeout_seconds=30)
    vms.create_snapshot(workspace_id, "checkpoint")
    exported_file = tmp_path / "exported-note.txt"
    vms.export_workspace(workspace_id, "note.txt", output_path=exported_file)
    vms.start_service(
        workspace_id,
        "app",
        command='sh -lc \'trap "exit 0" TERM; touch .ready; while true; do sleep 60; done\'',
        readiness={"type": "file", "path": ".ready"},
    )
    vms.stop_service(workspace_id, "app")

    report = vms.summarize_workspace(workspace_id)

    assert report["workspace_id"] == workspace_id
    assert report["name"] == "review-eval"
    assert report["labels"] == {"suite": "smoke"}
    assert report["outcome"]["command_count"] == 1
    assert report["outcome"]["export_count"] == 1
    assert report["outcome"]["snapshot_count"] == 1
    assert report["commands"]["total"] == 1
    assert report["commands"]["recent"][0]["command"] == "cat note.txt"
    # Recent events are expected newest first.
    edit_kinds = [event["event_kind"] for event in report["edits"]["recent"]]
    assert edit_kinds == [
        "patch_apply",
        "file_write",
        "sync_push",
    ]
    assert report["changes"]["available"] is True
    assert report["changes"]["changed"] is True
    assert report["changes"]["summary"]["total"] == 4
    assert report["services"]["current"][0]["service_name"] == "app"
    service_kinds = [event["event_kind"] for event in report["services"]["recent"]]
    assert service_kinds == [
        "service_stop",
        "service_start",
    ]
    assert report["artifacts"]["exports"][0]["workspace_path"] == "/workspace/note.txt"
    assert report["snapshots"]["named_count"] == 1
    assert report["snapshots"]["recent"][0]["snapshot_name"] == "checkpoint"
|
|
|
|
|
|
def test_workspace_summary_degrades_gracefully_for_stopped_and_legacy_workspaces(
    tmp_path: Path,
) -> None:
    """The summary reports changes as unavailable, with a reason, instead of raising.

    Two cases are exercised: a workspace that has been stopped, and a workspace
    whose baseline archive has been removed from disk.
    """
    seed_root = tmp_path / "seed"
    seed_root.mkdir()
    (seed_root / "note.txt").write_text("hello\n", encoding="utf-8")

    vms = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )

    # Case 1: stopped workspace.
    stopped_creation = vms.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
        seed_path=seed_root,
    )
    stopped_workspace_id = str(stopped_creation["workspace_id"])
    vms.exec_workspace(stopped_workspace_id, command="cat note.txt", timeout_seconds=30)
    vms.stop_workspace(stopped_workspace_id)
    stopped_report = vms.summarize_workspace(stopped_workspace_id)
    assert stopped_report["commands"]["total"] == 1
    assert stopped_report["changes"]["available"] is False
    assert "must be in 'started' state" in str(stopped_report["changes"]["reason"])

    # Case 2: workspace without its baseline archive.
    legacy_creation = vms.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
        seed_path=seed_root,
    )
    legacy_workspace_id = str(legacy_creation["workspace_id"])
    workspace_dir = tmp_path / "vms" / "workspaces" / legacy_workspace_id
    baseline_archive = workspace_dir / "baseline" / "workspace.tar"
    baseline_archive.unlink()
    legacy_report = vms.summarize_workspace(legacy_workspace_id)
    assert legacy_report["changes"]["available"] is False
    assert "baseline snapshot" in str(legacy_report["changes"]["reason"])
|
|
|
|
|
|
def test_workspace_file_ops_and_patch_round_trip(tmp_path: Path) -> None:
    """List, read, write and patch workspace files, then confirm via exec, diff and export."""
    seed_root = tmp_path / "seed"
    seed_root.mkdir()
    seed_src = seed_root / "src"
    seed_src.mkdir()
    (seed_src / "app.py").write_text('print("bug")\n', encoding="utf-8")

    vms = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )

    creation = vms.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
        seed_path=seed_root,
    )
    workspace_id = str(creation["workspace_id"])

    file_listing = vms.list_workspace_files(workspace_id, path="src", recursive=True)
    expected_entry = {
        "path": "/workspace/src/app.py",
        "artifact_type": "file",
        "size_bytes": 13,
        "link_target": None,
    }
    assert file_listing["entries"] == [expected_entry]

    # Reading a file must not move the workspace's last-activity timestamp.
    status_pre_read = vms.status_workspace(workspace_id)
    file_read = vms.read_workspace_file(workspace_id, "src/app.py")
    assert file_read["content"] == 'print("bug")\n'
    status_post_read = vms.status_workspace(workspace_id)
    assert float(status_post_read["last_activity_at"]) == float(
        status_pre_read["last_activity_at"]
    )

    file_write = vms.write_workspace_file(
        workspace_id,
        "src/generated/out.txt",
        text="generated\n",
    )
    assert file_write["bytes_written"] == 10

    patch_result = vms.apply_workspace_patch(
        workspace_id,
        patch=(
            "--- a/src/app.py\n"
            "+++ b/src/app.py\n"
            "@@ -1 +1 @@\n"
            '-print("bug")\n'
            '+print("fixed")\n'
            "--- /dev/null\n"
            "+++ b/src/new.py\n"
            "@@ -0,0 +1 @@\n"
            '+print("new")\n'
        ),
    )
    assert patch_result["changed"] is True
    assert patch_result["summary"] == {
        "total": 2,
        "added": 1,
        "modified": 1,
        "deleted": 0,
    }

    combined_run = vms.exec_workspace(
        workspace_id,
        command="python3 src/app.py && cat src/new.py && cat src/generated/out.txt",
        timeout_seconds=30,
    )
    assert combined_run["stdout"] == 'fixed\nprint("new")\ngenerated\n'

    diff = vms.diff_workspace(workspace_id)
    assert diff["changed"] is True
    assert diff["summary"]["added"] == 2
    assert diff["summary"]["modified"] == 1

    exported_file = tmp_path / "exported-app.py"
    export = vms.export_workspace(
        workspace_id,
        path="src/app.py",
        output_path=exported_file,
    )
    assert export["artifact_type"] == "file"
    assert exported_file.read_text(encoding="utf-8") == 'print("fixed")\n'
|
|
|
|
|
|
def test_workspace_export_directory_uses_exact_output_path(tmp_path: Path) -> None:
    """Exporting ``src`` writes its contents directly into the given output directory."""
    seed_root = tmp_path / "seed"
    seed_src = seed_root / "src"
    seed_src.mkdir(parents=True)
    (seed_src / "note.txt").write_text("hello\n", encoding="utf-8")

    vms = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )

    creation = vms.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
        seed_path=seed_root,
    )
    workspace_id = str(creation["workspace_id"])

    export_root = tmp_path / "exported-src"
    export = vms.export_workspace(workspace_id, path="src", output_path=export_root)
    assert export["artifact_type"] == "directory"
    assert (export_root / "note.txt").read_text(encoding="utf-8") == "hello\n"
    # No extra "src" level may be nested under the requested output path.
    assert not (export_root / "src").exists()
|
|
|
|
|
|
def test_workspace_diff_requires_create_time_baseline(tmp_path: Path) -> None:
    """Diffing raises a baseline-snapshot error once the baseline archive is removed."""
    vms_root = tmp_path / "vms"
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=vms_root,
        network_manager=TapNetworkManager(enabled=False),
    )
    created = vm_manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
    )
    workspace_id = str(created["workspace_id"])

    # Delete the baseline archive that exists after creation to simulate a damaged workspace.
    baseline_archive = vms_root / "workspaces" / workspace_id / "baseline" / "workspace.tar"
    baseline_archive.unlink()

    with pytest.raises(RuntimeError, match="require[s]? a baseline snapshot"):
        vm_manager.diff_workspace(workspace_id)
|
|
|
|
|
|
def test_workspace_snapshots_and_reset_round_trip(tmp_path: Path) -> None:
    """Walk a workspace through snapshot creation, two resets, and snapshot deletion.

    Resetting to the named snapshot restores the checkpointed file and zeroes the
    command and service counters; resetting without a name restores the seeded
    content and leaves no diff. Deleting the named snapshot leaves only ``baseline``.
    """
    seed_root = tmp_path / "seed"
    seed_root.mkdir()
    seed_root.joinpath("note.txt").write_text("seed\n", encoding="utf-8")

    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    created = vm_manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
        seed_path=seed_root,
    )
    workspace_id = str(created["workspace_id"])

    def _names(listing: dict[str, Any]) -> list[Any]:
        """Return the snapshot names from a ``list_snapshots`` payload, in order."""
        return [entry["snapshot_name"] for entry in listing["snapshots"]]

    def _cat_note() -> dict[str, Any]:
        """Run ``cat note.txt`` inside the workspace and return the exec payload."""
        return vm_manager.exec_workspace(
            workspace_id,
            command="cat note.txt",
            timeout_seconds=30,
        )

    # Change the file, then capture that state as the "checkpoint" snapshot.
    vm_manager.exec_workspace(
        workspace_id,
        command="printf 'checkpoint\\n' > note.txt",
        timeout_seconds=30,
    )
    snapshot_payload = vm_manager.create_snapshot(workspace_id, "checkpoint")
    assert snapshot_payload["snapshot"]["snapshot_name"] == "checkpoint"

    listing = vm_manager.list_snapshots(workspace_id)
    assert listing["count"] == 2
    assert _names(listing) == ["baseline", "checkpoint"]

    # Drift away from the checkpoint: another edit plus a running service.
    vm_manager.exec_workspace(
        workspace_id,
        command="printf 'after\\n' > note.txt",
        timeout_seconds=30,
    )
    vm_manager.start_service(
        workspace_id,
        "app",
        command="sh -lc 'touch .ready; while true; do sleep 60; done'",
        readiness={"type": "file", "path": ".ready"},
    )

    checkpoint_reset = vm_manager.reset_workspace(workspace_id, snapshot="checkpoint")
    assert checkpoint_reset["workspace_reset"]["snapshot_name"] == "checkpoint"
    assert checkpoint_reset["reset_count"] == 1
    assert checkpoint_reset["last_command"] is None
    assert checkpoint_reset["command_count"] == 0
    assert checkpoint_reset["service_count"] == 0
    assert checkpoint_reset["running_service_count"] == 0

    after_checkpoint = _cat_note()
    assert after_checkpoint["stdout"] == "checkpoint\n"
    # Only the `cat` issued after the reset is present in the command log.
    checkpoint_logs = vm_manager.logs_workspace(workspace_id)
    assert checkpoint_logs["count"] == 1

    baseline_reset = vm_manager.reset_workspace(workspace_id)
    assert baseline_reset["workspace_reset"]["snapshot_name"] == "baseline"
    assert baseline_reset["reset_count"] == 2
    assert baseline_reset["command_count"] == 0
    assert baseline_reset["service_count"] == 0
    assert vm_manager.logs_workspace(workspace_id)["count"] == 0

    after_baseline = _cat_note()
    assert after_baseline["stdout"] == "seed\n"
    workspace_diff = vm_manager.diff_workspace(workspace_id)
    assert workspace_diff["changed"] is False

    removal = vm_manager.delete_snapshot(workspace_id, "checkpoint")
    assert removal["deleted"] is True
    remaining = vm_manager.list_snapshots(workspace_id)
    assert _names(remaining) == ["baseline"]
|
|
|
|
|
|
def test_workspace_snapshot_and_reset_require_baseline(tmp_path: Path) -> None:
    """Listing, creating, deleting snapshots and resetting all fail without a baseline."""
    vms_root = tmp_path / "vms"
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=vms_root,
        network_manager=TapNetworkManager(enabled=False),
    )
    created = vm_manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
    )
    workspace_id = str(created["workspace_id"])

    baseline_archive = vms_root / "workspaces" / workspace_id / "baseline" / "workspace.tar"
    baseline_archive.unlink()

    missing_baseline = "require[s]? a baseline snapshot"
    # Checked in the same order as the individual operations: list, create, delete, reset.
    baseline_dependent_calls = (
        lambda: vm_manager.list_snapshots(workspace_id),
        lambda: vm_manager.create_snapshot(workspace_id, "checkpoint"),
        lambda: vm_manager.delete_snapshot(workspace_id, "checkpoint"),
        lambda: vm_manager.reset_workspace(workspace_id),
    )
    for call in baseline_dependent_calls:
        with pytest.raises(RuntimeError, match=missing_baseline):
            call()
|
|
|
|
|
|
def test_workspace_delete_baseline_snapshot_is_rejected(tmp_path: Path) -> None:
    """Deleting the snapshot named ``baseline`` raises ``ValueError``."""
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    created = vm_manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
    )
    workspace_id = str(created["workspace_id"])

    with pytest.raises(ValueError, match="cannot delete the baseline snapshot"):
        vm_manager.delete_snapshot(workspace_id, "baseline")
|
|
|
|
|
|
def test_workspace_reset_recreates_stopped_workspace(tmp_path: Path) -> None:
    """Resetting a workspace persisted as ``stopped`` returns it to ``started``."""
    seed_root = tmp_path / "seed"
    seed_root.mkdir()
    seed_root.joinpath("note.txt").write_text("seed\n", encoding="utf-8")

    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    created = vm_manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
        seed_path=seed_root,
    )
    workspace_id = str(created["workspace_id"])

    # Force the persisted record into the stopped state through the manager's own
    # locked load/save helpers.
    with vm_manager._lock:  # noqa: SLF001
        record = vm_manager._load_workspace_locked(workspace_id)  # noqa: SLF001
        record.state = "stopped"
        record.firecracker_pid = None
        vm_manager._save_workspace_locked(record)  # noqa: SLF001

    reset_result = vm_manager.reset_workspace(workspace_id)

    assert reset_result["state"] == "started"
    assert reset_result["workspace_reset"]["snapshot_name"] == "baseline"
    note_read = vm_manager.exec_workspace(
        workspace_id,
        command="cat note.txt",
        timeout_seconds=30,
    )
    assert note_read["stdout"] == "seed\n"
|
|
|
|
|
|
def test_workspace_reset_failure_leaves_workspace_stopped(tmp_path: Path) -> None:
    """A reset whose archive import fails leaves the workspace stopped and uncounted.

    The failure must propagate, the reset counter must stay at zero, and the
    existing snapshots must still be listed afterwards.
    """
    seed_root = tmp_path / "seed"
    seed_root.mkdir()
    seed_root.joinpath("note.txt").write_text("seed\n", encoding="utf-8")

    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    created = manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
        seed_path=seed_root,
    )
    workspace_id = str(created["workspace_id"])
    manager.create_snapshot(workspace_id, "checkpoint")

    def _raise_boom(*args: Any, **kwargs: Any) -> dict[str, Any]:
        """Stand-in for the backend import that always fails."""
        del args, kwargs
        raise RuntimeError("boom")

    # Swap in the failing import only after the snapshot has been created.
    manager._backend.import_archive = _raise_boom  # type: ignore[method-assign]  # noqa: SLF001

    with pytest.raises(RuntimeError, match="boom"):
        manager.reset_workspace(workspace_id, snapshot="checkpoint")

    with manager._lock:  # noqa: SLF001
        record = manager._load_workspace_locked(workspace_id)  # noqa: SLF001
    assert record.state == "stopped"
    assert record.firecracker_pid is None
    assert record.reset_count == 0

    snapshot_listing = manager.list_snapshots(workspace_id)
    snapshot_names = [entry["snapshot_name"] for entry in snapshot_listing["snapshots"]]
    assert snapshot_names == ["baseline", "checkpoint"]
|
|
|
|
|
|
def test_workspace_export_helpers_preserve_directory_symlinks(tmp_path: Path) -> None:
    """Archive + extract of a directory keeps files, symlinks, and empty directories."""
    source_root = tmp_path / "workspace"
    source_root.mkdir()
    source_root.joinpath("note.txt").write_text("hello\n", encoding="utf-8")
    os.symlink("note.txt", source_root / "note-link")
    source_root.joinpath("empty-dir").mkdir()

    export_archive = tmp_path / "workspace-export.tar"
    prepared = vm_manager_module._prepare_workspace_export_archive(  # noqa: SLF001
        workspace_dir=source_root,
        workspace_path=".",
        archive_path=export_archive,
    )
    assert prepared.artifact_type == "directory"

    destination = tmp_path / "output"
    unpacked = vm_manager_module._extract_workspace_export_archive(  # noqa: SLF001
        export_archive,
        output_path=destination,
        artifact_type="directory",
    )
    assert unpacked["artifact_type"] == "directory"

    assert (destination / "note.txt").read_text(encoding="utf-8") == "hello\n"
    # The link must survive as a link with its original relative target.
    exported_link = destination / "note-link"
    assert exported_link.is_symlink()
    assert os.readlink(exported_link) == "note.txt"
    assert (destination / "empty-dir").is_dir()
|
|
|
|
|
|
def test_workspace_export_helpers_validate_missing_path_and_existing_output(tmp_path: Path) -> None:
    """Export helpers reject a missing workspace path and an already-existing output path."""
    source_root = tmp_path / "workspace"
    source_root.mkdir()
    source_root.joinpath("note.txt").write_text("hello\n", encoding="utf-8")

    # Case 1: the requested path is not present in the workspace directory.
    with pytest.raises(RuntimeError, match="workspace path does not exist"):
        vm_manager_module._prepare_workspace_export_archive(  # noqa: SLF001
            workspace_dir=source_root,
            workspace_path="missing.txt",
            archive_path=tmp_path / "missing.tar",
        )

    # Case 2: a valid archive is refused when the extraction target already exists.
    note_archive = tmp_path / "note-export.tar"
    prepared = vm_manager_module._prepare_workspace_export_archive(  # noqa: SLF001
        workspace_dir=source_root,
        workspace_path="note.txt",
        archive_path=note_archive,
    )
    occupied_target = tmp_path / "note.txt"
    occupied_target.write_text("already here\n", encoding="utf-8")

    with pytest.raises(RuntimeError, match="output_path already exists"):
        vm_manager_module._extract_workspace_export_archive(  # noqa: SLF001
            note_archive,
            output_path=occupied_target,
            artifact_type=prepared.artifact_type,
        )
|
|
|
|
|
|
def test_diff_workspace_trees_reports_empty_binary_symlink_and_type_changes(tmp_path: Path) -> None:
    """Diff two trees covering text, binary, symlink, type-swap, and empty-dir changes."""
    before = tmp_path / "baseline"
    after = tmp_path / "current"
    before.mkdir()
    after.mkdir()

    # Text file modified in place.
    before.joinpath("modified.txt").write_text("before\n", encoding="utf-8")
    after.joinpath("modified.txt").write_text("after\n", encoding="utf-8")

    # One text file removed, one text file added.
    before.joinpath("deleted.txt").write_text("gone\n", encoding="utf-8")
    after.joinpath("added.txt").write_text("new\n", encoding="utf-8")

    # Content containing a NUL byte on both sides.
    before.joinpath("binary.bin").write_bytes(b"\x00before")
    after.joinpath("binary.bin").write_bytes(b"\x00after")

    # Symlink whose target string changes.
    os.symlink("link-target-old.txt", before / "link")
    os.symlink("link-target-new.txt", after / "link")

    # Same name, directory on one side and regular file on the other.
    before.joinpath("swap").mkdir()
    after.joinpath("swap").write_text("type changed\n", encoding="utf-8")

    # Empty directories removed and added.
    before.joinpath("removed-empty").mkdir()
    after.joinpath("added-empty").mkdir()

    tree_diff = vm_manager_module._diff_workspace_trees(before, after)  # noqa: SLF001

    assert tree_diff["changed"] is True
    assert tree_diff["summary"] == {
        "total": 8,
        "added": 2,
        "modified": 3,
        "deleted": 2,
        "type_changed": 1,
        "text_patched": 3,
        "non_text": 5,
    }

    unified_patch = tree_diff["patch"]
    assert "--- a/modified.txt" in unified_patch
    assert "+++ b/modified.txt" in unified_patch
    assert "--- /dev/null" in unified_patch
    assert "+++ b/added.txt" in unified_patch
    assert "--- a/deleted.txt" in unified_patch
    assert "+++ /dev/null" in unified_patch

    by_path = {entry["path"]: entry for entry in tree_diff["entries"]}
    assert by_path["binary.bin"]["text_patch"] is None
    assert by_path["link"]["artifact_type"] == "symlink"
    assert by_path["swap"]["artifact_type"] == "file"
    assert by_path["removed-empty"]["artifact_type"] == "directory"
    assert by_path["added-empty"]["artifact_type"] == "directory"
|
|
|
|
|
|
def test_diff_workspace_trees_unchanged_returns_empty_summary(tmp_path: Path) -> None:
    """Identical trees produce an all-zero summary, no entries, and an empty patch."""
    before = tmp_path / "baseline"
    after = tmp_path / "current"
    for tree in (before, after):
        tree.mkdir()
    before.joinpath("note.txt").write_text("same\n", encoding="utf-8")
    after.joinpath("note.txt").write_text("same\n", encoding="utf-8")

    tree_diff = vm_manager_module._diff_workspace_trees(before, after)  # noqa: SLF001

    summary_keys = (
        "total",
        "added",
        "modified",
        "deleted",
        "type_changed",
        "text_patched",
        "non_text",
    )
    expected = {
        "changed": False,
        "summary": dict.fromkeys(summary_keys, 0),
        "entries": [],
        "patch": "",
    }
    assert tree_diff == expected
|
|
|
|
|
|
def test_workspace_shell_lifecycle_and_rehydration(tmp_path: Path) -> None:
    """Open, use, and close shells from two managers sharing one base directory.

    A second ``VmManager`` built on the same ``base_dir`` can open its own shell
    in the existing workspace. Shell traffic does not add workspace log entries,
    a closed shell can no longer be read, and deleting the workspace makes the
    second shell unreadable too.
    """

    def _read_until(
        owner: VmManager,
        workspace: str,
        shell: str,
        marker: str,
        timeout_seconds: float = 5.0,
    ) -> str:
        """Poll shell output from cursor 0 until ``marker`` appears or the timeout elapses.

        Returns the most recent output read, whether or not the marker was seen.
        """
        captured = ""
        # Monotonic clock: a wall-clock adjustment must not shorten or stretch the wait.
        deadline = time.monotonic() + timeout_seconds
        while time.monotonic() < deadline:
            chunk = owner.read_shell(workspace, shell, cursor=0, max_chars=65536)
            captured = str(chunk["output"])
            if marker in captured:
                break
            time.sleep(0.05)
        return captured

    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    created = manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
    )
    workspace_id = str(created["workspace_id"])

    opened = manager.open_shell(workspace_id)
    shell_id = str(opened["shell_id"])
    assert opened["state"] == "running"

    manager.write_shell(workspace_id, shell_id, input_text="pwd")
    output = _read_until(manager, workspace_id, shell_id, "/workspace")
    assert "/workspace" in output

    # A fresh manager on the same base directory must see the existing workspace.
    manager_rehydrated = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    second_opened = manager_rehydrated.open_shell(workspace_id)
    second_shell_id = str(second_opened["shell_id"])
    assert second_shell_id != shell_id

    manager_rehydrated.write_shell(workspace_id, second_shell_id, input_text="printf 'ok\\n'")
    second_output = _read_until(manager_rehydrated, workspace_id, second_shell_id, "ok")
    assert "ok" in second_output

    logs = manager.logs_workspace(workspace_id)
    assert logs["count"] == 0

    closed = manager.close_shell(workspace_id, shell_id)
    assert closed["closed"] is True
    with pytest.raises(ValueError, match="does not exist"):
        manager.read_shell(workspace_id, shell_id)

    deleted = manager.delete_workspace(workspace_id)
    assert deleted["deleted"] is True
    with pytest.raises(ValueError, match="does not exist"):
        manager_rehydrated.read_shell(workspace_id, second_shell_id)
|
|
|
|
|
|
def test_workspace_read_shell_plain_renders_control_sequences(
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """``plain=True`` turns backend output ``hello\\r\\x1b[2Kbye\\n`` into ``bye\\n``."""
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    created = vm_manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
    )
    workspace_id = str(created["workspace_id"])
    shell_id = str(vm_manager.open_shell(workspace_id)["shell_id"])

    def _canned_read(*args: Any, **kwargs: Any) -> dict[str, Any]:
        """Return a fresh backend payload whose output contains terminal control codes."""
        del args, kwargs
        return {
            "shell_id": shell_id,
            "cwd": "/workspace",
            "cols": 120,
            "rows": 30,
            "state": "running",
            "started_at": 1.0,
            "ended_at": None,
            "exit_code": None,
            "execution_mode": "host_compat",
            "cursor": 0,
            "next_cursor": 15,
            "output": "hello\r\x1b[2Kbye\n",
            "truncated": False,
        }

    monkeypatch.setattr(vm_manager._backend, "read_shell", _canned_read)  # noqa: SLF001

    rendered = vm_manager.read_shell(
        workspace_id,
        shell_id,
        cursor=0,
        max_chars=1024,
        plain=True,
    )

    assert rendered["output"] == "bye\n"
    assert rendered["plain"] is True
    assert rendered["wait_for_idle_ms"] is None
|
|
|
|
|
|
def test_workspace_read_shell_wait_for_idle_batches_output(
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """``wait_for_idle_ms`` merges successive backend reads into one response.

    The backend is scripted to return ``one\\n``, then ``two\\n``, then an empty
    read, while ``time.monotonic`` is driven by a fixed sequence of values.
    """
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    created = vm_manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
    )
    workspace_id = str(created["workspace_id"])
    shell_id = str(vm_manager.open_shell(workspace_id)["shell_id"])

    # Fields shared by every scripted backend response.
    shared_fields: dict[str, Any] = {
        "shell_id": shell_id,
        "cwd": "/workspace",
        "cols": 120,
        "rows": 30,
        "state": "running",
        "started_at": 1.0,
        "ended_at": None,
        "exit_code": None,
        "execution_mode": "host_compat",
    }
    # (cursor, next_cursor, output) for each successive backend read.
    scripted_steps = ((0, 4, "one\n"), (4, 8, "two\n"), (8, 8, ""))
    queued_reads = [
        {
            **shared_fields,
            "cursor": cursor,
            "next_cursor": next_cursor,
            "output": output,
            "truncated": False,
        }
        for cursor, next_cursor, output in scripted_steps
    ]

    def fake_read_shell(*args: Any, **kwargs: Any) -> dict[str, Any]:
        """Hand out the scripted responses in order."""
        del args, kwargs
        return queued_reads.pop(0)

    clock_ticks = iter([0.0, 0.05, 0.10, 0.41])
    monkeypatch.setattr(vm_manager._backend, "read_shell", fake_read_shell)  # noqa: SLF001
    monkeypatch.setattr(time, "monotonic", lambda: next(clock_ticks))
    monkeypatch.setattr(time, "sleep", lambda _: None)

    batched = vm_manager.read_shell(
        workspace_id,
        shell_id,
        cursor=0,
        max_chars=1024,
        wait_for_idle_ms=300,
    )

    assert batched["output"] == "one\ntwo\n"
    assert batched["next_cursor"] == 8
    assert batched["wait_for_idle_ms"] == 300
    assert batched["plain"] is False
|
|
|
|
|
|
def test_workspace_create_rejects_unsafe_seed_archive(tmp_path: Path) -> None:
    """A seed archive with a ``../`` member is rejected and leaves no workspace behind."""
    hostile_archive = tmp_path / "bad.tgz"
    member_bytes = b"bad\n"
    with tarfile.open(hostile_archive, "w:gz") as tar:
        # Member name escapes the extraction root via a parent-directory component.
        escaping_member = tarfile.TarInfo(name="../escape.txt")
        escaping_member.size = len(member_bytes)
        tar.addfile(escaping_member, io.BytesIO(member_bytes))

    vms_root = tmp_path / "vms"
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=vms_root,
        network_manager=TapNetworkManager(enabled=False),
    )

    with pytest.raises(RuntimeError, match="unsafe archive member path"):
        vm_manager.create_workspace(
            environment="debian:12-base",
            allow_host_compat=True,
            seed_path=hostile_archive,
        )

    leftovers = list((vms_root / "workspaces").iterdir())
    assert leftovers == []
|
|
|
|
|
|
def test_workspace_create_rejects_archive_that_writes_through_symlink(tmp_path: Path) -> None:
    """A seed archive that places a file beneath its own symlink member is rejected."""
    hostile_archive = tmp_path / "bad-symlink.tgz"
    member_bytes = b"bad\n"
    with tarfile.open(hostile_archive, "w:gz") as tar:
        # First member: a symlink named "linked" pointing at "outside".
        link_member = tarfile.TarInfo(name="linked")
        link_member.type = tarfile.SYMTYPE
        link_member.linkname = "outside"
        tar.addfile(link_member)

        # Second member: a regular file whose path passes through that symlink.
        nested_member = tarfile.TarInfo(name="linked/note.txt")
        nested_member.size = len(member_bytes)
        tar.addfile(nested_member, io.BytesIO(member_bytes))

    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )

    with pytest.raises(RuntimeError, match="traverse through a symlinked path"):
        vm_manager.create_workspace(
            environment="debian:12-base",
            allow_host_compat=True,
            seed_path=hostile_archive,
        )
|
|
|
|
|
|
def test_workspace_create_cleans_up_on_seed_failure(
    tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
    """If the backend seed import raises, no workspace directory is left on disk."""
    seed_root = tmp_path / "seed"
    seed_root.mkdir()
    seed_root.joinpath("note.txt").write_text("hello\n", encoding="utf-8")

    vms_root = tmp_path / "vms"
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=vms_root,
        network_manager=TapNetworkManager(enabled=False),
    )

    def _failing_import(*args: Any, **kwargs: Any) -> dict[str, Any]:
        """Replacement for the backend import that always fails."""
        del args, kwargs
        raise RuntimeError("seed import failed")

    monkeypatch.setattr(vm_manager._backend, "import_archive", _failing_import)  # noqa: SLF001

    with pytest.raises(RuntimeError, match="seed import failed"):
        vm_manager.create_workspace(
            environment="debian:12-base",
            allow_host_compat=True,
            seed_path=seed_root,
        )

    leftovers = list((vms_root / "workspaces").iterdir())
    assert leftovers == []
|
|
|
|
|
|
def test_workspace_rehydrates_across_manager_processes(tmp_path: Path) -> None:
    """A second manager on the same base directory can exec in and log an existing workspace."""
    shared_base = tmp_path / "vms"

    def _new_manager() -> VmManager:
        """Build a mock-backed manager rooted at the shared base directory."""
        return VmManager(
            backend_name="mock",
            base_dir=shared_base,
            network_manager=TapNetworkManager(enabled=False),
        )

    creator = _new_manager()
    created = creator.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
    )
    workspace_id = str(created["workspace_id"])

    # The second instance has no in-memory knowledge of the workspace.
    successor = _new_manager()
    run = successor.exec_workspace(
        workspace_id,
        command="printf 'ok\\n'",
        timeout_seconds=30,
    )
    assert run["exit_code"] == 0
    assert run["stdout"] == "ok\n"

    command_log = successor.logs_workspace(workspace_id)
    assert command_log["count"] == 1
|
|
|
|
|
|
def test_workspace_requires_started_state(tmp_path: Path) -> None:
    """Exec is refused when the persisted workspace record says ``stopped``."""
    vms_root = tmp_path / "vms"
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=vms_root,
        network_manager=TapNetworkManager(enabled=False),
    )
    created = vm_manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
    )
    workspace_id = str(created["workspace_id"])

    # Rewrite the on-disk record directly so only the state field changes.
    record_file = vms_root / "workspaces" / workspace_id / "workspace.json"
    record = json.loads(record_file.read_text(encoding="utf-8"))
    record["state"] = "stopped"
    record_file.write_text(json.dumps(record, indent=2, sort_keys=True), encoding="utf-8")

    with pytest.raises(RuntimeError, match="must be in 'started' state"):
        vm_manager.exec_workspace(workspace_id, command="true", timeout_seconds=30)
|
|
|
|
|
|
def test_vm_manager_firecracker_backend_path(
    tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
    """Constructing a manager with ``backend_name="firecracker"`` records that backend name.

    ``FirecrackerBackend`` is swapped for an inert stub before the manager is built.
    """

    class StubFirecrackerBackend:
        """Do-nothing replacement that only stores its constructor arguments."""

        def __init__(
            self,
            environment_store: Any,
            firecracker_bin: Path,
            jailer_bin: Path,
            runtime_capabilities: Any,
            network_manager: TapNetworkManager,
        ) -> None:
            self.environment_store = environment_store
            self.firecracker_bin = firecracker_bin
            self.jailer_bin = jailer_bin
            self.runtime_capabilities = runtime_capabilities
            self.network_manager = network_manager

        def create(self, instance: Any) -> None:
            del instance

        def start(self, instance: Any) -> None:
            del instance

        def exec(self, instance: Any, command: str, timeout_seconds: int) -> Any:
            del instance, command, timeout_seconds
            return None

        def stop(self, instance: Any) -> None:
            del instance

        def delete(self, instance: Any) -> None:
            del instance

    monkeypatch.setattr(vm_manager_module, "FirecrackerBackend", StubFirecrackerBackend)

    vm_manager = VmManager(
        backend_name="firecracker",
        base_dir=tmp_path / "vms",
        runtime_paths=resolve_runtime_paths(),
        network_manager=TapNetworkManager(enabled=False),
    )

    assert vm_manager._backend_name == "firecracker"  # noqa: SLF001
|
|
|
|
|
|
def test_firecracker_backend_start_removes_stale_socket_files(
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """``FirecrackerBackend.start`` removes leftover socket files from the workdir.

    The backend is assembled without running ``__init__`` and both ``subprocess.run``
    and ``subprocess.Popen`` are faked, so no real process is launched.
    """
    firecracker_bin = tmp_path / "firecracker"
    jailer_bin = tmp_path / "jailer"
    firecracker_bin.write_text("fc", encoding="utf-8")
    jailer_bin.write_text("jailer", encoding="utf-8")

    # Bypass __init__ and populate the private attributes by hand.
    backend = cast(Any, object.__new__(vm_manager_module.FirecrackerBackend))
    backend._environment_store = object()  # noqa: SLF001
    backend._firecracker_bin = firecracker_bin  # noqa: SLF001
    backend._jailer_bin = jailer_bin  # noqa: SLF001
    backend._runtime_capabilities = RuntimeCapabilities(  # noqa: SLF001
        supports_vm_boot=True,
        supports_guest_exec=True,
        supports_guest_network=False,
        reason=None,
    )
    backend._network_manager = TapNetworkManager(enabled=False)  # noqa: SLF001
    backend._guest_exec_client = None  # noqa: SLF001
    backend._processes = {}  # noqa: SLF001

    kernel_image = tmp_path / "vmlinux"
    kernel_image.write_text("kernel", encoding="utf-8")
    rootfs_image = tmp_path / "rootfs.ext4"
    rootfs_image.write_bytes(b"rootfs")

    # Plant regular files where the sockets would live to act as stale leftovers.
    workdir = tmp_path / "runtime"
    workdir.mkdir()
    stale_api_socket = workdir / "firecracker.sock"
    stale_vsock_socket = workdir / "vsock.sock"
    stale_api_socket.write_text("stale firecracker socket", encoding="utf-8")
    stale_vsock_socket.write_text("stale vsock socket", encoding="utf-8")

    class DummyPopen:
        """Fake process handle that reports a fixed PID and never exits."""

        def __init__(self, *args: Any, **kwargs: Any) -> None:
            del args, kwargs
            self.pid = 4242

        def poll(self) -> None:
            return None

    def _fake_run(*args: Any, **kwargs: Any) -> subprocess.CompletedProcess[str]:
        """Pretend every ``subprocess.run`` call succeeded and printed a version banner."""
        del kwargs
        return subprocess.CompletedProcess(
            args=args[0],
            returncode=0,
            stdout="Firecracker v1.0.0\n",
            stderr="",
        )

    patched_subprocess = cast(Any, vm_manager_module).subprocess
    monkeypatch.setattr(patched_subprocess, "run", _fake_run)
    monkeypatch.setattr(patched_subprocess, "Popen", DummyPopen)

    instance = vm_manager_module.VmInstance(
        vm_id="abcd1234",
        environment="debian:12",
        vcpu_count=1,
        mem_mib=512,
        ttl_seconds=600,
        created_at=time.time(),
        expires_at=time.time() + 600,
        workdir=workdir,
        metadata={
            "kernel_image": str(kernel_image),
            "rootfs_image": str(rootfs_image),
        },
    )

    backend.start(instance)

    assert instance.firecracker_pid == 4242
    assert not stale_api_socket.exists()
    assert not stale_vsock_socket.exists()
|
|
|
|
|
|
def test_vm_manager_fails_closed_without_host_compat_opt_in(tmp_path: Path) -> None:
    """Starting a VM created without ``allow_host_compat`` raises instead of falling back."""
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    created = vm_manager.create_vm(
        environment="debian:12-base",
        ttl_seconds=600,
    )
    vm_id = str(created["vm_id"])

    with pytest.raises(RuntimeError, match="guest boot is unavailable"):
        vm_manager.start_vm(vm_id)
|
|
|
|
|
|
def test_vm_manager_uses_canonical_default_cache_dir(
    monkeypatch: pytest.MonkeyPatch, tmp_path: Path
) -> None:
    """The environment store's cache dir follows ``PYRO_ENVIRONMENT_CACHE_DIR``."""
    expected_cache = tmp_path / "cache"
    monkeypatch.setenv("PYRO_ENVIRONMENT_CACHE_DIR", str(expected_cache))

    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )

    assert vm_manager._environment_store.cache_dir == expected_cache  # noqa: SLF001
|
|
|
|
|
|
def test_vm_manager_helper_round_trips() -> None:
    """Exercise the module's private coercion, network (de)serialization, and misc helpers."""
    helpers = vm_manager_module
    sample_network = NetworkConfig(
        vm_id="abc123",
        tap_name="tap0",
        guest_ip="172.29.1.2",
        gateway_ip="172.29.1.1",
        subnet_cidr="172.29.1.0/24",
        mac_address="06:00:aa:bb:cc:dd",
        dns_servers=("1.1.1.1", "8.8.8.8"),
    )

    # _optional_int: None passes through, int-like values are coerced, others raise.
    assert helpers._optional_int(None) is None  # noqa: SLF001
    for raw, coerced in ((True, 1), (7, 7), (7.2, 7), ("9", 9)):
        assert helpers._optional_int(raw) == coerced  # noqa: SLF001
    with pytest.raises(TypeError, match="integer-compatible"):
        helpers._optional_int(object())  # noqa: SLF001

    # _optional_str / _optional_dict / _string_dict.
    assert helpers._optional_str(None) is None  # noqa: SLF001
    assert helpers._optional_str(1) == "1"  # noqa: SLF001
    assert helpers._optional_dict(None) is None  # noqa: SLF001
    assert helpers._optional_dict({"x": 1}) == {"x": 1}  # noqa: SLF001
    with pytest.raises(TypeError, match="dictionary payload"):
        helpers._optional_dict("bad")  # noqa: SLF001
    assert helpers._string_dict({"x": 1}) == {"x": "1"}  # noqa: SLF001
    assert helpers._string_dict("bad") == {}  # noqa: SLF001

    # Network config survives a serialize -> deserialize round trip.
    wire_form = helpers._serialize_network(sample_network)  # noqa: SLF001
    assert wire_form is not None
    rebuilt = helpers._deserialize_network(wire_form)  # noqa: SLF001
    assert rebuilt == sample_network
    assert helpers._deserialize_network(None) is None  # noqa: SLF001
    with pytest.raises(TypeError, match="dictionary payload"):
        helpers._deserialize_network("bad")  # noqa: SLF001

    # Guest command wrapping and PID liveness.
    assert helpers._wrap_guest_command("echo hi") == "echo hi"  # noqa: SLF001
    with_cwd = helpers._wrap_guest_command("echo hi", cwd="/workspace")  # noqa: SLF001
    assert "cd /workspace" in with_cwd
    assert helpers._pid_is_running(None) is False  # noqa: SLF001
|
|
|
|
|
|
def test_copy_rootfs_falls_back_to_copy2(
    tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
    """When ``subprocess.run`` raises ``OSError``, ``_copy_rootfs`` reports ``copy2``."""
    source_image = tmp_path / "rootfs.ext4"
    source_image.write_text("payload", encoding="utf-8")
    # The destination's parent directory does not exist before the call.
    target_image = tmp_path / "dest" / "rootfs.ext4"

    def _run_unavailable(*args: Any, **kwargs: Any) -> Any:
        """Simulate the external copy command being unavailable."""
        del args, kwargs
        raise OSError("no cp")

    monkeypatch.setattr(subprocess, "run", _run_unavailable)

    mode_used = vm_manager_module._copy_rootfs(source_image, target_image)  # noqa: SLF001

    assert mode_used == "copy2"
    assert target_image.read_text(encoding="utf-8") == "payload"
|
|
|
|
|
|
def test_workspace_create_cleans_up_on_start_failure(
    tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
    """If the backend ``start`` raises during create, no workspace directory remains."""
    vms_root = tmp_path / "vms"
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=vms_root,
        network_manager=TapNetworkManager(enabled=False),
    )

    def _failing_start(instance: Any) -> None:
        """Replacement for the backend start that always fails."""
        del instance
        raise RuntimeError("boom")

    monkeypatch.setattr(vm_manager._backend, "start", _failing_start)  # noqa: SLF001

    with pytest.raises(RuntimeError, match="boom"):
        vm_manager.create_workspace(environment="debian:12-base", allow_host_compat=True)

    leftovers = list((vms_root / "workspaces").iterdir())
    assert leftovers == []
|
|
|
|
|
|
def test_exec_instance_wraps_guest_workspace_command(tmp_path: Path) -> None:
    """With guest exec enabled, ``_exec_instance`` sends a ``cd``-prefixed command.

    The stub backend records what it receives: the command must contain
    ``cd /workspace`` and no host ``workdir`` may be passed.
    """
    vm_manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    # Advertise guest boot and guest exec support on the manager.
    vm_manager._runtime_capabilities = RuntimeCapabilities(  # noqa: SLF001
        supports_vm_boot=True,
        supports_guest_exec=True,
        supports_guest_network=False,
        reason=None,
    )
    seen: dict[str, Any] = {}

    class StubBackend:
        """Backend double that records the exec arguments and reports success."""

        def exec(
            self,
            instance: Any,
            command: str,
            timeout_seconds: int,
            *,
            workdir: Path | None = None,
        ) -> vm_manager_module.VmExecResult:
            del instance, timeout_seconds
            seen["command"] = command
            seen["workdir"] = workdir
            return vm_manager_module.VmExecResult(
                stdout="",
                stderr="",
                exit_code=0,
                duration_ms=1,
            )

    vm_manager._backend = StubBackend()  # type: ignore[assignment]  # noqa: SLF001

    started_instance = vm_manager_module.VmInstance(  # noqa: SLF001
        vm_id="vm-123",
        environment="debian:12-base",
        vcpu_count=1,
        mem_mib=512,
        ttl_seconds=600,
        created_at=time.time(),
        expires_at=time.time() + 600,
        workdir=tmp_path / "runtime",
        state="started",
    )

    exec_result, mode = vm_manager._exec_instance(  # noqa: SLF001
        started_instance,
        command="echo hi",
        timeout_seconds=30,
        guest_cwd="/workspace",
    )

    assert exec_result.exit_code == 0
    assert mode == "unknown"
    assert "cd /workspace" in str(seen["command"])
    assert seen["workdir"] is None
|
|
|
|
|
|
def test_status_workspace_marks_dead_backing_process_stopped(tmp_path: Path) -> None:
    """Rewrite a workspace record to point at pid 999999 and check the reported status.

    After the record is edited to ``guest_vsock`` mode with that pid, ``status_workspace``
    must report ``stopped`` and persist a ``last_error`` mentioning the backing guest process.
    """
    base_dir = tmp_path / "vms"
    manager = VmManager(
        backend_name="mock",
        base_dir=base_dir,
        network_manager=TapNetworkManager(enabled=False),
    )
    created = manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
    )
    workspace_id = str(created["workspace_id"])

    record_file = base_dir / "workspaces" / workspace_id / "workspace.json"
    record = json.loads(record_file.read_text(encoding="utf-8"))
    record["metadata"]["execution_mode"] = "guest_vsock"
    record["firecracker_pid"] = 999999
    record_file.write_text(json.dumps(record, indent=2, sort_keys=True), encoding="utf-8")

    status = manager.status_workspace(workspace_id)
    assert status["state"] == "stopped"

    persisted = json.loads(record_file.read_text(encoding="utf-8"))
    assert "backing guest process" in str(persisted.get("last_error", ""))
|
|
|
|
|
|
def test_reap_expired_workspaces_removes_invalid_and_expired_records(tmp_path: Path) -> None:
    """Check that the reaper deletes both a malformed record and an expired workspace.

    One directory holds a ``workspace.json`` containing ``[]``; the other is a real
    workspace whose ``expires_at`` is rewritten to ``0.0``. Both must be gone afterwards.
    """
    workspaces_root = tmp_path / "vms" / "workspaces"
    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )

    # Record whose JSON body is a list instead of an object.
    invalid_dir = workspaces_root / "invalid"
    invalid_dir.mkdir(parents=True)
    (invalid_dir / "workspace.json").write_text("[]", encoding="utf-8")

    created = manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
    )
    workspace_id = str(created["workspace_id"])
    expired_dir = workspaces_root / workspace_id
    record_file = expired_dir / "workspace.json"

    # Force the freshly created workspace to look expired.
    record = json.loads(record_file.read_text(encoding="utf-8"))
    record["expires_at"] = 0.0
    record_file.write_text(json.dumps(record, indent=2, sort_keys=True), encoding="utf-8")

    with manager._lock:  # noqa: SLF001
        manager._reap_expired_workspaces_locked(time.time())  # noqa: SLF001

    assert not invalid_dir.exists()
    assert not expired_dir.exists()
|
|
|
|
|
|
def test_workspace_service_lifecycle_and_status_counts(tmp_path: Path) -> None:
    """Drive one service through start, list, status, logs, stop and workspace delete.

    The service command loops forever, so the workspace is deleted in a ``finally``
    block: a failing assertion midway no longer skips the teardown call.
    """
    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    workspace_id = str(
        manager.create_workspace(
            environment="debian:12-base",
            allow_host_compat=True,
        )["workspace_id"]
    )
    service_command = (
        "sh -lc 'printf \"service ready\\n\"; touch .ready; "
        "while true; do sleep 60; done'"
    )

    try:
        started = manager.start_service(
            workspace_id,
            "app",
            command=service_command,
            readiness={"type": "file", "path": ".ready"},
        )
        assert started["state"] == "running"

        listed = manager.list_services(workspace_id)
        assert listed["count"] == 1
        assert listed["running_count"] == 1

        status = manager.status_service(workspace_id, "app")
        assert status["state"] == "running"
        assert status["ready_at"] is not None

        logs = manager.logs_service(workspace_id, "app")
        assert "service ready" in str(logs["stdout"])

        workspace_status = manager.status_workspace(workspace_id)
        assert workspace_status["service_count"] == 1
        assert workspace_status["running_service_count"] == 1

        stopped = manager.stop_service(workspace_id, "app")
        assert stopped["state"] == "stopped"
        assert stopped["stop_reason"] in {"sigterm", "sigkill"}
    finally:
        # Delete the workspace even when an assertion above fails, so the
        # teardown call is never skipped.
        deleted = manager.delete_workspace(workspace_id)

    assert deleted["deleted"] is True
|
|
|
|
|
|
def test_workspace_create_serializes_network_policy(tmp_path: Path) -> None:
    """Create a workspace with ``network_policy="egress"`` and check where it is recorded.

    The policy must appear both in the payload returned by ``create_workspace`` and in
    the ``workspace.json`` written under the manager's base directory.
    """
    base_dir = tmp_path / "vms"
    manager = VmManager(
        backend_name="mock",
        base_dir=base_dir,
        network_manager=TapNetworkManager(enabled=False),
    )
    # Pretend the runtime supports guest networking and skip the bootstrap check.
    manager._runtime_capabilities = RuntimeCapabilities(  # noqa: SLF001
        supports_vm_boot=True,
        supports_guest_exec=True,
        supports_guest_network=True,
    )
    manager._ensure_workspace_guest_bootstrap_support = lambda instance: None  # type: ignore[method-assign]  # noqa: SLF001

    created = manager.create_workspace(
        environment="debian:12-base",
        network_policy="egress",
    )
    assert created["network_policy"] == "egress"

    record_file = base_dir / "workspaces" / str(created["workspace_id"]) / "workspace.json"
    persisted = json.loads(record_file.read_text(encoding="utf-8"))
    assert persisted["network_policy"] == "egress"
|
|
|
|
|
|
def test_workspace_service_start_serializes_published_ports(
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Start a service with a published port and check the serialized port list.

    The proxy launcher is replaced by a fake that returns one record (with a
    ``proxy_pid``); the payload returned by ``start_service`` must list the port
    with only ``host``, ``host_port``, ``guest_port`` and ``protocol``.
    """
    base_dir = tmp_path / "vms"
    manager = VmManager(
        backend_name="mock",
        base_dir=base_dir,
        network_manager=TapNetworkManager(enabled=False),
    )
    manager._runtime_capabilities = RuntimeCapabilities(  # noqa: SLF001
        supports_vm_boot=True,
        supports_guest_exec=True,
        supports_guest_network=True,
    )
    manager._ensure_workspace_guest_bootstrap_support = lambda instance: None  # type: ignore[method-assign]  # noqa: SLF001
    created = manager.create_workspace(
        environment="debian:12-base",
        network_policy="egress+published-ports",
        allow_host_compat=True,
    )
    workspace_id = str(created["workspace_id"])

    # Inject a network section into the persisted record.
    record_file = base_dir / "workspaces" / workspace_id / "workspace.json"
    record = json.loads(record_file.read_text(encoding="utf-8"))
    record["network"] = {
        "vm_id": workspace_id,
        "tap_name": "tap-test0",
        "guest_ip": "172.29.1.2",
        "gateway_ip": "172.29.1.1",
        "subnet_cidr": "172.29.1.0/30",
        "mac_address": "06:00:ac:1d:01:02",
        "dns_servers": ["1.1.1.1"],
    }
    record_file.write_text(json.dumps(record, indent=2, sort_keys=True), encoding="utf-8")

    def _fake_published_ports(
        **kwargs: object,
    ) -> list[vm_manager_module.WorkspacePublishedPortRecord]:
        del kwargs
        return [
            vm_manager_module.WorkspacePublishedPortRecord(
                guest_port=8080,
                host_port=18080,
                host="127.0.0.1",
                protocol="tcp",
                proxy_pid=9999,
            )
        ]

    monkeypatch.setattr(
        manager,
        "_start_workspace_service_published_ports",
        _fake_published_ports,
    )
    monkeypatch.setattr(
        manager,
        "_refresh_workspace_liveness_locked",
        lambda workspace: None,
    )

    # NOTE(review): this test does not stop the service it starts; presumably
    # cleanup happens elsewhere — confirm.
    started = manager.start_service(
        workspace_id,
        "web",
        command="sh -lc 'touch .ready && while true; do sleep 60; done'",
        readiness={"type": "file", "path": ".ready"},
        published_ports=[{"guest_port": 8080, "host_port": 18080}],
    )

    expected_ports = [
        {
            "host": "127.0.0.1",
            "host_port": 18080,
            "guest_port": 8080,
            "protocol": "tcp",
        }
    ]
    assert started["published_ports"] == expected_ports
|
|
|
|
|
|
def test_workspace_service_start_rejects_published_ports_without_network_policy(
    tmp_path: Path,
) -> None:
    """Expect ``start_service`` to refuse published ports on a default-policy workspace.

    The workspace is created without a ``network_policy`` argument, so requesting a
    published port must raise ``RuntimeError`` naming the required policy.
    """
    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    created = manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
    )
    workspace_id = str(created["workspace_id"])
    expected_error = "published ports require workspace network_policy 'egress\\+published-ports'"

    with pytest.raises(RuntimeError, match=expected_error):
        manager.start_service(
            workspace_id,
            "web",
            command="sh -lc 'touch .ready && while true; do sleep 60; done'",
            readiness={"type": "file", "path": ".ready"},
            published_ports=[{"guest_port": 8080}],
        )
|
|
|
|
|
|
def test_workspace_service_start_rejects_published_ports_without_active_network(
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Expect ``start_service`` to refuse published ports when no network was recorded.

    The workspace uses the ``egress+published-ports`` policy, but this test never
    writes a ``network`` section into its record, so the call must raise.
    """
    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    manager._runtime_capabilities = RuntimeCapabilities(  # noqa: SLF001
        supports_vm_boot=True,
        supports_guest_exec=True,
        supports_guest_network=True,
    )
    manager._ensure_workspace_guest_bootstrap_support = lambda instance: None  # type: ignore[method-assign]  # noqa: SLF001
    monkeypatch.setattr(
        manager,
        "_refresh_workspace_liveness_locked",
        lambda workspace: None,
    )

    created = manager.create_workspace(
        environment="debian:12-base",
        network_policy="egress+published-ports",
        allow_host_compat=True,
    )
    workspace_id = str(created["workspace_id"])
    expected_error = "published ports require an active guest network"

    with pytest.raises(RuntimeError, match=expected_error):
        manager.start_service(
            workspace_id,
            "web",
            command="sh -lc 'touch .ready && while true; do sleep 60; done'",
            readiness={"type": "file", "path": ".ready"},
            published_ports=[{"guest_port": 8080}],
        )
|
|
|
|
|
|
def test_workspace_service_start_published_port_failure_marks_service_failed(
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Check the service payload when starting the published-port proxies raises.

    The proxy launcher is replaced by a function that raises ``RuntimeError``; the
    returned service must be ``failed`` with ``stop_reason == "published_port_failed"``
    and an empty ``published_ports`` list.
    """
    base_dir = tmp_path / "vms"
    manager = VmManager(
        backend_name="mock",
        base_dir=base_dir,
        network_manager=TapNetworkManager(enabled=False),
    )
    manager._runtime_capabilities = RuntimeCapabilities(  # noqa: SLF001
        supports_vm_boot=True,
        supports_guest_exec=True,
        supports_guest_network=True,
    )
    manager._ensure_workspace_guest_bootstrap_support = lambda instance: None  # type: ignore[method-assign]  # noqa: SLF001
    monkeypatch.setattr(
        manager,
        "_refresh_workspace_liveness_locked",
        lambda workspace: None,
    )
    created = manager.create_workspace(
        environment="debian:12-base",
        network_policy="egress+published-ports",
        allow_host_compat=True,
    )
    workspace_id = str(created["workspace_id"])

    # Inject a network section into the persisted record.
    record_file = base_dir / "workspaces" / workspace_id / "workspace.json"
    record = json.loads(record_file.read_text(encoding="utf-8"))
    record["network"] = {
        "vm_id": workspace_id,
        "tap_name": "tap-test0",
        "guest_ip": "172.29.1.2",
        "gateway_ip": "172.29.1.1",
        "subnet_cidr": "172.29.1.0/30",
        "mac_address": "06:00:ac:1d:01:02",
        "dns_servers": ["1.1.1.1"],
    }
    record_file.write_text(json.dumps(record, indent=2, sort_keys=True), encoding="utf-8")

    def _raise_proxy_failure(
        **kwargs: object,
    ) -> list[vm_manager_module.WorkspacePublishedPortRecord]:
        del kwargs
        raise RuntimeError("proxy boom")

    monkeypatch.setattr(manager, "_start_workspace_service_published_ports", _raise_proxy_failure)

    outcome = manager.start_service(
        workspace_id,
        "web",
        command="sh -lc 'touch .ready && while true; do sleep 60; done'",
        readiness={"type": "file", "path": ".ready"},
        published_ports=[{"guest_port": 8080, "host_port": 18080}],
    )

    assert outcome["state"] == "failed"
    assert outcome["stop_reason"] == "published_port_failed"
    assert outcome["published_ports"] == []
|
|
|
|
|
|
def test_workspace_service_cleanup_stops_published_port_proxies(
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Check that deleting service artifacts stops the recorded port proxy.

    A saved service record carries one published port with ``proxy_pid=9999``. After
    ``_delete_workspace_service_artifacts_locked`` the patched stop helper must have
    seen that pid and the service record file must no longer exist.
    """
    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    workspace_id = "workspace-cleanup"
    proxied_port = vm_manager_module.WorkspacePublishedPortRecord(
        guest_port=8080,
        host_port=18080,
        proxy_pid=9999,
    )
    service = vm_manager_module.WorkspaceServiceRecord(
        workspace_id=workspace_id,
        service_name="web",
        command="sleep 60",
        cwd="/workspace",
        state="running",
        pid=1234,
        started_at=time.time(),
        ended_at=None,
        exit_code=None,
        execution_mode="host_compat",
        readiness=None,
        ready_at=None,
        stop_reason=None,
        published_ports=[proxied_port],
    )
    manager._save_workspace_service_locked(service)  # noqa: SLF001

    stopped_pids: list[int | None] = []

    def _record_stop(published_port: vm_manager_module.WorkspacePublishedPortRecord) -> None:
        stopped_pids.append(published_port.proxy_pid)

    monkeypatch.setattr(
        vm_manager_module,
        "_stop_workspace_published_port_proxy",
        _record_stop,
    )

    manager._delete_workspace_service_artifacts_locked(workspace_id, "web")  # noqa: SLF001

    assert stopped_pids == [9999]
    record_path = manager._workspace_service_record_path(workspace_id, "web")  # noqa: SLF001
    assert not record_path.exists()
|
|
|
|
|
|
def test_workspace_refresh_workspace_service_counts_stops_published_ports_when_stopped(
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Refresh service counts for a stopped workspace that still has a running service.

    The service record must be rewritten to ``stopped`` with
    ``stop_reason == "workspace_stopped"`` and its port proxy (pid 9999) must be stopped.
    """
    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    empty_seed = {
        "mode": "empty",
        "seed_path": None,
        "destination": "/workspace",
        "entry_count": 0,
        "bytes_written": 0,
    }
    workspace = vm_manager_module.WorkspaceRecord(
        workspace_id="workspace-counts",
        environment="debian:12-base",
        vcpu_count=1,
        mem_mib=1024,
        ttl_seconds=600,
        created_at=time.time(),
        expires_at=time.time() + 600,
        state="stopped",
        firecracker_pid=None,
        last_error=None,
        allow_host_compat=True,
        network_policy="off",
        metadata={},
        command_count=0,
        last_command=None,
        workspace_seed=empty_seed,
        secrets=[],
        reset_count=0,
        last_reset_at=None,
    )
    proxied_port = vm_manager_module.WorkspacePublishedPortRecord(
        guest_port=8080,
        host_port=18080,
        proxy_pid=9999,
    )
    service = vm_manager_module.WorkspaceServiceRecord(
        workspace_id=workspace.workspace_id,
        service_name="web",
        command="sleep 60",
        cwd="/workspace",
        state="running",
        pid=1234,
        started_at=time.time(),
        ended_at=None,
        exit_code=None,
        execution_mode="host_compat",
        readiness=None,
        ready_at=None,
        stop_reason=None,
        published_ports=[proxied_port],
    )
    manager._save_workspace_service_locked(service)  # noqa: SLF001

    stopped_pids: list[int | None] = []

    def _record_stop(published_port: vm_manager_module.WorkspacePublishedPortRecord) -> None:
        stopped_pids.append(published_port.proxy_pid)

    monkeypatch.setattr(
        vm_manager_module,
        "_stop_workspace_published_port_proxy",
        _record_stop,
    )

    manager._refresh_workspace_service_counts_locked(workspace)  # noqa: SLF001

    assert stopped_pids == [9999]
    reloaded = manager._load_workspace_service_locked(workspace.workspace_id, "web")  # noqa: SLF001
    assert reloaded.state == "stopped"
    assert reloaded.stop_reason == "workspace_stopped"
|
|
|
|
|
|
def test_workspace_published_port_proxy_helpers(
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Start a published-port proxy against a fake ``subprocess.Popen``.

    The fake writes a ready-file payload at the path following ``--ready-file`` in
    the command and reports pid 4242; the returned record must carry the ports and pid.
    """
    services_dir = tmp_path / "services"
    services_dir.mkdir(parents=True, exist_ok=True)

    class StubProcess:
        """Minimal process double exposing ``pid`` and ``poll``."""

        def __init__(self, pid: int, *, exited: bool = False) -> None:
            self.pid = pid
            self._exited = exited

        def poll(self) -> int | None:
            if self._exited:
                return 1
            return None

    def _fake_popen(command: list[str], **kwargs: object) -> StubProcess:
        del kwargs
        flag_index = command.index("--ready-file")
        ready_payload = {
            "host": "127.0.0.1",
            "host_port": 18080,
            "target_host": "172.29.1.2",
            "target_port": 8080,
            "protocol": "tcp",
        }
        Path(command[flag_index + 1]).write_text(
            json.dumps(ready_payload),
            encoding="utf-8",
        )
        return StubProcess(4242)

    monkeypatch.setattr(subprocess, "Popen", _fake_popen)

    port_spec = vm_manager_module.WorkspacePublishedPortSpec(
        guest_port=8080,
        host_port=18080,
    )
    record = vm_manager_module._start_workspace_published_port_proxy(  # noqa: SLF001
        services_dir=services_dir,
        service_name="web",
        workspace_id="workspace-proxy",
        guest_ip="172.29.1.2",
        spec=port_spec,
    )

    assert record.guest_port == 8080
    assert record.host_port == 18080
    assert record.proxy_pid == 4242
|
|
|
|
|
|
def test_workspace_published_port_proxy_timeout_and_stop(
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Expect a readiness timeout to raise and to stop the spawned proxy.

    ``subprocess.Popen`` yields a never-exiting stub that writes no ready file, and
    ``time.monotonic`` is scripted to return 0.0, 0.0, then 5.1. The start helper
    must raise ``RuntimeError`` and hand pid 4242 to the patched stop helper.
    """
    services_dir = tmp_path / "services"
    services_dir.mkdir(parents=True, exist_ok=True)

    class StubProcess:
        """Process double that always looks alive."""

        pid = 4242

        def poll(self) -> int | None:
            return None

    # Scripted clock: exactly three readings are available.
    scripted_clock = iter([0.0, 0.0, 5.1])
    stopped_pids: list[int | None] = []

    def _record_stop(published_port: vm_manager_module.WorkspacePublishedPortRecord) -> None:
        stopped_pids.append(published_port.proxy_pid)

    monkeypatch.setattr(subprocess, "Popen", lambda *args, **kwargs: StubProcess())
    monkeypatch.setattr(time, "monotonic", lambda: next(scripted_clock))
    monkeypatch.setattr(time, "sleep", lambda _: None)
    monkeypatch.setattr(
        vm_manager_module,
        "_stop_workspace_published_port_proxy",
        _record_stop,
    )

    port_spec = vm_manager_module.WorkspacePublishedPortSpec(
        guest_port=8080,
        host_port=18080,
    )
    with pytest.raises(RuntimeError, match="timed out waiting for published port proxy readiness"):
        vm_manager_module._start_workspace_published_port_proxy(  # noqa: SLF001
            services_dir=services_dir,
            service_name="web",
            workspace_id="workspace-proxy",
            guest_ip="172.29.1.2",
            spec=port_spec,
        )

    assert stopped_pids == [4242]
|
|
|
|
|
|
def test_workspace_published_port_validation_and_stop_helper(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Cover published-port normalization and the proxy stop helper.

    Numeric strings are accepted and converted to ints, a non-integer guest port
    and a host port of 80 raise ``ValueError``, and stopping a proxy whose pid is
    reported running once and then gone sends only ``SIGTERM``.
    """
    normalize = vm_manager_module._normalize_workspace_published_port  # noqa: SLF001

    normalized = normalize(
        guest_port="8080",
        host_port="18080",
    )
    assert normalized.guest_port == 8080
    assert normalized.host_port == 18080

    with pytest.raises(ValueError, match="published guest_port must be an integer"):
        normalize(guest_port=object())
    with pytest.raises(ValueError, match="published host_port must be between 1025 and 65535"):
        normalize(
            guest_port=8080,
            host_port=80,
        )

    sent_signals: list[int] = []
    # First liveness probe says "running", the second says "gone".
    liveness = iter([True, False])
    monkeypatch.setattr(os, "killpg", lambda pid, sig: sent_signals.append(sig))
    monkeypatch.setattr(vm_manager_module, "_pid_is_running", lambda pid: next(liveness))
    monkeypatch.setattr(time, "sleep", lambda _: None)

    proxied_port = vm_manager_module.WorkspacePublishedPortRecord(
        guest_port=8080,
        host_port=18080,
        proxy_pid=9999,
    )
    vm_manager_module._stop_workspace_published_port_proxy(proxied_port)  # noqa: SLF001

    assert sent_signals == [signal.SIGTERM]
|
|
|
|
|
|
def test_workspace_network_policy_requires_guest_network_support(tmp_path: Path) -> None:
    """Expect the network-policy support check to raise when capabilities are all off.

    The manager's runtime capabilities are overwritten with every flag ``False``;
    requesting ``network_policy="egress"`` must then raise ``RuntimeError``.
    """
    manager = VmManager(
        backend_name="firecracker",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    no_support = RuntimeCapabilities(
        supports_vm_boot=False,
        supports_guest_exec=False,
        supports_guest_network=False,
        reason="no guest network",
    )
    manager._runtime_capabilities = no_support  # noqa: SLF001

    expected_error = "workspace network_policy requires guest networking"
    with pytest.raises(RuntimeError, match=expected_error):
        manager._require_workspace_network_policy_support(  # noqa: SLF001
            network_policy="egress"
        )
|
|
|
|
|
|
def test_prepare_workspace_seed_rejects_missing_and_invalid_paths(tmp_path: Path) -> None:
    """Expect ``_prepare_workspace_seed`` to reject a missing path and a plain text file.

    Both cases must raise ``ValueError``: the first mentions that the path does not
    exist, the second lists the accepted directory/archive forms.
    """
    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )

    missing_source = tmp_path / "missing"
    with pytest.raises(ValueError, match="does not exist"):
        manager._prepare_workspace_seed(missing_source)  # noqa: SLF001

    # A regular file that is neither a directory nor a tar archive.
    text_source = tmp_path / "seed.txt"
    text_source.write_text("seed", encoding="utf-8")

    expected_error = "seed_path must be a directory or a .tar/.tar.gz/.tgz archive"
    with pytest.raises(ValueError, match=expected_error):
        manager._prepare_workspace_seed(text_source)  # noqa: SLF001
|
|
|
|
|
|
def test_workspace_baseline_snapshot_requires_archive(tmp_path: Path) -> None:
    """Expect the baseline snapshot lookup to raise once its archive is removed.

    The test deletes ``baseline/workspace.tar`` from a freshly created workspace and
    then requires ``_workspace_baseline_snapshot_locked`` to raise ``RuntimeError``.
    """
    base_dir = tmp_path / "vms"
    manager = VmManager(
        backend_name="mock",
        base_dir=base_dir,
        network_manager=TapNetworkManager(enabled=False),
    )
    created = manager.create_workspace(
        environment="debian:12",
        vcpu_count=1,
        mem_mib=512,
        ttl_seconds=600,
        allow_host_compat=True,
    )
    workspace_id = str(created["workspace_id"])

    baseline_archive = base_dir / "workspaces" / workspace_id / "baseline" / "workspace.tar"
    baseline_archive.unlink()
    workspace = manager._load_workspace_locked(workspace_id)  # noqa: SLF001

    with pytest.raises(RuntimeError, match="baseline snapshot"):
        manager._workspace_baseline_snapshot_locked(workspace)  # noqa: SLF001
|
|
|
|
|
|
def test_workspace_snapshot_and_service_loaders_handle_invalid_payloads(tmp_path: Path) -> None:
    """Check how the service and snapshot loaders treat malformed and missing records.

    Record files containing ``[]`` must make each loader raise ``RuntimeError``;
    the optional snapshot loader must return ``None`` for a name with no file.
    """
    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    workspace_id = "workspace-invalid"
    workspace_root = tmp_path / "vms" / "workspaces" / workspace_id
    services_dir = workspace_root / "services"
    snapshots_dir = workspace_root / "snapshots"
    services_dir.mkdir(parents=True, exist_ok=True)
    snapshots_dir.mkdir(parents=True, exist_ok=True)

    # Both records hold a JSON list rather than an object.
    (services_dir / "svc.json").write_text("[]", encoding="utf-8")
    (snapshots_dir / "snap.json").write_text("[]", encoding="utf-8")

    with pytest.raises(RuntimeError, match="service record"):
        manager._load_workspace_service_locked(workspace_id, "svc")  # noqa: SLF001
    with pytest.raises(RuntimeError, match="snapshot record"):
        manager._load_workspace_snapshot_locked(workspace_id, "snap")  # noqa: SLF001
    with pytest.raises(RuntimeError, match="snapshot record"):
        manager._load_workspace_snapshot_locked_optional(workspace_id, "snap")  # noqa: SLF001

    missing = manager._load_workspace_snapshot_locked_optional(workspace_id, "missing")  # noqa: SLF001
    assert missing is None
|
|
|
|
|
|
def test_workspace_shell_helpers_handle_missing_invalid_and_close_errors(
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Exercise the shell listing and close helpers across three awkward states.

    Listing must return ``[]`` with no shells directory and with only a malformed
    record present. After a valid shell is saved and the backend's ``close_shell``
    is made to raise, closing all shells must still leave the listing empty.
    """
    base_dir = tmp_path / "vms"
    manager = VmManager(
        backend_name="mock",
        base_dir=base_dir,
        network_manager=TapNetworkManager(enabled=False),
    )
    created = manager.create_workspace(
        environment="debian:12",
        vcpu_count=1,
        mem_mib=512,
        ttl_seconds=600,
        allow_host_compat=True,
    )
    workspace_id = str(created["workspace_id"])
    workspace_root = base_dir / "workspaces" / workspace_id

    # No shells directory has been created by this test yet.
    assert manager._list_workspace_shells_locked(workspace_id) == []  # noqa: SLF001

    # Only a malformed record (a JSON list) is present.
    shells_dir = workspace_root / "shells"
    shells_dir.mkdir(parents=True, exist_ok=True)
    (shells_dir / "invalid.json").write_text("[]", encoding="utf-8")
    assert manager._list_workspace_shells_locked(workspace_id) == []  # noqa: SLF001

    running_shell = vm_manager_module.WorkspaceShellRecord(
        workspace_id=workspace_id,
        shell_id="shell-1",
        cwd="/workspace",
        cols=120,
        rows=30,
        state="running",
        started_at=time.time(),
    )
    manager._save_workspace_shell_locked(running_shell)  # noqa: SLF001
    workspace = manager._load_workspace_locked(workspace_id)  # noqa: SLF001
    instance = workspace.to_instance(workdir=workspace_root / "runtime")

    def _raise_close(**kwargs: object) -> dict[str, object]:
        del kwargs
        raise RuntimeError("shell close boom")

    monkeypatch.setattr(manager._backend, "close_shell", _raise_close)
    manager._close_workspace_shells_locked(workspace, instance)  # noqa: SLF001

    remaining = manager._list_workspace_shells_locked(workspace_id)  # noqa: SLF001
    assert remaining == []
|
|
|
|
|
|
def test_workspace_refresh_service_helpers_cover_exit_and_started_refresh(
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Cover two refresh paths for a workspace whose record says ``started``.

    First, refreshing a running service while the backend reports it ``exited`` must
    return an ``exited`` record, stop its port proxy (pid 9999) and clear ``proxy_pid``.
    Second, ``_refresh_workspace_service_counts_locked`` must call
    ``_refresh_workspace_services_locked`` exactly once for this workspace.
    """
    base_dir = tmp_path / "vms"
    manager = VmManager(
        backend_name="mock",
        base_dir=base_dir,
        network_manager=TapNetworkManager(enabled=False),
    )
    created = manager.create_workspace(
        environment="debian:12",
        vcpu_count=1,
        mem_mib=512,
        ttl_seconds=600,
        allow_host_compat=True,
    )
    workspace_id = str(created["workspace_id"])
    workspace_root = base_dir / "workspaces" / workspace_id

    # Rewrite the persisted record to a started state with a network section.
    record_file = workspace_root / "workspace.json"
    record = json.loads(record_file.read_text(encoding="utf-8"))
    record["state"] = "started"
    record["network"] = {
        "vm_id": workspace_id,
        "tap_name": "tap-test0",
        "guest_ip": "172.29.1.2",
        "gateway_ip": "172.29.1.1",
        "subnet_cidr": "172.29.1.0/30",
        "mac_address": "06:00:ac:1d:01:02",
        "dns_servers": ["1.1.1.1"],
    }
    record_file.write_text(json.dumps(record, indent=2, sort_keys=True), encoding="utf-8")
    workspace = manager._load_workspace_locked(workspace_id)  # noqa: SLF001
    instance = workspace.to_instance(workdir=workspace_root / "runtime")

    service = vm_manager_module.WorkspaceServiceRecord(
        workspace_id=workspace_id,
        service_name="web",
        command="sleep 60",
        cwd="/workspace",
        state="running",
        started_at=time.time(),
        execution_mode="guest_vsock",
        published_ports=[
            vm_manager_module.WorkspacePublishedPortRecord(
                guest_port=8080,
                host_port=18080,
                proxy_pid=9999,
            )
        ],
    )
    manager._save_workspace_service_locked(service)  # noqa: SLF001

    stopped_pids: list[int | None] = []

    def _record_stop(published_port: vm_manager_module.WorkspacePublishedPortRecord) -> None:
        stopped_pids.append(published_port.proxy_pid)

    def _exited_status(*args: object, **kwargs: object) -> dict[str, object]:
        # Built per call so ``service.started_at`` is read when the backend is queried.
        return {
            "service_name": "web",
            "command": "sleep 60",
            "cwd": "/workspace",
            "state": "exited",
            "started_at": service.started_at,
            "ended_at": service.started_at + 1,
            "exit_code": 0,
            "execution_mode": "guest_vsock",
        }

    monkeypatch.setattr(
        vm_manager_module,
        "_stop_workspace_published_port_proxy",
        _record_stop,
    )
    monkeypatch.setattr(
        manager._backend,
        "status_service",
        _exited_status,
    )

    refreshed = manager._refresh_workspace_service_locked(  # noqa: SLF001
        workspace,
        instance,
        service,
    )
    expected_ports = [
        vm_manager_module.WorkspacePublishedPortRecord(
            guest_port=8080,
            host_port=18080,
            proxy_pid=None,
        )
    ]
    assert refreshed.state == "exited"
    assert refreshed.published_ports == expected_ports
    assert stopped_pids == [9999]

    # Second path: save the original record again, then count services.
    manager._save_workspace_service_locked(service)  # noqa: SLF001
    refreshed_workspace_ids: list[str] = []
    monkeypatch.setattr(manager, "_require_workspace_service_support", lambda instance: None)

    def _refresh_services(
        workspace: vm_manager_module.WorkspaceRecord,
        instance: vm_manager_module.VmInstance,
    ) -> list[vm_manager_module.WorkspaceServiceRecord]:
        del instance
        refreshed_workspace_ids.append(workspace.workspace_id)
        return []

    monkeypatch.setattr(
        manager,
        "_refresh_workspace_services_locked",
        _refresh_services,
    )
    manager._refresh_workspace_service_counts_locked(workspace)  # noqa: SLF001
    assert refreshed_workspace_ids == [workspace_id]
|
|
|
|
|
|
def test_workspace_start_published_ports_cleans_up_partial_failure(
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Check that a proxy already started is stopped when a later one fails.

    The fake proxy starter succeeds for guest port 8080 (pid 9999) and raises for
    any other port. Publishing 8080 then 9090 must re-raise the error and pass the
    8080 record to the patched stop helper.
    """
    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    created = manager.create_workspace(
        environment="debian:12",
        vcpu_count=1,
        mem_mib=512,
        ttl_seconds=600,
        allow_host_compat=True,
    )
    workspace = manager._load_workspace_locked(str(created["workspace_id"]))  # noqa: SLF001
    service = vm_manager_module.WorkspaceServiceRecord(
        workspace_id=workspace.workspace_id,
        service_name="web",
        command="sleep 60",
        cwd="/workspace",
        state="running",
        started_at=time.time(),
        execution_mode="guest_vsock",
    )
    started_record = vm_manager_module.WorkspacePublishedPortRecord(
        guest_port=8080,
        host_port=18080,
        proxy_pid=9999,
    )
    stop_calls: list[int] = []

    def _start_proxy(**kwargs: object) -> vm_manager_module.WorkspacePublishedPortRecord:
        spec = cast(vm_manager_module.WorkspacePublishedPortSpec, kwargs["spec"])
        if spec.guest_port != 8080:
            raise RuntimeError("proxy boom")
        return started_record

    def _record_stop(published_port: vm_manager_module.WorkspacePublishedPortRecord) -> None:
        stop_calls.append(published_port.proxy_pid or -1)

    monkeypatch.setattr(vm_manager_module, "_start_workspace_published_port_proxy", _start_proxy)
    monkeypatch.setattr(
        vm_manager_module,
        "_stop_workspace_published_port_proxy",
        _record_stop,
    )

    requested_ports = [
        vm_manager_module.WorkspacePublishedPortSpec(guest_port=8080),
        vm_manager_module.WorkspacePublishedPortSpec(guest_port=9090),
    ]
    with pytest.raises(RuntimeError, match="proxy boom"):
        manager._start_workspace_service_published_ports(  # noqa: SLF001
            workspace=workspace,
            service=service,
            guest_ip="172.29.1.2",
            published_ports=requested_ports,
        )

    assert stop_calls == [9999]
|
|
|
|
|
|
def test_workspace_service_start_replaces_non_running_record(tmp_path: Path) -> None:
    """Start a service name twice: first with a failing command, then a working one.

    The first start exits with status 2 before its readiness file appears and must
    report ``failed``; starting the same name again must report ``running``. The
    workspace is deleted in ``finally`` so a failed assertion does not skip teardown.
    """
    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    workspace_id = str(
        manager.create_workspace(
            environment="debian:12-base",
            allow_host_compat=True,
        )["workspace_id"]
    )

    try:
        failed = manager.start_service(
            workspace_id,
            "app",
            command="sh -lc 'exit 2'",
            readiness={"type": "file", "path": ".ready"},
            ready_timeout_seconds=1,
            ready_interval_ms=50,
        )
        assert failed["state"] == "failed"

        started = manager.start_service(
            workspace_id,
            "app",
            command="sh -lc 'touch .ready; while true; do sleep 60; done'",
            readiness={"type": "file", "path": ".ready"},
        )
        assert started["state"] == "running"
    finally:
        # The second command loops forever; always delete the workspace so the
        # teardown call runs even if an assertion above fails.
        manager.delete_workspace(workspace_id)
|
|
|
|
|
|
def test_workspace_service_supports_command_readiness_and_helper_probes(
    tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
    """Cover command-type readiness end to end plus the host readiness probes.

    A service gated on a ``command`` readiness check must reach ``running`` and be
    counted, then stop cleanly. ``_service_ready_on_host`` is then called directly
    for ``file``, ``tcp`` and ``http`` readiness using stubbed socket and urlopen.
    """
    base_dir = tmp_path / "vms"
    manager = VmManager(
        backend_name="mock",
        base_dir=base_dir,
        network_manager=TapNetworkManager(enabled=False),
    )
    created = manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
    )
    workspace_id = str(created["workspace_id"])

    command_started = manager.start_service(
        workspace_id,
        "command-ready",
        command="sh -lc 'touch command.ready; while true; do sleep 60; done'",
        readiness={"type": "command", "command": "test -f command.ready"},
    )
    assert command_started["state"] == "running"

    service_listing = manager.list_services(workspace_id)
    assert service_listing["count"] == 1
    assert service_listing["running_count"] == 1

    workspace_status = manager.status_workspace(workspace_id)
    assert workspace_status["service_count"] == 1
    assert workspace_status["running_service_count"] == 1

    stopped = manager.stop_service(workspace_id, "command-ready")
    assert stopped["state"] == "stopped"

    # File probe: a "/workspace/..." path is checked against workspace_dir.
    workspace_dir = base_dir / "workspaces" / workspace_id / "workspace"
    (workspace_dir / "probe.ready").write_text("ok\n", encoding="utf-8")
    assert vm_manager_module._service_ready_on_host(  # noqa: SLF001
        readiness={"type": "file", "path": "/workspace/probe.ready"},
        workspace_dir=workspace_dir,
        cwd=workspace_dir,
    )

    class StubSocket:
        """Socket double that asserts the timeout and address it is given."""

        def __enter__(self) -> StubSocket:
            return self

        def __exit__(self, *args: object) -> None:
            del args

        def settimeout(self, timeout: int) -> None:
            assert timeout == 1

        def connect(self, address: tuple[str, int]) -> None:
            assert address == ("127.0.0.1", 8080)

    monkeypatch.setattr("pyro_mcp.vm_manager.socket.socket", lambda *args: StubSocket())
    assert vm_manager_module._service_ready_on_host(  # noqa: SLF001
        readiness={"type": "tcp", "address": "127.0.0.1:8080"},
        workspace_dir=workspace_dir,
        cwd=workspace_dir,
    )

    class StubResponse:
        """HTTP response double reporting status 204."""

        status = 204

        def __enter__(self) -> StubResponse:
            return self

        def __exit__(self, *args: object) -> None:
            del args

    def _urlopen(request: object, timeout: int) -> StubResponse:
        del request
        assert timeout == 2
        return StubResponse()

    monkeypatch.setattr("pyro_mcp.vm_manager.urllib.request.urlopen", _urlopen)
    assert vm_manager_module._service_ready_on_host(  # noqa: SLF001
        readiness={"type": "http", "url": "http://127.0.0.1:8080/"},
        workspace_dir=workspace_dir,
        cwd=workspace_dir,
    )
|
|
|
|
|
|
def test_workspace_service_logs_tail_and_delete_cleanup(tmp_path: Path) -> None:
    """Tailed service logs are truncated, and deleting the workspace removes service state."""
    base_dir = tmp_path / "vms"
    manager = VmManager(
        backend_name="mock",
        base_dir=base_dir,
        network_manager=TapNetworkManager(enabled=False),
    )
    created = manager.create_workspace(environment="debian:12-base", allow_host_compat=True)
    workspace_id = str(created["workspace_id"])

    # The service prints two stdout lines before signalling readiness, so a
    # one-line tail has an earlier line to drop.
    logger_command = (
        "sh -lc 'printf \"one\\n\"; printf \"two\\n\"; "
        "touch .ready; while true; do sleep 60; done'"
    )
    manager.start_service(
        workspace_id,
        "logger",
        command=logger_command,
        readiness={"type": "file", "path": ".ready"},
    )

    tailed = manager.logs_service(workspace_id, "logger", tail_lines=1)
    assert tailed["stdout"] == "two\n"
    assert tailed["truncated"] is True

    services_dir = base_dir / "workspaces" / workspace_id / "services"
    assert services_dir.exists()
    assert manager.delete_workspace(workspace_id)["deleted"] is True
    assert not services_dir.exists()
|
|
|
|
|
|
def test_workspace_status_stops_service_counts_when_workspace_is_stopped(tmp_path: Path) -> None:
    """A workspace recorded as stopped reports no running services and marks them stopped."""
    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    created = manager.create_workspace(environment="debian:12-base", allow_host_compat=True)
    workspace_id = str(created["workspace_id"])
    manager.start_service(
        workspace_id,
        "app",
        command="sh -lc 'touch .ready; while true; do sleep 60; done'",
        readiness={"type": "file", "path": ".ready"},
    )
    workspace_root = tmp_path / "vms" / "workspaces" / workspace_id
    service_path = workspace_root / "services" / "app.json"
    # Remember the pid up front so the process group can be stopped in the
    # ``finally`` clause even when an assertion fails.
    live_pid = int(json.loads(service_path.read_text(encoding="utf-8"))["pid"])

    try:
        # Flip the persisted workspace state by hand instead of calling stop_workspace.
        workspace_path = workspace_root / "workspace.json"
        record = json.loads(workspace_path.read_text(encoding="utf-8"))
        record["state"] = "stopped"
        serialized = json.dumps(record, indent=2, sort_keys=True)
        workspace_path.write_text(serialized, encoding="utf-8")

        status = manager.status_workspace(workspace_id)
        assert status["state"] == "stopped"
        assert status["service_count"] == 1
        assert status["running_service_count"] == 0

        persisted_service = json.loads(service_path.read_text(encoding="utf-8"))
        assert persisted_service["state"] == "stopped"
        assert persisted_service["stop_reason"] == "workspace_stopped"
    finally:
        vm_manager_module._stop_process_group(live_pid)  # noqa: SLF001
|
|
|
|
|
|
def test_workspace_service_readiness_validation_helpers() -> None:
    """Service names and readiness specs normalize as expected; invalid ones raise ValueError."""
    normalize_name = vm_manager_module._normalize_workspace_service_name  # noqa: SLF001
    normalize_readiness = vm_manager_module._normalize_workspace_service_readiness  # noqa: SLF001

    assert normalize_name("app-1") == "app-1"
    invalid_names = (
        (" ", "service_name must not be empty"),
        ("bad name", "service_name must match"),
    )
    for raw_name, message in invalid_names:
        with pytest.raises(ValueError, match=message):
            normalize_name(raw_name)

    # A relative file path comes back anchored under /workspace.
    assert normalize_readiness({"type": "file", "path": "subdir/.ready"}) == {
        "type": "file",
        "path": "/workspace/subdir/.ready",
    }

    # These specs are expected to come back unchanged; a copy is passed in so
    # the comparison is made against an untouched dict.
    unchanged_specs = (
        {"type": "tcp", "address": "127.0.0.1:8080"},
        {"type": "http", "url": "http://127.0.0.1:8080/"},
        {"type": "command", "command": "test -f .ready"},
    )
    for spec in unchanged_specs:
        assert normalize_readiness(dict(spec)) == spec

    invalid_specs = (
        ({"type": "bogus"}, "one of: file, tcp, http, command"),
        ({"type": "file"}, "required for file readiness"),
        ({"type": "tcp", "address": "127.0.0.1"}, "HOST:PORT format"),
        ({"type": "http"}, "required for http readiness"),
        ({"type": "command"}, "required for command readiness"),
    )
    for spec, message in invalid_specs:
        with pytest.raises(ValueError, match=message):
            normalize_readiness(spec)
|
|
|
|
|
|
def test_workspace_service_text_and_exit_code_helpers(tmp_path: Path) -> None:
    """Exit-code parsing and log tailing handle missing, empty, and populated files."""
    read_exit_code = vm_manager_module._read_service_exit_code  # noqa: SLF001
    tail_text = vm_manager_module._tail_text  # noqa: SLF001

    status_path = tmp_path / "service.status"
    assert read_exit_code(status_path) is None  # file does not exist yet
    status_path.write_text("", encoding="utf-8")
    assert read_exit_code(status_path) is None  # file exists but is empty
    status_path.write_text("7\n", encoding="utf-8")
    assert read_exit_code(status_path) == 7

    log_path = tmp_path / "service.log"
    assert tail_text(log_path, tail_lines=10) == ("", False)  # log does not exist yet

    full_log = "one\ntwo\nthree\n"
    log_path.write_text(full_log, encoding="utf-8")
    # No limit, or a limit larger than the file, returns everything untruncated.
    assert tail_text(log_path, tail_lines=None) == (full_log, False)
    assert tail_text(log_path, tail_lines=5) == (full_log, False)
    assert tail_text(log_path, tail_lines=1) == ("three\n", True)
|
|
|
|
|
|
def test_workspace_service_process_group_helpers(monkeypatch: pytest.MonkeyPatch) -> None:
    """_stop_process_group reports a missing group and escalates SIGTERM to SIGKILL."""
    stop_process_group = vm_manager_module._stop_process_group  # noqa: SLF001

    def _missing(_pid: int, _signal: int) -> None:
        raise ProcessLookupError()

    # First scenario: the process group is already gone when signalled.
    monkeypatch.setattr("pyro_mcp.vm_manager.os.killpg", _missing)
    assert stop_process_group(123) == (False, False)

    # Second scenario: scripted clock and liveness values for a group that is
    # still reported as running after the first signal.
    sent_signals: list[int] = []
    clock_ticks = iter([0.0, 0.0, 5.0, 5.0, 10.0])
    liveness = iter([True, True, False])

    def _killpg(_pid: int, signum: int) -> None:
        sent_signals.append(signum)

    monkeypatch.setattr("pyro_mcp.vm_manager.os.killpg", _killpg)
    monkeypatch.setattr("pyro_mcp.vm_manager.time.monotonic", lambda: next(clock_ticks))
    monkeypatch.setattr("pyro_mcp.vm_manager.time.sleep", lambda _seconds: None)
    monkeypatch.setattr("pyro_mcp.vm_manager._pid_is_running", lambda _pid: next(liveness))

    assert stop_process_group(456, wait_seconds=5) == (True, True)
    assert sent_signals == [signal.SIGTERM, signal.SIGKILL]
|
|
|
|
|
|
def test_pid_is_running_treats_zombies_as_stopped(monkeypatch: pytest.MonkeyPatch) -> None:
    """A pid whose Linux process state is reported as "Z" is not considered running."""

    def _zombie_state(_pid: int) -> str:
        return "Z"

    monkeypatch.setattr(vm_manager_module, "_linux_process_state", _zombie_state)
    is_running = vm_manager_module._pid_is_running(123)  # noqa: SLF001
    assert is_running is False
|
|
|
|
|
|
def test_workspace_service_probe_and_refresh_helpers(
    tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
    """Probe commands surface their exit code; a dead service refreshes to exited, then stops."""
    probe_exit_code = vm_manager_module._run_service_probe_command(  # noqa: SLF001
        tmp_path, "exit 3"
    )
    assert probe_exit_code == 3

    services_dir = tmp_path / "services"
    services_dir.mkdir()
    # Status file holding the exit code that the refresh below should report.
    (services_dir / "app.status").write_text("9\n", encoding="utf-8")
    live_record = vm_manager_module.WorkspaceServiceRecord(  # noqa: SLF001
        workspace_id="workspace-1",
        service_name="app",
        command="sleep 60",
        cwd="/workspace",
        state="running",
        started_at=time.time(),
        readiness=None,
        ready_at=None,
        ended_at=None,
        exit_code=None,
        pid=1234,
        execution_mode="host_compat",
        stop_reason=None,
    )

    # Pretend the recorded pid is no longer alive.
    monkeypatch.setattr("pyro_mcp.vm_manager._pid_is_running", lambda _pid: False)
    exited_record = vm_manager_module._refresh_local_service_record(  # noqa: SLF001
        live_record,
        services_dir=services_dir,
    )
    assert (exited_record.state, exited_record.exit_code) == ("exited", 9)

    # Stub the process-group stop so no real signal is sent.
    monkeypatch.setattr("pyro_mcp.vm_manager._stop_process_group", lambda _pid: (True, False))
    stopped_record = vm_manager_module._stop_local_service(  # noqa: SLF001
        exited_record,
        services_dir=services_dir,
    )
    assert (stopped_record.state, stopped_record.stop_reason) == ("stopped", "sigterm")
|
|
|
|
|
|
def test_workspace_secrets_redact_exec_shell_service_and_survive_reset(tmp_path: Path) -> None:
    """Check secret redaction across exec, shell, and service output.

    Also checks that a secret is absent from the environment unless requested via
    ``secret_env`` and that secrets remain usable after a workspace reset.
    """
    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    token_file = tmp_path / "token.txt"
    token_file.write_text("from-file\n", encoding="utf-8")

    created = manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
        secrets=[
            {"name": "API_TOKEN", "value": "expected"},
            {"name": "FILE_TOKEN", "file_path": str(token_file)},
        ],
    )
    workspace_id = str(created["workspace_id"])
    # The create payload lists only names and source kinds.
    assert created["secrets"] == [
        {"name": "API_TOKEN", "source_kind": "literal"},
        {"name": "FILE_TOKEN", "source_kind": "file"},
    ]

    # Without secret_env the variable is not set inside the command.
    without_secret = manager.exec_workspace(
        workspace_id,
        command='sh -lc \'printf "%s" "${API_TOKEN:-missing}"\'',
        timeout_seconds=30,
    )
    assert without_secret["stdout"] == "missing"

    print_token_command = 'sh -lc \'printf "%s\\n" "$API_TOKEN"\''
    with_secret = manager.exec_workspace(
        workspace_id,
        command=print_token_command,
        timeout_seconds=30,
        secret_env={"API_TOKEN": "API_TOKEN"},
    )
    assert with_secret["stdout"] == "[REDACTED]\n"

    last_log_entry = manager.logs_workspace(workspace_id)["entries"][-1]
    assert last_log_entry["stdout"] == "[REDACTED]\n"

    opened = manager.open_shell(
        workspace_id,
        secret_env={"API_TOKEN": "API_TOKEN"},
    )
    shell_id = str(opened["shell_id"])
    manager.write_shell(workspace_id, shell_id, input_text='printf "%s\\n" "$API_TOKEN"')
    # Poll the shell for up to five seconds until the redacted marker shows up.
    shell_output = ""
    deadline = time.time() + 5
    while time.time() < deadline:
        chunk = manager.read_shell(workspace_id, shell_id, cursor=0, max_chars=65536)
        shell_output = str(chunk["output"])
        if "[REDACTED]" in shell_output:
            break
        time.sleep(0.05)
    assert "[REDACTED]" in shell_output
    manager.close_shell(workspace_id, shell_id)

    # The service writes the secret to stderr before signalling readiness.
    service_command = (
        'sh -lc \'trap "exit 0" TERM; printf "%s\\n" "$API_TOKEN" >&2; '
        'touch .ready; while true; do sleep 60; done\''
    )
    started = manager.start_service(
        workspace_id,
        "app",
        command=service_command,
        readiness={"type": "file", "path": ".ready"},
        secret_env={"API_TOKEN": "API_TOKEN"},
    )
    assert started["state"] == "running"
    service_logs = manager.logs_service(workspace_id, "app", tail_lines=None)
    assert "[REDACTED]" in str(service_logs["stderr"])

    reset = manager.reset_workspace(workspace_id)
    assert reset["secrets"] == created["secrets"]

    after_reset = manager.exec_workspace(
        workspace_id,
        command=print_token_command,
        timeout_seconds=30,
        secret_env={"API_TOKEN": "API_TOKEN"},
    )
    assert after_reset["stdout"] == "[REDACTED]\n"
|
|
|
|
|
|
def test_workspace_secret_validation_helpers(tmp_path: Path) -> None:
    """Secret names and values are validated, and duplicate names are rejected on create."""
    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    normalize_name = vm_manager_module._normalize_workspace_secret_name  # noqa: SLF001
    validate_value = vm_manager_module._validate_workspace_secret_value  # noqa: SLF001

    assert normalize_name("API_TOKEN") == "API_TOKEN"
    with pytest.raises(ValueError, match="secret name must match"):
        normalize_name("bad-name")
    with pytest.raises(ValueError, match="must not be empty"):
        validate_value("TOKEN", "")

    duplicate_secrets = [
        {"name": "TOKEN", "value": "one"},
        {"name": "TOKEN", "value": "two"},
    ]
    with pytest.raises(ValueError, match="duplicate secret name"):
        manager.create_workspace(
            environment="debian:12-base",
            allow_host_compat=True,
            secrets=duplicate_secrets,
        )
|
|
|
|
|
|
def test_prepare_workspace_secrets_handles_file_inputs_and_validation_errors(
    tmp_path: Path,
) -> None:
    """_prepare_workspace_secrets persists valid secrets and rejects malformed definitions."""
    prepare_secrets = vm_manager_module._prepare_workspace_secrets  # noqa: SLF001

    secrets_dir = tmp_path / "secrets"
    valid_file = tmp_path / "token.txt"
    valid_file.write_text("from-file\n", encoding="utf-8")
    invalid_utf8 = tmp_path / "invalid.bin"
    invalid_utf8.write_bytes(b"\xff\xfe")
    oversized = tmp_path / "oversized.txt"
    # One byte over the module's configured maximum.
    oversized_text = "x" * (vm_manager_module.WORKSPACE_SECRET_MAX_BYTES + 1)
    oversized.write_text(oversized_text, encoding="utf-8")

    records, values = prepare_secrets(
        [
            {"name": "B_TOKEN", "value": "literal"},
            {"name": "A_TOKEN", "file_path": str(valid_file)},
        ],
        secrets_dir=secrets_dir,
    )
    # A_TOKEN is listed first even though it was supplied second.
    assert [record.name for record in records] == ["A_TOKEN", "B_TOKEN"]
    assert values == {"A_TOKEN": "from-file\n", "B_TOKEN": "literal"}
    stored_secret = secrets_dir / "A_TOKEN.secret"
    assert stored_secret.read_text(encoding="utf-8") == "from-file\n"
    # Directory is owner-only (0700) and the secret file is owner read/write (0600).
    assert (secrets_dir.stat().st_mode & 0o777) == 0o700
    assert (stored_secret.stat().st_mode & 0o777) == 0o600

    # Each case: (expected error fragment, secret definitions, scratch directory name).
    rejected_cases: list[tuple[str, list[dict[str, str]], str]] = [
        ("must be a dictionary", [cast(dict[str, str], "bad")], "bad1"),
        ("missing 'name'", [{}], "bad2"),
        (
            "exactly one of 'value' or 'file_path'",
            [{"name": "TOKEN", "value": "x", "file_path": str(valid_file)}],
            "bad3",
        ),
        ("file_path must not be empty", [{"name": "TOKEN", "file_path": " "}], "bad4"),
        (
            "does not exist",
            [{"name": "TOKEN", "file_path": str(tmp_path / "missing.txt")}],
            "bad5",
        ),
        (
            "must be valid UTF-8 text",
            [{"name": "TOKEN", "file_path": str(invalid_utf8)}],
            "bad6",
        ),
        ("must be at most", [{"name": "TOKEN", "file_path": str(oversized)}], "bad7"),
    ]
    for message, definitions, dir_name in rejected_cases:
        with pytest.raises(ValueError, match=message):
            prepare_secrets(definitions, secrets_dir=tmp_path / dir_name)
|
|
|
|
|
|
def test_workspace_secrets_require_guest_exec_on_firecracker_runtime(
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Creating a workspace with secrets fails when the firecracker runtime lacks guest exec."""

    class StubFirecrackerBackend:
        """Backend stand-in whose lifecycle methods all do nothing."""

        def __init__(self, *args: Any, **kwargs: Any) -> None:
            del args, kwargs

        def create(self, instance: Any) -> None:
            del instance

        def start(self, instance: Any) -> None:
            del instance

        def stop(self, instance: Any) -> None:
            del instance

        def delete(self, instance: Any) -> None:
            del instance

    monkeypatch.setattr(vm_manager_module, "FirecrackerBackend", StubFirecrackerBackend)
    manager = VmManager(
        backend_name="firecracker",
        base_dir=tmp_path / "vms",
        runtime_paths=resolve_runtime_paths(),
        network_manager=TapNetworkManager(enabled=False),
    )
    # Override the capabilities: the VM can boot but guest exec is unavailable.
    no_guest_exec = RuntimeCapabilities(
        supports_vm_boot=True,
        supports_guest_exec=False,
        supports_guest_network=False,
        reason="guest exec is unavailable",
    )
    manager._runtime_capabilities = no_guest_exec  # noqa: SLF001

    with pytest.raises(RuntimeError, match="workspace secrets require guest execution"):
        manager.create_workspace(
            environment="debian:12-base",
            allow_host_compat=True,
            secrets=[{"name": "TOKEN", "value": "expected"}],
        )
|
|
|
|
|
|
def test_workspace_stop_and_start_preserve_logs_and_clear_live_state(tmp_path: Path) -> None:
    """Stopping keeps command history but drops services and shells; restarting keeps files."""
    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    seed_dir = tmp_path / "seed"
    seed_dir.mkdir()
    seed_note = seed_dir / "note.txt"
    seed_note.write_text("hello from seed\n", encoding="utf-8")

    created = manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
        seed_path=seed_dir,
    )
    workspace_id = str(created["workspace_id"])

    # Build up live state: one logged command, one open shell, one running service.
    manager.exec_workspace(workspace_id, command="cat note.txt", timeout_seconds=30)
    shell_id = str(manager.open_shell(workspace_id)["shell_id"])
    service = manager.start_service(
        workspace_id,
        "app",
        command='sh -lc \'touch .ready && trap "exit 0" TERM; while true; do sleep 60; done\'',
        readiness={"type": "file", "path": ".ready"},
    )
    assert service["state"] == "running"

    after_stop = manager.stop_workspace(workspace_id)
    assert after_stop["state"] == "stopped"
    assert after_stop["command_count"] == 1
    assert after_stop["service_count"] == 0
    assert after_stop["running_service_count"] == 0
    assert manager.logs_workspace(workspace_id)["count"] == 1
    # The shell opened before the stop can no longer be read.
    with pytest.raises(RuntimeError, match="must be in 'started' state"):
        manager.read_shell(workspace_id, shell_id, cursor=0, max_chars=1024)

    after_start = manager.start_workspace(workspace_id)
    assert after_start["state"] == "started"
    assert after_start["command_count"] == 1
    assert after_start["service_count"] == 0
    second_run = manager.exec_workspace(workspace_id, command="cat note.txt", timeout_seconds=30)
    assert second_run["stdout"] == "hello from seed\n"
|
|
|
|
|
|
def test_workspace_read_shell_rejects_invalid_wait_for_idle_ms(tmp_path: Path) -> None:
    """read_shell rejects a ``wait_for_idle_ms`` of 0 with a ValueError.

    The shell opened for the check is closed in a ``finally`` clause so the test
    does not leave an open shell session behind, whether or not the assertion
    passes.
    """
    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    created = manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
    )
    workspace_id = str(created["workspace_id"])
    opened = manager.open_shell(workspace_id)
    shell_id = str(opened["shell_id"])

    try:
        with pytest.raises(ValueError, match="wait_for_idle_ms must be between 1 and 10000"):
            manager.read_shell(
                workspace_id,
                shell_id,
                cursor=0,
                max_chars=1024,
                wait_for_idle_ms=0,
            )
    finally:
        # Release the shell explicitly; nothing else in this test tears it down.
        manager.close_shell(workspace_id, shell_id)
|
|
|
|
|
|
def test_workspace_stop_flushes_guest_filesystem_before_stopping(
    tmp_path: Path,
) -> None:
    """Stopping a guest-backed workspace runs ``sync`` in the guest before the backend stop."""
    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    created = manager.create_workspace(
        environment="debian:12-base",
        allow_host_compat=True,
    )
    workspace_id = str(created["workspace_id"])

    # Rewrite the persisted record so the workspace looks like a started,
    # guest-backed instance with a rootfs image.
    workspace_path = tmp_path / "vms" / "workspaces" / workspace_id / "workspace.json"
    record = json.loads(workspace_path.read_text(encoding="utf-8"))
    record.update({"state": "started", "firecracker_pid": os.getpid()})
    record["metadata"].update(
        {
            "execution_mode": "guest_vsock",
            "rootfs_image": str(_create_stopped_workspace_rootfs(tmp_path)),
        }
    )
    serialized = json.dumps(record, indent=2, sort_keys=True)
    workspace_path.write_text(serialized, encoding="utf-8")

    backend_calls: list[tuple[str, str]] = []

    class StubBackend:
        """Records exec and stop calls in the order they arrive."""

        def exec(
            self,
            instance: Any,
            command: str,
            timeout_seconds: int,
            *,
            workdir: Path | None = None,
            env: dict[str, str] | None = None,
        ) -> vm_manager_module.VmExecResult:
            del instance, timeout_seconds, workdir, env
            backend_calls.append(("exec", command))
            return vm_manager_module.VmExecResult(
                stdout="",
                stderr="",
                exit_code=0,
                duration_ms=1,
            )

        def stop(self, instance: Any) -> None:
            del instance
            backend_calls.append(("stop", "instance"))

    manager._backend = StubBackend()  # type: ignore[assignment]  # noqa: SLF001
    manager._backend_name = "firecracker"  # noqa: SLF001
    guest_exec_capable = RuntimeCapabilities(
        supports_vm_boot=True,
        supports_guest_exec=True,
        supports_guest_network=False,
        reason=None,
    )
    manager._runtime_capabilities = guest_exec_capable  # noqa: SLF001

    stopped = manager.stop_workspace(workspace_id)

    assert backend_calls == [("exec", "sync"), ("stop", "instance")]
    assert stopped["state"] == "stopped"
|
|
|
|
|
|
def test_workspace_disk_operations_scrub_runtime_only_paths_and_export(
    tmp_path: Path,
) -> None:
    """Disk list/read/export work on a stopped guest workspace and hide runtime-only paths."""
    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    rootfs_image = _create_stopped_workspace_rootfs(tmp_path)
    workspace_id = "workspace-disk-123"
    # Register a stopped, guest-backed workspace record directly instead of
    # going through create_workspace.
    guest_metadata = {
        "execution_mode": "guest_vsock",
        "rootfs_image": str(rootfs_image),
        "workspace_path": "/workspace",
    }
    record = vm_manager_module.WorkspaceRecord(
        workspace_id=workspace_id,
        environment="debian:12-base",
        vcpu_count=1,
        mem_mib=512,
        ttl_seconds=600,
        created_at=time.time(),
        expires_at=time.time() + 600,
        state="stopped",
        network_policy="off",
        allow_host_compat=False,
        metadata=guest_metadata,
    )
    manager._save_workspace_locked(record)  # noqa: SLF001

    workspace_listing = manager.list_workspace_disk(
        workspace_id, path="/workspace", recursive=True
    )
    assert workspace_listing["path"] == "/workspace"
    workspace_paths = {entry["path"] for entry in workspace_listing["entries"]}
    for expected_path in ("/workspace/note.txt", "/workspace/src/child.txt", "/workspace/link"):
        assert expected_path in workspace_paths

    note = manager.read_workspace_disk(workspace_id, path="note.txt", max_bytes=4096)
    assert note["content"] == "hello from disk\n"
    assert note["truncated"] is False

    # Runtime-only directories must not appear in the /run listing.
    run_listing = manager.list_workspace_disk(workspace_id, path="/run", recursive=True)
    run_paths = {entry["path"] for entry in run_listing["entries"]}
    for hidden_path in ("/run/pyro-secrets", "/run/pyro-services"):
        assert hidden_path not in run_paths

    export_target = tmp_path / "workspace-copy.ext4"
    exported = manager.export_workspace_disk(workspace_id, output_path=export_target)
    assert exported["disk_format"] == "ext4"
    assert export_target.exists()
    assert export_target.stat().st_size == int(exported["bytes_written"])
|
|
|
|
|
|
def test_workspace_disk_operations_reject_host_compat_workspaces(tmp_path: Path) -> None:
    """Disk export, list, and read raise RuntimeError for a stopped host_compat workspace."""
    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    created = manager.create_workspace(environment="debian:12-base", allow_host_compat=True)
    workspace_id = str(created["workspace_id"])
    manager.stop_workspace(workspace_id)

    export_target = tmp_path / "workspace.ext4"
    # Each callable is one disk operation, run in the order listed.
    disk_operations = (
        lambda: manager.export_workspace_disk(workspace_id, output_path=export_target),
        lambda: manager.list_workspace_disk(workspace_id),
        lambda: manager.read_workspace_disk(workspace_id, path="note.txt"),
    )
    for operation in disk_operations:
        with pytest.raises(RuntimeError, match="host_compat workspaces"):
            operation()
|