from __future__ import annotations import json import selectors import signal import socket import socketserver import threading from pathlib import Path from types import SimpleNamespace from typing import Any, cast import pytest from pyro_mcp import workspace_ports def _socketpair_or_skip() -> tuple[socket.socket, socket.socket]: try: return socket.socketpair() except PermissionError as exc: pytest.skip(f"socketpair unavailable in this environment: {exc}") class _EchoHandler(socketserver.BaseRequestHandler): def handle(self) -> None: data = self.request.recv(65536) if data: self.request.sendall(data) def test_workspace_port_proxy_handler_rejects_invalid_server() -> None: handler = workspace_ports._ProxyHandler.__new__(workspace_ports._ProxyHandler) # noqa: SLF001 handler.server = cast(Any, object()) handler.request = object() with pytest.raises(RuntimeError, match="proxy server is invalid"): handler.handle() def test_workspace_port_proxy_handler_ignores_upstream_connect_failure( monkeypatch: Any, ) -> None: handler = workspace_ports._ProxyHandler.__new__(workspace_ports._ProxyHandler) # noqa: SLF001 server = workspace_ports._ProxyServer.__new__(workspace_ports._ProxyServer) # noqa: SLF001 server.target_address = ("127.0.0.1", 12345) handler.server = server handler.request = object() def _raise_connect(*args: Any, **kwargs: Any) -> socket.socket: del args, kwargs raise OSError("boom") monkeypatch.setattr(socket, "create_connection", _raise_connect) handler.handle() def test_workspace_port_proxy_forwards_tcp_traffic() -> None: try: upstream = socketserver.ThreadingTCPServer( (workspace_ports.DEFAULT_PUBLISHED_PORT_HOST, 0), _EchoHandler, ) except PermissionError as exc: pytest.skip(f"TCP bind unavailable in this environment: {exc}") upstream_thread = threading.Thread(target=upstream.serve_forever, daemon=True) upstream_thread.start() upstream_host = str(upstream.server_address[0]) upstream_port = int(upstream.server_address[1]) try: proxy = workspace_ports._ProxyServer( # noqa: SLF001 (workspace_ports.DEFAULT_PUBLISHED_PORT_HOST, 0), (upstream_host, upstream_port), ) except PermissionError as exc: upstream.shutdown() upstream.server_close() pytest.skip(f"proxy TCP bind unavailable in this environment: {exc}") proxy_thread = threading.Thread(target=proxy.serve_forever, daemon=True) proxy_thread.start() try: proxy_host = str(proxy.server_address[0]) proxy_port = int(proxy.server_address[1]) with socket.create_connection((proxy_host, proxy_port), timeout=5) as client: client.sendall(b"hello") received = client.recv(65536) assert received == b"hello" finally: proxy.shutdown() proxy.server_close() upstream.shutdown() upstream.server_close() def test_workspace_ports_main_writes_ready_file( tmp_path: Path, monkeypatch: Any, ) -> None: ready_file = tmp_path / "proxy.ready.json" signals: list[int] = [] class StubProxyServer: def __init__( self, server_address: tuple[str, int], target_address: tuple[str, int], ) -> None: self.server_address = (server_address[0], 18080) self.target_address = target_address def serve_forever(self, poll_interval: float = 0.2) -> None: assert poll_interval == 0.2 def shutdown(self) -> None: return None def server_close(self) -> None: return None monkeypatch.setattr(workspace_ports, "_ProxyServer", StubProxyServer) monkeypatch.setattr( signal, "signal", lambda signum, handler: signals.append(signum), ) result = workspace_ports.main( [ "--listen-host", "127.0.0.1", "--listen-port", "0", "--target-host", "172.29.1.2", "--target-port", "8080", "--ready-file", str(ready_file), ] ) assert result == 0 payload = json.loads(ready_file.read_text(encoding="utf-8")) assert payload == { "host": "127.0.0.1", "host_port": 18080, "protocol": "tcp", "target_host": "172.29.1.2", "target_port": 8080, } assert signals == [signal.SIGTERM, signal.SIGINT] def test_workspace_ports_main_shutdown_handler_stops_server( tmp_path: Path, monkeypatch: Any, ) -> None: ready_file = tmp_path / "proxy.ready.json" shutdown_called: list[bool] = [] handlers: dict[int, Any] = {} class StubProxyServer: def __init__( self, server_address: tuple[str, int], target_address: tuple[str, int], ) -> None: self.server_address = server_address self.target_address = target_address def serve_forever(self, poll_interval: float = 0.2) -> None: handlers[signal.SIGTERM](signal.SIGTERM, None) assert poll_interval == 0.2 def shutdown(self) -> None: shutdown_called.append(True) def server_close(self) -> None: return None class ImmediateThread: def __init__(self, *, target: Any, daemon: bool) -> None: self._target = target assert daemon is True def start(self) -> None: self._target() monkeypatch.setattr(workspace_ports, "_ProxyServer", StubProxyServer) monkeypatch.setattr( signal, "signal", lambda signum, handler: handlers.__setitem__(signum, handler), ) monkeypatch.setattr(threading, "Thread", ImmediateThread) result = workspace_ports.main( [ "--listen-host", "127.0.0.1", "--listen-port", "18080", "--target-host", "172.29.1.2", "--target-port", "8080", "--ready-file", str(ready_file), ] ) assert result == 0 assert shutdown_called == [True] def test_workspace_port_proxy_handler_handles_empty_and_invalid_selector_events( monkeypatch: Any, ) -> None: source, source_peer = _socketpair_or_skip() upstream, upstream_peer = _socketpair_or_skip() source_peer.close() class FakeSelector: def __init__(self) -> None: self._events = iter( [ [], [(SimpleNamespace(fileobj=object(), data=object()), None)], [(SimpleNamespace(fileobj=source, data=upstream), None)], ] ) def register(self, *_args: Any, **_kwargs: Any) -> None: return None def select(self) -> list[tuple[SimpleNamespace, None]]: return next(self._events) def close(self) -> None: return None handler = workspace_ports._ProxyHandler.__new__(workspace_ports._ProxyHandler) # noqa: SLF001 server = workspace_ports._ProxyServer.__new__(workspace_ports._ProxyServer) # noqa: SLF001 server.target_address = ("127.0.0.1", 12345) handler.server = server handler.request = source monkeypatch.setattr(socket, "create_connection", lambda *args, **kwargs: upstream) monkeypatch.setattr(selectors, "DefaultSelector", FakeSelector) try: handler.handle() finally: source.close() upstream.close() upstream_peer.close() def test_workspace_port_proxy_handler_handles_recv_and_send_errors( monkeypatch: Any, ) -> None: def _run_once(*, close_source: bool) -> None: source, source_peer = _socketpair_or_skip() upstream, upstream_peer = _socketpair_or_skip() if not close_source: try: source_peer.sendall(b"hello") except PermissionError as exc: source.close() source_peer.close() upstream.close() upstream_peer.close() pytest.skip(f"socket send unavailable in this environment: {exc}") class FakeSelector: def register(self, *_args: Any, **_kwargs: Any) -> None: return None def select(self) -> list[tuple[SimpleNamespace, None]]: if close_source: source.close() else: upstream.close() return [(SimpleNamespace(fileobj=source, data=upstream), None)] def close(self) -> None: return None handler = workspace_ports._ProxyHandler.__new__(workspace_ports._ProxyHandler) # noqa: SLF001 server = workspace_ports._ProxyServer.__new__(workspace_ports._ProxyServer) # noqa: SLF001 server.target_address = ("127.0.0.1", 12345) handler.server = server handler.request = source monkeypatch.setattr(socket, "create_connection", lambda *args, **kwargs: upstream) monkeypatch.setattr(selectors, "DefaultSelector", FakeSelector) try: handler.handle() finally: source_peer.close() if close_source: upstream.close() upstream_peer.close() else: source.close() upstream_peer.close() _run_once(close_source=True) _run_once(close_source=False)