Add CLI subcommands and doctor diagnostics

This commit is contained in:
Thales Maciel 2026-02-26 17:38:06 -03:00
parent 9c7d7b35b1
commit ad1af63fac
7 changed files with 385 additions and 13 deletions

View file

@ -15,9 +15,10 @@ from pathlib import Path
from typing import Any from typing import Any
from aiprocess import LlamaProcessor from aiprocess import LlamaProcessor
from config import Config, load, redacted_dict from config import Config, load, redacted_dict, validate
from constants import MODEL_PATH, RECORD_TIMEOUT_SEC, STT_LANGUAGE from constants import DEFAULT_CONFIG_PATH, MODEL_PATH, RECORD_TIMEOUT_SEC, STT_LANGUAGE
from desktop import get_desktop_adapter from desktop import get_desktop_adapter
from diagnostics import run_diagnostics
from recorder import start_recording as start_audio_recording from recorder import start_recording as start_audio_recording
from recorder import stop_recording as stop_audio_recording from recorder import stop_recording as stop_audio_recording
from vocabulary import VocabularyEngine from vocabulary import VocabularyEngine
@ -377,20 +378,80 @@ def _lock_single_instance():
return lock_file return lock_file
def main(): def _build_parser() -> argparse.ArgumentParser:
global _LOCK_HANDLE
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument("--config", default="", help="path to config.json") subparsers = parser.add_subparsers(dest="command")
parser.add_argument("--dry-run", action="store_true", help="log hotkey only")
parser.add_argument("-v", "--verbose", action="store_true", help="enable verbose logs")
args = parser.parse_args()
run_parser = subparsers.add_parser("run", help="run the aman daemon")
run_parser.add_argument("--config", default="", help="path to config.json")
run_parser.add_argument("--dry-run", action="store_true", help="log hotkey only")
run_parser.add_argument("-v", "--verbose", action="store_true", help="enable verbose logs")
doctor_parser = subparsers.add_parser("doctor", help="run startup diagnostics")
doctor_parser.add_argument("--config", default="", help="path to config.json")
doctor_parser.add_argument("--json", action="store_true", help="print JSON output")
doctor_parser.add_argument("-v", "--verbose", action="store_true", help="enable verbose logs")
init_parser = subparsers.add_parser("init", help="write a default config")
init_parser.add_argument("--config", default="", help="path to config.json")
init_parser.add_argument("--force", action="store_true", help="overwrite existing config")
return parser
def _parse_cli_args(argv: list[str]) -> argparse.Namespace:
parser = _build_parser()
normalized_argv = list(argv)
known_commands = {"run", "doctor", "init"}
if not normalized_argv or normalized_argv[0] not in known_commands:
normalized_argv = ["run", *normalized_argv]
return parser.parse_args(normalized_argv)
def _configure_logging(verbose: bool) -> None:
logging.basicConfig( logging.basicConfig(
stream=sys.stderr, stream=sys.stderr,
level=logging.DEBUG if args.verbose else logging.INFO, level=logging.DEBUG if verbose else logging.INFO,
format="aman: %(asctime)s %(levelname)s %(message)s", format="aman: %(asctime)s %(levelname)s %(message)s",
) )
cfg = load(args.config)
def _doctor_command(args: argparse.Namespace) -> int:
report = run_diagnostics(args.config)
if args.json:
print(report.to_json())
else:
for check in report.checks:
status = "OK" if check.ok else "FAIL"
line = f"[{status}] {check.id}: {check.message}"
if check.hint:
line = f"{line} | hint: {check.hint}"
print(line)
print(f"overall: {'ok' if report.ok else 'failed'}")
return 0 if report.ok else 2
def _init_command(args: argparse.Namespace) -> int:
config_path = Path(args.config) if args.config else DEFAULT_CONFIG_PATH
if config_path.exists() and not args.force:
logging.error("init failed: config already exists at %s (use --force to overwrite)", config_path)
return 1
cfg = Config()
validate(cfg)
config_path.parent.mkdir(parents=True, exist_ok=True)
config_path.write_text(f"{json.dumps(redacted_dict(cfg), indent=2)}\n", encoding="utf-8")
logging.info("wrote default config to %s", config_path)
return 0
def _run_command(args: argparse.Namespace) -> int:
global _LOCK_HANDLE
try:
cfg = load(args.config)
except Exception as exc:
logging.error("startup failed: %s", exc)
return 1
_LOCK_HANDLE = _lock_single_instance() _LOCK_HANDLE = _lock_single_instance()
logging.info("hotkey: %s", cfg.daemon.hotkey) logging.info("hotkey: %s", cfg.daemon.hotkey)
@ -415,7 +476,7 @@ def main():
daemon = Daemon(cfg, desktop, verbose=args.verbose) daemon = Daemon(cfg, desktop, verbose=args.verbose)
except Exception as exc: except Exception as exc:
logging.error("startup failed: %s", exc) logging.error("startup failed: %s", exc)
raise SystemExit(1) return 1
shutdown_once = threading.Event() shutdown_once = threading.Event()
@ -441,13 +502,28 @@ def main():
) )
except Exception as exc: except Exception as exc:
logging.error("hotkey setup failed: %s", exc) logging.error("hotkey setup failed: %s", exc)
raise SystemExit(1) return 1
logging.info("ready") logging.info("ready")
try: try:
desktop.run_tray(daemon.get_state, lambda: shutdown("quit requested")) desktop.run_tray(daemon.get_state, lambda: shutdown("quit requested"))
finally: finally:
daemon.shutdown(timeout=1.0) daemon.shutdown(timeout=1.0)
return 0
def main(argv: list[str] | None = None) -> int:
args = _parse_cli_args(list(argv) if argv is not None else sys.argv[1:])
if args.command == "run":
_configure_logging(args.verbose)
return _run_command(args)
if args.command == "doctor":
_configure_logging(args.verbose)
return _doctor_command(args)
if args.command == "init":
_configure_logging(False)
return _init_command(args)
raise RuntimeError(f"unsupported command: {args.command}")
if __name__ == "__main__": if __name__ == "__main__":
main() raise SystemExit(main())

