Make workspace shell reads usable as direct chat-model input without changing the PTY or cursor model. This adds optional plain rendering and idle-window batching across CLI, SDK, and MCP while keeping raw reads backward-compatible. Implement the rendering and wait-for-idle logic in the manager layer so the existing guest/backend shell transport stays unchanged. The new helper strips ANSI and other terminal control noise, handles carriage-return overwrite and backspace, and preserves raw cursor semantics even when plain output is requested. Refresh the stable shell docs/examples to recommend --plain --wait-for-idle-ms 300, mark the 3.5.0 roadmap milestone done, and bump the package/catalog version to 3.5.0. Validation: uv lock; UV_CACHE_DIR=.uv-cache make check; UV_CACHE_DIR=.uv-cache make dist-check; real guest-backed Firecracker smoke covering shell open/write/read with ANSI plus delayed output.
116 lines
3.5 KiB
Python
116 lines
3.5 KiB
Python
"""Helpers for chat-friendly workspace shell output rendering."""
|
|
|
|
from __future__ import annotations
|
|
|
|
|
|
def _apply_csi(
    final: str,
    parameters: str,
    line: list[str],
    cursor: int,
    lines: list[str],
) -> tuple[list[str], int, list[str]]:
    """Apply the effect of one CSI sequence to the plain-text render state.

    Only the erase sequences (``K`` erase-in-line, ``J`` erase-in-display)
    change the rendered text; every other final byte leaves the state as is.

    Args:
        final: Final byte of the CSI sequence, e.g. ``"K"`` or ``"J"``.
        parameters: Text between ``ESC [`` and the final byte. An empty
            string selects the default mode ``"0"``.
        line: Characters of the line currently being built; mutated in place.
        cursor: Column of the cursor within ``line``.
        lines: Lines completed so far; mutated in place.

    Returns:
        The ``(line, cursor, lines)`` state after applying the sequence.
    """
    mode = parameters or "0"
    if final == "K":
        if mode == "0":
            # Erase from the cursor to the end of the line.
            del line[cursor:]
        elif mode == "1":
            # Blank the cells before the cursor while keeping later columns
            # in place.
            blank_count = min(cursor, len(line))
            line[:blank_count] = [" "] * blank_count
        elif mode == "2":
            line.clear()
            cursor = 0
    elif final == "J":
        if mode == "0":
            # Erase from the cursor to the end of the display. This renderer
            # never holds content below the current line, so only the tail
            # of the current line has to go. Shells emit "\r ESC[J" before
            # redrawing a prompt; ignoring it leaves stale characters behind.
            del line[cursor:]
        elif mode in {"2", "3"}:
            # Full-screen (and scrollback) clear: drop everything rendered.
            lines.clear()
            line.clear()
            cursor = 0
    return line, cursor, lines
|
|
|
|
|
|
def _consume_escape_sequence(
    text: str,
    index: int,
    line: list[str],
    cursor: int,
    lines: list[str],
) -> tuple[int, list[str], int, list[str]]:
    """Skip the escape sequence starting at ``text[index]`` (an ESC byte).

    CSI sequences are handed to ``_apply_csi`` so erase commands can update
    the render state; every other sequence is simply dropped.

    Args:
        text: Raw PTY output being rendered.
        index: Position of the ESC character that starts the sequence.
        line: Characters of the line currently being built.
        cursor: Column of the cursor within ``line``.
        lines: Lines completed so far.

    Returns:
        ``(next_index, line, cursor, lines)`` where ``next_index`` is the
        position of the first character after the sequence. An unterminated
        sequence consumes the rest of ``text``.
    """
    length = len(text)
    if index + 1 >= length:
        # Lone ESC at the very end of the input.
        return length, line, cursor, lines
    leader = text[index + 1]

    if leader == "[":
        # CSI: parameter/intermediate bytes, then a final byte in 0x40-0x7E.
        cursor_index = index + 2
        while cursor_index < length:
            char = text[cursor_index]
            if "\x40" <= char <= "\x7e":
                parameters = text[index + 2 : cursor_index]
                line, cursor, lines = _apply_csi(char, parameters, line, cursor, lines)
                return cursor_index + 1, line, cursor, lines
            cursor_index += 1
        return length, line, cursor, lines

    if leader in {"]", "P", "_", "^"}:
        # OSC / DCS / APC / PM strings run until BEL or ST (ESC \).
        cursor_index = index + 2
        while cursor_index < length:
            char = text[cursor_index]
            if char == "\x07":
                return cursor_index + 1, line, cursor, lines
            if char == "\x1b" and cursor_index + 1 < length and text[cursor_index + 1] == "\\":
                return cursor_index + 2, line, cursor, lines
            cursor_index += 1
        return length, line, cursor, lines

    if leader == "O":
        # SS3: ESC O followed by exactly one character.
        return min(index + 3, length), line, cursor, lines

    if "\x20" <= leader <= "\x2f":
        # Sequences with intermediate bytes (e.g. the charset designation
        # "ESC ( B" sent by `tput sgr0`): any number of bytes in 0x20-0x2F
        # followed by one final byte in 0x30-0x7E. Without this branch the
        # final byte would leak into the rendered text.
        cursor_index = index + 2
        while cursor_index < length and "\x20" <= text[cursor_index] <= "\x2f":
            cursor_index += 1
        if cursor_index < length and "\x30" <= text[cursor_index] <= "\x7e":
            cursor_index += 1
        return cursor_index, line, cursor, lines

    # Any other two-character escape (ESC =, ESC >, ESC 7, ESC c, ...).
    return min(index + 2, length), line, cursor, lines
|
|
|
|
|
|
def render_plain_shell_output(raw_text: str) -> str:
    """Render PTY output into chat-friendly plain text.

    Escape sequences are consumed by ``_consume_escape_sequence``, carriage
    return moves the cursor back to column 0 so later text overwrites the
    line, backspace removes the character before the cursor, and other
    control characters (except tab) are dropped. A trailing newline in the
    input is kept only when it is the last thing processed.
    """
    finished: list[str] = []
    current: list[str] = []
    column = 0
    trailing_newline = False
    position = 0
    total = len(raw_text)

    while position < total:
        char = raw_text[position]

        if char == "\x1b":
            # The helper returns the index just past the sequence.
            position, current, column, finished = _consume_escape_sequence(
                raw_text, position, current, column, finished
            )
            trailing_newline = False
            continue

        # Every remaining case consumes exactly one character.
        position += 1

        if char == "\n":
            finished.append("".join(current))
            current = []
            column = 0
            trailing_newline = True
        elif char == "\r":
            column = 0
            trailing_newline = False
        elif char == "\b":
            if column > 0:
                column -= 1
                if column < len(current):
                    del current[column]
            trailing_newline = False
        elif char == "\t" or (char >= " " and char != "\x7f"):
            # Overwrite in place after a carriage return, otherwise append.
            if column < len(current):
                current[column] = char
            else:
                current.append(char)
            column += 1
            trailing_newline = False

    # Emit the pending line; an empty one is emitted only to preserve a
    # trailing newline.
    if current or trailing_newline:
        finished.append("".join(current))
    return "\n".join(finished)
|