# pyro-mcp/src/pyro_mcp/vm_manager.py
#
# Commit 48b82d8386 (Thales Maciel): "Pivot persistent APIs to workspaces"
#
# Replace the public persistent-sandbox contract with workspace-first naming
# across CLI, SDK, MCP, payloads, and on-disk state.
#
# Rename the task surface to workspace equivalents, switch create-time seeding
# to `seed_path`, and store records under `workspaces/<workspace_id>/workspace.json`
# without carrying legacy task aliases or migrating old local task state.
#
# Keep `pyro run` and `vm_*` unchanged. Validation covered `uv lock`, focused
# public-contract/API/CLI/manager tests, `UV_CACHE_DIR=.uv-cache make check`,
# and `UV_CACHE_DIR=.uv-cache make dist-check`. (2026-03-12 01:24:01 -03:00)
#
# (File stats from the original listing: 1706 lines, 68 KiB, Python.)
"""Lifecycle manager for ephemeral VM environments and persistent workspaces."""
from __future__ import annotations
import json
import os
import shlex
import shutil
import signal
import subprocess
import tarfile
import tempfile
import threading
import time
import uuid
from dataclasses import dataclass, field
from pathlib import Path, PurePosixPath
from typing import Any, Literal, cast
from pyro_mcp.runtime import (
RuntimeCapabilities,
RuntimePaths,
resolve_runtime_paths,
runtime_capabilities,
)
from pyro_mcp.vm_environments import EnvironmentStore, default_cache_dir, get_environment
from pyro_mcp.vm_firecracker import build_launch_plan
from pyro_mcp.vm_guest import VsockExecClient
from pyro_mcp.vm_network import NetworkConfig, TapNetworkManager
# Lifecycle states shared by ephemeral VMs and persistent workspace records.
VmState = Literal["created", "started", "stopped"]

# Defaults applied when callers omit sizing/lifetime knobs.
DEFAULT_VCPU_COUNT = 1
DEFAULT_MEM_MIB = 1024
DEFAULT_TIMEOUT_SECONDS = 30
DEFAULT_TTL_SECONDS = 600
DEFAULT_ALLOW_HOST_COMPAT = False

# On-disk workspace layout; bump WORKSPACE_LAYOUT_VERSION when the persisted
# schema changes (stamped into every serialized WorkspaceRecord payload).
WORKSPACE_LAYOUT_VERSION = 2
WORKSPACE_DIRNAME = "workspace"
WORKSPACE_COMMANDS_DIRNAME = "commands"
WORKSPACE_RUNTIME_DIRNAME = "runtime"
# Guest-side workspace root; host-compat code mirrors paths under it.
WORKSPACE_GUEST_PATH = "/workspace"
WORKSPACE_GUEST_AGENT_PATH = "/opt/pyro/bin/pyro_guest_agent.py"
WORKSPACE_ARCHIVE_UPLOAD_TIMEOUT_SECONDS = 60

# How a workspace was seeded at create time.
WorkspaceSeedMode = Literal["empty", "directory", "tar_archive"]
@dataclass
class VmInstance:
    """In-memory VM lifecycle record."""

    vm_id: str
    environment: str
    vcpu_count: int
    mem_mib: int
    ttl_seconds: int
    created_at: float  # epoch seconds (time.time())
    expires_at: float  # created_at + ttl_seconds
    workdir: Path  # per-VM scratch directory on the host
    state: VmState = "created"
    network_requested: bool = False
    allow_host_compat: bool = DEFAULT_ALLOW_HOST_COMPAT
    firecracker_pid: int | None = None  # set while a VMM process is running
    last_error: str | None = None
    metadata: dict[str, str] = field(default_factory=dict)
    network: NetworkConfig | None = None  # populated when networking was allocated
@dataclass
class WorkspaceRecord:
    """Persistent workspace metadata stored on disk."""

    workspace_id: str
    environment: str
    vcpu_count: int
    mem_mib: int
    ttl_seconds: int
    created_at: float  # epoch seconds
    expires_at: float
    state: VmState
    network_requested: bool
    allow_host_compat: bool
    firecracker_pid: int | None = None
    last_error: str | None = None
    metadata: dict[str, str] = field(default_factory=dict)
    network: NetworkConfig | None = None
    command_count: int = 0  # commands recorded against this workspace
    last_command: dict[str, Any] | None = None
    workspace_seed: dict[str, Any] = field(default_factory=dict)

    @classmethod
    def from_instance(
        cls,
        instance: VmInstance,
        *,
        command_count: int = 0,
        last_command: dict[str, Any] | None = None,
        workspace_seed: dict[str, Any] | None = None,
    ) -> WorkspaceRecord:
        """Snapshot a live VmInstance into a persistable workspace record."""
        return cls(
            workspace_id=instance.vm_id,
            environment=instance.environment,
            vcpu_count=instance.vcpu_count,
            mem_mib=instance.mem_mib,
            ttl_seconds=instance.ttl_seconds,
            created_at=instance.created_at,
            expires_at=instance.expires_at,
            state=instance.state,
            network_requested=instance.network_requested,
            allow_host_compat=instance.allow_host_compat,
            firecracker_pid=instance.firecracker_pid,
            last_error=instance.last_error,
            metadata=dict(instance.metadata),
            network=instance.network,
            command_count=command_count,
            last_command=last_command,
            # Copy so later mutation of the caller's dict cannot leak in.
            workspace_seed=dict(workspace_seed or _empty_workspace_seed_payload()),
        )

    def to_instance(self, *, workdir: Path) -> VmInstance:
        """Rehydrate an in-memory VmInstance rooted at *workdir*.

        Command-history fields (command_count/last_command/workspace_seed) are
        intentionally not carried onto the instance.
        """
        return VmInstance(
            vm_id=self.workspace_id,
            environment=self.environment,
            vcpu_count=self.vcpu_count,
            mem_mib=self.mem_mib,
            ttl_seconds=self.ttl_seconds,
            created_at=self.created_at,
            expires_at=self.expires_at,
            workdir=workdir,
            state=self.state,
            network_requested=self.network_requested,
            allow_host_compat=self.allow_host_compat,
            firecracker_pid=self.firecracker_pid,
            last_error=self.last_error,
            metadata=dict(self.metadata),
            network=self.network,
        )

    def to_payload(self) -> dict[str, Any]:
        """Serialize to a JSON-safe dict, stamped with WORKSPACE_LAYOUT_VERSION."""
        return {
            "layout_version": WORKSPACE_LAYOUT_VERSION,
            "workspace_id": self.workspace_id,
            "environment": self.environment,
            "vcpu_count": self.vcpu_count,
            "mem_mib": self.mem_mib,
            "ttl_seconds": self.ttl_seconds,
            "created_at": self.created_at,
            "expires_at": self.expires_at,
            "state": self.state,
            "network_requested": self.network_requested,
            "allow_host_compat": self.allow_host_compat,
            "firecracker_pid": self.firecracker_pid,
            "last_error": self.last_error,
            "metadata": self.metadata,
            "network": _serialize_network(self.network),
            "command_count": self.command_count,
            "last_command": self.last_command,
            "workspace_seed": self.workspace_seed,
        }

    @classmethod
    def from_payload(cls, payload: dict[str, Any]) -> WorkspaceRecord:
        """Rebuild a record from a persisted dict; optional fields default safely."""
        return cls(
            workspace_id=str(payload["workspace_id"]),
            environment=str(payload["environment"]),
            vcpu_count=int(payload["vcpu_count"]),
            mem_mib=int(payload["mem_mib"]),
            ttl_seconds=int(payload["ttl_seconds"]),
            created_at=float(payload["created_at"]),
            expires_at=float(payload["expires_at"]),
            # Unknown/missing state degrades to "stopped" rather than failing.
            state=cast(VmState, str(payload.get("state", "stopped"))),
            network_requested=bool(payload.get("network_requested", False)),
            allow_host_compat=bool(payload.get("allow_host_compat", DEFAULT_ALLOW_HOST_COMPAT)),
            firecracker_pid=_optional_int(payload.get("firecracker_pid")),
            last_error=_optional_str(payload.get("last_error")),
            metadata=_string_dict(payload.get("metadata")),
            network=_deserialize_network(payload.get("network")),
            command_count=int(payload.get("command_count", 0)),
            last_command=_optional_dict(payload.get("last_command")),
            workspace_seed=_workspace_seed_dict(payload.get("workspace_seed")),
        )