View file

@ -8,6 +8,9 @@ class DesktopAdapter(Protocol):
def start_hotkey_listener(self, hotkey: str, callback: Callable[[], None]) -> None: def start_hotkey_listener(self, hotkey: str, callback: Callable[[], None]) -> None:
raise NotImplementedError raise NotImplementedError
def validate_hotkey(self, hotkey: str) -> None:
raise NotImplementedError
def start_cancel_listener(self, callback: Callable[[], None]) -> None: def start_cancel_listener(self, callback: Callable[[], None]) -> None:
raise NotImplementedError raise NotImplementedError

View file

@ -7,6 +7,9 @@ class WaylandAdapter:
def start_hotkey_listener(self, _hotkey: str, _callback: Callable[[], None]) -> None: def start_hotkey_listener(self, _hotkey: str, _callback: Callable[[], None]) -> None:
raise SystemExit("Wayland hotkeys are not supported yet.") raise SystemExit("Wayland hotkeys are not supported yet.")
def validate_hotkey(self, _hotkey: str) -> None:
raise SystemExit("Wayland hotkeys are not supported yet.")
def start_cancel_listener(self, _callback: Callable[[], None]) -> None: def start_cancel_listener(self, _callback: Callable[[], None]) -> None:
raise SystemExit("Wayland hotkeys are not supported yet.") raise SystemExit("Wayland hotkeys are not supported yet.")

View file

@ -73,6 +73,10 @@ class X11Adapter:
thread = threading.Thread(target=self._listen, args=(mods, keysym, callback), daemon=True) thread = threading.Thread(target=self._listen, args=(mods, keysym, callback), daemon=True)
thread.start() thread.start()
def validate_hotkey(self, hotkey: str) -> None:
mods, keysym = self._parse_hotkey(hotkey)
self._validate_hotkey_registration(mods, keysym)
def start_cancel_listener(self, callback: Callable[[], None]) -> None: def start_cancel_listener(self, callback: Callable[[], None]) -> None:
mods, keysym = self._parse_hotkey("Escape") mods, keysym = self._parse_hotkey("Escape")
with self._cancel_listener_lock: with self._cancel_listener_lock:

159
src/diagnostics.py Normal file
View file

