diff --git a/internal/daemon/daemon.go b/internal/daemon/daemon.go index ac02a2a..ca28261 100644 --- a/internal/daemon/daemon.go +++ b/internal/daemon/daemon.go @@ -18,6 +18,7 @@ import ( "banger/internal/api" "banger/internal/buildinfo" "banger/internal/config" + "banger/internal/daemon/opstate" "banger/internal/model" "banger/internal/paths" "banger/internal/rpc" @@ -34,8 +35,8 @@ type Daemon struct { logger *slog.Logger imageOpsMu sync.Mutex createVMMu sync.Mutex - createOps opRegistry[*vmCreateOperationState] - imageBuildOps opRegistry[*imageBuildOperationState] + createOps opstate.Registry[*vmCreateOperationState] + imageBuildOps opstate.Registry[*imageBuildOperationState] vmLocks vmLockSet sessions sessionRegistry tapPool tapPool diff --git a/internal/daemon/dmsnap/dmsnap.go b/internal/daemon/dmsnap/dmsnap.go new file mode 100644 index 0000000..cbc5945 --- /dev/null +++ b/internal/daemon/dmsnap/dmsnap.go @@ -0,0 +1,128 @@ +// Package dmsnap wraps the host-side device-mapper snapshot operations used +// to give each VM a copy-on-write view over a shared rootfs image. It issues +// losetup/dmsetup via a system.CommandRunner-compatible runner. +package dmsnap + +import ( + "context" + "errors" + "fmt" + "strings" + "time" +) + +// Runner is the narrow command-runner surface dmsnap needs. system.Runner +// satisfies it. +type Runner interface { + RunSudo(ctx context.Context, args ...string) ([]byte, error) +} + +// Handles records the loop devices and dm target allocated for a snapshot. +// Callers pass it back to Cleanup to unwind in the right order. +type Handles struct { + BaseLoop string + COWLoop string + DMName string + DMDev string +} + +// Create sets up a dm-snapshot named dmName layering cowPath over rootfsPath. +// On failure it cleans up whatever it had attached so far. +func Create(ctx context.Context, runner Runner, rootfsPath, cowPath, dmName string) (handles Handles, err error) { + defer func() { + if err == nil { + return + } + if cleanupErr := Cleanup(context.Background(), runner, handles); cleanupErr != nil { + err = errors.Join(err, cleanupErr) + } + }() + + baseBytes, err := runner.RunSudo(ctx, "losetup", "-f", "--show", "--read-only", rootfsPath) + if err != nil { + return handles, err + } + handles.BaseLoop = strings.TrimSpace(string(baseBytes)) + + cowBytes, err := runner.RunSudo(ctx, "losetup", "-f", "--show", cowPath) + if err != nil { + return handles, err + } + handles.COWLoop = strings.TrimSpace(string(cowBytes)) + + sectorsBytes, err := runner.RunSudo(ctx, "blockdev", "--getsz", handles.BaseLoop) + if err != nil { + return handles, err + } + sectors := strings.TrimSpace(string(sectorsBytes)) + + if _, err := runner.RunSudo(ctx, "dmsetup", "create", dmName, "--table", fmt.Sprintf("0 %s snapshot %s %s P 8", sectors, handles.BaseLoop, handles.COWLoop)); err != nil { + return handles, err + } + handles.DMName = dmName + handles.DMDev = "/dev/mapper/" + dmName + return handles, nil +} + +// Cleanup tears down a snapshot: remove the dm target, then detach the loops. +// Missing-handle errors (already cleaned up) are ignored. +func Cleanup(ctx context.Context, runner Runner, handles Handles) error { + var cleanupErr error + + switch { + case handles.DMName != "": + if err := Remove(ctx, runner, handles.DMName); err != nil { + cleanupErr = errors.Join(cleanupErr, err) + } + case handles.DMDev != "": + if err := Remove(ctx, runner, handles.DMDev); err != nil { + cleanupErr = errors.Join(cleanupErr, err) + } + } + + if handles.COWLoop != "" { + if _, err := runner.RunSudo(ctx, "losetup", "-d", handles.COWLoop); err != nil { + if !isMissing(err) { + cleanupErr = errors.Join(cleanupErr, err) + } + } + } + if handles.BaseLoop != "" { + if _, err := runner.RunSudo(ctx, "losetup", "-d", handles.BaseLoop); err != nil { + if !isMissing(err) { + cleanupErr = errors.Join(cleanupErr, err) + } + } + } + + return cleanupErr +} + +// Remove retries dmsetup remove while the device is briefly busy after +// detach. Missing targets succeed. +func Remove(ctx context.Context, runner Runner, target string) error { + deadline := time.Now().Add(15 * time.Second) + for { + if _, err := runner.RunSudo(ctx, "dmsetup", "remove", target); err != nil { + if isMissing(err) { + return nil + } + if strings.Contains(err.Error(), "Device or resource busy") && time.Now().Before(deadline) { + time.Sleep(100 * time.Millisecond) + continue + } + return err + } + return nil + } +} + +func isMissing(err error) bool { + if err == nil { + return false + } + msg := err.Error() + return strings.Contains(msg, "No such device or address") || + strings.Contains(msg, "not found") || + strings.Contains(msg, "does not exist") +} diff --git a/internal/daemon/image_build_ops.go b/internal/daemon/image_build_ops.go index f0e24af..b4d83e1 100644 --- a/internal/daemon/image_build_ops.go +++ b/internal/daemon/image_build_ops.go @@ -11,10 +11,10 @@ import ( "banger/internal/model" ) -func (op *imageBuildOperationState) opID() string { return op.snapshot().ID } -func (op *imageBuildOperationState) opIsDone() bool { return op.snapshot().Done } -func (op *imageBuildOperationState) opUpdatedAt() time.Time { return op.snapshot().UpdatedAt } -func (op *imageBuildOperationState) opCancel() { op.cancelOperation() } +func (op *imageBuildOperationState) ID() string { return op.snapshot().ID } +func (op *imageBuildOperationState) IsDone() bool { return op.snapshot().Done } +func (op *imageBuildOperationState) UpdatedAt() time.Time { return op.snapshot().UpdatedAt } +func (op *imageBuildOperationState) Cancel() { op.cancelOperation() } type imageBuildProgressKey struct{} @@ -166,7 +166,7 @@ func (d *Daemon) BeginImageBuild(_ context.Context, params api.ImageBuildParams) } buildCtx, cancel := context.WithCancel(context.Background()) op.setCancel(cancel) - d.imageBuildOps.insert(op) + d.imageBuildOps.Insert(op) go d.runImageBuildOperation(withImageBuildProgress(buildCtx, op), op, params) return op.snapshot(), nil } @@ -181,7 +181,7 @@ func (d *Daemon) runImageBuildOperation(ctx context.Context, op *imageBuildOpera } func (d *Daemon) ImageBuildStatus(_ context.Context, id string) (api.ImageBuildOperation, error) { - op, ok := d.imageBuildOps.get(strings.TrimSpace(id)) + op, ok := d.imageBuildOps.Get(strings.TrimSpace(id)) if !ok { return api.ImageBuildOperation{}, fmt.Errorf("image build operation not found: %s", id) } @@ -189,7 +189,7 @@ func (d *Daemon) ImageBuildStatus(_ context.Context, id string) (api.ImageBuildO } func (d *Daemon) CancelImageBuild(_ context.Context, id string) error { - op, ok := d.imageBuildOps.get(strings.TrimSpace(id)) + op, ok := d.imageBuildOps.Get(strings.TrimSpace(id)) if !ok { return fmt.Errorf("image build operation not found: %s", id) } @@ -198,5 +198,5 @@ func (d *Daemon) CancelImageBuild(_ context.Context, id string) error { } func (d *Daemon) pruneImageBuildOperations(olderThan time.Time) { - d.imageBuildOps.prune(olderThan) + d.imageBuildOps.Prune(olderThan) } diff --git a/internal/daemon/op_registry.go b/internal/daemon/op_registry.go deleted file mode 100644 index 2130737..0000000 --- a/internal/daemon/op_registry.go +++ /dev/null @@ -1,55 +0,0 @@ -package daemon - -import ( - "sync" - "time" -) - -// asyncOp is the protocol shared by the long-running operation state types -// (VM create, image build). Each operation has a stable ID, a done flag that -// flips to true when its goroutine finishes, an UpdatedAt for pruning, and a -// way to signal cancellation to its goroutine. -type asyncOp interface { - opID() string - opIsDone() bool - opUpdatedAt() time.Time - opCancel() -} - -// opRegistry is a mutex-guarded map of in-flight operations keyed by op ID. -// One registry per operation kind; each owns its own lock, so registries do -// not contend with each other or with Daemon.mu. -type opRegistry[T asyncOp] struct { - mu sync.Mutex - byID map[string]T -} - -func (r *opRegistry[T]) insert(op T) { - r.mu.Lock() - defer r.mu.Unlock() - if r.byID == nil { - r.byID = map[string]T{} - } - r.byID[op.opID()] = op -} - -func (r *opRegistry[T]) get(id string) (T, bool) { - r.mu.Lock() - defer r.mu.Unlock() - op, ok := r.byID[id] - return op, ok -} - -// prune drops completed operations last updated before the cutoff. -func (r *opRegistry[T]) prune(before time.Time) { - r.mu.Lock() - defer r.mu.Unlock() - for id, op := range r.byID { - if !op.opIsDone() { - continue - } - if op.opUpdatedAt().Before(before) { - delete(r.byID, id) - } - } -} diff --git a/internal/daemon/opstate/registry.go b/internal/daemon/opstate/registry.go new file mode 100644 index 0000000..d82c2be --- /dev/null +++ b/internal/daemon/opstate/registry.go @@ -0,0 +1,58 @@ +// Package opstate provides a mutex-guarded registry for long-running +// operations (e.g. async VM create, async image build). A registry stores +// operations by ID and can prune completed ones after a retention window. +package opstate + +import ( + "sync" + "time" +) + +// AsyncOp is the protocol each operation type must satisfy. Implementations +// own their own concurrency for the returned values — the registry treats +// them as opaque. +type AsyncOp interface { + ID() string + IsDone() bool + UpdatedAt() time.Time + Cancel() +} + +// Registry is a mutex-guarded map of in-flight operations keyed by op ID. +// One registry per operation kind; each owns its own lock. +type Registry[T AsyncOp] struct { + mu sync.Mutex + byID map[string]T +} + +// Insert adds op keyed by its ID. +func (r *Registry[T]) Insert(op T) { + r.mu.Lock() + defer r.mu.Unlock() + if r.byID == nil { + r.byID = map[string]T{} + } + r.byID[op.ID()] = op +} + +// Get returns the operation with the given ID, if present. +func (r *Registry[T]) Get(id string) (T, bool) { + r.mu.Lock() + defer r.mu.Unlock() + op, ok := r.byID[id] + return op, ok +} + +// Prune drops completed operations last updated before the cutoff. +func (r *Registry[T]) Prune(before time.Time) { + r.mu.Lock() + defer r.mu.Unlock() + for id, op := range r.byID { + if !op.IsDone() { + continue + } + if op.UpdatedAt().Before(before) { + delete(r.byID, id) + } + } +} diff --git a/internal/daemon/snapshot.go b/internal/daemon/snapshot.go index f6ce45d..78da1f9 100644 --- a/internal/daemon/snapshot.go +++ b/internal/daemon/snapshot.go @@ -2,110 +2,22 @@ package daemon import ( "context" - "errors" - "fmt" - "strings" - "time" + + "banger/internal/daemon/dmsnap" ) -type dmSnapshotHandles struct { - BaseLoop string - COWLoop string - DMName string - DMDev string -} +// dmSnapshotHandles is retained as a package-local alias for the subpackage +// type so existing call sites and tests read naturally. +type dmSnapshotHandles = dmsnap.Handles -func (d *Daemon) createDMSnapshot(ctx context.Context, rootfsPath, cowPath, dmName string) (handles dmSnapshotHandles, err error) { - defer func() { - if err == nil { - return - } - if cleanupErr := d.cleanupDMSnapshot(context.Background(), handles); cleanupErr != nil { - err = errors.Join(err, cleanupErr) - } - }() - - baseBytes, err := d.runner.RunSudo(ctx, "losetup", "-f", "--show", "--read-only", rootfsPath) - if err != nil { - return handles, err - } - handles.BaseLoop = strings.TrimSpace(string(baseBytes)) - - cowBytes, err := d.runner.RunSudo(ctx, "losetup", "-f", "--show", cowPath) - if err != nil { - return handles, err - } - handles.COWLoop = strings.TrimSpace(string(cowBytes)) - - sectorsBytes, err := d.runner.RunSudo(ctx, "blockdev", "--getsz", handles.BaseLoop) - if err != nil { - return handles, err - } - sectors := strings.TrimSpace(string(sectorsBytes)) - - if _, err := d.runner.RunSudo(ctx, "dmsetup", "create", dmName, "--table", fmt.Sprintf("0 %s snapshot %s %s P 8", sectors, handles.BaseLoop, handles.COWLoop)); err != nil { - return handles, err - } - handles.DMName = dmName - handles.DMDev = "/dev/mapper/" + dmName - return handles, nil +func (d *Daemon) createDMSnapshot(ctx context.Context, rootfsPath, cowPath, dmName string) (dmSnapshotHandles, error) { + return dmsnap.Create(ctx, d.runner, rootfsPath, cowPath, dmName) } func (d *Daemon) cleanupDMSnapshot(ctx context.Context, handles dmSnapshotHandles) error { - var cleanupErr error - - switch { - case handles.DMName != "": - if err := d.removeDMSnapshot(ctx, handles.DMName); err != nil { - cleanupErr = errors.Join(cleanupErr, err) - } - case handles.DMDev != "": - if err := d.removeDMSnapshot(ctx, handles.DMDev); err != nil { - cleanupErr = errors.Join(cleanupErr, err) - } - } - - if handles.COWLoop != "" { - if _, err := d.runner.RunSudo(ctx, "losetup", "-d", handles.COWLoop); err != nil { - if !isMissingSnapshotHandle(err) { - cleanupErr = errors.Join(cleanupErr, err) - } - } - } - if handles.BaseLoop != "" { - if _, err := d.runner.RunSudo(ctx, "losetup", "-d", handles.BaseLoop); err != nil { - if !isMissingSnapshotHandle(err) { - cleanupErr = errors.Join(cleanupErr, err) - } - } - } - - return cleanupErr + return dmsnap.Cleanup(ctx, d.runner, handles) } func (d *Daemon) removeDMSnapshot(ctx context.Context, target string) error { - deadline := time.Now().Add(15 * time.Second) - for { - if _, err := d.runner.RunSudo(ctx, "dmsetup", "remove", target); err != nil { - if isMissingSnapshotHandle(err) { - return nil - } - if strings.Contains(err.Error(), "Device or resource busy") && time.Now().Before(deadline) { - time.Sleep(100 * time.Millisecond) - continue - } - return err - } - return nil - } -} - -func isMissingSnapshotHandle(err error) bool { - if err == nil { - return false - } - msg := err.Error() - return strings.Contains(msg, "No such device or address") || - strings.Contains(msg, "not found") || - strings.Contains(msg, "does not exist") + return dmsnap.Remove(ctx, d.runner, target) } diff --git a/internal/daemon/vm_create_ops.go b/internal/daemon/vm_create_ops.go index f5284a0..b27dc90 100644 --- a/internal/daemon/vm_create_ops.go +++ b/internal/daemon/vm_create_ops.go @@ -11,10 +11,10 @@ import ( "banger/internal/model" ) -func (op *vmCreateOperationState) opID() string { return op.snapshot().ID } -func (op *vmCreateOperationState) opIsDone() bool { return op.snapshot().Done } -func (op *vmCreateOperationState) opUpdatedAt() time.Time { return op.snapshot().UpdatedAt } -func (op *vmCreateOperationState) opCancel() { op.cancelOperation() } +func (op *vmCreateOperationState) ID() string { return op.snapshot().ID } +func (op *vmCreateOperationState) IsDone() bool { return op.snapshot().Done } +func (op *vmCreateOperationState) UpdatedAt() time.Time { return op.snapshot().UpdatedAt } +func (op *vmCreateOperationState) Cancel() { op.cancelOperation() } type vmCreateProgressKey struct{} @@ -153,7 +153,7 @@ func (d *Daemon) BeginVMCreate(_ context.Context, params api.VMCreateParams) (ap } createCtx, cancel := context.WithCancel(context.Background()) op.setCancel(cancel) - d.createOps.insert(op) + d.createOps.Insert(op) go d.runVMCreateOperation(withVMCreateProgress(createCtx, op), op, params) return op.snapshot(), nil } @@ -168,7 +168,7 @@ func (d *Daemon) runVMCreateOperation(ctx context.Context, op *vmCreateOperation } func (d *Daemon) VMCreateStatus(_ context.Context, id string) (api.VMCreateOperation, error) { - op, ok := d.createOps.get(strings.TrimSpace(id)) + op, ok := d.createOps.Get(strings.TrimSpace(id)) if !ok { return api.VMCreateOperation{}, fmt.Errorf("vm create operation not found: %s", id) } @@ -176,7 +176,7 @@ func (d *Daemon) VMCreateStatus(_ context.Context, id string) (api.VMCreateOpera } func (d *Daemon) CancelVMCreate(_ context.Context, id string) error { - op, ok := d.createOps.get(strings.TrimSpace(id)) + op, ok := d.createOps.Get(strings.TrimSpace(id)) if !ok { return fmt.Errorf("vm create operation not found: %s", id) } @@ -185,5 +185,5 @@ func (d *Daemon) CancelVMCreate(_ context.Context, id string) error { } func (d *Daemon) pruneVMCreateOperations(olderThan time.Time) { - d.createOps.prune(olderThan) + d.createOps.Prune(olderThan) }