pyro-mcp/tests/test_workspace_use_case_smokes.py
Thales Maciel 79a7d71d3b Align use-case smokes with canonical workspace recipes
The 3.10.0 milestone was about making the advertised smoke pack trustworthy enough to act like a real release gate. The main drift was in the repro-plus-fix scenario: the recipe docs were SDK-first, but the smoke still shelled out to CLI patch apply and asserted a human summary string.

Switch the smoke runner to use the structured SDK patch flow directly, remove the harness-only CLI dependency, and tighten the fake smoke tests so they prove the same structured path the docs recommend. This keeps smoke failures tied to real user-facing regressions instead of human-output formatting drift.

Promote make smoke-use-cases as the trustworthy guest-backed verification path in the top-level docs, bump the release surface to 3.10.0, and mark the roadmap milestone done.

Validation:
- uv lock
- UV_CACHE_DIR=.uv-cache uv run pytest --no-cov tests/test_workspace_use_case_smokes.py
- UV_CACHE_DIR=.uv-cache make check
- UV_CACHE_DIR=.uv-cache make dist-check
- USE_CASE_ENVIRONMENT=debian:12 UV_CACHE_DIR=.uv-cache make smoke-use-cases
2026-03-13 13:30:52 -03:00

528 lines
20 KiB
Python

from __future__ import annotations
import shutil
import time as time_module
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, cast
import pytest
import pyro_mcp.workspace_use_case_smokes as smoke_module
from pyro_mcp.workspace_use_case_smokes import (
USE_CASE_ALL_SCENARIO,
USE_CASE_CHOICES,
USE_CASE_SCENARIOS,
WORKSPACE_USE_CASE_RECIPES,
build_arg_parser,
)
def _repo_root() -> Path:
    """Return the repository root (the directory one level above tests/)."""
    here = Path(__file__).resolve()
    return here.parent.parent
@dataclass
class _FakeShell:
    """Minimal interactive-shell record tracked per fake workspace."""

    cwd: str = "/workspace"  # working directory reported for the shell
    buffer: str = ""  # accumulated output; read_shell slices it by the caller's cursor
    cursor: int = 0  # never consulted by the fake — read_shell uses the cursor argument
    closed: bool = False  # never mutated here; close_shell drops the record instead
@dataclass
class _FakeWorkspace:
    """Host-side state backing one fake workspace."""

    workspace_id: str  # stable "ws-N" identifier assigned at creation
    root: Path  # live file tree exposed to callers as /workspace
    baseline_root: Path  # pristine seed copy used for diffs and baseline resets
    environment: str
    network_policy: str
    name: str | None
    labels: dict[str, str]
    created_at: float  # fake-clock stamp (see _FakePyro._tick), not wall time
    last_activity_at: float  # bumped on every mutating SDK call
    reset_count: int = 0
    snapshots: dict[str, Path] = field(default_factory=dict)  # snapshot name -> tree
    services: dict[str, dict[str, Any]] = field(default_factory=dict)
    shells: dict[str, _FakeShell] = field(default_factory=dict)
