Add use-case recipes and smoke packs

Turn the stable workspace surface into five documented, runnable stories with a shared guest-backed smoke runner, new docs/use-cases recipes, and Make targets for cold-start validation, repro/fix loops, parallel workspaces, untrusted inspection, and review/eval workflows.

Bump the package and catalog surface to 3.6.0, update the main docs to point users from the stable workspace walkthrough into the recipe index and smoke packs, and mark the 3.6.0 roadmap milestone done.

Fix a regression uncovered by the real parallel-workspaces smoke: workspace_file_read must not bump last_activity_at. Verified with uv lock, UV_CACHE_DIR=.uv-cache make check, UV_CACHE_DIR=.uv-cache make dist-check, and USE_CASE_ENVIRONMENT=debian:12 UV_CACHE_DIR=.uv-cache make smoke-use-cases.
This commit is contained in:
Thales Maciel 2026-03-13 10:27:38 -03:00
parent 21a88312b6
commit 894706af50
22 changed files with 1310 additions and 16 deletions

View file

@ -730,8 +730,13 @@ def test_workspace_file_ops_and_patch_round_trip(tmp_path: Path) -> None:
}
]
status_before_read = manager.status_workspace(workspace_id)
read_payload = manager.read_workspace_file(workspace_id, "src/app.py")
assert read_payload["content"] == 'print("bug")\n'
status_after_read = manager.status_workspace(workspace_id)
assert float(status_after_read["last_activity_at"]) == float(
status_before_read["last_activity_at"]
)
written = manager.write_workspace_file(
workspace_id,

View file

@ -0,0 +1,509 @@
from __future__ import annotations
import shutil
import time as time_module
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, cast
import pytest
import pyro_mcp.workspace_use_case_smokes as smoke_module
from pyro_mcp.workspace_use_case_smokes import (
USE_CASE_ALL_SCENARIO,
USE_CASE_CHOICES,
USE_CASE_SCENARIOS,
WORKSPACE_USE_CASE_RECIPES,
build_arg_parser,
)
def _repo_root() -> Path:
    """Return the directory two levels above this test module.

    The module path is resolved to an absolute path first, so the result
    does not depend on the current working directory.
    """
    this_file = Path(__file__).resolve()
    return this_file.parent.parent
@dataclass
class _FakeShell:
    """State kept for one simulated interactive shell.

    Every field has a default, so a fresh shell is created with
    ``_FakeShell()``.
    """

    # Working directory reported for the shell.
    cwd: str = "/workspace"
    # Accumulated shell output; readers slice it by cursor position.
    buffer: str = ""
    # Read position bookkeeping slot (starts at the beginning of the buffer).
    cursor: int = 0
    # Flag slot for a closed shell.
    closed: bool = False
@dataclass
class _FakeWorkspace:
    """Bookkeeping record for one simulated workspace.

    The first nine fields are required; the remaining ones start empty
    (or at zero) and are filled in as the fake client is exercised.
    """

    # Identifier handed back to callers.
    workspace_id: str
    # Directory holding the live workspace tree.
    root: Path
    # Directory holding the pristine copy used for diffing.
    baseline_root: Path
    # Environment name the workspace was created with.
    environment: str
    # Network policy string the workspace was created with.
    network_policy: str
    # Optional human-readable name.
    name: str | None
    # Free-form key/value labels.
    labels: dict[str, str]
    # Fake-clock stamp taken at creation.
    created_at: float
    # Fake-clock stamp of the most recent activity-bumping call.
    last_activity_at: float
    # Number of resets performed so far.
    reset_count: int = 0
    # Snapshot name -> directory containing that snapshot's tree.
    snapshots: dict[str, Path] = field(default_factory=dict)
    # Service name -> recorded service state.
    services: dict[str, dict[str, Any]] = field(default_factory=dict)
    # Shell id -> simulated shell.
    shells: dict[str, _FakeShell] = field(default_factory=dict)
class _FakePyro:
    """Filesystem-backed stand-in for the ``Pyro`` client used by the smokes.

    Each workspace is a pair of directory trees (live tree plus baseline)
    below ``root``. Guest commands, services, and shells are simulated with
    canned responses keyed on the exact command strings this fake knows.
    Timestamps come from an internal counter rather than the wall clock,
    so ordering by ``last_activity_at`` is deterministic.
    """

    def __init__(self, root: Path) -> None:
        """Create an empty fake rooted at ``root`` (not created eagerly)."""
        self._root = root
        self._workspaces: dict[str, _FakeWorkspace] = {}
        self._workspace_counter = 0
        self._shell_counter = 0
        self._clock = 0.0

    def _tick(self) -> float:
        """Advance the fake clock by one unit and return the new value."""
        self._clock += 1.0
        return self._clock

    def _workspace_dir(self, workspace_id: str) -> Path:
        """Return the directory that holds all state for ``workspace_id``."""
        return self._root / workspace_id

    def _resolve_workspace(self, workspace_id: str) -> _FakeWorkspace:
        """Look up a workspace record; raises ``KeyError`` when unknown."""
        return self._workspaces[workspace_id]

    def _workspace_path(self, workspace: _FakeWorkspace, path: str) -> Path:
        """Map a guest path onto the workspace's live tree.

        ``/workspace`` maps to the tree root, ``/workspace/<rel>`` to
        ``<rel>`` below it, and any other string is joined as-is.
        """
        if path.startswith("/workspace/"):
            relative = path.removeprefix("/workspace/")
        elif path == "/workspace":
            relative = ""
        else:
            # NOTE(review): an absolute path outside /workspace replaces the
            # root when joined; the fake relies on callers passing guest or
            # relative paths only.
            relative = path
        return workspace.root / relative

    def _copy_tree_contents(self, source: Path, destination: Path) -> None:
        """Copy everything inside ``source`` into ``destination``.

        ``destination`` (and missing parents) is created on demand; when it
        already exists the copied entries are merged into it.
        """
        shutil.copytree(source, destination, dirs_exist_ok=True)

    def _reset_tree(self, destination: Path, source: Path) -> None:
        """Replace ``destination`` with a fresh copy of ``source``."""
        if destination.exists():
            shutil.rmtree(destination)
        shutil.copytree(source, destination)

    def _diff_changed(self, workspace: _FakeWorkspace) -> bool:
        """Return True when the live tree differs from the baseline.

        Trees differ when their sets of regular-file paths differ or when
        any file's bytes differ; directories are not compared.
        """
        current_paths = {
            path.relative_to(workspace.root)
            for path in workspace.root.rglob("*")
            if path.is_file()
        }
        baseline_paths = {
            path.relative_to(workspace.baseline_root)
            for path in workspace.baseline_root.rglob("*")
            if path.is_file()
        }
        if current_paths != baseline_paths:
            return True
        return any(
            (workspace.root / relative).read_bytes()
            != (workspace.baseline_root / relative).read_bytes()
            for relative in current_paths
        )

    def create_workspace(
        self,
        *,
        environment: str,
        seed_path: Path,
        name: str | None = None,
        labels: dict[str, str] | None = None,
        network_policy: str = "off",
    ) -> dict[str, Any]:
        """Create a workspace seeded from ``seed_path``.

        The seed is copied twice: once as the live tree and once as the
        baseline, which is also registered as the ``"baseline"`` snapshot.
        Returns a payload containing the new ``workspace_id``.
        """
        self._workspace_counter += 1
        workspace_id = f"ws-{self._workspace_counter}"
        workspace_dir = self._workspace_dir(workspace_id)
        workspace_root = workspace_dir / "workspace"
        baseline_root = workspace_dir / "baseline"
        self._copy_tree_contents(Path(seed_path), workspace_root)
        self._copy_tree_contents(Path(seed_path), baseline_root)
        stamp = self._tick()
        workspace = _FakeWorkspace(
            workspace_id=workspace_id,
            root=workspace_root,
            baseline_root=baseline_root,
            environment=environment,
            network_policy=network_policy,
            name=name,
            # Copy so later label updates never mutate the caller's dict.
            labels=dict(labels or {}),
            created_at=stamp,
            last_activity_at=stamp,
        )
        workspace.snapshots["baseline"] = baseline_root
        self._workspaces[workspace_id] = workspace
        return {"workspace_id": workspace_id}

    def delete_workspace(self, workspace_id: str) -> dict[str, Any]:
        """Forget the workspace and remove its directory (best effort)."""
        workspace = self._workspaces.pop(workspace_id)
        shutil.rmtree(self._workspace_dir(workspace.workspace_id), ignore_errors=True)
        return {"workspace_id": workspace_id, "deleted": True}

    def status_workspace(self, workspace_id: str) -> dict[str, Any]:
        """Return a status payload; does not change ``last_activity_at``."""
        workspace = self._resolve_workspace(workspace_id)
        return {
            "workspace_id": workspace_id,
            "network_policy": workspace.network_policy,
            "name": workspace.name,
            "labels": dict(workspace.labels),
            "last_activity_at": workspace.last_activity_at,
        }

    def update_workspace(self, workspace_id: str, *, labels: dict[str, str]) -> dict[str, Any]:
        """Merge ``labels`` into the workspace labels and bump activity."""
        workspace = self._resolve_workspace(workspace_id)
        workspace.labels.update(labels)
        workspace.last_activity_at = self._tick()
        return {"workspace_id": workspace_id, "labels": dict(workspace.labels)}

    def list_workspaces(self) -> dict[str, Any]:
        """List workspaces, most recently active first.

        Ties are broken by newest creation stamp, then by workspace id.
        """
        workspaces = sorted(
            self._workspaces.values(),
            key=lambda item: (-item.last_activity_at, -item.created_at, item.workspace_id),
        )
        return {
            "count": len(workspaces),
            "workspaces": [
                {
                    "workspace_id": workspace.workspace_id,
                    "name": workspace.name,
                    "labels": dict(workspace.labels),
                    "environment": workspace.environment,
                    "state": "started",
                    "created_at": workspace.created_at,
                    "last_activity_at": workspace.last_activity_at,
                    "expires_at": workspace.created_at + 3600,
                    "command_count": 0,
                    "service_count": len(workspace.services),
                    "running_service_count": sum(
                        1
                        for service in workspace.services.values()
                        if service["state"] == "running"
                    ),
                }
                for workspace in workspaces
            ],
        }

    def exec_workspace(self, workspace_id: str, *, command: str) -> dict[str, Any]:
        """Simulate one of the guest commands the smoke scenarios issue.

        Recognised commands write the files and produce the output the
        matching scenario script would. Any other command raises
        ``AssertionError`` so an unexpected scenario change fails loudly.
        Activity is bumped only for recognised commands.
        """
        workspace = self._resolve_workspace(workspace_id)
        root = workspace.root
        stdout = ""
        stderr = ""
        exit_code = 0
        if command == "sh validate.sh":
            (root / "validation-report.txt").write_text("validation=pass\n", encoding="utf-8")
            stdout = "validated\n"
        elif command == "sh check.sh":
            value = (root / "message.txt").read_text(encoding="utf-8").strip()
            if value == "fixed":
                stdout = "fixed\n"
            else:
                stderr = f"expected fixed got {value}\n"
                exit_code = 1
        elif command == "sh -lc 'test -f .app-ready && cat service-state.txt'":
            stdout = (root / "service-state.txt").read_text(encoding="utf-8")
        elif "inspection-report.txt" in command:
            # Report the 1-based line numbers of suspicious.sh that mention curl.
            suspicious = (root / "suspicious.sh").read_text(encoding="utf-8").splitlines()
            report_lines = [
                f"{index}:curl"
                for index, line in enumerate(suspicious, start=1)
                if "curl" in line
            ]
            report_lines.append("network_policy=off")
            (root / "inspection-report.txt").write_text(
                "\n".join(report_lines) + "\n",
                encoding="utf-8",
            )
        elif command == "sh review.sh":
            artifact = (root / "artifact.txt").read_text(encoding="utf-8").strip()
            if artifact == "PASS":
                (root / "review-report.txt").write_text("review=pass\n", encoding="utf-8")
                stdout = "review passed\n"
            else:
                (root / "review-report.txt").write_text("review=fail\n", encoding="utf-8")
                stderr = "review failed\n"
                exit_code = 1
        else:
            raise AssertionError(f"unexpected exec command: {command}")
        workspace.last_activity_at = self._tick()
        return {
            "workspace_id": workspace_id,
            "exit_code": exit_code,
            "stdout": stdout,
            "stderr": stderr,
            "execution_mode": "guest_vsock",
        }

    def start_service(
        self,
        workspace_id: str,
        service_name: str,
        *,
        command: str,
        readiness: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Record a running service and bump activity.

        ``sh serve.sh`` additionally writes ``service-state.txt`` and the
        ``.app-ready`` marker into the workspace; other commands record an
        empty stdout.
        """
        workspace = self._resolve_workspace(workspace_id)
        if command == "sh serve.sh":
            (workspace.root / "service-state.txt").write_text("service=ready\n", encoding="utf-8")
            (workspace.root / ".app-ready").write_text("", encoding="utf-8")
            stdout = "service started\n"
        else:
            stdout = ""
        workspace.services[service_name] = {
            "state": "running",
            "stdout": stdout,
            "readiness": readiness,
        }
        workspace.last_activity_at = self._tick()
        return {
            "workspace_id": workspace_id,
            "service_name": service_name,
            "state": "running",
            "command": command,
            "cwd": "/workspace",
            "execution_mode": "guest_vsock",
            "readiness": readiness,
        }

    def logs_service(
        self,
        workspace_id: str,
        service_name: str,
        *,
        tail_lines: int = 200,
    ) -> dict[str, Any]:
        """Return the recorded stdout for a service; stderr is always empty.

        ``tail_lines`` is echoed back and not used to trim the output.
        """
        workspace = self._resolve_workspace(workspace_id)
        service = workspace.services[service_name]
        return {
            "workspace_id": workspace_id,
            "service_name": service_name,
            "state": service["state"],
            "stdout": service["stdout"],
            "stderr": "",
            "tail_lines": tail_lines,
            "truncated": False,
        }

    def stop_service(self, workspace_id: str, service_name: str) -> dict[str, Any]:
        """Mark a recorded service as stopped and bump activity."""
        workspace = self._resolve_workspace(workspace_id)
        workspace.services[service_name]["state"] = "stopped"
        workspace.last_activity_at = self._tick()
        return {"workspace_id": workspace_id, "service_name": service_name, "state": "stopped"}

    def list_workspace_files(
        self, workspace_id: str, *, path: str = "/workspace", recursive: bool = False
    ) -> dict[str, Any]:
        """List entries below ``path`` in sorted path order.

        Directories report ``size_bytes`` of 0. Entry paths are rendered as
        POSIX-style guest paths under ``/workspace``. Does not change
        ``last_activity_at``.
        """
        workspace = self._resolve_workspace(workspace_id)
        target = self._workspace_path(workspace, path)
        entries: list[dict[str, Any]] = []
        iterable = target.rglob("*") if recursive else target.iterdir()
        # Directory iteration order is arbitrary; sort for stable listings.
        for entry in sorted(iterable):
            artifact_type = "directory" if entry.is_dir() else "file"
            guest_relative = entry.relative_to(workspace.root).as_posix()
            entries.append(
                {
                    "path": f"/workspace/{guest_relative}",
                    "artifact_type": artifact_type,
                    "size_bytes": entry.stat().st_size if entry.is_file() else 0,
                    "link_target": None,
                }
            )
        return {"workspace_id": workspace_id, "entries": entries}

    def read_workspace_file(self, workspace_id: str, path: str) -> dict[str, Any]:
        """Read a UTF-8 text file; does not change ``last_activity_at``."""
        workspace = self._resolve_workspace(workspace_id)
        target = self._workspace_path(workspace, path)
        content = target.read_text(encoding="utf-8")
        return {"workspace_id": workspace_id, "path": path, "content": content}

    def write_workspace_file(self, workspace_id: str, path: str, *, text: str) -> dict[str, Any]:
        """Write ``text`` as UTF-8, creating parent directories, and bump activity."""
        workspace = self._resolve_workspace(workspace_id)
        target = self._workspace_path(workspace, path)
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(text, encoding="utf-8")
        workspace.last_activity_at = self._tick()
        return {"workspace_id": workspace_id, "path": path, "bytes_written": len(text.encode())}

    def apply_workspace_patch(self, workspace_id: str, *, patch: str) -> dict[str, Any]:
        """Simulate applying a patch and bump activity.

        The patch text is not parsed: the fake always rewrites
        ``broken`` lines to ``fixed`` in ``message.txt`` and echoes the
        patch back in the payload.
        """
        workspace = self._resolve_workspace(workspace_id)
        target = workspace.root / "message.txt"
        original = target.read_text(encoding="utf-8")
        updated = original.replace("broken\n", "fixed\n")
        target.write_text(updated, encoding="utf-8")
        workspace.last_activity_at = self._tick()
        return {"workspace_id": workspace_id, "changed": updated != original, "patch": patch}

    def diff_workspace(self, workspace_id: str) -> dict[str, Any]:
        """Report whether the live tree differs from the baseline."""
        workspace = self._resolve_workspace(workspace_id)
        return {"workspace_id": workspace_id, "changed": self._diff_changed(workspace)}

    def export_workspace(
        self,
        workspace_id: str,
        path: str,
        *,
        output_path: Path,
    ) -> dict[str, Any]:
        """Copy a workspace file or directory to ``output_path`` on the host.

        Parent directories of ``output_path`` are created as needed. The
        payload reports whether a file or a directory was exported.
        """
        workspace = self._resolve_workspace(workspace_id)
        source = self._workspace_path(workspace, path)
        output_path = Path(output_path)
        output_path.parent.mkdir(parents=True, exist_ok=True)
        if source.is_dir():
            shutil.copytree(source, output_path)
            artifact_type = "directory"
        else:
            shutil.copy2(source, output_path)
            artifact_type = "file"
        return {
            "workspace_id": workspace_id,
            "workspace_path": path,
            "output_path": str(output_path),
            "artifact_type": artifact_type,
        }

    def create_snapshot(self, workspace_id: str, snapshot_name: str) -> dict[str, Any]:
        """Copy the live tree into a named snapshot, replacing any earlier one."""
        workspace = self._resolve_workspace(workspace_id)
        snapshot_root = self._workspace_dir(workspace_id) / f"snapshot-{snapshot_name}"
        self._reset_tree(snapshot_root, workspace.root)
        workspace.snapshots[snapshot_name] = snapshot_root
        return {
            "workspace_id": workspace_id,
            "snapshot": {"snapshot_name": snapshot_name, "kind": "named"},
        }

    def reset_workspace(self, workspace_id: str, *, snapshot: str = "baseline") -> dict[str, Any]:
        """Restore the live tree from ``snapshot`` and bump activity.

        Raises ``KeyError`` when the snapshot name is unknown.
        """
        workspace = self._resolve_workspace(workspace_id)
        source = workspace.snapshots[snapshot]
        self._reset_tree(workspace.root, source)
        workspace.reset_count += 1
        workspace.last_activity_at = self._tick()
        return {
            "workspace_id": workspace_id,
            "reset_count": workspace.reset_count,
            "workspace_reset": {"snapshot_name": snapshot},
        }

    def open_shell(self, workspace_id: str, **_: Any) -> dict[str, Any]:
        """Register a new simulated shell; extra keyword options are ignored."""
        workspace = self._resolve_workspace(workspace_id)
        self._shell_counter += 1
        shell_id = f"shell-{self._shell_counter}"
        workspace.shells[shell_id] = _FakeShell()
        return {"workspace_id": workspace_id, "shell_id": shell_id, "state": "running"}

    def read_shell(
        self,
        workspace_id: str,
        shell_id: str,
        *,
        cursor: int = 0,
        plain: bool = False,
        wait_for_idle_ms: int | None = None,
    ) -> dict[str, Any]:
        """Return shell output from ``cursor`` to the end of the buffer.

        ``plain`` and ``wait_for_idle_ms`` are echoed back unchanged.
        """
        workspace = self._resolve_workspace(workspace_id)
        shell = workspace.shells[shell_id]
        output = shell.buffer[cursor:]
        next_cursor = len(shell.buffer)
        return {
            "workspace_id": workspace_id,
            "shell_id": shell_id,
            "state": "running",
            "cursor": cursor,
            "next_cursor": next_cursor,
            "output": output,
            "plain": plain,
            "wait_for_idle_ms": wait_for_idle_ms,
            "truncated": False,
        }

    def write_shell(self, workspace_id: str, shell_id: str, *, input: str) -> dict[str, Any]:
        """Feed ``input`` to a simulated shell and bump activity.

        Only ``cat CHECKLIST.md`` produces output (the file's contents are
        appended to the shell buffer); any other input is accepted silently.
        """
        workspace = self._resolve_workspace(workspace_id)
        shell = workspace.shells[shell_id]
        if input == "cat CHECKLIST.md":
            shell.buffer += (workspace.root / "CHECKLIST.md").read_text(encoding="utf-8")
        workspace.last_activity_at = self._tick()
        return {"workspace_id": workspace_id, "shell_id": shell_id}

    def close_shell(self, workspace_id: str, shell_id: str) -> dict[str, Any]:
        """Drop a shell; closing an unknown shell id is not an error."""
        workspace = self._resolve_workspace(workspace_id)
        workspace.shells.pop(shell_id, None)
        return {"workspace_id": workspace_id, "shell_id": shell_id, "closed": True}
def test_use_case_registry_has_expected_scenarios() -> None:
    """The registry lists exactly these scenarios, in this order."""
    scenario_names = (
        "cold-start-validation",
        "repro-fix-loop",
        "parallel-workspaces",
        "untrusted-inspection",
        "review-eval",
    )
    # The CLI choices are the scenarios followed by the "run everything" entry.
    names_plus_all = scenario_names + (USE_CASE_ALL_SCENARIO,)

    assert USE_CASE_SCENARIOS == scenario_names
    assert USE_CASE_CHOICES == names_plus_all
    assert tuple(entry.scenario for entry in WORKSPACE_USE_CASE_RECIPES) == scenario_names
def test_use_case_docs_and_targets_stay_aligned() -> None:
    """Each recipe has a doc page, an index entry, and a Makefile target."""
    root = _repo_root()
    index_text = root.joinpath("docs", "use-cases", "README.md").read_text(encoding="utf-8")
    makefile_text = root.joinpath("Makefile").read_text(encoding="utf-8")

    for recipe in WORKSPACE_USE_CASE_RECIPES:
        doc_file = root / recipe.doc_path
        assert doc_file.is_file(), recipe.doc_path
        recipe_text = doc_file.read_text(encoding="utf-8")

        # The index must mention both the smoke target and the doc file name.
        assert recipe.smoke_target in index_text
        doc_file_name = recipe.doc_path.rsplit("/", 1)[-1]
        assert doc_file_name in index_text

        # The recipe page itself names its profile and smoke target.
        assert recipe.profile in recipe_text
        assert recipe.smoke_target in recipe_text

        # The Makefile must define the target as a rule.
        make_rule = f"{recipe.smoke_target}:"
        assert make_rule in makefile_text
def test_use_case_parser_exposes_all_scenarios() -> None:
    """The ``scenario`` CLI argument offers every registered choice."""
    parser = build_arg_parser()
    # argparse has no public lookup by dest, so scan the registered actions.
    scenario_candidates = (
        candidate
        for candidate in parser._actions
        if getattr(candidate, "dest", None) == "scenario"
    )
    scenario_action = next(scenario_candidates)
    offered = cast(tuple[Any, ...], scenario_action.choices)
    assert tuple(offered) == USE_CASE_CHOICES
def test_run_all_use_case_scenarios_with_fake_pyro(
    monkeypatch: pytest.MonkeyPatch,
    tmp_path: Path,
) -> None:
    """Running ``all`` against the fake leaves no workspace behind."""
    fake_pyro = _FakePyro(tmp_path / "fake-pyro")

    def _pyro_factory() -> _FakePyro:
        # Hand every scenario the same fake so leftovers can be inspected.
        return fake_pyro

    def _skip_sleep(_seconds: Any) -> None:
        return None

    monkeypatch.setattr(smoke_module, "Pyro", _pyro_factory)
    monkeypatch.setattr(time_module, "sleep", _skip_sleep)

    smoke_module.run_workspace_use_case_scenario("all")

    assert fake_pyro._workspaces == {}
def test_run_workspace_use_case_scenario_rejects_unknown() -> None:
    """An unregistered scenario name is rejected with ``ValueError``."""
    bogus_scenario = "not-a-scenario"
    with pytest.raises(ValueError, match="unknown use-case scenario"):
        smoke_module.run_workspace_use_case_scenario(bogus_scenario)
def test_main_runs_selected_scenario(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
    """``main()`` runs the scenario named on the command line and cleans up."""
    fake_pyro = _FakePyro(tmp_path / "fake-pyro-main")
    cli_args = [
        "workspace_use_case_smoke",
        "--scenario",
        "repro-fix-loop",
        "--environment",
        "debian:12",
    ]

    def _pyro_factory() -> _FakePyro:
        return fake_pyro

    def _skip_sleep(_seconds: Any) -> None:
        return None

    monkeypatch.setattr(smoke_module, "Pyro", _pyro_factory)
    monkeypatch.setattr(time_module, "sleep", _skip_sleep)
    monkeypatch.setattr("sys.argv", cli_args)

    smoke_module.main()

    assert fake_pyro._workspaces == {}