# aman/tests/test_aman.py
#
# Repository viewer metadata: 446 lines, 16 KiB, Python

import json
import os
import sys
import tempfile
import unittest
from pathlib import Path
from unittest.mock import patch
# Put the checkout's ``src`` directory at the front of ``sys.path`` so the
# ``aman``, ``config`` and ``engine`` imports that follow can be resolved.
# The membership check keeps repeated imports of this module from stacking
# duplicate entries.
ROOT = Path(__file__).resolve().parents[1]
SRC = ROOT / "src"
if str(SRC) not in sys.path:
    sys.path.insert(0, str(SRC))
import aman
from config import Config, VocabularyReplacement, redacted_dict
from engine import PipelineBinding, PipelineOptions
class FakeDesktop:
    """Recording test double passed to ``aman.Daemon`` as its desktop object.

    Every method only stores what it was called with, so tests can assert on
    the recorded state afterwards.
    """

    def __init__(self):
        # One ``(text, backend, remove_transcription_from_clipboard)`` tuple
        # per ``inject_text`` call, in call order.
        self.inject_calls = []
        # One sorted tuple of binding keys per ``set_hotkeys`` call.
        self.hotkey_updates = []
        # Copy of the most recent bindings passed to ``set_hotkeys``.
        self.hotkeys = {}
        # Last callback handed to ``start_cancel_listener`` (``None`` until then).
        self.cancel_callback = None
        # Number of ``request_quit`` calls.
        self.quit_calls = 0

    def set_hotkeys(self, bindings):
        """Store a copy of ``bindings`` and log its sorted keys."""
        self.hotkeys = dict(bindings)
        self.hotkey_updates.append(tuple(sorted(bindings.keys())))

    def start_cancel_listener(self, callback):
        """Remember ``callback`` without ever invoking it."""
        self.cancel_callback = callback

    def inject_text(
        self,
        text: str,
        backend: str,
        *,
        remove_transcription_from_clipboard: bool = False,
    ) -> None:
        """Record an injection request instead of touching a real desktop."""
        self.inject_calls.append((text, backend, remove_transcription_from_clipboard))

    def request_quit(self) -> None:
        """Count the quit request."""
        self.quit_calls += 1
class FakeSegment:
    """Transcription segment double exposing only a ``text`` attribute."""

    def __init__(self, text: str):
        self.text = text
class FakeModel:
    """Speech-to-text model double whose ``transcribe`` takes no hint arguments.

    ``transcribe`` accepts only ``language`` and ``vad_filter``; it has no
    ``hotwords`` or ``initial_prompt`` parameters.
    """

    def __init__(self, text: str = "hello world"):
        self.text = text
        # Keyword arguments seen by the most recent ``transcribe`` call.
        self.last_kwargs = {}

    def transcribe(self, _audio, language=None, vad_filter=None):
        """Record the keyword arguments and return one canned segment.

        Returns:
            A ``(segments, info)`` pair: a single-element list holding a
            ``FakeSegment`` with ``self.text``, and the recorded kwargs dict.
        """
        self.last_kwargs = {
            "language": language,
            "vad_filter": vad_filter,
        }
        return [FakeSegment(self.text)], self.last_kwargs
class FakeHintModel:
    """Speech-to-text model double whose ``transcribe`` also takes hints.

    In addition to ``language`` and ``vad_filter`` it accepts ``hotwords``
    and ``initial_prompt`` and records all four.
    """

    def __init__(self, text: str = "hello world"):
        self.text = text
        # Keyword arguments seen by the most recent ``transcribe`` call.
        self.last_kwargs = {}

    def transcribe(
        self,
        _audio,
        language=None,
        vad_filter=None,
        hotwords=None,
        initial_prompt=None,
    ):
        """Record the keyword arguments and return one canned segment.

        Returns:
            A ``(segments, info)`` pair: a single-element list holding a
            ``FakeSegment`` with ``self.text``, and the recorded kwargs dict.
        """
        self.last_kwargs = {
            "language": language,
            "vad_filter": vad_filter,
            "hotwords": hotwords,
            "initial_prompt": initial_prompt,
        }
        return [FakeSegment(self.text)], self.last_kwargs
