From 29add887132f84707ffd695960c8d4ae4a0c0781 Mon Sep 17 00:00:00 2001 From: adytzu2007 Date: Mon, 23 Feb 2026 10:53:39 +0200 Subject: [PATCH] polygon/sync: fix goroutine deadlock when NewBlockBatch event is dropped When the p2p event queue (capacity 1000) is full, PushEvent silently evicts the oldest event. If that event is a NewBlockBatch, the goroutine in backwardDownloadBlockBatches blocks forever waiting on its Processed channel (ctx is still active so ctx.Done() never fires either). PushEvent now returns the dropped event. TipEventsCompositeChannel.PushEvent checks if a dropped event is a NewBlockBatch and signals its Processed channel with an error in a background goroutine, allowing the blocked goroutine to return and clean up blockRequestsCache. Co-Authored-By: Claude Sonnet 4.6 --- polygon/sync/event_channel.go | 14 +++++++++----- polygon/sync/tip_events.go | 11 ++++++++++- polygon/sync/tip_events_test.go | 32 ++++++++++++++++++++++++++++++++ 3 files changed, 51 insertions(+), 6 deletions(-) diff --git a/polygon/sync/event_channel.go b/polygon/sync/event_channel.go index 01d79c27f1d..4f421985c4f 100644 --- a/polygon/sync/event_channel.go +++ b/polygon/sync/event_channel.go @@ -63,21 +63,25 @@ func (ec *EventChannel[TEvent]) Events() <-chan TEvent { } // PushEvent queues an event. If the queue is full, it drops the oldest event to make space. -func (ec *EventChannel[TEvent]) PushEvent(e TEvent) { +// It returns a pointer to the dropped event, or nil if nothing was dropped. +func (ec *EventChannel[TEvent]) PushEvent(e TEvent) *TEvent { ec.queueMutex.Lock() defer ec.queueMutex.Unlock() - var dropped bool + var droppedEvent *TEvent if uint(ec.queue.Len()) == ec.queueCap { - ec.queue.Remove(ec.queue.Front()) - dropped = true + front := ec.queue.Front() + dropped := front.Value.(TEvent) + droppedEvent = &dropped + ec.queue.Remove(front) } - if ec.opts.logger != nil && dropped { + if ec.opts.logger != nil && droppedEvent != nil { ec.opts.logger.Log(ec.opts.loggerLvl, fmt.Sprintf("[event-channel-%s] dropping event", ec.opts.loggerId)) } ec.queue.PushBack(e) ec.queueCond.Signal() + return droppedEvent } // takeEvent dequeues an event. If the queue was empty, it returns false. diff --git a/polygon/sync/tip_events.go b/polygon/sync/tip_events.go index 7f15c6fb982..230efde72d5 100644 --- a/polygon/sync/tip_events.go +++ b/polygon/sync/tip_events.go @@ -18,6 +18,7 @@ package sync import ( "context" + "errors" "fmt" lru "github.com/hashicorp/golang-lru/v2" @@ -264,12 +265,20 @@ func (c TipEventsCompositeChannel) Events() <-chan Event { return c.events } +var errEventDropped = errors.New("event dropped: p2p event queue full") + func (c TipEventsCompositeChannel) PushEvent(e Event) { switch e.Topic() { case EventTopicHeimdall: c.heimdallEventsChannel.PushEvent(e) case EventTopicP2P: - c.p2pEventsChannel.PushEvent(e) + dropped := c.p2pEventsChannel.PushEvent(e) + if dropped != nil && dropped.Type == EventTypeNewBlockBatch { + processedC := dropped.newBlockBatch.Processed + go func() { + processedC <- errEventDropped + }() + } default: panic(fmt.Sprintf("unsupported topic in tip events composite channel: %s", e.Topic())) } diff --git a/polygon/sync/tip_events_test.go b/polygon/sync/tip_events_test.go index 0b5c5e1a40e..52e4bdf0e4c 100644 --- a/polygon/sync/tip_events_test.go +++ b/polygon/sync/tip_events_test.go @@ -80,6 +80,38 @@ func TestTipEventsCompositeChannel(t *testing.T) { require.ErrorIs(t, err, context.Canceled) } +func TestTipEventsCompositeChannelDroppedNewBlockBatchSignalsProcessed(t *testing.T) { + t.Parallel() + + heimdallEvents := NewEventChannel[Event](3) + p2pEvents := NewEventChannel[Event](1) // capacity 1 so the second NewBlockBatch evicts the first + ch := NewTipEventsCompositeChannel(heimdallEvents, p2pEvents) + + processedC1 := make(chan error, 1) + ch.PushEvent(Event{ + Type: EventTypeNewBlockBatch, + newBlockBatch: EventNewBlockBatch{ + Processed: processedC1, + }, + }) + + processedC2 := make(chan error, 1) + // pushing a second NewBlockBatch evicts the first; processedC1 must be signalled + ch.PushEvent(Event{ + Type: EventTypeNewBlockBatch, + newBlockBatch: EventNewBlockBatch{ + Processed: processedC2, + }, + }) + + select { + case err := <-processedC1: + require.ErrorIs(t, err, errEventDropped) + case <-time.After(time.Second): + t.Fatal("timed out waiting for dropped NewBlockBatch Processed signal") + } +} + func read(ctx context.Context, t *testing.T, ch <-chan Event) Event { select { case e := <-ch: