daemon: thread per-RPC op_id end-to-end

Today there's no way to correlate a CLI failure with a daemon log
line. operationLog records relative timing but no id, two concurrent
vm.start calls log indistinguishably, and the async
vmCreateOperationState.ID is user-facing yet never reaches the
journal. The root helper logs plain text to stderr while bangerd
logs JSON, so a merged journalctl is hard to grep across the
trust-boundary split.

Mint a per-RPC op id at dispatch entry, store it on context, and
include it as an "op_id" attr on every operationLog record. The
id is stamped onto every error response (including the early
short-circuit paths bad_version and unknown_method). rpc.Call
forwards the context op id on requests so a daemon RPC and the
helper RPCs it triggers all share one id. The helper now logs
JSON to match bangerd, adopts the inbound id, and emits a single
"helper rpc completed" / "helper rpc failed" line per call so
operators can see at a glance how long each privileged op took.

vmCreateOperationState.ID is now the same id dispatch generated
for vm.create.begin — one identifier between client status polls,
daemon logs, and helper logs.

The wire format gains two optional fields: rpc.Request.OpID and
rpc.ErrorResponse.OpID, both omitempty so older peers (and the
opposite direction) ignore them. ErrorResponse.Error() now appends
"(op-XXXXXX)" to its string form when OpID is set; existing callers
that just print err.Error() get the id for free.

Tests cover: dispatch stamps op_id on unknown_method, bad_version,
and handler-returned errors; rpc.Call exposes the typed
*ErrorResponse via errors.As so the CLI can read code/op_id; ctx
op_id is forwarded to the server in the request envelope.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Thales Maciel 2026-04-26 22:13:44 -03:00
parent b8c48765fb
commit e47b8146dc
No known key found for this signature in database
GPG key ID: 33112E6833C34679
16 changed files with 333 additions and 44 deletions

View file

