Add stopped-workspace disk export and inspection

Finish the 3.1.0 secondary disk-tools milestone so stable workspaces can be
stopped, inspected offline, exported as raw ext4 images, and started again
without changing the primary workspace-first interaction model.

Add workspace stop/start plus workspace disk export/list/read across the CLI,
SDK, and MCP, backed by a new offline debugfs inspection helper and guest-only
validation. Scrub runtime-only guest state before disk inspection/export, and
fix the real guest reliability gaps by flushing the filesystem on stop and
removing stale Firecracker socket files before restart.

Update the docs, examples, changelog, and roadmap to mark 3.1.0 done, and
cover the new lifecycle/disk paths with API, CLI, manager, contract, and
package-surface tests.

Validation: uv lock; UV_CACHE_DIR=.uv-cache make check; UV_CACHE_DIR=.uv-cache
make dist-check; real guest-backed smoke for create, shell/service activity,
stop, workspace disk list/read/export, start, exec, and delete.
This commit is contained in:
Thales Maciel 2026-03-12 20:57:16 -03:00
parent f2d20ef30a
commit 287f6d100f
26 changed files with 2585 additions and 34 deletions

View file

@ -50,9 +50,14 @@ def test_pyro_create_server_registers_vm_run(tmp_path: Path) -> None:
assert "vm_run" in tool_names
assert "vm_create" in tool_names
assert "workspace_create" in tool_names
assert "workspace_start" in tool_names
assert "workspace_stop" in tool_names
assert "workspace_diff" in tool_names
assert "workspace_sync_push" in tool_names
assert "workspace_export" in tool_names
assert "workspace_disk_export" in tool_names
assert "workspace_disk_list" in tool_names
assert "workspace_disk_read" in tool_names
assert "snapshot_create" in tool_names
assert "snapshot_list" in tool_names
assert "snapshot_delete" in tool_names
@ -289,3 +294,603 @@ def test_pyro_workspace_methods_delegate_to_manager(tmp_path: Path) -> None:
assert status["service_count"] == 0
assert logs["count"] == 0
assert deleted["deleted"] is True
def test_pyro_workspace_disk_methods_delegate_to_manager() -> None:
calls: list[tuple[str, dict[str, Any]]] = []
class StubManager:
def stop_workspace(self, workspace_id: str) -> dict[str, Any]:
calls.append(("stop_workspace", {"workspace_id": workspace_id}))
return {"workspace_id": workspace_id, "state": "stopped"}
def start_workspace(self, workspace_id: str) -> dict[str, Any]:
calls.append(("start_workspace", {"workspace_id": workspace_id}))
return {"workspace_id": workspace_id, "state": "started"}
def export_workspace_disk(self, workspace_id: str, *, output_path: Path) -> dict[str, Any]:
calls.append(
(
"export_workspace_disk",
{"workspace_id": workspace_id, "output_path": str(output_path)},
)
)
return {"workspace_id": workspace_id, "output_path": str(output_path)}
def list_workspace_disk(
self,
workspace_id: str,
*,
path: str = "/workspace",
recursive: bool = False,
) -> dict[str, Any]:
calls.append(
(
"list_workspace_disk",
{
"workspace_id": workspace_id,
"path": path,
"recursive": recursive,
},
)
)
return {"workspace_id": workspace_id, "entries": []}
def read_workspace_disk(
self,
workspace_id: str,
*,
path: str,
max_bytes: int = 65536,
) -> dict[str, Any]:
calls.append(
(
"read_workspace_disk",
{
"workspace_id": workspace_id,
"path": path,
"max_bytes": max_bytes,
},
)
)
return {"workspace_id": workspace_id, "content": ""}
pyro = Pyro(manager=cast(Any, StubManager()))
stopped = pyro.stop_workspace("workspace-123")
started = pyro.start_workspace("workspace-123")
exported = pyro.export_workspace_disk("workspace-123", output_path=Path("/tmp/workspace.ext4"))
listed = pyro.list_workspace_disk("workspace-123", path="/workspace/src", recursive=True)
read = pyro.read_workspace_disk("workspace-123", "note.txt", max_bytes=4096)
assert stopped["state"] == "stopped"
assert started["state"] == "started"
assert exported["output_path"] == "/tmp/workspace.ext4"
assert listed["entries"] == []
assert read["content"] == ""
assert calls == [
("stop_workspace", {"workspace_id": "workspace-123"}),
("start_workspace", {"workspace_id": "workspace-123"}),
(
"export_workspace_disk",
{
"workspace_id": "workspace-123",
"output_path": "/tmp/workspace.ext4",
},
),
(
"list_workspace_disk",
{
"workspace_id": "workspace-123",
"path": "/workspace/src",
"recursive": True,
},
),
(
"read_workspace_disk",
{
"workspace_id": "workspace-123",
"path": "note.txt",
"max_bytes": 4096,
},
),
]
def test_pyro_create_server_workspace_disk_tools_delegate() -> None:
calls: list[tuple[str, dict[str, Any]]] = []
class StubManager:
def stop_workspace(self, workspace_id: str) -> dict[str, Any]:
calls.append(("stop_workspace", {"workspace_id": workspace_id}))
return {"workspace_id": workspace_id, "state": "stopped"}
def start_workspace(self, workspace_id: str) -> dict[str, Any]:
calls.append(("start_workspace", {"workspace_id": workspace_id}))
return {"workspace_id": workspace_id, "state": "started"}
def export_workspace_disk(self, workspace_id: str, *, output_path: str) -> dict[str, Any]:
calls.append(
(
"export_workspace_disk",
{"workspace_id": workspace_id, "output_path": output_path},
)
)
return {"workspace_id": workspace_id, "output_path": output_path}
def list_workspace_disk(
self,
workspace_id: str,
*,
path: str = "/workspace",
recursive: bool = False,
) -> dict[str, Any]:
calls.append(
(
"list_workspace_disk",
{
"workspace_id": workspace_id,
"path": path,
"recursive": recursive,
},
)
)
return {"workspace_id": workspace_id, "entries": []}
def read_workspace_disk(
self,
workspace_id: str,
*,
path: str,
max_bytes: int = 65536,
) -> dict[str, Any]:
calls.append(
(
"read_workspace_disk",
{
"workspace_id": workspace_id,
"path": path,
"max_bytes": max_bytes,
},
)
)
return {"workspace_id": workspace_id, "content": ""}
pyro = Pyro(manager=cast(Any, StubManager()))
def _extract_structured(raw_result: object) -> dict[str, Any]:
if not isinstance(raw_result, tuple) or len(raw_result) != 2:
raise TypeError("unexpected call_tool result shape")
_, structured = raw_result
if not isinstance(structured, dict):
raise TypeError("expected structured dictionary result")
return cast(dict[str, Any], structured)
async def _run() -> tuple[dict[str, Any], ...]:
server = pyro.create_server()
stopped = _extract_structured(
await server.call_tool("workspace_stop", {"workspace_id": "workspace-123"})
)
started = _extract_structured(
await server.call_tool("workspace_start", {"workspace_id": "workspace-123"})
)
exported = _extract_structured(
await server.call_tool(
"workspace_disk_export",
{
"workspace_id": "workspace-123",
"output_path": "/tmp/workspace.ext4",
},
)
)
listed = _extract_structured(
await server.call_tool(
"workspace_disk_list",
{
"workspace_id": "workspace-123",
"path": "/workspace/src",
"recursive": True,
},
)
)
read = _extract_structured(
await server.call_tool(
"workspace_disk_read",
{
"workspace_id": "workspace-123",
"path": "note.txt",
"max_bytes": 4096,
},
)
)
return stopped, started, exported, listed, read
stopped, started, exported, listed, read = asyncio.run(_run())
assert stopped["state"] == "stopped"
assert started["state"] == "started"
assert exported["output_path"] == "/tmp/workspace.ext4"
assert listed["entries"] == []
assert read["content"] == ""
assert calls == [
("stop_workspace", {"workspace_id": "workspace-123"}),
("start_workspace", {"workspace_id": "workspace-123"}),
(
"export_workspace_disk",
{
"workspace_id": "workspace-123",
"output_path": "/tmp/workspace.ext4",
},
),
(
"list_workspace_disk",
{
"workspace_id": "workspace-123",
"path": "/workspace/src",
"recursive": True,
},
),
(
"read_workspace_disk",
{
"workspace_id": "workspace-123",
"path": "note.txt",
"max_bytes": 4096,
},
),
]
def test_pyro_create_server_workspace_status_shell_and_service_delegate() -> None:
calls: list[tuple[str, dict[str, Any]]] = []
class StubManager:
def status_workspace(self, workspace_id: str) -> dict[str, Any]:
calls.append(("status_workspace", {"workspace_id": workspace_id}))
return {"workspace_id": workspace_id, "state": "started"}
def logs_workspace(self, workspace_id: str) -> dict[str, Any]:
calls.append(("logs_workspace", {"workspace_id": workspace_id}))
return {"workspace_id": workspace_id, "count": 0, "entries": []}
def open_shell(
self,
workspace_id: str,
*,
cwd: str = "/workspace",
cols: int = 120,
rows: int = 30,
secret_env: dict[str, str] | None = None,
) -> dict[str, Any]:
calls.append(
(
"open_shell",
{
"workspace_id": workspace_id,
"cwd": cwd,
"cols": cols,
"rows": rows,
"secret_env": secret_env,
},
)
)
return {"workspace_id": workspace_id, "shell_id": "shell-1", "state": "running"}
def read_shell(
self,
workspace_id: str,
shell_id: str,
*,
cursor: int = 0,
max_chars: int = 65536,
) -> dict[str, Any]:
calls.append(
(
"read_shell",
{
"workspace_id": workspace_id,
"shell_id": shell_id,
"cursor": cursor,
"max_chars": max_chars,
},
)
)
return {"workspace_id": workspace_id, "shell_id": shell_id, "output": ""}
def write_shell(
self,
workspace_id: str,
shell_id: str,
*,
input_text: str,
append_newline: bool = True,
) -> dict[str, Any]:
calls.append(
(
"write_shell",
{
"workspace_id": workspace_id,
"shell_id": shell_id,
"input_text": input_text,
"append_newline": append_newline,
},
)
)
return {
"workspace_id": workspace_id,
"shell_id": shell_id,
"input_length": len(input_text),
}
def signal_shell(
self,
workspace_id: str,
shell_id: str,
*,
signal_name: str = "INT",
) -> dict[str, Any]:
calls.append(
(
"signal_shell",
{
"workspace_id": workspace_id,
"shell_id": shell_id,
"signal_name": signal_name,
},
)
)
return {"workspace_id": workspace_id, "shell_id": shell_id, "signal": signal_name}
def close_shell(self, workspace_id: str, shell_id: str) -> dict[str, Any]:
calls.append(
("close_shell", {"workspace_id": workspace_id, "shell_id": shell_id})
)
return {"workspace_id": workspace_id, "shell_id": shell_id, "closed": True}
def start_service(
self,
workspace_id: str,
service_name: str,
**kwargs: Any,
) -> dict[str, Any]:
calls.append(
(
"start_service",
{
"workspace_id": workspace_id,
"service_name": service_name,
**kwargs,
},
)
)
return {"workspace_id": workspace_id, "service_name": service_name, "state": "running"}
pyro = Pyro(manager=cast(Any, StubManager()))
def _extract_structured(raw_result: object) -> dict[str, Any]:
if not isinstance(raw_result, tuple) or len(raw_result) != 2:
raise TypeError("unexpected call_tool result shape")
_, structured = raw_result
if not isinstance(structured, dict):
raise TypeError("expected structured dictionary result")
return cast(dict[str, Any], structured)
async def _run() -> tuple[dict[str, Any], ...]:
server = pyro.create_server()
status = _extract_structured(
await server.call_tool("workspace_status", {"workspace_id": "workspace-123"})
)
logs = _extract_structured(
await server.call_tool("workspace_logs", {"workspace_id": "workspace-123"})
)
opened = _extract_structured(
await server.call_tool(
"shell_open",
{
"workspace_id": "workspace-123",
"cwd": "/workspace/src",
"cols": 100,
"rows": 20,
"secret_env": {"TOKEN": "API_TOKEN"},
},
)
)
read = _extract_structured(
await server.call_tool(
"shell_read",
{
"workspace_id": "workspace-123",
"shell_id": "shell-1",
"cursor": 5,
"max_chars": 1024,
},
)
)
wrote = _extract_structured(
await server.call_tool(
"shell_write",
{
"workspace_id": "workspace-123",
"shell_id": "shell-1",
"input": "pwd",
"append_newline": False,
},
)
)
signaled = _extract_structured(
await server.call_tool(
"shell_signal",
{
"workspace_id": "workspace-123",
"shell_id": "shell-1",
"signal_name": "TERM",
},
)
)
closed = _extract_structured(
await server.call_tool(
"shell_close",
{"workspace_id": "workspace-123", "shell_id": "shell-1"},
)
)
file_service = _extract_structured(
await server.call_tool(
"service_start",
{
"workspace_id": "workspace-123",
"service_name": "file",
"command": "run-file",
"ready_file": ".ready",
},
)
)
tcp_service = _extract_structured(
await server.call_tool(
"service_start",
{
"workspace_id": "workspace-123",
"service_name": "tcp",
"command": "run-tcp",
"ready_tcp": "127.0.0.1:8080",
},
)
)
http_service = _extract_structured(
await server.call_tool(
"service_start",
{
"workspace_id": "workspace-123",
"service_name": "http",
"command": "run-http",
"ready_http": "http://127.0.0.1:8080/",
},
)
)
command_service = _extract_structured(
await server.call_tool(
"service_start",
{
"workspace_id": "workspace-123",
"service_name": "command",
"command": "run-command",
"ready_command": "test -f .ready",
},
)
)
return (
status,
logs,
opened,
read,
wrote,
signaled,
closed,
file_service,
tcp_service,
http_service,
command_service,
)
results = asyncio.run(_run())
assert results[0]["state"] == "started"
assert results[1]["count"] == 0
assert results[2]["shell_id"] == "shell-1"
assert results[6]["closed"] is True
assert results[7]["state"] == "running"
assert results[10]["state"] == "running"
assert calls == [
("status_workspace", {"workspace_id": "workspace-123"}),
("logs_workspace", {"workspace_id": "workspace-123"}),
(
"open_shell",
{
"workspace_id": "workspace-123",
"cwd": "/workspace/src",
"cols": 100,
"rows": 20,
"secret_env": {"TOKEN": "API_TOKEN"},
},
),
(
"read_shell",
{
"workspace_id": "workspace-123",
"shell_id": "shell-1",
"cursor": 5,
"max_chars": 1024,
},
),
(
"write_shell",
{
"workspace_id": "workspace-123",
"shell_id": "shell-1",
"input_text": "pwd",
"append_newline": False,
},
),
(
"signal_shell",
{
"workspace_id": "workspace-123",
"shell_id": "shell-1",
"signal_name": "TERM",
},
),
("close_shell", {"workspace_id": "workspace-123", "shell_id": "shell-1"}),
(
"start_service",
{
"workspace_id": "workspace-123",
"service_name": "file",
"command": "run-file",
"cwd": "/workspace",
"readiness": {"type": "file", "path": ".ready"},
"ready_timeout_seconds": 30,
"ready_interval_ms": 500,
"secret_env": None,
"published_ports": None,
},
),
(
"start_service",
{
"workspace_id": "workspace-123",
"service_name": "tcp",
"command": "run-tcp",
"cwd": "/workspace",
"readiness": {"type": "tcp", "address": "127.0.0.1:8080"},
"ready_timeout_seconds": 30,
"ready_interval_ms": 500,
"secret_env": None,
"published_ports": None,
},
),
(
"start_service",
{
"workspace_id": "workspace-123",
"service_name": "http",
"command": "run-http",
"cwd": "/workspace",
"readiness": {"type": "http", "url": "http://127.0.0.1:8080/"},
"ready_timeout_seconds": 30,
"ready_interval_ms": 500,
"secret_env": None,
"published_ports": None,
},
),
(
"start_service",
{
"workspace_id": "workspace-123",
"service_name": "command",
"command": "run-command",
"cwd": "/workspace",
"readiness": {"type": "command", "command": "test -f .ready"},
"ready_timeout_seconds": 30,
"ready_interval_ms": 500,
"secret_env": None,
"published_ports": None,
},
),
]

