diff --git a/README.md b/README.md index d0dcc13..7209358 100644 --- a/README.md +++ b/README.md @@ -67,7 +67,10 @@ Create `~/.config/aman/config.json` (or let `aman` create it automatically on fi ```json { - "daemon": { "hotkey": "Cmd+m" }, + "daemon": { + "hotkey": "Cmd+m", + "edit_hotkey": "Cmd+Shift+m" + }, "recording": { "input": "0" }, "stt": { "model": "base", "device": "cpu" }, "injection": { @@ -91,6 +94,7 @@ Hotkey notes: - Use one key plus optional modifiers (for example `Cmd+m`, `Super+m`, `Ctrl+space`). - `Super` and `Cmd` are equivalent aliases for the same modifier. +- `daemon.hotkey` and `daemon.edit_hotkey` must be different. AI cleanup is always enabled and uses the locked local Llama-3.2-3B GGUF model downloaded to `~/.cache/aman/models/` during daemon initialization. @@ -127,8 +131,21 @@ systemctl --user enable --now aman - Press the hotkey once to start recording. - Press it again to stop and run STT. - Press `Esc` while recording to cancel without processing. +- `Esc` is only captured globally while dictation recording is active. - Transcript contents are logged only when `-v/--verbose` is used. +Edit mode: + +- Copy text to clipboard and press `daemon.edit_hotkey`. +- Aman opens an editable popup with the clipboard snapshot and immediately starts recording an instruction. +- If clipboard is empty, the popup opens with empty text so the first instruction can create content. +- Press `daemon.edit_hotkey` again to stop recording and apply the instruction. +- Repeat to iterate with more voice instructions. +- Press `Enter` to close the popup and inject the final text. +- Press `Ctrl+C` to copy final text to clipboard and close the popup (no injection). +- Press `Esc` to cancel the edit session completely. +- While edit mode is open, the normal dictation hotkey is ignored. + Wayland note: - Running under Wayland currently exits with a message explaining that it is not supported yet. diff --git a/config.example.json b/config.example.json index b72cfc9..9689318 100644 --- a/config.example.json +++ b/config.example.json @@ -1,6 +1,7 @@ { "daemon": { - "hotkey": "Cmd+m" + "hotkey": "Cmd+m", + "edit_hotkey": "Cmd+Shift+m" }, "recording": { "input": "" @@ -35,8 +36,5 @@ "Kubernetes", "PostgreSQL" ] - }, - "domain_inference": { - "enabled": true } } diff --git a/src/aiprocess.py b/src/aiprocess.py index 11a92ba..2409527 100644 --- a/src/aiprocess.py +++ b/src/aiprocess.py @@ -36,6 +36,20 @@ SYSTEM_PROMPT = ( " - transcript=\"let's ask Bob, I mean Janice, let's ask Janice\" -> {\"cleaned_text\":\"let's ask Janice\"}\n" ) +EDIT_SYSTEM_PROMPT = ( + "You are an amanuensis editor working for a user.\n" + "You'll receive JSON with the current text and spoken editing instructions.\n" + "Rewrite the full text according to those instructions.\n\n" + "Rules:\n" + "- Apply the latest instruction while honoring prior instruction history.\n" + "- Keep unchanged portions intact unless instructions request broader changes.\n" + "- Do not invent facts or context.\n" + "- If a dictionary section exists, apply only the listed corrections.\n" + "- Keep dictionary spellings exactly as provided.\n" + "- Return ONLY valid JSON in this shape: {\"cleaned_text\": \"...\"}\n" + "- Do not wrap with markdown, tags, or extra keys.\n" +) + class LlamaProcessor: def __init__(self, verbose: bool = False): @@ -69,9 +83,33 @@ class LlamaProcessor: if cleaned_dictionary: request_payload["dictionary"] = cleaned_dictionary + return self._run_prompt(SYSTEM_PROMPT, request_payload) + + def process_edit( + self, + current_text: str, + latest_instruction: str, + instruction_history: list[str], + lang: str = "en", + *, + dictionary_context: str = "", + ) -> str: + request_payload: dict[str, Any] = { + "language": lang, + "current_text": current_text, + "latest_instruction": latest_instruction, + "instruction_history": instruction_history, + } + cleaned_dictionary = dictionary_context.strip() + if cleaned_dictionary: + request_payload["dictionary"] = cleaned_dictionary + + return self._run_prompt(EDIT_SYSTEM_PROMPT, request_payload) + + def _run_prompt(self, system_prompt: str, request_payload: dict[str, Any]) -> str: kwargs: dict[str, Any] = { "messages": [ - {"role": "system", "content": SYSTEM_PROMPT}, + {"role": "system", "content": system_prompt}, {"role": "user", "content": json.dumps(request_payload, ensure_ascii=False)}, ], "temperature": 0.0, diff --git a/src/aman.py b/src/aman.py old mode 100755 new mode 100644 index 3c14542..7c47fe0 --- a/src/aman.py +++ b/src/aman.py @@ -29,8 +29,19 @@ class State: STT = "stt" PROCESSING = "processing" OUTPUTTING = "outputting" + EDIT_IDLE = "edit_idle" + EDIT_RECORDING = "edit_recording" + EDIT_STT = "edit_stt" + EDIT_PROCESSING = "edit_processing" +EDIT_STATES = { + State.EDIT_IDLE, + State.EDIT_RECORDING, + State.EDIT_STT, + State.EDIT_PROCESSING, +} + _LOCK_HANDLE = None @@ -60,12 +71,22 @@ class Daemon: self.cfg = cfg self.desktop = desktop self.verbose = verbose - self.lock = threading.Lock() + self.lock = threading.RLock() self._shutdown_requested = threading.Event() self.state = State.IDLE + self.stream = None self.record = None self.timer: threading.Timer | None = None + + self.edit_stream = None + self.edit_record = None + self.edit_timer: threading.Timer | None = None + self.edit_active = False + self.edit_text = "" + self.edit_instruction_history: list[str] = [] + self.edit_session_token = 0 + self.model = _build_whisper_model( cfg.stt.model, cfg.stt.device, @@ -77,6 +98,18 @@ class Daemon: self.vocabulary = VocabularyEngine(cfg.vocabulary) self._stt_hint_kwargs_cache: dict[str, Any] | None = None + def _arm_cancel_listener_for_recording(self): + try: + self.desktop.start_cancel_listener(lambda: self.cancel_recording()) + except Exception as exc: + logging.error("failed to arm cancel listener: %s", exc) + + def _disarm_cancel_listener_for_recording(self): + try: + self.desktop.stop_cancel_listener() + except Exception as exc: + logging.debug("failed to disarm cancel listener: %s", exc) + def set_state(self, state: str): with self.lock: prev = self.state @@ -99,6 +132,9 @@ class Daemon: if self._shutdown_requested.is_set(): logging.info("shutdown in progress, trigger ignored") return + if self.edit_active: + logging.info("edit session active, dictate trigger ignored") + return if self.state == State.IDLE: self._start_recording_locked() return @@ -109,10 +145,60 @@ class Daemon: if should_stop: self.stop_recording(trigger="user") + def toggle_edit(self): + action = "" + token = 0 + with self.lock: + if self._shutdown_requested.is_set(): + logging.info("shutdown in progress, edit trigger ignored") + return + if self.edit_active: + token = self.edit_session_token + if self.state == State.EDIT_IDLE: + action = "start_recording" + elif self.state == State.EDIT_RECORDING: + action = "stop_recording" + else: + logging.info("edit session busy (%s), trigger ignored", self.state) + return + else: + if self.state != State.IDLE: + logging.info("busy (%s), edit trigger ignored", self.state) + return + self.edit_active = True + self.edit_session_token += 1 + token = self.edit_session_token + self.edit_instruction_history = [] + self.edit_text = "" + self.set_state(State.EDIT_IDLE) + action = "open_session" + + if action == "stop_recording": + self.stop_edit_recording(trigger="user") + return + if action == "start_recording": + self._start_edit_recording(token=token, trigger="user") + return + if action == "open_session": + self._open_edit_session(token) + + def handle_cancel(self): + with self.lock: + edit_active = self.edit_active + state = self.state + if edit_active: + self.cancel_edit_session() + return + if state == State.RECORDING: + self.cancel_recording() + def _start_recording_locked(self): if self.state != State.IDLE: logging.info("busy (%s), trigger ignored", self.state) return + if self.edit_active: + logging.info("edit session active, dictate trigger ignored") + return try: stream, record = start_audio_recording(self.cfg.recording.input) except Exception as exc: @@ -120,9 +206,8 @@ class Daemon: return self.stream = stream self.record = record - prev = self.state - self.state = State.RECORDING - logging.debug("state: %s -> %s", prev, self.state) + self.set_state(State.RECORDING) + self._arm_cancel_listener_for_recording() logging.info("recording started") if self.timer: self.timer.cancel() @@ -150,13 +235,12 @@ class Daemon: if self.timer: self.timer.cancel() self.timer = None - prev = self.state - self.state = State.STT - logging.debug("state: %s -> %s", prev, self.state) + self._disarm_cancel_listener_for_recording() + self.set_state(State.STT) if stream is None or record is None: logging.warning("recording resources are unavailable during stop") - self.state = State.IDLE + self.set_state(State.IDLE) return None return stream, record @@ -254,8 +338,292 @@ class Daemon: return self.stop_recording(trigger="cancel", process_audio=False) + def _open_edit_session(self, token: int): + initial_text = "" + try: + initial_text = self.desktop.read_clipboard_text() or "" + except Exception as exc: + logging.error("failed reading clipboard for edit session: %s", exc) + with self.lock: + if not self._edit_session_is_active_locked(token): + return + self.edit_text = initial_text + + try: + self.desktop.open_edit_popup( + initial_text, + on_submit=self.finalize_edit_session_inject, + on_copy=self.finalize_edit_session_copy, + on_cancel=self.cancel_edit_session, + ) + self._safe_set_edit_popup_status("Recording instruction...") + except Exception as exc: + logging.error("failed opening edit popup: %s", exc) + self._close_edit_session(close_popup=False) + return + + if not self._start_edit_recording(token=token, trigger="open"): + self._safe_set_edit_popup_status("Ready. Press edit hotkey to record.") + + def _start_edit_recording(self, *, token: int, trigger: str) -> bool: + with self.lock: + if not self._edit_session_is_active_locked(token): + return False + if self.state != State.EDIT_IDLE: + logging.info("edit session busy (%s), start ignored", self.state) + return False + try: + stream, record = start_audio_recording(self.cfg.recording.input) + except Exception as exc: + logging.error("edit record start failed: %s", exc) + return False + self.edit_stream = stream + self.edit_record = record + if self.edit_timer: + self.edit_timer.cancel() + self.edit_timer = threading.Timer(RECORD_TIMEOUT_SEC, self._timeout_edit_stop) + self.edit_timer.daemon = True + self.edit_timer.start() + self.set_state(State.EDIT_RECORDING) + self._safe_set_edit_popup_status("Recording instruction...") + logging.info("edit recording started (%s)", trigger) + return True + + def _timeout_edit_stop(self): + self.stop_edit_recording(trigger="timeout") + + def stop_edit_recording(self, *, trigger: str = "user", process_audio: bool = True): + payload = None + token = 0 + with self.lock: + if not self.edit_active or self.state != State.EDIT_RECORDING: + return + payload = (self.edit_stream, self.edit_record) + token = self.edit_session_token + self.edit_stream = None + self.edit_record = None + if self.edit_timer: + self.edit_timer.cancel() + self.edit_timer = None + self.set_state(State.EDIT_STT) + self._safe_set_edit_popup_status("Transcribing instruction...") + + stream, record = payload + if stream is None or record is None: + logging.warning("edit recording resources are unavailable during stop") + with self.lock: + if self._edit_session_is_active_locked(token): + self.set_state(State.EDIT_IDLE) + self._safe_set_edit_popup_status("Ready. Press edit hotkey to record.") + return + + threading.Thread( + target=self._edit_stop_and_process, + args=(stream, record, token, trigger, process_audio), + daemon=True, + ).start() + + def _edit_stop_and_process( + self, + stream: Any, + record: Any, + token: int, + trigger: str, + process_audio: bool, + ): + logging.info("stopping edit recording (%s)", trigger) + try: + audio = stop_audio_recording(stream, record) + except Exception as exc: + logging.error("edit record stop failed: %s", exc) + with self.lock: + if self._edit_session_is_active_locked(token): + self.set_state(State.EDIT_IDLE) + self._safe_set_edit_popup_status("Failed to stop recording.") + return + + if not process_audio or self._shutdown_requested.is_set(): + with self.lock: + if self._edit_session_is_active_locked(token): + self.set_state(State.EDIT_IDLE) + self._safe_set_edit_popup_status("Ready. Press edit hotkey to record.") + return + + if audio.size == 0: + logging.error("no audio captured for edit instruction") + with self.lock: + if self._edit_session_is_active_locked(token): + self.set_state(State.EDIT_IDLE) + self._safe_set_edit_popup_status("No audio captured. Record again.") + return + + try: + instruction = self._transcribe(audio).strip() + except Exception as exc: + logging.error("edit stt failed: %s", exc) + with self.lock: + if self._edit_session_is_active_locked(token): + self.set_state(State.EDIT_IDLE) + self._safe_set_edit_popup_status("STT failed. Record again.") + return + + if not instruction: + with self.lock: + if self._edit_session_is_active_locked(token): + self.set_state(State.EDIT_IDLE) + self._safe_set_edit_popup_status("No instruction heard. Record again.") + return + + if self.log_transcript: + logging.debug("edit instruction: %s", instruction) + else: + logging.info("edit instruction length: %d", len(instruction)) + + with self.lock: + if not self._edit_session_is_active_locked(token): + return + self.edit_instruction_history.append(instruction) + instruction_history = list(self.edit_instruction_history) + self.set_state(State.EDIT_PROCESSING) + self._safe_set_edit_popup_status("Applying instruction...") + + current_text = self._current_edit_text() + updated_text = current_text + try: + ai_text = self._get_ai_processor().process_edit( + current_text, + instruction, + instruction_history, + lang=STT_LANGUAGE, + dictionary_context=self.vocabulary.build_ai_dictionary_context(), + ) + if ai_text and ai_text.strip(): + updated_text = ai_text + except Exception as exc: + logging.error("edit process failed: %s", exc) + + updated_text = self.vocabulary.apply_deterministic_replacements(updated_text).strip() + + with self.lock: + if not self._edit_session_is_active_locked(token): + return + self.edit_text = updated_text + self.set_state(State.EDIT_IDLE) + self._safe_set_edit_popup_text(updated_text) + self._safe_set_edit_popup_status("Ready. Press edit hotkey to record.") + + def _current_edit_text(self) -> str: + try: + text = self.desktop.get_edit_popup_text() + except Exception: + with self.lock: + return self.edit_text + with self.lock: + self.edit_text = text + return text + + def finalize_edit_session_inject(self): + threading.Thread(target=self._finalize_edit_session_inject_worker, daemon=True).start() + + def _finalize_edit_session_inject_worker(self): + text = self._current_edit_text() + self._close_edit_session(close_popup=True) + + if self._shutdown_requested.is_set(): + return + + try: + self.desktop.restore_previous_focus() + except Exception as exc: + logging.warning("could not restore previous focus: %s", exc) + + try: + self.set_state(State.OUTPUTTING) + self.desktop.inject_text( + text, + self.cfg.injection.backend, + remove_transcription_from_clipboard=( + self.cfg.injection.remove_transcription_from_clipboard + ), + ) + except Exception as exc: + logging.error("edit output failed: %s", exc) + finally: + self.set_state(State.IDLE) + + def finalize_edit_session_copy(self): + threading.Thread(target=self._finalize_edit_session_copy_worker, daemon=True).start() + + def _finalize_edit_session_copy_worker(self): + text = self._current_edit_text() + self._close_edit_session(close_popup=True) + try: + self.desktop.write_clipboard_text(text) + except Exception as exc: + logging.error("failed to copy edited text to clipboard: %s", exc) + + def cancel_edit_session(self): + threading.Thread(target=self._cancel_edit_session_worker, daemon=True).start() + + def _cancel_edit_session_worker(self): + self._close_edit_session(close_popup=True) + + def _close_edit_session(self, *, close_popup: bool): + stream = None + record = None + with self.lock: + stream = self.edit_stream + record = self.edit_record + self.edit_stream = None + self.edit_record = None + if self.edit_timer: + self.edit_timer.cancel() + self.edit_timer = None + + self.edit_active = False + self.edit_session_token += 1 + self.edit_instruction_history = [] + self.edit_text = "" + if self.state in EDIT_STATES: + self.set_state(State.IDLE) + + if close_popup: + try: + self.desktop.close_edit_popup() + except Exception as exc: + logging.debug("failed closing edit popup: %s", exc) + + if stream is not None and record is not None: + try: + stop_audio_recording(stream, record) + except Exception: + pass + + def _edit_session_is_active_locked(self, token: int) -> bool: + return self.edit_active and self.edit_session_token == token + + def _safe_set_edit_popup_status(self, status: str): + with self.lock: + if not self.edit_active: + return + try: + self.desktop.set_edit_popup_status(status) + except Exception as exc: + logging.debug("failed setting edit popup status: %s", exc) + + def _safe_set_edit_popup_text(self, text: str): + with self.lock: + if not self.edit_active: + return + try: + self.desktop.set_edit_popup_text(text) + except Exception as exc: + logging.debug("failed setting edit popup text: %s", exc) + def shutdown(self, timeout: float = 5.0) -> bool: self.request_shutdown() + self._disarm_cancel_listener_for_recording() + self._close_edit_session(close_popup=True) self.stop_recording(trigger="shutdown", process_audio=False) return self.wait_for_idle(timeout) @@ -367,6 +735,7 @@ def main(): _LOCK_HANDLE = _lock_single_instance() logging.info("hotkey: %s", cfg.daemon.hotkey) + logging.info("edit hotkey: %s", cfg.daemon.edit_hotkey) logging.info( "config (%s):\n%s", args.config or str(Path.home() / ".config" / "aman" / "config.json"), @@ -400,9 +769,16 @@ def main(): try: desktop.start_hotkey_listener( cfg.daemon.hotkey, - lambda: logging.info("hotkey pressed (dry-run)") if args.dry_run else daemon.toggle(), + lambda: logging.info("dictate hotkey pressed (dry-run)") + if args.dry_run + else daemon.toggle(), + ) + desktop.start_hotkey_listener( + cfg.daemon.edit_hotkey, + lambda: logging.info("edit hotkey pressed (dry-run)") + if args.dry_run + else daemon.toggle_edit(), ) - desktop.start_cancel_listener(lambda: daemon.cancel_recording()) except Exception as exc: logging.error("hotkey setup failed: %s", exc) raise SystemExit(1) diff --git a/src/config.py b/src/config.py index ab4d30a..ed93cf9 100644 --- a/src/config.py +++ b/src/config.py @@ -10,6 +10,7 @@ from hotkey import split_hotkey DEFAULT_HOTKEY = "Cmd+m" +DEFAULT_EDIT_HOTKEY = "Cmd+Shift+m" DEFAULT_STT_MODEL = "base" DEFAULT_STT_DEVICE = "cpu" DEFAULT_INJECTION_BACKEND = "clipboard" @@ -20,6 +21,7 @@ WILDCARD_CHARS = set("*?[]{}") @dataclass class DaemonConfig: hotkey: str = DEFAULT_HOTKEY + edit_hotkey: str = DEFAULT_EDIT_HOTKEY @dataclass @@ -93,6 +95,19 @@ def validate(cfg: Config) -> None: split_hotkey(hotkey) except ValueError as exc: raise ValueError(f"daemon.hotkey is invalid: {exc}") from exc + cfg.daemon.hotkey = hotkey + + edit_hotkey = cfg.daemon.edit_hotkey.strip() + if not edit_hotkey: + raise ValueError("daemon.edit_hotkey cannot be empty") + try: + split_hotkey(edit_hotkey) + except ValueError as exc: + raise ValueError(f"daemon.edit_hotkey is invalid: {exc}") from exc + cfg.daemon.edit_hotkey = edit_hotkey + + if hotkey.casefold() == edit_hotkey.casefold(): + raise ValueError("daemon.hotkey and daemon.edit_hotkey must be different") if isinstance(cfg.recording.input, bool): raise ValueError("recording.input cannot be boolean") @@ -138,6 +153,8 @@ def _from_dict(data: dict[str, Any], cfg: Config) -> Config: if "hotkey" in daemon: cfg.daemon.hotkey = _as_nonempty_str(daemon["hotkey"], "daemon.hotkey") + if "edit_hotkey" in daemon: + cfg.daemon.edit_hotkey = _as_nonempty_str(daemon["edit_hotkey"], "daemon.edit_hotkey") if "input" in recording: cfg.recording.input = _as_recording_input(recording["input"]) if "model" in stt: diff --git a/src/desktop.py b/src/desktop.py index 23ac5f0..3c4b782 100644 --- a/src/desktop.py +++ b/src/desktop.py @@ -11,6 +11,9 @@ class DesktopAdapter(Protocol): def start_cancel_listener(self, callback: Callable[[], None]) -> None: raise NotImplementedError + def stop_cancel_listener(self) -> None: + raise NotImplementedError + def inject_text( self, text: str, @@ -20,6 +23,37 @@ class DesktopAdapter(Protocol): ) -> None: raise NotImplementedError + def read_clipboard_text(self) -> str | None: + raise NotImplementedError + + def write_clipboard_text(self, text: str) -> None: + raise NotImplementedError + + def open_edit_popup( + self, + initial_text: str, + *, + on_submit: Callable[[], None], + on_copy: Callable[[], None], + on_cancel: Callable[[], None], + ) -> None: + raise NotImplementedError + + def close_edit_popup(self) -> None: + raise NotImplementedError + + def get_edit_popup_text(self) -> str: + raise NotImplementedError + + def set_edit_popup_text(self, text: str) -> None: + raise NotImplementedError + + def set_edit_popup_status(self, status: str) -> None: + raise NotImplementedError + + def restore_previous_focus(self) -> bool: + raise NotImplementedError + def run_tray(self, state_getter: Callable[[], str], on_quit: Callable[[], None]) -> None: raise NotImplementedError diff --git a/src/desktop_wayland.py b/src/desktop_wayland.py index 1da88a8..c061b76 100644 --- a/src/desktop_wayland.py +++ b/src/desktop_wayland.py @@ -10,6 +10,9 @@ class WaylandAdapter: def start_cancel_listener(self, _callback: Callable[[], None]) -> None: raise SystemExit("Wayland hotkeys are not supported yet.") + def stop_cancel_listener(self) -> None: + raise SystemExit("Wayland hotkeys are not supported yet.") + def inject_text( self, _text: str, @@ -20,6 +23,38 @@ class WaylandAdapter: _ = remove_transcription_from_clipboard raise SystemExit("Wayland text injection is not supported yet.") + def read_clipboard_text(self) -> str | None: + raise SystemExit("Wayland clipboard access is not supported yet.") + + def write_clipboard_text(self, _text: str) -> None: + raise SystemExit("Wayland clipboard access is not supported yet.") + + def open_edit_popup( + self, + _initial_text: str, + *, + on_submit: Callable[[], None], + on_copy: Callable[[], None], + on_cancel: Callable[[], None], + ) -> None: + _ = (on_submit, on_copy, on_cancel) + raise SystemExit("Wayland edit popup is not supported yet.") + + def close_edit_popup(self) -> None: + raise SystemExit("Wayland edit popup is not supported yet.") + + def get_edit_popup_text(self) -> str: + raise SystemExit("Wayland edit popup is not supported yet.") + + def set_edit_popup_text(self, _text: str) -> None: + raise SystemExit("Wayland edit popup is not supported yet.") + + def set_edit_popup_status(self, _status: str) -> None: + raise SystemExit("Wayland edit popup is not supported yet.") + + def restore_previous_focus(self) -> bool: + raise SystemExit("Wayland focus restoration is not supported yet.") + def run_tray(self, _state_getter: Callable[[], str], _on_quit: Callable[[], None]) -> None: raise SystemExit("Wayland tray support is not available yet.") diff --git a/src/desktop_x11.py b/src/desktop_x11.py index 483ad58..d35f0da 100644 --- a/src/desktop_x11.py +++ b/src/desktop_x11.py @@ -4,7 +4,7 @@ import logging import threading import time import warnings -from typing import Callable, Iterable +from typing import Any, Callable, Iterable import gi from Xlib import X, XK, display @@ -42,6 +42,15 @@ class X11Adapter: self.indicator = None self.status_icon = None self.menu = None + self._edit_window = None + self._edit_text_view = None + self._edit_text_buffer = None + self._edit_status_label = None + self._edit_callbacks: dict[str, Callable[[], None]] = {} + self._edit_previous_focus_window_id: int | None = None + self._cancel_listener_lock = threading.Lock() + self._cancel_listener_stop_event: threading.Event | None = None + self._cancel_listener_callback: Callable[[], None] | None = None if AppIndicator3 is not None: self.indicator = AppIndicator3.Indicator.new( "aman", @@ -72,9 +81,36 @@ class X11Adapter: def start_cancel_listener(self, callback: Callable[[], None]) -> None: mods, keysym = self._parse_hotkey("Escape") - thread = threading.Thread(target=self._listen, args=(mods, keysym, callback), daemon=True) + with self._cancel_listener_lock: + if self._cancel_listener_stop_event is not None: + self._cancel_listener_callback = callback + return + self._cancel_listener_callback = callback + stop_event = threading.Event() + self._cancel_listener_stop_event = stop_event + thread = threading.Thread( + target=self._listen, + args=(mods, keysym, self._dispatch_cancel_listener, stop_event), + daemon=True, + ) thread.start() + def stop_cancel_listener(self) -> None: + stop_event = None + with self._cancel_listener_lock: + stop_event = self._cancel_listener_stop_event + self._cancel_listener_stop_event = None + self._cancel_listener_callback = None + if stop_event is not None: + stop_event.set() + + def _dispatch_cancel_listener(self) -> None: + callback = None + with self._cancel_listener_lock: + callback = self._cancel_listener_callback + if callback is not None: + callback() + def inject_text( self, text: str, @@ -86,24 +122,240 @@ class X11Adapter: if backend == "clipboard": previous_clipboard = None if remove_transcription_from_clipboard: - previous_clipboard = self._read_clipboard_text() - self._write_clipboard(text) + previous_clipboard = self.read_clipboard_text() + self.write_clipboard_text(text) self._paste_clipboard() if remove_transcription_from_clipboard: time.sleep(CLIPBOARD_RESTORE_DELAY_SEC) - self._restore_clipboard_text(previous_clipboard) + self._set_clipboard_text(previous_clipboard or "") return if backend == "injection": self._type_text(text) return raise ValueError(f"unknown injection backend: {backend}") - def _read_clipboard_text(self) -> str | None: + def read_clipboard_text(self) -> str | None: + return self._run_on_ui_thread(self._read_clipboard_text_ui) + + def write_clipboard_text(self, text: str) -> None: + self._run_on_ui_thread(lambda: self._set_clipboard_text(text)) + + def open_edit_popup( + self, + initial_text: str, + *, + on_submit: Callable[[], None], + on_copy: Callable[[], None], + on_cancel: Callable[[], None], + ) -> None: + self._run_on_ui_thread( + lambda: self._open_edit_popup_ui( + initial_text, + on_submit=on_submit, + on_copy=on_copy, + on_cancel=on_cancel, + ) + ) + + def close_edit_popup(self) -> None: + self._run_on_ui_thread(self._close_edit_popup_ui) + + def get_edit_popup_text(self) -> str: + return self._run_on_ui_thread(self._get_edit_popup_text_ui) + + def set_edit_popup_text(self, text: str) -> None: + self._run_on_ui_thread(lambda: self._set_edit_popup_text_ui(text)) + + def set_edit_popup_status(self, status: str) -> None: + self._run_on_ui_thread(lambda: self._set_edit_popup_status_ui(status)) + + def restore_previous_focus(self) -> bool: + window_id = self._edit_previous_focus_window_id + if window_id is None: + return False + try: + dpy = display.Display() + window = dpy.create_resource_object("window", window_id) + window.set_input_focus(X.RevertToParent, X.CurrentTime) + dpy.sync() + dpy.close() + return True + except Exception as exc: + logging.warning("focus restore failed: %s", exc) + return False + + def _open_edit_popup_ui( + self, + initial_text: str, + *, + on_submit: Callable[[], None], + on_copy: Callable[[], None], + on_cancel: Callable[[], None], + ) -> None: + if self._edit_window is not None: + raise RuntimeError("edit popup is already open") + + self._edit_previous_focus_window_id = self._current_focus_window_id() + self._edit_callbacks = { + "submit": on_submit, + "copy": on_copy, + "cancel": on_cancel, + } + + window = Gtk.Window(type=Gtk.WindowType.TOPLEVEL) + window.set_title("Aman Editor") + window.set_default_size(900, 520) + window.set_position(Gtk.WindowPosition.CENTER) + window.set_type_hint(Gdk.WindowTypeHint.UTILITY) + window.set_skip_taskbar_hint(True) + window.set_skip_pager_hint(True) + window.set_keep_above(True) + window.connect("key-press-event", self._on_edit_key_press) + window.connect("delete-event", self._on_edit_delete_event) + + container = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=8) + container.set_border_width(12) + window.add(container) + + status_label = Gtk.Label(label="Recording...") + status_label.set_xalign(0.0) + container.pack_start(status_label, False, False, 0) + + scrolled = Gtk.ScrolledWindow() + scrolled.set_hexpand(True) + scrolled.set_vexpand(True) + container.pack_start(scrolled, True, True, 0) + + text_view = Gtk.TextView() + text_view.set_wrap_mode(Gtk.WrapMode.WORD_CHAR) + text_view.connect("key-press-event", self._on_edit_key_press) + scrolled.add(text_view) + + text_buffer = text_view.get_buffer() + text_buffer.set_text(initial_text or "") + + self._edit_window = window + self._edit_text_view = text_view + self._edit_text_buffer = text_buffer + self._edit_status_label = status_label + + window.show_all() + text_view.grab_focus() + window.present() + + def _on_edit_delete_event(self, _widget, _event): + self._invoke_edit_callback("cancel") + return True + + def _on_edit_key_press(self, _widget, event): + key = event.keyval + state = event.state + is_ctrl = bool(state & Gdk.ModifierType.CONTROL_MASK) + + if key == Gdk.KEY_Escape: + self._invoke_edit_callback("cancel") + return True + + if is_ctrl and key in (Gdk.KEY_c, Gdk.KEY_C): + self._invoke_edit_callback("copy") + return True + + if key in (Gdk.KEY_Return, Gdk.KEY_KP_Enter): + self._invoke_edit_callback("submit") + return True + + return False + + def _invoke_edit_callback(self, name: str) -> None: + callback = self._edit_callbacks.get(name) + if callback is None: + return + try: + callback() + except Exception as exc: + logging.error("edit popup callback failed (%s): %s", name, exc) + + def _close_edit_popup_ui(self) -> None: + if self._edit_window is not None: + try: + self._edit_window.destroy() + except Exception: + pass + self._edit_window = None + self._edit_text_view = None + self._edit_text_buffer = None + self._edit_status_label = None + self._edit_callbacks = {} + + def _get_edit_popup_text_ui(self) -> str: + buffer = self._edit_text_buffer + if buffer is None: + return "" + start = buffer.get_start_iter() + end = buffer.get_end_iter() + return buffer.get_text(start, end, True) + + def _set_edit_popup_text_ui(self, text: str) -> None: + buffer = self._edit_text_buffer + if buffer is None: + return + buffer.set_text(text or "") + + def _set_edit_popup_status_ui(self, status: str) -> None: + label = self._edit_status_label + if label is None: + return + label.set_text(status or "") + + def _current_focus_window_id(self) -> int | None: + try: + dpy = display.Display() + focused = dpy.get_input_focus().focus + window_id = getattr(focused, "id", None) + dpy.close() + if isinstance(window_id, int) and window_id > 0: + return window_id + return None + except Exception: + return None + + def _run_on_ui_thread(self, fn: Callable[[], Any]) -> Any: + if threading.current_thread() is threading.main_thread(): + return fn() + + done = threading.Event() + result: dict[str, Any] = {} + + def runner(): + try: + result["value"] = fn() + except Exception as exc: + result["error"] = exc + finally: + done.set() + return False + + GLib.idle_add(runner) + done.wait() + error = result.get("error") + if error is not None: + raise error + return result.get("value") + + def _read_clipboard_text_ui(self) -> str | None: Gtk.init([]) clipboard = Gtk.Clipboard.get(Gdk.SELECTION_CLIPBOARD) text = clipboard.wait_for_text() return str(text) if text is not None else None + def _set_clipboard_text(self, text: str) -> None: + Gtk.init([]) + clipboard = Gtk.Clipboard.get(Gdk.SELECTION_CLIPBOARD) + clipboard.set_text(text, -1) + clipboard.store() + while Gtk.events_pending(): + Gtk.main_iteration() + def run_tray(self, state_getter: Callable[[], str], on_quit: Callable[[], None]) -> None: self.menu = Gtk.Menu() quit_item = Gtk.MenuItem(label="Quit") @@ -126,7 +378,14 @@ class X11Adapter: finally: self.request_quit() - def _listen(self, mods: int, keysym: int, callback: Callable[[], None]) -> None: + def _listen( + self, + mods: int, + keysym: int, + callback: Callable[[], None], + stop_event: threading.Event | None = None, + ) -> None: + local_stop = stop_event or threading.Event() disp = None root = None keycode = None @@ -134,14 +393,18 @@ class X11Adapter: disp = display.Display() root = disp.screen().root keycode = self._grab_hotkey(disp, root, mods, keysym) - while True: + while not local_stop.is_set(): + if disp.pending_events() == 0: + time.sleep(0.05) + continue ev = disp.next_event() if ev.type == X.KeyPress and ev.detail == keycode: state = ev.state & ~(X.LockMask | X.Mod2Mask) if state == mods: callback() except Exception as exc: - logging.error("hotkey listener stopped: %s", exc) + if not local_stop.is_set(): + logging.error("hotkey listener stopped: %s", exc) finally: if root is not None and keycode is not None and disp is not None: try: @@ -149,6 +412,11 @@ class X11Adapter: disp.sync() except Exception: pass + if disp is not None: + try: + disp.close() + except Exception: + pass def _parse_hotkey(self, hotkey: str): mods = 0 @@ -195,22 +463,6 @@ class X11Adapter: disp.sync() return keycode - def _write_clipboard(self, text: str) -> None: - Gtk.init([]) - clipboard = Gtk.Clipboard.get(Gdk.SELECTION_CLIPBOARD) - clipboard.set_text(text, -1) - clipboard.store() - while Gtk.events_pending(): - Gtk.main_iteration() - - def _restore_clipboard_text(self, text: str | None) -> None: - Gtk.init([]) - clipboard = Gtk.Clipboard.get(Gdk.SELECTION_CLIPBOARD) - clipboard.set_text(text or "", -1) - clipboard.store() - while Gtk.events_pending(): - Gtk.main_iteration() - def _paste_clipboard(self) -> None: dpy = display.Display() self._send_combo(dpy, ["Control_L", "Shift_L", "v"]) @@ -261,11 +513,11 @@ class X11Adapter: return (keysym if keysym != 0 else None, False) def _icon_path(self, state: str) -> str: - if state == "recording": + if state in ("recording", "edit_recording"): return str(ASSETS_DIR / "recording.png") - if state == "stt": + if state in ("stt", "edit_stt"): return str(ASSETS_DIR / "stt.png") - if state == "processing": + if state in ("processing", "outputting", "edit_processing"): return str(ASSETS_DIR / "processing.png") return str(ASSETS_DIR / "idle.png") @@ -276,6 +528,16 @@ class X11Adapter: return "STT" if state == "processing": return "AI Processing" + if state == "outputting": + return "Outputting" + if state == "edit_recording": + return "Editing: Recording" + if state == "edit_stt": + return "Editing: STT" + if state == "edit_processing": + return "Editing: Processing" + if state == "edit_idle": + return "Editing" return "Idle" def _update_tray(self, state_getter: Callable[[], str]): diff --git a/tests/test_aman.py b/tests/test_aman.py index d9a9867..cd33ad7 100644 --- a/tests/test_aman.py +++ b/tests/test_aman.py @@ -1,6 +1,7 @@ import os import sys import tempfile +import time import unittest from pathlib import Path from unittest.mock import patch @@ -18,6 +19,15 @@ class FakeDesktop: def __init__(self): self.inject_calls = [] self.quit_calls = 0 + self.clipboard_text = "" + self.popup_open = False + self.popup_text = "" + self.popup_statuses = [] + self.popup_callbacks = {} + self.popup_close_calls = 0 + self.focus_restore_calls = 0 + self.cancel_listener_active = False + self.cancel_listener_callback = None def inject_text( self, @@ -28,6 +38,53 @@ class FakeDesktop: ) -> None: self.inject_calls.append((text, backend, remove_transcription_from_clipboard)) + def read_clipboard_text(self) -> str | None: + return self.clipboard_text + + def write_clipboard_text(self, text: str) -> None: + self.clipboard_text = text + + def open_edit_popup( + self, + initial_text: str, + *, + on_submit, + on_copy, + on_cancel, + ) -> None: + self.popup_open = True + self.popup_text = initial_text + self.popup_callbacks = { + "submit": on_submit, + "copy": on_copy, + "cancel": on_cancel, + } + + def close_edit_popup(self) -> None: + self.popup_open = False + self.popup_close_calls += 1 + + def get_edit_popup_text(self) -> str: + return self.popup_text + + def set_edit_popup_text(self, text: str) -> None: + self.popup_text = text + + def set_edit_popup_status(self, status: str) -> None: + self.popup_statuses.append(status) + + def restore_previous_focus(self) -> bool: + self.focus_restore_calls += 1 + return True + + def start_cancel_listener(self, callback) -> None: + self.cancel_listener_active = True + self.cancel_listener_callback = callback + + def stop_cancel_listener(self) -> None: + self.cancel_listener_active = False + self.cancel_listener_callback = None + def request_quit(self) -> None: self.quit_calls += 1 @@ -73,9 +130,30 @@ class FakeHintModel: class FakeAIProcessor: + def __init__(self): + self.edit_calls = [] + def process(self, text, lang="en", **_kwargs): return text + def process_edit( + self, + current_text, + latest_instruction, + instruction_history, + lang="en", + **_kwargs, + ): + self.edit_calls.append( + { + "current_text": current_text, + "latest_instruction": latest_instruction, + "instruction_history": list(instruction_history), + "lang": lang, + } + ) + return f"{current_text} [{latest_instruction}]" + class FakeAudio: def __init__(self, size: int): @@ -101,6 +179,14 @@ class DaemonTests(unittest.TestCase): ): return aman.Daemon(active_cfg, desktop, verbose=verbose) + def _wait_until(self, predicate, timeout: float = 1.0): + end = time.time() + timeout + while time.time() < end: + if predicate(): + return True + time.sleep(0.01) + return predicate() + @patch("aman.stop_audio_recording", return_value=FakeAudio(8)) @patch("aman.start_audio_recording", return_value=(object(), object())) def test_toggle_start_stop_injects_text(self, _start_mock, _stop_mock): @@ -239,6 +325,120 @@ class DaemonTests(unittest.TestCase): any("DEBUG:root:state: idle -> recording" in line for line in logs.output) ) + @patch("aman.stop_audio_recording", return_value=FakeAudio(8)) + @patch("aman.start_audio_recording", return_value=(object(), object())) + def test_escape_listener_is_only_armed_while_recording(self, _start_mock, _stop_mock): + desktop = FakeDesktop() + daemon = self._build_daemon(desktop, FakeModel(), verbose=False) + daemon._start_stop_worker = ( + lambda stream, record, trigger, process_audio: daemon._stop_and_process( + stream, record, trigger, process_audio + ) + ) + + self.assertFalse(desktop.cancel_listener_active) + daemon.toggle() + self.assertTrue(desktop.cancel_listener_active) + daemon.toggle() + self.assertFalse(desktop.cancel_listener_active) + + @patch("aman.start_audio_recording", return_value=(object(), object())) + def test_edit_mode_opens_popup_and_starts_recording(self, _start_mock): + desktop = FakeDesktop() + desktop.clipboard_text = "Hello team" + daemon = self._build_daemon(desktop, FakeModel(text="make it funnier"), verbose=False) + + daemon.toggle_edit() + + self.assertTrue(desktop.popup_open) + self.assertEqual(desktop.popup_text, "Hello team") + self.assertEqual(daemon.get_state(), aman.State.EDIT_RECORDING) + + @patch("aman.stop_audio_recording", return_value=FakeAudio(8)) + @patch("aman.start_audio_recording", return_value=(object(), object())) + def test_edit_mode_instruction_updates_popup_text(self, _start_mock, _stop_mock): + desktop = FakeDesktop() + desktop.clipboard_text = "Hello team" + daemon = self._build_daemon(desktop, FakeModel(text="make it funnier"), verbose=False) + + daemon.toggle_edit() + daemon.toggle_edit() + + self.assertTrue( + self._wait_until(lambda: daemon.get_state() == aman.State.EDIT_IDLE), + "edit mode did not return to EDIT_IDLE", + ) + self.assertEqual(desktop.popup_text, "Hello team [make it funnier]") + self.assertEqual(len(daemon.ai_processor.edit_calls), 1) + self.assertEqual( + daemon.ai_processor.edit_calls[0]["instruction_history"], + ["make it funnier"], + ) + + @patch("aman.stop_audio_recording", return_value=FakeAudio(8)) + @patch("aman.start_audio_recording", return_value=(object(), object())) + def test_enter_finalizes_and_injects(self, _start_mock, _stop_mock): + desktop = FakeDesktop() + desktop.clipboard_text = "Initial" + daemon = self._build_daemon(desktop, FakeModel(text="instruction"), verbose=False) + + daemon.toggle_edit() + desktop.popup_text = "Final text" + daemon.finalize_edit_session_inject() + + self.assertTrue( + self._wait_until(lambda: len(desktop.inject_calls) == 1), + "edit finalize did not inject text", + ) + self.assertFalse(desktop.popup_open) + self.assertEqual(desktop.inject_calls[0], ("Final text", "clipboard", False)) + self.assertEqual(desktop.focus_restore_calls, 1) + + @patch("aman.stop_audio_recording", return_value=FakeAudio(8)) + @patch("aman.start_audio_recording", return_value=(object(), object())) + def test_ctrl_c_copies_and_closes_without_inject(self, _start_mock, _stop_mock): + desktop = FakeDesktop() + desktop.clipboard_text = "Initial" + daemon = self._build_daemon(desktop, FakeModel(text="instruction"), verbose=False) + + daemon.toggle_edit() + desktop.popup_text = "Copied text" + daemon.finalize_edit_session_copy() + + self.assertTrue( + self._wait_until(lambda: not desktop.popup_open), + "edit popup did not close after copy", + ) + self.assertEqual(desktop.clipboard_text, "Copied text") + self.assertEqual(desktop.inject_calls, []) + + @patch("aman.start_audio_recording", return_value=(object(), object())) + def test_normal_hotkey_ignored_while_edit_session_active(self, _start_mock): + desktop = FakeDesktop() + desktop.clipboard_text = "Initial" + daemon = self._build_daemon(desktop, FakeModel(text="instruction"), verbose=False) + + daemon.toggle_edit() + daemon.toggle() + + self.assertEqual(daemon.get_state(), aman.State.EDIT_RECORDING) + + @patch("aman.stop_audio_recording", return_value=FakeAudio(8)) + @patch("aman.start_audio_recording", return_value=(object(), object())) + def test_handle_cancel_closes_edit_session(self, _start_mock, _stop_mock): + desktop = FakeDesktop() + desktop.clipboard_text = "Initial" + daemon = self._build_daemon(desktop, FakeModel(text="instruction"), verbose=False) + + daemon.toggle_edit() + daemon.handle_cancel() + + self.assertTrue( + self._wait_until(lambda: daemon.get_state() == aman.State.IDLE), + "edit cancel did not reach idle state", + ) + self.assertFalse(desktop.popup_open) + class LockTests(unittest.TestCase): def test_lock_rejects_second_instance(self): diff --git a/tests/test_config.py b/tests/test_config.py index 1e6dd13..3c86413 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -19,6 +19,7 @@ class ConfigTests(unittest.TestCase): cfg = load(str(missing)) self.assertEqual(cfg.daemon.hotkey, "Cmd+m") + self.assertEqual(cfg.daemon.edit_hotkey, "Cmd+Shift+m") self.assertEqual(cfg.recording.input, "") self.assertEqual(cfg.stt.model, "base") self.assertEqual(cfg.stt.device, "cpu") @@ -33,7 +34,7 @@ class ConfigTests(unittest.TestCase): def test_loads_nested_config(self): payload = { - "daemon": {"hotkey": "Ctrl+space"}, + "daemon": {"hotkey": "Ctrl+space", "edit_hotkey": "Ctrl+Shift+space"}, "recording": {"input": 3}, "stt": {"model": "small", "device": "cuda"}, "injection": { @@ -55,6 +56,7 @@ class ConfigTests(unittest.TestCase): cfg = load(str(path)) self.assertEqual(cfg.daemon.hotkey, "Ctrl+space") + self.assertEqual(cfg.daemon.edit_hotkey, "Ctrl+Shift+space") self.assertEqual(cfg.recording.input, 3) self.assertEqual(cfg.stt.model, "small") self.assertEqual(cfg.stt.device, "cuda") @@ -66,7 +68,7 @@ class ConfigTests(unittest.TestCase): self.assertEqual(cfg.vocabulary.terms, ["Systemd", "Kubernetes"]) def test_super_modifier_hotkey_is_valid(self): - payload = {"daemon": {"hotkey": "Super+m"}} + payload = {"daemon": {"hotkey": "Super+m", "edit_hotkey": "Super+Shift+m"}} with tempfile.TemporaryDirectory() as td: path = Path(td) / "config.json" path.write_text(json.dumps(payload), encoding="utf-8") @@ -74,6 +76,7 @@ class ConfigTests(unittest.TestCase): cfg = load(str(path)) self.assertEqual(cfg.daemon.hotkey, "Super+m") + self.assertEqual(cfg.daemon.edit_hotkey, "Super+Shift+m") def test_invalid_hotkey_missing_key_raises(self): payload = {"daemon": {"hotkey": "Ctrl+Alt"}} @@ -95,6 +98,24 @@ class ConfigTests(unittest.TestCase): ): load(str(path)) + def test_invalid_edit_hotkey_raises(self): + payload = {"daemon": {"edit_hotkey": "Ctrl+Alt"}} + with tempfile.TemporaryDirectory() as td: + path = Path(td) / "config.json" + path.write_text(json.dumps(payload), encoding="utf-8") + + with self.assertRaisesRegex(ValueError, "daemon.edit_hotkey is invalid: missing key"): + load(str(path)) + + def test_equal_hotkeys_raise(self): + payload = {"daemon": {"hotkey": "Cmd+m", "edit_hotkey": "Cmd+m"}} + with tempfile.TemporaryDirectory() as td: + path = Path(td) / "config.json" + path.write_text(json.dumps(payload), encoding="utf-8") + + with self.assertRaisesRegex(ValueError, "must be different"): + load(str(path)) + def test_invalid_injection_backend_raises(self): payload = {"injection": {"backend": "invalid"}} with tempfile.TemporaryDirectory() as td: @@ -126,6 +147,7 @@ class ConfigTests(unittest.TestCase): cfg = load(str(path)) self.assertEqual(cfg.daemon.hotkey, "Cmd+m") + self.assertEqual(cfg.daemon.edit_hotkey, "Cmd+Shift+m") self.assertEqual(cfg.injection.backend, "clipboard") def test_conflicting_replacements_raise(self):