from __future__ import annotations import os from pathlib import Path import pytest from pyro_mcp.workspace_files import ( WORKSPACE_FILE_MAX_BYTES, WORKSPACE_PATCH_MAX_BYTES, WorkspacePatchHunk, WorkspaceTextPatch, apply_unified_text_patch, delete_workspace_path, list_workspace_files, normalize_workspace_path, parse_unified_text_patch, read_workspace_file, workspace_host_path, write_workspace_file, ) def test_workspace_files_list_read_write_and_delete(tmp_path: Path) -> None: workspace_dir = tmp_path / "workspace" workspace_dir.mkdir() (workspace_dir / "src").mkdir() (workspace_dir / "src" / "note.txt").write_text("hello\n", encoding="utf-8") os.symlink("note.txt", workspace_dir / "src" / "note-link") listing = list_workspace_files( workspace_dir, workspace_path="/workspace/src", recursive=True, ) assert listing.path == "/workspace/src" assert listing.artifact_type == "directory" assert [entry.to_payload() for entry in listing.entries] == [ { "path": "/workspace/src/note-link", "artifact_type": "symlink", "size_bytes": 8, "link_target": "note.txt", }, { "path": "/workspace/src/note.txt", "artifact_type": "file", "size_bytes": 6, "link_target": None, }, ] read_payload = read_workspace_file( workspace_dir, workspace_path="/workspace/src/note.txt", ) assert read_payload.content_bytes == b"hello\n" written = write_workspace_file( workspace_dir, workspace_path="/workspace/generated/out.txt", text="generated\n", ) assert written.bytes_written == 10 assert (workspace_dir / "generated" / "out.txt").read_text(encoding="utf-8") == "generated\n" deleted = delete_workspace_path( workspace_dir, workspace_path="/workspace/generated/out.txt", ) assert deleted.deleted is True assert not (workspace_dir / "generated" / "out.txt").exists() def test_workspace_file_read_and_delete_reject_unsupported_paths(tmp_path: Path) -> None: workspace_dir = tmp_path / "workspace" workspace_dir.mkdir() (workspace_dir / "dir").mkdir() (workspace_dir / "file.txt").write_text("ok\n", encoding="utf-8") os.symlink("file.txt", workspace_dir / "link.txt") with pytest.raises(RuntimeError, match="regular files"): read_workspace_file(workspace_dir, workspace_path="/workspace/dir") with pytest.raises(RuntimeError, match="regular files"): read_workspace_file(workspace_dir, workspace_path="/workspace/link.txt") with pytest.raises(RuntimeError, match="does not support directories"): delete_workspace_path(workspace_dir, workspace_path="/workspace/dir") def test_workspace_file_helpers_cover_single_paths_and_path_validation(tmp_path: Path) -> None: workspace_dir = tmp_path / "workspace" workspace_dir.mkdir() (workspace_dir / "note.txt").write_text("hello\n", encoding="utf-8") listing = list_workspace_files( workspace_dir, workspace_path="/workspace/note.txt", recursive=False, ) assert listing.path == "/workspace/note.txt" assert listing.artifact_type == "file" assert [entry.path for entry in listing.entries] == ["/workspace/note.txt"] assert normalize_workspace_path("src/app.py") == "/workspace/src/app.py" assert workspace_host_path(workspace_dir, "src/app.py") == workspace_dir / "src" / "app.py" with pytest.raises(ValueError, match="must not be empty"): normalize_workspace_path(" ") with pytest.raises(ValueError, match="must stay inside /workspace"): normalize_workspace_path("..") with pytest.raises(ValueError, match="must stay inside /workspace"): normalize_workspace_path("/tmp/outside") with pytest.raises(ValueError, match="must stay inside /workspace"): normalize_workspace_path("/") def test_workspace_file_read_limits_and_write_validation(tmp_path: Path) -> None: workspace_dir = tmp_path / "workspace" workspace_dir.mkdir() (workspace_dir / "big.txt").write_text("hello\n", encoding="utf-8") (workspace_dir / "dir").mkdir() real_dir = workspace_dir / "real" real_dir.mkdir() os.symlink("real", workspace_dir / "linked") with pytest.raises(ValueError, match="max_bytes must be positive"): read_workspace_file(workspace_dir, workspace_path="/workspace/big.txt", max_bytes=0) with pytest.raises(ValueError, match="at most"): read_workspace_file( workspace_dir, workspace_path="/workspace/big.txt", max_bytes=WORKSPACE_FILE_MAX_BYTES + 1, ) with pytest.raises(RuntimeError, match="exceeds the maximum supported size"): read_workspace_file(workspace_dir, workspace_path="/workspace/big.txt", max_bytes=4) with pytest.raises(RuntimeError, match="regular file targets"): write_workspace_file(workspace_dir, workspace_path="/workspace/dir", text="nope\n") with pytest.raises(RuntimeError, match="symlinked parent"): write_workspace_file( workspace_dir, workspace_path="/workspace/linked/out.txt", text="nope\n", ) with pytest.raises(ValueError, match="at most"): write_workspace_file( workspace_dir, workspace_path="/workspace/huge.txt", text="x" * (WORKSPACE_FILE_MAX_BYTES + 1), ) def test_workspace_file_list_rejects_unsupported_filesystem_types(tmp_path: Path) -> None: workspace_dir = tmp_path / "workspace" workspace_dir.mkdir() fifo_path = workspace_dir / "pipe" os.mkfifo(fifo_path) with pytest.raises(RuntimeError, match="unsupported workspace path type"): list_workspace_files(workspace_dir, workspace_path="/workspace", recursive=True) def test_parse_and_apply_unified_text_patch_round_trip() -> None: patch_text = """--- a/src/app.py +++ b/src/app.py @@ -1,2 +1,3 @@ print("old") -print("bug") +print("fixed") +print("done") --- /dev/null +++ b/src/new.py @@ -0,0 +1 @@ +print("new") --- a/src/remove.py +++ /dev/null @@ -1 +0,0 @@ -print("remove") """ patches = parse_unified_text_patch(patch_text) assert [(item.path, item.status) for item in patches] == [ ("/workspace/src/app.py", "modified"), ("/workspace/src/new.py", "added"), ("/workspace/src/remove.py", "deleted"), ] modified = apply_unified_text_patch( path="/workspace/src/app.py", patch=patches[0], before_text='print("old")\nprint("bug")\n', ) added = apply_unified_text_patch( path="/workspace/src/new.py", patch=patches[1], before_text=None, ) deleted = apply_unified_text_patch( path="/workspace/src/remove.py", patch=patches[2], before_text='print("remove")\n', ) assert modified == 'print("old")\nprint("fixed")\nprint("done")\n' assert added == 'print("new")\n' assert deleted is None def test_parse_unified_text_patch_rejects_unsupported_features() -> None: with pytest.raises(ValueError, match="unsupported patch feature"): parse_unified_text_patch( """diff --git a/file.txt b/file.txt old mode 100644 --- a/file.txt +++ b/file.txt @@ -1 +1 @@ -old +new """ ) with pytest.raises(ValueError, match="rename and copy patches are not supported"): parse_unified_text_patch( """--- a/old.txt +++ b/new.txt @@ -1 +1 @@ -old +new """ ) def test_parse_unified_text_patch_handles_git_headers_and_validation_errors() -> None: parsed = parse_unified_text_patch( """ diff --git a/file.txt b/file.txt index 1234567..89abcde 100644 --- /workspace/file.txt +++ /workspace/file.txt @@ -1 +1 @@ -old +new \\ No newline at end of file """ ) assert parsed[0].path == "/workspace/file.txt" with pytest.raises(ValueError, match="must not be empty"): parse_unified_text_patch("") with pytest.raises(ValueError, match="invalid patch header"): parse_unified_text_patch("oops\n") with pytest.raises(ValueError, match="missing '\\+\\+\\+' header"): parse_unified_text_patch("--- a/file.txt\n") with pytest.raises(ValueError, match="has no hunks"): parse_unified_text_patch("--- a/file.txt\n+++ b/file.txt\n") with pytest.raises(ValueError, match="line counts do not match"): parse_unified_text_patch( """--- a/file.txt +++ b/file.txt @@ -1,2 +1,1 @@ -old +new """ ) with pytest.raises(ValueError, match="invalid patch hunk line"): parse_unified_text_patch( """--- a/file.txt +++ b/file.txt @@ -1 +1 @@ ?bad """ ) with pytest.raises(ValueError, match="at most"): parse_unified_text_patch("x" * (WORKSPACE_PATCH_MAX_BYTES + 1)) with pytest.raises(ValueError, match="patch must target a workspace path"): parse_unified_text_patch("--- /dev/null\n+++ /dev/null\n") with pytest.raises(ValueError, match="patch must contain at least one file change"): parse_unified_text_patch( """diff --git a/file.txt b/file.txt index 1234567..89abcde 100644 """ ) with pytest.raises(ValueError, match="unsupported patch feature"): parse_unified_text_patch( """--- a/file.txt +++ b/file.txt new mode 100644 """ ) with pytest.raises(ValueError, match="invalid patch hunk header"): parse_unified_text_patch( """--- a/file.txt +++ b/file.txt @@ -1 +1 @@ old @@bogus """ ) parsed = parse_unified_text_patch( """--- a/file.txt +++ b/file.txt index 1234567..89abcde 100644 @@ -1 +1 @@ -old +new @@ -3 +3 @@ -before +after """ ) assert len(parsed[0].hunks) == 2 def test_apply_unified_text_patch_rejects_context_mismatches() -> None: patch = parse_unified_text_patch( """--- a/file.txt +++ b/file.txt @@ -1 +1 @@ -before +after """ )[0] with pytest.raises(RuntimeError, match="patch context does not match"): apply_unified_text_patch( path="/workspace/file.txt", patch=patch, before_text="different\n", ) with pytest.raises(RuntimeError, match="patch context does not match"): apply_unified_text_patch( path="/workspace/file.txt", patch=WorkspaceTextPatch( path="/workspace/file.txt", status="modified", hunks=[ WorkspacePatchHunk( old_start=1, old_count=1, new_start=1, new_count=1, lines=[" same\n"], ) ], ), before_text="", ) def test_apply_unified_text_patch_rejects_range_prefix_delete_and_size_errors() -> None: with pytest.raises(RuntimeError, match="out of range"): apply_unified_text_patch( path="/workspace/file.txt", patch=WorkspaceTextPatch( path="/workspace/file.txt", status="modified", hunks=[ WorkspacePatchHunk( old_start=3, old_count=1, new_start=3, new_count=1, lines=["-old\n", "+new\n"], ) ], ), before_text="old\n", ) with pytest.raises(RuntimeError, match="invalid patch line prefix"): apply_unified_text_patch( path="/workspace/file.txt", patch=WorkspaceTextPatch( path="/workspace/file.txt", status="modified", hunks=[ WorkspacePatchHunk( old_start=1, old_count=0, new_start=1, new_count=0, lines=["?bad\n"], ) ], ), before_text="", ) with pytest.raises(RuntimeError, match="delete patch did not remove all content"): apply_unified_text_patch( path="/workspace/file.txt", patch=WorkspaceTextPatch( path="/workspace/file.txt", status="deleted", hunks=[ WorkspacePatchHunk( old_start=1, old_count=1, new_start=1, new_count=0, lines=["-first\n"], ) ], ), before_text="first\nsecond\n", ) huge_payload = "x" * (WORKSPACE_FILE_MAX_BYTES + 1) with pytest.raises(RuntimeError, match="exceeds the maximum supported size"): apply_unified_text_patch( path="/workspace/file.txt", patch=WorkspaceTextPatch( path="/workspace/file.txt", status="added", hunks=[ WorkspacePatchHunk( old_start=0, old_count=0, new_start=1, new_count=1, lines=[f"+{huge_payload}"], ) ], ), before_text=None, )