diff --git a/src/init.cpp b/src/init.cpp index adc1dacc757f..aa2d73806888 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -321,7 +321,9 @@ void Shutdown(NodeContext& node) StopTorControl(); + if (node.chainman) node.chainman->InterruptConnectorThread(); if (node.background_init_thread.joinable()) node.background_init_thread.join(); + if (node.chainman) node.chainman->JoinConnectorThread(); // After everything has been shut down, but before things get flushed, stop the // the scheduler. After this point, SyncWithValidationInterfaceQueue() should not be called anymore // as this would prevent the shutdown from completing. @@ -2017,6 +2019,8 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) vImportFiles.push_back(fs::PathFromString(strFile)); } + chainman.StartConnectorThread(); + node.background_init_thread = std::thread(&util::TraceThread, "initload", [=, &chainman, &args, &node] { ScheduleBatchPriority(); // Import blocks and ActivateBestChain() diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 80622d06efba..f84a21030dee 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -128,6 +128,8 @@ static const unsigned int MAX_INV_SZ = 50000; static const unsigned int MAX_GETDATA_SZ = 1000; /** Number of blocks that can be requested at any given time from a single peer. */ static const int MAX_BLOCKS_IN_TRANSIT_PER_PEER = 16; +/** Number of blocks that can be requested at any given time from a single peer during IBD. */ +static const int MAX_BLOCKS_IN_TRANSIT_PER_PEER_IBD = 64; /** Default time during which a peer must stall block download progress before being disconnected. * the actual timeout is increased temporarily if peers are disconnected for hitting the timeout */ static constexpr auto BLOCK_STALLING_TIMEOUT_DEFAULT{2s}; @@ -144,6 +146,8 @@ static_assert(MAX_BLOCKTXN_DEPTH <= MIN_BLOCKS_TO_KEEP, "MAX_BLOCKTXN_DEPTH too * degree of disordering of blocks on disk (which make reindexing and pruning harder). We'll probably * want to make this a per-peer adaptive value at some point. */ static const unsigned int BLOCK_DOWNLOAD_WINDOW = 1024; +/** Block download window during IBD, when the connector thread processes blocks asynchronously. */ +static const unsigned int BLOCK_DOWNLOAD_WINDOW_IBD = 8192; /** Block download timeout base, expressed in multiples of the block interval (i.e. 10 min) */ static constexpr double BLOCK_DOWNLOAD_TIMEOUT_BASE = 1; /** Additional block download timeout per parallel downloading peer (i.e. 5 min) */ @@ -1437,9 +1441,23 @@ void PeerManagerImpl::FindNextBlocksToDownload(const Peer& peer, unsigned int co // Never fetch further than the best block we know the peer has, or more than BLOCK_DOWNLOAD_WINDOW + 1 beyond the last // linked block we have in common with this peer. The +1 is so we can detect stalling, namely if we would be able to // download that next block if the window were 1 larger. - int nWindowEnd = state->pindexLastCommonBlock->nHeight + BLOCK_DOWNLOAD_WINDOW; + const unsigned int download_window = m_chainman.IsInitialBlockDownload() ? BLOCK_DOWNLOAD_WINDOW_IBD : BLOCK_DOWNLOAD_WINDOW; + int nWindowEnd = state->pindexLastCommonBlock->nHeight + download_window; FindNextBlocks(vBlocks, peer, state, pindexWalk, count, nWindowEnd, &m_chainman.ActiveChain(), &nodeStaller); + + // pindexLastCommonBlock may have advanced during the walk (blocks stored + // but not yet connected to the active chain). Recompute the window and + // retry to avoid false stall detection and to request newly-visible blocks. + if (state->pindexLastCommonBlock->nHeight > pindexWalk->nHeight) { + int nNewWindowEnd = state->pindexLastCommonBlock->nHeight + download_window; + if (nNewWindowEnd > nWindowEnd) { + nodeStaller = -1; + FindNextBlocks(vBlocks, peer, state, state->pindexLastCommonBlock, + count - vBlocks.size(), nNewWindowEnd, + &m_chainman.ActiveChain(), &nodeStaller); + } + } } void PeerManagerImpl::TryDownloadingHistoricalBlocks(const Peer& peer, unsigned int count, std::vector& vBlocks, const CBlockIndex *from_tip, const CBlockIndex* target_block) @@ -1468,7 +1486,8 @@ void PeerManagerImpl::TryDownloadingHistoricalBlocks(const Peer& peer, unsigned return; } - FindNextBlocks(vBlocks, peer, state, from_tip, count, std::min(from_tip->nHeight + BLOCK_DOWNLOAD_WINDOW, target_block->nHeight)); + const unsigned int download_window = m_chainman.IsInitialBlockDownload() ? BLOCK_DOWNLOAD_WINDOW_IBD : BLOCK_DOWNLOAD_WINDOW; + FindNextBlocks(vBlocks, peer, state, from_tip, count, std::min(from_tip->nHeight + download_window, target_block->nHeight)); } void PeerManagerImpl::FindNextBlocks(std::vector& vBlocks, const Peer& peer, CNodeState *state, const CBlockIndex *pindexWalk, unsigned int count, int nWindowEnd, const CChain* activeChain, NodeId* nodeStaller) @@ -3427,7 +3446,8 @@ void PeerManagerImpl::ProcessGetCFCheckPt(CNode& node, Peer& peer, DataStream& v void PeerManagerImpl::ProcessBlock(CNode& node, const std::shared_ptr& block, bool force_processing, bool min_pow_checked) { bool new_block{false}; - m_chainman.ProcessNewBlock(block, force_processing, min_pow_checked, &new_block); + const bool activate_chain = !m_chainman.IsInitialBlockDownload(); + m_chainman.ProcessNewBlock(block, force_processing, min_pow_checked, &new_block, activate_chain); if (new_block) { node.m_last_block_time = GetTime(); // In case this block came from a different peer than we requested @@ -5151,7 +5171,7 @@ void PeerManagerImpl::ProcessMessage(Peer& peer, CNode& pfrom, const std::string std::vector vInv; vRecv >> vInv; std::vector tx_invs; - if (vInv.size() <= node::MAX_PEER_TX_ANNOUNCEMENTS + MAX_BLOCKS_IN_TRANSIT_PER_PEER) { + if (vInv.size() <= node::MAX_PEER_TX_ANNOUNCEMENTS + MAX_BLOCKS_IN_TRANSIT_PER_PEER_IBD) { for (CInv &inv : vInv) { if (inv.IsGenTxMsg()) { tx_invs.emplace_back(ToGenTxid(inv)); @@ -6162,11 +6182,12 @@ bool PeerManagerImpl::SendMessages(CNode& node) // Message: getdata (blocks) // std::vector vGetData; - if (CanServeBlocks(peer) && ((sync_blocks_and_headers_from_peer && !IsLimitedPeer(peer)) || !m_chainman.IsInitialBlockDownload()) && state.vBlocksInFlight.size() < MAX_BLOCKS_IN_TRANSIT_PER_PEER) { + const int max_blocks_in_transit = m_chainman.IsInitialBlockDownload() ? MAX_BLOCKS_IN_TRANSIT_PER_PEER_IBD : MAX_BLOCKS_IN_TRANSIT_PER_PEER; + if (CanServeBlocks(peer) && ((sync_blocks_and_headers_from_peer && !IsLimitedPeer(peer)) || !m_chainman.IsInitialBlockDownload()) && static_cast(state.vBlocksInFlight.size()) < max_blocks_in_transit) { std::vector vToDownload; NodeId staller = -1; - auto get_inflight_budget = [&state]() { - return std::max(0, MAX_BLOCKS_IN_TRANSIT_PER_PEER - static_cast(state.vBlocksInFlight.size())); + auto get_inflight_budget = [&state, max_blocks_in_transit]() { + return std::max(0, max_blocks_in_transit - static_cast(state.vBlocksInFlight.size())); }; // If there are multiple chainstates, download blocks for the diff --git a/src/primitives/block.h b/src/primitives/block.h index 8ca4fb4800ee..dd0529e4ad51 100644 --- a/src/primitives/block.h +++ b/src/primitives/block.h @@ -11,6 +11,7 @@ #include #include +#include #include #include #include @@ -76,10 +77,10 @@ class CBlock : public CBlockHeader // network and disk std::vector vtx; - // Memory-only flags for caching expensive checks - mutable bool fChecked; // CheckBlock() - mutable bool m_checked_witness_commitment{false}; // CheckWitnessCommitment() - mutable bool m_checked_merkle_root{false}; // CheckMerkleRoot() + // Memory-only flags for caching expensive checks (atomic for thread safety) + mutable std::atomic fChecked{false}; // CheckBlock() + mutable std::atomic m_checked_witness_commitment{false}; // CheckWitnessCommitment() + mutable std::atomic m_checked_merkle_root{false}; // CheckMerkleRoot() CBlock() { @@ -92,6 +93,40 @@ class CBlock : public CBlockHeader *(static_cast(this)) = header; } + CBlock(const CBlock& other) : CBlockHeader(other), vtx(other.vtx), + fChecked(other.fChecked.load(std::memory_order_relaxed)), + m_checked_witness_commitment(other.m_checked_witness_commitment.load(std::memory_order_relaxed)), + m_checked_merkle_root(other.m_checked_merkle_root.load(std::memory_order_relaxed)) {} + + CBlock(CBlock&& other) noexcept : CBlockHeader(std::move(other)), vtx(std::move(other.vtx)), + fChecked(other.fChecked.load(std::memory_order_relaxed)), + m_checked_witness_commitment(other.m_checked_witness_commitment.load(std::memory_order_relaxed)), + m_checked_merkle_root(other.m_checked_merkle_root.load(std::memory_order_relaxed)) {} + + CBlock& operator=(const CBlock& other) + { + if (this != &other) { + CBlockHeader::operator=(other); + vtx = other.vtx; + fChecked.store(other.fChecked.load(std::memory_order_relaxed), std::memory_order_relaxed); + m_checked_witness_commitment.store(other.m_checked_witness_commitment.load(std::memory_order_relaxed), std::memory_order_relaxed); + m_checked_merkle_root.store(other.m_checked_merkle_root.load(std::memory_order_relaxed), std::memory_order_relaxed); + } + return *this; + } + + CBlock& operator=(CBlock&& other) noexcept + { + if (this != &other) { + CBlockHeader::operator=(std::move(other)); + vtx = std::move(other.vtx); + fChecked.store(other.fChecked.load(std::memory_order_relaxed), std::memory_order_relaxed); + m_checked_witness_commitment.store(other.m_checked_witness_commitment.load(std::memory_order_relaxed), std::memory_order_relaxed); + m_checked_merkle_root.store(other.m_checked_merkle_root.load(std::memory_order_relaxed), std::memory_order_relaxed); + } + return *this; + } + SERIALIZE_METHODS(CBlock, obj) { READWRITE(AsBase(obj), obj.vtx); @@ -101,9 +136,9 @@ class CBlock : public CBlockHeader { CBlockHeader::SetNull(); vtx.clear(); - fChecked = false; - m_checked_witness_commitment = false; - m_checked_merkle_root = false; + fChecked.store(false, std::memory_order_relaxed); + m_checked_witness_commitment.store(false, std::memory_order_relaxed); + m_checked_merkle_root.store(false, std::memory_order_relaxed); } std::string ToString() const; diff --git a/src/validation.cpp b/src/validation.cpp index 8c4ae2e9b48b..ab0cb6eb814c 100644 --- a/src/validation.cpp +++ b/src/validation.cpp @@ -59,6 +59,7 @@ #include #include #include +#include #include #include #include @@ -3046,6 +3047,14 @@ bool Chainstate::ConnectTip( assert(pindexNew->pprev == m_chain.Tip()); // Read block from disk. const auto time_1{SteadyClock::now()}; + if (!block_to_connect) { + LOCK(m_chainman.m_block_cache_mutex); + auto it = m_chainman.m_block_cache.find(pindexNew->GetBlockHash()); + if (it != m_chainman.m_block_cache.end()) { + block_to_connect = std::move(it->second); + m_chainman.m_block_cache.erase(it); + } + } if (!block_to_connect) { std::shared_ptr pblockNew = std::make_shared(); if (!m_blockman.ReadBlock(*pblockNew, *pindexNew)) { @@ -3868,7 +3877,7 @@ static bool CheckBlockHeader(const CBlockHeader& block, BlockValidationState& st static bool CheckMerkleRoot(const CBlock& block, BlockValidationState& state) { - if (block.m_checked_merkle_root) return true; + if (block.m_checked_merkle_root.load(std::memory_order_relaxed)) return true; bool mutated; uint256 merkle_root = BlockMerkleRoot(block, &mutated); @@ -3889,7 +3898,7 @@ static bool CheckMerkleRoot(const CBlock& block, BlockValidationState& state) /*debug_message=*/"duplicate transaction"); } - block.m_checked_merkle_root = true; + block.m_checked_merkle_root.store(true, std::memory_order_relaxed); return true; } @@ -3902,7 +3911,7 @@ static bool CheckMerkleRoot(const CBlock& block, BlockValidationState& state) static bool CheckWitnessMalleation(const CBlock& block, bool expect_witness_commitment, BlockValidationState& state) { if (expect_witness_commitment) { - if (block.m_checked_witness_commitment) return true; + if (block.m_checked_witness_commitment.load(std::memory_order_relaxed)) return true; int commitpos = GetWitnessCommitmentIndex(block); if (commitpos != NO_WITNESS_COMMITMENT) { @@ -3929,7 +3938,7 @@ static bool CheckWitnessMalleation(const CBlock& block, bool expect_witness_comm /*debug_message=*/strprintf("%s : witness merkle commitment mismatch", __func__)); } - block.m_checked_witness_commitment = true; + block.m_checked_witness_commitment.store(true, std::memory_order_relaxed); return true; } } @@ -3951,7 +3960,7 @@ bool CheckBlock(const CBlock& block, BlockValidationState& state, const Consensu { // These are checks that are independent of context. - if (block.fChecked) + if (block.fChecked.load(std::memory_order_relaxed)) return true; // Check that the header is valid (particularly PoW). This is mostly @@ -4009,7 +4018,7 @@ bool CheckBlock(const CBlock& block, BlockValidationState& state, const Consensu return state.Invalid(BlockValidationResult::BLOCK_CONSENSUS, "bad-blk-sigops", "out-of-bounds SigOpCount"); if (fCheckPOW && fCheckMerkleRoot) - block.fChecked = true; + block.fChecked.store(true, std::memory_order_relaxed); return true; } @@ -4427,7 +4436,7 @@ bool ChainstateManager::AcceptBlock(const std::shared_ptr& pblock, return true; } -bool ChainstateManager::ProcessNewBlock(const std::shared_ptr& block, bool force_processing, bool min_pow_checked, bool* new_block) +bool ChainstateManager::ProcessNewBlock(const std::shared_ptr& block, bool force_processing, bool min_pow_checked, bool* new_block, bool activate_chain) EXCLUSIVE_LOCKS_REQUIRED(!m_connector_mutex) { AssertLockNotHeld(cs_main); @@ -4436,43 +4445,71 @@ bool ChainstateManager::ProcessNewBlock(const std::shared_ptr& blo if (new_block) *new_block = false; BlockValidationState state; - // CheckBlock() does not support multi-threaded block validation because CBlock::fChecked can cause data race. - // Therefore, the following critical section must include the CheckBlock() call as well. - LOCK(cs_main); - + // Context-free validation (no lock needed — cache flags are atomic). // Skipping AcceptBlock() for CheckBlock() failures means that we will never mark a block as invalid if // CheckBlock() fails. This is protective against consensus failure if there are any unknown forms of block // malleability that cause CheckBlock() to fail; see e.g. CVE-2012-2459 and // https://lists.linuxfoundation.org/pipermail/bitcoin-dev/2019-February/016697.html. Because CheckBlock() is // not very expensive, the anti-DoS benefits of caching failure (of a definitely-invalid block) are not substantial. - bool ret = CheckBlock(*block, state, GetConsensus()); - if (ret) { - // Store to disk - ret = AcceptBlock(block, state, &pindex, force_processing, nullptr, new_block, min_pow_checked); - } - if (!ret) { + if (!CheckBlock(*block, state, GetConsensus())) { if (m_options.signals) { m_options.signals->BlockChecked(block, state); } - LogError("%s: AcceptBlock FAILED (%s)\n", __func__, state.ToString()); + LogError("%s: CheckBlock FAILED (%s)\n", __func__, state.ToString()); return false; } + + // During IBD, write the block to disk before acquiring cs_main so + // the lock is only held briefly for the index update, not the I/O. + const FlatFilePos* dbp_ptr{nullptr}; + FlatFilePos block_pos{}; + if (!activate_chain) { + const CBlockIndex* parent{WITH_LOCK(::cs_main, return m_blockman.LookupBlockIndex(block->hashPrevBlock))}; + if (parent) { + block_pos = m_blockman.WriteBlock(*block, parent->nHeight + 1); + if (!block_pos.IsNull()) { + dbp_ptr = &block_pos; + } + } + } + + { + LOCK(cs_main); + bool ret = AcceptBlock(block, state, &pindex, force_processing, dbp_ptr, new_block, min_pow_checked); + if (!ret) { + if (m_options.signals) { + m_options.signals->BlockChecked(block, state); + } + LogError("%s: AcceptBlock FAILED (%s)\n", __func__, state.ToString()); + return false; + } + } } NotifyHeaderTip(); - BlockValidationState state; // Only used to report errors, not invalidity - ignore it - if (!ActiveChainstate().ActivateBestChain(state, block)) { - LogError("%s: ActivateBestChain failed (%s)\n", __func__, state.ToString()); - return false; - } + if (!activate_chain) { + { + LOCK(m_block_cache_mutex); + if (m_block_cache.size() < MAX_BLOCK_CACHE_SIZE) { + m_block_cache.emplace(block->GetHash(), block); + } + } + WakeConnector(); + } else { + BlockValidationState state; + if (!ActiveChainstate().ActivateBestChain(state, block)) { + LogError("%s: ActivateBestChain failed (%s)\n", __func__, state.ToString()); + return false; + } - Chainstate* bg_chain{WITH_LOCK(cs_main, return HistoricalChainstate())}; - BlockValidationState bg_state; - if (bg_chain && !bg_chain->ActivateBestChain(bg_state, block)) { - LogError("%s: [background] ActivateBestChain failed (%s)\n", __func__, bg_state.ToString()); - return false; - } + Chainstate* bg_chain{WITH_LOCK(cs_main, return HistoricalChainstate())}; + BlockValidationState bg_state; + if (bg_chain && !bg_chain->ActivateBestChain(bg_state, block)) { + LogError("%s: [background] ActivateBestChain failed (%s)\n", __func__, bg_state.ToString()); + return false; + } + } return true; } @@ -4498,9 +4535,8 @@ BlockValidationState TestBlockValidity( const bool check_pow, const bool check_merkle_root) { - // Lock must be held throughout this function for two reasons: - // 1. We don't want the tip to change during several of the validation steps - // 2. To prevent a CheckBlock() race condition for fChecked, see ProcessNewBlock() + // Lock must be held throughout this function because we don't want the + // tip to change during several of the validation steps. AssertLockHeld(chainstate.m_chainman.GetMutex()); BlockValidationState state; @@ -6161,11 +6197,64 @@ ChainstateManager::ChainstateManager(const util::SignalInterrupt& interrupt, Opt ChainstateManager::~ChainstateManager() { + InterruptConnectorThread(); + JoinConnectorThread(); + LOCK(::cs_main); m_versionbitscache.Clear(); } +void ChainstateManager::ConnectorThreadFunc() EXCLUSIVE_LOCKS_REQUIRED(!m_connector_mutex) +{ + while (true) { + { + WAIT_LOCK(m_connector_mutex, lock); + m_connector_cv.wait(lock, [&]() EXCLUSIVE_LOCKS_REQUIRED(m_connector_mutex) { + return m_connector_wake || m_connector_shutdown; + }); + if (m_connector_shutdown) return; + m_connector_wake = false; + } + + BlockValidationState state; + ActiveChainstate().ActivateBestChain(state); + + if (auto* bg = WITH_LOCK(::cs_main, return HistoricalChainstate())) { + BlockValidationState bg_state; + bg->ActivateBestChain(bg_state); + } + } +} + +void ChainstateManager::WakeConnector() EXCLUSIVE_LOCKS_REQUIRED(!m_connector_mutex) +{ + { + LOCK(m_connector_mutex); + m_connector_wake = true; + } + m_connector_cv.notify_one(); +} + +void ChainstateManager::StartConnectorThread() +{ + m_connector_thread = std::thread(&util::TraceThread, "blkconnect", [this] { ConnectorThreadFunc(); }); +} + +void ChainstateManager::InterruptConnectorThread() EXCLUSIVE_LOCKS_REQUIRED(!m_connector_mutex) +{ + { + LOCK(m_connector_mutex); + m_connector_shutdown = true; + } + m_connector_cv.notify_one(); +} + +void ChainstateManager::JoinConnectorThread() +{ + if (m_connector_thread.joinable()) m_connector_thread.join(); +} + Chainstate* ChainstateManager::LoadAssumeutxoChainstate() { assert(!CurrentChainstate().m_from_snapshot_blockhash); diff --git a/src/validation.h b/src/validation.h index 482772c0d6ff..5dbdf7d6ce98 100644 --- a/src/validation.h +++ b/src/validation.h @@ -40,6 +40,7 @@ #include #include +#include #include #include #include @@ -47,7 +48,9 @@ #include #include #include +#include #include +#include #include #include @@ -973,6 +976,23 @@ class ChainstateManager //! A queue for script verifications that have to be performed by worker threads. CCheckQueue m_script_check_queue; + //! Background thread that calls ActivateBestChain() during IBD, so + //! the message handler thread can continue processing new blocks. + std::thread m_connector_thread; + Mutex m_connector_mutex; + std::condition_variable m_connector_cv; + bool m_connector_wake GUARDED_BY(m_connector_mutex){false}; + bool m_connector_shutdown GUARDED_BY(m_connector_mutex){false}; + + void ConnectorThreadFunc() EXCLUSIVE_LOCKS_REQUIRED(!m_connector_mutex); + void WakeConnector() EXCLUSIVE_LOCKS_REQUIRED(!m_connector_mutex); + + //! Cache of in-memory blocks for the connector thread, so ConnectTip + //! can skip re-reading them from disk during IBD. + static constexpr size_t MAX_BLOCK_CACHE_SIZE{128}; + Mutex m_block_cache_mutex; + std::unordered_map, BlockHasher> m_block_cache GUARDED_BY(m_block_cache_mutex); + //! Timers and counters used for benchmarking validation in both background //! and active chainstates. SteadyClock::duration GUARDED_BY(::cs_main) time_check{}; @@ -1246,9 +1266,12 @@ class ChainstateManager * block header is already present in block * index then this parameter has no effect) * @param[out] new_block A boolean which is set to indicate if the block was first received via this call + * @param[in] activate_chain If true (default), call ActivateBestChain before returning. + * When false, the caller is responsible for ensuring ActivateBestChain + * is called (e.g. via the connector thread during IBD). * @returns If the block was processed, independently of block validity */ - bool ProcessNewBlock(const std::shared_ptr& block, bool force_processing, bool min_pow_checked, bool* new_block) LOCKS_EXCLUDED(cs_main); + bool ProcessNewBlock(const std::shared_ptr& block, bool force_processing, bool min_pow_checked, bool* new_block, bool activate_chain = true) LOCKS_EXCLUDED(cs_main) EXCLUSIVE_LOCKS_REQUIRED(!m_connector_mutex, !m_block_cache_mutex); /** * Process incoming block headers. @@ -1361,6 +1384,10 @@ class ChainstateManager CCheckQueue& GetCheckQueue() { return m_script_check_queue; } + void StartConnectorThread(); + void InterruptConnectorThread() EXCLUSIVE_LOCKS_REQUIRED(!m_connector_mutex); + void JoinConnectorThread(); + ~ChainstateManager(); //! List of chainstates. Note: in general, it is not safe to delete diff --git a/test/functional/p2p_ibd_stalling.py b/test/functional/p2p_ibd_stalling.py index 5d80ba90fee6..f21a68f5c0b3 100755 --- a/test/functional/p2p_ibd_stalling.py +++ b/test/functional/p2p_ibd_stalling.py @@ -49,8 +49,18 @@ def set_test_params(self): self.setup_clean_chain = True self.num_nodes = 1 + def send_headers_batched(self, peer, blocks, ping=False): + """Send block headers in batches of MAX_HEADERS to respect P2P protocol limit.""" + MAX_HEADERS = 2000 + for i in range(0, len(blocks), MAX_HEADERS): + headers_message = msg_headers() + headers_message.headers = [CBlockHeader(b) for b in blocks[i:i+MAX_HEADERS]] + peer.send_without_ping(headers_message) + if ping: + peer.sync_with_ping() + def run_test(self): - NUM_BLOCKS = 1025 + NUM_BLOCKS = 8193 NUM_PEERS = 5 node = self.nodes[0] tip = int(node.getbestblockhash(), 16) @@ -70,20 +80,18 @@ def run_test(self): second_stall_index = 500 stall_blocks = [blocks[stall_index].hash_int, blocks[second_stall_index].hash_int] - headers_message = msg_headers() - headers_message.headers = [CBlockHeader(b) for b in blocks[:NUM_BLOCKS-1]] peers = [] - self.log.info("Check that a staller does not get disconnected if the 1024 block lookahead buffer is filled") + self.log.info("Check that a staller does not get disconnected if the 8192 block lookahead buffer is filled") self.mocktime = int(time.time()) + 1 node.setmocktime(self.mocktime) for id in range(NUM_PEERS): peers.append(node.add_outbound_p2p_connection(P2PStaller(stall_blocks), p2p_idx=id, connection_type="outbound-full-relay")) peers[-1].block_store = block_dict - peers[-1].send_and_ping(headers_message) + self.send_headers_batched(peers[-1], blocks[:NUM_BLOCKS-1], ping=True) # Wait until all blocks are received (except for the stall blocks), so that no other blocks are in flight. - self.wait_until(lambda: sum(len(peer['inflight']) for peer in node.getpeerinfo()) == len(stall_blocks)) + self.wait_until(lambda: sum(len(peer['inflight']) for peer in node.getpeerinfo()) == len(stall_blocks), timeout=120) self.all_sync_send_with_ping(peers) # If there was a peer marked for stalling, it would get disconnected @@ -92,11 +100,10 @@ def run_test(self): self.all_sync_send_with_ping(peers) assert_equal(node.num_test_p2p_connections(), NUM_PEERS) - self.log.info("Check that increasing the window beyond 1024 blocks triggers stalling logic") - headers_message.headers = [CBlockHeader(b) for b in blocks] + self.log.info("Check that increasing the window beyond 8192 blocks triggers stalling logic") with node.assert_debug_log(expected_msgs=['Stall started']): for p in peers: - p.send_without_ping(headers_message) + self.send_headers_batched(p, blocks) self.all_sync_send_with_ping(peers) self.log.info("Check that the stalling peer is disconnected after 2 seconds")