Revert "Add interactive edit mode with floating popup"

This reverts commit 99f07aef82.
This commit is contained in:
Thales Maciel 2026-02-26 16:16:41 -03:00
parent 99f07aef82
commit e5d709a393
10 changed files with 46 additions and 1045 deletions

View file

@ -67,10 +67,7 @@ Create `~/.config/aman/config.json` (or let `aman` create it automatically on fi
```json
{
"daemon": {
"hotkey": "Cmd+m",
"edit_hotkey": "Cmd+Shift+m"
},
"daemon": { "hotkey": "Cmd+m" },
"recording": { "input": "0" },
"stt": { "model": "base", "device": "cpu" },
"injection": {
@ -94,7 +91,6 @@ Hotkey notes:
- Use one key plus optional modifiers (for example `Cmd+m`, `Super+m`, `Ctrl+space`).
- `Super` and `Cmd` are equivalent aliases for the same modifier.
- `daemon.hotkey` and `daemon.edit_hotkey` must be different.
AI cleanup is always enabled and uses the locked local Llama-3.2-3B GGUF model
downloaded to `~/.cache/aman/models/` during daemon initialization.
@ -131,21 +127,8 @@ systemctl --user enable --now aman
- Press the hotkey once to start recording.
- Press it again to stop and run STT.
- Press `Esc` while recording to cancel without processing.
- `Esc` is only captured globally while dictation recording is active.
- Transcript contents are logged only when `-v/--verbose` is used.
Edit mode:
- Copy text to clipboard and press `daemon.edit_hotkey`.
- Aman opens an editable popup with the clipboard snapshot and immediately starts recording an instruction.
- If clipboard is empty, the popup opens with empty text so the first instruction can create content.
- Press `daemon.edit_hotkey` again to stop recording and apply the instruction.
- Repeat to iterate with more voice instructions.
- Press `Enter` to close the popup and inject the final text.
- Press `Ctrl+C` to copy final text to clipboard and close the popup (no injection).
- Press `Esc` to cancel the edit session completely.
- While edit mode is open, the normal dictation hotkey is ignored.
Wayland note:
- Running under Wayland currently exits with a message explaining that it is not supported yet.

View file

@ -1,7 +1,6 @@
{
"daemon": {
"hotkey": "Cmd+m",
"edit_hotkey": "Cmd+Shift+m"
"hotkey": "Cmd+m"
},
"recording": {
"input": ""
@ -36,5 +35,8 @@
"Kubernetes",
"PostgreSQL"
]
},
"domain_inference": {
"enabled": true
}
}

View file

@ -36,20 +36,6 @@ SYSTEM_PROMPT = (
" - transcript=\"let's ask Bob, I mean Janice, let's ask Janice\" -> {\"cleaned_text\":\"let's ask Janice\"}\n"
)
# System prompt for edit-mode requests. It tells the model that it will
# receive JSON holding the current text plus spoken editing instructions, and
# that it must answer with a single JSON object shaped like
# {"cleaned_text": "..."} and nothing else.
EDIT_SYSTEM_PROMPT = (
"You are an amanuensis editor working for a user.\n"
"You'll receive JSON with the current text and spoken editing instructions.\n"
"Rewrite the full text according to those instructions.\n\n"
"Rules:\n"
"- Apply the latest instruction while honoring prior instruction history.\n"
"- Keep unchanged portions intact unless instructions request broader changes.\n"
"- Do not invent facts or context.\n"
"- If a dictionary section exists, apply only the listed corrections.\n"
"- Keep dictionary spellings exactly as provided.\n"
"- Return ONLY valid JSON in this shape: {\"cleaned_text\": \"...\"}\n"
"- Do not wrap with markdown, tags, or extra keys.\n"
)
class LlamaProcessor:
def __init__(self, verbose: bool = False):
@ -83,33 +69,9 @@ class LlamaProcessor:
if cleaned_dictionary:
request_payload["dictionary"] = cleaned_dictionary
return self._run_prompt(SYSTEM_PROMPT, request_payload)
def process_edit(
    self,
    current_text: str,
    latest_instruction: str,
    instruction_history: list[str],
    lang: str = "en",
    *,
    dictionary_context: str = "",
) -> str:
    """Rewrite ``current_text`` according to spoken editing instructions.

    Builds the request payload (language, current text, latest instruction
    and instruction history) and runs it through ``_run_prompt`` together
    with ``EDIT_SYSTEM_PROMPT``.

    Args:
        current_text: Text to be edited.
        latest_instruction: The instruction that was just given.
        instruction_history: Instructions given so far in the session.
        lang: Language code sent as ``language`` in the payload.
        dictionary_context: Optional dictionary text; it is stripped and
            only sent (as ``dictionary``) when something is left.

    Returns:
        Whatever ``_run_prompt`` returns for this request.
    """
    payload: dict[str, Any] = dict(
        language=lang,
        current_text=current_text,
        latest_instruction=latest_instruction,
        instruction_history=instruction_history,
    )
    dictionary_text = dictionary_context.strip()
    if dictionary_text:
        payload["dictionary"] = dictionary_text
    return self._run_prompt(EDIT_SYSTEM_PROMPT, payload)
def _run_prompt(self, system_prompt: str, request_payload: dict[str, Any]) -> str:
kwargs: dict[str, Any] = {
"messages": [
{"role": "system", "content": system_prompt},
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": json.dumps(request_payload, ensure_ascii=False)},
],
"temperature": 0.0,

396
src/aman.py Normal file → Executable file
View file

@ -29,19 +29,8 @@ class State:
STT = "stt"
PROCESSING = "processing"
OUTPUTTING = "outputting"
EDIT_IDLE = "edit_idle"
EDIT_RECORDING = "edit_recording"
EDIT_STT = "edit_stt"
EDIT_PROCESSING = "edit_processing"
EDIT_STATES = {
State.EDIT_IDLE,
State.EDIT_RECORDING,
State.EDIT_STT,
State.EDIT_PROCESSING,
}
_LOCK_HANDLE = None
@ -71,22 +60,12 @@ class Daemon:
self.cfg = cfg
self.desktop = desktop
self.verbose = verbose
self.lock = threading.RLock()
self.lock = threading.Lock()
self._shutdown_requested = threading.Event()
self.state = State.IDLE
self.stream = None
self.record = None
self.timer: threading.Timer | None = None
self.edit_stream = None
self.edit_record = None
self.edit_timer: threading.Timer | None = None
self.edit_active = False
self.edit_text = ""
self.edit_instruction_history: list[str] = []
self.edit_session_token = 0
self.model = _build_whisper_model(
cfg.stt.model,
cfg.stt.device,
@ -98,18 +77,6 @@ class Daemon:
self.vocabulary = VocabularyEngine(cfg.vocabulary)
self._stt_hint_kwargs_cache: dict[str, Any] | None = None
def _arm_cancel_listener_for_recording(self):
try:
self.desktop.start_cancel_listener(lambda: self.cancel_recording())
except Exception as exc:
logging.error("failed to arm cancel listener: %s", exc)
def _disarm_cancel_listener_for_recording(self):
try:
self.desktop.stop_cancel_listener()
except Exception as exc:
logging.debug("failed to disarm cancel listener: %s", exc)
def set_state(self, state: str):
with self.lock:
prev = self.state
@ -132,9 +99,6 @@ class Daemon:
if self._shutdown_requested.is_set():
logging.info("shutdown in progress, trigger ignored")
return
if self.edit_active:
logging.info("edit session active, dictate trigger ignored")
return
if self.state == State.IDLE:
self._start_recording_locked()
return
@ -145,60 +109,10 @@ class Daemon:
if should_stop:
self.stop_recording(trigger="user")
# Handle a press of the edit hotkey.
#
# Depending on the state observed under `self.lock`, this opens a new edit
# session, starts recording an instruction in the open session, or stops the
# instruction recording that is in progress. The press is ignored (with an
# info log) during shutdown or while the daemon is busy.
# NOTE(review): indentation is not visible in this view; the comments assume
# the `action` dispatch at the bottom runs after `self.lock` is released —
# confirm against the real file.
def toggle_edit(self):
# `action` names the follow-up step that was selected; `token` is the edit
# session token captured for that step.
action = ""
token = 0
with self.lock:
if self._shutdown_requested.is_set():
logging.info("shutdown in progress, edit trigger ignored")
return
if self.edit_active:
# A session is already open: toggle instruction recording.
token = self.edit_session_token
if self.state == State.EDIT_IDLE:
action = "start_recording"
elif self.state == State.EDIT_RECORDING:
action = "stop_recording"
else:
logging.info("edit session busy (%s), trigger ignored", self.state)
return
else:
# No session yet: one is only opened from the IDLE state.
if self.state != State.IDLE:
logging.info("busy (%s), edit trigger ignored", self.state)
return
# Mark the session active, bump the token and reset per-session data.
self.edit_active = True
self.edit_session_token += 1
token = self.edit_session_token
self.edit_instruction_history = []
self.edit_text = ""
self.set_state(State.EDIT_IDLE)
action = "open_session"
# Carry out the step chosen above.
if action == "stop_recording":
self.stop_edit_recording(trigger="user")
return
if action == "start_recording":
self._start_edit_recording(token=token, trigger="user")
return
if action == "open_session":
self._open_edit_session(token)
def handle_cancel(self):
    """Route a cancel request to whichever activity is in progress.

    The edit-session flag and the daemon state are read together under the
    lock. An active edit session is cancelled; otherwise an in-progress
    dictation recording is cancelled; otherwise nothing happens.
    """
    with self.lock:
        snapshot = (self.edit_active, self.state)
    in_edit_session, current_state = snapshot
    if in_edit_session:
        self.cancel_edit_session()
    elif current_state == State.RECORDING:
        self.cancel_recording()
def _start_recording_locked(self):
if self.state != State.IDLE:
logging.info("busy (%s), trigger ignored", self.state)
return
if self.edit_active:
logging.info("edit session active, dictate trigger ignored")
return
try:
stream, record = start_audio_recording(self.cfg.recording.input)
except Exception as exc:
@ -206,8 +120,9 @@ class Daemon:
return
self.stream = stream
self.record = record
self.set_state(State.RECORDING)
self._arm_cancel_listener_for_recording()
prev = self.state
self.state = State.RECORDING
logging.debug("state: %s -> %s", prev, self.state)
logging.info("recording started")
if self.timer:
self.timer.cancel()
@ -235,12 +150,13 @@ class Daemon:
if self.timer:
self.timer.cancel()
self.timer = None
self._disarm_cancel_listener_for_recording()
self.set_state(State.STT)
prev = self.state
self.state = State.STT
logging.debug("state: %s -> %s", prev, self.state)
if stream is None or record is None:
logging.warning("recording resources are unavailable during stop")
self.set_state(State.IDLE)
self.state = State.IDLE
return None
return stream, record
@ -338,292 +254,8 @@ class Daemon:
return
self.stop_recording(trigger="cancel", process_audio=False)
# Open the edit popup for session `token`, seeded with the clipboard text, and
# try to start recording the first instruction straight away.
# NOTE(review): indentation is not visible in this view, so it is unclear how
# much of the body runs while `self.lock` is held — confirm in the real file.
def _open_edit_session(self, token: int):
initial_text = ""
# A clipboard read failure is logged; the popup then starts with empty text.
try:
initial_text = self.desktop.read_clipboard_text() or ""
except Exception as exc:
logging.error("failed reading clipboard for edit session: %s", exc)
with self.lock:
# Bail out if this session is no longer the active one.
if not self._edit_session_is_active_locked(token):
return
self.edit_text = initial_text
try:
# Hand the popup the submit/copy/cancel entry points of this daemon.
self.desktop.open_edit_popup(
initial_text,
on_submit=self.finalize_edit_session_inject,
on_copy=self.finalize_edit_session_copy,
on_cancel=self.cancel_edit_session,
)
self._safe_set_edit_popup_status("Recording instruction...")
except Exception as exc:
# The popup could not be opened: close the session without touching a popup.
logging.error("failed opening edit popup: %s", exc)
self._close_edit_session(close_popup=False)
return
# If recording did not start, show the idle hint instead.
if not self._start_edit_recording(token=token, trigger="open"):
self._safe_set_edit_popup_status("Ready. Press edit hotkey to record.")
# Start recording a spoken edit instruction for session `token`.
#
# Returns True when recording started. Returns False when the session is no
# longer active, the daemon is not in EDIT_IDLE, or the audio recording could
# not be started. `trigger` is only used in the log message.
# NOTE(review): indentation is not visible here; which statements run under
# `self.lock` should be confirmed against the real file.
def _start_edit_recording(self, *, token: int, trigger: str) -> bool:
with self.lock:
if not self._edit_session_is_active_locked(token):
return False
if self.state != State.EDIT_IDLE:
logging.info("edit session busy (%s), start ignored", self.state)
return False
try:
stream, record = start_audio_recording(self.cfg.recording.input)
except Exception as exc:
logging.error("edit record start failed: %s", exc)
return False
self.edit_stream = stream
self.edit_record = record
# Replace any previous timer with a fresh one that stops the recording
# after RECORD_TIMEOUT_SEC via _timeout_edit_stop.
if self.edit_timer:
self.edit_timer.cancel()
self.edit_timer = threading.Timer(RECORD_TIMEOUT_SEC, self._timeout_edit_stop)
self.edit_timer.daemon = True
self.edit_timer.start()
self.set_state(State.EDIT_RECORDING)
self._safe_set_edit_popup_status("Recording instruction...")
logging.info("edit recording started (%s)", trigger)
return True
def _timeout_edit_stop(self):
self.stop_edit_recording(trigger="timeout")
# Stop the instruction recording of the active edit session.
#
# Does nothing unless an edit session is active and the state is
# EDIT_RECORDING. Otherwise it detaches the stream/record pair, cancels the
# edit timer, moves to EDIT_STT and hands the audio to _edit_stop_and_process
# on a daemon thread. `trigger` and `process_audio` are forwarded to that worker.
# NOTE(review): indentation is not visible here; the extent of the first
# `with self.lock` block should be confirmed against the real file.
def stop_edit_recording(self, *, trigger: str = "user", process_audio: bool = True):
payload = None
token = 0
with self.lock:
if not self.edit_active or self.state != State.EDIT_RECORDING:
return
# Take ownership of the recording resources and remember the session token.
payload = (self.edit_stream, self.edit_record)
token = self.edit_session_token
self.edit_stream = None
self.edit_record = None
if self.edit_timer:
self.edit_timer.cancel()
self.edit_timer = None
self.set_state(State.EDIT_STT)
self._safe_set_edit_popup_status("Transcribing instruction...")
stream, record = payload
if stream is None or record is None:
# Nothing to stop: return the session to EDIT_IDLE if it is still the same one.
logging.warning("edit recording resources are unavailable during stop")
with self.lock:
if self._edit_session_is_active_locked(token):
self.set_state(State.EDIT_IDLE)
self._safe_set_edit_popup_status("Ready. Press edit hotkey to record.")
return
threading.Thread(
target=self._edit_stop_and_process,
args=(stream, record, token, trigger, process_audio),
daemon=True,
).start()
# Worker for one recorded edit instruction: stop the audio recording,
# transcribe it, apply it to the current text and publish the result to the
# edit popup.
#
# Every early exit below puts the session back into EDIT_IDLE (only if the
# session identified by `token` is still active) and sets a popup status.
# `trigger` is only used in the first log message.
# NOTE(review): indentation is not visible in this view; whether each status
# update sits inside the preceding `if`/`with` should be confirmed against the
# real file.
def _edit_stop_and_process(
self,
stream: Any,
record: Any,
token: int,
trigger: str,
process_audio: bool,
):
logging.info("stopping edit recording (%s)", trigger)
# Step 1: stop the recording and obtain the captured audio.
try:
audio = stop_audio_recording(stream, record)
except Exception as exc:
logging.error("edit record stop failed: %s", exc)
with self.lock:
if self._edit_session_is_active_locked(token):
self.set_state(State.EDIT_IDLE)
self._safe_set_edit_popup_status("Failed to stop recording.")
return
# Step 2: skip processing when asked to, or when shutting down.
if not process_audio or self._shutdown_requested.is_set():
with self.lock:
if self._edit_session_is_active_locked(token):
self.set_state(State.EDIT_IDLE)
self._safe_set_edit_popup_status("Ready. Press edit hotkey to record.")
return
if audio.size == 0:
logging.error("no audio captured for edit instruction")
with self.lock:
if self._edit_session_is_active_locked(token):
self.set_state(State.EDIT_IDLE)
self._safe_set_edit_popup_status("No audio captured. Record again.")
return
# Step 3: transcribe the instruction.
try:
instruction = self._transcribe(audio).strip()
except Exception as exc:
logging.error("edit stt failed: %s", exc)
with self.lock:
if self._edit_session_is_active_locked(token):
self.set_state(State.EDIT_IDLE)
self._safe_set_edit_popup_status("STT failed. Record again.")
return
if not instruction:
with self.lock:
if self._edit_session_is_active_locked(token):
self.set_state(State.EDIT_IDLE)
self._safe_set_edit_popup_status("No instruction heard. Record again.")
return
# The instruction text itself is only logged when self.log_transcript is set;
# otherwise just its length.
if self.log_transcript:
logging.debug("edit instruction: %s", instruction)
else:
logging.info("edit instruction length: %d", len(instruction))
# Step 4: record the instruction in the session history and take a copy of
# the history for the AI call.
with self.lock:
if not self._edit_session_is_active_locked(token):
return
self.edit_instruction_history.append(instruction)
instruction_history = list(self.edit_instruction_history)
self.set_state(State.EDIT_PROCESSING)
self._safe_set_edit_popup_status("Applying instruction...")
current_text = self._current_edit_text()
# Default to the unchanged text; it is replaced only by a non-blank AI result.
updated_text = current_text
try:
ai_text = self._get_ai_processor().process_edit(
current_text,
instruction,
instruction_history,
lang=STT_LANGUAGE,
dictionary_context=self.vocabulary.build_ai_dictionary_context(),
)
if ai_text and ai_text.strip():
updated_text = ai_text
except Exception as exc:
logging.error("edit process failed: %s", exc)
updated_text = self.vocabulary.apply_deterministic_replacements(updated_text).strip()
# Step 5: publish the result, unless the session ended while processing.
with self.lock:
if not self._edit_session_is_active_locked(token):
return
self.edit_text = updated_text
self.set_state(State.EDIT_IDLE)
self._safe_set_edit_popup_text(updated_text)
self._safe_set_edit_popup_status("Ready. Press edit hotkey to record.")
def _current_edit_text(self) -> str:
try:
text = self.desktop.get_edit_popup_text()
except Exception:
with self.lock:
return self.edit_text
with self.lock:
self.edit_text = text
return text
def finalize_edit_session_inject(self):
    """Run ``_finalize_edit_session_inject_worker`` on a daemon thread."""
    worker = threading.Thread(
        target=self._finalize_edit_session_inject_worker,
        daemon=True,
    )
    worker.start()
def _finalize_edit_session_inject_worker(self):
text = self._current_edit_text()
self._close_edit_session(close_popup=True)
if self._shutdown_requested.is_set():
return
try:
self.desktop.restore_previous_focus()
except Exception as exc:
logging.warning("could not restore previous focus: %s", exc)
try:
self.set_state(State.OUTPUTTING)
self.desktop.inject_text(
text,
self.cfg.injection.backend,
remove_transcription_from_clipboard=(
self.cfg.injection.remove_transcription_from_clipboard
),
)
except Exception as exc:
logging.error("edit output failed: %s", exc)
finally:
self.set_state(State.IDLE)
def finalize_edit_session_copy(self):
    """Run ``_finalize_edit_session_copy_worker`` on a daemon thread."""
    worker = threading.Thread(
        target=self._finalize_edit_session_copy_worker,
        daemon=True,
    )
    worker.start()
def _finalize_edit_session_copy_worker(self):
text = self._current_edit_text()
self._close_edit_session(close_popup=True)
try:
self.desktop.write_clipboard_text(text)
except Exception as exc:
logging.error("failed to copy edited text to clipboard: %s", exc)
def cancel_edit_session(self):
    """Run ``_cancel_edit_session_worker`` on a daemon thread."""
    worker = threading.Thread(
        target=self._cancel_edit_session_worker,
        daemon=True,
    )
    worker.start()
def _cancel_edit_session_worker(self):
self._close_edit_session(close_popup=True)
def _close_edit_session(self, *, close_popup: bool):
    """Tear down the current edit session and release its resources.

    Under the lock this detaches the edit stream/record pair, cancels the
    edit timer, marks the session inactive, bumps the session token (so a
    token captured earlier no longer matches), clears the instruction
    history and text, and returns to IDLE when the daemon is in one of the
    edit states. Afterwards the popup is optionally closed and a recording
    that was still attached is stopped.

    Args:
        close_popup: When true, ask the desktop adapter to close the popup.

    Both cleanup steps are best effort: failures are logged at debug level
    and never propagate.
    """
    with self.lock:
        stream, record = self.edit_stream, self.edit_record
        self.edit_stream = None
        self.edit_record = None
        if self.edit_timer:
            self.edit_timer.cancel()
        self.edit_timer = None
        self.edit_active = False
        self.edit_session_token += 1
        self.edit_instruction_history = []
        self.edit_text = ""
        if self.state in EDIT_STATES:
            self.set_state(State.IDLE)
    if close_popup:
        try:
            self.desktop.close_edit_popup()
        except Exception as exc:
            logging.debug("failed closing edit popup: %s", exc)
    if stream is not None and record is not None:
        try:
            stop_audio_recording(stream, record)
        except Exception as exc:
            # Still best effort, but leave a trace instead of hiding the
            # failure completely.
            logging.debug("failed stopping edit recording during close: %s", exc)
def _edit_session_is_active_locked(self, token: int) -> bool:
return self.edit_active and self.edit_session_token == token
# Show `status` in the edit popup when an edit session is active.
# Adapter failures are logged at debug level and suppressed.
# NOTE(review): indentation is not visible here; whether the adapter call runs
# while `self.lock` is held should be confirmed against the real file.
def _safe_set_edit_popup_status(self, status: str):
with self.lock:
if not self.edit_active:
return
try:
self.desktop.set_edit_popup_status(status)
except Exception as exc:
logging.debug("failed setting edit popup status: %s", exc)
# Replace the text shown in the edit popup when an edit session is active.
# Adapter failures are logged at debug level and suppressed.
# NOTE(review): indentation is not visible here; whether the adapter call runs
# while `self.lock` is held should be confirmed against the real file.
def _safe_set_edit_popup_text(self, text: str):
with self.lock:
if not self.edit_active:
return
try:
self.desktop.set_edit_popup_text(text)
except Exception as exc:
logging.debug("failed setting edit popup text: %s", exc)
def shutdown(self, timeout: float = 5.0) -> bool:
self.request_shutdown()
self._disarm_cancel_listener_for_recording()
self._close_edit_session(close_popup=True)
self.stop_recording(trigger="shutdown", process_audio=False)
return self.wait_for_idle(timeout)
@ -735,7 +367,6 @@ def main():
_LOCK_HANDLE = _lock_single_instance()
logging.info("hotkey: %s", cfg.daemon.hotkey)
logging.info("edit hotkey: %s", cfg.daemon.edit_hotkey)
logging.info(
"config (%s):\n%s",
args.config or str(Path.home() / ".config" / "aman" / "config.json"),
@ -769,16 +400,9 @@ def main():
try:
desktop.start_hotkey_listener(
cfg.daemon.hotkey,
lambda: logging.info("dictate hotkey pressed (dry-run)")
if args.dry_run
else daemon.toggle(),
)
desktop.start_hotkey_listener(
cfg.daemon.edit_hotkey,
lambda: logging.info("edit hotkey pressed (dry-run)")
if args.dry_run
else daemon.toggle_edit(),
lambda: logging.info("hotkey pressed (dry-run)") if args.dry_run else daemon.toggle(),
)
desktop.start_cancel_listener(lambda: daemon.cancel_recording())
except Exception as exc:
logging.error("hotkey setup failed: %s", exc)
raise SystemExit(1)

View file

@ -10,7 +10,6 @@ from hotkey import split_hotkey
DEFAULT_HOTKEY = "Cmd+m"
DEFAULT_EDIT_HOTKEY = "Cmd+Shift+m"
DEFAULT_STT_MODEL = "base"
DEFAULT_STT_DEVICE = "cpu"
DEFAULT_INJECTION_BACKEND = "clipboard"
@ -21,7 +20,6 @@ WILDCARD_CHARS = set("*?[]{}")
@dataclass
class DaemonConfig:
hotkey: str = DEFAULT_HOTKEY
edit_hotkey: str = DEFAULT_EDIT_HOTKEY
@dataclass
@ -95,19 +93,6 @@ def validate(cfg: Config) -> None:
split_hotkey(hotkey)
except ValueError as exc:
raise ValueError(f"daemon.hotkey is invalid: {exc}") from exc
cfg.daemon.hotkey = hotkey
edit_hotkey = cfg.daemon.edit_hotkey.strip()
if not edit_hotkey:
raise ValueError("daemon.edit_hotkey cannot be empty")
try:
split_hotkey(edit_hotkey)
except ValueError as exc:
raise ValueError(f"daemon.edit_hotkey is invalid: {exc}") from exc
cfg.daemon.edit_hotkey = edit_hotkey
if hotkey.casefold() == edit_hotkey.casefold():
raise ValueError("daemon.hotkey and daemon.edit_hotkey must be different")
if isinstance(cfg.recording.input, bool):
raise ValueError("recording.input cannot be boolean")
@ -153,8 +138,6 @@ def _from_dict(data: dict[str, Any], cfg: Config) -> Config:
if "hotkey" in daemon:
cfg.daemon.hotkey = _as_nonempty_str(daemon["hotkey"], "daemon.hotkey")
if "edit_hotkey" in daemon:
cfg.daemon.edit_hotkey = _as_nonempty_str(daemon["edit_hotkey"], "daemon.edit_hotkey")
if "input" in recording:
cfg.recording.input = _as_recording_input(recording["input"])
if "model" in stt:

View file

@ -11,9 +11,6 @@ class DesktopAdapter(Protocol):
def start_cancel_listener(self, callback: Callable[[], None]) -> None:
raise NotImplementedError
def stop_cancel_listener(self) -> None:
    """Stop the cancel listener; this stub provides no implementation."""
    raise NotImplementedError()
def inject_text(
self,
text: str,
@ -23,37 +20,6 @@ class DesktopAdapter(Protocol):
) -> None:
raise NotImplementedError
def read_clipboard_text(self) -> str | None:
raise NotImplementedError
def write_clipboard_text(self, text: str) -> None:
raise NotImplementedError
def open_edit_popup(
self,
initial_text: str,
*,
on_submit: Callable[[], None],
on_copy: Callable[[], None],
on_cancel: Callable[[], None],
) -> None:
raise NotImplementedError
def close_edit_popup(self) -> None:
raise NotImplementedError
def get_edit_popup_text(self) -> str:
raise NotImplementedError
def set_edit_popup_text(self, text: str) -> None:
raise NotImplementedError
def set_edit_popup_status(self, status: str) -> None:
raise NotImplementedError
def restore_previous_focus(self) -> bool:
raise NotImplementedError
def run_tray(self, state_getter: Callable[[], str], on_quit: Callable[[], None]) -> None:
raise NotImplementedError

View file

@ -10,9 +10,6 @@ class WaylandAdapter:
def start_cancel_listener(self, _callback: Callable[[], None]) -> None:
raise SystemExit("Wayland hotkeys are not supported yet.")
def stop_cancel_listener(self) -> None:
    """Always exit: hotkeys are not supported by this adapter."""
    message = "Wayland hotkeys are not supported yet."
    raise SystemExit(message)
def inject_text(
self,
_text: str,
@ -23,38 +20,6 @@ class WaylandAdapter:
_ = remove_transcription_from_clipboard
raise SystemExit("Wayland text injection is not supported yet.")
def read_clipboard_text(self) -> str | None:
raise SystemExit("Wayland clipboard access is not supported yet.")
def write_clipboard_text(self, _text: str) -> None:
raise SystemExit("Wayland clipboard access is not supported yet.")
def open_edit_popup(
self,
_initial_text: str,
*,
on_submit: Callable[[], None],
on_copy: Callable[[], None],
on_cancel: Callable[[], None],
) -> None:
_ = (on_submit, on_copy, on_cancel)
raise SystemExit("Wayland edit popup is not supported yet.")
def close_edit_popup(self) -> None:
raise SystemExit("Wayland edit popup is not supported yet.")
def get_edit_popup_text(self) -> str:
raise SystemExit("Wayland edit popup is not supported yet.")
def set_edit_popup_text(self, _text: str) -> None:
raise SystemExit("Wayland edit popup is not supported yet.")
def set_edit_popup_status(self, _status: str) -> None:
raise SystemExit("Wayland edit popup is not supported yet.")
def restore_previous_focus(self) -> bool:
raise SystemExit("Wayland focus restoration is not supported yet.")
def run_tray(self, _state_getter: Callable[[], str], _on_quit: Callable[[], None]) -> None:
raise SystemExit("Wayland tray support is not available yet.")

View file

@ -4,7 +4,7 @@ import logging
import threading
import time
import warnings
from typing import Any, Callable, Iterable
from typing import Callable, Iterable
import gi
from Xlib import X, XK, display
@ -42,15 +42,6 @@ class X11Adapter:
self.indicator = None
self.status_icon = None
self.menu = None
self._edit_window = None
self._edit_text_view = None
self._edit_text_buffer = None
self._edit_status_label = None
self._edit_callbacks: dict[str, Callable[[], None]] = {}
self._edit_previous_focus_window_id: int | None = None
self._cancel_listener_lock = threading.Lock()
self._cancel_listener_stop_event: threading.Event | None = None
self._cancel_listener_callback: Callable[[], None] | None = None
if AppIndicator3 is not None:
self.indicator = AppIndicator3.Indicator.new(
"aman",
@ -81,36 +72,9 @@ class X11Adapter:
def start_cancel_listener(self, callback: Callable[[], None]) -> None:
mods, keysym = self._parse_hotkey("Escape")
with self._cancel_listener_lock:
if self._cancel_listener_stop_event is not None:
self._cancel_listener_callback = callback
return
self._cancel_listener_callback = callback
stop_event = threading.Event()
self._cancel_listener_stop_event = stop_event
thread = threading.Thread(
target=self._listen,
args=(mods, keysym, self._dispatch_cancel_listener, stop_event),
daemon=True,
)
thread = threading.Thread(target=self._listen, args=(mods, keysym, callback), daemon=True)
thread.start()
def stop_cancel_listener(self) -> None:
    """Signal the cancel listener to stop and forget its callback.

    The stop event and callback are cleared under the listener lock; the
    event that was registered (if any) is then set.
    """
    with self._cancel_listener_lock:
        pending_stop = self._cancel_listener_stop_event
        self._cancel_listener_stop_event = None
        self._cancel_listener_callback = None
    if pending_stop is None:
        return
    pending_stop.set()
def _dispatch_cancel_listener(self) -> None:
callback = None
with self._cancel_listener_lock:
callback = self._cancel_listener_callback
if callback is not None:
callback()
def inject_text(
self,
text: str,
@ -122,240 +86,24 @@ class X11Adapter:
if backend == "clipboard":
previous_clipboard = None
if remove_transcription_from_clipboard:
previous_clipboard = self.read_clipboard_text()
self.write_clipboard_text(text)
previous_clipboard = self._read_clipboard_text()
self._write_clipboard(text)
self._paste_clipboard()
if remove_transcription_from_clipboard:
time.sleep(CLIPBOARD_RESTORE_DELAY_SEC)
self._set_clipboard_text(previous_clipboard or "")
self._restore_clipboard_text(previous_clipboard)
return
if backend == "injection":
self._type_text(text)
return
raise ValueError(f"unknown injection backend: {backend}")
def read_clipboard_text(self) -> str | None:
return self._run_on_ui_thread(self._read_clipboard_text_ui)
def write_clipboard_text(self, text: str) -> None:
self._run_on_ui_thread(lambda: self._set_clipboard_text(text))
def open_edit_popup(
self,
initial_text: str,
*,
on_submit: Callable[[], None],
on_copy: Callable[[], None],
on_cancel: Callable[[], None],
) -> None:
self._run_on_ui_thread(
lambda: self._open_edit_popup_ui(
initial_text,
on_submit=on_submit,
on_copy=on_copy,
on_cancel=on_cancel,
)
)
def close_edit_popup(self) -> None:
self._run_on_ui_thread(self._close_edit_popup_ui)
def get_edit_popup_text(self) -> str:
return self._run_on_ui_thread(self._get_edit_popup_text_ui)
def set_edit_popup_text(self, text: str) -> None:
self._run_on_ui_thread(lambda: self._set_edit_popup_text_ui(text))
def set_edit_popup_status(self, status: str) -> None:
self._run_on_ui_thread(lambda: self._set_edit_popup_status_ui(status))
def restore_previous_focus(self) -> bool:
    """Give input focus back to the window recorded when the popup opened.

    Returns:
        ``True`` when focus was set on the recorded window; ``False`` when
        no window id was recorded or the X request failed (the failure is
        logged as a warning).
    """
    window_id = self._edit_previous_focus_window_id
    if window_id is None:
        return False
    dpy = None
    try:
        dpy = display.Display()
        window = dpy.create_resource_object("window", window_id)
        window.set_input_focus(X.RevertToParent, X.CurrentTime)
        dpy.sync()
        return True
    except Exception as exc:
        logging.warning("focus restore failed: %s", exc)
        return False
    finally:
        # Close the connection on every path; previously it stayed open
        # whenever one of the calls above raised.
        if dpy is not None:
            try:
                dpy.close()
            except Exception as exc:
                logging.debug("closing display after focus restore failed: %s", exc)
def _open_edit_popup_ui(
self,
initial_text: str,
*,
on_submit: Callable[[], None],
on_copy: Callable[[], None],
on_cancel: Callable[[], None],
) -> None:
if self._edit_window is not None:
raise RuntimeError("edit popup is already open")
self._edit_previous_focus_window_id = self._current_focus_window_id()
self._edit_callbacks = {
"submit": on_submit,
"copy": on_copy,
"cancel": on_cancel,
}
window = Gtk.Window(type=Gtk.WindowType.TOPLEVEL)
window.set_title("Aman Editor")
window.set_default_size(900, 520)
window.set_position(Gtk.WindowPosition.CENTER)
window.set_type_hint(Gdk.WindowTypeHint.UTILITY)
window.set_skip_taskbar_hint(True)
window.set_skip_pager_hint(True)
window.set_keep_above(True)
window.connect("key-press-event", self._on_edit_key_press)
window.connect("delete-event", self._on_edit_delete_event)
container = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=8)
container.set_border_width(12)
window.add(container)
status_label = Gtk.Label(label="Recording...")
status_label.set_xalign(0.0)
container.pack_start(status_label, False, False, 0)
scrolled = Gtk.ScrolledWindow()
scrolled.set_hexpand(True)
scrolled.set_vexpand(True)
container.pack_start(scrolled, True, True, 0)
text_view = Gtk.TextView()
text_view.set_wrap_mode(Gtk.WrapMode.WORD_CHAR)
text_view.connect("key-press-event", self._on_edit_key_press)
scrolled.add(text_view)
text_buffer = text_view.get_buffer()
text_buffer.set_text(initial_text or "")
self._edit_window = window
self._edit_text_view = text_view
self._edit_text_buffer = text_buffer
self._edit_status_label = status_label
window.show_all()
text_view.grab_focus()
window.present()
def _on_edit_delete_event(self, _widget, _event):
self._invoke_edit_callback("cancel")
return True
def _on_edit_key_press(self, _widget, event):
    """Map popup key presses to the cancel/copy/submit callbacks.

    Escape cancels, Ctrl+C (either case) copies, Return or keypad Enter
    submits. Returns ``True`` when one of these was handled and ``False``
    for any other key.
    """
    keyval = event.keyval
    ctrl_held = bool(event.state & Gdk.ModifierType.CONTROL_MASK)
    if keyval == Gdk.KEY_Escape:
        action = "cancel"
    elif ctrl_held and keyval in (Gdk.KEY_c, Gdk.KEY_C):
        action = "copy"
    elif keyval in (Gdk.KEY_Return, Gdk.KEY_KP_Enter):
        action = "submit"
    else:
        return False
    self._invoke_edit_callback(action)
    return True
def _invoke_edit_callback(self, name: str) -> None:
callback = self._edit_callbacks.get(name)
if callback is None:
return
try:
callback()
except Exception as exc:
logging.error("edit popup callback failed (%s): %s", name, exc)
def _close_edit_popup_ui(self) -> None:
if self._edit_window is not None:
try:
self._edit_window.destroy()
except Exception:
pass
self._edit_window = None
self._edit_text_view = None
self._edit_text_buffer = None
self._edit_status_label = None
self._edit_callbacks = {}
def _get_edit_popup_text_ui(self) -> str:
buffer = self._edit_text_buffer
if buffer is None:
return ""
start = buffer.get_start_iter()
end = buffer.get_end_iter()
return buffer.get_text(start, end, True)
def _set_edit_popup_text_ui(self, text: str) -> None:
buffer = self._edit_text_buffer
if buffer is None:
return
buffer.set_text(text or "")
def _set_edit_popup_status_ui(self, status: str) -> None:
label = self._edit_status_label
if label is None:
return
label.set_text(status or "")
def _current_focus_window_id(self) -> int | None:
    """Return the id of the window that currently has input focus.

    Returns:
        The positive integer ``id`` of the focused window, or ``None`` when
        the focus object has no such id or the X query fails.
    """
    dpy = None
    try:
        dpy = display.Display()
        focused = dpy.get_input_focus().focus
        window_id = getattr(focused, "id", None)
    except Exception:
        return None
    finally:
        # Close the connection on every path; previously it stayed open when
        # the focus query raised.
        if dpy is not None:
            try:
                dpy.close()
            except Exception as exc:
                logging.debug("closing display after focus query failed: %s", exc)
    if isinstance(window_id, int) and window_id > 0:
        return window_id
    return None
def _run_on_ui_thread(self, fn: Callable[[], Any]) -> Any:
if threading.current_thread() is threading.main_thread():
return fn()
done = threading.Event()
result: dict[str, Any] = {}
def runner():
try:
result["value"] = fn()
except Exception as exc:
result["error"] = exc
finally:
done.set()
return False
GLib.idle_add(runner)
done.wait()
error = result.get("error")
if error is not None:
raise error
return result.get("value")
def _read_clipboard_text_ui(self) -> str | None:
def _read_clipboard_text(self) -> str | None:
Gtk.init([])
clipboard = Gtk.Clipboard.get(Gdk.SELECTION_CLIPBOARD)
text = clipboard.wait_for_text()
return str(text) if text is not None else None
def _set_clipboard_text(self, text: str) -> None:
    """Put ``text`` on the CLIPBOARD selection and store it.

    Pending GTK events are processed before returning.
    """
    Gtk.init([])
    board = Gtk.Clipboard.get(Gdk.SELECTION_CLIPBOARD)
    board.set_text(text, -1)
    board.store()
    while True:
        if not Gtk.events_pending():
            break
        Gtk.main_iteration()
def run_tray(self, state_getter: Callable[[], str], on_quit: Callable[[], None]) -> None:
self.menu = Gtk.Menu()
quit_item = Gtk.MenuItem(label="Quit")
@ -378,14 +126,7 @@ class X11Adapter:
finally:
self.request_quit()
def _listen(
self,
mods: int,
keysym: int,
callback: Callable[[], None],
stop_event: threading.Event | None = None,
) -> None:
local_stop = stop_event or threading.Event()
def _listen(self, mods: int, keysym: int, callback: Callable[[], None]) -> None:
disp = None
root = None
keycode = None
@ -393,18 +134,14 @@ class X11Adapter:
disp = display.Display()
root = disp.screen().root
keycode = self._grab_hotkey(disp, root, mods, keysym)
while not local_stop.is_set():
if disp.pending_events() == 0:
time.sleep(0.05)
continue
while True:
ev = disp.next_event()
if ev.type == X.KeyPress and ev.detail == keycode:
state = ev.state & ~(X.LockMask | X.Mod2Mask)
if state == mods:
callback()
except Exception as exc:
if not local_stop.is_set():
logging.error("hotkey listener stopped: %s", exc)
logging.error("hotkey listener stopped: %s", exc)
finally:
if root is not None and keycode is not None and disp is not None:
try:
@ -412,11 +149,6 @@ class X11Adapter:
disp.sync()
except Exception:
pass
if disp is not None:
try:
disp.close()
except Exception:
pass
def _parse_hotkey(self, hotkey: str):
mods = 0
@ -463,6 +195,22 @@ class X11Adapter:
disp.sync()
return keycode
def _write_clipboard(self, text: str) -> None:
Gtk.init([])
clipboard = Gtk.Clipboard.get(Gdk.SELECTION_CLIPBOARD)
clipboard.set_text(text, -1)
clipboard.store()
while Gtk.events_pending():
Gtk.main_iteration()
def _restore_clipboard_text(self, text: str | None) -> None:
Gtk.init([])
clipboard = Gtk.Clipboard.get(Gdk.SELECTION_CLIPBOARD)
clipboard.set_text(text or "", -1)
clipboard.store()
while Gtk.events_pending():
Gtk.main_iteration()
def _paste_clipboard(self) -> None:
dpy = display.Display()
self._send_combo(dpy, ["Control_L", "Shift_L", "v"])
@ -513,11 +261,11 @@ class X11Adapter:
return (keysym if keysym != 0 else None, False)
def _icon_path(self, state: str) -> str:
if state in ("recording", "edit_recording"):
if state == "recording":
return str(ASSETS_DIR / "recording.png")
if state in ("stt", "edit_stt"):
if state == "stt":
return str(ASSETS_DIR / "stt.png")
if state in ("processing", "outputting", "edit_processing"):
if state == "processing":
return str(ASSETS_DIR / "processing.png")
return str(ASSETS_DIR / "idle.png")
@ -528,16 +276,6 @@ class X11Adapter:
return "STT"
if state == "processing":
return "AI Processing"
if state == "outputting":
return "Outputting"
if state == "edit_recording":
return "Editing: Recording"
if state == "edit_stt":
return "Editing: STT"
if state == "edit_processing":
return "Editing: Processing"
if state == "edit_idle":
return "Editing"
return "Idle"
def _update_tray(self, state_getter: Callable[[], str]):

View file

@ -1,7 +1,6 @@
import os
import sys
import tempfile
import time
import unittest
from pathlib import Path
from unittest.mock import patch
@ -19,15 +18,6 @@ class FakeDesktop:
def __init__(self):
self.inject_calls = []
self.quit_calls = 0
self.clipboard_text = ""
self.popup_open = False
self.popup_text = ""
self.popup_statuses = []
self.popup_callbacks = {}
self.popup_close_calls = 0
self.focus_restore_calls = 0
self.cancel_listener_active = False
self.cancel_listener_callback = None
def inject_text(
self,
@ -38,53 +28,6 @@ class FakeDesktop:
) -> None:
self.inject_calls.append((text, backend, remove_transcription_from_clipboard))
def read_clipboard_text(self) -> str | None:
return self.clipboard_text
def write_clipboard_text(self, text: str) -> None:
self.clipboard_text = text
def open_edit_popup(
self,
initial_text: str,
*,
on_submit,
on_copy,
on_cancel,
) -> None:
self.popup_open = True
self.popup_text = initial_text
self.popup_callbacks = {
"submit": on_submit,
"copy": on_copy,
"cancel": on_cancel,
}
def close_edit_popup(self) -> None:
self.popup_open = False
self.popup_close_calls += 1
def get_edit_popup_text(self) -> str:
return self.popup_text
def set_edit_popup_text(self, text: str) -> None:
self.popup_text = text
def set_edit_popup_status(self, status: str) -> None:
self.popup_statuses.append(status)
def restore_previous_focus(self) -> bool:
self.focus_restore_calls += 1
return True
def start_cancel_listener(self, callback) -> None:
self.cancel_listener_active = True
self.cancel_listener_callback = callback
def stop_cancel_listener(self) -> None:
self.cancel_listener_active = False
self.cancel_listener_callback = None
def request_quit(self) -> None:
    """Record one quit request by incrementing ``self.quit_calls``."""
    self.quit_calls = self.quit_calls + 1
@ -130,30 +73,9 @@ class FakeHintModel:
class FakeAIProcessor:
def __init__(self):
self.edit_calls = []
def process(self, text, lang="en", **_kwargs):
    """Return ``text`` unchanged; ``lang`` and extra keyword arguments are ignored."""
    unchanged = text
    return unchanged
def process_edit(
self,
current_text,
latest_instruction,
instruction_history,
lang="en",
**_kwargs,
):
self.edit_calls.append(
{
"current_text": current_text,
"latest_instruction": latest_instruction,
"instruction_history": list(instruction_history),
"lang": lang,
}
)
return f"{current_text} [{latest_instruction}]"
class FakeAudio:
def __init__(self, size: int):
@ -179,14 +101,6 @@ class DaemonTests(unittest.TestCase):
):
return aman.Daemon(active_cfg, desktop, verbose=verbose)
def _wait_until(self, predicate, timeout: float = 1.0):
end = time.time() + timeout
while time.time() < end:
if predicate():
return True
time.sleep(0.01)
return predicate()
@patch("aman.stop_audio_recording", return_value=FakeAudio(8))
@patch("aman.start_audio_recording", return_value=(object(), object()))
def test_toggle_start_stop_injects_text(self, _start_mock, _stop_mock):
@ -325,120 +239,6 @@ class DaemonTests(unittest.TestCase):
any("DEBUG:root:state: idle -> recording" in line for line in logs.output)
)
@patch("aman.stop_audio_recording", return_value=FakeAudio(8))
@patch("aman.start_audio_recording", return_value=(object(), object()))
def test_escape_listener_is_only_armed_while_recording(self, _start_mock, _stop_mock):
desktop = FakeDesktop()
daemon = self._build_daemon(desktop, FakeModel(), verbose=False)
daemon._start_stop_worker = (
lambda stream, record, trigger, process_audio: daemon._stop_and_process(
stream, record, trigger, process_audio
)
)
self.assertFalse(desktop.cancel_listener_active)
daemon.toggle()
self.assertTrue(desktop.cancel_listener_active)
daemon.toggle()
self.assertFalse(desktop.cancel_listener_active)
@patch("aman.start_audio_recording", return_value=(object(), object()))
def test_edit_mode_opens_popup_and_starts_recording(self, _start_mock):
desktop = FakeDesktop()
desktop.clipboard_text = "Hello team"
daemon = self._build_daemon(desktop, FakeModel(text="make it funnier"), verbose=False)
daemon.toggle_edit()
self.assertTrue(desktop.popup_open)
self.assertEqual(desktop.popup_text, "Hello team")
self.assertEqual(daemon.get_state(), aman.State.EDIT_RECORDING)
@patch("aman.stop_audio_recording", return_value=FakeAudio(8))
@patch("aman.start_audio_recording", return_value=(object(), object()))
def test_edit_mode_instruction_updates_popup_text(self, _start_mock, _stop_mock):
desktop = FakeDesktop()
desktop.clipboard_text = "Hello team"
daemon = self._build_daemon(desktop, FakeModel(text="make it funnier"), verbose=False)
daemon.toggle_edit()
daemon.toggle_edit()
self.assertTrue(
self._wait_until(lambda: daemon.get_state() == aman.State.EDIT_IDLE),
"edit mode did not return to EDIT_IDLE",
)
self.assertEqual(desktop.popup_text, "Hello team [make it funnier]")
self.assertEqual(len(daemon.ai_processor.edit_calls), 1)
self.assertEqual(
daemon.ai_processor.edit_calls[0]["instruction_history"],
["make it funnier"],
)
@patch("aman.stop_audio_recording", return_value=FakeAudio(8))
@patch("aman.start_audio_recording", return_value=(object(), object()))
def test_enter_finalizes_and_injects(self, _start_mock, _stop_mock):
desktop = FakeDesktop()
desktop.clipboard_text = "Initial"
daemon = self._build_daemon(desktop, FakeModel(text="instruction"), verbose=False)
daemon.toggle_edit()
desktop.popup_text = "Final text"
daemon.finalize_edit_session_inject()
self.assertTrue(
self._wait_until(lambda: len(desktop.inject_calls) == 1),
"edit finalize did not inject text",
)
self.assertFalse(desktop.popup_open)
self.assertEqual(desktop.inject_calls[0], ("Final text", "clipboard", False))
self.assertEqual(desktop.focus_restore_calls, 1)
@patch("aman.stop_audio_recording", return_value=FakeAudio(8))
@patch("aman.start_audio_recording", return_value=(object(), object()))
def test_ctrl_c_copies_and_closes_without_inject(self, _start_mock, _stop_mock):
desktop = FakeDesktop()
desktop.clipboard_text = "Initial"
daemon = self._build_daemon(desktop, FakeModel(text="instruction"), verbose=False)
daemon.toggle_edit()
desktop.popup_text = "Copied text"
daemon.finalize_edit_session_copy()
self.assertTrue(
self._wait_until(lambda: not desktop.popup_open),
"edit popup did not close after copy",
)
self.assertEqual(desktop.clipboard_text, "Copied text")
self.assertEqual(desktop.inject_calls, [])
@patch("aman.start_audio_recording", return_value=(object(), object()))
def test_normal_hotkey_ignored_while_edit_session_active(self, _start_mock):
desktop = FakeDesktop()
desktop.clipboard_text = "Initial"
daemon = self._build_daemon(desktop, FakeModel(text="instruction"), verbose=False)
daemon.toggle_edit()
daemon.toggle()
self.assertEqual(daemon.get_state(), aman.State.EDIT_RECORDING)
@patch("aman.stop_audio_recording", return_value=FakeAudio(8))
@patch("aman.start_audio_recording", return_value=(object(), object()))
def test_handle_cancel_closes_edit_session(self, _start_mock, _stop_mock):
desktop = FakeDesktop()
desktop.clipboard_text = "Initial"
daemon = self._build_daemon(desktop, FakeModel(text="instruction"), verbose=False)
daemon.toggle_edit()
daemon.handle_cancel()
self.assertTrue(
self._wait_until(lambda: daemon.get_state() == aman.State.IDLE),
"edit cancel did not reach idle state",
)
self.assertFalse(desktop.popup_open)
class LockTests(unittest.TestCase):
def test_lock_rejects_second_instance(self):

View file

@ -19,7 +19,6 @@ class ConfigTests(unittest.TestCase):
cfg = load(str(missing))
self.assertEqual(cfg.daemon.hotkey, "Cmd+m")
self.assertEqual(cfg.daemon.edit_hotkey, "Cmd+Shift+m")
self.assertEqual(cfg.recording.input, "")
self.assertEqual(cfg.stt.model, "base")
self.assertEqual(cfg.stt.device, "cpu")
@ -34,7 +33,7 @@ class ConfigTests(unittest.TestCase):
def test_loads_nested_config(self):
payload = {
"daemon": {"hotkey": "Ctrl+space", "edit_hotkey": "Ctrl+Shift+space"},
"daemon": {"hotkey": "Ctrl+space"},
"recording": {"input": 3},
"stt": {"model": "small", "device": "cuda"},
"injection": {
@ -56,7 +55,6 @@ class ConfigTests(unittest.TestCase):
cfg = load(str(path))
self.assertEqual(cfg.daemon.hotkey, "Ctrl+space")
self.assertEqual(cfg.daemon.edit_hotkey, "Ctrl+Shift+space")
self.assertEqual(cfg.recording.input, 3)
self.assertEqual(cfg.stt.model, "small")
self.assertEqual(cfg.stt.device, "cuda")
@ -68,7 +66,7 @@ class ConfigTests(unittest.TestCase):
self.assertEqual(cfg.vocabulary.terms, ["Systemd", "Kubernetes"])
def test_super_modifier_hotkey_is_valid(self):
payload = {"daemon": {"hotkey": "Super+m", "edit_hotkey": "Super+Shift+m"}}
payload = {"daemon": {"hotkey": "Super+m"}}
with tempfile.TemporaryDirectory() as td:
path = Path(td) / "config.json"
path.write_text(json.dumps(payload), encoding="utf-8")
@ -76,7 +74,6 @@ class ConfigTests(unittest.TestCase):
cfg = load(str(path))
self.assertEqual(cfg.daemon.hotkey, "Super+m")
self.assertEqual(cfg.daemon.edit_hotkey, "Super+Shift+m")
def test_invalid_hotkey_missing_key_raises(self):
payload = {"daemon": {"hotkey": "Ctrl+Alt"}}
@ -98,24 +95,6 @@ class ConfigTests(unittest.TestCase):
):
load(str(path))
def test_invalid_edit_hotkey_raises(self):
payload = {"daemon": {"edit_hotkey": "Ctrl+Alt"}}
with tempfile.TemporaryDirectory() as td:
path = Path(td) / "config.json"
path.write_text(json.dumps(payload), encoding="utf-8")
with self.assertRaisesRegex(ValueError, "daemon.edit_hotkey is invalid: missing key"):
load(str(path))
def test_equal_hotkeys_raise(self):
payload = {"daemon": {"hotkey": "Cmd+m", "edit_hotkey": "Cmd+m"}}
with tempfile.TemporaryDirectory() as td:
path = Path(td) / "config.json"
path.write_text(json.dumps(payload), encoding="utf-8")
with self.assertRaisesRegex(ValueError, "must be different"):
load(str(path))
def test_invalid_injection_backend_raises(self):
payload = {"injection": {"backend": "invalid"}}
with tempfile.TemporaryDirectory() as td:
@ -147,7 +126,6 @@ class ConfigTests(unittest.TestCase):
cfg = load(str(path))
self.assertEqual(cfg.daemon.hotkey, "Cmd+m")
self.assertEqual(cfg.daemon.edit_hotkey, "Cmd+Shift+m")
self.assertEqual(cfg.injection.backend, "clipboard")
def test_conflicting_replacements_raise(self):