mirror of
https://github.com/bitcoin/bitcoin.git
synced 2025-04-29 14:59:39 -04:00
Merge 08f6d4514a
into c5e44a0435
This commit is contained in:
commit
573f558643
6 changed files with 628 additions and 51 deletions
|
@ -536,7 +536,8 @@ public:
|
|||
std::vector<TxOrphanage::OrphanTxBase> GetOrphanTransactions() override EXCLUSIVE_LOCKS_REQUIRED(!m_tx_download_mutex);
|
||||
PeerManagerInfo GetInfo() const override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
|
||||
void SendPings() override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
|
||||
void RelayTransaction(const uint256& txid, const uint256& wtxid) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
|
||||
std::pair<size_t, size_t> GetFanoutPeersCount() override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
|
||||
void RelayTransaction(const uint256& txid, const uint256& wtxid, bool consider_fanout) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
|
||||
void SetBestBlock(int height, std::chrono::seconds time) override
|
||||
{
|
||||
m_best_height = height;
|
||||
|
@ -614,19 +615,19 @@ private:
|
|||
|
||||
/** Handle a transaction whose result was MempoolAcceptResult::ResultType::VALID.
|
||||
* Updates m_txrequest, m_orphanage, and vExtraTxnForCompact. Also queues the tx for relay. */
|
||||
void ProcessValidTx(NodeId nodeid, const CTransactionRef& tx, const std::list<CTransactionRef>& replaced_transactions)
|
||||
void ProcessValidTx(NodeId nodeid, const CTransactionRef& tx, const std::list<CTransactionRef>& replaced_transactions, bool consider_fanout)
|
||||
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, g_msgproc_mutex, m_tx_download_mutex);
|
||||
|
||||
/** Handle the results of package validation: calls ProcessValidTx and ProcessInvalidTx for
|
||||
* individual transactions, and caches rejection for the package as a group.
|
||||
*/
|
||||
void ProcessPackageResult(const node::PackageToValidate& package_to_validate, const PackageMempoolAcceptResult& package_result)
|
||||
void ProcessPackageResult(const node::PackageToValidate& package_to_validate, const PackageMempoolAcceptResult& package_result, bool consider_fanout)
|
||||
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, g_msgproc_mutex, m_tx_download_mutex);
|
||||
|
||||
/**
|
||||
* Reconsider orphan transactions after a parent has been accepted to the mempool.
|
||||
*
|
||||
* @peer[in] peer The peer whose orphan transactions we will reconsider. Generally only
|
||||
* @param[in] peer The peer whose orphan transactions we will reconsider. Generally only
|
||||
* one orphan will be reconsidered on each call of this function. If an
|
||||
* accepted orphan has orphaned children, those will need to be
|
||||
* reconsidered, creating more work, possibly for other peers.
|
||||
|
@ -637,6 +638,17 @@ private:
|
|||
bool ProcessOrphanTx(Peer& peer)
|
||||
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, g_msgproc_mutex, !m_tx_download_mutex);
|
||||
|
||||
/** Whether we should fanout to a given peer or not. Always returns true for non-Erlay peers
|
||||
* For Erlay-peers, if they are inbound, returns true as long as the peer has been selected for fanout.
|
||||
* If they are outbound, returns true as long as the transaction was received via fanout (further filtering will be performed
|
||||
* before sending out the next INV message to each peer).
|
||||
* Returns false otherwise.
|
||||
*
|
||||
* @param[in] peer The peer we are making the decision on
|
||||
* @param[in] consider_fanout Whether to consider fanout or not (only applies if the peer is outbound)
|
||||
*/
|
||||
bool ShouldFanoutTo(const PeerRef peer, bool consider_fanout) EXCLUSIVE_LOCKS_REQUIRED(m_peer_mutex);
|
||||
|
||||
/** Process a single headers message from a peer.
|
||||
*
|
||||
* @param[in] pfrom CNode of the peer
|
||||
|
@ -1581,7 +1593,8 @@ void PeerManagerImpl::ReattemptInitialBroadcast(CScheduler& scheduler)
|
|||
CTransactionRef tx = m_mempool.get(txid);
|
||||
|
||||
if (tx != nullptr) {
|
||||
RelayTransaction(txid, tx->GetWitnessHash());
|
||||
// Always consider fanout when relaying our own transactions
|
||||
RelayTransaction(txid, tx->GetWitnessHash(), /*consider_fanout=*/true);
|
||||
} else {
|
||||
m_mempool.RemoveUnbroadcastTx(txid, true);
|
||||
}
|
||||
|
@ -2148,12 +2161,44 @@ void PeerManagerImpl::SendPings()
|
|||
for(auto& it : m_peer_map) it.second->m_ping_queued = true;
|
||||
}
|
||||
|
||||
void PeerManagerImpl::RelayTransaction(const uint256& txid, const uint256& wtxid)
|
||||
std::pair<size_t, size_t> PeerManagerImpl::GetFanoutPeersCount()
|
||||
{
|
||||
|
||||
size_t inbounds_fanout_tx_relay = 0, outbounds_fanout_tx_relay = 0;
|
||||
|
||||
if (m_txreconciliation) {
|
||||
LOCK(m_peer_mutex);
|
||||
for(const auto& [peer_id, peer] : m_peer_map) {
|
||||
if (const auto tx_relay = peer->GetTxRelay()) {
|
||||
const bool peer_relays_txs = WITH_LOCK(tx_relay->m_bloom_filter_mutex, return tx_relay->m_relay_txs);
|
||||
if (peer_relays_txs && !m_txreconciliation->IsPeerRegistered(peer_id)) {
|
||||
peer->m_is_inbound ? ++inbounds_fanout_tx_relay : ++outbounds_fanout_tx_relay;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return std::pair(inbounds_fanout_tx_relay, outbounds_fanout_tx_relay);
|
||||
}
|
||||
|
||||
bool PeerManagerImpl::ShouldFanoutTo(const PeerRef peer, bool consider_fanout)
|
||||
{
|
||||
// We consider Erlay peers for fanout if they are within our inbound fanout targets, or if they are outbounds
|
||||
// and the transaction was NOT received via set reconciliation. For the latter group, further filtering
|
||||
// will be applied at relay time.
|
||||
if (m_txreconciliation && m_txreconciliation->IsPeerRegistered(peer->m_id)) {
|
||||
return (!peer->m_is_inbound && consider_fanout) || m_txreconciliation->IsInboundFanoutTarget(peer->m_id);
|
||||
} else {
|
||||
// For non-Erlay peers we always fanout (same applies if we do not support Erlay)
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
void PeerManagerImpl::RelayTransaction(const uint256& txid, const uint256& wtxid, bool consider_fanout)
|
||||
{
|
||||
LOCK(m_peer_mutex);
|
||||
for(auto& it : m_peer_map) {
|
||||
Peer& peer = *it.second;
|
||||
auto tx_relay = peer.GetTxRelay();
|
||||
for(auto& [peer_id, peer] : m_peer_map) {
|
||||
auto tx_relay = peer->GetTxRelay();
|
||||
if (!tx_relay) continue;
|
||||
|
||||
LOCK(tx_relay->m_tx_inventory_mutex);
|
||||
|
@ -2164,9 +2209,26 @@ void PeerManagerImpl::RelayTransaction(const uint256& txid, const uint256& wtxid
|
|||
// in the announcement.
|
||||
if (tx_relay->m_next_inv_send_time == 0s) continue;
|
||||
|
||||
const uint256& hash{peer.m_wtxid_relay ? wtxid : txid};
|
||||
const uint256& hash{peer->m_wtxid_relay ? wtxid : txid};
|
||||
if (!tx_relay->m_tx_inventory_known_filter.contains(hash)) {
|
||||
tx_relay->m_tx_inventory_to_send.insert(hash);
|
||||
bool fanout = ShouldFanoutTo(peer, consider_fanout);
|
||||
// FIXME: This bit here and the corresponding one in SendMessage are basically identical.
|
||||
// Check if there is a way to make them into a private function We would have to pass a
|
||||
// lambda that does the "insert into a collection part", which is what's different from both
|
||||
if (!fanout) {
|
||||
Assume(m_txreconciliation);
|
||||
const auto result = m_txreconciliation->AddToSet(peer_id, Wtxid::FromUint256(wtxid));
|
||||
if (!result.m_succeeded) {
|
||||
fanout = true;
|
||||
if (const auto collision = result.m_collision; collision.has_value()) {
|
||||
Assume(m_txreconciliation->TryRemovingFromSet(peer_id, collision.value()));
|
||||
tx_relay->m_tx_inventory_to_send.insert(collision.value().ToUint256());
|
||||
}
|
||||
}
|
||||
}
|
||||
if (fanout) {
|
||||
tx_relay->m_tx_inventory_to_send.insert(hash);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
@ -3024,7 +3086,7 @@ std::optional<node::PackageToValidate> PeerManagerImpl::ProcessInvalidTx(NodeId
|
|||
return package_to_validate;
|
||||
}
|
||||
|
||||
void PeerManagerImpl::ProcessValidTx(NodeId nodeid, const CTransactionRef& tx, const std::list<CTransactionRef>& replaced_transactions)
|
||||
void PeerManagerImpl::ProcessValidTx(NodeId nodeid, const CTransactionRef& tx, const std::list<CTransactionRef>& replaced_transactions, bool consider_fanout)
|
||||
{
|
||||
AssertLockNotHeld(m_peer_mutex);
|
||||
AssertLockHeld(g_msgproc_mutex);
|
||||
|
@ -3038,14 +3100,14 @@ void PeerManagerImpl::ProcessValidTx(NodeId nodeid, const CTransactionRef& tx, c
|
|||
tx->GetWitnessHash().ToString(),
|
||||
m_mempool.size(), m_mempool.DynamicMemoryUsage() / 1000);
|
||||
|
||||
RelayTransaction(tx->GetHash(), tx->GetWitnessHash());
|
||||
RelayTransaction(tx->GetHash(), tx->GetWitnessHash(), consider_fanout);
|
||||
|
||||
for (const CTransactionRef& removedTx : replaced_transactions) {
|
||||
AddToCompactExtraTransactions(removedTx);
|
||||
}
|
||||
}
|
||||
|
||||
void PeerManagerImpl::ProcessPackageResult(const node::PackageToValidate& package_to_validate, const PackageMempoolAcceptResult& package_result)
|
||||
void PeerManagerImpl::ProcessPackageResult(const node::PackageToValidate& package_to_validate, const PackageMempoolAcceptResult& package_result, bool consider_fanout)
|
||||
{
|
||||
AssertLockNotHeld(m_peer_mutex);
|
||||
AssertLockHeld(g_msgproc_mutex);
|
||||
|
@ -3075,7 +3137,7 @@ void PeerManagerImpl::ProcessPackageResult(const node::PackageToValidate& packag
|
|||
switch (tx_result.m_result_type) {
|
||||
case MempoolAcceptResult::ResultType::VALID:
|
||||
{
|
||||
ProcessValidTx(nodeid, tx, tx_result.m_replaced_transactions);
|
||||
ProcessValidTx(nodeid, tx, tx_result.m_replaced_transactions, consider_fanout);
|
||||
break;
|
||||
}
|
||||
case MempoolAcceptResult::ResultType::INVALID:
|
||||
|
@ -3118,7 +3180,15 @@ bool PeerManagerImpl::ProcessOrphanTx(Peer& peer)
|
|||
|
||||
if (result.m_result_type == MempoolAcceptResult::ResultType::VALID) {
|
||||
LogDebug(BCLog::TXPACKAGES, " accepted orphan tx %s (wtxid=%s)\n", orphanHash.ToString(), orphan_wtxid.ToString());
|
||||
ProcessValidTx(peer.m_id, porphanTx, result.m_replaced_transactions);
|
||||
// When processing orphans after getting a missing parent, we always consider faning the out.
|
||||
// TODO: This is a weird case. ProcessOrphanTx is triggered by the reception of a missing parent, but we do not keep track of
|
||||
// when the orphan was received, so it could be the case that we end up here when the orphan is still in its early propagation
|
||||
// state, or far after. If we are late, relaying the transaction may not be worth it, our peers may already know (reconciling
|
||||
// would even be worse than fanout, since it would simply add more roundtrips to the same outcome: annoincing a transaction that is
|
||||
// already known).
|
||||
// This should only happen if we connect to the network and receive a depending transaction late on its propagation.
|
||||
// However, it may be worth considering how to deal with this case in a followup to reduce unnecessary traffic.
|
||||
ProcessValidTx(peer.m_id, porphanTx, result.m_replaced_transactions, /*consider_fanout=*/true);
|
||||
return true;
|
||||
} else if (state.GetResult() != TxValidationResult::TX_MISSING_INPUTS) {
|
||||
LogDebug(BCLog::TXPACKAGES, " invalid orphan tx %s (wtxid=%s) from peer=%d. %s\n",
|
||||
|
@ -4262,6 +4332,9 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
|
|||
|
||||
LOCK2(cs_main, m_tx_download_mutex);
|
||||
|
||||
// TODO: Until the Erlay p2p flow is defined, all transactions are flagged for fanout
|
||||
bool consider_fanout = true;
|
||||
|
||||
const auto& [should_validate, package_to_validate] = m_txdownloadman.ReceivedTx(pfrom.GetId(), ptx);
|
||||
if (!should_validate) {
|
||||
if (pfrom.HasPermission(NetPermissionFlags::ForceRelay)) {
|
||||
|
@ -4274,7 +4347,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
|
|||
} else {
|
||||
LogPrintf("Force relaying tx %s (wtxid=%s) from peer=%d\n",
|
||||
tx.GetHash().ToString(), tx.GetWitnessHash().ToString(), pfrom.GetId());
|
||||
RelayTransaction(tx.GetHash(), tx.GetWitnessHash());
|
||||
RelayTransaction(tx.GetHash(), tx.GetWitnessHash(), consider_fanout);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -4282,7 +4355,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
|
|||
const auto package_result{ProcessNewPackage(m_chainman.ActiveChainstate(), m_mempool, package_to_validate->m_txns, /*test_accept=*/false, /*client_maxfeerate=*/std::nullopt)};
|
||||
LogDebug(BCLog::TXPACKAGES, "package evaluation for %s: %s\n", package_to_validate->ToString(),
|
||||
package_result.m_state.IsValid() ? "package accepted" : "package rejected");
|
||||
ProcessPackageResult(package_to_validate.value(), package_result);
|
||||
ProcessPackageResult(package_to_validate.value(), package_result, consider_fanout);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
@ -4294,7 +4367,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
|
|||
const TxValidationState& state = result.m_state;
|
||||
|
||||
if (result.m_result_type == MempoolAcceptResult::ResultType::VALID) {
|
||||
ProcessValidTx(pfrom.GetId(), ptx, result.m_replaced_transactions);
|
||||
ProcessValidTx(pfrom.GetId(), ptx, result.m_replaced_transactions, consider_fanout);
|
||||
pfrom.m_last_tx_time = GetTime<std::chrono::seconds>();
|
||||
}
|
||||
if (state.IsInvalid()) {
|
||||
|
@ -4302,7 +4375,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
|
|||
const auto package_result{ProcessNewPackage(m_chainman.ActiveChainstate(), m_mempool, package_to_validate->m_txns, /*test_accept=*/false, /*client_maxfeerate=*/std::nullopt)};
|
||||
LogDebug(BCLog::TXPACKAGES, "package evaluation for %s: %s\n", package_to_validate->ToString(),
|
||||
package_result.m_state.IsValid() ? "package accepted" : "package rejected");
|
||||
ProcessPackageResult(package_to_validate.value(), package_result);
|
||||
ProcessPackageResult(package_to_validate.value(), package_result, consider_fanout);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -5702,10 +5775,12 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
|
|||
peer->m_blocks_for_inv_relay.clear();
|
||||
}
|
||||
|
||||
// Check whether periodic sends should happen
|
||||
bool fSendTrickle = pto->HasPermission(NetPermissionFlags::NoBan);
|
||||
if (auto tx_relay = peer->GetTxRelay(); tx_relay != nullptr) {
|
||||
std::vector<std::pair<uint256, uint16_t>> to_be_announced{};
|
||||
{
|
||||
LOCK(tx_relay->m_tx_inventory_mutex);
|
||||
// Check whether periodic sends should happen
|
||||
bool fSendTrickle = pto->HasPermission(NetPermissionFlags::NoBan);
|
||||
if (tx_relay->m_next_inv_send_time < current_time) {
|
||||
fSendTrickle = true;
|
||||
if (pto->IsInboundConn()) {
|
||||
|
@ -5715,6 +5790,12 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
|
|||
}
|
||||
}
|
||||
|
||||
// Rotate inbound fanout targets if the timer has gone off
|
||||
if (m_txreconciliation && m_txreconciliation->GetNextInboundPeerRotationTime() < current_time) {
|
||||
m_txreconciliation->SetNextInboundPeerRotationTime(m_rng.rand_exp_duration(INBOUND_FANOUT_ROTATION_INTERVAL));
|
||||
m_txreconciliation->RotateInboundFanoutTargets();
|
||||
}
|
||||
|
||||
// Time to send but the peer has requested we not relay transactions.
|
||||
if (fSendTrickle) {
|
||||
LOCK(tx_relay->m_bloom_filter_mutex);
|
||||
|
@ -5779,7 +5860,6 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
|
|||
std::set<uint256>::iterator it = vInvTx.back();
|
||||
vInvTx.pop_back();
|
||||
uint256 hash = *it;
|
||||
CInv inv(peer->m_wtxid_relay ? MSG_WTX : MSG_TX, hash);
|
||||
// Remove it from the to-be-sent set
|
||||
tx_relay->m_tx_inventory_to_send.erase(it);
|
||||
// Check if not in the filter already
|
||||
|
@ -5787,7 +5867,7 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
|
|||
continue;
|
||||
}
|
||||
// Not in the mempool anymore? don't bother sending it.
|
||||
auto txinfo = m_mempool.info(ToGenTxid(inv));
|
||||
auto txinfo = m_mempool.info(GenTxid(peer->m_wtxid_relay ? GenTxid::Wtxid(hash) : GenTxid::Txid(hash)));
|
||||
if (!txinfo.tx) {
|
||||
continue;
|
||||
}
|
||||
|
@ -5796,20 +5876,81 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
|
|||
continue;
|
||||
}
|
||||
if (tx_relay->m_bloom_filter && !tx_relay->m_bloom_filter->IsRelevantAndUpdate(*txinfo.tx)) continue;
|
||||
// Send
|
||||
vInv.push_back(inv);
|
||||
// Flag to be sent.
|
||||
// Initialize with the counter at 0. This in only relevant for Erlay nodes and will be updated in the
|
||||
// following context after releasing m_tx_inventory_mutex
|
||||
to_be_announced.emplace_back(hash, 0);
|
||||
nRelayedTransactions++;
|
||||
if (vInv.size() == MAX_INV_SZ) {
|
||||
MakeAndPushMessage(*pto, NetMsgType::INV, vInv);
|
||||
vInv.clear();
|
||||
}
|
||||
tx_relay->m_tx_inventory_known_filter.insert(hash);
|
||||
}
|
||||
|
||||
// Ensure we'll respond to GETDATA requests for anything we've just announced
|
||||
LOCK(m_mempool.cs);
|
||||
tx_relay->m_last_inv_sequence = m_mempool.GetSequence();
|
||||
}
|
||||
} // Unlock the m_tx_inventory_mutex so we can count over m_peer_map
|
||||
|
||||
// Only care about fanout count if we support Erlay
|
||||
if (m_txreconciliation) {
|
||||
LOCK(m_peer_mutex);
|
||||
for (auto& [hash, out_fanout_count] : to_be_announced) {
|
||||
for (const auto& [cur_peer_id, cur_peer] : m_peer_map) {
|
||||
if (auto peer_tx_relay = cur_peer->GetTxRelay()) {
|
||||
LOCK(peer_tx_relay->m_tx_inventory_mutex);
|
||||
if (!pto->IsInboundConn() && peer_tx_relay->m_tx_inventory_known_filter.contains(hash)) {
|
||||
out_fanout_count+=1;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Re-lock
|
||||
LOCK(tx_relay->m_tx_inventory_mutex);
|
||||
for (auto& [hash, out_fanout_count] : to_be_announced) {
|
||||
// Send
|
||||
bool should_fanout = true;
|
||||
// For non-Erlay and inbound peer simply fanout. Erlay-enabled inbounds have been assigned transaction to reconcile
|
||||
// in RelayTransaction, so everything that was added to m_tx_inventory_to_send is to be fanout
|
||||
if (!pto->IsInboundConn() && m_txreconciliation && m_txreconciliation->IsPeerRegistered(pto->GetId())) {
|
||||
// For Erlay-enabled outbound peers we fanout based on how we have heard about this transaction
|
||||
// and how many announcements of this transactions have we sent and receivedx
|
||||
// TODO: If we are the transaction source, we should reduce the threshold by 1, since this the only case
|
||||
// where we are not accounting for at least one reception
|
||||
should_fanout = out_fanout_count <= OUTBOUND_FANOUT_THRESHOLD;
|
||||
}
|
||||
|
||||
CInv inv(peer->m_wtxid_relay ? MSG_WTX : MSG_TX, hash);
|
||||
auto add_to_inv_vec = [&](const CInv inv) EXCLUSIVE_LOCKS_REQUIRED(tx_relay->m_tx_inventory_mutex) {
|
||||
vInv.push_back(inv);
|
||||
if (vInv.size() == MAX_INV_SZ) {
|
||||
MakeAndPushMessage(*pto, NetMsgType::INV, vInv);
|
||||
vInv.clear();
|
||||
}
|
||||
tx_relay->m_tx_inventory_known_filter.insert(inv.hash);
|
||||
};
|
||||
|
||||
if (!should_fanout) {
|
||||
// Note we are not handling the case of ancestors being reconciled and descendants being fanout.
|
||||
// This can propagate descendants faster than ancestors, making them orphans. However, transactions
|
||||
// are picked for reconciliation if we consider they have been propagated enough, so in this case
|
||||
// odds are that the peer already knows about the parent (and it's queued to be announced or reconciled to us).
|
||||
Assume(m_txreconciliation);
|
||||
const auto result = m_txreconciliation->AddToSet(pto->GetId(), Wtxid::FromUint256(hash));
|
||||
if (!result.m_succeeded) {
|
||||
should_fanout = true;
|
||||
if (const auto collision = result.m_collision; collision.has_value()) {
|
||||
// In case of a collision, this loop will increase nRelayedTransactions twice
|
||||
Assume(m_txreconciliation->TryRemovingFromSet(pto->GetId(), collision.value()));
|
||||
add_to_inv_vec(CInv(MSG_WTX, collision.value()));
|
||||
}
|
||||
}
|
||||
}
|
||||
if (should_fanout) {
|
||||
add_to_inv_vec(inv);
|
||||
}
|
||||
}
|
||||
|
||||
if (fSendTrickle) {
|
||||
// Ensure we'll respond to GETDATA requests for anything we've just announced
|
||||
LOCK(m_mempool.cs);
|
||||
tx_relay->m_last_inv_sequence = m_mempool.GetSequence();
|
||||
}
|
||||
}
|
||||
if (!vInv.empty())
|
||||
MakeAndPushMessage(*pto, NetMsgType::INV, vInv);
|
||||
|
|
|
@ -119,7 +119,10 @@ public:
|
|||
virtual PeerManagerInfo GetInfo() const = 0;
|
||||
|
||||
/** Relay transaction to all peers. */
|
||||
virtual void RelayTransaction(const uint256& txid, const uint256& wtxid) = 0;
|
||||
virtual void RelayTransaction(const uint256& txid, const uint256& wtxid, bool consider_fanout) = 0;
|
||||
|
||||
/** Get the amount of inbounds (first) and outbounds fanout peers (second). */
|
||||
virtual std::pair<size_t, size_t> GetFanoutPeersCount() = 0;
|
||||
|
||||
/** Send ping message to all peers */
|
||||
virtual void SendPings() = 0;
|
||||
|
|
|
@ -117,7 +117,8 @@ TransactionError BroadcastTransaction(NodeContext& node, const CTransactionRef t
|
|||
}
|
||||
|
||||
if (relay) {
|
||||
node.peerman->RelayTransaction(txid, wtxid);
|
||||
// Always consider fanout for out own transactions
|
||||
node.peerman->RelayTransaction(txid, wtxid, /*consider_fanout=*/true);
|
||||
}
|
||||
|
||||
return TransactionError::OK;
|
||||
|
|
|
@ -7,7 +7,9 @@
|
|||
#include <common/system.h>
|
||||
#include <logging.h>
|
||||
#include <util/check.h>
|
||||
#include <util/hasher.h>
|
||||
|
||||
#include <cmath>
|
||||
#include <unordered_map>
|
||||
#include <variant>
|
||||
|
||||
|
@ -36,27 +38,49 @@ class TxReconciliationState
|
|||
{
|
||||
public:
|
||||
/**
|
||||
* TODO: This field is public to ignore -Wunused-private-field. Make private once used in
|
||||
* the following commits.
|
||||
*
|
||||
* Reconciliation protocol assumes using one role consistently: either a reconciliation
|
||||
* initiator (requesting sketches), or responder (sending sketches). This defines our role,
|
||||
* based on the direction of the p2p connection.
|
||||
*
|
||||
*/
|
||||
bool m_we_initiate;
|
||||
|
||||
/**
|
||||
* TODO: These fields are public to ignore -Wunused-private-field. Make private once used in
|
||||
* the following commits.
|
||||
*
|
||||
* Store all wtxids which we would announce to the peer (policy checks passed, etc.)
|
||||
* in this set instead of announcing them right away. When reconciliation time comes, we will
|
||||
* compute a compressed representation of this set ("sketch") and use it to efficiently
|
||||
* reconcile this set with a set on the peer's side.
|
||||
*/
|
||||
std::unordered_set<Wtxid, SaltedTxidHasher> m_local_set;
|
||||
|
||||
/**
|
||||
* Reconciliation sketches are computed over short transaction IDs.
|
||||
* This is a cache of these IDs enabling faster lookups of full wtxids,
|
||||
* useful when peer will ask for missing transactions by short IDs
|
||||
* at the end of a reconciliation round.
|
||||
* We also use this to keep track of short ID collisions. In case of a
|
||||
* collision, both transactions should be fanout.
|
||||
*/
|
||||
std::map<uint32_t, Wtxid> m_short_id_mapping;
|
||||
|
||||
TxReconciliationState(bool we_initiate, uint64_t k0, uint64_t k1) : m_we_initiate(we_initiate), m_k0(k0), m_k1(k1) {}
|
||||
|
||||
/**
|
||||
* Reconciliation sketches are computed over short transaction IDs.
|
||||
* Short IDs are salted with a link-specific constant value.
|
||||
*/
|
||||
uint32_t ComputeShortID(const uint256 wtxid) const
|
||||
{
|
||||
const uint64_t s = SipHashUint256(m_k0, m_k1, wtxid);
|
||||
const uint32_t short_txid = 1 + (s & 0xFFFFFFFF);
|
||||
return short_txid;
|
||||
}
|
||||
|
||||
private:
|
||||
/**
|
||||
* These values are used to salt short IDs, which is necessary for transaction reconciliations.
|
||||
*/
|
||||
uint64_t m_k0, m_k1;
|
||||
|
||||
TxReconciliationState(bool we_initiate, uint64_t k0, uint64_t k1) : m_we_initiate(we_initiate), m_k0(k0), m_k1(k1) {}
|
||||
};
|
||||
|
||||
} // namespace
|
||||
|
||||
/** Actual implementation for TxReconciliationTracker's data structure. */
|
||||
|
@ -76,16 +100,40 @@ private:
|
|||
*/
|
||||
std::unordered_map<NodeId, std::variant<uint64_t, TxReconciliationState>> m_states GUARDED_BY(m_txreconciliation_mutex);
|
||||
|
||||
/*
|
||||
* Keeps track of how many of the registered peers are inbound. Updated on registering or
|
||||
* forgetting peers.
|
||||
*/
|
||||
size_t m_inbounds_count GUARDED_BY(m_txreconciliation_mutex){0};
|
||||
|
||||
/**
|
||||
* Collection of inbound peers selected for fanout. Should get periodically rotated using RotateInboundFanoutTargets.
|
||||
*/
|
||||
std::unordered_set<NodeId> m_inbound_fanout_targets;
|
||||
|
||||
/**
|
||||
* Next time m_inbound_fanout_targets need to be rotated.
|
||||
*/
|
||||
std::chrono::microseconds GUARDED_BY(m_txreconciliation_mutex) m_next_inbound_peer_rotation_time{0};
|
||||
|
||||
TxReconciliationState* GetRegisteredPeerState(NodeId peer_id) EXCLUSIVE_LOCKS_REQUIRED(m_txreconciliation_mutex)
|
||||
{
|
||||
AssertLockHeld(m_txreconciliation_mutex);
|
||||
auto salt_or_state = m_states.find(peer_id);
|
||||
if (salt_or_state == m_states.end()) return nullptr;
|
||||
|
||||
return std::get_if<TxReconciliationState>(&salt_or_state->second);
|
||||
}
|
||||
|
||||
public:
|
||||
explicit Impl(uint32_t recon_version) : m_recon_version(recon_version) {}
|
||||
|
||||
uint64_t PreRegisterPeer(NodeId peer_id) EXCLUSIVE_LOCKS_REQUIRED(!m_txreconciliation_mutex)
|
||||
uint64_t PreRegisterPeer(NodeId peer_id, uint64_t local_salt) EXCLUSIVE_LOCKS_REQUIRED(!m_txreconciliation_mutex)
|
||||
{
|
||||
AssertLockNotHeld(m_txreconciliation_mutex);
|
||||
LOCK(m_txreconciliation_mutex);
|
||||
|
||||
LogPrintLevel(BCLog::TXRECONCILIATION, BCLog::Level::Debug, "Pre-register peer=%d\n", peer_id);
|
||||
const uint64_t local_salt{FastRandomContext().rand64()};
|
||||
|
||||
// We do this exactly once per peer (which are unique by NodeId, see GetNewNodeId) so it's
|
||||
// safe to assume we don't have this record yet.
|
||||
|
@ -93,6 +141,21 @@ public:
|
|||
return local_salt;
|
||||
}
|
||||
|
||||
bool HasCollision(TxReconciliationState *peer_state, const Wtxid& wtxid, Wtxid& collision, uint32_t &short_id) EXCLUSIVE_LOCKS_REQUIRED(m_txreconciliation_mutex)
|
||||
{
|
||||
AssertLockHeld(m_txreconciliation_mutex);
|
||||
|
||||
short_id = peer_state->ComputeShortID(wtxid);
|
||||
const auto iter = peer_state->m_short_id_mapping.find(short_id);
|
||||
|
||||
if (iter != peer_state->m_short_id_mapping.end()) {
|
||||
collision = iter->second;
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
ReconciliationRegisterResult RegisterPeer(NodeId peer_id, bool is_peer_inbound, uint32_t peer_recon_version,
|
||||
uint64_t remote_salt) EXCLUSIVE_LOCKS_REQUIRED(!m_txreconciliation_mutex)
|
||||
{
|
||||
|
@ -121,19 +184,116 @@ public:
|
|||
peer_id, is_peer_inbound);
|
||||
|
||||
const uint256 full_salt{ComputeSalt(local_salt, remote_salt)};
|
||||
recon_state->second = TxReconciliationState(!is_peer_inbound, full_salt.GetUint64(0), full_salt.GetUint64(1));
|
||||
|
||||
auto new_state = TxReconciliationState(!is_peer_inbound, full_salt.GetUint64(0), full_salt.GetUint64(1));;
|
||||
m_states.erase(recon_state);
|
||||
bool emplaced = m_states.emplace(peer_id, std::move(new_state)).second;
|
||||
Assume(emplaced);
|
||||
|
||||
if (is_peer_inbound && m_inbounds_count < std::numeric_limits<size_t>::max()) {
|
||||
++m_inbounds_count;
|
||||
|
||||
if (m_inbound_fanout_targets.size() < std::floor(m_inbounds_count * INBOUND_FANOUT_DESTINATIONS_FRACTION)) {
|
||||
// Scale up fanout targets as we get more connections. Targets will be rotated periodically via RotateInboundFanoutTargets
|
||||
if (FastRandomContext().randrange(10) <= INBOUND_FANOUT_DESTINATIONS_FRACTION * 10) {
|
||||
m_inbound_fanout_targets.insert(peer_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
return ReconciliationRegisterResult::SUCCESS;
|
||||
}
|
||||
|
||||
AddToSetResult AddToSet(NodeId peer_id, const Wtxid& wtxid) EXCLUSIVE_LOCKS_REQUIRED(!m_txreconciliation_mutex)
|
||||
{
|
||||
AssertLockNotHeld(m_txreconciliation_mutex);
|
||||
LOCK(m_txreconciliation_mutex);
|
||||
auto peer_state = GetRegisteredPeerState(peer_id);
|
||||
if (!peer_state) return AddToSetResult::Failed();
|
||||
|
||||
// Bypass if the wtxid is already in the set
|
||||
if (peer_state->m_local_set.contains(wtxid)) {
|
||||
LogPrintLevel(BCLog::TXRECONCILIATION, BCLog::Level::Debug, "%s already in reconciliation set for peer=%d. Bypassing.\n",
|
||||
wtxid.ToString(), peer_id);
|
||||
return AddToSetResult::Succeeded();
|
||||
}
|
||||
|
||||
// Make sure there is no short id collision between the wtxid we are trying to add
|
||||
// and any existing one in the reconciliation set
|
||||
Wtxid collision;
|
||||
uint32_t short_id;
|
||||
if (HasCollision(peer_state, wtxid, collision, short_id)) {
|
||||
return AddToSetResult::Collision(collision);
|
||||
}
|
||||
|
||||
// Transactions which don't make it to the set due to the limit are announced via fanout.
|
||||
if (peer_state->m_local_set.size() >= MAX_RECONSET_SIZE) {
|
||||
LogPrintLevel(BCLog::TXRECONCILIATION, BCLog::Level::Debug, "Reconciliation set maximum size reached for peer=%d.\n", peer_id);
|
||||
return AddToSetResult::Failed();
|
||||
}
|
||||
|
||||
if (peer_state->m_local_set.insert(wtxid).second) {
|
||||
peer_state->m_short_id_mapping.emplace(short_id, wtxid);
|
||||
LogPrintLevel(BCLog::TXRECONCILIATION, BCLog::Level::Debug, "Added %s to the reconciliation set for peer=%d. "
|
||||
"Now the set contains %i transactions.\n",
|
||||
wtxid.ToString(), peer_id, peer_state->m_local_set.size());
|
||||
}
|
||||
return AddToSetResult::Succeeded();
|
||||
}
|
||||
|
||||
bool IsTransactionInSet(NodeId peer_id, const Wtxid& wtxid) EXCLUSIVE_LOCKS_REQUIRED(!m_txreconciliation_mutex)
|
||||
{
|
||||
AssertLockNotHeld(m_txreconciliation_mutex);
|
||||
LOCK(m_txreconciliation_mutex);
|
||||
auto peer_state = GetRegisteredPeerState(peer_id);
|
||||
if (!peer_state) return false;
|
||||
|
||||
return peer_state->m_local_set.contains(wtxid);
|
||||
}
|
||||
|
||||
bool TryRemovingFromSet(NodeId peer_id, const Wtxid& wtxid) EXCLUSIVE_LOCKS_REQUIRED(!m_txreconciliation_mutex)
|
||||
{
|
||||
AssertLockNotHeld(m_txreconciliation_mutex);
|
||||
LOCK(m_txreconciliation_mutex);
|
||||
auto peer_state = GetRegisteredPeerState(peer_id);
|
||||
if (!peer_state) return false;
|
||||
|
||||
auto removed = peer_state->m_local_set.erase(wtxid) > 0;
|
||||
if (removed) {
|
||||
peer_state->m_short_id_mapping.erase(peer_state->ComputeShortID(wtxid));
|
||||
LogPrintLevel(BCLog::TXRECONCILIATION, BCLog::Level::Debug, "Removed %s from the reconciliation set for peer=%d. "
|
||||
"Now the set contains %i transactions.\n",
|
||||
wtxid.ToString(), peer_id, peer_state->m_local_set.size());
|
||||
} else {
|
||||
LogPrintLevel(BCLog::TXRECONCILIATION, BCLog::Level::Debug, "Couldn't remove %s from the reconciliation set for peer=%d. "
|
||||
"Transaction not found\n",
|
||||
wtxid.ToString(), peer_id);
|
||||
}
|
||||
|
||||
return removed;
|
||||
}
|
||||
|
||||
void ForgetPeer(NodeId peer_id) EXCLUSIVE_LOCKS_REQUIRED(!m_txreconciliation_mutex)
|
||||
{
|
||||
AssertLockNotHeld(m_txreconciliation_mutex);
|
||||
LOCK(m_txreconciliation_mutex);
|
||||
const auto peer = m_states.find(peer_id);
|
||||
if (peer == m_states.end()) return;
|
||||
|
||||
const auto registered = std::get_if<TxReconciliationState>(&peer->second);
|
||||
if (registered && !registered->m_we_initiate) {
|
||||
Assert(m_inbounds_count > 0);
|
||||
--m_inbounds_count;
|
||||
m_inbound_fanout_targets.erase(peer_id);
|
||||
}
|
||||
|
||||
if (m_states.erase(peer_id)) {
|
||||
LogPrintLevel(BCLog::TXRECONCILIATION, BCLog::Level::Debug, "Forget txreconciliation state of peer=%d\n", peer_id);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* For calls within this class use GetRegisteredPeerState instead.
|
||||
*/
|
||||
bool IsPeerRegistered(NodeId peer_id) const EXCLUSIVE_LOCKS_REQUIRED(!m_txreconciliation_mutex)
|
||||
{
|
||||
AssertLockNotHeld(m_txreconciliation_mutex);
|
||||
|
@ -142,15 +302,89 @@ public:
|
|||
return (recon_state != m_states.end() &&
|
||||
std::holds_alternative<TxReconciliationState>(recon_state->second));
|
||||
}
|
||||
|
||||
bool IsInboundFanoutTarget(NodeId peer_id) const EXCLUSIVE_LOCKS_REQUIRED(!m_txreconciliation_mutex) {
|
||||
AssertLockNotHeld(m_txreconciliation_mutex);
|
||||
LOCK(m_txreconciliation_mutex);
|
||||
return m_inbound_fanout_targets.contains(peer_id);
|
||||
}
|
||||
|
||||
std::chrono::microseconds GetNextInboundPeerRotationTime() EXCLUSIVE_LOCKS_REQUIRED(!m_txreconciliation_mutex) {
|
||||
AssertLockNotHeld(m_txreconciliation_mutex);
|
||||
LOCK(m_txreconciliation_mutex);
|
||||
return m_next_inbound_peer_rotation_time;
|
||||
}
|
||||
|
||||
void SetNextInboundPeerRotationTime(std::chrono::microseconds next_time) EXCLUSIVE_LOCKS_REQUIRED(!m_txreconciliation_mutex) {
|
||||
AssertLockNotHeld(m_txreconciliation_mutex);
|
||||
LOCK(m_txreconciliation_mutex);
|
||||
m_next_inbound_peer_rotation_time = next_time;
|
||||
}
|
||||
|
||||
void RotateInboundFanoutTargets() EXCLUSIVE_LOCKS_REQUIRED(!m_txreconciliation_mutex) {
|
||||
AssertLockNotHeld(m_txreconciliation_mutex);
|
||||
LOCK(m_txreconciliation_mutex);
|
||||
|
||||
auto targets_size = std::floor(m_inbounds_count * INBOUND_FANOUT_DESTINATIONS_FRACTION);
|
||||
if (targets_size == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
std::vector<NodeId> inbound_recon_peers;
|
||||
inbound_recon_peers.reserve(m_inbounds_count);
|
||||
|
||||
// Collect all inbound reconciling peers ids in a vector and shuffle it
|
||||
for (const auto& [peer_id, op_peer_state]: m_states) {
|
||||
const auto peer_state = std::get_if<TxReconciliationState>(&op_peer_state);
|
||||
if (peer_state && !peer_state->m_we_initiate) {
|
||||
inbound_recon_peers.push_back(peer_id);
|
||||
}
|
||||
}
|
||||
std::shuffle(inbound_recon_peers.begin(), inbound_recon_peers.end(), FastRandomContext());
|
||||
|
||||
// Pick the new selection of inbound fanout peers
|
||||
Assume(inbound_recon_peers.size() > targets_size);
|
||||
m_inbound_fanout_targets.clear();
|
||||
m_inbound_fanout_targets.reserve(targets_size);
|
||||
m_inbound_fanout_targets.insert(inbound_recon_peers.begin(), inbound_recon_peers.begin() + targets_size);
|
||||
|
||||
}
|
||||
};
|
||||
|
||||
AddToSetResult::AddToSetResult(bool succeeded, std::optional<Wtxid> collision)
|
||||
{
|
||||
m_succeeded = succeeded;
|
||||
m_collision = collision;
|
||||
}
|
||||
|
||||
AddToSetResult AddToSetResult::Succeeded()
|
||||
{
|
||||
return AddToSetResult(true, std::nullopt);
|
||||
}
|
||||
|
||||
AddToSetResult AddToSetResult::Failed()
|
||||
{
|
||||
return AddToSetResult(false, std::nullopt);
|
||||
}
|
||||
|
||||
AddToSetResult AddToSetResult::Collision(Wtxid wtxid)
|
||||
{
|
||||
return AddToSetResult(false, std::make_optional(wtxid));
|
||||
}
|
||||
|
||||
TxReconciliationTracker::TxReconciliationTracker(uint32_t recon_version) : m_impl{std::make_unique<TxReconciliationTracker::Impl>(recon_version)} {}
|
||||
|
||||
TxReconciliationTracker::~TxReconciliationTracker() = default;
|
||||
|
||||
uint64_t TxReconciliationTracker::PreRegisterPeer(NodeId peer_id)
|
||||
{
|
||||
return m_impl->PreRegisterPeer(peer_id);
|
||||
const uint64_t local_salt{FastRandomContext().rand64()};
|
||||
return m_impl->PreRegisterPeer(peer_id, local_salt);
|
||||
}
|
||||
|
||||
void TxReconciliationTracker::PreRegisterPeerWithSalt(NodeId peer_id, uint64_t local_salt)
|
||||
{
|
||||
m_impl->PreRegisterPeer(peer_id, local_salt);
|
||||
}
|
||||
|
||||
ReconciliationRegisterResult TxReconciliationTracker::RegisterPeer(NodeId peer_id, bool is_peer_inbound,
|
||||
|
@ -159,6 +393,22 @@ ReconciliationRegisterResult TxReconciliationTracker::RegisterPeer(NodeId peer_i
|
|||
return m_impl->RegisterPeer(peer_id, is_peer_inbound, peer_recon_version, remote_salt);
|
||||
}
|
||||
|
||||
AddToSetResult TxReconciliationTracker::AddToSet(NodeId peer_id, const Wtxid& wtxid)
|
||||
{
|
||||
return m_impl->AddToSet(peer_id, wtxid);
|
||||
}
|
||||
|
||||
bool TxReconciliationTracker::IsTransactionInSet(NodeId peer_id, const Wtxid& wtxid)
|
||||
{
|
||||
return m_impl->IsTransactionInSet(peer_id, wtxid);
|
||||
}
|
||||
|
||||
|
||||
bool TxReconciliationTracker::TryRemovingFromSet(NodeId peer_id, const Wtxid& wtxid)
|
||||
{
|
||||
return m_impl->TryRemovingFromSet(peer_id, wtxid);
|
||||
}
|
||||
|
||||
void TxReconciliationTracker::ForgetPeer(NodeId peer_id)
|
||||
{
|
||||
m_impl->ForgetPeer(peer_id);
|
||||
|
@ -168,3 +418,21 @@ bool TxReconciliationTracker::IsPeerRegistered(NodeId peer_id) const
|
|||
{
|
||||
return m_impl->IsPeerRegistered(peer_id);
|
||||
}
|
||||
|
||||
bool TxReconciliationTracker::IsInboundFanoutTarget(NodeId peer_id)
|
||||
{
|
||||
return m_impl->IsInboundFanoutTarget(peer_id);
|
||||
}
|
||||
|
||||
std::chrono::microseconds TxReconciliationTracker::GetNextInboundPeerRotationTime(){
|
||||
return m_impl->GetNextInboundPeerRotationTime();
|
||||
}
|
||||
|
||||
void TxReconciliationTracker::SetNextInboundPeerRotationTime(std::chrono::microseconds next_time) {
|
||||
return m_impl->SetNextInboundPeerRotationTime(next_time);
|
||||
}
|
||||
|
||||
void TxReconciliationTracker::RotateInboundFanoutTargets()
|
||||
{
|
||||
return m_impl->RotateInboundFanoutTargets();
|
||||
}
|
||||
|
|
|
@ -10,10 +10,29 @@
|
|||
|
||||
#include <memory>
|
||||
#include <tuple>
|
||||
#include <optional>
|
||||
|
||||
/** Supported transaction reconciliation protocol version */
|
||||
static constexpr uint32_t TXRECONCILIATION_VERSION{1};
|
||||
|
||||
/**
|
||||
* Maximum number of wtxids stored in a peer local set, bounded to protect the memory use of
|
||||
* reconciliation sets and short ids mappings, and CPU used for sketch computation.
|
||||
*/
|
||||
constexpr size_t MAX_RECONSET_SIZE = 3000;
|
||||
|
||||
/**
|
||||
* Announce transactions via full wtxid to a limited number of inbound and outbound peers.
|
||||
* Justification for these values are provided here:
|
||||
* TODO: ADD link to justification based on simulation results */
|
||||
constexpr double INBOUND_FANOUT_DESTINATIONS_FRACTION = 0.1;
|
||||
constexpr size_t OUTBOUND_FANOUT_THRESHOLD = 4;
|
||||
|
||||
/**
|
||||
* Interval for inbound peer fanout selection. The subset is rotated on a timer.
|
||||
*/
|
||||
static constexpr auto INBOUND_FANOUT_ROTATION_INTERVAL{10min};
|
||||
|
||||
enum class ReconciliationRegisterResult {
|
||||
NOT_FOUND,
|
||||
SUCCESS,
|
||||
|
@ -21,6 +40,23 @@ enum class ReconciliationRegisterResult {
|
|||
PROTOCOL_VIOLATION,
|
||||
};
|
||||
|
||||
/**
|
||||
* Record whether or not a wtxid was successfully added to a reconciliation set.
|
||||
* In case of failure, check whether this was due to a shortid collision and record
|
||||
* the colliding wtxid.
|
||||
*/
|
||||
class AddToSetResult
|
||||
{
|
||||
public:
|
||||
bool m_succeeded;
|
||||
std::optional<Wtxid> m_collision;
|
||||
|
||||
explicit AddToSetResult(bool added, std::optional<Wtxid> conflict);
|
||||
static AddToSetResult Succeeded();
|
||||
static AddToSetResult Failed();
|
||||
static AddToSetResult Collision(Wtxid);
|
||||
};
|
||||
|
||||
/**
|
||||
* Transaction reconciliation is a way for nodes to efficiently announce transactions.
|
||||
* This object keeps track of all txreconciliation-related communications with the peers.
|
||||
|
@ -67,6 +103,11 @@ public:
|
|||
*/
|
||||
uint64_t PreRegisterPeer(NodeId peer_id);
|
||||
|
||||
/**
|
||||
* For testing purposes only. This SHOULD NEVER be used in production.
|
||||
*/
|
||||
void PreRegisterPeerWithSalt(NodeId peer_id, uint64_t local_salt);
|
||||
|
||||
/**
|
||||
* Step 0. Once the peer agreed to reconcile txs with us, generate the state required to track
|
||||
* ongoing reconciliations. Must be called only after pre-registering the peer and only once.
|
||||
|
@ -74,6 +115,25 @@ public:
|
|||
ReconciliationRegisterResult RegisterPeer(NodeId peer_id, bool is_peer_inbound,
|
||||
uint32_t peer_recon_version, uint64_t remote_salt);
|
||||
|
||||
/**
|
||||
* Step 1. Add a to-be-announced transaction to the local reconciliation set of the target peer.
|
||||
* Returns false if the set is at capacity, or if the set contains a colliding transaction (alongside
|
||||
* the colliding wtxid). Returns true if the transaction is added to the set (or if it was already in it).
|
||||
*/
|
||||
AddToSetResult AddToSet(NodeId peer_id, const Wtxid& wtxid);
|
||||
|
||||
/**
|
||||
* Checks whether a transaction is part of the peer's reconciliation set.
|
||||
*/
|
||||
bool IsTransactionInSet(NodeId peer_id, const Wtxid& wtxid);
|
||||
|
||||
/**
|
||||
* Before Step 2, we might want to remove a wtxid from the reconciliation set, for example if
|
||||
* the peer just announced the transaction to us.
|
||||
* Returns whether the wtxid was removed.
|
||||
*/
|
||||
bool TryRemovingFromSet(NodeId peer_id, const Wtxid& wtxid);
|
||||
|
||||
/**
|
||||
* Attempts to forget txreconciliation-related state of the peer (if we previously stored any).
|
||||
* After this, we won't be able to reconcile transactions with the peer.
|
||||
|
@ -84,6 +144,26 @@ public:
|
|||
* Check if a peer is registered to reconcile transactions with us.
|
||||
*/
|
||||
bool IsPeerRegistered(NodeId peer_id) const;
|
||||
|
||||
/**
|
||||
* Whether a given peer is currently flagged for fanout.
|
||||
*/
|
||||
bool IsInboundFanoutTarget(NodeId peer_id);
|
||||
|
||||
/**
|
||||
* Get the next time the inbound peer subset should be rotated.
|
||||
*/
|
||||
std::chrono::microseconds GetNextInboundPeerRotationTime();
|
||||
|
||||
/**
|
||||
* Update the next inbound peer rotation time.
|
||||
*/
|
||||
void SetNextInboundPeerRotationTime(std::chrono::microseconds next_time);
|
||||
|
||||
/**
|
||||
* Picks a different subset of inbound peers to fanout to.
|
||||
*/
|
||||
void RotateInboundFanoutTargets();
|
||||
};
|
||||
|
||||
#endif // BITCOIN_NODE_TXRECONCILIATION_H
|
||||
|
|
|
@ -81,4 +81,88 @@ BOOST_AUTO_TEST_CASE(IsPeerRegisteredTest)
|
|||
BOOST_CHECK(!tracker.IsPeerRegistered(peer_id0));
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(AddToSetTest)
|
||||
{
|
||||
TxReconciliationTracker tracker(TXRECONCILIATION_VERSION);
|
||||
NodeId peer_id0 = 0;
|
||||
FastRandomContext frc{/*fDeterministic=*/true};
|
||||
|
||||
Wtxid wtxid{Wtxid::FromUint256(frc.rand256())};
|
||||
|
||||
// If the peer is not registered, adding to the set fails
|
||||
BOOST_REQUIRE(!tracker.IsPeerRegistered(peer_id0));
|
||||
auto r = tracker.AddToSet(peer_id0, wtxid);
|
||||
BOOST_REQUIRE(!r.m_succeeded);
|
||||
BOOST_REQUIRE(!r.m_collision.has_value());
|
||||
|
||||
// As long as the peer is registered, adding a new wtxid to the set should work
|
||||
tracker.PreRegisterPeer(peer_id0);
|
||||
BOOST_REQUIRE_EQUAL(tracker.RegisterPeer(peer_id0, true, 1, 1), ReconciliationRegisterResult::SUCCESS);
|
||||
BOOST_CHECK(tracker.IsPeerRegistered(peer_id0));
|
||||
|
||||
r = tracker.AddToSet(peer_id0, wtxid);
|
||||
BOOST_REQUIRE(r.m_succeeded);
|
||||
BOOST_REQUIRE(!r.m_collision.has_value());
|
||||
|
||||
// If the peer is dropped, adding wtxids to its set should fail
|
||||
tracker.ForgetPeer(peer_id0);
|
||||
Wtxid wtxid2{Wtxid::FromUint256(frc.rand256())};
|
||||
r = tracker.AddToSet(peer_id0, wtxid2);
|
||||
BOOST_REQUIRE(!r.m_succeeded);
|
||||
BOOST_REQUIRE(!r.m_collision.has_value());
|
||||
|
||||
NodeId peer_id1 = 1;
|
||||
tracker.PreRegisterPeer(peer_id1);
|
||||
BOOST_REQUIRE_EQUAL(tracker.RegisterPeer(peer_id1, true, 1, 1), ReconciliationRegisterResult::SUCCESS);
|
||||
BOOST_CHECK(tracker.IsPeerRegistered(peer_id1));
|
||||
|
||||
// As long as the peer is registered, the transaction is not in the set, and there is no short id
|
||||
// collision, adding should work
|
||||
size_t added_txs = 0;
|
||||
while (added_txs < MAX_RECONSET_SIZE) {
|
||||
wtxid = Wtxid::FromUint256(frc.rand256());
|
||||
Wtxid collision;
|
||||
|
||||
r = tracker.AddToSet(peer_id1, wtxid);
|
||||
if (r.m_succeeded) {
|
||||
BOOST_REQUIRE(!r.m_collision.has_value());
|
||||
++added_txs;
|
||||
} else {
|
||||
BOOST_REQUIRE_EQUAL(r.m_collision.value(), collision);
|
||||
}
|
||||
}
|
||||
|
||||
// Adding one more item will fail due to the set being full
|
||||
r = tracker.AddToSet(peer_id1, Wtxid::FromUint256(frc.rand256()));
|
||||
BOOST_REQUIRE(!r.m_succeeded);
|
||||
BOOST_REQUIRE(!r.m_collision.has_value());
|
||||
|
||||
// Trying to add the same item twice will just bypass
|
||||
r = tracker.AddToSet(peer_id1, wtxid);
|
||||
BOOST_REQUIRE(r.m_succeeded);
|
||||
BOOST_REQUIRE(!r.m_collision.has_value());
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(AddToSetCollisionTest)
|
||||
{
|
||||
TxReconciliationTracker tracker(TXRECONCILIATION_VERSION);
|
||||
NodeId peer_id0 = 0;
|
||||
FastRandomContext frc{/*fDeterministic=*/true};
|
||||
|
||||
// Precompute collision
|
||||
Wtxid wtxid{Wtxid::FromUint256(uint256("c70d778bccef36a81aed8da0b819d2bd28bd8653e56a5d40903df1a0ade0b876"))};
|
||||
Wtxid collision{Wtxid::FromUint256(uint256("ae52a6ecb8733fba1f7af6022a8b9dd327d7825054229fafcad7e03c38ae2a50"))};
|
||||
|
||||
// Register the peer with a predefined salt so we can force the collision
|
||||
tracker.PreRegisterPeerWithSalt(peer_id0, 2);
|
||||
BOOST_REQUIRE_EQUAL(tracker.RegisterPeer(peer_id0, true, 1, 1), ReconciliationRegisterResult::SUCCESS);
|
||||
BOOST_CHECK(tracker.IsPeerRegistered(peer_id0));
|
||||
|
||||
// Once the peer is registered, we can try to add both transactions and check
|
||||
BOOST_REQUIRE(tracker.AddToSet(peer_id0, wtxid).m_succeeded);
|
||||
auto r = tracker.AddToSet(peer_id0, collision);
|
||||
BOOST_REQUIRE(!r.m_succeeded);
|
||||
BOOST_REQUIRE_EQUAL(r.m_collision.value(), wtxid);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_SUITE_END()
|
||||
|
|
Loading…
Add table
Reference in a new issue