aman/src/leld.py

278 lines
8.8 KiB
Python
Executable file

#!/usr/bin/env python3
import argparse
import json
import logging
import os
import signal
import sys
import threading
import time
from pathlib import Path
from config import Config, load, redacted_dict
from recorder import start_recording, stop_recording
from stt import FasterWhisperSTT, STTConfig
from aiprocess import AIConfig, build_processor
from context import I3Provider
from inject import inject
from x11_hotkey import listen
from tray import run_tray
class State:
    """String names for the phases the daemon moves through.

    The values are plain strings so they can be logged with ``%s`` and
    compared directly by callers.
    """

    # Nothing in flight; a trigger starts a new recording.
    IDLE = "idle"
    # Audio is being captured.
    RECORDING = "recording"
    # Capture is being stopped and the audio run through speech-to-text.
    TRANSCRIBING = "transcribing"
    # Optional AI cleanup of the transcript is running.
    PROCESSING = "processing"
    # The final text is being injected.
    OUTPUTTING = "outputting"
class Daemon:
    """Hotkey-driven dictation state machine.

    A trigger while idle starts a recording; a trigger while recording stops
    it and hands the audio to a background worker that transcribes it,
    optionally runs AI cleanup, and injects the resulting text.  State moves
    IDLE -> RECORDING -> TRANSCRIBING -> (PROCESSING) -> OUTPUTTING -> IDLE;
    triggers received in any non-idle, non-recording state are ignored.
    """

    # Upper bound on a single recording, in seconds; once it elapses the
    # recording is stopped and processed as if the user had stopped it.
    MAX_RECORDING_SEC = 300

    def __init__(self, cfg: "Config"):
        """Build the daemon from a loaded configuration.

        Args:
            cfg: configuration object; its ``transcribing``, ``recording``,
                ``ai_cleanup`` and ``injection`` sections are read with
                ``.get``.
        """
        self.cfg = cfg
        # Re-entrant on purpose: get_state()/stop_recording() may be called
        # from a signal handler, which runs on the main thread and may
        # interrupt code on that same thread that already holds the lock.
        # A plain Lock would deadlock there.
        self.lock = threading.RLock()
        self.state = State.IDLE
        self.proc = None
        self.record = None
        self.timer = None
        self.context = None
        self.context_provider = I3Provider()
        self.stt = FasterWhisperSTT(
            STTConfig(
                model=cfg.transcribing.get("model", "base"),
                language=None,
                device=cfg.transcribing.get("device", "cpu"),
                vad_filter=True,
            )
        )
        self.ai = None

    def set_state(self, state: str):
        """Set the current state and log the transition if it changed."""
        with self.lock:
            prev = self.state
            self.state = state
        if prev != state:
            logging.info("state: %s -> %s", prev, state)

    def get_state(self):
        """Return the current state string."""
        with self.lock:
            return self.state

    def toggle(self):
        """Handle a trigger: start recording, stop recording, or ignore."""
        with self.lock:
            if self.state == State.IDLE:
                self._start_recording_locked()
            elif self.state == State.RECORDING:
                self._begin_stop_locked("user")
            else:
                logging.info("busy (%s), trigger ignored", self.state)

    def _start_recording_locked(self):
        """Start capturing audio and arm the timeout.  Caller holds the lock.

        On failure to start the recorder the error is logged and the state is
        left untouched (idle).  A failed focus-context capture is logged and
        treated as "no context"; it does not prevent recording.
        """
        try:
            proc, record = start_recording(self.cfg.recording.get("input", ""))
        except Exception as exc:
            logging.error("record start failed: %s", exc)
            return
        try:
            if self.context_provider:
                self.context = self.context_provider.capture()
        except Exception as exc:
            logging.error("context capture failed: %s", exc)
            self.context = None
        if self.context:
            logging.info(
                "context: id=%s app_id=%s class=%s instance=%s title=%s",
                self.context.window_id,
                self.context.app_id,
                self.context.klass,
                self.context.instance,
                self.context.title,
            )
        else:
            logging.info("context: none")
        self.proc = proc
        self.record = record
        self.state = State.RECORDING
        logging.info("recording started (%s)", record.wav_path)
        if self.timer:
            self.timer.cancel()
        self.timer = threading.Timer(self.MAX_RECORDING_SEC, self._timeout_stop)
        self.timer.daemon = True
        self.timer.start()

    def _begin_stop_locked(self, reason):
        """Hand an active recording to a worker thread.  Caller holds the lock.

        The state is flipped to TRANSCRIBING before the worker starts so that
        a second trigger arriving meanwhile is reported as busy rather than
        stopping twice.

        Args:
            reason: short label for the log line (e.g. ``"user"``).

        Returns:
            True if a worker was started, False if nothing was recording.
        """
        if self.state != State.RECORDING:
            return False
        self.state = State.TRANSCRIBING
        threading.Thread(
            target=self._stop_and_process, args=(reason,), daemon=True
        ).start()
        return True

    def _timeout_stop(self):
        """Timer callback: stop a recording that hit MAX_RECORDING_SEC."""
        with self.lock:
            self._begin_stop_locked("timeout")

    def _stop_and_process(self, reason="user"):
        """Worker entry point: run the pipeline, then always return to IDLE.

        IDLE is set exactly once, here.  Setting it in the pipeline and again
        on the way out would let a recording started in between be
        overwritten.  Unexpected errors are logged rather than leaving the
        daemon stuck in a busy state.
        """
        try:
            self._run_pipeline(reason)
        except Exception:
            logging.exception("processing failed")
        finally:
            self.set_state(State.IDLE)

    def _run_pipeline(self, reason):
        """Stop the recorder, transcribe, optionally clean up, and inject."""
        # Take ownership of the recorder handles under the same lock that
        # guards their assignment in _start_recording_locked.
        with self.lock:
            proc, record = self.proc, self.record
            self.proc = None
            self.record = None
            timer, self.timer = self.timer, None
        if timer:
            timer.cancel()
        if not proc or not record:
            return
        logging.info("stopping recording (%s)", reason)
        try:
            stop_recording(proc, record)
        except Exception as exc:
            logging.error("record stop failed: %s", exc)
            return
        if not Path(record.wav_path).exists():
            logging.error("no audio captured")
            return
        text = self._transcribe(record.wav_path)
        if not text:
            return
        logging.info("stt: %s", text)
        text = self._ai_cleanup(text)
        logging.info("processed: %s", text)
        self._output(text)

    def _transcribe(self, wav_path):
        """Return the stripped transcript of *wav_path*, or "" on failure."""
        try:
            self.set_state(State.TRANSCRIBING)
            logging.info("transcribing started")
            text = self.stt.transcribe(wav_path, language="en")
        except Exception as exc:
            logging.error("stt failed: %s", exc)
            return ""
        return (text or "").strip()

    def _ai_cleanup(self, text):
        """Return the AI-processed text, or *text* unchanged.

        The input is returned as-is when cleanup is disabled, when the
        processor raises, or when it returns an empty result.
        """
        ai_cfg = self.cfg.ai_cleanup
        if not ai_cfg.get("enabled", False):
            return text
        self.set_state(State.PROCESSING)
        logging.info("ai processing started")
        try:
            processor = build_processor(
                AIConfig(
                    model=ai_cfg.get("model", ""),
                    temperature=ai_cfg.get("temperature", 0.0),
                    system_prompt_file="",
                    base_url=ai_cfg.get("base_url", ""),
                    api_key=ai_cfg.get("api_key", ""),
                    timeout_sec=25,
                    language_hint="en",
                )
            )
            return processor.process(text) or text
        except Exception as exc:
            logging.error("ai process failed: %s", exc)
            return text

    def _output(self, text):
        """Inject *text* unless focus moved since the recording started."""
        try:
            self.set_state(State.OUTPUTTING)
            logging.info("outputting started")
            if self.context_provider and self.context:
                if not self.context_provider.is_same_focus(self.context):
                    logging.info("focus changed, aborting injection")
                    return
            backend = self.cfg.injection.get("backend", "clipboard")
            inject(text, backend)
        except Exception as exc:
            logging.error("output failed: %s", exc)

    def _context_json(self, ctx):
        """Return *ctx* as a JSON-serialisable dict, or None if there is none."""
        if not ctx:
            return None
        return {
            "window_id": ctx.window_id,
            "app_id": ctx.app_id,
            "class": ctx.klass,
            "instance": ctx.instance,
            "title": ctx.title,
        }

    def stop_recording(self):
        """Stop and process an active recording; no-op if not recording."""
        with self.lock:
            self._begin_stop_locked("shutdown")
