Export bootable environments as OCI layouts

This commit is contained in:
Thales Maciel 2026-03-08 18:17:25 -03:00
parent 89f3d6f012
commit f6d3bf0e90
3 changed files with 450 additions and 6 deletions

View file

@ -4,6 +4,7 @@ from __future__ import annotations
import argparse
import hashlib
import io
import json
import shutil
import subprocess
@ -11,16 +12,25 @@ import tarfile
import urllib.request
import uuid
from dataclasses import dataclass
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
from pyro_mcp.runtime import DEFAULT_PLATFORM
from pyro_mcp.vm_environments import get_environment
DEFAULT_RUNTIME_SOURCE_DIR = Path("runtime_sources")
DEFAULT_RUNTIME_BUILD_DIR = Path("build/runtime_bundle")
DEFAULT_RUNTIME_BUNDLE_DIR = Path("src/pyro_mcp/runtime_bundle")
DEFAULT_RUNTIME_MATERIALIZED_DIR = Path("build/runtime_sources")
DEFAULT_RUNTIME_OCI_LAYOUT_DIR = Path("build/oci_layouts")
DOWNLOAD_CHUNK_SIZE = 1024 * 1024
OCI_IMAGE_MANIFEST_MEDIA_TYPE = "application/vnd.oci.image.manifest.v1+json"
OCI_IMAGE_CONFIG_MEDIA_TYPE = "application/vnd.oci.image.config.v1+json"
OCI_IMAGE_LAYER_MEDIA_TYPE = "application/vnd.oci.image.layer.v1.tar"
OCI_IMAGE_INDEX_MEDIA_TYPE = "application/vnd.oci.image.index.v1+json"
OCI_LAYOUT_VERSION = "1.0.0"
VALIDATION_READ_LIMIT = 1024 * 1024
@dataclass(frozen=True)
@ -120,21 +130,139 @@ def _run(command: list[str]) -> None: # pragma: no cover - integration helper
raise RuntimeError(f"command {' '.join(command)!r} failed: {stderr}")
def _sha256_bytes(payload: bytes) -> str:
return hashlib.sha256(payload).hexdigest()
def _path_contains_marker(
    path: Path,
    marker: bytes,
    *,
    read_limit: int = VALIDATION_READ_LIMIT,
) -> bool:
    """Report whether *marker* occurs in the first *read_limit* bytes of *path*.

    The file is scanned in bounded chunks; a tail of ``len(marker) - 1`` bytes
    from the previous chunk is prepended to each new one so matches straddling
    a chunk boundary are still found.
    """
    tail_length = max(len(marker) - 1, 0)
    carry = b""
    remaining = read_limit
    with path.open("rb") as stream:
        while remaining > 0:
            chunk = stream.read(min(DOWNLOAD_CHUNK_SIZE, remaining))
            if not chunk:
                break
            window = carry + chunk
            if marker in window:
                return True
            carry = window[-tail_length:] if tail_length else b""
            remaining -= len(chunk)
    return False
def _environment_slug(environment: str) -> str:
return environment.replace(":", "_").replace("/", "_")
def _platform_to_oci_platform(platform: str) -> tuple[str, str]:
os_name, separator, architecture = platform.partition("-")
if separator == "" or os_name == "" or architecture == "":
raise RuntimeError(f"unsupported runtime platform format: {platform}")
architecture_aliases = {"x86_64": "amd64", "aarch64": "arm64"}
return os_name, architecture_aliases.get(architecture, architecture)
def _blob_path(blobs_dir: Path, digest: str) -> Path:
algorithm, separator, value = digest.partition(":")
if separator == "" or value == "":
raise RuntimeError(f"invalid OCI digest: {digest}")
return blobs_dir / algorithm / value
def _write_blob_bytes(
    blobs_dir: Path,
    payload: bytes,
) -> tuple[str, int, Path]:
    """Store *payload* as a content-addressed blob under *blobs_dir*.

    Returns the ``sha256:<hex>`` digest, the payload size in bytes, and the
    path the blob was written to.
    """
    digest = "sha256:" + _sha256_bytes(payload)
    destination = _blob_path(blobs_dir, digest)
    destination.parent.mkdir(parents=True, exist_ok=True)
    destination.write_bytes(payload)
    return digest, len(payload), destination
