Add a dedicated pyro host surface for supported chat hosts so Claude Code, Codex, and OpenCode users can connect or repair the canonical MCP setup without hand-writing raw commands or config edits. Implement the shared host helper layer and wire it through the CLI with connect, print-config, doctor, and repair, all generated from the same canonical pyro mcp serve command shape and project-source flags. Update the docs, public contract, examples, changelog, and roadmap so the helper flow becomes the primary onramp while raw host-specific commands remain as reference material. Harden the verification path that this milestone exposed: temp git repos in tests now disable commit signing, socket-based port tests skip cleanly when the sandbox forbids those primitives, and make test still uses multiple cores by default but caps xdist workers to a stable value so make check stays fast and deterministic.

Validation:
- uv lock
- UV_OFFLINE=1 UV_CACHE_DIR=.uv-cache make check
- UV_OFFLINE=1 UV_CACHE_DIR=.uv-cache make dist-check
709 lines
25 KiB
Python
709 lines
25 KiB
Python
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import subprocess
|
|
from pathlib import Path
|
|
from typing import Any, cast
|
|
|
|
import pytest
|
|
|
|
import pyro_mcp.server as server_module
|
|
from pyro_mcp.contract import (
|
|
PUBLIC_MCP_VM_RUN_PROFILE_TOOLS,
|
|
PUBLIC_MCP_WORKSPACE_CORE_PROFILE_TOOLS,
|
|
)
|
|
from pyro_mcp.server import create_server
|
|
from pyro_mcp.vm_manager import VmManager
|
|
from pyro_mcp.vm_network import TapNetworkManager
|
|
|
|
|
|
def _git(repo: Path, *args: str) -> str:
    """Run a git command inside *repo* and return its stripped stdout.

    Commit signing is force-disabled via ``-c commit.gpgsign=false`` so the
    tests stay hermetic on hosts that configure GPG signing globally.
    """
    completed = subprocess.run(  # noqa: S603
        ["git", "-c", "commit.gpgsign=false", *args],
        cwd=repo,
        capture_output=True,
        text=True,
        check=True,
    )
    return completed.stdout.strip()
|
|
|
|
|
|
def _make_repo(root: Path, *, content: str = "hello\n") -> Path:
    """Create *root* as a fresh git repo with one committed ``note.txt``.

    The file's contents are *content*; identity config is set locally so the
    commit succeeds without relying on the host's git configuration.
    """
    root.mkdir()
    _git(root, "init")
    _git(root, "config", "user.name", "Pyro Tests")
    _git(root, "config", "user.email", "pyro-tests@example.com")
    note = root / "note.txt"
    note.write_text(content, encoding="utf-8")
    _git(root, "add", "note.txt")
    _git(root, "commit", "-m", "init")
    return root
|
|
|
|
|
|
def test_create_server_registers_vm_tools(tmp_path: Path) -> None:
    """The default profile registers exactly the workspace-core contract tools."""
    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )

    async def _collect_tool_names() -> list[str]:
        server = create_server(manager=manager)
        return sorted(tool.name for tool in await server.list_tools())

    registered = tuple(asyncio.run(_collect_tool_names()))
    assert registered == tuple(sorted(PUBLIC_MCP_WORKSPACE_CORE_PROFILE_TOOLS))
|
|
|
|
|
|
def test_create_server_vm_run_profile_registers_only_vm_run(tmp_path: Path) -> None:
    """The ``vm-run`` profile narrows the tool surface to the vm_run contract tuple."""
    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )

    async def _collect_tool_names() -> list[str]:
        server = create_server(manager=manager, profile="vm-run")
        tools = await server.list_tools()
        return sorted(t.name for t in tools)

    registered = tuple(asyncio.run(_collect_tool_names()))
    assert registered == PUBLIC_MCP_VM_RUN_PROFILE_TOOLS
