Migrate to Python daemon

This commit is contained in:
Thales Maciel 2026-02-07 15:12:17 -03:00
parent 49ef349d48
commit d81f3dbffe
42 changed files with 660 additions and 1816 deletions

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

46
src/aiprocess.py Normal file
View file

@ -0,0 +1,46 @@
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
import ollama
def load_system_prompt(path: str | None) -> str:
    """Return the system prompt text with surrounding whitespace removed.

    When ``path`` is truthy that file is read; otherwise the
    ``system_prompt.txt`` file that sits next to this module is used.
    The file is decoded as UTF-8 in both cases.
    """
    source = Path(path) if path else Path(__file__).parent / "system_prompt.txt"
    return source.read_text(encoding="utf-8").strip()
@dataclass
class AIConfig:
    """Settings bundle handed to an AI post-processor."""

    # Backend selector; build_processor lower-cases and strips it before matching.
    provider: str
    # Model name forwarded to the backend's generate call.
    model: str
    # Sampling temperature forwarded in the request options.
    temperature: float
    # Path of a custom system prompt; a falsy value selects the bundled prompt.
    system_prompt_file: str
    # Address of the AI server, used as the client host.
    base_url: str
    # Credential for the AI service.
    api_key: str
    # Timeout, in seconds, intended for AI requests.
    timeout_sec: int
class OllamaProcessor:
    """Post-process transcripts by sending them to an Ollama server."""

    def __init__(self, cfg: AIConfig):
        """Load the system prompt and create the Ollama client.

        Args:
            cfg: Provider settings; ``base_url`` is used as the client host
                and ``timeout_sec`` as the request timeout.
        """
        self.cfg = cfg
        self.system = load_system_prompt(cfg.system_prompt_file)
        # Extra keyword arguments are forwarded by ollama.Client to its HTTP
        # client. Without an explicit timeout a stalled server would leave the
        # daemon stuck in the "processing" state, so honour the configured
        # value; a non-positive value keeps the library default.
        timeout = cfg.timeout_sec if cfg.timeout_sec and cfg.timeout_sec > 0 else None
        self.client = ollama.Client(host=cfg.base_url, timeout=timeout)

    def process(self, text: str) -> str:
        """Return the model's response for ``text``, stripped of whitespace.

        An absent or empty ``response`` entry yields an empty string.
        """
        resp = self.client.generate(
            model=self.cfg.model,
            prompt=text,
            system=self.system,
            options={"temperature": self.cfg.temperature},
        )
        return (resp.get("response") or "").strip()
def build_processor(cfg: AIConfig) -> OllamaProcessor:
    """Create the processor that matches ``cfg.provider``.

    The provider name is compared case-insensitively after trimming
    whitespace.

    Raises:
        ValueError: If the provider is anything other than "ollama".
    """
    if cfg.provider.strip().lower() == "ollama":
        return OllamaProcessor(cfg)
    raise ValueError(f"unsupported ai provider: {cfg.provider}")

BIN
src/assets/idle.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 82 B

BIN
src/assets/processing.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 82 B

BIN
src/assets/recording.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 82 B

BIN
src/assets/transcribing.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 82 B

109
src/config.py Normal file
View file

@ -0,0 +1,109 @@
import json
import os
from dataclasses import dataclass
from pathlib import Path
def _parse_bool(val: str) -> bool:
return val.strip().lower() in {"1", "true", "yes", "on"}
@dataclass
class Config:
    """Daemon settings with their built-in defaults."""

    # --- trigger ---
    hotkey: str = "Cmd+m"

    # --- audio capture (ffmpeg) ---
    # Input spec in "<format>:<device>" form.
    ffmpeg_input: str = "pulse:default"
    # Explicit ffmpeg binary; empty means "look it up".
    ffmpeg_path: str = ""

    # --- speech-to-text (whisper) ---
    whisper_model: str = "base"
    whisper_lang: str = "en"
    whisper_device: str = "cpu"
    whisper_extra_args: str = ""
    whisper_timeout_sec: int = 300

    # --- recording behaviour ---
    record_timeout_sec: int = 120
    segment_sec: int = 5
    streaming: bool = False

    # --- output ---
    injection_backend: str = "clipboard"

    # --- optional AI post-processing ---
    ai_enabled: bool = False
    ai_provider: str = "ollama"
    ai_model: str = "llama3.2:3b"
    ai_temperature: float = 0.0
    ai_system_prompt_file: str = ""
    ai_base_url: str = "http://localhost:11434"
    ai_api_key: str = ""
    ai_timeout_sec: int = 20
