diff --git a/README.md b/README.md index 7209358..d0dcc13 100644 --- a/README.md +++ b/README.md @@ -67,10 +67,7 @@ Create `~/.config/aman/config.json` (or let `aman` create it automatically on fi ```json { - "daemon": { - "hotkey": "Cmd+m", - "edit_hotkey": "Cmd+Shift+m" - }, + "daemon": { "hotkey": "Cmd+m" }, "recording": { "input": "0" }, "stt": { "model": "base", "device": "cpu" }, "injection": { @@ -94,7 +91,6 @@ Hotkey notes: - Use one key plus optional modifiers (for example `Cmd+m`, `Super+m`, `Ctrl+space`). - `Super` and `Cmd` are equivalent aliases for the same modifier. -- `daemon.hotkey` and `daemon.edit_hotkey` must be different. AI cleanup is always enabled and uses the locked local Llama-3.2-3B GGUF model downloaded to `~/.cache/aman/models/` during daemon initialization. @@ -131,21 +127,8 @@ systemctl --user enable --now aman - Press the hotkey once to start recording. - Press it again to stop and run STT. - Press `Esc` while recording to cancel without processing. -- `Esc` is only captured globally while dictation recording is active. - Transcript contents are logged only when `-v/--verbose` is used. -Edit mode: - -- Copy text to clipboard and press `daemon.edit_hotkey`. -- Aman opens an editable popup with the clipboard snapshot and immediately starts recording an instruction. -- If clipboard is empty, the popup opens with empty text so the first instruction can create content. -- Press `daemon.edit_hotkey` again to stop recording and apply the instruction. -- Repeat to iterate with more voice instructions. -- Press `Enter` to close the popup and inject the final text. -- Press `Ctrl+C` to copy final text to clipboard and close the popup (no injection). -- Press `Esc` to cancel the edit session completely. -- While edit mode is open, the normal dictation hotkey is ignored. - Wayland note: - Running under Wayland currently exits with a message explaining that it is not supported yet. diff --git a/config.example.json b/config.example.json index 9689318..b72cfc9 100644 --- a/config.example.json +++ b/config.example.json @@ -1,7 +1,6 @@ { "daemon": { - "hotkey": "Cmd+m", - "edit_hotkey": "Cmd+Shift+m" + "hotkey": "Cmd+m" }, "recording": { "input": "" @@ -36,5 +35,8 @@ "Kubernetes", "PostgreSQL" ] + }, + "domain_inference": { + "enabled": true } } diff --git a/src/aiprocess.py b/src/aiprocess.py index 2409527..11a92ba 100644 --- a/src/aiprocess.py +++ b/src/aiprocess.py @@ -36,20 +36,6 @@ SYSTEM_PROMPT = ( " - transcript=\"let's ask Bob, I mean Janice, let's ask Janice\" -> {\"cleaned_text\":\"let's ask Janice\"}\n" ) -EDIT_SYSTEM_PROMPT = ( - "You are an amanuensis editor working for a user.\n" - "You'll receive JSON with the current text and spoken editing instructions.\n" - "Rewrite the full text according to those instructions.\n\n" - "Rules:\n" - "- Apply the latest instruction while honoring prior instruction history.\n" - "- Keep unchanged portions intact unless instructions request broader changes.\n" - "- Do not invent facts or context.\n" - "- If a dictionary section exists, apply only the listed corrections.\n" - "- Keep dictionary spellings exactly as provided.\n" - "- Return ONLY valid JSON in this shape: {\"cleaned_text\": \"...\"}\n" - "- Do not wrap with markdown, tags, or extra keys.\n" -) - class LlamaProcessor: def __init__(self, verbose: bool = False): @@ -83,33 +69,9 @@ class LlamaProcessor: if cleaned_dictionary: request_payload["dictionary"] = cleaned_dictionary - return self._run_prompt(SYSTEM_PROMPT, request_payload) - - def process_edit( - self, - current_text: str, - latest_instruction: str, - instruction_history: list[str], - lang: str = "en", - *, - dictionary_context: str = "", - ) -> str: - request_payload: dict[str, Any] = { - "language": lang, - "current_text": current_text, - "latest_instruction": latest_instruction, - "instruction_history": instruction_history, - } - cleaned_dictionary = dictionary_context.strip() - if cleaned_dictionary: - request_payload["dictionary"] = cleaned_dictionary - - return self._run_prompt(EDIT_SYSTEM_PROMPT, request_payload) - - def _run_prompt(self, system_prompt: str, request_payload: dict[str, Any]) -> str: kwargs: dict[str, Any] = { "messages": [ - {"role": "system", "content": system_prompt}, + {"role": "system", "content": SYSTEM_PROMPT}, {"role": "user", "content": json.dumps(request_payload, ensure_ascii=False)}, ], "temperature": 0.0, diff --git a/src/aman.py b/src/aman.py old mode 100644 new mode 100755 index 7c47fe0..3c14542 --- a/src/aman.py +++ b/src/aman.py @@ -29,19 +29,8 @@ class State: STT = "stt" PROCESSING = "processing" OUTPUTTING = "outputting" - EDIT_IDLE = "edit_idle" - EDIT_RECORDING = "edit_recording" - EDIT_STT = "edit_stt" - EDIT_PROCESSING = "edit_processing" -EDIT_STATES = { - State.EDIT_IDLE, - State.EDIT_RECORDING, - State.EDIT_STT, - State.EDIT_PROCESSING, -} - _LOCK_HANDLE = None @@ -71,22 +60,12 @@ class Daemon: self.cfg = cfg self.desktop = desktop self.verbose = verbose - self.lock = threading.RLock() + self.lock = threading.Lock() self._shutdown_requested = threading.Event() self.state = State.IDLE - self.stream = None self.record = None self.timer: threading.Timer | None = None - - self.edit_stream = None - self.edit_record = None - self.edit_timer: threading.Timer | None = None - self.edit_active = False - self.edit_text = "" - self.edit_instruction_history: list[str] = [] - self.edit_session_token = 0 - self.model = _build_whisper_model( cfg.stt.model, cfg.stt.device, @@ -98,18 +77,6 @@ class Daemon: self.vocabulary = VocabularyEngine(cfg.vocabulary) self._stt_hint_kwargs_cache: dict[str, Any] | None = None - def _arm_cancel_listener_for_recording(self): - try: - self.desktop.start_cancel_listener(lambda: self.cancel_recording()) - except Exception as exc: - logging.error("failed to arm cancel listener: %s", exc) - - def _disarm_cancel_listener_for_recording(self): - try: - self.desktop.stop_cancel_listener() - except Exception as exc: - logging.debug("failed to disarm cancel listener: %s", exc) - def set_state(self, state: str): with self.lock: prev = self.state @@ -132,9 +99,6 @@ class Daemon: if self._shutdown_requested.is_set(): logging.info("shutdown in progress, trigger ignored") return - if self.edit_active: - logging.info("edit session active, dictate trigger ignored") - return if self.state == State.IDLE: self._start_recording_locked() return @@ -145,60 +109,10 @@ class Daemon: if should_stop: self.stop_recording(trigger="user") - def toggle_edit(self): - action = "" - token = 0 - with self.lock: - if self._shutdown_requested.is_set(): - logging.info("shutdown in progress, edit trigger ignored") - return - if self.edit_active: - token = self.edit_session_token - if self.state == State.EDIT_IDLE: - action = "start_recording" - elif self.state == State.EDIT_RECORDING: - action = "stop_recording" - else: - logging.info("edit session busy (%s), trigger ignored", self.state) - return - else: - if self.state != State.IDLE: - logging.info("busy (%s), edit trigger ignored", self.state) - return - self.edit_active = True - self.edit_session_token += 1 - token = self.edit_session_token - self.edit_instruction_history = [] - self.edit_text = "" - self.set_state(State.EDIT_IDLE) - action = "open_session" - - if action == "stop_recording": - self.stop_edit_recording(trigger="user") - return - if action == "start_recording": - self._start_edit_recording(token=token, trigger="user") - return - if action == "open_session": - self._open_edit_session(token) - - def handle_cancel(self): - with self.lock: - edit_active = self.edit_active - state = self.state - if edit_active: - self.cancel_edit_session() - return - if state == State.RECORDING: - self.cancel_recording() - def _start_recording_locked(self): if self.state != State.IDLE: logging.info("busy (%s), trigger ignored", self.state) return - if self.edit_active: - logging.info("edit session active, dictate trigger ignored") - return try: stream, record = start_audio_recording(self.cfg.recording.input) except Exception as exc: @@ -206,8 +120,9 @@ class Daemon: return self.stream = stream self.record = record - self.set_state(State.RECORDING) - self._arm_cancel_listener_for_recording() + prev = self.state + self.state = State.RECORDING + logging.debug("state: %s -> %s", prev, self.state) logging.info("recording started") if self.timer: self.timer.cancel() @@ -235,12 +150,13 @@ class Daemon: if self.timer: self.timer.cancel() self.timer = None - self._disarm_cancel_listener_for_recording() - self.set_state(State.STT) + prev = self.state + self.state = State.STT + logging.debug("state: %s -> %s", prev, self.state) if stream is None or record is None: logging.warning("recording resources are unavailable during stop") - self.set_state(State.IDLE) + self.state = State.IDLE return None return stream, record @@ -338,292 +254,8 @@ class Daemon: return self.stop_recording(trigger="cancel", process_audio=False) - def _open_edit_session(self, token: int): - initial_text = "" - try: - initial_text = self.desktop.read_clipboard_text() or "" - except Exception as exc: - logging.error("failed reading clipboard for edit session: %s", exc) - with self.lock: - if not self._edit_session_is_active_locked(token): - return - self.edit_text = initial_text - - try: - self.desktop.open_edit_popup( - initial_text, - on_submit=self.finalize_edit_session_inject, - on_copy=self.finalize_edit_session_copy, - on_cancel=self.cancel_edit_session, - ) - self._safe_set_edit_popup_status("Recording instruction...") - except Exception as exc: - logging.error("failed opening edit popup: %s", exc) - self._close_edit_session(close_popup=False) - return - - if not self._start_edit_recording(token=token, trigger="open"): - self._safe_set_edit_popup_status("Ready. Press edit hotkey to record.") - - def _start_edit_recording(self, *, token: int, trigger: str) -> bool: - with self.lock: - if not self._edit_session_is_active_locked(token): - return False - if self.state != State.EDIT_IDLE: - logging.info("edit session busy (%s), start ignored", self.state) - return False - try: - stream, record = start_audio_recording(self.cfg.recording.input) - except Exception as exc: - logging.error("edit record start failed: %s", exc) - return False - self.edit_stream = stream - self.edit_record = record - if self.edit_timer: - self.edit_timer.cancel() - self.edit_timer = threading.Timer(RECORD_TIMEOUT_SEC, self._timeout_edit_stop) - self.edit_timer.daemon = True - self.edit_timer.start() - self.set_state(State.EDIT_RECORDING) - self._safe_set_edit_popup_status("Recording instruction...") - logging.info("edit recording started (%s)", trigger) - return True - - def _timeout_edit_stop(self): - self.stop_edit_recording(trigger="timeout") - - def stop_edit_recording(self, *, trigger: str = "user", process_audio: bool = True): - payload = None - token = 0 - with self.lock: - if not self.edit_active or self.state != State.EDIT_RECORDING: - return - payload = (self.edit_stream, self.edit_record) - token = self.edit_session_token - self.edit_stream = None - self.edit_record = None - if self.edit_timer: - self.edit_timer.cancel() - self.edit_timer = None - self.set_state(State.EDIT_STT) - self._safe_set_edit_popup_status("Transcribing instruction...") - - stream, record = payload - if stream is None or record is None: - logging.warning("edit recording resources are unavailable during stop") - with self.lock: - if self._edit_session_is_active_locked(token): - self.set_state(State.EDIT_IDLE) - self._safe_set_edit_popup_status("Ready. Press edit hotkey to record.") - return - - threading.Thread( - target=self._edit_stop_and_process, - args=(stream, record, token, trigger, process_audio), - daemon=True, - ).start() - - def _edit_stop_and_process( - self, - stream: Any, - record: Any, - token: int, - trigger: str, - process_audio: bool, - ): - logging.info("stopping edit recording (%s)", trigger) - try: - audio = stop_audio_recording(stream, record) - except Exception as exc: - logging.error("edit record stop failed: %s", exc) - with self.lock: - if self._edit_session_is_active_locked(token): - self.set_state(State.EDIT_IDLE) - self._safe_set_edit_popup_status("Failed to stop recording.") - return - - if not process_audio or self._shutdown_requested.is_set(): - with self.lock: - if self._edit_session_is_active_locked(token): - self.set_state(State.EDIT_IDLE) - self._safe_set_edit_popup_status("Ready. Press edit hotkey to record.") - return - - if audio.size == 0: - logging.error("no audio captured for edit instruction") - with self.lock: - if self._edit_session_is_active_locked(token): - self.set_state(State.EDIT_IDLE) - self._safe_set_edit_popup_status("No audio captured. Record again.") - return - - try: - instruction = self._transcribe(audio).strip() - except Exception as exc: - logging.error("edit stt failed: %s", exc) - with self.lock: - if self._edit_session_is_active_locked(token): - self.set_state(State.EDIT_IDLE) - self._safe_set_edit_popup_status("STT failed. Record again.") - return - - if not instruction: - with self.lock: - if self._edit_session_is_active_locked(token): - self.set_state(State.EDIT_IDLE) - self._safe_set_edit_popup_status("No instruction heard. Record again.") - return - - if self.log_transcript: - logging.debug("edit instruction: %s", instruction) - else: - logging.info("edit instruction length: %d", len(instruction)) - - with self.lock: - if not self._edit_session_is_active_locked(token): - return - self.edit_instruction_history.append(instruction) - instruction_history = list(self.edit_instruction_history) - self.set_state(State.EDIT_PROCESSING) - self._safe_set_edit_popup_status("Applying instruction...") - - current_text = self._current_edit_text() - updated_text = current_text - try: - ai_text = self._get_ai_processor().process_edit( - current_text, - instruction, - instruction_history, - lang=STT_LANGUAGE, - dictionary_context=self.vocabulary.build_ai_dictionary_context(), - ) - if ai_text and ai_text.strip(): - updated_text = ai_text - except Exception as exc: - logging.error("edit process failed: %s", exc) - - updated_text = self.vocabulary.apply_deterministic_replacements(updated_text).strip() - - with self.lock: - if not self._edit_session_is_active_locked(token): - return - self.edit_text = updated_text - self.set_state(State.EDIT_IDLE) - self._safe_set_edit_popup_text(updated_text) - self._safe_set_edit_popup_status("Ready. Press edit hotkey to record.") - - def _current_edit_text(self) -> str: - try: - text = self.desktop.get_edit_popup_text() - except Exception: - with self.lock: - return self.edit_text - with self.lock: - self.edit_text = text - return text - - def finalize_edit_session_inject(self): - threading.Thread(target=self._finalize_edit_session_inject_worker, daemon=True).start() - - def _finalize_edit_session_inject_worker(self): - text = self._current_edit_text() - self._close_edit_session(close_popup=True) - - if self._shutdown_requested.is_set(): - return - - try: - self.desktop.restore_previous_focus() - except Exception as exc: - logging.warning("could not restore previous focus: %s", exc) - - try: - self.set_state(State.OUTPUTTING) - self.desktop.inject_text( - text, - self.cfg.injection.backend, - remove_transcription_from_clipboard=( - self.cfg.injection.remove_transcription_from_clipboard - ), - ) - except Exception as exc: - logging.error("edit output failed: %s", exc) - finally: - self.set_state(State.IDLE) - - def finalize_edit_session_copy(self): - threading.Thread(target=self._finalize_edit_session_copy_worker, daemon=True).start() - - def _finalize_edit_session_copy_worker(self): - text = self._current_edit_text() - self._close_edit_session(close_popup=True) - try: - self.desktop.write_clipboard_text(text) - except Exception as exc: - logging.error("failed to copy edited text to clipboard: %s", exc) - - def cancel_edit_session(self): - threading.Thread(target=self._cancel_edit_session_worker, daemon=True).start() - - def _cancel_edit_session_worker(self): - self._close_edit_session(close_popup=True) - - def _close_edit_session(self, *, close_popup: bool): - stream = None - record = None - with self.lock: - stream = self.edit_stream - record = self.edit_record - self.edit_stream = None - self.edit_record = None - if self.edit_timer: - self.edit_timer.cancel() - self.edit_timer = None - - self.edit_active = False - self.edit_session_token += 1 - self.edit_instruction_history = [] - self.edit_text = "" - if self.state in EDIT_STATES: - self.set_state(State.IDLE) - - if close_popup: - try: - self.desktop.close_edit_popup() - except Exception as exc: - logging.debug("failed closing edit popup: %s", exc) - - if stream is not None and record is not None: - try: - stop_audio_recording(stream, record) - except Exception: - pass - - def _edit_session_is_active_locked(self, token: int) -> bool: - return self.edit_active and self.edit_session_token == token - - def _safe_set_edit_popup_status(self, status: str): - with self.lock: - if not self.edit_active: - return - try: - self.desktop.set_edit_popup_status(status) - except Exception as exc: - logging.debug("failed setting edit popup status: %s", exc) - - def _safe_set_edit_popup_text(self, text: str): - with self.lock: - if not self.edit_active: - return - try: - self.desktop.set_edit_popup_text(text) - except Exception as exc: - logging.debug("failed setting edit popup text: %s", exc) - def shutdown(self, timeout: float = 5.0) -> bool: self.request_shutdown() - self._disarm_cancel_listener_for_recording() - self._close_edit_session(close_popup=True) self.stop_recording(trigger="shutdown", process_audio=False) return self.wait_for_idle(timeout) @@ -735,7 +367,6 @@ def main(): _LOCK_HANDLE = _lock_single_instance() logging.info("hotkey: %s", cfg.daemon.hotkey) - logging.info("edit hotkey: %s", cfg.daemon.edit_hotkey) logging.info( "config (%s):\n%s", args.config or str(Path.home() / ".config" / "aman" / "config.json"), @@ -769,16 +400,9 @@ def main(): try: desktop.start_hotkey_listener( cfg.daemon.hotkey, - lambda: logging.info("dictate hotkey pressed (dry-run)") - if args.dry_run - else daemon.toggle(), - ) - desktop.start_hotkey_listener( - cfg.daemon.edit_hotkey, - lambda: logging.info("edit hotkey pressed (dry-run)") - if args.dry_run - else daemon.toggle_edit(), + lambda: logging.info("hotkey pressed (dry-run)") if args.dry_run else daemon.toggle(), ) + desktop.start_cancel_listener(lambda: daemon.cancel_recording()) except Exception as exc: logging.error("hotkey setup failed: %s", exc) raise SystemExit(1) diff --git a/src/config.py b/src/config.py index ed93cf9..ab4d30a 100644 --- a/src/config.py +++ b/src/config.py @@ -10,7 +10,6 @@ from hotkey import split_hotkey DEFAULT_HOTKEY = "Cmd+m" -DEFAULT_EDIT_HOTKEY = "Cmd+Shift+m" DEFAULT_STT_MODEL = "base" DEFAULT_STT_DEVICE = "cpu" DEFAULT_INJECTION_BACKEND = "clipboard" @@ -21,7 +20,6 @@ WILDCARD_CHARS = set("*?[]{}") @dataclass class DaemonConfig: hotkey: str = DEFAULT_HOTKEY - edit_hotkey: str = DEFAULT_EDIT_HOTKEY @dataclass @@ -95,19 +93,6 @@ def validate(cfg: Config) -> None: split_hotkey(hotkey) except ValueError as exc: raise ValueError(f"daemon.hotkey is invalid: {exc}") from exc - cfg.daemon.hotkey = hotkey - - edit_hotkey = cfg.daemon.edit_hotkey.strip() - if not edit_hotkey: - raise ValueError("daemon.edit_hotkey cannot be empty") - try: - split_hotkey(edit_hotkey) - except ValueError as exc: - raise ValueError(f"daemon.edit_hotkey is invalid: {exc}") from exc - cfg.daemon.edit_hotkey = edit_hotkey - - if hotkey.casefold() == edit_hotkey.casefold(): - raise ValueError("daemon.hotkey and daemon.edit_hotkey must be different") if isinstance(cfg.recording.input, bool): raise ValueError("recording.input cannot be boolean") @@ -153,8 +138,6 @@ def _from_dict(data: dict[str, Any], cfg: Config) -> Config: if "hotkey" in daemon: cfg.daemon.hotkey = _as_nonempty_str(daemon["hotkey"], "daemon.hotkey") - if "edit_hotkey" in daemon: - cfg.daemon.edit_hotkey = _as_nonempty_str(daemon["edit_hotkey"], "daemon.edit_hotkey") if "input" in recording: cfg.recording.input = _as_recording_input(recording["input"]) if "model" in stt: diff --git a/src/desktop.py b/src/desktop.py index 3c4b782..23ac5f0 100644 --- a/src/desktop.py +++ b/src/desktop.py @@ -11,9 +11,6 @@ class DesktopAdapter(Protocol): def start_cancel_listener(self, callback: Callable[[], None]) -> None: raise NotImplementedError - def stop_cancel_listener(self) -> None: - raise NotImplementedError - def inject_text( self, text: str, @@ -23,37 +20,6 @@ class DesktopAdapter(Protocol): ) -> None: raise NotImplementedError - def read_clipboard_text(self) -> str | None: - raise NotImplementedError - - def write_clipboard_text(self, text: str) -> None: - raise NotImplementedError - - def open_edit_popup( - self, - initial_text: str, - *, - on_submit: Callable[[], None], - on_copy: Callable[[], None], - on_cancel: Callable[[], None], - ) -> None: - raise NotImplementedError - - def close_edit_popup(self) -> None: - raise NotImplementedError - - def get_edit_popup_text(self) -> str: - raise NotImplementedError - - def set_edit_popup_text(self, text: str) -> None: - raise NotImplementedError - - def set_edit_popup_status(self, status: str) -> None: - raise NotImplementedError - - def restore_previous_focus(self) -> bool: - raise NotImplementedError - def run_tray(self, state_getter: Callable[[], str], on_quit: Callable[[], None]) -> None: raise NotImplementedError diff --git a/src/desktop_wayland.py b/src/desktop_wayland.py index c061b76..1da88a8 100644 --- a/src/desktop_wayland.py +++ b/src/desktop_wayland.py @@ -10,9 +10,6 @@ class WaylandAdapter: def start_cancel_listener(self, _callback: Callable[[], None]) -> None: raise SystemExit("Wayland hotkeys are not supported yet.") - def stop_cancel_listener(self) -> None: - raise SystemExit("Wayland hotkeys are not supported yet.") - def inject_text( self, _text: str, @@ -23,38 +20,6 @@ class WaylandAdapter: _ = remove_transcription_from_clipboard raise SystemExit("Wayland text injection is not supported yet.") - def read_clipboard_text(self) -> str | None: - raise SystemExit("Wayland clipboard access is not supported yet.") - - def write_clipboard_text(self, _text: str) -> None: - raise SystemExit("Wayland clipboard access is not supported yet.") - - def open_edit_popup( - self, - _initial_text: str, - *, - on_submit: Callable[[], None], - on_copy: Callable[[], None], - on_cancel: Callable[[], None], - ) -> None: - _ = (on_submit, on_copy, on_cancel) - raise SystemExit("Wayland edit popup is not supported yet.") - - def close_edit_popup(self) -> None: - raise SystemExit("Wayland edit popup is not supported yet.") - - def get_edit_popup_text(self) -> str: - raise SystemExit("Wayland edit popup is not supported yet.") - - def set_edit_popup_text(self, _text: str) -> None: - raise SystemExit("Wayland edit popup is not supported yet.") - - def set_edit_popup_status(self, _status: str) -> None: - raise SystemExit("Wayland edit popup is not supported yet.") - - def restore_previous_focus(self) -> bool: - raise SystemExit("Wayland focus restoration is not supported yet.") - def run_tray(self, _state_getter: Callable[[], str], _on_quit: Callable[[], None]) -> None: raise SystemExit("Wayland tray support is not available yet.") diff --git a/src/desktop_x11.py b/src/desktop_x11.py index d35f0da..483ad58 100644 --- a/src/desktop_x11.py +++ b/src/desktop_x11.py @@ -4,7 +4,7 @@ import logging import threading import time import warnings -from typing import Any, Callable, Iterable +from typing import Callable, Iterable import gi from Xlib import X, XK, display @@ -42,15 +42,6 @@ class X11Adapter: self.indicator = None self.status_icon = None self.menu = None - self._edit_window = None - self._edit_text_view = None - self._edit_text_buffer = None - self._edit_status_label = None - self._edit_callbacks: dict[str, Callable[[], None]] = {} - self._edit_previous_focus_window_id: int | None = None - self._cancel_listener_lock = threading.Lock() - self._cancel_listener_stop_event: threading.Event | None = None - self._cancel_listener_callback: Callable[[], None] | None = None if AppIndicator3 is not None: self.indicator = AppIndicator3.Indicator.new( "aman", @@ -81,36 +72,9 @@ class X11Adapter: def start_cancel_listener(self, callback: Callable[[], None]) -> None: mods, keysym = self._parse_hotkey("Escape") - with self._cancel_listener_lock: - if self._cancel_listener_stop_event is not None: - self._cancel_listener_callback = callback - return - self._cancel_listener_callback = callback - stop_event = threading.Event() - self._cancel_listener_stop_event = stop_event - thread = threading.Thread( - target=self._listen, - args=(mods, keysym, self._dispatch_cancel_listener, stop_event), - daemon=True, - ) + thread = threading.Thread(target=self._listen, args=(mods, keysym, callback), daemon=True) thread.start() - def stop_cancel_listener(self) -> None: - stop_event = None - with self._cancel_listener_lock: - stop_event = self._cancel_listener_stop_event - self._cancel_listener_stop_event = None - self._cancel_listener_callback = None - if stop_event is not None: - stop_event.set() - - def _dispatch_cancel_listener(self) -> None: - callback = None - with self._cancel_listener_lock: - callback = self._cancel_listener_callback - if callback is not None: - callback() - def inject_text( self, text: str, @@ -122,240 +86,24 @@ class X11Adapter: if backend == "clipboard": previous_clipboard = None if remove_transcription_from_clipboard: - previous_clipboard = self.read_clipboard_text() - self.write_clipboard_text(text) + previous_clipboard = self._read_clipboard_text() + self._write_clipboard(text) self._paste_clipboard() if remove_transcription_from_clipboard: time.sleep(CLIPBOARD_RESTORE_DELAY_SEC) - self._set_clipboard_text(previous_clipboard or "") + self._restore_clipboard_text(previous_clipboard) return if backend == "injection": self._type_text(text) return raise ValueError(f"unknown injection backend: {backend}") - def read_clipboard_text(self) -> str | None: - return self._run_on_ui_thread(self._read_clipboard_text_ui) - - def write_clipboard_text(self, text: str) -> None: - self._run_on_ui_thread(lambda: self._set_clipboard_text(text)) - - def open_edit_popup( - self, - initial_text: str, - *, - on_submit: Callable[[], None], - on_copy: Callable[[], None], - on_cancel: Callable[[], None], - ) -> None: - self._run_on_ui_thread( - lambda: self._open_edit_popup_ui( - initial_text, - on_submit=on_submit, - on_copy=on_copy, - on_cancel=on_cancel, - ) - ) - - def close_edit_popup(self) -> None: - self._run_on_ui_thread(self._close_edit_popup_ui) - - def get_edit_popup_text(self) -> str: - return self._run_on_ui_thread(self._get_edit_popup_text_ui) - - def set_edit_popup_text(self, text: str) -> None: - self._run_on_ui_thread(lambda: self._set_edit_popup_text_ui(text)) - - def set_edit_popup_status(self, status: str) -> None: - self._run_on_ui_thread(lambda: self._set_edit_popup_status_ui(status)) - - def restore_previous_focus(self) -> bool: - window_id = self._edit_previous_focus_window_id - if window_id is None: - return False - try: - dpy = display.Display() - window = dpy.create_resource_object("window", window_id) - window.set_input_focus(X.RevertToParent, X.CurrentTime) - dpy.sync() - dpy.close() - return True - except Exception as exc: - logging.warning("focus restore failed: %s", exc) - return False - - def _open_edit_popup_ui( - self, - initial_text: str, - *, - on_submit: Callable[[], None], - on_copy: Callable[[], None], - on_cancel: Callable[[], None], - ) -> None: - if self._edit_window is not None: - raise RuntimeError("edit popup is already open") - - self._edit_previous_focus_window_id = self._current_focus_window_id() - self._edit_callbacks = { - "submit": on_submit, - "copy": on_copy, - "cancel": on_cancel, - } - - window = Gtk.Window(type=Gtk.WindowType.TOPLEVEL) - window.set_title("Aman Editor") - window.set_default_size(900, 520) - window.set_position(Gtk.WindowPosition.CENTER) - window.set_type_hint(Gdk.WindowTypeHint.UTILITY) - window.set_skip_taskbar_hint(True) - window.set_skip_pager_hint(True) - window.set_keep_above(True) - window.connect("key-press-event", self._on_edit_key_press) - window.connect("delete-event", self._on_edit_delete_event) - - container = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=8) - container.set_border_width(12) - window.add(container) - - status_label = Gtk.Label(label="Recording...") - status_label.set_xalign(0.0) - container.pack_start(status_label, False, False, 0) - - scrolled = Gtk.ScrolledWindow() - scrolled.set_hexpand(True) - scrolled.set_vexpand(True) - container.pack_start(scrolled, True, True, 0) - - text_view = Gtk.TextView() - text_view.set_wrap_mode(Gtk.WrapMode.WORD_CHAR) - text_view.connect("key-press-event", self._on_edit_key_press) - scrolled.add(text_view) - - text_buffer = text_view.get_buffer() - text_buffer.set_text(initial_text or "") - - self._edit_window = window - self._edit_text_view = text_view - self._edit_text_buffer = text_buffer - self._edit_status_label = status_label - - window.show_all() - text_view.grab_focus() - window.present() - - def _on_edit_delete_event(self, _widget, _event): - self._invoke_edit_callback("cancel") - return True - - def _on_edit_key_press(self, _widget, event): - key = event.keyval - state = event.state - is_ctrl = bool(state & Gdk.ModifierType.CONTROL_MASK) - - if key == Gdk.KEY_Escape: - self._invoke_edit_callback("cancel") - return True - - if is_ctrl and key in (Gdk.KEY_c, Gdk.KEY_C): - self._invoke_edit_callback("copy") - return True - - if key in (Gdk.KEY_Return, Gdk.KEY_KP_Enter): - self._invoke_edit_callback("submit") - return True - - return False - - def _invoke_edit_callback(self, name: str) -> None: - callback = self._edit_callbacks.get(name) - if callback is None: - return - try: - callback() - except Exception as exc: - logging.error("edit popup callback failed (%s): %s", name, exc) - - def _close_edit_popup_ui(self) -> None: - if self._edit_window is not None: - try: - self._edit_window.destroy() - except Exception: - pass - self._edit_window = None - self._edit_text_view = None - self._edit_text_buffer = None - self._edit_status_label = None - self._edit_callbacks = {} - - def _get_edit_popup_text_ui(self) -> str: - buffer = self._edit_text_buffer - if buffer is None: - return "" - start = buffer.get_start_iter() - end = buffer.get_end_iter() - return buffer.get_text(start, end, True) - - def _set_edit_popup_text_ui(self, text: str) -> None: - buffer = self._edit_text_buffer - if buffer is None: - return - buffer.set_text(text or "") - - def _set_edit_popup_status_ui(self, status: str) -> None: - label = self._edit_status_label - if label is None: - return - label.set_text(status or "") - - def _current_focus_window_id(self) -> int | None: - try: - dpy = display.Display() - focused = dpy.get_input_focus().focus - window_id = getattr(focused, "id", None) - dpy.close() - if isinstance(window_id, int) and window_id > 0: - return window_id - return None - except Exception: - return None - - def _run_on_ui_thread(self, fn: Callable[[], Any]) -> Any: - if threading.current_thread() is threading.main_thread(): - return fn() - - done = threading.Event() - result: dict[str, Any] = {} - - def runner(): - try: - result["value"] = fn() - except Exception as exc: - result["error"] = exc - finally: - done.set() - return False - - GLib.idle_add(runner) - done.wait() - error = result.get("error") - if error is not None: - raise error - return result.get("value") - - def _read_clipboard_text_ui(self) -> str | None: + def _read_clipboard_text(self) -> str | None: Gtk.init([]) clipboard = Gtk.Clipboard.get(Gdk.SELECTION_CLIPBOARD) text = clipboard.wait_for_text() return str(text) if text is not None else None - def _set_clipboard_text(self, text: str) -> None: - Gtk.init([]) - clipboard = Gtk.Clipboard.get(Gdk.SELECTION_CLIPBOARD) - clipboard.set_text(text, -1) - clipboard.store() - while Gtk.events_pending(): - Gtk.main_iteration() - def run_tray(self, state_getter: Callable[[], str], on_quit: Callable[[], None]) -> None: self.menu = Gtk.Menu() quit_item = Gtk.MenuItem(label="Quit") @@ -378,14 +126,7 @@ class X11Adapter: finally: self.request_quit() - def _listen( - self, - mods: int, - keysym: int, - callback: Callable[[], None], - stop_event: threading.Event | None = None, - ) -> None: - local_stop = stop_event or threading.Event() + def _listen(self, mods: int, keysym: int, callback: Callable[[], None]) -> None: disp = None root = None keycode = None @@ -393,18 +134,14 @@ class X11Adapter: disp = display.Display() root = disp.screen().root keycode = self._grab_hotkey(disp, root, mods, keysym) - while not local_stop.is_set(): - if disp.pending_events() == 0: - time.sleep(0.05) - continue + while True: ev = disp.next_event() if ev.type == X.KeyPress and ev.detail == keycode: state = ev.state & ~(X.LockMask | X.Mod2Mask) if state == mods: callback() except Exception as exc: - if not local_stop.is_set(): - logging.error("hotkey listener stopped: %s", exc) + logging.error("hotkey listener stopped: %s", exc) finally: if root is not None and keycode is not None and disp is not None: try: @@ -412,11 +149,6 @@ class X11Adapter: disp.sync() except Exception: pass - if disp is not None: - try: - disp.close() - except Exception: - pass def _parse_hotkey(self, hotkey: str): mods = 0 @@ -463,6 +195,22 @@ class X11Adapter: disp.sync() return keycode + def _write_clipboard(self, text: str) -> None: + Gtk.init([]) + clipboard = Gtk.Clipboard.get(Gdk.SELECTION_CLIPBOARD) + clipboard.set_text(text, -1) + clipboard.store() + while Gtk.events_pending(): + Gtk.main_iteration() + + def _restore_clipboard_text(self, text: str | None) -> None: + Gtk.init([]) + clipboard = Gtk.Clipboard.get(Gdk.SELECTION_CLIPBOARD) + clipboard.set_text(text or "", -1) + clipboard.store() + while Gtk.events_pending(): + Gtk.main_iteration() + def _paste_clipboard(self) -> None: dpy = display.Display() self._send_combo(dpy, ["Control_L", "Shift_L", "v"]) @@ -513,11 +261,11 @@ class X11Adapter: return (keysym if keysym != 0 else None, False) def _icon_path(self, state: str) -> str: - if state in ("recording", "edit_recording"): + if state == "recording": return str(ASSETS_DIR / "recording.png") - if state in ("stt", "edit_stt"): + if state == "stt": return str(ASSETS_DIR / "stt.png") - if state in ("processing", "outputting", "edit_processing"): + if state == "processing": return str(ASSETS_DIR / "processing.png") return str(ASSETS_DIR / "idle.png") @@ -528,16 +276,6 @@ class X11Adapter: return "STT" if state == "processing": return "AI Processing" - if state == "outputting": - return "Outputting" - if state == "edit_recording": - return "Editing: Recording" - if state == "edit_stt": - return "Editing: STT" - if state == "edit_processing": - return "Editing: Processing" - if state == "edit_idle": - return "Editing" return "Idle" def _update_tray(self, state_getter: Callable[[], str]): diff --git a/tests/test_aman.py b/tests/test_aman.py index cd33ad7..d9a9867 100644 --- a/tests/test_aman.py +++ b/tests/test_aman.py @@ -1,7 +1,6 @@ import os import sys import tempfile -import time import unittest from pathlib import Path from unittest.mock import patch @@ -19,15 +18,6 @@ class FakeDesktop: def __init__(self): self.inject_calls = [] self.quit_calls = 0 - self.clipboard_text = "" - self.popup_open = False - self.popup_text = "" - self.popup_statuses = [] - self.popup_callbacks = {} - self.popup_close_calls = 0 - self.focus_restore_calls = 0 - self.cancel_listener_active = False - self.cancel_listener_callback = None def inject_text( self, @@ -38,53 +28,6 @@ class FakeDesktop: ) -> None: self.inject_calls.append((text, backend, remove_transcription_from_clipboard)) - def read_clipboard_text(self) -> str | None: - return self.clipboard_text - - def write_clipboard_text(self, text: str) -> None: - self.clipboard_text = text - - def open_edit_popup( - self, - initial_text: str, - *, - on_submit, - on_copy, - on_cancel, - ) -> None: - self.popup_open = True - self.popup_text = initial_text - self.popup_callbacks = { - "submit": on_submit, - "copy": on_copy, - "cancel": on_cancel, - } - - def close_edit_popup(self) -> None: - self.popup_open = False - self.popup_close_calls += 1 - - def get_edit_popup_text(self) -> str: - return self.popup_text - - def set_edit_popup_text(self, text: str) -> None: - self.popup_text = text - - def set_edit_popup_status(self, status: str) -> None: - self.popup_statuses.append(status) - - def restore_previous_focus(self) -> bool: - self.focus_restore_calls += 1 - return True - - def start_cancel_listener(self, callback) -> None: - self.cancel_listener_active = True - self.cancel_listener_callback = callback - - def stop_cancel_listener(self) -> None: - self.cancel_listener_active = False - self.cancel_listener_callback = None - def request_quit(self) -> None: self.quit_calls += 1 @@ -130,30 +73,9 @@ class FakeHintModel: class FakeAIProcessor: - def __init__(self): - self.edit_calls = [] - def process(self, text, lang="en", **_kwargs): return text - def process_edit( - self, - current_text, - latest_instruction, - instruction_history, - lang="en", - **_kwargs, - ): - self.edit_calls.append( - { - "current_text": current_text, - "latest_instruction": latest_instruction, - "instruction_history": list(instruction_history), - "lang": lang, - } - ) - return f"{current_text} [{latest_instruction}]" - class FakeAudio: def __init__(self, size: int): @@ -179,14 +101,6 @@ class DaemonTests(unittest.TestCase): ): return aman.Daemon(active_cfg, desktop, verbose=verbose) - def _wait_until(self, predicate, timeout: float = 1.0): - end = time.time() + timeout - while time.time() < end: - if predicate(): - return True - time.sleep(0.01) - return predicate() - @patch("aman.stop_audio_recording", return_value=FakeAudio(8)) @patch("aman.start_audio_recording", return_value=(object(), object())) def test_toggle_start_stop_injects_text(self, _start_mock, _stop_mock): @@ -325,120 +239,6 @@ class DaemonTests(unittest.TestCase): any("DEBUG:root:state: idle -> recording" in line for line in logs.output) ) - @patch("aman.stop_audio_recording", return_value=FakeAudio(8)) - @patch("aman.start_audio_recording", return_value=(object(), object())) - def test_escape_listener_is_only_armed_while_recording(self, _start_mock, _stop_mock): - desktop = FakeDesktop() - daemon = self._build_daemon(desktop, FakeModel(), verbose=False) - daemon._start_stop_worker = ( - lambda stream, record, trigger, process_audio: daemon._stop_and_process( - stream, record, trigger, process_audio - ) - ) - - self.assertFalse(desktop.cancel_listener_active) - daemon.toggle() - self.assertTrue(desktop.cancel_listener_active) - daemon.toggle() - self.assertFalse(desktop.cancel_listener_active) - - @patch("aman.start_audio_recording", return_value=(object(), object())) - def test_edit_mode_opens_popup_and_starts_recording(self, _start_mock): - desktop = FakeDesktop() - desktop.clipboard_text = "Hello team" - daemon = self._build_daemon(desktop, FakeModel(text="make it funnier"), verbose=False) - - daemon.toggle_edit() - - self.assertTrue(desktop.popup_open) - self.assertEqual(desktop.popup_text, "Hello team") - self.assertEqual(daemon.get_state(), aman.State.EDIT_RECORDING) - - @patch("aman.stop_audio_recording", return_value=FakeAudio(8)) - @patch("aman.start_audio_recording", return_value=(object(), object())) - def test_edit_mode_instruction_updates_popup_text(self, _start_mock, _stop_mock): - desktop = FakeDesktop() - desktop.clipboard_text = "Hello team" - daemon = self._build_daemon(desktop, FakeModel(text="make it funnier"), verbose=False) - - daemon.toggle_edit() - daemon.toggle_edit() - - self.assertTrue( - self._wait_until(lambda: daemon.get_state() == aman.State.EDIT_IDLE), - "edit mode did not return to EDIT_IDLE", - ) - self.assertEqual(desktop.popup_text, "Hello team [make it funnier]") - self.assertEqual(len(daemon.ai_processor.edit_calls), 1) - self.assertEqual( - daemon.ai_processor.edit_calls[0]["instruction_history"], - ["make it funnier"], - ) - - @patch("aman.stop_audio_recording", return_value=FakeAudio(8)) - @patch("aman.start_audio_recording", return_value=(object(), object())) - def test_enter_finalizes_and_injects(self, _start_mock, _stop_mock): - desktop = FakeDesktop() - desktop.clipboard_text = "Initial" - daemon = self._build_daemon(desktop, FakeModel(text="instruction"), verbose=False) - - daemon.toggle_edit() - desktop.popup_text = "Final text" - daemon.finalize_edit_session_inject() - - self.assertTrue( - self._wait_until(lambda: len(desktop.inject_calls) == 1), - "edit finalize did not inject text", - ) - self.assertFalse(desktop.popup_open) - self.assertEqual(desktop.inject_calls[0], ("Final text", "clipboard", False)) - self.assertEqual(desktop.focus_restore_calls, 1) - - @patch("aman.stop_audio_recording", return_value=FakeAudio(8)) - @patch("aman.start_audio_recording", return_value=(object(), object())) - def test_ctrl_c_copies_and_closes_without_inject(self, _start_mock, _stop_mock): - desktop = FakeDesktop() - desktop.clipboard_text = "Initial" - daemon = self._build_daemon(desktop, FakeModel(text="instruction"), verbose=False) - - daemon.toggle_edit() - desktop.popup_text = "Copied text" - daemon.finalize_edit_session_copy() - - self.assertTrue( - self._wait_until(lambda: not desktop.popup_open), - "edit popup did not close after copy", - ) - self.assertEqual(desktop.clipboard_text, "Copied text") - self.assertEqual(desktop.inject_calls, []) - - @patch("aman.start_audio_recording", return_value=(object(), object())) - def test_normal_hotkey_ignored_while_edit_session_active(self, _start_mock): - desktop = FakeDesktop() - desktop.clipboard_text = "Initial" - daemon = self._build_daemon(desktop, FakeModel(text="instruction"), verbose=False) - - daemon.toggle_edit() - daemon.toggle() - - self.assertEqual(daemon.get_state(), aman.State.EDIT_RECORDING) - - @patch("aman.stop_audio_recording", return_value=FakeAudio(8)) - @patch("aman.start_audio_recording", return_value=(object(), object())) - def test_handle_cancel_closes_edit_session(self, _start_mock, _stop_mock): - desktop = FakeDesktop() - desktop.clipboard_text = "Initial" - daemon = self._build_daemon(desktop, FakeModel(text="instruction"), verbose=False) - - daemon.toggle_edit() - daemon.handle_cancel() - - self.assertTrue( - self._wait_until(lambda: daemon.get_state() == aman.State.IDLE), - "edit cancel did not reach idle state", - ) - self.assertFalse(desktop.popup_open) - class LockTests(unittest.TestCase): def test_lock_rejects_second_instance(self): diff --git a/tests/test_config.py b/tests/test_config.py index 3c86413..1e6dd13 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -19,7 +19,6 @@ class ConfigTests(unittest.TestCase): cfg = load(str(missing)) self.assertEqual(cfg.daemon.hotkey, "Cmd+m") - self.assertEqual(cfg.daemon.edit_hotkey, "Cmd+Shift+m") self.assertEqual(cfg.recording.input, "") self.assertEqual(cfg.stt.model, "base") self.assertEqual(cfg.stt.device, "cpu") @@ -34,7 +33,7 @@ class ConfigTests(unittest.TestCase): def test_loads_nested_config(self): payload = { - "daemon": {"hotkey": "Ctrl+space", "edit_hotkey": "Ctrl+Shift+space"}, + "daemon": {"hotkey": "Ctrl+space"}, "recording": {"input": 3}, "stt": {"model": "small", "device": "cuda"}, "injection": { @@ -56,7 +55,6 @@ class ConfigTests(unittest.TestCase): cfg = load(str(path)) self.assertEqual(cfg.daemon.hotkey, "Ctrl+space") - self.assertEqual(cfg.daemon.edit_hotkey, "Ctrl+Shift+space") self.assertEqual(cfg.recording.input, 3) self.assertEqual(cfg.stt.model, "small") self.assertEqual(cfg.stt.device, "cuda") @@ -68,7 +66,7 @@ class ConfigTests(unittest.TestCase): self.assertEqual(cfg.vocabulary.terms, ["Systemd", "Kubernetes"]) def test_super_modifier_hotkey_is_valid(self): - payload = {"daemon": {"hotkey": "Super+m", "edit_hotkey": "Super+Shift+m"}} + payload = {"daemon": {"hotkey": "Super+m"}} with tempfile.TemporaryDirectory() as td: path = Path(td) / "config.json" path.write_text(json.dumps(payload), encoding="utf-8") @@ -76,7 +74,6 @@ class ConfigTests(unittest.TestCase): cfg = load(str(path)) self.assertEqual(cfg.daemon.hotkey, "Super+m") - self.assertEqual(cfg.daemon.edit_hotkey, "Super+Shift+m") def test_invalid_hotkey_missing_key_raises(self): payload = {"daemon": {"hotkey": "Ctrl+Alt"}} @@ -98,24 +95,6 @@ class ConfigTests(unittest.TestCase): ): load(str(path)) - def test_invalid_edit_hotkey_raises(self): - payload = {"daemon": {"edit_hotkey": "Ctrl+Alt"}} - with tempfile.TemporaryDirectory() as td: - path = Path(td) / "config.json" - path.write_text(json.dumps(payload), encoding="utf-8") - - with self.assertRaisesRegex(ValueError, "daemon.edit_hotkey is invalid: missing key"): - load(str(path)) - - def test_equal_hotkeys_raise(self): - payload = {"daemon": {"hotkey": "Cmd+m", "edit_hotkey": "Cmd+m"}} - with tempfile.TemporaryDirectory() as td: - path = Path(td) / "config.json" - path.write_text(json.dumps(payload), encoding="utf-8") - - with self.assertRaisesRegex(ValueError, "must be different"): - load(str(path)) - def test_invalid_injection_backend_raises(self): payload = {"injection": {"backend": "invalid"}} with tempfile.TemporaryDirectory() as td: @@ -147,7 +126,6 @@ class ConfigTests(unittest.TestCase): cfg = load(str(path)) self.assertEqual(cfg.daemon.hotkey, "Cmd+m") - self.assertEqual(cfg.daemon.edit_hotkey, "Cmd+Shift+m") self.assertEqual(cfg.injection.backend, "clipboard") def test_conflicting_replacements_raise(self):