202 lines
6.1 KiB
Python
202 lines
6.1 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
from dataclasses import asdict, dataclass
|
|
from pathlib import Path
|
|
|
|
from aiprocess import ensure_model
|
|
from config import Config, load
|
|
from desktop import get_desktop_adapter
|
|
from recorder import resolve_input_device
|
|
|
|
|
|
@dataclass
|
|
class DiagnosticCheck:
|
|
id: str
|
|
ok: bool
|
|
message: str
|
|
hint: str = ""
|
|
|
|
|
|
@dataclass
|
|
class DiagnosticReport:
|
|
checks: list[DiagnosticCheck]
|
|
|
|
@property
|
|
def ok(self) -> bool:
|
|
return all(check.ok for check in self.checks)
|
|
|
|
def to_json(self) -> str:
|
|
payload = {"ok": self.ok, "checks": [asdict(check) for check in self.checks]}
|
|
return json.dumps(payload, ensure_ascii=False, indent=2)
|
|
|
|
|
|
def run_diagnostics(config_path: str | None) -> DiagnosticReport:
|
|
checks: list[DiagnosticCheck] = []
|
|
cfg: Config | None = None
|
|
|
|
try:
|
|
cfg = load(config_path or "")
|
|
checks.append(
|
|
DiagnosticCheck(
|
|
id="config.load",
|
|
ok=True,
|
|
message=f"loaded config from {_resolved_config_path(config_path)}",
|
|
)
|
|
)
|
|
except Exception as exc:
|
|
checks.append(
|
|
DiagnosticCheck(
|
|
id="config.load",
|
|
ok=False,
|
|
message=f"failed to load config: {exc}",
|
|
hint=(
|
|
"open Settings... from Aman tray to save a valid config, or run "
|
|
"`aman init --force` for automation"
|
|
),
|
|
)
|
|
)
|
|
|
|
checks.extend(_audio_check(cfg))
|
|
checks.extend(_hotkey_check(cfg))
|
|
checks.extend(_injection_backend_check(cfg))
|
|
checks.extend(_provider_check(cfg))
|
|
checks.extend(_model_check(cfg))
|
|
return DiagnosticReport(checks=checks)
|
|
|
|
|
|
def _audio_check(cfg: Config | None) -> list[DiagnosticCheck]:
|
|
if cfg is None:
|
|
return [
|
|
DiagnosticCheck(
|
|
id="audio.input",
|
|
ok=False,
|
|
message="skipped because config failed to load",
|
|
hint="fix config.load first",
|
|
)
|
|
]
|
|
input_spec = cfg.recording.input
|
|
explicit = input_spec is not None and (not isinstance(input_spec, str) or bool(input_spec.strip()))
|
|
device = resolve_input_device(input_spec)
|
|
if device is None and explicit:
|
|
return [
|
|
DiagnosticCheck(
|
|
id="audio.input",
|
|
ok=False,
|
|
message=f"recording input '{input_spec}' is not resolvable",
|
|
hint="set recording.input to a valid device index or matching device name",
|
|
)
|
|
]
|
|
if device is None:
|
|
return [
|
|
DiagnosticCheck(
|
|
id="audio.input",
|
|
ok=True,
|
|
message="recording input is unset; default system input will be used",
|
|
)
|
|
]
|
|
return [DiagnosticCheck(id="audio.input", ok=True, message=f"resolved recording input to device {device}")]
|
|
|
|
|
|
def _hotkey_check(cfg: Config | None) -> list[DiagnosticCheck]:
|
|
if cfg is None:
|
|
return [
|
|
DiagnosticCheck(
|
|
id="hotkey.parse",
|
|
ok=False,
|
|
message="skipped because config failed to load",
|
|
hint="fix config.load first",
|
|
)
|
|
]
|
|
try:
|
|
desktop = get_desktop_adapter()
|
|
desktop.validate_hotkey(cfg.daemon.hotkey)
|
|
except Exception as exc:
|
|
return [
|
|
DiagnosticCheck(
|
|
id="hotkey.parse",
|
|
ok=False,
|
|
message=f"hotkey '{cfg.daemon.hotkey}' is not available: {exc}",
|
|
hint="pick another daemon.hotkey such as Super+m",
|
|
)
|
|
]
|
|
return [DiagnosticCheck(id="hotkey.parse", ok=True, message=f"hotkey '{cfg.daemon.hotkey}' is valid")]
|
|
|
|
|
|
def _injection_backend_check(cfg: Config | None) -> list[DiagnosticCheck]:
|
|
if cfg is None:
|
|
return [
|
|
DiagnosticCheck(
|
|
id="injection.backend",
|
|
ok=False,
|
|
message="skipped because config failed to load",
|
|
hint="fix config.load first",
|
|
)
|
|
]
|
|
return [
|
|
DiagnosticCheck(
|
|
id="injection.backend",
|
|
ok=True,
|
|
message=f"injection backend '{cfg.injection.backend}' is configured",
|
|
)
|
|
]
|
|
|
|
|
|
def _provider_check(cfg: Config | None) -> list[DiagnosticCheck]:
|
|
if cfg is None:
|
|
return [
|
|
DiagnosticCheck(
|
|
id="provider.runtime",
|
|
ok=False,
|
|
message="skipped because config failed to load",
|
|
hint="fix config.load first",
|
|
)
|
|
]
|
|
return [
|
|
DiagnosticCheck(
|
|
id="provider.runtime",
|
|
ok=True,
|
|
message=f"stt={cfg.stt.provider}, editor=local_llama_builtin",
|
|
)
|
|
]
|
|
|
|
|
|
def _model_check(cfg: Config | None) -> list[DiagnosticCheck]:
|
|
if cfg is None:
|
|
return [
|
|
DiagnosticCheck(
|
|
id="model.cache",
|
|
ok=False,
|
|
message="skipped because config failed to load",
|
|
hint="fix config.load first",
|
|
)
|
|
]
|
|
if cfg.models.allow_custom_models and cfg.models.whisper_model_path.strip():
|
|
path = Path(cfg.models.whisper_model_path)
|
|
if not path.exists():
|
|
return [
|
|
DiagnosticCheck(
|
|
id="model.cache",
|
|
ok=False,
|
|
message=f"custom whisper model path does not exist: {path}",
|
|
hint="fix models.whisper_model_path or disable custom model paths",
|
|
)
|
|
]
|
|
try:
|
|
model_path = ensure_model()
|
|
return [DiagnosticCheck(id="model.cache", ok=True, message=f"editor model is ready at {model_path}")]
|
|
except Exception as exc:
|
|
return [
|
|
DiagnosticCheck(
|
|
id="model.cache",
|
|
ok=False,
|
|
message=f"model is not ready: {exc}",
|
|
hint="check internet access and writable cache directory",
|
|
)
|
|
]
|
|
|
|
|
|
def _resolved_config_path(config_path: str | None) -> Path:
|
|
from constants import DEFAULT_CONFIG_PATH
|
|
|
|
return Path(config_path) if config_path else DEFAULT_CONFIG_PATH
|