From b3be44462577a2c5ff3325b7e6a67daf3038c2e5 Mon Sep 17 00:00:00 2001 From: Thales Maciel Date: Tue, 24 Feb 2026 14:15:17 -0300 Subject: [PATCH] Refine config and runtime flow --- AGENTS.md | 3 +- Makefile | 14 ++- README.md | 31 +++-- pyproject.toml | 1 + src/aiprocess.py | 18 ++- src/config.py | 184 +++++++++++++++++++++++++----- src/constants.py | 1 - src/desktop.py | 5 +- src/desktop_wayland.py | 3 + src/desktop_x11.py | 37 ++++-- src/leld.py | 253 ++++++++++++++++++++++++++++++----------- src/recorder.py | 20 +++- systemd/lel.service | 3 +- tests/test_config.py | 98 ++++++++++++++++ tests/test_leld.py | 105 +++++++++++++++++ uv.lock | 3 + 16 files changed, 642 insertions(+), 137 deletions(-) create mode 100644 tests/test_config.py create mode 100644 tests/test_leld.py diff --git a/AGENTS.md b/AGENTS.md index 1b50d91..ea22354 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -28,8 +28,7 @@ System packages (example names): ## Testing Guidelines -- No automated tests are present. -- If you add tests, include a brief note in `AGENTS.md` with the runner command and test location. +- Automated tests live in `tests/` and run with `python3 -m unittest discover -s tests -p 'test_*.py'`. ## Commit & Pull Request Guidelines diff --git a/Makefile b/Makefile index f3d6e44..bb2fcbf 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ CONFIG := $(HOME)/.config/lel/config.json -.PHONY: run install sync +.PHONY: run install sync test check run: uv run python3 src/leld.py --config $(CONFIG) @@ -8,9 +8,17 @@ run: sync: uv sync +test: + python3 -m unittest discover -s tests -p 'test_*.py' + +check: + python3 -m py_compile src/*.py + $(MAKE) test + install: - mkdir -p $(HOME)/.local/bin - cp src/leld.py $(HOME)/.local/bin/leld.py + mkdir -p $(HOME)/.local/share/lel/src/assets + cp src/*.py $(HOME)/.local/share/lel/src/ + cp src/assets/*.png $(HOME)/.local/share/lel/src/assets/ cp systemd/lel.service $(HOME)/.config/systemd/user/lel.service systemctl --user daemon-reload systemctl --user enable --now lel diff --git a/README.md b/README.md index 7ce065d..e5aaddc 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # lel -Python X11 STT daemon that records audio, runs Whisper, logs the transcript, and can optionally run AI post-processing before injecting text. +Python X11 STT daemon that records audio, runs Whisper, and injects text. It can optionally run local AI post-processing before injection. ## Requirements @@ -9,7 +9,7 @@ Python X11 STT daemon that records audio, runs Whisper, logs the transcript, and - `faster-whisper` - `llama-cpp-python` - Tray icon deps: `gtk3`, `libayatana-appindicator3` -- Python deps (core): `pillow`, `faster-whisper`, `llama-cpp-python`, `sounddevice` +- Python deps (core): `numpy`, `pillow`, `faster-whisper`, `llama-cpp-python`, `sounddevice` - X11 extras: `PyGObject`, `python-xlib` System packages (example names): `portaudio`/`libportaudio2`. @@ -90,23 +90,29 @@ Create `~/.config/lel/config.json`: "daemon": { "hotkey": "Cmd+m" }, "recording": { "input": "0" }, "stt": { "model": "base", "device": "cpu" }, - "injection": { "backend": "clipboard" } + "injection": { "backend": "clipboard" }, + "ai": { "enabled": true }, + "logging": { "log_transcript": false } } ``` Recording input can be a device index (preferred) or a substring of the device name. -The LLM model is downloaded on first startup to `~/.cache/lel/models/` and uses -the locked Llama-3.2-3B GGUF model. -Pass `-v/--verbose` to see verbose logs, including llama.cpp loader logs; these -messages are prefixed with `llama::`. 
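To pick a value for `recording.input`, it helps to see which capture devices exist. A minimal standalone sketch (it mirrors what `list_input_devices()` in `src/recorder.py` does, but is only a helper for finding a value to put in the config):

```python
# Print input-capable audio devices so an index (preferred) or a name
# substring can be chosen for "recording": {"input": ...} in config.json.
import sounddevice as sd

for idx, info in enumerate(sd.query_devices()):
    if info.get("max_input_channels", 0) > 0:
        print(f"{idx}: {info.get('name', '')}")
```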
+`ai.enabled` controls local cleanup. When enabled, the LLM model is downloaded +on first use to `~/.cache/lel/models/` and uses the locked Llama-3.2-3B GGUF +model. + +`logging.log_transcript` controls whether recognized/processed text is written +to logs. This is disabled by default. `-v/--verbose` also enables transcript +logging and llama.cpp logs; llama logs are prefixed with `llama::`. ## systemd user service ```bash -mkdir -p ~/.local/bin -cp src/leld.py ~/.local/bin/leld.py +mkdir -p ~/.local/share/lel/src/assets +cp src/*.py ~/.local/share/lel/src/ +cp src/assets/*.png ~/.local/share/lel/src/assets/ cp systemd/lel.service ~/.config/systemd/user/lel.service systemctl --user daemon-reload systemctl --user enable --now lel @@ -116,7 +122,7 @@ systemctl --user enable --now lel - Press the hotkey once to start recording. - Press it again to stop and run STT. -- The transcript is logged to stderr. +- Transcript contents are logged only when `logging.log_transcript` is enabled or `-v/--verbose` is used. Wayland note: @@ -127,12 +133,13 @@ Injection backends: - `clipboard`: copy to clipboard and inject via Ctrl+Shift+V (GTK clipboard + XTest) - `injection`: type the text with simulated keypresses (XTest) -AI provider: +AI processing: -- Generic OpenAI-compatible chat API at `ai_base_url` (base URL only; the app uses `/v1/chat/completions`) +- Local llama.cpp model only (no remote provider configuration). Control: ```bash make run +make check ``` diff --git a/pyproject.toml b/pyproject.toml index d26c21c..8114ec4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,6 +7,7 @@ requires-python = ">=3.10" dependencies = [ "faster-whisper", "llama-cpp-python", + "numpy", "pillow", "sounddevice", ] diff --git a/src/aiprocess.py b/src/aiprocess.py index de6772a..b02badf 100644 --- a/src/aiprocess.py +++ b/src/aiprocess.py @@ -5,12 +5,9 @@ import logging import os import sys import urllib.request -from dataclasses import dataclass from typing import Any, Callable, cast -from llama_cpp import Llama, llama_cpp as llama_cpp_lib # type: ignore[import-not-found] - -from constants import LLM_LANGUAGE, MODEL_DIR, MODEL_NAME, MODEL_PATH, MODEL_URL +from constants import MODEL_DIR, MODEL_NAME, MODEL_PATH, MODEL_URL SYSTEM_PROMPT = ( @@ -36,7 +33,8 @@ SYSTEM_PROMPT = ( class LlamaProcessor: - def __init__(self, verbose=False): + def __init__(self, verbose: bool = False): + Llama, llama_cpp_lib = _load_llama_bindings() ensure_model() if not verbose: os.environ.setdefault("LLAMA_CPP_LOG_LEVEL", "ERROR") @@ -100,6 +98,16 @@ def ensure_model(): raise +def _load_llama_bindings(): + try: + from llama_cpp import Llama, llama_cpp as llama_cpp_lib # type: ignore[import-not-found] + except ModuleNotFoundError as exc: + raise RuntimeError( + "llama-cpp-python is not installed; install dependencies with `uv sync`" + ) from exc + return Llama, llama_cpp_lib + + def _extract_chat_text(payload: Any) -> str: if "choices" in payload and payload["choices"]: choice = payload["choices"][0] diff --git a/src/config.py b/src/config.py index 4d48cf7..a715aa2 100644 --- a/src/config.py +++ b/src/config.py @@ -1,16 +1,59 @@ +from __future__ import annotations + import json -from dataclasses import dataclass, field +from dataclasses import asdict, dataclass, field from pathlib import Path +from typing import Any from constants import DEFAULT_CONFIG_PATH +DEFAULT_HOTKEY = "Cmd+m" +DEFAULT_STT_MODEL = "base" +DEFAULT_STT_DEVICE = "cpu" +DEFAULT_INJECTION_BACKEND = "clipboard" +ALLOWED_INJECTION_BACKENDS = {"clipboard", "injection"} + 
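As a usage sketch of the config module defined here (assuming `src/` is on `sys.path`, as the tests later in this patch arrange): loading a partial nested config fills every unspecified section from the defaults above.

```python
# Round trip through src/config.py: write a partial nested config,
# load it, and dump the validated result. Sections not present in the
# file keep their defaults (hotkey, injection backend, logging flags).
import json
import tempfile
from pathlib import Path

from config import load, redacted_dict

with tempfile.TemporaryDirectory() as td:
    path = Path(td) / "config.json"
    path.write_text(
        json.dumps({"stt": {"model": "small"}, "ai": {"enabled": False}}),
        encoding="utf-8",
    )
    cfg = load(str(path))
    print(json.dumps(redacted_dict(cfg), indent=2))
```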
+ +@dataclass +class DaemonConfig: + hotkey: str = DEFAULT_HOTKEY + + +@dataclass +class RecordingConfig: + input: str | int | None = "" + + +@dataclass +class SttConfig: + model: str = DEFAULT_STT_MODEL + device: str = DEFAULT_STT_DEVICE + + +@dataclass +class InjectionConfig: + backend: str = DEFAULT_INJECTION_BACKEND + + +@dataclass +class AiConfig: + enabled: bool = True + + +@dataclass +class LoggingConfig: + log_transcript: bool = False + + @dataclass class Config: - daemon: dict = field(default_factory=lambda: {"hotkey": "Cmd+m"}) - recording: dict = field(default_factory=lambda: {"input": ""}) - stt: dict = field(default_factory=lambda: {"model": "base", "device": "cpu"}) - injection: dict = field(default_factory=lambda: {"backend": "clipboard"}) + daemon: DaemonConfig = field(default_factory=DaemonConfig) + recording: RecordingConfig = field(default_factory=RecordingConfig) + stt: SttConfig = field(default_factory=SttConfig) + injection: InjectionConfig = field(default_factory=InjectionConfig) + ai: AiConfig = field(default_factory=AiConfig) + logging: LoggingConfig = field(default_factory=LoggingConfig) def load(path: str | None) -> Config: @@ -18,33 +61,120 @@ def load(path: str | None) -> Config: p = Path(path) if path else DEFAULT_CONFIG_PATH if p.exists(): data = json.loads(p.read_text(encoding="utf-8")) - if any(k in data for k in ("daemon", "recording", "stt", "injection")): - for k, v in data.items(): - if hasattr(cfg, k): - setattr(cfg, k, v) - else: - cfg.daemon["hotkey"] = data.get("hotkey", cfg.daemon["hotkey"]) - cfg.recording["input"] = data.get("input", cfg.recording["input"]) - cfg.stt["model"] = data.get("whisper_model", cfg.stt["model"]) - cfg.stt["device"] = data.get("whisper_device", cfg.stt["device"]) - cfg.injection["backend"] = data.get("injection_backend", cfg.injection["backend"]) - - if not isinstance(cfg.daemon, dict): - cfg.daemon = {"hotkey": "Cmd+m"} - if not isinstance(cfg.recording, dict): - cfg.recording = {"input": ""} - if not isinstance(cfg.stt, dict): - cfg.stt = {"model": "base", "device": "cpu"} - if not isinstance(cfg.injection, dict): - cfg.injection = {"backend": "clipboard"} + if not isinstance(data, dict): + raise ValueError("config must be a JSON object") + cfg = _from_dict(data, cfg) validate(cfg) return cfg -def redacted_dict(cfg: Config) -> dict: - return cfg.__dict__.copy() +def redacted_dict(cfg: Config) -> dict[str, Any]: + return asdict(cfg) def validate(cfg: Config) -> None: - if not cfg.daemon.get("hotkey"): + hotkey = cfg.daemon.hotkey.strip() + if not hotkey: raise ValueError("daemon.hotkey cannot be empty") + + if isinstance(cfg.recording.input, bool): + raise ValueError("recording.input cannot be boolean") + if not isinstance(cfg.recording.input, (str, int)) and cfg.recording.input is not None: + raise ValueError("recording.input must be string, integer, or null") + + model = cfg.stt.model.strip() + if not model: + raise ValueError("stt.model cannot be empty") + + device = cfg.stt.device.strip() + if not device: + raise ValueError("stt.device cannot be empty") + + backend = cfg.injection.backend.strip().lower() + if backend not in ALLOWED_INJECTION_BACKENDS: + allowed = ", ".join(sorted(ALLOWED_INJECTION_BACKENDS)) + raise ValueError(f"injection.backend must be one of: {allowed}") + cfg.injection.backend = backend + + if not isinstance(cfg.ai.enabled, bool): + raise ValueError("ai.enabled must be boolean") + + if not isinstance(cfg.logging.log_transcript, bool): + raise ValueError("logging.log_transcript must be boolean") + 
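A short behavior sketch for `validate()` (illustrative only; `"xdotool"` is just an example of an unsupported value): the backend is normalized in place, and anything outside `ALLOWED_INJECTION_BACKENDS` is rejected.

```python
from config import Config, validate

cfg = Config()
cfg.injection.backend = "  Clipboard "
validate(cfg)
assert cfg.injection.backend == "clipboard"  # stripped and lowercased in place

cfg.injection.backend = "xdotool"
try:
    validate(cfg)
except ValueError as exc:
    print(exc)  # injection.backend must be one of: clipboard, injection
```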
+ +def _from_dict(data: dict[str, Any], cfg: Config) -> Config: + has_sections = any( + key in data for key in ("daemon", "recording", "stt", "injection", "ai", "logging") + ) + if has_sections: + daemon = _ensure_dict(data.get("daemon"), "daemon") + recording = _ensure_dict(data.get("recording"), "recording") + stt = _ensure_dict(data.get("stt"), "stt") + injection = _ensure_dict(data.get("injection"), "injection") + ai = _ensure_dict(data.get("ai"), "ai") + logging_cfg = _ensure_dict(data.get("logging"), "logging") + + if "hotkey" in daemon: + cfg.daemon.hotkey = _as_nonempty_str(daemon["hotkey"], "daemon.hotkey") + if "input" in recording: + cfg.recording.input = _as_recording_input(recording["input"]) + if "model" in stt: + cfg.stt.model = _as_nonempty_str(stt["model"], "stt.model") + if "device" in stt: + cfg.stt.device = _as_nonempty_str(stt["device"], "stt.device") + if "backend" in injection: + cfg.injection.backend = _as_nonempty_str(injection["backend"], "injection.backend") + if "enabled" in ai: + cfg.ai.enabled = _as_bool(ai["enabled"], "ai.enabled") + if "log_transcript" in logging_cfg: + cfg.logging.log_transcript = _as_bool(logging_cfg["log_transcript"], "logging.log_transcript") + return cfg + + if "hotkey" in data: + cfg.daemon.hotkey = _as_nonempty_str(data["hotkey"], "hotkey") + if "input" in data: + cfg.recording.input = _as_recording_input(data["input"]) + if "whisper_model" in data: + cfg.stt.model = _as_nonempty_str(data["whisper_model"], "whisper_model") + if "whisper_device" in data: + cfg.stt.device = _as_nonempty_str(data["whisper_device"], "whisper_device") + if "injection_backend" in data: + cfg.injection.backend = _as_nonempty_str(data["injection_backend"], "injection_backend") + if "ai_enabled" in data: + cfg.ai.enabled = _as_bool(data["ai_enabled"], "ai_enabled") + if "log_transcript" in data: + cfg.logging.log_transcript = _as_bool(data["log_transcript"], "log_transcript") + return cfg + + +def _ensure_dict(value: Any, field_name: str) -> dict[str, Any]: + if value is None: + return {} + if not isinstance(value, dict): + raise ValueError(f"{field_name} must be an object") + return value + + +def _as_nonempty_str(value: Any, field_name: str) -> str: + if not isinstance(value, str): + raise ValueError(f"{field_name} must be a string") + if not value.strip(): + raise ValueError(f"{field_name} cannot be empty") + return value + + +def _as_bool(value: Any, field_name: str) -> bool: + if not isinstance(value, bool): + raise ValueError(f"{field_name} must be boolean") + return value + + +def _as_recording_input(value: Any) -> str | int | None: + if value is None: + return None + if isinstance(value, bool): + raise ValueError("recording.input cannot be boolean") + if isinstance(value, (str, int)): + return value + raise ValueError("recording.input must be string, integer, or null") diff --git a/src/constants.py b/src/constants.py index 81a4857..8b20634 100644 --- a/src/constants.py +++ b/src/constants.py @@ -14,4 +14,3 @@ MODEL_URL = ( ) MODEL_DIR = Path.home() / ".cache" / "lel" / "models" MODEL_PATH = MODEL_DIR / MODEL_NAME -LLM_LANGUAGE = "en" diff --git a/src/desktop.py b/src/desktop.py index a20c704..3f321fa 100644 --- a/src/desktop.py +++ b/src/desktop.py @@ -14,12 +14,13 @@ class DesktopAdapter(Protocol): def run_tray(self, state_getter: Callable[[], str], on_quit: Callable[[], None]) -> None: raise NotImplementedError + def request_quit(self) -> None: + raise NotImplementedError + def get_desktop_adapter() -> DesktopAdapter: session_type = 
os.getenv("XDG_SESSION_TYPE", "").lower() if session_type == "wayland" or os.getenv("WAYLAND_DISPLAY"): - from desktop_wayland import WaylandAdapter - raise SystemExit( "Wayland is not supported yet. Run under X11 (XDG_SESSION_TYPE=x11) to use lel." ) diff --git a/src/desktop_wayland.py b/src/desktop_wayland.py index 5506791..0b6903f 100644 --- a/src/desktop_wayland.py +++ b/src/desktop_wayland.py @@ -12,3 +12,6 @@ class WaylandAdapter: def run_tray(self, _state_getter: Callable[[], str], _on_quit: Callable[[], None]) -> None: raise SystemExit("Wayland tray support is not available yet.") + + def request_quit(self) -> None: + return diff --git a/src/desktop_x11.py b/src/desktop_x11.py index 48b428c..5024ec6 100644 --- a/src/desktop_x11.py +++ b/src/desktop_x11.py @@ -80,7 +80,7 @@ class X11Adapter: def run_tray(self, state_getter: Callable[[], str], on_quit: Callable[[], None]) -> None: self.menu = Gtk.Menu() quit_item = Gtk.MenuItem(label="Quit") - quit_item.connect("activate", lambda *_: on_quit()) + quit_item.connect("activate", lambda *_: self._handle_quit(on_quit)) self.menu.append(quit_item) self.menu.show_all() if self.indicator is not None: @@ -90,24 +90,39 @@ class X11Adapter: GLib.timeout_add(TRAY_UPDATE_MS, self._update_tray, state_getter) Gtk.main() - def _listen(self, hotkey: str, callback: Callable[[], None]) -> None: - disp = display.Display() - root = disp.screen().root - mods, keysym = self._parse_hotkey(hotkey) - keycode = self._grab_hotkey(disp, root, mods, keysym) + def request_quit(self) -> None: + GLib.idle_add(Gtk.main_quit) + + def _handle_quit(self, on_quit: Callable[[], None]) -> None: try: + on_quit() + finally: + self.request_quit() + + def _listen(self, hotkey: str, callback: Callable[[], None]) -> None: + disp = None + root = None + keycode = None + try: + disp = display.Display() + root = disp.screen().root + mods, keysym = self._parse_hotkey(hotkey) + keycode = self._grab_hotkey(disp, root, mods, keysym) while True: ev = disp.next_event() if ev.type == X.KeyPress and ev.detail == keycode: state = ev.state & ~(X.LockMask | X.Mod2Mask) if state == mods: callback() + except Exception as exc: + logging.error("hotkey listener stopped: %s", exc) finally: - try: - root.ungrab_key(keycode, X.AnyModifier) - disp.sync() - except Exception: - pass + if root is not None and keycode is not None and disp is not None: + try: + root.ungrab_key(keycode, X.AnyModifier) + disp.sync() + except Exception: + pass def _parse_hotkey(self, hotkey: str): parts = [p.strip() for p in hotkey.split("+") if p.strip()] diff --git a/src/leld.py b/src/leld.py index 97fc259..f322aae 100755 --- a/src/leld.py +++ b/src/leld.py @@ -1,5 +1,8 @@ #!/usr/bin/env python3 +from __future__ import annotations + import argparse +import errno import json import logging import os @@ -8,15 +11,14 @@ import sys import threading import time from pathlib import Path +from typing import Any -import gi -from faster_whisper import WhisperModel - +from aiprocess import LlamaProcessor from config import Config, load, redacted_dict from constants import RECORD_TIMEOUT_SEC, STT_LANGUAGE -from recorder import start_recording, stop_recording -from aiprocess import LlamaProcessor from desktop import get_desktop_adapter +from recorder import start_recording as start_audio_recording +from recorder import stop_recording as stop_audio_recording class State: @@ -27,11 +29,26 @@ class State: OUTPUTTING = "outputting" +_LOCK_HANDLE = None + + +def _build_whisper_model(model_name: str, device: str): + try: + from faster_whisper 
import WhisperModel # type: ignore[import-not-found] + except ModuleNotFoundError as exc: + raise RuntimeError( + "faster-whisper is not installed; install dependencies with `uv sync`" + ) from exc + return WhisperModel( + model_name, + device=device, + compute_type=_compute_type(device), + ) def _compute_type(device: str) -> str: dev = (device or "cpu").lower() - if dev == "cuda": + if dev.startswith("cuda"): return "float16" return "int8" @@ -40,17 +57,20 @@ class Daemon: def __init__(self, cfg: Config, desktop, *, verbose: bool = False): self.cfg = cfg self.desktop = desktop + self.verbose = verbose self.lock = threading.Lock() + self._shutdown_requested = threading.Event() self.state = State.IDLE - self.proc = None + self.stream = None self.record = None - self.timer = None - self.model = WhisperModel( - cfg.stt.get("model", "base"), - device=cfg.stt.get("device", "cpu"), - compute_type=_compute_type(cfg.stt.get("device", "cpu")), + self.timer: threading.Timer | None = None + self.model = _build_whisper_model( + cfg.stt.model, + cfg.stt.device, ) - self.ai_processor = LlamaProcessor(verbose=verbose) + self.ai_enabled = cfg.ai.enabled + self.ai_processor: LlamaProcessor | None = None + self.log_transcript = cfg.logging.log_transcript or verbose def set_state(self, state: str): with self.lock: @@ -63,29 +83,39 @@ class Daemon: with self.lock: return self.state - def _quit(self): - os._exit(0) + def request_shutdown(self): + self._shutdown_requested.set() def toggle(self): + should_stop = False with self.lock: + if self._shutdown_requested.is_set(): + logging.info("shutdown in progress, trigger ignored") + return if self.state == State.IDLE: self._start_recording_locked() return if self.state == State.RECORDING: - self.state = State.STT - threading.Thread(target=self._stop_and_process, daemon=True).start() - return - logging.info("busy (%s), trigger ignored", self.state) + should_stop = True + else: + logging.info("busy (%s), trigger ignored", self.state) + if should_stop: + self.stop_recording(trigger="user") def _start_recording_locked(self): + if self.state != State.IDLE: + logging.info("busy (%s), trigger ignored", self.state) + return try: - proc, record = start_recording(self.cfg.recording.get("input", "")) + stream, record = start_audio_recording(self.cfg.recording.input) except Exception as exc: logging.error("record start failed: %s", exc) return - self.proc = proc + self.stream = stream self.record = record + prev = self.state self.state = State.RECORDING + logging.info("state: %s -> %s", prev, self.state) logging.info("recording started") if self.timer: self.timer.cancel() @@ -94,30 +124,45 @@ class Daemon: self.timer.start() def _timeout_stop(self): - with self.lock: - if self.state != State.RECORDING: - return - self.state = State.STT - threading.Thread(target=self._stop_and_process, daemon=True).start() + self.stop_recording(trigger="timeout") - def _stop_and_process(self): - proc = self.proc + def _start_stop_worker(self, stream: Any, record: Any, trigger: str, process_audio: bool): + threading.Thread( + target=self._stop_and_process, + args=(stream, record, trigger, process_audio), + daemon=True, + ).start() + + def _begin_stop_locked(self): + if self.state != State.RECORDING: + return None + stream = self.stream record = self.record - self.proc = None + self.stream = None self.record = None if self.timer: self.timer.cancel() self.timer = None + prev = self.state + self.state = State.STT + logging.info("state: %s -> %s", prev, self.state) - if not proc or not record: + if 
stream is None or record is None: + logging.warning("recording resources are unavailable during stop") + self.state = State.IDLE + return None + return stream, record + + def _stop_and_process(self, stream: Any, record: Any, trigger: str, process_audio: bool): + logging.info("stopping recording (%s)", trigger) + try: + audio = stop_audio_recording(stream, record) + except Exception as exc: + logging.error("record stop failed: %s", exc) self.set_state(State.IDLE) return - logging.info("stopping recording (user)") - try: - audio = stop_recording(proc, record) - except Exception as exc: - logging.error("record stop failed: %s", exc) + if not process_audio or self._shutdown_requested.is_set(): self.set_state(State.IDLE) return @@ -140,35 +185,64 @@ class Daemon: self.set_state(State.IDLE) return - logging.info("stt: %s", text) + if self.log_transcript: + logging.info("stt: %s", text) + else: + logging.info("stt produced %d chars", len(text)) - self.set_state(State.PROCESSING) - logging.info("ai processing started") - try: - ai_input = text - text = self.ai_processor.process(ai_input) or text - except Exception as exc: - logging.error("ai process failed: %s", exc) + if self.ai_enabled and not self._shutdown_requested.is_set(): + self.set_state(State.PROCESSING) + logging.info("ai processing started") + try: + processor = self._get_ai_processor() + ai_text = processor.process(text) + if ai_text and ai_text.strip(): + text = ai_text.strip() + except Exception as exc: + logging.error("ai process failed: %s", exc) + else: + logging.info("ai processing disabled") - logging.info("processed: %s", text) + if self.log_transcript: + logging.info("processed: %s", text) + else: + logging.info("processed text length: %d", len(text)) + + if self._shutdown_requested.is_set(): + self.set_state(State.IDLE) + return try: self.set_state(State.OUTPUTTING) logging.info("outputting started") - backend = self.cfg.injection.get("backend", "clipboard") + backend = self.cfg.injection.backend self.desktop.inject_text(text, backend) except Exception as exc: logging.error("output failed: %s", exc) finally: self.set_state(State.IDLE) - - def stop_recording(self): + def stop_recording(self, *, trigger: str = "user", process_audio: bool = True): + payload = None with self.lock: - if self.state != State.RECORDING: - return - self.state = State.STT - threading.Thread(target=self._stop_and_process, daemon=True).start() + payload = self._begin_stop_locked() + if payload is None: + return + stream, record = payload + self._start_stop_worker(stream, record, trigger, process_audio) + + def shutdown(self, timeout: float = 5.0) -> bool: + self.request_shutdown() + self.stop_recording(trigger="shutdown", process_audio=False) + return self.wait_for_idle(timeout) + + def wait_for_idle(self, timeout: float) -> bool: + end = time.time() + timeout + while time.time() < end: + if self.get_state() == State.IDLE: + return True + time.sleep(0.05) + return self.get_state() == State.IDLE def _transcribe(self, audio) -> str: segments, _info = self.model.transcribe(audio, language=STT_LANGUAGE, vad_filter=True) @@ -179,22 +253,50 @@ class Daemon: parts.append(text) return " ".join(parts).strip() + def _get_ai_processor(self) -> LlamaProcessor: + if self.ai_processor is None: + self.ai_processor = LlamaProcessor(verbose=self.verbose) + return self.ai_processor + + +def _read_lock_pid(lock_file) -> str: + lock_file.seek(0) + return lock_file.read().strip() + + def _lock_single_instance(): runtime_dir = Path(os.getenv("XDG_RUNTIME_DIR", "/tmp")) / 
"lel" runtime_dir.mkdir(parents=True, exist_ok=True) lock_path = runtime_dir / "lel.lock" - f = open(lock_path, "w") + lock_file = open(lock_path, "a+", encoding="utf-8") try: import fcntl - fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB) - except Exception: - # TODO: kindly try to handle the running PID to the user cleanly in stdout if it's easy to get - raise SystemExit("already running") - return f + fcntl.flock(lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB) + except BlockingIOError as exc: + pid = _read_lock_pid(lock_file) + lock_file.close() + if pid: + raise SystemExit(f"already running (pid={pid})") from exc + raise SystemExit("already running") from exc + except OSError as exc: + if exc.errno in (errno.EACCES, errno.EAGAIN): + pid = _read_lock_pid(lock_file) + lock_file.close() + if pid: + raise SystemExit(f"already running (pid={pid})") from exc + raise SystemExit("already running") from exc + raise + + lock_file.seek(0) + lock_file.truncate() + lock_file.write(f"{os.getpid()}\n") + lock_file.flush() + return lock_file def main(): + global _LOCK_HANDLE parser = argparse.ArgumentParser() parser.add_argument("--config", default="", help="path to config.json") parser.add_argument("--dry-run", action="store_true", help="log hotkey only") @@ -207,37 +309,50 @@ def main(): format="lel: %(asctime)s %(levelname)s %(message)s", ) cfg = load(args.config) - _lock_single_instance() + _LOCK_HANDLE = _lock_single_instance() - logging.info("hotkey: %s", cfg.daemon.get("hotkey", "")) - logging.info("config (%s):\n%s", args.config or str(Path.home() / ".config" / "lel" / "config.json"), json.dumps(redacted_dict(cfg), indent=2)) + logging.info("hotkey: %s", cfg.daemon.hotkey) + logging.info( + "config (%s):\n%s", + args.config or str(Path.home() / ".config" / "lel" / "config.json"), + json.dumps(redacted_dict(cfg), indent=2), + ) if args.verbose: logging.getLogger().setLevel(logging.DEBUG) - desktop = get_desktop_adapter() try: + desktop = get_desktop_adapter() daemon = Daemon(cfg, desktop, verbose=args.verbose) except Exception as exc: logging.error("startup failed: %s", exc) raise SystemExit(1) + shutdown_once = threading.Event() + + def shutdown(reason: str): + if shutdown_once.is_set(): + return + shutdown_once.set() + logging.info("%s, shutting down", reason) + if not daemon.shutdown(timeout=5.0): + logging.warning("timed out waiting for idle state during shutdown") + desktop.request_quit() + def handle_signal(_sig, _frame): - logging.info("signal received, shutting down") - daemon.stop_recording() - end = time.time() + 5 - while time.time() < end and daemon.get_state() != State.IDLE: - time.sleep(0.1) - os._exit(0) + threading.Thread(target=shutdown, args=("signal received",), daemon=True).start() signal.signal(signal.SIGINT, handle_signal) signal.signal(signal.SIGTERM, handle_signal) desktop.start_hotkey_listener( - cfg.daemon.get("hotkey", ""), + cfg.daemon.hotkey, lambda: logging.info("hotkey pressed (dry-run)") if args.dry_run else daemon.toggle(), ) logging.info("ready") - desktop.run_tray(daemon.get_state, daemon._quit) + try: + desktop.run_tray(daemon.get_state, lambda: shutdown("quit requested")) + finally: + daemon.shutdown(timeout=1.0) if __name__ == "__main__": diff --git a/src/recorder.py b/src/recorder.py index 4b2054c..00aa51b 100644 --- a/src/recorder.py +++ b/src/recorder.py @@ -1,8 +1,7 @@ from dataclasses import dataclass, field -from typing import Iterable +from typing import Any, Iterable import numpy as np -import sounddevice as sd # type: ignore[import-not-found] @dataclass @@ 
-14,6 +13,7 @@ class RecordResult: def list_input_devices() -> list[dict]: + sd = _sounddevice() devices = [] for idx, info in enumerate(sd.query_devices()): if info.get("max_input_channels", 0) > 0: @@ -22,6 +22,7 @@ def list_input_devices() -> list[dict]: def default_input_device() -> int | None: + sd = _sounddevice() default = sd.default.device if isinstance(default, (tuple, list)) and default: return default[0] @@ -48,7 +49,8 @@ def resolve_input_device(spec: str | int | None) -> int | None: return None -def start_recording(input_spec: str | int | None) -> tuple[sd.InputStream, RecordResult]: +def start_recording(input_spec: str | int | None) -> tuple[Any, RecordResult]: + sd = _sounddevice() record = RecordResult() device = resolve_input_device(input_spec) @@ -66,13 +68,23 @@ def start_recording(input_spec: str | int | None) -> tuple[sd.InputStream, Recor return stream, record -def stop_recording(stream: sd.InputStream, record: RecordResult) -> np.ndarray: +def stop_recording(stream: Any, record: RecordResult) -> np.ndarray: if stream: stream.stop() stream.close() return _flatten_frames(record.frames) +def _sounddevice(): + try: + import sounddevice as sd # type: ignore[import-not-found] + except ModuleNotFoundError as exc: + raise RuntimeError( + "sounddevice is not installed; install dependencies with `uv sync --extra x11`" + ) from exc + return sd + + def _flatten_frames(frames: Iterable[np.ndarray]) -> np.ndarray: frames = list(frames) if not frames: diff --git a/systemd/lel.service b/systemd/lel.service index 8a1d407..ef31fd1 100644 --- a/systemd/lel.service +++ b/systemd/lel.service @@ -4,7 +4,8 @@ After=default.target [Service] Type=simple -ExecStart=/usr/bin/uv run python3 %h/.local/bin/leld.py --config %h/.config/lel/config.json +WorkingDirectory=%h/.local/share/lel +ExecStart=/usr/bin/uv run python3 %h/.local/share/lel/src/leld.py --config %h/.config/lel/config.json Restart=on-failure RestartSec=2 diff --git a/tests/test_config.py b/tests/test_config.py new file mode 100644 index 0000000..26a6fd0 --- /dev/null +++ b/tests/test_config.py @@ -0,0 +1,98 @@ +import json +import sys +import tempfile +import unittest +from pathlib import Path + +ROOT = Path(__file__).resolve().parents[1] +SRC = ROOT / "src" +if str(SRC) not in sys.path: + sys.path.insert(0, str(SRC)) + +from config import load + + +class ConfigTests(unittest.TestCase): + def test_defaults_when_file_missing(self): + missing = Path(tempfile.gettempdir()) / "lel_missing_config_test.json" + if missing.exists(): + missing.unlink() + + cfg = load(str(missing)) + + self.assertEqual(cfg.daemon.hotkey, "Cmd+m") + self.assertEqual(cfg.recording.input, "") + self.assertEqual(cfg.stt.model, "base") + self.assertEqual(cfg.stt.device, "cpu") + self.assertEqual(cfg.injection.backend, "clipboard") + self.assertTrue(cfg.ai.enabled) + self.assertFalse(cfg.logging.log_transcript) + + def test_loads_nested_config(self): + payload = { + "daemon": {"hotkey": "Ctrl+space"}, + "recording": {"input": 3}, + "stt": {"model": "small", "device": "cuda"}, + "injection": {"backend": "injection"}, + "ai": {"enabled": False}, + "logging": {"log_transcript": True}, + } + with tempfile.TemporaryDirectory() as td: + path = Path(td) / "config.json" + path.write_text(json.dumps(payload), encoding="utf-8") + + cfg = load(str(path)) + + self.assertEqual(cfg.daemon.hotkey, "Ctrl+space") + self.assertEqual(cfg.recording.input, 3) + self.assertEqual(cfg.stt.model, "small") + self.assertEqual(cfg.stt.device, "cuda") + self.assertEqual(cfg.injection.backend, 
"injection") + self.assertFalse(cfg.ai.enabled) + self.assertTrue(cfg.logging.log_transcript) + + def test_loads_legacy_keys(self): + payload = { + "hotkey": "Alt+m", + "input": "Mic", + "whisper_model": "tiny", + "whisper_device": "cpu", + "injection_backend": "clipboard", + "ai_enabled": False, + "log_transcript": True, + } + with tempfile.TemporaryDirectory() as td: + path = Path(td) / "config.json" + path.write_text(json.dumps(payload), encoding="utf-8") + + cfg = load(str(path)) + + self.assertEqual(cfg.daemon.hotkey, "Alt+m") + self.assertEqual(cfg.recording.input, "Mic") + self.assertEqual(cfg.stt.model, "tiny") + self.assertEqual(cfg.stt.device, "cpu") + self.assertEqual(cfg.injection.backend, "clipboard") + self.assertFalse(cfg.ai.enabled) + self.assertTrue(cfg.logging.log_transcript) + + def test_invalid_injection_backend_raises(self): + payload = {"injection": {"backend": "invalid"}} + with tempfile.TemporaryDirectory() as td: + path = Path(td) / "config.json" + path.write_text(json.dumps(payload), encoding="utf-8") + + with self.assertRaisesRegex(ValueError, "injection.backend"): + load(str(path)) + + def test_invalid_logging_flag_raises(self): + payload = {"logging": {"log_transcript": "yes"}} + with tempfile.TemporaryDirectory() as td: + path = Path(td) / "config.json" + path.write_text(json.dumps(payload), encoding="utf-8") + + with self.assertRaisesRegex(ValueError, "logging.log_transcript"): + load(str(path)) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_leld.py b/tests/test_leld.py new file mode 100644 index 0000000..4cd724c --- /dev/null +++ b/tests/test_leld.py @@ -0,0 +1,105 @@ +import os +import sys +import tempfile +import unittest +from pathlib import Path +from unittest.mock import patch + +ROOT = Path(__file__).resolve().parents[1] +SRC = ROOT / "src" +if str(SRC) not in sys.path: + sys.path.insert(0, str(SRC)) + +import leld +from config import Config + + +class FakeDesktop: + def __init__(self): + self.inject_calls = [] + self.quit_calls = 0 + + def inject_text(self, text: str, backend: str) -> None: + self.inject_calls.append((text, backend)) + + def request_quit(self) -> None: + self.quit_calls += 1 + + +class FakeSegment: + def __init__(self, text: str): + self.text = text + + +class FakeModel: + def transcribe(self, _audio, language=None, vad_filter=None): + return [FakeSegment("hello world")], {"language": language, "vad_filter": vad_filter} + + +class FakeAudio: + def __init__(self, size: int): + self.size = size + + +class DaemonTests(unittest.TestCase): + def _config(self) -> Config: + cfg = Config() + cfg.ai.enabled = False + cfg.logging.log_transcript = False + return cfg + + @patch("leld._build_whisper_model", return_value=FakeModel()) + @patch("leld.stop_audio_recording", return_value=FakeAudio(8)) + @patch("leld.start_audio_recording", return_value=(object(), object())) + def test_toggle_start_stop_injects_text(self, _start_mock, _stop_mock, _model_mock): + desktop = FakeDesktop() + daemon = leld.Daemon(self._config(), desktop, verbose=False) + daemon._start_stop_worker = ( + lambda stream, record, trigger, process_audio: daemon._stop_and_process( + stream, record, trigger, process_audio + ) + ) + + daemon.toggle() + self.assertEqual(daemon.get_state(), leld.State.RECORDING) + + daemon.toggle() + + self.assertEqual(daemon.get_state(), leld.State.IDLE) + self.assertEqual(desktop.inject_calls, [("hello world", "clipboard")]) + + @patch("leld._build_whisper_model", return_value=FakeModel()) + 
@patch("leld.stop_audio_recording", return_value=FakeAudio(8)) + @patch("leld.start_audio_recording", return_value=(object(), object())) + def test_shutdown_stops_recording_without_injection(self, _start_mock, _stop_mock, _model_mock): + desktop = FakeDesktop() + daemon = leld.Daemon(self._config(), desktop, verbose=False) + daemon._start_stop_worker = ( + lambda stream, record, trigger, process_audio: daemon._stop_and_process( + stream, record, trigger, process_audio + ) + ) + + daemon.toggle() + self.assertEqual(daemon.get_state(), leld.State.RECORDING) + + self.assertTrue(daemon.shutdown(timeout=0.2)) + self.assertEqual(daemon.get_state(), leld.State.IDLE) + self.assertEqual(desktop.inject_calls, []) + + +class LockTests(unittest.TestCase): + def test_lock_rejects_second_instance(self): + with tempfile.TemporaryDirectory() as td: + with patch.dict(os.environ, {"XDG_RUNTIME_DIR": td}, clear=False): + first = leld._lock_single_instance() + try: + with self.assertRaises(SystemExit) as ctx: + leld._lock_single_instance() + self.assertIn("already running", str(ctx.exception)) + finally: + first.close() + + +if __name__ == "__main__": + unittest.main() diff --git a/uv.lock b/uv.lock index 64838b6..3b3a54a 100644 --- a/uv.lock +++ b/uv.lock @@ -410,6 +410,8 @@ source = { virtual = "." } dependencies = [ { name = "faster-whisper" }, { name = "llama-cpp-python" }, + { name = "numpy", version = "2.2.6", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.11'" }, + { name = "numpy", version = "2.4.2", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.11'" }, { name = "pillow" }, { name = "sounddevice" }, ] @@ -424,6 +426,7 @@ x11 = [ requires-dist = [ { name = "faster-whisper" }, { name = "llama-cpp-python" }, + { name = "numpy" }, { name = "pillow" }, { name = "pygobject", marker = "extra == 'x11'" }, { name = "python-xlib", marker = "extra == 'x11'" },
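The single-instance behavior exercised by `LockTests` reduces to a non-blocking exclusive `flock` on a shared lock file. A self-contained sketch of that pattern (demo path only, not the daemon's runtime directory) shows why a second acquisition fails even within one process: each `open()` gets its own file description, so the second `LOCK_EX | LOCK_NB` attempt raises.

```python
# flock-based single-instance pattern, as used by _lock_single_instance().
import fcntl
import os
import tempfile

lock_path = os.path.join(tempfile.gettempdir(), "lel-lock-demo")

first = open(lock_path, "a+", encoding="utf-8")
fcntl.flock(first, fcntl.LOCK_EX | fcntl.LOCK_NB)  # acquired
first.seek(0)
first.truncate()
first.write(f"{os.getpid()}\n")
first.flush()

second = open(lock_path, "a+", encoding="utf-8")
try:
    fcntl.flock(second, fcntl.LOCK_EX | fcntl.LOCK_NB)
except OSError:
    pid = open(lock_path, encoding="utf-8").read().strip()
    print(f"already running (pid={pid})")
finally:
    second.close()
    first.close()  # closing the handle releases the lock
```

Both new test modules run with `make test` or `python3 -m unittest discover -s tests -p 'test_*.py'`.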