458 lines
16 KiB
Python
458 lines
16 KiB
Python
import os
|
|
import sys
|
|
import tempfile
|
|
import unittest
|
|
from pathlib import Path
|
|
from unittest.mock import patch
|
|
|
|
# Make the project's ``src`` directory importable so the imports that follow
# this block can be resolved when the tests run from a source checkout.
# ROOT is two directory levels above this file (``parents[1]``).
ROOT = Path(__file__).resolve().parents[1]
SRC = ROOT / "src"
# Only insert once so repeated imports of this module do not grow sys.path.
if str(SRC) not in sys.path:
    sys.path.insert(0, str(SRC))
|
|
|
|
import aman
|
|
from config import Config, VocabularyReplacement
|
|
|
|
|
|
class FakeDesktop:
    """In-memory stand-in for the desktop adapter handed to the daemon.

    Every call is recorded on the instance so tests can assert on it:

    * ``inject_calls`` -- list of ``(text, backend, remove_from_clipboard)``.
    * ``quit_calls`` -- number of ``request_quit`` invocations.
    * ``cancel_listener_start_calls`` / ``cancel_listener_stop_calls`` --
      counters for the cancel-listener lifecycle.
    * ``cancel_listener_callback`` -- the currently armed callback, or
      ``None`` when no listener is armed.
    """

    def __init__(self, *, fail_cancel_listener: bool = False):
        """Create the fake.

        Args:
            fail_cancel_listener: When true, ``start_cancel_listener`` raises
                ``RuntimeError`` instead of arming the listener.
        """
        self.inject_calls = []
        self.quit_calls = 0
        self.cancel_listener_start_calls = 0
        self.cancel_listener_stop_calls = 0
        self.cancel_listener_callback = None
        self.fail_cancel_listener = fail_cancel_listener

    def start_cancel_listener(self, callback) -> None:
        """Arm the cancel listener with ``callback``.

        Raises:
            RuntimeError: If the fake was built with
                ``fail_cancel_listener=True``. The start counter and the
                stored callback are left untouched in that case.
        """
        if self.fail_cancel_listener:
            raise RuntimeError("cancel listener unavailable")
        self.cancel_listener_start_calls += 1
        self.cancel_listener_callback = callback

    def stop_cancel_listener(self) -> None:
        """Disarm the cancel listener and count the call."""
        self.cancel_listener_stop_calls += 1
        self.cancel_listener_callback = None

    def inject_text(
        self,
        text: str,
        backend: str,
        *,
        remove_transcription_from_clipboard: bool = False,
    ) -> None:
        """Record an injection request as a tuple in ``inject_calls``."""
        self.inject_calls.append((text, backend, remove_transcription_from_clipboard))

    def request_quit(self) -> None:
        """Count a quit request."""
        self.quit_calls += 1
|
|
|
|
|
|
class FakeSegment:
    """Minimal transcription segment exposing only a ``text`` attribute."""

    def __init__(self, text: str):
        self.text = text
|
|
|
|
|
|
class FakeModel:
    """Fake speech model whose ``transcribe`` accepts only basic options.

    The signature deliberately has no ``hotwords`` / ``initial_prompt``
    parameters, so tests can use it as a model without hint support.
    """

    def __init__(self, text: str = "hello world"):
        """Store the canned transcript returned by every ``transcribe`` call."""
        self.text = text
        # Keyword arguments seen by the most recent ``transcribe`` call.
        self.last_kwargs = {}

    def transcribe(self, _audio, language=None, vad_filter=None):
        """Record the options and return the canned transcript.

        Returns:
            A ``(segments, info)`` pair: a one-element list holding a
            ``FakeSegment`` with the canned text, and the recorded kwargs dict.
        """
        self.last_kwargs = {
            "language": language,
            "vad_filter": vad_filter,
        }
        return [FakeSegment(self.text)], self.last_kwargs
