aman/tests/test_aman.py
Thales Maciel fa91f313c4
Some checks are pending
ci / test-and-build (push) Waiting to run
Simplify editor cleanup and keep live ASR metadata
Keep the daemon path on the full ASR result so word timings and detected language survive into the editor pipeline instead of falling back to a plain transcript string.

Add PipelineEngine.run_asr_result(), have aman call it when live ASR data is available, and cover the word-aware alignment behavior in the daemon tests.

Collapse the llama cleanup flow to a single JSON-shaped completion while leaving the legacy pass1/pass2 parameters in place as compatibility no-ops.

Validated with PYTHONPATH=src python3 -m unittest tests.test_aiprocess tests.test_aman.
2026-03-12 13:24:36 -03:00

597 lines
22 KiB
Python

import os
import sys
import tempfile
import unittest
from pathlib import Path
from unittest.mock import patch
ROOT = Path(__file__).resolve().parents[1]
SRC = ROOT / "src"
if str(SRC) not in sys.path:
sys.path.insert(0, str(SRC))
import aman
from config import Config, VocabularyReplacement
from stages.asr_whisper import AsrResult, AsrSegment, AsrWord
class FakeDesktop:
    """Test double for the desktop-integration layer.

    Records every interaction (text injection, quit requests, cancel-listener
    arming/disarming) so tests can assert on call history without touching a
    real desktop session.
    """

    def __init__(self, *, fail_cancel_listener: bool = False):
        self.fail_cancel_listener = fail_cancel_listener
        self.cancel_listener_callback = None
        self.cancel_listener_start_calls = 0
        self.cancel_listener_stop_calls = 0
        self.inject_calls = []
        self.quit_calls = 0

    def start_cancel_listener(self, callback) -> None:
        # Simulate a desktop where the cancel hotkey cannot be registered.
        if self.fail_cancel_listener:
            raise RuntimeError("cancel listener unavailable")
        self.cancel_listener_start_calls += 1
        self.cancel_listener_callback = callback

    def stop_cancel_listener(self) -> None:
        self.cancel_listener_stop_calls += 1
        self.cancel_listener_callback = None

    def inject_text(
        self,
        text: str,
        backend: str,
        *,
        remove_transcription_from_clipboard: bool = False,
    ) -> None:
        call = (text, backend, remove_transcription_from_clipboard)
        self.inject_calls.append(call)

    def request_quit(self) -> None:
        self.quit_calls += 1
class FakeSegment:
    """Minimal stand-in for a whisper transcription segment: carries text only."""

    def __init__(self, text: str):
        self.text = text
class FakeModel:
    """Fake whisper model whose transcribe() accepts only the basic kwargs
    (language, vad_filter) and records them for later assertions."""

    def __init__(self, text: str = "hello world"):
        self.text = text
        self.last_kwargs = {}

    def transcribe(self, _audio, language=None, vad_filter=None):
        captured = {"language": language, "vad_filter": vad_filter}
        self.last_kwargs = captured
        return [FakeSegment(self.text)], captured
class FakeHintModel:
    """Fake whisper model that additionally accepts vocabulary hints
    (hotwords, initial_prompt) and records every kwarg it receives."""

    def __init__(self, text: str = "hello world"):
        self.text = text
        self.last_kwargs = {}

    def transcribe(
        self,
        _audio,
        language=None,
        vad_filter=None,
        hotwords=None,
        initial_prompt=None,
    ):
        # Capture each hint kwarg so tests can assert what was forwarded.
        captured = {
            "language": language,
            "vad_filter": vad_filter,
            "hotwords": hotwords,
            "initial_prompt": initial_prompt,
        }
        self.last_kwargs = captured
        return [FakeSegment(self.text)], captured
