From ea0db1e17e881268a68cdf10c41f8f0e0ae2c1b6 Mon Sep 17 00:00:00 2001 From: Thales Maciel Date: Wed, 15 Apr 2026 15:47:08 -0300 Subject: [PATCH] Split internal/daemon vm.go and guest_sessions.go by concern vm.go (1529 LOC) splits into vm_create, vm_lifecycle, vm_set, vm_stats, vm_disk, vm_authsync; firecracker/DNS/helpers stay in vm.go. guest_sessions.go (1266 LOC) splits into session_controller, session_lifecycle, session_attach, session_stream; scripts and helpers stay in guest_sessions.go. Mechanical move only. No behavior change. Adds doc.go and ARCHITECTURE.md capturing subsystem map and current lock ordering as the baseline for the upcoming subsystem extraction. Co-Authored-By: Claude Sonnet 4.6 --- internal/daemon/ARCHITECTURE.md | 59 ++ internal/daemon/doc.go | 61 ++ internal/daemon/guest_sessions.go | 650 -------------- internal/daemon/session_attach.go | 224 +++++ internal/daemon/session_controller.go | 152 ++++ internal/daemon/session_lifecycle.go | 213 +++++ internal/daemon/session_stream.go | 119 +++ internal/daemon/vm.go | 1196 ------------------------- internal/daemon/vm_authsync.go | 353 ++++++++ internal/daemon/vm_create.go | 131 +++ internal/daemon/vm_disk.go | 159 ++++ internal/daemon/vm_lifecycle.go | 386 ++++++++ internal/daemon/vm_set.go | 87 ++ internal/daemon/vm_stats.go | 157 ++++ 14 files changed, 2101 insertions(+), 1846 deletions(-) create mode 100644 internal/daemon/ARCHITECTURE.md create mode 100644 internal/daemon/doc.go create mode 100644 internal/daemon/session_attach.go create mode 100644 internal/daemon/session_controller.go create mode 100644 internal/daemon/session_lifecycle.go create mode 100644 internal/daemon/session_stream.go create mode 100644 internal/daemon/vm_authsync.go create mode 100644 internal/daemon/vm_create.go create mode 100644 internal/daemon/vm_disk.go create mode 100644 internal/daemon/vm_lifecycle.go create mode 100644 internal/daemon/vm_set.go create mode 100644 internal/daemon/vm_stats.go diff --git 
a/internal/daemon/ARCHITECTURE.md b/internal/daemon/ARCHITECTURE.md new file mode 100644 index 0000000..5d9af67 --- /dev/null +++ b/internal/daemon/ARCHITECTURE.md @@ -0,0 +1,59 @@ +# `internal/daemon` architecture + +This document captures the current (pre-refactor) layout of the daemon +package and the lock ordering its callers must respect. It is the baseline +against which the upcoming phased split of this package +into per-concern subpackages is executed. + +## Composition + +`Daemon` is a single struct aggregating state for every subsystem: + +- Layout, config, store, runner, logger, pid — infrastructure handles. +- `mu sync.Mutex` — coarse lock, currently guards guest session controller + map mutations and image registry mutations. +- `vmLocks sync.Map` — per-VM `*sync.Mutex`, one per VM ID. +- `createOps`, `createOpsMu` — in-flight `vm create` operations. +- `imageBuildOps`, `imageBuildOpsMu` — in-flight `image build` operations. +- `tapPool`, `tapPoolNext`, `tapPoolMu` — TAP interface pool. +- `sessionControllers` — active guest session controllers (guarded by `mu`). +- `listener`, `webListener`, `webServer`, `webURL`, `vmDNS` — networking. +- `vmCaps` — registered VM capability hooks. +- `imageBuild`, `requestHandler`, `guestWaitForSSH`, `guestDial`, + `waitForGuestSessionReady` — injectable seams used by tests. + +## Lock ordering + +Acquire in this order, release in reverse. Never acquire in the opposite +direction. + +``` +vmLocks[id] → mu → {createOpsMu, imageBuildOpsMu, tapPoolMu} +``` + +Notes: + +- `vmLocks[id]` is the outer lock for any operation scoped to a single VM. + Acquired via `withVMLockByID` / `withVMLockByRef`. +- `mu` is currently load-bearing for both session controller lookups and + image registry changes. Holding it while calling into guest SSH is + discouraged; prefer copying needed state out under the lock and releasing + before blocking I/O. 
+- The three subsystem locks (`createOpsMu`, `imageBuildOpsMu`, `tapPoolMu`) + are leaves. Nothing else is acquired while one is held. + +The upcoming Phase 2 refactor will retire `mu` entirely by giving each +concern it currently guards its own owning type and lock. At that point +the ordering collapses to `vmLocks[id] → subsystem-local lock`. + +## External API + +Only `internal/cli` imports this package. The surface is: + +- `daemon.Open(ctx) (*Daemon, error)` +- `(*Daemon).Serve(ctx) error` +- `(*Daemon).Close() error` +- `daemon.Doctor(...)` — host diagnostics (no receiver). + +All other `*Daemon` methods are reached only through the RPC `dispatch` +switch in `daemon.go` and are free to move/rename during refactoring. diff --git a/internal/daemon/doc.go b/internal/daemon/doc.go new file mode 100644 index 0000000..f8b91bf --- /dev/null +++ b/internal/daemon/doc.go @@ -0,0 +1,61 @@ +// Package daemon hosts the Banger daemon process. +// +// The daemon exposes a JSON-RPC endpoint over a Unix socket and, optionally, +// a local web UI. It owns VM lifecycle, image management, guest sessions, +// host networking bootstrap, and state persistence via internal/store. +// +// The package is organised into cohesive groups. A phased refactor is +// splitting each group into a subpackage; file names below reflect the +// current (in-progress) grouping. 
+// +// VM lifecycle: +// +// vm_create.go CreateVM and create-time disk provisioning +// vm_lifecycle.go Start/Stop/Restart/Kill/Delete +// vm_set.go SetVM mutation +// vm_stats.go stats, health, ping, stale reaper +// vm_disk.go system overlay, work disk provisioning +// vm_authsync.go per-VM authorized_key, git identity, and auth file sync +// vm_create_ops.go async begin/status/cancel registry for create +// capabilities.go pluggable capability hooks executed at VM start +// preflight.go prereq validation for VM start +// snapshot.go device-mapper COW snapshot helpers +// ports.go port forwarding inspection +// +// Image management: +// +// images.go register, promote, delete, find, list +// imagebuild.go build via firecracker build VM +// image_build_ops.go async begin/status/cancel registry for build +// image_seed.go managed work-seed fingerprint refresh +// +// Guest interaction: +// +// guest_sessions.go long-lived guest commands, attach, logs +// ssh_client_config.go daemon-managed SSH client key material +// workspace.go materialising host repos into guest +// opencode.go opencode host-side helpers +// +// Host bootstrap: +// +// nat.go NAT prereq registration +// dns_routing.go systemd-resolved per-interface routing +// tap_pool.go TAP interface pool +// +// Core: +// +// daemon.go Daemon struct, Open/Close/Serve, dispatch +// dashboard.go dashboard metrics aggregation +// doctor.go host diagnostics +// logger.go slog configuration +// runtime_assets.go paths to bundled companion binaries +// web.go embedded web UI server +// +// Lock ordering (current, pre-refactor): +// +// vmLocks[id] → mu → {createOpsMu, imageBuildOpsMu, tapPoolMu} +// +// The coarse mu currently guards unrelated state (session controllers, +// image registry mutations, in-flight VM create bookkeeping) and is the +// target of the Phase 2 split. See ARCHITECTURE.md for details. 
+package daemon diff --git a/internal/daemon/guest_sessions.go b/internal/daemon/guest_sessions.go index 5aa1e68..6dd3938 100644 --- a/internal/daemon/guest_sessions.go +++ b/internal/daemon/guest_sessions.go @@ -13,14 +13,11 @@ import ( "sort" "strconv" "strings" - "sync" "syscall" "time" - "banger/internal/api" "banger/internal/guest" "banger/internal/model" - "banger/internal/sessionstream" "banger/internal/system" "golang.org/x/crypto/ssh" @@ -80,622 +77,6 @@ func (d *Daemon) waitForGuestSessionReadyHook(ctx context.Context, vm model.VMRe return d.waitForGuestSessionReadyDefault(ctx, vm, session) } -type guestSessionController struct { - stream *guest.StreamSession - streams []*guest.StreamSession - stdin io.WriteCloser - attachMu sync.Mutex - attach net.Conn - writeMu sync.Mutex - closeOnce sync.Once -} - -func (c *guestSessionController) setAttach(conn net.Conn) error { - c.attachMu.Lock() - defer c.attachMu.Unlock() - if c.attach != nil { - return errors.New("session already has an active attach") - } - c.attach = conn - return nil -} - -func (c *guestSessionController) clearAttach(conn net.Conn) { - c.attachMu.Lock() - defer c.attachMu.Unlock() - if c.attach == conn { - c.attach = nil - } -} - -func (c *guestSessionController) writeFrame(channel byte, payload []byte) { - c.attachMu.Lock() - conn := c.attach - c.attachMu.Unlock() - if conn == nil { - return - } - c.writeMu.Lock() - err := sessionstream.WriteFrame(conn, channel, payload) - c.writeMu.Unlock() - if err != nil { - _ = conn.Close() - c.clearAttach(conn) - } -} - -func (c *guestSessionController) writeControl(message sessionstream.ControlMessage) { - c.attachMu.Lock() - conn := c.attach - c.attachMu.Unlock() - if conn == nil { - return - } - c.writeMu.Lock() - err := sessionstream.WriteControl(conn, message) - c.writeMu.Unlock() - if err != nil { - _ = conn.Close() - c.clearAttach(conn) - } -} - -func (c *guestSessionController) close() error { - if c == nil { - return nil - } - var err error - 
c.closeOnce.Do(func() { - c.attachMu.Lock() - conn := c.attach - c.attach = nil - c.attachMu.Unlock() - if conn != nil { - err = errors.Join(err, conn.Close()) - } - if c.stdin != nil { - err = errors.Join(err, c.stdin.Close()) - } - if c.stream != nil { - err = errors.Join(err, c.stream.Close()) - } - for _, stream := range c.streams { - if stream != nil { - err = errors.Join(err, stream.Close()) - } - } - }) - return err -} - -type guestSessionStateSnapshot struct { - Status string - GuestPID int - ExitCode *int - Alive bool - LastError string -} - -func (d *Daemon) StartGuestSession(ctx context.Context, params api.GuestSessionStartParams) (model.GuestSession, error) { - stdinMode := model.GuestSessionStdinMode(strings.TrimSpace(params.StdinMode)) - if stdinMode == "" { - stdinMode = model.GuestSessionStdinClosed - } - if stdinMode != model.GuestSessionStdinClosed && stdinMode != model.GuestSessionStdinPipe { - return model.GuestSession{}, fmt.Errorf("unsupported stdin mode %q", params.StdinMode) - } - if strings.TrimSpace(params.Command) == "" { - return model.GuestSession{}, errors.New("session command is required") - } - var created model.GuestSession - _, err := d.withVMLockByRef(ctx, params.VMIDOrName, func(vm model.VMRecord) (model.VMRecord, error) { - if vm.State != model.VMStateRunning || !system.ProcessRunning(vm.Runtime.PID, vm.Runtime.APISockPath) { - return model.VMRecord{}, fmt.Errorf("vm %q is not running", vm.Name) - } - session, err := d.startGuestSessionLocked(ctx, vm, params, stdinMode) - if err != nil { - return model.VMRecord{}, err - } - created = session - return vm, nil - }) - return created, err -} - -func (d *Daemon) startGuestSessionLocked(ctx context.Context, vm model.VMRecord, params api.GuestSessionStartParams, stdinMode model.GuestSessionStdinMode) (model.GuestSession, error) { - id, err := model.NewID() - if err != nil { - return model.GuestSession{}, err - } - now := model.Now() - session := model.GuestSession{ - ID: id, - VMID: 
vm.ID, - Name: defaultGuestSessionName(id, params.Command, params.Name), - Backend: guestSessionBackendSSH, - Command: params.Command, - Args: append([]string(nil), params.Args...), - CWD: strings.TrimSpace(params.CWD), - Env: cloneStringMap(params.Env), - StdinMode: stdinMode, - Status: model.GuestSessionStatusStarting, - GuestStateDir: guestSessionStateDir(id), - StdoutLogPath: guestSessionStdoutLogPath(id), - StderrLogPath: guestSessionStderrLogPath(id), - Tags: cloneStringMap(params.Tags), - Attachable: stdinMode == model.GuestSessionStdinPipe, - Reattachable: stdinMode == model.GuestSessionStdinPipe, - CreatedAt: now, - UpdatedAt: now, - } - if session.Attachable { - session.AttachBackend = guestSessionAttachBackendSSHBridge - session.AttachMode = guestSessionAttachModeExclusive - } else { - session.AttachBackend = guestSessionAttachBackendNone - } - if err := d.store.UpsertGuestSession(ctx, session); err != nil { - return model.GuestSession{}, err - } - fail := func(stage, message, rawLog string) (model.GuestSession, error) { - session = failGuestSessionLaunch(session, stage, message, rawLog) - if err := d.store.UpsertGuestSession(ctx, session); err != nil { - return model.GuestSession{}, err - } - return session, nil - } - address := net.JoinHostPort(vm.Runtime.GuestIP, "22") - if err := d.waitForGuestSSH(ctx, address, 250*time.Millisecond); err != nil { - return fail("ssh_unavailable", fmt.Sprintf("guest ssh unavailable: %v", err), "") - } - client, err := d.dialGuest(ctx, address) - if err != nil { - return fail("dial_guest", fmt.Sprintf("dial guest ssh: %v", err), "") - } - defer client.Close() - var preflightLog bytes.Buffer - if err := client.RunScript(ctx, guestSessionCWDPreflightScript(session.CWD), &preflightLog); err != nil { - return fail("preflight_cwd", fmt.Sprintf("guest working directory is unavailable: %s", defaultGuestSessionCWD(session.CWD)), preflightLog.String()) - } - preflightLog.Reset() - requiredCommands := 
normalizeGuestSessionRequiredCommands(params.Command, params.RequiredCommands) - if err := client.RunScript(ctx, guestSessionCommandPreflightScript(requiredCommands), &preflightLog); err != nil { - return fail("preflight_command", fmt.Sprintf("required guest command is unavailable: %s", strings.TrimSpace(preflightLog.String())), preflightLog.String()) - } - var uploadLog bytes.Buffer - if err := client.UploadFile(ctx, guestSessionScriptPath(id), 0o755, []byte(guestSessionScript(session)), &uploadLog); err != nil { - return fail("upload_script", "upload guest session script failed", uploadLog.String()) - } - var launchLog bytes.Buffer - launchScript := fmt.Sprintf("set -euo pipefail\nnohup bash %s >/dev/null 2>&1 > %s\nrm -f %s\n", - guestShellQuote(tmpPath), - guestShellQuote(guestSessionStdinPipePath(session.ID)), - guestShellQuote(tmpPath), - ) - var sendLog bytes.Buffer - if err := client.RunScript(ctx, sendScript, &sendLog); err != nil { - return api.GuestSessionSendResult{}, fmt.Errorf("send to session: %w: %s", err, strings.TrimSpace(sendLog.String())) - } - return api.GuestSessionSendResult{Session: session, BytesWritten: len(params.Payload)}, nil -} - -func (d *Daemon) BeginGuestSessionAttach(ctx context.Context, params api.GuestSessionAttachBeginParams) (api.GuestSessionAttachBeginResult, error) { - vm, err := d.FindVM(ctx, params.VMIDOrName) - if err != nil { - return api.GuestSessionAttachBeginResult{}, err - } - session, err := d.findGuestSession(ctx, vm.ID, params.SessionIDOrName) - if err != nil { - return api.GuestSessionAttachBeginResult{}, err - } - session, _ = d.refreshGuestSession(ctx, vm, session) - if !session.Attachable { - return api.GuestSessionAttachBeginResult{}, errors.New("session is not attachable") - } - controller := &guestSessionController{} - if !d.claimGuestSessionController(session.ID, controller) { - return api.GuestSessionAttachBeginResult{}, errors.New("session already has an active attach") - } - attachID, err := 
model.NewID() - if err != nil { - d.clearGuestSessionController(session.ID) - return api.GuestSessionAttachBeginResult{}, err - } - socketPath := filepath.Join(d.layout.RuntimeDir, "guest-session-attach-"+attachID[:12]+".sock") - _ = os.Remove(socketPath) - listener, err := net.Listen("unix", socketPath) - if err != nil { - d.clearGuestSessionController(session.ID) - return api.GuestSessionAttachBeginResult{}, err - } - if err := os.Chmod(socketPath, 0o600); err != nil { - _ = listener.Close() - _ = os.Remove(socketPath) - d.clearGuestSessionController(session.ID) - return api.GuestSessionAttachBeginResult{}, err - } - go d.serveGuestSessionAttach(session, controller, attachID, socketPath, listener) - return api.GuestSessionAttachBeginResult{ - Session: session, - AttachID: attachID, - TransportKind: guestSessionTransportUnixSocket, - TransportTarget: socketPath, - SocketPath: socketPath, - StreamFormat: sessionstream.FormatV1, - }, nil -} - -func (d *Daemon) setGuestSessionController(id string, controller *guestSessionController) { - d.mu.Lock() - defer d.mu.Unlock() - d.sessionControllers[id] = controller -} - -func (d *Daemon) claimGuestSessionController(id string, controller *guestSessionController) bool { - d.mu.Lock() - defer d.mu.Unlock() - if d.sessionControllers[id] != nil { - return false - } - d.sessionControllers[id] = controller - return true -} - -func (d *Daemon) getGuestSessionController(id string) *guestSessionController { - d.mu.Lock() - defer d.mu.Unlock() - return d.sessionControllers[id] -} - -func (d *Daemon) clearGuestSessionController(id string) *guestSessionController { - d.mu.Lock() - defer d.mu.Unlock() - controller := d.sessionControllers[id] - delete(d.sessionControllers, id) - return controller -} - -func (d *Daemon) closeGuestSessionControllers() error { - d.mu.Lock() - controllers := make([]*guestSessionController, 0, len(d.sessionControllers)) - for _, controller := range d.sessionControllers { - controllers = append(controllers, 
controller) - } - d.sessionControllers = nil - d.mu.Unlock() - var err error - for _, controller := range controllers { - err = errors.Join(err, controller.close()) - } - return err -} - -func (d *Daemon) forwardGuestSessionOutput(_ string, controller *guestSessionController, channel byte, reader io.Reader) { - buffer := make([]byte, 32*1024) - for { - n, err := reader.Read(buffer) - if n > 0 { - controller.writeFrame(channel, buffer[:n]) - } - if err != nil { - if !errors.Is(err, io.EOF) { - controller.writeControl(sessionstream.ControlMessage{Type: "error", Error: err.Error()}) - } - return - } - } -} - -func (d *Daemon) waitForGuestSessionExit(id string, controller *guestSessionController, session model.GuestSession) { - err := controller.stream.Wait() - updated := session - updated.Attachable = false - now := model.Now() - updated.UpdatedAt = now - updated.EndedAt = now - if exitCode, ok := guestSessionExitCode(err); ok { - updated.ExitCode = &exitCode - if exitCode == 0 { - updated.Status = model.GuestSessionStatusExited - } else { - updated.Status = model.GuestSessionStatusFailed - } - } - if err != nil && updated.LastError == "" { - updated.LastError = err.Error() - } - if vm, getErr := d.store.GetVMByID(context.Background(), updated.VMID); getErr == nil { - if refreshed, refreshErr := d.refreshGuestSession(context.Background(), vm, updated); refreshErr == nil { - updated = refreshed - } - } - _ = d.store.UpsertGuestSession(context.Background(), updated) - controller.writeControl(sessionstream.ControlMessage{Type: "exit", ExitCode: updated.ExitCode}) - _ = controller.close() - d.clearGuestSessionController(id) -} - -func (d *Daemon) serveGuestSessionAttach(session model.GuestSession, controller *guestSessionController, _ string, socketPath string, listener net.Listener) { - defer func() { - _ = listener.Close() - _ = os.Remove(socketPath) - _ = controller.close() - d.clearGuestSessionController(session.ID) - }() - conn, err := listener.Accept() - if err != 
nil { - return - } - defer conn.Close() - if err := controller.setAttach(conn); err != nil { - _ = sessionstream.WriteControl(conn, sessionstream.ControlMessage{Type: "error", Error: err.Error()}) - return - } - defer controller.clearAttach(conn) - if err := d.attachGuestSessionBridge(session, controller); err != nil { - _ = sessionstream.WriteControl(conn, sessionstream.ControlMessage{Type: "error", Error: err.Error()}) - return - } - for { - channel, payload, err := sessionstream.ReadFrame(conn) - if err != nil { - return - } - switch channel { - case sessionstream.ChannelStdin: - if controller.stdin == nil { - continue - } - if _, err := controller.stdin.Write(payload); err != nil { - _ = sessionstream.WriteControl(conn, sessionstream.ControlMessage{Type: "error", Error: err.Error()}) - return - } - case sessionstream.ChannelControl: - message, err := sessionstream.ReadControl(payload) - if err != nil { - _ = sessionstream.WriteControl(conn, sessionstream.ControlMessage{Type: "error", Error: err.Error()}) - return - } - if message.Type == "eof" && controller.stdin != nil { - _ = controller.stdin.Close() - } - } - } -} - -func (d *Daemon) attachGuestSessionBridge(session model.GuestSession, controller *guestSessionController) error { - vm, err := d.store.GetVMByID(context.Background(), session.VMID) - if err != nil { - return err - } - if vm.State != model.VMStateRunning || !system.ProcessRunning(vm.Runtime.PID, vm.Runtime.APISockPath) { - return fmt.Errorf("vm %q is not running", vm.Name) - } - address := net.JoinHostPort(vm.Runtime.GuestIP, "22") - stdinStream, err := d.openGuestSessionAttachStream(address, guestSessionAttachInputCommand(session.ID)) - if err != nil { - return fmt.Errorf("open guest session stdin stream: %w", err) - } - stdoutStream, err := d.openGuestSessionAttachStream(address, guestSessionAttachTailCommand(session.StdoutLogPath)) - if err != nil { - _ = stdinStream.Close() - return fmt.Errorf("open guest session stdout stream: %w", err) - } 
- stderrStream, err := d.openGuestSessionAttachStream(address, guestSessionAttachTailCommand(session.StderrLogPath)) - if err != nil { - _ = stdinStream.Close() - _ = stdoutStream.Close() - return fmt.Errorf("open guest session stderr stream: %w", err) - } - controller.streams = append(controller.streams, stdinStream, stdoutStream, stderrStream) - controller.stdin = stdinStream.Stdin() - go d.forwardGuestSessionOutput(session.ID, controller, sessionstream.ChannelStdout, stdoutStream.Stdout()) - go d.forwardGuestSessionOutput(session.ID, controller, sessionstream.ChannelStderr, stderrStream.Stdout()) - go d.watchGuestSessionAttach(session.ID, controller, session) - return nil -} - -func (d *Daemon) openGuestSessionAttachStream(address, command string) (*guest.StreamSession, error) { - client, err := guest.Dial(context.Background(), address, d.config.SSHKeyPath) - if err != nil { - return nil, err - } - stream, err := client.StartCommand(context.Background(), command) - if err != nil { - _ = client.Close() - return nil, err - } - return stream, nil -} - -func (d *Daemon) watchGuestSessionAttach(id string, controller *guestSessionController, session model.GuestSession) { - ticker := time.NewTicker(250 * time.Millisecond) - defer ticker.Stop() - for range ticker.C { - vm, err := d.store.GetVMByID(context.Background(), session.VMID) - if err != nil { - controller.writeControl(sessionstream.ControlMessage{Type: "error", Error: err.Error()}) - _ = controller.close() - return - } - refreshed, err := d.refreshGuestSession(context.Background(), vm, session) - if err == nil { - session = refreshed - } - if session.Status == model.GuestSessionStatusExited || session.Status == model.GuestSessionStatusFailed { - controller.writeControl(sessionstream.ControlMessage{Type: "exit", ExitCode: session.ExitCode}) - _ = controller.close() - return - } - } -} - func (d *Daemon) waitForGuestSessionReadyDefault(ctx context.Context, vm model.VMRecord, session model.GuestSession) 
(model.GuestSession, error) { for { updated, err := d.refreshGuestSession(ctx, vm, session) @@ -846,37 +227,6 @@ func inspectGuestSessionStateFromDir(stateDir string) (guestSessionStateSnapshot return snapshot, nil } -func (d *Daemon) readGuestSessionLog(ctx context.Context, vm model.VMRecord, session model.GuestSession, stream string, tailLines int) (string, error) { - if vm.State == model.VMStateRunning && system.ProcessRunning(vm.Runtime.PID, vm.Runtime.APISockPath) { - client, err := guest.Dial(ctx, net.JoinHostPort(vm.Runtime.GuestIP, "22"), d.config.SSHKeyPath) - if err != nil { - return "", err - } - defer client.Close() - path := session.StdoutLogPath - if stream == "stderr" { - path = session.StderrLogPath - } - var output bytes.Buffer - script := fmt.Sprintf("set -euo pipefail\nif [ -f %s ]; then tail -n %d %s; fi\n", guestShellQuote(path), tailLines, guestShellQuote(path)) - if err := client.RunScript(ctx, script, &output); err != nil { - return "", formatGuestSessionStepError("read guest session log", err, output.String()) - } - return output.String(), nil - } - runner := d.runner - if runner == nil { - runner = system.NewRunner() - } - workMount, cleanup, err := system.MountTempDir(ctx, runner, vm.Runtime.WorkDiskPath, false) - if err != nil { - return "", err - } - defer cleanup() - logPath := filepath.Join(workMount, guestSessionRelativeStateDir(session.ID), stream+".log") - return tailFileContent(logPath, tailLines) -} - func (d *Daemon) findGuestSession(ctx context.Context, vmID, idOrName string) (model.GuestSession, error) { if strings.TrimSpace(idOrName) == "" { return model.GuestSession{}, errors.New("session id or name is required") diff --git a/internal/daemon/session_attach.go b/internal/daemon/session_attach.go new file mode 100644 index 0000000..5a3c4a0 --- /dev/null +++ b/internal/daemon/session_attach.go @@ -0,0 +1,224 @@ +package daemon + +import ( + "context" + "errors" + "fmt" + "io" + "net" + "os" + "path/filepath" + "time" + + 
"banger/internal/api" + "banger/internal/guest" + "banger/internal/model" + "banger/internal/sessionstream" + "banger/internal/system" +) + +func (d *Daemon) BeginGuestSessionAttach(ctx context.Context, params api.GuestSessionAttachBeginParams) (api.GuestSessionAttachBeginResult, error) { + vm, err := d.FindVM(ctx, params.VMIDOrName) + if err != nil { + return api.GuestSessionAttachBeginResult{}, err + } + session, err := d.findGuestSession(ctx, vm.ID, params.SessionIDOrName) + if err != nil { + return api.GuestSessionAttachBeginResult{}, err + } + session, _ = d.refreshGuestSession(ctx, vm, session) + if !session.Attachable { + return api.GuestSessionAttachBeginResult{}, errors.New("session is not attachable") + } + controller := &guestSessionController{} + if !d.claimGuestSessionController(session.ID, controller) { + return api.GuestSessionAttachBeginResult{}, errors.New("session already has an active attach") + } + attachID, err := model.NewID() + if err != nil { + d.clearGuestSessionController(session.ID) + return api.GuestSessionAttachBeginResult{}, err + } + socketPath := filepath.Join(d.layout.RuntimeDir, "guest-session-attach-"+attachID[:12]+".sock") + _ = os.Remove(socketPath) + listener, err := net.Listen("unix", socketPath) + if err != nil { + d.clearGuestSessionController(session.ID) + return api.GuestSessionAttachBeginResult{}, err + } + if err := os.Chmod(socketPath, 0o600); err != nil { + _ = listener.Close() + _ = os.Remove(socketPath) + d.clearGuestSessionController(session.ID) + return api.GuestSessionAttachBeginResult{}, err + } + go d.serveGuestSessionAttach(session, controller, attachID, socketPath, listener) + return api.GuestSessionAttachBeginResult{ + Session: session, + AttachID: attachID, + TransportKind: guestSessionTransportUnixSocket, + TransportTarget: socketPath, + SocketPath: socketPath, + StreamFormat: sessionstream.FormatV1, + }, nil +} + +func (d *Daemon) forwardGuestSessionOutput(_ string, controller *guestSessionController, 
channel byte, reader io.Reader) { + buffer := make([]byte, 32*1024) + for { + n, err := reader.Read(buffer) + if n > 0 { + controller.writeFrame(channel, buffer[:n]) + } + if err != nil { + if !errors.Is(err, io.EOF) { + controller.writeControl(sessionstream.ControlMessage{Type: "error", Error: err.Error()}) + } + return + } + } +} + +func (d *Daemon) waitForGuestSessionExit(id string, controller *guestSessionController, session model.GuestSession) { + err := controller.stream.Wait() + updated := session + updated.Attachable = false + now := model.Now() + updated.UpdatedAt = now + updated.EndedAt = now + if exitCode, ok := guestSessionExitCode(err); ok { + updated.ExitCode = &exitCode + if exitCode == 0 { + updated.Status = model.GuestSessionStatusExited + } else { + updated.Status = model.GuestSessionStatusFailed + } + } + if err != nil && updated.LastError == "" { + updated.LastError = err.Error() + } + if vm, getErr := d.store.GetVMByID(context.Background(), updated.VMID); getErr == nil { + if refreshed, refreshErr := d.refreshGuestSession(context.Background(), vm, updated); refreshErr == nil { + updated = refreshed + } + } + _ = d.store.UpsertGuestSession(context.Background(), updated) + controller.writeControl(sessionstream.ControlMessage{Type: "exit", ExitCode: updated.ExitCode}) + _ = controller.close() + d.clearGuestSessionController(id) +} + +func (d *Daemon) serveGuestSessionAttach(session model.GuestSession, controller *guestSessionController, _ string, socketPath string, listener net.Listener) { + defer func() { + _ = listener.Close() + _ = os.Remove(socketPath) + _ = controller.close() + d.clearGuestSessionController(session.ID) + }() + conn, err := listener.Accept() + if err != nil { + return + } + defer conn.Close() + if err := controller.setAttach(conn); err != nil { + _ = sessionstream.WriteControl(conn, sessionstream.ControlMessage{Type: "error", Error: err.Error()}) + return + } + defer controller.clearAttach(conn) + if err := 
d.attachGuestSessionBridge(session, controller); err != nil { + _ = sessionstream.WriteControl(conn, sessionstream.ControlMessage{Type: "error", Error: err.Error()}) + return + } + for { + channel, payload, err := sessionstream.ReadFrame(conn) + if err != nil { + return + } + switch channel { + case sessionstream.ChannelStdin: + if controller.stdin == nil { + continue + } + if _, err := controller.stdin.Write(payload); err != nil { + _ = sessionstream.WriteControl(conn, sessionstream.ControlMessage{Type: "error", Error: err.Error()}) + return + } + case sessionstream.ChannelControl: + message, err := sessionstream.ReadControl(payload) + if err != nil { + _ = sessionstream.WriteControl(conn, sessionstream.ControlMessage{Type: "error", Error: err.Error()}) + return + } + if message.Type == "eof" && controller.stdin != nil { + _ = controller.stdin.Close() + } + } + } +} + +func (d *Daemon) attachGuestSessionBridge(session model.GuestSession, controller *guestSessionController) error { + vm, err := d.store.GetVMByID(context.Background(), session.VMID) + if err != nil { + return err + } + if vm.State != model.VMStateRunning || !system.ProcessRunning(vm.Runtime.PID, vm.Runtime.APISockPath) { + return fmt.Errorf("vm %q is not running", vm.Name) + } + address := net.JoinHostPort(vm.Runtime.GuestIP, "22") + stdinStream, err := d.openGuestSessionAttachStream(address, guestSessionAttachInputCommand(session.ID)) + if err != nil { + return fmt.Errorf("open guest session stdin stream: %w", err) + } + stdoutStream, err := d.openGuestSessionAttachStream(address, guestSessionAttachTailCommand(session.StdoutLogPath)) + if err != nil { + _ = stdinStream.Close() + return fmt.Errorf("open guest session stdout stream: %w", err) + } + stderrStream, err := d.openGuestSessionAttachStream(address, guestSessionAttachTailCommand(session.StderrLogPath)) + if err != nil { + _ = stdinStream.Close() + _ = stdoutStream.Close() + return fmt.Errorf("open guest session stderr stream: %w", err) + } + 
controller.streams = append(controller.streams, stdinStream, stdoutStream, stderrStream) + controller.stdin = stdinStream.Stdin() + go d.forwardGuestSessionOutput(session.ID, controller, sessionstream.ChannelStdout, stdoutStream.Stdout()) + go d.forwardGuestSessionOutput(session.ID, controller, sessionstream.ChannelStderr, stderrStream.Stdout()) + go d.watchGuestSessionAttach(session.ID, controller, session) + return nil +} + +func (d *Daemon) openGuestSessionAttachStream(address, command string) (*guest.StreamSession, error) { + client, err := guest.Dial(context.Background(), address, d.config.SSHKeyPath) + if err != nil { + return nil, err + } + stream, err := client.StartCommand(context.Background(), command) + if err != nil { + _ = client.Close() + return nil, err + } + return stream, nil +} + +func (d *Daemon) watchGuestSessionAttach(id string, controller *guestSessionController, session model.GuestSession) { + ticker := time.NewTicker(250 * time.Millisecond) + defer ticker.Stop() + for range ticker.C { + vm, err := d.store.GetVMByID(context.Background(), session.VMID) + if err != nil { + controller.writeControl(sessionstream.ControlMessage{Type: "error", Error: err.Error()}) + _ = controller.close() + return + } + refreshed, err := d.refreshGuestSession(context.Background(), vm, session) + if err == nil { + session = refreshed + } + if session.Status == model.GuestSessionStatusExited || session.Status == model.GuestSessionStatusFailed { + controller.writeControl(sessionstream.ControlMessage{Type: "exit", ExitCode: session.ExitCode}) + _ = controller.close() + return + } + } +} diff --git a/internal/daemon/session_controller.go b/internal/daemon/session_controller.go new file mode 100644 index 0000000..19a2860 --- /dev/null +++ b/internal/daemon/session_controller.go @@ -0,0 +1,152 @@ +package daemon + +import ( + "errors" + "io" + "net" + "sync" + + "banger/internal/guest" + "banger/internal/sessionstream" +) + +type guestSessionController struct { + stream 
*guest.StreamSession + streams []*guest.StreamSession + stdin io.WriteCloser + attachMu sync.Mutex + attach net.Conn + writeMu sync.Mutex + closeOnce sync.Once +} + +func (c *guestSessionController) setAttach(conn net.Conn) error { + c.attachMu.Lock() + defer c.attachMu.Unlock() + if c.attach != nil { + return errors.New("session already has an active attach") + } + c.attach = conn + return nil +} + +func (c *guestSessionController) clearAttach(conn net.Conn) { + c.attachMu.Lock() + defer c.attachMu.Unlock() + if c.attach == conn { + c.attach = nil + } +} + +func (c *guestSessionController) writeFrame(channel byte, payload []byte) { + c.attachMu.Lock() + conn := c.attach + c.attachMu.Unlock() + if conn == nil { + return + } + c.writeMu.Lock() + err := sessionstream.WriteFrame(conn, channel, payload) + c.writeMu.Unlock() + if err != nil { + _ = conn.Close() + c.clearAttach(conn) + } +} + +func (c *guestSessionController) writeControl(message sessionstream.ControlMessage) { + c.attachMu.Lock() + conn := c.attach + c.attachMu.Unlock() + if conn == nil { + return + } + c.writeMu.Lock() + err := sessionstream.WriteControl(conn, message) + c.writeMu.Unlock() + if err != nil { + _ = conn.Close() + c.clearAttach(conn) + } +} + +func (c *guestSessionController) close() error { + if c == nil { + return nil + } + var err error + c.closeOnce.Do(func() { + c.attachMu.Lock() + conn := c.attach + c.attach = nil + c.attachMu.Unlock() + if conn != nil { + err = errors.Join(err, conn.Close()) + } + if c.stdin != nil { + err = errors.Join(err, c.stdin.Close()) + } + if c.stream != nil { + err = errors.Join(err, c.stream.Close()) + } + for _, stream := range c.streams { + if stream != nil { + err = errors.Join(err, stream.Close()) + } + } + }) + return err +} + +type guestSessionStateSnapshot struct { + Status string + GuestPID int + ExitCode *int + Alive bool + LastError string +} + +func (d *Daemon) setGuestSessionController(id string, controller *guestSessionController) { + 
d.mu.Lock() + defer d.mu.Unlock() + d.sessionControllers[id] = controller +} + +func (d *Daemon) claimGuestSessionController(id string, controller *guestSessionController) bool { + d.mu.Lock() + defer d.mu.Unlock() + if d.sessionControllers[id] != nil { + return false + } + d.sessionControllers[id] = controller + return true +} + +func (d *Daemon) getGuestSessionController(id string) *guestSessionController { + d.mu.Lock() + defer d.mu.Unlock() + return d.sessionControllers[id] +} + +func (d *Daemon) clearGuestSessionController(id string) *guestSessionController { + d.mu.Lock() + defer d.mu.Unlock() + controller := d.sessionControllers[id] + delete(d.sessionControllers, id) + return controller +} + +func (d *Daemon) closeGuestSessionControllers() error { + d.mu.Lock() + controllers := make([]*guestSessionController, 0, len(d.sessionControllers)) + for _, controller := range d.sessionControllers { + controllers = append(controllers, controller) + } + d.sessionControllers = nil + d.mu.Unlock() + var err error + for _, controller := range controllers { + err = errors.Join(err, controller.close()) + } + return err +} diff --git a/internal/daemon/session_lifecycle.go b/internal/daemon/session_lifecycle.go new file mode 100644 index 0000000..3ca56b4 --- /dev/null +++ b/internal/daemon/session_lifecycle.go @@ -0,0 +1,213 @@ +package daemon + +import ( + "bytes" + "context" + "errors" + "fmt" + "net" + "strings" + "time" + + "banger/internal/api" + "banger/internal/guest" + "banger/internal/model" + "banger/internal/system" +) + +func (d *Daemon) StartGuestSession(ctx context.Context, params api.GuestSessionStartParams) (model.GuestSession, error) { + stdinMode := model.GuestSessionStdinMode(strings.TrimSpace(params.StdinMode)) + if stdinMode == "" { + stdinMode = model.GuestSessionStdinClosed + } + if stdinMode != model.GuestSessionStdinClosed && stdinMode != model.GuestSessionStdinPipe { + return model.GuestSession{}, fmt.Errorf("unsupported stdin mode %q", 
params.StdinMode) + } + if strings.TrimSpace(params.Command) == "" { + return model.GuestSession{}, errors.New("session command is required") + } + var created model.GuestSession + _, err := d.withVMLockByRef(ctx, params.VMIDOrName, func(vm model.VMRecord) (model.VMRecord, error) { + if vm.State != model.VMStateRunning || !system.ProcessRunning(vm.Runtime.PID, vm.Runtime.APISockPath) { + return model.VMRecord{}, fmt.Errorf("vm %q is not running", vm.Name) + } + session, err := d.startGuestSessionLocked(ctx, vm, params, stdinMode) + if err != nil { + return model.VMRecord{}, err + } + created = session + return vm, nil + }) + return created, err +} + +func (d *Daemon) startGuestSessionLocked(ctx context.Context, vm model.VMRecord, params api.GuestSessionStartParams, stdinMode model.GuestSessionStdinMode) (model.GuestSession, error) { + id, err := model.NewID() + if err != nil { + return model.GuestSession{}, err + } + now := model.Now() + session := model.GuestSession{ + ID: id, + VMID: vm.ID, + Name: defaultGuestSessionName(id, params.Command, params.Name), + Backend: guestSessionBackendSSH, + Command: params.Command, + Args: append([]string(nil), params.Args...), + CWD: strings.TrimSpace(params.CWD), + Env: cloneStringMap(params.Env), + StdinMode: stdinMode, + Status: model.GuestSessionStatusStarting, + GuestStateDir: guestSessionStateDir(id), + StdoutLogPath: guestSessionStdoutLogPath(id), + StderrLogPath: guestSessionStderrLogPath(id), + Tags: cloneStringMap(params.Tags), + Attachable: stdinMode == model.GuestSessionStdinPipe, + Reattachable: stdinMode == model.GuestSessionStdinPipe, + CreatedAt: now, + UpdatedAt: now, + } + if session.Attachable { + session.AttachBackend = guestSessionAttachBackendSSHBridge + session.AttachMode = guestSessionAttachModeExclusive + } else { + session.AttachBackend = guestSessionAttachBackendNone + } + if err := d.store.UpsertGuestSession(ctx, session); err != nil { + return model.GuestSession{}, err + } + fail := func(stage, 
message, rawLog string) (model.GuestSession, error) { + session = failGuestSessionLaunch(session, stage, message, rawLog) + if err := d.store.UpsertGuestSession(ctx, session); err != nil { + return model.GuestSession{}, err + } + return session, nil + } + address := net.JoinHostPort(vm.Runtime.GuestIP, "22") + if err := d.waitForGuestSSH(ctx, address, 250*time.Millisecond); err != nil { + return fail("ssh_unavailable", fmt.Sprintf("guest ssh unavailable: %v", err), "") + } + client, err := d.dialGuest(ctx, address) + if err != nil { + return fail("dial_guest", fmt.Sprintf("dial guest ssh: %v", err), "") + } + defer client.Close() + var preflightLog bytes.Buffer + if err := client.RunScript(ctx, guestSessionCWDPreflightScript(session.CWD), &preflightLog); err != nil { + return fail("preflight_cwd", fmt.Sprintf("guest working directory is unavailable: %s", defaultGuestSessionCWD(session.CWD)), preflightLog.String()) + } + preflightLog.Reset() + requiredCommands := normalizeGuestSessionRequiredCommands(params.Command, params.RequiredCommands) + if err := client.RunScript(ctx, guestSessionCommandPreflightScript(requiredCommands), &preflightLog); err != nil { + return fail("preflight_command", fmt.Sprintf("required guest command is unavailable: %s", strings.TrimSpace(preflightLog.String())), preflightLog.String()) + } + var uploadLog bytes.Buffer + if err := client.UploadFile(ctx, guestSessionScriptPath(id), 0o755, []byte(guestSessionScript(session)), &uploadLog); err != nil { + return fail("upload_script", "upload guest session script failed", uploadLog.String()) + } + var launchLog bytes.Buffer + launchScript := fmt.Sprintf("set -euo pipefail\nnohup bash %s >/dev/null 2>&1 > %s\nrm -f %s\n", + guestShellQuote(tmpPath), + guestShellQuote(guestSessionStdinPipePath(session.ID)), + guestShellQuote(tmpPath), + ) + var sendLog bytes.Buffer + if err := client.RunScript(ctx, sendScript, &sendLog); err != nil { + return api.GuestSessionSendResult{}, fmt.Errorf("send to 
session: %w: %s", err, strings.TrimSpace(sendLog.String())) + } + return api.GuestSessionSendResult{Session: session, BytesWritten: len(params.Payload)}, nil +} + +func (d *Daemon) readGuestSessionLog(ctx context.Context, vm model.VMRecord, session model.GuestSession, stream string, tailLines int) (string, error) { + if vm.State == model.VMStateRunning && system.ProcessRunning(vm.Runtime.PID, vm.Runtime.APISockPath) { + client, err := guest.Dial(ctx, net.JoinHostPort(vm.Runtime.GuestIP, "22"), d.config.SSHKeyPath) + if err != nil { + return "", err + } + defer client.Close() + path := session.StdoutLogPath + if stream == "stderr" { + path = session.StderrLogPath + } + var output bytes.Buffer + script := fmt.Sprintf("set -euo pipefail\nif [ -f %s ]; then tail -n %d %s; fi\n", guestShellQuote(path), tailLines, guestShellQuote(path)) + if err := client.RunScript(ctx, script, &output); err != nil { + return "", formatGuestSessionStepError("read guest session log", err, output.String()) + } + return output.String(), nil + } + runner := d.runner + if runner == nil { + runner = system.NewRunner() + } + workMount, cleanup, err := system.MountTempDir(ctx, runner, vm.Runtime.WorkDiskPath, false) + if err != nil { + return "", err + } + defer cleanup() + logPath := filepath.Join(workMount, guestSessionRelativeStateDir(session.ID), stream+".log") + return tailFileContent(logPath, tailLines) +} diff --git a/internal/daemon/vm.go b/internal/daemon/vm.go index 21e3836..a3d9a1b 100644 --- a/internal/daemon/vm.go +++ b/internal/daemon/vm.go @@ -12,11 +12,7 @@ import ( "strings" "time" - "banger/internal/api" "banger/internal/firecracker" - "banger/internal/guest" - "banger/internal/guestconfig" - "banger/internal/guestnet" "banger/internal/model" "banger/internal/namegen" "banger/internal/system" @@ -31,1198 +27,6 @@ var ( vsockReadyPoll = 200 * time.Millisecond ) -const ( - workDiskGitConfigRelativePath = ".gitconfig" - workDiskOpencodeAuthDirRelativePath = ".local/share/opencode" 
- workDiskOpencodeAuthRelativePath = workDiskOpencodeAuthDirRelativePath + "/auth.json" - workDiskClaudeAuthDirRelativePath = ".claude" - workDiskClaudeAuthRelativePath = workDiskClaudeAuthDirRelativePath + "/.credentials.json" - workDiskPiAuthDirRelativePath = ".pi/agent" - workDiskPiAuthRelativePath = workDiskPiAuthDirRelativePath + "/auth.json" - hostGlobalGitIdentitySource = "git config --global" - hostOpencodeAuthDefaultDisplayPath = "~/" + workDiskOpencodeAuthRelativePath - hostClaudeAuthDefaultDisplayPath = "~/" + workDiskClaudeAuthRelativePath - hostPiAuthDefaultDisplayPath = "~/" + workDiskPiAuthRelativePath -) - -type gitIdentity struct { - Name string - Email string -} - -func (d *Daemon) CreateVM(ctx context.Context, params api.VMCreateParams) (vm model.VMRecord, err error) { - d.mu.Lock() - defer d.mu.Unlock() - op := d.beginOperation("vm.create") - defer func() { - if err != nil { - op.fail(err) - return - } - op.done(vmLogAttrs(vm)...) - }() - if err := validateOptionalPositiveSetting("vcpu", params.VCPUCount); err != nil { - return model.VMRecord{}, err - } - if err := validateOptionalPositiveSetting("memory", params.MemoryMiB); err != nil { - return model.VMRecord{}, err - } - - imageName := params.ImageName - if imageName == "" { - imageName = d.config.DefaultImageName - } - vmCreateStage(ctx, "resolve_image", "resolving image") - image, err := d.FindImage(ctx, imageName) - if err != nil { - return model.VMRecord{}, err - } - vmCreateStage(ctx, "resolve_image", "using image "+image.Name) - op.stage("image_resolved", imageLogAttrs(image)...) 
- name := strings.TrimSpace(params.Name) - if name == "" { - name, err = d.generateName(ctx) - if err != nil { - return model.VMRecord{}, err - } - } - if _, err := d.FindVM(ctx, name); err == nil { - return model.VMRecord{}, fmt.Errorf("vm name already exists: %s", name) - } - id, err := model.NewID() - if err != nil { - return model.VMRecord{}, err - } - unlockVM := d.lockVMID(id) - defer unlockVM() - guestIP, err := d.store.NextGuestIP(ctx, bridgePrefix(d.config.BridgeIP)) - if err != nil { - return model.VMRecord{}, err - } - vmDir := filepath.Join(d.layout.VMsDir, id) - if err := os.MkdirAll(vmDir, 0o755); err != nil { - return model.VMRecord{}, err - } - vsockCID, err := defaultVSockCID(guestIP) - if err != nil { - return model.VMRecord{}, err - } - systemOverlaySize := int64(model.DefaultSystemOverlaySize) - if params.SystemOverlaySize != "" { - systemOverlaySize, err = model.ParseSize(params.SystemOverlaySize) - if err != nil { - return model.VMRecord{}, err - } - } - workDiskSize := int64(model.DefaultWorkDiskSize) - if params.WorkDiskSize != "" { - workDiskSize, err = model.ParseSize(params.WorkDiskSize) - if err != nil { - return model.VMRecord{}, err - } - } - now := model.Now() - spec := model.VMSpec{ - VCPUCount: optionalIntOrDefault(params.VCPUCount, model.DefaultVCPUCount), - MemoryMiB: optionalIntOrDefault(params.MemoryMiB, model.DefaultMemoryMiB), - SystemOverlaySizeByte: systemOverlaySize, - WorkDiskSizeBytes: workDiskSize, - NATEnabled: params.NATEnabled, - } - vm = model.VMRecord{ - ID: id, - Name: name, - ImageID: image.ID, - State: model.VMStateCreated, - CreatedAt: now, - UpdatedAt: now, - LastTouchedAt: now, - Spec: spec, - Runtime: model.VMRuntime{ - State: model.VMStateCreated, - GuestIP: guestIP, - DNSName: vmdns.RecordName(name), - VMDir: vmDir, - VSockPath: defaultVSockPath(d.layout.RuntimeDir, id), - VSockCID: vsockCID, - SystemOverlay: filepath.Join(vmDir, "system.cow"), - WorkDiskPath: filepath.Join(vmDir, "root.ext4"), - LogPath: 
filepath.Join(vmDir, "firecracker.log"), - MetricsPath: filepath.Join(vmDir, "metrics.json"), - }, - } - vmCreateBindVM(ctx, vm) - vmCreateStage(ctx, "reserve_vm", fmt.Sprintf("allocated %s (%s)", vm.Name, vm.Runtime.GuestIP)) - if err := d.store.UpsertVM(ctx, vm); err != nil { - return model.VMRecord{}, err - } - op.stage("persisted", vmLogAttrs(vm)...) - if params.NoStart { - vm.State = model.VMStateStopped - vm.Runtime.State = model.VMStateStopped - if err := d.store.UpsertVM(ctx, vm); err != nil { - return model.VMRecord{}, err - } - return vm, nil - } - return d.startVMLocked(ctx, vm, image) -} - -func (d *Daemon) StartVM(ctx context.Context, idOrName string) (model.VMRecord, error) { - return d.withVMLockByRef(ctx, idOrName, func(vm model.VMRecord) (model.VMRecord, error) { - image, err := d.store.GetImageByID(ctx, vm.ImageID) - if err != nil { - return model.VMRecord{}, err - } - if vm.State == model.VMStateRunning && system.ProcessRunning(vm.Runtime.PID, vm.Runtime.APISockPath) { - if d.logger != nil { - d.logger.Info("vm already running", vmLogAttrs(vm)...) - } - return vm, nil - } - return d.startVMLocked(ctx, vm, image) - }) -} - -func (d *Daemon) startVMLocked(ctx context.Context, vm model.VMRecord, image model.Image) (_ model.VMRecord, err error) { - op := d.beginOperation("vm.start", append(vmLogAttrs(vm), imageLogAttrs(image)...)...) - defer func() { - if err != nil { - err = annotateLogPath(err, vm.Runtime.LogPath) - op.fail(err, vmLogAttrs(vm)...) - return - } - op.done(vmLogAttrs(vm)...) 
- }() - op.stage("preflight") - vmCreateStage(ctx, "preflight", "checking host prerequisites") - if err := d.validateStartPrereqs(ctx, vm, image); err != nil { - return model.VMRecord{}, err - } - if err := os.MkdirAll(vm.Runtime.VMDir, 0o755); err != nil { - return model.VMRecord{}, err - } - op.stage("cleanup_runtime") - if err := d.cleanupRuntime(ctx, vm, true); err != nil { - return model.VMRecord{}, err - } - clearRuntimeHandles(&vm) - op.stage("bridge") - if err := d.ensureBridge(ctx); err != nil { - return model.VMRecord{}, err - } - op.stage("socket_dir") - if err := d.ensureSocketDir(); err != nil { - return model.VMRecord{}, err - } - - shortID := system.ShortID(vm.ID) - apiSock := filepath.Join(d.layout.RuntimeDir, "fc-"+shortID+".sock") - dmName := "fc-rootfs-" + shortID - tapName := "tap-fc-" + shortID - if strings.TrimSpace(vm.Runtime.VSockPath) == "" { - vm.Runtime.VSockPath = defaultVSockPath(d.layout.RuntimeDir, vm.ID) - } - if vm.Runtime.VSockCID == 0 { - vm.Runtime.VSockCID, err = defaultVSockCID(vm.Runtime.GuestIP) - if err != nil { - return model.VMRecord{}, err - } - } - if err := os.RemoveAll(apiSock); err != nil && !os.IsNotExist(err) { - return model.VMRecord{}, err - } - if err := os.RemoveAll(vm.Runtime.VSockPath); err != nil && !os.IsNotExist(err) { - return model.VMRecord{}, err - } - - op.stage("system_overlay", "overlay_path", vm.Runtime.SystemOverlay) - vmCreateStage(ctx, "prepare_rootfs", "preparing system overlay") - if err := d.ensureSystemOverlay(ctx, &vm); err != nil { - return model.VMRecord{}, err - } - - op.stage("dm_snapshot", "dm_name", dmName) - vmCreateStage(ctx, "prepare_rootfs", "creating root filesystem snapshot") - handles, err := d.createDMSnapshot(ctx, image.RootfsPath, vm.Runtime.SystemOverlay, dmName) - if err != nil { - return model.VMRecord{}, err - } - vm.Runtime.BaseLoop = handles.BaseLoop - vm.Runtime.COWLoop = handles.COWLoop - vm.Runtime.DMName = handles.DMName - vm.Runtime.DMDev = handles.DMDev - 
vm.Runtime.APISockPath = apiSock - vm.Runtime.State = model.VMStateRunning - vm.State = model.VMStateRunning - vm.Runtime.LastError = "" - - cleanupOnErr := func(err error) (model.VMRecord, error) { - vm.State = model.VMStateError - vm.Runtime.State = model.VMStateError - vm.Runtime.LastError = err.Error() - op.stage("cleanup_after_failure", "error", err.Error()) - if cleanupErr := d.cleanupRuntime(context.Background(), vm, true); cleanupErr != nil { - err = errors.Join(err, cleanupErr) - } - clearRuntimeHandles(&vm) - _ = d.store.UpsertVM(context.Background(), vm) - return model.VMRecord{}, err - } - - op.stage("patch_root_overlay") - vmCreateStage(ctx, "prepare_rootfs", "writing guest configuration") - if err := d.patchRootOverlay(ctx, vm, image); err != nil { - return cleanupOnErr(err) - } - op.stage("prepare_host_features") - vmCreateStage(ctx, "prepare_host_features", "preparing host-side vm features") - if err := d.prepareCapabilityHosts(ctx, &vm, image); err != nil { - return cleanupOnErr(err) - } - op.stage("tap") - tap, err := d.acquireTap(ctx, tapName) - if err != nil { - return cleanupOnErr(err) - } - vm.Runtime.TapDevice = tap - op.stage("metrics_file", "metrics_path", vm.Runtime.MetricsPath) - if err := os.WriteFile(vm.Runtime.MetricsPath, nil, 0o644); err != nil { - return cleanupOnErr(err) - } - - op.stage("firecracker_binary") - fcPath, err := d.firecrackerBinary() - if err != nil { - return cleanupOnErr(err) - } - op.stage("firecracker_launch", "log_path", vm.Runtime.LogPath, "metrics_path", vm.Runtime.MetricsPath) - vmCreateStage(ctx, "boot_firecracker", "starting firecracker") - machineConfig := firecracker.MachineConfig{ - BinaryPath: fcPath, - VMID: vm.ID, - SocketPath: apiSock, - LogPath: vm.Runtime.LogPath, - MetricsPath: vm.Runtime.MetricsPath, - KernelImagePath: image.KernelPath, - InitrdPath: image.InitrdPath, - KernelArgs: system.BuildBootArgs(vm.Name), - Drives: []firecracker.DriveConfig{{ - ID: "rootfs", - Path: vm.Runtime.DMDev, - 
ReadOnly: false, - IsRoot: true, - }}, - TapDevice: tap, - VSockPath: vm.Runtime.VSockPath, - VSockCID: vm.Runtime.VSockCID, - VCPUCount: vm.Spec.VCPUCount, - MemoryMiB: vm.Spec.MemoryMiB, - Logger: d.logger, - } - d.contributeMachineConfig(&machineConfig, vm, image) - machine, err := firecracker.NewMachine(ctx, machineConfig) - if err != nil { - return cleanupOnErr(err) - } - if err := machine.Start(ctx); err != nil { - // Use a fresh context: the request ctx may already be cancelled (client - // disconnect), but we still need the PID so cleanupRuntime can kill the - // Firecracker process that was spawned before the failure. - vm.Runtime.PID = d.resolveFirecrackerPID(context.Background(), machine, apiSock) - return cleanupOnErr(err) - } - vm.Runtime.PID = d.resolveFirecrackerPID(context.Background(), machine, apiSock) - op.debugStage("firecracker_started", "pid", vm.Runtime.PID) - op.stage("socket_access", "api_socket", apiSock) - if err := d.ensureSocketAccess(ctx, apiSock, "firecracker api socket"); err != nil { - return cleanupOnErr(err) - } - op.stage("vsock_access", "vsock_path", vm.Runtime.VSockPath, "vsock_cid", vm.Runtime.VSockCID) - if err := d.ensureSocketAccess(ctx, vm.Runtime.VSockPath, "firecracker vsock socket"); err != nil { - return cleanupOnErr(err) - } - vmCreateStage(ctx, "wait_vsock_agent", "waiting for guest vsock agent") - if err := waitForGuestVSockAgent(ctx, d.logger, vm.Runtime.VSockPath, vsockReadyWait); err != nil { - return cleanupOnErr(err) - } - op.stage("post_start_features") - vmCreateStage(ctx, "wait_guest_ready", "waiting for guest services") - if err := d.postStartCapabilities(ctx, vm, image); err != nil { - return cleanupOnErr(err) - } - system.TouchNow(&vm) - op.stage("persist") - vmCreateStage(ctx, "finalize", "saving vm state") - if err := d.store.UpsertVM(ctx, vm); err != nil { - return cleanupOnErr(err) - } - return vm, nil -} - -func (d *Daemon) StopVM(ctx context.Context, idOrName string) (model.VMRecord, error) { - 
return d.withVMLockByRef(ctx, idOrName, func(vm model.VMRecord) (model.VMRecord, error) { - return d.stopVMLocked(ctx, vm) - }) -} - -func (d *Daemon) stopVMLocked(ctx context.Context, current model.VMRecord) (vm model.VMRecord, err error) { - vm = current - op := d.beginOperation("vm.stop", "vm_ref", vm.ID) - defer func() { - if err != nil { - op.fail(err, vmLogAttrs(vm)...) - return - } - op.done(vmLogAttrs(vm)...) - }() - if vm.State != model.VMStateRunning || !system.ProcessRunning(vm.Runtime.PID, vm.Runtime.APISockPath) { - op.stage("cleanup_stale_runtime") - if err := d.cleanupRuntime(ctx, vm, true); err != nil { - return model.VMRecord{}, err - } - vm.State = model.VMStateStopped - vm.Runtime.State = model.VMStateStopped - clearRuntimeHandles(&vm) - if err := d.store.UpsertVM(ctx, vm); err != nil { - return model.VMRecord{}, err - } - return vm, nil - } - op.stage("graceful_shutdown") - if err := d.sendCtrlAltDel(ctx, vm); err != nil { - return model.VMRecord{}, err - } - op.stage("wait_for_exit", "pid", vm.Runtime.PID) - if err := d.waitForExit(ctx, vm.Runtime.PID, vm.Runtime.APISockPath, gracefulShutdownWait); err != nil { - if !errors.Is(err, errWaitForExitTimeout) { - return model.VMRecord{}, err - } - op.stage("graceful_shutdown_timeout", "pid", vm.Runtime.PID) - } - op.stage("cleanup_runtime") - if err := d.cleanupRuntime(ctx, vm, true); err != nil { - return model.VMRecord{}, err - } - vm.State = model.VMStateStopped - vm.Runtime.State = model.VMStateStopped - clearRuntimeHandles(&vm) - system.TouchNow(&vm) - if err := d.store.UpsertVM(ctx, vm); err != nil { - return model.VMRecord{}, err - } - return vm, nil -} - -func (d *Daemon) KillVM(ctx context.Context, params api.VMKillParams) (model.VMRecord, error) { - return d.withVMLockByRef(ctx, params.IDOrName, func(vm model.VMRecord) (model.VMRecord, error) { - return d.killVMLocked(ctx, vm, params.Signal) - }) -} - -func (d *Daemon) killVMLocked(ctx context.Context, current model.VMRecord, signalValue 
string) (vm model.VMRecord, err error) { - vm = current - op := d.beginOperation("vm.kill", "vm_ref", vm.ID, "signal", signalValue) - defer func() { - if err != nil { - op.fail(err, vmLogAttrs(vm)...) - return - } - op.done(vmLogAttrs(vm)...) - }() - if vm.State != model.VMStateRunning || !system.ProcessRunning(vm.Runtime.PID, vm.Runtime.APISockPath) { - op.stage("cleanup_stale_runtime") - if err := d.cleanupRuntime(ctx, vm, true); err != nil { - return model.VMRecord{}, err - } - vm.State = model.VMStateStopped - vm.Runtime.State = model.VMStateStopped - clearRuntimeHandles(&vm) - if err := d.store.UpsertVM(ctx, vm); err != nil { - return model.VMRecord{}, err - } - return vm, nil - } - - signal := strings.TrimSpace(signalValue) - if signal == "" { - signal = "TERM" - } - op.stage("send_signal", "pid", vm.Runtime.PID, "signal", signal) - if _, err := d.runner.RunSudo(ctx, "kill", "-"+signal, strconv.Itoa(vm.Runtime.PID)); err != nil { - return model.VMRecord{}, err - } - op.stage("wait_for_exit", "pid", vm.Runtime.PID) - if err := d.waitForExit(ctx, vm.Runtime.PID, vm.Runtime.APISockPath, 30*time.Second); err != nil { - if !errors.Is(err, errWaitForExitTimeout) { - return model.VMRecord{}, err - } - op.stage("signal_timeout", "pid", vm.Runtime.PID, "signal", signal) - } - op.stage("cleanup_runtime") - if err := d.cleanupRuntime(ctx, vm, true); err != nil { - return model.VMRecord{}, err - } - vm.State = model.VMStateStopped - vm.Runtime.State = model.VMStateStopped - clearRuntimeHandles(&vm) - system.TouchNow(&vm) - if err := d.store.UpsertVM(ctx, vm); err != nil { - return model.VMRecord{}, err - } - return vm, nil -} - -func (d *Daemon) RestartVM(ctx context.Context, idOrName string) (vm model.VMRecord, err error) { - op := d.beginOperation("vm.restart", "vm_ref", idOrName) - defer func() { - if err != nil { - op.fail(err, vmLogAttrs(vm)...) - return - } - op.done(vmLogAttrs(vm)...) 
- }() - resolved, err := d.FindVM(ctx, idOrName) - if err != nil { - return model.VMRecord{}, err - } - return d.withVMLockByID(ctx, resolved.ID, func(vm model.VMRecord) (model.VMRecord, error) { - op.stage("stop") - vm, err = d.stopVMLocked(ctx, vm) - if err != nil { - return model.VMRecord{}, err - } - image, err := d.store.GetImageByID(ctx, vm.ImageID) - if err != nil { - return model.VMRecord{}, err - } - op.stage("start", vmLogAttrs(vm)...) - return d.startVMLocked(ctx, vm, image) - }) -} - -func (d *Daemon) DeleteVM(ctx context.Context, idOrName string) (model.VMRecord, error) { - return d.withVMLockByRef(ctx, idOrName, func(vm model.VMRecord) (model.VMRecord, error) { - return d.deleteVMLocked(ctx, vm) - }) -} - -func (d *Daemon) deleteVMLocked(ctx context.Context, current model.VMRecord) (vm model.VMRecord, err error) { - vm = current - op := d.beginOperation("vm.delete", "vm_ref", vm.ID) - defer func() { - if err != nil { - op.fail(err, vmLogAttrs(vm)...) - return - } - op.done(vmLogAttrs(vm)...) 
- }() - if vm.State == model.VMStateRunning && system.ProcessRunning(vm.Runtime.PID, vm.Runtime.APISockPath) { - op.stage("kill_running_vm", "pid", vm.Runtime.PID) - _ = d.killVMProcess(ctx, vm.Runtime.PID) - } - op.stage("cleanup_runtime") - if err := d.cleanupRuntime(ctx, vm, false); err != nil { - return model.VMRecord{}, err - } - op.stage("delete_store_record") - if err := d.store.DeleteVM(ctx, vm.ID); err != nil { - return model.VMRecord{}, err - } - if vm.Runtime.VMDir != "" { - op.stage("delete_vm_dir", "vm_dir", vm.Runtime.VMDir) - if err := os.RemoveAll(vm.Runtime.VMDir); err != nil { - return model.VMRecord{}, err - } - } - return vm, nil -} - -func (d *Daemon) SetVM(ctx context.Context, params api.VMSetParams) (model.VMRecord, error) { - return d.withVMLockByRef(ctx, params.IDOrName, func(vm model.VMRecord) (model.VMRecord, error) { - return d.setVMLocked(ctx, vm, params) - }) -} - -func (d *Daemon) setVMLocked(ctx context.Context, current model.VMRecord, params api.VMSetParams) (vm model.VMRecord, err error) { - vm = current - op := d.beginOperation("vm.set", "vm_ref", vm.ID) - defer func() { - if err != nil { - op.fail(err, vmLogAttrs(vm)...) - return - } - op.done(vmLogAttrs(vm)...) 
- }() - running := vm.State == model.VMStateRunning && system.ProcessRunning(vm.Runtime.PID, vm.Runtime.APISockPath) - if params.VCPUCount != nil { - if err := validateOptionalPositiveSetting("vcpu", params.VCPUCount); err != nil { - return model.VMRecord{}, err - } - if running { - return model.VMRecord{}, errors.New("vcpu changes require the VM to be stopped") - } - op.stage("update_vcpu", "vcpu_count", *params.VCPUCount) - vm.Spec.VCPUCount = *params.VCPUCount - } - if params.MemoryMiB != nil { - if err := validateOptionalPositiveSetting("memory", params.MemoryMiB); err != nil { - return model.VMRecord{}, err - } - if running { - return model.VMRecord{}, errors.New("memory changes require the VM to be stopped") - } - op.stage("update_memory", "memory_mib", *params.MemoryMiB) - vm.Spec.MemoryMiB = *params.MemoryMiB - } - if params.WorkDiskSize != "" { - size, err := model.ParseSize(params.WorkDiskSize) - if err != nil { - return model.VMRecord{}, err - } - if running { - return model.VMRecord{}, errors.New("disk changes require the VM to be stopped") - } - if size < vm.Spec.WorkDiskSizeBytes { - return model.VMRecord{}, errors.New("disk size can only grow") - } - if size > vm.Spec.WorkDiskSizeBytes { - if exists(vm.Runtime.WorkDiskPath) { - op.stage("resize_work_disk", "from_bytes", vm.Spec.WorkDiskSizeBytes, "to_bytes", size) - if err := d.validateWorkDiskResizePrereqs(); err != nil { - return model.VMRecord{}, err - } - if err := system.ResizeExt4Image(ctx, d.runner, vm.Runtime.WorkDiskPath, size); err != nil { - return model.VMRecord{}, err - } - } - vm.Spec.WorkDiskSizeBytes = size - } - } - if params.NATEnabled != nil { - op.stage("update_nat", "nat_enabled", *params.NATEnabled) - vm.Spec.NATEnabled = *params.NATEnabled - } - if running { - if err := d.applyCapabilityConfigChanges(ctx, current, vm); err != nil { - return model.VMRecord{}, err - } - } - system.TouchNow(&vm) - if err := d.store.UpsertVM(ctx, vm); err != nil { - return model.VMRecord{}, err - } 
- return vm, nil -} - -func (d *Daemon) GetVMStats(ctx context.Context, idOrName string) (model.VMRecord, model.VMStats, error) { - vm, err := d.withVMLockByRef(ctx, idOrName, func(vm model.VMRecord) (model.VMRecord, error) { - return d.getVMStatsLocked(ctx, vm) - }) - if err != nil { - return model.VMRecord{}, model.VMStats{}, err - } - return vm, vm.Stats, nil -} - -func (d *Daemon) HealthVM(ctx context.Context, idOrName string) (result api.VMHealthResult, err error) { - _, err = d.withVMLockByRef(ctx, idOrName, func(vm model.VMRecord) (model.VMRecord, error) { - result.Name = vm.Name - if vm.State != model.VMStateRunning || !system.ProcessRunning(vm.Runtime.PID, vm.Runtime.APISockPath) { - result.Healthy = false - return vm, nil - } - if strings.TrimSpace(vm.Runtime.VSockPath) == "" { - return model.VMRecord{}, errors.New("vm has no vsock path") - } - if vm.Runtime.VSockCID == 0 { - return model.VMRecord{}, errors.New("vm has no vsock cid") - } - if err := d.ensureSocketAccess(ctx, vm.Runtime.VSockPath, "firecracker vsock socket"); err != nil { - return model.VMRecord{}, err - } - pingCtx, cancel := context.WithTimeout(ctx, 3*time.Second) - defer cancel() - if err := vsockagent.Health(pingCtx, d.logger, vm.Runtime.VSockPath); err != nil { - return model.VMRecord{}, err - } - result.Healthy = true - return vm, nil - }) - return result, err -} - -func (d *Daemon) PingVM(ctx context.Context, idOrName string) (result api.VMPingResult, err error) { - health, err := d.HealthVM(ctx, idOrName) - if err != nil { - return api.VMPingResult{}, err - } - return api.VMPingResult{Name: health.Name, Alive: health.Healthy}, nil -} - -func (d *Daemon) getVMStatsLocked(ctx context.Context, vm model.VMRecord) (model.VMRecord, error) { - stats, err := d.collectStats(ctx, vm) - if err == nil { - vm.Stats = stats - vm.UpdatedAt = model.Now() - _ = d.store.UpsertVM(ctx, vm) - if d.logger != nil { - d.logger.Debug("vm stats collected", append(vmLogAttrs(vm), "rss_bytes", stats.RSSBytes, 
"vsz_bytes", stats.VSZBytes, "cpu_percent", stats.CPUPercent)...) - } - } - return vm, nil -} - -func (d *Daemon) pollStats(ctx context.Context) error { - vms, err := d.store.ListVMs(ctx) - if err != nil { - return err - } - for _, vm := range vms { - if err := d.withVMLockByIDErr(ctx, vm.ID, func(vm model.VMRecord) error { - if vm.State != model.VMStateRunning || !system.ProcessRunning(vm.Runtime.PID, vm.Runtime.APISockPath) { - return nil - } - stats, err := d.collectStats(ctx, vm) - if err != nil { - if d.logger != nil { - d.logger.Debug("vm stats collection failed", append(vmLogAttrs(vm), "error", err.Error())...) - } - return nil - } - vm.Stats = stats - vm.UpdatedAt = model.Now() - return d.store.UpsertVM(ctx, vm) - }); err != nil { - return err - } - } - return nil -} - -func (d *Daemon) stopStaleVMs(ctx context.Context) (err error) { - if d.config.AutoStopStaleAfter <= 0 { - return nil - } - op := d.beginOperation("vm.stop_stale") - defer func() { - if err != nil { - op.fail(err) - return - } - op.done() - }() - vms, err := d.store.ListVMs(ctx) - if err != nil { - return err - } - now := model.Now() - for _, vm := range vms { - if err := d.withVMLockByIDErr(ctx, vm.ID, func(vm model.VMRecord) error { - if vm.State != model.VMStateRunning || !system.ProcessRunning(vm.Runtime.PID, vm.Runtime.APISockPath) { - return nil - } - if now.Sub(vm.LastTouchedAt) < d.config.AutoStopStaleAfter { - return nil - } - op.stage("stopping_vm", vmLogAttrs(vm)...) 
- _ = d.sendCtrlAltDel(ctx, vm) - _ = d.waitForExit(ctx, vm.Runtime.PID, vm.Runtime.APISockPath, 10*time.Second) - _ = d.cleanupRuntime(ctx, vm, true) - vm.State = model.VMStateStopped - vm.Runtime.State = model.VMStateStopped - clearRuntimeHandles(&vm) - vm.UpdatedAt = model.Now() - return d.store.UpsertVM(ctx, vm) - }); err != nil { - return err - } - } - return nil -} - -func (d *Daemon) collectStats(ctx context.Context, vm model.VMRecord) (model.VMStats, error) { - stats := model.VMStats{ - CollectedAt: model.Now(), - SystemOverlayBytes: system.AllocatedBytes(vm.Runtime.SystemOverlay), - WorkDiskBytes: system.AllocatedBytes(vm.Runtime.WorkDiskPath), - MetricsRaw: system.ParseMetricsFile(vm.Runtime.MetricsPath), - } - if vm.Runtime.PID > 0 && system.ProcessRunning(vm.Runtime.PID, vm.Runtime.APISockPath) { - ps, err := system.ReadProcessStats(ctx, vm.Runtime.PID) - if err == nil { - stats.CPUPercent = ps.CPUPercent - stats.RSSBytes = ps.RSSBytes - stats.VSZBytes = ps.VSZBytes - } - } - return stats, nil -} - -func (d *Daemon) ensureSystemOverlay(ctx context.Context, vm *model.VMRecord) error { - if exists(vm.Runtime.SystemOverlay) { - return nil - } - _, err := d.runner.Run(ctx, "truncate", "-s", strconv.FormatInt(vm.Spec.SystemOverlaySizeByte, 10), vm.Runtime.SystemOverlay) - return err -} - -func (d *Daemon) patchRootOverlay(ctx context.Context, vm model.VMRecord, image model.Image) error { - resolv := []byte(fmt.Sprintf("nameserver %s\n", d.config.DefaultDNS)) - hostname := []byte(vm.Name + "\n") - hosts := []byte(fmt.Sprintf("127.0.0.1 localhost\n127.0.1.1 %s\n", vm.Name)) - sshdConfig := []byte(strings.Join([]string{ - "LogLevel DEBUG3", - "PermitRootLogin yes", - "PubkeyAuthentication yes", - "AuthorizedKeysFile /root/.ssh/authorized_keys", - "StrictModes no", - "", - }, "\n")) - fstab, err := system.ReadDebugFSText(ctx, d.runner, vm.Runtime.DMDev, "/etc/fstab") - if err != nil { - fstab = "" - } - builder := guestconfig.NewBuilder() - 
builder.WriteFile("/etc/resolv.conf", resolv) - builder.WriteFile("/etc/hostname", hostname) - builder.WriteFile("/etc/hosts", hosts) - builder.WriteFile(guestnet.ConfigPath, guestnet.ConfigFile(vm.Runtime.GuestIP, d.config.BridgeIP, d.config.DefaultDNS)) - builder.WriteFile(guestnet.GuestScriptPath, []byte(guestnet.BootstrapScript())) - builder.WriteFile("/etc/ssh/sshd_config.d/99-banger.conf", sshdConfig) - builder.DropMountTarget("/home") - builder.DropMountTarget("/var") - builder.AddMount(guestconfig.MountSpec{ - Source: "tmpfs", - Target: "/run", - FSType: "tmpfs", - Options: []string{"defaults", "nodev", "nosuid", "mode=0755"}, - Dump: 0, - Pass: 0, - }) - builder.AddMount(guestconfig.MountSpec{ - Source: "tmpfs", - Target: "/tmp", - FSType: "tmpfs", - Options: []string{"defaults", "nodev", "nosuid", "mode=1777"}, - Dump: 0, - Pass: 0, - }) - d.contributeGuestConfig(builder, vm, image) - builder.WriteFile("/etc/fstab", []byte(builder.RenderFSTab(fstab))) - files := builder.Files() - for _, guestPath := range builder.FilePaths() { - data := files[guestPath] - if guestPath == guestnet.GuestScriptPath { - if err := system.WriteExt4FileMode(ctx, d.runner, vm.Runtime.DMDev, guestPath, 0o755, data); err != nil { - return err - } - continue - } - if err := system.WriteExt4File(ctx, d.runner, vm.Runtime.DMDev, guestPath, data); err != nil { - return err - } - } - return nil -} - -type workDiskPreparation struct { - ClonedFromSeed bool -} - -func (d *Daemon) ensureWorkDisk(ctx context.Context, vm *model.VMRecord, image model.Image) (workDiskPreparation, error) { - if exists(vm.Runtime.WorkDiskPath) { - return workDiskPreparation{}, nil - } - if exists(image.WorkSeedPath) { - vmCreateStage(ctx, "prepare_work_disk", "cloning work seed") - if err := system.CopyFilePreferClone(image.WorkSeedPath, vm.Runtime.WorkDiskPath); err != nil { - return workDiskPreparation{}, err - } - seedInfo, err := os.Stat(image.WorkSeedPath) - if err != nil { - return workDiskPreparation{}, 
err - } - if vm.Spec.WorkDiskSizeBytes < seedInfo.Size() { - return workDiskPreparation{}, fmt.Errorf("requested work disk size %d is smaller than seed image %d", vm.Spec.WorkDiskSizeBytes, seedInfo.Size()) - } - if vm.Spec.WorkDiskSizeBytes > seedInfo.Size() { - vmCreateStage(ctx, "prepare_work_disk", "resizing work disk") - if err := system.ResizeExt4Image(ctx, d.runner, vm.Runtime.WorkDiskPath, vm.Spec.WorkDiskSizeBytes); err != nil { - return workDiskPreparation{}, err - } - } - return workDiskPreparation{ClonedFromSeed: true}, nil - } - vmCreateStage(ctx, "prepare_work_disk", "creating empty work disk") - if _, err := d.runner.Run(ctx, "truncate", "-s", strconv.FormatInt(vm.Spec.WorkDiskSizeBytes, 10), vm.Runtime.WorkDiskPath); err != nil { - return workDiskPreparation{}, err - } - if _, err := d.runner.Run(ctx, "mkfs.ext4", "-F", vm.Runtime.WorkDiskPath); err != nil { - return workDiskPreparation{}, err - } - rootMount, cleanupRoot, err := system.MountTempDir(ctx, d.runner, vm.Runtime.DMDev, true) - if err != nil { - return workDiskPreparation{}, err - } - defer cleanupRoot() - workMount, cleanupWork, err := system.MountTempDir(ctx, d.runner, vm.Runtime.WorkDiskPath, false) - if err != nil { - return workDiskPreparation{}, err - } - defer cleanupWork() - vmCreateStage(ctx, "prepare_work_disk", "copying /root into work disk") - if err := system.CopyDirContents(ctx, d.runner, filepath.Join(rootMount, "root"), workMount, true); err != nil { - return workDiskPreparation{}, err - } - if err := d.flattenNestedWorkHome(ctx, workMount); err != nil { - return workDiskPreparation{}, err - } - return workDiskPreparation{}, nil -} - -func (d *Daemon) ensureAuthorizedKeyOnWorkDisk(ctx context.Context, vm *model.VMRecord, image model.Image, prep workDiskPreparation) error { - fingerprint, err := guest.AuthorizedPublicKeyFingerprint(d.config.SSHKeyPath) - if err != nil { - return fmt.Errorf("derive authorized ssh key fingerprint: %w", err) - } - if prep.ClonedFromSeed && 
image.SeededSSHPublicKeyFingerprint != "" && image.SeededSSHPublicKeyFingerprint == fingerprint { - vmCreateStage(ctx, "prepare_work_disk", "using seeded SSH access") - return nil - } - publicKey, err := guest.AuthorizedPublicKey(d.config.SSHKeyPath) - if err != nil { - return fmt.Errorf("derive authorized ssh key: %w", err) - } - vmCreateStage(ctx, "prepare_work_disk", "repairing SSH access on work disk") - workMount, cleanupWork, err := system.MountTempDir(ctx, d.runner, vm.Runtime.WorkDiskPath, false) - if err != nil { - return err - } - defer cleanupWork() - - if err := d.flattenNestedWorkHome(ctx, workMount); err != nil { - return err - } - - sshDir := filepath.Join(workMount, ".ssh") - if _, err := d.runner.RunSudo(ctx, "mkdir", "-p", sshDir); err != nil { - return err - } - if _, err := d.runner.RunSudo(ctx, "chmod", "700", sshDir); err != nil { - return err - } - - authorizedKeysPath := filepath.Join(sshDir, "authorized_keys") - existing, err := d.runner.RunSudo(ctx, "cat", authorizedKeysPath) - if err != nil { - existing = nil - } - merged := mergeAuthorizedKey(existing, publicKey) - - tmpFile, err := os.CreateTemp("", "banger-authorized-keys-*") - if err != nil { - return err - } - tmpPath := tmpFile.Name() - if _, err := tmpFile.Write(merged); err != nil { - _ = tmpFile.Close() - _ = os.Remove(tmpPath) - return err - } - if err := tmpFile.Close(); err != nil { - _ = os.Remove(tmpPath) - return err - } - defer os.Remove(tmpPath) - - if _, err := d.runner.RunSudo(ctx, "install", "-m", "600", tmpPath, authorizedKeysPath); err != nil { - return err - } - if prep.ClonedFromSeed && image.Managed { - vmCreateStage(ctx, "prepare_work_disk", "refreshing managed work seed") - if err := d.refreshManagedWorkSeedFingerprint(ctx, image, fingerprint); err != nil { - return err - } - } - return nil -} - -func (d *Daemon) ensureGitIdentityOnWorkDisk(ctx context.Context, vm *model.VMRecord) error { - runner := d.runner - if runner == nil { - runner = system.NewRunner() - 
} - - identity, err := resolveHostGlobalGitIdentity(ctx, runner) - if err != nil { - d.warnGitIdentitySyncSkipped(*vm, hostGlobalGitIdentitySource, err) - return nil - } - - vmCreateStage(ctx, "prepare_work_disk", "syncing git identity") - workMount, cleanupWork, err := system.MountTempDir(ctx, runner, vm.Runtime.WorkDiskPath, false) - if err != nil { - return err - } - defer cleanupWork() - - if err := d.flattenNestedWorkHome(ctx, workMount); err != nil { - return err - } - - return writeGitIdentity(ctx, runner, filepath.Join(workMount, workDiskGitConfigRelativePath), identity) -} - -func (d *Daemon) ensureOpencodeAuthOnWorkDisk(ctx context.Context, vm *model.VMRecord) error { - return d.ensureAuthFileOnWorkDisk( - ctx, - vm, - "syncing opencode auth", - hostOpencodeAuthDefaultDisplayPath, - resolveHostOpencodeAuthPath, - workDiskOpencodeAuthRelativePath, - d.warnOpencodeAuthSyncSkipped, - ) -} - -func (d *Daemon) ensureClaudeAuthOnWorkDisk(ctx context.Context, vm *model.VMRecord) error { - return d.ensureAuthFileOnWorkDisk( - ctx, - vm, - "syncing claude auth", - hostClaudeAuthDefaultDisplayPath, - resolveHostClaudeAuthPath, - workDiskClaudeAuthRelativePath, - d.warnClaudeAuthSyncSkipped, - ) -} - -func (d *Daemon) ensurePiAuthOnWorkDisk(ctx context.Context, vm *model.VMRecord) error { - return d.ensureAuthFileOnWorkDisk( - ctx, - vm, - "syncing pi auth", - hostPiAuthDefaultDisplayPath, - resolveHostPiAuthPath, - workDiskPiAuthRelativePath, - d.warnPiAuthSyncSkipped, - ) -} - -func (d *Daemon) ensureAuthFileOnWorkDisk(ctx context.Context, vm *model.VMRecord, stageDetail, defaultDisplayPath string, resolveHostPath func() (string, error), guestRelativePath string, warn func(model.VMRecord, string, error)) error { - hostAuthPath, err := resolveHostPath() - if err != nil { - warn(*vm, defaultDisplayPath, err) - return nil - } - authData, err := os.ReadFile(hostAuthPath) - if err != nil { - warn(*vm, hostAuthPath, err) - return nil - } - - runner := d.runner - if 
runner == nil { - runner = system.NewRunner() - } - - vmCreateStage(ctx, "prepare_work_disk", stageDetail) - workMount, cleanupWork, err := system.MountTempDir(ctx, runner, vm.Runtime.WorkDiskPath, false) - if err != nil { - return err - } - defer cleanupWork() - - if err := d.flattenNestedWorkHome(ctx, workMount); err != nil { - return err - } - - authDir := filepath.Join(workMount, filepath.Dir(guestRelativePath)) - if _, err := runner.RunSudo(ctx, "mkdir", "-p", authDir); err != nil { - return err - } - authPath := filepath.Join(workMount, guestRelativePath) - - tmpFile, err := os.CreateTemp("", "banger-auth-*") - if err != nil { - return err - } - tmpPath := tmpFile.Name() - if _, err := tmpFile.Write(authData); err != nil { - _ = tmpFile.Close() - _ = os.Remove(tmpPath) - return err - } - if err := tmpFile.Close(); err != nil { - _ = os.Remove(tmpPath) - return err - } - defer os.Remove(tmpPath) - - _, err = runner.RunSudo(ctx, "install", "-m", "600", tmpPath, authPath) - return err -} - -func resolveHostOpencodeAuthPath() (string, error) { - return resolveHostAuthPath(workDiskOpencodeAuthRelativePath) -} - -func resolveHostClaudeAuthPath() (string, error) { - return resolveHostAuthPath(workDiskClaudeAuthRelativePath) -} - -func resolveHostPiAuthPath() (string, error) { - return resolveHostAuthPath(workDiskPiAuthRelativePath) -} - -func resolveHostAuthPath(relativePath string) (string, error) { - home, err := os.UserHomeDir() - if err != nil { - return "", err - } - return filepath.Join(home, relativePath), nil -} - -func resolveHostGlobalGitIdentity(ctx context.Context, runner system.CommandRunner) (gitIdentity, error) { - name, err := gitConfigValue(ctx, runner, nil, "user.name") - if err != nil { - return gitIdentity{}, err - } - if name == "" { - return gitIdentity{}, errors.New("host git user.name is empty") - } - - email, err := gitConfigValue(ctx, runner, nil, "user.email") - if err != nil { - return gitIdentity{}, err - } - if email == "" { - return 
gitIdentity{}, errors.New("host git user.email is empty") - } - - return gitIdentity{Name: name, Email: email}, nil -} - -func gitConfigValue(ctx context.Context, runner system.CommandRunner, extraArgs []string, key string) (string, error) { - args := []string{"config"} - args = append(args, extraArgs...) - args = append(args, "--default", "", "--get", key) - out, err := runner.Run(ctx, "git", args...) - if err != nil { - return "", err - } - return strings.TrimSpace(string(out)), nil -} - -func writeGitIdentity(ctx context.Context, runner system.CommandRunner, gitConfigPath string, identity gitIdentity) error { - existing, err := runner.RunSudo(ctx, "cat", gitConfigPath) - if err != nil { - existing = nil - } - - tmpFile, err := os.CreateTemp("", "banger-gitconfig-*") - if err != nil { - return err - } - tmpPath := tmpFile.Name() - if _, err := tmpFile.Write(existing); err != nil { - _ = tmpFile.Close() - _ = os.Remove(tmpPath) - return err - } - if err := tmpFile.Close(); err != nil { - _ = os.Remove(tmpPath) - return err - } - defer os.Remove(tmpPath) - - if _, err := runner.Run(ctx, "git", "config", "--file", tmpPath, "user.name", identity.Name); err != nil { - return err - } - if _, err := runner.Run(ctx, "git", "config", "--file", tmpPath, "user.email", identity.Email); err != nil { - return err - } - _, err = runner.RunSudo(ctx, "install", "-m", "644", tmpPath, gitConfigPath) - return err -} - -func (d *Daemon) warnOpencodeAuthSyncSkipped(vm model.VMRecord, hostPath string, err error) { - if d.logger == nil || err == nil { - return - } - d.logger.Warn("guest opencode auth sync skipped", append(vmLogAttrs(vm), "host_path", hostPath, "error", err.Error())...) -} - -func (d *Daemon) warnClaudeAuthSyncSkipped(vm model.VMRecord, hostPath string, err error) { - if d.logger == nil || err == nil { - return - } - d.logger.Warn("guest claude auth sync skipped", append(vmLogAttrs(vm), "host_path", hostPath, "error", err.Error())...) 
-} - -func (d *Daemon) warnPiAuthSyncSkipped(vm model.VMRecord, hostPath string, err error) { - if d.logger == nil || err == nil { - return - } - d.logger.Warn("guest pi auth sync skipped", append(vmLogAttrs(vm), "host_path", hostPath, "error", err.Error())...) -} - -func (d *Daemon) warnGitIdentitySyncSkipped(vm model.VMRecord, source string, err error) { - if d.logger == nil || err == nil { - return - } - d.logger.Warn("guest git identity sync skipped", append(vmLogAttrs(vm), "source", source, "error", err.Error())...) -} - -func mergeAuthorizedKey(existing, managed []byte) []byte { - managedLine := strings.TrimSpace(string(managed)) - if managedLine == "" { - return append([]byte(nil), existing...) - } - - lines := strings.Split(strings.ReplaceAll(string(existing), "\r\n", "\n"), "\n") - out := make([]string, 0, len(lines)+1) - found := false - for _, line := range lines { - line = strings.TrimRight(line, "\r") - trimmed := strings.TrimSpace(line) - if trimmed == "" { - continue - } - if trimmed == managedLine { - found = true - } - out = append(out, line) - } - if !found { - out = append(out, managedLine) - } - return []byte(strings.Join(out, "\n") + "\n") -} - -func (d *Daemon) flattenNestedWorkHome(ctx context.Context, workMount string) error { - nestedHome := filepath.Join(workMount, "root") - if !exists(nestedHome) { - return nil - } - if _, err := d.runner.RunSudo(ctx, "chmod", "755", nestedHome); err != nil { - return err - } - entries, err := os.ReadDir(nestedHome) - if err != nil { - return err - } - for _, entry := range entries { - sourcePath := filepath.Join(nestedHome, entry.Name()) - if _, err := d.runner.RunSudo(ctx, "cp", "-a", sourcePath, workMount+"/"); err != nil { - return err - } - } - _, err = d.runner.RunSudo(ctx, "rm", "-rf", nestedHome) - return err -} - func (d *Daemon) ensureBridge(ctx context.Context) error { if _, err := d.runner.Run(ctx, "ip", "link", "show", d.config.BridgeName); err == nil { _, err = d.runner.RunSudo(ctx, "ip", 
"link", "set", d.config.BridgeName, "up") diff --git a/internal/daemon/vm_authsync.go b/internal/daemon/vm_authsync.go new file mode 100644 index 0000000..20e3261 --- /dev/null +++ b/internal/daemon/vm_authsync.go @@ -0,0 +1,353 @@ +package daemon + +import ( + "context" + "errors" + "fmt" + "os" + "path/filepath" + "strings" + + "banger/internal/guest" + "banger/internal/model" + "banger/internal/system" +) + +const ( + workDiskGitConfigRelativePath = ".gitconfig" + workDiskOpencodeAuthDirRelativePath = ".local/share/opencode" + workDiskOpencodeAuthRelativePath = workDiskOpencodeAuthDirRelativePath + "/auth.json" + workDiskClaudeAuthDirRelativePath = ".claude" + workDiskClaudeAuthRelativePath = workDiskClaudeAuthDirRelativePath + "/.credentials.json" + workDiskPiAuthDirRelativePath = ".pi/agent" + workDiskPiAuthRelativePath = workDiskPiAuthDirRelativePath + "/auth.json" + hostGlobalGitIdentitySource = "git config --global" + hostOpencodeAuthDefaultDisplayPath = "~/" + workDiskOpencodeAuthRelativePath + hostClaudeAuthDefaultDisplayPath = "~/" + workDiskClaudeAuthRelativePath + hostPiAuthDefaultDisplayPath = "~/" + workDiskPiAuthRelativePath +) + +type gitIdentity struct { + Name string + Email string +} + +func (d *Daemon) ensureAuthorizedKeyOnWorkDisk(ctx context.Context, vm *model.VMRecord, image model.Image, prep workDiskPreparation) error { + fingerprint, err := guest.AuthorizedPublicKeyFingerprint(d.config.SSHKeyPath) + if err != nil { + return fmt.Errorf("derive authorized ssh key fingerprint: %w", err) + } + if prep.ClonedFromSeed && image.SeededSSHPublicKeyFingerprint != "" && image.SeededSSHPublicKeyFingerprint == fingerprint { + vmCreateStage(ctx, "prepare_work_disk", "using seeded SSH access") + return nil + } + publicKey, err := guest.AuthorizedPublicKey(d.config.SSHKeyPath) + if err != nil { + return fmt.Errorf("derive authorized ssh key: %w", err) + } + vmCreateStage(ctx, "prepare_work_disk", "repairing SSH access on work disk") + workMount, 
cleanupWork, err := system.MountTempDir(ctx, d.runner, vm.Runtime.WorkDiskPath, false) + if err != nil { + return err + } + defer cleanupWork() + + if err := d.flattenNestedWorkHome(ctx, workMount); err != nil { + return err + } + + sshDir := filepath.Join(workMount, ".ssh") + if _, err := d.runner.RunSudo(ctx, "mkdir", "-p", sshDir); err != nil { + return err + } + if _, err := d.runner.RunSudo(ctx, "chmod", "700", sshDir); err != nil { + return err + } + + authorizedKeysPath := filepath.Join(sshDir, "authorized_keys") + existing, err := d.runner.RunSudo(ctx, "cat", authorizedKeysPath) + if err != nil { + existing = nil + } + merged := mergeAuthorizedKey(existing, publicKey) + + tmpFile, err := os.CreateTemp("", "banger-authorized-keys-*") + if err != nil { + return err + } + tmpPath := tmpFile.Name() + if _, err := tmpFile.Write(merged); err != nil { + _ = tmpFile.Close() + _ = os.Remove(tmpPath) + return err + } + if err := tmpFile.Close(); err != nil { + _ = os.Remove(tmpPath) + return err + } + defer os.Remove(tmpPath) + + if _, err := d.runner.RunSudo(ctx, "install", "-m", "600", tmpPath, authorizedKeysPath); err != nil { + return err + } + if prep.ClonedFromSeed && image.Managed { + vmCreateStage(ctx, "prepare_work_disk", "refreshing managed work seed") + if err := d.refreshManagedWorkSeedFingerprint(ctx, image, fingerprint); err != nil { + return err + } + } + return nil +} + +func (d *Daemon) ensureGitIdentityOnWorkDisk(ctx context.Context, vm *model.VMRecord) error { + runner := d.runner + if runner == nil { + runner = system.NewRunner() + } + + identity, err := resolveHostGlobalGitIdentity(ctx, runner) + if err != nil { + d.warnGitIdentitySyncSkipped(*vm, hostGlobalGitIdentitySource, err) + return nil + } + + vmCreateStage(ctx, "prepare_work_disk", "syncing git identity") + workMount, cleanupWork, err := system.MountTempDir(ctx, runner, vm.Runtime.WorkDiskPath, false) + if err != nil { + return err + } + defer cleanupWork() + + if err := 
d.flattenNestedWorkHome(ctx, workMount); err != nil { + return err + } + + return writeGitIdentity(ctx, runner, filepath.Join(workMount, workDiskGitConfigRelativePath), identity) +} + +func (d *Daemon) ensureOpencodeAuthOnWorkDisk(ctx context.Context, vm *model.VMRecord) error { + return d.ensureAuthFileOnWorkDisk( + ctx, + vm, + "syncing opencode auth", + hostOpencodeAuthDefaultDisplayPath, + resolveHostOpencodeAuthPath, + workDiskOpencodeAuthRelativePath, + d.warnOpencodeAuthSyncSkipped, + ) +} + +func (d *Daemon) ensureClaudeAuthOnWorkDisk(ctx context.Context, vm *model.VMRecord) error { + return d.ensureAuthFileOnWorkDisk( + ctx, + vm, + "syncing claude auth", + hostClaudeAuthDefaultDisplayPath, + resolveHostClaudeAuthPath, + workDiskClaudeAuthRelativePath, + d.warnClaudeAuthSyncSkipped, + ) +} + +func (d *Daemon) ensurePiAuthOnWorkDisk(ctx context.Context, vm *model.VMRecord) error { + return d.ensureAuthFileOnWorkDisk( + ctx, + vm, + "syncing pi auth", + hostPiAuthDefaultDisplayPath, + resolveHostPiAuthPath, + workDiskPiAuthRelativePath, + d.warnPiAuthSyncSkipped, + ) +} + +func (d *Daemon) ensureAuthFileOnWorkDisk(ctx context.Context, vm *model.VMRecord, stageDetail, defaultDisplayPath string, resolveHostPath func() (string, error), guestRelativePath string, warn func(model.VMRecord, string, error)) error { + hostAuthPath, err := resolveHostPath() + if err != nil { + warn(*vm, defaultDisplayPath, err) + return nil + } + authData, err := os.ReadFile(hostAuthPath) + if err != nil { + warn(*vm, hostAuthPath, err) + return nil + } + + runner := d.runner + if runner == nil { + runner = system.NewRunner() + } + + vmCreateStage(ctx, "prepare_work_disk", stageDetail) + workMount, cleanupWork, err := system.MountTempDir(ctx, runner, vm.Runtime.WorkDiskPath, false) + if err != nil { + return err + } + defer cleanupWork() + + if err := d.flattenNestedWorkHome(ctx, workMount); err != nil { + return err + } + + authDir := filepath.Join(workMount, 
filepath.Dir(guestRelativePath)) + if _, err := runner.RunSudo(ctx, "mkdir", "-p", authDir); err != nil { + return err + } + authPath := filepath.Join(workMount, guestRelativePath) + + tmpFile, err := os.CreateTemp("", "banger-auth-*") + if err != nil { + return err + } + tmpPath := tmpFile.Name() + if _, err := tmpFile.Write(authData); err != nil { + _ = tmpFile.Close() + _ = os.Remove(tmpPath) + return err + } + if err := tmpFile.Close(); err != nil { + _ = os.Remove(tmpPath) + return err + } + defer os.Remove(tmpPath) + + _, err = runner.RunSudo(ctx, "install", "-m", "600", tmpPath, authPath) + return err +} + +func resolveHostOpencodeAuthPath() (string, error) { + return resolveHostAuthPath(workDiskOpencodeAuthRelativePath) +} + +func resolveHostClaudeAuthPath() (string, error) { + return resolveHostAuthPath(workDiskClaudeAuthRelativePath) +} + +func resolveHostPiAuthPath() (string, error) { + return resolveHostAuthPath(workDiskPiAuthRelativePath) +} + +func resolveHostAuthPath(relativePath string) (string, error) { + home, err := os.UserHomeDir() + if err != nil { + return "", err + } + return filepath.Join(home, relativePath), nil +} + +func resolveHostGlobalGitIdentity(ctx context.Context, runner system.CommandRunner) (gitIdentity, error) { + name, err := gitConfigValue(ctx, runner, nil, "user.name") + if err != nil { + return gitIdentity{}, err + } + if name == "" { + return gitIdentity{}, errors.New("host git user.name is empty") + } + + email, err := gitConfigValue(ctx, runner, nil, "user.email") + if err != nil { + return gitIdentity{}, err + } + if email == "" { + return gitIdentity{}, errors.New("host git user.email is empty") + } + + return gitIdentity{Name: name, Email: email}, nil +} + +func gitConfigValue(ctx context.Context, runner system.CommandRunner, extraArgs []string, key string) (string, error) { + args := []string{"config"} + args = append(args, extraArgs...) 
+ args = append(args, "--default", "", "--get", key) + out, err := runner.Run(ctx, "git", args...) + if err != nil { + return "", err + } + return strings.TrimSpace(string(out)), nil +} + +func writeGitIdentity(ctx context.Context, runner system.CommandRunner, gitConfigPath string, identity gitIdentity) error { + existing, err := runner.RunSudo(ctx, "cat", gitConfigPath) + if err != nil { + existing = nil + } + + tmpFile, err := os.CreateTemp("", "banger-gitconfig-*") + if err != nil { + return err + } + tmpPath := tmpFile.Name() + if _, err := tmpFile.Write(existing); err != nil { + _ = tmpFile.Close() + _ = os.Remove(tmpPath) + return err + } + if err := tmpFile.Close(); err != nil { + _ = os.Remove(tmpPath) + return err + } + defer os.Remove(tmpPath) + + if _, err := runner.Run(ctx, "git", "config", "--file", tmpPath, "user.name", identity.Name); err != nil { + return err + } + if _, err := runner.Run(ctx, "git", "config", "--file", tmpPath, "user.email", identity.Email); err != nil { + return err + } + _, err = runner.RunSudo(ctx, "install", "-m", "644", tmpPath, gitConfigPath) + return err +} + +func (d *Daemon) warnOpencodeAuthSyncSkipped(vm model.VMRecord, hostPath string, err error) { + if d.logger == nil || err == nil { + return + } + d.logger.Warn("guest opencode auth sync skipped", append(vmLogAttrs(vm), "host_path", hostPath, "error", err.Error())...) +} + +func (d *Daemon) warnClaudeAuthSyncSkipped(vm model.VMRecord, hostPath string, err error) { + if d.logger == nil || err == nil { + return + } + d.logger.Warn("guest claude auth sync skipped", append(vmLogAttrs(vm), "host_path", hostPath, "error", err.Error())...) +} + +func (d *Daemon) warnPiAuthSyncSkipped(vm model.VMRecord, hostPath string, err error) { + if d.logger == nil || err == nil { + return + } + d.logger.Warn("guest pi auth sync skipped", append(vmLogAttrs(vm), "host_path", hostPath, "error", err.Error())...) 
+} + +func (d *Daemon) warnGitIdentitySyncSkipped(vm model.VMRecord, source string, err error) { + if d.logger == nil || err == nil { + return + } + d.logger.Warn("guest git identity sync skipped", append(vmLogAttrs(vm), "source", source, "error", err.Error())...) +} + +func mergeAuthorizedKey(existing, managed []byte) []byte { + managedLine := strings.TrimSpace(string(managed)) + if managedLine == "" { + return append([]byte(nil), existing...) + } + + lines := strings.Split(strings.ReplaceAll(string(existing), "\r\n", "\n"), "\n") + out := make([]string, 0, len(lines)+1) + found := false + for _, line := range lines { + line = strings.TrimRight(line, "\r") + trimmed := strings.TrimSpace(line) + if trimmed == "" { + continue + } + if trimmed == managedLine { + found = true + } + out = append(out, line) + } + if !found { + out = append(out, managedLine) + } + return []byte(strings.Join(out, "\n") + "\n") +} diff --git a/internal/daemon/vm_create.go b/internal/daemon/vm_create.go new file mode 100644 index 0000000..59493a8 --- /dev/null +++ b/internal/daemon/vm_create.go @@ -0,0 +1,131 @@ +package daemon + +import ( + "context" + "fmt" + "os" + "path/filepath" + "strings" + + "banger/internal/api" + "banger/internal/model" + "banger/internal/vmdns" +) + +func (d *Daemon) CreateVM(ctx context.Context, params api.VMCreateParams) (vm model.VMRecord, err error) { + d.mu.Lock() + defer d.mu.Unlock() + op := d.beginOperation("vm.create") + defer func() { + if err != nil { + op.fail(err) + return + } + op.done(vmLogAttrs(vm)...) 
+ }() + if err := validateOptionalPositiveSetting("vcpu", params.VCPUCount); err != nil { + return model.VMRecord{}, err + } + if err := validateOptionalPositiveSetting("memory", params.MemoryMiB); err != nil { + return model.VMRecord{}, err + } + + imageName := params.ImageName + if imageName == "" { + imageName = d.config.DefaultImageName + } + vmCreateStage(ctx, "resolve_image", "resolving image") + image, err := d.FindImage(ctx, imageName) + if err != nil { + return model.VMRecord{}, err + } + vmCreateStage(ctx, "resolve_image", "using image "+image.Name) + op.stage("image_resolved", imageLogAttrs(image)...) + name := strings.TrimSpace(params.Name) + if name == "" { + name, err = d.generateName(ctx) + if err != nil { + return model.VMRecord{}, err + } + } + if _, err := d.FindVM(ctx, name); err == nil { + return model.VMRecord{}, fmt.Errorf("vm name already exists: %s", name) + } + id, err := model.NewID() + if err != nil { + return model.VMRecord{}, err + } + unlockVM := d.lockVMID(id) + defer unlockVM() + guestIP, err := d.store.NextGuestIP(ctx, bridgePrefix(d.config.BridgeIP)) + if err != nil { + return model.VMRecord{}, err + } + vmDir := filepath.Join(d.layout.VMsDir, id) + if err := os.MkdirAll(vmDir, 0o755); err != nil { + return model.VMRecord{}, err + } + vsockCID, err := defaultVSockCID(guestIP) + if err != nil { + return model.VMRecord{}, err + } + systemOverlaySize := int64(model.DefaultSystemOverlaySize) + if params.SystemOverlaySize != "" { + systemOverlaySize, err = model.ParseSize(params.SystemOverlaySize) + if err != nil { + return model.VMRecord{}, err + } + } + workDiskSize := int64(model.DefaultWorkDiskSize) + if params.WorkDiskSize != "" { + workDiskSize, err = model.ParseSize(params.WorkDiskSize) + if err != nil { + return model.VMRecord{}, err + } + } + now := model.Now() + spec := model.VMSpec{ + VCPUCount: optionalIntOrDefault(params.VCPUCount, model.DefaultVCPUCount), + MemoryMiB: optionalIntOrDefault(params.MemoryMiB, 
model.DefaultMemoryMiB), + SystemOverlaySizeByte: systemOverlaySize, + WorkDiskSizeBytes: workDiskSize, + NATEnabled: params.NATEnabled, + } + vm = model.VMRecord{ + ID: id, + Name: name, + ImageID: image.ID, + State: model.VMStateCreated, + CreatedAt: now, + UpdatedAt: now, + LastTouchedAt: now, + Spec: spec, + Runtime: model.VMRuntime{ + State: model.VMStateCreated, + GuestIP: guestIP, + DNSName: vmdns.RecordName(name), + VMDir: vmDir, + VSockPath: defaultVSockPath(d.layout.RuntimeDir, id), + VSockCID: vsockCID, + SystemOverlay: filepath.Join(vmDir, "system.cow"), + WorkDiskPath: filepath.Join(vmDir, "root.ext4"), + LogPath: filepath.Join(vmDir, "firecracker.log"), + MetricsPath: filepath.Join(vmDir, "metrics.json"), + }, + } + vmCreateBindVM(ctx, vm) + vmCreateStage(ctx, "reserve_vm", fmt.Sprintf("allocated %s (%s)", vm.Name, vm.Runtime.GuestIP)) + if err := d.store.UpsertVM(ctx, vm); err != nil { + return model.VMRecord{}, err + } + op.stage("persisted", vmLogAttrs(vm)...) + if params.NoStart { + vm.State = model.VMStateStopped + vm.Runtime.State = model.VMStateStopped + if err := d.store.UpsertVM(ctx, vm); err != nil { + return model.VMRecord{}, err + } + return vm, nil + } + return d.startVMLocked(ctx, vm, image) +} diff --git a/internal/daemon/vm_disk.go b/internal/daemon/vm_disk.go new file mode 100644 index 0000000..fb273c0 --- /dev/null +++ b/internal/daemon/vm_disk.go @@ -0,0 +1,159 @@ +package daemon + +import ( + "context" + "fmt" + "os" + "path/filepath" + "strconv" + "strings" + + "banger/internal/guestconfig" + "banger/internal/guestnet" + "banger/internal/model" + "banger/internal/system" +) + +type workDiskPreparation struct { + ClonedFromSeed bool +} + +func (d *Daemon) ensureSystemOverlay(ctx context.Context, vm *model.VMRecord) error { + if exists(vm.Runtime.SystemOverlay) { + return nil + } + _, err := d.runner.Run(ctx, "truncate", "-s", strconv.FormatInt(vm.Spec.SystemOverlaySizeByte, 10), vm.Runtime.SystemOverlay) + return err +} + +func (d 
*Daemon) patchRootOverlay(ctx context.Context, vm model.VMRecord, image model.Image) error { + resolv := []byte(fmt.Sprintf("nameserver %s\n", d.config.DefaultDNS)) + hostname := []byte(vm.Name + "\n") + hosts := []byte(fmt.Sprintf("127.0.0.1 localhost\n127.0.1.1 %s\n", vm.Name)) + sshdConfig := []byte(strings.Join([]string{ + "LogLevel DEBUG3", + "PermitRootLogin yes", + "PubkeyAuthentication yes", + "AuthorizedKeysFile /root/.ssh/authorized_keys", + "StrictModes no", + "", + }, "\n")) + fstab, err := system.ReadDebugFSText(ctx, d.runner, vm.Runtime.DMDev, "/etc/fstab") + if err != nil { + fstab = "" + } + builder := guestconfig.NewBuilder() + builder.WriteFile("/etc/resolv.conf", resolv) + builder.WriteFile("/etc/hostname", hostname) + builder.WriteFile("/etc/hosts", hosts) + builder.WriteFile(guestnet.ConfigPath, guestnet.ConfigFile(vm.Runtime.GuestIP, d.config.BridgeIP, d.config.DefaultDNS)) + builder.WriteFile(guestnet.GuestScriptPath, []byte(guestnet.BootstrapScript())) + builder.WriteFile("/etc/ssh/sshd_config.d/99-banger.conf", sshdConfig) + builder.DropMountTarget("/home") + builder.DropMountTarget("/var") + builder.AddMount(guestconfig.MountSpec{ + Source: "tmpfs", + Target: "/run", + FSType: "tmpfs", + Options: []string{"defaults", "nodev", "nosuid", "mode=0755"}, + Dump: 0, + Pass: 0, + }) + builder.AddMount(guestconfig.MountSpec{ + Source: "tmpfs", + Target: "/tmp", + FSType: "tmpfs", + Options: []string{"defaults", "nodev", "nosuid", "mode=1777"}, + Dump: 0, + Pass: 0, + }) + d.contributeGuestConfig(builder, vm, image) + builder.WriteFile("/etc/fstab", []byte(builder.RenderFSTab(fstab))) + files := builder.Files() + for _, guestPath := range builder.FilePaths() { + data := files[guestPath] + if guestPath == guestnet.GuestScriptPath { + if err := system.WriteExt4FileMode(ctx, d.runner, vm.Runtime.DMDev, guestPath, 0o755, data); err != nil { + return err + } + continue + } + if err := system.WriteExt4File(ctx, d.runner, vm.Runtime.DMDev, guestPath, 
data); err != nil { + return err + } + } + return nil +} + +func (d *Daemon) ensureWorkDisk(ctx context.Context, vm *model.VMRecord, image model.Image) (workDiskPreparation, error) { + if exists(vm.Runtime.WorkDiskPath) { + return workDiskPreparation{}, nil + } + if exists(image.WorkSeedPath) { + vmCreateStage(ctx, "prepare_work_disk", "cloning work seed") + if err := system.CopyFilePreferClone(image.WorkSeedPath, vm.Runtime.WorkDiskPath); err != nil { + return workDiskPreparation{}, err + } + seedInfo, err := os.Stat(image.WorkSeedPath) + if err != nil { + return workDiskPreparation{}, err + } + if vm.Spec.WorkDiskSizeBytes < seedInfo.Size() { + return workDiskPreparation{}, fmt.Errorf("requested work disk size %d is smaller than seed image %d", vm.Spec.WorkDiskSizeBytes, seedInfo.Size()) + } + if vm.Spec.WorkDiskSizeBytes > seedInfo.Size() { + vmCreateStage(ctx, "prepare_work_disk", "resizing work disk") + if err := system.ResizeExt4Image(ctx, d.runner, vm.Runtime.WorkDiskPath, vm.Spec.WorkDiskSizeBytes); err != nil { + return workDiskPreparation{}, err + } + } + return workDiskPreparation{ClonedFromSeed: true}, nil + } + vmCreateStage(ctx, "prepare_work_disk", "creating empty work disk") + if _, err := d.runner.Run(ctx, "truncate", "-s", strconv.FormatInt(vm.Spec.WorkDiskSizeBytes, 10), vm.Runtime.WorkDiskPath); err != nil { + return workDiskPreparation{}, err + } + if _, err := d.runner.Run(ctx, "mkfs.ext4", "-F", vm.Runtime.WorkDiskPath); err != nil { + return workDiskPreparation{}, err + } + rootMount, cleanupRoot, err := system.MountTempDir(ctx, d.runner, vm.Runtime.DMDev, true) + if err != nil { + return workDiskPreparation{}, err + } + defer cleanupRoot() + workMount, cleanupWork, err := system.MountTempDir(ctx, d.runner, vm.Runtime.WorkDiskPath, false) + if err != nil { + return workDiskPreparation{}, err + } + defer cleanupWork() + vmCreateStage(ctx, "prepare_work_disk", "copying /root into work disk") + if err := system.CopyDirContents(ctx, d.runner, 
filepath.Join(rootMount, "root"), workMount, true); err != nil { + return workDiskPreparation{}, err + } + if err := d.flattenNestedWorkHome(ctx, workMount); err != nil { + return workDiskPreparation{}, err + } + return workDiskPreparation{}, nil +} + +func (d *Daemon) flattenNestedWorkHome(ctx context.Context, workMount string) error { + nestedHome := filepath.Join(workMount, "root") + if !exists(nestedHome) { + return nil + } + if _, err := d.runner.RunSudo(ctx, "chmod", "755", nestedHome); err != nil { + return err + } + entries, err := os.ReadDir(nestedHome) + if err != nil { + return err + } + for _, entry := range entries { + sourcePath := filepath.Join(nestedHome, entry.Name()) + if _, err := d.runner.RunSudo(ctx, "cp", "-a", sourcePath, workMount+"/"); err != nil { + return err + } + } + _, err = d.runner.RunSudo(ctx, "rm", "-rf", nestedHome) + return err +} diff --git a/internal/daemon/vm_lifecycle.go b/internal/daemon/vm_lifecycle.go new file mode 100644 index 0000000..1b52108 --- /dev/null +++ b/internal/daemon/vm_lifecycle.go @@ -0,0 +1,386 @@ +package daemon + +import ( + "context" + "errors" + "os" + "path/filepath" + "strconv" + "strings" + "time" + + "banger/internal/api" + "banger/internal/firecracker" + "banger/internal/model" + "banger/internal/system" +) + +func (d *Daemon) StartVM(ctx context.Context, idOrName string) (model.VMRecord, error) { + return d.withVMLockByRef(ctx, idOrName, func(vm model.VMRecord) (model.VMRecord, error) { + image, err := d.store.GetImageByID(ctx, vm.ImageID) + if err != nil { + return model.VMRecord{}, err + } + if vm.State == model.VMStateRunning && system.ProcessRunning(vm.Runtime.PID, vm.Runtime.APISockPath) { + if d.logger != nil { + d.logger.Info("vm already running", vmLogAttrs(vm)...) 
+ } + return vm, nil + } + return d.startVMLocked(ctx, vm, image) + }) +} + +func (d *Daemon) startVMLocked(ctx context.Context, vm model.VMRecord, image model.Image) (_ model.VMRecord, err error) { + op := d.beginOperation("vm.start", append(vmLogAttrs(vm), imageLogAttrs(image)...)...) + defer func() { + if err != nil { + err = annotateLogPath(err, vm.Runtime.LogPath) + op.fail(err, vmLogAttrs(vm)...) + return + } + op.done(vmLogAttrs(vm)...) + }() + op.stage("preflight") + vmCreateStage(ctx, "preflight", "checking host prerequisites") + if err := d.validateStartPrereqs(ctx, vm, image); err != nil { + return model.VMRecord{}, err + } + if err := os.MkdirAll(vm.Runtime.VMDir, 0o755); err != nil { + return model.VMRecord{}, err + } + op.stage("cleanup_runtime") + if err := d.cleanupRuntime(ctx, vm, true); err != nil { + return model.VMRecord{}, err + } + clearRuntimeHandles(&vm) + op.stage("bridge") + if err := d.ensureBridge(ctx); err != nil { + return model.VMRecord{}, err + } + op.stage("socket_dir") + if err := d.ensureSocketDir(); err != nil { + return model.VMRecord{}, err + } + + shortID := system.ShortID(vm.ID) + apiSock := filepath.Join(d.layout.RuntimeDir, "fc-"+shortID+".sock") + dmName := "fc-rootfs-" + shortID + tapName := "tap-fc-" + shortID + if strings.TrimSpace(vm.Runtime.VSockPath) == "" { + vm.Runtime.VSockPath = defaultVSockPath(d.layout.RuntimeDir, vm.ID) + } + if vm.Runtime.VSockCID == 0 { + vm.Runtime.VSockCID, err = defaultVSockCID(vm.Runtime.GuestIP) + if err != nil { + return model.VMRecord{}, err + } + } + if err := os.RemoveAll(apiSock); err != nil && !os.IsNotExist(err) { + return model.VMRecord{}, err + } + if err := os.RemoveAll(vm.Runtime.VSockPath); err != nil && !os.IsNotExist(err) { + return model.VMRecord{}, err + } + + op.stage("system_overlay", "overlay_path", vm.Runtime.SystemOverlay) + vmCreateStage(ctx, "prepare_rootfs", "preparing system overlay") + if err := d.ensureSystemOverlay(ctx, &vm); err != nil { + return 
model.VMRecord{}, err + } + + op.stage("dm_snapshot", "dm_name", dmName) + vmCreateStage(ctx, "prepare_rootfs", "creating root filesystem snapshot") + handles, err := d.createDMSnapshot(ctx, image.RootfsPath, vm.Runtime.SystemOverlay, dmName) + if err != nil { + return model.VMRecord{}, err + } + vm.Runtime.BaseLoop = handles.BaseLoop + vm.Runtime.COWLoop = handles.COWLoop + vm.Runtime.DMName = handles.DMName + vm.Runtime.DMDev = handles.DMDev + vm.Runtime.APISockPath = apiSock + vm.Runtime.State = model.VMStateRunning + vm.State = model.VMStateRunning + vm.Runtime.LastError = "" + + cleanupOnErr := func(err error) (model.VMRecord, error) { + vm.State = model.VMStateError + vm.Runtime.State = model.VMStateError + vm.Runtime.LastError = err.Error() + op.stage("cleanup_after_failure", "error", err.Error()) + if cleanupErr := d.cleanupRuntime(context.Background(), vm, true); cleanupErr != nil { + err = errors.Join(err, cleanupErr) + } + clearRuntimeHandles(&vm) + _ = d.store.UpsertVM(context.Background(), vm) + return model.VMRecord{}, err + } + + op.stage("patch_root_overlay") + vmCreateStage(ctx, "prepare_rootfs", "writing guest configuration") + if err := d.patchRootOverlay(ctx, vm, image); err != nil { + return cleanupOnErr(err) + } + op.stage("prepare_host_features") + vmCreateStage(ctx, "prepare_host_features", "preparing host-side vm features") + if err := d.prepareCapabilityHosts(ctx, &vm, image); err != nil { + return cleanupOnErr(err) + } + op.stage("tap") + tap, err := d.acquireTap(ctx, tapName) + if err != nil { + return cleanupOnErr(err) + } + vm.Runtime.TapDevice = tap + op.stage("metrics_file", "metrics_path", vm.Runtime.MetricsPath) + if err := os.WriteFile(vm.Runtime.MetricsPath, nil, 0o644); err != nil { + return cleanupOnErr(err) + } + + op.stage("firecracker_binary") + fcPath, err := d.firecrackerBinary() + if err != nil { + return cleanupOnErr(err) + } + op.stage("firecracker_launch", "log_path", vm.Runtime.LogPath, "metrics_path", 
vm.Runtime.MetricsPath) + vmCreateStage(ctx, "boot_firecracker", "starting firecracker") + machineConfig := firecracker.MachineConfig{ + BinaryPath: fcPath, + VMID: vm.ID, + SocketPath: apiSock, + LogPath: vm.Runtime.LogPath, + MetricsPath: vm.Runtime.MetricsPath, + KernelImagePath: image.KernelPath, + InitrdPath: image.InitrdPath, + KernelArgs: system.BuildBootArgs(vm.Name), + Drives: []firecracker.DriveConfig{{ + ID: "rootfs", + Path: vm.Runtime.DMDev, + ReadOnly: false, + IsRoot: true, + }}, + TapDevice: tap, + VSockPath: vm.Runtime.VSockPath, + VSockCID: vm.Runtime.VSockCID, + VCPUCount: vm.Spec.VCPUCount, + MemoryMiB: vm.Spec.MemoryMiB, + Logger: d.logger, + } + d.contributeMachineConfig(&machineConfig, vm, image) + machine, err := firecracker.NewMachine(ctx, machineConfig) + if err != nil { + return cleanupOnErr(err) + } + if err := machine.Start(ctx); err != nil { + // Use a fresh context: the request ctx may already be cancelled (client + // disconnect), but we still need the PID so cleanupRuntime can kill the + // Firecracker process that was spawned before the failure. 
+ vm.Runtime.PID = d.resolveFirecrackerPID(context.Background(), machine, apiSock) + return cleanupOnErr(err) + } + vm.Runtime.PID = d.resolveFirecrackerPID(context.Background(), machine, apiSock) + op.debugStage("firecracker_started", "pid", vm.Runtime.PID) + op.stage("socket_access", "api_socket", apiSock) + if err := d.ensureSocketAccess(ctx, apiSock, "firecracker api socket"); err != nil { + return cleanupOnErr(err) + } + op.stage("vsock_access", "vsock_path", vm.Runtime.VSockPath, "vsock_cid", vm.Runtime.VSockCID) + if err := d.ensureSocketAccess(ctx, vm.Runtime.VSockPath, "firecracker vsock socket"); err != nil { + return cleanupOnErr(err) + } + vmCreateStage(ctx, "wait_vsock_agent", "waiting for guest vsock agent") + if err := waitForGuestVSockAgent(ctx, d.logger, vm.Runtime.VSockPath, vsockReadyWait); err != nil { + return cleanupOnErr(err) + } + op.stage("post_start_features") + vmCreateStage(ctx, "wait_guest_ready", "waiting for guest services") + if err := d.postStartCapabilities(ctx, vm, image); err != nil { + return cleanupOnErr(err) + } + system.TouchNow(&vm) + op.stage("persist") + vmCreateStage(ctx, "finalize", "saving vm state") + if err := d.store.UpsertVM(ctx, vm); err != nil { + return cleanupOnErr(err) + } + return vm, nil +} + +func (d *Daemon) StopVM(ctx context.Context, idOrName string) (model.VMRecord, error) { + return d.withVMLockByRef(ctx, idOrName, func(vm model.VMRecord) (model.VMRecord, error) { + return d.stopVMLocked(ctx, vm) + }) +} + +func (d *Daemon) stopVMLocked(ctx context.Context, current model.VMRecord) (vm model.VMRecord, err error) { + vm = current + op := d.beginOperation("vm.stop", "vm_ref", vm.ID) + defer func() { + if err != nil { + op.fail(err, vmLogAttrs(vm)...) + return + } + op.done(vmLogAttrs(vm)...) 
+ }() + if vm.State != model.VMStateRunning || !system.ProcessRunning(vm.Runtime.PID, vm.Runtime.APISockPath) { + op.stage("cleanup_stale_runtime") + if err := d.cleanupRuntime(ctx, vm, true); err != nil { + return model.VMRecord{}, err + } + vm.State = model.VMStateStopped + vm.Runtime.State = model.VMStateStopped + clearRuntimeHandles(&vm) + if err := d.store.UpsertVM(ctx, vm); err != nil { + return model.VMRecord{}, err + } + return vm, nil + } + op.stage("graceful_shutdown") + if err := d.sendCtrlAltDel(ctx, vm); err != nil { + return model.VMRecord{}, err + } + op.stage("wait_for_exit", "pid", vm.Runtime.PID) + if err := d.waitForExit(ctx, vm.Runtime.PID, vm.Runtime.APISockPath, gracefulShutdownWait); err != nil { + if !errors.Is(err, errWaitForExitTimeout) { + return model.VMRecord{}, err + } + op.stage("graceful_shutdown_timeout", "pid", vm.Runtime.PID) + } + op.stage("cleanup_runtime") + if err := d.cleanupRuntime(ctx, vm, true); err != nil { + return model.VMRecord{}, err + } + vm.State = model.VMStateStopped + vm.Runtime.State = model.VMStateStopped + clearRuntimeHandles(&vm) + system.TouchNow(&vm) + if err := d.store.UpsertVM(ctx, vm); err != nil { + return model.VMRecord{}, err + } + return vm, nil +} + +func (d *Daemon) KillVM(ctx context.Context, params api.VMKillParams) (model.VMRecord, error) { + return d.withVMLockByRef(ctx, params.IDOrName, func(vm model.VMRecord) (model.VMRecord, error) { + return d.killVMLocked(ctx, vm, params.Signal) + }) +} + +func (d *Daemon) killVMLocked(ctx context.Context, current model.VMRecord, signalValue string) (vm model.VMRecord, err error) { + vm = current + op := d.beginOperation("vm.kill", "vm_ref", vm.ID, "signal", signalValue) + defer func() { + if err != nil { + op.fail(err, vmLogAttrs(vm)...) + return + } + op.done(vmLogAttrs(vm)...) 
+ }() + if vm.State != model.VMStateRunning || !system.ProcessRunning(vm.Runtime.PID, vm.Runtime.APISockPath) { + op.stage("cleanup_stale_runtime") + if err := d.cleanupRuntime(ctx, vm, true); err != nil { + return model.VMRecord{}, err + } + vm.State = model.VMStateStopped + vm.Runtime.State = model.VMStateStopped + clearRuntimeHandles(&vm) + if err := d.store.UpsertVM(ctx, vm); err != nil { + return model.VMRecord{}, err + } + return vm, nil + } + + signal := strings.TrimSpace(signalValue) + if signal == "" { + signal = "TERM" + } + op.stage("send_signal", "pid", vm.Runtime.PID, "signal", signal) + if _, err := d.runner.RunSudo(ctx, "kill", "-"+signal, strconv.Itoa(vm.Runtime.PID)); err != nil { + return model.VMRecord{}, err + } + op.stage("wait_for_exit", "pid", vm.Runtime.PID) + if err := d.waitForExit(ctx, vm.Runtime.PID, vm.Runtime.APISockPath, 30*time.Second); err != nil { + if !errors.Is(err, errWaitForExitTimeout) { + return model.VMRecord{}, err + } + op.stage("signal_timeout", "pid", vm.Runtime.PID, "signal", signal) + } + op.stage("cleanup_runtime") + if err := d.cleanupRuntime(ctx, vm, true); err != nil { + return model.VMRecord{}, err + } + vm.State = model.VMStateStopped + vm.Runtime.State = model.VMStateStopped + clearRuntimeHandles(&vm) + system.TouchNow(&vm) + if err := d.store.UpsertVM(ctx, vm); err != nil { + return model.VMRecord{}, err + } + return vm, nil +} + +func (d *Daemon) RestartVM(ctx context.Context, idOrName string) (vm model.VMRecord, err error) { + op := d.beginOperation("vm.restart", "vm_ref", idOrName) + defer func() { + if err != nil { + op.fail(err, vmLogAttrs(vm)...) + return + } + op.done(vmLogAttrs(vm)...) 
+ }() + resolved, err := d.FindVM(ctx, idOrName) + if err != nil { + return model.VMRecord{}, err + } + return d.withVMLockByID(ctx, resolved.ID, func(vm model.VMRecord) (model.VMRecord, error) { + op.stage("stop") + vm, err = d.stopVMLocked(ctx, vm) + if err != nil { + return model.VMRecord{}, err + } + image, err := d.store.GetImageByID(ctx, vm.ImageID) + if err != nil { + return model.VMRecord{}, err + } + op.stage("start", vmLogAttrs(vm)...) + return d.startVMLocked(ctx, vm, image) + }) +} + +func (d *Daemon) DeleteVM(ctx context.Context, idOrName string) (model.VMRecord, error) { + return d.withVMLockByRef(ctx, idOrName, func(vm model.VMRecord) (model.VMRecord, error) { + return d.deleteVMLocked(ctx, vm) + }) +} + +func (d *Daemon) deleteVMLocked(ctx context.Context, current model.VMRecord) (vm model.VMRecord, err error) { + vm = current + op := d.beginOperation("vm.delete", "vm_ref", vm.ID) + defer func() { + if err != nil { + op.fail(err, vmLogAttrs(vm)...) + return + } + op.done(vmLogAttrs(vm)...) 
+ }() + if vm.State == model.VMStateRunning && system.ProcessRunning(vm.Runtime.PID, vm.Runtime.APISockPath) { + op.stage("kill_running_vm", "pid", vm.Runtime.PID) + _ = d.killVMProcess(ctx, vm.Runtime.PID) + } + op.stage("cleanup_runtime") + if err := d.cleanupRuntime(ctx, vm, false); err != nil { + return model.VMRecord{}, err + } + op.stage("delete_store_record") + if err := d.store.DeleteVM(ctx, vm.ID); err != nil { + return model.VMRecord{}, err + } + if vm.Runtime.VMDir != "" { + op.stage("delete_vm_dir", "vm_dir", vm.Runtime.VMDir) + if err := os.RemoveAll(vm.Runtime.VMDir); err != nil { + return model.VMRecord{}, err + } + } + return vm, nil +} diff --git a/internal/daemon/vm_set.go b/internal/daemon/vm_set.go new file mode 100644 index 0000000..5ffae29 --- /dev/null +++ b/internal/daemon/vm_set.go @@ -0,0 +1,87 @@ +package daemon + +import ( + "context" + "errors" + + "banger/internal/api" + "banger/internal/model" + "banger/internal/system" +) + +func (d *Daemon) SetVM(ctx context.Context, params api.VMSetParams) (model.VMRecord, error) { + return d.withVMLockByRef(ctx, params.IDOrName, func(vm model.VMRecord) (model.VMRecord, error) { + return d.setVMLocked(ctx, vm, params) + }) +} + +func (d *Daemon) setVMLocked(ctx context.Context, current model.VMRecord, params api.VMSetParams) (vm model.VMRecord, err error) { + vm = current + op := d.beginOperation("vm.set", "vm_ref", vm.ID) + defer func() { + if err != nil { + op.fail(err, vmLogAttrs(vm)...) + return + } + op.done(vmLogAttrs(vm)...) 
+ }() + running := vm.State == model.VMStateRunning && system.ProcessRunning(vm.Runtime.PID, vm.Runtime.APISockPath) + if params.VCPUCount != nil { + if err := validateOptionalPositiveSetting("vcpu", params.VCPUCount); err != nil { + return model.VMRecord{}, err + } + if running { + return model.VMRecord{}, errors.New("vcpu changes require the VM to be stopped") + } + op.stage("update_vcpu", "vcpu_count", *params.VCPUCount) + vm.Spec.VCPUCount = *params.VCPUCount + } + if params.MemoryMiB != nil { + if err := validateOptionalPositiveSetting("memory", params.MemoryMiB); err != nil { + return model.VMRecord{}, err + } + if running { + return model.VMRecord{}, errors.New("memory changes require the VM to be stopped") + } + op.stage("update_memory", "memory_mib", *params.MemoryMiB) + vm.Spec.MemoryMiB = *params.MemoryMiB + } + if params.WorkDiskSize != "" { + size, err := model.ParseSize(params.WorkDiskSize) + if err != nil { + return model.VMRecord{}, err + } + if running { + return model.VMRecord{}, errors.New("disk changes require the VM to be stopped") + } + if size < vm.Spec.WorkDiskSizeBytes { + return model.VMRecord{}, errors.New("disk size can only grow") + } + if size > vm.Spec.WorkDiskSizeBytes { + if exists(vm.Runtime.WorkDiskPath) { + op.stage("resize_work_disk", "from_bytes", vm.Spec.WorkDiskSizeBytes, "to_bytes", size) + if err := d.validateWorkDiskResizePrereqs(); err != nil { + return model.VMRecord{}, err + } + if err := system.ResizeExt4Image(ctx, d.runner, vm.Runtime.WorkDiskPath, size); err != nil { + return model.VMRecord{}, err + } + } + vm.Spec.WorkDiskSizeBytes = size + } + } + if params.NATEnabled != nil { + op.stage("update_nat", "nat_enabled", *params.NATEnabled) + vm.Spec.NATEnabled = *params.NATEnabled + } + if running { + if err := d.applyCapabilityConfigChanges(ctx, current, vm); err != nil { + return model.VMRecord{}, err + } + } + system.TouchNow(&vm) + if err := d.store.UpsertVM(ctx, vm); err != nil { + return model.VMRecord{}, err + } 
+ return vm, nil +} diff --git a/internal/daemon/vm_stats.go b/internal/daemon/vm_stats.go new file mode 100644 index 0000000..9d49043 --- /dev/null +++ b/internal/daemon/vm_stats.go @@ -0,0 +1,157 @@ +package daemon + +import ( + "context" + "errors" + "strings" + "time" + + "banger/internal/api" + "banger/internal/model" + "banger/internal/system" + "banger/internal/vsockagent" +) + +func (d *Daemon) GetVMStats(ctx context.Context, idOrName string) (model.VMRecord, model.VMStats, error) { + vm, err := d.withVMLockByRef(ctx, idOrName, func(vm model.VMRecord) (model.VMRecord, error) { + return d.getVMStatsLocked(ctx, vm) + }) + if err != nil { + return model.VMRecord{}, model.VMStats{}, err + } + return vm, vm.Stats, nil +} + +func (d *Daemon) HealthVM(ctx context.Context, idOrName string) (result api.VMHealthResult, err error) { + _, err = d.withVMLockByRef(ctx, idOrName, func(vm model.VMRecord) (model.VMRecord, error) { + result.Name = vm.Name + if vm.State != model.VMStateRunning || !system.ProcessRunning(vm.Runtime.PID, vm.Runtime.APISockPath) { + result.Healthy = false + return vm, nil + } + if strings.TrimSpace(vm.Runtime.VSockPath) == "" { + return model.VMRecord{}, errors.New("vm has no vsock path") + } + if vm.Runtime.VSockCID == 0 { + return model.VMRecord{}, errors.New("vm has no vsock cid") + } + if err := d.ensureSocketAccess(ctx, vm.Runtime.VSockPath, "firecracker vsock socket"); err != nil { + return model.VMRecord{}, err + } + pingCtx, cancel := context.WithTimeout(ctx, 3*time.Second) + defer cancel() + if err := vsockagent.Health(pingCtx, d.logger, vm.Runtime.VSockPath); err != nil { + return model.VMRecord{}, err + } + result.Healthy = true + return vm, nil + }) + return result, err +} + +func (d *Daemon) PingVM(ctx context.Context, idOrName string) (result api.VMPingResult, err error) { + health, err := d.HealthVM(ctx, idOrName) + if err != nil { + return api.VMPingResult{}, err + } + return api.VMPingResult{Name: health.Name, Alive: 
health.Healthy}, nil +} + +func (d *Daemon) getVMStatsLocked(ctx context.Context, vm model.VMRecord) (model.VMRecord, error) { + stats, err := d.collectStats(ctx, vm) + if err == nil { + vm.Stats = stats + vm.UpdatedAt = model.Now() + _ = d.store.UpsertVM(ctx, vm) + if d.logger != nil { + d.logger.Debug("vm stats collected", append(vmLogAttrs(vm), "rss_bytes", stats.RSSBytes, "vsz_bytes", stats.VSZBytes, "cpu_percent", stats.CPUPercent)...) + } + } + return vm, nil +} + +func (d *Daemon) pollStats(ctx context.Context) error { + vms, err := d.store.ListVMs(ctx) + if err != nil { + return err + } + for _, vm := range vms { + if err := d.withVMLockByIDErr(ctx, vm.ID, func(vm model.VMRecord) error { + if vm.State != model.VMStateRunning || !system.ProcessRunning(vm.Runtime.PID, vm.Runtime.APISockPath) { + return nil + } + stats, err := d.collectStats(ctx, vm) + if err != nil { + if d.logger != nil { + d.logger.Debug("vm stats collection failed", append(vmLogAttrs(vm), "error", err.Error())...) + } + return nil + } + vm.Stats = stats + vm.UpdatedAt = model.Now() + return d.store.UpsertVM(ctx, vm) + }); err != nil { + return err + } + } + return nil +} + +func (d *Daemon) stopStaleVMs(ctx context.Context) (err error) { + if d.config.AutoStopStaleAfter <= 0 { + return nil + } + op := d.beginOperation("vm.stop_stale") + defer func() { + if err != nil { + op.fail(err) + return + } + op.done() + }() + vms, err := d.store.ListVMs(ctx) + if err != nil { + return err + } + now := model.Now() + for _, vm := range vms { + if err := d.withVMLockByIDErr(ctx, vm.ID, func(vm model.VMRecord) error { + if vm.State != model.VMStateRunning || !system.ProcessRunning(vm.Runtime.PID, vm.Runtime.APISockPath) { + return nil + } + if now.Sub(vm.LastTouchedAt) < d.config.AutoStopStaleAfter { + return nil + } + op.stage("stopping_vm", vmLogAttrs(vm)...) 
+ _ = d.sendCtrlAltDel(ctx, vm) + _ = d.waitForExit(ctx, vm.Runtime.PID, vm.Runtime.APISockPath, 10*time.Second) + _ = d.cleanupRuntime(ctx, vm, true) + vm.State = model.VMStateStopped + vm.Runtime.State = model.VMStateStopped + clearRuntimeHandles(&vm) + vm.UpdatedAt = model.Now() + return d.store.UpsertVM(ctx, vm) + }); err != nil { + return err + } + } + return nil +} + +func (d *Daemon) collectStats(ctx context.Context, vm model.VMRecord) (model.VMStats, error) { + stats := model.VMStats{ + CollectedAt: model.Now(), + SystemOverlayBytes: system.AllocatedBytes(vm.Runtime.SystemOverlay), + WorkDiskBytes: system.AllocatedBytes(vm.Runtime.WorkDiskPath), + MetricsRaw: system.ParseMetricsFile(vm.Runtime.MetricsPath), + } + if vm.Runtime.PID > 0 && system.ProcessRunning(vm.Runtime.PID, vm.Runtime.APISockPath) { + ps, err := system.ReadProcessStats(ctx, vm.Runtime.PID) + if err == nil { + stats.CPUPercent = ps.CPUPercent + stats.RSSBytes = ps.RSSBytes + stats.VSZBytes = ps.VSZBytes + } + } + return stats, nil +}