pyro-mcp/tests/test_workspace_ports.py
Thales Maciel 899a6760c4 Add host bootstrap and repair helpers
Add a dedicated pyro host surface for supported chat hosts so Claude Code, Codex, and OpenCode users can connect or repair the canonical MCP setup without hand-writing raw commands or config edits.

Implement the shared host helper layer and wire it through the CLI with connect, print-config, doctor, and repair, all generated from the same canonical pyro mcp serve command shape and project-source flags. Update the docs, public contract, examples, changelog, and roadmap so the helper flow becomes the primary onramp while raw host-specific commands remain as reference material.

Harden the verification path that this milestone exposed: temp git repos in tests now disable commit signing, socket-based port tests skip cleanly when the sandbox forbids those primitives, and make test still uses multiple cores by default but caps xdist workers to a stable value so make check stays fast and deterministic here.

Validation:
- uv lock
- UV_OFFLINE=1 UV_CACHE_DIR=.uv-cache make check
- UV_OFFLINE=1 UV_CACHE_DIR=.uv-cache make dist-check
2026-03-13 16:46:29 -03:00

311 lines
9.4 KiB
Python

from __future__ import annotations
import json
import selectors
import signal
import socket
import socketserver
import threading
from pathlib import Path
from types import SimpleNamespace
from typing import Any, cast
import pytest
from pyro_mcp import workspace_ports
def _socketpair_or_skip() -> tuple[socket.socket, socket.socket]:
    """Return a connected socket pair, or skip the calling test.

    A ``PermissionError`` raised by ``socket.socketpair()`` is converted into
    a pytest skip so the caller is reported as skipped rather than failed.
    """
    try:
        pair = socket.socketpair()
    except PermissionError as exc:
        # pytest.skip raises, so control never reaches the return below.
        pytest.skip(f"socketpair unavailable in this environment: {exc}")
    return pair
class _EchoHandler(socketserver.BaseRequestHandler):
    """Request handler that sends the first received chunk back to the peer."""

    def handle(self) -> None:
        """Read one chunk (up to 65536 bytes) and echo it if it is non-empty."""
        payload = self.request.recv(65536)
        if not payload:
            # Nothing was received, so there is nothing to echo.
            return
        self.request.sendall(payload)
def test_workspace_port_proxy_handler_rejects_invalid_server() -> None:
    """handle() raises RuntimeError when the handler's server is a plain object."""
    handler_cls = workspace_ports._ProxyHandler  # noqa: SLF001
    # __new__ bypasses __init__, so the handler is built without running it.
    proxy_handler = handler_cls.__new__(handler_cls)
    proxy_handler.request = object()
    proxy_handler.server = cast(Any, object())

    with pytest.raises(RuntimeError, match="proxy server is invalid"):
        proxy_handler.handle()
def test_workspace_port_proxy_handler_ignores_upstream_connect_failure(
    monkeypatch: Any,
) -> None:
    """handle() returns without raising when the upstream connection fails."""

    def _fail_connect(*_args: Any, **_kwargs: Any) -> socket.socket:
        # Stand-in for socket.create_connection that always fails.
        raise OSError("boom")

    monkeypatch.setattr(socket, "create_connection", _fail_connect)

    # Both objects are built with __new__ so no real sockets are bound.
    proxy_server = workspace_ports._ProxyServer.__new__(workspace_ports._ProxyServer)  # noqa: SLF001
    proxy_server.target_address = ("127.0.0.1", 12345)
    proxy_handler = workspace_ports._ProxyHandler.__new__(workspace_ports._ProxyHandler)  # noqa: SLF001
    proxy_handler.server = proxy_server
    proxy_handler.request = object()

    # No explicit assertion: the test passes if the OSError does not propagate.
    proxy_handler.handle()
def test_workspace_port_proxy_forwards_tcp_traffic() -> None:
    """Bytes sent to the proxy come back unchanged from an echo upstream.

    An echo server is started on an ephemeral port, a ``_ProxyServer`` is
    pointed at it, and a client connected to the proxy must receive exactly
    the bytes it sent. The test is skipped when binding either listener
    raises ``PermissionError``.

    Each server is torn down in its own ``finally`` block, so the upstream
    listener is released even when constructing the proxy fails with an
    exception other than ``PermissionError`` or the assertion fails.
    """
    try:
        upstream = socketserver.ThreadingTCPServer(
            (workspace_ports.DEFAULT_PUBLISHED_PORT_HOST, 0),
            _EchoHandler,
        )
    except PermissionError as exc:
        pytest.skip(f"TCP bind unavailable in this environment: {exc}")
    upstream_thread = threading.Thread(target=upstream.serve_forever, daemon=True)
    upstream_thread.start()
    try:
        upstream_host = str(upstream.server_address[0])
        upstream_port = int(upstream.server_address[1])
        try:
            proxy = workspace_ports._ProxyServer(  # noqa: SLF001
                (workspace_ports.DEFAULT_PUBLISHED_PORT_HOST, 0),
                (upstream_host, upstream_port),
            )
        except PermissionError as exc:
            # The skip exception propagates through the outer ``finally``,
            # which shuts the upstream server down.
            pytest.skip(f"proxy TCP bind unavailable in this environment: {exc}")
        proxy_thread = threading.Thread(target=proxy.serve_forever, daemon=True)
        proxy_thread.start()
        try:
            proxy_host = str(proxy.server_address[0])
            proxy_port = int(proxy.server_address[1])
            with socket.create_connection((proxy_host, proxy_port), timeout=5) as client:
                client.sendall(b"hello")
                received = client.recv(65536)
            assert received == b"hello"
        finally:
            proxy.shutdown()
            proxy.server_close()
            # Bounded join so a stuck serving thread cannot hang the test run.
            proxy_thread.join(timeout=5)
    finally:
        upstream.shutdown()
        upstream.server_close()
        upstream_thread.join(timeout=5)
