Scope Esc cancel listener to active recording

This commit is contained in:
Thales Maciel 2026-02-26 16:28:49 -03:00
parent e5d709a393
commit 64c8c26bce
6 changed files with 105 additions and 7 deletions

View file

@ -127,6 +127,7 @@ systemctl --user enable --now aman
- Press the hotkey once to start recording. - Press the hotkey once to start recording.
- Press it again to stop and run STT. - Press it again to stop and run STT.
- Press `Esc` while recording to cancel without processing. - Press `Esc` while recording to cancel without processing.
- `Esc` is captured only while a recording is in progress.
- Transcript contents are logged only when `-v/--verbose` is used. - Transcript contents are logged only when `-v/--verbose` is used.
Wayland note: Wayland note:

View file

@ -77,6 +77,18 @@ class Daemon:
self.vocabulary = VocabularyEngine(cfg.vocabulary) self.vocabulary = VocabularyEngine(cfg.vocabulary)
self._stt_hint_kwargs_cache: dict[str, Any] | None = None self._stt_hint_kwargs_cache: dict[str, Any] | None = None
def _arm_cancel_listener(self):
    """Ask the desktop adapter to start its cancel listener.

    The callback handed to the adapter forwards to ``self.cancel_recording()``.
    Any ``Exception`` raised while starting the listener is logged at error
    level and swallowed, so this method itself does not raise for those.
    """

    def _on_cancel():
        # Resolve cancel_recording at call time, exactly as a lambda would.
        return self.cancel_recording()

    try:
        self.desktop.start_cancel_listener(_on_cancel)
    except Exception as err:
        logging.error("failed to start cancel listener: %s", err)
def _disarm_cancel_listener(self):
    """Ask the desktop adapter to stop its cancel listener.

    Any ``Exception`` raised by the adapter is logged at debug level and
    swallowed, so this method itself does not raise for those.
    """
    try:
        self.desktop.stop_cancel_listener()
    except Exception as err:
        logging.debug("failed to stop cancel listener: %s", err)
def set_state(self, state: str): def set_state(self, state: str):
with self.lock: with self.lock:
prev = self.state prev = self.state
@ -84,7 +96,7 @@ class Daemon:
if prev != state: if prev != state:
logging.debug("state: %s -> %s", prev, state) logging.debug("state: %s -> %s", prev, state)
else: else:
logging.warning("redundant state set: %s, kindly inform the dev", state) logging.debug("redundant state set: %s", state)
def get_state(self): def get_state(self):
with self.lock: with self.lock:
@ -123,6 +135,7 @@ class Daemon:
prev = self.state prev = self.state
self.state = State.RECORDING self.state = State.RECORDING
logging.debug("state: %s -> %s", prev, self.state) logging.debug("state: %s -> %s", prev, self.state)
self._arm_cancel_listener()
logging.info("recording started") logging.info("recording started")
if self.timer: if self.timer:
self.timer.cancel() self.timer.cancel()
@ -150,6 +163,7 @@ class Daemon:
if self.timer: if self.timer:
self.timer.cancel() self.timer.cancel()
self.timer = None self.timer = None
self._disarm_cancel_listener()
prev = self.state prev = self.state
self.state = State.STT self.state = State.STT
logging.debug("state: %s -> %s", prev, self.state) logging.debug("state: %s -> %s", prev, self.state)
@ -179,7 +193,6 @@ class Daemon:
return return
try: try:
self.set_state(State.STT)
logging.info("stt started") logging.info("stt started")
text = self._transcribe(audio) text = self._transcribe(audio)
except Exception as exc: except Exception as exc:
@ -256,6 +269,7 @@ class Daemon:
def shutdown(self, timeout: float = 5.0) -> bool: def shutdown(self, timeout: float = 5.0) -> bool:
self.request_shutdown() self.request_shutdown()
self._disarm_cancel_listener()
self.stop_recording(trigger="shutdown", process_audio=False) self.stop_recording(trigger="shutdown", process_audio=False)
return self.wait_for_idle(timeout) return self.wait_for_idle(timeout)
@ -402,7 +416,6 @@ def main():
cfg.daemon.hotkey, cfg.daemon.hotkey,
lambda: logging.info("hotkey pressed (dry-run)") if args.dry_run else daemon.toggle(), lambda: logging.info("hotkey pressed (dry-run)") if args.dry_run else daemon.toggle(),
) )
desktop.start_cancel_listener(lambda: daemon.cancel_recording())
except Exception as exc: except Exception as exc:
logging.error("hotkey setup failed: %s", exc) logging.error("hotkey setup failed: %s", exc)
raise SystemExit(1) raise SystemExit(1)

