Move subsystem state/locks off Daemon into owning types
Daemon no longer owns a coarse mu shared across unrelated concerns.
Each subsystem now carries its own state and lock:
- tapPool: entries, next, and mu move onto a new tapPool struct.
- sessionRegistry: sessionControllers + its mutex move off Daemon.
- opRegistry[T asyncOp]: generic registry collapses the two ad-hoc
vm-create and image-build operation maps (and their mutexes) into one
shared type; the Begin/Status/Cancel/Prune methods simplify.
- vmLockSet: the sync.Map of per-VM mutexes moves into its own type;
lockVMID forwards.
- Daemon.mu splits into imageOpsMu (image-registry mutations) and
createVMMu (CreateVM serialisation) so image ops and VM creates no
longer block each other.
Lock ordering collapses to vmLocks[id] -> {createVMMu, imageOpsMu} ->
subsystem-local leaves. doc.go and ARCHITECTURE.md updated.
No behavior change; tests green.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
parent ea0db1e17e
commit 59f2766139
11 changed files with 238 additions and 152 deletions
ARCHITECTURE.md
@@ -7,16 +7,22 @@ against which the phased split described in
 ## Composition
 
-`Daemon` is a single struct aggregating state for every subsystem:
+`Daemon` is the composition root. Subsystem state and locks have moved onto
+owning types:
 
 - Layout, config, store, runner, logger, pid — infrastructure handles.
-- `mu sync.Mutex` — coarse lock, currently guards guest session controller
-  map mutations and image registry mutations.
-- `vmLocks sync.Map` — per-VM `*sync.Mutex`, one per VM ID.
-- `createOps`, `createOpsMu` — in-flight `vm create` operations.
-- `imageBuildOps`, `imageBuildOpsMu` — in-flight `image build` operations.
-- `tapPool`, `tapPoolNext`, `tapPoolMu` — TAP interface pool.
-- `sessionControllers` — active guest session controllers (guarded by `mu`).
+- `vmLocks vmLockSet` — per-VM `*sync.Mutex`, one per VM ID.
+- `createVMMu sync.Mutex` — serialises `CreateVM` (guards name uniqueness
+  + guest IP allocation window).
+- `imageOpsMu sync.Mutex` — serialises image-registry mutations
+  (`BuildImage`, `RegisterImage`, `PromoteImage`, `DeleteImage`).
+- `createOps opRegistry[*vmCreateOperationState]` — in-flight VM create
+  operations, owns its own lock.
+- `imageBuildOps opRegistry[*imageBuildOperationState]` — in-flight image
+  build operations, owns its own lock.
+- `tapPool tapPool` — TAP interface pool; owns its own lock.
+- `sessions sessionRegistry` — active guest session controllers; owns its
+  own lock.
 - `listener`, `webListener`, `webServer`, `webURL`, `vmDNS` — networking.
 - `vmCaps` — registered VM capability hooks.
 - `imageBuild`, `requestHandler`, `guestWaitForSSH`, `guestDial`,
@@ -28,23 +34,22 @@ Acquire in this order, release in reverse. Never acquire in the opposite
 direction.
 
 ```
-vmLocks[id] → mu → {createOpsMu, imageBuildOpsMu, tapPoolMu}
+vmLocks[id] → {createVMMu, imageOpsMu} → subsystem-local locks
 ```
 
+Subsystem-local locks (tapPool.mu, sessionRegistry.mu, opRegistry.mu,
+guestSessionController.attachMu/writeMu) are leaves. They do not contend
+with each other.
+
 Notes:
 
 - `vmLocks[id]` is the outer lock for any operation scoped to a single VM.
   Acquired via `withVMLockByID` / `withVMLockByRef`.
-- `mu` is currently load-bearing for both session controller lookups and
-  image registry changes. Holding it while calling into guest SSH is
-  discouraged; prefer copying needed state out under the lock and releasing
-  before blocking I/O.
-- The three subsystem locks (`createOpsMu`, `imageBuildOpsMu`, `tapPoolMu`)
-  are leaves. Nothing else is acquired while one is held.
-
-The upcoming Phase 2 refactor will retire `mu` entirely by giving each
-concern it currently guards its own owning type and lock. At that point
-the ordering collapses to `vmLocks[id] → subsystem-local lock`.
+- `createVMMu` and `imageOpsMu` are narrow: each guards one family of
+  mutations and is released before any blocking guest I/O.
+- Holding a subsystem-local lock while calling into guest SSH is
+  discouraged; copy needed state out under the lock and release before
+  blocking I/O.
 
 ## External API
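The new ordering is easiest to see at a call site. Below is a minimal in-package sketch of a method that touches all three tiers; the method and its body are invented for illustration — only `lockVMID`, `imageOpsMu`, and `sessions` come from this commit.

```go
package daemon

// rebuildVMImage is a hypothetical method sketching the lock-ordering
// discipline: tiers are taken outermost-first, and each is released before
// the next blocks.
func (d *Daemon) rebuildVMImage(id string) error {
	unlock := d.lockVMID(id) // tier 1: per-VM lock, always outermost
	defer unlock()

	d.imageOpsMu.Lock() // tier 2: one narrow family of mutations
	// ... mutate image-registry state for this VM's image ...
	d.imageOpsMu.Unlock() // released before any leaf lock or guest I/O

	// tier 3: subsystem-local leaves, acquired last and never held while
	// waiting on another leaf
	if controller := d.sessions.get(id); controller != nil {
		_ = controller // e.g. tell the live session about the rebuild
	}
	return nil
}
```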
@@ -32,16 +32,13 @@ type Daemon struct {
 	store  *store.Store
 	runner system.CommandRunner
 	logger *slog.Logger
-	mu                 sync.Mutex
-	createOpsMu        sync.Mutex
-	createOps          map[string]*vmCreateOperationState
-	imageBuildOpsMu    sync.Mutex
-	imageBuildOps      map[string]*imageBuildOperationState
-	vmLocks            sync.Map // map[string]*sync.Mutex; keyed by VM ID
-	sessionControllers map[string]*guestSessionController
-	tapPoolMu          sync.Mutex
-	tapPool            []string
-	tapPoolNext        int
+	imageOpsMu    sync.Mutex
+	createVMMu    sync.Mutex
+	createOps     opRegistry[*vmCreateOperationState]
+	imageBuildOps opRegistry[*imageBuildOperationState]
+	vmLocks       vmLockSet
+	sessions      sessionRegistry
+	tapPool       tapPool
 	closing chan struct{}
 	once    sync.Once
 	pid     int
@@ -85,9 +82,9 @@ func Open(ctx context.Context) (d *Daemon, err error) {
 		store:   db,
 		runner:  system.NewRunner(),
 		logger:  logger,
 		closing: make(chan struct{}),
 		pid:     os.Getpid(),
-		sessionControllers: make(map[string]*guestSessionController),
+		sessions: newSessionRegistry(),
 	}
 	d.ensureVMSSHClientConfig()
 	d.logger.Info("daemon opened", "socket", layout.SocketPath, "state_dir", layout.StateDir, "log_level", cfg.LogLevel)
@@ -720,14 +717,7 @@ func (d *Daemon) withVMLockByIDErr(ctx context.Context, id string, fn func(model
 }
 
 func (d *Daemon) lockVMID(id string) func() {
-	// LoadOrStore is atomic: exactly one *sync.Mutex wins for each ID.
-	// Both the map lookup and the conditional insert happen without a
-	// release-then-reacquire gap, eliminating the TOCTOU window that
-	// existed when vmLocksMu was released before lock.Lock() was called.
-	val, _ := d.vmLocks.LoadOrStore(id, &sync.Mutex{})
-	mu := val.(*sync.Mutex)
-	mu.Lock()
-	return mu.Unlock
+	return d.vmLocks.lock(id)
 }
 
 func marshalResultOrError(v any, err error) rpc.Response {
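One detail worth noticing in `Open`: only `sessions` needs explicit construction. A test-style sketch (not part of the commit) of that zero-value asymmetry:

```go
package daemon

import "testing"

// TestZeroValueSubsystems sketches the construction asymmetry visible in
// Open: opRegistry and vmLockSet are usable from their zero values, while
// sessionRegistry must be built with newSessionRegistry() so set() has a
// map to write into.
func TestZeroValueSubsystems(t *testing.T) {
	var d Daemon

	unlock := d.vmLocks.lock("vm-1") // zero-value sync.Map needs no init
	unlock()

	d.sessions = newSessionRegistry() // without this, sessions.set would panic on a nil map
	if d.sessions.get("missing") != nil {
		t.Fatal("empty registry should return nil")
	}
}
```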
doc.go
@@ -51,11 +51,11 @@
 // runtime_assets.go paths to bundled companion binaries
 // web.go embedded web UI server
 //
-// Lock ordering (current, pre-refactor):
+// Lock ordering:
 //
-// vmLocks[id] → mu → {createOpsMu, imageBuildOpsMu, tapPoolMu}
+// vmLocks[id] → {createVMMu, imageOpsMu} → subsystem-local locks
 //
-// The coarse mu currently guards unrelated state (session controllers,
-// image registry mutations, in-flight VM create bookkeeping) and is the
-// target of the Phase 2 split. See ARCHITECTURE.md for details.
+// Subsystem-local locks live on the owning type (tapPool.mu,
+// sessionRegistry.mu, opRegistry.mu, guestSessionController.attachMu/writeMu)
+// and do not contend with each other. See ARCHITECTURE.md for details.
 package daemon
@@ -11,6 +11,11 @@ import (
 	"banger/internal/model"
 )
 
+func (op *imageBuildOperationState) opID() string           { return op.snapshot().ID }
+func (op *imageBuildOperationState) opIsDone() bool         { return op.snapshot().Done }
+func (op *imageBuildOperationState) opUpdatedAt() time.Time { return op.snapshot().UpdatedAt }
+func (op *imageBuildOperationState) opCancel()              { op.cancelOperation() }
+
 type imageBuildProgressKey struct{}
 
 type imageBuildOperationState struct {
@@ -161,14 +166,7 @@ func (d *Daemon) BeginImageBuild(_ context.Context, params api.ImageBuildParams)
 	}
 	buildCtx, cancel := context.WithCancel(context.Background())
 	op.setCancel(cancel)
-
-	d.imageBuildOpsMu.Lock()
-	if d.imageBuildOps == nil {
-		d.imageBuildOps = map[string]*imageBuildOperationState{}
-	}
-	d.imageBuildOps[op.op.ID] = op
-	d.imageBuildOpsMu.Unlock()
+	d.imageBuildOps.insert(op)
 
 	go d.runImageBuildOperation(withImageBuildProgress(buildCtx, op), op, params)
 	return op.snapshot(), nil
 }
@@ -183,9 +181,7 @@ func (d *Daemon) runImageBuildOperation(ctx context.Context, op *imageBuildOpera
 }
 
 func (d *Daemon) ImageBuildStatus(_ context.Context, id string) (api.ImageBuildOperation, error) {
-	d.imageBuildOpsMu.Lock()
-	op, ok := d.imageBuildOps[strings.TrimSpace(id)]
-	d.imageBuildOpsMu.Unlock()
+	op, ok := d.imageBuildOps.get(strings.TrimSpace(id))
 	if !ok {
 		return api.ImageBuildOperation{}, fmt.Errorf("image build operation not found: %s", id)
 	}
@@ -193,9 +189,7 @@ func (d *Daemon) ImageBuildStatus(_ context.Context, id string) (api.ImageBuildO
 }
 
 func (d *Daemon) CancelImageBuild(_ context.Context, id string) error {
-	d.imageBuildOpsMu.Lock()
-	op, ok := d.imageBuildOps[strings.TrimSpace(id)]
-	d.imageBuildOpsMu.Unlock()
+	op, ok := d.imageBuildOps.get(strings.TrimSpace(id))
 	if !ok {
 		return fmt.Errorf("image build operation not found: %s", id)
 	}
@@ -204,15 +198,5 @@ func (d *Daemon) CancelImageBuild(_ context.Context, id string) error {
 }
 
 func (d *Daemon) pruneImageBuildOperations(olderThan time.Time) {
-	d.imageBuildOpsMu.Lock()
-	defer d.imageBuildOpsMu.Unlock()
-	for id, op := range d.imageBuildOps {
-		snapshot := op.snapshot()
-		if !snapshot.Done {
-			continue
-		}
-		if snapshot.UpdatedAt.Before(olderThan) {
-			delete(d.imageBuildOps, id)
-		}
-	}
+	d.imageBuildOps.prune(olderThan)
 }
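Taken together, the registry gives `BeginImageBuild` / `ImageBuildStatus` / `CancelImageBuild` a simple begin–poll–cancel lifecycle. A hypothetical caller, sketched under the assumption that `BeginImageBuild` returns the `api.ImageBuildOperation` snapshot shown above; the polling cadence and deadline policy are invented:

```go
package daemon

import (
	"context"
	"time"

	"banger/internal/api"
)

// driveImageBuild is a hypothetical helper, not part of the commit. It
// starts a build, polls its status, and cancels if a deadline passes.
func driveImageBuild(d *Daemon, params api.ImageBuildParams, deadline time.Time) error {
	op, err := d.BeginImageBuild(context.Background(), params)
	if err != nil {
		return err
	}
	for {
		status, err := d.ImageBuildStatus(context.Background(), op.ID)
		if err != nil {
			return err
		}
		if status.Done {
			return nil // finished; pruneImageBuildOperations reclaims it later
		}
		if time.Now().After(deadline) {
			return d.CancelImageBuild(context.Background(), op.ID)
		}
		time.Sleep(500 * time.Millisecond)
	}
}
```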
@@ -16,8 +16,8 @@ import (
 )
 
 func (d *Daemon) BuildImage(ctx context.Context, params api.ImageBuildParams) (image model.Image, err error) {
-	d.mu.Lock()
-	defer d.mu.Unlock()
+	d.imageOpsMu.Lock()
+	defer d.imageOpsMu.Unlock()
 	op := d.beginOperation("image.build")
 	buildLogPath := ""
 	defer func() {
@@ -156,8 +156,8 @@ func (d *Daemon) BuildImage(ctx context.Context, params api.ImageBuildParams) (i
 }
 
 func (d *Daemon) RegisterImage(ctx context.Context, params api.ImageRegisterParams) (image model.Image, err error) {
-	d.mu.Lock()
-	defer d.mu.Unlock()
+	d.imageOpsMu.Lock()
+	defer d.imageOpsMu.Unlock()
 
 	name := strings.TrimSpace(params.Name)
 	if name == "" {
@@ -232,8 +232,8 @@ func (d *Daemon) RegisterImage(ctx context.Context, params api.ImageRegisterPara
 }
 
 func (d *Daemon) PromoteImage(ctx context.Context, idOrName string) (image model.Image, err error) {
-	d.mu.Lock()
-	defer d.mu.Unlock()
+	d.imageOpsMu.Lock()
+	defer d.imageOpsMu.Unlock()
 
 	op := d.beginOperation("image.promote")
 	defer func() {
@@ -375,8 +375,8 @@ func writePackagesMetadata(rootfsPath string, packages []string) error {
 }
 
 func (d *Daemon) DeleteImage(ctx context.Context, idOrName string) (model.Image, error) {
-	d.mu.Lock()
-	defer d.mu.Unlock()
+	d.imageOpsMu.Lock()
+	defer d.imageOpsMu.Unlock()
 
 	image, err := d.FindImage(ctx, idOrName)
 	if err != nil {
internal/daemon/op_registry.go (new file, 55 lines)
@@ -0,0 +1,55 @@
+package daemon
+
+import (
+	"sync"
+	"time"
+)
+
+// asyncOp is the protocol shared by the long-running operation state types
+// (VM create, image build). Each operation has a stable ID, a done flag that
+// flips to true when its goroutine finishes, an UpdatedAt for pruning, and a
+// way to signal cancellation to its goroutine.
+type asyncOp interface {
+	opID() string
+	opIsDone() bool
+	opUpdatedAt() time.Time
+	opCancel()
+}
+
+// opRegistry is a mutex-guarded map of in-flight operations keyed by op ID.
+// One registry per operation kind; each owns its own lock, so registries do
+// not contend with each other or with Daemon.mu.
+type opRegistry[T asyncOp] struct {
+	mu   sync.Mutex
+	byID map[string]T
+}
+
+func (r *opRegistry[T]) insert(op T) {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	if r.byID == nil {
+		r.byID = map[string]T{}
+	}
+	r.byID[op.opID()] = op
+}
+
+func (r *opRegistry[T]) get(id string) (T, bool) {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	op, ok := r.byID[id]
+	return op, ok
+}
+
+// prune drops completed operations last updated before the cutoff.
+func (r *opRegistry[T]) prune(before time.Time) {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	for id, op := range r.byID {
+		if !op.opIsDone() {
+			continue
+		}
+		if op.opUpdatedAt().Before(before) {
+			delete(r.byID, id)
+		}
+	}
+}
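A test-style sketch of how a concrete type plugs into the registry. `fakeOp` and the test are invented for illustration; `asyncOp` and `opRegistry` are the types above:

```go
package daemon

import (
	"testing"
	"time"
)

// fakeOp is a stand-in satisfying asyncOp.
type fakeOp struct {
	id      string
	done    bool
	updated time.Time
}

func (f *fakeOp) opID() string           { return f.id }
func (f *fakeOp) opIsDone() bool         { return f.done }
func (f *fakeOp) opUpdatedAt() time.Time { return f.updated }
func (f *fakeOp) opCancel()              {}

func TestOpRegistryPrune(t *testing.T) {
	var r opRegistry[*fakeOp] // zero value is ready; insert allocates the map
	stale := &fakeOp{id: "a", done: true, updated: time.Now().Add(-2 * time.Hour)}
	live := &fakeOp{id: "b", done: false, updated: time.Now().Add(-2 * time.Hour)}
	r.insert(stale)
	r.insert(live)

	r.prune(time.Now().Add(-time.Hour)) // drops done ops updated before cutoff

	if _, ok := r.get("a"); ok {
		t.Fatal("completed stale op should have been pruned")
	}
	if _, ok := r.get("b"); !ok {
		t.Fatal("in-flight op must survive pruning regardless of age")
	}
}
```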
@@ -106,47 +106,87 @@ type guestSessionStateSnapshot struct {
 	LastError string
 }
 
-func (d *Daemon) setGuestSessionController(id string, controller *guestSessionController) {
-	d.mu.Lock()
-	defer d.mu.Unlock()
-	d.sessionControllers[id] = controller
-}
-
-func (d *Daemon) claimGuestSessionController(id string, controller *guestSessionController) bool {
-	d.mu.Lock()
-	defer d.mu.Unlock()
-	if d.sessionControllers[id] != nil {
-		return false
-	}
-	d.sessionControllers[id] = controller
-	return true
-}
-
-func (d *Daemon) getGuestSessionController(id string) *guestSessionController {
-	d.mu.Lock()
-	defer d.mu.Unlock()
-	return d.sessionControllers[id]
-}
-
-func (d *Daemon) clearGuestSessionController(id string) *guestSessionController {
-	d.mu.Lock()
-	defer d.mu.Unlock()
-	controller := d.sessionControllers[id]
-	delete(d.sessionControllers, id)
-	return controller
-}
-
-func (d *Daemon) closeGuestSessionControllers() error {
-	d.mu.Lock()
-	controllers := make([]*guestSessionController, 0, len(d.sessionControllers))
-	for _, controller := range d.sessionControllers {
-		controllers = append(controllers, controller)
-	}
-	d.sessionControllers = nil
-	d.mu.Unlock()
-	var err error
-	for _, controller := range controllers {
-		err = errors.Join(err, controller.close())
-	}
-	return err
-}
+// sessionRegistry owns the live guest-session controller map. Its lock is
+// independent of Daemon.mu so guest-session lookups do not contend with
+// unrelated daemon state.
+type sessionRegistry struct {
+	mu     sync.Mutex
+	byID   map[string]*guestSessionController
+	closed bool
+}
+
+func newSessionRegistry() sessionRegistry {
+	return sessionRegistry{byID: make(map[string]*guestSessionController)}
+}
+
+func (r *sessionRegistry) set(id string, controller *guestSessionController) {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	if r.closed {
+		return
+	}
+	r.byID[id] = controller
+}
+
+func (r *sessionRegistry) claim(id string, controller *guestSessionController) bool {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	if r.closed {
+		return false
+	}
+	if r.byID[id] != nil {
+		return false
+	}
+	r.byID[id] = controller
+	return true
+}
+
+func (r *sessionRegistry) get(id string) *guestSessionController {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	return r.byID[id]
+}
+
+func (r *sessionRegistry) clear(id string) *guestSessionController {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	controller := r.byID[id]
+	delete(r.byID, id)
+	return controller
+}
+
+func (r *sessionRegistry) closeAll() error {
+	r.mu.Lock()
+	controllers := make([]*guestSessionController, 0, len(r.byID))
+	for _, controller := range r.byID {
+		controllers = append(controllers, controller)
+	}
+	r.byID = nil
+	r.closed = true
+	r.mu.Unlock()
+	var err error
+	for _, controller := range controllers {
+		err = errors.Join(err, controller.close())
+	}
+	return err
+}
+
+func (d *Daemon) setGuestSessionController(id string, controller *guestSessionController) {
+	d.sessions.set(id, controller)
+}
+
+func (d *Daemon) claimGuestSessionController(id string, controller *guestSessionController) bool {
+	return d.sessions.claim(id, controller)
+}
+
+func (d *Daemon) getGuestSessionController(id string) *guestSessionController {
+	return d.sessions.get(id)
+}
+
+func (d *Daemon) clearGuestSessionController(id string) *guestSessionController {
+	return d.sessions.clear(id)
+}
+
+func (d *Daemon) closeGuestSessionControllers() error {
+	return d.sessions.closeAll()
+}
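`closeAll` is exactly the pattern the lock-ordering notes prescribe: snapshot the controllers under the lock, mark the registry closed, release, and only then make the blocking `close()` calls. A self-contained sketch of the same shape with stand-in types (nothing here is the commit's code):

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// closer is a stand-in for guestSessionController: close() may block on I/O.
type closer struct{ id string }

func (c *closer) close() error { fmt.Println("closing", c.id); return nil }

type registry struct {
	mu     sync.Mutex
	byID   map[string]*closer
	closed bool
}

// closeAll copies values out under the lock, marks the registry closed,
// releases the lock, and only then performs blocking close() calls — so a
// slow close never stalls concurrent callers waiting on mu.
func (r *registry) closeAll() error {
	r.mu.Lock()
	items := make([]*closer, 0, len(r.byID))
	for _, c := range r.byID {
		items = append(items, c)
	}
	r.byID = nil
	r.closed = true
	r.mu.Unlock()

	var err error
	for _, c := range items {
		err = errors.Join(err, c.close())
	}
	return err
}

func main() {
	r := &registry{byID: map[string]*closer{"a": {"a"}, "b": {"b"}}}
	_ = r.closeAll()
}
```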
@@ -5,10 +5,19 @@ import (
 	"fmt"
 	"strconv"
 	"strings"
+	"sync"
 )
 
 const tapPoolPrefix = "tap-pool-"
 
+// tapPool owns the idle TAP interface cache plus the monotonic index used to
+// name new pool entries. All access goes through mu.
+type tapPool struct {
+	mu      sync.Mutex
+	entries []string
+	next    int
+}
+
 func (d *Daemon) initializeTapPool(ctx context.Context) error {
 	if d.config.TapPoolSize <= 0 || d.store == nil {
 		return nil
@@ -23,9 +32,9 @@ func (d *Daemon) initializeTapPool(ctx context.Context) error {
 			next = index + 1
 		}
 	}
-	d.tapPoolMu.Lock()
-	d.tapPoolNext = next
-	d.tapPoolMu.Unlock()
+	d.tapPool.mu.Lock()
+	d.tapPool.next = next
+	d.tapPool.mu.Unlock()
 	return nil
 }
 
@@ -42,14 +51,14 @@ func (d *Daemon) ensureTapPool(ctx context.Context) {
 	default:
 	}
 
-	d.tapPoolMu.Lock()
-	if len(d.tapPool) >= d.config.TapPoolSize {
-		d.tapPoolMu.Unlock()
+	d.tapPool.mu.Lock()
+	if len(d.tapPool.entries) >= d.config.TapPoolSize {
+		d.tapPool.mu.Unlock()
 		return
 	}
-	tapName := fmt.Sprintf("%s%d", tapPoolPrefix, d.tapPoolNext)
-	d.tapPoolNext++
-	d.tapPoolMu.Unlock()
+	tapName := fmt.Sprintf("%s%d", tapPoolPrefix, d.tapPool.next)
+	d.tapPool.next++
+	d.tapPool.mu.Unlock()
 
 	if err := d.createTap(ctx, tapName); err != nil {
 		if d.logger != nil {
@@ -58,9 +67,9 @@ func (d *Daemon) ensureTapPool(ctx context.Context) {
 		return
 	}
 
-	d.tapPoolMu.Lock()
-	d.tapPool = append(d.tapPool, tapName)
-	d.tapPoolMu.Unlock()
+	d.tapPool.mu.Lock()
+	d.tapPool.entries = append(d.tapPool.entries, tapName)
+	d.tapPool.mu.Unlock()
 
 	if d.logger != nil {
 		d.logger.Debug("tap added to idle pool", "tap_device", tapName)
@@ -69,14 +78,14 @@ func (d *Daemon) ensureTapPool(ctx context.Context) {
 }
 
 func (d *Daemon) acquireTap(ctx context.Context, fallbackName string) (string, error) {
-	d.tapPoolMu.Lock()
-	if n := len(d.tapPool); n > 0 {
-		tapName := d.tapPool[n-1]
-		d.tapPool = d.tapPool[:n-1]
-		d.tapPoolMu.Unlock()
+	d.tapPool.mu.Lock()
+	if n := len(d.tapPool.entries); n > 0 {
+		tapName := d.tapPool.entries[n-1]
+		d.tapPool.entries = d.tapPool.entries[:n-1]
+		d.tapPool.mu.Unlock()
 		return tapName, nil
 	}
-	d.tapPoolMu.Unlock()
+	d.tapPool.mu.Unlock()
 
 	if err := d.createTap(ctx, fallbackName); err != nil {
 		return "", err
@@ -90,13 +99,13 @@ func (d *Daemon) releaseTap(ctx context.Context, tapName string) error {
 		return nil
 	}
 	if isTapPoolName(tapName) {
-		d.tapPoolMu.Lock()
-		if len(d.tapPool) < d.config.TapPoolSize {
-			d.tapPool = append(d.tapPool, tapName)
-			d.tapPoolMu.Unlock()
+		d.tapPool.mu.Lock()
+		if len(d.tapPool.entries) < d.config.TapPoolSize {
+			d.tapPool.entries = append(d.tapPool.entries, tapName)
+			d.tapPool.mu.Unlock()
 			return nil
 		}
-		d.tapPoolMu.Unlock()
+		d.tapPool.mu.Unlock()
 	}
 	_, err := d.runner.RunSudo(ctx, "ip", "link", "del", tapName)
 	if err == nil {
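The pool is a bounded LIFO stack guarded by one leaf mutex. A self-contained stand-in showing the acquire/release shape (the `pool` type and names are invented; only the behaviour mirrors `acquireTap` / `releaseTap`):

```go
package main

import (
	"fmt"
	"sync"
)

// pool is a stand-in for tapPool: a bounded LIFO stack of device names.
type pool struct {
	mu      sync.Mutex
	entries []string
	max     int
}

// acquire pops an idle name if one exists; the caller falls back to creating
// a fresh device otherwise, as acquireTap does with fallbackName.
func (p *pool) acquire() (string, bool) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if n := len(p.entries); n > 0 {
		name := p.entries[n-1]
		p.entries = p.entries[:n-1]
		return name, true
	}
	return "", false
}

// release returns a name to the pool unless it is full; releaseTap deletes
// the device instead of pooling it when over capacity.
func (p *pool) release(name string) bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	if len(p.entries) < p.max {
		p.entries = append(p.entries, name)
		return true
	}
	return false
}

func main() {
	p := &pool{max: 2}
	p.release("tap-pool-0")
	name, ok := p.acquire()
	fmt.Println(name, ok) // tap-pool-0 true
}
```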
@@ -13,8 +13,8 @@ import (
 )
 
 func (d *Daemon) CreateVM(ctx context.Context, params api.VMCreateParams) (vm model.VMRecord, err error) {
-	d.mu.Lock()
-	defer d.mu.Unlock()
+	d.createVMMu.Lock()
+	defer d.createVMMu.Unlock()
 	op := d.beginOperation("vm.create")
 	defer func() {
 		if err != nil {
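`createVMMu` exists because name uniqueness (and, per the architecture notes, the guest IP allocation window) is a check-then-insert sequence: without one serialising lock, two concurrent creates could both pass the check. A self-contained sketch of the race the mutex closes (`vmStore` and `create` are invented stand-ins):

```go
package main

import (
	"fmt"
	"sync"
)

type vmStore struct {
	mu    sync.Mutex // plays the role of createVMMu
	names map[string]bool
}

// create holds the lock across both the uniqueness check and the insert,
// so no second caller can slip between them.
func (s *vmStore) create(name string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.names[name] { // check ...
		return fmt.Errorf("vm name already in use: %s", name)
	}
	s.names[name] = true // ... then insert, atomically under one lock
	return nil
}

func main() {
	s := &vmStore{names: map[string]bool{}}
	fmt.Println(s.create("web")) // <nil>
	fmt.Println(s.create("web")) // vm name already in use: web
}
```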
@@ -11,6 +11,11 @@ import (
 	"banger/internal/model"
 )
 
+func (op *vmCreateOperationState) opID() string           { return op.snapshot().ID }
+func (op *vmCreateOperationState) opIsDone() bool         { return op.snapshot().Done }
+func (op *vmCreateOperationState) opUpdatedAt() time.Time { return op.snapshot().UpdatedAt }
+func (op *vmCreateOperationState) opCancel()              { op.cancelOperation() }
+
 type vmCreateProgressKey struct{}
 
 type vmCreateOperationState struct {
@@ -148,14 +153,7 @@ func (d *Daemon) BeginVMCreate(_ context.Context, params api.VMCreateParams) (ap
 	}
 	createCtx, cancel := context.WithCancel(context.Background())
 	op.setCancel(cancel)
-
-	d.createOpsMu.Lock()
-	if d.createOps == nil {
-		d.createOps = map[string]*vmCreateOperationState{}
-	}
-	d.createOps[op.op.ID] = op
-	d.createOpsMu.Unlock()
+	d.createOps.insert(op)
 
 	go d.runVMCreateOperation(withVMCreateProgress(createCtx, op), op, params)
 	return op.snapshot(), nil
 }
@@ -170,9 +168,7 @@ func (d *Daemon) runVMCreateOperation(ctx context.Context, op *vmCreateOperation
 }
 
 func (d *Daemon) VMCreateStatus(_ context.Context, id string) (api.VMCreateOperation, error) {
-	d.createOpsMu.Lock()
-	op, ok := d.createOps[strings.TrimSpace(id)]
-	d.createOpsMu.Unlock()
+	op, ok := d.createOps.get(strings.TrimSpace(id))
 	if !ok {
 		return api.VMCreateOperation{}, fmt.Errorf("vm create operation not found: %s", id)
 	}
@@ -180,9 +176,7 @@ func (d *Daemon) VMCreateStatus(_ context.Context, id string) (api.VMCreateOpera
 }
 
 func (d *Daemon) CancelVMCreate(_ context.Context, id string) error {
-	d.createOpsMu.Lock()
-	op, ok := d.createOps[strings.TrimSpace(id)]
-	d.createOpsMu.Unlock()
+	op, ok := d.createOps.get(strings.TrimSpace(id))
 	if !ok {
 		return fmt.Errorf("vm create operation not found: %s", id)
 	}
@@ -191,15 +185,5 @@ func (d *Daemon) CancelVMCreate(_ context.Context, id string) error {
 }
 
 func (d *Daemon) pruneVMCreateOperations(olderThan time.Time) {
-	d.createOpsMu.Lock()
-	defer d.createOpsMu.Unlock()
-	for id, op := range d.createOps {
-		snapshot := op.snapshot()
-		if !snapshot.Done {
-			continue
-		}
-		if snapshot.UpdatedAt.Before(olderThan) {
-			delete(d.createOps, id)
-		}
-	}
+	d.createOps.prune(olderThan)
 }
internal/daemon/vm_locks.go (new file, 19 lines)
@@ -0,0 +1,19 @@
+package daemon
+
+import "sync"
+
+// vmLockSet maps VM IDs to per-VM mutexes. Concurrent operations on different
+// VMs run in parallel; concurrent operations on the same VM serialise.
+type vmLockSet struct {
+	byID sync.Map // map[string]*sync.Mutex
+}
+
+// lock acquires the mutex for the given VM ID and returns its unlock func.
+// LoadOrStore is atomic — exactly one *sync.Mutex wins for each ID, so there
+// is no release-then-reacquire TOCTOU window.
+func (s *vmLockSet) lock(id string) func() {
+	val, _ := s.byID.LoadOrStore(id, &sync.Mutex{})
+	mu := val.(*sync.Mutex)
+	mu.Lock()
+	return mu.Unlock
+}
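A test-style sketch of the contract (the test and its counter are invented; `vmLockSet.lock` is the function above): callers on the same ID serialise, so the increment never tears, while different IDs would not block each other.

```go
package daemon

import (
	"sync"
	"testing"
)

func TestVMLockSetSerialisesPerID(t *testing.T) {
	var locks vmLockSet // zero value is ready
	counter := 0
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			unlock := locks.lock("vm-1") // same ID: all callers serialise
			defer unlock()
			counter++
		}()
	}
	wg.Wait()
	if counter != 100 {
		t.Fatalf("expected 100 increments, got %d", counter)
	}
}
```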