Today there's no way to correlate a CLI failure with a daemon log line. operationLog records relative timing but no id, two concurrent vm.start calls log indistinguishably, and the async vmCreateOperationState.ID is user-facing yet never reaches the journal. The root helper logs plain text to stderr while bangerd logs JSON, so a merged journalctl is hard to grep across the trust-boundary split. Mint a per-RPC op id at dispatch entry, store it on context, and include it as an "op_id" attr on every operationLog record. The id is stamped onto every error response (including the early short-circuit paths bad_version and unknown_method). rpc.Call forwards the context op id on requests so a daemon RPC and the helper RPCs it triggers all share one id. The helper now logs JSON to match bangerd, adopts the inbound id, and emits a single "helper rpc completed" / "helper rpc failed" line per call so operators can see at a glance how long each privileged op took. vmCreateOperationState.ID is now the same id dispatch generated for vm.create.begin — one identifier between client status polls, daemon logs, and helper logs. The wire format gains two optional fields: rpc.Request.OpID and rpc.ErrorResponse.OpID, both omitempty so older peers (and the opposite direction) ignore them. ErrorResponse.Error() now appends "(op-XXXXXX)" to its string form when set; existing callers that just print err.Error() get the id for free. Tests cover: dispatch stamps op_id on unknown_method, bad_version, and handler-returned errors; rpc.Call exposes the typed *ErrorResponse via errors.As so the CLI can read code/op_id; ctx op_id is forwarded to the server in the request envelope. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
204 lines
5.1 KiB
Go
package daemon
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"banger/internal/api"
|
|
"banger/internal/model"
|
|
)
|
|
|
|
// ID returns the operation's identifier (taken under the lock via snapshot).
func (op *vmCreateOperationState) ID() string { return op.snapshot().ID }

// IsDone reports whether the operation has finished, successfully or not.
func (op *vmCreateOperationState) IsDone() bool { return op.snapshot().Done }

// UpdatedAt returns the time of the operation's most recent state change.
func (op *vmCreateOperationState) UpdatedAt() time.Time { return op.snapshot().UpdatedAt }

// Cancel invokes the stored context cancel func, aborting the in-flight create.
func (op *vmCreateOperationState) Cancel() { op.cancelOperation() }
|
|
|
|
// vmCreateProgressKey is the context key under which a *vmCreateOperationState
// is carried to the create pipeline (see withVMCreateProgress).
type vmCreateProgressKey struct{}