@dataclass(frozen=True)
class PreparedWorkspaceSeed:
    """Prepared host-side seed archive plus metadata."""

    mode: WorkspaceSeedMode
    source_path: str | None  # caller-supplied seed path, if any
    archive_path: Path | None = None  # staged tar archive on the host
    entry_count: int = 0
    bytes_written: int = 0
    cleanup_dir: Path | None = None  # staging dir removed by cleanup()

    def to_payload(
        self,
        *,
        destination: str = WORKSPACE_GUEST_PATH,
        path_key: str = "seed_path",
    ) -> dict[str, Any]:
        """Describe the seed as a JSON-safe dict; *path_key* names the path field."""
        return {
            "mode": self.mode,
            path_key: self.source_path,
            "destination": destination,
            "entry_count": self.entry_count,
            "bytes_written": self.bytes_written,
        }

    def cleanup(self) -> None:
        """Remove the staging directory, if any, ignoring filesystem errors."""
        if self.cleanup_dir is not None:
            shutil.rmtree(self.cleanup_dir, ignore_errors=True)
@dataclass(frozen=True)
class VmExecResult:
    """Command execution output."""

    stdout: str
    stderr: str
    exit_code: int  # 124 indicates a timeout (see _run_host_command)
    duration_ms: int  # wall-clock duration of the command
def _optional_int(value: object) -> int | None:
if value is None:
return None
if isinstance(value, bool):
return int(value)
if isinstance(value, int):
return value
if isinstance(value, float):
return int(value)
if isinstance(value, str):
return int(value)
raise TypeError("expected integer-compatible payload")
def _optional_str(value: object) -> str | None:
if value is None:
return None
return str(value)
def _optional_dict(value: object) -> dict[str, Any] | None:
if value is None:
return None
if not isinstance(value, dict):
raise TypeError("expected dictionary payload")
return dict(value)
def _string_dict(value: object) -> dict[str, str]:
if not isinstance(value, dict):
return {}
return {str(key): str(item) for key, item in value.items()}
def _empty_workspace_seed_payload() -> dict[str, Any]:
    """Default seed payload for a workspace created with no seed data."""
    return dict(
        mode="empty",
        seed_path=None,
        destination=WORKSPACE_GUEST_PATH,
        entry_count=0,
        bytes_written=0,
    )
def _workspace_seed_dict(value: object) -> dict[str, Any]:
    """Normalize a persisted seed payload, filling defaults per field."""
    normalized = _empty_workspace_seed_payload()
    if not isinstance(value, dict):
        return normalized
    normalized["mode"] = str(value.get("mode", normalized["mode"]))
    normalized["seed_path"] = _optional_str(value.get("seed_path"))
    normalized["destination"] = str(value.get("destination", normalized["destination"]))
    normalized["entry_count"] = int(value.get("entry_count", normalized["entry_count"]))
    normalized["bytes_written"] = int(value.get("bytes_written", normalized["bytes_written"]))
    return normalized
def _serialize_network(network: NetworkConfig | None) -> dict[str, Any] | None:
if network is None:
return None
return {
"vm_id": network.vm_id,
"tap_name": network.tap_name,
"guest_ip": network.guest_ip,
"gateway_ip": network.gateway_ip,
"subnet_cidr": network.subnet_cidr,
"mac_address": network.mac_address,
"dns_servers": list(network.dns_servers),
}
def _deserialize_network(payload: object) -> NetworkConfig | None:
if payload is None:
return None
if not isinstance(payload, dict):
raise TypeError("expected dictionary payload")
dns_servers = payload.get("dns_servers", [])
dns_values = tuple(str(item) for item in dns_servers) if isinstance(dns_servers, list) else ()
return NetworkConfig(
vm_id=str(payload["vm_id"]),
tap_name=str(payload["tap_name"]),
guest_ip=str(payload["guest_ip"]),
gateway_ip=str(payload["gateway_ip"]),
subnet_cidr=str(payload["subnet_cidr"]),
mac_address=str(payload["mac_address"]),
dns_servers=dns_values,
)
def _run_host_command(workdir: Path, command: str, timeout_seconds: int) -> VmExecResult:
    """Run *command* via ``bash -lc`` in *workdir* with a minimal environment.

    A timeout yields exit code 124 (the conventional shell timeout status)
    instead of raising.
    """
    start = time.monotonic()
    minimal_env = {"PATH": os.environ.get("PATH", ""), "HOME": str(workdir)}

    def elapsed_ms() -> int:
        return int((time.monotonic() - start) * 1000)

    try:
        completed = subprocess.run(  # noqa: S603
            ["bash", "-lc", command],  # noqa: S607
            cwd=workdir,
            env=minimal_env,
            text=True,
            capture_output=True,
            timeout=timeout_seconds,
            check=False,
        )
    except subprocess.TimeoutExpired:
        return VmExecResult(
            stdout="",
            stderr=f"command timed out after {timeout_seconds}s",
            exit_code=124,
            duration_ms=elapsed_ms(),
        )
    return VmExecResult(
        stdout=completed.stdout,
        stderr=completed.stderr,
        exit_code=completed.returncode,
        duration_ms=elapsed_ms(),
    )
def _copy_rootfs(source: Path, dest: Path) -> str:
dest.parent.mkdir(parents=True, exist_ok=True)
try:
proc = subprocess.run( # noqa: S603
["cp", "--reflink=auto", str(source), str(dest)],
text=True,
capture_output=True,
check=False,
)
if proc.returncode == 0:
return "reflink_or_copy"
except OSError:
pass
shutil.copy2(source, dest)
return "copy2"
def _wrap_guest_command(command: str, *, cwd: str | None = None) -> str:
if cwd is None:
return command
quoted_cwd = shlex.quote(cwd)
return f"mkdir -p {quoted_cwd} && cd {quoted_cwd} && {command}"
def _is_supported_seed_archive(path: Path) -> bool:
name = path.name.lower()
return name.endswith(".tar") or name.endswith(".tar.gz") or name.endswith(".tgz")
def _normalize_workspace_destination(destination: str) -> tuple[str, PurePosixPath]:
    """Validate and normalize a guest-side destination path.

    Returns ``(absolute_guest_path, path_relative_to_workspace_root)``.
    Raises ValueError when the path is empty, contains ``..``, or resolves
    outside ``/workspace``.
    """
    candidate = destination.strip()
    if candidate == "":
        raise ValueError("workspace destination must not be empty")
    destination_path = PurePosixPath(candidate)
    # Reject traversal segments up front, before any normalization.
    if any(part == ".." for part in destination_path.parts):
        raise ValueError("workspace destination must stay inside /workspace")
    workspace_root = PurePosixPath(WORKSPACE_GUEST_PATH)
    # Relative destinations are interpreted relative to the workspace root.
    if not destination_path.is_absolute():
        destination_path = workspace_root / destination_path
    # Drop empty and "." segments, then re-anchor at the filesystem root.
    parts = [part for part in destination_path.parts if part not in {"", "."}]
    normalized = PurePosixPath("/") / PurePosixPath(*parts)
    if normalized == PurePosixPath("/"):
        raise ValueError("workspace destination must stay inside /workspace")
    # Prefix check: the normalized path must live under /workspace.
    if normalized.parts[: len(workspace_root.parts)] != workspace_root.parts:
        raise ValueError("workspace destination must stay inside /workspace")
    suffix = normalized.relative_to(workspace_root)
    return str(normalized), suffix
def _workspace_host_destination(workspace_dir: Path, destination: str) -> Path:
    """Translate a guest destination into the matching host workspace path."""
    _, relative = _normalize_workspace_destination(destination)
    if str(relative) not in {"", "."}:
        workspace_dir = workspace_dir.joinpath(*relative.parts)
    return workspace_dir
def _normalize_archive_member_name(name: str) -> PurePosixPath:
candidate = name.strip()
if candidate == "":
raise RuntimeError("archive member path is empty")
member_path = PurePosixPath(candidate)
if member_path.is_absolute():
raise RuntimeError(f"absolute archive member paths are not allowed: {name}")
parts = [part for part in member_path.parts if part not in {"", "."}]
if any(part == ".." for part in parts):
raise RuntimeError(f"unsafe archive member path: {name}")
normalized = PurePosixPath(*parts)
if str(normalized) in {"", "."}:
raise RuntimeError(f"unsafe archive member path: {name}")
return normalized
def _validate_archive_symlink_target(member_name: PurePosixPath, link_target: str) -> None:
target = link_target.strip()
if target == "":
raise RuntimeError(f"symlink {member_name} has an empty target")
link_path = PurePosixPath(target)
if link_path.is_absolute():
raise RuntimeError(f"symlink {member_name} escapes the workspace")
combined = member_name.parent.joinpath(link_path)
parts = [part for part in combined.parts if part not in {"", "."}]
if any(part == ".." for part in parts):
raise RuntimeError(f"symlink {member_name} escapes the workspace")
def _inspect_seed_archive(archive_path: Path) -> tuple[int, int]:
    """Count members and total file bytes in a seed archive, validating safety.

    Raises RuntimeError for unsafe member names, escaping symlink targets,
    hard links, or any unsupported member type.
    """
    member_total = 0
    file_bytes = 0
    with tarfile.open(archive_path, "r:*") as archive:
        for member in archive.getmembers():
            safe_name = _normalize_archive_member_name(member.name)
            member_total += 1
            if member.isdir():
                continue
            if member.isfile():
                file_bytes += member.size
            elif member.issym():
                _validate_archive_symlink_target(safe_name, member.linkname)
            elif member.islnk():
                raise RuntimeError(
                    f"hard links are not allowed in workspace archives: {member.name}"
                )
            else:
                raise RuntimeError(f"unsupported archive member type: {member.name}")
    return member_total, file_bytes