def default_path() -> Path:
    """Return the per-user config location: ``~/.config/lel/config.json``."""
    return Path.home().joinpath(".config", "lel", "config.json")
def load(path: str | None) -> Config:
    """Build the effective configuration.

    Precedence, lowest to highest: dataclass defaults, the JSON config file
    (if it exists), then environment variables. Empty environment variables
    are ignored.

    Args:
        path: Config file to read; a falsy value selects ``default_path()``.

    Returns:
        The populated ``Config``.

    Raises:
        ValueError: If the file is not valid JSON or not a JSON object, an
            environment override cannot be converted to its field type, or
            the final hotkey/timeouts fail validation.
    """
    cfg = Config()
    p = Path(path) if path else default_path()
    if p.exists():
        data = json.loads(p.read_text(encoding="utf-8"))
        if not isinstance(data, dict):
            raise ValueError(f"config file {p} must contain a JSON object")
        # Accept declared fields only: hasattr() would also match methods and
        # dunders, letting a stray key overwrite them.
        known = type(cfg).__dataclass_fields__
        for k, v in data.items():
            if k in known:
                setattr(cfg, k, v)

    # (environment variable, Config field, converter)
    env_overrides = (
        ("WHISPER_MODEL", "whisper_model", str),
        ("WHISPER_LANG", "whisper_lang", str),
        ("WHISPER_DEVICE", "whisper_device", str),
        ("WHISPER_EXTRA_ARGS", "whisper_extra_args", str),
        ("WHISPER_FFMPEG_IN", "ffmpeg_input", str),
        ("WHISPER_STREAM", "streaming", _parse_bool),
        ("WHISPER_SEGMENT_SEC", "segment_sec", int),
        ("WHISPER_TIMEOUT_SEC", "whisper_timeout_sec", int),
        ("LEL_FFMPEG_PATH", "ffmpeg_path", str),
        ("LEL_RECORD_TIMEOUT_SEC", "record_timeout_sec", int),
        ("LEL_HOTKEY", "hotkey", str),
        ("LEL_INJECTION_BACKEND", "injection_backend", str),
        ("LEL_AI_ENABLED", "ai_enabled", _parse_bool),
        ("LEL_AI_PROVIDER", "ai_provider", str),
        ("LEL_AI_MODEL", "ai_model", str),
        ("LEL_AI_TEMPERATURE", "ai_temperature", float),
        ("LEL_AI_SYSTEM_PROMPT_FILE", "ai_system_prompt_file", str),
        ("LEL_AI_BASE_URL", "ai_base_url", str),
        ("LEL_AI_API_KEY", "ai_api_key", str),
        ("LEL_AI_TIMEOUT_SEC", "ai_timeout_sec", int),
    )
    for env_name, field_name, convert in env_overrides:
        raw = os.getenv(env_name)
        if not raw:
            continue
        try:
            value = convert(raw)
        except ValueError as exc:
            # Name the offending variable; a bare int()/float() message
            # does not say which setting was wrong.
            raise ValueError(f"invalid value for {env_name}: {raw!r}") from exc
        setattr(cfg, field_name, value)

    if not cfg.hotkey:
        raise ValueError("hotkey cannot be empty")
    if cfg.record_timeout_sec <= 0:
        raise ValueError("record_timeout_sec must be > 0")
    if cfg.whisper_timeout_sec <= 0:
        raise ValueError("whisper_timeout_sec must be > 0")
    return cfg
def redacted_dict(cfg: Config) -> dict:
    """Return a copy of the config's attributes that is safe to log.

    The ``ai_api_key`` entry is blanked; ``cfg`` itself is left untouched.
    """
    return {**vars(cfg), "ai_api_key": ""}

50
src/inject.py Normal file
View file

@ -0,0 +1,50 @@
import subprocess
import sys
def write_clipboard(text: str) -> None:
    """Put ``text`` on the X11 clipboard selection by piping it to ``xclip``.

    Raises:
        RuntimeError: If xclip exits non-zero; the message is its stderr, or
            "xclip failed" when stderr is empty.
    """
    # NOTE(review): "-quiet -loops 1" appears to keep xclip in the foreground
    # until the selection has been requested once - confirm this cannot stall
    # when nothing requests the clipboard.
    command = ["xclip", "-selection", "clipboard", "-in", "-quiet", "-loops", "1"]
    result = subprocess.run(command, input=text, text=True, capture_output=True)
    if result.returncode == 0:
        return
    raise RuntimeError(result.stderr.strip() or "xclip failed")
def paste_clipboard() -> None:
    """Send a Ctrl+V keystroke through ``xdotool``.

    Raises:
        RuntimeError: If xdotool exits non-zero; the message is its stderr,
            or "xdotool paste failed" when stderr is empty.
    """
    command = ["xdotool", "key", "--clearmodifiers", "ctrl+v"]
    result = subprocess.run(command, capture_output=True, text=True)
    if result.returncode == 0:
        return
    raise RuntimeError(result.stderr.strip() or "xdotool paste failed")
def type_text(text: str) -> None:
    """Type ``text`` as synthetic keystrokes through ``xdotool``.

    Does nothing for empty text.

    Raises:
        RuntimeError: If xdotool exits non-zero; the message is its stderr,
            or "xdotool type failed" when stderr is empty.
    """
    if not text:
        return
    proc = subprocess.run(
        # "--" ends option parsing so text that begins with "-" (e.g.
        # "-5 degrees") is typed instead of being read as an xdotool option.
        ["xdotool", "type", "--clearmodifiers", "--delay", "1", "--", text],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )
    if proc.returncode != 0:
        raise RuntimeError(proc.stderr.strip() or "xdotool type failed")
def inject(text: str, backend: str) -> None:
    """Deliver ``text`` using the selected backend.

    ``backend`` is trimmed and lower-cased first. "clipboard" (or an empty
    or ``None`` value) copies the text and pastes it; "injection" types it.

    Raises:
        ValueError: For any other backend name.
    """
    backend = (backend or "").strip().lower()
    if backend == "injection":
        type_text(text)
    elif backend in ("", "clipboard"):
        write_clipboard(text)
        paste_clipboard()
    else:
        raise ValueError(f"unknown injection backend: {backend}")

209
src/leld.py Executable file
View file

@ -0,0 +1,209 @@
#!/usr/bin/env python3
import argparse
import json
import logging
import os
import signal
import sys
import threading
import time
from pathlib import Path
from config import Config, load, redacted_dict
from recorder import start_recording, stop_recording
from stt import WhisperSTT
from aiprocess import AIConfig, build_processor
from inject import inject
from x11_hotkey import listen
from tray import run_tray
class State:
    """String constants naming the phases of the dictation pipeline."""

    # Nothing in progress; a trigger starts a recording.
    IDLE = "idle"
    # Audio is being captured.
    RECORDING = "recording"
    # Captured audio is being turned into text.
    TRANSCRIBING = "transcribing"
    # The transcript is being post-processed by the AI backend.
    PROCESSING = "processing"
    # The final text is being delivered.
    OUTPUTTING = "outputting"
