diff --git a/dash-spv/ARCHITECTURE.md b/dash-spv/ARCHITECTURE.md index 1e84a2ca3..6fbd17cd2 100644 --- a/dash-spv/ARCHITECTURE.md +++ b/dash-spv/ARCHITECTURE.md @@ -1013,7 +1013,6 @@ pub trait SyncManager: Send + Sync + Debug { fn state(&self) -> SyncState; fn wanted_message_types(&self) -> &'static [MessageType]; - async fn initialize(&mut self) -> SyncResult<()>; async fn start_sync(&mut self, requests: &RequestSender) -> SyncResult>; async fn handle_message(&mut self, msg: Message, requests: &RequestSender) -> SyncResult>; async fn handle_sync_event(&mut self, event: &SyncEvent, requests: &RequestSender) -> SyncResult>; diff --git a/dash-spv/src/client/lifecycle.rs b/dash-spv/src/client/lifecycle.rs index 2716aa65e..4a2215686 100644 --- a/dash-spv/src/client/lifecycle.rs +++ b/dash-spv/src/client/lifecycle.rs @@ -36,12 +36,16 @@ impl DashSpvClient>, ) -> Result { // Validate configuration config.validate().map_err(SpvError::Config)?; + // Initialize genesis block or checkpoint before creating managers, + // so they can read the tip from storage during construction. + Self::initialize_genesis_block(&config, &mut storage).await?; + let masternode_engine = { if config.enable_masternodes { Some(Arc::new(RwLock::new(MasternodeListEngine::default_for_network( @@ -68,19 +72,25 @@ impl DashSpvClient DashSpvClient DashSpvClient let storage = Arc::new(Mutex::new(storage)); - Ok(Self { + let client = Self { config: Arc::new(RwLock::new(config)), network: Arc::new(Mutex::new(network)), storage, @@ -120,56 +135,51 @@ impl DashSpvClient Result<()> { - { - let running = self.running.read().await; - if *running { - return Err(SpvError::Config("Client already running".to_string())); - } - } + }; // Load wallet data from storage - self.load_wallet_data().await?; - - let config = self.config.read().await; + client.load_wallet_data().await?; // Initialize mempool filter if mempool tracking is enabled - if config.enable_mempool_tracking { - // TODO: Get monitored addresses from wallet - let filter = Arc::new(MempoolFilter::new( - config.mempool_strategy, - config.max_mempool_transactions, - self.mempool_state.clone(), - HashSet::new(), // Will be populated from wallet's monitored addresses - config.network, - )); - - *self.mempool_filter.write().await = Some(filter); - - // Load mempool state from storage if persistence is enabled - if config.persist_mempool { - if let Some(state) = self - .storage - .lock() - .await - .load_mempool_state() - .await - .map_err(SpvError::Storage)? - { - *self.mempool_state.write().await = state; + { + let config = client.config.read().await; + if config.enable_mempool_tracking { + let filter = Arc::new(MempoolFilter::new( + config.mempool_strategy, + config.max_mempool_transactions, + client.mempool_state.clone(), + HashSet::new(), // TODO: populate from wallet's monitored addresses + config.network, + )); + *client.mempool_filter.write().await = Some(filter); + + // Load mempool state from storage if persistence is enabled + if config.persist_mempool { + if let Some(state) = client + .storage + .lock() + .await + .load_mempool_state() + .await + .map_err(SpvError::Storage)? + { + *client.mempool_state.write().await = state; + } } } } - // Drop config before calling methods that also read it - drop(config); + Ok(client) + } - // Initialize genesis block if not already present - self.initialize_genesis_block().await?; + /// Start the SPV client: spawn sync tasks and connect to the network. + pub(super) async fn start(&self) -> Result<()> { + { + let running = self.running.read().await; + if *running { + return Err(SpvError::Config("Client already running".to_string())); + } + } // Start all sync tasks before connecting to the network to make sure initial connection // events are handled correctly in the sync coordinator. @@ -230,15 +240,12 @@ impl DashSpvClient Result<()> { - let config = self.config.read().await; - + /// Initialize genesis block or checkpoint in storage. + /// + /// Called before creating managers so they can read the tip during construction. + async fn initialize_genesis_block(config: &ClientConfig, storage: &mut S) -> Result<()> { // Check if we already have any headers in storage - let current_tip = { - let storage = self.storage.lock().await; - storage.get_tip_height().await - }; + let current_tip = storage.get_tip_height().await; if current_tip.is_some() { // We already have headers, genesis block should be at height 0 @@ -298,12 +305,9 @@ impl DashSpvClient DashSpvClient std::fmt::Debug for BlockHeadersManager { impl BlockHeadersManager { /// Create a new headers manager with the given storage and checkpoint manager. - pub fn new(header_storage: Arc>, checkpoint_manager: Arc) -> Self { - Self { - progress: BlockHeadersProgress::default(), + pub async fn new( + header_storage: Arc>, + checkpoint_manager: Arc, + ) -> SyncResult { + let tip = header_storage + .read() + .await + .get_tip() + .await + .ok_or_else(|| SyncError::MissingDependency("No tip in storage".to_string()))?; + + let mut initial_progress = BlockHeadersProgress::default(); + initial_progress.set_state(SyncState::WaitingForConnections); + initial_progress.update_tip_height(tip.height()); + initial_progress.update_target_height(tip.height()); + + tracing::info!("BlockHeadersManager initialized at height {}", tip.height()); + + Ok(Self { + progress: initial_progress, header_storage, - pipeline: HeadersPipeline::new(checkpoint_manager.clone()), + pipeline: HeadersPipeline::new(checkpoint_manager), pending_announcements: HashMap::new(), - } + }) } pub(super) async fn tip(&self) -> SyncResult { @@ -227,16 +244,21 @@ mod tests { } async fn create_test_manager() -> TestBlockHeadersManager { - let storage = DiskStorageManager::with_temp_dir().await.unwrap(); + let mut storage = DiskStorageManager::with_temp_dir().await.unwrap(); + // Store a genesis header so the manager can initialize + let genesis = Header::dummy_batch(0..1); + storage.store_headers(&genesis).await.unwrap(); let checkpoint_manager = create_test_checkpoint_manager(); BlockHeadersManager::new(storage.block_headers(), checkpoint_manager) + .await + .expect("Failed to create BlockHeadersManager") } #[tokio::test] async fn test_block_headers_manager_new() { let manager = create_test_manager().await; assert_eq!(manager.identifier(), ManagerIdentifier::BlockHeader); - assert_eq!(manager.state(), SyncState::WaitForEvents); + assert_eq!(manager.state(), SyncState::WaitingForConnections); assert_eq!(manager.wanted_message_types(), vec![MessageType::Headers, MessageType::Inv]); } @@ -249,7 +271,7 @@ mod tests { let progress = manager.progress(); if let SyncManagerProgress::BlockHeaders(progress) = progress { - assert_eq!(progress.state(), SyncState::WaitForEvents); + assert_eq!(progress.state(), SyncState::WaitingForConnections); assert_eq!(progress.tip_height(), 100); assert_eq!(progress.target_height(), 200); assert_eq!(progress.processed(), 50); diff --git a/dash-spv/src/sync/block_headers/sync_manager.rs b/dash-spv/src/sync/block_headers/sync_manager.rs index 0d9dd3737..5f6d60b78 100644 --- a/dash-spv/src/sync/block_headers/sync_manager.rs +++ b/dash-spv/src/sync/block_headers/sync_manager.rs @@ -6,7 +6,6 @@ use crate::sync::{ BlockHeadersManager, ManagerIdentifier, ProgressPercentage, SyncEvent, SyncManager, SyncManagerProgress, SyncState, }; -use crate::SyncError; use async_trait::async_trait; use dashcore::network::message::NetworkMessage; use dashcore::BlockHash; @@ -37,24 +36,6 @@ impl SyncManager for BlockHeadersManager { &[MessageType::Headers, MessageType::Inv] } - async fn initialize(&mut self) -> SyncResult<()> { - let tip = self - .header_storage - .read() - .await - .get_tip() - .await - .ok_or_else(|| SyncError::MissingDependency("No tip in storage".to_string()))?; - - self.progress.set_state(SyncState::WaitingForConnections); - self.progress.update_tip_height(tip.height()); - self.progress.update_target_height(tip.height()); - - tracing::info!("BlockHeadersManager initialized at height {}", tip.height()); - - Ok(()) - } - async fn start_sync(&mut self, requests: &RequestSender) -> SyncResult> { ensure_not_started(self.state(), self.identifier())?; self.progress.set_state(SyncState::Syncing); diff --git a/dash-spv/src/sync/blocks/manager.rs b/dash-spv/src/sync/blocks/manager.rs index c9d7a4ce3..34771d9e9 100644 --- a/dash-spv/src/sync/blocks/manager.rs +++ b/dash-spv/src/sync/blocks/manager.rs @@ -43,13 +43,18 @@ pub struct BlocksManager BlocksManager { /// Create a new blocks manager with the given storage references. - pub fn new( + pub async fn new( wallet: Arc>, header_storage: Arc>, block_storage: Arc>, ) -> Self { + let synced_height = wallet.read().await.synced_height(); + + let mut initial_progress = BlocksProgress::default(); + initial_progress.update_last_processed(synced_height); + Self { - progress: BlocksProgress::default(), + progress: initial_progress, header_storage, block_storage, wallet, @@ -170,7 +175,7 @@ mod tests { async fn create_test_manager() -> TestBlocksManager { let storage = DiskStorageManager::with_temp_dir().await.unwrap(); let wallet = Arc::new(RwLock::new(MockWallet::new())); - BlocksManager::new(wallet, storage.block_headers(), storage.blocks()) + BlocksManager::new(wallet, storage.block_headers(), storage.blocks()).await } #[tokio::test] diff --git a/dash-spv/src/sync/blocks/sync_manager.rs b/dash-spv/src/sync/blocks/sync_manager.rs index 6a9c9471b..d8bc875db 100644 --- a/dash-spv/src/sync/blocks/sync_manager.rs +++ b/dash-spv/src/sync/blocks/sync_manager.rs @@ -31,20 +31,6 @@ impl SyncM &[MessageType::Block] } - async fn initialize(&mut self) -> SyncResult<()> { - // Get wallet state - let wallet = self.wallet.read().await; - let synced_height = wallet.synced_height(); - drop(wallet); - - self.progress.update_last_processed(synced_height); - self.progress.set_state(SyncState::WaitingForConnections); - - tracing::info!("BlocksManager initialized at height {}", self.progress.last_processed()); - - Ok(()) - } - async fn start_sync(&mut self, _requests: &RequestSender) -> SyncResult> { ensure_not_started(self.state(), self.identifier())?; // Check if filters already completed (event received before start_sync) diff --git a/dash-spv/src/sync/chainlock/manager.rs b/dash-spv/src/sync/chainlock/manager.rs index f30ee161f..fb29d436e 100644 --- a/dash-spv/src/sync/chainlock/manager.rs +++ b/dash-spv/src/sync/chainlock/manager.rs @@ -46,12 +46,12 @@ pub struct ChainLockManager { impl ChainLockManager { /// Create a new ChainLock manager. - pub fn new( + pub async fn new( header_storage: Arc>, metadata_storage: Arc>, masternode_engine: Arc>, ) -> Self { - Self { + let mut manager = Self { progress: ChainLockProgress::default(), header_storage, metadata_storage, @@ -59,7 +59,12 @@ impl ChainLockManager { best_chainlock: None, requested_chainlocks: HashSet::new(), masternode_ready: false, - } + }; + + // TODO: Move load_best_chainlock() and save_best_chainlock() to MetadataStorage trait. + manager.load_best_chainlock().await; + + manager } /// Notify the manager that masternode sync is complete. @@ -257,7 +262,7 @@ mod tests { let storage = DiskStorageManager::with_temp_dir().await.unwrap(); let engine = Arc::new(RwLock::new(MasternodeListEngine::default_for_network(Network::Testnet))); - ChainLockManager::new(storage.block_headers(), storage.metadata(), engine) + ChainLockManager::new(storage.block_headers(), storage.metadata(), engine).await } async fn create_test_manager_with_storage( @@ -265,7 +270,7 @@ mod tests { ) -> TestChainLockManager { let engine = Arc::new(RwLock::new(MasternodeListEngine::default_for_network(Network::Testnet))); - ChainLockManager::new(storage.block_headers(), storage.metadata(), engine) + ChainLockManager::new(storage.block_headers(), storage.metadata(), engine).await } fn create_test_chainlock(height: u32) -> ChainLock { @@ -391,12 +396,9 @@ mod tests { manager.save_best_chainlock().await; } - // Load it back via a fresh manager sharing the same storage + // Fresh manager sharing the same storage should load the chainlock automatically { - let mut manager = create_test_manager_with_storage(&storage).await; - assert!(manager.best_chainlock().is_none()); - - manager.load_best_chainlock().await; + let manager = create_test_manager_with_storage(&storage).await; let restored = manager.best_chainlock().expect("chainlock should be restored"); assert_eq!(restored.block_height, 42000); @@ -420,14 +422,13 @@ mod tests { } // Create a new manager and call initialize (the SyncManager trait method) - let mut manager = create_test_manager_with_storage(&storage).await; - manager.initialize().await.unwrap(); + let manager = create_test_manager_with_storage(&storage).await; let restored = manager.best_chainlock().expect("chainlock should be restored after initialize"); assert_eq!(restored.block_height, 99999); assert_eq!(manager.progress.best_validated_height(), 99999); - assert_eq!(manager.state(), SyncState::WaitingForConnections); + assert_eq!(manager.state(), SyncState::WaitForEvents); } #[tokio::test] diff --git a/dash-spv/src/sync/chainlock/sync_manager.rs b/dash-spv/src/sync/chainlock/sync_manager.rs index 0f9537c9b..78cd1ff26 100644 --- a/dash-spv/src/sync/chainlock/sync_manager.rs +++ b/dash-spv/src/sync/chainlock/sync_manager.rs @@ -26,13 +26,6 @@ impl SyncManager for ChainLockManager &[MessageType::CLSig, MessageType::Inv] } - async fn initialize(&mut self) -> SyncResult<()> { - self.load_best_chainlock().await; - self.set_state(SyncState::WaitingForConnections); - tracing::info!("{} initialized", self.identifier()); - Ok(()) - } - async fn handle_message( &mut self, msg: Message, diff --git a/dash-spv/src/sync/filter_headers/manager.rs b/dash-spv/src/sync/filter_headers/manager.rs index 9f9c28253..43a0ecc86 100644 --- a/dash-spv/src/sync/filter_headers/manager.rs +++ b/dash-spv/src/sync/filter_headers/manager.rs @@ -41,14 +41,30 @@ pub struct FilterHeadersManager impl FilterHeadersManager { /// Create a new filter headers manager with the given storage references. - pub fn new(header_storage: Arc>, filter_header_storage: Arc>) -> Self { - Self { - progress: FilterHeadersProgress::default(), + pub async fn new( + header_storage: Arc>, + filter_header_storage: Arc>, + ) -> SyncResult { + // Load current filter tip + let filter_tip = + filter_header_storage.read().await.get_filter_tip_height().await?.unwrap_or(0); + + // Load block header tip for progress display + let header_tip = + header_storage.read().await.get_tip().await.map(|t| t.height()).unwrap_or(0); + + let mut initial_progress = FilterHeadersProgress::default(); + initial_progress.update_current_height(filter_tip); + initial_progress.update_target_height(header_tip); + initial_progress.update_block_header_tip_height(header_tip); + + Ok(Self { + progress: initial_progress, header_storage, filter_header_storage, pipeline: FilterHeadersPipeline::default(), checkpoint_start_height: None, - } + }) } /// Process a CFHeaders response - store headers and update state. @@ -244,6 +260,8 @@ mod tests { async fn create_test_manager() -> TestFilterHeadersManager { let storage = DiskStorageManager::with_temp_dir().await.unwrap(); FilterHeadersManager::new(storage.block_headers(), storage.filter_headers()) + .await + .expect("Failed to create FilterHeadersManager") } #[tokio::test] diff --git a/dash-spv/src/sync/filter_headers/sync_manager.rs b/dash-spv/src/sync/filter_headers/sync_manager.rs index bcb8d6f80..22bbbb6d8 100644 --- a/dash-spv/src/sync/filter_headers/sync_manager.rs +++ b/dash-spv/src/sync/filter_headers/sync_manager.rs @@ -30,22 +30,6 @@ impl SyncManager for FilterHeade &[MessageType::CFHeaders] } - async fn initialize(&mut self) -> SyncResult<()> { - // Load current filter tip - let filter_tip = - self.filter_header_storage.read().await.get_filter_tip_height().await?.unwrap_or(0); - - self.progress.update_current_height(filter_tip); - self.set_state(SyncState::WaitingForConnections); - - tracing::info!( - "FilterHeadersManager initialized at height {}, waiting for headers", - self.progress.current_height() - ); - - Ok(()) - } - async fn handle_message( &mut self, msg: Message, diff --git a/dash-spv/src/sync/filters/manager.rs b/dash-spv/src/sync/filters/manager.rs index e7dac64e0..9a7592418 100644 --- a/dash-spv/src/sync/filters/manager.rs +++ b/dash-spv/src/sync/filters/manager.rs @@ -77,14 +77,24 @@ impl { /// Create a new filters manager with the given storage references. - pub fn new( + pub async fn new( wallet: Arc>, header_storage: Arc>, filter_header_storage: Arc>, filter_storage: Arc>, ) -> Self { + let committed_height = wallet.read().await.filter_committed_height(); + + // Load block header tip for target display + let header_tip = + header_storage.read().await.get_tip().await.map(|t| t.height()).unwrap_or(0); + + let mut initial_progress = FiltersProgress::default(); + initial_progress.update_committed_height(committed_height); + initial_progress.update_target_height(header_tip); + Self { - progress: FiltersProgress::default(), + progress: initial_progress, header_storage, filter_header_storage, filter_storage, @@ -791,6 +801,7 @@ mod tests { storage.filter_headers(), storage.filters(), ) + .await } #[tokio::test] diff --git a/dash-spv/src/sync/filters/sync_manager.rs b/dash-spv/src/sync/filters/sync_manager.rs index 8fd9e1fbd..4357fc9f5 100644 --- a/dash-spv/src/sync/filters/sync_manager.rs +++ b/dash-spv/src/sync/filters/sync_manager.rs @@ -43,26 +43,6 @@ impl< self.clear_in_flight_state(); } - async fn initialize(&mut self) -> SyncResult<()> { - let wallet = self.wallet.read().await; - let committed_height = wallet.filter_committed_height(); - drop(wallet); - - let stored_height = self.filter_storage.read().await.filter_tip_height().await?; - - self.progress.update_committed_height(committed_height); - self.progress.update_stored_height(stored_height); - self.set_state(SyncState::WaitingForConnections); - - tracing::info!( - "FiltersManager initialized (committed={}, stored={}), waiting for filter headers", - committed_height, - stored_height, - ); - - Ok(()) - } - async fn start_sync(&mut self, requests: &RequestSender) -> SyncResult> { ensure_not_started(self.state(), self.identifier())?; diff --git a/dash-spv/src/sync/masternodes/manager.rs b/dash-spv/src/sync/masternodes/manager.rs index 078a835ba..78e168669 100644 --- a/dash-spv/src/sync/masternodes/manager.rs +++ b/dash-spv/src/sync/masternodes/manager.rs @@ -91,13 +91,27 @@ pub struct MasternodesManager { impl MasternodesManager { /// Create a new masternode manager with the given header storage. - pub fn new( + pub async fn new( header_storage: Arc>, engine: Arc>, network: dashcore::Network, ) -> Self { + // Load current height from engine's masternode lists + let current_height = + engine.read().await.masternode_lists.keys().last().copied().unwrap_or(0); + + // Load block header tip for progress display + let header_tip = + header_storage.read().await.get_tip().await.map(|t| t.height()).unwrap_or(0); + + let mut initial_progress = MasternodesProgress::default(); + initial_progress.update_current_height(current_height); + initial_progress.update_target_height(header_tip); + initial_progress.update_block_header_tip_height(header_tip); + initial_progress.set_state(SyncState::WaitingForConnections); + Self { - progress: MasternodesProgress::default(), + progress: initial_progress, header_storage, engine, network, @@ -224,14 +238,14 @@ mod tests { let engine = Arc::new(RwLock::new(MasternodeListEngine::default_for_network( dashcore::Network::Testnet, ))); - MasternodesManager::new(storage.block_headers(), engine, dashcore::Network::Testnet) + MasternodesManager::new(storage.block_headers(), engine, dashcore::Network::Testnet).await } #[tokio::test] async fn test_masternode_manager_new() { let manager = create_test_manager().await; assert_eq!(manager.identifier(), ManagerIdentifier::Masternode); - assert_eq!(manager.state(), SyncState::WaitForEvents); + assert_eq!(manager.state(), SyncState::WaitingForConnections); assert_eq!( manager.wanted_message_types(), vec![MessageType::MnListDiff, MessageType::QRInfo] diff --git a/dash-spv/src/sync/sync_coordinator.rs b/dash-spv/src/sync/sync_coordinator.rs index ad074d2ec..63d26e94f 100644 --- a/dash-spv/src/sync/sync_coordinator.rs +++ b/dash-spv/src/sync/sync_coordinator.rs @@ -31,8 +31,8 @@ const DEFAULT_SYNC_EVENT_CAPACITY: usize = 10000; /// Macro to spawn a manager if present. macro_rules! spawn_manager { - ($self:expr, $field:ident, $network:expr) => { - if let Some(manager) = $self.managers.$field.take() { + ($self:expr, $manager:expr, $network:expr) => { + if let Some(manager) = $manager { let identifier = manager.identifier(); let wanted_message_types = manager.wanted_message_types(); let requests = $network.request_sender(); @@ -146,10 +146,21 @@ where W: WalletInterface + 'static, { /// Create a new coordinator with the given config. - /// - /// Managers are passed to `start()` when sync begins. - pub fn new(managers: Managers) -> Self { - let (progress_sender, progress_receiver) = watch::channel(SyncProgress::default()); + pub(crate) async fn new(managers: Managers) -> Self { + let mut initial_progress = SyncProgress::default(); + + try_update_progress(managers.block_headers.as_ref(), &mut initial_progress); + try_update_progress(managers.filter_headers.as_ref(), &mut initial_progress); + try_update_progress(managers.filters.as_ref(), &mut initial_progress); + try_update_progress(managers.blocks.as_ref(), &mut initial_progress); + try_update_progress(managers.masternode.as_ref(), &mut initial_progress); + try_update_progress(managers.chainlock.as_ref(), &mut initial_progress); + try_update_progress(managers.instantsend.as_ref(), &mut initial_progress); + + tracing::info!("Initial sync progress {}", initial_progress.clone()); + + let (progress_sender, progress_receiver) = watch::channel(initial_progress); + Self { managers, progress_receivers: Vec::new(), @@ -194,6 +205,15 @@ where let sync_start_time = Instant::now(); self.sync_start_time = Some(sync_start_time); + // Take managers for spawning + let block_headers = self.managers.block_headers.take(); + let filter_headers = self.managers.filter_headers.take(); + let filters = self.managers.filters.take(); + let blocks = self.managers.blocks.take(); + let masternode = self.managers.masternode.take(); + let chainlock = self.managers.chainlock.take(); + let instantsend = self.managers.instantsend.take(); + // Spawn each manager using the macro spawn_manager!(self, block_headers, network); spawn_manager!(self, filter_headers, network); @@ -336,11 +356,10 @@ async fn run_progress_task( shutdown: CancellationToken, sync_start_time: Instant, ) { - let streams: Vec<_> = - receivers.into_iter().map(|rx| WatchStream::new(rx).map(move |p| p)).collect(); + let streams: Vec<_> = receivers.into_iter().map(WatchStream::from_changes).collect(); let mut merged = select_all(streams); - let mut progress = SyncProgress::default(); + let mut progress = progress_sender.borrow().clone(); let mut was_synced = false; let mut sync_cycle: u32 = 0; let mut cycle_start = sync_start_time; @@ -386,6 +405,13 @@ fn update_progress_from_manager( } } +/// Try to merge progress from an optional manager into a SyncProgress. +fn try_update_progress(manager: Option<&impl SyncManager>, sync_progress: &mut SyncProgress) { + if let Some(manager) = manager { + update_progress_from_manager(sync_progress, manager.progress()); + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/dash-spv/src/sync/sync_manager.rs b/dash-spv/src/sync/sync_manager.rs index 421faf94f..2d8189b03 100644 --- a/dash-spv/src/sync/sync_manager.rs +++ b/dash-spv/src/sync/sync_manager.rs @@ -98,16 +98,6 @@ pub trait SyncManager: Send + Sync + std::fmt::Debug { /// manager's task via topic-based filtering. fn wanted_message_types(&self) -> &'static [MessageType]; - /// Initialize the manager. - /// - /// Called once at startup before the main loop. Loads persisted state - /// from internal storage and initial target heights. - async fn initialize(&mut self) -> SyncResult<()> { - self.set_state(SyncState::WaitingForConnections); - tracing::info!("{} initialized", self.identifier()); - Ok(()) - } - /// Start the sync process. /// /// Called after initialization to trigger the initial sync requests. @@ -207,7 +197,6 @@ pub trait SyncManager: Send + Sync + std::fmt::Debug { /// Run the manager task, processing messages, events, and periodic ticks. /// /// This consumes the manager and runs until shutdown is signaled. - /// The `initial_peer_count` parameter indicates how many peers are connected at start. async fn run(mut self, mut context: SyncManagerTaskContext) -> SyncResult where Self: Sized, @@ -217,9 +206,6 @@ pub trait SyncManager: Send + Sync + std::fmt::Debug { let mut sync_event_receiver = context.sync_event_sender.subscribe(); - // Initialize the manager - self.initialize().await?; - // Tick interval for periodic housekeeping let mut tick_interval = interval(Duration::from_millis(100));