From 9db2286d93e60868fedcf48417df273c3ec8b13e Mon Sep 17 00:00:00 2001 From: Aaro Altonen <48052676+altonen@users.noreply.github.com> Date: Sat, 25 Mar 2023 01:47:28 +0200 Subject: [PATCH 1/4] Get the correct number of connected peers in `SyncState` (#13700) `Protocol` is not a reliable source for the information of connected peers because it doesn't have real-time information of the actual connectivity state because it's not resposible for accepting/rejecting connections and gets that information with delay from `SyncinEngine`. --- client/informant/src/display.rs | 2 +- client/network/common/src/sync.rs | 2 ++ client/network/sync/src/engine.rs | 4 +++- client/network/sync/src/lib.rs | 1 + 4 files changed, 7 insertions(+), 2 deletions(-) diff --git a/client/informant/src/display.rs b/client/informant/src/display.rs index 46e7229273d79..2f101307229da 100644 --- a/client/informant/src/display.rs +++ b/client/informant/src/display.rs @@ -76,7 +76,7 @@ impl InformantDisplay { let best_number = info.chain.best_number; let best_hash = info.chain.best_hash; let finalized_number = info.chain.finalized_number; - let num_connected_peers = net_status.num_connected_peers; + let num_connected_peers = sync_status.num_connected_peers; let speed = speed::(best_number, self.last_number, self.last_update); let total_bytes_inbound = net_status.total_bytes_inbound; let total_bytes_outbound = net_status.total_bytes_outbound; diff --git a/client/network/common/src/sync.rs b/client/network/common/src/sync.rs index 130f354b70050..d02a81379aea0 100644 --- a/client/network/common/src/sync.rs +++ b/client/network/common/src/sync.rs @@ -94,6 +94,8 @@ pub struct SyncStatus { pub best_seen_block: Option>, /// Number of peers participating in syncing. pub num_peers: u32, + /// Number of peers known to `SyncingEngine` (both full and light). + pub num_connected_peers: u32, /// Number of blocks queued for import pub queued_blocks: u32, /// State sync status in progress, if any. diff --git a/client/network/sync/src/engine.rs b/client/network/sync/src/engine.rs index e6e62101ffcd0..0e5ac499f7382 100644 --- a/client/network/sync/src/engine.rs +++ b/client/network/sync/src/engine.rs @@ -723,7 +723,9 @@ where ToServiceCommand::NewBestBlockImported(hash, number) => self.new_best_block_imported(hash, number), ToServiceCommand::Status(tx) => { - let _ = tx.send(self.chain_sync.status()); + let mut status = self.chain_sync.status(); + status.num_connected_peers = self.peers.len() as u32; + let _ = tx.send(status); }, ToServiceCommand::NumActivePeers(tx) => { let _ = tx.send(self.chain_sync.num_active_peers()); diff --git a/client/network/sync/src/lib.rs b/client/network/sync/src/lib.rs index 45d14ffa7bb37..28959e7f9c886 100644 --- a/client/network/sync/src/lib.rs +++ b/client/network/sync/src/lib.rs @@ -523,6 +523,7 @@ where state: sync_state, best_seen_block, num_peers: self.peers.len() as u32, + num_connected_peers: 0u32, queued_blocks: self.queue_blocks.len() as u32, state_sync: self.state_sync.as_ref().map(|s| s.progress()), warp_sync: warp_sync_progress, From 0a9b226b39a75ac26efee5eea537754f6f1a0b4a Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Tue, 28 Mar 2023 09:10:50 +0200 Subject: [PATCH 2/4] Remove HeaderBackend requirement from AuthorityDiscovery and NetworkWorker (#13730) * Remove `HeaderBackend` requirement from `NetworkWorker` * Remove HeaderBackend from authority-discovery --- client/authority-discovery/src/error.rs | 4 ++++ client/authority-discovery/src/lib.rs | 3 ++- client/authority-discovery/src/worker.rs | 15 +++++++++++---- client/network/src/config.rs | 10 +++++----- client/network/src/service.rs | 19 +++++++------------ client/network/test/src/lib.rs | 8 +++++--- client/network/test/src/service.rs | 9 ++++++--- client/service/src/builder.rs | 5 +++-- 8 files changed, 43 insertions(+), 30 deletions(-) diff --git a/client/authority-discovery/src/error.rs b/client/authority-discovery/src/error.rs index 89c05b71b9ea6..c299966015c26 100644 --- a/client/authority-discovery/src/error.rs +++ b/client/authority-discovery/src/error.rs @@ -25,6 +25,7 @@ pub type Result = std::result::Result; /// Error type for the authority discovery module. #[derive(Debug, thiserror::Error)] +#[allow(missing_docs)] pub enum Error { #[error("Received dht value found event with records with different keys.")] ReceivingDhtValueFoundEventWithDifferentKeys, @@ -76,4 +77,7 @@ pub enum Error { #[error("Received authority record without a valid signature for the remote peer id.")] MissingPeerIdSignature, + + #[error("Unable to fetch best block.")] + BestBlockFetchingError, } diff --git a/client/authority-discovery/src/lib.rs b/client/authority-discovery/src/lib.rs index a3c6699091297..6bb12804cada3 100644 --- a/client/authority-discovery/src/lib.rs +++ b/client/authority-discovery/src/lib.rs @@ -28,6 +28,7 @@ //! See [`Worker`] and [`Service`] for more documentation. pub use crate::{ + error::Error, service::Service, worker::{AuthorityDiscovery, NetworkProvider, Role, Worker}, }; @@ -148,7 +149,7 @@ pub fn new_worker_and_service_with_config + HeaderBackend + 'static, + Client: AuthorityDiscovery + 'static, DhtEventStream: Stream + Unpin, { let (to_worker, from_service) = mpsc::channel(0); diff --git a/client/authority-discovery/src/worker.rs b/client/authority-discovery/src/worker.rs index 034d72902e65d..0b2055c818a9b 100644 --- a/client/authority-discovery/src/worker.rs +++ b/client/authority-discovery/src/worker.rs @@ -157,12 +157,15 @@ pub trait AuthorityDiscovery { /// Retrieve authority identifiers of the current and next authority set. async fn authorities(&self, at: Block::Hash) -> std::result::Result, ApiError>; + + /// Retrieve best block hash + async fn best_hash(&self) -> std::result::Result; } #[async_trait::async_trait] impl AuthorityDiscovery for T where - T: ProvideRuntimeApi + Send + Sync, + T: ProvideRuntimeApi + HeaderBackend + Send + Sync, T::Api: AuthorityDiscoveryApi, Block: BlockT, { @@ -172,13 +175,17 @@ where ) -> std::result::Result, ApiError> { self.runtime_api().authorities(at) } + + async fn best_hash(&self) -> std::result::Result { + Ok(self.info().best_hash) + } } impl Worker where Block: BlockT + Unpin + 'static, Network: NetworkProvider, - Client: AuthorityDiscovery + HeaderBackend + 'static, + Client: AuthorityDiscovery + 'static, DhtEventStream: Stream + Unpin, { /// Construct a [`Worker`]. @@ -377,7 +384,7 @@ where } async fn refill_pending_lookups_queue(&mut self) -> Result<()> { - let best_hash = self.client.info().best_hash; + let best_hash = self.client.best_hash().await?; let local_keys = match &self.role { Role::PublishAndDiscover(key_store) => key_store @@ -597,7 +604,7 @@ where .into_iter() .collect::>(); - let best_hash = client.info().best_hash; + let best_hash = client.best_hash().await?; let authorities = client .authorities(best_hash) .await diff --git a/client/network/src/config.rs b/client/network/src/config.rs index 925a7795d290f..3c6801f22a35c 100644 --- a/client/network/src/config.rs +++ b/client/network/src/config.rs @@ -34,6 +34,7 @@ use prometheus_endpoint::Registry; pub use sc_network_common::{role::Role, sync::warp::WarpSyncProvider, ExHashT}; use zeroize::Zeroize; +use sp_runtime::traits::Block as BlockT; use std::{ error::Error, fmt, fs, @@ -44,7 +45,6 @@ use std::{ path::{Path, PathBuf}, pin::Pin, str::{self, FromStr}, - sync::Arc, }; pub use libp2p::{ @@ -688,7 +688,7 @@ impl NetworkConfiguration { } /// Network initialization parameters. -pub struct Params { +pub struct Params { /// Assigned role for our node (full, light, ...). pub role: Role, @@ -698,12 +698,12 @@ pub struct Params { /// Network layer configuration. pub network_config: NetworkConfiguration, - /// Client that contains the blockchain. - pub chain: Arc, - /// Legacy name of the protocol to use on the wire. Should be different for each chain. pub protocol_id: ProtocolId, + /// Genesis hash of the chain + pub genesis_hash: Block::Hash, + /// Fork ID to distinguish protocols of different hard forks. Part of the standard protocol /// name on the wire. pub fork_id: Option, diff --git a/client/network/src/service.rs b/client/network/src/service.rs index 6dc00b36ceb53..5f60f51321b42 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -73,8 +73,7 @@ use parking_lot::Mutex; use sc_network_common::ExHashT; use sc_peerset::PeersetHandle; use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; -use sp_blockchain::HeaderBackend; -use sp_runtime::traits::{Block as BlockT, Zero}; +use sp_runtime::traits::Block as BlockT; use std::{ cmp, @@ -147,9 +146,7 @@ where /// Returns a `NetworkWorker` that implements `Future` and must be regularly polled in order /// for the network processing to advance. From it, you can extract a `NetworkService` using /// `worker.service()`. The `NetworkService` can be shared through the codebase. - pub fn new + 'static>( - mut params: Params, - ) -> Result { + pub fn new(mut params: Params) -> Result { // Private and public keys configuration. let local_identity = params.network_config.node_key.clone().into_keypair()?; let local_public = local_identity.public(); @@ -277,13 +274,11 @@ where config.discovery_limit( u64::from(params.network_config.default_peers_set.out_peers) + 15, ); - let genesis_hash = params - .chain - .hash(Zero::zero()) - .ok() - .flatten() - .expect("Genesis block exists; qed"); - config.with_kademlia(genesis_hash, params.fork_id.as_deref(), ¶ms.protocol_id); + config.with_kademlia( + params.genesis_hash, + params.fork_id.as_deref(), + ¶ms.protocol_id, + ); config.with_dht_random_walk(params.network_config.enable_dht_random_walk); config.allow_non_globals_in_dht(params.network_config.allow_non_globals_in_dht); config.use_kademlia_disjoint_query_paths( diff --git a/client/network/test/src/lib.rs b/client/network/test/src/lib.rs index 75b8287b08dcf..6a99304035ab8 100644 --- a/client/network/test/src/lib.rs +++ b/client/network/test/src/lib.rs @@ -83,7 +83,7 @@ use sp_core::H256; use sp_runtime::{ codec::{Decode, Encode}, generic::BlockId, - traits::{Block as BlockT, Header as HeaderT, NumberFor}, + traits::{Block as BlockT, Header as HeaderT, NumberFor, Zero}, Justification, Justifications, }; use substrate_test_runtime_client::AccountKeyring; @@ -916,13 +916,15 @@ where let sync_service_import_queue = Box::new(sync_service.clone()); let sync_service = Arc::new(sync_service.clone()); - let network = NetworkWorker::new(sc_network::config::Params { + let genesis_hash = + client.hash(Zero::zero()).ok().flatten().expect("Genesis block exists; qed"); + let network = NetworkWorker::new::(sc_network::config::Params { role: if config.is_authority { Role::Authority } else { Role::Full }, executor: Box::new(|f| { tokio::spawn(f); }), network_config, - chain: client.clone(), + genesis_hash, protocol_id, fork_id, metrics_registry: None, diff --git a/client/network/test/src/service.rs b/client/network/test/src/service.rs index b1de2a91ebcc9..67915c38637ed 100644 --- a/client/network/test/src/service.rs +++ b/client/network/test/src/service.rs @@ -34,7 +34,8 @@ use sc_network_sync::{ service::network::{NetworkServiceHandle, NetworkServiceProvider}, state_request_handler::StateRequestHandler, }; -use sp_runtime::traits::Block as BlockT; +use sp_blockchain::HeaderBackend; +use sp_runtime::traits::{Block as BlockT, Zero}; use substrate_test_runtime_client::{ runtime::{Block as TestBlock, Hash as TestHash}, TestClientBuilder, TestClientBuilderExt as _, @@ -194,17 +195,19 @@ impl TestNetworkBuilder { ) .unwrap(); let mut link = self.link.unwrap_or(Box::new(chain_sync_service.clone())); + let genesis_hash = + client.hash(Zero::zero()).ok().flatten().expect("Genesis block exists; qed"); let worker = NetworkWorker::< substrate_test_runtime_client::runtime::Block, substrate_test_runtime_client::runtime::Hash, - >::new(config::Params { + >::new(config::Params:: { block_announce_config, role: config::Role::Full, executor: Box::new(|f| { tokio::spawn(f); }), + genesis_hash, network_config, - chain: client.clone(), protocol_id, fork_id, metrics_registry: None, diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index 91ef65cf134e4..dcec96595daf4 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -907,7 +907,8 @@ where protocol_config })); - let mut network_params = sc_network::config::Params { + let genesis_hash = client.hash(Zero::zero()).ok().flatten().expect("Genesis block exists; qed"); + let mut network_params = sc_network::config::Params:: { role: config.role.clone(), executor: { let spawn_handle = Clone::clone(&spawn_handle); @@ -916,7 +917,7 @@ where }) }, network_config: config.network.clone(), - chain: client.clone(), + genesis_hash, protocol_id: protocol_id.clone(), fork_id: config.chain_spec.fork_id().map(ToOwned::to_owned), metrics_registry: config.prometheus_config.as_ref().map(|config| config.registry.clone()), From 980eb16791bab5d63b47c07657bb9dc198d9307b Mon Sep 17 00:00:00 2001 From: Aaro Altonen <48052676+altonen@users.noreply.github.com> Date: Thu, 30 Mar 2023 14:59:58 +0300 Subject: [PATCH 3/4] Attempt to relieve pressure on `mpsc_network_worker` (#13725) * Attempt to relieve pressure on `mpsc_network_worker` `SyncingEngine` interacting with `NetworkWorker` can put a lot of strain on the channel if the number of inbound connections is high. This is because `SyncingEngine` is notified of each inbound substream which it then can either accept or reject and this causes a lot of message exchange on the already busy channel. Use a direct channel pair between `Protocol` and `SyncingEngine` to exchange notification events. It is a temporary change to alleviate the problems caused by syncing being an independent protocol and the fix will be removed once `NotificationService` is implemented. * Apply review comments * fixes * trigger ci * Fix tests Verify that both peers have a connection now that the validation goes through `SyncingEngine`. Depending on how the tasks are scheduled, one of them might not have the peer registered in `SyncingEngine` at which point the test won't make any progress because block announcement received from an unknown peer is discarded. Move polling of `ChainSync` at the end of the function so that if a block announcement causes a block request to be sent, that can be sent in the same call to `SyncingEngine::poll()`. --------- Co-authored-by: parity-processbot <> --- Cargo.lock | 1 + client/network/src/config.rs | 11 +- client/network/src/event.rs | 47 ++++++- client/network/src/lib.rs | 6 +- client/network/src/protocol.rs | 108 +++++++++++++--- client/network/src/service.rs | 6 +- client/network/sync/src/engine.rs | 197 +++++++++++++---------------- client/network/test/Cargo.toml | 1 + client/network/test/src/lib.rs | 12 +- client/network/test/src/service.rs | 6 +- client/network/test/src/sync.rs | 2 +- client/service/src/builder.rs | 13 +- 12 files changed, 259 insertions(+), 151 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d0aea7729bd2d..282344a00cd99 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9000,6 +9000,7 @@ dependencies = [ "sc-network-light", "sc-network-sync", "sc-service", + "sc-utils", "sp-blockchain", "sp-consensus", "sp-consensus-babe", diff --git a/client/network/src/config.rs b/client/network/src/config.rs index 3c6801f22a35c..781ae9c786694 100644 --- a/client/network/src/config.rs +++ b/client/network/src/config.rs @@ -22,6 +22,7 @@ //! See the documentation of [`Params`]. pub use crate::{ + protocol::NotificationsSink, request_responses::{ IncomingRequest, OutgoingResponse, ProtocolConfig as RequestResponseConfig, }, @@ -31,7 +32,12 @@ pub use crate::{ use codec::Encode; use libp2p::{identity::Keypair, multiaddr, Multiaddr, PeerId}; use prometheus_endpoint::Registry; -pub use sc_network_common::{role::Role, sync::warp::WarpSyncProvider, ExHashT}; +pub use sc_network_common::{ + role::{Role, Roles}, + sync::warp::WarpSyncProvider, + ExHashT, +}; +use sc_utils::mpsc::TracingUnboundedSender; use zeroize::Zeroize; use sp_runtime::traits::Block as BlockT; @@ -714,6 +720,9 @@ pub struct Params { /// Block announce protocol configuration pub block_announce_config: NonDefaultSetConfig, + /// TX channel for direct communication with `SyncingEngine` and `Protocol`. + pub tx: TracingUnboundedSender>, + /// Request response protocol configurations pub request_response_protocol_configs: Vec, } diff --git a/client/network/src/event.rs b/client/network/src/event.rs index 3ecd8f9311429..975fde0e40a28 100644 --- a/client/network/src/event.rs +++ b/client/network/src/event.rs @@ -19,12 +19,14 @@ //! Network event types. These are are not the part of the protocol, but rather //! events that happen on the network like DHT get/put results received. -use crate::types::ProtocolName; +use crate::{types::ProtocolName, NotificationsSink}; use bytes::Bytes; +use futures::channel::oneshot; use libp2p::{core::PeerId, kad::record::Key}; -use sc_network_common::role::ObservedRole; +use sc_network_common::{role::ObservedRole, sync::message::BlockAnnouncesHandshake}; +use sp_runtime::traits::Block as BlockT; /// Events generated by DHT as a response to get_value and put_value requests. #[derive(Debug, Clone)] @@ -90,3 +92,44 @@ pub enum Event { messages: Vec<(ProtocolName, Bytes)>, }, } + +/// Event sent to `SyncingEngine` +// TODO: remove once `NotificationService` is implemented. +pub enum SyncEvent { + /// Opened a substream with the given node with the given notifications protocol. + /// + /// The protocol is always one of the notification protocols that have been registered. + NotificationStreamOpened { + /// Node we opened the substream with. + remote: PeerId, + /// Received handshake. + received_handshake: BlockAnnouncesHandshake, + /// Notification sink. + sink: NotificationsSink, + /// Channel for reporting accept/reject of the substream. + tx: oneshot::Sender, + }, + + /// Closed a substream with the given node. Always matches a corresponding previous + /// `NotificationStreamOpened` message. + NotificationStreamClosed { + /// Node we closed the substream with. + remote: PeerId, + }, + + /// Notification sink was replaced. + NotificationSinkReplaced { + /// Node we closed the substream with. + remote: PeerId, + /// Notification sink. + sink: NotificationsSink, + }, + + /// Received one or more messages from the given node using the given protocol. + NotificationsReceived { + /// Node we received the message from. + remote: PeerId, + /// Concerned protocol and associated message. + messages: Vec, + }, +} diff --git a/client/network/src/lib.rs b/client/network/src/lib.rs index c290f4b94db53..5374ac13435be 100644 --- a/client/network/src/lib.rs +++ b/client/network/src/lib.rs @@ -259,7 +259,7 @@ pub mod request_responses; pub mod types; pub mod utils; -pub use event::{DhtEvent, Event}; +pub use event::{DhtEvent, Event, SyncEvent}; #[doc(inline)] pub use libp2p::{multiaddr, Multiaddr, PeerId}; pub use request_responses::{IfDisconnected, RequestFailure, RequestResponseConfig}; @@ -278,8 +278,8 @@ pub use service::{ NetworkStatusProvider, NetworkSyncForkRequest, NotificationSender as NotificationSenderT, NotificationSenderError, NotificationSenderReady, }, - DecodingError, Keypair, NetworkService, NetworkWorker, NotificationSender, OutboundFailure, - PublicKey, + DecodingError, Keypair, NetworkService, NetworkWorker, NotificationSender, NotificationsSink, + OutboundFailure, PublicKey, }; pub use types::ProtocolName; diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs index 06ca02c0ca8d5..a7e6f36ef6215 100644 --- a/client/network/src/protocol.rs +++ b/client/network/src/protocol.rs @@ -24,6 +24,7 @@ use crate::{ use bytes::Bytes; use codec::{DecodeAll, Encode}; +use futures::{channel::oneshot, stream::FuturesUnordered, StreamExt}; use libp2p::{ core::connection::ConnectionId, swarm::{ @@ -35,11 +36,14 @@ use libp2p::{ use log::{debug, error, warn}; use sc_network_common::{role::Roles, sync::message::BlockAnnouncesHandshake}; +use sc_utils::mpsc::TracingUnboundedSender; use sp_runtime::traits::Block as BlockT; use std::{ collections::{HashMap, HashSet, VecDeque}, + future::Future, iter, + pin::Pin, task::Poll, }; @@ -68,6 +72,9 @@ mod rep { pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad message"); } +type PendingSyncSubstreamValidation = + Pin> + Send>>; + // Lock must always be taken in order declared here. pub struct Protocol { /// Pending list of messages to return from `poll` as a priority. @@ -87,6 +94,8 @@ pub struct Protocol { bad_handshake_substreams: HashSet<(PeerId, sc_peerset::SetId)>, /// Connected peers. peers: HashMap, + sync_substream_validations: FuturesUnordered, + tx: TracingUnboundedSender>, _marker: std::marker::PhantomData, } @@ -96,6 +105,7 @@ impl Protocol { roles: Roles, network_config: &config::NetworkConfiguration, block_announces_protocol: config::NonDefaultSetConfig, + tx: TracingUnboundedSender>, ) -> error::Result<(Self, sc_peerset::PeersetHandle, Vec<(PeerId, Multiaddr)>)> { let mut known_addresses = Vec::new(); @@ -179,6 +189,8 @@ impl Protocol { .collect(), bad_handshake_substreams: Default::default(), peers: HashMap::new(), + sync_substream_validations: FuturesUnordered::new(), + tx, // TODO: remove when `BlockAnnouncesHandshake` is moved away from `Protocol` _marker: Default::default(), }; @@ -418,6 +430,23 @@ impl NetworkBehaviour for Protocol { return Poll::Ready(NetworkBehaviourAction::CloseConnection { peer_id, connection }), }; + while let Poll::Ready(Some(validation_result)) = + self.sync_substream_validations.poll_next_unpin(cx) + { + match validation_result { + Ok((peer, roles)) => { + self.peers.insert(peer, roles); + }, + Err(peer) => { + log::debug!( + target: "sub-libp2p", + "`SyncingEngine` rejected stream" + ); + self.behaviour.disconnect_peer(&peer, HARDCODED_PEERSETS_SYNC); + }, + } + } + let outcome = match event { NotificationsOut::CustomProtocolOpen { peer_id, @@ -440,16 +469,29 @@ impl NetworkBehaviour for Protocol { best_hash: handshake.best_hash, genesis_hash: handshake.genesis_hash, }; - self.peers.insert(peer_id, roles); - CustomMessageOutcome::NotificationStreamOpened { - remote: peer_id, - protocol: self.notification_protocols[usize::from(set_id)].clone(), - negotiated_fallback, - received_handshake: handshake.encode(), - roles, - notifications_sink, - } + let (tx, rx) = oneshot::channel(); + let _ = self.tx.unbounded_send( + crate::SyncEvent::NotificationStreamOpened { + remote: peer_id, + received_handshake: handshake, + sink: notifications_sink, + tx, + }, + ); + self.sync_substream_validations.push(Box::pin(async move { + match rx.await { + Ok(accepted) => + if accepted { + Ok((peer_id, roles)) + } else { + Err(peer_id) + }, + Err(_) => Err(peer_id), + } + })); + + CustomMessageOutcome::None }, Ok(msg) => { debug!( @@ -469,15 +511,27 @@ impl NetworkBehaviour for Protocol { let roles = handshake.roles; self.peers.insert(peer_id, roles); - CustomMessageOutcome::NotificationStreamOpened { - remote: peer_id, - protocol: self.notification_protocols[usize::from(set_id)] - .clone(), - negotiated_fallback, - received_handshake, - roles, - notifications_sink, - } + let (tx, rx) = oneshot::channel(); + let _ = self.tx.unbounded_send( + crate::SyncEvent::NotificationStreamOpened { + remote: peer_id, + received_handshake: handshake, + sink: notifications_sink, + tx, + }, + ); + self.sync_substream_validations.push(Box::pin(async move { + match rx.await { + Ok(accepted) => + if accepted { + Ok((peer_id, roles)) + } else { + Err(peer_id) + }, + Err(_) => Err(peer_id), + } + })); + CustomMessageOutcome::None }, Err(err2) => { log::debug!( @@ -535,6 +589,12 @@ impl NetworkBehaviour for Protocol { NotificationsOut::CustomProtocolReplaced { peer_id, notifications_sink, set_id } => if self.bad_handshake_substreams.contains(&(peer_id, set_id)) { CustomMessageOutcome::None + } else if set_id == HARDCODED_PEERSETS_SYNC { + let _ = self.tx.unbounded_send(crate::SyncEvent::NotificationSinkReplaced { + remote: peer_id, + sink: notifications_sink, + }); + CustomMessageOutcome::None } else { CustomMessageOutcome::NotificationStreamReplaced { remote: peer_id, @@ -548,6 +608,12 @@ impl NetworkBehaviour for Protocol { // handshake. The outer layers have never received an opening event about this // substream, and consequently shouldn't receive a closing event either. CustomMessageOutcome::None + } else if set_id == HARDCODED_PEERSETS_SYNC { + let _ = self.tx.unbounded_send(crate::SyncEvent::NotificationStreamClosed { + remote: peer_id, + }); + self.peers.remove(&peer_id); + CustomMessageOutcome::None } else { CustomMessageOutcome::NotificationStreamClosed { remote: peer_id, @@ -558,6 +624,12 @@ impl NetworkBehaviour for Protocol { NotificationsOut::Notification { peer_id, set_id, message } => { if self.bad_handshake_substreams.contains(&(peer_id, set_id)) { CustomMessageOutcome::None + } else if set_id == HARDCODED_PEERSETS_SYNC { + let _ = self.tx.unbounded_send(crate::SyncEvent::NotificationsReceived { + remote: peer_id, + messages: vec![message.freeze()], + }); + CustomMessageOutcome::None } else { let protocol_name = self.notification_protocols[usize::from(set_id)].clone(); CustomMessageOutcome::NotificationsReceived { diff --git a/client/network/src/service.rs b/client/network/src/service.rs index 5f60f51321b42..9708b24d29b52 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -36,7 +36,7 @@ use crate::{ network_state::{ NetworkState, NotConnectedPeer as NetworkStateNotConnectedPeer, Peer as NetworkStatePeer, }, - protocol::{self, NotificationsSink, NotifsHandlerError, Protocol, Ready}, + protocol::{self, NotifsHandlerError, Protocol, Ready}, request_responses::{IfDisconnected, RequestFailure}, service::{ signature::{Signature, SigningError}, @@ -91,6 +91,7 @@ use std::{ pub use behaviour::{InboundFailure, OutboundFailure, ResponseFailure}; pub use libp2p::identity::{error::DecodingError, Keypair, PublicKey}; +pub use protocol::NotificationsSink; mod metrics; mod out_events; @@ -146,7 +147,7 @@ where /// Returns a `NetworkWorker` that implements `Future` and must be regularly polled in order /// for the network processing to advance. From it, you can extract a `NetworkService` using /// `worker.service()`. The `NetworkService` can be shared through the codebase. - pub fn new(mut params: Params) -> Result { + pub fn new(mut params: Params) -> Result { // Private and public keys configuration. let local_identity = params.network_config.node_key.clone().into_keypair()?; let local_public = local_identity.public(); @@ -227,6 +228,7 @@ where From::from(¶ms.role), ¶ms.network_config, params.block_announce_config, + params.tx, )?; // List of multiaddresses that we know in the network. diff --git a/client/network/sync/src/engine.rs b/client/network/sync/src/engine.rs index 0e5ac499f7382..4310db22bd622 100644 --- a/client/network/sync/src/engine.rs +++ b/client/network/sync/src/engine.rs @@ -24,8 +24,8 @@ use crate::{ ChainSync, ClientError, SyncingService, }; -use codec::{Decode, DecodeAll, Encode}; -use futures::{FutureExt, Stream, StreamExt}; +use codec::{Decode, Encode}; +use futures::{FutureExt, StreamExt}; use futures_timer::Delay; use libp2p::PeerId; use lru::LruCache; @@ -39,9 +39,8 @@ use sc_network::{ config::{ NetworkConfiguration, NonDefaultSetConfig, ProtocolId, SyncMode as SyncOperationMode, }, - event::Event, utils::LruHashSet, - ProtocolName, + NotificationsSink, ProtocolName, }; use sc_network_common::{ role::Roles, @@ -66,7 +65,6 @@ use sp_runtime::{ use std::{ collections::{HashMap, HashSet}, num::NonZeroUsize, - pin::Pin, sync::{ atomic::{AtomicBool, AtomicUsize, Ordering}, Arc, @@ -174,6 +172,8 @@ pub struct Peer { pub info: ExtendedPeerInfo, /// Holds a set of blocks known to this peer. pub known_blocks: LruHashSet, + /// Notification sink. + sink: NotificationsSink, } pub struct SyncingEngine { @@ -196,6 +196,9 @@ pub struct SyncingEngine { /// Channel for receiving service commands service_rx: TracingUnboundedReceiver>, + /// Channel for receiving inbound connections from `Protocol`. + rx: sc_utils::mpsc::TracingUnboundedReceiver>, + /// Assigned roles. roles: Roles, @@ -266,6 +269,7 @@ where block_request_protocol_name: ProtocolName, state_request_protocol_name: ProtocolName, warp_sync_protocol_name: Option, + rx: sc_utils::mpsc::TracingUnboundedReceiver>, ) -> Result<(Self, SyncingService, NonDefaultSetConfig), ClientError> { let mode = match network_config.sync_mode { SyncOperationMode::Full => SyncMode::Full, @@ -359,6 +363,7 @@ where num_connected: num_connected.clone(), is_major_syncing: is_major_syncing.clone(), service_rx, + rx, genesis_hash, important_peers, default_peers_set_no_slot_connected_peers: HashSet::new(), @@ -566,11 +571,7 @@ where data: Some(data.clone()), }; - self.network_service.write_notification( - *who, - self.block_announce_protocol_name.clone(), - message.encode(), - ); + peer.sink.send_sync_notification(message.encode()); } } } @@ -587,17 +588,13 @@ where ) } - pub async fn run(mut self, mut stream: Pin + Send>>) { + pub async fn run(mut self) { loop { - futures::future::poll_fn(|cx| self.poll(cx, &mut stream)).await; + futures::future::poll_fn(|cx| self.poll(cx)).await; } } - pub fn poll( - &mut self, - cx: &mut std::task::Context, - event_stream: &mut Pin + Send>>, - ) -> Poll<()> { + pub fn poll(&mut self, cx: &mut std::task::Context) -> Poll<()> { self.num_connected.store(self.peers.len(), Ordering::Relaxed); self.is_major_syncing .store(self.chain_sync.status().state.is_major_syncing(), Ordering::Relaxed); @@ -607,84 +604,6 @@ where self.tick_timeout.reset(TICK_TIMEOUT); } - while let Poll::Ready(Some(event)) = event_stream.poll_next_unpin(cx) { - match event { - Event::NotificationStreamOpened { - remote, protocol, received_handshake, .. - } => { - if protocol != self.block_announce_protocol_name { - continue - } - - match as DecodeAll>::decode_all( - &mut &received_handshake[..], - ) { - Ok(handshake) => { - if self.on_sync_peer_connected(remote, handshake).is_err() { - log::debug!( - target: "sync", - "Failed to register peer {remote:?}: {received_handshake:?}", - ); - } - }, - Err(err) => { - log::debug!( - target: "sync", - "Couldn't decode handshake sent by {}: {:?}: {}", - remote, - received_handshake, - err, - ); - self.network_service.report_peer(remote, rep::BAD_MESSAGE); - }, - } - }, - Event::NotificationStreamClosed { remote, protocol } => { - if protocol != self.block_announce_protocol_name { - continue - } - - if self.on_sync_peer_disconnected(remote).is_err() { - log::trace!( - target: "sync", - "Disconnected peer which had earlier been refused by on_sync_peer_connected {}", - remote - ); - } - }, - Event::NotificationsReceived { remote, messages } => { - for (protocol, message) in messages { - if protocol != self.block_announce_protocol_name { - continue - } - - if self.peers.contains_key(&remote) { - if let Ok(announce) = BlockAnnounce::decode(&mut message.as_ref()) { - self.push_block_announce_validation(remote, announce); - - // Make sure that the newly added block announce validation future - // was polled once to be registered in the task. - if let Poll::Ready(res) = - self.chain_sync.poll_block_announce_validation(cx) - { - self.process_block_announce_validation_result(res) - } - } else { - log::warn!(target: "sub-libp2p", "Failed to decode block announce"); - } - } else { - log::trace!( - target: "sync", - "Received sync for peer earlier refused by sync layer: {}", - remote - ); - } - } - }, - _ => {}, - } - } - while let Poll::Ready(Some(event)) = self.service_rx.poll_next_unpin(cx) { match event { ToServiceCommand::SetSyncForkRequest(peers, hash, number) => { @@ -758,6 +677,70 @@ where } } + while let Poll::Ready(Some(event)) = self.rx.poll_next_unpin(cx) { + match event { + sc_network::SyncEvent::NotificationStreamOpened { + remote, + received_handshake, + sink, + tx, + } => match self.on_sync_peer_connected(remote, &received_handshake, sink) { + Ok(()) => { + let _ = tx.send(true); + }, + Err(()) => { + log::debug!( + target: "sync", + "Failed to register peer {remote:?}: {received_handshake:?}", + ); + let _ = tx.send(false); + }, + }, + sc_network::SyncEvent::NotificationStreamClosed { remote } => { + if self.on_sync_peer_disconnected(remote).is_err() { + log::trace!( + target: "sync", + "Disconnected peer which had earlier been refused by on_sync_peer_connected {}", + remote + ); + } + }, + sc_network::SyncEvent::NotificationsReceived { remote, messages } => { + for message in messages { + if self.peers.contains_key(&remote) { + if let Ok(announce) = BlockAnnounce::decode(&mut message.as_ref()) { + self.push_block_announce_validation(remote, announce); + + // Make sure that the newly added block announce validation future + // was polled once to be registered in the task. + if let Poll::Ready(res) = + self.chain_sync.poll_block_announce_validation(cx) + { + self.process_block_announce_validation_result(res) + } + } else { + log::warn!(target: "sub-libp2p", "Failed to decode block announce"); + } + } else { + log::trace!( + target: "sync", + "Received sync for peer earlier refused by sync layer: {}", + remote + ); + } + } + }, + sc_network::SyncEvent::NotificationSinkReplaced { remote, sink } => { + if let Some(peer) = self.peers.get_mut(&remote) { + peer.sink = sink; + } + }, + } + } + + // poll `ChainSync` last because of a block announcement was received through the + // event stream between `SyncingEngine` and `Protocol` and the validation finished + // right after it as queued, the resulting block request (if any) can be sent right away. while let Poll::Ready(result) = self.chain_sync.poll(cx) { self.process_block_announce_validation_result(result); } @@ -769,13 +752,13 @@ where /// /// Returns a result if the handshake of this peer was indeed accepted. pub fn on_sync_peer_disconnected(&mut self, peer: PeerId) -> Result<(), ()> { - if self.important_peers.contains(&peer) { - log::warn!(target: "sync", "Reserved peer {} disconnected", peer); - } else { - log::debug!(target: "sync", "{} disconnected", peer); - } - if self.peers.remove(&peer).is_some() { + if self.important_peers.contains(&peer) { + log::warn!(target: "sync", "Reserved peer {} disconnected", peer); + } else { + log::debug!(target: "sync", "{} disconnected", peer); + } + self.chain_sync.peer_disconnected(&peer); self.default_peers_set_no_slot_connected_peers.remove(&peer); self.event_streams @@ -794,7 +777,8 @@ where pub fn on_sync_peer_connected( &mut self, who: PeerId, - status: BlockAnnouncesHandshake, + status: &BlockAnnouncesHandshake, + sink: NotificationsSink, ) -> Result<(), ()> { log::trace!(target: "sync", "New peer {} {:?}", who, status); @@ -806,8 +790,6 @@ where if status.genesis_hash != self.genesis_hash { self.network_service.report_peer(who, rep::GENESIS_MISMATCH); - self.network_service - .disconnect_peer(who, self.block_announce_protocol_name.clone()); if self.important_peers.contains(&who) { log::error!( @@ -871,8 +853,6 @@ where this_peer_reserved_slot { log::debug!(target: "sync", "Too many full nodes, rejecting {}", who); - self.network_service - .disconnect_peer(who, self.block_announce_protocol_name.clone()); return Err(()) } @@ -881,8 +861,6 @@ where { // Make sure that not all slots are occupied by light clients. log::debug!(target: "sync", "Too many light nodes, rejecting {}", who); - self.network_service - .disconnect_peer(who, self.block_announce_protocol_name.clone()); return Err(()) } @@ -895,14 +873,13 @@ where known_blocks: LruHashSet::new( NonZeroUsize::new(MAX_KNOWN_BLOCKS).expect("Constant is nonzero"), ), + sink, }; let req = if peer.info.roles.is_full() { match self.chain_sync.new_peer(who, peer.info.best_hash, peer.info.best_number) { Ok(req) => req, Err(BadPeer(id, repu)) => { - self.network_service - .disconnect_peer(id, self.block_announce_protocol_name.clone()); self.network_service.report_peer(id, repu); return Err(()) }, diff --git a/client/network/test/Cargo.toml b/client/network/test/Cargo.toml index 8368fa278712a..9763feed5ea54 100644 --- a/client/network/test/Cargo.toml +++ b/client/network/test/Cargo.toml @@ -26,6 +26,7 @@ sc-client-api = { version = "4.0.0-dev", path = "../../api" } sc-consensus = { version = "0.10.0-dev", path = "../../consensus/common" } sc-network = { version = "0.10.0-dev", path = "../" } sc-network-common = { version = "0.10.0-dev", path = "../common" } +sc-utils = { version = "4.0.0-dev", path = "../../utils" } sc-network-light = { version = "0.10.0-dev", path = "../light" } sc-network-sync = { version = "0.10.0-dev", path = "../sync" } sc-service = { version = "0.10.0-dev", default-features = false, features = ["test-helpers"], path = "../../service" } diff --git a/client/network/test/src/lib.rs b/client/network/test/src/lib.rs index 6a99304035ab8..f85d6ed63c247 100644 --- a/client/network/test/src/lib.rs +++ b/client/network/test/src/lib.rs @@ -55,8 +55,8 @@ use sc_network::{ }, request_responses::ProtocolConfig as RequestResponseConfig, types::ProtocolName, - Multiaddr, NetworkBlock, NetworkEventStream, NetworkService, NetworkStateInfo, - NetworkSyncForkRequest, NetworkWorker, + Multiaddr, NetworkBlock, NetworkService, NetworkStateInfo, NetworkSyncForkRequest, + NetworkWorker, }; use sc_network_common::{ role::Roles, @@ -896,6 +896,7 @@ where let (chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new(); + let (tx, rx) = sc_utils::mpsc::tracing_unbounded("mpsc_syncing_engine_protocol", 100_000); let (engine, sync_service, block_announce_config) = sc_network_sync::engine::SyncingEngine::new( Roles::from(if config.is_authority { &Role::Authority } else { &Role::Full }), @@ -911,6 +912,7 @@ where block_request_protocol_config.name.clone(), state_request_protocol_config.name.clone(), Some(warp_protocol_config.name.clone()), + rx, ) .unwrap(); let sync_service_import_queue = Box::new(sync_service.clone()); @@ -918,7 +920,7 @@ where let genesis_hash = client.hash(Zero::zero()).ok().flatten().expect("Genesis block exists; qed"); - let network = NetworkWorker::new::(sc_network::config::Params { + let network = NetworkWorker::new(sc_network::config::Params { role: if config.is_authority { Role::Authority } else { Role::Full }, executor: Box::new(|f| { tokio::spawn(f); @@ -929,6 +931,7 @@ where fork_id, metrics_registry: None, block_announce_config, + tx, request_response_protocol_configs: [ block_request_protocol_config, state_request_protocol_config, @@ -950,9 +953,8 @@ where import_queue.run(sync_service_import_queue).await; }); - let service = network.service().clone(); tokio::spawn(async move { - engine.run(service.event_stream("syncing")).await; + engine.run().await; }); self.mut_peers(move |peers| { diff --git a/client/network/test/src/service.rs b/client/network/test/src/service.rs index 67915c38637ed..5871860a7c4a6 100644 --- a/client/network/test/src/service.rs +++ b/client/network/test/src/service.rs @@ -177,6 +177,7 @@ impl TestNetworkBuilder { let (chain_sync_network_provider, chain_sync_network_handle) = self.chain_sync_network.unwrap_or(NetworkServiceProvider::new()); + let (tx, rx) = sc_utils::mpsc::tracing_unbounded("mpsc_syncing_engine_protocol", 100_000); let (engine, chain_sync_service, block_announce_config) = SyncingEngine::new( Roles::from(&config::Role::Full), @@ -192,6 +193,7 @@ impl TestNetworkBuilder { block_request_protocol_config.name.clone(), state_request_protocol_config.name.clone(), None, + rx, ) .unwrap(); let mut link = self.link.unwrap_or(Box::new(chain_sync_service.clone())); @@ -217,6 +219,7 @@ impl TestNetworkBuilder { light_client_request_protocol_config, ] .to_vec(), + tx, }) .unwrap(); @@ -234,8 +237,7 @@ impl TestNetworkBuilder { tokio::time::sleep(std::time::Duration::from_millis(250)).await; } }); - let stream = worker.service().event_stream("syncing"); - tokio::spawn(engine.run(stream)); + tokio::spawn(engine.run()); TestNetwork::new(worker) } diff --git a/client/network/test/src/sync.rs b/client/network/test/src/sync.rs index d87b03fb3a78c..af46d15a2bacd 100644 --- a/client/network/test/src/sync.rs +++ b/client/network/test/src/sync.rs @@ -414,7 +414,7 @@ async fn can_sync_small_non_best_forks() { // poll until the two nodes connect, otherwise announcing the block will not work futures::future::poll_fn::<(), _>(|cx| { net.poll(cx); - if net.peer(0).num_peers() == 0 { + if net.peer(0).num_peers() == 0 || net.peer(1).num_peers() == 0 { Poll::Pending } else { Poll::Ready(()) diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index dcec96595daf4..1d8cbae004eee 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -38,9 +38,7 @@ use sc_client_db::{Backend, DatabaseSettings}; use sc_consensus::import_queue::ImportQueue; use sc_executor::RuntimeVersionOf; use sc_keystore::LocalKeystore; -use sc_network::{ - config::SyncMode, NetworkEventStream, NetworkService, NetworkStateInfo, NetworkStatusProvider, -}; +use sc_network::{config::SyncMode, NetworkService, NetworkStateInfo, NetworkStatusProvider}; use sc_network_bitswap::BitswapRequestHandler; use sc_network_common::{role::Roles, sync::warp::WarpSyncParams}; use sc_network_light::light_client_requests::handler::LightClientRequestHandler; @@ -882,6 +880,7 @@ where protocol_config }; + let (tx, rx) = sc_utils::mpsc::tracing_unbounded("mpsc_syncing_engine_protocol", 100_000); let (chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new(); let (engine, sync_service, block_announce_config) = SyncingEngine::new( Roles::from(&config.role), @@ -897,6 +896,7 @@ where block_request_protocol_config.name.clone(), state_request_protocol_config.name.clone(), warp_sync_protocol_config.as_ref().map(|config| config.name.clone()), + rx, )?; let sync_service_import_queue = sync_service.clone(); let sync_service = Arc::new(sync_service); @@ -922,6 +922,7 @@ where fork_id: config.chain_spec.fork_id().map(ToOwned::to_owned), metrics_registry: config.prometheus_config.as_ref().map(|config| config.registry.clone()), block_announce_config, + tx, request_response_protocol_configs: request_response_protocol_configs .into_iter() .chain([ @@ -961,15 +962,13 @@ where )?; spawn_handle.spawn("network-transactions-handler", Some("networking"), tx_handler.run()); - spawn_handle.spawn( + spawn_handle.spawn_blocking( "chain-sync-network-service-provider", Some("networking"), chain_sync_network_provider.run(network.clone()), ); spawn_handle.spawn("import-queue", None, import_queue.run(Box::new(sync_service_import_queue))); - - let event_stream = network.event_stream("syncing"); - spawn_handle.spawn("syncing", None, engine.run(event_stream)); + spawn_handle.spawn_blocking("syncing", None, engine.run()); let (system_rpc_tx, system_rpc_rx) = tracing_unbounded("mpsc_system_rpc", 10_000); spawn_handle.spawn( From 3bd809d59c31c7205a0f36c6ddcba603e43051fc Mon Sep 17 00:00:00 2001 From: PG Herveou Date: Fri, 21 Apr 2023 17:24:12 +0200 Subject: [PATCH 4/4] Update macro use and optimize storage collection (#13970) --- frame/contracts/src/lib.rs | 3 +-- frame/contracts/src/storage/meter.rs | 2 +- frame/contracts/src/tests.rs | 26 +++++++++++++------------- 3 files changed, 15 insertions(+), 16 deletions(-) diff --git a/frame/contracts/src/lib.rs b/frame/contracts/src/lib.rs index 4f5d7ba305fc1..aef3cd1bd5615 100644 --- a/frame/contracts/src/lib.rs +++ b/frame/contracts/src/lib.rs @@ -83,11 +83,10 @@ #![cfg_attr(not(feature = "std"), no_std)] #![cfg_attr(feature = "runtime-benchmarks", recursion_limit = "1024")] -#[macro_use] -mod gas; mod address; mod benchmarking; mod exec; +mod gas; mod migration; mod schedule; mod storage; diff --git a/frame/contracts/src/storage/meter.rs b/frame/contracts/src/storage/meter.rs index ef6fb3277c11b..9842c116883f9 100644 --- a/frame/contracts/src/storage/meter.rs +++ b/frame/contracts/src/storage/meter.rs @@ -290,8 +290,8 @@ where .total_deposit .saturating_add(&absorbed.total_deposit) .saturating_add(&own_deposit); + self.charges.extend_from_slice(&absorbed.charges); if !own_deposit.is_zero() { - self.charges.extend_from_slice(&absorbed.charges); self.charges.push(Charge { deposit_account, amount: own_deposit, diff --git a/frame/contracts/src/tests.rs b/frame/contracts/src/tests.rs index d74b08243df4b..655bcca9fcd17 100644 --- a/frame/contracts/src/tests.rs +++ b/frame/contracts/src/tests.rs @@ -72,7 +72,19 @@ frame_support::construct_runtime!( } ); -#[macro_use] +macro_rules! assert_return_code { + ( $x:expr , $y:expr $(,)? ) => {{ + assert_eq!(u32::from_le_bytes($x.data[..].try_into().unwrap()), $y as u32); + }}; +} + +macro_rules! assert_refcount { + ( $code_hash:expr , $should:expr $(,)? ) => {{ + let is = crate::OwnerInfoOf::::get($code_hash).map(|m| m.refcount()).unwrap(); + assert_eq!(is, $should); + }}; +} + pub mod test_utils { use super::{Balances, Hash, SysConfig, Test}; use crate::{exec::AccountIdOf, CodeHash, Config, ContractInfo, ContractInfoOf, Nonce}; @@ -104,18 +116,6 @@ pub mod test_utils { pub fn hash(s: &S) -> <::Hashing as Hash>::Output { <::Hashing as Hash>::hash_of(s) } - macro_rules! assert_return_code { - ( $x:expr , $y:expr $(,)? ) => {{ - assert_eq!(u32::from_le_bytes($x.data[..].try_into().unwrap()), $y as u32); - }}; - } - - macro_rules! assert_refcount { - ( $code_hash:expr , $should:expr $(,)? ) => {{ - let is = crate::OwnerInfoOf::::get($code_hash).map(|m| m.refcount()).unwrap(); - assert_eq!(is, $should); - }}; - } } impl Test {