293 lines
8.9 KiB
Python
Executable file
293 lines
8.9 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
import argparse
|
|
import json
|
|
import logging
|
|
import os
|
|
import signal
|
|
import sys
|
|
import threading
|
|
import time
|
|
from pathlib import Path
|
|
|
|
import gi
|
|
from faster_whisper import WhisperModel
|
|
|
|
from config import Config, load, redacted_dict
|
|
from recorder import start_recording, stop_recording
|
|
from aiprocess import AIConfig, build_processor
|
|
from inject import inject
|
|
from x11_hotkey import listen
|
|
|
|
gi.require_version("Gtk", "3.0")
|
|
|
|
from gi.repository import GLib, Gtk # type: ignore[import-not-found]
|
|
|
|
|
|
class State:
|
|
IDLE = "idle"
|
|
RECORDING = "recording"
|
|
STT = "stt"
|
|
PROCESSING = "processing"
|
|
OUTPUTTING = "outputting"
|
|
|
|
|
|
ASSETS_DIR = Path(__file__).parent / "assets"
|
|
RECORD_TIMEOUT_SEC = 300
|
|
STT_LANGUAGE = "en"
|
|
TRAY_UPDATE_MS = 250
|
|
|
|
|
|
def _compute_type(device: str) -> str:
|
|
dev = (device or "cpu").lower()
|
|
if dev == "cuda":
|
|
return "float16"
|
|
return "int8"
|
|
|
|
|
|
class Daemon:
|
|
def __init__(self, cfg: Config):
|
|
self.cfg = cfg
|
|
self.lock = threading.Lock()
|
|
self.state = State.IDLE
|
|
self.proc = None
|
|
self.record = None
|
|
self.timer = None
|
|
self.model = WhisperModel(
|
|
cfg.stt.get("model", "base"),
|
|
device=cfg.stt.get("device", "cpu"),
|
|
compute_type=_compute_type(cfg.stt.get("device", "cpu")),
|
|
)
|
|
self.icon = Gtk.StatusIcon()
|
|
self.icon.set_visible(True)
|
|
self.icon.connect("popup-menu", self._on_tray_menu)
|
|
self.menu = Gtk.Menu()
|
|
quit_item = Gtk.MenuItem(label="Quit")
|
|
quit_item.connect("activate", lambda *_: self._quit())
|
|
self.menu.append(quit_item)
|
|
self.menu.show_all()
|
|
|
|
def set_state(self, state: str):
|
|
with self.lock:
|
|
prev = self.state
|
|
self.state = state
|
|
if prev != state:
|
|
logging.info("state: %s -> %s", prev, state)
|
|
|
|
def get_state(self):
|
|
with self.lock:
|
|
return self.state
|
|
|
|
def _quit(self):
|
|
os._exit(0)
|
|
|
|
def _on_tray_menu(self, _icon, _button, _time):
|
|
self.menu.popup(None, None, None, None, 0, _time)
|
|
|
|
def toggle(self):
|
|
with self.lock:
|
|
if self.state == State.IDLE:
|
|
self._start_recording_locked()
|
|
return
|
|
if self.state == State.RECORDING:
|
|
self.state = State.STT
|
|
threading.Thread(target=self._stop_and_process, daemon=True).start()
|
|
return
|
|
logging.info("busy (%s), trigger ignored", self.state)
|
|
|
|
def _start_recording_locked(self):
|
|
try:
|
|
proc, record = start_recording(self.cfg.recording.get("input", ""))
|
|
except Exception as exc:
|
|
logging.error("record start failed: %s", exc)
|
|
return
|
|
self.proc = proc
|
|
self.record = record
|
|
self.state = State.RECORDING
|
|
logging.info("recording started (%s)", record.wav_path)
|
|
if self.timer:
|
|
self.timer.cancel()
|
|
self.timer = threading.Timer(RECORD_TIMEOUT_SEC, self._timeout_stop)
|
|
self.timer.daemon = True
|
|
self.timer.start()
|
|
|
|
def _timeout_stop(self):
|
|
with self.lock:
|
|
if self.state != State.RECORDING:
|
|
return
|
|
self.state = State.STT
|
|
threading.Thread(target=self._stop_and_process, daemon=True).start()
|
|
|
|
def _stop_and_process(self):
|
|
proc = self.proc
|
|
record = self.record
|
|
self.proc = None
|
|
self.record = None
|
|
if self.timer:
|
|
self.timer.cancel()
|
|
self.timer = None
|
|
|
|
if not proc or not record:
|
|
self.set_state(State.IDLE)
|
|
return
|
|
|
|
logging.info("stopping recording (user)")
|
|
try:
|
|
stop_recording(proc, record)
|
|
except Exception as exc:
|
|
logging.error("record stop failed: %s", exc)
|
|
self.set_state(State.IDLE)
|
|
return
|
|
|
|
if not Path(record.wav_path).exists():
|
|
logging.error("no audio captured")
|
|
self.set_state(State.IDLE)
|
|
return
|
|
|
|
try:
|
|
self.set_state(State.STT)
|
|
logging.info("stt started")
|
|
text = self._transcribe(record.wav_path)
|
|
except Exception as exc:
|
|
logging.error("stt failed: %s", exc)
|
|
self.set_state(State.IDLE)
|
|
return
|
|
|
|
text = (text or "").strip()
|
|
if not text:
|
|
self.set_state(State.IDLE)
|
|
return
|
|
|
|
logging.info("stt: %s", text)
|
|
|
|
ai_model = (self.cfg.ai_cleanup.get("model") or "").strip()
|
|
ai_base_url = (self.cfg.ai_cleanup.get("base_url") or "").strip()
|
|
if ai_model and ai_base_url:
|
|
self.set_state(State.PROCESSING)
|
|
logging.info("ai processing started")
|
|
try:
|
|
processor = build_processor(
|
|
AIConfig(
|
|
model=ai_model,
|
|
base_url=ai_base_url,
|
|
api_key=self.cfg.ai_cleanup.get("api_key", ""),
|
|
timeout_sec=25,
|
|
language_hint="en",
|
|
)
|
|
)
|
|
ai_input = text
|
|
text = processor.process(ai_input) or text
|
|
except Exception as exc:
|
|
logging.error("ai process failed: %s", exc)
|
|
|
|
logging.info("processed: %s", text)
|
|
|
|
try:
|
|
self.set_state(State.OUTPUTTING)
|
|
logging.info("outputting started")
|
|
backend = self.cfg.injection.get("backend", "clipboard")
|
|
inject(text, backend)
|
|
except Exception as exc:
|
|
logging.error("output failed: %s", exc)
|
|
finally:
|
|
self.set_state(State.IDLE)
|
|
|
|
|
|
def stop_recording(self):
|
|
with self.lock:
|
|
if self.state != State.RECORDING:
|
|
return
|
|
self.state = State.STT
|
|
threading.Thread(target=self._stop_and_process, daemon=True).start()
|
|
|
|
def _transcribe(self, wav_path: str) -> str:
|
|
segments, _info = self.model.transcribe(wav_path, language=STT_LANGUAGE, vad_filter=True)
|
|
parts = []
|
|
for seg in segments:
|
|
text = (seg.text or "").strip()
|
|
if text:
|
|
parts.append(text)
|
|
return " ".join(parts).strip()
|
|
|
|
def _icon_path(self, state: str) -> str:
|
|
if state == State.RECORDING:
|
|
return str(ASSETS_DIR / "recording.png")
|
|
if state == State.STT:
|
|
return str(ASSETS_DIR / "transcribing.png")
|
|
if state == State.PROCESSING:
|
|
return str(ASSETS_DIR / "processing.png")
|
|
return str(ASSETS_DIR / "idle.png")
|
|
|
|
def _title(self, state: str) -> str:
|
|
if state == State.RECORDING:
|
|
return "Recording"
|
|
if state == State.STT:
|
|
return "STT"
|
|
if state == State.PROCESSING:
|
|
return "AI Processing"
|
|
return "Idle"
|
|
|
|
def _update_tray(self):
|
|
state = self.get_state()
|
|
self.icon.set_from_file(self._icon_path(state))
|
|
self.icon.set_tooltip_text(self._title(state))
|
|
return True
|
|
|
|
def run_tray(self):
|
|
self._update_tray()
|
|
GLib.timeout_add(TRAY_UPDATE_MS, self._update_tray)
|
|
Gtk.main()
|
|
|
|
|
|
def _lock_single_instance():
|
|
runtime_dir = Path(os.getenv("XDG_RUNTIME_DIR", "/tmp")) / "lel"
|
|
runtime_dir.mkdir(parents=True, exist_ok=True)
|
|
lock_path = runtime_dir / "lel.lock"
|
|
f = open(lock_path, "w")
|
|
try:
|
|
import fcntl
|
|
|
|
fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
|
|
except Exception:
|
|
# TODO: kindly try to handle the running PID to the user cleanly in stdout if it's easy to get
|
|
raise SystemExit("already running")
|
|
return f
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument("--config", default="", help="path to config.json")
|
|
parser.add_argument("--dry-run", action="store_true", help="log hotkey only")
|
|
args = parser.parse_args()
|
|
|
|
logging.basicConfig(stream=sys.stderr, level=logging.INFO, format="leld: %(asctime)s %(message)s")
|
|
cfg = load(args.config)
|
|
_lock_single_instance()
|
|
|
|
logging.info("ready (hotkey: %s)", cfg.daemon.get("hotkey", ""))
|
|
logging.info("config (%s):\n%s", args.config or str(Path.home() / ".config" / "lel" / "config.json"), json.dumps(redacted_dict(cfg), indent=2))
|
|
|
|
daemon = Daemon(cfg)
|
|
|
|
def handle_signal(_sig, _frame):
|
|
logging.info("signal received, shutting down")
|
|
daemon.stop_recording()
|
|
end = time.time() + 5
|
|
while time.time() < end and daemon.get_state() != State.IDLE:
|
|
time.sleep(0.1)
|
|
os._exit(0)
|
|
|
|
signal.signal(signal.SIGINT, handle_signal)
|
|
signal.signal(signal.SIGTERM, handle_signal)
|
|
|
|
threading.Thread(
|
|
target=lambda: listen(
|
|
cfg.daemon.get("hotkey", ""),
|
|
lambda: logging.info("hotkey pressed (dry-run)") if args.dry_run else daemon.toggle(),
|
|
),
|
|
daemon=True,
|
|
).start()
|
|
daemon.run_tray()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|