diff --git a/.gitignore b/.gitignore index 58a46c4786c..756dc790450 100644 --- a/.gitignore +++ b/.gitignore @@ -112,4 +112,5 @@ mdbx.lck .my -cover.out \ No newline at end of file +cover.out +/erigon diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 33ab8f5c751..9d07bb1e191 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -1140,6 +1140,10 @@ var ( Usage: "EXPERIMENTAL: enables concurrent trie for commitment", Value: false, } + UseForkchoiceFinalityFlag = cli.BoolFlag{ + Name: "experimental.use-forkchoice-finality", + Usage: "Skip changeset generation for blocks finalized by forkchoice (e.g., Polygon milestones). Reduces overhead but requires chaindata reset if finality is reverted.", + } GDBMeFlag = cli.BoolFlag{ Name: "gdbme", Usage: "restart erigon under gdb for debug purposes", diff --git a/core/vm/contracts.go b/core/vm/contracts.go index dfe61d11adc..d093d4dfb90 100644 --- a/core/vm/contracts.go +++ b/core/vm/contracts.go @@ -195,6 +195,7 @@ var PrecompiledContractsPrague = map[common.Address]PrecompiledContract{ common.BytesToAddress([]byte{0x07}): &bn254ScalarMulIstanbul{}, common.BytesToAddress([]byte{0x08}): &bn254PairingIstanbul{}, common.BytesToAddress([]byte{0x09}): &blake2F{}, + common.BytesToAddress([]byte{0x0a}): &pointEvaluation{}, common.BytesToAddress([]byte{0x0b}): &bls12381G1Add{}, common.BytesToAddress([]byte{0x0c}): &bls12381G1MultiExp{}, common.BytesToAddress([]byte{0x0d}): &bls12381G2Add{}, @@ -235,6 +236,7 @@ var PrecompiledContractsMadhugiri = map[common.Address]PrecompiledContract{ common.BytesToAddress([]byte{0x07}): &bn254ScalarMulIstanbul{}, common.BytesToAddress([]byte{0x08}): &bn254PairingIstanbul{}, common.BytesToAddress([]byte{0x09}): &blake2F{}, + common.BytesToAddress([]byte{0x0a}): &pointEvaluation{}, common.BytesToAddress([]byte{0x0b}): &bls12381G1Add{}, common.BytesToAddress([]byte{0x0c}): &bls12381G1MultiExp{}, common.BytesToAddress([]byte{0x0d}): &bls12381G2Add{}, @@ -254,6 +256,7 @@ var PrecompiledContractsMadhugiriPro = map[common.Address]PrecompiledContract{ common.BytesToAddress([]byte{0x07}): &bn254ScalarMulIstanbul{}, common.BytesToAddress([]byte{0x08}): &bn254PairingIstanbul{}, common.BytesToAddress([]byte{0x09}): &blake2F{}, + common.BytesToAddress([]byte{0x0a}): &pointEvaluation{}, common.BytesToAddress([]byte{0x0b}): &bls12381G1Add{}, common.BytesToAddress([]byte{0x0c}): &bls12381G1MultiExp{}, common.BytesToAddress([]byte{0x0d}): &bls12381G2Add{}, diff --git a/core/vm/contracts_test.go b/core/vm/contracts_test.go index ce9eda7b8c0..b2efccb1463 100644 --- a/core/vm/contracts_test.go +++ b/core/vm/contracts_test.go @@ -527,35 +527,73 @@ func TestLisovoCLZOpcode(t *testing.T) { } } -// TestPointEvaluationPrecompileRemoval verifies that the pointEvaluation (KZG) precompile -// is present before LisovoPro and removed starting with LisovoPro. -func TestPointEvaluationPrecompileRemoval(t *testing.T) { +// TestPointEvaluationPrecompileAvailability verifies that the pointEvaluation (KZG) +// precompile remains available through Lisovo and is only removed starting with LisovoPro. +func TestPointEvaluationPrecompileAvailability(t *testing.T) { t.Parallel() pointEvaluationAddr := common.BytesToAddress([]byte{0x0a}) - // Test Lisovo: should have pointEvaluation - lisovoRules := &chain.Rules{ - IsLisovo: true, - IsMadhugiriPro: true, - IsMadhugiri: true, - IsBhilai: true, - } - lisovoPrecompiles := Precompiles(lisovoRules) - if _, exists := lisovoPrecompiles[pointEvaluationAddr]; !exists { - t.Error("pointEvaluation (0x0a) should exist in Lisovo precompiles") + testCases := []struct { + name string + rules *chain.Rules + wantExist bool + }{ + { + name: "Prague", + rules: &chain.Rules{ + IsPrague: true, + }, + wantExist: true, + }, + { + name: "Madhugiri", + rules: &chain.Rules{ + IsMadhugiri: true, + IsBhilai: true, + }, + wantExist: true, + }, + { + name: "MadhugiriPro", + rules: &chain.Rules{ + IsMadhugiriPro: true, + IsMadhugiri: true, + IsBhilai: true, + }, + wantExist: true, + }, + { + name: "Lisovo", + rules: &chain.Rules{ + IsLisovo: true, + IsMadhugiriPro: true, + IsMadhugiri: true, + IsBhilai: true, + }, + wantExist: true, + }, + { + name: "LisovoPro", + rules: &chain.Rules{ + IsLisovoPro: true, + IsLisovo: true, + IsMadhugiriPro: true, + IsMadhugiri: true, + IsBhilai: true, + }, + wantExist: false, + }, } - // Test LisovoPro: should not have pointEvaluation - lisovoProRules := &chain.Rules{ - IsLisovoPro: true, - IsLisovo: true, - IsMadhugiriPro: true, - IsMadhugiri: true, - IsBhilai: true, - } - lisovoProPrecompiles := Precompiles(lisovoProRules) - if _, exists := lisovoProPrecompiles[pointEvaluationAddr]; exists { - t.Error("pointEvaluation (0x0a) should not exist in LisovoPro precompiles") + for _, testCase := range testCases { + testCase := testCase + t.Run(testCase.name, func(t *testing.T) { + precompiles := Precompiles(testCase.rules) + _, exists := precompiles[pointEvaluationAddr] + if exists != testCase.wantExist { + t.Fatalf("pointEvaluation (0x0a) existence = %t, want %t", exists, testCase.wantExist) + } + }) } } diff --git a/db/rawdb/rawtemporaldb/accessors_commitment.go b/db/rawdb/rawtemporaldb/accessors_commitment.go index aa836448e26..c22bc5ad470 100644 --- a/db/rawdb/rawtemporaldb/accessors_commitment.go +++ b/db/rawdb/rawtemporaldb/accessors_commitment.go @@ -15,7 +15,7 @@ func CanUnwindToBlockNum(tx kv.TemporalTx) (uint64, error) { return 0, err } if minUnwindale == math.MaxUint64 { // no unwindable block found - log.Warn("no unwindable block found from changesets, falling back to latest with commitment") + log.Debug("no unwindable block found from changesets, falling back to latest with commitment") return commitmentdb.LatestBlockNumWithCommitment(tx) } if minUnwindale > 0 { diff --git a/db/state/aggregator.go b/db/state/aggregator.go index 7e68d362b7b..890db41e6a1 100644 --- a/db/state/aggregator.go +++ b/db/state/aggregator.go @@ -1019,15 +1019,6 @@ func (at *AggregatorRoTx) PruneSmallBatches(ctx context.Context, timeout time.Du fullStat := newAggregatorPruneStat() for { - if sptx, ok := tx.(kv.HasSpaceDirty); ok && !furiousPrune && !aggressivePrune { - spaceDirty, _, err := sptx.SpaceDirty() - if err != nil { - return false, err - } - if spaceDirty > uint64(statecfg.MaxNonFuriousDirtySpacePerTx) { - return false, nil - } - } iterationStarted := time.Now() // `context.Background()` is important here! // it allows keep DB consistent - prune all keys-related data or noting diff --git a/db/state/statecfg/state_schema.go b/db/state/statecfg/state_schema.go index d7edb845e77..d097edbbe63 100644 --- a/db/state/statecfg/state_schema.go +++ b/db/state/statecfg/state_schema.go @@ -64,7 +64,7 @@ func Configure(Schema SchemaGen, a AggSetters, dirs datadir.Dirs, salt *uint32, } const AggregatorSqueezeCommitmentValues = true -const MaxNonFuriousDirtySpacePerTx = 64 * datasize.MB +const MaxNonFuriousDirtySpacePerTx = 128 * datasize.MB var dbgCommBtIndex = dbg.EnvBool("AGG_COMMITMENT_BT", false) diff --git a/eth/backend.go b/eth/backend.go index 1250488e01d..3324471b23f 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -228,6 +228,14 @@ type Ethereum struct { heimdallService *heimdall.Service stopNode func() error bgComponentsEg errgroup.Group + bgComponentMu sync.Mutex + bgComponentErr error +} + +func (s *Ethereum) BgComponentError() error { + s.bgComponentMu.Lock() + defer s.bgComponentMu.Unlock() + return s.bgComponentErr } func splitAddrIntoHostAndPort(addr string) (host string, port int, err error) { @@ -1824,11 +1832,20 @@ func (s *Ethereum) Stop() error { if err := s.bgComponentsEg.Wait(); err != nil && !errors.Is(err, context.Canceled) { s.logger.Error("background component error", "err", err) + s.bgComponentMu.Lock() + s.bgComponentErr = err + s.bgComponentMu.Unlock() } return nil } +// BgComponentError returns the first non-context-canceled error from background +// components, if any. It is safe to call after Stop() has returned. +func (s *Ethereum) BgComponentError() error { + return s.bgComponentErr +} + func (s *Ethereum) ChainDB() kv.RwDB { return s.chainDB } diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index b4192e00282..52dc96bfd34 100644 --- a/eth/ethconfig/config.go +++ b/eth/ethconfig/config.go @@ -291,6 +291,7 @@ type Sync struct { ChaosMonkey bool AlwaysGenerateChangesets bool + UseForkchoiceFinality bool MaxReorgDepth uint64 KeepExecutionProofs bool PersistReceiptsCacheV2 bool diff --git a/execution/eth1/forkchoice.go b/execution/eth1/forkchoice.go index 9c391650f4a..7c630f1169a 100644 --- a/execution/eth1/forkchoice.go +++ b/execution/eth1/forkchoice.go @@ -486,6 +486,12 @@ func (e *EthereumExecutionModule) updateForkChoice(ctx context.Context, original } } + // Write finalized hash BEFORE execution so shouldGenerateChangeSets() can read it + // during execution stage. This enables the UseForkchoiceFinality optimization. + if finalizedHash != (common.Hash{}) { + rawdb.WriteForkchoiceFinalized(tx, finalizedHash) + } + firstCycle := false loopIter := 0 for { diff --git a/execution/stagedsync/exec3.go b/execution/stagedsync/exec3.go index 18e2a182cf6..3c6341811e8 100644 --- a/execution/stagedsync/exec3.go +++ b/execution/stagedsync/exec3.go @@ -467,9 +467,14 @@ func ExecV3(ctx context.Context, lastFrozenTxNum = uint64((lastFrozenStep+1)*kv.Step(doms.StepSize())) - 1 } + var finalizedBlockNum uint64 + if cfg.syncCfg.UseForkchoiceFinality { + finalizedBlockNum = getFinalizedBlockNum(applyTx) + } + Loop: for ; blockNum <= maxBlockNum; blockNum++ { - shouldGenerateChangesets := shouldGenerateChangeSets(cfg, blockNum, maxBlockNum, initialCycle) + shouldGenerateChangesets := shouldGenerateChangeSets(cfg, finalizedBlockNum, blockNum, maxBlockNum, initialCycle) changeSet := &changeset2.StateChangeSet{} if shouldGenerateChangesets && blockNum > 0 { executor.domains().SetChangesetAccumulator(changeSet) @@ -762,9 +767,15 @@ Loop: timeStart := time.Now() - // allow greedy prune on non-chain-tip + // allow greedy prune on non-chain-tip or when not generating changesets pruneTimeout := 250 * time.Millisecond - if initialCycle { + if initialCycle || !shouldGenerateChangesets { + // When not generating changesets (finalized blocks), we can afford longer pruning + // since we're generating less data overall + if !initialCycle { + logger.Debug(fmt.Sprintf("[%s] aggressive pruning via forkchoice finality", execStage.LogPrefix()), + "block", blockNum, "finalizedBlock", finalizedBlockNum) + } pruneTimeout = 10 * time.Hour if err = executor.tx().(kv.TemporalRwTx).GreedyPruneHistory(ctx, kv.CommitmentDomain); err != nil { @@ -787,7 +798,9 @@ Loop: errExhausted = &ErrLoopExhausted{From: startBlockNum, To: blockNum, Reason: "block batch is full"} break Loop } - if !initialCycle && canPrune { + // Skip pruning break if not generating changesets (finalized blocks via UseForkchoiceFinality) + // This allows larger batches similar to initialCycle behavior + if !initialCycle && canPrune && shouldGenerateChangesets { errExhausted = &ErrLoopExhausted{From: startBlockNum, To: blockNum, Reason: "block batch can be pruned"} break Loop } @@ -1021,7 +1034,21 @@ func blockWithSenders(ctx context.Context, db kv.RoDB, tx kv.Tx, blockReader ser return b, err } -func shouldGenerateChangeSets(cfg ExecuteBlockCfg, blockNum, maxBlockNum uint64, initialCycle bool) bool { +// getFinalizedBlockNum returns the finalized block number from forkchoice state. +// Returns 0 if no finalized block is set or if the hash cannot be resolved to a block number. +func getFinalizedBlockNum(tx kv.Getter) uint64 { + finalizedHash := rawdb.ReadForkchoiceFinalized(tx) + if finalizedHash == (common.Hash{}) { + return 0 + } + finalizedNum := rawdb.ReadHeaderNumber(tx, finalizedHash) + if finalizedNum == nil { + return 0 + } + return *finalizedNum +} + +func shouldGenerateChangeSets(cfg ExecuteBlockCfg, finalizedBlockNum, blockNum, maxBlockNum uint64, initialCycle bool) bool { if cfg.syncCfg.AlwaysGenerateChangesets { return true } @@ -1031,6 +1058,15 @@ func shouldGenerateChangeSets(cfg ExecuteBlockCfg, blockNum, maxBlockNum uint64, if initialCycle { return false } - // once past the initial cycle, make sure to generate changesets for the last blocks that fall in the reorg window + + // Use forkchoice finalized block if enabled and available (e.g., Polygon milestones). + // Blocks at or before the finalized block don't need changesets since they cannot be reorged. + // WARNING: If finality is later reverted (e.g., faulty milestone purged by hard fork), + // the node will require a chaindata reset to recover. + if cfg.syncCfg.UseForkchoiceFinality && finalizedBlockNum > 0 && blockNum <= finalizedBlockNum { + return false + } + + // Fallback: generate changesets for blocks in the reorg window (last MaxReorgDepth blocks) return blockNum+cfg.syncCfg.MaxReorgDepth >= maxBlockNum } diff --git a/execution/stagedsync/exec3_changeset_test.go b/execution/stagedsync/exec3_changeset_test.go new file mode 100644 index 00000000000..df0fd0d6033 --- /dev/null +++ b/execution/stagedsync/exec3_changeset_test.go @@ -0,0 +1,112 @@ +package stagedsync + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/erigontech/erigon/eth/ethconfig" + "github.com/erigontech/erigon/turbo/services" +) + +// mockBlockReader implements services.FullBlockReader with only FrozenBlocks() used. +type mockBlockReader struct { + services.FullBlockReader + frozenBlocks uint64 +} + +func (m *mockBlockReader) FrozenBlocks() uint64 { + return m.frozenBlocks +} + +func TestShouldGenerateChangeSets(t *testing.T) { + tests := []struct { + name string + alwaysGenerate bool + frozenBlocks uint64 + initialCycle bool + useFinality bool + finalizedBlockNum uint64 + maxReorgDepth uint64 + blockNum uint64 + maxBlockNum uint64 + want bool + }{ + { + name: "AlwaysGenerateChangesets overrides everything", + alwaysGenerate: true, + frozenBlocks: 1000, + blockNum: 500, // below frozen + maxBlockNum: 2000, + want: true, + }, + { + name: "block below frozen returns false", + frozenBlocks: 1000, + blockNum: 999, + maxBlockNum: 2000, + want: false, + }, + { + name: "initialCycle returns false", + initialCycle: true, + blockNum: 1500, + maxBlockNum: 2000, + want: false, + }, + { + name: "UseForkchoiceFinality: block below finalized returns false", + useFinality: true, + finalizedBlockNum: 1000, + blockNum: 900, + maxBlockNum: 2000, + want: false, + }, + { + name: "UseForkchoiceFinality: block equal to finalized returns false", + useFinality: true, + finalizedBlockNum: 1000, + blockNum: 1000, + maxBlockNum: 2000, + want: false, + }, + { + name: "UseForkchoiceFinality: block above finalized falls through to MaxReorgDepth", + useFinality: true, + finalizedBlockNum: 1000, + maxReorgDepth: 100, + blockNum: 1001, + maxBlockNum: 2000, + want: false, // 1001 + 100 = 1101 < 2000 + }, + { + name: "MaxReorgDepth: block in reorg window returns true", + maxReorgDepth: 100, + blockNum: 1950, + maxBlockNum: 2000, + want: true, // 1950 + 100 = 2050 >= 2000 + }, + { + name: "MaxReorgDepth: block outside reorg window returns false", + maxReorgDepth: 100, + blockNum: 1800, + maxBlockNum: 2000, + want: false, // 1800 + 100 = 1900 < 2000 + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfg := ExecuteBlockCfg{ + blockReader: &mockBlockReader{frozenBlocks: tt.frozenBlocks}, + syncCfg: ethconfig.Sync{ + AlwaysGenerateChangesets: tt.alwaysGenerate, + UseForkchoiceFinality: tt.useFinality, + MaxReorgDepth: tt.maxReorgDepth, + }, + } + got := shouldGenerateChangeSets(cfg, tt.finalizedBlockNum, tt.blockNum, tt.maxBlockNum, tt.initialCycle) + assert.Equal(t, tt.want, got, "shouldGenerateChangeSets(%d, %d, %d)", tt.blockNum, tt.maxBlockNum, tt.initialCycle) + }) + } +} diff --git a/execution/stagedsync/exec3_parallel.go b/execution/stagedsync/exec3_parallel.go index 20220b195bf..4281c3094ce 100644 --- a/execution/stagedsync/exec3_parallel.go +++ b/execution/stagedsync/exec3_parallel.go @@ -132,7 +132,11 @@ func (te *txExecutor) getHeader(ctx context.Context, hash common.Hash, number ui } func (te *txExecutor) shouldGenerateChangeSets() bool { - return shouldGenerateChangeSets(te.cfg, te.inputBlockNum.Load(), te.maxBlockNum, te.initialCycle) + var finalizedBlockNum uint64 + if te.cfg.syncCfg.UseForkchoiceFinality { + finalizedBlockNum = getFinalizedBlockNum(te.applyTx) + } + return shouldGenerateChangeSets(te.cfg, finalizedBlockNum, te.inputBlockNum.Load(), te.maxBlockNum, te.initialCycle) } type parallelExecutor struct { diff --git a/execution/stagedsync/stage_execute.go b/execution/stagedsync/stage_execute.go index eb614bedff3..687e31dede3 100644 --- a/execution/stagedsync/stage_execute.go +++ b/execution/stagedsync/stage_execute.go @@ -436,7 +436,7 @@ func PruneExecutionStage(s *PruneState, tx kv.RwTx, cfg ExecuteBlockCfg, ctx con // - stop prune when `tx.SpaceDirty()` is big // - and set ~500ms timeout // because on slow disks - prune is slower. but for now - let's tune for nvme first, and add `tx.SpaceDirty()` check later https://github.com/erigontech/erigon/issues/11635 - quickPruneTimeout := 500 * time.Millisecond + quickPruneTimeout := 250 * time.Millisecond if s.ForwardProgress > cfg.syncCfg.MaxReorgDepth && !cfg.syncCfg.AlwaysGenerateChangesets { // (chunkLen is 8Kb) * (1_000 chunks) = 8mb @@ -489,6 +489,12 @@ func PruneExecutionStage(s *PruneState, tx kv.RwTx, cfg ExecuteBlockCfg, ctx con "externalTx", useExternalTx, ) } + } else if cfg.syncCfg.UseForkchoiceFinality { + // Forkchoice finality keeps the node stable at tip, eliminating the natural + // syncToTip fallbacks that triggered initialCycle=true with aggressive pruning. + // Use aggressive timeout (>=1min triggers adaptive batch ramp-up in PruneSmallBatches) + // to drain accumulated commitment history at step boundaries. + pruneTimeout = 60 * time.Second } pruneSmallBatchesStartTime := time.Now() diff --git a/execution/stages/stageloop.go b/execution/stages/stageloop.go index 9d66dba18b2..955b6932486 100644 --- a/execution/stages/stageloop.go +++ b/execution/stages/stageloop.go @@ -118,6 +118,11 @@ func StageLoop( hd.AfterInitialCycle() } + // Use idle time between sync cycles for aggressive pruning + if !initialCycle { + idlePruneAggressiveWindow(ctx, db, logger) + } + if loopMinTime != 0 { waitTime := loopMinTime - time.Since(start) logger.Info("Wait time until next loop", "for", waitTime) @@ -131,6 +136,29 @@ func StageLoop( } } +// idlePruneAggressiveWindow runs aggressive pruning during idle time between sync cycles. +// On fast chains (e.g. ~2s block time), the normal per-block prune window (250ms) is too short +// to keep up with state growth. This function uses the idle time after a sync cycle completes +// to run a longer prune session with aggressive mode (timeout >= 1min triggers 10x scaling). +func idlePruneAggressiveWindow(ctx context.Context, db kv.RwDB, logger log.Logger) { + const idlePruneTimeout = 2 * time.Minute + pruneStart := time.Now() + var haveMore bool + if err := db.Update(ctx, func(tx kv.RwTx) error { + var err error + haveMore, err = tx.(kv.TemporalRwTx).PruneSmallBatches(ctx, idlePruneTimeout) + return err + }); err != nil { + if !errors.Is(err, context.Canceled) { + logger.Warn("[idle-prune] aggressive prune failed", "err", err) + } + return + } + if duration := time.Since(pruneStart); duration > 1*time.Second { + logger.Info("[idle-prune] aggressive prune completed", "duration", duration, "haveMore", haveMore) + } +} + // ProcessFrozenBlocks - withuot global rwtx func ProcessFrozenBlocks(ctx context.Context, db kv.RwDB, blockReader services.FullBlockReader, sync *stagedsync.Sync, hook *Hook) error { sawZeroBlocksTimes := 0 diff --git a/polygon/sync/sync.go b/polygon/sync/sync.go index c6bfb330201..8fbcc10330d 100644 --- a/polygon/sync/sync.go +++ b/polygon/sync/sync.go @@ -49,6 +49,10 @@ const maxBlockBatchDownloadSize = 256 const heimdallSyncRetryIntervalOnTip = 200 * time.Millisecond const heimdallSyncRetryIntervalOnStartup = 30 * time.Second +// catchUpAgeThreshold is the maximum acceptable tip age before the event loop +// breaks out and re-enters syncToTip for efficient waypoint-based catch-up batching. +const catchUpAgeThreshold = 30 * time.Second + var ( futureMilestoneDelay = 1 * time.Second // amount of time to wait before putting a future milestone back in the event queue errAlreadyProcessed = errors.New("already processed") @@ -143,6 +147,17 @@ type Sync struct { wiggleCalculator wiggleCalculator engineAPISwitcher EngineAPISwitcher blockRequestsCache *lru.ARCCache[common.Hash, struct{}] + + // lastFinalizedBlockNum tracks the end block of the last validated finality + // waypoint (milestone or checkpoint). This is used to set the finalized block + // in forkchoice updates, enabling the execution stage to skip changeset + // generation for finalized blocks. + lastFinalizedBlockNum uint64 + + // lastTipAge tracks how far behind the chain tip the node is. + // Updated in commitExecution, used to detect when the event loop + // should break out and re-enter syncToTip for catch-up batching. + lastTipAge time.Duration } func (s *Sync) commitExecution(ctx context.Context, newTip *types.Header, finalizedHeader *types.Header) error { @@ -150,9 +165,22 @@ func (s *Sync) commitExecution(ctx context.Context, newTip *types.Header, finali return err } + // After flush, improve finalized header if possible. + // If newTip is at or before the last known milestone, it IS finalized. + tipNum := newTip.Number.Uint64() + if s.lastFinalizedBlockNum > 0 && tipNum <= s.lastFinalizedBlockNum { + finalizedHeader = newTip + } else if s.lastFinalizedBlockNum > 0 { + // Try to get the milestone end block header (now available after flush) + if h, err := s.execution.GetHeader(ctx, s.lastFinalizedBlockNum); err == nil && h != nil { + finalizedHeader = h + } + } + blockNum := newTip.Number.Uint64() age := common.PrettyAge(time.Unix(int64(newTip.Time), 0)) + s.lastTipAge = time.Since(time.Unix(int64(newTip.Time), 0)) s.logger.Info(syncLogPrefix("update fork choice"), "block", blockNum, "hash", newTip.Hash(), "age", age) fcStartTime := time.Now() @@ -251,11 +279,22 @@ func (s *Sync) handleMilestoneTipMismatch(ctx context.Context, ccb *CanonicalCha func (s *Sync) applyNewMilestoneOnTip(ctx context.Context, event EventNewMilestone, ccb *CanonicalChainBuilder) error { milestone := event if milestone.EndBlock().Uint64() <= ccb.Root().Number.Uint64() { + s.logger.Debug(syncLogPrefix("skipping milestone - already behind root"), + "milestoneEnd", milestone.EndBlock().Uint64(), + "ccbRoot", ccb.Root().Number.Uint64(), + ) return nil } // milestone is ahead of our current tip if milestone.EndBlock().Uint64() > ccb.Tip().Number.Uint64() { + // Track finality even for future milestones - the milestone IS finality from Heimdall. + // This lets commitExecution() set finalizedHeader = newTip for blocks within this range. + endBlock := milestone.EndBlock().Uint64() + if endBlock > s.lastFinalizedBlockNum { + s.lastFinalizedBlockNum = endBlock + } + s.logger.Debug(syncLogPrefix("putting milestone event back in the queue because our tip is behind the milestone"), "milestoneId", milestone.RawId(), "milestoneStart", milestone.StartBlock().Uint64(), @@ -291,6 +330,15 @@ func (s *Sync) applyNewMilestoneOnTip(ctx context.Context, event EventNewMilesto if endBlock > 0 { pruneTo = endBlock - 1 } + + // Track the milestone end block for forkchoice finality. + // This enables the execution stage to skip changeset generation for finalized blocks. + // Use max to avoid lowering the value when an older at-tip milestone is processed + // after a newer ahead-of-tip milestone has already been recorded. + if endBlock > s.lastFinalizedBlockNum { + s.lastFinalizedBlockNum = endBlock + } + return ccb.PruneRoot(pruneTo) } @@ -862,23 +910,51 @@ func (s *Sync) Run(ctx context.Context) error { } s.logger.Info(syncLogPrefix("running sync component")) - result, err := s.syncToTip(ctx) - if err != nil { - return err - } - if s.config.PolygonPosSingleSlotFinality { - if result.latestTip.Number.Uint64() >= s.config.PolygonPosSingleSlotFinalityBlockAt { - s.engineAPISwitcher.SetConsuming(true) + // Outer catch-up loop: when the event loop falls behind, break out and + // re-enter syncToTip which uses efficient waypoint-based batching. + for { + result, err := s.syncToTip(ctx) + if err != nil { + return err + } + + if s.config.PolygonPosSingleSlotFinality { + if result.latestTip.Number.Uint64() >= s.config.PolygonPosSingleSlotFinalityBlockAt { + s.logger.Info(syncLogPrefix("switching to engine API mode (SSF)"), "tip", result.latestTip.Number.Uint64()) + s.engineAPISwitcher.SetConsuming(true) + return nil + } + } + + ccBuilder, err := s.initialiseCcb(ctx, result) + if err != nil { + return err + } + + needsCatchUp, err := s.runEventLoop(ctx, ccBuilder) + if err != nil { + return err + } + if !needsCatchUp { return nil } - } - ccBuilder, err := s.initialiseCcb(ctx, result) - if err != nil { - return err + s.logger.Info(syncLogPrefix("re-entering syncToTip for catch-up"), + "lastTipAge", s.lastTipAge, + "threshold", catchUpAgeThreshold, + ) } +} +// runEventLoop processes tip events (new blocks, milestones, block hashes) one at a time. +// It returns needsCatchUp=true when the node has fallen too far behind and should re-enter +// syncToTip for efficient waypoint-based batch catch-up. +// +// Known limitations: +// - initialCycle is never true during catch-up re-entries (conservative pruning applies) +// - Span rotation every 128 blocks (~256s) adds ~12s overhead, which can trigger catch-up +func (s *Sync) runEventLoop(ctx context.Context, ccBuilder *CanonicalChainBuilder) (needsCatchUp bool, err error) { inactivityDuration := 30 * time.Second lastProcessedEventTime := time.Now() inactivityTicker := time.NewTicker(inactivityDuration) @@ -889,36 +965,48 @@ func (s *Sync) Run(ctx context.Context) error { if s.config.PolygonPosSingleSlotFinality { block, err := s.execution.CurrentHeader(ctx) if err != nil { - return err + return false, err } if block.Number.Uint64() >= s.config.PolygonPosSingleSlotFinalityBlockAt { s.engineAPISwitcher.SetConsuming(true) - return nil + return false, nil } } + var checkAge bool switch event.Type { case EventTypeNewMilestone: if err = s.applyNewMilestoneOnTip(ctx, event.AsNewMilestone(), ccBuilder); err != nil { - return err + return false, err } case EventTypeNewBlock: if err = s.applyNewBlockOnTip(ctx, event.AsNewBlock(), ccBuilder); err != nil { - return err + return false, err } + checkAge = true case EventTypeNewBlockBatch: if err = s.applyNewBlockBatchOnTip(ctx, event.AsNewBlockBatch(), ccBuilder); err != nil { - return err + return false, err } + checkAge = true case EventTypeNewBlockHashes: if err = s.applyNewBlockHashesOnTip(ctx, event.AsNewBlockHashes(), ccBuilder); err != nil { - return err + return false, err } + checkAge = true default: panic(fmt.Sprintf("unexpected event type: %v", event.Type)) } + // After processing block events (not milestones, which are finality metadata), + // check if we've fallen too far behind and need to switch to batch catch-up mode. + if checkAge && s.lastTipAge > catchUpAgeThreshold { + s.logger.Info(syncLogPrefix("node is behind, switching to catch-up mode"), + "tipAge", s.lastTipAge, "threshold", catchUpAgeThreshold) + return true, nil + } + lastProcessedEventTime = time.Now() case <-inactivityTicker.C: if time.Since(lastProcessedEventTime) < inactivityDuration { @@ -927,7 +1015,7 @@ func (s *Sync) Run(ctx context.Context) error { s.logger.Info(syncLogPrefix("waiting for chain tip events...")) case <-ctx.Done(): - return ctx.Err() + return false, ctx.Err() } } } @@ -949,6 +1037,9 @@ func (s *Sync) initialiseCcb(ctx context.Context, result syncToTipResult) (*Cano if result.latestWaypoint.EndBlock().Uint64() > tipNum { return nil, fmt.Errorf("unexpected rootNum > tipNum: %d > %d", rootNum, tipNum) } + // Initialize lastFinalizedBlockNum from the latest waypoint (milestone or checkpoint) + s.lastFinalizedBlockNum = rootNum + s.logger.Debug(syncLogPrefix("initialized milestone finality"), "lastFinalizedBlock", s.lastFinalizedBlockNum) } s.logger.Debug(syncLogPrefix("initialising canonical chain builder"), "rootNum", rootNum, "tipNum", tipNum) @@ -1046,6 +1137,31 @@ func (s *Sync) syncToTip(ctx context.Context) (syncToTipResult, error) { } } + // If we didn't get a waypoint from sync (e.g., already at tip), fetch the latest milestone + // This ensures we have milestone finality info for the execution stage optimization + if finalisedTip.latestWaypoint == nil { + s.logger.Info(syncLogPrefix("no waypoint from sync, fetching latest milestone...")) + latestMilestone, ok, err := s.heimdallSync.SynchronizeMilestones(ctx) + if err != nil { + s.logger.Warn(syncLogPrefix("failed to get latest milestone for finality"), "err", err) + } else if ok && latestMilestone != nil { + finalisedTip.latestWaypoint = latestMilestone + s.logger.Info(syncLogPrefix("fetched latest milestone for finality"), + "milestoneEndBlock", latestMilestone.EndBlock().Uint64(), + ) + } else { + s.logger.Warn(syncLogPrefix("SynchronizeMilestones returned no milestone"), "ok", ok, "milestoneNil", latestMilestone == nil) + } + } else { + s.logger.Info(syncLogPrefix("waypoint from sync available"), + "waypointEndBlock", finalisedTip.latestWaypoint.EndBlock().Uint64(), + ) + } + + s.logger.Info(syncLogPrefix("syncToTip finished"), + "tipNum", finalisedTip.latestTip.Number.Uint64(), + "hasWaypoint", finalisedTip.latestWaypoint != nil, + ) return finalisedTip, nil } @@ -1091,8 +1207,14 @@ func (s *Sync) sync( return syncToTipResult{}, false, nil } + // Track the waypoint end block for forkchoice finality + waypointEndBlock := waypoint.EndBlock().Uint64() + if waypointEndBlock > s.lastFinalizedBlockNum { + s.lastFinalizedBlockNum = waypointEndBlock + } + // notify about latest waypoint end block so that eth_syncing API doesn't flicker on initial sync - s.notifications.NewLastBlockSeen(waypoint.EndBlock().Uint64()) + s.notifications.NewLastBlockSeen(waypointEndBlock) newTip, err := blockDownload(ctx, tip.Number.Uint64()+1, syncTo) if err != nil { diff --git a/polygon/sync/sync_test.go b/polygon/sync/sync_test.go new file mode 100644 index 00000000000..d5ae0f99b92 --- /dev/null +++ b/polygon/sync/sync_test.go @@ -0,0 +1,92 @@ +package sync + +import ( + "context" + "math/big" + "testing" + "time" + + "github.com/erigontech/erigon-lib/common" + "github.com/erigontech/erigon-lib/log/v3" + "github.com/erigontech/erigon/execution/types" + "go.uber.org/mock/gomock" +) + +// stubExecutionClient is a minimal no-op implementation of ExecutionClient for unit tests. +type stubExecutionClient struct{} + +func (s *stubExecutionClient) Prepare(context.Context) error { return nil } +func (s *stubExecutionClient) InsertBlocks(context.Context, []*types.Block) error { + return nil +} +func (s *stubExecutionClient) UpdateForkChoice(_ context.Context, tip *types.Header, _ *types.Header) (common.Hash, error) { + return tip.Hash(), nil +} +func (s *stubExecutionClient) CurrentHeader(context.Context) (*types.Header, error) { + return nil, nil +} +func (s *stubExecutionClient) GetHeader(context.Context, uint64) (*types.Header, error) { + return nil, nil +} +func (s *stubExecutionClient) GetTd(context.Context, uint64, common.Hash) (*big.Int, error) { + return big.NewInt(0), nil +} + +func newTestSync(t *testing.T) *Sync { + t.Helper() + ctrl := gomock.NewController(t) + store := NewMockStore(ctrl) + store.EXPECT().Flush(gomock.Any()).Return(nil).AnyTimes() + + return &Sync{ + store: store, + execution: &stubExecutionClient{}, + logger: log.New(), + } +} + +func TestCommitExecutionTracksLastTipAge(t *testing.T) { + s := newTestSync(t) + + // Create a header with timestamp 60 seconds in the past. + oldTime := time.Now().Add(-60 * time.Second) + header := &types.Header{ + Number: big.NewInt(100), + Time: uint64(oldTime.Unix()), + } + + err := s.commitExecution(context.Background(), header, header) + if err != nil { + t.Fatalf("commitExecution failed: %v", err) + } + + if s.lastTipAge <= catchUpAgeThreshold { + t.Errorf("expected lastTipAge > %v for a 60s-old block, got %v", catchUpAgeThreshold, s.lastTipAge) + } +} + +func TestCommitExecutionRecentBlockHasLowAge(t *testing.T) { + s := newTestSync(t) + + // Create a header with timestamp 2 seconds in the past. + recentTime := time.Now().Add(-2 * time.Second) + header := &types.Header{ + Number: big.NewInt(200), + Time: uint64(recentTime.Unix()), + } + + err := s.commitExecution(context.Background(), header, header) + if err != nil { + t.Fatalf("commitExecution failed: %v", err) + } + + if s.lastTipAge > catchUpAgeThreshold { + t.Errorf("expected lastTipAge < %v for a 2s-old block, got %v", catchUpAgeThreshold, s.lastTipAge) + } +} + +func TestCatchUpAgeThresholdValue(t *testing.T) { + if catchUpAgeThreshold != 30*time.Second { + t.Errorf("expected catchUpAgeThreshold = 30s, got %v", catchUpAgeThreshold) + } +} diff --git a/rpc/jsonrpc/erigon_block.go b/rpc/jsonrpc/erigon_block.go index e84c10fa6fa..add67574942 100644 --- a/rpc/jsonrpc/erigon_block.go +++ b/rpc/jsonrpc/erigon_block.go @@ -211,6 +211,14 @@ func (api *ErigonImpl) GetBalanceChangesInBlock(ctx context.Context, blockNrOrHa } defer tx.Rollback() + blockNumber, _, _, err := rpchelper.GetBlockNumber(ctx, blockNrOrHash, tx, api._blockReader, api.filters) + if err != nil { + return nil, err + } + if err := rpchelper.CheckBlockExecuted(tx, blockNumber); err != nil { + return nil, err + } + balancesMapping := make(map[common.Address]*hexutil.Big) latestState, err := rpchelper.CreateStateReader(ctx, tx, api._blockReader, blockNrOrHash, 0, api.filters, api.stateCache, api._txNumReader) if err != nil { diff --git a/rpc/jsonrpc/eth_accounts.go b/rpc/jsonrpc/eth_accounts.go index 908ed2bf9e6..2065885be3a 100644 --- a/rpc/jsonrpc/eth_accounts.go +++ b/rpc/jsonrpc/eth_accounts.go @@ -40,16 +40,12 @@ func (api *APIImpl) GetBalance(ctx context.Context, address common.Address, bloc } defer tx.Rollback() - // Check if the requested block is in the future - if blockNrOrHash.BlockNumber != nil && *blockNrOrHash.BlockNumber >= rpc.EarliestBlockNumber { - latestBlock, err := rpchelper.GetLatestBlockNumber(tx) - if err != nil { - return nil, err - } - requestedBlock := blockNrOrHash.BlockNumber.Uint64() - if latestBlock < requestedBlock { - return nil, fmt.Errorf("block number is in the future latest=%d requested=%d", latestBlock, requestedBlock) - } + blockNumber, _, _, err := rpchelper.GetBlockNumber(ctx, blockNrOrHash, tx, api._blockReader, api.filters) + if err != nil { + return nil, err + } + if err := rpchelper.CheckBlockExecuted(tx, blockNumber); err != nil { + return nil, err } reader, err := rpchelper.CreateStateReader(ctx, tx, api._blockReader, blockNrOrHash, 0, api.filters, api.stateCache, api._txNumReader) @@ -88,6 +84,15 @@ func (api *APIImpl) GetTransactionCount(ctx context.Context, address common.Addr return nil, fmt.Errorf("getTransactionCount cannot open tx: %w", err1) } defer tx.Rollback() + + blockNumber, _, _, err := rpchelper.GetBlockNumber(ctx, blockNrOrHash, tx, api._blockReader, api.filters) + if err != nil { + return nil, err + } + if err := rpchelper.CheckBlockExecuted(tx, blockNumber); err != nil { + return nil, err + } + reader, err := rpchelper.CreateStateReader(ctx, tx, api._blockReader, blockNrOrHash, 0, api.filters, api.stateCache, api._txNumReader) if err != nil { return nil, err @@ -107,6 +112,15 @@ func (api *APIImpl) GetCode(ctx context.Context, address common.Address, blockNr return nil, fmt.Errorf("getCode cannot open tx: %w", err1) } defer tx.Rollback() + + blockNumber, _, _, err := rpchelper.GetBlockNumber(ctx, blockNrOrHash, tx, api._blockReader, api.filters) + if err != nil { + return nil, err + } + if err := rpchelper.CheckBlockExecuted(tx, blockNumber); err != nil { + return nil, err + } + reader, err := rpchelper.CreateStateReader(ctx, tx, api._blockReader, blockNrOrHash, 0, api.filters, api.stateCache, api._txNumReader) if err != nil { return nil, err @@ -139,6 +153,14 @@ func (api *APIImpl) GetStorageAt(ctx context.Context, address common.Address, in } defer tx.Rollback() + blockNumber, _, _, err := rpchelper.GetBlockNumber(ctx, blockNrOrHash, tx, api._blockReader, api.filters) + if err != nil { + return hexutil.Encode(common.LeftPadBytes(empty, 32)), err + } + if err := rpchelper.CheckBlockExecuted(tx, blockNumber); err != nil { + return hexutil.Encode(common.LeftPadBytes(empty, 32)), err + } + reader, err := rpchelper.CreateStateReader(ctx, tx, api._blockReader, blockNrOrHash, 0, api.filters, api.stateCache, api._txNumReader) if err != nil { return hexutil.Encode(common.LeftPadBytes(empty, 32)), err @@ -161,6 +183,14 @@ func (api *APIImpl) Exist(ctx context.Context, address common.Address, blockNrOr } defer tx.Rollback() + blockNumber, _, _, err := rpchelper.GetBlockNumber(ctx, blockNrOrHash, tx, api._blockReader, api.filters) + if err != nil { + return false, err + } + if err := rpchelper.CheckBlockExecuted(tx, blockNumber); err != nil { + return false, err + } + reader, err := rpchelper.CreateStateReader(ctx, tx, api._blockReader, blockNrOrHash, 0, api.filters, api.stateCache, api._txNumReader) if err != nil { return false, err diff --git a/rpc/jsonrpc/eth_call.go b/rpc/jsonrpc/eth_call.go index 3f1230e0b5f..413347f72b0 100644 --- a/rpc/jsonrpc/eth_call.go +++ b/rpc/jsonrpc/eth_call.go @@ -84,6 +84,14 @@ func (api *APIImpl) Call(ctx context.Context, args ethapi2.CallArgs, requestedBl args.Gas = (*hexutil.Uint64)(&api.GasCap) } + blockNumber, _, _, err := rpchelper.GetBlockNumber(ctx, blockNrOrHash, tx, api._blockReader, api.filters) + if err != nil { + return nil, err + } + if err := rpchelper.CheckBlockExecuted(tx, blockNumber); err != nil { + return nil, err + } + header, _, err := headerByNumberOrHash(ctx, tx, blockNrOrHash, api) if err != nil { return nil, err @@ -120,7 +128,7 @@ func headerByNumberOrHash(ctx context.Context, tx kv.Tx, blockNrOrHash rpc.Block } block := api.tryBlockFromLru(hash) if block != nil { - return block.Header(), false, nil + return block.Header(), isLatest, nil } blockNum, _, _, err := rpchelper.GetBlockNumber(ctx, blockNrOrHash, tx, api._blockReader, api.filters) @@ -160,6 +168,14 @@ func (api *APIImpl) EstimateGas(ctx context.Context, argsOrNil *ethapi2.CallArgs } engine := api.engine() + blockNum, _, _, err := rpchelper.GetBlockNumber(ctx, *blockNrOrHash, dbtx, api._blockReader, api.filters) + if err != nil { + return 0, err + } + if err := rpchelper.CheckBlockExecuted(dbtx, blockNum); err != nil { + return 0, err + } + header, isLatest, err := headerByNumberOrHash(ctx, dbtx, *blockNrOrHash, api) if err != nil { return 0, err @@ -768,6 +784,14 @@ func (api *APIImpl) CreateAccessList(ctx context.Context, args ethapi2.CallArgs, } engine := api.engine() + accessListBlockNum, _, _, err := rpchelper.GetBlockNumber(ctx, *blockNrOrHash, tx, api._blockReader, api.filters) + if err != nil { + return nil, err + } + if err := rpchelper.CheckBlockExecuted(tx, accessListBlockNum); err != nil { + return nil, err + } + header, latest, err := headerByNumberOrHash(ctx, tx, *blockNrOrHash, api) if err != nil { return nil, err diff --git a/rpc/jsonrpc/eth_callMany.go b/rpc/jsonrpc/eth_callMany.go index 6edba177c68..3ca7fad042b 100644 --- a/rpc/jsonrpc/eth_callMany.go +++ b/rpc/jsonrpc/eth_callMany.go @@ -126,6 +126,9 @@ func (api *APIImpl) CallMany(ctx context.Context, bundles []Bundle, simulateCont if err != nil { return nil, err } + if err := rpchelper.CheckBlockExecuted(tx, blockNum); err != nil { + return nil, err + } block, err := api.blockWithSenders(ctx, tx, hash, blockNum) if err != nil { diff --git a/rpc/jsonrpc/otterscan_has_code.go b/rpc/jsonrpc/otterscan_has_code.go index 1c01c7501b6..840de7f845a 100644 --- a/rpc/jsonrpc/otterscan_has_code.go +++ b/rpc/jsonrpc/otterscan_has_code.go @@ -36,6 +36,9 @@ func (api *OtterscanAPIImpl) HasCode(ctx context.Context, address common.Address if err != nil { return false, err } + if err := rpchelper.CheckBlockExecuted(tx, blockNumber); err != nil { + return false, err + } reader, err := rpchelper.CreateHistoryStateReader(tx, blockNumber, 0, api._txNumReader) if err != nil { diff --git a/rpc/jsonrpc/otterscan_search_trace.go b/rpc/jsonrpc/otterscan_search_trace.go index 4802a13f1b6..d2220b4eb04 100644 --- a/rpc/jsonrpc/otterscan_search_trace.go +++ b/rpc/jsonrpc/otterscan_search_trace.go @@ -65,6 +65,10 @@ func (api *OtterscanAPIImpl) traceBlock(dbtx kv.TemporalTx, ctx context.Context, return false, nil, fmt.Errorf("canonical hash not found %d", blockNum) } + if err := rpchelper.CheckBlockExecuted(dbtx, blockNum); err != nil { + return false, nil, err + } + block, err := api.blockWithSenders(ctx, dbtx, blockHash, blockNum) if err != nil { return false, nil, err diff --git a/rpc/jsonrpc/receipts/bor_receipts_generator.go b/rpc/jsonrpc/receipts/bor_receipts_generator.go index d52d4506b11..26510524b52 100644 --- a/rpc/jsonrpc/receipts/bor_receipts_generator.go +++ b/rpc/jsonrpc/receipts/bor_receipts_generator.go @@ -18,6 +18,7 @@ import ( "github.com/erigontech/erigon/execution/consensus" "github.com/erigontech/erigon/execution/types" bortypes "github.com/erigontech/erigon/polygon/bor/types" + "github.com/erigontech/erigon/rpc/rpchelper" "github.com/erigontech/erigon/turbo/services" "github.com/erigontech/erigon/turbo/transactions" ) @@ -55,6 +56,10 @@ func (g *BorGenerator) GenerateBorReceipt(ctx context.Context, tx kv.TemporalTx, return receipt, nil } + if err := rpchelper.CheckBlockExecuted(tx, block.NumberU64()); err != nil { + return nil, err + } + // Post Madhugiri HF, state-sync txn is part of block body so calculate index accordingly. txIndex := len(block.Transactions()) if chainConfig.Bor.IsMadhugiri(block.NumberU64()) { diff --git a/rpc/jsonrpc/receipts/receipts_generator.go b/rpc/jsonrpc/receipts/receipts_generator.go index 3fc470d23fa..da8940ae1c1 100644 --- a/rpc/jsonrpc/receipts/receipts_generator.go +++ b/rpc/jsonrpc/receipts/receipts_generator.go @@ -25,6 +25,7 @@ import ( "github.com/erigontech/erigon/execution/chain" "github.com/erigontech/erigon/execution/consensus" "github.com/erigontech/erigon/execution/types" + "github.com/erigontech/erigon/rpc/rpchelper" "github.com/erigontech/erigon/turbo/services" "github.com/erigontech/erigon/turbo/transactions" ) @@ -158,6 +159,10 @@ func (g *Generator) GetReceipt(ctx context.Context, cfg *chain.Config, tx kv.Tem return nil, fmt.Errorf("ReceiptGen.GetReceipt: txn is a state-sync transaction") } + if err := rpchelper.CheckBlockExecuted(tx, header.Number.Uint64()); err != nil { + return nil, err + } + blockHash := header.Hash() blockNum := header.Number.Uint64() txnHash := txn.Hash() @@ -289,6 +294,10 @@ func (g *Generator) GetReceipt(ctx context.Context, cfg *chain.Config, tx kv.Tem // GetReceipts regenerates or loads receipts for a given block // This DOES NOT generate state-sync transaction receipt for bor. func (g *Generator) GetReceipts(ctx context.Context, cfg *chain.Config, tx kv.TemporalTx, block *types.Block) (types.Receipts, error) { + if err := rpchelper.CheckBlockExecuted(tx, block.NumberU64()); err != nil { + return nil, err + } + blockHash := block.Hash() var receiptsFromDB types.Receipts diff --git a/rpc/jsonrpc/trace_adhoc.go b/rpc/jsonrpc/trace_adhoc.go index a32250a8157..d3063e23010 100644 --- a/rpc/jsonrpc/trace_adhoc.go +++ b/rpc/jsonrpc/trace_adhoc.go @@ -1050,6 +1050,9 @@ func (api *TraceAPIImpl) Call(ctx context.Context, args TraceCallParam, traceTyp if err != nil { return nil, err } + if err := rpchelper.CheckBlockExecuted(tx, blockNumber); err != nil { + return nil, err + } header, err := api.headerByRPCNumber(ctx, rpc.BlockNumber(blockNumber), tx) if err != nil { @@ -1236,6 +1239,9 @@ func (api *TraceAPIImpl) CallMany(ctx context.Context, calls json.RawMessage, pa if err != nil { return nil, err } + if err := rpchelper.CheckBlockExecuted(tx, blockNumber); err != nil { + return nil, err + } parentHeader, err := api.headerByRPCNumber(ctx, rpc.BlockNumber(blockNumber), tx) if err != nil { diff --git a/rpc/jsonrpc/trace_filtering.go b/rpc/jsonrpc/trace_filtering.go index 8d9208eafb3..f6072f74192 100644 --- a/rpc/jsonrpc/trace_filtering.go +++ b/rpc/jsonrpc/trace_filtering.go @@ -185,6 +185,9 @@ func (api *TraceAPIImpl) Block(ctx context.Context, blockNr rpc.BlockNumber, gas if blockNum == 0 { return []ParityTrace{}, nil } + if err := rpchelper.CheckBlockExecuted(tx, blockNum); err != nil { + return nil, err + } bn := hexutil.Uint64(blockNum) // Extract transactions from block @@ -757,6 +760,10 @@ func (api *TraceAPIImpl) callBlock( RequireCanonical: true, } + if err := rpchelper.CheckBlockExecuted(dbtx, block.NumberU64()); err != nil { + return nil, nil, err + } + stateReader, err := rpchelper.CreateStateReader(ctx, dbtx, api._blockReader, parentNrOrHash, 0, api.filters, api.stateCache, api._txNumReader) if err != nil { return nil, nil, err @@ -873,6 +880,10 @@ func (api *TraceAPIImpl) callTransaction( RequireCanonical: true, } + if err := rpchelper.CheckBlockExecuted(dbtx, blockNumber); err != nil { + return nil, err + } + stateReader, err := rpchelper.CreateStateReader(ctx, dbtx, api._blockReader, parentNrOrHash, 0, api.filters, api.stateCache, api._txNumReader) if err != nil { return nil, err diff --git a/rpc/jsonrpc/tracing.go b/rpc/jsonrpc/tracing.go index a04e5701610..0120e8f1cc7 100644 --- a/rpc/jsonrpc/tracing.go +++ b/rpc/jsonrpc/tracing.go @@ -64,6 +64,9 @@ func (api *DebugAPIImpl) traceBlock(ctx context.Context, blockNrOrHash rpc.Block if err != nil { return err } + if err := rpchelper.CheckBlockExecuted(tx, blockNumber); err != nil { + return err + } if blockNumber == 0 { stream.WriteNil() @@ -373,6 +376,9 @@ func (api *DebugAPIImpl) TraceCall(ctx context.Context, args ethapi.CallArgs, bl if err != nil { return fmt.Errorf("get block number: %v", err) } + if err := rpchelper.CheckBlockExecuted(dbtx, blockNumber); err != nil { + return err + } err = api.BaseAPI.checkPruneHistory(ctx, dbtx, blockNumber) if err != nil { @@ -493,6 +499,9 @@ func (api *DebugAPIImpl) TraceCallMany(ctx context.Context, bundles []Bundle, si if err != nil { return err } + if err := rpchelper.CheckBlockExecuted(tx, blockNum); err != nil { + return err + } err = api.BaseAPI.checkPruneHistory(ctx, tx, blockNum) if err != nil { diff --git a/rpc/rpchelper/helper.go b/rpc/rpchelper/helper.go index 4d43340d3a6..cc2a59f5087 100644 --- a/rpc/rpchelper/helper.go +++ b/rpc/rpchelper/helper.go @@ -126,6 +126,19 @@ func _GetBlockNumber(ctx context.Context, requireCanonical bool, blockNrOrHash r return blockNumber, hash, blockNumber == plainStateBlockNumber, true, nil } +func CheckBlockExecuted(tx kv.Tx, blockNumber uint64) error { + lastExecutedBlock, err := stages.GetStageProgress(tx, stages.Execution) + if err != nil { + return err + } + + if blockNumber > lastExecutedBlock { + return fmt.Errorf("block %d is not executed yet (last executed block: %d)", blockNumber, lastExecutedBlock) + } + + return nil +} + func CreateStateReader(ctx context.Context, tx kv.TemporalTx, br services.FullBlockReader, blockNrOrHash rpc.BlockNumberOrHash, txnIndex int, filters *Filters, stateCache kvcache.Cache, txNumReader rawdbv3.TxNumsReader) (state.StateReader, error) { blockNumber, _, latest, _, err := _GetBlockNumber(ctx, true, blockNrOrHash, tx, br, filters) if err != nil { diff --git a/turbo/cli/default_flags.go b/turbo/cli/default_flags.go index b885a0df39a..731d7d91b37 100644 --- a/turbo/cli/default_flags.go +++ b/turbo/cli/default_flags.go @@ -255,5 +255,6 @@ var DefaultFlags = []cli.Flag{ &utils.GDBMeFlag, &utils.ExperimentalConcurrentCommitmentFlag, + &utils.UseForkchoiceFinalityFlag, &utils.ElBlockDownloaderV2, } diff --git a/turbo/cli/flags.go b/turbo/cli/flags.go index 088b6ac5be3..1d412b67721 100644 --- a/turbo/cli/flags.go +++ b/turbo/cli/flags.go @@ -320,6 +320,8 @@ func ApplyFlagsForEthConfig(ctx *cli.Context, cfg *ethconfig.Config, logger log. if ctx.Bool(utils.ChaosMonkeyFlag.Name) { cfg.ChaosMonkey = true } + + cfg.Sync.UseForkchoiceFinality = ctx.Bool(utils.UseForkchoiceFinalityFlag.Name) } func ApplyFlagsForEthConfigCobra(f *pflag.FlagSet, cfg *ethconfig.Config) { diff --git a/turbo/node/node.go b/turbo/node/node.go index 7e14ef8a5f4..9eb1b0917d7 100644 --- a/turbo/node/node.go +++ b/turbo/node/node.go @@ -57,7 +57,7 @@ func (eri *ErigonNode) Serve() error { eri.stack.Wait() - return nil + return eri.backend.BgComponentError() } func (eri *ErigonNode) Backend() *eth.Ethereum {