|
|
|
|
|
|
def test_create_server_workspace_core_profile_registers_expected_tools(tmp_path: Path) -> None:
    """Explicitly selecting ``workspace-core`` yields the same contract tool set."""
    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )

    async def _collect_tool_names() -> list[str]:
        server = create_server(manager=manager, profile="workspace-core")
        return sorted(tool.name for tool in await server.list_tools())

    expected = tuple(sorted(PUBLIC_MCP_WORKSPACE_CORE_PROFILE_TOOLS))
    assert tuple(asyncio.run(_collect_tool_names())) == expected
|
|
|
|
|
|
def test_create_server_workspace_create_description_mentions_project_source(tmp_path: Path) -> None:
    """workspace_create's description advertises the project_path fallback seed."""
    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    repo = _make_repo(tmp_path / "repo")

    async def _workspace_create_tool() -> dict[str, Any]:
        server = create_server(manager=manager, project_path=repo)
        by_name = {tool.name: tool.model_dump() for tool in await server.list_tools()}
        return by_name["workspace_create"]

    tool_payload = asyncio.run(_workspace_create_tool())
    description = cast(str, tool_payload["description"])
    # The description must both explain the fallback and name the resolved path.
    assert "If `seed_path` is omitted" in description
    assert str(repo.resolve()) in description
|
|
|
|
|
|
def test_create_server_project_path_seeds_workspace_when_seed_path_is_omitted(
    tmp_path: Path,
) -> None:
    """Omitting ``seed_path`` seeds the workspace from the configured project_path."""
    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    repo = _make_repo(tmp_path / "repo", content="project-aware\n")

    def _extract_structured(raw_result: object) -> dict[str, Any]:
        # call_tool returns a (content, structured) pair; keep only the dict half.
        if not (isinstance(raw_result, tuple) and len(raw_result) == 2):
            raise TypeError("unexpected call_tool result shape")
        structured = raw_result[1]
        if not isinstance(structured, dict):
            raise TypeError("expected structured dictionary result")
        return cast(dict[str, Any], structured)

    async def _run() -> tuple[dict[str, Any], dict[str, Any]]:
        server = create_server(manager=manager, project_path=repo)
        create_args = {
            "environment": "debian:12-base",
            "allow_host_compat": True,
        }
        created = _extract_structured(
            await server.call_tool("workspace_create", create_args)
        )
        exec_args = {
            "workspace_id": created["workspace_id"],
            "command": "cat note.txt",
        }
        executed = _extract_structured(
            await server.call_tool("workspace_exec", exec_args)
        )
        return created, executed

    created, executed = asyncio.run(_run())
    seed = created["workspace_seed"]
    assert seed["mode"] == "directory"
    assert seed["seed_path"] == str(repo.resolve())
    assert seed["origin_kind"] == "project_path"
    assert seed["origin_ref"] == str(repo.resolve())
    # The seeded file is readable inside the workspace with its repo content.
    assert executed["stdout"] == "project-aware\n"
|
|
|
|
|
|
def test_create_server_repo_url_seeds_workspace_when_seed_path_is_omitted(tmp_path: Path) -> None:
    """A ``repo_url`` source seeds from the committed tree, ignoring dirty edits."""
    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    repo = _make_repo(tmp_path / "repo", content="committed\n")
    # Dirty the working tree after the commit; the clone must not pick this up.
    (repo / "note.txt").write_text("dirty\n", encoding="utf-8")

    def _extract_structured(raw_result: object) -> dict[str, Any]:
        # call_tool returns a (content, structured) pair; keep only the dict half.
        if not (isinstance(raw_result, tuple) and len(raw_result) == 2):
            raise TypeError("unexpected call_tool result shape")
        structured = raw_result[1]
        if not isinstance(structured, dict):
            raise TypeError("expected structured dictionary result")
        return cast(dict[str, Any], structured)

    async def _run() -> tuple[dict[str, Any], dict[str, Any]]:
        server = create_server(manager=manager, repo_url=str(repo.resolve()))
        create_args = {
            "environment": "debian:12-base",
            "allow_host_compat": True,
        }
        created = _extract_structured(
            await server.call_tool("workspace_create", create_args)
        )
        exec_args = {
            "workspace_id": created["workspace_id"],
            "command": "cat note.txt",
        }
        executed = _extract_structured(
            await server.call_tool("workspace_exec", exec_args)
        )
        return created, executed

    created, executed = asyncio.run(_run())
    seed = created["workspace_seed"]
    assert seed["mode"] == "directory"
    assert seed["seed_path"] is None
    assert seed["origin_kind"] == "repo_url"
    assert seed["origin_ref"] == str(repo.resolve())
    # Only the committed content — not the dirty edit — reaches the workspace.
    assert executed["stdout"] == "committed\n"
