#!/usr/bin/env python3
"""Hotkey-driven dictation daemon.

A hotkey toggles audio recording. When recording stops (second hotkey press,
timeout, or shutdown signal) the captured audio is transcribed with
faster-whisper, passed through ``LlamaProcessor`` and injected into the
desktop via the desktop adapter.
"""
import argparse
import json
import logging
import os
import signal
import sys
import threading
import time
from pathlib import Path

import gi
from faster_whisper import WhisperModel

from config import Config, load, redacted_dict
from constants import RECORD_TIMEOUT_SEC, STT_LANGUAGE
from recorder import start_recording, stop_recording
from aiprocess import LlamaProcessor
from desktop import get_desktop_adapter


class State:
    """String constants naming the phases the daemon moves through."""

    IDLE = "idle"
    RECORDING = "recording"
    STT = "stt"
    PROCESSING = "processing"
    OUTPUTTING = "outputting"


def _compute_type(device: str) -> str:
    """Return the Whisper compute type for *device*.

    ``"cuda"`` (case-insensitive) maps to ``"float16"``; anything else,
    including an empty or ``None`` device, maps to ``"int8"``.
    """
    dev = (device or "cpu").lower()
    return "float16" if dev == "cuda" else "int8"


class Daemon:
    """State machine driving record -> transcribe -> process -> inject.

    ``self.lock`` guards ``self.state``. The recording handles (``proc``,
    ``record``, ``timer``) are written under the lock when a recording starts
    and are consumed by the single worker thread that is spawned when the
    state leaves ``RECORDING``.
    """

    def __init__(self, cfg: Config, desktop, *, verbose: bool = False):
        """Load the Whisper model and the AI processor.

        Args:
            cfg: Loaded configuration; ``stt``, ``recording`` and
                ``injection`` sections are read with ``.get``.
            desktop: Desktop adapter used to inject the final text.
            verbose: Forwarded to ``LlamaProcessor``.
        """
        self.cfg = cfg
        self.desktop = desktop
        self.lock = threading.Lock()
        self.state = State.IDLE
        self.proc = None
        self.record = None
        self.timer = None

        device = cfg.stt.get("device", "cpu")
        self.model = WhisperModel(
            cfg.stt.get("model", "base"),
            device=device,
            compute_type=_compute_type(device),
        )
        self.ai_processor = LlamaProcessor(verbose=verbose)

    def set_state(self, state: str):
        """Set the current state and log the transition if it changed."""
        with self.lock:
            prev = self.state
            self.state = state
        if prev != state:
            logging.info("state: %s -> %s", prev, state)

    def get_state(self):
        """Return the current state string."""
        with self.lock:
            return self.state

    def _quit(self):
        """Terminate the process immediately with exit status 0."""
        os._exit(0)

    def toggle(self):
        """Handle a hotkey press.

        Starts recording when idle, stops and processes when recording, and
        ignores the trigger (with a log line) in any other state.
        """
        with self.lock:
            if self.state == State.IDLE:
                self._start_recording_locked()
            elif self.state == State.RECORDING:
                self._begin_stop_locked("user")
            else:
                logging.info("busy (%s), trigger ignored", self.state)

    def _begin_stop_locked(self, reason: str):
        """Leave ``RECORDING`` and hand off to a worker thread.

        Must be called with ``self.lock`` held and ``self.state`` equal to
        ``State.RECORDING``. Flipping the state to ``STT`` before releasing
        the lock ensures only one worker is ever started per recording.
        """
        self.state = State.STT
        threading.Thread(
            target=self._stop_and_process,
            args=(reason,),
            daemon=True,
        ).start()

    def _start_recording_locked(self):
        """Start a recording and arm the timeout timer.

        Must be called with ``self.lock`` held. On failure the error is
        logged and the state is left unchanged.
        """
        try:
            proc, record = start_recording(self.cfg.recording.get("input", ""))
        except Exception as exc:
            logging.error("record start failed: %s", exc)
            return
        self.proc = proc
        self.record = record
        self.state = State.RECORDING
        logging.info("recording started")

        if self.timer:
            self.timer.cancel()
        self.timer = threading.Timer(RECORD_TIMEOUT_SEC, self._timeout_stop)
        self.timer.daemon = True
        self.timer.start()

    def _timeout_stop(self):
        """Timer callback: stop the recording if one is still in progress."""
        with self.lock:
            if self.state != State.RECORDING:
                return
            self._begin_stop_locked("timeout")

    def _stop_and_process(self, reason: str = "user"):
        """Stop the recorder and run the STT -> AI -> output pipeline.

        Runs on a worker thread. Every exit path returns the daemon to
        ``State.IDLE``.

        Args:
            reason: What triggered the stop; only used in the log line.
        """
        # No lock needed for the hand-off: these attributes are only written
        # by _start_recording_locked, which toggle() calls only while IDLE,
        # and the state has already left RECORDING before this thread starts.
        proc = self.proc
        record = self.record
        self.proc = None
        self.record = None
        if self.timer:
            self.timer.cancel()
            self.timer = None

        if not proc or not record:
            self.set_state(State.IDLE)
            return

        logging.info("stopping recording (%s)", reason)
        try:
            audio = stop_recording(proc, record)
        except Exception as exc:
            logging.error("record stop failed: %s", exc)
            self.set_state(State.IDLE)
            return

        if audio.size == 0:
            logging.error("no audio captured")
            self.set_state(State.IDLE)
            return

        try:
            self.set_state(State.STT)
            logging.info("stt started")
            text = self._transcribe(audio)
        except Exception as exc:
            logging.error("stt failed: %s", exc)
            self.set_state(State.IDLE)
            return

        text = (text or "").strip()
        if not text:
            self.set_state(State.IDLE)
            return

        logging.info("stt: %s", text)

        self.set_state(State.PROCESSING)
        logging.info("ai processing started")
        try:
            # An empty result or a processing failure falls back to the raw
            # transcript rather than dropping the dictation.
            text = self.ai_processor.process(text) or text
        except Exception as exc:
            logging.error("ai process failed: %s", exc)

        logging.info("processed: %s", text)

        try:
            self.set_state(State.OUTPUTTING)
            logging.info("outputting started")
            backend = self.cfg.injection.get("backend", "clipboard")
            self.desktop.inject_text(text, backend)
        except Exception as exc:
            logging.error("output failed: %s", exc)
        finally:
            self.set_state(State.IDLE)

    def stop_recording(self, reason: str = "user"):
        """Stop an in-progress recording and process it; no-op otherwise.

        Args:
            reason: What triggered the stop; only used in the log line.
        """
        with self.lock:
            if self.state != State.RECORDING:
                return
            self._begin_stop_locked(reason)

    def _transcribe(self, audio) -> str:
        """Transcribe *audio* and return the non-empty segments joined by spaces."""
        segments, _info = self.model.transcribe(
            audio, language=STT_LANGUAGE, vad_filter=True
        )
        pieces = ((seg.text or "").strip() for seg in segments)
        return " ".join(piece for piece in pieces if piece)