def _lock_single_instance():
    """Acquire the per-user single-instance lock.

    Takes an exclusive, non-blocking ``flock`` on
    ``$XDG_RUNTIME_DIR/lel/lel.lock`` (``/tmp/lel/lel.lock`` when the
    variable is unset) and records this process's PID in the file.

    Returns:
        The open lock file.  The lock lasts only as long as this file stays
        open, so the caller must keep a reference to it for the lifetime of
        the process.

    Raises:
        SystemExit: if another instance holds the lock (the message includes
            its PID when the lock file contains one), or if the lock cannot
            be taken for another OS-level reason.
    """
    import fcntl

    runtime_dir = Path(os.getenv("XDG_RUNTIME_DIR", "/tmp")) / "lel"
    runtime_dir.mkdir(parents=True, exist_ok=True)
    lock_path = runtime_dir / "lel.lock"
    # "a+" creates the file when missing but never truncates it, so the PID
    # written by a running instance is still readable when we lose the race.
    f = open(lock_path, "a+", encoding="utf-8")
    try:
        fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except BlockingIOError:
        f.seek(0)
        owner = f.read().strip()
        f.close()
        if owner.isdigit():
            raise SystemExit(f"already running (pid {owner})") from None
        raise SystemExit("already running") from None
    except OSError as exc:
        f.close()
        raise SystemExit(f"cannot lock {lock_path}: {exc}") from None
    # We own the lock: replace whatever a previous instance left behind.
    f.seek(0)
    f.truncate()
    f.write(f"{os.getpid()}\n")
    f.flush()
    return f
