"""X11 desktop adapter: tray icon, global hotkeys, and text injection."""

from __future__ import annotations

import logging
import threading
import time
import warnings
from typing import Callable, Iterable

import gi
from Xlib import X, XK, display
from Xlib.ext import xtest

gi.require_version("Gdk", "3.0")
gi.require_version("Gtk", "3.0")
try:
    gi.require_version("AppIndicator3", "0.1")
    from gi.repository import AppIndicator3  # type: ignore[import-not-found]
except ValueError:
    # require_version raises ValueError when the typelib is not installed;
    # X11Adapter then falls back to Gtk.StatusIcon.
    AppIndicator3 = None

from gi.repository import GLib, Gdk, Gtk  # type: ignore[import-not-found]

from constants import ASSETS_DIR, TRAY_UPDATE_MS


# Lower-cased modifier names accepted in hotkey strings -> X modifier masks.
MOD_MAP = {
    "shift": X.ShiftMask,
    "ctrl": X.ControlMask,
    "control": X.ControlMask,
    "alt": X.Mod1Mask,
    "mod1": X.Mod1Mask,
    "super": X.Mod4Mask,
    "mod4": X.Mod4Mask,
    "cmd": X.Mod4Mask,
    "command": X.Mod4Mask,
}

# Pause between sending the paste shortcut and restoring the previous
# clipboard text (presumably so the target application can read the
# clipboard first -- confirm before tuning).
CLIPBOARD_RESTORE_DELAY_SEC = 0.15