def _normalized_tar_info(name: str, size: int, *, mode: int = 0o644) -> tarfile.TarInfo:
info = tarfile.TarInfo(name=name)
info.size = size
info.mode = mode
info.mtime = 0
info.uid = 0
info.gid = 0
info.uname = ""
info.gname = ""
return info
def _write_tar_blob_from_path(
    blobs_dir: Path,
    *,
    source_path: Path,
    arcname: str,
) -> tuple[str, int, Path]:
    """Wrap *source_path* in a single-entry reproducible tar and store it as a blob.

    The archive is streamed to a uniquely-named temp file first (the digest is
    only known once the tar is complete), then renamed into its
    content-addressed location. On any failure the temp file is removed.
    Returns (digest, size, blob_path).
    """
    staging_dir = blobs_dir / "sha256"
    staging_dir.mkdir(parents=True, exist_ok=True)
    staging_path = staging_dir / f".tmp-{uuid.uuid4().hex}.tar"
    try:
        with staging_path.open("wb") as out_fp:
            with tarfile.open(fileobj=out_fp, mode="w") as archive:
                entry = _normalized_tar_info(arcname, source_path.stat().st_size)
                with source_path.open("rb") as payload_fp:
                    archive.addfile(entry, payload_fp)
        digest = f"sha256:{_sha256(staging_path)}"
        destination = _blob_path(blobs_dir, digest)
        destination.parent.mkdir(parents=True, exist_ok=True)
        blob_size = staging_path.stat().st_size
        staging_path.replace(destination)
        return digest, blob_size, destination
    except Exception:
        staging_path.unlink(missing_ok=True)
        raise
def _write_tar_blob_from_bytes(
    blobs_dir: Path,
    *,
    payload: bytes,
    arcname: str,
) -> tuple[str, int, Path]:
    """Wrap in-memory *payload* in a single-entry reproducible tar and store it as a blob.

    Unlike the path-based variant (which streams potentially huge artifacts
    through a temp file), the payload here is already in memory, so the
    archive is assembled in a BytesIO buffer and handed to _write_blob_bytes.
    This removes the temp-file write/re-read/rename dance and its
    exception-cleanup path while producing the same (digest, size, blob_path)
    for the same payload.
    """
    buffer = io.BytesIO()
    with tarfile.open(fileobj=buffer, mode="w") as archive:
        # Normalized TarInfo keeps the archive byte-identical across runs.
        info = _normalized_tar_info(arcname, len(payload))
        archive.addfile(info, io.BytesIO(payload))
    return _write_blob_bytes(blobs_dir, buffer.getvalue())
