diff --git a/block/internal/common/metrics.go b/block/internal/common/metrics.go index 13a041943..cf806bd89 100644 --- a/block/internal/common/metrics.go +++ b/block/internal/common/metrics.go @@ -69,6 +69,11 @@ type Metrics struct { // Forced inclusion metrics ForcedInclusionTxsInGracePeriod metrics.Gauge // Number of forced inclusion txs currently in grace period ForcedInclusionTxsMalicious metrics.Counter // Total number of forced inclusion txs marked as malicious + + // Sync mode metrics + SyncMode metrics.Gauge // Current sync mode: 0=catchup, 1=follow + SubscribeErrors metrics.Counter // Number of subscription failures + ModeSwitches metrics.Counter // Number of catchup<->follow mode transitions } // PrometheusMetrics returns Metrics built using Prometheus client library @@ -201,6 +206,28 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { Help: "Total number of forced inclusion transactions marked as malicious (past grace boundary)", }, labels).With(labelsAndValues...) + // Sync mode metrics + m.SyncMode = prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "sync_mode", + Help: "Current sync mode: 0=catchup (polling), 1=follow (subscription)", + }, labels).With(labelsAndValues...) + + m.SubscribeErrors = prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "subscribe_errors_total", + Help: "Total number of DA subscription failures", + }, labels).With(labelsAndValues...) + + m.ModeSwitches = prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "mode_switches_total", + Help: "Total number of sync mode transitions between catchup and follow", + }, labels).With(labelsAndValues...) + // DA Submitter metrics m.DASubmitterPendingBlobs = prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ Namespace: namespace, @@ -269,6 +296,11 @@ func NopMetrics() *Metrics { // Forced inclusion metrics ForcedInclusionTxsInGracePeriod: discard.NewGauge(), ForcedInclusionTxsMalicious: discard.NewCounter(), + + // Sync mode metrics + SyncMode: discard.NewGauge(), + SubscribeErrors: discard.NewCounter(), + ModeSwitches: discard.NewCounter(), } // Initialize maps with no-op metrics diff --git a/block/internal/da/client.go b/block/internal/da/client.go index d2e1d626e..146ee7ccf 100644 --- a/block/internal/da/client.go +++ b/block/internal/da/client.go @@ -442,3 +442,27 @@ func (c *client) Validate(ctx context.Context, ids []datypes.ID, proofs []datype return results, nil } + +// Subscribe subscribes to blobs in the specified namespace. +// Returns a channel that receives subscription responses as new blobs are included. +func (c *client) Subscribe(ctx context.Context, namespace []byte) (<-chan *blobrpc.SubscriptionResponse, error) { + ns, err := share.NewNamespaceFromBytes(namespace) + if err != nil { + return nil, fmt.Errorf("invalid namespace: %w", err) + } + + return c.blobAPI.Subscribe(ctx, ns) +} + +// LocalHead returns the height of the locally synced DA head. +func (c *client) LocalHead(ctx context.Context) (uint64, error) { + headCtx, cancel := context.WithTimeout(ctx, c.defaultTimeout) + defer cancel() + + header, err := c.headerAPI.LocalHead(headCtx) + if err != nil { + return 0, fmt.Errorf("failed to get local head: %w", err) + } + + return header.Height, nil +} diff --git a/block/internal/da/interface.go b/block/internal/da/interface.go index 69c2d18f7..1d56009d4 100644 --- a/block/internal/da/interface.go +++ b/block/internal/da/interface.go @@ -3,6 +3,7 @@ package da import ( "context" + blobrpc "github.com/evstack/ev-node/pkg/da/jsonrpc" datypes "github.com/evstack/ev-node/pkg/da/types" ) @@ -22,6 +23,15 @@ type Client interface { GetDataNamespace() []byte GetForcedInclusionNamespace() []byte HasForcedInclusionNamespace() bool + + // Subscribe subscribes to blobs in the specified namespace. + // Returns a channel that receives subscription responses as new blobs are included. + // Used for follow mode to receive real-time blob notifications. + Subscribe(ctx context.Context, namespace []byte) (<-chan *blobrpc.SubscriptionResponse, error) + + // LocalHead returns the height of the locally synced DA head. + // Used to determine if the node is caught up with the DA layer. + LocalHead(ctx context.Context) (uint64, error) } // Verifier defines the interface for DA proof verification operations. diff --git a/block/internal/syncing/da_retriever.go b/block/internal/syncing/da_retriever.go index 1307b3968..576862495 100644 --- a/block/internal/syncing/da_retriever.go +++ b/block/internal/syncing/da_retriever.go @@ -21,6 +21,9 @@ import ( // DARetriever defines the interface for retrieving events from the DA layer type DARetriever interface { RetrieveFromDA(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error) + // ProcessBlobs processes raw blobs from subscription and returns height events. + // Used by follow mode to process real-time blob notifications. + ProcessBlobs(ctx context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent } // daRetriever handles DA retrieval operations for syncing @@ -72,7 +75,7 @@ func (r *daRetriever) RetrieveFromDA(ctx context.Context, daHeight uint64) ([]co } r.logger.Debug().Int("blobs", len(blobsResp.Data)).Uint64("da_height", daHeight).Msg("retrieved blob data") - return r.processBlobs(ctx, blobsResp.Data, daHeight), nil + return r.ProcessBlobs(ctx, blobsResp.Data, daHeight), nil } // fetchBlobs retrieves blobs from both header and data namespaces @@ -148,8 +151,9 @@ func (r *daRetriever) validateBlobResponse(res datypes.ResultRetrieve, daHeight } } -// processBlobs processes retrieved blobs to extract headers and data and returns height events -func (r *daRetriever) processBlobs(ctx context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent { +// ProcessBlobs processes retrieved blobs to extract headers and data and returns height events. +// This method implements the DARetriever interface and is used by both polling and subscription modes. +func (r *daRetriever) ProcessBlobs(ctx context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent { // Decode all blobs for _, bz := range blobs { if len(bz) == 0 { diff --git a/block/internal/syncing/da_retriever_mock.go b/block/internal/syncing/da_retriever_mock.go index d94dff4d6..10e08bbd9 100644 --- a/block/internal/syncing/da_retriever_mock.go +++ b/block/internal/syncing/da_retriever_mock.go @@ -38,6 +38,71 @@ func (_m *MockDARetriever) EXPECT() *MockDARetriever_Expecter { return &MockDARetriever_Expecter{mock: &_m.Mock} } +// ProcessBlobs provides a mock function for the type MockDARetriever +func (_mock *MockDARetriever) ProcessBlobs(ctx context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent { + ret := _mock.Called(ctx, blobs, daHeight) + + if len(ret) == 0 { + panic("no return value specified for ProcessBlobs") + } + + var r0 []common.DAHeightEvent + if returnFunc, ok := ret.Get(0).(func(context.Context, [][]byte, uint64) []common.DAHeightEvent); ok { + r0 = returnFunc(ctx, blobs, daHeight) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]common.DAHeightEvent) + } + } + return r0 +} + +// MockDARetriever_ProcessBlobs_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ProcessBlobs' +type MockDARetriever_ProcessBlobs_Call struct { + *mock.Call +} + +// ProcessBlobs is a helper method to define mock.On call +// - ctx context.Context +// - blobs [][]byte +// - daHeight uint64 +func (_e *MockDARetriever_Expecter) ProcessBlobs(ctx interface{}, blobs interface{}, daHeight interface{}) *MockDARetriever_ProcessBlobs_Call { + return &MockDARetriever_ProcessBlobs_Call{Call: _e.mock.On("ProcessBlobs", ctx, blobs, daHeight)} +} + +func (_c *MockDARetriever_ProcessBlobs_Call) Run(run func(ctx context.Context, blobs [][]byte, daHeight uint64)) *MockDARetriever_ProcessBlobs_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + var arg1 [][]byte + if args[1] != nil { + arg1 = args[1].([][]byte) + } + var arg2 uint64 + if args[2] != nil { + arg2 = args[2].(uint64) + } + run( + arg0, + arg1, + arg2, + ) + }) + return _c +} + +func (_c *MockDARetriever_ProcessBlobs_Call) Return(dAHeightEvents []common.DAHeightEvent) *MockDARetriever_ProcessBlobs_Call { + _c.Call.Return(dAHeightEvents) + return _c +} + +func (_c *MockDARetriever_ProcessBlobs_Call) RunAndReturn(run func(ctx context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent) *MockDARetriever_ProcessBlobs_Call { + _c.Call.Return(run) + return _c +} + // RetrieveFromDA provides a mock function for the type MockDARetriever func (_mock *MockDARetriever) RetrieveFromDA(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error) { ret := _mock.Called(ctx, daHeight) diff --git a/block/internal/syncing/da_retriever_test.go b/block/internal/syncing/da_retriever_test.go index 8b27513a8..76cc666dd 100644 --- a/block/internal/syncing/da_retriever_test.go +++ b/block/internal/syncing/da_retriever_test.go @@ -148,7 +148,7 @@ func TestDARetriever_ProcessBlobs_HeaderAndData_Success(t *testing.T) { dataBin, data := makeSignedDataBytes(t, gen.ChainID, 2, addr, pub, signer, 2) hdrBin, _ := makeSignedHeaderBytes(t, gen.ChainID, 2, addr, pub, signer, nil, &data.Data, nil) - events := r.processBlobs(context.Background(), [][]byte{hdrBin, dataBin}, 77) + events := r.ProcessBlobs(context.Background(), [][]byte{hdrBin, dataBin}, 77) require.Len(t, events, 1) assert.Equal(t, uint64(2), events[0].Header.Height()) assert.Equal(t, uint64(2), events[0].Data.Height()) @@ -172,7 +172,7 @@ func TestDARetriever_ProcessBlobs_HeaderOnly_EmptyDataExpected(t *testing.T) { // Header with no data hash present should trigger empty data creation (per current logic) hb, _ := makeSignedHeaderBytes(t, gen.ChainID, 3, addr, pub, signer, nil, nil, nil) - events := r.processBlobs(context.Background(), [][]byte{hb}, 88) + events := r.ProcessBlobs(context.Background(), [][]byte{hb}, 88) require.Len(t, events, 1) assert.Equal(t, uint64(3), events[0].Header.Height()) assert.NotNil(t, events[0].Data) @@ -282,14 +282,14 @@ func TestDARetriever_ProcessBlobs_CrossDAHeightMatching(t *testing.T) { hdrBin, _ := makeSignedHeaderBytes(t, gen.ChainID, 5, addr, pub, signer, nil, &data.Data, nil) // Process header from DA height 100 first - events1 := r.processBlobs(context.Background(), [][]byte{hdrBin}, 100) + events1 := r.ProcessBlobs(context.Background(), [][]byte{hdrBin}, 100) require.Len(t, events1, 0, "should not create event yet - data is missing") // Verify header is stored in pending headers require.Contains(t, r.pendingHeaders, uint64(5), "header should be stored as pending") // Process data from DA height 102 - events2 := r.processBlobs(context.Background(), [][]byte{dataBin}, 102) + events2 := r.ProcessBlobs(context.Background(), [][]byte{dataBin}, 102) require.Len(t, events2, 1, "should create event when matching data arrives") event := events2[0] @@ -319,7 +319,7 @@ func TestDARetriever_ProcessBlobs_MultipleHeadersCrossDAHeightMatching(t *testin hdr5Bin, _ := makeSignedHeaderBytes(t, gen.ChainID, 5, addr, pub, signer, nil, &data5.Data, nil) // Process multiple headers from DA height 200 - should be stored as pending - events1 := r.processBlobs(context.Background(), [][]byte{hdr3Bin, hdr4Bin, hdr5Bin}, 200) + events1 := r.ProcessBlobs(context.Background(), [][]byte{hdr3Bin, hdr4Bin, hdr5Bin}, 200) require.Len(t, events1, 0, "should not create events yet - all data is missing") // Verify all headers are stored in pending @@ -328,7 +328,7 @@ func TestDARetriever_ProcessBlobs_MultipleHeadersCrossDAHeightMatching(t *testin require.Contains(t, r.pendingHeaders, uint64(5), "header 5 should be pending") // Process some data from DA height 203 - should create partial events - events2 := r.processBlobs(context.Background(), [][]byte{data3Bin, data5Bin}, 203) + events2 := r.ProcessBlobs(context.Background(), [][]byte{data3Bin, data5Bin}, 203) require.Len(t, events2, 2, "should create events for heights 3 and 5") // Sort events by height for consistent testing @@ -352,7 +352,7 @@ func TestDARetriever_ProcessBlobs_MultipleHeadersCrossDAHeightMatching(t *testin require.NotContains(t, r.pendingHeaders, uint64(5), "header 5 should be removed from pending") // Process remaining data from DA height 205 - events3 := r.processBlobs(context.Background(), [][]byte{data4Bin}, 205) + events3 := r.ProcessBlobs(context.Background(), [][]byte{data4Bin}, 205) require.Len(t, events3, 1, "should create event for height 4") // Verify final event for height 4 diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 266bc55e4..74ef76d0a 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -22,12 +22,45 @@ import ( "github.com/evstack/ev-node/block/internal/common" "github.com/evstack/ev-node/block/internal/da" "github.com/evstack/ev-node/pkg/config" + blobrpc "github.com/evstack/ev-node/pkg/da/jsonrpc" datypes "github.com/evstack/ev-node/pkg/da/types" "github.com/evstack/ev-node/pkg/genesis" "github.com/evstack/ev-node/pkg/store" "github.com/evstack/ev-node/types" ) +// SyncMode represents the current synchronization mode for the DA worker. +type SyncMode int + +const ( + // SyncModeCatchup indicates the node is behind the DA chain head and polling aggressively. + SyncModeCatchup SyncMode = iota + // SyncModeFollow indicates the node is caught up and using subscription for real-time updates. + SyncModeFollow +) + +// String returns a human-readable representation of the sync mode. +func (m SyncMode) String() string { + switch m { + case SyncModeCatchup: + return "catchup" + case SyncModeFollow: + return "follow" + default: + return "unknown" + } +} + +const ( + // catchupThreshold is the number of DA blocks behind local head + // before switching from follow to catchup mode. + catchupThreshold = 2 + + // followWatchdogMultiplier is the multiplier for BlockTime + // used as subscription watchdog timeout. + followWatchdogMultiplier = 3 +) + // forcedInclusionGracePeriodConfig contains internal configuration for forced inclusion grace periods. type forcedInclusionGracePeriodConfig struct { // basePeriod is the base number of additional epochs allowed for including forced inclusion transactions @@ -118,6 +151,9 @@ type Syncer struct { // P2P wait coordination p2pWaitState atomic.Value // stores p2pWaitState + + // Sync mode tracking + currentSyncMode atomic.Int32 // stores SyncMode as int32 } // pendingForcedInclusionTx represents a forced inclusion transaction that hasn't been included yet @@ -318,28 +354,237 @@ func (s *Syncer) daWorkerLoop() { defer s.logger.Info().Msg("DA worker stopped") for { - err := s.fetchDAUntilCaughtUp() + select { + case <-s.ctx.Done(): + return + default: + } - var backoff time.Duration - if err == nil { - // No error, means we are caught up. - backoff = s.config.DA.BlockTime.Duration - } else { - // Error, back off for a shorter duration. - backoff = s.config.DA.BlockTime.Duration - if backoff <= 0 { - backoff = 2 * time.Second + mode := s.determineSyncMode() + previousMode := SyncMode(s.currentSyncMode.Load()) + + // Track mode switches + if mode != previousMode { + s.currentSyncMode.Store(int32(mode)) + s.metrics.ModeSwitches.Add(1) + s.logger.Info(). + Str("from", previousMode.String()). + Str("to", mode.String()). + Msg("sync mode changed") + } + + switch mode { + case SyncModeCatchup: + s.runCatchupMode() + case SyncModeFollow: + s.runFollowMode() + } + } +} + +// determineSyncMode checks the current DA sync status and returns the appropriate mode. +func (s *Syncer) determineSyncMode() SyncMode { + // If DA client is nil (e.g., in tests), default to catchup mode + if s.daClient == nil { + return SyncModeCatchup + } + + localHead, err := s.daClient.LocalHead(s.ctx) + if err != nil { + // Default to catchup on error - safer to poll than assume we're caught up + s.logger.Debug().Err(err).Msg("failed to get local DA head, defaulting to catchup mode") + return SyncModeCatchup + } + + currentDAHeight := s.daRetrieverHeight.Load() + + // Consider "caught up" if within catchupThreshold blocks of local head + if currentDAHeight+catchupThreshold >= localHead { + return SyncModeFollow + } + return SyncModeCatchup +} + +// runCatchupMode runs the catchup sync mode - aggressive polling until caught up. +func (s *Syncer) runCatchupMode() { + s.logger.Debug().Msg("running catchup mode") + s.metrics.SyncMode.Set(float64(SyncModeCatchup)) + + err := s.fetchDAUntilCaughtUp() + if errors.Is(err, context.Canceled) { + return + } + + // Back off before next iteration: + // - On error: wait before retrying to avoid hammering a failing DA layer + // - On success (caught up): wait for new DA blocks to appear + backoff := s.config.DA.BlockTime.Duration + if backoff <= 0 { + backoff = 2 * time.Second + } + + if err != nil { + s.logger.Debug().Err(err).Msg("catchup failed, backing off before retry") + } else { + s.logger.Debug().Msg("caught up with DA, backing off before next check") + } + + s.sleepOrDone(backoff) +} + +// runFollowMode runs the follow sync mode - subscription-based real-time updates. +func (s *Syncer) runFollowMode() { + s.logger.Debug().Msg("running follow mode") + s.metrics.SyncMode.Set(float64(SyncModeFollow)) + + err := s.subscribeAndFollow() + if err != nil && !errors.Is(err, context.Canceled) { + s.metrics.SubscribeErrors.Add(1) + s.logger.Warn().Err(err).Msg("subscribe failed, falling back to catchup") + // Don't sleep - go straight to catchup mode to recover + } +} + +// subscribeAndFollow uses the DA subscription API to receive real-time blob notifications. +// It subscribes to header, data, and forced inclusion namespaces and processes incoming blobs. +// Returns when subscription fails, context is cancelled, or node falls behind. +func (s *Syncer) subscribeAndFollow() error { + // Get namespaces + headerNS := s.daClient.GetHeaderNamespace() + dataNS := s.daClient.GetDataNamespace() + + // Create subscription context with cancellation + subCtx, cancel := context.WithCancel(s.ctx) + defer cancel() + + // Subscribe to header namespace + headerCh, err := s.daClient.Subscribe(subCtx, headerNS) + if err != nil { + return fmt.Errorf("failed to subscribe to header namespace: %w", err) + } + + // Subscribe to data namespace (only if different from header namespace) + var dataCh <-chan *blobrpc.SubscriptionResponse + if !bytes.Equal(headerNS, dataNS) { + dataCh, err = s.daClient.Subscribe(subCtx, dataNS) + if err != nil { + return fmt.Errorf("failed to subscribe to data namespace: %w", err) + } + } + + // Subscribe to forced inclusion namespace if configured + var forcedInclusionCh <-chan *blobrpc.SubscriptionResponse + if s.daClient.HasForcedInclusionNamespace() { + fiNS := s.daClient.GetForcedInclusionNamespace() + // Only subscribe if it's different from both header and data namespaces + if !bytes.Equal(fiNS, headerNS) && !bytes.Equal(fiNS, dataNS) { + forcedInclusionCh, err = s.daClient.Subscribe(subCtx, fiNS) + if err != nil { + return fmt.Errorf("failed to subscribe to forced inclusion namespace: %w", err) } + s.logger.Info().Msg("subscribed to forced inclusion namespace for follow mode") } + } + s.logger.Info().Msg("subscribed to DA namespaces for follow mode") + + // Calculate watchdog timeout + watchdogTimeout := s.config.DA.BlockTime.Duration * followWatchdogMultiplier + if watchdogTimeout <= 0 { + watchdogTimeout = 30 * time.Second + } + + // Process subscription events + // Note: Select on a nil channel blocks forever, so nil channels are effectively disabled + for { select { case <-s.ctx.Done(): - return - case <-time.After(backoff): + return s.ctx.Err() + + case resp, ok := <-headerCh: + if !ok { + return errors.New("header subscription closed") + } + if err := s.processSubscriptionResponse(resp); err != nil { + s.logger.Error().Err(err).Uint64("height", resp.Height).Msg("failed to process header subscription") + } + + case resp, ok := <-dataCh: + // Note: if dataCh is nil (same namespace as header), this case never fires + if !ok { + return errors.New("data subscription closed") + } + if err := s.processSubscriptionResponse(resp); err != nil { + s.logger.Error().Err(err).Uint64("height", resp.Height).Msg("failed to process data subscription") + } + + case resp, ok := <-forcedInclusionCh: + // Note: if forcedInclusionCh is nil (not configured), this case never fires + if !ok { + return errors.New("forced inclusion subscription closed") + } + // Forced inclusion responses are logged but not processed through processSubscriptionResponse + // They are handled separately by the forced inclusion retriever during block verification + s.logger.Debug(). + Uint64("da_height", resp.Height). + Int("blobs", len(resp.Blobs)). + Msg("received forced inclusion subscription notification") + + case <-time.After(watchdogTimeout): + // Watchdog: if no events for watchdogTimeout, recheck mode + // Might have fallen behind due to network issues + s.logger.Debug().Dur("timeout", watchdogTimeout).Msg("subscription watchdog triggered, checking sync mode") + if s.determineSyncMode() == SyncModeCatchup { + return errors.New("fell behind, switching to catchup") + } } } } +// processSubscriptionResponse processes a subscription response and sends events to the processing channel. +func (s *Syncer) processSubscriptionResponse(resp *blobrpc.SubscriptionResponse) error { + if resp == nil || len(resp.Blobs) == 0 { + return nil + } + + s.logger.Debug(). + Uint64("da_height", resp.Height). + Int("blobs", len(resp.Blobs)). + Msg("processing subscription response") + + // Convert blobs to raw byte slices for processing + blobs := make([][]byte, len(resp.Blobs)) + for i, blob := range resp.Blobs { + blobs[i] = blob.Data() + } + + // Process blobs using the DA retriever's ProcessBlobs method + events := s.daRetriever.ProcessBlobs(s.ctx, blobs, resp.Height) + + // Send events to the processing channel + for _, event := range events { + select { + case s.heightInCh <- event: + s.logger.Debug(). + Uint64("height", event.Header.Height()). + Uint64("da_height", event.DaHeight). + Msg("sent subscription event to processing") + default: + s.cache.SetPendingEvent(event.Header.Height(), &event) + s.logger.Debug(). + Uint64("height", event.Header.Height()). + Msg("subscription event queued as pending") + } + } + + // Update retriever height + if resp.Height >= s.daRetrieverHeight.Load() { + s.daRetrieverHeight.Store(resp.Height + 1) + } + + return nil +} + func (s *Syncer) fetchDAUntilCaughtUp() error { for { select { diff --git a/block/internal/syncing/syncer_test.go b/block/internal/syncing/syncer_test.go index 21b012cf2..bb90447f5 100644 --- a/block/internal/syncing/syncer_test.go +++ b/block/internal/syncing/syncer_test.go @@ -700,3 +700,186 @@ func TestSyncer_getHighestStoredDAHeight(t *testing.T) { highestDA = syncer.getHighestStoredDAHeight() assert.Equal(t, uint64(200), highestDA, "should return highest DA height from most recent included height") } + +func TestSyncMode_String(t *testing.T) { + tests := []struct { + mode SyncMode + expected string + }{ + {SyncModeCatchup, "catchup"}, + {SyncModeFollow, "follow"}, + {SyncMode(99), "unknown"}, + } + + for _, tt := range tests { + t.Run(tt.expected, func(t *testing.T) { + assert.Equal(t, tt.expected, tt.mode.String()) + }) + } +} + +func TestSyncer_determineSyncMode(t *testing.T) { + tests := []struct { + name string + localHead uint64 + localHeadErr error + currentHeight uint64 + expectedMode SyncMode + }{ + { + name: "caught up - at head", + localHead: 100, + localHeadErr: nil, + currentHeight: 100, + expectedMode: SyncModeFollow, + }, + { + name: "caught up - within threshold", + localHead: 100, + localHeadErr: nil, + currentHeight: 99, // within catchupThreshold (2) + expectedMode: SyncModeFollow, + }, + { + name: "caught up - at threshold boundary", + localHead: 100, + localHeadErr: nil, + currentHeight: 98, // exactly at threshold + expectedMode: SyncModeFollow, + }, + { + name: "behind - just past threshold", + localHead: 100, + localHeadErr: nil, + currentHeight: 97, // 3 behind, past threshold of 2 + expectedMode: SyncModeCatchup, + }, + { + name: "behind - significantly behind", + localHead: 100, + localHeadErr: nil, + currentHeight: 50, + expectedMode: SyncModeCatchup, + }, + { + name: "error getting local head - defaults to catchup", + localHead: 0, + localHeadErr: errors.New("connection failed"), + currentHeight: 100, + expectedMode: SyncModeCatchup, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mockDA := testmocks.NewMockClient(t) + if tt.localHeadErr != nil { + mockDA.EXPECT().LocalHead(mock.Anything).Return(uint64(0), tt.localHeadErr) + } else { + mockDA.EXPECT().LocalHead(mock.Anything).Return(tt.localHead, nil) + } + + syncer := &Syncer{ + daClient: mockDA, + daRetrieverHeight: &atomic.Uint64{}, + ctx: context.Background(), + logger: zerolog.Nop(), + } + syncer.daRetrieverHeight.Store(tt.currentHeight) + + mode := syncer.determineSyncMode() + assert.Equal(t, tt.expectedMode, mode) + }) + } +} + +func TestSyncer_runCatchupMode(t *testing.T) { + // Test that runCatchupMode correctly sets metrics and calls fetchDAUntilCaughtUp + mockDA := testmocks.NewMockClient(t) + // Use same namespace for header and data to simplify the test + namespace := []byte("namespace") + mockDA.EXPECT().GetHeaderNamespace().Return(namespace).Maybe() + mockDA.EXPECT().GetDataNamespace().Return(namespace).Maybe() + mockDA.EXPECT().Retrieve(mock.Anything, mock.Anything, namespace). + Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{ + Code: datypes.StatusHeightFromFuture, + Message: datypes.ErrHeightFromFuture.Error(), + }, + }).Once() + + cfg := config.DefaultConfig() + cfg.DA.BlockTime.Duration = 10 * time.Millisecond + + metrics := common.NopMetrics() + + syncer := &Syncer{ + daClient: mockDA, + daRetrieverHeight: &atomic.Uint64{}, + ctx: context.Background(), + logger: zerolog.Nop(), + config: cfg, + metrics: metrics, + cache: &mockCacheManager{}, + } + syncer.daRetrieverHeight.Store(1) + syncer.daRetriever = NewDARetriever(mockDA, &mockCacheManager{}, genesis.Genesis{}, zerolog.Nop()) + + // Run catchup mode - should return when caught up (ErrHeightFromFuture) + syncer.runCatchupMode() + + mockDA.AssertExpectations(t) +} + +func TestSyncer_modeSwitching(t *testing.T) { + // Test that mode switches are tracked correctly + mockDA := testmocks.NewMockClient(t) + + syncer := &Syncer{ + daClient: mockDA, + daRetrieverHeight: &atomic.Uint64{}, + ctx: context.Background(), + logger: zerolog.Nop(), + metrics: common.NopMetrics(), + currentSyncMode: atomic.Int32{}, + } + + // Initial mode should be catchup (0) + assert.Equal(t, SyncModeCatchup, SyncMode(syncer.currentSyncMode.Load())) + + // Simulate switching to follow mode + syncer.currentSyncMode.Store(int32(SyncModeFollow)) + assert.Equal(t, SyncModeFollow, SyncMode(syncer.currentSyncMode.Load())) + + // Switch back to catchup + syncer.currentSyncMode.Store(int32(SyncModeCatchup)) + assert.Equal(t, SyncModeCatchup, SyncMode(syncer.currentSyncMode.Load())) +} + +// mockCacheManager is a minimal implementation for testing +type mockCacheManager struct{} + +func (m *mockCacheManager) DaHeight() uint64 { return 0 } +func (m *mockCacheManager) SetHeaderSeen(hash string, height uint64) { +} +func (m *mockCacheManager) IsHeaderSeen(hash string) bool { return false } +func (m *mockCacheManager) SetDataSeen(hash string, height uint64) { +} +func (m *mockCacheManager) IsDataSeen(hash string) bool { return false } +func (m *mockCacheManager) SetHeaderDAIncluded(hash string, daHeight, height uint64) { +} +func (m *mockCacheManager) GetHeaderDAIncluded(hash string) (uint64, bool) { return 0, false } +func (m *mockCacheManager) RemoveHeaderDAIncluded(hash string) {} +func (m *mockCacheManager) SetDataDAIncluded(hash string, daHeight, height uint64) { +} +func (m *mockCacheManager) GetDataDAIncluded(hash string) (uint64, bool) { return 0, false } +func (m *mockCacheManager) IsTxSeen(hash string) bool { return false } +func (m *mockCacheManager) SetTxSeen(hash string) {} +func (m *mockCacheManager) CleanupOldTxs(olderThan time.Duration) int { return 0 } +func (m *mockCacheManager) SetPendingEvent(height uint64, event *common.DAHeightEvent) { +} +func (m *mockCacheManager) GetNextPendingEvent(height uint64) *common.DAHeightEvent { return nil } +func (m *mockCacheManager) SaveToDisk() error { return nil } +func (m *mockCacheManager) LoadFromDisk() error { return nil } +func (m *mockCacheManager) ClearFromDisk() error { return nil } +func (m *mockCacheManager) DeleteHeight(blockHeight uint64) {} diff --git a/test/mocks/da.go b/test/mocks/da.go index 0b5c71a49..fd9bb8db6 100644 --- a/test/mocks/da.go +++ b/test/mocks/da.go @@ -7,6 +7,7 @@ package mocks import ( "context" + "github.com/evstack/ev-node/pkg/da/jsonrpc" "github.com/evstack/ev-node/pkg/da/types" mock "github.com/stretchr/testify/mock" ) @@ -294,6 +295,66 @@ func (_c *MockClient_HasForcedInclusionNamespace_Call) RunAndReturn(run func() b return _c } +// LocalHead provides a mock function for the type MockClient +func (_mock *MockClient) LocalHead(ctx context.Context) (uint64, error) { + ret := _mock.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for LocalHead") + } + + var r0 uint64 + var r1 error + if returnFunc, ok := ret.Get(0).(func(context.Context) (uint64, error)); ok { + return returnFunc(ctx) + } + if returnFunc, ok := ret.Get(0).(func(context.Context) uint64); ok { + r0 = returnFunc(ctx) + } else { + r0 = ret.Get(0).(uint64) + } + if returnFunc, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = returnFunc(ctx) + } else { + r1 = ret.Error(1) + } + return r0, r1 +} + +// MockClient_LocalHead_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'LocalHead' +type MockClient_LocalHead_Call struct { + *mock.Call +} + +// LocalHead is a helper method to define mock.On call +// - ctx context.Context +func (_e *MockClient_Expecter) LocalHead(ctx interface{}) *MockClient_LocalHead_Call { + return &MockClient_LocalHead_Call{Call: _e.mock.On("LocalHead", ctx)} +} + +func (_c *MockClient_LocalHead_Call) Run(run func(ctx context.Context)) *MockClient_LocalHead_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + run( + arg0, + ) + }) + return _c +} + +func (_c *MockClient_LocalHead_Call) Return(v uint64, err error) *MockClient_LocalHead_Call { + _c.Call.Return(v, err) + return _c +} + +func (_c *MockClient_LocalHead_Call) RunAndReturn(run func(ctx context.Context) (uint64, error)) *MockClient_LocalHead_Call { + _c.Call.Return(run) + return _c +} + // Retrieve provides a mock function for the type MockClient func (_mock *MockClient) Retrieve(ctx context.Context, height uint64, namespace []byte) da.ResultRetrieve { ret := _mock.Called(ctx, height, namespace) @@ -432,6 +493,74 @@ func (_c *MockClient_Submit_Call) RunAndReturn(run func(ctx context.Context, dat return _c } +// Subscribe provides a mock function for the type MockClient +func (_mock *MockClient) Subscribe(ctx context.Context, namespace []byte) (<-chan *jsonrpc.SubscriptionResponse, error) { + ret := _mock.Called(ctx, namespace) + + if len(ret) == 0 { + panic("no return value specified for Subscribe") + } + + var r0 <-chan *jsonrpc.SubscriptionResponse + var r1 error + if returnFunc, ok := ret.Get(0).(func(context.Context, []byte) (<-chan *jsonrpc.SubscriptionResponse, error)); ok { + return returnFunc(ctx, namespace) + } + if returnFunc, ok := ret.Get(0).(func(context.Context, []byte) <-chan *jsonrpc.SubscriptionResponse); ok { + r0 = returnFunc(ctx, namespace) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(<-chan *jsonrpc.SubscriptionResponse) + } + } + if returnFunc, ok := ret.Get(1).(func(context.Context, []byte) error); ok { + r1 = returnFunc(ctx, namespace) + } else { + r1 = ret.Error(1) + } + return r0, r1 +} + +// MockClient_Subscribe_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Subscribe' +type MockClient_Subscribe_Call struct { + *mock.Call +} + +// Subscribe is a helper method to define mock.On call +// - ctx context.Context +// - namespace []byte +func (_e *MockClient_Expecter) Subscribe(ctx interface{}, namespace interface{}) *MockClient_Subscribe_Call { + return &MockClient_Subscribe_Call{Call: _e.mock.On("Subscribe", ctx, namespace)} +} + +func (_c *MockClient_Subscribe_Call) Run(run func(ctx context.Context, namespace []byte)) *MockClient_Subscribe_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + var arg1 []byte + if args[1] != nil { + arg1 = args[1].([]byte) + } + run( + arg0, + arg1, + ) + }) + return _c +} + +func (_c *MockClient_Subscribe_Call) Return(subscriptionResponseCh <-chan *jsonrpc.SubscriptionResponse, err error) *MockClient_Subscribe_Call { + _c.Call.Return(subscriptionResponseCh, err) + return _c +} + +func (_c *MockClient_Subscribe_Call) RunAndReturn(run func(ctx context.Context, namespace []byte) (<-chan *jsonrpc.SubscriptionResponse, error)) *MockClient_Subscribe_Call { + _c.Call.Return(run) + return _c +} + // NewMockVerifier creates a new instance of MockVerifier. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewMockVerifier(t interface { diff --git a/test/testda/dummy.go b/test/testda/dummy.go index 633bf1cf9..876475f9c 100644 --- a/test/testda/dummy.go +++ b/test/testda/dummy.go @@ -7,6 +7,7 @@ import ( "sync/atomic" "time" + blobrpc "github.com/evstack/ev-node/pkg/da/jsonrpc" datypes "github.com/evstack/ev-node/pkg/da/types" ) @@ -290,3 +291,18 @@ func (d *DummyDA) GetHeaderByHeight(_ context.Context, height uint64) (*Header, } return header, nil } + +// Subscribe returns a channel that receives subscription responses. +// This is a stub implementation that returns an empty channel. +// In tests, callers should not rely on actual subscription behavior. +func (d *DummyDA) Subscribe(_ context.Context, _ []byte) (<-chan *blobrpc.SubscriptionResponse, error) { + // Return a channel that will never receive anything - tests should use mocks for subscription behavior + ch := make(chan *blobrpc.SubscriptionResponse) + return ch, nil +} + +// LocalHead returns the current DA height. +// This mirrors the HeaderAPI.LocalHead method and is used to determine sync mode. +func (d *DummyDA) LocalHead(_ context.Context) (uint64, error) { + return d.height.Load(), nil +} diff --git a/tools/local-da/rpc.go b/tools/local-da/rpc.go index 60dd51ac4..32c907bff 100644 --- a/tools/local-da/rpc.go +++ b/tools/local-da/rpc.go @@ -136,10 +136,53 @@ func (s *blobServer) Subscribe(_ context.Context, _ libshare.Namespace) (<-chan return ch, nil } -// startBlobServer starts an HTTP JSON-RPC server on addr serving the blob namespace. +// headerServer exposes a minimal header RPC surface backed by LocalDA. +type headerServer struct { + da *LocalDA + logger zerolog.Logger +} + +// LocalHead returns the header for the locally synced DA head. +func (s *headerServer) LocalHead(_ context.Context) (*jsonrpc.Header, error) { + s.da.mu.Lock() + defer s.da.mu.Unlock() + + return &jsonrpc.Header{ + Height: s.da.height, + BlockTime: s.da.timestamps[s.da.height], + }, nil +} + +// NetworkHead returns the header for the network DA head (same as local for LocalDA). +func (s *headerServer) NetworkHead(_ context.Context) (*jsonrpc.Header, error) { + return s.LocalHead(context.Background()) +} + +// GetByHeight returns the header for a specific height. +func (s *headerServer) GetByHeight(_ context.Context, height uint64) (*jsonrpc.Header, error) { + s.da.mu.Lock() + defer s.da.mu.Unlock() + + if height > s.da.height { + return nil, datypes.ErrHeightFromFuture + } + + ts, ok := s.da.timestamps[height] + if !ok { + ts = time.Time{} + } + + return &jsonrpc.Header{ + Height: height, + BlockTime: ts, + }, nil +} + +// startBlobServer starts an HTTP JSON-RPC server on addr serving the blob and header namespaces. func startBlobServer(logger zerolog.Logger, addr string, da *LocalDA) (*http.Server, error) { rpc := fjrpc.NewServer() rpc.Register("blob", &blobServer{da: da, logger: logger}) + rpc.Register("header", &headerServer{da: da, logger: logger}) srv := &http.Server{ Addr: addr,