banger/internal/daemon/kernels.go
Thales Maciel 72882e45d7
daemon: serialise concurrent image/kernel pulls + atomic-rename seed refresh
Three concurrency bugs surfaced by `make smoke JOBS=4` that all stem
from `vm.create` paths assuming single-caller semantics:

1. **Kernel auto-pull manifest race.** Parallel `vm.create` calls that
   each need to auto-pull the same kernel ref both run kernelcat.Fetch
   in parallel against the same /var/lib/banger/kernels/<name>/. Fetch
   writes manifest.json non-atomically (truncate + write); the peer
   reads it back mid-write and trips
   "parse manifest for X: unexpected end of JSON input".

   Fix: per-name `sync.Mutex` map on `ImageService` (kernelPullLock).
   `KernelPull` and `readOrAutoPullKernel` both acquire it and re-check
   `kernelcat.ReadLocal` after the lock so a peer who finished while we
   waited is treated as success — `readOrAutoPullKernel` does NOT call
   `s.KernelPull` because that path errors with "already pulled" on a
   peer-success, which would be wrong for auto-pull. Different kernels
   stay parallel.

2. **Image auto-pull race.** Same shape as the kernel race but on the
   image side: parallel `vm.create` calls both run pullFromBundle /
   pullFromOCI for the missing image (each ~minutes of OCI fetch +
   ext4 build). The publishImage atom under imageOpsMu only protects
   the rename + UpsertImage commit, so the loser does all the work
   only to fail at the recheck with "image already exists".

   Fix: per-name `sync.Mutex` map on `ImageService` (imagePullLock).
   `findOrAutoPullImage` acquires it, re-checks FindImage, and only
   then calls PullImage. Loser short-circuits with the
   freshly-published image instead of redoing minutes of work.
   PullImage's own publishImage recheck stays as defense-in-depth
   for callers that bypass the auto-pull path.

3. **Work-seed refresh race.** When the host's SSH key has rotated
   since an image was last refreshed, `ensureAuthorizedKeyOnWorkDisk`
   triggers `refreshManagedWorkSeedFingerprint`, which rewrote the
   shared work-seed.ext4 in place via e2rm + e2cp. Peer `vm.create`
   calls doing parallel `MaterializeWorkDisk` rdumps observed a torn
   ext4 image — "Superblock checksum does not match superblock".

   Fix: stage the rewrite on a sibling tmpfile (`<seed>.refresh.<pid>-<ns>.tmp`)
   and atomic-rename. Concurrent readers either have the file open
   (kernel keeps the pre-rename inode alive) or open after the rename
   (see the new inode) — never observe a partial state. Two parallel
   refreshes are idempotent (same daemon, same SSH key) so unique tmp
   names are enough; whichever rename lands last wins, with identical
   content. UpsertImage runs after the rename so the recorded
   fingerprint always matches what's on disk.

Plus one smoke harness fix: reclassify `vm_prune` from `pure` to
`global`. `vm prune -f` removes ALL stopped VMs system-wide, not just
the ones the scenario created — so a parallel peer scenario that
happens to have its VM in `created`/`stopped` momentarily gets wiped.
Moving prune to the post-pool serial phase keeps it from racing with
in-flight scenarios.

After all four fixes, `make smoke JOBS=4` passes 21/21 in 174s
(serial baseline 141s; the small overhead is the buffered-output and
`wait -n` semaphore cost — well worth the parallelism for fast-iter
work on a 32-core box).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 17:24:11 -03:00

232 lines
7.6 KiB
Go

package daemon
import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
"strings"
"time"
"banger/internal/api"
"banger/internal/kernelcat"
"banger/internal/system"
)
// KernelList returns every entry kernelcat.ListLocal finds under the
// service's kernels directory, each converted to its API shape. A listing
// failure is returned as-is alongside an empty result.
func (s *ImageService) KernelList(_ context.Context) (api.KernelListResult, error) {
	local, err := kernelcat.ListLocal(s.layout.KernelsDir)
	if err != nil {
		return api.KernelListResult{}, err
	}

	// Sized up front so the slice is non-nil even when nothing is installed.
	converted := make([]api.KernelEntry, len(local))
	for i := range local {
		converted[i] = kernelEntryToAPI(local[i])
	}
	return api.KernelListResult{Entries: converted}, nil
}
// KernelShow reads the local catalog entry called name and returns it in
// API form. Read failures are passed through kernelNotFoundIfMissing, which
// turns a missing entry into a "kernel ... not found" error.
func (s *ImageService) KernelShow(_ context.Context, name string) (api.KernelEntry, error) {
	found, readErr := kernelcat.ReadLocal(s.layout.KernelsDir, name)
	if readErr == nil {
		return kernelEntryToAPI(found), nil
	}
	return api.KernelEntry{}, kernelNotFoundIfMissing(name, readErr)
}
// KernelDelete validates name and then asks kernelcat.DeleteLocal to remove
// that entry from the service's kernels directory. A validation failure is
// returned before anything on disk is touched.
func (s *ImageService) KernelDelete(_ context.Context, name string) error {
	invalid := kernelcat.ValidateName(name)
	if invalid != nil {
		return invalid
	}
	return kernelcat.DeleteLocal(s.layout.KernelsDir, name)
}
// KernelImport copies the kernel / initrd / modules artifacts produced by
// scripts/make-*-kernel.sh (under params.FromDir) into the local catalog
// under params.Name and writes the manifest. It is the primary bridge from
// "I built a kernel with the helper scripts" to "banger kernel list shows
// it and image register --kernel-ref works."
//
// Steps, in order:
//  1. Trim and validate params.Name; require a non-empty params.FromDir.
//  2. Discover the artifact paths under FromDir.
//  3. Clear any existing catalog entry with the same name (imports
//     overwrite by default) and recreate the entry directory.
//  4. Copy the kernel to "vmlinux", the initrd (if discovered) to
//     "initrd.img", and the modules tree (if discovered) to "modules".
//  5. Checksum the copied kernel, write the manifest, and read it back so
//     the returned entry reflects what is stored.
//
// If any step after the prior entry has been cleared fails, the partially
// populated entry directory is removed again (best effort) so a failed
// import does not leave a manifest-less directory in the catalog.
func (s *ImageService) KernelImport(ctx context.Context, params api.KernelImportParams) (api.KernelEntry, error) {
	name := strings.TrimSpace(params.Name)
	if err := kernelcat.ValidateName(name); err != nil {
		return api.KernelEntry{}, err
	}
	fromDir := strings.TrimSpace(params.FromDir)
	if fromDir == "" {
		return api.KernelEntry{}, errors.New("--from <dir> is required")
	}
	discovered, err := kernelcat.DiscoverPaths(fromDir)
	if err != nil {
		return api.KernelEntry{}, fmt.Errorf("discover artifacts under %s: %w", fromDir, err)
	}

	targetDir := kernelcat.EntryDir(s.layout.KernelsDir, name)
	// Overwrite-by-default: clear any prior entry so a re-import is clean.
	if err := kernelcat.DeleteLocal(s.layout.KernelsDir, name); err != nil {
		return api.KernelEntry{}, fmt.Errorf("clear prior catalog entry %q: %w", name, err)
	}

	// From here on the old entry is gone. Until the import completes, any
	// early return must take the half-written directory with it.
	imported := false
	defer func() {
		if !imported {
			// Best-effort cleanup: the original failure is what the
			// caller needs to see, so a removal error is dropped.
			_ = os.RemoveAll(targetDir)
		}
	}()

	if err := os.MkdirAll(targetDir, 0o755); err != nil {
		return api.KernelEntry{}, err
	}

	kernelTarget := filepath.Join(targetDir, "vmlinux")
	if err := system.CopyFilePreferClone(discovered.KernelPath, kernelTarget); err != nil {
		return api.KernelEntry{}, fmt.Errorf("copy kernel: %w", err)
	}
	if discovered.InitrdPath != "" {
		initrdTarget := filepath.Join(targetDir, "initrd.img")
		if err := system.CopyFilePreferClone(discovered.InitrdPath, initrdTarget); err != nil {
			return api.KernelEntry{}, fmt.Errorf("copy initrd: %w", err)
		}
	}
	if discovered.ModulesDir != "" {
		modulesTarget := filepath.Join(targetDir, "modules")
		if err := os.MkdirAll(modulesTarget, 0o755); err != nil {
			return api.KernelEntry{}, err
		}
		// NOTE(review): the trailing false is an option of
		// system.CopyDirContents not visible from this file — confirm
		// its meaning against that helper.
		if err := system.CopyDirContents(ctx, s.runner, discovered.ModulesDir, modulesTarget, false); err != nil {
			return api.KernelEntry{}, fmt.Errorf("copy modules: %w", err)
		}
	}

	// Checksum the copy that now lives in the catalog, not the source.
	sum, err := kernelcat.SumFile(kernelTarget)
	if err != nil {
		return api.KernelEntry{}, fmt.Errorf("sha256 kernel: %w", err)
	}
	entry := kernelcat.Entry{
		Name:          name,
		Distro:        strings.TrimSpace(params.Distro),
		Arch:          strings.TrimSpace(params.Arch),
		KernelVersion: inferKernelVersion(discovered.KernelPath, discovered.ModulesDir),
		SHA256:        sum,
		Source:        "import:" + fromDir,
		ImportedAt:    time.Now().UTC(),
	}
	if err := kernelcat.WriteLocal(s.layout.KernelsDir, entry); err != nil {
		return api.KernelEntry{}, fmt.Errorf("write manifest: %w", err)
	}

	// Read the entry back so the response carries whatever ReadLocal
	// fills in beyond the fields set above.
	stored, err := kernelcat.ReadLocal(s.layout.KernelsDir, name)
	if err != nil {
		return api.KernelEntry{}, err
	}
	imported = true
	return kernelEntryToAPI(stored), nil
}
// KernelPull downloads a catalog entry by name into the local catalog. It
// refuses to overwrite an existing entry unless params.Force is set.
//
// Held under a per-name mutex so concurrent callers (the auto-pull
// path inside vm.create, parallel `banger kernel pull` invocations,
// or a mix) can't tear each other's manifest.json or extracted
// tarball. Lock first, then re-check the local catalog: a peer that
// already finished the pull while we waited produces the same
// "already pulled" error a fully-serial run would.
//
// The "is it already here?" probe treats any error matching
// os.ErrNotExist — including one wrapped with additional context — as
// "not pulled yet" and proceeds; every other read error is returned as-is.
func (s *ImageService) KernelPull(ctx context.Context, params api.KernelPullParams) (api.KernelEntry, error) {
	name := strings.TrimSpace(params.Name)
	if err := kernelcat.ValidateName(name); err != nil {
		return api.KernelEntry{}, err
	}

	lock := s.kernelPullLock(name)
	lock.Lock()
	defer lock.Unlock()

	if !params.Force {
		_, err := kernelcat.ReadLocal(s.layout.KernelsDir, name)
		switch {
		case err == nil:
			return api.KernelEntry{}, fmt.Errorf("kernel %q already pulled; pass --force to re-pull", name)
		case !errors.Is(err, os.ErrNotExist):
			// errors.Is (unlike os.IsNotExist) sees through %w wrapping,
			// so only genuinely unexpected read failures land here.
			return api.KernelEntry{}, err
		}
	}

	catalog, err := kernelcat.LoadEmbedded()
	if err != nil {
		return api.KernelEntry{}, err
	}
	catEntry, err := catalog.Lookup(name)
	if err != nil {
		return api.KernelEntry{}, fmt.Errorf("kernel %q not in catalog (run 'banger kernel list --available' to browse)", name)
	}

	// NOTE(review): the nil second argument is passed through unchanged;
	// presumably it selects Fetch's default client — confirm in kernelcat.
	stored, err := kernelcat.Fetch(ctx, nil, s.layout.KernelsDir, catEntry)
	if err != nil {
		return api.KernelEntry{}, err
	}
	return kernelEntryToAPI(stored), nil
}
// KernelCatalog lists every entry of the embedded catalog and flags each
// one whose name also appears in the local catalog as Pulled.
func (s *ImageService) KernelCatalog(_ context.Context) (api.KernelCatalogResult, error) {
	embedded, err := kernelcat.LoadEmbedded()
	if err != nil {
		return api.KernelCatalogResult{}, err
	}

	// The ListLocal error is intentionally discarded: the catalog is still
	// returned, with Pulled derived from whatever entries came back.
	// NOTE(review): presumably that is none on failure — confirm in kernelcat.
	installed, _ := kernelcat.ListLocal(s.layout.KernelsDir)
	have := make(map[string]struct{}, len(installed))
	for _, localEntry := range installed {
		have[localEntry.Name] = struct{}{}
	}

	rows := make([]api.KernelCatalogEntry, 0, len(embedded.Entries))
	for _, src := range embedded.Entries {
		_, isPulled := have[src.Name]
		row := api.KernelCatalogEntry{
			Name:          src.Name,
			Distro:        src.Distro,
			Arch:          src.Arch,
			KernelVersion: src.KernelVersion,
			SizeBytes:     src.SizeBytes,
			Description:   src.Description,
			Pulled:        isPulled,
		}
		rows = append(rows, row)
	}
	return api.KernelCatalogResult{Entries: rows}, nil
}
// inferKernelVersion makes a best-effort guess at a kernel version string.
//
// The modules directory is consulted first: when modulesDir is non-empty
// and its basename is a real name (not "." or the path separator), that
// basename is the answer. Otherwise the kernel file's basename is checked
// for a "vmlinux-" or "vmlinuz-" prefix (e.g. "vmlinux-6.12.79_1") and the
// remainder after the prefix is returned. Returns "" if neither source
// yields anything.
func inferKernelVersion(kernelPath, modulesDir string) string {
	if modulesDir != "" {
		switch dirName := filepath.Base(modulesDir); dirName {
		case ".", string(filepath.Separator):
			// No usable directory name; try the kernel filename instead.
		default:
			return dirName
		}
	}

	fileName := filepath.Base(kernelPath)
	for _, prefix := range []string{"vmlinux-", "vmlinuz-"} {
		if !strings.HasPrefix(fileName, prefix) {
			continue
		}
		return fileName[len(prefix):]
	}
	return ""
}
// kernelEntryToAPI converts a catalog entry into its API counterpart.
// Fields are copied one-to-one except ImportedAt, which is rendered as an
// RFC 3339 UTC string, or left empty when the entry's timestamp is zero.
func kernelEntryToAPI(entry kernelcat.Entry) api.KernelEntry {
	out := api.KernelEntry{
		Name:          entry.Name,
		Distro:        entry.Distro,
		Arch:          entry.Arch,
		KernelVersion: entry.KernelVersion,
		SHA256:        entry.SHA256,
		Source:        entry.Source,
		KernelPath:    entry.KernelPath,
		InitrdPath:    entry.InitrdPath,
		ModulesDir:    entry.ModulesDir,
	}
	if stamp := entry.ImportedAt; !stamp.IsZero() {
		out.ImportedAt = stamp.UTC().Format(time.RFC3339)
	}
	return out
}
// kernelNotFoundIfMissing maps a "does not exist" failure from a catalog
// lookup onto a user-facing `kernel "<name>" not found` error. A nil err
// stays nil and every other error is returned unchanged.
//
// errors.Is is used rather than os.IsNotExist so that a not-exist error
// carrying extra context (wrapped via fmt.Errorf with %w) is still
// recognised; os.IsNotExist does not unwrap such errors.
func kernelNotFoundIfMissing(name string, err error) error {
	switch {
	case err == nil:
		return nil
	case errors.Is(err, os.ErrNotExist):
		return fmt.Errorf("kernel %q not found", name)
	default:
		return err
	}
}