aman/src/desktop_x11.py

314 lines
11 KiB
Python

from __future__ import annotations
import logging
import threading
import time
import warnings
from typing import Callable, Iterable
import gi
from Xlib import X, XK, display
from Xlib.ext import xtest
gi.require_version("Gdk", "3.0")
gi.require_version("Gtk", "3.0")
try:
gi.require_version("AppIndicator3", "0.1")
from gi.repository import AppIndicator3 # type: ignore[import-not-found]
except (ImportError, ValueError):
AppIndicator3 = None
from gi.repository import GLib, Gdk, Gtk # type: ignore[import-not-found]
from constants import ASSETS_DIR, TRAY_UPDATE_MS
from hotkey import split_hotkey
MOD_MAP = {
"shift": X.ShiftMask,
"ctrl": X.ControlMask,
"control": X.ControlMask,
"alt": X.Mod1Mask,
"mod1": X.Mod1Mask,
"super": X.Mod4Mask,
"mod4": X.Mod4Mask,
"cmd": X.Mod4Mask,
"command": X.Mod4Mask,
}
CLIPBOARD_RESTORE_DELAY_SEC = 0.15
class X11Adapter:
def __init__(self):
self.indicator = None
self.status_icon = None
self.menu = None
if AppIndicator3 is not None:
self.indicator = AppIndicator3.Indicator.new(
"aman",
self._icon_path("idle"),
AppIndicator3.IndicatorCategory.APPLICATION_STATUS,
)
self.indicator.set_status(AppIndicator3.IndicatorStatus.ACTIVE)
else:
logging.warning("AppIndicator3 unavailable; falling back to deprecated Gtk.StatusIcon")
warnings.filterwarnings(
"ignore",
message=".*Gtk.StatusIcon.*",
category=DeprecationWarning,
)
self.status_icon = Gtk.StatusIcon()
self.status_icon.set_visible(True)
self.status_icon.connect("popup-menu", self._on_tray_menu)
def _on_tray_menu(self, _icon, _button, _time):
if self.menu:
self.menu.popup(None, None, None, None, 0, _time)
def start_hotkey_listener(self, hotkey: str, callback: Callable[[], None]) -> None:
mods, keysym = self._parse_hotkey(hotkey)
self._validate_hotkey_registration(mods, keysym)
thread = threading.Thread(target=self._listen, args=(mods, keysym, callback), daemon=True)
thread.start()
def start_cancel_listener(self, callback: Callable[[], None]) -> None:
mods, keysym = self._parse_hotkey("Escape")
thread = threading.Thread(target=self._listen, args=(mods, keysym, callback), daemon=True)
thread.start()
def inject_text(
self,
text: str,
backend: str,
*,
remove_transcription_from_clipboard: bool = False,
) -> None:
backend = (backend or "").strip().lower()
if backend == "clipboard":
previous_clipboard = None
if remove_transcription_from_clipboard:
previous_clipboard = self._read_clipboard_text()
self._write_clipboard(text)
self._paste_clipboard()
if remove_transcription_from_clipboard:
time.sleep(CLIPBOARD_RESTORE_DELAY_SEC)
self._restore_clipboard_text(previous_clipboard)
return
if backend == "injection":
self._type_text(text)
return
raise ValueError(f"unknown injection backend: {backend}")
def _read_clipboard_text(self) -> str | None:
Gtk.init([])
clipboard = Gtk.Clipboard.get(Gdk.SELECTION_CLIPBOARD)
text = clipboard.wait_for_text()
return str(text) if text is not None else None
def run_tray(self, state_getter: Callable[[], str], on_quit: Callable[[], None]) -> None:
self.menu = Gtk.Menu()
quit_item = Gtk.MenuItem(label="Quit")
quit_item.connect("activate", lambda *_: self._handle_quit(on_quit))
self.menu.append(quit_item)
self.menu.show_all()
if self.indicator is not None:
self.indicator.set_menu(self.menu)
self._update_tray(state_getter)
GLib.timeout_add(TRAY_UPDATE_MS, self._update_tray, state_getter)
Gtk.main()
def request_quit(self) -> None:
GLib.idle_add(Gtk.main_quit)
def _handle_quit(self, on_quit: Callable[[], None]) -> None:
try:
on_quit()
finally:
self.request_quit()
def _listen(self, mods: int, keysym: int, callback: Callable[[], None]) -> None:
disp = None
root = None
keycode = None
try:
disp = display.Display()
root = disp.screen().root
keycode = self._grab_hotkey(disp, root, mods, keysym)
while True:
ev = disp.next_event()
if ev.type == X.KeyPress and ev.detail == keycode:
state = ev.state & ~(X.LockMask | X.Mod2Mask)
if state == mods:
callback()
except Exception as exc:
logging.error("hotkey listener stopped: %s", exc)
finally:
if root is not None and keycode is not None and disp is not None:
try:
root.ungrab_key(keycode, X.AnyModifier)
disp.sync()
except Exception:
pass
def _parse_hotkey(self, hotkey: str):
mods = 0
modifier_parts, key_part = split_hotkey(hotkey)
for modifier in modifier_parts:
mods |= MOD_MAP[modifier]
keysym = XK.string_to_keysym(key_part)
if keysym == 0 and len(key_part) == 1:
keysym = ord(key_part)
if keysym == 0:
raise ValueError(f"unsupported key: {key_part}")
return mods, keysym
def _validate_hotkey_registration(self, mods: int, keysym: int) -> None:
disp = None
root = None
keycode = None
try:
disp = display.Display()
root = disp.screen().root
keycode = self._grab_hotkey(disp, root, mods, keysym)
finally:
if root is not None and keycode is not None and disp is not None:
try:
root.ungrab_key(keycode, X.AnyModifier)
disp.sync()
except Exception:
pass
if disp is not None:
try:
disp.close()
except Exception:
pass
def _grab_hotkey(self, disp, root, mods, keysym):
keycode = disp.keysym_to_keycode(keysym)
if keycode == 0:
raise ValueError("hotkey is not available on this keyboard layout")
root.grab_key(keycode, mods, True, X.GrabModeAsync, X.GrabModeAsync)
root.grab_key(keycode, mods | X.LockMask, True, X.GrabModeAsync, X.GrabModeAsync)
root.grab_key(keycode, mods | X.Mod2Mask, True, X.GrabModeAsync, X.GrabModeAsync)
root.grab_key(keycode, mods | X.LockMask | X.Mod2Mask, True, X.GrabModeAsync, X.GrabModeAsync)
disp.sync()
return keycode
def _write_clipboard(self, text: str) -> None:
Gtk.init([])
clipboard = Gtk.Clipboard.get(Gdk.SELECTION_CLIPBOARD)
clipboard.set_text(text, -1)
clipboard.store()
while Gtk.events_pending():
Gtk.main_iteration()
def _restore_clipboard_text(self, text: str | None) -> None:
Gtk.init([])
clipboard = Gtk.Clipboard.get(Gdk.SELECTION_CLIPBOARD)
clipboard.set_text(text or "", -1)
clipboard.store()
while Gtk.events_pending():
Gtk.main_iteration()
def _paste_clipboard(self) -> None:
dpy = display.Display()
self._send_combo(dpy, ["Control_L", "Shift_L", "v"])
def _type_text(self, text: str) -> None:
if not text:
return
dpy = display.Display()
for ch in text:
if ch == "\n":
self._send_combo(dpy, ["Return"])
continue
keysym, needs_shift = self._keysym_for_char(ch)
if keysym is None:
continue
if needs_shift:
self._send_combo(dpy, ["Shift_L", keysym], already_keysym=True)
else:
self._send_combo(dpy, [keysym], already_keysym=True)
def _send_combo(self, dpy: display.Display, keys: Iterable[str | int], already_keysym: bool = False) -> None:
keycodes: list[int] = []
for key in keys:
keysym = key if already_keysym else XK.string_to_keysym(str(key))
if keysym == 0:
continue
keycode = dpy.keysym_to_keycode(int(keysym))
if keycode == 0:
continue
keycodes.append(keycode)
for code in keycodes:
xtest.fake_input(dpy, X.KeyPress, code)
for code in reversed(keycodes):
xtest.fake_input(dpy, X.KeyRelease, code)
dpy.flush()
def _keysym_for_char(self, ch: str) -> tuple[int | None, bool]:
if ch.isupper():
base = ch.lower()
keysym = XK.string_to_keysym(base)
return (keysym if keysym != 0 else None, True)
if ch in _SHIFTED:
keysym = XK.string_to_keysym(_SHIFTED[ch])
return (keysym if keysym != 0 else None, True)
if ch == " ":
return (XK.string_to_keysym("space"), False)
keysym = XK.string_to_keysym(ch)
return (keysym if keysym != 0 else None, False)
def _icon_path(self, state: str) -> str:
if state == "recording":
return str(ASSETS_DIR / "recording.png")
if state == "stt":
return str(ASSETS_DIR / "stt.png")
if state == "processing":
return str(ASSETS_DIR / "processing.png")
return str(ASSETS_DIR / "idle.png")
def _title(self, state: str) -> str:
if state == "recording":
return "Recording"
if state == "stt":
return "STT"
if state == "processing":
return "AI Processing"
return "Idle"
def _update_tray(self, state_getter: Callable[[], str]):
state = state_getter()
icon_path = self._icon_path(state)
if self.indicator is not None:
self.indicator.set_icon_full(icon_path, self._title(state))
self.indicator.set_label(self._title(state), "")
elif self.status_icon is not None:
self.status_icon.set_from_file(icon_path)
self.status_icon.set_tooltip_text(self._title(state))
return True
_SHIFTED = {
"!": "1",
"@": "2",
"#": "3",
"$": "4",
"%": "5",
"^": "6",
"&": "7",
"*": "8",
"(": "9",
")": "0",
"_": "-",
"+": "=",
"{": "[",
"}": "]",
"|": "\\",
":": ";",
"\"": "'",
"<": ",",
">": ".",
"?": "/",
}