|
|
|
|
|
|
def test_vm_run_round_trip(tmp_path: Path) -> None:
    """``vm_run`` executes a one-shot command and reports exit code plus stdout."""
    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )

    def _extract_structured(raw_result: object) -> dict[str, Any]:
        # call_tool returns a (content, structured) pair; keep only the dict half.
        if not (isinstance(raw_result, tuple) and len(raw_result) == 2):
            raise TypeError("unexpected call_tool result shape")
        structured = raw_result[1]
        if not isinstance(structured, dict):
            raise TypeError("expected structured dictionary result")
        return cast(dict[str, Any], structured)

    async def _run() -> dict[str, Any]:
        server = create_server(manager=manager)
        run_args = {
            "environment": "debian:12",
            "command": "printf 'git version 2.0\\n'",
            "ttl_seconds": 600,
            "network": False,
            "allow_host_compat": True,
        }
        return _extract_structured(await server.call_tool("vm_run", run_args))

    executed = asyncio.run(_run())
    assert int(executed["exit_code"]) == 0
    assert "git version" in str(executed["stdout"])
|
|
|
|
|
|
def test_vm_tools_status_stop_delete_and_reap(tmp_path: Path) -> None:
    """Exercise the full VM lifecycle tools: list environments, create, start,
    status, network info, stop, delete, and finally TTL-based reaping.

    Uses the mock backend so no real VMs are launched.
    """
    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    # Lower the TTL floor so the short-lived VM below is accepted.
    manager.MIN_TTL_SECONDS = 1

    def _extract_structured(raw_result: object) -> dict[str, Any]:
        # call_tool returns a (content, structured) pair; keep only the dict half.
        if not isinstance(raw_result, tuple) or len(raw_result) != 2:
            raise TypeError("unexpected call_tool result shape")
        _, structured = raw_result
        if not isinstance(structured, dict):
            raise TypeError("expected structured dictionary result")
        return cast(dict[str, Any], structured)

    async def _run() -> tuple[
        dict[str, Any],
        dict[str, Any],
        dict[str, Any],
        dict[str, Any],
        list[dict[str, object]],
        dict[str, Any],
    ]:
        # The workspace-full profile is needed so the raw vm_* tools are exposed.
        server = create_server(manager=manager, profile="workspace-full")
        # vm_list_environments wraps its list in a {"result": [...]} envelope,
        # so it needs bespoke unwrapping rather than _extract_structured.
        environments_raw = await server.call_tool("vm_list_environments", {})
        if not isinstance(environments_raw, tuple) or len(environments_raw) != 2:
            raise TypeError("unexpected environments result")
        _, environments_structured = environments_raw
        if not isinstance(environments_structured, dict):
            raise TypeError("environments tool should return a dictionary")
        raw_environments = environments_structured.get("result")
        if not isinstance(raw_environments, list):
            raise TypeError("environments tool did not contain a result list")
        # Happy-path lifecycle: create -> start -> status -> network -> stop -> delete.
        created = _extract_structured(
            await server.call_tool(
                "vm_create",
                {
                    "environment": "debian:12-base",
                    "ttl_seconds": 600,
                    "allow_host_compat": True,
                },
            )
        )
        vm_id = str(created["vm_id"])
        await server.call_tool("vm_start", {"vm_id": vm_id})
        status = _extract_structured(await server.call_tool("vm_status", {"vm_id": vm_id}))
        network = _extract_structured(await server.call_tool("vm_network_info", {"vm_id": vm_id}))
        stopped = _extract_structured(await server.call_tool("vm_stop", {"vm_id": vm_id}))
        deleted = _extract_structured(await server.call_tool("vm_delete", {"vm_id": vm_id}))

        # Create a second VM with the minimum TTL, then force-expire it so
        # vm_reap_expired has exactly one instance to collect.
        expiring = _extract_structured(
            await server.call_tool(
                "vm_create",
                {
                    "environment": "debian:12-base",
                    "ttl_seconds": 1,
                    "allow_host_compat": True,
                },
            )
        )
        expiring_id = str(expiring["vm_id"])
        # Reach into the manager's private registry to backdate the expiry
        # instead of sleeping past the TTL.
        manager._instances[expiring_id].expires_at = 0.0  # noqa: SLF001
        reaped = _extract_structured(await server.call_tool("vm_reap_expired", {}))
        return (
            status,
            network,
            stopped,
            deleted,
            cast(list[dict[str, object]], raw_environments),
            reaped,
        )

    status, network, stopped, deleted, environments, reaped = asyncio.run(_run())
    assert status["state"] == "started"
    # TapNetworkManager(enabled=False) means networking is reported as off.
    assert network["network_enabled"] is False
    assert stopped["state"] == "stopped"
    assert bool(deleted["deleted"]) is True
    assert environments[0]["name"] == "debian:12"
    assert int(reaped["count"]) == 1