class FakeKwargModel:
    """Fake whisper model whose transcribe() swallows arbitrary keyword
    arguments, recording an independent copy of whatever was passed."""

    def __init__(self, text: str = "hello world"):
        self.text = text
        self.last_kwargs = {}

    def transcribe(self, _audio, **kwargs):
        captured = dict(kwargs)
        self.last_kwargs = captured
        return [FakeSegment(self.text)], captured
class FakeUnsupportedLanguageModel:
    """Fake whisper model that rejects any explicit language hint.

    Every call is appended to ``calls`` (even rejected ones) so tests can
    verify the hint-then-fallback retry sequence.
    """

    def __init__(self, text: str = "hello world"):
        self.text = text
        self.calls = []

    def transcribe(self, _audio, language=None, vad_filter=None):
        self.calls.append({"language": language, "vad_filter": vad_filter})
        # A truthy language hint is "unsupported"; only auto-detect succeeds.
        if language:
            raise RuntimeError(f"unsupported language: {language}")
        return [FakeSegment(self.text)], {"language": language, "vad_filter": vad_filter}
class FakeAIProcessor:
    """Test double for the llama editor processor.

    ``process`` echoes its input text back (recording kwargs), and both
    ``process`` and ``warmup`` can be armed with an exception via the
    ``process_error`` / ``warmup_error`` attributes.
    """

    def __init__(self):
        self.last_kwargs = {}
        self.warmup_calls = []
        self.warmup_error = None
        self.process_error = None

    def process(self, text, lang="auto", **_kwargs):
        if self.process_error is not None:
            raise self.process_error
        self.last_kwargs = {"lang": lang, **_kwargs}
        # Identity transform: tests assert on downstream handling, not editing.
        return text

    def warmup(self, profile="default"):
        self.warmup_calls.append(profile)
        if self.warmup_error:
            raise self.warmup_error
class FakeAudio:
    """Stand-in for a captured audio buffer; exposes only its size."""

    def __init__(self, size: int):
        self.size = size
class FakeStream:
    """Fake audio input stream that counts stop()/close() invocations."""

    def __init__(self):
        self.stop_calls = 0
        self.close_calls = 0

    def stop(self):
        self.stop_calls += 1

    def close(self):
        self.close_calls += 1
def _asr_result(text: str, words: list[str], *, language: str = "auto") -> AsrResult:
    """Build an AsrResult fixture with evenly spaced word timings.

    Each word occupies 0.1 s starting at 0.2 s intervals; the single segment
    spans from 0.0 to the end of the word grid (never less than 0.1 s).
    """
    timed_words: list[AsrWord] = []
    cursor = 0.0
    for token in words:
        timed_words.append(AsrWord(text=token, start_s=cursor, end_s=cursor + 0.1, prob=0.9))
        cursor += 0.2
    segment = AsrSegment(text=text, start_s=0.0, end_s=max(cursor, 0.1))
    return AsrResult(
        raw_text=text,
        language=language,
        latency_ms=5.0,
        words=timed_words,
        segments=[segment],
    )
