Update project files

This commit is contained in:
Thales Maciel 2026-02-10 11:01:36 -03:00
parent ad66a0d3cb
commit a17c234360
14 changed files with 952 additions and 529 deletions

View file

@ -1,8 +1,9 @@
from __future__ import annotations
import re
from dataclasses import dataclass
import i3ipc
@dataclass
class Context:
@ -13,88 +14,30 @@ class Context:
title: str
@dataclass
class ContextRule:
    """A per-window override rule.

    ``match`` carries the matching criteria (looked up by ``match_rule``
    under the keys ``app_id``, ``class``, ``instance``, ``title_contains``
    and ``title_regex``). Every other field is optional and defaults to
    ``None``.
    """

    # Matching criteria; the only required field.
    match: dict
    # Optional overrides, all unset by default.
    ai_prompt_file: str | None = None
    ai_enabled: bool | None = None
    injection_backend: str | None = None
    # Optional label; included in the log line when the rule matches.
    tag: str | None = None
class ContextProvider:
    """Base interface for context backends; both methods must be overridden."""

    def capture(self) -> Context:
        """Return a ``Context`` snapshot. Not implemented on the base class."""
        raise NotImplementedError

    def is_same_focus(self, ctx: Context) -> bool:
        """Report whether ``ctx`` still matches the focus. Not implemented here."""
        raise NotImplementedError
class I3Provider(ContextProvider):
class I3Provider:
def __init__(self):
import i3ipc
self.i3 = i3ipc.Connection()
self._conn = i3ipc.Connection()
def _focused(self):
node = self.i3.get_tree().find_focused()
if node is None:
raise RuntimeError("no focused window")
return node
return self._conn.get_tree().find_focused()
def capture(self) -> Context:
node = self._focused()
props = getattr(node, "window_properties", None) or {}
return Context(
window_id=node.id,
app_id=getattr(node, "app_id", None) or "",
klass=props.get("class") or "",
instance=props.get("instance") or "",
title=getattr(node, "name", None) or "",
app_id=node.app_id or "",
klass=node.window_class or "",
instance=node.window_instance or "",
title=node.name or "",
)
def is_same_focus(self, ctx: Context) -> bool:
node = self._focused()
return node.id == ctx.window_id
return bool(node and node.id == ctx.window_id)
def focus_window(self, window_id: int) -> bool:
node = self.i3.get_tree().find_by_id(window_id)
if node is None:
try:
self._conn.command(f"[con_id={window_id}] focus")
return True
except Exception:
return False
node.command("focus")
return True
def _match_text(val: str, needle: str | None) -> bool:
if not needle:
return True
return val == needle
def _match_title_contains(title: str, needle: str | None) -> bool:
if not needle:
return True
return needle.lower() in title.lower()
def _match_title_regex(title: str, pattern: str | None) -> bool:
if not pattern:
return True
return re.search(pattern, title) is not None
def match_rule(ctx: Context, rules: list[ContextRule]) -> ContextRule | None:
    """Return the first rule in ``rules`` whose criteria all accept ``ctx``.

    Criteria are read from each rule's ``match`` mapping; a key that is
    absent imposes no constraint. Returns ``None`` when nothing matches.
    """
    for candidate in rules:
        criteria = candidate.match or {}
        # Evaluated left to right with short-circuiting, cheapest checks first.
        if (
            _match_text(ctx.app_id, criteria.get("app_id"))
            and _match_text(ctx.klass, criteria.get("class"))
            and _match_text(ctx.instance, criteria.get("instance"))
            and _match_title_contains(ctx.title, criteria.get("title_contains"))
            and _match_title_regex(ctx.title, criteria.get("title_regex"))
        ):
            return candidate
    return None

View file

@ -1,101 +0,0 @@
from __future__ import annotations
import threading
from dataclasses import dataclass
import gi
gi.require_version("Gtk", "3.0")
gi.require_version("Gdk", "3.0")
from gi.repository import Gdk, GLib, Gtk
@dataclass
class EditWindowConfig:
    """Default size applied to the edit window via ``set_default_size``."""

    width: int = 800
    height: int = 400
