workspace prepare: release VM mutex before guest I/O

Previously withVMLockByRef held the per-VM mutex across InspectRepo,
waitForGuestSSH, dialGuest, ImportRepoToGuest (the tar stream!), and
the readonly chmod. A large repo could block `vm stop` / `vm delete`
/ `vm restart` on the same VM for however long the import took.

Split into two phases:

  1. VM mutex held briefly to validate state (running + PID alive)
     and snapshot the fields needed for SSH (guest IP, api sock).
  2. VM mutex released. Acquire workspaceLocks[id] — a separate
     per-VM mutex scoped to workspace.prepare / workspace.export —
     for the guest I/O phase.

Lifecycle ops (stop/delete/restart/set) only take vmLocks, so they
no longer queue behind a slow import. Two concurrent prepares on the
same VM still serialise via workspaceLocks so tar streams don't
interleave. ExportVMWorkspace also acquires workspaceLocks to avoid
snapshotting a half-streamed import.

Two regression tests (sequential — they swap package-level seams):

  ReleasesVMLockDuringGuestIO: stall the import fake, assert the VM
  mutex is acquirable from another goroutine during the stall.

  SerialisesConcurrentPreparesOnSameVM: 3 concurrent prepares, assert
  Import is only ever invoked 1-at-a-time per VM.

ARCHITECTURE.md documents the split + updated lock ordering.
This commit is contained in:
Thales Maciel 2026-04-19 13:32:42 -03:00
parent 99de42385f
commit 6cd52d12f4
No known key found for this signature in database
GPG key ID: 33112E6833C34679
4 changed files with 265 additions and 22 deletions

View file

@ -10,7 +10,14 @@ primitives, and the lock ordering every caller must respect.
owning types:
- Layout, config, store, runner, logger, pid — infrastructure handles.
- `vmLocks vmLockSet` — per-VM `*sync.Mutex`, one per VM ID.
- `vmLocks vmLockSet` — per-VM `*sync.Mutex`, one per VM ID. Held only
across short, synchronous state validation and DB mutations so slow
guest I/O does not block lifecycle ops on the same VM.
- `workspaceLocks vmLockSet` — per-VM mutex scoped to
`workspace.prepare` / `workspace.export`. Serialises concurrent
workspace operations on a single VM (two simultaneous tar imports
would clobber each other) without touching `vmLocks`, so
`vm stop` / `delete` / `restart` never queue behind a slow import.
- `createVMMu sync.Mutex` — serialises `CreateVM` (guards name uniqueness
+ guest IP allocation window).
- `imageOpsMu sync.Mutex` — serialises image-registry mutations
@ -51,9 +58,14 @@ Acquire in this order, release in reverse. Never acquire in the opposite
direction.
```
vmLocks[id] → {createVMMu, imageOpsMu} → subsystem-local locks
vmLocks[id] → workspaceLocks[id] → {createVMMu, imageOpsMu} → subsystem-local locks
```
In the current code `vmLocks[id]` and `workspaceLocks[id]` are never
held at the same time: `workspace.prepare` acquires `vmLocks[id]` just
long enough to validate VM state, releases it, then acquires
`workspaceLocks[id]` for the guest I/O phase. The ordering above
records the direction that MUST be used if the two are ever nested in
the future; acquiring `vmLocks[id]` while holding `workspaceLocks[id]`
is forbidden.
Subsystem-local locks (`tapPool.mu`, `sessionRegistry.mu`,
`opstate.Registry` mu, `guestSessionController.attachMu` /
`writeMu`) are leaves. They do not contend with each other.

View file