@ -310,17 +310,34 @@ func (d *Daemon) watchRequestDisconnect(conn net.Conn, reader *bufio.Reader, met
}
func (d *Daemon) dispatch(ctx context.Context, req rpc.Request) rpc.Response {
	// Per-RPC correlation id is generated unconditionally — even
	// errors that short-circuit before reaching a handler get one
	// so the operator has a handle for every CLI failure.
	// Generation can fail in theory (crypto/rand IO error) —
	// degrade gracefully to a blank id rather than tearing down
	// the request.
	opID, _ := model.NewOpID()
	if opID != "" {
		ctx = WithOpID(ctx, opID)
	}
	// stampOpID copies the dispatch-generated id onto any error
	// response that doesn't already carry one, so every failure
	// path out of dispatch is correlatable in the journal. A
	// handler-supplied OpID is never overwritten.
	stampOpID := func(resp rpc.Response) rpc.Response {
		if !resp.OK && resp.Error != nil && resp.Error.OpID == "" && opID != "" {
			resp.Error.OpID = opID
		}
		return resp
	}
	if req.Version != rpc.Version {
		return stampOpID(rpc.NewError("bad_version", fmt.Sprintf("unsupported version %d", req.Version)))
	}
	// requestHandler, when non-nil, replaces the method-table
	// lookup entirely (used as an injection seam by tests).
	if d.requestHandler != nil {
		return stampOpID(d.requestHandler(ctx, req))
	}
	h, ok := rpcHandlers[req.Method]
	if !ok {
		return stampOpID(rpc.NewError("unknown_method", req.Method))
	}
	return stampOpID(h(ctx, d, req))
}
func (d *Daemon) backgroundLoop() {
@ -346,7 +363,7 @@ func (d *Daemon) backgroundLoop() {
}
func (d *Daemon) reconcile(ctx context.Context) error {
op := d.beginOperation("daemon.reconcile")
op := d.beginOperation(ctx, "daemon.reconcile")
vms, err := d.store.ListVMs(ctx)
if err != nil {
return op.fail(err)
@ -441,14 +458,12 @@ func wireServices(d *Daemon) {
}
if d.img == nil {
d.img = newImageService(imageServiceDeps{
runner: d.runner,
logger: d.logger,
config: d.config,
layout: d.layout,
store: d.store,
beginOperation: func(name string, attrs ...any) *operationLog {
return d.beginOperation(name, attrs...)
},
runner: d.runner,
logger: d.logger,
config: d.config,
layout: d.layout,
store: d.store,
beginOperation: d.beginOperation,
})
}
if d.ws == nil {

View file

@ -1,8 +1,12 @@
package daemon
import (
"context"
"sort"
"strings"
"testing"
"banger/internal/rpc"
)
// TestRPCHandlersMatchDocumentedMethods pins the surface of the RPC
@ -82,3 +86,55 @@ func TestRPCHandlersAllNonNil(t *testing.T) {
}
}
}
// TestDispatchStampsOpIDOnError pins the contract that every error
// response leaving dispatch carries an op_id, even on the
// short-circuit paths (bad_version, unknown_method) that never
// reach a handler. Operators rely on this id to correlate a CLI
// failure to a daemon log line.
func TestDispatchStampsOpIDOnError(t *testing.T) {
	d := &Daemon{}
	cases := []struct {
		name     string
		req      rpc.Request
		wantCode string
	}{
		{
			name:     "unknown_method",
			req:      rpc.Request{Version: rpc.Version, Method: "no.such.method"},
			wantCode: "unknown_method",
		},
		{
			name:     "bad_version",
			req:      rpc.Request{Version: rpc.Version + 99, Method: "ping"},
			wantCode: "bad_version",
		},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			resp := d.dispatch(context.Background(), tc.req)
			if resp.OK {
				t.Fatalf("expected error response, got %+v", resp)
			}
			if resp.Error == nil || resp.Error.Code != tc.wantCode {
				t.Fatalf("error = %+v, want %s", resp.Error, tc.wantCode)
			}
			if !strings.HasPrefix(resp.Error.OpID, "op-") {
				t.Fatalf("op_id = %q, want op-* prefix", resp.Error.OpID)
			}
		})
	}
}
// TestDispatchPropagatesOpIDFromContext covers the case where a
// handler returns its own rpc.NewError with an empty op_id (most
// service errors do); the dispatch wrapper must stamp the
// dispatch-generated id on the way out.
func TestDispatchPropagatesOpIDFromContext(t *testing.T) {
d := &Daemon{
requestHandler: func(_ context.Context, _ rpc.Request) rpc.Response {
return rpc.NewError("operation_failed", "deliberate test failure")
},
}
resp := d.dispatch(context.Background(), rpc.Request{Version: rpc.Version, Method: "anything"})
if resp.OK || resp.Error == nil {
t.Fatalf("expected error response, got %+v", resp)
}
if !strings.HasPrefix(resp.Error.OpID, "op-") {
t.Fatalf("dispatch did not stamp op_id: %+v", resp.Error)
}
}

View file