def _lock_single_instance():
    """Take an exclusive, non-blocking lock on the per-user lock file.

    The lock file lives at ``$XDG_RUNTIME_DIR/lel/lel.lock`` (``/tmp`` when
    the variable is unset). The owner writes its PID into the file so a
    second instance can report who holds the lock.

    Returns:
        The open, locked file object. The caller MUST keep a reference to it:
        the lock is released as soon as the file is closed or
        garbage-collected.

    Raises:
        SystemExit: If another instance holds the lock, or ``fcntl`` is not
            available on this platform.
    """
    try:
        import fcntl
    except ImportError as exc:
        raise SystemExit("single-instance lock unavailable: fcntl not found") from exc

    runtime_dir = Path(os.getenv("XDG_RUNTIME_DIR", "/tmp")) / "lel"
    runtime_dir.mkdir(parents=True, exist_ok=True)
    lock_path = runtime_dir / "lel.lock"

    # "a+" instead of "w": merely opening the file must not wipe the PID
    # recorded by an instance that is already running.
    f = open(lock_path, "a+")
    try:
        fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except OSError:
        f.seek(0)
        owner = f.read().strip()
        f.close()
        if owner.isdigit():
            raise SystemExit(f"already running (pid {owner})") from None
        raise SystemExit("already running") from None

    f.seek(0)
    f.truncate()
    f.write(f"{os.getpid()}\n")
    f.flush()
    return f


def main():
    """Parse arguments, acquire the instance lock and run the daemon."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--config", default="", help="path to config.json")
    parser.add_argument("--dry-run", action="store_true", help="log hotkey only")
    parser.add_argument("-v", "--verbose", action="store_true", help="enable verbose logs")
    args = parser.parse_args()

    logging.basicConfig(
        stream=sys.stderr,
        level=logging.DEBUG if args.verbose else logging.INFO,
        format="lel: %(asctime)s %(levelname)s %(message)s",
    )
    cfg = load(args.config)

    # Keep the handle referenced until main() returns; dropping it would
    # close the file and silently release the single-instance lock.
    _instance_lock = _lock_single_instance()

    logging.info("hotkey: %s", cfg.daemon.get("hotkey", ""))
    logging.info(
        "config (%s):\n%s",
        args.config or str(Path.home() / ".config" / "lel" / "config.json"),
        json.dumps(redacted_dict(cfg), indent=2),
    )

    desktop = get_desktop_adapter()
    try:
        daemon = Daemon(cfg, desktop, verbose=args.verbose)
    except Exception as exc:
        logging.error("startup failed: %s", exc)
        raise SystemExit(1) from exc

    def handle_signal(_sig, _frame):
        """Finish any in-progress recording (up to 5 s), then exit."""
        logging.info("signal received, shutting down")
        daemon.stop_recording("shutdown")
        deadline = time.monotonic() + 5
        while time.monotonic() < deadline and daemon.get_state() != State.IDLE:
            time.sleep(0.1)
        os._exit(0)

    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    def on_hotkey():
        """Hotkey callback: only log in dry-run mode, otherwise toggle."""
        if args.dry_run:
            logging.info("hotkey pressed (dry-run)")
        else:
            daemon.toggle()

    desktop.start_hotkey_listener(cfg.daemon.get("hotkey", ""), on_hotkey)
    logging.info("ready")
    desktop.run_tray(daemon.get_state, daemon._quit)


if __name__ == "__main__":
    main()