// Package rpc defines the JSON request/response envelope exchanged over
// unix sockets, together with client-side helpers for issuing a call,
// waiting for a socket to come up, and building an HTTP client that
// dials a unix socket.
package rpc

import (
	"bufio"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"net"
	"net/http"
	"os"
	"time"
)

// Version is the protocol version that Call stamps on every outgoing
// Request.
const Version = 1

// Request is the envelope written to the socket for a single RPC.
type Request struct {
	Version int             `json:"version"`
	Method  string          `json:"method"`
	Params  json.RawMessage `json:"params,omitempty"`
	// OpID is the per-RPC correlation id. Optional on the wire so
	// older clients (which don't set it) and older servers (which
	// don't read it) keep interoperating. The daemon attaches it on
	// every incoming request via dispatch; rpc.Call forwards
	// whatever id is on ctx so a helper RPC carries the same id as
	// the daemon RPC that triggered it.
	OpID string `json:"op_id,omitempty"`
}

// opIDKey is the context-value key for the per-RPC correlation id
// that flows from CLI → daemon → root helper. Lives in the rpc
// package because rpc.Call needs to read it without depending on
// the daemon package; daemon and roothelper both import it.
type opIDKey struct{}

// WithOpID stores opID on ctx. Used by the daemon dispatch layer to
// inject the per-request id; rpc.Call picks it up automatically.
// A nil ctx or an empty opID returns ctx unchanged.
func WithOpID(ctx context.Context, opID string) context.Context {
	if ctx == nil || opID == "" {
		return ctx
	}
	return context.WithValue(ctx, opIDKey{}, opID)
}

// OpIDFromContext returns the op id stored on ctx by WithOpID, or
// "" if none was set (including when ctx is nil).
func OpIDFromContext(ctx context.Context) string {
	if ctx == nil {
		return ""
	}
	// A missing or non-string value makes the assertion yield "".
	id, _ := ctx.Value(opIDKey{}).(string)
	return id
}

// Response is the envelope read back from the socket. OK reports
// success; Result carries the raw JSON payload on success and Error
// the structured failure otherwise.
type Response struct {
	OK     bool            `json:"ok"`
	Result json.RawMessage `json:"result,omitempty"`
	Error  *ErrorResponse  `json:"error,omitempty"`
}

// ErrorResponse is the structured failure carried in Response.Error.
type ErrorResponse struct {
	Code    string `json:"code"`
	Message string `json:"message"`
	// OpID is the daemon-assigned correlation id for the RPC that
	// produced this error. Optional and may be empty (older daemons
	// don't set it); when present the CLI surfaces it so an operator
	// can grep journalctl by that id and find the full context.
	OpID string `json:"op_id,omitempty"`
}

// Error makes ErrorResponse satisfy the error interface so callers
// can errors.As it out of an rpc.Call return value and read the
// structured fields directly. The default string form is
// "code: message (op-id)" — the op id only appears when the daemon
// attached one. CLI code paths that want a translated, user-facing
// message render the typed fields themselves; this fallback is for
// log lines, fmt.Errorf %w wrappers, and any caller that hasn't
// bothered to errors.As yet. A nil receiver yields "".
func (e *ErrorResponse) Error() string {
	if e == nil {
		return ""
	}
	if e.OpID == "" {
		return e.Code + ": " + e.Message
	}
	return e.Code + ": " + e.Message + " (" + e.OpID + ")"
}

// NewResult builds a successful Response whose Result is the JSON
// encoding of v. It returns the marshalling error, and a zero
// Response, if v cannot be encoded.
func NewResult(v any) (Response, error) {
	data, err := json.Marshal(v)
	if err != nil {
		return Response{}, err
	}
	return Response{OK: true, Result: data}, nil
}

// NewError builds a failed Response carrying code and message and no
// op id.
func NewError(code, message string) Response {
	return Response{OK: false, Error: &ErrorResponse{Code: code, Message: message}}
}

// NewErrorWithOpID is the variant for daemon dispatch sites that have
// resolved an op id by the time they encode the response.
func NewErrorWithOpID(code, message, opID string) Response {
	return Response{OK: false, Error: &ErrorResponse{Code: code, Message: message, OpID: opID}}
}

// DecodeParams unmarshals req.Params into a T. Absent params decode
// to the zero value of T without error; on a decode failure the zero
// value is returned alongside the error so callers never see a
// partially populated T.
func DecodeParams[T any](req Request) (T, error) {
	var zero T
	if len(req.Params) == 0 {
		return zero, nil
	}
	var out T
	if err := json.Unmarshal(req.Params, &out); err != nil {
		return zero, err
	}
	return out, nil
}

