Add settings history and quick AI chain

This commit is contained in:
Thales Maciel 2026-02-09 13:45:07 -03:00
parent 328dcec458
commit a0c3b02ab1
13 changed files with 1627 additions and 23 deletions

View file

@ -7,6 +7,7 @@ Python X11 transcription daemon that records audio, runs Whisper, logs the trans
- X11 (not Wayland)
- `ffmpeg`
- `faster-whisper`
- `pactl` (PulseAudio utilities for mic selection)
- Tray icon deps: `gtk3`
- i3 window manager (focus metadata via i3 IPC)
- Python deps: `pillow`, `python-xlib`, `faster-whisper`, `PyGObject`, `i3ipc`
@ -25,6 +26,12 @@ Run:
python3 src/leld.py --config ~/.config/lel/config.json
```
Open settings:
```bash
python3 src/leld.py --settings --config ~/.config/lel/config.json
```
## Config
Create `~/.config/lel/config.json`:
@ -32,13 +39,21 @@ Create `~/.config/lel/config.json`:
```json
{
"hotkey": "Cmd+m",
"edit_hotkey": "Cmd+n",
"ffmpeg_input": "pulse:default",
"ffmpeg_path": "",
"whisper_model": "base",
"whisper_lang": "en",
"whisper_device": "cpu",
"record_timeout_sec": 120,
"edit_record_timeout_sec": 120,
"injection_backend": "clipboard",
"edit_injection_backend": "clipboard",
"languages": {
"en": { "code": "en", "hotkey": "Cmd+m", "label": "English" },
"ptBR": { "code": "pt-BR", "hotkey": "Cmd+b", "label": "Português (Brasil)" }
},
"edit_language_detection": { "enabled": true, "provider": "langdetect", "fallback_code": "en" },
"context_capture": {
"provider": "i3ipc",
@ -63,7 +78,11 @@ Create `~/.config/lel/config.json`:
"ai_system_prompt_file": "",
"ai_base_url": "http://localhost:11434/v1/chat/completions",
"ai_api_key": "",
"ai_timeout_sec": 20
"ai_timeout_sec": 20,
"edit_ai_enabled": true,
"edit_ai_temperature": 0.0,
"edit_ai_system_prompt_file": "",
"edit_window": { "width": 800, "height": 400 }
}
```
@ -72,10 +91,13 @@ Env overrides:
- `WHISPER_MODEL`, `WHISPER_LANG`, `WHISPER_DEVICE`
- `WHISPER_FFMPEG_IN`
- `LEL_RECORD_TIMEOUT_SEC`, `LEL_HOTKEY`, `LEL_INJECTION_BACKEND`
- `LEL_EDIT_RECORD_TIMEOUT_SEC`, `LEL_EDIT_HOTKEY`, `LEL_EDIT_INJECTION_BACKEND`
- `LEL_FFMPEG_PATH`
- `LEL_AI_ENABLED`, `LEL_AI_MODEL`, `LEL_AI_TEMPERATURE`, `LEL_AI_SYSTEM_PROMPT_FILE`
- `LEL_AI_BASE_URL`, `LEL_AI_API_KEY`, `LEL_AI_TIMEOUT_SEC`
- `LEL_EDIT_AI_ENABLED`, `LEL_EDIT_AI_TEMPERATURE`, `LEL_EDIT_AI_SYSTEM_PROMPT_FILE`
- `LEL_CONTEXT_PROVIDER`, `LEL_CONTEXT_ON_FOCUS_CHANGE`
- `LEL_LANGUAGES_JSON`, `LEL_EDIT_LANG_FALLBACK`
## systemd user service
@ -92,6 +114,14 @@ systemctl --user enable --now lel
- Press the hotkey once to start recording.
- Press it again to stop and transcribe.
- The transcript is logged to stderr.
- Press the edit hotkey to open the edit window; click Apply to edit using spoken instructions.
- Default language hotkeys: English `Cmd+m`, Portuguese (Brazil) `Cmd+b`.
Edit workflow notes:
- Uses the X11 primary selection (currently selected text).
- Opens a floating GTK window with the selected text.
- Records your spoken edit instruction until you click Apply.
Injection backends:

View file

@ -3,3 +3,4 @@ pillow
python-xlib
PyGObject
i3ipc
langdetect

View file

@ -23,6 +23,8 @@ class AIConfig:
base_url: str
api_key: str
timeout_sec: int
language_hint: str | None = None
wrap_transcript: bool = True
class GenericAPIProcessor:
@ -31,11 +33,18 @@ class GenericAPIProcessor:
self.system = load_system_prompt(cfg.system_prompt_file)
def process(self, text: str) -> str:
language = self.cfg.language_hint or ""
if self.cfg.wrap_transcript:
user_content = f"<transcript>{text}</transcript>"
else:
user_content = text
if language:
user_content = f"<language>{language}</language>\n{user_content}"
payload = {
"model": self.cfg.model,
"messages": [
{"role": "system", "content": self.system},
{"role": "user", "content": f"<transcript>{text}</transcript>"},
{"role": "user", "content": user_content},
],
"temperature": self.cfg.temperature,
}
@ -70,6 +79,34 @@ def build_processor(cfg: AIConfig) -> GenericAPIProcessor:
return GenericAPIProcessor(cfg)
def list_models(base_url: str, api_key: str = "", timeout_sec: int = 10) -> list[str]:
    """Return the model ids advertised by an OpenAI-compatible endpoint.

    Args:
        base_url: Any URL of the API (for example the chat-completions URL);
            the ``/v1/models`` listing URL is derived from it.
        api_key: Optional bearer token sent in the ``Authorization`` header.
        timeout_sec: Timeout passed to ``urlopen``.

    Returns:
        The non-empty ``id`` values found under the response's ``data`` list,
        or an empty list when ``base_url`` is empty or anything goes wrong.
    """
    if not base_url:
        return []
    req = urllib.request.Request(_models_url(base_url), method="GET")
    if api_key:
        req.add_header("Authorization", f"Bearer {api_key}")
    try:
        with urllib.request.urlopen(req, timeout=timeout_sec) as resp:
            body = resp.read()
        data = json.loads(body.decode("utf-8"))
        return [item["id"] for item in data.get("data", []) if item.get("id")]
    except Exception:
        # Deliberately best-effort: an unreachable or malformed endpoint is
        # reported to the caller as "no models" rather than as an error.
        return []


def _models_url(base_url: str) -> str:
    """Derive the ``.../v1/models`` URL from any URL of the same API.

    Handles a URL that contains ``/v1/`` (everything from there on is
    replaced), one that ends in ``/v1`` with or without a trailing slash, and
    a bare server root (``/v1/models`` is appended).
    """
    trimmed = base_url.rstrip("/")
    root, sep, _rest = trimmed.partition("/v1/")
    if sep:
        return root.rstrip("/") + "/v1/models"
    if trimmed.endswith("/v1"):
        # Without this case ".../v1" would become ".../v1/v1/models".
        return trimmed + "/models"
    return trimmed + "/v1/models"
def _read_text(arg_text: str) -> str:
if arg_text:
return arg_text

View file

@ -11,6 +11,7 @@ def _parse_bool(val: str) -> bool:
@dataclass
class Config:
hotkey: str = "Cmd+m"
edit_hotkey: str = "Cmd+n"
ffmpeg_input: str = "pulse:default"
ffmpeg_path: str = ""
@ -19,8 +20,10 @@ class Config:
whisper_device: str = "cpu"
record_timeout_sec: int = 120
edit_record_timeout_sec: int = 120
injection_backend: str = "clipboard"
edit_injection_backend: str = "clipboard"
ai_enabled: bool = False
ai_model: str = "llama3.2:3b"
@ -29,10 +32,22 @@ class Config:
ai_base_url: str = "http://localhost:11434/v1/chat/completions"
ai_api_key: str = ""
ai_timeout_sec: int = 20
edit_ai_enabled: bool = True
edit_ai_temperature: float = 0.0
edit_ai_system_prompt_file: str = ""
edit_window: dict = field(default_factory=lambda: {"width": 800, "height": 400})
context_capture: dict = field(default_factory=lambda: {"provider": "i3ipc", "on_focus_change": "abort"})
context_rules: list[dict] = field(default_factory=list)
languages: dict = field(
default_factory=lambda: {
"en": {"code": "en", "hotkey": "Cmd+m", "label": "English"},
"ptBR": {"code": "pt-BR", "hotkey": "Cmd+b", "label": "Português (Brasil)"},
}
)
edit_language_detection: dict = field(default_factory=lambda: {"enabled": True, "provider": "langdetect", "fallback_code": "en"})
def default_path() -> Path:
return Path.home() / ".config" / "lel" / "config.json"
@ -66,10 +81,16 @@ def load(path: str | None) -> Config:
cfg.ffmpeg_path = os.environ["LEL_FFMPEG_PATH"]
if os.getenv("LEL_RECORD_TIMEOUT_SEC"):
cfg.record_timeout_sec = int(os.environ["LEL_RECORD_TIMEOUT_SEC"])
if os.getenv("LEL_EDIT_RECORD_TIMEOUT_SEC"):
cfg.edit_record_timeout_sec = int(os.environ["LEL_EDIT_RECORD_TIMEOUT_SEC"])
if os.getenv("LEL_HOTKEY"):
cfg.hotkey = os.environ["LEL_HOTKEY"]
if os.getenv("LEL_EDIT_HOTKEY"):
cfg.edit_hotkey = os.environ["LEL_EDIT_HOTKEY"]
if os.getenv("LEL_INJECTION_BACKEND"):
cfg.injection_backend = os.environ["LEL_INJECTION_BACKEND"]
if os.getenv("LEL_EDIT_INJECTION_BACKEND"):
cfg.edit_injection_backend = os.environ["LEL_EDIT_INJECTION_BACKEND"]
if os.getenv("LEL_AI_ENABLED"):
cfg.ai_enabled = _parse_bool(os.environ["LEL_AI_ENABLED"])
@ -85,22 +106,24 @@ def load(path: str | None) -> Config:
cfg.ai_api_key = os.environ["LEL_AI_API_KEY"]
if os.getenv("LEL_AI_TIMEOUT_SEC"):
cfg.ai_timeout_sec = int(os.environ["LEL_AI_TIMEOUT_SEC"])
if os.getenv("LEL_EDIT_AI_ENABLED"):
cfg.edit_ai_enabled = _parse_bool(os.environ["LEL_EDIT_AI_ENABLED"])
if os.getenv("LEL_EDIT_AI_TEMPERATURE"):
cfg.edit_ai_temperature = float(os.environ["LEL_EDIT_AI_TEMPERATURE"])
if os.getenv("LEL_EDIT_AI_SYSTEM_PROMPT_FILE"):
cfg.edit_ai_system_prompt_file = os.environ["LEL_EDIT_AI_SYSTEM_PROMPT_FILE"]
if os.getenv("LEL_LANGUAGES_JSON"):
cfg.languages = json.loads(os.environ["LEL_LANGUAGES_JSON"])
if os.getenv("LEL_EDIT_LANG_FALLBACK"):
cfg.edit_language_detection["fallback_code"] = os.environ["LEL_EDIT_LANG_FALLBACK"]
if os.getenv("LEL_CONTEXT_PROVIDER"):
cfg.context_capture["provider"] = os.environ["LEL_CONTEXT_PROVIDER"]
if os.getenv("LEL_CONTEXT_ON_FOCUS_CHANGE"):
cfg.context_capture["on_focus_change"] = os.environ["LEL_CONTEXT_ON_FOCUS_CHANGE"]
if not cfg.hotkey:
raise ValueError("hotkey cannot be empty")
if cfg.record_timeout_sec <= 0:
raise ValueError("record_timeout_sec must be > 0")
if cfg.context_capture.get("provider") not in {"i3ipc"}:
raise ValueError("context_capture.provider must be i3ipc")
if cfg.context_capture.get("on_focus_change") not in {"abort"}:
raise ValueError("context_capture.on_focus_change must be abort")
if not isinstance(cfg.context_rules, list):
cfg.context_rules = []
validate(cfg)
return cfg
@ -108,3 +131,39 @@ def redacted_dict(cfg: Config) -> dict:
d = cfg.__dict__.copy()
d["ai_api_key"] = ""
return d
def validate(cfg: Config) -> None:
    """Check a loaded configuration and normalise malformed optional sections.

    Non-list ``context_rules`` and non-dict ``edit_window`` /
    ``edit_language_detection`` values are replaced with their defaults.

    Raises:
        ValueError: if a hotkey is empty, a timeout is not positive, the
            context-capture settings are unsupported, ``languages`` is empty or
            malformed, two bindings share a hotkey (including a language hotkey
            equal to ``edit_hotkey``), or the detection provider is unsupported.
    """
    if not cfg.hotkey:
        raise ValueError("hotkey cannot be empty")
    if not cfg.edit_hotkey:
        raise ValueError("edit_hotkey cannot be empty")
    if cfg.record_timeout_sec <= 0:
        raise ValueError("record_timeout_sec must be > 0")
    if cfg.edit_record_timeout_sec <= 0:
        raise ValueError("edit_record_timeout_sec must be > 0")
    if cfg.context_capture.get("provider") not in {"i3ipc"}:
        raise ValueError("context_capture.provider must be i3ipc")
    if cfg.context_capture.get("on_focus_change") not in {"abort"}:
        raise ValueError("context_capture.on_focus_change must be abort")
    if not isinstance(cfg.context_rules, list):
        cfg.context_rules = []
    if not isinstance(cfg.edit_window, dict):
        cfg.edit_window = {"width": 800, "height": 400}
    if not isinstance(cfg.languages, dict) or not cfg.languages:
        raise ValueError("languages must be a non-empty map")

    seen_hotkeys = set()
    for name, info in cfg.languages.items():
        if not isinstance(info, dict):
            raise ValueError(f"languages[{name}] must be an object")
        code = info.get("code")
        hotkey = info.get("hotkey")
        if not code or not hotkey:
            raise ValueError(f"languages[{name}] must include code and hotkey")
        # The edit hotkey is bound separately from the language hotkeys, so a
        # language binding must not reuse it.
        if hotkey == cfg.edit_hotkey:
            raise ValueError(f"languages[{name}] hotkey conflicts with edit_hotkey: {hotkey}")
        if hotkey in seen_hotkeys:
            raise ValueError(f"duplicate hotkey in languages: {hotkey}")
        seen_hotkeys.add(hotkey)

    if not isinstance(cfg.edit_language_detection, dict):
        cfg.edit_language_detection = {"enabled": True, "provider": "langdetect", "fallback_code": "en"}
    if cfg.edit_language_detection.get("provider") not in {"langdetect"}:
        raise ValueError("edit_language_detection.provider must be langdetect")

101
src/edit_window.py Normal file
View file

@ -0,0 +1,101 @@
from __future__ import annotations
import threading
from dataclasses import dataclass
import gi
gi.require_version("Gtk", "3.0")
gi.require_version("Gdk", "3.0")
from gi.repository import Gdk, GLib, Gtk
@dataclass
class EditWindowConfig:
    """Size settings for the edit window; both values are passed to ``set_default_size``."""

    # Default window width.
    width: int = 800
    # Default window height.
    height: int = 400
class EditWindow:
    """Always-on-top GTK dialog showing editable text with an Apply button.

    ``on_apply(text)`` is called when Apply is clicked. ``on_copy_close(text)``
    is called with the current text on Ctrl+C and with an empty string when the
    window manager asks to close the window.
    """

    def __init__(self, text: str, cfg: EditWindowConfig, on_apply, on_copy_close):
        self.on_apply = on_apply
        self.on_copy_close = on_copy_close

        win = Gtk.Window(title="lel edit")
        self.window = win
        win.set_default_size(cfg.width, cfg.height)
        win.set_keep_above(True)
        win.set_position(Gtk.WindowPosition.CENTER)
        win.set_type_hint(Gdk.WindowTypeHint.DIALOG)
        win.connect("delete-event", self._on_close)

        # Status line at the top of the window.
        status_label = Gtk.Label(label="Listening...")
        status_label.set_xalign(0.0)
        self.status = status_label

        # Scrollable, word-wrapped editor pre-filled with the given text.
        scroller = Gtk.ScrolledWindow()
        scroller.set_hexpand(True)
        scroller.set_vexpand(True)
        editor = Gtk.TextView()
        self.textview = editor
        editor.set_wrap_mode(Gtk.WrapMode.WORD_CHAR)
        editor.get_buffer().set_text(text)
        scroller.add(editor)

        apply_button = Gtk.Button(label="Apply")
        apply_button.connect("clicked", self._on_apply)
        buttons = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=8)
        buttons.pack_end(apply_button, False, False, 0)

        layout = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=8)
        layout.set_border_width(12)
        layout.pack_start(status_label, False, False, 0)
        layout.pack_start(scroller, True, True, 0)
        layout.pack_start(buttons, False, False, 0)

        win.add(layout)
        win.show_all()
        editor.grab_focus()

        # Ctrl+C is bound window-wide to the copy-and-close callback.
        accel_group = Gtk.AccelGroup()
        win.add_accel_group(accel_group)
        accel_key, accel_mods = Gtk.accelerator_parse("<Ctrl>c")
        accel_group.connect(accel_key, accel_mods, Gtk.AccelFlags.VISIBLE, self._on_copy)

    def _on_apply(self, *_args):
        """Forward the current text to ``on_apply``."""
        current = self.get_text()
        self.on_apply(current)

    def _on_copy(self, *_args):
        """Forward the current text to ``on_copy_close``; returns True to the accelerator."""
        current = self.get_text()
        self.on_copy_close(current)
        return True

    def _on_close(self, *_args):
        """Report a close request with empty text; returns True to the signal."""
        self.on_copy_close("")
        return True

    def get_text(self) -> str:
        """Return the full contents of the text buffer."""
        text_buffer = self.textview.get_buffer()
        first, last = text_buffer.get_bounds()
        return text_buffer.get_text(first, last, True)

    def set_status(self, text: str) -> None:
        """Replace the status line text."""
        self.status.set_text(text)

    def close(self) -> None:
        """Destroy the underlying GTK window."""
        self.window.destroy()
def open_edit_window(text: str, cfg: EditWindowConfig, on_apply, on_copy_close) -> EditWindow:
    """Create an ``EditWindow`` through a GLib idle callback and return it.

    The calling thread waits up to two seconds for the callback to run.

    Raises:
        RuntimeError: if the window was not created within the timeout.
    """
    created: dict[str, EditWindow] = {}
    done = threading.Event()

    def _build_window():
        created["win"] = EditWindow(text, cfg, on_apply, on_copy_close)
        done.set()
        # A False return keeps the idle callback from being run again.
        return False

    GLib.idle_add(_build_window)
    if done.wait(timeout=2.0):
        return created["win"]
    raise RuntimeError("GTK main loop not running; cannot open edit window")

141
src/history.py Normal file
View file

@ -0,0 +1,141 @@
from __future__ import annotations
import json
import sqlite3
import time
from dataclasses import asdict
from pathlib import Path
from config import redacted_dict
def _default_db_path() -> Path:
return Path.home() / ".local" / "share" / "lel" / "history.db"
class HistoryStore:
def __init__(self, path: Path | None = None):
self.path = path or _default_db_path()
self.path.parent.mkdir(parents=True, exist_ok=True)
self.conn = sqlite3.connect(str(self.path), check_same_thread=False)
self._init_db()
def _init_db(self):
cur = self.conn.cursor()
cur.execute(
"""
CREATE TABLE IF NOT EXISTS runs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
created_at REAL NOT NULL,
phase TEXT NOT NULL,
status TEXT NOT NULL,
config_json TEXT,
context_json TEXT
)
"""
)
cur.execute(
"""
CREATE TABLE IF NOT EXISTS artifacts (
run_id INTEGER NOT NULL,
kind TEXT NOT NULL,
data_json TEXT,
file_path TEXT,
created_at REAL NOT NULL,
FOREIGN KEY(run_id) REFERENCES runs(id)
)
"""
)
self.conn.commit()
def add_run(self, phase: str, status: str, config, context: dict | None = None) -> int:
cur = self.conn.cursor()
cur.execute(
"INSERT INTO runs (created_at, phase, status, config_json, context_json) VALUES (?, ?, ?, ?, ?)",
(
time.time(),
phase,
status,
json.dumps(redacted_dict(config)) if config else None,
json.dumps(context) if context else None,
),
)
self.conn.commit()
return int(cur.lastrowid)
def add_artifact(self, run_id: int, kind: str, data: dict | None = None, file_path: str | None = None):
cur = self.conn.cursor()
cur.execute(
"INSERT INTO artifacts (run_id, kind, data_json, file_path, created_at) VALUES (?, ?, ?, ?, ?)",
(
run_id,
kind,
json.dumps(data) if data is not None else None,
file_path,
time.time(),
),
)
self.conn.commit()
def list_runs(self, phase: str | None = None, limit: int = 200) -> list[dict]:
cur = self.conn.cursor()
if phase:
cur.execute(
"SELECT id, created_at, phase, status, config_json, context_json FROM runs WHERE phase = ? ORDER BY id DESC LIMIT ?",
(phase, limit),
)
else:
cur.execute(
"SELECT id, created_at, phase, status, config_json, context_json FROM runs ORDER BY id DESC LIMIT ?",
(limit,),
)
rows = []
for row in cur.fetchall():
rows.append(
{
"id": row[0],
"created_at": row[1],
"phase": row[2],
"status": row[3],
"config": json.loads(row[4]) if row[4] else None,
"context": json.loads(row[5]) if row[5] else None,
}
)
return rows
def list_artifacts(self, run_id: int) -> list[dict]:
cur = self.conn.cursor()
cur.execute(
"SELECT kind, data_json, file_path, created_at FROM artifacts WHERE run_id = ? ORDER BY created_at ASC",
(run_id,),
)
out = []
for row in cur.fetchall():
out.append(
{
"kind": row[0],
"data": json.loads(row[1]) if row[1] else None,
"file_path": row[2],
"created_at": row[3],
}
)
return out
def prune(self, limit_per_phase: int = 1000):
cur = self.conn.cursor()
cur.execute("SELECT DISTINCT phase FROM runs")
phases = [r[0] for r in cur.fetchall()]
for phase in phases:
cur.execute("SELECT id FROM runs WHERE phase = ? ORDER BY id DESC LIMIT ?", (phase, limit_per_phase))
keep_ids = [r[0] for r in cur.fetchall()]
if not keep_ids:
continue
cur.execute(
"DELETE FROM runs WHERE phase = ? AND id NOT IN (%s)" % ",".join("?" * len(keep_ids)),
(phase, *keep_ids),
)
cur.execute(
"DELETE FROM artifacts WHERE run_id NOT IN (%s)" % ",".join("?" * len(keep_ids)),
(*keep_ids,),
)
self.conn.commit()

24
src/language.py Normal file
View file

@ -0,0 +1,24 @@
from __future__ import annotations
from langdetect import DetectorFactory, detect
DetectorFactory.seed = 0
def detect_language(text: str, fallback: str = "en") -> str:
    """Guess the language code of ``text``.

    Returns ``fallback`` for empty or whitespace-only input, when detection
    raises, or when the normalised result is empty.
    """
    sample = (text or "").strip()
    if sample:
        try:
            detected = detect(sample)
        except Exception:
            return fallback
        return _normalize(detected) or fallback
    return fallback
def _normalize(code: str) -> str:
if not code:
return ""
if code.lower() == "pt":
return "pt-BR"
return code

View file

@ -14,9 +14,14 @@ from recorder import start_recording, stop_recording
from stt import FasterWhisperSTT, STTConfig
from aiprocess import AIConfig, build_processor
from context import ContextRule, I3Provider, match_rule
from inject import inject
from edit_window import EditWindowConfig, open_edit_window
from inject import inject, write_clipboard
from history import HistoryStore
from language import detect_language
from selection import read_primary_selection
from x11_hotkey import listen
from tray import run_tray
from settings_window import open_settings_window
class State:
@ -24,18 +29,28 @@ class State:
RECORDING = "recording"
TRANSCRIBING = "transcribing"
PROCESSING = "processing"
EDITING = "editing"
EDIT_PROCESSING = "edit_processing"
OUTPUTTING = "outputting"
class Daemon:
def __init__(self, cfg: Config):
self.cfg = cfg
self.history = HistoryStore()
self.history.prune(1000)
self.lock = threading.Lock()
self.state = State.IDLE
self.proc = None
self.record = None
self.timer = None
self.active_language = cfg.whisper_lang
self.context = None
self.edit_proc = None
self.edit_record = None
self.edit_timer = None
self.edit_context = None
self.edit_window = None
self.context_provider = None
if cfg.context_capture.get("provider") == "i3ipc":
self.context_provider = I3Provider()
@ -45,7 +60,7 @@ class Daemon:
self.stt = FasterWhisperSTT(
STTConfig(
model=cfg.whisper_model,
language=cfg.whisper_lang,
language=None,
device=cfg.whisper_device,
vad_filter=True,
)
@ -54,15 +69,19 @@ class Daemon:
def set_state(self, state: str):
with self.lock:
prev = self.state
self.state = state
if prev != state:
logging.info("state: %s -> %s", prev, state)
def get_state(self):
with self.lock:
return self.state
def toggle(self):
def toggle(self, language_code: str | None = None):
with self.lock:
if self.state == State.IDLE:
self.active_language = language_code or self.cfg.whisper_lang
self._start_recording_locked()
return
if self.state == State.RECORDING:
@ -71,6 +90,14 @@ class Daemon:
return
logging.info("busy (%s), trigger ignored", self.state)
def edit_trigger(self):
    """Begin an edit session in a background thread when the daemon is idle.

    The trigger is logged and ignored in any other state.
    """
    with self.lock:
        current = self.state
        if current == State.IDLE:
            self.state = State.EDITING
    if current != State.IDLE:
        logging.info("busy (%s), edit trigger ignored", current)
        return
    worker = threading.Thread(target=self._start_edit_flow, daemon=True)
    worker.start()
def _start_recording_locked(self):
try:
proc, record = start_recording(self.cfg.ffmpeg_input, self.cfg.ffmpeg_path)
@ -83,10 +110,23 @@ class Daemon:
except Exception as exc:
logging.error("context capture failed: %s", exc)
self.context = None
if self.context:
logging.info(
"context: id=%s app_id=%s class=%s instance=%s title=%s",
self.context.window_id,
self.context.app_id,
self.context.klass,
self.context.instance,
self.context.title,
)
else:
logging.info("context: none")
self.proc = proc
self.record = record
self.state = State.RECORDING
logging.info("recording started (%s)", record.wav_path)
run_id = self.history.add_run("record", "started", self.cfg, self._context_json(self.context))
self.history.add_artifact(run_id, "audio", {"path": record.wav_path}, record.wav_path)
if self.timer:
self.timer.cancel()
self.timer = threading.Timer(self.cfg.record_timeout_sec, self._timeout_stop)
@ -128,13 +168,17 @@ class Daemon:
try:
self.set_state(State.TRANSCRIBING)
text = self.stt.transcribe(record.wav_path)
logging.info("transcribing started")
text = self.stt.transcribe(record.wav_path, language=self.active_language)
except Exception as exc:
logging.error("stt failed: %s", exc)
self.set_state(State.IDLE)
return
logging.info("stt: %s", text)
run_id = self.history.add_run("stt", "ok", self.cfg, self._context_json(self.context))
self.history.add_artifact(run_id, "input", {"wav_path": record.wav_path, "language": self.active_language})
self.history.add_artifact(run_id, "output", {"text": text})
rule = match_rule(self.context, self.context_rules) if self.context else None
if rule:
@ -149,6 +193,7 @@ class Daemon:
if ai_enabled:
self.set_state(State.PROCESSING)
logging.info("ai processing started")
try:
processor = build_processor(
AIConfig(
@ -158,9 +203,18 @@ class Daemon:
base_url=self.cfg.ai_base_url,
api_key=self.cfg.ai_api_key,
timeout_sec=self.cfg.ai_timeout_sec,
language_hint=self.active_language,
)
)
text = processor.process(text) or text
ai_input = text
text = processor.process(ai_input) or text
run_id = self.history.add_run("ai", "ok", self.cfg, self._context_json(self.context))
self.history.add_artifact(
run_id,
"input",
{"text": ai_input, "model": self.cfg.ai_model, "temperature": self.cfg.ai_temperature},
)
self.history.add_artifact(run_id, "output", {"text": text})
except Exception as exc:
logging.error("ai process failed: %s", exc)
@ -168,6 +222,7 @@ class Daemon:
try:
self.set_state(State.OUTPUTTING)
logging.info("outputting started")
if self.context_provider and self.context:
if not self.context_provider.is_same_focus(self.context):
logging.info("focus changed, aborting injection")
@ -177,11 +232,216 @@ class Daemon:
if rule and rule.injection_backend:
backend = rule.injection_backend
inject(text, backend)
run_id = self.history.add_run("inject", "ok", self.cfg, self._context_json(self.context))
self.history.add_artifact(run_id, "input", {"text": text, "backend": backend})
except Exception as exc:
logging.error("output failed: %s", exc)
finally:
self.set_state(State.IDLE)
def _start_edit_flow(self):
    """Set up an edit session: read the selection, start recording, open the window.

    Resets the state to IDLE and returns early when the selection cannot be
    read, is empty, or the recorder fails to start. If the window cannot be
    opened the session is torn down through ``_abort_edit``.
    """
    try:
        text = read_primary_selection()
    except Exception as exc:
        logging.error("selection capture failed: %s", exc)
        self.set_state(State.IDLE)
        return
    text = (text or "").strip()
    if not text:
        logging.info("selection empty, aborting edit")
        self.set_state(State.IDLE)
        return
    # Start from the configured fallback; detection, when enabled, may override it.
    edit_language = self.cfg.edit_language_detection.get("fallback_code", self.cfg.whisper_lang)
    if self.cfg.edit_language_detection.get("enabled"):
        edit_language = detect_language(text, fallback=edit_language)
    self.active_language = edit_language
    try:
        if self.context_provider:
            self.edit_context = self.context_provider.capture()
    except Exception as exc:
        logging.error("context capture failed: %s", exc)
        self.edit_context = None
    if self.edit_context:
        logging.info(
            "edit context: id=%s app_id=%s class=%s instance=%s title=%s",
            self.edit_context.window_id,
            self.edit_context.app_id,
            self.edit_context.klass,
            self.edit_context.instance,
            self.edit_context.title,
        )
    else:
        logging.info("edit context: none")
    try:
        proc, record = start_recording(self.cfg.ffmpeg_input, self.cfg.ffmpeg_path)
    except Exception as exc:
        logging.error("record start failed: %s", exc)
        self.set_state(State.IDLE)
        return
    self.edit_proc = proc
    self.edit_record = record
    logging.info("edit recording started (%s)", record.wav_path)
    run_id = self.history.add_run("record", "started", self.cfg, self._context_json(self.edit_context))
    self.history.add_artifact(run_id, "audio", {"path": record.wav_path}, record.wav_path)
    # Restart the timer that ends the recording after edit_record_timeout_sec.
    if self.edit_timer:
        self.edit_timer.cancel()
    self.edit_timer = threading.Timer(self.cfg.edit_record_timeout_sec, self._edit_timeout_stop)
    self.edit_timer.daemon = True
    self.edit_timer.start()
    try:
        self.edit_window = open_edit_window(
            text,
            EditWindowConfig(**self.cfg.edit_window),
            self._on_edit_apply,
            self._on_edit_copy_close,
        )
    except Exception as exc:
        logging.error("edit window failed: %s", exc)
        self._abort_edit()
        return
def _edit_timeout_stop(self):
logging.info("edit recording timeout")
self._on_edit_apply(self._edit_get_text())
def _edit_get_text(self) -> str:
if not self.edit_window:
return ""
return self.edit_window.get_text()
def _on_edit_copy_close(self, text: str):
if text:
try:
write_clipboard(text)
except Exception as exc:
logging.error("copy failed: %s", exc)
self._abort_edit()
def _on_edit_apply(self, text: str):
    """Move from EDITING to EDIT_PROCESSING and process the edit in a thread.

    The state check and the transition happen under ``self.lock`` so that the
    recording-timeout timer and an Apply click cannot both start processing.
    Calls made in any state other than EDITING are ignored.
    """
    with self.lock:
        if self.state != State.EDITING:
            return
        self.state = State.EDIT_PROCESSING
    # Same transition message set_state would have logged.
    logging.info("state: %s -> %s", State.EDITING, State.EDIT_PROCESSING)
    threading.Thread(target=self._stop_and_process_edit, args=(text,), daemon=True).start()
def _stop_and_process_edit(self, base_text: str):
    """Stop the edit recording, transcribe the instruction, apply it and inject.

    ``base_text`` is the text to edit. With ``edit_ai_enabled`` the text and the
    transcribed instruction are sent to the AI processor; otherwise, or when the
    processor fails or returns nothing, ``base_text`` is injected unchanged.
    Every exit path ends in ``_abort_edit``.
    """
    # Take ownership of the recorder handles before doing anything else.
    proc = self.edit_proc
    record = self.edit_record
    self.edit_proc = None
    self.edit_record = None
    if self.edit_timer:
        self.edit_timer.cancel()
    self.edit_timer = None
    if not proc or not record:
        self._abort_edit()
        return
    try:
        stop_recording(proc)
    except Exception as exc:
        logging.error("record stop failed: %s", exc)
        self._abort_edit()
        return
    if not Path(record.wav_path).exists():
        logging.error("no audio captured")
        self._abort_edit()
        return
    try:
        logging.info("edit transcribing started")
        instruction = self.stt.transcribe(record.wav_path, language=self.active_language)
    except Exception as exc:
        logging.error("stt failed: %s", exc)
        self._abort_edit()
        return
    logging.info("edit instruction: %s", instruction)
    run_id = self.history.add_run("stt", "ok", self.cfg, self._context_json(self.edit_context))
    self.history.add_artifact(run_id, "input", {"wav_path": record.wav_path, "language": self.active_language})
    self.history.add_artifact(run_id, "output", {"text": instruction})
    result = base_text
    if self.cfg.edit_ai_enabled:
        try:
            prompt_file = self.cfg.edit_ai_system_prompt_file
            if not prompt_file:
                # Fall back to the prompt file that sits next to this module.
                prompt_file = str(Path(__file__).parent / "system_prompt_edit.txt")
            processor = build_processor(
                AIConfig(
                    model=self.cfg.ai_model,
                    temperature=self.cfg.edit_ai_temperature,
                    system_prompt_file=prompt_file,
                    base_url=self.cfg.ai_base_url,
                    api_key=self.cfg.ai_api_key,
                    timeout_sec=self.cfg.ai_timeout_sec,
                    language_hint=None,
                    wrap_transcript=False,
                )
            )
            payload = f"<text>{base_text}</text>\n<instruction>{instruction}</instruction>"
            result = processor.process(payload) or base_text
            run_id = self.history.add_run("ai", "ok", self.cfg, self._context_json(self.edit_context))
            self.history.add_artifact(
                run_id,
                "input",
                {"text": payload, "model": self.cfg.ai_model, "temperature": self.cfg.edit_ai_temperature},
            )
            self.history.add_artifact(run_id, "output", {"text": result})
        except Exception as exc:
            logging.error("ai process failed: %s", exc)
    logging.info("edit result: %s", result)
    if self.edit_window:
        # NOTE(review): this method is started on a worker thread by
        # _on_edit_apply, so this GTK call happens off the thread that created
        # the window — confirm that is safe.
        self.edit_window.set_status("Applying...")
    if self.context_provider and self.edit_context:
        if not self.context_provider.focus_window(self.edit_context.window_id):
            logging.info("original window missing, aborting edit injection")
            self._abort_edit()
            return
    try:
        inject(result, self.cfg.edit_injection_backend)
        run_id = self.history.add_run("inject", "ok", self.cfg, self._context_json(self.edit_context))
        self.history.add_artifact(run_id, "input", {"text": result, "backend": self.cfg.edit_injection_backend})
    except Exception as exc:
        logging.error("output failed: %s", exc)
    finally:
        self._abort_edit()
def _context_json(self, ctx):
if not ctx:
return None
return {
"window_id": ctx.window_id,
"app_id": ctx.app_id,
"class": ctx.klass,
"instance": ctx.instance,
"title": ctx.title,
}
def _abort_edit(self):
    """Tear down the edit session and return to IDLE.

    Closes the edit window, cancels the recording timer and stops a recorder
    that is still attached to the session. The recorder is still attached when
    the window was closed or failed to open; ``_stop_and_process_edit`` detaches
    it before stopping it, so it is never stopped twice. Failures during
    teardown are logged and do not prevent the state reset.
    """
    window = self.edit_window
    self.edit_window = None
    if window:
        try:
            window.close()
        except Exception as exc:
            logging.error("edit window close failed: %s", exc)

    proc = self.edit_proc
    self.edit_proc = None
    self.edit_record = None
    self.edit_context = None

    timer = self.edit_timer
    self.edit_timer = None
    if timer:
        timer.cancel()

    if proc:
        # Without this the recording process would keep running after an
        # aborted edit, because its timeout timer has just been cancelled.
        try:
            stop_recording(proc)
        except Exception as exc:
            logging.error("record stop failed: %s", exc)

    self.set_state(State.IDLE)
def stop_recording(self):
with self.lock:
if self.state != State.RECORDING:
@ -209,15 +469,26 @@ def main():
parser = argparse.ArgumentParser()
parser.add_argument("--config", default="", help="path to config.json")
parser.add_argument("--no-tray", action="store_true", help="disable tray icon")
parser.add_argument("--settings", action="store_true", help="open settings window and exit")
parser.add_argument("--dry-run", action="store_true", help="log hotkey only")
args = parser.parse_args()
logging.basicConfig(stream=sys.stderr, level=logging.INFO, format="leld: %(asctime)s %(message)s")
cfg = load(args.config)
config_path = Path(args.config) if args.config else Path.home() / ".config" / "lel" / "config.json"
if args.settings:
open_settings_window(cfg, config_path)
import gi
gi.require_version("Gtk", "3.0")
from gi.repository import Gtk
Gtk.main()
return
_lock_single_instance()
logging.info("ready (hotkey: %s)", cfg.hotkey)
hotkeys = ", ".join(f"{name}={info.get('hotkey')}" for name, info in cfg.languages.items())
logging.info("ready (hotkeys: %s; edit: %s)", hotkeys, cfg.edit_hotkey)
logging.info("config (%s):\n%s", args.config or str(Path.home() / ".config" / "lel" / "config.json"), json.dumps(redacted_dict(cfg), indent=2))
daemon = Daemon(cfg)
@ -240,8 +511,18 @@ def main():
listen(cfg.hotkey, lambda: logging.info("hotkey pressed (dry-run)") if args.dry_run else daemon.toggle())
return
threading.Thread(target=lambda: listen(cfg.hotkey, lambda: logging.info("hotkey pressed (dry-run)") if args.dry_run else daemon.toggle()), daemon=True).start()
run_tray(daemon.get_state, on_quit)
for name, info in cfg.languages.items():
hotkey = info.get("hotkey")
code = info.get("code")
threading.Thread(
target=lambda h=hotkey, c=code: listen(
h,
lambda: logging.info("hotkey pressed (dry-run)") if args.dry_run else daemon.toggle(c),
),
daemon=True,
).start()
threading.Thread(target=lambda: listen(cfg.edit_hotkey, lambda: logging.info("edit hotkey pressed (dry-run)") if args.dry_run else daemon.edit_trigger()), daemon=True).start()
run_tray(daemon.get_state, on_quit, lambda: open_settings_window(load(args.config), config_path))
if __name__ == "__main__":

33
src/selection.py Normal file
View file

@ -0,0 +1,33 @@
from __future__ import annotations
import time
from Xlib import X, Xatom, display
def read_primary_selection(timeout_sec: float = 2.0) -> str:
    """Return the current X11 PRIMARY selection as text.

    Opens a private display connection, asks the selection owner to convert
    the selection to ``UTF8_STRING`` and waits up to ``timeout_sec`` for the
    reply.

    Returns:
        The selected text, or an empty string when the conversion is refused,
        the property is empty or undecodable, or the timeout expires.
    """
    disp = display.Display()
    try:
        root = disp.screen().root
        win = root.create_window(0, 0, 1, 1, 0, X.CopyFromParent)
        utf8 = disp.intern_atom("UTF8_STRING")
        prop = disp.intern_atom("LEL_SELECTION")
        win.convert_selection(Xatom.PRIMARY, utf8, prop, X.CurrentTime)
        disp.flush()

        # Monotonic clock: the deadline must not move when wall time is adjusted.
        deadline = time.monotonic() + timeout_sec
        while time.monotonic() < deadline:
            if not disp.pending_events():
                time.sleep(0.01)
                continue
            ev = disp.next_event()
            if ev.type != X.SelectionNotify:
                continue
            if ev.property == X.NONE:
                return ""
            data = win.get_property(prop, X.AnyPropertyType, 0, 2**31 - 1)
            if not data or data.value is None:
                return ""
            try:
                return data.value.decode("utf-8", errors="ignore")
            except Exception:
                return ""
        return ""
    finally:
        # Each call opens its own connection, so it is closed on every exit
        # path instead of being leaked.
        disp.close()

869
src/settings_window.py Normal file
View file

@ -0,0 +1,869 @@
from __future__ import annotations
import json
import subprocess
import time
from dataclasses import asdict
from pathlib import Path
import gi
gi.require_version("Gtk", "3.0")
gi.require_version("Gdk", "3.0")
from gi.repository import Gdk, Gtk
from config import Config, validate
from history import HistoryStore
from recorder import _resolve_ffmpeg_path
from aiprocess import list_models
class SettingsWindow:
def __init__(self, cfg: Config, config_path: Path):
    """Build and show the settings dialog for ``cfg``, stored at ``config_path``."""
    self.cfg = cfg
    self.config_path = config_path
    self.history = HistoryStore()
    self._model_cache: dict[str, list[str]] = {}

    dialog = Gtk.Window(title="lel settings")
    self.window = dialog
    dialog.set_default_size(920, 700)
    dialog.set_position(Gtk.WindowPosition.CENTER)
    dialog.set_type_hint(Gdk.WindowTypeHint.DIALOG)

    # Error line shown above the tabs.
    error_line = Gtk.Label()
    self.error_label = error_line
    error_line.set_xalign(0.0)
    error_line.get_style_context().add_class("error")

    self.notebook = Gtk.Notebook()
    self.widgets: dict[str, Gtk.Widget] = {}
    self._build_tabs()

    save_button = Gtk.Button(label="Save")
    save_button.connect("clicked", self._on_save)
    cancel_button = Gtk.Button(label="Cancel")
    cancel_button.connect("clicked", lambda *_: self.window.destroy())

    actions = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=8)
    actions.pack_end(save_button, False, False, 0)
    actions.pack_end(cancel_button, False, False, 0)

    layout = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=8)
    layout.set_border_width(12)
    layout.pack_start(error_line, False, False, 0)
    layout.pack_start(self.notebook, True, True, 0)
    layout.pack_start(actions, False, False, 0)

    dialog.add(layout)
    dialog.show_all()
def _refresh_history(self, *_args):
    """Rebuild the history list from the store, applying the phase filter."""
    # Nothing to refresh until the History tab has created its list box.
    if not hasattr(self, "history_list"):
        return
    for stale_row in self.history_list.get_children():
        self.history_list.remove(stale_row)
    chosen_phase = self.widgets["history_phase"].get_active_text()
    # The "all" choice means no phase restriction.
    phase_filter = None if chosen_phase == "all" else chosen_phase
    for run in self.history.list_runs(phase=phase_filter, limit=200):
        stamp = time.strftime("%H:%M:%S", time.localtime(run["created_at"]))
        caption = Gtk.Label(
            label=f"#{run['id']} {run['phase']} {run['status']} {stamp}"
        )
        caption.set_xalign(0.0)
        list_row = Gtk.ListBoxRow()
        list_row.add(caption)
        # Keep the run record on the row so the selection handlers can read it.
        list_row._run = run
        self.history_list.add(list_row)
    self.history_list.show_all()
def _on_history_select(self, _listbox, row):
    """Show the selected run and its artifacts in the detail pane."""
    if row:
        selected_run = row._run
        run_artifacts = self.history.list_artifacts(selected_run["id"])
        detail_buffer = self.history_detail.get_buffer()
        detail_buffer.set_text(self._format_run(selected_run, run_artifacts))
def _format_run(self, run: dict, artifacts: list[dict]) -> str:
    """Render a run as text: header, status, optional context, one line per artifact."""
    header = [f"Run #{run['id']} ({run['phase']})", f"Status: {run['status']}"]
    context = [f"Context: {run['context']}"] if run.get("context") else []
    # Each artifact shows its data when truthy, otherwise its file path.
    details = [
        f"- {art['kind']}: {art.get('data') or art.get('file_path')}"
        for art in artifacts
    ]
    return "\n".join(header + context + details)
def _on_history_copy(self, *_args):
    """Copy the selected run's output text to the clipboard.

    When several ``output`` artifacts carry text, the last one is used.
    Nothing happens without a selection or without output text.
    """
    selected = self.history_list.get_selected_row()
    if not selected:
        return
    run_artifacts = self.history.list_artifacts(selected._run["id"])
    output_texts = [
        art["data"]["text"]
        for art in run_artifacts
        if art["kind"] == "output" and art.get("data") and art["data"].get("text")
    ]
    if not output_texts:
        return
    clipboard = Gtk.Clipboard.get(Gdk.SELECTION_CLIPBOARD)
    clipboard.set_text(output_texts[-1], -1)
    clipboard.store()
def _on_history_rerun(self, *_args):
    """Open the re-run dialog for the selected run; only ``ai`` runs are handled."""
    selected = self.history_list.get_selected_row()
    if selected:
        chosen_run = selected._run
        run_artifacts = self.history.list_artifacts(chosen_run["id"])
        if chosen_run["phase"] == "ai":
            self._open_ai_rerun(chosen_run, run_artifacts)
def _on_quick_run(self, *_args):
    """Run the Quick Run text through every configured AI step, in order.

    Each step receives the previous step's output. For every step one
    history run is recorded with an ``input`` artifact (the text fed into
    the step plus the step settings) and an ``output`` artifact (the text
    the step produced). The status label reports the outcome.

    Raises:
        Exception: Whatever the processor raises; the status label is
            updated with the failing step before the error propagates.
    """
    buf = self.quick_text.get_buffer()
    start, end = buf.get_bounds()
    text = buf.get_text(start, end, True).strip()
    status = self.widgets["quick_status"]
    if not text:
        status.set_text("No input text")
        return
    language = self.widgets["quick_language"].get_text().strip()
    steps = self._collect_quick_steps()
    if not steps:
        status.set_text("No AI steps")
        return
    from aiprocess import AIConfig, GenericAPIProcessor, build_processor

    output = text
    for idx, step in enumerate(steps, 1):
        prompt_text = step.get("prompt_text") or ""
        ai_cfg = AIConfig(
            model=step["model"],
            temperature=step["temperature"],
            system_prompt_file=self.cfg.ai_system_prompt_file,
            base_url=step["base_url"],
            api_key=step["api_key"],
            timeout_sec=step["timeout"],
            language_hint=language,
        )
        if prompt_text:
            # An inline prompt replaces the system prompt of a generic processor.
            processor = GenericAPIProcessor(ai_cfg)
            processor.system = prompt_text
        else:
            processor = build_processor(ai_cfg)
        # Capture the text BEFORE processing so the "input" artifact really
        # holds what was sent to this step (re-run reads it back).
        step_input = output
        try:
            output = processor.process(step_input)
        except Exception as exc:
            status.set_text(f"Step {idx} failed: {exc}")
            raise
        run_id = self.history.add_run("ai", "ok", self.cfg, None)
        self.history.add_artifact(
            run_id,
            "input",
            {
                "step": idx,
                "text": step_input,
                "language": language,
                "model": step["model"],
                "temperature": step["temperature"],
                "prompt_text": prompt_text,
                "base_url": step["base_url"],
            },
        )
        self.history.add_artifact(run_id, "output", {"text": output})
    status.set_text("Done")
    self._refresh_history()
def _collect_quick_steps(self) -> list[dict]:
    """Read every Quick Run step row into a dict of step settings.

    Empty model, base URL and API key fields fall back to the matching
    ``self.cfg.ai_*`` value.
    """

    def read_step(fields: dict) -> dict:
        model_name = fields["model_entry"].get_text().strip()
        model_combo = fields["model_combo"]
        # A visible combo with a selection overrides the free-text entry.
        if model_combo.get_visible():
            model_name = model_combo.get_active_text() or model_name
        prompt_buffer = fields["prompt_text"].get_buffer()
        first, last = prompt_buffer.get_bounds()
        prompt = prompt_buffer.get_text(first, last, True).strip()
        return {
            "model": model_name or self.cfg.ai_model,
            "temperature": float(fields["temperature"].get_value()),
            "prompt_text": prompt,
            "base_url": fields["base_url"].get_text().strip() or self.cfg.ai_base_url,
            "api_key": fields["api_key"].get_text().strip() or self.cfg.ai_api_key,
            "timeout": int(fields["timeout"].get_value()),
        }

    return [read_step(row._lel_step_entries) for row in self.quick_steps.get_children()]
def _open_ai_rerun(self, _run: dict, artifacts: list[dict]):
    """Let the user edit a run's input text and process it again with the AI config.

    The dialog is pre-filled with the text of the last ``input`` artifact.
    On "Run" the edited text is processed using the ``self.cfg.ai_*``
    settings and a new history run with input/output artifacts is stored.

    Args:
        _run: The run being re-run (unused).
        artifacts: Artifacts of that run.
    """
    input_text = ""
    for art in artifacts:
        if art["kind"] == "input" and art.get("data"):
            # Guard against a stored None so set_text() always gets a str.
            input_text = art["data"].get("text") or ""
    dialog = Gtk.Dialog(title="Re-run AI", transient_for=self.window, flags=0)
    # Destroy the dialog even when processing raises; otherwise it would
    # stay on screen with nothing left to close it.
    try:
        dialog.add_button("Run", Gtk.ResponseType.OK)
        dialog.add_button("Cancel", Gtk.ResponseType.CANCEL)
        box = dialog.get_content_area()
        textview = Gtk.TextView()
        textview.get_buffer().set_text(input_text)
        scroll = Gtk.ScrolledWindow()
        scroll.add(textview)
        scroll.set_size_request(600, 300)
        box.add(scroll)
        dialog.show_all()
        resp = dialog.run()
        if resp == Gtk.ResponseType.OK:
            buf = textview.get_buffer()
            start, end = buf.get_bounds()
            text = buf.get_text(start, end, True)
            from aiprocess import AIConfig, build_processor

            processor = build_processor(
                AIConfig(
                    model=self.cfg.ai_model,
                    temperature=self.cfg.ai_temperature,
                    system_prompt_file=self.cfg.ai_system_prompt_file,
                    base_url=self.cfg.ai_base_url,
                    api_key=self.cfg.ai_api_key,
                    timeout_sec=self.cfg.ai_timeout_sec,
                )
            )
            output = processor.process(text)
            run_id = self.history.add_run("ai", "ok", self.cfg, None)
            self.history.add_artifact(run_id, "input", {"text": text})
            self.history.add_artifact(run_id, "output", {"text": output})
            self._refresh_history()
    finally:
        dialog.destroy()
def _build_tabs(self):
    """Build each settings page and append it to the notebook in display order."""
    pages = (
        ("Hotkeys", self._build_hotkeys_tab),
        ("Recording", self._build_recording_tab),
        ("STT", self._build_stt_tab),
        ("Injection", self._build_injection_tab),
        ("AI", self._build_ai_tab),
        ("Edit", self._build_edit_tab),
        ("Context", self._build_context_tab),
        ("History", self._build_history_tab),
        ("Quick Run", self._build_quick_run_tab),
    )
    for title, build_page in pages:
        self._add_tab(title, build_page())
def _add_tab(self, title: str, widget: Gtk.Widget):
    """Append ``widget`` to the notebook as a page whose tab label reads ``title``."""
    self.notebook.append_page(widget, Gtk.Label(label=title))
def _grid(self) -> Gtk.Grid:
    """Return an empty grid with the spacing and 8px margins used by the form pages."""
    form = Gtk.Grid()
    form.set_row_spacing(8)
    form.set_column_spacing(12)
    margin_setters = (
        form.set_margin_top,
        form.set_margin_bottom,
        form.set_margin_start,
        form.set_margin_end,
    )
    for set_margin in margin_setters:
        set_margin(8)
    return form
def _entry(self, value: str) -> Gtk.Entry:
    """Return a text entry pre-filled with ``value`` (empty when ``value`` is falsy)."""
    field = Gtk.Entry()
    field.set_text(value if value else "")
    return field
def _spin(self, value: int, min_val: int, max_val: int) -> Gtk.SpinButton:
    """Return an integer spin button over ``[min_val, max_val]`` starting at ``value``."""
    bounds = Gtk.Adjustment(
        value=value,
        lower=min_val,
        upper=max_val,
        step_increment=1,
        page_increment=10,
    )
    return Gtk.SpinButton(adjustment=bounds, climb_rate=1, digits=0)
def _float_spin(self, value: float, min_val: float, max_val: float, step: float) -> Gtk.SpinButton:
    """Return a two-decimal spin button over ``[min_val, max_val]`` stepping by ``step``."""
    bounds = Gtk.Adjustment(
        value=value,
        lower=min_val,
        upper=max_val,
        step_increment=step,
        page_increment=0.1,
    )
    return Gtk.SpinButton(adjustment=bounds, climb_rate=0.1, digits=2)
def _combo(self, options: list[str], value: str) -> Gtk.ComboBoxText:
    """Return a combo box listing ``options`` with ``value`` selected.

    The first option is selected when ``value`` is not among ``options``.
    """
    chooser = Gtk.ComboBoxText()
    for option in options:
        chooser.append_text(option)
    try:
        active_index = options.index(value)
    except ValueError:
        active_index = 0
    chooser.set_active(active_index)
    return chooser
def _row(self, grid: Gtk.Grid, row: int, label: str, widget: Gtk.Widget):
    """Attach a left-aligned caption (column 0) and ``widget`` (column 1) at grid row ``row``."""
    caption = Gtk.Label(label=label)
    caption.set_xalign(0.0)
    for column, child in enumerate((caption, widget)):
        grid.attach(child, column, row, 1, 1)
def _build_hotkeys_tab(self) -> Gtk.Widget:
    """Build the Hotkeys page: the two global hotkeys plus the editable language list."""
    page = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=8)

    hotkey_grid = self._grid()
    hotkey_fields = (
        ("hotkey", "Hotkey", self.cfg.hotkey),
        ("edit_hotkey", "Edit Hotkey", self.cfg.edit_hotkey),
    )
    for position, (key, caption, current) in enumerate(hotkey_fields):
        self.widgets[key] = self._entry(current)
        self._row(hotkey_grid, position, caption, self.widgets[key])
    page.pack_start(hotkey_grid, False, False, 0)

    languages = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=6)
    heading = Gtk.Label(label="Languages")
    heading.set_xalign(0.0)
    languages.pack_start(heading, False, False, 0)
    # The list must exist before _add_language_row() appends to it.
    self.lang_list = Gtk.ListBox()
    for lang_key, lang_info in self.cfg.languages.items():
        self._add_language_row(lang_key, lang_info)
    languages.pack_start(self.lang_list, False, False, 0)
    add_button = Gtk.Button(label="Add Language")
    add_button.connect(
        "clicked",
        lambda *_: self._add_language_row("", {"code": "", "hotkey": "", "label": ""}),
    )
    languages.pack_start(add_button, False, False, 0)
    page.pack_start(languages, False, False, 0)
    return page
def _add_language_row(self, key: str, info: dict):
    """Append an editable language row (key, code, hotkey, label) to the language list.

    Args:
        key: Config key of the language (e.g. the ``languages`` dict key).
        info: Mapping whose ``code``, ``hotkey`` and ``label`` values
            pre-fill the entries; missing keys give empty entries.
    """
    # Gtk.ListBox wraps any child that is not a Gtk.ListBoxRow in a new row,
    # so get_children() would return that wrapper (without our entries
    # attribute) and remove() on the inner box would not remove anything.
    # Creating the row explicitly keeps both operations working.
    list_row = Gtk.ListBoxRow()
    row = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=6)
    key_entry = self._entry(key)
    code_entry = self._entry(info.get("code", ""))
    hotkey_entry = self._entry(info.get("hotkey", ""))
    label_entry = self._entry(info.get("label", ""))
    labelled_entries = (
        ("Key", key_entry),
        ("Code", code_entry),
        ("Hotkey", hotkey_entry),
        ("Label", label_entry),
    )
    for caption, entry in labelled_entries:
        row.pack_start(Gtk.Label(label=caption), False, False, 0)
        row.pack_start(entry, True, True, 0)
    btn_remove = Gtk.Button(label="Remove")
    btn_remove.connect("clicked", lambda *_: self.lang_list.remove(list_row))
    row.pack_start(btn_remove, False, False, 0)
    list_row.add(row)
    # Read back by _collect_languages() from the list's children.
    list_row._lel_lang_entries = (key_entry, code_entry, hotkey_entry, label_entry)
    self.lang_list.add(list_row)
    self.lang_list.show_all()
def _build_recording_tab(self) -> Gtk.Widget:
    """Build the Recording page: microphone picker, ffmpeg path and the two timeouts."""
    registry = self.widgets
    # Register the combo first: _populate_mic_sources() looks it up by key.
    registry["ffmpeg_input"] = Gtk.ComboBoxText()
    self._populate_mic_sources()
    registry["ffmpeg_path"] = self._entry(self.cfg.ffmpeg_path)
    registry["record_timeout_sec"] = self._spin(self.cfg.record_timeout_sec, 1, 3600)
    registry["edit_record_timeout_sec"] = self._spin(self.cfg.edit_record_timeout_sec, 1, 3600)

    refresh_button = Gtk.Button(label="Refresh")
    refresh_button.connect("clicked", lambda *_: self._populate_mic_sources())
    mic_picker = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=6)
    mic_picker.pack_start(registry["ffmpeg_input"], True, True, 0)
    mic_picker.pack_start(refresh_button, False, False, 0)

    form = self._grid()
    form_rows = (
        ("Microphone", mic_picker),
        ("FFmpeg Path", registry["ffmpeg_path"]),
        ("Record Timeout (sec)", registry["record_timeout_sec"]),
        ("Edit Record Timeout (sec)", registry["edit_record_timeout_sec"]),
    )
    for position, (caption, child) in enumerate(form_rows):
        self._row(form, position, caption, child)

    page = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=8)
    page.pack_start(form, False, False, 0)
    return page
def _selected_mic_source(self) -> str:
    """Return the ``pulse:<name>`` part of the selected microphone entry.

    Falls back to ``self.cfg.ffmpeg_input`` when nothing is selected or the
    entry does not start with ``pulse:``.
    """
    choice = self.widgets["ffmpeg_input"].get_active_text() or ""
    if not choice.startswith("pulse:"):
        return self.cfg.ffmpeg_input
    # Entries look like "pulse:<name> (<desc>)"; keep everything before the first space.
    device, _sep, _desc = choice.partition(" ")
    return device
def _populate_mic_sources(self):
    """Refill the microphone combo from PulseAudio and select the configured source.

    Selection order: an exact ``pulse:<name>`` match for
    ``self.cfg.ffmpeg_input``; otherwise the last source whose
    ``pulse:<name>`` is a prefix of it; otherwise the first entry. When the
    configured input is ``pulse:default`` and the default source name is
    known, that source is selected. If no sources are found a single
    ``pulse:default (default)`` entry is offered.
    """
    combo = self.widgets["ffmpeg_input"]
    combo.remove_all()
    sources, default_name = self._list_pulse_sources()
    self._mic_sources = sources
    selected = self.cfg.ffmpeg_input or "pulse:default"
    exact_index = None
    prefix_index = None
    for idx, (name, desc) in enumerate(sources):
        combo.append_text(f"pulse:{name} ({desc})")
        candidate = f"pulse:{name}"
        if selected == candidate:
            exact_index = idx
        elif selected.startswith(candidate):
            # A prefix hit alone is ambiguous ("mic" vs "mic.2"); it is only
            # used when no source matches exactly.
            prefix_index = idx
    if exact_index is not None:
        selected_index = exact_index
    elif prefix_index is not None:
        selected_index = prefix_index
    else:
        selected_index = 0
    if selected == "pulse:default" and default_name:
        for idx, (name, _desc) in enumerate(sources):
            if name == default_name:
                selected_index = idx
                break
    if sources:
        combo.set_active(selected_index)
    else:
        combo.append_text("pulse:default (default)")
        combo.set_active(0)
def _list_pulse_sources(self) -> tuple[list[tuple[str, str]], str | None]:
    """List PulseAudio sources by running ``pactl list sources short``.

    Returns:
        ``(sources, default_name)`` where each source is ``(name, desc)``:
        ``name`` is the second tab-separated field and ``desc`` the last
        field (or ``name`` when that field is empty). ``([], None)`` is
        returned when ``pactl`` fails or cannot be run.
    """
    try:
        result = subprocess.run(
            ["pactl", "list", "sources", "short"],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
        if result.returncode != 0:
            return ([], None)
        found = []
        for fields in (line.split("\t") for line in result.stdout.splitlines()):
            if len(fields) < 2:
                continue
            source_name = fields[1]
            found.append((source_name, fields[-1] or source_name))
        return (found, self._get_pulse_default_source())
    except Exception:
        # Best effort: any failure (e.g. pactl not installed) means "no sources".
        return ([], None)
def _get_pulse_default_source(self) -> str | None:
    """Return the ``Default Source`` value reported by ``pactl info``.

    Returns ``None`` when ``pactl`` fails, cannot be run, or prints no such line.
    """
    marker = "default source:"
    try:
        result = subprocess.run(
            ["pactl", "info"],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
        if result.returncode != 0:
            return None
        values = (
            line.split(":", 1)[1].strip()
            for line in result.stdout.splitlines()
            if line.lower().startswith(marker)
        )
        # First matching line wins; None when there is no match.
        return next(values, None)
    except Exception:
        return None
def _build_stt_tab(self) -> Gtk.Widget:
    """Build the STT page with entries for the Whisper model, language and device."""
    fields = (
        ("whisper_model", "Model", self.cfg.whisper_model),
        ("whisper_lang", "Language", self.cfg.whisper_lang),
        ("whisper_device", "Device", self.cfg.whisper_device),
    )
    form = self._grid()
    for key, _caption, current in fields:
        self.widgets[key] = self._entry(current)
    for position, (key, caption, _current) in enumerate(fields):
        self._row(form, position, caption, self.widgets[key])
    return form
def _build_injection_tab(self) -> Gtk.Widget:
    """Build the Injection page with entries for the normal and edit injection backends."""
    fields = (
        ("injection_backend", "Injection Backend", self.cfg.injection_backend),
        ("edit_injection_backend", "Edit Injection Backend", self.cfg.edit_injection_backend),
    )
    form = self._grid()
    for key, _caption, current in fields:
        self.widgets[key] = self._entry(current)
    for position, (key, caption, _current) in enumerate(fields):
        self._row(form, position, caption, self.widgets[key])
    return form
def _build_ai_tab(self) -> Gtk.Widget:
    """Build the AI page: enable flag, model, temperature, prompt file, URL, key, timeout."""
    registry = self.widgets
    form = self._grid()

    enabled_toggle = Gtk.CheckButton()
    enabled_toggle.set_active(self.cfg.ai_enabled)
    registry["ai_enabled"] = enabled_toggle
    registry["ai_model"] = self._entry(self.cfg.ai_model)
    registry["ai_temperature"] = self._float_spin(self.cfg.ai_temperature, 0.0, 2.0, 0.05)
    registry["ai_system_prompt_file"] = self._entry(self.cfg.ai_system_prompt_file)
    registry["ai_base_url"] = self._entry(self.cfg.ai_base_url)
    key_entry = self._entry(self.cfg.ai_api_key)
    # Mask the API key while it is displayed.
    key_entry.set_visibility(False)
    registry["ai_api_key"] = key_entry
    registry["ai_timeout_sec"] = self._spin(self.cfg.ai_timeout_sec, 1, 600)

    captions = (
        ("ai_enabled", "AI Enabled"),
        ("ai_model", "AI Model"),
        ("ai_temperature", "AI Temperature"),
        ("ai_system_prompt_file", "AI Prompt File"),
        ("ai_base_url", "AI Base URL"),
        ("ai_api_key", "AI API Key"),
        ("ai_timeout_sec", "AI Timeout (sec)"),
    )
    for position, (key, caption) in enumerate(captions):
        self._row(form, position, caption, registry[key])
    return form
def _build_edit_tab(self) -> Gtk.Widget:
    """Build the Edit page: edit-mode AI options, edit window size and language detection."""
    registry = self.widgets
    page = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=8)

    # --- Edit AI options and edit window size -------------------------
    ai_form = self._grid()
    ai_toggle = Gtk.CheckButton()
    ai_toggle.set_active(self.cfg.edit_ai_enabled)
    registry["edit_ai_enabled"] = ai_toggle
    registry["edit_ai_temperature"] = self._float_spin(self.cfg.edit_ai_temperature, 0.0, 2.0, 0.05)
    registry["edit_ai_system_prompt_file"] = self._entry(self.cfg.edit_ai_system_prompt_file)
    window_size = self.cfg.edit_window
    registry["edit_window_width"] = self._spin(window_size.get("width", 800), 200, 2400)
    registry["edit_window_height"] = self._spin(window_size.get("height", 400), 200, 1600)
    ai_rows = (
        ("edit_ai_enabled", "Edit AI Enabled"),
        ("edit_ai_temperature", "Edit AI Temperature"),
        ("edit_ai_system_prompt_file", "Edit Prompt File"),
        ("edit_window_width", "Edit Window Width"),
        ("edit_window_height", "Edit Window Height"),
    )
    for position, (key, caption) in enumerate(ai_rows):
        self._row(ai_form, position, caption, registry[key])
    page.pack_start(ai_form, False, False, 0)

    # --- Language detection -------------------------------------------
    detect_form = self._grid()
    detection = self.cfg.edit_language_detection
    detect_toggle = Gtk.CheckButton()
    detect_toggle.set_active(detection.get("enabled", True))
    registry["edit_lang_enabled"] = detect_toggle
    registry["edit_lang_provider"] = self._entry(detection.get("provider", "langdetect"))
    registry["edit_lang_fallback"] = self._entry(detection.get("fallback_code", "en"))
    detect_rows = (
        ("edit_lang_enabled", "Edit Lang Detect Enabled"),
        ("edit_lang_provider", "Edit Lang Provider"),
        ("edit_lang_fallback", "Edit Lang Fallback"),
    )
    for position, (key, caption) in enumerate(detect_rows):
        self._row(detect_form, position, caption, registry[key])
    page.pack_start(detect_form, False, False, 0)
    return page
def _build_context_tab(self) -> Gtk.Widget:
    """Build the Context page: capture settings plus the editable context rule list."""
    page = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=8)

    capture = self.cfg.context_capture
    capture_fields = (
        ("context_provider", "Context Provider", capture.get("provider", "i3ipc")),
        ("context_on_focus_change", "On Focus Change", capture.get("on_focus_change", "abort")),
    )
    form = self._grid()
    for key, _caption, current in capture_fields:
        self.widgets[key] = self._entry(current)
    for position, (key, caption, _current) in enumerate(capture_fields):
        self._row(form, position, caption, self.widgets[key])
    page.pack_start(form, False, False, 0)

    heading = Gtk.Label(label="Context Rules")
    heading.set_xalign(0.0)
    page.pack_start(heading, False, False, 0)
    # The list must exist before _add_rule_row() appends to it.
    self.rules_list = Gtk.ListBox()
    for existing_rule in self.cfg.context_rules:
        self._add_rule_row(existing_rule)
    page.pack_start(self.rules_list, False, False, 0)

    add_button = Gtk.Button(label="Add Rule")
    add_button.connect("clicked", lambda *_: self._add_rule_row({}))
    page.pack_start(add_button, False, False, 0)
    return page
def _build_history_tab(self) -> Gtk.Widget:
    """Build the History page: phase filter, run list, detail pane and run actions."""
    page = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=8)

    title = Gtk.Label(label="History")
    title.set_xalign(0.0)
    page.pack_start(title, False, False, 0)

    # --- Filter bar ----------------------------------------------------
    phase_combo = self._combo(["all", "record", "stt", "ai", "inject"], "all")
    self.widgets["history_phase"] = phase_combo
    refresh_button = Gtk.Button(label="Refresh")
    refresh_button.connect("clicked", self._refresh_history)
    filter_bar = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=8)
    for child in (Gtk.Label(label="Phase"), phase_combo, refresh_button):
        filter_bar.pack_start(child, False, False, 0)
    page.pack_start(filter_bar, False, False, 0)

    # --- Run list ------------------------------------------------------
    run_list = Gtk.ListBox()
    run_list.set_selection_mode(Gtk.SelectionMode.SINGLE)
    run_list.connect("row-selected", self._on_history_select)
    self.history_list = run_list
    page.pack_start(run_list, True, True, 0)

    # --- Read-only detail pane -----------------------------------------
    detail_view = Gtk.TextView()
    detail_view.set_editable(False)
    detail_view.set_wrap_mode(Gtk.WrapMode.WORD_CHAR)
    self.history_detail = detail_view
    detail_scroller = Gtk.ScrolledWindow()
    detail_scroller.add(detail_view)
    detail_scroller.set_vexpand(True)
    page.pack_start(detail_scroller, True, True, 0)

    # --- Actions -------------------------------------------------------
    rerun_button = Gtk.Button(label="Re-run")
    rerun_button.connect("clicked", self._on_history_rerun)
    self.widgets["history_rerun"] = rerun_button
    copy_button = Gtk.Button(label="Copy Output")
    copy_button.connect("clicked", self._on_history_copy)
    self.widgets["history_copy"] = copy_button
    actions = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=8)
    for button in (rerun_button, copy_button):
        actions.pack_start(button, False, False, 0)
    page.pack_start(actions, False, False, 0)

    # Fill the list now that every widget it needs exists.
    self._refresh_history()
    return page
def _build_quick_run_tab(self) -> Gtk.Widget:
    """Build the Quick Run page: input text, language hint, AI step list and Run button."""
    page = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=8)

    intro = Gtk.Label(label="Bypass recording and run from text")
    intro.set_xalign(0.0)
    page.pack_start(intro, False, False, 0)

    # --- Input text ----------------------------------------------------
    input_view = Gtk.TextView()
    input_view.set_wrap_mode(Gtk.WrapMode.WORD_CHAR)
    self.quick_text = input_view
    input_scroller = Gtk.ScrolledWindow()
    input_scroller.add(input_view)
    input_scroller.set_size_request(600, 140)
    page.pack_start(input_scroller, True, True, 0)

    # --- Options -------------------------------------------------------
    options = self._grid()
    self.widgets["quick_language"] = self._entry(self.cfg.whisper_lang)
    self._row(options, 0, "Language Hint", self.widgets["quick_language"])
    page.pack_start(options, False, False, 0)

    # --- Step list, seeded with one step taken from the AI config ------
    steps_heading = Gtk.Label(label="AI Steps (run in order)")
    steps_heading.set_xalign(0.0)
    page.pack_start(steps_heading, False, False, 0)
    step_list = Gtk.ListBox()
    step_list.set_selection_mode(Gtk.SelectionMode.NONE)
    step_list.set_can_focus(False)
    self.quick_steps = step_list
    first_step = {
        "model": self.cfg.ai_model,
        "temperature": self.cfg.ai_temperature,
        "prompt_file": self.cfg.ai_system_prompt_file,
        "base_url": self.cfg.ai_base_url,
        "api_key": self.cfg.ai_api_key,
        "timeout": self.cfg.ai_timeout_sec,
    }
    self._add_quick_step_row(first_step)
    page.pack_start(step_list, False, False, 0)

    add_step_button = Gtk.Button(label="Add Step")
    add_step_button.connect("clicked", lambda *_: self._add_quick_step_row({}))
    step_buttons = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=8)
    step_buttons.pack_start(add_step_button, False, False, 0)
    page.pack_start(step_buttons, False, False, 0)

    # --- Run button and status line ------------------------------------
    run_button = Gtk.Button(label="Run")
    run_button.connect("clicked", self._on_quick_run)
    status_label = Gtk.Label(label="")
    status_label.set_xalign(0.0)
    self.widgets["quick_status"] = status_label
    run_bar = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=8)
    run_bar.pack_start(run_button, False, False, 0)
    run_bar.pack_start(status_label, True, True, 0)
    page.pack_start(run_bar, False, False, 0)
    return page
def _add_quick_step_row(self, step: dict):
    """Append one AI step editor row to the Quick Run step list.

    Args:
        step: Initial values. The keys ``model``, ``temperature``,
            ``base_url``, ``api_key`` and ``timeout`` fall back to the
            matching ``self.cfg.ai_*`` value; ``prompt_text`` falls back to
            an empty prompt.
    """
    row = Gtk.ListBoxRow()
    row.set_activatable(False)
    row.set_selectable(False)
    row.set_can_focus(False)
    content = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=6)
    content.set_can_focus(False)
    grid = self._grid()
    model_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=6)
    model_combo = Gtk.ComboBoxText()
    model_entry = self._entry(step.get("model", self.cfg.ai_model))
    # Exactly one of combo/entry is meant to be visible, as decided by
    # _refresh_models_for_row(). Without this flag every show_all() call
    # (here, in _move_step and on the window) would reveal both again.
    model_combo.set_no_show_all(True)
    model_entry.set_no_show_all(True)
    model_box.pack_start(model_combo, True, True, 0)
    model_box.pack_start(model_entry, True, True, 0)
    temperature = self._float_spin(step.get("temperature", self.cfg.ai_temperature), 0.0, 2.0, 0.05)
    prompt_text = Gtk.TextView()
    prompt_text.set_wrap_mode(Gtk.WrapMode.WORD_CHAR)
    prompt_buf = prompt_text.get_buffer()
    prompt_buf.set_text(step.get("prompt_text", ""))
    prompt_scroll = Gtk.ScrolledWindow()
    prompt_scroll.set_size_request(400, 120)
    prompt_scroll.add(prompt_text)
    base_url = self._entry(step.get("base_url", self.cfg.ai_base_url))
    api_key = self._entry(step.get("api_key", self.cfg.ai_api_key))
    # Mask the API key while it is displayed.
    api_key.set_visibility(False)
    timeout = self._spin(step.get("timeout", self.cfg.ai_timeout_sec), 1, 600)
    form_rows = (
        ("AI Model", model_box),
        ("AI Temperature", temperature),
        ("AI Prompt", prompt_scroll),
        ("AI Base URL", base_url),
        ("AI API Key", api_key),
        ("AI Timeout (sec)", timeout),
    )
    for position, (caption, widget) in enumerate(form_rows):
        self._row(grid, position, caption, widget)
    # NOTE(review): this re-queries the model list on every edit of the URL;
    # list_models presumably performs a request - confirm it is cheap enough.
    base_url.connect("changed", lambda *_: self._refresh_models_for_row(row))
    controls = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=8)
    btn_up = Gtk.Button(label="Up")
    btn_down = Gtk.Button(label="Down")
    btn_remove = Gtk.Button(label="Remove")
    btn_up.connect("clicked", lambda *_: self._move_step(row, -1))
    btn_down.connect("clicked", lambda *_: self._move_step(row, 1))
    btn_remove.connect("clicked", lambda *_: self.quick_steps.remove(row))
    for button in (btn_up, btn_down, btn_remove):
        controls.pack_start(button, False, False, 0)
    content.pack_start(grid, False, False, 0)
    content.pack_start(controls, False, False, 0)
    row.add(content)
    # Read back by _collect_quick_steps() and _refresh_models_for_row().
    row._lel_step_entries = {
        "model_combo": model_combo,
        "model_entry": model_entry,
        "temperature": temperature,
        "prompt_text": prompt_text,
        "base_url": base_url,
        "api_key": api_key,
        "timeout": timeout,
    }
    self._refresh_models_for_row(row)
    self.quick_steps.add(row)
    self.quick_steps.show_all()
def _move_step(self, row: Gtk.Widget, direction: int):
    """Move ``row`` by ``direction`` positions in the step list; no-op past either end."""
    siblings = self.quick_steps.get_children()
    target = siblings.index(row) + direction
    if 0 <= target < len(siblings):
        self.quick_steps.remove(row)
        self.quick_steps.insert(row, target)
        self.quick_steps.show_all()
def _refresh_models_for_row(self, row: Gtk.Widget):
    """Refill a step row's model combo from ``_get_models`` for its URL, key and timeout.

    With models available the combo is shown (first model selected) and the
    free-text entry hidden; with none it is the other way round.
    """
    fields = row._lel_step_entries
    available = self._get_models(
        fields["base_url"].get_text().strip(),
        fields["api_key"].get_text().strip(),
        int(fields["timeout"].get_value()),
    )
    chooser, free_text = fields["model_combo"], fields["model_entry"]
    chooser.remove_all()
    if not available:
        chooser.hide()
        free_text.show()
        return
    for model_name in available:
        chooser.append_text(model_name)
    chooser.set_active(0)
    chooser.show()
    free_text.hide()
def _get_models(self, base_url: str, api_key: str, timeout: int) -> list[str]:
    """Return ``list_models`` for the endpoint, cached per (base_url, api_key, timeout)."""
    cache = self._model_cache
    cache_key = f"{base_url}|{api_key}|{timeout}"
    if cache_key not in cache:
        cache[cache_key] = list_models(base_url, api_key, timeout)
    return cache[cache_key]
def _add_rule_row(self, rule: dict):
    """Append an editable context rule row to the rules list.

    Args:
        rule: Existing rule used to pre-fill the row. Reads ``tag``,
            ``ai_prompt_file``, ``injection_backend``, ``ai_enabled``
            (``True``/``False``, anything else shows "default") and the
            ``match`` sub-dict (``app_id``, ``class``, ``instance``,
            ``title_contains``, ``title_regex``). Missing keys give empty
            fields.
    """
    # Gtk.ListBox wraps any child that is not a Gtk.ListBoxRow in a new row,
    # so get_children() would return that wrapper (without our entries
    # attribute) and remove() on the inner box would not remove anything.
    # Creating the row explicitly keeps both operations working.
    list_row = Gtk.ListBoxRow()
    row = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=6)

    top = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=6)
    tag_entry = self._entry(rule.get("tag", ""))
    ai_prompt_entry = self._entry(rule.get("ai_prompt_file", ""))
    inj_entry = self._entry(rule.get("injection_backend", ""))
    ai_enabled = self._combo(["default", "true", "false"], "default")
    if rule.get("ai_enabled") is True:
        ai_enabled.set_active(1)
    elif rule.get("ai_enabled") is False:
        ai_enabled.set_active(2)
    # (caption, widget, expand) - the combo keeps its natural width.
    top_fields = (
        ("Tag", tag_entry, True),
        ("AI Prompt", ai_prompt_entry, True),
        ("AI Enabled", ai_enabled, False),
        ("Injection", inj_entry, True),
    )
    for caption, widget, expand in top_fields:
        top.pack_start(Gtk.Label(label=caption), False, False, 0)
        top.pack_start(widget, expand, expand, 0)

    match = rule.get("match") or {}
    match_row = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=6)
    app_id = self._entry(match.get("app_id", ""))
    klass = self._entry(match.get("class", ""))
    instance = self._entry(match.get("instance", ""))
    title_contains = self._entry(match.get("title_contains", ""))
    title_regex = self._entry(match.get("title_regex", ""))
    match_fields = (
        ("App ID", app_id),
        ("Class", klass),
        ("Instance", instance),
        ("Title Contains", title_contains),
        ("Title Regex", title_regex),
    )
    for caption, entry in match_fields:
        match_row.pack_start(Gtk.Label(label=caption), False, False, 0)
        match_row.pack_start(entry, True, True, 0)

    btn_remove = Gtk.Button(label="Remove")
    btn_remove.connect("clicked", lambda *_: self.rules_list.remove(list_row))
    row.pack_start(top, False, False, 0)
    row.pack_start(match_row, False, False, 0)
    row.pack_start(btn_remove, False, False, 0)
    list_row.add(row)
    # Read back by _collect_rules() from the list's children.
    list_row._lel_rule_entries = {
        "tag": tag_entry,
        "ai_prompt_file": ai_prompt_entry,
        "ai_enabled": ai_enabled,
        "injection_backend": inj_entry,
        "app_id": app_id,
        "class": klass,
        "instance": instance,
        "title_contains": title_contains,
        "title_regex": title_regex,
    }
    self.rules_list.add(list_row)
    self.rules_list.show_all()
def _on_save(self, *_args):
    """Collect, validate and write the config, then close the window.

    Any exception along the way is shown in the error label instead.
    """
    try:
        new_cfg = self._collect_config()
        validate(new_cfg)
        self._write_config(new_cfg)
        self.window.destroy()
    except Exception as failure:
        message = str(failure)
        self._set_error(message)
def _set_error(self, text: str):
    """Display ``text`` in the error label of the window."""
    label = self.error_label
    label.set_text(text)
def _collect_config(self) -> Config:
    """Build a fresh ``Config`` from the current widget values.

    Text fields are stripped. The provider/fallback fields of the nested
    dicts get their default when left empty.
    """
    registry = self.widgets

    def text_of(key: str) -> str:
        return registry[key].get_text().strip()

    def number_of(key: str):
        return registry[key].get_value()

    text_fields = (
        "hotkey",
        "edit_hotkey",
        "ffmpeg_path",
        "whisper_model",
        "whisper_lang",
        "whisper_device",
        "injection_backend",
        "edit_injection_backend",
        "ai_model",
        "ai_system_prompt_file",
        "ai_base_url",
        "ai_api_key",
        "edit_ai_system_prompt_file",
    )
    int_fields = ("record_timeout_sec", "edit_record_timeout_sec", "ai_timeout_sec")
    float_fields = ("ai_temperature", "edit_ai_temperature")
    flag_fields = ("ai_enabled", "edit_ai_enabled")

    cfg = Config()
    for name in text_fields:
        setattr(cfg, name, text_of(name))
    for name in int_fields:
        setattr(cfg, name, int(number_of(name)))
    for name in float_fields:
        setattr(cfg, name, float(number_of(name)))
    for name in flag_fields:
        setattr(cfg, name, registry[name].get_active())
    # The microphone combo text needs trimming down to "pulse:<name>".
    cfg.ffmpeg_input = self._selected_mic_source()
    cfg.edit_window = {
        "width": int(number_of("edit_window_width")),
        "height": int(number_of("edit_window_height")),
    }
    cfg.edit_language_detection = {
        "enabled": registry["edit_lang_enabled"].get_active(),
        "provider": text_of("edit_lang_provider") or "langdetect",
        "fallback_code": text_of("edit_lang_fallback") or "en",
    }
    cfg.context_capture = {
        "provider": text_of("context_provider") or "i3ipc",
        "on_focus_change": text_of("context_on_focus_change") or "abort",
    }
    cfg.context_rules = self._collect_rules()
    cfg.languages = self._collect_languages()
    return cfg
def _collect_languages(self) -> dict:
    """Return the language table edited on the Hotkeys page.

    Returns:
        ``{key: {"code", "hotkey", "label"}}`` with all values stripped.
        Rows with an empty key are skipped; a later row with the same key
        replaces an earlier one.
    """
    out: dict[str, dict] = {}
    for row in self.lang_list.get_children():
        entries = getattr(row, "_lel_lang_entries", None)
        if entries is None:
            # Gtk.ListBox wraps non-row children in a Gtk.ListBoxRow; in that
            # case the entries tuple lives on the wrapped child, not the row.
            entries = getattr(row.get_child(), "_lel_lang_entries", None)
        if entries is None:
            continue
        key_entry, code_entry, hotkey_entry, label_entry = entries
        key = key_entry.get_text().strip()
        if not key:
            continue
        out[key] = {
            "code": code_entry.get_text().strip(),
            "hotkey": hotkey_entry.get_text().strip(),
            "label": label_entry.get_text().strip(),
        }
    return out
def _collect_rules(self) -> list[dict]:
    """Return the context rules edited on the Context page.

    Empty strings and an ``ai_enabled`` of "default" are left out of each
    rule; empty match fields are left out of its ``match`` dict (the
    ``match`` key itself is always present, possibly empty).
    """
    match_keys = ("app_id", "class", "instance", "title_contains", "title_regex")
    rules: list[dict] = []
    for row in self.rules_list.get_children():
        e = getattr(row, "_lel_rule_entries", None)
        if e is None:
            # Gtk.ListBox wraps non-row children in a Gtk.ListBoxRow; in that
            # case the entries dict lives on the wrapped child, not the row.
            e = getattr(row.get_child(), "_lel_rule_entries", None)
        if e is None:
            continue
        # "default" (or anything unexpected) maps to None = not specified.
        ai_enabled = {"true": True, "false": False}.get(e["ai_enabled"].get_active_text())
        match = {key: e[key].get_text().strip() for key in match_keys}
        match = {k: v for k, v in match.items() if v}
        rule = {
            "tag": e["tag"].get_text().strip(),
            "ai_prompt_file": e["ai_prompt_file"].get_text().strip(),
            "ai_enabled": ai_enabled,
            "injection_backend": e["injection_backend"].get_text().strip(),
            "match": match,
        }
        rules.append({k: v for k, v in rule.items() if v is not None and v != ""})
    return rules
def _write_config(self, cfg: Config):
    """Serialize ``cfg`` as indented JSON to ``self.config_path``, creating parent dirs."""
    target = self.config_path
    target.parent.mkdir(parents=True, exist_ok=True)
    payload = json.dumps(asdict(cfg), indent=2)
    target.write_text(payload, encoding="utf-8")
def open_settings_window(cfg: Config, config_path: Path):
    """Create, show and return a ``SettingsWindow`` for ``cfg`` that saves to ``config_path``."""
    settings_window = SettingsWindow(cfg, config_path)
    return settings_window

View file

@ -33,11 +33,11 @@ class FasterWhisperSTT:
compute_type=_compute_type(self.cfg.device),
)
def transcribe(self, wav_path: str) -> str:
def transcribe(self, wav_path: str, language: str | None = None) -> str:
self._load()
segments, _info = self._model.transcribe(
wav_path,
language=self.cfg.language,
language=language or self.cfg.language,
vad_filter=self.cfg.vad_filter,
)
parts = []

View file

@ -0,0 +1,15 @@
You are a deterministic text editing engine.
You edit the provided text according to the user's spoken instruction.
Follow these rules strictly:
1. Do NOT add content not implied by the instruction.
2. Preserve tone and intent unless instructed otherwise.
3. Prefer minimal edits.
4. Keep formatting unless the instruction says to change it.
5. Do NOT explain; output ONLY the edited text.
Input format:
<text>...</text>
<instruction>...</instruction>
You should only output the raw text content, without any XML tags.

View file

@ -28,8 +28,12 @@ class Tray:
def _icon_path(self, state: str) -> str:
    """Return the icon file path for ``state``; unknown states use the idle icon."""
    # Editing reuses the recording icon, edit processing the processing icon.
    icon_for_state = {
        "recording": "recording.png",
        "editing": "recording.png",
        "transcribing": "transcribing.png",
        "edit_processing": "processing.png",
        "processing": "processing.png",
    }
    return str(self.base / icon_for_state.get(state, "idle.png"))
@ -37,8 +41,12 @@ class Tray:
def _title(self, state: str) -> str:
    """Return the human-readable title for ``state``; unknown states read "Idle"."""
    title_for_state = {
        "recording": "Recording",
        "editing": "Editing",
        "transcribing": "Transcribing",
        "edit_processing": "Edit Processing",
        "processing": "AI Processing",
    }
    return title_for_state.get(state, "Idle")
@ -50,8 +58,13 @@ class Tray:
return True
def run_tray(state_getter, on_quit):
def run_tray(state_getter, on_quit, on_settings=None):
    """Create the tray icon and run the GTK main loop.

    Args:
        state_getter: Forwarded to ``Tray``.
        on_quit: Forwarded to ``Tray``.
        on_settings: Optional zero-argument callable. When given, a
            "Settings" item that invokes it is placed at the top of the
            tray menu. Defaults to ``None`` so existing two-argument
            callers keep working.
    """
    tray = Tray(state_getter, on_quit)
    tray.update()
    # Call tray.update() every 250 ms.
    GLib.timeout_add(250, tray.update)
    if on_settings:
        settings_item = Gtk.MenuItem(label="Settings")
        # Drop the signal arguments; the callback takes none.
        settings_item.connect("activate", lambda *_: on_settings())
        tray.menu.prepend(settings_item)
        tray.menu.show_all()
    Gtk.main()