@ -0,0 +1,159 @@
from __future__ import annotations
import json
from dataclasses import asdict, dataclass
from pathlib import Path
from aiprocess import ensure_model
from config import Config, load
from desktop import get_desktop_adapter
from recorder import resolve_input_device
@dataclass
class DiagnosticCheck:
id: str
ok: bool
message: str
hint: str = ""
@dataclass
class DiagnosticReport:
checks: list[DiagnosticCheck]
@property
def ok(self) -> bool:
return all(check.ok for check in self.checks)
def to_json(self) -> str:
payload = {"ok": self.ok, "checks": [asdict(check) for check in self.checks]}
return json.dumps(payload, ensure_ascii=False, indent=2)
def run_diagnostics(config_path: str | None) -> DiagnosticReport:
checks: list[DiagnosticCheck] = []
cfg: Config | None = None
try:
cfg = load(config_path or "")
checks.append(
DiagnosticCheck(
id="config.load",
ok=True,
message=f"loaded config from {_resolved_config_path(config_path)}",
)
)
except Exception as exc:
checks.append(
DiagnosticCheck(
id="config.load",
ok=False,
message=f"failed to load config: {exc}",
hint="run `aman init --force` to regenerate a default config",
)
)
checks.extend(_audio_check(cfg))
checks.extend(_hotkey_check(cfg))
checks.extend(_injection_backend_check(cfg))
checks.extend(_model_check())
return DiagnosticReport(checks=checks)
def _audio_check(cfg: Config | None) -> list[DiagnosticCheck]:
if cfg is None:
return [
DiagnosticCheck(
id="audio.input",
ok=False,
message="skipped because config failed to load",
hint="fix config.load first",
)
]
input_spec = cfg.recording.input
explicit = input_spec is not None and (not isinstance(input_spec, str) or bool(input_spec.strip()))
device = resolve_input_device(input_spec)
if device is None and explicit:
return [
DiagnosticCheck(
id="audio.input",
ok=False,
message=f"recording input '{input_spec}' is not resolvable",
hint="set recording.input to a valid device index or matching device name",
)
]
if device is None:
return [
DiagnosticCheck(
id="audio.input",
ok=True,
message="recording input is unset; default system input will be used",
)
]
return [DiagnosticCheck(id="audio.input", ok=True, message=f"resolved recording input to device {device}")]
def _hotkey_check(cfg: Config | None) -> list[DiagnosticCheck]:
if cfg is None:
return [
DiagnosticCheck(
id="hotkey.parse",
ok=False,
message="skipped because config failed to load",
hint="fix config.load first",
)
]
try:
desktop = get_desktop_adapter()
desktop.validate_hotkey(cfg.daemon.hotkey)
except Exception as exc:
return [
DiagnosticCheck(
id="hotkey.parse",
ok=False,
message=f"hotkey '{cfg.daemon.hotkey}' is not available: {exc}",
hint="pick another daemon.hotkey such as Super+m",
)
]
return [DiagnosticCheck(id="hotkey.parse", ok=True, message=f"hotkey '{cfg.daemon.hotkey}' is valid")]
def _injection_backend_check(cfg: Config | None) -> list[DiagnosticCheck]:
if cfg is None:
return [
DiagnosticCheck(
id="injection.backend",
ok=False,
message="skipped because config failed to load",
hint="fix config.load first",
)
]
return [
DiagnosticCheck(
id="injection.backend",
ok=True,
message=f"injection backend '{cfg.injection.backend}' is configured",
)
]
def _model_check() -> list[DiagnosticCheck]:
try:
model_path = ensure_model()
return [DiagnosticCheck(id="model.cache", ok=True, message=f"model is ready at {model_path}")]
except Exception as exc:
return [
DiagnosticCheck(
id="model.cache",
ok=False,
message=f"model is not ready: {exc}",
hint="check internet access and writable cache directory",
)
]
def _resolved_config_path(config_path: str | None) -> Path:
from constants import DEFAULT_CONFIG_PATH
return Path(config_path) if config_path else DEFAULT_CONFIG_PATH

58
tests/test_aman_cli.py Normal file
View file