def main():
    """Parse arguments, take the single-instance lock, and run the daemon.

    With ``--no-tray`` the hotkey listener runs on the main thread and this
    function returns when it does; otherwise the listener runs on a daemon
    thread and the tray loop owns the main thread.  ``--dry-run`` only logs
    hotkey presses.  SIGINT/SIGTERM stop an active recording, wait up to
    five seconds for the daemon to become idle, then exit.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--config", default="", help="path to config.json")
    parser.add_argument("--no-tray", action="store_true", help="disable tray icon")
    parser.add_argument("--dry-run", action="store_true", help="log hotkey only")
    args = parser.parse_args()

    logging.basicConfig(
        stream=sys.stderr,
        level=logging.INFO,
        format="leld: %(asctime)s %(message)s",
    )
    cfg = load(args.config)
    config_label = args.config or str(Path.home() / ".config" / "lel" / "config.json")

    # Hold on to the lock file for as long as main() runs: the flock is
    # released as soon as the file object is closed, which happens
    # immediately if the return value is dropped.
    _instance_lock = _lock_single_instance()

    hotkey = cfg.daemon.get("hotkey", "")
    logging.info("ready (hotkey: %s)", hotkey)
    logging.info("config (%s):\n%s", config_label, json.dumps(redacted_dict(cfg), indent=2))

    daemon = Daemon(cfg)

    def on_hotkey():
        if args.dry_run:
            logging.info("hotkey pressed (dry-run)")
        else:
            daemon.toggle()

    def on_quit():
        os._exit(0)

    def handle_signal(_sig, _frame):
        logging.info("signal received, shutting down")
        daemon.stop_recording()
        # Monotonic clock so a wall-clock adjustment cannot stretch or cut
        # short the grace period.
        deadline = time.monotonic() + 5
        while time.monotonic() < deadline and daemon.get_state() != State.IDLE:
            time.sleep(0.1)
        os._exit(0)

    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    if args.no_tray:
        listen(hotkey, on_hotkey)
        return

    threading.Thread(target=listen, args=(hotkey, on_hotkey), daemon=True).start()
    run_tray(daemon.get_state, on_quit, None)


if __name__ == "__main__":
    main()