// vmCreateOperationState is the mutable record backing one async vm.create
// operation. mu guards both cancel and op; all reads and writes go through
// the methods below, which take the lock.
type vmCreateOperationState struct {
	mu sync.Mutex
	// cancel aborts the detached create context; set via setCancel.
	cancel context.CancelFunc
	// op is the wire-visible progress snapshot returned to status polls.
	op api.VMCreateOperation
}
|
|
|
|
// newVMCreateOperationState constructs the async-progress record for
|
|
// a vm.create.begin RPC. When the caller's context already carries a
|
|
// dispatch-assigned op id (the normal path), we reuse it so the
|
|
// operator-visible status id and the daemon-log op_id are the same
|
|
// string. Otherwise we mint a fresh op id — keeps the same shape on
|
|
// internal call sites that don't go through dispatch (tests, future
|
|
// background creators).
|
|
func newVMCreateOperationState(ctx context.Context) (*vmCreateOperationState, error) {
|
|
id := OpIDFromContext(ctx)
|
|
if id == "" {
|
|
var err error
|
|
id, err = model.NewOpID()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
now := model.Now()
|
|
return &vmCreateOperationState{
|
|
op: api.VMCreateOperation{
|
|
ID: id,
|
|
Stage: "queued",
|
|
Detail: "waiting to start",
|
|
StartedAt: now,
|
|
UpdatedAt: now,
|
|
},
|
|
}, nil
|
|
}
|
|
|
|
func withVMCreateProgress(ctx context.Context, op *vmCreateOperationState) context.Context {
|
|
if op == nil {
|
|
return ctx
|
|
}
|
|
return context.WithValue(ctx, vmCreateProgressKey{}, op)
|
|
}
|
|
|
|
func vmCreateProgressFromContext(ctx context.Context) *vmCreateOperationState {
|
|
if ctx == nil {
|
|
return nil
|
|
}
|
|
op, _ := ctx.Value(vmCreateProgressKey{}).(*vmCreateOperationState)
|
|
return op
|
|
}
|
|
|
|
func vmCreateStage(ctx context.Context, stage, detail string) {
|
|
if op := vmCreateProgressFromContext(ctx); op != nil {
|
|
op.stage(stage, detail)
|
|
}
|
|
}
|
|
|
|
func vmCreateBindVM(ctx context.Context, vm model.VMRecord) {
|
|
if op := vmCreateProgressFromContext(ctx); op != nil {
|
|
op.bindVM(vm)
|
|
}
|
|
}
|
|
|
|
func (op *vmCreateOperationState) setCancel(cancel context.CancelFunc) {
|
|
op.mu.Lock()
|
|
defer op.mu.Unlock()
|
|
op.cancel = cancel
|
|
}
|
|
|
|
func (op *vmCreateOperationState) bindVM(vm model.VMRecord) {
|
|
op.mu.Lock()
|
|
defer op.mu.Unlock()
|
|
op.op.VMID = vm.ID
|
|
op.op.VMName = vm.Name
|
|
}
|
|
|
|
func (op *vmCreateOperationState) stage(stage, detail string) {
|
|
op.mu.Lock()
|
|
defer op.mu.Unlock()
|
|
stage = strings.TrimSpace(stage)
|
|
detail = strings.TrimSpace(detail)
|
|
if stage == "" {
|
|
stage = op.op.Stage
|
|
}
|
|
if stage == op.op.Stage && detail == op.op.Detail {
|
|
return
|
|
}
|
|
op.op.Stage = stage
|
|
op.op.Detail = detail
|
|
op.op.UpdatedAt = model.Now()
|
|
}
|
|
|
|
func (op *vmCreateOperationState) done(vm model.VMRecord) {
|
|
op.mu.Lock()
|
|
defer op.mu.Unlock()
|
|
vmCopy := vm
|
|
op.op.VMID = vm.ID
|
|
op.op.VMName = vm.Name
|
|
op.op.Stage = "ready"
|
|
op.op.Detail = "vm is ready"
|
|
op.op.Done = true
|
|
op.op.Success = true
|
|
op.op.Error = ""
|
|
op.op.VM = &vmCopy
|
|
op.op.UpdatedAt = model.Now()
|
|
}
|
|
|
|
func (op *vmCreateOperationState) fail(err error) {
|
|
op.mu.Lock()
|
|
defer op.mu.Unlock()
|
|
op.op.Done = true
|
|
op.op.Success = false
|
|
if err != nil {
|
|
op.op.Error = err.Error()
|
|
}
|
|
if strings.TrimSpace(op.op.Detail) == "" {
|
|
op.op.Detail = "vm create failed"
|
|
}
|
|
op.op.UpdatedAt = model.Now()
|
|
}
|
|
|
|
func (op *vmCreateOperationState) snapshot() api.VMCreateOperation {
|
|
op.mu.Lock()
|
|
defer op.mu.Unlock()
|
|
snapshot := op.op
|
|
if snapshot.VM != nil {
|
|
vmCopy := *snapshot.VM
|
|
snapshot.VM = &vmCopy
|
|
}
|
|
return snapshot
|
|
}
|
|
|
|
func (op *vmCreateOperationState) cancelOperation() {
|
|
op.mu.Lock()
|
|
cancel := op.cancel
|
|
op.mu.Unlock()
|
|
if cancel != nil {
|
|
cancel()
|
|
}
|
|
}
|
|
|
|
func (s *VMService) BeginVMCreate(ctx context.Context, params api.VMCreateParams) (api.VMCreateOperation, error) {
|
|
op, err := newVMCreateOperationState(ctx)
|
|
if err != nil {
|
|
return api.VMCreateOperation{}, err
|
|
}
|
|
// Detach from the caller's deadline (the begin RPC returns
|
|
// immediately) but preserve the op id so every log line emitted
|
|
// by the goroutine carries the same identifier the client just
|
|
// got back.
|
|
createCtx, cancel := context.WithCancel(WithOpID(context.Background(), op.op.ID))
|
|
op.setCancel(cancel)
|
|
s.createOps.Insert(op)
|
|
go s.runVMCreateOperation(withVMCreateProgress(createCtx, op), op, params)
|
|
return op.snapshot(), nil
|
|
}
|
|
|
|
func (s *VMService) runVMCreateOperation(ctx context.Context, op *vmCreateOperationState, params api.VMCreateParams) {
|
|
vm, err := s.CreateVM(ctx, params)
|
|
if err != nil {
|
|
op.fail(err)
|
|
return
|
|
}
|
|
op.done(vm)
|
|
}
|
|
|
|
func (s *VMService) VMCreateStatus(_ context.Context, id string) (api.VMCreateOperation, error) {
|
|
op, ok := s.createOps.Get(strings.TrimSpace(id))
|
|
if !ok {
|
|
return api.VMCreateOperation{}, fmt.Errorf("vm create operation not found: %s", id)
|
|
}
|
|
return op.snapshot(), nil
|
|
}
|
|
|
|
func (s *VMService) CancelVMCreate(_ context.Context, id string) error {
|
|
op, ok := s.createOps.Get(strings.TrimSpace(id))
|
|
if !ok {
|
|
return fmt.Errorf("vm create operation not found: %s", id)
|
|
}
|
|
op.cancelOperation()
|
|
return nil
|
|
}
|
|
|
|
// pruneVMCreateOperations delegates to the createOps store to discard
// operation records older than the cutoff — presumably those whose last
// update predates olderThan; exact eviction rules live in createOps.Prune.
func (s *VMService) pruneVMCreateOperations(olderThan time.Time) {
	s.createOps.Prune(olderThan)
}
|