diff --git a/src/aman.py b/src/aman.py index 61b28fb..b8699ca 100755 --- a/src/aman.py +++ b/src/aman.py @@ -15,9 +15,10 @@ from pathlib import Path from typing import Any from aiprocess import LlamaProcessor -from config import Config, load, redacted_dict -from constants import MODEL_PATH, RECORD_TIMEOUT_SEC, STT_LANGUAGE +from config import Config, load, redacted_dict, validate +from constants import DEFAULT_CONFIG_PATH, MODEL_PATH, RECORD_TIMEOUT_SEC, STT_LANGUAGE from desktop import get_desktop_adapter +from diagnostics import run_diagnostics from recorder import start_recording as start_audio_recording from recorder import stop_recording as stop_audio_recording from vocabulary import VocabularyEngine @@ -377,20 +378,80 @@ def _lock_single_instance(): return lock_file -def main(): - global _LOCK_HANDLE +def _build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser() - parser.add_argument("--config", default="", help="path to config.json") - parser.add_argument("--dry-run", action="store_true", help="log hotkey only") - parser.add_argument("-v", "--verbose", action="store_true", help="enable verbose logs") - args = parser.parse_args() + subparsers = parser.add_subparsers(dest="command") + run_parser = subparsers.add_parser("run", help="run the aman daemon") + run_parser.add_argument("--config", default="", help="path to config.json") + run_parser.add_argument("--dry-run", action="store_true", help="log hotkey only") + run_parser.add_argument("-v", "--verbose", action="store_true", help="enable verbose logs") + + doctor_parser = subparsers.add_parser("doctor", help="run startup diagnostics") + doctor_parser.add_argument("--config", default="", help="path to config.json") + doctor_parser.add_argument("--json", action="store_true", help="print JSON output") + doctor_parser.add_argument("-v", "--verbose", action="store_true", help="enable verbose logs") + + init_parser = subparsers.add_parser("init", help="write a default config") + init_parser.add_argument("--config", default="", help="path to config.json") + init_parser.add_argument("--force", action="store_true", help="overwrite existing config") + return parser + + +def _parse_cli_args(argv: list[str]) -> argparse.Namespace: + parser = _build_parser() + normalized_argv = list(argv) + known_commands = {"run", "doctor", "init"} + if not normalized_argv or normalized_argv[0] not in known_commands: + normalized_argv = ["run", *normalized_argv] + return parser.parse_args(normalized_argv) + + +def _configure_logging(verbose: bool) -> None: logging.basicConfig( stream=sys.stderr, - level=logging.DEBUG if args.verbose else logging.INFO, + level=logging.DEBUG if verbose else logging.INFO, format="aman: %(asctime)s %(levelname)s %(message)s", ) - cfg = load(args.config) + + +def _doctor_command(args: argparse.Namespace) -> int: + report = run_diagnostics(args.config) + if args.json: + print(report.to_json()) + else: + for check in report.checks: + status = "OK" if check.ok else "FAIL" + line = f"[{status}] {check.id}: {check.message}" + if check.hint: + line = f"{line} | hint: {check.hint}" + print(line) + print(f"overall: {'ok' if report.ok else 'failed'}") + return 0 if report.ok else 2 + + +def _init_command(args: argparse.Namespace) -> int: + config_path = Path(args.config) if args.config else DEFAULT_CONFIG_PATH + if config_path.exists() and not args.force: + logging.error("init failed: config already exists at %s (use --force to overwrite)", config_path) + return 1 + + cfg = Config() + validate(cfg) + config_path.parent.mkdir(parents=True, exist_ok=True) + config_path.write_text(f"{json.dumps(redacted_dict(cfg), indent=2)}\n", encoding="utf-8") + logging.info("wrote default config to %s", config_path) + return 0 + + +def _run_command(args: argparse.Namespace) -> int: + global _LOCK_HANDLE + + try: + cfg = load(args.config) + except Exception as exc: + logging.error("startup failed: %s", exc) + return 1 _LOCK_HANDLE = _lock_single_instance() logging.info("hotkey: %s", cfg.daemon.hotkey) @@ -415,7 +476,7 @@ def main(): daemon = Daemon(cfg, desktop, verbose=args.verbose) except Exception as exc: logging.error("startup failed: %s", exc) - raise SystemExit(1) + return 1 shutdown_once = threading.Event() @@ -441,13 +502,28 @@ def main(): ) except Exception as exc: logging.error("hotkey setup failed: %s", exc) - raise SystemExit(1) + return 1 logging.info("ready") try: desktop.run_tray(daemon.get_state, lambda: shutdown("quit requested")) finally: daemon.shutdown(timeout=1.0) + return 0 + + +def main(argv: list[str] | None = None) -> int: + args = _parse_cli_args(list(argv) if argv is not None else sys.argv[1:]) + if args.command == "run": + _configure_logging(args.verbose) + return _run_command(args) + if args.command == "doctor": + _configure_logging(args.verbose) + return _doctor_command(args) + if args.command == "init": + _configure_logging(False) + return _init_command(args) + raise RuntimeError(f"unsupported command: {args.command}") if __name__ == "__main__": - main() + raise SystemExit(main()) diff --git a/src/desktop.py b/src/desktop.py index d378bc4..4e448d5 100644 --- a/src/desktop.py +++ b/src/desktop.py @@ -8,6 +8,9 @@ class DesktopAdapter(Protocol): def start_hotkey_listener(self, hotkey: str, callback: Callable[[], None]) -> None: raise NotImplementedError + def validate_hotkey(self, hotkey: str) -> None: + raise NotImplementedError + def start_cancel_listener(self, callback: Callable[[], None]) -> None: raise NotImplementedError diff --git a/src/desktop_wayland.py b/src/desktop_wayland.py index 8e194f2..26c2894 100644 --- a/src/desktop_wayland.py +++ b/src/desktop_wayland.py @@ -7,6 +7,9 @@ class WaylandAdapter: def start_hotkey_listener(self, _hotkey: str, _callback: Callable[[], None]) -> None: raise SystemExit("Wayland hotkeys are not supported yet.") + def validate_hotkey(self, _hotkey: str) -> None: + raise SystemExit("Wayland hotkeys are not supported yet.") + def start_cancel_listener(self, _callback: Callable[[], None]) -> None: raise SystemExit("Wayland hotkeys are not supported yet.") diff --git a/src/desktop_x11.py b/src/desktop_x11.py index ba4bda6..cfaeeee 100644 --- a/src/desktop_x11.py +++ b/src/desktop_x11.py @@ -73,6 +73,10 @@ class X11Adapter: thread = threading.Thread(target=self._listen, args=(mods, keysym, callback), daemon=True) thread.start() + def validate_hotkey(self, hotkey: str) -> None: + mods, keysym = self._parse_hotkey(hotkey) + self._validate_hotkey_registration(mods, keysym) + def start_cancel_listener(self, callback: Callable[[], None]) -> None: mods, keysym = self._parse_hotkey("Escape") with self._cancel_listener_lock: diff --git a/src/diagnostics.py b/src/diagnostics.py new file mode 100644 index 0000000..cca0bec --- /dev/null +++ b/src/diagnostics.py @@ -0,0 +1,159 @@ +from __future__ import annotations + +import json +from dataclasses import asdict, dataclass +from pathlib import Path + +from aiprocess import ensure_model +from config import Config, load +from desktop import get_desktop_adapter +from recorder import resolve_input_device + + +@dataclass +class DiagnosticCheck: + id: str + ok: bool + message: str + hint: str = "" + + +@dataclass +class DiagnosticReport: + checks: list[DiagnosticCheck] + + @property + def ok(self) -> bool: + return all(check.ok for check in self.checks) + + def to_json(self) -> str: + payload = {"ok": self.ok, "checks": [asdict(check) for check in self.checks]} + return json.dumps(payload, ensure_ascii=False, indent=2) + + +def run_diagnostics(config_path: str | None) -> DiagnosticReport: + checks: list[DiagnosticCheck] = [] + cfg: Config | None = None + + try: + cfg = load(config_path or "") + checks.append( + DiagnosticCheck( + id="config.load", + ok=True, + message=f"loaded config from {_resolved_config_path(config_path)}", + ) + ) + except Exception as exc: + checks.append( + DiagnosticCheck( + id="config.load", + ok=False, + message=f"failed to load config: {exc}", + hint="run `aman init --force` to regenerate a default config", + ) + ) + + checks.extend(_audio_check(cfg)) + checks.extend(_hotkey_check(cfg)) + checks.extend(_injection_backend_check(cfg)) + checks.extend(_model_check()) + return DiagnosticReport(checks=checks) + + +def _audio_check(cfg: Config | None) -> list[DiagnosticCheck]: + if cfg is None: + return [ + DiagnosticCheck( + id="audio.input", + ok=False, + message="skipped because config failed to load", + hint="fix config.load first", + ) + ] + input_spec = cfg.recording.input + explicit = input_spec is not None and (not isinstance(input_spec, str) or bool(input_spec.strip())) + device = resolve_input_device(input_spec) + if device is None and explicit: + return [ + DiagnosticCheck( + id="audio.input", + ok=False, + message=f"recording input '{input_spec}' is not resolvable", + hint="set recording.input to a valid device index or matching device name", + ) + ] + if device is None: + return [ + DiagnosticCheck( + id="audio.input", + ok=True, + message="recording input is unset; default system input will be used", + ) + ] + return [DiagnosticCheck(id="audio.input", ok=True, message=f"resolved recording input to device {device}")] + + +def _hotkey_check(cfg: Config | None) -> list[DiagnosticCheck]: + if cfg is None: + return [ + DiagnosticCheck( + id="hotkey.parse", + ok=False, + message="skipped because config failed to load", + hint="fix config.load first", + ) + ] + try: + desktop = get_desktop_adapter() + desktop.validate_hotkey(cfg.daemon.hotkey) + except Exception as exc: + return [ + DiagnosticCheck( + id="hotkey.parse", + ok=False, + message=f"hotkey '{cfg.daemon.hotkey}' is not available: {exc}", + hint="pick another daemon.hotkey such as Super+m", + ) + ] + return [DiagnosticCheck(id="hotkey.parse", ok=True, message=f"hotkey '{cfg.daemon.hotkey}' is valid")] + + +def _injection_backend_check(cfg: Config | None) -> list[DiagnosticCheck]: + if cfg is None: + return [ + DiagnosticCheck( + id="injection.backend", + ok=False, + message="skipped because config failed to load", + hint="fix config.load first", + ) + ] + return [ + DiagnosticCheck( + id="injection.backend", + ok=True, + message=f"injection backend '{cfg.injection.backend}' is configured", + ) + ] + + +def _model_check() -> list[DiagnosticCheck]: + try: + model_path = ensure_model() + return [DiagnosticCheck(id="model.cache", ok=True, message=f"model is ready at {model_path}")] + except Exception as exc: + return [ + DiagnosticCheck( + id="model.cache", + ok=False, + message=f"model is not ready: {exc}", + hint="check internet access and writable cache directory", + ) + ] + + +def _resolved_config_path(config_path: str | None) -> Path: + from constants import DEFAULT_CONFIG_PATH + + return Path(config_path) if config_path else DEFAULT_CONFIG_PATH diff --git a/tests/test_aman_cli.py b/tests/test_aman_cli.py new file mode 100644 index 0000000..7f47b17 --- /dev/null +++ b/tests/test_aman_cli.py @@ -0,0 +1,58 @@ +import io +import json +import sys +import unittest +from pathlib import Path +from unittest.mock import patch + +ROOT = Path(__file__).resolve().parents[1] +SRC = ROOT / "src" +if str(SRC) not in sys.path: + sys.path.insert(0, str(SRC)) + +import aman +from diagnostics import DiagnosticCheck, DiagnosticReport + + +class AmanCliTests(unittest.TestCase): + def test_parse_cli_args_defaults_to_run_command(self): + args = aman._parse_cli_args(["--dry-run"]) + + self.assertEqual(args.command, "run") + self.assertTrue(args.dry_run) + + def test_parse_cli_args_doctor_command(self): + args = aman._parse_cli_args(["doctor", "--json"]) + + self.assertEqual(args.command, "doctor") + self.assertTrue(args.json) + + def test_doctor_command_json_output_and_exit_code(self): + report = DiagnosticReport( + checks=[DiagnosticCheck(id="config.load", ok=True, message="ok", hint="")] + ) + args = aman._parse_cli_args(["doctor", "--json"]) + out = io.StringIO() + with patch("aman.run_diagnostics", return_value=report), patch("sys.stdout", out): + exit_code = aman._doctor_command(args) + + self.assertEqual(exit_code, 0) + payload = json.loads(out.getvalue()) + self.assertTrue(payload["ok"]) + self.assertEqual(payload["checks"][0]["id"], "config.load") + + def test_doctor_command_failed_report_returns_exit_code_2(self): + report = DiagnosticReport( + checks=[DiagnosticCheck(id="config.load", ok=False, message="broken", hint="fix")] + ) + args = aman._parse_cli_args(["doctor"]) + out = io.StringIO() + with patch("aman.run_diagnostics", return_value=report), patch("sys.stdout", out): + exit_code = aman._doctor_command(args) + + self.assertEqual(exit_code, 2) + self.assertIn("[FAIL] config.load", out.getvalue()) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_diagnostics.py b/tests/test_diagnostics.py new file mode 100644 index 0000000..2938828 --- /dev/null +++ b/tests/test_diagnostics.py @@ -0,0 +1,69 @@ +import json +import sys +import unittest +from pathlib import Path +from unittest.mock import patch + +ROOT = Path(__file__).resolve().parents[1] +SRC = ROOT / "src" +if str(SRC) not in sys.path: + sys.path.insert(0, str(SRC)) + +from config import Config +from diagnostics import DiagnosticCheck, DiagnosticReport, run_diagnostics + + +class _FakeDesktop: + def validate_hotkey(self, _hotkey: str) -> None: + return + + +class DiagnosticsTests(unittest.TestCase): + def test_run_diagnostics_all_checks_pass(self): + cfg = Config() + with patch("diagnostics.load", return_value=cfg), patch( + "diagnostics.resolve_input_device", return_value=1 + ), patch("diagnostics.get_desktop_adapter", return_value=_FakeDesktop()), patch( + "diagnostics.ensure_model", return_value=Path("/tmp/model.gguf") + ): + report = run_diagnostics("/tmp/config.json") + + self.assertTrue(report.ok) + ids = [check.id for check in report.checks] + self.assertEqual( + ids, + ["config.load", "audio.input", "hotkey.parse", "injection.backend", "model.cache"], + ) + self.assertTrue(all(check.ok for check in report.checks)) + + def test_run_diagnostics_marks_config_fail_and_skips_dependent_checks(self): + with patch("diagnostics.load", side_effect=ValueError("broken config")), patch( + "diagnostics.ensure_model", return_value=Path("/tmp/model.gguf") + ): + report = run_diagnostics("/tmp/config.json") + + self.assertFalse(report.ok) + results = {check.id: check for check in report.checks} + self.assertFalse(results["config.load"].ok) + self.assertFalse(results["audio.input"].ok) + self.assertFalse(results["hotkey.parse"].ok) + self.assertFalse(results["injection.backend"].ok) + self.assertTrue(results["model.cache"].ok) + + def test_report_json_schema(self): + report = DiagnosticReport( + checks=[ + DiagnosticCheck(id="config.load", ok=True, message="ok", hint=""), + DiagnosticCheck(id="model.cache", ok=False, message="nope", hint="fix"), + ] + ) + + payload = json.loads(report.to_json()) + + self.assertFalse(payload["ok"]) + self.assertEqual(payload["checks"][0]["id"], "config.load") + self.assertEqual(payload["checks"][1]["hint"], "fix") + + +if __name__ == "__main__": + unittest.main()