banger/internal/daemon/daemon.go
Thales Maciel 466a7c30c4
daemon split (4/5): extract *VMService service
Phase 4 of the daemon god-struct refactor. VM lifecycle, create-op
registry, handle cache, disk provisioning, stats polling, ports
query, and the per-VM lock set all move off *Daemon onto *VMService.

Daemon keeps thin forwarders only for FindVM / TouchVM (dispatch
surface) and is otherwise out of VM lifecycle. Lazy-init via
d.vmSvc() mirrors the earlier services so test literals like
`&Daemon{store: db, runner: r}` still get a functional service
without spelling one out.

Three small cleanups along the way:

  * preflight helpers (validateStartPrereqs / addBaseStartPrereqs
    / addBaseStartCommandPrereqs / validateWorkDiskResizePrereqs)
    move with the VM methods that call them.
  * cleanupRuntime / rebuildDNS move to *VMService, with
    HostNetwork primitives (findFirecrackerPID, cleanupDMSnapshot,
    killVMProcess, releaseTap, waitForExit, sendCtrlAltDel)
    reached through s.net instead of the hostNet() facade.
  * vsockAgentBinary becomes a package-level function so both
    *Daemon (doctor) and *VMService (preflight) call one entry
    point instead of each owning a forwarder method.

WorkspaceService's peer deps switch from eager method values to
closures — vmSvc() constructs VMService with WorkspaceService as a
peer, so resolving d.vmSvc().FindVM at construction time recursed
through workspaceSvc() → vmSvc(). Closures defer the lookup to call
time.

Pure code motion: build + unit tests green, lint clean. No RPC
surface or lock-ordering changes.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-20 20:57:05 -03:00

607 lines
19 KiB
Go