def test_workspace_ports_main_writes_ready_file(
    tmp_path: Path,
    monkeypatch: Any,
) -> None:
    """main() returns 0, writes the ready-file JSON, and registers two signals.

    ``_ProxyServer`` is replaced by a stub that reports port 18080, and
    ``signal.signal`` is replaced by a recorder, so no real socket or signal
    handler is installed.
    """
    ready_path = tmp_path / "proxy.ready.json"
    registered: list[int] = []

    class StubProxyServer:
        """Stand-in server that reports a fixed bound port of 18080."""

        def __init__(
            self,
            server_address: tuple[str, int],
            target_address: tuple[str, int],
        ) -> None:
            listen_host = server_address[0]
            self.server_address = (listen_host, 18080)
            self.target_address = target_address

        def serve_forever(self, poll_interval: float = 0.2) -> None:
            # Returns immediately; only checks the poll interval it was given.
            assert poll_interval == 0.2

        def shutdown(self) -> None:
            return None

        def server_close(self) -> None:
            return None

    def _record_signal(signum: int, handler: Any) -> None:
        # Remember which signal numbers main() asked to handle.
        registered.append(signum)

    monkeypatch.setattr(workspace_ports, "_ProxyServer", StubProxyServer)
    monkeypatch.setattr(signal, "signal", _record_signal)

    argv = [
        "--listen-host",
        "127.0.0.1",
        "--listen-port",
        "0",
        "--target-host",
        "172.29.1.2",
        "--target-port",
        "8080",
        "--ready-file",
        str(ready_path),
    ]
    exit_code = workspace_ports.main(argv)

    assert exit_code == 0
    expected_payload = {
        "host": "127.0.0.1",
        "host_port": 18080,
        "protocol": "tcp",
        "target_host": "172.29.1.2",
        "target_port": 8080,
    }
    written = json.loads(ready_path.read_text(encoding="utf-8"))
    assert written == expected_payload
    assert registered == [signal.SIGTERM, signal.SIGINT]
def test_workspace_ports_main_shutdown_handler_stops_server(
    tmp_path: Path,
    monkeypatch: Any,
) -> None:
    """The SIGTERM handler registered by main() shuts the server down once.

    The stub server invokes the captured SIGTERM handler from inside
    ``serve_forever``. ``threading.Thread`` is replaced by a class that runs
    its target synchronously in ``start()``.
    """
    ready_path = tmp_path / "proxy.ready.json"
    shutdown_calls: list[bool] = []
    captured_handlers: dict[int, Any] = {}

    class StubProxyServer:
        """Stand-in server that fires the SIGTERM handler while 'serving'."""

        def __init__(
            self,
            server_address: tuple[str, int],
            target_address: tuple[str, int],
        ) -> None:
            self.server_address = server_address
            self.target_address = target_address

        def serve_forever(self, poll_interval: float = 0.2) -> None:
            # Simulate SIGTERM arriving while the server is running.
            sigterm_handler = captured_handlers[signal.SIGTERM]
            sigterm_handler(signal.SIGTERM, None)
            assert poll_interval == 0.2

        def shutdown(self) -> None:
            shutdown_calls.append(True)

        def server_close(self) -> None:
            return None

    class ImmediateThread:
        """Thread replacement whose start() runs the target in the caller."""

        def __init__(self, *, target: Any, daemon: bool) -> None:
            self._target = target
            assert daemon is True

        def start(self) -> None:
            self._target()

    def _capture_handler(signum: int, handler: Any) -> None:
        captured_handlers[signum] = handler

    monkeypatch.setattr(workspace_ports, "_ProxyServer", StubProxyServer)
    monkeypatch.setattr(signal, "signal", _capture_handler)
    monkeypatch.setattr(threading, "Thread", ImmediateThread)

    argv = [
        "--listen-host",
        "127.0.0.1",
        "--listen-port",
        "18080",
        "--target-host",
        "172.29.1.2",
        "--target-port",
        "8080",
        "--ready-file",
        str(ready_path),
    ]
    exit_code = workspace_ports.main(argv)

    assert exit_code == 0
    assert shutdown_calls == [True]
