"""Server-scoped project startup source helpers for MCP chat flows.""" from __future__ import annotations import shutil import subprocess import tempfile from contextlib import contextmanager from dataclasses import dataclass from pathlib import Path from typing import Iterator, Literal ProjectStartupSourceKind = Literal["project_path", "repo_url"] @dataclass(frozen=True) class ProjectStartupSource: """Server-scoped default source for workspace creation.""" kind: ProjectStartupSourceKind origin_ref: str resolved_path: Path | None = None repo_ref: str | None = None def _run_git(command: list[str], *, cwd: Path | None = None) -> subprocess.CompletedProcess[str]: return subprocess.run( # noqa: S603 command, cwd=str(cwd) if cwd is not None else None, check=False, capture_output=True, text=True, ) def _detect_git_root(start_dir: Path) -> Path | None: result = _run_git(["git", "rev-parse", "--show-toplevel"], cwd=start_dir) if result.returncode != 0: return None stdout = result.stdout.strip() if stdout == "": return None return Path(stdout).expanduser().resolve() def _resolve_project_path(project_path: str | Path, *, cwd: Path) -> Path: resolved = Path(project_path).expanduser() if not resolved.is_absolute(): resolved = (cwd / resolved).resolve() else: resolved = resolved.resolve() if not resolved.exists(): raise ValueError(f"project_path {resolved} does not exist") if not resolved.is_dir(): raise ValueError(f"project_path {resolved} must be a directory") git_root = _detect_git_root(resolved) if git_root is not None: return git_root return resolved def resolve_project_startup_source( *, project_path: str | Path | None = None, repo_url: str | None = None, repo_ref: str | None = None, no_project_source: bool = False, cwd: Path | None = None, ) -> ProjectStartupSource | None: working_dir = Path.cwd() if cwd is None else cwd.resolve() if no_project_source: if project_path is not None or repo_url is not None or repo_ref is not None: raise ValueError( "--no-project-source cannot be combined with --project-path, " "--repo-url, or --repo-ref" ) return None if project_path is not None and repo_url is not None: raise ValueError("--project-path and --repo-url are mutually exclusive") if repo_ref is not None and repo_url is None: raise ValueError("--repo-ref requires --repo-url") if project_path is not None: resolved_path = _resolve_project_path(project_path, cwd=working_dir) return ProjectStartupSource( kind="project_path", origin_ref=str(resolved_path), resolved_path=resolved_path, ) if repo_url is not None: normalized_repo_url = repo_url.strip() if normalized_repo_url == "": raise ValueError("--repo-url must not be empty") normalized_repo_ref = None if repo_ref is None else repo_ref.strip() if normalized_repo_ref == "": raise ValueError("--repo-ref must not be empty") return ProjectStartupSource( kind="repo_url", origin_ref=normalized_repo_url, repo_ref=normalized_repo_ref, ) detected_root = _detect_git_root(working_dir) if detected_root is None: return None return ProjectStartupSource( kind="project_path", origin_ref=str(detected_root), resolved_path=detected_root, ) @contextmanager def materialize_project_startup_source(source: ProjectStartupSource) -> Iterator[Path]: if source.kind == "project_path": if source.resolved_path is None: raise RuntimeError("project_path source is missing a resolved path") yield source.resolved_path return temp_dir = Path(tempfile.mkdtemp(prefix="pyro-project-source-")) clone_dir = temp_dir / "clone" try: clone_result = _run_git(["git", "clone", "--quiet", source.origin_ref, str(clone_dir)]) if clone_result.returncode != 0: stderr = clone_result.stderr.strip() or "git clone failed" raise RuntimeError(f"failed to clone repo_url {source.origin_ref!r}: {stderr}") if source.repo_ref is not None: checkout_result = _run_git( ["git", "checkout", "--quiet", source.repo_ref], cwd=clone_dir, ) if checkout_result.returncode != 0: stderr = checkout_result.stderr.strip() or "git checkout failed" raise RuntimeError( f"failed to checkout repo_ref {source.repo_ref!r} for " f"repo_url {source.origin_ref!r}: {stderr}" ) yield clone_dir finally: shutil.rmtree(temp_dir, ignore_errors=True) def describe_project_startup_source(source: ProjectStartupSource | None) -> str | None: if source is None: return None if source.kind == "project_path": return f"the current project at {source.origin_ref}" if source.repo_ref is None: return f"the clean clone source {source.origin_ref}" return f"the clean clone source {source.origin_ref} at ref {source.repo_ref}"