Split aman.py into focused CLI and runtime modules

Break the old god module into flat siblings for CLI parsing, run lifecycle, daemon state, shared processing helpers, benchmark tooling, and maintainer-only model sync so changes stop sharing one giant import graph.

Keep aman as a thin shim over aman_cli, move sync-default-model behind the hidden aman-maint entrypoint plus Make wrappers, and update packaging metadata plus maintainer docs to reflect the new surface.

Retarget the tests to the new seams with dedicated runtime, run, benchmark, maintainer, and entrypoint suites. Verified with:

python3 -m unittest discover -s tests -p "test_*.py"
python3 -m py_compile src/*.py tests/*.py
PYTHONPATH=src python3 -m aman --help
PYTHONPATH=src python3 -m aman version
PYTHONPATH=src python3 -m aman_maint --help
Thales Maciel 2026-03-14 14:54:57 -03:00
parent 721248ca26
commit 4d0081d1d0
18 changed files with 2838 additions and 2427 deletions


@@ -2,9 +2,14 @@
## Project Structure & Module Organization
- `src/aman.py` is the primary entrypoint (X11 STT daemon).
- `src/aman.py` is the thin console/module entrypoint shim.
- `src/aman_cli.py` owns the main end-user CLI parser and dispatch.
- `src/aman_run.py` owns foreground runtime startup, tray wiring, and settings flow.
- `src/aman_runtime.py` owns the daemon lifecycle and runtime state machine.
- `src/aman_benchmarks.py` owns `bench`, `eval-models`, and heuristic dataset tooling.
- `src/aman_model_sync.py` and `src/aman_maint.py` own maintainer-only model promotion flows.
- `src/recorder.py` handles audio capture using PortAudio via `sounddevice`.
- `src/aman.py` owns Whisper setup and transcription.
- `src/aman_processing.py` owns shared Whisper/editor pipeline helpers.
- `src/aiprocess.py` runs the in-process Llama-3.2-3B cleanup.
- `src/desktop_x11.py` encapsulates X11 hotkeys, tray, and injection.
- `src/desktop_wayland.py` scaffolds Wayland support (exits with a message).
@@ -13,7 +18,7 @@
- Install deps (X11): `uv sync`.
- Install deps (Wayland scaffold): `uv sync --extra wayland`.
- Run daemon: `uv run python3 src/aman.py --config ~/.config/aman/config.json`.
- Run daemon: `uv run aman run --config ~/.config/aman/config.json`.
System packages (example names):


@@ -32,7 +32,7 @@ self-check:
uv run aman self-check --config $(CONFIG)
runtime-check:
$(PYTHON) -m unittest tests.test_diagnostics tests.test_aman_cli tests.test_aman tests.test_aiprocess
$(PYTHON) -m unittest tests.test_diagnostics tests.test_aman_cli tests.test_aman_run tests.test_aman_runtime tests.test_aiprocess
build-heuristic-dataset:
uv run aman build-heuristic-dataset --input $(EVAL_HEURISTIC_RAW) --output $(EVAL_HEURISTIC_DATASET)
@@ -41,10 +41,10 @@ eval-models: build-heuristic-dataset
uv run aman eval-models --dataset $(EVAL_DATASET) --matrix $(EVAL_MATRIX) --heuristic-dataset $(EVAL_HEURISTIC_DATASET) --heuristic-weight $(EVAL_HEURISTIC_WEIGHT) --output $(EVAL_OUTPUT)
sync-default-model:
uv run aman sync-default-model --report $(EVAL_OUTPUT) --artifacts $(MODEL_ARTIFACTS) --constants $(CONSTANTS_FILE)
uv run aman-maint sync-default-model --report $(EVAL_OUTPUT) --artifacts $(MODEL_ARTIFACTS) --constants $(CONSTANTS_FILE)
check-default-model:
uv run aman sync-default-model --check --report $(EVAL_OUTPUT) --artifacts $(MODEL_ARTIFACTS) --constants $(CONSTANTS_FILE)
uv run aman-maint sync-default-model --check --report $(EVAL_OUTPUT) --artifacts $(MODEL_ARTIFACTS) --constants $(CONSTANTS_FILE)
sync:
uv sync


@@ -67,7 +67,6 @@ aman run --config ~/.config/aman/config.json
aman bench --text "example transcript" --repeat 5 --warmup 1
aman build-heuristic-dataset --input benchmarks/heuristics_dataset.raw.jsonl --output benchmarks/heuristics_dataset.jsonl --json
aman eval-models --dataset benchmarks/cleanup_dataset.jsonl --matrix benchmarks/model_matrix.small_first.json --heuristic-dataset benchmarks/heuristics_dataset.jsonl --heuristic-weight 0.25 --json
aman sync-default-model --check --report benchmarks/results/latest.json --artifacts benchmarks/model_artifacts.json --constants src/constants.py
aman version
aman init --config ~/.config/aman/config.json --force
```
@@ -88,14 +87,20 @@ alignment/editor/fact-guard/vocabulary cleanup and prints timing summaries.
```bash
aman build-heuristic-dataset --input benchmarks/heuristics_dataset.raw.jsonl --output benchmarks/heuristics_dataset.jsonl
aman eval-models --dataset benchmarks/cleanup_dataset.jsonl --matrix benchmarks/model_matrix.small_first.json --heuristic-dataset benchmarks/heuristics_dataset.jsonl --heuristic-weight 0.25 --output benchmarks/results/latest.json
aman sync-default-model --report benchmarks/results/latest.json --artifacts benchmarks/model_artifacts.json --constants src/constants.py
make sync-default-model
```
- `eval-models` runs a structured model/parameter sweep over a JSONL dataset
and outputs latency plus quality metrics.
- When `--heuristic-dataset` is provided, the report also includes
alignment-heuristic quality metrics.
- `sync-default-model` promotes the report winner to the managed default model
constants and can be run in `--check` mode for CI and release gates.
- `make sync-default-model` promotes the report winner to the managed default
model constants and `make check-default-model` keeps that drift check in CI.
Internal maintainer CLI:
```bash
aman-maint sync-default-model --check --report benchmarks/results/latest.json --artifacts benchmarks/model_artifacts.json --constants src/constants.py
```
Dataset and artifact details live in [`benchmarks/README.md`](../benchmarks/README.md).


@@ -36,6 +36,7 @@ dependencies = [
[project.scripts]
aman = "aman:main"
aman-maint = "aman_maint:main"
[project.optional-dependencies]
wayland = []
@@ -52,6 +53,13 @@ packages = ["engine", "stages"]
py-modules = [
"aiprocess",
"aman",
"aman_benchmarks",
"aman_cli",
"aman_maint",
"aman_model_sync",
"aman_processing",
"aman_run",
"aman_runtime",
"config",
"config_ui",
"constants",

File diff suppressed because it is too large.

src/aman_benchmarks.py (new file, +363)

@@ -0,0 +1,363 @@
from __future__ import annotations

import json
import logging
import statistics
from dataclasses import asdict, dataclass
from pathlib import Path

from config import ConfigValidationError, load, validate
from constants import DEFAULT_CONFIG_PATH
from engine.pipeline import PipelineEngine
from model_eval import (
    build_heuristic_dataset,
    format_model_eval_summary,
    report_to_json,
    run_model_eval,
)
from vocabulary import VocabularyEngine

from aman_processing import build_editor_stage, process_transcript_pipeline


@dataclass
class BenchRunMetrics:
    run_index: int
    input_chars: int
    asr_ms: float
    alignment_ms: float
    alignment_applied: int
    fact_guard_ms: float
    fact_guard_action: str
    fact_guard_violations: int
    editor_ms: float
    editor_pass1_ms: float
    editor_pass2_ms: float
    vocabulary_ms: float
    total_ms: float
    output_chars: int


@dataclass
class BenchSummary:
    runs: int
    min_total_ms: float
    max_total_ms: float
    avg_total_ms: float
    p50_total_ms: float
    p95_total_ms: float
    avg_asr_ms: float
    avg_alignment_ms: float
    avg_alignment_applied: float
    avg_fact_guard_ms: float
    avg_fact_guard_violations: float
    fallback_runs: int
    rejected_runs: int
    avg_editor_ms: float
    avg_editor_pass1_ms: float
    avg_editor_pass2_ms: float
    avg_vocabulary_ms: float


@dataclass
class BenchReport:
    config_path: str
    editor_backend: str
    profile: str
    stt_language: str
    warmup_runs: int
    measured_runs: int
    runs: list[BenchRunMetrics]
    summary: BenchSummary


def _percentile(values: list[float], quantile: float) -> float:
    if not values:
        return 0.0
    ordered = sorted(values)
    idx = int(round((len(ordered) - 1) * quantile))
    idx = min(max(idx, 0), len(ordered) - 1)
    return ordered[idx]
def _summarize_bench_runs(runs: list[BenchRunMetrics]) -> BenchSummary:
if not runs:
return BenchSummary(
runs=0,
min_total_ms=0.0,
max_total_ms=0.0,
avg_total_ms=0.0,
p50_total_ms=0.0,
p95_total_ms=0.0,
avg_asr_ms=0.0,
avg_alignment_ms=0.0,
avg_alignment_applied=0.0,
avg_fact_guard_ms=0.0,
avg_fact_guard_violations=0.0,
fallback_runs=0,
rejected_runs=0,
avg_editor_ms=0.0,
avg_editor_pass1_ms=0.0,
avg_editor_pass2_ms=0.0,
avg_vocabulary_ms=0.0,
)
totals = [item.total_ms for item in runs]
asr = [item.asr_ms for item in runs]
alignment = [item.alignment_ms for item in runs]
alignment_applied = [item.alignment_applied for item in runs]
fact_guard = [item.fact_guard_ms for item in runs]
fact_guard_violations = [item.fact_guard_violations for item in runs]
fallback_runs = sum(1 for item in runs if item.fact_guard_action == "fallback")
rejected_runs = sum(1 for item in runs if item.fact_guard_action == "rejected")
editor = [item.editor_ms for item in runs]
editor_pass1 = [item.editor_pass1_ms for item in runs]
editor_pass2 = [item.editor_pass2_ms for item in runs]
vocab = [item.vocabulary_ms for item in runs]
return BenchSummary(
runs=len(runs),
min_total_ms=min(totals),
max_total_ms=max(totals),
avg_total_ms=sum(totals) / len(totals),
p50_total_ms=statistics.median(totals),
p95_total_ms=_percentile(totals, 0.95),
avg_asr_ms=sum(asr) / len(asr),
avg_alignment_ms=sum(alignment) / len(alignment),
avg_alignment_applied=sum(alignment_applied) / len(alignment_applied),
avg_fact_guard_ms=sum(fact_guard) / len(fact_guard),
avg_fact_guard_violations=sum(fact_guard_violations)
/ len(fact_guard_violations),
fallback_runs=fallback_runs,
rejected_runs=rejected_runs,
avg_editor_ms=sum(editor) / len(editor),
avg_editor_pass1_ms=sum(editor_pass1) / len(editor_pass1),
avg_editor_pass2_ms=sum(editor_pass2) / len(editor_pass2),
avg_vocabulary_ms=sum(vocab) / len(vocab),
)
def _read_bench_input_text(args) -> str:
    if args.text_file:
        try:
            return Path(args.text_file).read_text(encoding="utf-8")
        except Exception as exc:
            raise RuntimeError(
                f"failed to read bench text file '{args.text_file}': {exc}"
            ) from exc
    return args.text
def bench_command(args) -> int:
config_path = Path(args.config) if args.config else DEFAULT_CONFIG_PATH
if args.repeat < 1:
logging.error("bench failed: --repeat must be >= 1")
return 1
if args.warmup < 0:
logging.error("bench failed: --warmup must be >= 0")
return 1
try:
cfg = load(str(config_path))
validate(cfg)
except ConfigValidationError as exc:
logging.error(
"bench failed: invalid config field '%s': %s",
exc.field,
exc.reason,
)
if exc.example_fix:
logging.error("bench example fix: %s", exc.example_fix)
return 1
except Exception as exc:
logging.error("bench failed: %s", exc)
return 1
try:
transcript_input = _read_bench_input_text(args)
except Exception as exc:
logging.error("bench failed: %s", exc)
return 1
if not transcript_input.strip():
logging.error("bench failed: input transcript cannot be empty")
return 1
try:
editor_stage = build_editor_stage(cfg, verbose=args.verbose)
editor_stage.warmup()
except Exception as exc:
logging.error("bench failed: could not initialize editor stage: %s", exc)
return 1
vocabulary = VocabularyEngine(cfg.vocabulary)
pipeline = PipelineEngine(
asr_stage=None,
editor_stage=editor_stage,
vocabulary=vocabulary,
safety_enabled=cfg.safety.enabled,
safety_strict=cfg.safety.strict,
)
stt_lang = cfg.stt.language
logging.info(
"bench started: editor=local_llama_builtin profile=%s language=%s "
"warmup=%d repeat=%d",
cfg.ux.profile,
stt_lang,
args.warmup,
args.repeat,
)
for run_idx in range(args.warmup):
try:
process_transcript_pipeline(
transcript_input,
stt_lang=stt_lang,
pipeline=pipeline,
suppress_ai_errors=False,
verbose=args.verbose,
)
except Exception as exc:
logging.error("bench failed during warmup run %d: %s", run_idx + 1, exc)
return 2
runs: list[BenchRunMetrics] = []
last_output = ""
for run_idx in range(args.repeat):
try:
output, timings = process_transcript_pipeline(
transcript_input,
stt_lang=stt_lang,
pipeline=pipeline,
suppress_ai_errors=False,
verbose=args.verbose,
)
except Exception as exc:
logging.error("bench failed during measured run %d: %s", run_idx + 1, exc)
return 2
last_output = output
metric = BenchRunMetrics(
run_index=run_idx + 1,
input_chars=len(transcript_input),
asr_ms=timings.asr_ms,
alignment_ms=timings.alignment_ms,
alignment_applied=timings.alignment_applied,
fact_guard_ms=timings.fact_guard_ms,
fact_guard_action=timings.fact_guard_action,
fact_guard_violations=timings.fact_guard_violations,
editor_ms=timings.editor_ms,
editor_pass1_ms=timings.editor_pass1_ms,
editor_pass2_ms=timings.editor_pass2_ms,
vocabulary_ms=timings.vocabulary_ms,
total_ms=timings.total_ms,
output_chars=len(output),
)
runs.append(metric)
logging.debug(
"bench run %d/%d: asr=%.2fms align=%.2fms applied=%d guard=%.2fms "
"(action=%s violations=%d) editor=%.2fms "
"(pass1=%.2fms pass2=%.2fms) vocab=%.2fms total=%.2fms",
metric.run_index,
args.repeat,
metric.asr_ms,
metric.alignment_ms,
metric.alignment_applied,
metric.fact_guard_ms,
metric.fact_guard_action,
metric.fact_guard_violations,
metric.editor_ms,
metric.editor_pass1_ms,
metric.editor_pass2_ms,
metric.vocabulary_ms,
metric.total_ms,
)
summary = _summarize_bench_runs(runs)
report = BenchReport(
config_path=str(config_path),
editor_backend="local_llama_builtin",
profile=cfg.ux.profile,
stt_language=stt_lang,
warmup_runs=args.warmup,
measured_runs=args.repeat,
runs=runs,
summary=summary,
)
if args.json:
print(json.dumps(asdict(report), indent=2))
else:
print(
"bench summary: "
f"runs={summary.runs} "
f"total_ms(avg={summary.avg_total_ms:.2f} p50={summary.p50_total_ms:.2f} "
f"p95={summary.p95_total_ms:.2f} min={summary.min_total_ms:.2f} "
f"max={summary.max_total_ms:.2f}) "
f"asr_ms(avg={summary.avg_asr_ms:.2f}) "
f"align_ms(avg={summary.avg_alignment_ms:.2f} "
f"applied_avg={summary.avg_alignment_applied:.2f}) "
f"guard_ms(avg={summary.avg_fact_guard_ms:.2f} "
f"viol_avg={summary.avg_fact_guard_violations:.2f} "
f"fallback={summary.fallback_runs} rejected={summary.rejected_runs}) "
f"editor_ms(avg={summary.avg_editor_ms:.2f} "
f"pass1_avg={summary.avg_editor_pass1_ms:.2f} "
f"pass2_avg={summary.avg_editor_pass2_ms:.2f}) "
f"vocab_ms(avg={summary.avg_vocabulary_ms:.2f})"
)
if args.print_output:
print(last_output)
return 0
def eval_models_command(args) -> int:
try:
report = run_model_eval(
args.dataset,
args.matrix,
heuristic_dataset_path=(args.heuristic_dataset.strip() or None),
heuristic_weight=args.heuristic_weight,
report_version=args.report_version,
verbose=args.verbose,
)
except Exception as exc:
logging.error("eval-models failed: %s", exc)
return 1
payload = report_to_json(report)
if args.output:
try:
output_path = Path(args.output)
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text(f"{payload}\n", encoding="utf-8")
except Exception as exc:
logging.error("eval-models failed to write output report: %s", exc)
return 1
logging.info("wrote eval-models report: %s", args.output)
if args.json:
print(payload)
else:
print(format_model_eval_summary(report))
winner_name = str(report.get("winner_recommendation", {}).get("name", "")).strip()
if not winner_name:
return 2
return 0
def build_heuristic_dataset_command(args) -> int:
try:
summary = build_heuristic_dataset(args.input, args.output)
except Exception as exc:
logging.error("build-heuristic-dataset failed: %s", exc)
return 1
if args.json:
print(json.dumps(summary, indent=2, ensure_ascii=False))
else:
print(
"heuristic dataset built: "
f"raw_rows={summary.get('raw_rows', 0)} "
f"written_rows={summary.get('written_rows', 0)} "
f"generated_word_rows={summary.get('generated_word_rows', 0)} "
f"output={summary.get('output_path', '')}"
)
return 0
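The `_percentile` helper in this module uses nearest-rank selection (scale the quantile to an index, round, and clamp) rather than interpolating between neighbors. A minimal standalone sketch of that behavior, with made-up sample data:

```python
# Nearest-rank percentile, mirroring the _percentile helper above:
# sort, scale the quantile to an index, round, and clamp to the bounds.
def percentile(values: list[float], quantile: float) -> float:
    if not values:
        return 0.0
    ordered = sorted(values)
    idx = int(round((len(ordered) - 1) * quantile))
    idx = min(max(idx, 0), len(ordered) - 1)
    return ordered[idx]


# Every result is an actual sample, never an interpolated midpoint.
samples = [float(n) for n in range(1, 11)]  # 1.0 .. 10.0
print(percentile(samples, 0.50))
print(percentile(samples, 0.95))
print(percentile([], 0.95))
```

Note that for an even-length list this p50 can differ from `statistics.median`, which is presumably why the summary uses `statistics.median` for `p50_total_ms` and `_percentile` only for p95.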

src/aman_cli.py (new file, +328)

@@ -0,0 +1,328 @@
from __future__ import annotations

import argparse
import importlib.metadata
import json
import logging
import sys
from pathlib import Path

from config import Config, ConfigValidationError, save
from constants import DEFAULT_CONFIG_PATH
from diagnostics import (
    format_diagnostic_line,
    run_doctor,
    run_self_check,
)

LEGACY_MAINT_COMMANDS = {"sync-default-model"}


def _local_project_version() -> str | None:
    pyproject_path = Path(__file__).resolve().parents[1] / "pyproject.toml"
    if not pyproject_path.exists():
        return None
    for line in pyproject_path.read_text(encoding="utf-8").splitlines():
        stripped = line.strip()
        if stripped.startswith('version = "'):
            return stripped.split('"')[1]
    return None


def app_version() -> str:
    local_version = _local_project_version()
    if local_version:
        return local_version
    try:
        return importlib.metadata.version("aman")
    except importlib.metadata.PackageNotFoundError:
        return "0.0.0-dev"
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description=(
"Aman is an X11 dictation daemon for Linux desktops. "
"Use `run` for foreground setup/support, `doctor` for fast preflight "
"checks, and `self-check` for deeper installed-system readiness."
),
epilog=(
"Supported daily use is the systemd --user service. "
"For recovery: doctor -> self-check -> journalctl -> "
"aman run --verbose."
),
)
subparsers = parser.add_subparsers(dest="command")
run_parser = subparsers.add_parser(
"run",
help="run Aman in the foreground for setup, support, or debugging",
description="Run Aman in the foreground for setup, support, or debugging.",
)
run_parser.add_argument("--config", default="", help="path to config.json")
run_parser.add_argument("--dry-run", action="store_true", help="log hotkey only")
run_parser.add_argument(
"-v",
"--verbose",
action="store_true",
help="enable verbose logs",
)
doctor_parser = subparsers.add_parser(
"doctor",
help="run fast preflight diagnostics for config and local environment",
description="Run fast preflight diagnostics for config and the local environment.",
)
doctor_parser.add_argument("--config", default="", help="path to config.json")
doctor_parser.add_argument("--json", action="store_true", help="print JSON output")
doctor_parser.add_argument(
"-v",
"--verbose",
action="store_true",
help="enable verbose logs",
)
self_check_parser = subparsers.add_parser(
"self-check",
help="run deeper installed-system readiness diagnostics without modifying local state",
description=(
"Run deeper installed-system readiness diagnostics without modifying "
"local state."
),
)
self_check_parser.add_argument("--config", default="", help="path to config.json")
self_check_parser.add_argument("--json", action="store_true", help="print JSON output")
self_check_parser.add_argument(
"-v",
"--verbose",
action="store_true",
help="enable verbose logs",
)
bench_parser = subparsers.add_parser(
"bench",
help="run the processing flow from input text without stt or injection",
)
bench_parser.add_argument("--config", default="", help="path to config.json")
bench_input = bench_parser.add_mutually_exclusive_group(required=True)
bench_input.add_argument("--text", default="", help="input transcript text")
bench_input.add_argument(
"--text-file",
default="",
help="path to transcript text file",
)
bench_parser.add_argument(
"--repeat",
type=int,
default=1,
help="number of measured runs",
)
bench_parser.add_argument(
"--warmup",
type=int,
default=1,
help="number of warmup runs",
)
bench_parser.add_argument("--json", action="store_true", help="print JSON output")
bench_parser.add_argument(
"--print-output",
action="store_true",
help="print final processed output text",
)
bench_parser.add_argument(
"-v",
"--verbose",
action="store_true",
help="enable verbose logs",
)
eval_parser = subparsers.add_parser(
"eval-models",
help="evaluate model/parameter matrices against expected outputs",
)
eval_parser.add_argument(
"--dataset",
required=True,
help="path to evaluation dataset (.jsonl)",
)
eval_parser.add_argument(
"--matrix",
required=True,
help="path to model matrix (.json)",
)
eval_parser.add_argument(
"--heuristic-dataset",
default="",
help="optional path to heuristic alignment dataset (.jsonl)",
)
eval_parser.add_argument(
"--heuristic-weight",
type=float,
default=0.25,
help="weight for heuristic score contribution to combined ranking (0.0-1.0)",
)
eval_parser.add_argument(
"--report-version",
type=int,
default=2,
help="report schema version to emit",
)
eval_parser.add_argument(
"--output",
default="",
help="optional path to write full JSON report",
)
eval_parser.add_argument("--json", action="store_true", help="print JSON output")
eval_parser.add_argument(
"-v",
"--verbose",
action="store_true",
help="enable verbose logs",
)
heuristic_builder = subparsers.add_parser(
"build-heuristic-dataset",
help="build a canonical heuristic dataset from a raw JSONL source",
)
heuristic_builder.add_argument(
"--input",
required=True,
help="path to raw heuristic dataset (.jsonl)",
)
heuristic_builder.add_argument(
"--output",
required=True,
help="path to canonical heuristic dataset (.jsonl)",
)
heuristic_builder.add_argument(
"--json",
action="store_true",
help="print JSON summary output",
)
heuristic_builder.add_argument(
"-v",
"--verbose",
action="store_true",
help="enable verbose logs",
)
subparsers.add_parser("version", help="print aman version")
init_parser = subparsers.add_parser("init", help="write a default config")
init_parser.add_argument("--config", default="", help="path to config.json")
init_parser.add_argument(
"--force",
action="store_true",
help="overwrite existing config",
)
return parser
def parse_cli_args(argv: list[str]) -> argparse.Namespace:
    parser = build_parser()
    normalized_argv = list(argv)
    known_commands = {
        "run",
        "doctor",
        "self-check",
        "bench",
        "eval-models",
        "build-heuristic-dataset",
        "version",
        "init",
    }
    if normalized_argv and normalized_argv[0] in {"-h", "--help"}:
        return parser.parse_args(normalized_argv)
    if normalized_argv and normalized_argv[0] in LEGACY_MAINT_COMMANDS:
        parser.error(
            "`sync-default-model` moved to `aman-maint sync-default-model` "
            "(or use `make sync-default-model`)."
        )
    if not normalized_argv or normalized_argv[0] not in known_commands:
        normalized_argv = ["run", *normalized_argv]
    return parser.parse_args(normalized_argv)


def configure_logging(verbose: bool) -> None:
    logging.basicConfig(
        stream=sys.stderr,
        level=logging.DEBUG if verbose else logging.INFO,
        format="aman: %(asctime)s %(levelname)s %(message)s",
    )


def diagnostic_command(args, runner) -> int:
    report = runner(args.config)
    if args.json:
        print(report.to_json())
    else:
        for check in report.checks:
            print(format_diagnostic_line(check))
        print(f"overall: {report.status}")
    return 0 if report.ok else 2


def doctor_command(args) -> int:
    return diagnostic_command(args, run_doctor)


def self_check_command(args) -> int:
    return diagnostic_command(args, run_self_check)


def version_command(_args) -> int:
    print(app_version())
    return 0


def init_command(args) -> int:
    config_path = Path(args.config) if args.config else DEFAULT_CONFIG_PATH
    if config_path.exists() and not args.force:
        logging.error(
            "init failed: config already exists at %s (use --force to overwrite)",
            config_path,
        )
        return 1
    cfg = Config()
    save(config_path, cfg)
    logging.info("wrote default config to %s", config_path)
    return 0


def main(argv: list[str] | None = None) -> int:
    args = parse_cli_args(list(argv) if argv is not None else sys.argv[1:])
    if args.command == "run":
        configure_logging(args.verbose)
        from aman_run import run_command

        return run_command(args)
    if args.command == "doctor":
        configure_logging(args.verbose)
        return diagnostic_command(args, run_doctor)
    if args.command == "self-check":
        configure_logging(args.verbose)
        return diagnostic_command(args, run_self_check)
    if args.command == "bench":
        configure_logging(args.verbose)
        from aman_benchmarks import bench_command

        return bench_command(args)
    if args.command == "eval-models":
        configure_logging(args.verbose)
        from aman_benchmarks import eval_models_command

        return eval_models_command(args)
    if args.command == "build-heuristic-dataset":
        configure_logging(args.verbose)
        from aman_benchmarks import build_heuristic_dataset_command

        return build_heuristic_dataset_command(args)
    if args.command == "version":
        configure_logging(False)
        return version_command(args)
    if args.command == "init":
        configure_logging(False)
        return init_command(args)
    raise RuntimeError(f"unsupported command: {args.command}")
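`parse_cli_args` above normalizes argv so that a bare `aman --config …` still reaches the `run` subcommand. A cut-down sketch of that default-subcommand pattern (toy command set; the real parser also special-cases help and legacy commands):

```python
import argparse

KNOWN_COMMANDS = {"run", "doctor", "version"}


def build_toy_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(prog="toy")
    sub = parser.add_subparsers(dest="command")
    run = sub.add_parser("run")
    run.add_argument("--config", default="")
    sub.add_parser("doctor")
    sub.add_parser("version")
    return parser


def parse(argv: list[str]) -> argparse.Namespace:
    # If the first token is not a known subcommand (e.g. only flags were
    # given), rewrite argv as `run ...` so `run` acts as the default.
    if not argv or argv[0] not in KNOWN_COMMANDS:
        argv = ["run", *argv]
    return build_toy_parser().parse_args(argv)


print(parse([]).command)
print(parse(["--config", "x.json"]).config)
print(parse(["doctor"]).command)
```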

src/aman_maint.py (new file, +70)

@@ -0,0 +1,70 @@
from __future__ import annotations

import argparse
import logging
import sys


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(
        description="Maintainer commands for Aman release and packaging workflows."
    )
    subparsers = parser.add_subparsers(dest="command")
    subparsers.required = True
    sync_model_parser = subparsers.add_parser(
        "sync-default-model",
        help="sync managed editor model constants with benchmark winner report",
    )
    sync_model_parser.add_argument(
        "--report",
        default="benchmarks/results/latest.json",
        help="path to winner report JSON",
    )
    sync_model_parser.add_argument(
        "--artifacts",
        default="benchmarks/model_artifacts.json",
        help="path to model artifact registry JSON",
    )
    sync_model_parser.add_argument(
        "--constants",
        default="src/constants.py",
        help="path to constants module to update/check",
    )
    sync_model_parser.add_argument(
        "--check",
        action="store_true",
        help="check only; exit non-zero if constants do not match winner",
    )
    sync_model_parser.add_argument(
        "--json",
        action="store_true",
        help="print JSON summary output",
    )
    return parser


def parse_args(argv: list[str]) -> argparse.Namespace:
    return build_parser().parse_args(argv)


def _configure_logging() -> None:
    logging.basicConfig(
        stream=sys.stderr,
        level=logging.INFO,
        format="aman: %(asctime)s %(levelname)s %(message)s",
    )


def main(argv: list[str] | None = None) -> int:
    args = parse_args(list(argv) if argv is not None else sys.argv[1:])
    _configure_logging()
    if args.command == "sync-default-model":
        from aman_model_sync import sync_default_model_command

        return sync_default_model_command(args)
    raise RuntimeError(f"unsupported maintainer command: {args.command}")


if __name__ == "__main__":
    raise SystemExit(main())
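Both entrypoints import command implementations only after dispatch, so `--help` stays fast even when a command pulls in heavy modules. A generic sketch of that lazy-import dispatch (stdlib modules stand in for the real command modules):

```python
import importlib

# Command table maps names to (module, function); nothing in the table
# is imported until a command is actually dispatched.
COMMANDS = {
    "median": ("statistics", "median"),
    "mean": ("statistics", "mean"),
}


def dispatch(command: str, values: list[float]) -> float:
    module_name, func_name = COMMANDS[command]
    module = importlib.import_module(module_name)  # lazy import happens here
    return getattr(module, func_name)(values)


print(dispatch("median", [1.0, 3.0, 2.0]))
```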

src/aman_model_sync.py (new file, +239)

@@ -0,0 +1,239 @@
from __future__ import annotations

import ast
import json
import logging
from pathlib import Path
from typing import Any


def _read_json_file(path: Path) -> Any:
    if not path.exists():
        raise RuntimeError(f"file does not exist: {path}")
    try:
        return json.loads(path.read_text(encoding="utf-8"))
    except Exception as exc:
        raise RuntimeError(f"invalid json file '{path}': {exc}") from exc


def _load_winner_name(report_path: Path) -> str:
    payload = _read_json_file(report_path)
    if not isinstance(payload, dict):
        raise RuntimeError(f"model report must be an object: {report_path}")
    winner = payload.get("winner_recommendation")
    if not isinstance(winner, dict):
        raise RuntimeError(
            f"report is missing winner_recommendation object: {report_path}"
        )
    winner_name = str(winner.get("name", "")).strip()
    if not winner_name:
        raise RuntimeError(
            f"winner_recommendation.name is missing in report: {report_path}"
        )
    return winner_name
def _load_model_artifact(artifacts_path: Path, model_name: str) -> dict[str, str]:
payload = _read_json_file(artifacts_path)
if not isinstance(payload, dict):
raise RuntimeError(f"artifact registry must be an object: {artifacts_path}")
models_raw = payload.get("models")
if not isinstance(models_raw, list):
raise RuntimeError(
f"artifact registry missing 'models' array: {artifacts_path}"
)
wanted = model_name.strip().casefold()
for row in models_raw:
if not isinstance(row, dict):
continue
name = str(row.get("name", "")).strip()
if not name:
continue
if name.casefold() != wanted:
continue
filename = str(row.get("filename", "")).strip()
url = str(row.get("url", "")).strip()
sha256 = str(row.get("sha256", "")).strip().lower()
is_hex = len(sha256) == 64 and all(
ch in "0123456789abcdef" for ch in sha256
)
if not filename or not url or not is_hex:
raise RuntimeError(
f"artifact '{name}' is missing filename/url/sha256 in {artifacts_path}"
)
return {
"name": name,
"filename": filename,
"url": url,
"sha256": sha256,
}
raise RuntimeError(
f"winner '{model_name}' is not present in artifact registry: {artifacts_path}"
)
def _load_model_constants(constants_path: Path) -> dict[str, str]:
if not constants_path.exists():
raise RuntimeError(f"constants file does not exist: {constants_path}")
source = constants_path.read_text(encoding="utf-8")
try:
tree = ast.parse(source, filename=str(constants_path))
except Exception as exc:
raise RuntimeError(
f"failed to parse constants module '{constants_path}': {exc}"
) from exc
target_names = {"MODEL_NAME", "MODEL_URL", "MODEL_SHA256"}
values: dict[str, str] = {}
for node in tree.body:
if not isinstance(node, ast.Assign):
continue
for target in node.targets:
if not isinstance(target, ast.Name):
continue
if target.id not in target_names:
continue
try:
value = ast.literal_eval(node.value)
except Exception as exc:
raise RuntimeError(
f"failed to evaluate {target.id} from {constants_path}: {exc}"
) from exc
if not isinstance(value, str):
raise RuntimeError(f"{target.id} must be a string in {constants_path}")
values[target.id] = value
missing = sorted(name for name in target_names if name not in values)
if missing:
raise RuntimeError(
f"constants file is missing required assignments: {', '.join(missing)}"
)
return values
def _write_model_constants(
constants_path: Path,
*,
model_name: str,
model_url: str,
model_sha256: str,
) -> None:
source = constants_path.read_text(encoding="utf-8")
try:
tree = ast.parse(source, filename=str(constants_path))
except Exception as exc:
raise RuntimeError(
f"failed to parse constants module '{constants_path}': {exc}"
) from exc
line_ranges: dict[str, tuple[int, int]] = {}
for node in tree.body:
if not isinstance(node, ast.Assign):
continue
start = getattr(node, "lineno", None)
end = getattr(node, "end_lineno", None)
if start is None or end is None:
continue
for target in node.targets:
if not isinstance(target, ast.Name):
continue
if target.id in {"MODEL_NAME", "MODEL_URL", "MODEL_SHA256"}:
line_ranges[target.id] = (int(start), int(end))
missing = sorted(
name
for name in ("MODEL_NAME", "MODEL_URL", "MODEL_SHA256")
if name not in line_ranges
)
if missing:
raise RuntimeError(
f"constants file is missing assignments to update: {', '.join(missing)}"
)
lines = source.splitlines()
replacements = {
"MODEL_NAME": f'MODEL_NAME = "{model_name}"',
"MODEL_URL": f'MODEL_URL = "{model_url}"',
"MODEL_SHA256": f'MODEL_SHA256 = "{model_sha256}"',
}
for key in sorted(line_ranges, key=lambda item: line_ranges[item][0], reverse=True):
start, end = line_ranges[key]
lines[start - 1 : end] = [replacements[key]]
rendered = "\n".join(lines)
if source.endswith("\n"):
rendered = f"{rendered}\n"
constants_path.write_text(rendered, encoding="utf-8")
def sync_default_model_command(args) -> int:
report_path = Path(args.report)
artifacts_path = Path(args.artifacts)
constants_path = Path(args.constants)
try:
winner_name = _load_winner_name(report_path)
artifact = _load_model_artifact(artifacts_path, winner_name)
current = _load_model_constants(constants_path)
except Exception as exc:
logging.error("sync-default-model failed: %s", exc)
return 1
expected = {
"MODEL_NAME": artifact["filename"],
"MODEL_URL": artifact["url"],
"MODEL_SHA256": artifact["sha256"],
}
changed_fields = [
key
for key in ("MODEL_NAME", "MODEL_URL", "MODEL_SHA256")
if str(current.get(key, "")).strip() != str(expected[key]).strip()
]
in_sync = len(changed_fields) == 0
summary = {
"report": str(report_path),
"artifacts": str(artifacts_path),
"constants": str(constants_path),
"winner_name": winner_name,
"in_sync": in_sync,
"changed_fields": changed_fields,
}
if args.check:
if args.json:
print(json.dumps(summary, indent=2, ensure_ascii=False))
if in_sync:
logging.info(
"default model constants are in sync with winner '%s'",
winner_name,
)
return 0
logging.error(
"default model constants are out of sync with winner '%s' (%s)",
winner_name,
", ".join(changed_fields),
)
return 2
if in_sync:
logging.info("default model already matches winner '%s'", winner_name)
else:
try:
_write_model_constants(
constants_path,
model_name=artifact["filename"],
model_url=artifact["url"],
model_sha256=artifact["sha256"],
)
except Exception as exc:
logging.error("sync-default-model failed while writing constants: %s", exc)
return 1
logging.info(
"default model updated to '%s' (%s)",
winner_name,
", ".join(changed_fields),
)
summary["updated"] = True
if args.json:
print(json.dumps(summary, indent=2, ensure_ascii=False))
return 0
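For reference, the reverse-order span replacement that `_write_model_constants` performs can be sketched in isolation. Replacing line ranges from the bottom of the file upward keeps the earlier, still-unprocessed indices valid. The constants and spans below are toy values, not the real `MODEL_*` assignments:

```python
# Toy demonstration of replacing 1-based inclusive line spans in reverse
# order of their start line, so earlier indices are never invalidated.
source = "A = 1\nB = 2\nC = 3\n"
line_ranges = {"A": (1, 1), "C": (3, 3)}  # 1-based inclusive spans
replacements = {"A": 'A = "new-a"', "C": 'C = "new-c"'}

lines = source.splitlines()
for key in sorted(line_ranges, key=lambda item: line_ranges[item][0], reverse=True):
    start, end = line_ranges[key]
    lines[start - 1 : end] = [replacements[key]]

rendered = "\n".join(lines)
if source.endswith("\n"):
    rendered = f"{rendered}\n"
print(rendered)  # → A = "new-a"\nB = 2\nC = "new-c"\n
```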

src/aman_processing.py Normal file
@@ -0,0 +1,160 @@
from __future__ import annotations
import logging
from dataclasses import dataclass
from pathlib import Path
from aiprocess import LlamaProcessor
from config import Config
from engine.pipeline import PipelineEngine
from stages.asr_whisper import AsrResult
from stages.editor_llama import LlamaEditorStage
@dataclass
class TranscriptProcessTimings:
asr_ms: float
alignment_ms: float
alignment_applied: int
fact_guard_ms: float
fact_guard_action: str
fact_guard_violations: int
editor_ms: float
editor_pass1_ms: float
editor_pass2_ms: float
vocabulary_ms: float
total_ms: float
def build_whisper_model(model_name: str, device: str):
try:
from faster_whisper import WhisperModel # type: ignore[import-not-found]
except ModuleNotFoundError as exc:
raise RuntimeError(
"faster-whisper is not installed; install dependencies with `uv sync`"
) from exc
return WhisperModel(
model_name,
device=device,
compute_type=_compute_type(device),
)
def _compute_type(device: str) -> str:
dev = (device or "cpu").lower()
if dev.startswith("cuda"):
return "float16"
return "int8"
def resolve_whisper_model_spec(cfg: Config) -> str:
if cfg.stt.provider != "local_whisper":
raise RuntimeError(f"unsupported stt provider: {cfg.stt.provider}")
custom_path = cfg.models.whisper_model_path.strip()
if not custom_path:
return cfg.stt.model
if not cfg.models.allow_custom_models:
raise RuntimeError(
"custom whisper model path requires models.allow_custom_models=true"
)
path = Path(custom_path)
if not path.exists():
raise RuntimeError(f"custom whisper model path does not exist: {path}")
return str(path)
def build_editor_stage(cfg: Config, *, verbose: bool) -> LlamaEditorStage:
processor = LlamaProcessor(
verbose=verbose,
model_path=None,
)
return LlamaEditorStage(
processor,
profile=cfg.ux.profile,
)
def process_transcript_pipeline(
text: str,
*,
stt_lang: str,
pipeline: PipelineEngine,
suppress_ai_errors: bool,
asr_result: AsrResult | None = None,
asr_ms: float = 0.0,
verbose: bool = False,
) -> tuple[str, TranscriptProcessTimings]:
processed = (text or "").strip()
if not processed:
return processed, TranscriptProcessTimings(
asr_ms=asr_ms,
alignment_ms=0.0,
alignment_applied=0,
fact_guard_ms=0.0,
fact_guard_action="accepted",
fact_guard_violations=0,
editor_ms=0.0,
editor_pass1_ms=0.0,
editor_pass2_ms=0.0,
vocabulary_ms=0.0,
total_ms=asr_ms,
)
try:
if asr_result is not None:
result = pipeline.run_asr_result(asr_result)
else:
result = pipeline.run_transcript(processed, language=stt_lang)
except Exception as exc:
if suppress_ai_errors:
logging.error("editor stage failed: %s", exc)
return processed, TranscriptProcessTimings(
asr_ms=asr_ms,
alignment_ms=0.0,
alignment_applied=0,
fact_guard_ms=0.0,
fact_guard_action="accepted",
fact_guard_violations=0,
editor_ms=0.0,
editor_pass1_ms=0.0,
editor_pass2_ms=0.0,
vocabulary_ms=0.0,
total_ms=asr_ms,
)
raise
processed = result.output_text
editor_ms = result.editor.latency_ms if result.editor else 0.0
editor_pass1_ms = result.editor.pass1_ms if result.editor else 0.0
editor_pass2_ms = result.editor.pass2_ms if result.editor else 0.0
if verbose and result.alignment_decisions:
preview = "; ".join(
decision.reason for decision in result.alignment_decisions[:3]
)
logging.debug(
"alignment: applied=%d skipped=%d decisions=%d preview=%s",
result.alignment_applied,
result.alignment_skipped,
len(result.alignment_decisions),
preview,
)
if verbose and result.fact_guard_violations > 0:
preview = "; ".join(item.reason for item in result.fact_guard_details[:3])
logging.debug(
"fact_guard: action=%s violations=%d preview=%s",
result.fact_guard_action,
result.fact_guard_violations,
preview,
)
total_ms = asr_ms + result.total_ms
return processed, TranscriptProcessTimings(
asr_ms=asr_ms,
alignment_ms=result.alignment_ms,
alignment_applied=result.alignment_applied,
fact_guard_ms=result.fact_guard_ms,
fact_guard_action=result.fact_guard_action,
fact_guard_violations=result.fact_guard_violations,
editor_ms=editor_ms,
editor_pass1_ms=editor_pass1_ms,
editor_pass2_ms=editor_pass2_ms,
vocabulary_ms=result.vocabulary_ms,
total_ms=total_ms,
)
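The fallback contract of `process_transcript_pipeline` can be exercised with a stand-in pipeline: when the editor stage raises and `suppress_ai_errors` is set, the raw transcript passes through unchanged. `FailingPipeline` and `run_with_fallback` below are illustrative doubles, not part of the real module:

```python
import logging

# Test double that always fails, standing in for PipelineEngine.
class FailingPipeline:
    def run_transcript(self, text, *, language):
        raise RuntimeError("editor backend unavailable")

# Condensed version of the fallback branch in process_transcript_pipeline.
def run_with_fallback(text, pipeline, *, suppress_ai_errors):
    processed = (text or "").strip()
    if not processed:
        return processed
    try:
        result = pipeline.run_transcript(processed, language="auto")
    except Exception as exc:
        if suppress_ai_errors:
            logging.error("editor stage failed: %s", exc)
            return processed  # raw transcript survives the failure
        raise
    return result.output_text

out = run_with_fallback(" hello world ", FailingPipeline(), suppress_ai_errors=True)
print(out)  # → hello world
```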

src/aman_run.py Normal file
@@ -0,0 +1,458 @@
from __future__ import annotations
import errno
import json
import logging
import os
import signal
import threading
from pathlib import Path
from config import Config, ConfigValidationError, load, redacted_dict, save, validate
from constants import DEFAULT_CONFIG_PATH, MODEL_PATH
from desktop import get_desktop_adapter
from diagnostics import (
doctor_command,
format_diagnostic_line,
format_support_line,
journalctl_command,
run_self_check,
self_check_command,
verbose_run_command,
)
from aman_runtime import Daemon, State
_LOCK_HANDLE = None
def _log_support_issue(
level: int,
issue_id: str,
message: str,
*,
next_step: str = "",
) -> None:
logging.log(level, format_support_line(issue_id, message, next_step=next_step))
def load_config_ui_attr(attr_name: str):
try:
from config_ui import __dict__ as config_ui_exports
except ModuleNotFoundError as exc:
missing_name = exc.name or "unknown"
raise RuntimeError(
"settings UI is unavailable because a required X11 Python dependency "
f"is missing ({missing_name})"
) from exc
return config_ui_exports[attr_name]
def run_config_ui(*args, **kwargs):
return load_config_ui_attr("run_config_ui")(*args, **kwargs)
def show_help_dialog() -> None:
load_config_ui_attr("show_help_dialog")()
def show_about_dialog() -> None:
load_config_ui_attr("show_about_dialog")()
def _read_lock_pid(lock_file) -> str:
lock_file.seek(0)
return lock_file.read().strip()
def lock_single_instance():
runtime_dir = Path(os.getenv("XDG_RUNTIME_DIR", "/tmp")) / "aman"
runtime_dir.mkdir(parents=True, exist_ok=True)
lock_path = runtime_dir / "aman.lock"
lock_file = open(lock_path, "a+", encoding="utf-8")
try:
import fcntl
fcntl.flock(lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
except BlockingIOError as exc:
pid = _read_lock_pid(lock_file)
lock_file.close()
if pid:
raise SystemExit(f"already running (pid={pid})") from exc
raise SystemExit("already running") from exc
except OSError as exc:
if exc.errno in (errno.EACCES, errno.EAGAIN):
pid = _read_lock_pid(lock_file)
lock_file.close()
if pid:
raise SystemExit(f"already running (pid={pid})") from exc
raise SystemExit("already running") from exc
raise
lock_file.seek(0)
lock_file.truncate()
lock_file.write(f"{os.getpid()}\n")
lock_file.flush()
return lock_file
def run_settings_required_tray(desktop, config_path: Path) -> bool:
reopen_settings = {"value": False}
def open_settings_callback():
reopen_settings["value"] = True
desktop.request_quit()
desktop.run_tray(
lambda: "settings_required",
lambda: None,
on_open_settings=open_settings_callback,
on_show_help=show_help_dialog,
on_show_about=show_about_dialog,
on_open_config=lambda: logging.info("config path: %s", config_path),
)
return reopen_settings["value"]
def run_settings_until_config_ready(
desktop,
config_path: Path,
initial_cfg: Config,
) -> Config | None:
draft_cfg = initial_cfg
while True:
result = run_config_ui(
draft_cfg,
desktop,
required=True,
config_path=config_path,
)
if result.saved and result.config is not None:
try:
saved_path = save(config_path, result.config)
except ConfigValidationError as exc:
logging.error(
"settings apply failed: invalid config field '%s': %s",
exc.field,
exc.reason,
)
if exc.example_fix:
logging.error("settings example fix: %s", exc.example_fix)
except Exception as exc:
logging.error("settings save failed: %s", exc)
else:
logging.info("settings saved to %s", saved_path)
return result.config
draft_cfg = result.config
else:
if result.closed_reason:
logging.info("settings were not saved (%s)", result.closed_reason)
if not run_settings_required_tray(desktop, config_path):
logging.info("settings required mode dismissed by user")
return None
def load_runtime_config(config_path: Path) -> Config:
if config_path.exists():
return load(str(config_path))
raise FileNotFoundError(str(config_path))
def run_command(args) -> int:
global _LOCK_HANDLE
config_path = Path(args.config) if args.config else DEFAULT_CONFIG_PATH
config_existed_before_start = config_path.exists()
try:
_LOCK_HANDLE = lock_single_instance()
except Exception as exc:
logging.error("startup failed: %s", exc)
return 1
try:
desktop = get_desktop_adapter()
except Exception as exc:
_log_support_issue(
logging.ERROR,
"session.x11",
f"startup failed: {exc}",
next_step="log into an X11 session and rerun Aman",
)
return 1
if not config_existed_before_start:
cfg = run_settings_until_config_ready(desktop, config_path, Config())
if cfg is None:
return 0
else:
try:
cfg = load_runtime_config(config_path)
except ConfigValidationError as exc:
_log_support_issue(
logging.ERROR,
"config.load",
f"startup failed: invalid config field '{exc.field}': {exc.reason}",
next_step=f"run `{doctor_command(config_path)}` after fixing the config",
)
if exc.example_fix:
logging.error("example fix: %s", exc.example_fix)
return 1
except Exception as exc:
_log_support_issue(
logging.ERROR,
"config.load",
f"startup failed: {exc}",
next_step=f"run `{doctor_command(config_path)}` to inspect config readiness",
)
return 1
try:
validate(cfg)
except ConfigValidationError as exc:
_log_support_issue(
logging.ERROR,
"config.load",
f"startup failed: invalid config field '{exc.field}': {exc.reason}",
next_step=f"run `{doctor_command(config_path)}` after fixing the config",
)
if exc.example_fix:
logging.error("example fix: %s", exc.example_fix)
return 1
except Exception as exc:
_log_support_issue(
logging.ERROR,
"config.load",
f"startup failed: {exc}",
next_step=f"run `{doctor_command(config_path)}` to inspect config readiness",
)
return 1
logging.info("hotkey: %s", cfg.daemon.hotkey)
logging.info(
"config (%s):\n%s",
str(config_path),
json.dumps(redacted_dict(cfg), indent=2),
)
if not config_existed_before_start:
logging.info("first launch settings completed")
logging.info(
"runtime: pid=%s session=%s display=%s wayland_display=%s verbose=%s dry_run=%s",
os.getpid(),
os.getenv("XDG_SESSION_TYPE", ""),
os.getenv("DISPLAY", ""),
os.getenv("WAYLAND_DISPLAY", ""),
args.verbose,
args.dry_run,
)
logging.info("editor backend: local_llama_builtin (%s)", MODEL_PATH)
try:
daemon = Daemon(cfg, desktop, verbose=args.verbose, config_path=config_path)
except Exception as exc:
_log_support_issue(
logging.ERROR,
"startup.readiness",
f"startup failed: {exc}",
next_step=(
f"run `{self_check_command(config_path)}` and inspect "
f"`{journalctl_command()}` if the service still fails"
),
)
return 1
shutdown_once = threading.Event()
def shutdown(reason: str):
if shutdown_once.is_set():
return
shutdown_once.set()
logging.info("%s, shutting down", reason)
try:
desktop.stop_hotkey_listener()
except Exception as exc:
logging.debug("failed to stop hotkey listener: %s", exc)
if not daemon.shutdown(timeout=5.0):
logging.warning("timed out waiting for idle state during shutdown")
desktop.request_quit()
def handle_signal(_sig, _frame):
threading.Thread(
target=shutdown,
args=("signal received",),
daemon=True,
).start()
signal.signal(signal.SIGINT, handle_signal)
signal.signal(signal.SIGTERM, handle_signal)
def hotkey_callback():
if args.dry_run:
logging.info("hotkey pressed (dry-run)")
return
daemon.toggle()
def reload_config_callback():
nonlocal cfg
try:
new_cfg = load(str(config_path))
except ConfigValidationError as exc:
_log_support_issue(
logging.ERROR,
"config.load",
f"reload failed: invalid config field '{exc.field}': {exc.reason}",
next_step=f"run `{doctor_command(config_path)}` after fixing the config",
)
if exc.example_fix:
logging.error("reload example fix: %s", exc.example_fix)
return
except Exception as exc:
_log_support_issue(
logging.ERROR,
"config.load",
f"reload failed: {exc}",
next_step=f"run `{doctor_command(config_path)}` to inspect config readiness",
)
return
try:
desktop.start_hotkey_listener(new_cfg.daemon.hotkey, hotkey_callback)
except Exception as exc:
_log_support_issue(
logging.ERROR,
"hotkey.parse",
f"reload failed: could not apply hotkey '{new_cfg.daemon.hotkey}': {exc}",
next_step=(
f"run `{doctor_command(config_path)}` and choose a different "
"hotkey in Settings"
),
)
return
try:
daemon.apply_config(new_cfg)
except Exception as exc:
_log_support_issue(
logging.ERROR,
"startup.readiness",
f"reload failed: could not apply runtime engines: {exc}",
next_step=(
f"run `{self_check_command(config_path)}` and then "
f"`{verbose_run_command(config_path)}`"
),
)
return
cfg = new_cfg
logging.info("config reloaded from %s", config_path)
def open_settings_callback():
nonlocal cfg
if daemon.get_state() != State.IDLE:
logging.info("settings UI is available only while idle")
return
result = run_config_ui(
cfg,
desktop,
required=False,
config_path=config_path,
)
if not result.saved or result.config is None:
logging.info("settings closed without changes")
return
try:
save(config_path, result.config)
desktop.start_hotkey_listener(result.config.daemon.hotkey, hotkey_callback)
except ConfigValidationError as exc:
_log_support_issue(
logging.ERROR,
"config.load",
f"settings apply failed: invalid config field '{exc.field}': {exc.reason}",
next_step=f"run `{doctor_command(config_path)}` after fixing the config",
)
if exc.example_fix:
logging.error("settings example fix: %s", exc.example_fix)
return
except Exception as exc:
_log_support_issue(
logging.ERROR,
"hotkey.parse",
f"settings apply failed: {exc}",
next_step=(
f"run `{doctor_command(config_path)}` and check the configured "
"hotkey"
),
)
return
try:
daemon.apply_config(result.config)
except Exception as exc:
_log_support_issue(
logging.ERROR,
"startup.readiness",
f"settings apply failed: could not apply runtime engines: {exc}",
next_step=(
f"run `{self_check_command(config_path)}` and then "
f"`{verbose_run_command(config_path)}`"
),
)
return
cfg = result.config
logging.info("settings applied from tray")
def run_diagnostics_callback():
report = run_self_check(str(config_path))
if report.status == "ok":
logging.info(
"diagnostics finished (%s, %d checks)",
report.status,
len(report.checks),
)
return
flagged = [check for check in report.checks if check.status != "ok"]
logging.warning(
"diagnostics finished (%s, %d/%d checks need attention)",
report.status,
len(flagged),
len(report.checks),
)
for check in flagged:
logging.warning("%s", format_diagnostic_line(check))
def open_config_path_callback():
logging.info("config path: %s", config_path)
try:
desktop.start_hotkey_listener(
cfg.daemon.hotkey,
hotkey_callback,
)
except Exception as exc:
_log_support_issue(
logging.ERROR,
"hotkey.parse",
f"hotkey setup failed: {exc}",
next_step=(
f"run `{doctor_command(config_path)}` and choose a different hotkey "
"if needed"
),
)
return 1
logging.info("ready")
try:
desktop.run_tray(
daemon.get_state,
lambda: shutdown("quit requested"),
on_open_settings=open_settings_callback,
on_show_help=show_help_dialog,
on_show_about=show_about_dialog,
is_paused_getter=daemon.is_paused,
on_toggle_pause=daemon.toggle_paused,
on_reload_config=reload_config_callback,
on_run_diagnostics=run_diagnostics_callback,
on_open_config=open_config_path_callback,
)
finally:
try:
desktop.stop_hotkey_listener()
except Exception:
pass
daemon.shutdown(timeout=1.0)
return 0
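The `flock`-based guard in `lock_single_instance` works because the lock is tied to the open file description, so a second `open()` of the same path conflicts even within one process. A minimal Linux-only sketch (temporary path, not the real `XDG_RUNTIME_DIR` location):

```python
import fcntl
import os
import tempfile

# First handle takes the exclusive, non-blocking lock and records its pid.
lock_path = os.path.join(tempfile.mkdtemp(), "aman.lock")
first = open(lock_path, "a+", encoding="utf-8")
fcntl.flock(first, fcntl.LOCK_EX | fcntl.LOCK_NB)
first.seek(0)
first.truncate()
first.write(f"{os.getpid()}\n")
first.flush()

# A second open file description on the same path cannot take the lock.
second = open(lock_path, "a+", encoding="utf-8")
try:
    fcntl.flock(second, fcntl.LOCK_EX | fcntl.LOCK_NB)
    held = False
except BlockingIOError:
    held = True
finally:
    second.close()
print(held)  # → True (on Linux)
```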

src/aman_runtime.py Normal file
@@ -0,0 +1,485 @@
from __future__ import annotations
import inspect
import logging
import threading
import time
from typing import Any
from config import Config
from constants import DEFAULT_CONFIG_PATH, RECORD_TIMEOUT_SEC
from diagnostics import (
doctor_command,
format_support_line,
journalctl_command,
self_check_command,
verbose_run_command,
)
from engine.pipeline import PipelineEngine
from recorder import start_recording as start_audio_recording
from recorder import stop_recording as stop_audio_recording
from stages.asr_whisper import AsrResult, WhisperAsrStage
from vocabulary import VocabularyEngine
from aman_processing import (
build_editor_stage,
build_whisper_model,
process_transcript_pipeline,
resolve_whisper_model_spec,
)
class State:
IDLE = "idle"
RECORDING = "recording"
STT = "stt"
PROCESSING = "processing"
OUTPUTTING = "outputting"
def _log_support_issue(
level: int,
issue_id: str,
message: str,
*,
next_step: str = "",
) -> None:
logging.log(level, format_support_line(issue_id, message, next_step=next_step))
class Daemon:
def __init__(
self,
cfg: Config,
desktop,
*,
verbose: bool = False,
config_path=None,
):
self.cfg = cfg
self.desktop = desktop
self.verbose = verbose
self.config_path = config_path or DEFAULT_CONFIG_PATH
self.lock = threading.Lock()
self._shutdown_requested = threading.Event()
self._paused = False
self.state = State.IDLE
self.stream = None
self.record = None
self.timer: threading.Timer | None = None
self.vocabulary = VocabularyEngine(cfg.vocabulary)
self._stt_hint_kwargs_cache: dict[str, Any] | None = None
self.model = build_whisper_model(
resolve_whisper_model_spec(cfg),
cfg.stt.device,
)
self.asr_stage = WhisperAsrStage(
self.model,
configured_language=cfg.stt.language,
hint_kwargs_provider=self._stt_hint_kwargs,
)
logging.info("initializing editor stage (local_llama_builtin)")
self.editor_stage = build_editor_stage(cfg, verbose=self.verbose)
self._warmup_editor_stage()
self.pipeline = PipelineEngine(
asr_stage=self.asr_stage,
editor_stage=self.editor_stage,
vocabulary=self.vocabulary,
safety_enabled=cfg.safety.enabled,
safety_strict=cfg.safety.strict,
)
logging.info("editor stage ready")
self.log_transcript = verbose
def _arm_cancel_listener(self) -> bool:
try:
self.desktop.start_cancel_listener(lambda: self.cancel_recording())
return True
except Exception as exc:
logging.error("failed to start cancel listener: %s", exc)
return False
def _disarm_cancel_listener(self):
try:
self.desktop.stop_cancel_listener()
except Exception as exc:
logging.debug("failed to stop cancel listener: %s", exc)
def set_state(self, state: str):
with self.lock:
prev = self.state
self.state = state
if prev != state:
logging.debug("state: %s -> %s", prev, state)
else:
logging.debug("redundant state set: %s", state)
def get_state(self):
with self.lock:
return self.state
def request_shutdown(self):
self._shutdown_requested.set()
def is_paused(self) -> bool:
with self.lock:
return self._paused
def toggle_paused(self) -> bool:
with self.lock:
self._paused = not self._paused
paused = self._paused
logging.info("pause %s", "enabled" if paused else "disabled")
return paused
def apply_config(self, cfg: Config) -> None:
new_model = build_whisper_model(
resolve_whisper_model_spec(cfg),
cfg.stt.device,
)
new_vocabulary = VocabularyEngine(cfg.vocabulary)
new_stt_hint_kwargs_cache: dict[str, Any] | None = None
def _hint_kwargs_provider() -> dict[str, Any]:
nonlocal new_stt_hint_kwargs_cache
if new_stt_hint_kwargs_cache is not None:
return new_stt_hint_kwargs_cache
hotwords, initial_prompt = new_vocabulary.build_stt_hints()
if not hotwords and not initial_prompt:
new_stt_hint_kwargs_cache = {}
return new_stt_hint_kwargs_cache
try:
signature = inspect.signature(new_model.transcribe)
except (TypeError, ValueError):
logging.debug("stt signature inspection failed; skipping hints")
new_stt_hint_kwargs_cache = {}
return new_stt_hint_kwargs_cache
params = signature.parameters
kwargs: dict[str, Any] = {}
if hotwords and "hotwords" in params:
kwargs["hotwords"] = hotwords
if initial_prompt and "initial_prompt" in params:
kwargs["initial_prompt"] = initial_prompt
if not kwargs:
logging.debug(
"stt hint arguments are not supported by this whisper runtime"
)
new_stt_hint_kwargs_cache = kwargs
return new_stt_hint_kwargs_cache
new_asr_stage = WhisperAsrStage(
new_model,
configured_language=cfg.stt.language,
hint_kwargs_provider=_hint_kwargs_provider,
)
new_editor_stage = build_editor_stage(cfg, verbose=self.verbose)
new_editor_stage.warmup()
new_pipeline = PipelineEngine(
asr_stage=new_asr_stage,
editor_stage=new_editor_stage,
vocabulary=new_vocabulary,
safety_enabled=cfg.safety.enabled,
safety_strict=cfg.safety.strict,
)
with self.lock:
self.cfg = cfg
self.model = new_model
self.vocabulary = new_vocabulary
self._stt_hint_kwargs_cache = None
self.asr_stage = new_asr_stage
self.editor_stage = new_editor_stage
self.pipeline = new_pipeline
logging.info("applied new runtime config")
def toggle(self):
should_stop = False
with self.lock:
if self._shutdown_requested.is_set():
logging.info("shutdown in progress, trigger ignored")
return
if self.state == State.IDLE:
if self._paused:
logging.info("paused, trigger ignored")
return
self._start_recording_locked()
return
if self.state == State.RECORDING:
should_stop = True
else:
logging.info("busy (%s), trigger ignored", self.state)
if should_stop:
self.stop_recording(trigger="user")
def _start_recording_locked(self):
if self.state != State.IDLE:
logging.info("busy (%s), trigger ignored", self.state)
return
try:
stream, record = start_audio_recording(self.cfg.recording.input)
except Exception as exc:
_log_support_issue(
logging.ERROR,
"audio.input",
f"record start failed: {exc}",
next_step=(
f"run `{doctor_command(self.config_path)}` and verify the "
"selected input device"
),
)
return
if not self._arm_cancel_listener():
try:
stream.stop()
except Exception:
pass
try:
stream.close()
except Exception:
pass
logging.error(
"recording start aborted because cancel listener is unavailable"
)
return
self.stream = stream
self.record = record
prev = self.state
self.state = State.RECORDING
logging.debug("state: %s -> %s", prev, self.state)
logging.info("recording started")
if self.timer:
self.timer.cancel()
self.timer = threading.Timer(RECORD_TIMEOUT_SEC, self._timeout_stop)
self.timer.daemon = True
self.timer.start()
def _timeout_stop(self):
self.stop_recording(trigger="timeout")
def _start_stop_worker(
self, stream: Any, record: Any, trigger: str, process_audio: bool
):
threading.Thread(
target=self._stop_and_process,
args=(stream, record, trigger, process_audio),
daemon=True,
).start()
def _begin_stop_locked(self):
if self.state != State.RECORDING:
return None
stream = self.stream
record = self.record
self.stream = None
self.record = None
if self.timer:
self.timer.cancel()
self.timer = None
self._disarm_cancel_listener()
prev = self.state
self.state = State.STT
logging.debug("state: %s -> %s", prev, self.state)
if stream is None or record is None:
logging.warning("recording resources are unavailable during stop")
self.state = State.IDLE
return None
return stream, record
def _stop_and_process(
self, stream: Any, record: Any, trigger: str, process_audio: bool
):
logging.info("stopping recording (%s)", trigger)
try:
audio = stop_audio_recording(stream, record)
except Exception as exc:
_log_support_issue(
logging.ERROR,
"runtime.audio",
f"record stop failed: {exc}",
next_step=(
f"rerun `{doctor_command(self.config_path)}` and verify the "
"audio runtime"
),
)
self.set_state(State.IDLE)
return
if not process_audio or self._shutdown_requested.is_set():
self.set_state(State.IDLE)
return
if audio.size == 0:
_log_support_issue(
logging.ERROR,
"runtime.audio",
"no audio was captured from the active input device",
next_step="verify the selected microphone level and rerun diagnostics",
)
self.set_state(State.IDLE)
return
try:
logging.info("stt started")
asr_result = self._transcribe_with_metrics(audio)
except Exception as exc:
_log_support_issue(
logging.ERROR,
"startup.readiness",
f"stt failed: {exc}",
next_step=(
f"run `{self_check_command(self.config_path)}` and then "
f"`{verbose_run_command(self.config_path)}`"
),
)
self.set_state(State.IDLE)
return
text = (asr_result.raw_text or "").strip()
stt_lang = asr_result.language
if not text:
self.set_state(State.IDLE)
return
if self.log_transcript:
logging.debug("stt: %s", text)
else:
logging.info("stt produced %d chars", len(text))
if not self._shutdown_requested.is_set():
self.set_state(State.PROCESSING)
logging.info("editor stage started")
try:
text, _timings = process_transcript_pipeline(
text,
stt_lang=stt_lang,
pipeline=self.pipeline,
suppress_ai_errors=False,
asr_result=asr_result,
asr_ms=asr_result.latency_ms,
verbose=self.log_transcript,
)
except Exception as exc:
_log_support_issue(
logging.ERROR,
"model.cache",
f"editor stage failed: {exc}",
next_step=(
f"run `{self_check_command(self.config_path)}` and inspect "
f"`{journalctl_command()}` if the service keeps failing"
),
)
self.set_state(State.IDLE)
return
if self.log_transcript:
logging.debug("processed: %s", text)
else:
logging.info("processed text length: %d", len(text))
if self._shutdown_requested.is_set():
self.set_state(State.IDLE)
return
try:
self.set_state(State.OUTPUTTING)
logging.info("outputting started")
backend = self.cfg.injection.backend
self.desktop.inject_text(
text,
backend,
remove_transcription_from_clipboard=(
self.cfg.injection.remove_transcription_from_clipboard
),
)
except Exception as exc:
_log_support_issue(
logging.ERROR,
"injection.backend",
f"output failed: {exc}",
next_step=(
f"run `{doctor_command(self.config_path)}` and then "
f"`{verbose_run_command(self.config_path)}`"
),
)
finally:
self.set_state(State.IDLE)
def stop_recording(self, *, trigger: str = "user", process_audio: bool = True):
with self.lock:
payload = self._begin_stop_locked()
if payload is None:
return
stream, record = payload
self._start_stop_worker(stream, record, trigger, process_audio)
def cancel_recording(self):
with self.lock:
if self.state != State.RECORDING:
return
self.stop_recording(trigger="cancel", process_audio=False)
def shutdown(self, timeout: float = 5.0) -> bool:
self.request_shutdown()
self._disarm_cancel_listener()
self.stop_recording(trigger="shutdown", process_audio=False)
return self.wait_for_idle(timeout)
def wait_for_idle(self, timeout: float) -> bool:
end = time.time() + timeout
while time.time() < end:
if self.get_state() == State.IDLE:
return True
time.sleep(0.05)
return self.get_state() == State.IDLE
def _transcribe_with_metrics(self, audio) -> AsrResult:
return self.asr_stage.transcribe(audio)
def _transcribe(self, audio) -> tuple[str, str]:
result = self._transcribe_with_metrics(audio)
return result.raw_text, result.language
def _warmup_editor_stage(self) -> None:
logging.info("warming up editor stage")
try:
self.editor_stage.warmup()
except Exception as exc:
if self.cfg.advanced.strict_startup:
raise RuntimeError(f"editor stage warmup failed: {exc}") from exc
logging.warning(
"editor stage warmup failed, continuing because "
"advanced.strict_startup=false: %s",
exc,
)
return
logging.info("editor stage warmup completed")
def _stt_hint_kwargs(self) -> dict[str, Any]:
if self._stt_hint_kwargs_cache is not None:
return self._stt_hint_kwargs_cache
hotwords, initial_prompt = self.vocabulary.build_stt_hints()
if not hotwords and not initial_prompt:
self._stt_hint_kwargs_cache = {}
return self._stt_hint_kwargs_cache
try:
signature = inspect.signature(self.model.transcribe)
except (TypeError, ValueError):
logging.debug("stt signature inspection failed; skipping hints")
self._stt_hint_kwargs_cache = {}
return self._stt_hint_kwargs_cache
params = signature.parameters
kwargs: dict[str, Any] = {}
if hotwords and "hotwords" in params:
kwargs["hotwords"] = hotwords
if initial_prompt and "initial_prompt" in params:
kwargs["initial_prompt"] = initial_prompt
if not kwargs:
logging.debug("stt hint arguments are not supported by this whisper runtime")
self._stt_hint_kwargs_cache = kwargs
return self._stt_hint_kwargs_cache
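The hint-filtering pattern in `_stt_hint_kwargs` (and its twin inside `apply_config`) boils down to forwarding optional keyword arguments only when the underlying `transcribe` callable actually accepts them. An isolated sketch with a stand-in callable:

```python
import inspect

# Stand-in for a whisper runtime that supports initial_prompt but not hotwords.
def fake_transcribe(audio, language=None, initial_prompt=None):
    return audio, language, initial_prompt

hints = {"hotwords": "Aman", "initial_prompt": "Dictated notes."}
params = inspect.signature(fake_transcribe).parameters
# Keep only non-empty hints the signature can accept.
kwargs = {name: value for name, value in hints.items() if value and name in params}
print(kwargs)  # → {'initial_prompt': 'Dictated notes.'}
```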

@@ -0,0 +1,191 @@
import io
import json
import sys
import tempfile
import unittest
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import patch
ROOT = Path(__file__).resolve().parents[1]
SRC = ROOT / "src"
if str(SRC) not in sys.path:
sys.path.insert(0, str(SRC))
import aman_benchmarks
import aman_cli
from config import Config
class _FakeBenchEditorStage:
def warmup(self):
return
def rewrite(self, transcript, *, language, dictionary_context):
_ = dictionary_context
return SimpleNamespace(
final_text=f"[{language}] {transcript.strip()}",
latency_ms=1.0,
pass1_ms=0.5,
pass2_ms=0.5,
)
class AmanBenchmarksTests(unittest.TestCase):
def test_bench_command_json_output(self):
args = aman_cli.parse_cli_args(
["bench", "--text", "hello", "--repeat", "2", "--warmup", "0", "--json"]
)
out = io.StringIO()
with patch("aman_benchmarks.load", return_value=Config()), patch(
"aman_benchmarks.build_editor_stage", return_value=_FakeBenchEditorStage()
), patch("sys.stdout", out):
exit_code = aman_benchmarks.bench_command(args)
self.assertEqual(exit_code, 0)
payload = json.loads(out.getvalue())
self.assertEqual(payload["measured_runs"], 2)
self.assertEqual(payload["summary"]["runs"], 2)
self.assertEqual(len(payload["runs"]), 2)
self.assertEqual(payload["editor_backend"], "local_llama_builtin")
self.assertIn("avg_alignment_ms", payload["summary"])
self.assertIn("avg_fact_guard_ms", payload["summary"])
self.assertIn("alignment_applied", payload["runs"][0])
self.assertIn("fact_guard_action", payload["runs"][0])
def test_bench_command_supports_text_file_input(self):
with tempfile.TemporaryDirectory() as td:
text_file = Path(td) / "input.txt"
text_file.write_text("hello from file", encoding="utf-8")
args = aman_cli.parse_cli_args(
["bench", "--text-file", str(text_file), "--repeat", "1", "--warmup", "0", "--print-output"]
)
out = io.StringIO()
with patch("aman_benchmarks.load", return_value=Config()), patch(
"aman_benchmarks.build_editor_stage", return_value=_FakeBenchEditorStage()
), patch("sys.stdout", out):
exit_code = aman_benchmarks.bench_command(args)
self.assertEqual(exit_code, 0)
self.assertIn("[auto] hello from file", out.getvalue())
def test_bench_command_rejects_empty_input(self):
args = aman_cli.parse_cli_args(["bench", "--text", " "])
with patch("aman_benchmarks.load", return_value=Config()), patch(
"aman_benchmarks.build_editor_stage", return_value=_FakeBenchEditorStage()
):
exit_code = aman_benchmarks.bench_command(args)
self.assertEqual(exit_code, 1)
def test_bench_command_rejects_non_positive_repeat(self):
args = aman_cli.parse_cli_args(["bench", "--text", "hello", "--repeat", "0"])
with patch("aman_benchmarks.load", return_value=Config()), patch(
"aman_benchmarks.build_editor_stage", return_value=_FakeBenchEditorStage()
):
exit_code = aman_benchmarks.bench_command(args)
self.assertEqual(exit_code, 1)
def test_eval_models_command_writes_report(self):
with tempfile.TemporaryDirectory() as td:
output_path = Path(td) / "report.json"
args = aman_cli.parse_cli_args(
[
"eval-models",
"--dataset",
"benchmarks/cleanup_dataset.jsonl",
"--matrix",
"benchmarks/model_matrix.small_first.json",
"--output",
str(output_path),
"--json",
]
)
out = io.StringIO()
fake_report = {
"models": [
{
"name": "base",
"best_param_set": {
"latency_ms": {"p50": 1000.0},
"quality": {"hybrid_score_avg": 0.8, "parse_valid_rate": 1.0},
},
}
],
"winner_recommendation": {"name": "base", "reason": "test"},
}
with patch("aman_benchmarks.run_model_eval", return_value=fake_report), patch(
"sys.stdout", out
):
exit_code = aman_benchmarks.eval_models_command(args)
self.assertEqual(exit_code, 0)
self.assertTrue(output_path.exists())
payload = json.loads(output_path.read_text(encoding="utf-8"))
self.assertEqual(payload["winner_recommendation"]["name"], "base")
def test_eval_models_command_forwards_heuristic_arguments(self):
args = aman_cli.parse_cli_args(
[
"eval-models",
"--dataset",
"benchmarks/cleanup_dataset.jsonl",
"--matrix",
"benchmarks/model_matrix.small_first.json",
"--heuristic-dataset",
"benchmarks/heuristics_dataset.jsonl",
"--heuristic-weight",
"0.35",
"--report-version",
"2",
"--json",
]
)
out = io.StringIO()
fake_report = {
"models": [{"name": "base", "best_param_set": {}}],
"winner_recommendation": {"name": "base", "reason": "ok"},
}
with patch("aman_benchmarks.run_model_eval", return_value=fake_report) as run_eval_mock, patch(
"sys.stdout", out
):
exit_code = aman_benchmarks.eval_models_command(args)
self.assertEqual(exit_code, 0)
run_eval_mock.assert_called_once_with(
"benchmarks/cleanup_dataset.jsonl",
"benchmarks/model_matrix.small_first.json",
heuristic_dataset_path="benchmarks/heuristics_dataset.jsonl",
heuristic_weight=0.35,
report_version=2,
verbose=False,
)
def test_build_heuristic_dataset_command_json_output(self):
args = aman_cli.parse_cli_args(
[
"build-heuristic-dataset",
"--input",
"benchmarks/heuristics_dataset.raw.jsonl",
"--output",
"benchmarks/heuristics_dataset.jsonl",
"--json",
]
)
out = io.StringIO()
summary = {
"raw_rows": 4,
"written_rows": 4,
"generated_word_rows": 2,
"output_path": "benchmarks/heuristics_dataset.jsonl",
}
with patch("aman_benchmarks.build_heuristic_dataset", return_value=summary), patch(
"sys.stdout", out
):
exit_code = aman_benchmarks.build_heuristic_dataset_command(args)
self.assertEqual(exit_code, 0)
payload = json.loads(out.getvalue())
self.assertEqual(payload["written_rows"], 4)
if __name__ == "__main__":
unittest.main()


@@ -1,11 +1,9 @@
import io
import json
import subprocess
import sys
import tempfile
import unittest
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import patch
ROOT = Path(__file__).resolve().parents[1]
@@ -13,114 +11,16 @@ SRC = ROOT / "src"
if str(SRC) not in sys.path:
sys.path.insert(0, str(SRC))
import aman
from config import Config
from config_ui import ConfigUiResult
import aman_cli
from diagnostics import DiagnosticCheck, DiagnosticReport
class _FakeDesktop:
def __init__(self):
self.hotkey = None
self.hotkey_callback = None
def start_hotkey_listener(self, hotkey, callback):
self.hotkey = hotkey
self.hotkey_callback = callback
def stop_hotkey_listener(self):
return
def start_cancel_listener(self, callback):
_ = callback
return
def stop_cancel_listener(self):
return
def validate_hotkey(self, hotkey):
_ = hotkey
return
def inject_text(self, text, backend, *, remove_transcription_from_clipboard=False):
_ = (text, backend, remove_transcription_from_clipboard)
return
def run_tray(self, _state_getter, on_quit, **_kwargs):
on_quit()
def request_quit(self):
return
class _HotkeyFailDesktop(_FakeDesktop):
def start_hotkey_listener(self, hotkey, callback):
_ = (hotkey, callback)
raise RuntimeError("already in use")
class _FakeDaemon:
def __init__(self, cfg, _desktop, *, verbose=False, config_path=None):
self.cfg = cfg
self.verbose = verbose
self.config_path = config_path
self._paused = False
def get_state(self):
return "idle"
def is_paused(self):
return self._paused
def toggle_paused(self):
self._paused = not self._paused
return self._paused
def apply_config(self, cfg):
self.cfg = cfg
def toggle(self):
return
def shutdown(self, timeout=1.0):
_ = timeout
return True
class _RetrySetupDesktop(_FakeDesktop):
def __init__(self):
super().__init__()
self.settings_invocations = 0
def run_tray(self, _state_getter, on_quit, **kwargs):
settings_cb = kwargs.get("on_open_settings")
if settings_cb is not None and self.settings_invocations == 0:
self.settings_invocations += 1
settings_cb()
return
on_quit()
class _FakeBenchEditorStage:
def warmup(self):
return
def rewrite(self, transcript, *, language, dictionary_context):
_ = dictionary_context
return SimpleNamespace(
final_text=f"[{language}] {transcript.strip()}",
latency_ms=1.0,
pass1_ms=0.5,
pass2_ms=0.5,
)
class AmanCliTests(unittest.TestCase):
def test_parse_cli_args_help_flag_uses_top_level_parser(self):
out = io.StringIO()
with patch("sys.stdout", out), self.assertRaises(SystemExit) as exc:
aman._parse_cli_args(["--help"])
aman_cli.parse_cli_args(["--help"])
self.assertEqual(exc.exception.code, 0)
rendered = out.getvalue()
@@ -133,31 +33,31 @@ class AmanCliTests(unittest.TestCase):
out = io.StringIO()
with patch("sys.stdout", out), self.assertRaises(SystemExit) as exc:
aman._parse_cli_args(["-h"])
aman_cli.parse_cli_args(["-h"])
self.assertEqual(exc.exception.code, 0)
self.assertIn("self-check", out.getvalue())
def test_parse_cli_args_defaults_to_run_command(self):
args = aman._parse_cli_args(["--dry-run"])
args = aman_cli.parse_cli_args(["--dry-run"])
self.assertEqual(args.command, "run")
self.assertTrue(args.dry_run)
def test_parse_cli_args_doctor_command(self):
args = aman._parse_cli_args(["doctor", "--json"])
args = aman_cli.parse_cli_args(["doctor", "--json"])
self.assertEqual(args.command, "doctor")
self.assertTrue(args.json)
def test_parse_cli_args_self_check_command(self):
args = aman._parse_cli_args(["self-check", "--json"])
args = aman_cli.parse_cli_args(["self-check", "--json"])
self.assertEqual(args.command, "self-check")
self.assertTrue(args.json)
def test_parse_cli_args_bench_command(self):
args = aman._parse_cli_args(
args = aman_cli.parse_cli_args(
["bench", "--text", "hello", "--repeat", "2", "--warmup", "0", "--json"]
)
@@ -169,11 +69,17 @@ class AmanCliTests(unittest.TestCase):
def test_parse_cli_args_bench_requires_input(self):
with self.assertRaises(SystemExit):
aman._parse_cli_args(["bench"])
aman_cli.parse_cli_args(["bench"])
def test_parse_cli_args_eval_models_command(self):
args = aman._parse_cli_args(
["eval-models", "--dataset", "benchmarks/cleanup_dataset.jsonl", "--matrix", "benchmarks/model_matrix.small_first.json"]
args = aman_cli.parse_cli_args(
[
"eval-models",
"--dataset",
"benchmarks/cleanup_dataset.jsonl",
"--matrix",
"benchmarks/model_matrix.small_first.json",
]
)
self.assertEqual(args.command, "eval-models")
self.assertEqual(args.dataset, "benchmarks/cleanup_dataset.jsonl")
@@ -183,7 +89,7 @@ class AmanCliTests(unittest.TestCase):
self.assertEqual(args.report_version, 2)
def test_parse_cli_args_eval_models_with_heuristic_options(self):
args = aman._parse_cli_args(
args = aman_cli.parse_cli_args(
[
"eval-models",
"--dataset",
@@ -203,7 +109,7 @@ class AmanCliTests(unittest.TestCase):
self.assertEqual(args.report_version, 2)
def test_parse_cli_args_build_heuristic_dataset_command(self):
args = aman._parse_cli_args(
args = aman_cli.parse_cli_args(
[
"build-heuristic-dataset",
"--input",
@@ -216,79 +122,40 @@ class AmanCliTests(unittest.TestCase):
self.assertEqual(args.input, "benchmarks/heuristics_dataset.raw.jsonl")
self.assertEqual(args.output, "benchmarks/heuristics_dataset.jsonl")
def test_parse_cli_args_sync_default_model_command(self):
args = aman._parse_cli_args(
[
"sync-default-model",
"--report",
"benchmarks/results/latest.json",
"--artifacts",
"benchmarks/model_artifacts.json",
"--constants",
"src/constants.py",
"--check",
]
)
self.assertEqual(args.command, "sync-default-model")
self.assertEqual(args.report, "benchmarks/results/latest.json")
self.assertEqual(args.artifacts, "benchmarks/model_artifacts.json")
self.assertEqual(args.constants, "src/constants.py")
self.assertTrue(args.check)
def test_parse_cli_args_legacy_maint_command_errors_with_migration_hint(self):
err = io.StringIO()
with patch("sys.stderr", err), self.assertRaises(SystemExit) as exc:
aman_cli.parse_cli_args(["sync-default-model"])
self.assertEqual(exc.exception.code, 2)
self.assertIn("aman-maint sync-default-model", err.getvalue())
self.assertIn("make sync-default-model", err.getvalue())
def test_version_command_prints_version(self):
out = io.StringIO()
args = aman._parse_cli_args(["version"])
with patch("aman._app_version", return_value="1.2.3"), patch("sys.stdout", out):
exit_code = aman._version_command(args)
args = aman_cli.parse_cli_args(["version"])
with patch("aman_cli.app_version", return_value="1.2.3"), patch("sys.stdout", out):
exit_code = aman_cli.version_command(args)
self.assertEqual(exit_code, 0)
self.assertEqual(out.getvalue().strip(), "1.2.3")
def test_version_command_does_not_import_config_ui(self):
script = f"""
import builtins
import sys
from pathlib import Path
sys.path.insert(0, {str(SRC)!r})
real_import = builtins.__import__
def blocked(name, globals=None, locals=None, fromlist=(), level=0):
if name == "config_ui":
raise ModuleNotFoundError("blocked config_ui")
return real_import(name, globals, locals, fromlist, level)
builtins.__import__ = blocked
import aman
args = aman._parse_cli_args(["version"])
raise SystemExit(aman._version_command(args))
"""
result = subprocess.run(
[sys.executable, "-c", script],
cwd=ROOT,
text=True,
capture_output=True,
check=False,
)
self.assertEqual(result.returncode, 0, result.stderr)
self.assertRegex(result.stdout.strip(), r"\S+")
def test_app_version_prefers_local_pyproject_version(self):
pyproject_text = '[project]\nversion = "9.9.9"\n'
with patch.object(aman.Path, "exists", return_value=True), patch.object(
aman.Path, "read_text", return_value=pyproject_text
), patch("aman.importlib.metadata.version", return_value="1.0.0"):
self.assertEqual(aman._app_version(), "9.9.9")
with patch.object(aman_cli.Path, "exists", return_value=True), patch.object(
aman_cli.Path, "read_text", return_value=pyproject_text
), patch("aman_cli.importlib.metadata.version", return_value="1.0.0"):
self.assertEqual(aman_cli.app_version(), "9.9.9")
def test_doctor_command_json_output_and_exit_code(self):
report = DiagnosticReport(
checks=[DiagnosticCheck(id="config.load", status="ok", message="ok", next_step="")]
)
args = aman._parse_cli_args(["doctor", "--json"])
args = aman_cli.parse_cli_args(["doctor", "--json"])
out = io.StringIO()
with patch("aman.run_doctor", return_value=report), patch("sys.stdout", out):
exit_code = aman._doctor_command(args)
with patch("aman_cli.run_doctor", return_value=report), patch("sys.stdout", out):
exit_code = aman_cli.doctor_command(args)
self.assertEqual(exit_code, 0)
payload = json.loads(out.getvalue())
@@ -300,10 +167,10 @@ raise SystemExit(aman._version_command(args))
report = DiagnosticReport(
checks=[DiagnosticCheck(id="config.load", status="fail", message="broken", next_step="fix")]
)
args = aman._parse_cli_args(["doctor"])
args = aman_cli.parse_cli_args(["doctor"])
out = io.StringIO()
with patch("aman.run_doctor", return_value=report), patch("sys.stdout", out):
exit_code = aman._doctor_command(args)
with patch("aman_cli.run_doctor", return_value=report), patch("sys.stdout", out):
exit_code = aman_cli.doctor_command(args)
self.assertEqual(exit_code, 2)
self.assertIn("[FAIL] config.load", out.getvalue())
@@ -313,10 +180,10 @@ raise SystemExit(aman._version_command(args))
report = DiagnosticReport(
checks=[DiagnosticCheck(id="model.cache", status="warn", message="missing", next_step="run aman once")]
)
args = aman._parse_cli_args(["doctor"])
args = aman_cli.parse_cli_args(["doctor"])
out = io.StringIO()
with patch("aman.run_doctor", return_value=report), patch("sys.stdout", out):
exit_code = aman._doctor_command(args)
with patch("aman_cli.run_doctor", return_value=report), patch("sys.stdout", out):
exit_code = aman_cli.doctor_command(args)
self.assertEqual(exit_code, 0)
self.assertIn("[WARN] model.cache", out.getvalue())
@@ -326,275 +193,22 @@ raise SystemExit(aman._version_command(args))
report = DiagnosticReport(
checks=[DiagnosticCheck(id="startup.readiness", status="ok", message="ready", next_step="")]
)
args = aman._parse_cli_args(["self-check", "--json"])
args = aman_cli.parse_cli_args(["self-check", "--json"])
out = io.StringIO()
with patch("aman.run_self_check", return_value=report) as runner, patch("sys.stdout", out):
exit_code = aman._self_check_command(args)
with patch("aman_cli.run_self_check", return_value=report) as runner, patch("sys.stdout", out):
exit_code = aman_cli.self_check_command(args)
self.assertEqual(exit_code, 0)
runner.assert_called_once_with("")
payload = json.loads(out.getvalue())
self.assertEqual(payload["status"], "ok")
def test_bench_command_json_output(self):
args = aman._parse_cli_args(["bench", "--text", "hello", "--repeat", "2", "--warmup", "0", "--json"])
out = io.StringIO()
with patch("aman.load", return_value=Config()), patch(
"aman._build_editor_stage", return_value=_FakeBenchEditorStage()
), patch("sys.stdout", out):
exit_code = aman._bench_command(args)
self.assertEqual(exit_code, 0)
payload = json.loads(out.getvalue())
self.assertEqual(payload["measured_runs"], 2)
self.assertEqual(payload["summary"]["runs"], 2)
self.assertEqual(len(payload["runs"]), 2)
self.assertEqual(payload["editor_backend"], "local_llama_builtin")
self.assertIn("avg_alignment_ms", payload["summary"])
self.assertIn("avg_fact_guard_ms", payload["summary"])
self.assertIn("alignment_applied", payload["runs"][0])
self.assertIn("fact_guard_action", payload["runs"][0])
def test_bench_command_supports_text_file_input(self):
with tempfile.TemporaryDirectory() as td:
text_file = Path(td) / "input.txt"
text_file.write_text("hello from file", encoding="utf-8")
args = aman._parse_cli_args(
["bench", "--text-file", str(text_file), "--repeat", "1", "--warmup", "0", "--print-output"]
)
out = io.StringIO()
with patch("aman.load", return_value=Config()), patch(
"aman._build_editor_stage", return_value=_FakeBenchEditorStage()
), patch("sys.stdout", out):
exit_code = aman._bench_command(args)
self.assertEqual(exit_code, 0)
self.assertIn("[auto] hello from file", out.getvalue())
def test_bench_command_rejects_empty_input(self):
args = aman._parse_cli_args(["bench", "--text", " "])
with patch("aman.load", return_value=Config()), patch(
"aman._build_editor_stage", return_value=_FakeBenchEditorStage()
):
exit_code = aman._bench_command(args)
self.assertEqual(exit_code, 1)
def test_bench_command_rejects_non_positive_repeat(self):
args = aman._parse_cli_args(["bench", "--text", "hello", "--repeat", "0"])
with patch("aman.load", return_value=Config()), patch(
"aman._build_editor_stage", return_value=_FakeBenchEditorStage()
):
exit_code = aman._bench_command(args)
self.assertEqual(exit_code, 1)
def test_eval_models_command_writes_report(self):
with tempfile.TemporaryDirectory() as td:
output_path = Path(td) / "report.json"
args = aman._parse_cli_args(
[
"eval-models",
"--dataset",
"benchmarks/cleanup_dataset.jsonl",
"--matrix",
"benchmarks/model_matrix.small_first.json",
"--output",
str(output_path),
"--json",
]
)
out = io.StringIO()
fake_report = {
"models": [{"name": "base", "best_param_set": {"latency_ms": {"p50": 1000.0}, "quality": {"hybrid_score_avg": 0.8, "parse_valid_rate": 1.0}}}],
"winner_recommendation": {"name": "base", "reason": "test"},
}
with patch("aman.run_model_eval", return_value=fake_report), patch("sys.stdout", out):
exit_code = aman._eval_models_command(args)
self.assertEqual(exit_code, 0)
self.assertTrue(output_path.exists())
payload = json.loads(output_path.read_text(encoding="utf-8"))
self.assertEqual(payload["winner_recommendation"]["name"], "base")
def test_eval_models_command_forwards_heuristic_arguments(self):
args = aman._parse_cli_args(
[
"eval-models",
"--dataset",
"benchmarks/cleanup_dataset.jsonl",
"--matrix",
"benchmarks/model_matrix.small_first.json",
"--heuristic-dataset",
"benchmarks/heuristics_dataset.jsonl",
"--heuristic-weight",
"0.35",
"--report-version",
"2",
"--json",
]
)
out = io.StringIO()
fake_report = {
"models": [{"name": "base", "best_param_set": {}}],
"winner_recommendation": {"name": "base", "reason": "ok"},
}
with patch("aman.run_model_eval", return_value=fake_report) as run_eval_mock, patch(
"sys.stdout", out
):
exit_code = aman._eval_models_command(args)
self.assertEqual(exit_code, 0)
run_eval_mock.assert_called_once_with(
"benchmarks/cleanup_dataset.jsonl",
"benchmarks/model_matrix.small_first.json",
heuristic_dataset_path="benchmarks/heuristics_dataset.jsonl",
heuristic_weight=0.35,
report_version=2,
verbose=False,
)
def test_build_heuristic_dataset_command_json_output(self):
args = aman._parse_cli_args(
[
"build-heuristic-dataset",
"--input",
"benchmarks/heuristics_dataset.raw.jsonl",
"--output",
"benchmarks/heuristics_dataset.jsonl",
"--json",
]
)
out = io.StringIO()
summary = {
"raw_rows": 4,
"written_rows": 4,
"generated_word_rows": 2,
"output_path": "benchmarks/heuristics_dataset.jsonl",
}
with patch("aman.build_heuristic_dataset", return_value=summary), patch("sys.stdout", out):
exit_code = aman._build_heuristic_dataset_command(args)
self.assertEqual(exit_code, 0)
payload = json.loads(out.getvalue())
self.assertEqual(payload["written_rows"], 4)
def test_sync_default_model_command_updates_constants(self):
with tempfile.TemporaryDirectory() as td:
report_path = Path(td) / "latest.json"
artifacts_path = Path(td) / "artifacts.json"
constants_path = Path(td) / "constants.py"
report_path.write_text(
json.dumps(
{
"winner_recommendation": {
"name": "test-model",
}
}
),
encoding="utf-8",
)
artifacts_path.write_text(
json.dumps(
{
"models": [
{
"name": "test-model",
"filename": "winner.gguf",
"url": "https://example.invalid/winner.gguf",
"sha256": "a" * 64,
}
]
}
),
encoding="utf-8",
)
constants_path.write_text(
(
'MODEL_NAME = "old.gguf"\n'
'MODEL_URL = "https://example.invalid/old.gguf"\n'
'MODEL_SHA256 = "' + ("b" * 64) + '"\n'
),
encoding="utf-8",
)
args = aman._parse_cli_args(
[
"sync-default-model",
"--report",
str(report_path),
"--artifacts",
str(artifacts_path),
"--constants",
str(constants_path),
]
)
exit_code = aman._sync_default_model_command(args)
self.assertEqual(exit_code, 0)
updated = constants_path.read_text(encoding="utf-8")
self.assertIn('MODEL_NAME = "winner.gguf"', updated)
self.assertIn('MODEL_URL = "https://example.invalid/winner.gguf"', updated)
self.assertIn('MODEL_SHA256 = "' + ("a" * 64) + '"', updated)
def test_sync_default_model_command_check_mode_returns_2_on_drift(self):
with tempfile.TemporaryDirectory() as td:
report_path = Path(td) / "latest.json"
artifacts_path = Path(td) / "artifacts.json"
constants_path = Path(td) / "constants.py"
report_path.write_text(
json.dumps(
{
"winner_recommendation": {
"name": "test-model",
}
}
),
encoding="utf-8",
)
artifacts_path.write_text(
json.dumps(
{
"models": [
{
"name": "test-model",
"filename": "winner.gguf",
"url": "https://example.invalid/winner.gguf",
"sha256": "a" * 64,
}
]
}
),
encoding="utf-8",
)
constants_path.write_text(
(
'MODEL_NAME = "old.gguf"\n'
'MODEL_URL = "https://example.invalid/old.gguf"\n'
'MODEL_SHA256 = "' + ("b" * 64) + '"\n'
),
encoding="utf-8",
)
args = aman._parse_cli_args(
[
"sync-default-model",
"--report",
str(report_path),
"--artifacts",
str(artifacts_path),
"--constants",
str(constants_path),
"--check",
]
)
exit_code = aman._sync_default_model_command(args)
self.assertEqual(exit_code, 2)
updated = constants_path.read_text(encoding="utf-8")
self.assertIn('MODEL_NAME = "old.gguf"', updated)
def test_init_command_creates_default_config(self):
with tempfile.TemporaryDirectory() as td:
path = Path(td) / "config.json"
args = aman._parse_cli_args(["init", "--config", str(path)])
args = aman_cli.parse_cli_args(["init", "--config", str(path)])
exit_code = aman._init_command(args)
exit_code = aman_cli.init_command(args)
self.assertEqual(exit_code, 0)
self.assertTrue(path.exists())
payload = json.loads(path.read_text(encoding="utf-8"))
@@ -604,9 +218,9 @@ raise SystemExit(aman._version_command(args))
with tempfile.TemporaryDirectory() as td:
path = Path(td) / "config.json"
path.write_text('{"daemon":{"hotkey":"Super+m"}}\n', encoding="utf-8")
args = aman._parse_cli_args(["init", "--config", str(path)])
args = aman_cli.parse_cli_args(["init", "--config", str(path)])
exit_code = aman._init_command(args)
exit_code = aman_cli.init_command(args)
self.assertEqual(exit_code, 1)
self.assertIn("Super+m", path.read_text(encoding="utf-8"))
@@ -614,109 +228,13 @@ raise SystemExit(aman._version_command(args))
with tempfile.TemporaryDirectory() as td:
path = Path(td) / "config.json"
path.write_text('{"daemon":{"hotkey":"Super+m"}}\n', encoding="utf-8")
args = aman._parse_cli_args(["init", "--config", str(path), "--force"])
args = aman_cli.parse_cli_args(["init", "--config", str(path), "--force"])
exit_code = aman._init_command(args)
exit_code = aman_cli.init_command(args)
self.assertEqual(exit_code, 0)
payload = json.loads(path.read_text(encoding="utf-8"))
self.assertEqual(payload["daemon"]["hotkey"], "Cmd+m")
def test_run_command_missing_config_uses_settings_ui_and_writes_file(self):
with tempfile.TemporaryDirectory() as td:
path = Path(td) / "config.json"
args = aman._parse_cli_args(["run", "--config", str(path)])
desktop = _FakeDesktop()
onboard_cfg = Config()
onboard_cfg.daemon.hotkey = "Super+m"
with patch("aman._lock_single_instance", return_value=object()), patch(
"aman.get_desktop_adapter", return_value=desktop
), patch(
"aman._run_config_ui",
return_value=ConfigUiResult(saved=True, config=onboard_cfg, closed_reason="saved"),
) as config_ui_mock, patch("aman.Daemon", _FakeDaemon):
exit_code = aman._run_command(args)
self.assertEqual(exit_code, 0)
self.assertTrue(path.exists())
self.assertEqual(desktop.hotkey, "Super+m")
config_ui_mock.assert_called_once()
def test_run_command_missing_config_cancel_returns_without_starting_daemon(self):
with tempfile.TemporaryDirectory() as td:
path = Path(td) / "config.json"
args = aman._parse_cli_args(["run", "--config", str(path)])
desktop = _FakeDesktop()
with patch("aman._lock_single_instance", return_value=object()), patch(
"aman.get_desktop_adapter", return_value=desktop
), patch(
"aman._run_config_ui",
return_value=ConfigUiResult(saved=False, config=None, closed_reason="cancelled"),
), patch("aman.Daemon") as daemon_cls:
exit_code = aman._run_command(args)
self.assertEqual(exit_code, 0)
self.assertFalse(path.exists())
daemon_cls.assert_not_called()
def test_run_command_missing_config_cancel_then_retry_settings(self):
with tempfile.TemporaryDirectory() as td:
path = Path(td) / "config.json"
args = aman._parse_cli_args(["run", "--config", str(path)])
desktop = _RetrySetupDesktop()
onboard_cfg = Config()
config_ui_results = [
ConfigUiResult(saved=False, config=None, closed_reason="cancelled"),
ConfigUiResult(saved=True, config=onboard_cfg, closed_reason="saved"),
]
with patch("aman._lock_single_instance", return_value=object()), patch(
"aman.get_desktop_adapter", return_value=desktop
), patch(
"aman._run_config_ui",
side_effect=config_ui_results,
), patch("aman.Daemon", _FakeDaemon):
exit_code = aman._run_command(args)
self.assertEqual(exit_code, 0)
self.assertTrue(path.exists())
self.assertEqual(desktop.settings_invocations, 1)
def test_run_command_hotkey_failure_logs_actionable_issue(self):
with tempfile.TemporaryDirectory() as td:
path = Path(td) / "config.json"
path.write_text(json.dumps({"config_version": 1}) + "\n", encoding="utf-8")
args = aman._parse_cli_args(["run", "--config", str(path)])
desktop = _HotkeyFailDesktop()
with patch("aman._lock_single_instance", return_value=object()), patch(
"aman.get_desktop_adapter", return_value=desktop
), patch("aman.load", return_value=Config()), patch("aman.Daemon", _FakeDaemon), self.assertLogs(
level="ERROR"
) as logs:
exit_code = aman._run_command(args)
self.assertEqual(exit_code, 1)
rendered = "\n".join(logs.output)
self.assertIn("hotkey.parse: hotkey setup failed: already in use", rendered)
self.assertIn("next_step: run `aman doctor --config", rendered)
def test_run_command_daemon_init_failure_logs_self_check_next_step(self):
with tempfile.TemporaryDirectory() as td:
path = Path(td) / "config.json"
path.write_text(json.dumps({"config_version": 1}) + "\n", encoding="utf-8")
args = aman._parse_cli_args(["run", "--config", str(path)])
desktop = _FakeDesktop()
with patch("aman._lock_single_instance", return_value=object()), patch(
"aman.get_desktop_adapter", return_value=desktop
), patch("aman.load", return_value=Config()), patch(
"aman.Daemon", side_effect=RuntimeError("warmup boom")
), self.assertLogs(level="ERROR") as logs:
exit_code = aman._run_command(args)
self.assertEqual(exit_code, 1)
rendered = "\n".join(logs.output)
self.assertIn("startup.readiness: startup failed: warmup boom", rendered)
self.assertIn("next_step: run `aman self-check --config", rendered)
if __name__ == "__main__":
unittest.main()


@@ -0,0 +1,51 @@
import re
import subprocess
import sys
import unittest
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
SRC = ROOT / "src"
if str(SRC) not in sys.path:
sys.path.insert(0, str(SRC))
import aman
import aman_cli
class AmanEntrypointTests(unittest.TestCase):
def test_aman_module_only_reexports_main(self):
self.assertIs(aman.main, aman_cli.main)
self.assertFalse(hasattr(aman, "Daemon"))
def test_python_m_aman_version_succeeds_without_config_ui(self):
script = f"""
import builtins
import sys
sys.path.insert(0, {str(SRC)!r})
real_import = builtins.__import__
def blocked(name, globals=None, locals=None, fromlist=(), level=0):
if name == "config_ui":
raise ModuleNotFoundError("blocked config_ui")
return real_import(name, globals, locals, fromlist, level)
builtins.__import__ = blocked
import aman
raise SystemExit(aman.main(["version"]))
"""
result = subprocess.run(
[sys.executable, "-c", script],
cwd=ROOT,
text=True,
capture_output=True,
check=False,
)
self.assertEqual(result.returncode, 0, result.stderr)
self.assertRegex(result.stdout.strip(), re.compile(r"\S+"))
if __name__ == "__main__":
unittest.main()

tests/test_aman_maint.py Normal file

@@ -0,0 +1,148 @@
import json
import sys
import tempfile
import unittest
from pathlib import Path
from unittest.mock import patch
ROOT = Path(__file__).resolve().parents[1]
SRC = ROOT / "src"
if str(SRC) not in sys.path:
sys.path.insert(0, str(SRC))
import aman_maint
import aman_model_sync
class AmanMaintTests(unittest.TestCase):
def test_parse_args_sync_default_model_command(self):
args = aman_maint.parse_args(
[
"sync-default-model",
"--report",
"benchmarks/results/latest.json",
"--artifacts",
"benchmarks/model_artifacts.json",
"--constants",
"src/constants.py",
"--check",
]
)
self.assertEqual(args.command, "sync-default-model")
self.assertEqual(args.report, "benchmarks/results/latest.json")
self.assertEqual(args.artifacts, "benchmarks/model_artifacts.json")
self.assertEqual(args.constants, "src/constants.py")
self.assertTrue(args.check)
def test_main_dispatches_sync_default_model_command(self):
with patch("aman_model_sync.sync_default_model_command", return_value=7) as handler:
exit_code = aman_maint.main(["sync-default-model"])
self.assertEqual(exit_code, 7)
handler.assert_called_once()
def test_sync_default_model_command_updates_constants(self):
with tempfile.TemporaryDirectory() as td:
report_path = Path(td) / "latest.json"
artifacts_path = Path(td) / "artifacts.json"
constants_path = Path(td) / "constants.py"
report_path.write_text(
json.dumps({"winner_recommendation": {"name": "test-model"}}),
encoding="utf-8",
)
artifacts_path.write_text(
json.dumps(
{
"models": [
{
"name": "test-model",
"filename": "winner.gguf",
"url": "https://example.invalid/winner.gguf",
"sha256": "a" * 64,
}
]
}
),
encoding="utf-8",
)
constants_path.write_text(
(
'MODEL_NAME = "old.gguf"\n'
'MODEL_URL = "https://example.invalid/old.gguf"\n'
'MODEL_SHA256 = "' + ("b" * 64) + '"\n'
),
encoding="utf-8",
)
args = aman_maint.parse_args(
[
"sync-default-model",
"--report",
str(report_path),
"--artifacts",
str(artifacts_path),
"--constants",
str(constants_path),
]
)
exit_code = aman_model_sync.sync_default_model_command(args)
self.assertEqual(exit_code, 0)
updated = constants_path.read_text(encoding="utf-8")
self.assertIn('MODEL_NAME = "winner.gguf"', updated)
self.assertIn('MODEL_URL = "https://example.invalid/winner.gguf"', updated)
self.assertIn('MODEL_SHA256 = "' + ("a" * 64) + '"', updated)
def test_sync_default_model_command_check_mode_returns_2_on_drift(self):
with tempfile.TemporaryDirectory() as td:
report_path = Path(td) / "latest.json"
artifacts_path = Path(td) / "artifacts.json"
constants_path = Path(td) / "constants.py"
report_path.write_text(
json.dumps({"winner_recommendation": {"name": "test-model"}}),
encoding="utf-8",
)
artifacts_path.write_text(
json.dumps(
{
"models": [
{
"name": "test-model",
"filename": "winner.gguf",
"url": "https://example.invalid/winner.gguf",
"sha256": "a" * 64,
}
]
}
),
encoding="utf-8",
)
constants_path.write_text(
(
'MODEL_NAME = "old.gguf"\n'
'MODEL_URL = "https://example.invalid/old.gguf"\n'
'MODEL_SHA256 = "' + ("b" * 64) + '"\n'
),
encoding="utf-8",
)
args = aman_maint.parse_args(
[
"sync-default-model",
"--report",
str(report_path),
"--artifacts",
str(artifacts_path),
"--constants",
str(constants_path),
"--check",
]
)
exit_code = aman_model_sync.sync_default_model_command(args)
self.assertEqual(exit_code, 2)
updated = constants_path.read_text(encoding="utf-8")
self.assertIn('MODEL_NAME = "old.gguf"', updated)
if __name__ == "__main__":
unittest.main()

tests/test_aman_run.py Normal file

@@ -0,0 +1,210 @@
import json
import os
import sys
import tempfile
import unittest
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import patch
ROOT = Path(__file__).resolve().parents[1]
SRC = ROOT / "src"
if str(SRC) not in sys.path:
sys.path.insert(0, str(SRC))
import aman_cli
import aman_run
from config import Config
class _FakeDesktop:
def __init__(self):
self.hotkey = None
self.hotkey_callback = None
def start_hotkey_listener(self, hotkey, callback):
self.hotkey = hotkey
self.hotkey_callback = callback
def stop_hotkey_listener(self):
return
def start_cancel_listener(self, callback):
_ = callback
return
def stop_cancel_listener(self):
return
def validate_hotkey(self, hotkey):
_ = hotkey
return
def inject_text(self, text, backend, *, remove_transcription_from_clipboard=False):
_ = (text, backend, remove_transcription_from_clipboard)
return
def run_tray(self, _state_getter, on_quit, **_kwargs):
on_quit()
def request_quit(self):
return
class _HotkeyFailDesktop(_FakeDesktop):
def start_hotkey_listener(self, hotkey, callback):
_ = (hotkey, callback)
raise RuntimeError("already in use")
class _FakeDaemon:
def __init__(self, cfg, _desktop, *, verbose=False, config_path=None):
self.cfg = cfg
self.verbose = verbose
self.config_path = config_path
self._paused = False
def get_state(self):
return "idle"
def is_paused(self):
return self._paused
def toggle_paused(self):
self._paused = not self._paused
return self._paused
def apply_config(self, cfg):
self.cfg = cfg
def toggle(self):
return
def shutdown(self, timeout=1.0):
_ = timeout
return True
class _RetrySetupDesktop(_FakeDesktop):
def __init__(self):
super().__init__()
self.settings_invocations = 0
def run_tray(self, _state_getter, on_quit, **kwargs):
settings_cb = kwargs.get("on_open_settings")
if settings_cb is not None and self.settings_invocations == 0:
self.settings_invocations += 1
settings_cb()
return
on_quit()
class AmanRunTests(unittest.TestCase):
def test_lock_rejects_second_instance(self):
with tempfile.TemporaryDirectory() as td:
with patch.dict(os.environ, {"XDG_RUNTIME_DIR": td}, clear=False):
first = aman_run.lock_single_instance()
try:
with self.assertRaises(SystemExit) as ctx:
aman_run.lock_single_instance()
self.assertIn("already running", str(ctx.exception))
finally:
first.close()
def test_run_command_missing_config_uses_settings_ui_and_writes_file(self):
with tempfile.TemporaryDirectory() as td:
path = Path(td) / "config.json"
args = aman_cli.parse_cli_args(["run", "--config", str(path)])
desktop = _FakeDesktop()
onboard_cfg = Config()
onboard_cfg.daemon.hotkey = "Super+m"
result = SimpleNamespace(saved=True, config=onboard_cfg, closed_reason="saved")
with patch("aman_run.lock_single_instance", return_value=object()), patch(
"aman_run.get_desktop_adapter", return_value=desktop
), patch("aman_run.run_config_ui", return_value=result) as config_ui_mock, patch(
"aman_run.Daemon", _FakeDaemon
):
exit_code = aman_run.run_command(args)
self.assertEqual(exit_code, 0)
self.assertTrue(path.exists())
self.assertEqual(desktop.hotkey, "Super+m")
config_ui_mock.assert_called_once()
def test_run_command_missing_config_cancel_returns_without_starting_daemon(self):
with tempfile.TemporaryDirectory() as td:
path = Path(td) / "config.json"
args = aman_cli.parse_cli_args(["run", "--config", str(path)])
desktop = _FakeDesktop()
result = SimpleNamespace(saved=False, config=None, closed_reason="cancelled")
with patch("aman_run.lock_single_instance", return_value=object()), patch(
"aman_run.get_desktop_adapter", return_value=desktop
), patch("aman_run.run_config_ui", return_value=result), patch(
"aman_run.Daemon"
) as daemon_cls:
exit_code = aman_run.run_command(args)
self.assertEqual(exit_code, 0)
self.assertFalse(path.exists())
daemon_cls.assert_not_called()
def test_run_command_missing_config_cancel_then_retry_settings(self):
with tempfile.TemporaryDirectory() as td:
path = Path(td) / "config.json"
args = aman_cli.parse_cli_args(["run", "--config", str(path)])
desktop = _RetrySetupDesktop()
onboard_cfg = Config()
config_ui_results = [
SimpleNamespace(saved=False, config=None, closed_reason="cancelled"),
SimpleNamespace(saved=True, config=onboard_cfg, closed_reason="saved"),
]
with patch("aman_run.lock_single_instance", return_value=object()), patch(
"aman_run.get_desktop_adapter", return_value=desktop
), patch("aman_run.run_config_ui", side_effect=config_ui_results), patch(
"aman_run.Daemon", _FakeDaemon
):
exit_code = aman_run.run_command(args)
self.assertEqual(exit_code, 0)
self.assertTrue(path.exists())
self.assertEqual(desktop.settings_invocations, 1)
def test_run_command_hotkey_failure_logs_actionable_issue(self):
with tempfile.TemporaryDirectory() as td:
path = Path(td) / "config.json"
path.write_text(json.dumps({"config_version": 1}) + "\n", encoding="utf-8")
args = aman_cli.parse_cli_args(["run", "--config", str(path)])
desktop = _HotkeyFailDesktop()
with patch("aman_run.lock_single_instance", return_value=object()), patch(
"aman_run.get_desktop_adapter", return_value=desktop
), patch("aman_run.load", return_value=Config()), patch(
"aman_run.Daemon", _FakeDaemon
), self.assertLogs(level="ERROR") as logs:
exit_code = aman_run.run_command(args)
self.assertEqual(exit_code, 1)
rendered = "\n".join(logs.output)
self.assertIn("hotkey.parse: hotkey setup failed: already in use", rendered)
self.assertIn("next_step: run `aman doctor --config", rendered)
def test_run_command_daemon_init_failure_logs_self_check_next_step(self):
with tempfile.TemporaryDirectory() as td:
path = Path(td) / "config.json"
path.write_text(json.dumps({"config_version": 1}) + "\n", encoding="utf-8")
args = aman_cli.parse_cli_args(["run", "--config", str(path)])
desktop = _FakeDesktop()
with patch("aman_run.lock_single_instance", return_value=object()), patch(
"aman_run.get_desktop_adapter", return_value=desktop
), patch("aman_run.load", return_value=Config()), patch(
"aman_run.Daemon", side_effect=RuntimeError("warmup boom")
), self.assertLogs(level="ERROR") as logs:
exit_code = aman_run.run_command(args)
self.assertEqual(exit_code, 1)
rendered = "\n".join(logs.output)
self.assertIn("startup.readiness: startup failed: warmup boom", rendered)
self.assertIn("next_step: run `aman self-check --config", rendered)
if __name__ == "__main__":
unittest.main()


@@ -1,6 +1,4 @@
import os
import sys
import tempfile
import unittest
from pathlib import Path
from unittest.mock import patch
@@ -10,7 +8,7 @@ SRC = ROOT / "src"
if str(SRC) not in sys.path:
sys.path.insert(0, str(SRC))
import aman
import aman_runtime
from config import Config, VocabularyReplacement
from stages.asr_whisper import AsrResult, AsrSegment, AsrWord
@@ -128,10 +126,10 @@ class FakeAIProcessor:
self.warmup_error = None
self.process_error = None
def process(self, text, lang="auto", **_kwargs):
def process(self, text, lang="auto", **kwargs):
if self.process_error is not None:
raise self.process_error
self.last_kwargs = {"lang": lang, **_kwargs}
self.last_kwargs = {"lang": lang, **kwargs}
return text
def warmup(self, profile="default"):
@@ -174,8 +172,7 @@ def _asr_result(text: str, words: list[str], *, language: str = "auto") -> AsrRe
class DaemonTests(unittest.TestCase):
def _config(self) -> Config:
cfg = Config()
return cfg
return Config()
def _build_daemon(
self,
@@ -185,16 +182,16 @@ class DaemonTests(unittest.TestCase):
cfg: Config | None = None,
verbose: bool = False,
ai_processor: FakeAIProcessor | None = None,
) -> aman.Daemon:
) -> aman_runtime.Daemon:
active_cfg = cfg if cfg is not None else self._config()
active_ai_processor = ai_processor or FakeAIProcessor()
with patch("aman._build_whisper_model", return_value=model), patch(
"aman.LlamaProcessor", return_value=active_ai_processor
with patch("aman_runtime.build_whisper_model", return_value=model), patch(
"aman_processing.LlamaProcessor", return_value=active_ai_processor
):
return aman.Daemon(active_cfg, desktop, verbose=verbose)
return aman_runtime.Daemon(active_cfg, desktop, verbose=verbose)
@patch("aman.stop_audio_recording", return_value=FakeAudio(8))
@patch("aman.start_audio_recording", return_value=(object(), object()))
@patch("aman_runtime.stop_audio_recording", return_value=FakeAudio(8))
@patch("aman_runtime.start_audio_recording", return_value=(object(), object()))
def test_toggle_start_stop_injects_text(self, _start_mock, _stop_mock):
desktop = FakeDesktop()
daemon = self._build_daemon(desktop, FakeModel(), verbose=False)
@@ -205,15 +202,15 @@
)
daemon.toggle()
self.assertEqual(daemon.get_state(), aman.State.RECORDING)
self.assertEqual(daemon.get_state(), aman_runtime.State.RECORDING)
daemon.toggle()
self.assertEqual(daemon.get_state(), aman.State.IDLE)
self.assertEqual(daemon.get_state(), aman_runtime.State.IDLE)
self.assertEqual(desktop.inject_calls, [("hello world", "clipboard", False)])
@patch("aman.stop_audio_recording", return_value=FakeAudio(8))
@patch("aman.start_audio_recording", return_value=(object(), object()))
@patch("aman_runtime.stop_audio_recording", return_value=FakeAudio(8))
@patch("aman_runtime.start_audio_recording", return_value=(object(), object()))
def test_shutdown_stops_recording_without_injection(self, _start_mock, _stop_mock):
desktop = FakeDesktop()
daemon = self._build_daemon(desktop, FakeModel(), verbose=False)
@@ -224,14 +221,14 @@
)
daemon.toggle()
self.assertEqual(daemon.get_state(), aman.State.RECORDING)
self.assertEqual(daemon.get_state(), aman_runtime.State.RECORDING)
self.assertTrue(daemon.shutdown(timeout=0.2))
self.assertEqual(daemon.get_state(), aman.State.IDLE)
self.assertEqual(daemon.get_state(), aman_runtime.State.IDLE)
self.assertEqual(desktop.inject_calls, [])
@patch("aman.stop_audio_recording", return_value=FakeAudio(8))
@patch("aman.start_audio_recording", return_value=(object(), object()))
@patch("aman_runtime.stop_audio_recording", return_value=FakeAudio(8))
@patch("aman_runtime.start_audio_recording", return_value=(object(), object()))
def test_dictionary_replacement_applies_after_ai(self, _start_mock, _stop_mock):
desktop = FakeDesktop()
model = FakeModel(text="good morning martha")
@@ -250,8 +247,8 @@
self.assertEqual(desktop.inject_calls, [("good morning Marta", "clipboard", False)])
@patch("aman.stop_audio_recording", return_value=FakeAudio(8))
@patch("aman.start_audio_recording", return_value=(object(), object()))
@patch("aman_runtime.stop_audio_recording", return_value=FakeAudio(8))
@patch("aman_runtime.start_audio_recording", return_value=(object(), object()))
def test_editor_failure_aborts_output_injection(self, _start_mock, _stop_mock):
desktop = FakeDesktop()
model = FakeModel(text="hello world")
@@ -274,10 +271,10 @@
daemon.toggle()
self.assertEqual(desktop.inject_calls, [])
self.assertEqual(daemon.get_state(), aman.State.IDLE)
self.assertEqual(daemon.get_state(), aman_runtime.State.IDLE)
@patch("aman.stop_audio_recording", return_value=FakeAudio(8))
@patch("aman.start_audio_recording", return_value=(object(), object()))
@patch("aman_runtime.stop_audio_recording", return_value=FakeAudio(8))
@patch("aman_runtime.start_audio_recording", return_value=(object(), object()))
def test_live_path_uses_asr_words_for_alignment_correction(self, _start_mock, _stop_mock):
desktop = FakeDesktop()
ai_processor = FakeAIProcessor()
@@ -299,8 +296,8 @@
self.assertEqual(desktop.inject_calls, [("set alarm for 7", "clipboard", False)])
self.assertEqual(ai_processor.last_kwargs.get("lang"), "en")
@patch("aman.stop_audio_recording", return_value=FakeAudio(8))
@patch("aman.start_audio_recording", return_value=(object(), object()))
@patch("aman_runtime.stop_audio_recording", return_value=FakeAudio(8))
@patch("aman_runtime.start_audio_recording", return_value=(object(), object()))
def test_live_path_calls_word_aware_pipeline_entrypoint(self, _start_mock, _stop_mock):
desktop = FakeDesktop()
daemon = self._build_daemon(desktop, FakeModel(), verbose=False)
@@ -413,10 +410,10 @@ class DaemonTests(unittest.TestCase):
def test_editor_stage_is_initialized_during_daemon_init(self):
desktop = FakeDesktop()
with patch("aman._build_whisper_model", return_value=FakeModel()), patch(
"aman.LlamaProcessor", return_value=FakeAIProcessor()
with patch("aman_runtime.build_whisper_model", return_value=FakeModel()), patch(
"aman_processing.LlamaProcessor", return_value=FakeAIProcessor()
) as processor_cls:
daemon = aman.Daemon(self._config(), desktop, verbose=True)
daemon = aman_runtime.Daemon(self._config(), desktop, verbose=True)
processor_cls.assert_called_once_with(verbose=True, model_path=None)
self.assertIsNotNone(daemon.editor_stage)
@@ -424,10 +421,10 @@ class DaemonTests(unittest.TestCase):
def test_editor_stage_is_warmed_up_during_daemon_init(self):
desktop = FakeDesktop()
ai_processor = FakeAIProcessor()
with patch("aman._build_whisper_model", return_value=FakeModel()), patch(
"aman.LlamaProcessor", return_value=ai_processor
with patch("aman_runtime.build_whisper_model", return_value=FakeModel()), patch(
"aman_processing.LlamaProcessor", return_value=ai_processor
):
daemon = aman.Daemon(self._config(), desktop, verbose=False)
daemon = aman_runtime.Daemon(self._config(), desktop, verbose=False)
self.assertIs(daemon.editor_stage._processor, ai_processor)
self.assertEqual(ai_processor.warmup_calls, ["default"])
@@ -438,11 +435,11 @@ class DaemonTests(unittest.TestCase):
cfg.advanced.strict_startup = True
ai_processor = FakeAIProcessor()
ai_processor.warmup_error = RuntimeError("warmup boom")
with patch("aman._build_whisper_model", return_value=FakeModel()), patch(
"aman.LlamaProcessor", return_value=ai_processor
with patch("aman_runtime.build_whisper_model", return_value=FakeModel()), patch(
"aman_processing.LlamaProcessor", return_value=ai_processor
):
with self.assertRaisesRegex(RuntimeError, "editor stage warmup failed"):
aman.Daemon(cfg, desktop, verbose=False)
aman_runtime.Daemon(cfg, desktop, verbose=False)
def test_editor_stage_warmup_failure_is_non_fatal_without_strict_startup(self):
desktop = FakeDesktop()
@@ -450,19 +447,19 @@ class DaemonTests(unittest.TestCase):
cfg.advanced.strict_startup = False
ai_processor = FakeAIProcessor()
ai_processor.warmup_error = RuntimeError("warmup boom")
with patch("aman._build_whisper_model", return_value=FakeModel()), patch(
"aman.LlamaProcessor", return_value=ai_processor
with patch("aman_runtime.build_whisper_model", return_value=FakeModel()), patch(
"aman_processing.LlamaProcessor", return_value=ai_processor
):
with self.assertLogs(level="WARNING") as logs:
daemon = aman.Daemon(cfg, desktop, verbose=False)
daemon = aman_runtime.Daemon(cfg, desktop, verbose=False)
self.assertIs(daemon.editor_stage._processor, ai_processor)
self.assertTrue(
any("continuing because advanced.strict_startup=false" in line for line in logs.output)
)
@patch("aman.stop_audio_recording", return_value=FakeAudio(8))
@patch("aman.start_audio_recording", return_value=(object(), object()))
@patch("aman_runtime.stop_audio_recording", return_value=FakeAudio(8))
@patch("aman_runtime.start_audio_recording", return_value=(object(), object()))
def test_passes_clipboard_remove_option_to_desktop(self, _start_mock, _stop_mock):
desktop = FakeDesktop()
model = FakeModel(text="hello world")
@@ -486,14 +483,12 @@ class DaemonTests(unittest.TestCase):
daemon = self._build_daemon(desktop, FakeModel(), verbose=False)
with self.assertLogs(level="DEBUG") as logs:
daemon.set_state(aman.State.RECORDING)
daemon.set_state(aman_runtime.State.RECORDING)
self.assertTrue(
any("DEBUG:root:state: idle -> recording" in line for line in logs.output)
)
self.assertTrue(any("DEBUG:root:state: idle -> recording" in line for line in logs.output))
@patch("aman.stop_audio_recording", return_value=FakeAudio(8))
@patch("aman.start_audio_recording", return_value=(object(), object()))
@patch("aman_runtime.stop_audio_recording", return_value=FakeAudio(8))
@patch("aman_runtime.start_audio_recording", return_value=(object(), object()))
def test_cancel_listener_armed_only_while_recording(self, _start_mock, _stop_mock):
desktop = FakeDesktop()
daemon = self._build_daemon(desktop, FakeModel(), verbose=False)
@@ -514,7 +509,7 @@ class DaemonTests(unittest.TestCase):
self.assertEqual(desktop.cancel_listener_stop_calls, 1)
self.assertIsNone(desktop.cancel_listener_callback)
@patch("aman.start_audio_recording")
@patch("aman_runtime.start_audio_recording")
def test_recording_does_not_start_when_cancel_listener_fails(self, start_mock):
stream = FakeStream()
start_mock.return_value = (stream, object())
@@ -523,13 +518,13 @@ class DaemonTests(unittest.TestCase):
daemon.toggle()
self.assertEqual(daemon.get_state(), aman.State.IDLE)
self.assertEqual(daemon.get_state(), aman_runtime.State.IDLE)
self.assertIsNone(daemon.stream)
self.assertIsNone(daemon.record)
self.assertEqual(stream.stop_calls, 1)
self.assertEqual(stream.close_calls, 1)
@patch("aman.start_audio_recording", side_effect=RuntimeError("device missing"))
@patch("aman_runtime.start_audio_recording", side_effect=RuntimeError("device missing"))
def test_record_start_failure_logs_actionable_issue(self, _start_mock):
desktop = FakeDesktop()
daemon = self._build_daemon(desktop, FakeModel(), verbose=False)
@@ -541,8 +536,8 @@ class DaemonTests(unittest.TestCase):
self.assertIn("audio.input: record start failed: device missing", rendered)
self.assertIn("next_step: run `aman doctor --config", rendered)
@patch("aman.stop_audio_recording", return_value=FakeAudio(8))
@patch("aman.start_audio_recording", return_value=(object(), object()))
@patch("aman_runtime.stop_audio_recording", return_value=FakeAudio(8))
@patch("aman_runtime.start_audio_recording", return_value=(object(), object()))
def test_output_failure_logs_actionable_issue(self, _start_mock, _stop_mock):
desktop = FailingInjectDesktop()
daemon = self._build_daemon(desktop, FakeModel(), verbose=False)
@@ -560,8 +555,8 @@ class DaemonTests(unittest.TestCase):
self.assertIn("injection.backend: output failed: xtest unavailable", rendered)
self.assertIn("next_step: run `aman doctor --config", rendered)
@patch("aman.stop_audio_recording", return_value=FakeAudio(8))
@patch("aman.start_audio_recording", return_value=(object(), object()))
@patch("aman_runtime.stop_audio_recording", return_value=FakeAudio(8))
@patch("aman_runtime.start_audio_recording", return_value=(object(), object()))
def test_ai_processor_receives_active_profile(self, _start_mock, _stop_mock):
desktop = FakeDesktop()
cfg = self._config()
@@ -585,8 +580,8 @@ class DaemonTests(unittest.TestCase):
self.assertEqual(ai_processor.last_kwargs.get("profile"), "fast")
@patch("aman.stop_audio_recording", return_value=FakeAudio(8))
@patch("aman.start_audio_recording", return_value=(object(), object()))
@patch("aman_runtime.stop_audio_recording", return_value=FakeAudio(8))
@patch("aman_runtime.start_audio_recording", return_value=(object(), object()))
def test_ai_processor_receives_effective_language(self, _start_mock, _stop_mock):
desktop = FakeDesktop()
cfg = self._config()
@@ -610,7 +605,7 @@ class DaemonTests(unittest.TestCase):
self.assertEqual(ai_processor.last_kwargs.get("lang"), "es")
@patch("aman.start_audio_recording")
@patch("aman_runtime.start_audio_recording")
def test_paused_state_blocks_recording_start(self, start_mock):
desktop = FakeDesktop()
daemon = self._build_daemon(desktop, FakeModel(), verbose=False)
@@ -619,22 +614,9 @@ class DaemonTests(unittest.TestCase):
daemon.toggle()
start_mock.assert_not_called()
self.assertEqual(daemon.get_state(), aman.State.IDLE)
self.assertEqual(daemon.get_state(), aman_runtime.State.IDLE)
self.assertEqual(desktop.cancel_listener_start_calls, 0)
class LockTests(unittest.TestCase):
def test_lock_rejects_second_instance(self):
with tempfile.TemporaryDirectory() as td:
with patch.dict(os.environ, {"XDG_RUNTIME_DIR": td}, clear=False):
first = aman._lock_single_instance()
try:
with self.assertRaises(SystemExit) as ctx:
aman._lock_single_instance()
self.assertIn("already running", str(ctx.exception))
finally:
first.close()
if __name__ == "__main__":
unittest.main()