banger/internal/rpc/rpc.go
Thales Maciel e47b8146dc
daemon: thread per-RPC op_id end-to-end
Today there's no way to correlate a CLI failure with a daemon log
line. operationLog records relative timing but no id, two concurrent
vm.start calls log indistinguishably, and the async
vmCreateOperationState.ID is user-facing yet never reaches the
journal. The root helper logs plain text to stderr while bangerd
logs JSON, so a merged journalctl is hard to grep across the
trust-boundary split.

Mint a per-RPC op id at dispatch entry, store it on context, and
include it as an "op_id" attr on every operationLog record. The
id is stamped onto every error response (including the early
short-circuit paths bad_version and unknown_method). rpc.Call
forwards the context op id on requests so a daemon RPC and the
helper RPCs it triggers all share one id. The helper now logs
JSON to match bangerd, adopts the inbound id, and emits a single
"helper rpc completed" / "helper rpc failed" line per call so
operators can see at a glance how long each privileged op took.

vmCreateOperationState.ID is now the same id dispatch generated
for vm.create.begin — one identifier between client status polls,
daemon logs, and helper logs.

The wire format gains two optional fields, rpc.Request.OpID and
rpc.ErrorResponse.OpID. Both are omitempty, so they are left off the
wire when unset, and peers that predate them (in either direction)
simply ignore them. ErrorResponse.Error() now appends "(op-XXXXXX)" to
its string form when the id is set; existing callers that just print
err.Error() get the id for free.

Tests cover: dispatch stamps op_id on unknown_method, bad_version,
and handler-returned errors; rpc.Call exposes the typed
*ErrorResponse via errors.As so the CLI can read code/op_id; ctx
op_id is forwarded to the server in the request envelope.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-26 22:13:44 -03:00

211 lines
5.6 KiB
Go

package rpc
import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"net"
"net/http"
"os"
"time"
)
// Version is the wire-protocol version this package speaks. Call stamps it
// on every outgoing Request.
const Version = 1
// Request is the JSON envelope written to the socket for a single RPC.
type Request struct {
	// Version is the protocol version of the sender; Call always sends the
	// package-level Version constant.
	Version int `json:"version"`

	// Method names the RPC being invoked.
	Method string `json:"method"`

	// Params carries the method arguments as raw, still-encoded JSON. It is
	// left off the wire when empty; DecodeParams turns it into a typed value.
	Params json.RawMessage `json:"params,omitempty"`

	// OpID is the per-RPC correlation id. Optional on the wire so
	// older clients (which don't set it) and older servers (which
	// don't read it) keep interoperating. The daemon attaches it on
	// every incoming request via dispatch; rpc.Call forwards
	// whatever id is on ctx so a helper RPC carries the same id as
	// the daemon RPC that triggered it.
	OpID string `json:"op_id,omitempty"`
}
// opIDKey is the context-value key for the per-RPC correlation id
// that flows from CLI → daemon → root helper. WithOpID stores the id
// under this key and OpIDFromContext reads it back. It lives in the rpc
// package because rpc.Call needs to read the id without depending on
// the daemon package; daemon and roothelper both import it. Being an
// unexported type, the key cannot collide with context keys defined
// in other packages.
type opIDKey struct{}
// WithOpID returns a context derived from ctx that carries opID as the
// per-RPC correlation id. The daemon dispatch layer uses it to inject the
// per-request id, and rpc.Call reads the id back automatically.
//
// When ctx is nil or opID is empty there is nothing to attach, and ctx is
// returned exactly as it was passed in (including a nil ctx).
func WithOpID(ctx context.Context, opID string) context.Context {
	if ctx != nil && opID != "" {
		return context.WithValue(ctx, opIDKey{}, opID)
	}
	return ctx
}
// OpIDFromContext reports the correlation id that WithOpID attached to ctx.
// It returns "" when ctx is nil, when no id was attached, or when the
// stored value is not a string.
func OpIDFromContext(ctx context.Context) string {
	if ctx == nil {
		return ""
	}
	// A failed type assertion leaves opID as "", which is already the
	// "no id" answer, so the ok flag is not needed.
	opID, _ := ctx.Value(opIDKey{}).(string)
	return opID
}
// Response is the JSON envelope sent back for a single Request. NewResult
// builds the success form and NewError / NewErrorWithOpID build the failure
// form.
type Response struct {
	// OK is true for a successful call. Call treats any response with
	// OK == false as an error.
	OK bool `json:"ok"`

	// Result is the raw JSON-encoded return value; left off the wire when
	// empty.
	Result json.RawMessage `json:"result,omitempty"`

	// Error describes the failure when OK is false; left off the wire when
	// nil.
	Error *ErrorResponse `json:"error,omitempty"`
}
// ErrorResponse is the structured failure carried in Response.Error. It
// implements the error interface, so it can be returned as an error and
// recovered with errors.As.
type ErrorResponse struct {
	// Code is the error code for the failure.
	Code string `json:"code"`

	// Message is the human-readable description of the failure.
	Message string `json:"message"`

	// OpID is the daemon-assigned correlation id of the RPC that produced
	// this error. It is optional and may be empty (older daemons don't set
	// it); when present the CLI surfaces it so an operator can grep
	// journalctl by that id and find the full context.
	OpID string `json:"op_id,omitempty"`
}

// Error implements the error interface, which lets callers errors.As an
// *ErrorResponse out of an rpc.Call return value and read its structured
// fields directly.
//
// The rendered form is "code: message", followed by " (op-id)" only when an
// op id is attached. A nil receiver renders as "". CLI code paths that want
// a translated, user-facing message format the typed fields themselves; this
// string is the fallback for log lines, fmt.Errorf %w wrappers, and callers
// that never unwrap the error.
func (e *ErrorResponse) Error() string {
	if e == nil {
		return ""
	}
	text := e.Code + ": " + e.Message
	if e.OpID != "" {
		text += " (" + e.OpID + ")"
	}
	return text
}
// NewResult builds a successful Response whose Result is the JSON encoding
// of v. If v cannot be marshalled, the zero Response is returned together
// with the marshalling error.
func NewResult(v any) (Response, error) {
	var resp Response
	payload, err := json.Marshal(v)
	if err != nil {
		return resp, err
	}
	resp.OK = true
	resp.Result = payload
	return resp, nil
}
// NewError builds a failed Response carrying code and message, with no op id
// attached.
func NewError(code, message string) Response {
	detail := ErrorResponse{Code: code, Message: message}
	return Response{OK: false, Error: &detail}
}
// NewErrorWithOpID builds a failed Response carrying code, message and the
// correlation id opID. It is the variant for daemon dispatch sites that have
// already resolved an op id by the time they encode the response.
func NewErrorWithOpID(code, message, opID string) Response {
	detail := ErrorResponse{
		Code:    code,
		Message: message,
		OpID:    opID,
	}
	return Response{OK: false, Error: &detail}
}
// DecodeParams unmarshals req.Params into a value of type T.
//
// A request without params yields the zero value of T and a nil error. If
// the JSON does not decode into T, the zero value is returned alongside the
// unmarshal error, never a partially populated value.
func DecodeParams[T any](req Request) (T, error) {
	var decoded T
	if len(req.Params) == 0 {
		return decoded, nil
	}
	if err := json.Unmarshal(req.Params, &decoded); err != nil {
		// Unmarshal may have filled in some fields before failing;
		// hand back a pristine zero value instead.
		var zero T
		return zero, err
	}
	return decoded, nil
}
// Call performs one request/response exchange over the Unix socket at
// socketPath and decodes the result into T.
//
// It dials the socket (2s connect timeout), writes a single Request holding
// Version, method, the op id found on ctx (if any) and the JSON encoding of
// params (omitted when params is nil), then reads a single Response.
//
// Cancellation: a deadline on ctx is applied to the connection, and a
// watcher goroutine closes the connection as soon as ctx is done. When an
// encode or decode fails and ctx is already done, ctx.Err() is returned in
// place of the raw I/O error.
//
// Errors: a response with OK == false is returned as its *ErrorResponse (or
// as a generic "rpc error" when the error payload is missing). A successful
// response with an empty result yields the zero value of T and a nil error.
func Call[T any](ctx context.Context, socketPath, method string, params any) (T, error) {
	var none T

	d := net.Dialer{Timeout: 2 * time.Second}
	conn, err := d.DialContext(ctx, "unix", socketPath)
	if err != nil {
		return none, err
	}
	defer conn.Close()

	// The watcher unblocks any in-flight read/write when ctx is cancelled.
	// Closing finished on return guarantees the goroutine never outlives
	// this call.
	finished := make(chan struct{})
	defer close(finished)
	go func() {
		select {
		case <-finished:
		case <-ctx.Done():
			_ = conn.SetDeadline(time.Now())
			_ = conn.Close()
		}
	}()

	if limit, hasLimit := ctx.Deadline(); hasLimit {
		_ = conn.SetDeadline(limit)
	}

	// orCtxErr reports the context's error when ctx is done, so callers see
	// cancellation rather than the I/O error it caused.
	orCtxErr := func(ioErr error) error {
		if ctxErr := ctx.Err(); ctxErr != nil {
			return ctxErr
		}
		return ioErr
	}

	req := Request{
		Version: Version,
		Method:  method,
		OpID:    OpIDFromContext(ctx),
	}
	if params != nil {
		if req.Params, err = json.Marshal(params); err != nil {
			return none, err
		}
	}

	if err := json.NewEncoder(conn).Encode(req); err != nil {
		return none, orCtxErr(err)
	}

	var resp Response
	if err := json.NewDecoder(bufio.NewReader(conn)).Decode(&resp); err != nil {
		return none, orCtxErr(err)
	}

	if !resp.OK {
		if resp.Error != nil {
			// Return the typed error directly so callers that need code
			// or op_id can errors.As it out. err.Error() format is
			// preserved for callers that only print the message.
			return none, resp.Error
		}
		return none, errors.New("rpc error")
	}

	if len(resp.Result) == 0 {
		return none, nil
	}
	var out T
	if err := json.Unmarshal(resp.Result, &out); err != nil {
		return none, err
	}
	return out, nil
}
// WaitForSocket blocks until the Unix socket at path exists and accepts a
// connection, or until timeout has elapsed.
//
// It polls about every 100ms. Each round stats path and, if it exists,
// dials it with a 500ms connect timeout; a successful probe connection is
// closed immediately and nil is returned. At least one probe is always made,
// even when timeout is zero or negative.
//
// On timeout the returned error reads "socket <path> not ready: <cause>" and
// wraps the last stat or dial error, so callers can tell a missing socket
// from a refused connection or a permission problem with errors.Is /
// errors.As.
func WaitForSocket(path string, timeout time.Duration) error {
	const (
		pollInterval = 100 * time.Millisecond
		dialTimeout  = 500 * time.Millisecond
	)

	deadline := time.Now().Add(timeout)
	for {
		_, err := os.Stat(path)
		if err == nil {
			var conn net.Conn
			if conn, err = net.DialTimeout("unix", path, dialTimeout); err == nil {
				_ = conn.Close() // probe only; a close failure is irrelevant
				return nil
			}
		}

		remaining := time.Until(deadline)
		if remaining <= 0 {
			return fmt.Errorf("socket %s not ready: %w", path, err)
		}
		// Never sleep past the deadline: a timeout shorter than the poll
		// interval should not be stretched to a full interval.
		if remaining > pollInterval {
			remaining = pollInterval
		}
		time.Sleep(remaining)
	}
}
// NewUnixHTTPClient returns an *http.Client whose transport sends every
// request over the Unix socket at socketPath. The network and address that
// net/http derives from the request URL are ignored, so the URL host is
// irrelevant for routing. No timeout is configured on the client or on the
// dialer; callers bound requests through the request context.
func NewUnixHTTPClient(socketPath string) *http.Client {
	dialUnix := func(ctx context.Context, _, _ string) (net.Conn, error) {
		var d net.Dialer
		return d.DialContext(ctx, "unix", socketPath)
	}
	transport := &http.Transport{DialContext: dialUnix}
	return &http.Client{Transport: transport}
}