package daemon
import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"net"
"os"
"sync"
"time"
"banger/internal/api"
"banger/internal/buildinfo"
"banger/internal/config"
"banger/internal/model"
"banger/internal/paths"
"banger/internal/rpc"
"banger/internal/store"
"banger/internal/system"
"banger/internal/vmdns"
)
// Daemon is the composition root: shared infrastructure (store,
// runner, logger, layout, config) plus pointers to the four focused
// services that own behavior. Open wires the services; the dispatch
// loop forwards RPCs to them. No lifecycle / image / workspace /
// networking behavior lives on *Daemon itself — it's wiring.
type Daemon struct {
	// Shared infrastructure. Open fills these from paths.Resolve,
	// config.Load, store.Open, system.NewRunner and newDaemonLogger.
	layout paths.Layout
	config model.DaemonConfig
	store *store.Store
	runner system.CommandRunner
	logger *slog.Logger
	// Service pointers. Code in this file reaches them through the
	// hostNet() / imageSvc() / workspaceSvc() / vmSvc() accessors rather
	// than reading the fields directly; Open only assigns net itself.
	net *HostNetwork
	img *ImageService
	ws *WorkspaceService
	vm *VMService
	// closing is closed by Close; Serve and backgroundLoop select on it
	// to stop.
	closing chan struct{}
	// once makes the body of Close run at most one time.
	once sync.Once
	// pid is set from os.Getpid in Open and reported by "ping".
	pid int
	// listener is the unix socket listener created by Serve and closed
	// by Close.
	listener net.Listener
	// vmCaps is not referenced in this part of the file.
	// NOTE(review): presumably the registered VM capabilities — confirm
	// against its users.
	vmCaps []vmCapability
	// requestHandler, when non-nil, receives every version-checked
	// request in place of dispatch's built-in method switch.
	requestHandler func(context.Context, rpc.Request) rpc.Response
	// guestWaitForSSH and guestDial are not referenced in this part of
	// the file. NOTE(review): they look like injectable guest SSH hooks
	// — confirm against their callers.
	guestWaitForSSH func(context.Context, string, string, time.Duration) error
	guestDial func(context.Context, string, string) (guestSSHClient, error)
}
// Open builds a ready-to-serve Daemon: it resolves and creates the
// on-disk layout, loads the configuration, sets up the logger, opens
// the store, starts the VM DNS listener, reconciles persisted VM state
// against the host and seeds the tap pool.
//
// On any failure after the Daemon has been constructed, the partially
// initialised daemon is closed before the error is returned, so host
// state touched so far is unwound. On error the returned *Daemon is nil.
func Open(ctx context.Context) (d *Daemon, err error) {
	layout, err := paths.Resolve()
	if err != nil {
		return nil, err
	}
	if err := paths.Ensure(layout); err != nil {
		return nil, err
	}
	cfg, err := config.Load(layout)
	if err != nil {
		return nil, err
	}
	logger, normalizedLevel, err := newDaemonLogger(os.Stderr, cfg.LogLevel)
	if err != nil {
		return nil, err
	}
	cfg.LogLevel = normalizedLevel
	db, err := store.Open(layout.DBPath)
	if err != nil {
		return nil, err
	}
	closing := make(chan struct{})
	runner := system.NewRunner()
	d = &Daemon{
		layout:  layout,
		config:  cfg,
		store:   db,
		runner:  runner,
		logger:  logger,
		closing: closing,
		pid:     os.Getpid(),
		net: newHostNetwork(hostNetworkDeps{
			runner:  runner,
			logger:  logger,
			config:  cfg,
			layout:  layout,
			closing: closing,
		}),
	}
	// From here on, every failure path must run Close() so the host
	// state we touched (DNS listener goroutine, resolvectl routing,
	// SQLite handle, future side effects) gets unwound. Close is
	// idempotent, so running it on a partially initialised daemon is
	// fine.
	//
	// The cleanup must not go through the named result d: the failure
	// paths below `return nil, err`, which sets d to nil *before* the
	// deferred function runs. Hold the daemon in its own variable so
	// the deferred Close always sees the real instance.
	opened := d
	defer func() {
		if err != nil {
			_ = opened.Close()
		}
	}()
	d.ensureVMSSHClientConfig()
	d.logger.Info("daemon opened", "socket", layout.SocketPath, "state_dir", layout.StateDir, "log_level", cfg.LogLevel)
	if err = d.hostNet().startVMDNS(vmdns.DefaultListenAddr); err != nil {
		d.logger.Error("daemon open failed", "stage", "start_vm_dns", "error", err.Error())
		return nil, err
	}
	if err = d.reconcile(ctx); err != nil {
		d.logger.Error("daemon open failed", "stage", "reconcile", "error", err.Error())
		return nil, err
	}
	d.hostNet().ensureVMDNSResolverRouting(ctx)
	// Seed HostNetwork's pool index from taps already claimed by VMs
	// on disk so newly warmed pool entries don't collide with them.
	if d.config.TapPoolSize > 0 && d.store != nil {
		vms, listErr := d.store.ListVMs(ctx)
		if listErr != nil {
			d.logger.Error("daemon open failed", "stage", "initialize_tap_pool", "error", listErr.Error())
			return nil, listErr
		}
		used := make([]string, 0, len(vms))
		for _, vm := range vms {
			if tap := d.vmSvc().vmHandles(vm.ID).TapDevice; tap != "" {
				used = append(used, tap)
			}
		}
		d.hostNet().initializeTapPool(used)
	}
	// The tap pool is filled in the background, detached from the
	// caller's ctx.
	go d.hostNet().ensureTapPool(context.Background())
	return d, nil
}
// Close shuts the daemon down: it signals shutdown by closing the
// closing channel, closes the listener if Serve created one, clears the
// VM DNS resolver routing, stops the VM DNS listener and closes the
// store. The teardown errors are joined into the returned error.
//
// Close is idempotent (only the first call does any work; later calls
// return nil) and tolerates a nil receiver as well as a daemon whose
// closing channel, listener, logger or store were never set, so it is
// safe to call on a partially initialised Daemon.
func (d *Daemon) Close() error {
	if d == nil {
		return nil
	}
	var err error
	d.once.Do(func() {
		if d.logger != nil {
			d.logger.Info("daemon closing")
		}
		// Closing a nil channel panics, so guard daemons that were built
		// without one.
		if d.closing != nil {
			close(d.closing)
		}
		if d.listener != nil {
			_ = d.listener.Close()
		}
		errs := []error{
			d.hostNet().clearVMDNSResolverRouting(context.Background()),
			d.hostNet().stopVMDNS(),
		}
		if d.store != nil {
			errs = append(errs, d.store.Close())
		}
		err = errors.Join(errs...)
	})
	return err
}
// Serve listens on the daemon's unix socket and handles each accepted
// connection on its own goroutine until the daemon is closed or ctx is
// cancelled, in which case it returns nil. Any stale socket file is
// removed first, the socket is restricted to mode 0600, and the
// background loop is started before accepting.
//
// Accept failures that implement net.Error are logged and retried after
// a short pause; a listener that was closed for any other reason, and
// any non-network error, ends Serve with that error.
func (d *Daemon) Serve(ctx context.Context) error {
	_ = os.Remove(d.layout.SocketPath)
	listener, err := net.Listen("unix", d.layout.SocketPath)
	if err != nil {
		if d.logger != nil {
			d.logger.Error("daemon listen failed", "socket", d.layout.SocketPath, "error", err.Error())
		}
		return err
	}
	d.listener = listener
	defer listener.Close()
	defer os.Remove(d.layout.SocketPath)
	if err := os.Chmod(d.layout.SocketPath, 0o600); err != nil {
		return err
	}
	if d.logger != nil {
		d.logger.Info("daemon serving", "socket", d.layout.SocketPath, "pid", d.pid)
	}
	go d.backgroundLoop()

	// Accept does not observe ctx, so without help a cancelled context
	// would only be noticed after some unrelated accept error. Close the
	// listener on cancellation to unblock Accept; the watcher exits when
	// Serve returns.
	served := make(chan struct{})
	defer close(served)
	go func() {
		select {
		case <-ctx.Done():
			_ = listener.Close()
		case <-served:
		}
	}()

	for {
		conn, err := listener.Accept()
		if err != nil {
			// A requested shutdown is not an error.
			select {
			case <-ctx.Done():
				return nil
			case <-d.closing:
				return nil
			default:
			}
			// A closed listener also satisfies net.Error; retrying it
			// would spin forever, so treat it as fatal.
			if errors.Is(err, net.ErrClosed) {
				if d.logger != nil {
					d.logger.Error("daemon accept failed", "error", err.Error())
				}
				return err
			}
			if _, ok := err.(net.Error); ok {
				if d.logger != nil {
					d.logger.Warn("daemon accept temporary failure", "error", err.Error())
				}
				time.Sleep(100 * time.Millisecond)
				continue
			}
			if d.logger != nil {
				d.logger.Error("daemon accept failed", "error", err.Error())
			}
			return err
		}
		go d.handleConn(conn)
	}
}
// handleConn serves exactly one request on conn and then closes it.
// A request that cannot be decoded is answered with a bad_request
// error. Otherwise the request is dispatched under a context that is
// cancelled if the peer disconnects while it is running; when that
// happens no response is written.
func (d *Daemon) handleConn(conn net.Conn) {
	defer conn.Close()

	buffered := bufio.NewReader(conn)
	var request rpc.Request
	if decodeErr := json.NewDecoder(buffered).Decode(&request); decodeErr != nil {
		if d.logger != nil {
			d.logger.Warn("daemon request decode failed", "remote", conn.RemoteAddr().String(), "error", decodeErr.Error())
		}
		_ = json.NewEncoder(conn).Encode(rpc.NewError("bad_request", decodeErr.Error()))
		return
	}

	requestCtx, cancelRequest := context.WithCancel(context.Background())
	defer cancelRequest()
	// The watcher starts now; the stop function it returns is what gets
	// deferred.
	defer d.watchRequestDisconnect(conn, buffered, request.Method, cancelRequest)()

	response := d.dispatch(requestCtx, request)
	if requestCtx.Err() != nil {
		// The client went away mid-request; there is nobody to answer.
		return
	}
	encodeErr := json.NewEncoder(conn).Encode(response)
	if encodeErr == nil || d.logger == nil {
		return
	}
	d.logger.Warn("daemon response encode failed", "method", request.Method, "remote", conn.RemoteAddr().String(), "error", encodeErr.Error())
}
// watchRequestDisconnect keeps reading from reader while a request is
// in flight. Any bytes the peer sends are discarded; the first read
// error (for example the peer closing the connection) is logged and
// triggers cancel.
//
// The returned function stops the watcher and may be called more than
// once. Stopping sets an immediate read deadline on conn so the
// blocked read returns; a read error seen after the stop is ignored
// rather than treated as a disconnect. If conn or reader is nil,
// nothing is started and the returned function is a no-op.
func (d *Daemon) watchRequestDisconnect(conn net.Conn, reader *bufio.Reader, method string, cancel context.CancelFunc) func() {
	if conn == nil || reader == nil {
		return func() {}
	}

	stopped := make(chan struct{})
	var stopOnce sync.Once

	// Wakes the reader below once the watcher is stopped.
	go func() {
		<-stopped
		if deadliner, ok := conn.(interface{ SetReadDeadline(time.Time) error }); ok {
			_ = deadliner.SetReadDeadline(time.Now())
		}
	}()

	go func() {
		var scratch [1]byte
		for {
			if _, readErr := reader.Read(scratch[:]); readErr != nil {
				select {
				case <-stopped:
					// The request already finished; this error is just
					// the deadline firing.
					return
				default:
				}
				if d.logger != nil {
					d.logger.Info("daemon request canceled", "method", method, "remote", conn.RemoteAddr().String(), "error", readErr.Error())
				}
				cancel()
				return
			}
		}
	}()

	return func() {
		stopOnce.Do(func() { close(stopped) })
	}
}
// dispatch routes one decoded RPC request to the service method that
// implements it and converts the outcome into an rpc.Response.
//
// Requests whose Version differs from rpc.Version are rejected with
// bad_version. When d.requestHandler is set it handles every remaining
// request in place of the built-in method switch. Parameters that fail
// to decode produce bad_request, and an unrecognised method produces
// unknown_method. Service results and errors are turned into responses
// by marshalResultOrError, except that vm.logs and vm.ssh report a
// failed VM lookup as not_found and vm.ssh reports a VM that is not
// alive as not_running.
func (d *Daemon) dispatch(ctx context.Context, req rpc.Request) rpc.Response {
	if req.Version != rpc.Version {
		return rpc.NewError("bad_version", fmt.Sprintf("unsupported version %d", req.Version))
	}
	if d.requestHandler != nil {
		return d.requestHandler(ctx, req)
	}
	switch req.Method {
	case "ping":
		info := buildinfo.Current()
		// Routed through marshalResultOrError like every other method so
		// an encoding failure becomes an error response, not an empty one.
		return marshalResultOrError(api.PingResult{
			Status:  "ok",
			PID:     d.pid,
			Version: info.Version,
			Commit:  info.Commit,
			BuiltAt: info.BuiltAt,
		}, nil)
	case "shutdown":
		// Close runs in the background so this request can still be
		// answered.
		go d.Close()
		return marshalResultOrError(api.ShutdownResult{Status: "stopping"}, nil)
	case "vm.create":
		params, err := rpc.DecodeParams[api.VMCreateParams](req)
		if err != nil {
			return rpc.NewError("bad_request", err.Error())
		}
		vm, err := d.vmSvc().CreateVM(ctx, params)
		return marshalResultOrError(api.VMShowResult{VM: vm}, err)
	case "vm.create.begin":
		params, err := rpc.DecodeParams[api.VMCreateParams](req)
		if err != nil {
			return rpc.NewError("bad_request", err.Error())
		}
		op, err := d.vmSvc().BeginVMCreate(ctx, params)
		return marshalResultOrError(api.VMCreateBeginResult{Operation: op}, err)
	case "vm.create.status":
		params, err := rpc.DecodeParams[api.VMCreateStatusParams](req)
		if err != nil {
			return rpc.NewError("bad_request", err.Error())
		}
		op, err := d.vmSvc().VMCreateStatus(ctx, params.ID)
		return marshalResultOrError(api.VMCreateStatusResult{Operation: op}, err)
	case "vm.create.cancel":
		params, err := rpc.DecodeParams[api.VMCreateStatusParams](req)
		if err != nil {
			return rpc.NewError("bad_request", err.Error())
		}
		err = d.vmSvc().CancelVMCreate(ctx, params.ID)
		return marshalResultOrError(api.Empty{}, err)
	case "vm.list":
		vms, err := d.store.ListVMs(ctx)
		return marshalResultOrError(api.VMListResult{VMs: vms}, err)
	case "vm.show":
		params, err := rpc.DecodeParams[api.VMRefParams](req)
		if err != nil {
			return rpc.NewError("bad_request", err.Error())
		}
		vm, err := d.vmSvc().FindVM(ctx, params.IDOrName)
		return marshalResultOrError(api.VMShowResult{VM: vm}, err)
	case "vm.start":
		params, err := rpc.DecodeParams[api.VMRefParams](req)
		if err != nil {
			return rpc.NewError("bad_request", err.Error())
		}
		vm, err := d.vmSvc().StartVM(ctx, params.IDOrName)
		return marshalResultOrError(api.VMShowResult{VM: vm}, err)
	case "vm.stop":
		params, err := rpc.DecodeParams[api.VMRefParams](req)
		if err != nil {
			return rpc.NewError("bad_request", err.Error())
		}
		vm, err := d.vmSvc().StopVM(ctx, params.IDOrName)
		return marshalResultOrError(api.VMShowResult{VM: vm}, err)
	case "vm.kill":
		params, err := rpc.DecodeParams[api.VMKillParams](req)
		if err != nil {
			return rpc.NewError("bad_request", err.Error())
		}
		vm, err := d.vmSvc().KillVM(ctx, params)
		return marshalResultOrError(api.VMShowResult{VM: vm}, err)
	case "vm.restart":
		params, err := rpc.DecodeParams[api.VMRefParams](req)
		if err != nil {
			return rpc.NewError("bad_request", err.Error())
		}
		vm, err := d.vmSvc().RestartVM(ctx, params.IDOrName)
		return marshalResultOrError(api.VMShowResult{VM: vm}, err)
	case "vm.delete":
		params, err := rpc.DecodeParams[api.VMRefParams](req)
		if err != nil {
			return rpc.NewError("bad_request", err.Error())
		}
		vm, err := d.vmSvc().DeleteVM(ctx, params.IDOrName)
		return marshalResultOrError(api.VMShowResult{VM: vm}, err)
	case "vm.set":
		params, err := rpc.DecodeParams[api.VMSetParams](req)
		if err != nil {
			return rpc.NewError("bad_request", err.Error())
		}
		vm, err := d.vmSvc().SetVM(ctx, params)
		return marshalResultOrError(api.VMShowResult{VM: vm}, err)
	case "vm.stats":
		params, err := rpc.DecodeParams[api.VMRefParams](req)
		if err != nil {
			return rpc.NewError("bad_request", err.Error())
		}
		vm, stats, err := d.vmSvc().GetVMStats(ctx, params.IDOrName)
		return marshalResultOrError(api.VMStatsResult{VM: vm, Stats: stats}, err)
	case "vm.logs":
		params, err := rpc.DecodeParams[api.VMRefParams](req)
		if err != nil {
			return rpc.NewError("bad_request", err.Error())
		}
		vm, err := d.vmSvc().FindVM(ctx, params.IDOrName)
		if err != nil {
			return rpc.NewError("not_found", err.Error())
		}
		return marshalResultOrError(api.VMLogsResult{LogPath: vm.Runtime.LogPath}, nil)
	case "vm.ssh":
		params, err := rpc.DecodeParams[api.VMRefParams](req)
		if err != nil {
			return rpc.NewError("bad_request", err.Error())
		}
		vm, err := d.vmSvc().TouchVM(ctx, params.IDOrName)
		if err != nil {
			return rpc.NewError("not_found", err.Error())
		}
		if !d.vmSvc().vmAlive(vm) {
			return rpc.NewError("not_running", fmt.Sprintf("vm %s is not running", vm.Name))
		}
		return marshalResultOrError(api.VMSSHResult{Name: vm.Name, GuestIP: vm.Runtime.GuestIP}, nil)
	case "vm.health":
		params, err := rpc.DecodeParams[api.VMRefParams](req)
		if err != nil {
			return rpc.NewError("bad_request", err.Error())
		}
		result, err := d.vmSvc().HealthVM(ctx, params.IDOrName)
		return marshalResultOrError(result, err)
	case "vm.ping":
		params, err := rpc.DecodeParams[api.VMRefParams](req)
		if err != nil {
			return rpc.NewError("bad_request", err.Error())
		}
		result, err := d.vmSvc().PingVM(ctx, params.IDOrName)
		return marshalResultOrError(result, err)
	case "vm.ports":
		params, err := rpc.DecodeParams[api.VMRefParams](req)
		if err != nil {
			return rpc.NewError("bad_request", err.Error())
		}
		result, err := d.vmSvc().PortsVM(ctx, params.IDOrName)
		return marshalResultOrError(result, err)
	case "vm.workspace.prepare":
		params, err := rpc.DecodeParams[api.VMWorkspacePrepareParams](req)
		if err != nil {
			return rpc.NewError("bad_request", err.Error())
		}
		workspace, err := d.workspaceSvc().PrepareVMWorkspace(ctx, params)
		return marshalResultOrError(api.VMWorkspacePrepareResult{Workspace: workspace}, err)
	case "vm.workspace.export":
		params, err := rpc.DecodeParams[api.WorkspaceExportParams](req)
		if err != nil {
			return rpc.NewError("bad_request", err.Error())
		}
		result, err := d.workspaceSvc().ExportVMWorkspace(ctx, params)
		return marshalResultOrError(result, err)
	case "image.list":
		images, err := d.store.ListImages(ctx)
		return marshalResultOrError(api.ImageListResult{Images: images}, err)
	case "image.show":
		params, err := rpc.DecodeParams[api.ImageRefParams](req)
		if err != nil {
			return rpc.NewError("bad_request", err.Error())
		}
		image, err := d.imageSvc().FindImage(ctx, params.IDOrName)
		return marshalResultOrError(api.ImageShowResult{Image: image}, err)
	case "image.register":
		params, err := rpc.DecodeParams[api.ImageRegisterParams](req)
		if err != nil {
			return rpc.NewError("bad_request", err.Error())
		}
		image, err := d.imageSvc().RegisterImage(ctx, params)
		return marshalResultOrError(api.ImageShowResult{Image: image}, err)
	case "image.promote":
		params, err := rpc.DecodeParams[api.ImageRefParams](req)
		if err != nil {
			return rpc.NewError("bad_request", err.Error())
		}
		image, err := d.imageSvc().PromoteImage(ctx, params.IDOrName)
		return marshalResultOrError(api.ImageShowResult{Image: image}, err)
	case "image.delete":
		params, err := rpc.DecodeParams[api.ImageRefParams](req)
		if err != nil {
			return rpc.NewError("bad_request", err.Error())
		}
		image, err := d.imageSvc().DeleteImage(ctx, params.IDOrName)
		return marshalResultOrError(api.ImageShowResult{Image: image}, err)
	case "image.pull":
		params, err := rpc.DecodeParams[api.ImagePullParams](req)
		if err != nil {
			return rpc.NewError("bad_request", err.Error())
		}
		image, err := d.imageSvc().PullImage(ctx, params)
		return marshalResultOrError(api.ImageShowResult{Image: image}, err)
	case "kernel.list":
		return marshalResultOrError(d.imageSvc().KernelList(ctx))
	case "kernel.show":
		params, err := rpc.DecodeParams[api.KernelRefParams](req)
		if err != nil {
			return rpc.NewError("bad_request", err.Error())
		}
		entry, err := d.imageSvc().KernelShow(ctx, params.Name)
		return marshalResultOrError(api.KernelShowResult{Entry: entry}, err)
	case "kernel.delete":
		params, err := rpc.DecodeParams[api.KernelRefParams](req)
		if err != nil {
			return rpc.NewError("bad_request", err.Error())
		}
		err = d.imageSvc().KernelDelete(ctx, params.Name)
		return marshalResultOrError(api.Empty{}, err)
	case "kernel.import":
		params, err := rpc.DecodeParams[api.KernelImportParams](req)
		if err != nil {
			return rpc.NewError("bad_request", err.Error())
		}
		entry, err := d.imageSvc().KernelImport(ctx, params)
		return marshalResultOrError(api.KernelShowResult{Entry: entry}, err)
	case "kernel.pull":
		params, err := rpc.DecodeParams[api.KernelPullParams](req)
		if err != nil {
			return rpc.NewError("bad_request", err.Error())
		}
		entry, err := d.imageSvc().KernelPull(ctx, params)
		return marshalResultOrError(api.KernelShowResult{Entry: entry}, err)
	case "kernel.catalog":
		return marshalResultOrError(d.imageSvc().KernelCatalog(ctx))
	default:
		return rpc.NewError("unknown_method", req.Method)
	}
}
// backgroundLoop runs the daemon's periodic maintenance until the
// closing channel is closed: it polls VM stats every
// config.StatsPollInterval, and every model.DefaultStaleSweepInterval
// it stops stale VMs and prunes VM create operations using a cutoff of
// ten minutes ago. Failures are logged and never end the loop.
//
// time.NewTicker panics on a non-positive interval, which would take
// the whole daemon down from this goroutine. A non-positive
// StatsPollInterval therefore disables stats polling (with a warning)
// and leaves the stale sweep running.
func (d *Daemon) backgroundLoop() {
	// A nil channel never becomes ready, so the stats case is simply
	// never selected when polling is disabled.
	var statsTick <-chan time.Time
	if interval := d.config.StatsPollInterval; interval > 0 {
		statsTicker := time.NewTicker(interval)
		defer statsTicker.Stop()
		statsTick = statsTicker.C
	} else if d.logger != nil {
		d.logger.Warn("background stats poll disabled", "interval", interval.String())
	}
	staleTicker := time.NewTicker(model.DefaultStaleSweepInterval)
	defer staleTicker.Stop()
	for {
		select {
		case <-d.closing:
			return
		case <-statsTick:
			if err := d.vmSvc().pollStats(context.Background()); err != nil && d.logger != nil {
				d.logger.Error("background stats poll failed", "error", err.Error())
			}
		case <-staleTicker.C:
			if err := d.vmSvc().stopStaleVMs(context.Background()); err != nil && d.logger != nil {
				d.logger.Error("background stale sweep failed", "error", err.Error())
			}
			d.vmSvc().pruneVMCreateOperations(time.Now().Add(-10 * time.Minute))
		}
	}
}
// ensureDefaultImage is currently a no-op: it ignores its context and
// always returns nil.
func (d *Daemon) ensureDefaultImage(_ context.Context) error {
	return nil
}
// reconcile brings persisted VM records back in line with the host.
// Each stored VM is visited under its per-VM lock:
//
//   - a VM not recorded as running only has its handles cleared;
//   - a running VM has its handles rediscovered and cached; if its
//     process is no longer alive, its runtime is cleaned up and the
//     record is saved as stopped.
//
// Afterwards the DNS records are rebuilt. The first store, lock or DNS
// failure aborts the pass and is returned via the operation's fail.
func (d *Daemon) reconcile(ctx context.Context) error {
	op := d.beginOperation("daemon.reconcile")

	records, err := d.store.ListVMs(ctx)
	if err != nil {
		return op.fail(err)
	}

	// reconcileOne runs with the VM's lock held.
	reconcileOne := func(vm model.VMRecord) error {
		if vm.State != model.VMStateRunning {
			// Belt-and-braces: a stopped VM should never have a
			// scratch file or a cache entry. Clean up anything
			// left by an ungraceful previous daemon crash.
			d.vmSvc().clearVMHandles(vm)
			return nil
		}

		// Rebuild the in-memory handle cache from the per-VM scratch
		// file and check whether the firecracker process survived.
		handles, alive, rediscoverErr := d.vmSvc().rediscoverHandles(ctx, vm)
		if rediscoverErr != nil && d.logger != nil {
			d.logger.Warn("rediscover handles failed", "vm_id", vm.ID, "error", rediscoverErr.Error())
		}

		// Seed the cache regardless of liveness: a live VM needs it for
		// later vmAlive() checks, and a dead one needs it so
		// cleanupRuntime knows which kernel resources (DM / loops /
		// tap) to tear down.
		d.vmSvc().setVMHandlesInMemory(vm.ID, handles)
		if alive {
			return nil
		}

		op.stage("stale_vm", vmLogAttrs(vm)...)
		_ = d.vmSvc().cleanupRuntime(ctx, vm, true) // best effort
		vm.State = model.VMStateStopped
		vm.Runtime.State = model.VMStateStopped
		d.vmSvc().clearVMHandles(vm)
		vm.UpdatedAt = model.Now()
		return d.store.UpsertVM(ctx, vm)
	}

	for _, record := range records {
		if lockErr := d.vmSvc().withVMLockByIDErr(ctx, record.ID, reconcileOne); lockErr != nil {
			return op.fail(lockErr, "vm_id", record.ID)
		}
	}

	if dnsErr := d.vmSvc().rebuildDNS(ctx); dnsErr != nil {
		return op.fail(dnsErr)
	}
	op.done()
	return nil
}
// FindVM looks a VM up by ID or name. It is a thin forwarder to the VM
// service, kept on Daemon so dispatch-side callers have a direct facade
// and tests written before the service split keep compiling.
func (d *Daemon) FindVM(ctx context.Context, idOrName string) (model.VMRecord, error) {
	vms := d.vmSvc()
	return vms.FindVM(ctx, idOrName)
}
// FindImage looks an image up by ID or name. It is a thin forwarder to
// the image service, kept on Daemon so readers of the dispatch code see
// the obvious facade and tests written before the service split still
// compile.
func (d *Daemon) FindImage(ctx context.Context, idOrName string) (model.Image, error) {
	images := d.imageSvc()
	return images.FindImage(ctx, idOrName)
}
// TouchVM is a thin forwarder to the VM service's TouchVM for the VM
// identified by ID or name, returning whatever record and error the
// service returns.
func (d *Daemon) TouchVM(ctx context.Context, idOrName string) (model.VMRecord, error) {
	vms := d.vmSvc()
	return vms.TouchVM(ctx, idOrName)
}
// marshalResultOrError converts a service call's (value, error) pair
// into an rpc.Response. A non-nil err becomes an "operation_failed"
// error response and v is ignored. Otherwise v is wrapped with
// rpc.NewResult, and a failure to do so becomes a "marshal_failed"
// error response.
func marshalResultOrError(v any, err error) rpc.Response {
	if err != nil {
		return rpc.NewError("operation_failed", err.Error())
	}
	switch resp, marshalErr := rpc.NewResult(v); {
	case marshalErr != nil:
		return rpc.NewError("marshal_failed", marshalErr.Error())
	default:
		return resp
	}
}
// exists reports whether os.Stat succeeds for path. Any Stat error —
// not only "does not exist" — yields false.
func exists(path string) bool {
	if _, statErr := os.Stat(path); statErr != nil {
		return false
	}
	return true
}