From 5023e59a3c67fe0ea4bba72ae78541f15a649245 Mon Sep 17 00:00:00 2001 From: shamb0 Date: Tue, 13 Aug 2024 11:52:18 +0530 Subject: [PATCH 1/3] [CX_HARDENING] Enhance DA Task Test Coverage - Test outdated proposal handling - Verify duplicate vote detection - Validate vote collection and processing - Ensure correct DA vote handling by non-leader nodes Signed-off-by: shamb0 --- crates/hotshot/src/lib.rs | 25 + crates/testing/src/helpers.rs | 190 +++++- crates/testing/src/predicates/event.rs | 50 +- crates/testing/src/script.rs | 12 +- crates/testing/tests/tests_cx_hardening.rs | 3 + .../tests/tests_cx_hardening/cxh_da_task.rs | 564 ++++++++++++++++++ crates/types/src/event.rs | 64 ++ 7 files changed, 902 insertions(+), 6 deletions(-) create mode 100644 crates/testing/tests/tests_cx_hardening.rs create mode 100644 crates/testing/tests/tests_cx_hardening/cxh_da_task.rs diff --git a/crates/hotshot/src/lib.rs b/crates/hotshot/src/lib.rs index 090a9687de..40d0b4bda7 100644 --- a/crates/hotshot/src/lib.rs +++ b/crates/hotshot/src/lib.rs @@ -639,6 +639,31 @@ impl, V: Versions> SystemContext, V: Versions> SystemContext { + /// Creates a lightweight version of the system handle for task state testing. + /// + /// This method provides a minimal context for task state tests, omitting the full + /// consensus and network task launches. It results in fewer traces and simplifies debugging. + /// + /// # Returns + /// A `SystemContextHandle` with minimal setup for task state testing. + pub fn create_lean_test_handle(&self) -> SystemContextHandle { + let (internal_sender, internal_receiver) = broadcast(EVENT_CHANNEL_SIZE); + + SystemContextHandle { + consensus_registry: ConsensusTaskRegistry::new(), + network_registry: NetworkTaskRegistry::new(), + output_event_stream: self.external_event_stream.clone(), + internal_event_stream: (internal_sender, internal_receiver.deactivate()), + hotshot: self.clone().into(), + storage: Arc::clone(&self.storage), + network: Arc::clone(&self.network), + memberships: Arc::clone(&self.memberships), + } + } +} + /// An async broadcast channel type Channel = (Sender>, Receiver>); diff --git a/crates/testing/src/helpers.rs b/crates/testing/src/helpers.rs index 09c3b6e87e..514a511c9c 100644 --- a/crates/testing/src/helpers.rs +++ b/crates/testing/src/helpers.rs @@ -5,15 +5,17 @@ // along with the HotShot repository. If not, see . #![allow(clippy::panic)] -use std::{fmt::Debug, hash::Hash, marker::PhantomData, sync::Arc}; +use std::{fmt::Debug, hash::Hash, marker::PhantomData, sync::Arc, time::Duration}; use async_broadcast::{Receiver, Sender}; +use async_compatibility_layer::art::async_timeout; use bitvec::bitvec; use committable::Committable; use ethereum_types::U256; +use futures::StreamExt; use hotshot::{ traits::{NodeImplementation, TestableNodeImplementation}, - types::{BLSPubKey, SignatureKey, SystemContextHandle}, + types::{BLSPubKey, Event, SignatureKey, SystemContextHandle}, HotShotInitializer, Memberships, SystemContext, }; use hotshot_example_types::{ @@ -40,11 +42,15 @@ use hotshot_types::{ utils::{View, ViewInner}, vid::{vid_scheme, VidCommitment, VidSchemeType}, vote::{Certificate, HasViewNumber, Vote}, + PeerConfig, }; use jf_vid::VidScheme; use serde::Serialize; -use crate::test_builder::TestDescription; +use crate::{ + predicates::PredicateResult, script::ExternalEventsExpectations, test_builder::TestDescription, + test_launcher::TestLauncher, +}; /// create the [`SystemContextHandle`] from a node id /// # Panics @@ -128,6 +134,105 @@ pub async fn build_system_handle< .expect("Could not init hotshot") } +/// create the [`SystemContextHandle`] from a launcher +/// # Panics +/// if cannot create a [`HotShotInitializer`] +pub async fn build_system_handle_from_launcher< + TYPES: NodeType, + I: NodeImplementation< + TYPES, + Storage = TestStorage, + AuctionResultsProvider = TestAuctionResultsProvider, + > + TestableNodeImplementation, + V: Versions, +>( + node_id: u64, + launcher: TestLauncher, + membership_config: Option>, +) -> Result, anyhow::Error> { + tracing::info!("Creating system handle for node {}", node_id); + + let network = (launcher.resource_generator.channel_generator)(node_id).await; + let storage = (launcher.resource_generator.storage)(node_id); + let marketplace_config = (launcher.resource_generator.marketplace_config)(node_id); + let config = launcher.resource_generator.config.clone(); + + let known_nodes_with_stake = config.known_nodes_with_stake.clone(); + let private_key = config.my_own_validator_config.private_key.clone(); + let public_key = config.my_own_validator_config.public_key.clone(); + + let memberships = membership_config.unwrap_or_else(|| { + let create_membership = |nodes: &[PeerConfig], topic: Topic| { + TYPES::Membership::create_election( + known_nodes_with_stake.clone(), + nodes.to_vec(), + topic, + config.fixed_leader_for_gpuvid, + ) + }; + + Memberships { + quorum_membership: create_membership(&known_nodes_with_stake, Topic::Global), + da_membership: create_membership(&config.known_da_nodes, Topic::Da), + vid_membership: create_membership(&known_nodes_with_stake, Topic::Global), + view_sync_membership: create_membership(&known_nodes_with_stake, Topic::Global), + } + }); + + let initializer = HotShotInitializer::::from_genesis(TestInstanceState::new( + launcher.metadata.async_delay_config, + )) + .await + .unwrap(); + + let system_context = SystemContext::new( + public_key, + private_key, + node_id, + config, + memberships, + network, + initializer, + ConsensusMetricsValue::default(), + storage, + marketplace_config, + ); + + let system_context_handle = system_context.create_lean_test_handle(); + + tracing::info!("Successfully created system handle for node {}", node_id); + Ok(system_context_handle) +} + +/// create certificate +/// # Panics +/// if we fail to sign the data +pub fn build_da_cert< + TYPES: NodeType, + DATAType: Committable + Clone + Eq + Hash + Serialize + Debug + 'static, + VOTE: Vote, + CERT: Certificate, +>( + data: DATAType, + membership: &TYPES::Membership, + view: TYPES::Time, + public_key: &TYPES::SignatureKey, + private_key: &::PrivateKey, +) -> CERT { + let real_qc_sig = + build_da_assembled_sig::(&data, membership, view); + + let vote = + SimpleVote::::create_signed_vote(data, view, public_key, private_key) + .expect("Failed to sign data!"); + CERT::create_signed_certificate( + vote.date_commitment(), + vote.date().clone(), + real_qc_sig, + vote.view_number(), + ) +} + /// create certificate /// # Panics /// if we fail to sign the data @@ -171,6 +276,46 @@ pub fn vid_share( .clone() } +/// create DA signature +/// # Panics +/// if fails to convert node id into keypair +pub fn build_da_assembled_sig( + data: &DATAType, + membership: &TYPES::Membership, + view: TYPES::Time, +) -> ::QcType +where + TYPES: NodeType, + VOTE: Vote, + CERT: Certificate, + DATAType: Committable + Clone + Eq + Hash + Serialize + Debug + 'static, +{ + let stake_table = membership.committee_qc_stake_table(); + let total_nodes = stake_table.len(); + let threshold = CERT::threshold(membership) as usize; + let real_qc_pp = + TYPES::SignatureKey::public_parameter(stake_table.clone(), U256::from(threshold as u64)); + + let mut signers = bitvec![0; total_nodes]; + let sig_lists: Vec<_> = (0..threshold) + .map(|node_id| { + let (private_key, public_key) = key_pair_for_id(node_id as u64); + let vote = SimpleVote::::create_signed_vote( + data.clone(), + view, + &public_key, + &private_key, + ) + .expect("Failed to sign data"); + + signers.set(node_id, true); + vote.signature() + }) + .collect(); + + TYPES::SignatureKey::assemble(&real_qc_pp, signers.as_bitslice(), &sig_lists) +} + /// create signature /// # Panics /// if fails to convert node id into keypair @@ -330,7 +475,7 @@ pub fn build_da_certificate( payload_commit: da_payload_commitment, }; - build_cert::, DaCertificate>( + build_da_cert::, DaCertificate>( da_data, da_membership, view_number, @@ -393,3 +538,40 @@ pub fn build_fake_view_with_leaf_and_state( }, } } + +pub async fn check_external_events> + Unpin>( + mut output_stream: S, + expectations: &[ExternalEventsExpectations], + timeout: Duration, +) -> Result<(), String> { + let mut external_event_expectations_met = vec![false; expectations.len()]; + + while let Ok(Some(ext_event_received_output)) = + async_timeout(timeout, output_stream.next()).await + { + tracing::debug!("Test received Ext Event: {:?}", ext_event_received_output); + for (index, expectation) in expectations.iter().enumerate() { + if !external_event_expectations_met[index] { + for predicate in &expectation.output_asserts { + if predicate + .evaluate(&Arc::new(ext_event_received_output.clone())) + .await + == PredicateResult::Pass + { + external_event_expectations_met[index] = true; + break; + } + } + } + } + if external_event_expectations_met.iter().all(|&x| x) { + return Ok(()); + } + } + + if external_event_expectations_met.iter().all(|&x| x) { + Ok(()) + } else { + Err("Not all external event expectations were met".to_string()) + } +} diff --git a/crates/testing/src/predicates/event.rs b/crates/testing/src/predicates/event.rs index 8b740b8482..074dd129e5 100644 --- a/crates/testing/src/predicates/event.rs +++ b/crates/testing/src/predicates/event.rs @@ -11,10 +11,14 @@ use async_trait::async_trait; use hotshot_task_impls::events::{HotShotEvent, HotShotEvent::*}; use hotshot_types::{ data::null_block, + event::Event, traits::{block_contents::BlockHeader, node_implementation::NodeType}, }; -use crate::predicates::{Predicate, PredicateResult}; +use crate::{ + predicates::{Predicate, PredicateResult}, + script::ExternalEventsExpectations, +}; type EventCallback = Arc>) -> bool + Send + Sync>; @@ -294,3 +298,47 @@ where }); Box::new(EventPredicate { check, info }) } + +// New predicate type for EventType +type ExternalEventCheckFn = dyn Fn(&Event) -> bool + Send + Sync; + +pub struct ExternalEventPredicate { + check: Arc>, + info: String, +} + +impl std::fmt::Debug for ExternalEventPredicate { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.info) + } +} + +#[async_trait] +impl Predicate>> + for ExternalEventPredicate +{ + async fn evaluate(&self, input: &Arc>) -> PredicateResult { + PredicateResult::from((self.check)(input)) + } + + async fn info(&self) -> String { + self.info.clone() + } +} + +pub fn ext_event_exact( + event: Event, +) -> Box>>> { + let info = format!("{:?}", event); + let check = Arc::new(move |e: &Event| { + e.event == event.event && e.view_number == event.view_number + }); + + Box::new(ExternalEventPredicate { check, info }) +} + +pub fn expect_external_events( + predicates: Vec>>>>, +) -> ExternalEventsExpectations { + ExternalEventsExpectations::from_outputs(predicates) +} diff --git a/crates/testing/src/script.rs b/crates/testing/src/script.rs index b25e286f9f..eb46c2d86e 100644 --- a/crates/testing/src/script.rs +++ b/crates/testing/src/script.rs @@ -7,7 +7,7 @@ use std::{sync::Arc, time::Duration}; use hotshot_task_impls::events::HotShotEvent; -use hotshot_types::traits::node_implementation::NodeType; +use hotshot_types::{event::Event, traits::node_implementation::NodeType}; use crate::predicates::{Predicate, PredicateResult}; @@ -66,6 +66,16 @@ impl Expectations { } } +pub struct ExternalEventsExpectations { + pub output_asserts: Vec>>>>, +} + +impl ExternalEventsExpectations { + pub fn from_outputs(output_asserts: Vec>>>>) -> Self { + Self { output_asserts } + } +} + pub fn panic_extra_output_in_script(stage_number: usize, script_name: String, output: &S) where S: std::fmt::Debug, diff --git a/crates/testing/tests/tests_cx_hardening.rs b/crates/testing/tests/tests_cx_hardening.rs new file mode 100644 index 0000000000..e521b7ce12 --- /dev/null +++ b/crates/testing/tests/tests_cx_hardening.rs @@ -0,0 +1,3 @@ +mod tests_cx_hardening { + automod::dir!("tests/tests_cx_hardening"); +} diff --git a/crates/testing/tests/tests_cx_hardening/cxh_da_task.rs b/crates/testing/tests/tests_cx_hardening/cxh_da_task.rs new file mode 100644 index 0000000000..2198b4bc73 --- /dev/null +++ b/crates/testing/tests/tests_cx_hardening/cxh_da_task.rs @@ -0,0 +1,564 @@ +use std::{sync::Arc, time::Duration}; + +use futures::StreamExt; +use hotshot::tasks::task_state::CreateTaskState; +use hotshot_example_types::{ + block_types::TestTransaction, + node_types::{MemoryImpl, TestTypes, TestVersions}, +}; +use hotshot_macros::{run_test, test_scripts}; +use hotshot_task_impls::{da::DaTaskState, events::HotShotEvent}; +use hotshot_testing::{ + helpers::{build_system_handle_from_launcher, check_external_events}, + predicates::event::{exact, expect_external_events, ext_event_exact}, + script::{Expectations, InputOrder, TaskScript}, + serial, + test_builder::TestDescription, + view_generator::TestViewGenerator, +}; +use hotshot_types::{ + data::ViewNumber, + event::{Event, EventType}, + simple_vote::DaData, + traits::{ + block_contents::precompute_vid_commitment, election::Membership, + node_implementation::ConsensusTime, + }, +}; + + +/// Test the DA Task for handling an outdated proposal. +/// +/// This test checks that when an outdated DA proposal is received, it doesn't produce +/// any output, while a current proposal triggers appropriate actions (validation and voting). +#[cfg_attr(async_executor_impl = "tokio", tokio::test(flavor = "multi_thread"))] +#[cfg_attr(async_executor_impl = "async-std", async_std::test)] +async fn test_da_task_outdated_proposal() { + // Setup logging and backtrace for debugging + async_compatibility_layer::logging::setup_logging(); + async_compatibility_layer::logging::setup_backtrace(); + + // Parameters for the test + let node_id: u64 = 2; + let num_nodes: usize = 10; + let da_committee_size: usize = 7; + + // Initialize test description with node and committee details + let test_description = TestDescription { + num_nodes_with_stake: num_nodes, + da_staked_committee_size: da_committee_size, + start_nodes: num_nodes, + ..TestDescription::default() + }; + + // Generate a launcher for the test system with a custom configuration + let launcher = test_description + .gen_launcher(node_id) + .modify_default_config(|config| { + config.next_view_timeout = 1000; + config.timeout_ratio = (12, 10); + config.da_staked_committee_size = da_committee_size; + }); + + // Build the system handle using the launcher configuration + let handle = + build_system_handle_from_launcher::(node_id, launcher, None) + .await + .expect("Failed to initialize HotShot"); + + // Prepare empty transactions and compute a commitment for later use + let transactions = vec![TestTransaction::new(vec![0])]; + let encoded_transactions = Arc::from(TestTransaction::encode(&transactions)); + let (payload_commit, _precompute) = precompute_vid_commitment( + &encoded_transactions, + handle.hotshot.memberships.quorum_membership.total_nodes(), + ); + + // Initialize a view generator using the current memberships + let mut view_generator = TestViewGenerator::generate( + handle.hotshot.memberships.quorum_membership.clone(), + handle.hotshot.memberships.da_membership.clone(), + ); + + // Generate views for the test + let view1 = view_generator.next().await.unwrap(); + let _view2 = view_generator.next().await.unwrap(); + view_generator.add_transactions(transactions); + let view3 = view_generator.next().await.unwrap(); + + // Define input events for the test: + // 1. Three view changes and an outdated proposal for view 1 when in view 3 + // 2. A current proposal for view 3 + let inputs = vec![ + serial![ + HotShotEvent::ViewChange(ViewNumber::new(1)), + HotShotEvent::ViewChange(ViewNumber::new(2)), + HotShotEvent::ViewChange(ViewNumber::new(3)), + // Send an outdated proposal (view 1) when we're in view 3 + HotShotEvent::DaProposalRecv(view1.da_proposal.clone(), view1.leader_public_key), + ], + serial![ + // Send a current proposal (view 3) + HotShotEvent::DaProposalRecv(view3.da_proposal.clone(), view3.leader_public_key), + ], + ]; + + // Define expectations: + // 1. No output for the outdated proposal + // 2. Validation and voting actions for the current proposal + let expectations = vec![ + Expectations::from_outputs(vec![]), + Expectations::from_outputs(vec![ + exact(HotShotEvent::DaProposalValidated( + view3.da_proposal.clone(), + view3.leader_public_key, + )), + exact(HotShotEvent::DaVoteSend(view3.create_da_vote( + DaData { + payload_commit, + }, + &handle, + ))), + ]), + ]; + + // Define expectations for external events triggered by the system + let external_event_expectations = vec![expect_external_events(vec![ext_event_exact(Event { + view_number: view3.view_number, + event: EventType::DaProposal { + proposal: view3.da_proposal.clone(), + sender: view3.leader_public_key, + }, + })])]; + + // Create DA task state and script for the test + let da_state = DaTaskState::::create_from(&handle).await; + let mut da_script = TaskScript { + timeout: Duration::from_millis(100), + state: da_state, + expectations, + }; + + // Run the test with the inputs and check the resulting events + let output_event_stream_recv = handle.event_stream(); + run_test![inputs, da_script].await; + + // Validate the external events against expectations + let result = check_external_events( + output_event_stream_recv, + &external_event_expectations, + da_script.timeout, + ) + .await; + assert!(result.is_ok(), "{}", result.err().unwrap()); +} + +/// Test the DA Task for handling duplicate votes. +/// +/// This test ensures that when duplicate votes are received in the DA Task, +/// they are correctly ignored without producing any output. The original +/// proposal and vote are processed as expected, validating the proposal +/// and sending the first vote, while the duplicate vote is disregarded. +#[cfg_attr(async_executor_impl = "tokio", tokio::test(flavor = "multi_thread"))] +#[cfg_attr(async_executor_impl = "async-std", async_std::test)] +async fn test_da_task_duplicate_votes() { + // Setup logging and backtrace for debugging + async_compatibility_layer::logging::setup_logging(); + async_compatibility_layer::logging::setup_backtrace(); + + // Parameters for the test + let node_id: u64 = 2; + let num_nodes: usize = 10; + let da_committee_size: usize = 7; + + // Initialize test description with node and committee details + let test_description = TestDescription { + num_nodes_with_stake: num_nodes, + da_staked_committee_size: da_committee_size, + start_nodes: num_nodes, + ..TestDescription::default() + }; + + // Generate a launcher for the test system with a custom configuration + let launcher = test_description + .gen_launcher(node_id) + .modify_default_config(|config| { + config.next_view_timeout = 1000; + config.timeout_ratio = (12, 10); + config.da_staked_committee_size = da_committee_size; + }); + + // Build the system handle using the launcher configuration + let handle = + build_system_handle_from_launcher::(node_id, launcher, None) + .await + .expect("Failed to initialize HotShot"); + + // Prepare empty transactions and compute a commitment for later use + let transactions = vec![TestTransaction::new(vec![0])]; + let encoded_transactions = Arc::from(TestTransaction::encode(&transactions)); + let (payload_commit, _precompute) = precompute_vid_commitment( + &encoded_transactions, + handle.hotshot.memberships.quorum_membership.total_nodes(), + ); + + // Initialize a view generator using the current memberships + let mut view_generator = TestViewGenerator::generate( + handle.hotshot.memberships.quorum_membership.clone(), + handle.hotshot.memberships.da_membership.clone(), + ); + + // Generate views for the test + let _view1 = view_generator.next().await.unwrap(); + view_generator.add_transactions(transactions); + let view2 = view_generator.next().await.unwrap(); + + // Create a duplicate vote + let duplicate_vote = view2.create_da_vote( + DaData { + payload_commit, + }, + &handle, + ); + + let inputs = vec![ + serial![ + HotShotEvent::ViewChange(ViewNumber::new(1)), + HotShotEvent::ViewChange(ViewNumber::new(2)), + HotShotEvent::DaProposalRecv(view2.da_proposal.clone(), view2.leader_public_key), + ], + serial![ + // Send the original vote + HotShotEvent::DaVoteRecv(duplicate_vote.clone()), + // Send the duplicate vote + HotShotEvent::DaVoteRecv(duplicate_vote.clone()), + ], + ]; + + // We expect the task to process the proposal and the first vote, but ignore the duplicate + let expectations = vec![ + Expectations::from_outputs(vec![ + exact(HotShotEvent::DaProposalValidated( + view2.da_proposal.clone(), + view2.leader_public_key, + )), + exact(HotShotEvent::DaVoteSend(duplicate_vote.clone())), + ]), + Expectations::from_outputs(vec![ + // No output expected for the duplicate vote + ]), + ]; + + // We expect to see an external event for the proposal, but not for individual votes + let external_event_expectations = vec![expect_external_events(vec![ext_event_exact(Event { + view_number: view2.view_number, + event: EventType::DaProposal { + proposal: view2.da_proposal.clone(), + sender: view2.leader_public_key, + }, + })])]; + + // Create DA task state and script for the test + let da_state = DaTaskState::::create_from(&handle).await; + let mut da_script = TaskScript { + timeout: Duration::from_millis(100), + state: da_state, + expectations, + }; + + // Run the test with the inputs and check the resulting events + let output_event_stream_recv = handle.event_stream(); + run_test![inputs, da_script].await; + + // Validate the external events against expectations + let result = check_external_events( + output_event_stream_recv, + &external_event_expectations, + da_script.timeout, + ) + .await; + assert!(result.is_ok(), "{}", result.err().unwrap()); +} + +/// Tests the DA Task for collecting and processing valid votes. +/// +/// This test verifies that the DA Task correctly handles the proposal and votes +/// from DA committee members. It does the following: +/// +/// 1. Initializes a test environment with multiple nodes, configuring one as the leader. +/// 2. Creates and sends a DA proposal followed by valid votes from the committee. +/// 3. Asserts that the proposal is validated and a vote is sent in response by the leader. +/// +/// The test ensures that only the intended vote is processed. +#[cfg_attr(async_executor_impl = "tokio", tokio::test(flavor = "multi_thread"))] +#[cfg_attr(async_executor_impl = "async-std", async_std::test)] +async fn test_da_task_vote_collection() { + // Initialize logging and backtrace for debugging + async_compatibility_layer::logging::setup_logging(); + async_compatibility_layer::logging::setup_backtrace(); + + // Setup: Create system handles for multiple nodes (at least 5 for DA committee) + let leader_node_id: usize = 4; + let num_nodes: usize = 6; + let da_committee_size: usize = 5; + + let test_description = TestDescription { + num_nodes_with_stake: num_nodes, + da_staked_committee_size: da_committee_size, + start_nodes: num_nodes, + ..TestDescription::default() + }; + + // Create handles and view generators for all nodes + let mut handles = Vec::new(); + let mut view_generators = Vec::new(); + + for node_id in 0..num_nodes as u64 { + let launcher = test_description + .clone() + .gen_launcher(node_id) + .modify_default_config(|config| { + config.next_view_timeout = 1000; + config.timeout_ratio = (12, 10); + config.da_staked_committee_size = da_committee_size; + }); + + let handle = + build_system_handle_from_launcher::(node_id, launcher, None) + .await + .expect("Failed to initialize HotShot"); + + // Create the scenario with necessary views + let view_generator = TestViewGenerator::generate( + handle.hotshot.memberships.quorum_membership.clone(), + handle.hotshot.memberships.da_membership.clone(), + ); + + handles.push(handle); + view_generators.push(view_generator); + } + + // Prepare empty transactions and compute a commitment for later use + let transactions = vec![TestTransaction::new(vec![0])]; + let encoded_transactions = Arc::from(TestTransaction::encode(&transactions)); + let (payload_commit, _precompute) = precompute_vid_commitment( + &encoded_transactions, + handles[leader_node_id] + .hotshot + .memberships + .quorum_membership + .total_nodes(), + ); + + let _view1 = view_generators[leader_node_id].next().await.unwrap(); + let _view2 = view_generators[leader_node_id].next().await.unwrap(); + let _view3 = view_generators[leader_node_id].next().await.unwrap(); + view_generators[leader_node_id].add_transactions(transactions); + let view4 = view_generators[leader_node_id].next().await.unwrap(); + + // Create votes for all nodes + let votes: Vec<_> = handles + .iter() + .map(|handle| view4.create_da_vote(DaData { payload_commit }, handle)) + .collect(); + + // Simulate sending valid votes + let inputs = vec![ + serial![ + HotShotEvent::ViewChange(ViewNumber::new(1)), + HotShotEvent::ViewChange(ViewNumber::new(2)), + HotShotEvent::ViewChange(ViewNumber::new(3)), + HotShotEvent::ViewChange(ViewNumber::new(4)), + HotShotEvent::DaProposalRecv(view4.da_proposal.clone(), view4.leader_public_key), + ], + serial![ + // Send votes from the DA committee members + HotShotEvent::DaVoteRecv(votes[0].clone()), + HotShotEvent::DaVoteRecv(votes[1].clone()), + HotShotEvent::DaVoteRecv(votes[2].clone()), + HotShotEvent::DaVoteRecv(votes[3].clone()), + ], + ]; + + // Assert the outcome + let expectations = vec![ + Expectations::from_outputs(vec![ + exact(HotShotEvent::DaProposalValidated( + view4.da_proposal.clone(), + view4.leader_public_key, + )), + exact(HotShotEvent::DaVoteSend(votes[leader_node_id].clone())), + ]), + Expectations::from_outputs(vec![exact(HotShotEvent::DacSend( + view4.da_certificate, + view4.leader_public_key, + ))]), + ]; + + // Create DA task state and script + let da_state = + DaTaskState::::create_from(&handles[leader_node_id]).await; + let mut da_script = TaskScript { + timeout: Duration::from_millis(100), + state: da_state, + expectations, + }; + + // Run the test + let output_event_stream_recv = handles[leader_node_id].event_stream(); + run_test![inputs, da_script].await; + + // Check for DacSend event in the output stream + let result = check_external_events( + output_event_stream_recv, + &[expect_external_events(vec![ext_event_exact(Event { + view_number: view4.view_number, + event: EventType::DaProposal { + proposal: view4.da_proposal.clone(), + sender: view4.leader_public_key, + }, + })])], + da_script.timeout, + ) + .await; + assert!(result.is_ok(), "{}", result.err().unwrap()); +} + +/// Test that non-leader nodes correctly handle DA (Data Availability) votes and +/// ignore the certificate event when the node is not the leader during the voting process. +/// +/// This test sets up a scenario where a specific node (not the leader) processes +/// multiple views and receives votes from DA committee members. The test checks +/// that the expected outcome occurs, verifying that a `DacSend` event is not +/// generated since the node is not the leader. +#[cfg_attr(async_executor_impl = "tokio", tokio::test(flavor = "multi_thread"))] +#[cfg_attr(async_executor_impl = "async-std", async_std::test)] +async fn test_da_task_non_leader_vote_collection_ignore() { + // Initialize logging and backtrace for debugging + async_compatibility_layer::logging::setup_logging(); + async_compatibility_layer::logging::setup_backtrace(); + + // Setup: Create system handles for multiple nodes (at least 5 for DA committee) + let leader_node_id: usize = 3; + let num_nodes: usize = 6; + let da_committee_size: usize = 5; + + let test_description = TestDescription { + num_nodes_with_stake: num_nodes, + da_staked_committee_size: da_committee_size, + start_nodes: num_nodes, + ..TestDescription::default() + }; + + // Create handles and view generators for all nodes + let mut handles = Vec::new(); + let mut view_generators = Vec::new(); + + for node_id in 0..num_nodes as u64 { + let launcher = test_description + .clone() + .gen_launcher(node_id) + .modify_default_config(|config| { + config.next_view_timeout = 1000; + config.timeout_ratio = (12, 10); + config.da_staked_committee_size = da_committee_size; + }); + + let handle = + build_system_handle_from_launcher::(node_id, launcher, None) + .await + .expect("Failed to initialize HotShot"); + + // Create the scenario with necessary views + let view_generator = TestViewGenerator::generate( + handle.hotshot.memberships.quorum_membership.clone(), + handle.hotshot.memberships.da_membership.clone(), + ); + + handles.push(handle); + view_generators.push(view_generator); + } + + // Prepare empty transactions and compute a commitment for later use + let transactions = vec![TestTransaction::new(vec![0])]; + let encoded_transactions = Arc::from(TestTransaction::encode(&transactions)); + let (payload_commit, _precompute) = precompute_vid_commitment( + &encoded_transactions, + handles[leader_node_id] + .hotshot + .memberships + .quorum_membership + .total_nodes(), + ); + + // Generate multiple views and add transactions for the leader node + let _view1 = view_generators[leader_node_id].next().await.unwrap(); + let _view2 = view_generators[leader_node_id].next().await.unwrap(); + let _view3 = view_generators[leader_node_id].next().await.unwrap(); + view_generators[leader_node_id].add_transactions(transactions); + let view4 = view_generators[leader_node_id].next().await.unwrap(); + + // Create votes for all nodes + let votes: Vec<_> = handles + .iter() + .map(|handle| view4.create_da_vote(DaData { payload_commit }, handle)) + .collect(); + + // Simulate sending valid votes and a proposal to the DA committee + let inputs = vec![ + serial![ + HotShotEvent::ViewChange(ViewNumber::new(1)), + HotShotEvent::ViewChange(ViewNumber::new(2)), + HotShotEvent::ViewChange(ViewNumber::new(3)), + HotShotEvent::ViewChange(ViewNumber::new(4)), + HotShotEvent::DaProposalRecv(view4.da_proposal.clone(), view4.leader_public_key), + ], + serial![ + // Send votes from the DA committee members + HotShotEvent::DaVoteRecv(votes[0].clone()), + HotShotEvent::DaVoteRecv(votes[1].clone()), + HotShotEvent::DaVoteRecv(votes[2].clone()), + HotShotEvent::DaVoteRecv(votes[3].clone()), + ], + ]; + + // Define the expected outcome for the non-leader node + let expectations = vec![ + Expectations::from_outputs(vec![ + exact(HotShotEvent::DaProposalValidated( + view4.da_proposal.clone(), + view4.leader_public_key, + )), + exact(HotShotEvent::DaVoteSend(votes[leader_node_id].clone())), + ]), + Expectations::from_outputs(vec![]), + ]; + + // Create DA task state and script for the test + let da_state = + DaTaskState::::create_from(&handles[leader_node_id]).await; + let mut da_script = TaskScript { + timeout: Duration::from_millis(100), + state: da_state, + expectations, + }; + + // Run the test with the prepared inputs and script + let output_event_stream_recv = handles[leader_node_id].event_stream(); + run_test![inputs, da_script].await; + + // Verify that the non-leader node did not generate the DacSend event + let result = check_external_events( + output_event_stream_recv, + &[expect_external_events(vec![ext_event_exact(Event { + view_number: view4.view_number, + event: EventType::DaProposal { + proposal: view4.da_proposal.clone(), + sender: view4.leader_public_key, + }, + })])], + da_script.timeout, + ) + .await; + assert!(result.is_ok(), "{}", result.err().unwrap()); +} diff --git a/crates/types/src/event.rs b/crates/types/src/event.rs index 88c429d5f0..2e23cfaebb 100644 --- a/crates/types/src/event.rs +++ b/crates/types/src/event.rs @@ -173,6 +173,70 @@ pub enum EventType { /// A message destined for external listeners was received ExternalMessageReceived(Vec), } + +// Implement Eq as well, since EventType should have reflexive equality +impl Eq for EventType {} + +impl PartialEq for EventType { + fn eq(&self, other: &Self) -> bool { + match (self, other) { + ( + EventType::DaProposal { + proposal: p1, + sender: s1, + }, + EventType::DaProposal { + proposal: p2, + sender: s2, + }, + ) => p1 == p2 && s1 == s2, + ( + EventType::ReplicaViewTimeout { view_number: v1 }, + EventType::ReplicaViewTimeout { view_number: v2 }, + ) + | ( + EventType::ViewFinished { view_number: v1 }, + EventType::ViewFinished { view_number: v2 }, + ) + | ( + EventType::ViewTimeout { view_number: v1 }, + EventType::ViewTimeout { view_number: v2 }, + ) => v1 == v2, + ( + EventType::Transactions { transactions: t1 }, + EventType::Transactions { transactions: t2 }, + ) => t1 == t2, + ( + EventType::QuorumProposal { + proposal: p1, + sender: s1, + }, + EventType::QuorumProposal { + proposal: p2, + sender: s2, + }, + ) => p1 == p2 && s1 == s2, + ( + EventType::UpgradeProposal { + proposal: p1, + sender: s1, + }, + EventType::UpgradeProposal { + proposal: p2, + sender: s2, + }, + ) => p1 == p2 && s1 == s2, + (EventType::ExternalMessageReceived(m1), EventType::ExternalMessageReceived(m2)) => { + m1 == m2 + } + (event_v1, _event_v2) => unreachable!( + "TODO: PartialEq not yet implemented for this EventType variant {:#?}", + event_v1 + ), + } + } +} + #[derive(Debug, Serialize, Deserialize, Clone)] /// A list of actions that we track for nodes pub enum HotShotAction { From 3cb4bc0e23d0c9adc214a3fe91ce4dbf1910174c Mon Sep 17 00:00:00 2001 From: shamb0 Date: Wed, 14 Aug 2024 09:29:48 +0530 Subject: [PATCH 2/3] Refactor: Address review comments Signed-off-by: shamb0 --- crates/testing/tests/tests_1/da_task.rs | 548 ++++++++++++++++- crates/testing/tests/tests_cx_hardening.rs | 3 - .../tests/tests_cx_hardening/cxh_da_task.rs | 564 ------------------ 3 files changed, 545 insertions(+), 570 deletions(-) delete mode 100644 crates/testing/tests/tests_cx_hardening.rs delete mode 100644 crates/testing/tests/tests_cx_hardening/cxh_da_task.rs diff --git a/crates/testing/tests/tests_1/da_task.rs b/crates/testing/tests/tests_1/da_task.rs index 34dfc406b6..f2ef882f8f 100644 --- a/crates/testing/tests/tests_1/da_task.rs +++ b/crates/testing/tests/tests_1/da_task.rs @@ -13,17 +13,22 @@ use hotshot_example_types::{ node_types::{MemoryImpl, TestTypes, TestVersions}, }; use hotshot_macros::{run_test, test_scripts}; -use hotshot_task_impls::{da::DaTaskState, events::HotShotEvent::*}; +use hotshot_task_impls::{ + da::DaTaskState, events::HotShotEvent::*, + events::HotShotEvent, +}; use hotshot_testing::{ - helpers::build_system_handle, - predicates::event::exact, + helpers::{ build_system_handle, build_system_handle_from_launcher, check_external_events }, + predicates::event::{ exact, expect_external_events, ext_event_exact }, script::{Expectations, InputOrder, TaskScript}, serial, view_generator::TestViewGenerator, + test_builder::TestDescription, }; use hotshot_types::{ data::{null_block, PackedBundle, ViewNumber}, simple_vote::DaData, + event::{Event, EventType}, traits::{ block_contents::precompute_vid_commitment, election::Membership, @@ -209,3 +214,540 @@ async fn test_da_task_storage_failure() { run_test![inputs, da_script].await; } + + +/// Test the DA Task for handling an outdated proposal. +/// +/// This test checks that when an outdated DA proposal is received, it doesn't produce +/// any output, while a current proposal triggers appropriate actions (validation and voting). +#[cfg_attr(async_executor_impl = "tokio", tokio::test(flavor = "multi_thread"))] +#[cfg_attr(async_executor_impl = "async-std", async_std::test)] +async fn test_da_task_outdated_proposal() { + // Setup logging and backtrace for debugging + async_compatibility_layer::logging::setup_logging(); + async_compatibility_layer::logging::setup_backtrace(); + + // Parameters for the test + let node_id: u64 = 2; + let num_nodes: usize = 10; + let da_committee_size: usize = 7; + + // Initialize test description with node and committee details + let test_description = TestDescription { + num_nodes_with_stake: num_nodes, + da_staked_committee_size: da_committee_size, + start_nodes: num_nodes, + ..TestDescription::default() + }; + + // Generate a launcher for the test system with a custom configuration + let launcher = test_description + .gen_launcher(node_id) + .modify_default_config(|config| { + config.next_view_timeout = 1000; + config.timeout_ratio = (12, 10); + config.da_staked_committee_size = da_committee_size; + }); + + // Build the system handle using the launcher configuration + let handle = + build_system_handle_from_launcher::(node_id, launcher, None) + .await + .expect("Failed to initialize HotShot"); + + // Prepare empty transactions and compute a commitment for later use + let transactions = vec![TestTransaction::new(vec![0])]; + let encoded_transactions = Arc::from(TestTransaction::encode(&transactions)); + let (payload_commit, _precompute) = precompute_vid_commitment( + &encoded_transactions, + handle.hotshot.memberships.quorum_membership.total_nodes(), + ); + + // Initialize a view generator using the current memberships + let mut view_generator = TestViewGenerator::generate( + handle.hotshot.memberships.quorum_membership.clone(), + handle.hotshot.memberships.da_membership.clone(), + ); + + // Generate views for the test + let view1 = view_generator.next().await.unwrap(); + let _view2 = view_generator.next().await.unwrap(); + view_generator.add_transactions(transactions); + let view3 = view_generator.next().await.unwrap(); + + // Define input events for the test: + // 1. Three view changes and an outdated proposal for view 1 when in view 3 + // 2. A current proposal for view 3 + let inputs = vec![ + serial![ + HotShotEvent::ViewChange(ViewNumber::new(1)), + HotShotEvent::ViewChange(ViewNumber::new(2)), + HotShotEvent::ViewChange(ViewNumber::new(3)), + // Send an outdated proposal (view 1) when we're in view 3 + HotShotEvent::DaProposalRecv(view1.da_proposal.clone(), view1.leader_public_key), + ], + serial![ + // Send a current proposal (view 3) + HotShotEvent::DaProposalRecv(view3.da_proposal.clone(), view3.leader_public_key), + ], + ]; + + // Define expectations: + // 1. No output for the outdated proposal + // 2. Validation and voting actions for the current proposal + let expectations = vec![ + Expectations::from_outputs(vec![]), + Expectations::from_outputs(vec![ + exact(HotShotEvent::DaProposalValidated( + view3.da_proposal.clone(), + view3.leader_public_key, + )), + exact(HotShotEvent::DaVoteSend(view3.create_da_vote( + DaData { + payload_commit, + }, + &handle, + ))), + ]), + ]; + + // Define expectations for external events triggered by the system + let external_event_expectations = vec![expect_external_events(vec![ext_event_exact(Event { + view_number: view3.view_number, + event: EventType::DaProposal { + proposal: view3.da_proposal.clone(), + sender: view3.leader_public_key, + }, + })])]; + + // Create DA task state and script for the test + let da_state = DaTaskState::::create_from(&handle).await; + let mut da_script = TaskScript { + timeout: Duration::from_millis(100), + state: da_state, + expectations, + }; + + // Run the test with the inputs and check the resulting events + let output_event_stream_recv = handle.event_stream(); + run_test![inputs, da_script].await; + + // Validate the external events against expectations + let result = check_external_events( + output_event_stream_recv, + &external_event_expectations, + da_script.timeout, + ) + .await; + assert!(result.is_ok(), "{}", result.err().unwrap()); +} + +/// Test the DA Task for handling duplicate votes. +/// +/// This test ensures that when duplicate votes are received in the DA Task, +/// they are correctly ignored without producing any output. The original +/// proposal and vote are processed as expected, validating the proposal +/// and sending the first vote, while the duplicate vote is disregarded. +#[cfg_attr(async_executor_impl = "tokio", tokio::test(flavor = "multi_thread"))] +#[cfg_attr(async_executor_impl = "async-std", async_std::test)] +async fn test_da_task_duplicate_votes() { + // Setup logging and backtrace for debugging + async_compatibility_layer::logging::setup_logging(); + async_compatibility_layer::logging::setup_backtrace(); + + // Parameters for the test + let node_id: u64 = 2; + let num_nodes: usize = 10; + let da_committee_size: usize = 7; + + // Initialize test description with node and committee details + let test_description = TestDescription { + num_nodes_with_stake: num_nodes, + da_staked_committee_size: da_committee_size, + start_nodes: num_nodes, + ..TestDescription::default() + }; + + // Generate a launcher for the test system with a custom configuration + let launcher = test_description + .gen_launcher(node_id) + .modify_default_config(|config| { + config.next_view_timeout = 1000; + config.timeout_ratio = (12, 10); + config.da_staked_committee_size = da_committee_size; + }); + + // Build the system handle using the launcher configuration + let handle = + build_system_handle_from_launcher::(node_id, launcher, None) + .await + .expect("Failed to initialize HotShot"); + + // Prepare empty transactions and compute a commitment for later use + let transactions = vec![TestTransaction::new(vec![0])]; + let encoded_transactions = Arc::from(TestTransaction::encode(&transactions)); + let (payload_commit, _precompute) = precompute_vid_commitment( + &encoded_transactions, + handle.hotshot.memberships.quorum_membership.total_nodes(), + ); + + // Initialize a view generator using the current memberships + let mut view_generator = TestViewGenerator::generate( + handle.hotshot.memberships.quorum_membership.clone(), + handle.hotshot.memberships.da_membership.clone(), + ); + + // Generate views for the test + let _view1 = view_generator.next().await.unwrap(); + view_generator.add_transactions(transactions); + let view2 = view_generator.next().await.unwrap(); + + // Create a duplicate vote + let duplicate_vote = view2.create_da_vote( + DaData { + payload_commit, + }, + &handle, + ); + + let inputs = vec![ + serial![ + HotShotEvent::ViewChange(ViewNumber::new(1)), + HotShotEvent::ViewChange(ViewNumber::new(2)), + HotShotEvent::DaProposalRecv(view2.da_proposal.clone(), view2.leader_public_key), + ], + serial![ + // Send the original vote + HotShotEvent::DaVoteRecv(duplicate_vote.clone()), + // Send the duplicate vote + HotShotEvent::DaVoteRecv(duplicate_vote.clone()), + ], + ]; + + // We expect the task to process the proposal and the first vote, but ignore the duplicate + let expectations = vec![ + Expectations::from_outputs(vec![ + exact(HotShotEvent::DaProposalValidated( + view2.da_proposal.clone(), + view2.leader_public_key, + )), + exact(HotShotEvent::DaVoteSend(duplicate_vote.clone())), + ]), + Expectations::from_outputs(vec![ + // No output expected for the duplicate vote + ]), + ]; + + // We expect to see an external event for the proposal, but not for individual votes + let external_event_expectations = vec![expect_external_events(vec![ext_event_exact(Event { + view_number: view2.view_number, + event: EventType::DaProposal { + proposal: view2.da_proposal.clone(), + sender: view2.leader_public_key, + }, + })])]; + + // Create DA task state and script for the test + let da_state = DaTaskState::::create_from(&handle).await; + let mut da_script = TaskScript { + timeout: Duration::from_millis(100), + state: da_state, + expectations, + }; + + // Run the test with the inputs and check the resulting events + let output_event_stream_recv = handle.event_stream(); + run_test![inputs, da_script].await; + + // Validate the external events against expectations + let result = check_external_events( + output_event_stream_recv, + &external_event_expectations, + da_script.timeout, + ) + .await; + assert!(result.is_ok(), "{}", result.err().unwrap()); +} + +/// Tests the DA Task for collecting and processing valid votes. +/// +/// This test verifies that the DA Task correctly handles the proposal and votes +/// from DA committee members. It does the following: +/// +/// 1. Initializes a test environment with multiple nodes, configuring one as the leader. +/// 2. Creates and sends a DA proposal followed by valid votes from the committee. +/// 3. Asserts that the proposal is validated and a vote is sent in response by the leader. +/// +/// The test ensures that only the intended vote is processed. +#[cfg_attr(async_executor_impl = "tokio", tokio::test(flavor = "multi_thread"))] +#[cfg_attr(async_executor_impl = "async-std", async_std::test)] +async fn test_da_task_vote_collection() { + // Initialize logging and backtrace for debugging + async_compatibility_layer::logging::setup_logging(); + async_compatibility_layer::logging::setup_backtrace(); + + // Setup: Create system handles for multiple nodes (at least 5 for DA committee) + let leader_node_id: usize = 4; + let num_nodes: usize = 6; + let da_committee_size: usize = 5; + + let test_description = TestDescription { + num_nodes_with_stake: num_nodes, + da_staked_committee_size: da_committee_size, + start_nodes: num_nodes, + ..TestDescription::default() + }; + + // Create handles and view generators for all nodes + let mut handles = Vec::new(); + let mut view_generators = Vec::new(); + + for node_id in 0..num_nodes as u64 { + let launcher = test_description + .clone() + .gen_launcher(node_id) + .modify_default_config(|config| { + config.next_view_timeout = 1000; + config.timeout_ratio = (12, 10); + config.da_staked_committee_size = da_committee_size; + }); + + let handle = + build_system_handle_from_launcher::(node_id, launcher, None) + .await + .expect("Failed to initialize HotShot"); + + // Create the scenario with necessary views + let view_generator = TestViewGenerator::generate( + handle.hotshot.memberships.quorum_membership.clone(), + handle.hotshot.memberships.da_membership.clone(), + ); + + handles.push(handle); + view_generators.push(view_generator); + } + + // Prepare empty transactions and compute a commitment for later use + let transactions = vec![TestTransaction::new(vec![0])]; + let encoded_transactions = Arc::from(TestTransaction::encode(&transactions)); + let (payload_commit, _precompute) = precompute_vid_commitment( + &encoded_transactions, + handles[leader_node_id] + .hotshot + .memberships + .quorum_membership + .total_nodes(), + ); + + let _view1 = view_generators[leader_node_id].next().await.unwrap(); + let _view2 = view_generators[leader_node_id].next().await.unwrap(); + let _view3 = view_generators[leader_node_id].next().await.unwrap(); + view_generators[leader_node_id].add_transactions(transactions); + let view4 = view_generators[leader_node_id].next().await.unwrap(); + + // Create votes for all nodes + let votes: Vec<_> = handles + .iter() + .map(|handle| view4.create_da_vote(DaData { payload_commit }, handle)) + .collect(); + + // Simulate sending valid votes + let inputs = vec![ + serial![ + HotShotEvent::ViewChange(ViewNumber::new(1)), + HotShotEvent::ViewChange(ViewNumber::new(2)), + HotShotEvent::ViewChange(ViewNumber::new(3)), + HotShotEvent::ViewChange(ViewNumber::new(4)), + HotShotEvent::DaProposalRecv(view4.da_proposal.clone(), view4.leader_public_key), + ], + serial![ + // Send votes from the DA committee members + HotShotEvent::DaVoteRecv(votes[0].clone()), + HotShotEvent::DaVoteRecv(votes[1].clone()), + HotShotEvent::DaVoteRecv(votes[2].clone()), + HotShotEvent::DaVoteRecv(votes[3].clone()), + ], + ]; + + // Assert the outcome + let expectations = vec![ + Expectations::from_outputs(vec![ + exact(HotShotEvent::DaProposalValidated( + view4.da_proposal.clone(), + view4.leader_public_key, + )), + exact(HotShotEvent::DaVoteSend(votes[leader_node_id].clone())), + ]), + Expectations::from_outputs(vec![exact(HotShotEvent::DacSend( + view4.da_certificate, + view4.leader_public_key, + ))]), + ]; + + // Create DA task state and script + let da_state = + DaTaskState::::create_from(&handles[leader_node_id]).await; + let mut da_script = TaskScript { + timeout: Duration::from_millis(100), + state: da_state, + expectations, + }; + + // Run the test + let output_event_stream_recv = handles[leader_node_id].event_stream(); + run_test![inputs, da_script].await; + + // Check for DacSend event in the output stream + let result = check_external_events( + output_event_stream_recv, + &[expect_external_events(vec![ext_event_exact(Event { + view_number: view4.view_number, + event: EventType::DaProposal { + proposal: view4.da_proposal.clone(), + sender: view4.leader_public_key, + }, + })])], + da_script.timeout, + ) + .await; + assert!(result.is_ok(), "{}", result.err().unwrap()); +} + +/// Test that non-leader nodes correctly handle DA (Data Availability) votes and +/// ignore the certificate event when the node is not the leader during the voting process. +/// +/// This test sets up a scenario where a specific node (not the leader) processes +/// multiple views and receives votes from DA committee members. The test checks +/// that the expected outcome occurs, verifying that a `DacSend` event is not +/// generated since the node is not the leader. +#[cfg_attr(async_executor_impl = "tokio", tokio::test(flavor = "multi_thread"))] +#[cfg_attr(async_executor_impl = "async-std", async_std::test)] +async fn test_da_task_non_leader_vote_collection_ignore() { + // Initialize logging and backtrace for debugging + async_compatibility_layer::logging::setup_logging(); + async_compatibility_layer::logging::setup_backtrace(); + + // Setup: Create system handles for multiple nodes (at least 5 for DA committee) + let leader_node_id: usize = 3; + let num_nodes: usize = 6; + let da_committee_size: usize = 5; + + let test_description = TestDescription { + num_nodes_with_stake: num_nodes, + da_staked_committee_size: da_committee_size, + start_nodes: num_nodes, + ..TestDescription::default() + }; + + // Create handles and view generators for all nodes + let mut handles = Vec::new(); + let mut view_generators = Vec::new(); + + for node_id in 0..num_nodes as u64 { + let launcher = test_description + .clone() + .gen_launcher(node_id) + .modify_default_config(|config| { + config.next_view_timeout = 1000; + config.timeout_ratio = (12, 10); + config.da_staked_committee_size = da_committee_size; + }); + + let handle = + build_system_handle_from_launcher::(node_id, launcher, None) + .await + .expect("Failed to initialize HotShot"); + + // Create the scenario with necessary views + let view_generator = TestViewGenerator::generate( + handle.hotshot.memberships.quorum_membership.clone(), + handle.hotshot.memberships.da_membership.clone(), + ); + + handles.push(handle); + view_generators.push(view_generator); + } + + // Prepare empty transactions and compute a commitment for later use + let transactions = vec![TestTransaction::new(vec![0])]; + let encoded_transactions = Arc::from(TestTransaction::encode(&transactions)); + let (payload_commit, _precompute) = precompute_vid_commitment( + &encoded_transactions, + handles[leader_node_id] + .hotshot + .memberships + .quorum_membership + .total_nodes(), + ); + + // Generate multiple views and add transactions for the leader node + let _view1 = view_generators[leader_node_id].next().await.unwrap(); + let _view2 = view_generators[leader_node_id].next().await.unwrap(); + let _view3 = view_generators[leader_node_id].next().await.unwrap(); + view_generators[leader_node_id].add_transactions(transactions); + let view4 = view_generators[leader_node_id].next().await.unwrap(); + + // Create votes for all nodes + let votes: Vec<_> = handles + .iter() + .map(|handle| view4.create_da_vote(DaData { payload_commit }, handle)) + .collect(); + + // Simulate sending valid votes and a proposal to the DA committee + let inputs = vec![ + serial![ + HotShotEvent::ViewChange(ViewNumber::new(1)), + HotShotEvent::ViewChange(ViewNumber::new(2)), + HotShotEvent::ViewChange(ViewNumber::new(3)), + HotShotEvent::ViewChange(ViewNumber::new(4)), + HotShotEvent::DaProposalRecv(view4.da_proposal.clone(), view4.leader_public_key), + ], + serial![ + // Send votes from the DA committee members + HotShotEvent::DaVoteRecv(votes[0].clone()), + HotShotEvent::DaVoteRecv(votes[1].clone()), + HotShotEvent::DaVoteRecv(votes[2].clone()), + HotShotEvent::DaVoteRecv(votes[3].clone()), + ], + ]; + + // Define the expected outcome for the non-leader node + let expectations = vec![ + Expectations::from_outputs(vec![ + exact(HotShotEvent::DaProposalValidated( + view4.da_proposal.clone(), + view4.leader_public_key, + )), + exact(HotShotEvent::DaVoteSend(votes[leader_node_id].clone())), + ]), + Expectations::from_outputs(vec![]), + ]; + + // Create DA task state and script for the test + let da_state = + DaTaskState::::create_from(&handles[leader_node_id]).await; + let mut da_script = TaskScript { + timeout: Duration::from_millis(100), + state: da_state, + expectations, + }; + + // Run the test with the prepared inputs and script + let output_event_stream_recv = handles[leader_node_id].event_stream(); + run_test![inputs, da_script].await; + + // Verify that the non-leader node did not generate the DacSend event + let result = check_external_events( + output_event_stream_recv, + &[expect_external_events(vec![ext_event_exact(Event { + view_number: view4.view_number, + event: EventType::DaProposal { + proposal: view4.da_proposal.clone(), + sender: view4.leader_public_key, + }, + })])], + da_script.timeout, + ) + .await; + assert!(result.is_ok(), "{}", result.err().unwrap()); +} diff --git a/crates/testing/tests/tests_cx_hardening.rs b/crates/testing/tests/tests_cx_hardening.rs deleted file mode 100644 index e521b7ce12..0000000000 --- a/crates/testing/tests/tests_cx_hardening.rs +++ /dev/null @@ -1,3 +0,0 @@ -mod tests_cx_hardening { - automod::dir!("tests/tests_cx_hardening"); -} diff --git a/crates/testing/tests/tests_cx_hardening/cxh_da_task.rs b/crates/testing/tests/tests_cx_hardening/cxh_da_task.rs deleted file mode 100644 index 2198b4bc73..0000000000 --- a/crates/testing/tests/tests_cx_hardening/cxh_da_task.rs +++ /dev/null @@ -1,564 +0,0 @@ -use std::{sync::Arc, time::Duration}; - -use futures::StreamExt; -use hotshot::tasks::task_state::CreateTaskState; -use hotshot_example_types::{ - block_types::TestTransaction, - node_types::{MemoryImpl, TestTypes, TestVersions}, -}; -use hotshot_macros::{run_test, test_scripts}; -use hotshot_task_impls::{da::DaTaskState, events::HotShotEvent}; -use hotshot_testing::{ - helpers::{build_system_handle_from_launcher, check_external_events}, - predicates::event::{exact, expect_external_events, ext_event_exact}, - script::{Expectations, InputOrder, TaskScript}, - serial, - test_builder::TestDescription, - view_generator::TestViewGenerator, -}; -use hotshot_types::{ - data::ViewNumber, - event::{Event, EventType}, - simple_vote::DaData, - traits::{ - block_contents::precompute_vid_commitment, election::Membership, - node_implementation::ConsensusTime, - }, -}; - - -/// Test the DA Task for handling an outdated proposal. -/// -/// This test checks that when an outdated DA proposal is received, it doesn't produce -/// any output, while a current proposal triggers appropriate actions (validation and voting). -#[cfg_attr(async_executor_impl = "tokio", tokio::test(flavor = "multi_thread"))] -#[cfg_attr(async_executor_impl = "async-std", async_std::test)] -async fn test_da_task_outdated_proposal() { - // Setup logging and backtrace for debugging - async_compatibility_layer::logging::setup_logging(); - async_compatibility_layer::logging::setup_backtrace(); - - // Parameters for the test - let node_id: u64 = 2; - let num_nodes: usize = 10; - let da_committee_size: usize = 7; - - // Initialize test description with node and committee details - let test_description = TestDescription { - num_nodes_with_stake: num_nodes, - da_staked_committee_size: da_committee_size, - start_nodes: num_nodes, - ..TestDescription::default() - }; - - // Generate a launcher for the test system with a custom configuration - let launcher = test_description - .gen_launcher(node_id) - .modify_default_config(|config| { - config.next_view_timeout = 1000; - config.timeout_ratio = (12, 10); - config.da_staked_committee_size = da_committee_size; - }); - - // Build the system handle using the launcher configuration - let handle = - build_system_handle_from_launcher::(node_id, launcher, None) - .await - .expect("Failed to initialize HotShot"); - - // Prepare empty transactions and compute a commitment for later use - let transactions = vec![TestTransaction::new(vec![0])]; - let encoded_transactions = Arc::from(TestTransaction::encode(&transactions)); - let (payload_commit, _precompute) = precompute_vid_commitment( - &encoded_transactions, - handle.hotshot.memberships.quorum_membership.total_nodes(), - ); - - // Initialize a view generator using the current memberships - let mut view_generator = TestViewGenerator::generate( - handle.hotshot.memberships.quorum_membership.clone(), - handle.hotshot.memberships.da_membership.clone(), - ); - - // Generate views for the test - let view1 = view_generator.next().await.unwrap(); - let _view2 = view_generator.next().await.unwrap(); - view_generator.add_transactions(transactions); - let view3 = view_generator.next().await.unwrap(); - - // Define input events for the test: - // 1. Three view changes and an outdated proposal for view 1 when in view 3 - // 2. A current proposal for view 3 - let inputs = vec![ - serial![ - HotShotEvent::ViewChange(ViewNumber::new(1)), - HotShotEvent::ViewChange(ViewNumber::new(2)), - HotShotEvent::ViewChange(ViewNumber::new(3)), - // Send an outdated proposal (view 1) when we're in view 3 - HotShotEvent::DaProposalRecv(view1.da_proposal.clone(), view1.leader_public_key), - ], - serial![ - // Send a current proposal (view 3) - HotShotEvent::DaProposalRecv(view3.da_proposal.clone(), view3.leader_public_key), - ], - ]; - - // Define expectations: - // 1. No output for the outdated proposal - // 2. Validation and voting actions for the current proposal - let expectations = vec![ - Expectations::from_outputs(vec![]), - Expectations::from_outputs(vec![ - exact(HotShotEvent::DaProposalValidated( - view3.da_proposal.clone(), - view3.leader_public_key, - )), - exact(HotShotEvent::DaVoteSend(view3.create_da_vote( - DaData { - payload_commit, - }, - &handle, - ))), - ]), - ]; - - // Define expectations for external events triggered by the system - let external_event_expectations = vec![expect_external_events(vec![ext_event_exact(Event { - view_number: view3.view_number, - event: EventType::DaProposal { - proposal: view3.da_proposal.clone(), - sender: view3.leader_public_key, - }, - })])]; - - // Create DA task state and script for the test - let da_state = DaTaskState::::create_from(&handle).await; - let mut da_script = TaskScript { - timeout: Duration::from_millis(100), - state: da_state, - expectations, - }; - - // Run the test with the inputs and check the resulting events - let output_event_stream_recv = handle.event_stream(); - run_test![inputs, da_script].await; - - // Validate the external events against expectations - let result = check_external_events( - output_event_stream_recv, - &external_event_expectations, - da_script.timeout, - ) - .await; - assert!(result.is_ok(), "{}", result.err().unwrap()); -} - -/// Test the DA Task for handling duplicate votes. -/// -/// This test ensures that when duplicate votes are received in the DA Task, -/// they are correctly ignored without producing any output. The original -/// proposal and vote are processed as expected, validating the proposal -/// and sending the first vote, while the duplicate vote is disregarded. -#[cfg_attr(async_executor_impl = "tokio", tokio::test(flavor = "multi_thread"))] -#[cfg_attr(async_executor_impl = "async-std", async_std::test)] -async fn test_da_task_duplicate_votes() { - // Setup logging and backtrace for debugging - async_compatibility_layer::logging::setup_logging(); - async_compatibility_layer::logging::setup_backtrace(); - - // Parameters for the test - let node_id: u64 = 2; - let num_nodes: usize = 10; - let da_committee_size: usize = 7; - - // Initialize test description with node and committee details - let test_description = TestDescription { - num_nodes_with_stake: num_nodes, - da_staked_committee_size: da_committee_size, - start_nodes: num_nodes, - ..TestDescription::default() - }; - - // Generate a launcher for the test system with a custom configuration - let launcher = test_description - .gen_launcher(node_id) - .modify_default_config(|config| { - config.next_view_timeout = 1000; - config.timeout_ratio = (12, 10); - config.da_staked_committee_size = da_committee_size; - }); - - // Build the system handle using the launcher configuration - let handle = - build_system_handle_from_launcher::(node_id, launcher, None) - .await - .expect("Failed to initialize HotShot"); - - // Prepare empty transactions and compute a commitment for later use - let transactions = vec![TestTransaction::new(vec![0])]; - let encoded_transactions = Arc::from(TestTransaction::encode(&transactions)); - let (payload_commit, _precompute) = precompute_vid_commitment( - &encoded_transactions, - handle.hotshot.memberships.quorum_membership.total_nodes(), - ); - - // Initialize a view generator using the current memberships - let mut view_generator = TestViewGenerator::generate( - handle.hotshot.memberships.quorum_membership.clone(), - handle.hotshot.memberships.da_membership.clone(), - ); - - // Generate views for the test - let _view1 = view_generator.next().await.unwrap(); - view_generator.add_transactions(transactions); - let view2 = view_generator.next().await.unwrap(); - - // Create a duplicate vote - let duplicate_vote = view2.create_da_vote( - DaData { - payload_commit, - }, - &handle, - ); - - let inputs = vec![ - serial![ - HotShotEvent::ViewChange(ViewNumber::new(1)), - HotShotEvent::ViewChange(ViewNumber::new(2)), - HotShotEvent::DaProposalRecv(view2.da_proposal.clone(), view2.leader_public_key), - ], - serial![ - // Send the original vote - HotShotEvent::DaVoteRecv(duplicate_vote.clone()), - // Send the duplicate vote - HotShotEvent::DaVoteRecv(duplicate_vote.clone()), - ], - ]; - - // We expect the task to process the proposal and the first vote, but ignore the duplicate - let expectations = vec![ - Expectations::from_outputs(vec![ - exact(HotShotEvent::DaProposalValidated( - view2.da_proposal.clone(), - view2.leader_public_key, - )), - exact(HotShotEvent::DaVoteSend(duplicate_vote.clone())), - ]), - Expectations::from_outputs(vec![ - // No output expected for the duplicate vote - ]), - ]; - - // We expect to see an external event for the proposal, but not for individual votes - let external_event_expectations = vec![expect_external_events(vec![ext_event_exact(Event { - view_number: view2.view_number, - event: EventType::DaProposal { - proposal: view2.da_proposal.clone(), - sender: view2.leader_public_key, - }, - })])]; - - // Create DA task state and script for the test - let da_state = DaTaskState::::create_from(&handle).await; - let mut da_script = TaskScript { - timeout: Duration::from_millis(100), - state: da_state, - expectations, - }; - - // Run the test with the inputs and check the resulting events - let output_event_stream_recv = handle.event_stream(); - run_test![inputs, da_script].await; - - // Validate the external events against expectations - let result = check_external_events( - output_event_stream_recv, - &external_event_expectations, - da_script.timeout, - ) - .await; - assert!(result.is_ok(), "{}", result.err().unwrap()); -} - -/// Tests the DA Task for collecting and processing valid votes. -/// -/// This test verifies that the DA Task correctly handles the proposal and votes -/// from DA committee members. It does the following: -/// -/// 1. Initializes a test environment with multiple nodes, configuring one as the leader. -/// 2. Creates and sends a DA proposal followed by valid votes from the committee. -/// 3. Asserts that the proposal is validated and a vote is sent in response by the leader. -/// -/// The test ensures that only the intended vote is processed. -#[cfg_attr(async_executor_impl = "tokio", tokio::test(flavor = "multi_thread"))] -#[cfg_attr(async_executor_impl = "async-std", async_std::test)] -async fn test_da_task_vote_collection() { - // Initialize logging and backtrace for debugging - async_compatibility_layer::logging::setup_logging(); - async_compatibility_layer::logging::setup_backtrace(); - - // Setup: Create system handles for multiple nodes (at least 5 for DA committee) - let leader_node_id: usize = 4; - let num_nodes: usize = 6; - let da_committee_size: usize = 5; - - let test_description = TestDescription { - num_nodes_with_stake: num_nodes, - da_staked_committee_size: da_committee_size, - start_nodes: num_nodes, - ..TestDescription::default() - }; - - // Create handles and view generators for all nodes - let mut handles = Vec::new(); - let mut view_generators = Vec::new(); - - for node_id in 0..num_nodes as u64 { - let launcher = test_description - .clone() - .gen_launcher(node_id) - .modify_default_config(|config| { - config.next_view_timeout = 1000; - config.timeout_ratio = (12, 10); - config.da_staked_committee_size = da_committee_size; - }); - - let handle = - build_system_handle_from_launcher::(node_id, launcher, None) - .await - .expect("Failed to initialize HotShot"); - - // Create the scenario with necessary views - let view_generator = TestViewGenerator::generate( - handle.hotshot.memberships.quorum_membership.clone(), - handle.hotshot.memberships.da_membership.clone(), - ); - - handles.push(handle); - view_generators.push(view_generator); - } - - // Prepare empty transactions and compute a commitment for later use - let transactions = vec![TestTransaction::new(vec![0])]; - let encoded_transactions = Arc::from(TestTransaction::encode(&transactions)); - let (payload_commit, _precompute) = precompute_vid_commitment( - &encoded_transactions, - handles[leader_node_id] - .hotshot - .memberships - .quorum_membership - .total_nodes(), - ); - - let _view1 = view_generators[leader_node_id].next().await.unwrap(); - let _view2 = view_generators[leader_node_id].next().await.unwrap(); - let _view3 = view_generators[leader_node_id].next().await.unwrap(); - view_generators[leader_node_id].add_transactions(transactions); - let view4 = view_generators[leader_node_id].next().await.unwrap(); - - // Create votes for all nodes - let votes: Vec<_> = handles - .iter() - .map(|handle| view4.create_da_vote(DaData { payload_commit }, handle)) - .collect(); - - // Simulate sending valid votes - let inputs = vec![ - serial![ - HotShotEvent::ViewChange(ViewNumber::new(1)), - HotShotEvent::ViewChange(ViewNumber::new(2)), - HotShotEvent::ViewChange(ViewNumber::new(3)), - HotShotEvent::ViewChange(ViewNumber::new(4)), - HotShotEvent::DaProposalRecv(view4.da_proposal.clone(), view4.leader_public_key), - ], - serial![ - // Send votes from the DA committee members - HotShotEvent::DaVoteRecv(votes[0].clone()), - HotShotEvent::DaVoteRecv(votes[1].clone()), - HotShotEvent::DaVoteRecv(votes[2].clone()), - HotShotEvent::DaVoteRecv(votes[3].clone()), - ], - ]; - - // Assert the outcome - let expectations = vec![ - Expectations::from_outputs(vec![ - exact(HotShotEvent::DaProposalValidated( - view4.da_proposal.clone(), - view4.leader_public_key, - )), - exact(HotShotEvent::DaVoteSend(votes[leader_node_id].clone())), - ]), - Expectations::from_outputs(vec![exact(HotShotEvent::DacSend( - view4.da_certificate, - view4.leader_public_key, - ))]), - ]; - - // Create DA task state and script - let da_state = - DaTaskState::::create_from(&handles[leader_node_id]).await; - let mut da_script = TaskScript { - timeout: Duration::from_millis(100), - state: da_state, - expectations, - }; - - // Run the test - let output_event_stream_recv = handles[leader_node_id].event_stream(); - run_test![inputs, da_script].await; - - // Check for DacSend event in the output stream - let result = check_external_events( - output_event_stream_recv, - &[expect_external_events(vec![ext_event_exact(Event { - view_number: view4.view_number, - event: EventType::DaProposal { - proposal: view4.da_proposal.clone(), - sender: view4.leader_public_key, - }, - })])], - da_script.timeout, - ) - .await; - assert!(result.is_ok(), "{}", result.err().unwrap()); -} - -/// Test that non-leader nodes correctly handle DA (Data Availability) votes and -/// ignore the certificate event when the node is not the leader during the voting process. -/// -/// This test sets up a scenario where a specific node (not the leader) processes -/// multiple views and receives votes from DA committee members. The test checks -/// that the expected outcome occurs, verifying that a `DacSend` event is not -/// generated since the node is not the leader. -#[cfg_attr(async_executor_impl = "tokio", tokio::test(flavor = "multi_thread"))] -#[cfg_attr(async_executor_impl = "async-std", async_std::test)] -async fn test_da_task_non_leader_vote_collection_ignore() { - // Initialize logging and backtrace for debugging - async_compatibility_layer::logging::setup_logging(); - async_compatibility_layer::logging::setup_backtrace(); - - // Setup: Create system handles for multiple nodes (at least 5 for DA committee) - let leader_node_id: usize = 3; - let num_nodes: usize = 6; - let da_committee_size: usize = 5; - - let test_description = TestDescription { - num_nodes_with_stake: num_nodes, - da_staked_committee_size: da_committee_size, - start_nodes: num_nodes, - ..TestDescription::default() - }; - - // Create handles and view generators for all nodes - let mut handles = Vec::new(); - let mut view_generators = Vec::new(); - - for node_id in 0..num_nodes as u64 { - let launcher = test_description - .clone() - .gen_launcher(node_id) - .modify_default_config(|config| { - config.next_view_timeout = 1000; - config.timeout_ratio = (12, 10); - config.da_staked_committee_size = da_committee_size; - }); - - let handle = - build_system_handle_from_launcher::(node_id, launcher, None) - .await - .expect("Failed to initialize HotShot"); - - // Create the scenario with necessary views - let view_generator = TestViewGenerator::generate( - handle.hotshot.memberships.quorum_membership.clone(), - handle.hotshot.memberships.da_membership.clone(), - ); - - handles.push(handle); - view_generators.push(view_generator); - } - - // Prepare empty transactions and compute a commitment for later use - let transactions = vec![TestTransaction::new(vec![0])]; - let encoded_transactions = Arc::from(TestTransaction::encode(&transactions)); - let (payload_commit, _precompute) = precompute_vid_commitment( - &encoded_transactions, - handles[leader_node_id] - .hotshot - .memberships - .quorum_membership - .total_nodes(), - ); - - // Generate multiple views and add transactions for the leader node - let _view1 = view_generators[leader_node_id].next().await.unwrap(); - let _view2 = view_generators[leader_node_id].next().await.unwrap(); - let _view3 = view_generators[leader_node_id].next().await.unwrap(); - view_generators[leader_node_id].add_transactions(transactions); - let view4 = view_generators[leader_node_id].next().await.unwrap(); - - // Create votes for all nodes - let votes: Vec<_> = handles - .iter() - .map(|handle| view4.create_da_vote(DaData { payload_commit }, handle)) - .collect(); - - // Simulate sending valid votes and a proposal to the DA committee - let inputs = vec![ - serial![ - HotShotEvent::ViewChange(ViewNumber::new(1)), - HotShotEvent::ViewChange(ViewNumber::new(2)), - HotShotEvent::ViewChange(ViewNumber::new(3)), - HotShotEvent::ViewChange(ViewNumber::new(4)), - HotShotEvent::DaProposalRecv(view4.da_proposal.clone(), view4.leader_public_key), - ], - serial![ - // Send votes from the DA committee members - HotShotEvent::DaVoteRecv(votes[0].clone()), - HotShotEvent::DaVoteRecv(votes[1].clone()), - HotShotEvent::DaVoteRecv(votes[2].clone()), - HotShotEvent::DaVoteRecv(votes[3].clone()), - ], - ]; - - // Define the expected outcome for the non-leader node - let expectations = vec![ - Expectations::from_outputs(vec![ - exact(HotShotEvent::DaProposalValidated( - view4.da_proposal.clone(), - view4.leader_public_key, - )), - exact(HotShotEvent::DaVoteSend(votes[leader_node_id].clone())), - ]), - Expectations::from_outputs(vec![]), - ]; - - // Create DA task state and script for the test - let da_state = - DaTaskState::::create_from(&handles[leader_node_id]).await; - let mut da_script = TaskScript { - timeout: Duration::from_millis(100), - state: da_state, - expectations, - }; - - // Run the test with the prepared inputs and script - let output_event_stream_recv = handles[leader_node_id].event_stream(); - run_test![inputs, da_script].await; - - // Verify that the non-leader node did not generate the DacSend event - let result = check_external_events( - output_event_stream_recv, - &[expect_external_events(vec![ext_event_exact(Event { - view_number: view4.view_number, - event: EventType::DaProposal { - proposal: view4.da_proposal.clone(), - sender: view4.leader_public_key, - }, - })])], - da_script.timeout, - ) - .await; - assert!(result.is_ok(), "{}", result.err().unwrap()); -} From 02a3d4d012d20ceb5c26cb2ba1cbabf4ca6b6c02 Mon Sep 17 00:00:00 2001 From: shamb0 Date: Fri, 16 Aug 2024 10:59:25 +0530 Subject: [PATCH 3/3] Refactor: Address review comments Signed-off-by: shamb0 [Libp2p] DHT overhaul (#3548) Update builder marketplace API (#3573) Fix BuilderDataSource for marketplace builder (#3576) * Fix BuilderDataSource for marketplace builder Batch dependabot PRs (#3570) Bump serde_json in the all group across 1 directory (#3579) Fix upgrade lock in network task (#3580) Bump the all group with 2 updates (#3582) merge to upstream main Signed-off-by: shamb0 Refactor: Address review comments Signed-off-by: shamb0 --- .github/dependabot.yml | 11 +- Cargo.lock | 28 +- crates/builder-api/api/v0_3/builder.toml | 4 +- crates/builder-api/src/v0_3/builder.rs | 13 +- crates/builder-api/src/v0_3/data_source.rs | 9 +- crates/examples/Cargo.toml | 2 +- crates/hotshot/Cargo.toml | 2 +- crates/hotshot/src/lib.rs | 6 +- crates/hotshot/src/tasks/mod.rs | 5 +- .../src/traits/networking/libp2p_network.rs | 95 +++-- crates/libp2p-networking/Cargo.toml | 1 + .../src/network/behaviours/dht/mod.rs | 43 ++- .../src/network/behaviours/dht/record.rs | 358 ++++++++++++++++++ .../src/network/behaviours/dht/store.rs | 166 ++++++++ crates/libp2p-networking/src/network/def.rs | 23 +- crates/libp2p-networking/src/network/node.rs | 24 +- .../src/network/node/handle.rs | 79 ++-- crates/libp2p-networking/tests/counter.rs | 39 +- crates/task-impls/src/builder.rs | 13 +- crates/task-impls/src/transactions.rs | 325 +++++++++------- crates/testing/Cargo.toml | 2 +- crates/testing/src/helpers.rs | 50 +-- crates/testing/src/predicates/event.rs | 50 +-- crates/testing/src/script.rs | 12 +- crates/testing/tests/tests_1/da_task.rs | 79 +--- crates/types/src/event.rs | 64 ---- crates/types/src/request_response.rs | 3 +- 27 files changed, 984 insertions(+), 522 deletions(-) create mode 100644 crates/libp2p-networking/src/network/behaviours/dht/record.rs create mode 100644 crates/libp2p-networking/src/network/behaviours/dht/store.rs diff --git a/.github/dependabot.yml b/.github/dependabot.yml index 0f1cd1d508..4fbeb81866 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -9,5 +9,12 @@ updates: directory: "/" schedule: interval: daily - ignore: - - dependency-name: "hotshot-types" + groups: + all: + patterns: + - "*" + exclude-patterns: + - "cdn-*" + cdn: + patterns: + - "cdn-*" diff --git a/Cargo.lock b/Cargo.lock index afba7ef22a..11efc55ff5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1452,9 +1452,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.15" +version = "4.5.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "11d8838454fda655dafd3accb2b6e2bea645b9e4078abe84a22ceb947235c5cc" +checksum = "ed6719fffa43d0d87e5fd8caeab59be1554fb028cd30edc88fc4369b17971019" dependencies = [ "clap_builder", "clap_derive", @@ -2040,6 +2040,17 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "delegate" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e018fccbeeb50ff26562ece792ed06659b9c2dae79ece77c4456bb10d9bf79b" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.74", +] + [[package]] name = "der" version = "0.7.9" @@ -4513,6 +4524,7 @@ dependencies = [ "bincode", "blake3", "custom_debug", + "delegate", "derive_builder", "either", "futures", @@ -6638,9 +6650,9 @@ checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3" [[package]] name = "serde" -version = "1.0.207" +version = "1.0.208" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5665e14a49a4ea1b91029ba7d3bca9f299e1f7cfa194388ccc20f14743e784f2" +checksum = "cff085d2cb684faa248efb494c39b68e522822ac0de72ccf08109abde717cfb2" dependencies = [ "serde_derive", ] @@ -6667,9 +6679,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.207" +version = "1.0.208" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6aea2634c86b0e8ef2cfdc0c340baede54ec27b1e46febd7f80dffb2aa44a00e" +checksum = "24008e81ff7613ed8e5ba0cfaf24e2c2f1e5b8a0495711e44fcd4882fca62bcf" dependencies = [ "proc-macro2", "quote", @@ -6687,9 +6699,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.124" +version = "1.0.125" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "66ad62847a56b3dba58cc891acd13884b9c61138d330c0d7b6181713d4fce38d" +checksum = "83c8e735a073ccf5be70aa8066aa984eaf2fa000db6c8d0100ae605b366d31ed" dependencies = [ "itoa", "memchr", diff --git a/crates/builder-api/api/v0_3/builder.toml b/crates/builder-api/api/v0_3/builder.toml index 5614e38bb8..a7f3b92832 100644 --- a/crates/builder-api/api/v0_3/builder.toml +++ b/crates/builder-api/api/v0_3/builder.toml @@ -27,7 +27,9 @@ DESCRIPTION = "" FORMAT_VERSION = "0.1.0" [route.bundle] -PATH = ["bundle/:view_number"] +PATH = ["bundle/:parent_view/:parent_hash/:view_number"] +":parent_view" = "Integer" +":parent_hash" = "TaggedBase64" ":view_number" = "Integer" DOC = """ Fetch the bundle from the builder for the specified view. diff --git a/crates/builder-api/src/v0_3/builder.rs b/crates/builder-api/src/v0_3/builder.rs index b064fd84af..6c17dd0cee 100644 --- a/crates/builder-api/src/v0_3/builder.rs +++ b/crates/builder-api/src/v0_3/builder.rs @@ -26,10 +26,17 @@ where api.with_version("0.0.3".parse().unwrap()) .get("bundle", |req, state| { async move { + let parent_view = req.integer_param("parent_view")?; + let parent_hash = req.blob_param("parent_hash")?; let view_number = req.integer_param("view_number")?; - state.bundle(view_number).await.context(BlockClaimSnafu { - resource: view_number.to_string(), - }) + state + .bundle(parent_view, &parent_hash, view_number) + .await + .with_context(|_| BlockClaimSnafu { + resource: format!( + "Block for parent {parent_hash}@{parent_view} and view {view_number}" + ), + }) } .boxed() })? diff --git a/crates/builder-api/src/v0_3/data_source.rs b/crates/builder-api/src/v0_3/data_source.rs index d37acd3dde..60a77c37f3 100644 --- a/crates/builder-api/src/v0_3/data_source.rs +++ b/crates/builder-api/src/v0_3/data_source.rs @@ -1,5 +1,5 @@ use async_trait::async_trait; -use hotshot_types::{bundle::Bundle, traits::node_implementation::NodeType}; +use hotshot_types::{bundle::Bundle, traits::node_implementation::NodeType, vid::VidCommitment}; use super::builder::BuildError; /// No changes to these types @@ -8,7 +8,12 @@ pub use crate::v0_1::data_source::AcceptsTxnSubmits; #[async_trait] pub trait BuilderDataSource { /// To get the list of available blocks - async fn bundle(&self, view_number: u64) -> Result, BuildError>; + async fn bundle( + &self, + parent_view: u64, + parent_hash: &VidCommitment, + view_number: u64, + ) -> Result, BuildError>; /// To get the builder's address async fn builder_address(&self) -> Result; diff --git a/crates/examples/Cargo.toml b/crates/examples/Cargo.toml index 479b0422bb..6e0a36da7f 100644 --- a/crates/examples/Cargo.toml +++ b/crates/examples/Cargo.toml @@ -106,7 +106,7 @@ surf-disco = { workspace = true } time = { workspace = true } derive_more = { workspace = true } portpicker = "0.1" -lru = "0.12" +lru.workspace = true hotshot-task = { path = "../task" } hotshot = { path = "../hotshot" } hotshot-example-types = { path = "../example-types" } diff --git a/crates/hotshot/Cargo.toml b/crates/hotshot/Cargo.toml index b17b060a73..696a9cc7c4 100644 --- a/crates/hotshot/Cargo.toml +++ b/crates/hotshot/Cargo.toml @@ -46,7 +46,7 @@ hotshot-task-impls = { path = "../task-impls", version = "0.5.36", default-featu hotshot-types = { path = "../types" } libp2p-identity = { workspace = true } libp2p-networking = { workspace = true } -lru = "0.12" +lru.workspace = true portpicker = "0.1" rand = { workspace = true } serde = { workspace = true, features = ["rc"] } diff --git a/crates/hotshot/src/lib.rs b/crates/hotshot/src/lib.rs index 40d0b4bda7..85df5edac5 100644 --- a/crates/hotshot/src/lib.rs +++ b/crates/hotshot/src/lib.rs @@ -637,10 +637,7 @@ impl, V: Versions> SystemContext, V: Versions> SystemContext { /// Creates a lightweight version of the system handle for task state testing. /// /// This method provides a minimal context for task state tests, omitting the full @@ -648,7 +645,8 @@ impl, V: Versions> SystemContext` with minimal setup for task state testing. - pub fn create_lean_test_handle(&self) -> SystemContextHandle { + #[cfg(feature = "hotshot-testing")] + pub fn build_inactive_handle(&self) -> SystemContextHandle { let (internal_sender, internal_receiver) = broadcast(EVENT_CHANNEL_SIZE); SystemContextHandle { diff --git a/crates/hotshot/src/tasks/mod.rs b/crates/hotshot/src/tasks/mod.rs index ea5710dd23..269c771ff0 100644 --- a/crates/hotshot/src/tasks/mod.rs +++ b/crates/hotshot/src/tasks/mod.rs @@ -48,8 +48,7 @@ use vbs::version::StaticVersionType; use crate::{ tasks::task_state::CreateTaskState, types::SystemContextHandle, ConsensusApi, ConsensusMetricsValue, ConsensusTaskRegistry, HotShotConfig, HotShotInitializer, - MarketplaceConfig, Memberships, NetworkTaskRegistry, SignatureKey, SystemContext, UpgradeLock, - Versions, + MarketplaceConfig, Memberships, NetworkTaskRegistry, SignatureKey, SystemContext, Versions, }; /// event for global event stream @@ -193,7 +192,7 @@ pub fn add_network_event_task< membership, filter, storage: Arc::clone(&handle.storage()), - upgrade_lock: UpgradeLock::new(), + upgrade_lock: handle.hotshot.upgrade_lock.clone(), }; let task = Task::new( network_state, diff --git a/crates/hotshot/src/traits/networking/libp2p_network.rs b/crates/hotshot/src/traits/networking/libp2p_network.rs index 1aaada8428..b334c27fe4 100644 --- a/crates/hotshot/src/traits/networking/libp2p_network.rs +++ b/crates/hotshot/src/traits/networking/libp2p_network.rs @@ -39,12 +39,16 @@ use futures::{ FutureExt, StreamExt, }; use hotshot_orchestrator::config::NetworkConfig; +#[cfg(feature = "hotshot-testing")] +use hotshot_types::traits::network::{ + AsyncGenerator, NetworkReliability, TestableNetworkingImplementation, +}; use hotshot_types::{ boxed_sync, constants::LOOK_AHEAD, data::ViewNumber, message::{DataMessage::DataResponse, Message, MessageKind}, - request_response::{NetworkMsgResponseChannel, Request, Response}, + request_response::{NetworkMsgResponseChannel, Request, Response, TakeReceiver}, traits::{ election::Membership, metrics::{Counter, Gauge, Metrics, NoMetrics}, @@ -54,17 +58,13 @@ use hotshot_types::{ }, BoxSyncFuture, }; -#[cfg(feature = "hotshot-testing")] -use hotshot_types::{ - request_response::TakeReceiver, - traits::network::{AsyncGenerator, NetworkReliability, TestableNetworkingImplementation}, -}; use libp2p_identity::{ ed25519::{self, SecretKey}, Keypair, PeerId, }; use libp2p_networking::{ network::{ + behaviours::dht::record::{Namespace, RecordKey, RecordValue}, spawn_network_node, transport::construct_auth_message, MeshParams, @@ -76,7 +76,7 @@ use libp2p_networking::{ }; use rand::{rngs::StdRng, seq::IteratorRandom, SeedableRng}; use serde::Serialize; -use tracing::{debug, error, info, instrument, warn}; +use tracing::{debug, error, info, instrument, trace, warn}; use crate::BroadcastDelay; @@ -255,6 +255,19 @@ impl TestableNetworkingImplementation let privkey = TYPES::SignatureKey::generated_from_seed_indexed([0u8; 32], node_id).1; let pubkey = TYPES::SignatureKey::from_private(&privkey); + + // Derive the Libp2p keypair from the private key + let libp2p_keypair = derive_libp2p_keypair::(&privkey) + .expect("Failed to derive libp2p keypair"); + + // Sign the lookup record + let lookup_record_value = RecordValue::new_signed( + &RecordKey::new(Namespace::Lookup, pubkey.to_bytes()), + libp2p_keypair.public().to_peer_id().to_bytes(), + &privkey, + ) + .expect("Failed to sign DHT lookup record"); + // we want the majority of peers to have this lying around. let replication_factor = NonZeroUsize::new(2 * expected_node_count / 3).unwrap(); let config = if node_id < num_bootstrap as u64 { @@ -269,6 +282,7 @@ impl TestableNetworkingImplementation mesh_n: (expected_node_count / 2 + 3), })) .server_mode(true) + .identity(libp2p_keypair) .replication_factor(replication_factor) .node_type(NetworkNodeType::Bootstrap) .bound_addr(Some(addr)) @@ -290,6 +304,7 @@ impl TestableNetworkingImplementation mesh_n: 8, })) .server_mode(true) + .identity(libp2p_keypair) .replication_factor(replication_factor) .node_type(NetworkNodeType::Regular) .bound_addr(Some(addr)) @@ -304,6 +319,7 @@ impl TestableNetworkingImplementation let node_ids_ref = Arc::clone(&node_ids); let da = da_keys.clone(); let reliability_config_dup = reliability_config.clone(); + Box::pin(async move { // If it's the second time we are starting this network, clear the bootstrap info let mut write_ids = node_ids_ref.write().await; @@ -318,6 +334,7 @@ impl TestableNetworkingImplementation Libp2pMetricsValue::default(), config, pubkey.clone(), + lookup_record_value, bootstrap_addrs_ref, usize::try_from(node_id).unwrap(), #[cfg(feature = "hotshot-testing")] @@ -437,6 +454,15 @@ impl Libp2pNetwork { )) .with_context(|| "Failed to calculate replication factor")?; + // Sign our DHT lookup record + let lookup_record_value = RecordValue::new_signed( + &RecordKey::new(Namespace::Lookup, pub_key.to_bytes()), + // The value is our Libp2p Peer ID + keypair.public().to_peer_id().to_bytes(), + priv_key, + ) + .with_context(|| "Failed to sign DHT lookup record")?; + config_builder .server_mode(libp2p_config.server_mode) .identity(keypair) @@ -477,6 +503,7 @@ impl Libp2pNetwork { metrics, node_config, pub_key.clone(), + lookup_record_value, Arc::new(RwLock::new(bootstrap_nodes)), usize::try_from(config.node_index)?, #[cfg(feature = "hotshot-testing")] @@ -519,12 +546,13 @@ impl Libp2pNetwork { metrics: Libp2pMetricsValue, config: NetworkNodeConfig, pk: K, + lookup_record_value: RecordValue, bootstrap_addrs: BootstrapAddrs, id: usize, #[cfg(feature = "hotshot-testing")] reliability_config: Option>, is_da: bool, ) -> Result, NetworkError> { - let (mut rx, network_handle) = spawn_network_node(config.clone(), id) + let (mut rx, network_handle) = spawn_network_node::(config.clone(), id) .await .map_err(Into::::into)?; // Make bootstrap mappings known @@ -588,7 +616,7 @@ impl Libp2pNetwork { result.handle_event_generator(sender, requests_tx, rx); result.spawn_node_lookup(node_lookup_recv); - result.spawn_connect(id); + result.spawn_connect(id, lookup_record_value); Ok(result) } @@ -608,19 +636,12 @@ impl Libp2pNetwork { #[allow(clippy::cast_possible_truncation)] const THRESHOLD: u64 = (LOOK_AHEAD as f64 * 0.8) as u64; - debug!("Performing lookup for peer {:?}", pk); + trace!("Performing lookup for peer {:?}", pk); // only run if we are not too close to the next view number if latest_seen_view.load(Ordering::Relaxed) + THRESHOLD <= *view_number { - let pk_bytes = match bincode::serialize(&pk) { - Ok(serialized) => serialized, - Err(e) => { - tracing::error!("Failed to serialize public key; this should never happen. Error: {e}"); - return; - } - }; // look up - if let Err(err) = handle.lookup_node(&pk_bytes, dht_timeout).await { + if let Err(err) = handle.lookup_node(&pk.to_bytes(), dht_timeout).await { warn!("Failed to perform lookup for key {:?}: {}", pk, err); }; } @@ -629,7 +650,7 @@ impl Libp2pNetwork { } /// Initiates connection to the outside world - fn spawn_connect(&mut self, id: usize) { + fn spawn_connect(&mut self, id: usize, lookup_record_value: RecordValue) { let pk = self.inner.pk.clone(); let bootstrap_ref = Arc::clone(&self.inner.bootstrap_addrs); let handle = Arc::clone(&self.inner.handle); @@ -660,15 +681,12 @@ impl Libp2pNetwork { handle.subscribe("DA".to_string()).await.unwrap(); } - // we want our records published before - // we begin participating in consensus - // - // Note: this serialization should never fail, - // and if it does the error is unrecoverable. + // Map our staking key to our Libp2p Peer ID so we can properly + // route direct messages while handle .put_record( - &bincode::serialize(&pk).unwrap(), - &bincode::serialize(&handle.peer_id()).unwrap(), + RecordKey::new(Namespace::Lookup, pk.to_bytes()), + lookup_record_value.clone(), ) .await .is_err() @@ -676,19 +694,6 @@ impl Libp2pNetwork { async_sleep(Duration::from_secs(1)).await; } - while handle - .put_record( - &bincode::serialize(&handle.peer_id()).unwrap(), - &bincode::serialize(&pk).unwrap(), - ) - .await - .is_err() - { - async_sleep(Duration::from_secs(1)).await; - } - - info!("Finished putting Kademlia records"); - // Wait for the network to connect to the required number of peers if let Err(e) = handle.wait_to_connect(4, id).await { error!("Failed to connect to peers: {:?}", e); @@ -826,11 +831,7 @@ impl ConnectedNetwork for Libp2pNetwork { let pid = match self .inner .handle - .lookup_node( - &bincode::serialize(&recipient) - .map_err(|e| NetworkError::Libp2p { source: e.into() })?, - self.inner.dht_timeout, - ) + .lookup_node(&recipient.to_bytes(), self.inner.dht_timeout) .await { Ok(pid) => pid, @@ -1043,11 +1044,7 @@ impl ConnectedNetwork for Libp2pNetwork { let pid = match self .inner .handle - .lookup_node( - &bincode::serialize(&recipient) - .map_err(|e| NetworkError::Libp2p { source: e.into() })?, - self.inner.dht_timeout, - ) + .lookup_node(&recipient.to_bytes(), self.inner.dht_timeout) .await { Ok(pid) => pid, diff --git a/crates/libp2p-networking/Cargo.toml b/crates/libp2p-networking/Cargo.toml index 5e2d53f5f9..01c46f91d4 100644 --- a/crates/libp2p-networking/Cargo.toml +++ b/crates/libp2p-networking/Cargo.toml @@ -19,6 +19,7 @@ async-trait = { workspace = true } blake3 = { workspace = true } bincode = { workspace = true } custom_debug = { workspace = true } +delegate = "0.12" derive_builder = "0.20" either = { workspace = true } futures = { workspace = true } diff --git a/crates/libp2p-networking/src/network/behaviours/dht/mod.rs b/crates/libp2p-networking/src/network/behaviours/dht/mod.rs index d7245f8226..c65969c870 100644 --- a/crates/libp2p-networking/src/network/behaviours/dht/mod.rs +++ b/crates/libp2p-networking/src/network/behaviours/dht/mod.rs @@ -8,6 +8,7 @@ pub mod bootstrap; use std::{ collections::{HashMap, HashSet}, + marker::PhantomData, num::NonZeroUsize, time::Duration, }; @@ -18,6 +19,7 @@ use futures::{ channel::{mpsc, oneshot::Sender}, SinkExt, }; +use hotshot_types::traits::signature_key::SignatureKey; use lazy_static::lazy_static; use libp2p::kad::{ /* handler::KademliaHandlerIn, */ store::MemoryStore, BootstrapOk, GetClosestPeersOk, @@ -27,8 +29,15 @@ use libp2p::kad::{ store::RecordStore, Behaviour as KademliaBehaviour, BootstrapError, Event as KademliaEvent, }; use libp2p_identity::PeerId; +use store::ValidatedStore; use tracing::{debug, error, info, warn}; +/// Additional DHT record functionality +pub mod record; + +/// Additional DHT store functionality +pub mod store; + /// the number of nodes required to get an answer from /// in order to trust that the answer is correct when retrieving from the DHT pub(crate) const NUM_REPLICATED_TO_TRUST: usize = 2; @@ -48,7 +57,7 @@ use crate::network::{ClientRequest, NetworkEvent}; /// - bootstrapping into the network /// - peer discovery #[derive(Debug)] -pub struct DHTBehaviour { +pub struct DHTBehaviour { /// in progress queries for nearby peers pub in_progress_get_closest_peers: HashMap>, /// List of in-progress get requests @@ -67,6 +76,9 @@ pub struct DHTBehaviour { retry_tx: Option>, /// Sender to the bootstrap task bootstrap_tx: Option>, + + /// Phantom type for the key + phantom: PhantomData, } /// State of bootstrapping @@ -94,7 +106,7 @@ pub enum DHTEvent { IsBootstrapped, } -impl DHTBehaviour { +impl DHTBehaviour { /// Give the handler a way to retry requests. pub fn set_retry(&mut self, tx: UnboundedSender) { self.retry_tx = Some(tx); @@ -124,11 +136,15 @@ impl DHTBehaviour { replication_factor, retry_tx: None, bootstrap_tx: None, + phantom: PhantomData, } } /// print out the routing table to stderr - pub fn print_routing_table(&mut self, kadem: &mut KademliaBehaviour) { + pub fn print_routing_table( + &mut self, + kadem: &mut KademliaBehaviour>, + ) { let mut err = format!("KBUCKETS: PID: {:?}, ", self.peer_id); let v = kadem.kbuckets().collect::>(); for i in v { @@ -160,14 +176,14 @@ impl DHTBehaviour { /// Value (serialized) is sent over `chan`, and if a value is not found, /// a [`crate::network::error::DHTError`] is sent instead. /// NOTE: noop if `retry_count` is 0 - pub fn record( + pub fn get_record( &mut self, key: Vec, chan: Sender>, factor: NonZeroUsize, backoff: ExponentialBackoff, retry_count: u8, - kad: &mut KademliaBehaviour, + kad: &mut KademliaBehaviour>, ) { // noop if retry_count == 0 { @@ -235,7 +251,7 @@ impl DHTBehaviour { /// update state based on recv-ed get query fn handle_get_query( &mut self, - store: &mut MemoryStore, + store: &mut ValidatedStore, record_results: GetRecordResult, id: QueryId, mut last: bool, @@ -314,10 +330,14 @@ impl DHTBehaviour { publisher: None, expires: None, }; - let _ = store.put(record); - // return value - if notify.send(r).is_err() { - error!("Get DHT: channel closed before get record request result could be sent"); + + // Only return the record if we can store it (validation passed) + if store.put(record).is_ok() { + if notify.send(r).is_err() { + error!("Get DHT: channel closed before get record request result could be sent"); + } + } else { + error!("Failed to store record in local store"); } } // disagreement => query more nodes @@ -391,7 +411,7 @@ impl DHTBehaviour { pub fn dht_handle_event( &mut self, event: KademliaEvent, - store: &mut MemoryStore, + store: &mut ValidatedStore, ) -> Option { match event { KademliaEvent::OutboundQueryProgressed { @@ -470,7 +490,6 @@ impl DHTBehaviour { KademliaEvent::UnroutablePeer { peer } => { debug!("Found unroutable peer {:?}", peer); } - KademliaEvent::InboundRequest { request: _r } => {} KademliaEvent::RoutingUpdated { peer: _, is_new_peer: _, diff --git a/crates/libp2p-networking/src/network/behaviours/dht/record.rs b/crates/libp2p-networking/src/network/behaviours/dht/record.rs new file mode 100644 index 0000000000..189f0f5774 --- /dev/null +++ b/crates/libp2p-networking/src/network/behaviours/dht/record.rs @@ -0,0 +1,358 @@ +use anyhow::{bail, Context, Result}; +use hotshot_types::traits::signature_key::SignatureKey; +use libp2p::kad::Record; +use serde::{Deserialize, Serialize}; +use tracing::warn; + +/// A (signed or unsigned) record value to be stored (serialized) in the DHT. +/// This is a wrapper around a value that includes a possible signature. +#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug)] +pub enum RecordValue { + /// A signed record value + Signed(Vec, K::PureAssembledSignatureType), + + /// An unsigned record value + Unsigned(Vec), +} + +/// The namespace of a record. This is included with the key +/// and allows for multiple types of records to be stored in the DHT. +#[repr(u8)] +#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq)] +pub enum Namespace { + /// A namespace for looking up P2P identities + Lookup = 0, + + /// An authenticated namespace useful for testing + #[cfg(test)] + Testing = 254, + + /// An unauthenticated namespace useful for testing + #[cfg(test)] + TestingUnauthenticated = 255, +} + +/// Require certain namespaces to be authenticated +fn requires_authentication(namespace: Namespace) -> bool { + match namespace { + Namespace::Lookup => true, + #[cfg(test)] + Namespace::Testing => true, + #[cfg(test)] + Namespace::TestingUnauthenticated => false, + } +} + +/// Allow fallible conversion from a byte to a namespace +impl TryFrom for Namespace { + type Error = anyhow::Error; + + fn try_from(value: u8) -> Result { + match value { + 0 => Ok(Self::Lookup), + #[cfg(test)] + 254 => Ok(Self::Testing), + #[cfg(test)] + 255 => Ok(Self::TestingUnauthenticated), + _ => bail!("Unknown namespace"), + } + } +} + +/// A record's key. This is a concatenation of the namespace and the key. +#[derive(Clone)] +pub struct RecordKey { + /// The namespace of the record key + pub namespace: Namespace, + + /// The actual key + pub key: Vec, +} + +impl RecordKey { + #[must_use] + /// Create and return a new record key in the given namespace + pub fn new(namespace: Namespace, key: Vec) -> Self { + Self { namespace, key } + } + + /// Convert the record key to a byte vector + #[must_use] + pub fn to_bytes(&self) -> Vec { + // Concatenate the namespace and the key + let mut bytes = vec![self.namespace as u8]; + bytes.extend_from_slice(&self.key); + bytes + } + + /// Try to convert a byte vector to a record key + /// + /// # Errors + /// If the provided array is empty + pub fn try_from_bytes(bytes: &[u8]) -> Result { + // Check if the bytes are empty + if bytes.is_empty() { + bail!("Empty record key bytes") + } + + // The first byte is the namespace + let namespace = Namespace::try_from(bytes[0])?; + + // Return the record key + Ok(Self { + namespace, + key: bytes[1..].to_vec(), + }) + } +} + +impl RecordValue { + /// Creates and returns a new signed record by signing the key and value + /// with the private key + /// + /// # Errors + /// - If we fail to sign the value + /// - If we fail to serialize the signature + pub fn new_signed( + record_key: &RecordKey, + value: Vec, + private_key: &K::PrivateKey, + ) -> Result { + // The value to sign should be the record key concatenated with the value + let mut value_to_sign = record_key.to_bytes(); + value_to_sign.extend_from_slice(&value); + + let signature = + K::sign(private_key, &value_to_sign).with_context(|| "Failed to sign record")?; + + // Return the signed record + Ok(Self::Signed(value, signature)) + } + + /// Creates and returns a new unsigned record + #[must_use] + pub fn new(value: Vec) -> Self { + Self::Unsigned(value) + } + + /// If the message requires authentication, validate the record by verifying the signature with the + /// given key + pub fn validate(&self, record_key: &RecordKey) -> bool { + // If the record requires authentication, validate the signature + if !requires_authentication(record_key.namespace) { + return true; + } + + // The record must be signed + let Self::Signed(value, signature) = self else { + warn!("Record should be signed but is not"); + return false; + }; + + // If the request is "signed", the public key is the record's key + let Ok(public_key) = K::from_bytes(record_key.key.as_slice()) else { + warn!("Failed to deserialize signer's public key"); + return false; + }; + + // The value to sign should be the record key concatenated with the value + let mut signed_value = record_key.to_bytes(); + signed_value.extend_from_slice(value); + + // Validate the signature + public_key.validate(signature, &signed_value) + } + + /// Get the underlying value of the record + pub fn value(&self) -> &[u8] { + match self { + Self::Unsigned(value) | Self::Signed(value, _) => value, + } + } +} + +impl TryFrom for RecordValue { + type Error = anyhow::Error; + + fn try_from(record: Record) -> Result { + // Deserialize the record value + let record: RecordValue = bincode::deserialize(&record.value) + .with_context(|| "Failed to deserialize record value")?; + + // Return the record + Ok(record) + } +} + +#[cfg(test)] +mod test { + use hotshot_types::signature_key::BLSPubKey; + + use super::*; + + /// Test that namespace serialization and deserialization is consistent + #[test] + fn test_namespace_serialization_parity() { + // Serialize the namespace + let namespace = Namespace::Lookup; + let bytes = namespace as u8; + + // Deserialize the namespace + let namespace = Namespace::try_from(bytes).expect("Failed to deserialize namespace"); + assert!(namespace == Namespace::Lookup, "Wrong namespace"); + } + + /// Test that record key serialization and deserialization is consistent + #[test] + fn test_record_key_serialization_parity() { + // Create a new record key + let namespace = Namespace::Lookup; + let key = vec![1, 2, 3, 4]; + let record_key = RecordKey::new(namespace, key.clone()); + + // Serialize it + let bytes = record_key.to_bytes(); + + // Deserialize it + let record_key = + RecordKey::try_from_bytes(&bytes).expect("Failed to deserialize record key"); + + // Make sure the deserialized record key is the same as the original + assert!(record_key.namespace == namespace, "Namespace mismatch"); + assert!(record_key.key == key, "Key mismatch"); + } + + /// Test that the validity of a valid, signed record is correct + #[test] + fn test_valid_signature() { + // Generate a staking keypair + let (public_key, private_key) = BLSPubKey::generated_from_seed_indexed([1; 32], 1337); + + // Create a value. The key is the public key + let value = vec![5, 6, 7, 8]; + + // Create a record key (as we need to sign both the key and the value) + let record_key = RecordKey::new(Namespace::Lookup, public_key.to_bytes()); + + // Sign the record and value with the private key + let record_value: RecordValue = + RecordValue::new_signed(&record_key, value.clone(), &private_key).unwrap(); + + // Validate the signed record + assert!( + record_value.validate(&record_key), + "Failed to validate signed record" + ); + } + + /// Test that altering the namespace byte causes a validation failure + #[test] + fn test_invalid_namespace() { + // Generate a staking keypair + let (public_key, private_key) = BLSPubKey::generated_from_seed_indexed([1; 32], 1337); + + // Create a value. The key is the public key + let value = vec![5, 6, 7, 8]; + + // Create a record key (as we need to sign both the key and the value) + let mut record_key = RecordKey::new(Namespace::Lookup, public_key.to_bytes()); + + // Sign the record and value with the private key + let record_value: RecordValue = + RecordValue::new_signed(&record_key, value.clone(), &private_key).unwrap(); + + // Alter the namespace + record_key.namespace = Namespace::Testing; + + // Validate the signed record + assert!( + !record_value.validate(&record_key), + "Failed to detect invalid namespace" + ); + } + + /// Test that altering the contents of the record key causes a validation failure + #[test] + fn test_invalid_key() { + // Generate a staking keypair + let (public_key, private_key) = BLSPubKey::generated_from_seed_indexed([1; 32], 1337); + + // Create a value. The key is the public key + let value = vec![5, 6, 7, 8]; + + // Create a record key (as we need to sign both the key and the value) + let mut record_key = RecordKey::new(Namespace::Lookup, public_key.to_bytes()); + + // Sign the record and value with the private key + let record_value: RecordValue = + RecordValue::new_signed(&record_key, value.clone(), &private_key).unwrap(); + + // Set the key to a different one + record_key.key = BLSPubKey::generated_from_seed_indexed([1; 32], 1338) + .0 + .to_bytes(); + + // Validate the signed record + assert!( + !record_value.validate(&record_key), + "Failed to detect invalid record key" + ); + } + + /// Test that unsigned records are always valid + #[test] + fn test_unsigned_record_is_valid() { + // Create a value + let value = vec![5, 6, 7, 8]; + + // Create a record key + let record_key = RecordKey::new(Namespace::TestingUnauthenticated, vec![1, 2, 3, 4]); + + // Create an unsigned record + let record_value: RecordValue = RecordValue::new(value.clone()); + + // Validate the unsigned record + assert!( + record_value.validate(&record_key), + "Failed to validate unsigned record" + ); + } + + /// Test that unauthenticated namespaces do not require validation for unsigned records + #[test] + fn test_unauthenticated_namespace() { + // Generate a staking keypair + let (public_key, _) = BLSPubKey::generated_from_seed_indexed([1; 32], 1337); + + // Create a record key (as we need to sign both the key and the value) + let record_key = RecordKey::new(Namespace::TestingUnauthenticated, public_key.to_bytes()); + + // Created an unsigned record + let record_value: RecordValue = RecordValue::new(vec![5, 6, 7, 8]); + + // Validate it + assert!( + record_value.validate(&record_key), + "Failed to validate unsigned record in unauthenticated namespace" + ); + } + + /// Test that authenticated namespaces do require validation for unsigned records + #[test] + fn test_authenticated_namespace() { + // Generate a staking keypair + let (public_key, _) = BLSPubKey::generated_from_seed_indexed([1; 32], 1337); + + // Create a record key (as we need to sign both the key and the value) + let record_key = RecordKey::new(Namespace::Lookup, public_key.to_bytes()); + + // Created an unsigned record + let record_value: RecordValue = RecordValue::new(vec![5, 6, 7, 8]); + + // Validate it + assert!( + !record_value.validate(&record_key), + "Failed to detect invalid unsigned record" + ); + } +} diff --git a/crates/libp2p-networking/src/network/behaviours/dht/store.rs b/crates/libp2p-networking/src/network/behaviours/dht/store.rs new file mode 100644 index 0000000000..6969ced1ff --- /dev/null +++ b/crates/libp2p-networking/src/network/behaviours/dht/store.rs @@ -0,0 +1,166 @@ +//! This file contains the `ValidatedStore` struct, which is a wrapper around a `RecordStore` that +//! validates records before storing them. +//! +//! The `ValidatedStore` struct is used to ensure that only valid records are stored in the DHT. + +use std::marker::PhantomData; + +use delegate::delegate; +use hotshot_types::traits::signature_key::SignatureKey; +use libp2p::kad::store::{Error, RecordStore, Result}; +use tracing::warn; + +use super::record::RecordValue; +use crate::network::behaviours::dht::record::RecordKey; + +/// A `RecordStore` wrapper that validates records before storing them. +pub struct ValidatedStore { + /// The underlying store + store: R, + + /// Phantom type for the key + phantom: std::marker::PhantomData, +} + +impl ValidatedStore { + /// Create a new `ValidatedStore` with the given underlying store + pub fn new(store: R) -> Self { + ValidatedStore { + store, + phantom: PhantomData, + } + } +} + +/// Implement the `RecordStore` trait for `ValidatedStore` +impl RecordStore for ValidatedStore +where + K: 'static, +{ + type ProvidedIter<'a> = R::ProvidedIter<'a> where R: 'a, K: 'a; + type RecordsIter<'a> = R::RecordsIter<'a> where R: 'a, K: 'a; + + // Delegate all `RecordStore` methods except `put` to the inner store + delegate! { + to self.store{ + fn add_provider(&mut self, record: libp2p::kad::ProviderRecord) -> libp2p::kad::store::Result<()>; + fn get(&self, k: &libp2p::kad::RecordKey) -> Option>; + fn provided(&self) -> Self::ProvidedIter<'_>; + fn providers(&self, key: &libp2p::kad::RecordKey) -> Vec; + fn records(&self) -> Self::RecordsIter<'_>; + fn remove(&mut self, k: &libp2p::kad::RecordKey); + fn remove_provider(&mut self, k: &libp2p::kad::RecordKey, p: &libp2p::PeerId); + } + } + + /// Overwrite the `put` method to validate the record before storing it + fn put(&mut self, record: libp2p::kad::Record) -> Result<()> { + // Convert the record to the correct type + if let Ok(record_value) = RecordValue::::try_from(record.clone()) { + // Convert the key to the correct type + let Ok(record_key) = RecordKey::try_from_bytes(&record.key.to_vec()) else { + warn!("Failed to convert record key"); + return Err(Error::MaxRecords); + }; + + // If the record is signed by the correct key, + if record_value.validate(&record_key) { + // Store the record + if let Err(err) = self.store.put(record.clone()) { + warn!("Failed to store record: {:?}", err); + return Err(Error::MaxRecords); + } + } else { + warn!("Failed to validate record"); + return Err(Error::MaxRecords); + } + } + + Ok(()) + } +} + +#[cfg(test)] +mod test { + use hotshot_types::signature_key::BLSPubKey; + use libp2p::{ + kad::{store::MemoryStore, Record}, + PeerId, + }; + + use super::*; + use crate::network::behaviours::dht::record::Namespace; + + /// Test that a valid record is stored + #[test] + fn test_valid_stored() { + // Generate a staking keypair + let (public_key, private_key) = BLSPubKey::generated_from_seed_indexed([1; 32], 1337); + + // Create a value. The key is the public key + let value = vec![5, 6, 7, 8]; + + // Create a record key (as we need to sign both the key and the value) + let record_key = RecordKey::new(Namespace::Lookup, public_key.to_bytes()); + + // Sign the record and value with the private key + let record_value: RecordValue = + RecordValue::new_signed(&record_key, value.clone(), &private_key).unwrap(); + + // Initialize the store + let mut store: ValidatedStore = + ValidatedStore::new(MemoryStore::new(PeerId::random())); + + // Serialize the record value + let record_value_bytes = + bincode::serialize(&record_value).expect("Failed to serialize record value"); + + // Create and store the record + let record = Record::new(record_key.to_bytes(), record_value_bytes); + store.put(record).expect("Failed to store record"); + + // Check that the record is stored + let libp2p_record_key = libp2p::kad::RecordKey::new(&record_key.to_bytes()); + let stored_record = store.get(&libp2p_record_key).expect("Failed to get record"); + let stored_record_value: RecordValue = + bincode::deserialize(&stored_record.value).expect("Failed to deserialize record value"); + + // Make sure the stored record is the same as the original record + assert_eq!( + record_value, stored_record_value, + "Stored record is not the same as original" + ); + } + + /// Test that an invalid record is not stored + #[test] + fn test_invalid_not_stored() { + // Generate a staking keypair + let (public_key, _) = BLSPubKey::generated_from_seed_indexed([1; 32], 1337); + + // Create a record key (as we need to sign both the key and the value) + let record_key = RecordKey::new(Namespace::Lookup, public_key.to_bytes()); + + // Create a new (unsigned, invalid) record value + let record_value: RecordValue = RecordValue::new(vec![2, 3]); + + // Initialize the store + let mut store: ValidatedStore = + ValidatedStore::new(MemoryStore::new(PeerId::random())); + + // Serialize the record value + let record_value_bytes = + bincode::serialize(&record_value).expect("Failed to serialize record value"); + + // Make sure we are unable to store the record + let record = Record::new(record_key.to_bytes(), record_value_bytes); + assert!(store.put(record).is_err(), "Should not have stored record"); + + // Check that the record is not stored + let libp2p_record_key = libp2p::kad::RecordKey::new(&record_key.to_bytes()); + assert!( + store.get(&libp2p_record_key).is_none(), + "Should not have stored record" + ); + } +} diff --git a/crates/libp2p-networking/src/network/def.rs b/crates/libp2p-networking/src/network/def.rs index b43db8a04d..9c1871c43f 100644 --- a/crates/libp2p-networking/src/network/def.rs +++ b/crates/libp2p-networking/src/network/def.rs @@ -4,6 +4,10 @@ // You should have received a copy of the MIT License // along with the HotShot repository. If not, see . +use hotshot_types::{ + request_response::{Request, Response}, + traits::signature_key::SignatureKey, +}; use libp2p::{ autonat, gossipsub::{Behaviour as GossipBehaviour, Event as GossipEvent, IdentTopic}, @@ -16,8 +20,7 @@ use libp2p_identity::PeerId; use libp2p_swarm_derive::NetworkBehaviour; use tracing::{debug, error}; -use super::NetworkEventInternal; -use hotshot_types::request_response::{Request, Response}; +use super::{behaviours::dht::store::ValidatedStore, NetworkEventInternal}; /// Overarching network behaviour performing: /// - network topology discovoery @@ -26,7 +29,7 @@ use hotshot_types::request_response::{Request, Response}; /// - connection management #[derive(NetworkBehaviour, custom_debug::Debug)] #[behaviour(to_swarm = "NetworkEventInternal")] -pub struct NetworkDef { +pub struct NetworkDef { /// purpose: broadcasting messages to many peers /// NOTE gossipsub works ONLY for sharing messages right now /// in the future it may be able to do peer discovery and routing @@ -37,7 +40,7 @@ pub struct NetworkDef { /// purpose: peer routing /// purpose: storing pub key <-> peer id bijection #[debug(skip)] - pub dht: libp2p::kad::Behaviour, + pub dht: libp2p::kad::Behaviour>, /// purpose: identifying the addresses from an outside POV #[debug(skip)] @@ -57,17 +60,17 @@ pub struct NetworkDef { pub autonat: libp2p::autonat::Behaviour, } -impl NetworkDef { +impl NetworkDef { /// Create a new instance of a `NetworkDef` #[must_use] pub fn new( gossipsub: GossipBehaviour, - dht: libp2p::kad::Behaviour, + dht: libp2p::kad::Behaviour>, identify: IdentifyBehaviour, direct_message: cbor::Behaviour, Vec>, request_response: cbor::Behaviour, autonat: autonat::Behaviour, - ) -> NetworkDef { + ) -> NetworkDef { Self { gossipsub, dht, @@ -80,7 +83,7 @@ impl NetworkDef { } /// Address functions -impl NetworkDef { +impl NetworkDef { /// Add an address pub fn add_address(&mut self, peer_id: &PeerId, address: Multiaddr) { // NOTE to get this address to play nice with the other @@ -94,7 +97,7 @@ impl NetworkDef { } /// Gossip functions -impl NetworkDef { +impl NetworkDef { /// Publish a given gossip pub fn publish_gossip(&mut self, topic: IdentTopic, contents: Vec) { if let Err(e) = self.gossipsub.publish(topic, contents) { @@ -117,7 +120,7 @@ impl NetworkDef { } /// Request/response functions -impl NetworkDef { +impl NetworkDef { /// Add a direct request for a given peer pub fn add_direct_request(&mut self, peer_id: PeerId, data: Vec) -> OutboundRequestId { self.direct_message.send_request(&peer_id, data) diff --git a/crates/libp2p-networking/src/network/node.rs b/crates/libp2p-networking/src/network/node.rs index ff77c0675b..3ef40fbab8 100644 --- a/crates/libp2p-networking/src/network/node.rs +++ b/crates/libp2p-networking/src/network/node.rs @@ -14,6 +14,7 @@ mod handle; use std::{ collections::{HashMap, HashSet}, iter, + marker::PhantomData, num::{NonZeroU32, NonZeroUsize}, time::Duration, }; @@ -63,7 +64,10 @@ pub use self::{ }, }; use super::{ - behaviours::dht::bootstrap::{self, DHTBootstrapTask, InputEvent}, + behaviours::dht::{ + bootstrap::{self, DHTBootstrapTask, InputEvent}, + store::ValidatedStore, + }, error::{GossipsubBuildSnafu, GossipsubConfigSnafu, NetworkError, TransportSnafu}, gen_transport, BoxedTransport, ClientRequest, NetworkDef, NetworkEvent, NetworkEventInternal, NetworkNodeType, @@ -93,7 +97,7 @@ pub struct NetworkNode { peer_id: PeerId, /// the swarm of networkbehaviours #[debug(skip)] - swarm: Swarm, + swarm: Swarm>, /// the configuration parameters of the netework config: NetworkNodeConfig, /// the listener id we are listening on, if it exists @@ -103,11 +107,14 @@ pub struct NetworkNode { /// Handler for direct messages direct_message_state: DMBehaviour, /// Handler for DHT Events - dht_handler: DHTBehaviour, + dht_handler: DHTBehaviour, /// Channel to resend requests, set to Some when we call `spawn_listeners` resend_tx: Option>, /// Send to the bootstrap task to tell it to start a bootstrap bootstrap_tx: Option>, + + /// Phantom data to hold the key type + pd: PhantomData, } impl NetworkNode { @@ -188,7 +195,7 @@ impl NetworkNode { .await?; // Generate the swarm - let mut swarm: Swarm = { + let mut swarm: Swarm> = { // Use the hash of the message's contents as the ID // Use blake3 for much paranoia at very high speeds let message_id_fn = |message: &GossipsubMessage| { @@ -286,7 +293,11 @@ impl NetworkNode { panic!("Replication factor not set"); } - let mut kadem = Behaviour::with_config(peer_id, MemoryStore::new(peer_id), kconfig); + let mut kadem = Behaviour::with_config( + peer_id, + ValidatedStore::new(MemoryStore::new(peer_id)), + kconfig, + ); if config.server_mode { kadem.set_mode(Some(Mode::Server)); } @@ -362,6 +373,7 @@ impl NetworkNode { ), resend_tx: None, bootstrap_tx: None, + pd: PhantomData, }) } @@ -457,7 +469,7 @@ impl NetworkNode { notify, retry_count, } => { - self.dht_handler.record( + self.dht_handler.get_record( key, notify, NonZeroUsize::new(NUM_REPLICATED_TO_TRUST).unwrap(), diff --git a/crates/libp2p-networking/src/network/node/handle.rs b/crates/libp2p-networking/src/network/node/handle.rs index 89e820c12e..6aa1804b6e 100644 --- a/crates/libp2p-networking/src/network/node/handle.rs +++ b/crates/libp2p-networking/src/network/node/handle.rs @@ -4,7 +4,7 @@ // You should have received a copy of the MIT License // along with the HotShot repository. If not, see . -use std::{collections::HashSet, fmt::Debug, time::Duration}; +use std::{collections::HashSet, fmt::Debug, marker::PhantomData, time::Duration}; use async_compatibility_layer::{ art::{async_sleep, async_timeout, future::to}, @@ -21,6 +21,7 @@ use snafu::{ResultExt, Snafu}; use tracing::{debug, info, instrument}; use crate::network::{ + behaviours::dht::record::{Namespace, RecordKey, RecordValue}, error::{CancelledRequestSnafu, DHTError}, gen_multiaddr, ClientRequest, NetworkError, NetworkEvent, NetworkNode, NetworkNodeConfig, NetworkNodeConfigBuilderError, @@ -45,6 +46,9 @@ pub struct NetworkNodeHandle { /// human readable id id: usize, + + /// Phantom data to hold the key type + pd: PhantomData, } /// internal network node receiver @@ -105,12 +109,13 @@ pub async fn spawn_network_node( recv_kill: None, }; - let handle = NetworkNodeHandle { + let handle = NetworkNodeHandle:: { network_config: config, send_network: send_chan, listen_addr, peer_id, id, + pd: PhantomData, }; Ok((receiver, handle)) } @@ -228,33 +233,45 @@ impl NetworkNodeHandle { r.await.map_err(|_| NetworkNodeHandleError::RecvError) } - /// Looks up a node's `PeerId` and attempts to validate routing + /// Looks up a node's `PeerId` by its staking key. Is authenticated through + /// `get_record` assuming each record should be signed. + /// /// # Errors - /// if the peer was unable to be looked up (did not provide a response, DNE) + /// If the DHT lookup fails pub async fn lookup_node( &self, key: &[u8], dht_timeout: Duration, ) -> Result { - // get record (from DHT) - let pid = self.record_timeout(key, dht_timeout).await?; + // Create the record key + let key = RecordKey::new(Namespace::Lookup, key.to_vec()); - // pid lookup for routing - // self.lookup_pid(pid).await?; + // Get the record from the DHT + let pid = self.get_record_timeout(key, dht_timeout).await?; - bincode::deserialize(&pid) - .map_err(|e| NetworkNodeHandleError::DeserializationError { source: e.into() }) + PeerId::from_bytes(&pid).map_err(|_| NetworkNodeHandleError::FailedToDeserialize) } /// Insert a record into the kademlia DHT /// # Errors /// - Will return [`NetworkNodeHandleError::DHTError`] when encountering an error putting to DHT /// - Will return [`NetworkNodeHandleError::SerializationError`] when unable to serialize the key or value - pub async fn put_record(&self, key: &[u8], value: &[u8]) -> Result<(), NetworkNodeHandleError> { + pub async fn put_record( + &self, + key: RecordKey, + value: RecordValue, + ) -> Result<(), NetworkNodeHandleError> { + // Serialize the key + let key = key.to_bytes(); + + // Serialize the record + let value = bincode::serialize(&value) + .map_err(|e| NetworkNodeHandleError::SerializationError { source: e.into() })?; + let (s, r) = futures::channel::oneshot::channel(); let req = ClientRequest::PutDHT { - key: key.to_vec(), - value: value.to_vec(), + key: key.clone(), + value, notify: s, }; @@ -269,23 +286,33 @@ impl NetworkNodeHandle { /// - Will return [`NetworkNodeHandleError::DHTError`] when encountering an error putting to DHT /// - Will return [`NetworkNodeHandleError::SerializationError`] when unable to serialize the key /// - Will return [`NetworkNodeHandleError::DeserializationError`] when unable to deserialize the returned value - pub async fn record( + pub async fn get_record( &self, - key: &[u8], + key: RecordKey, retry_count: u8, ) -> Result, NetworkNodeHandleError> { + // Serialize the key + let serialized_key = key.to_bytes(); + let (s, r) = futures::channel::oneshot::channel(); let req = ClientRequest::GetDHT { - key: key.to_vec(), + key: serialized_key.clone(), notify: s, retry_count, }; self.send_request(req).await?; - match r.await.context(CancelledRequestSnafu) { + // Map the error + let result = match r.await.context(CancelledRequestSnafu) { Ok(result) => Ok(result), Err(e) => Err(e).context(DHTSnafu), - } + }?; + + // Deserialize the record's value + let record: RecordValue = bincode::deserialize(&result) + .map_err(|e| NetworkNodeHandleError::DeserializationError { source: e.into() })?; + + Ok(record.value().to_vec()) } /// Get a record from the kademlia DHT with a timeout @@ -294,12 +321,12 @@ impl NetworkNodeHandle { /// - Will return [`NetworkNodeHandleError::TimeoutError`] when times out /// - Will return [`NetworkNodeHandleError::SerializationError`] when unable to serialize the key /// - Will return [`NetworkNodeHandleError::DeserializationError`] when unable to deserialize the returned value - pub async fn record_timeout( + pub async fn get_record_timeout( &self, - key: &[u8], + key: RecordKey, timeout: Duration, ) -> Result, NetworkNodeHandleError> { - let result = async_timeout(timeout, self.record(key, 3)).await; + let result = async_timeout(timeout, self.get_record(key, 3)).await; match result { Err(e) => Err(e).context(TimeoutSnafu), Ok(r) => r, @@ -314,8 +341,8 @@ impl NetworkNodeHandle { /// - Will return [`NetworkNodeHandleError::SendError`] when underlying `NetworkNode` has been killed pub async fn put_record_timeout( &self, - key: &[u8], - value: &[u8], + key: RecordKey, + value: RecordValue, timeout: Duration, ) -> Result<(), NetworkNodeHandleError> { let result = async_timeout(timeout, self.put_record(key, value)).await; @@ -554,6 +581,12 @@ pub enum NetworkNodeHandleError { }, /// no known topic matches the hashset of keys NoSuchTopic, + + /// Deserialization error + FailedToDeserialize, + + /// Signature verification error + FailedToVerify, } impl From for HotshotNetworkError { diff --git a/crates/libp2p-networking/tests/counter.rs b/crates/libp2p-networking/tests/counter.rs index 88f08af159..036991bea9 100644 --- a/crates/libp2p-networking/tests/counter.rs +++ b/crates/libp2p-networking/tests/counter.rs @@ -15,8 +15,11 @@ use async_lock::RwLock; use async_std::prelude::StreamExt; use common::{test_bed, HandleSnafu, HandleWithState, TestError}; use hotshot_types::{signature_key::BLSPubKey, traits::signature_key::SignatureKey}; -use libp2p_networking::network::{NetworkEvent, NetworkNodeHandleError}; -use rand::seq::IteratorRandom; +use libp2p_networking::network::{ + behaviours::dht::record::{Namespace, RecordKey, RecordValue}, + NetworkEvent, NetworkNodeHandleError, +}; +use rand::{rngs::StdRng, seq::IteratorRandom, Rng, SeedableRng}; use serde::{Deserialize, Serialize}; use snafu::ResultExt; #[cfg(async_executor_impl = "tokio")] @@ -373,33 +376,47 @@ async fn run_gossip_one_round( async fn run_dht_rounds( handles: &[HandleWithState], timeout: Duration, - starting_val: usize, + _starting_val: usize, num_rounds: usize, ) { let mut rng = rand::thread_rng(); for i in 0..num_rounds { debug!("begin round {}", i); let msg_handle = random_handle(handles, &mut rng); - let mut key = vec![0; DHT_KV_PADDING]; - let inc_val = u8::try_from(starting_val + i).unwrap(); - key.push(inc_val); - let mut value = vec![0; DHT_KV_PADDING]; - value.push(inc_val); + + // Create a random keypair + let mut rng = StdRng::from_entropy(); + let (public_key, private_key) = K::generated_from_seed_indexed([1; 32], rng.gen::()); + + // Create a random value to sign + let value = (0..DHT_KV_PADDING) + .map(|_| rng.gen::()) + .collect::>(); + + // Create the record key + let key = RecordKey::new(Namespace::Lookup, public_key.to_bytes().clone()); + + // Sign the value + let value = RecordValue::new_signed(&key, value, &private_key).expect("signing failed"); // put the key - msg_handle.handle.put_record(&key, &value).await.unwrap(); + msg_handle + .handle + .put_record(key.clone(), value.clone()) + .await + .unwrap(); // get the key from the other nodes for handle in handles { let result: Result, NetworkNodeHandleError> = - handle.handle.record_timeout(&key, timeout).await; + handle.handle.get_record_timeout(key.clone(), timeout).await; match result { Err(e) => { error!("DHT error {e:?} during GET"); std::process::exit(-1); } Ok(v) => { - assert_eq!(v, value); + assert_eq!(v, value.value()); } } } diff --git a/crates/task-impls/src/builder.rs b/crates/task-impls/src/builder.rs index 63927c2f4e..01c29f7e2c 100644 --- a/crates/task-impls/src/builder.rs +++ b/crates/task-impls/src/builder.rs @@ -203,7 +203,9 @@ pub mod v0_2 { /// Version 0.3: marketplace. Bundles. pub mod v0_3 { pub use hotshot_builder_api::v0_3::Version; - use hotshot_types::{bundle::Bundle, traits::node_implementation::NodeType}; + use hotshot_types::{ + bundle::Bundle, traits::node_implementation::NodeType, vid::VidCommitment, + }; use vbs::version::StaticVersion; pub use super::BuilderClientError; @@ -217,9 +219,14 @@ pub mod v0_3 { /// # Errors /// - [`BuilderClientError::NotFound`] if block isn't available /// - [`BuilderClientError::Api`] if API isn't responding or responds incorrectly - pub async fn bundle(&self, view_number: u64) -> Result, BuilderClientError> { + pub async fn bundle( + &self, + parent_view: u64, + parent_hash: VidCommitment, + view_number: u64, + ) -> Result, BuilderClientError> { self.inner - .get(&format!("bundle/{view_number}")) + .get(&format!("bundle/{parent_view}/{parent_hash}/{view_number}")) .send() .await .map_err(Into::into) diff --git a/crates/task-impls/src/transactions.rs b/crates/task-impls/src/transactions.rs index ee7203f491..fc10b549ee 100644 --- a/crates/task-impls/src/transactions.rs +++ b/crates/task-impls/src/transactions.rs @@ -9,7 +9,7 @@ use std::{ time::{Duration, Instant}, }; -use anyhow::{bail, Result}; +use anyhow::{bail, ensure, Context, Result}; use async_broadcast::{Receiver, Sender}; use async_compatibility_layer::art::{async_sleep, async_timeout}; use async_trait::async_trait; @@ -34,7 +34,7 @@ use hotshot_types::{ }; use tracing::{debug, error, instrument, warn}; use url::Url; -use vbs::version::StaticVersionType; +use vbs::version::{StaticVersionType, Version}; use vec1::Vec1; use crate::{ @@ -232,127 +232,118 @@ impl, V: Versions> TransactionTask return None; } - #[allow(clippy::too_many_lines)] - /// marketplace view change handler - pub async fn handle_view_change_marketplace( + /// Produce a block by fetching auction results from the solver and bundles from builders. + /// + /// # Errors + /// + /// Returns an error if the solver cannot be contacted, or if none of the builders respond. + async fn produce_block_marketplace( &mut self, - event_stream: &Sender>>, block_view: TYPES::Time, - ) -> Option { - let version = match self.upgrade_lock.version(block_view).await { - Ok(v) => v, - Err(err) => { - error!("Upgrade certificate requires unsupported version, refusing to request blocks: {}", err); - return None; - } - }; - - // Only request bundles and propose with a nonempty block if we are not between versions. - if !self - .upgrade_lock - .decided_upgrade_certificate - .read() - .await - .as_ref() - .is_some_and(|cert| cert.upgrading_in(block_view)) - { - let start = Instant::now(); + task_start_time: Instant, + ) -> Result> { + ensure!( + !self + .upgrade_lock + .decided_upgrade_certificate + .read() + .await + .as_ref() + .is_some_and(|cert| cert.upgrading_in(block_view)), + "Not requesting block because we are upgrading", + ); - if let Ok(maybe_auction_result) = async_timeout( - self.builder_timeout, - self.auction_results_provider - .fetch_auction_result(block_view), - ) + let (parent_view, parent_hash) = self + .last_vid_commitment_retry(block_view, task_start_time) .await - { - let auction_result = maybe_auction_result - .map_err(|e| warn!("Failed to get auction results: {e:#}")) - .unwrap_or_default(); // We continue here, as we still have fallback builder URL + .context("Failed to find parent hash in time")?; - let mut futures = Vec::new(); + let start = Instant::now(); - let mut builder_urls = auction_result.clone().urls(); - builder_urls.push(self.fallback_builder_url.clone()); - - for url in builder_urls { - futures.push(async_timeout( - self.builder_timeout.saturating_sub(start.elapsed()), - async { - let client = BuilderClientMarketplace::new(url); - client.bundle(*block_view).await - }, - )); - } + let maybe_auction_result = async_timeout( + self.builder_timeout, + self.auction_results_provider + .fetch_auction_result(block_view), + ) + .await + .context("Timeout while getting auction result")?; + + let auction_result = maybe_auction_result + .map_err(|e| warn!("Failed to get auction results: {e:#}")) + .unwrap_or_default(); // We continue here, as we still have fallback builder URL + + let mut futures = Vec::new(); + + let mut builder_urls = auction_result.clone().urls(); + builder_urls.push(self.fallback_builder_url.clone()); + + for url in builder_urls { + futures.push(async_timeout( + self.builder_timeout.saturating_sub(start.elapsed()), + async { + let client = BuilderClientMarketplace::new(url); + client.bundle(*parent_view, parent_hash, *block_view).await + }, + )); + } - let mut bundles = Vec::new(); + let mut bundles = Vec::new(); - for bundle in join_all(futures).await { - match bundle { - Ok(Ok(b)) => bundles.push(b), - _ => continue, - } + for bundle in join_all(futures).await { + match bundle { + Ok(Ok(b)) => bundles.push(b), + Ok(Err(e)) => { + tracing::debug!("Failed to retrieve bundle: {e}"); + continue; } - - let mut sequencing_fees = Vec::new(); - let mut transactions: Vec< - >::Transaction, - > = Vec::new(); - - for bundle in bundles { - sequencing_fees.push(bundle.sequencing_fee); - transactions.extend(bundle.transactions); + Err(e) => { + tracing::debug!("Failed to retrieve bundle: {e}"); + continue; } + } + } - let validated_state = self.consensus.read().await.decided_state(); - - if let (Ok(sequencing_fees), Ok((block_payload, metadata))) = ( - Vec1::try_from_vec(sequencing_fees), - TYPES::BlockPayload::from_transactions( - transactions, - &validated_state, - &Arc::clone(&self.instance_state), - ) - .await, - ) { - broadcast_event( - Arc::new(HotShotEvent::BlockRecv(PackedBundle::new( - block_payload.encode(), - metadata, - block_view, - sequencing_fees, - None, - Some(auction_result), - ))), - event_stream, - ) - .await; + let mut sequencing_fees = Vec::new(); + let mut transactions: Vec<>::Transaction> = + Vec::new(); - return None; - } - } else { - warn!("Timeout while getting auction results"); - } + for bundle in bundles { + sequencing_fees.push(bundle.sequencing_fee); + transactions.extend(bundle.transactions); } - // If we couldn't get any bundles (due to either all of the builders or solver failing to return a result), send an empty block - warn!( - "Failed to get a block for view {:?}, proposing empty block", - block_view - ); + let validated_state = self.consensus.read().await.decided_state(); - // Increment the metric for number of empty blocks proposed - self.consensus - .write() - .await - .metrics - .number_of_empty_blocks_proposed - .add(1); + let sequencing_fees = Vec1::try_from_vec(sequencing_fees) + .context("Failed to receive a bundle from any builder.")?; + let (block_payload, metadata) = TYPES::BlockPayload::from_transactions( + transactions, + &validated_state, + &Arc::clone(&self.instance_state), + ) + .await?; + + Ok(PackedBundle::new( + block_payload.encode(), + metadata, + block_view, + sequencing_fees, + None, + Some(auction_result), + )) + } + /// Produce a null block + pub fn null_block( + &self, + block_view: TYPES::Time, + version: Version, + ) -> Option> { let membership_total_nodes = self.membership.total_nodes(); let Some(null_fee) = null_block::builder_fee::(self.membership.total_nodes(), version) else { - error!("Failed to get null fee"); + error!("Failed to calculate null block fee."); return None; }; @@ -361,16 +352,61 @@ impl, V: Versions> TransactionTask let (_, precompute_data) = precompute_vid_commitment(&[], membership_total_nodes); - // Broadcast the empty block + Some(PackedBundle::new( + vec![].into(), + metadata, + block_view, + vec1::vec1![null_fee], + Some(precompute_data), + Some(TYPES::AuctionResult::default()), + )) + } + + #[allow(clippy::too_many_lines)] + /// marketplace view change handler + pub async fn handle_view_change_marketplace( + &mut self, + event_stream: &Sender>>, + block_view: TYPES::Time, + ) -> Option { + let task_start_time = Instant::now(); + + let version = match self.upgrade_lock.version(block_view).await { + Ok(v) => v, + Err(err) => { + error!("Upgrade certificate requires unsupported version, refusing to request blocks: {}", err); + return None; + } + }; + + let packed_bundle = match self + .produce_block_marketplace(block_view, task_start_time) + .await + { + Ok(b) => b, + Err(e) => { + tracing::info!( + "Failed to get a block for view {:?}: {}. Continuing with empty block.", + block_view, + e + ); + + let null_block = self.null_block(block_view, version)?; + + // Increment the metric for number of empty blocks proposed + self.consensus + .write() + .await + .metrics + .number_of_empty_blocks_proposed + .add(1); + + null_block + } + }; + broadcast_event( - Arc::new(HotShotEvent::BlockRecv(PackedBundle::new( - vec![].into(), - metadata, - block_view, - vec1::vec1![null_fee], - Some(precompute_data), - Some(TYPES::AuctionResult::default()), - ))), + Arc::new(HotShotEvent::BlockRecv(packed_bundle)), event_stream, ) .await; @@ -437,38 +473,59 @@ impl, V: Versions> TransactionTask None } + /// Get VID commitment for the last successful view before `block_view`. + /// Returns None if we don't have said commitment recorded. + #[instrument(skip_all, target = "TransactionTaskState", fields(id = self.id, cur_view = *self.cur_view, block_view = *block_view))] + async fn last_vid_commitment_retry( + &self, + block_view: TYPES::Time, + task_start_time: Instant, + ) -> Result<(TYPES::Time, VidCommitment)> { + loop { + match self.last_vid_commitment(block_view).await { + Ok((view, comm)) => break Ok((view, comm)), + Err(e) if task_start_time.elapsed() >= self.builder_timeout => break Err(e), + _ => { + // We still have time, will re-try in a bit + async_sleep(RETRY_DELAY).await; + continue; + } + } + } + } + /// Get VID commitment for the last successful view before `block_view`. /// Returns None if we don't have said commitment recorded. #[instrument(skip_all, target = "TransactionTaskState", fields(id = self.id, cur_view = *self.cur_view, block_view = *block_view))] async fn last_vid_commitment( &self, block_view: TYPES::Time, - ) -> Option<(TYPES::Time, VidCommitment)> { + ) -> Result<(TYPES::Time, VidCommitment)> { let consensus = self.consensus.read().await; let mut target_view = TYPES::Time::new(block_view.saturating_sub(1)); loop { - let Some(view_data) = consensus.validated_state_map().get(&target_view) else { - tracing::warn!(?target_view, "Missing record for view in validated state"); - return None; - }; + let view_data = consensus + .validated_state_map() + .get(&target_view) + .context("Missing record for view {?target_view} in validated state")?; + match view_data.view_inner { ViewInner::Da { payload_commitment } => { - return Some((target_view, payload_commitment)) + return Ok((target_view, payload_commitment)) } ViewInner::Leaf { leaf: leaf_commitment, .. } => { - let Some(leaf) = consensus.saved_leaves().get(&leaf_commitment) else { - tracing::warn!(?target_view, %leaf_commitment, "Missing leaf in saved_leaves"); - return None; - }; - return Some((target_view, leaf.payload_commitment())); + let leaf = consensus.saved_leaves().get(&leaf_commitment).context + ("Missing leaf with commitment {leaf_commitment} for view {target_view} in saved_leaves")?; + return Ok((target_view, leaf.payload_commitment())); } ViewInner::Failed => { // For failed views, backtrack - target_view = TYPES::Time::new(target_view.checked_sub(1)?); + target_view = + TYPES::Time::new(target_view.checked_sub(1).context("Reached genesis")?); continue; } } @@ -480,18 +537,14 @@ impl, V: Versions> TransactionTask let task_start_time = Instant::now(); // Find commitment to the block we want to build upon - let (parent_view, parent_comm) = loop { - match self.last_vid_commitment(block_view).await { - Some((view, comm)) => break (view, comm), - None if task_start_time.elapsed() < self.builder_timeout => { - // We still have time, will re-try in a bit - async_sleep(RETRY_DELAY).await; - continue; - } - _ => { - tracing::warn!("Failed to find commitment in time"); - return None; - } + let (parent_view, parent_comm) = match self + .last_vid_commitment_retry(block_view, task_start_time) + .await + { + Ok((v, c)) => (v, c), + Err(e) => { + tracing::warn!("Failed to find last vid commitment in time: {e}"); + return None; } }; diff --git a/crates/testing/Cargo.toml b/crates/testing/Cargo.toml index cccd8c8f5e..721f526a08 100644 --- a/crates/testing/Cargo.toml +++ b/crates/testing/Cargo.toml @@ -45,7 +45,7 @@ snafu = { workspace = true } tide-disco = { workspace = true } tracing = { workspace = true } vbs = { workspace = true } -lru = { workspace = true } +lru.workspace = true tagged-base64.workspace = true vec1 = { workspace = true } reqwest = { workspace = true } diff --git a/crates/testing/src/helpers.rs b/crates/testing/src/helpers.rs index 514a511c9c..f3b4411185 100644 --- a/crates/testing/src/helpers.rs +++ b/crates/testing/src/helpers.rs @@ -5,17 +5,15 @@ // along with the HotShot repository. If not, see . #![allow(clippy::panic)] -use std::{fmt::Debug, hash::Hash, marker::PhantomData, sync::Arc, time::Duration}; +use std::{fmt::Debug, hash::Hash, marker::PhantomData, sync::Arc}; use async_broadcast::{Receiver, Sender}; -use async_compatibility_layer::art::async_timeout; use bitvec::bitvec; use committable::Committable; use ethereum_types::U256; -use futures::StreamExt; use hotshot::{ traits::{NodeImplementation, TestableNodeImplementation}, - types::{BLSPubKey, Event, SignatureKey, SystemContextHandle}, + types::{BLSPubKey, SignatureKey, SystemContextHandle}, HotShotInitializer, Memberships, SystemContext, }; use hotshot_example_types::{ @@ -47,10 +45,7 @@ use hotshot_types::{ use jf_vid::VidScheme; use serde::Serialize; -use crate::{ - predicates::PredicateResult, script::ExternalEventsExpectations, test_builder::TestDescription, - test_launcher::TestLauncher, -}; +use crate::{test_builder::TestDescription, test_launcher::TestLauncher}; /// create the [`SystemContextHandle`] from a node id /// # Panics @@ -198,7 +193,7 @@ pub async fn build_system_handle_from_launcher< marketplace_config, ); - let system_context_handle = system_context.create_lean_test_handle(); + let system_context_handle = system_context.build_inactive_handle(); tracing::info!("Successfully created system handle for node {}", node_id); Ok(system_context_handle) @@ -538,40 +533,3 @@ pub fn build_fake_view_with_leaf_and_state( }, } } - -pub async fn check_external_events> + Unpin>( - mut output_stream: S, - expectations: &[ExternalEventsExpectations], - timeout: Duration, -) -> Result<(), String> { - let mut external_event_expectations_met = vec![false; expectations.len()]; - - while let Ok(Some(ext_event_received_output)) = - async_timeout(timeout, output_stream.next()).await - { - tracing::debug!("Test received Ext Event: {:?}", ext_event_received_output); - for (index, expectation) in expectations.iter().enumerate() { - if !external_event_expectations_met[index] { - for predicate in &expectation.output_asserts { - if predicate - .evaluate(&Arc::new(ext_event_received_output.clone())) - .await - == PredicateResult::Pass - { - external_event_expectations_met[index] = true; - break; - } - } - } - } - if external_event_expectations_met.iter().all(|&x| x) { - return Ok(()); - } - } - - if external_event_expectations_met.iter().all(|&x| x) { - Ok(()) - } else { - Err("Not all external event expectations were met".to_string()) - } -} diff --git a/crates/testing/src/predicates/event.rs b/crates/testing/src/predicates/event.rs index 074dd129e5..8b740b8482 100644 --- a/crates/testing/src/predicates/event.rs +++ b/crates/testing/src/predicates/event.rs @@ -11,14 +11,10 @@ use async_trait::async_trait; use hotshot_task_impls::events::{HotShotEvent, HotShotEvent::*}; use hotshot_types::{ data::null_block, - event::Event, traits::{block_contents::BlockHeader, node_implementation::NodeType}, }; -use crate::{ - predicates::{Predicate, PredicateResult}, - script::ExternalEventsExpectations, -}; +use crate::predicates::{Predicate, PredicateResult}; type EventCallback = Arc>) -> bool + Send + Sync>; @@ -298,47 +294,3 @@ where }); Box::new(EventPredicate { check, info }) } - -// New predicate type for EventType -type ExternalEventCheckFn = dyn Fn(&Event) -> bool + Send + Sync; - -pub struct ExternalEventPredicate { - check: Arc>, - info: String, -} - -impl std::fmt::Debug for ExternalEventPredicate { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.info) - } -} - -#[async_trait] -impl Predicate>> - for ExternalEventPredicate -{ - async fn evaluate(&self, input: &Arc>) -> PredicateResult { - PredicateResult::from((self.check)(input)) - } - - async fn info(&self) -> String { - self.info.clone() - } -} - -pub fn ext_event_exact( - event: Event, -) -> Box>>> { - let info = format!("{:?}", event); - let check = Arc::new(move |e: &Event| { - e.event == event.event && e.view_number == event.view_number - }); - - Box::new(ExternalEventPredicate { check, info }) -} - -pub fn expect_external_events( - predicates: Vec>>>>, -) -> ExternalEventsExpectations { - ExternalEventsExpectations::from_outputs(predicates) -} diff --git a/crates/testing/src/script.rs b/crates/testing/src/script.rs index eb46c2d86e..b25e286f9f 100644 --- a/crates/testing/src/script.rs +++ b/crates/testing/src/script.rs @@ -7,7 +7,7 @@ use std::{sync::Arc, time::Duration}; use hotshot_task_impls::events::HotShotEvent; -use hotshot_types::{event::Event, traits::node_implementation::NodeType}; +use hotshot_types::traits::node_implementation::NodeType; use crate::predicates::{Predicate, PredicateResult}; @@ -66,16 +66,6 @@ impl Expectations { } } -pub struct ExternalEventsExpectations { - pub output_asserts: Vec>>>>, -} - -impl ExternalEventsExpectations { - pub fn from_outputs(output_asserts: Vec>>>>) -> Self { - Self { output_asserts } - } -} - pub fn panic_extra_output_in_script(stage_number: usize, script_name: String, output: &S) where S: std::fmt::Debug, diff --git a/crates/testing/tests/tests_1/da_task.rs b/crates/testing/tests/tests_1/da_task.rs index f2ef882f8f..0c7ed2ed84 100644 --- a/crates/testing/tests/tests_1/da_task.rs +++ b/crates/testing/tests/tests_1/da_task.rs @@ -18,8 +18,8 @@ use hotshot_task_impls::{ events::HotShotEvent, }; use hotshot_testing::{ - helpers::{ build_system_handle, build_system_handle_from_launcher, check_external_events }, - predicates::event::{ exact, expect_external_events, ext_event_exact }, + helpers::{ build_system_handle, build_system_handle_from_launcher }, + predicates::event::exact, script::{Expectations, InputOrder, TaskScript}, serial, view_generator::TestViewGenerator, @@ -28,7 +28,6 @@ use hotshot_testing::{ use hotshot_types::{ data::{null_block, PackedBundle, ViewNumber}, simple_vote::DaData, - event::{Event, EventType}, traits::{ block_contents::precompute_vid_commitment, election::Membership, @@ -310,16 +309,7 @@ async fn test_da_task_outdated_proposal() { ))), ]), ]; - - // Define expectations for external events triggered by the system - let external_event_expectations = vec![expect_external_events(vec![ext_event_exact(Event { - view_number: view3.view_number, - event: EventType::DaProposal { - proposal: view3.da_proposal.clone(), - sender: view3.leader_public_key, - }, - })])]; - + // Create DA task state and script for the test let da_state = DaTaskState::::create_from(&handle).await; let mut da_script = TaskScript { @@ -329,17 +319,7 @@ async fn test_da_task_outdated_proposal() { }; // Run the test with the inputs and check the resulting events - let output_event_stream_recv = handle.event_stream(); run_test![inputs, da_script].await; - - // Validate the external events against expectations - let result = check_external_events( - output_event_stream_recv, - &external_event_expectations, - da_script.timeout, - ) - .await; - assert!(result.is_ok(), "{}", result.err().unwrap()); } /// Test the DA Task for handling duplicate votes. @@ -438,15 +418,6 @@ async fn test_da_task_duplicate_votes() { ]), ]; - // We expect to see an external event for the proposal, but not for individual votes - let external_event_expectations = vec![expect_external_events(vec![ext_event_exact(Event { - view_number: view2.view_number, - event: EventType::DaProposal { - proposal: view2.da_proposal.clone(), - sender: view2.leader_public_key, - }, - })])]; - // Create DA task state and script for the test let da_state = DaTaskState::::create_from(&handle).await; let mut da_script = TaskScript { @@ -456,17 +427,7 @@ async fn test_da_task_duplicate_votes() { }; // Run the test with the inputs and check the resulting events - let output_event_stream_recv = handle.event_stream(); run_test![inputs, da_script].await; - - // Validate the external events against expectations - let result = check_external_events( - output_event_stream_recv, - &external_event_expectations, - da_script.timeout, - ) - .await; - assert!(result.is_ok(), "{}", result.err().unwrap()); } /// Tests the DA Task for collecting and processing valid votes. @@ -568,7 +529,7 @@ async fn test_da_task_vote_collection() { HotShotEvent::DaVoteRecv(votes[3].clone()), ], ]; - + // Assert the outcome let expectations = vec![ Expectations::from_outputs(vec![ @@ -594,23 +555,7 @@ async fn test_da_task_vote_collection() { }; // Run the test - let output_event_stream_recv = handles[leader_node_id].event_stream(); run_test![inputs, da_script].await; - - // Check for DacSend event in the output stream - let result = check_external_events( - output_event_stream_recv, - &[expect_external_events(vec![ext_event_exact(Event { - view_number: view4.view_number, - event: EventType::DaProposal { - proposal: view4.da_proposal.clone(), - sender: view4.leader_public_key, - }, - })])], - da_script.timeout, - ) - .await; - assert!(result.is_ok(), "{}", result.err().unwrap()); } /// Test that non-leader nodes correctly handle DA (Data Availability) votes and @@ -733,21 +678,5 @@ async fn test_da_task_non_leader_vote_collection_ignore() { }; // Run the test with the prepared inputs and script - let output_event_stream_recv = handles[leader_node_id].event_stream(); run_test![inputs, da_script].await; - - // Verify that the non-leader node did not generate the DacSend event - let result = check_external_events( - output_event_stream_recv, - &[expect_external_events(vec![ext_event_exact(Event { - view_number: view4.view_number, - event: EventType::DaProposal { - proposal: view4.da_proposal.clone(), - sender: view4.leader_public_key, - }, - })])], - da_script.timeout, - ) - .await; - assert!(result.is_ok(), "{}", result.err().unwrap()); } diff --git a/crates/types/src/event.rs b/crates/types/src/event.rs index 2e23cfaebb..88c429d5f0 100644 --- a/crates/types/src/event.rs +++ b/crates/types/src/event.rs @@ -173,70 +173,6 @@ pub enum EventType { /// A message destined for external listeners was received ExternalMessageReceived(Vec), } - -// Implement Eq as well, since EventType should have reflexive equality -impl Eq for EventType {} - -impl PartialEq for EventType { - fn eq(&self, other: &Self) -> bool { - match (self, other) { - ( - EventType::DaProposal { - proposal: p1, - sender: s1, - }, - EventType::DaProposal { - proposal: p2, - sender: s2, - }, - ) => p1 == p2 && s1 == s2, - ( - EventType::ReplicaViewTimeout { view_number: v1 }, - EventType::ReplicaViewTimeout { view_number: v2 }, - ) - | ( - EventType::ViewFinished { view_number: v1 }, - EventType::ViewFinished { view_number: v2 }, - ) - | ( - EventType::ViewTimeout { view_number: v1 }, - EventType::ViewTimeout { view_number: v2 }, - ) => v1 == v2, - ( - EventType::Transactions { transactions: t1 }, - EventType::Transactions { transactions: t2 }, - ) => t1 == t2, - ( - EventType::QuorumProposal { - proposal: p1, - sender: s1, - }, - EventType::QuorumProposal { - proposal: p2, - sender: s2, - }, - ) => p1 == p2 && s1 == s2, - ( - EventType::UpgradeProposal { - proposal: p1, - sender: s1, - }, - EventType::UpgradeProposal { - proposal: p2, - sender: s2, - }, - ) => p1 == p2 && s1 == s2, - (EventType::ExternalMessageReceived(m1), EventType::ExternalMessageReceived(m2)) => { - m1 == m2 - } - (event_v1, _event_v2) => unreachable!( - "TODO: PartialEq not yet implemented for this EventType variant {:#?}", - event_v1 - ), - } - } -} - #[derive(Debug, Serialize, Deserialize, Clone)] /// A list of actions that we track for nodes pub enum HotShotAction { diff --git a/crates/types/src/request_response.rs b/crates/types/src/request_response.rs index ca65c204c7..3397445935 100644 --- a/crates/types/src/request_response.rs +++ b/crates/types/src/request_response.rs @@ -7,12 +7,13 @@ //! Types for the request/response implementations. This module incorporates all //! of the shared types for all of the network backends. -use crate::traits::network::NetworkMsg; use async_lock::Mutex; use futures::channel::{mpsc::Receiver, oneshot}; use libp2p::request_response::ResponseChannel; use serde::{Deserialize, Serialize}; +use crate::traits::network::NetworkMsg; + /// Request for Consenus data #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct Request(#[serde(with = "serde_bytes")] pub Vec);