class EditWindow:
    """GTK window with a status label, an editable text view and an Apply button.

    ``on_apply`` is called with the current text when Apply is clicked.
    ``on_copy_close`` is called with the current text when Ctrl+C is
    pressed, and with an empty string when the window gets ``delete-event``.
    """

    def __init__(self, text: str, cfg: EditWindowConfig, on_apply, on_copy_close):
        self.on_apply = on_apply
        self.on_copy_close = on_copy_close

        # Dialog-style window, centred and kept above other windows.
        self.window = Gtk.Window(title="lel edit")
        self.window.set_default_size(cfg.width, cfg.height)
        self.window.set_keep_above(True)
        self.window.set_position(Gtk.WindowPosition.CENTER)
        self.window.set_type_hint(Gdk.WindowTypeHint.DIALOG)
        self.window.connect("delete-event", self._on_close)

        self.status = Gtk.Label(label="Listening...")
        self.status.set_xalign(0.0)

        scroller = Gtk.ScrolledWindow()
        scroller.set_hexpand(True)
        scroller.set_vexpand(True)

        self.textview = Gtk.TextView()
        self.textview.set_wrap_mode(Gtk.WrapMode.WORD_CHAR)
        self.textview.get_buffer().set_text(text)
        scroller.add(self.textview)

        apply_button = Gtk.Button(label="Apply")
        apply_button.connect("clicked", self._on_apply)

        button_row = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=8)
        button_row.pack_end(apply_button, False, False, 0)

        # Status on top, text area taking the spare room, buttons at the bottom.
        layout = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=8)
        layout.set_border_width(12)
        for child, grows in ((self.status, False), (scroller, True), (button_row, False)):
            layout.pack_start(child, grows, grows, 0)

        self.window.add(layout)
        self.window.show_all()
        self.textview.grab_focus()

        # Ctrl+C is bound window-wide to the copy-and-close callback.
        accel_group = Gtk.AccelGroup()
        self.window.add_accel_group(accel_group)
        accel_key, accel_mods = Gtk.accelerator_parse("<Ctrl>c")
        accel_group.connect(accel_key, accel_mods, Gtk.AccelFlags.VISIBLE, self._on_copy)

    def _on_apply(self, *_args):
        """Apply-button handler: hand the current text to ``on_apply``."""
        self.on_apply(self.get_text())

    def _on_copy(self, *_args):
        """Ctrl+C handler: hand the current text to ``on_copy_close``."""
        self.on_copy_close(self.get_text())
        return True

    def _on_close(self, *_args):
        """``delete-event`` handler: report an empty text to ``on_copy_close``."""
        self.on_copy_close("")
        return True

    def get_text(self) -> str:
        """Return the full contents of the text view's buffer."""
        text_buffer = self.textview.get_buffer()
        first, last = text_buffer.get_bounds()
        return text_buffer.get_text(first, last, True)

    def set_status(self, text: str) -> None:
        """Replace the status label's text."""
        self.status.set_text(text)

    def close(self) -> None:
        """Destroy the underlying GTK window."""
        self.window.destroy()
def open_edit_window(text: str, cfg: EditWindowConfig, on_apply, on_copy_close) -> EditWindow:
    """Create an ``EditWindow`` via a GLib idle callback and return it.

    Construction is scheduled with ``GLib.idle_add`` and the calling thread
    blocks for up to two seconds until the callback has run.

    Args:
        text: Initial contents of the text view.
        cfg: Window size configuration.
        on_apply: Callback forwarded to ``EditWindow``.
        on_copy_close: Callback forwarded to ``EditWindow``.

    Raises:
        RuntimeError: The idle callback did not run within the timeout.
        Exception: Whatever ``EditWindow(...)`` raised inside the callback;
            it is re-raised here instead of surfacing as a timeout.
    """
    holder: dict[str, EditWindow] = {}
    failure: list[BaseException] = []
    ready = threading.Event()
    abandoned = threading.Event()

    def _create():
        # The caller already gave up waiting: do not pop up an orphan window.
        if abandoned.is_set():
            return False
        try:
            holder["win"] = EditWindow(text, cfg, on_apply, on_copy_close)
        except Exception as exc:
            failure.append(exc)
        finally:
            # Always wake the waiter, even when construction failed.
            ready.set()
        return False  # one-shot: returning False removes the idle source

    GLib.idle_add(_create)
    if not ready.wait(timeout=2.0):
        abandoned.set()
        raise RuntimeError("GTK main loop not running; cannot open edit window")
    if failure:
        raise failure[0]
    return holder["win"]

