Add context capture and rules

This commit is contained in:
Thales Maciel 2026-02-07 18:10:21 -03:00
parent 34ecdcbfde
commit 0e79edfa20
7 changed files with 247 additions and 80 deletions

View file

@ -13,6 +13,7 @@ from config import Config, load, redacted_dict
from recorder import start_recording, stop_recording
from stt import FasterWhisperSTT, STTConfig
from aiprocess import AIConfig, build_processor
from context import ContextRule, I3Provider, match_rule
from inject import inject
from x11_hotkey import listen
from tray import run_tray
@ -34,6 +35,13 @@ class Daemon:
self.proc = None
self.record = None
self.timer = None
self.context = None
self.context_provider = None
if cfg.context_capture.get("provider") == "i3ipc":
self.context_provider = I3Provider()
else:
raise RuntimeError("unsupported context_capture.provider")
self.context_rules = [ContextRule(**r) for r in cfg.context_rules]
self.stt = FasterWhisperSTT(
STTConfig(
model=cfg.whisper_model,
@ -43,17 +51,6 @@ class Daemon:
)
)
self.ai = None
if cfg.ai_enabled:
self.ai = build_processor(
AIConfig(
model=cfg.ai_model,
temperature=cfg.ai_temperature,
system_prompt_file=cfg.ai_system_prompt_file,
base_url=cfg.ai_base_url,
api_key=cfg.ai_api_key,
timeout_sec=cfg.ai_timeout_sec,
)
)
def set_state(self, state: str):
with self.lock:
@ -80,6 +77,12 @@ class Daemon:
except Exception as exc:
logging.error("record start failed: %s", exc)
return
try:
if self.context_provider:
self.context = self.context_provider.capture()
except Exception as exc:
logging.error("context capture failed: %s", exc)
self.context = None
self.proc = proc
self.record = record
self.state = State.RECORDING
@ -133,10 +136,31 @@ class Daemon:
logging.info("stt: %s", text)
if self.ai:
rule = match_rule(self.context, self.context_rules) if self.context else None
if rule:
logging.info("context matched rule%s", f" ({rule.tag})" if rule.tag else "")
ai_enabled = self.cfg.ai_enabled
ai_prompt_file = self.cfg.ai_system_prompt_file
if rule and rule.ai_enabled is not None:
ai_enabled = rule.ai_enabled
if rule and rule.ai_prompt_file:
ai_prompt_file = rule.ai_prompt_file
if ai_enabled:
self.set_state(State.PROCESSING)
try:
text = self.ai.process(text) or text
processor = build_processor(
AIConfig(
model=self.cfg.ai_model,
temperature=self.cfg.ai_temperature,
system_prompt_file=ai_prompt_file,
base_url=self.cfg.ai_base_url,
api_key=self.cfg.ai_api_key,
timeout_sec=self.cfg.ai_timeout_sec,
)
)
text = processor.process(text) or text
except Exception as exc:
logging.error("ai process failed: %s", exc)
@ -144,7 +168,15 @@ class Daemon:
try:
self.set_state(State.OUTPUTTING)
inject(text, self.cfg.injection_backend)
if self.context_provider and self.context:
if not self.context_provider.is_same_focus(self.context):
logging.info("focus changed, aborting injection")
self.set_state(State.IDLE)
return
backend = self.cfg.injection_backend
if rule and rule.injection_backend:
backend = rule.injection_backend
inject(text, backend)
except Exception as exc:
logging.error("output failed: %s", exc)
finally:
@ -168,7 +200,8 @@ def _lock_single_instance():
fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
except Exception:
raise SystemExit("another instance is running")
# TODO: kindly try to handle the running PID to the user cleanly in stdout if it's easy to get
raise SystemExit("already running")
return f