Today there's no way to correlate a CLI failure with a daemon log line. operationLog records relative timing but no id, two concurrent vm.start calls log indistinguishably, and the async vmCreateOperationState.ID is user-facing yet never reaches the journal. The root helper logs plain text to stderr while bangerd logs JSON, so a merged journalctl is hard to grep across the trust-boundary split. Mint a per-RPC op id at dispatch entry, store it on context, and include it as an "op_id" attr on every operationLog record. The id is stamped onto every error response (including the early short-circuit paths bad_version and unknown_method). rpc.Call forwards the context op id on requests so a daemon RPC and the helper RPCs it triggers all share one id. The helper now logs JSON to match bangerd, adopts the inbound id, and emits a single "helper rpc completed" / "helper rpc failed" line per call so operators can see at a glance how long each privileged op took. vmCreateOperationState.ID is now the same id dispatch generated for vm.create.begin — one identifier between client status polls, daemon logs, and helper logs. The wire format gains two optional fields: rpc.Request.OpID and rpc.ErrorResponse.OpID, both omitempty so older peers (and the opposite direction) ignore them. ErrorResponse.Error() now appends "(op-XXXXXX)" to its string form when set; existing callers that just print err.Error() get the id for free. Tests cover: dispatch stamps op_id on unknown_method, bad_version, and handler-returned errors; rpc.Call exposes the typed *ErrorResponse via errors.As so the CLI can read code/op_id; ctx op_id is forwarded to the server in the request envelope. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
211 lines
5.6 KiB
Go
211 lines
5.6 KiB
Go
package rpc
|
|
|
|
import (
|
|
"bufio"
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"net"
|
|
"net/http"
|
|
"os"
|
|
"time"
|
|
)
|
|
|
|
// Version is the protocol version number that Call stamps on every
// outgoing Request.
const Version = 1
|
|
|
|
// Request is the envelope sent for a single RPC: the protocol
// version, the method name, the method's JSON-encoded parameters and
// an optional correlation id.
type Request struct {
	// Version is the protocol version; Call always sends the package
	// constant Version.
	Version int `json:"version"`
	// Method names the RPC being invoked.
	Method string `json:"method"`
	// Params carries the still-encoded parameters; it is left off the
	// wire when the call has none.
	Params json.RawMessage `json:"params,omitempty"`
	// OpID is the per-RPC correlation id. It is optional on the wire,
	// so clients that never set it and servers that never read it
	// keep interoperating. The daemon's dispatch layer attaches one
	// to every incoming request, and rpc.Call forwards whatever id is
	// stored on ctx, so a helper RPC carries the same id as the
	// daemon RPC that triggered it.
	OpID string `json:"op_id,omitempty"`
}
|
|
|
|
// opIDKey is the private context key under which the per-RPC
// correlation id is stored as it flows from CLI → daemon → root
// helper. It is declared in the rpc package so that rpc.Call can read
// the id without depending on the daemon package; daemon and
// roothelper both import rpc.
type opIDKey struct{}

// WithOpID returns a context derived from ctx that carries opID. The
// daemon dispatch layer uses it to inject the per-request id, which
// rpc.Call then picks up automatically. When ctx is nil or opID is
// empty, ctx is returned unchanged.
func WithOpID(ctx context.Context, opID string) context.Context {
	if ctx != nil && opID != "" {
		return context.WithValue(ctx, opIDKey{}, opID)
	}
	return ctx
}

// OpIDFromContext reports the op id that WithOpID stored on ctx. It
// returns "" when ctx is nil or carries no id.
func OpIDFromContext(ctx context.Context) string {
	if ctx == nil {
		return ""
	}
	opID, ok := ctx.Value(opIDKey{}).(string)
	if !ok {
		return ""
	}
	return opID
}
|
|
|
|
type Response struct {
|
|
OK bool `json:"ok"`
|
|
Result json.RawMessage `json:"result,omitempty"`
|
|
Error *ErrorResponse `json:"error,omitempty"`
|
|
}
|
|
|
|
// ErrorResponse is the structured failure carried in Response.Error.
type ErrorResponse struct {
	// Code is the error code string.
	Code string `json:"code"`
	// Message is the human-readable description of the failure.
	Message string `json:"message"`
	// OpID is the daemon-assigned correlation id of the RPC that
	// produced this error. It is optional and may be empty (older
	// daemons don't set it); when present the CLI surfaces it so an
	// operator can grep journalctl by that id and find the full
	// context.
	OpID string `json:"op_id,omitempty"`
}