View file

@ -1,24 +0,0 @@
from __future__ import annotations
from langdetect import DetectorFactory, detect
DetectorFactory.seed = 0
def detect_language(text: str, fallback: str = "en") -> str:
    """Detect the language code of ``text``.

    Returns ``fallback`` when ``text`` is empty or whitespace, when detection
    raises, or when the normalized code is empty.
    """
    sample = (text or "").strip()
    if not sample:
        return fallback
    try:
        detected = detect(sample)
    except Exception:
        return fallback
    normalized = _normalize(detected)
    return normalized if normalized else fallback
def _normalize(code: str) -> str:
if not code:
return ""
if code.lower() == "pt":
return "pt-BR"
return code

View file

@ -13,12 +13,9 @@ from config import Config, load, redacted_dict
from recorder import start_recording, stop_recording
from stt import FasterWhisperSTT, STTConfig
from aiprocess import AIConfig, build_processor
from context import ContextRule, I3Provider, match_rule
from edit_window import EditWindowConfig, open_edit_window
from inject import inject, write_clipboard
from context import I3Provider
from inject import inject
from history import HistoryStore
from language import detect_language
from selection import read_primary_selection
from x11_hotkey import listen
from tray import run_tray
from settings_window import open_settings_window
@ -29,8 +26,6 @@ class State:
RECORDING = "recording"
TRANSCRIBING = "transcribing"
PROCESSING = "processing"
EDITING = "editing"
EDIT_PROCESSING = "edit_processing"
OUTPUTTING = "outputting"
@ -44,24 +39,13 @@ class Daemon:
self.proc = None
self.record = None
self.timer = None
self.active_language = cfg.whisper_lang
self.context = None
self.edit_proc = None
self.edit_record = None
self.edit_timer = None
self.edit_context = None
self.edit_window = None
self.context_provider = None
if cfg.context_capture.get("provider") == "i3ipc":
self.context_provider = I3Provider()
else:
raise RuntimeError("unsupported context_capture.provider")
self.context_rules = [ContextRule(**r) for r in cfg.context_rules]
self.context_provider = I3Provider()
self.stt = FasterWhisperSTT(
STTConfig(
model=cfg.whisper_model,
model=cfg.transcribing.get("model", "base"),
language=None,
device=cfg.whisper_device,
device=cfg.transcribing.get("device", "cpu"),
vad_filter=True,
)
)
@ -78,10 +62,9 @@ class Daemon:
with self.lock:
return self.state
def toggle(self, language_code: str | None = None):
def toggle(self):
with self.lock:
if self.state == State.IDLE:
self.active_language = language_code or self.cfg.whisper_lang
self._start_recording_locked()
return
if self.state == State.RECORDING:
@ -90,17 +73,9 @@ class Daemon:
return
logging.info("busy (%s), trigger ignored", self.state)
# Entry point for the edit hotkey. While holding self.lock it ignores the
# trigger (with an info log) unless the daemon is idle; otherwise it marks the
# state as EDITING and runs _start_edit_flow on a daemon thread.
def edit_trigger(self):
with self.lock:
if self.state != State.IDLE:
logging.info("busy (%s), edit trigger ignored", self.state)
return
# Claim the state before the worker starts so a second trigger sees "busy".
self.state = State.EDITING
threading.Thread(target=self._start_edit_flow, daemon=True).start()
def _start_recording_locked(self):
try:
proc, record = start_recording(self.cfg.ffmpeg_input, self.cfg.ffmpeg_path)
proc, record = start_recording(self.cfg.recording.get("input", "pulse:default"))
except Exception as exc:
logging.error("record start failed: %s", exc)
return
@ -129,7 +104,7 @@ class Daemon:
self.history.add_artifact(run_id, "audio", {"path": record.wav_path}, record.wav_path)
if self.timer:
self.timer.cancel()
self.timer = threading.Timer(self.cfg.record_timeout_sec, self._timeout_stop)
self.timer = threading.Timer(300, self._timeout_stop)
self.timer.daemon = True
self.timer.start()
@ -169,27 +144,24 @@ class Daemon:
try:
self.set_state(State.TRANSCRIBING)
logging.info("transcribing started")
text = self.stt.transcribe(record.wav_path, language=self.active_language)
text = self.stt.transcribe(record.wav_path, language="en")
except Exception as exc:
logging.error("stt failed: %s", exc)
self.set_state(State.IDLE)
return
text = (text or "").strip()
if not text:
self.set_state(State.IDLE)
return
logging.info("stt: %s", text)
run_id = self.history.add_run("stt", "ok", self.cfg, self._context_json(self.context))
self.history.add_artifact(run_id, "input", {"wav_path": record.wav_path, "language": self.active_language})
self.history.add_artifact(run_id, "input", {"wav_path": record.wav_path, "language": "en"})
self.history.add_artifact(run_id, "output", {"text": text})
rule = match_rule(self.context, self.context_rules) if self.context else None
if rule:
logging.info("context matched rule%s", f" ({rule.tag})" if rule.tag else "")
ai_enabled = self.cfg.ai_enabled
ai_prompt_file = self.cfg.ai_system_prompt_file
if rule and rule.ai_enabled is not None:
ai_enabled = rule.ai_enabled
if rule and rule.ai_prompt_file:
ai_prompt_file = rule.ai_prompt_file
ai_enabled = self.cfg.ai_cleanup.get("enabled", False)
ai_prompt_file = ""
if ai_enabled:
self.set_state(State.PROCESSING)
@ -197,13 +169,13 @@ class Daemon:
try:
processor = build_processor(
AIConfig(
model=self.cfg.ai_model,
temperature=self.cfg.ai_temperature,
model=self.cfg.ai_cleanup.get("model", ""),
temperature=self.cfg.ai_cleanup.get("temperature", 0.0),
system_prompt_file=ai_prompt_file,
base_url=self.cfg.ai_base_url,
api_key=self.cfg.ai_api_key,
timeout_sec=self.cfg.ai_timeout_sec,
language_hint=self.active_language,
base_url=self.cfg.ai_cleanup.get("base_url", ""),
api_key=self.cfg.ai_cleanup.get("api_key", ""),
timeout_sec=25,
language_hint="en",
)
)
ai_input = text
@ -212,7 +184,11 @@ class Daemon:
self.history.add_artifact(
run_id,
"input",
{"text": ai_input, "model": self.cfg.ai_model, "temperature": self.cfg.ai_temperature},
{
"text": ai_input,
"model": self.cfg.ai_cleanup.get("model", ""),
"temperature": self.cfg.ai_cleanup.get("temperature", 0.0),
},
)
self.history.add_artifact(run_id, "output", {"text": text})
except Exception as exc:
@ -228,9 +204,7 @@ class Daemon:
logging.info("focus changed, aborting injection")
self.set_state(State.IDLE)
return
backend = self.cfg.injection_backend
if rule and rule.injection_backend:
backend = rule.injection_backend
backend = self.cfg.injection.get("backend", "clipboard")
inject(text, backend)
run_id = self.history.add_run("inject", "ok", self.cfg, self._context_json(self.context))
self.history.add_artifact(run_id, "input", {"text": text, "backend": backend})
@ -239,183 +213,6 @@ class Daemon:
finally:
self.set_state(State.IDLE)
# Worker for the edit flow: reads the primary selection, picks the language,
# captures the focused-window context, starts recording with a timeout timer,
# and finally opens the edit window pre-filled with the selected text.
# Failures before the recording starts reset the state to IDLE; a failure to
# open the window goes through _abort_edit.
def _start_edit_flow(self):
# Step 1: the text to edit comes from the primary selection.
try:
text = read_primary_selection()
except Exception as exc:
logging.error("selection capture failed: %s", exc)
self.set_state(State.IDLE)
return
text = (text or "").strip()
if not text:
logging.info("selection empty, aborting edit")
self.set_state(State.IDLE)
return
# Step 2: language is the configured fallback (default: cfg.whisper_lang),
# replaced by detection on the selected text when detection is enabled.
edit_language = self.cfg.edit_language_detection.get("fallback_code", self.cfg.whisper_lang)
if self.cfg.edit_language_detection.get("enabled"):
edit_language = detect_language(text, fallback=edit_language)
self.active_language = edit_language
# Step 3: context capture is best-effort; a failure only clears edit_context.
try:
if self.context_provider:
self.edit_context = self.context_provider.capture()
except Exception as exc:
logging.error("context capture failed: %s", exc)
self.edit_context = None
if self.edit_context:
logging.info(
"edit context: id=%s app_id=%s class=%s instance=%s title=%s",
self.edit_context.window_id,
self.edit_context.app_id,
self.edit_context.klass,
self.edit_context.instance,
self.edit_context.title,
)
else:
logging.info("edit context: none")
# Step 4: start recording and record the run in history.
try:
proc, record = start_recording(self.cfg.ffmpeg_input, self.cfg.ffmpeg_path)
except Exception as exc:
logging.error("record start failed: %s", exc)
self.set_state(State.IDLE)
return
self.edit_proc = proc
self.edit_record = record
logging.info("edit recording started (%s)", record.wav_path)
run_id = self.history.add_run("record", "started", self.cfg, self._context_json(self.edit_context))
self.history.add_artifact(run_id, "audio", {"path": record.wav_path}, record.wav_path)
# Step 5: (re)arm the timer that ends the recording after the configured timeout.
if self.edit_timer:
self.edit_timer.cancel()
self.edit_timer = threading.Timer(self.cfg.edit_record_timeout_sec, self._edit_timeout_stop)
self.edit_timer.daemon = True
self.edit_timer.start()
# Step 6: show the edit window; its callbacks drive the rest of the flow.
try:
self.edit_window = open_edit_window(
text,
EditWindowConfig(**self.cfg.edit_window),
self._on_edit_apply,
self._on_edit_copy_close,
)
except Exception as exc:
logging.error("edit window failed: %s", exc)
# NOTE(review): the recording started in step 4 is still running here — confirm
# that _abort_edit (or something else) stops it.
self._abort_edit()
return
def _edit_timeout_stop(self):
logging.info("edit recording timeout")
self._on_edit_apply(self._edit_get_text())
def _edit_get_text(self) -> str:
if not self.edit_window:
return ""
return self.edit_window.get_text()
# Callback handed to open_edit_window as on_copy_close. Non-empty text is
# written to the clipboard (a failure is logged, not raised), then the edit
# session is torn down through _abort_edit.
def _on_edit_copy_close(self, text: str):
if text:
try:
write_clipboard(text)
except Exception as exc:
logging.error("copy failed: %s", exc)
# NOTE(review): nesting is not recoverable from this view; presumably this runs
# unconditionally, so closing the window (empty text) still tears down — confirm.
self._abort_edit()
def _on_edit_apply(self, text: str):
    """Start processing the edit with ``text`` as the base text.

    Does nothing unless the daemon is in the EDITING state; otherwise moves
    to EDIT_PROCESSING and runs ``_stop_and_process_edit`` on a daemon thread.
    """
    # NOTE(review): the state check and set_state are separate steps; confirm
    # two concurrent callers (button and timer) cannot both get past the check.
    if self.state == State.EDITING:
        self.set_state(State.EDIT_PROCESSING)
        worker = threading.Thread(target=self._stop_and_process_edit, args=(text,), daemon=True)
        worker.start()
