from __future__ import annotations import io import json import socket import tarfile from pathlib import Path import pytest from pyro_mcp.vm_guest import VsockExecClient class StubSocket: def __init__(self, responses: list[bytes] | bytes) -> None: if isinstance(responses, bytes): self.responses = [responses] else: self.responses = responses self._buffer = b"" self.connected: object | None = None self.sent = b"" self.timeout: int | None = None self.closed = False def settimeout(self, timeout: int) -> None: self.timeout = timeout def connect(self, address: tuple[int, int]) -> None: self.connected = address def sendall(self, data: bytes) -> None: self.sent += data def recv(self, size: int) -> bytes: if not self._buffer and self.responses: self._buffer = self.responses.pop(0) if not self._buffer: return b"" data = self._buffer[:size] self._buffer = self._buffer[size:] return data def close(self) -> None: self.closed = True def test_vsock_exec_client_round_trip(monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.setattr(socket, "AF_VSOCK", 40, raising=False) stub = StubSocket( b'{"stdout":"ok\\n","stderr":"","exit_code":0,"duration_ms":7}' ) def socket_factory(family: int, sock_type: int) -> StubSocket: assert family == socket.AF_VSOCK assert sock_type == socket.SOCK_STREAM return stub client = VsockExecClient(socket_factory=socket_factory) response = client.exec(1234, 5005, "echo ok", 30, env={"TOKEN": "expected"}) assert response.exit_code == 0 assert response.stdout == "ok\n" assert stub.connected == (1234, 5005) assert b'"command": "echo ok"' in stub.sent assert b'"env": {"TOKEN": "expected"}' in stub.sent assert stub.closed is True def test_vsock_exec_client_upload_archive_round_trip( monkeypatch: pytest.MonkeyPatch, tmp_path: Path ) -> None: monkeypatch.setattr(socket, "AF_VSOCK", 40, raising=False) archive_path = tmp_path / "seed.tgz" with tarfile.open(archive_path, "w:gz") as archive: payload = b"hello\n" info = tarfile.TarInfo(name="note.txt") info.size = len(payload) archive.addfile(info, io.BytesIO(payload)) stub = StubSocket( b'{"destination":"/workspace","entry_count":1,"bytes_written":6}' ) def socket_factory(family: int, sock_type: int) -> StubSocket: assert family == socket.AF_VSOCK assert sock_type == socket.SOCK_STREAM return stub client = VsockExecClient(socket_factory=socket_factory) response = client.upload_archive( 1234, 5005, archive_path, destination="/workspace", timeout_seconds=60, ) request_payload, archive_payload = stub.sent.split(b"\n", 1) request = json.loads(request_payload.decode("utf-8")) assert request["action"] == "extract_archive" assert request["destination"] == "/workspace" assert int(request["archive_size"]) == archive_path.stat().st_size assert archive_payload == archive_path.read_bytes() assert response.entry_count == 1 assert response.bytes_written == 6 assert stub.closed is True def test_vsock_exec_client_install_secrets_round_trip( monkeypatch: pytest.MonkeyPatch, tmp_path: Path ) -> None: monkeypatch.setattr(socket, "AF_VSOCK", 40, raising=False) archive_path = tmp_path / "secrets.tar" with tarfile.open(archive_path, "w") as archive: payload = b"expected\n" info = tarfile.TarInfo(name="API_TOKEN") info.size = len(payload) archive.addfile(info, io.BytesIO(payload)) stub = StubSocket( b'{"destination":"/run/pyro-secrets","entry_count":1,"bytes_written":9}' ) def socket_factory(family: int, sock_type: int) -> StubSocket: assert family == socket.AF_VSOCK assert sock_type == socket.SOCK_STREAM return stub client = VsockExecClient(socket_factory=socket_factory) response = client.install_secrets(1234, 5005, archive_path, timeout_seconds=60) request_payload, archive_payload = stub.sent.split(b"\n", 1) request = json.loads(request_payload.decode("utf-8")) assert request["action"] == "install_secrets" assert int(request["archive_size"]) == archive_path.stat().st_size assert archive_payload == archive_path.read_bytes() assert response.destination == "/run/pyro-secrets" assert response.entry_count == 1 assert response.bytes_written == 9 assert stub.closed is True def test_vsock_exec_client_export_archive_round_trip( monkeypatch: pytest.MonkeyPatch, tmp_path: Path ) -> None: monkeypatch.setattr(socket, "AF_VSOCK", 40, raising=False) archive_bytes = io.BytesIO() with tarfile.open(fileobj=archive_bytes, mode="w") as archive: payload = b"hello\n" info = tarfile.TarInfo(name="note.txt") info.size = len(payload) archive.addfile(info, io.BytesIO(payload)) archive_payload = archive_bytes.getvalue() header = json.dumps( { "workspace_path": "/workspace/note.txt", "artifact_type": "file", "archive_size": len(archive_payload), "entry_count": 1, "bytes_written": 6, } ).encode("utf-8") + b"\n" stub = StubSocket(header + archive_payload) def socket_factory(family: int, sock_type: int) -> StubSocket: assert family == socket.AF_VSOCK assert sock_type == socket.SOCK_STREAM return stub client = VsockExecClient(socket_factory=socket_factory) archive_path = tmp_path / "export.tar" response = client.export_archive( 1234, 5005, workspace_path="/workspace/note.txt", archive_path=archive_path, timeout_seconds=60, ) request = json.loads(stub.sent.decode("utf-8").strip()) assert request["action"] == "export_archive" assert request["path"] == "/workspace/note.txt" assert archive_path.read_bytes() == archive_payload assert response.workspace_path == "/workspace/note.txt" assert response.artifact_type == "file" assert response.entry_count == 1 assert response.bytes_written == 6 assert stub.closed is True def test_vsock_exec_client_shell_round_trip(monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.setattr(socket, "AF_VSOCK", 40, raising=False) responses = [ json.dumps( { "shell_id": "shell-1", "cwd": "/workspace", "cols": 120, "rows": 30, "state": "running", "started_at": 1.0, "ended_at": None, "exit_code": None, } ).encode("utf-8"), json.dumps( { "shell_id": "shell-1", "cwd": "/workspace", "cols": 120, "rows": 30, "state": "running", "started_at": 1.0, "ended_at": None, "exit_code": None, "cursor": 0, "next_cursor": 12, "output": "pyro$ pwd\n", "truncated": False, } ).encode("utf-8"), json.dumps( { "shell_id": "shell-1", "cwd": "/workspace", "cols": 120, "rows": 30, "state": "running", "started_at": 1.0, "ended_at": None, "exit_code": None, "input_length": 3, "append_newline": True, } ).encode("utf-8"), json.dumps( { "shell_id": "shell-1", "cwd": "/workspace", "cols": 120, "rows": 30, "state": "running", "started_at": 1.0, "ended_at": None, "exit_code": None, "signal": "INT", } ).encode("utf-8"), json.dumps( { "shell_id": "shell-1", "cwd": "/workspace", "cols": 120, "rows": 30, "state": "stopped", "started_at": 1.0, "ended_at": 2.0, "exit_code": 0, "closed": True, } ).encode("utf-8"), ] stubs = [StubSocket(response) for response in responses] remaining = list(stubs) def socket_factory(family: int, sock_type: int) -> StubSocket: assert family == socket.AF_VSOCK assert sock_type == socket.SOCK_STREAM return remaining.pop(0) client = VsockExecClient(socket_factory=socket_factory) opened = client.open_shell( 1234, 5005, shell_id="shell-1", cwd="/workspace", cols=120, rows=30, env={"TOKEN": "expected"}, redact_values=["expected"], ) assert opened.shell_id == "shell-1" read = client.read_shell(1234, 5005, shell_id="shell-1", cursor=0, max_chars=1024) assert read.output == "pyro$ pwd\n" write = client.write_shell( 1234, 5005, shell_id="shell-1", input_text="pwd", append_newline=True, ) assert write["input_length"] == 3 signaled = client.signal_shell(1234, 5005, shell_id="shell-1", signal_name="INT") assert signaled["signal"] == "INT" closed = client.close_shell(1234, 5005, shell_id="shell-1") assert closed["closed"] is True open_request = json.loads(stubs[0].sent.decode("utf-8").strip()) assert open_request["action"] == "open_shell" assert open_request["shell_id"] == "shell-1" assert open_request["env"] == {"TOKEN": "expected"} assert open_request["redact_values"] == ["expected"] def test_vsock_exec_client_service_round_trip(monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.setattr(socket, "AF_VSOCK", 40, raising=False) responses = [ json.dumps( { "service_name": "app", "command": "echo ok", "cwd": "/workspace", "state": "running", "started_at": 1.0, "ready_at": 2.0, "ended_at": None, "exit_code": None, "pid": 42, "readiness": {"type": "file", "path": "/workspace/.ready"}, "stop_reason": None, } ).encode("utf-8"), json.dumps( { "service_name": "app", "command": "echo ok", "cwd": "/workspace", "state": "running", "started_at": 1.0, "ready_at": 2.0, "ended_at": None, "exit_code": None, "pid": 42, "readiness": {"type": "file", "path": "/workspace/.ready"}, "stop_reason": None, } ).encode("utf-8"), json.dumps( { "service_name": "app", "command": "echo ok", "cwd": "/workspace", "state": "running", "started_at": 1.0, "ready_at": 2.0, "ended_at": None, "exit_code": None, "pid": 42, "readiness": {"type": "file", "path": "/workspace/.ready"}, "stop_reason": None, "stdout": "ok\n", "stderr": "", "tail_lines": 200, "truncated": False, } ).encode("utf-8"), json.dumps( { "service_name": "app", "command": "echo ok", "cwd": "/workspace", "state": "stopped", "started_at": 1.0, "ready_at": 2.0, "ended_at": 3.0, "exit_code": 0, "pid": 42, "readiness": {"type": "file", "path": "/workspace/.ready"}, "stop_reason": "sigterm", } ).encode("utf-8"), ] stubs = [StubSocket(response) for response in responses] remaining = list(stubs) def socket_factory(family: int, sock_type: int) -> StubSocket: assert family == socket.AF_VSOCK assert sock_type == socket.SOCK_STREAM return remaining.pop(0) client = VsockExecClient(socket_factory=socket_factory) started = client.start_service( 1234, 5005, service_name="app", command="echo ok", cwd="/workspace", readiness={"type": "file", "path": "/workspace/.ready"}, ready_timeout_seconds=30, ready_interval_ms=500, env={"TOKEN": "expected"}, ) assert started["service_name"] == "app" status = client.status_service(1234, 5005, service_name="app") assert status["state"] == "running" logs = client.logs_service(1234, 5005, service_name="app", tail_lines=200) assert logs["stdout"] == "ok\n" stopped = client.stop_service(1234, 5005, service_name="app") assert stopped["state"] == "stopped" start_request = json.loads(stubs[0].sent.decode("utf-8").strip()) assert start_request["action"] == "start_service" assert start_request["service_name"] == "app" assert start_request["env"] == {"TOKEN": "expected"} def test_vsock_exec_client_raises_agent_error(monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.setattr(socket, "AF_VSOCK", 40, raising=False) stub = StubSocket(b'{"error":"shell is unavailable"}') client = VsockExecClient(socket_factory=lambda family, sock_type: stub) with pytest.raises(RuntimeError, match="shell is unavailable"): client.open_shell( 1234, 5005, shell_id="shell-1", cwd="/workspace", cols=120, rows=30, ) def test_vsock_exec_client_rejects_bad_json(monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.setattr(socket, "AF_VSOCK", 40, raising=False) stub = StubSocket(b"[]") client = VsockExecClient(socket_factory=lambda family, sock_type: stub) with pytest.raises(RuntimeError, match="JSON object"): client.exec(1234, 5005, "echo ok", 30) def test_vsock_exec_client_uses_unix_bridge_when_vsock_is_unavailable( monkeypatch: pytest.MonkeyPatch, ) -> None: monkeypatch.delattr(socket, "AF_VSOCK", raising=False) stub = StubSocket( [ b"OK 1073746829\n", b'{"stdout":"ready\\n","stderr":"","exit_code":0,"duration_ms":5}', ] ) def socket_factory(family: int, sock_type: int) -> StubSocket: assert family == socket.AF_UNIX assert sock_type == socket.SOCK_STREAM return stub client = VsockExecClient(socket_factory=socket_factory) response = client.exec(1234, 5005, "echo ready", 30, uds_path="/tmp/vsock.sock") assert response.stdout == "ready\n" assert stub.connected == "/tmp/vsock.sock" assert stub.sent.startswith(b"CONNECT 5005\n") def test_vsock_exec_client_requires_transport_when_vsock_is_unavailable( monkeypatch: pytest.MonkeyPatch, ) -> None: monkeypatch.delattr(socket, "AF_VSOCK", raising=False) client = VsockExecClient(socket_factory=lambda family, sock_type: StubSocket(b"")) with pytest.raises(RuntimeError, match="not supported"): client.exec(1234, 5005, "echo ok", 30)