@ -30,15 +30,21 @@ import (
)
type Daemon struct {
layout paths.Layout
config model.DaemonConfig
store *store.Store
runner system.CommandRunner
logger *slog.Logger
imageOpsMu sync.Mutex
createVMMu sync.Mutex
createOps opstate.Registry[*vmCreateOperationState]
vmLocks vmLockSet
layout paths.Layout
config model.DaemonConfig
store *store.Store
runner system.CommandRunner
logger *slog.Logger
imageOpsMu sync.Mutex
createVMMu sync.Mutex
createOps opstate.Registry[*vmCreateOperationState]
vmLocks vmLockSet
// workspaceLocks serialises workspace.prepare / workspace.export
// calls on the same VM (two concurrent prepares would clobber each
// other's tar streams). It is a SEPARATE scope from vmLocks so
// slow guest I/O — SSH dial, tar upload, chmod — does not block
// vm stop/delete/restart. See ARCHITECTURE.md.
workspaceLocks vmLockSet
sessions sessionRegistry
tapPool tapPool
closing chan struct{}

View file

@ -16,6 +16,14 @@ import (
"banger/internal/system"
)
// Test seams. Tests swap these to observe or stall the guest-I/O
// phase without needing a real git repo or SSH server. Production
// callers see the real implementations from the workspace package.
var (
workspaceInspectRepoFunc = ws.InspectRepo
workspaceImportFunc = ws.ImportRepoToGuest
)
func (d *Daemon) ExportVMWorkspace(ctx context.Context, params api.WorkspaceExportParams) (api.WorkspaceExportResult, error) {
guestPath := strings.TrimSpace(params.GuestPath)
if guestPath == "" {
@ -28,6 +36,12 @@ func (d *Daemon) ExportVMWorkspace(ctx context.Context, params api.WorkspaceExpo
if vm.State != model.VMStateRunning || !system.ProcessRunning(vm.Runtime.PID, vm.Runtime.APISockPath) {
return api.WorkspaceExportResult{}, fmt.Errorf("vm %q is not running", vm.Name)
}
// Serialise with any in-flight workspace.prepare on the same VM so
// we never snapshot a half-streamed tar. Does not block vm stop /
// delete / restart — those only take the VM mutex.
unlock := d.workspaceLocks.lock(vm.ID)
defer unlock()
client, err := d.dialGuest(ctx, net.JoinHostPort(vm.Runtime.GuestIP, "22"))
if err != nil {
return api.WorkspaceExportResult{}, fmt.Errorf("dial guest: %w", err)
@ -113,23 +127,37 @@ func (d *Daemon) PrepareVMWorkspace(ctx context.Context, params api.VMWorkspaceP
if branchName == "" && strings.TrimSpace(params.From) != "" {
return model.WorkspacePrepareResult{}, errors.New("workspace from requires branch")
}
var prepared model.WorkspacePrepareResult
_, err = d.withVMLockByRef(ctx, params.IDOrName, func(vm model.VMRecord) (model.VMRecord, error) {
// Phase 1: acquire the VM mutex ONLY long enough to verify state
// and snapshot the fields we need (IP, PID, api sock). Release it
// before any SSH or tar I/O so this slow operation cannot block
// vm stop / vm delete / vm restart on the same VM.
vm, err := d.withVMLockByRef(ctx, params.IDOrName, func(vm model.VMRecord) (model.VMRecord, error) {
if vm.State != model.VMStateRunning || !system.ProcessRunning(vm.Runtime.PID, vm.Runtime.APISockPath) {
return model.VMRecord{}, fmt.Errorf("vm %q is not running", vm.Name)
}
result, err := d.prepareVMWorkspaceLocked(ctx, vm, strings.TrimSpace(params.SourcePath), guestPath, branchName, fromRef, mode, params.ReadOnly)
if err != nil {
return model.VMRecord{}, err
}
prepared = result
return vm, nil
})
return prepared, err
if err != nil {
return model.WorkspacePrepareResult{}, err
}
// Phase 2: serialise concurrent workspace operations on THIS vm
// (so two prepares don't interleave tar streams), but do not
// block lifecycle ops. If the VM gets stopped or deleted mid-
// flight, the SSH dial or stream will fail naturally; ctx
// cancellation propagates through.
unlock := d.workspaceLocks.lock(vm.ID)
defer unlock()
return d.prepareVMWorkspaceGuestIO(ctx, vm, strings.TrimSpace(params.SourcePath), guestPath, branchName, fromRef, mode, params.ReadOnly)
}
func (d *Daemon) prepareVMWorkspaceLocked(ctx context.Context, vm model.VMRecord, sourcePath, guestPath, branchName, fromRef string, mode model.WorkspacePrepareMode, readOnly bool) (model.WorkspacePrepareResult, error) {
spec, err := ws.InspectRepo(ctx, sourcePath, branchName, fromRef)
// prepareVMWorkspaceGuestIO performs the actual guest-side work:
// inspect the local repo, dial SSH, stream the tar, optionally chmod
// readonly. It is called without holding the VM mutex.
func (d *Daemon) prepareVMWorkspaceGuestIO(ctx context.Context, vm model.VMRecord, sourcePath, guestPath, branchName, fromRef string, mode model.WorkspacePrepareMode, readOnly bool) (model.WorkspacePrepareResult, error) {
spec, err := workspaceInspectRepoFunc(ctx, sourcePath, branchName, fromRef)
if err != nil {
return model.WorkspacePrepareResult{}, err
}
@ -145,7 +173,7 @@ func (d *Daemon) prepareVMWorkspaceLocked(ctx context.Context, vm model.VMRecord
return model.WorkspacePrepareResult{}, fmt.Errorf("dial guest ssh: %w", err)
}
defer client.Close()
if err := ws.ImportRepoToGuest(ctx, client, spec, guestPath, mode); err != nil {
if err := workspaceImportFunc(ctx, client, spec, guestPath, mode); err != nil {
return model.WorkspacePrepareResult{}, err
}
if readOnly {

View file

@ -7,9 +7,12 @@ import (
"os"
"path/filepath"
"strings"
"sync/atomic"
"testing"
"time"
"banger/internal/api"
"banger/internal/daemon/workspace"
"banger/internal/model"
)
@ -356,6 +359,200 @@ func TestExportVMWorkspace_MultipleChangedFiles(t *testing.T) {
}
}
// TestPrepareVMWorkspace_ReleasesVMLockDuringGuestIO is a regression
// guard for an earlier design that held the per-VM mutex across SSH
// dial, tar streaming, and remote chmod. A long import could then
// block unrelated lifecycle ops (vm stop / delete / restart) on the
// same VM until it completed. The fix switched to a dedicated
// workspaceLocks set for I/O, with vmLocks held only for the brief
// state-validation phase. This test kicks off a prepare that blocks
// inside the import step and then asserts the VM mutex is acquirable
// while the prepare is mid-flight.
func TestPrepareVMWorkspace_ReleasesVMLockDuringGuestIO(t *testing.T) {
	// Not parallel: mutates package-level workspaceInspectRepoFunc /
	// workspaceImportFunc seams, which the other prepare-concurrency
	// test also swaps.
	ctx := context.Background()

	// A "running" VM needs a live process behind it: prepare's phase-1
	// validation checks system.ProcessRunning(PID, apiSock), so the
	// fake firecracker process supplies a real PID.
	apiSock := filepath.Join(t.TempDir(), "fc.sock")
	firecracker := startFakeFirecracker(t, apiSock)
	vm := testVM("lockbox", "image-x", "172.16.0.210")
	vm.State = model.VMStateRunning
	vm.Runtime.State = model.VMStateRunning
	vm.Runtime.PID = firecracker.Process.Pid
	vm.Runtime.APISockPath = apiSock

	// Minimal daemon wiring: store, SSH key path, and a discard
	// logger are all the prepare path touches in this test.
	d := &Daemon{
		store:  openDaemonStore(t),
		config: model.DaemonConfig{SSHKeyPath: filepath.Join(t.TempDir(), "id_ed25519")},
		logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
	}
	// Stub out guest reachability so no real SSH server is needed.
	d.guestWaitForSSH = func(_ context.Context, _, _ string, _ time.Duration) error { return nil }
	d.guestDial = func(_ context.Context, _, _ string) (guestSSHClient, error) {
		return &exportGuestClient{}, nil
	}
	upsertDaemonVM(t, ctx, d.store, vm)

	// Replace the seams. InspectRepo returns a trivial spec so the
	// real filesystem isn't touched; Import blocks until we say go.
	origInspect := workspaceInspectRepoFunc
	origImport := workspaceImportFunc
	t.Cleanup(func() {
		workspaceInspectRepoFunc = origInspect
		workspaceImportFunc = origImport
	})
	// importStarted is closed exactly once — a single prepare runs, so
	// Import is invoked at most once. releaseImport gates the stall.
	importStarted := make(chan struct{})
	releaseImport := make(chan struct{})
	workspaceInspectRepoFunc = func(context.Context, string, string, string) (workspace.RepoSpec, error) {
		return workspace.RepoSpec{RepoName: "fake", RepoRoot: "/tmp/fake"}, nil
	}
	workspaceImportFunc = func(context.Context, workspace.GuestClient, workspace.RepoSpec, string, model.WorkspacePrepareMode) error {
		close(importStarted)
		<-releaseImport
		return nil
	}

	// Kick off prepare in a goroutine. It will block inside the import.
	prepareDone := make(chan error, 1)
	go func() {
		_, err := d.PrepareVMWorkspace(ctx, api.VMWorkspacePrepareParams{
			IDOrName:   vm.Name,
			SourcePath: "/tmp/fake",
		})
		prepareDone <- err
	}()

	// Wait for prepare to reach the guest-I/O phase (past the VM
	// mutex) before testing the assertion.
	select {
	case <-importStarted:
	case <-time.After(2 * time.Second):
		t.Fatal("import never started; prepare blocked before reaching guest I/O")
	}

	// With the fix in place, the VM mutex is free even though the
	// import is in flight. Acquiring it must not wait.
	acquired := make(chan struct{})
	go func() {
		unlock := d.lockVMID(vm.ID)
		close(acquired)
		unlock()
	}()
	select {
	case <-acquired:
	case <-time.After(500 * time.Millisecond):
		// On regression the acquirer goroutine stays parked on the VM
		// mutex until prepare releases it; it may outlive the Fatal
		// below, which is harmless since the test is exiting.
		close(releaseImport) // unblock the goroutine so the test can exit
		<-prepareDone
		t.Fatal("VM mutex held during guest I/O — lifecycle ops would block behind workspace prepare")
	}

	// Now let the import finish and make sure prepare returns.
	close(releaseImport)
	select {
	case err := <-prepareDone:
		if err != nil {
			t.Fatalf("prepare returned error: %v", err)
		}
	case <-time.After(2 * time.Second):
		t.Fatal("prepare did not return after import unblocked")
	}
}
// TestPrepareVMWorkspace_SerialisesConcurrentPreparesOnSameVM asserts
// the workspaceLocks scope: two concurrent prepares on the same VM do
// NOT interleave, even though they no longer take the core VM mutex.
func TestPrepareVMWorkspace_SerialisesConcurrentPreparesOnSameVM(t *testing.T) {
	// Not parallel: see note on ReleasesVMLockDuringGuestIO.
	ctx := context.Background()

	// A "running" VM needs a live process behind it: prepare validates
	// with system.ProcessRunning, so the fake firecracker supplies a
	// real PID.
	apiSock := filepath.Join(t.TempDir(), "fc.sock")
	firecracker := startFakeFirecracker(t, apiSock)
	vm := testVM("serialbox", "image-x", "172.16.0.211")
	vm.State = model.VMStateRunning
	vm.Runtime.State = model.VMStateRunning
	vm.Runtime.PID = firecracker.Process.Pid
	vm.Runtime.APISockPath = apiSock

	// Minimal daemon wiring; guest reachability is stubbed so no real
	// SSH server is needed.
	d := &Daemon{
		store:  openDaemonStore(t),
		config: model.DaemonConfig{SSHKeyPath: filepath.Join(t.TempDir(), "id_ed25519")},
		logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
	}
	d.guestWaitForSSH = func(_ context.Context, _, _ string, _ time.Duration) error { return nil }
	d.guestDial = func(_ context.Context, _, _ string) (guestSSHClient, error) {
		return &exportGuestClient{}, nil
	}
	upsertDaemonVM(t, ctx, d.store, vm)

	origInspect := workspaceInspectRepoFunc
	origImport := workspaceImportFunc
	t.Cleanup(func() {
		workspaceInspectRepoFunc = origInspect
		workspaceImportFunc = origImport
	})
	workspaceInspectRepoFunc = func(context.Context, string, string, string) (workspace.RepoSpec, error) {
		return workspace.RepoSpec{RepoName: "fake", RepoRoot: "/tmp/fake"}, nil
	}

	// active counts in-flight Import calls; maxObserved records the
	// highest concurrency ever seen. Serialisation means both peak at 1.
	var active int32
	var maxObserved int32
	// importStarted receives one (non-blocking) signal when the FIRST
	// Import call is in flight. This replaces the previous fixed
	// 100ms sleep, which was flaky: the sleep could elapse before any
	// goroutine reached Import, observing active == 0 and failing the
	// test spuriously.
	importStarted := make(chan struct{}, 1)
	release := make(chan struct{})
	workspaceImportFunc = func(context.Context, workspace.GuestClient, workspace.RepoSpec, string, model.WorkspacePrepareMode) error {
		cur := atomic.AddInt32(&active, 1)
		// CAS loop: lift maxObserved to cur if cur is a new peak.
		for {
			prev := atomic.LoadInt32(&maxObserved)
			if cur <= prev || atomic.CompareAndSwapInt32(&maxObserved, prev, cur) {
				break
			}
		}
		select {
		case importStarted <- struct{}{}:
		default: // already signalled by an earlier call
		}
		<-release
		atomic.AddInt32(&active, -1)
		return nil
	}

	// workers (not "n") avoids shadowing: the previous version reused
	// n both here and as a local inside the Import closure.
	const workers = 3
	done := make(chan error, workers)
	for i := 0; i < workers; i++ {
		go func() {
			_, err := d.PrepareVMWorkspace(ctx, api.VMWorkspacePrepareParams{
				IDOrName:   vm.Name,
				SourcePath: "/tmp/fake",
			})
			done <- err
		}()
	}

	// Wait until an Import is genuinely in flight — deterministic,
	// unlike a sleep — before asserting on concurrency.
	select {
	case <-importStarted:
	case <-time.After(2 * time.Second):
		close(release) // unblock everything so the test can exit
		for i := 0; i < workers; i++ {
			<-done
		}
		t.Fatal("no import started; prepares never reached guest I/O")
	}
	// One import is parked on <-release and, with the workspace lock
	// serialising, the others queue before Import — so exactly one is
	// active, and that holds until we send on release.
	if got := atomic.LoadInt32(&active); got != 1 {
		close(release) // unblock to avoid hang
		for i := 0; i < workers; i++ {
			<-done
		}
		t.Fatalf("%d concurrent imports, want exactly 1 (workspace lock should serialise)", got)
	}

	// Drain: release imports one at a time.
	for i := 0; i < workers; i++ {
		release <- struct{}{}
	}
	close(release)
	for i := 0; i < workers; i++ {
		if err := <-done; err != nil {
			t.Errorf("prepare #%d error: %v", i, err)
		}
	}
	if got := atomic.LoadInt32(&maxObserved); got != 1 {
		t.Fatalf("peak concurrent imports = %d, want 1", got)
	}
}
// TestExportVMWorkspace_DoesNotMutateRealIndex is a regression guard
// for an earlier design where `git add -A` ran against the guest's
// real `.git/index`, leaving staged changes behind after what the user