pyro-mcp/tests/test_api.py
Thales Maciel c00c699a9f Make workspace-core the default MCP profile
Flip bare pyro mcp serve, create_server(), and Pyro.create_server() to default to workspace-core in 4.0.0 while keeping workspace-full as the explicit advanced opt-in surface.

Rewrite the MCP-facing docs and host-specific examples around the bare default command, update package and catalog compatibility to 4.x, and move the public-contract wording from 3.x compatibility guidance to the new stable default.

Adjust the server, API, and contract tests so bare server creation now asserts the workspace-core tool set, while explicit workspace-full coverage continues to prove shells, services, snapshots, and disk tools remain available.

Validation: uv lock; .venv/bin/pytest --no-cov tests/test_cli.py tests/test_api.py tests/test_server.py tests/test_public_contract.py; UV_CACHE_DIR=.uv-cache make check; UV_CACHE_DIR=.uv-cache make dist-check; real guest-backed smoke for bare Pyro.create_server() plus explicit profile="workspace-full".
2026-03-13 14:14:15 -03:00

1331 lines
44 KiB
Python

from __future__ import annotations
import asyncio
import time
from pathlib import Path
from typing import Any, cast
from pyro_mcp.api import Pyro
from pyro_mcp.contract import (
PUBLIC_MCP_VM_RUN_PROFILE_TOOLS,
PUBLIC_MCP_WORKSPACE_CORE_PROFILE_TOOLS,
PUBLIC_MCP_WORKSPACE_FULL_PROFILE_TOOLS,
)
from pyro_mcp.vm_manager import VmManager
from pyro_mcp.vm_network import TapNetworkManager
def test_pyro_run_in_vm_delegates_to_manager(tmp_path: Path) -> None:
pyro = Pyro(
manager=VmManager(
backend_name="mock",
base_dir=tmp_path / "vms",
network_manager=TapNetworkManager(enabled=False),
)
)
result = pyro.run_in_vm(
environment="debian:12-base",
command="printf 'ok\\n'",
vcpu_count=1,
mem_mib=512,
timeout_seconds=30,
ttl_seconds=600,
network=False,
allow_host_compat=True,
)
assert int(result["exit_code"]) == 0
assert str(result["stdout"]) == "ok\n"
def test_pyro_create_server_defaults_to_workspace_core_profile(tmp_path: Path) -> None:
pyro = Pyro(
manager=VmManager(
backend_name="mock",
base_dir=tmp_path / "vms",
network_manager=TapNetworkManager(enabled=False),
)
)
async def _run() -> tuple[list[str], dict[str, dict[str, Any]]]:
server = pyro.create_server()
tools = await server.list_tools()
tool_map = {tool.name: tool.model_dump() for tool in tools}
return sorted(tool_map), tool_map
tool_names, tool_map = asyncio.run(_run())
assert tuple(tool_names) == tuple(sorted(PUBLIC_MCP_WORKSPACE_CORE_PROFILE_TOOLS))
create_properties = tool_map["workspace_create"]["inputSchema"]["properties"]
assert "network_policy" not in create_properties
assert "secrets" not in create_properties
exec_properties = tool_map["workspace_exec"]["inputSchema"]["properties"]
assert "secret_env" not in exec_properties
assert "shell_open" not in tool_map
assert "service_start" not in tool_map
assert "snapshot_create" not in tool_map
assert "workspace_disk_export" not in tool_map
def test_pyro_create_server_workspace_full_profile_keeps_shell_read_schema(tmp_path: Path) -> None:
pyro = Pyro(
manager=VmManager(
backend_name="mock",
base_dir=tmp_path / "vms",
network_manager=TapNetworkManager(enabled=False),
)
)
async def _run() -> tuple[list[str], dict[str, dict[str, Any]]]:
server = pyro.create_server(profile="workspace-full")
tools = await server.list_tools()
tool_map = {tool.name: tool.model_dump() for tool in tools}
return sorted(tool_map), tool_map
tool_names, tool_map = asyncio.run(_run())
assert tuple(tool_names) == tuple(sorted(PUBLIC_MCP_WORKSPACE_FULL_PROFILE_TOOLS))
shell_read_properties = tool_map["shell_read"]["inputSchema"]["properties"]
assert "plain" in shell_read_properties
assert "wait_for_idle_ms" in shell_read_properties
def test_pyro_create_server_vm_run_profile_registers_only_vm_run(tmp_path: Path) -> None:
pyro = Pyro(
manager=VmManager(
backend_name="mock",
base_dir=tmp_path / "vms",
network_manager=TapNetworkManager(enabled=False),
)
)
async def _run() -> list[str]:
server = pyro.create_server(profile="vm-run")
tools = await server.list_tools()
return sorted(tool.name for tool in tools)
assert tuple(asyncio.run(_run())) == PUBLIC_MCP_VM_RUN_PROFILE_TOOLS
def test_pyro_create_server_workspace_core_profile_registers_expected_tools_and_schemas(
tmp_path: Path,
) -> None:
pyro = Pyro(
manager=VmManager(
backend_name="mock",
base_dir=tmp_path / "vms",
network_manager=TapNetworkManager(enabled=False),
)
)
async def _run() -> tuple[list[str], dict[str, dict[str, Any]]]:
server = pyro.create_server(profile="workspace-core")
tools = await server.list_tools()
tool_map = {tool.name: tool.model_dump() for tool in tools}
return sorted(tool_map), tool_map
tool_names, tool_map = asyncio.run(_run())
assert tuple(tool_names) == tuple(sorted(PUBLIC_MCP_WORKSPACE_CORE_PROFILE_TOOLS))
create_properties = tool_map["workspace_create"]["inputSchema"]["properties"]
assert "network_policy" not in create_properties
assert "secrets" not in create_properties
exec_properties = tool_map["workspace_exec"]["inputSchema"]["properties"]
assert "secret_env" not in exec_properties
assert "shell_open" not in tool_map
assert "service_start" not in tool_map
assert "snapshot_create" not in tool_map
assert "workspace_disk_export" not in tool_map
def test_pyro_vm_run_tool_executes(tmp_path: Path) -> None:
pyro = Pyro(
manager=VmManager(
backend_name="mock",
base_dir=tmp_path / "vms",
network_manager=TapNetworkManager(enabled=False),
)
)
def _extract_structured(raw_result: object) -> dict[str, Any]:
if not isinstance(raw_result, tuple) or len(raw_result) != 2:
raise TypeError("unexpected call_tool result shape")
_, structured = raw_result
if not isinstance(structured, dict):
raise TypeError("expected structured dictionary result")
return cast(dict[str, Any], structured)
async def _run() -> dict[str, Any]:
server = pyro.create_server()
return _extract_structured(
await server.call_tool(
"vm_run",
{
"environment": "debian:12-base",
"command": "printf 'ok\\n'",
"network": False,
"allow_host_compat": True,
},
)
)
result = asyncio.run(_run())
assert int(result["exit_code"]) == 0
def test_pyro_create_vm_defaults_sizing_and_host_compat(tmp_path: Path) -> None:
pyro = Pyro(
manager=VmManager(
backend_name="mock",
base_dir=tmp_path / "vms",
network_manager=TapNetworkManager(enabled=False),
)
)
created = pyro.create_vm(
environment="debian:12-base",
allow_host_compat=True,
)
assert created["vcpu_count"] == 1
assert created["mem_mib"] == 1024
assert created["allow_host_compat"] is True
def test_pyro_workspace_network_policy_and_published_ports_delegate() -> None:
calls: list[tuple[str, dict[str, Any]]] = []
class StubManager:
def create_workspace(self, **kwargs: Any) -> dict[str, Any]:
calls.append(("create_workspace", kwargs))
return {"workspace_id": "workspace-123"}
def list_workspaces(self) -> dict[str, Any]:
calls.append(("list_workspaces", {}))
return {"count": 1, "workspaces": [{"workspace_id": "workspace-123"}]}
def update_workspace(self, workspace_id: str, **kwargs: Any) -> dict[str, Any]:
calls.append(("update_workspace", {"workspace_id": workspace_id, **kwargs}))
return {"workspace_id": workspace_id, "name": "repro-fix", "labels": {"owner": "codex"}}
def start_service(
self,
workspace_id: str,
service_name: str,
**kwargs: Any,
) -> dict[str, Any]:
calls.append(
(
"start_service",
{
"workspace_id": workspace_id,
"service_name": service_name,
**kwargs,
},
)
)
return {"workspace_id": workspace_id, "service_name": service_name, "state": "running"}
pyro = Pyro(manager=cast(Any, StubManager()))
pyro.create_workspace(
environment="debian:12",
network_policy="egress+published-ports",
name="repro-fix",
labels={"issue": "123"},
)
pyro.list_workspaces()
pyro.update_workspace(
"workspace-123",
name="repro-fix",
labels={"owner": "codex"},
clear_labels=["issue"],
)
pyro.start_service(
"workspace-123",
"web",
command="python3 -m http.server 8080",
published_ports=[{"guest_port": 8080, "host_port": 18080}],
)
assert calls[0] == (
"create_workspace",
{
"environment": "debian:12",
"vcpu_count": 1,
"mem_mib": 1024,
"ttl_seconds": 600,
"network_policy": "egress+published-ports",
"allow_host_compat": False,
"seed_path": None,
"secrets": None,
"name": "repro-fix",
"labels": {"issue": "123"},
},
)
assert calls[1] == (
"list_workspaces",
{},
)
assert calls[2] == (
"update_workspace",
{
"workspace_id": "workspace-123",
"name": "repro-fix",
"clear_name": False,
"labels": {"owner": "codex"},
"clear_labels": ["issue"],
},
)
assert calls[3] == (
"start_service",
{
"workspace_id": "workspace-123",
"service_name": "web",
"command": "python3 -m http.server 8080",
"cwd": "/workspace",
"readiness": None,
"ready_timeout_seconds": 30,
"ready_interval_ms": 500,
"secret_env": None,
"published_ports": [{"guest_port": 8080, "host_port": 18080}],
},
)
def test_pyro_workspace_methods_delegate_to_manager(tmp_path: Path) -> None:
pyro = Pyro(
manager=VmManager(
backend_name="mock",
base_dir=tmp_path / "vms",
network_manager=TapNetworkManager(enabled=False),
)
)
source_dir = tmp_path / "seed"
source_dir.mkdir()
(source_dir / "note.txt").write_text("ok\n", encoding="utf-8")
secret_file = tmp_path / "token.txt"
secret_file.write_text("from-file\n", encoding="utf-8")
created = pyro.create_workspace(
environment="debian:12-base",
allow_host_compat=True,
seed_path=source_dir,
name="repro-fix",
labels={"issue": "123"},
secrets=[
{"name": "API_TOKEN", "value": "expected"},
{"name": "FILE_TOKEN", "file_path": str(secret_file)},
],
)
workspace_id = str(created["workspace_id"])
listed_before = pyro.list_workspaces()
updated_metadata = pyro.update_workspace(
workspace_id,
labels={"owner": "codex"},
clear_labels=["issue"],
)
updated_dir = tmp_path / "updated"
updated_dir.mkdir()
(updated_dir / "more.txt").write_text("more\n", encoding="utf-8")
synced = pyro.push_workspace_sync(workspace_id, updated_dir, dest="subdir")
executed = pyro.exec_workspace(
workspace_id,
command='sh -lc \'printf "%s\\n" "$API_TOKEN"\'',
secret_env={"API_TOKEN": "API_TOKEN"},
)
listed_files = pyro.list_workspace_files(workspace_id, path="/workspace", recursive=True)
file_read = pyro.read_workspace_file(workspace_id, "note.txt")
file_write = pyro.write_workspace_file(
workspace_id,
"src/app.py",
text="print('hello from file op')\n",
)
patch_result = pyro.apply_workspace_patch(
workspace_id,
patch=(
"--- a/note.txt\n"
"+++ b/note.txt\n"
"@@ -1 +1 @@\n"
"-ok\n"
"+patched\n"
),
)
diff_payload = pyro.diff_workspace(workspace_id)
snapshot = pyro.create_snapshot(workspace_id, "checkpoint")
snapshots = pyro.list_snapshots(workspace_id)
export_path = tmp_path / "exported-note.txt"
exported = pyro.export_workspace(workspace_id, "note.txt", output_path=export_path)
shell = pyro.open_shell(
workspace_id,
secret_env={"API_TOKEN": "API_TOKEN"},
)
shell_id = str(shell["shell_id"])
pyro.write_shell(workspace_id, shell_id, input='printf "%s\\n" "$API_TOKEN"')
shell_output: dict[str, Any] = {}
deadline = time.time() + 5
while time.time() < deadline:
shell_output = pyro.read_shell(workspace_id, shell_id, cursor=0, max_chars=65536)
if "[REDACTED]" in str(shell_output.get("output", "")):
break
time.sleep(0.05)
shell_closed = pyro.close_shell(workspace_id, shell_id)
service = pyro.start_service(
workspace_id,
"app",
command=(
'sh -lc \'trap "exit 0" TERM; printf "%s\\n" "$API_TOKEN" >&2; '
'touch .ready; while true; do sleep 60; done\''
),
readiness={"type": "file", "path": ".ready"},
secret_env={"API_TOKEN": "API_TOKEN"},
)
services = pyro.list_services(workspace_id)
service_status = pyro.status_service(workspace_id, "app")
service_logs = pyro.logs_service(workspace_id, "app", all=True)
reset = pyro.reset_workspace(workspace_id, snapshot="checkpoint")
deleted_snapshot = pyro.delete_snapshot(workspace_id, "checkpoint")
status = pyro.status_workspace(workspace_id)
logs = pyro.logs_workspace(workspace_id)
deleted = pyro.delete_workspace(workspace_id)
assert created["secrets"] == [
{"name": "API_TOKEN", "source_kind": "literal"},
{"name": "FILE_TOKEN", "source_kind": "file"},
]
assert created["name"] == "repro-fix"
assert created["labels"] == {"issue": "123"}
assert listed_before["count"] == 1
assert listed_before["workspaces"][0]["name"] == "repro-fix"
assert updated_metadata["labels"] == {"owner": "codex"}
assert executed["stdout"] == "[REDACTED]\n"
assert any(entry["path"] == "/workspace/note.txt" for entry in listed_files["entries"])
assert file_read["content"] == "ok\n"
assert file_write["path"] == "/workspace/src/app.py"
assert file_write["bytes_written"] == len("print('hello from file op')\n".encode("utf-8"))
assert patch_result["changed"] is True
assert patch_result["entries"] == [{"path": "/workspace/note.txt", "status": "modified"}]
assert created["workspace_seed"]["mode"] == "directory"
assert synced["workspace_sync"]["destination"] == "/workspace/subdir"
assert diff_payload["changed"] is True
assert snapshot["snapshot"]["snapshot_name"] == "checkpoint"
assert snapshots["count"] == 2
assert exported["output_path"] == str(export_path)
assert export_path.read_text(encoding="utf-8") == "patched\n"
assert shell_output["output"].count("[REDACTED]") >= 1
assert shell_closed["closed"] is True
assert service["state"] == "running"
assert services["count"] == 1
assert service_status["state"] == "running"
assert service_logs["stderr"].count("[REDACTED]") >= 1
assert service_logs["tail_lines"] is None
assert reset["workspace_reset"]["snapshot_name"] == "checkpoint"
assert reset["secrets"] == created["secrets"]
assert deleted_snapshot["deleted"] is True
assert status["command_count"] == 0
assert status["service_count"] == 0
assert logs["count"] == 0
assert deleted["deleted"] is True
def test_pyro_workspace_disk_methods_delegate_to_manager() -> None:
calls: list[tuple[str, dict[str, Any]]] = []
class StubManager:
def stop_workspace(self, workspace_id: str) -> dict[str, Any]:
calls.append(("stop_workspace", {"workspace_id": workspace_id}))
return {"workspace_id": workspace_id, "state": "stopped"}
def start_workspace(self, workspace_id: str) -> dict[str, Any]:
calls.append(("start_workspace", {"workspace_id": workspace_id}))
return {"workspace_id": workspace_id, "state": "started"}
def export_workspace_disk(self, workspace_id: str, *, output_path: Path) -> dict[str, Any]:
calls.append(
(
"export_workspace_disk",
{"workspace_id": workspace_id, "output_path": str(output_path)},
)
)
return {"workspace_id": workspace_id, "output_path": str(output_path)}
def list_workspace_disk(
self,
workspace_id: str,
*,
path: str = "/workspace",
recursive: bool = False,
) -> dict[str, Any]:
calls.append(
(
"list_workspace_disk",
{
"workspace_id": workspace_id,
"path": path,
"recursive": recursive,
},
)
)
return {"workspace_id": workspace_id, "entries": []}
def read_workspace_disk(
self,
workspace_id: str,
*,
path: str,
max_bytes: int = 65536,
) -> dict[str, Any]:
calls.append(
(
"read_workspace_disk",
{
"workspace_id": workspace_id,
"path": path,
"max_bytes": max_bytes,
},
)
)
return {"workspace_id": workspace_id, "content": ""}
pyro = Pyro(manager=cast(Any, StubManager()))
stopped = pyro.stop_workspace("workspace-123")
started = pyro.start_workspace("workspace-123")
exported = pyro.export_workspace_disk("workspace-123", output_path=Path("/tmp/workspace.ext4"))
listed = pyro.list_workspace_disk("workspace-123", path="/workspace/src", recursive=True)
read = pyro.read_workspace_disk("workspace-123", "note.txt", max_bytes=4096)
assert stopped["state"] == "stopped"
assert started["state"] == "started"
assert exported["output_path"] == "/tmp/workspace.ext4"
assert listed["entries"] == []
assert read["content"] == ""
assert calls == [
("stop_workspace", {"workspace_id": "workspace-123"}),
("start_workspace", {"workspace_id": "workspace-123"}),
(
"export_workspace_disk",
{
"workspace_id": "workspace-123",
"output_path": "/tmp/workspace.ext4",
},
),
(
"list_workspace_disk",
{
"workspace_id": "workspace-123",
"path": "/workspace/src",
"recursive": True,
},
),
(
"read_workspace_disk",
{
"workspace_id": "workspace-123",
"path": "note.txt",
"max_bytes": 4096,
},
),
]
def test_pyro_create_server_workspace_disk_tools_delegate() -> None:
calls: list[tuple[str, dict[str, Any]]] = []
class StubManager:
def stop_workspace(self, workspace_id: str) -> dict[str, Any]:
calls.append(("stop_workspace", {"workspace_id": workspace_id}))
return {"workspace_id": workspace_id, "state": "stopped"}
def start_workspace(self, workspace_id: str) -> dict[str, Any]:
calls.append(("start_workspace", {"workspace_id": workspace_id}))
return {"workspace_id": workspace_id, "state": "started"}
def export_workspace_disk(self, workspace_id: str, *, output_path: str) -> dict[str, Any]:
calls.append(
(
"export_workspace_disk",
{"workspace_id": workspace_id, "output_path": output_path},
)
)
return {"workspace_id": workspace_id, "output_path": output_path}
def list_workspace_disk(
self,
workspace_id: str,
*,
path: str = "/workspace",
recursive: bool = False,
) -> dict[str, Any]:
calls.append(
(
"list_workspace_disk",
{
"workspace_id": workspace_id,
"path": path,
"recursive": recursive,
},
)
)
return {"workspace_id": workspace_id, "entries": []}
def read_workspace_disk(
self,
workspace_id: str,
*,
path: str,
max_bytes: int = 65536,
) -> dict[str, Any]:
calls.append(
(
"read_workspace_disk",
{
"workspace_id": workspace_id,
"path": path,
"max_bytes": max_bytes,
},
)
)
return {"workspace_id": workspace_id, "content": ""}
pyro = Pyro(manager=cast(Any, StubManager()))
def _extract_structured(raw_result: object) -> dict[str, Any]:
if not isinstance(raw_result, tuple) or len(raw_result) != 2:
raise TypeError("unexpected call_tool result shape")
_, structured = raw_result
if not isinstance(structured, dict):
raise TypeError("expected structured dictionary result")
return cast(dict[str, Any], structured)
async def _run() -> tuple[dict[str, Any], ...]:
server = pyro.create_server(profile="workspace-full")
stopped = _extract_structured(
await server.call_tool("workspace_stop", {"workspace_id": "workspace-123"})
)
started = _extract_structured(
await server.call_tool("workspace_start", {"workspace_id": "workspace-123"})
)
exported = _extract_structured(
await server.call_tool(
"workspace_disk_export",
{
"workspace_id": "workspace-123",
"output_path": "/tmp/workspace.ext4",
},
)
)
listed = _extract_structured(
await server.call_tool(
"workspace_disk_list",
{
"workspace_id": "workspace-123",
"path": "/workspace/src",
"recursive": True,
},
)
)
read = _extract_structured(
await server.call_tool(
"workspace_disk_read",
{
"workspace_id": "workspace-123",
"path": "note.txt",
"max_bytes": 4096,
},
)
)
return stopped, started, exported, listed, read
stopped, started, exported, listed, read = asyncio.run(_run())
assert stopped["state"] == "stopped"
assert started["state"] == "started"
assert exported["output_path"] == "/tmp/workspace.ext4"
assert listed["entries"] == []
assert read["content"] == ""
assert calls == [
("stop_workspace", {"workspace_id": "workspace-123"}),
("start_workspace", {"workspace_id": "workspace-123"}),
(
"export_workspace_disk",
{
"workspace_id": "workspace-123",
"output_path": "/tmp/workspace.ext4",
},
),
(
"list_workspace_disk",
{
"workspace_id": "workspace-123",
"path": "/workspace/src",
"recursive": True,
},
),
(
"read_workspace_disk",
{
"workspace_id": "workspace-123",
"path": "note.txt",
"max_bytes": 4096,
},
),
]
def test_pyro_workspace_file_methods_delegate_to_manager() -> None:
calls: list[tuple[str, dict[str, Any]]] = []
class StubManager:
def list_workspace_files(
self,
workspace_id: str,
*,
path: str = "/workspace",
recursive: bool = False,
) -> dict[str, Any]:
calls.append(
(
"list_workspace_files",
{
"workspace_id": workspace_id,
"path": path,
"recursive": recursive,
},
)
)
return {"workspace_id": workspace_id, "entries": []}
def read_workspace_file(
self,
workspace_id: str,
path: str,
*,
max_bytes: int = 65536,
) -> dict[str, Any]:
calls.append(
(
"read_workspace_file",
{
"workspace_id": workspace_id,
"path": path,
"max_bytes": max_bytes,
},
)
)
return {"workspace_id": workspace_id, "content": "hello\n"}
def write_workspace_file(
self,
workspace_id: str,
path: str,
*,
text: str,
) -> dict[str, Any]:
calls.append(
(
"write_workspace_file",
{
"workspace_id": workspace_id,
"path": path,
"text": text,
},
)
)
return {"workspace_id": workspace_id, "bytes_written": len(text.encode("utf-8"))}
def apply_workspace_patch(
self,
workspace_id: str,
*,
patch: str,
) -> dict[str, Any]:
calls.append(
(
"apply_workspace_patch",
{
"workspace_id": workspace_id,
"patch": patch,
},
)
)
return {"workspace_id": workspace_id, "changed": True}
pyro = Pyro(manager=cast(Any, StubManager()))
listed = pyro.list_workspace_files("workspace-123", path="/workspace/src", recursive=True)
read = pyro.read_workspace_file("workspace-123", "note.txt", max_bytes=4096)
written = pyro.write_workspace_file("workspace-123", "src/app.py", text="print('hi')\n")
patched = pyro.apply_workspace_patch(
"workspace-123",
patch="--- a/note.txt\n+++ b/note.txt\n@@ -1 +1 @@\n-old\n+new\n",
)
assert listed["entries"] == []
assert read["content"] == "hello\n"
assert written["bytes_written"] == len("print('hi')\n".encode("utf-8"))
assert patched["changed"] is True
assert calls == [
(
"list_workspace_files",
{
"workspace_id": "workspace-123",
"path": "/workspace/src",
"recursive": True,
},
),
(
"read_workspace_file",
{
"workspace_id": "workspace-123",
"path": "note.txt",
"max_bytes": 4096,
},
),
(
"write_workspace_file",
{
"workspace_id": "workspace-123",
"path": "src/app.py",
"text": "print('hi')\n",
},
),
(
"apply_workspace_patch",
{
"workspace_id": "workspace-123",
"patch": "--- a/note.txt\n+++ b/note.txt\n@@ -1 +1 @@\n-old\n+new\n",
},
),
]
def test_pyro_create_server_workspace_file_tools_delegate() -> None:
calls: list[tuple[str, dict[str, Any]]] = []
class StubManager:
def list_workspace_files(
self,
workspace_id: str,
*,
path: str = "/workspace",
recursive: bool = False,
) -> dict[str, Any]:
calls.append(
(
"list_workspace_files",
{
"workspace_id": workspace_id,
"path": path,
"recursive": recursive,
},
)
)
return {"workspace_id": workspace_id, "entries": []}
def read_workspace_file(
self,
workspace_id: str,
path: str,
*,
max_bytes: int = 65536,
) -> dict[str, Any]:
calls.append(
(
"read_workspace_file",
{
"workspace_id": workspace_id,
"path": path,
"max_bytes": max_bytes,
},
)
)
return {"workspace_id": workspace_id, "content": "hello\n"}
def write_workspace_file(
self,
workspace_id: str,
path: str,
*,
text: str,
) -> dict[str, Any]:
calls.append(
(
"write_workspace_file",
{
"workspace_id": workspace_id,
"path": path,
"text": text,
},
)
)
return {"workspace_id": workspace_id, "bytes_written": len(text.encode("utf-8"))}
def apply_workspace_patch(
self,
workspace_id: str,
*,
patch: str,
) -> dict[str, Any]:
calls.append(
(
"apply_workspace_patch",
{
"workspace_id": workspace_id,
"patch": patch,
},
)
)
return {"workspace_id": workspace_id, "changed": True}
pyro = Pyro(manager=cast(Any, StubManager()))
def _extract_structured(raw_result: object) -> dict[str, Any]:
if not isinstance(raw_result, tuple) or len(raw_result) != 2:
raise TypeError("unexpected call_tool result shape")
_, structured = raw_result
if not isinstance(structured, dict):
raise TypeError("expected structured dictionary result")
return cast(dict[str, Any], structured)
async def _run() -> tuple[dict[str, Any], ...]:
server = pyro.create_server()
listed = _extract_structured(
await server.call_tool(
"workspace_file_list",
{
"workspace_id": "workspace-123",
"path": "/workspace/src",
"recursive": True,
},
)
)
read = _extract_structured(
await server.call_tool(
"workspace_file_read",
{
"workspace_id": "workspace-123",
"path": "note.txt",
"max_bytes": 4096,
},
)
)
written = _extract_structured(
await server.call_tool(
"workspace_file_write",
{
"workspace_id": "workspace-123",
"path": "src/app.py",
"text": "print('hi')\n",
},
)
)
patched = _extract_structured(
await server.call_tool(
"workspace_patch_apply",
{
"workspace_id": "workspace-123",
"patch": "--- a/note.txt\n+++ b/note.txt\n@@ -1 +1 @@\n-old\n+new\n",
},
)
)
return listed, read, written, patched
listed, read, written, patched = asyncio.run(_run())
assert listed["entries"] == []
assert read["content"] == "hello\n"
assert written["bytes_written"] == len("print('hi')\n".encode("utf-8"))
assert patched["changed"] is True
assert calls == [
(
"list_workspace_files",
{
"workspace_id": "workspace-123",
"path": "/workspace/src",
"recursive": True,
},
),
(
"read_workspace_file",
{
"workspace_id": "workspace-123",
"path": "note.txt",
"max_bytes": 4096,
},
),
(
"write_workspace_file",
{
"workspace_id": "workspace-123",
"path": "src/app.py",
"text": "print('hi')\n",
},
),
(
"apply_workspace_patch",
{
"workspace_id": "workspace-123",
"patch": "--- a/note.txt\n+++ b/note.txt\n@@ -1 +1 @@\n-old\n+new\n",
},
),
]
def test_pyro_create_server_workspace_status_shell_and_service_delegate() -> None:
calls: list[tuple[str, dict[str, Any]]] = []
class StubManager:
def status_workspace(self, workspace_id: str) -> dict[str, Any]:
calls.append(("status_workspace", {"workspace_id": workspace_id}))
return {"workspace_id": workspace_id, "state": "started"}
def logs_workspace(self, workspace_id: str) -> dict[str, Any]:
calls.append(("logs_workspace", {"workspace_id": workspace_id}))
return {"workspace_id": workspace_id, "count": 0, "entries": []}
def open_shell(
self,
workspace_id: str,
*,
cwd: str = "/workspace",
cols: int = 120,
rows: int = 30,
secret_env: dict[str, str] | None = None,
) -> dict[str, Any]:
calls.append(
(
"open_shell",
{
"workspace_id": workspace_id,
"cwd": cwd,
"cols": cols,
"rows": rows,
"secret_env": secret_env,
},
)
)
return {"workspace_id": workspace_id, "shell_id": "shell-1", "state": "running"}
def read_shell(
self,
workspace_id: str,
shell_id: str,
*,
cursor: int = 0,
max_chars: int = 65536,
plain: bool = False,
wait_for_idle_ms: int | None = None,
) -> dict[str, Any]:
calls.append(
(
"read_shell",
{
"workspace_id": workspace_id,
"shell_id": shell_id,
"cursor": cursor,
"max_chars": max_chars,
"plain": plain,
"wait_for_idle_ms": wait_for_idle_ms,
},
)
)
return {"workspace_id": workspace_id, "shell_id": shell_id, "output": ""}
def write_shell(
self,
workspace_id: str,
shell_id: str,
*,
input_text: str,
append_newline: bool = True,
) -> dict[str, Any]:
calls.append(
(
"write_shell",
{
"workspace_id": workspace_id,
"shell_id": shell_id,
"input_text": input_text,
"append_newline": append_newline,
},
)
)
return {
"workspace_id": workspace_id,
"shell_id": shell_id,
"input_length": len(input_text),
}
def signal_shell(
self,
workspace_id: str,
shell_id: str,
*,
signal_name: str = "INT",
) -> dict[str, Any]:
calls.append(
(
"signal_shell",
{
"workspace_id": workspace_id,
"shell_id": shell_id,
"signal_name": signal_name,
},
)
)
return {"workspace_id": workspace_id, "shell_id": shell_id, "signal": signal_name}
def close_shell(self, workspace_id: str, shell_id: str) -> dict[str, Any]:
calls.append(
("close_shell", {"workspace_id": workspace_id, "shell_id": shell_id})
)
return {"workspace_id": workspace_id, "shell_id": shell_id, "closed": True}
def start_service(
self,
workspace_id: str,
service_name: str,
**kwargs: Any,
) -> dict[str, Any]:
calls.append(
(
"start_service",
{
"workspace_id": workspace_id,
"service_name": service_name,
**kwargs,
},
)
)
return {"workspace_id": workspace_id, "service_name": service_name, "state": "running"}
pyro = Pyro(manager=cast(Any, StubManager()))
def _extract_structured(raw_result: object) -> dict[str, Any]:
if not isinstance(raw_result, tuple) or len(raw_result) != 2:
raise TypeError("unexpected call_tool result shape")
_, structured = raw_result
if not isinstance(structured, dict):
raise TypeError("expected structured dictionary result")
return cast(dict[str, Any], structured)
async def _run() -> tuple[dict[str, Any], ...]:
server = pyro.create_server(profile="workspace-full")
status = _extract_structured(
await server.call_tool("workspace_status", {"workspace_id": "workspace-123"})
)
logs = _extract_structured(
await server.call_tool("workspace_logs", {"workspace_id": "workspace-123"})
)
opened = _extract_structured(
await server.call_tool(
"shell_open",
{
"workspace_id": "workspace-123",
"cwd": "/workspace/src",
"cols": 100,
"rows": 20,
"secret_env": {"TOKEN": "API_TOKEN"},
},
)
)
read = _extract_structured(
await server.call_tool(
"shell_read",
{
"workspace_id": "workspace-123",
"shell_id": "shell-1",
"cursor": 5,
"max_chars": 1024,
"plain": True,
"wait_for_idle_ms": 300,
},
)
)
wrote = _extract_structured(
await server.call_tool(
"shell_write",
{
"workspace_id": "workspace-123",
"shell_id": "shell-1",
"input": "pwd",
"append_newline": False,
},
)
)
signaled = _extract_structured(
await server.call_tool(
"shell_signal",
{
"workspace_id": "workspace-123",
"shell_id": "shell-1",
"signal_name": "TERM",
},
)
)
closed = _extract_structured(
await server.call_tool(
"shell_close",
{"workspace_id": "workspace-123", "shell_id": "shell-1"},
)
)
file_service = _extract_structured(
await server.call_tool(
"service_start",
{
"workspace_id": "workspace-123",
"service_name": "file",
"command": "run-file",
"ready_file": ".ready",
},
)
)
tcp_service = _extract_structured(
await server.call_tool(
"service_start",
{
"workspace_id": "workspace-123",
"service_name": "tcp",
"command": "run-tcp",
"ready_tcp": "127.0.0.1:8080",
},
)
)
http_service = _extract_structured(
await server.call_tool(
"service_start",
{
"workspace_id": "workspace-123",
"service_name": "http",
"command": "run-http",
"ready_http": "http://127.0.0.1:8080/",
},
)
)
command_service = _extract_structured(
await server.call_tool(
"service_start",
{
"workspace_id": "workspace-123",
"service_name": "command",
"command": "run-command",
"ready_command": "test -f .ready",
},
)
)
return (
status,
logs,
opened,
read,
wrote,
signaled,
closed,
file_service,
tcp_service,
http_service,
command_service,
)
results = asyncio.run(_run())
assert results[0]["state"] == "started"
assert results[1]["count"] == 0
assert results[2]["shell_id"] == "shell-1"
assert results[6]["closed"] is True
assert results[7]["state"] == "running"
assert results[10]["state"] == "running"
assert calls == [
("status_workspace", {"workspace_id": "workspace-123"}),
("logs_workspace", {"workspace_id": "workspace-123"}),
(
"open_shell",
{
"workspace_id": "workspace-123",
"cwd": "/workspace/src",
"cols": 100,
"rows": 20,
"secret_env": {"TOKEN": "API_TOKEN"},
},
),
(
"read_shell",
{
"workspace_id": "workspace-123",
"shell_id": "shell-1",
"cursor": 5,
"max_chars": 1024,
"plain": True,
"wait_for_idle_ms": 300,
},
),
(
"write_shell",
{
"workspace_id": "workspace-123",
"shell_id": "shell-1",
"input_text": "pwd",
"append_newline": False,
},
),
(
"signal_shell",
{
"workspace_id": "workspace-123",
"shell_id": "shell-1",
"signal_name": "TERM",
},
),
("close_shell", {"workspace_id": "workspace-123", "shell_id": "shell-1"}),
(
"start_service",
{
"workspace_id": "workspace-123",
"service_name": "file",
"command": "run-file",
"cwd": "/workspace",
"readiness": {"type": "file", "path": ".ready"},
"ready_timeout_seconds": 30,
"ready_interval_ms": 500,
"secret_env": None,
"published_ports": None,
},
),
(
"start_service",
{
"workspace_id": "workspace-123",
"service_name": "tcp",
"command": "run-tcp",
"cwd": "/workspace",
"readiness": {"type": "tcp", "address": "127.0.0.1:8080"},
"ready_timeout_seconds": 30,
"ready_interval_ms": 500,
"secret_env": None,
"published_ports": None,
},
),
(
"start_service",
{
"workspace_id": "workspace-123",
"service_name": "http",
"command": "run-http",
"cwd": "/workspace",
"readiness": {"type": "http", "url": "http://127.0.0.1:8080/"},
"ready_timeout_seconds": 30,
"ready_interval_ms": 500,
"secret_env": None,
"published_ports": None,
},
),
(
"start_service",
{
"workspace_id": "workspace-123",
"service_name": "command",
"command": "run-command",
"cwd": "/workspace",
"readiness": {"type": "command", "command": "test -f .ready"},
"ready_timeout_seconds": 30,
"ready_interval_ms": 500,
"secret_env": None,
"published_ports": None,
},
),
]