pyro-mcp/src/pyro_mcp/api.py
Thales Maciel dc86d84e96 Add workspace review summaries
Add workspace summary across the CLI, SDK, and MCP, and include it in the workspace-core profile so chat hosts can review one concise view of the current session.

Persist lightweight review events for syncs, file edits, patch applies, exports, service lifecycle, and snapshot activity, then synthesize them with command history, current services, snapshot state, and current diff data since the last reset.

Update the walkthroughs, use-case docs, public contract, changelog, and roadmap for 4.3.0, and make dist-check invoke the CLI module directly so local package reinstall quirks do not break the packaging gate.

Validation: uv lock; ./.venv/bin/pytest --no-cov tests/test_vm_manager.py tests/test_cli.py tests/test_api.py tests/test_server.py tests/test_public_contract.py tests/test_workspace_use_case_smokes.py; UV_OFFLINE=1 UV_CACHE_DIR=.uv-cache make check; UV_OFFLINE=1 UV_CACHE_DIR=.uv-cache make dist-check; real guest-backed workspace create -> patch apply -> workspace summary --json -> delete smoke.
2026-03-13 19:21:11 -03:00

1141 lines
39 KiB
Python

"""Public facade shared by the Python SDK and MCP server."""
from __future__ import annotations
from pathlib import Path
from typing import Any, Literal, cast
from mcp.server.fastmcp import FastMCP
from pyro_mcp.contract import (
PUBLIC_MCP_PROFILES,
PUBLIC_MCP_VM_RUN_PROFILE_TOOLS,
PUBLIC_MCP_WORKSPACE_CORE_PROFILE_TOOLS,
PUBLIC_MCP_WORKSPACE_FULL_PROFILE_TOOLS,
)
from pyro_mcp.project_startup import (
ProjectStartupSource,
describe_project_startup_source,
materialize_project_startup_source,
resolve_project_startup_source,
)
from pyro_mcp.vm_manager import (
DEFAULT_ALLOW_HOST_COMPAT,
DEFAULT_MEM_MIB,
DEFAULT_TIMEOUT_SECONDS,
DEFAULT_TTL_SECONDS,
DEFAULT_VCPU_COUNT,
DEFAULT_WORKSPACE_NETWORK_POLICY,
VmManager,
)
# Literal type naming the MCP tool profiles handled by this module.
McpToolProfile = Literal["vm-run", "workspace-core", "workspace-full"]
# Maps each profile name to the tuple of tool names that `Pyro.create_server`
# registers for it; the tuples themselves come from `pyro_mcp.contract`.
_PROFILE_TOOLS: dict[str, tuple[str, ...]] = {
    "vm-run": PUBLIC_MCP_VM_RUN_PROFILE_TOOLS,
    "workspace-core": PUBLIC_MCP_WORKSPACE_CORE_PROFILE_TOOLS,
    "workspace-full": PUBLIC_MCP_WORKSPACE_FULL_PROFILE_TOOLS,
}
def _validate_mcp_profile(profile: str) -> McpToolProfile:
    """Return ``profile`` typed as ``McpToolProfile`` once it is known to be public.

    Raises:
        ValueError: if ``profile`` is not a member of ``PUBLIC_MCP_PROFILES``.
            The message lists the accepted names.
    """
    if profile in PUBLIC_MCP_PROFILES:
        # Membership was checked at runtime; the cast only informs the type checker.
        return cast(McpToolProfile, profile)
    expected = ", ".join(PUBLIC_MCP_PROFILES)
    raise ValueError(f"unknown MCP profile {profile!r}; expected one of: {expected}")