def validate_sources(paths: RuntimeBuildPaths, lock: RuntimeBuildLock) -> None:
firecracker_source = _resolved_source_path(paths, lock.binaries["firecracker"])
jailer_source = _resolved_source_path(paths, lock.binaries["jailer"])
firecracker_text = firecracker_source.read_text(encoding="utf-8", errors="ignore")
jailer_text = jailer_source.read_text(encoding="utf-8", errors="ignore")
has_shim_binaries = (
"bundled firecracker shim" in firecracker_text or "bundled jailer shim" in jailer_text
_path_contains_marker(firecracker_source, b"bundled firecracker shim")
or _path_contains_marker(jailer_source, b"bundled jailer shim")
)
has_placeholder_profiles = False
for profile in lock.profiles.values():
for kind in ("kernel", "rootfs"):
source = _resolved_source_path(paths, profile[kind])
text = source.read_text(encoding="utf-8", errors="ignore")
if "placeholder-" in text:
if _path_contains_marker(source, b"placeholder-"):
has_placeholder_profiles = True
break
if has_placeholder_profiles:
@ -365,6 +493,170 @@ def stage_agent(paths: RuntimeBuildPaths, lock: RuntimeBuildLock) -> None:
dest.chmod(dest.stat().st_mode | 0o111)
def export_environment_oci_layout(
    paths: RuntimeBuildPaths,
    *,
    environment: str,
    output_dir: Path,
    reference: str | None = None,
) -> dict[str, Any]:
    """Export a bootable environment as a standalone OCI image layout on disk.

    Packages the environment's kernel, root filesystem, and a JSON metadata
    document as three uncompressed tar layers, then writes the config blob,
    manifest, ``index.json`` and ``oci-layout`` marker that make up an OCI
    image layout under ``output_dir/<environment slug>``.

    Args:
        paths: Resolved runtime-build directory layout.
        environment: Environment name accepted by ``get_environment``.
        output_dir: Parent directory that will hold the layout directory.
        reference: Optional ``org.opencontainers.image.ref.name`` value;
            defaults to the environment spec's version.

    Returns:
        Summary dict with the layout path, manifest/config digests, and
        per-layer titles/digests/sizes (suitable for JSON CLI output).

    Raises:
        RuntimeError: if the runtime lock lacks the environment's source
            profile, or the kernel/rootfs artifacts are missing on disk.
    """
    lock = _load_lock(paths)
    # Fail fast if the staged sources are still shims/placeholders.
    validate_sources(paths, lock)
    spec = get_environment(environment)
    try:
        profile = lock.profiles[spec.source_profile]
    except KeyError as exc:
        raise RuntimeError(
            f"runtime lock does not define source profile {spec.source_profile!r} "
            f"for environment {environment!r}"
        ) from exc
    kernel_path = _resolved_source_path(paths, profile["kernel"])
    rootfs_path = _resolved_source_path(paths, profile["rootfs"])
    if not kernel_path.exists() or not rootfs_path.exists():
        raise RuntimeError(
            f"missing artifacts for environment {environment!r}; expected "
            f"{kernel_path} and {rootfs_path}"
        )
    # Rebuild the layout from scratch so blobs from a previous export cannot
    # linger next to the freshly written content-addressed ones.
    layout_dir = output_dir / _environment_slug(environment)
    if layout_dir.exists():
        shutil.rmtree(layout_dir)
    blobs_dir = layout_dir / "blobs"
    blob_sha_dir = blobs_dir / "sha256"
    blob_sha_dir.mkdir(parents=True, exist_ok=True)
    # Environment descriptor shipped as its own layer (environment.json).
    metadata_payload = {
        "environment": spec.name,
        "version": spec.version,
        "platform": spec.platform,
        "distribution": spec.distribution,
        "distribution_version": spec.distribution_version,
        "description": spec.description,
        "default_packages": list(spec.default_packages),
        "source_profile": spec.source_profile,
        "bundle_version": lock.bundle_version,
        "component_versions": lock.component_versions,
        "capabilities": lock.capabilities,
    }
    # sort_keys keeps every generated JSON blob deterministic across runs.
    metadata_bytes = json.dumps(metadata_payload, indent=2, sort_keys=True).encode("utf-8") + b"\n"
    kernel_digest, kernel_size, _ = _write_tar_blob_from_path(
        blobs_dir,
        source_path=kernel_path,
        arcname="vmlinux",
    )
    rootfs_digest, rootfs_size, _ = _write_tar_blob_from_path(
        blobs_dir,
        source_path=rootfs_path,
        arcname="rootfs.ext4",
    )
    metadata_digest, metadata_size, _ = _write_tar_blob_from_bytes(
        blobs_dir,
        payload=metadata_bytes,
        arcname="environment.json",
    )
    # RFC 3339 UTC timestamp with the conventional 'Z' suffix instead of '+00:00'.
    created_at = datetime.now(UTC).isoformat().replace("+00:00", "Z")
    ref_name = reference or spec.version
    os_name, architecture = _platform_to_oci_platform(spec.platform)
    # Same label set is reused as manifest annotations below.
    labels = {
        "io.pyro.environment": spec.name,
        "io.pyro.environment.version": spec.version,
        "io.pyro.source_profile": spec.source_profile,
        "org.opencontainers.image.title": spec.name,
        "org.opencontainers.image.version": spec.version,
    }
    config_payload = {
        "created": created_at,
        "architecture": architecture,
        "os": os_name,
        "config": {"Labels": labels},
        "rootfs": {
            "type": "layers",
            # Layers are uncompressed tars, so each diff_id equals its layer digest.
            "diff_ids": [kernel_digest, rootfs_digest, metadata_digest],
        },
        "history": [
            {"created": created_at, "created_by": "pyro runtime_build export-environment-oci"}
        ],
    }
    config_bytes = json.dumps(config_payload, indent=2, sort_keys=True).encode("utf-8") + b"\n"
    config_digest, config_size, _ = _write_blob_bytes(blobs_dir, config_bytes)
    manifest_payload = {
        "schemaVersion": 2,
        "mediaType": OCI_IMAGE_MANIFEST_MEDIA_TYPE,
        "config": {
            "mediaType": OCI_IMAGE_CONFIG_MEDIA_TYPE,
            "digest": config_digest,
            "size": config_size,
        },
        "layers": [
            {
                "mediaType": OCI_IMAGE_LAYER_MEDIA_TYPE,
                "digest": kernel_digest,
                "size": kernel_size,
                "annotations": {"org.opencontainers.image.title": "vmlinux"},
            },
            {
                "mediaType": OCI_IMAGE_LAYER_MEDIA_TYPE,
                "digest": rootfs_digest,
                "size": rootfs_size,
                "annotations": {"org.opencontainers.image.title": "rootfs.ext4"},
            },
            {
                "mediaType": OCI_IMAGE_LAYER_MEDIA_TYPE,
                "digest": metadata_digest,
                "size": metadata_size,
                "annotations": {"org.opencontainers.image.title": "environment.json"},
            },
        ],
        "annotations": labels,
    }
    manifest_bytes = json.dumps(manifest_payload, indent=2, sort_keys=True).encode("utf-8") + b"\n"
    manifest_digest, manifest_size, _ = _write_blob_bytes(blobs_dir, manifest_bytes)
    # Top-level index points at the single manifest; ref.name carries the tag.
    index_payload = {
        "schemaVersion": 2,
        "mediaType": OCI_IMAGE_INDEX_MEDIA_TYPE,
        "manifests": [
            {
                "mediaType": OCI_IMAGE_MANIFEST_MEDIA_TYPE,
                "digest": manifest_digest,
                "size": manifest_size,
                "annotations": {
                    "org.opencontainers.image.ref.name": ref_name,
                    "org.opencontainers.image.title": spec.name,
                },
                "platform": {"os": os_name, "architecture": architecture},
            }
        ],
    }
    # The oci-layout marker and index.json are the two files the spec requires
    # at the layout root; everything else lives under blobs/.
    (layout_dir / "oci-layout").write_text(
        json.dumps({"imageLayoutVersion": OCI_LAYOUT_VERSION}, indent=2, sort_keys=True) + "\n",
        encoding="utf-8",
    )
    (layout_dir / "index.json").write_text(
        json.dumps(index_payload, indent=2, sort_keys=True) + "\n",
        encoding="utf-8",
    )
    return {
        "environment": spec.name,
        "version": spec.version,
        "reference": ref_name,
        "layout_dir": str(layout_dir),
        "manifest_digest": manifest_digest,
        "config_digest": config_digest,
        "layers": [
            {"title": "vmlinux", "digest": kernel_digest, "size": kernel_size},
            {"title": "rootfs.ext4", "digest": rootfs_digest, "size": rootfs_size},
            {"title": "environment.json", "digest": metadata_digest, "size": metadata_size},
        ],
    }
