banger/internal/daemon/vm_create_ops.go
Thales Maciel e47b8146dc
daemon: thread per-RPC op_id end-to-end
Today there's no way to correlate a CLI failure with a daemon log
line. operationLog records relative timing but no id, two concurrent
vm.start calls log indistinguishably, and the async
vmCreateOperationState.ID is user-facing yet never reaches the
journal. The root helper logs plain text to stderr while bangerd
logs JSON, so a merged journalctl is hard to grep across the
trust-boundary split.

Mint a per-RPC op id at dispatch entry, store it on context, and
include it as an "op_id" attr on every operationLog record. The
id is stamped onto every error response (including the early
short-circuit paths bad_version and unknown_method). rpc.Call
forwards the context op id on requests so a daemon RPC and the
helper RPCs it triggers all share one id. The helper now logs
JSON to match bangerd, adopts the inbound id, and emits a single
"helper rpc completed" / "helper rpc failed" line per call so
operators can see at a glance how long each privileged op took.

vmCreateOperationState.ID is now the same id dispatch generated
for vm.create.begin — one identifier between client status polls,
daemon logs, and helper logs.

The wire format gains two optional fields: rpc.Request.OpID and
rpc.ErrorResponse.OpID. Both are tagged omitempty, so older peers
(in either direction) ignore them. ErrorResponse.Error() now appends
"(op-XXXXXX)" to its string form when the op id is set; existing
callers that just print err.Error() get the id for free.

Tests cover: dispatch stamps op_id on unknown_method, bad_version,
and handler-returned errors; rpc.Call exposes the typed
*ErrorResponse via errors.As so the CLI can read code/op_id; ctx
op_id is forwarded to the server in the request envelope.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-26 22:13:44 -03:00

204 lines
5.1 KiB
Go

