#!/usr/bin/env python3 from __future__ import annotations import argparse import errno import inspect import json import logging import os import signal import sys import threading import time from pathlib import Path from typing import Any from aiprocess import LlamaProcessor from config import Config, ConfigValidationError, load, redacted_dict, validate from constants import DEFAULT_CONFIG_PATH, MODEL_PATH, RECORD_TIMEOUT_SEC, STT_LANGUAGE from desktop import get_desktop_adapter from diagnostics import run_diagnostics from recorder import start_recording as start_audio_recording from recorder import stop_recording as stop_audio_recording from vocabulary import VocabularyEngine class State: IDLE = "idle" RECORDING = "recording" STT = "stt" PROCESSING = "processing" OUTPUTTING = "outputting" _LOCK_HANDLE = None def _build_whisper_model(model_name: str, device: str): try: from faster_whisper import WhisperModel # type: ignore[import-not-found] except ModuleNotFoundError as exc: raise RuntimeError( "faster-whisper is not installed; install dependencies with `uv sync`" ) from exc return WhisperModel( model_name, device=device, compute_type=_compute_type(device), ) def _compute_type(device: str) -> str: dev = (device or "cpu").lower() if dev.startswith("cuda"): return "float16" return "int8" class Daemon: def __init__(self, cfg: Config, desktop, *, verbose: bool = False): self.cfg = cfg self.desktop = desktop self.verbose = verbose self.lock = threading.Lock() self._shutdown_requested = threading.Event() self.state = State.IDLE self.stream = None self.record = None self.timer: threading.Timer | None = None self.model = _build_whisper_model( cfg.stt.model, cfg.stt.device, ) logging.info("initializing ai processor") self.ai_processor = LlamaProcessor(verbose=self.verbose) logging.info("ai processor ready") self.log_transcript = verbose self.vocabulary = VocabularyEngine(cfg.vocabulary) self._stt_hint_kwargs_cache: dict[str, Any] | None = None def _arm_cancel_listener(self) -> bool: try: self.desktop.start_cancel_listener(lambda: self.cancel_recording()) return True except Exception as exc: logging.error("failed to start cancel listener: %s", exc) return False def _disarm_cancel_listener(self): try: self.desktop.stop_cancel_listener() except Exception as exc: logging.debug("failed to stop cancel listener: %s", exc) def set_state(self, state: str): with self.lock: prev = self.state self.state = state if prev != state: logging.debug("state: %s -> %s", prev, state) else: logging.debug("redundant state set: %s", state) def get_state(self): with self.lock: return self.state def request_shutdown(self): self._shutdown_requested.set() def toggle(self): should_stop = False with self.lock: if self._shutdown_requested.is_set(): logging.info("shutdown in progress, trigger ignored") return if self.state == State.IDLE: self._start_recording_locked() return if self.state == State.RECORDING: should_stop = True else: logging.info("busy (%s), trigger ignored", self.state) if should_stop: self.stop_recording(trigger="user") def _start_recording_locked(self): if self.state != State.IDLE: logging.info("busy (%s), trigger ignored", self.state) return try: stream, record = start_audio_recording(self.cfg.recording.input) except Exception as exc: logging.error("record start failed: %s", exc) return if not self._arm_cancel_listener(): try: stream.stop() except Exception: pass try: stream.close() except Exception: pass logging.error("recording start aborted because cancel listener is unavailable") return self.stream = stream self.record = record prev = self.state self.state = State.RECORDING logging.debug("state: %s -> %s", prev, self.state) logging.info("recording started") if self.timer: self.timer.cancel() self.timer = threading.Timer(RECORD_TIMEOUT_SEC, self._timeout_stop) self.timer.daemon = True self.timer.start() def _timeout_stop(self): self.stop_recording(trigger="timeout") def _start_stop_worker(self, stream: Any, record: Any, trigger: str, process_audio: bool): threading.Thread( target=self._stop_and_process, args=(stream, record, trigger, process_audio), daemon=True, ).start() def _begin_stop_locked(self): if self.state != State.RECORDING: return None stream = self.stream record = self.record self.stream = None self.record = None if self.timer: self.timer.cancel() self.timer = None self._disarm_cancel_listener() prev = self.state self.state = State.STT logging.debug("state: %s -> %s", prev, self.state) if stream is None or record is None: logging.warning("recording resources are unavailable during stop") self.state = State.IDLE return None return stream, record def _stop_and_process(self, stream: Any, record: Any, trigger: str, process_audio: bool): logging.info("stopping recording (%s)", trigger) try: audio = stop_audio_recording(stream, record) except Exception as exc: logging.error("record stop failed: %s", exc) self.set_state(State.IDLE) return if not process_audio or self._shutdown_requested.is_set(): self.set_state(State.IDLE) return if audio.size == 0: logging.error("no audio captured") self.set_state(State.IDLE) return try: logging.info("stt started") text = self._transcribe(audio) except Exception as exc: logging.error("stt failed: %s", exc) self.set_state(State.IDLE) return text = (text or "").strip() if not text: self.set_state(State.IDLE) return if self.log_transcript: logging.debug("stt: %s", text) else: logging.info("stt produced %d chars", len(text)) if not self._shutdown_requested.is_set(): self.set_state(State.PROCESSING) logging.info("ai processing started") try: processor = self._get_ai_processor() ai_text = processor.process( text, lang=STT_LANGUAGE, dictionary_context=self.vocabulary.build_ai_dictionary_context(), ) if ai_text and ai_text.strip(): text = ai_text.strip() except Exception as exc: logging.error("ai process failed: %s", exc) text = self.vocabulary.apply_deterministic_replacements(text).strip() if self.log_transcript: logging.debug("processed: %s", text) else: logging.info("processed text length: %d", len(text)) if self._shutdown_requested.is_set(): self.set_state(State.IDLE) return try: self.set_state(State.OUTPUTTING) logging.info("outputting started") backend = self.cfg.injection.backend self.desktop.inject_text( text, backend, remove_transcription_from_clipboard=( self.cfg.injection.remove_transcription_from_clipboard ), ) except Exception as exc: logging.error("output failed: %s", exc) finally: self.set_state(State.IDLE) def stop_recording(self, *, trigger: str = "user", process_audio: bool = True): payload = None with self.lock: payload = self._begin_stop_locked() if payload is None: return stream, record = payload self._start_stop_worker(stream, record, trigger, process_audio) def cancel_recording(self): with self.lock: if self.state != State.RECORDING: return self.stop_recording(trigger="cancel", process_audio=False) def shutdown(self, timeout: float = 5.0) -> bool: self.request_shutdown() self._disarm_cancel_listener() self.stop_recording(trigger="shutdown", process_audio=False) return self.wait_for_idle(timeout) def wait_for_idle(self, timeout: float) -> bool: end = time.time() + timeout while time.time() < end: if self.get_state() == State.IDLE: return True time.sleep(0.05) return self.get_state() == State.IDLE def _transcribe(self, audio) -> str: kwargs: dict[str, Any] = { "language": STT_LANGUAGE, "vad_filter": True, } kwargs.update(self._stt_hint_kwargs()) segments, _info = self.model.transcribe(audio, **kwargs) parts = [] for seg in segments: text = (seg.text or "").strip() if text: parts.append(text) return " ".join(parts).strip() def _get_ai_processor(self) -> LlamaProcessor: if self.ai_processor is None: raise RuntimeError("ai processor is not initialized") return self.ai_processor def _stt_hint_kwargs(self) -> dict[str, Any]: if self._stt_hint_kwargs_cache is not None: return self._stt_hint_kwargs_cache hotwords, initial_prompt = self.vocabulary.build_stt_hints() if not hotwords and not initial_prompt: self._stt_hint_kwargs_cache = {} return self._stt_hint_kwargs_cache try: signature = inspect.signature(self.model.transcribe) except (TypeError, ValueError): logging.debug("stt signature inspection failed; skipping hints") self._stt_hint_kwargs_cache = {} return self._stt_hint_kwargs_cache params = signature.parameters kwargs: dict[str, Any] = {} if hotwords and "hotwords" in params: kwargs["hotwords"] = hotwords if initial_prompt and "initial_prompt" in params: kwargs["initial_prompt"] = initial_prompt if not kwargs: logging.debug("stt hint arguments are not supported by this whisper runtime") self._stt_hint_kwargs_cache = kwargs return self._stt_hint_kwargs_cache def _read_lock_pid(lock_file) -> str: lock_file.seek(0) return lock_file.read().strip() def _lock_single_instance(): runtime_dir = Path(os.getenv("XDG_RUNTIME_DIR", "/tmp")) / "aman" runtime_dir.mkdir(parents=True, exist_ok=True) lock_path = runtime_dir / "aman.lock" lock_file = open(lock_path, "a+", encoding="utf-8") try: import fcntl fcntl.flock(lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB) except BlockingIOError as exc: pid = _read_lock_pid(lock_file) lock_file.close() if pid: raise SystemExit(f"already running (pid={pid})") from exc raise SystemExit("already running") from exc except OSError as exc: if exc.errno in (errno.EACCES, errno.EAGAIN): pid = _read_lock_pid(lock_file) lock_file.close() if pid: raise SystemExit(f"already running (pid={pid})") from exc raise SystemExit("already running") from exc raise lock_file.seek(0) lock_file.truncate() lock_file.write(f"{os.getpid()}\n") lock_file.flush() return lock_file def _build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser() subparsers = parser.add_subparsers(dest="command") run_parser = subparsers.add_parser("run", help="run the aman daemon") run_parser.add_argument("--config", default="", help="path to config.json") run_parser.add_argument("--dry-run", action="store_true", help="log hotkey only") run_parser.add_argument("-v", "--verbose", action="store_true", help="enable verbose logs") doctor_parser = subparsers.add_parser("doctor", help="run startup diagnostics") doctor_parser.add_argument("--config", default="", help="path to config.json") doctor_parser.add_argument("--json", action="store_true", help="print JSON output") doctor_parser.add_argument("-v", "--verbose", action="store_true", help="enable verbose logs") init_parser = subparsers.add_parser("init", help="write a default config") init_parser.add_argument("--config", default="", help="path to config.json") init_parser.add_argument("--force", action="store_true", help="overwrite existing config") return parser def _parse_cli_args(argv: list[str]) -> argparse.Namespace: parser = _build_parser() normalized_argv = list(argv) known_commands = {"run", "doctor", "init"} if not normalized_argv or normalized_argv[0] not in known_commands: normalized_argv = ["run", *normalized_argv] return parser.parse_args(normalized_argv) def _configure_logging(verbose: bool) -> None: logging.basicConfig( stream=sys.stderr, level=logging.DEBUG if verbose else logging.INFO, format="aman: %(asctime)s %(levelname)s %(message)s", ) def _doctor_command(args: argparse.Namespace) -> int: report = run_diagnostics(args.config) if args.json: print(report.to_json()) else: for check in report.checks: status = "OK" if check.ok else "FAIL" line = f"[{status}] {check.id}: {check.message}" if check.hint: line = f"{line} | hint: {check.hint}" print(line) print(f"overall: {'ok' if report.ok else 'failed'}") return 0 if report.ok else 2 def _init_command(args: argparse.Namespace) -> int: config_path = Path(args.config) if args.config else DEFAULT_CONFIG_PATH if config_path.exists() and not args.force: logging.error("init failed: config already exists at %s (use --force to overwrite)", config_path) return 1 cfg = Config() validate(cfg) config_path.parent.mkdir(parents=True, exist_ok=True) config_path.write_text(f"{json.dumps(redacted_dict(cfg), indent=2)}\n", encoding="utf-8") logging.info("wrote default config to %s", config_path) return 0 def _run_command(args: argparse.Namespace) -> int: global _LOCK_HANDLE try: cfg = load(args.config) except ConfigValidationError as exc: logging.error("startup failed: invalid config field '%s': %s", exc.field, exc.reason) if exc.example_fix: logging.error("example fix: %s", exc.example_fix) return 1 except Exception as exc: logging.error("startup failed: %s", exc) return 1 _LOCK_HANDLE = _lock_single_instance() logging.info("hotkey: %s", cfg.daemon.hotkey) logging.info( "config (%s):\n%s", args.config or str(Path.home() / ".config" / "aman" / "config.json"), json.dumps(redacted_dict(cfg), indent=2), ) logging.info( "runtime: pid=%s session=%s display=%s wayland_display=%s verbose=%s dry_run=%s", os.getpid(), os.getenv("XDG_SESSION_TYPE", ""), os.getenv("DISPLAY", ""), os.getenv("WAYLAND_DISPLAY", ""), args.verbose, args.dry_run, ) logging.info("model cache path: %s", MODEL_PATH) try: desktop = get_desktop_adapter() daemon = Daemon(cfg, desktop, verbose=args.verbose) except Exception as exc: logging.error("startup failed: %s", exc) return 1 shutdown_once = threading.Event() def shutdown(reason: str): if shutdown_once.is_set(): return shutdown_once.set() logging.info("%s, shutting down", reason) if not daemon.shutdown(timeout=5.0): logging.warning("timed out waiting for idle state during shutdown") desktop.request_quit() def handle_signal(_sig, _frame): threading.Thread(target=shutdown, args=("signal received",), daemon=True).start() signal.signal(signal.SIGINT, handle_signal) signal.signal(signal.SIGTERM, handle_signal) try: desktop.start_hotkey_listener( cfg.daemon.hotkey, lambda: logging.info("hotkey pressed (dry-run)") if args.dry_run else daemon.toggle(), ) except Exception as exc: logging.error("hotkey setup failed: %s", exc) return 1 logging.info("ready") try: desktop.run_tray(daemon.get_state, lambda: shutdown("quit requested")) finally: daemon.shutdown(timeout=1.0) return 0 def main(argv: list[str] | None = None) -> int: args = _parse_cli_args(list(argv) if argv is not None else sys.argv[1:]) if args.command == "run": _configure_logging(args.verbose) return _run_command(args) if args.command == "doctor": _configure_logging(args.verbose) return _doctor_command(args) if args.command == "init": _configure_logging(False) return _init_command(args) raise RuntimeError(f"unsupported command: {args.command}") if __name__ == "__main__": raise SystemExit(main())