Ship trust-first CLI and runtime defaults

This commit is contained in:
Thales Maciel 2026-03-09 20:52:49 -03:00
parent fb718af154
commit 5d63e4c16e
26 changed files with 894 additions and 134 deletions

View file

@ -7,7 +7,14 @@ from typing import Any
from mcp.server.fastmcp import FastMCP
from pyro_mcp.vm_manager import VmManager
from pyro_mcp.vm_manager import (
DEFAULT_ALLOW_HOST_COMPAT,
DEFAULT_MEM_MIB,
DEFAULT_TIMEOUT_SECONDS,
DEFAULT_TTL_SECONDS,
DEFAULT_VCPU_COUNT,
VmManager,
)
class Pyro:
@ -49,10 +56,11 @@ class Pyro:
self,
*,
environment: str,
vcpu_count: int,
mem_mib: int,
ttl_seconds: int = 600,
vcpu_count: int = DEFAULT_VCPU_COUNT,
mem_mib: int = DEFAULT_MEM_MIB,
ttl_seconds: int = DEFAULT_TTL_SECONDS,
network: bool = False,
allow_host_compat: bool = DEFAULT_ALLOW_HOST_COMPAT,
) -> dict[str, Any]:
return self._manager.create_vm(
environment=environment,
@ -60,6 +68,7 @@ class Pyro:
mem_mib=mem_mib,
ttl_seconds=ttl_seconds,
network=network,
allow_host_compat=allow_host_compat,
)
def start_vm(self, vm_id: str) -> dict[str, Any]:
@ -88,11 +97,12 @@ class Pyro:
*,
environment: str,
command: str,
vcpu_count: int,
mem_mib: int,
timeout_seconds: int = 30,
ttl_seconds: int = 600,
vcpu_count: int = DEFAULT_VCPU_COUNT,
mem_mib: int = DEFAULT_MEM_MIB,
timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
ttl_seconds: int = DEFAULT_TTL_SECONDS,
network: bool = False,
allow_host_compat: bool = DEFAULT_ALLOW_HOST_COMPAT,
) -> dict[str, Any]:
return self._manager.run_vm(
environment=environment,
@ -102,6 +112,7 @@ class Pyro:
timeout_seconds=timeout_seconds,
ttl_seconds=ttl_seconds,
network=network,
allow_host_compat=allow_host_compat,
)
def create_server(self) -> FastMCP:
@ -111,11 +122,12 @@ class Pyro:
async def vm_run(
environment: str,
command: str,
vcpu_count: int,
mem_mib: int,
timeout_seconds: int = 30,
ttl_seconds: int = 600,
vcpu_count: int = DEFAULT_VCPU_COUNT,
mem_mib: int = DEFAULT_MEM_MIB,
timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
ttl_seconds: int = DEFAULT_TTL_SECONDS,
network: bool = False,
allow_host_compat: bool = DEFAULT_ALLOW_HOST_COMPAT,
) -> dict[str, Any]:
"""Create, start, execute, and clean up an ephemeral VM."""
return self.run_in_vm(
@ -126,6 +138,7 @@ class Pyro:
timeout_seconds=timeout_seconds,
ttl_seconds=ttl_seconds,
network=network,
allow_host_compat=allow_host_compat,
)
@server.tool()
@ -136,10 +149,11 @@ class Pyro:
@server.tool()
async def vm_create(
environment: str,
vcpu_count: int,
mem_mib: int,
ttl_seconds: int = 600,
vcpu_count: int = DEFAULT_VCPU_COUNT,
mem_mib: int = DEFAULT_MEM_MIB,
ttl_seconds: int = DEFAULT_TTL_SECONDS,
network: bool = False,
allow_host_compat: bool = DEFAULT_ALLOW_HOST_COMPAT,
) -> dict[str, Any]:
"""Create an ephemeral VM record with environment and resource sizing."""
return self.create_vm(
@ -148,6 +162,7 @@ class Pyro:
mem_mib=mem_mib,
ttl_seconds=ttl_seconds,
network=network,
allow_host_compat=allow_host_compat,
)
@server.tool()

View file

@ -4,6 +4,7 @@ from __future__ import annotations
import argparse
import json
import sys
from typing import Any
from pyro_mcp import __version__
@ -12,12 +13,135 @@ from pyro_mcp.demo import run_demo
from pyro_mcp.ollama_demo import DEFAULT_OLLAMA_BASE_URL, DEFAULT_OLLAMA_MODEL, run_ollama_tool_demo
from pyro_mcp.runtime import DEFAULT_PLATFORM, doctor_report
from pyro_mcp.vm_environments import DEFAULT_CATALOG_VERSION
from pyro_mcp.vm_manager import (
DEFAULT_MEM_MIB,
DEFAULT_VCPU_COUNT,
)
def _print_json(payload: dict[str, Any]) -> None:
print(json.dumps(payload, indent=2, sort_keys=True))
def _write_stream(text: str, *, stream: Any) -> None:
if text == "":
return
stream.write(text)
stream.flush()
def _print_run_human(payload: dict[str, Any]) -> None:
stdout = str(payload.get("stdout", ""))
stderr = str(payload.get("stderr", ""))
_write_stream(stdout, stream=sys.stdout)
_write_stream(stderr, stream=sys.stderr)
print(
"[run] "
f"environment={str(payload.get('environment', 'unknown'))} "
f"execution_mode={str(payload.get('execution_mode', 'unknown'))} "
f"exit_code={int(payload.get('exit_code', 1))} "
f"duration_ms={int(payload.get('duration_ms', 0))}",
file=sys.stderr,
flush=True,
)
def _print_env_list_human(payload: dict[str, Any]) -> None:
print(f"Catalog version: {payload.get('catalog_version', 'unknown')}")
environments = payload.get("environments")
if not isinstance(environments, list) or not environments:
print("No environments found.")
return
for entry in environments:
if not isinstance(entry, dict):
continue
status = "installed" if bool(entry.get("installed")) else "not installed"
print(
f"{str(entry.get('name', 'unknown'))} [{status}] "
f"{str(entry.get('description', '')).strip()}".rstrip()
)
def _print_env_detail_human(payload: dict[str, Any], *, action: str) -> None:
print(f"{action}: {str(payload.get('name', 'unknown'))}")
print(f"Version: {str(payload.get('version', 'unknown'))}")
print(
f"Distribution: {str(payload.get('distribution', 'unknown'))} "
f"{str(payload.get('distribution_version', 'unknown'))}"
)
print(f"Installed: {'yes' if bool(payload.get('installed')) else 'no'}")
print(f"Cache dir: {str(payload.get('cache_dir', 'unknown'))}")
packages = payload.get("default_packages")
if isinstance(packages, list) and packages:
print("Default packages: " + ", ".join(str(item) for item in packages))
description = str(payload.get("description", "")).strip()
if description != "":
print(f"Description: {description}")
if payload.get("installed"):
print(f"Install dir: {str(payload.get('install_dir', 'unknown'))}")
install_manifest = payload.get("install_manifest")
if install_manifest is not None:
print(f"Install manifest: {str(install_manifest)}")
kernel_image = payload.get("kernel_image")
if kernel_image is not None:
print(f"Kernel image: {str(kernel_image)}")
rootfs_image = payload.get("rootfs_image")
if rootfs_image is not None:
print(f"Rootfs image: {str(rootfs_image)}")
registry = payload.get("oci_registry")
repository = payload.get("oci_repository")
reference = payload.get("oci_reference")
if isinstance(registry, str) and isinstance(repository, str) and isinstance(reference, str):
print(f"OCI source: {registry}/{repository}:{reference}")
def _print_prune_human(payload: dict[str, Any]) -> None:
count = int(payload.get("count", 0))
print(f"Deleted {count} cached environment entr{'y' if count == 1 else 'ies'}.")
deleted = payload.get("deleted_environment_dirs")
if isinstance(deleted, list):
for entry in deleted:
print(f"- {entry}")
def _print_doctor_human(payload: dict[str, Any]) -> None:
issues = payload.get("issues")
runtime_ok = bool(payload.get("runtime_ok"))
print(f"Platform: {str(payload.get('platform', 'unknown'))}")
print(f"Runtime: {'PASS' if runtime_ok else 'FAIL'}")
kvm = payload.get("kvm")
if isinstance(kvm, dict):
print(
"KVM: "
f"exists={'yes' if bool(kvm.get('exists')) else 'no'} "
f"readable={'yes' if bool(kvm.get('readable')) else 'no'} "
f"writable={'yes' if bool(kvm.get('writable')) else 'no'}"
)
runtime = payload.get("runtime")
if isinstance(runtime, dict):
print(f"Environment cache: {str(runtime.get('cache_dir', 'unknown'))}")
capabilities = runtime.get("capabilities")
if isinstance(capabilities, dict):
print(
"Capabilities: "
f"vm_boot={'yes' if bool(capabilities.get('supports_vm_boot')) else 'no'} "
f"guest_exec={'yes' if bool(capabilities.get('supports_guest_exec')) else 'no'} "
"guest_network="
f"{'yes' if bool(capabilities.get('supports_guest_network')) else 'no'}"
)
networking = payload.get("networking")
if isinstance(networking, dict):
print(
"Networking: "
f"tun={'yes' if bool(networking.get('tun_available')) else 'no'} "
f"ip_forward={'yes' if bool(networking.get('ip_forward_enabled')) else 'no'}"
)
if isinstance(issues, list) and issues:
print("Issues:")
for issue in issues:
print(f"- {issue}")
def _build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description="pyro CLI for curated ephemeral Linux environments."
@ -27,15 +151,19 @@ def _build_parser() -> argparse.ArgumentParser:
env_parser = subparsers.add_parser("env", help="Inspect and manage curated environments.")
env_subparsers = env_parser.add_subparsers(dest="env_command", required=True)
env_subparsers.add_parser("list", help="List official environments.")
list_parser = env_subparsers.add_parser("list", help="List official environments.")
list_parser.add_argument("--json", action="store_true")
pull_parser = env_subparsers.add_parser(
"pull",
help="Install an environment into the local cache.",
)
pull_parser.add_argument("environment")
pull_parser.add_argument("--json", action="store_true")
inspect_parser = env_subparsers.add_parser("inspect", help="Inspect one environment.")
inspect_parser.add_argument("environment")
env_subparsers.add_parser("prune", help="Delete stale cached environments.")
inspect_parser.add_argument("--json", action="store_true")
prune_parser = env_subparsers.add_parser("prune", help="Delete stale cached environments.")
prune_parser.add_argument("--json", action="store_true")
mcp_parser = subparsers.add_parser("mcp", help="Run the MCP server.")
mcp_subparsers = mcp_parser.add_subparsers(dest="mcp_command", required=True)
@ -43,15 +171,18 @@ def _build_parser() -> argparse.ArgumentParser:
run_parser = subparsers.add_parser("run", help="Run one command inside an ephemeral VM.")
run_parser.add_argument("environment")
run_parser.add_argument("--vcpu-count", type=int, required=True)
run_parser.add_argument("--mem-mib", type=int, required=True)
run_parser.add_argument("--vcpu-count", type=int, default=DEFAULT_VCPU_COUNT)
run_parser.add_argument("--mem-mib", type=int, default=DEFAULT_MEM_MIB)
run_parser.add_argument("--timeout-seconds", type=int, default=30)
run_parser.add_argument("--ttl-seconds", type=int, default=600)
run_parser.add_argument("--network", action="store_true")
run_parser.add_argument("--allow-host-compat", action="store_true")
run_parser.add_argument("--json", action="store_true")
run_parser.add_argument("command_args", nargs="*")
doctor_parser = subparsers.add_parser("doctor", help="Inspect runtime and host diagnostics.")
doctor_parser.add_argument("--platform", default=DEFAULT_PLATFORM)
doctor_parser.add_argument("--json", action="store_true")
demo_parser = subparsers.add_parser("demo", help="Run built-in demos.")
demo_subparsers = demo_parser.add_subparsers(dest="demo_command")
@ -77,40 +208,72 @@ def main() -> None:
pyro = Pyro()
if args.command == "env":
if args.env_command == "list":
_print_json(
{
"catalog_version": DEFAULT_CATALOG_VERSION,
"environments": pyro.list_environments(),
}
)
list_payload: dict[str, Any] = {
"catalog_version": DEFAULT_CATALOG_VERSION,
"environments": pyro.list_environments(),
}
if bool(args.json):
_print_json(list_payload)
else:
_print_env_list_human(list_payload)
return
if args.env_command == "pull":
_print_json(dict(pyro.pull_environment(args.environment)))
pull_payload = pyro.pull_environment(args.environment)
if bool(args.json):
_print_json(pull_payload)
else:
_print_env_detail_human(pull_payload, action="Pulled")
return
if args.env_command == "inspect":
_print_json(dict(pyro.inspect_environment(args.environment)))
inspect_payload = pyro.inspect_environment(args.environment)
if bool(args.json):
_print_json(inspect_payload)
else:
_print_env_detail_human(inspect_payload, action="Environment")
return
if args.env_command == "prune":
_print_json(dict(pyro.prune_environments()))
prune_payload = pyro.prune_environments()
if bool(args.json):
_print_json(prune_payload)
else:
_print_prune_human(prune_payload)
return
if args.command == "mcp":
pyro.create_server().run(transport="stdio")
return
if args.command == "run":
command = _require_command(args.command_args)
result = pyro.run_in_vm(
environment=args.environment,
command=command,
vcpu_count=args.vcpu_count,
mem_mib=args.mem_mib,
timeout_seconds=args.timeout_seconds,
ttl_seconds=args.ttl_seconds,
network=args.network,
)
_print_json(result)
try:
result = pyro.run_in_vm(
environment=args.environment,
command=command,
vcpu_count=args.vcpu_count,
mem_mib=args.mem_mib,
timeout_seconds=args.timeout_seconds,
ttl_seconds=args.ttl_seconds,
network=args.network,
allow_host_compat=args.allow_host_compat,
)
except Exception as exc: # noqa: BLE001
if bool(args.json):
_print_json({"ok": False, "error": str(exc)})
else:
print(f"[error] {exc}", file=sys.stderr, flush=True)
raise SystemExit(1) from exc
if bool(args.json):
_print_json(result)
else:
_print_run_human(result)
exit_code = int(result.get("exit_code", 1))
if exit_code != 0:
raise SystemExit(exit_code)
return
if args.command == "doctor":
_print_json(doctor_report(platform=args.platform))
payload = doctor_report(platform=args.platform)
if bool(args.json):
_print_json(payload)
else:
_print_doctor_human(payload)
return
if args.command == "demo" and args.demo_command == "ollama":
try:

View file

@ -11,6 +11,8 @@ PUBLIC_CLI_RUN_FLAGS = (
"--timeout-seconds",
"--ttl-seconds",
"--network",
"--allow-host-compat",
"--json",
)
PUBLIC_SDK_METHODS = (

View file

@ -6,6 +6,7 @@ import json
from typing import Any
from pyro_mcp.api import Pyro
from pyro_mcp.vm_manager import DEFAULT_MEM_MIB, DEFAULT_TTL_SECONDS, DEFAULT_VCPU_COUNT
INTERNET_PROBE_COMMAND = (
'python3 -c "import urllib.request; '
@ -30,10 +31,10 @@ def run_demo(*, network: bool = False) -> dict[str, Any]:
return pyro.run_in_vm(
environment="debian:12",
command=_demo_command(status),
vcpu_count=1,
mem_mib=512,
vcpu_count=DEFAULT_VCPU_COUNT,
mem_mib=DEFAULT_MEM_MIB,
timeout_seconds=30,
ttl_seconds=600,
ttl_seconds=DEFAULT_TTL_SECONDS,
network=network,
)

View file

@ -10,6 +10,13 @@ from collections.abc import Callable
from typing import Any, Final, cast
from pyro_mcp.api import Pyro
from pyro_mcp.vm_manager import (
DEFAULT_ALLOW_HOST_COMPAT,
DEFAULT_MEM_MIB,
DEFAULT_TIMEOUT_SECONDS,
DEFAULT_TTL_SECONDS,
DEFAULT_VCPU_COUNT,
)
__all__ = ["Pyro", "run_ollama_tool_demo"]
@ -39,8 +46,9 @@ TOOL_SPECS: Final[list[dict[str, Any]]] = [
"timeout_seconds": {"type": "integer"},
"ttl_seconds": {"type": "integer"},
"network": {"type": "boolean"},
"allow_host_compat": {"type": "boolean"},
},
"required": ["environment", "command", "vcpu_count", "mem_mib"],
"required": ["environment", "command"],
"additionalProperties": False,
},
},
@ -61,7 +69,7 @@ TOOL_SPECS: Final[list[dict[str, Any]]] = [
"type": "function",
"function": {
"name": "vm_create",
"description": "Create an ephemeral VM with explicit vCPU and memory sizing.",
"description": "Create an ephemeral VM with optional resource sizing.",
"parameters": {
"type": "object",
"properties": {
@ -70,8 +78,9 @@ TOOL_SPECS: Final[list[dict[str, Any]]] = [
"mem_mib": {"type": "integer"},
"ttl_seconds": {"type": "integer"},
"network": {"type": "boolean"},
"allow_host_compat": {"type": "boolean"},
},
"required": ["environment", "vcpu_count", "mem_mib"],
"required": ["environment"],
"additionalProperties": False,
},
},
@ -192,6 +201,12 @@ def _require_int(arguments: dict[str, Any], key: str) -> int:
raise ValueError(f"{key} must be an integer")
def _optional_int(arguments: dict[str, Any], key: str, *, default: int) -> int:
if key not in arguments:
return default
return _require_int(arguments, key)
def _require_bool(arguments: dict[str, Any], key: str, *, default: bool = False) -> bool:
value = arguments.get(key, default)
if isinstance(value, bool):
@ -211,27 +226,37 @@ def _dispatch_tool_call(
pyro: Pyro, tool_name: str, arguments: dict[str, Any]
) -> dict[str, Any]:
if tool_name == "vm_run":
ttl_seconds = arguments.get("ttl_seconds", 600)
timeout_seconds = arguments.get("timeout_seconds", 30)
ttl_seconds = arguments.get("ttl_seconds", DEFAULT_TTL_SECONDS)
timeout_seconds = arguments.get("timeout_seconds", DEFAULT_TIMEOUT_SECONDS)
return pyro.run_in_vm(
environment=_require_str(arguments, "environment"),
command=_require_str(arguments, "command"),
vcpu_count=_require_int(arguments, "vcpu_count"),
mem_mib=_require_int(arguments, "mem_mib"),
vcpu_count=_optional_int(arguments, "vcpu_count", default=DEFAULT_VCPU_COUNT),
mem_mib=_optional_int(arguments, "mem_mib", default=DEFAULT_MEM_MIB),
timeout_seconds=_require_int({"timeout_seconds": timeout_seconds}, "timeout_seconds"),
ttl_seconds=_require_int({"ttl_seconds": ttl_seconds}, "ttl_seconds"),
network=_require_bool(arguments, "network", default=False),
allow_host_compat=_require_bool(
arguments,
"allow_host_compat",
default=DEFAULT_ALLOW_HOST_COMPAT,
),
)
if tool_name == "vm_list_environments":
return {"environments": pyro.list_environments()}
if tool_name == "vm_create":
ttl_seconds = arguments.get("ttl_seconds", 600)
ttl_seconds = arguments.get("ttl_seconds", DEFAULT_TTL_SECONDS)
return pyro.create_vm(
environment=_require_str(arguments, "environment"),
vcpu_count=_require_int(arguments, "vcpu_count"),
mem_mib=_require_int(arguments, "mem_mib"),
vcpu_count=_optional_int(arguments, "vcpu_count", default=DEFAULT_VCPU_COUNT),
mem_mib=_optional_int(arguments, "mem_mib", default=DEFAULT_MEM_MIB),
ttl_seconds=_require_int({"ttl_seconds": ttl_seconds}, "ttl_seconds"),
network=_require_bool(arguments, "network", default=False),
allow_host_compat=_require_bool(
arguments,
"allow_host_compat",
default=DEFAULT_ALLOW_HOST_COMPAT,
),
)
if tool_name == "vm_start":
return pyro.start_vm(_require_str(arguments, "vm_id"))
@ -275,10 +300,10 @@ def _run_direct_lifecycle_fallback(pyro: Pyro) -> dict[str, Any]:
return pyro.run_in_vm(
environment="debian:12",
command=NETWORK_PROOF_COMMAND,
vcpu_count=1,
mem_mib=512,
vcpu_count=DEFAULT_VCPU_COUNT,
mem_mib=DEFAULT_MEM_MIB,
timeout_seconds=60,
ttl_seconds=600,
ttl_seconds=DEFAULT_TTL_SECONDS,
network=True,
)

View file

@ -19,7 +19,7 @@ from typing import Any
from pyro_mcp.runtime import DEFAULT_PLATFORM, RuntimePaths
DEFAULT_ENVIRONMENT_VERSION = "1.0.0"
DEFAULT_CATALOG_VERSION = "1.0.0"
DEFAULT_CATALOG_VERSION = "2.0.0"
OCI_MANIFEST_ACCEPT = ", ".join(
(
"application/vnd.oci.image.index.v1+json",
@ -48,7 +48,7 @@ class VmEnvironment:
oci_repository: str | None = None
oci_reference: str | None = None
source_digest: str | None = None
compatibility: str = ">=1.0.0,<2.0.0"
compatibility: str = ">=2.0.0,<3.0.0"
@dataclass(frozen=True)
@ -114,6 +114,11 @@ def _default_cache_dir() -> Path:
)
def default_cache_dir() -> Path:
"""Return the canonical default environment cache directory."""
return _default_cache_dir()
def _manifest_profile_digest(runtime_paths: RuntimePaths, profile_name: str) -> str | None:
profiles = runtime_paths.manifest.get("profiles")
if not isinstance(profiles, dict):

View file

@ -19,13 +19,19 @@ from pyro_mcp.runtime import (
resolve_runtime_paths,
runtime_capabilities,
)
from pyro_mcp.vm_environments import EnvironmentStore, get_environment
from pyro_mcp.vm_environments import EnvironmentStore, default_cache_dir, get_environment
from pyro_mcp.vm_firecracker import build_launch_plan
from pyro_mcp.vm_guest import VsockExecClient
from pyro_mcp.vm_network import NetworkConfig, TapNetworkManager
VmState = Literal["created", "started", "stopped"]
DEFAULT_VCPU_COUNT = 1
DEFAULT_MEM_MIB = 1024
DEFAULT_TIMEOUT_SECONDS = 30
DEFAULT_TTL_SECONDS = 600
DEFAULT_ALLOW_HOST_COMPAT = False
@dataclass
class VmInstance:
@ -41,6 +47,7 @@ class VmInstance:
workdir: Path
state: VmState = "created"
network_requested: bool = False
allow_host_compat: bool = DEFAULT_ALLOW_HOST_COMPAT
firecracker_pid: int | None = None
last_error: str | None = None
metadata: dict[str, str] = field(default_factory=dict)
@ -262,7 +269,7 @@ class FirecrackerBackend(VmBackend): # pragma: no cover
)
instance.firecracker_pid = process.pid
instance.metadata["execution_mode"] = (
"guest_vsock" if self._runtime_capabilities.supports_guest_exec else "host_compat"
"guest_vsock" if self._runtime_capabilities.supports_guest_exec else "guest_boot_only"
)
instance.metadata["boot_mode"] = "native"
@ -342,6 +349,11 @@ class VmManager:
MAX_MEM_MIB = 32768
MIN_TTL_SECONDS = 60
MAX_TTL_SECONDS = 3600
DEFAULT_VCPU_COUNT = DEFAULT_VCPU_COUNT
DEFAULT_MEM_MIB = DEFAULT_MEM_MIB
DEFAULT_TIMEOUT_SECONDS = DEFAULT_TIMEOUT_SECONDS
DEFAULT_TTL_SECONDS = DEFAULT_TTL_SECONDS
DEFAULT_ALLOW_HOST_COMPAT = DEFAULT_ALLOW_HOST_COMPAT
def __init__(
self,
@ -355,7 +367,7 @@ class VmManager:
) -> None:
self._backend_name = backend_name or "firecracker"
self._base_dir = base_dir or Path("/tmp/pyro-mcp")
resolved_cache_dir = cache_dir or self._base_dir / ".environment-cache"
resolved_cache_dir = cache_dir or default_cache_dir()
self._runtime_paths = runtime_paths
if self._backend_name == "firecracker":
self._runtime_paths = self._runtime_paths or resolve_runtime_paths()
@ -420,10 +432,11 @@ class VmManager:
self,
*,
environment: str,
vcpu_count: int,
mem_mib: int,
ttl_seconds: int,
vcpu_count: int = DEFAULT_VCPU_COUNT,
mem_mib: int = DEFAULT_MEM_MIB,
ttl_seconds: int = DEFAULT_TTL_SECONDS,
network: bool = False,
allow_host_compat: bool = DEFAULT_ALLOW_HOST_COMPAT,
) -> dict[str, Any]:
self._validate_limits(vcpu_count=vcpu_count, mem_mib=mem_mib, ttl_seconds=ttl_seconds)
get_environment(environment, runtime_paths=self._runtime_paths)
@ -446,7 +459,9 @@ class VmManager:
expires_at=now + ttl_seconds,
workdir=self._base_dir / vm_id,
network_requested=network,
allow_host_compat=allow_host_compat,
)
instance.metadata["allow_host_compat"] = str(allow_host_compat).lower()
self._backend.create(instance)
self._instances[vm_id] = instance
return self._serialize(instance)
@ -456,11 +471,12 @@ class VmManager:
*,
environment: str,
command: str,
vcpu_count: int,
mem_mib: int,
timeout_seconds: int = 30,
ttl_seconds: int = 600,
vcpu_count: int = DEFAULT_VCPU_COUNT,
mem_mib: int = DEFAULT_MEM_MIB,
timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
ttl_seconds: int = DEFAULT_TTL_SECONDS,
network: bool = False,
allow_host_compat: bool = DEFAULT_ALLOW_HOST_COMPAT,
) -> dict[str, Any]:
created = self.create_vm(
environment=environment,
@ -468,6 +484,7 @@ class VmManager:
mem_mib=mem_mib,
ttl_seconds=ttl_seconds,
network=network,
allow_host_compat=allow_host_compat,
)
vm_id = str(created["vm_id"])
try:
@ -486,6 +503,12 @@ class VmManager:
self._ensure_not_expired_locked(instance, time.time())
if instance.state not in {"created", "stopped"}:
raise RuntimeError(f"vm {vm_id} cannot be started from state {instance.state!r}")
self._require_guest_boot_or_opt_in(instance)
if not self._runtime_capabilities.supports_vm_boot:
instance.metadata["execution_mode"] = "host_compat"
instance.metadata["boot_mode"] = "compat"
if self._runtime_capabilities.reason is not None:
instance.metadata["runtime_reason"] = self._runtime_capabilities.reason
self._backend.start(instance)
instance.state = "started"
return self._serialize(instance)
@ -498,8 +521,11 @@ class VmManager:
self._ensure_not_expired_locked(instance, time.time())
if instance.state != "started":
raise RuntimeError(f"vm {vm_id} must be in 'started' state before vm_exec")
self._require_guest_exec_or_opt_in(instance)
if not self._runtime_capabilities.supports_guest_exec:
instance.metadata["execution_mode"] = "host_compat"
exec_result = self._backend.exec(instance, command, timeout_seconds)
execution_mode = instance.metadata.get("execution_mode", "host_compat")
execution_mode = instance.metadata.get("execution_mode", "unknown")
cleanup = self.delete_vm(vm_id, reason="post_exec_cleanup")
return {
"vm_id": vm_id,
@ -587,12 +613,35 @@ class VmManager:
"expires_at": instance.expires_at,
"state": instance.state,
"network_enabled": instance.network is not None,
"allow_host_compat": instance.allow_host_compat,
"guest_ip": instance.network.guest_ip if instance.network is not None else None,
"tap_name": instance.network.tap_name if instance.network is not None else None,
"execution_mode": instance.metadata.get("execution_mode", "host_compat"),
"execution_mode": instance.metadata.get("execution_mode", "pending"),
"metadata": instance.metadata,
}
def _require_guest_boot_or_opt_in(self, instance: VmInstance) -> None:
if self._runtime_capabilities.supports_vm_boot or instance.allow_host_compat:
return
reason = self._runtime_capabilities.reason or "runtime does not support real VM boot"
raise RuntimeError(
"guest boot is unavailable and host compatibility mode is disabled: "
f"{reason}. Set allow_host_compat=True (CLI: --allow-host-compat) to opt into "
"host execution."
)
def _require_guest_exec_or_opt_in(self, instance: VmInstance) -> None:
if self._runtime_capabilities.supports_guest_exec or instance.allow_host_compat:
return
reason = self._runtime_capabilities.reason or (
"runtime does not support guest command execution"
)
raise RuntimeError(
"guest command execution is unavailable and host compatibility mode is disabled: "
f"{reason}. Set allow_host_compat=True (CLI: --allow-host-compat) to opt into "
"host execution."
)
def _get_instance_locked(self, vm_id: str) -> VmInstance:
try:
return self._instances[vm_id]