|
|
|
|
|
|
class FakeHintModel:
    """Fake speech model whose ``transcribe`` also accepts vocabulary hints.

    In addition to ``language`` and ``vad_filter`` it takes ``hotwords`` and
    ``initial_prompt`` and records all four in ``last_kwargs``.
    """

    def __init__(self, text: str = "hello world"):
        """Store the canned transcript returned by every ``transcribe`` call."""
        self.text = text
        # Keyword arguments seen by the most recent ``transcribe`` call.
        self.last_kwargs = {}

    def transcribe(
        self,
        _audio,
        language=None,
        vad_filter=None,
        hotwords=None,
        initial_prompt=None,
    ):
        """Record the options (including hints) and return the canned transcript.

        Returns:
            A ``(segments, info)`` pair: a one-element list holding a
            ``FakeSegment`` with the canned text, and the recorded kwargs dict.
        """
        self.last_kwargs = {
            "language": language,
            "vad_filter": vad_filter,
            "hotwords": hotwords,
            "initial_prompt": initial_prompt,
        }
        return [FakeSegment(self.text)], self.last_kwargs
|
|
|
|
|
|
class FakeKwargModel:
    """Fake speech model whose ``transcribe`` takes arbitrary keyword options.

    Because only the keywords actually passed are stored, tests can check
    that a given option was omitted entirely rather than passed as ``None``.
    """

    def __init__(self, text: str = "hello world"):
        """Store the canned transcript returned by every ``transcribe`` call."""
        self.text = text
        # Keyword arguments seen by the most recent ``transcribe`` call.
        self.last_kwargs = {}

    def transcribe(self, _audio, **kwargs):
        """Record a copy of ``kwargs`` and return the canned transcript.

        Returns:
            A ``(segments, info)`` pair: a one-element list holding a
            ``FakeSegment`` with the canned text, and the recorded kwargs dict.
        """
        self.last_kwargs = dict(kwargs)
        return [FakeSegment(self.text)], self.last_kwargs
|
|
|
|
|
|
class FakeUnsupportedLanguageModel:
    """Fake speech model that rejects any explicit language hint.

    Every call is appended to ``calls`` (including rejected ones) so tests
    can verify the retry sequence.
    """

    def __init__(self, text: str = "hello world"):
        """Store the canned transcript returned by successful calls."""
        self.text = text
        # One dict per ``transcribe`` call, in call order.
        self.calls = []

    def transcribe(self, _audio, language=None, vad_filter=None):
        """Record the call, then fail if a language was requested.

        Returns:
            A ``(segments, info)`` pair: a one-element list holding a
            ``FakeSegment`` with the canned text, and a dict of the options.

        Raises:
            RuntimeError: If ``language`` is truthy.
        """
        # Record before raising so rejected attempts are visible to the test.
        self.calls.append({"language": language, "vad_filter": vad_filter})
        if language:
            raise RuntimeError(f"unsupported language: {language}")
        return [FakeSegment(self.text)], {"language": language, "vad_filter": vad_filter}
|
|
|
|
|
|
class FakeAIProcessor:
    """Pass-through AI processor that records the options it was called with."""

    def __init__(self):
        # Options seen by the most recent ``process`` call.
        self.last_kwargs = {}

    def process(self, text, lang="auto", **kwargs):
        """Record ``lang`` plus any extra keyword options and return ``text`` unchanged."""
        self.last_kwargs = {"lang": lang, **kwargs}
        return text
|
|
|
|
|
|
class FakeAudio:
    """Recorded-audio stand-in exposing only a ``size`` attribute."""

    def __init__(self, size: int):
        self.size = size
|
|
|
|
|
|
class FakeStream:
    """Audio-stream stand-in that counts ``stop`` and ``close`` calls."""

    def __init__(self):
        self.stop_calls = 0
        self.close_calls = 0

    def stop(self):
        """Count a stop request."""
        self.stop_calls += 1

    def close(self):
        """Count a close request."""
        self.close_calls += 1
