Harden runtime diagnostics for milestone 3

Make the milestone 3 runtime story predictable instead of treating doctor, self-check, and startup failures as loosely related surfaces.

Split doctor and self-check into distinct read-only flows, add tri-state diagnostic status with stable IDs and next steps, and reuse that wording in CLI output, service logs, and tray-triggered diagnostics. Add non-mutating config/model probes, a make runtime-check gate, and public recovery/validation docs for the X11 GA roadmap.

Validation: make runtime-check; PYTHONPATH=src python3 -m unittest discover -s tests -p 'test_*.py'; python3 -m py_compile src/*.py tests/*.py; PYTHONPATH=src python3 -m aman doctor --help; PYTHONPATH=src python3 -m aman self-check --help. Leave milestone 3 open in the roadmap until the manual X11 validation rows are filled.
This commit is contained in:
Thales Maciel 2026-03-12 17:41:23 -03:00
parent a3368056ff
commit ed1b59240b
No known key found for this signature in database
GPG key ID: 33112E6833C34679
16 changed files with 1298 additions and 248 deletions

View file

@ -34,6 +34,13 @@ class ProcessTimings:
total_ms: float
@dataclass(frozen=True)
class ManagedModelStatus:
status: str
path: Path
message: str
_EXAMPLE_CASES = [
{
"id": "corr-time-01",
@ -748,6 +755,32 @@ def ensure_model():
return MODEL_PATH
def probe_managed_model() -> ManagedModelStatus:
if not MODEL_PATH.exists():
return ManagedModelStatus(
status="missing",
path=MODEL_PATH,
message=f"managed editor model is not cached at {MODEL_PATH}",
)
checksum = _sha256_file(MODEL_PATH)
if checksum.casefold() != MODEL_SHA256.casefold():
return ManagedModelStatus(
status="invalid",
path=MODEL_PATH,
message=(
"managed editor model checksum mismatch "
f"(expected {MODEL_SHA256}, got {checksum})"
),
)
return ManagedModelStatus(
status="ready",
path=MODEL_PATH,
message=f"managed editor model is ready at {MODEL_PATH}",
)
def _assert_expected_model_checksum(checksum: str) -> None:
if checksum.casefold() == MODEL_SHA256.casefold():
return

View file

@ -23,7 +23,16 @@ from config import Config, ConfigValidationError, load, redacted_dict, save, val
from constants import DEFAULT_CONFIG_PATH, MODEL_PATH, RECORD_TIMEOUT_SEC
from config_ui import ConfigUiResult, run_config_ui, show_about_dialog, show_help_dialog
from desktop import get_desktop_adapter
from diagnostics import run_diagnostics
from diagnostics import (
doctor_command,
format_diagnostic_line,
format_support_line,
journalctl_command,
run_doctor,
run_self_check,
self_check_command,
verbose_run_command,
)
from engine.pipeline import PipelineEngine
from model_eval import (
build_heuristic_dataset,
@ -286,10 +295,18 @@ def _summarize_bench_runs(runs: list[BenchRunMetrics]) -> BenchSummary:
class Daemon:
def __init__(self, cfg: Config, desktop, *, verbose: bool = False):
def __init__(
self,
cfg: Config,
desktop,
*,
verbose: bool = False,
config_path: Path | None = None,
):
self.cfg = cfg
self.desktop = desktop
self.verbose = verbose
self.config_path = config_path or DEFAULT_CONFIG_PATH
self.lock = threading.Lock()
self._shutdown_requested = threading.Event()
self._paused = False
@ -447,7 +464,12 @@ class Daemon:
try:
stream, record = start_audio_recording(self.cfg.recording.input)
except Exception as exc:
logging.error("record start failed: %s", exc)
_log_support_issue(
logging.ERROR,
"audio.input",
f"record start failed: {exc}",
next_step=f"run `{doctor_command(self.config_path)}` and verify the selected input device",
)
return
if not self._arm_cancel_listener():
try:
@ -509,7 +531,12 @@ class Daemon:
try:
audio = stop_audio_recording(stream, record)
except Exception as exc:
logging.error("record stop failed: %s", exc)
_log_support_issue(
logging.ERROR,
"runtime.audio",
f"record stop failed: {exc}",
next_step=f"rerun `{doctor_command(self.config_path)}` and verify the audio runtime",
)
self.set_state(State.IDLE)
return
@ -518,7 +545,12 @@ class Daemon:
return
if audio.size == 0:
logging.error("no audio captured")
_log_support_issue(
logging.ERROR,
"runtime.audio",
"no audio was captured from the active input device",
next_step="verify the selected microphone level and rerun diagnostics",
)
self.set_state(State.IDLE)
return
@ -526,7 +558,12 @@ class Daemon:
logging.info("stt started")
asr_result = self._transcribe_with_metrics(audio)
except Exception as exc:
logging.error("stt failed: %s", exc)
_log_support_issue(
logging.ERROR,
"startup.readiness",
f"stt failed: {exc}",
next_step=f"run `{self_check_command(self.config_path)}` and then `{verbose_run_command(self.config_path)}`",
)
self.set_state(State.IDLE)
return
@ -555,7 +592,12 @@ class Daemon:
verbose=self.log_transcript,
)
except Exception as exc:
logging.error("editor stage failed: %s", exc)
_log_support_issue(
logging.ERROR,
"model.cache",
f"editor stage failed: {exc}",
next_step=f"run `{self_check_command(self.config_path)}` and inspect `{journalctl_command()}` if the service keeps failing",
)
self.set_state(State.IDLE)
return
@ -580,7 +622,12 @@ class Daemon:
),
)
except Exception as exc:
logging.error("output failed: %s", exc)
_log_support_issue(
logging.ERROR,
"injection.backend",
f"output failed: {exc}",
next_step=f"run `{doctor_command(self.config_path)}` and then `{verbose_run_command(self.config_path)}`",
)
finally:
self.set_state(State.IDLE)
@ -964,8 +1011,8 @@ def _build_parser() -> argparse.ArgumentParser:
doctor_parser = subparsers.add_parser(
"doctor",
help="run preflight diagnostics for config and local environment",
description="Run preflight diagnostics for config and the local environment.",
help="run fast preflight diagnostics for config and local environment",
description="Run fast preflight diagnostics for config and the local environment.",
)
doctor_parser.add_argument("--config", default="", help="path to config.json")
doctor_parser.add_argument("--json", action="store_true", help="print JSON output")
@ -973,8 +1020,8 @@ def _build_parser() -> argparse.ArgumentParser:
self_check_parser = subparsers.add_parser(
"self-check",
help="run installed-system readiness diagnostics",
description="Run installed-system readiness diagnostics.",
help="run deeper installed-system readiness diagnostics without modifying local state",
description="Run deeper installed-system readiness diagnostics without modifying local state.",
)
self_check_parser.add_argument("--config", default="", help="path to config.json")
self_check_parser.add_argument("--json", action="store_true", help="print JSON output")
@ -1095,21 +1142,38 @@ def _configure_logging(verbose: bool) -> None:
)
def _doctor_command(args: argparse.Namespace) -> int:
report = run_diagnostics(args.config)
def _log_support_issue(
level: int,
issue_id: str,
message: str,
*,
next_step: str = "",
) -> None:
logging.log(level, format_support_line(issue_id, message, next_step=next_step))
def _diagnostic_command(
args: argparse.Namespace,
runner,
) -> int:
report = runner(args.config)
if args.json:
print(report.to_json())
else:
for check in report.checks:
status = "OK" if check.ok else "FAIL"
line = f"[{status}] {check.id}: {check.message}"
if check.hint:
line = f"{line} | hint: {check.hint}"
print(line)
print(f"overall: {'ok' if report.ok else 'failed'}")
print(format_diagnostic_line(check))
print(f"overall: {report.status}")
return 0 if report.ok else 2
def _doctor_command(args: argparse.Namespace) -> int:
return _diagnostic_command(args, run_doctor)
def _self_check_command(args: argparse.Namespace) -> int:
return _diagnostic_command(args, run_self_check)
def _read_bench_input_text(args: argparse.Namespace) -> str:
if args.text_file:
try:
@ -1413,7 +1477,12 @@ def _run_command(args: argparse.Namespace) -> int:
try:
desktop = get_desktop_adapter()
except Exception as exc:
logging.error("startup failed: %s", exc)
_log_support_issue(
logging.ERROR,
"session.x11",
f"startup failed: {exc}",
next_step="log into an X11 session and rerun Aman",
)
return 1
if not config_existed_before_start:
@ -1424,23 +1493,43 @@ def _run_command(args: argparse.Namespace) -> int:
try:
cfg = _load_runtime_config(config_path)
except ConfigValidationError as exc:
logging.error("startup failed: invalid config field '%s': %s", exc.field, exc.reason)
_log_support_issue(
logging.ERROR,
"config.load",
f"startup failed: invalid config field '{exc.field}': {exc.reason}",
next_step=f"run `{doctor_command(config_path)}` after fixing the config",
)
if exc.example_fix:
logging.error("example fix: %s", exc.example_fix)
return 1
except Exception as exc:
logging.error("startup failed: %s", exc)
_log_support_issue(
logging.ERROR,
"config.load",
f"startup failed: {exc}",
next_step=f"run `{doctor_command(config_path)}` to inspect config readiness",
)
return 1
try:
validate(cfg)
except ConfigValidationError as exc:
logging.error("startup failed: invalid config field '%s': %s", exc.field, exc.reason)
_log_support_issue(
logging.ERROR,
"config.load",
f"startup failed: invalid config field '{exc.field}': {exc.reason}",
next_step=f"run `{doctor_command(config_path)}` after fixing the config",
)
if exc.example_fix:
logging.error("example fix: %s", exc.example_fix)
return 1
except Exception as exc:
logging.error("startup failed: %s", exc)
_log_support_issue(
logging.ERROR,
"config.load",
f"startup failed: {exc}",
next_step=f"run `{doctor_command(config_path)}` to inspect config readiness",
)
return 1
logging.info("hotkey: %s", cfg.daemon.hotkey)
@ -1463,9 +1552,14 @@ def _run_command(args: argparse.Namespace) -> int:
logging.info("editor backend: local_llama_builtin (%s)", MODEL_PATH)
try:
daemon = Daemon(cfg, desktop, verbose=args.verbose)
daemon = Daemon(cfg, desktop, verbose=args.verbose, config_path=config_path)
except Exception as exc:
logging.error("startup failed: %s", exc)
_log_support_issue(
logging.ERROR,
"startup.readiness",
f"startup failed: {exc}",
next_step=f"run `{self_check_command(config_path)}` and inspect `{journalctl_command()}` if the service still fails",
)
return 1
shutdown_once = threading.Event()
@ -1500,22 +1594,42 @@ def _run_command(args: argparse.Namespace) -> int:
try:
new_cfg = load(str(config_path))
except ConfigValidationError as exc:
logging.error("reload failed: invalid config field '%s': %s", exc.field, exc.reason)
_log_support_issue(
logging.ERROR,
"config.load",
f"reload failed: invalid config field '{exc.field}': {exc.reason}",
next_step=f"run `{doctor_command(config_path)}` after fixing the config",
)
if exc.example_fix:
logging.error("reload example fix: %s", exc.example_fix)
return
except Exception as exc:
logging.error("reload failed: %s", exc)
_log_support_issue(
logging.ERROR,
"config.load",
f"reload failed: {exc}",
next_step=f"run `{doctor_command(config_path)}` to inspect config readiness",
)
return
try:
desktop.start_hotkey_listener(new_cfg.daemon.hotkey, hotkey_callback)
except Exception as exc:
logging.error("reload failed: could not apply hotkey '%s': %s", new_cfg.daemon.hotkey, exc)
_log_support_issue(
logging.ERROR,
"hotkey.parse",
f"reload failed: could not apply hotkey '{new_cfg.daemon.hotkey}': {exc}",
next_step=f"run `{doctor_command(config_path)}` and choose a different hotkey in Settings",
)
return
try:
daemon.apply_config(new_cfg)
except Exception as exc:
logging.error("reload failed: could not apply runtime engines: %s", exc)
_log_support_issue(
logging.ERROR,
"startup.readiness",
f"reload failed: could not apply runtime engines: {exc}",
next_step=f"run `{self_check_command(config_path)}` and then `{verbose_run_command(config_path)}`",
)
return
cfg = new_cfg
logging.info("config reloaded from %s", config_path)
@ -1538,33 +1652,45 @@ def _run_command(args: argparse.Namespace) -> int:
save(config_path, result.config)
desktop.start_hotkey_listener(result.config.daemon.hotkey, hotkey_callback)
except ConfigValidationError as exc:
logging.error("settings apply failed: invalid config field '%s': %s", exc.field, exc.reason)
_log_support_issue(
logging.ERROR,
"config.load",
f"settings apply failed: invalid config field '{exc.field}': {exc.reason}",
next_step=f"run `{doctor_command(config_path)}` after fixing the config",
)
if exc.example_fix:
logging.error("settings example fix: %s", exc.example_fix)
return
except Exception as exc:
logging.error("settings apply failed: %s", exc)
_log_support_issue(
logging.ERROR,
"hotkey.parse",
f"settings apply failed: {exc}",
next_step=f"run `{doctor_command(config_path)}` and check the configured hotkey",
)
return
try:
daemon.apply_config(result.config)
except Exception as exc:
logging.error("settings apply failed: could not apply runtime engines: %s", exc)
_log_support_issue(
logging.ERROR,
"startup.readiness",
f"settings apply failed: could not apply runtime engines: {exc}",
next_step=f"run `{self_check_command(config_path)}` and then `{verbose_run_command(config_path)}`",
)
return
cfg = result.config
logging.info("settings applied from tray")
def run_diagnostics_callback():
report = run_diagnostics(str(config_path))
if report.ok:
logging.info("diagnostics passed (%d checks)", len(report.checks))
report = run_self_check(str(config_path))
if report.status == "ok":
logging.info("diagnostics finished (%s, %d checks)", report.status, len(report.checks))
return
failed = [check for check in report.checks if not check.ok]
logging.warning("diagnostics failed (%d/%d checks)", len(failed), len(report.checks))
for check in failed:
if check.hint:
logging.warning("%s: %s | hint: %s", check.id, check.message, check.hint)
else:
logging.warning("%s: %s", check.id, check.message)
flagged = [check for check in report.checks if check.status != "ok"]
logging.warning("diagnostics finished (%s, %d/%d checks need attention)", report.status, len(flagged), len(report.checks))
for check in flagged:
logging.warning("%s", format_diagnostic_line(check))
def open_config_path_callback():
logging.info("config path: %s", config_path)
@ -1575,7 +1701,12 @@ def _run_command(args: argparse.Namespace) -> int:
hotkey_callback,
)
except Exception as exc:
logging.error("hotkey setup failed: %s", exc)
_log_support_issue(
logging.ERROR,
"hotkey.parse",
f"hotkey setup failed: {exc}",
next_step=f"run `{doctor_command(config_path)}` and choose a different hotkey if needed",
)
return 1
logging.info("ready")
try:
@ -1607,10 +1738,10 @@ def main(argv: list[str] | None = None) -> int:
return _run_command(args)
if args.command == "doctor":
_configure_logging(args.verbose)
return _doctor_command(args)
return _diagnostic_command(args, run_doctor)
if args.command == "self-check":
_configure_logging(args.verbose)
return _doctor_command(args)
return _diagnostic_command(args, run_self_check)
if args.command == "bench":
_configure_logging(args.verbose)
return _bench_command(args)

View file

@ -112,11 +112,10 @@ class Config:
vocabulary: VocabularyConfig = field(default_factory=VocabularyConfig)
def load(path: str | None) -> Config:
def _load_from_path(path: Path, *, create_default: bool) -> Config:
cfg = Config()
p = Path(path) if path else DEFAULT_CONFIG_PATH
if p.exists():
data = json.loads(p.read_text(encoding="utf-8"))
if path.exists():
data = json.loads(path.read_text(encoding="utf-8"))
if not isinstance(data, dict):
_raise_cfg_error(
"config",
@ -128,11 +127,24 @@ def load(path: str | None) -> Config:
validate(cfg)
return cfg
if not create_default:
raise FileNotFoundError(str(path))
validate(cfg)
_write_default_config(p, cfg)
_write_default_config(path, cfg)
return cfg
def load(path: str | None) -> Config:
target = Path(path) if path else DEFAULT_CONFIG_PATH
return _load_from_path(target, create_default=True)
def load_existing(path: str | None) -> Config:
target = Path(path) if path else DEFAULT_CONFIG_PATH
return _load_from_path(target, create_default=False)
def save(path: str | Path | None, cfg: Config) -> Path:
validate(cfg)
target = Path(path) if path else DEFAULT_CONFIG_PATH

View file

@ -1,202 +1,630 @@
from __future__ import annotations
import json
from dataclasses import asdict, dataclass
import os
import shutil
import subprocess
from dataclasses import dataclass
from pathlib import Path
from aiprocess import ensure_model
from config import Config, load
from aiprocess import _load_llama_bindings, probe_managed_model
from config import Config, load_existing
from constants import DEFAULT_CONFIG_PATH, MODEL_DIR
from desktop import get_desktop_adapter
from recorder import resolve_input_device
from recorder import list_input_devices, resolve_input_device
STATUS_OK = "ok"
STATUS_WARN = "warn"
STATUS_FAIL = "fail"
_VALID_STATUSES = {STATUS_OK, STATUS_WARN, STATUS_FAIL}
SERVICE_NAME = "aman"
@dataclass
class DiagnosticCheck:
id: str
ok: bool
status: str
message: str
hint: str = ""
next_step: str = ""
def __post_init__(self) -> None:
if self.status not in _VALID_STATUSES:
raise ValueError(f"invalid diagnostic status: {self.status}")
@property
def ok(self) -> bool:
return self.status != STATUS_FAIL
@property
def hint(self) -> str:
return self.next_step
def to_payload(self) -> dict[str, str | bool]:
return {
"id": self.id,
"status": self.status,
"ok": self.ok,
"message": self.message,
"next_step": self.next_step,
"hint": self.next_step,
}
@dataclass
class DiagnosticReport:
checks: list[DiagnosticCheck]
@property
def status(self) -> str:
if any(check.status == STATUS_FAIL for check in self.checks):
return STATUS_FAIL
if any(check.status == STATUS_WARN for check in self.checks):
return STATUS_WARN
return STATUS_OK
@property
def ok(self) -> bool:
return all(check.ok for check in self.checks)
return self.status != STATUS_FAIL
def to_json(self) -> str:
payload = {"ok": self.ok, "checks": [asdict(check) for check in self.checks]}
payload = {
"status": self.status,
"ok": self.ok,
"checks": [check.to_payload() for check in self.checks],
}
return json.dumps(payload, ensure_ascii=False, indent=2)
def run_diagnostics(config_path: str | None) -> DiagnosticReport:
checks: list[DiagnosticCheck] = []
cfg: Config | None = None
@dataclass
class _ConfigLoadResult:
check: DiagnosticCheck
cfg: Config | None
try:
cfg = load(config_path or "")
checks.append(
DiagnosticCheck(
id="config.load",
ok=True,
message=f"loaded config from {_resolved_config_path(config_path)}",
)
)
except Exception as exc:
checks.append(
DiagnosticCheck(
id="config.load",
ok=False,
message=f"failed to load config: {exc}",
hint=(
"open Settings... from Aman tray to save a valid config, or run "
"`aman init --force` for automation"
),
)
)
checks.extend(_audio_check(cfg))
checks.extend(_hotkey_check(cfg))
checks.extend(_injection_backend_check(cfg))
checks.extend(_provider_check(cfg))
checks.extend(_model_check(cfg))
def doctor_command(config_path: str | Path | None = None) -> str:
return f"aman doctor --config {_resolved_config_path(config_path)}"
def self_check_command(config_path: str | Path | None = None) -> str:
return f"aman self-check --config {_resolved_config_path(config_path)}"
def run_command(config_path: str | Path | None = None) -> str:
return f"aman run --config {_resolved_config_path(config_path)}"
def verbose_run_command(config_path: str | Path | None = None) -> str:
return f"{run_command(config_path)} --verbose"
def journalctl_command() -> str:
return "journalctl --user -u aman -f"
def format_support_line(issue_id: str, message: str, *, next_step: str = "") -> str:
line = f"{issue_id}: {message}"
if next_step:
line = f"{line} | next_step: {next_step}"
return line
def format_diagnostic_line(check: DiagnosticCheck) -> str:
return f"[{check.status.upper()}] {format_support_line(check.id, check.message, next_step=check.next_step)}"
def run_doctor(config_path: str | None) -> DiagnosticReport:
resolved_path = _resolved_config_path(config_path)
config_result = _load_config_check(resolved_path)
session_check = _session_check()
runtime_audio_check, input_devices = _runtime_audio_check(resolved_path)
service_prereq = _service_prereq_check()
checks = [
config_result.check,
session_check,
runtime_audio_check,
_audio_input_check(config_result.cfg, resolved_path, input_devices),
_hotkey_check(config_result.cfg, resolved_path, session_check),
_injection_backend_check(config_result.cfg, resolved_path, session_check),
service_prereq,
]
return DiagnosticReport(checks=checks)
def _audio_check(cfg: Config | None) -> list[DiagnosticCheck]:
if cfg is None:
return [
def run_self_check(config_path: str | None) -> DiagnosticReport:
resolved_path = _resolved_config_path(config_path)
doctor_report = run_doctor(config_path)
checks = list(doctor_report.checks)
by_id = {check.id: check for check in checks}
model_check = _managed_model_check(resolved_path)
cache_check = _cache_writable_check(resolved_path)
unit_check = _service_unit_check(by_id["service.prereq"])
state_check = _service_state_check(by_id["service.prereq"], unit_check)
startup_check = _startup_readiness_check(
config=_config_from_checks(checks),
config_path=resolved_path,
model_check=model_check,
cache_check=cache_check,
)
checks.extend([model_check, cache_check, unit_check, state_check, startup_check])
return DiagnosticReport(checks=checks)
def run_diagnostics(config_path: str | None) -> DiagnosticReport:
return run_doctor(config_path)
def _resolved_config_path(config_path: str | Path | None) -> Path:
if config_path:
return Path(config_path)
return DEFAULT_CONFIG_PATH
def _config_from_checks(checks: list[DiagnosticCheck]) -> Config | None:
for check in checks:
cfg = getattr(check, "_diagnostic_cfg", None)
if cfg is not None:
return cfg
return None
def _load_config_check(config_path: Path) -> _ConfigLoadResult:
if not config_path.exists():
return _ConfigLoadResult(
check=DiagnosticCheck(
id="config.load",
status=STATUS_WARN,
message=f"config file does not exist at {config_path}",
next_step=(
f"run `{run_command(config_path)}` once to open Settings, "
"or run `aman init --force` for automation"
),
),
cfg=None,
)
try:
cfg = load_existing(str(config_path))
except Exception as exc:
return _ConfigLoadResult(
check=DiagnosticCheck(
id="config.load",
status=STATUS_FAIL,
message=f"failed to load config from {config_path}: {exc}",
next_step=(
f"fix {config_path} from Settings or rerun `{doctor_command(config_path)}` "
"after correcting the config"
),
),
cfg=None,
)
check = DiagnosticCheck(
id="config.load",
status=STATUS_OK,
message=f"loaded config from {config_path}",
)
setattr(check, "_diagnostic_cfg", cfg)
return _ConfigLoadResult(check=check, cfg=cfg)
def _session_check() -> DiagnosticCheck:
session_type = os.getenv("XDG_SESSION_TYPE", "").strip().lower()
if session_type == "wayland" or os.getenv("WAYLAND_DISPLAY"):
return DiagnosticCheck(
id="session.x11",
status=STATUS_FAIL,
message="Wayland session detected; Aman supports X11 only",
next_step="log into an X11 session and rerun diagnostics",
)
display = os.getenv("DISPLAY", "").strip()
if not display:
return DiagnosticCheck(
id="session.x11",
status=STATUS_FAIL,
message="DISPLAY is not set; no X11 desktop session is available",
next_step="run diagnostics from the same X11 user session that will run Aman",
)
return DiagnosticCheck(
id="session.x11",
status=STATUS_OK,
message=f"X11 session detected on DISPLAY={display}",
)
def _runtime_audio_check(config_path: Path) -> tuple[DiagnosticCheck, list[dict]]:
try:
devices = list_input_devices()
except Exception as exc:
return (
DiagnosticCheck(
id="audio.input",
ok=False,
message="skipped because config failed to load",
hint="fix config.load first",
)
]
id="runtime.audio",
status=STATUS_FAIL,
message=f"audio runtime is unavailable: {exc}",
next_step=(
f"install the PortAudio runtime dependencies, then rerun `{doctor_command(config_path)}`"
),
),
[],
)
if not devices:
return (
DiagnosticCheck(
id="runtime.audio",
status=STATUS_WARN,
message="audio runtime is available but no input devices were detected",
next_step="connect a microphone or fix the system input device, then rerun diagnostics",
),
devices,
)
return (
DiagnosticCheck(
id="runtime.audio",
status=STATUS_OK,
message=f"audio runtime is available with {len(devices)} input device(s)",
),
devices,
)
def _audio_input_check(
cfg: Config | None,
config_path: Path,
input_devices: list[dict],
) -> DiagnosticCheck:
if cfg is None:
return DiagnosticCheck(
id="audio.input",
status=STATUS_WARN,
message="skipped until config.load is ready",
next_step=f"fix config.load first, then rerun `{doctor_command(config_path)}`",
)
input_spec = cfg.recording.input
explicit = input_spec is not None and (not isinstance(input_spec, str) or bool(input_spec.strip()))
explicit = input_spec is not None and (
not isinstance(input_spec, str) or bool(input_spec.strip())
)
device = resolve_input_device(input_spec)
if device is None and explicit:
return [
DiagnosticCheck(
id="audio.input",
ok=False,
message=f"recording input '{input_spec}' is not resolvable",
hint="set recording.input to a valid device index or matching device name",
)
]
return DiagnosticCheck(
id="audio.input",
status=STATUS_FAIL,
message=f"recording input '{input_spec}' is not resolvable",
next_step="choose a valid recording.input in Settings or set it to a visible input device",
)
if device is None and not input_devices:
return DiagnosticCheck(
id="audio.input",
status=STATUS_WARN,
message="recording input is unset and there is no default input device yet",
next_step="connect a microphone or choose a recording.input in Settings",
)
if device is None:
return [
DiagnosticCheck(
id="audio.input",
ok=True,
message="recording input is unset; default system input will be used",
)
]
return [DiagnosticCheck(id="audio.input", ok=True, message=f"resolved recording input to device {device}")]
return DiagnosticCheck(
id="audio.input",
status=STATUS_OK,
message="recording input is unset; Aman will use the default system input",
)
return DiagnosticCheck(
id="audio.input",
status=STATUS_OK,
message=f"resolved recording input to device {device}",
)
def _hotkey_check(cfg: Config | None) -> list[DiagnosticCheck]:
def _hotkey_check(
cfg: Config | None,
config_path: Path,
session_check: DiagnosticCheck,
) -> DiagnosticCheck:
if cfg is None:
return [
DiagnosticCheck(
id="hotkey.parse",
ok=False,
message="skipped because config failed to load",
hint="fix config.load first",
)
]
return DiagnosticCheck(
id="hotkey.parse",
status=STATUS_WARN,
message="skipped until config.load is ready",
next_step=f"fix config.load first, then rerun `{doctor_command(config_path)}`",
)
if session_check.status == STATUS_FAIL:
return DiagnosticCheck(
id="hotkey.parse",
status=STATUS_WARN,
message="skipped until session.x11 is ready",
next_step="fix session.x11 first, then rerun diagnostics",
)
try:
desktop = get_desktop_adapter()
desktop.validate_hotkey(cfg.daemon.hotkey)
except Exception as exc:
return [
DiagnosticCheck(
id="hotkey.parse",
ok=False,
message=f"hotkey '{cfg.daemon.hotkey}' is not available: {exc}",
hint="pick another daemon.hotkey such as Super+m",
)
]
return [DiagnosticCheck(id="hotkey.parse", ok=True, message=f"hotkey '{cfg.daemon.hotkey}' is valid")]
return DiagnosticCheck(
id="hotkey.parse",
status=STATUS_FAIL,
message=f"hotkey '{cfg.daemon.hotkey}' is not available: {exc}",
next_step="choose a different daemon.hotkey in Settings, then rerun diagnostics",
)
return DiagnosticCheck(
id="hotkey.parse",
status=STATUS_OK,
message=f"hotkey '{cfg.daemon.hotkey}' is available",
)
def _injection_backend_check(cfg: Config | None) -> list[DiagnosticCheck]:
def _injection_backend_check(
cfg: Config | None,
config_path: Path,
session_check: DiagnosticCheck,
) -> DiagnosticCheck:
if cfg is None:
return [
DiagnosticCheck(
id="injection.backend",
ok=False,
message="skipped because config failed to load",
hint="fix config.load first",
)
]
return [
DiagnosticCheck(
return DiagnosticCheck(
id="injection.backend",
ok=True,
message=f"injection backend '{cfg.injection.backend}' is configured",
status=STATUS_WARN,
message="skipped until config.load is ready",
next_step=f"fix config.load first, then rerun `{doctor_command(config_path)}`",
)
]
def _provider_check(cfg: Config | None) -> list[DiagnosticCheck]:
if cfg is None:
return [
DiagnosticCheck(
id="provider.runtime",
ok=False,
message="skipped because config failed to load",
hint="fix config.load first",
)
]
return [
DiagnosticCheck(
id="provider.runtime",
ok=True,
message=f"stt={cfg.stt.provider}, editor=local_llama_builtin",
if session_check.status == STATUS_FAIL:
return DiagnosticCheck(
id="injection.backend",
status=STATUS_WARN,
message="skipped until session.x11 is ready",
next_step="fix session.x11 first, then rerun diagnostics",
)
]
if cfg.injection.backend == "clipboard":
return DiagnosticCheck(
id="injection.backend",
status=STATUS_OK,
message="clipboard injection is configured for X11",
)
return DiagnosticCheck(
id="injection.backend",
status=STATUS_OK,
message=f"X11 key injection backend '{cfg.injection.backend}' is configured",
)
def _model_check(cfg: Config | None) -> list[DiagnosticCheck]:
if cfg is None:
return [
DiagnosticCheck(
id="model.cache",
ok=False,
message="skipped because config failed to load",
hint="fix config.load first",
)
]
if cfg.models.allow_custom_models and cfg.models.whisper_model_path.strip():
path = Path(cfg.models.whisper_model_path)
def _service_prereq_check() -> DiagnosticCheck:
if shutil.which("systemctl") is None:
return DiagnosticCheck(
id="service.prereq",
status=STATUS_FAIL,
message="systemctl is not available; supported daily use requires systemd --user",
next_step="install or use a systemd --user session for the supported Aman service mode",
)
result = _run_systemctl_user(["is-system-running"])
state = (result.stdout or "").strip()
stderr = (result.stderr or "").strip()
if result.returncode == 0 and state == "running":
return DiagnosticCheck(
id="service.prereq",
status=STATUS_OK,
message="systemd --user is available (state=running)",
)
if state == "degraded":
return DiagnosticCheck(
id="service.prereq",
status=STATUS_WARN,
message="systemd --user is available but degraded",
next_step="check your user services and rerun diagnostics before relying on service mode",
)
if stderr:
return DiagnosticCheck(
id="service.prereq",
status=STATUS_FAIL,
message=f"systemd --user is unavailable: {stderr}",
next_step="log into a systemd --user session, then rerun diagnostics",
)
return DiagnosticCheck(
id="service.prereq",
status=STATUS_WARN,
message=f"systemd --user reported state '{state or 'unknown'}'",
next_step="verify the user service manager is healthy before relying on service mode",
)
def _managed_model_check(config_path: Path) -> DiagnosticCheck:
result = probe_managed_model()
if result.status == "ready":
return DiagnosticCheck(
id="model.cache",
status=STATUS_OK,
message=result.message,
)
if result.status == "missing":
return DiagnosticCheck(
id="model.cache",
status=STATUS_WARN,
message=result.message,
next_step=(
"start Aman once on a networked connection so it can download the managed editor model, "
f"then rerun `{self_check_command(config_path)}`"
),
)
return DiagnosticCheck(
id="model.cache",
status=STATUS_FAIL,
message=result.message,
next_step=(
"remove the corrupted managed model cache and rerun Aman on a networked connection, "
f"then rerun `{self_check_command(config_path)}`"
),
)
def _cache_writable_check(config_path: Path) -> DiagnosticCheck:
target = MODEL_DIR
probe_path = target
while not probe_path.exists() and probe_path != probe_path.parent:
probe_path = probe_path.parent
if os.access(probe_path, os.W_OK):
message = (
f"managed model cache directory is writable at {target}"
if target.exists()
else f"managed model cache can be created under {probe_path}"
)
return DiagnosticCheck(
id="cache.writable",
status=STATUS_OK,
message=message,
)
return DiagnosticCheck(
id="cache.writable",
status=STATUS_FAIL,
message=f"managed model cache is not writable under {probe_path}",
next_step=(
f"fix write permissions for {MODEL_DIR}, then rerun `{self_check_command(config_path)}`"
),
)
def _service_unit_check(service_prereq: DiagnosticCheck) -> DiagnosticCheck:
if service_prereq.status == STATUS_FAIL:
return DiagnosticCheck(
id="service.unit",
status=STATUS_WARN,
message="skipped until service.prereq is ready",
next_step="fix service.prereq first, then rerun self-check",
)
result = _run_systemctl_user(
["show", SERVICE_NAME, "--property=FragmentPath", "--value"]
)
fragment_path = (result.stdout or "").strip()
if result.returncode == 0 and fragment_path:
return DiagnosticCheck(
id="service.unit",
status=STATUS_OK,
message=f"user service unit is installed at {fragment_path}",
)
stderr = (result.stderr or "").strip()
if stderr:
return DiagnosticCheck(
id="service.unit",
status=STATUS_FAIL,
message=f"user service unit is unavailable: {stderr}",
next_step="rerun the portable install or reinstall the package-provided user service",
)
return DiagnosticCheck(
id="service.unit",
status=STATUS_FAIL,
message="user service unit is not installed for aman",
next_step="rerun the portable install or reinstall the package-provided user service",
)
def _service_state_check(
service_prereq: DiagnosticCheck,
service_unit: DiagnosticCheck,
) -> DiagnosticCheck:
if service_prereq.status == STATUS_FAIL or service_unit.status == STATUS_FAIL:
return DiagnosticCheck(
id="service.state",
status=STATUS_WARN,
message="skipped until service.prereq and service.unit are ready",
next_step="fix the service prerequisites first, then rerun self-check",
)
enabled_result = _run_systemctl_user(["is-enabled", SERVICE_NAME])
active_result = _run_systemctl_user(["is-active", SERVICE_NAME])
enabled = (enabled_result.stdout or enabled_result.stderr or "").strip()
active = (active_result.stdout or active_result.stderr or "").strip()
if enabled == "enabled" and active == "active":
return DiagnosticCheck(
id="service.state",
status=STATUS_OK,
message="user service is enabled and active",
)
if active == "failed":
return DiagnosticCheck(
id="service.state",
status=STATUS_FAIL,
message="user service is installed but failed to start",
next_step=f"inspect `{journalctl_command()}` to see why aman.service is failing",
)
return DiagnosticCheck(
id="service.state",
status=STATUS_WARN,
message=f"user service state is enabled={enabled or 'unknown'} active={active or 'unknown'}",
next_step=f"run `systemctl --user enable --now {SERVICE_NAME}` and rerun self-check",
)
def _startup_readiness_check(
config: Config | None,
config_path: Path,
model_check: DiagnosticCheck,
cache_check: DiagnosticCheck,
) -> DiagnosticCheck:
if config is None:
return DiagnosticCheck(
id="startup.readiness",
status=STATUS_WARN,
message="skipped until config.load is ready",
next_step=f"fix config.load first, then rerun `{self_check_command(config_path)}`",
)
custom_path = config.models.whisper_model_path.strip()
if custom_path:
path = Path(custom_path)
if not path.exists():
return [
DiagnosticCheck(
id="model.cache",
ok=False,
message=f"custom whisper model path does not exist: {path}",
hint="fix models.whisper_model_path or disable custom model paths",
)
]
try:
model_path = ensure_model()
return [DiagnosticCheck(id="model.cache", ok=True, message=f"editor model is ready at {model_path}")]
except Exception as exc:
return [
DiagnosticCheck(
id="model.cache",
ok=False,
message=f"model is not ready: {exc}",
hint="check internet access and writable cache directory",
return DiagnosticCheck(
id="startup.readiness",
status=STATUS_FAIL,
message=f"custom Whisper model path does not exist: {path}",
next_step="fix models.whisper_model_path or disable custom model paths in Settings",
)
]
try:
from faster_whisper import WhisperModel # type: ignore[import-not-found]
_ = WhisperModel
except ModuleNotFoundError as exc:
return DiagnosticCheck(
id="startup.readiness",
status=STATUS_FAIL,
message=f"Whisper runtime is unavailable: {exc}",
next_step="install Aman's Python runtime dependencies, then rerun self-check",
)
try:
_load_llama_bindings()
except Exception as exc:
return DiagnosticCheck(
id="startup.readiness",
status=STATUS_FAIL,
message=f"editor runtime is unavailable: {exc}",
next_step="install llama-cpp-python and rerun self-check",
)
if cache_check.status == STATUS_FAIL:
return DiagnosticCheck(
id="startup.readiness",
status=STATUS_FAIL,
message="startup is blocked because the managed model cache is not writable",
next_step=cache_check.next_step,
)
if model_check.status == STATUS_FAIL:
return DiagnosticCheck(
id="startup.readiness",
status=STATUS_FAIL,
message="startup is blocked because the managed editor model cache is invalid",
next_step=model_check.next_step,
)
if model_check.status == STATUS_WARN:
return DiagnosticCheck(
id="startup.readiness",
status=STATUS_WARN,
message="startup prerequisites are present, but offline startup is not ready until the managed model is cached",
next_step=model_check.next_step,
)
return DiagnosticCheck(
id="startup.readiness",
status=STATUS_OK,
message="startup prerequisites are ready without requiring downloads",
)
def _resolved_config_path(config_path: str | None) -> Path:
from constants import DEFAULT_CONFIG_PATH
return Path(config_path) if config_path else DEFAULT_CONFIG_PATH
def _run_systemctl_user(args: list[str]) -> subprocess.CompletedProcess[str]:
return subprocess.run(
["systemctl", "--user", *args],
text=True,
capture_output=True,
check=False,
)