def _write_directory_seed_archive(source_dir: Path, archive_path: Path) -> None:
archive_path.parent.mkdir(parents=True, exist_ok=True)
with tarfile.open(archive_path, "w") as archive:
for child in sorted(source_dir.iterdir(), key=lambda item: item.name):
archive.add(child, arcname=child.name, recursive=True)
def _extract_seed_archive_to_host_workspace(
    archive_path: Path,
    *,
    workspace_dir: Path,
    destination: str,
) -> dict[str, Any]:
    """Safely extract a seed tar into the host-side workspace mirror.

    Members are validated one at a time (no ``extractall``): absolute paths,
    ``..`` segments, hard links, escaping symlink targets, and traversal
    through symlinked parent directories all raise RuntimeError. Returns a
    summary dict with destination, entry_count, and bytes_written.
    """
    normalized_destination, _ = _normalize_workspace_destination(destination)
    destination_root = _workspace_host_destination(workspace_dir, normalized_destination)
    destination_root.mkdir(parents=True, exist_ok=True)
    entry_count = 0
    bytes_written = 0
    with tarfile.open(archive_path, "r:*") as archive:
        for member in archive.getmembers():
            # Rejects absolute/".."/empty names before any path is built.
            member_name = _normalize_archive_member_name(member.name)
            target_path = destination_root.joinpath(*member_name.parts)
            entry_count += 1
            # No intermediate directory may be a symlink (blocks link-swap escapes).
            _ensure_no_symlink_parents(workspace_dir, target_path, member.name)
            if member.isdir():
                if target_path.is_symlink() or (target_path.exists() and not target_path.is_dir()):
                    raise RuntimeError(f"directory conflicts with existing path: {member.name}")
                target_path.mkdir(parents=True, exist_ok=True)
                continue
            if member.isfile():
                target_path.parent.mkdir(parents=True, exist_ok=True)
                if target_path.is_symlink() or target_path.is_dir():
                    raise RuntimeError(f"file conflicts with existing path: {member.name}")
                source = archive.extractfile(member)
                if source is None:
                    raise RuntimeError(f"failed to read archive member: {member.name}")
                with target_path.open("wb") as handle:
                    shutil.copyfileobj(source, handle)
                bytes_written += member.size
                continue
            if member.issym():
                _validate_archive_symlink_target(member_name, member.linkname)
                target_path.parent.mkdir(parents=True, exist_ok=True)
                if target_path.exists() and not target_path.is_symlink():
                    raise RuntimeError(f"symlink conflicts with existing path: {member.name}")
                # Replace a pre-existing symlink rather than failing on it.
                if target_path.is_symlink():
                    target_path.unlink()
                os.symlink(member.linkname, target_path)
                continue
            if member.islnk():
                raise RuntimeError(
                    f"hard links are not allowed in workspace archives: {member.name}"
                )
            raise RuntimeError(f"unsupported archive member type: {member.name}")
    return {
        "destination": normalized_destination,
        "entry_count": entry_count,
        "bytes_written": bytes_written,
    }
def _instance_workspace_host_dir(instance: VmInstance) -> Path:
raw_value = instance.metadata.get("workspace_host_dir")
if raw_value is None or raw_value == "":
raise RuntimeError("workspace host directory is unavailable")
return Path(raw_value)
def _patch_rootfs_guest_agent(rootfs_image: Path, guest_agent_path: Path) -> None:
    """Write the guest-agent script into an ext4 rootfs image via debugfs.

    Any existing agent file is removed first (best-effort) so the subsequent
    ``write`` starts clean. Raises RuntimeError when debugfs is missing or
    the write command fails.
    """
    debugfs_path = shutil.which("debugfs")
    if debugfs_path is None:
        raise RuntimeError(
            "debugfs is required to seed workspaces on guest-backed runtimes"
        )
    with tempfile.TemporaryDirectory(prefix="pyro-guest-agent-") as temp_dir:
        # Stage a private copy so debugfs reads a stable, short-lived host path.
        staged_agent_path = Path(temp_dir) / "pyro_guest_agent.py"
        shutil.copy2(guest_agent_path, staged_agent_path)
        # Best-effort removal of the previous agent; failure is ignored (check=False).
        subprocess.run(  # noqa: S603
            [debugfs_path, "-w", "-R", f"rm {WORKSPACE_GUEST_AGENT_PATH}", str(rootfs_image)],
            text=True,
            capture_output=True,
            check=False,
        )
        proc = subprocess.run(  # noqa: S603
            [
                debugfs_path,
                "-w",
                "-R",
                f"write {staged_agent_path} {WORKSPACE_GUEST_AGENT_PATH}",
                str(rootfs_image),
            ],
            text=True,
            capture_output=True,
            check=False,
        )
        if proc.returncode != 0:
            raise RuntimeError(
                "failed to patch guest agent into workspace rootfs: "
                f"{proc.stderr.strip() or proc.stdout.strip()}"
            )
def _ensure_no_symlink_parents(root: Path, target_path: Path, member_name: str) -> None:
relative_path = target_path.relative_to(root)
current = root
for part in relative_path.parts[:-1]:
current = current / part
if current.is_symlink():
raise RuntimeError(
f"archive member would traverse through a symlinked path: {member_name}"
)
def _pid_is_running(pid: int | None) -> bool:
if pid is None:
return False
try:
os.kill(pid, 0)
except ProcessLookupError:
return False
except PermissionError:
return True
return True
class VmBackend:
    """Backend interface for lifecycle operations."""

    def create(self, instance: VmInstance) -> None:  # pragma: no cover
        """Allocate on-disk/host resources for a freshly created instance."""
        raise NotImplementedError

    def start(self, instance: VmInstance) -> None:  # pragma: no cover
        """Boot (or mark as started) the instance."""
        raise NotImplementedError

    def exec(  # pragma: no cover
        self,
        instance: VmInstance,
        command: str,
        timeout_seconds: int,
        *,
        workdir: Path | None = None,
    ) -> VmExecResult:
        """Run *command* for the instance; *workdir* overrides the default cwd."""
        raise NotImplementedError

    def stop(self, instance: VmInstance) -> None:  # pragma: no cover
        """Stop whatever process backs the instance."""
        raise NotImplementedError

    def delete(self, instance: VmInstance) -> None:  # pragma: no cover
        """Release every resource owned by the instance."""
        raise NotImplementedError

    def import_archive(  # pragma: no cover
        self,
        instance: VmInstance,
        *,
        archive_path: Path,
        destination: str,
    ) -> dict[str, Any]:
        """Import a tar archive into the instance's workspace at *destination*."""
        raise NotImplementedError
class MockBackend(VmBackend):
    """Host-process backend used for development and testability."""

    def create(self, instance: VmInstance) -> None:
        # exist_ok=False: a colliding workdir indicates a duplicated vm_id.
        instance.workdir.mkdir(parents=True, exist_ok=False)

    def start(self, instance: VmInstance) -> None:
        # Drop a marker file so tests can observe the transition.
        (instance.workdir / ".started").write_text("started\n", encoding="utf-8")

    def exec(
        self,
        instance: VmInstance,
        command: str,
        timeout_seconds: int,
        *,
        workdir: Path | None = None,
    ) -> VmExecResult:
        target_dir = workdir if workdir is not None else instance.workdir
        return _run_host_command(target_dir, command, timeout_seconds)

    def stop(self, instance: VmInstance) -> None:
        (instance.workdir / ".stopped").write_text("stopped\n", encoding="utf-8")

    def delete(self, instance: VmInstance) -> None:
        shutil.rmtree(instance.workdir, ignore_errors=True)

    def import_archive(
        self,
        instance: VmInstance,
        *,
        archive_path: Path,
        destination: str,
    ) -> dict[str, Any]:
        # Mirror the guest-side extraction on the host workspace directory.
        host_workspace = _instance_workspace_host_dir(instance)
        return _extract_seed_archive_to_host_workspace(
            archive_path,
            workspace_dir=host_workspace,
            destination=destination,
        )