def _workspace_create_description(startup_source: ProjectStartupSource | None) -> str:
if startup_source is None:
return "Create and start a persistent workspace."
described_source = describe_project_startup_source(startup_source)
if described_source is None:
return "Create and start a persistent workspace."
return (
"Create and start a persistent workspace. If `seed_path` is omitted, "
f"the server seeds from {described_source}."
)
class Pyro:
    """High-level facade over the ephemeral VM runtime."""

    def __init__(
        self,
        manager: VmManager | None = None,
        *,
        backend_name: str | None = None,
        base_dir: Path | None = None,
        cache_dir: Path | None = None,
        max_active_vms: int = 4,
    ) -> None:
        """Wrap ``manager`` if given, otherwise build a ``VmManager`` from the options."""
        if manager:
            self._manager = manager
        else:
            self._manager = VmManager(
                backend_name=backend_name,
                base_dir=base_dir,
                cache_dir=cache_dir,
                max_active_vms=max_active_vms,
            )

    @property
    def manager(self) -> VmManager:
        """The ``VmManager`` every facade method delegates to."""
        return self._manager

    # ------------------------------------------------------------------
    # Environments
    # ------------------------------------------------------------------

    def list_environments(self) -> list[dict[str, object]]:
        """Delegate to ``VmManager.list_environments``."""
        return self._manager.list_environments()

    def pull_environment(self, environment: str) -> dict[str, object]:
        """Delegate to ``VmManager.pull_environment``."""
        return self._manager.pull_environment(environment)

    def inspect_environment(self, environment: str) -> dict[str, object]:
        """Delegate to ``VmManager.inspect_environment``."""
        return self._manager.inspect_environment(environment)

    def prune_environments(self) -> dict[str, object]:
        """Delegate to ``VmManager.prune_environments``."""
        return self._manager.prune_environments()

    # ------------------------------------------------------------------
    # Ephemeral VMs (create / start / exec)
    # ------------------------------------------------------------------

    def create_vm(
        self,
        *,
        environment: str,
        vcpu_count: int = DEFAULT_VCPU_COUNT,
        mem_mib: int = DEFAULT_MEM_MIB,
        ttl_seconds: int = DEFAULT_TTL_SECONDS,
        network: bool = False,
        allow_host_compat: bool = DEFAULT_ALLOW_HOST_COMPAT,
    ) -> dict[str, Any]:
        """Delegate to ``VmManager.create_vm`` with the given sizing options."""
        return self._manager.create_vm(
            environment=environment,
            vcpu_count=vcpu_count,
            mem_mib=mem_mib,
            ttl_seconds=ttl_seconds,
            network=network,
            allow_host_compat=allow_host_compat,
        )

    def start_vm(self, vm_id: str) -> dict[str, Any]:
        """Delegate to ``VmManager.start_vm``."""
        return self._manager.start_vm(vm_id)

    def exec_vm(self, vm_id: str, *, command: str, timeout_seconds: int = 30) -> dict[str, Any]:
        """Delegate to ``VmManager.exec_vm``."""
        return self._manager.exec_vm(vm_id, command=command, timeout_seconds=timeout_seconds)

    # ------------------------------------------------------------------
    # Workspaces
    # ------------------------------------------------------------------

    def create_workspace(
        self,
        *,
        environment: str,
        vcpu_count: int = DEFAULT_VCPU_COUNT,
        mem_mib: int = DEFAULT_MEM_MIB,
        ttl_seconds: int = DEFAULT_TTL_SECONDS,
        network_policy: str = DEFAULT_WORKSPACE_NETWORK_POLICY,
        allow_host_compat: bool = DEFAULT_ALLOW_HOST_COMPAT,
        seed_path: str | Path | None = None,
        secrets: list[dict[str, str]] | None = None,
        name: str | None = None,
        labels: dict[str, str] | None = None,
    ) -> dict[str, Any]:
        """Delegate to ``VmManager.create_workspace`` with every option passed through."""
        return self._manager.create_workspace(
            environment=environment,
            vcpu_count=vcpu_count,
            mem_mib=mem_mib,
            ttl_seconds=ttl_seconds,
            network_policy=network_policy,
            allow_host_compat=allow_host_compat,
            seed_path=seed_path,
            secrets=secrets,
            name=name,
            labels=labels,
        )

    def list_workspaces(self) -> dict[str, Any]:
        """Delegate to ``VmManager.list_workspaces``."""
        return self._manager.list_workspaces()

    def update_workspace(
        self,
        workspace_id: str,
        *,
        name: str | None = None,
        clear_name: bool = False,
        labels: dict[str, str] | None = None,
        clear_labels: list[str] | None = None,
    ) -> dict[str, Any]:
        """Delegate to ``VmManager.update_workspace``."""
        return self._manager.update_workspace(
            workspace_id,
            name=name,
            clear_name=clear_name,
            labels=labels,
            clear_labels=clear_labels,
        )

    def exec_workspace(
        self,
        workspace_id: str,
        *,
        command: str,
        timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
        secret_env: dict[str, str] | None = None,
    ) -> dict[str, Any]:
        """Delegate to ``VmManager.exec_workspace``."""
        return self._manager.exec_workspace(
            workspace_id,
            command=command,
            timeout_seconds=timeout_seconds,
            secret_env=secret_env,
        )

    def status_workspace(self, workspace_id: str) -> dict[str, Any]:
        """Delegate to ``VmManager.status_workspace``."""
        return self._manager.status_workspace(workspace_id)

    def stop_workspace(self, workspace_id: str) -> dict[str, Any]:
        """Delegate to ``VmManager.stop_workspace``."""
        return self._manager.stop_workspace(workspace_id)

    def start_workspace(self, workspace_id: str) -> dict[str, Any]:
        """Delegate to ``VmManager.start_workspace``."""
        return self._manager.start_workspace(workspace_id)

    def push_workspace_sync(
        self,
        workspace_id: str,
        source_path: str | Path,
        *,
        dest: str = "/workspace",
    ) -> dict[str, Any]:
        """Delegate to ``VmManager.push_workspace_sync``."""
        return self._manager.push_workspace_sync(
            workspace_id,
            source_path=source_path,
            dest=dest,
        )

    def logs_workspace(self, workspace_id: str) -> dict[str, Any]:
        """Delegate to ``VmManager.logs_workspace``."""
        return self._manager.logs_workspace(workspace_id)

    def summarize_workspace(self, workspace_id: str) -> dict[str, Any]:
        """Delegate to ``VmManager.summarize_workspace``."""
        return self._manager.summarize_workspace(workspace_id)

    def export_workspace(
        self,
        workspace_id: str,
        path: str,
        *,
        output_path: str | Path,
    ) -> dict[str, Any]:
        """Delegate to ``VmManager.export_workspace``."""
        return self._manager.export_workspace(
            workspace_id,
            path=path,
            output_path=output_path,
        )

    def diff_workspace(self, workspace_id: str) -> dict[str, Any]:
        """Delegate to ``VmManager.diff_workspace``."""
        return self._manager.diff_workspace(workspace_id)

    # ------------------------------------------------------------------
    # Workspace files and patches
    # ------------------------------------------------------------------

    def list_workspace_files(
        self,
        workspace_id: str,
        *,
        path: str = "/workspace",
        recursive: bool = False,
    ) -> dict[str, Any]:
        """Delegate to ``VmManager.list_workspace_files``."""
        return self._manager.list_workspace_files(
            workspace_id,
            path=path,
            recursive=recursive,
        )

    def read_workspace_file(
        self,
        workspace_id: str,
        path: str,
        *,
        max_bytes: int = 65536,
    ) -> dict[str, Any]:
        """Delegate to ``VmManager.read_workspace_file``."""
        return self._manager.read_workspace_file(
            workspace_id,
            path,
            max_bytes=max_bytes,
        )

    def write_workspace_file(
        self,
        workspace_id: str,
        path: str,
        *,
        text: str,
    ) -> dict[str, Any]:
        """Delegate to ``VmManager.write_workspace_file``."""
        return self._manager.write_workspace_file(
            workspace_id,
            path,
            text=text,
        )

    def apply_workspace_patch(
        self,
        workspace_id: str,
        *,
        patch: str,
    ) -> dict[str, Any]:
        """Delegate to ``VmManager.apply_workspace_patch``."""
        return self._manager.apply_workspace_patch(
            workspace_id,
            patch=patch,
        )

    # ------------------------------------------------------------------
    # Workspace disk access
    # ------------------------------------------------------------------

    def export_workspace_disk(
        self,
        workspace_id: str,
        *,
        output_path: str | Path,
    ) -> dict[str, Any]:
        """Delegate to ``VmManager.export_workspace_disk``."""
        return self._manager.export_workspace_disk(
            workspace_id,
            output_path=output_path,
        )

    def list_workspace_disk(
        self,
        workspace_id: str,
        *,
        path: str = "/workspace",
        recursive: bool = False,
    ) -> dict[str, Any]:
        """Delegate to ``VmManager.list_workspace_disk``."""
        return self._manager.list_workspace_disk(
            workspace_id,
            path=path,
            recursive=recursive,
        )

    def read_workspace_disk(
        self,
        workspace_id: str,
        path: str,
        *,
        max_bytes: int = 65536,
    ) -> dict[str, Any]:
        """Delegate to ``VmManager.read_workspace_disk`` (``path`` goes by keyword)."""
        return self._manager.read_workspace_disk(
            workspace_id,
            path=path,
            max_bytes=max_bytes,
        )

    # ------------------------------------------------------------------
    # Snapshots and reset
    # ------------------------------------------------------------------

    def create_snapshot(self, workspace_id: str, snapshot_name: str) -> dict[str, Any]:
        """Delegate to ``VmManager.create_snapshot``."""
        return self._manager.create_snapshot(workspace_id, snapshot_name)

    def list_snapshots(self, workspace_id: str) -> dict[str, Any]:
        """Delegate to ``VmManager.list_snapshots``."""
        return self._manager.list_snapshots(workspace_id)

    def delete_snapshot(self, workspace_id: str, snapshot_name: str) -> dict[str, Any]:
        """Delegate to ``VmManager.delete_snapshot``."""
        return self._manager.delete_snapshot(workspace_id, snapshot_name)

    def reset_workspace(
        self,
        workspace_id: str,
        *,
        snapshot: str = "baseline",
    ) -> dict[str, Any]:
        """Delegate to ``VmManager.reset_workspace``."""
        return self._manager.reset_workspace(workspace_id, snapshot=snapshot)

    # ------------------------------------------------------------------
    # Shells
    # ------------------------------------------------------------------

    def open_shell(
        self,
        workspace_id: str,
        *,
        cwd: str = "/workspace",
        cols: int = 120,
        rows: int = 30,
        secret_env: dict[str, str] | None = None,
    ) -> dict[str, Any]:
        """Delegate to ``VmManager.open_shell``."""
        return self._manager.open_shell(
            workspace_id,
            cwd=cwd,
            cols=cols,
            rows=rows,
            secret_env=secret_env,
        )

    def read_shell(
        self,
        workspace_id: str,
        shell_id: str,
        *,
        cursor: int = 0,
        max_chars: int = 65536,
        plain: bool = False,
        wait_for_idle_ms: int | None = None,
    ) -> dict[str, Any]:
        """Delegate to ``VmManager.read_shell``."""
        return self._manager.read_shell(
            workspace_id,
            shell_id,
            cursor=cursor,
            max_chars=max_chars,
            plain=plain,
            wait_for_idle_ms=wait_for_idle_ms,
        )

    def write_shell(
        self,
        workspace_id: str,
        shell_id: str,
        *,
        input: str,
        append_newline: bool = True,
    ) -> dict[str, Any]:
        """Delegate to ``VmManager.write_shell``, passing ``input`` as ``input_text``."""
        return self._manager.write_shell(
            workspace_id,
            shell_id,
            input_text=input,
            append_newline=append_newline,
        )

    def signal_shell(
        self,
        workspace_id: str,
        shell_id: str,
        *,
        signal_name: str = "INT",
    ) -> dict[str, Any]:
        """Delegate to ``VmManager.signal_shell``."""
        return self._manager.signal_shell(
            workspace_id,
            shell_id,
            signal_name=signal_name,
        )

    def close_shell(self, workspace_id: str, shell_id: str) -> dict[str, Any]:
        """Delegate to ``VmManager.close_shell``."""
        return self._manager.close_shell(workspace_id, shell_id)

    # ------------------------------------------------------------------
    # Services
    # ------------------------------------------------------------------

    def start_service(
        self,
        workspace_id: str,
        service_name: str,
        *,
        command: str,
        cwd: str = "/workspace",
        readiness: dict[str, Any] | None = None,
        ready_timeout_seconds: int = 30,
        ready_interval_ms: int = 500,
        secret_env: dict[str, str] | None = None,
        published_ports: list[dict[str, int | None]] | None = None,
    ) -> dict[str, Any]:
        """Delegate to ``VmManager.start_service``."""
        return self._manager.start_service(
            workspace_id,
            service_name,
            command=command,
            cwd=cwd,
            readiness=readiness,
            ready_timeout_seconds=ready_timeout_seconds,
            ready_interval_ms=ready_interval_ms,
            secret_env=secret_env,
            published_ports=published_ports,
        )

    def list_services(self, workspace_id: str) -> dict[str, Any]:
        """Delegate to ``VmManager.list_services``."""
        return self._manager.list_services(workspace_id)

    def status_service(self, workspace_id: str, service_name: str) -> dict[str, Any]:
        """Delegate to ``VmManager.status_service``."""
        return self._manager.status_service(workspace_id, service_name)

    def logs_service(
        self,
        workspace_id: str,
        service_name: str,
        *,
        tail_lines: int = 200,
        all: bool = False,
    ) -> dict[str, Any]:
        """Delegate to ``VmManager.logs_service``.

        When ``all`` is true the manager receives ``tail_lines=None``;
        otherwise ``tail_lines`` is forwarded unchanged.
        """
        effective_tail_lines = None if all else tail_lines
        return self._manager.logs_service(
            workspace_id,
            service_name,
            tail_lines=effective_tail_lines,
        )

    def stop_service(self, workspace_id: str, service_name: str) -> dict[str, Any]:
        """Delegate to ``VmManager.stop_service``."""
        return self._manager.stop_service(workspace_id, service_name)

    # ------------------------------------------------------------------
    # Teardown, VM inspection and one-shot runs
    # ------------------------------------------------------------------

    def delete_workspace(self, workspace_id: str) -> dict[str, Any]:
        """Delegate to ``VmManager.delete_workspace``."""
        return self._manager.delete_workspace(workspace_id)

    def stop_vm(self, vm_id: str) -> dict[str, Any]:
        """Delegate to ``VmManager.stop_vm``."""
        return self._manager.stop_vm(vm_id)

    def delete_vm(self, vm_id: str) -> dict[str, Any]:
        """Delegate to ``VmManager.delete_vm``."""
        return self._manager.delete_vm(vm_id)

    def status_vm(self, vm_id: str) -> dict[str, Any]:
        """Delegate to ``VmManager.status_vm``."""
        return self._manager.status_vm(vm_id)

    def network_info_vm(self, vm_id: str) -> dict[str, Any]:
        """Delegate to ``VmManager.network_info_vm``."""
        return self._manager.network_info_vm(vm_id)

    def reap_expired(self) -> dict[str, Any]:
        """Delegate to ``VmManager.reap_expired``."""
        return self._manager.reap_expired()

    def run_in_vm(
        self,
        *,
        environment: str,
        command: str,
        vcpu_count: int = DEFAULT_VCPU_COUNT,
        mem_mib: int = DEFAULT_MEM_MIB,
        timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
        ttl_seconds: int = DEFAULT_TTL_SECONDS,
        network: bool = False,
        allow_host_compat: bool = DEFAULT_ALLOW_HOST_COMPAT,
    ) -> dict[str, Any]:
        """Delegate to ``VmManager.run_vm`` (note the different method name)."""
        return self._manager.run_vm(
            environment=environment,
            command=command,
            vcpu_count=vcpu_count,
            mem_mib=mem_mib,
            timeout_seconds=timeout_seconds,
            ttl_seconds=ttl_seconds,
            network=network,
            allow_host_compat=allow_host_compat,
        )

    # ------------------------------------------------------------------
    # MCP server construction
    # ------------------------------------------------------------------

    def create_server(
        self,
        *,
        profile: McpToolProfile = "workspace-core",
        project_path: str | Path | None = None,
        repo_url: str | None = None,
        repo_ref: str | None = None,
        no_project_source: bool = False,
    ) -> FastMCP:
        """Build a ``FastMCP`` server exposing one of the stable public tool profiles.

        The default profile is `workspace-core`, the stable chat-host profile
        in 4.x; pick `profile="workspace-full"` only when the host really
        needs the full advanced workspace surface. Only tools listed for the
        chosen profile are registered.

        Unless overridden, the server auto-detects the nearest Git worktree
        root from its current working directory and seeds from it whenever
        `workspace_create` is called without `seed_path`. `project_path`,
        `repo_url`, and `no_project_source` override that behavior explicitly.

        Raises:
            ValueError: if ``profile`` is not a known public profile.
        """
        normalized_profile = _validate_mcp_profile(profile)
        startup_source = resolve_project_startup_source(
            project_path=project_path,
            repo_url=repo_url,
            repo_ref=repo_ref,
            no_project_source=no_project_source,
        )
        enabled_tools = set(_PROFILE_TOOLS[normalized_profile])
        server = FastMCP(name="pyro_mcp")

        def _enabled(tool_name: str) -> bool:
            return tool_name in enabled_tools

        # NOTE: tool function names, signatures and docstrings below are part
        # of the public MCP surface (names, input schemas, descriptions), so
        # they must not be edited casually.

        # ---- Ephemeral VM tools -------------------------------------------

        if _enabled("vm_run"):

            @server.tool()
            async def vm_run(
                environment: str,
                command: str,
                vcpu_count: int = DEFAULT_VCPU_COUNT,
                mem_mib: int = DEFAULT_MEM_MIB,
                timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
                ttl_seconds: int = DEFAULT_TTL_SECONDS,
                network: bool = False,
                allow_host_compat: bool = DEFAULT_ALLOW_HOST_COMPAT,
            ) -> dict[str, Any]:
                """Create, start, execute, and clean up an ephemeral VM."""
                return self.run_in_vm(
                    environment=environment,
                    command=command,
                    vcpu_count=vcpu_count,
                    mem_mib=mem_mib,
                    timeout_seconds=timeout_seconds,
                    ttl_seconds=ttl_seconds,
                    network=network,
                    allow_host_compat=allow_host_compat,
                )

        if _enabled("vm_list_environments"):

            @server.tool()
            async def vm_list_environments() -> list[dict[str, object]]:
                """List curated Linux environments and installation status."""
                return self.list_environments()

        if _enabled("vm_create"):

            @server.tool()
            async def vm_create(
                environment: str,
                vcpu_count: int = DEFAULT_VCPU_COUNT,
                mem_mib: int = DEFAULT_MEM_MIB,
                ttl_seconds: int = DEFAULT_TTL_SECONDS,
                network: bool = False,
                allow_host_compat: bool = DEFAULT_ALLOW_HOST_COMPAT,
            ) -> dict[str, Any]:
                """Create an ephemeral VM record with environment and resource sizing."""
                return self.create_vm(
                    environment=environment,
                    vcpu_count=vcpu_count,
                    mem_mib=mem_mib,
                    ttl_seconds=ttl_seconds,
                    network=network,
                    allow_host_compat=allow_host_compat,
                )

        if _enabled("vm_start"):

            @server.tool()
            async def vm_start(vm_id: str) -> dict[str, Any]:
                """Start a created VM and transition it into a command-ready state."""
                return self.start_vm(vm_id)

        if _enabled("vm_exec"):

            @server.tool()
            async def vm_exec(
                vm_id: str,
                command: str,
                timeout_seconds: int = 30,
            ) -> dict[str, Any]:
                """Run one non-interactive command and auto-clean the VM."""
                return self.exec_vm(vm_id, command=command, timeout_seconds=timeout_seconds)

        if _enabled("vm_stop"):

            @server.tool()
            async def vm_stop(vm_id: str) -> dict[str, Any]:
                """Stop a running VM."""
                return self.stop_vm(vm_id)

        if _enabled("vm_delete"):

            @server.tool()
            async def vm_delete(vm_id: str) -> dict[str, Any]:
                """Delete a VM and its runtime artifacts."""
                return self.delete_vm(vm_id)

        if _enabled("vm_status"):

            @server.tool()
            async def vm_status(vm_id: str) -> dict[str, Any]:
                """Get the current state and metadata for a VM."""
                return self.status_vm(vm_id)

        if _enabled("vm_network_info"):

            @server.tool()
            async def vm_network_info(vm_id: str) -> dict[str, Any]:
                """Get the current network configuration assigned to a VM."""
                return self.network_info_vm(vm_id)

        if _enabled("vm_reap_expired"):

            @server.tool()
            async def vm_reap_expired() -> dict[str, Any]:
                """Delete VMs whose TTL has expired."""
                return self.reap_expired()

        # ---- Workspace lifecycle tools ------------------------------------

        if _enabled("workspace_create"):
            workspace_create_description = _workspace_create_description(startup_source)

            def _create_workspace_from_server_defaults(
                *,
                environment: str,
                vcpu_count: int,
                mem_mib: int,
                ttl_seconds: int,
                network_policy: str,
                allow_host_compat: bool,
                seed_path: str | None,
                secrets: list[dict[str, str]] | None,
                name: str | None,
                labels: dict[str, str] | None,
            ) -> dict[str, Any]:
                # Options that are identical for both creation paths below.
                shared_options: dict[str, Any] = {
                    "environment": environment,
                    "vcpu_count": vcpu_count,
                    "mem_mib": mem_mib,
                    "ttl_seconds": ttl_seconds,
                    "network_policy": network_policy,
                    "allow_host_compat": allow_host_compat,
                    "secrets": secrets,
                    "name": name,
                    "labels": labels,
                }
                # An explicit seed path wins; with no startup source there is
                # nothing to fall back to, so use the plain facade call.
                if seed_path is not None or startup_source is None:
                    return self.create_workspace(seed_path=seed_path, **shared_options)
                # Otherwise seed from the server's startup source. The
                # workspace is created while the materialized source is open.
                with materialize_project_startup_source(startup_source) as resolved_seed_path:
                    prepared_seed = self._manager._prepare_workspace_seed(  # noqa: SLF001
                        resolved_seed_path,
                        origin_kind=startup_source.kind,
                        origin_ref=startup_source.origin_ref,
                    )
                    return self._manager.create_workspace(
                        _prepared_seed=prepared_seed,
                        **shared_options,
                    )

            if normalized_profile == "workspace-core":
                # Core profile: no `network_policy` or `secrets` parameters;
                # the default policy and no secrets are always used.
                @server.tool(name="workspace_create", description=workspace_create_description)
                async def workspace_create_core(
                    environment: str,
                    vcpu_count: int = DEFAULT_VCPU_COUNT,
                    mem_mib: int = DEFAULT_MEM_MIB,
                    ttl_seconds: int = DEFAULT_TTL_SECONDS,
                    allow_host_compat: bool = DEFAULT_ALLOW_HOST_COMPAT,
                    seed_path: str | None = None,
                    name: str | None = None,
                    labels: dict[str, str] | None = None,
                ) -> dict[str, Any]:
                    return _create_workspace_from_server_defaults(
                        environment=environment,
                        vcpu_count=vcpu_count,
                        mem_mib=mem_mib,
                        ttl_seconds=ttl_seconds,
                        network_policy=DEFAULT_WORKSPACE_NETWORK_POLICY,
                        allow_host_compat=allow_host_compat,
                        seed_path=seed_path,
                        secrets=None,
                        name=name,
                        labels=labels,
                    )

            else:

                @server.tool(name="workspace_create", description=workspace_create_description)
                async def workspace_create_full(
                    environment: str,
                    vcpu_count: int = DEFAULT_VCPU_COUNT,
                    mem_mib: int = DEFAULT_MEM_MIB,
                    ttl_seconds: int = DEFAULT_TTL_SECONDS,
                    network_policy: str = DEFAULT_WORKSPACE_NETWORK_POLICY,
                    allow_host_compat: bool = DEFAULT_ALLOW_HOST_COMPAT,
                    seed_path: str | None = None,
                    secrets: list[dict[str, str]] | None = None,
                    name: str | None = None,
                    labels: dict[str, str] | None = None,
                ) -> dict[str, Any]:
                    return _create_workspace_from_server_defaults(
                        environment=environment,
                        vcpu_count=vcpu_count,
                        mem_mib=mem_mib,
                        ttl_seconds=ttl_seconds,
                        network_policy=network_policy,
                        allow_host_compat=allow_host_compat,
                        seed_path=seed_path,
                        secrets=secrets,
                        name=name,
                        labels=labels,
                    )

        if _enabled("workspace_list"):

            @server.tool()
            async def workspace_list() -> dict[str, Any]:
                """List persisted workspaces with summary metadata."""
                return self.list_workspaces()

        if _enabled("workspace_update"):

            @server.tool()
            async def workspace_update(
                workspace_id: str,
                name: str | None = None,
                clear_name: bool = False,
                labels: dict[str, str] | None = None,
                clear_labels: list[str] | None = None,
            ) -> dict[str, Any]:
                """Update optional workspace name and labels."""
                return self.update_workspace(
                    workspace_id,
                    name=name,
                    clear_name=clear_name,
                    labels=labels,
                    clear_labels=clear_labels,
                )

        if _enabled("workspace_exec"):
            if normalized_profile == "workspace-core":
                # Core profile: no `secret_env` parameter is exposed.
                @server.tool(name="workspace_exec")
                async def workspace_exec_core(
                    workspace_id: str,
                    command: str,
                    timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
                ) -> dict[str, Any]:
                    """Run one command inside an existing persistent workspace."""
                    return self.exec_workspace(
                        workspace_id,
                        command=command,
                        timeout_seconds=timeout_seconds,
                        secret_env=None,
                    )

            else:

                @server.tool(name="workspace_exec")
                async def workspace_exec_full(
                    workspace_id: str,
                    command: str,
                    timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
                    secret_env: dict[str, str] | None = None,
                ) -> dict[str, Any]:
                    """Run one command inside an existing persistent workspace."""
                    return self.exec_workspace(
                        workspace_id,
                        command=command,
                        timeout_seconds=timeout_seconds,
                        secret_env=secret_env,
                    )

        if _enabled("workspace_sync_push"):

            @server.tool()
            async def workspace_sync_push(
                workspace_id: str,
                source_path: str,
                dest: str = "/workspace",
            ) -> dict[str, Any]:
                """Push host content into the persistent `/workspace` of a started workspace."""
                return self.push_workspace_sync(workspace_id, source_path=source_path, dest=dest)

        if _enabled("workspace_status"):

            @server.tool()
            async def workspace_status(workspace_id: str) -> dict[str, Any]:
                """Inspect workspace state and latest command metadata."""
                return self.status_workspace(workspace_id)

        if _enabled("workspace_stop"):

            @server.tool()
            async def workspace_stop(workspace_id: str) -> dict[str, Any]:
                """Stop one persistent workspace without resetting `/workspace`."""
                return self.stop_workspace(workspace_id)

        if _enabled("workspace_start"):

            @server.tool()
            async def workspace_start(workspace_id: str) -> dict[str, Any]:
                """Start one stopped persistent workspace without resetting `/workspace`."""
                return self.start_workspace(workspace_id)

        if _enabled("workspace_logs"):

            @server.tool()
            async def workspace_logs(workspace_id: str) -> dict[str, Any]:
                """Return persisted command history for one workspace."""
                return self.logs_workspace(workspace_id)

        if _enabled("workspace_summary"):

            @server.tool()
            async def workspace_summary(workspace_id: str) -> dict[str, Any]:
                """Summarize the current workspace session for human review."""
                return self.summarize_workspace(workspace_id)

        if _enabled("workspace_export"):

            @server.tool()
            async def workspace_export(
                workspace_id: str,
                path: str,
                output_path: str,
            ) -> dict[str, Any]:
                """Export one file or directory from `/workspace` back to the host."""
                return self.export_workspace(workspace_id, path, output_path=output_path)

        if _enabled("workspace_diff"):

            @server.tool()
            async def workspace_diff(workspace_id: str) -> dict[str, Any]:
                """Compare `/workspace` to the immutable create-time baseline."""
                return self.diff_workspace(workspace_id)

        # ---- Workspace file, patch and disk tools -------------------------

        if _enabled("workspace_file_list"):

            @server.tool()
            async def workspace_file_list(
                workspace_id: str,
                path: str = "/workspace",
                recursive: bool = False,
            ) -> dict[str, Any]:
                """List metadata for files and directories under one live workspace path."""
                return self.list_workspace_files(
                    workspace_id,
                    path=path,
                    recursive=recursive,
                )

        if _enabled("workspace_file_read"):

            @server.tool()
            async def workspace_file_read(
                workspace_id: str,
                path: str,
                max_bytes: int = 65536,
            ) -> dict[str, Any]:
                """Read one regular text file from a live workspace path."""
                return self.read_workspace_file(
                    workspace_id,
                    path,
                    max_bytes=max_bytes,
                )

        if _enabled("workspace_file_write"):

            @server.tool()
            async def workspace_file_write(
                workspace_id: str,
                path: str,
                text: str,
            ) -> dict[str, Any]:
                """Create or replace one regular text file under `/workspace`."""
                return self.write_workspace_file(
                    workspace_id,
                    path,
                    text=text,
                )

        if _enabled("workspace_patch_apply"):

            @server.tool()
            async def workspace_patch_apply(
                workspace_id: str,
                patch: str,
            ) -> dict[str, Any]:
                """Apply a unified text patch inside one live workspace."""
                return self.apply_workspace_patch(
                    workspace_id,
                    patch=patch,
                )

        if _enabled("workspace_disk_export"):

            @server.tool()
            async def workspace_disk_export(
                workspace_id: str,
                output_path: str,
            ) -> dict[str, Any]:
                """Export the raw stopped workspace rootfs image to one host path."""
                return self.export_workspace_disk(workspace_id, output_path=output_path)

        if _enabled("workspace_disk_list"):

            @server.tool()
            async def workspace_disk_list(
                workspace_id: str,
                path: str = "/workspace",
                recursive: bool = False,
            ) -> dict[str, Any]:
                """Inspect one stopped workspace rootfs path without booting the guest."""
                return self.list_workspace_disk(
                    workspace_id,
                    path=path,
                    recursive=recursive,
                )

        if _enabled("workspace_disk_read"):

            @server.tool()
            async def workspace_disk_read(
                workspace_id: str,
                path: str,
                max_bytes: int = 65536,
            ) -> dict[str, Any]:
                """Read one regular file from a stopped workspace rootfs offline."""
                return self.read_workspace_disk(
                    workspace_id,
                    path,
                    max_bytes=max_bytes,
                )

        # ---- Snapshot and reset tools -------------------------------------

        if _enabled("snapshot_create"):

            @server.tool()
            async def snapshot_create(workspace_id: str, snapshot_name: str) -> dict[str, Any]:
                """Create one named workspace snapshot from the current `/workspace` tree."""
                return self.create_snapshot(workspace_id, snapshot_name)

        if _enabled("snapshot_list"):

            @server.tool()
            async def snapshot_list(workspace_id: str) -> dict[str, Any]:
                """List the baseline plus named snapshots for one workspace."""
                return self.list_snapshots(workspace_id)

        if _enabled("snapshot_delete"):

            @server.tool()
            async def snapshot_delete(workspace_id: str, snapshot_name: str) -> dict[str, Any]:
                """Delete one named snapshot from a workspace."""
                return self.delete_snapshot(workspace_id, snapshot_name)

        if _enabled("workspace_reset"):

            @server.tool()
            async def workspace_reset(
                workspace_id: str,
                snapshot: str = "baseline",
            ) -> dict[str, Any]:
                """Recreate a workspace and restore `/workspace` from one snapshot."""
                return self.reset_workspace(workspace_id, snapshot=snapshot)

        # ---- Shell tools --------------------------------------------------

        if _enabled("shell_open"):

            @server.tool()
            async def shell_open(
                workspace_id: str,
                cwd: str = "/workspace",
                cols: int = 120,
                rows: int = 30,
                secret_env: dict[str, str] | None = None,
            ) -> dict[str, Any]:
                """Open a persistent interactive shell inside one workspace."""
                return self.open_shell(
                    workspace_id,
                    cwd=cwd,
                    cols=cols,
                    rows=rows,
                    secret_env=secret_env,
                )

        if _enabled("shell_read"):

            @server.tool()
            async def shell_read(
                workspace_id: str,
                shell_id: str,
                cursor: int = 0,
                max_chars: int = 65536,
                plain: bool = False,
                wait_for_idle_ms: int | None = None,
            ) -> dict[str, Any]:
                """Read merged PTY output from a workspace shell."""
                return self.read_shell(
                    workspace_id,
                    shell_id,
                    cursor=cursor,
                    max_chars=max_chars,
                    plain=plain,
                    wait_for_idle_ms=wait_for_idle_ms,
                )

        if _enabled("shell_write"):

            @server.tool()
            async def shell_write(
                workspace_id: str,
                shell_id: str,
                input: str,
                append_newline: bool = True,
            ) -> dict[str, Any]:
                """Write text input to a persistent workspace shell."""
                return self.write_shell(
                    workspace_id,
                    shell_id,
                    input=input,
                    append_newline=append_newline,
                )

        if _enabled("shell_signal"):

            @server.tool()
            async def shell_signal(
                workspace_id: str,
                shell_id: str,
                signal_name: str = "INT",
            ) -> dict[str, Any]:
                """Send a signal to the shell process group."""
                return self.signal_shell(
                    workspace_id,
                    shell_id,
                    signal_name=signal_name,
                )

        if _enabled("shell_close"):

            @server.tool()
            async def shell_close(workspace_id: str, shell_id: str) -> dict[str, Any]:
                """Close a persistent workspace shell."""
                return self.close_shell(workspace_id, shell_id)

        # ---- Service tools ------------------------------------------------

        if _enabled("service_start"):

            @server.tool()
            async def service_start(
                workspace_id: str,
                service_name: str,
                command: str,
                cwd: str = "/workspace",
                ready_file: str | None = None,
                ready_tcp: str | None = None,
                ready_http: str | None = None,
                ready_command: str | None = None,
                ready_timeout_seconds: int = 30,
                ready_interval_ms: int = 500,
                secret_env: dict[str, str] | None = None,
                published_ports: list[dict[str, int | None]] | None = None,
            ) -> dict[str, Any]:
                """Start a named long-running service inside a workspace."""
                # The flat `ready_*` arguments collapse into one readiness
                # dict. The first non-None option wins, in this fixed order.
                probe_options = (
                    ("file", "path", ready_file),
                    ("tcp", "address", ready_tcp),
                    ("http", "url", ready_http),
                    ("command", "command", ready_command),
                )
                readiness: dict[str, Any] | None = None
                for probe_type, value_key, probe_value in probe_options:
                    if probe_value is not None:
                        readiness = {"type": probe_type, value_key: probe_value}
                        break
                return self.start_service(
                    workspace_id,
                    service_name,
                    command=command,
                    cwd=cwd,
                    readiness=readiness,
                    ready_timeout_seconds=ready_timeout_seconds,
                    ready_interval_ms=ready_interval_ms,
                    secret_env=secret_env,
                    published_ports=published_ports,
                )

        if _enabled("service_list"):

            @server.tool()
            async def service_list(workspace_id: str) -> dict[str, Any]:
                """List named services in one workspace."""
                return self.list_services(workspace_id)

        if _enabled("service_status"):

            @server.tool()
            async def service_status(workspace_id: str, service_name: str) -> dict[str, Any]:
                """Inspect one named workspace service."""
                return self.status_service(workspace_id, service_name)

        if _enabled("service_logs"):

            @server.tool()
            async def service_logs(
                workspace_id: str,
                service_name: str,
                tail_lines: int = 200,
                all: bool = False,
            ) -> dict[str, Any]:
                """Read persisted stdout/stderr for one workspace service."""
                return self.logs_service(
                    workspace_id,
                    service_name,
                    tail_lines=tail_lines,
                    all=all,
                )

        if _enabled("service_stop"):

            @server.tool()
            async def service_stop(workspace_id: str, service_name: str) -> dict[str, Any]:
                """Stop one running service in a workspace."""
                return self.stop_service(workspace_id, service_name)

        # ---- Workspace teardown -------------------------------------------

        if _enabled("workspace_delete"):

            @server.tool()
            async def workspace_delete(workspace_id: str) -> dict[str, Any]:
                """Delete a persistent workspace and its backing sandbox."""
                return self.delete_workspace(workspace_id)

        return server