@ -0,0 +1,58 @@
import io
import json
import sys
import unittest
from pathlib import Path
from unittest.mock import patch
ROOT = Path(__file__).resolve().parents[1]
SRC = ROOT / "src"
if str(SRC) not in sys.path:
sys.path.insert(0, str(SRC))
import aman
from diagnostics import DiagnosticCheck, DiagnosticReport
class AmanCliTests(unittest.TestCase):
def test_parse_cli_args_defaults_to_run_command(self):
args = aman._parse_cli_args(["--dry-run"])
self.assertEqual(args.command, "run")
self.assertTrue(args.dry_run)
def test_parse_cli_args_doctor_command(self):
args = aman._parse_cli_args(["doctor", "--json"])
self.assertEqual(args.command, "doctor")
self.assertTrue(args.json)
def test_doctor_command_json_output_and_exit_code(self):
report = DiagnosticReport(
checks=[DiagnosticCheck(id="config.load", ok=True, message="ok", hint="")]
)
args = aman._parse_cli_args(["doctor", "--json"])
out = io.StringIO()
with patch("aman.run_diagnostics", return_value=report), patch("sys.stdout", out):
exit_code = aman._doctor_command(args)
self.assertEqual(exit_code, 0)
payload = json.loads(out.getvalue())
self.assertTrue(payload["ok"])
self.assertEqual(payload["checks"][0]["id"], "config.load")
def test_doctor_command_failed_report_returns_exit_code_2(self):
report = DiagnosticReport(
checks=[DiagnosticCheck(id="config.load", ok=False, message="broken", hint="fix")]
)
args = aman._parse_cli_args(["doctor"])
out = io.StringIO()
with patch("aman.run_diagnostics", return_value=report), patch("sys.stdout", out):
exit_code = aman._doctor_command(args)
self.assertEqual(exit_code, 2)
self.assertIn("[FAIL] config.load", out.getvalue())
if __name__ == "__main__":
unittest.main()

69
tests/test_diagnostics.py Normal file
View file

@ -0,0 +1,69 @@
import json
import sys
import unittest
from pathlib import Path
from unittest.mock import patch
ROOT = Path(__file__).resolve().parents[1]
SRC = ROOT / "src"
if str(SRC) not in sys.path:
sys.path.insert(0, str(SRC))
from config import Config
from diagnostics import DiagnosticCheck, DiagnosticReport, run_diagnostics
class _FakeDesktop:
def validate_hotkey(self, _hotkey: str) -> None:
return
class DiagnosticsTests(unittest.TestCase):
def test_run_diagnostics_all_checks_pass(self):
cfg = Config()
with patch("diagnostics.load", return_value=cfg), patch(
"diagnostics.resolve_input_device", return_value=1
), patch("diagnostics.get_desktop_adapter", return_value=_FakeDesktop()), patch(
"diagnostics.ensure_model", return_value=Path("/tmp/model.gguf")
):
report = run_diagnostics("/tmp/config.json")
self.assertTrue(report.ok)
ids = [check.id for check in report.checks]
self.assertEqual(
ids,
["config.load", "audio.input", "hotkey.parse", "injection.backend", "model.cache"],
)
self.assertTrue(all(check.ok for check in report.checks))
def test_run_diagnostics_marks_config_fail_and_skips_dependent_checks(self):
with patch("diagnostics.load", side_effect=ValueError("broken config")), patch(
"diagnostics.ensure_model", return_value=Path("/tmp/model.gguf")
):
report = run_diagnostics("/tmp/config.json")
self.assertFalse(report.ok)
results = {check.id: check for check in report.checks}
self.assertFalse(results["config.load"].ok)
self.assertFalse(results["audio.input"].ok)
self.assertFalse(results["hotkey.parse"].ok)
self.assertFalse(results["injection.backend"].ok)
self.assertTrue(results["model.cache"].ok)
def test_report_json_schema(self):
report = DiagnosticReport(
checks=[
DiagnosticCheck(id="config.load", ok=True, message="ok", hint=""),
DiagnosticCheck(id="model.cache", ok=False, message="nope", hint="fix"),
]
)
payload = json.loads(report.to_json())
self.assertFalse(payload["ok"])
self.assertEqual(payload["checks"][0]["id"], "config.load")
self.assertEqual(payload["checks"][1]["hint"], "fix")
if __name__ == "__main__":
unittest.main()