From 59f2766139babbd9b42f8e1f2cbf9130bab3284c Mon Sep 17 00:00:00 2001 From: Thales Maciel Date: Wed, 15 Apr 2026 15:58:33 -0300 Subject: [PATCH] Move subsystem state/locks off Daemon into owning types Daemon no longer owns a coarse mu shared across unrelated concerns. Each subsystem now carries its own state and lock: - tapPool: entries, next, and mu move onto a new tapPool struct. - sessionRegistry: sessionControllers + its mutex move off Daemon. - opRegistry[T asyncOp]: generic registry collapses the two ad-hoc vm-create and image-build operation maps (and their mutexes) into one shared type; the Begin/Status/Cancel/Prune methods simplify. - vmLockSet: the sync.Map of per-VM mutexes moves into its own type; lockVMID forwards. - Daemon.mu splits into imageOpsMu (image-registry mutations) and createVMMu (CreateVM serialisation) so image ops and VM creates no longer block each other. Lock ordering collapses to vmLocks[id] -> {createVMMu, imageOpsMu} -> subsystem-local leaves. doc.go and ARCHITECTURE.md updated. No behavior change; tests green. Co-Authored-By: Claude Sonnet 4.6 --- internal/daemon/ARCHITECTURE.md | 43 +++++++------ internal/daemon/daemon.go | 32 ++++------ internal/daemon/doc.go | 10 +-- internal/daemon/image_build_ops.go | 34 +++-------- internal/daemon/images.go | 16 ++--- internal/daemon/op_registry.go | 55 +++++++++++++++++ internal/daemon/session_controller.go | 88 +++++++++++++++++++-------- internal/daemon/tap_pool.go | 55 ++++++++++------- internal/daemon/vm_create.go | 4 +- internal/daemon/vm_create_ops.go | 34 +++-------- internal/daemon/vm_locks.go | 19 ++++++ 11 files changed, 238 insertions(+), 152 deletions(-) create mode 100644 internal/daemon/op_registry.go create mode 100644 internal/daemon/vm_locks.go diff --git a/internal/daemon/ARCHITECTURE.md b/internal/daemon/ARCHITECTURE.md index 5d9af67..da61adf 100644 --- a/internal/daemon/ARCHITECTURE.md +++ b/internal/daemon/ARCHITECTURE.md @@ -7,16 +7,22 @@ against which the phased split described in ## Composition -`Daemon` is a single struct aggregating state for every subsystem: +`Daemon` is the composition root. Subsystem state and locks have moved onto +owning types: - Layout, config, store, runner, logger, pid — infrastructure handles. -- `mu sync.Mutex` — coarse lock, currently guards guest session controller - map mutations and image registry mutations. -- `vmLocks sync.Map` — per-VM `*sync.Mutex`, one per VM ID. -- `createOps`, `createOpsMu` — in-flight `vm create` operations. -- `imageBuildOps`, `imageBuildOpsMu` — in-flight `image build` operations. -- `tapPool`, `tapPoolNext`, `tapPoolMu` — TAP interface pool. -- `sessionControllers` — active guest session controllers (guarded by `mu`). +- `vmLocks vmLockSet` — per-VM `*sync.Mutex`, one per VM ID. +- `createVMMu sync.Mutex` — serialises `CreateVM` (guards name uniqueness + + guest IP allocation window). +- `imageOpsMu sync.Mutex` — serialises image-registry mutations + (`BuildImage`, `RegisterImage`, `PromoteImage`, `DeleteImage`). +- `createOps opRegistry[*vmCreateOperationState]` — in-flight VM create + operations, owns its own lock. +- `imageBuildOps opRegistry[*imageBuildOperationState]` — in-flight image + build operations, owns its own lock. +- `tapPool tapPool` — TAP interface pool; owns its own lock. +- `sessions sessionRegistry` — active guest session controllers; owns its + own lock. - `listener`, `webListener`, `webServer`, `webURL`, `vmDNS` — networking. - `vmCaps` — registered VM capability hooks. 
- `imageBuild`, `requestHandler`, `guestWaitForSSH`, `guestDial`, @@ -28,23 +34,22 @@ Acquire in this order, release in reverse. Never acquire in the opposite direction. ``` -vmLocks[id] → mu → {createOpsMu, imageBuildOpsMu, tapPoolMu} +vmLocks[id] → {createVMMu, imageOpsMu} → subsystem-local locks ``` +Subsystem-local locks (tapPool.mu, sessionRegistry.mu, opRegistry.mu, +guestSessionController.attachMu/writeMu) are leaves. They do not contend +with each other. + Notes: - `vmLocks[id]` is the outer lock for any operation scoped to a single VM. Acquired via `withVMLockByID` / `withVMLockByRef`. -- `mu` is currently load-bearing for both session controller lookups and - image registry changes. Holding it while calling into guest SSH is - discouraged; prefer copying needed state out under the lock and releasing - before blocking I/O. -- The three subsystem locks (`createOpsMu`, `imageBuildOpsMu`, `tapPoolMu`) - are leaves. Nothing else is acquired while one is held. - -The upcoming Phase 2 refactor will retire `mu` entirely by giving each -concern it currently guards its own owning type and lock. At that point -the ordering collapses to `vmLocks[id] → subsystem-local lock`. +- `createVMMu` and `imageOpsMu` are narrow: each guards one family of + mutations and is released before any blocking guest I/O. +- Holding a subsystem-local lock while calling into guest SSH is + discouraged; copy needed state out under the lock and release before + blocking I/O. ## External API diff --git a/internal/daemon/daemon.go b/internal/daemon/daemon.go index 4cfe4e1..ac02a2a 100644 --- a/internal/daemon/daemon.go +++ b/internal/daemon/daemon.go @@ -32,16 +32,13 @@ type Daemon struct { store *store.Store runner system.CommandRunner logger *slog.Logger - mu sync.Mutex - createOpsMu sync.Mutex - createOps map[string]*vmCreateOperationState - imageBuildOpsMu sync.Mutex - imageBuildOps map[string]*imageBuildOperationState - vmLocks sync.Map // map[string]*sync.Mutex; keyed by VM ID - sessionControllers map[string]*guestSessionController - tapPoolMu sync.Mutex - tapPool []string - tapPoolNext int + imageOpsMu sync.Mutex + createVMMu sync.Mutex + createOps opRegistry[*vmCreateOperationState] + imageBuildOps opRegistry[*imageBuildOperationState] + vmLocks vmLockSet + sessions sessionRegistry + tapPool tapPool closing chan struct{} once sync.Once pid int @@ -85,9 +82,9 @@ func Open(ctx context.Context) (d *Daemon, err error) { store: db, runner: system.NewRunner(), logger: logger, - closing: make(chan struct{}), - pid: os.Getpid(), - sessionControllers: make(map[string]*guestSessionController), + closing: make(chan struct{}), + pid: os.Getpid(), + sessions: newSessionRegistry(), } d.ensureVMSSHClientConfig() d.logger.Info("daemon opened", "socket", layout.SocketPath, "state_dir", layout.StateDir, "log_level", cfg.LogLevel) @@ -720,14 +717,7 @@ func (d *Daemon) withVMLockByIDErr(ctx context.Context, id string, fn func(model } func (d *Daemon) lockVMID(id string) func() { - // LoadOrStore is atomic: exactly one *sync.Mutex wins for each ID. - // Both the map lookup and the conditional insert happen without a - // release-then-reacquire gap, eliminating the TOCTOU window that - // existed when vmLocksMu was released before lock.Lock() was called. 
- val, _ := d.vmLocks.LoadOrStore(id, &sync.Mutex{}) - mu := val.(*sync.Mutex) - mu.Lock() - return mu.Unlock + return d.vmLocks.lock(id) } func marshalResultOrError(v any, err error) rpc.Response { diff --git a/internal/daemon/doc.go b/internal/daemon/doc.go index f8b91bf..395a516 100644 --- a/internal/daemon/doc.go +++ b/internal/daemon/doc.go @@ -51,11 +51,11 @@ // runtime_assets.go paths to bundled companion binaries // web.go embedded web UI server // -// Lock ordering (current, pre-refactor): +// Lock ordering: // -// vmLocks[id] → mu → {createOpsMu, imageBuildOpsMu, tapPoolMu} +// vmLocks[id] → {createVMMu, imageOpsMu} → subsystem-local locks // -// The coarse mu currently guards unrelated state (session controllers, -// image registry mutations, in-flight VM create bookkeeping) and is the -// target of the Phase 2 split. See ARCHITECTURE.md for details. +// Subsystem-local locks live on the owning type (tapPool.mu, +// sessionRegistry.mu, opRegistry.mu, guestSessionController.attachMu/writeMu) +// and do not contend with each other. See ARCHITECTURE.md for details. package daemon diff --git a/internal/daemon/image_build_ops.go b/internal/daemon/image_build_ops.go index 813a7a2..f0e24af 100644 --- a/internal/daemon/image_build_ops.go +++ b/internal/daemon/image_build_ops.go @@ -11,6 +11,11 @@ import ( "banger/internal/model" ) +func (op *imageBuildOperationState) opID() string { return op.snapshot().ID } +func (op *imageBuildOperationState) opIsDone() bool { return op.snapshot().Done } +func (op *imageBuildOperationState) opUpdatedAt() time.Time { return op.snapshot().UpdatedAt } +func (op *imageBuildOperationState) opCancel() { op.cancelOperation() } + type imageBuildProgressKey struct{} type imageBuildOperationState struct { @@ -161,14 +166,7 @@ func (d *Daemon) BeginImageBuild(_ context.Context, params api.ImageBuildParams) } buildCtx, cancel := context.WithCancel(context.Background()) op.setCancel(cancel) - - d.imageBuildOpsMu.Lock() - if d.imageBuildOps == nil { - d.imageBuildOps = map[string]*imageBuildOperationState{} - } - d.imageBuildOps[op.op.ID] = op - d.imageBuildOpsMu.Unlock() - + d.imageBuildOps.insert(op) go d.runImageBuildOperation(withImageBuildProgress(buildCtx, op), op, params) return op.snapshot(), nil } @@ -183,9 +181,7 @@ func (d *Daemon) runImageBuildOperation(ctx context.Context, op *imageBuildOpera } func (d *Daemon) ImageBuildStatus(_ context.Context, id string) (api.ImageBuildOperation, error) { - d.imageBuildOpsMu.Lock() - op, ok := d.imageBuildOps[strings.TrimSpace(id)] - d.imageBuildOpsMu.Unlock() + op, ok := d.imageBuildOps.get(strings.TrimSpace(id)) if !ok { return api.ImageBuildOperation{}, fmt.Errorf("image build operation not found: %s", id) } @@ -193,9 +189,7 @@ func (d *Daemon) ImageBuildStatus(_ context.Context, id string) (api.ImageBuildO } func (d *Daemon) CancelImageBuild(_ context.Context, id string) error { - d.imageBuildOpsMu.Lock() - op, ok := d.imageBuildOps[strings.TrimSpace(id)] - d.imageBuildOpsMu.Unlock() + op, ok := d.imageBuildOps.get(strings.TrimSpace(id)) if !ok { return fmt.Errorf("image build operation not found: %s", id) } @@ -204,15 +198,5 @@ func (d *Daemon) CancelImageBuild(_ context.Context, id string) error { } func (d *Daemon) pruneImageBuildOperations(olderThan time.Time) { - d.imageBuildOpsMu.Lock() - defer d.imageBuildOpsMu.Unlock() - for id, op := range d.imageBuildOps { - snapshot := op.snapshot() - if !snapshot.Done { - continue - } - if snapshot.UpdatedAt.Before(olderThan) { - delete(d.imageBuildOps, id) - } - } + 
d.imageBuildOps.prune(olderThan)
 }
diff --git a/internal/daemon/images.go b/internal/daemon/images.go
index b20873e..1768b05 100644
--- a/internal/daemon/images.go
+++ b/internal/daemon/images.go
@@ -16,8 +16,8 @@ import (
 )
 
 func (d *Daemon) BuildImage(ctx context.Context, params api.ImageBuildParams) (image model.Image, err error) {
-	d.mu.Lock()
-	defer d.mu.Unlock()
+	d.imageOpsMu.Lock()
+	defer d.imageOpsMu.Unlock()
 	op := d.beginOperation("image.build")
 	buildLogPath := ""
 	defer func() {
@@ -156,8 +156,8 @@ func (d *Daemon) BuildImage(ctx context.Context, params api.ImageBuildParams) (i
 }
 
 func (d *Daemon) RegisterImage(ctx context.Context, params api.ImageRegisterParams) (image model.Image, err error) {
-	d.mu.Lock()
-	defer d.mu.Unlock()
+	d.imageOpsMu.Lock()
+	defer d.imageOpsMu.Unlock()
 
 	name := strings.TrimSpace(params.Name)
 	if name == "" {
@@ -232,8 +232,8 @@ func (d *Daemon) RegisterImage(ctx context.Context, params api.ImageRegisterPara
 }
 
 func (d *Daemon) PromoteImage(ctx context.Context, idOrName string) (image model.Image, err error) {
-	d.mu.Lock()
-	defer d.mu.Unlock()
+	d.imageOpsMu.Lock()
+	defer d.imageOpsMu.Unlock()
 
 	op := d.beginOperation("image.promote")
 	defer func() {
@@ -375,8 +375,8 @@ func writePackagesMetadata(rootfsPath string, packages []string) error {
 }
 
 func (d *Daemon) DeleteImage(ctx context.Context, idOrName string) (model.Image, error) {
-	d.mu.Lock()
-	defer d.mu.Unlock()
+	d.imageOpsMu.Lock()
+	defer d.imageOpsMu.Unlock()
 
 	image, err := d.FindImage(ctx, idOrName)
 	if err != nil {
diff --git a/internal/daemon/op_registry.go b/internal/daemon/op_registry.go
new file mode 100644
index 0000000..2130737
--- /dev/null
+++ b/internal/daemon/op_registry.go
@@ -0,0 +1,55 @@
+package daemon
+
+import (
+	"sync"
+	"time"
+)
+
+// asyncOp is the protocol shared by the long-running operation state types
+// (VM create, image build). Each operation has a stable ID, a done flag that
+// flips to true when its goroutine finishes, an UpdatedAt for pruning, and a
+// way to signal cancellation to its goroutine.
+type asyncOp interface {
+	opID() string
+	opIsDone() bool
+	opUpdatedAt() time.Time
+	opCancel()
+}
+
+// opRegistry is a mutex-guarded map of in-flight operations keyed by op ID.
+// One registry per operation kind; each owns its own lock, so registries do
+// not contend with each other or with any other daemon lock.
+type opRegistry[T asyncOp] struct {
+	mu   sync.Mutex
+	byID map[string]T
+}
+
+func (r *opRegistry[T]) insert(op T) {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	if r.byID == nil {
+		r.byID = map[string]T{}
+	}
+	r.byID[op.opID()] = op
+}
+
+func (r *opRegistry[T]) get(id string) (T, bool) {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	op, ok := r.byID[id]
+	return op, ok
+}
+
+// prune drops completed operations last updated before the cutoff.
+func (r *opRegistry[T]) prune(before time.Time) {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	for id, op := range r.byID {
+		if !op.opIsDone() {
+			continue
+		}
+		if op.opUpdatedAt().Before(before) {
+			delete(r.byID, id)
+		}
+	}
+}
diff --git a/internal/daemon/session_controller.go b/internal/daemon/session_controller.go
index 19a2860..8f45a36 100644
--- a/internal/daemon/session_controller.go
+++ b/internal/daemon/session_controller.go
@@ -106,47 +106,87 @@ type guestSessionStateSnapshot struct {
 	LastError string
 }
 
-func (d *Daemon) setGuestSessionController(id string, controller *guestSessionController) {
-	d.mu.Lock()
-	defer d.mu.Unlock()
-	d.sessionControllers[id] = controller
+// sessionRegistry owns the live guest-session controller map. Its lock is
+// independent of every other daemon lock, so guest-session lookups do not
+// contend with unrelated daemon state.
+type sessionRegistry struct {
+	mu     sync.Mutex
+	byID   map[string]*guestSessionController
+	closed bool
 }
 
-func (d *Daemon) claimGuestSessionController(id string, controller *guestSessionController) bool {
-	d.mu.Lock()
-	defer d.mu.Unlock()
-	if d.sessionControllers[id] != nil {
+func newSessionRegistry() sessionRegistry {
+	return sessionRegistry{byID: make(map[string]*guestSessionController)}
+}
+
+func (r *sessionRegistry) set(id string, controller *guestSessionController) {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	if r.closed {
+		return
+	}
+	r.byID[id] = controller
+}
+
+func (r *sessionRegistry) claim(id string, controller *guestSessionController) bool {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	if r.closed {
 		return false
 	}
-	d.sessionControllers[id] = controller
+	if r.byID[id] != nil {
+		return false
+	}
+	r.byID[id] = controller
 	return true
 }
 
-func (d *Daemon) getGuestSessionController(id string) *guestSessionController {
-	d.mu.Lock()
-	defer d.mu.Unlock()
-	return d.sessionControllers[id]
+func (r *sessionRegistry) get(id string) *guestSessionController {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	return r.byID[id]
 }
 
-func (d *Daemon) clearGuestSessionController(id string) *guestSessionController {
-	d.mu.Lock()
-	defer d.mu.Unlock()
-	controller := d.sessionControllers[id]
-	delete(d.sessionControllers, id)
+func (r *sessionRegistry) clear(id string) *guestSessionController {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	controller := r.byID[id]
+	delete(r.byID, id)
 	return controller
 }
 
-func (d *Daemon) closeGuestSessionControllers() error {
-	d.mu.Lock()
-	controllers := make([]*guestSessionController, 0, len(d.sessionControllers))
-	for _, controller := range d.sessionControllers {
+func (r *sessionRegistry) closeAll() error {
+	r.mu.Lock()
+	controllers := make([]*guestSessionController, 0, len(r.byID))
+	for _, controller := range r.byID {
 		controllers = append(controllers, controller)
 	}
-	d.sessionControllers = nil
-	d.mu.Unlock()
+	r.byID = nil
+	r.closed = true
+	r.mu.Unlock()
 	var err error
 	for _, controller := range controllers {
 		err = errors.Join(err, controller.close())
 	}
 	return err
 }
+
+func (d *Daemon) setGuestSessionController(id string, controller *guestSessionController) {
+	d.sessions.set(id, controller)
+}
+
+func (d *Daemon) claimGuestSessionController(id string, controller *guestSessionController) bool {
+	return d.sessions.claim(id, controller)
+}
+
+func (d *Daemon) getGuestSessionController(id string) *guestSessionController {
+	return d.sessions.get(id)
+}
+
+func (d *Daemon) clearGuestSessionController(id string) *guestSessionController {
+	return d.sessions.clear(id)
+}
+
+func (d *Daemon) closeGuestSessionControllers() error {
+ return d.sessions.closeAll() +} diff --git a/internal/daemon/tap_pool.go b/internal/daemon/tap_pool.go index ddf436e..75cf44c 100644 --- a/internal/daemon/tap_pool.go +++ b/internal/daemon/tap_pool.go @@ -5,10 +5,19 @@ import ( "fmt" "strconv" "strings" + "sync" ) const tapPoolPrefix = "tap-pool-" +// tapPool owns the idle TAP interface cache plus the monotonic index used to +// name new pool entries. All access goes through mu. +type tapPool struct { + mu sync.Mutex + entries []string + next int +} + func (d *Daemon) initializeTapPool(ctx context.Context) error { if d.config.TapPoolSize <= 0 || d.store == nil { return nil @@ -23,9 +32,9 @@ func (d *Daemon) initializeTapPool(ctx context.Context) error { next = index + 1 } } - d.tapPoolMu.Lock() - d.tapPoolNext = next - d.tapPoolMu.Unlock() + d.tapPool.mu.Lock() + d.tapPool.next = next + d.tapPool.mu.Unlock() return nil } @@ -42,14 +51,14 @@ func (d *Daemon) ensureTapPool(ctx context.Context) { default: } - d.tapPoolMu.Lock() - if len(d.tapPool) >= d.config.TapPoolSize { - d.tapPoolMu.Unlock() + d.tapPool.mu.Lock() + if len(d.tapPool.entries) >= d.config.TapPoolSize { + d.tapPool.mu.Unlock() return } - tapName := fmt.Sprintf("%s%d", tapPoolPrefix, d.tapPoolNext) - d.tapPoolNext++ - d.tapPoolMu.Unlock() + tapName := fmt.Sprintf("%s%d", tapPoolPrefix, d.tapPool.next) + d.tapPool.next++ + d.tapPool.mu.Unlock() if err := d.createTap(ctx, tapName); err != nil { if d.logger != nil { @@ -58,9 +67,9 @@ func (d *Daemon) ensureTapPool(ctx context.Context) { return } - d.tapPoolMu.Lock() - d.tapPool = append(d.tapPool, tapName) - d.tapPoolMu.Unlock() + d.tapPool.mu.Lock() + d.tapPool.entries = append(d.tapPool.entries, tapName) + d.tapPool.mu.Unlock() if d.logger != nil { d.logger.Debug("tap added to idle pool", "tap_device", tapName) @@ -69,14 +78,14 @@ func (d *Daemon) ensureTapPool(ctx context.Context) { } func (d *Daemon) acquireTap(ctx context.Context, fallbackName string) (string, error) { - d.tapPoolMu.Lock() - if n := len(d.tapPool); n > 0 { - tapName := d.tapPool[n-1] - d.tapPool = d.tapPool[:n-1] - d.tapPoolMu.Unlock() + d.tapPool.mu.Lock() + if n := len(d.tapPool.entries); n > 0 { + tapName := d.tapPool.entries[n-1] + d.tapPool.entries = d.tapPool.entries[:n-1] + d.tapPool.mu.Unlock() return tapName, nil } - d.tapPoolMu.Unlock() + d.tapPool.mu.Unlock() if err := d.createTap(ctx, fallbackName); err != nil { return "", err @@ -90,13 +99,13 @@ func (d *Daemon) releaseTap(ctx context.Context, tapName string) error { return nil } if isTapPoolName(tapName) { - d.tapPoolMu.Lock() - if len(d.tapPool) < d.config.TapPoolSize { - d.tapPool = append(d.tapPool, tapName) - d.tapPoolMu.Unlock() + d.tapPool.mu.Lock() + if len(d.tapPool.entries) < d.config.TapPoolSize { + d.tapPool.entries = append(d.tapPool.entries, tapName) + d.tapPool.mu.Unlock() return nil } - d.tapPoolMu.Unlock() + d.tapPool.mu.Unlock() } _, err := d.runner.RunSudo(ctx, "ip", "link", "del", tapName) if err == nil { diff --git a/internal/daemon/vm_create.go b/internal/daemon/vm_create.go index 59493a8..13c1a3f 100644 --- a/internal/daemon/vm_create.go +++ b/internal/daemon/vm_create.go @@ -13,8 +13,8 @@ import ( ) func (d *Daemon) CreateVM(ctx context.Context, params api.VMCreateParams) (vm model.VMRecord, err error) { - d.mu.Lock() - defer d.mu.Unlock() + d.createVMMu.Lock() + defer d.createVMMu.Unlock() op := d.beginOperation("vm.create") defer func() { if err != nil { diff --git a/internal/daemon/vm_create_ops.go b/internal/daemon/vm_create_ops.go index 0b856a3..f5284a0 100644 --- 
a/internal/daemon/vm_create_ops.go +++ b/internal/daemon/vm_create_ops.go @@ -11,6 +11,11 @@ import ( "banger/internal/model" ) +func (op *vmCreateOperationState) opID() string { return op.snapshot().ID } +func (op *vmCreateOperationState) opIsDone() bool { return op.snapshot().Done } +func (op *vmCreateOperationState) opUpdatedAt() time.Time { return op.snapshot().UpdatedAt } +func (op *vmCreateOperationState) opCancel() { op.cancelOperation() } + type vmCreateProgressKey struct{} type vmCreateOperationState struct { @@ -148,14 +153,7 @@ func (d *Daemon) BeginVMCreate(_ context.Context, params api.VMCreateParams) (ap } createCtx, cancel := context.WithCancel(context.Background()) op.setCancel(cancel) - - d.createOpsMu.Lock() - if d.createOps == nil { - d.createOps = map[string]*vmCreateOperationState{} - } - d.createOps[op.op.ID] = op - d.createOpsMu.Unlock() - + d.createOps.insert(op) go d.runVMCreateOperation(withVMCreateProgress(createCtx, op), op, params) return op.snapshot(), nil } @@ -170,9 +168,7 @@ func (d *Daemon) runVMCreateOperation(ctx context.Context, op *vmCreateOperation } func (d *Daemon) VMCreateStatus(_ context.Context, id string) (api.VMCreateOperation, error) { - d.createOpsMu.Lock() - op, ok := d.createOps[strings.TrimSpace(id)] - d.createOpsMu.Unlock() + op, ok := d.createOps.get(strings.TrimSpace(id)) if !ok { return api.VMCreateOperation{}, fmt.Errorf("vm create operation not found: %s", id) } @@ -180,9 +176,7 @@ func (d *Daemon) VMCreateStatus(_ context.Context, id string) (api.VMCreateOpera } func (d *Daemon) CancelVMCreate(_ context.Context, id string) error { - d.createOpsMu.Lock() - op, ok := d.createOps[strings.TrimSpace(id)] - d.createOpsMu.Unlock() + op, ok := d.createOps.get(strings.TrimSpace(id)) if !ok { return fmt.Errorf("vm create operation not found: %s", id) } @@ -191,15 +185,5 @@ func (d *Daemon) CancelVMCreate(_ context.Context, id string) error { } func (d *Daemon) pruneVMCreateOperations(olderThan time.Time) { - d.createOpsMu.Lock() - defer d.createOpsMu.Unlock() - for id, op := range d.createOps { - snapshot := op.snapshot() - if !snapshot.Done { - continue - } - if snapshot.UpdatedAt.Before(olderThan) { - delete(d.createOps, id) - } - } + d.createOps.prune(olderThan) } diff --git a/internal/daemon/vm_locks.go b/internal/daemon/vm_locks.go new file mode 100644 index 0000000..0c731a7 --- /dev/null +++ b/internal/daemon/vm_locks.go @@ -0,0 +1,19 @@ +package daemon + +import "sync" + +// vmLockSet maps VM IDs to per-VM mutexes. Concurrent operations on different +// VMs run in parallel; concurrent operations on the same VM serialise. +type vmLockSet struct { + byID sync.Map // map[string]*sync.Mutex +} + +// lock acquires the mutex for the given VM ID and returns its unlock func. +// LoadOrStore is atomic — exactly one *sync.Mutex wins for each ID, so there +// is no release-then-reacquire TOCTOU window. +func (s *vmLockSet) lock(id string) func() { + val, _ := s.byID.LoadOrStore(id, &sync.Mutex{}) + mu := val.(*sync.Mutex) + mu.Lock() + return mu.Unlock +}
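Reviewer sketch, not part of the patch: the asyncOp/opRegistry pair is small enough to exercise standalone. The interface and registry methods below are copied from op_registry.go above; demoOp and main are hypothetical stand-ins for the real vmCreateOperationState/imageBuildOperationState, which carry cancellation plumbing and locked snapshots that this toy omits.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// asyncOp and opRegistry, copied from op_registry.go in the patch.
type asyncOp interface {
	opID() string
	opIsDone() bool
	opUpdatedAt() time.Time
	opCancel()
}

type opRegistry[T asyncOp] struct {
	mu   sync.Mutex
	byID map[string]T
}

func (r *opRegistry[T]) insert(op T) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.byID == nil {
		r.byID = map[string]T{}
	}
	r.byID[op.opID()] = op
}

func (r *opRegistry[T]) get(id string) (T, bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	op, ok := r.byID[id]
	return op, ok
}

func (r *opRegistry[T]) prune(before time.Time) {
	r.mu.Lock()
	defer r.mu.Unlock()
	for id, op := range r.byID {
		if !op.opIsDone() {
			continue
		}
		if op.opUpdatedAt().Before(before) {
			delete(r.byID, id)
		}
	}
}

// demoOp is a hypothetical stand-in for the real operation state types:
// just enough state to satisfy asyncOp, with no internal locking.
type demoOp struct {
	id        string
	done      bool
	updatedAt time.Time
}

func (op *demoOp) opID() string           { return op.id }
func (op *demoOp) opIsDone() bool         { return op.done }
func (op *demoOp) opUpdatedAt() time.Time { return op.updatedAt }
func (op *demoOp) opCancel()              {}

func main() {
	// The zero value is usable: insert lazily allocates the map, matching
	// how Daemon embeds the registries as plain struct fields.
	var reg opRegistry[*demoOp]

	reg.insert(&demoOp{id: "op-1", done: true, updatedAt: time.Now().Add(-2 * time.Hour)})
	reg.insert(&demoOp{id: "op-2"}) // still running

	// prune drops op-1 (done and stale) but keeps the in-flight op-2.
	reg.prune(time.Now().Add(-time.Hour))
	_, ok1 := reg.get("op-1")
	_, ok2 := reg.get("op-2")
	fmt.Println(ok1, ok2) // false true
}
```

Each operation kind gets its own monomorphic registry field on Daemon (opRegistry[*vmCreateOperationState], opRegistry[*imageBuildOperationState]), and each registry owns its own mu, which is what lets ARCHITECTURE.md classify these locks as leaves.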
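A matching standalone sketch for vm_locks.go, again with a hypothetical main: callers hitting the same VM ID serialise on a single mutex, while distinct IDs map to distinct mutexes and proceed in parallel.

```go
package main

import (
	"fmt"
	"sync"
)

// vmLockSet, copied from vm_locks.go in the patch.
type vmLockSet struct {
	byID sync.Map // map[string]*sync.Mutex
}

func (s *vmLockSet) lock(id string) func() {
	val, _ := s.byID.LoadOrStore(id, &sync.Mutex{})
	mu := val.(*sync.Mutex)
	mu.Lock()
	return mu.Unlock
}

func main() {
	var locks vmLockSet
	var wg sync.WaitGroup
	counter := 0 // guarded by the "vm-a" lock below

	// All goroutines target the same VM ID, so they serialise on one
	// mutex and the increments cannot race; goroutines locking other IDs
	// would not block on this one.
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			unlock := locks.lock("vm-a")
			defer unlock()
			counter++
		}()
	}
	wg.Wait()
	fmt.Println(counter) // always 100
}
```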