pyro-mcp/src/pyro_mcp/api.py
Thales Maciel eecfd7a7d7 Add MCP tool profiles for workspace chat flows
Expose stable MCP/server tool profiles so chat hosts can start narrow and widen only when needed. This adds vm-run, workspace-core, and workspace-full across the CLI serve path, Pyro.create_server(), and the package-level create_server() factory while keeping workspace-full as the default.

Register profile-specific tool sets from one shared contract mapping, and narrow the workspace-core schemas so secrets, network policy, shells, services, snapshots, and disk tools do not leak into the default persistent chat profile. The full surface remains available unchanged under workspace-full.

Refresh the public docs and examples around the profile progression, add a canonical OpenAI Responses workspace-core example, mark the 3.4.0 roadmap milestone done, and verify with uv lock, UV_CACHE_DIR=.uv-cache make check, UV_CACHE_DIR=.uv-cache make dist-check, and a real guest-backed workspace-core smoke for create, file write, exec, diff, export, reset, and delete.
2026-03-12 23:52:13 -03:00

1037 lines
35 KiB
Python

"""Public facade shared by the Python SDK and MCP server."""
from __future__ import annotations
from pathlib import Path
from typing import Any, Literal, cast
from mcp.server.fastmcp import FastMCP
from pyro_mcp.contract import (
PUBLIC_MCP_PROFILES,
PUBLIC_MCP_VM_RUN_PROFILE_TOOLS,
PUBLIC_MCP_WORKSPACE_CORE_PROFILE_TOOLS,
PUBLIC_MCP_WORKSPACE_FULL_PROFILE_TOOLS,
)
from pyro_mcp.vm_manager import (
DEFAULT_ALLOW_HOST_COMPAT,
DEFAULT_MEM_MIB,
DEFAULT_TIMEOUT_SECONDS,
DEFAULT_TTL_SECONDS,
DEFAULT_VCPU_COUNT,
DEFAULT_WORKSPACE_NETWORK_POLICY,
VmManager,
)
McpToolProfile = Literal["vm-run", "workspace-core", "workspace-full"]
_PROFILE_TOOLS: dict[str, tuple[str, ...]] = {
"vm-run": PUBLIC_MCP_VM_RUN_PROFILE_TOOLS,
"workspace-core": PUBLIC_MCP_WORKSPACE_CORE_PROFILE_TOOLS,
"workspace-full": PUBLIC_MCP_WORKSPACE_FULL_PROFILE_TOOLS,
}
def _validate_mcp_profile(profile: str) -> McpToolProfile:
if profile not in PUBLIC_MCP_PROFILES:
expected = ", ".join(PUBLIC_MCP_PROFILES)
raise ValueError(f"unknown MCP profile {profile!r}; expected one of: {expected}")
return cast(McpToolProfile, profile)
class Pyro:
"""High-level facade over the ephemeral VM runtime."""
def __init__(
self,
manager: VmManager | None = None,
*,
backend_name: str | None = None,
base_dir: Path | None = None,
cache_dir: Path | None = None,
max_active_vms: int = 4,
) -> None:
self._manager = manager or VmManager(
backend_name=backend_name,
base_dir=base_dir,
cache_dir=cache_dir,
max_active_vms=max_active_vms,
)
@property
def manager(self) -> VmManager:
return self._manager
def list_environments(self) -> list[dict[str, object]]:
return self._manager.list_environments()
def pull_environment(self, environment: str) -> dict[str, object]:
return self._manager.pull_environment(environment)
def inspect_environment(self, environment: str) -> dict[str, object]:
return self._manager.inspect_environment(environment)
def prune_environments(self) -> dict[str, object]:
return self._manager.prune_environments()
def create_vm(
self,
*,
environment: str,
vcpu_count: int = DEFAULT_VCPU_COUNT,
mem_mib: int = DEFAULT_MEM_MIB,
ttl_seconds: int = DEFAULT_TTL_SECONDS,
network: bool = False,
allow_host_compat: bool = DEFAULT_ALLOW_HOST_COMPAT,
) -> dict[str, Any]:
return self._manager.create_vm(
environment=environment,
vcpu_count=vcpu_count,
mem_mib=mem_mib,
ttl_seconds=ttl_seconds,
network=network,
allow_host_compat=allow_host_compat,
)
def start_vm(self, vm_id: str) -> dict[str, Any]:
return self._manager.start_vm(vm_id)
def exec_vm(self, vm_id: str, *, command: str, timeout_seconds: int = 30) -> dict[str, Any]:
return self._manager.exec_vm(vm_id, command=command, timeout_seconds=timeout_seconds)
def create_workspace(
self,
*,
environment: str,
vcpu_count: int = DEFAULT_VCPU_COUNT,
mem_mib: int = DEFAULT_MEM_MIB,
ttl_seconds: int = DEFAULT_TTL_SECONDS,
network_policy: str = DEFAULT_WORKSPACE_NETWORK_POLICY,
allow_host_compat: bool = DEFAULT_ALLOW_HOST_COMPAT,
seed_path: str | Path | None = None,
secrets: list[dict[str, str]] | None = None,
name: str | None = None,
labels: dict[str, str] | None = None,
) -> dict[str, Any]:
return self._manager.create_workspace(
environment=environment,
vcpu_count=vcpu_count,
mem_mib=mem_mib,
ttl_seconds=ttl_seconds,
network_policy=network_policy,
allow_host_compat=allow_host_compat,
seed_path=seed_path,
secrets=secrets,
name=name,
labels=labels,
)
def list_workspaces(self) -> dict[str, Any]:
return self._manager.list_workspaces()
def update_workspace(
self,
workspace_id: str,
*,
name: str | None = None,
clear_name: bool = False,
labels: dict[str, str] | None = None,
clear_labels: list[str] | None = None,
) -> dict[str, Any]:
return self._manager.update_workspace(
workspace_id,
name=name,
clear_name=clear_name,
labels=labels,
clear_labels=clear_labels,
)
def exec_workspace(
self,
workspace_id: str,
*,
command: str,
timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
secret_env: dict[str, str] | None = None,
) -> dict[str, Any]:
return self._manager.exec_workspace(
workspace_id,
command=command,
timeout_seconds=timeout_seconds,
secret_env=secret_env,
)
def status_workspace(self, workspace_id: str) -> dict[str, Any]:
return self._manager.status_workspace(workspace_id)
def stop_workspace(self, workspace_id: str) -> dict[str, Any]:
return self._manager.stop_workspace(workspace_id)
def start_workspace(self, workspace_id: str) -> dict[str, Any]:
return self._manager.start_workspace(workspace_id)
def push_workspace_sync(
self,
workspace_id: str,
source_path: str | Path,
*,
dest: str = "/workspace",
) -> dict[str, Any]:
return self._manager.push_workspace_sync(
workspace_id,
source_path=source_path,
dest=dest,
)
def logs_workspace(self, workspace_id: str) -> dict[str, Any]:
return self._manager.logs_workspace(workspace_id)
def export_workspace(
self,
workspace_id: str,
path: str,
*,
output_path: str | Path,
) -> dict[str, Any]:
return self._manager.export_workspace(
workspace_id,
path=path,
output_path=output_path,
)
def diff_workspace(self, workspace_id: str) -> dict[str, Any]:
return self._manager.diff_workspace(workspace_id)
def list_workspace_files(
self,
workspace_id: str,
*,
path: str = "/workspace",
recursive: bool = False,
) -> dict[str, Any]:
return self._manager.list_workspace_files(
workspace_id,
path=path,
recursive=recursive,
)
def read_workspace_file(
self,
workspace_id: str,
path: str,
*,
max_bytes: int = 65536,
) -> dict[str, Any]:
return self._manager.read_workspace_file(
workspace_id,
path,
max_bytes=max_bytes,
)
def write_workspace_file(
self,
workspace_id: str,
path: str,
*,
text: str,
) -> dict[str, Any]:
return self._manager.write_workspace_file(
workspace_id,
path,
text=text,
)
def apply_workspace_patch(
self,
workspace_id: str,
*,
patch: str,
) -> dict[str, Any]:
return self._manager.apply_workspace_patch(
workspace_id,
patch=patch,
)
def export_workspace_disk(
self,
workspace_id: str,
*,
output_path: str | Path,
) -> dict[str, Any]:
return self._manager.export_workspace_disk(
workspace_id,
output_path=output_path,
)
def list_workspace_disk(
self,
workspace_id: str,
*,
path: str = "/workspace",
recursive: bool = False,
) -> dict[str, Any]:
return self._manager.list_workspace_disk(
workspace_id,
path=path,
recursive=recursive,
)
def read_workspace_disk(
self,
workspace_id: str,
path: str,
*,
max_bytes: int = 65536,
) -> dict[str, Any]:
return self._manager.read_workspace_disk(
workspace_id,
path=path,
max_bytes=max_bytes,
)
def create_snapshot(self, workspace_id: str, snapshot_name: str) -> dict[str, Any]:
return self._manager.create_snapshot(workspace_id, snapshot_name)
def list_snapshots(self, workspace_id: str) -> dict[str, Any]:
return self._manager.list_snapshots(workspace_id)
def delete_snapshot(self, workspace_id: str, snapshot_name: str) -> dict[str, Any]:
return self._manager.delete_snapshot(workspace_id, snapshot_name)
def reset_workspace(
self,
workspace_id: str,
*,
snapshot: str = "baseline",
) -> dict[str, Any]:
return self._manager.reset_workspace(workspace_id, snapshot=snapshot)
def open_shell(
self,
workspace_id: str,
*,
cwd: str = "/workspace",
cols: int = 120,
rows: int = 30,
secret_env: dict[str, str] | None = None,
) -> dict[str, Any]:
return self._manager.open_shell(
workspace_id,
cwd=cwd,
cols=cols,
rows=rows,
secret_env=secret_env,
)
def read_shell(
self,
workspace_id: str,
shell_id: str,
*,
cursor: int = 0,
max_chars: int = 65536,
) -> dict[str, Any]:
return self._manager.read_shell(
workspace_id,
shell_id,
cursor=cursor,
max_chars=max_chars,
)
def write_shell(
self,
workspace_id: str,
shell_id: str,
*,
input: str,
append_newline: bool = True,
) -> dict[str, Any]:
return self._manager.write_shell(
workspace_id,
shell_id,
input_text=input,
append_newline=append_newline,
)
def signal_shell(
self,
workspace_id: str,
shell_id: str,
*,
signal_name: str = "INT",
) -> dict[str, Any]:
return self._manager.signal_shell(
workspace_id,
shell_id,
signal_name=signal_name,
)
def close_shell(self, workspace_id: str, shell_id: str) -> dict[str, Any]:
return self._manager.close_shell(workspace_id, shell_id)
def start_service(
self,
workspace_id: str,
service_name: str,
*,
command: str,
cwd: str = "/workspace",
readiness: dict[str, Any] | None = None,
ready_timeout_seconds: int = 30,
ready_interval_ms: int = 500,
secret_env: dict[str, str] | None = None,
published_ports: list[dict[str, int | None]] | None = None,
) -> dict[str, Any]:
return self._manager.start_service(
workspace_id,
service_name,
command=command,
cwd=cwd,
readiness=readiness,
ready_timeout_seconds=ready_timeout_seconds,
ready_interval_ms=ready_interval_ms,
secret_env=secret_env,
published_ports=published_ports,
)
def list_services(self, workspace_id: str) -> dict[str, Any]:
return self._manager.list_services(workspace_id)
def status_service(self, workspace_id: str, service_name: str) -> dict[str, Any]:
return self._manager.status_service(workspace_id, service_name)
def logs_service(
self,
workspace_id: str,
service_name: str,
*,
tail_lines: int = 200,
all: bool = False,
) -> dict[str, Any]:
return self._manager.logs_service(
workspace_id,
service_name,
tail_lines=None if all else tail_lines,
)
def stop_service(self, workspace_id: str, service_name: str) -> dict[str, Any]:
return self._manager.stop_service(workspace_id, service_name)
def delete_workspace(self, workspace_id: str) -> dict[str, Any]:
return self._manager.delete_workspace(workspace_id)
def stop_vm(self, vm_id: str) -> dict[str, Any]:
return self._manager.stop_vm(vm_id)
def delete_vm(self, vm_id: str) -> dict[str, Any]:
return self._manager.delete_vm(vm_id)
def status_vm(self, vm_id: str) -> dict[str, Any]:
return self._manager.status_vm(vm_id)
def network_info_vm(self, vm_id: str) -> dict[str, Any]:
return self._manager.network_info_vm(vm_id)
def reap_expired(self) -> dict[str, Any]:
return self._manager.reap_expired()
def run_in_vm(
self,
*,
environment: str,
command: str,
vcpu_count: int = DEFAULT_VCPU_COUNT,
mem_mib: int = DEFAULT_MEM_MIB,
timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
ttl_seconds: int = DEFAULT_TTL_SECONDS,
network: bool = False,
allow_host_compat: bool = DEFAULT_ALLOW_HOST_COMPAT,
) -> dict[str, Any]:
return self._manager.run_vm(
environment=environment,
command=command,
vcpu_count=vcpu_count,
mem_mib=mem_mib,
timeout_seconds=timeout_seconds,
ttl_seconds=ttl_seconds,
network=network,
allow_host_compat=allow_host_compat,
)
def create_server(self, *, profile: McpToolProfile = "workspace-full") -> FastMCP:
normalized_profile = _validate_mcp_profile(profile)
enabled_tools = set(_PROFILE_TOOLS[normalized_profile])
server = FastMCP(name="pyro_mcp")
def _enabled(tool_name: str) -> bool:
return tool_name in enabled_tools
if _enabled("vm_run"):
@server.tool()
async def vm_run(
environment: str,
command: str,
vcpu_count: int = DEFAULT_VCPU_COUNT,
mem_mib: int = DEFAULT_MEM_MIB,
timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
ttl_seconds: int = DEFAULT_TTL_SECONDS,
network: bool = False,
allow_host_compat: bool = DEFAULT_ALLOW_HOST_COMPAT,
) -> dict[str, Any]:
"""Create, start, execute, and clean up an ephemeral VM."""
return self.run_in_vm(
environment=environment,
command=command,
vcpu_count=vcpu_count,
mem_mib=mem_mib,
timeout_seconds=timeout_seconds,
ttl_seconds=ttl_seconds,
network=network,
allow_host_compat=allow_host_compat,
)
if _enabled("vm_list_environments"):
@server.tool()
async def vm_list_environments() -> list[dict[str, object]]:
"""List curated Linux environments and installation status."""
return self.list_environments()
if _enabled("vm_create"):
@server.tool()
async def vm_create(
environment: str,
vcpu_count: int = DEFAULT_VCPU_COUNT,
mem_mib: int = DEFAULT_MEM_MIB,
ttl_seconds: int = DEFAULT_TTL_SECONDS,
network: bool = False,
allow_host_compat: bool = DEFAULT_ALLOW_HOST_COMPAT,
) -> dict[str, Any]:
"""Create an ephemeral VM record with environment and resource sizing."""
return self.create_vm(
environment=environment,
vcpu_count=vcpu_count,
mem_mib=mem_mib,
ttl_seconds=ttl_seconds,
network=network,
allow_host_compat=allow_host_compat,
)
if _enabled("vm_start"):
@server.tool()
async def vm_start(vm_id: str) -> dict[str, Any]:
"""Start a created VM and transition it into a command-ready state."""
return self.start_vm(vm_id)
if _enabled("vm_exec"):
@server.tool()
async def vm_exec(
vm_id: str,
command: str,
timeout_seconds: int = 30,
) -> dict[str, Any]:
"""Run one non-interactive command and auto-clean the VM."""
return self.exec_vm(vm_id, command=command, timeout_seconds=timeout_seconds)
if _enabled("vm_stop"):
@server.tool()
async def vm_stop(vm_id: str) -> dict[str, Any]:
"""Stop a running VM."""
return self.stop_vm(vm_id)
if _enabled("vm_delete"):
@server.tool()
async def vm_delete(vm_id: str) -> dict[str, Any]:
"""Delete a VM and its runtime artifacts."""
return self.delete_vm(vm_id)
if _enabled("vm_status"):
@server.tool()
async def vm_status(vm_id: str) -> dict[str, Any]:
"""Get the current state and metadata for a VM."""
return self.status_vm(vm_id)
if _enabled("vm_network_info"):
@server.tool()
async def vm_network_info(vm_id: str) -> dict[str, Any]:
"""Get the current network configuration assigned to a VM."""
return self.network_info_vm(vm_id)
if _enabled("vm_reap_expired"):
@server.tool()
async def vm_reap_expired() -> dict[str, Any]:
"""Delete VMs whose TTL has expired."""
return self.reap_expired()
if _enabled("workspace_create"):
if normalized_profile == "workspace-core":
@server.tool(name="workspace_create")
async def workspace_create_core(
environment: str,
vcpu_count: int = DEFAULT_VCPU_COUNT,
mem_mib: int = DEFAULT_MEM_MIB,
ttl_seconds: int = DEFAULT_TTL_SECONDS,
allow_host_compat: bool = DEFAULT_ALLOW_HOST_COMPAT,
seed_path: str | None = None,
name: str | None = None,
labels: dict[str, str] | None = None,
) -> dict[str, Any]:
"""Create and start a persistent workspace."""
return self.create_workspace(
environment=environment,
vcpu_count=vcpu_count,
mem_mib=mem_mib,
ttl_seconds=ttl_seconds,
network_policy=DEFAULT_WORKSPACE_NETWORK_POLICY,
allow_host_compat=allow_host_compat,
seed_path=seed_path,
secrets=None,
name=name,
labels=labels,
)
else:
@server.tool(name="workspace_create")
async def workspace_create_full(
environment: str,
vcpu_count: int = DEFAULT_VCPU_COUNT,
mem_mib: int = DEFAULT_MEM_MIB,
ttl_seconds: int = DEFAULT_TTL_SECONDS,
network_policy: str = DEFAULT_WORKSPACE_NETWORK_POLICY,
allow_host_compat: bool = DEFAULT_ALLOW_HOST_COMPAT,
seed_path: str | None = None,
secrets: list[dict[str, str]] | None = None,
name: str | None = None,
labels: dict[str, str] | None = None,
) -> dict[str, Any]:
"""Create and start a persistent workspace."""
return self.create_workspace(
environment=environment,
vcpu_count=vcpu_count,
mem_mib=mem_mib,
ttl_seconds=ttl_seconds,
network_policy=network_policy,
allow_host_compat=allow_host_compat,
seed_path=seed_path,
secrets=secrets,
name=name,
labels=labels,
)
if _enabled("workspace_list"):
@server.tool()
async def workspace_list() -> dict[str, Any]:
"""List persisted workspaces with summary metadata."""
return self.list_workspaces()
if _enabled("workspace_update"):
@server.tool()
async def workspace_update(
workspace_id: str,
name: str | None = None,
clear_name: bool = False,
labels: dict[str, str] | None = None,
clear_labels: list[str] | None = None,
) -> dict[str, Any]:
"""Update optional workspace name and labels."""
return self.update_workspace(
workspace_id,
name=name,
clear_name=clear_name,
labels=labels,
clear_labels=clear_labels,
)
if _enabled("workspace_exec"):
if normalized_profile == "workspace-core":
@server.tool(name="workspace_exec")
async def workspace_exec_core(
workspace_id: str,
command: str,
timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
) -> dict[str, Any]:
"""Run one command inside an existing persistent workspace."""
return self.exec_workspace(
workspace_id,
command=command,
timeout_seconds=timeout_seconds,
secret_env=None,
)
else:
@server.tool(name="workspace_exec")
async def workspace_exec_full(
workspace_id: str,
command: str,
timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
secret_env: dict[str, str] | None = None,
) -> dict[str, Any]:
"""Run one command inside an existing persistent workspace."""
return self.exec_workspace(
workspace_id,
command=command,
timeout_seconds=timeout_seconds,
secret_env=secret_env,
)
if _enabled("workspace_sync_push"):
@server.tool()
async def workspace_sync_push(
workspace_id: str,
source_path: str,
dest: str = "/workspace",
) -> dict[str, Any]:
"""Push host content into the persistent `/workspace` of a started workspace."""
return self.push_workspace_sync(workspace_id, source_path=source_path, dest=dest)
if _enabled("workspace_status"):
@server.tool()
async def workspace_status(workspace_id: str) -> dict[str, Any]:
"""Inspect workspace state and latest command metadata."""
return self.status_workspace(workspace_id)
if _enabled("workspace_stop"):
@server.tool()
async def workspace_stop(workspace_id: str) -> dict[str, Any]:
"""Stop one persistent workspace without resetting `/workspace`."""
return self.stop_workspace(workspace_id)
if _enabled("workspace_start"):
@server.tool()
async def workspace_start(workspace_id: str) -> dict[str, Any]:
"""Start one stopped persistent workspace without resetting `/workspace`."""
return self.start_workspace(workspace_id)
if _enabled("workspace_logs"):
@server.tool()
async def workspace_logs(workspace_id: str) -> dict[str, Any]:
"""Return persisted command history for one workspace."""
return self.logs_workspace(workspace_id)
if _enabled("workspace_export"):
@server.tool()
async def workspace_export(
workspace_id: str,
path: str,
output_path: str,
) -> dict[str, Any]:
"""Export one file or directory from `/workspace` back to the host."""
return self.export_workspace(workspace_id, path, output_path=output_path)
if _enabled("workspace_diff"):
@server.tool()
async def workspace_diff(workspace_id: str) -> dict[str, Any]:
"""Compare `/workspace` to the immutable create-time baseline."""
return self.diff_workspace(workspace_id)
if _enabled("workspace_file_list"):
@server.tool()
async def workspace_file_list(
workspace_id: str,
path: str = "/workspace",
recursive: bool = False,
) -> dict[str, Any]:
"""List metadata for files and directories under one live workspace path."""
return self.list_workspace_files(
workspace_id,
path=path,
recursive=recursive,
)
if _enabled("workspace_file_read"):
@server.tool()
async def workspace_file_read(
workspace_id: str,
path: str,
max_bytes: int = 65536,
) -> dict[str, Any]:
"""Read one regular text file from a live workspace path."""
return self.read_workspace_file(
workspace_id,
path,
max_bytes=max_bytes,
)
if _enabled("workspace_file_write"):
@server.tool()
async def workspace_file_write(
workspace_id: str,
path: str,
text: str,
) -> dict[str, Any]:
"""Create or replace one regular text file under `/workspace`."""
return self.write_workspace_file(
workspace_id,
path,
text=text,
)
if _enabled("workspace_patch_apply"):
@server.tool()
async def workspace_patch_apply(
workspace_id: str,
patch: str,
) -> dict[str, Any]:
"""Apply a unified text patch inside one live workspace."""
return self.apply_workspace_patch(
workspace_id,
patch=patch,
)
if _enabled("workspace_disk_export"):
@server.tool()
async def workspace_disk_export(
workspace_id: str,
output_path: str,
) -> dict[str, Any]:
"""Export the raw stopped workspace rootfs image to one host path."""
return self.export_workspace_disk(workspace_id, output_path=output_path)
if _enabled("workspace_disk_list"):
@server.tool()
async def workspace_disk_list(
workspace_id: str,
path: str = "/workspace",
recursive: bool = False,
) -> dict[str, Any]:
"""Inspect one stopped workspace rootfs path without booting the guest."""
return self.list_workspace_disk(
workspace_id,
path=path,
recursive=recursive,
)
if _enabled("workspace_disk_read"):
@server.tool()
async def workspace_disk_read(
workspace_id: str,
path: str,
max_bytes: int = 65536,
) -> dict[str, Any]:
"""Read one regular file from a stopped workspace rootfs offline."""
return self.read_workspace_disk(
workspace_id,
path,
max_bytes=max_bytes,
)
if _enabled("snapshot_create"):
@server.tool()
async def snapshot_create(workspace_id: str, snapshot_name: str) -> dict[str, Any]:
"""Create one named workspace snapshot from the current `/workspace` tree."""
return self.create_snapshot(workspace_id, snapshot_name)
if _enabled("snapshot_list"):
@server.tool()
async def snapshot_list(workspace_id: str) -> dict[str, Any]:
"""List the baseline plus named snapshots for one workspace."""
return self.list_snapshots(workspace_id)
if _enabled("snapshot_delete"):
@server.tool()
async def snapshot_delete(workspace_id: str, snapshot_name: str) -> dict[str, Any]:
"""Delete one named snapshot from a workspace."""
return self.delete_snapshot(workspace_id, snapshot_name)
if _enabled("workspace_reset"):
@server.tool()
async def workspace_reset(
workspace_id: str,
snapshot: str = "baseline",
) -> dict[str, Any]:
"""Recreate a workspace and restore `/workspace` from one snapshot."""
return self.reset_workspace(workspace_id, snapshot=snapshot)
if _enabled("shell_open"):
@server.tool()
async def shell_open(
workspace_id: str,
cwd: str = "/workspace",
cols: int = 120,
rows: int = 30,
secret_env: dict[str, str] | None = None,
) -> dict[str, Any]:
"""Open a persistent interactive shell inside one workspace."""
return self.open_shell(
workspace_id,
cwd=cwd,
cols=cols,
rows=rows,
secret_env=secret_env,
)
if _enabled("shell_read"):
@server.tool()
async def shell_read(
workspace_id: str,
shell_id: str,
cursor: int = 0,
max_chars: int = 65536,
) -> dict[str, Any]:
"""Read merged PTY output from a workspace shell."""
return self.read_shell(
workspace_id,
shell_id,
cursor=cursor,
max_chars=max_chars,
)
if _enabled("shell_write"):
@server.tool()
async def shell_write(
workspace_id: str,
shell_id: str,
input: str,
append_newline: bool = True,
) -> dict[str, Any]:
"""Write text input to a persistent workspace shell."""
return self.write_shell(
workspace_id,
shell_id,
input=input,
append_newline=append_newline,
)
if _enabled("shell_signal"):
@server.tool()
async def shell_signal(
workspace_id: str,
shell_id: str,
signal_name: str = "INT",
) -> dict[str, Any]:
"""Send a signal to the shell process group."""
return self.signal_shell(
workspace_id,
shell_id,
signal_name=signal_name,
)
if _enabled("shell_close"):
@server.tool()
async def shell_close(workspace_id: str, shell_id: str) -> dict[str, Any]:
"""Close a persistent workspace shell."""
return self.close_shell(workspace_id, shell_id)
if _enabled("service_start"):
@server.tool()
async def service_start(
workspace_id: str,
service_name: str,
command: str,
cwd: str = "/workspace",
ready_file: str | None = None,
ready_tcp: str | None = None,
ready_http: str | None = None,
ready_command: str | None = None,
ready_timeout_seconds: int = 30,
ready_interval_ms: int = 500,
secret_env: dict[str, str] | None = None,
published_ports: list[dict[str, int | None]] | None = None,
) -> dict[str, Any]:
"""Start a named long-running service inside a workspace."""
readiness: dict[str, Any] | None = None
if ready_file is not None:
readiness = {"type": "file", "path": ready_file}
elif ready_tcp is not None:
readiness = {"type": "tcp", "address": ready_tcp}
elif ready_http is not None:
readiness = {"type": "http", "url": ready_http}
elif ready_command is not None:
readiness = {"type": "command", "command": ready_command}
return self.start_service(
workspace_id,
service_name,
command=command,
cwd=cwd,
readiness=readiness,
ready_timeout_seconds=ready_timeout_seconds,
ready_interval_ms=ready_interval_ms,
secret_env=secret_env,
published_ports=published_ports,
)
if _enabled("service_list"):
@server.tool()
async def service_list(workspace_id: str) -> dict[str, Any]:
"""List named services in one workspace."""
return self.list_services(workspace_id)
if _enabled("service_status"):
@server.tool()
async def service_status(workspace_id: str, service_name: str) -> dict[str, Any]:
"""Inspect one named workspace service."""
return self.status_service(workspace_id, service_name)
if _enabled("service_logs"):
@server.tool()
async def service_logs(
workspace_id: str,
service_name: str,
tail_lines: int = 200,
all: bool = False,
) -> dict[str, Any]:
"""Read persisted stdout/stderr for one workspace service."""
return self.logs_service(
workspace_id,
service_name,
tail_lines=tail_lines,
all=all,
)
if _enabled("service_stop"):
@server.tool()
async def service_stop(workspace_id: str, service_name: str) -> dict[str, Any]:
"""Stop one running service in a workspace."""
return self.stop_service(workspace_id, service_name)
if _enabled("workspace_delete"):
@server.tool()
async def workspace_delete(workspace_id: str) -> dict[str, Any]:
"""Delete a persistent workspace and its backing sandbox."""
return self.delete_workspace(workspace_id)
return server