Today there's no way to correlate a CLI failure with a daemon log line. operationLog records relative timing but no id, two concurrent vm.start calls log indistinguishably, and the async vmCreateOperationState.ID is user-facing yet never reaches the journal. The root helper logs plain text to stderr while bangerd logs JSON, so a merged journalctl is hard to grep across the trust-boundary split. Mint a per-RPC op id at dispatch entry, store it on context, and include it as an "op_id" attr on every operationLog record. The id is stamped onto every error response (including the early short-circuit paths bad_version and unknown_method). rpc.Call forwards the context op id on requests so a daemon RPC and the helper RPCs it triggers all share one id. The helper now logs JSON to match bangerd, adopts the inbound id, and emits a single "helper rpc completed" / "helper rpc failed" line per call so operators can see at a glance how long each privileged op took. vmCreateOperationState.ID is now the same id dispatch generated for vm.create.begin — one identifier between client status polls, daemon logs, and helper logs. The wire format gains two optional fields: rpc.Request.OpID and rpc.ErrorResponse.OpID, both omitempty so older peers (and the opposite direction) ignore them. ErrorResponse.Error() now appends "(op-XXXXXX)" to its string form when set; existing callers that just print err.Error() get the id for free. Tests cover: dispatch stamps op_id on unknown_method, bad_version, and handler-returned errors; rpc.Call exposes the typed *ErrorResponse via errors.As so the CLI can read code/op_id; ctx op_id is forwarded to the server in the request envelope. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
257 lines
6.8 KiB
Go
257 lines
6.8 KiB
Go
package rpc
|
|
|
|
import (
|
|
"bufio"
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"net"
|
|
"path/filepath"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
)
|
|
|
|
func TestDecodeParams(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
type payload struct {
|
|
Name string `json:"name"`
|
|
}
|
|
|
|
got, err := DecodeParams[payload](Request{})
|
|
if err != nil {
|
|
t.Fatalf("DecodeParams(empty): %v", err)
|
|
}
|
|
if got.Name != "" {
|
|
t.Fatalf("DecodeParams(empty) = %+v, want zero value", got)
|
|
}
|
|
|
|
_, err = DecodeParams[payload](Request{Params: json.RawMessage(`{"name":`)})
|
|
if err == nil {
|
|
t.Fatal("DecodeParams(malformed) returned nil error")
|
|
}
|
|
}
|
|
|
|
func TestCallRoundTripSuccess(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
socketPath, cleanup := serveRPCOnce(t, func(conn net.Conn) {
|
|
defer conn.Close()
|
|
var req Request
|
|
if err := json.NewDecoder(bufio.NewReader(conn)).Decode(&req); err != nil {
|
|
t.Fatalf("decode request: %v", err)
|
|
}
|
|
if req.Version != Version || req.Method != "ping" {
|
|
t.Fatalf("unexpected request: %+v", req)
|
|
}
|
|
var params map[string]string
|
|
if err := json.Unmarshal(req.Params, ¶ms); err != nil {
|
|
t.Fatalf("unmarshal params: %v", err)
|
|
}
|
|
if params["name"] != "devbox" {
|
|
t.Fatalf("params = %v, want name=devbox", params)
|
|
}
|
|
resp, err := NewResult(map[string]string{"status": "ok"})
|
|
if err != nil {
|
|
t.Fatalf("NewResult: %v", err)
|
|
}
|
|
if err := json.NewEncoder(conn).Encode(resp); err != nil {
|
|
t.Fatalf("encode response: %v", err)
|
|
}
|
|
})
|
|
defer cleanup()
|
|
|
|
result, err := Call[map[string]string](context.Background(), socketPath, "ping", map[string]string{"name": "devbox"})
|
|
if err != nil {
|
|
t.Fatalf("Call: %v", err)
|
|
}
|
|
if result["status"] != "ok" {
|
|
t.Fatalf("Call() result = %v, want status=ok", result)
|
|
}
|
|
}
|
|
|
|
func TestCallReturnsRemoteError(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
socketPath, cleanup := serveRPCOnce(t, func(conn net.Conn) {
|
|
defer conn.Close()
|
|
var req Request
|
|
if err := json.NewDecoder(bufio.NewReader(conn)).Decode(&req); err != nil {
|
|
t.Fatalf("decode request: %v", err)
|
|
}
|
|
if err := json.NewEncoder(conn).Encode(NewError("operation_failed", "boom")); err != nil {
|
|
t.Fatalf("encode error response: %v", err)
|
|
}
|
|
})
|
|
defer cleanup()
|
|
|
|
_, err := Call[map[string]string](context.Background(), socketPath, "ping", nil)
|
|
if err == nil || !strings.Contains(err.Error(), "operation_failed: boom") {
|
|
t.Fatalf("Call() error = %v, want remote error", err)
|
|
}
|
|
}
|
|
|
|
func TestCallExposesTypedErrorWithOpID(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
socketPath, cleanup := serveRPCOnce(t, func(conn net.Conn) {
|
|
defer conn.Close()
|
|
var req Request
|
|
if err := json.NewDecoder(bufio.NewReader(conn)).Decode(&req); err != nil {
|
|
t.Fatalf("decode request: %v", err)
|
|
}
|
|
if err := json.NewEncoder(conn).Encode(NewErrorWithOpID("not_found", "vm \"foo\" not found", "op-deadbeef00ff")); err != nil {
|
|
t.Fatalf("encode error response: %v", err)
|
|
}
|
|
})
|
|
defer cleanup()
|
|
|
|
_, err := Call[map[string]string](context.Background(), socketPath, "vm.show", nil)
|
|
if err == nil {
|
|
t.Fatal("Call() returned nil error")
|
|
}
|
|
var rpcErr *ErrorResponse
|
|
if !errors.As(err, &rpcErr) {
|
|
t.Fatalf("Call() error %T (%v) is not *ErrorResponse — CLI cannot read the op_id", err, err)
|
|
}
|
|
if rpcErr.Code != "not_found" || rpcErr.OpID != "op-deadbeef00ff" {
|
|
t.Fatalf("typed error = %+v, want code=not_found op-deadbeef00ff", rpcErr)
|
|
}
|
|
// String form keeps the op_id in parens so callers that only
|
|
// log err.Error() still surface the id.
|
|
if got := rpcErr.Error(); !strings.Contains(got, "(op-deadbeef00ff)") {
|
|
t.Fatalf("err.Error() = %q, want op-id suffix", got)
|
|
}
|
|
}
|
|
|
|
func TestCallForwardsOpIDFromContext(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
var seenReq Request
|
|
socketPath, cleanup := serveRPCOnce(t, func(conn net.Conn) {
|
|
defer conn.Close()
|
|
if err := json.NewDecoder(bufio.NewReader(conn)).Decode(&seenReq); err != nil {
|
|
t.Fatalf("decode request: %v", err)
|
|
}
|
|
resp, _ := NewResult(map[string]string{"status": "ok"})
|
|
_ = json.NewEncoder(conn).Encode(resp)
|
|
})
|
|
defer cleanup()
|
|
|
|
ctx := WithOpID(context.Background(), "op-cafef00d1234")
|
|
if _, err := Call[map[string]string](ctx, socketPath, "ping", nil); err != nil {
|
|
t.Fatalf("Call: %v", err)
|
|
}
|
|
if seenReq.OpID != "op-cafef00d1234" {
|
|
t.Fatalf("server saw op_id = %q, want op-cafef00d1234", seenReq.OpID)
|
|
}
|
|
}
|
|
|
|
func TestCallRejectsMalformedResponse(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
socketPath, cleanup := serveRPCOnce(t, func(conn net.Conn) {
|
|
defer conn.Close()
|
|
_, _ = conn.Write([]byte("{not-json}\n"))
|
|
})
|
|
defer cleanup()
|
|
|
|
_, err := Call[map[string]string](context.Background(), socketPath, "ping", nil)
|
|
if err == nil {
|
|
t.Fatal("Call() returned nil error for malformed response")
|
|
}
|
|
}
|
|
|
|
func TestCallHonorsContextDeadline(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
socketPath, cleanup := serveRPCOnce(t, func(conn net.Conn) {
|
|
defer conn.Close()
|
|
var req Request
|
|
if err := json.NewDecoder(bufio.NewReader(conn)).Decode(&req); err != nil {
|
|
t.Fatalf("decode request: %v", err)
|
|
}
|
|
time.Sleep(100 * time.Millisecond)
|
|
})
|
|
defer cleanup()
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
|
|
defer cancel()
|
|
_, err := Call[map[string]string](ctx, socketPath, "ping", nil)
|
|
if err == nil {
|
|
t.Fatal("Call() returned nil error for deadline")
|
|
}
|
|
}
|
|
|
|
func TestCallHonorsContextCancellationWithoutDeadline(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
socketPath, cleanup := serveRPCOnce(t, func(conn net.Conn) {
|
|
defer conn.Close()
|
|
var req Request
|
|
if err := json.NewDecoder(bufio.NewReader(conn)).Decode(&req); err != nil {
|
|
t.Fatalf("decode request: %v", err)
|
|
}
|
|
var buf [1]byte
|
|
_, _ = conn.Read(buf[:])
|
|
})
|
|
defer cleanup()
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
go func() {
|
|
time.Sleep(20 * time.Millisecond)
|
|
cancel()
|
|
}()
|
|
|
|
_, err := Call[map[string]string](ctx, socketPath, "ping", nil)
|
|
if !errors.Is(err, context.Canceled) {
|
|
t.Fatalf("Call() error = %v, want context canceled", err)
|
|
}
|
|
}
|
|
|
|
func TestWaitForSocket(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
socketPath, cleanup := serveRPCOnce(t, func(conn net.Conn) {
|
|
_ = conn.Close()
|
|
})
|
|
defer cleanup()
|
|
|
|
if err := WaitForSocket(socketPath, 2*time.Second); err != nil {
|
|
t.Fatalf("WaitForSocket(success): %v", err)
|
|
}
|
|
|
|
err := WaitForSocket(filepath.Join(t.TempDir(), "missing.sock"), 50*time.Millisecond)
|
|
if err == nil || !strings.Contains(err.Error(), "not ready") {
|
|
t.Fatalf("WaitForSocket(timeout) error = %v, want timeout", err)
|
|
}
|
|
}
|
|
|
|
// serveRPCOnce listens on a unix socket inside a fresh temp directory and
// hands the first accepted connection to handler on a background goroutine.
// It returns the socket path and a cleanup function that closes the listener
// and blocks until that goroutine has exited.
//
// handler is responsible for closing the connection it receives. An Accept
// failure other than net.ErrClosed is reported through t.Errorf.
func serveRPCOnce(t *testing.T, handler func(net.Conn)) (string, func()) {
	t.Helper()

	path := filepath.Join(t.TempDir(), "rpc.sock")
	ln, err := net.Listen("unix", path)
	if err != nil {
		t.Fatalf("Listen: %v", err)
	}

	finished := make(chan struct{})
	go func() {
		defer close(finished)

		conn, acceptErr := ln.Accept()
		switch {
		case acceptErr == nil:
			handler(conn)
		case !errors.Is(acceptErr, net.ErrClosed):
			// A closed listener just means cleanup ran before any client
			// connected; anything else is a real failure.
			t.Errorf("Accept: %v", acceptErr)
		}
	}()

	return path, func() {
		_ = ln.Close()
		<-finished
	}
}
|