Prerequisite for `banger update`'s preflight, which refuses to swap
binaries while anything is in flight. Today's opstate.Registry
exposes Insert/Get/Prune but no iteration; without a snapshot
accessor the update flow can't tell whether a vm.create is
mid-prepare-work-disk.
* opstate.Registry.List(): returns a freshly-allocated snapshot
of every entry. Mutating the slice doesn't poison the
registry. Pinned by tests covering the snapshot semantics
and the empty case.
* api.OperationSummary / OperationsListResult: a public-shape
record per op. Today the Kind is always "vm.create" — the
field exists so future async kinds (image.pull, kernel.pull)
plug in without an API change.
* Daemon.ListOperations + daemon.operations.list RPC:
walks vmService.createOps and emits OperationSummary entries.
Done ops are included in the snapshot; the update preflight
filters by Done itself.
* dispatch_test's documented-methods list updated.
No behaviour change for existing flows; this is a read-only
addition.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
75 lines
1.9 KiB
Go
75 lines
1.9 KiB
Go
// Package opstate provides a mutex-guarded registry for long-running
|
|
// operations (e.g. async VM create, async image build). A registry stores
|
|
// operations by ID and can prune completed ones after a retention window.
|
|
package opstate
|
|
|
|
import (
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// AsyncOp is the contract an operation type must fulfil to be stored in a
// Registry. The registry never looks inside an operation; it only calls the
// methods below, so implementations are responsible for making them safe to
// call concurrently.
type AsyncOp interface {
	// ID returns the key under which Insert files the operation.
	ID() string

	// IsDone reports whether the operation has finished. Prune only ever
	// removes operations for which this returns true.
	IsDone() bool

	// UpdatedAt returns the timestamp Prune compares against its cutoff.
	UpdatedAt() time.Time

	// Cancel is not invoked by the registry in this file.
	// NOTE(review): presumably asks the operation to stop — confirm against
	// the concrete implementations.
	Cancel()
}
|
|
|
|
// Registry is a mutex-guarded map of in-flight operations keyed by op ID.
|
|
// One registry per operation kind; each owns its own lock.
|
|
type Registry[T AsyncOp] struct {
|
|
mu sync.Mutex
|
|
byID map[string]T
|
|
}
|
|
|
|
// Insert adds op keyed by its ID.
|
|
func (r *Registry[T]) Insert(op T) {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
if r.byID == nil {
|
|
r.byID = map[string]T{}
|
|
}
|
|
r.byID[op.ID()] = op
|
|
}
|
|
|
|
// Get returns the operation with the given ID, if present.
|
|
func (r *Registry[T]) Get(id string) (T, bool) {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
op, ok := r.byID[id]
|
|
return op, ok
|
|
}
|
|
|
|
// List returns a snapshot of every operation currently in the
|
|
// registry — both pending and (un-pruned) completed. Callers filter
|
|
// by IsDone() if they care about state. The slice is freshly
|
|
// allocated; mutating it doesn't affect the registry.
|
|
//
|
|
// Used by `banger update`'s preflight to detect in-flight operations
|
|
// before swapping binaries.
|
|
func (r *Registry[T]) List() []T {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
out := make([]T, 0, len(r.byID))
|
|
for _, op := range r.byID {
|
|
out = append(out, op)
|
|
}
|
|
return out
|
|
}
|
|
|
|
// Prune drops completed operations last updated before the cutoff.
|
|
func (r *Registry[T]) Prune(before time.Time) {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
for id, op := range r.byID {
|
|
if !op.IsDone() {
|
|
continue
|
|
}
|
|
if op.UpdatedAt().Before(before) {
|
|
delete(r.byID, id)
|
|
}
|
|
}
|
|
}
|