Refine config and runtime flow

This commit is contained in:
Thales Maciel 2026-02-24 14:15:17 -03:00
parent 85e082dd46
commit b3be444625
No known key found for this signature in database
GPG key ID: 33112E6833C34679
16 changed files with 642 additions and 137 deletions

View file

@ -5,12 +5,9 @@ import logging
import os
import sys
import urllib.request
from dataclasses import dataclass
from typing import Any, Callable, cast
from llama_cpp import Llama, llama_cpp as llama_cpp_lib # type: ignore[import-not-found]
from constants import LLM_LANGUAGE, MODEL_DIR, MODEL_NAME, MODEL_PATH, MODEL_URL
from constants import MODEL_DIR, MODEL_NAME, MODEL_PATH, MODEL_URL
SYSTEM_PROMPT = (
@ -36,7 +33,8 @@ SYSTEM_PROMPT = (
class LlamaProcessor:
def __init__(self, verbose=False):
def __init__(self, verbose: bool = False):
Llama, llama_cpp_lib = _load_llama_bindings()
ensure_model()
if not verbose:
os.environ.setdefault("LLAMA_CPP_LOG_LEVEL", "ERROR")
@ -100,6 +98,16 @@ def ensure_model():
raise
def _load_llama_bindings():
try:
from llama_cpp import Llama, llama_cpp as llama_cpp_lib # type: ignore[import-not-found]
except ModuleNotFoundError as exc:
raise RuntimeError(
"llama-cpp-python is not installed; install dependencies with `uv sync`"
) from exc
return Llama, llama_cpp_lib
def _extract_chat_text(payload: Any) -> str:
if "choices" in payload and payload["choices"]:
choice = payload["choices"][0]

View file

@ -1,16 +1,59 @@
from __future__ import annotations
import json
from dataclasses import dataclass, field
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import Any
from constants import DEFAULT_CONFIG_PATH
DEFAULT_HOTKEY = "Cmd+m"
DEFAULT_STT_MODEL = "base"
DEFAULT_STT_DEVICE = "cpu"
DEFAULT_INJECTION_BACKEND = "clipboard"
ALLOWED_INJECTION_BACKENDS = {"clipboard", "injection"}
@dataclass
class DaemonConfig:
    """Settings for the daemon process itself."""

    # Hotkey chord that toggles recording; validate() rejects empty values.
    hotkey: str = DEFAULT_HOTKEY
@dataclass
class RecordingConfig:
    """Settings for audio capture."""

    # Input device selector: a device name, a numeric index, or None.
    # Presumably "" means "unspecified" and falls through to the default
    # device in resolve_input_device — TODO confirm against recorder.
    input: str | int | None = ""
@dataclass
class SttConfig:
    """Settings for speech-to-text."""

    # Whisper model name passed to WhisperModel (e.g. "base").
    model: str = DEFAULT_STT_MODEL
    # Compute device for faster-whisper, e.g. "cpu" or "cuda".
    device: str = DEFAULT_STT_DEVICE
@dataclass
class InjectionConfig:
    """Settings for text output/injection."""

    # Output backend; validate() lowercases it and requires one of
    # ALLOWED_INJECTION_BACKENDS ("clipboard" or "injection").
    backend: str = DEFAULT_INJECTION_BACKEND
@dataclass
class AiConfig:
    """Settings for the optional AI post-processing step."""

    # When False, transcripts skip LLM post-processing entirely.
    enabled: bool = True
@dataclass
class LoggingConfig:
    """Logging privacy settings."""

    # When True, log raw transcript text; otherwise only its length is logged.
    log_transcript: bool = False
@dataclass
class Config:
daemon: dict = field(default_factory=lambda: {"hotkey": "Cmd+m"})
recording: dict = field(default_factory=lambda: {"input": ""})
stt: dict = field(default_factory=lambda: {"model": "base", "device": "cpu"})
injection: dict = field(default_factory=lambda: {"backend": "clipboard"})
daemon: DaemonConfig = field(default_factory=DaemonConfig)
recording: RecordingConfig = field(default_factory=RecordingConfig)
stt: SttConfig = field(default_factory=SttConfig)
injection: InjectionConfig = field(default_factory=InjectionConfig)
ai: AiConfig = field(default_factory=AiConfig)
logging: LoggingConfig = field(default_factory=LoggingConfig)
def load(path: str | None) -> Config:
@ -18,33 +61,120 @@ def load(path: str | None) -> Config:
p = Path(path) if path else DEFAULT_CONFIG_PATH
if p.exists():
data = json.loads(p.read_text(encoding="utf-8"))
if any(k in data for k in ("daemon", "recording", "stt", "injection")):
for k, v in data.items():
if hasattr(cfg, k):
setattr(cfg, k, v)
else:
cfg.daemon["hotkey"] = data.get("hotkey", cfg.daemon["hotkey"])
cfg.recording["input"] = data.get("input", cfg.recording["input"])
cfg.stt["model"] = data.get("whisper_model", cfg.stt["model"])
cfg.stt["device"] = data.get("whisper_device", cfg.stt["device"])
cfg.injection["backend"] = data.get("injection_backend", cfg.injection["backend"])
if not isinstance(cfg.daemon, dict):
cfg.daemon = {"hotkey": "Cmd+m"}
if not isinstance(cfg.recording, dict):
cfg.recording = {"input": ""}
if not isinstance(cfg.stt, dict):
cfg.stt = {"model": "base", "device": "cpu"}
if not isinstance(cfg.injection, dict):
cfg.injection = {"backend": "clipboard"}
if not isinstance(data, dict):
raise ValueError("config must be a JSON object")
cfg = _from_dict(data, cfg)
validate(cfg)
return cfg
def redacted_dict(cfg: Config) -> dict:
return cfg.__dict__.copy()
def redacted_dict(cfg: Config) -> dict[str, Any]:
    """Serialize *cfg* into plain nested dicts suitable for logging."""
    snapshot = asdict(cfg)
    return snapshot
def validate(cfg: Config) -> None:
    """Check *cfg* for invalid values, raising ValueError on the first problem.

    Side effect: injection.backend is normalized (stripped and lowercased)
    before being written back onto the config.
    """
    if not cfg.daemon.hotkey.strip():
        raise ValueError("daemon.hotkey cannot be empty")

    rec_input = cfg.recording.input
    # bool is a subclass of int, so it must be rejected explicitly first.
    if isinstance(rec_input, bool):
        raise ValueError("recording.input cannot be boolean")
    if rec_input is not None and not isinstance(rec_input, (str, int)):
        raise ValueError("recording.input must be string, integer, or null")

    if not cfg.stt.model.strip():
        raise ValueError("stt.model cannot be empty")
    if not cfg.stt.device.strip():
        raise ValueError("stt.device cannot be empty")

    normalized_backend = cfg.injection.backend.strip().lower()
    if normalized_backend not in ALLOWED_INJECTION_BACKENDS:
        options = ", ".join(sorted(ALLOWED_INJECTION_BACKENDS))
        raise ValueError(f"injection.backend must be one of: {options}")
    cfg.injection.backend = normalized_backend

    if not isinstance(cfg.ai.enabled, bool):
        raise ValueError("ai.enabled must be boolean")
    if not isinstance(cfg.logging.log_transcript, bool):
        raise ValueError("logging.log_transcript must be boolean")
def _from_dict(data: dict[str, Any], cfg: Config) -> Config:
    """Merge *data* into *cfg*, accepting the sectioned or legacy flat layout.

    Keys that are absent leave the corresponding defaults untouched; values
    present are validated by the _as_* helpers before assignment.
    """
    section_names = ("daemon", "recording", "stt", "injection", "ai", "logging")
    if not any(name in data for name in section_names):
        # Legacy flat schema: top-level keys map directly onto config fields.
        if "hotkey" in data:
            cfg.daemon.hotkey = _as_nonempty_str(data["hotkey"], "hotkey")
        if "input" in data:
            cfg.recording.input = _as_recording_input(data["input"])
        if "whisper_model" in data:
            cfg.stt.model = _as_nonempty_str(data["whisper_model"], "whisper_model")
        if "whisper_device" in data:
            cfg.stt.device = _as_nonempty_str(data["whisper_device"], "whisper_device")
        if "injection_backend" in data:
            cfg.injection.backend = _as_nonempty_str(data["injection_backend"], "injection_backend")
        if "ai_enabled" in data:
            cfg.ai.enabled = _as_bool(data["ai_enabled"], "ai_enabled")
        if "log_transcript" in data:
            cfg.logging.log_transcript = _as_bool(data["log_transcript"], "log_transcript")
        return cfg

    # Sectioned schema: each section, when present, must be a JSON object.
    # Type-checking all sections up front preserves the original error order.
    sections = {name: _ensure_dict(data.get(name), name) for name in section_names}

    daemon_sec = sections["daemon"]
    if "hotkey" in daemon_sec:
        cfg.daemon.hotkey = _as_nonempty_str(daemon_sec["hotkey"], "daemon.hotkey")
    recording_sec = sections["recording"]
    if "input" in recording_sec:
        cfg.recording.input = _as_recording_input(recording_sec["input"])
    stt_sec = sections["stt"]
    if "model" in stt_sec:
        cfg.stt.model = _as_nonempty_str(stt_sec["model"], "stt.model")
    if "device" in stt_sec:
        cfg.stt.device = _as_nonempty_str(stt_sec["device"], "stt.device")
    injection_sec = sections["injection"]
    if "backend" in injection_sec:
        cfg.injection.backend = _as_nonempty_str(injection_sec["backend"], "injection.backend")
    ai_sec = sections["ai"]
    if "enabled" in ai_sec:
        cfg.ai.enabled = _as_bool(ai_sec["enabled"], "ai.enabled")
    logging_sec = sections["logging"]
    if "log_transcript" in logging_sec:
        cfg.logging.log_transcript = _as_bool(logging_sec["log_transcript"], "logging.log_transcript")
    return cfg
def _ensure_dict(value: Any, field_name: str) -> dict[str, Any]:
if value is None:
return {}
if not isinstance(value, dict):
raise ValueError(f"{field_name} must be an object")
return value
def _as_nonempty_str(value: Any, field_name: str) -> str:
if not isinstance(value, str):
raise ValueError(f"{field_name} must be a string")
if not value.strip():
raise ValueError(f"{field_name} cannot be empty")
return value
def _as_bool(value: Any, field_name: str) -> bool:
if not isinstance(value, bool):
raise ValueError(f"{field_name} must be boolean")
return value
def _as_recording_input(value: Any) -> str | int | None:
if value is None:
return None
if isinstance(value, bool):
raise ValueError("recording.input cannot be boolean")
if isinstance(value, (str, int)):
return value
raise ValueError("recording.input must be string, integer, or null")

View file

@ -14,4 +14,3 @@ MODEL_URL = (
)
MODEL_DIR = Path.home() / ".cache" / "lel" / "models"
MODEL_PATH = MODEL_DIR / MODEL_NAME
LLM_LANGUAGE = "en"

View file

@ -14,12 +14,13 @@ class DesktopAdapter(Protocol):
def run_tray(self, state_getter: Callable[[], str], on_quit: Callable[[], None]) -> None:
raise NotImplementedError
def request_quit(self) -> None:
raise NotImplementedError
def get_desktop_adapter() -> DesktopAdapter:
session_type = os.getenv("XDG_SESSION_TYPE", "").lower()
if session_type == "wayland" or os.getenv("WAYLAND_DISPLAY"):
from desktop_wayland import WaylandAdapter
raise SystemExit(
"Wayland is not supported yet. Run under X11 (XDG_SESSION_TYPE=x11) to use lel."
)

View file

@ -12,3 +12,6 @@ class WaylandAdapter:
def run_tray(self, _state_getter: Callable[[], str], _on_quit: Callable[[], None]) -> None:
    """Tray support is unimplemented on Wayland; exit with an explanation."""
    raise SystemExit("Wayland tray support is not available yet.")
def request_quit(self) -> None:
    """No-op: run_tray never starts a loop here, so there is nothing to stop."""
    return

View file

@ -80,7 +80,7 @@ class X11Adapter:
def run_tray(self, state_getter: Callable[[], str], on_quit: Callable[[], None]) -> None:
self.menu = Gtk.Menu()
quit_item = Gtk.MenuItem(label="Quit")
quit_item.connect("activate", lambda *_: on_quit())
quit_item.connect("activate", lambda *_: self._handle_quit(on_quit))
self.menu.append(quit_item)
self.menu.show_all()
if self.indicator is not None:
@ -90,24 +90,39 @@ class X11Adapter:
GLib.timeout_add(TRAY_UPDATE_MS, self._update_tray, state_getter)
Gtk.main()
def _listen(self, hotkey: str, callback: Callable[[], None]) -> None:
disp = display.Display()
root = disp.screen().root
mods, keysym = self._parse_hotkey(hotkey)
keycode = self._grab_hotkey(disp, root, mods, keysym)
def request_quit(self) -> None:
    """Schedule Gtk.main_quit via GLib so the GTK main loop exits."""
    GLib.idle_add(Gtk.main_quit)
def _handle_quit(self, on_quit: Callable[[], None]) -> None:
    """Run the quit callback, then always stop the GTK loop, even on error."""
    try:
        on_quit()
    finally:
        # Guarantee the loop exits so a failing callback cannot hang the tray.
        self.request_quit()
def _listen(self, hotkey: str, callback: Callable[[], None]) -> None:
disp = None
root = None
keycode = None
try:
disp = display.Display()
root = disp.screen().root
mods, keysym = self._parse_hotkey(hotkey)
keycode = self._grab_hotkey(disp, root, mods, keysym)
while True:
ev = disp.next_event()
if ev.type == X.KeyPress and ev.detail == keycode:
state = ev.state & ~(X.LockMask | X.Mod2Mask)
if state == mods:
callback()
except Exception as exc:
logging.error("hotkey listener stopped: %s", exc)
finally:
try:
root.ungrab_key(keycode, X.AnyModifier)
disp.sync()
except Exception:
pass
if root is not None and keycode is not None and disp is not None:
try:
root.ungrab_key(keycode, X.AnyModifier)
disp.sync()
except Exception:
pass
def _parse_hotkey(self, hotkey: str):
parts = [p.strip() for p in hotkey.split("+") if p.strip()]

View file

@ -1,5 +1,8 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import errno
import json
import logging
import os
@ -8,15 +11,14 @@ import sys
import threading
import time
from pathlib import Path
from typing import Any
import gi
from faster_whisper import WhisperModel
from aiprocess import LlamaProcessor
from config import Config, load, redacted_dict
from constants import RECORD_TIMEOUT_SEC, STT_LANGUAGE
from recorder import start_recording, stop_recording
from aiprocess import LlamaProcessor
from desktop import get_desktop_adapter
from recorder import start_recording as start_audio_recording
from recorder import stop_recording as stop_audio_recording
class State:
@ -27,11 +29,26 @@ class State:
OUTPUTTING = "outputting"
_LOCK_HANDLE = None
def _build_whisper_model(model_name: str, device: str):
try:
from faster_whisper import WhisperModel # type: ignore[import-not-found]
except ModuleNotFoundError as exc:
raise RuntimeError(
"faster-whisper is not installed; install dependencies with `uv sync`"
) from exc
return WhisperModel(
model_name,
device=device,
compute_type=_compute_type(device),
)
def _compute_type(device: str) -> str:
dev = (device or "cpu").lower()
if dev == "cuda":
if dev.startswith("cuda"):
return "float16"
return "int8"
@ -40,17 +57,20 @@ class Daemon:
def __init__(self, cfg: Config, desktop, *, verbose: bool = False):
self.cfg = cfg
self.desktop = desktop
self.verbose = verbose
self.lock = threading.Lock()
self._shutdown_requested = threading.Event()
self.state = State.IDLE
self.proc = None
self.stream = None
self.record = None
self.timer = None
self.model = WhisperModel(
cfg.stt.get("model", "base"),
device=cfg.stt.get("device", "cpu"),
compute_type=_compute_type(cfg.stt.get("device", "cpu")),
self.timer: threading.Timer | None = None
self.model = _build_whisper_model(
cfg.stt.model,
cfg.stt.device,
)
self.ai_processor = LlamaProcessor(verbose=verbose)
self.ai_enabled = cfg.ai.enabled
self.ai_processor: LlamaProcessor | None = None
self.log_transcript = cfg.logging.log_transcript or verbose
def set_state(self, state: str):
with self.lock:
@ -63,29 +83,39 @@ class Daemon:
with self.lock:
return self.state
def _quit(self):
    """Terminate immediately; os._exit skips cleanup, atexit, and finally blocks."""
    os._exit(0)
def request_shutdown(self):
    """Flag shutdown so in-flight work (toggle/processing) can bail out early."""
    self._shutdown_requested.set()
def toggle(self):
should_stop = False
with self.lock:
if self._shutdown_requested.is_set():
logging.info("shutdown in progress, trigger ignored")
return
if self.state == State.IDLE:
self._start_recording_locked()
return
if self.state == State.RECORDING:
self.state = State.STT
threading.Thread(target=self._stop_and_process, daemon=True).start()
return
logging.info("busy (%s), trigger ignored", self.state)
should_stop = True
else:
logging.info("busy (%s), trigger ignored", self.state)
if should_stop:
self.stop_recording(trigger="user")
def _start_recording_locked(self):
if self.state != State.IDLE:
logging.info("busy (%s), trigger ignored", self.state)
return
try:
proc, record = start_recording(self.cfg.recording.get("input", ""))
stream, record = start_audio_recording(self.cfg.recording.input)
except Exception as exc:
logging.error("record start failed: %s", exc)
return
self.proc = proc
self.stream = stream
self.record = record
prev = self.state
self.state = State.RECORDING
logging.info("state: %s -> %s", prev, self.state)
logging.info("recording started")
if self.timer:
self.timer.cancel()
@ -94,30 +124,45 @@ class Daemon:
self.timer.start()
def _timeout_stop(self):
with self.lock:
if self.state != State.RECORDING:
return
self.state = State.STT
threading.Thread(target=self._stop_and_process, daemon=True).start()
self.stop_recording(trigger="timeout")
def _stop_and_process(self):
proc = self.proc
def _start_stop_worker(self, stream: Any, record: Any, trigger: str, process_audio: bool):
    """Run _stop_and_process on a daemon thread so the caller is not blocked."""
    worker = threading.Thread(
        target=self._stop_and_process,
        args=(stream, record, trigger, process_audio),
        daemon=True,
    )
    worker.start()
def _begin_stop_locked(self):
if self.state != State.RECORDING:
return None
stream = self.stream
record = self.record
self.proc = None
self.stream = None
self.record = None
if self.timer:
self.timer.cancel()
self.timer = None
prev = self.state
self.state = State.STT
logging.info("state: %s -> %s", prev, self.state)
if not proc or not record:
if stream is None or record is None:
logging.warning("recording resources are unavailable during stop")
self.state = State.IDLE
return None
return stream, record
def _stop_and_process(self, stream: Any, record: Any, trigger: str, process_audio: bool):
logging.info("stopping recording (%s)", trigger)
try:
audio = stop_audio_recording(stream, record)
except Exception as exc:
logging.error("record stop failed: %s", exc)
self.set_state(State.IDLE)
return
logging.info("stopping recording (user)")
try:
audio = stop_recording(proc, record)
except Exception as exc:
logging.error("record stop failed: %s", exc)
if not process_audio or self._shutdown_requested.is_set():
self.set_state(State.IDLE)
return
@ -140,35 +185,64 @@ class Daemon:
self.set_state(State.IDLE)
return
logging.info("stt: %s", text)
if self.log_transcript:
logging.info("stt: %s", text)
else:
logging.info("stt produced %d chars", len(text))
self.set_state(State.PROCESSING)
logging.info("ai processing started")
try:
ai_input = text
text = self.ai_processor.process(ai_input) or text
except Exception as exc:
logging.error("ai process failed: %s", exc)
if self.ai_enabled and not self._shutdown_requested.is_set():
self.set_state(State.PROCESSING)
logging.info("ai processing started")
try:
processor = self._get_ai_processor()
ai_text = processor.process(text)
if ai_text and ai_text.strip():
text = ai_text.strip()
except Exception as exc:
logging.error("ai process failed: %s", exc)
else:
logging.info("ai processing disabled")
logging.info("processed: %s", text)
if self.log_transcript:
logging.info("processed: %s", text)
else:
logging.info("processed text length: %d", len(text))
if self._shutdown_requested.is_set():
self.set_state(State.IDLE)
return
try:
self.set_state(State.OUTPUTTING)
logging.info("outputting started")
backend = self.cfg.injection.get("backend", "clipboard")
backend = self.cfg.injection.backend
self.desktop.inject_text(text, backend)
except Exception as exc:
logging.error("output failed: %s", exc)
finally:
self.set_state(State.IDLE)
def stop_recording(self):
def stop_recording(self, *, trigger: str = "user", process_audio: bool = True):
payload = None
with self.lock:
if self.state != State.RECORDING:
return
self.state = State.STT
threading.Thread(target=self._stop_and_process, daemon=True).start()
payload = self._begin_stop_locked()
if payload is None:
return
stream, record = payload
self._start_stop_worker(stream, record, trigger, process_audio)
def shutdown(self, timeout: float = 5.0) -> bool:
    """Request shutdown, abort any active recording without processing the
    captured audio, and wait up to *timeout* seconds for the IDLE state.

    Returns True if the daemon reached IDLE within the timeout.
    """
    self.request_shutdown()
    self.stop_recording(trigger="shutdown", process_audio=False)
    return self.wait_for_idle(timeout)
def wait_for_idle(self, timeout: float) -> bool:
    """Poll until the daemon reaches IDLE or *timeout* seconds elapse.

    Uses a monotonic clock for the deadline so wall-clock adjustments
    (NTP steps, manual changes) cannot shorten or extend the wait.
    Returns True if IDLE was observed before the deadline.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if self.get_state() == State.IDLE:
            return True
        time.sleep(0.05)
    # One final check in case IDLE was reached exactly at the deadline.
    return self.get_state() == State.IDLE
def _transcribe(self, audio) -> str:
segments, _info = self.model.transcribe(audio, language=STT_LANGUAGE, vad_filter=True)
@ -179,22 +253,50 @@ class Daemon:
parts.append(text)
return " ".join(parts).strip()
def _get_ai_processor(self) -> LlamaProcessor:
    """Return the cached LlamaProcessor, constructing it on first use."""
    processor = self.ai_processor
    if processor is None:
        processor = LlamaProcessor(verbose=self.verbose)
        self.ai_processor = processor
    return processor
def _read_lock_pid(lock_file) -> str:
lock_file.seek(0)
return lock_file.read().strip()
def _lock_single_instance():
runtime_dir = Path(os.getenv("XDG_RUNTIME_DIR", "/tmp")) / "lel"
runtime_dir.mkdir(parents=True, exist_ok=True)
lock_path = runtime_dir / "lel.lock"
f = open(lock_path, "w")
lock_file = open(lock_path, "a+", encoding="utf-8")
try:
import fcntl
fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
except Exception:
# TODO: kindly try to handle the running PID to the user cleanly in stdout if it's easy to get
raise SystemExit("already running")
return f
fcntl.flock(lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
except BlockingIOError as exc:
pid = _read_lock_pid(lock_file)
lock_file.close()
if pid:
raise SystemExit(f"already running (pid={pid})") from exc
raise SystemExit("already running") from exc
except OSError as exc:
if exc.errno in (errno.EACCES, errno.EAGAIN):
pid = _read_lock_pid(lock_file)
lock_file.close()
if pid:
raise SystemExit(f"already running (pid={pid})") from exc
raise SystemExit("already running") from exc
raise
lock_file.seek(0)
lock_file.truncate()
lock_file.write(f"{os.getpid()}\n")
lock_file.flush()
return lock_file
def main():
global _LOCK_HANDLE
parser = argparse.ArgumentParser()
parser.add_argument("--config", default="", help="path to config.json")
parser.add_argument("--dry-run", action="store_true", help="log hotkey only")
@ -207,37 +309,50 @@ def main():
format="lel: %(asctime)s %(levelname)s %(message)s",
)
cfg = load(args.config)
_lock_single_instance()
_LOCK_HANDLE = _lock_single_instance()
logging.info("hotkey: %s", cfg.daemon.get("hotkey", ""))
logging.info("config (%s):\n%s", args.config or str(Path.home() / ".config" / "lel" / "config.json"), json.dumps(redacted_dict(cfg), indent=2))
logging.info("hotkey: %s", cfg.daemon.hotkey)
logging.info(
"config (%s):\n%s",
args.config or str(Path.home() / ".config" / "lel" / "config.json"),
json.dumps(redacted_dict(cfg), indent=2),
)
if args.verbose:
logging.getLogger().setLevel(logging.DEBUG)
desktop = get_desktop_adapter()
try:
desktop = get_desktop_adapter()
daemon = Daemon(cfg, desktop, verbose=args.verbose)
except Exception as exc:
logging.error("startup failed: %s", exc)
raise SystemExit(1)
shutdown_once = threading.Event()
def shutdown(reason: str):
if shutdown_once.is_set():
return
shutdown_once.set()
logging.info("%s, shutting down", reason)
if not daemon.shutdown(timeout=5.0):
logging.warning("timed out waiting for idle state during shutdown")
desktop.request_quit()
def handle_signal(_sig, _frame):
logging.info("signal received, shutting down")
daemon.stop_recording()
end = time.time() + 5
while time.time() < end and daemon.get_state() != State.IDLE:
time.sleep(0.1)
os._exit(0)
threading.Thread(target=shutdown, args=("signal received",), daemon=True).start()
signal.signal(signal.SIGINT, handle_signal)
signal.signal(signal.SIGTERM, handle_signal)
desktop.start_hotkey_listener(
cfg.daemon.get("hotkey", ""),
cfg.daemon.hotkey,
lambda: logging.info("hotkey pressed (dry-run)") if args.dry_run else daemon.toggle(),
)
logging.info("ready")
desktop.run_tray(daemon.get_state, daemon._quit)
try:
desktop.run_tray(daemon.get_state, lambda: shutdown("quit requested"))
finally:
daemon.shutdown(timeout=1.0)
if __name__ == "__main__":

View file

@ -1,8 +1,7 @@
from dataclasses import dataclass, field
from typing import Iterable
from typing import Any, Iterable
import numpy as np
import sounddevice as sd # type: ignore[import-not-found]
@dataclass
@ -14,6 +13,7 @@ class RecordResult:
def list_input_devices() -> list[dict]:
sd = _sounddevice()
devices = []
for idx, info in enumerate(sd.query_devices()):
if info.get("max_input_channels", 0) > 0:
@ -22,6 +22,7 @@ def list_input_devices() -> list[dict]:
def default_input_device() -> int | None:
sd = _sounddevice()
default = sd.default.device
if isinstance(default, (tuple, list)) and default:
return default[0]
@ -48,7 +49,8 @@ def resolve_input_device(spec: str | int | None) -> int | None:
return None
def start_recording(input_spec: str | int | None) -> tuple[sd.InputStream, RecordResult]:
def start_recording(input_spec: str | int | None) -> tuple[Any, RecordResult]:
sd = _sounddevice()
record = RecordResult()
device = resolve_input_device(input_spec)
@ -66,13 +68,23 @@ def start_recording(input_spec: str | int | None) -> tuple[sd.InputStream, Recor
return stream, record
def stop_recording(stream: sd.InputStream, record: RecordResult) -> np.ndarray:
def stop_recording(stream: Any, record: RecordResult) -> np.ndarray:
    """Stop and close the input stream (if any) and return the captured audio.

    The frames accumulated in *record* are flattened into a single array.
    A falsy *stream* is tolerated so callers can pass None safely.
    """
    if stream:
        stream.stop()
        stream.close()
    return _flatten_frames(record.frames)
def _sounddevice():
try:
import sounddevice as sd # type: ignore[import-not-found]
except ModuleNotFoundError as exc:
raise RuntimeError(
"sounddevice is not installed; install dependencies with `uv sync --extra x11`"
) from exc
return sd
def _flatten_frames(frames: Iterable[np.ndarray]) -> np.ndarray:
frames = list(frames)
if not frames: