From 1ddecc8ec4e3d623d69fc5d44de5fc54291fb638 Mon Sep 17 00:00:00 2001 From: Fridrik Asmundsson Date: Fri, 6 Feb 2026 17:48:33 +0000 Subject: [PATCH 1/5] feat: add sequencer mode for flashblock preconfirmations --- Cargo.lock | 5 + Cargo.toml | 11 +- src/flashblocks/mod.rs | 41 +- src/flashblocks/test_utils.rs | 2 + src/lib.rs | 1 + src/main.rs | 95 ++++- src/node/evm/builder.rs | 44 ++- src/node/evm/mod.rs | 4 +- src/sequencer/builder.rs | 693 ++++++++++++++++++++++++++++++++++ src/sequencer/mod.rs | 37 ++ src/sequencer/publisher.rs | 186 +++++++++ src/sequencer/signing.rs | 252 +++++++++++++ tests/e2e/flashblocks.rs | 2 + 13 files changed, 1357 insertions(+), 16 deletions(-) create mode 100644 src/sequencer/builder.rs create mode 100644 src/sequencer/mod.rs create mode 100644 src/sequencer/publisher.rs create mode 100644 src/sequencer/signing.rs diff --git a/Cargo.lock b/Cargo.lock index ca5702ba..3d41ac25 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1489,11 +1489,13 @@ dependencies = [ "alloy-sol-macro", "alloy-sol-types", "async-trait", + "blst", "bytes", "clap", "derive_more", "eyre", "futures-util", + "hex", "jsonrpsee", "jsonrpsee-core", "jsonrpsee-proc-macros", @@ -1536,6 +1538,7 @@ dependencies = [ "reth-rpc-eth-types", "reth-storage-api", "reth-transaction-pool", + "revm", "revm-context-interface", "revm-inspectors", "serde", @@ -1544,6 +1547,8 @@ dependencies = [ "test-fuzz", "thiserror 2.0.17", "tokio", + "tokio-tungstenite", + "tokio-util", "tracing", "vergen", "vergen-git2", diff --git a/Cargo.toml b/Cargo.toml index 50ab389e..be9c1fef 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,10 +17,12 @@ alloy-serde = "1.1.2" alloy-signer-local = "1.1.2" alloy-sol-macro = "1.4.1" alloy-sol-types = "1.4.1" +blst = "0.3" bytes = "1.10.1" clap = { version = "4.5.40", features = ["derive"] } derive_more = "2.0.1" eyre = "0.6.12" +hex = "0.4" sha2 = "0.10" # rpc @@ -66,18 +68,21 @@ reth-rpc-eth-api = { git = "https://github.com/paradigmxyz/reth", rev = "536bebf reth-rpc-eth-types = { git = "https://github.com/paradigmxyz/reth", rev = "536bebfcd" } reth-storage-api = { git = "https://github.com/paradigmxyz/reth", rev = "536bebfcd" } reth-transaction-pool = { git = "https://github.com/paradigmxyz/reth", rev = "536bebfcd" } +revm = "33" revm-context-interface = "13.1.0" serde = { version = "1.0", features = ["derive"], default-features = false } +serde_json = "1.0" thiserror = "2.0" -tokio = "1.46.0" +tokio = { version = "1.46.0", features = ["sync", "net", "macros"] } +tokio-tungstenite = "0.26" +tokio-util = "0.7" tracing = "0.1.41" +futures-util = "0.3" [dev-dependencies] alloy-provider = "1.1.2" alloy-rpc-client = "1.1.2" alloy-rpc-types-trace = "1.1.2" -eyre = "0.6.12" -futures-util = { version = "0.3", default-features = false } reth-e2e-test-utils = { git = "https://github.com/paradigmxyz/reth", rev = "536bebfcd" } reth-rpc-builder = { git = "https://github.com/paradigmxyz/reth", rev = "536bebfcd" } revm-inspectors = "0.33.0" diff --git a/src/flashblocks/mod.rs b/src/flashblocks/mod.rs index e581cd86..a0becb07 100644 --- a/src/flashblocks/mod.rs +++ b/src/flashblocks/mod.rs @@ -1,7 +1,11 @@ #[cfg(test)] pub mod test_utils; -use crate::{primitives::header::BlsPublicKey, transaction::BerachainTxEnvelope}; +use crate::{ + primitives::header::BlsPublicKey, + sequencer::signing::BlsSignature, + transaction::BerachainTxEnvelope, +}; use alloy_consensus::{crypto::RecoveryError, transaction::Recovered}; use alloy_eips::{ eip2718::WithEncoded, @@ -117,13 +121,46 @@ pub struct BerachainFlashblockPayloadMetadata { pub block_number: u64, } -#[derive(Clone, Debug, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)] pub struct BerachainFlashblockPayload { pub payload_id: PayloadId, pub index: u64, pub base: Option, pub diff: BerachainFlashblockPayloadDiff, pub metadata: BerachainFlashblockPayloadMetadata, + #[serde(with = "signature_serde")] + pub signature: BlsSignature, + pub is_last: bool, +} + +mod signature_serde { + use super::BlsSignature; + use serde::{Deserialize, Deserializer, Serializer}; + + pub fn serialize(sig: &BlsSignature, serializer: S) -> Result + where + S: Serializer, + { + serializer.serialize_str(&format!("0x{}", hex::encode(sig))) + } + + pub fn deserialize<'de, D>(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let hex_str = String::deserialize(deserializer)?; + let hex_str = hex_str.trim_start_matches("0x"); + let bytes = hex::decode(hex_str).map_err(serde::de::Error::custom)?; + if bytes.len() != 96 { + return Err(serde::de::Error::custom(format!( + "expected 96 bytes, got {}", + bytes.len() + ))); + } + let mut arr = [0u8; 96]; + arr.copy_from_slice(&bytes); + Ok(arr) + } } impl BerachainFlashblockPayload { diff --git a/src/flashblocks/test_utils.rs b/src/flashblocks/test_utils.rs index 2648c370..75dd67a0 100644 --- a/src/flashblocks/test_utils.rs +++ b/src/flashblocks/test_utils.rs @@ -164,6 +164,8 @@ impl BerachainTestFlashBlockBuilder { blob_gas_used: None, }, metadata: BerachainFlashblockPayloadMetadata { block_number: self.block_number }, + signature: [0u8; 96], + is_last: false, } } } diff --git a/src/lib.rs b/src/lib.rs index 59e85b89..41f537e9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -13,6 +13,7 @@ pub mod node; pub mod pool; pub mod primitives; pub mod rpc; +pub mod sequencer; pub mod transaction; pub mod version; diff --git a/src/main.rs b/src/main.rs index 17857854..168e740f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,16 +8,38 @@ use bera_reth::{ consensus::BerachainBeaconConsensus, evm::BerachainEvmFactory, node::{BerachainNode, evm::config::BerachainEvmConfig}, + sequencer::{FlashblockPayloadServiceBuilder, FlashblockSigner, SequencerConfig, WebSocketPublisher}, version::init_bera_version, }; use clap::Parser; use reth::CliRunner; -use reth_cli_commands::node::NoArgs; +use reth_chainspec::EthChainSpec; use reth_ethereum_cli::Cli; -use reth_node_builder::NodeHandle; -use std::sync::Arc; +use reth_node_builder::{components::BasicPayloadServiceBuilder, Node, NodeHandle}; +use std::{net::SocketAddr, path::PathBuf, sync::Arc}; +use tokio_util::sync::CancellationToken; use tracing::info; +/// Sequencer-specific CLI arguments +#[derive(Debug, Clone, clap::Args)] +pub struct SequencerArgs { + /// Enable sequencer mode for flashblock production + #[arg(long, default_value = "false")] + pub sequencer_enabled: bool, + + /// Flashblock emission interval in milliseconds + #[arg(long, default_value = "200")] + pub flashblock_interval_ms: u64, + + /// WebSocket address for flashblock publishing + #[arg(long, default_value = "0.0.0.0:8548")] + pub flashblock_ws_addr: SocketAddr, + + /// Path to BLS secret key file for signing flashblocks (hex-encoded 32-byte key) + #[arg(long)] + pub flashblock_signing_key: Option, +} + fn main() { // Install signal handler for better crash reporting reth_cli_util::sigsegv_handler::install(); @@ -37,16 +59,71 @@ fn main() { ) }; - if let Err(err) = Cli::::parse() + if let Err(err) = Cli::::parse() .with_runner_and_components::( CliRunner::try_default_runtime().expect("Failed to create default runtime"), cli_components_builder, - async move |builder, _| { - info!(target: "reth::cli", "Launching Berachain node"); - let NodeHandle { node: _node, node_exit_future } = - builder.node(BerachainNode::default()).launch_with_debug_capabilities().await?; + async move |builder, extra_args| { + if extra_args.sequencer_enabled { + // Signing key is required in sequencer mode + let chain_id = builder.config().chain.chain().id(); + let key_path = extra_args.flashblock_signing_key.ok_or_else(|| { + eyre::eyre!("--flashblock-signing-key is required in sequencer mode") + })?; + let signer = FlashblockSigner::from_file(&key_path, chain_id) + .map_err(|e| eyre::eyre!("failed to load signing key from {:?}: {}", key_path, e))?; + + info!( + target: "reth::cli", + interval_ms = extra_args.flashblock_interval_ms, + ws_addr = %extra_args.flashblock_ws_addr, + pubkey = %hex::encode(signer.public_key_bytes()), + "Launching Berachain node in SEQUENCER mode" + ); + + let config = SequencerConfig::new( + extra_args.flashblock_interval_ms, + extra_args.flashblock_ws_addr, + signer, + ); + + let publisher = Arc::new(WebSocketPublisher::new(config.ws_addr)); + let ws_cancel = CancellationToken::new(); + + // Spawn WebSocket server + let ws_publisher = publisher.clone(); + let ws_cancel_token = ws_cancel.clone(); + tokio::spawn(async move { + if let Err(e) = ws_publisher.run(ws_cancel_token).await { + tracing::error!(target: "sequencer::publisher", error = %e, "WebSocket server error"); + } + }); + + // Build node with flashblock payload builder + let berachain_node = BerachainNode::default(); + let flashblock_builder = FlashblockPayloadServiceBuilder::new(config, publisher); + + let NodeHandle { node: _node, node_exit_future } = builder + .with_types::() + .with_components( + berachain_node + .components_builder() + .payload(BasicPayloadServiceBuilder::new(flashblock_builder)), + ) + .with_add_ons(berachain_node.add_ons()) + .launch_with_debug_capabilities() + .await?; + + let result = node_exit_future.await; + ws_cancel.cancel(); + result + } else { + info!(target: "reth::cli", "Launching Berachain node"); + let NodeHandle { node: _node, node_exit_future } = + builder.node(BerachainNode::default()).launch_with_debug_capabilities().await?; - node_exit_future.await + node_exit_future.await + } }, ) { diff --git a/src/node/evm/builder.rs b/src/node/evm/builder.rs index eea7e2c0..e35ccb17 100644 --- a/src/node/evm/builder.rs +++ b/src/node/evm/builder.rs @@ -3,7 +3,13 @@ use crate::{ transaction::BerachainTxEnvelope, }; use alloy_consensus::BlockHeader; -use reth::revm::context::result::ExecutionResult; +use reth::revm::{ + State, + context::result::ExecutionResult, + db::states::bundle_state::BundleRetention, + database_interface::Database, +}; +use revm::database::BundleState; use reth_evm::{ Evm, block::{BlockExecutionError, BlockExecutor, CommitChanges}, @@ -13,6 +19,42 @@ use reth_primitives_traits::RecoveredBlock; use reth_storage_api::StateProvider; use std::sync::Arc; +/// Trait for accessing state data needed for flashblock state root computation. +/// +/// This trait abstracts the `State` type's methods that we need for computing +/// intermediate state roots during flashblock production. By using a trait instead +/// of concrete types, we can work through the `BlockBuilder` trait abstraction. +pub trait FlashblockState { + /// Merge pending transitions into the bundle state. + /// + /// This accumulates state changes so they can be used for state root computation. + fn merge_transitions_for_flashblock(&mut self); + + /// Get a reference to the accumulated bundle state. + fn bundle_state(&self) -> &BundleState; +} + +impl FlashblockState for State { + fn merge_transitions_for_flashblock(&mut self) { + self.merge_transitions(BundleRetention::PlainState); + } + + fn bundle_state(&self) -> &BundleState { + &self.bundle_state + } +} + +// Blanket impl for mutable references (needed because Evm::DB is &'a mut State) +impl FlashblockState for &mut T { + fn merge_transitions_for_flashblock(&mut self) { + (*self).merge_transitions_for_flashblock() + } + + fn bundle_state(&self) -> &BundleState { + (**self).bundle_state() + } +} + type EResult = ExecutionResult<<::Evm as Evm>::HaltReason>; /// Berachain block builder wrapper that fixes sender/transaction mismatch from PoL injection. diff --git a/src/node/evm/mod.rs b/src/node/evm/mod.rs index afe8e57d..ecb82109 100644 --- a/src/node/evm/mod.rs +++ b/src/node/evm/mod.rs @@ -2,12 +2,14 @@ mod assembler; mod block_context; -mod builder; +pub mod builder; pub mod config; pub mod error; pub mod executor; pub mod receipt; +pub use builder::{BerachainBlockBuilder, FlashblockState}; + use crate::{ evm::BerachainEvmFactory, node::{BerachainNode, evm::config::BerachainEvmConfig}, diff --git a/src/sequencer/builder.rs b/src/sequencer/builder.rs new file mode 100644 index 00000000..d8a2969c --- /dev/null +++ b/src/sequencer/builder.rs @@ -0,0 +1,693 @@ +//! Flashblock-aware payload builder for sequencer mode. +//! +//! This builder produces flashblocks at regular intervals (~200ms) while building +//! a block, publishing them via WebSocket for preconfirmation subscribers. + +use crate::{ + chainspec::BerachainChainSpec, + engine::payload::{BerachainBuiltPayload, BerachainPayloadBuilderAttributes}, + flashblocks::{ + BerachainFlashblockPayload, BerachainFlashblockPayloadBase, BerachainFlashblockPayloadDiff, + BerachainFlashblockPayloadMetadata, + }, + hardforks::BerachainHardforks, + node::evm::{ + FlashblockState, + config::{BerachainEvmConfig, BerachainNextBlockEnvAttributes}, + }, + primitives::BerachainHeader, + sequencer::{ + signing::{compute_diff_hash, FlashblockSigner}, + SequencerConfig, WebSocketPublisher, + }, + transaction::BerachainTxEnvelope, +}; +use alloy_consensus::{ + Transaction, TxReceipt, EMPTY_OMMER_ROOT_HASH, EMPTY_ROOT_HASH, + proofs::{calculate_withdrawals_root, ordered_trie_root_with_encoder}, +}; +use alloy_eips::{eip2718::Encodable2718, eip4895::Withdrawal}; +use bytes::BufMut; +use alloy_primitives::{logs_bloom, B256, B64, Bloom, Bytes, Sealable, U256}; +use reth::{ + api::{FullNodeTypes, NodeTypes, PayloadBuilderError, PayloadTypes, TxTy}, + chainspec::EthereumHardforks, + providers::StateProviderFactory, + revm::{context::Block, database::StateProviderDatabase, State}, + transaction_pool::{PoolTransaction, TransactionPool}, +}; +use reth_basic_payload_builder::{ + BuildArguments, BuildOutcome, MissingPayloadBehaviour, PayloadBuilder, PayloadConfig, +}; +use reth_chainspec::{ChainSpecProvider, EthChainSpec}; +use reth_ethereum_payload_builder::EthereumBuilderConfig; +use reth_ethereum_primitives::Receipt; +use reth_evm::{ + block::{BlockExecutionError, BlockValidationError, CommitChanges}, + execute::{BlockBuilder, BlockBuilderOutcome}, + ConfigureEvm, Evm, +}; +use reth_node_builder::{components::PayloadBuilderBuilder, BuilderContext, PayloadBuilderConfig}; +use reth_payload_primitives::PayloadBuilderAttributes; +use reth_primitives_traits::transaction::error::InvalidTransactionError; +use reth_transaction_pool::{ + error::InvalidPoolTransactionError, BestTransactions, BestTransactionsAttributes, + ValidPoolTransaction, +}; +use std::{ + sync::Arc, + time::{Duration, Instant}, +}; +use tracing::{debug, info, trace, warn}; + +use crate::transaction::BerachainTxType; + +type BestTransactionsIter = Box< + dyn BestTransactions::Transaction>>>, +>; + +/// Service builder for creating flashblock-aware payload builders. +#[derive(Clone, Debug)] +pub struct FlashblockPayloadServiceBuilder { + config: SequencerConfig, + publisher: Arc, +} + +impl FlashblockPayloadServiceBuilder { + /// Create a new flashblock payload service builder. + pub fn new(config: SequencerConfig, publisher: Arc) -> Self { + Self { config, publisher } + } +} + +impl PayloadBuilderBuilder + for FlashblockPayloadServiceBuilder +where + Types: NodeTypes< + ChainSpec = BerachainChainSpec, + Primitives = crate::primitives::BerachainPrimitives, + >, + Node: FullNodeTypes, + Pool: TransactionPool>> + + Unpin + + 'static, + Types::Payload: PayloadTypes< + BuiltPayload = BerachainBuiltPayload, + PayloadAttributes = crate::engine::payload::BerachainPayloadAttributes, + PayloadBuilderAttributes = BerachainPayloadBuilderAttributes, + >, +{ + type PayloadBuilder = FlashblockPayloadBuilder; + + async fn build_payload_builder( + self, + ctx: &BuilderContext, + pool: Pool, + evm_config: BerachainEvmConfig, + ) -> eyre::Result { + let conf = ctx.payload_builder_config(); + let chain = ctx.chain_spec().chain(); + let gas_limit = conf.gas_limit_for(chain); + + Ok(FlashblockPayloadBuilder::new( + ctx.provider().clone(), + pool, + evm_config, + EthereumBuilderConfig::new().with_gas_limit(gas_limit), + self.config, + self.publisher, + conf.deadline(), + )) + } +} + +/// Flashblock-aware payload builder. +/// +/// This builder emits flashblocks at regular intervals while building a payload, +/// allowing preconfirmation subscribers to track transaction inclusion in real-time. +#[derive(Debug)] +pub struct FlashblockPayloadBuilder { + client: Client, + pool: Pool, + evm_config: BerachainEvmConfig, + builder_config: EthereumBuilderConfig, + sequencer_config: SequencerConfig, + publisher: Arc, + deadline: Duration, +} + +impl Clone for FlashblockPayloadBuilder { + fn clone(&self) -> Self { + Self { + client: self.client.clone(), + pool: self.pool.clone(), + evm_config: self.evm_config.clone(), + builder_config: self.builder_config.clone(), + sequencer_config: self.sequencer_config.clone(), + publisher: self.publisher.clone(), + deadline: self.deadline, + } + } +} + +impl FlashblockPayloadBuilder { + /// Create a new flashblock payload builder. + pub fn new( + client: Client, + pool: Pool, + evm_config: BerachainEvmConfig, + builder_config: EthereumBuilderConfig, + sequencer_config: SequencerConfig, + publisher: Arc, + deadline: Duration, + ) -> Self { + Self { client, pool, evm_config, builder_config, sequencer_config, publisher, deadline } + } +} + +impl PayloadBuilder for FlashblockPayloadBuilder +where + Client: StateProviderFactory + ChainSpecProvider + Clone, + Pool: TransactionPool>, +{ + type Attributes = BerachainPayloadBuilderAttributes; + type BuiltPayload = BerachainBuiltPayload; + + fn try_build( + &self, + args: BuildArguments, + ) -> Result, PayloadBuilderError> { + build_flashblock_payload( + self.evm_config.clone(), + self.client.clone(), + self.pool.clone(), + self.builder_config.clone(), + self.sequencer_config.clone(), + self.publisher.clone(), + self.deadline, + args, + |attributes| self.pool.best_transactions_with_attributes(attributes), + ) + } + + fn on_missing_payload( + &self, + _args: BuildArguments, + ) -> MissingPayloadBehaviour { + MissingPayloadBehaviour::AwaitInProgress + } + + fn build_empty_payload( + &self, + config: PayloadConfig, + ) -> Result { + let args = BuildArguments::new(Default::default(), config, Default::default(), None); + self.try_build(args)?.into_payload().ok_or_else(|| PayloadBuilderError::MissingPayload) + } +} + +/// Tracks execution data for flashblock emission. +struct FlashblockExecutionTracker { + /// Cumulative receipts for all executed transactions. + receipts: Vec>, + /// Encoded transactions for the current interval. + interval_transactions: Vec, + /// All encoded transactions. + all_transactions: Vec, + /// Cumulative gas used. + cumulative_gas_used: u64, + /// Total fees collected. + total_fees: U256, +} + +impl FlashblockExecutionTracker { + fn new() -> Self { + Self { + receipts: Vec::new(), + interval_transactions: Vec::new(), + all_transactions: Vec::new(), + cumulative_gas_used: 0, + total_fees: U256::ZERO, + } + } + + /// Compute the receipts root from accumulated receipts. + fn receipts_root(&self) -> B256 { + Receipt::calculate_receipt_root_no_memo(&self.receipts) + } + + /// Compute the logs bloom from accumulated receipts. + fn logs_bloom(&self) -> Bloom { + logs_bloom(self.receipts.iter().flat_map(|r| r.logs())) + } + + /// Clear interval transactions for next flashblock. + fn clear_interval(&mut self) { + self.interval_transactions.clear(); + } +} + +use reth_storage_api::{HashedPostStateProvider, StateRootProvider}; + +/// Compute the intermediate state root from a block builder. +/// +/// Merges pending state transitions and computes the state root. Requires +/// the builder's database to implement [`FlashblockState`]. +fn compute_intermediate_state_root( + builder: &mut B, + state_provider: &S, +) -> reth_storage_api::errors::ProviderResult +where + B: BlockBuilder, + S: StateRootProvider + HashedPostStateProvider, + <::Evm as Evm>::DB: FlashblockState, +{ + let db = builder.evm_mut().db_mut(); + db.merge_transitions_for_flashblock(); + let hashed_state = state_provider.hashed_post_state(db.bundle_state()); + state_provider.state_root(hashed_state) +} + +/// Build a payload while emitting flashblocks at regular intervals. +#[allow(clippy::too_many_arguments)] +fn build_flashblock_payload( + evm_config: BerachainEvmConfig, + client: Client, + _pool: Pool, + builder_config: EthereumBuilderConfig, + sequencer_config: SequencerConfig, + publisher: Arc, + deadline: Duration, + args: BuildArguments, + best_txs: F, +) -> Result, PayloadBuilderError> +where + Client: StateProviderFactory + ChainSpecProvider, + Pool: TransactionPool>, + F: FnOnce(BestTransactionsAttributes) -> BestTransactionsIter, +{ + let BuildArguments { mut cached_reads, config, cancel, best_payload: _ } = args; + let PayloadConfig { parent_header, attributes } = config; + + let state_provider = client.state_by_block_hash(parent_header.hash())?; + let state = StateProviderDatabase::new(&state_provider); + let mut db = + State::builder().with_database(cached_reads.as_db_mut(state)).with_bundle_update().build(); + + let mut builder = evm_config + .builder_for_next_block( + &mut db, + &parent_header, + BerachainNextBlockEnvAttributes { + timestamp: attributes.timestamp(), + suggested_fee_recipient: attributes.suggested_fee_recipient(), + prev_randao: attributes.prev_randao(), + gas_limit: builder_config.gas_limit(parent_header.gas_limit), + parent_beacon_block_root: attributes.parent_beacon_block_root(), + withdrawals: Some(attributes.withdrawals().clone()), + prev_proposer_pubkey: attributes.prev_proposer_pubkey, + }, + ) + .map_err(PayloadBuilderError::other)?; + + let chain_spec = client.chain_spec(); + let payload_id = attributes.id; + let block_number = parent_header.number + 1; + + info!( + target: "sequencer::builder", + id = %payload_id, + parent_hash = ?parent_header.hash(), + parent_number = parent_header.number, + block_number, + timestamp = attributes.timestamp(), + deadline_ms = deadline.as_millis(), + interval_ms = sequencer_config.interval.as_millis(), + "starting flashblock payload build" + ); + + let block_gas_limit: u64 = builder.evm_mut().block().gas_limit; + let base_fee = builder.evm_mut().block().basefee; + + let mut best_txs = best_txs(BestTransactionsAttributes::new( + base_fee, + builder.evm_mut().block().blob_gasprice().map(|gasprice| gasprice as u64), + )); + + // Apply pre-execution changes (PoL, withdrawals, etc.) + builder.apply_pre_execution_changes().map_err(|err| { + warn!(target: "sequencer::builder", %err, "failed to apply pre-execution changes"); + PayloadBuilderError::Internal(err.into()) + })?; + + // Check if Prague3 is active + if chain_spec.is_prague3_active_at_timestamp(attributes.timestamp()) { + return Err(PayloadBuilderError::Other(Box::from( + "Prague 3 block building is not supported", + ))); + } + + // Build the base payload for flashblock 0 + let base = BerachainFlashblockPayloadBase { + parent_beacon_block_root: attributes.parent_beacon_block_root().unwrap_or_default(), + parent_hash: parent_header.hash(), + fee_recipient: attributes.suggested_fee_recipient(), + prev_randao: attributes.prev_randao(), + block_number, + gas_limit: block_gas_limit, + timestamp: attributes.timestamp(), + extra_data: Bytes::default(), + base_fee_per_gas: U256::from(base_fee), + prev_proposer_pubkey: attributes.prev_proposer_pubkey, + }; + + // Withdrawals go in first flashblock (index 0) + let withdrawals: Vec = attributes.withdrawals().to_vec(); + + let mut tracker = FlashblockExecutionTracker::new(); + let mut flashblock_index = 0u64; + let mut last_flashblock_time = Instant::now(); + let interval = sequencer_config.interval; + let build_start_time = Instant::now(); + + // Helper to compute state root, emit flashblock, and handle errors. + // Returns true if emission succeeded. + let try_emit_flashblock = + |builder: &mut _, flashblock_index: u64, tracker: &_, is_last: bool| -> bool { + match compute_intermediate_state_root(builder, &state_provider) { + Ok(state_root) => { + emit_flashblock( + &publisher, + payload_id, + flashblock_index, + &base, + flashblock_index == 0, + tracker, + block_number, + &sequencer_config.signer, + state_root, + &withdrawals, + is_last, + ); + true + } + Err(e) => { + warn!( + target: "sequencer::builder", + payload_id = %payload_id, + index = flashblock_index, + error = %e, + "skipping flashblock emission due to state root computation failure" + ); + false + } + } + }; + + // Main transaction execution loop with flashblock emission. + // Flashblocks are emitted at regular intervals (~200ms) regardless of transaction activity. + // Empty flashblocks serve as heartbeats, allowing subscribers to detect liveness and + // track the current state even when no transactions are being processed. + loop { + // Check if cancelled (getPayload called). + // TODO: detect orphaned builds on new forkchoiceUpdated. + if cancel.is_cancelled() { + info!( + target: "sequencer::builder", + id = %payload_id, + flashblock_index, + total_txs = tracker.all_transactions.len(), + cumulative_gas = tracker.cumulative_gas_used, + "payload build cancelled (getPayload called), finalizing" + ); + break; + } + + // Check if deadline exceeded (--builder.deadline). + if build_start_time.elapsed() >= deadline { + info!( + target: "sequencer::builder", + id = %payload_id, + flashblock_index, + total_txs = tracker.all_transactions.len(), + cumulative_gas = tracker.cumulative_gas_used, + deadline_ms = deadline.as_millis(), + "payload build deadline reached, finalizing" + ); + break; + } + + // Check if gas limit reached (use a small buffer to account for minimum tx gas) + if tracker.cumulative_gas_used + 21_000 > block_gas_limit { + info!( + target: "sequencer::builder", + id = %payload_id, + flashblock_index, + total_txs = tracker.all_transactions.len(), + cumulative_gas = tracker.cumulative_gas_used, + block_gas_limit, + "block gas limit reached, finalizing" + ); + break; + } + + // Emit flashblock at regular intervals (may be empty, serving as heartbeat) + if last_flashblock_time.elapsed() >= interval { + if try_emit_flashblock(&mut builder, flashblock_index, &tracker, false) { + flashblock_index += 1; + tracker.clear_interval(); + } + last_flashblock_time = Instant::now(); + } + + // Try to get the next transaction + let Some(pool_tx) = best_txs.next() else { + // No transactions available, sleep briefly to prevent busy-waiting and check again + std::thread::sleep(Duration::from_millis(10)); + continue; + }; + + // Check gas limit + if tracker.cumulative_gas_used + pool_tx.gas_limit() > block_gas_limit { + best_txs.mark_invalid( + &pool_tx, + &InvalidPoolTransactionError::ExceedsGasLimit(pool_tx.gas_limit(), block_gas_limit), + ); + continue; + } + + let tx = pool_tx.to_consensus(); + let tx_hash = *tx.hash(); + + // Execute the transaction and capture the result + let mut execution_logs = Vec::new(); + + let result = builder.execute_transaction_with_commit_condition(tx.clone(), |exec_result| { + // Capture execution result before commit decision + // ExecutionResult contains the output with logs + if exec_result.is_success() { + execution_logs = exec_result.logs().to_vec(); + } + CommitChanges::Yes + }); + + let gas_used = match result { + Ok(Some(gas)) => gas, + Ok(None) => continue, // Transaction was not committed + Err(BlockExecutionError::Validation(BlockValidationError::InvalidTx { + error, .. + })) => { + if error.is_nonce_too_low() { + trace!(target: "sequencer::builder", %error, ?tx, "skipping nonce too low transaction"); + } else { + trace!(target: "sequencer::builder", %error, ?tx, "skipping invalid transaction"); + best_txs.mark_invalid( + &pool_tx, + &InvalidPoolTransactionError::Consensus( + InvalidTransactionError::TxTypeNotSupported, + ), + ); + } + continue; + } + Err(err) => return Err(PayloadBuilderError::evm(err)), + }; + + // Build receipt from execution result + let receipt = Receipt { + tx_type: tx.tx_type(), + success: true, + cumulative_gas_used: tracker.cumulative_gas_used + gas_used, + logs: execution_logs, + }; + tracker.receipts.push(receipt); + + // Encode transaction + let tx_bytes = Bytes::from(tx.inner().encoded_2718()); + + // Update tracking + let miner_fee = + tx.effective_tip_per_gas(base_fee).expect("fee is always valid; execution succeeded"); + tracker.total_fees += U256::from(miner_fee) * U256::from(gas_used); + tracker.cumulative_gas_used += gas_used; + + tracker.all_transactions.push(tx_bytes.clone()); + tracker.interval_transactions.push(tx_bytes); + + trace!( + target: "sequencer::builder", + tx_hash = ?tx_hash, + gas_used, + cumulative_gas_used = tracker.cumulative_gas_used, + "executed transaction" + ); + } + + // Always emit the final flashblock marked as last, even if empty. + // This signals to RPC nodes that no more flashblocks will arrive for this payload. + try_emit_flashblock(&mut builder, flashblock_index, &tracker, true); + + // Finalize the block + let BlockBuilderOutcome { execution_result, block, .. } = builder.finish(&state_provider)?; + + let requests = chain_spec + .is_prague_active_at_timestamp(attributes.timestamp()) + .then_some(execution_result.requests); + + let sealed_block = Arc::new(block.sealed_block().clone()); + info!( + target: "sequencer::builder", + id = %payload_id, + block_hash = %sealed_block.hash(), + block_number = sealed_block.number, + total_transactions = tracker.all_transactions.len(), + total_fees = %tracker.total_fees, + "sealed flashblock payload ready for getPayload" + ); + + let payload = BerachainBuiltPayload::new(payload_id, sealed_block, tracker.total_fees, requests); + + Ok(BuildOutcome::Better { payload, cached_reads }) +} + +#[allow(clippy::too_many_arguments)] +fn emit_flashblock( + publisher: &WebSocketPublisher, + payload_id: reth::rpc::types::engine::PayloadId, + index: u64, + base: &BerachainFlashblockPayloadBase, + include_base_in_payload: bool, + tracker: &FlashblockExecutionTracker, + block_number: u64, + signer: &FlashblockSigner, + state_root: B256, + withdrawals: &[Withdrawal], + is_last: bool, +) { + // Compute roots from accumulated receipts + let receipts_root = tracker.receipts_root(); + let logs_bloom = tracker.logs_bloom(); + + // Compute transactions root from all transactions (already encoded) + let transactions_root = ordered_trie_root_with_encoder( + &tracker.all_transactions, + |tx, buf| buf.put_slice(tx.as_ref()), + ); + + // Withdrawals are included in first flashblock only (index 0) + let (diff_withdrawals, withdrawals_root) = if include_base_in_payload { + let root = if withdrawals.is_empty() { + EMPTY_ROOT_HASH + } else { + calculate_withdrawals_root(withdrawals) + }; + (withdrawals.to_vec(), root) + } else { + (vec![], EMPTY_ROOT_HASH) + }; + + // Construct header to compute block_hash + let header = BerachainHeader { + parent_hash: base.parent_hash, + ommers_hash: EMPTY_OMMER_ROOT_HASH, + beneficiary: base.fee_recipient, + state_root, + transactions_root, + receipts_root, + withdrawals_root: Some(withdrawals_root), + logs_bloom, + difficulty: U256::ZERO, + number: block_number, + gas_limit: base.gas_limit, + gas_used: tracker.cumulative_gas_used, + timestamp: base.timestamp, + mix_hash: base.prev_randao, + nonce: B64::ZERO, + base_fee_per_gas: Some(base.base_fee_per_gas.to::()), + blob_gas_used: None, + excess_blob_gas: None, + parent_beacon_block_root: Some(base.parent_beacon_block_root), + requests_hash: None, + prev_proposer_pubkey: base.prev_proposer_pubkey, + extra_data: base.extra_data.clone(), + }; + + let block_hash = header.hash_slow(); + + let diff = BerachainFlashblockPayloadDiff { + state_root, + receipts_root, + logs_bloom, + gas_used: tracker.cumulative_gas_used, + block_hash, + transactions: tracker.interval_transactions.clone(), + withdrawals: diff_withdrawals, + withdrawals_root, + blob_gas_used: None, + }; + + let diff_hash = compute_diff_hash( + state_root, + receipts_root, + logs_bloom.as_slice(), + tracker.cumulative_gas_used, + block_hash, + &tracker.interval_transactions, + ); + let signature = signer.sign_flashblock(block_number, payload_id, index, diff_hash); + + let flashblock = BerachainFlashblockPayload { + payload_id, + index, + base: if include_base_in_payload { Some(base.clone()) } else { None }, + diff, + metadata: BerachainFlashblockPayloadMetadata { block_number }, + signature, + is_last, + }; + + match publisher.publish(&flashblock) { + Ok(count) => { + debug!( + target: "sequencer::builder", + payload_id = %payload_id, + index, + block_number, + block_hash = %block_hash, + transactions = tracker.interval_transactions.len(), + subscribers = count, + is_last, + "emitted flashblock" + ); + } + Err(e) => { + warn!( + target: "sequencer::builder", + payload_id = %payload_id, + index, + error = %e, + "failed to publish flashblock" + ); + } + } +} diff --git a/src/sequencer/mod.rs b/src/sequencer/mod.rs new file mode 100644 index 00000000..7452d6bd --- /dev/null +++ b/src/sequencer/mod.rs @@ -0,0 +1,37 @@ +//! Berachain sequencer module for flashblock production. +//! +//! This module provides the sequencer functionality that produces flashblocks +//! at regular intervals (~200ms) and publishes them via WebSocket. + +mod builder; +mod publisher; +pub mod signing; + +pub use builder::{FlashblockPayloadBuilder, FlashblockPayloadServiceBuilder}; +pub use publisher::WebSocketPublisher; +pub use signing::{BlsPublicKeyBytes, BlsSignature, FlashblockSigner, SigningError}; +pub use tokio_util::sync::CancellationToken; + +use std::{net::SocketAddr, sync::Arc, time::Duration}; + +/// Configuration for the sequencer. +#[derive(Debug, Clone)] +pub struct SequencerConfig { + /// Flashblock emission interval. + pub interval: Duration, + /// WebSocket address for flashblock publishing. + pub ws_addr: SocketAddr, + /// BLS signer for flashblock signing. + pub signer: Arc, +} + +impl SequencerConfig { + /// Create a new sequencer config with the required signer. + pub fn new(interval_ms: u64, ws_addr: SocketAddr, signer: FlashblockSigner) -> Self { + Self { + interval: Duration::from_millis(interval_ms), + ws_addr, + signer: Arc::new(signer), + } + } +} diff --git a/src/sequencer/publisher.rs b/src/sequencer/publisher.rs new file mode 100644 index 00000000..8e1cef4e --- /dev/null +++ b/src/sequencer/publisher.rs @@ -0,0 +1,186 @@ +//! WebSocket publisher for broadcasting flashblocks to subscribers. + +use crate::flashblocks::BerachainFlashblockPayload; +use futures_util::{SinkExt, StreamExt}; +use std::{ + io, + net::SocketAddr, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, +}; +use tokio::{ + net::{TcpListener, TcpStream}, + sync::broadcast, +}; +use tokio_tungstenite::{accept_async, tungstenite::Message}; +use tokio_util::sync::CancellationToken; +use tracing::{debug, error, info, warn}; + +/// Capacity for the flashblock broadcast channel. +/// At ~200ms intervals, 64 messages allows ~12.8 seconds of buffering for slow clients. +const FLASHBLOCK_CHANNEL_CAPACITY: usize = 64; + +/// WebSocket publisher that broadcasts flashblocks to all connected clients. +#[derive(Debug)] +pub struct WebSocketPublisher { + sender: broadcast::Sender, + address: SocketAddr, + subscriber_count: Arc, +} + +impl WebSocketPublisher { + /// Create a new WebSocket publisher. + pub fn new(address: SocketAddr) -> Self { + let (sender, _) = broadcast::channel(FLASHBLOCK_CHANNEL_CAPACITY); + Self { sender, address, subscriber_count: Arc::new(AtomicUsize::new(0)) } + } + + /// Get the number of active subscribers. + pub fn subscriber_count(&self) -> usize { + self.subscriber_count.load(Ordering::Relaxed) + } + + /// Get a receiver for flashblock messages. + pub fn subscribe(&self) -> broadcast::Receiver { + self.sender.subscribe() + } + + /// Publish a flashblock to all subscribers. + pub fn publish(&self, payload: &BerachainFlashblockPayload) -> io::Result { + let json = serde_json::to_string(payload) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; + + match self.sender.send(json) { + Ok(count) => { + debug!( + target: "sequencer::publisher", + payload_id = %payload.payload_id, + index = payload.index, + subscribers = count, + "published flashblock" + ); + Ok(count) + } + Err(_) => { + // No subscribers - this is fine + Ok(0) + } + } + } + + /// Run the WebSocket server until cancelled. + pub async fn run(&self, cancel: CancellationToken) -> eyre::Result<()> { + let listener = TcpListener::bind(self.address).await?; + info!( + target: "sequencer::publisher", + address = %self.address, + "flashblock WebSocket server started" + ); + + loop { + tokio::select! { + _ = cancel.cancelled() => { + info!(target: "sequencer::publisher", "shutting down WebSocket server"); + break; + } + result = listener.accept() => { + match result { + Ok((stream, addr)) => { + let rx = self.sender.subscribe(); + let count = self.subscriber_count.clone(); + let conn_cancel = cancel.clone(); + tokio::spawn(async move { + if let Err(e) = handle_connection(stream, addr, rx, count, conn_cancel).await { + warn!(target: "sequencer::publisher", %addr, error = %e, "connection error"); + } + }); + } + Err(e) => { + error!(target: "sequencer::publisher", error = %e, "failed to accept connection"); + } + } + } + } + } + + Ok(()) + } +} + +async fn handle_connection( + stream: TcpStream, + addr: SocketAddr, + mut rx: broadcast::Receiver, + subscriber_count: Arc, + cancel: CancellationToken, +) -> eyre::Result<()> { + let ws_stream = accept_async(stream).await?; + let (mut write, mut read) = ws_stream.split(); + + subscriber_count.fetch_add(1, Ordering::Relaxed); + info!( + target: "sequencer::publisher", + %addr, + count = subscriber_count.load(Ordering::Relaxed), + "client connected" + ); + + // Handle incoming messages and broadcast outgoing flashblocks + loop { + tokio::select! { + _ = cancel.cancelled() => { + // Send close frame before shutting down + let _ = write.send(Message::Close(None)).await; + break; + } + // Forward flashblocks to the client + result = rx.recv() => { + match result { + Ok(json) => { + if let Err(e) = write.send(Message::Text(json.into())).await { + debug!(target: "sequencer::publisher", %addr, error = %e, "failed to send message"); + break; + } + } + Err(broadcast::error::RecvError::Lagged(n)) => { + warn!(target: "sequencer::publisher", %addr, skipped = n, "client lagging"); + } + Err(broadcast::error::RecvError::Closed) => { + break; + } + } + } + // Handle client messages (ping/pong, close) + msg = read.next() => { + match msg { + Some(Ok(Message::Ping(data))) => { + if let Err(e) = write.send(Message::Pong(data)).await { + debug!(target: "sequencer::publisher", %addr, error = %e, "failed to send pong"); + break; + } + } + Some(Ok(Message::Close(_))) | None => { + break; + } + Some(Err(e)) => { + debug!(target: "sequencer::publisher", %addr, error = %e, "websocket error"); + break; + } + _ => {} + } + } + } + } + + subscriber_count.fetch_sub(1, Ordering::Relaxed); + info!( + target: "sequencer::publisher", + %addr, + count = subscriber_count.load(Ordering::Relaxed), + "client disconnected" + ); + + Ok(()) +} diff --git a/src/sequencer/signing.rs b/src/sequencer/signing.rs new file mode 100644 index 00000000..acd24ebb --- /dev/null +++ b/src/sequencer/signing.rs @@ -0,0 +1,252 @@ +//! BLS signing for flashblock preconfirmations. +//! +//! Implements the signing scheme for Berachain flashblocks: +//! `message = keccak256(domain || block_number || payload_id || index || diff_hash)` +//! where `domain = keccak256("BerachainPreconf-v1" || chain_id)` + +use alloy_primitives::{keccak256, B256}; +use blst::min_pk::{PublicKey, SecretKey, Signature}; +use reth::rpc::types::engine::PayloadId; +use std::path::Path; + +/// BLS signature bytes (96 bytes for BLS12-381 signatures). +pub type BlsSignature = [u8; 96]; + +/// BLS public key bytes (48 bytes for BLS12-381 public keys). +pub type BlsPublicKeyBytes = [u8; 48]; + +/// Domain separator version string. +const DOMAIN_VERSION: &[u8] = b"BerachainPreconf-v1"; + +/// BLS DST (Domain Separation Tag) matching beacon-kit's Proof of Possession scheme. +const BLS_DST: &[u8] = b"BLS_SIG_BLS12381G2_XMD:SHA-256_SSWU_RO_POP_"; + +/// Errors that can occur during signing operations. +#[derive(Debug, thiserror::Error)] +pub enum SigningError { + #[error("invalid secret key")] + InvalidSecretKey, + #[error("invalid public key")] + InvalidPublicKey, + #[error("invalid signature")] + InvalidSignature, + #[error("failed to read key file: {0}")] + KeyFileError(#[from] std::io::Error), + #[error("invalid key format: {0}")] + InvalidKeyFormat(String), +} + +/// BLS signer for flashblock preconfirmations. +#[derive(Clone)] +pub struct FlashblockSigner { + secret_key: SecretKey, + public_key: PublicKey, + domain: B256, +} + +impl std::fmt::Debug for FlashblockSigner { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("FlashblockSigner") + .field("public_key", &hex::encode(self.public_key.to_bytes())) + .field("domain", &self.domain) + .finish() + } +} + +impl FlashblockSigner { + /// Create a new signer from a secret key and chain ID. + pub fn new(secret_key: SecretKey, chain_id: u64) -> Self { + let public_key = secret_key.sk_to_pk(); + let domain = compute_domain(chain_id); + Self { secret_key, public_key, domain } + } + + /// Create a signer from a hex-encoded secret key. + pub fn from_hex(hex_key: &str, chain_id: u64) -> Result { + let key_bytes = hex::decode(hex_key.trim_start_matches("0x")) + .map_err(|e| SigningError::InvalidKeyFormat(e.to_string()))?; + + if key_bytes.len() != 32 { + return Err(SigningError::InvalidKeyFormat(format!( + "expected 32 bytes, got {}", + key_bytes.len() + ))); + } + + let secret_key = SecretKey::from_bytes(&key_bytes) + .map_err(|_| SigningError::InvalidSecretKey)?; + + Ok(Self::new(secret_key, chain_id)) + } + + /// Load a signer from a key file (hex-encoded secret key). + pub fn from_file(path: impl AsRef, chain_id: u64) -> Result { + let contents = std::fs::read_to_string(path)?; + Self::from_hex(contents.trim(), chain_id) + } + + /// Get the public key bytes. + pub fn public_key_bytes(&self) -> BlsPublicKeyBytes { + self.public_key.to_bytes() + } + + /// Sign a flashblock. + pub fn sign_flashblock( + &self, + block_number: u64, + payload_id: PayloadId, + index: u64, + diff_hash: B256, + ) -> BlsSignature { + let message = compute_signing_message( + self.domain, + block_number, + payload_id, + index, + diff_hash, + ); + + let signature = self.secret_key.sign(&message, BLS_DST, &[]); + signature.to_bytes() + } + + /// Verify a flashblock signature. + pub fn verify( + public_key: &BlsPublicKeyBytes, + signature: &BlsSignature, + chain_id: u64, + block_number: u64, + payload_id: PayloadId, + index: u64, + diff_hash: B256, + ) -> Result { + let pk = PublicKey::from_bytes(public_key) + .map_err(|_| SigningError::InvalidPublicKey)?; + let sig = Signature::from_bytes(signature) + .map_err(|_| SigningError::InvalidSignature)?; + + let domain = compute_domain(chain_id); + let message = compute_signing_message(domain, block_number, payload_id, index, diff_hash); + + let result = sig.verify( + true, + &message, + BLS_DST, + &[], + &pk, + true, + ); + + Ok(result == blst::BLST_ERROR::BLST_SUCCESS) + } +} + +/// Compute the domain separator for the given chain ID. +fn compute_domain(chain_id: u64) -> B256 { + let mut data = Vec::with_capacity(DOMAIN_VERSION.len() + 8); + data.extend_from_slice(DOMAIN_VERSION); + data.extend_from_slice(&chain_id.to_be_bytes()); + keccak256(&data) +} + +/// Compute the message to sign for a flashblock. +fn compute_signing_message( + domain: B256, + block_number: u64, + payload_id: PayloadId, + index: u64, + diff_hash: B256, +) -> [u8; 32] { + let mut data = Vec::with_capacity(32 + 8 + 8 + 8 + 32); + data.extend_from_slice(domain.as_slice()); + data.extend_from_slice(&block_number.to_be_bytes()); + data.extend_from_slice(payload_id.0.as_slice()); + data.extend_from_slice(&index.to_be_bytes()); + data.extend_from_slice(diff_hash.as_slice()); + keccak256(&data).0 +} + +/// Compute the hash of a flashblock diff for signing. +pub fn compute_diff_hash( + state_root: B256, + receipts_root: B256, + logs_bloom: &[u8], + gas_used: u64, + block_hash: B256, + transactions: &[impl AsRef<[u8]>], +) -> B256 { + let mut data = Vec::new(); + data.extend_from_slice(state_root.as_slice()); + data.extend_from_slice(receipts_root.as_slice()); + data.extend_from_slice(logs_bloom); + data.extend_from_slice(&gas_used.to_be_bytes()); + data.extend_from_slice(block_hash.as_slice()); + for tx in transactions { + data.extend_from_slice(tx.as_ref()); + } + keccak256(&data) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn test_secret_key() -> SecretKey { + let seed = [1u8; 32]; + SecretKey::key_gen(&seed, &[]).unwrap() + } + + #[test] + fn test_sign_and_verify() { + let chain_id = 80094; + let signer = FlashblockSigner::new(test_secret_key(), chain_id); + + let block_number = 100; + let payload_id = PayloadId::new([1u8; 8]); + let index = 0; + let diff_hash = B256::repeat_byte(0x42); + + let signature = signer.sign_flashblock(block_number, payload_id, index, diff_hash); + + let valid = FlashblockSigner::verify( + &signer.public_key_bytes(), + &signature, + chain_id, + block_number, + payload_id, + index, + diff_hash, + ) + .unwrap(); + + assert!(valid); + } + + #[test] + fn test_invalid_signature_fails_verification() { + let chain_id = 80094; + let signer = FlashblockSigner::new(test_secret_key(), chain_id); + + let block_number = 100; + let payload_id = PayloadId::new([1u8; 8]); + let index = 0; + let diff_hash = B256::repeat_byte(0x42); + + let signature = signer.sign_flashblock(block_number, payload_id, index, diff_hash); + + // Verify with wrong diff_hash should fail + let wrong_diff_hash = B256::repeat_byte(0x43); + let valid = FlashblockSigner::verify( + &signer.public_key_bytes(), + &signature, + chain_id, + block_number, + payload_id, + index, + wrong_diff_hash, + ) + .unwrap(); + + assert!(!valid); + } +} diff --git a/tests/e2e/flashblocks.rs b/tests/e2e/flashblocks.rs index 6b442d40..19e9a88b 100644 --- a/tests/e2e/flashblocks.rs +++ b/tests/e2e/flashblocks.rs @@ -69,6 +69,8 @@ fn create_test_flashblock( blob_gas_used: None, }, metadata: BerachainFlashblockPayloadMetadata { block_number }, + signature: [0u8; 96], + is_last: false, } } From 7c8abe862540973350076744cee2cd5dbd430d5a Mon Sep 17 00:00:00 2001 From: Fridrik Asmundsson Date: Sat, 7 Feb 2026 15:11:07 +0000 Subject: [PATCH 2/5] remove computed roots from flashblocks + trim tests --- Cargo.toml | 2 +- src/flashblocks/mod.rs | 381 ++++------------------------------ src/flashblocks/test_utils.rs | 23 +- src/main.rs | 6 +- src/node/evm/builder.rs | 44 +--- src/node/evm/mod.rs | 2 +- src/sequencer/builder.rs | 195 ++++------------- src/sequencer/mod.rs | 6 +- src/sequencer/publisher.rs | 20 +- src/sequencer/signing.rs | 97 +++------ tests/e2e/flashblocks.rs | 15 +- 11 files changed, 145 insertions(+), 646 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index be9c1fef..707756b3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,6 +31,7 @@ jsonrpsee-core = { version = "0.26.0", features = ["server"] } jsonrpsee-proc-macros = "0.26.0" async-trait = "0.1.88" +futures-util = "0.3" modular-bitfield = "0.11.2" reth = { git = "https://github.com/paradigmxyz/reth", rev = "536bebfcd" } reth-basic-payload-builder = { git = "https://github.com/paradigmxyz/reth", rev = "536bebfcd" } @@ -77,7 +78,6 @@ tokio = { version = "1.46.0", features = ["sync", "net", "macros"] } tokio-tungstenite = "0.26" tokio-util = "0.7" tracing = "0.1.41" -futures-util = "0.3" [dev-dependencies] alloy-provider = "1.1.2" diff --git a/src/flashblocks/mod.rs b/src/flashblocks/mod.rs index a0becb07..86819a24 100644 --- a/src/flashblocks/mod.rs +++ b/src/flashblocks/mod.rs @@ -2,8 +2,7 @@ pub mod test_utils; use crate::{ - primitives::header::BlsPublicKey, - sequencer::signing::BlsSignature, + primitives::header::BlsPublicKey, sequencer::signing::BlsSignature, transaction::BerachainTxEnvelope, }; use alloy_consensus::{crypto::RecoveryError, transaction::Recovered}; @@ -58,49 +57,31 @@ impl FlashblockPayloadBase for BerachainFlashblockPayloadBase { #[derive(Clone, Debug, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)] pub struct BerachainFlashblockPayloadDiff { - /// The state root of the block. - pub state_root: B256, - /// The receipts root of the block. - pub receipts_root: B256, - /// The logs bloom of the block. - pub logs_bloom: Bloom, - /// The gas used of the block. - #[serde(with = "alloy_serde::quantity")] - pub gas_used: u64, - /// The block hash of the block. - pub block_hash: B256, /// The transactions of the block. pub transactions: Vec, - /// Array of [`Withdrawal`] enabled with V2 + /// Array of [`Withdrawal`] - only non-empty in flashblock 0. pub withdrawals: Vec, - /// The withdrawals root of the block. - pub withdrawals_root: B256, - /// The estimated cumulative blob gas used for the block. Introduced in Jovian. - /// spec: - /// Defaults to 0 if not present (for pre-Jovian blocks). - #[serde(default, skip_serializing_if = "Option::is_none", with = "alloy_serde::quantity::opt")] - pub blob_gas_used: Option, } impl FlashblockDiff for BerachainFlashblockPayloadDiff { fn block_hash(&self) -> B256 { - self.block_hash + B256::ZERO } fn state_root(&self) -> B256 { - self.state_root + B256::ZERO } fn gas_used(&self) -> u64 { - self.gas_used + 0 // Not tracked per-flashblock per BRIP-0007 } fn logs_bloom(&self) -> &Bloom { - &self.logs_bloom + &Bloom::ZERO } fn receipts_root(&self) -> B256 { - self.receipts_root + B256::ZERO } fn transactions_raw(&self) -> &[Bytes] { @@ -152,10 +133,7 @@ mod signature_serde { let hex_str = hex_str.trim_start_matches("0x"); let bytes = hex::decode(hex_str).map_err(serde::de::Error::custom)?; if bytes.len() != 96 { - return Err(serde::de::Error::custom(format!( - "expected 96 bytes, got {}", - bytes.len() - ))); + return Err(serde::de::Error::custom(format!("expected 96 bytes, got {}", bytes.len()))); } let mut arr = [0u8; 96]; arr.copy_from_slice(&bytes); @@ -212,326 +190,51 @@ impl FlashblockPayload for BerachainFlashblockPayload { mod tests { use super::*; use crate::flashblocks::test_utils::BerachainTestFlashBlockFactory; - use reth_optimism_flashblocks::{FlashBlockCompleteSequence, FlashBlockPendingSequence}; - - mod serde_tests { - use super::*; - - #[test] - fn test_flashblock_payload_serde_roundtrip() { - let factory = BerachainTestFlashBlockFactory::new(); - let fb = - factory.flashblock_at(0).transactions(vec![Bytes::from_static(&[1, 2, 3])]).build(); - - let serialized = serde_json::to_string(&fb).expect("serialize"); - let deserialized: BerachainFlashblockPayload = - serde_json::from_str(&serialized).expect("deserialize"); - - assert_eq!(fb, deserialized); - } - - #[test] - fn test_flashblock_payload_base_serde_roundtrip() { - let base = BerachainFlashblockPayloadBase { - parent_beacon_block_root: B256::random(), - parent_hash: B256::random(), - fee_recipient: Address::random(), - prev_randao: B256::random(), - block_number: 100, - gas_limit: 30_000_000, - timestamp: 1_000_000, - extra_data: Bytes::from_static(&[0x42]), - base_fee_per_gas: U256::from(1_000_000_000u64), - prev_proposer_pubkey: Some(BlsPublicKey::random()), - }; - - let serialized = serde_json::to_string(&base).expect("serialize"); - let deserialized: BerachainFlashblockPayloadBase = - serde_json::from_str(&serialized).expect("deserialize"); - - assert_eq!(base, deserialized); - } - - #[test] - fn test_flashblock_payload_diff_serde_roundtrip() { - let diff = BerachainFlashblockPayloadDiff { - state_root: B256::random(), - receipts_root: B256::random(), - logs_bloom: Bloom::default(), - gas_used: 21000, - block_hash: B256::random(), - transactions: vec![Bytes::from_static(&[1, 2, 3])], - withdrawals: vec![], - withdrawals_root: B256::ZERO, - blob_gas_used: Some(131072), - }; - - let serialized = serde_json::to_string(&diff).expect("serialize"); - let deserialized: BerachainFlashblockPayloadDiff = - serde_json::from_str(&serialized).expect("deserialize"); - - assert_eq!(diff, deserialized); - } + use reth_optimism_flashblocks::FlashBlockCompleteSequence; - #[test] - fn test_flashblock_sequence_serde_roundtrip() { - let factory = BerachainTestFlashBlockFactory::new(); - let fb0 = factory.flashblock_at(0).build(); - let fb1 = factory.flashblock_after(&fb0).build(); + #[test] + fn test_flashblock_payload_serde_roundtrip() { + let factory = BerachainTestFlashBlockFactory::new(); + let fb = + factory.flashblock_at(0).transactions(vec![Bytes::from_static(&[1, 2, 3])]).build(); - let fbs = vec![fb0, fb1]; - let serialized = serde_json::to_string(&fbs).expect("serialize"); - let deserialized: Vec = - serde_json::from_str(&serialized).expect("deserialize"); + let serialized = serde_json::to_string(&fb).expect("serialize"); + let deserialized: BerachainFlashblockPayload = + serde_json::from_str(&serialized).expect("deserialize"); - assert_eq!(fbs.len(), deserialized.len()); - assert_eq!(fbs[0], deserialized[0]); - assert_eq!(fbs[1], deserialized[1]); - } + assert_eq!(fb, deserialized); } - mod pending_sequence_tests { - use super::*; - - #[test] - fn test_insert_index_zero_creates_new_sequence() { - let mut sequence: FlashBlockPendingSequence = - FlashBlockPendingSequence::new(); - let factory = BerachainTestFlashBlockFactory::new(); - let fb0 = factory.flashblock_at(0).build(); - let payload_id = fb0.payload_id; - - sequence.insert(fb0); - - assert_eq!(sequence.count(), 1); - assert_eq!(sequence.block_number(), Some(100)); - assert_eq!(sequence.payload_id(), Some(payload_id)); - } - - #[test] - fn test_insert_followup_same_block_and_payload() { - let mut sequence: FlashBlockPendingSequence = - FlashBlockPendingSequence::new(); - let factory = BerachainTestFlashBlockFactory::new(); - - let fb0 = factory.flashblock_at(0).build(); - sequence.insert(fb0.clone()); - - let fb1 = factory.flashblock_after(&fb0).build(); - sequence.insert(fb1.clone()); - - let fb2 = factory.flashblock_after(&fb1).build(); - sequence.insert(fb2); - - assert_eq!(sequence.count(), 3); - assert_eq!(sequence.index(), Some(2)); - } - - #[test] - fn test_insert_ignores_different_block_number() { - let mut sequence: FlashBlockPendingSequence = - FlashBlockPendingSequence::new(); - let factory = BerachainTestFlashBlockFactory::new(); - - let fb0 = factory.flashblock_at(0).build(); - sequence.insert(fb0.clone()); - - let fb1 = factory.flashblock_after(&fb0).block_number(101).build(); - sequence.insert(fb1); - - assert_eq!(sequence.count(), 1); - assert_eq!(sequence.block_number(), Some(100)); - } - - #[test] - fn test_insert_ignores_different_payload_id() { - let mut sequence: FlashBlockPendingSequence = - FlashBlockPendingSequence::new(); - let factory = BerachainTestFlashBlockFactory::new(); - - let fb0 = factory.flashblock_at(0).build(); - let payload_id1 = fb0.payload_id; - sequence.insert(fb0.clone()); - - let payload_id2 = PayloadId::new([2u8; 8]); - let fb1 = factory.flashblock_after(&fb0).payload_id(payload_id2).build(); - sequence.insert(fb1); - - assert_eq!(sequence.count(), 1); - assert_eq!(sequence.payload_id(), Some(payload_id1)); - } - - #[test] - fn test_finalize_empty_sequence_fails() { - let mut sequence: FlashBlockPendingSequence = - FlashBlockPendingSequence::new(); - let result = sequence.finalize(); - - assert!(result.is_err()); - } - - #[test] - fn test_finalize_clears_pending_state() { - let mut sequence: FlashBlockPendingSequence = - FlashBlockPendingSequence::new(); - let factory = BerachainTestFlashBlockFactory::new(); + #[test] + fn test_transaction_aggregation_across_flashblocks() { + let factory = BerachainTestFlashBlockFactory::new(); - let fb0 = factory.flashblock_at(0).build(); - sequence.insert(fb0); + let fb0 = factory + .flashblock_at(0) + .transactions(vec![Bytes::from_static(&[1, 2, 3]), Bytes::from_static(&[4, 5, 6])]) + .build(); + let fb1 = factory + .flashblock_after(&fb0) + .transactions(vec![Bytes::from_static(&[7, 8, 9])]) + .build(); - assert_eq!(sequence.count(), 1); + let complete = FlashBlockCompleteSequence::new(vec![fb0, fb1], None).unwrap(); + let all_txs = complete.all_transactions(); - let _complete = sequence.finalize().unwrap(); - - assert_eq!(sequence.count(), 0); - assert_eq!(sequence.block_number(), None); - } + assert_eq!(all_txs.len(), 3); + assert_eq!(all_txs[0], Bytes::from_static(&[1, 2, 3])); + assert_eq!(all_txs[1], Bytes::from_static(&[4, 5, 6])); + assert_eq!(all_txs[2], Bytes::from_static(&[7, 8, 9])); } - mod complete_sequence_tests { - use super::*; + #[test] + fn test_berachain_payload_base_preserved() { + let factory = BerachainTestFlashBlockFactory::new(); + let pubkey = BlsPublicKey::random(); - #[test] - fn test_new_empty_sequence_fails() { - let result = - FlashBlockCompleteSequence::::new(vec![], None); - assert!(result.is_err()); - } + let fb0 = factory.flashblock_at(0).prev_proposer_pubkey(Some(pubkey)).build(); - #[test] - fn test_new_requires_base_at_index_zero() { - let factory = BerachainTestFlashBlockFactory::new(); - let mut fb0_no_base = factory.flashblock_at(1).build(); - fb0_no_base.index = 0; - fb0_no_base.base = None; - - let result = FlashBlockCompleteSequence::new(vec![fb0_no_base], None); - assert!(result.is_err()); - } - - #[test] - fn test_new_validates_successive_indices() { - let factory = BerachainTestFlashBlockFactory::new(); - - let fb0 = factory.flashblock_at(0).build(); - let fb2 = factory.flashblock_after(&fb0).index(2).build(); - - let result = FlashBlockCompleteSequence::new(vec![fb0, fb2], None); - assert!(result.is_err()); - } - - #[test] - fn test_new_valid_single_flashblock() { - let factory = BerachainTestFlashBlockFactory::new(); - let fb0 = factory.flashblock_at(0).build(); - - let result = FlashBlockCompleteSequence::new(vec![fb0], None); - assert!(result.is_ok()); - - let complete = result.unwrap(); - assert_eq!(complete.count(), 1); - assert_eq!(complete.block_number(), 100); - } - - #[test] - fn test_new_valid_multiple_flashblocks() { - let factory = BerachainTestFlashBlockFactory::new(); - - let fb0 = factory.flashblock_at(0).build(); - let fb1 = factory.flashblock_after(&fb0).build(); - let fb2 = factory.flashblock_after(&fb1).build(); - - let result = FlashBlockCompleteSequence::new(vec![fb0, fb1, fb2], None); - assert!(result.is_ok()); - - let complete = result.unwrap(); - assert_eq!(complete.count(), 3); - assert_eq!(complete.last().index(), 2); - } - - #[test] - fn test_all_transactions_aggregates_correctly() { - let factory = BerachainTestFlashBlockFactory::new(); - - let fb0 = factory - .flashblock_at(0) - .transactions(vec![Bytes::from_static(&[1, 2, 3]), Bytes::from_static(&[4, 5, 6])]) - .build(); - - let fb1 = factory - .flashblock_after(&fb0) - .transactions(vec![Bytes::from_static(&[7, 8, 9])]) - .build(); - - let complete = FlashBlockCompleteSequence::new(vec![fb0, fb1], None).unwrap(); - let all_txs = complete.all_transactions(); - - assert_eq!(all_txs.len(), 3); - assert_eq!(all_txs[0], Bytes::from_static(&[1, 2, 3])); - assert_eq!(all_txs[1], Bytes::from_static(&[4, 5, 6])); - assert_eq!(all_txs[2], Bytes::from_static(&[7, 8, 9])); - } - - #[test] - fn test_berachain_specific_payload_base_preserved() { - let factory = BerachainTestFlashBlockFactory::new(); - let pubkey = BlsPublicKey::random(); - - let fb0 = factory.flashblock_at(0).prev_proposer_pubkey(Some(pubkey)).build(); - - let complete = FlashBlockCompleteSequence::new(vec![fb0], None).unwrap(); - let base = complete.payload_base(); - - assert_eq!(base.prev_proposer_pubkey, Some(pubkey)); - } - } - - mod sequence_conversion_tests { - use super::*; - - #[test] - fn test_try_from_pending_to_complete_valid() { - let mut pending: FlashBlockPendingSequence = - FlashBlockPendingSequence::new(); - let factory = BerachainTestFlashBlockFactory::new(); - - let fb0 = factory.flashblock_at(0).build(); - pending.insert(fb0); - - let complete: Result, _> = - pending.try_into(); - assert!(complete.is_ok()); - assert_eq!(complete.unwrap().count(), 1); - } - - #[test] - fn test_try_from_pending_to_complete_empty_fails() { - let pending: FlashBlockPendingSequence = - FlashBlockPendingSequence::new(); - - let complete: Result, _> = - pending.try_into(); - assert!(complete.is_err()); - } - - #[test] - fn test_finalize_multiple_times_after_refill() { - let mut sequence: FlashBlockPendingSequence = - FlashBlockPendingSequence::new(); - let factory = BerachainTestFlashBlockFactory::new(); - - let fb0 = factory.flashblock_at(0).build(); - sequence.insert(fb0); - - let complete1 = sequence.finalize().unwrap(); - assert_eq!(complete1.count(), 1); - - let fb1 = factory.flashblock_for_next_block(&complete1.last().clone()).build(); - sequence.insert(fb1); - - let complete2 = sequence.finalize().unwrap(); - assert_eq!(complete2.count(), 1); - assert_eq!(complete2.block_number(), 101); - } + let complete = FlashBlockCompleteSequence::new(vec![fb0], None).unwrap(); + assert_eq!(complete.payload_base().prev_proposer_pubkey, Some(pubkey)); } } diff --git a/src/flashblocks/test_utils.rs b/src/flashblocks/test_utils.rs index 75dd67a0..abb5294a 100644 --- a/src/flashblocks/test_utils.rs +++ b/src/flashblocks/test_utils.rs @@ -7,7 +7,7 @@ use crate::{ }, primitives::header::BlsPublicKey, }; -use alloy_primitives::{Address, B256, Bloom, Bytes, U256}; +use alloy_primitives::{Address, B256, Bytes, U256}; use reth::rpc::types::engine::PayloadId; #[derive(Debug)] @@ -22,11 +22,6 @@ impl BerachainTestFlashBlockFactory { Self { block_time: 2, base_timestamp: 1_000_000, current_block_number: 100 } } - pub fn with_block_number(mut self, block_number: u64) -> Self { - self.current_block_number = block_number; - self - } - pub fn flashblock_at(&self, index: u64) -> BerachainTestFlashBlockBuilder { self.builder().index(index).block_number(self.current_block_number) } @@ -35,8 +30,7 @@ impl BerachainTestFlashBlockFactory { &self, previous: &BerachainFlashblockPayload, ) -> BerachainTestFlashBlockBuilder { - let parent_hash = - previous.base.as_ref().map(|b| b.parent_hash).unwrap_or(previous.diff.block_hash); + let parent_hash = previous.base.as_ref().map(|b| b.parent_hash).unwrap_or_default(); self.builder() .index(previous.index + 1) @@ -58,7 +52,7 @@ impl BerachainTestFlashBlockFactory { .index(0) .block_number(prev_block_number + 1) .payload_id(PayloadId::new(B256::random().0[0..8].try_into().unwrap())) - .parent_hash(previous.diff.block_hash) + .parent_hash(B256::random()) .timestamp(prev_timestamp + self.block_time) } @@ -69,7 +63,6 @@ impl BerachainTestFlashBlockFactory { payload_id: PayloadId::new([1u8; 8]), parent_hash: B256::random(), timestamp: self.base_timestamp, - gas_limit: 30_000_000, transactions: vec![], prev_proposer_pubkey: Some(BlsPublicKey::random()), } @@ -89,7 +82,6 @@ pub struct BerachainTestFlashBlockBuilder { payload_id: PayloadId, parent_hash: B256, timestamp: u64, - gas_limit: u64, transactions: Vec, prev_proposer_pubkey: Option, } @@ -138,7 +130,7 @@ impl BerachainTestFlashBlockBuilder { fee_recipient: Address::default(), prev_randao: B256::random(), block_number: self.block_number, - gas_limit: self.gas_limit, + gas_limit: 30_000_000, timestamp: self.timestamp, extra_data: Default::default(), base_fee_per_gas: U256::from(1_000_000_000u64), @@ -153,15 +145,8 @@ impl BerachainTestFlashBlockBuilder { payload_id: self.payload_id, base, diff: BerachainFlashblockPayloadDiff { - block_hash: B256::random(), - state_root: B256::random(), - receipts_root: B256::random(), - logs_bloom: Bloom::default(), - gas_used: 0, transactions: self.transactions, withdrawals: vec![], - withdrawals_root: B256::ZERO, - blob_gas_used: None, }, metadata: BerachainFlashblockPayloadMetadata { block_number: self.block_number }, signature: [0u8; 96], diff --git a/src/main.rs b/src/main.rs index 168e740f..f669cb37 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,14 +8,16 @@ use bera_reth::{ consensus::BerachainBeaconConsensus, evm::BerachainEvmFactory, node::{BerachainNode, evm::config::BerachainEvmConfig}, - sequencer::{FlashblockPayloadServiceBuilder, FlashblockSigner, SequencerConfig, WebSocketPublisher}, + sequencer::{ + FlashblockPayloadServiceBuilder, FlashblockSigner, SequencerConfig, WebSocketPublisher, + }, version::init_bera_version, }; use clap::Parser; use reth::CliRunner; use reth_chainspec::EthChainSpec; use reth_ethereum_cli::Cli; -use reth_node_builder::{components::BasicPayloadServiceBuilder, Node, NodeHandle}; +use reth_node_builder::{Node, NodeHandle, components::BasicPayloadServiceBuilder}; use std::{net::SocketAddr, path::PathBuf, sync::Arc}; use tokio_util::sync::CancellationToken; use tracing::info; diff --git a/src/node/evm/builder.rs b/src/node/evm/builder.rs index e35ccb17..eea7e2c0 100644 --- a/src/node/evm/builder.rs +++ b/src/node/evm/builder.rs @@ -3,13 +3,7 @@ use crate::{ transaction::BerachainTxEnvelope, }; use alloy_consensus::BlockHeader; -use reth::revm::{ - State, - context::result::ExecutionResult, - db::states::bundle_state::BundleRetention, - database_interface::Database, -}; -use revm::database::BundleState; +use reth::revm::context::result::ExecutionResult; use reth_evm::{ Evm, block::{BlockExecutionError, BlockExecutor, CommitChanges}, @@ -19,42 +13,6 @@ use reth_primitives_traits::RecoveredBlock; use reth_storage_api::StateProvider; use std::sync::Arc; -/// Trait for accessing state data needed for flashblock state root computation. -/// -/// This trait abstracts the `State` type's methods that we need for computing -/// intermediate state roots during flashblock production. By using a trait instead -/// of concrete types, we can work through the `BlockBuilder` trait abstraction. -pub trait FlashblockState { - /// Merge pending transitions into the bundle state. - /// - /// This accumulates state changes so they can be used for state root computation. - fn merge_transitions_for_flashblock(&mut self); - - /// Get a reference to the accumulated bundle state. - fn bundle_state(&self) -> &BundleState; -} - -impl FlashblockState for State { - fn merge_transitions_for_flashblock(&mut self) { - self.merge_transitions(BundleRetention::PlainState); - } - - fn bundle_state(&self) -> &BundleState { - &self.bundle_state - } -} - -// Blanket impl for mutable references (needed because Evm::DB is &'a mut State) -impl FlashblockState for &mut T { - fn merge_transitions_for_flashblock(&mut self) { - (*self).merge_transitions_for_flashblock() - } - - fn bundle_state(&self) -> &BundleState { - (**self).bundle_state() - } -} - type EResult = ExecutionResult<<::Evm as Evm>::HaltReason>; /// Berachain block builder wrapper that fixes sender/transaction mismatch from PoL injection. diff --git a/src/node/evm/mod.rs b/src/node/evm/mod.rs index ecb82109..bd43c9ed 100644 --- a/src/node/evm/mod.rs +++ b/src/node/evm/mod.rs @@ -8,7 +8,7 @@ pub mod error; pub mod executor; pub mod receipt; -pub use builder::{BerachainBlockBuilder, FlashblockState}; +pub use builder::BerachainBlockBuilder; use crate::{ evm::BerachainEvmFactory, diff --git a/src/sequencer/builder.rs b/src/sequencer/builder.rs index d8a2969c..2f7c82aa 100644 --- a/src/sequencer/builder.rs +++ b/src/sequencer/builder.rs @@ -11,29 +11,22 @@ use crate::{ BerachainFlashblockPayloadMetadata, }, hardforks::BerachainHardforks, - node::evm::{ - FlashblockState, - config::{BerachainEvmConfig, BerachainNextBlockEnvAttributes}, - }, + node::evm::config::{BerachainEvmConfig, BerachainNextBlockEnvAttributes}, primitives::BerachainHeader, sequencer::{ - signing::{compute_diff_hash, FlashblockSigner}, SequencerConfig, WebSocketPublisher, + signing::{FlashblockSigner, compute_transactions_hash}, }, transaction::BerachainTxEnvelope, }; -use alloy_consensus::{ - Transaction, TxReceipt, EMPTY_OMMER_ROOT_HASH, EMPTY_ROOT_HASH, - proofs::{calculate_withdrawals_root, ordered_trie_root_with_encoder}, -}; +use alloy_consensus::Transaction; use alloy_eips::{eip2718::Encodable2718, eip4895::Withdrawal}; -use bytes::BufMut; -use alloy_primitives::{logs_bloom, B256, B64, Bloom, Bytes, Sealable, U256}; +use alloy_primitives::{Bytes, U256}; use reth::{ api::{FullNodeTypes, NodeTypes, PayloadBuilderError, PayloadTypes, TxTy}, chainspec::EthereumHardforks, providers::StateProviderFactory, - revm::{context::Block, database::StateProviderDatabase, State}, + revm::{State, context::Block, database::StateProviderDatabase}, transaction_pool::{PoolTransaction, TransactionPool}, }; use reth_basic_payload_builder::{ @@ -43,16 +36,16 @@ use reth_chainspec::{ChainSpecProvider, EthChainSpec}; use reth_ethereum_payload_builder::EthereumBuilderConfig; use reth_ethereum_primitives::Receipt; use reth_evm::{ + ConfigureEvm, Evm, block::{BlockExecutionError, BlockValidationError, CommitChanges}, execute::{BlockBuilder, BlockBuilderOutcome}, - ConfigureEvm, Evm, }; -use reth_node_builder::{components::PayloadBuilderBuilder, BuilderContext, PayloadBuilderConfig}; +use reth_node_builder::{BuilderContext, PayloadBuilderConfig, components::PayloadBuilderBuilder}; use reth_payload_primitives::PayloadBuilderAttributes; use reth_primitives_traits::transaction::error::InvalidTransactionError; use reth_transaction_pool::{ - error::InvalidPoolTransactionError, BestTransactions, BestTransactionsAttributes, - ValidPoolTransaction, + BestTransactions, BestTransactionsAttributes, ValidPoolTransaction, + error::InvalidPoolTransactionError, }; use std::{ sync::Arc, @@ -231,43 +224,12 @@ impl FlashblockExecutionTracker { } } - /// Compute the receipts root from accumulated receipts. - fn receipts_root(&self) -> B256 { - Receipt::calculate_receipt_root_no_memo(&self.receipts) - } - - /// Compute the logs bloom from accumulated receipts. - fn logs_bloom(&self) -> Bloom { - logs_bloom(self.receipts.iter().flat_map(|r| r.logs())) - } - /// Clear interval transactions for next flashblock. fn clear_interval(&mut self) { self.interval_transactions.clear(); } } -use reth_storage_api::{HashedPostStateProvider, StateRootProvider}; - -/// Compute the intermediate state root from a block builder. -/// -/// Merges pending state transitions and computes the state root. Requires -/// the builder's database to implement [`FlashblockState`]. -fn compute_intermediate_state_root( - builder: &mut B, - state_provider: &S, -) -> reth_storage_api::errors::ProviderResult -where - B: BlockBuilder, - S: StateRootProvider + HashedPostStateProvider, - <::Evm as Evm>::DB: FlashblockState, -{ - let db = builder.evm_mut().db_mut(); - db.merge_transitions_for_flashblock(); - let hashed_state = state_provider.hashed_post_state(db.bundle_state()); - state_provider.state_root(hashed_state) -} - /// Build a payload while emitting flashblocks at regular intervals. #[allow(clippy::too_many_arguments)] fn build_flashblock_payload( @@ -370,39 +332,21 @@ where let interval = sequencer_config.interval; let build_start_time = Instant::now(); - // Helper to compute state root, emit flashblock, and handle errors. - // Returns true if emission succeeded. - let try_emit_flashblock = - |builder: &mut _, flashblock_index: u64, tracker: &_, is_last: bool| -> bool { - match compute_intermediate_state_root(builder, &state_provider) { - Ok(state_root) => { - emit_flashblock( - &publisher, - payload_id, - flashblock_index, - &base, - flashblock_index == 0, - tracker, - block_number, - &sequencer_config.signer, - state_root, - &withdrawals, - is_last, - ); - true - } - Err(e) => { - warn!( - target: "sequencer::builder", - payload_id = %payload_id, - index = flashblock_index, - error = %e, - "skipping flashblock emission due to state root computation failure" - ); - false - } - } - }; + // Helper to emit flashblock. Per BRIP-0007, no state root computation needed. + let emit = |flashblock_index: u64, tracker: &FlashblockExecutionTracker, is_last: bool| { + emit_flashblock( + &publisher, + payload_id, + flashblock_index, + &base, + flashblock_index == 0, + tracker, + block_number, + &sequencer_config.signer, + &withdrawals, + is_last, + ); + }; // Main transaction execution loop with flashblock emission. // Flashblocks are emitted at regular intervals (~200ms) regardless of transaction activity. @@ -453,10 +397,9 @@ where // Emit flashblock at regular intervals (may be empty, serving as heartbeat) if last_flashblock_time.elapsed() >= interval { - if try_emit_flashblock(&mut builder, flashblock_index, &tracker, false) { - flashblock_index += 1; - tracker.clear_interval(); - } + emit(flashblock_index, &tracker, false); + flashblock_index += 1; + tracker.clear_interval(); last_flashblock_time = Instant::now(); } @@ -481,11 +424,11 @@ where // Execute the transaction and capture the result let mut execution_logs = Vec::new(); + let mut tx_success = false; let result = builder.execute_transaction_with_commit_condition(tx.clone(), |exec_result| { - // Capture execution result before commit decision - // ExecutionResult contains the output with logs - if exec_result.is_success() { + tx_success = exec_result.is_success(); + if tx_success { execution_logs = exec_result.logs().to_vec(); } CommitChanges::Yes @@ -516,7 +459,7 @@ where // Build receipt from execution result let receipt = Receipt { tx_type: tx.tx_type(), - success: true, + success: tx_success, cumulative_gas_used: tracker.cumulative_gas_used + gas_used, logs: execution_logs, }; @@ -545,7 +488,7 @@ where // Always emit the final flashblock marked as last, even if empty. // This signals to RPC nodes that no more flashblocks will arrive for this payload. - try_emit_flashblock(&mut builder, flashblock_index, &tracker, true); + emit(flashblock_index, &tracker, true); // Finalize the block let BlockBuilderOutcome { execution_result, block, .. } = builder.finish(&state_provider)?; @@ -565,7 +508,8 @@ where "sealed flashblock payload ready for getPayload" ); - let payload = BerachainBuiltPayload::new(payload_id, sealed_block, tracker.total_fees, requests); + let payload = + BerachainBuiltPayload::new(payload_id, sealed_block, tracker.total_fees, requests); Ok(BuildOutcome::Better { payload, cached_reads }) } @@ -580,81 +524,21 @@ fn emit_flashblock( tracker: &FlashblockExecutionTracker, block_number: u64, signer: &FlashblockSigner, - state_root: B256, withdrawals: &[Withdrawal], is_last: bool, ) { - // Compute roots from accumulated receipts - let receipts_root = tracker.receipts_root(); - let logs_bloom = tracker.logs_bloom(); - - // Compute transactions root from all transactions (already encoded) - let transactions_root = ordered_trie_root_with_encoder( - &tracker.all_transactions, - |tx, buf| buf.put_slice(tx.as_ref()), - ); - // Withdrawals are included in first flashblock only (index 0) - let (diff_withdrawals, withdrawals_root) = if include_base_in_payload { - let root = if withdrawals.is_empty() { - EMPTY_ROOT_HASH - } else { - calculate_withdrawals_root(withdrawals) - }; - (withdrawals.to_vec(), root) - } else { - (vec![], EMPTY_ROOT_HASH) - }; - - // Construct header to compute block_hash - let header = BerachainHeader { - parent_hash: base.parent_hash, - ommers_hash: EMPTY_OMMER_ROOT_HASH, - beneficiary: base.fee_recipient, - state_root, - transactions_root, - receipts_root, - withdrawals_root: Some(withdrawals_root), - logs_bloom, - difficulty: U256::ZERO, - number: block_number, - gas_limit: base.gas_limit, - gas_used: tracker.cumulative_gas_used, - timestamp: base.timestamp, - mix_hash: base.prev_randao, - nonce: B64::ZERO, - base_fee_per_gas: Some(base.base_fee_per_gas.to::()), - blob_gas_used: None, - excess_blob_gas: None, - parent_beacon_block_root: Some(base.parent_beacon_block_root), - requests_hash: None, - prev_proposer_pubkey: base.prev_proposer_pubkey, - extra_data: base.extra_data.clone(), - }; - - let block_hash = header.hash_slow(); + let diff_withdrawals = if include_base_in_payload { withdrawals.to_vec() } else { vec![] }; + // Flashblocks just contain transactions, no computed roots let diff = BerachainFlashblockPayloadDiff { - state_root, - receipts_root, - logs_bloom, - gas_used: tracker.cumulative_gas_used, - block_hash, transactions: tracker.interval_transactions.clone(), withdrawals: diff_withdrawals, - withdrawals_root, - blob_gas_used: None, }; - let diff_hash = compute_diff_hash( - state_root, - receipts_root, - logs_bloom.as_slice(), - tracker.cumulative_gas_used, - block_hash, - &tracker.interval_transactions, - ); - let signature = signer.sign_flashblock(block_number, payload_id, index, diff_hash); + // Sign over transactions hash + let tx_hash = compute_transactions_hash(&tracker.interval_transactions); + let signature = signer.sign_flashblock(block_number, payload_id, index, tx_hash); let flashblock = BerachainFlashblockPayload { payload_id, @@ -673,7 +557,6 @@ fn emit_flashblock( payload_id = %payload_id, index, block_number, - block_hash = %block_hash, transactions = tracker.interval_transactions.len(), subscribers = count, is_last, diff --git a/src/sequencer/mod.rs b/src/sequencer/mod.rs index 7452d6bd..af3d8061 100644 --- a/src/sequencer/mod.rs +++ b/src/sequencer/mod.rs @@ -28,10 +28,6 @@ pub struct SequencerConfig { impl SequencerConfig { /// Create a new sequencer config with the required signer. pub fn new(interval_ms: u64, ws_addr: SocketAddr, signer: FlashblockSigner) -> Self { - Self { - interval: Duration::from_millis(interval_ms), - ws_addr, - signer: Arc::new(signer), - } + Self { interval: Duration::from_millis(interval_ms), ws_addr, signer: Arc::new(signer) } } } diff --git a/src/sequencer/publisher.rs b/src/sequencer/publisher.rs index 8e1cef4e..409ddb39 100644 --- a/src/sequencer/publisher.rs +++ b/src/sequencer/publisher.rs @@ -6,9 +6,10 @@ use std::{ io, net::SocketAddr, sync::{ - atomic::{AtomicUsize, Ordering}, Arc, + atomic::{AtomicUsize, Ordering}, }, + time::Duration, }; use tokio::{ net::{TcpListener, TcpStream}, @@ -19,8 +20,13 @@ use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, warn}; /// Capacity for the flashblock broadcast channel. -/// At ~200ms intervals, 64 messages allows ~12.8 seconds of buffering for slow clients. -const FLASHBLOCK_CHANNEL_CAPACITY: usize = 64; +const FLASHBLOCK_CHANNEL_CAPACITY: usize = 20; + +/// Maximum concurrent WebSocket connections. +const MAX_CONNECTIONS: usize = 256; + +/// Timeout for WebSocket handshake. +const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(10); /// WebSocket publisher that broadcasts flashblocks to all connected clients. #[derive(Debug)] @@ -88,6 +94,10 @@ impl WebSocketPublisher { result = listener.accept() => { match result { Ok((stream, addr)) => { + if self.subscriber_count.load(Ordering::Relaxed) >= MAX_CONNECTIONS { + warn!(target: "sequencer::publisher", %addr, "connection limit reached"); + continue; + } let rx = self.sender.subscribe(); let count = self.subscriber_count.clone(); let conn_cancel = cancel.clone(); @@ -116,7 +126,9 @@ async fn handle_connection( subscriber_count: Arc, cancel: CancellationToken, ) -> eyre::Result<()> { - let ws_stream = accept_async(stream).await?; + let ws_stream = tokio::time::timeout(HANDSHAKE_TIMEOUT, accept_async(stream)) + .await + .map_err(|_| eyre::eyre!("handshake timeout"))??; let (mut write, mut read) = ws_stream.split(); subscriber_count.fetch_add(1, Ordering::Relaxed); diff --git a/src/sequencer/signing.rs b/src/sequencer/signing.rs index acd24ebb..6edaf8c9 100644 --- a/src/sequencer/signing.rs +++ b/src/sequencer/signing.rs @@ -4,7 +4,7 @@ //! `message = keccak256(domain || block_number || payload_id || index || diff_hash)` //! where `domain = keccak256("BerachainPreconf-v1" || chain_id)` -use alloy_primitives::{keccak256, B256}; +use alloy_primitives::{B256, keccak256}; use blst::min_pk::{PublicKey, SecretKey, Signature}; use reth::rpc::types::engine::PayloadId; use std::path::Path; @@ -73,8 +73,8 @@ impl FlashblockSigner { ))); } - let secret_key = SecretKey::from_bytes(&key_bytes) - .map_err(|_| SigningError::InvalidSecretKey)?; + let secret_key = + SecretKey::from_bytes(&key_bytes).map_err(|_| SigningError::InvalidSecretKey)?; Ok(Self::new(secret_key, chain_id)) } @@ -98,13 +98,8 @@ impl FlashblockSigner { index: u64, diff_hash: B256, ) -> BlsSignature { - let message = compute_signing_message( - self.domain, - block_number, - payload_id, - index, - diff_hash, - ); + let message = + compute_signing_message(self.domain, block_number, payload_id, index, diff_hash); let signature = self.secret_key.sign(&message, BLS_DST, &[]); signature.to_bytes() @@ -120,22 +115,13 @@ impl FlashblockSigner { index: u64, diff_hash: B256, ) -> Result { - let pk = PublicKey::from_bytes(public_key) - .map_err(|_| SigningError::InvalidPublicKey)?; - let sig = Signature::from_bytes(signature) - .map_err(|_| SigningError::InvalidSignature)?; + let pk = PublicKey::from_bytes(public_key).map_err(|_| SigningError::InvalidPublicKey)?; + let sig = Signature::from_bytes(signature).map_err(|_| SigningError::InvalidSignature)?; let domain = compute_domain(chain_id); let message = compute_signing_message(domain, block_number, payload_id, index, diff_hash); - let result = sig.verify( - true, - &message, - BLS_DST, - &[], - &pk, - true, - ); + let result = sig.verify(true, &message, BLS_DST, &[], &pk, true); Ok(result == blst::BLST_ERROR::BLST_SUCCESS) } @@ -166,23 +152,13 @@ fn compute_signing_message( keccak256(&data).0 } -/// Compute the hash of a flashblock diff for signing. -pub fn compute_diff_hash( - state_root: B256, - receipts_root: B256, - logs_bloom: &[u8], - gas_used: u64, - block_hash: B256, - transactions: &[impl AsRef<[u8]>], -) -> B256 { +/// Compute the hash of flashblock transactions for signing. +pub fn compute_transactions_hash(transactions: &[impl AsRef<[u8]>]) -> B256 { let mut data = Vec::new(); - data.extend_from_slice(state_root.as_slice()); - data.extend_from_slice(receipts_root.as_slice()); - data.extend_from_slice(logs_bloom); - data.extend_from_slice(&gas_used.to_be_bytes()); - data.extend_from_slice(block_hash.as_slice()); for tx in transactions { - data.extend_from_slice(tx.as_ref()); + let tx_bytes = tx.as_ref(); + data.extend_from_slice(&(tx_bytes.len() as u64).to_be_bytes()); + data.extend_from_slice(tx_bytes); } keccak256(&data) } @@ -191,31 +167,31 @@ pub fn compute_diff_hash( mod tests { use super::*; - fn test_secret_key() -> SecretKey { + const CHAIN_ID: u64 = 80094; + const BLOCK_NUMBER: u64 = 100; + const INDEX: u64 = 0; + + fn test_signer() -> FlashblockSigner { let seed = [1u8; 32]; - SecretKey::key_gen(&seed, &[]).unwrap() + FlashblockSigner::new(SecretKey::key_gen(&seed, &[]).unwrap(), CHAIN_ID) } #[test] fn test_sign_and_verify() { - let chain_id = 80094; - let signer = FlashblockSigner::new(test_secret_key(), chain_id); - - let block_number = 100; + let signer = test_signer(); let payload_id = PayloadId::new([1u8; 8]); - let index = 0; - let diff_hash = B256::repeat_byte(0x42); + let tx_hash = B256::repeat_byte(0x42); - let signature = signer.sign_flashblock(block_number, payload_id, index, diff_hash); + let signature = signer.sign_flashblock(BLOCK_NUMBER, payload_id, INDEX, tx_hash); let valid = FlashblockSigner::verify( &signer.public_key_bytes(), &signature, - chain_id, - block_number, + CHAIN_ID, + BLOCK_NUMBER, payload_id, - index, - diff_hash, + INDEX, + tx_hash, ) .unwrap(); @@ -224,26 +200,21 @@ mod tests { #[test] fn test_invalid_signature_fails_verification() { - let chain_id = 80094; - let signer = FlashblockSigner::new(test_secret_key(), chain_id); - - let block_number = 100; + let signer = test_signer(); let payload_id = PayloadId::new([1u8; 8]); - let index = 0; - let diff_hash = B256::repeat_byte(0x42); + let tx_hash = B256::repeat_byte(0x42); - let signature = signer.sign_flashblock(block_number, payload_id, index, diff_hash); + let signature = signer.sign_flashblock(BLOCK_NUMBER, payload_id, INDEX, tx_hash); - // Verify with wrong diff_hash should fail - let wrong_diff_hash = B256::repeat_byte(0x43); + let wrong_hash = B256::repeat_byte(0x43); let valid = FlashblockSigner::verify( &signer.public_key_bytes(), &signature, - chain_id, - block_number, + CHAIN_ID, + BLOCK_NUMBER, payload_id, - index, - wrong_diff_hash, + INDEX, + wrong_hash, ) .unwrap(); diff --git a/tests/e2e/flashblocks.rs b/tests/e2e/flashblocks.rs index 19e9a88b..5f04b12b 100644 --- a/tests/e2e/flashblocks.rs +++ b/tests/e2e/flashblocks.rs @@ -6,7 +6,7 @@ use crate::e2e::{setup_test_boilerplate, test_signer}; use alloy_consensus::BlockHeader; use alloy_eips::eip2718::Encodable2718; -use alloy_primitives::{Address, B256, Bloom, Bytes, U256}; +use alloy_primitives::{Address, B256, Bytes, U256}; use alloy_provider::Provider; use bera_reth::{ engine::validator::BerachainEngineValidatorBuilder, @@ -57,17 +57,7 @@ fn create_test_flashblock( payload_id, index, base, - diff: BerachainFlashblockPayloadDiff { - state_root: B256::random(), - receipts_root: B256::random(), - logs_bloom: Bloom::default(), - gas_used: 21000, - block_hash: B256::random(), - transactions: vec![], - withdrawals: vec![], - withdrawals_root: B256::ZERO, - blob_gas_used: None, - }, + diff: BerachainFlashblockPayloadDiff { transactions: vec![], withdrawals: vec![] }, metadata: BerachainFlashblockPayloadMetadata { block_number }, signature: [0u8; 96], is_last: false, @@ -207,7 +197,6 @@ async fn test_rpc_returns_flashblock_pending_receipt() -> eyre::Result<()> { let payload_id = PayloadId::new([1u8; 8]); let mut fb0 = create_test_flashblock(0, next_block, payload_id, latest_hash, next_timestamp); fb0.diff.transactions = vec![tx_bytes]; - fb0.diff.gas_used = 21000; // Inject the flashblock into the service via our mock stream. fb_tx.send(fb0).await?; From c798a99ade813876ace326014fa23bfced5ff2b0 Mon Sep 17 00:00:00 2001 From: Fridrik Asmundsson Date: Tue, 10 Feb 2026 12:04:06 +0000 Subject: [PATCH 3/5] fix: cancel flashblock build early when getPayload is called --- src/sequencer/builder.rs | 28 ++++++++++++++++++++++------ 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/src/sequencer/builder.rs b/src/sequencer/builder.rs index 2f7c82aa..5330794e 100644 --- a/src/sequencer/builder.rs +++ b/src/sequencer/builder.rs @@ -48,7 +48,10 @@ use reth_transaction_pool::{ error::InvalidPoolTransactionError, }; use std::{ - sync::Arc, + sync::{ + Arc, + atomic::{AtomicBool, Ordering}, + }, time::{Duration, Instant}, }; use tracing::{debug, info, trace, warn}; @@ -127,6 +130,7 @@ pub struct FlashblockPayloadBuilder { sequencer_config: SequencerConfig, publisher: Arc, deadline: Duration, + payload_requested: Arc, } impl Clone for FlashblockPayloadBuilder { @@ -139,6 +143,7 @@ impl Clone for FlashblockPayloadBuilder FlashblockPayloadBuilder { publisher: Arc, deadline: Duration, ) -> Self { - Self { client, pool, evm_config, builder_config, sequencer_config, publisher, deadline } + Self { + client, + pool, + evm_config, + builder_config, + sequencer_config, + publisher, + deadline, + payload_requested: Arc::new(AtomicBool::new(false)), + } } } @@ -170,6 +184,7 @@ where &self, args: BuildArguments, ) -> Result, PayloadBuilderError> { + self.payload_requested.store(false, Ordering::Relaxed); build_flashblock_payload( self.evm_config.clone(), self.client.clone(), @@ -178,6 +193,7 @@ where self.sequencer_config.clone(), self.publisher.clone(), self.deadline, + self.payload_requested.clone(), args, |attributes| self.pool.best_transactions_with_attributes(attributes), ) @@ -187,6 +203,7 @@ where &self, _args: BuildArguments, ) -> MissingPayloadBehaviour { + self.payload_requested.store(true, Ordering::Relaxed); MissingPayloadBehaviour::AwaitInProgress } @@ -240,6 +257,7 @@ fn build_flashblock_payload( sequencer_config: SequencerConfig, publisher: Arc, deadline: Duration, + payload_requested: Arc, args: BuildArguments, best_txs: F, ) -> Result, PayloadBuilderError> @@ -248,7 +266,7 @@ where Pool: TransactionPool>, F: FnOnce(BestTransactionsAttributes) -> BestTransactionsIter, { - let BuildArguments { mut cached_reads, config, cancel, best_payload: _ } = args; + let BuildArguments { mut cached_reads, config, cancel: _, best_payload: _ } = args; let PayloadConfig { parent_header, attributes } = config; let state_provider = client.state_by_block_hash(parent_header.hash())?; @@ -353,9 +371,7 @@ where // Empty flashblocks serve as heartbeats, allowing subscribers to detect liveness and // track the current state even when no transactions are being processed. loop { - // Check if cancelled (getPayload called). - // TODO: detect orphaned builds on new forkchoiceUpdated. - if cancel.is_cancelled() { + if payload_requested.load(Ordering::Relaxed) { info!( target: "sequencer::builder", id = %payload_id, From af193b66839313cd75b86da0e371baaeb5e814fd Mon Sep 17 00:00:00 2001 From: Fridrik Asmundsson Date: Wed, 18 Feb 2026 15:31:26 +0000 Subject: [PATCH 4/5] feat: serve flashblock state on RPC nodes via sequencer WebSocket subscription --- Cargo.lock | 1 + Cargo.toml | 2 + src/main.rs | 25 ++++++- src/rpc/api.rs | 175 +++++++++++++++++++++++++++++++++++++++++++++++-- 4 files changed, 196 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3d41ac25..70d54d9a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1502,6 +1502,7 @@ dependencies = [ "modular-bitfield", "reth", "reth-basic-payload-builder", + "reth-chain-state", "reth-chainspec", "reth-cli", "reth-cli-commands", diff --git a/Cargo.toml b/Cargo.toml index 707756b3..98f1cfc0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,6 +35,7 @@ futures-util = "0.3" modular-bitfield = "0.11.2" reth = { git = "https://github.com/paradigmxyz/reth", rev = "536bebfcd" } reth-basic-payload-builder = { git = "https://github.com/paradigmxyz/reth", rev = "536bebfcd" } +reth-chain-state = { git = "https://github.com/paradigmxyz/reth", rev = "536bebfcd" } reth-chainspec = { git = "https://github.com/paradigmxyz/reth", rev = "536bebfcd" } reth-cli = { git = "https://github.com/paradigmxyz/reth", rev = "536bebfcd" } reth-cli-commands = { git = "https://github.com/paradigmxyz/reth", rev = "536bebfcd" } @@ -123,6 +124,7 @@ ignored = ["modular-bitfield"] reth = { git = "https://github.com/rezbera/reth", branch = "rezbera/modular-flashblocks" } reth-rpc = { git = "https://github.com/rezbera/reth", branch = "rezbera/modular-flashblocks" } reth-basic-payload-builder = { git = "https://github.com/rezbera/reth", branch = "rezbera/modular-flashblocks" } +reth-chain-state = { git = "https://github.com/rezbera/reth", branch = "rezbera/modular-flashblocks" } reth-chainspec = { git = "https://github.com/rezbera/reth", branch = "rezbera/modular-flashblocks" } reth-cli = { git = "https://github.com/rezbera/reth", branch = "rezbera/modular-flashblocks" } reth-cli-commands = { git = "https://github.com/rezbera/reth", branch = "rezbera/modular-flashblocks" } diff --git a/src/main.rs b/src/main.rs index f669cb37..da446bf5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,12 +8,14 @@ use bera_reth::{ consensus::BerachainBeaconConsensus, evm::BerachainEvmFactory, node::{BerachainNode, evm::config::BerachainEvmConfig}, + rpc::{BerachainAddOns, BerachainEthApiBuilder}, sequencer::{ FlashblockPayloadServiceBuilder, FlashblockSigner, SequencerConfig, WebSocketPublisher, }, version::init_bera_version, }; use clap::Parser; +use jsonrpsee::client_transport::ws::Url; use reth::CliRunner; use reth_chainspec::EthChainSpec; use reth_ethereum_cli::Cli; @@ -40,6 +42,10 @@ pub struct SequencerArgs { /// Path to BLS secret key file for signing flashblocks (hex-encoded 32-byte key) #[arg(long)] pub flashblock_signing_key: Option, + + /// WebSocket URL for subscribing to flashblocks from a sequencer (e.g., ws://sequencer:8548) + #[arg(long)] + pub flashblocks_url: Option, } fn main() { @@ -120,9 +126,22 @@ fn main() { ws_cancel.cancel(); result } else { - info!(target: "reth::cli", "Launching Berachain node"); - let NodeHandle { node: _node, node_exit_future } = - builder.node(BerachainNode::default()).launch_with_debug_capabilities().await?; + if let Some(ref url) = extra_args.flashblocks_url { + info!(target: "reth::cli", %url, "Launching Berachain node with flashblocks"); + } else { + info!(target: "reth::cli", "Launching Berachain node"); + } + + let eth_api_builder = BerachainEthApiBuilder::default() + .with_flashblocks_url(extra_args.flashblocks_url); + let berachain_node = BerachainNode::default(); + + let NodeHandle { node: _node, node_exit_future } = builder + .with_types::() + .with_components(berachain_node.components_builder()) + .with_add_ons(BerachainAddOns::new(eth_api_builder)) + .launch_with_debug_capabilities() + .await?; node_exit_future.await } diff --git a/src/rpc/api.rs b/src/rpc/api.rs index 0793f2f2..eb54679a 100644 --- a/src/rpc/api.rs +++ b/src/rpc/api.rs @@ -5,7 +5,7 @@ use crate::{ transaction::{BerachainTxEnvelope, BerachainTxType, POL_TX_TYPE}, }; use alloy_consensus::{BlockHeader, Transaction}; -use alloy_eips::eip2930::AccessList; +use alloy_eips::{BlockId, BlockNumberOrTag, eip2930::AccessList}; use alloy_network::{ BuildResult, Network, NetworkWallet, TransactionBuilder, TransactionBuilderError, }; @@ -21,10 +21,11 @@ use reth::{ pool::{BlockingTaskGuard, BlockingTaskPool}, }, }; +use reth_chain_state::BlockState; use reth_optimism_flashblocks::{FlashBlockBuildInfo, FlashblocksListeners, PendingFlashBlock}; -use reth_primitives_traits::{Recovered, WithEncoded}; +use reth_primitives_traits::{BlockBody, Recovered, RecoveredBlock, WithEncoded}; use reth_rpc_eth_api::{ - EthApiTypes, RpcNodeCore, RpcNodeCoreExt, RpcReceipt, + EthApiTypes, FromEthApiError, RpcNodeCore, RpcNodeCoreExt, RpcReceipt, helpers::{ Call, EthApiSpec, EthBlocks, EthCall, EthFees, EthState, EthTransactions, LoadBlock, LoadFee, LoadPendingBlock, LoadReceipt, LoadState, LoadTransaction, SpawnBlocking, Trace, @@ -33,8 +34,9 @@ use reth_rpc_eth_api::{ }; use reth_rpc_eth_types::{ EthApiError, EthStateCache, FeeHistoryCache, GasPriceOracle, PendingBlock, - builder::config::PendingBlockKind, error::FromEvmError, + block::BlockAndReceipts, builder::config::PendingBlockKind, error::FromEvmError, }; +use reth_storage_api::{BlockIdReader, BlockReader, StateProviderBox, StateProviderFactory}; use reth_transaction_pool::PoolPooledTx; use std::{sync::Arc, time::Duration}; use tokio::time; @@ -580,6 +582,43 @@ where EthApiError: FromEvmError, Rpc: RpcConvert, { + #[allow(clippy::manual_async_fn)] + fn block_transaction_count( + &self, + block_id: BlockId, + ) -> impl Future, Self::Error>> + Send { + async move { + if (block_id.is_latest() || block_id.is_pending()) && + let Ok(Some(pending)) = self.pending_flashblock().await + { + return Ok(Some(pending.block().body().transaction_count())); + } + + if block_id.is_pending() { + return Ok(self + .provider() + .pending_block() + .map_err(Into::::into)? + .map(|block| block.body().transaction_count())); + } + + let block_hash = match self + .provider() + .block_hash_for_id(block_id) + .map_err(Into::::into)? + { + Some(block_hash) => block_hash, + None => return Ok(None), + }; + + Ok(self + .cache() + .get_recovered_block(block_hash) + .await + .map_err(Into::::into)? + .map(|b| b.body().transaction_count())) + } + } } impl LoadBlock for BerachainApi @@ -588,6 +627,52 @@ where N: RpcNodeCore, Rpc: RpcConvert, { + #[allow(clippy::manual_async_fn, clippy::collapsible_if)] + fn recovered_block( + &self, + block_id: BlockId, + ) -> impl Future< + Output = Result< + Option::Block>>>, + Self::Error, + >, + > + Send { + async move { + // Serve flashblock for both "latest" and "pending" when available + if block_id.is_latest() || block_id.is_pending() { + if self.pending_flashblock().await.ok().flatten().is_some() { + if let Some(pending) = self.local_pending_block().await? { + return Ok(Some(pending.block)); + } + } + } + + // Default pending fallback: CL-provided block, then locally built + if block_id.is_pending() { + if let Some(pending_block) = + self.provider().pending_block().map_err(Self::Error::from_eth_err)? + { + return Ok(Some(Arc::new(pending_block))); + } + + return match self.local_pending_block().await? { + Some(pending) => Ok(Some(pending.block)), + None => Ok(None), + }; + } + + let block_hash = match self + .provider() + .block_hash_for_id(block_id) + .map_err(Self::Error::from_eth_err)? + { + Some(block_hash) => block_hash, + None => return Ok(None), + }; + + self.cache().get_recovered_block(block_hash).await.map_err(Self::Error::from_eth_err) + } + } } impl EthCall for BerachainApi @@ -661,6 +746,30 @@ where Rpc: RpcConvert, Self: LoadPendingBlock, { + #[allow(clippy::manual_async_fn, clippy::collapsible_if)] + fn state_at_block_id_or_latest( + &self, + block_id: Option, + ) -> impl Future> + Send + where + Self: SpawnBlocking, + { + async move { + let should_use_flashblock = block_id.is_none_or(|id| id.is_latest() || id.is_pending()); + + if should_use_flashblock { + if let Ok(Some(state)) = self.local_pending_state().await { + return Ok(state); + } + } + + if let Some(block_id) = block_id { + self.state_at_block_id(block_id).await + } else { + Ok(self.latest_state()?) + } + } + } } impl LoadFee for BerachainApi @@ -678,6 +787,25 @@ where fn fee_history_cache(&self) -> &FeeHistoryCache> { self.inner.fee_history_cache() } + + #[allow(clippy::manual_async_fn)] + fn gas_price(&self) -> impl Future> + Send { + async move { + let suggested_tip = LoadFee::suggested_priority_fee(self).await?; + + let base_fee = if let Ok(Some(pending)) = self.pending_flashblock().await { + pending.block().base_fee_per_gas().unwrap_or_default() + } else { + self.provider() + .latest_header() + .map_err(Into::::into)? + .and_then(|h| h.base_fee_per_gas()) + .unwrap_or_default() + }; + + Ok(suggested_tip + U256::from(base_fee)) + } + } } impl LoadPendingBlock for BerachainApi @@ -700,4 +828,43 @@ where fn pending_block_kind(&self) -> PendingBlockKind { self.inner.pending_block_kind() } + + async fn local_pending_state(&self) -> Result, Self::Error> + where + Self: SpawnBlocking, + { + let Ok(Some(pending_block)) = self.pending_flashblock().await else { + return Ok(None); + }; + + let latest_historical = self + .provider() + .history_by_block_hash(pending_block.block().parent_hash()) + .map_err(Into::::into)?; + + let state = BlockState::from(pending_block); + + Ok(Some(Box::new(state.state_provider(latest_historical)) as StateProviderBox)) + } + + async fn local_pending_block( + &self, + ) -> Result>, Self::Error> { + if let Ok(Some(pending)) = self.pending_flashblock().await { + return Ok(Some(pending.into_block_and_receipts())); + } + + let latest = self + .provider() + .latest_header()? + .ok_or(EthApiError::HeaderNotFound(BlockNumberOrTag::Latest.into()))?; + + let latest = self + .cache() + .get_block_and_receipts(latest.hash()) + .await + .map_err(Into::::into)? + .map(|(block, receipts)| BlockAndReceipts { block, receipts }); + Ok(latest) + } } From 2c502ebe978e3c36e8ecb0b4801cb2f620fbfc31 Mon Sep 17 00:00:00 2001 From: Fridrik Asmundsson Date: Mon, 9 Mar 2026 10:53:48 +0000 Subject: [PATCH 5/5] fix: simplify flashblock retrieval and handle missing parent block gracefully --- src/rpc/api.rs | 110 +++++++++++++++++-------------------------------- 1 file changed, 38 insertions(+), 72 deletions(-) diff --git a/src/rpc/api.rs b/src/rpc/api.rs index eb54679a..9501f639 100644 --- a/src/rpc/api.rs +++ b/src/rpc/api.rs @@ -22,7 +22,7 @@ use reth::{ }, }; use reth_chain_state::BlockState; -use reth_optimism_flashblocks::{FlashBlockBuildInfo, FlashblocksListeners, PendingFlashBlock}; +use reth_optimism_flashblocks::FlashblocksListeners; use reth_primitives_traits::{BlockBody, Recovered, RecoveredBlock, WithEncoded}; use reth_rpc_eth_api::{ EthApiTypes, FromEthApiError, RpcNodeCore, RpcNodeCoreExt, RpcReceipt, @@ -38,8 +38,7 @@ use reth_rpc_eth_types::{ }; use reth_storage_api::{BlockIdReader, BlockReader, StateProviderBox, StateProviderFactory}; use reth_transaction_pool::PoolPooledTx; -use std::{sync::Arc, time::Duration}; -use tokio::time; +use std::sync::Arc; impl fmt::Display for BerachainTxType { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { @@ -340,67 +339,21 @@ pub struct BerachainApi { pub flashblocks: Option>>, } -/// Maximum duration to wait for a fresh flashblock when one is being built. -const MAX_FLASHBLOCK_WAIT_DURATION: Duration = Duration::from_millis(50); - -impl BerachainApi { - /// Returns information about the flashblock currently being built, if any. - fn flashblock_build_info(&self) -> Option { - self.flashblocks.as_ref().and_then(|f| *f.in_progress_rx.borrow()) - } - - /// Extracts pending block if it matches the expected parent hash. - fn extract_matching_block( - &self, - block: Option<&PendingFlashBlock>, - parent_hash: B256, - ) -> Option> { - block.filter(|b| b.block().parent_hash() == parent_hash).map(|b| b.pending.clone()) - } - - /// Returns a [`PendingBlock`] that is built out of flashblocks. - /// - /// If flashblocks receiver is not set, then it always returns `None`. +impl BerachainApi +where + N::Provider: BlockReaderIdExt, +{ + /// Returns the current pending block from the flashblock stream, if any. /// - /// It may wait up to 50ms for a fresh flashblock if one is currently being built. - pub async fn pending_flashblock(&self) -> eyre::Result>> - where - // OpEthApiError: FromEvmError, - Rpc: RpcConvert, - { - let Some(latest) = self.provider().latest_header()? else { - return Ok(None); - }; - - self.flashblock(latest.hash()).await - } - - /// Awaits a fresh flashblock if one is being built, otherwise returns current. - async fn flashblock( - &self, - parent_hash: B256, - ) -> eyre::Result>> { - let Some(rx) = self.flashblocks.as_ref().as_ref().map(|f| &f.pending_block_rx) else { - return Ok(None) - }; - - // Check if a flashblock is being built - if let Some(build_info) = self.flashblock_build_info() { - let current_index = rx.borrow().as_ref().map(|b| b.last_flashblock_index); - - // Check if this is the first flashblock or the next consecutive index - let is_next_index = current_index.is_none_or(|idx| build_info.index == idx + 1); - - // Wait only for relevant flashblocks: matching parent and next in sequence - if build_info.parent_hash == parent_hash && is_next_index { - let mut rx_clone = rx.clone(); - // Wait up to MAX_FLASHBLOCK_WAIT_DURATION for a new flashblock to arrive - let _ = time::timeout(MAX_FLASHBLOCK_WAIT_DURATION, rx_clone.changed()).await; - } - } + /// Only returns a block whose parent hash matches the latest canonical header, + /// ensuring stale flashblocks are not served during reorgs. + pub fn pending_flashblock(&self) -> Option> { + let latest_hash = self.provider().latest_header().ok().flatten()?.hash(); - // Fall back to current block - Ok(self.extract_matching_block(rx.borrow().as_ref(), parent_hash)) + self.flashblocks + .as_ref() + .and_then(|f| f.pending_block_rx.borrow().as_ref().map(|b| b.pending.clone())) + .filter(|pending| pending.block().parent_hash() == latest_hash) } } @@ -537,7 +490,7 @@ where if tx_receipt.is_none() { // if flashblocks are supported, attempt to find id from the pending block - if let Ok(Some(pending_block)) = this.pending_flashblock().await && + if let Some(pending_block) = this.pending_flashblock() && let Some(Ok(receipt)) = pending_block .find_and_convert_transaction_receipt(hash, this.converter()) { @@ -589,7 +542,7 @@ where ) -> impl Future, Self::Error>> + Send { async move { if (block_id.is_latest() || block_id.is_pending()) && - let Ok(Some(pending)) = self.pending_flashblock().await + let Some(pending) = self.pending_flashblock() { return Ok(Some(pending.block().body().transaction_count())); } @@ -640,7 +593,7 @@ where async move { // Serve flashblock for both "latest" and "pending" when available if block_id.is_latest() || block_id.is_pending() { - if self.pending_flashblock().await.ok().flatten().is_some() { + if self.pending_flashblock().is_some() { if let Some(pending) = self.local_pending_block().await? { return Ok(Some(pending.block)); } @@ -793,7 +746,7 @@ where async move { let suggested_tip = LoadFee::suggested_priority_fee(self).await?; - let base_fee = if let Ok(Some(pending)) = self.pending_flashblock().await { + let base_fee = if let Some(pending) = self.pending_flashblock() { pending.block().base_fee_per_gas().unwrap_or_default() } else { self.provider() @@ -829,28 +782,41 @@ where self.inner.pending_block_kind() } + /// Returns the pending state built on top of the latest flashblock. + /// + /// If the flashblock's parent block hasn't been imported into the DB yet, falls back to + /// canonical state via `Ok(None)`. async fn local_pending_state(&self) -> Result, Self::Error> where Self: SpawnBlocking, { - let Ok(Some(pending_block)) = self.pending_flashblock().await else { + let Some(pending_block) = self.pending_flashblock() else { + tracing::info!("no pending flashblock available, falling back to canonical state"); return Ok(None); }; - let latest_historical = self + let parent_hash = pending_block.block().parent_hash(); + + let Ok(latest_historical) = self .provider() - .history_by_block_hash(pending_block.block().parent_hash()) - .map_err(Into::::into)?; + .history_by_block_hash(parent_hash) + .map_err(Into::::into) + else { + tracing::info!( + %parent_hash, + "parent block not imported yet, falling back to canonical state" + ); + return Ok(None); + }; let state = BlockState::from(pending_block); - Ok(Some(Box::new(state.state_provider(latest_historical)) as StateProviderBox)) } async fn local_pending_block( &self, ) -> Result>, Self::Error> { - if let Ok(Some(pending)) = self.pending_flashblock().await { + if let Some(pending) = self.pending_flashblock() { return Ok(Some(pending.into_block_and_receipts())); }