def test_workspace_port_proxy_handler_handles_empty_and_invalid_selector_events(
    monkeypatch: Any,
) -> None:
    """handle() copes with empty and non-socket selector events.

    A fake selector scripts three ``select()`` results in order: no events,
    an event whose ``fileobj`` and ``data`` are plain objects, and an event
    for the real source socket whose peer has already been closed. The
    script has only three entries, so a fourth ``select()`` call would raise
    ``StopIteration``.
    """
    client_sock, client_peer = _socketpair_or_skip()
    upstream_sock, upstream_peer = _socketpair_or_skip()
    # Close the far end before handle() runs.
    client_peer.close()

    class FakeSelector:
        """Selector stub that replays a fixed sequence of event lists."""

        def __init__(self) -> None:
            no_events: list[Any] = []
            bogus_key = SimpleNamespace(fileobj=object(), data=object())
            real_key = SimpleNamespace(fileobj=client_sock, data=upstream_sock)
            self._events = iter(
                [
                    no_events,
                    [(bogus_key, None)],
                    [(real_key, None)],
                ]
            )

        def register(self, *_args: Any, **_kwargs: Any) -> None:
            return None

        def select(self) -> list[tuple[SimpleNamespace, None]]:
            return next(self._events)

        def close(self) -> None:
            return None

    def _connect_upstream(*args: Any, **kwargs: Any) -> socket.socket:
        # Hand back the pre-built upstream socket instead of dialing out.
        return upstream_sock

    proxy_server = workspace_ports._ProxyServer.__new__(workspace_ports._ProxyServer)  # noqa: SLF001
    proxy_server.target_address = ("127.0.0.1", 12345)
    proxy_handler = workspace_ports._ProxyHandler.__new__(workspace_ports._ProxyHandler)  # noqa: SLF001
    proxy_handler.server = proxy_server
    proxy_handler.request = client_sock

    monkeypatch.setattr(socket, "create_connection", _connect_upstream)
    monkeypatch.setattr(selectors, "DefaultSelector", FakeSelector)
    try:
        proxy_handler.handle()
    finally:
        for sock in (client_sock, upstream_sock, upstream_peer):
            sock.close()
def test_workspace_port_proxy_handler_handles_recv_and_send_errors(
    monkeypatch: Any,
) -> None:
    """handle() does not raise when a socket is closed underneath it.

    Two scenarios run back to back. With ``close_source=True`` the fake
    selector closes the source socket before reporting it ready. With
    ``close_source=False`` the peer first sends ``b"hello"`` and the selector
    closes the upstream socket instead. Presumably these exercise the
    handler's receive-error and send-error paths respectively.
    """

    def _run_once(*, close_source: bool) -> None:
        source, source_peer = _socketpair_or_skip()
        upstream, upstream_peer = _socketpair_or_skip()
        # The socket the selector closes, and the one that stays open until
        # the final cleanup.
        doomed = source if close_source else upstream
        survivor = upstream if close_source else source

        if not close_source:
            try:
                source_peer.sendall(b"hello")
            except PermissionError as exc:
                for sock in (source, source_peer, upstream, upstream_peer):
                    sock.close()
                pytest.skip(f"socket send unavailable in this environment: {exc}")

        class FakeSelector:
            """Selector stub that closes one socket, then reports source ready."""

            def register(self, *_args: Any, **_kwargs: Any) -> None:
                return None

            def select(self) -> list[tuple[SimpleNamespace, None]]:
                doomed.close()
                ready_key = SimpleNamespace(fileobj=source, data=upstream)
                return [(ready_key, None)]

            def close(self) -> None:
                return None

        def _connect_upstream(*args: Any, **kwargs: Any) -> socket.socket:
            # Hand back the pre-built upstream socket instead of dialing out.
            return upstream

        proxy_server = workspace_ports._ProxyServer.__new__(workspace_ports._ProxyServer)  # noqa: SLF001
        proxy_server.target_address = ("127.0.0.1", 12345)
        proxy_handler = workspace_ports._ProxyHandler.__new__(workspace_ports._ProxyHandler)  # noqa: SLF001
        proxy_handler.server = proxy_server
        proxy_handler.request = source

        monkeypatch.setattr(socket, "create_connection", _connect_upstream)
        monkeypatch.setattr(selectors, "DefaultSelector", FakeSelector)
        try:
            proxy_handler.handle()
        finally:
            source_peer.close()
            survivor.close()
            upstream_peer.close()

    for close_source_first in (True, False):
        _run_once(close_source=close_source_first)