Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions polygon/sync/event_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,21 +63,25 @@ func (ec *EventChannel[TEvent]) Events() <-chan TEvent {
}

// PushEvent queues an event. If the queue is full, it drops the oldest event to make space.
func (ec *EventChannel[TEvent]) PushEvent(e TEvent) {
// It returns a pointer to the dropped event, or nil if nothing was dropped.
func (ec *EventChannel[TEvent]) PushEvent(e TEvent) *TEvent {
ec.queueMutex.Lock()
defer ec.queueMutex.Unlock()

var dropped bool
var droppedEvent *TEvent
if uint(ec.queue.Len()) == ec.queueCap {
ec.queue.Remove(ec.queue.Front())
dropped = true
front := ec.queue.Front()
dropped := front.Value.(TEvent)
droppedEvent = &dropped
ec.queue.Remove(front)
}
if ec.opts.logger != nil && dropped {
if ec.opts.logger != nil && droppedEvent != nil {
ec.opts.logger.Log(ec.opts.loggerLvl, fmt.Sprintf("[event-channel-%s] dropping event", ec.opts.loggerId))
}

ec.queue.PushBack(e)
ec.queueCond.Signal()
return droppedEvent
}

// takeEvent dequeues an event. If the queue was empty, it returns false.
Expand Down
11 changes: 10 additions & 1 deletion polygon/sync/tip_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package sync

import (
"context"
"errors"
"fmt"

lru "github.com/hashicorp/golang-lru/v2"
Expand Down Expand Up @@ -264,12 +265,20 @@ func (c TipEventsCompositeChannel) Events() <-chan Event {
return c.events
}

var errEventDropped = errors.New("event dropped: p2p event queue full")

func (c TipEventsCompositeChannel) PushEvent(e Event) {
switch e.Topic() {
case EventTopicHeimdall:
c.heimdallEventsChannel.PushEvent(e)
case EventTopicP2P:
c.p2pEventsChannel.PushEvent(e)
dropped := c.p2pEventsChannel.PushEvent(e)
if dropped != nil && dropped.Type == EventTypeNewBlockBatch {
processedC := dropped.newBlockBatch.Processed
go func() {
processedC <- errEventDropped
}()
}
default:
panic(fmt.Sprintf("unsupported topic in tip events composite channel: %s", e.Topic()))
}
Expand Down
32 changes: 32 additions & 0 deletions polygon/sync/tip_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,38 @@ func TestTipEventsCompositeChannel(t *testing.T) {
require.ErrorIs(t, err, context.Canceled)
}

func TestTipEventsCompositeChannelDroppedNewBlockBatchSignalsProcessed(t *testing.T) {
t.Parallel()

heimdallEvents := NewEventChannel[Event](3)
p2pEvents := NewEventChannel[Event](1) // capacity 1 so the second NewBlockBatch evicts the first
ch := NewTipEventsCompositeChannel(heimdallEvents, p2pEvents)

processedC1 := make(chan error, 1)
ch.PushEvent(Event{
Type: EventTypeNewBlockBatch,
newBlockBatch: EventNewBlockBatch{
Processed: processedC1,
},
})

processedC2 := make(chan error, 1)
// pushing a second NewBlockBatch evicts the first; processedC1 must be signalled
ch.PushEvent(Event{
Type: EventTypeNewBlockBatch,
newBlockBatch: EventNewBlockBatch{
Processed: processedC2,
},
})

select {
case err := <-processedC1:
require.ErrorIs(t, err, errEventDropped)
case <-time.After(time.Second):
t.Fatal("timed out waiting for dropped NewBlockBatch Processed signal")
}
}

func read(ctx context.Context, t *testing.T, ch <-chan Event) Event {
select {
case e := <-ch:
Expand Down