class FirecrackerBackend(VmBackend):  # pragma: no cover
    """Host-gated backend that validates Firecracker prerequisites."""

    def __init__(
        self,
        environment_store: EnvironmentStore,
        firecracker_bin: Path,
        jailer_bin: Path,
        runtime_capabilities: RuntimeCapabilities,
        network_manager: TapNetworkManager | None = None,
        guest_exec_client: VsockExecClient | None = None,
    ) -> None:
        """Wire up stores/clients; fail fast when host prerequisites are missing."""
        self._environment_store = environment_store
        self._firecracker_bin = firecracker_bin
        self._jailer_bin = jailer_bin
        self._runtime_capabilities = runtime_capabilities
        self._network_manager = network_manager or TapNetworkManager()
        self._guest_exec_client = guest_exec_client or VsockExecClient()
        # Live VMM processes keyed by vm_id.
        self._processes: dict[str, subprocess.Popen[str]] = {}
        if not self._firecracker_bin.exists():
            raise RuntimeError(f"bundled firecracker binary not found at {self._firecracker_bin}")
        if not self._jailer_bin.exists():
            raise RuntimeError(f"bundled jailer binary not found at {self._jailer_bin}")
        if not Path("/dev/kvm").exists():
            raise RuntimeError("/dev/kvm is not available on this host")

    def create(self, instance: VmInstance) -> None:
        """Materialize the workdir: env artifacts, private rootfs clone, optional net."""
        instance.workdir.mkdir(parents=True, exist_ok=False)
        try:
            installed_environment = self._environment_store.ensure_installed(instance.environment)
            if (
                not installed_environment.kernel_image.exists()
                or not installed_environment.rootfs_image.exists()
            ):
                raise RuntimeError(
                    f"missing environment artifacts for {instance.environment}; expected "
                    f"{installed_environment.kernel_image} and {installed_environment.rootfs_image}"
                )
            instance.metadata["environment_version"] = installed_environment.version
            instance.metadata["environment_source"] = installed_environment.source
            if installed_environment.source_digest is not None:
                instance.metadata["environment_digest"] = installed_environment.source_digest
            instance.metadata["environment_install_dir"] = str(installed_environment.install_dir)
            instance.metadata["kernel_image"] = str(installed_environment.kernel_image)
            # Each VM gets a private rootfs copy so guest writes never touch the cache.
            rootfs_copy = instance.workdir / "rootfs.ext4"
            instance.metadata["rootfs_clone_mode"] = _copy_rootfs(
                installed_environment.rootfs_image,
                rootfs_copy,
            )
            instance.metadata["rootfs_image"] = str(rootfs_copy)
            if instance.network_requested:
                network = self._network_manager.allocate(instance.vm_id)
                instance.network = network
                instance.metadata.update(self._network_manager.to_metadata(network))
            else:
                instance.network = None
                instance.metadata["network_enabled"] = "false"
        except Exception:
            # Leave no partially-created workdir behind.
            shutil.rmtree(instance.workdir, ignore_errors=True)
            raise

    def start(self, instance: VmInstance) -> None:
        """Boot the microVM, or record shim/host-compat mode when boot is unsupported."""
        launch_plan = build_launch_plan(instance)
        instance.metadata["firecracker_config_path"] = str(launch_plan.config_path)
        instance.metadata["guest_network_path"] = str(launch_plan.guest_network_path)
        instance.metadata["guest_exec_path"] = str(launch_plan.guest_exec_path)
        instance.metadata["guest_cid"] = str(launch_plan.guest_cid)
        instance.metadata["guest_exec_port"] = str(launch_plan.vsock_port)
        instance.metadata["guest_exec_uds_path"] = str(instance.workdir / "vsock.sock")
        serial_log_path = instance.workdir / "serial.log"
        firecracker_log_path = instance.workdir / "firecracker.log"
        # Firecracker's --log-path expects the file to already exist.
        firecracker_log_path.touch()
        instance.metadata["serial_log_path"] = str(serial_log_path)
        instance.metadata["firecracker_log_path"] = str(firecracker_log_path)
        # Preflight: confirm the bundled binary actually runs on this host.
        proc = subprocess.run(  # noqa: S603
            [str(self._firecracker_bin), "--version"],
            text=True,
            capture_output=True,
            check=False,
        )
        if proc.returncode != 0:
            raise RuntimeError(f"firecracker startup preflight failed: {proc.stderr.strip()}")
        instance.metadata["firecracker_version"] = proc.stdout.strip()
        instance.metadata["jailer_path"] = str(self._jailer_bin)
        if not self._runtime_capabilities.supports_vm_boot:
            # Host cannot boot guests: commands will run on the host instead.
            instance.metadata["execution_mode"] = "host_compat"
            instance.metadata["boot_mode"] = "shim"
            if self._runtime_capabilities.reason is not None:
                instance.metadata["runtime_reason"] = self._runtime_capabilities.reason
            return
        with serial_log_path.open("w", encoding="utf-8") as serial_fp:
            process = subprocess.Popen(  # noqa: S603
                [
                    str(self._firecracker_bin),
                    "--no-api",
                    "--config-file",
                    str(launch_plan.config_path),
                    "--log-path",
                    str(firecracker_log_path),
                    "--level",
                    "Info",
                ],
                stdout=serial_fp,
                stderr=subprocess.STDOUT,
                text=True,
                start_new_session=True,
            )
        self._processes[instance.vm_id] = process
        # Give the VMM a moment to crash early so boot failures surface here.
        time.sleep(2)
        if process.poll() is not None:
            serial_log = serial_log_path.read_text(encoding="utf-8", errors="ignore")
            firecracker_log = firecracker_log_path.read_text(encoding="utf-8", errors="ignore")
            self._processes.pop(instance.vm_id, None)
            raise RuntimeError(
                "firecracker microVM exited during startup: "
                f"{(serial_log or firecracker_log).strip()}"
            )
        instance.firecracker_pid = process.pid
        instance.metadata["execution_mode"] = (
            "guest_vsock" if self._runtime_capabilities.supports_guest_exec else "guest_boot_only"
        )
        instance.metadata["boot_mode"] = "native"

    def exec(
        self,
        instance: VmInstance,
        command: str,
        timeout_seconds: int,
        *,
        workdir: Path | None = None,
    ) -> VmExecResult:
        """Run a command in the guest over vsock, falling back to host-compat exec."""
        if self._runtime_capabilities.supports_guest_exec:
            guest_cid = int(instance.metadata["guest_cid"])
            port = int(instance.metadata["guest_exec_port"])
            uds_path = instance.metadata.get("guest_exec_uds_path")
            # Retry until the guest agent's transport accepts the request,
            # bounded at 10s so the exec timeout still dominates.
            deadline = time.monotonic() + min(timeout_seconds, 10)
            while True:
                try:
                    response = self._guest_exec_client.exec(
                        guest_cid,
                        port,
                        command,
                        timeout_seconds,
                        uds_path=uds_path,
                    )
                    break
                except (OSError, RuntimeError) as exc:
                    if time.monotonic() >= deadline:
                        raise RuntimeError(
                            f"guest exec transport did not become ready: {exc}"
                        ) from exc
                    time.sleep(0.2)
            return VmExecResult(
                stdout=response.stdout,
                stderr=response.stderr,
                exit_code=response.exit_code,
                duration_ms=response.duration_ms,
            )
        # Guest exec unsupported: run on the host and record the downgrade.
        instance.metadata["execution_mode"] = "host_compat"
        return _run_host_command(workdir or instance.workdir, command, timeout_seconds)

    def stop(self, instance: VmInstance) -> None:
        """Terminate the VMM: SIGTERM with a 5s grace period, then SIGKILL."""
        process = self._processes.pop(instance.vm_id, None)
        if process is not None:
            process.terminate()
            try:
                process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                process.kill()
                process.wait(timeout=5)
            instance.firecracker_pid = None
            return
        # No tracked Popen handle; fall back to the recorded pid, if any.
        if instance.firecracker_pid is None:
            return
        try:
            os.kill(instance.firecracker_pid, signal.SIGTERM)
        except ProcessLookupError:
            instance.firecracker_pid = None
            return
        # Poll for up to 5 seconds before escalating to SIGKILL.
        deadline = time.monotonic() + 5
        while time.monotonic() < deadline:
            try:
                os.kill(instance.firecracker_pid, 0)
            except ProcessLookupError:
                instance.firecracker_pid = None
                return
            time.sleep(0.1)
        os.kill(instance.firecracker_pid, signal.SIGKILL)
        instance.firecracker_pid = None

    def delete(self, instance: VmInstance) -> None:
        """Drop the process handle, tear down networking, and remove the workdir."""
        self._processes.pop(instance.vm_id, None)
        if instance.network is not None:
            self._network_manager.cleanup(instance.network)
        shutil.rmtree(instance.workdir, ignore_errors=True)

    def import_archive(
        self,
        instance: VmInstance,
        *,
        archive_path: Path,
        destination: str,
    ) -> dict[str, Any]:
        """Upload a seed archive into the guest, or extract host-side in compat mode."""
        if self._runtime_capabilities.supports_guest_exec:
            guest_cid = int(instance.metadata["guest_cid"])
            port = int(instance.metadata["guest_exec_port"])
            uds_path = instance.metadata.get("guest_exec_uds_path")
            # Retry the upload for up to 10s while the transport comes up.
            deadline = time.monotonic() + 10
            while True:
                try:
                    response = self._guest_exec_client.upload_archive(
                        guest_cid,
                        port,
                        archive_path,
                        destination=destination,
                        timeout_seconds=WORKSPACE_ARCHIVE_UPLOAD_TIMEOUT_SECONDS,
                        uds_path=uds_path,
                    )
                    return {
                        "destination": response.destination,
                        "entry_count": response.entry_count,
                        "bytes_written": response.bytes_written,
                    }
                except (OSError, RuntimeError) as exc:
                    if time.monotonic() >= deadline:
                        raise RuntimeError(
                            f"guest archive transport did not become ready: {exc}"
                        ) from exc
                    time.sleep(0.2)
        instance.metadata["execution_mode"] = "host_compat"
        return _extract_seed_archive_to_host_workspace(
            archive_path,
            workspace_dir=_instance_workspace_host_dir(instance),
            destination=destination,
        )