|
|
|
|
|
|
def test_server_main_runs_stdio_transport(monkeypatch: pytest.MonkeyPatch) -> None:
    """``main()`` constructs a server via create_server and runs it over stdio."""
    recorded: dict[str, str] = {}

    class StubServer:
        def run(self, transport: str) -> None:
            recorded["transport"] = transport

    monkeypatch.setattr(server_module, "create_server", lambda: StubServer())
    server_module.main()
    assert recorded == {"transport": "stdio"}
|
|
|
|
|
|
def test_workspace_core_profile_round_trip(tmp_path: Path) -> None:
    """Drive a full edit cycle through the workspace-core profile:
    create (seeded from a directory) -> write -> exec -> diff -> export ->
    reset -> delete, asserting each stage's structured payload.
    """
    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    # Seed directory containing the file the workspace will start from.
    source_dir = tmp_path / "seed"
    source_dir.mkdir()
    (source_dir / "note.txt").write_text("old\n", encoding="utf-8")

    def _extract_structured(raw_result: object) -> dict[str, Any]:
        # call_tool returns a (content, structured) pair; keep only the dict half.
        if not isinstance(raw_result, tuple) or len(raw_result) != 2:
            raise TypeError("unexpected call_tool result shape")
        _, structured = raw_result
        if not isinstance(structured, dict):
            raise TypeError("expected structured dictionary result")
        return cast(dict[str, Any], structured)

    async def _run() -> tuple[dict[str, Any], ...]:
        server = create_server(manager=manager, profile="workspace-core")
        created = _extract_structured(
            await server.call_tool(
                "workspace_create",
                {
                    "environment": "debian:12-base",
                    "allow_host_compat": True,
                    "seed_path": str(source_dir),
                    "name": "chat-loop",
                    "labels": {"issue": "123"},
                },
            )
        )
        workspace_id = str(created["workspace_id"])
        # Overwrite the seeded file, then read it back via exec.
        written = _extract_structured(
            await server.call_tool(
                "workspace_file_write",
                {
                    "workspace_id": workspace_id,
                    "path": "note.txt",
                    "text": "fixed\n",
                },
            )
        )
        executed = _extract_structured(
            await server.call_tool(
                "workspace_exec",
                {
                    "workspace_id": workspace_id,
                    "command": "cat note.txt",
                },
            )
        )
        diffed = _extract_structured(
            await server.call_tool("workspace_diff", {"workspace_id": workspace_id})
        )
        # Export the modified file onto the host for inspection.
        export_path = tmp_path / "exported-note.txt"
        exported = _extract_structured(
            await server.call_tool(
                "workspace_export",
                {
                    "workspace_id": workspace_id,
                    "path": "note.txt",
                    "output_path": str(export_path),
                },
            )
        )
        reset = _extract_structured(
            await server.call_tool("workspace_reset", {"workspace_id": workspace_id})
        )
        deleted = _extract_structured(
            await server.call_tool("workspace_delete", {"workspace_id": workspace_id})
        )
        return created, written, executed, diffed, exported, reset, deleted

    created, written, executed, diffed, exported, reset, deleted = asyncio.run(_run())
    assert created["name"] == "chat-loop"
    assert created["labels"] == {"issue": "123"}
    assert written["bytes_written"] == len("fixed\n".encode("utf-8"))
    assert executed["stdout"] == "fixed\n"
    assert diffed["changed"] is True
    assert Path(str(exported["output_path"])).read_text(encoding="utf-8") == "fixed\n"
    # After reset the command history is empty again.
    assert reset["command_count"] == 0
    assert deleted["deleted"] is True