class DaemonTests(unittest.TestCase):
    """Behavioral tests for aman.Daemon using faked desktop, ASR, and editor stages.

    Tests that exercise the record/stop cycle replace ``_start_stop_worker``
    with a synchronous call into ``_stop_and_process`` so no worker thread is
    involved and assertions can run immediately after ``toggle()``.
    """

    def _config(self) -> Config:
        """Return a fresh default Config so tests never share mutable state."""
        cfg = Config()
        return cfg

    def _build_daemon(
        self,
        desktop: FakeDesktop,
        model: FakeModel | FakeHintModel,
        *,
        cfg: Config | None = None,
        verbose: bool = False,
        ai_processor: FakeAIProcessor | None = None,
    ) -> aman.Daemon:
        """Construct a Daemon with the whisper model and LlamaProcessor patched out."""
        active_cfg = cfg if cfg is not None else self._config()
        active_ai_processor = ai_processor or FakeAIProcessor()
        # Patches only need to cover Daemon.__init__, which is where both
        # the model and the processor are built.
        with patch("aman._build_whisper_model", return_value=model), patch(
            "aman.LlamaProcessor", return_value=active_ai_processor
        ):
            return aman.Daemon(active_cfg, desktop, verbose=verbose)

    @patch("aman.stop_audio_recording", return_value=FakeAudio(8))
    @patch("aman.start_audio_recording", return_value=(object(), object()))
    def test_toggle_start_stop_injects_text(self, _start_mock, _stop_mock):
        """toggle() twice records then injects the transcript via the desktop."""
        desktop = FakeDesktop()
        daemon = self._build_daemon(desktop, FakeModel(), verbose=False)
        # Run the stop/process path inline instead of on a worker thread.
        daemon._start_stop_worker = (
            lambda stream, record, trigger, process_audio: daemon._stop_and_process(
                stream, record, trigger, process_audio
            )
        )
        daemon.toggle()
        self.assertEqual(daemon.get_state(), aman.State.RECORDING)
        daemon.toggle()
        self.assertEqual(daemon.get_state(), aman.State.IDLE)
        self.assertEqual(desktop.inject_calls, [("hello world", "clipboard", False)])

    @patch("aman.stop_audio_recording", return_value=FakeAudio(8))
    @patch("aman.start_audio_recording", return_value=(object(), object()))
    def test_shutdown_stops_recording_without_injection(self, _start_mock, _stop_mock):
        """shutdown() while recording returns to IDLE and never injects text."""
        desktop = FakeDesktop()
        daemon = self._build_daemon(desktop, FakeModel(), verbose=False)
        daemon._start_stop_worker = (
            lambda stream, record, trigger, process_audio: daemon._stop_and_process(
                stream, record, trigger, process_audio
            )
        )
        daemon.toggle()
        self.assertEqual(daemon.get_state(), aman.State.RECORDING)
        self.assertTrue(daemon.shutdown(timeout=0.2))
        self.assertEqual(daemon.get_state(), aman.State.IDLE)
        self.assertEqual(desktop.inject_calls, [])

    @patch("aman.stop_audio_recording", return_value=FakeAudio(8))
    @patch("aman.start_audio_recording", return_value=(object(), object()))
    def test_dictionary_replacement_applies_after_ai(self, _start_mock, _stop_mock):
        """Vocabulary replacements are applied to the text that gets injected."""
        desktop = FakeDesktop()
        model = FakeModel(text="good morning martha")
        cfg = self._config()
        cfg.vocabulary.replacements = [VocabularyReplacement(source="Martha", target="Marta")]
        daemon = self._build_daemon(desktop, model, cfg=cfg, verbose=False)
        daemon._start_stop_worker = (
            lambda stream, record, trigger, process_audio: daemon._stop_and_process(
                stream, record, trigger, process_audio
            )
        )
        daemon.toggle()
        daemon.toggle()
        # Note: replacement matched case-insensitively here ("martha" -> "Marta").
        self.assertEqual(desktop.inject_calls, [("good morning Marta", "clipboard", False)])

    @patch("aman.stop_audio_recording", return_value=FakeAudio(8))
    @patch("aman.start_audio_recording", return_value=(object(), object()))
    def test_editor_failure_aborts_output_injection(self, _start_mock, _stop_mock):
        """If the AI editor raises, nothing is injected and state returns to IDLE."""
        desktop = FakeDesktop()
        model = FakeModel(text="hello world")
        ai_processor = FakeAIProcessor()
        ai_processor.process_error = RuntimeError("editor boom")
        daemon = self._build_daemon(
            desktop,
            model,
            verbose=False,
            ai_processor=ai_processor,
        )
        daemon._start_stop_worker = (
            lambda stream, record, trigger, process_audio: daemon._stop_and_process(
                stream, record, trigger, process_audio
            )
        )
        daemon.toggle()
        daemon.toggle()
        self.assertEqual(desktop.inject_calls, [])
        self.assertEqual(daemon.get_state(), aman.State.IDLE)

    @patch("aman.stop_audio_recording", return_value=FakeAudio(8))
    @patch("aman.start_audio_recording", return_value=(object(), object()))
    def test_live_path_uses_asr_words_for_alignment_correction(self, _start_mock, _stop_mock):
        """Word-level ASR metadata drives self-correction ("6 i mean 7" -> "7")."""
        desktop = FakeDesktop()
        ai_processor = FakeAIProcessor()
        daemon = self._build_daemon(desktop, FakeModel(), verbose=False, ai_processor=ai_processor)
        # Bypass the real ASR stage with a fixture that includes word timings.
        daemon.asr_stage.transcribe = lambda _audio: _asr_result(
            "set alarm for 6 i mean 7",
            ["set", "alarm", "for", "6", "i", "mean", "7"],
            language="en",
        )
        daemon._start_stop_worker = (
            lambda stream, record, trigger, process_audio: daemon._stop_and_process(
                stream, record, trigger, process_audio
            )
        )
        daemon.toggle()
        daemon.toggle()
        self.assertEqual(desktop.inject_calls, [("set alarm for 7", "clipboard", False)])
        # The detected language should flow through to the editor pipeline.
        self.assertEqual(ai_processor.last_kwargs.get("lang"), "en")

    @patch("aman.stop_audio_recording", return_value=FakeAudio(8))
    @patch("aman.start_audio_recording", return_value=(object(), object()))
    def test_live_path_calls_word_aware_pipeline_entrypoint(self, _start_mock, _stop_mock):
        """Live ASR data is routed through PipelineEngine.run_asr_result verbatim."""
        desktop = FakeDesktop()
        daemon = self._build_daemon(desktop, FakeModel(), verbose=False)
        asr_result = _asr_result(
            "set alarm for 6 i mean 7",
            ["set", "alarm", "for", "6", "i", "mean", "7"],
            language="en",
        )
        daemon.asr_stage.transcribe = lambda _audio: asr_result
        daemon._start_stop_worker = (
            lambda stream, record, trigger, process_audio: daemon._stop_and_process(
                stream, record, trigger, process_audio
            )
        )
        # wraps= keeps the real implementation running while recording the call.
        with patch.object(daemon.pipeline, "run_asr_result", wraps=daemon.pipeline.run_asr_result) as run_asr:
            daemon.toggle()
            daemon.toggle()
        run_asr.assert_called_once()
        # Identity check: the exact AsrResult object must survive, not a copy.
        self.assertIs(run_asr.call_args.args[0], asr_result)

    def test_transcribe_skips_hints_when_model_does_not_support_them(self):
        """Models without hint kwargs are called without hotwords/initial_prompt."""
        desktop = FakeDesktop()
        model = FakeModel(text="hello")
        cfg = self._config()
        cfg.vocabulary.terms = ["Docker", "Systemd"]
        daemon = self._build_daemon(desktop, model, cfg=cfg, verbose=False)
        result, used_lang = daemon._transcribe(object())
        self.assertEqual(result, "hello")
        self.assertEqual(used_lang, "auto")
        self.assertNotIn("hotwords", model.last_kwargs)
        self.assertNotIn("initial_prompt", model.last_kwargs)

    def test_transcribe_applies_hints_when_model_supports_them(self):
        """Vocabulary terms and replacement targets are passed as hotwords."""
        desktop = FakeDesktop()
        model = FakeHintModel(text="hello")
        cfg = self._config()
        cfg.vocabulary.terms = ["Systemd"]
        cfg.vocabulary.replacements = [VocabularyReplacement(source="docker", target="Docker")]
        daemon = self._build_daemon(desktop, model, cfg=cfg, verbose=False)
        result, used_lang = daemon._transcribe(object())
        self.assertEqual(result, "hello")
        self.assertEqual(used_lang, "auto")
        self.assertIn("Docker", model.last_kwargs["hotwords"])
        self.assertIn("Systemd", model.last_kwargs["hotwords"])
        self.assertIsNone(model.last_kwargs["initial_prompt"])

    def test_transcribe_uses_configured_language_hint(self):
        """An explicit stt.language is forwarded to the model and reported back."""
        desktop = FakeDesktop()
        model = FakeModel(text="hola")
        cfg = self._config()
        cfg.stt.language = "es"
        daemon = self._build_daemon(desktop, model, cfg=cfg, verbose=False)
        result, used_lang = daemon._transcribe(object())
        self.assertEqual(result, "hola")
        self.assertEqual(used_lang, "es")
        self.assertEqual(model.last_kwargs["language"], "es")

    def test_transcribe_auto_language_omits_language_kwarg(self):
        """With language=auto, no language kwarg is passed at all (not None)."""
        desktop = FakeDesktop()
        model = FakeKwargModel(text="hello")
        cfg = self._config()
        cfg.stt.language = "auto"
        daemon = self._build_daemon(desktop, model, cfg=cfg, verbose=False)
        result, used_lang = daemon._transcribe(object())
        self.assertEqual(result, "hello")
        self.assertEqual(used_lang, "auto")
        self.assertNotIn("language", model.last_kwargs)

    def test_transcribe_falls_back_to_auto_when_hint_is_rejected(self):
        """A rejected language hint triggers a warning and an auto-detect retry."""
        desktop = FakeDesktop()
        model = FakeUnsupportedLanguageModel(text="bonjour")
        cfg = self._config()
        cfg.stt.language = "fr"
        daemon = self._build_daemon(desktop, model, cfg=cfg, verbose=False)
        with self.assertLogs(level="WARNING") as logs:
            result, used_lang = daemon._transcribe(object())
        self.assertEqual(result, "bonjour")
        self.assertEqual(used_lang, "auto")
        # Two model calls: hinted attempt, then auto-detect retry.
        self.assertEqual(len(model.calls), 2)
        self.assertEqual(model.calls[0]["language"], "fr")
        self.assertIsNone(model.calls[1]["language"])
        self.assertTrue(any("falling back to auto-detect" in line for line in logs.output))

    def test_verbose_flag_controls_transcript_logging(self):
        """verbose=True enables transcript logging; verbose=False disables it."""
        desktop = FakeDesktop()
        cfg = self._config()
        daemon = self._build_daemon(desktop, FakeModel(), cfg=cfg, verbose=False)
        self.assertFalse(daemon.log_transcript)
        daemon_verbose = self._build_daemon(desktop, FakeModel(), cfg=cfg, verbose=True)
        self.assertTrue(daemon_verbose.log_transcript)

    def test_editor_stage_is_initialized_during_daemon_init(self):
        """Daemon.__init__ builds LlamaProcessor(verbose=..., model_path=None)."""
        desktop = FakeDesktop()
        with patch("aman._build_whisper_model", return_value=FakeModel()), patch(
            "aman.LlamaProcessor", return_value=FakeAIProcessor()
        ) as processor_cls:
            daemon = aman.Daemon(self._config(), desktop, verbose=True)
        processor_cls.assert_called_once_with(verbose=True, model_path=None)
        self.assertIsNotNone(daemon.editor_stage)

    def test_editor_stage_is_warmed_up_during_daemon_init(self):
        """Init wires the processor into the editor stage and warms it up once."""
        desktop = FakeDesktop()
        ai_processor = FakeAIProcessor()
        with patch("aman._build_whisper_model", return_value=FakeModel()), patch(
            "aman.LlamaProcessor", return_value=ai_processor
        ):
            daemon = aman.Daemon(self._config(), desktop, verbose=False)
        self.assertIs(daemon.editor_stage._processor, ai_processor)
        self.assertEqual(ai_processor.warmup_calls, ["default"])

    def test_editor_stage_warmup_failure_is_fatal_with_strict_startup(self):
        """strict_startup=True converts a warmup failure into a startup error."""
        desktop = FakeDesktop()
        cfg = self._config()
        cfg.advanced.strict_startup = True
        ai_processor = FakeAIProcessor()
        ai_processor.warmup_error = RuntimeError("warmup boom")
        with patch("aman._build_whisper_model", return_value=FakeModel()), patch(
            "aman.LlamaProcessor", return_value=ai_processor
        ):
            with self.assertRaisesRegex(RuntimeError, "editor stage warmup failed"):
                aman.Daemon(cfg, desktop, verbose=False)

    def test_editor_stage_warmup_failure_is_non_fatal_without_strict_startup(self):
        """strict_startup=False logs a warning and keeps the daemon alive."""
        desktop = FakeDesktop()
        cfg = self._config()
        cfg.advanced.strict_startup = False
        ai_processor = FakeAIProcessor()
        ai_processor.warmup_error = RuntimeError("warmup boom")
        with patch("aman._build_whisper_model", return_value=FakeModel()), patch(
            "aman.LlamaProcessor", return_value=ai_processor
        ):
            with self.assertLogs(level="WARNING") as logs:
                daemon = aman.Daemon(cfg, desktop, verbose=False)
        self.assertIs(daemon.editor_stage._processor, ai_processor)
        self.assertTrue(
            any("continuing because advanced.strict_startup=false" in line for line in logs.output)
        )

    @patch("aman.stop_audio_recording", return_value=FakeAudio(8))
    @patch("aman.start_audio_recording", return_value=(object(), object()))
    def test_passes_clipboard_remove_option_to_desktop(self, _start_mock, _stop_mock):
        """injection.remove_transcription_from_clipboard reaches inject_text."""
        desktop = FakeDesktop()
        model = FakeModel(text="hello world")
        cfg = self._config()
        cfg.injection.remove_transcription_from_clipboard = True
        daemon = self._build_daemon(desktop, model, cfg=cfg, verbose=False)
        daemon._start_stop_worker = (
            lambda stream, record, trigger, process_audio: daemon._stop_and_process(
                stream, record, trigger, process_audio
            )
        )
        daemon.toggle()
        daemon.toggle()
        self.assertEqual(desktop.inject_calls, [("hello world", "clipboard", True)])

    def test_state_changes_are_debug_level(self):
        """State transitions are logged at DEBUG, not INFO."""
        desktop = FakeDesktop()
        daemon = self._build_daemon(desktop, FakeModel(), verbose=False)
        with self.assertLogs(level="DEBUG") as logs:
            daemon.set_state(aman.State.RECORDING)
        self.assertTrue(
            any("DEBUG:root:state: idle -> recording" in line for line in logs.output)
        )

    @patch("aman.stop_audio_recording", return_value=FakeAudio(8))
    @patch("aman.start_audio_recording", return_value=(object(), object()))
    def test_cancel_listener_armed_only_while_recording(self, _start_mock, _stop_mock):
        """The cancel listener starts on record start and stops on record stop."""
        desktop = FakeDesktop()
        daemon = self._build_daemon(desktop, FakeModel(), verbose=False)
        daemon._start_stop_worker = (
            lambda stream, record, trigger, process_audio: daemon._stop_and_process(
                stream, record, trigger, process_audio
            )
        )
        self.assertIsNone(desktop.cancel_listener_callback)
        daemon.toggle()
        self.assertEqual(desktop.cancel_listener_start_calls, 1)
        self.assertEqual(desktop.cancel_listener_stop_calls, 0)
        self.assertIsNotNone(desktop.cancel_listener_callback)
        daemon.toggle()
        self.assertEqual(desktop.cancel_listener_start_calls, 1)
        self.assertEqual(desktop.cancel_listener_stop_calls, 1)
        self.assertIsNone(desktop.cancel_listener_callback)

    @patch("aman.start_audio_recording")
    def test_recording_does_not_start_when_cancel_listener_fails(self, start_mock):
        """A failing cancel listener aborts the recording and cleans up the stream."""
        stream = FakeStream()
        start_mock.return_value = (stream, object())
        desktop = FakeDesktop(fail_cancel_listener=True)
        daemon = self._build_daemon(desktop, FakeModel(), verbose=False)
        daemon.toggle()
        self.assertEqual(daemon.get_state(), aman.State.IDLE)
        self.assertIsNone(daemon.stream)
        self.assertIsNone(daemon.record)
        # The already-opened stream must be stopped and closed on abort.
        self.assertEqual(stream.stop_calls, 1)
        self.assertEqual(stream.close_calls, 1)

    @patch("aman.stop_audio_recording", return_value=FakeAudio(8))
    @patch("aman.start_audio_recording", return_value=(object(), object()))
    def test_ai_processor_receives_active_profile(self, _start_mock, _stop_mock):
        """ux.profile is forwarded to the AI processor's process() kwargs."""
        desktop = FakeDesktop()
        cfg = self._config()
        cfg.ux.profile = "fast"
        ai_processor = FakeAIProcessor()
        daemon = self._build_daemon(
            desktop,
            FakeModel(text="hello world"),
            cfg=cfg,
            verbose=False,
            ai_processor=ai_processor,
        )
        daemon._start_stop_worker = (
            lambda stream, record, trigger, process_audio: daemon._stop_and_process(
                stream, record, trigger, process_audio
            )
        )
        daemon.toggle()
        daemon.toggle()
        self.assertEqual(ai_processor.last_kwargs.get("profile"), "fast")

    @patch("aman.stop_audio_recording", return_value=FakeAudio(8))
    @patch("aman.start_audio_recording", return_value=(object(), object()))
    def test_ai_processor_receives_effective_language(self, _start_mock, _stop_mock):
        """The configured stt.language is forwarded as the editor's lang kwarg."""
        desktop = FakeDesktop()
        cfg = self._config()
        cfg.stt.language = "es"
        ai_processor = FakeAIProcessor()
        daemon = self._build_daemon(
            desktop,
            FakeModel(text="hola mundo"),
            cfg=cfg,
            verbose=False,
            ai_processor=ai_processor,
        )
        daemon._start_stop_worker = (
            lambda stream, record, trigger, process_audio: daemon._stop_and_process(
                stream, record, trigger, process_audio
            )
        )
        daemon.toggle()
        daemon.toggle()
        self.assertEqual(ai_processor.last_kwargs.get("lang"), "es")

    @patch("aman.start_audio_recording")
    def test_paused_state_blocks_recording_start(self, start_mock):
        """While paused, toggle() must not start audio or arm the cancel listener."""
        desktop = FakeDesktop()
        daemon = self._build_daemon(desktop, FakeModel(), verbose=False)
        self.assertTrue(daemon.toggle_paused())
        daemon.toggle()
        start_mock.assert_not_called()
        self.assertEqual(daemon.get_state(), aman.State.IDLE)
        self.assertEqual(desktop.cancel_listener_start_calls, 0)
class LockTests(unittest.TestCase):
    """Single-instance lock behavior."""

    def test_lock_rejects_second_instance(self):
        """Acquiring the lock twice in the same runtime dir raises SystemExit."""
        with tempfile.TemporaryDirectory() as runtime_dir:
            # Point the lock file at a throwaway XDG runtime directory.
            with patch.dict(os.environ, {"XDG_RUNTIME_DIR": runtime_dir}, clear=False):
                first = aman._lock_single_instance()
                try:
                    with self.assertRaises(SystemExit) as ctx:
                        aman._lock_single_instance()
                    self.assertIn("already running", str(ctx.exception))
                finally:
                    # Release the lock even if the assertion fails.
                    first.close()
# Allow running this file directly (e.g. `python tests/test_aman.py`).
if __name__ == "__main__":
    unittest.main()