// callIOError picks the error Call reports when a read or write on
// the connection fails. A context error always wins. Because the only
// deadlines Call ever puts on the connection mirror ctx, an i/o
// timeout observed while ctx carries a deadline means that deadline
// has passed even if ctx.Err() has not flipped yet, so it is reported
// as context.DeadlineExceeded rather than a raw net timeout.
func callIOError(ctx context.Context, err error) error {
	if ctxErr := ctx.Err(); ctxErr != nil {
		return ctxErr
	}
	if _, ok := ctx.Deadline(); ok && errors.Is(err, os.ErrDeadlineExceeded) {
		return context.DeadlineExceeded
	}
	return err
}

// Call performs one RPC over the unix socket at socketPath: it dials
// (2s dial timeout), writes a single Request for method with params
// JSON-encoded (nil params are omitted), reads a single Response and
// decodes its Result into a T.
//
// The op id stored on ctx, if any, is forwarded in the request. A nil
// ctx is treated as context.Background(), matching how WithOpID and
// OpIDFromContext tolerate nil. Cancelling ctx, or reaching its
// deadline, aborts in-flight I/O and the context's error is returned.
//
// When the response is not OK the returned error is the
// *ErrorResponse sent by the peer (or a generic "rpc error" if none
// was attached). An OK response with an empty result yields the zero
// value of T.
func Call[T any](ctx context.Context, socketPath, method string, params any) (T, error) {
	var zero T
	if ctx == nil {
		ctx = context.Background()
	}

	dialer := &net.Dialer{Timeout: 2 * time.Second}
	conn, err := dialer.DialContext(ctx, "unix", socketPath)
	if err != nil {
		return zero, err
	}
	defer conn.Close()

	// Watcher: unblock any pending read/write as soon as ctx is
	// cancelled. It exits when Call returns and done is closed.
	done := make(chan struct{})
	defer close(done)
	go func() {
		select {
		case <-ctx.Done():
			_ = conn.SetDeadline(time.Now())
			_ = conn.Close()
		case <-done:
		}
	}()

	if deadline, ok := ctx.Deadline(); ok {
		// Best effort: the watcher above still enforces the deadline
		// if the connection refuses it.
		_ = conn.SetDeadline(deadline)
	}

	request := Request{Version: Version, Method: method, OpID: OpIDFromContext(ctx)}
	if params != nil {
		raw, err := json.Marshal(params)
		if err != nil {
			return zero, err
		}
		request.Params = raw
	}

	if err := json.NewEncoder(conn).Encode(request); err != nil {
		return zero, callIOError(ctx, err)
	}

	var response Response
	if err := json.NewDecoder(bufio.NewReader(conn)).Decode(&response); err != nil {
		return zero, callIOError(ctx, err)
	}

	if !response.OK {
		if response.Error == nil {
			return zero, errors.New("rpc error")
		}
		// Return the typed error directly so callers that need code
		// or op_id can errors.As it out. err.Error() format is
		// preserved for callers that only print the message.
		return zero, response.Error
	}

	if len(response.Result) == 0 {
		return zero, nil
	}
	var result T
	if err := json.Unmarshal(response.Result, &result); err != nil {
		return zero, err
	}
	return result, nil
}

// WaitForSocket polls every 100ms until the unix socket at path exists
// and accepts a connection (500ms dial timeout per attempt), or until
// timeout has elapsed. At least one attempt is always made. On
// failure the returned error wraps the last stat or dial error so the
// caller can tell a missing file from a refused connection.
func WaitForSocket(path string, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	for {
		_, err := os.Stat(path)
		if err == nil {
			var conn net.Conn
			conn, err = net.DialTimeout("unix", path, 500*time.Millisecond)
			if err == nil {
				_ = conn.Close()
				return nil
			}
		}
		// err is non-nil here: either the stat or the dial failed.
		if time.Now().After(deadline) {
			return fmt.Errorf("socket %s not ready: %w", path, err)
		}
		time.Sleep(100 * time.Millisecond)
	}
}

// NewUnixHTTPClient returns an http.Client whose transport dials the
// unix socket at socketPath for every request, ignoring the network
// and address derived from the request URL. No client timeout is set;
// callers bound requests through the request context.
func NewUnixHTTPClient(socketPath string) *http.Client {
	return &http.Client{
		Transport: &http.Transport{
			DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
				return (&net.Dialer{}).DialContext(ctx, "unix", socketPath)
			},
		},
	}
}