def generate_manifest(paths: RuntimeBuildPaths, lock: RuntimeBuildLock) -> dict[str, Any]:
manifest: dict[str, Any] = {
"bundle_version": lock.bundle_version,
@ -468,6 +760,7 @@ def _build_parser() -> argparse.ArgumentParser: # pragma: no cover - CLI wiring
"build-kernel",
"build-rootfs",
"materialize",
"export-environment-oci",
"stage-binaries",
"stage-kernel",
"stage-rootfs",
@ -483,6 +776,9 @@ def _build_parser() -> argparse.ArgumentParser: # pragma: no cover - CLI wiring
parser.add_argument("--build-dir", default=str(DEFAULT_RUNTIME_BUILD_DIR))
parser.add_argument("--bundle-dir", default=str(DEFAULT_RUNTIME_BUNDLE_DIR))
parser.add_argument("--materialized-dir", default=str(DEFAULT_RUNTIME_MATERIALIZED_DIR))
parser.add_argument("--environment")
parser.add_argument("--output-dir", default=str(DEFAULT_RUNTIME_OCI_LAYOUT_DIR))
parser.add_argument("--reference")
return parser
@ -508,6 +804,17 @@ def main() -> None: # pragma: no cover - CLI wiring
if args.command == "materialize":
materialize_sources(paths)
return
if args.command == "export-environment-oci":
if not isinstance(args.environment, str) or args.environment == "":
raise RuntimeError("--environment is required for export-environment-oci")
result = export_environment_oci_layout(
paths,
environment=args.environment,
output_dir=Path(args.output_dir),
reference=args.reference,
)
print(json.dumps(result, indent=2, sort_keys=True))
return
if args.command == "bundle":
build_bundle(paths, sync=True)
return
@ -538,3 +845,7 @@ def main() -> None: # pragma: no cover - CLI wiring
sync_bundle(paths)
return
raise RuntimeError(f"unknown command: {args.command}")
# Allow direct execution of this module as the runtime-build CLI.
if __name__ == "__main__":  # pragma: no cover - CLI entrypoint
    main()