Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
257 changes: 102 additions & 155 deletions controlplane/telemetry/internal/geoprobe/pinger.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ const (
senderRetryMin = 50 * time.Millisecond
)

// senderFactory is the constructor signature for a twamplight.Sender given an
// interface name and local/remote UDP addresses. Pinger stores one in its
// newSender field (NewPinger installs defaultSenderFactory) and calls it from
// createSenderPair.
type senderFactory func(ctx context.Context, log *slog.Logger, iface string, local, remote *net.UDPAddr) (twamplight.Sender, error)

type PingerConfig struct {
Logger *slog.Logger
ProbeTimeout time.Duration
Expand All @@ -32,21 +34,21 @@ type PingerConfig struct {
// Pinger holds a logger, its configuration, and the set of probe targets that
// the Measure* methods probe.
// NOTE(review): this span lists both senders/sendersMu and targets/targetsMu.
// It looks like a rendered diff showing removed and added fields together —
// confirm which pair is live in the actual file.
type Pinger struct {
log *slog.Logger
cfg *PingerConfig
senders map[string]*senderEntry
sendersMu sync.Mutex
// targets maps addr.String() to the ProbeAddress registered through AddProbe.
targets map[string]ProbeAddress
// targetsMu is locked around every visible read or write of targets.
targetsMu sync.Mutex
// newSender is the factory createSenderPair calls to build each sender.
newSender senderFactory
}

type senderEntry struct {
addr ProbeAddress
sender twamplight.Sender
warmupSender twamplight.Sender
// defaultSenderFactory is the senderFactory installed by NewPinger. It simply
// forwards every argument to newSenderWithRetry and hands back its result.
func defaultSenderFactory(ctx context.Context, log *slog.Logger, iface string, local, remote *net.UDPAddr) (twamplight.Sender, error) {
	sender, err := newSenderWithRetry(ctx, log, iface, local, remote)
	return sender, err
}

// NewPinger builds a Pinger from cfg. cfg must be non-nil because cfg.Logger
// is read here. The targets-based lines start with an empty target map and
// install defaultSenderFactory as the sender factory.
// NOTE(review): the literal below repeats the log/cfg keys and sets both
// senders and targets. This looks like removed and added diff lines rendered
// together — confirm against the actual file.
func NewPinger(cfg *PingerConfig) *Pinger {
return &Pinger{
log: cfg.Logger,
cfg: cfg,
senders: make(map[string]*senderEntry),
log: cfg.Logger,
cfg: cfg,
targets: make(map[string]ProbeAddress),
newSender: defaultSenderFactory,
}
}

Expand Down Expand Up @@ -80,13 +82,12 @@ func newSenderWithRetry(ctx context.Context, log *slog.Logger, iface string, loc
return nil, lastErr
}

func (p *Pinger) AddProbe(ctx context.Context, addr ProbeAddress) error {
p.sendersMu.Lock()
defer p.sendersMu.Unlock()
func (p *Pinger) AddProbe(_ context.Context, addr ProbeAddress) error {
p.targetsMu.Lock()
defer p.targetsMu.Unlock()

key := addr.String()

if _, exists := p.senders[key]; exists {
if _, exists := p.targets[key]; exists {
p.log.Debug("Probe already exists", "probe", key)
return nil
}
Expand All @@ -95,120 +96,110 @@ func (p *Pinger) AddProbe(ctx context.Context, addr ProbeAddress) error {
return fmt.Errorf("invalid probe address %s: %w", key, err)
}

resolvedAddr := &net.UDPAddr{IP: net.ParseIP(addr.Host), Port: int(addr.TWAMPPort)}
p.targets[key] = addr
p.log.Info("Added probe", "probe", key)
return nil
}

sourceAddr := &net.UDPAddr{
IP: net.IPv4zero,
Port: 0,
}
// RemoveProbe unregisters addr. Reading the targets-based lines: it takes
// targetsMu, looks the target up by addr.String(), logs a warning and returns
// nil when it is absent, and otherwise deletes it from p.targets and returns
// nil.
// NOTE(review): the iface/ManagementNamespace lines and the newSenderWithRetry
// call in this span use names (ctx, sourceAddr, resolvedAddr) that are not
// declared inside it. They look like removed AddProbe lines from a rendered
// diff — confirm against the actual file.
func (p *Pinger) RemoveProbe(addr ProbeAddress) error {
p.targetsMu.Lock()
defer p.targetsMu.Unlock()

iface := ""
if p.cfg.ManagementNamespace != "" {
iface = p.cfg.ManagementNamespace
key := addr.String()
if _, exists := p.targets[key]; !exists {
p.log.Warn("Probe not found for removal", "probe", key)
return nil
}

sender, err := newSenderWithRetry(ctx, p.log, iface, sourceAddr, resolvedAddr)
delete(p.targets, key)
p.log.Info("Removed probe", "probe", key)
return nil
}

// createSenderPair builds two senders aimed at the same remote address
// (addr.Host, addr.TWAMPPort) through p.newSender: the measurement sender and
// a warmup sender. Each call passes its own 0.0.0.0:0 local address and
// p.cfg.ManagementNamespace as the iface argument. When the warmup sender
// cannot be created, the first sender is closed before the error is returned.
// NOTE(review): net.ParseIP returns nil when addr.Host is not an IP literal;
// presumably AddProbe's validation rejects such addresses first — confirm.
// NOTE(review): the single-value "failed to create ..." returns, the
// warmupSourceAddr/warmupSender lines, the p.senders[key] assignment, and the
// "Added probe" log do not fit this function's three-value signature. They
// look like removed AddProbe lines from a rendered diff — confirm.
func (p *Pinger) createSenderPair(ctx context.Context, addr ProbeAddress) (sender, warmup twamplight.Sender, err error) {
resolvedAddr := &net.UDPAddr{IP: net.ParseIP(addr.Host), Port: int(addr.TWAMPPort)}
iface := p.cfg.ManagementNamespace

sender, err = p.newSender(ctx, p.log, iface, &net.UDPAddr{IP: net.IPv4zero, Port: 0}, resolvedAddr)
if err != nil {
return fmt.Errorf("failed to create TWAMP sender for %s: %w", addr.String(), err)
return nil, nil, fmt.Errorf("create sender for %s: %w", addr.String(), err)
}

warmupSourceAddr := &net.UDPAddr{IP: net.IPv4zero, Port: 0}
warmupSender, err := newSenderWithRetry(ctx, p.log, iface, warmupSourceAddr, resolvedAddr)
warmup, err = p.newSender(ctx, p.log, iface, &net.UDPAddr{IP: net.IPv4zero, Port: 0}, resolvedAddr)
if err != nil {
// Release the already-created measurement sender before bailing out.
sender.Close()
return fmt.Errorf("failed to create warmup TWAMP sender for %s: %w", addr.String(), err)
}

p.senders[key] = &senderEntry{
addr: addr,
sender: sender,
warmupSender: warmupSender,
return nil, nil, fmt.Errorf("create warmup sender for %s: %w", addr.String(), err)
}

p.log.Info("Added probe", "probe", key, "resolved", resolvedAddr.String())
return nil
return sender, warmup, nil
}

func (p *Pinger) RemoveProbe(addr ProbeAddress) error {
p.sendersMu.Lock()
defer p.sendersMu.Unlock()
// probeTarget performs one measurement of addr. It creates a sender pair via
// createSenderPair (both closed on return), bounds the probes with
// p.cfg.ProbeTimeout, and runs the warmup probe and the measurement probe in
// separate goroutines. It returns the smallest RTT among the probes that
// succeeded. The bool is false when the senders could not be created or when
// both probes returned an error.
// NOTE(review): the statements that reference key, entry, p.senders, or
// "return nil" do not fit this function's (time.Duration, bool) signature.
// They look like removed RemoveProbe lines from a rendered diff — confirm
// against the actual file.
func (p *Pinger) probeTarget(ctx context.Context, addr ProbeAddress) (time.Duration, bool) {
sender, warmup, err := p.createSenderPair(ctx, addr)
if err != nil {
p.log.Warn("Failed to create senders", "probe", addr.String(), "error", err)
return 0, false
}
defer sender.Close()
defer warmup.Close()

key := addr.String()
probeCtx, cancel := context.WithTimeout(ctx, p.cfg.ProbeTimeout)
defer cancel()

entry, exists := p.senders[key]
if !exists {
p.log.Warn("Probe not found for removal", "probe", key)
return nil
type probeResult struct {
rtt time.Duration
err error
}
// Capacity 2 lets both goroutines deliver their result without blocking.
ch := make(chan probeResult, 2)
go func() {
rtt, err := warmup.Probe(probeCtx)
ch <- probeResult{rtt, err}
}()
go func() {
// NOTE(review): this sleep is not interrupted when ctx is cancelled, so the
// measurement probe always starts DefaultWarmupDelay after the warmup.
time.Sleep(DefaultWarmupDelay)
rtt, err := sender.Probe(probeCtx)
ch <- probeResult{rtt, err}
}()

if err := entry.sender.Close(); err != nil {
p.log.Warn("Failed to close sender", "probe", key, "error", err)
}
if err := entry.warmupSender.Close(); err != nil {
p.log.Warn("Failed to close warmup sender", "probe", key, "error", err)
var bestRTT time.Duration
ok := false
// Collect both results and keep the minimum RTT of the successful ones.
for range 2 {
r := <-ch
if r.err == nil && (!ok || r.rtt < bestRTT) {
bestRTT = r.rtt
ok = true
}
}

delete(p.senders, key)
p.log.Info("Removed probe", "probe", key)
return nil
return bestRTT, ok
}

func (p *Pinger) probeWorker(
ctx context.Context,
batch []*senderEntry,
batch []ProbeAddress,
staggerDelay time.Duration,
results map[ProbeAddress]uint64,
resultsMu *sync.Mutex,
wg *sync.WaitGroup,
) {
defer wg.Done()

for i, entry := range batch {
for i, addr := range batch {
select {
case <-ctx.Done():
return
default:
}

probeCtx, cancel := context.WithTimeout(ctx, p.cfg.ProbeTimeout)

// Send a warmup probe first to wake the reflector's thread, then
// send the measurement probe after a short delay. Both run on
// separate sockets so neither blocks the other. We take the min RTT.
type probeResult struct {
rtt time.Duration
err error
}
ch := make(chan probeResult, 2)
go func() {
rtt, err := entry.warmupSender.Probe(probeCtx)
ch <- probeResult{rtt, err}
}()
go func() {
time.Sleep(DefaultWarmupDelay)
rtt, err := entry.sender.Probe(probeCtx)
ch <- probeResult{rtt, err}
}()

var bestRTT time.Duration
ok := false
for range 2 {
r := <-ch
if r.err == nil && (!ok || r.rtt < bestRTT) {
bestRTT = r.rtt
ok = true
}
}
cancel()
rtt, ok := p.probeTarget(ctx, addr)

if ok {
resultsMu.Lock()
results[entry.addr] = uint64(bestRTT.Nanoseconds())
results[addr] = uint64(rtt.Nanoseconds())
resultsMu.Unlock()

p.log.Debug("Probe succeeded", "probe", entry.addr.String(), "rtt", bestRTT)
p.log.Debug("Probe succeeded", "probe", addr.String(), "rtt", rtt)
} else {
p.log.Debug("Probe failed", "probe", entry.addr.String())
p.log.Debug("Probe failed", "probe", addr.String())
}

if i < len(batch)-1 {
Expand All @@ -223,76 +214,45 @@ func (p *Pinger) probeWorker(

// MeasureOne measures a single probe and returns the best RTT in nanoseconds.
// Reading the targets-based lines: addr must already be registered (looked up
// by addr.String() under targetsMu), otherwise a warning is logged and
// (0, false) is returned. The measurement itself is delegated to
// p.probeTarget, and the bool result is its success flag.
// NOTE(review): the sendersMu/entry lookup and the inline warmup/measurement
// goroutines with bestRTT duplicate what probeTarget does and leave statements
// after a return. They look like removed lines from a rendered diff — confirm
// against the actual file.
func (p *Pinger) MeasureOne(ctx context.Context, addr ProbeAddress) (uint64, bool) {
p.sendersMu.Lock()
entry, exists := p.senders[addr.String()]
p.sendersMu.Unlock()
// The lock is released before probing, so the probe does not hold targetsMu.
p.targetsMu.Lock()
_, exists := p.targets[addr.String()]
p.targetsMu.Unlock()
if !exists {
p.log.Warn("MeasureOne called for unknown probe", "probe", addr.String())
return 0, false
}

probeCtx, cancel := context.WithTimeout(ctx, p.cfg.ProbeTimeout)
defer cancel()

type probeResult struct {
rtt time.Duration
err error
}
ch := make(chan probeResult, 2)
go func() {
rtt, err := entry.warmupSender.Probe(probeCtx)
ch <- probeResult{rtt, err}
}()
go func() {
time.Sleep(DefaultWarmupDelay)
rtt, err := entry.sender.Probe(probeCtx)
ch <- probeResult{rtt, err}
}()

var bestRTT time.Duration
ok := false
for range 2 {
r := <-ch
if r.err == nil && (!ok || r.rtt < bestRTT) {
bestRTT = r.rtt
ok = true
}
}

rtt, ok := p.probeTarget(ctx, addr)
if ok {
p.log.Debug("MeasureOne succeeded", "probe", addr.String(), "rtt", bestRTT)
} else {
p.log.Debug("MeasureOne failed", "probe", addr.String())
}

if !ok {
return 0, false
p.log.Debug("MeasureOne succeeded", "probe", addr.String(), "rtt", rtt)
return uint64(rtt.Nanoseconds()), true
}
return uint64(bestRTT.Nanoseconds()), true
p.log.Debug("MeasureOne failed", "probe", addr.String())
return 0, false
}

func (p *Pinger) MeasureAll(ctx context.Context) (map[ProbeAddress]uint64, error) {
p.sendersMu.Lock()
sendersCopy := make([]*senderEntry, 0, len(p.senders))
for _, entry := range p.senders {
sendersCopy = append(sendersCopy, entry)
p.targetsMu.Lock()
targetsCopy := make([]ProbeAddress, 0, len(p.targets))
for _, addr := range p.targets {
targetsCopy = append(targetsCopy, addr)
}
p.sendersMu.Unlock()
p.targetsMu.Unlock()

if len(sendersCopy) == 0 {
if len(targetsCopy) == 0 {
return make(map[ProbeAddress]uint64), nil
}

totalProbes := len(sendersCopy)
totalProbes := len(targetsCopy)
results := make(map[ProbeAddress]uint64)
resultsMu := sync.Mutex{}
var wg sync.WaitGroup

numWorkers := min(MaxWorkers, (len(sendersCopy)+ProbesPerWorker-1)/ProbesPerWorker)
numWorkers := min(MaxWorkers, (len(targetsCopy)+ProbesPerWorker-1)/ProbesPerWorker)
if numWorkers == 0 {
numWorkers = 1
}
batchSize := (len(sendersCopy) + numWorkers - 1) / numWorkers
batchSize := (len(targetsCopy) + numWorkers - 1) / numWorkers

staggerDelay := p.cfg.StaggerDelay
if staggerDelay == 0 {
Expand All @@ -301,12 +261,12 @@ func (p *Pinger) MeasureAll(ctx context.Context) (map[ProbeAddress]uint64, error

for i := 0; i < numWorkers; i++ {
start := i * batchSize
end := min(start+batchSize, len(sendersCopy))
if start >= len(sendersCopy) {
end := min(start+batchSize, len(targetsCopy))
if start >= len(targetsCopy) {
break
}

batch := sendersCopy[start:end]
batch := targetsCopy[start:end]
wg.Add(1)
go p.probeWorker(ctx, batch, staggerDelay, results, &resultsMu, &wg)
}
Expand All @@ -325,23 +285,10 @@ func (p *Pinger) MeasureAll(ctx context.Context) (map[ProbeAddress]uint64, error
}

// Close clears the Pinger's registered probes. Reading the targets-based
// lines: it takes targetsMu, replaces p.targets with a fresh empty map, logs
// "Closed all probes", and returns nil.
// NOTE(review): the sendersMu lock, the lastErr loop that closes
// entry.sender/entry.warmupSender, the p.senders reset, and "return lastErr"
// look like removed lines from a rendered diff — confirm against the actual
// file.
func (p *Pinger) Close() error {
p.sendersMu.Lock()
defer p.sendersMu.Unlock()
p.targetsMu.Lock()
defer p.targetsMu.Unlock()

var lastErr error
for key, entry := range p.senders {
if err := entry.sender.Close(); err != nil {
p.log.Warn("Failed to close sender", "probe", key, "error", err)
lastErr = err
}
if err := entry.warmupSender.Close(); err != nil {
p.log.Warn("Failed to close warmup sender", "probe", key, "error", err)
lastErr = err
}
}

p.senders = make(map[string]*senderEntry)
p.targets = make(map[string]ProbeAddress)
p.log.Info("Closed all probes")

return lastErr
return nil
}
Loading
Loading