diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 0b25396dd21..285aa426662 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -536,7 +536,8 @@ public: std::vector GetOrphanTransactions() override EXCLUSIVE_LOCKS_REQUIRED(!m_tx_download_mutex); PeerManagerInfo GetInfo() const override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); void SendPings() override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); - void RelayTransaction(const uint256& txid, const uint256& wtxid) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); + std::pair GetFanoutPeersCount() override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); + void RelayTransaction(const uint256& txid, const uint256& wtxid, bool consider_fanout) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); void SetBestBlock(int height, std::chrono::seconds time) override { m_best_height = height; @@ -614,19 +615,19 @@ private: /** Handle a transaction whose result was MempoolAcceptResult::ResultType::VALID. * Updates m_txrequest, m_orphanage, and vExtraTxnForCompact. Also queues the tx for relay. */ - void ProcessValidTx(NodeId nodeid, const CTransactionRef& tx, const std::list& replaced_transactions) + void ProcessValidTx(NodeId nodeid, const CTransactionRef& tx, const std::list& replaced_transactions, bool consider_fanout) EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, g_msgproc_mutex, m_tx_download_mutex); /** Handle the results of package validation: calls ProcessValidTx and ProcessInvalidTx for * individual transactions, and caches rejection for the package as a group. */ - void ProcessPackageResult(const node::PackageToValidate& package_to_validate, const PackageMempoolAcceptResult& package_result) + void ProcessPackageResult(const node::PackageToValidate& package_to_validate, const PackageMempoolAcceptResult& package_result, bool consider_fanout) EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, g_msgproc_mutex, m_tx_download_mutex); /** * Reconsider orphan transactions after a parent has been accepted to the mempool. * - * @peer[in] peer The peer whose orphan transactions we will reconsider. Generally only + * @param[in] peer The peer whose orphan transactions we will reconsider. Generally only * one orphan will be reconsidered on each call of this function. If an * accepted orphan has orphaned children, those will need to be * reconsidered, creating more work, possibly for other peers. @@ -637,6 +638,17 @@ private: bool ProcessOrphanTx(Peer& peer) EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, g_msgproc_mutex, !m_tx_download_mutex); + /** Whether we should fanout to a given peer or not. Always returns true for non-Erlay peers + * For Erlay-peers, if they are inbound, returns true as long as the peer has been selected for fanout. + * If they are outbound, returns true as long as the transaction was received via fanout (further filtering will be performed + * before sending out the next INV message to each peer). + * Returns false otherwise. + * + * @param[in] peer The peer we are making the decision on + * @param[in] consider_fanout Whether to consider fanout or not (only applies if the peer is outbound) + */ + bool ShouldFanoutTo(const PeerRef peer, bool consider_fanout) EXCLUSIVE_LOCKS_REQUIRED(m_peer_mutex); + /** Process a single headers message from a peer. * * @param[in] pfrom CNode of the peer @@ -1581,7 +1593,8 @@ void PeerManagerImpl::ReattemptInitialBroadcast(CScheduler& scheduler) CTransactionRef tx = m_mempool.get(txid); if (tx != nullptr) { - RelayTransaction(txid, tx->GetWitnessHash()); + // Always consider fanout when relaying our own transactions + RelayTransaction(txid, tx->GetWitnessHash(), /*consider_fanout=*/true); } else { m_mempool.RemoveUnbroadcastTx(txid, true); } @@ -2148,12 +2161,44 @@ void PeerManagerImpl::SendPings() for(auto& it : m_peer_map) it.second->m_ping_queued = true; } -void PeerManagerImpl::RelayTransaction(const uint256& txid, const uint256& wtxid) +std::pair PeerManagerImpl::GetFanoutPeersCount() +{ + + size_t inbounds_fanout_tx_relay = 0, outbounds_fanout_tx_relay = 0; + + if (m_txreconciliation) { + LOCK(m_peer_mutex); + for(const auto& [peer_id, peer] : m_peer_map) { + if (const auto tx_relay = peer->GetTxRelay()) { + const bool peer_relays_txs = WITH_LOCK(tx_relay->m_bloom_filter_mutex, return tx_relay->m_relay_txs); + if (peer_relays_txs && !m_txreconciliation->IsPeerRegistered(peer_id)) { + peer->m_is_inbound ? ++inbounds_fanout_tx_relay : ++outbounds_fanout_tx_relay; + } + } + } + } + + return std::pair(inbounds_fanout_tx_relay, outbounds_fanout_tx_relay); +} + +bool PeerManagerImpl::ShouldFanoutTo(const PeerRef peer, bool consider_fanout) +{ + // We consider Erlay peers for fanout if they are within our inbound fanout targets, or if they are outbounds + // and the transaction was NOT received via set reconciliation. For the latter group, further filtering + // will be applied at relay time. + if (m_txreconciliation && m_txreconciliation->IsPeerRegistered(peer->m_id)) { + return (!peer->m_is_inbound && consider_fanout) || m_txreconciliation->IsInboundFanoutTarget(peer->m_id); + } else { + // For non-Erlay peers we always fanout (same applies if we do not support Erlay) + return true; + } +} + +void PeerManagerImpl::RelayTransaction(const uint256& txid, const uint256& wtxid, bool consider_fanout) { LOCK(m_peer_mutex); - for(auto& it : m_peer_map) { - Peer& peer = *it.second; - auto tx_relay = peer.GetTxRelay(); + for(auto& [peer_id, peer] : m_peer_map) { + auto tx_relay = peer->GetTxRelay(); if (!tx_relay) continue; LOCK(tx_relay->m_tx_inventory_mutex); @@ -2164,9 +2209,26 @@ void PeerManagerImpl::RelayTransaction(const uint256& txid, const uint256& wtxid // in the announcement. if (tx_relay->m_next_inv_send_time == 0s) continue; - const uint256& hash{peer.m_wtxid_relay ? wtxid : txid}; + const uint256& hash{peer->m_wtxid_relay ? wtxid : txid}; if (!tx_relay->m_tx_inventory_known_filter.contains(hash)) { - tx_relay->m_tx_inventory_to_send.insert(hash); + bool fanout = ShouldFanoutTo(peer, consider_fanout); + // FIXME: This bit here and the corresponding one in SendMessage are basically identical. + // Check if there is a way to make them into a private function We would have to pass a + // lambda that does the "insert into a collection part", which is what's different from both + if (!fanout) { + Assume(m_txreconciliation); + const auto result = m_txreconciliation->AddToSet(peer_id, Wtxid::FromUint256(wtxid)); + if (!result.m_succeeded) { + fanout = true; + if (const auto collision = result.m_collision; collision.has_value()) { + Assume(m_txreconciliation->TryRemovingFromSet(peer_id, collision.value())); + tx_relay->m_tx_inventory_to_send.insert(collision.value().ToUint256()); + } + } + } + if (fanout) { + tx_relay->m_tx_inventory_to_send.insert(hash); + } } }; } @@ -3024,7 +3086,7 @@ std::optional PeerManagerImpl::ProcessInvalidTx(NodeId return package_to_validate; } -void PeerManagerImpl::ProcessValidTx(NodeId nodeid, const CTransactionRef& tx, const std::list& replaced_transactions) +void PeerManagerImpl::ProcessValidTx(NodeId nodeid, const CTransactionRef& tx, const std::list& replaced_transactions, bool consider_fanout) { AssertLockNotHeld(m_peer_mutex); AssertLockHeld(g_msgproc_mutex); @@ -3038,14 +3100,14 @@ void PeerManagerImpl::ProcessValidTx(NodeId nodeid, const CTransactionRef& tx, c tx->GetWitnessHash().ToString(), m_mempool.size(), m_mempool.DynamicMemoryUsage() / 1000); - RelayTransaction(tx->GetHash(), tx->GetWitnessHash()); + RelayTransaction(tx->GetHash(), tx->GetWitnessHash(), consider_fanout); for (const CTransactionRef& removedTx : replaced_transactions) { AddToCompactExtraTransactions(removedTx); } } -void PeerManagerImpl::ProcessPackageResult(const node::PackageToValidate& package_to_validate, const PackageMempoolAcceptResult& package_result) +void PeerManagerImpl::ProcessPackageResult(const node::PackageToValidate& package_to_validate, const PackageMempoolAcceptResult& package_result, bool consider_fanout) { AssertLockNotHeld(m_peer_mutex); AssertLockHeld(g_msgproc_mutex); @@ -3075,7 +3137,7 @@ void PeerManagerImpl::ProcessPackageResult(const node::PackageToValidate& packag switch (tx_result.m_result_type) { case MempoolAcceptResult::ResultType::VALID: { - ProcessValidTx(nodeid, tx, tx_result.m_replaced_transactions); + ProcessValidTx(nodeid, tx, tx_result.m_replaced_transactions, consider_fanout); break; } case MempoolAcceptResult::ResultType::INVALID: @@ -3118,7 +3180,15 @@ bool PeerManagerImpl::ProcessOrphanTx(Peer& peer) if (result.m_result_type == MempoolAcceptResult::ResultType::VALID) { LogDebug(BCLog::TXPACKAGES, " accepted orphan tx %s (wtxid=%s)\n", orphanHash.ToString(), orphan_wtxid.ToString()); - ProcessValidTx(peer.m_id, porphanTx, result.m_replaced_transactions); + // When processing orphans after getting a missing parent, we always consider faning the out. + // TODO: This is a weird case. ProcessOrphanTx is triggered by the reception of a missing parent, but we do not keep track of + // when the orphan was received, so it could be the case that we end up here when the orphan is still in its early propagation + // state, or far after. If we are late, relaying the transaction may not be worth it, our peers may already know (reconciling + // would even be worse than fanout, since it would simply add more roundtrips to the same outcome: annoincing a transaction that is + // already known). + // This should only happen if we connect to the network and receive a depending transaction late on its propagation. + // However, it may be worth considering how to deal with this case in a followup to reduce unnecessary traffic. + ProcessValidTx(peer.m_id, porphanTx, result.m_replaced_transactions, /*consider_fanout=*/true); return true; } else if (state.GetResult() != TxValidationResult::TX_MISSING_INPUTS) { LogDebug(BCLog::TXPACKAGES, " invalid orphan tx %s (wtxid=%s) from peer=%d. %s\n", @@ -4262,6 +4332,9 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, LOCK2(cs_main, m_tx_download_mutex); + // TODO: Until the Erlay p2p flow is defined, all transactions are flagged for fanout + bool consider_fanout = true; + const auto& [should_validate, package_to_validate] = m_txdownloadman.ReceivedTx(pfrom.GetId(), ptx); if (!should_validate) { if (pfrom.HasPermission(NetPermissionFlags::ForceRelay)) { @@ -4274,7 +4347,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, } else { LogPrintf("Force relaying tx %s (wtxid=%s) from peer=%d\n", tx.GetHash().ToString(), tx.GetWitnessHash().ToString(), pfrom.GetId()); - RelayTransaction(tx.GetHash(), tx.GetWitnessHash()); + RelayTransaction(tx.GetHash(), tx.GetWitnessHash(), consider_fanout); } } @@ -4282,7 +4355,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, const auto package_result{ProcessNewPackage(m_chainman.ActiveChainstate(), m_mempool, package_to_validate->m_txns, /*test_accept=*/false, /*client_maxfeerate=*/std::nullopt)}; LogDebug(BCLog::TXPACKAGES, "package evaluation for %s: %s\n", package_to_validate->ToString(), package_result.m_state.IsValid() ? "package accepted" : "package rejected"); - ProcessPackageResult(package_to_validate.value(), package_result); + ProcessPackageResult(package_to_validate.value(), package_result, consider_fanout); } return; } @@ -4294,7 +4367,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, const TxValidationState& state = result.m_state; if (result.m_result_type == MempoolAcceptResult::ResultType::VALID) { - ProcessValidTx(pfrom.GetId(), ptx, result.m_replaced_transactions); + ProcessValidTx(pfrom.GetId(), ptx, result.m_replaced_transactions, consider_fanout); pfrom.m_last_tx_time = GetTime(); } if (state.IsInvalid()) { @@ -4302,7 +4375,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, const auto package_result{ProcessNewPackage(m_chainman.ActiveChainstate(), m_mempool, package_to_validate->m_txns, /*test_accept=*/false, /*client_maxfeerate=*/std::nullopt)}; LogDebug(BCLog::TXPACKAGES, "package evaluation for %s: %s\n", package_to_validate->ToString(), package_result.m_state.IsValid() ? "package accepted" : "package rejected"); - ProcessPackageResult(package_to_validate.value(), package_result); + ProcessPackageResult(package_to_validate.value(), package_result, consider_fanout); } } @@ -5702,10 +5775,12 @@ bool PeerManagerImpl::SendMessages(CNode* pto) peer->m_blocks_for_inv_relay.clear(); } + // Check whether periodic sends should happen + bool fSendTrickle = pto->HasPermission(NetPermissionFlags::NoBan); if (auto tx_relay = peer->GetTxRelay(); tx_relay != nullptr) { + std::vector> to_be_announced{}; + { LOCK(tx_relay->m_tx_inventory_mutex); - // Check whether periodic sends should happen - bool fSendTrickle = pto->HasPermission(NetPermissionFlags::NoBan); if (tx_relay->m_next_inv_send_time < current_time) { fSendTrickle = true; if (pto->IsInboundConn()) { @@ -5715,6 +5790,12 @@ bool PeerManagerImpl::SendMessages(CNode* pto) } } + // Rotate inbound fanout targets if the timer has gone off + if (m_txreconciliation && m_txreconciliation->GetNextInboundPeerRotationTime() < current_time) { + m_txreconciliation->SetNextInboundPeerRotationTime(m_rng.rand_exp_duration(INBOUND_FANOUT_ROTATION_INTERVAL)); + m_txreconciliation->RotateInboundFanoutTargets(); + } + // Time to send but the peer has requested we not relay transactions. if (fSendTrickle) { LOCK(tx_relay->m_bloom_filter_mutex); @@ -5779,7 +5860,6 @@ bool PeerManagerImpl::SendMessages(CNode* pto) std::set::iterator it = vInvTx.back(); vInvTx.pop_back(); uint256 hash = *it; - CInv inv(peer->m_wtxid_relay ? MSG_WTX : MSG_TX, hash); // Remove it from the to-be-sent set tx_relay->m_tx_inventory_to_send.erase(it); // Check if not in the filter already @@ -5787,7 +5867,7 @@ bool PeerManagerImpl::SendMessages(CNode* pto) continue; } // Not in the mempool anymore? don't bother sending it. - auto txinfo = m_mempool.info(ToGenTxid(inv)); + auto txinfo = m_mempool.info(GenTxid(peer->m_wtxid_relay ? GenTxid::Wtxid(hash) : GenTxid::Txid(hash))); if (!txinfo.tx) { continue; } @@ -5796,20 +5876,81 @@ bool PeerManagerImpl::SendMessages(CNode* pto) continue; } if (tx_relay->m_bloom_filter && !tx_relay->m_bloom_filter->IsRelevantAndUpdate(*txinfo.tx)) continue; - // Send - vInv.push_back(inv); + // Flag to be sent. + // Initialize with the counter at 0. This in only relevant for Erlay nodes and will be updated in the + // following context after releasing m_tx_inventory_mutex + to_be_announced.emplace_back(hash, 0); nRelayedTransactions++; - if (vInv.size() == MAX_INV_SZ) { - MakeAndPushMessage(*pto, NetMsgType::INV, vInv); - vInv.clear(); - } - tx_relay->m_tx_inventory_known_filter.insert(hash); } - - // Ensure we'll respond to GETDATA requests for anything we've just announced - LOCK(m_mempool.cs); - tx_relay->m_last_inv_sequence = m_mempool.GetSequence(); } + } // Unlock the m_tx_inventory_mutex so we can count over m_peer_map + + // Only care about fanout count if we support Erlay + if (m_txreconciliation) { + LOCK(m_peer_mutex); + for (auto& [hash, out_fanout_count] : to_be_announced) { + for (const auto& [cur_peer_id, cur_peer] : m_peer_map) { + if (auto peer_tx_relay = cur_peer->GetTxRelay()) { + LOCK(peer_tx_relay->m_tx_inventory_mutex); + if (!pto->IsInboundConn() && peer_tx_relay->m_tx_inventory_known_filter.contains(hash)) { + out_fanout_count+=1; + } + } + } + } + } + + // Re-lock + LOCK(tx_relay->m_tx_inventory_mutex); + for (auto& [hash, out_fanout_count] : to_be_announced) { + // Send + bool should_fanout = true; + // For non-Erlay and inbound peer simply fanout. Erlay-enabled inbounds have been assigned transaction to reconcile + // in RelayTransaction, so everything that was added to m_tx_inventory_to_send is to be fanout + if (!pto->IsInboundConn() && m_txreconciliation && m_txreconciliation->IsPeerRegistered(pto->GetId())) { + // For Erlay-enabled outbound peers we fanout based on how we have heard about this transaction + // and how many announcements of this transactions have we sent and receivedx + // TODO: If we are the transaction source, we should reduce the threshold by 1, since this the only case + // where we are not accounting for at least one reception + should_fanout = out_fanout_count <= OUTBOUND_FANOUT_THRESHOLD; + } + + CInv inv(peer->m_wtxid_relay ? MSG_WTX : MSG_TX, hash); + auto add_to_inv_vec = [&](const CInv inv) EXCLUSIVE_LOCKS_REQUIRED(tx_relay->m_tx_inventory_mutex) { + vInv.push_back(inv); + if (vInv.size() == MAX_INV_SZ) { + MakeAndPushMessage(*pto, NetMsgType::INV, vInv); + vInv.clear(); + } + tx_relay->m_tx_inventory_known_filter.insert(inv.hash); + }; + + if (!should_fanout) { + // Note we are not handling the case of ancestors being reconciled and descendants being fanout. + // This can propagate descendants faster than ancestors, making them orphans. However, transactions + // are picked for reconciliation if we consider they have been propagated enough, so in this case + // odds are that the peer already knows about the parent (and it's queued to be announced or reconciled to us). + Assume(m_txreconciliation); + const auto result = m_txreconciliation->AddToSet(pto->GetId(), Wtxid::FromUint256(hash)); + if (!result.m_succeeded) { + should_fanout = true; + if (const auto collision = result.m_collision; collision.has_value()) { + // In case of a collision, this loop will increase nRelayedTransactions twice + Assume(m_txreconciliation->TryRemovingFromSet(pto->GetId(), collision.value())); + add_to_inv_vec(CInv(MSG_WTX, collision.value())); + } + } + } + if (should_fanout) { + add_to_inv_vec(inv); + } + } + + if (fSendTrickle) { + // Ensure we'll respond to GETDATA requests for anything we've just announced + LOCK(m_mempool.cs); + tx_relay->m_last_inv_sequence = m_mempool.GetSequence(); + } } if (!vInv.empty()) MakeAndPushMessage(*pto, NetMsgType::INV, vInv); diff --git a/src/net_processing.h b/src/net_processing.h index 1156ec08f73..d3b2bd8b4df 100644 --- a/src/net_processing.h +++ b/src/net_processing.h @@ -119,7 +119,10 @@ public: virtual PeerManagerInfo GetInfo() const = 0; /** Relay transaction to all peers. */ - virtual void RelayTransaction(const uint256& txid, const uint256& wtxid) = 0; + virtual void RelayTransaction(const uint256& txid, const uint256& wtxid, bool consider_fanout) = 0; + + /** Get the amount of inbounds (first) and outbounds fanout peers (second). */ + virtual std::pair GetFanoutPeersCount() = 0; /** Send ping message to all peers */ virtual void SendPings() = 0; diff --git a/src/node/transaction.cpp b/src/node/transaction.cpp index 666597391e3..9377e640581 100644 --- a/src/node/transaction.cpp +++ b/src/node/transaction.cpp @@ -117,7 +117,8 @@ TransactionError BroadcastTransaction(NodeContext& node, const CTransactionRef t } if (relay) { - node.peerman->RelayTransaction(txid, wtxid); + // Always consider fanout for out own transactions + node.peerman->RelayTransaction(txid, wtxid, /*consider_fanout=*/true); } return TransactionError::OK; diff --git a/src/node/txreconciliation.cpp b/src/node/txreconciliation.cpp index e6e19c5756b..426d04c56a2 100644 --- a/src/node/txreconciliation.cpp +++ b/src/node/txreconciliation.cpp @@ -7,7 +7,9 @@ #include #include #include +#include +#include #include #include @@ -36,27 +38,49 @@ class TxReconciliationState { public: /** - * TODO: This field is public to ignore -Wunused-private-field. Make private once used in - * the following commits. - * * Reconciliation protocol assumes using one role consistently: either a reconciliation * initiator (requesting sketches), or responder (sending sketches). This defines our role, * based on the direction of the p2p connection. - * */ bool m_we_initiate; /** - * TODO: These fields are public to ignore -Wunused-private-field. Make private once used in - * the following commits. - * + * Store all wtxids which we would announce to the peer (policy checks passed, etc.) + * in this set instead of announcing them right away. When reconciliation time comes, we will + * compute a compressed representation of this set ("sketch") and use it to efficiently + * reconcile this set with a set on the peer's side. + */ + std::unordered_set m_local_set; + + /** + * Reconciliation sketches are computed over short transaction IDs. + * This is a cache of these IDs enabling faster lookups of full wtxids, + * useful when peer will ask for missing transactions by short IDs + * at the end of a reconciliation round. + * We also use this to keep track of short ID collisions. In case of a + * collision, both transactions should be fanout. + */ + std::map m_short_id_mapping; + + TxReconciliationState(bool we_initiate, uint64_t k0, uint64_t k1) : m_we_initiate(we_initiate), m_k0(k0), m_k1(k1) {} + + /** + * Reconciliation sketches are computed over short transaction IDs. + * Short IDs are salted with a link-specific constant value. + */ + uint32_t ComputeShortID(const uint256 wtxid) const + { + const uint64_t s = SipHashUint256(m_k0, m_k1, wtxid); + const uint32_t short_txid = 1 + (s & 0xFFFFFFFF); + return short_txid; + } + +private: + /** * These values are used to salt short IDs, which is necessary for transaction reconciliations. */ uint64_t m_k0, m_k1; - - TxReconciliationState(bool we_initiate, uint64_t k0, uint64_t k1) : m_we_initiate(we_initiate), m_k0(k0), m_k1(k1) {} }; - } // namespace /** Actual implementation for TxReconciliationTracker's data structure. */ @@ -76,16 +100,40 @@ private: */ std::unordered_map> m_states GUARDED_BY(m_txreconciliation_mutex); + /* + * Keeps track of how many of the registered peers are inbound. Updated on registering or + * forgetting peers. + */ + size_t m_inbounds_count GUARDED_BY(m_txreconciliation_mutex){0}; + + /** + * Collection of inbound peers selected for fanout. Should get periodically rotated using RotateInboundFanoutTargets. + */ + std::unordered_set m_inbound_fanout_targets; + + /** + * Next time m_inbound_fanout_targets need to be rotated. + */ + std::chrono::microseconds GUARDED_BY(m_txreconciliation_mutex) m_next_inbound_peer_rotation_time{0}; + + TxReconciliationState* GetRegisteredPeerState(NodeId peer_id) EXCLUSIVE_LOCKS_REQUIRED(m_txreconciliation_mutex) + { + AssertLockHeld(m_txreconciliation_mutex); + auto salt_or_state = m_states.find(peer_id); + if (salt_or_state == m_states.end()) return nullptr; + + return std::get_if(&salt_or_state->second); + } + public: explicit Impl(uint32_t recon_version) : m_recon_version(recon_version) {} - uint64_t PreRegisterPeer(NodeId peer_id) EXCLUSIVE_LOCKS_REQUIRED(!m_txreconciliation_mutex) + uint64_t PreRegisterPeer(NodeId peer_id, uint64_t local_salt) EXCLUSIVE_LOCKS_REQUIRED(!m_txreconciliation_mutex) { AssertLockNotHeld(m_txreconciliation_mutex); LOCK(m_txreconciliation_mutex); LogPrintLevel(BCLog::TXRECONCILIATION, BCLog::Level::Debug, "Pre-register peer=%d\n", peer_id); - const uint64_t local_salt{FastRandomContext().rand64()}; // We do this exactly once per peer (which are unique by NodeId, see GetNewNodeId) so it's // safe to assume we don't have this record yet. @@ -93,6 +141,21 @@ public: return local_salt; } + bool HasCollision(TxReconciliationState *peer_state, const Wtxid& wtxid, Wtxid& collision, uint32_t &short_id) EXCLUSIVE_LOCKS_REQUIRED(m_txreconciliation_mutex) + { + AssertLockHeld(m_txreconciliation_mutex); + + short_id = peer_state->ComputeShortID(wtxid); + const auto iter = peer_state->m_short_id_mapping.find(short_id); + + if (iter != peer_state->m_short_id_mapping.end()) { + collision = iter->second; + return true; + } + + return false; + } + ReconciliationRegisterResult RegisterPeer(NodeId peer_id, bool is_peer_inbound, uint32_t peer_recon_version, uint64_t remote_salt) EXCLUSIVE_LOCKS_REQUIRED(!m_txreconciliation_mutex) { @@ -121,19 +184,116 @@ public: peer_id, is_peer_inbound); const uint256 full_salt{ComputeSalt(local_salt, remote_salt)}; - recon_state->second = TxReconciliationState(!is_peer_inbound, full_salt.GetUint64(0), full_salt.GetUint64(1)); + + auto new_state = TxReconciliationState(!is_peer_inbound, full_salt.GetUint64(0), full_salt.GetUint64(1));; + m_states.erase(recon_state); + bool emplaced = m_states.emplace(peer_id, std::move(new_state)).second; + Assume(emplaced); + + if (is_peer_inbound && m_inbounds_count < std::numeric_limits::max()) { + ++m_inbounds_count; + + if (m_inbound_fanout_targets.size() < std::floor(m_inbounds_count * INBOUND_FANOUT_DESTINATIONS_FRACTION)) { + // Scale up fanout targets as we get more connections. Targets will be rotated periodically via RotateInboundFanoutTargets + if (FastRandomContext().randrange(10) <= INBOUND_FANOUT_DESTINATIONS_FRACTION * 10) { + m_inbound_fanout_targets.insert(peer_id); + } + } + } return ReconciliationRegisterResult::SUCCESS; } + AddToSetResult AddToSet(NodeId peer_id, const Wtxid& wtxid) EXCLUSIVE_LOCKS_REQUIRED(!m_txreconciliation_mutex) + { + AssertLockNotHeld(m_txreconciliation_mutex); + LOCK(m_txreconciliation_mutex); + auto peer_state = GetRegisteredPeerState(peer_id); + if (!peer_state) return AddToSetResult::Failed(); + + // Bypass if the wtxid is already in the set + if (peer_state->m_local_set.contains(wtxid)) { + LogPrintLevel(BCLog::TXRECONCILIATION, BCLog::Level::Debug, "%s already in reconciliation set for peer=%d. Bypassing.\n", + wtxid.ToString(), peer_id); + return AddToSetResult::Succeeded(); + } + + // Make sure there is no short id collision between the wtxid we are trying to add + // and any existing one in the reconciliation set + Wtxid collision; + uint32_t short_id; + if (HasCollision(peer_state, wtxid, collision, short_id)) { + return AddToSetResult::Collision(collision); + } + + // Transactions which don't make it to the set due to the limit are announced via fanout. + if (peer_state->m_local_set.size() >= MAX_RECONSET_SIZE) { + LogPrintLevel(BCLog::TXRECONCILIATION, BCLog::Level::Debug, "Reconciliation set maximum size reached for peer=%d.\n", peer_id); + return AddToSetResult::Failed(); + } + + if (peer_state->m_local_set.insert(wtxid).second) { + peer_state->m_short_id_mapping.emplace(short_id, wtxid); + LogPrintLevel(BCLog::TXRECONCILIATION, BCLog::Level::Debug, "Added %s to the reconciliation set for peer=%d. " + "Now the set contains %i transactions.\n", + wtxid.ToString(), peer_id, peer_state->m_local_set.size()); + } + return AddToSetResult::Succeeded(); + } + + bool IsTransactionInSet(NodeId peer_id, const Wtxid& wtxid) EXCLUSIVE_LOCKS_REQUIRED(!m_txreconciliation_mutex) + { + AssertLockNotHeld(m_txreconciliation_mutex); + LOCK(m_txreconciliation_mutex); + auto peer_state = GetRegisteredPeerState(peer_id); + if (!peer_state) return false; + + return peer_state->m_local_set.contains(wtxid); + } + + bool TryRemovingFromSet(NodeId peer_id, const Wtxid& wtxid) EXCLUSIVE_LOCKS_REQUIRED(!m_txreconciliation_mutex) + { + AssertLockNotHeld(m_txreconciliation_mutex); + LOCK(m_txreconciliation_mutex); + auto peer_state = GetRegisteredPeerState(peer_id); + if (!peer_state) return false; + + auto removed = peer_state->m_local_set.erase(wtxid) > 0; + if (removed) { + peer_state->m_short_id_mapping.erase(peer_state->ComputeShortID(wtxid)); + LogPrintLevel(BCLog::TXRECONCILIATION, BCLog::Level::Debug, "Removed %s from the reconciliation set for peer=%d. " + "Now the set contains %i transactions.\n", + wtxid.ToString(), peer_id, peer_state->m_local_set.size()); + } else { + LogPrintLevel(BCLog::TXRECONCILIATION, BCLog::Level::Debug, "Couldn't remove %s from the reconciliation set for peer=%d. " + "Transaction not found\n", + wtxid.ToString(), peer_id); + } + + return removed; + } + void ForgetPeer(NodeId peer_id) EXCLUSIVE_LOCKS_REQUIRED(!m_txreconciliation_mutex) { AssertLockNotHeld(m_txreconciliation_mutex); LOCK(m_txreconciliation_mutex); + const auto peer = m_states.find(peer_id); + if (peer == m_states.end()) return; + + const auto registered = std::get_if(&peer->second); + if (registered && !registered->m_we_initiate) { + Assert(m_inbounds_count > 0); + --m_inbounds_count; + m_inbound_fanout_targets.erase(peer_id); + } + if (m_states.erase(peer_id)) { LogPrintLevel(BCLog::TXRECONCILIATION, BCLog::Level::Debug, "Forget txreconciliation state of peer=%d\n", peer_id); } } + /** + * For calls within this class use GetRegisteredPeerState instead. + */ bool IsPeerRegistered(NodeId peer_id) const EXCLUSIVE_LOCKS_REQUIRED(!m_txreconciliation_mutex) { AssertLockNotHeld(m_txreconciliation_mutex); @@ -142,15 +302,89 @@ public: return (recon_state != m_states.end() && std::holds_alternative(recon_state->second)); } + + bool IsInboundFanoutTarget(NodeId peer_id) const EXCLUSIVE_LOCKS_REQUIRED(!m_txreconciliation_mutex) { + AssertLockNotHeld(m_txreconciliation_mutex); + LOCK(m_txreconciliation_mutex); + return m_inbound_fanout_targets.contains(peer_id); + } + + std::chrono::microseconds GetNextInboundPeerRotationTime() EXCLUSIVE_LOCKS_REQUIRED(!m_txreconciliation_mutex) { + AssertLockNotHeld(m_txreconciliation_mutex); + LOCK(m_txreconciliation_mutex); + return m_next_inbound_peer_rotation_time; + } + + void SetNextInboundPeerRotationTime(std::chrono::microseconds next_time) EXCLUSIVE_LOCKS_REQUIRED(!m_txreconciliation_mutex) { + AssertLockNotHeld(m_txreconciliation_mutex); + LOCK(m_txreconciliation_mutex); + m_next_inbound_peer_rotation_time = next_time; + } + + void RotateInboundFanoutTargets() EXCLUSIVE_LOCKS_REQUIRED(!m_txreconciliation_mutex) { + AssertLockNotHeld(m_txreconciliation_mutex); + LOCK(m_txreconciliation_mutex); + + auto targets_size = std::floor(m_inbounds_count * INBOUND_FANOUT_DESTINATIONS_FRACTION); + if (targets_size == 0) { + return; + } + + std::vector inbound_recon_peers; + inbound_recon_peers.reserve(m_inbounds_count); + + // Collect all inbound reconciling peers ids in a vector and shuffle it + for (const auto& [peer_id, op_peer_state]: m_states) { + const auto peer_state = std::get_if(&op_peer_state); + if (peer_state && !peer_state->m_we_initiate) { + inbound_recon_peers.push_back(peer_id); + } + } + std::shuffle(inbound_recon_peers.begin(), inbound_recon_peers.end(), FastRandomContext()); + + // Pick the new selection of inbound fanout peers + Assume(inbound_recon_peers.size() > targets_size); + m_inbound_fanout_targets.clear(); + m_inbound_fanout_targets.reserve(targets_size); + m_inbound_fanout_targets.insert(inbound_recon_peers.begin(), inbound_recon_peers.begin() + targets_size); + + } }; +AddToSetResult::AddToSetResult(bool succeeded, std::optional collision) +{ + m_succeeded = succeeded; + m_collision = collision; +} + +AddToSetResult AddToSetResult::Succeeded() +{ + return AddToSetResult(true, std::nullopt); +} + +AddToSetResult AddToSetResult::Failed() +{ + return AddToSetResult(false, std::nullopt); +} + +AddToSetResult AddToSetResult::Collision(Wtxid wtxid) +{ + return AddToSetResult(false, std::make_optional(wtxid)); +} + TxReconciliationTracker::TxReconciliationTracker(uint32_t recon_version) : m_impl{std::make_unique(recon_version)} {} TxReconciliationTracker::~TxReconciliationTracker() = default; uint64_t TxReconciliationTracker::PreRegisterPeer(NodeId peer_id) { - return m_impl->PreRegisterPeer(peer_id); + const uint64_t local_salt{FastRandomContext().rand64()}; + return m_impl->PreRegisterPeer(peer_id, local_salt); +} + +void TxReconciliationTracker::PreRegisterPeerWithSalt(NodeId peer_id, uint64_t local_salt) +{ + m_impl->PreRegisterPeer(peer_id, local_salt); } ReconciliationRegisterResult TxReconciliationTracker::RegisterPeer(NodeId peer_id, bool is_peer_inbound, @@ -159,6 +393,22 @@ ReconciliationRegisterResult TxReconciliationTracker::RegisterPeer(NodeId peer_i return m_impl->RegisterPeer(peer_id, is_peer_inbound, peer_recon_version, remote_salt); } +AddToSetResult TxReconciliationTracker::AddToSet(NodeId peer_id, const Wtxid& wtxid) +{ + return m_impl->AddToSet(peer_id, wtxid); +} + +bool TxReconciliationTracker::IsTransactionInSet(NodeId peer_id, const Wtxid& wtxid) +{ + return m_impl->IsTransactionInSet(peer_id, wtxid); +} + + +bool TxReconciliationTracker::TryRemovingFromSet(NodeId peer_id, const Wtxid& wtxid) +{ + return m_impl->TryRemovingFromSet(peer_id, wtxid); +} + void TxReconciliationTracker::ForgetPeer(NodeId peer_id) { m_impl->ForgetPeer(peer_id); @@ -168,3 +418,21 @@ bool TxReconciliationTracker::IsPeerRegistered(NodeId peer_id) const { return m_impl->IsPeerRegistered(peer_id); } + +bool TxReconciliationTracker::IsInboundFanoutTarget(NodeId peer_id) +{ + return m_impl->IsInboundFanoutTarget(peer_id); +} + +std::chrono::microseconds TxReconciliationTracker::GetNextInboundPeerRotationTime(){ + return m_impl->GetNextInboundPeerRotationTime(); +} + +void TxReconciliationTracker::SetNextInboundPeerRotationTime(std::chrono::microseconds next_time) { + return m_impl->SetNextInboundPeerRotationTime(next_time); +} + +void TxReconciliationTracker::RotateInboundFanoutTargets() +{ + return m_impl->RotateInboundFanoutTargets(); +} diff --git a/src/node/txreconciliation.h b/src/node/txreconciliation.h index 3bbb0773664..5bff9c6fa65 100644 --- a/src/node/txreconciliation.h +++ b/src/node/txreconciliation.h @@ -10,10 +10,29 @@ #include #include +#include /** Supported transaction reconciliation protocol version */ static constexpr uint32_t TXRECONCILIATION_VERSION{1}; +/** + * Maximum number of wtxids stored in a peer local set, bounded to protect the memory use of + * reconciliation sets and short ids mappings, and CPU used for sketch computation. + */ +constexpr size_t MAX_RECONSET_SIZE = 3000; + +/** + * Announce transactions via full wtxid to a limited number of inbound and outbound peers. + * Justification for these values are provided here: + * TODO: ADD link to justification based on simulation results */ +constexpr double INBOUND_FANOUT_DESTINATIONS_FRACTION = 0.1; +constexpr size_t OUTBOUND_FANOUT_THRESHOLD = 4; + +/** + * Interval for inbound peer fanout selection. The subset is rotated on a timer. + */ +static constexpr auto INBOUND_FANOUT_ROTATION_INTERVAL{10min}; + enum class ReconciliationRegisterResult { NOT_FOUND, SUCCESS, @@ -21,6 +40,23 @@ enum class ReconciliationRegisterResult { PROTOCOL_VIOLATION, }; +/** + * Record whether or not a wtxid was successfully added to a reconciliation set. + * In case of failure, check whether this was due to a shortid collision and record + * the colliding wtxid. +*/ +class AddToSetResult +{ + public: + bool m_succeeded; + std::optional m_collision; + + explicit AddToSetResult(bool added, std::optional conflict); + static AddToSetResult Succeeded(); + static AddToSetResult Failed(); + static AddToSetResult Collision(Wtxid); +}; + /** * Transaction reconciliation is a way for nodes to efficiently announce transactions. * This object keeps track of all txreconciliation-related communications with the peers. @@ -67,6 +103,11 @@ public: */ uint64_t PreRegisterPeer(NodeId peer_id); + /** + * For testing purposes only. This SHOULD NEVER be used in production. + */ + void PreRegisterPeerWithSalt(NodeId peer_id, uint64_t local_salt); + /** * Step 0. Once the peer agreed to reconcile txs with us, generate the state required to track * ongoing reconciliations. Must be called only after pre-registering the peer and only once. @@ -74,6 +115,25 @@ public: ReconciliationRegisterResult RegisterPeer(NodeId peer_id, bool is_peer_inbound, uint32_t peer_recon_version, uint64_t remote_salt); + /** + * Step 1. Add a to-be-announced transaction to the local reconciliation set of the target peer. + * Returns false if the set is at capacity, or if the set contains a colliding transaction (alongside + * the colliding wtxid). Returns true if the transaction is added to the set (or if it was already in it). + */ + AddToSetResult AddToSet(NodeId peer_id, const Wtxid& wtxid); + + /** + * Checks whether a transaction is part of the peer's reconciliation set. + */ + bool IsTransactionInSet(NodeId peer_id, const Wtxid& wtxid); + + /** + * Before Step 2, we might want to remove a wtxid from the reconciliation set, for example if + * the peer just announced the transaction to us. + * Returns whether the wtxid was removed. + */ + bool TryRemovingFromSet(NodeId peer_id, const Wtxid& wtxid); + /** * Attempts to forget txreconciliation-related state of the peer (if we previously stored any). * After this, we won't be able to reconcile transactions with the peer. @@ -84,6 +144,26 @@ public: * Check if a peer is registered to reconcile transactions with us. */ bool IsPeerRegistered(NodeId peer_id) const; + + /** + * Whether a given peer is currently flagged for fanout. + */ + bool IsInboundFanoutTarget(NodeId peer_id); + + /** + * Get the next time the inbound peer subset should be rotated. + */ + std::chrono::microseconds GetNextInboundPeerRotationTime(); + + /** + * Update the next inbound peer rotation time. + */ + void SetNextInboundPeerRotationTime(std::chrono::microseconds next_time); + + /** + * Picks a different subset of inbound peers to fanout to. + */ + void RotateInboundFanoutTargets(); }; #endif // BITCOIN_NODE_TXRECONCILIATION_H diff --git a/src/test/txreconciliation_tests.cpp b/src/test/txreconciliation_tests.cpp index e258e3353da..e0f0dbcfba5 100644 --- a/src/test/txreconciliation_tests.cpp +++ b/src/test/txreconciliation_tests.cpp @@ -81,4 +81,88 @@ BOOST_AUTO_TEST_CASE(IsPeerRegisteredTest) BOOST_CHECK(!tracker.IsPeerRegistered(peer_id0)); } +BOOST_AUTO_TEST_CASE(AddToSetTest) +{ + TxReconciliationTracker tracker(TXRECONCILIATION_VERSION); + NodeId peer_id0 = 0; + FastRandomContext frc{/*fDeterministic=*/true}; + + Wtxid wtxid{Wtxid::FromUint256(frc.rand256())}; + + // If the peer is not registered, adding to the set fails + BOOST_REQUIRE(!tracker.IsPeerRegistered(peer_id0)); + auto r = tracker.AddToSet(peer_id0, wtxid); + BOOST_REQUIRE(!r.m_succeeded); + BOOST_REQUIRE(!r.m_collision.has_value()); + + // As long as the peer is registered, adding a new wtxid to the set should work + tracker.PreRegisterPeer(peer_id0); + BOOST_REQUIRE_EQUAL(tracker.RegisterPeer(peer_id0, true, 1, 1), ReconciliationRegisterResult::SUCCESS); + BOOST_CHECK(tracker.IsPeerRegistered(peer_id0)); + + r = tracker.AddToSet(peer_id0, wtxid); + BOOST_REQUIRE(r.m_succeeded); + BOOST_REQUIRE(!r.m_collision.has_value()); + + // If the peer is dropped, adding wtxids to its set should fail + tracker.ForgetPeer(peer_id0); + Wtxid wtxid2{Wtxid::FromUint256(frc.rand256())}; + r = tracker.AddToSet(peer_id0, wtxid2); + BOOST_REQUIRE(!r.m_succeeded); + BOOST_REQUIRE(!r.m_collision.has_value()); + + NodeId peer_id1 = 1; + tracker.PreRegisterPeer(peer_id1); + BOOST_REQUIRE_EQUAL(tracker.RegisterPeer(peer_id1, true, 1, 1), ReconciliationRegisterResult::SUCCESS); + BOOST_CHECK(tracker.IsPeerRegistered(peer_id1)); + + // As long as the peer is registered, the transaction is not in the set, and there is no short id + // collision, adding should work + size_t added_txs = 0; + while (added_txs < MAX_RECONSET_SIZE) { + wtxid = Wtxid::FromUint256(frc.rand256()); + Wtxid collision; + + r = tracker.AddToSet(peer_id1, wtxid); + if (r.m_succeeded) { + BOOST_REQUIRE(!r.m_collision.has_value()); + ++added_txs; + } else { + BOOST_REQUIRE_EQUAL(r.m_collision.value(), collision); + } + } + + // Adding one more item will fail due to the set being full + r = tracker.AddToSet(peer_id1, Wtxid::FromUint256(frc.rand256())); + BOOST_REQUIRE(!r.m_succeeded); + BOOST_REQUIRE(!r.m_collision.has_value()); + + // Trying to add the same item twice will just bypass + r = tracker.AddToSet(peer_id1, wtxid); + BOOST_REQUIRE(r.m_succeeded); + BOOST_REQUIRE(!r.m_collision.has_value()); +} + +BOOST_AUTO_TEST_CASE(AddToSetCollisionTest) +{ + TxReconciliationTracker tracker(TXRECONCILIATION_VERSION); + NodeId peer_id0 = 0; + FastRandomContext frc{/*fDeterministic=*/true}; + + // Precompute collision + Wtxid wtxid{Wtxid::FromUint256(uint256("c70d778bccef36a81aed8da0b819d2bd28bd8653e56a5d40903df1a0ade0b876"))}; + Wtxid collision{Wtxid::FromUint256(uint256("ae52a6ecb8733fba1f7af6022a8b9dd327d7825054229fafcad7e03c38ae2a50"))}; + + // Register the peer with a predefined salt so we can force the collision + tracker.PreRegisterPeerWithSalt(peer_id0, 2); + BOOST_REQUIRE_EQUAL(tracker.RegisterPeer(peer_id0, true, 1, 1), ReconciliationRegisterResult::SUCCESS); + BOOST_CHECK(tracker.IsPeerRegistered(peer_id0)); + + // Once the peer is registered, we can try to add both transactions and check + BOOST_REQUIRE(tracker.AddToSet(peer_id0, wtxid).m_succeeded); + auto r = tracker.AddToSet(peer_id0, collision); + BOOST_REQUIRE(!r.m_succeeded); + BOOST_REQUIRE_EQUAL(r.m_collision.value(), wtxid); +} + BOOST_AUTO_TEST_SUITE_END()