banger/internal/daemon/opstate/registry.go
Thales Maciel 3c0af3a2de
opstate,daemon: list in-flight operations via daemon.operations.list

Prerequisite for `banger update`'s preflight, which refuses to swap
binaries while anything is in flight. Today's opstate.Registry
exposes Insert/Get/Prune but no iteration; without a snapshot
accessor the update flow can't tell whether a vm.create is
mid-prepare-work-disk.

  * opstate.Registry.List(): returns a freshly-allocated snapshot
    of every entry. Mutating the slice doesn't poison the
    registry. Pinned by tests covering the snapshot semantics
    and the empty case.
  * api.OperationSummary / OperationsListResult: a public-shape
    record per op. Today the Kind is always "vm.create" — the
    field exists so future async kinds (image.pull, kernel.pull)
    plug in without an API change.
  * Daemon.ListOperations + daemon.operations.list RPC:
    walks vmService.createOps and emits OperationSummary entries.
    Done ops are included in the snapshot; the update preflight
    filters by Done itself.
  * dispatch_test's documented-methods list updated.

No behaviour change for existing flows; this is a read-only
addition.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-28 18:14:57 -03:00

75 lines
1.9 KiB
Go

// Package opstate provides a mutex-guarded registry for long-running
// operations (e.g. async VM create, async image build). A registry stores
// operations by ID and can prune completed ones after a retention window.
package opstate

import (
	"sync"
	"time"
)

// AsyncOp is the protocol each operation type must satisfy. Implementations
// are responsible for their own synchronization; the registry treats the
// returned values as opaque.
type AsyncOp interface {
	ID() string
	IsDone() bool
	UpdatedAt() time.Time
	Cancel()
}

// Registry is a mutex-guarded map of in-flight operations keyed by op ID.
// One registry per operation kind; each owns its own lock.
type Registry[T AsyncOp] struct {
	mu   sync.Mutex
	byID map[string]T
}

// Insert adds op keyed by its ID.
func (r *Registry[T]) Insert(op T) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.byID == nil {
		r.byID = map[string]T{}
	}
	r.byID[op.ID()] = op
}

// Get returns the operation with the given ID, if present.
func (r *Registry[T]) Get(id string) (T, bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	op, ok := r.byID[id]
	return op, ok
}

// List returns a snapshot of every operation currently in the
// registry, both pending and completed-but-not-yet-pruned. Callers
// filter by IsDone() if they care about state. The slice is freshly
// allocated, so mutating it doesn't affect the registry; its order
// is unspecified because it follows map iteration.
//
// Used by `banger update`'s preflight to detect in-flight operations
// before swapping binaries.
func (r *Registry[T]) List() []T {
	r.mu.Lock()
	defer r.mu.Unlock()
	out := make([]T, 0, len(r.byID))
	for _, op := range r.byID {
		out = append(out, op)
	}
	return out
}

// Prune drops completed operations last updated before the cutoff.
func (r *Registry[T]) Prune(before time.Time) {
	r.mu.Lock()
	defer r.mu.Unlock()
	for id, op := range r.byID {
		if !op.IsDone() {
			continue
		}
		if op.UpdatedAt().Before(before) {
			delete(r.byID, id)
		}
	}
}