[WIP] feat: Subscribe to da events #2962
base: main
Claude finished @alpe's task in 2m 38s
Comprehensive Code Review Complete
I've completed a thorough review of the DA event subscription feature. Here are my findings organized by focus area:
1. Code Quality & Architecture
✅ Strengths
Summary of Changes
Hello @alpe, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
This pull request significantly upgrades the system's interaction with the Data Availability (DA) layer by moving from a polling-based approach to a more efficient, subscription-driven model. The core synchronization logic has been enhanced to dynamically manage data retrieval, ensuring that the system can both efficiently catch up on past events and stay current with real-time updates from the DA layer. This change aims to improve the responsiveness and efficiency of block synchronization.
Highlights
The latest Buf updates on your PR. Results from workflow CI / buf-check (pull_request).
Code Review
This pull request introduces a subscription-based mechanism for receiving events from the Data Availability (DA) layer, which is a great improvement for efficiency over polling. The implementation introduces a 'follow mode' in the syncer that uses these subscriptions, with a fallback to the existing 'catch-up mode' for handling gaps or failures. The changes are well-structured and include new tests. I've identified a critical issue in the new tests that needs to be addressed, a potential goroutine leak, and a minor point of confusion in the code. Overall, this is a solid step forward.
subCh := make(chan common.DAHeightEvent, 10)
mockDARetriever.On("Subscribe", mock.Anything).Return((<-chan common.DAHeightEvent)(subCh), nil)
The mock setup for daRetriever.Subscribe is incorrect, which will cause these tests to panic. The DARetriever.Subscribe method signature is func(ctx context.Context, ch chan common.DAHeightEvent) error.
Your mock setup has two issues:
1. mockDARetriever.On("Subscribe", mock.Anything) only provides one argument, but the method expects two (ctx and ch).
2. .Return((<-chan common.DAHeightEvent)(subCh), nil) provides two return values (a channel and an error), but the method only returns a single error.
To fix this, you need to adjust the mock to match the interface. You should also use the mock's Run method to capture the channel passed by the syncer so you can send test events on it.
Here is an example of how you could fix TestSyncer_DAWorker_CatchupThenFollow (the same logic applies to the other tests in this file):
// 2. Subscribe
// Use a WaitGroup to reliably capture the channel from the mock's Run function.
var subWg sync.WaitGroup
subWg.Add(1)
var capturedCh chan common.DAHeightEvent
mockDARetriever.On("Subscribe", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
	capturedCh = args.Get(1).(chan common.DAHeightEvent)
	subWg.Done()
}).Return(nil).Once()
// Run daWorkerLoop in goroutine
syncer.wg.Add(1)
go syncer.daWorkerLoop()
// ... (rest of catchup verification)
// Wait for Subscribe to be called and the channel to be captured
subWg.Wait()
require.NotNil(t, capturedCh, "channel should have been captured by mock")
// Now we should be in Follow mode (Subscribe called).
// Feed event 2 to subscription
capturedCh <- evt2
// ... (rest of follow verification)
This is a critical issue as it breaks all the new tests in this file.
outCh <- datypes.ResultRetrieve{
	BaseResult: datypes.BaseResult{
		Code:   datypes.StatusSuccess,
		IDs:    ids,
		Height: resp.Height,
		//Timestamp: // TODO: set proper value
	},
	Data: data,
}
This goroutine can leak. The send on outCh at line 488 is blocking: if the consumer of outCh is not reading from it, the goroutine blocks indefinitely on the send, even if the context is canceled. While it is blocked there, the select statement at line 466 cannot run again to process the <-ctx.Done() case.
To fix this, you should use a select statement for sending to outCh to also handle context cancellation.
select {
case outCh <- datypes.ResultRetrieve{
	BaseResult: datypes.BaseResult{
		Code:   datypes.StatusSuccess,
		IDs:    ids,
		Height: resp.Height,
		//Timestamp: // TODO: set proper value
	},
	Data: data,
}:
case <-ctx.Done():
	return
}

if subChData == nil {
	continue
}
🚧 WIP - early version
See #2003
Overview