class VmManager:
    """In-process lifecycle manager for ephemeral VM environments and workspaces."""

    # Bounds enforced on caller-supplied sizing/lifetime knobs.
    MIN_VCPUS = 1
    MAX_VCPUS = 8
    MIN_MEM_MIB = 256
    MAX_MEM_MIB = 32768
    MIN_TTL_SECONDS = 60
    MAX_TTL_SECONDS = 3600
    # Module defaults re-exported on the class for caller convenience.
    DEFAULT_VCPU_COUNT = DEFAULT_VCPU_COUNT
    DEFAULT_MEM_MIB = DEFAULT_MEM_MIB
    DEFAULT_TIMEOUT_SECONDS = DEFAULT_TIMEOUT_SECONDS
    DEFAULT_TTL_SECONDS = DEFAULT_TTL_SECONDS
    DEFAULT_ALLOW_HOST_COMPAT = DEFAULT_ALLOW_HOST_COMPAT
    def __init__(
        self,
        *,
        backend_name: str | None = None,
        base_dir: Path | None = None,
        cache_dir: Path | None = None,
        max_active_vms: int = 4,
        runtime_paths: RuntimePaths | None = None,
        network_manager: TapNetworkManager | None = None,
    ) -> None:
        """Resolve runtime capabilities, stores, and directories for the backend.

        backend_name defaults to "firecracker"; any other value is handled as
        a non-boot backend here and validated later in _build_backend.
        """
        self._backend_name = backend_name or "firecracker"
        self._base_dir = base_dir or Path("/tmp/pyro-mcp")
        self._workspaces_dir = self._base_dir / "workspaces"
        resolved_cache_dir = cache_dir or default_cache_dir()
        self._runtime_paths = runtime_paths
        if self._backend_name == "firecracker":
            # Real backend: resolve bundled artifacts (with checksum verification)
            # and probe what the host can actually do.
            self._runtime_paths = self._runtime_paths or resolve_runtime_paths()
            self._runtime_capabilities = runtime_capabilities(self._runtime_paths)
            self._environment_store = EnvironmentStore(
                runtime_paths=self._runtime_paths,
                cache_dir=resolved_cache_dir,
            )
        else:
            # Mock-style backend: fixed "no guest" capabilities, unverified paths.
            self._runtime_capabilities = RuntimeCapabilities(
                supports_vm_boot=False,
                supports_guest_exec=False,
                supports_guest_network=False,
                reason="mock backend does not boot a guest",
            )
            if self._runtime_paths is None:
                self._runtime_paths = resolve_runtime_paths(verify_checksums=False)
            self._environment_store = EnvironmentStore(
                runtime_paths=self._runtime_paths,
                cache_dir=resolved_cache_dir,
            )
        self._max_active_vms = max_active_vms
        if network_manager is not None:
            self._network_manager = network_manager
        elif self._backend_name == "firecracker":
            self._network_manager = TapNetworkManager(enabled=True)
        else:
            self._network_manager = TapNetworkManager(enabled=False)
        # Guards _instances and workspace bookkeeping across manager methods.
        self._lock = threading.Lock()
        self._instances: dict[str, VmInstance] = {}
        self._base_dir.mkdir(parents=True, exist_ok=True)
        self._workspaces_dir.mkdir(parents=True, exist_ok=True)
        self._backend = self._build_backend()
def _build_backend(self) -> VmBackend:
if self._backend_name == "mock":
return MockBackend()
if self._backend_name == "firecracker":
if self._runtime_paths is None:
raise RuntimeError("runtime paths were not initialized for firecracker backend")
return FirecrackerBackend(
self._environment_store,
firecracker_bin=self._runtime_paths.firecracker_bin,
jailer_bin=self._runtime_paths.jailer_bin,
runtime_capabilities=self._runtime_capabilities,
network_manager=self._network_manager,
)
raise ValueError("invalid backend; expected one of: mock, firecracker")
    def list_environments(self) -> list[dict[str, object]]:
        """Delegate to the environment store: list known environments."""
        return self._environment_store.list_environments()

    def pull_environment(self, environment: str) -> dict[str, object]:
        """Delegate to the environment store: pull *environment*."""
        return self._environment_store.pull_environment(environment)

    def inspect_environment(self, environment: str) -> dict[str, object]:
        """Delegate to the environment store: inspect *environment*."""
        return self._environment_store.inspect_environment(environment)

    def prune_environments(self) -> dict[str, object]:
        """Delegate to the environment store: prune cached environments."""
        return self._environment_store.prune_environments()
    def create_vm(
        self,
        *,
        environment: str,
        vcpu_count: int = DEFAULT_VCPU_COUNT,
        mem_mib: int = DEFAULT_MEM_MIB,
        ttl_seconds: int = DEFAULT_TTL_SECONDS,
        network: bool = False,
        allow_host_compat: bool = DEFAULT_ALLOW_HOST_COMPAT,
    ) -> dict[str, Any]:
        """Create a new ephemeral VM record plus its backend resources.

        Sizing is checked by _validate_limits; RuntimeError is raised when
        the active cap (ephemeral VMs + workspaces) is reached.
        """
        self._validate_limits(vcpu_count=vcpu_count, mem_mib=mem_mib, ttl_seconds=ttl_seconds)
        # Fail fast when the environment name is unknown.
        get_environment(environment, runtime_paths=self._runtime_paths)
        now = time.time()
        with self._lock:
            # Reap expired records first so they don't count against the cap.
            self._reap_expired_locked(now)
            self._reap_expired_workspaces_locked(now)
            active_count = len(self._instances) + self._count_workspaces_locked()
            if active_count >= self._max_active_vms:
                raise RuntimeError(
                    f"max active VMs reached ({self._max_active_vms}); delete old VMs first"
                )
            vm_id = uuid.uuid4().hex[:12]
            instance = VmInstance(
                vm_id=vm_id,
                environment=environment,
                vcpu_count=vcpu_count,
                mem_mib=mem_mib,
                ttl_seconds=ttl_seconds,
                created_at=now,
                expires_at=now + ttl_seconds,
                workdir=self._base_dir / vm_id,
                network_requested=network,
                allow_host_compat=allow_host_compat,
            )
            instance.metadata["allow_host_compat"] = str(allow_host_compat).lower()
            self._backend.create(instance)
            self._instances[vm_id] = instance
            return self._serialize(instance)
