Conversation

@alpe (Contributor) commented Jan 8, 2026

🚧 WIP - early version
See #2003

Overview

claude bot (Contributor) commented Jan 8, 2026

Claude finished @alpe's task in 2m 38s


Comprehensive Code Review Complete

I've completed a thorough review of the DA event subscription feature. Here are my findings organized by focus area:


1. Code Quality & Architecture

Strengths

  • Clean interface design: The Subscribe method addition to the Client interface (block/internal/da/interface.go:27) is well-designed and follows Go idioms
  • Proper abstraction: The DA retriever correctly separates concerns between subscription management and blob processing
  • Good separation of concerns: The syncer's followDA() and fetchDAUntilCaughtUp() pattern cleanly separates catch-up vs. live-follow modes

⚠️ Issues & Recommendations

CRITICAL - Goroutine Leak in Subscribe (block/internal/da/client.go:446-502)
The Subscribe method spawns a goroutine that bridges events but has a potential goroutine leak:

// Start a goroutine to bridge events
go func() {
    defer close(outCh)

    for {
        select {
        case <-ctx.Done():
            return  // ✅ Handles context cancellation
        case resp, ok := <-subCh:
            if !ok {
                return  // ✅ Handles channel close
            }
            // ...

Problem: If subCh never closes and ctx is never cancelled, the goroutine will leak. The underlying c.blobAPI.Internal.Subscribe may not guarantee channel closure.

Recommendation: Add documentation clarifying ownership semantics, or consider adding a timeout/heartbeat mechanism.
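
For the latter, a minimal sketch of a heartbeat guard in the bridging goroutine (the interval and the give-up behavior are assumptions, not existing code):

const heartbeatInterval = 5 * time.Minute // assumed value

for {
    select {
    case <-ctx.Done():
        return
    case <-time.After(heartbeatInterval):
        // no events for a full interval: assume the upstream subscription is
        // dead and return so the caller can resubscribe
        return
    case resp, ok := <-subCh:
        if !ok {
            return
        }
        // ... bridge resp to outCh as before
    }
}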

Missing Timestamp in Subscription Events (block/internal/da/client.go:485-496)

outCh <- datypes.ResultRetrieve{
    BaseResult: datypes.BaseResult{
        Code:   datypes.StatusSuccess,
        IDs:    ids,
        Height: resp.Height,
        //Timestamp: // TODO: set proper value  ⚠️ 
    },
    Data: data,
}

Issue: The TODO comment at line 493 indicates missing timestamp handling. This could cause:

  • Inconsistent timestamp handling between subscription and polling paths
  • Issues with based sequencers (as noted in comment at line 228)

Recommendation: Either:

  1. Fetch timestamps lazily when needed
  2. Document that subscription timestamps are intentionally omitted for performance
  3. Use a sentinel value (like time.Time{}) and document the behavior

Gap Detection Logic Complexity (block/internal/syncing/syncer.go:350-404)
The followDA() method has complex gap detection logic that could benefit from clearer documentation:

// If we receive an event for a future height (gap), break to trigger catchup
if event.DaHeight > nextExpectedHeight {
    s.logger.Info().
        Uint64("event_da_height", event.DaHeight).
        Uint64("expected_da_height", nextExpectedHeight).
        Msg("gap detected in DA stream, switching to catchup")
    return nil  // This returns to daWorkerLoop which triggers catchup
}

Recommendation: Add a state diagram or flowchart in comments explaining the catchup→follow→gap→catchup cycle.
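
For instance, a comment along these lines near daWorkerLoop would capture the cycle (wording is illustrative, not taken from the code):

// DA sync runs as a small state machine:
//
//   catchup: fetchDAUntilCaughtUp polls historical DA heights until the node
//            reaches the DA head, then hands over to follow mode.
//   follow:  followDA consumes subscription events for contiguous heights.
//   gap:     if event.DaHeight > nextExpectedHeight, followDA returns to
//            daWorkerLoop, which restarts catchup and the cycle repeats.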


2. Error Handling & Resilience

Strengths

  • Excellent error wrapping and context in daRetriever.Subscribe (block/internal/syncing/da_retriever.go:82-144)
  • Proper handling of context cancellation throughout
  • Good use of errors.Is() for error checking

⚠️ Issues & Recommendations

Inconsistent Error Handling in DARetriever.Subscribe (block/internal/syncing/da_retriever.go:127-130)

if errCode != datypes.StatusSuccess {
    r.logger.Error().Uint64("code", uint64(errCode)).Msg("subscription error")
    continue  // ⚠️ Silently discards error
}

Issue: The subscription goroutine logs errors but continues processing. This could:

  • Hide persistent DA issues
  • Make debugging difficult
  • Lead to silent data loss

Recommendation: Consider:

  1. Accumulating error counts and failing after N consecutive errors
  2. Exposing subscription health via metrics
  3. Returning errors on the outCh as a special event type
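
A minimal sketch of option 1 (the threshold and the decision to return an error are assumptions, not existing code):

const maxConsecutiveErrs = 10 // assumed threshold

consecutive := 0
for {
    // ... receive errCode from the subscription as before ...
    if errCode != datypes.StatusSuccess {
        consecutive++
        r.logger.Error().Uint64("code", uint64(errCode)).Int("consecutive", consecutive).Msg("subscription error")
        if consecutive >= maxConsecutiveErrs {
            return fmt.Errorf("giving up after %d consecutive subscription errors", consecutive)
        }
        continue
    }
    consecutive = 0
    // ... forward the event as before ...
}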

Missing Backpressure Handling (block/internal/syncing/da_retriever.go:134-139)

select {
case <-ctx.Done():
    return
case outCh <- ev:  // ⚠️ Blocking send with no timeout
}

Issue: If the consumer is slow, this will block the subscription goroutine indefinitely, preventing processing of new events.

Recommendation: Add a timeout or use a non-blocking send with buffering:

select {
case <-ctx.Done():
    return
case outCh <- ev:
case <-time.After(30 * time.Second):
    r.logger.Error().Msg("consumer too slow, dropping event")
    r.metrics.SubscriptionDroppedEvents.Inc()
}

Panic Risk in processBlobs (block/internal/syncing/da_retriever.go:287-292)

sort.Slice(events, func(i, j int) bool {
    if events[i].DaHeight != events[j].DaHeight {
        return events[i].DaHeight < events[j].DaHeight
    }
    return events[i].Header.Height() < events[j].Header.Height()  // ⚠️ No nil check
})

Issue: If events[i].Header is nil, this will panic.

Recommendation: Add defensive nil checks or document that headers are guaranteed non-nil at this point.
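
If nil headers can genuinely reach this point, a defensive comparator could look like the sketch below; otherwise documenting the invariant is the cheaper fix:

sort.Slice(events, func(i, j int) bool {
    if events[i].DaHeight != events[j].DaHeight {
        return events[i].DaHeight < events[j].DaHeight
    }
    var hi, hj uint64
    if events[i].Header != nil {
        hi = events[i].Header.Height()
    }
    if events[j].Header != nil {
        hj = events[j].Header.Height()
    }
    return hi < hj
})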


3. Security

Strengths

  • Proper validation of envelope signatures (block/internal/syncing/da_retriever.go:328-343)
  • Strict mode enforcement prevents downgrade attacks
  • Good proposer verification

⚠️ Issues & Recommendations

Strict Mode State is Transient (block/internal/syncing/da_retriever.go:44)

// strictMode indicates if the node has seen a valid DAHeaderEnvelope
// and should now reject all legacy/unsigned headers.
strictMode bool  // ⚠️ Not persisted

Issue: On restart, strictMode resets to false, allowing potential replay of old unsigned blocks.

Recommendation:

  1. Persist strict mode state to disk/cache
  2. Or document that this is intentional for testing/rollback
  3. Add a genesis flag to force strict mode from height 0
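
A sketch of option 1, assuming some persistent key/value store is already reachable from the retriever (the store field, key, and helper are hypothetical):

const strictModeKey = "da_retriever/strict_mode" // hypothetical key

// hypothetical helper: flip the in-memory flag and record it so a restart
// cannot silently re-accept unsigned headers
func (r *daRetriever) enableStrictMode() {
    r.strictMode = true
    if err := r.store.Put(strictModeKey, []byte{1}); err != nil {
        r.logger.Warn().Err(err).Msg("failed to persist strict mode flag")
    }
}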

Potential DOS via Duplicate Blob Submission (block/internal/syncing/da_retriever.go:231-236)

if _, ok := r.pendingHeaders[header.Height()]; ok {
    // a (malicious) node may have re-published valid header to another da height (should never happen)
    // we can already discard it, only the first one is valid
    r.logger.Debug().Uint64("height", header.Height()).Uint64("da_height", daHeight).Msg("header blob already exists for height, discarding")
    continue
}

Issue: While the code correctly discards duplicates, a malicious DA publisher could spam many duplicate blobs, causing:

  • Excessive memory usage in pendingHeaders
  • CPU waste on signature verification

Recommendation: Add rate limiting or max pending entries per height.


4. Performance & Resource Efficiency

Strengths

  • Buffered channels (capacity 10-100) reduce blocking
  • Efficient use of sync.Map for concurrent access
  • Good use of atomic operations

⚠️ Issues & Recommendations

Unbounded Memory Growth in Pending Maps (block/internal/syncing/da_retriever.go:38-40)

// transient cache, only full event need to be passed to the syncer
// on restart, will be refetch as da height is updated by syncer
pendingHeaders map[uint64]*types.SignedHeader  // ⚠️ Unbounded
pendingData    map[uint64]*types.Data          // ⚠️ Unbounded

Issue: If headers and data arrive from very different DA heights (split namespace scenario), these maps can grow unbounded.

Recommendation:

  1. Add a max size limit (e.g., 1000 entries)
  2. Implement LRU eviction
  3. Add monitoring for map size
  4. Document expected max gap between header/data heights
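
A sketch of option 1, applied where headers are inserted into the map (the cap is an assumed value; a discarded blob would simply be re-fetched by the catchup path):

const maxPendingEntries = 1000 // assumed cap

if len(r.pendingHeaders) >= maxPendingEntries {
    r.logger.Warn().Uint64("height", header.Height()).Msg("pending header cache full, discarding blob; it will be re-fetched during catchup")
    continue
}
r.pendingHeaders[header.Height()] = header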

Channel Buffer Size Discrepancy (block/internal/da/client.go:459 vs block/internal/syncing/syncer.go:356)

// In client.Subscribe:
outCh := make(chan datypes.ResultRetrieve, 10)  // Buffer: 10

// In syncer.followDA:
subCh := make(chan common.DAHeightEvent, 1)  // Buffer: 1

Issue: The small buffer (1) in syncer may cause backpressure quickly.

Recommendation: Document buffer sizing rationale or make it configurable.

Inefficient Namespace Comparison (block/internal/syncing/da_retriever.go:90, 152)

if !bytes.Equal(r.client.GetHeaderNamespace(), r.client.GetDataNamespace()) {
    // ... subscribe to both
}

Issue: This comparison happens on every call. Namespace bytes should be cached.

Recommendation: Cache the comparison result in daRetriever struct during initialization.
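
For example, the comparison could be done once when the retriever is constructed (the field name is illustrative):

r := &daRetriever{
    // ... existing fields ...
    splitNamespaces: !bytes.Equal(client.GetHeaderNamespace(), client.GetDataNamespace()),
}

// later, e.g. in Subscribe:
if r.splitNamespaces {
    // ... subscribe to both namespaces ...
}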


5. Testing & Reliability

Strengths

  • Excellent test coverage for subscription scenarios (syncer_subscribe_test.go)
  • Good use of mocks for testing
  • Tests cover catchup→follow transitions and gap detection

⚠️ Issues & Recommendations

Test Uses Fixed Timeouts (block/internal/syncing/syncer_subscribe_test.go:78-80)

select {
case e := <-syncer.heightInCh:
    assert.Equal(t, uint64(1), e.DaHeight)
case <-time.After(1 * time.Second):  // ⚠️ May be flaky on slow CI
    t.Fatal("timed out waiting for catchup event")
}

Issue: Fixed 1-second timeouts can cause flaky tests on slow CI systems.

Recommendation: Use a test helper or increase timeout to 5-10 seconds, or better yet, use a synchronization primitive instead of time-based polling.
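
One way to replace the fixed timeout with CI-tolerant polling (a sketch using testify's require.Eventually; the 10-second ceiling is arbitrary):

require.Eventually(t, func() bool {
    select {
    case e := <-syncer.heightInCh:
        return e.DaHeight == 1
    default:
        return false
    }
}, 10*time.Second, 10*time.Millisecond, "timed out waiting for catchup event")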

Missing Edge Case Tests
Missing test scenarios:

  1. Subscription channel closes unexpectedly
  2. Very large gap in DA heights (>1000)
  3. Concurrent header/data arrival for same height
  4. Context cancellation during catchup
  5. Memory pressure from unbounded pending maps

Recommendation: Add tests for these scenarios.
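
For example, scenario 1 could reuse the mock-capture pattern suggested in the review comment further down (a sketch; the expected fallback behavior is an assumption to check against the intended design):

mockDARetriever.On("Subscribe", mock.Anything, mock.Anything).
    Run(func(args mock.Arguments) {
        ch := args.Get(1).(chan common.DAHeightEvent)
        close(ch) // upstream subscription ends unexpectedly
    }).
    Return(nil).Once()

// the syncer is then expected to fall back to catchup (or resubscribe) rather than spin or panic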

Mock Interface Mismatch (block/internal/syncing/syncer_subscribe_test.go:68)

mockDARetriever.On("Subscribe", mock.Anything).Return((<-chan common.DAHeightEvent)(subCh), nil)

The actual interface (block/internal/syncing/da_retriever.go:26) is:

Subscribe(ctx context.Context, ch chan common.DAHeightEvent) error

Issue: The test expects Subscribe to return a channel, but the actual interface takes a channel as an argument and returns only an error. The mock is either outdated or reflects interface confusion, which means these tests may not be validating the real behavior.

Recommendation: Fix the mock interface to match the actual implementation.


6. Documentation & Developer Experience

Strengths

  • Good inline comments explaining complex logic
  • Clear TODO comments marking incomplete work
  • Helpful log messages with context

⚠️ Issues & Recommendations

Missing Package-Level Documentation
Neither block/internal/da/client.go nor block/internal/syncing/da_retriever.go has package-level documentation.

Recommendation: Add package comments explaining:

  • The subscription model
  • Catchup vs. follow modes
  • Split namespace handling
  • Gap detection strategy
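
An illustrative package comment covering these points (wording is a suggestion, not existing documentation):

// Package syncing retrieves headers and data from the DA layer and feeds them
// to the block syncer. Retrieval runs in two modes: a catchup mode that polls
// historical DA heights until the node reaches the DA head, and a follow mode
// that subscribes to new DA heights as they are published. Header and data may
// live in separate namespaces; blobs from both are matched by block height.
// Any gap detected in the subscription stream drops the syncer back to catchup.
package syncing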

Incomplete TODOs Should Be Tracked

  • block/internal/da/client.go:493 - Missing timestamp in subscription
  • block/internal/da/client.go:228, 266 - Timestamp retry logic needed

Recommendation: Create GitHub issues for these TODOs or resolve them before merging.

Confusing Variable Name (block/internal/syncing/syncer.go:356)

subCh := make(chan common.DAHeightEvent, 1)

This is the output channel for subscription events, but the name suggests it's a subscription channel (input).

Recommendation: Rename to daEventCh or subscriptionOutCh for clarity.


7. Critical Issues Summary

Must Fix Before Merge:

  1. Mock interface mismatch in tests (syncer_subscribe_test.go:68) - tests may not be validating the actual behavior
  2. Missing timestamp in subscription events - could break based sequencers
  3. Unbounded memory growth in pendingHeaders/pendingData maps
  4. Potential goroutine leak in Subscribe implementation

Should Fix Before Merge:

  1. Strict mode state not persisted - security regression on restart
  2. Error handling in subscription - silent error discarding
  3. Missing backpressure handling - can block DA processing
  4. Test flakiness - fixed timeouts may cause CI failures

Nice to Have:

  1. Better documentation of catchup/follow state machine
  2. Gap detection complexity could be simplified
  3. Package-level documentation
  4. Additional edge case test coverage

8. Specific File Reviews

block/internal/da/client.go:446-502

The Subscribe implementation is well-structured but needs:

  • Goroutine lifecycle documentation
  • Timestamp handling (line 493)
  • Metrics for subscription health

block/internal/syncing/syncer.go:350-404

The followDA method is complex. Consider:

  • Breaking into smaller functions
  • Adding state machine diagram
  • More granular error handling

block/internal/syncing/da_retriever.go:82-144

Good implementation but needs:

  • Bounds checking on pending maps
  • Better error propagation
  • Backpressure handling

9. Recommendations

Before Merging:

  1. Fix mock interface mismatch in tests
  2. Resolve or document timestamp TODO
  3. Add bounds to pending maps
  4. Document goroutine lifecycle in Subscribe
  5. Add metrics for subscription health
  6. Run make lint and make test (I cannot run these - please verify)

For Future Work:

  1. Add subscription health monitoring
  2. Implement graceful degradation if subscription fails
  3. Consider adding subscription reconnection logic
  4. Add more comprehensive integration tests
  5. Document the expected DA event flow in CLAUDE.md

10. Overall Assessment

This is a solid implementation of DA event subscription with good architecture and test coverage. The main concerns are:

  • Production Readiness: 6/10 - needs fixes for goroutine lifecycle, memory bounds, and error handling
  • Code Quality: 8/10 - clean, idiomatic Go with good separation of concerns
  • Test Coverage: 7/10 - good scenarios covered but missing edge cases
  • Documentation: 6/10 - inline comments good, but missing high-level docs

Recommendation: Address the "Must Fix" items before merging. The "Should Fix" items can be follow-up PRs if needed.


Note: I was unable to run linters or tests due to tool restrictions. Please run make lint and make test locally to verify no regressions.

gemini-code-assist bot (Contributor) commented

Summary of Changes

Hello @alpe, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly upgrades the system's interaction with the Data Availability (DA) layer by moving from a polling-based approach to a more efficient, subscription-driven model. The core synchronization logic has been enhanced to dynamically manage data retrieval, ensuring that the system can both efficiently catch up on past events and stay current with real-time updates from the DA layer. This change aims to improve the responsiveness and efficiency of block synchronization.

Highlights

  • DA Layer Subscription: Introduced a new Subscribe method to the da.Client interface and its implementation, enabling real-time subscriptions to Data Availability (DA) layer events for specified namespaces. This bridges JSON-RPC subscriptions to an internal ResultRetrieve channel.
  • Syncer Refactoring for DA Events: The Syncer's daWorkerLoop has been refactored to support both a 'catch-up' mode for historical data retrieval and a 'follow' mode that utilizes the new DA layer subscriptions for real-time updates. The system intelligently switches between these modes, falling back to catch-up if a gap is detected in the subscription stream.
  • DARetriever Enhancements: The DARetriever interface and its concrete implementation now include a Subscribe method. This method subscribes to both header and data namespaces on the DA layer and processes incoming blobs, sorting them by DA height and then by block height to ensure correct event ordering.
  • Concurrency Safety and Event Sorting: A sync.Mutex has been added to daRetriever to protect shared state (pendingHeaders and pendingData) during concurrent access. Additionally, the processBlobs function now explicitly sorts retrieved events to maintain a consistent processing order.
  • Comprehensive Testing for Subscription Logic: New test cases have been added in syncer_subscribe_test.go to validate the Syncer's new subscription-based logic, covering scenarios such as the transition from catch-up to follow mode, gap detection, and handling of events from split namespaces.

github-actions bot (Contributor) commented Jan 8, 2026

The latest Buf updates on your PR. Results from workflow CI / buf-check (pull_request).

Build: ✅ passed · Format: ⏩ skipped · Lint: ✅ passed · Breaking: ✅ passed · Updated (UTC): Jan 8, 2026, 4:43 PM

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request introduces a subscription-based mechanism for receiving events from the Data Availability (DA) layer, which is a great improvement for efficiency over polling. The implementation introduces a 'follow mode' in the syncer that uses these subscriptions, with a fallback to the existing 'catch-up mode' for handling gaps or failures. The changes are well-structured and include new tests. I've identified a critical issue in the new tests that needs to be addressed, a potential goroutine leak, and a minor point of confusion in the code. Overall, this is a solid step forward.

Comment on lines +67 to +68
subCh := make(chan common.DAHeightEvent, 10)
mockDARetriever.On("Subscribe", mock.Anything).Return((<-chan common.DAHeightEvent)(subCh), nil)

critical

The mock setup for daRetriever.Subscribe is incorrect, which will cause these tests to panic. The DARetriever.Subscribe method signature is func(ctx context.Context, ch chan common.DAHeightEvent) error.

Your mock setup has two issues:

  1. mockDARetriever.On("Subscribe", mock.Anything) only provides one argument, but the method expects two (ctx and ch).
  2. .Return((<-chan common.DAHeightEvent)(subCh), nil) provides two return values (a channel and an error), but the method only returns a single error.

To fix this, you need to adjust the mock to match the interface. You should also use the mock's Run method to capture the channel passed by the syncer so you can send test events on it.

Here is an example of how you could fix TestSyncer_DAWorker_CatchupThenFollow (the same logic applies to the other tests in this file):

	// 2. Subscribe
	// Use a WaitGroup to reliably capture the channel from the mock's Run function.
	var subWg sync.WaitGroup
	subWg.Add(1)
	var capturedCh chan common.DAHeightEvent
	mockDARetriever.On("Subscribe", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
		capturedCh = args.Get(1).(chan common.DAHeightEvent)
		subWg.Done()
	}).Return(nil).Once()

	// Run daWorkerLoop in goroutine
	syncer.wg.Add(1)
	go syncer.daWorkerLoop()

	// ... (rest of catchup verification)

	// Wait for Subscribe to be called and the channel to be captured
	subWg.Wait()
	require.NotNil(t, capturedCh, "channel should have been captured by mock")

	// Now we should be in Follow mode (Subscribe called).
	// Feed event 2 to subscription
	capturedCh <- evt2

	// ... (rest of follow verification)

This is a critical issue as it breaks all the new tests in this file.

Comment on lines +488 to +496
outCh <- datypes.ResultRetrieve{
    BaseResult: datypes.BaseResult{
        Code:   datypes.StatusSuccess,
        IDs:    ids,
        Height: resp.Height,
        //Timestamp: // TODO: set proper value
    },
    Data: data,
}

high

This goroutine has a potential to leak. The send operation on outCh at line 488 is blocking. If the consumer of outCh is not reading from it, this goroutine will block indefinitely on the send, even if the context is canceled. The select statement at line 466 will be stuck waiting for outCh to be ready for a send and won't be able to process the <-ctx.Done() case.

To fix this, you should use a select statement for sending to outCh to also handle context cancellation.

select {
case outCh <- datypes.ResultRetrieve{
    BaseResult: datypes.BaseResult{
        Code:   datypes.StatusSuccess,
        IDs:    ids,
        Height: resp.Height,
        //Timestamp: // TODO: set proper value
    },
    Data: data,
}:
case <-ctx.Done():
    return
}

Comment on lines +116 to +118
if subChData == nil {
    continue
}

medium

This check if subChData == nil is inside a case that receives from subChData. A receive on a nil channel blocks forever, so this case will never be selected if subChData is nil. This makes the check effectively dead code and it can be removed to improve clarity.