class FakeAIProcessor:
    """Deterministic replacement for the LLM processor patched into ``aman``."""

    def process(self, text, lang="en", **_kwargs):
        """Return ``text`` unchanged; ``lang`` and extra kwargs are ignored."""
        return text

    def chat(self, *, system_prompt, user_prompt, llm_opts=None):
        """Return a canned chat reply.

        When ``llm_opts`` contains ``"response_format"``, ``user_prompt`` is
        parsed as JSON and its ``"transcript"`` value (``""`` when absent) is
        echoed back as ``{"cleaned_text": ...}`` encoded as JSON. Otherwise
        the literal string ``"general"`` is returned.
        """
        _ = system_prompt  # accepted for signature compatibility only
        opts = llm_opts or {}
        if "response_format" in opts:
            payload = json.loads(user_prompt)
            transcript = payload.get("transcript", "")
            return json.dumps({"cleaned_text": transcript})
        return "general"
class FakeAudio:
    """Recorded-audio double exposing only a ``size`` attribute."""

    def __init__(self, size: int):
        self.size = size
class FakeNotifier:
    """Notifier double that stores notifications instead of displaying them."""

    def __init__(self):
        # One ``(title, body, error)`` tuple per ``send`` call, in call order.
        self.events = []

    def send(self, title, body, *, error=False):
        """Record the notification and report success by returning ``True``."""
        self.events.append((title, body, error))
        return True
