Add desktop adapters and extras
This commit is contained in:
parent
a83a843e1a
commit
fb1d0c07f9
10 changed files with 383 additions and 276 deletions
28
src/desktop.py
Normal file
28
src/desktop.py
Normal file
|
|
@ -0,0 +1,28 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from typing import Callable, Protocol
|
||||
|
||||
|
||||
class DesktopAdapter(Protocol):
|
||||
def start_hotkey_listener(self, hotkey: str, callback: Callable[[], None]) -> None:
|
||||
raise NotImplementedError
|
||||
|
||||
def inject_text(self, text: str, backend: str) -> None:
|
||||
raise NotImplementedError
|
||||
|
||||
def run_tray(self, state_getter: Callable[[], str], on_quit: Callable[[], None]) -> None:
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
def get_desktop_adapter() -> DesktopAdapter:
|
||||
session_type = os.getenv("XDG_SESSION_TYPE", "").lower()
|
||||
if session_type == "wayland" or os.getenv("WAYLAND_DISPLAY"):
|
||||
from desktop_wayland import WaylandAdapter
|
||||
|
||||
raise SystemExit(
|
||||
"Wayland is not supported yet. Run under X11 (XDG_SESSION_TYPE=x11) to use lel."
|
||||
)
|
||||
from desktop_x11 import X11Adapter
|
||||
|
||||
return X11Adapter()
|
||||
14
src/desktop_wayland.py
Normal file
14
src/desktop_wayland.py
Normal file
|
|
@ -0,0 +1,14 @@
|
|||
from __future__ import annotations
|
||||
|
||||
from typing import Callable
|
||||
|
||||
|
||||
class WaylandAdapter:
|
||||
def start_hotkey_listener(self, _hotkey: str, _callback: Callable[[], None]) -> None:
|
||||
raise SystemExit("Wayland hotkeys are not supported yet.")
|
||||
|
||||
def inject_text(self, _text: str, _backend: str) -> None:
|
||||
raise SystemExit("Wayland text injection is not supported yet.")
|
||||
|
||||
def run_tray(self, _state_getter: Callable[[], str], _on_quit: Callable[[], None]) -> None:
|
||||
raise SystemExit("Wayland tray support is not available yet.")
|
||||
246
src/desktop_x11.py
Normal file
246
src/desktop_x11.py
Normal file
|
|
@ -0,0 +1,246 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import threading
|
||||
import warnings
|
||||
from pathlib import Path
|
||||
from typing import Callable, Iterable
|
||||
|
||||
import gi
|
||||
from Xlib import X, XK, display
|
||||
from Xlib.ext import xtest
|
||||
|
||||
gi.require_version("Gtk", "3.0")
|
||||
try:
|
||||
gi.require_version("AppIndicator3", "0.1")
|
||||
from gi.repository import AppIndicator3 # type: ignore[import-not-found]
|
||||
except ValueError:
|
||||
AppIndicator3 = None
|
||||
|
||||
from gi.repository import GLib, Gdk, Gtk # type: ignore[import-not-found]
|
||||
|
||||
|
||||
ASSETS_DIR = Path(__file__).parent / "assets"
|
||||
TRAY_UPDATE_MS = 250
|
||||
|
||||
MOD_MAP = {
|
||||
"shift": X.ShiftMask,
|
||||
"ctrl": X.ControlMask,
|
||||
"control": X.ControlMask,
|
||||
"alt": X.Mod1Mask,
|
||||
"mod1": X.Mod1Mask,
|
||||
"super": X.Mod4Mask,
|
||||
"mod4": X.Mod4Mask,
|
||||
"cmd": X.Mod4Mask,
|
||||
"command": X.Mod4Mask,
|
||||
}
|
||||
|
||||
|
||||
class X11Adapter:
|
||||
def __init__(self):
|
||||
self.indicator = None
|
||||
self.status_icon = None
|
||||
self.menu = None
|
||||
if AppIndicator3 is not None:
|
||||
self.indicator = AppIndicator3.Indicator.new(
|
||||
"lel",
|
||||
self._icon_path("idle"),
|
||||
AppIndicator3.IndicatorCategory.APPLICATION_STATUS,
|
||||
)
|
||||
self.indicator.set_status(AppIndicator3.IndicatorStatus.ACTIVE)
|
||||
else:
|
||||
logging.warning("AppIndicator3 unavailable; falling back to deprecated Gtk.StatusIcon")
|
||||
warnings.filterwarnings(
|
||||
"ignore",
|
||||
message=".*Gtk.StatusIcon.*",
|
||||
category=DeprecationWarning,
|
||||
)
|
||||
self.status_icon = Gtk.StatusIcon()
|
||||
self.status_icon.set_visible(True)
|
||||
self.status_icon.connect("popup-menu", self._on_tray_menu)
|
||||
|
||||
def start_hotkey_listener(self, hotkey: str, callback: Callable[[], None]) -> None:
|
||||
thread = threading.Thread(target=self._listen, args=(hotkey, callback), daemon=True)
|
||||
thread.start()
|
||||
|
||||
def inject_text(self, text: str, backend: str) -> None:
|
||||
backend = (backend or "").strip().lower()
|
||||
if backend in ("", "clipboard"):
|
||||
self._write_clipboard(text)
|
||||
self._paste_clipboard()
|
||||
return
|
||||
if backend == "injection":
|
||||
self._type_text(text)
|
||||
return
|
||||
raise ValueError(f"unknown injection backend: {backend}")
|
||||
|
||||
def run_tray(self, state_getter: Callable[[], str], on_quit: Callable[[], None]) -> None:
|
||||
self.menu = Gtk.Menu()
|
||||
quit_item = Gtk.MenuItem(label="Quit")
|
||||
quit_item.connect("activate", lambda *_: on_quit())
|
||||
self.menu.append(quit_item)
|
||||
self.menu.show_all()
|
||||
if self.indicator is not None:
|
||||
self.indicator.set_menu(self.menu)
|
||||
|
||||
self._update_tray(state_getter)
|
||||
GLib.timeout_add(TRAY_UPDATE_MS, self._update_tray, state_getter)
|
||||
Gtk.main()
|
||||
|
||||
def _listen(self, hotkey: str, callback: Callable[[], None]) -> None:
|
||||
disp = display.Display()
|
||||
root = disp.screen().root
|
||||
mods, keysym = self._parse_hotkey(hotkey)
|
||||
keycode = self._grab_hotkey(disp, root, mods, keysym)
|
||||
try:
|
||||
while True:
|
||||
ev = disp.next_event()
|
||||
if ev.type == X.KeyPress and ev.detail == keycode:
|
||||
state = ev.state & ~(X.LockMask | X.Mod2Mask)
|
||||
if state == mods:
|
||||
callback()
|
||||
finally:
|
||||
try:
|
||||
root.ungrab_key(keycode, X.AnyModifier)
|
||||
disp.sync()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def _parse_hotkey(self, hotkey: str):
|
||||
parts = [p.strip() for p in hotkey.split("+") if p.strip()]
|
||||
mods = 0
|
||||
key_part = None
|
||||
for p in parts:
|
||||
low = p.lower()
|
||||
if low in MOD_MAP:
|
||||
mods |= MOD_MAP[low]
|
||||
else:
|
||||
key_part = p
|
||||
if not key_part:
|
||||
raise ValueError("hotkey missing key")
|
||||
|
||||
keysym = XK.string_to_keysym(key_part)
|
||||
if keysym == 0 and len(key_part) == 1:
|
||||
keysym = ord(key_part)
|
||||
if keysym == 0:
|
||||
raise ValueError(f"unsupported key: {key_part}")
|
||||
return mods, keysym
|
||||
|
||||
def _grab_hotkey(self, disp, root, mods, keysym):
|
||||
keycode = disp.keysym_to_keycode(keysym)
|
||||
root.grab_key(keycode, mods, True, X.GrabModeAsync, X.GrabModeAsync)
|
||||
root.grab_key(keycode, mods | X.LockMask, True, X.GrabModeAsync, X.GrabModeAsync)
|
||||
root.grab_key(keycode, mods | X.Mod2Mask, True, X.GrabModeAsync, X.GrabModeAsync)
|
||||
root.grab_key(keycode, mods | X.LockMask | X.Mod2Mask, True, X.GrabModeAsync, X.GrabModeAsync)
|
||||
disp.sync()
|
||||
return keycode
|
||||
|
||||
def _write_clipboard(self, text: str) -> None:
|
||||
Gtk.init([])
|
||||
clipboard = Gtk.Clipboard.get(Gdk.SELECTION_CLIPBOARD)
|
||||
clipboard.set_text(text, -1)
|
||||
clipboard.store()
|
||||
while Gtk.events_pending():
|
||||
Gtk.main_iteration()
|
||||
|
||||
def _paste_clipboard(self) -> None:
|
||||
dpy = display.Display()
|
||||
self._send_combo(dpy, ["Control_L", "Shift_L", "v"])
|
||||
|
||||
def _type_text(self, text: str) -> None:
|
||||
if not text:
|
||||
return
|
||||
dpy = display.Display()
|
||||
for ch in text:
|
||||
if ch == "\n":
|
||||
self._send_combo(dpy, ["Return"])
|
||||
continue
|
||||
keysym, needs_shift = self._keysym_for_char(ch)
|
||||
if keysym is None:
|
||||
continue
|
||||
if needs_shift:
|
||||
self._send_combo(dpy, ["Shift_L", keysym], already_keysym=True)
|
||||
else:
|
||||
self._send_combo(dpy, [keysym], already_keysym=True)
|
||||
|
||||
def _send_combo(self, dpy: display.Display, keys: Iterable[str], already_keysym: bool = False) -> None:
|
||||
keycodes: list[int] = []
|
||||
for key in keys:
|
||||
keysym = key if already_keysym else XK.string_to_keysym(key)
|
||||
if keysym == 0:
|
||||
continue
|
||||
keycode = dpy.keysym_to_keycode(keysym)
|
||||
if keycode == 0:
|
||||
continue
|
||||
keycodes.append(keycode)
|
||||
for code in keycodes:
|
||||
xtest.fake_input(dpy, X.KeyPress, code)
|
||||
for code in reversed(keycodes):
|
||||
xtest.fake_input(dpy, X.KeyRelease, code)
|
||||
dpy.flush()
|
||||
|
||||
def _keysym_for_char(self, ch: str) -> tuple[int | None, bool]:
|
||||
if ch.isupper():
|
||||
base = ch.lower()
|
||||
keysym = XK.string_to_keysym(base)
|
||||
return (keysym if keysym != 0 else None, True)
|
||||
if ch in _SHIFTED:
|
||||
keysym = XK.string_to_keysym(_SHIFTED[ch])
|
||||
return (keysym if keysym != 0 else None, True)
|
||||
if ch == " ":
|
||||
return (XK.string_to_keysym("space"), False)
|
||||
keysym = XK.string_to_keysym(ch)
|
||||
return (keysym if keysym != 0 else None, False)
|
||||
|
||||
def _icon_path(self, state: str) -> str:
|
||||
if state == "recording":
|
||||
return str(ASSETS_DIR / "recording.png")
|
||||
if state == "stt":
|
||||
return str(ASSETS_DIR / "stt.png")
|
||||
if state == "processing":
|
||||
return str(ASSETS_DIR / "processing.png")
|
||||
return str(ASSETS_DIR / "idle.png")
|
||||
|
||||
def _title(self, state: str) -> str:
|
||||
if state == "recording":
|
||||
return "Recording"
|
||||
if state == "stt":
|
||||
return "STT"
|
||||
if state == "processing":
|
||||
return "AI Processing"
|
||||
return "Idle"
|
||||
|
||||
def _update_tray(self, state_getter: Callable[[], str]):
|
||||
state = state_getter()
|
||||
icon_path = self._icon_path(state)
|
||||
if self.indicator is not None:
|
||||
self.indicator.set_icon_full(icon_path, self._title(state))
|
||||
self.indicator.set_label(self._title(state), "")
|
||||
elif self.status_icon is not None:
|
||||
self.status_icon.set_from_file(icon_path)
|
||||
self.status_icon.set_tooltip_text(self._title(state))
|
||||
return True
|
||||
|
||||
|
||||
_SHIFTED = {
|
||||
"!": "1",
|
||||
"@": "2",
|
||||
"#": "3",
|
||||
"$": "4",
|
||||
"%": "5",
|
||||
"^": "6",
|
||||
"&": "7",
|
||||
"*": "8",
|
||||
"(": "9",
|
||||
")": "0",
|
||||
"_": "-",
|
||||
"+": "=",
|
||||
"{": "[",
|
||||
"}": "]",
|
||||
"|": "\\",
|
||||
":": ";",
|
||||
"\"": "'",
|
||||
"<": ",",
|
||||
">": ".",
|
||||
"?": "/",
|
||||
}
|
||||
110
src/inject.py
110
src/inject.py
|
|
@ -1,110 +0,0 @@
|
|||
from __future__ import annotations
|
||||
|
||||
from typing import Iterable
|
||||
|
||||
import gi
|
||||
|
||||
gi.require_version("Gtk", "3.0")
|
||||
gi.require_version("Gdk", "3.0")
|
||||
|
||||
from gi.repository import Gdk, Gtk
|
||||
from Xlib import X, XK, display
|
||||
from Xlib.ext import xtest
|
||||
|
||||
|
||||
def write_clipboard(text: str) -> None:
|
||||
Gtk.init([])
|
||||
clipboard = Gtk.Clipboard.get(Gdk.SELECTION_CLIPBOARD)
|
||||
clipboard.set_text(text, -1)
|
||||
clipboard.store()
|
||||
while Gtk.events_pending():
|
||||
Gtk.main_iteration()
|
||||
|
||||
|
||||
def paste_clipboard() -> None:
|
||||
dpy = display.Display()
|
||||
_send_combo(dpy, ["Control_L", "Shift_L", "v"])
|
||||
|
||||
|
||||
def type_text(text: str) -> None:
|
||||
if not text:
|
||||
return
|
||||
dpy = display.Display()
|
||||
for ch in text:
|
||||
if ch == "\n":
|
||||
_send_combo(dpy, ["Return"])
|
||||
continue
|
||||
keysym, needs_shift = _keysym_for_char(ch)
|
||||
if keysym is None:
|
||||
continue
|
||||
if needs_shift:
|
||||
_send_combo(dpy, ["Shift_L", keysym], already_keysym=True)
|
||||
else:
|
||||
_send_combo(dpy, [keysym], already_keysym=True)
|
||||
|
||||
|
||||
def inject(text: str, backend: str) -> None:
|
||||
backend = (backend or "").strip().lower()
|
||||
if backend in ("", "clipboard"):
|
||||
write_clipboard(text)
|
||||
paste_clipboard()
|
||||
return
|
||||
if backend == "injection":
|
||||
type_text(text)
|
||||
return
|
||||
raise ValueError(f"unknown injection backend: {backend}")
|
||||
|
||||
|
||||
def _send_combo(dpy: display.Display, keys: Iterable[str], already_keysym: bool = False) -> None:
|
||||
keycodes: list[int] = []
|
||||
for key in keys:
|
||||
keysym = key if already_keysym else XK.string_to_keysym(key)
|
||||
if keysym == 0:
|
||||
continue
|
||||
keycode = dpy.keysym_to_keycode(keysym)
|
||||
if keycode == 0:
|
||||
continue
|
||||
keycodes.append(keycode)
|
||||
for code in keycodes:
|
||||
xtest.fake_input(dpy, X.KeyPress, code)
|
||||
for code in reversed(keycodes):
|
||||
xtest.fake_input(dpy, X.KeyRelease, code)
|
||||
dpy.flush()
|
||||
|
||||
|
||||
_SHIFTED = {
|
||||
"!": "1",
|
||||
"@": "2",
|
||||
"#": "3",
|
||||
"$": "4",
|
||||
"%": "5",
|
||||
"^": "6",
|
||||
"&": "7",
|
||||
"*": "8",
|
||||
"(": "9",
|
||||
")": "0",
|
||||
"_": "-",
|
||||
"+": "=",
|
||||
"{": "[",
|
||||
"}": "]",
|
||||
"|": "\\",
|
||||
":": ";",
|
||||
"\"": "'",
|
||||
"<": ",",
|
||||
">": ".",
|
||||
"?": "/",
|
||||
}
|
||||
|
||||
|
||||
def _keysym_for_char(ch: str) -> tuple[int | None, bool]:
|
||||
if ch.isupper():
|
||||
base = ch.lower()
|
||||
keysym = XK.string_to_keysym(base)
|
||||
return (keysym if keysym != 0 else None, True)
|
||||
if ch in _SHIFTED:
|
||||
keysym = XK.string_to_keysym(_SHIFTED[ch])
|
||||
return (keysym if keysym != 0 else None, True)
|
||||
if ch == " ":
|
||||
return (XK.string_to_keysym("space"), False)
|
||||
keysym = XK.string_to_keysym(ch)
|
||||
return (keysym if keysym != 0 else None, False)
|
||||
100
src/leld.py
100
src/leld.py
|
|
@ -7,7 +7,6 @@ import signal
|
|||
import sys
|
||||
import threading
|
||||
import time
|
||||
import warnings
|
||||
from pathlib import Path
|
||||
|
||||
import gi
|
||||
|
|
@ -16,17 +15,7 @@ from faster_whisper import WhisperModel
|
|||
from config import Config, load, redacted_dict
|
||||
from recorder import start_recording, stop_recording
|
||||
from aiprocess import build_processor
|
||||
from inject import inject
|
||||
from x11_hotkey import listen
|
||||
|
||||
gi.require_version("Gtk", "3.0")
|
||||
try:
|
||||
gi.require_version("AppIndicator3", "0.1")
|
||||
from gi.repository import AppIndicator3 # type: ignore[import-not-found]
|
||||
except ValueError:
|
||||
AppIndicator3 = None
|
||||
|
||||
from gi.repository import GLib, Gtk # type: ignore[import-not-found]
|
||||
from desktop import get_desktop_adapter
|
||||
|
||||
|
||||
class State:
|
||||
|
|
@ -37,10 +26,8 @@ class State:
|
|||
OUTPUTTING = "outputting"
|
||||
|
||||
|
||||
ASSETS_DIR = Path(__file__).parent / "assets"
|
||||
RECORD_TIMEOUT_SEC = 300
|
||||
STT_LANGUAGE = "en"
|
||||
TRAY_UPDATE_MS = 250
|
||||
|
||||
|
||||
def _compute_type(device: str) -> str:
|
||||
|
|
@ -51,8 +38,9 @@ def _compute_type(device: str) -> str:
|
|||
|
||||
|
||||
class Daemon:
|
||||
def __init__(self, cfg: Config, *, llama_verbose: bool = False):
|
||||
def __init__(self, cfg: Config, desktop, *, llama_verbose: bool = False):
|
||||
self.cfg = cfg
|
||||
self.desktop = desktop
|
||||
self.lock = threading.Lock()
|
||||
self.state = State.IDLE
|
||||
self.proc = None
|
||||
|
|
@ -64,32 +52,6 @@ class Daemon:
|
|||
compute_type=_compute_type(cfg.stt.get("device", "cpu")),
|
||||
)
|
||||
self.ai_processor = build_processor(verbose=llama_verbose)
|
||||
self.indicator = None
|
||||
self.status_icon = None
|
||||
if AppIndicator3 is not None:
|
||||
self.indicator = AppIndicator3.Indicator.new(
|
||||
"lel",
|
||||
self._icon_path(State.IDLE),
|
||||
AppIndicator3.IndicatorCategory.APPLICATION_STATUS,
|
||||
)
|
||||
self.indicator.set_status(AppIndicator3.IndicatorStatus.ACTIVE)
|
||||
else:
|
||||
logging.warning("AppIndicator3 unavailable; falling back to deprecated Gtk.StatusIcon")
|
||||
warnings.filterwarnings(
|
||||
"ignore",
|
||||
message=".*Gtk.StatusIcon.*",
|
||||
category=DeprecationWarning,
|
||||
)
|
||||
self.status_icon = Gtk.StatusIcon()
|
||||
self.status_icon.set_visible(True)
|
||||
self.status_icon.connect("popup-menu", self._on_tray_menu)
|
||||
self.menu = Gtk.Menu()
|
||||
quit_item = Gtk.MenuItem(label="Quit")
|
||||
quit_item.connect("activate", lambda *_: self._quit())
|
||||
self.menu.append(quit_item)
|
||||
self.menu.show_all()
|
||||
if self.indicator is not None:
|
||||
self.indicator.set_menu(self.menu)
|
||||
|
||||
def set_state(self, state: str):
|
||||
with self.lock:
|
||||
|
|
@ -105,9 +67,6 @@ class Daemon:
|
|||
def _quit(self):
|
||||
os._exit(0)
|
||||
|
||||
def _on_tray_menu(self, _icon, _button, _time):
|
||||
self.menu.popup(None, None, None, None, 0, _time)
|
||||
|
||||
def toggle(self):
|
||||
with self.lock:
|
||||
if self.state == State.IDLE:
|
||||
|
|
@ -198,7 +157,7 @@ class Daemon:
|
|||
self.set_state(State.OUTPUTTING)
|
||||
logging.info("outputting started")
|
||||
backend = self.cfg.injection.get("backend", "clipboard")
|
||||
inject(text, backend)
|
||||
self.desktop.inject_text(text, backend)
|
||||
except Exception as exc:
|
||||
logging.error("output failed: %s", exc)
|
||||
finally:
|
||||
|
|
@ -221,41 +180,6 @@ class Daemon:
|
|||
parts.append(text)
|
||||
return " ".join(parts).strip()
|
||||
|
||||
def _icon_path(self, state: str) -> str:
|
||||
if state == State.RECORDING:
|
||||
return str(ASSETS_DIR / "recording.png")
|
||||
if state == State.STT:
|
||||
return str(ASSETS_DIR / "stt.png")
|
||||
if state == State.PROCESSING:
|
||||
return str(ASSETS_DIR / "processing.png")
|
||||
return str(ASSETS_DIR / "idle.png")
|
||||
|
||||
def _title(self, state: str) -> str:
|
||||
if state == State.RECORDING:
|
||||
return "Recording"
|
||||
if state == State.STT:
|
||||
return "STT"
|
||||
if state == State.PROCESSING:
|
||||
return "AI Processing"
|
||||
return "Idle"
|
||||
|
||||
def _update_tray(self):
|
||||
state = self.get_state()
|
||||
icon_path = self._icon_path(state)
|
||||
if self.indicator is not None:
|
||||
self.indicator.set_icon_full(icon_path, self._title(state))
|
||||
self.indicator.set_label(self._title(state), "")
|
||||
elif self.status_icon is not None:
|
||||
self.status_icon.set_from_file(icon_path)
|
||||
self.status_icon.set_tooltip_text(self._title(state))
|
||||
return True
|
||||
|
||||
def run_tray(self):
|
||||
self._update_tray()
|
||||
GLib.timeout_add(TRAY_UPDATE_MS, self._update_tray)
|
||||
Gtk.main()
|
||||
|
||||
|
||||
def _lock_single_instance():
|
||||
runtime_dir = Path(os.getenv("XDG_RUNTIME_DIR", "/tmp")) / "lel"
|
||||
runtime_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
|
@ -291,8 +215,9 @@ def main():
|
|||
|
||||
if args.verbose:
|
||||
logging.getLogger().setLevel(logging.DEBUG)
|
||||
desktop = get_desktop_adapter()
|
||||
try:
|
||||
daemon = Daemon(cfg, llama_verbose=args.verbose)
|
||||
daemon = Daemon(cfg, desktop, llama_verbose=args.verbose)
|
||||
except Exception as exc:
|
||||
logging.error("startup failed: %s", exc)
|
||||
raise SystemExit(1)
|
||||
|
|
@ -308,14 +233,11 @@ def main():
|
|||
signal.signal(signal.SIGINT, handle_signal)
|
||||
signal.signal(signal.SIGTERM, handle_signal)
|
||||
|
||||
threading.Thread(
|
||||
target=lambda: listen(
|
||||
cfg.daemon.get("hotkey", ""),
|
||||
lambda: logging.info("hotkey pressed (dry-run)") if args.dry_run else daemon.toggle(),
|
||||
),
|
||||
daemon=True,
|
||||
).start()
|
||||
daemon.run_tray()
|
||||
desktop.start_hotkey_listener(
|
||||
cfg.daemon.get("hotkey", ""),
|
||||
lambda: logging.info("hotkey pressed (dry-run)") if args.dry_run else daemon.toggle(),
|
||||
)
|
||||
desktop.run_tray(daemon.get_state, daemon._quit)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
|
|
|||
|
|
@ -1,67 +0,0 @@
|
|||
from Xlib import X, display
|
||||
from Xlib import XK
|
||||
|
||||
MOD_MAP = {
|
||||
"shift": X.ShiftMask,
|
||||
"ctrl": X.ControlMask,
|
||||
"control": X.ControlMask,
|
||||
"alt": X.Mod1Mask,
|
||||
"mod1": X.Mod1Mask,
|
||||
"super": X.Mod4Mask,
|
||||
"mod4": X.Mod4Mask,
|
||||
"cmd": X.Mod4Mask,
|
||||
"command": X.Mod4Mask,
|
||||
}
|
||||
|
||||
|
||||
def parse_hotkey(hotkey: str):
|
||||
parts = [p.strip() for p in hotkey.split("+") if p.strip()]
|
||||
mods = 0
|
||||
key_part = None
|
||||
for p in parts:
|
||||
low = p.lower()
|
||||
if low in MOD_MAP:
|
||||
mods |= MOD_MAP[low]
|
||||
else:
|
||||
key_part = p
|
||||
if not key_part:
|
||||
raise ValueError("hotkey missing key")
|
||||
|
||||
keysym = XK.string_to_keysym(key_part)
|
||||
if keysym == 0 and len(key_part) == 1:
|
||||
keysym = ord(key_part)
|
||||
if keysym == 0:
|
||||
raise ValueError(f"unsupported key: {key_part}")
|
||||
|
||||
return mods, keysym
|
||||
|
||||
|
||||
def grab_hotkey(disp, root, mods, keysym):
|
||||
keycode = disp.keysym_to_keycode(keysym)
|
||||
root.grab_key(keycode, mods, True, X.GrabModeAsync, X.GrabModeAsync)
|
||||
# ignore CapsLock/NumLock
|
||||
root.grab_key(keycode, mods | X.LockMask, True, X.GrabModeAsync, X.GrabModeAsync)
|
||||
root.grab_key(keycode, mods | X.Mod2Mask, True, X.GrabModeAsync, X.GrabModeAsync)
|
||||
root.grab_key(keycode, mods | X.LockMask | X.Mod2Mask, True, X.GrabModeAsync, X.GrabModeAsync)
|
||||
disp.sync()
|
||||
return keycode
|
||||
|
||||
|
||||
def listen(hotkey: str, on_trigger):
|
||||
disp = display.Display()
|
||||
root = disp.screen().root
|
||||
mods, keysym = parse_hotkey(hotkey)
|
||||
keycode = grab_hotkey(disp, root, mods, keysym)
|
||||
try:
|
||||
while True:
|
||||
ev = disp.next_event()
|
||||
if ev.type == X.KeyPress and ev.detail == keycode:
|
||||
state = ev.state & ~(X.LockMask | X.Mod2Mask)
|
||||
if state == mods:
|
||||
on_trigger()
|
||||
finally:
|
||||
try:
|
||||
root.ungrab_key(keycode, X.AnyModifier)
|
||||
disp.sync()
|
||||
except Exception:
|
||||
pass
|
||||
Loading…
Add table
Add a link
Reference in a new issue