# Worker started by _on_edit_apply. Stops the edit recording, transcribes it
# into a spoken instruction, optionally asks the AI processor to apply that
# instruction to base_text, refocuses the originally captured window and
# injects the result. Every exit path ends in _abort_edit.
def _stop_and_process_edit(self, base_text: str):
# Take ownership of the recording handles and disarm the timeout timer.
proc = self.edit_proc
record = self.edit_record
self.edit_proc = None
self.edit_record = None
if self.edit_timer:
self.edit_timer.cancel()
self.edit_timer = None
if not proc or not record:
self._abort_edit()
return
try:
stop_recording(proc)
except Exception as exc:
logging.error("record stop failed: %s", exc)
self._abort_edit()
return
# No WAV file on disk means nothing was captured.
if not Path(record.wav_path).exists():
logging.error("no audio captured")
self._abort_edit()
return
# Transcribe the spoken instruction in the language chosen for this edit.
try:
logging.info("edit transcribing started")
instruction = self.stt.transcribe(record.wav_path, language=self.active_language)
except Exception as exc:
logging.error("stt failed: %s", exc)
self._abort_edit()
return
logging.info("edit instruction: %s", instruction)
run_id = self.history.add_run("stt", "ok", self.cfg, self._context_json(self.edit_context))
self.history.add_artifact(run_id, "input", {"wav_path": record.wav_path, "language": self.active_language})
self.history.add_artifact(run_id, "output", {"text": instruction})
# Without AI (or if it fails) the result stays equal to the unmodified base text.
result = base_text
if self.cfg.edit_ai_enabled:
try:
# Fall back to the system_prompt_edit.txt next to this module.
prompt_file = self.cfg.edit_ai_system_prompt_file
if not prompt_file:
prompt_file = str(Path(__file__).parent / "system_prompt_edit.txt")
processor = build_processor(
AIConfig(
model=self.cfg.ai_model,
temperature=self.cfg.edit_ai_temperature,
system_prompt_file=prompt_file,
base_url=self.cfg.ai_base_url,
api_key=self.cfg.ai_api_key,
timeout_sec=self.cfg.ai_timeout_sec,
language_hint=None,
wrap_transcript=False,
)
)
# The payload wraps text and instruction in <text>/<instruction> tags.
payload = f"<text>{base_text}</text>\n<instruction>{instruction}</instruction>"
result = processor.process(payload) or base_text
run_id = self.history.add_run("ai", "ok", self.cfg, self._context_json(self.edit_context))
self.history.add_artifact(
run_id,
"input",
{"text": payload, "model": self.cfg.ai_model, "temperature": self.cfg.edit_ai_temperature},
)
self.history.add_artifact(run_id, "output", {"text": result})
except Exception as exc:
logging.error("ai process failed: %s", exc)
logging.info("edit result: %s", result)
# NOTE(review): this runs on a worker thread; confirm set_status is safe to call
# off the GTK main thread.
if self.edit_window:
self.edit_window.set_status("Applying...")
# Refocus the window captured at the start; give up if it cannot be focused.
if self.context_provider and self.edit_context:
if not self.context_provider.focus_window(self.edit_context.window_id):
logging.info("original window missing, aborting edit injection")
self._abort_edit()
return
try:
inject(result, self.cfg.edit_injection_backend)
run_id = self.history.add_run("inject", "ok", self.cfg, self._context_json(self.edit_context))
self.history.add_artifact(run_id, "input", {"text": result, "backend": self.cfg.edit_injection_backend})
except Exception as exc:
logging.error("output failed: %s", exc)
finally:
self._abort_edit()
def _context_json(self, ctx):
if not ctx:
return None
@ -427,21 +224,6 @@ class Daemon:
"title": ctx.title,
}
def _abort_edit(self):
    """Tear down the edit session and return the daemon to the IDLE state.

    Cancels the timeout timer, stops a recording that is still owned by the
    session, closes the edit window, clears all ``edit_*`` fields and sets
    the state to IDLE. Every step is best-effort: failures are logged and
    never raised, so teardown always completes.
    """
    timer, self.edit_timer = self.edit_timer, None
    if timer:
        timer.cancel()

    # A recorder still attached here was never stopped (window closed, copy
    # and close, or window creation failed); stop it so the process does not
    # keep running after the session is gone.
    proc, self.edit_proc = self.edit_proc, None
    if proc:
        try:
            stop_recording(proc)
        except Exception as exc:
            logging.error("record stop failed: %s", exc)

    window, self.edit_window = self.edit_window, None
    if window:
        try:
            window.close()
        except Exception as exc:
            logging.warning("edit window close failed: %s", exc)

    self.edit_record = None
    self.edit_context = None
    self.set_state(State.IDLE)
def stop_recording(self):
with self.lock:
if self.state != State.RECORDING:
@ -487,11 +269,11 @@ def main():
_lock_single_instance()
hotkeys = ", ".join(f"{name}={info.get('hotkey')}" for name, info in cfg.languages.items())
logging.info("ready (hotkeys: %s; edit: %s)", hotkeys, cfg.edit_hotkey)
logging.info("ready (hotkey: %s)", cfg.daemon.get("hotkey", ""))
logging.info("config (%s):\n%s", args.config or str(Path.home() / ".config" / "lel" / "config.json"), json.dumps(redacted_dict(cfg), indent=2))
daemon = Daemon(cfg)
suppress_hotkeys = threading.Event()
def on_quit():
os._exit(0)
@ -508,21 +290,30 @@ def main():
signal.signal(signal.SIGTERM, handle_signal)
if args.no_tray:
listen(cfg.hotkey, lambda: logging.info("hotkey pressed (dry-run)") if args.dry_run else daemon.toggle())
listen(
cfg.daemon.get("hotkey", ""),
lambda: logging.info("hotkey pressed (dry-run)")
if args.dry_run
else (None if suppress_hotkeys.is_set() else daemon.toggle()),
)
return
for name, info in cfg.languages.items():
hotkey = info.get("hotkey")
code = info.get("code")
threading.Thread(
target=lambda h=hotkey, c=code: listen(
h,
lambda: logging.info("hotkey pressed (dry-run)") if args.dry_run else daemon.toggle(c),
),
daemon=True,
).start()
threading.Thread(target=lambda: listen(cfg.edit_hotkey, lambda: logging.info("edit hotkey pressed (dry-run)") if args.dry_run else daemon.edit_trigger()), daemon=True).start()
run_tray(daemon.get_state, on_quit, lambda: open_settings_window(load(args.config), config_path))
threading.Thread(
target=lambda: listen(
cfg.daemon.get("hotkey", ""),
lambda: logging.info("hotkey pressed (dry-run)")
if args.dry_run
else (None if suppress_hotkeys.is_set() else daemon.toggle()),
),
daemon=True,
).start()
def open_settings():
suppress_hotkeys.set()
win = open_settings_window(load(args.config), config_path)
win.window.connect("destroy", lambda *_: suppress_hotkeys.clear())
return win
run_tray(daemon.get_state, on_quit, open_settings)
if __name__ == "__main__":

View file

@ -13,9 +13,7 @@ class RecordResult:
temp_dir: str
def _resolve_ffmpeg_path(explicit: str) -> str:
if explicit:
return explicit
def _resolve_ffmpeg_path() -> str:
appdir = os.getenv("APPDIR")
if appdir:
candidate = Path(appdir) / "usr" / "bin" / "ffmpeg"
@ -34,7 +32,7 @@ def _ffmpeg_input_args(spec: str) -> list[str]:
return ["-f", kind, "-i", name]
def start_recording(ffmpeg_input: str, ffmpeg_path: str) -> tuple[subprocess.Popen, RecordResult]:
def start_recording(ffmpeg_input: str) -> tuple[subprocess.Popen, RecordResult]:
tmpdir = tempfile.mkdtemp(prefix="lel-")
wav = str(Path(tmpdir) / "mic.wav")
@ -43,7 +41,7 @@ def start_recording(ffmpeg_input: str, ffmpeg_path: str) -> tuple[subprocess.Pop
args += ["-ac", "1", "-ar", "16000", "-c:a", "pcm_s16le", wav]
proc = subprocess.Popen(
[_resolve_ffmpeg_path(ffmpeg_path), *args],
[_resolve_ffmpeg_path(), *args],
preexec_fn=os.setsid,
)
return proc, RecordResult(wav_path=wav, temp_dir=tmpdir)

View file

@ -1,33 +0,0 @@
from __future__ import annotations
import time
from Xlib import X, Xatom, display
def read_primary_selection(timeout_sec: float = 2.0) -> str:
    """Return the X11 PRIMARY selection as text.

    Opens a dedicated display connection, asks the selection owner to convert
    PRIMARY to ``UTF8_STRING`` into a property on a 1x1 helper window, and
    polls for the ``SelectionNotify`` reply.

    Args:
        timeout_sec: Maximum time to wait for the reply.

    Returns:
        The selection decoded as UTF-8 (undecodable bytes dropped), or ``""``
        when the conversion is refused, the property is empty, or no reply
        arrives before the timeout.
    """
    disp = display.Display()
    try:
        root = disp.screen().root
        win = root.create_window(0, 0, 1, 1, 0, X.CopyFromParent)
        utf8 = disp.intern_atom("UTF8_STRING")
        prop = disp.intern_atom("LEL_SELECTION")
        win.convert_selection(Xatom.PRIMARY, utf8, prop, X.CurrentTime)
        disp.flush()

        # Monotonic clock: the deadline must not move if the wall clock is adjusted.
        deadline = time.monotonic() + timeout_sec
        while time.monotonic() < deadline:
            if not disp.pending_events():
                time.sleep(0.01)
                continue
            ev = disp.next_event()
            if ev.type != X.SelectionNotify:
                continue  # unrelated event; keep draining the queue
            if ev.property == X.NONE:
                return ""  # conversion refused
            data = win.get_property(prop, X.AnyPropertyType, 0, 2**31 - 1)
            if not data or data.value is None:
                return ""
            try:
                return data.value.decode("utf-8", errors="ignore")
            except Exception:
                # Property value was not a bytes-like payload; nothing usable.
                return ""
        return ""
    finally:
        # Each call opens its own connection; close it so repeated calls do
        # not accumulate X connections (the helper window goes with it).
        disp.close()

View file

@ -1,15 +0,0 @@
You are a deterministic text editing engine.
You edit the provided text according to the user's spoken instruction.
Follow these rules strictly:
1. Do NOT add content not implied by the instruction.
2. Preserve tone and intent unless instructed otherwise.
3. Prefer minimal edits.
4. Keep formatting unless the instruction says to change it.
5. Do NOT explain; output ONLY the edited text.
Input format:
<text>...</text>
<instruction>...</instruction>
You should only output the raw text content, without any XML tags.

View file

@ -28,12 +28,8 @@ class Tray:
def _icon_path(self, state: str) -> str:
if state == "recording":
return str(self.base / "recording.png")
if state == "editing":
return str(self.base / "recording.png")
if state == "transcribing":
return str(self.base / "transcribing.png")
if state == "edit_processing":
return str(self.base / "processing.png")
if state == "processing":
return str(self.base / "processing.png")
return str(self.base / "idle.png")
@ -41,12 +37,8 @@ class Tray:
def _title(self, state: str) -> str:
if state == "recording":
return "Recording"
if state == "editing":
return "Editing"
if state == "transcribing":
return "Transcribing"
if state == "edit_processing":
return "Edit Processing"
if state == "processing":
return "AI Processing"
return "Idle"