from __future__ import annotations import logging import threading import time import warnings from typing import Any, Callable, Iterable import gi from Xlib import X, XK, display from Xlib.ext import xtest gi.require_version("Gdk", "3.0") gi.require_version("Gtk", "3.0") try: gi.require_version("AppIndicator3", "0.1") from gi.repository import AppIndicator3 # type: ignore[import-not-found] except (ImportError, ValueError): AppIndicator3 = None from gi.repository import GLib, Gdk, Gtk # type: ignore[import-not-found] from constants import ASSETS_DIR, TRAY_UPDATE_MS from hotkey import split_hotkey MOD_MAP = { "shift": X.ShiftMask, "ctrl": X.ControlMask, "control": X.ControlMask, "alt": X.Mod1Mask, "mod1": X.Mod1Mask, "super": X.Mod4Mask, "mod4": X.Mod4Mask, "cmd": X.Mod4Mask, "command": X.Mod4Mask, } CLIPBOARD_RESTORE_DELAY_SEC = 0.15 class X11Adapter: def __init__(self): self.indicator = None self.status_icon = None self.menu = None self._edit_window = None self._edit_text_view = None self._edit_text_buffer = None self._edit_status_label = None self._edit_callbacks: dict[str, Callable[[], None]] = {} self._edit_previous_focus_window_id: int | None = None self._cancel_listener_lock = threading.Lock() self._cancel_listener_stop_event: threading.Event | None = None self._cancel_listener_callback: Callable[[], None] | None = None if AppIndicator3 is not None: self.indicator = AppIndicator3.Indicator.new( "aman", self._icon_path("idle"), AppIndicator3.IndicatorCategory.APPLICATION_STATUS, ) self.indicator.set_status(AppIndicator3.IndicatorStatus.ACTIVE) else: logging.warning("AppIndicator3 unavailable; falling back to deprecated Gtk.StatusIcon") warnings.filterwarnings( "ignore", message=".*Gtk.StatusIcon.*", category=DeprecationWarning, ) self.status_icon = Gtk.StatusIcon() self.status_icon.set_visible(True) self.status_icon.connect("popup-menu", self._on_tray_menu) def _on_tray_menu(self, _icon, _button, _time): if self.menu: self.menu.popup(None, None, None, None, 0, _time) def start_hotkey_listener(self, hotkey: str, callback: Callable[[], None]) -> None: mods, keysym = self._parse_hotkey(hotkey) self._validate_hotkey_registration(mods, keysym) thread = threading.Thread(target=self._listen, args=(mods, keysym, callback), daemon=True) thread.start() def start_cancel_listener(self, callback: Callable[[], None]) -> None: mods, keysym = self._parse_hotkey("Escape") with self._cancel_listener_lock: if self._cancel_listener_stop_event is not None: self._cancel_listener_callback = callback return self._cancel_listener_callback = callback stop_event = threading.Event() self._cancel_listener_stop_event = stop_event thread = threading.Thread( target=self._listen, args=(mods, keysym, self._dispatch_cancel_listener, stop_event), daemon=True, ) thread.start() def stop_cancel_listener(self) -> None: stop_event = None with self._cancel_listener_lock: stop_event = self._cancel_listener_stop_event self._cancel_listener_stop_event = None self._cancel_listener_callback = None if stop_event is not None: stop_event.set() def _dispatch_cancel_listener(self) -> None: callback = None with self._cancel_listener_lock: callback = self._cancel_listener_callback if callback is not None: callback() def inject_text( self, text: str, backend: str, *, remove_transcription_from_clipboard: bool = False, ) -> None: backend = (backend or "").strip().lower() if backend == "clipboard": previous_clipboard = None if remove_transcription_from_clipboard: previous_clipboard = self.read_clipboard_text() self.write_clipboard_text(text) self._paste_clipboard() if remove_transcription_from_clipboard: time.sleep(CLIPBOARD_RESTORE_DELAY_SEC) self._set_clipboard_text(previous_clipboard or "") return if backend == "injection": self._type_text(text) return raise ValueError(f"unknown injection backend: {backend}") def read_clipboard_text(self) -> str | None: return self._run_on_ui_thread(self._read_clipboard_text_ui) def write_clipboard_text(self, text: str) -> None: self._run_on_ui_thread(lambda: self._set_clipboard_text(text)) def open_edit_popup( self, initial_text: str, *, on_submit: Callable[[], None], on_copy: Callable[[], None], on_cancel: Callable[[], None], ) -> None: self._run_on_ui_thread( lambda: self._open_edit_popup_ui( initial_text, on_submit=on_submit, on_copy=on_copy, on_cancel=on_cancel, ) ) def close_edit_popup(self) -> None: self._run_on_ui_thread(self._close_edit_popup_ui) def get_edit_popup_text(self) -> str: return self._run_on_ui_thread(self._get_edit_popup_text_ui) def set_edit_popup_text(self, text: str) -> None: self._run_on_ui_thread(lambda: self._set_edit_popup_text_ui(text)) def set_edit_popup_status(self, status: str) -> None: self._run_on_ui_thread(lambda: self._set_edit_popup_status_ui(status)) def restore_previous_focus(self) -> bool: window_id = self._edit_previous_focus_window_id if window_id is None: return False try: dpy = display.Display() window = dpy.create_resource_object("window", window_id) window.set_input_focus(X.RevertToParent, X.CurrentTime) dpy.sync() dpy.close() return True except Exception as exc: logging.warning("focus restore failed: %s", exc) return False def _open_edit_popup_ui( self, initial_text: str, *, on_submit: Callable[[], None], on_copy: Callable[[], None], on_cancel: Callable[[], None], ) -> None: if self._edit_window is not None: raise RuntimeError("edit popup is already open") self._edit_previous_focus_window_id = self._current_focus_window_id() self._edit_callbacks = { "submit": on_submit, "copy": on_copy, "cancel": on_cancel, } window = Gtk.Window(type=Gtk.WindowType.TOPLEVEL) window.set_title("Aman Editor") window.set_default_size(900, 520) window.set_position(Gtk.WindowPosition.CENTER) window.set_type_hint(Gdk.WindowTypeHint.UTILITY) window.set_skip_taskbar_hint(True) window.set_skip_pager_hint(True) window.set_keep_above(True) window.connect("key-press-event", self._on_edit_key_press) window.connect("delete-event", self._on_edit_delete_event) container = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=8) container.set_border_width(12) window.add(container) status_label = Gtk.Label(label="Recording...") status_label.set_xalign(0.0) container.pack_start(status_label, False, False, 0) scrolled = Gtk.ScrolledWindow() scrolled.set_hexpand(True) scrolled.set_vexpand(True) container.pack_start(scrolled, True, True, 0) text_view = Gtk.TextView() text_view.set_wrap_mode(Gtk.WrapMode.WORD_CHAR) text_view.connect("key-press-event", self._on_edit_key_press) scrolled.add(text_view) text_buffer = text_view.get_buffer() text_buffer.set_text(initial_text or "") self._edit_window = window self._edit_text_view = text_view self._edit_text_buffer = text_buffer self._edit_status_label = status_label window.show_all() text_view.grab_focus() window.present() def _on_edit_delete_event(self, _widget, _event): self._invoke_edit_callback("cancel") return True def _on_edit_key_press(self, _widget, event): key = event.keyval state = event.state is_ctrl = bool(state & Gdk.ModifierType.CONTROL_MASK) if key == Gdk.KEY_Escape: self._invoke_edit_callback("cancel") return True if is_ctrl and key in (Gdk.KEY_c, Gdk.KEY_C): self._invoke_edit_callback("copy") return True if key in (Gdk.KEY_Return, Gdk.KEY_KP_Enter): self._invoke_edit_callback("submit") return True return False def _invoke_edit_callback(self, name: str) -> None: callback = self._edit_callbacks.get(name) if callback is None: return try: callback() except Exception as exc: logging.error("edit popup callback failed (%s): %s", name, exc) def _close_edit_popup_ui(self) -> None: if self._edit_window is not None: try: self._edit_window.destroy() except Exception: pass self._edit_window = None self._edit_text_view = None self._edit_text_buffer = None self._edit_status_label = None self._edit_callbacks = {} def _get_edit_popup_text_ui(self) -> str: buffer = self._edit_text_buffer if buffer is None: return "" start = buffer.get_start_iter() end = buffer.get_end_iter() return buffer.get_text(start, end, True) def _set_edit_popup_text_ui(self, text: str) -> None: buffer = self._edit_text_buffer if buffer is None: return buffer.set_text(text or "") def _set_edit_popup_status_ui(self, status: str) -> None: label = self._edit_status_label if label is None: return label.set_text(status or "") def _current_focus_window_id(self) -> int | None: try: dpy = display.Display() focused = dpy.get_input_focus().focus window_id = getattr(focused, "id", None) dpy.close() if isinstance(window_id, int) and window_id > 0: return window_id return None except Exception: return None def _run_on_ui_thread(self, fn: Callable[[], Any]) -> Any: if threading.current_thread() is threading.main_thread(): return fn() done = threading.Event() result: dict[str, Any] = {} def runner(): try: result["value"] = fn() except Exception as exc: result["error"] = exc finally: done.set() return False GLib.idle_add(runner) done.wait() error = result.get("error") if error is not None: raise error return result.get("value") def _read_clipboard_text_ui(self) -> str | None: Gtk.init([]) clipboard = Gtk.Clipboard.get(Gdk.SELECTION_CLIPBOARD) text = clipboard.wait_for_text() return str(text) if text is not None else None def _set_clipboard_text(self, text: str) -> None: Gtk.init([]) clipboard = Gtk.Clipboard.get(Gdk.SELECTION_CLIPBOARD) clipboard.set_text(text, -1) clipboard.store() while Gtk.events_pending(): Gtk.main_iteration() def run_tray(self, state_getter: Callable[[], str], on_quit: Callable[[], None]) -> None: self.menu = Gtk.Menu() quit_item = Gtk.MenuItem(label="Quit") quit_item.connect("activate", lambda *_: self._handle_quit(on_quit)) self.menu.append(quit_item) self.menu.show_all() if self.indicator is not None: self.indicator.set_menu(self.menu) self._update_tray(state_getter) GLib.timeout_add(TRAY_UPDATE_MS, self._update_tray, state_getter) Gtk.main() def request_quit(self) -> None: GLib.idle_add(Gtk.main_quit) def _handle_quit(self, on_quit: Callable[[], None]) -> None: try: on_quit() finally: self.request_quit() def _listen( self, mods: int, keysym: int, callback: Callable[[], None], stop_event: threading.Event | None = None, ) -> None: local_stop = stop_event or threading.Event() disp = None root = None keycode = None try: disp = display.Display() root = disp.screen().root keycode = self._grab_hotkey(disp, root, mods, keysym) while not local_stop.is_set(): if disp.pending_events() == 0: time.sleep(0.05) continue ev = disp.next_event() if ev.type == X.KeyPress and ev.detail == keycode: state = ev.state & ~(X.LockMask | X.Mod2Mask) if state == mods: callback() except Exception as exc: if not local_stop.is_set(): logging.error("hotkey listener stopped: %s", exc) finally: if root is not None and keycode is not None and disp is not None: try: root.ungrab_key(keycode, X.AnyModifier) disp.sync() except Exception: pass if disp is not None: try: disp.close() except Exception: pass def _parse_hotkey(self, hotkey: str): mods = 0 modifier_parts, key_part = split_hotkey(hotkey) for modifier in modifier_parts: mods |= MOD_MAP[modifier] keysym = XK.string_to_keysym(key_part) if keysym == 0 and len(key_part) == 1: keysym = ord(key_part) if keysym == 0: raise ValueError(f"unsupported key: {key_part}") return mods, keysym def _validate_hotkey_registration(self, mods: int, keysym: int) -> None: disp = None root = None keycode = None try: disp = display.Display() root = disp.screen().root keycode = self._grab_hotkey(disp, root, mods, keysym) finally: if root is not None and keycode is not None and disp is not None: try: root.ungrab_key(keycode, X.AnyModifier) disp.sync() except Exception: pass if disp is not None: try: disp.close() except Exception: pass def _grab_hotkey(self, disp, root, mods, keysym): keycode = disp.keysym_to_keycode(keysym) if keycode == 0: raise ValueError("hotkey is not available on this keyboard layout") root.grab_key(keycode, mods, True, X.GrabModeAsync, X.GrabModeAsync) root.grab_key(keycode, mods | X.LockMask, True, X.GrabModeAsync, X.GrabModeAsync) root.grab_key(keycode, mods | X.Mod2Mask, True, X.GrabModeAsync, X.GrabModeAsync) root.grab_key(keycode, mods | X.LockMask | X.Mod2Mask, True, X.GrabModeAsync, X.GrabModeAsync) disp.sync() return keycode def _paste_clipboard(self) -> None: dpy = display.Display() self._send_combo(dpy, ["Control_L", "Shift_L", "v"]) def _type_text(self, text: str) -> None: if not text: return dpy = display.Display() for ch in text: if ch == "\n": self._send_combo(dpy, ["Return"]) continue keysym, needs_shift = self._keysym_for_char(ch) if keysym is None: continue if needs_shift: self._send_combo(dpy, ["Shift_L", keysym], already_keysym=True) else: self._send_combo(dpy, [keysym], already_keysym=True) def _send_combo(self, dpy: display.Display, keys: Iterable[str | int], already_keysym: bool = False) -> None: keycodes: list[int] = [] for key in keys: keysym = key if already_keysym else XK.string_to_keysym(str(key)) if keysym == 0: continue keycode = dpy.keysym_to_keycode(int(keysym)) if keycode == 0: continue keycodes.append(keycode) for code in keycodes: xtest.fake_input(dpy, X.KeyPress, code) for code in reversed(keycodes): xtest.fake_input(dpy, X.KeyRelease, code) dpy.flush() def _keysym_for_char(self, ch: str) -> tuple[int | None, bool]: if ch.isupper(): base = ch.lower() keysym = XK.string_to_keysym(base) return (keysym if keysym != 0 else None, True) if ch in _SHIFTED: keysym = XK.string_to_keysym(_SHIFTED[ch]) return (keysym if keysym != 0 else None, True) if ch == " ": return (XK.string_to_keysym("space"), False) keysym = XK.string_to_keysym(ch) return (keysym if keysym != 0 else None, False) def _icon_path(self, state: str) -> str: if state in ("recording", "edit_recording"): return str(ASSETS_DIR / "recording.png") if state in ("stt", "edit_stt"): return str(ASSETS_DIR / "stt.png") if state in ("processing", "outputting", "edit_processing"): return str(ASSETS_DIR / "processing.png") return str(ASSETS_DIR / "idle.png") def _title(self, state: str) -> str: if state == "recording": return "Recording" if state == "stt": return "STT" if state == "processing": return "AI Processing" if state == "outputting": return "Outputting" if state == "edit_recording": return "Editing: Recording" if state == "edit_stt": return "Editing: STT" if state == "edit_processing": return "Editing: Processing" if state == "edit_idle": return "Editing" return "Idle" def _update_tray(self, state_getter: Callable[[], str]): state = state_getter() icon_path = self._icon_path(state) if self.indicator is not None: self.indicator.set_icon_full(icon_path, self._title(state)) self.indicator.set_label(self._title(state), "") elif self.status_icon is not None: self.status_icon.set_from_file(icon_path) self.status_icon.set_tooltip_text(self._title(state)) return True _SHIFTED = { "!": "1", "@": "2", "#": "3", "$": "4", "%": "5", "^": "6", "&": "7", "*": "8", "(": "9", ")": "0", "_": "-", "+": "=", "{": "[", "}": "]", "|": "\\", ":": ";", "\"": "'", "<": ",", ">": ".", "?": "/", }