From 6406f673c10a5501515a88f636f924e9f1627617 Mon Sep 17 00:00:00 2001 From: Thales Maciel Date: Sun, 8 Mar 2026 18:27:32 -0300 Subject: [PATCH] Add OCI registry publish support --- Makefile | 6 +- src/pyro_mcp/runtime_build.py | 372 +++++++++++++++++++++++++++- tests/test_runtime_build.py | 442 ++++++++++++++++++++++++++++++++++ 3 files changed, 817 insertions(+), 3 deletions(-) diff --git a/Makefile b/Makefile index 2b475ec..0429b54 100644 --- a/Makefile +++ b/Makefile @@ -11,7 +11,7 @@ RUNTIME_MATERIALIZED_DIR ?= build/runtime_sources RUNTIME_OCI_LAYOUT_DIR ?= build/oci_layouts RUNTIME_ENVIRONMENT ?= debian:12-base -.PHONY: help setup lint format typecheck test check dist-check demo network-demo doctor ollama ollama-demo run-server install-hooks runtime-bundle runtime-binaries runtime-kernel runtime-rootfs runtime-agent runtime-validate runtime-manifest runtime-sync runtime-clean runtime-fetch-binaries runtime-build-kernel-real runtime-build-rootfs-real runtime-materialize runtime-export-environment-oci runtime-boot-check runtime-network-check +.PHONY: help setup lint format typecheck test check dist-check demo network-demo doctor ollama ollama-demo run-server install-hooks runtime-bundle runtime-binaries runtime-kernel runtime-rootfs runtime-agent runtime-validate runtime-manifest runtime-sync runtime-clean runtime-fetch-binaries runtime-build-kernel-real runtime-build-rootfs-real runtime-materialize runtime-export-environment-oci runtime-publish-environment-oci runtime-boot-check runtime-network-check help: @printf '%s\n' \ @@ -43,6 +43,7 @@ help: ' runtime-build-rootfs-real Materialize the real guest rootfs images' \ ' runtime-materialize Run all real-source materialization steps' \ ' runtime-export-environment-oci Export one environment as a local OCI layout' \ + ' runtime-publish-environment-oci Publish one exported OCI layout to its registry target' \ ' runtime-boot-check Validate direct Firecracker boot from the bundled runtime' \ ' runtime-network-check Validate outbound guest networking from the bundled runtime' \ ' runtime-clean Remove generated runtime build artifacts' @@ -132,6 +133,9 @@ runtime-materialize: runtime-export-environment-oci: uv run python -m pyro_mcp.runtime_build export-environment-oci --platform "$(RUNTIME_PLATFORM)" --source-dir "$(RUNTIME_SOURCE_DIR)" --build-dir "$(RUNTIME_BUILD_DIR)" --bundle-dir "$(RUNTIME_BUNDLE_DIR)" --materialized-dir "$(RUNTIME_MATERIALIZED_DIR)" --environment "$(RUNTIME_ENVIRONMENT)" --output-dir "$(RUNTIME_OCI_LAYOUT_DIR)" +runtime-publish-environment-oci: + uv run python -m pyro_mcp.runtime_build publish-environment-oci --platform "$(RUNTIME_PLATFORM)" --source-dir "$(RUNTIME_SOURCE_DIR)" --build-dir "$(RUNTIME_BUILD_DIR)" --bundle-dir "$(RUNTIME_BUNDLE_DIR)" --materialized-dir "$(RUNTIME_MATERIALIZED_DIR)" --environment "$(RUNTIME_ENVIRONMENT)" --layout-root "$(RUNTIME_OCI_LAYOUT_DIR)" + runtime-boot-check: uv run python -m pyro_mcp.runtime_boot_check diff --git a/src/pyro_mcp/runtime_build.py b/src/pyro_mcp/runtime_build.py index 1447243..e7f9203 100644 --- a/src/pyro_mcp/runtime_build.py +++ b/src/pyro_mcp/runtime_build.py @@ -3,18 +3,22 @@ from __future__ import annotations import argparse +import base64 import hashlib import io import json +import os import shutil import subprocess import tarfile +import urllib.error +import urllib.parse import urllib.request import uuid from dataclasses import dataclass from datetime import UTC, datetime from pathlib import Path -from typing import Any +from typing import Any, cast from pyro_mcp.runtime import DEFAULT_PLATFORM from pyro_mcp.vm_environments import get_environment @@ -31,6 +35,8 @@ OCI_IMAGE_LAYER_MEDIA_TYPE = "application/vnd.oci.image.layer.v1.tar" OCI_IMAGE_INDEX_MEDIA_TYPE = "application/vnd.oci.image.index.v1+json" OCI_LAYOUT_VERSION = "1.0.0" VALIDATION_READ_LIMIT = 1024 * 1024 +DEFAULT_OCI_USERNAME_ENV = "OCI_REGISTRY_USERNAME" +DEFAULT_OCI_PASSWORD_ENV = "OCI_REGISTRY_PASSWORD" @dataclass(frozen=True) @@ -250,6 +256,340 @@ def _write_tar_blob_from_bytes( raise +def _normalize_headers(headers: Any) -> dict[str, str]: + return {str(key).lower(): str(value) for key, value in headers.items()} + + +def _basic_auth_header(username: str | None, password: str | None) -> str | None: + if username is None or password is None: + return None + token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii") + return f"Basic {token}" + + +def _parse_authenticate_parameters(raw: str) -> dict[str, str]: + params: dict[str, str] = {} + for segment in raw.split(","): + if "=" not in segment: + continue + key, value = segment.split("=", 1) + params[key.strip()] = value.strip().strip('"') + return params + + +def _registry_url(registry: str, repository: str, suffix: str) -> str: + return f"https://{registry}/v2/{repository}/{suffix}" + + +def _registry_credentials( + *, + username_env: str = DEFAULT_OCI_USERNAME_ENV, + password_env: str = DEFAULT_OCI_PASSWORD_ENV, +) -> tuple[str | None, str | None]: + username = os.environ.get(username_env) + password = os.environ.get(password_env) + return ( + username if isinstance(username, str) and username != "" else None, + password if isinstance(password, str) and password != "" else None, + ) + + +def _rewind_body(data: object) -> None: + if hasattr(data, "seek"): + data.seek(0) + + +def _read_response_body(response: Any) -> bytes: + if hasattr(response, "read"): + return bytes(response.read()) + return b"" + + +def _fetch_registry_token( + authenticate: str, + *, + repository: str, + scope_actions: str, + username: str | None, + password: str | None, +) -> str: + if not authenticate.startswith("Bearer "): + raise RuntimeError("unsupported OCI registry authentication scheme") + params = _parse_authenticate_parameters(authenticate[len("Bearer ") :]) + realm = params.get("realm") + if realm is None: + raise RuntimeError("OCI auth challenge did not include a token realm") + query = { + "service": params.get("service", ""), + "scope": params.get("scope", f"repository:{repository}:{scope_actions}"), + } + headers: dict[str, str] = {} + basic_auth = _basic_auth_header(username, password) + if basic_auth is not None: + headers["Authorization"] = basic_auth + request = urllib.request.Request( + f"{realm}?{urllib.parse.urlencode(query)}", + headers=headers, + method="GET", + ) + with urllib.request.urlopen(request, timeout=90) as response: # noqa: S310 + payload = json.loads(_read_response_body(response).decode("utf-8")) + if not isinstance(payload, dict): + raise RuntimeError("OCI auth token response was not a JSON object") + raw_token = payload.get("token") or payload.get("access_token") + if not isinstance(raw_token, str) or raw_token == "": + raise RuntimeError("OCI auth token response did not include a bearer token") + return raw_token + + +def _request_registry( + url: str, + *, + method: str, + repository: str, + scope_actions: str, + headers: dict[str, str] | None = None, + data: object = None, + allow_statuses: tuple[int, ...] = (), + username: str | None = None, + password: str | None = None, +) -> tuple[int, bytes, dict[str, str]]: + request_headers = dict(headers or {}) + request = urllib.request.Request( + url, + data=cast(Any, data), + headers=request_headers, + method=method, + ) + try: + with urllib.request.urlopen(request, timeout=90) as response: # noqa: S310 + return ( + response.status, + _read_response_body(response), + _normalize_headers(response.headers), + ) + except urllib.error.HTTPError as exc: + if exc.code == 401: + authenticate = exc.headers.get("WWW-Authenticate") + if authenticate is None: + raise RuntimeError("OCI registry denied access without an auth challenge") from exc + token = _fetch_registry_token( + authenticate, + repository=repository, + scope_actions=scope_actions, + username=username, + password=password, + ) + authenticated_headers = {**request_headers, "Authorization": f"Bearer {token}"} + _rewind_body(data) + retry = urllib.request.Request( + url, + data=cast(Any, data), + headers=authenticated_headers, + method=method, + ) + try: + with urllib.request.urlopen(retry, timeout=90) as response: # noqa: S310 + return ( + response.status, + _read_response_body(response), + _normalize_headers(response.headers), + ) + except urllib.error.HTTPError as retry_exc: + if retry_exc.code in allow_statuses: + return ( + retry_exc.code, + _read_response_body(retry_exc), + _normalize_headers(retry_exc.headers), + ) + raise RuntimeError(f"registry request failed for {url}: {retry_exc}") from retry_exc + if exc.code in allow_statuses: + return exc.code, _read_response_body(exc), _normalize_headers(exc.headers) + raise RuntimeError(f"registry request failed for {url}: {exc}") from exc + + +def _load_oci_layout_manifest( + layout_dir: Path, +) -> tuple[dict[str, Any], bytes, dict[str, Any], str]: + index_path = layout_dir / "index.json" + if not index_path.exists(): + raise RuntimeError(f"OCI layout index not found: {index_path}") + index_payload = json.loads(index_path.read_text(encoding="utf-8")) + manifests = index_payload.get("manifests") + if not isinstance(manifests, list) or len(manifests) != 1: + raise RuntimeError("OCI layout must contain exactly one manifest descriptor") + descriptor = manifests[0] + if not isinstance(descriptor, dict): + raise RuntimeError("OCI layout manifest descriptor is malformed") + raw_digest = descriptor.get("digest") + if not isinstance(raw_digest, str): + raise RuntimeError("OCI layout manifest descriptor is missing a digest") + manifest_path = _blob_path(layout_dir / "blobs", raw_digest) + manifest_bytes = manifest_path.read_bytes() + manifest = json.loads(manifest_bytes.decode("utf-8")) + if not isinstance(manifest, dict): + raise RuntimeError("OCI layout manifest payload is malformed") + media_type = descriptor.get("mediaType") + if not isinstance(media_type, str) or media_type == "": + media_type = str(manifest.get("mediaType") or OCI_IMAGE_MANIFEST_MEDIA_TYPE) + return descriptor, manifest_bytes, manifest, media_type + + +def _blob_exists( + *, + registry: str, + repository: str, + digest: str, + username: str | None, + password: str | None, +) -> bool: + status, _, _ = _request_registry( + _registry_url(registry, repository, f"blobs/{digest}"), + method="HEAD", + repository=repository, + scope_actions="pull,push", + allow_statuses=(404,), + username=username, + password=password, + ) + return status == 200 + + +def _upload_blob( + *, + registry: str, + repository: str, + digest: str, + blob_path: Path, + username: str | None, + password: str | None, +) -> dict[str, Any]: + if _blob_exists( + registry=registry, + repository=repository, + digest=digest, + username=username, + password=password, + ): + return {"digest": digest, "size": blob_path.stat().st_size, "uploaded": False} + + status, _, headers = _request_registry( + _registry_url(registry, repository, "blobs/uploads/"), + method="POST", + repository=repository, + scope_actions="pull,push", + headers={"Content-Length": "0"}, + allow_statuses=(202,), + username=username, + password=password, + ) + if status != 202: + raise RuntimeError(f"unexpected registry status when starting blob upload: {status}") + location = headers.get("location") + if location is None: + raise RuntimeError("registry did not return a blob upload location") + upload_url = urllib.parse.urljoin(f"https://{registry}", location) + separator = "&" if "?" in upload_url else "?" + upload_url = f"{upload_url}{separator}{urllib.parse.urlencode({'digest': digest})}" + size = blob_path.stat().st_size + with blob_path.open("rb") as blob_fp: + status, _, _ = _request_registry( + upload_url, + method="PUT", + repository=repository, + scope_actions="pull,push", + headers={ + "Content-Length": str(size), + "Content-Type": "application/octet-stream", + }, + data=blob_fp, + allow_statuses=(201,), + username=username, + password=password, + ) + if status != 201: + raise RuntimeError(f"unexpected registry status when uploading blob: {status}") + return {"digest": digest, "size": size, "uploaded": True} + + +def publish_environment_oci_layout( + *, + environment: str, + layout_root: Path, + registry: str | None = None, + repository: str | None = None, + reference: str | None = None, + username_env: str = DEFAULT_OCI_USERNAME_ENV, + password_env: str = DEFAULT_OCI_PASSWORD_ENV, +) -> dict[str, Any]: + spec = get_environment(environment) + resolved_registry = registry or spec.oci_registry + resolved_repository = repository or spec.oci_repository + resolved_reference = reference or spec.oci_reference or spec.version + if resolved_registry is None or resolved_repository is None: + raise RuntimeError(f"environment {environment!r} does not define an OCI registry target") + + layout_dir = layout_root / _environment_slug(environment) + descriptor, manifest_bytes, manifest, media_type = _load_oci_layout_manifest(layout_dir) + username, password = _registry_credentials( + username_env=username_env, + password_env=password_env, + ) + + config = manifest.get("config") + layers = manifest.get("layers") + if not isinstance(config, dict): + raise RuntimeError("OCI layout manifest is missing a config descriptor") + if not isinstance(layers, list): + raise RuntimeError("OCI layout manifest is missing layers") + + uploaded_blobs: list[dict[str, Any]] = [] + descriptors = [config, *layers] + for entry in descriptors: + if not isinstance(entry, dict): + raise RuntimeError("OCI layout descriptor is malformed") + raw_digest = entry.get("digest") + if not isinstance(raw_digest, str): + raise RuntimeError("OCI layout descriptor is missing a digest") + blob_path = _blob_path(layout_dir / "blobs", raw_digest) + uploaded_blobs.append( + _upload_blob( + registry=resolved_registry, + repository=resolved_repository, + digest=raw_digest, + blob_path=blob_path, + username=username, + password=password, + ) + ) + + status, _, headers = _request_registry( + _registry_url(resolved_registry, resolved_repository, f"manifests/{resolved_reference}"), + method="PUT", + repository=resolved_repository, + scope_actions="pull,push", + headers={ + "Content-Length": str(len(manifest_bytes)), + "Content-Type": media_type, + }, + data=manifest_bytes, + allow_statuses=(201,), + username=username, + password=password, + ) + if status != 201: + raise RuntimeError(f"unexpected registry status when publishing manifest: {status}") + return { + "environment": spec.name, + "layout_dir": str(layout_dir), + "registry": resolved_registry, + "repository": resolved_repository, + "reference": resolved_reference, + "manifest_digest": headers.get("docker-content-digest", str(descriptor["digest"])), + "uploaded_blobs": uploaded_blobs, + } + + def validate_sources(paths: RuntimeBuildPaths, lock: RuntimeBuildLock) -> None: firecracker_source = _resolved_source_path(paths, lock.binaries["firecracker"]) jailer_source = _resolved_source_path(paths, lock.binaries["jailer"]) @@ -761,6 +1101,7 @@ def _build_parser() -> argparse.ArgumentParser: # pragma: no cover - CLI wiring "build-rootfs", "materialize", "export-environment-oci", + "publish-environment-oci", "stage-binaries", "stage-kernel", "stage-rootfs", @@ -778,7 +1119,12 @@ def _build_parser() -> argparse.ArgumentParser: # pragma: no cover - CLI wiring parser.add_argument("--materialized-dir", default=str(DEFAULT_RUNTIME_MATERIALIZED_DIR)) parser.add_argument("--environment") parser.add_argument("--output-dir", default=str(DEFAULT_RUNTIME_OCI_LAYOUT_DIR)) + parser.add_argument("--layout-root", default=str(DEFAULT_RUNTIME_OCI_LAYOUT_DIR)) + parser.add_argument("--registry") + parser.add_argument("--repository") parser.add_argument("--reference") + parser.add_argument("--username-env", default=DEFAULT_OCI_USERNAME_ENV) + parser.add_argument("--password-env", default=DEFAULT_OCI_PASSWORD_ENV) return parser @@ -791,14 +1137,16 @@ def main() -> None: # pragma: no cover - CLI wiring materialized_dir=Path(args.materialized_dir), platform=args.platform, ) - lock = _load_lock(paths) if args.command == "fetch-binaries": + lock = _load_lock(paths) materialize_binaries(paths, lock) return if args.command == "build-kernel": + lock = _load_lock(paths) materialize_kernel(paths, lock) return if args.command == "build-rootfs": + lock = _load_lock(paths) materialize_rootfs(paths, lock) return if args.command == "materialize": @@ -815,30 +1163,50 @@ def main() -> None: # pragma: no cover - CLI wiring ) print(json.dumps(result, indent=2, sort_keys=True)) return + if args.command == "publish-environment-oci": + if not isinstance(args.environment, str) or args.environment == "": + raise RuntimeError("--environment is required for publish-environment-oci") + result = publish_environment_oci_layout( + environment=args.environment, + layout_root=Path(args.layout_root), + registry=args.registry, + repository=args.repository, + reference=args.reference, + username_env=args.username_env, + password_env=args.password_env, + ) + print(json.dumps(result, indent=2, sort_keys=True)) + return if args.command == "bundle": build_bundle(paths, sync=True) return if args.command == "stage-binaries": + lock = _load_lock(paths) paths.build_platform_root.mkdir(parents=True, exist_ok=True) _copy_notice(paths) stage_binaries(paths, lock) return if args.command == "stage-kernel": + lock = _load_lock(paths) paths.build_platform_root.mkdir(parents=True, exist_ok=True) stage_kernel(paths, lock) return if args.command == "stage-rootfs": + lock = _load_lock(paths) paths.build_platform_root.mkdir(parents=True, exist_ok=True) stage_rootfs(paths, lock) return if args.command == "stage-agent": + lock = _load_lock(paths) paths.build_platform_root.mkdir(parents=True, exist_ok=True) stage_agent(paths, lock) return if args.command == "validate": + lock = _load_lock(paths) validate_sources(paths, lock) return if args.command == "manifest": + lock = _load_lock(paths) generate_manifest(paths, lock) return if args.command == "sync": diff --git a/tests/test_runtime_build.py b/tests/test_runtime_build.py index e84310d..6c66464 100644 --- a/tests/test_runtime_build.py +++ b/tests/test_runtime_build.py @@ -2,8 +2,12 @@ from __future__ import annotations import argparse import hashlib +import io import json import tarfile +import urllib.error +import urllib.request +from email.message import Message from pathlib import Path import pytest @@ -11,19 +15,61 @@ import pytest from pyro_mcp.runtime_build import ( _build_paths, _load_lock, + _load_oci_layout_manifest, + _request_registry, + _upload_blob, build_bundle, export_environment_oci_layout, generate_manifest, main, materialize_binaries, + materialize_sources, + publish_environment_oci_layout, stage_agent, stage_binaries, stage_kernel, stage_rootfs, + sync_bundle, validate_sources, ) +class FakeResponse: + def __init__( + self, + payload: bytes = b"", + *, + status: int = 200, + headers: dict[str, str] | None = None, + ) -> None: + self._buffer = io.BytesIO(payload) + self.status = status + self.headers = headers or {} + + def read(self, size: int = -1) -> bytes: + return self._buffer.read(size) + + def __enter__(self) -> FakeResponse: + return self + + def __exit__(self, exc_type: object, exc: object, tb: object) -> None: + del exc_type, exc, tb + + +def _http_headers(headers: dict[str, str]) -> Message: + message = Message() + for key, value in headers.items(): + message[key] = value + return message + + +def _request_header(request: urllib.request.Request, name: str) -> str | None: + for key, value in request.header_items(): + if key.lower() == name.lower(): + return value + return None + + def _write_text(path: Path, content: str) -> None: path.parent.mkdir(parents=True, exist_ok=True) path.write_text(content, encoding="utf-8") @@ -314,3 +360,399 @@ def test_runtime_build_main_dispatches_export( payload = json.loads(capsys.readouterr().out) assert payload["environment"] == "debian:12-base" assert payload["manifest_digest"] == "sha256:test" + + +def test_request_registry_retries_with_bearer_token(monkeypatch: pytest.MonkeyPatch) -> None: + requested_auth: list[str | None] = [] + + def fake_urlopen(request: object, timeout: int = 90) -> FakeResponse: + del timeout + url = request.full_url if isinstance(request, urllib.request.Request) else str(request) + if url.startswith("https://ghcr.io/token?"): + authorization = ( + _request_header(request, "Authorization") + if isinstance(request, urllib.request.Request) + else None + ) + requested_auth.append(authorization if isinstance(authorization, str) else None) + return FakeResponse(b'{"token":"secret-token"}') + if isinstance(request, urllib.request.Request): + authorization = _request_header(request, "Authorization") + else: + authorization = None + requested_auth.append(authorization if isinstance(authorization, str) else None) + if authorization is None: + raise urllib.error.HTTPError( + url, + 401, + "Unauthorized", + _http_headers( + { + "WWW-Authenticate": ( + 'Bearer realm="https://ghcr.io/token",' + 'service="ghcr.io",' + 'scope="repository:thaloco/pyro-environments/debian-12-base:pull,push"' + ) + } + ), + io.BytesIO(b""), + ) + return FakeResponse(b"ok", headers={"Docker-Content-Digest": "sha256:abc123"}) + + monkeypatch.setattr(urllib.request, "urlopen", fake_urlopen) + + status, payload, headers = _request_registry( + "https://ghcr.io/v2/thaloco/pyro-environments/debian-12-base/test", + method="GET", + repository="thaloco/pyro-environments/debian-12-base", + scope_actions="pull,push", + username="user", + password="token", + ) + + assert status == 200 + assert payload == b"ok" + assert headers["docker-content-digest"] == "sha256:abc123" + assert requested_auth[0] is None + assert requested_auth[1] is not None and requested_auth[1].startswith("Basic ") + assert requested_auth[2] == "Bearer secret-token" + + +def test_request_registry_requires_auth_challenge(monkeypatch: pytest.MonkeyPatch) -> None: + def fake_urlopen(request: object, timeout: int = 90) -> FakeResponse: + del timeout + url = request.full_url if isinstance(request, urllib.request.Request) else str(request) + raise urllib.error.HTTPError( + url, + 401, + "Unauthorized", + _http_headers({}), + io.BytesIO(b""), + ) + + monkeypatch.setattr(urllib.request, "urlopen", fake_urlopen) + + with pytest.raises(RuntimeError, match="denied access without an auth challenge"): + _request_registry( + "https://ghcr.io/v2/thaloco/pyro-environments/debian-12-base/test", + method="GET", + repository="thaloco/pyro-environments/debian-12-base", + scope_actions="pull,push", + ) + + +def test_request_registry_returns_allowed_retry_status(monkeypatch: pytest.MonkeyPatch) -> None: + def fake_urlopen(request: object, timeout: int = 90) -> FakeResponse: + del timeout + url = request.full_url if isinstance(request, urllib.request.Request) else str(request) + if url.startswith("https://ghcr.io/token?"): + return FakeResponse(b'{"token":"secret-token"}') + authorization = ( + _request_header(request, "Authorization") + if isinstance(request, urllib.request.Request) + else None + ) + if authorization is None: + raise urllib.error.HTTPError( + url, + 401, + "Unauthorized", + _http_headers( + { + "WWW-Authenticate": ( + 'Bearer realm="https://ghcr.io/token",' + 'service="ghcr.io",' + 'scope="repository:thaloco/pyro-environments/debian-12-base:pull,push"' + ) + } + ), + io.BytesIO(b""), + ) + raise urllib.error.HTTPError( + url, + 404, + "Not Found", + _http_headers({"Docker-Content-Digest": "sha256:missing"}), + io.BytesIO(b"missing"), + ) + + monkeypatch.setattr(urllib.request, "urlopen", fake_urlopen) + + status, payload, headers = _request_registry( + "https://ghcr.io/v2/thaloco/pyro-environments/debian-12-base/test", + method="GET", + repository="thaloco/pyro-environments/debian-12-base", + scope_actions="pull,push", + allow_statuses=(404,), + ) + + assert status == 404 + assert payload == b"missing" + assert headers["docker-content-digest"] == "sha256:missing" + + +@pytest.mark.parametrize( + ("index_payload", "message"), + [ + ({}, "exactly one manifest descriptor"), + ({"manifests": ["bad"]}, "descriptor is malformed"), + ({"manifests": [{}]}, "missing a digest"), + ], +) +def test_load_oci_layout_manifest_rejects_invalid_index( + tmp_path: Path, + index_payload: dict[str, object], + message: str, +) -> None: + layout_dir = tmp_path / "layout" + layout_dir.mkdir(parents=True) + (layout_dir / "index.json").write_text(json.dumps(index_payload), encoding="utf-8") + + with pytest.raises(RuntimeError, match=message): + _load_oci_layout_manifest(layout_dir) + + +def test_load_oci_layout_manifest_rejects_missing_index(tmp_path: Path) -> None: + with pytest.raises(RuntimeError, match="OCI layout index not found"): + _load_oci_layout_manifest(tmp_path / "missing-layout") + + +def test_upload_blob_skips_existing_blob(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None: + blob_path = tmp_path / "blob.tar" + blob_path.write_bytes(b"blob-data") + monkeypatch.setattr("pyro_mcp.runtime_build._blob_exists", lambda **_: True) + + result = _upload_blob( + registry="ghcr.io", + repository="thaloco/pyro-environments/debian-12-base", + digest="sha256:abc123", + blob_path=blob_path, + username=None, + password=None, + ) + + assert result == { + "digest": "sha256:abc123", + "size": len(b"blob-data"), + "uploaded": False, + } + + +def test_upload_blob_requires_location(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None: + blob_path = tmp_path / "blob.tar" + blob_path.write_bytes(b"blob-data") + monkeypatch.setattr("pyro_mcp.runtime_build._blob_exists", lambda **_: False) + monkeypatch.setattr( + "pyro_mcp.runtime_build._request_registry", + lambda *args, **kwargs: (202, b"", {}) if kwargs["method"] == "POST" else (201, b"", {}), + ) + + with pytest.raises(RuntimeError, match="did not return a blob upload location"): + _upload_blob( + registry="ghcr.io", + repository="thaloco/pyro-environments/debian-12-base", + digest="sha256:abc123", + blob_path=blob_path, + username=None, + password=None, + ) + + +def test_publish_environment_oci_layout_rejects_missing_registry_target( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, +) -> None: + monkeypatch.setattr( + "pyro_mcp.runtime_build.get_environment", + lambda environment: type( + "Spec", + (), + { + "name": environment, + "version": "1.0.0", + "oci_registry": None, + "oci_repository": None, + "oci_reference": None, + }, + )(), + ) + + with pytest.raises(RuntimeError, match="does not define an OCI registry target"): + publish_environment_oci_layout( + environment="debian:12-base", + layout_root=tmp_path / "oci_layouts", + ) + + +def test_publish_environment_oci_layout_uploads_blobs_and_manifest( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, +) -> None: + source_dir, build_dir, bundle_dir = _make_source_tree(tmp_path) + paths = _build_paths( + source_dir=source_dir, + build_dir=build_dir, + bundle_dir=bundle_dir, + materialized_dir=tmp_path / "materialized_sources", + platform="linux-x86_64", + ) + export_environment_oci_layout( + paths, + environment="debian:12-base", + output_dir=tmp_path / "oci_layouts", + ) + + uploads: list[tuple[str, int]] = [] + manifest_publish: list[tuple[str, str, int]] = [] + upload_counter = 0 + + def fake_urlopen(request: object, timeout: int = 90) -> FakeResponse: + del timeout + nonlocal upload_counter + if not isinstance(request, urllib.request.Request): + raise AssertionError("expected urllib request") + url = request.full_url + method = request.get_method() + if method == "HEAD" and "/blobs/" in url: + raise urllib.error.HTTPError( + url, + 404, + "Not Found", + _http_headers({}), + io.BytesIO(b""), + ) + if method == "POST" and url.endswith("/blobs/uploads/"): + upload_counter += 1 + return FakeResponse( + status=202, + headers={"Location": f"/upload/session-{upload_counter}"}, + ) + if method == "PUT" and "/upload/session-" in url: + digest = url.split("digest=", 1)[1] + body = request.data + if isinstance(body, bytes): + payload = body + elif body is not None and hasattr(body, "read"): + payload = body.read() + else: + raise AssertionError("expected upload body") + uploads.append((digest, len(payload))) + return FakeResponse(status=201) + if method == "PUT" and "/manifests/" in url: + body = request.data + if not isinstance(body, bytes): + raise AssertionError("expected manifest bytes") + manifest_publish.append( + ( + url, + str(_request_header(request, "Content-Type")), + len(body), + ) + ) + return FakeResponse( + status=201, + headers={"Docker-Content-Digest": "sha256:published-manifest"}, + ) + raise AssertionError(f"unexpected registry request: {method} {url}") + + monkeypatch.setattr(urllib.request, "urlopen", fake_urlopen) + + result = publish_environment_oci_layout( + environment="debian:12-base", + layout_root=tmp_path / "oci_layouts", + registry="ghcr.io", + repository="thaloco/pyro-environments/debian-12-base", + reference="1.0.0", + ) + + assert result["manifest_digest"] == "sha256:published-manifest" + assert len(result["uploaded_blobs"]) == 4 + assert all(bool(entry["uploaded"]) for entry in result["uploaded_blobs"]) + assert len(uploads) == 4 + assert len(manifest_publish) == 1 + assert manifest_publish[0][0].endswith("/manifests/1.0.0") + assert manifest_publish[0][1] == "application/vnd.oci.image.manifest.v1+json" + + +def test_sync_bundle_replaces_existing_bundle_dir(tmp_path: Path) -> None: + source_dir, build_dir, bundle_dir = _make_source_tree(tmp_path) + paths = _build_paths( + source_dir=source_dir, + build_dir=build_dir, + bundle_dir=bundle_dir, + materialized_dir=tmp_path / "materialized_sources", + platform="linux-x86_64", + ) + paths.build_platform_root.mkdir(parents=True, exist_ok=True) + (paths.build_root / "NOTICE").write_text("built notice\n", encoding="utf-8") + (paths.build_platform_root / "manifest.json").write_text("{}", encoding="utf-8") + existing_dir = bundle_dir / "linux-x86_64" + existing_dir.mkdir(parents=True, exist_ok=True) + (existing_dir / "stale.txt").write_text("stale\n", encoding="utf-8") + + sync_bundle(paths) + + assert not (existing_dir / "stale.txt").exists() + assert (existing_dir / "manifest.json").exists() + assert (bundle_dir / "NOTICE").read_text(encoding="utf-8") == "built notice\n" + + +def test_build_bundle_rejects_platform_mismatch(tmp_path: Path) -> None: + source_dir, build_dir, bundle_dir = _make_source_tree(tmp_path) + lock_path = source_dir / "linux-x86_64/runtime.lock.json" + lock = json.loads(lock_path.read_text(encoding="utf-8")) + lock["platform"] = "linux-aarch64" + lock_path.write_text(json.dumps(lock, indent=2) + "\n", encoding="utf-8") + paths = _build_paths( + source_dir=source_dir, + build_dir=build_dir, + bundle_dir=bundle_dir, + materialized_dir=tmp_path / "materialized_sources", + platform="linux-x86_64", + ) + + with pytest.raises(RuntimeError, match="does not match requested platform"): + build_bundle(paths, sync=False) + + +def test_materialize_sources_dispatches_steps( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, +) -> None: + source_dir, build_dir, bundle_dir = _make_source_tree(tmp_path) + paths = _build_paths( + source_dir=source_dir, + build_dir=build_dir, + bundle_dir=bundle_dir, + materialized_dir=tmp_path / "materialized_sources", + platform="linux-x86_64", + ) + lock = _load_lock(paths) + calls: list[str] = [] + monkeypatch.setattr("pyro_mcp.runtime_build._load_lock", lambda _: lock) + monkeypatch.setattr( + "pyro_mcp.runtime_build.materialize_binaries", + lambda runtime_paths, runtime_lock: calls.append( + f"binaries:{runtime_paths.platform}:{runtime_lock.platform}" + ), + ) + monkeypatch.setattr( + "pyro_mcp.runtime_build.materialize_kernel", + lambda runtime_paths, runtime_lock: calls.append( + f"kernel:{runtime_paths.platform}:{runtime_lock.platform}" + ), + ) + monkeypatch.setattr( + "pyro_mcp.runtime_build.materialize_rootfs", + lambda runtime_paths, runtime_lock: calls.append( + f"rootfs:{runtime_paths.platform}:{runtime_lock.platform}" + ), + ) + + materialize_sources(paths) + + assert calls == [ + "binaries:linux-x86_64:linux-x86_64", + "kernel:linux-x86_64:linux-x86_64", + "rootfs:linux-x86_64:linux-x86_64", + ]