banger/internal/daemon/daemon.go
Thales Maciel 572bf32424
Remove runtime-bundle image dependencies
Hard-cut banger away from source-checkout runtime bundles as an implicit source of\nimage and host defaults. Managed images now own their full boot set,\nimage build starts from an existing registered image, and daemon startup\nno longer synthesizes a default image from host paths.\n\nResolve Firecracker from PATH or firecracker_bin, make SSH keys config-owned\nwith an auto-managed XDG default, replace the external name generator and\npackage manifests with Go code, and keep the vsock helper as a companion\nbinary instead of a user-managed runtime asset.\n\nUpdate the manual scripts, web/CLI forms, config surface, and docs around\nthe new build/manual flow and explicit image registration semantics.\n\nValidation: GOCACHE=/tmp/banger-gocache go test ./..., bash -n scripts/*.sh,\nand make build.
2026-03-21 18:34:53 -03:00

667 lines
19 KiB
Go

package daemon
import (
"bufio"
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"log/slog"
"net"
"net/http"
"os"
"strings"
"sync"
"time"
"banger/internal/api"
"banger/internal/config"
"banger/internal/model"
"banger/internal/paths"
"banger/internal/rpc"
"banger/internal/store"
"banger/internal/system"
"banger/internal/vmdns"
)
// Daemon is the long-lived server process state: resolved paths, loaded
// configuration, the persistent store, the RPC and web listeners, the VM DNS
// server, and the bookkeeping for in-flight operations.
type Daemon struct {
// layout is the resolved filesystem layout (socket path, state dir, DB path).
layout paths.Layout
// config is the loaded daemon configuration; Open stores the normalized log level in it.
config model.DaemonConfig
// store is the persistent VM/image store opened from layout.DBPath.
store *store.Store
// runner is the command runner created by system.NewRunner in Open.
runner system.CommandRunner
// logger may be nil; the methods in this file check for nil before logging.
logger *slog.Logger
// NOTE(review): mu is not used in the visible part of this file — confirm what it guards.
mu sync.Mutex
// createOpsMu presumably guards createOps (VM-create operation states keyed by string) — TODO confirm.
createOpsMu sync.Mutex
createOps map[string]*vmCreateOperationState
// imageBuildOpsMu presumably guards imageBuildOps (image-build operation states keyed by string) — TODO confirm.
imageBuildOpsMu sync.Mutex
imageBuildOps map[string]*imageBuildOperationState
// vmLocksMu guards vmLocks, the lazily created map of per-VM-ID mutexes handed out by lockVMID.
vmLocksMu sync.Mutex
vmLocks map[string]*sync.Mutex
// tapPoolMu presumably guards tapPool and tapPoolNext — TODO confirm against the tap pool code.
tapPoolMu sync.Mutex
tapPool []string
tapPoolNext int
// closing is closed by Close; Serve and backgroundLoop select on it to stop.
closing chan struct{}
// once ensures the body of Close runs a single time.
once sync.Once
// pid is the daemon's process ID, captured in Open and reported by "ping".
pid int
// listener is the unix-socket RPC listener created by Serve and closed by Close.
listener net.Listener
// webListener and webServer are closed by Close when non-nil; they are set outside this view.
webListener net.Listener
webServer *http.Server
// webURL is reported to clients in the "ping" result.
webURL string
// vmDNS is set by startVMDNS and cleared by stopVMDNS.
vmDNS *vmdns.Server
// NOTE(review): vmCaps is not referenced in the visible part of this file.
vmCaps []vmCapability
// NOTE(review): imageBuild looks like an overridable build hook — confirm against the image build code.
imageBuild func(context.Context, imageBuildSpec) error
// requestHandler, when non-nil, receives every version-checked request in
// place of dispatch's built-in method switch.
requestHandler func(context.Context, rpc.Request) rpc.Response
}
// Open builds a ready-to-serve Daemon: it resolves and creates the on-disk
// layout, loads the configuration, sets up logging, opens the store, starts
// the VM DNS server, reconciles persisted VM state, and initializes the tap
// pool. A background goroutine is started to keep the tap pool filled.
//
// On failure it returns a nil Daemon and the error. Anything acquired before
// the failure (DNS server, store) is released first, so a failed Open does not
// leak resources.
func Open(ctx context.Context) (d *Daemon, err error) {
	layout, err := paths.Resolve()
	if err != nil {
		return nil, err
	}
	if err := paths.Ensure(layout); err != nil {
		return nil, err
	}
	cfg, err := config.Load(layout)
	if err != nil {
		return nil, err
	}
	logger, normalizedLevel, err := newDaemonLogger(os.Stderr, cfg.LogLevel)
	if err != nil {
		return nil, err
	}
	cfg.LogLevel = normalizedLevel
	db, err := store.Open(layout.DBPath)
	if err != nil {
		return nil, err
	}

	// The daemon is kept in a local rather than the named result d: every
	// failure path below does `return nil, err`, which sets d to nil before
	// deferred functions run. A cleanup that went through d would therefore
	// dereference a nil receiver.
	daemon := &Daemon{
		layout:  layout,
		config:  cfg,
		store:   db,
		runner:  system.NewRunner(),
		logger:  logger,
		closing: make(chan struct{}),
		pid:     os.Getpid(),
	}
	defer func() {
		if err == nil {
			return
		}
		// Best-effort teardown; the error that caused the failure is the one
		// worth returning. stopVMDNS is a no-op if DNS never started.
		_ = daemon.stopVMDNS()
		_ = db.Close()
	}()

	daemon.logger.Info("daemon opened", "socket", layout.SocketPath, "state_dir", layout.StateDir, "log_level", cfg.LogLevel)
	if err = daemon.startVMDNS(vmdns.DefaultListenAddr); err != nil {
		daemon.logger.Error("daemon open failed", "stage", "start_vm_dns", "error", err.Error())
		return nil, err
	}
	if err = daemon.reconcile(ctx); err != nil {
		daemon.logger.Error("daemon open failed", "stage", "reconcile", "error", err.Error())
		return nil, err
	}
	if err = daemon.initializeTapPool(ctx); err != nil {
		daemon.logger.Error("daemon open failed", "stage", "initialize_tap_pool", "error", err.Error())
		return nil, err
	}
	// Uses a background context so the pool fill is not tied to the context
	// passed to Open.
	go daemon.ensureTapPool(context.Background())
	return daemon, nil
}
// Close shuts the daemon down: it signals d.closing, closes the RPC listener,
// the web server and the web listener when present, then stops the VM DNS
// server and closes the store. The work runs once; later calls return nil.
// The returned error joins the DNS-stop and store-close errors.
func (d *Daemon) Close() error {
	var closeErr error
	shutdown := func() {
		if d.logger != nil {
			d.logger.Info("daemon closing")
		}
		close(d.closing)
		// Listener/server close errors are ignored on purpose: shutdown is
		// best-effort for the network side.
		if l := d.listener; l != nil {
			_ = l.Close()
		}
		if srv := d.webServer; srv != nil {
			_ = srv.Close()
		}
		if wl := d.webListener; wl != nil {
			_ = wl.Close()
		}
		dnsErr := d.stopVMDNS()
		storeErr := d.store.Close()
		closeErr = errors.Join(dnsErr, storeErr)
	}
	d.once.Do(shutdown)
	return closeErr
}
// Serve listens on the daemon's unix socket and handles each accepted
// connection in its own goroutine until ctx is canceled, the daemon is closed,
// or Accept fails with a non-temporary error.
//
// It removes any stale socket file first, restricts the socket to mode 0600,
// starts the web server and the background loop, and removes the socket file
// again on return. It returns nil on cancellation or Close, and the underlying
// error otherwise.
func (d *Daemon) Serve(ctx context.Context) error {
	// A leftover socket from a previous run would make Listen fail; a missing
	// file is fine, so the error is ignored.
	_ = os.Remove(d.layout.SocketPath)
	listener, err := net.Listen("unix", d.layout.SocketPath)
	if err != nil {
		if d.logger != nil {
			d.logger.Error("daemon listen failed", "socket", d.layout.SocketPath, "error", err.Error())
		}
		return err
	}
	d.listener = listener
	defer listener.Close()
	defer os.Remove(d.layout.SocketPath)
	if err := os.Chmod(d.layout.SocketPath, 0o600); err != nil {
		return err
	}
	if d.logger != nil {
		d.logger.Info("daemon serving", "socket", d.layout.SocketPath, "pid", d.pid)
	}
	if err := d.startWebServer(); err != nil {
		return err
	}
	go d.backgroundLoop()

	// Accept does not observe ctx. Without this watcher a canceled context
	// would only be noticed after some unrelated Accept error. Closing the
	// listener unblocks Accept so the loop can see ctx.Done(). The watcher
	// exits when Serve returns.
	served := make(chan struct{})
	defer close(served)
	go func() {
		select {
		case <-ctx.Done():
			_ = listener.Close()
		case <-served:
		}
	}()

	for {
		conn, err := listener.Accept()
		if err != nil {
			// An Accept error after cancellation or Close is the expected way
			// out of the loop, not a failure.
			select {
			case <-ctx.Done():
				return nil
			case <-d.closing:
				return nil
			default:
			}
			if ne, ok := err.(net.Error); ok && ne.Temporary() {
				if d.logger != nil {
					d.logger.Warn("daemon accept temporary failure", "error", err.Error())
				}
				// Brief pause so a persistent temporary error does not spin.
				time.Sleep(100 * time.Millisecond)
				continue
			}
			if d.logger != nil {
				d.logger.Error("daemon accept failed", "error", err.Error())
			}
			return err
		}
		go d.handleConn(conn)
	}
}
// handleConn serves exactly one JSON request on conn and then closes it.
// A request that cannot be decoded is answered with a "bad_request" error.
// While the request is being dispatched the connection is watched for a
// client disconnect, which cancels the request context; in that case no
// response is written.
func (d *Daemon) handleConn(conn net.Conn) {
	defer conn.Close()

	var (
		buffered = bufio.NewReader(conn)
		reply    = json.NewEncoder(conn)
		request  rpc.Request
	)
	if decodeErr := json.NewDecoder(buffered).Decode(&request); decodeErr != nil {
		if d.logger != nil {
			d.logger.Warn("daemon request decode failed", "remote", conn.RemoteAddr().String(), "error", decodeErr.Error())
		}
		_ = reply.Encode(rpc.NewError("bad_request", decodeErr.Error()))
		return
	}

	requestCtx, cancelRequest := context.WithCancel(context.Background())
	defer cancelRequest()
	// The watcher starts now; the stop function it returns runs on exit.
	defer d.watchRequestDisconnect(conn, buffered, request.Method, cancelRequest)()

	response := d.dispatch(requestCtx, request)
	if requestCtx.Err() != nil {
		// The request was canceled (client went away): nothing to reply to.
		return
	}
	encodeErr := reply.Encode(response)
	if encodeErr == nil || d.logger == nil {
		return
	}
	d.logger.Warn("daemon response encode failed", "method", request.Method, "remote", conn.RemoteAddr().String(), "error", encodeErr.Error())
}
// watchRequestDisconnect watches conn while a request is in flight and calls
// cancel if reading from it fails before the watch is stopped. It returns a
// stop function that ends the watch; calling it more than once is safe.
// With a nil conn or reader it does nothing and returns a no-op stop function.
func (d *Daemon) watchRequestDisconnect(conn net.Conn, reader *bufio.Reader, method string, cancel context.CancelFunc) func() {
if conn == nil || reader == nil {
return func() {}
}
done := make(chan struct{})
var once sync.Once
go func() {
// Helper goroutine: once the watch is stopped, expire the read deadline so
// the Read below returns instead of blocking on the connection.
go func() {
<-done
if deadlineSetter, ok := conn.(interface{ SetReadDeadline(time.Time) error }); ok {
_ = deadlineSetter.SetReadDeadline(time.Now())
}
}()
var buf [1]byte
for {
_, err := reader.Read(buf[:])
if err == nil {
// Extra bytes sent by the client are read and discarded.
continue
}
// A read error after stop is the deadline firing, not a disconnect.
select {
case <-done:
return
default:
}
if d.logger != nil {
d.logger.Info("daemon request canceled", "method", method, "remote", conn.RemoteAddr().String(), "error", err.Error())
}
cancel()
return
}
}()
return func() {
once.Do(func() {
close(done)
})
}
}
// dispatch routes one RPC request to the matching daemon operation and
// returns its response.
//
// Requests whose Version differs from rpc.Version get a "bad_version" error.
// If d.requestHandler is set, it handles every remaining request. Otherwise
// the method name selects a case below: parameters that fail to decode yield
// "bad_request", operation results and errors go through
// marshalResultOrError, and unrecognized methods yield "unknown_method".
func (d *Daemon) dispatch(ctx context.Context, req rpc.Request) rpc.Response {
if req.Version != rpc.Version {
return rpc.NewError("bad_version", fmt.Sprintf("unsupported version %d", req.Version))
}
if d.requestHandler != nil {
return d.requestHandler(ctx, req)
}
switch req.Method {
// Daemon-level methods.
case "ping":
result, _ := rpc.NewResult(api.PingResult{Status: "ok", PID: d.pid, WebURL: d.webURL})
return result
case "shutdown":
// Close runs in its own goroutine; the "stopping" reply is returned
// without waiting for it.
go d.Close()
result, _ := rpc.NewResult(api.ShutdownResult{Status: "stopping"})
return result
// VM creation: a blocking variant plus begin/status/cancel for operations
// tracked by ID.
case "vm.create":
params, err := rpc.DecodeParams[api.VMCreateParams](req)
if err != nil {
return rpc.NewError("bad_request", err.Error())
}
vm, err := d.CreateVM(ctx, params)
return marshalResultOrError(api.VMShowResult{VM: vm}, err)
case "vm.create.begin":
params, err := rpc.DecodeParams[api.VMCreateParams](req)
if err != nil {
return rpc.NewError("bad_request", err.Error())
}
op, err := d.BeginVMCreate(ctx, params)
return marshalResultOrError(api.VMCreateBeginResult{Operation: op}, err)
case "vm.create.status":
params, err := rpc.DecodeParams[api.VMCreateStatusParams](req)
if err != nil {
return rpc.NewError("bad_request", err.Error())
}
op, err := d.VMCreateStatus(ctx, params.ID)
return marshalResultOrError(api.VMCreateStatusResult{Operation: op}, err)
case "vm.create.cancel":
// Cancel reuses the status params type; only the operation ID is read.
params, err := rpc.DecodeParams[api.VMCreateStatusParams](req)
if err != nil {
return rpc.NewError("bad_request", err.Error())
}
err = d.CancelVMCreate(ctx, params.ID)
return marshalResultOrError(api.Empty{}, err)
// VM lifecycle and inspection; VMs are referenced by ID or name.
case "vm.list":
vms, err := d.store.ListVMs(ctx)
return marshalResultOrError(api.VMListResult{VMs: vms}, err)
case "vm.show":
params, err := rpc.DecodeParams[api.VMRefParams](req)
if err != nil {
return rpc.NewError("bad_request", err.Error())
}
vm, err := d.FindVM(ctx, params.IDOrName)
return marshalResultOrError(api.VMShowResult{VM: vm}, err)
case "vm.start":
params, err := rpc.DecodeParams[api.VMRefParams](req)
if err != nil {
return rpc.NewError("bad_request", err.Error())
}
vm, err := d.StartVM(ctx, params.IDOrName)
return marshalResultOrError(api.VMShowResult{VM: vm}, err)
case "vm.stop":
params, err := rpc.DecodeParams[api.VMRefParams](req)
if err != nil {
return rpc.NewError("bad_request", err.Error())
}
vm, err := d.StopVM(ctx, params.IDOrName)
return marshalResultOrError(api.VMShowResult{VM: vm}, err)
case "vm.kill":
params, err := rpc.DecodeParams[api.VMKillParams](req)
if err != nil {
return rpc.NewError("bad_request", err.Error())
}
vm, err := d.KillVM(ctx, params)
return marshalResultOrError(api.VMShowResult{VM: vm}, err)
case "vm.restart":
params, err := rpc.DecodeParams[api.VMRefParams](req)
if err != nil {
return rpc.NewError("bad_request", err.Error())
}
vm, err := d.RestartVM(ctx, params.IDOrName)
return marshalResultOrError(api.VMShowResult{VM: vm}, err)
case "vm.delete":
params, err := rpc.DecodeParams[api.VMRefParams](req)
if err != nil {
return rpc.NewError("bad_request", err.Error())
}
vm, err := d.DeleteVM(ctx, params.IDOrName)
return marshalResultOrError(api.VMShowResult{VM: vm}, err)
case "vm.set":
params, err := rpc.DecodeParams[api.VMSetParams](req)
if err != nil {
return rpc.NewError("bad_request", err.Error())
}
vm, err := d.SetVM(ctx, params)
return marshalResultOrError(api.VMShowResult{VM: vm}, err)
case "vm.stats":
params, err := rpc.DecodeParams[api.VMRefParams](req)
if err != nil {
return rpc.NewError("bad_request", err.Error())
}
vm, stats, err := d.GetVMStats(ctx, params.IDOrName)
return marshalResultOrError(api.VMStatsResult{VM: vm, Stats: stats}, err)
case "vm.logs":
// Returns only the log file path; any lookup error is reported as
// "not_found".
params, err := rpc.DecodeParams[api.VMRefParams](req)
if err != nil {
return rpc.NewError("bad_request", err.Error())
}
vm, err := d.FindVM(ctx, params.IDOrName)
if err != nil {
return rpc.NewError("not_found", err.Error())
}
return marshalResultOrError(api.VMLogsResult{LogPath: vm.Runtime.LogPath}, nil)
case "vm.ssh":
// Touches the VM record, then answers with its name and guest IP only if
// the record says running and the process check passes; otherwise
// "not_running". Any TouchVM error is reported as "not_found".
params, err := rpc.DecodeParams[api.VMRefParams](req)
if err != nil {
return rpc.NewError("bad_request", err.Error())
}
vm, err := d.TouchVM(ctx, params.IDOrName)
if err != nil {
return rpc.NewError("not_found", err.Error())
}
if vm.State != model.VMStateRunning || !system.ProcessRunning(vm.Runtime.PID, vm.Runtime.APISockPath) {
return rpc.NewError("not_running", fmt.Sprintf("vm %s is not running", vm.Name))
}
return marshalResultOrError(api.VMSSHResult{Name: vm.Name, GuestIP: vm.Runtime.GuestIP}, nil)
case "vm.health":
params, err := rpc.DecodeParams[api.VMRefParams](req)
if err != nil {
return rpc.NewError("bad_request", err.Error())
}
result, err := d.HealthVM(ctx, params.IDOrName)
return marshalResultOrError(result, err)
case "vm.ping":
params, err := rpc.DecodeParams[api.VMRefParams](req)
if err != nil {
return rpc.NewError("bad_request", err.Error())
}
result, err := d.PingVM(ctx, params.IDOrName)
return marshalResultOrError(result, err)
case "vm.ports":
params, err := rpc.DecodeParams[api.VMRefParams](req)
if err != nil {
return rpc.NewError("bad_request", err.Error())
}
result, err := d.PortsVM(ctx, params.IDOrName)
return marshalResultOrError(result, err)
// Image methods; images are referenced by ID or name.
case "image.list":
images, err := d.store.ListImages(ctx)
return marshalResultOrError(api.ImageListResult{Images: images}, err)
case "image.show":
params, err := rpc.DecodeParams[api.ImageRefParams](req)
if err != nil {
return rpc.NewError("bad_request", err.Error())
}
image, err := d.FindImage(ctx, params.IDOrName)
return marshalResultOrError(api.ImageShowResult{Image: image}, err)
// Image build: a blocking variant plus begin/status/cancel for operations
// tracked by ID.
case "image.build":
params, err := rpc.DecodeParams[api.ImageBuildParams](req)
if err != nil {
return rpc.NewError("bad_request", err.Error())
}
image, err := d.BuildImage(ctx, params)
return marshalResultOrError(api.ImageShowResult{Image: image}, err)
case "image.build.begin":
params, err := rpc.DecodeParams[api.ImageBuildParams](req)
if err != nil {
return rpc.NewError("bad_request", err.Error())
}
op, err := d.BeginImageBuild(ctx, params)
return marshalResultOrError(api.ImageBuildBeginResult{Operation: op}, err)
case "image.build.status":
params, err := rpc.DecodeParams[api.ImageBuildStatusParams](req)
if err != nil {
return rpc.NewError("bad_request", err.Error())
}
op, err := d.ImageBuildStatus(ctx, params.ID)
return marshalResultOrError(api.ImageBuildStatusResult{Operation: op}, err)
case "image.build.cancel":
// Cancel reuses the status params type; only the operation ID is read.
params, err := rpc.DecodeParams[api.ImageBuildStatusParams](req)
if err != nil {
return rpc.NewError("bad_request", err.Error())
}
err = d.CancelImageBuild(ctx, params.ID)
return marshalResultOrError(api.Empty{}, err)
case "image.register":
params, err := rpc.DecodeParams[api.ImageRegisterParams](req)
if err != nil {
return rpc.NewError("bad_request", err.Error())
}
image, err := d.RegisterImage(ctx, params)
return marshalResultOrError(api.ImageShowResult{Image: image}, err)
case "image.promote":
params, err := rpc.DecodeParams[api.ImageRefParams](req)
if err != nil {
return rpc.NewError("bad_request", err.Error())
}
image, err := d.PromoteImage(ctx, params.IDOrName)
return marshalResultOrError(api.ImageShowResult{Image: image}, err)
case "image.delete":
params, err := rpc.DecodeParams[api.ImageRefParams](req)
if err != nil {
return rpc.NewError("bad_request", err.Error())
}
image, err := d.DeleteImage(ctx, params.IDOrName)
return marshalResultOrError(api.ImageShowResult{Image: image}, err)
default:
return rpc.NewError("unknown_method", req.Method)
}
}
// backgroundLoop runs the daemon's periodic maintenance until d.closing is
// closed. On every stats tick it polls VM stats; on every stale-sweep tick it
// stops stale VMs and prunes VM-create and image-build operations using a
// cutoff of ten minutes before now. Failures are logged, never fatal.
func (d *Daemon) backgroundLoop() {
	const operationRetention = 10 * time.Minute

	stats := time.NewTicker(d.config.StatsPollInterval)
	stale := time.NewTicker(model.DefaultStaleSweepInterval)
	defer stats.Stop()
	defer stale.Stop()

	// report logs a failed background task when there is both an error and a
	// logger to send it to.
	report := func(msg string, err error) {
		if err != nil && d.logger != nil {
			d.logger.Error(msg, "error", err.Error())
		}
	}

	for {
		select {
		case <-d.closing:
			return
		case <-stats.C:
			report("background stats poll failed", d.pollStats(context.Background()))
		case <-stale.C:
			report("background stale sweep failed", d.stopStaleVMs(context.Background()))
			d.pruneVMCreateOperations(time.Now().Add(-operationRetention))
			d.pruneImageBuildOperations(time.Now().Add(-operationRetention))
		}
	}
}
// startVMDNS creates the VM DNS server for addr, stores it on the daemon, and
// logs the address it reports. It returns the error from vmdns.New unchanged.
func (d *Daemon) startVMDNS(addr string) error {
	dnsServer, err := vmdns.New(addr, d.logger)
	if err != nil {
		return err
	}
	d.vmDNS = dnsServer
	if d.logger == nil {
		return nil
	}
	d.logger.Info("vm dns serving", "dns_addr", dnsServer.Addr())
	return nil
}
// stopVMDNS closes the VM DNS server, if one is set, and clears the field.
// It returns the Close error, or nil when no server was set.
func (d *Daemon) stopVMDNS() error {
	if server := d.vmDNS; server != nil {
		closeErr := server.Close()
		d.vmDNS = nil
		return closeErr
	}
	return nil
}
// ensureDefaultImage is a no-op: it ignores ctx and always returns nil.
// NOTE(review): this looks like a leftover from when daemon startup
// synthesized a default image; no caller is visible in this file — confirm
// whether it can be removed.
func (d *Daemon) ensureDefaultImage(ctx context.Context) error {
_ = ctx
return nil
}
// reconcile brings persisted VM state in line with reality. Under each VM's
// lock, a VM recorded as running whose process check fails is cleaned up,
// marked stopped, stripped of its runtime handles, and saved. DNS is rebuilt
// afterwards. The first failure aborts the pass and is returned through the
// operation's fail handler.
func (d *Daemon) reconcile(ctx context.Context) error {
	op := d.beginOperation("daemon.reconcile")
	records, err := d.store.ListVMs(ctx)
	if err != nil {
		return op.fail(err)
	}

	// repair receives the record re-read under the VM lock, not the listed one.
	repair := func(vm model.VMRecord) error {
		stale := vm.State == model.VMStateRunning &&
			!system.ProcessRunning(vm.Runtime.PID, vm.Runtime.APISockPath)
		if !stale {
			return nil
		}
		op.stage("stale_vm", vmLogAttrs(vm)...)
		// Cleanup is best-effort; the record is marked stopped regardless.
		_ = d.cleanupRuntime(ctx, vm, true)
		vm.State = model.VMStateStopped
		vm.Runtime.State = model.VMStateStopped
		clearRuntimeHandles(&vm)
		vm.UpdatedAt = model.Now()
		return d.store.UpsertVM(ctx, vm)
	}

	for i := range records {
		id := records[i].ID
		if lockErr := d.withVMLockByIDErr(ctx, id, repair); lockErr != nil {
			return op.fail(lockErr, "vm_id", id)
		}
	}
	if dnsErr := d.rebuildDNS(ctx); dnsErr != nil {
		return op.fail(dnsErr)
	}
	op.done()
	return nil
}
// FindVM resolves idOrName to a single VM record. It first asks the store
// for a direct match; if that lookup fails for any reason, it lists all VMs
// and accepts a unique ID or name prefix match. It returns an error when
// idOrName is empty, when nothing matches, or when the prefix is ambiguous.
func (d *Daemon) FindVM(ctx context.Context, idOrName string) (model.VMRecord, error) {
	var none model.VMRecord
	if idOrName == "" {
		return none, errors.New("vm id or name is required")
	}
	if exact, err := d.store.GetVM(ctx, idOrName); err == nil {
		return exact, nil
	}
	all, err := d.store.ListVMs(ctx)
	if err != nil {
		return none, err
	}

	var candidates []model.VMRecord
	for _, candidate := range all {
		if strings.HasPrefix(candidate.ID, idOrName) || strings.HasPrefix(candidate.Name, idOrName) {
			candidates = append(candidates, candidate)
		}
	}
	switch len(candidates) {
	case 0:
		return none, fmt.Errorf("vm %q not found", idOrName)
	case 1:
		return candidates[0], nil
	default:
		return none, fmt.Errorf("multiple VMs match %q", idOrName)
	}
}
// FindImage resolves idOrName to a single image. Lookup order: store lookup
// by name, store lookup by ID, then a unique ID or name prefix match over all
// images. It returns an error when idOrName is empty, when nothing matches,
// or when the prefix is ambiguous.
func (d *Daemon) FindImage(ctx context.Context, idOrName string) (model.Image, error) {
	var none model.Image
	if idOrName == "" {
		return none, errors.New("image id or name is required")
	}
	if byName, err := d.store.GetImageByName(ctx, idOrName); err == nil {
		return byName, nil
	}
	if byID, err := d.store.GetImageByID(ctx, idOrName); err == nil {
		return byID, nil
	}
	all, err := d.store.ListImages(ctx)
	if err != nil {
		return none, err
	}

	var candidates []model.Image
	for _, candidate := range all {
		if strings.HasPrefix(candidate.ID, idOrName) || strings.HasPrefix(candidate.Name, idOrName) {
			candidates = append(candidates, candidate)
		}
	}
	switch len(candidates) {
	case 0:
		return none, fmt.Errorf("image %q not found", idOrName)
	case 1:
		return candidates[0], nil
	default:
		return none, fmt.Errorf("multiple images match %q", idOrName)
	}
}
// TouchVM resolves idOrName, and under that VM's lock applies
// system.TouchNow to the record and saves it. It returns the saved record, or
// a zero record and the error if lookup or saving fails.
func (d *Daemon) TouchVM(ctx context.Context, idOrName string) (model.VMRecord, error) {
	touch := func(vm model.VMRecord) (model.VMRecord, error) {
		system.TouchNow(&vm)
		saveErr := d.store.UpsertVM(ctx, vm)
		if saveErr != nil {
			return model.VMRecord{}, saveErr
		}
		return vm, nil
	}
	return d.withVMLockByRef(ctx, idOrName, touch)
}
// withVMLockByRef resolves idOrName with FindVM and then runs fn under the
// resolved VM's lock via withVMLockByID. A failed lookup is returned as is,
// without calling fn.
func (d *Daemon) withVMLockByRef(ctx context.Context, idOrName string, fn func(model.VMRecord) (model.VMRecord, error)) (model.VMRecord, error) {
	resolved, err := d.FindVM(ctx, idOrName)
	if err == nil {
		return d.withVMLockByID(ctx, resolved.ID, fn)
	}
	return model.VMRecord{}, err
}
// withVMLockByID takes the per-VM lock for id, loads the record from the
// store while holding it, and passes it to fn, returning fn's result. A blank
// id is rejected, and a missing row is reported as a "not found" error.
func (d *Daemon) withVMLockByID(ctx context.Context, id string, fn func(model.VMRecord) (model.VMRecord, error)) (model.VMRecord, error) {
	if strings.TrimSpace(id) == "" {
		return model.VMRecord{}, errors.New("vm id is required")
	}
	// Locks now; the returned unlock runs when this function returns.
	defer d.lockVMID(id)()

	record, err := d.store.GetVMByID(ctx, id)
	switch {
	case err == nil:
		return fn(record)
	case errors.Is(err, sql.ErrNoRows):
		return model.VMRecord{}, fmt.Errorf("vm %q not found", id)
	default:
		return model.VMRecord{}, err
	}
}
// withVMLockByIDErr is withVMLockByID for callbacks that only report an
// error: fn runs under the VM's lock and only the resulting error is returned.
func (d *Daemon) withVMLockByIDErr(ctx context.Context, id string, fn func(model.VMRecord) error) error {
	adapter := func(vm model.VMRecord) (model.VMRecord, error) {
		if fnErr := fn(vm); fnErr != nil {
			return model.VMRecord{}, fnErr
		}
		return vm, nil
	}
	_, err := d.withVMLockByID(ctx, id, adapter)
	return err
}
// lockVMID acquires the mutex dedicated to the given VM id, creating it (and
// the lock map) on first use, and returns the function that releases it.
// d.vmLocksMu is held only while the map is consulted, not while waiting for
// the per-VM mutex.
func (d *Daemon) lockVMID(id string) func() {
	vmLock := func() *sync.Mutex {
		d.vmLocksMu.Lock()
		defer d.vmLocksMu.Unlock()
		if d.vmLocks == nil {
			d.vmLocks = make(map[string]*sync.Mutex)
		}
		if existing, found := d.vmLocks[id]; found {
			return existing
		}
		created := &sync.Mutex{}
		d.vmLocks[id] = created
		return created
	}()
	vmLock.Lock()
	return vmLock.Unlock
}
// marshalResultOrError turns an operation outcome into an RPC response:
// a non-nil err becomes an "operation_failed" error response; otherwise v is
// wrapped as a result, with "marshal_failed" returned if wrapping fails.
func marshalResultOrError(v any, err error) rpc.Response {
	if err == nil {
		resp, marshalErr := rpc.NewResult(v)
		if marshalErr == nil {
			return resp
		}
		return rpc.NewError("marshal_failed", marshalErr.Error())
	}
	return rpc.NewError("operation_failed", err.Error())
}
// exists reports whether os.Stat succeeds for path. Any Stat error, not only
// "does not exist", yields false.
func exists(path string) bool {
	if _, statErr := os.Stat(path); statErr != nil {
		return false
	}
	return true
}