Merge bitcoin/bitcoin#27752: [25.x] Parallel compact block downloads

b8ad3220a9 Unconditionally return when compact block status == READ_STATUS_FAILED (Greg Sanders)
cdd3de08e3 Add tests for parallel compact block downloads (Greg Sanders)
e66a5cbb56 Support up to 3 parallel compact block txn fetchings (Greg Sanders)
d1a93f5d41 Only request full blocks from the peer we thought had the block in-flight (Greg Sanders)
38e3af9fad Convert mapBlocksInFlight to a multimap (Greg Sanders)
a45159b8e2 Remove nBlocksInFlight (Greg Sanders)
722361e129 alias BlockDownloadMap for mapBlocksInFlight (Greg Sanders)

Pull request description:

  Backports:
  * https://github.com/bitcoin/bitcoin/pull/27626
  * https://github.com/bitcoin/bitcoin/pull/27743

ACKs for top commit:
  instagibbs:
    utACK b8ad3220a9
  ajtowns:
    ACK b8ad3220a9 ; confirmed patches are clean cherry-picks from master, and already tested patches prior to 25.0 release

Tree-SHA512: 438901496a5ed927662e62f936e3d1e7ffb727cb235869854983e8e29a68e144eb3bff307d9fc3ae785fb276b67a216b1cce397689252ca49c5d761efc1380ac
This commit is contained in:
fanquake 2023-07-04 11:39:19 +01:00
commit 8825983716
No known key found for this signature in database
GPG key ID: 2EEB9F5CC09526C1
5 changed files with 244 additions and 78 deletions

View file

@ -200,7 +200,9 @@ public:
int nVersion;
std::string cleanSubVer;
bool fInbound;
// We requested high bandwidth connection to peer
bool m_bip152_highbandwidth_to;
// Peer requested high bandwidth connection
bool m_bip152_highbandwidth_from;
int m_starting_height;
uint64_t nSendBytes;

View file