def run_vm(
self,
*,
environment: str,
command: str,
vcpu_count: int = DEFAULT_VCPU_COUNT,
mem_mib: int = DEFAULT_MEM_MIB,
timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
ttl_seconds: int = DEFAULT_TTL_SECONDS,
network: bool = False,
allow_host_compat: bool = DEFAULT_ALLOW_HOST_COMPAT,
) -> dict[str, Any]:
created = self.create_vm(
environment=environment,
vcpu_count=vcpu_count,
mem_mib=mem_mib,
ttl_seconds=ttl_seconds,
network=network,
allow_host_compat=allow_host_compat,
)
vm_id = str(created["vm_id"])
try:
self.start_vm(vm_id)
return self.exec_vm(vm_id, command=command, timeout_seconds=timeout_seconds)
except Exception:
try:
self.delete_vm(vm_id, reason="run_vm_error_cleanup")
except ValueError:
pass
raise
def start_vm(self, vm_id: str) -> dict[str, Any]:
with self._lock:
instance = self._get_instance_locked(vm_id)
self._ensure_not_expired_locked(instance, time.time())
self._start_instance_locked(instance)
return self._serialize(instance)
def exec_vm(self, vm_id: str, *, command: str, timeout_seconds: int) -> dict[str, Any]:
with self._lock:
instance = self._get_instance_locked(vm_id)
self._ensure_not_expired_locked(instance, time.time())
exec_instance = instance
exec_result, execution_mode = self._exec_instance(
exec_instance,
command=command,
timeout_seconds=timeout_seconds,
)
cleanup = self.delete_vm(vm_id, reason="post_exec_cleanup")
return {
"vm_id": vm_id,
"environment": exec_instance.environment,
"environment_version": exec_instance.metadata.get("environment_version"),
"command": command,
"stdout": exec_result.stdout,
"stderr": exec_result.stderr,
"exit_code": exec_result.exit_code,
"duration_ms": exec_result.duration_ms,
"execution_mode": execution_mode,
"cleanup": cleanup,
}
def stop_vm(self, vm_id: str) -> dict[str, Any]:
with self._lock:
instance = self._get_instance_locked(vm_id)
self._backend.stop(instance)
instance.state = "stopped"
return self._serialize(instance)
def delete_vm(self, vm_id: str, *, reason: str = "explicit_delete") -> dict[str, Any]:
with self._lock:
instance = self._get_instance_locked(vm_id)
if instance.state == "started":
self._backend.stop(instance)
instance.state = "stopped"
self._backend.delete(instance)
del self._instances[vm_id]
return {"vm_id": vm_id, "deleted": True, "reason": reason}
def status_vm(self, vm_id: str) -> dict[str, Any]:
with self._lock:
instance = self._get_instance_locked(vm_id)
self._ensure_not_expired_locked(instance, time.time())
return self._serialize(instance)
def network_info_vm(self, vm_id: str) -> dict[str, Any]:
with self._lock:
instance = self._get_instance_locked(vm_id)
self._ensure_not_expired_locked(instance, time.time())
if instance.network is None:
return {
"vm_id": vm_id,
"network_enabled": False,
"outbound_connectivity_expected": False,
"reason": "network configuration is unavailable for this VM",
}
return {"vm_id": vm_id, **self._network_manager.network_info(instance.network)}
def reap_expired(self) -> dict[str, Any]:
now = time.time()
with self._lock:
expired_vm_ids = [
vm_id for vm_id, inst in self._instances.items() if inst.expires_at <= now
]
for vm_id in expired_vm_ids:
instance = self._instances[vm_id]
if instance.state == "started":
self._backend.stop(instance)
instance.state = "stopped"
self._backend.delete(instance)
del self._instances[vm_id]
return {"deleted_vm_ids": expired_vm_ids, "count": len(expired_vm_ids)}
def create_workspace(
self,
*,
environment: str,
vcpu_count: int = DEFAULT_VCPU_COUNT,
mem_mib: int = DEFAULT_MEM_MIB,
ttl_seconds: int = DEFAULT_TTL_SECONDS,
network: bool = False,
allow_host_compat: bool = DEFAULT_ALLOW_HOST_COMPAT,
seed_path: str | Path | None = None,
) -> dict[str, Any]:
self._validate_limits(vcpu_count=vcpu_count, mem_mib=mem_mib, ttl_seconds=ttl_seconds)
get_environment(environment, runtime_paths=self._runtime_paths)
prepared_seed = self._prepare_workspace_seed(seed_path)
now = time.time()
workspace_id = uuid.uuid4().hex[:12]
workspace_dir = self._workspace_dir(workspace_id)
runtime_dir = self._workspace_runtime_dir(workspace_id)
host_workspace_dir = self._workspace_host_dir(workspace_id)
commands_dir = self._workspace_commands_dir(workspace_id)
workspace_dir.mkdir(parents=True, exist_ok=False)
host_workspace_dir.mkdir(parents=True, exist_ok=True)
commands_dir.mkdir(parents=True, exist_ok=True)
instance = VmInstance(
vm_id=workspace_id,
environment=environment,
vcpu_count=vcpu_count,
mem_mib=mem_mib,
ttl_seconds=ttl_seconds,
created_at=now,
expires_at=now + ttl_seconds,
workdir=runtime_dir,
network_requested=network,
allow_host_compat=allow_host_compat,
)
instance.metadata["allow_host_compat"] = str(allow_host_compat).lower()
instance.metadata["workspace_path"] = WORKSPACE_GUEST_PATH
instance.metadata["workspace_host_dir"] = str(host_workspace_dir)
try:
with self._lock:
self._reap_expired_locked(now)
self._reap_expired_workspaces_locked(now)
active_count = len(self._instances) + self._count_workspaces_locked()
if active_count >= self._max_active_vms:
raise RuntimeError(
f"max active VMs reached ({self._max_active_vms}); delete old VMs first"
)
self._backend.create(instance)
if (
prepared_seed.archive_path is not None
and self._runtime_capabilities.supports_guest_exec
):
self._ensure_workspace_guest_seed_support(instance)
with self._lock:
self._start_instance_locked(instance)
self._require_guest_exec_or_opt_in(instance)
workspace_seed = prepared_seed.to_payload()
if prepared_seed.archive_path is not None:
import_summary = self._backend.import_archive(
instance,
archive_path=prepared_seed.archive_path,
destination=WORKSPACE_GUEST_PATH,
)
workspace_seed["entry_count"] = int(import_summary["entry_count"])
workspace_seed["bytes_written"] = int(import_summary["bytes_written"])
workspace_seed["destination"] = str(import_summary["destination"])
elif self._runtime_capabilities.supports_guest_exec:
self._backend.exec(
instance,
f"mkdir -p {shlex.quote(WORKSPACE_GUEST_PATH)}",
10,
)
else:
instance.metadata["execution_mode"] = "host_compat"
workspace = WorkspaceRecord.from_instance(instance, workspace_seed=workspace_seed)
self._save_workspace_locked(workspace)
return self._serialize_workspace(workspace)
except Exception:
if runtime_dir.exists():
try:
if instance.state == "started":
self._backend.stop(instance)
instance.state = "stopped"
except Exception:
pass
try:
self._backend.delete(instance)
except Exception:
pass
shutil.rmtree(workspace_dir, ignore_errors=True)
raise
finally:
prepared_seed.cleanup()
def push_workspace_sync(
self,
workspace_id: str,
*,
source_path: str | Path,
dest: str = WORKSPACE_GUEST_PATH,
) -> dict[str, Any]:
prepared_seed = self._prepare_workspace_seed(source_path)
if prepared_seed.archive_path is None:
prepared_seed.cleanup()
raise ValueError("source_path is required")
normalized_destination, _ = _normalize_workspace_destination(dest)
with self._lock:
workspace = self._load_workspace_locked(workspace_id)
self._ensure_workspace_not_expired_locked(workspace, time.time())
self._refresh_workspace_liveness_locked(workspace)
if workspace.state != "started":
raise RuntimeError(
f"workspace {workspace_id} must be in 'started' state "
"before workspace_sync_push"
)
instance = workspace.to_instance(
workdir=self._workspace_runtime_dir(workspace.workspace_id)
)
try:
import_summary = self._backend.import_archive(
instance,
archive_path=prepared_seed.archive_path,
destination=normalized_destination,
)
finally:
prepared_seed.cleanup()
workspace_sync = prepared_seed.to_payload(
destination=normalized_destination,
path_key="source_path",
)
workspace_sync["entry_count"] = int(import_summary["entry_count"])
workspace_sync["bytes_written"] = int(import_summary["bytes_written"])
workspace_sync["destination"] = str(import_summary["destination"])
with self._lock:
workspace = self._load_workspace_locked(workspace_id)
workspace.state = instance.state
workspace.firecracker_pid = instance.firecracker_pid
workspace.last_error = instance.last_error
workspace.metadata = dict(instance.metadata)
self._save_workspace_locked(workspace)
return {
"workspace_id": workspace_id,
"execution_mode": instance.metadata.get("execution_mode", "pending"),
"workspace_sync": workspace_sync,
}
    def exec_workspace(
        self,
        workspace_id: str,
        *,
        command: str,
        timeout_seconds: int = 30,
    ) -> dict[str, Any]:
        """Run *command* inside a started workspace and record a transcript entry.

        Returns the command result (stdout/stderr/exit code/duration) plus the
        transcript sequence number and the guest working directory.

        Raises:
            ValueError: non-positive ``timeout_seconds``.
            RuntimeError: unknown/expired workspace, or workspace not started.
        """
        if timeout_seconds <= 0:
            raise ValueError("timeout_seconds must be positive")
        with self._lock:
            workspace = self._load_workspace_locked(workspace_id)
            self._ensure_workspace_not_expired_locked(workspace, time.time())
            # May flip state to "stopped" if the guest process died.
            self._refresh_workspace_liveness_locked(workspace)
            if workspace.state != "started":
                raise RuntimeError(
                    f"workspace {workspace_id} must be in 'started' state before workspace_exec"
                )
            instance = workspace.to_instance(
                workdir=self._workspace_runtime_dir(workspace.workspace_id)
            )
        # Execute outside the lock: guest commands can be long-running.
        exec_result, execution_mode = self._exec_instance(
            instance,
            command=command,
            timeout_seconds=timeout_seconds,
            host_workdir=self._workspace_host_dir(workspace.workspace_id),
            guest_cwd=WORKSPACE_GUEST_PATH,
        )
        with self._lock:
            # Re-load before writing: the on-disk record may have changed
            # while the command was running.
            workspace = self._load_workspace_locked(workspace_id)
            workspace.state = instance.state
            workspace.firecracker_pid = instance.firecracker_pid
            workspace.last_error = instance.last_error
            workspace.metadata = dict(instance.metadata)
            entry = self._record_workspace_command_locked(
                workspace,
                command=command,
                exec_result=exec_result,
                execution_mode=execution_mode,
                cwd=WORKSPACE_GUEST_PATH,
            )
            self._save_workspace_locked(workspace)
        return {
            "workspace_id": workspace_id,
            "environment": workspace.environment,
            "environment_version": workspace.metadata.get("environment_version"),
            "command": command,
            "stdout": exec_result.stdout,
            "stderr": exec_result.stderr,
            "exit_code": exec_result.exit_code,
            "duration_ms": exec_result.duration_ms,
            "execution_mode": execution_mode,
            "sequence": entry["sequence"],
            "cwd": WORKSPACE_GUEST_PATH,
        }
