Enable real guest networking and make demos network-first

This commit is contained in:
Thales Maciel 2026-03-06 22:47:16 -03:00
parent c43c718c83
commit b01efa6452
14 changed files with 618 additions and 72 deletions

View file

@ -8,9 +8,13 @@ from pyro_mcp.vm_guest import VsockExecClient
class StubSocket:
def __init__(self, response: bytes) -> None:
self.response = response
self.connected: tuple[int, int] | None = None
def __init__(self, responses: list[bytes] | bytes) -> None:
if isinstance(responses, bytes):
self.responses = [responses]
else:
self.responses = responses
self._buffer = b""
self.connected: object | None = None
self.sent = b""
self.timeout: int | None = None
self.closed = False
@ -25,10 +29,12 @@ class StubSocket:
self.sent += data
def recv(self, size: int) -> bytes:
del size
if self.response == b"":
if not self._buffer and self.responses:
self._buffer = self.responses.pop(0)
if not self._buffer:
return b""
data, self.response = self.response, b""
data = self._buffer[:size]
self._buffer = self._buffer[size:]
return data
def close(self) -> None:
@ -62,3 +68,36 @@ def test_vsock_exec_client_rejects_bad_json(monkeypatch: pytest.MonkeyPatch) ->
client = VsockExecClient(socket_factory=lambda family, sock_type: stub)
with pytest.raises(RuntimeError, match="JSON object"):
client.exec(1234, 5005, "echo ok", 30)
def test_vsock_exec_client_uses_unix_bridge_when_vsock_is_unavailable(
monkeypatch: pytest.MonkeyPatch,
) -> None:
monkeypatch.delattr(socket, "AF_VSOCK", raising=False)
stub = StubSocket(
[
b"OK 1073746829\n",
b'{"stdout":"ready\\n","stderr":"","exit_code":0,"duration_ms":5}',
]
)
def socket_factory(family: int, sock_type: int) -> StubSocket:
assert family == socket.AF_UNIX
assert sock_type == socket.SOCK_STREAM
return stub
client = VsockExecClient(socket_factory=socket_factory)
response = client.exec(1234, 5005, "echo ready", 30, uds_path="/tmp/vsock.sock")
assert response.stdout == "ready\n"
assert stub.connected == "/tmp/vsock.sock"
assert stub.sent.startswith(b"CONNECT 5005\n")
def test_vsock_exec_client_requires_transport_when_vsock_is_unavailable(
monkeypatch: pytest.MonkeyPatch,
) -> None:
monkeypatch.delattr(socket, "AF_VSOCK", raising=False)
client = VsockExecClient(socket_factory=lambda family, sock_type: StubSocket(b""))
with pytest.raises(RuntimeError, match="not supported"):
client.exec(1234, 5005, "echo ok", 30)