class DaemonTests(unittest.TestCase):
    """Tests for ``aman.Daemon`` driven entirely through in-memory fakes."""

    def _config(self) -> Config:
        """Return a fresh default ``Config``."""
        return Config()

    def _build_daemon(
        self,
        desktop: FakeDesktop,
        model: FakeModel | FakeHintModel,
        *,
        cfg: Config | None = None,
        verbose: bool = False,
    ) -> aman.Daemon:
        """Construct a daemon with model loading and the LLM processor patched.

        ``aman._build_whisper_model`` is patched to return ``model`` and
        ``aman.LlamaProcessor`` to return a ``FakeAIProcessor`` for the
        duration of the constructor call. A default config is used when
        ``cfg`` is ``None``.
        """
        active_cfg = cfg if cfg is not None else self._config()
        with patch("aman._build_whisper_model", return_value=model), patch(
            "aman.LlamaProcessor", return_value=FakeAIProcessor()
        ):
            return aman.Daemon(active_cfg, desktop, verbose=verbose)

    @staticmethod
    def _run_worker_inline(daemon, *, forward_hotkey: bool = False) -> None:
        """Replace ``daemon._start_stop_worker`` with a direct call.

        The replacement calls ``daemon._stop_and_process`` immediately with
        the arguments it receives, so the effects are visible as soon as the
        hotkey handler returns (presumably the real worker defers this work;
        confirm against ``aman.Daemon``).

        Args:
            daemon: Daemon whose worker launcher is replaced.
            forward_hotkey: When true, ``daemon._pending_worker_hotkey`` is
                read at call time and passed as an extra positional argument.
        """

        def run_now(stream, record, trigger, process_audio):
            if forward_hotkey:
                return daemon._stop_and_process(
                    stream,
                    record,
                    trigger,
                    process_audio,
                    daemon._pending_worker_hotkey,
                )
            return daemon._stop_and_process(stream, record, trigger, process_audio)

        daemon._start_stop_worker = run_now

    @patch("aman.stop_audio_recording", return_value=FakeAudio(8))
    @patch("aman.start_audio_recording", return_value=(object(), object()))
    def test_hotkey_start_stop_injects_text(self, _start_mock, _stop_mock):
        """First hotkey press starts recording; the second injects the text."""
        desktop = FakeDesktop()
        daemon = self._build_daemon(desktop, FakeModel(), verbose=False)
        self._run_worker_inline(daemon, forward_hotkey=True)

        daemon.handle_hotkey(daemon.cfg.daemon.hotkey)
        self.assertEqual(daemon.get_state(), aman.State.RECORDING)

        daemon.handle_hotkey(daemon.cfg.daemon.hotkey)
        self.assertEqual(daemon.get_state(), aman.State.IDLE)
        self.assertEqual(desktop.inject_calls, [("hello world", "clipboard", False)])

    @patch("aman.stop_audio_recording", return_value=FakeAudio(8))
    @patch("aman.start_audio_recording", return_value=(object(), object()))
    def test_shutdown_stops_recording_without_injection(self, _start_mock, _stop_mock):
        """Shutting down mid-recording returns to idle and injects nothing."""
        desktop = FakeDesktop()
        daemon = self._build_daemon(desktop, FakeModel(), verbose=False)
        self._run_worker_inline(daemon)

        daemon.handle_hotkey(daemon.cfg.daemon.hotkey)
        self.assertEqual(daemon.get_state(), aman.State.RECORDING)

        self.assertTrue(daemon.shutdown(timeout=0.2))
        self.assertEqual(daemon.get_state(), aman.State.IDLE)
        self.assertEqual(desktop.inject_calls, [])

    @patch("aman.stop_audio_recording", return_value=FakeAudio(8))
    @patch("aman.start_audio_recording", return_value=(object(), object()))
    def test_dictionary_replacement_applies_after_ai(self, _start_mock, _stop_mock):
        """A configured replacement rewrites the injected transcript."""
        desktop = FakeDesktop()
        model = FakeModel(text="good morning martha")
        cfg = self._config()
        cfg.vocabulary.replacements = [VocabularyReplacement(source="Martha", target="Marta")]
        daemon = self._build_daemon(desktop, model, cfg=cfg, verbose=False)
        self._run_worker_inline(daemon)

        daemon.handle_hotkey(daemon.cfg.daemon.hotkey)
        daemon.handle_hotkey(daemon.cfg.daemon.hotkey)

        self.assertEqual(desktop.inject_calls, [("good morning Marta", "clipboard", False)])

    def test_transcribe_skips_hints_when_model_does_not_support_them(self):
        """No hint kwargs reach a model whose ``transcribe`` lacks them."""
        desktop = FakeDesktop()
        model = FakeModel(text="hello")
        cfg = self._config()
        cfg.vocabulary.terms = ["Docker", "Systemd"]
        daemon = self._build_daemon(desktop, model, cfg=cfg, verbose=False)

        result = daemon._transcribe(object())

        self.assertEqual(result, "hello")
        self.assertNotIn("hotwords", model.last_kwargs)
        self.assertNotIn("initial_prompt", model.last_kwargs)

    def test_transcribe_applies_hints_when_model_supports_them(self):
        """Vocabulary terms and replacement targets are passed as hints."""
        desktop = FakeDesktop()
        model = FakeHintModel(text="hello")
        cfg = self._config()
        cfg.vocabulary.terms = ["Systemd"]
        cfg.vocabulary.replacements = [VocabularyReplacement(source="docker", target="Docker")]
        daemon = self._build_daemon(desktop, model, cfg=cfg, verbose=False)

        result = daemon._transcribe(object())

        self.assertEqual(result, "hello")
        self.assertIn("Docker", model.last_kwargs["hotwords"])
        self.assertIn("Systemd", model.last_kwargs["hotwords"])
        self.assertIn("Preferred vocabulary", model.last_kwargs["initial_prompt"])

    def test_verbose_flag_controls_transcript_logging(self):
        """``log_transcript`` mirrors the ``verbose`` constructor flag."""
        desktop = FakeDesktop()
        cfg = self._config()

        daemon = self._build_daemon(desktop, FakeModel(), cfg=cfg, verbose=False)
        self.assertFalse(daemon.log_transcript)

        daemon_verbose = self._build_daemon(desktop, FakeModel(), cfg=cfg, verbose=True)
        self.assertTrue(daemon_verbose.log_transcript)

    def test_ai_processor_is_initialized_during_daemon_init(self):
        """The constructor builds the processor once, forwarding ``verbose``."""
        desktop = FakeDesktop()
        with patch("aman._build_whisper_model", return_value=FakeModel()), patch(
            "aman.LlamaProcessor", return_value=FakeAIProcessor()
        ) as processor_cls:
            daemon = aman.Daemon(self._config(), desktop, verbose=True)

        processor_cls.assert_called_once_with(verbose=True)
        self.assertIsNotNone(daemon.ai_processor)

    @patch("aman.stop_audio_recording", return_value=FakeAudio(8))
    @patch("aman.start_audio_recording", return_value=(object(), object()))
    def test_passes_clipboard_remove_option_to_desktop(self, _start_mock, _stop_mock):
        """The clipboard-removal config flag is forwarded to ``inject_text``."""
        desktop = FakeDesktop()
        model = FakeModel(text="hello world")
        cfg = self._config()
        cfg.injection.remove_transcription_from_clipboard = True
        daemon = self._build_daemon(desktop, model, cfg=cfg, verbose=False)
        self._run_worker_inline(daemon)

        daemon.handle_hotkey(daemon.cfg.daemon.hotkey)
        daemon.handle_hotkey(daemon.cfg.daemon.hotkey)

        self.assertEqual(desktop.inject_calls, [("hello world", "clipboard", True)])

    def test_state_changes_are_debug_level(self):
        """State transitions are logged on the root logger at DEBUG level."""
        desktop = FakeDesktop()
        daemon = self._build_daemon(desktop, FakeModel(), verbose=False)

        with self.assertLogs(level="DEBUG") as logs:
            daemon.set_state(aman.State.RECORDING)

        self.assertTrue(
            any("DEBUG:root:state: idle -> recording" in line for line in logs.output)
        )

    def test_hotkey_dispatches_to_matching_pipeline(self):
        """``_run_pipeline`` invokes the handler bound to the given hotkey."""
        desktop = FakeDesktop()
        daemon = self._build_daemon(desktop, FakeModel(), verbose=False)
        daemon.set_pipeline_bindings(
            {
                "Super+m": PipelineBinding(
                    hotkey="Super+m",
                    handler=lambda _audio, _lib: "default",
                    options=PipelineOptions(failure_policy="best_effort"),
                ),
                "Super+Shift+m": PipelineBinding(
                    hotkey="Super+Shift+m",
                    handler=lambda _audio, _lib: "caps",
                    options=PipelineOptions(failure_policy="best_effort"),
                ),
            }
        )

        out = daemon._run_pipeline(object(), "Super+Shift+m")

        self.assertEqual(out, "caps")

    def test_try_apply_config_applies_new_runtime_settings(self):
        """Applying a candidate config swaps settings and clears the hint cache."""
        desktop = FakeDesktop()
        cfg = self._config()
        daemon = self._build_daemon(desktop, FakeModel(), cfg=cfg, verbose=False)
        # Seed the cache so the final assertion proves it was reset.
        daemon._stt_hint_kwargs_cache = {"hotwords": "old"}

        candidate = Config()
        candidate.injection.backend = "injection"
        candidate.vocabulary.replacements = [
            VocabularyReplacement(source="Martha", target="Marta"),
        ]

        status, changed, error = daemon.try_apply_config(candidate)

        self.assertEqual(status, "applied")
        self.assertEqual(error, "")
        self.assertIn("injection.backend", changed)
        self.assertIn("vocabulary.replacements", changed)
        self.assertEqual(daemon.cfg.injection.backend, "injection")
        self.assertEqual(
            daemon.vocabulary.apply_deterministic_replacements("Martha"),
            "Marta",
        )
        self.assertIsNone(daemon._stt_hint_kwargs_cache)

    def test_try_apply_config_reloads_stt_model(self):
        """Changing ``stt.model`` makes the daemon build and adopt a new model."""
        desktop = FakeDesktop()
        cfg = self._config()
        daemon = self._build_daemon(desktop, FakeModel(), cfg=cfg, verbose=False)

        candidate = Config()
        candidate.stt.model = "small"
        next_model = FakeModel(text="from-new-model")

        with patch("aman._build_whisper_model", return_value=next_model):
            status, changed, error = daemon.try_apply_config(candidate)

        self.assertEqual(status, "applied")
        self.assertEqual(error, "")
        self.assertIn("stt.model", changed)
        self.assertIs(daemon.model, next_model)

    def test_try_apply_config_is_deferred_while_busy(self):
        """A config apply attempted while recording is reported as deferred."""
        desktop = FakeDesktop()
        daemon = self._build_daemon(desktop, FakeModel(), verbose=False)
        daemon.set_state(aman.State.RECORDING)

        status, changed, error = daemon.try_apply_config(Config())

        self.assertEqual(status, "deferred")
        self.assertEqual(changed, [])
        self.assertIn("busy", error)
