banger/internal/daemon/daemon.go
Thales Maciel 3aec590deb
vmservice: delete dead guestWaitForSSH + guestDial seams
VMService carried guestWaitForSSH and guestDial fields + matching
constructor args ever since the daemon split, but nothing on
VMService ever read them. The live guest-SSH path runs on *Daemon
(d.waitForGuestSSH / d.dialGuest in guest_ssh.go); WorkspaceService
reaches those through closures wired in wireServices.

So the VMService copies were refactor residue: they made the
service look more decoupled than it actually is, and any future
test that stubbed VMService.guestDial would be stubbing nothing.
Delete the fields, the deps entries, the newVMService assignments,
and the wireServices passes.

Real seams on *Daemon are unchanged — those are the ones tests
(e.g. workspace_test.go) already set directly.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-22 12:45:27 -03:00

680 lines
21 KiB
Go

package daemon
import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"net"
"os"
"sync"
"time"
"banger/internal/api"
"banger/internal/buildinfo"
"banger/internal/config"
ws "banger/internal/daemon/workspace"
"banger/internal/model"
"banger/internal/paths"
"banger/internal/rpc"
"banger/internal/store"
"banger/internal/system"
"banger/internal/vmdns"
)
// Daemon is the composition root: shared infrastructure (store,
// runner, logger, layout, config) plus pointers to the four focused
// services that own behavior. Open wires the services; the dispatch
// loop forwards RPCs to them. No lifecycle / image / workspace /
// networking behavior lives on *Daemon itself — it's wiring.
type Daemon struct {
layout paths.Layout
config model.DaemonConfig
store *store.Store
runner system.CommandRunner
logger *slog.Logger
net *HostNetwork
img *ImageService
ws *WorkspaceService
vm *VMService
closing chan struct{}
once sync.Once
pid int
listener net.Listener
vmCaps []vmCapability
requestHandler func(context.Context, rpc.Request) rpc.Response
guestWaitForSSH func(context.Context, string, string, time.Duration) error
guestDial func(context.Context, string, string) (guestSSHClient, error)
}
func Open(ctx context.Context) (d *Daemon, err error) {
layout, err := paths.Resolve()
if err != nil {
return nil, err
}
if err := paths.Ensure(layout); err != nil {
return nil, err
}
cfg, err := config.Load(layout)
if err != nil {
return nil, err
}
logger, normalizedLevel, err := newDaemonLogger(os.Stderr, cfg.LogLevel)
if err != nil {
return nil, err
}
cfg.LogLevel = normalizedLevel
db, err := store.Open(layout.DBPath)
if err != nil {
return nil, err
}
closing := make(chan struct{})
runner := system.NewRunner()
d = &Daemon{
layout: layout,
config: cfg,
store: db,
runner: runner,
logger: logger,
closing: closing,
pid: os.Getpid(),
}
wireServices(d)
// From here on, every failure path must run Close() so the host
// state we touched (DNS listener goroutine, resolvectl routing,
// SQLite handle, future side effects) gets unwound. Close is
// idempotent + nil-guarded so it's safe to call on a partially
// initialised daemon — `d.vmDNS == nil` and friends short-circuit
// the teardown of components we never set up.
defer func() {
if err != nil {
_ = d.Close()
}
}()
d.ensureVMSSHClientConfig()
d.logger.Info("daemon opened", "socket", layout.SocketPath, "state_dir", layout.StateDir, "log_level", cfg.LogLevel)
if err = d.net.startVMDNS(vmdns.DefaultListenAddr); err != nil {
d.logger.Error("daemon open failed", "stage", "start_vm_dns", "error", err.Error())
return nil, err
}
if err = d.reconcile(ctx); err != nil {
d.logger.Error("daemon open failed", "stage", "reconcile", "error", err.Error())
return nil, err
}
d.net.ensureVMDNSResolverRouting(ctx)
// Seed HostNetwork's pool index from taps already claimed by VMs
// on disk so newly warmed pool entries don't collide with them.
if d.config.TapPoolSize > 0 && d.store != nil {
vms, listErr := d.store.ListVMs(ctx)
if listErr != nil {
d.logger.Error("daemon open failed", "stage", "initialize_tap_pool", "error", listErr.Error())
return nil, listErr
}
used := make([]string, 0, len(vms))
for _, vm := range vms {
if tap := d.vm.vmHandles(vm.ID).TapDevice; tap != "" {
used = append(used, tap)
}
}
d.net.initializeTapPool(used)
}
go d.net.ensureTapPool(context.Background())
return d, nil
}
func (d *Daemon) Close() error {
var err error
d.once.Do(func() {
if d.logger != nil {
d.logger.Info("daemon closing")
}
close(d.closing)
if d.listener != nil {
_ = d.listener.Close()
}
var closeErr error
if d.store != nil {
closeErr = d.store.Close()
}
err = errors.Join(d.net.clearVMDNSResolverRouting(context.Background()), d.net.stopVMDNS(), closeErr)
})
return err
}
func (d *Daemon) Serve(ctx context.Context) error {
_ = os.Remove(d.layout.SocketPath)
listener, err := net.Listen("unix", d.layout.SocketPath)
if err != nil {
if d.logger != nil {
d.logger.Error("daemon listen failed", "socket", d.layout.SocketPath, "error", err.Error())
}
return err
}
d.listener = listener
defer listener.Close()
defer os.Remove(d.layout.SocketPath)
if err := os.Chmod(d.layout.SocketPath, 0o600); err != nil {
return err
}
if d.logger != nil {
d.logger.Info("daemon serving", "socket", d.layout.SocketPath, "pid", d.pid)
}
go d.backgroundLoop()
for {
conn, err := listener.Accept()
if err != nil {
select {
case <-ctx.Done():
return nil
case <-d.closing:
return nil
default:
}
if _, ok := err.(net.Error); ok {
if d.logger != nil {
d.logger.Warn("daemon accept temporary failure", "error", err.Error())
}
time.Sleep(100 * time.Millisecond)
continue
}
if d.logger != nil {
d.logger.Error("daemon accept failed", "error", err.Error())
}
return err
}
go d.handleConn(conn)
}
}
func (d *Daemon) handleConn(conn net.Conn) {
defer conn.Close()
reader := bufio.NewReader(conn)
var req rpc.Request
if err := json.NewDecoder(reader).Decode(&req); err != nil {
if d.logger != nil {
d.logger.Warn("daemon request decode failed", "remote", conn.RemoteAddr().String(), "error", err.Error())
}
_ = json.NewEncoder(conn).Encode(rpc.NewError("bad_request", err.Error()))
return
}
reqCtx, cancel := context.WithCancel(context.Background())
defer cancel()
stopWatch := d.watchRequestDisconnect(conn, reader, req.Method, cancel)
defer stopWatch()
resp := d.dispatch(reqCtx, req)
if reqCtx.Err() != nil {
return
}
if err := json.NewEncoder(conn).Encode(resp); err != nil && d.logger != nil {
d.logger.Warn("daemon response encode failed", "method", req.Method, "remote", conn.RemoteAddr().String(), "error", err.Error())
}
}
func (d *Daemon) watchRequestDisconnect(conn net.Conn, reader *bufio.Reader, method string, cancel context.CancelFunc) func() {
if conn == nil || reader == nil {
return func() {}
}
done := make(chan struct{})
var once sync.Once
go func() {
go func() {
<-done
if deadlineSetter, ok := conn.(interface{ SetReadDeadline(time.Time) error }); ok {
_ = deadlineSetter.SetReadDeadline(time.Now())
}
}()
var buf [1]byte
for {
_, err := reader.Read(buf[:])
if err == nil {
continue
}
select {
case <-done:
return
default:
}
if d.logger != nil {
d.logger.Info("daemon request canceled", "method", method, "remote", conn.RemoteAddr().String(), "error", err.Error())
}
cancel()
return
}
}()
return func() {
once.Do(func() {
close(done)
})
}
}
func (d *Daemon) dispatch(ctx context.Context, req rpc.Request) rpc.Response {
if req.Version != rpc.Version {
return rpc.NewError("bad_version", fmt.Sprintf("unsupported version %d", req.Version))
}
if d.requestHandler != nil {
return d.requestHandler(ctx, req)
}
switch req.Method {
case "ping":
info := buildinfo.Current()
result, _ := rpc.NewResult(api.PingResult{
Status: "ok",
PID: d.pid,
Version: info.Version,
Commit: info.Commit,
BuiltAt: info.BuiltAt,
})
return result
case "shutdown":
go d.Close()
result, _ := rpc.NewResult(api.ShutdownResult{Status: "stopping"})
return result
case "vm.create":
params, err := rpc.DecodeParams[api.VMCreateParams](req)
if err != nil {
return rpc.NewError("bad_request", err.Error())
}
vm, err := d.vm.CreateVM(ctx, params)
return marshalResultOrError(api.VMShowResult{VM: vm}, err)
case "vm.create.begin":
params, err := rpc.DecodeParams[api.VMCreateParams](req)
if err != nil {
return rpc.NewError("bad_request", err.Error())
}
op, err := d.vm.BeginVMCreate(ctx, params)
return marshalResultOrError(api.VMCreateBeginResult{Operation: op}, err)
case "vm.create.status":
params, err := rpc.DecodeParams[api.VMCreateStatusParams](req)
if err != nil {
return rpc.NewError("bad_request", err.Error())
}
op, err := d.vm.VMCreateStatus(ctx, params.ID)
return marshalResultOrError(api.VMCreateStatusResult{Operation: op}, err)
case "vm.create.cancel":
params, err := rpc.DecodeParams[api.VMCreateStatusParams](req)
if err != nil {
return rpc.NewError("bad_request", err.Error())
}
err = d.vm.CancelVMCreate(ctx, params.ID)
return marshalResultOrError(api.Empty{}, err)
case "vm.list":
vms, err := d.store.ListVMs(ctx)
return marshalResultOrError(api.VMListResult{VMs: vms}, err)
case "vm.show":
params, err := rpc.DecodeParams[api.VMRefParams](req)
if err != nil {
return rpc.NewError("bad_request", err.Error())
}
vm, err := d.vm.FindVM(ctx, params.IDOrName)
return marshalResultOrError(api.VMShowResult{VM: vm}, err)
case "vm.start":
params, err := rpc.DecodeParams[api.VMRefParams](req)
if err != nil {
return rpc.NewError("bad_request", err.Error())
}
vm, err := d.vm.StartVM(ctx, params.IDOrName)
return marshalResultOrError(api.VMShowResult{VM: vm}, err)
case "vm.stop":
params, err := rpc.DecodeParams[api.VMRefParams](req)
if err != nil {
return rpc.NewError("bad_request", err.Error())
}
vm, err := d.vm.StopVM(ctx, params.IDOrName)
return marshalResultOrError(api.VMShowResult{VM: vm}, err)
case "vm.kill":
params, err := rpc.DecodeParams[api.VMKillParams](req)
if err != nil {
return rpc.NewError("bad_request", err.Error())
}
vm, err := d.vm.KillVM(ctx, params)
return marshalResultOrError(api.VMShowResult{VM: vm}, err)
case "vm.restart":
params, err := rpc.DecodeParams[api.VMRefParams](req)
if err != nil {
return rpc.NewError("bad_request", err.Error())
}
vm, err := d.vm.RestartVM(ctx, params.IDOrName)
return marshalResultOrError(api.VMShowResult{VM: vm}, err)
case "vm.delete":
params, err := rpc.DecodeParams[api.VMRefParams](req)
if err != nil {
return rpc.NewError("bad_request", err.Error())
}
vm, err := d.vm.DeleteVM(ctx, params.IDOrName)
return marshalResultOrError(api.VMShowResult{VM: vm}, err)
case "vm.set":
params, err := rpc.DecodeParams[api.VMSetParams](req)
if err != nil {
return rpc.NewError("bad_request", err.Error())
}
vm, err := d.vm.SetVM(ctx, params)
return marshalResultOrError(api.VMShowResult{VM: vm}, err)
case "vm.stats":
params, err := rpc.DecodeParams[api.VMRefParams](req)
if err != nil {
return rpc.NewError("bad_request", err.Error())
}
vm, stats, err := d.vm.GetVMStats(ctx, params.IDOrName)
return marshalResultOrError(api.VMStatsResult{VM: vm, Stats: stats}, err)
case "vm.logs":
params, err := rpc.DecodeParams[api.VMRefParams](req)
if err != nil {
return rpc.NewError("bad_request", err.Error())
}
vm, err := d.vm.FindVM(ctx, params.IDOrName)
if err != nil {
return rpc.NewError("not_found", err.Error())
}
return marshalResultOrError(api.VMLogsResult{LogPath: vm.Runtime.LogPath}, nil)
case "vm.ssh":
params, err := rpc.DecodeParams[api.VMRefParams](req)
if err != nil {
return rpc.NewError("bad_request", err.Error())
}
vm, err := d.vm.TouchVM(ctx, params.IDOrName)
if err != nil {
return rpc.NewError("not_found", err.Error())
}
if !d.vm.vmAlive(vm) {
return rpc.NewError("not_running", fmt.Sprintf("vm %s is not running", vm.Name))
}
return marshalResultOrError(api.VMSSHResult{Name: vm.Name, GuestIP: vm.Runtime.GuestIP}, nil)
case "vm.health":
params, err := rpc.DecodeParams[api.VMRefParams](req)
if err != nil {
return rpc.NewError("bad_request", err.Error())
}
result, err := d.vm.HealthVM(ctx, params.IDOrName)
return marshalResultOrError(result, err)
case "vm.ping":
params, err := rpc.DecodeParams[api.VMRefParams](req)
if err != nil {
return rpc.NewError("bad_request", err.Error())
}
result, err := d.vm.PingVM(ctx, params.IDOrName)
return marshalResultOrError(result, err)
case "vm.ports":
params, err := rpc.DecodeParams[api.VMRefParams](req)
if err != nil {
return rpc.NewError("bad_request", err.Error())
}
result, err := d.vm.PortsVM(ctx, params.IDOrName)
return marshalResultOrError(result, err)
case "vm.workspace.prepare":
params, err := rpc.DecodeParams[api.VMWorkspacePrepareParams](req)
if err != nil {
return rpc.NewError("bad_request", err.Error())
}
workspace, err := d.ws.PrepareVMWorkspace(ctx, params)
return marshalResultOrError(api.VMWorkspacePrepareResult{Workspace: workspace}, err)
case "vm.workspace.export":
params, err := rpc.DecodeParams[api.WorkspaceExportParams](req)
if err != nil {
return rpc.NewError("bad_request", err.Error())
}
result, err := d.ws.ExportVMWorkspace(ctx, params)
return marshalResultOrError(result, err)
case "image.list":
images, err := d.store.ListImages(ctx)
return marshalResultOrError(api.ImageListResult{Images: images}, err)
case "image.show":
params, err := rpc.DecodeParams[api.ImageRefParams](req)
if err != nil {
return rpc.NewError("bad_request", err.Error())
}
image, err := d.img.FindImage(ctx, params.IDOrName)
return marshalResultOrError(api.ImageShowResult{Image: image}, err)
case "image.register":
params, err := rpc.DecodeParams[api.ImageRegisterParams](req)
if err != nil {
return rpc.NewError("bad_request", err.Error())
}
image, err := d.img.RegisterImage(ctx, params)
return marshalResultOrError(api.ImageShowResult{Image: image}, err)
case "image.promote":
params, err := rpc.DecodeParams[api.ImageRefParams](req)
if err != nil {
return rpc.NewError("bad_request", err.Error())
}
image, err := d.img.PromoteImage(ctx, params.IDOrName)
return marshalResultOrError(api.ImageShowResult{Image: image}, err)
case "image.delete":
params, err := rpc.DecodeParams[api.ImageRefParams](req)
if err != nil {
return rpc.NewError("bad_request", err.Error())
}
image, err := d.img.DeleteImage(ctx, params.IDOrName)
return marshalResultOrError(api.ImageShowResult{Image: image}, err)
case "image.pull":
params, err := rpc.DecodeParams[api.ImagePullParams](req)
if err != nil {
return rpc.NewError("bad_request", err.Error())
}
image, err := d.img.PullImage(ctx, params)
return marshalResultOrError(api.ImageShowResult{Image: image}, err)
case "kernel.list":
return marshalResultOrError(d.img.KernelList(ctx))
case "kernel.show":
params, err := rpc.DecodeParams[api.KernelRefParams](req)
if err != nil {
return rpc.NewError("bad_request", err.Error())
}
entry, err := d.img.KernelShow(ctx, params.Name)
return marshalResultOrError(api.KernelShowResult{Entry: entry}, err)
case "kernel.delete":
params, err := rpc.DecodeParams[api.KernelRefParams](req)
if err != nil {
return rpc.NewError("bad_request", err.Error())
}
err = d.img.KernelDelete(ctx, params.Name)
return marshalResultOrError(api.Empty{}, err)
case "kernel.import":
params, err := rpc.DecodeParams[api.KernelImportParams](req)
if err != nil {
return rpc.NewError("bad_request", err.Error())
}
entry, err := d.img.KernelImport(ctx, params)
return marshalResultOrError(api.KernelShowResult{Entry: entry}, err)
case "kernel.pull":
params, err := rpc.DecodeParams[api.KernelPullParams](req)
if err != nil {
return rpc.NewError("bad_request", err.Error())
}
entry, err := d.img.KernelPull(ctx, params)
return marshalResultOrError(api.KernelShowResult{Entry: entry}, err)
case "kernel.catalog":
return marshalResultOrError(d.img.KernelCatalog(ctx))
default:
return rpc.NewError("unknown_method", req.Method)
}
}
func (d *Daemon) backgroundLoop() {
statsTicker := time.NewTicker(d.config.StatsPollInterval)
staleTicker := time.NewTicker(model.DefaultStaleSweepInterval)
defer statsTicker.Stop()
defer staleTicker.Stop()
for {
select {
case <-d.closing:
return
case <-statsTicker.C:
if err := d.vm.pollStats(context.Background()); err != nil && d.logger != nil {
d.logger.Error("background stats poll failed", "error", err.Error())
}
case <-staleTicker.C:
if err := d.vm.stopStaleVMs(context.Background()); err != nil && d.logger != nil {
d.logger.Error("background stale sweep failed", "error", err.Error())
}
d.vm.pruneVMCreateOperations(time.Now().Add(-10 * time.Minute))
}
}
}
func (d *Daemon) reconcile(ctx context.Context) error {
op := d.beginOperation("daemon.reconcile")
vms, err := d.store.ListVMs(ctx)
if err != nil {
return op.fail(err)
}
for _, vm := range vms {
if err := d.vm.withVMLockByIDErr(ctx, vm.ID, func(vm model.VMRecord) error {
if vm.State != model.VMStateRunning {
// Belt-and-braces: a stopped VM should never have a
// scratch file or a cache entry. Clean up anything
// left by an ungraceful previous daemon crash.
d.vm.clearVMHandles(vm)
return nil
}
// Rebuild the in-memory handle cache by loading the per-VM
// scratch file and verifying the firecracker process is
// still alive.
h, alive, err := d.vm.rediscoverHandles(ctx, vm)
if err != nil && d.logger != nil {
d.logger.Warn("rediscover handles failed", "vm_id", vm.ID, "error", err.Error())
}
// Either way, seed the cache with what the scratch file
// claimed. If alive, subsequent vmAlive() calls pass; if
// not, cleanupRuntime needs these handles to know which
// kernel resources (DM / loops / tap) to tear down.
d.vm.setVMHandlesInMemory(vm.ID, h)
if alive {
return nil
}
op.stage("stale_vm", vmLogAttrs(vm)...)
_ = d.vm.cleanupRuntime(ctx, vm, true)
vm.State = model.VMStateStopped
vm.Runtime.State = model.VMStateStopped
d.vm.clearVMHandles(vm)
vm.UpdatedAt = model.Now()
return d.store.UpsertVM(ctx, vm)
}); err != nil {
return op.fail(err, "vm_id", vm.ID)
}
}
if err := d.vm.rebuildDNS(ctx); err != nil {
return op.fail(err)
}
op.done()
return nil
}
// FindVM stays on Daemon as a thin forwarder to the VM service lookup.
// Dispatch code reads the facade directly; tests that pre-date the
// service split keep compiling.
func (d *Daemon) FindVM(ctx context.Context, idOrName string) (model.VMRecord, error) {
return d.vm.FindVM(ctx, idOrName)
}
// FindImage stays on Daemon as a thin forwarder to the image service
// lookup so callers reading dispatch code see the obvious facade, and
// tests that pre-date the service split still compile.
func (d *Daemon) FindImage(ctx context.Context, idOrName string) (model.Image, error) {
return d.img.FindImage(ctx, idOrName)
}
func (d *Daemon) TouchVM(ctx context.Context, idOrName string) (model.VMRecord, error) {
return d.vm.TouchVM(ctx, idOrName)
}
// wireServices populates the four focused services and their peer
// references from the infrastructure already on d (runner, logger,
// config, layout, store, closing, plus the SSH-client test seams).
// Idempotent: each service is skipped if the field is already non-nil,
// so tests can preinstall stubs for the services they want to fake and
// let wireServices fill the rest. The peer-service closures on
// WorkspaceService capture d rather than a direct *VMService pointer so
// the ws↔vm construction order doesn't recurse: the closures read d.vm
// at call time, by which point it is populated.
func wireServices(d *Daemon) {
if d.net == nil {
d.net = newHostNetwork(hostNetworkDeps{
runner: d.runner,
logger: d.logger,
config: d.config,
layout: d.layout,
closing: d.closing,
})
}
if d.img == nil {
d.img = newImageService(imageServiceDeps{
runner: d.runner,
logger: d.logger,
config: d.config,
layout: d.layout,
store: d.store,
beginOperation: func(name string, attrs ...any) *operationLog {
return d.beginOperation(name, attrs...)
},
})
}
if d.ws == nil {
d.ws = newWorkspaceService(workspaceServiceDeps{
runner: d.runner,
logger: d.logger,
config: d.config,
layout: d.layout,
store: d.store,
repoInspector: ws.NewInspector(),
vmResolver: func(ctx context.Context, idOrName string) (model.VMRecord, error) {
return d.vm.FindVM(ctx, idOrName)
},
aliveChecker: func(vm model.VMRecord) bool {
return d.vm.vmAlive(vm)
},
waitGuestSSH: d.waitForGuestSSH,
dialGuest: d.dialGuest,
imageResolver: func(ctx context.Context, idOrName string) (model.Image, error) {
return d.FindImage(ctx, idOrName)
},
imageWorkSeed: func(ctx context.Context, image model.Image, fingerprint string) error {
return d.img.refreshManagedWorkSeedFingerprint(ctx, image, fingerprint)
},
withVMLockByRef: func(ctx context.Context, idOrName string, fn func(model.VMRecord) (model.VMRecord, error)) (model.VMRecord, error) {
return d.vm.withVMLockByRef(ctx, idOrName, fn)
},
beginOperation: d.beginOperation,
})
}
if d.vm == nil {
d.vm = newVMService(vmServiceDeps{
runner: d.runner,
logger: d.logger,
config: d.config,
layout: d.layout,
store: d.store,
net: d.net,
img: d.img,
ws: d.ws,
capHooks: d.buildCapabilityHooks(),
beginOperation: d.beginOperation,
vsockHostDevice: defaultVsockHostDevice,
})
}
if len(d.vmCaps) == 0 {
d.vmCaps = d.defaultCapabilities()
}
}
func marshalResultOrError(v any, err error) rpc.Response {
if err != nil {
return rpc.NewError("operation_failed", err.Error())
}
resp, marshalErr := rpc.NewResult(v)
if marshalErr != nil {
return rpc.NewError("marshal_failed", marshalErr.Error())
}
return resp
}
func exists(path string) bool {
_, err := os.Stat(path)
return err == nil
}