class Daemon:
    """Runs the record -> transcribe -> optional AI clean-up -> inject cycle.

    ``state`` and the handles of the active recording (``proc``, ``record``,
    ``timer``) are guarded by ``lock``; the slow work happens on a daemon
    worker thread started when a recording is stopped.
    """

    def __init__(self, cfg: Config):
        """Create the speech-to-text engine and, if enabled, the AI processor."""
        self.cfg = cfg
        self.lock = threading.Lock()
        self.state = State.IDLE
        self.proc = None
        self.record = None
        self.timer = None
        self.stt = WhisperSTT(cfg.whisper_model, cfg.whisper_lang, cfg.whisper_device)
        self.ai = None
        if cfg.ai_enabled:
            self.ai = build_processor(
                AIConfig(
                    provider=cfg.ai_provider,
                    model=cfg.ai_model,
                    temperature=cfg.ai_temperature,
                    system_prompt_file=cfg.ai_system_prompt_file,
                    base_url=cfg.ai_base_url,
                    api_key=cfg.ai_api_key,
                    timeout_sec=cfg.ai_timeout_sec,
                )
            )

    def set_state(self, state: str):
        """Set the current state under the lock."""
        with self.lock:
            self.state = state

    def get_state(self):
        """Return the current state under the lock."""
        with self.lock:
            return self.state

    def toggle(self):
        """Handle a trigger: start when idle, stop when recording, else ignore."""
        with self.lock:
            if self.state == State.IDLE:
                self._start_recording_locked()
            elif not self._begin_stop_locked():
                logging.info("busy (%s), trigger ignored", self.state)

    def _start_recording_locked(self):
        """Start ffmpeg and arm the recording timeout. Caller holds the lock."""
        try:
            proc, record = start_recording(self.cfg.ffmpeg_input, self.cfg.ffmpeg_path)
        except Exception as exc:
            logging.error("record start failed: %s", exc)
            return
        self.proc = proc
        self.record = record
        self.state = State.RECORDING
        logging.info("recording started (%s)", record.wav_path)
        if self.timer:
            self.timer.cancel()
        self.timer = threading.Timer(self.cfg.record_timeout_sec, self._timeout_stop)
        self.timer.daemon = True
        self.timer.start()

    def _begin_stop_locked(self) -> bool:
        """Switch RECORDING -> TRANSCRIBING and start the worker thread.

        Caller holds the lock. Returns False, doing nothing, when no
        recording is in progress.
        """
        if self.state != State.RECORDING:
            return False
        self.state = State.TRANSCRIBING
        threading.Thread(target=self._stop_and_process, daemon=True).start()
        return True

    def _timeout_stop(self):
        """Timer callback: stop a recording that hit ``record_timeout_sec``."""
        with self.lock:
            self._begin_stop_locked()

    def _stop_and_process(self):
        """Worker: stop ffmpeg, transcribe, optionally clean up, then inject.

        Every failure is logged and ends the cycle. The daemon always returns
        to IDLE and the recording's temporary directory is always removed.
        """
        import shutil

        # Take ownership of the recording handles under the lock so a
        # concurrent trigger cannot observe them half-cleared.
        with self.lock:
            proc, record = self.proc, self.record
            self.proc = None
            self.record = None
            if self.timer:
                self.timer.cancel()
                self.timer = None
        if not proc or not record:
            self.set_state(State.IDLE)
            return
        try:
            logging.info("stopping recording (user)")
            try:
                stop_recording(proc)
            except Exception as exc:
                logging.error("record stop failed: %s", exc)
                return
            if not Path(record.wav_path).exists():
                logging.error("no audio captured")
                return
            try:
                text = self.stt.transcribe(record.wav_path)
            except Exception as exc:
                logging.error("whisper failed: %s", exc)
                return
            logging.info("transcript: %s", text)
            if not text:
                # Injecting nothing would still overwrite the clipboard and
                # send a paste keystroke.
                logging.info("empty transcript, nothing to output")
                return
            if self.ai:
                self.set_state(State.PROCESSING)
                try:
                    text = self.ai.process(text) or text
                except Exception as exc:
                    # Best effort: fall back to the raw transcript.
                    logging.error("ai process failed: %s", exc)
            logging.info("output: %s", text)
            self.set_state(State.OUTPUTTING)
            try:
                inject(text, self.cfg.injection_backend)
            except Exception as exc:
                logging.error("output failed: %s", exc)
        finally:
            # Each recording gets its own temp dir; remove it so captured
            # audio does not pile up on disk.
            shutil.rmtree(record.temp_dir, ignore_errors=True)
            self.set_state(State.IDLE)

    def stop_recording(self):
        """Stop the active recording, if any, and process it in the background."""
        with self.lock:
            self._begin_stop_locked()
