from __future__ import annotations import hashlib import io import json import tarfile import urllib.error import urllib.request from email.message import Message from pathlib import Path import pytest from pyro_mcp.runtime import RuntimePaths, resolve_runtime_paths from pyro_mcp.vm_environments import ( EnvironmentStore, VmEnvironment, get_environment, list_environments, ) class FakeResponse: def __init__(self, payload: bytes, *, headers: dict[str, str] | None = None) -> None: self._buffer = io.BytesIO(payload) self.headers = headers or {} def read(self, size: int = -1) -> bytes: return self._buffer.read(size) def __enter__(self) -> FakeResponse: return self def __exit__(self, exc_type: object, exc: object, tb: object) -> None: del exc_type, exc, tb def _fake_runtime_paths(tmp_path: Path) -> RuntimePaths: bundle_parent = tmp_path / "runtime" bundle_root = bundle_parent / "linux-x86_64" manifest_path = bundle_root / "manifest.json" firecracker_bin = bundle_root / "bin" / "firecracker" jailer_bin = bundle_root / "bin" / "jailer" guest_agent_path = bundle_root / "guest" / "pyro_guest_agent.py" guest_init_path = bundle_root / "guest" / "pyro-init" artifacts_dir = bundle_root / "profiles" notice_path = bundle_parent / "NOTICE" artifacts_dir.mkdir(parents=True, exist_ok=True) firecracker_bin.parent.mkdir(parents=True, exist_ok=True) jailer_bin.parent.mkdir(parents=True, exist_ok=True) guest_agent_path.parent.mkdir(parents=True, exist_ok=True) manifest_path.write_text('{"platform": "linux-x86_64"}\n', encoding="utf-8") firecracker_bin.write_text("firecracker\n", encoding="utf-8") jailer_bin.write_text("jailer\n", encoding="utf-8") guest_agent_path.write_text("print('guest')\n", encoding="utf-8") guest_init_path.write_text("#!/bin/sh\n", encoding="utf-8") notice_path.write_text("notice\n", encoding="utf-8") return RuntimePaths( bundle_root=bundle_root, manifest_path=manifest_path, firecracker_bin=firecracker_bin, jailer_bin=jailer_bin, guest_agent_path=guest_agent_path, guest_init_path=guest_init_path, artifacts_dir=artifacts_dir, notice_path=notice_path, manifest={"platform": "linux-x86_64"}, ) def _write_local_profile( runtime_paths: RuntimePaths, profile_name: str, *, kernel: str = "kernel\n", rootfs: str = "rootfs\n", ) -> None: profile_dir = runtime_paths.artifacts_dir / profile_name profile_dir.mkdir(parents=True, exist_ok=True) (profile_dir / "vmlinux").write_text(kernel, encoding="utf-8") (profile_dir / "rootfs.ext4").write_text(rootfs, encoding="utf-8") def _sha256_digest(payload: bytes) -> str: return f"sha256:{hashlib.sha256(payload).hexdigest()}" def _layer_archive(filename: str, content: bytes) -> bytes: archive_buffer = io.BytesIO() with tarfile.open(fileobj=archive_buffer, mode="w:gz") as archive: info = tarfile.TarInfo(name=filename) info.size = len(content) archive.addfile(info, io.BytesIO(content)) return archive_buffer.getvalue() def _authorization_header(request: object) -> str | None: if isinstance(request, urllib.request.Request): for key, value in request.header_items(): if key.lower() == "authorization": return value return None def _http_headers(headers: dict[str, str]) -> Message: message = Message() for key, value in headers.items(): message[key] = value return message def test_list_environments_includes_expected_entries() -> None: environments = list_environments(runtime_paths=resolve_runtime_paths()) names = {str(entry["name"]) for entry in environments} assert {"debian:12", "debian:12-base", "debian:12-build"} <= names def test_get_environment_rejects_unknown() -> None: with pytest.raises(ValueError, match="unknown environment"): get_environment("does-not-exist") def test_environment_store_installs_from_local_runtime_source(tmp_path: Path) -> None: runtime_paths = _fake_runtime_paths(tmp_path) _write_local_profile(runtime_paths, "debian-git") store = EnvironmentStore(runtime_paths=runtime_paths, cache_dir=tmp_path / "cache") installed = store.ensure_installed("debian:12") assert installed.kernel_image.exists() assert installed.rootfs_image.exists() assert (installed.install_dir / "environment.json").exists() def test_environment_store_pull_and_cached_inspect(tmp_path: Path) -> None: runtime_paths = _fake_runtime_paths(tmp_path) _write_local_profile(runtime_paths, "debian-git") store = EnvironmentStore(runtime_paths=runtime_paths, cache_dir=tmp_path / "cache") before = store.inspect_environment("debian:12") assert before["installed"] is False pulled = store.pull_environment("debian:12") assert pulled["installed"] is True assert "install_manifest" in pulled cached = store.ensure_installed("debian:12") assert cached.installed is True after = store.inspect_environment("debian:12") assert after["installed"] is True assert "install_manifest" in after def test_environment_store_uses_env_override_for_default_cache_dir( monkeypatch: pytest.MonkeyPatch, tmp_path: Path ) -> None: monkeypatch.setenv("PYRO_ENVIRONMENT_CACHE_DIR", str(tmp_path / "override-cache")) store = EnvironmentStore(runtime_paths=resolve_runtime_paths()) assert store.cache_dir == tmp_path / "override-cache" def test_environment_store_installs_from_archive_when_runtime_source_missing( tmp_path: Path, monkeypatch: pytest.MonkeyPatch ) -> None: runtime_paths = _fake_runtime_paths(tmp_path) source_environment = get_environment("debian:12-base", runtime_paths=runtime_paths) archive_dir = tmp_path / "archive" archive_dir.mkdir(parents=True, exist_ok=True) (archive_dir / "vmlinux").write_text("kernel\n", encoding="utf-8") (archive_dir / "rootfs.ext4").write_text("rootfs\n", encoding="utf-8") archive_path = tmp_path / "environment.tgz" with tarfile.open(archive_path, "w:gz") as archive: archive.add(archive_dir / "vmlinux", arcname="vmlinux") archive.add(archive_dir / "rootfs.ext4", arcname="rootfs.ext4") monkeypatch.setattr( "pyro_mcp.vm_environments.CATALOG", { "debian:12-base": source_environment.__class__( name=source_environment.name, version=source_environment.version, description=source_environment.description, default_packages=source_environment.default_packages, distribution=source_environment.distribution, distribution_version=source_environment.distribution_version, source_profile=source_environment.source_profile, platform=source_environment.platform, source_url=archive_path.resolve().as_uri(), source_digest=source_environment.source_digest, compatibility=source_environment.compatibility, ) }, ) store = EnvironmentStore( runtime_paths=runtime_paths, cache_dir=tmp_path / "cache", ) installed = store.ensure_installed("debian:12-base") assert installed.kernel_image.read_text(encoding="utf-8") == "kernel\n" assert installed.rootfs_image.read_text(encoding="utf-8") == "rootfs\n" def test_environment_store_skips_empty_local_source_dir_and_uses_archive( tmp_path: Path, monkeypatch: pytest.MonkeyPatch ) -> None: runtime_paths = _fake_runtime_paths(tmp_path) source_environment = get_environment("debian:12-base", runtime_paths=runtime_paths) (runtime_paths.artifacts_dir / source_environment.source_profile).mkdir( parents=True, exist_ok=True, ) archive_dir = tmp_path / "archive" archive_dir.mkdir(parents=True, exist_ok=True) (archive_dir / "vmlinux").write_text("kernel\n", encoding="utf-8") (archive_dir / "rootfs.ext4").write_text("rootfs\n", encoding="utf-8") archive_path = tmp_path / "environment.tgz" with tarfile.open(archive_path, "w:gz") as archive: archive.add(archive_dir / "vmlinux", arcname="vmlinux") archive.add(archive_dir / "rootfs.ext4", arcname="rootfs.ext4") monkeypatch.setattr( "pyro_mcp.vm_environments.CATALOG", { "debian:12-base": source_environment.__class__( name=source_environment.name, version=source_environment.version, description=source_environment.description, default_packages=source_environment.default_packages, distribution=source_environment.distribution, distribution_version=source_environment.distribution_version, source_profile=source_environment.source_profile, platform=source_environment.platform, source_url=archive_path.resolve().as_uri(), source_digest=source_environment.source_digest, compatibility=source_environment.compatibility, ) }, ) store = EnvironmentStore(runtime_paths=runtime_paths, cache_dir=tmp_path / "cache") installed = store.ensure_installed("debian:12-base") assert installed.kernel_image.read_text(encoding="utf-8") == "kernel\n" assert installed.rootfs_image.read_text(encoding="utf-8") == "rootfs\n" def test_environment_store_marks_broken_symlink_install_uninstalled_and_repairs_it( tmp_path: Path, ) -> None: runtime_paths = _fake_runtime_paths(tmp_path) _write_local_profile( runtime_paths, "debian-git", kernel="kernel-fixed\n", rootfs="rootfs-fixed\n", ) store = EnvironmentStore(runtime_paths=runtime_paths, cache_dir=tmp_path / "cache") spec = get_environment("debian:12", runtime_paths=runtime_paths) install_dir = store.cache_dir / "linux-x86_64" / "debian_12-1.0.0" install_dir.mkdir(parents=True, exist_ok=True) (install_dir / "environment.json").write_text( json.dumps( { "catalog_version": "2.0.0", "name": spec.name, "version": spec.version, "source": "bundled-runtime-source", "source_digest": spec.source_digest, "installed_at": 0, } ), encoding="utf-8", ) (install_dir / "vmlinux").symlink_to("missing-vmlinux") (install_dir / "rootfs.ext4").symlink_to("missing-rootfs.ext4") inspected_before = store.inspect_environment("debian:12") assert inspected_before["installed"] is False pulled = store.pull_environment("debian:12") assert pulled["installed"] is True assert Path(str(pulled["kernel_image"])).read_text(encoding="utf-8") == "kernel-fixed\n" assert Path(str(pulled["rootfs_image"])).read_text(encoding="utf-8") == "rootfs-fixed\n" def test_environment_store_prunes_stale_entries(tmp_path: Path) -> None: store = EnvironmentStore(runtime_paths=resolve_runtime_paths(), cache_dir=tmp_path / "cache") platform_dir = store.cache_dir / "linux-x86_64" platform_dir.mkdir(parents=True, exist_ok=True) (platform_dir / ".partial-download").mkdir() (platform_dir / "missing-marker").mkdir() invalid = platform_dir / "invalid" invalid.mkdir() (invalid / "environment.json").write_text('{"name": 1, "version": 2}', encoding="utf-8") unknown = platform_dir / "unknown" unknown.mkdir() (unknown / "environment.json").write_text( '{"name": "unknown:1", "version": "1.0.0"}', encoding="utf-8", ) stale = platform_dir / "stale" stale.mkdir() (stale / "environment.json").write_text( '{"name": "debian:12", "version": "0.9.0"}', encoding="utf-8", ) result = store.prune_environments() assert result["count"] == 5 def test_fetch_oci_manifest_resolves_linux_amd64_index_with_bearer_auth( tmp_path: Path, monkeypatch: pytest.MonkeyPatch ) -> None: runtime_paths = _fake_runtime_paths(tmp_path) store = EnvironmentStore(runtime_paths=runtime_paths, cache_dir=tmp_path / "cache") spec = VmEnvironment( name="debian:12-registry", version="1.0.0", description="OCI-backed environment", default_packages=("bash", "git"), distribution="debian", distribution_version="12", source_profile="missing-profile", oci_registry="registry-1.docker.io", oci_repository="thalesmaciel/pyro-environment-debian-12", oci_reference="1.0.0", ) child_manifest = { "schemaVersion": 2, "mediaType": "application/vnd.oci.image.manifest.v1+json", "layers": [], } child_payload = json.dumps(child_manifest).encode("utf-8") child_digest = _sha256_digest(child_payload) index_manifest = { "schemaVersion": 2, "mediaType": "application/vnd.oci.image.index.v1+json", "manifests": [ { "digest": "sha256:arm64digest", "mediaType": "application/vnd.oci.image.manifest.v1+json", "platform": {"os": "linux", "architecture": "arm64"}, }, { "digest": child_digest, "mediaType": "application/vnd.oci.image.manifest.v1+json", "platform": {"os": "linux", "architecture": "amd64"}, }, ], } index_payload = json.dumps(index_manifest).encode("utf-8") index_digest = _sha256_digest(index_payload) authorized_urls: list[str] = [] def fake_urlopen(request: object, timeout: int = 90) -> FakeResponse: del timeout url = request.full_url if isinstance(request, urllib.request.Request) else str(request) if url.startswith("https://auth.docker.io/token?"): return FakeResponse(b'{"token":"secret-token"}') authorization = _authorization_header(request) if url.endswith("/manifests/1.0.0"): if authorization is None: raise urllib.error.HTTPError( url, 401, "Unauthorized", _http_headers( { "WWW-Authenticate": ( 'Bearer realm="https://auth.docker.io/token",' 'service="registry.docker.io",' 'scope="repository:thalesmaciel/pyro-environment-debian-12:pull"' ) } ), io.BytesIO(b""), ) authorized_urls.append(url) return FakeResponse( index_payload, headers={"Docker-Content-Digest": index_digest}, ) if url.endswith(f"/manifests/{child_digest}"): if authorization is None: raise urllib.error.HTTPError( url, 401, "Unauthorized", _http_headers( { "WWW-Authenticate": ( 'Bearer realm="https://auth.docker.io/token",' 'service="registry.docker.io",' 'scope="repository:thalesmaciel/pyro-environment-debian-12:pull"' ) } ), io.BytesIO(b""), ) authorized_urls.append(url) assert authorization == "Bearer secret-token" return FakeResponse( child_payload, headers={"Docker-Content-Digest": child_digest}, ) raise AssertionError(f"unexpected OCI request: {url}") monkeypatch.setattr(urllib.request, "urlopen", fake_urlopen) manifest, resolved_digest = store._fetch_oci_manifest(spec) # noqa: SLF001 assert manifest == child_manifest assert resolved_digest == child_digest assert authorized_urls == [ "https://registry-1.docker.io/v2/thalesmaciel/pyro-environment-debian-12/manifests/1.0.0", ( "https://registry-1.docker.io/v2/thalesmaciel/pyro-environment-debian-12/" f"manifests/{child_digest}" ), ] def test_environment_store_installs_from_oci_when_runtime_source_missing( tmp_path: Path, monkeypatch: pytest.MonkeyPatch ) -> None: runtime_paths = _fake_runtime_paths(tmp_path) kernel_layer = _layer_archive("vmlinux", b"kernel\n") rootfs_layer = _layer_archive("rootfs.ext4", b"rootfs\n") kernel_digest = _sha256_digest(kernel_layer) rootfs_digest = _sha256_digest(rootfs_layer) manifest = { "schemaVersion": 2, "mediaType": "application/vnd.oci.image.manifest.v1+json", "layers": [ { "mediaType": "application/vnd.oci.image.layer.v1.tar+gzip", "digest": kernel_digest, "size": len(kernel_layer), }, { "mediaType": "application/vnd.oci.image.layer.v1.tar+gzip", "digest": rootfs_digest, "size": len(rootfs_layer), }, ], } manifest_payload = json.dumps(manifest).encode("utf-8") manifest_digest = _sha256_digest(manifest_payload) environment = VmEnvironment( name="debian:12-registry", version="1.0.0", description="OCI-backed environment", default_packages=("bash", "git"), distribution="debian", distribution_version="12", source_profile="missing-profile", oci_registry="registry-1.docker.io", oci_repository="thalesmaciel/pyro-environment-debian-12", oci_reference="1.0.0", ) def fake_urlopen(request: object, timeout: int = 90) -> FakeResponse: del timeout url = request.full_url if isinstance(request, urllib.request.Request) else str(request) if url.endswith("/manifests/1.0.0"): return FakeResponse( manifest_payload, headers={"Docker-Content-Digest": manifest_digest}, ) if url.endswith(f"/blobs/{kernel_digest}"): return FakeResponse(kernel_layer) if url.endswith(f"/blobs/{rootfs_digest}"): return FakeResponse(rootfs_layer) raise AssertionError(f"unexpected OCI request: {url}") monkeypatch.setattr(urllib.request, "urlopen", fake_urlopen) monkeypatch.setattr( "pyro_mcp.vm_environments.CATALOG", {environment.name: environment}, ) store = EnvironmentStore(runtime_paths=runtime_paths, cache_dir=tmp_path / "cache") installed = store.ensure_installed(environment.name) assert installed.kernel_image.read_text(encoding="utf-8") == "kernel\n" assert installed.rootfs_image.read_text(encoding="utf-8") == "rootfs\n" assert installed.source == ( "oci://registry-1.docker.io/thalesmaciel/pyro-environment-debian-12" f"@{manifest_digest}" ) metadata = json.loads((installed.install_dir / "environment.json").read_text(encoding="utf-8")) assert metadata["source_digest"] == manifest_digest