package daemon
import (
"context"
"fmt"
"strings"
"sync"
"time"
"banger/internal/api"
"banger/internal/model"
)
// ID returns the operation's identifier, read from a snapshot taken
// under the state's mutex.
func (op *vmCreateOperationState) ID() string {
	current := op.snapshot()
	return current.ID
}
// IsDone reports whether the operation has finished. Both done and
// fail set the Done flag, so this is true for success and failure
// alike.
func (op *vmCreateOperationState) IsDone() bool {
	current := op.snapshot()
	return current.Done
}
// UpdatedAt returns the timestamp of the most recent recorded change
// to the operation, read from a snapshot taken under the state's
// mutex.
func (op *vmCreateOperationState) UpdatedAt() time.Time {
	current := op.snapshot()
	return current.UpdatedAt
}
// Cancel is the exported-style entry point for cancelling the
// operation; it simply delegates to cancelOperation, which invokes
// the registered cancel function when one has been set.
func (op *vmCreateOperationState) Cancel() { op.cancelOperation() }
// vmCreateProgressKey is the private context key under which
// withVMCreateProgress stores the *vmCreateOperationState and from
// which vmCreateProgressFromContext reads it back.
type vmCreateProgressKey struct{}
// vmCreateOperationState is the mutable progress record for one
// asynchronous vm create operation. Its methods take mu before
// touching cancel or op.
type vmCreateOperationState struct {
// mu guards cancel and op.
mu sync.Mutex
// cancel is installed by setCancel and invoked by cancelOperation;
// it is nil until setCancel has been called.
cancel context.CancelFunc
// op is the client-visible operation record; snapshot returns copies
// of it.
op api.VMCreateOperation
}
// newVMCreateOperationState builds the progress record for a
// vm.create.begin RPC in its initial "queued" state.
//
// If ctx already carries an op id (the normal, dispatch-driven path)
// that id is reused, so the status id handed to the client matches
// the op_id in daemon logs. When ctx has no op id — call sites that
// bypass dispatch, such as tests — a fresh one is minted via
// model.NewOpID, and any minting error is returned with a nil state.
func newVMCreateOperationState(ctx context.Context) (*vmCreateOperationState, error) {
	opID := OpIDFromContext(ctx)
	if opID == "" {
		minted, err := model.NewOpID()
		if err != nil {
			return nil, err
		}
		opID = minted
	}

	// StartedAt and UpdatedAt share one timestamp so a fresh record
	// reads as "never updated since creation".
	createdAt := model.Now()
	record := api.VMCreateOperation{
		ID:        opID,
		Stage:     "queued",
		Detail:    "waiting to start",
		StartedAt: createdAt,
		UpdatedAt: createdAt,
	}
	return &vmCreateOperationState{op: record}, nil
}
// withVMCreateProgress returns a context derived from ctx that carries
// op under vmCreateProgressKey. A nil op leaves ctx untouched.
func withVMCreateProgress(ctx context.Context, op *vmCreateOperationState) context.Context {
	if op != nil {
		return context.WithValue(ctx, vmCreateProgressKey{}, op)
	}
	return ctx
}
// vmCreateProgressFromContext retrieves the progress state stored by
// withVMCreateProgress. It returns nil when ctx is nil or carries no
// such state.
func vmCreateProgressFromContext(ctx context.Context) *vmCreateOperationState {
	if ctx == nil {
		return nil
	}
	if state, ok := ctx.Value(vmCreateProgressKey{}).(*vmCreateOperationState); ok {
		return state
	}
	return nil
}
// vmCreateStage records a stage/detail transition on the progress
// state attached to ctx. It is a no-op when ctx carries no progress
// state, so callers can invoke it unconditionally.
func vmCreateStage(ctx context.Context, stage, detail string) {
	progress := vmCreateProgressFromContext(ctx)
	if progress == nil {
		return
	}
	progress.stage(stage, detail)
}
// vmCreateBindVM associates vm's ID and name with the progress state
// attached to ctx. It is a no-op when ctx carries no progress state.
func vmCreateBindVM(ctx context.Context, vm model.VMRecord) {
	progress := vmCreateProgressFromContext(ctx)
	if progress == nil {
		return
	}
	progress.bindVM(vm)
}
// setCancel installs the function that cancelOperation will invoke,
// replacing any previously installed one.
func (op *vmCreateOperationState) setCancel(cancel context.CancelFunc) {
	op.mu.Lock()
	op.cancel = cancel
	op.mu.Unlock()
}
// bindVM copies vm's ID and name onto the operation record so status
// readers can see which VM the operation concerns.
func (op *vmCreateOperationState) bindVM(vm model.VMRecord) {
	op.mu.Lock()
	defer op.mu.Unlock()

	op.op.VMID, op.op.VMName = vm.ID, vm.Name
}
// stage records a new stage and detail on the operation. Both inputs
// are whitespace-trimmed; an empty stage means "keep the current
// stage". When the resulting stage/detail pair equals what is already
// recorded, nothing is written and UpdatedAt is left alone.
func (op *vmCreateOperationState) stage(stage, detail string) {
	// Normalising the inputs needs no shared state, so do it before
	// taking the lock.
	nextStage := strings.TrimSpace(stage)
	nextDetail := strings.TrimSpace(detail)

	op.mu.Lock()
	defer op.mu.Unlock()

	if nextStage == "" {
		nextStage = op.op.Stage
	}
	changed := nextStage != op.op.Stage || nextDetail != op.op.Detail
	if changed {
		op.op.Stage = nextStage
		op.op.Detail = nextDetail
		op.op.UpdatedAt = model.Now()
	}
}
// done marks the operation as successfully finished: it records the
// VM's identity, sets the terminal "ready" stage, clears any error,
// attaches a private copy of vm, and bumps UpdatedAt.
func (op *vmCreateOperationState) done(vm model.VMRecord) {
	op.mu.Lock()
	defer op.mu.Unlock()

	// The record keeps a pointer, so give it its own copy of vm.
	result := vm

	rec := &op.op
	rec.VMID = vm.ID
	rec.VMName = vm.Name
	rec.Stage = "ready"
	rec.Detail = "vm is ready"
	rec.Done = true
	rec.Success = true
	rec.Error = ""
	rec.VM = &result
	rec.UpdatedAt = model.Now()
}
// fail marks the operation as finished without success. A non-nil
// err has its message stored in Error; a nil err leaves Error as it
// was. The last recorded stage and detail are preserved so readers
// can see where the failure happened; only a blank detail is replaced
// with a generic message.
func (op *vmCreateOperationState) fail(err error) {
	op.mu.Lock()
	defer op.mu.Unlock()

	rec := &op.op
	rec.Done, rec.Success = true, false
	if err != nil {
		rec.Error = err.Error()
	}
	if strings.TrimSpace(rec.Detail) == "" {
		rec.Detail = "vm create failed"
	}
	rec.UpdatedAt = model.Now()
}
// snapshot returns a copy of the operation record taken under the
// mutex. When a VM is attached, the VM value is copied as well, so
// the returned pointer does not alias the one held by the state.
func (op *vmCreateOperationState) snapshot() api.VMCreateOperation {
	op.mu.Lock()
	defer op.mu.Unlock()

	out := op.op
	if out.VM == nil {
		return out
	}
	detached := *out.VM
	out.VM = &detached
	return out
}
// cancelOperation invokes the cancel function installed by setCancel,
// if any. The function is read under the mutex but called after the
// lock is released.
func (op *vmCreateOperationState) cancelOperation() {
	op.mu.Lock()
	stop := op.cancel
	op.mu.Unlock()

	if stop == nil {
		return
	}
	stop()
}
// BeginVMCreate starts an asynchronous vm create and returns
// immediately with a snapshot of the new operation record.
//
// The operation state is registered in s.createOps and the actual
// work runs in a goroutine via runVMCreateOperation. An error is
// returned only when the operation state itself cannot be built.
func (s *VMService) BeginVMCreate(ctx context.Context, params api.VMCreateParams) (api.VMCreateOperation, error) {
	state, err := newVMCreateOperationState(ctx)
	if err != nil {
		return api.VMCreateOperation{}, err
	}

	// The begin RPC returns right away, so the worker must not inherit
	// the caller's deadline or cancellation. Start from a background
	// context and re-attach only the op id, so log lines emitted by the
	// goroutine carry the identifier the client just received.
	detached := WithOpID(context.Background(), state.op.ID)
	runCtx, stop := context.WithCancel(detached)
	state.setCancel(stop)

	s.createOps.Insert(state)

	workerCtx := withVMCreateProgress(runCtx, state)
	go s.runVMCreateOperation(workerCtx, state, params)

	return state.snapshot(), nil
}
// runVMCreateOperation performs the create by calling s.CreateVM and
// records the outcome on op: done with the resulting VM on success,
// fail with the error otherwise.
func (s *VMService) runVMCreateOperation(ctx context.Context, op *vmCreateOperationState, params api.VMCreateParams) {
	vm, err := s.CreateVM(ctx, params)
	if err == nil {
		op.done(vm)
		return
	}
	op.fail(err)
}
// VMCreateStatus returns a snapshot of the create operation registered
// under id. The id is whitespace-trimmed for the lookup; an unknown id
// yields a zero-value operation and a not-found error that quotes the
// id as it was passed in.
func (s *VMService) VMCreateStatus(_ context.Context, id string) (api.VMCreateOperation, error) {
	key := strings.TrimSpace(id)
	if state, found := s.createOps.Get(key); found {
		return state.snapshot(), nil
	}
	return api.VMCreateOperation{}, fmt.Errorf("vm create operation not found: %s", id)
}
// CancelVMCreate requests cancellation of the create operation
// registered under id. The id is whitespace-trimmed for the lookup;
// an unknown id yields a not-found error. A nil return means the
// cancel request was issued, not that the operation has stopped.
func (s *VMService) CancelVMCreate(_ context.Context, id string) error {
	key := strings.TrimSpace(id)
	if state, found := s.createOps.Get(key); found {
		state.cancelOperation()
		return nil
	}
	return fmt.Errorf("vm create operation not found: %s", id)
}
// pruneVMCreateOperations forwards the olderThan cutoff to
// s.createOps.Prune.
// NOTE(review): presumably this drops finished operations whose last
// update predates olderThan — confirm against the createOps
// implementation, which is not visible from this file.
func (s *VMService) pruneVMCreateOperations(olderThan time.Time) {
s.createOps.Prune(olderThan)
}