Add concurrent multi-VM CLI actions

Teach the lifecycle and set commands to accept multiple VM refs, resolve them from one vm list snapshot, dedupe repeated refs, and fan out the existing single-target RPCs concurrently. Valid targets still run when other refs are ambiguous or missing, and batch output stays in first-seen order.

Refactor the daemon off the single global VM mutation lock by adding per-VM locks for start/stop/restart/delete/kill/set, touch, reconcile, stale-stop, and stats updates. That keeps same-VM operations serialized while allowing different VMs to progress in parallel, including newly created VMs once their ID exists.

Verified with go test ./... and make build.
This commit is contained in:
Thales Maciel 2026-03-18 14:04:16 -03:00
parent 2d5bcb5516
commit 4812693c1e
No known key found for this signature in database
GPG key ID: 33112E6833C34679
5 changed files with 542 additions and 118 deletions

View file

@ -32,6 +32,8 @@ type Daemon struct {
runner system.CommandRunner
logger *slog.Logger
mu sync.Mutex
vmLocksMu sync.Mutex
vmLocks map[string]*sync.Mutex
closing chan struct{}
once sync.Once
pid int
@ -488,26 +490,22 @@ func (d *Daemon) reconcile(ctx context.Context) error {
return op.fail(err)
}
for _, vm := range vms {
if vm.State != model.VMStateRunning {
continue
}
if system.ProcessRunning(vm.Runtime.PID, vm.Runtime.APISockPath) {
continue
}
op.stage("stale_vm", vmLogAttrs(vm)...)
_ = d.cleanupRuntime(ctx, vm, true)
vm.State = model.VMStateStopped
vm.Runtime.State = model.VMStateStopped
vm.Runtime.PID = 0
vm.Runtime.TapDevice = ""
vm.Runtime.APISockPath = ""
vm.Runtime.BaseLoop = ""
vm.Runtime.COWLoop = ""
vm.Runtime.DMName = ""
vm.Runtime.DMDev = ""
vm.UpdatedAt = model.Now()
if err := d.store.UpsertVM(ctx, vm); err != nil {
return op.fail(err, vmLogAttrs(vm)...)
if err := d.withVMLockByIDErr(ctx, vm.ID, func(vm model.VMRecord) error {
if vm.State != model.VMStateRunning {
return nil
}
if system.ProcessRunning(vm.Runtime.PID, vm.Runtime.APISockPath) {
return nil
}
op.stage("stale_vm", vmLogAttrs(vm)...)
_ = d.cleanupRuntime(ctx, vm, true)
vm.State = model.VMStateStopped
vm.Runtime.State = model.VMStateStopped
clearRuntimeHandles(&vm)
vm.UpdatedAt = model.Now()
return d.store.UpsertVM(ctx, vm)
}); err != nil {
return op.fail(err, "vm_id", vm.ID)
}
}
if err := d.rebuildDNS(ctx); err != nil {
@ -577,17 +575,64 @@ func (d *Daemon) FindImage(ctx context.Context, idOrName string) (model.Image, e
}
func (d *Daemon) TouchVM(ctx context.Context, idOrName string) (model.VMRecord, error) {
d.mu.Lock()
defer d.mu.Unlock()
return d.withVMLockByRef(ctx, idOrName, func(vm model.VMRecord) (model.VMRecord, error) {
system.TouchNow(&vm)
if err := d.store.UpsertVM(ctx, vm); err != nil {
return model.VMRecord{}, err
}
return vm, nil
})
}
func (d *Daemon) withVMLockByRef(ctx context.Context, idOrName string, fn func(model.VMRecord) (model.VMRecord, error)) (model.VMRecord, error) {
vm, err := d.FindVM(ctx, idOrName)
if err != nil {
return model.VMRecord{}, err
}
system.TouchNow(&vm)
if err := d.store.UpsertVM(ctx, vm); err != nil {
return d.withVMLockByID(ctx, vm.ID, fn)
}
func (d *Daemon) withVMLockByID(ctx context.Context, id string, fn func(model.VMRecord) (model.VMRecord, error)) (model.VMRecord, error) {
if strings.TrimSpace(id) == "" {
return model.VMRecord{}, errors.New("vm id is required")
}
unlock := d.lockVMID(id)
defer unlock()
vm, err := d.store.GetVMByID(ctx, id)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return model.VMRecord{}, fmt.Errorf("vm %q not found", id)
}
return model.VMRecord{}, err
}
return vm, nil
return fn(vm)
}
func (d *Daemon) withVMLockByIDErr(ctx context.Context, id string, fn func(model.VMRecord) error) error {
_, err := d.withVMLockByID(ctx, id, func(vm model.VMRecord) (model.VMRecord, error) {
if err := fn(vm); err != nil {
return model.VMRecord{}, err
}
return vm, nil
})
return err
}
func (d *Daemon) lockVMID(id string) func() {
d.vmLocksMu.Lock()
if d.vmLocks == nil {
d.vmLocks = make(map[string]*sync.Mutex)
}
lock, ok := d.vmLocks[id]
if !ok {
lock = &sync.Mutex{}
d.vmLocks[id] = lock
}
d.vmLocksMu.Unlock()
lock.Lock()
return lock.Unlock
}
func marshalResultOrError(v any, err error) rpc.Response {

View file

@ -64,6 +64,8 @@ func (d *Daemon) CreateVM(ctx context.Context, params api.VMCreateParams) (vm mo
if err != nil {
return model.VMRecord{}, err
}
unlockVM := d.lockVMID(id)
defer unlockVM()
guestIP, err := d.store.NextGuestIP(ctx, bridgePrefix(d.config.BridgeIP))
if err != nil {
return model.VMRecord{}, err
@ -130,23 +132,19 @@ func (d *Daemon) CreateVM(ctx context.Context, params api.VMCreateParams) (vm mo
}
func (d *Daemon) StartVM(ctx context.Context, idOrName string) (model.VMRecord, error) {
d.mu.Lock()
defer d.mu.Unlock()
vm, err := d.FindVM(ctx, idOrName)
if err != nil {
return model.VMRecord{}, err
}
image, err := d.store.GetImageByID(ctx, vm.ImageID)
if err != nil {
return model.VMRecord{}, err
}
if vm.State == model.VMStateRunning && system.ProcessRunning(vm.Runtime.PID, vm.Runtime.APISockPath) {
if d.logger != nil {
d.logger.Info("vm already running", vmLogAttrs(vm)...)
return d.withVMLockByRef(ctx, idOrName, func(vm model.VMRecord) (model.VMRecord, error) {
image, err := d.store.GetImageByID(ctx, vm.ImageID)
if err != nil {
return model.VMRecord{}, err
}
return vm, nil
}
return d.startVMLocked(ctx, vm, image)
if vm.State == model.VMStateRunning && system.ProcessRunning(vm.Runtime.PID, vm.Runtime.APISockPath) {
if d.logger != nil {
d.logger.Info("vm already running", vmLogAttrs(vm)...)
}
return vm, nil
}
return d.startVMLocked(ctx, vm, image)
})
}
func (d *Daemon) startVMLocked(ctx context.Context, vm model.VMRecord, image model.Image) (_ model.VMRecord, err error) {
@ -292,10 +290,15 @@ func (d *Daemon) startVMLocked(ctx context.Context, vm model.VMRecord, image mod
return vm, nil
}
func (d *Daemon) StopVM(ctx context.Context, idOrName string) (vm model.VMRecord, err error) {
d.mu.Lock()
defer d.mu.Unlock()
op := d.beginOperation("vm.stop", "vm_ref", idOrName)
func (d *Daemon) StopVM(ctx context.Context, idOrName string) (model.VMRecord, error) {
return d.withVMLockByRef(ctx, idOrName, func(vm model.VMRecord) (model.VMRecord, error) {
return d.stopVMLocked(ctx, vm)
})
}
func (d *Daemon) stopVMLocked(ctx context.Context, current model.VMRecord) (vm model.VMRecord, err error) {
vm = current
op := d.beginOperation("vm.stop", "vm_ref", vm.ID)
defer func() {
if err != nil {
op.fail(err, vmLogAttrs(vm)...)
@ -303,10 +306,6 @@ func (d *Daemon) StopVM(ctx context.Context, idOrName string) (vm model.VMRecord
}
op.done(vmLogAttrs(vm)...)
}()
vm, err = d.FindVM(ctx, idOrName)
if err != nil {
return model.VMRecord{}, err
}
if vm.State != model.VMStateRunning || !system.ProcessRunning(vm.Runtime.PID, vm.Runtime.APISockPath) {
op.stage("cleanup_stale_runtime")
if err := d.cleanupRuntime(ctx, vm, true); err != nil {
@ -345,10 +344,15 @@ func (d *Daemon) StopVM(ctx context.Context, idOrName string) (vm model.VMRecord
return vm, nil
}
func (d *Daemon) KillVM(ctx context.Context, params api.VMKillParams) (vm model.VMRecord, err error) {
d.mu.Lock()
defer d.mu.Unlock()
op := d.beginOperation("vm.kill", "vm_ref", params.IDOrName, "signal", params.Signal)
func (d *Daemon) KillVM(ctx context.Context, params api.VMKillParams) (model.VMRecord, error) {
return d.withVMLockByRef(ctx, params.IDOrName, func(vm model.VMRecord) (model.VMRecord, error) {
return d.killVMLocked(ctx, vm, params.Signal)
})
}
func (d *Daemon) killVMLocked(ctx context.Context, current model.VMRecord, signalValue string) (vm model.VMRecord, err error) {
vm = current
op := d.beginOperation("vm.kill", "vm_ref", vm.ID, "signal", signalValue)
defer func() {
if err != nil {
op.fail(err, vmLogAttrs(vm)...)
@ -356,11 +360,6 @@ func (d *Daemon) KillVM(ctx context.Context, params api.VMKillParams) (vm model.
}
op.done(vmLogAttrs(vm)...)
}()
vm, err = d.FindVM(ctx, params.IDOrName)
if err != nil {
return model.VMRecord{}, err
}
if vm.State != model.VMStateRunning || !system.ProcessRunning(vm.Runtime.PID, vm.Runtime.APISockPath) {
op.stage("cleanup_stale_runtime")
if err := d.cleanupRuntime(ctx, vm, true); err != nil {
@ -375,7 +374,7 @@ func (d *Daemon) KillVM(ctx context.Context, params api.VMKillParams) (vm model.
return vm, nil
}
signal := strings.TrimSpace(params.Signal)
signal := strings.TrimSpace(signalValue)
if signal == "" {
signal = "TERM"
}
@ -413,19 +412,34 @@ func (d *Daemon) RestartVM(ctx context.Context, idOrName string) (vm model.VMRec
}
op.done(vmLogAttrs(vm)...)
}()
op.stage("stop")
vm, err = d.StopVM(ctx, idOrName)
resolved, err := d.FindVM(ctx, idOrName)
if err != nil {
return model.VMRecord{}, err
}
op.stage("start", vmLogAttrs(vm)...)
return d.StartVM(ctx, vm.ID)
return d.withVMLockByID(ctx, resolved.ID, func(vm model.VMRecord) (model.VMRecord, error) {
op.stage("stop")
vm, err = d.stopVMLocked(ctx, vm)
if err != nil {
return model.VMRecord{}, err
}
image, err := d.store.GetImageByID(ctx, vm.ImageID)
if err != nil {
return model.VMRecord{}, err
}
op.stage("start", vmLogAttrs(vm)...)
return d.startVMLocked(ctx, vm, image)
})
}
func (d *Daemon) DeleteVM(ctx context.Context, idOrName string) (vm model.VMRecord, err error) {
d.mu.Lock()
defer d.mu.Unlock()
op := d.beginOperation("vm.delete", "vm_ref", idOrName)
func (d *Daemon) DeleteVM(ctx context.Context, idOrName string) (model.VMRecord, error) {
return d.withVMLockByRef(ctx, idOrName, func(vm model.VMRecord) (model.VMRecord, error) {
return d.deleteVMLocked(ctx, vm)
})
}
func (d *Daemon) deleteVMLocked(ctx context.Context, current model.VMRecord) (vm model.VMRecord, err error) {
vm = current
op := d.beginOperation("vm.delete", "vm_ref", vm.ID)
defer func() {
if err != nil {
op.fail(err, vmLogAttrs(vm)...)
@ -433,10 +447,6 @@ func (d *Daemon) DeleteVM(ctx context.Context, idOrName string) (vm model.VMReco
}
op.done(vmLogAttrs(vm)...)
}()
vm, err = d.FindVM(ctx, idOrName)
if err != nil {
return model.VMRecord{}, err
}
if vm.State == model.VMStateRunning && system.ProcessRunning(vm.Runtime.PID, vm.Runtime.APISockPath) {
op.stage("kill_running_vm", "pid", vm.Runtime.PID)
_ = d.killVMProcess(ctx, vm.Runtime.PID)
@ -464,10 +474,15 @@ func (d *Daemon) DeleteVM(ctx context.Context, idOrName string) (vm model.VMReco
return vm, nil
}
func (d *Daemon) SetVM(ctx context.Context, params api.VMSetParams) (vm model.VMRecord, err error) {
d.mu.Lock()
defer d.mu.Unlock()
op := d.beginOperation("vm.set", "vm_ref", params.IDOrName)
func (d *Daemon) SetVM(ctx context.Context, params api.VMSetParams) (model.VMRecord, error) {
return d.withVMLockByRef(ctx, params.IDOrName, func(vm model.VMRecord) (model.VMRecord, error) {
return d.setVMLocked(ctx, vm, params)
})
}
func (d *Daemon) setVMLocked(ctx context.Context, current model.VMRecord, params api.VMSetParams) (vm model.VMRecord, err error) {
vm = current
op := d.beginOperation("vm.set", "vm_ref", vm.ID)
defer func() {
if err != nil {
op.fail(err, vmLogAttrs(vm)...)
@ -475,10 +490,6 @@ func (d *Daemon) SetVM(ctx context.Context, params api.VMSetParams) (vm model.VM
}
op.done(vmLogAttrs(vm)...)
}()
vm, err = d.FindVM(ctx, params.IDOrName)
if err != nil {
return model.VMRecord{}, err
}
running := vm.State == model.VMStateRunning && system.ProcessRunning(vm.Runtime.PID, vm.Runtime.APISockPath)
if params.VCPUCount != nil {
if err := validateOptionalPositiveSetting("vcpu", params.VCPUCount); err != nil {
@ -541,12 +552,16 @@ func (d *Daemon) SetVM(ctx context.Context, params api.VMSetParams) (vm model.VM
}
func (d *Daemon) GetVMStats(ctx context.Context, idOrName string) (model.VMRecord, model.VMStats, error) {
d.mu.Lock()
defer d.mu.Unlock()
vm, err := d.FindVM(ctx, idOrName)
vm, err := d.withVMLockByRef(ctx, idOrName, func(vm model.VMRecord) (model.VMRecord, error) {
return d.getVMStatsLocked(ctx, vm)
})
if err != nil {
return model.VMRecord{}, model.VMStats{}, err
}
return vm, vm.Stats, nil
}
func (d *Daemon) getVMStatsLocked(ctx context.Context, vm model.VMRecord) (model.VMRecord, error) {
stats, err := d.collectStats(ctx, vm)
if err == nil {
vm.Stats = stats
@ -556,30 +571,32 @@ func (d *Daemon) GetVMStats(ctx context.Context, idOrName string) (model.VMRecor
d.logger.Debug("vm stats collected", append(vmLogAttrs(vm), "rss_bytes", stats.RSSBytes, "vsz_bytes", stats.VSZBytes, "cpu_percent", stats.CPUPercent)...)
}
}
return vm, vm.Stats, nil
return vm, nil
}
func (d *Daemon) pollStats(ctx context.Context) error {
d.mu.Lock()
defer d.mu.Unlock()
vms, err := d.store.ListVMs(ctx)
if err != nil {
return err
}
for _, vm := range vms {
if vm.State != model.VMStateRunning || !system.ProcessRunning(vm.Runtime.PID, vm.Runtime.APISockPath) {
continue
}
stats, err := d.collectStats(ctx, vm)
if err != nil {
if d.logger != nil {
d.logger.Debug("vm stats collection failed", append(vmLogAttrs(vm), "error", err.Error())...)
if err := d.withVMLockByIDErr(ctx, vm.ID, func(vm model.VMRecord) error {
if vm.State != model.VMStateRunning || !system.ProcessRunning(vm.Runtime.PID, vm.Runtime.APISockPath) {
return nil
}
continue
stats, err := d.collectStats(ctx, vm)
if err != nil {
if d.logger != nil {
d.logger.Debug("vm stats collection failed", append(vmLogAttrs(vm), "error", err.Error())...)
}
return nil
}
vm.Stats = stats
vm.UpdatedAt = model.Now()
return d.store.UpsertVM(ctx, vm)
}); err != nil {
return err
}
vm.Stats = stats
vm.UpdatedAt = model.Now()
_ = d.store.UpsertVM(ctx, vm)
}
return nil
}
@ -596,29 +613,31 @@ func (d *Daemon) stopStaleVMs(ctx context.Context) (err error) {
}
op.done()
}()
d.mu.Lock()
defer d.mu.Unlock()
vms, err := d.store.ListVMs(ctx)
if err != nil {
return err
}
now := model.Now()
for _, vm := range vms {
if vm.State != model.VMStateRunning || !system.ProcessRunning(vm.Runtime.PID, vm.Runtime.APISockPath) {
continue
if err := d.withVMLockByIDErr(ctx, vm.ID, func(vm model.VMRecord) error {
if vm.State != model.VMStateRunning || !system.ProcessRunning(vm.Runtime.PID, vm.Runtime.APISockPath) {
return nil
}
if now.Sub(vm.LastTouchedAt) < d.config.AutoStopStaleAfter {
return nil
}
op.stage("stopping_vm", vmLogAttrs(vm)...)
_ = d.sendCtrlAltDel(ctx, vm)
_ = d.waitForExit(ctx, vm.Runtime.PID, vm.Runtime.APISockPath, 10*time.Second)
_ = d.cleanupRuntime(ctx, vm, true)
vm.State = model.VMStateStopped
vm.Runtime.State = model.VMStateStopped
clearRuntimeHandles(&vm)
vm.UpdatedAt = model.Now()
return d.store.UpsertVM(ctx, vm)
}); err != nil {
return err
}
if now.Sub(vm.LastTouchedAt) < d.config.AutoStopStaleAfter {
continue
}
op.stage("stopping_vm", vmLogAttrs(vm)...)
_ = d.sendCtrlAltDel(ctx, vm)
_ = d.waitForExit(ctx, vm.Runtime.PID, vm.Runtime.APISockPath, 10*time.Second)
_ = d.cleanupRuntime(ctx, vm, true)
vm.State = model.VMStateStopped
vm.Runtime.State = model.VMStateStopped
clearRuntimeHandles(&vm)
vm.UpdatedAt = model.Now()
_ = d.store.UpsertVM(ctx, vm)
}
return nil
}

View file

@ -538,6 +538,108 @@ func TestStopVMFallsBackToForcedCleanupAfterGracefulTimeout(t *testing.T) {
}
}
func TestWithVMLockByIDSerializesSameVM(t *testing.T) {
ctx := context.Background()
db := openDaemonStore(t)
vm := testVM("serial", "image-serial", "172.16.0.30")
if err := db.UpsertVM(ctx, vm); err != nil {
t.Fatalf("UpsertVM: %v", err)
}
d := &Daemon{store: db}
firstEntered := make(chan struct{})
releaseFirst := make(chan struct{})
secondEntered := make(chan struct{})
errCh := make(chan error, 2)
go func() {
_, err := d.withVMLockByID(ctx, vm.ID, func(vm model.VMRecord) (model.VMRecord, error) {
close(firstEntered)
<-releaseFirst
return vm, nil
})
errCh <- err
}()
select {
case <-firstEntered:
case <-time.After(500 * time.Millisecond):
t.Fatal("first lock holder did not enter")
}
go func() {
_, err := d.withVMLockByID(ctx, vm.ID, func(vm model.VMRecord) (model.VMRecord, error) {
close(secondEntered)
return vm, nil
})
errCh <- err
}()
select {
case <-secondEntered:
t.Fatal("second same-vm lock holder entered before release")
case <-time.After(150 * time.Millisecond):
}
close(releaseFirst)
select {
case <-secondEntered:
case <-time.After(500 * time.Millisecond):
t.Fatal("second same-vm lock holder never entered")
}
for i := 0; i < 2; i++ {
if err := <-errCh; err != nil {
t.Fatalf("withVMLockByID returned error: %v", err)
}
}
}
func TestWithVMLockByIDAllowsDifferentVMsConcurrently(t *testing.T) {
ctx := context.Background()
db := openDaemonStore(t)
vmA := testVM("alpha-lock", "image-alpha", "172.16.0.31")
vmB := testVM("bravo-lock", "image-bravo", "172.16.0.32")
for _, vm := range []model.VMRecord{vmA, vmB} {
if err := db.UpsertVM(ctx, vm); err != nil {
t.Fatalf("UpsertVM(%s): %v", vm.Name, err)
}
}
d := &Daemon{store: db}
started := make(chan string, 2)
release := make(chan struct{})
errCh := make(chan error, 2)
run := func(id string) {
_, err := d.withVMLockByID(ctx, id, func(vm model.VMRecord) (model.VMRecord, error) {
started <- vm.ID
<-release
return vm, nil
})
errCh <- err
}
go run(vmA.ID)
go run(vmB.ID)
for i := 0; i < 2; i++ {
select {
case <-started:
case <-time.After(500 * time.Millisecond):
t.Fatal("different VM locks did not overlap")
}
}
close(release)
for i := 0; i < 2; i++ {
if err := <-errCh; err != nil {
t.Fatalf("withVMLockByID returned error: %v", err)
}
}
}
func openDaemonStore(t *testing.T) *store.Store {
t.Helper()
db, err := store.Open(filepath.Join(t.TempDir(), "state.db"))