From 64c8c26bce2ae600a4bb3d72da62c6ddba340201 Mon Sep 17 00:00:00 2001 From: Thales Maciel Date: Thu, 26 Feb 2026 16:28:49 -0300 Subject: [PATCH] Scope Esc cancel listener to active recording --- README.md | 1 + src/aman.py | 19 ++++++++++++--- src/desktop.py | 3 +++ src/desktop_wayland.py | 3 +++ src/desktop_x11.py | 53 ++++++++++++++++++++++++++++++++++++++---- tests/test_aman.py | 33 ++++++++++++++++++++++++++ 6 files changed, 105 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index d0dcc13..573d4b0 100644 --- a/README.md +++ b/README.md @@ -127,6 +127,7 @@ systemctl --user enable --now aman - Press the hotkey once to start recording. - Press it again to stop and run STT. - Press `Esc` while recording to cancel without processing. +- `Esc` is only captured during active recording. - Transcript contents are logged only when `-v/--verbose` is used. Wayland note: diff --git a/src/aman.py b/src/aman.py index 3c14542..abb3f1b 100755 --- a/src/aman.py +++ b/src/aman.py @@ -77,6 +77,18 @@ class Daemon: self.vocabulary = VocabularyEngine(cfg.vocabulary) self._stt_hint_kwargs_cache: dict[str, Any] | None = None + def _arm_cancel_listener(self): + try: + self.desktop.start_cancel_listener(lambda: self.cancel_recording()) + except Exception as exc: + logging.error("failed to start cancel listener: %s", exc) + + def _disarm_cancel_listener(self): + try: + self.desktop.stop_cancel_listener() + except Exception as exc: + logging.debug("failed to stop cancel listener: %s", exc) + def set_state(self, state: str): with self.lock: prev = self.state @@ -84,7 +96,7 @@ class Daemon: if prev != state: logging.debug("state: %s -> %s", prev, state) else: - logging.warning("redundant state set: %s, kindly inform the dev", state) + logging.debug("redundant state set: %s", state) def get_state(self): with self.lock: @@ -123,6 +135,7 @@ class Daemon: prev = self.state self.state = State.RECORDING logging.debug("state: %s -> %s", prev, self.state) + self._arm_cancel_listener() logging.info("recording started") if self.timer: self.timer.cancel() @@ -150,6 +163,7 @@ class Daemon: if self.timer: self.timer.cancel() self.timer = None + self._disarm_cancel_listener() prev = self.state self.state = State.STT logging.debug("state: %s -> %s", prev, self.state) @@ -179,7 +193,6 @@ class Daemon: return try: - self.set_state(State.STT) logging.info("stt started") text = self._transcribe(audio) except Exception as exc: @@ -256,6 +269,7 @@ class Daemon: def shutdown(self, timeout: float = 5.0) -> bool: self.request_shutdown() + self._disarm_cancel_listener() self.stop_recording(trigger="shutdown", process_audio=False) return self.wait_for_idle(timeout) @@ -402,7 +416,6 @@ def main(): cfg.daemon.hotkey, lambda: logging.info("hotkey pressed (dry-run)") if args.dry_run else daemon.toggle(), ) - desktop.start_cancel_listener(lambda: daemon.cancel_recording()) except Exception as exc: logging.error("hotkey setup failed: %s", exc) raise SystemExit(1) diff --git a/src/desktop.py b/src/desktop.py index 23ac5f0..d378bc4 100644 --- a/src/desktop.py +++ b/src/desktop.py @@ -11,6 +11,9 @@ class DesktopAdapter(Protocol): def start_cancel_listener(self, callback: Callable[[], None]) -> None: raise NotImplementedError + def stop_cancel_listener(self) -> None: + raise NotImplementedError + def inject_text( self, text: str, diff --git a/src/desktop_wayland.py b/src/desktop_wayland.py index 1da88a8..8e194f2 100644 --- a/src/desktop_wayland.py +++ b/src/desktop_wayland.py @@ -10,6 +10,9 @@ class WaylandAdapter: def start_cancel_listener(self, _callback: Callable[[], None]) -> None: raise SystemExit("Wayland hotkeys are not supported yet.") + def stop_cancel_listener(self) -> None: + raise SystemExit("Wayland hotkeys are not supported yet.") + def inject_text( self, _text: str, diff --git a/src/desktop_x11.py b/src/desktop_x11.py index 483ad58..b19f6e4 100644 --- a/src/desktop_x11.py +++ b/src/desktop_x11.py @@ -42,6 +42,9 @@ class X11Adapter: self.indicator = None self.status_icon = None self.menu = None + self._cancel_listener_lock = threading.Lock() + self._cancel_listener_stop_event: threading.Event | None = None + self._cancel_listener_callback: Callable[[], None] | None = None if AppIndicator3 is not None: self.indicator = AppIndicator3.Indicator.new( "aman", @@ -72,9 +75,35 @@ class X11Adapter: def start_cancel_listener(self, callback: Callable[[], None]) -> None: mods, keysym = self._parse_hotkey("Escape") - thread = threading.Thread(target=self._listen, args=(mods, keysym, callback), daemon=True) + with self._cancel_listener_lock: + if self._cancel_listener_stop_event is not None: + self._cancel_listener_callback = callback + return + stop_event = threading.Event() + self._cancel_listener_stop_event = stop_event + self._cancel_listener_callback = callback + + thread = threading.Thread( + target=self._listen, + args=(mods, keysym, self._dispatch_cancel_listener, stop_event), + daemon=True, + ) thread.start() + def stop_cancel_listener(self) -> None: + with self._cancel_listener_lock: + stop_event = self._cancel_listener_stop_event + self._cancel_listener_stop_event = None + self._cancel_listener_callback = None + if stop_event is not None: + stop_event.set() + + def _dispatch_cancel_listener(self) -> None: + with self._cancel_listener_lock: + callback = self._cancel_listener_callback + if callback is not None: + callback() + def inject_text( self, text: str, @@ -126,7 +155,14 @@ class X11Adapter: finally: self.request_quit() - def _listen(self, mods: int, keysym: int, callback: Callable[[], None]) -> None: + def _listen( + self, + mods: int, + keysym: int, + callback: Callable[[], None], + stop_event: threading.Event | None = None, + ) -> None: + local_stop = stop_event or threading.Event() disp = None root = None keycode = None @@ -134,14 +170,18 @@ class X11Adapter: disp = display.Display() root = disp.screen().root keycode = self._grab_hotkey(disp, root, mods, keysym) - while True: + while not local_stop.is_set(): + if disp.pending_events() == 0: + time.sleep(0.05) + continue ev = disp.next_event() if ev.type == X.KeyPress and ev.detail == keycode: state = ev.state & ~(X.LockMask | X.Mod2Mask) if state == mods: callback() except Exception as exc: - logging.error("hotkey listener stopped: %s", exc) + if not local_stop.is_set(): + logging.error("hotkey listener stopped: %s", exc) finally: if root is not None and keycode is not None and disp is not None: try: @@ -149,6 +189,11 @@ class X11Adapter: disp.sync() except Exception: pass + if disp is not None: + try: + disp.close() + except Exception: + pass def _parse_hotkey(self, hotkey: str): mods = 0 diff --git a/tests/test_aman.py b/tests/test_aman.py index d9a9867..50bbc3e 100644 --- a/tests/test_aman.py +++ b/tests/test_aman.py @@ -18,6 +18,17 @@ class FakeDesktop: def __init__(self): self.inject_calls = [] self.quit_calls = 0 + self.cancel_listener_start_calls = 0 + self.cancel_listener_stop_calls = 0 + self.cancel_listener_callback = None + + def start_cancel_listener(self, callback) -> None: + self.cancel_listener_start_calls += 1 + self.cancel_listener_callback = callback + + def stop_cancel_listener(self) -> None: + self.cancel_listener_stop_calls += 1 + self.cancel_listener_callback = None def inject_text( self, @@ -239,6 +250,28 @@ class DaemonTests(unittest.TestCase): any("DEBUG:root:state: idle -> recording" in line for line in logs.output) ) + @patch("aman.stop_audio_recording", return_value=FakeAudio(8)) + @patch("aman.start_audio_recording", return_value=(object(), object())) + def test_cancel_listener_armed_only_while_recording(self, _start_mock, _stop_mock): + desktop = FakeDesktop() + daemon = self._build_daemon(desktop, FakeModel(), verbose=False) + daemon._start_stop_worker = ( + lambda stream, record, trigger, process_audio: daemon._stop_and_process( + stream, record, trigger, process_audio + ) + ) + + self.assertIsNone(desktop.cancel_listener_callback) + daemon.toggle() + self.assertEqual(desktop.cancel_listener_start_calls, 1) + self.assertEqual(desktop.cancel_listener_stop_calls, 0) + self.assertIsNotNone(desktop.cancel_listener_callback) + + daemon.toggle() + self.assertEqual(desktop.cancel_listener_start_calls, 1) + self.assertEqual(desktop.cancel_listener_stop_calls, 1) + self.assertIsNone(desktop.cancel_listener_callback) + class LockTests(unittest.TestCase): def test_lock_rejects_second_instance(self):