class X11Adapter:
    """Tray icon, global hotkey listeners, and text injection for X11.

    Uses AppIndicator3 for the tray when it is available and the deprecated
    ``Gtk.StatusIcon`` otherwise. Hotkeys are grabbed on the root window via
    python-xlib; text is injected either by pasting from the clipboard or by
    synthesising key events through the XTEST extension.
    """

    def __init__(self):
        """Create the tray indicator (or the StatusIcon fallback)."""
        self.indicator = None
        self.status_icon = None
        self.menu = None  # built later by run_tray()
        if AppIndicator3 is not None:
            self.indicator = AppIndicator3.Indicator.new(
                "aman",
                self._icon_path("idle"),
                AppIndicator3.IndicatorCategory.APPLICATION_STATUS,
            )
            self.indicator.set_status(AppIndicator3.IndicatorStatus.ACTIVE)
        else:
            logging.warning("AppIndicator3 unavailable; falling back to deprecated Gtk.StatusIcon")
            warnings.filterwarnings(
                "ignore",
                message=".*Gtk.StatusIcon.*",
                category=DeprecationWarning,
            )
            self.status_icon = Gtk.StatusIcon()
            self.status_icon.set_visible(True)
            self.status_icon.connect("popup-menu", self._on_tray_menu)

    def _on_tray_menu(self, _icon, _button, _time):
        """Show the tray menu in response to StatusIcon's ``popup-menu``."""
        if self.menu:
            self.menu.popup(None, None, None, None, 0, _time)

    def start_hotkey_listener(self, hotkey: str, callback: Callable[[], None]) -> None:
        """Start a daemon thread that calls *callback* whenever *hotkey* is pressed."""
        thread = threading.Thread(target=self._listen, args=(hotkey, callback), daemon=True)
        thread.start()

    def start_cancel_listener(self, callback: Callable[[], None]) -> None:
        """Start a daemon thread that calls *callback* whenever Escape is pressed."""
        thread = threading.Thread(target=self._listen, args=("Escape", callback), daemon=True)
        thread.start()

    def inject_text(
        self,
        text: str,
        backend: str,
        *,
        remove_transcription_from_clipboard: bool = False,
    ) -> None:
        """Deliver *text* to the focused application.

        Args:
            text: The text to inject.
            backend: ``"clipboard"`` (or empty/None) to copy the text and send
                the paste shortcut; ``"injection"`` to type it key by key.
                Matching is case-insensitive and ignores surrounding spaces.
            remove_transcription_from_clipboard: Clipboard backend only. When
                true, the previous clipboard text is read first and written
                back after ``CLIPBOARD_RESTORE_DELAY_SEC``.

        Raises:
            ValueError: If *backend* is not a known backend name.
        """
        backend = (backend or "").strip().lower()
        if backend in ("", "clipboard"):
            previous_clipboard = None
            if remove_transcription_from_clipboard:
                previous_clipboard = self._read_clipboard_text()
            self._write_clipboard(text)
            self._paste_clipboard()
            if remove_transcription_from_clipboard:
                time.sleep(CLIPBOARD_RESTORE_DELAY_SEC)
                self._restore_clipboard_text(previous_clipboard)
            return
        if backend == "injection":
            self._type_text(text)
            return
        raise ValueError(f"unknown injection backend: {backend}")

    def _read_clipboard_text(self) -> str | None:
        """Return the CLIPBOARD selection's text, or None if it has none."""
        Gtk.init([])
        clipboard = Gtk.Clipboard.get(Gdk.SELECTION_CLIPBOARD)
        text = clipboard.wait_for_text()
        return str(text) if text is not None else None

    def run_tray(self, state_getter: Callable[[], str], on_quit: Callable[[], None]) -> None:
        """Build the tray menu and run the GTK main loop (blocks until quit).

        Args:
            state_getter: Called every ``TRAY_UPDATE_MS`` milliseconds to get
                the state name used to pick the tray icon and title.
            on_quit: Called when the "Quit" menu item is activated, before the
                main loop is asked to stop.
        """
        self.menu = Gtk.Menu()
        quit_item = Gtk.MenuItem(label="Quit")
        quit_item.connect("activate", lambda *_: self._handle_quit(on_quit))
        self.menu.append(quit_item)
        self.menu.show_all()
        if self.indicator is not None:
            self.indicator.set_menu(self.menu)
        self._update_tray(state_getter)
        GLib.timeout_add(TRAY_UPDATE_MS, self._update_tray, state_getter)
        Gtk.main()

    def request_quit(self) -> None:
        """Schedule ``Gtk.main_quit`` on the GLib main loop."""
        GLib.idle_add(Gtk.main_quit)

    def _handle_quit(self, on_quit: Callable[[], None]) -> None:
        """Run *on_quit*, then request shutdown even if it raised."""
        try:
            on_quit()
        finally:
            self.request_quit()

    def _listen(self, hotkey: str, callback: Callable[[], None]) -> None:
        """Grab *hotkey* on the root window and dispatch presses to *callback*.

        Runs until an exception occurs (including one raised by *callback*);
        the error is logged and the grab and the display connection are
        released.
        """
        disp = None
        root = None
        keycode = None
        try:
            disp = display.Display()
            root = disp.screen().root
            mods, keysym = self._parse_hotkey(hotkey)
            keycode = self._grab_hotkey(disp, root, mods, keysym)
            while True:
                ev = disp.next_event()
                if ev.type == X.KeyPress and ev.detail == keycode:
                    # Ignore the lock modifiers (LockMask is Caps Lock; Mod2
                    # is typically Num Lock) when comparing modifier state.
                    state = ev.state & ~(X.LockMask | X.Mod2Mask)
                    if state == mods:
                        callback()
        except Exception as exc:
            logging.error("hotkey listener stopped: %s", exc)
        finally:
            if disp is not None:
                try:
                    if root is not None and keycode is not None:
                        root.ungrab_key(keycode, X.AnyModifier)
                        disp.sync()
                except Exception as exc:
                    # Best-effort cleanup: the connection may already be dead.
                    logging.debug("failed to release hotkey grab: %s", exc)
                finally:
                    # Do not leak the X connection once the listener is done.
                    try:
                        disp.close()
                    except Exception as exc:
                        logging.debug("failed to close X display: %s", exc)

    def _parse_hotkey(self, hotkey: str) -> tuple[int, int]:
        """Parse a ``"Mod+Mod+Key"`` string into ``(modifier_mask, keysym)``.

        Parts found in ``MOD_MAP`` (case-insensitively) are OR-ed into the
        mask; the last other part is taken as the key.

        Raises:
            ValueError: If no key part is present or the key is unsupported.
        """
        parts = [p.strip() for p in hotkey.split("+") if p.strip()]
        mods = 0
        key_part = None
        for p in parts:
            low = p.lower()
            if low in MOD_MAP:
                mods |= MOD_MAP[low]
            else:
                key_part = p
        if not key_part:
            raise ValueError("hotkey missing key")
        keysym = XK.string_to_keysym(key_part)
        if keysym == 0 and len(key_part) == 1:
            # Single characters without a named keysym: use the code point.
            keysym = ord(key_part)
        if keysym == 0:
            raise ValueError(f"unsupported key: {key_part}")
        return mods, keysym

    def _grab_hotkey(self, disp, root, mods, keysym):
        """Grab *keysym* with *mods* on *root* and return the grabbed keycode.

        The grab is registered four times so the hotkey also fires while the
        LockMask and/or Mod2Mask modifiers are active.

        Raises:
            ValueError: If the current keyboard mapping has no keycode for
                *keysym*.
        """
        keycode = disp.keysym_to_keycode(keysym)
        if keycode == 0:
            # Keycode 0 is AnyKey for XGrabKey; passing it on would grab
            # every key pressed with these modifiers.
            raise ValueError(f"no keycode for keysym: {keysym}")
        for lock_mask in (0, X.LockMask, X.Mod2Mask, X.LockMask | X.Mod2Mask):
            root.grab_key(keycode, mods | lock_mask, True, X.GrabModeAsync, X.GrabModeAsync)
        disp.sync()
        return keycode

    def _write_clipboard(self, text: str) -> None:
        """Set the CLIPBOARD selection to *text* and drain pending GTK events."""
        Gtk.init([])
        clipboard = Gtk.Clipboard.get(Gdk.SELECTION_CLIPBOARD)
        clipboard.set_text(text, -1)
        clipboard.store()
        while Gtk.events_pending():
            Gtk.main_iteration()

    def _restore_clipboard_text(self, text: str | None) -> None:
        """Set the CLIPBOARD selection back to *text* (``""`` when None)."""
        Gtk.init([])
        clipboard = Gtk.Clipboard.get(Gdk.SELECTION_CLIPBOARD)
        clipboard.set_text(text or "", -1)
        clipboard.store()
        while Gtk.events_pending():
            Gtk.main_iteration()

    def _paste_clipboard(self) -> None:
        """Send Control_L+Shift_L+v to the focused window via XTEST."""
        dpy = display.Display()
        try:
            self._send_combo(dpy, ["Control_L", "Shift_L", "v"])
            # Make sure the server has processed the fake events before the
            # connection goes away.
            dpy.sync()
        finally:
            dpy.close()

    def _type_text(self, text: str) -> None:
        """Type *text* one character at a time via XTEST.

        Newlines are sent as Return. Characters with no resolvable keysym are
        skipped.
        """
        if not text:
            return
        dpy = display.Display()
        try:
            for ch in text:
                if ch == "\n":
                    self._send_combo(dpy, ["Return"])
                    continue
                keysym, needs_shift = self._keysym_for_char(ch)
                if keysym is None:
                    continue
                if needs_shift:
                    self._send_combo(dpy, ["Shift_L", keysym], already_keysym=True)
                else:
                    self._send_combo(dpy, [keysym], already_keysym=True)
            dpy.sync()
        finally:
            dpy.close()

    def _send_combo(
        self,
        dpy: display.Display,
        keys: Iterable[str | int],
        already_keysym: bool = False,
    ) -> None:
        """Press *keys* in order, then release them in reverse order.

        Args:
            dpy: Open display connection used for the fake input.
            keys: Keysym names and/or numeric keysyms.
            already_keysym: When true, ``int`` items are used as keysyms
                directly. ``str`` items are always resolved by name, so a
                call may mix a named modifier with a numeric keysym.

        Keys that resolve to no keysym or no keycode are skipped.
        """
        keycodes: list[int] = []
        for key in keys:
            if already_keysym and isinstance(key, int):
                keysym = key
            else:
                keysym = XK.string_to_keysym(str(key))
            if keysym == 0:
                continue
            keycode = dpy.keysym_to_keycode(int(keysym))
            if keycode == 0:
                continue
            keycodes.append(keycode)
        for code in keycodes:
            xtest.fake_input(dpy, X.KeyPress, code)
        for code in reversed(keycodes):
            xtest.fake_input(dpy, X.KeyRelease, code)
        dpy.flush()

    def _resolve_keysym(self, name: str) -> int | None:
        """Return the keysym for a keysym name or a single printable ASCII char.

        ``XK.string_to_keysym`` only knows keysym *names* (``"minus"``, not
        ``"-"``). For printable ASCII the X11 keysym value equals the
        character code, so that is used as the fallback.
        """
        keysym = XK.string_to_keysym(name)
        if keysym == 0 and len(name) == 1 and 0x20 <= ord(name) <= 0x7E:
            keysym = ord(name)
        return keysym if keysym != 0 else None

    def _keysym_for_char(self, ch: str) -> tuple[int | None, bool]:
        """Map a character to ``(keysym, needs_shift)``.

        Uppercase letters and the characters in ``_SHIFTED`` resolve to their
        unshifted base key with ``needs_shift=True``. The keysym is None when
        the character cannot be resolved.
        NOTE(review): the ``_SHIFTED`` table looks like a US layout -- confirm.
        """
        if ch.isupper():
            return (self._resolve_keysym(ch.lower()), True)
        if ch in _SHIFTED:
            return (self._resolve_keysym(_SHIFTED[ch]), True)
        if ch == " ":
            return (XK.string_to_keysym("space"), False)
        return (self._resolve_keysym(ch), False)

    def _icon_path(self, state: str) -> str:
        """Return the tray icon path for *state* (idle icon for unknown states)."""
        if state == "recording":
            return str(ASSETS_DIR / "recording.png")
        if state == "stt":
            return str(ASSETS_DIR / "stt.png")
        if state == "processing":
            return str(ASSETS_DIR / "processing.png")
        return str(ASSETS_DIR / "idle.png")

    def _title(self, state: str) -> str:
        """Return the tray title for *state* (``"Idle"`` for unknown states)."""
        if state == "recording":
            return "Recording"
        if state == "stt":
            return "STT"
        if state == "processing":
            return "AI Processing"
        return "Idle"

    def _update_tray(self, state_getter: Callable[[], str]) -> bool:
        """Refresh the tray icon and title from ``state_getter()``.

        Always returns True so the GLib timeout that calls it keeps repeating.
        """
        state = state_getter()
        icon_path = self._icon_path(state)
        if self.indicator is not None:
            self.indicator.set_icon_full(icon_path, self._title(state))
            self.indicator.set_label(self._title(state), "")
        elif self.status_icon is not None:
            self.status_icon.set_from_file(icon_path)
            self.status_icon.set_tooltip_text(self._title(state))
        return True


# Shifted character -> the unshifted key that produces it together with Shift.
_SHIFTED = {
    "!": "1",
    "@": "2",
    "#": "3",
    "$": "4",
    "%": "5",
    "^": "6",
    "&": "7",
    "*": "8",
    "(": "9",
    ")": "0",
    "_": "-",
    "+": "=",
    "{": "[",
    "}": "]",
    "|": "\\",
    ":": ";",
    "\"": "'",
    "<": ",",
    ">": ".",
    "?": "/",
    "~": "`",
}