Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions dash-spv/src/client/lifecycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,14 @@ impl<W: WalletInterface, N: NetworkManager, S: StorageManager> DashSpvClient<W,
_ => Vec::new(),
};
let checkpoint_manager = Arc::new(CheckpointManager::new(checkpoints));
managers.block_headers =
Some(BlockHeadersManager::new(storage.block_headers(), checkpoint_manager).await?);
managers.block_headers = Some(
BlockHeadersManager::new(
storage.block_headers(),
storage.metadata(),
checkpoint_manager,
)
.await?,
);

if config.enable_filters {
managers.filter_headers = Some(
Expand Down
52 changes: 50 additions & 2 deletions dash-spv/src/storage/metadata.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,25 @@
use std::path::PathBuf;

use async_trait::async_trait;

use crate::{
error::StorageResult,
storage::{io::atomic_write, PersistentStorage},
StorageError,
};
use async_trait::async_trait;
use dashcore::prelude::CoreBlockHeight;

/// Metadata key for persisting the best known peer height.
const LAST_TARGET_HEIGHT_KEY: &str = "last_target_height";

#[async_trait]
pub trait MetadataStorage: Send + Sync + 'static {
async fn store_metadata(&mut self, key: &str, value: &[u8]) -> StorageResult<()>;

async fn load_metadata(&self, key: &str) -> StorageResult<Option<Vec<u8>>>;
/// Persist the last target height to metadata storage.
async fn store_last_target_height(&mut self, height: CoreBlockHeight) -> StorageResult<()>;
/// Load the best chainlock from metadata storage and restore progress.
async fn load_last_target_height(&self) -> StorageResult<CoreBlockHeight>;
}

pub struct PersistentMetadataStorage {
Expand Down Expand Up @@ -59,4 +67,44 @@ impl MetadataStorage for PersistentMetadataStorage {
let data = tokio::fs::read(path).await?;
Ok(Some(data))
}

/// Persist the last target height to metadata storage.
async fn store_last_target_height(&mut self, height: CoreBlockHeight) -> StorageResult<()> {
match serde_json::to_vec(&height) {
Ok(converted) => self.store_metadata(LAST_TARGET_HEIGHT_KEY, &converted).await,
Err(e) => {
let error = format!("Failed to serialize last target height: {}", e);
tracing::warn!(error);
Err(StorageError::Serialization(error))
}
}
}

/// Load the last target height from metadata storage. Used by the block headers manager to
/// restore progress after restart.
async fn load_last_target_height(&self) -> StorageResult<CoreBlockHeight> {
match self.load_metadata(LAST_TARGET_HEIGHT_KEY).await {
Ok(Some(bytes)) => match serde_json::from_slice::<CoreBlockHeight>(&bytes) {
Ok(last_target_height) => {
tracing::debug!("Restored last target height {}", last_target_height);
Ok(last_target_height)
}
Err(e) => {
let error = format!("Failed to deserialize last target height: {}", e);
tracing::warn!(error);
Err(StorageError::Serialization(error))
}
},
Ok(None) => {
let error = "No last target height found (fresh start)".to_string();
tracing::debug!(error);
Err(StorageError::NotFound(error))
}
Err(e) => {
let error = format!("Failed to load last target height: {}", e);
tracing::warn!(error);
Err(StorageError::Corruption(error))
}
}
}
}
20 changes: 14 additions & 6 deletions dash-spv/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,14 @@ mod peers;
mod segments;
mod transactions;

use crate::error::StorageResult;
use crate::storage::lockfile::LockFile;
use crate::storage::transactions::PersistentTransactionStorage;
use crate::types::{HashedBlock, HashedBlockHeader, MempoolState, UnconfirmedTransaction};
use crate::ClientConfig;
use async_trait::async_trait;
use dashcore::hash_types::FilterHeader;
use dashcore::prelude::CoreBlockHeight;
use dashcore::{Header as BlockHeader, Txid};
use std::collections::HashMap;
use std::ops::Range;
Expand All @@ -24,12 +30,6 @@ use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;

use crate::error::StorageResult;
use crate::storage::lockfile::LockFile;
use crate::storage::transactions::PersistentTransactionStorage;
use crate::types::{HashedBlock, HashedBlockHeader, MempoolState, UnconfirmedTransaction};
use crate::ClientConfig;

pub use crate::storage::block_headers::{
BlockHeaderStorage, BlockHeaderTip, PersistentBlockHeaderStorage,
};
Expand Down Expand Up @@ -434,6 +434,14 @@ impl metadata::MetadataStorage for DiskStorageManager {
async fn load_metadata(&self, key: &str) -> StorageResult<Option<Vec<u8>>> {
self.metadata.read().await.load_metadata(key).await
}

async fn store_last_target_height(&mut self, height: CoreBlockHeight) -> StorageResult<()> {
self.metadata.write().await.store_last_target_height(height).await
}

async fn load_last_target_height(&self) -> StorageResult<CoreBlockHeight> {
self.metadata.read().await.load_last_target_height().await
}
}

#[async_trait]
Expand Down
27 changes: 19 additions & 8 deletions dash-spv/src/sync/block_headers/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::time::Instant;
use crate::chain::CheckpointManager;
use crate::error::{SyncError, SyncResult};
use crate::network::RequestSender;
use crate::storage::{BlockHeaderStorage, BlockHeaderTip};
use crate::storage::{BlockHeaderStorage, BlockHeaderTip, MetadataStorage};
use crate::sync::block_headers::HeadersPipeline;
use crate::sync::{BlockHeadersProgress, ProgressPercentage, SyncEvent, SyncManager, SyncState};
use crate::types::HashedBlockHeader;
Expand All @@ -30,18 +30,20 @@ use tokio::sync::RwLock;
/// - Post-sync header updates via inventory announcements
///
/// Generic over `H: BlockHeaderStorage` to allow different storage implementations.
pub struct BlockHeadersManager<H: BlockHeaderStorage> {
pub struct BlockHeadersManager<H: BlockHeaderStorage, M: MetadataStorage> {
/// Current progress of the manager.
pub(super) progress: BlockHeadersProgress,
/// Block header storage.
pub(super) header_storage: Arc<RwLock<H>>,
/// Metadata storage for persisting the best peer tip height.
pub(super) metadata_storage: Arc<RwLock<M>>,
/// Pipeline for parallel header downloads (used for both initial sync and post-sync).
pub(super) pipeline: HeadersPipeline,
/// Pending block announcements waiting for headers message (post-sync).
pub(super) pending_announcements: HashMap<BlockHash, Instant>,
}

impl<H: BlockHeaderStorage> std::fmt::Debug for BlockHeadersManager<H> {
impl<H: BlockHeaderStorage, M: MetadataStorage> std::fmt::Debug for BlockHeadersManager<H, M> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("BlockHeadersManager")
.field("progress", &self.progress)
Expand All @@ -50,10 +52,11 @@ impl<H: BlockHeaderStorage> std::fmt::Debug for BlockHeadersManager<H> {
}
}

impl<H: BlockHeaderStorage> BlockHeadersManager<H> {
impl<H: BlockHeaderStorage, M: MetadataStorage> BlockHeadersManager<H, M> {
/// Create a new headers manager with the given storage and checkpoint manager.
pub async fn new(
header_storage: Arc<RwLock<H>>,
metadata_storage: Arc<RwLock<M>>,
checkpoint_manager: Arc<CheckpointManager>,
) -> SyncResult<Self> {
let tip = header_storage
Expand All @@ -63,16 +66,21 @@ impl<H: BlockHeaderStorage> BlockHeadersManager<H> {
.await
.ok_or_else(|| SyncError::MissingDependency("No tip in storage".to_string()))?;

// Restore persisted target height, fall back to tip height
let target_height =
metadata_storage.read().await.load_last_target_height().await.unwrap_or(tip.height());

let mut initial_progress = BlockHeadersProgress::default();
initial_progress.set_state(SyncState::WaitingForConnections);
initial_progress.update_tip_height(tip.height());
initial_progress.update_target_height(tip.height());
initial_progress.update_target_height(target_height);

tracing::info!("BlockHeadersManager initialized at height {}", tip.height());

Ok(Self {
progress: initial_progress,
header_storage,
metadata_storage,
pipeline: HeadersPipeline::new(checkpoint_manager),
pending_announcements: HashMap::new(),
})
Expand Down Expand Up @@ -234,10 +242,13 @@ mod tests {
use super::*;
use crate::chain::checkpoints::testnet_checkpoints;
use crate::network::MessageType;
use crate::storage::{DiskStorageManager, PersistentBlockHeaderStorage, StorageManager};
use crate::storage::{
DiskStorageManager, PersistentBlockHeaderStorage, PersistentMetadataStorage, StorageManager,
};
use crate::sync::{ManagerIdentifier, SyncManagerProgress};

type TestBlockHeadersManager = BlockHeadersManager<PersistentBlockHeaderStorage>;
type TestBlockHeadersManager =
BlockHeadersManager<PersistentBlockHeaderStorage, PersistentMetadataStorage>;

fn create_test_checkpoint_manager() -> Arc<CheckpointManager> {
Arc::new(CheckpointManager::new(testnet_checkpoints()))
Expand All @@ -249,7 +260,7 @@ mod tests {
let genesis = Header::dummy_batch(0..1);
storage.store_headers(&genesis).await.unwrap();
let checkpoint_manager = create_test_checkpoint_manager();
BlockHeadersManager::new(storage.block_headers(), checkpoint_manager)
BlockHeadersManager::new(storage.block_headers(), storage.metadata(), checkpoint_manager)
.await
.expect("Failed to create BlockHeadersManager")
}
Expand Down
6 changes: 4 additions & 2 deletions dash-spv/src/sync/block_headers/sync_manager.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::error::SyncResult;
use crate::network::{Message, MessageType, NetworkEvent, RequestSender};
use crate::storage::BlockHeaderStorage;
use crate::storage::{BlockHeaderStorage, MetadataStorage};
use crate::sync::sync_manager::ensure_not_started;
use crate::sync::{
BlockHeadersManager, ManagerIdentifier, ProgressPercentage, SyncEvent, SyncManager,
Expand All @@ -15,7 +15,7 @@ use std::time::{Duration, Instant};
pub(super) const UNSOLICITED_HEADERS_WAIT_TIMEOUT: Duration = Duration::from_secs(3);

#[async_trait]
impl<H: BlockHeaderStorage> SyncManager for BlockHeadersManager<H> {
impl<H: BlockHeaderStorage, M: MetadataStorage> SyncManager for BlockHeadersManager<H, M> {
fn identifier(&self) -> ManagerIdentifier {
ManagerIdentifier::BlockHeader
}
Expand Down Expand Up @@ -152,6 +152,8 @@ impl<H: BlockHeaderStorage> SyncManager for BlockHeadersManager<H> {
{
if let Some(best_height) = best_height {
self.progress.update_target_height(*best_height);
let mut metadata_storage = self.metadata_storage.write().await;
metadata_storage.store_last_target_height(*best_height).await?;
}
if *connected_count == 0 {
self.stop_sync();
Expand Down
2 changes: 1 addition & 1 deletion dash-spv/src/sync/sync_coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ where
M: MetadataStorage,
W: WalletInterface + 'static,
{
pub block_headers: Option<BlockHeadersManager<H>>,
pub block_headers: Option<BlockHeadersManager<H, M>>,
pub filter_headers: Option<FilterHeadersManager<H, FH>>,
pub filters: Option<FiltersManager<H, FH, F, W>>,
pub blocks: Option<BlocksManager<H, B, W>>,
Expand Down