def status_workspace(self, workspace_id: str) -> dict[str, Any]:
with self._lock:
workspace = self._load_workspace_locked(workspace_id)
self._ensure_workspace_not_expired_locked(workspace, time.time())
self._refresh_workspace_liveness_locked(workspace)
self._save_workspace_locked(workspace)
return self._serialize_workspace(workspace)
def logs_workspace(self, workspace_id: str) -> dict[str, Any]:
with self._lock:
workspace = self._load_workspace_locked(workspace_id)
self._ensure_workspace_not_expired_locked(workspace, time.time())
self._refresh_workspace_liveness_locked(workspace)
self._save_workspace_locked(workspace)
entries = self._read_workspace_logs_locked(workspace.workspace_id)
return {
"workspace_id": workspace.workspace_id,
"count": len(entries),
"entries": entries,
}
def delete_workspace(
self,
workspace_id: str,
*,
reason: str = "explicit_delete",
) -> dict[str, Any]:
with self._lock:
workspace = self._load_workspace_locked(workspace_id)
instance = workspace.to_instance(
workdir=self._workspace_runtime_dir(workspace.workspace_id)
)
if workspace.state == "started":
self._backend.stop(instance)
workspace.state = "stopped"
self._backend.delete(instance)
shutil.rmtree(self._workspace_dir(workspace_id), ignore_errors=True)
return {"workspace_id": workspace_id, "deleted": True, "reason": reason}
def _validate_limits(self, *, vcpu_count: int, mem_mib: int, ttl_seconds: int) -> None:
if not self.MIN_VCPUS <= vcpu_count <= self.MAX_VCPUS:
raise ValueError(f"vcpu_count must be between {self.MIN_VCPUS} and {self.MAX_VCPUS}")
if not self.MIN_MEM_MIB <= mem_mib <= self.MAX_MEM_MIB:
raise ValueError(f"mem_mib must be between {self.MIN_MEM_MIB} and {self.MAX_MEM_MIB}")
if not self.MIN_TTL_SECONDS <= ttl_seconds <= self.MAX_TTL_SECONDS:
raise ValueError(
f"ttl_seconds must be between {self.MIN_TTL_SECONDS} and {self.MAX_TTL_SECONDS}"
)
def _serialize(self, instance: VmInstance) -> dict[str, Any]:
return {
"vm_id": instance.vm_id,
"environment": instance.environment,
"environment_version": instance.metadata.get("environment_version"),
"vcpu_count": instance.vcpu_count,
"mem_mib": instance.mem_mib,
"ttl_seconds": instance.ttl_seconds,
"created_at": instance.created_at,
"expires_at": instance.expires_at,
"state": instance.state,
"network_enabled": instance.network is not None,
"allow_host_compat": instance.allow_host_compat,
"guest_ip": instance.network.guest_ip if instance.network is not None else None,
"tap_name": instance.network.tap_name if instance.network is not None else None,
"execution_mode": instance.metadata.get("execution_mode", "pending"),
"metadata": instance.metadata,
}
def _serialize_workspace(self, workspace: WorkspaceRecord) -> dict[str, Any]:
return {
"workspace_id": workspace.workspace_id,
"environment": workspace.environment,
"environment_version": workspace.metadata.get("environment_version"),
"vcpu_count": workspace.vcpu_count,
"mem_mib": workspace.mem_mib,
"ttl_seconds": workspace.ttl_seconds,
"created_at": workspace.created_at,
"expires_at": workspace.expires_at,
"state": workspace.state,
"network_enabled": workspace.network is not None,
"allow_host_compat": workspace.allow_host_compat,
"guest_ip": workspace.network.guest_ip if workspace.network is not None else None,
"tap_name": workspace.network.tap_name if workspace.network is not None else None,
"execution_mode": workspace.metadata.get("execution_mode", "pending"),
"workspace_path": WORKSPACE_GUEST_PATH,
"workspace_seed": _workspace_seed_dict(workspace.workspace_seed),
"command_count": workspace.command_count,
"last_command": workspace.last_command,
"metadata": workspace.metadata,
}
def _require_guest_boot_or_opt_in(self, instance: VmInstance) -> None:
if self._runtime_capabilities.supports_vm_boot or instance.allow_host_compat:
return
reason = self._runtime_capabilities.reason or "runtime does not support real VM boot"
raise RuntimeError(
"guest boot is unavailable and host compatibility mode is disabled: "
f"{reason}. Set allow_host_compat=True (CLI: --allow-host-compat) to opt into "
"host execution."
)
def _require_guest_exec_or_opt_in(self, instance: VmInstance) -> None:
if self._runtime_capabilities.supports_guest_exec or instance.allow_host_compat:
return
reason = self._runtime_capabilities.reason or (
"runtime does not support guest command execution"
)
raise RuntimeError(
"guest command execution is unavailable and host compatibility mode is disabled: "
f"{reason}. Set allow_host_compat=True (CLI: --allow-host-compat) to opt into "
"host execution."
)
def _get_instance_locked(self, vm_id: str) -> VmInstance:
try:
return self._instances[vm_id]
except KeyError as exc:
raise ValueError(f"vm {vm_id!r} does not exist") from exc
def _reap_expired_locked(self, now: float) -> None:
expired_vm_ids = [
vm_id for vm_id, inst in self._instances.items() if inst.expires_at <= now
]
for vm_id in expired_vm_ids:
instance = self._instances[vm_id]
if instance.state == "started":
self._backend.stop(instance)
instance.state = "stopped"
self._backend.delete(instance)
del self._instances[vm_id]
def _ensure_not_expired_locked(self, instance: VmInstance, now: float) -> None:
if instance.expires_at <= now:
vm_id = instance.vm_id
self._reap_expired_locked(now)
raise RuntimeError(f"vm {vm_id!r} expired and was automatically deleted")
def _start_instance_locked(self, instance: VmInstance) -> None:
if instance.state not in {"created", "stopped"}:
raise RuntimeError(
f"vm {instance.vm_id} cannot be started from state {instance.state!r}"
)
self._require_guest_boot_or_opt_in(instance)
if not self._runtime_capabilities.supports_vm_boot:
instance.metadata["execution_mode"] = "host_compat"
instance.metadata["boot_mode"] = "compat"
if self._runtime_capabilities.reason is not None:
instance.metadata["runtime_reason"] = self._runtime_capabilities.reason
self._backend.start(instance)
instance.state = "started"
def _exec_instance(
self,
instance: VmInstance,
*,
command: str,
timeout_seconds: int,
host_workdir: Path | None = None,
guest_cwd: str | None = None,
) -> tuple[VmExecResult, str]:
if timeout_seconds <= 0:
raise ValueError("timeout_seconds must be positive")
if instance.state != "started":
raise RuntimeError(f"vm {instance.vm_id} must be in 'started' state before execution")
self._require_guest_exec_or_opt_in(instance)
prepared_command = command
if self._runtime_capabilities.supports_guest_exec:
prepared_command = _wrap_guest_command(command, cwd=guest_cwd)
workdir = None
else:
instance.metadata["execution_mode"] = "host_compat"
workdir = host_workdir
exec_result = self._backend.exec(
instance,
prepared_command,
timeout_seconds,
workdir=workdir,
)
execution_mode = instance.metadata.get("execution_mode", "unknown")
return exec_result, execution_mode
def _prepare_workspace_seed(self, seed_path: str | Path | None) -> PreparedWorkspaceSeed:
if seed_path is None:
return PreparedWorkspaceSeed(mode="empty", source_path=None)
resolved_source_path = Path(seed_path).expanduser().resolve()
if not resolved_source_path.exists():
raise ValueError(f"seed_path {resolved_source_path} does not exist")
if resolved_source_path.is_dir():
cleanup_dir = Path(tempfile.mkdtemp(prefix="pyro-workspace-seed-"))
archive_path = cleanup_dir / "workspace-seed.tar"
try:
_write_directory_seed_archive(resolved_source_path, archive_path)
entry_count, bytes_written = _inspect_seed_archive(archive_path)
except Exception:
shutil.rmtree(cleanup_dir, ignore_errors=True)
raise
return PreparedWorkspaceSeed(
mode="directory",
source_path=str(resolved_source_path),
archive_path=archive_path,
entry_count=entry_count,
bytes_written=bytes_written,
cleanup_dir=cleanup_dir,
)
if (
not resolved_source_path.is_file()
or not _is_supported_seed_archive(resolved_source_path)
):
raise ValueError(
"seed_path must be a directory or a .tar/.tar.gz/.tgz archive"
)
entry_count, bytes_written = _inspect_seed_archive(resolved_source_path)
return PreparedWorkspaceSeed(
mode="tar_archive",
source_path=str(resolved_source_path),
archive_path=resolved_source_path,
entry_count=entry_count,
bytes_written=bytes_written,
)
def _ensure_workspace_guest_seed_support(self, instance: VmInstance) -> None:
if self._runtime_paths is None or self._runtime_paths.guest_agent_path is None:
raise RuntimeError(
"runtime bundle does not provide a guest agent for workspace seeding"
)
rootfs_image = instance.metadata.get("rootfs_image")
if rootfs_image is None or rootfs_image == "":
raise RuntimeError("workspace rootfs image is unavailable for guest seeding")
_patch_rootfs_guest_agent(Path(rootfs_image), self._runtime_paths.guest_agent_path)
def _workspace_dir(self, workspace_id: str) -> Path:
return self._workspaces_dir / workspace_id
def _workspace_runtime_dir(self, workspace_id: str) -> Path:
return self._workspace_dir(workspace_id) / WORKSPACE_RUNTIME_DIRNAME
def _workspace_host_dir(self, workspace_id: str) -> Path:
return self._workspace_dir(workspace_id) / WORKSPACE_DIRNAME
def _workspace_commands_dir(self, workspace_id: str) -> Path:
return self._workspace_dir(workspace_id) / WORKSPACE_COMMANDS_DIRNAME
def _workspace_metadata_path(self, workspace_id: str) -> Path:
return self._workspace_dir(workspace_id) / "workspace.json"
def _count_workspaces_locked(self) -> int:
return sum(1 for _ in self._workspaces_dir.glob("*/workspace.json"))
def _load_workspace_locked(self, workspace_id: str) -> WorkspaceRecord:
metadata_path = self._workspace_metadata_path(workspace_id)
if not metadata_path.exists():
raise ValueError(f"workspace {workspace_id!r} does not exist")
payload = json.loads(metadata_path.read_text(encoding="utf-8"))
if not isinstance(payload, dict):
raise RuntimeError(f"workspace record at {metadata_path} is invalid")
return WorkspaceRecord.from_payload(payload)
def _save_workspace_locked(self, workspace: WorkspaceRecord) -> None:
metadata_path = self._workspace_metadata_path(workspace.workspace_id)
metadata_path.parent.mkdir(parents=True, exist_ok=True)
metadata_path.write_text(
json.dumps(workspace.to_payload(), indent=2, sort_keys=True),
encoding="utf-8",
)
    def _reap_expired_workspaces_locked(self, now: float) -> None:
        """Delete every on-disk workspace whose TTL has elapsed (lock held).

        Non-dict records are removed outright; expired workspaces are stopped
        via the backend (when started) before their directory is deleted.
        """
        # Snapshot via list() so deletions don't disturb the glob iterator.
        for metadata_path in list(self._workspaces_dir.glob("*/workspace.json")):
            # NOTE(review): a corrupt (non-JSON) record makes json.loads raise
            # and aborts the whole sweep — confirm that is acceptable.
            payload = json.loads(metadata_path.read_text(encoding="utf-8"))
            if not isinstance(payload, dict):
                # Structurally invalid record: drop the workspace directory.
                shutil.rmtree(metadata_path.parent, ignore_errors=True)
                continue
            workspace = WorkspaceRecord.from_payload(payload)
            if workspace.expires_at > now:
                continue
            instance = workspace.to_instance(
                workdir=self._workspace_runtime_dir(workspace.workspace_id)
            )
            if workspace.state == "started":
                self._backend.stop(instance)
                workspace.state = "stopped"
            self._backend.delete(instance)
            shutil.rmtree(self._workspace_dir(workspace.workspace_id), ignore_errors=True)