View file

@ -11,6 +11,9 @@ class DesktopAdapter(Protocol):
def start_cancel_listener(self, callback: Callable[[], None]) -> None: def start_cancel_listener(self, callback: Callable[[], None]) -> None:
raise NotImplementedError raise NotImplementedError
def stop_cancel_listener(self) -> None:
    """Stop the cancel listener (protocol stub).

    Raises:
        NotImplementedError: always; this stub has no implementation.
    """
    raise NotImplementedError
def inject_text( def inject_text(
self, self,
text: str, text: str,

View file

@ -10,6 +10,9 @@ class WaylandAdapter:
def start_cancel_listener(self, _callback: Callable[[], None]) -> None: def start_cancel_listener(self, _callback: Callable[[], None]) -> None:
raise SystemExit("Wayland hotkeys are not supported yet.") raise SystemExit("Wayland hotkeys are not supported yet.")
def stop_cancel_listener(self) -> None:
    """Reject the request: this adapter does not implement the cancel listener.

    Raises:
        SystemExit: always, carrying the "not supported yet" message.
    """
    message = "Wayland hotkeys are not supported yet."
    raise SystemExit(message)
def inject_text( def inject_text(
self, self,
_text: str, _text: str,

View file

@ -42,6 +42,9 @@ class X11Adapter:
self.indicator = None self.indicator = None
self.status_icon = None self.status_icon = None
self.menu = None self.menu = None
self._cancel_listener_lock = threading.Lock()
self._cancel_listener_stop_event: threading.Event | None = None
self._cancel_listener_callback: Callable[[], None] | None = None
if AppIndicator3 is not None: if AppIndicator3 is not None:
self.indicator = AppIndicator3.Indicator.new( self.indicator = AppIndicator3.Indicator.new(
"aman", "aman",
@ -72,9 +75,35 @@ class X11Adapter:
def start_cancel_listener(self, callback: Callable[[], None]) -> None: def start_cancel_listener(self, callback: Callable[[], None]) -> None:
mods, keysym = self._parse_hotkey("Escape") mods, keysym = self._parse_hotkey("Escape")
thread = threading.Thread(target=self._listen, args=(mods, keysym, callback), daemon=True) with self._cancel_listener_lock:
if self._cancel_listener_stop_event is not None:
self._cancel_listener_callback = callback
return
stop_event = threading.Event()
self._cancel_listener_stop_event = stop_event
self._cancel_listener_callback = callback
thread = threading.Thread(
target=self._listen,
args=(mods, keysym, self._dispatch_cancel_listener, stop_event),
daemon=True,
)
thread.start() thread.start()
def stop_cancel_listener(self) -> None:
    """Signal the running cancel listener (if any) to stop.

    Under the listener lock, the stored stop event and callback are both
    cleared. If a stop event was stored, it is then set. Calling this when
    no listener is registered is a no-op.
    """
    with self._cancel_listener_lock:
        pending_stop, self._cancel_listener_stop_event = (
            self._cancel_listener_stop_event,
            None,
        )
        self._cancel_listener_callback = None
    if pending_stop is None:
        return
    pending_stop.set()
def _dispatch_cancel_listener(self) -> None:
    """Invoke the currently registered cancel callback, if there is one.

    The callback reference is read under the listener lock, so a concurrent
    ``stop_cancel_listener`` either clears it before the read or has no
    effect on this dispatch.
    """
    with self._cancel_listener_lock:
        current = self._cancel_listener_callback
    # Call outside the lock: a callback that ends up in stop_cancel_listener
    # would otherwise block on the same non-reentrant lock.
    if current is None:
        return
    current()
def inject_text( def inject_text(
self, self,
text: str, text: str,
@ -126,7 +155,14 @@ class X11Adapter:
finally: finally:
self.request_quit() self.request_quit()
def _listen(self, mods: int, keysym: int, callback: Callable[[], None]) -> None: def _listen(
self,
mods: int,
keysym: int,
callback: Callable[[], None],
stop_event: threading.Event | None = None,
) -> None:
local_stop = stop_event or threading.Event()
disp = None disp = None
root = None root = None
keycode = None keycode = None
@ -134,14 +170,18 @@ class X11Adapter:
disp = display.Display() disp = display.Display()
root = disp.screen().root root = disp.screen().root
keycode = self._grab_hotkey(disp, root, mods, keysym) keycode = self._grab_hotkey(disp, root, mods, keysym)
while True: while not local_stop.is_set():
if disp.pending_events() == 0:
time.sleep(0.05)
continue
ev = disp.next_event() ev = disp.next_event()
if ev.type == X.KeyPress and ev.detail == keycode: if ev.type == X.KeyPress and ev.detail == keycode:
state = ev.state & ~(X.LockMask | X.Mod2Mask) state = ev.state & ~(X.LockMask | X.Mod2Mask)
if state == mods: if state == mods:
callback() callback()
except Exception as exc: except Exception as exc:
logging.error("hotkey listener stopped: %s", exc) if not local_stop.is_set():
logging.error("hotkey listener stopped: %s", exc)
finally: finally:
if root is not None and keycode is not None and disp is not None: if root is not None and keycode is not None and disp is not None:
try: try:
@ -149,6 +189,11 @@ class X11Adapter:
disp.sync() disp.sync()
except Exception: except Exception:
pass pass
if disp is not None:
try:
disp.close()
except Exception:
pass
def _parse_hotkey(self, hotkey: str): def _parse_hotkey(self, hotkey: str):
mods = 0 mods = 0

View file

@ -18,6 +18,17 @@ class FakeDesktop:
def __init__(self): def __init__(self):
self.inject_calls = [] self.inject_calls = []
self.quit_calls = 0 self.quit_calls = 0
self.cancel_listener_start_calls = 0
self.cancel_listener_stop_calls = 0
self.cancel_listener_callback = None
def start_cancel_listener(self, callback) -> None:
    """Test double: remember the callback and count the start request."""
    self.cancel_listener_callback = callback
    self.cancel_listener_start_calls += 1
def stop_cancel_listener(self) -> None:
    """Test double: forget the stored callback and count the stop request."""
    self.cancel_listener_callback = None
    self.cancel_listener_stop_calls += 1
def inject_text( def inject_text(
self, self,
@ -239,6 +250,28 @@ class DaemonTests(unittest.TestCase):
any("DEBUG:root:state: idle -> recording" in line for line in logs.output) any("DEBUG:root:state: idle -> recording" in line for line in logs.output)
) )
@patch("aman.stop_audio_recording", return_value=FakeAudio(8))
@patch("aman.start_audio_recording", return_value=(object(), object()))
def test_cancel_listener_armed_only_while_recording(self, _start_mock, _stop_mock):
    """The cancel listener is started by the first toggle and stopped by the second."""
    desktop = FakeDesktop()
    daemon = self._build_daemon(desktop, FakeModel(), verbose=False)

    def _run_stop_inline(stream, record, trigger, process_audio):
        # Replace the stop worker with a direct call to _stop_and_process so the
        # assertions below see its effects as soon as toggle() returns.
        return daemon._stop_and_process(stream, record, trigger, process_audio)

    daemon._start_stop_worker = _run_stop_inline

    # Before any toggle: no callback registered on the fake desktop.
    self.assertIsNone(desktop.cancel_listener_callback)

    # First toggle: one start call, no stop call, callback registered.
    daemon.toggle()
    self.assertEqual(
        (desktop.cancel_listener_start_calls, desktop.cancel_listener_stop_calls),
        (1, 0),
    )
    self.assertIsNotNone(desktop.cancel_listener_callback)

    # Second toggle: still one start call, now one stop call, callback cleared.
    daemon.toggle()
    self.assertEqual(
        (desktop.cancel_listener_start_calls, desktop.cancel_listener_stop_calls),
        (1, 1),
    )
    self.assertIsNone(desktop.cancel_listener_callback)
class LockTests(unittest.TestCase): class LockTests(unittest.TestCase):
def test_lock_rejects_second_instance(self): def test_lock_rejects_second_instance(self):