"""Stopped-workspace disk export and offline inspection helpers.""" from __future__ import annotations import re import shutil import subprocess import tempfile from dataclasses import dataclass from pathlib import Path, PurePosixPath from typing import Literal WorkspaceDiskArtifactType = Literal["file", "directory", "symlink"] WORKSPACE_DISK_RUNTIME_ONLY_PATHS = ( "/run/pyro-secrets", "/run/pyro-shells", "/run/pyro-services", ) _DEBUGFS_LS_RE = re.compile( r"^/(?P\d+)/(?P\d+)/(?P\d+)/(?P\d+)/(?P.*)/(?P\d*)/$" ) _DEBUGFS_SIZE_RE = re.compile(r"Size:\s+(?P\d+)") _DEBUGFS_TYPE_RE = re.compile(r"Type:\s+(?P\w+)") _DEBUGFS_LINK_RE = re.compile(r'Fast link dest:\s+"(?P.*)"') @dataclass(frozen=True) class WorkspaceDiskEntry: """One inspectable path from a stopped workspace rootfs image.""" path: str artifact_type: WorkspaceDiskArtifactType size_bytes: int link_target: str | None = None def to_payload(self) -> dict[str, str | int | None]: return { "path": self.path, "artifact_type": self.artifact_type, "size_bytes": self.size_bytes, "link_target": self.link_target, } @dataclass(frozen=True) class _DebugfsStat: path: str artifact_type: WorkspaceDiskArtifactType size_bytes: int link_target: str | None = None @dataclass(frozen=True) class _DebugfsDirEntry: name: str path: str artifact_type: WorkspaceDiskArtifactType | None size_bytes: int def export_workspace_disk_image(rootfs_image: Path, *, output_path: Path) -> dict[str, str | int]: """Copy one stopped workspace rootfs image to the requested host path.""" output_path.parent.mkdir(parents=True, exist_ok=True) if output_path.exists() or output_path.is_symlink(): raise RuntimeError(f"output_path already exists: {output_path}") shutil.copy2(rootfs_image, output_path) return { "output_path": str(output_path), "disk_format": "ext4", "bytes_written": output_path.stat().st_size, } def list_workspace_disk( rootfs_image: Path, *, guest_path: str, recursive: bool, ) -> list[dict[str, str | int | None]]: """Return inspectable entries from one stopped workspace rootfs path.""" target = _debugfs_stat(rootfs_image, guest_path) if target is None: raise RuntimeError(f"workspace disk path does not exist: {guest_path}") if target.artifact_type != "directory": return [WorkspaceDiskEntry(**target.__dict__).to_payload()] entries: list[WorkspaceDiskEntry] = [] def walk(current_path: str) -> None: children = _debugfs_ls_entries(rootfs_image, current_path) for child in children: if child.artifact_type is None: continue link_target = None if child.artifact_type == "symlink": child_stat = _debugfs_stat(rootfs_image, child.path) link_target = None if child_stat is None else child_stat.link_target entries.append( WorkspaceDiskEntry( path=child.path, artifact_type=child.artifact_type, size_bytes=child.size_bytes, link_target=link_target, ) ) if recursive and child.artifact_type == "directory": walk(child.path) walk(guest_path) entries.sort(key=lambda item: item.path) return [entry.to_payload() for entry in entries] def read_workspace_disk_file( rootfs_image: Path, *, guest_path: str, max_bytes: int, ) -> dict[str, str | int | bool]: """Read one regular file from a stopped workspace rootfs image.""" target = _debugfs_stat(rootfs_image, guest_path) if target is None: raise RuntimeError(f"workspace disk path does not exist: {guest_path}") if target.artifact_type != "file": raise RuntimeError("workspace disk read only supports regular files") if max_bytes <= 0: raise ValueError("max_bytes must be positive") with tempfile.TemporaryDirectory(prefix="pyro-workspace-disk-read-") as temp_dir: dumped_path = Path(temp_dir) / "workspace-disk-read.bin" _run_debugfs(rootfs_image, f"dump {guest_path} {dumped_path}") if not dumped_path.exists(): raise RuntimeError(f"failed to dump workspace disk file: {guest_path}") raw_bytes = dumped_path.read_bytes() return { "path": guest_path, "size_bytes": len(raw_bytes), "max_bytes": max_bytes, "content": raw_bytes[:max_bytes].decode("utf-8", errors="replace"), "truncated": len(raw_bytes) > max_bytes, } def scrub_workspace_runtime_paths(rootfs_image: Path) -> None: """Remove runtime-only guest paths from a stopped workspace rootfs image.""" for guest_path in WORKSPACE_DISK_RUNTIME_ONLY_PATHS: _debugfs_remove_tree(rootfs_image, guest_path) def _run_debugfs(rootfs_image: Path, command: str, *, writable: bool = False) -> str: debugfs_path = shutil.which("debugfs") if debugfs_path is None: raise RuntimeError("debugfs is required for workspace disk operations") debugfs_command = [debugfs_path] if writable: debugfs_command.append("-w") proc = subprocess.run( # noqa: S603 [*debugfs_command, "-R", command, str(rootfs_image)], text=True, capture_output=True, check=False, ) combined = proc.stdout if proc.stderr != "": combined = combined + ("\n" if combined != "" else "") + proc.stderr output = _strip_debugfs_banner(combined) if proc.returncode != 0: message = output.strip() if message == "": message = f"debugfs command failed: {command}" raise RuntimeError(message) return output.strip() def _strip_debugfs_banner(output: str) -> str: lines = output.splitlines() while lines and lines[0].startswith("debugfs "): lines.pop(0) return "\n".join(lines) def _debugfs_missing(output: str) -> bool: return "File not found by ext2_lookup" in output or "File not found by ext2fs_lookup" in output def _artifact_type_from_mode(mode: str) -> WorkspaceDiskArtifactType | None: if mode.startswith("04"): return "directory" if mode.startswith("10"): return "file" if mode.startswith("12"): return "symlink" return None def _debugfs_stat(rootfs_image: Path, guest_path: str) -> _DebugfsStat | None: output = _run_debugfs(rootfs_image, f"stat {guest_path}") if _debugfs_missing(output): return None type_match = _DEBUGFS_TYPE_RE.search(output) size_match = _DEBUGFS_SIZE_RE.search(output) if type_match is None or size_match is None: raise RuntimeError(f"failed to inspect workspace disk path: {guest_path}") raw_type = type_match.group("type") artifact_type: WorkspaceDiskArtifactType if raw_type == "directory": artifact_type = "directory" elif raw_type == "regular": artifact_type = "file" elif raw_type == "symlink": artifact_type = "symlink" else: raise RuntimeError(f"unsupported workspace disk path type: {guest_path}") link_target = None if artifact_type == "symlink": link_match = _DEBUGFS_LINK_RE.search(output) if link_match is not None: link_target = link_match.group("target") return _DebugfsStat( path=guest_path, artifact_type=artifact_type, size_bytes=int(size_match.group("size")), link_target=link_target, ) def _debugfs_ls_entries(rootfs_image: Path, guest_path: str) -> list[_DebugfsDirEntry]: output = _run_debugfs(rootfs_image, f"ls -p {guest_path}") if _debugfs_missing(output): raise RuntimeError(f"workspace disk path does not exist: {guest_path}") entries: list[_DebugfsDirEntry] = [] base = PurePosixPath(guest_path) for raw_line in output.splitlines(): line = raw_line.strip() if line == "": continue match = _DEBUGFS_LS_RE.match(line) if match is None: continue name = match.group("name") if name in {".", ".."}: continue child_path = str(base / name) if str(base) != "/" else f"/{name}" entries.append( _DebugfsDirEntry( name=name, path=child_path, artifact_type=_artifact_type_from_mode(match.group("mode")), size_bytes=int(match.group("size") or "0"), ) ) return entries def _debugfs_remove_tree(rootfs_image: Path, guest_path: str) -> None: stat_result = _debugfs_stat(rootfs_image, guest_path) if stat_result is None: return if stat_result.artifact_type == "directory": for child in _debugfs_ls_entries(rootfs_image, guest_path): _debugfs_remove_tree(rootfs_image, child.path) _run_debugfs(rootfs_image, f"rmdir {guest_path}", writable=True) return _run_debugfs(rootfs_image, f"rm {guest_path}", writable=True)