// Error implements the error interface, which lets callers errors.As
// an *ErrorResponse out of an rpc.Call return value and read the
// structured fields directly. The string form is "code: message",
// followed by " (op-id)" only when the daemon attached an op id; a
// nil receiver yields "". CLI code paths that want a translated,
// user-facing message render the typed fields themselves; this
// fallback serves log lines, fmt.Errorf %w wrappers, and any caller
// that has not bothered to errors.As yet.
func (e *ErrorResponse) Error() string {
	if e == nil {
		return ""
	}
	text := e.Code + ": " + e.Message
	if e.OpID != "" {
		text += " (" + e.OpID + ")"
	}
	return text
}
|
|
|
|
func NewResult(v any) (Response, error) {
|
|
data, err := json.Marshal(v)
|
|
if err != nil {
|
|
return Response{}, err
|
|
}
|
|
return Response{OK: true, Result: data}, nil
|
|
}
|
|
|
|
func NewError(code, message string) Response {
|
|
return Response{OK: false, Error: &ErrorResponse{Code: code, Message: message}}
|
|
}
|
|
|
|
// NewErrorWithOpID is the variant for daemon dispatch sites that have
|
|
// resolved an op id by the time they encode the response.
|
|
func NewErrorWithOpID(code, message, opID string) Response {
|
|
return Response{OK: false, Error: &ErrorResponse{Code: code, Message: message, OpID: opID}}
|
|
}
|
|
|
|
func DecodeParams[T any](req Request) (T, error) {
|
|
var zero T
|
|
if len(req.Params) == 0 {
|
|
return zero, nil
|
|
}
|
|
var out T
|
|
if err := json.Unmarshal(req.Params, &out); err != nil {
|
|
return zero, err
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
func Call[T any](ctx context.Context, socketPath, method string, params any) (T, error) {
|
|
var zero T
|
|
dialer := &net.Dialer{Timeout: 2 * time.Second}
|
|
conn, err := dialer.DialContext(ctx, "unix", socketPath)
|
|
if err != nil {
|
|
return zero, err
|
|
}
|
|
defer conn.Close()
|
|
done := make(chan struct{})
|
|
defer close(done)
|
|
go func() {
|
|
select {
|
|
case <-ctx.Done():
|
|
_ = conn.SetDeadline(time.Now())
|
|
_ = conn.Close()
|
|
case <-done:
|
|
}
|
|
}()
|
|
|
|
if deadline, ok := ctx.Deadline(); ok {
|
|
_ = conn.SetDeadline(deadline)
|
|
}
|
|
|
|
request := Request{Version: Version, Method: method, OpID: OpIDFromContext(ctx)}
|
|
if params != nil {
|
|
raw, err := json.Marshal(params)
|
|
if err != nil {
|
|
return zero, err
|
|
}
|
|
request.Params = raw
|
|
}
|
|
|
|
if err := json.NewEncoder(conn).Encode(request); err != nil {
|
|
if ctx.Err() != nil {
|
|
return zero, ctx.Err()
|
|
}
|
|
return zero, err
|
|
}
|
|
|
|
var response Response
|
|
if err := json.NewDecoder(bufio.NewReader(conn)).Decode(&response); err != nil {
|
|
if ctx.Err() != nil {
|
|
return zero, ctx.Err()
|
|
}
|
|
return zero, err
|
|
}
|
|
if !response.OK {
|
|
if response.Error == nil {
|
|
return zero, errors.New("rpc error")
|
|
}
|
|
// Return the typed error directly so callers that need code
|
|
// or op_id can errors.As it out. err.Error() format is
|
|
// preserved for callers that only print the message.
|
|
return zero, response.Error
|
|
}
|
|
if len(response.Result) == 0 {
|
|
return zero, nil
|
|
}
|
|
var result T
|
|
if err := json.Unmarshal(response.Result, &result); err != nil {
|
|
return zero, err
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
// WaitForSocket blocks until the unix socket at path exists and
// accepts a connection, or until timeout has elapsed.
//
// It polls every 100ms; each probe stats the path and, if it exists,
// dials it with a 500ms timeout and closes the connection straight
// away. At least one probe is made even when timeout is zero. On
// timeout the returned error reads "socket <path> not ready: <cause>"
// and wraps the error of the last failed probe, so callers can tell a
// missing socket file from one that refuses connections (for example
// with errors.Is(err, os.ErrNotExist)).
func WaitForSocket(path string, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	for {
		_, err := os.Stat(path)
		if err == nil {
			var conn net.Conn
			conn, err = net.DialTimeout("unix", path, 500*time.Millisecond)
			if err == nil {
				_ = conn.Close() // probe only; nothing useful to do with a close error
				return nil
			}
		}
		// err is non-nil here: either the stat or the dial failed.
		if time.Now().After(deadline) {
			return fmt.Errorf("socket %s not ready: %w", path, err)
		}
		time.Sleep(100 * time.Millisecond)
	}
}
|
|
|
|
// NewUnixHTTPClient returns an *http.Client whose transport connects
// every request to the unix socket at socketPath. The network and
// address derived from the request URL are ignored, so the host part
// of the URL is only a placeholder.
func NewUnixHTTPClient(socketPath string) *http.Client {
	dialSocket := func(ctx context.Context, _, _ string) (net.Conn, error) {
		var d net.Dialer
		return d.DialContext(ctx, "unix", socketPath)
	}
	transport := &http.Transport{DialContext: dialSocket}
	return &http.Client{Transport: transport}
}
|