daemon: tighten concurrency around pulls, cleanup, and handle persistence
Four targeted fixes from a race-condition audit of the daemon package.
None change behaviour on the happy path; each closes a window where a
concurrent or interrupted RPC could strand state on the host.
- KernelDelete now holds the same per-name lock as KernelPull /
readOrAutoPullKernel. Without it, a delete racing a concurrent
pull could remove files mid-write or land between the pull's
manifest write and its first use.
- cleanupRuntime no longer early-returns on an inner waitForExit
failure; DM snapshot, capability, and tap teardown always run and
every error is folded into the error returned via errors.Join. EBUSY against
a still-alive firecracker is benign and surfaces in the joined
error rather than stranding kernel state across daemon restarts.
- Per-name image / kernel pull locks switch from *sync.Mutex to a
1-buffered chan struct{}. Acquire is a select on ctx.Done(), so a
peer waiting behind a pull whose RPC was cancelled can bail out
instead of blocking forever on a pull nobody is consuming.
- setVMHandles writes the per-VM scratch file before updating the
in-memory cache. A daemon crash between the two now leaves disk
ahead of memory (recoverable: reconcile re-seeds the cache from
the file on next start) rather than memory ahead of disk (lost
handles → stranded DM/loops/tap).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
777b597a1e
commit
c4e1cb5953
6 changed files with 99 additions and 39 deletions
|
|
@ -39,15 +39,20 @@ type ImageService struct {
|
|||
imageOpsMu sync.Mutex
|
||||
|
||||
// kernelPullLocksMu guards the kernelPullLocks map itself. Per-name
|
||||
// mutexes inside the map serialise concurrent pulls of the same
|
||||
// kernel ref. Without this, two parallel `vm run` callers that
|
||||
// auto-pull the same kernel race on
|
||||
// channel locks inside the map serialise concurrent pulls of the
|
||||
// same kernel ref. Without this, two parallel `vm run` callers
|
||||
// that auto-pull the same kernel race on
|
||||
// /var/lib/banger/kernels/<name>/manifest.json: one is mid-write
|
||||
// from kernelcat.Fetch's WriteLocal while the other is reading it
|
||||
// back, yielding "unexpected end of JSON input". The map keeps
|
||||
// pulls of *different* kernels parallel.
|
||||
//
|
||||
// chan struct{} (cap 1) instead of sync.Mutex: acquire is a
|
||||
// `select` that respects ctx.Done(), so a peer waiting behind a
|
||||
// pull whose RPC was cancelled can bail out instead of blocking
|
||||
// forever on a pull that nobody is consuming.
|
||||
kernelPullLocksMu sync.Mutex
|
||||
kernelPullLocks map[string]*sync.Mutex
|
||||
kernelPullLocks map[string]chan struct{}
|
||||
|
||||
// imagePullLocksMu / imagePullLocks: same per-name pattern for
|
||||
// image auto-pulls. Without this, parallel `vm.create` callers
|
||||
|
|
@ -59,7 +64,7 @@ type ImageService struct {
|
|||
// per image name; peers see the freshly-published image on the
|
||||
// post-lock recheck.
|
||||
imagePullLocksMu sync.Mutex
|
||||
imagePullLocks map[string]*sync.Mutex
|
||||
imagePullLocks map[string]chan struct{}
|
||||
|
||||
// Test seams; nil → real implementation.
|
||||
pullAndFlatten func(ctx context.Context, ref, cacheDir, destDir string) (imagepull.Metadata, error)
|
||||
|
|
@ -96,39 +101,61 @@ func newImageService(deps imageServiceDeps) *ImageService {
|
|||
}
|
||||
}
|
||||
|
||||
// kernelPullLock returns the per-name mutex used to serialise kernel
|
||||
// pulls of `name`. The map entry is created on first access and lives
|
||||
// for the daemon's lifetime — kernels rarely churn and keeping the
|
||||
// entry around saves the allocation and the second-acquire path stays
|
||||
// branchless. Callers Lock() / Unlock() the returned mutex directly.
|
||||
func (s *ImageService) kernelPullLock(name string) *sync.Mutex {
|
||||
// acquireKernelPullLock blocks until the per-name lock for `name` is
|
||||
// free or ctx is cancelled. On success returns a release func that
|
||||
// the caller must invoke (typically via defer). On ctx cancellation
|
||||
// returns ctx.Err() and a nil release. The map entry is created on
|
||||
// first access and lives for the daemon's lifetime — kernels rarely
|
||||
// churn and keeping the entry around keeps the second-acquire path
|
||||
// free of a fresh channel allocation.
|
||||
func (s *ImageService) acquireKernelPullLock(ctx context.Context, name string) (func(), error) {
|
||||
ch := s.kernelPullLockChan(name)
|
||||
select {
|
||||
case ch <- struct{}{}:
|
||||
return func() { <-ch }, nil
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *ImageService) kernelPullLockChan(name string) chan struct{} {
|
||||
s.kernelPullLocksMu.Lock()
|
||||
defer s.kernelPullLocksMu.Unlock()
|
||||
if s.kernelPullLocks == nil {
|
||||
s.kernelPullLocks = make(map[string]*sync.Mutex)
|
||||
s.kernelPullLocks = make(map[string]chan struct{})
|
||||
}
|
||||
m, ok := s.kernelPullLocks[name]
|
||||
ch, ok := s.kernelPullLocks[name]
|
||||
if !ok {
|
||||
m = &sync.Mutex{}
|
||||
s.kernelPullLocks[name] = m
|
||||
ch = make(chan struct{}, 1)
|
||||
s.kernelPullLocks[name] = ch
|
||||
}
|
||||
return m
|
||||
return ch
|
||||
}
|
||||
|
||||
// imagePullLock is the image-name peer of kernelPullLock; same lifetime
|
||||
// and zero-allocation properties on the second-acquire path.
|
||||
func (s *ImageService) imagePullLock(name string) *sync.Mutex {
|
||||
// acquireImagePullLock is the image-name peer of acquireKernelPullLock;
|
||||
// same semantics and lifetime.
|
||||
func (s *ImageService) acquireImagePullLock(ctx context.Context, name string) (func(), error) {
|
||||
ch := s.imagePullLockChan(name)
|
||||
select {
|
||||
case ch <- struct{}{}:
|
||||
return func() { <-ch }, nil
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *ImageService) imagePullLockChan(name string) chan struct{} {
|
||||
s.imagePullLocksMu.Lock()
|
||||
defer s.imagePullLocksMu.Unlock()
|
||||
if s.imagePullLocks == nil {
|
||||
s.imagePullLocks = make(map[string]*sync.Mutex)
|
||||
s.imagePullLocks = make(map[string]chan struct{})
|
||||
}
|
||||
m, ok := s.imagePullLocks[name]
|
||||
ch, ok := s.imagePullLocks[name]
|
||||
if !ok {
|
||||
m = &sync.Mutex{}
|
||||
s.imagePullLocks[name] = m
|
||||
ch = make(chan struct{}, 1)
|
||||
s.imagePullLocks[name] = ch
|
||||
}
|
||||
return m
|
||||
return ch
|
||||
}
|
||||
|
||||
// FindImage is the service-owned lookup helper. It falls back from
|
||||
|
|
|
|||
|
|
@ -299,9 +299,11 @@ func (s *ImageService) readOrAutoPullKernel(ctx context.Context, kernelRef strin
|
|||
return kernelcat.Entry{}, fmt.Errorf("kernel %q not found in catalog; run 'banger kernel list --available' to browse", kernelRef)
|
||||
}
|
||||
|
||||
lock := s.kernelPullLock(kernelRef)
|
||||
lock.Lock()
|
||||
defer lock.Unlock()
|
||||
release, err := s.acquireKernelPullLock(ctx, kernelRef)
|
||||
if err != nil {
|
||||
return kernelcat.Entry{}, err
|
||||
}
|
||||
defer release()
|
||||
if entry, err := kernelcat.ReadLocal(s.layout.KernelsDir, kernelRef); err == nil {
|
||||
return entry, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -34,10 +34,19 @@ func (s *ImageService) KernelShow(_ context.Context, name string) (api.KernelEnt
|
|||
return kernelEntryToAPI(entry), nil
|
||||
}
|
||||
|
||||
func (s *ImageService) KernelDelete(_ context.Context, name string) error {
|
||||
func (s *ImageService) KernelDelete(ctx context.Context, name string) error {
|
||||
if err := kernelcat.ValidateName(name); err != nil {
|
||||
return err
|
||||
}
|
||||
// Hold the same per-name lock KernelPull / readOrAutoPullKernel
|
||||
// take. Without it, a delete racing a concurrent pull can land
|
||||
// between the pull's manifest write and the entry's first use,
|
||||
// or remove files the pull is still writing.
|
||||
release, err := s.acquireKernelPullLock(ctx, name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer release()
|
||||
return kernelcat.DeleteLocal(s.layout.KernelsDir, name)
|
||||
}
|
||||
|
||||
|
|
@ -129,9 +138,11 @@ func (s *ImageService) KernelPull(ctx context.Context, params api.KernelPullPara
|
|||
return api.KernelEntry{}, err
|
||||
}
|
||||
|
||||
lock := s.kernelPullLock(name)
|
||||
lock.Lock()
|
||||
defer lock.Unlock()
|
||||
release, err := s.acquireKernelPullLock(ctx, name)
|
||||
if err != nil {
|
||||
return api.KernelEntry{}, err
|
||||
}
|
||||
defer release()
|
||||
|
||||
if !params.Force {
|
||||
if _, err := kernelcat.ReadLocal(s.layout.KernelsDir, name); err == nil {
|
||||
|
|
|
|||
|
|
@ -99,6 +99,15 @@ func teardownHandlesForCleanup(vm model.VMRecord, live model.VMHandles) model.VM
|
|||
// because it reaches into handles (VMService-owned); the capability
|
||||
// teardown goes through the capHooks seam to keep Daemon out of the
|
||||
// dependency chain.
|
||||
//
|
||||
// Idempotency contract: every step runs even when an earlier step
|
||||
// fails, and the per-step errors are joined into the returned value.
|
||||
// A waitForExit timeout (firecracker refused to die) used to early-
|
||||
// return, leaving DM/feature/tap state stranded on the host across
|
||||
// daemon restarts. With collect-and-continue the kernel teardowns
|
||||
// are still attempted; in the worst case (firecracker actually still alive)
|
||||
// they fail with EBUSY which is also surfaced via errors.Join — no
|
||||
// damage, but the operator sees the full picture.
|
||||
func (s *VMService) cleanupRuntime(ctx context.Context, vm model.VMRecord, preserveDisks bool) error {
|
||||
if s.logger != nil {
|
||||
s.logger.Debug("cleanup runtime", append(vmLogAttrs(vm), "preserve_disks", preserveDisks)...)
|
||||
|
|
@ -110,10 +119,12 @@ func (s *VMService) cleanupRuntime(ctx context.Context, vm model.VMRecord, prese
|
|||
cleanupPID = pid
|
||||
}
|
||||
}
|
||||
var waitErr error
|
||||
if cleanupPID > 0 && system.ProcessRunning(cleanupPID, vm.Runtime.APISockPath) {
|
||||
_ = s.net.killVMProcess(ctx, cleanupPID)
|
||||
if err := s.net.waitForExit(ctx, cleanupPID, vm.Runtime.APISockPath, 30*time.Second); err != nil {
|
||||
return err
|
||||
waitErr = s.net.waitForExit(ctx, cleanupPID, vm.Runtime.APISockPath, 30*time.Second)
|
||||
if waitErr != nil && s.logger != nil {
|
||||
s.logger.Warn("cleanup wait_for_exit failed; continuing teardown", append(vmLogAttrs(vm), "pid", cleanupPID, "error", waitErr.Error())...)
|
||||
}
|
||||
}
|
||||
handles := teardownHandlesForCleanup(vm, h)
|
||||
|
|
@ -143,9 +154,9 @@ func (s *VMService) cleanupRuntime(ctx context.Context, vm model.VMRecord, prese
|
|||
// when the caller forgets to call clearVMHandles explicitly.
|
||||
s.clearVMHandles(vm)
|
||||
if !preserveDisks && vm.Runtime.VMDir != "" {
|
||||
return errors.Join(snapshotErr, featureErr, tapErr, os.RemoveAll(vm.Runtime.VMDir))
|
||||
return errors.Join(waitErr, snapshotErr, featureErr, tapErr, os.RemoveAll(vm.Runtime.VMDir))
|
||||
}
|
||||
return errors.Join(snapshotErr, featureErr, tapErr)
|
||||
return errors.Join(waitErr, snapshotErr, featureErr, tapErr)
|
||||
}
|
||||
|
||||
func (s *VMService) generateName(ctx context.Context) (string, error) {
|
||||
|
|
|
|||
|
|
@ -209,9 +209,11 @@ func (s *VMService) findOrAutoPullImage(ctx context.Context, idOrName string) (m
|
|||
return model.Image{}, err
|
||||
}
|
||||
|
||||
lock := s.img.imagePullLock(entry.Name)
|
||||
lock.Lock()
|
||||
defer lock.Unlock()
|
||||
release, err := s.img.acquireImagePullLock(ctx, entry.Name)
|
||||
if err != nil {
|
||||
return model.Image{}, err
|
||||
}
|
||||
defer release()
|
||||
if image, err := s.img.FindImage(ctx, idOrName); err == nil {
|
||||
return image, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -138,16 +138,23 @@ func (s *VMService) vmHandles(vmID string) model.VMHandles {
|
|||
// fields onto VMRuntime, and writes the per-VM scratch file.
|
||||
// Scratch-file errors are logged but not returned; the cache remains
|
||||
// authoritative while the daemon is alive.
|
||||
//
|
||||
// Write order: file first, cache second. A daemon crash between the
|
||||
// two leaves the on-disk scratch file ahead of the in-memory cache —
|
||||
// which is the recoverable direction, since reconcile re-seeds the
|
||||
// cache from the file on the next start. The reverse order would let
|
||||
// a crash strand handles the daemon already saw as live but never
|
||||
// persisted, breaking the next-start teardown of DM/loops/tap.
|
||||
func (s *VMService) setVMHandles(vm *model.VMRecord, h model.VMHandles) {
|
||||
if s == nil || vm == nil {
|
||||
return
|
||||
}
|
||||
persistRuntimeTeardownState(vm, h)
|
||||
s.ensureHandleCache()
|
||||
s.handles.set(vm.ID, h)
|
||||
if err := writeHandlesFile(vm.Runtime.VMDir, h); err != nil && s.logger != nil {
|
||||
s.logger.Warn("persist handles.json failed", "vm_id", vm.ID, "error", err.Error())
|
||||
}
|
||||
s.handles.set(vm.ID, h)
|
||||
}
|
||||
|
||||
// clearVMHandles drops the cache entry and removes the scratch
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue