Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion apps/evm/server/force_inclusion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ import (

// mockDA implements block/internal/da.Client for testing
type mockDA struct {
submitFunc func(ctx context.Context, data [][]byte, gasPrice float64, namespace []byte, options []byte) da.ResultSubmit
submitFunc func(ctx context.Context, data [][]byte, gasPrice float64, namespace []byte, options []byte) da.ResultSubmit
subscribeFunc func(ctx context.Context, namespace []byte) (<-chan da.ResultRetrieve, error)
}

func (m *mockDA) Submit(ctx context.Context, data [][]byte, gasPrice float64, namespace []byte, options []byte) da.ResultSubmit {
Expand All @@ -29,6 +30,13 @@ func (m *mockDA) Submit(ctx context.Context, data [][]byte, gasPrice float64, na
return da.ResultSubmit{BaseResult: da.BaseResult{Code: da.StatusSuccess, Height: 1}}
}

// Subscribe delegates to subscribeFunc when one is configured; otherwise it
// reports a nil channel with no error.
func (m *mockDA) Subscribe(ctx context.Context, namespace []byte) (<-chan da.ResultRetrieve, error) {
	if m.subscribeFunc == nil {
		return nil, nil
	}
	return m.subscribeFunc(ctx, namespace)
}

// Retrieve is a stub that always yields the zero-value retrieval result.
func (m *mockDA) Retrieve(ctx context.Context, height uint64, namespace []byte) da.ResultRetrieve {
	var empty da.ResultRetrieve
	return empty
}
Expand Down
58 changes: 58 additions & 0 deletions block/internal/da/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,3 +442,61 @@ func (c *client) Validate(ctx context.Context, ids []datypes.ID, proofs []datype

return results, nil
}

// Subscribe subscribes to the DA layer for new blobs at the specified namespace.
// It bridges the jsonrpc subscription to the ResultRetrieve channel for internal consumption.
func (c *client) Subscribe(ctx context.Context, namespace []byte) (<-chan datypes.ResultRetrieve, error) {
ns, err := share.NewNamespaceFromBytes(namespace)
if err != nil {
return nil, fmt.Errorf("invalid namespace: %w", err)
}

subCh, err := c.blobAPI.Internal.Subscribe(ctx, ns)
if err != nil {
return nil, fmt.Errorf("failed to subscribe to blob namespace: %w", err)
}

outCh := make(chan datypes.ResultRetrieve, 10)

// Start a goroutine to bridge events
go func() {
defer close(outCh)

for {
select {
case <-ctx.Done():
return
case resp, ok := <-subCh:
if !ok {
return
}
if resp == nil {
continue
}

// Convert Blobs to ResultRetrieve
ids := make([]datypes.ID, len(resp.Blobs))
data := make([]datypes.Blob, len(resp.Blobs))
for i, b := range resp.Blobs {
ids[i] = blobrpc.MakeID(resp.Height, b.Commitment)
data[i] = b.Data()
}

// Ideally we would get the block timestamp here but that would require an extra RPC call.
// For subscription feed, we might use local time or 0 as it's mostly for triggering catchup.
// Using 0 or Now() is a trade-off. Let's use Now() for liveness.
outCh <- datypes.ResultRetrieve{
BaseResult: datypes.BaseResult{
Code: datypes.StatusSuccess,
IDs: ids,
Height: resp.Height,
//Timestamp: // TODO: set proper value
},
Data: data,
}
Comment on lines +488 to +496
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

This goroutine has a potential to leak. The send operation on outCh at line 488 is blocking. If the consumer of outCh is not reading from it, this goroutine will block indefinitely on the send, even if the context is canceled. The select statement at line 466 will be stuck waiting for outCh to be ready for a send and won't be able to process the <-ctx.Done() case.

To fix this, you should use a select statement for sending to outCh to also handle context cancellation.

select {
				case outCh <- datypes.ResultRetrieve{
					BaseResult: datypes.BaseResult{
						Code:   datypes.StatusSuccess,
						IDs:    ids,
						Height: resp.Height,
						//Timestamp: // TODO: set proper value
					},
					Data: data,
				}:
				case <-ctx.Done():
					return
				}

}
}
}()

return outCh, nil
}
3 changes: 3 additions & 0 deletions block/internal/da/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ type Client interface {
GetDataNamespace() []byte
GetForcedInclusionNamespace() []byte
HasForcedInclusionNamespace() bool

// Subscribe subscribes to the DA layer for new blobs at the specified namespace.
Subscribe(ctx context.Context, namespace []byte) (<-chan datypes.ResultRetrieve, error)
}

// Verifier defines the interface for DA proof verification operations.
Expand Down
90 changes: 80 additions & 10 deletions block/internal/syncing/da_retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"context"
"errors"
"fmt"
"sort"
"sync"

"github.com/rs/zerolog"
"google.golang.org/protobuf/proto"
Expand All @@ -21,6 +23,7 @@ import (
// DARetriever defines the interface for retrieving events from the DA layer
type DARetriever interface {
	// RetrieveFromDA fetches the blobs posted at the given DA height and
	// returns the decoded height events.
	RetrieveFromDA(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error)
	// Subscribe starts a background feed of decoded DA events into ch,
	// returning once the underlying DA subscriptions are established.
	// The implementation closes ch when the feed goroutine exits.
	Subscribe(ctx context.Context, ch chan common.DAHeightEvent) error
}

// daRetriever handles DA retrieval operations for syncing
Expand All @@ -34,6 +37,7 @@ type daRetriever struct {
// on restart, will be refetch as da height is updated by syncer
pendingHeaders map[uint64]*types.SignedHeader
pendingData map[uint64]*types.Data
mu sync.Mutex

// strictMode indicates if the node has seen a valid DAHeaderEnvelope
// and should now reject all legacy/unsigned headers.
Expand Down Expand Up @@ -75,6 +79,70 @@ func (r *daRetriever) RetrieveFromDA(ctx context.Context, daHeight uint64) ([]co
return r.processBlobs(ctx, blobsResp.Data, daHeight), nil
}

// Subscribe subscribes to specific DA namespace
func (r *daRetriever) Subscribe(ctx context.Context, outCh chan common.DAHeightEvent) error {
subChHeader, err := r.client.Subscribe(ctx, r.client.GetHeaderNamespace())
if err != nil {
return fmt.Errorf("subscribe to headers: %w", err)
}

var subChData <-chan datypes.ResultRetrieve
if !bytes.Equal(r.client.GetHeaderNamespace(), r.client.GetDataNamespace()) {
var err error
subChData, err = r.client.Subscribe(ctx, r.client.GetDataNamespace())
if err != nil {
return fmt.Errorf("subscribe to data: %w", err)
}
}

go func() {
defer close(outCh)
for {
var blobs [][]byte
var height uint64
var errCode datypes.StatusCode

select {
case <-ctx.Done():
return
case res, ok := <-subChHeader:
if !ok {
return
}
blobs = res.Data
height = res.Height
errCode = res.Code
case res, ok := <-subChData:
if subChData == nil {
continue
}
Comment on lines +116 to +118
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This check if subChData == nil is inside a case that receives from subChData. A receive on a nil channel blocks forever, so this case will never be selected if subChData is nil. This makes the check effectively dead code and it can be removed to improve clarity.

if !ok {
return
}
blobs = res.Data
height = res.Height
errCode = res.Code
}

if errCode != datypes.StatusSuccess {
r.logger.Error().Uint64("code", uint64(errCode)).Msg("subscription error")
continue
}

events := r.processBlobs(ctx, blobs, height)
for _, ev := range events {
select {
case <-ctx.Done():
return
case outCh <- ev:
}
}
}
}()

return nil
}

// fetchBlobs retrieves blobs from both header and data namespaces
func (r *daRetriever) fetchBlobs(ctx context.Context, daHeight uint64) (datypes.ResultRetrieve, error) {
// Retrieve from both namespaces using the DA client
Expand Down Expand Up @@ -150,6 +218,9 @@ func (r *daRetriever) validateBlobResponse(res datypes.ResultRetrieve, daHeight

// processBlobs processes retrieved blobs to extract headers and data and returns height events
func (r *daRetriever) processBlobs(ctx context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent {
r.mu.Lock()
defer r.mu.Unlock()

// Decode all blobs
for _, bz := range blobs {
if len(bz) == 0 {
Expand Down Expand Up @@ -212,18 +283,17 @@ func (r *daRetriever) processBlobs(ctx context.Context, blobs [][]byte, daHeight
events = append(events, event)
}

// Sort events by height to match execution order
sort.Slice(events, func(i, j int) bool {
if events[i].DaHeight != events[j].DaHeight {
return events[i].DaHeight < events[j].DaHeight
}
return events[i].Header.Height() < events[j].Header.Height()
})

if len(events) > 0 {
startHeight := events[0].Header.Height()
endHeight := events[0].Header.Height()
for _, event := range events {
h := event.Header.Height()
if h < startHeight {
startHeight = h
}
if h > endHeight {
endHeight = h
}
}
endHeight := events[len(events)-1].Header.Height()
r.logger.Info().Uint64("da_height", daHeight).Uint64("start_height", startHeight).Uint64("end_height", endHeight).Msg("processed blocks from DA")
}

Expand Down
57 changes: 57 additions & 0 deletions block/internal/syncing/da_retriever_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 29 additions & 0 deletions block/internal/syncing/da_retriever_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,3 +374,32 @@ func Test_isEmptyDataExpected(t *testing.T) {
h.DataHash = common.DataHashForEmptyTxs
assert.True(t, isEmptyDataExpected(h))
}

// TestDARetriever_ProcessBlobs_Sorting verifies that processBlobs orders the
// returned events by block height when all events share one DA height.
func TestDARetriever_ProcessBlobs_Sorting(t *testing.T) {
	addr, pub, signer := buildSyncTestSigner(t)
	gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr}
	r := newTestDARetriever(t, nil, config.DefaultConfig(), gen)

	// Build two blocks, at heights 10 and 5. DaHeight is identical for every
	// event within a single processBlobs call, so this exercises the secondary
	// sort key (block height).
	binDataHigh, sdHigh := makeSignedDataBytes(t, gen.ChainID, 10, addr, pub, signer, 1)
	binDataLow, sdLow := makeSignedDataBytes(t, gen.ChainID, 5, addr, pub, signer, 1)

	binHdrHigh, _ := makeSignedHeaderBytes(t, gen.ChainID, 10, addr, pub, signer, nil, &sdHigh.Data, nil)
	binHdrLow, _ := makeSignedHeaderBytes(t, gen.ChainID, 5, addr, pub, signer, nil, &sdLow.Data, nil)

	const daHeight uint64 = 100
	// Feed the blobs in mixed order so correct ordering must come from sorting.
	events := r.processBlobs(context.Background(), [][]byte{binHdrHigh, binDataHigh, binHdrLow, binDataLow}, daHeight)

	require.Len(t, events, 2)
	assert.Equal(t, uint64(5), events[0].Header.Height(), "Events should be sorted by block height asc")
	assert.Equal(t, uint64(10), events[1].Header.Height())

	assert.Equal(t, daHeight, events[0].DaHeight)
	assert.Equal(t, daHeight, events[1].DaHeight)
}
Loading
Loading