from __future__ import annotations import os import subprocess import time from pathlib import Path from typing import cast import pytest from pyro_mcp.workspace_shells import ( create_local_shell, get_local_shell, remove_local_shell, shell_signal_arg_help, shell_signal_names, ) def _read_until( workspace_id: str, shell_id: str, text: str, *, timeout_seconds: float = 5.0, ) -> dict[str, object]: deadline = time.time() + timeout_seconds payload = get_local_shell(workspace_id=workspace_id, shell_id=shell_id).read( cursor=0, max_chars=65536, ) while text not in str(payload["output"]) and time.time() < deadline: time.sleep(0.05) payload = get_local_shell(workspace_id=workspace_id, shell_id=shell_id).read( cursor=0, max_chars=65536, ) return payload def test_workspace_shells_round_trip(tmp_path: Path) -> None: session = create_local_shell( workspace_id="workspace-1", shell_id="shell-1", cwd=tmp_path, display_cwd="/workspace", cols=120, rows=30, ) try: assert session.summary()["state"] == "running" write = session.write("printf 'hello\\n'", append_newline=True) assert write["input_length"] == 16 payload = _read_until("workspace-1", "shell-1", "hello") assert "hello" in str(payload["output"]) assert cast(int, payload["next_cursor"]) >= cast(int, payload["cursor"]) assert isinstance(payload["truncated"], bool) session.write("sleep 60", append_newline=True) signaled = session.send_signal("INT") assert signaled["signal"] == "INT" finally: closed = session.close() assert closed["closed"] is True def test_workspace_shell_registry_helpers(tmp_path: Path) -> None: session = create_local_shell( workspace_id="workspace-2", shell_id="shell-2", cwd=tmp_path, display_cwd="/workspace/subdir", cols=80, rows=24, ) assert get_local_shell(workspace_id="workspace-2", shell_id="shell-2") is session assert shell_signal_names() == ("HUP", "INT", "TERM", "KILL") assert "HUP" in shell_signal_arg_help() with pytest.raises(RuntimeError, match="already exists"): create_local_shell( workspace_id="workspace-2", shell_id="shell-2", cwd=tmp_path, display_cwd="/workspace/subdir", cols=80, rows=24, ) removed = remove_local_shell(workspace_id="workspace-2", shell_id="shell-2") assert removed is session assert remove_local_shell(workspace_id="workspace-2", shell_id="shell-2") is None with pytest.raises(ValueError, match="does not exist"): get_local_shell(workspace_id="workspace-2", shell_id="shell-2") closed = session.close() assert closed["closed"] is True def test_workspace_shells_error_after_exit(tmp_path: Path) -> None: session = create_local_shell( workspace_id="workspace-3", shell_id="shell-3", cwd=tmp_path, display_cwd="/workspace", cols=120, rows=30, ) session.write("exit", append_newline=True) deadline = time.time() + 5 while session.summary()["state"] != "stopped" and time.time() < deadline: time.sleep(0.05) assert session.summary()["state"] == "stopped" with pytest.raises(RuntimeError, match="not running"): session.write("pwd", append_newline=True) with pytest.raises(RuntimeError, match="not running"): session.send_signal("INT") closed = session.close() assert closed["closed"] is True def test_workspace_shells_reject_invalid_signal(tmp_path: Path) -> None: session = create_local_shell( workspace_id="workspace-4", shell_id="shell-4", cwd=tmp_path, display_cwd="/workspace", cols=120, rows=30, ) try: with pytest.raises(ValueError, match="unsupported shell signal"): session.send_signal("BOGUS") finally: session.close() def test_workspace_shells_init_failure_closes_ptys( tmp_path: Path, monkeypatch: pytest.MonkeyPatch, ) -> None: def _boom(*args: object, **kwargs: object) -> object: raise RuntimeError("boom") monkeypatch.setattr(subprocess, "Popen", _boom) with pytest.raises(RuntimeError, match="boom"): create_local_shell( workspace_id="workspace-5", shell_id="shell-5", cwd=tmp_path, display_cwd="/workspace", cols=120, rows=30, ) def test_workspace_shells_write_and_signal_runtime_errors( tmp_path: Path, monkeypatch: pytest.MonkeyPatch, ) -> None: real_killpg = os.killpg session = create_local_shell( workspace_id="workspace-6", shell_id="shell-6", cwd=tmp_path, display_cwd="/workspace", cols=120, rows=30, ) try: with session._lock: # noqa: SLF001 session._master_fd = None # noqa: SLF001 session._input_pipe = None # noqa: SLF001 with pytest.raises(RuntimeError, match="transport is unavailable"): session.write("pwd", append_newline=True) with session._lock: # noqa: SLF001 master_fd, slave_fd = os.pipe() os.close(slave_fd) session._master_fd = master_fd # noqa: SLF001 def _raise_write(fd: int, data: bytes) -> int: del fd, data raise OSError("broken") monkeypatch.setattr("pyro_mcp.workspace_shells.os.write", _raise_write) with pytest.raises(RuntimeError, match="failed to write"): session.write("pwd", append_newline=True) def _raise_killpg(pid: int, signum: int) -> None: del pid, signum raise ProcessLookupError() monkeypatch.setattr("pyro_mcp.workspace_shells.os.killpg", _raise_killpg) with pytest.raises(RuntimeError, match="not running"): session.send_signal("INT") monkeypatch.setattr("pyro_mcp.workspace_shells.os.killpg", real_killpg) finally: try: session.close() except Exception: pass def test_workspace_shells_refresh_process_state_updates_exit_code(tmp_path: Path) -> None: session = create_local_shell( workspace_id="workspace-7", shell_id="shell-7", cwd=tmp_path, display_cwd="/workspace", cols=120, rows=30, ) try: class StubProcess: def poll(self) -> int: return 7 session._process = StubProcess() # type: ignore[assignment] # noqa: SLF001 session._refresh_process_state() # noqa: SLF001 assert session.summary()["state"] == "stopped" assert session.summary()["exit_code"] == 7 finally: try: session.close() except Exception: pass