daemon: shrink createVMMu + imageOpsMu to reservation/publication windows
Before: createVMMu was held across the whole of CreateVM — including
image resolution (which could fire a full auto-pull) and startVMLocked
(boot of multiple seconds). imageOpsMu was held across the whole of
PullImage/RegisterImage/PromoteImage/DeleteImage, so any slow OCI pull,
bundle download, or file copy blocked every other image mutation and
every other VM create that needed to auto-pull. The async create API
bought nothing if all creates serialised on the same mutex.
CreateVM is now three phases:
1. Validate + resolve image (possibly auto-pulling). No global lock.
2. reserveVM: take createVMMu only long enough to re-check the name
is free, allocate the next guest IP, and UpsertVM the "created"
row. Milliseconds.
3. startVMLocked: run the full boot flow under the per-VM lock only.
Parallel creates of different VMs now overlap on image resolution +
boot; they contend only across the reservation claim.
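The three-phase shape is a general pattern: hold the global mutex only for the check-and-claim window, never for the slow work on either side. A minimal Go sketch of that pattern (the `registry`/`create` names are illustrative, not the daemon's actual types):

```go
package main

import (
	"fmt"
	"sync"
)

// registry stands in for the daemon's store: a set of claimed VM
// names guarded by a single reservation mutex.
type registry struct {
	mu    sync.Mutex
	names map[string]bool
}

// reserve is the only global critical section: re-check the name and
// claim it. This is the millisecond-scale window the commit describes.
func (r *registry) reserve(name string) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.names[name] {
		return fmt.Errorf("vm name already exists: %s", name)
	}
	r.names[name] = true
	return nil
}

// create runs the three phases; only phase 2 touches the global lock.
func create(r *registry, name string) error {
	// Phase 1: resolve the image (slow, no global lock held).
	// Phase 2: reserve the name (fast, under the global mutex).
	if err := r.reserve(name); err != nil {
		return err
	}
	// Phase 3: boot (slow, under a per-VM lock only).
	return nil
}

func main() {
	r := &registry{names: map[string]bool{}}
	fmt.Println(create(r, "alpha"))
	fmt.Println(create(r, "alpha"))
}
```

Two concurrent `create` calls with different names only contend inside `reserve`; two calls with the same name are resolved there, with exactly one winner.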
For the image surface a new publishImage helper isolates the commit
atom (recheck name free, atomic rename stagingDir→finalDir, UpsertImage)
under imageOpsMu. pullFromBundle + pullFromOCI do their network fetch
+ ext4 build + ownership fixup + agent injection outside the lock;
Register moves validation + kernel resolution outside; Promote moves
file copy + SSH-key seeding outside; Delete keeps a brief lock over
the lookup + reference check + store delete and does file cleanup
unlocked.
Two concurrency tests assert the new behaviour:
- TestPullImageDoesNotSerialiseOnDifferentNames fails the old code
(second pull blocks on imageOpsMu and never reaches the body).
- TestPullImageRejectsNameClashAtPublish confirms the publish-window
recheck is what enforces name uniqueness now that the body runs
unlocked — exactly one winner.
ARCHITECTURE.md updated to describe the new scope explicitly instead
of calling the locks "narrow".
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent afe91e805a
commit 99d0811097
5 changed files with 390 additions and 95 deletions
@@ -31,10 +31,23 @@ owning types:
   reconstruct the cache and verify processes against `/proc` via
   pgrep. Nothing in the durable `vms` SQLite row describes transient
   kernel state. See `internal/daemon/vm_handles.go`.
-- `createVMMu sync.Mutex` — serialises `CreateVM` (guards name uniqueness
-  + guest IP allocation window).
-- `imageOpsMu sync.Mutex` — serialises image-registry mutations
-  (`PullImage`, `RegisterImage`, `PromoteImage`, `DeleteImage`).
+- `createVMMu sync.Mutex` — narrow **reservation** mutex. `CreateVM`
+  resolves the image (possibly auto-pulling, which self-locks on
+  `imageOpsMu`) and parses sizing flags outside this lock, then holds
+  `createVMMu` only to re-check that the requested VM name is still
+  free, allocate the next guest IP, and insert the initial "created"
+  row. The subsequent boot flow runs under the per-VM lock only.
+  Parallel `vm create` calls therefore overlap on image resolution and
+  boot; they contend only across the millisecond-scale name+IP claim.
+- `imageOpsMu sync.Mutex` — narrow **publication** mutex. `PullImage`
+  (both bundle and OCI paths), `RegisterImage`, `PromoteImage`, and
+  `DeleteImage` do their slow work (network fetch, ext4 build,
+  ownership fixup, file copy, SSH-key seeding) without this lock and
+  acquire it only for the commit atom: recheck name free, atomic
+  rename of the staging dir to its final home, upsert the store row.
+  Two pulls for different images run fully in parallel; two pulls that
+  race to the same name are resolved at the recheck — the loser fails
+  fast and its staging dir is cleaned up.
 - `createOps opstate.Registry[*vmCreateOperationState]` — in-flight VM
   create operations; owns its own lock.
 - `tapPool tapPool` — TAP interface pool; owns its own lock.
@@ -93,8 +106,13 @@ Notes:
 - `vmLocks[id]` is the outer lock for any operation scoped to a single VM.
   Acquired via `withVMLockByID` / `withVMLockByRef`. The callback runs
   under the lock — treat the whole function body as critical section.
-- `createVMMu` and `imageOpsMu` are narrow: each guards one family of
-  mutations and is released before any blocking guest I/O.
+- `createVMMu` is held only across the VM-name reservation + IP
+  allocation + initial UpsertVM. Image resolution and the full boot
+  flow happen outside it.
+- `imageOpsMu` is held only across the publication atom (recheck name
+  + atomic rename + UpsertImage, or the equivalent for Register /
+  Promote / Delete). Network fetch, ext4 build, and file copies run
+  unlocked.
 - Holding a subsystem-local lock while calling into guest SSH is
   discouraged; copy needed state out under the lock and release before
   blocking I/O.

internal/daemon/concurrency_test.go (new file, 196 lines)
@@ -0,0 +1,196 @@
package daemon

import (
	"context"
	"os"
	"path/filepath"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"banger/internal/api"
	"banger/internal/imagepull"
	"banger/internal/paths"
	"banger/internal/system"
)

// TestPullImageDoesNotSerialiseOnDifferentNames confirms the refactor
// actually releases imageOpsMu during the slow staging phase: two
// PullImage calls for distinct names run concurrently, with the
// "pull" half overlapping in time. Before the fix the two would have
// run strictly sequentially (one blocking the other inside
// imageOpsMu across the full OCI pull), and the maxActive >= 2
// assertion below would fail.
func TestPullImageDoesNotSerialiseOnDifferentNames(t *testing.T) {
	if _, err := os.Stat("/usr/bin/mkfs.ext4"); err != nil {
		if _, err := os.Stat("/sbin/mkfs.ext4"); err != nil {
			t.Skip("mkfs.ext4 not available; skipping")
		}
	}
	imagesDir := t.TempDir()
	cacheDir := t.TempDir()
	kernel, initrd, modules := writeFakeKernelTriple(t)

	var (
		active       atomic.Int32
		maxActive    atomic.Int32
		enterPull    = make(chan struct{})
		startRelease = make(chan struct{})
	)

	slowPullAndFlatten := func(_ context.Context, _ string, _ string, destDir string) (imagepull.Metadata, error) {
		// Track concurrent overlap BEFORE signalling entry, so that
		// once both entry signals have been received, both increments
		// have happened and maxActive is guaranteed to have observed 2.
		// (Incrementing after the signal would let the first worker
		// decrement before the second ever incremented.)
		n := active.Add(1)
		for {
			cur := maxActive.Load()
			if n <= cur || maxActive.CompareAndSwap(cur, n) {
				break
			}
		}
		// Record that we entered the pull body.
		enterPull <- struct{}{}
		// Wait for the test to unblock us AFTER both pulls have
		// entered the body.
		<-startRelease
		active.Add(-1)
		// Produce the minimal synthetic tree stubPullAndFlatten does.
		if err := os.MkdirAll(filepath.Join(destDir, "etc"), 0o755); err != nil {
			return imagepull.Metadata{}, err
		}
		if err := os.WriteFile(filepath.Join(destDir, "etc", "hello"), []byte("world"), 0o644); err != nil {
			return imagepull.Metadata{}, err
		}
		return imagepull.Metadata{Entries: map[string]imagepull.FileMeta{}}, nil
	}

	d := &Daemon{
		layout:               paths.Layout{ImagesDir: imagesDir, OCICacheDir: cacheDir},
		store:                openDaemonStore(t),
		runner:               system.NewRunner(),
		pullAndFlatten:       slowPullAndFlatten,
		finalizePulledRootfs: stubFinalizePulledRootfs,
	}

	mkParams := func(name string) api.ImagePullParams {
		return api.ImagePullParams{
			Ref:        "example.invalid/" + name + ":latest",
			Name:       name,
			KernelPath: kernel,
			InitrdPath: initrd,
			ModulesDir: modules,
		}
	}

	var wg sync.WaitGroup
	errs := make([]error, 2)
	for i, name := range []string{"alpha", "beta"} {
		wg.Add(1)
		go func(i int, name string) {
			defer wg.Done()
			_, err := d.PullImage(context.Background(), mkParams(name))
			errs[i] = err
		}(i, name)
	}

	// Wait for BOTH pulls to enter the slow body before we release
	// them. If imageOpsMu still wrapped the full flow, the second
	// pull would block on the mutex and never reach the enterPull
	// send — the timeout below would fire.
	for i := 0; i < 2; i++ {
		select {
		case <-enterPull:
		case <-time.After(3 * time.Second):
			t.Fatalf("pull %d never entered the slow body — imageOpsMu still serialises distinct pulls", i+1)
		}
	}
	close(startRelease)
	wg.Wait()

	for i, err := range errs {
		if err != nil {
			t.Fatalf("pull %d failed: %v", i+1, err)
		}
	}
	if maxActive.Load() < 2 {
		t.Fatalf("maxActive = %d, want >= 2 (pulls did not overlap)", maxActive.Load())
	}
}
// TestPullImageRejectsNameClashAtPublish confirms the publish-window
// recheck is what actually enforces name uniqueness now that the slow
// body runs unlocked. Two pulls race to the same name; one wins and
// the other errors.
func TestPullImageRejectsNameClashAtPublish(t *testing.T) {
	if _, err := os.Stat("/usr/bin/mkfs.ext4"); err != nil {
		if _, err := os.Stat("/sbin/mkfs.ext4"); err != nil {
			t.Skip("mkfs.ext4 not available; skipping")
		}
	}
	imagesDir := t.TempDir()
	cacheDir := t.TempDir()
	kernel, initrd, modules := writeFakeKernelTriple(t)

	release := make(chan struct{})
	synchronised := make(chan struct{}, 2)
	pullAndFlatten := func(_ context.Context, _ string, _ string, destDir string) (imagepull.Metadata, error) {
		synchronised <- struct{}{}
		<-release
		if err := os.MkdirAll(filepath.Join(destDir, "etc"), 0o755); err != nil {
			return imagepull.Metadata{}, err
		}
		if err := os.WriteFile(filepath.Join(destDir, "marker"), []byte("ok"), 0o644); err != nil {
			return imagepull.Metadata{}, err
		}
		return imagepull.Metadata{Entries: map[string]imagepull.FileMeta{}}, nil
	}

	d := &Daemon{
		layout:               paths.Layout{ImagesDir: imagesDir, OCICacheDir: cacheDir},
		store:                openDaemonStore(t),
		runner:               system.NewRunner(),
		pullAndFlatten:       pullAndFlatten,
		finalizePulledRootfs: stubFinalizePulledRootfs,
	}

	params := api.ImagePullParams{
		Ref:        "example.invalid/contender:latest",
		Name:       "contender",
		KernelPath: kernel,
		InitrdPath: initrd,
		ModulesDir: modules,
	}

	var wg sync.WaitGroup
	errs := make([]error, 2)
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			_, err := d.PullImage(context.Background(), params)
			errs[i] = err
		}(i)
	}
	// Both workers must enter the pull body before either publishes.
	for i := 0; i < 2; i++ {
		select {
		case <-synchronised:
		case <-time.After(3 * time.Second):
			t.Fatalf("pull %d never entered the slow body", i+1)
		}
	}
	close(release)
	wg.Wait()

	wins, losses := 0, 0
	for _, err := range errs {
		if err == nil {
			wins++
		} else {
			losses++
		}
	}
	if wins != 1 || losses != 1 {
		t.Fatalf("wins=%d losses=%d, want exactly one of each (errs=%v)", wins, losses, errs)
	}
}
@@ -16,10 +16,11 @@ import (
 	"banger/internal/system"
 )
 
+// RegisterImage creates or updates an unmanaged image row. Path
+// validation + kernel resolution run without imageOpsMu — only the
+// lookup-then-upsert atom is held under the lock so concurrent
+// registers of the same name don't race.
 func (d *Daemon) RegisterImage(ctx context.Context, params api.ImageRegisterParams) (image model.Image, err error) {
-	d.imageOpsMu.Lock()
-	defer d.imageOpsMu.Unlock()
-
 	name := strings.TrimSpace(params.Name)
 	if name == "" {
 		return model.Image{}, fmt.Errorf("image name is required")
@@ -47,6 +48,9 @@ func (d *Daemon) RegisterImage(ctx context.Context, params api.ImageRegisterPara
 		return model.Image{}, err
 	}
 
+	d.imageOpsMu.Lock()
+	defer d.imageOpsMu.Unlock()
+
 	now := model.Now()
 	existing, lookupErr := d.store.GetImageByName(ctx, name)
 	switch {
@@ -90,10 +94,12 @@ func (d *Daemon) RegisterImage(ctx context.Context, params api.ImageRegisterPara
 	return image, nil
 }
 
+// PromoteImage copies an unmanaged image's files into the managed
+// artifacts dir and flips its managed bit. The expensive file copy,
+// SSH-key seeding, and boot-artifact staging all happen outside
+// imageOpsMu — only the find/rename/upsert commit atom holds the
+// lock.
 func (d *Daemon) PromoteImage(ctx context.Context, idOrName string) (image model.Image, err error) {
-	d.imageOpsMu.Lock()
-	defer d.imageOpsMu.Unlock()
-
 	op := d.beginOperation("image.promote")
 	defer func() {
 		if err != nil {
@@ -173,12 +179,6 @@ func (d *Daemon) PromoteImage(ctx context.Context, idOrName string) (image model
 		return model.Image{}, err
 	}
 
-	op.stage("activate_artifacts", "artifact_dir", artifactDir)
-	if err := os.Rename(stageDir, artifactDir); err != nil {
-		return model.Image{}, err
-	}
-	cleanupStage = false
-
 	image.Managed = true
 	image.ArtifactDir = artifactDir
 	image.RootfsPath = filepath.Join(artifactDir, "rootfs.ext4")
@@ -189,6 +189,14 @@ func (d *Daemon) PromoteImage(ctx context.Context, idOrName string) (image model
 	image.InitrdPath = imagemgr.StageOptionalArtifactPath(artifactDir, initrdPath, "initrd.img")
 	image.ModulesDir = imagemgr.StageOptionalArtifactPath(artifactDir, modulesDir, "modules")
 	image.UpdatedAt = model.Now()
+
+	op.stage("activate_artifacts", "artifact_dir", artifactDir)
+	d.imageOpsMu.Lock()
+	defer d.imageOpsMu.Unlock()
+	if err := os.Rename(stageDir, artifactDir); err != nil {
+		return model.Image{}, err
+	}
+	cleanupStage = false
 	if err := d.store.UpsertImage(ctx, image); err != nil {
 		_ = os.RemoveAll(artifactDir)
 		return model.Image{}, err
@@ -196,24 +204,33 @@ func (d *Daemon) PromoteImage(ctx context.Context, idOrName string) (image model
 	return image, nil
 }
 
+// DeleteImage runs the lookup + reference check + store delete under
+// imageOpsMu so a concurrent CreateVM can't slip an image_id reference
+// in between the check and the delete. File cleanup happens after the
+// lock is released — the store row is the authoritative handle.
 func (d *Daemon) DeleteImage(ctx context.Context, idOrName string) (model.Image, error) {
-	d.imageOpsMu.Lock()
-	defer d.imageOpsMu.Unlock()
-
-	image, err := d.FindImage(ctx, idOrName)
+	image, err := func() (model.Image, error) {
+		d.imageOpsMu.Lock()
+		defer d.imageOpsMu.Unlock()
+		img, err := d.FindImage(ctx, idOrName)
+		if err != nil {
+			return model.Image{}, err
+		}
+		vms, err := d.store.FindVMsUsingImage(ctx, img.ID)
+		if err != nil {
+			return model.Image{}, err
+		}
+		if len(vms) > 0 {
+			return model.Image{}, fmt.Errorf("image %s is still referenced by %d VM(s)", img.Name, len(vms))
+		}
+		if err := d.store.DeleteImage(ctx, img.ID); err != nil {
+			return model.Image{}, err
+		}
+		return img, nil
+	}()
 	if err != nil {
 		return model.Image{}, err
 	}
-	vms, err := d.store.FindVMsUsingImage(ctx, image.ID)
-	if err != nil {
-		return model.Image{}, err
-	}
-	if len(vms) > 0 {
-		return model.Image{}, fmt.Errorf("image %s is still referenced by %d VM(s)", image.Name, len(vms))
-	}
-	if err := d.store.DeleteImage(ctx, image.ID); err != nil {
-		return model.Image{}, err
-	}
 	if image.Managed && image.ArtifactDir != "" {
 		if err := os.RemoveAll(image.ArtifactDir); err != nil {
 			return model.Image{}, err
@@ -36,10 +36,15 @@ const minPullExt4Size int64 = 1 << 30 // 1 GiB
 //
 // Kernel info falls back through: `params.KernelRef` → catalog entry's
 // `kernel_ref` (bundle path only) → `params.Kernel/Initrd/ModulesDir`.
+//
+// Concurrency: the slow staging work (network fetch, ext4 build,
+// ownership fixup, guest-agent injection) runs WITHOUT imageOpsMu so
+// parallel pulls of different images interleave. imageOpsMu is taken
+// only for the publish window — recheck name is free, rename the
+// staging dir to the final artifact dir, insert the store row. If two
+// pulls race to the same name, the loser fails fast at the recheck
+// and its staging dir is cleaned up via defer.
 func (d *Daemon) PullImage(ctx context.Context, params api.ImagePullParams) (model.Image, error) {
-	d.imageOpsMu.Lock()
-	defer d.imageOpsMu.Unlock()
-
 	ref := strings.TrimSpace(params.Ref)
 	if ref == "" {
 		return model.Image{}, errors.New("reference is required")
@@ -55,6 +60,38 @@ func (d *Daemon) PullImage(ctx context.Context, params api.ImagePullParams) (mod
 	return d.pullFromOCI(ctx, params)
 }
 
+// publishImage is the narrow critical section shared by every image-
+// creation path (pull bundle/OCI, register, promote). It re-verifies
+// that `image.Name` is still free, atomically renames the staging
+// directory to its final home (when applicable), and persists the row.
+// The caller owns stagingDir cleanup on failure via its own defer; on
+// success, publishImage unsets it so the defer is a no-op.
+//
+// finalDir == "" means "already published" (the caller built artifacts
+// in place, e.g. RegisterImage which only touches the store). When
+// non-empty the rename is the publication atom: finalDir must not
+// already exist before the rename fires.
+func (d *Daemon) publishImage(ctx context.Context, image model.Image, stagingDir, finalDir string) (model.Image, error) {
+	d.imageOpsMu.Lock()
+	defer d.imageOpsMu.Unlock()
+
+	if existing, err := d.store.GetImageByName(ctx, image.Name); err == nil {
+		return model.Image{}, fmt.Errorf("image %q already exists (id=%s); pick a different --name or delete it first", image.Name, existing.ID)
+	}
+	if finalDir != "" {
+		if err := os.Rename(stagingDir, finalDir); err != nil {
+			return model.Image{}, fmt.Errorf("publish artifact dir: %w", err)
+		}
+	}
+	if err := d.store.UpsertImage(ctx, image); err != nil {
+		if finalDir != "" {
+			_ = os.RemoveAll(finalDir)
+		}
+		return model.Image{}, err
+	}
+	return image, nil
+}
+
 // pullFromOCI is the original OCI-registry-pull path. See PullImage for
 // the intent.
 func (d *Daemon) pullFromOCI(ctx context.Context, params api.ImagePullParams) (image model.Image, err error) {
@@ -137,11 +174,6 @@ func (d *Daemon) pullFromOCI(ctx context.Context, params api.ImagePullParams) (i
 		return model.Image{}, fmt.Errorf("stage boot artifacts: %w", err)
 	}
 
-	if err := os.Rename(stagingDir, finalDir); err != nil {
-		return model.Image{}, fmt.Errorf("publish artifact dir: %w", err)
-	}
-	cleanupStaging = false
-
 	now := model.Now()
 	image = model.Image{
 		ID: id,
@@ -155,11 +187,12 @@ func (d *Daemon) pullFromOCI(ctx context.Context, params api.ImagePullParams) (i
 		CreatedAt: now,
 		UpdatedAt: now,
 	}
-	if err := d.store.UpsertImage(ctx, image); err != nil {
-		_ = os.RemoveAll(finalDir)
+	published, err := d.publishImage(ctx, image, stagingDir, finalDir)
+	if err != nil {
 		return model.Image{}, err
 	}
-	return image, nil
+	cleanupStaging = false
+	return published, nil
 }
 
 // pullFromBundle is the imagecat-backed path: download a ready-to-boot
@@ -218,11 +251,6 @@ func (d *Daemon) pullFromBundle(ctx context.Context, params api.ImagePullParams,
 		return model.Image{}, fmt.Errorf("stage boot artifacts: %w", err)
 	}
 
-	if err := os.Rename(stagingDir, finalDir); err != nil {
-		return model.Image{}, fmt.Errorf("publish artifact dir: %w", err)
-	}
-	cleanupStaging = false
-
 	now := model.Now()
 	image = model.Image{
 		ID: id,
@@ -236,11 +264,12 @@ func (d *Daemon) pullFromBundle(ctx context.Context, params api.ImagePullParams,
 		CreatedAt: now,
 		UpdatedAt: now,
 	}
-	if err := d.store.UpsertImage(ctx, image); err != nil {
-		_ = os.RemoveAll(finalDir)
+	published, err := d.publishImage(ctx, image, stagingDir, finalDir)
+	if err != nil {
 		return model.Image{}, err
 	}
-	return image, nil
+	cleanupStaging = false
+	return published, nil
 }
 
 // runBundleFetch is the seam tests substitute. nil → real implementation.
@@ -2,6 +2,8 @@ package daemon
 
 import (
 	"context"
+	"database/sql"
+	"errors"
 	"fmt"
 	"os"
 	"path/filepath"
@@ -13,9 +15,19 @@ import (
 	"banger/internal/vmdns"
 )
 
+// CreateVM is split into three phases so the global createVMMu guards
+// only the narrow name+IP reservation window, not the slow image
+// resolution or the multi-second boot flow:
+//
+//  1. Validate + resolve image. No global lock. Image auto-pull
+//     self-locks via imageOpsMu (which is also now publication-only).
+//  2. Reserve a row: generate id, pick next IP, claim the name,
+//     UpsertVM the "created" record. Held under createVMMu so two
+//     concurrent `vm create --name foo` calls can't both think they
+//     won.
+//  3. Boot. Only the per-VM lock is held — parallel creates against
+//     different VMs fully overlap.
 func (d *Daemon) CreateVM(ctx context.Context, params api.VMCreateParams) (vm model.VMRecord, err error) {
-	d.createVMMu.Lock()
-	defer d.createVMMu.Unlock()
 	op := d.beginOperation("vm.create")
 	defer func() {
 		if err != nil {
@@ -42,34 +54,7 @@ func (d *Daemon) CreateVM(ctx context.Context, params api.VMCreateParams) (vm mo
 	}
 	vmCreateStage(ctx, "resolve_image", "using image "+image.Name)
 	op.stage("image_resolved", imageLogAttrs(image)...)
-	name := strings.TrimSpace(params.Name)
-	if name == "" {
-		name, err = d.generateName(ctx)
-		if err != nil {
-			return model.VMRecord{}, err
-		}
-	}
-	if _, err := d.FindVM(ctx, name); err == nil {
-		return model.VMRecord{}, fmt.Errorf("vm name already exists: %s", name)
-	}
-	id, err := model.NewID()
-	if err != nil {
-		return model.VMRecord{}, err
-	}
-	unlockVM := d.lockVMID(id)
-	defer unlockVM()
-	guestIP, err := d.store.NextGuestIP(ctx, bridgePrefix(d.config.BridgeIP))
-	if err != nil {
-		return model.VMRecord{}, err
-	}
-	vmDir := filepath.Join(d.layout.VMsDir, id)
-	if err := os.MkdirAll(vmDir, 0o755); err != nil {
-		return model.VMRecord{}, err
-	}
-	vsockCID, err := defaultVSockCID(guestIP)
-	if err != nil {
-		return model.VMRecord{}, err
-	}
+
 	systemOverlaySize := int64(model.DefaultSystemOverlaySize)
 	if params.SystemOverlaySize != "" {
 		systemOverlaySize, err = model.ParseSize(params.SystemOverlaySize)
@@ -84,7 +69,6 @@ func (d *Daemon) CreateVM(ctx context.Context, params api.VMCreateParams) (vm mo
 			return model.VMRecord{}, err
 		}
 	}
-	now := model.Now()
 	spec := model.VMSpec{
 		VCPUCount: optionalIntOrDefault(params.VCPUCount, model.DefaultVCPUCount),
 		MemoryMiB: optionalIntOrDefault(params.MemoryMiB, model.DefaultMemoryMiB),
@@ -92,7 +76,69 @@ func (d *Daemon) CreateVM(ctx context.Context, params api.VMCreateParams) (vm mo
 		WorkDiskSizeBytes: workDiskSize,
 		NATEnabled:        params.NATEnabled,
 	}
-	vm = model.VMRecord{
+
+	vm, err = d.reserveVM(ctx, strings.TrimSpace(params.Name), image, spec)
+	if err != nil {
+		return model.VMRecord{}, err
+	}
+	op.stage("persisted", vmLogAttrs(vm)...)
+	vmCreateBindVM(ctx, vm)
+	vmCreateStage(ctx, "reserve_vm", fmt.Sprintf("allocated %s (%s)", vm.Name, vm.Runtime.GuestIP))
+
+	unlockVM := d.lockVMID(vm.ID)
+	defer unlockVM()
+
+	if params.NoStart {
+		vm.State = model.VMStateStopped
+		vm.Runtime.State = model.VMStateStopped
+		if err := d.store.UpsertVM(ctx, vm); err != nil {
+			return model.VMRecord{}, err
+		}
+		return vm, nil
+	}
+	return d.startVMLocked(ctx, vm, image)
+}
+
+// reserveVM holds createVMMu only long enough to verify the name is
+// free, allocate a guest IP from the store, and persist the "created"
+// reservation row. Everything else (image resolution upstream, boot
+// downstream) runs outside this lock.
+func (d *Daemon) reserveVM(ctx context.Context, requestedName string, image model.Image, spec model.VMSpec) (model.VMRecord, error) {
+	d.createVMMu.Lock()
+	defer d.createVMMu.Unlock()
+
+	name := requestedName
+	if name == "" {
+		generated, err := d.generateName(ctx)
+		if err != nil {
+			return model.VMRecord{}, err
+		}
+		name = generated
+	}
+	if _, err := d.FindVM(ctx, name); err == nil {
+		return model.VMRecord{}, fmt.Errorf("vm name already exists: %s", name)
+	} else if !errors.Is(err, sql.ErrNoRows) && !strings.Contains(err.Error(), "not found") {
+		return model.VMRecord{}, err
+	}
+
+	id, err := model.NewID()
+	if err != nil {
+		return model.VMRecord{}, err
+	}
+	guestIP, err := d.store.NextGuestIP(ctx, bridgePrefix(d.config.BridgeIP))
+	if err != nil {
+		return model.VMRecord{}, err
+	}
+	vmDir := filepath.Join(d.layout.VMsDir, id)
+	if err := os.MkdirAll(vmDir, 0o755); err != nil {
+		return model.VMRecord{}, err
+	}
+	vsockCID, err := defaultVSockCID(guestIP)
+	if err != nil {
+		return model.VMRecord{}, err
+	}
+	now := model.Now()
+	vm := model.VMRecord{
 		ID:      id,
 		Name:    name,
 		ImageID: image.ID,
@@ -114,21 +160,10 @@ func (d *Daemon) CreateVM(ctx context.Context, params api.VMCreateParams) (vm mo
 			MetricsPath: filepath.Join(vmDir, "metrics.json"),
 		},
 	}
-	vmCreateBindVM(ctx, vm)
-	vmCreateStage(ctx, "reserve_vm", fmt.Sprintf("allocated %s (%s)", vm.Name, vm.Runtime.GuestIP))
 	if err := d.store.UpsertVM(ctx, vm); err != nil {
 		return model.VMRecord{}, err
 	}
-	op.stage("persisted", vmLogAttrs(vm)...)
-	if params.NoStart {
-		vm.State = model.VMStateStopped
-		vm.Runtime.State = model.VMStateStopped
-		if err := d.store.UpsertVM(ctx, vm); err != nil {
-			return model.VMRecord{}, err
-		}
-		return vm, nil
-	}
-	return d.startVMLocked(ctx, vm, image)
+	return vm, nil
 }
 
 // findOrAutoPullImage tries the local image store first; if the name