@ -433,7 +433,6 @@ struct CNodeState {
std::list<QueuedBlock> vBlocksInFlight;
//! When the first entry in vBlocksInFlight started downloading. Don't care when vBlocksInFlight is empty.
std::chrono::microseconds m_downloading_since{0us};
int nBlocksInFlight{0};
//! Whether we consider this a preferred download peer.
bool fPreferredDownload{false};
/** Whether this peer wants invs or cmpctblocks (when possible) for block announcements. */
@ -879,6 +878,9 @@ private:
/** Have we requested this block from a peer */
bool IsBlockRequested(const uint256& hash) EXCLUSIVE_LOCKS_REQUIRED(cs_main);
/** Have we requested this block from an outbound peer */
bool IsBlockRequestedFromOutbound(const uint256& hash) EXCLUSIVE_LOCKS_REQUIRED(cs_main);
/** Remove this block from our tracked requested blocks. Called if:
* - the block has been received from a peer
* - the request for the block has timed out
@ -901,7 +903,9 @@ private:
*/
void FindNextBlocksToDownload(const Peer& peer, unsigned int count, std::vector<const CBlockIndex*>& vBlocks, NodeId& nodeStaller) EXCLUSIVE_LOCKS_REQUIRED(cs_main);
std::map<uint256, std::pair<NodeId, std::list<QueuedBlock>::iterator> > mapBlocksInFlight GUARDED_BY(cs_main);
/* Multimap used to preserve insertion order */
typedef std::multimap<uint256, std::pair<NodeId, std::list<QueuedBlock>::iterator>> BlockDownloadMap;
BlockDownloadMap mapBlocksInFlight GUARDED_BY(cs_main);
/** When our tip was last updated. */
std::atomic<std::chrono::seconds> m_last_tip_update{0s};
@ -1119,40 +1123,55 @@ std::chrono::microseconds PeerManagerImpl::NextInvToInbounds(std::chrono::micros
bool PeerManagerImpl::IsBlockRequested(const uint256& hash)
{
return mapBlocksInFlight.find(hash) != mapBlocksInFlight.end();
return mapBlocksInFlight.count(hash);
}
bool PeerManagerImpl::IsBlockRequestedFromOutbound(const uint256& hash)
{
for (auto range = mapBlocksInFlight.equal_range(hash); range.first != range.second; range.first++) {
auto [nodeid, block_it] = range.first->second;
CNodeState& nodestate = *Assert(State(nodeid));
if (!nodestate.m_is_inbound) return true;
}
return false;
}
void PeerManagerImpl::RemoveBlockRequest(const uint256& hash, std::optional<NodeId> from_peer)
{
auto it = mapBlocksInFlight.find(hash);
if (it == mapBlocksInFlight.end()) {
// Block was not requested
auto range = mapBlocksInFlight.equal_range(hash);
if (range.first == range.second) {
// Block was not requested from any peer
return;
}
auto [node_id, list_it] = it->second;
// We should not have requested too many of this block
Assume(mapBlocksInFlight.count(hash) <= MAX_CMPCTBLOCKS_INFLIGHT_PER_BLOCK);
if (from_peer && node_id != *from_peer) {
// Block was requested by another peer
return;
while (range.first != range.second) {
auto [node_id, list_it] = range.first->second;
if (from_peer && *from_peer != node_id) {
range.first++;
continue;
}
CNodeState& state = *Assert(State(node_id));
if (state.vBlocksInFlight.begin() == list_it) {
// First block on the queue was received, update the start download time for the next one
state.m_downloading_since = std::max(state.m_downloading_since, GetTime<std::chrono::microseconds>());
}
state.vBlocksInFlight.erase(list_it);
if (state.vBlocksInFlight.empty()) {
// Last validated block on the queue for this peer was received.
m_peers_downloading_from--;
}
state.m_stalling_since = 0us;
range.first = mapBlocksInFlight.erase(range.first);
}
CNodeState *state = State(node_id);
assert(state != nullptr);
if (state->vBlocksInFlight.begin() == list_it) {
// First block on the queue was received, update the start download time for the next one
state->m_downloading_since = std::max(state->m_downloading_since, GetTime<std::chrono::microseconds>());
}
state->vBlocksInFlight.erase(list_it);
state->nBlocksInFlight--;
if (state->nBlocksInFlight == 0) {
// Last validated block on the queue was received.
m_peers_downloading_from--;
}
state->m_stalling_since = 0us;
mapBlocksInFlight.erase(it);
}
bool PeerManagerImpl::BlockRequested(NodeId nodeid, const CBlockIndex& block, std::list<QueuedBlock>::iterator** pit)
@ -1162,27 +1181,29 @@ bool PeerManagerImpl::BlockRequested(NodeId nodeid, const CBlockIndex& block, st
CNodeState *state = State(nodeid);
assert(state != nullptr);
Assume(mapBlocksInFlight.count(hash) <= MAX_CMPCTBLOCKS_INFLIGHT_PER_BLOCK);
// Short-circuit most stuff in case it is from the same node
std::map<uint256, std::pair<NodeId, std::list<QueuedBlock>::iterator> >::iterator itInFlight = mapBlocksInFlight.find(hash);
if (itInFlight != mapBlocksInFlight.end() && itInFlight->second.first == nodeid) {
if (pit) {
*pit = &itInFlight->second.second;
for (auto range = mapBlocksInFlight.equal_range(hash); range.first != range.second; range.first++) {
if (range.first->second.first == nodeid) {
if (pit) {
*pit = &range.first->second.second;
}
return false;
}
return false;
}
// Make sure it's not listed somewhere already.
RemoveBlockRequest(hash, std::nullopt);
// Make sure it's not being fetched already from same peer.
RemoveBlockRequest(hash, nodeid);
std::list<QueuedBlock>::iterator it = state->vBlocksInFlight.insert(state->vBlocksInFlight.end(),
{&block, std::unique_ptr<PartiallyDownloadedBlock>(pit ? new PartiallyDownloadedBlock(&m_mempool) : nullptr)});
state->nBlocksInFlight++;
if (state->nBlocksInFlight == 1) {
if (state->vBlocksInFlight.size() == 1) {
// We're starting a block download (batch) from this peer.
state->m_downloading_since = GetTime<std::chrono::microseconds>();
m_peers_downloading_from++;
}
itInFlight = mapBlocksInFlight.insert(std::make_pair(hash, std::make_pair(nodeid, it))).first;
auto itInFlight = mapBlocksInFlight.insert(std::make_pair(hash, std::make_pair(nodeid, it)));
if (pit) {
*pit = &itInFlight->second.second;
}
@ -1385,7 +1406,7 @@ void PeerManagerImpl::FindNextBlocksToDownload(const Peer& peer, unsigned int co
}
} else if (waitingfor == -1) {
// This is the first already-in-flight block.
waitingfor = mapBlocksInFlight[pindex->GetBlockHash()].first;
waitingfor = mapBlocksInFlight.lower_bound(pindex->GetBlockHash())->second.first;
}
}
}
@ -1515,13 +1536,21 @@ void PeerManagerImpl::FinalizeNode(const CNode& node)
nSyncStarted--;
for (const QueuedBlock& entry : state->vBlocksInFlight) {
mapBlocksInFlight.erase(entry.pindex->GetBlockHash());
auto range = mapBlocksInFlight.equal_range(entry.pindex->GetBlockHash());
while (range.first != range.second) {
auto [node_id, list_it] = range.first->second;
if (node_id != nodeid) {
range.first++;
} else {
range.first = mapBlocksInFlight.erase(range.first);
}
}
}
m_orphanage.EraseForPeer(nodeid);
m_txrequest.DisconnectedPeer(nodeid);
if (m_txreconciliation) m_txreconciliation->ForgetPeer(nodeid);
m_num_preferred_download_peers -= state->fPreferredDownload;
m_peers_downloading_from -= (state->nBlocksInFlight != 0);
m_peers_downloading_from -= (!state->vBlocksInFlight.empty());
assert(m_peers_downloading_from >= 0);
m_outbound_peers_with_protect_from_disconnect -= state->m_chain_sync.m_protect;
assert(m_outbound_peers_with_protect_from_disconnect >= 0);
@ -1762,11 +1791,10 @@ std::optional<std::string> PeerManagerImpl::FetchBlock(NodeId peer_id, const CBl
LOCK(cs_main);
// Mark block as in-flight unless it already is (for this peer).
// If the peer does not send us a block, vBlocksInFlight remains non-empty,
// causing us to timeout and disconnect.
// If a block was already in-flight for a different peer, its BLOCKTXN
// response will be dropped.
// Forget about all prior requests
RemoveBlockRequest(block_index.GetBlockHash(), std::nullopt);
// Mark block as in-flight
if (!BlockRequested(peer_id, block_index)) return "Already requested from this peer";
// Construct message to request the block
@ -2682,7 +2710,7 @@ void PeerManagerImpl::HeadersDirectFetchBlocks(CNode& pfrom, const Peer& peer, c
std::vector<CInv> vGetData;
// Download as much as possible, from earliest to latest.
for (const CBlockIndex *pindex : reverse_iterate(vToFetch)) {
if (nodestate->nBlocksInFlight >= MAX_BLOCKS_IN_TRANSIT_PER_PEER) {
if (nodestate->vBlocksInFlight.size() >= MAX_BLOCKS_IN_TRANSIT_PER_PEER) {
// Can't download any more from this peer
break;
}
@ -4273,15 +4301,27 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
nodestate->m_last_block_announcement = GetTime();
}
std::map<uint256, std::pair<NodeId, std::list<QueuedBlock>::iterator> >::iterator blockInFlightIt = mapBlocksInFlight.find(pindex->GetBlockHash());
bool fAlreadyInFlight = blockInFlightIt != mapBlocksInFlight.end();
if (pindex->nStatus & BLOCK_HAVE_DATA) // Nothing to do here
return;
auto range_flight = mapBlocksInFlight.equal_range(pindex->GetBlockHash());
size_t already_in_flight = std::distance(range_flight.first, range_flight.second);
bool requested_block_from_this_peer{false};
// Multimap ensures ordering of outstanding requests. It's either empty or first in line.
bool first_in_flight = already_in_flight == 0 || (range_flight.first->second.first == pfrom.GetId());
while (range_flight.first != range_flight.second) {
if (range_flight.first->second.first == pfrom.GetId()) {
requested_block_from_this_peer = true;
break;
}
range_flight.first++;
}
if (pindex->nChainWork <= m_chainman.ActiveChain().Tip()->nChainWork || // We know something better
pindex->nTx != 0) { // We had this block at some point, but pruned it
if (fAlreadyInFlight) {
if (requested_block_from_this_peer) {
// We requested this block for some reason, but our mempool will probably be useless
// so we just grab the block via normal getdata
std::vector<CInv> vInv(1);
@ -4292,15 +4332,15 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
}
// If we're not close to tip yet, give up and let parallel block fetch work its magic
if (!fAlreadyInFlight && !CanDirectFetch()) {
if (!already_in_flight && !CanDirectFetch()) {
return;
}
// We want to be a bit conservative just to be extra careful about DoS
// possibilities in compact block processing...
if (pindex->nHeight <= m_chainman.ActiveChain().Height() + 2) {
if ((!fAlreadyInFlight && nodestate->nBlocksInFlight < MAX_BLOCKS_IN_TRANSIT_PER_PEER) ||
(fAlreadyInFlight && blockInFlightIt->second.first == pfrom.GetId())) {
if ((already_in_flight < MAX_CMPCTBLOCKS_INFLIGHT_PER_BLOCK && nodestate->vBlocksInFlight.size() < MAX_BLOCKS_IN_TRANSIT_PER_PEER) ||
requested_block_from_this_peer) {
std::list<QueuedBlock>::iterator* queuedBlockIt = nullptr;
if (!BlockRequested(pfrom.GetId(), *pindex, &queuedBlockIt)) {
if (!(*queuedBlockIt)->partialBlock)
@ -4319,10 +4359,15 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
Misbehaving(*peer, 100, "invalid compact block");
return;
} else if (status == READ_STATUS_FAILED) {
// Duplicate txindexes, the block is now in-flight, so just request it
std::vector<CInv> vInv(1);
vInv[0] = CInv(MSG_BLOCK | GetFetchFlags(*peer), blockhash);
m_connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::GETDATA, vInv));
if (first_in_flight) {
// Duplicate txindexes, the block is now in-flight, so just request it
std::vector<CInv> vInv(1);
vInv[0] = CInv(MSG_BLOCK | GetFetchFlags(*peer), blockhash);
m_connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::GETDATA, vInv));
} else {
// Give up for this peer and wait for other peer(s)
RemoveBlockRequest(pindex->GetBlockHash(), pfrom.GetId());
}
return;
}
@ -4337,9 +4382,24 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
txn.blockhash = blockhash;
blockTxnMsg << txn;
fProcessBLOCKTXN = true;
} else {
} else if (first_in_flight) {
// We will try to round-trip any compact blocks we get on failure,
// as long as it's first...
req.blockhash = pindex->GetBlockHash();
m_connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::GETBLOCKTXN, req));
} else if (pfrom.m_bip152_highbandwidth_to &&
(!pfrom.IsInboundConn() ||
IsBlockRequestedFromOutbound(blockhash) ||
already_in_flight < MAX_CMPCTBLOCKS_INFLIGHT_PER_BLOCK - 1)) {
// ... or it's a hb relay peer and:
// - peer is outbound, or
// - we already have an outbound attempt in flight(so we'll take what we can get), or
// - it's not the final parallel download slot (which we may reserve for first outbound)
req.blockhash = pindex->GetBlockHash();
m_connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::GETBLOCKTXN, req));
} else {
// Give up for this peer and wait for other peer(s)
RemoveBlockRequest(pindex->GetBlockHash(), pfrom.GetId());
}
} else {
// This block is either already in flight from a different
@ -4360,7 +4420,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
}
}
} else {
if (fAlreadyInFlight) {
if (requested_block_from_this_peer) {
// We requested this block, but its far into the future, so our
// mempool will probably be useless - request the block normally
std::vector<CInv> vInv(1);
@ -4432,24 +4492,44 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
{
LOCK(cs_main);
std::map<uint256, std::pair<NodeId, std::list<QueuedBlock>::iterator> >::iterator it = mapBlocksInFlight.find(resp.blockhash);
if (it == mapBlocksInFlight.end() || !it->second.second->partialBlock ||
it->second.first != pfrom.GetId()) {
auto range_flight = mapBlocksInFlight.equal_range(resp.blockhash);
size_t already_in_flight = std::distance(range_flight.first, range_flight.second);
bool requested_block_from_this_peer{false};
// Multimap ensures ordering of outstanding requests. It's either empty or first in line.
bool first_in_flight = already_in_flight == 0 || (range_flight.first->second.first == pfrom.GetId());
while (range_flight.first != range_flight.second) {
auto [node_id, block_it] = range_flight.first->second;
if (node_id == pfrom.GetId() && block_it->partialBlock) {
requested_block_from_this_peer = true;
break;
}
range_flight.first++;
}
if (!requested_block_from_this_peer) {
LogPrint(BCLog::NET, "Peer %d sent us block transactions for block we weren't expecting\n", pfrom.GetId());
return;
}
PartiallyDownloadedBlock& partialBlock = *it->second.second->partialBlock;
PartiallyDownloadedBlock& partialBlock = *range_flight.first->second.second->partialBlock;
ReadStatus status = partialBlock.FillBlock(*pblock, resp.txn);
if (status == READ_STATUS_INVALID) {
RemoveBlockRequest(resp.blockhash, pfrom.GetId()); // Reset in-flight state in case Misbehaving does not result in a disconnect
Misbehaving(*peer, 100, "invalid compact block/non-matching block transactions");
return;
} else if (status == READ_STATUS_FAILED) {
// Might have collided, fall back to getdata now :(
std::vector<CInv> invs;
invs.push_back(CInv(MSG_BLOCK | GetFetchFlags(*peer), resp.blockhash));
m_connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::GETDATA, invs));
if (first_in_flight) {
// Might have collided, fall back to getdata now :(
std::vector<CInv> invs;
invs.push_back(CInv(MSG_BLOCK | GetFetchFlags(*peer), resp.blockhash));
m_connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::GETDATA, invs));
} else {
RemoveBlockRequest(resp.blockhash, pfrom.GetId());
LogPrint(BCLog::NET, "Peer %d sent us a compact block but it failed to reconstruct, waiting on first download to complete\n", pfrom.GetId());
return;
}
} else {
// Block is either okay, or possibly we received
// READ_STATUS_CHECKBLOCK_FAILED.
@ -5040,14 +5120,14 @@ void PeerManagerImpl::EvictExtraOutboundPeers(std::chrono::seconds now)
// valid headers chain with at least as much work as our tip.
CNodeState *node_state = State(pnode->GetId());
if (node_state == nullptr ||
(now - pnode->m_connected >= MINIMUM_CONNECT_TIME && node_state->nBlocksInFlight == 0)) {
(now - pnode->m_connected >= MINIMUM_CONNECT_TIME && node_state->vBlocksInFlight.empty())) {
pnode->fDisconnect = true;
LogPrint(BCLog::NET, "disconnecting extra block-relay-only peer=%d (last block received at time %d)\n",
pnode->GetId(), count_seconds(pnode->m_last_block_time));
return true;
} else {
LogPrint(BCLog::NET, "keeping block-relay-only peer=%d chosen for eviction (connect time: %d, blocks_in_flight: %d)\n",
pnode->GetId(), count_seconds(pnode->m_connected), node_state->nBlocksInFlight);
pnode->GetId(), count_seconds(pnode->m_connected), node_state->vBlocksInFlight.size());
}
return false;
});
@ -5087,13 +5167,13 @@ void PeerManagerImpl::EvictExtraOutboundPeers(std::chrono::seconds now)
// Also don't disconnect any peer we're trying to download a
// block from.
CNodeState &state = *State(pnode->GetId());
if (now - pnode->m_connected > MINIMUM_CONNECT_TIME && state.nBlocksInFlight == 0) {
if (now - pnode->m_connected > MINIMUM_CONNECT_TIME && state.vBlocksInFlight.empty()) {
LogPrint(BCLog::NET, "disconnecting extra outbound peer=%d (last block announcement received at time %d)\n", pnode->GetId(), oldest_block_announcement);
pnode->fDisconnect = true;
return true;
} else {
LogPrint(BCLog::NET, "keeping outbound peer=%d chosen for eviction (connect time: %d, blocks_in_flight: %d)\n",
pnode->GetId(), count_seconds(pnode->m_connected), state.nBlocksInFlight);
pnode->GetId(), count_seconds(pnode->m_connected), state.vBlocksInFlight.size());
return false;
}
});
@ -5813,10 +5893,10 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
// Message: getdata (blocks)
//
std::vector<CInv> vGetData;
if (CanServeBlocks(*peer) && ((sync_blocks_and_headers_from_peer && !IsLimitedPeer(*peer)) || !m_chainman.ActiveChainstate().IsInitialBlockDownload()) && state.nBlocksInFlight < MAX_BLOCKS_IN_TRANSIT_PER_PEER) {
if (CanServeBlocks(*peer) && ((sync_blocks_and_headers_from_peer && !IsLimitedPeer(*peer)) || !m_chainman.ActiveChainstate().IsInitialBlockDownload()) && state.vBlocksInFlight.size() < MAX_BLOCKS_IN_TRANSIT_PER_PEER) {
std::vector<const CBlockIndex*> vToDownload;
NodeId staller = -1;
FindNextBlocksToDownload(*peer, MAX_BLOCKS_IN_TRANSIT_PER_PEER - state.nBlocksInFlight, vToDownload, staller);
FindNextBlocksToDownload(*peer, MAX_BLOCKS_IN_TRANSIT_PER_PEER - state.vBlocksInFlight.size(), vToDownload, staller);
for (const CBlockIndex *pindex : vToDownload) {
uint32_t nFetchFlags = GetFetchFlags(*peer);
vGetData.push_back(CInv(MSG_BLOCK | nFetchFlags, pindex->GetBlockHash()));
@ -5824,7 +5904,7 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
LogPrint(BCLog::NET, "Requesting block %s (%d) peer=%d\n", pindex->GetBlockHash().ToString(),
pindex->nHeight, pto->GetId());
}
if (state.nBlocksInFlight == 0 && staller != -1) {
if (state.vBlocksInFlight.empty() && staller != -1) {
if (State(staller)->m_stalling_since == 0us) {
State(staller)->m_stalling_since = current_time;
LogPrint(BCLog::NET, "Stall started peer=%d\n", staller);

View file

@ -22,6 +22,8 @@ static const bool DEFAULT_PEERBLOOMFILTERS = false;
static const bool DEFAULT_PEERBLOCKFILTERS = false;
/** Threshold for marking a node to be discouraged, e.g. disconnected and added to the discouragement filter. */
static const int DISCOURAGEMENT_THRESHOLD{100};
/** Maximum number of outstanding CMPCTBLOCK requests for the same block. */
static const unsigned int MAX_CMPCTBLOCKS_INFLIGHT_PER_BLOCK = 3;
struct CNodeStateStats {
int nSyncHeight = -1;

View file

@ -430,7 +430,7 @@ static RPCHelpMan getblockfrompeer()
"getblockfrompeer",
"Attempt to fetch block from a given peer.\n\n"
"We must have the header for this block, e.g. using submitheader.\n"
"Subsequent calls for the same block and a new peer will cause the response from the previous peer to be ignored.\n"
"Subsequent calls for the same block may cause the response from the previous peer to be ignored.\n"
"Peers generally ignore requests for a stale block that they never fully verified, or one that is more than a month old.\n"
"When a peer does not respond with a block, we will disconnect.\n"
"Note: The block could be re-pruned as soon as it is received.\n\n"

View file

@ -105,6 +105,10 @@ class TestP2PConn(P2PInterface):
self.last_message.pop("headers", None)
self.last_message.pop("cmpctblock", None)
def clear_getblocktxn(self):
with p2p_lock:
self.last_message.pop("getblocktxn", None)
def get_headers(self, locator, hashstop):
msg = msg_getheaders()
msg.locator.vHave = locator
@ -745,7 +749,7 @@ class CompactBlocksTest(BitcoinTestFramework):
peer.get_headers(locator=[int(tip, 16)], hashstop=0)
peer.send_and_ping(msg_sendcmpct(announce=True, version=2))
def test_compactblock_reconstruction_multiple_peers(self, stalling_peer, delivery_peer):
def test_compactblock_reconstruction_stalling_peer(self, stalling_peer, delivery_peer):
node = self.nodes[0]
assert len(self.utxos)
@ -823,12 +827,85 @@ class CompactBlocksTest(BitcoinTestFramework):
hb_test_node.send_and_ping(msg_sendcmpct(announce=False, version=2))
assert_highbandwidth_states(self.nodes[0], hb_to=True, hb_from=False)
def test_compactblock_reconstruction_parallel_reconstruction(self, stalling_peer, delivery_peer, inbound_peer, outbound_peer):
""" All p2p connections are inbound except outbound_peer. We test that ultimate parallel slot
can only be taken by an outbound node unless prior attempts were done by an outbound
"""
node = self.nodes[0]
assert len(self.utxos)
def announce_cmpct_block(node, peer, txn_count):
utxo = self.utxos.pop(0)
block = self.build_block_with_transactions(node, utxo, txn_count)
cmpct_block = HeaderAndShortIDs()
cmpct_block.initialize_from_block(block)
msg = msg_cmpctblock(cmpct_block.to_p2p())
peer.send_and_ping(msg)
with p2p_lock:
assert "getblocktxn" in peer.last_message
return block, cmpct_block
for name, peer in [("delivery", delivery_peer), ("inbound", inbound_peer), ("outbound", outbound_peer)]:
self.log.info(f"Setting {name} as high bandwidth peer")
block, cmpct_block = announce_cmpct_block(node, peer, 1)
msg = msg_blocktxn()
msg.block_transactions.blockhash = block.sha256
msg.block_transactions.transactions = block.vtx[1:]
peer.send_and_ping(msg)
assert_equal(int(node.getbestblockhash(), 16), block.sha256)
peer.clear_getblocktxn()
# Test the simple parallel download case...
for num_missing in [1, 5, 20]:
# Remaining low-bandwidth peer is stalling_peer, who announces first
assert_equal([peer['bip152_hb_to'] for peer in node.getpeerinfo()], [False, True, True, True])
block, cmpct_block = announce_cmpct_block(node, stalling_peer, num_missing)
delivery_peer.send_and_ping(msg_cmpctblock(cmpct_block.to_p2p()))
with p2p_lock:
# The second peer to announce should still get a getblocktxn
assert "getblocktxn" in delivery_peer.last_message
assert int(node.getbestblockhash(), 16) != block.sha256
inbound_peer.send_and_ping(msg_cmpctblock(cmpct_block.to_p2p()))
with p2p_lock:
# The third inbound peer to announce should *not* get a getblocktxn
assert "getblocktxn" not in inbound_peer.last_message
assert int(node.getbestblockhash(), 16) != block.sha256
outbound_peer.send_and_ping(msg_cmpctblock(cmpct_block.to_p2p()))
with p2p_lock:
# The third peer to announce should get a getblocktxn if outbound
assert "getblocktxn" in outbound_peer.last_message
assert int(node.getbestblockhash(), 16) != block.sha256
# Second peer completes the compact block first
msg = msg_blocktxn()
msg.block_transactions.blockhash = block.sha256
msg.block_transactions.transactions = block.vtx[1:]
delivery_peer.send_and_ping(msg)
assert_equal(int(node.getbestblockhash(), 16), block.sha256)
# Nothing bad should happen if we get a late fill from the first peer...
stalling_peer.send_and_ping(msg)
self.utxos.append([block.vtx[-1].sha256, 0, block.vtx[-1].vout[0].nValue])
delivery_peer.clear_getblocktxn()
inbound_peer.clear_getblocktxn()
outbound_peer.clear_getblocktxn()
def run_test(self):
self.wallet = MiniWallet(self.nodes[0])
# Setup the p2p connections
self.segwit_node = self.nodes[0].add_p2p_connection(TestP2PConn())
self.additional_segwit_node = self.nodes[0].add_p2p_connection(TestP2PConn())
self.onemore_inbound_node = self.nodes[0].add_p2p_connection(TestP2PConn())
self.outbound_node = self.nodes[0].add_outbound_p2p_connection(TestP2PConn(), p2p_idx=3, connection_type="outbound-full-relay")
# We will need UTXOs to construct transactions in later tests.
self.make_utxos()
@ -838,6 +915,8 @@ class CompactBlocksTest(BitcoinTestFramework):
self.log.info("Testing SENDCMPCT p2p message... ")
self.test_sendcmpct(self.segwit_node)
self.test_sendcmpct(self.additional_segwit_node)
self.test_sendcmpct(self.onemore_inbound_node)
self.test_sendcmpct(self.outbound_node)
self.log.info("Testing compactblock construction...")
self.test_compactblock_construction(self.segwit_node)
@ -860,8 +939,11 @@ class CompactBlocksTest(BitcoinTestFramework):
self.log.info("Testing handling of incorrect blocktxn responses...")
self.test_incorrect_blocktxn_response(self.segwit_node)
self.log.info("Testing reconstructing compact blocks from all peers...")
self.test_compactblock_reconstruction_multiple_peers(self.segwit_node, self.additional_segwit_node)
self.log.info("Testing reconstructing compact blocks with a stalling peer...")
self.test_compactblock_reconstruction_stalling_peer(self.segwit_node, self.additional_segwit_node)
self.log.info("Testing reconstructing compact blocks from multiple peers...")
self.test_compactblock_reconstruction_parallel_reconstruction(stalling_peer=self.segwit_node, inbound_peer=self.onemore_inbound_node, delivery_peer=self.additional_segwit_node, outbound_peer=self.outbound_node)
# Test that if we submitblock to node1, we'll get a compact block
# announcement to all peers.