def _ensure_workspace_not_expired_locked(
self,
workspace: WorkspaceRecord,
now: float,
) -> None:
if workspace.expires_at <= now:
workspace_id = workspace.workspace_id
self._reap_expired_workspaces_locked(now)
raise RuntimeError(f"workspace {workspace_id!r} expired and was automatically deleted")
def _refresh_workspace_liveness_locked(self, workspace: WorkspaceRecord) -> None:
if workspace.state != "started":
return
execution_mode = workspace.metadata.get("execution_mode")
if execution_mode == "host_compat":
return
if _pid_is_running(workspace.firecracker_pid):
return
workspace.state = "stopped"
workspace.firecracker_pid = None
workspace.last_error = "backing guest process is no longer running"
def _record_workspace_command_locked(
self,
workspace: WorkspaceRecord,
*,
command: str,
exec_result: VmExecResult,
execution_mode: str,
cwd: str,
) -> dict[str, Any]:
sequence = workspace.command_count + 1
commands_dir = self._workspace_commands_dir(workspace.workspace_id)
commands_dir.mkdir(parents=True, exist_ok=True)
base_name = f"{sequence:06d}"
stdout_path = commands_dir / f"{base_name}.stdout"
stderr_path = commands_dir / f"{base_name}.stderr"
record_path = commands_dir / f"{base_name}.json"
stdout_path.write_text(exec_result.stdout, encoding="utf-8")
stderr_path.write_text(exec_result.stderr, encoding="utf-8")
entry: dict[str, Any] = {
"sequence": sequence,
"command": command,
"cwd": cwd,
"exit_code": exec_result.exit_code,
"duration_ms": exec_result.duration_ms,
"execution_mode": execution_mode,
"stdout_file": stdout_path.name,
"stderr_file": stderr_path.name,
"recorded_at": time.time(),
}
record_path.write_text(json.dumps(entry, indent=2, sort_keys=True), encoding="utf-8")
workspace.command_count = sequence
workspace.last_command = {
"sequence": sequence,
"command": command,
"cwd": cwd,
"exit_code": exec_result.exit_code,
"duration_ms": exec_result.duration_ms,
"execution_mode": execution_mode,
}
return entry
def _read_workspace_logs_locked(self, workspace_id: str) -> list[dict[str, Any]]:
entries: list[dict[str, Any]] = []
commands_dir = self._workspace_commands_dir(workspace_id)
if not commands_dir.exists():
return entries
for record_path in sorted(commands_dir.glob("*.json")):
payload = json.loads(record_path.read_text(encoding="utf-8"))
if not isinstance(payload, dict):
continue
stdout_name = str(payload.get("stdout_file", ""))
stderr_name = str(payload.get("stderr_file", ""))
stdout = ""
stderr = ""
if stdout_name != "":
stdout_path = commands_dir / stdout_name
if stdout_path.exists():
stdout = stdout_path.read_text(encoding="utf-8")
if stderr_name != "":
stderr_path = commands_dir / stderr_name
if stderr_path.exists():
stderr = stderr_path.read_text(encoding="utf-8")
entry = dict(payload)
entry["stdout"] = stdout
entry["stderr"] = stderr
entries.append(entry)
return entries