Add workspace network policy and published ports

Replace the workspace-level boolean network toggle with explicit network policies and attach localhost TCP publication to workspace services.

Persist network_policy in workspace records, validate --publish requests, and run host-side proxy helpers that follow the service lifecycle so published ports are cleaned up on failure, stop, reset, and delete.

Update the CLI, SDK, MCP contract, docs, roadmap, and examples for the new policy model, add coverage for the proxy and manager edge cases, and validate with uv lock, UV_CACHE_DIR=.uv-cache make check, UV_CACHE_DIR=.uv-cache make dist-check, and a real guest-backed published-port probe smoke.
This commit is contained in:
Thales Maciel 2026-03-12 18:12:57 -03:00
parent fc72fcd3a1
commit c82f4629b2
21 changed files with 1944 additions and 49 deletions

View file

@ -1393,6 +1393,775 @@ def test_workspace_service_lifecycle_and_status_counts(tmp_path: Path) -> None:
assert deleted["deleted"] is True
def test_workspace_create_serializes_network_policy(tmp_path: Path) -> None:
manager = VmManager(
backend_name="mock",
base_dir=tmp_path / "vms",
network_manager=TapNetworkManager(enabled=False),
)
manager._runtime_capabilities = RuntimeCapabilities( # noqa: SLF001
supports_vm_boot=True,
supports_guest_exec=True,
supports_guest_network=True,
)
manager._ensure_workspace_guest_bootstrap_support = lambda instance: None # type: ignore[method-assign] # noqa: SLF001
created = manager.create_workspace(
environment="debian:12-base",
network_policy="egress",
)
assert created["network_policy"] == "egress"
workspace_id = str(created["workspace_id"])
workspace_path = tmp_path / "vms" / "workspaces" / workspace_id / "workspace.json"
payload = json.loads(workspace_path.read_text(encoding="utf-8"))
assert payload["network_policy"] == "egress"
def test_workspace_service_start_serializes_published_ports(
tmp_path: Path,
monkeypatch: pytest.MonkeyPatch,
) -> None:
manager = VmManager(
backend_name="mock",
base_dir=tmp_path / "vms",
network_manager=TapNetworkManager(enabled=False),
)
manager._runtime_capabilities = RuntimeCapabilities( # noqa: SLF001
supports_vm_boot=True,
supports_guest_exec=True,
supports_guest_network=True,
)
manager._ensure_workspace_guest_bootstrap_support = lambda instance: None # type: ignore[method-assign] # noqa: SLF001
created = manager.create_workspace(
environment="debian:12-base",
network_policy="egress+published-ports",
allow_host_compat=True,
)
workspace_id = str(created["workspace_id"])
workspace_path = tmp_path / "vms" / "workspaces" / workspace_id / "workspace.json"
payload = json.loads(workspace_path.read_text(encoding="utf-8"))
payload["network"] = {
"vm_id": workspace_id,
"tap_name": "tap-test0",
"guest_ip": "172.29.1.2",
"gateway_ip": "172.29.1.1",
"subnet_cidr": "172.29.1.0/30",
"mac_address": "06:00:ac:1d:01:02",
"dns_servers": ["1.1.1.1"],
}
workspace_path.write_text(json.dumps(payload, indent=2, sort_keys=True), encoding="utf-8")
monkeypatch.setattr(
manager,
"_start_workspace_service_published_ports",
lambda **kwargs: [
vm_manager_module.WorkspacePublishedPortRecord(
guest_port=8080,
host_port=18080,
host="127.0.0.1",
protocol="tcp",
proxy_pid=9999,
)
],
)
monkeypatch.setattr(
manager,
"_refresh_workspace_liveness_locked",
lambda workspace: None,
)
started = manager.start_service(
workspace_id,
"web",
command="sh -lc 'touch .ready && while true; do sleep 60; done'",
readiness={"type": "file", "path": ".ready"},
published_ports=[{"guest_port": 8080, "host_port": 18080}],
)
assert started["published_ports"] == [
{
"host": "127.0.0.1",
"host_port": 18080,
"guest_port": 8080,
"protocol": "tcp",
}
]
def test_workspace_service_start_rejects_published_ports_without_network_policy(
tmp_path: Path,
) -> None:
manager = VmManager(
backend_name="mock",
base_dir=tmp_path / "vms",
network_manager=TapNetworkManager(enabled=False),
)
workspace_id = str(
manager.create_workspace(
environment="debian:12-base",
allow_host_compat=True,
)["workspace_id"]
)
with pytest.raises(
RuntimeError,
match="published ports require workspace network_policy 'egress\\+published-ports'",
):
manager.start_service(
workspace_id,
"web",
command="sh -lc 'touch .ready && while true; do sleep 60; done'",
readiness={"type": "file", "path": ".ready"},
published_ports=[{"guest_port": 8080}],
)
def test_workspace_service_start_rejects_published_ports_without_active_network(
tmp_path: Path,
monkeypatch: pytest.MonkeyPatch,
) -> None:
manager = VmManager(
backend_name="mock",
base_dir=tmp_path / "vms",
network_manager=TapNetworkManager(enabled=False),
)
manager._runtime_capabilities = RuntimeCapabilities( # noqa: SLF001
supports_vm_boot=True,
supports_guest_exec=True,
supports_guest_network=True,
)
manager._ensure_workspace_guest_bootstrap_support = lambda instance: None # type: ignore[method-assign] # noqa: SLF001
monkeypatch.setattr(
manager,
"_refresh_workspace_liveness_locked",
lambda workspace: None,
)
workspace_id = str(
manager.create_workspace(
environment="debian:12-base",
network_policy="egress+published-ports",
allow_host_compat=True,
)["workspace_id"]
)
with pytest.raises(RuntimeError, match="published ports require an active guest network"):
manager.start_service(
workspace_id,
"web",
command="sh -lc 'touch .ready && while true; do sleep 60; done'",
readiness={"type": "file", "path": ".ready"},
published_ports=[{"guest_port": 8080}],
)
def test_workspace_service_start_published_port_failure_marks_service_failed(
tmp_path: Path,
monkeypatch: pytest.MonkeyPatch,
) -> None:
manager = VmManager(
backend_name="mock",
base_dir=tmp_path / "vms",
network_manager=TapNetworkManager(enabled=False),
)
manager._runtime_capabilities = RuntimeCapabilities( # noqa: SLF001
supports_vm_boot=True,
supports_guest_exec=True,
supports_guest_network=True,
)
manager._ensure_workspace_guest_bootstrap_support = lambda instance: None # type: ignore[method-assign] # noqa: SLF001
monkeypatch.setattr(
manager,
"_refresh_workspace_liveness_locked",
lambda workspace: None,
)
created = manager.create_workspace(
environment="debian:12-base",
network_policy="egress+published-ports",
allow_host_compat=True,
)
workspace_id = str(created["workspace_id"])
workspace_path = tmp_path / "vms" / "workspaces" / workspace_id / "workspace.json"
payload = json.loads(workspace_path.read_text(encoding="utf-8"))
payload["network"] = {
"vm_id": workspace_id,
"tap_name": "tap-test0",
"guest_ip": "172.29.1.2",
"gateway_ip": "172.29.1.1",
"subnet_cidr": "172.29.1.0/30",
"mac_address": "06:00:ac:1d:01:02",
"dns_servers": ["1.1.1.1"],
}
workspace_path.write_text(json.dumps(payload, indent=2, sort_keys=True), encoding="utf-8")
def _raise_proxy_failure(
**kwargs: object,
) -> list[vm_manager_module.WorkspacePublishedPortRecord]:
del kwargs
raise RuntimeError("proxy boom")
monkeypatch.setattr(manager, "_start_workspace_service_published_ports", _raise_proxy_failure)
started = manager.start_service(
workspace_id,
"web",
command="sh -lc 'touch .ready && while true; do sleep 60; done'",
readiness={"type": "file", "path": ".ready"},
published_ports=[{"guest_port": 8080, "host_port": 18080}],
)
assert started["state"] == "failed"
assert started["stop_reason"] == "published_port_failed"
assert started["published_ports"] == []
def test_workspace_service_cleanup_stops_published_port_proxies(
tmp_path: Path,
monkeypatch: pytest.MonkeyPatch,
) -> None:
manager = VmManager(
backend_name="mock",
base_dir=tmp_path / "vms",
network_manager=TapNetworkManager(enabled=False),
)
workspace_id = "workspace-cleanup"
service = vm_manager_module.WorkspaceServiceRecord(
workspace_id=workspace_id,
service_name="web",
command="sleep 60",
cwd="/workspace",
state="running",
pid=1234,
started_at=time.time(),
ended_at=None,
exit_code=None,
execution_mode="host_compat",
readiness=None,
ready_at=None,
stop_reason=None,
published_ports=[
vm_manager_module.WorkspacePublishedPortRecord(
guest_port=8080,
host_port=18080,
proxy_pid=9999,
)
],
)
manager._save_workspace_service_locked(service) # noqa: SLF001
stopped: list[int | None] = []
monkeypatch.setattr(
vm_manager_module,
"_stop_workspace_published_port_proxy",
lambda published_port: stopped.append(published_port.proxy_pid),
)
manager._delete_workspace_service_artifacts_locked(workspace_id, "web") # noqa: SLF001
assert stopped == [9999]
assert not manager._workspace_service_record_path(workspace_id, "web").exists() # noqa: SLF001
def test_workspace_refresh_workspace_service_counts_stops_published_ports_when_stopped(
tmp_path: Path,
monkeypatch: pytest.MonkeyPatch,
) -> None:
manager = VmManager(
backend_name="mock",
base_dir=tmp_path / "vms",
network_manager=TapNetworkManager(enabled=False),
)
workspace = vm_manager_module.WorkspaceRecord(
workspace_id="workspace-counts",
environment="debian:12-base",
vcpu_count=1,
mem_mib=1024,
ttl_seconds=600,
created_at=time.time(),
expires_at=time.time() + 600,
state="stopped",
firecracker_pid=None,
last_error=None,
allow_host_compat=True,
network_policy="off",
metadata={},
command_count=0,
last_command=None,
workspace_seed={
"mode": "empty",
"seed_path": None,
"destination": "/workspace",
"entry_count": 0,
"bytes_written": 0,
},
secrets=[],
reset_count=0,
last_reset_at=None,
)
service = vm_manager_module.WorkspaceServiceRecord(
workspace_id=workspace.workspace_id,
service_name="web",
command="sleep 60",
cwd="/workspace",
state="running",
pid=1234,
started_at=time.time(),
ended_at=None,
exit_code=None,
execution_mode="host_compat",
readiness=None,
ready_at=None,
stop_reason=None,
published_ports=[
vm_manager_module.WorkspacePublishedPortRecord(
guest_port=8080,
host_port=18080,
proxy_pid=9999,
)
],
)
manager._save_workspace_service_locked(service) # noqa: SLF001
stopped: list[int | None] = []
monkeypatch.setattr(
vm_manager_module,
"_stop_workspace_published_port_proxy",
lambda published_port: stopped.append(published_port.proxy_pid),
)
manager._refresh_workspace_service_counts_locked(workspace) # noqa: SLF001
assert stopped == [9999]
refreshed = manager._load_workspace_service_locked(workspace.workspace_id, "web") # noqa: SLF001
assert refreshed.state == "stopped"
assert refreshed.stop_reason == "workspace_stopped"
def test_workspace_published_port_proxy_helpers(
tmp_path: Path,
monkeypatch: pytest.MonkeyPatch,
) -> None:
services_dir = tmp_path / "services"
services_dir.mkdir(parents=True, exist_ok=True)
class StubProcess:
def __init__(self, pid: int, *, exited: bool = False) -> None:
self.pid = pid
self._exited = exited
def poll(self) -> int | None:
return 1 if self._exited else None
def _fake_popen(command: list[str], **kwargs: object) -> StubProcess:
del kwargs
ready_file = Path(command[command.index("--ready-file") + 1])
ready_file.write_text(
json.dumps(
{
"host": "127.0.0.1",
"host_port": 18080,
"target_host": "172.29.1.2",
"target_port": 8080,
"protocol": "tcp",
}
),
encoding="utf-8",
)
return StubProcess(4242)
monkeypatch.setattr(subprocess, "Popen", _fake_popen)
record = vm_manager_module._start_workspace_published_port_proxy( # noqa: SLF001
services_dir=services_dir,
service_name="web",
workspace_id="workspace-proxy",
guest_ip="172.29.1.2",
spec=vm_manager_module.WorkspacePublishedPortSpec(
guest_port=8080,
host_port=18080,
),
)
assert record.guest_port == 8080
assert record.host_port == 18080
assert record.proxy_pid == 4242
def test_workspace_published_port_proxy_timeout_and_stop(
tmp_path: Path,
monkeypatch: pytest.MonkeyPatch,
) -> None:
services_dir = tmp_path / "services"
services_dir.mkdir(parents=True, exist_ok=True)
class StubProcess:
pid = 4242
def poll(self) -> int | None:
return None
monkeypatch.setattr(subprocess, "Popen", lambda *args, **kwargs: StubProcess())
monotonic_values = iter([0.0, 0.0, 5.1])
monkeypatch.setattr(time, "monotonic", lambda: next(monotonic_values))
monkeypatch.setattr(time, "sleep", lambda _: None)
stopped: list[int | None] = []
monkeypatch.setattr(
vm_manager_module,
"_stop_workspace_published_port_proxy",
lambda published_port: stopped.append(published_port.proxy_pid),
)
with pytest.raises(RuntimeError, match="timed out waiting for published port proxy readiness"):
vm_manager_module._start_workspace_published_port_proxy( # noqa: SLF001
services_dir=services_dir,
service_name="web",
workspace_id="workspace-proxy",
guest_ip="172.29.1.2",
spec=vm_manager_module.WorkspacePublishedPortSpec(
guest_port=8080,
host_port=18080,
),
)
assert stopped == [4242]
def test_workspace_published_port_validation_and_stop_helper(
monkeypatch: pytest.MonkeyPatch,
) -> None:
spec = vm_manager_module._normalize_workspace_published_port( # noqa: SLF001
guest_port="8080",
host_port="18080",
)
assert spec.guest_port == 8080
assert spec.host_port == 18080
with pytest.raises(ValueError, match="published guest_port must be an integer"):
vm_manager_module._normalize_workspace_published_port(guest_port=object()) # noqa: SLF001
with pytest.raises(ValueError, match="published host_port must be between 1025 and 65535"):
vm_manager_module._normalize_workspace_published_port( # noqa: SLF001
guest_port=8080,
host_port=80,
)
signals: list[int] = []
monkeypatch.setattr(os, "killpg", lambda pid, sig: signals.append(sig))
running = iter([True, False])
monkeypatch.setattr(vm_manager_module, "_pid_is_running", lambda pid: next(running))
monkeypatch.setattr(time, "sleep", lambda _: None)
vm_manager_module._stop_workspace_published_port_proxy( # noqa: SLF001
vm_manager_module.WorkspacePublishedPortRecord(
guest_port=8080,
host_port=18080,
proxy_pid=9999,
)
)
assert signals == [signal.SIGTERM]
def test_workspace_network_policy_requires_guest_network_support(tmp_path: Path) -> None:
manager = VmManager(
backend_name="firecracker",
base_dir=tmp_path / "vms",
network_manager=TapNetworkManager(enabled=False),
)
manager._runtime_capabilities = RuntimeCapabilities( # noqa: SLF001
supports_vm_boot=False,
supports_guest_exec=False,
supports_guest_network=False,
reason="no guest network",
)
with pytest.raises(RuntimeError, match="workspace network_policy requires guest networking"):
manager._require_workspace_network_policy_support( # noqa: SLF001
network_policy="egress"
)
def test_prepare_workspace_seed_rejects_missing_and_invalid_paths(tmp_path: Path) -> None:
manager = VmManager(
backend_name="mock",
base_dir=tmp_path / "vms",
network_manager=TapNetworkManager(enabled=False),
)
with pytest.raises(ValueError, match="does not exist"):
manager._prepare_workspace_seed(tmp_path / "missing") # noqa: SLF001
invalid_source = tmp_path / "seed.txt"
invalid_source.write_text("seed", encoding="utf-8")
with pytest.raises(
ValueError,
match="seed_path must be a directory or a .tar/.tar.gz/.tgz archive",
):
manager._prepare_workspace_seed(invalid_source) # noqa: SLF001
def test_workspace_baseline_snapshot_requires_archive(tmp_path: Path) -> None:
manager = VmManager(
backend_name="mock",
base_dir=tmp_path / "vms",
network_manager=TapNetworkManager(enabled=False),
)
created = manager.create_workspace(
environment="debian:12",
vcpu_count=1,
mem_mib=512,
ttl_seconds=600,
allow_host_compat=True,
)
workspace_id = str(created["workspace_id"])
baseline_path = tmp_path / "vms" / "workspaces" / workspace_id / "baseline" / "workspace.tar"
baseline_path.unlink()
workspace = manager._load_workspace_locked(workspace_id) # noqa: SLF001
with pytest.raises(RuntimeError, match="baseline snapshot"):
manager._workspace_baseline_snapshot_locked(workspace) # noqa: SLF001
def test_workspace_snapshot_and_service_loaders_handle_invalid_payloads(tmp_path: Path) -> None:
manager = VmManager(
backend_name="mock",
base_dir=tmp_path / "vms",
network_manager=TapNetworkManager(enabled=False),
)
workspace_id = "workspace-invalid"
services_dir = tmp_path / "vms" / "workspaces" / workspace_id / "services"
snapshots_dir = tmp_path / "vms" / "workspaces" / workspace_id / "snapshots"
services_dir.mkdir(parents=True, exist_ok=True)
snapshots_dir.mkdir(parents=True, exist_ok=True)
(services_dir / "svc.json").write_text("[]", encoding="utf-8")
(snapshots_dir / "snap.json").write_text("[]", encoding="utf-8")
with pytest.raises(RuntimeError, match="service record"):
manager._load_workspace_service_locked(workspace_id, "svc") # noqa: SLF001
with pytest.raises(RuntimeError, match="snapshot record"):
manager._load_workspace_snapshot_locked(workspace_id, "snap") # noqa: SLF001
with pytest.raises(RuntimeError, match="snapshot record"):
manager._load_workspace_snapshot_locked_optional(workspace_id, "snap") # noqa: SLF001
assert manager._load_workspace_snapshot_locked_optional(workspace_id, "missing") is None # noqa: SLF001
def test_workspace_shell_helpers_handle_missing_invalid_and_close_errors(
tmp_path: Path,
monkeypatch: pytest.MonkeyPatch,
) -> None:
manager = VmManager(
backend_name="mock",
base_dir=tmp_path / "vms",
network_manager=TapNetworkManager(enabled=False),
)
created = manager.create_workspace(
environment="debian:12",
vcpu_count=1,
mem_mib=512,
ttl_seconds=600,
allow_host_compat=True,
)
workspace_id = str(created["workspace_id"])
assert manager._list_workspace_shells_locked(workspace_id) == [] # noqa: SLF001
shells_dir = tmp_path / "vms" / "workspaces" / workspace_id / "shells"
shells_dir.mkdir(parents=True, exist_ok=True)
(shells_dir / "invalid.json").write_text("[]", encoding="utf-8")
assert manager._list_workspace_shells_locked(workspace_id) == [] # noqa: SLF001
shell = vm_manager_module.WorkspaceShellRecord(
workspace_id=workspace_id,
shell_id="shell-1",
cwd="/workspace",
cols=120,
rows=30,
state="running",
started_at=time.time(),
)
manager._save_workspace_shell_locked(shell) # noqa: SLF001
workspace = manager._load_workspace_locked(workspace_id) # noqa: SLF001
instance = workspace.to_instance(
workdir=tmp_path / "vms" / "workspaces" / workspace_id / "runtime"
)
def _raise_close(**kwargs: object) -> dict[str, object]:
del kwargs
raise RuntimeError("shell close boom")
monkeypatch.setattr(manager._backend, "close_shell", _raise_close)
manager._close_workspace_shells_locked(workspace, instance) # noqa: SLF001
assert manager._list_workspace_shells_locked(workspace_id) == [] # noqa: SLF001
def test_workspace_refresh_service_helpers_cover_exit_and_started_refresh(
tmp_path: Path,
monkeypatch: pytest.MonkeyPatch,
) -> None:
manager = VmManager(
backend_name="mock",
base_dir=tmp_path / "vms",
network_manager=TapNetworkManager(enabled=False),
)
created = manager.create_workspace(
environment="debian:12",
vcpu_count=1,
mem_mib=512,
ttl_seconds=600,
allow_host_compat=True,
)
workspace_id = str(created["workspace_id"])
workspace_path = tmp_path / "vms" / "workspaces" / workspace_id / "workspace.json"
payload = json.loads(workspace_path.read_text(encoding="utf-8"))
payload["state"] = "started"
payload["network"] = {
"vm_id": workspace_id,
"tap_name": "tap-test0",
"guest_ip": "172.29.1.2",
"gateway_ip": "172.29.1.1",
"subnet_cidr": "172.29.1.0/30",
"mac_address": "06:00:ac:1d:01:02",
"dns_servers": ["1.1.1.1"],
}
workspace_path.write_text(json.dumps(payload, indent=2, sort_keys=True), encoding="utf-8")
workspace = manager._load_workspace_locked(workspace_id) # noqa: SLF001
instance = workspace.to_instance(
workdir=tmp_path / "vms" / "workspaces" / workspace_id / "runtime"
)
service = vm_manager_module.WorkspaceServiceRecord(
workspace_id=workspace_id,
service_name="web",
command="sleep 60",
cwd="/workspace",
state="running",
started_at=time.time(),
execution_mode="guest_vsock",
published_ports=[
vm_manager_module.WorkspacePublishedPortRecord(
guest_port=8080,
host_port=18080,
proxy_pid=9999,
)
],
)
manager._save_workspace_service_locked(service) # noqa: SLF001
stopped: list[int | None] = []
monkeypatch.setattr(
vm_manager_module,
"_stop_workspace_published_port_proxy",
lambda published_port: stopped.append(published_port.proxy_pid),
)
monkeypatch.setattr(
manager._backend,
"status_service",
lambda *args, **kwargs: {
"service_name": "web",
"command": "sleep 60",
"cwd": "/workspace",
"state": "exited",
"started_at": service.started_at,
"ended_at": service.started_at + 1,
"exit_code": 0,
"execution_mode": "guest_vsock",
},
)
refreshed = manager._refresh_workspace_service_locked( # noqa: SLF001
workspace,
instance,
service,
)
assert refreshed.state == "exited"
assert refreshed.published_ports == [
vm_manager_module.WorkspacePublishedPortRecord(
guest_port=8080,
host_port=18080,
proxy_pid=None,
)
]
assert stopped == [9999]
manager._save_workspace_service_locked(service) # noqa: SLF001
refreshed_calls: list[str] = []
monkeypatch.setattr(manager, "_require_workspace_service_support", lambda instance: None)
def _refresh_services(
workspace: vm_manager_module.WorkspaceRecord,
instance: vm_manager_module.VmInstance,
) -> list[vm_manager_module.WorkspaceServiceRecord]:
del instance
refreshed_calls.append(workspace.workspace_id)
return []
monkeypatch.setattr(
manager,
"_refresh_workspace_services_locked",
_refresh_services,
)
manager._refresh_workspace_service_counts_locked(workspace) # noqa: SLF001
assert refreshed_calls == [workspace_id]
def test_workspace_start_published_ports_cleans_up_partial_failure(
tmp_path: Path,
monkeypatch: pytest.MonkeyPatch,
) -> None:
manager = VmManager(
backend_name="mock",
base_dir=tmp_path / "vms",
network_manager=TapNetworkManager(enabled=False),
)
created = manager.create_workspace(
environment="debian:12",
vcpu_count=1,
mem_mib=512,
ttl_seconds=600,
allow_host_compat=True,
)
workspace = manager._load_workspace_locked(str(created["workspace_id"])) # noqa: SLF001
service = vm_manager_module.WorkspaceServiceRecord(
workspace_id=workspace.workspace_id,
service_name="web",
command="sleep 60",
cwd="/workspace",
state="running",
started_at=time.time(),
execution_mode="guest_vsock",
)
started_record = vm_manager_module.WorkspacePublishedPortRecord(
guest_port=8080,
host_port=18080,
proxy_pid=9999,
)
calls: list[int] = []
def _start_proxy(**kwargs: object) -> vm_manager_module.WorkspacePublishedPortRecord:
spec = cast(vm_manager_module.WorkspacePublishedPortSpec, kwargs["spec"])
if spec.guest_port == 8080:
return started_record
raise RuntimeError("proxy boom")
monkeypatch.setattr(vm_manager_module, "_start_workspace_published_port_proxy", _start_proxy)
monkeypatch.setattr(
vm_manager_module,
"_stop_workspace_published_port_proxy",
lambda published_port: calls.append(published_port.proxy_pid or -1),
)
with pytest.raises(RuntimeError, match="proxy boom"):
manager._start_workspace_service_published_ports( # noqa: SLF001
workspace=workspace,
service=service,
guest_ip="172.29.1.2",
published_ports=[
vm_manager_module.WorkspacePublishedPortSpec(guest_port=8080),
vm_manager_module.WorkspacePublishedPortSpec(guest_port=9090),
],
)
assert calls == [9999]
def test_workspace_service_start_replaces_non_running_record(tmp_path: Path) -> None:
manager = VmManager(
backend_name="mock",