@ -47,7 +47,7 @@ type ImageService struct {
// beginOperation is a test seam used by a couple of image ops that
// want structured operation logging. Nil → Daemon's beginOperation,
// injected at construction.
beginOperation func(name string, attrs ...any) *operationLog
beginOperation func(ctx context.Context, name string, attrs ...any) *operationLog
}
// imageServiceDeps names every handle ImageService needs from the
@ -59,7 +59,7 @@ type imageServiceDeps struct {
config model.DaemonConfig
layout paths.Layout
store *store.Store
beginOperation func(name string, attrs ...any) *operationLog
beginOperation func(ctx context.Context, name string, attrs ...any) *operationLog
}
func newImageService(deps imageServiceDeps) *ImageService {

View file

@ -98,7 +98,7 @@ func (s *ImageService) RegisterImage(ctx context.Context, params api.ImageRegist
// imageOpsMu — only the find/rename/upsert commit atom holds the
// lock.
func (s *ImageService) PromoteImage(ctx context.Context, idOrName string) (image model.Image, err error) {
op := s.beginOperation("image.promote")
op := s.beginOperation(ctx, "image.promote")
defer func() {
if err != nil {
op.fail(err, imageLogAttrs(image)...)

View file

@ -9,6 +9,7 @@ import (
"time"
"banger/internal/model"
"banger/internal/rpc"
)
func newDaemonLogger(w io.Writer, rawLevel string) (*slog.Logger, string, error) {
@ -35,9 +36,37 @@ func parseLogLevel(raw string) (slog.Level, string, error) {
}
}
func (d *Daemon) beginOperation(name string, attrs ...any) *operationLog {
// WithOpID stores the per-RPC correlation id on ctx and returns the
// derived context; the input ctx is not mutated. Re-exported
// from rpc so daemon-side call sites don't have to import rpc just
// for context plumbing. The dispatch layer calls this on every
// incoming request; capability hooks, lifecycle steps, and the
// privileged-ops shim that crosses into the root helper all read
// the id back via OpIDFromContext so a single id stitches the
// whole chain together in journalctl.
func WithOpID(ctx context.Context, opID string) context.Context {
	return rpc.WithOpID(ctx, opID)
}
// OpIDFromContext returns the dispatch-assigned op id stored on
// ctx, or "" if none was set. Inverse of WithOpID; delegates to
// rpc.OpIDFromContext so both packages read the same context key.
func OpIDFromContext(ctx context.Context) string {
	return rpc.OpIDFromContext(ctx)
}
// beginOperation starts a logged operation. When ctx carries a
// dispatch-assigned op id (see WithOpID) every log line emitted
// through the returned operationLog includes it as an "op_id" attr,
// so the daemon journal can be greppable by id from the user's CLI
// error all the way down through capability hooks and the root
// helper.
func (d *Daemon) beginOperation(ctx context.Context, name string, attrs ...any) *operationLog {
opID := OpIDFromContext(ctx)
allAttrs := append([]any(nil), attrs...)
if opID != "" {
allAttrs = append([]any{"op_id", opID}, allAttrs...)
}
if d.logger != nil {
d.logger.Info("operation started", append([]any{"operation", name}, attrs...)...)
d.logger.Info("operation started", append([]any{"operation", name}, allAttrs...)...)
}
now := time.Now()
return &operationLog{
@ -45,7 +74,8 @@ func (d *Daemon) beginOperation(name string, attrs ...any) *operationLog {
name: name,
started: now,
last: now,
attrs: append([]any(nil), attrs...),
attrs: allAttrs,
opID: opID,
}
}
@ -55,6 +85,16 @@ type operationLog struct {
started time.Time
last time.Time
attrs []any
opID string
}
// OpID exposes the correlation id this operation was started with so
// dispatch can stamp it onto an outgoing error response. Safe to
// call on a nil receiver, in which case it returns "".
func (o *operationLog) OpID() string {
	if o != nil {
		return o.opID
	}
	return ""
}
func (o *operationLog) stage(stage string, attrs ...any) {

View file

@ -39,7 +39,7 @@ type StatsService struct {
config model.DaemonConfig
store *store.Store
net *HostNetwork
beginOperation func(name string, attrs ...any) *operationLog
beginOperation func(ctx context.Context, name string, attrs ...any) *operationLog
// vmAlive / vmHandles are the minimum pair needed to answer "is
// this VM actually running right now?" + "what PID is it?".
@ -68,7 +68,7 @@ type statsServiceDeps struct {
config model.DaemonConfig
store *store.Store
net *HostNetwork
beginOperation func(name string, attrs ...any) *operationLog
beginOperation func(ctx context.Context, name string, attrs ...any) *operationLog
vmAlive func(vm model.VMRecord) bool
vmHandles func(vmID string) model.VMHandles
withVMLockByRef func(ctx context.Context, idOrName string, fn func(model.VMRecord) (model.VMRecord, error)) (model.VMRecord, error)
@ -189,7 +189,7 @@ func (s *StatsService) stopStaleVMs(ctx context.Context) (err error) {
if s.config.AutoStopStaleAfter <= 0 {
return nil
}
op := s.beginOperation("vm.stop_stale")
op := s.beginOperation(ctx, "vm.stop_stale")
defer func() {
if err != nil {
op.fail(err)

View file

@ -28,7 +28,7 @@ import (
// 3. Boot. Only the per-VM lock is held — parallel creates against
// different VMs fully overlap.
func (s *VMService) CreateVM(ctx context.Context, params api.VMCreateParams) (vm model.VMRecord, err error) {
op := s.beginOperation("vm.create")
op := s.beginOperation(ctx, "vm.create")
defer func() {
if err != nil {
op.fail(err)

View file

@ -24,10 +24,21 @@ type vmCreateOperationState struct {
op api.VMCreateOperation
}
func newVMCreateOperationState() (*vmCreateOperationState, error) {
id, err := model.NewID()
if err != nil {
return nil, err
// newVMCreateOperationState constructs the async-progress record for
// a vm.create.begin RPC. When the caller's context already carries a
// dispatch-assigned op id (the normal path), we reuse it so the
// operator-visible status id and the daemon-log op_id are the same
// string. Otherwise we mint a fresh op id — keeps the same shape on
// internal call sites that don't go through dispatch (tests, future
// background creators).
func newVMCreateOperationState(ctx context.Context) (*vmCreateOperationState, error) {
id := OpIDFromContext(ctx)
if id == "" {
var err error
id, err = model.NewOpID()
if err != nil {
return nil, err
}
}
now := model.Now()
return &vmCreateOperationState{
@ -146,12 +157,16 @@ func (op *vmCreateOperationState) cancelOperation() {
}
}
func (s *VMService) BeginVMCreate(_ context.Context, params api.VMCreateParams) (api.VMCreateOperation, error) {
op, err := newVMCreateOperationState()
func (s *VMService) BeginVMCreate(ctx context.Context, params api.VMCreateParams) (api.VMCreateOperation, error) {
op, err := newVMCreateOperationState(ctx)
if err != nil {
return api.VMCreateOperation{}, err
}
createCtx, cancel := context.WithCancel(context.Background())
// Detach from the caller's deadline (the begin RPC returns
// immediately) but preserve the op id so every log line emitted
// by the goroutine carries the same identifier the client just
// got back.
createCtx, cancel := context.WithCancel(WithOpID(context.Background(), op.op.ID))
op.setCancel(cancel)
s.createOps.Insert(op)
go s.runVMCreateOperation(withVMCreateProgress(createCtx, op), op, params)

View file

@ -30,7 +30,7 @@ func (s *VMService) StartVM(ctx context.Context, idOrName string) (model.VMRecor
}
func (s *VMService) startVMLocked(ctx context.Context, vm model.VMRecord, image model.Image) (_ model.VMRecord, err error) {
op := s.beginOperation("vm.start", append(vmLogAttrs(vm), imageLogAttrs(image)...)...)
op := s.beginOperation(ctx, "vm.start", append(vmLogAttrs(vm), imageLogAttrs(image)...)...)
defer func() {
if err != nil {
err = annotateLogPath(err, vm.Runtime.LogPath)
@ -97,7 +97,7 @@ func (s *VMService) StopVM(ctx context.Context, idOrName string) (model.VMRecord
func (s *VMService) stopVMLocked(ctx context.Context, current model.VMRecord) (vm model.VMRecord, err error) {
vm = current
op := s.beginOperation("vm.stop", "vm_ref", vm.ID)
op := s.beginOperation(ctx, "vm.stop", "vm_ref", vm.ID)
defer func() {
if err != nil {
op.fail(err, vmLogAttrs(vm)...)
@ -154,7 +154,7 @@ func (s *VMService) KillVM(ctx context.Context, params api.VMKillParams) (model.
func (s *VMService) killVMLocked(ctx context.Context, current model.VMRecord, signalValue string) (vm model.VMRecord, err error) {
vm = current
op := s.beginOperation("vm.kill", "vm_ref", vm.ID, "signal", signalValue)
op := s.beginOperation(ctx, "vm.kill", "vm_ref", vm.ID, "signal", signalValue)
defer func() {
if err != nil {
op.fail(err, vmLogAttrs(vm)...)
@ -209,7 +209,7 @@ func (s *VMService) killVMLocked(ctx context.Context, current model.VMRecord, si
}
func (s *VMService) RestartVM(ctx context.Context, idOrName string) (vm model.VMRecord, err error) {
op := s.beginOperation("vm.restart", "vm_ref", idOrName)
op := s.beginOperation(ctx, "vm.restart", "vm_ref", idOrName)
defer func() {
if err != nil {
op.fail(err, vmLogAttrs(vm)...)
@ -244,7 +244,7 @@ func (s *VMService) DeleteVM(ctx context.Context, idOrName string) (model.VMReco
func (s *VMService) deleteVMLocked(ctx context.Context, current model.VMRecord) (vm model.VMRecord, err error) {
vm = current
op := s.beginOperation("vm.delete", "vm_ref", vm.ID)
op := s.beginOperation(ctx, "vm.delete", "vm_ref", vm.ID)
defer func() {
if err != nil {
op.fail(err, vmLogAttrs(vm)...)

View file

@ -76,7 +76,7 @@ type VMService struct {
// VMService never reaches back to *Daemon.
capHooks capabilityHooks
beginOperation func(name string, attrs ...any) *operationLog
beginOperation func(ctx context.Context, name string, attrs ...any) *operationLog
}
// capabilityHooks bundles the capability-dispatch entry points that
@ -104,7 +104,7 @@ type vmServiceDeps struct {
ws *WorkspaceService
priv privilegedOps
capHooks capabilityHooks
beginOperation func(name string, attrs ...any) *operationLog
beginOperation func(ctx context.Context, name string, attrs ...any) *operationLog
vsockHostDevice string
}

View file

@ -17,7 +17,7 @@ func (s *VMService) SetVM(ctx context.Context, params api.VMSetParams) (model.VM
func (s *VMService) setVMLocked(ctx context.Context, current model.VMRecord, params api.VMSetParams) (vm model.VMRecord, err error) {
vm = current
op := s.beginOperation("vm.set", "vm_ref", vm.ID)
op := s.beginOperation(ctx, "vm.set", "vm_ref", vm.ID)
defer func() {
if err != nil {
op.fail(err, vmLogAttrs(vm)...)

View file

@ -43,7 +43,7 @@ type WorkspaceService struct {
imageWorkSeed func(ctx context.Context, image model.Image, fingerprint string) error
withVMLockByRef func(ctx context.Context, idOrName string, fn func(model.VMRecord) (model.VMRecord, error)) (model.VMRecord, error)
beginOperation func(name string, attrs ...any) *operationLog
beginOperation func(ctx context.Context, name string, attrs ...any) *operationLog
// repoInspector is the Inspector used by the real InspectRepo /
// ImportRepoToGuest fallbacks when the test seams below aren't
@ -71,7 +71,7 @@ type workspaceServiceDeps struct {
imageResolver func(ctx context.Context, idOrName string) (model.Image, error)
imageWorkSeed func(ctx context.Context, image model.Image, fingerprint string) error
withVMLockByRef func(ctx context.Context, idOrName string, fn func(model.VMRecord) (model.VMRecord, error)) (model.VMRecord, error)
beginOperation func(name string, attrs ...any) *operationLog
beginOperation func(ctx context.Context, name string, attrs ...any) *operationLog
}
func newWorkspaceService(deps workspaceServiceDeps) *WorkspaceService {