From 6cd52d12f4f97cfdd6941ca27501873df6ab2625 Mon Sep 17 00:00:00 2001 From: Thales Maciel Date: Sun, 19 Apr 2026 13:32:42 -0300 Subject: [PATCH] workspace prepare: release VM mutex before guest I/O MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously withVMLockByRef held the per-VM mutex across InspectRepo, waitForGuestSSH, dialGuest, ImportRepoToGuest (the tar stream!), and the readonly chmod. A large repo could block `vm stop` / `vm delete` / `vm restart` on the same VM for however long the import took. Split into two phases: 1. VM mutex held briefly to validate state (running + PID alive) and snapshot the fields needed for SSH (guest IP, api sock). 2. VM mutex released. Acquire workspaceLocks[id] — a separate per-VM mutex scoped to workspace.prepare / workspace.export — for the guest I/O phase. Lifecycle ops (stop/delete/restart/set) only take vmLocks, so they no longer queue behind a slow import. Two concurrent prepares on the same VM still serialise via workspaceLocks so tar streams don't interleave. ExportVMWorkspace also acquires workspaceLocks to avoid snapshotting a half-streamed import. Two regression tests (sequential — they swap package-level seams): ReleasesVMLockDuringGuestIO: stall the import fake, assert the VM mutex is acquirable from another goroutine during the stall. SerialisesConcurrentPreparesOnSameVM: 3 concurrent prepares, assert Import is only ever invoked 1-at-a-time per VM. ARCHITECTURE.md documents the split + updated lock ordering. --- internal/daemon/ARCHITECTURE.md | 16 ++- internal/daemon/daemon.go | 24 ++-- internal/daemon/workspace.go | 50 ++++++-- internal/daemon/workspace_test.go | 197 ++++++++++++++++++++++++++++++ 4 files changed, 265 insertions(+), 22 deletions(-) diff --git a/internal/daemon/ARCHITECTURE.md b/internal/daemon/ARCHITECTURE.md index eed47dd..c8674e7 100644 --- a/internal/daemon/ARCHITECTURE.md +++ b/internal/daemon/ARCHITECTURE.md @@ -10,7 +10,14 @@ primitives, and the lock ordering every caller must respect. owning types: - Layout, config, store, runner, logger, pid — infrastructure handles. -- `vmLocks vmLockSet` — per-VM `*sync.Mutex`, one per VM ID. +- `vmLocks vmLockSet` — per-VM `*sync.Mutex`, one per VM ID. Held only + across short, synchronous state validation and DB mutations so slow + guest I/O does not block lifecycle ops on the same VM. +- `workspaceLocks vmLockSet` — per-VM mutex scoped to + `workspace.prepare` / `workspace.export`. Serialises concurrent + workspace operations on a single VM (two simultaneous tar imports + would clobber each other) without touching `vmLocks`, so + `vm stop` / `delete` / `restart` never queue behind a slow import. - `createVMMu sync.Mutex` — serialises `CreateVM` (guards name uniqueness + guest IP allocation window). - `imageOpsMu sync.Mutex` — serialises image-registry mutations @@ -51,9 +58,14 @@ Acquire in this order, release in reverse. Never acquire in the opposite direction. ``` -vmLocks[id] → {createVMMu, imageOpsMu} → subsystem-local locks +vmLocks[id] → workspaceLocks[id] → {createVMMu, imageOpsMu} → subsystem-local locks ``` +`vmLocks[id]` and `workspaceLocks[id]` are NEVER held at the same +time. `workspace.prepare` acquires `vmLocks[id]` just long enough to +validate VM state, releases it, then acquires `workspaceLocks[id]` +for the guest I/O phase. + Subsystem-local locks (`tapPool.mu`, `sessionRegistry.mu`, `opstate.Registry` mu, `guestSessionController.attachMu` / `writeMu`) are leaves. They do not contend with each other. 
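Both lock sets are fields of type `vmLockSet`, whose implementation is not touched by this patch. For readers following the locking changes, a minimal sketch of the shape the diffs below rely on, where `lock(id)` lazily allocates one mutex per VM ID and hands back the matching unlock func, might look like this (an assumption for illustration only: the real `vmLockSet` lives elsewhere in `internal/daemon`, and the VM ID type may not be a plain string):

```go
// Hypothetical sketch only: the real vmLockSet is defined elsewhere in
// internal/daemon and may differ. It illustrates the lock(id) shape the
// patch relies on for both vmLocks and workspaceLocks, assuming VM IDs
// are strings.
package daemon

import "sync"

type vmLockSet struct {
	mu    sync.Mutex
	locks map[string]*sync.Mutex
}

// lock acquires the per-VM mutex for id, creating it on first use, and
// returns the matching unlock func for the caller to defer.
func (s *vmLockSet) lock(id string) func() {
	s.mu.Lock()
	if s.locks == nil {
		s.locks = make(map[string]*sync.Mutex)
	}
	m, ok := s.locks[id]
	if !ok {
		m = &sync.Mutex{}
		s.locks[id] = m
	}
	s.mu.Unlock()

	m.Lock()
	return m.Unlock
}
```

Under a shape like this, phase 2 of `PrepareVMWorkspace` reduces to `unlock := d.workspaceLocks.lock(vm.ID); defer unlock()`, as seen in the workspace.go diff below.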
diff --git a/internal/daemon/daemon.go b/internal/daemon/daemon.go index c39fdae..0c252e5 100644 --- a/internal/daemon/daemon.go +++ b/internal/daemon/daemon.go @@ -30,15 +30,21 @@ import ( ) type Daemon struct { - layout paths.Layout - config model.DaemonConfig - store *store.Store - runner system.CommandRunner - logger *slog.Logger - imageOpsMu sync.Mutex - createVMMu sync.Mutex - createOps opstate.Registry[*vmCreateOperationState] - vmLocks vmLockSet + layout paths.Layout + config model.DaemonConfig + store *store.Store + runner system.CommandRunner + logger *slog.Logger + imageOpsMu sync.Mutex + createVMMu sync.Mutex + createOps opstate.Registry[*vmCreateOperationState] + vmLocks vmLockSet + // workspaceLocks serialises workspace.prepare / workspace.export + // calls on the same VM (two concurrent prepares would clobber each + // other's tar streams). It is a SEPARATE scope from vmLocks so + // slow guest I/O — SSH dial, tar upload, chmod — does not block + // vm stop/delete/restart. See ARCHITECTURE.md. + workspaceLocks vmLockSet sessions sessionRegistry tapPool tapPool closing chan struct{} diff --git a/internal/daemon/workspace.go b/internal/daemon/workspace.go index 58bae96..f94085b 100644 --- a/internal/daemon/workspace.go +++ b/internal/daemon/workspace.go @@ -16,6 +16,14 @@ import ( "banger/internal/system" ) +// Test seams. Tests swap these to observe or stall the guest-I/O +// phase without needing a real git repo or SSH server. Production +// callers see the real implementations from the workspace package. +var ( + workspaceInspectRepoFunc = ws.InspectRepo + workspaceImportFunc = ws.ImportRepoToGuest +) + func (d *Daemon) ExportVMWorkspace(ctx context.Context, params api.WorkspaceExportParams) (api.WorkspaceExportResult, error) { guestPath := strings.TrimSpace(params.GuestPath) if guestPath == "" { @@ -28,6 +36,12 @@ func (d *Daemon) ExportVMWorkspace(ctx context.Context, params api.WorkspaceExpo if vm.State != model.VMStateRunning || !system.ProcessRunning(vm.Runtime.PID, vm.Runtime.APISockPath) { return api.WorkspaceExportResult{}, fmt.Errorf("vm %q is not running", vm.Name) } + // Serialise with any in-flight workspace.prepare on the same VM so + // we never snapshot a half-streamed tar. Does not block vm stop / + // delete / restart — those only take the VM mutex. + unlock := d.workspaceLocks.lock(vm.ID) + defer unlock() + client, err := d.dialGuest(ctx, net.JoinHostPort(vm.Runtime.GuestIP, "22")) if err != nil { return api.WorkspaceExportResult{}, fmt.Errorf("dial guest: %w", err) @@ -113,23 +127,37 @@ func (d *Daemon) PrepareVMWorkspace(ctx context.Context, params api.VMWorkspaceP if branchName == "" && strings.TrimSpace(params.From) != "" { return model.WorkspacePrepareResult{}, errors.New("workspace from requires branch") } - var prepared model.WorkspacePrepareResult - _, err = d.withVMLockByRef(ctx, params.IDOrName, func(vm model.VMRecord) (model.VMRecord, error) { + + // Phase 1: acquire the VM mutex ONLY long enough to verify state + // and snapshot the fields we need (IP, PID, api sock). Release it + // before any SSH or tar I/O so this slow operation cannot block + // vm stop / vm delete / vm restart on the same VM. 
+ vm, err := d.withVMLockByRef(ctx, params.IDOrName, func(vm model.VMRecord) (model.VMRecord, error) { if vm.State != model.VMStateRunning || !system.ProcessRunning(vm.Runtime.PID, vm.Runtime.APISockPath) { return model.VMRecord{}, fmt.Errorf("vm %q is not running", vm.Name) } - result, err := d.prepareVMWorkspaceLocked(ctx, vm, strings.TrimSpace(params.SourcePath), guestPath, branchName, fromRef, mode, params.ReadOnly) - if err != nil { - return model.VMRecord{}, err - } - prepared = result return vm, nil }) - return prepared, err + if err != nil { + return model.WorkspacePrepareResult{}, err + } + + // Phase 2: serialise concurrent workspace operations on THIS vm + // (so two prepares don't interleave tar streams), but do not + // block lifecycle ops. If the VM gets stopped or deleted mid- + // flight, the SSH dial or stream will fail naturally; ctx + // cancellation propagates through. + unlock := d.workspaceLocks.lock(vm.ID) + defer unlock() + + return d.prepareVMWorkspaceGuestIO(ctx, vm, strings.TrimSpace(params.SourcePath), guestPath, branchName, fromRef, mode, params.ReadOnly) } -func (d *Daemon) prepareVMWorkspaceLocked(ctx context.Context, vm model.VMRecord, sourcePath, guestPath, branchName, fromRef string, mode model.WorkspacePrepareMode, readOnly bool) (model.WorkspacePrepareResult, error) { - spec, err := ws.InspectRepo(ctx, sourcePath, branchName, fromRef) +// prepareVMWorkspaceGuestIO performs the actual guest-side work: +// inspect the local repo, dial SSH, stream the tar, optionally chmod +// readonly. It is called without holding the VM mutex. +func (d *Daemon) prepareVMWorkspaceGuestIO(ctx context.Context, vm model.VMRecord, sourcePath, guestPath, branchName, fromRef string, mode model.WorkspacePrepareMode, readOnly bool) (model.WorkspacePrepareResult, error) { + spec, err := workspaceInspectRepoFunc(ctx, sourcePath, branchName, fromRef) if err != nil { return model.WorkspacePrepareResult{}, err } @@ -145,7 +173,7 @@ func (d *Daemon) prepareVMWorkspaceLocked(ctx context.Context, vm model.VMRecord return model.WorkspacePrepareResult{}, fmt.Errorf("dial guest ssh: %w", err) } defer client.Close() - if err := ws.ImportRepoToGuest(ctx, client, spec, guestPath, mode); err != nil { + if err := workspaceImportFunc(ctx, client, spec, guestPath, mode); err != nil { return model.WorkspacePrepareResult{}, err } if readOnly { diff --git a/internal/daemon/workspace_test.go b/internal/daemon/workspace_test.go index 8e26eaf..49f7ff4 100644 --- a/internal/daemon/workspace_test.go +++ b/internal/daemon/workspace_test.go @@ -7,9 +7,12 @@ import ( "os" "path/filepath" "strings" + "sync/atomic" "testing" + "time" "banger/internal/api" + "banger/internal/daemon/workspace" "banger/internal/model" ) @@ -356,6 +359,200 @@ func TestExportVMWorkspace_MultipleChangedFiles(t *testing.T) { } } +// TestPrepareVMWorkspace_ReleasesVMLockDuringGuestIO is a regression +// guard for an earlier design that held the per-VM mutex across SSH +// dial, tar streaming, and remote chmod. A long import could then +// block unrelated lifecycle ops (vm stop / delete / restart) on the +// same VM until it completed. The fix switched to a dedicated +// workspaceLocks set for I/O, with vmLocks held only for the brief +// state-validation phase. This test kicks off a prepare that blocks +// inside the import step and then asserts the VM mutex is acquirable +// while the prepare is mid-flight. 
+func TestPrepareVMWorkspace_ReleasesVMLockDuringGuestIO(t *testing.T) { + // Not parallel: mutates package-level workspaceInspectRepoFunc / + // workspaceImportFunc seams, which the other prepare-concurrency + // test also swaps. + ctx := context.Background() + + apiSock := filepath.Join(t.TempDir(), "fc.sock") + firecracker := startFakeFirecracker(t, apiSock) + + vm := testVM("lockbox", "image-x", "172.16.0.210") + vm.State = model.VMStateRunning + vm.Runtime.State = model.VMStateRunning + vm.Runtime.PID = firecracker.Process.Pid + vm.Runtime.APISockPath = apiSock + + d := &Daemon{ + store: openDaemonStore(t), + config: model.DaemonConfig{SSHKeyPath: filepath.Join(t.TempDir(), "id_ed25519")}, + logger: slog.New(slog.NewTextHandler(io.Discard, nil)), + } + d.guestWaitForSSH = func(_ context.Context, _, _ string, _ time.Duration) error { return nil } + d.guestDial = func(_ context.Context, _, _ string) (guestSSHClient, error) { + return &exportGuestClient{}, nil + } + upsertDaemonVM(t, ctx, d.store, vm) + + // Replace the seams. InspectRepo returns a trivial spec so the + // real filesystem isn't touched; Import blocks until we say go. + origInspect := workspaceInspectRepoFunc + origImport := workspaceImportFunc + t.Cleanup(func() { + workspaceInspectRepoFunc = origInspect + workspaceImportFunc = origImport + }) + + importStarted := make(chan struct{}) + releaseImport := make(chan struct{}) + workspaceInspectRepoFunc = func(context.Context, string, string, string) (workspace.RepoSpec, error) { + return workspace.RepoSpec{RepoName: "fake", RepoRoot: "/tmp/fake"}, nil + } + workspaceImportFunc = func(context.Context, workspace.GuestClient, workspace.RepoSpec, string, model.WorkspacePrepareMode) error { + close(importStarted) + <-releaseImport + return nil + } + + // Kick off prepare in a goroutine. It will block inside the import. + prepareDone := make(chan error, 1) + go func() { + _, err := d.PrepareVMWorkspace(ctx, api.VMWorkspacePrepareParams{ + IDOrName: vm.Name, + SourcePath: "/tmp/fake", + }) + prepareDone <- err + }() + + // Wait for prepare to reach the guest-I/O phase (past the VM + // mutex) before testing the assertion. + select { + case <-importStarted: + case <-time.After(2 * time.Second): + t.Fatal("import never started; prepare blocked before reaching guest I/O") + } + + // With the fix in place, the VM mutex is free even though the + // import is in flight. Acquiring it must not wait. + acquired := make(chan struct{}) + go func() { + unlock := d.lockVMID(vm.ID) + close(acquired) + unlock() + }() + select { + case <-acquired: + case <-time.After(500 * time.Millisecond): + close(releaseImport) // unblock the goroutine so the test can exit + <-prepareDone + t.Fatal("VM mutex held during guest I/O — lifecycle ops would block behind workspace prepare") + } + + // Now let the import finish and make sure prepare returns. + close(releaseImport) + select { + case err := <-prepareDone: + if err != nil { + t.Fatalf("prepare returned error: %v", err) + } + case <-time.After(2 * time.Second): + t.Fatal("prepare did not return after import unblocked") + } +} + +// TestPrepareVMWorkspace_SerialisesConcurrentPreparesOnSameVM asserts +// the workspaceLocks scope: two concurrent prepares on the same VM do +// NOT interleave, even though they no longer take the core VM mutex. +func TestPrepareVMWorkspace_SerialisesConcurrentPreparesOnSameVM(t *testing.T) { + // Not parallel: see note on ReleasesVMLockDuringGuestIO. 
+ ctx := context.Background() + + apiSock := filepath.Join(t.TempDir(), "fc.sock") + firecracker := startFakeFirecracker(t, apiSock) + + vm := testVM("serialbox", "image-x", "172.16.0.211") + vm.State = model.VMStateRunning + vm.Runtime.State = model.VMStateRunning + vm.Runtime.PID = firecracker.Process.Pid + vm.Runtime.APISockPath = apiSock + + d := &Daemon{ + store: openDaemonStore(t), + config: model.DaemonConfig{SSHKeyPath: filepath.Join(t.TempDir(), "id_ed25519")}, + logger: slog.New(slog.NewTextHandler(io.Discard, nil)), + } + d.guestWaitForSSH = func(_ context.Context, _, _ string, _ time.Duration) error { return nil } + d.guestDial = func(_ context.Context, _, _ string) (guestSSHClient, error) { + return &exportGuestClient{}, nil + } + upsertDaemonVM(t, ctx, d.store, vm) + + origInspect := workspaceInspectRepoFunc + origImport := workspaceImportFunc + t.Cleanup(func() { + workspaceInspectRepoFunc = origInspect + workspaceImportFunc = origImport + }) + + workspaceInspectRepoFunc = func(context.Context, string, string, string) (workspace.RepoSpec, error) { + return workspace.RepoSpec{RepoName: "fake", RepoRoot: "/tmp/fake"}, nil + } + + // Counter of simultaneous Import calls. Should never exceed 1. + var active int32 + var maxObserved int32 + release := make(chan struct{}) + workspaceImportFunc = func(context.Context, workspace.GuestClient, workspace.RepoSpec, string, model.WorkspacePrepareMode) error { + n := atomic.AddInt32(&active, 1) + for { + prev := atomic.LoadInt32(&maxObserved) + if n <= prev || atomic.CompareAndSwapInt32(&maxObserved, prev, n) { + break + } + } + <-release + atomic.AddInt32(&active, -1) + return nil + } + + const n = 3 + done := make(chan error, n) + for i := 0; i < n; i++ { + go func() { + _, err := d.PrepareVMWorkspace(ctx, api.VMWorkspacePrepareParams{ + IDOrName: vm.Name, + SourcePath: "/tmp/fake", + }) + done <- err + }() + } + + // Give goroutines a moment to queue up. + time.Sleep(100 * time.Millisecond) + + if got := atomic.LoadInt32(&active); got != 1 { + close(release) // unblock to avoid hang + for i := 0; i < n; i++ { + <-done + } + t.Fatalf("%d concurrent imports, want exactly 1 (workspace lock should serialise)", got) + } + + // Drain: release imports one at a time. + for i := 0; i < n; i++ { + release <- struct{}{} + } + close(release) + for i := 0; i < n; i++ { + if err := <-done; err != nil { + t.Errorf("prepare #%d error: %v", i, err) + } + } + if got := atomic.LoadInt32(&maxObserved); got != 1 { + t.Fatalf("peak concurrent imports = %d, want 1", got) + } +} + // TestExportVMWorkspace_DoesNotMutateRealIndex is a regression guard // for an earlier design where `git add -A` ran against the guest's // real `.git/index`, leaving staged changes behind after what the user