banger/internal/daemon/concurrency_test.go
Thales Maciel 16702bd5e1
daemon split (6/n): extract wireServices + drop lazy service getters
Factor the service + capability wiring out of Daemon.Open() into
wireServices(d), an idempotent helper that constructs HostNetwork,
ImageService, WorkspaceService, and VMService from whatever
infrastructure (runner, store, config, layout, logger, closing) is
already set on d. Open() calls it once after filling the composition
root; tests that build &Daemon{...} literals call it to get a working
service graph, preinstalling stubs on the fields they want to fake.
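
A sketch of the resulting shape (the per-service constructor fields
here are illustrative assumptions, not the exact ones):

    func wireServices(d *Daemon) {
        // nil checks make the helper idempotent and let
        // preinstalled test stubs survive
        if d.net == nil {
            d.net = &HostNetwork{runner: d.runner, logger: d.logger}
        }
        if d.img == nil {
            d.img = &ImageService{layout: d.layout, store: d.store, runner: d.runner}
        }
        if d.ws == nil {
            d.ws = &WorkspaceService{layout: d.layout, store: d.store}
        }
        // VMService is wired last so stubbed peers propagate into it
        if d.vm == nil {
            d.vm = &VMService{net: d.net, img: d.img, ws: d.ws}
        }
    }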

Drops the four lazy-init getters on *Daemon — d.hostNet(),
d.imageSvc(), d.workspaceSvc(), d.vmSvc() — whose sole purpose was
keeping test literals working. Every production call site now reads
d.net / d.img / d.ws / d.vm directly; the services are guaranteed
non-nil once Open returns. No behavior change.

Mechanical: all existing `d.xxxSvc()` calls (production + tests)
rewritten to field access; each `d := &Daemon{...}` in tests gets a
trailing wireServices(d) so the literal + wiring are side-by-side.
Tests that override a pre-built service (e.g. d.img = &ImageService{
bundleFetch: stub}) now set the override before wireServices so the
replacement propagates into VMService's peer pointer.

Also nil-guards HostNetwork.stopVMDNS and d.store in Close() so
partially-initialised daemons (pre-reconcile open failure) still
tear down cleanly — same contract the old lazy getters provided.
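
Roughly (the rest of Close() is elided; only the guards are the point):

    // inside (*Daemon).Close, illustrative:
    if d.net != nil && d.net.stopVMDNS != nil {
        d.net.stopVMDNS()
    }
    if d.store != nil {
        err = d.store.Close() // error handling as before (assumed)
    }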

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-21 15:55:28 -03:00

package daemon

import (
	"context"
	"os"
	"path/filepath"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"banger/internal/api"
	"banger/internal/imagepull"
	"banger/internal/paths"
	"banger/internal/system"
)

// TestPullImageDoesNotSerialiseOnDifferentNames confirms the refactor
// actually releases imageOpsMu during the slow staging phase: two
// PullImage calls for distinct names run concurrently, with the
// "pull" half overlapping in time. Before the fix the two would have
// run strictly sequentially (one blocking the other inside
// imageOpsMu across the full OCI pull), and the maxActive >= 2
// assertion below would have failed.
func TestPullImageDoesNotSerialiseOnDifferentNames(t *testing.T) {
	if _, err := os.Stat("/usr/bin/mkfs.ext4"); err != nil {
		if _, err := os.Stat("/sbin/mkfs.ext4"); err != nil {
			t.Skip("mkfs.ext4 not available; skipping")
		}
	}
	imagesDir := t.TempDir()
	cacheDir := t.TempDir()
	kernel, initrd, modules := writeFakeKernelTriple(t)
	var (
		active       atomic.Int32
		maxActive    atomic.Int32
		enterPull    = make(chan struct{})
		startRelease = make(chan struct{})
	)
	slowPullAndFlatten := func(_ context.Context, _ string, _ string, destDir string) (imagepull.Metadata, error) {
		// Track concurrent overlap. Incrementing BEFORE the enterPull
		// send guarantees both increments land before the test closes
		// startRelease, so the first pull cannot decrement back to
		// zero before the second has been counted.
		n := active.Add(1)
		for {
			cur := maxActive.Load()
			if n <= cur || maxActive.CompareAndSwap(cur, n) {
				break
			}
		}
		// Record that we entered the pull body.
		enterPull <- struct{}{}
		// Wait for the test to unblock us AFTER both pulls have
		// entered the body.
		<-startRelease
		active.Add(-1)
		// Produce the minimal synthetic tree stubPullAndFlatten does.
		if err := os.MkdirAll(filepath.Join(destDir, "etc"), 0o755); err != nil {
			return imagepull.Metadata{}, err
		}
		if err := os.WriteFile(filepath.Join(destDir, "etc", "hello"), []byte("world"), 0o644); err != nil {
			return imagepull.Metadata{}, err
		}
		return imagepull.Metadata{Entries: map[string]imagepull.FileMeta{}}, nil
	}
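	// Bare composition root: only infrastructure fields are set here;
	// wireServices below builds the remaining services around the
	// preinstalled ImageService stub.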
	d := &Daemon{
		layout: paths.Layout{ImagesDir: imagesDir, OCICacheDir: cacheDir},
		store:  openDaemonStore(t),
		runner: system.NewRunner(),
	}
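	// The stub must land on d.img BEFORE wireServices runs, so that
	// wireServices keeps it and propagates it into VMService's peer
	// pointer rather than installing a default ImageService.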
	d.img = &ImageService{
		layout:               d.layout,
		store:                d.store,
		runner:               d.runner,
		pullAndFlatten:       slowPullAndFlatten,
		finalizePulledRootfs: stubFinalizePulledRootfs,
	}
	wireServices(d)
	mkParams := func(name string) api.ImagePullParams {
		return api.ImagePullParams{
			Ref:        "example.invalid/" + name + ":latest",
			Name:       name,
			KernelPath: kernel,
			InitrdPath: initrd,
			ModulesDir: modules,
		}
	}
	var wg sync.WaitGroup
	errs := make([]error, 2)
	for i, name := range []string{"alpha", "beta"} {
		wg.Add(1)
		go func(i int, name string) {
			defer wg.Done()
			_, err := d.img.PullImage(context.Background(), mkParams(name))
			errs[i] = err
		}(i, name)
	}
	// Wait for BOTH pulls to enter the slow body before we release
	// them. If imageOpsMu still wrapped the full flow, the second
	// pull would block on the mutex and never reach the enterPull
	// send — the timeout below would fire.
	for i := 0; i < 2; i++ {
		select {
		case <-enterPull:
		case <-time.After(3 * time.Second):
			t.Fatalf("pull %d never entered the slow body — imageOpsMu still serialises distinct pulls", i+1)
		}
	}
	close(startRelease)
	wg.Wait()
	for i, err := range errs {
		if err != nil {
			t.Fatalf("pull %d failed: %v", i+1, err)
		}
	}
	if maxActive.Load() < 2 {
		t.Fatalf("maxActive = %d, want >= 2 (pulls did not overlap)", maxActive.Load())
	}
}

// TestPullImageRejectsNameClashAtPublish confirms the publish-window
// recheck is what actually enforces name uniqueness now that the slow
// body runs unlocked. Two pulls race to the same name; one wins and
// the other errors.
func TestPullImageRejectsNameClashAtPublish(t *testing.T) {
	if _, err := os.Stat("/usr/bin/mkfs.ext4"); err != nil {
		if _, err := os.Stat("/sbin/mkfs.ext4"); err != nil {
			t.Skip("mkfs.ext4 not available; skipping")
		}
	}
	imagesDir := t.TempDir()
	cacheDir := t.TempDir()
	kernel, initrd, modules := writeFakeKernelTriple(t)
	release := make(chan struct{})
	synchronised := make(chan struct{}, 2)
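	// Stub pull body: park both pulls until the test releases them,
	// then emit the minimal synthetic rootfs tree.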
	pullAndFlatten := func(_ context.Context, _ string, _ string, destDir string) (imagepull.Metadata, error) {
		synchronised <- struct{}{}
		<-release
		if err := os.MkdirAll(filepath.Join(destDir, "etc"), 0o755); err != nil {
			return imagepull.Metadata{}, err
		}
		if err := os.WriteFile(filepath.Join(destDir, "marker"), []byte("ok"), 0o644); err != nil {
			return imagepull.Metadata{}, err
		}
		return imagepull.Metadata{Entries: map[string]imagepull.FileMeta{}}, nil
	}
	d := &Daemon{
		layout: paths.Layout{ImagesDir: imagesDir, OCICacheDir: cacheDir},
		store:  openDaemonStore(t),
		runner: system.NewRunner(),
	}
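	// Same pattern as the test above: preinstall the stubbed
	// ImageService before wireServices so its peers see it.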
	d.img = &ImageService{
		layout:               d.layout,
		store:                d.store,
		runner:               d.runner,
		pullAndFlatten:       pullAndFlatten,
		finalizePulledRootfs: stubFinalizePulledRootfs,
	}
	wireServices(d)
	params := api.ImagePullParams{
		Ref:        "example.invalid/contender:latest",
		Name:       "contender",
		KernelPath: kernel,
		InitrdPath: initrd,
		ModulesDir: modules,
	}
	var wg sync.WaitGroup
	errs := make([]error, 2)
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			_, err := d.img.PullImage(context.Background(), params)
			errs[i] = err
		}(i)
	}
	// Both workers must enter the pull body before either publishes.
	for i := 0; i < 2; i++ {
		select {
		case <-synchronised:
		case <-time.After(3 * time.Second):
			t.Fatalf("pull %d never entered the slow body", i+1)
		}
	}
	close(release)
	wg.Wait()
	wins, losses := 0, 0
	for _, err := range errs {
		if err == nil {
			wins++
		} else {
			losses++
		}
	}
	if wins != 1 || losses != 1 {
		t.Fatalf("wins=%d losses=%d, want exactly one of each (errs=%v)", wins, losses, errs)
	}
}