diff --git a/controlplane/telemetry/internal/geoprobe/pinger.go b/controlplane/telemetry/internal/geoprobe/pinger.go index 868abc7b5..474e1a860 100644 --- a/controlplane/telemetry/internal/geoprobe/pinger.go +++ b/controlplane/telemetry/internal/geoprobe/pinger.go @@ -21,6 +21,8 @@ const ( senderRetryMin = 50 * time.Millisecond ) +type senderFactory func(ctx context.Context, log *slog.Logger, iface string, local, remote *net.UDPAddr) (twamplight.Sender, error) + type PingerConfig struct { Logger *slog.Logger ProbeTimeout time.Duration @@ -32,21 +34,21 @@ type PingerConfig struct { type Pinger struct { log *slog.Logger cfg *PingerConfig - senders map[string]*senderEntry - sendersMu sync.Mutex + targets map[string]ProbeAddress + targetsMu sync.Mutex + newSender senderFactory } -type senderEntry struct { - addr ProbeAddress - sender twamplight.Sender - warmupSender twamplight.Sender +func defaultSenderFactory(ctx context.Context, log *slog.Logger, iface string, local, remote *net.UDPAddr) (twamplight.Sender, error) { + return newSenderWithRetry(ctx, log, iface, local, remote) } func NewPinger(cfg *PingerConfig) *Pinger { return &Pinger{ - log: cfg.Logger, - cfg: cfg, - senders: make(map[string]*senderEntry), + log: cfg.Logger, + cfg: cfg, + targets: make(map[string]ProbeAddress), + newSender: defaultSenderFactory, } } @@ -80,13 +82,12 @@ func newSenderWithRetry(ctx context.Context, log *slog.Logger, iface string, loc return nil, lastErr } -func (p *Pinger) AddProbe(ctx context.Context, addr ProbeAddress) error { - p.sendersMu.Lock() - defer p.sendersMu.Unlock() +func (p *Pinger) AddProbe(_ context.Context, addr ProbeAddress) error { + p.targetsMu.Lock() + defer p.targetsMu.Unlock() key := addr.String() - - if _, exists := p.senders[key]; exists { + if _, exists := p.targets[key]; exists { p.log.Debug("Probe already exists", "probe", key) return nil } @@ -95,67 +96,87 @@ func (p *Pinger) AddProbe(ctx context.Context, addr ProbeAddress) error { return fmt.Errorf("invalid probe address %s: %w", key, err) } - resolvedAddr := &net.UDPAddr{IP: net.ParseIP(addr.Host), Port: int(addr.TWAMPPort)} + p.targets[key] = addr + p.log.Info("Added probe", "probe", key) + return nil +} - sourceAddr := &net.UDPAddr{ - IP: net.IPv4zero, - Port: 0, - } +func (p *Pinger) RemoveProbe(addr ProbeAddress) error { + p.targetsMu.Lock() + defer p.targetsMu.Unlock() - iface := "" - if p.cfg.ManagementNamespace != "" { - iface = p.cfg.ManagementNamespace + key := addr.String() + if _, exists := p.targets[key]; !exists { + p.log.Warn("Probe not found for removal", "probe", key) + return nil } - sender, err := newSenderWithRetry(ctx, p.log, iface, sourceAddr, resolvedAddr) + delete(p.targets, key) + p.log.Info("Removed probe", "probe", key) + return nil +} + +func (p *Pinger) createSenderPair(ctx context.Context, addr ProbeAddress) (sender, warmup twamplight.Sender, err error) { + resolvedAddr := &net.UDPAddr{IP: net.ParseIP(addr.Host), Port: int(addr.TWAMPPort)} + iface := p.cfg.ManagementNamespace + + sender, err = p.newSender(ctx, p.log, iface, &net.UDPAddr{IP: net.IPv4zero, Port: 0}, resolvedAddr) if err != nil { - return fmt.Errorf("failed to create TWAMP sender for %s: %w", addr.String(), err) + return nil, nil, fmt.Errorf("create sender for %s: %w", addr.String(), err) } - warmupSourceAddr := &net.UDPAddr{IP: net.IPv4zero, Port: 0} - warmupSender, err := newSenderWithRetry(ctx, p.log, iface, warmupSourceAddr, resolvedAddr) + warmup, err = p.newSender(ctx, p.log, iface, &net.UDPAddr{IP: net.IPv4zero, Port: 0}, resolvedAddr) if err != nil { sender.Close() - return fmt.Errorf("failed to create warmup TWAMP sender for %s: %w", addr.String(), err) - } - - p.senders[key] = &senderEntry{ - addr: addr, - sender: sender, - warmupSender: warmupSender, + return nil, nil, fmt.Errorf("create warmup sender for %s: %w", addr.String(), err) } - p.log.Info("Added probe", "probe", key, "resolved", resolvedAddr.String()) - return nil + return sender, warmup, nil } -func (p *Pinger) RemoveProbe(addr ProbeAddress) error { - p.sendersMu.Lock() - defer p.sendersMu.Unlock() +func (p *Pinger) probeTarget(ctx context.Context, addr ProbeAddress) (time.Duration, bool) { + sender, warmup, err := p.createSenderPair(ctx, addr) + if err != nil { + p.log.Warn("Failed to create senders", "probe", addr.String(), "error", err) + return 0, false + } + defer sender.Close() + defer warmup.Close() - key := addr.String() + probeCtx, cancel := context.WithTimeout(ctx, p.cfg.ProbeTimeout) + defer cancel() - entry, exists := p.senders[key] - if !exists { - p.log.Warn("Probe not found for removal", "probe", key) - return nil + type probeResult struct { + rtt time.Duration + err error } + ch := make(chan probeResult, 2) + go func() { + rtt, err := warmup.Probe(probeCtx) + ch <- probeResult{rtt, err} + }() + go func() { + time.Sleep(DefaultWarmupDelay) + rtt, err := sender.Probe(probeCtx) + ch <- probeResult{rtt, err} + }() - if err := entry.sender.Close(); err != nil { - p.log.Warn("Failed to close sender", "probe", key, "error", err) - } - if err := entry.warmupSender.Close(); err != nil { - p.log.Warn("Failed to close warmup sender", "probe", key, "error", err) + var bestRTT time.Duration + ok := false + for range 2 { + r := <-ch + if r.err == nil && (!ok || r.rtt < bestRTT) { + bestRTT = r.rtt + ok = true + } } - delete(p.senders, key) - p.log.Info("Removed probe", "probe", key) - return nil + return bestRTT, ok } func (p *Pinger) probeWorker( ctx context.Context, - batch []*senderEntry, + batch []ProbeAddress, staggerDelay time.Duration, results map[ProbeAddress]uint64, resultsMu *sync.Mutex, @@ -163,52 +184,22 @@ func (p *Pinger) probeWorker( ) { defer wg.Done() - for i, entry := range batch { + for i, addr := range batch { select { case <-ctx.Done(): return default: } - probeCtx, cancel := context.WithTimeout(ctx, p.cfg.ProbeTimeout) - - // Send a warmup probe first to wake the reflector's thread, then - // send the measurement probe after a short delay. Both run on - // separate sockets so neither blocks the other. We take the min RTT. - type probeResult struct { - rtt time.Duration - err error - } - ch := make(chan probeResult, 2) - go func() { - rtt, err := entry.warmupSender.Probe(probeCtx) - ch <- probeResult{rtt, err} - }() - go func() { - time.Sleep(DefaultWarmupDelay) - rtt, err := entry.sender.Probe(probeCtx) - ch <- probeResult{rtt, err} - }() - - var bestRTT time.Duration - ok := false - for range 2 { - r := <-ch - if r.err == nil && (!ok || r.rtt < bestRTT) { - bestRTT = r.rtt - ok = true - } - } - cancel() + rtt, ok := p.probeTarget(ctx, addr) if ok { resultsMu.Lock() - results[entry.addr] = uint64(bestRTT.Nanoseconds()) + results[addr] = uint64(rtt.Nanoseconds()) resultsMu.Unlock() - - p.log.Debug("Probe succeeded", "probe", entry.addr.String(), "rtt", bestRTT) + p.log.Debug("Probe succeeded", "probe", addr.String(), "rtt", rtt) } else { - p.log.Debug("Probe failed", "probe", entry.addr.String()) + p.log.Debug("Probe failed", "probe", addr.String()) } if i < len(batch)-1 { @@ -223,76 +214,45 @@ func (p *Pinger) probeWorker( // MeasureOne measures a single probe and returns the best RTT in nanoseconds. func (p *Pinger) MeasureOne(ctx context.Context, addr ProbeAddress) (uint64, bool) { - p.sendersMu.Lock() - entry, exists := p.senders[addr.String()] - p.sendersMu.Unlock() + p.targetsMu.Lock() + _, exists := p.targets[addr.String()] + p.targetsMu.Unlock() if !exists { p.log.Warn("MeasureOne called for unknown probe", "probe", addr.String()) return 0, false } - probeCtx, cancel := context.WithTimeout(ctx, p.cfg.ProbeTimeout) - defer cancel() - - type probeResult struct { - rtt time.Duration - err error - } - ch := make(chan probeResult, 2) - go func() { - rtt, err := entry.warmupSender.Probe(probeCtx) - ch <- probeResult{rtt, err} - }() - go func() { - time.Sleep(DefaultWarmupDelay) - rtt, err := entry.sender.Probe(probeCtx) - ch <- probeResult{rtt, err} - }() - - var bestRTT time.Duration - ok := false - for range 2 { - r := <-ch - if r.err == nil && (!ok || r.rtt < bestRTT) { - bestRTT = r.rtt - ok = true - } - } - + rtt, ok := p.probeTarget(ctx, addr) if ok { - p.log.Debug("MeasureOne succeeded", "probe", addr.String(), "rtt", bestRTT) - } else { - p.log.Debug("MeasureOne failed", "probe", addr.String()) - } - - if !ok { - return 0, false + p.log.Debug("MeasureOne succeeded", "probe", addr.String(), "rtt", rtt) + return uint64(rtt.Nanoseconds()), true } - return uint64(bestRTT.Nanoseconds()), true + p.log.Debug("MeasureOne failed", "probe", addr.String()) + return 0, false } func (p *Pinger) MeasureAll(ctx context.Context) (map[ProbeAddress]uint64, error) { - p.sendersMu.Lock() - sendersCopy := make([]*senderEntry, 0, len(p.senders)) - for _, entry := range p.senders { - sendersCopy = append(sendersCopy, entry) + p.targetsMu.Lock() + targetsCopy := make([]ProbeAddress, 0, len(p.targets)) + for _, addr := range p.targets { + targetsCopy = append(targetsCopy, addr) } - p.sendersMu.Unlock() + p.targetsMu.Unlock() - if len(sendersCopy) == 0 { + if len(targetsCopy) == 0 { return make(map[ProbeAddress]uint64), nil } - totalProbes := len(sendersCopy) + totalProbes := len(targetsCopy) results := make(map[ProbeAddress]uint64) resultsMu := sync.Mutex{} var wg sync.WaitGroup - numWorkers := min(MaxWorkers, (len(sendersCopy)+ProbesPerWorker-1)/ProbesPerWorker) + numWorkers := min(MaxWorkers, (len(targetsCopy)+ProbesPerWorker-1)/ProbesPerWorker) if numWorkers == 0 { numWorkers = 1 } - batchSize := (len(sendersCopy) + numWorkers - 1) / numWorkers + batchSize := (len(targetsCopy) + numWorkers - 1) / numWorkers staggerDelay := p.cfg.StaggerDelay if staggerDelay == 0 { @@ -301,12 +261,12 @@ func (p *Pinger) MeasureAll(ctx context.Context) (map[ProbeAddress]uint64, error for i := 0; i < numWorkers; i++ { start := i * batchSize - end := min(start+batchSize, len(sendersCopy)) - if start >= len(sendersCopy) { + end := min(start+batchSize, len(targetsCopy)) + if start >= len(targetsCopy) { break } - batch := sendersCopy[start:end] + batch := targetsCopy[start:end] wg.Add(1) go p.probeWorker(ctx, batch, staggerDelay, results, &resultsMu, &wg) } @@ -325,23 +285,10 @@ func (p *Pinger) MeasureAll(ctx context.Context) (map[ProbeAddress]uint64, error } func (p *Pinger) Close() error { - p.sendersMu.Lock() - defer p.sendersMu.Unlock() + p.targetsMu.Lock() + defer p.targetsMu.Unlock() - var lastErr error - for key, entry := range p.senders { - if err := entry.sender.Close(); err != nil { - p.log.Warn("Failed to close sender", "probe", key, "error", err) - lastErr = err - } - if err := entry.warmupSender.Close(); err != nil { - p.log.Warn("Failed to close warmup sender", "probe", key, "error", err) - lastErr = err - } - } - - p.senders = make(map[string]*senderEntry) + p.targets = make(map[string]ProbeAddress) p.log.Info("Closed all probes") - - return lastErr + return nil } diff --git a/controlplane/telemetry/internal/geoprobe/pinger_test.go b/controlplane/telemetry/internal/geoprobe/pinger_test.go index e7e68b337..9b603039e 100644 --- a/controlplane/telemetry/internal/geoprobe/pinger_test.go +++ b/controlplane/telemetry/internal/geoprobe/pinger_test.go @@ -9,6 +9,7 @@ import ( "testing" "time" + twamplight "github.com/malbeclabs/doublezero/tools/twamp/pkg/light" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -28,6 +29,17 @@ func (m *mockSender) LocalAddr() *net.UDPAddr { return &net.UDPAddr{IP: net.IPv4zero, Port: 0} } +func mockFactory(sender, warmup *mockSender) senderFactory { + call := 0 + return func(_ context.Context, _ *slog.Logger, _ string, _, _ *net.UDPAddr) (twamplight.Sender, error) { + call++ + if call%2 == 1 { + return sender, nil + } + return warmup, nil + } +} + func TestNewPinger(t *testing.T) { t.Parallel() @@ -44,8 +56,8 @@ func TestNewPinger(t *testing.T) { require.NotNil(t, pinger) assert.NotNil(t, pinger.log) assert.NotNil(t, pinger.cfg) - assert.NotNil(t, pinger.senders) - assert.Empty(t, pinger.senders) + assert.NotNil(t, pinger.targets) + assert.Empty(t, pinger.targets) } func TestPinger_AddProbe(t *testing.T) { @@ -71,9 +83,9 @@ func TestPinger_AddProbe(t *testing.T) { err := pinger.AddProbe(ctx, addr) require.NoError(t, err) - pinger.sendersMu.Lock() - _, exists := pinger.senders[addr.String()] - pinger.sendersMu.Unlock() + pinger.targetsMu.Lock() + _, exists := pinger.targets[addr.String()] + pinger.targetsMu.Unlock() assert.True(t, exists, "probe should exist after AddProbe") } @@ -101,20 +113,14 @@ func TestPinger_AddProbe_Duplicate(t *testing.T) { err := pinger.AddProbe(ctx, addr) require.NoError(t, err) - pinger.sendersMu.Lock() - firstSender := pinger.senders[addr.String()].sender - pinger.sendersMu.Unlock() - err = pinger.AddProbe(ctx, addr) require.NoError(t, err) - pinger.sendersMu.Lock() - secondSender := pinger.senders[addr.String()].sender - count := len(pinger.senders) - pinger.sendersMu.Unlock() + pinger.targetsMu.Lock() + count := len(pinger.targets) + pinger.targetsMu.Unlock() - assert.Equal(t, 1, count, "should only have one sender after duplicate AddProbe") - assert.Equal(t, firstSender, secondSender, "should reuse existing sender on duplicate") + assert.Equal(t, 1, count, "should only have one target after duplicate AddProbe") } func TestPinger_AddProbe_InvalidHost(t *testing.T) { @@ -140,11 +146,11 @@ func TestPinger_AddProbe_InvalidHost(t *testing.T) { err := pinger.AddProbe(ctx, addr) assert.Error(t, err, "should fail with invalid IP address") - pinger.sendersMu.Lock() - count := len(pinger.senders) - pinger.sendersMu.Unlock() + pinger.targetsMu.Lock() + count := len(pinger.targets) + pinger.targetsMu.Unlock() - assert.Equal(t, 0, count, "should not add sender for invalid IP") + assert.Equal(t, 0, count, "should not add target for invalid IP") } func TestPinger_RemoveProbe(t *testing.T) { @@ -173,9 +179,9 @@ func TestPinger_RemoveProbe(t *testing.T) { err = pinger.RemoveProbe(addr) require.NoError(t, err) - pinger.sendersMu.Lock() - _, exists := pinger.senders[addr.String()] - pinger.sendersMu.Unlock() + pinger.targetsMu.Lock() + _, exists := pinger.targets[addr.String()] + pinger.targetsMu.Unlock() assert.False(t, exists, "probe should not exist after RemoveProbe") } @@ -276,11 +282,11 @@ func TestPinger_Close(t *testing.T) { err = pinger.Close() assert.NoError(t, err) - pinger.sendersMu.Lock() - count := len(pinger.senders) - pinger.sendersMu.Unlock() + pinger.targetsMu.Lock() + count := len(pinger.targets) + pinger.targetsMu.Unlock() - assert.Equal(t, 0, count, "all senders should be removed after Close") + assert.Equal(t, 0, count, "all targets should be removed after Close") } func TestPinger_Concurrent(t *testing.T) { @@ -324,9 +330,9 @@ func TestPinger_Concurrent(t *testing.T) { wg.Wait() - pinger.sendersMu.Lock() - count := len(pinger.senders) - pinger.sendersMu.Unlock() + pinger.targetsMu.Lock() + count := len(pinger.targets) + pinger.targetsMu.Unlock() assert.Equal(t, 0, count, "all probes should be removed after concurrent operations") } @@ -354,25 +360,25 @@ func TestPinger_AddRemoveSequential(t *testing.T) { err := pinger.AddProbe(ctx, addr) require.NoError(t, err) - pinger.sendersMu.Lock() - count1 := len(pinger.senders) - pinger.sendersMu.Unlock() + pinger.targetsMu.Lock() + count1 := len(pinger.targets) + pinger.targetsMu.Unlock() assert.Equal(t, 1, count1) err = pinger.RemoveProbe(addr) require.NoError(t, err) - pinger.sendersMu.Lock() - count2 := len(pinger.senders) - pinger.sendersMu.Unlock() + pinger.targetsMu.Lock() + count2 := len(pinger.targets) + pinger.targetsMu.Unlock() assert.Equal(t, 0, count2) err = pinger.AddProbe(ctx, addr) require.NoError(t, err) - pinger.sendersMu.Lock() - count3 := len(pinger.senders) - pinger.sendersMu.Unlock() + pinger.targetsMu.Lock() + count3 := len(pinger.targets) + pinger.targetsMu.Unlock() assert.Equal(t, 1, count3) } @@ -579,13 +585,14 @@ func TestPinger_DualProbe_MinRTT(t *testing.T) { } pinger := NewPinger(cfg) + pinger.newSender = mockFactory( + &mockSender{rtt: 10 * time.Millisecond}, + &mockSender{rtt: 50 * time.Millisecond}, + ) addr := ProbeAddress{Host: "192.0.2.1", Port: 8923, TWAMPPort: 8925} - pinger.senders[addr.String()] = &senderEntry{ - addr: addr, - sender: &mockSender{rtt: 10 * time.Millisecond}, - warmupSender: &mockSender{rtt: 50 * time.Millisecond}, - } + err := pinger.AddProbe(context.Background(), addr) + require.NoError(t, err) results, err := pinger.MeasureAll(context.Background()) require.NoError(t, err) @@ -606,13 +613,14 @@ func TestPinger_DualProbe_OneFailsUsesOther(t *testing.T) { } pinger := NewPinger(cfg) + pinger.newSender = mockFactory( + &mockSender{err: context.DeadlineExceeded}, + &mockSender{rtt: 20 * time.Millisecond}, + ) addr := ProbeAddress{Host: "192.0.2.1", Port: 8923, TWAMPPort: 8925} - pinger.senders[addr.String()] = &senderEntry{ - addr: addr, - sender: &mockSender{err: context.DeadlineExceeded}, - warmupSender: &mockSender{rtt: 20 * time.Millisecond}, - } + err := pinger.AddProbe(context.Background(), addr) + require.NoError(t, err) results, err := pinger.MeasureAll(context.Background()) require.NoError(t, err) @@ -633,13 +641,14 @@ func TestPinger_DualProbe_BothFail(t *testing.T) { } pinger := NewPinger(cfg) + pinger.newSender = mockFactory( + &mockSender{err: context.DeadlineExceeded}, + &mockSender{err: context.DeadlineExceeded}, + ) addr := ProbeAddress{Host: "192.0.2.1", Port: 8923, TWAMPPort: 8925} - pinger.senders[addr.String()] = &senderEntry{ - addr: addr, - sender: &mockSender{err: context.DeadlineExceeded}, - warmupSender: &mockSender{err: context.DeadlineExceeded}, - } + err := pinger.AddProbe(context.Background(), addr) + require.NoError(t, err) results, err := pinger.MeasureAll(context.Background()) require.NoError(t, err) @@ -675,12 +684,14 @@ func TestPinger_MeasureOne_Success(t *testing.T) { StaggerDelay: 1 * time.Millisecond, }) + pinger.newSender = mockFactory( + &mockSender{rtt: 10 * time.Millisecond}, + &mockSender{rtt: 50 * time.Millisecond}, + ) + addr := ProbeAddress{Host: "192.0.2.1", Port: 8923, TWAMPPort: 8925} - pinger.senders[addr.String()] = &senderEntry{ - addr: addr, - sender: &mockSender{rtt: 10 * time.Millisecond}, - warmupSender: &mockSender{rtt: 50 * time.Millisecond}, - } + err := pinger.AddProbe(context.Background(), addr) + require.NoError(t, err) rtt, ok := pinger.MeasureOne(context.Background(), addr) assert.True(t, ok) @@ -699,12 +710,14 @@ func TestPinger_MeasureOne_BothFail(t *testing.T) { StaggerDelay: 1 * time.Millisecond, }) + pinger.newSender = mockFactory( + &mockSender{err: context.DeadlineExceeded}, + &mockSender{err: context.DeadlineExceeded}, + ) + addr := ProbeAddress{Host: "192.0.2.1", Port: 8923, TWAMPPort: 8925} - pinger.senders[addr.String()] = &senderEntry{ - addr: addr, - sender: &mockSender{err: context.DeadlineExceeded}, - warmupSender: &mockSender{err: context.DeadlineExceeded}, - } + err := pinger.AddProbe(context.Background(), addr) + require.NoError(t, err) rtt, ok := pinger.MeasureOne(context.Background(), addr) assert.False(t, ok)