package daemon

import (
	"context"
	"fmt"
	"strings"
	"sync"
	"time"

	"banger/internal/api"
	"banger/internal/model"
)

// vmCreateProgressKey is the private context key under which the
// in-flight create operation's state is stored.
type vmCreateProgressKey struct{}

// vmCreateOperationState is the mutable progress record behind one
// asynchronous VM create. mu guards both cancel and op; readers get a
// copy via snapshot rather than touching op directly.
type vmCreateOperationState struct {
	mu     sync.Mutex
	cancel context.CancelFunc
	op     api.VMCreateOperation
}

// ID returns the operation id taken from a locked snapshot.
func (op *vmCreateOperationState) ID() string {
	return op.snapshot().ID
}

// IsDone reports whether the operation has been marked done (by either
// done or fail).
func (op *vmCreateOperationState) IsDone() bool {
	return op.snapshot().Done
}

// UpdatedAt returns the time of the last recorded change to the
// operation.
func (op *vmCreateOperationState) UpdatedAt() time.Time {
	return op.snapshot().UpdatedAt
}

// Cancel invokes the operation's cancel function, if one has been set.
func (op *vmCreateOperationState) Cancel() {
	op.cancelOperation()
}

// newVMCreateOperationState builds the async-progress record for a
// vm.create.begin RPC. If ctx already carries a dispatch-assigned op id
// (the usual case) that id is reused, so the status id shown to the
// operator and the op_id in the daemon log are one and the same string.
// When no id is present a fresh one is minted, which keeps the record
// shape identical for internal callers that bypass dispatch (tests,
// future background creators).
//
// The record starts in stage "queued" with StartedAt and UpdatedAt set
// to the same instant. An error is returned only when minting a new op
// id fails.
func newVMCreateOperationState(ctx context.Context) (*vmCreateOperationState, error) {
	opID := OpIDFromContext(ctx)
	if opID == "" {
		minted, err := model.NewOpID()
		if err != nil {
			return nil, err
		}
		opID = minted
	}

	createdAt := model.Now()
	state := &vmCreateOperationState{}
	state.op = api.VMCreateOperation{
		ID:        opID,
		Stage:     "queued",
		Detail:    "waiting to start",
		StartedAt: createdAt,
		UpdatedAt: createdAt,
	}
	return state, nil
}

// withVMCreateProgress returns a context carrying op so that code deeper
// in the create path can report progress. A nil op leaves ctx untouched.
func withVMCreateProgress(ctx context.Context, op *vmCreateOperationState) context.Context {
	if op != nil {
		return context.WithValue(ctx, vmCreateProgressKey{}, op)
	}
	return ctx
}

// vmCreateProgressFromContext extracts the create-operation state from
// ctx. It returns nil when ctx is nil or carries no such state.
func vmCreateProgressFromContext(ctx context.Context) *vmCreateOperationState {
	if ctx != nil {
		if state, ok := ctx.Value(vmCreateProgressKey{}).(*vmCreateOperationState); ok {
			return state
		}
	}
	return nil
}

// vmCreateStage records a stage/detail update on the operation attached
// to ctx; it is a no-op when ctx has no operation attached.
func vmCreateStage(ctx context.Context, stage, detail string) {
	state := vmCreateProgressFromContext(ctx)
	if state == nil {
		return
	}
	state.stage(stage, detail)
}

// vmCreateBindVM associates vm's id and name with the operation attached
// to ctx; it is a no-op when ctx has no operation attached.
func vmCreateBindVM(ctx context.Context, vm model.VMRecord) {
	state := vmCreateProgressFromContext(ctx)
	if state == nil {
		return
	}
	state.bindVM(vm)
}

// setCancel stores the function that cancelOperation will later invoke.
func (op *vmCreateOperationState) setCancel(cancel context.CancelFunc) {
	op.mu.Lock()
	op.cancel = cancel
	op.mu.Unlock()
}

// bindVM copies the VM's id and name onto the operation record.
func (op *vmCreateOperationState) bindVM(vm model.VMRecord) {
	op.mu.Lock()
	defer op.mu.Unlock()

	op.op.VMID, op.op.VMName = vm.ID, vm.Name
}

// stage updates the operation's stage and detail. Both inputs are
// whitespace-trimmed; an empty stage means "keep the current stage".
// UpdatedAt is bumped only when the stage/detail pair actually changes.
func (op *vmCreateOperationState) stage(stage, detail string) {
	op.mu.Lock()
	defer op.mu.Unlock()

	nextStage := strings.TrimSpace(stage)
	nextDetail := strings.TrimSpace(detail)
	if nextStage == "" {
		nextStage = op.op.Stage
	}

	unchanged := nextStage == op.op.Stage && nextDetail == op.op.Detail
	if unchanged {
		return
	}

	op.op.Stage, op.op.Detail = nextStage, nextDetail
	op.op.UpdatedAt = model.Now()
}

