pyro-mcp/tests/test_workspace_shells.py
Thales Maciel f504f0a331 Add workspace service lifecycle with typed readiness
Make persistent workspaces capable of running long-lived background processes instead of forcing everything through one-shot exec calls.

Add workspace service start/list/status/logs/stop across the CLI, Python SDK, and MCP server, with multiple named services per workspace, typed readiness probes (file, tcp, http, and command), and aggregate service counts on workspace status. Keep service state and logs outside /workspace so diff and export semantics stay workspace-scoped, and extend the guest agent plus backends to persist service records and logs across separate calls.

Update the 2.7.0 docs, examples, changelog, and roadmap milestone to reflect the shipped surface.

Validation: uv lock; UV_CACHE_DIR=.uv-cache make check; UV_CACHE_DIR=.uv-cache make dist-check; real guest-backed Firecracker smoke for workspace create, two service starts, list/status/logs, diff unaffected, stop, and delete.
2026-03-12 05:36:28 -03:00

221 lines
6.8 KiB
Python

from __future__ import annotations
import os
import subprocess
import time
from pathlib import Path
from typing import cast
import pytest
from pyro_mcp.workspace_shells import (
create_local_shell,
get_local_shell,
remove_local_shell,
shell_signal_arg_help,
shell_signal_names,
)
def _read_until(
workspace_id: str,
shell_id: str,
text: str,
*,
timeout_seconds: float = 5.0,
) -> dict[str, object]:
deadline = time.time() + timeout_seconds
payload = get_local_shell(workspace_id=workspace_id, shell_id=shell_id).read(
cursor=0,
max_chars=65536,
)
while text not in str(payload["output"]) and time.time() < deadline:
time.sleep(0.05)
payload = get_local_shell(workspace_id=workspace_id, shell_id=shell_id).read(
cursor=0,
max_chars=65536,
)
return payload
def test_workspace_shells_round_trip(tmp_path: Path) -> None:
session = create_local_shell(
workspace_id="workspace-1",
shell_id="shell-1",
cwd=tmp_path,
display_cwd="/workspace",
cols=120,
rows=30,
)
try:
assert session.summary()["state"] == "running"
write = session.write("printf 'hello\\n'", append_newline=True)
assert write["input_length"] == 16
payload = _read_until("workspace-1", "shell-1", "hello")
assert "hello" in str(payload["output"])
assert cast(int, payload["next_cursor"]) >= cast(int, payload["cursor"])
assert isinstance(payload["truncated"], bool)
session.write("sleep 60", append_newline=True)
signaled = session.send_signal("INT")
assert signaled["signal"] == "INT"
finally:
closed = session.close()
assert closed["closed"] is True
def test_workspace_shell_registry_helpers(tmp_path: Path) -> None:
session = create_local_shell(
workspace_id="workspace-2",
shell_id="shell-2",
cwd=tmp_path,
display_cwd="/workspace/subdir",
cols=80,
rows=24,
)
assert get_local_shell(workspace_id="workspace-2", shell_id="shell-2") is session
assert shell_signal_names() == ("HUP", "INT", "TERM", "KILL")
assert "HUP" in shell_signal_arg_help()
with pytest.raises(RuntimeError, match="already exists"):
create_local_shell(
workspace_id="workspace-2",
shell_id="shell-2",
cwd=tmp_path,
display_cwd="/workspace/subdir",
cols=80,
rows=24,
)
removed = remove_local_shell(workspace_id="workspace-2", shell_id="shell-2")
assert removed is session
assert remove_local_shell(workspace_id="workspace-2", shell_id="shell-2") is None
with pytest.raises(ValueError, match="does not exist"):
get_local_shell(workspace_id="workspace-2", shell_id="shell-2")
closed = session.close()
assert closed["closed"] is True
def test_workspace_shells_error_after_exit(tmp_path: Path) -> None:
session = create_local_shell(
workspace_id="workspace-3",
shell_id="shell-3",
cwd=tmp_path,
display_cwd="/workspace",
cols=120,
rows=30,
)
session.write("exit", append_newline=True)
deadline = time.time() + 5
while session.summary()["state"] != "stopped" and time.time() < deadline:
time.sleep(0.05)
assert session.summary()["state"] == "stopped"
with pytest.raises(RuntimeError, match="not running"):
session.write("pwd", append_newline=True)
with pytest.raises(RuntimeError, match="not running"):
session.send_signal("INT")
closed = session.close()
assert closed["closed"] is True
def test_workspace_shells_reject_invalid_signal(tmp_path: Path) -> None:
session = create_local_shell(
workspace_id="workspace-4",
shell_id="shell-4",
cwd=tmp_path,
display_cwd="/workspace",
cols=120,
rows=30,
)
try:
with pytest.raises(ValueError, match="unsupported shell signal"):
session.send_signal("BOGUS")
finally:
session.close()
def test_workspace_shells_init_failure_closes_ptys(
tmp_path: Path,
monkeypatch: pytest.MonkeyPatch,
) -> None:
def _boom(*args: object, **kwargs: object) -> object:
raise RuntimeError("boom")
monkeypatch.setattr(subprocess, "Popen", _boom)
with pytest.raises(RuntimeError, match="boom"):
create_local_shell(
workspace_id="workspace-5",
shell_id="shell-5",
cwd=tmp_path,
display_cwd="/workspace",
cols=120,
rows=30,
)
def test_workspace_shells_write_and_signal_runtime_errors(
tmp_path: Path,
monkeypatch: pytest.MonkeyPatch,
) -> None:
session = create_local_shell(
workspace_id="workspace-6",
shell_id="shell-6",
cwd=tmp_path,
display_cwd="/workspace",
cols=120,
rows=30,
)
try:
with session._lock: # noqa: SLF001
session._master_fd = None # noqa: SLF001
session._input_pipe = None # noqa: SLF001
with pytest.raises(RuntimeError, match="transport is unavailable"):
session.write("pwd", append_newline=True)
with session._lock: # noqa: SLF001
master_fd, slave_fd = os.pipe()
os.close(slave_fd)
session._master_fd = master_fd # noqa: SLF001
def _raise_write(fd: int, data: bytes) -> int:
del fd, data
raise OSError("broken")
monkeypatch.setattr("pyro_mcp.workspace_shells.os.write", _raise_write)
with pytest.raises(RuntimeError, match="failed to write"):
session.write("pwd", append_newline=True)
def _raise_killpg(pid: int, signum: int) -> None:
del pid, signum
raise ProcessLookupError()
monkeypatch.setattr("pyro_mcp.workspace_shells.os.killpg", _raise_killpg)
with pytest.raises(RuntimeError, match="not running"):
session.send_signal("INT")
finally:
try:
session.close()
except Exception:
pass
def test_workspace_shells_refresh_process_state_updates_exit_code(tmp_path: Path) -> None:
session = create_local_shell(
workspace_id="workspace-7",
shell_id="shell-7",
cwd=tmp_path,
display_cwd="/workspace",
cols=120,
rows=30,
)
try:
class StubProcess:
def poll(self) -> int:
return 7
session._process = StubProcess() # type: ignore[assignment] # noqa: SLF001
session._refresh_process_state() # noqa: SLF001
assert session.summary()["state"] == "stopped"
assert session.summary()["exit_code"] == 7
finally:
try:
session.close()
except Exception:
pass