View file

@ -72,6 +72,10 @@ def test_cli_subcommand_help_includes_examples_and_guidance() -> None:
assert "pyro workspace exec WORKSPACE_ID" in workspace_help
assert "pyro workspace diff WORKSPACE_ID" in workspace_help
assert "pyro workspace export WORKSPACE_ID src/note.txt --output ./note.txt" in workspace_help
assert "pyro workspace stop WORKSPACE_ID" in workspace_help
assert "pyro workspace disk list WORKSPACE_ID" in workspace_help
assert "pyro workspace disk export WORKSPACE_ID --output ./workspace.ext4" in workspace_help
assert "pyro workspace start WORKSPACE_ID" in workspace_help
assert "pyro workspace snapshot create WORKSPACE_ID checkpoint" in workspace_help
assert "pyro workspace reset WORKSPACE_ID --snapshot checkpoint" in workspace_help
assert "pyro workspace shell open WORKSPACE_ID" in workspace_help
@ -112,6 +116,37 @@ def test_cli_subcommand_help_includes_examples_and_guidance() -> None:
assert "--output" in workspace_export_help
assert "Export one file or directory from `/workspace`" in workspace_export_help
workspace_stop_help = _subparser_choice(
_subparser_choice(parser, "workspace"), "stop"
).format_help()
assert "Stop the backing sandbox" in workspace_stop_help
workspace_start_help = _subparser_choice(
_subparser_choice(parser, "workspace"), "start"
).format_help()
assert "previously stopped workspace" in workspace_start_help
workspace_disk_help = _subparser_choice(
_subparser_choice(parser, "workspace"), "disk"
).format_help()
assert "secondary stopped-workspace disk tools" in workspace_disk_help
assert "pyro workspace disk read WORKSPACE_ID note.txt" in workspace_disk_help
workspace_disk_export_help = _subparser_choice(
_subparser_choice(_subparser_choice(parser, "workspace"), "disk"), "export"
).format_help()
assert "--output" in workspace_disk_export_help
workspace_disk_list_help = _subparser_choice(
_subparser_choice(_subparser_choice(parser, "workspace"), "disk"), "list"
).format_help()
assert "--recursive" in workspace_disk_list_help
workspace_disk_read_help = _subparser_choice(
_subparser_choice(_subparser_choice(parser, "workspace"), "disk"), "read"
).format_help()
assert "--max-bytes" in workspace_disk_read_help
workspace_diff_help = _subparser_choice(
_subparser_choice(parser, "workspace"), "diff"
).format_help()
@ -647,6 +682,193 @@ def test_cli_workspace_export_prints_human_output(
assert "artifact_type=file" in output
def test_cli_workspace_stop_and_start_print_human_output(
monkeypatch: pytest.MonkeyPatch,
capsys: pytest.CaptureFixture[str],
) -> None:
class StubPyro:
def stop_workspace(self, workspace_id: str) -> dict[str, Any]:
assert workspace_id == "workspace-123"
return {
"workspace_id": workspace_id,
"environment": "debian:12",
"state": "stopped",
"workspace_path": "/workspace",
"network_policy": "off",
"execution_mode": "guest_vsock",
"vcpu_count": 1,
"mem_mib": 1024,
"command_count": 2,
"reset_count": 0,
"service_count": 0,
"running_service_count": 0,
}
def start_workspace(self, workspace_id: str) -> dict[str, Any]:
assert workspace_id == "workspace-123"
return {
"workspace_id": workspace_id,
"environment": "debian:12",
"state": "started",
"workspace_path": "/workspace",
"network_policy": "off",
"execution_mode": "guest_vsock",
"vcpu_count": 1,
"mem_mib": 1024,
"command_count": 2,
"reset_count": 0,
"service_count": 0,
"running_service_count": 0,
}
class StopParser:
def parse_args(self) -> argparse.Namespace:
return argparse.Namespace(
command="workspace",
workspace_command="stop",
workspace_id="workspace-123",
json=False,
)
class StartParser:
def parse_args(self) -> argparse.Namespace:
return argparse.Namespace(
command="workspace",
workspace_command="start",
workspace_id="workspace-123",
json=False,
)
monkeypatch.setattr(cli, "Pyro", StubPyro)
monkeypatch.setattr(cli, "_build_parser", lambda: StopParser())
cli.main()
stopped_output = capsys.readouterr().out
assert "Stopped workspace ID: workspace-123" in stopped_output
monkeypatch.setattr(cli, "_build_parser", lambda: StartParser())
cli.main()
started_output = capsys.readouterr().out
assert "Started workspace ID: workspace-123" in started_output
def test_cli_workspace_disk_commands_print_human_and_json(
monkeypatch: pytest.MonkeyPatch,
capsys: pytest.CaptureFixture[str],
) -> None:
class StubPyro:
def export_workspace_disk(
self,
workspace_id: str,
*,
output_path: str,
) -> dict[str, Any]:
assert workspace_id == "workspace-123"
assert output_path == "./workspace.ext4"
return {
"workspace_id": workspace_id,
"output_path": "/tmp/workspace.ext4",
"disk_format": "ext4",
"bytes_written": 8192,
}
def list_workspace_disk(
self,
workspace_id: str,
*,
path: str,
recursive: bool,
) -> dict[str, Any]:
assert workspace_id == "workspace-123"
assert path == "/workspace"
assert recursive is True
return {
"workspace_id": workspace_id,
"path": path,
"recursive": recursive,
"entries": [
{
"path": "/workspace/note.txt",
"artifact_type": "file",
"size_bytes": 6,
"link_target": None,
}
],
}
def read_workspace_disk(
self,
workspace_id: str,
path: str,
*,
max_bytes: int,
) -> dict[str, Any]:
assert workspace_id == "workspace-123"
assert path == "note.txt"
assert max_bytes == 4096
return {
"workspace_id": workspace_id,
"path": "/workspace/note.txt",
"size_bytes": 6,
"max_bytes": max_bytes,
"content": "hello\n",
"truncated": False,
}
class ExportParser:
def parse_args(self) -> argparse.Namespace:
return argparse.Namespace(
command="workspace",
workspace_command="disk",
workspace_disk_command="export",
workspace_id="workspace-123",
output="./workspace.ext4",
json=False,
)
class ListParser:
def parse_args(self) -> argparse.Namespace:
return argparse.Namespace(
command="workspace",
workspace_command="disk",
workspace_disk_command="list",
workspace_id="workspace-123",
path="/workspace",
recursive=True,
json=False,
)
class ReadParser:
def parse_args(self) -> argparse.Namespace:
return argparse.Namespace(
command="workspace",
workspace_command="disk",
workspace_disk_command="read",
workspace_id="workspace-123",
path="note.txt",
max_bytes=4096,
json=True,
)
monkeypatch.setattr(cli, "Pyro", StubPyro)
monkeypatch.setattr(cli, "_build_parser", lambda: ExportParser())
cli.main()
export_output = capsys.readouterr().out
assert "[workspace-disk-export] workspace_id=workspace-123" in export_output
monkeypatch.setattr(cli, "_build_parser", lambda: ListParser())
cli.main()
list_output = capsys.readouterr().out
assert "Workspace disk path: /workspace" in list_output
assert "/workspace/note.txt [file]" in list_output
monkeypatch.setattr(cli, "_build_parser", lambda: ReadParser())
cli.main()
read_payload = json.loads(capsys.readouterr().out)
assert read_payload["path"] == "/workspace/note.txt"
assert read_payload["content"] == "hello\n"
def test_cli_workspace_diff_prints_human_output(
monkeypatch: pytest.MonkeyPatch,
capsys: pytest.CaptureFixture[str],

View file

@ -6,6 +6,7 @@ import json
import pytest
import pyro_mcp.doctor as doctor_module
from pyro_mcp.runtime import DEFAULT_PLATFORM
def test_doctor_main_prints_json(
@ -25,3 +26,9 @@ def test_doctor_main_prints_json(
doctor_module.main()
output = json.loads(capsys.readouterr().out)
assert output["runtime_ok"] is True
def test_doctor_build_parser_defaults_platform() -> None:
parser = doctor_module._build_parser()
args = parser.parse_args([])
assert args.platform == DEFAULT_PLATFORM

View file

@ -0,0 +1,67 @@
from __future__ import annotations
from importlib.metadata import PackageNotFoundError
from typing import Any, cast
import pyro_mcp as package_module
def test_resolve_version_prefers_pyproject_version(monkeypatch: Any) -> None:
monkeypatch.setattr(package_module, "version", lambda _name: "9.9.9")
assert package_module._resolve_version() == package_module.__version__ # noqa: SLF001
def test_resolve_version_falls_back_to_unknown_without_metadata(monkeypatch: Any) -> None:
class _FakePyprojectPath:
def exists(self) -> bool:
return False
class _FakeResolvedPath:
@property
def parents(self) -> dict[int, Any]:
return {2: self}
def __truediv__(self, _other: str) -> _FakePyprojectPath:
return _FakePyprojectPath()
class _FakePathFactory:
def __init__(self, _value: str) -> None:
return None
def resolve(self) -> _FakeResolvedPath:
return _FakeResolvedPath()
monkeypatch.setattr(
package_module,
"version",
lambda _name: (_ for _ in ()).throw(PackageNotFoundError()),
)
monkeypatch.setattr(package_module, "Path", cast(Any, _FakePathFactory))
assert package_module._resolve_version() == "0+unknown" # noqa: SLF001
def test_resolve_version_falls_back_to_installed_version(monkeypatch: Any) -> None:
class _FakePyprojectPath:
def exists(self) -> bool:
return False
class _FakeResolvedPath:
@property
def parents(self) -> dict[int, Any]:
return {2: self}
def __truediv__(self, _other: str) -> _FakePyprojectPath:
return _FakePyprojectPath()
class _FakePathFactory:
def __init__(self, _value: str) -> None:
return None
def resolve(self) -> _FakeResolvedPath:
return _FakeResolvedPath()
monkeypatch.setattr(package_module, "version", lambda _name: "9.9.9")
monkeypatch.setattr(package_module, "Path", cast(Any, _FakePathFactory))
assert package_module._resolve_version() == "9.9.9" # noqa: SLF001

View file

@ -19,6 +19,9 @@ from pyro_mcp.contract import (
PUBLIC_CLI_RUN_FLAGS,
PUBLIC_CLI_WORKSPACE_CREATE_FLAGS,
PUBLIC_CLI_WORKSPACE_DIFF_FLAGS,
PUBLIC_CLI_WORKSPACE_DISK_EXPORT_FLAGS,
PUBLIC_CLI_WORKSPACE_DISK_LIST_FLAGS,
PUBLIC_CLI_WORKSPACE_DISK_READ_FLAGS,
PUBLIC_CLI_WORKSPACE_EXEC_FLAGS,
PUBLIC_CLI_WORKSPACE_EXPORT_FLAGS,
PUBLIC_CLI_WORKSPACE_RESET_FLAGS,
@ -38,6 +41,8 @@ from pyro_mcp.contract import (
PUBLIC_CLI_WORKSPACE_SNAPSHOT_DELETE_FLAGS,
PUBLIC_CLI_WORKSPACE_SNAPSHOT_LIST_FLAGS,
PUBLIC_CLI_WORKSPACE_SNAPSHOT_SUBCOMMANDS,
PUBLIC_CLI_WORKSPACE_START_FLAGS,
PUBLIC_CLI_WORKSPACE_STOP_FLAGS,
PUBLIC_CLI_WORKSPACE_SUBCOMMANDS,
PUBLIC_CLI_WORKSPACE_SYNC_PUSH_FLAGS,
PUBLIC_CLI_WORKSPACE_SYNC_SUBCOMMANDS,
@ -116,6 +121,26 @@ def test_public_cli_help_lists_commands_and_run_flags() -> None:
).format_help()
for flag in PUBLIC_CLI_WORKSPACE_EXPORT_FLAGS:
assert flag in workspace_export_help_text
workspace_disk_help_text = _subparser_choice(
_subparser_choice(parser, "workspace"), "disk"
).format_help()
for subcommand_name in ("export", "list", "read"):
assert subcommand_name in workspace_disk_help_text
workspace_disk_export_help_text = _subparser_choice(
_subparser_choice(_subparser_choice(parser, "workspace"), "disk"), "export"
).format_help()
for flag in PUBLIC_CLI_WORKSPACE_DISK_EXPORT_FLAGS:
assert flag in workspace_disk_export_help_text
workspace_disk_list_help_text = _subparser_choice(
_subparser_choice(_subparser_choice(parser, "workspace"), "disk"), "list"
).format_help()
for flag in PUBLIC_CLI_WORKSPACE_DISK_LIST_FLAGS:
assert flag in workspace_disk_list_help_text
workspace_disk_read_help_text = _subparser_choice(
_subparser_choice(_subparser_choice(parser, "workspace"), "disk"), "read"
).format_help()
for flag in PUBLIC_CLI_WORKSPACE_DISK_READ_FLAGS:
assert flag in workspace_disk_read_help_text
workspace_diff_help_text = _subparser_choice(
_subparser_choice(parser, "workspace"), "diff"
).format_help()
@ -150,6 +175,16 @@ def test_public_cli_help_lists_commands_and_run_flags() -> None:
).format_help()
for flag in PUBLIC_CLI_WORKSPACE_RESET_FLAGS:
assert flag in workspace_reset_help_text
workspace_start_help_text = _subparser_choice(
_subparser_choice(parser, "workspace"), "start"
).format_help()
for flag in PUBLIC_CLI_WORKSPACE_START_FLAGS:
assert flag in workspace_start_help_text
workspace_stop_help_text = _subparser_choice(
_subparser_choice(parser, "workspace"), "stop"
).format_help()
for flag in PUBLIC_CLI_WORKSPACE_STOP_FLAGS:
assert flag in workspace_stop_help_text
workspace_shell_help_text = _subparser_choice(
_subparser_choice(parser, "workspace"),
"shell",

View file

@ -1,6 +1,8 @@
from __future__ import annotations
from pyro_mcp.runtime_boot_check import _classify_result
import pytest
from pyro_mcp.runtime_boot_check import _classify_result, run_boot_check
def test_classify_result_reports_kernel_panic() -> None:
@ -19,3 +21,32 @@ def test_classify_result_reports_success_when_vm_stays_alive() -> None:
vm_alive=True,
)
assert reason is None
def test_classify_result_reports_logger_failure_and_early_exit() -> None:
logger_reason = _classify_result(
firecracker_log="Successfully started microvm",
serial_log="Could not initialize logger",
vm_alive=False,
)
early_exit_reason = _classify_result(
firecracker_log="partial log",
serial_log="boot log",
vm_alive=False,
)
assert logger_reason == "firecracker logger initialization failed"
assert early_exit_reason == "firecracker did not fully start the microVM"
def test_classify_result_reports_boot_window_exit_after_start() -> None:
reason = _classify_result(
firecracker_log="Successfully started microvm",
serial_log="boot log",
vm_alive=False,
)
assert reason == "microVM exited before boot validation window elapsed"
def test_run_boot_check_requires_positive_wait_seconds() -> None:
with pytest.raises(ValueError, match="wait_seconds must be positive"):
run_boot_check(wait_seconds=0)

View file

@ -32,8 +32,13 @@ def test_create_server_registers_vm_tools(tmp_path: Path) -> None:
assert "vm_run" in tool_names
assert "vm_status" in tool_names
assert "workspace_create" in tool_names
assert "workspace_start" in tool_names
assert "workspace_stop" in tool_names
assert "workspace_diff" in tool_names
assert "workspace_export" in tool_names
assert "workspace_disk_export" in tool_names
assert "workspace_disk_list" in tool_names
assert "workspace_disk_read" in tool_names
assert "workspace_logs" in tool_names
assert "workspace_sync_push" in tool_names
assert "shell_open" in tool_names

View file

@ -18,6 +18,55 @@ from pyro_mcp.vm_manager import VmManager
from pyro_mcp.vm_network import NetworkConfig, TapNetworkManager
def _run_debugfs_write(rootfs_image: Path, command: str) -> None:
proc = subprocess.run( # noqa: S603
["debugfs", "-w", "-R", command, str(rootfs_image)],
text=True,
capture_output=True,
check=False,
)
if proc.returncode != 0:
message = proc.stderr.strip() or proc.stdout.strip() or command
raise RuntimeError(message)
def _create_stopped_workspace_rootfs(tmp_path: Path) -> Path:
rootfs_image = tmp_path / "workspace-rootfs.ext4"
with rootfs_image.open("wb") as handle:
handle.truncate(16 * 1024 * 1024)
proc = subprocess.run( # noqa: S603
["mkfs.ext4", "-F", str(rootfs_image)],
text=True,
capture_output=True,
check=False,
)
if proc.returncode != 0:
message = proc.stderr.strip() or proc.stdout.strip() or "mkfs.ext4 failed"
raise RuntimeError(message)
for directory in (
"/workspace",
"/workspace/src",
"/run",
"/run/pyro-secrets",
"/run/pyro-services",
):
_run_debugfs_write(rootfs_image, f"mkdir {directory}")
note_path = tmp_path / "note.txt"
note_path.write_text("hello from disk\n", encoding="utf-8")
child_path = tmp_path / "child.txt"
child_path.write_text("nested child\n", encoding="utf-8")
secret_path = tmp_path / "secret.txt"
secret_path.write_text("super-secret\n", encoding="utf-8")
service_path = tmp_path / "service.log"
service_path.write_text("service runtime\n", encoding="utf-8")
_run_debugfs_write(rootfs_image, f"write {note_path} /workspace/note.txt")
_run_debugfs_write(rootfs_image, f"write {child_path} /workspace/src/child.txt")
_run_debugfs_write(rootfs_image, "symlink /workspace/link note.txt")
_run_debugfs_write(rootfs_image, f"write {secret_path} /run/pyro-secrets/TOKEN")
_run_debugfs_write(rootfs_image, f"write {service_path} /run/pyro-services/app.log")
return rootfs_image
def test_vm_manager_lifecycle_and_auto_cleanup(tmp_path: Path) -> None:
manager = VmManager(
backend_name="mock",
@ -1129,6 +1178,80 @@ def test_vm_manager_firecracker_backend_path(
assert manager._backend_name == "firecracker" # noqa: SLF001
def test_firecracker_backend_start_removes_stale_socket_files(
tmp_path: Path,
monkeypatch: pytest.MonkeyPatch,
) -> None:
backend = cast(Any, object.__new__(vm_manager_module.FirecrackerBackend))
backend._environment_store = object() # noqa: SLF001
backend._firecracker_bin = tmp_path / "firecracker" # noqa: SLF001
backend._jailer_bin = tmp_path / "jailer" # noqa: SLF001
backend._runtime_capabilities = RuntimeCapabilities( # noqa: SLF001
supports_vm_boot=True,
supports_guest_exec=True,
supports_guest_network=False,
reason=None,
)
backend._network_manager = TapNetworkManager(enabled=False) # noqa: SLF001
backend._guest_exec_client = None # noqa: SLF001
backend._processes = {} # noqa: SLF001
backend._firecracker_bin.write_text("fc", encoding="utf-8") # noqa: SLF001
backend._jailer_bin.write_text("jailer", encoding="utf-8") # noqa: SLF001
kernel_image = tmp_path / "vmlinux"
kernel_image.write_text("kernel", encoding="utf-8")
rootfs_image = tmp_path / "rootfs.ext4"
rootfs_image.write_bytes(b"rootfs")
workdir = tmp_path / "runtime"
workdir.mkdir()
firecracker_socket = workdir / "firecracker.sock"
vsock_socket = workdir / "vsock.sock"
firecracker_socket.write_text("stale firecracker socket", encoding="utf-8")
vsock_socket.write_text("stale vsock socket", encoding="utf-8")
class DummyPopen:
def __init__(self, *args: Any, **kwargs: Any) -> None:
del args, kwargs
self.pid = 4242
def poll(self) -> None:
return None
monkeypatch.setattr(
cast(Any, vm_manager_module).subprocess,
"run",
lambda *args, **kwargs: subprocess.CompletedProcess( # noqa: ARG005
args=args[0],
returncode=0,
stdout="Firecracker v1.0.0\n",
stderr="",
),
)
monkeypatch.setattr(cast(Any, vm_manager_module).subprocess, "Popen", DummyPopen)
instance = vm_manager_module.VmInstance(
vm_id="abcd1234",
environment="debian:12",
vcpu_count=1,
mem_mib=512,
ttl_seconds=600,
created_at=time.time(),
expires_at=time.time() + 600,
workdir=workdir,
metadata={
"kernel_image": str(kernel_image),
"rootfs_image": str(rootfs_image),
},
)
backend.start(instance)
assert instance.firecracker_pid == 4242
assert not firecracker_socket.exists()
assert not vsock_socket.exists()
def test_vm_manager_fails_closed_without_host_compat_opt_in(tmp_path: Path) -> None:
manager = VmManager(
backend_name="mock",
@ -2691,3 +2814,181 @@ def test_workspace_secrets_require_guest_exec_on_firecracker_runtime(
allow_host_compat=True,
secrets=[{"name": "TOKEN", "value": "expected"}],
)
def test_workspace_stop_and_start_preserve_logs_and_clear_live_state(tmp_path: Path) -> None:
manager = VmManager(
backend_name="mock",
base_dir=tmp_path / "vms",
network_manager=TapNetworkManager(enabled=False),
)
seed_dir = tmp_path / "seed"
seed_dir.mkdir()
(seed_dir / "note.txt").write_text("hello from seed\n", encoding="utf-8")
created = manager.create_workspace(
environment="debian:12-base",
allow_host_compat=True,
seed_path=seed_dir,
)
workspace_id = str(created["workspace_id"])
manager.exec_workspace(workspace_id, command="cat note.txt", timeout_seconds=30)
shell = manager.open_shell(workspace_id)
shell_id = str(shell["shell_id"])
started_service = manager.start_service(
workspace_id,
"app",
command='sh -lc \'touch .ready && trap "exit 0" TERM; while true; do sleep 60; done\'',
readiness={"type": "file", "path": ".ready"},
)
assert started_service["state"] == "running"
stopped = manager.stop_workspace(workspace_id)
assert stopped["state"] == "stopped"
assert stopped["command_count"] == 1
assert stopped["service_count"] == 0
assert stopped["running_service_count"] == 0
assert manager.logs_workspace(workspace_id)["count"] == 1
with pytest.raises(RuntimeError, match="must be in 'started' state"):
manager.read_shell(workspace_id, shell_id, cursor=0, max_chars=1024)
restarted = manager.start_workspace(workspace_id)
assert restarted["state"] == "started"
assert restarted["command_count"] == 1
assert restarted["service_count"] == 0
rerun = manager.exec_workspace(workspace_id, command="cat note.txt", timeout_seconds=30)
assert rerun["stdout"] == "hello from seed\n"
def test_workspace_stop_flushes_guest_filesystem_before_stopping(
tmp_path: Path,
) -> None:
manager = VmManager(
backend_name="mock",
base_dir=tmp_path / "vms",
network_manager=TapNetworkManager(enabled=False),
)
created = manager.create_workspace(
environment="debian:12-base",
allow_host_compat=True,
)
workspace_id = str(created["workspace_id"])
workspace_path = tmp_path / "vms" / "workspaces" / workspace_id / "workspace.json"
payload = json.loads(workspace_path.read_text(encoding="utf-8"))
payload["state"] = "started"
payload["firecracker_pid"] = os.getpid()
payload["metadata"]["execution_mode"] = "guest_vsock"
payload["metadata"]["rootfs_image"] = str(_create_stopped_workspace_rootfs(tmp_path))
workspace_path.write_text(json.dumps(payload, indent=2, sort_keys=True), encoding="utf-8")
calls: list[tuple[str, str]] = []
class StubBackend:
def exec(
self,
instance: Any,
command: str,
timeout_seconds: int,
*,
workdir: Path | None = None,
env: dict[str, str] | None = None,
) -> vm_manager_module.VmExecResult:
del instance, timeout_seconds, workdir, env
calls.append(("exec", command))
return vm_manager_module.VmExecResult(
stdout="",
stderr="",
exit_code=0,
duration_ms=1,
)
def stop(self, instance: Any) -> None:
del instance
calls.append(("stop", "instance"))
manager._backend = StubBackend() # type: ignore[assignment] # noqa: SLF001
manager._backend_name = "firecracker" # noqa: SLF001
manager._runtime_capabilities = RuntimeCapabilities( # noqa: SLF001
supports_vm_boot=True,
supports_guest_exec=True,
supports_guest_network=False,
reason=None,
)
stopped = manager.stop_workspace(workspace_id)
assert calls == [("exec", "sync"), ("stop", "instance")]
assert stopped["state"] == "stopped"
def test_workspace_disk_operations_scrub_runtime_only_paths_and_export(
tmp_path: Path,
) -> None:
manager = VmManager(
backend_name="mock",
base_dir=tmp_path / "vms",
network_manager=TapNetworkManager(enabled=False),
)
rootfs_image = _create_stopped_workspace_rootfs(tmp_path)
workspace_id = "workspace-disk-123"
workspace = vm_manager_module.WorkspaceRecord(
workspace_id=workspace_id,
environment="debian:12-base",
vcpu_count=1,
mem_mib=512,
ttl_seconds=600,
created_at=time.time(),
expires_at=time.time() + 600,
state="stopped",
network_policy="off",
allow_host_compat=False,
metadata={
"execution_mode": "guest_vsock",
"rootfs_image": str(rootfs_image),
"workspace_path": "/workspace",
},
)
manager._save_workspace_locked(workspace) # noqa: SLF001
listed = manager.list_workspace_disk(workspace_id, path="/workspace", recursive=True)
assert listed["path"] == "/workspace"
listed_paths = {entry["path"] for entry in listed["entries"]}
assert "/workspace/note.txt" in listed_paths
assert "/workspace/src/child.txt" in listed_paths
assert "/workspace/link" in listed_paths
read_payload = manager.read_workspace_disk(workspace_id, path="note.txt", max_bytes=4096)
assert read_payload["content"] == "hello from disk\n"
assert read_payload["truncated"] is False
run_listing = manager.list_workspace_disk(workspace_id, path="/run", recursive=True)
run_paths = {entry["path"] for entry in run_listing["entries"]}
assert "/run/pyro-secrets" not in run_paths
assert "/run/pyro-services" not in run_paths
exported_path = tmp_path / "workspace-copy.ext4"
exported = manager.export_workspace_disk(workspace_id, output_path=exported_path)
assert exported["disk_format"] == "ext4"
assert exported_path.exists()
assert exported_path.stat().st_size == int(exported["bytes_written"])
def test_workspace_disk_operations_reject_host_compat_workspaces(tmp_path: Path) -> None:
manager = VmManager(
backend_name="mock",
base_dir=tmp_path / "vms",
network_manager=TapNetworkManager(enabled=False),
)
created = manager.create_workspace(
environment="debian:12-base",
allow_host_compat=True,
)
workspace_id = str(created["workspace_id"])
manager.stop_workspace(workspace_id)
with pytest.raises(RuntimeError, match="host_compat workspaces"):
manager.export_workspace_disk(workspace_id, output_path=tmp_path / "workspace.ext4")
with pytest.raises(RuntimeError, match="host_compat workspaces"):
manager.list_workspace_disk(workspace_id)
with pytest.raises(RuntimeError, match="host_compat workspaces"):
manager.read_workspace_disk(workspace_id, path="note.txt")

View file

@ -0,0 +1,258 @@
from __future__ import annotations
import subprocess
from pathlib import Path
from types import SimpleNamespace
from typing import Any, cast
import pytest
import pyro_mcp.workspace_disk as workspace_disk_module
from pyro_mcp.workspace_disk import (
_artifact_type_from_mode,
_debugfs_ls_entries,
_debugfs_stat,
_run_debugfs,
export_workspace_disk_image,
list_workspace_disk,
read_workspace_disk_file,
scrub_workspace_runtime_paths,
)
def _run_debugfs_write(rootfs_image: Path, command: str) -> None:
proc = subprocess.run( # noqa: S603
["debugfs", "-w", "-R", command, str(rootfs_image)],
text=True,
capture_output=True,
check=False,
)
if proc.returncode != 0:
message = proc.stderr.strip() or proc.stdout.strip() or command
raise RuntimeError(message)
def _create_rootfs_image(tmp_path: Path) -> Path:
rootfs_image = tmp_path / "workspace-rootfs.ext4"
with rootfs_image.open("wb") as handle:
handle.truncate(16 * 1024 * 1024)
proc = subprocess.run( # noqa: S603
["mkfs.ext4", "-F", str(rootfs_image)],
text=True,
capture_output=True,
check=False,
)
if proc.returncode != 0:
message = proc.stderr.strip() or proc.stdout.strip() or "mkfs.ext4 failed"
raise RuntimeError(message)
for directory in (
"/workspace",
"/workspace/src",
"/run",
"/run/pyro-secrets",
"/run/pyro-services",
):
_run_debugfs_write(rootfs_image, f"mkdir {directory}")
note_path = tmp_path / "note.txt"
note_path.write_text("hello from disk\n", encoding="utf-8")
child_path = tmp_path / "child.txt"
child_path.write_text("nested child\n", encoding="utf-8")
secret_path = tmp_path / "secret.txt"
secret_path.write_text("super-secret\n", encoding="utf-8")
service_path = tmp_path / "service.log"
service_path.write_text("service runtime\n", encoding="utf-8")
_run_debugfs_write(rootfs_image, f"write {note_path} /workspace/note.txt")
_run_debugfs_write(rootfs_image, f"write {child_path} /workspace/src/child.txt")
_run_debugfs_write(rootfs_image, "symlink /workspace/link note.txt")
_run_debugfs_write(rootfs_image, f"write {secret_path} /run/pyro-secrets/TOKEN")
_run_debugfs_write(rootfs_image, f"write {service_path} /run/pyro-services/app.log")
return rootfs_image
def test_workspace_disk_list_read_export_and_scrub(tmp_path: Path) -> None:
rootfs_image = _create_rootfs_image(tmp_path)
listing = list_workspace_disk(rootfs_image, guest_path="/workspace", recursive=True)
assert listing == [
{
"path": "/workspace/link",
"artifact_type": "symlink",
"size_bytes": 8,
"link_target": "note.txt",
},
{
"path": "/workspace/note.txt",
"artifact_type": "file",
"size_bytes": 16,
"link_target": None,
},
{
"path": "/workspace/src",
"artifact_type": "directory",
"size_bytes": 0,
"link_target": None,
},
{
"path": "/workspace/src/child.txt",
"artifact_type": "file",
"size_bytes": 13,
"link_target": None,
},
]
single = list_workspace_disk(rootfs_image, guest_path="/workspace/note.txt", recursive=False)
assert single == [
{
"path": "/workspace/note.txt",
"artifact_type": "file",
"size_bytes": 16,
"link_target": None,
}
]
read_payload = read_workspace_disk_file(
rootfs_image,
guest_path="/workspace/note.txt",
max_bytes=5,
)
assert read_payload == {
"path": "/workspace/note.txt",
"size_bytes": 16,
"max_bytes": 5,
"content": "hello",
"truncated": True,
}
output_path = tmp_path / "workspace.ext4"
exported = export_workspace_disk_image(rootfs_image, output_path=output_path)
assert exported["output_path"] == str(output_path)
assert exported["disk_format"] == "ext4"
assert int(exported["bytes_written"]) == output_path.stat().st_size
scrub_workspace_runtime_paths(rootfs_image)
run_listing = list_workspace_disk(rootfs_image, guest_path="/run", recursive=True)
assert run_listing == []
def test_workspace_disk_rejects_invalid_inputs(tmp_path: Path) -> None:
rootfs_image = _create_rootfs_image(tmp_path)
with pytest.raises(RuntimeError, match="workspace disk path does not exist"):
list_workspace_disk(rootfs_image, guest_path="/missing", recursive=False)
with pytest.raises(RuntimeError, match="workspace disk path does not exist"):
read_workspace_disk_file(
rootfs_image,
guest_path="/missing.txt",
max_bytes=4096,
)
with pytest.raises(RuntimeError, match="regular files"):
read_workspace_disk_file(
rootfs_image,
guest_path="/workspace/src",
max_bytes=4096,
)
with pytest.raises(ValueError, match="max_bytes must be positive"):
read_workspace_disk_file(
rootfs_image,
guest_path="/workspace/note.txt",
max_bytes=0,
)
output_path = tmp_path / "existing.ext4"
output_path.write_text("present\n", encoding="utf-8")
with pytest.raises(RuntimeError, match="output_path already exists"):
export_workspace_disk_image(rootfs_image, output_path=output_path)
def test_workspace_disk_internal_error_paths(
tmp_path: Path,
monkeypatch: pytest.MonkeyPatch,
) -> None:
rootfs_image = tmp_path / "dummy.ext4"
rootfs_image.write_bytes(b"rootfs")
monkeypatch.setattr(cast(Any, workspace_disk_module).shutil, "which", lambda _name: None)
with pytest.raises(RuntimeError, match="debugfs is required"):
_run_debugfs(rootfs_image, "stat /workspace")
monkeypatch.setattr(
cast(Any, workspace_disk_module).shutil,
"which",
lambda _name: "/usr/bin/debugfs",
)
monkeypatch.setattr(
cast(Any, workspace_disk_module).subprocess,
"run",
lambda *args, **kwargs: SimpleNamespace( # noqa: ARG005
returncode=1,
stdout="",
stderr="",
),
)
with pytest.raises(RuntimeError, match="debugfs command failed: stat /workspace"):
_run_debugfs(rootfs_image, "stat /workspace")
assert _artifact_type_from_mode("00000") is None
monkeypatch.setattr(workspace_disk_module, "_run_debugfs", lambda *_args, **_kwargs: "noise")
with pytest.raises(RuntimeError, match="failed to inspect workspace disk path"):
_debugfs_stat(rootfs_image, "/workspace/bad")
monkeypatch.setattr(
workspace_disk_module,
"_run_debugfs",
lambda *_args, **_kwargs: "Type: fifo\nSize: 1\n",
)
with pytest.raises(RuntimeError, match="unsupported workspace disk path type"):
_debugfs_stat(rootfs_image, "/workspace/fifo")
monkeypatch.setattr(
workspace_disk_module,
"_run_debugfs",
lambda *_args, **_kwargs: "File not found by ext2_lookup",
)
with pytest.raises(RuntimeError, match="workspace disk path does not exist"):
_debugfs_ls_entries(rootfs_image, "/workspace/missing")
monkeypatch.setattr(
workspace_disk_module,
"_debugfs_stat",
lambda *_args, **_kwargs: workspace_disk_module._DebugfsStat( # noqa: SLF001
path="/workspace",
artifact_type="directory",
size_bytes=0,
),
)
monkeypatch.setattr(
workspace_disk_module,
"_debugfs_ls_entries",
lambda *_args, **_kwargs: [
workspace_disk_module._DebugfsDirEntry( # noqa: SLF001
name="special",
path="/workspace/special",
artifact_type=None,
size_bytes=0,
)
],
)
assert list_workspace_disk(rootfs_image, guest_path="/workspace", recursive=True) == []
monkeypatch.setattr(
workspace_disk_module,
"_debugfs_stat",
lambda *_args, **_kwargs: workspace_disk_module._DebugfsStat( # noqa: SLF001
path="/workspace/note.txt",
artifact_type="file",
size_bytes=12,
),
)
monkeypatch.setattr(workspace_disk_module, "_run_debugfs", lambda *_args, **_kwargs: "")
with pytest.raises(RuntimeError, match="failed to dump workspace disk file"):
read_workspace_disk_file(
rootfs_image,
guest_path="/workspace/note.txt",
max_bytes=16,
)