class ConfigReloaderTests(unittest.TestCase):
    """Tests for ``aman.ConfigReloader`` against a config file in a temp dir."""

    def _write_config(self, path: Path, cfg: Config):
        """Serialise ``cfg`` via ``redacted_dict`` as JSON to ``path``.

        Missing parent directories are created first.
        """
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(json.dumps(redacted_dict(cfg)), encoding="utf-8")

    def _daemon_for_path(self, path: Path) -> tuple[aman.Daemon, FakeDesktop]:
        """Write a default config to ``path`` and build a daemon from it.

        The whisper model builder and ``LlamaProcessor`` are patched with
        fakes while the daemon is constructed.
        """
        cfg = Config()
        self._write_config(path, cfg)
        desktop = FakeDesktop()
        with patch("aman._build_whisper_model", return_value=FakeModel()), patch(
            "aman.LlamaProcessor", return_value=FakeAIProcessor()
        ):
            daemon = aman.Daemon(cfg, desktop, verbose=False)
        return daemon, desktop

    def _reloader_for(self, daemon, path: Path):
        """Return ``(reloader, notifier)`` watching ``path`` for ``daemon``.

        Tests call ``tick()`` on the reloader directly.
        """
        notifier = FakeNotifier()
        reloader = aman.ConfigReloader(
            daemon=daemon,
            config_path=path,
            notifier=notifier,
            poll_interval_sec=0.01,
        )
        return reloader, notifier

    def test_reloader_applies_changed_config(self):
        """A rewritten config is applied on the next tick and announced."""
        with tempfile.TemporaryDirectory() as td:
            path = Path(td) / "config.json"
            daemon, _desktop = self._daemon_for_path(path)
            reloader, notifier = self._reloader_for(daemon, path)

            updated = Config()
            updated.injection.backend = "injection"
            self._write_config(path, updated)
            reloader.tick()

            self.assertEqual(daemon.cfg.injection.backend, "injection")
            self.assertTrue(
                any(evt[0] == "Config Reloaded" and not evt[2] for evt in notifier.events)
            )

    def test_reloader_keeps_last_good_config_when_invalid(self):
        """An invalid file leaves the config untouched and notifies only once."""
        with tempfile.TemporaryDirectory() as td:
            path = Path(td) / "config.json"
            daemon, _desktop = self._daemon_for_path(path)
            reloader, notifier = self._reloader_for(daemon, path)

            path.write_text('{"injection":{"backend":"invalid"}}', encoding="utf-8")
            reloader.tick()

            self.assertEqual(daemon.cfg.injection.backend, "clipboard")
            fail_events = [evt for evt in notifier.events if evt[0] == "Config Reload Failed"]
            self.assertEqual(len(fail_events), 1)

            # A second tick over the same broken file must not notify again.
            reloader.tick()
            fail_events = [evt for evt in notifier.events if evt[0] == "Config Reload Failed"]
            self.assertEqual(len(fail_events), 1)

    def test_reloader_defers_apply_until_idle(self):
        """A change seen while recording is applied once the daemon is idle."""
        with tempfile.TemporaryDirectory() as td:
            path = Path(td) / "config.json"
            daemon, _desktop = self._daemon_for_path(path)
            reloader, _notifier = self._reloader_for(daemon, path)

            updated = Config()
            updated.injection.backend = "injection"
            self._write_config(path, updated)

            daemon.set_state(aman.State.RECORDING)
            reloader.tick()
            self.assertEqual(daemon.cfg.injection.backend, "clipboard")

            daemon.set_state(aman.State.IDLE)
            reloader.tick()
            self.assertEqual(daemon.cfg.injection.backend, "injection")
class LockTests(unittest.TestCase):
    """Tests for the single-instance lock helper ``aman._lock_single_instance``."""

    def test_lock_rejects_second_instance(self):
        """A second lock attempt exits with an "already running" message."""
        with tempfile.TemporaryDirectory() as td:
            # Point XDG_RUNTIME_DIR at a throwaway directory for this test;
            # patch.dict restores the environment on exit.
            with patch.dict(os.environ, {"XDG_RUNTIME_DIR": td}, clear=False):
                first = aman._lock_single_instance()
                try:
                    with self.assertRaises(SystemExit) as ctx:
                        aman._lock_single_instance()
                    self.assertIn("already running", str(ctx.exception))
                finally:
                    # Close the first handle even when the assertions fail.
                    first.close()
# Run the suite only when this file is executed directly, not on import.
if __name__ == "__main__":
    unittest.main()