// done marks the operation as successfully finished, moving it to stage
// "ready", clearing any error, and attaching a private copy of vm.
func (op *vmCreateOperationState) done(vm model.VMRecord) {
	op.mu.Lock()
	defer op.mu.Unlock()

	// Store a pointer to our own copy, never to the caller's value.
	record := vm

	op.op.VMID, op.op.VMName = vm.ID, vm.Name
	op.op.Stage, op.op.Detail = "ready", "vm is ready"
	op.op.Done, op.op.Success = true, true
	op.op.Error = ""
	op.op.VM = &record
	op.op.UpdatedAt = model.Now()
}

// fail marks the operation as finished without success. The error text
// is recorded when err is non-nil. The existing stage and detail are
// left in place; only a blank detail is replaced with a generic message.
func (op *vmCreateOperationState) fail(err error) {
	op.mu.Lock()
	defer op.mu.Unlock()

	op.op.Done, op.op.Success = true, false
	if err != nil {
		op.op.Error = err.Error()
	}
	if strings.TrimSpace(op.op.Detail) == "" {
		op.op.Detail = "vm create failed"
	}
	op.op.UpdatedAt = model.Now()
}

// snapshot returns a copy of the operation record taken under the lock.
// The VM pointer, when set, is re-pointed at a fresh copy so the caller
// never shares the VM record held by the internal state.
func (op *vmCreateOperationState) snapshot() api.VMCreateOperation {
	op.mu.Lock()
	defer op.mu.Unlock()

	out := op.op
	if vm := out.VM; vm != nil {
		dup := *vm
		out.VM = &dup
	}
	return out
}

// cancelOperation calls the stored cancel function, if any. The function
// is read under the lock but invoked after releasing it.
func (op *vmCreateOperationState) cancelOperation() {
	op.mu.Lock()
	stop := op.cancel
	op.mu.Unlock()

	if stop == nil {
		return
	}
	stop()
}

// BeginVMCreate starts a VM create in a background goroutine and returns
// the initial snapshot of its operation record. The operation is
// registered in s.createOps before the goroutine is launched. An error
// is returned only if the operation state cannot be constructed.
func (s *VMService) BeginVMCreate(ctx context.Context, params api.VMCreateParams) (api.VMCreateOperation, error) {
	state, err := newVMCreateOperationState(ctx)
	if err != nil {
		return api.VMCreateOperation{}, err
	}

	// The begin RPC returns right away, so the worker must not inherit
	// the caller's deadline. It runs on a fresh background context that
	// still carries the op id, so every log line from the goroutine
	// bears the identifier the client was just handed.
	detached := WithOpID(context.Background(), state.ID())
	createCtx, cancel := context.WithCancel(detached)
	state.setCancel(cancel)

	s.createOps.Insert(state)
	go s.runVMCreateOperation(withVMCreateProgress(createCtx, state), state, params)

	return state.snapshot(), nil
}

// runVMCreateOperation runs CreateVM and records the outcome on op:
// done on success, fail with the returned error otherwise.
func (s *VMService) runVMCreateOperation(ctx context.Context, op *vmCreateOperationState, params api.VMCreateParams) {
	vm, err := s.CreateVM(ctx, params)
	if err == nil {
		op.done(vm)
		return
	}
	op.fail(err)
}

// VMCreateStatus returns a snapshot of the create operation registered
// under id. The id is whitespace-trimmed for the lookup; an unknown id
// yields an error that echoes the id as supplied.
func (s *VMService) VMCreateStatus(_ context.Context, id string) (api.VMCreateOperation, error) {
	key := strings.TrimSpace(id)
	if op, ok := s.createOps.Get(key); ok {
		return op.snapshot(), nil
	}
	return api.VMCreateOperation{}, fmt.Errorf("vm create operation not found: %s", id)
}

// CancelVMCreate invokes the cancel function of the create operation
// registered under id. The id is whitespace-trimmed for the lookup; an
// unknown id yields an error that echoes the id as supplied.
func (s *VMService) CancelVMCreate(_ context.Context, id string) error {
	key := strings.TrimSpace(id)
	if op, ok := s.createOps.Get(key); ok {
		op.cancelOperation()
		return nil
	}
	return fmt.Errorf("vm create operation not found: %s", id)
}

// pruneVMCreateOperations forwards olderThan to the createOps registry's
// Prune. NOTE(review): presumably this evicts finished operations last
// updated before olderThan — confirm against the registry implementation.
func (s *VMService) pruneVMCreateOperations(olderThan time.Time) {
	s.createOps.Prune(olderThan)
}