|
|
|
|
|
|
def test_workspace_tools_round_trip(tmp_path: Path) -> None:
    """End-to-end sweep of the workspace-full tool surface in one session:
    create with secrets -> list/update -> sync push -> exec with secret env ->
    file list/read/write -> patch -> diff -> snapshot -> export -> service
    lifecycle -> reset to snapshot -> snapshot delete -> logs -> delete.

    The stages are strictly ordered; later assertions depend on the state
    produced by earlier calls.
    """
    manager = VmManager(
        backend_name="mock",
        base_dir=tmp_path / "vms",
        network_manager=TapNetworkManager(enabled=False),
    )
    # Directory used to seed the workspace at creation time.
    source_dir = tmp_path / "seed"
    source_dir.mkdir()
    (source_dir / "note.txt").write_text("ok\n", encoding="utf-8")
    # File-backed secret source for the FILE_TOKEN secret.
    secret_file = tmp_path / "token.txt"
    secret_file.write_text("from-file\n", encoding="utf-8")

    def _extract_structured(raw_result: object) -> dict[str, Any]:
        # call_tool returns a (content, structured) pair; keep only the dict half.
        if not isinstance(raw_result, tuple) or len(raw_result) != 2:
            raise TypeError("unexpected call_tool result shape")
        _, structured = raw_result
        if not isinstance(structured, dict):
            raise TypeError("expected structured dictionary result")
        return cast(dict[str, Any], structured)

    async def _run() -> tuple[dict[str, Any], ...]:
        server = create_server(manager=manager, profile="workspace-full")
        # Stage 1: create a seeded workspace with both literal and file secrets.
        created = _extract_structured(
            await server.call_tool(
                "workspace_create",
                {
                    "environment": "debian:12-base",
                    "allow_host_compat": True,
                    "seed_path": str(source_dir),
                    "name": "repro-fix",
                    "labels": {"issue": "123"},
                    "secrets": [
                        {"name": "API_TOKEN", "value": "expected"},
                        {"name": "FILE_TOKEN", "file_path": str(secret_file)},
                    ],
                },
            )
        )
        workspace_id = str(created["workspace_id"])
        listed_before = _extract_structured(await server.call_tool("workspace_list", {}))
        # Stage 2: swap labels — add "owner", drop "issue".
        updated = _extract_structured(
            await server.call_tool(
                "workspace_update",
                {
                    "workspace_id": workspace_id,
                    "labels": {"owner": "codex"},
                    "clear_labels": ["issue"],
                },
            )
        )
        # Stage 3: push an extra host directory into the workspace under subdir/.
        update_dir = tmp_path / "update"
        update_dir.mkdir()
        (update_dir / "more.txt").write_text("more\n", encoding="utf-8")
        synced = _extract_structured(
            await server.call_tool(
                "workspace_sync_push",
                {
                    "workspace_id": workspace_id,
                    "source_path": str(update_dir),
                    "dest": "subdir",
                },
            )
        )
        # Stage 4: echo the secret; the server redacts it in captured output
        # (asserted below via the "[REDACTED]" stdout).
        executed = _extract_structured(
            await server.call_tool(
                "workspace_exec",
                {
                    "workspace_id": workspace_id,
                    "command": 'sh -lc \'printf "%s\\n" "$API_TOKEN"\'',
                    "secret_env": {"API_TOKEN": "API_TOKEN"},
                },
            )
        )
        # Stage 5: file operations — recursive list, read, write a new file.
        listed_files = _extract_structured(
            await server.call_tool(
                "workspace_file_list",
                {
                    "workspace_id": workspace_id,
                    "path": "/workspace",
                    "recursive": True,
                },
            )
        )
        file_read = _extract_structured(
            await server.call_tool(
                "workspace_file_read",
                {
                    "workspace_id": workspace_id,
                    "path": "note.txt",
                    "max_bytes": 4096,
                },
            )
        )
        file_written = _extract_structured(
            await server.call_tool(
                "workspace_file_write",
                {
                    "workspace_id": workspace_id,
                    "path": "src/app.py",
                    "text": "print('hello from file op')\n",
                },
            )
        )
        # Stage 6: apply a unified diff to the seeded note.txt.
        patched = _extract_structured(
            await server.call_tool(
                "workspace_patch_apply",
                {
                    "workspace_id": workspace_id,
                    "patch": (
                        "--- a/note.txt\n"
                        "+++ b/note.txt\n"
                        "@@ -1 +1 @@\n"
                        "-ok\n"
                        "+patched\n"
                    ),
                },
            )
        )
        diffed = _extract_structured(
            await server.call_tool("workspace_diff", {"workspace_id": workspace_id})
        )
        # Stage 7: snapshot the current state so we can reset to it later.
        snapshot = _extract_structured(
            await server.call_tool(
                "snapshot_create",
                {"workspace_id": workspace_id, "snapshot_name": "checkpoint"},
            )
        )
        snapshots = _extract_structured(
            await server.call_tool("snapshot_list", {"workspace_id": workspace_id})
        )
        # Stage 8: export the synced file to the host.
        export_path = tmp_path / "exported-more.txt"
        exported = _extract_structured(
            await server.call_tool(
                "workspace_export",
                {
                    "workspace_id": workspace_id,
                    "path": "subdir/more.txt",
                    "output_path": str(export_path),
                },
            )
        )
        # Stage 9: service lifecycle. The command traps TERM for clean stop,
        # writes the secret to stderr (to check log redaction), then touches
        # the ready file and idles.
        service = _extract_structured(
            await server.call_tool(
                "service_start",
                {
                    "workspace_id": workspace_id,
                    "service_name": "app",
                    "command": (
                        'sh -lc \'trap "exit 0" TERM; printf "%s\\n" "$API_TOKEN" >&2; '
                        'touch .ready; while true; do sleep 60; done\''
                    ),
                    "ready_file": ".ready",
                    "secret_env": {"API_TOKEN": "API_TOKEN"},
                },
            )
        )
        services = _extract_structured(
            await server.call_tool("service_list", {"workspace_id": workspace_id})
        )
        service_status = _extract_structured(
            await server.call_tool(
                "service_status",
                {
                    "workspace_id": workspace_id,
                    "service_name": "app",
                },
            )
        )
        service_logs = _extract_structured(
            await server.call_tool(
                "service_logs",
                {
                    "workspace_id": workspace_id,
                    "service_name": "app",
                    "all": True,
                },
            )
        )
        service_stopped = _extract_structured(
            await server.call_tool(
                "service_stop",
                {
                    "workspace_id": workspace_id,
                    "service_name": "app",
                },
            )
        )
        # Stage 10: roll back to the checkpoint, then clean up snapshot,
        # inspect logs, and delete the workspace.
        reset = _extract_structured(
            await server.call_tool(
                "workspace_reset",
                {"workspace_id": workspace_id, "snapshot": "checkpoint"},
            )
        )
        deleted_snapshot = _extract_structured(
            await server.call_tool(
                "snapshot_delete",
                {"workspace_id": workspace_id, "snapshot_name": "checkpoint"},
            )
        )
        logs = _extract_structured(
            await server.call_tool("workspace_logs", {"workspace_id": workspace_id})
        )
        deleted = _extract_structured(
            await server.call_tool("workspace_delete", {"workspace_id": workspace_id})
        )
        return (
            created,
            listed_before,
            updated,
            synced,
            executed,
            listed_files,
            file_read,
            file_written,
            patched,
            diffed,
            snapshot,
            snapshots,
            exported,
            service,
            services,
            service_status,
            service_logs,
            service_stopped,
            reset,
            deleted_snapshot,
            logs,
            deleted,
        )

    (
        created,
        listed_before,
        updated,
        synced,
        executed,
        listed_files,
        file_read,
        file_written,
        patched,
        diffed,
        snapshot,
        snapshots,
        exported,
        service,
        services,
        service_status,
        service_logs,
        service_stopped,
        reset,
        deleted_snapshot,
        logs,
        deleted,
    ) = asyncio.run(_run())
    assert created["state"] == "started"
    assert created["name"] == "repro-fix"
    assert created["labels"] == {"issue": "123"}
    assert listed_before["count"] == 1
    assert listed_before["workspaces"][0]["name"] == "repro-fix"
    assert updated["labels"] == {"owner": "codex"}
    assert created["workspace_seed"]["mode"] == "directory"
    # Secret values never appear in structured output — only name + source kind.
    assert created["secrets"] == [
        {"name": "API_TOKEN", "source_kind": "literal"},
        {"name": "FILE_TOKEN", "source_kind": "file"},
    ]
    assert synced["workspace_sync"]["destination"] == "/workspace/subdir"
    # The echoed secret value is redacted from captured stdout.
    assert executed["stdout"] == "[REDACTED]\n"
    assert any(entry["path"] == "/workspace/note.txt" for entry in listed_files["entries"])
    # file_read happened before the patch, so it still sees the seeded content.
    assert file_read["content"] == "ok\n"
    assert file_written["path"] == "/workspace/src/app.py"
    assert patched["changed"] is True
    assert diffed["changed"] is True
    assert snapshot["snapshot"]["snapshot_name"] == "checkpoint"
    # "baseline" is implicit from creation; "checkpoint" is the explicit one.
    assert [entry["snapshot_name"] for entry in snapshots["snapshots"]] == [
        "baseline",
        "checkpoint",
    ]
    assert exported["artifact_type"] == "file"
    assert Path(str(exported["output_path"])).read_text(encoding="utf-8") == "more\n"
    assert service["state"] == "running"
    assert services["count"] == 1
    assert service_status["state"] == "running"
    # The secret echoed to stderr by the service is redacted in its logs too.
    assert service_logs["stderr"].count("[REDACTED]") >= 1
    assert service_logs["tail_lines"] is None
    assert service_stopped["state"] == "stopped"
    assert reset["workspace_reset"]["snapshot_name"] == "checkpoint"
    # Reset preserves the secret manifest but clears commands and services.
    assert reset["secrets"] == created["secrets"]
    assert reset["command_count"] == 0
    assert reset["service_count"] == 0
    assert deleted_snapshot["deleted"] is True
    assert logs["count"] == 0
    assert deleted["deleted"] is True
|