daemon: parallelize tap-pool warmup
Pool warmup ran createTap calls sequentially (one per loop iteration), so warming N taps cold took N times the per-tap cost. Each releaseTap also fired its own ensureTapPool goroutine, racing on n.tapPool.next. Reserve a batch of names under the lock, then run up to maxConcurrentTapWarmup createTap RPCs in parallel — the root helper already handles each connection in its own goroutine, so multiple in-flight priv.create_tap requests don't contend at the wire level. Add a warming flag to dedupe concurrent ensureTapPool invocations triggered by parallel releases. Bail-on-error semantics are kept at batch granularity: the sequential loop stopped at the first createTap error, whereas the parallel loop stops only when every goroutine in a batch fails (e.g. host out of taps, kernel limit), so it still exits rather than burning monotonic indices forever. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
71e073ac49
commit
c352aba50a
1 changed files with 67 additions and 15 deletions
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
)
|
)
|
||||||
|
|
||||||
const tapPoolPrefix = "tap-pool-"
|
const tapPoolPrefix = "tap-pool-"
|
||||||
|
|
@ -16,8 +17,16 @@ type tapPool struct {
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
entries []string
|
entries []string
|
||||||
next int
|
next int
|
||||||
|
warming bool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// maxConcurrentTapWarmup caps the number of `priv.create_tap` RPCs the
|
||||||
|
// warmup loop runs in parallel. Each tap creation is ~4 root-helper
|
||||||
|
// shell-outs serialized within one RPC handler; running too many at
|
||||||
|
// once just contends on netlink. 8 is the production sweet spot for
|
||||||
|
// SMOKE_JOBS=8.
|
||||||
|
const maxConcurrentTapWarmup = 8
|
||||||
|
|
||||||
// initializeTapPool seeds the monotonic pool index from the set of
|
// initializeTapPool seeds the monotonic pool index from the set of
|
||||||
// tap names already in use by running/stopped VMs, so newly warmed
|
// tap names already in use by running/stopped VMs, so newly warmed
|
||||||
// pool entries don't collide with existing ones. Callers (Daemon.Open)
|
// pool entries don't collide with existing ones. Callers (Daemon.Open)
|
||||||
|
|
@ -41,6 +50,23 @@ func (n *HostNetwork) ensureTapPool(ctx context.Context) {
|
||||||
if n.config.TapPoolSize <= 0 {
|
if n.config.TapPoolSize <= 0 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Dedupe concurrent warmup invocations. Releases trigger a fresh
|
||||||
|
// ensureTapPool in a goroutine; without this, N parallel releases
|
||||||
|
// would each spin up their own warmup loop racing on n.tapPool.next.
|
||||||
|
n.tapPool.mu.Lock()
|
||||||
|
if n.tapPool.warming {
|
||||||
|
n.tapPool.mu.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
n.tapPool.warming = true
|
||||||
|
n.tapPool.mu.Unlock()
|
||||||
|
defer func() {
|
||||||
|
n.tapPool.mu.Lock()
|
||||||
|
n.tapPool.warming = false
|
||||||
|
n.tapPool.mu.Unlock()
|
||||||
|
}()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
|
|
@ -51,27 +77,53 @@ func (n *HostNetwork) ensureTapPool(ctx context.Context) {
|
||||||
}
|
}
|
||||||
|
|
||||||
n.tapPool.mu.Lock()
|
n.tapPool.mu.Lock()
|
||||||
if len(n.tapPool.entries) >= n.config.TapPoolSize {
|
deficit := n.config.TapPoolSize - len(n.tapPool.entries)
|
||||||
|
if deficit <= 0 {
|
||||||
n.tapPool.mu.Unlock()
|
n.tapPool.mu.Unlock()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
tapName := fmt.Sprintf("%s%d", tapPoolPrefix, n.tapPool.next)
|
batch := deficit
|
||||||
n.tapPool.next++
|
if batch > maxConcurrentTapWarmup {
|
||||||
n.tapPool.mu.Unlock()
|
batch = maxConcurrentTapWarmup
|
||||||
|
}
|
||||||
if err := n.createTap(ctx, tapName); err != nil {
|
// Reserve names up front so concurrent goroutines can't collide
|
||||||
if n.logger != nil {
|
// on n.tapPool.next.
|
||||||
n.logger.Warn("tap pool warmup failed", "tap_device", tapName, "error", err.Error())
|
names := make([]string, batch)
|
||||||
}
|
for i := range names {
|
||||||
return
|
names[i] = fmt.Sprintf("%s%d", tapPoolPrefix, n.tapPool.next)
|
||||||
|
n.tapPool.next++
|
||||||
}
|
}
|
||||||
|
|
||||||
n.tapPool.mu.Lock()
|
|
||||||
n.tapPool.entries = append(n.tapPool.entries, tapName)
|
|
||||||
n.tapPool.mu.Unlock()
|
n.tapPool.mu.Unlock()
|
||||||
|
|
||||||
if n.logger != nil {
|
var (
|
||||||
n.logger.Debug("tap added to idle pool", "tap_device", tapName)
|
wg sync.WaitGroup
|
||||||
|
progress atomic.Int32
|
||||||
|
)
|
||||||
|
for _, tapName := range names {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(tapName string) {
|
||||||
|
defer wg.Done()
|
||||||
|
if err := n.createTap(ctx, tapName); err != nil {
|
||||||
|
if n.logger != nil {
|
||||||
|
n.logger.Warn("tap pool warmup failed", "tap_device", tapName, "error", err.Error())
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
n.tapPool.mu.Lock()
|
||||||
|
n.tapPool.entries = append(n.tapPool.entries, tapName)
|
||||||
|
n.tapPool.mu.Unlock()
|
||||||
|
progress.Add(1)
|
||||||
|
if n.logger != nil {
|
||||||
|
n.logger.Debug("tap added to idle pool", "tap_device", tapName)
|
||||||
|
}
|
||||||
|
}(tapName)
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
// Whole batch failed → bail rather than burn names indefinitely
|
||||||
|
// (the original sequential loop bailed on first error too).
|
||||||
|
if progress.Load() == 0 {
|
||||||
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue