banger/internal/daemon/tap_pool.go
Thales Maciel c352aba50a
daemon: parallelize tap-pool warmup
Pool warmup ran createTap calls sequentially (one per loop iteration),
so warming N taps cold took N times the per-tap cost. Each releaseTap
also fired its own ensureTapPool goroutine, racing on n.tapPool.next.

Reserve a batch of names under the lock, then run up to
maxConcurrentTapWarmup createTap RPCs in parallel — the root helper
already handles each connection in its own goroutine, so multiple
in-flight priv.create_tap requests don't contend at the wire level. Add a
warming flag to dedupe concurrent ensureTapPool invocations triggered
by parallel releases.

Bail-out semantics are kept close to the original: the old sequential
loop stopped on the first error, while the new loop stops once every
goroutine in a batch fails (e.g. host out of taps, kernel limit), rather
than burning monotonic indices forever.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-02 15:54:07 -03:00

181 lines
4.4 KiB
Go

package daemon
import (
"context"
"fmt"
"strconv"
"strings"
"sync"
"sync/atomic"
)
// tapPoolPrefix is the name prefix for taps created by the pool warmup.
// Pool tap names are this prefix followed by a decimal index
// (e.g. "tap-pool-3"); releaseTap uses the prefix to decide whether a
// released tap may be parked back in the idle pool.
const tapPoolPrefix = "tap-pool-"
// tapPool owns the idle TAP interface cache plus the monotonic index used to
// name new pool entries. All access goes through mu.
type tapPool struct {
	mu sync.Mutex

	// entries holds the names of idle pool taps; acquireTap pops from the
	// end, releaseTap and the warmup append to it.
	entries []string

	// next is the index used for the next generated pool name
	// (tapPoolPrefix + next). It is seeded by initializeTapPool and only
	// incremented by the warmup.
	next int

	// warming is true while an ensureTapPool loop is running; other
	// ensureTapPool calls return immediately while it is set.
	warming bool
}
// maxConcurrentTapWarmup caps the number of `priv.create_tap` RPCs the
// warmup loop runs in parallel. Each tap creation is ~4 root-helper
// shell-outs serialized within one RPC handler; running too many at
// once just contends on netlink. 8 is the production sweet spot for
// SMOKE_JOBS=8. It also bounds how many pool names a single warmup
// iteration reserves from tapPool.next.
const maxConcurrentTapWarmup = 8
// initializeTapPool primes the pool's name counter so that freshly warmed
// taps never reuse an index already held by an existing VM. usedTaps lists
// the tap names currently in use (Daemon.Open gathers them from the handle
// cache); names that do not parse as pool names are ignored. The call is a
// no-op when the pool is disabled (TapPoolSize <= 0).
func (n *HostNetwork) initializeTapPool(usedTaps []string) {
	if n.config.TapPoolSize <= 0 {
		return
	}

	// Track the largest pool index in use; -1 means "none", so the
	// counter starts at 0 when no pool taps exist.
	highest := -1
	for _, name := range usedTaps {
		idx, ok := parseTapPoolIndex(name)
		if ok && idx > highest {
			highest = idx
		}
	}

	n.tapPool.mu.Lock()
	defer n.tapPool.mu.Unlock()
	n.tapPool.next = highest + 1
}
// ensureTapPool tops the idle pool back up to config.TapPoolSize by
// creating new pool-named taps, up to maxConcurrentTapWarmup at a time.
//
// Only one warmup loop runs at a time. The warming flag dedupes concurrent
// calls; releaseTap fires one per successful delete. The flag is cleared in
// the same critical section that observes a full pool. A caller arriving
// after the loop decides to stop therefore starts its own warmup instead of
// being dropped while the pool is short.
//
// The loop stops when:
//   - ctx is cancelled or n.closing is closed;
//   - the pool reaches its target size;
//   - an entire batch fails (e.g. the host is out of taps), so monotonic
//     indices are not burned forever.
//
// Individual creation failures are logged and skipped.
func (n *HostNetwork) ensureTapPool(ctx context.Context) {
	if n.config.TapPoolSize <= 0 {
		return
	}
	// Dedupe concurrent warmup invocations. Releases trigger a fresh
	// ensureTapPool in a goroutine; without this, N parallel releases
	// would each spin up their own warmup loop racing on n.tapPool.next.
	n.tapPool.mu.Lock()
	if n.tapPool.warming {
		n.tapPool.mu.Unlock()
		return
	}
	n.tapPool.warming = true
	n.tapPool.mu.Unlock()

	// Clear the warming flag on the bail-out exits (cancellation, closing,
	// whole-batch failure). The "pool is full" exit clears it itself under
	// the lock and sets warmingCleared. That stops this defer from resetting
	// a flag that a newer warmup may already have set.
	warmingCleared := false
	defer func() {
		if warmingCleared {
			return
		}
		n.tapPool.mu.Lock()
		n.tapPool.warming = false
		n.tapPool.mu.Unlock()
	}()

	for {
		select {
		case <-ctx.Done():
			return
		case <-n.closing:
			return
		default:
		}

		n.tapPool.mu.Lock()
		deficit := n.config.TapPoolSize - len(n.tapPool.entries)
		if deficit <= 0 {
			// Drop the flag while still holding the lock that observed
			// the full pool. Clearing it later would leave a window where
			// a concurrent ensureTapPool sees warming=true and returns,
			// even though an acquire has since drained an entry.
			n.tapPool.warming = false
			warmingCleared = true
			n.tapPool.mu.Unlock()
			return
		}
		batch := deficit
		if batch > maxConcurrentTapWarmup {
			batch = maxConcurrentTapWarmup
		}
		// Reserve names up front so the goroutines below never touch
		// n.tapPool.next.
		names := make([]string, batch)
		for i := range names {
			names[i] = fmt.Sprintf("%s%d", tapPoolPrefix, n.tapPool.next)
			n.tapPool.next++
		}
		n.tapPool.mu.Unlock()

		var (
			wg       sync.WaitGroup
			progress atomic.Int32
		)
		for _, tapName := range names {
			wg.Add(1)
			go func(tapName string) {
				defer wg.Done()
				if err := n.createTap(ctx, tapName); err != nil {
					if n.logger != nil {
						n.logger.Warn("tap pool warmup failed", "tap_device", tapName, "error", err.Error())
					}
					return
				}
				// NOTE: this append does not re-check TapPoolSize.
				// releaseTap may park taps while a batch is in flight, so
				// the pool can transiently exceed its target by up to one
				// batch.
				n.tapPool.mu.Lock()
				n.tapPool.entries = append(n.tapPool.entries, tapName)
				n.tapPool.mu.Unlock()
				progress.Add(1)
				if n.logger != nil {
					n.logger.Debug("tap added to idle pool", "tap_device", tapName)
				}
			}(tapName)
		}
		wg.Wait()

		// Whole batch failed → bail rather than burn names indefinitely
		// (the original sequential loop bailed on first error too).
		if progress.Load() == 0 {
			return
		}
	}
}
// acquireTap returns a tap for a new VM. It prefers the most recently
// pooled idle tap (taken from the end of the pool). When the pool is empty
// it creates fallbackName on demand via createTap and returns that name.
// On creation failure it returns "" and the createTap error.
func (n *HostNetwork) acquireTap(ctx context.Context, fallbackName string) (string, error) {
	n.tapPool.mu.Lock()
	idle := n.tapPool.entries
	if len(idle) == 0 {
		n.tapPool.mu.Unlock()
		// Nothing pooled: create the caller-supplied tap synchronously.
		if err := n.createTap(ctx, fallbackName); err != nil {
			return "", err
		}
		return fallbackName, nil
	}

	top := len(idle) - 1
	tapName := idle[top]
	n.tapPool.entries = idle[:top]
	n.tapPool.mu.Unlock()
	return tapName, nil
}
// releaseTap hands back a tap a VM no longer needs. Surrounding whitespace
// is ignored and an empty name is a no-op.
//
// A pool-named tap is parked in the idle pool while the pool is below
// TapPoolSize. Any other tap, or a pool tap that would overflow the pool,
// is deleted via n.privOps().DeleteTap and that error is returned. After a
// successful delete, a background ensureTapPool is started with
// context.Background().
func (n *HostNetwork) releaseTap(ctx context.Context, tapName string) error {
	name := strings.TrimSpace(tapName)
	if name == "" {
		return nil
	}

	if isTapPoolName(name) {
		n.tapPool.mu.Lock()
		recycled := len(n.tapPool.entries) < n.config.TapPoolSize
		if recycled {
			n.tapPool.entries = append(n.tapPool.entries, name)
		}
		n.tapPool.mu.Unlock()
		if recycled {
			return nil
		}
	}

	if err := n.privOps().DeleteTap(ctx, name); err != nil {
		return err
	}
	go n.ensureTapPool(context.Background())
	return nil
}
// isTapPoolName reports whether tapName, ignoring surrounding whitespace,
// starts with tapPoolPrefix and so belongs to the pool namespace.
func isTapPoolName(tapName string) bool {
	name := strings.TrimSpace(tapName)
	return strings.HasPrefix(name, tapPoolPrefix)
}
// parseTapPoolIndex extracts the numeric index from a pool tap name such
// as "tap-pool-12" (surrounding whitespace is ignored). It reports false
// for:
//   - names outside the pool namespace;
//   - non-numeric suffixes;
//   - negative indices.
//
// strconv.Atoi accepts a sign, so "tap-pool--3" would otherwise parse as
// -3. The pool never allocates negative indices: next starts at 0 and is
// only incremented.
func parseTapPoolIndex(tapName string) (int, bool) {
	if !isTapPoolName(tapName) {
		return 0, false
	}
	value, err := strconv.Atoi(strings.TrimPrefix(strings.TrimSpace(tapName), tapPoolPrefix))
	if err != nil || value < 0 {
		return 0, false
	}
	return value, true
}