daemon: serialise concurrent image/kernel pulls + atomic-rename seed refresh
Three concurrency bugs surfaced by `make smoke JOBS=4`, all stemming from `vm.create` paths assuming single-caller semantics:

1. **Kernel auto-pull manifest race.** Parallel `vm.create` calls that each need to auto-pull the same kernel ref both run kernelcat.Fetch against the same /var/lib/banger/kernels/<name>/. Fetch writes manifest.json non-atomically (truncate + write); the peer reads it back mid-write and trips "parse manifest for X: unexpected end of JSON input". Fix: a per-name `sync.Mutex` map on `ImageService` (kernelPullLocks). `KernelPull` and `readOrAutoPullKernel` both acquire it and re-check `kernelcat.ReadLocal` after the lock, so a peer that finished while we waited is treated as success. `readOrAutoPullKernel` deliberately does NOT call `s.KernelPull`, because that path errors with "already pulled" on a peer success, which would be wrong for auto-pull. Different kernels stay parallel.

2. **Image auto-pull race.** Same shape as the kernel race, on the image side: parallel `vm.create` calls both run pullFromBundle / pullFromOCI for the missing image (each ~minutes of OCI fetch + ext4 build). The publishImage atom under imageOpsMu only protects the rename + UpsertImage commit, so the loser does all the work only to fail at the recheck with "image already exists". Fix: a per-name `sync.Mutex` map on `ImageService` (imagePullLocks). `findOrAutoPullImage` acquires it, re-checks FindImage, and only then calls PullImage. The loser short-circuits with the freshly published image instead of redoing minutes of work. PullImage's own publishImage recheck stays as defense in depth for callers that bypass the auto-pull path.

3. **Work-seed refresh race.** When the host's SSH key has rotated since an image was last refreshed, `ensureAuthorizedKeyOnWorkDisk` triggers `refreshManagedWorkSeedFingerprint`, which rewrote the shared work-seed.ext4 in place via e2rm + e2cp. Peer `vm.create` calls doing parallel `MaterializeWorkDisk` rdumps observed a torn ext4 image — "Superblock checksum does not match superblock". Fix: stage the rewrite on a sibling tmpfile (`<seed>.refresh.<pid>-<ns>.tmp`) and atomic-rename. Concurrent readers either have the file open (the kernel keeps the pre-rename inode alive) or open after the rename (and see the new inode) — they never observe a partial state. Two parallel refreshes are idempotent (same daemon, same SSH key), so unique tmp names are enough; whichever rename lands last wins, with identical content. UpsertImage runs after the rename so the recorded fingerprint always matches what's on disk.

Plus one smoke-harness fix: reclassify `vm_prune` from `pure` to `global`. `vm prune -f` removes ALL stopped VMs system-wide, not just the ones the scenario created, so a parallel peer scenario whose VM happens to be in `created`/`stopped` at that moment gets it wiped. Moving prune to the post-pool serial phase keeps it from racing with in-flight scenarios.

After all four fixes, `make smoke JOBS=4` passes 21/21 in 174s (serial baseline 141s; the small overhead is the buffered-output and `wait -n` semaphore cost — well worth the parallelism for fast-iteration work on a 32-core box).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
parent 115eec8576
commit 72882e45d7
6 changed files with 162 additions and 13 deletions
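As context for bug 1, here is a standalone sketch of the torn-manifest read (illustrative only, not part of this commit; the path and payload size are made up). os.WriteFile opens with O_TRUNC before writing, so a concurrent reader can observe an empty or partial file and fail to parse it as JSON:

package main

import (
	"encoding/json"
	"fmt"
	"os"
)

func main() {
	const path = "manifest.json" // hypothetical stand-in for the kernelcat manifest
	payload := map[string]string{"kernel": string(make([]byte, 1<<18))} // big enough for a write window
	done := make(chan struct{})
	go func() {
		defer close(done)
		for i := 0; i < 200; i++ {
			data, _ := json.Marshal(payload)
			_ = os.WriteFile(path, data, 0o644) // truncate + write: not atomic
		}
	}()
	for {
		select {
		case <-done:
			return
		default:
		}
		raw, err := os.ReadFile(path)
		if err != nil {
			continue
		}
		var m map[string]string
		if err := json.Unmarshal(raw, &m); err != nil {
			fmt.Println("torn read:", err) // typically "unexpected end of JSON input"
			return
		}
	}
}

Run long enough, the reader prints the same "unexpected end of JSON input" the smoke run tripped on; serialising writer and reader behind the per-name lock closes the window.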
@@ -3,10 +3,13 @@ package daemon
 import (
 	"context"
 	"fmt"
+	"os"
 	"strings"
+	"time"
 
 	"banger/internal/guest"
 	"banger/internal/model"
+	"banger/internal/system"
 )
 
 func (s *ImageService) seedAuthorizedKeyOnExt4Image(ctx context.Context, imagePath string) (string, error) {
@@ -27,17 +30,58 @@ func (s *ImageService) seedAuthorizedKeyOnExt4Image(ctx context.Context, imagePa
 	return fingerprint, nil
 }
 
+// refreshManagedWorkSeedFingerprint re-seeds work-seed.ext4 with the
+// daemon's current SSH key when a previously-stored fingerprint has
+// gone stale (host key rotated, image rebuilt without a new seed).
+//
+// This path is reachable from concurrent vm.create RPCs: each one
+// reads the same stale image.SeededSSHPublicKeyFingerprint from the
+// store and races into here. Modifying the seed in place via
+// e2rm/e2cp is not concurrent-read-safe — peer vm.create calls doing
+// `MaterializeWorkDisk` in parallel `RdumpExt4Dir` the seed and
+// observe a torn ext4 image ("Superblock checksum does not match").
+//
+// Fix: stage the rewrite on a sibling tmpfile and atomic-rename. A
+// concurrent reader either has the file open (kernel keeps the
+// pre-rename inode alive) or opens after the rename (sees the new
+// inode) — never observes a partial state. Two concurrent refreshes
+// are idempotent (same daemon, same SSH key) so unique tmp suffixes
+// are enough; whichever rename lands last wins, with identical
+// content. UpsertImage runs after the rename so the recorded
+// fingerprint always matches what's actually on disk for any reader
+// that picks up the image record after this point.
 func (s *ImageService) refreshManagedWorkSeedFingerprint(ctx context.Context, image model.Image, fingerprint string) error {
 	if !image.Managed || strings.TrimSpace(image.WorkSeedPath) == "" || strings.TrimSpace(fingerprint) == "" {
 		return nil
 	}
-	seededFingerprint, err := s.seedAuthorizedKeyOnExt4Image(ctx, image.WorkSeedPath)
+
+	// Unique sibling tmp path: same dir guarantees a same-FS rename.
+	// Two concurrent refreshes get distinct paths so they don't clobber
+	// each other's tmpfile mid-write.
+	tmpPath := fmt.Sprintf("%s.refresh.%d-%d.tmp", image.WorkSeedPath, os.Getpid(), time.Now().UnixNano())
+	if err := system.CopyFilePreferClone(image.WorkSeedPath, tmpPath); err != nil {
+		return fmt.Errorf("stage seed for refresh: %w", err)
+	}
+	committed := false
+	defer func() {
+		if !committed {
+			_ = os.Remove(tmpPath)
+		}
+	}()
+
+	seededFingerprint, err := s.seedAuthorizedKeyOnExt4Image(ctx, tmpPath)
	if err != nil {
 		return err
 	}
 	if seededFingerprint == "" || seededFingerprint == image.SeededSSHPublicKeyFingerprint {
 		return nil
 	}
+
+	if err := os.Rename(tmpPath, image.WorkSeedPath); err != nil {
+		return fmt.Errorf("commit seed refresh: %w", err)
+	}
+	committed = true
+
 	image.SeededSSHPublicKeyFingerprint = seededFingerprint
 	image.UpdatedAt = model.Now()
 	return s.store.UpsertImage(ctx, image)
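A minimal demonstration of the rename semantics the refresh path relies on (illustrative only, not part of this commit; file names are made up). On POSIX filesystems, a descriptor opened before the rename keeps reading the old inode, while any open after the rename sees the new one, so no reader can observe a half-written seed:

package main

import (
	"fmt"
	"os"
)

func main() {
	_ = os.WriteFile("seed.img", []byte("old"), 0o644)

	f, _ := os.Open("seed.img") // a reader "mid-materialise"
	defer f.Close()

	// Stage the new content on a sibling tmpfile, then rename over the
	// original. Rename within one filesystem atomically swaps the name
	// to the new inode.
	_ = os.WriteFile("seed.img.tmp", []byte("new"), 0o644)
	_ = os.Rename("seed.img.tmp", "seed.img")

	held := make([]byte, 3)
	_, _ = f.Read(held) // the open descriptor still sees the old inode
	fresh, _ := os.ReadFile("seed.img")
	fmt.Printf("held fd: %s, fresh open: %s\n", held, fresh) // held fd: old, fresh open: new
}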
@@ -38,6 +38,29 @@ type ImageService struct {
 	// internal/daemon/ARCHITECTURE.md.
 	imageOpsMu sync.Mutex
 
+	// kernelPullLocksMu guards the kernelPullLocks map itself. Per-name
+	// mutexes inside the map serialise concurrent pulls of the same
+	// kernel ref. Without this, two parallel `vm run` callers that
+	// auto-pull the same kernel race on
+	// /var/lib/banger/kernels/<name>/manifest.json: one is mid-write
+	// from kernelcat.Fetch's WriteLocal while the other is reading it
+	// back, yielding "unexpected end of JSON input". The map keeps
+	// pulls of *different* kernels parallel.
+	kernelPullLocksMu sync.Mutex
+	kernelPullLocks   map[string]*sync.Mutex
+
+	// imagePullLocksMu / imagePullLocks: same per-name pattern for
+	// image auto-pulls. Without this, parallel `vm.create` callers
+	// resolving a missing image both run the full OCI fetch + ext4
+	// build (each ~minutes), and the loser hits the "image already
+	// exists" recheck inside publishImage and fails after doing all
+	// the work for nothing. Locking around the FindImage-recheck +
+	// PullImage section means only one caller does the heavy work
+	// per image name; peers see the freshly-published image on the
+	// post-lock recheck.
+	imagePullLocksMu sync.Mutex
+	imagePullLocks   map[string]*sync.Mutex
+
 	// Test seams; nil → real implementation.
 	pullAndFlatten       func(ctx context.Context, ref, cacheDir, destDir string) (imagepull.Metadata, error)
 	finalizePulledRootfs func(ctx context.Context, ext4File string, meta imagepull.Metadata) error
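The two mu-plus-map pairs above are intentionally parallel; if a third pull kind ever shows up, they could be factored into a small keyed-lock type. A hypothetical sketch of that factoring (not in this commit):

package main

import "sync"

// keyedLocks is a hypothetical factoring of the kernelPullLocks /
// imagePullLocks pairs: one guard mutex protecting a lazily-built map
// of per-key mutexes. Entries live forever, same as in the commit.
type keyedLocks struct {
	mu    sync.Mutex
	locks map[string]*sync.Mutex
}

func (k *keyedLocks) get(name string) *sync.Mutex {
	k.mu.Lock()
	defer k.mu.Unlock()
	if k.locks == nil {
		k.locks = make(map[string]*sync.Mutex)
	}
	m, ok := k.locks[name]
	if !ok {
		m = &sync.Mutex{}
		k.locks[name] = m
	}
	return m
}

func main() {
	var kl keyedLocks
	l := kl.get("alpine") // the same name always yields the same mutex
	l.Lock()
	defer l.Unlock()
}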
@@ -73,6 +96,41 @@ func newImageService(deps imageServiceDeps) *ImageService {
 	}
 }
 
+// kernelPullLock returns the per-name mutex used to serialise kernel
+// pulls of `name`. The map entry is created on first access and lives
+// for the daemon's lifetime — kernels rarely churn and keeping the
+// entry around saves the allocation and the second-acquire path stays
+// branchless. Callers Lock() / Unlock() the returned mutex directly.
+func (s *ImageService) kernelPullLock(name string) *sync.Mutex {
+	s.kernelPullLocksMu.Lock()
+	defer s.kernelPullLocksMu.Unlock()
+	if s.kernelPullLocks == nil {
+		s.kernelPullLocks = make(map[string]*sync.Mutex)
+	}
+	m, ok := s.kernelPullLocks[name]
+	if !ok {
+		m = &sync.Mutex{}
+		s.kernelPullLocks[name] = m
+	}
+	return m
+}
+
+// imagePullLock is the image-name peer of kernelPullLock; same lifetime
+// and zero-allocation properties on the second-acquire path.
+func (s *ImageService) imagePullLock(name string) *sync.Mutex {
+	s.imagePullLocksMu.Lock()
+	defer s.imagePullLocksMu.Unlock()
+	if s.imagePullLocks == nil {
+		s.imagePullLocks = make(map[string]*sync.Mutex)
+	}
+	m, ok := s.imagePullLocks[name]
+	if !ok {
+		m = &sync.Mutex{}
+		s.imagePullLocks[name] = m
+	}
+	return m
+}
+
 // FindImage is the service-owned lookup helper. It falls back from
 // exact-name → exact-id → prefix match, matching the historical
 // daemon.FindImage behaviour. Kept on ImageService because image
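A design note on the helpers above: golang.org/x/sync/singleflight is the stock alternative for collapsing duplicate concurrent work per key. A sketch of that shape (not what this commit uses; names are made up):

package main

import (
	"fmt"
	"sync"
	"time"

	"golang.org/x/sync/singleflight"
)

var group singleflight.Group

// fetchOnce collapses concurrent fetches of the same name: the first
// caller runs the function, everyone else blocks and shares its result
// and error.
func fetchOnce(name string) (string, error) {
	v, err, _ := group.Do(name, func() (any, error) {
		time.Sleep(100 * time.Millisecond) // stand-in for the real pull
		return "pulled " + name, nil
	})
	if err != nil {
		return "", err
	}
	return v.(string), nil
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			r, _ := fetchOnce("6.6-minimal") // hypothetical kernel name
			fmt.Println(r)
		}()
	}
	wg.Wait() // all four print the same shared result; the pull ran once
}

The trade-off: singleflight hands one result and one error to every waiter, which is wrong for KernelPull's "already pulled" contract, whereas a held mutex lets each waiter run its own post-lock recheck and decide what a peer's success means.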
@@ -276,24 +276,40 @@ func (s *ImageService) resolveKernelInputs(ctx context.Context, kernelRef, kerne
 // readOrAutoPullKernel tries the local kernelcat first; on miss, checks
 // the embedded catalog and auto-pulls the bundle.
+//
+// Concurrency-safe: takes the same per-name pull lock as KernelPull and
+// re-checks ReadLocal after acquiring it. If a peer finished the pull
+// while we were waiting, the re-check returns the freshly-pulled entry
+// — we explicitly do NOT call s.KernelPull from here because that path
+// errors with "already pulled" on a successful peer-pull. Auto-pull's
+// contract is "make sure this kernel is local"; "someone beat me to it"
+// is success, not failure.
 func (s *ImageService) readOrAutoPullKernel(ctx context.Context, kernelRef string) (kernelcat.Entry, error) {
-	entry, err := kernelcat.ReadLocal(s.layout.KernelsDir, kernelRef)
-	if err == nil {
+	if entry, err := kernelcat.ReadLocal(s.layout.KernelsDir, kernelRef); err == nil {
 		return entry, nil
-	}
-	if !os.IsNotExist(err) {
+	} else if !os.IsNotExist(err) {
 		return kernelcat.Entry{}, fmt.Errorf("resolve kernel %q: %w", kernelRef, err)
 	}
 	catalog, loadErr := kernelcat.LoadEmbedded()
 	if loadErr != nil {
 		return kernelcat.Entry{}, fmt.Errorf("kernel %q not found locally: %w", kernelRef, loadErr)
 	}
-	if _, lookupErr := catalog.Lookup(kernelRef); lookupErr != nil {
+	catEntry, lookupErr := catalog.Lookup(kernelRef)
+	if lookupErr != nil {
 		return kernelcat.Entry{}, fmt.Errorf("kernel %q not found in catalog; run 'banger kernel list --available' to browse", kernelRef)
 	}
-	vmCreateStage(ctx, "auto_pull_kernel", fmt.Sprintf("pulling kernel %s from catalog", kernelRef))
-	if _, pullErr := s.KernelPull(ctx, api.KernelPullParams{Name: kernelRef}); pullErr != nil {
-		return kernelcat.Entry{}, fmt.Errorf("auto-pull kernel %q: %w", kernelRef, pullErr)
+	lock := s.kernelPullLock(kernelRef)
+	lock.Lock()
+	defer lock.Unlock()
+	if entry, err := kernelcat.ReadLocal(s.layout.KernelsDir, kernelRef); err == nil {
+		return entry, nil
 	}
-	return kernelcat.ReadLocal(s.layout.KernelsDir, kernelRef)
+
+	vmCreateStage(ctx, "auto_pull_kernel", fmt.Sprintf("pulling kernel %s from catalog", kernelRef))
+	stored, err := kernelcat.Fetch(ctx, nil, s.layout.KernelsDir, catEntry)
+	if err != nil {
+		return kernelcat.Entry{}, fmt.Errorf("auto-pull kernel %q: %w", kernelRef, err)
+	}
+	return stored, nil
 }
@@ -116,12 +116,23 @@ func (s *ImageService) KernelImport(ctx context.Context, params api.KernelImport
 // KernelPull downloads a catalog entry by name into the local catalog. It
 // refuses to overwrite an existing entry unless params.Force is set.
+//
+// Held under a per-name mutex so concurrent callers (the auto-pull
+// path inside vm.create, parallel `banger kernel pull` invocations,
+// or a mix) can't tear each other's manifest.json or extracted
+// tarball. Lock first, then re-check the local catalog: a peer that
+// already finished the pull while we waited produces the same
+// "already pulled" error a fully-serial run would.
 func (s *ImageService) KernelPull(ctx context.Context, params api.KernelPullParams) (api.KernelEntry, error) {
 	name := strings.TrimSpace(params.Name)
 	if err := kernelcat.ValidateName(name); err != nil {
 		return api.KernelEntry{}, err
 	}
+
+	lock := s.kernelPullLock(name)
+	lock.Lock()
+	defer lock.Unlock()
+
 	if !params.Force {
 		if _, err := kernelcat.ReadLocal(s.layout.KernelsDir, name); err == nil {
 			return api.KernelEntry{}, fmt.Errorf("kernel %q already pulled; pass --force to re-pull", name)
@@ -182,20 +182,40 @@ func (s *VMService) reserveVM(ctx context.Context, requestedName string, image m
 // catalog, it auto-pulls the bundle so `vm create --image foo` (and
 // therefore `vm run`) works on a fresh host without the user having
 // to run `image pull` first.
+//
+// Concurrency: parallel vm.create RPCs targeting the same missing
+// image must not both run the full OCI fetch + ext4 build. The pull
+// itself takes minutes, and the publishImage atom that closes it
+// only protects the rename + upsert — by the time the second caller
+// gets there, it has already done all the work, only to fail at the
+// recheck with "image already exists". Hold a per-name pull lock
+// around the recheck-and-pull section: the loser waits, sees the
+// image already published on the post-lock recheck, and short-
+// circuits with a FindImage. PullImage's own internal recheck stays
+// in place as defense-in-depth for callers that bypass this path.
 func (s *VMService) findOrAutoPullImage(ctx context.Context, idOrName string) (model.Image, error) {
-	image, err := s.img.FindImage(ctx, idOrName)
-	if err == nil {
+	if image, err := s.img.FindImage(ctx, idOrName); err == nil {
 		return image, nil
 	}
 	catalog, loadErr := imagecat.LoadEmbedded()
 	if loadErr != nil {
+		_, err := s.img.FindImage(ctx, idOrName)
 		return model.Image{}, err
 	}
 	entry, lookupErr := catalog.Lookup(idOrName)
 	if lookupErr != nil {
 		// Not in the catalog either — surface the original not-found.
+		_, err := s.img.FindImage(ctx, idOrName)
 		return model.Image{}, err
 	}
+
+	lock := s.img.imagePullLock(entry.Name)
+	lock.Lock()
+	defer lock.Unlock()
+	if image, err := s.img.FindImage(ctx, idOrName); err == nil {
+		return image, nil
+	}
+
 	vmCreateStage(ctx, "auto_pull_image", fmt.Sprintf("pulling %s from image catalog", entry.Name))
 	if _, pullErr := s.img.PullImage(ctx, api.ImagePullParams{Ref: entry.Name}); pullErr != nil {
 		return model.Image{}, fmt.Errorf("auto-pull image %q: %w", entry.Name, pullErr)
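Distilled, the shape all three fixed call sites share looks like the hypothetical helper below (not part of this commit): take the per-name lock, re-check whether a peer already finished the work, and only then do the expensive thing yourself.

package main

import (
	"fmt"
	"sync"
)

// withNameLock is a hypothetical distillation of the commit's pattern:
// lock, re-check, then work. The recheck runs under the lock, so a
// waiter that lost the race sees the winner's published result.
func withNameLock[T any](mu *sync.Mutex, recheck func() (T, bool), work func() (T, error)) (T, error) {
	mu.Lock()
	defer mu.Unlock()
	if v, ok := recheck(); ok {
		return v, nil // a peer finished while we waited: success, not failure
	}
	return work()
}

func main() {
	var mu sync.Mutex
	cache := map[string]string{}
	v, err := withNameLock(&mu,
		func() (string, bool) { v, ok := cache["alpine"]; return v, ok },
		func() (string, error) { cache["alpine"] = "pulled"; return "pulled", nil },
	)
	fmt.Println(v, err)
}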
@@ -118,7 +118,7 @@ declare -A SMOKE_CLASS=(
   [vm_set]=pure
   [vm_restart]=pure
   [vm_kill]=pure
-  [vm_prune]=pure
+  [vm_prune]=global
   [vm_ports]=pure
   [workspace_full_copy]=repodir
   [workspace_basecommit]=repodir