class _FakePyro:
    """In-memory stand-in for the Pyro SDK client used by the smoke runner.

    Implements the workspace lifecycle (create/delete/status/update/list),
    canned command execution, services, file I/O, patch/diff/export,
    snapshots/reset, and interactive shells entirely on the local filesystem,
    so the use-case scenarios can run without a real guest VM.
    """

    def __init__(self, root: Path) -> None:
        # All fake workspaces live under this directory, one subdir per id.
        self._root = root
        self._workspaces: dict[str, _FakeWorkspace] = {}
        self._workspace_counter = 0
        self._shell_counter = 0
        # Monotonic fake clock; advanced by _tick() on each mutating call.
        self._clock = 0.0
        # Counts apply_workspace_patch calls; asserted by the smoke tests.
        self.patch_apply_count = 0

    def _tick(self) -> float:
        """Advance the fake clock by one unit and return the new value."""
        self._clock += 1.0
        return self._clock

    def _workspace_dir(self, workspace_id: str) -> Path:
        """Directory holding a workspace's live tree, baseline, and snapshots."""
        return self._root / workspace_id

    def _resolve_workspace(self, workspace_id: str) -> _FakeWorkspace:
        """Look up a workspace record; raises KeyError for unknown ids."""
        return self._workspaces[workspace_id]

    def _workspace_path(self, workspace: _FakeWorkspace, path: str) -> Path:
        """Map a guest-style '/workspace/...' path onto the host-side tree."""
        if path.startswith("/workspace/"):
            relative = path.removeprefix("/workspace/")
        elif path == "/workspace":
            relative = ""
        else:
            # Already relative: resolve it against the workspace root as-is.
            relative = path
        return workspace.root / relative

    def _copy_tree_contents(self, source: Path, destination: Path) -> None:
        """Copy the children of *source* into *destination* (created if needed)."""
        destination.mkdir(parents=True, exist_ok=True)
        for child in source.iterdir():
            target = destination / child.name
            if child.is_dir():
                shutil.copytree(child, target)
            else:
                shutil.copy2(child, target)

    def _reset_tree(self, destination: Path, source: Path) -> None:
        """Replace *destination* wholesale with a fresh copy of *source*."""
        if destination.exists():
            shutil.rmtree(destination)
        shutil.copytree(source, destination)

    def _diff_changed(self, workspace: _FakeWorkspace) -> bool:
        """Return True when the live tree differs from the baseline copy."""
        current_paths = {
            path.relative_to(workspace.root)
            for path in workspace.root.rglob("*")
            if path.is_file()
        }
        baseline_paths = {
            path.relative_to(workspace.baseline_root)
            for path in workspace.baseline_root.rglob("*")
            if path.is_file()
        }
        # Added or removed files count as a change regardless of content.
        if current_paths != baseline_paths:
            return True
        for relative in current_paths:
            if (
                (workspace.root / relative).read_bytes()
                != (workspace.baseline_root / relative).read_bytes()
            ):
                return True
        return False

    def create_workspace(
        self,
        *,
        environment: str,
        seed_path: Path,
        name: str | None = None,
        labels: dict[str, str] | None = None,
        network_policy: str = "off",
    ) -> dict[str, Any]:
        """Seed a new workspace (and its baseline copy) from *seed_path*."""
        self._workspace_counter += 1
        workspace_id = f"ws-{self._workspace_counter}"
        workspace_dir = self._workspace_dir(workspace_id)
        workspace_root = workspace_dir / "workspace"
        baseline_root = workspace_dir / "baseline"
        # Two independent copies: one live tree, one untouched baseline.
        self._copy_tree_contents(Path(seed_path), workspace_root)
        self._copy_tree_contents(Path(seed_path), baseline_root)
        stamp = self._tick()
        workspace = _FakeWorkspace(
            workspace_id=workspace_id,
            root=workspace_root,
            baseline_root=baseline_root,
            environment=environment,
            network_policy=network_policy,
            name=name,
            labels=dict(labels or {}),
            created_at=stamp,
            last_activity_at=stamp,
        )
        # The baseline doubles as the implicit "baseline" snapshot for reset.
        workspace.snapshots["baseline"] = baseline_root
        self._workspaces[workspace_id] = workspace
        return {"workspace_id": workspace_id}

    def delete_workspace(self, workspace_id: str) -> dict[str, Any]:
        """Forget the workspace and remove its on-disk directory."""
        workspace = self._workspaces.pop(workspace_id)
        shutil.rmtree(self._workspace_dir(workspace.workspace_id), ignore_errors=True)
        return {"workspace_id": workspace_id, "deleted": True}

    def status_workspace(self, workspace_id: str) -> dict[str, Any]:
        """Return a status summary for one workspace."""
        workspace = self._resolve_workspace(workspace_id)
        return {
            "workspace_id": workspace_id,
            "network_policy": workspace.network_policy,
            "name": workspace.name,
            "labels": dict(workspace.labels),
            "last_activity_at": workspace.last_activity_at,
        }

    def update_workspace(self, workspace_id: str, *, labels: dict[str, str]) -> dict[str, Any]:
        """Merge *labels* into the workspace's label map."""
        workspace = self._resolve_workspace(workspace_id)
        workspace.labels.update(labels)
        workspace.last_activity_at = self._tick()
        return {"workspace_id": workspace_id, "labels": dict(workspace.labels)}

    def list_workspaces(self) -> dict[str, Any]:
        """List workspaces, most recently active first (ties broken by id)."""
        workspaces = sorted(
            self._workspaces.values(),
            key=lambda item: (-item.last_activity_at, -item.created_at, item.workspace_id),
        )
        return {
            "count": len(workspaces),
            "workspaces": [
                {
                    "workspace_id": workspace.workspace_id,
                    "name": workspace.name,
                    "labels": dict(workspace.labels),
                    "environment": workspace.environment,
                    "state": "started",
                    "created_at": workspace.created_at,
                    "last_activity_at": workspace.last_activity_at,
                    # Fixed one-hour TTL relative to the fake creation stamp.
                    "expires_at": workspace.created_at + 3600,
                    "command_count": 0,
                    "service_count": len(workspace.services),
                    "running_service_count": sum(
                        1
                        for service in workspace.services.values()
                        if service["state"] == "running"
                    ),
                }
                for workspace in workspaces
            ],
        }

    def exec_workspace(self, workspace_id: str, *, command: str) -> dict[str, Any]:
        """Execute one of the canned scenario commands against the live tree.

        Any command outside the known set fails the test immediately via
        AssertionError, keeping the fake honest about what scenarios run.
        """
        workspace = self._resolve_workspace(workspace_id)
        root = workspace.root
        stdout = ""
        stderr = ""
        exit_code = 0
        if command == "sh validate.sh":
            (root / "validation-report.txt").write_text("validation=pass\n", encoding="utf-8")
            stdout = "validated\n"
        elif command == "sh check.sh":
            # Succeeds only after apply_workspace_patch has flipped message.txt.
            value = (root / "message.txt").read_text(encoding="utf-8").strip()
            if value == "fixed":
                stdout = "fixed\n"
            else:
                stderr = f"expected fixed got {value}\n"
                exit_code = 1
        elif command == "sh -lc 'test -f .app-ready && cat service-state.txt'":
            stdout = (root / "service-state.txt").read_text(encoding="utf-8")
        elif "inspection-report.txt" in command:
            # Write a report listing line numbers of 'curl' hits in suspicious.sh.
            suspicious = (root / "suspicious.sh").read_text(encoding="utf-8").splitlines()
            report_lines = [
                f"{index}:curl"
                for index, line in enumerate(suspicious, start=1)
                if "curl" in line
            ]
            report_lines.append("network_policy=off")
            (root / "inspection-report.txt").write_text(
                "\n".join(report_lines) + "\n",
                encoding="utf-8",
            )
        elif command == "sh review.sh":
            artifact = (root / "artifact.txt").read_text(encoding="utf-8").strip()
            if artifact == "PASS":
                (root / "review-report.txt").write_text("review=pass\n", encoding="utf-8")
                stdout = "review passed\n"
            else:
                (root / "review-report.txt").write_text("review=fail\n", encoding="utf-8")
                stderr = "review failed\n"
                exit_code = 1
        else:
            raise AssertionError(f"unexpected exec command: {command}")
        workspace.last_activity_at = self._tick()
        return {
            "workspace_id": workspace_id,
            "exit_code": exit_code,
            "stdout": stdout,
            "stderr": stderr,
            "execution_mode": "guest_vsock",
        }

    def start_service(
        self,
        workspace_id: str,
        service_name: str,
        *,
        command: str,
        readiness: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Register a running service; 'sh serve.sh' also drops readiness files."""
        workspace = self._resolve_workspace(workspace_id)
        if command == "sh serve.sh":
            (workspace.root / "service-state.txt").write_text("service=ready\n", encoding="utf-8")
            (workspace.root / ".app-ready").write_text("", encoding="utf-8")
            stdout = "service started\n"
        else:
            stdout = ""
        workspace.services[service_name] = {
            "state": "running",
            "stdout": stdout,
            "readiness": readiness,
        }
        workspace.last_activity_at = self._tick()
        return {
            "workspace_id": workspace_id,
            "service_name": service_name,
            "state": "running",
            "command": command,
            "cwd": "/workspace",
            "execution_mode": "guest_vsock",
            "readiness": readiness,
        }

    def logs_service(
        self,
        workspace_id: str,
        service_name: str,
        *,
        tail_lines: int = 200,
    ) -> dict[str, Any]:
        """Return the captured stdout for a service (never truncated here)."""
        workspace = self._resolve_workspace(workspace_id)
        service = workspace.services[service_name]
        return {
            "workspace_id": workspace_id,
            "service_name": service_name,
            "state": service["state"],
            "stdout": service["stdout"],
            "stderr": "",
            "tail_lines": tail_lines,
            "truncated": False,
        }

    def stop_service(self, workspace_id: str, service_name: str) -> dict[str, Any]:
        """Mark a service stopped; the record is retained for log queries."""
        workspace = self._resolve_workspace(workspace_id)
        workspace.services[service_name]["state"] = "stopped"
        workspace.last_activity_at = self._tick()
        return {"workspace_id": workspace_id, "service_name": service_name, "state": "stopped"}

    def list_workspace_files(
        self, workspace_id: str, *, path: str = "/workspace", recursive: bool = False
    ) -> dict[str, Any]:
        """List entries under a guest path, optionally recursing."""
        workspace = self._resolve_workspace(workspace_id)
        target = self._workspace_path(workspace, path)
        entries: list[dict[str, Any]] = []
        iterable = target.rglob("*") if recursive else target.iterdir()
        for entry in iterable:
            artifact_type = "directory" if entry.is_dir() else "file"
            entries.append(
                {
                    # Paths are reported guest-style, relative to /workspace.
                    "path": f"/workspace/{entry.relative_to(workspace.root)}",
                    "artifact_type": artifact_type,
                    "size_bytes": entry.stat().st_size if entry.is_file() else 0,
                    "link_target": None,
                }
            )
        return {"workspace_id": workspace_id, "entries": entries}

    def read_workspace_file(self, workspace_id: str, path: str) -> dict[str, Any]:
        """Read a UTF-8 text file from the workspace."""
        workspace = self._resolve_workspace(workspace_id)
        target = self._workspace_path(workspace, path)
        content = target.read_text(encoding="utf-8")
        return {"workspace_id": workspace_id, "path": path, "content": content}

    def write_workspace_file(self, workspace_id: str, path: str, *, text: str) -> dict[str, Any]:
        """Write UTF-8 text to the workspace, creating parent directories."""
        workspace = self._resolve_workspace(workspace_id)
        target = self._workspace_path(workspace, path)
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(text, encoding="utf-8")
        workspace.last_activity_at = self._tick()
        return {"workspace_id": workspace_id, "path": path, "bytes_written": len(text.encode())}

    def apply_workspace_patch(self, workspace_id: str, *, patch: str) -> dict[str, Any]:
        """Apply the scenario's canonical fix.

        The patch text itself is not parsed: the fake deterministically
        rewrites message.txt from 'broken' to 'fixed' and counts the call so
        the smoke tests can assert the structured SDK patch flow was used.
        """
        workspace = self._resolve_workspace(workspace_id)
        target = workspace.root / "message.txt"
        original = target.read_text(encoding="utf-8")
        updated = original.replace("broken\n", "fixed\n")
        target.write_text(updated, encoding="utf-8")
        self.patch_apply_count += 1
        workspace.last_activity_at = self._tick()
        return {"workspace_id": workspace_id, "changed": updated != original, "patch": patch}

    def diff_workspace(self, workspace_id: str) -> dict[str, Any]:
        """Report whether the live tree has drifted from the baseline."""
        workspace = self._resolve_workspace(workspace_id)
        return {"workspace_id": workspace_id, "changed": self._diff_changed(workspace)}

    def export_workspace(
        self,
        workspace_id: str,
        path: str,
        *,
        output_path: Path,
    ) -> dict[str, Any]:
        """Copy a file or directory out of the workspace to *output_path*."""
        workspace = self._resolve_workspace(workspace_id)
        source = self._workspace_path(workspace, path)
        output_path = Path(output_path)
        output_path.parent.mkdir(parents=True, exist_ok=True)
        if source.is_dir():
            shutil.copytree(source, output_path)
            artifact_type = "directory"
        else:
            shutil.copy2(source, output_path)
            artifact_type = "file"
        return {
            "workspace_id": workspace_id,
            "workspace_path": path,
            "output_path": str(output_path),
            "artifact_type": artifact_type,
        }

    def create_snapshot(self, workspace_id: str, snapshot_name: str) -> dict[str, Any]:
        """Capture the current live tree under a named snapshot."""
        workspace = self._resolve_workspace(workspace_id)
        snapshot_root = self._workspace_dir(workspace_id) / f"snapshot-{snapshot_name}"
        self._reset_tree(snapshot_root, workspace.root)
        workspace.snapshots[snapshot_name] = snapshot_root
        return {
            "workspace_id": workspace_id,
            "snapshot": {"snapshot_name": snapshot_name, "kind": "named"},
        }

    def reset_workspace(self, workspace_id: str, *, snapshot: str = "baseline") -> dict[str, Any]:
        """Restore the live tree from a snapshot (default: the seed baseline)."""
        workspace = self._resolve_workspace(workspace_id)
        source = workspace.snapshots[snapshot]
        self._reset_tree(workspace.root, source)
        workspace.reset_count += 1
        workspace.last_activity_at = self._tick()
        return {
            "workspace_id": workspace_id,
            "reset_count": workspace.reset_count,
            "workspace_reset": {"snapshot_name": snapshot},
        }

    def open_shell(self, workspace_id: str, **_: Any) -> dict[str, Any]:
        """Open a fake shell; extra keyword options are accepted and ignored."""
        workspace = self._resolve_workspace(workspace_id)
        self._shell_counter += 1
        shell_id = f"shell-{self._shell_counter}"
        workspace.shells[shell_id] = _FakeShell()
        return {"workspace_id": workspace_id, "shell_id": shell_id, "state": "running"}

    def read_shell(
        self,
        workspace_id: str,
        shell_id: str,
        *,
        cursor: int = 0,
        plain: bool = False,
        wait_for_idle_ms: int | None = None,
    ) -> dict[str, Any]:
        """Return buffered output from *cursor* onward plus the next cursor."""
        workspace = self._resolve_workspace(workspace_id)
        shell = workspace.shells[shell_id]
        output = shell.buffer[cursor:]
        next_cursor = len(shell.buffer)
        return {
            "workspace_id": workspace_id,
            "shell_id": shell_id,
            "state": "running",
            "cursor": cursor,
            "next_cursor": next_cursor,
            "output": output,
            "plain": plain,
            "wait_for_idle_ms": wait_for_idle_ms,
            "truncated": False,
        }

    def write_shell(self, workspace_id: str, shell_id: str, *, input: str) -> dict[str, Any]:
        """Send input to a shell; only 'cat CHECKLIST.md' produces output."""
        workspace = self._resolve_workspace(workspace_id)
        shell = workspace.shells[shell_id]
        if input == "cat CHECKLIST.md":
            shell.buffer += (workspace.root / "CHECKLIST.md").read_text(encoding="utf-8")
        workspace.last_activity_at = self._tick()
        return {"workspace_id": workspace_id, "shell_id": shell_id}

    def close_shell(self, workspace_id: str, shell_id: str) -> dict[str, Any]:
        """Discard the shell record; closing twice is a silent no-op."""
        workspace = self._resolve_workspace(workspace_id)
        workspace.shells.pop(shell_id, None)
        return {"workspace_id": workspace_id, "shell_id": shell_id, "closed": True}
def test_use_case_registry_has_expected_scenarios() -> None:
    """Scenario registry, CLI choices, and recipe list stay in lockstep."""
    canonical = (
        "cold-start-validation",
        "repro-fix-loop",
        "parallel-workspaces",
        "untrusted-inspection",
        "review-eval",
    )
    assert USE_CASE_SCENARIOS == canonical
    assert USE_CASE_CHOICES == (*canonical, USE_CASE_ALL_SCENARIO)
    recipe_scenarios = tuple(recipe.scenario for recipe in WORKSPACE_USE_CASE_RECIPES)
    assert recipe_scenarios == canonical
def test_use_case_docs_and_targets_stay_aligned() -> None:
    """Every recipe doc exists and is cross-referenced by the index and Makefile."""
    root = _repo_root()
    index_text = (root / "docs" / "use-cases" / "README.md").read_text(encoding="utf-8")
    makefile_text = (root / "Makefile").read_text(encoding="utf-8")
    # The index must advertise the smoke pack as the trusted verification path.
    assert "trustworthy" in index_text
    assert "guest-backed verification path" in index_text
    for recipe in WORKSPACE_USE_CASE_RECIPES:
        doc_file = root / recipe.doc_path
        assert doc_file.is_file(), recipe.doc_path
        recipe_text = doc_file.read_text(encoding="utf-8")
        assert recipe.smoke_target in index_text
        assert Path(recipe.doc_path).name in index_text
        assert recipe.profile in recipe_text
        assert recipe.smoke_target in recipe_text
        assert f"{recipe.smoke_target}:" in makefile_text
def test_use_case_parser_exposes_all_scenarios() -> None:
    """The --scenario flag advertises every scenario plus the 'all' alias."""
    parser = build_arg_parser()
    # Locate the action backing the 'scenario' destination via argparse internals.
    scenario_action = next(
        candidate
        for candidate in parser._actions
        if getattr(candidate, "dest", None) == "scenario"
    )
    advertised = cast(tuple[Any, ...], scenario_action.choices)
    assert tuple(advertised) == USE_CASE_CHOICES
def test_run_all_use_case_scenarios_with_fake_pyro(
    monkeypatch: pytest.MonkeyPatch,
    tmp_path: Path,
) -> None:
    """Running every scenario end-to-end deletes all workspaces and patches once."""
    pyro = _FakePyro(tmp_path / "fake-pyro")
    monkeypatch.setattr(smoke_module, "Pyro", lambda: pyro)
    # Skip real readiness waits so the run stays fast.
    monkeypatch.setattr(time_module, "sleep", lambda _seconds: None)
    smoke_module.run_workspace_use_case_scenario("all")
    assert not pyro._workspaces
    assert pyro.patch_apply_count == 1
def test_run_workspace_use_case_scenario_rejects_unknown() -> None:
    """Unrecognized scenario names raise a descriptive ValueError."""
    bogus_scenario = "not-a-scenario"
    with pytest.raises(ValueError, match="unknown use-case scenario"):
        smoke_module.run_workspace_use_case_scenario(bogus_scenario)
def test_main_runs_selected_scenario(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
    """main() honors --scenario/--environment and drives the structured SDK flow."""
    pyro = _FakePyro(tmp_path / "fake-pyro-main")
    monkeypatch.setattr(smoke_module, "Pyro", lambda: pyro)
    monkeypatch.setattr(time_module, "sleep", lambda _seconds: None)
    argv = [
        "workspace_use_case_smoke",
        "--scenario",
        "repro-fix-loop",
        "--environment",
        "debian:12",
    ]
    monkeypatch.setattr("sys.argv", argv)
    smoke_module.main()
    # The scenario must clean up after itself and apply exactly one patch.
    assert not pyro._workspaces
    assert pyro.patch_apply_count == 1
def test_repro_fix_scenario_uses_structured_patch_flow(
    monkeypatch: pytest.MonkeyPatch,
    tmp_path: Path,
) -> None:
    """The repro-fix scenario goes through the SDK patch call exactly once."""
    pyro = _FakePyro(tmp_path / "fake-pyro-repro-fix")
    monkeypatch.setattr(smoke_module, "Pyro", lambda: pyro)
    monkeypatch.setattr(time_module, "sleep", lambda _seconds: None)
    smoke_module.run_workspace_use_case_scenario("repro-fix-loop")
    assert pyro.patch_apply_count == 1