|
|
|
|
|
|
class DaemonTests(unittest.TestCase):
    """Tests for ``aman.Daemon`` driven entirely by the in-memory fakes.

    The whisper model factory and ``LlamaProcessor`` are patched while the
    daemon is constructed, and the audio start/stop functions are patched per
    test, so no real model or audio device is touched by this class.
    """

    def _config(self) -> Config:
        """Return a fresh default ``Config`` so tests never share settings."""
        return Config()

    def _build_daemon(
        self,
        desktop: FakeDesktop,
        model: FakeModel | FakeHintModel | FakeKwargModel | FakeUnsupportedLanguageModel,
        *,
        cfg: Config | None = None,
        verbose: bool = False,
        ai_processor: FakeAIProcessor | None = None,
    ) -> aman.Daemon:
        """Construct a ``Daemon`` wired to the supplied fakes.

        Args:
            desktop: Fake desktop adapter passed to the daemon.
            model: Object returned by the patched ``aman._build_whisper_model``.
            cfg: Configuration to use; a default ``Config`` when ``None``.
            verbose: Forwarded to ``aman.Daemon``.
            ai_processor: Object returned by the patched
                ``aman.LlamaProcessor``; a new ``FakeAIProcessor`` when ``None``.

        Returns:
            The daemon built while both patches were active.
        """
        active_cfg = cfg if cfg is not None else self._config()
        active_ai_processor = (
            ai_processor if ai_processor is not None else FakeAIProcessor()
        )
        with patch("aman._build_whisper_model", return_value=model), patch(
            "aman.LlamaProcessor", return_value=active_ai_processor
        ):
            return aman.Daemon(active_cfg, desktop, verbose=verbose)

    @staticmethod
    def _run_stop_worker_inline(daemon: aman.Daemon) -> None:
        """Route ``_start_stop_worker`` straight to ``_stop_and_process``.

        With this override the stop path runs in the calling thread, so a
        test can assert on state and injected text right after ``toggle()``
        or ``shutdown()`` returns.
        NOTE(review): the real worker is presumably asynchronous -- confirm
        against ``aman.Daemon``.
        """
        daemon._start_stop_worker = (
            lambda stream, record, trigger, process_audio: daemon._stop_and_process(
                stream, record, trigger, process_audio
            )
        )

    # Decorators apply bottom-up, so the start mock is the first mock argument.
    @patch("aman.stop_audio_recording", return_value=FakeAudio(8))
    @patch("aman.start_audio_recording", return_value=(object(), object()))
    def test_toggle_start_stop_injects_text(self, _start_mock, _stop_mock):
        # First toggle starts recording; second stops and injects the text.
        desktop = FakeDesktop()
        daemon = self._build_daemon(desktop, FakeModel(), verbose=False)
        self._run_stop_worker_inline(daemon)

        daemon.toggle()
        self.assertEqual(daemon.get_state(), aman.State.RECORDING)

        daemon.toggle()

        self.assertEqual(daemon.get_state(), aman.State.IDLE)
        self.assertEqual(desktop.inject_calls, [("hello world", "clipboard", False)])

    @patch("aman.stop_audio_recording", return_value=FakeAudio(8))
    @patch("aman.start_audio_recording", return_value=(object(), object()))
    def test_shutdown_stops_recording_without_injection(self, _start_mock, _stop_mock):
        # Shutting down mid-recording must return to IDLE and inject nothing.
        desktop = FakeDesktop()
        daemon = self._build_daemon(desktop, FakeModel(), verbose=False)
        self._run_stop_worker_inline(daemon)

        daemon.toggle()
        self.assertEqual(daemon.get_state(), aman.State.RECORDING)

        self.assertTrue(daemon.shutdown(timeout=0.2))
        self.assertEqual(daemon.get_state(), aman.State.IDLE)
        self.assertEqual(desktop.inject_calls, [])

    @patch("aman.stop_audio_recording", return_value=FakeAudio(8))
    @patch("aman.start_audio_recording", return_value=(object(), object()))
    def test_dictionary_replacement_applies_after_ai(self, _start_mock, _stop_mock):
        # The transcript has lowercase "martha"; the injected text must carry
        # the configured replacement target "Marta".
        desktop = FakeDesktop()
        model = FakeModel(text="good morning martha")
        cfg = self._config()
        cfg.vocabulary.replacements = [VocabularyReplacement(source="Martha", target="Marta")]

        daemon = self._build_daemon(desktop, model, cfg=cfg, verbose=False)
        self._run_stop_worker_inline(daemon)

        daemon.toggle()
        daemon.toggle()

        self.assertEqual(desktop.inject_calls, [("good morning Marta", "clipboard", False)])

    def test_transcribe_skips_hints_when_model_does_not_support_them(self):
        # FakeModel.transcribe has no hotwords/initial_prompt parameters, so
        # the daemon must not pass them even though vocabulary terms are set.
        desktop = FakeDesktop()
        model = FakeModel(text="hello")
        cfg = self._config()
        cfg.vocabulary.terms = ["Docker", "Systemd"]

        daemon = self._build_daemon(desktop, model, cfg=cfg, verbose=False)

        result, used_lang = daemon._transcribe(object())

        self.assertEqual(result, "hello")
        self.assertEqual(used_lang, "auto")
        self.assertNotIn("hotwords", model.last_kwargs)
        self.assertNotIn("initial_prompt", model.last_kwargs)

    def test_transcribe_applies_hints_when_model_supports_them(self):
        # Both the vocabulary term and the replacement target must show up in
        # the hotwords passed to a hint-capable model.
        desktop = FakeDesktop()
        model = FakeHintModel(text="hello")
        cfg = self._config()
        cfg.vocabulary.terms = ["Systemd"]
        cfg.vocabulary.replacements = [VocabularyReplacement(source="docker", target="Docker")]

        daemon = self._build_daemon(desktop, model, cfg=cfg, verbose=False)

        result, used_lang = daemon._transcribe(object())

        self.assertEqual(result, "hello")
        self.assertEqual(used_lang, "auto")
        self.assertIn("Docker", model.last_kwargs["hotwords"])
        self.assertIn("Systemd", model.last_kwargs["hotwords"])
        self.assertIn("Preferred vocabulary", model.last_kwargs["initial_prompt"])

    def test_transcribe_uses_configured_language_hint(self):
        # A configured language is passed to the model and reported back.
        desktop = FakeDesktop()
        model = FakeModel(text="hola")
        cfg = self._config()
        cfg.stt.language = "es"

        daemon = self._build_daemon(desktop, model, cfg=cfg, verbose=False)

        result, used_lang = daemon._transcribe(object())

        self.assertEqual(result, "hola")
        self.assertEqual(used_lang, "es")
        self.assertEqual(model.last_kwargs["language"], "es")

    def test_transcribe_auto_language_omits_language_kwarg(self):
        # FakeKwargModel stores only the keywords actually passed, so this
        # checks that "language" is left out rather than sent as a value.
        desktop = FakeDesktop()
        model = FakeKwargModel(text="hello")
        cfg = self._config()
        cfg.stt.language = "auto"

        daemon = self._build_daemon(desktop, model, cfg=cfg, verbose=False)

        result, used_lang = daemon._transcribe(object())

        self.assertEqual(result, "hello")
        self.assertEqual(used_lang, "auto")
        self.assertNotIn("language", model.last_kwargs)

    def test_transcribe_falls_back_to_auto_when_hint_is_rejected(self):
        # The model raises for any explicit language; the daemon must retry
        # without one, report "auto", and log a warning about the fallback.
        desktop = FakeDesktop()
        model = FakeUnsupportedLanguageModel(text="bonjour")
        cfg = self._config()
        cfg.stt.language = "fr"

        daemon = self._build_daemon(desktop, model, cfg=cfg, verbose=False)

        with self.assertLogs(level="WARNING") as logs:
            result, used_lang = daemon._transcribe(object())

        self.assertEqual(result, "bonjour")
        self.assertEqual(used_lang, "auto")
        self.assertEqual(len(model.calls), 2)
        self.assertEqual(model.calls[0]["language"], "fr")
        self.assertIsNone(model.calls[1]["language"])
        self.assertTrue(any("falling back to auto-detect" in line for line in logs.output))

    def test_verbose_flag_controls_transcript_logging(self):
        # ``log_transcript`` must follow the ``verbose`` constructor flag.
        desktop = FakeDesktop()
        cfg = self._config()

        daemon = self._build_daemon(desktop, FakeModel(), cfg=cfg, verbose=False)
        self.assertFalse(daemon.log_transcript)

        daemon_verbose = self._build_daemon(desktop, FakeModel(), cfg=cfg, verbose=True)
        self.assertTrue(daemon_verbose.log_transcript)

    def test_ai_processor_is_initialized_during_daemon_init(self):
        # Patches are applied here directly (not via _build_daemon) so the
        # LlamaProcessor mock is available for the call assertion.
        desktop = FakeDesktop()
        with patch("aman._build_whisper_model", return_value=FakeModel()), patch(
            "aman.LlamaProcessor", return_value=FakeAIProcessor()
        ) as processor_cls:
            daemon = aman.Daemon(self._config(), desktop, verbose=True)

        processor_cls.assert_called_once_with(verbose=True, model_path=None)
        self.assertIsNotNone(daemon.ai_processor)

    @patch("aman.stop_audio_recording", return_value=FakeAudio(8))
    @patch("aman.start_audio_recording", return_value=(object(), object()))
    def test_passes_clipboard_remove_option_to_desktop(self, _start_mock, _stop_mock):
        # The injection config flag must reach FakeDesktop.inject_text.
        desktop = FakeDesktop()
        model = FakeModel(text="hello world")
        cfg = self._config()
        cfg.injection.remove_transcription_from_clipboard = True

        daemon = self._build_daemon(desktop, model, cfg=cfg, verbose=False)
        self._run_stop_worker_inline(daemon)

        daemon.toggle()
        daemon.toggle()

        self.assertEqual(desktop.inject_calls, [("hello world", "clipboard", True)])

    def test_state_changes_are_debug_level(self):
        # The expected record is emitted on the root logger at DEBUG level.
        desktop = FakeDesktop()
        daemon = self._build_daemon(desktop, FakeModel(), verbose=False)

        with self.assertLogs(level="DEBUG") as logs:
            daemon.set_state(aman.State.RECORDING)

        self.assertTrue(
            any("DEBUG:root:state: idle -> recording" in line for line in logs.output)
        )

    @patch("aman.stop_audio_recording", return_value=FakeAudio(8))
    @patch("aman.start_audio_recording", return_value=(object(), object()))
    def test_cancel_listener_armed_only_while_recording(self, _start_mock, _stop_mock):
        # The listener is armed by the first toggle and disarmed by the second.
        desktop = FakeDesktop()
        daemon = self._build_daemon(desktop, FakeModel(), verbose=False)
        self._run_stop_worker_inline(daemon)

        self.assertIsNone(desktop.cancel_listener_callback)
        daemon.toggle()
        self.assertEqual(desktop.cancel_listener_start_calls, 1)
        self.assertEqual(desktop.cancel_listener_stop_calls, 0)
        self.assertIsNotNone(desktop.cancel_listener_callback)

        daemon.toggle()
        self.assertEqual(desktop.cancel_listener_start_calls, 1)
        self.assertEqual(desktop.cancel_listener_stop_calls, 1)
        self.assertIsNone(desktop.cancel_listener_callback)

    @patch("aman.start_audio_recording")
    def test_recording_does_not_start_when_cancel_listener_fails(self, start_mock):
        # If arming the cancel listener raises, the already-opened stream must
        # be stopped and closed and the daemon must stay IDLE.
        stream = FakeStream()
        start_mock.return_value = (stream, object())
        desktop = FakeDesktop(fail_cancel_listener=True)
        daemon = self._build_daemon(desktop, FakeModel(), verbose=False)

        daemon.toggle()

        self.assertEqual(daemon.get_state(), aman.State.IDLE)
        self.assertIsNone(daemon.stream)
        self.assertIsNone(daemon.record)
        self.assertEqual(stream.stop_calls, 1)
        self.assertEqual(stream.close_calls, 1)

    @patch("aman.stop_audio_recording", return_value=FakeAudio(8))
    @patch("aman.start_audio_recording", return_value=(object(), object()))
    def test_ai_processor_receives_active_profile(self, _start_mock, _stop_mock):
        # The configured UX profile must be forwarded as ``profile=``.
        desktop = FakeDesktop()
        cfg = self._config()
        cfg.ux.profile = "fast"
        ai_processor = FakeAIProcessor()
        daemon = self._build_daemon(
            desktop,
            FakeModel(text="hello world"),
            cfg=cfg,
            verbose=False,
            ai_processor=ai_processor,
        )
        self._run_stop_worker_inline(daemon)

        daemon.toggle()
        daemon.toggle()

        self.assertEqual(ai_processor.last_kwargs.get("profile"), "fast")

    @patch("aman.stop_audio_recording", return_value=FakeAudio(8))
    @patch("aman.start_audio_recording", return_value=(object(), object()))
    def test_ai_processor_receives_effective_language(self, _start_mock, _stop_mock):
        # The language used for transcription must be forwarded as ``lang=``.
        desktop = FakeDesktop()
        cfg = self._config()
        cfg.stt.language = "es"
        ai_processor = FakeAIProcessor()
        daemon = self._build_daemon(
            desktop,
            FakeModel(text="hola mundo"),
            cfg=cfg,
            verbose=False,
            ai_processor=ai_processor,
        )
        self._run_stop_worker_inline(daemon)

        daemon.toggle()
        daemon.toggle()

        self.assertEqual(ai_processor.last_kwargs.get("lang"), "es")

    @patch("aman.start_audio_recording")
    def test_paused_state_blocks_recording_start(self, start_mock):
        # While paused, toggle() must not open audio or arm the listener.
        desktop = FakeDesktop()
        daemon = self._build_daemon(desktop, FakeModel(), verbose=False)

        self.assertTrue(daemon.toggle_paused())
        daemon.toggle()

        start_mock.assert_not_called()
        self.assertEqual(daemon.get_state(), aman.State.IDLE)
        self.assertEqual(desktop.cancel_listener_start_calls, 0)
|
|
|
|
|
|
class LockTests(unittest.TestCase):
    """Tests for the single-instance lock in ``aman``."""

    def test_lock_rejects_second_instance(self):
        # XDG_RUNTIME_DIR is pointed at a throwaway directory for the test.
        # NOTE(review): presumably the lock file is created under that
        # directory -- confirm against aman._lock_single_instance.
        with tempfile.TemporaryDirectory() as td, patch.dict(
            os.environ, {"XDG_RUNTIME_DIR": td}, clear=False
        ):
            first = aman._lock_single_instance()
            try:
                with self.assertRaises(SystemExit) as ctx:
                    aman._lock_single_instance()
                # Must run after the assertRaises block: code placed inside
                # it after the raising call would never execute.
                self.assertIn("already running", str(ctx.exception))
            finally:
                # Close the first handle before the temporary directory is
                # removed.
                first.close()
|
|
|
|
|
|
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()
|