def _lock_single_instance():
runtime_dir = Path(os.getenv("XDG_RUNTIME_DIR", "/tmp")) / "lel"
runtime_dir.mkdir(parents=True, exist_ok=True)
lock_path = runtime_dir / "lel.lock"
f = open(lock_path, "w")
try:
import fcntl
fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
except Exception:
raise SystemExit("another instance is running")
return f
def main():
    """Parse arguments, load config, and run the hotkey listener and tray.

    With ``--no-tray`` the hotkey listener runs on the main thread; otherwise
    it runs on a daemon thread while the tray icon owns the main thread.
    With ``--dry-run`` hotkey presses are only logged.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--config", default="", help="path to config.json")
    parser.add_argument("--no-tray", action="store_true", help="disable tray icon")
    parser.add_argument("--dry-run", action="store_true", help="log hotkey only")
    args = parser.parse_args()

    logging.basicConfig(stream=sys.stderr, level=logging.INFO, format="leld: %(asctime)s %(message)s")
    cfg = load(args.config)

    # Keep the lock file referenced until main() returns: once the file
    # object is garbage-collected it is closed and the lock is released.
    _instance_lock = _lock_single_instance()

    logging.info("ready (hotkey: %s)", cfg.hotkey)
    logging.info("config (%s):\n%s", args.config or str(Path.home() / ".config" / "lel" / "config.json"), json.dumps(redacted_dict(cfg), indent=2))

    daemon = Daemon(cfg)

    def on_quit():
        os._exit(0)

    def handle_signal(_sig, _frame):
        logging.info("signal received, shutting down")
        daemon.stop_recording()
        # Give an in-flight recording up to five seconds to finish.
        end = time.time() + 5
        while time.time() < end and daemon.get_state() != State.IDLE:
            time.sleep(0.1)
        os._exit(0)

    def on_hotkey():
        if args.dry_run:
            logging.info("hotkey pressed (dry-run)")
        else:
            daemon.toggle()

    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    if args.no_tray:
        listen(cfg.hotkey, on_hotkey)
        return

    threading.Thread(target=listen, args=(cfg.hotkey, on_hotkey), daemon=True).start()
    run_tray(daemon.get_state, on_quit)


if __name__ == "__main__":
    main()

70
src/recorder.py Normal file
View file

@ -0,0 +1,70 @@
import os
import signal
import subprocess
import tempfile
import time
from dataclasses import dataclass
from pathlib import Path
@dataclass
class RecordResult:
    """Where an in-progress or finished recording lives on disk."""

    # Path of the WAV file ffmpeg writes to.
    wav_path: str
    # Temporary directory that holds the WAV file.
    temp_dir: str
def _resolve_ffmpeg_path(explicit: str) -> str:
if explicit:
return explicit
appdir = os.getenv("APPDIR")
if appdir:
candidate = Path(appdir) / "usr" / "bin" / "ffmpeg"
if candidate.exists():
return str(candidate)
return "ffmpeg"
def _ffmpeg_input_args(spec: str) -> list[str]:
if not spec:
spec = "pulse:default"
kind = spec
name = "default"
if ":" in spec:
kind, name = spec.split(":", 1)
return ["-f", kind, "-i", name]
def start_recording(ffmpeg_input: str, ffmpeg_path: str) -> tuple[subprocess.Popen, RecordResult]:
    """Start ffmpeg capturing mono 16 kHz 16-bit PCM into a fresh temp dir.

    Args:
        ffmpeg_input: Input spec in ``"<format>:<device>"`` form.
        ffmpeg_path: Explicit ffmpeg binary, or empty to resolve one.

    Returns:
        The running process and the paths of the recording.

    Raises:
        OSError: If ffmpeg cannot be launched; the temp dir is removed first.
    """
    import shutil

    tmpdir = tempfile.mkdtemp(prefix="lel-")
    wav = str(Path(tmpdir) / "mic.wav")
    args = ["-hide_banner", "-loglevel", "error"]
    args += _ffmpeg_input_args(ffmpeg_input)
    args += ["-ac", "1", "-ar", "16000", "-c:a", "pcm_s16le", wav]
    try:
        proc = subprocess.Popen(
            [_resolve_ffmpeg_path(ffmpeg_path), *args],
            # Same effect as preexec_fn=os.setsid (own session and process
            # group), but preexec_fn is documented as unsafe when the parent
            # has threads.
            start_new_session=True,
        )
    except Exception:
        # Don't leave an orphaned temp dir behind when launch fails.
        shutil.rmtree(tmpdir, ignore_errors=True)
        raise
    return proc, RecordResult(wav_path=wav, temp_dir=tmpdir)
def stop_recording(proc: subprocess.Popen, timeout_sec: float = 5.0) -> None:
    """Stop the ffmpeg process group and wait for it to exit.

    SIGINT is sent first so ffmpeg can finish the file. If the process is
    still alive after ``timeout_sec`` the group is killed with SIGKILL and
    reaped; that forced stop is not reported as an error.

    Args:
        proc: Process started in its own process group.
        timeout_sec: How long to wait for a graceful exit.

    Raises:
        RuntimeError: If ffmpeg exits on its own with a status other than
            0 or 255.
    """
    if proc.poll() is None:
        try:
            os.killpg(proc.pid, signal.SIGINT)
        except ProcessLookupError:
            return
    try:
        proc.wait(timeout=timeout_sec)
    except subprocess.TimeoutExpired:
        try:
            os.killpg(proc.pid, signal.SIGKILL)
        except ProcessLookupError:
            proc.poll()
        else:
            # Reap the child so it does not linger as a zombie.
            proc.wait()
        return
    # ffmpeg returns 255 on SIGINT; treat as success
    if proc.returncode not in (0, 255):
        raise RuntimeError(f"ffmpeg exited with status {proc.returncode}")

25
src/stt.py Normal file
View file

@ -0,0 +1,25 @@
import os
import whisper
def _force_cpu():
os.environ.setdefault("CUDA_VISIBLE_DEVICES", "")
class WhisperSTT:
    """Speech-to-text wrapper that loads the Whisper model on first use."""

    def __init__(self, model: str, language: str | None = None, device: str = "cpu"):
        """Store the settings; the model itself is not loaded yet.

        Args:
            model: Whisper model name.
            language: Language code; ``None`` or a blank string is stored
                as ``None``.
            device: Device name; lower-cased, defaults to "cpu" when empty.
        """
        self.model_name = model
        # A blank language from config is not a real language code, so
        # store None rather than handing "" to Whisper.
        self.language = (language or "").strip() or None
        self.device = (device or "cpu").lower()
        self._model = None

    def _load(self):
        """Load the model once; on CPU, hide CUDA devices beforehand."""
        if self._model is None:
            if self.device == "cpu":
                _force_cpu()
            self._model = whisper.load_model(self.model_name, device=self.device)

    def transcribe(self, wav_path: str) -> str:
        """Return the stripped transcript of ``wav_path``, or "" if none."""
        self._load()
        result = self._model.transcribe(
            wav_path,
            language=self.language,
            # Half precision is not available on CPU; asking for it only
            # triggers a warning before Whisper falls back to FP32.
            fp16=self.device != "cpu",
        )
        return (result.get("text") or "").strip()

16
src/system_prompt.txt Normal file
View file

@ -0,0 +1,16 @@
You are a deterministic text transcription cleaning engine.
You transform speech transcripts into clean written text while keeping its meaning.
Follow these rules strictly:
1. Remove filler words (um, uh, like, okay so).
2. Resolve self-corrections by keeping ONLY the final version.
Examples:
- "schedule that for 5 PM, I mean 4 PM" -> "schedule that for 4 PM"
- "let's ask Bob, I mean Janice, let's ask Janice" -> "let's ask Janice"
3. Fix grammar, capitalization, and punctuation.
4. Do NOT add new content.
5. Do NOT remove real content.
6. Do NOT rewrite stylistically.
7. Preserve meaning exactly.
Return ONLY the cleaned text. No explanations.

52
src/tray.py Normal file
View file

@ -0,0 +1,52 @@
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from threading import Thread
import pystray
from PIL import Image
@dataclass
class TrayIcons:
    """The tray images, one per displayed state."""

    # Shown when nothing is in progress.
    idle: Image.Image
    # Shown while audio is being captured.
    recording: Image.Image
    # Shown while audio is being transcribed.
    transcribing: Image.Image
    # Shown while the AI step is running.
    processing: Image.Image
def load_icons() -> TrayIcons:
    """Open the four state icons from the ``assets`` directory next to this module."""
    assets = Path(__file__).parent / "assets"

    def open_icon(name: str):
        return Image.open(assets / f"{name}.png")

    # Keyword arguments are evaluated left to right, so the files are opened
    # in the order idle, recording, transcribing, processing.
    return TrayIcons(
        idle=open_icon("idle"),
        recording=open_icon("recording"),
        transcribing=open_icon("transcribing"),
        processing=open_icon("processing"),
    )
def run_tray(state_getter, on_quit):
    """Show the tray icon and keep it in sync with the daemon state.

    Blocks in ``icon.run()``. A daemon thread polls ``state_getter`` and
    updates the icon image and title when the state changes.

    Args:
        state_getter: Callable returning the current state string.
        on_quit: Callable invoked when the "Quit" menu item is chosen.
    """
    import time

    icons = load_icons()
    icon = pystray.Icon("lel", icons.idle, "lel")

    appearance = {
        "recording": (icons.recording, "Recording"),
        "transcribing": (icons.transcribing, "Transcribing"),
        "processing": (icons.processing, "AI Processing"),
    }
    # Any other state is shown as idle.
    fallback = (icons.idle, "Idle")

    def update():
        shown = None
        while True:
            state = state_getter()
            # Redraw only on transitions, and sleep between polls, so this
            # thread does not spin a CPU core.
            if state != shown:
                image, title = appearance.get(state, fallback)
                icon.icon = image
                icon.title = title
                icon.update_menu()
                shown = state
            time.sleep(0.1)

    icon.menu = pystray.Menu(pystray.MenuItem("Quit", lambda: on_quit()))
    Thread(target=update, daemon=True).start()
    icon.run()

67
src/x11_hotkey.py Normal file
View file

@ -0,0 +1,67 @@
from Xlib import X, display
from Xlib import XK
# Lower-case modifier names accepted in a hotkey string, mapped to X11
# modifier masks. Several spellings may share one mask.
MOD_MAP = {
    alias: mask
    for mask, aliases in (
        (X.ShiftMask, ("shift",)),
        (X.ControlMask, ("ctrl", "control")),
        (X.Mod1Mask, ("alt", "mod1")),
        (X.Mod4Mask, ("super", "mod4", "cmd", "command")),
    )
    for alias in aliases
}
def parse_hotkey(hotkey: str):
    """Parse a hotkey such as ``"Ctrl+Shift+m"`` into ``(mods, keysym)``.

    Tokens are separated by "+"; blank tokens are dropped. Tokens found in
    ``MOD_MAP`` (case-insensitive) are OR-ed into the modifier mask. Exactly
    one remaining token must name the key.

    Raises:
        ValueError: If there is no key token, more than one non-modifier
            token, or the key name cannot be resolved to a keysym.
    """
    parts = [p.strip() for p in hotkey.split("+") if p.strip()]
    mods = 0
    keys = []
    for p in parts:
        mask = MOD_MAP.get(p.lower())
        if mask is None:
            keys.append(p)
        else:
            mods |= mask
    if not keys:
        raise ValueError("hotkey missing key")
    if len(keys) > 1:
        # A mistyped modifier ("Ctlr+m") lands here. Dropping it silently
        # would grab the bare key, so reject the hotkey.
        raise ValueError(f"hotkey has more than one key: {', '.join(keys)}")
    key_part = keys[0]
    keysym = XK.string_to_keysym(key_part)
    if keysym == 0 and len(key_part) == 1:
        # Fall back to the character's code point for single characters.
        keysym = ord(key_part)
    if keysym == 0:
        raise ValueError(f"unsupported key: {key_part}")
    return mods, keysym
def grab_hotkey(disp, root, mods, keysym):
    """Grab the key for ``keysym`` with ``mods`` on ``root``; return its keycode.

    The grab is registered four times so it also fires when the
    ``X.LockMask`` and/or ``X.Mod2Mask`` bits are set.

    Raises:
        ValueError: If ``keysym`` has no keycode on this display.
    """
    keycode = disp.keysym_to_keycode(keysym)
    if keycode == 0:
        # 0 means "not bound to any key", and as a grab argument it is the
        # AnyKey wildcard - grabbing it would capture every key.
        raise ValueError(f"no keycode for keysym {keysym}")
    # ignore CapsLock/NumLock
    for extra in (0, X.LockMask, X.Mod2Mask, X.LockMask | X.Mod2Mask):
        root.grab_key(keycode, mods | extra, True, X.GrabModeAsync, X.GrabModeAsync)
    disp.sync()
    return keycode
def listen(hotkey: str, on_trigger):
    """Grab ``hotkey`` on the root window and call ``on_trigger`` on each press.

    Loops forever reading X events. When the loop is left by an exception the
    key is ungrabbed on a best-effort basis.
    """
    disp = display.Display()
    root = disp.screen().root
    mods, keysym = parse_hotkey(hotkey)
    keycode = grab_hotkey(disp, root, mods, keysym)
    # Lock bits that must not affect matching.
    ignored = X.LockMask | X.Mod2Mask
    try:
        while True:
            ev = disp.next_event()
            if ev.type != X.KeyPress or ev.detail != keycode:
                continue
            if (ev.state & ~ignored) == mods:
                on_trigger()
    finally:
        try:
            root.ungrab_key(keycode, X.AnyModifier)
            disp.sync()
        except Exception:
            # Cleanup only; the display may already be unusable.
            pass