Mirror of https://github.com/bitcoin/bitcoin.git (synced 2025-04-29 14:59:39 -04:00)
Commit 34f0e0a40b: Merge b71131e1af into c5e44a0435
14 changed files with 2583 additions and 89 deletions
|
@ -536,7 +536,8 @@ public:
|
|||
std::vector<TxOrphanage::OrphanTxBase> GetOrphanTransactions() override EXCLUSIVE_LOCKS_REQUIRED(!m_tx_download_mutex);
|
||||
PeerManagerInfo GetInfo() const override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
|
||||
void SendPings() override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
|
||||
void RelayTransaction(const uint256& txid, const uint256& wtxid) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
|
||||
std::pair<size_t, size_t> GetFanoutPeersCount() override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
|
||||
void RelayTransaction(const uint256& txid, const uint256& wtxid, bool consider_fanout) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
|
||||
void SetBestBlock(int height, std::chrono::seconds time) override
|
||||
{
|
||||
m_best_height = height;
|
||||
|
@ -614,19 +615,19 @@ private:
|
|||
|
||||
/** Handle a transaction whose result was MempoolAcceptResult::ResultType::VALID.
|
||||
* Updates m_txrequest, m_orphanage, and vExtraTxnForCompact. Also queues the tx for relay. */
|
||||
void ProcessValidTx(NodeId nodeid, const CTransactionRef& tx, const std::list<CTransactionRef>& replaced_transactions)
|
||||
void ProcessValidTx(NodeId nodeid, const CTransactionRef& tx, const std::list<CTransactionRef>& replaced_transactions, bool consider_fanout)
|
||||
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, g_msgproc_mutex, m_tx_download_mutex);
|
||||
|
||||
/** Handle the results of package validation: calls ProcessValidTx and ProcessInvalidTx for
|
||||
* individual transactions, and caches rejection for the package as a group.
|
||||
*/
|
||||
void ProcessPackageResult(const node::PackageToValidate& package_to_validate, const PackageMempoolAcceptResult& package_result)
|
||||
void ProcessPackageResult(const node::PackageToValidate& package_to_validate, const PackageMempoolAcceptResult& package_result, bool consider_fanout)
|
||||
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, g_msgproc_mutex, m_tx_download_mutex);
|
||||
|
||||
/**
|
||||
* Reconsider orphan transactions after a parent has been accepted to the mempool.
|
||||
*
|
||||
* @peer[in] peer The peer whose orphan transactions we will reconsider. Generally only
|
||||
* @param[in] peer The peer whose orphan transactions we will reconsider. Generally only
|
||||
* one orphan will be reconsidered on each call of this function. If an
|
||||
* accepted orphan has orphaned children, those will need to be
|
||||
* reconsidered, creating more work, possibly for other peers.
|
||||
|
@ -637,6 +638,17 @@ private:
|
|||
bool ProcessOrphanTx(Peer& peer)
|
||||
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, g_msgproc_mutex, !m_tx_download_mutex);
|
||||
|
||||
/** Whether we should fanout to a given peer or not. Always returns true for non-Erlay peers.
|
||||
* For Erlay peers, if they are inbound, returns true as long as the peer has been selected for fanout.
|
||||
* If they are outbound, returns true as long as the transaction was received via fanout (further filtering will be performed
|
||||
* before sending out the next INV message to each peer).
|
||||
* Returns false otherwise.
|
||||
*
|
||||
* @param[in] peer The peer we are making the decision on
|
||||
* @param[in] consider_fanout Whether to consider fanout or not (only applies if the peer is outbound)
|
||||
*/
|
||||
bool ShouldFanoutTo(const PeerRef peer, bool consider_fanout) EXCLUSIVE_LOCKS_REQUIRED(m_peer_mutex);
|
||||
|
||||
/** Process a single headers message from a peer.
|
||||
*
|
||||
* @param[in] pfrom CNode of the peer
|
||||
|
@ -720,6 +732,12 @@ private:
|
|||
{
|
||||
m_connman.PushMessage(&node, NetMsg::Make(std::move(msg_type), std::forward<Args>(args)...));
|
||||
}
|
||||
/** Immediately announce transactions to a given peer via INV message(s). */
|
||||
bool AnnounceTxs(std::vector<uint256> remote_missing_wtxids, CNode& pto)
|
||||
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
|
||||
/** Check whether a transaction should be sent to a given peer over several filters */
|
||||
bool ShouldSendTransaction(PeerRef peer, uint256& hash, Peer::TxRelay* tx_relay, const CFeeRate filterrate)
|
||||
EXCLUSIVE_LOCKS_REQUIRED(tx_relay->m_tx_inventory_mutex, tx_relay->m_bloom_filter_mutex);
|
||||
|
||||
/** Send a version message to a peer */
|
||||
void PushNodeVersion(CNode& pnode, const Peer& peer);
|
||||
|
@ -1581,7 +1599,8 @@ void PeerManagerImpl::ReattemptInitialBroadcast(CScheduler& scheduler)
|
|||
CTransactionRef tx = m_mempool.get(txid);
|
||||
|
||||
if (tx != nullptr) {
|
||||
RelayTransaction(txid, tx->GetWitnessHash());
|
||||
// Always consider fanout when relaying our own transactions
|
||||
RelayTransaction(txid, tx->GetWitnessHash(), /*consider_fanout=*/true);
|
||||
} else {
|
||||
m_mempool.RemoveUnbroadcastTx(txid, true);
|
||||
}
|
||||
|
@ -2148,12 +2167,44 @@ void PeerManagerImpl::SendPings()
|
|||
for(auto& it : m_peer_map) it.second->m_ping_queued = true;
|
||||
}
|
||||
|
||||
void PeerManagerImpl::RelayTransaction(const uint256& txid, const uint256& wtxid)
|
||||
std::pair<size_t, size_t> PeerManagerImpl::GetFanoutPeersCount()
|
||||
{
|
||||
|
||||
size_t inbounds_fanout_tx_relay = 0, outbounds_fanout_tx_relay = 0;
|
||||
|
||||
if (m_txreconciliation) {
|
||||
LOCK(m_peer_mutex);
|
||||
for(const auto& [peer_id, peer] : m_peer_map) {
|
||||
if (const auto tx_relay = peer->GetTxRelay()) {
|
||||
const bool peer_relays_txs = WITH_LOCK(tx_relay->m_bloom_filter_mutex, return tx_relay->m_relay_txs);
|
||||
if (peer_relays_txs && !m_txreconciliation->IsPeerRegistered(peer_id)) {
|
||||
peer->m_is_inbound ? ++inbounds_fanout_tx_relay : ++outbounds_fanout_tx_relay;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return std::pair(inbounds_fanout_tx_relay, outbounds_fanout_tx_relay);
|
||||
}
|
||||
|
||||
bool PeerManagerImpl::ShouldFanoutTo(const PeerRef peer, bool consider_fanout)
|
||||
{
|
||||
// We consider Erlay peers for fanout if they are within our inbound fanout targets, or if they are outbounds
|
||||
// and the transaction was NOT received via set reconciliation. For the latter group, further filtering
|
||||
// will be applied at relay time.
|
||||
if (m_txreconciliation && m_txreconciliation->IsPeerRegistered(peer->m_id)) {
|
||||
return (!peer->m_is_inbound && consider_fanout) || m_txreconciliation->IsInboundFanoutTarget(peer->m_id);
|
||||
} else {
|
||||
// For non-Erlay peers we always fanout (same applies if we do not support Erlay)
|
||||
return true;
|
||||
}
|
||||
}
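Condensed, the decision above is just boolean logic over three inputs. The following standalone sketch restates it outside the PeerManagerImpl context, purely for illustration; ErlayView and its field names are hypothetical stand-ins for the Peer/TxReconciliationTracker state, not part of this patch.

    #include <cstdio>

    // Hypothetical flattened view of the state ShouldFanoutTo consults (illustration only).
    struct ErlayView {
        bool erlay_registered;       // peer is registered for Erlay reconciliation
        bool is_inbound;             // connection direction
        bool inbound_fanout_target;  // inbound peer currently selected as a fanout target
    };

    bool ShouldFanoutToSketch(const ErlayView& p, bool consider_fanout)
    {
        if (!p.erlay_registered) return true; // non-Erlay peers are always fanned out to
        return (!p.is_inbound && consider_fanout) || p.inbound_fanout_target;
    }

    int main()
    {
        // Outbound Erlay peer, tx received via reconciliation: reconcile, do not fanout.
        std::printf("%d\n", ShouldFanoutToSketch({true, false, false}, /*consider_fanout=*/false)); // 0
        // Inbound Erlay peer selected as a fanout target: fanout regardless of consider_fanout.
        std::printf("%d\n", ShouldFanoutToSketch({true, true, true}, /*consider_fanout=*/false));   // 1
    }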
|
||||
|
||||
void PeerManagerImpl::RelayTransaction(const uint256& txid, const uint256& wtxid, bool consider_fanout)
|
||||
{
|
||||
LOCK(m_peer_mutex);
|
||||
for(auto& it : m_peer_map) {
|
||||
Peer& peer = *it.second;
|
||||
auto tx_relay = peer.GetTxRelay();
|
||||
for(auto& [peer_id, peer] : m_peer_map) {
|
||||
auto tx_relay = peer->GetTxRelay();
|
||||
if (!tx_relay) continue;
|
||||
|
||||
LOCK(tx_relay->m_tx_inventory_mutex);
|
||||
|
@ -2164,10 +2215,27 @@ void PeerManagerImpl::RelayTransaction(const uint256& txid, const uint256& wtxid
|
|||
// in the announcement.
|
||||
if (tx_relay->m_next_inv_send_time == 0s) continue;
|
||||
|
||||
const uint256& hash{peer.m_wtxid_relay ? wtxid : txid};
|
||||
const uint256& hash{peer->m_wtxid_relay ? wtxid : txid};
|
||||
if (!tx_relay->m_tx_inventory_known_filter.contains(hash)) {
|
||||
bool fanout = ShouldFanoutTo(peer, consider_fanout);
|
||||
// FIXME: This bit here and the corresponding one in SendMessage are basically identical.
|
||||
// Check if there is a way to make them into a private function. We would have to pass a
|
||||
// lambda that does the "insert into a collection" part, which is what differs between the two
|
||||
if (!fanout) {
|
||||
Assume(m_txreconciliation);
|
||||
const auto result = m_txreconciliation->AddToSet(peer_id, Wtxid::FromUint256(wtxid));
|
||||
if (!result.m_succeeded) {
|
||||
fanout = true;
|
||||
if (const auto collision = result.m_collision; collision.has_value()) {
|
||||
Assume(m_txreconciliation->TryRemovingFromSet(peer_id, collision.value()));
|
||||
tx_relay->m_tx_inventory_to_send.insert(collision.value().ToUint256());
|
||||
}
|
||||
}
|
||||
}
|
||||
if (fanout) {
|
||||
tx_relay->m_tx_inventory_to_send.insert(hash);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
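The FIXME above points out that this AddToSet fallback (revert to fanout when the set is full, and re-queue any colliding wtxid) is duplicated almost verbatim in SendMessages. One possible shape for the shared helper, with the "insert into a collection" step passed in as a callable, is sketched below; the helper name and the minimal stand-in types are hypothetical, not part of this patch.

    #include <cstdint>
    #include <functional>
    #include <optional>

    // Hypothetical, minimal stand-ins for the interfaces such a helper would need.
    struct WtxidLite { uint64_t value; };
    struct AddResultLite { bool succeeded; std::optional<WtxidLite> collision; };
    struct ReconSetLite {
        AddResultLite AddToSet(WtxidLite) { return {true, std::nullopt}; }  // placeholder behaviour
        bool TryRemovingFromSet(WtxidLite) { return true; }                 // placeholder behaviour
    };

    // Returns true if the transaction should be fanned out after attempting to reconcile it.
    // queue_for_fanout is the caller-specific "insert into a collection" step the FIXME mentions.
    bool QueueForReconciliationOrFanout(ReconSetLite& recon, WtxidLite wtxid,
                                        const std::function<void(WtxidLite)>& queue_for_fanout)
    {
        const AddResultLite result = recon.AddToSet(wtxid);
        if (result.succeeded) return false;               // reconciliation will carry it
        if (result.collision) {
            recon.TryRemovingFromSet(*result.collision);  // evict the colliding entry...
            queue_for_fanout(*result.collision);          // ...and fall back to fanning it out
        }
        return true;                                      // set full or collision: fanout this tx too
    }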
|
||||
|
||||
|
@ -3024,7 +3092,7 @@ std::optional<node::PackageToValidate> PeerManagerImpl::ProcessInvalidTx(NodeId
|
|||
return package_to_validate;
|
||||
}
|
||||
|
||||
void PeerManagerImpl::ProcessValidTx(NodeId nodeid, const CTransactionRef& tx, const std::list<CTransactionRef>& replaced_transactions)
|
||||
void PeerManagerImpl::ProcessValidTx(NodeId nodeid, const CTransactionRef& tx, const std::list<CTransactionRef>& replaced_transactions, bool consider_fanout)
|
||||
{
|
||||
AssertLockNotHeld(m_peer_mutex);
|
||||
AssertLockHeld(g_msgproc_mutex);
|
||||
|
@ -3038,14 +3106,14 @@ void PeerManagerImpl::ProcessValidTx(NodeId nodeid, const CTransactionRef& tx, c
|
|||
tx->GetWitnessHash().ToString(),
|
||||
m_mempool.size(), m_mempool.DynamicMemoryUsage() / 1000);
|
||||
|
||||
RelayTransaction(tx->GetHash(), tx->GetWitnessHash());
|
||||
RelayTransaction(tx->GetHash(), tx->GetWitnessHash(), consider_fanout);
|
||||
|
||||
for (const CTransactionRef& removedTx : replaced_transactions) {
|
||||
AddToCompactExtraTransactions(removedTx);
|
||||
}
|
||||
}
|
||||
|
||||
void PeerManagerImpl::ProcessPackageResult(const node::PackageToValidate& package_to_validate, const PackageMempoolAcceptResult& package_result)
|
||||
void PeerManagerImpl::ProcessPackageResult(const node::PackageToValidate& package_to_validate, const PackageMempoolAcceptResult& package_result, bool consider_fanout)
|
||||
{
|
||||
AssertLockNotHeld(m_peer_mutex);
|
||||
AssertLockHeld(g_msgproc_mutex);
|
||||
|
@ -3075,7 +3143,7 @@ void PeerManagerImpl::ProcessPackageResult(const node::PackageToValidate& packag
|
|||
switch (tx_result.m_result_type) {
|
||||
case MempoolAcceptResult::ResultType::VALID:
|
||||
{
|
||||
ProcessValidTx(nodeid, tx, tx_result.m_replaced_transactions);
|
||||
ProcessValidTx(nodeid, tx, tx_result.m_replaced_transactions, consider_fanout);
|
||||
break;
|
||||
}
|
||||
case MempoolAcceptResult::ResultType::INVALID:
|
||||
|
@ -3118,7 +3186,15 @@ bool PeerManagerImpl::ProcessOrphanTx(Peer& peer)
|
|||
|
||||
if (result.m_result_type == MempoolAcceptResult::ResultType::VALID) {
|
||||
LogDebug(BCLog::TXPACKAGES, " accepted orphan tx %s (wtxid=%s)\n", orphanHash.ToString(), orphan_wtxid.ToString());
|
||||
ProcessValidTx(peer.m_id, porphanTx, result.m_replaced_transactions);
|
||||
// When processing orphans after getting a missing parent, we always consider fanning them out.
|
||||
// TODO: This is a weird case. ProcessOrphanTx is triggered by the reception of a missing parent, but we do not keep track of
|
||||
// when the orphan was received, so it could be the case that we end up here when the orphan is still in its early propagation
|
||||
// state, or far after. If we are late, relaying the transaction may not be worth it, our peers may already know (reconciling
|
||||
// would even be worse than fanout, since it would simply add more roundtrips to the same outcome: announcing a transaction that is
|
||||
// already known).
|
||||
// This should only happen if we connect to the network and receive a dependent transaction late in its propagation.
|
||||
// However, it may be worth considering how to deal with this case in a followup to reduce unnecessary traffic.
|
||||
ProcessValidTx(peer.m_id, porphanTx, result.m_replaced_transactions, /*consider_fanout=*/true);
|
||||
return true;
|
||||
} else if (state.GetResult() != TxValidationResult::TX_MISSING_INPUTS) {
|
||||
LogDebug(BCLog::TXPACKAGES, " invalid orphan tx %s (wtxid=%s) from peer=%d. %s\n",
|
||||
|
@ -3405,6 +3481,87 @@ void PeerManagerImpl::ProcessCompactBlockTxns(CNode& pfrom, Peer& peer, const Bl
|
|||
return;
|
||||
}
|
||||
|
||||
namespace {
|
||||
class CompareInvMempoolOrder
|
||||
{
|
||||
CTxMemPool* mp;
|
||||
bool m_wtxid_relay;
|
||||
public:
|
||||
explicit CompareInvMempoolOrder(CTxMemPool* _mempool, bool use_wtxid)
|
||||
{
|
||||
mp = _mempool;
|
||||
m_wtxid_relay = use_wtxid;
|
||||
}
|
||||
|
||||
bool operator()(const uint256& a, const uint256& b)
|
||||
{
|
||||
/* As std::make_heap produces a max-heap, we want the entries with the
|
||||
* fewest ancestors/highest fee to sort later. */
|
||||
return mp->CompareDepthAndScore(b, a, m_wtxid_relay);
|
||||
}
|
||||
};
|
||||
} // namespace
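The inverted comparator is the key detail here: std::make_heap builds a max-heap, so the entries we want to announce first (fewest ancestors, highest feerate) have to compare as the largest elements. A self-contained sketch of the same pattern with plain integers, where smaller numbers stand in for "better" transactions:

    #include <algorithm>
    #include <cstdio>
    #include <vector>

    int main()
    {
        std::vector<int> v{5, 1, 4, 2, 3};
        // Reversed comparison (a > b): the max-heap then keeps the smallest value on top.
        auto cmp = [](int a, int b) { return a > b; };
        std::make_heap(v.begin(), v.end(), cmp);
        // Pop only as many entries as we intend to send; the rest never need full sorting.
        for (int i = 0; i < 3; ++i) {
            std::pop_heap(v.begin(), v.end(), cmp);
            std::printf("%d ", v.back()); // prints 1 2 3
            v.pop_back();
        }
        std::printf("\n");
    }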
|
||||
|
||||
bool PeerManagerImpl::AnnounceTxs(std::vector<uint256> remote_missing_wtxids, CNode& pto)
|
||||
{
|
||||
if (remote_missing_wtxids.size() == 0) return false;
|
||||
PeerRef peer = GetPeerRef(pto.GetId());
|
||||
if (!peer) return false;
|
||||
auto tx_relay = peer->GetTxRelay();
|
||||
if (!tx_relay) return false;
|
||||
LOCK2(tx_relay->m_tx_inventory_mutex, tx_relay->m_bloom_filter_mutex);
|
||||
|
||||
const CFeeRate filterrate{tx_relay->m_fee_filter_received.load()};
|
||||
// Topologically and fee-rate sort the inventory we send for privacy and priority reasons.
|
||||
// A heap is used so that not all items need sorting if only a few are being sent.
|
||||
CompareInvMempoolOrder compareInvMempoolOrder(&m_mempool, true);
|
||||
std::make_heap(remote_missing_wtxids.begin(), remote_missing_wtxids.end(), compareInvMempoolOrder);
|
||||
|
||||
std::vector<CInv> remote_missing_invs;
|
||||
remote_missing_invs.reserve(std::min<size_t>(remote_missing_wtxids.size(), MAX_INV_SZ));
|
||||
unsigned int nRelayedTransactions = 0;
|
||||
while (!remote_missing_wtxids.empty()) {
|
||||
std::pop_heap(remote_missing_wtxids.begin(), remote_missing_wtxids.end(), compareInvMempoolOrder);
|
||||
uint256 wtxid = remote_missing_wtxids.back();
|
||||
remote_missing_wtxids.pop_back();
|
||||
if (!ShouldSendTransaction(peer, wtxid, tx_relay, filterrate)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
tx_relay->m_tx_inventory_known_filter.insert(wtxid);
|
||||
remote_missing_invs.emplace_back(MSG_WTX, wtxid);
|
||||
nRelayedTransactions++;
|
||||
|
||||
if (remote_missing_invs.size() == MAX_INV_SZ || remote_missing_wtxids.empty()) {
|
||||
MakeAndPushMessage(pto, NetMsgType::INV, remote_missing_invs);
|
||||
remote_missing_invs.clear();
|
||||
}
|
||||
}
|
||||
|
||||
return nRelayedTransactions > 0;
|
||||
}
|
||||
|
||||
bool PeerManagerImpl::ShouldSendTransaction(PeerRef peer, uint256& hash, Peer::TxRelay* tx_relay, const CFeeRate filterrate)
|
||||
{
|
||||
AssertLockHeld(tx_relay->m_tx_inventory_mutex);
|
||||
AssertLockHeld(tx_relay->m_bloom_filter_mutex);
|
||||
// Check if not in the filter already
|
||||
if (tx_relay->m_tx_inventory_known_filter.contains(hash)) {
|
||||
return false;
|
||||
}
|
||||
// Not in the mempool anymore? don't bother sending it.
|
||||
auto txinfo = m_mempool.info(GenTxid(peer->m_wtxid_relay ? GenTxid::Wtxid(hash) : GenTxid::Txid(hash)));
|
||||
if (!txinfo.tx) {
|
||||
return false;
|
||||
}
|
||||
// Peer told you to not send transactions at that feerate? Don't bother sending it.
|
||||
if (txinfo.fee < filterrate.GetFee(txinfo.vsize)) {
|
||||
return false;
|
||||
}
|
||||
if (tx_relay->m_bloom_filter && !tx_relay->m_bloom_filter->IsRelevantAndUpdate(*txinfo.tx)) return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, DataStream& vRecv,
|
||||
const std::chrono::microseconds time_received,
|
||||
const std::atomic<bool>& interruptMsgProc)
|
||||
|
@ -4262,6 +4419,9 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
|
|||
|
||||
LOCK2(cs_main, m_tx_download_mutex);
|
||||
|
||||
// TODO: Until the Erlay p2p flow is defined, all transactions are flagged for fanout
|
||||
bool consider_fanout = true;
|
||||
|
||||
const auto& [should_validate, package_to_validate] = m_txdownloadman.ReceivedTx(pfrom.GetId(), ptx);
|
||||
if (!should_validate) {
|
||||
if (pfrom.HasPermission(NetPermissionFlags::ForceRelay)) {
|
||||
|
@ -4274,7 +4434,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
|
|||
} else {
|
||||
LogPrintf("Force relaying tx %s (wtxid=%s) from peer=%d\n",
|
||||
tx.GetHash().ToString(), tx.GetWitnessHash().ToString(), pfrom.GetId());
|
||||
RelayTransaction(tx.GetHash(), tx.GetWitnessHash());
|
||||
RelayTransaction(tx.GetHash(), tx.GetWitnessHash(), consider_fanout);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -4282,7 +4442,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
|
|||
const auto package_result{ProcessNewPackage(m_chainman.ActiveChainstate(), m_mempool, package_to_validate->m_txns, /*test_accept=*/false, /*client_maxfeerate=*/std::nullopt)};
|
||||
LogDebug(BCLog::TXPACKAGES, "package evaluation for %s: %s\n", package_to_validate->ToString(),
|
||||
package_result.m_state.IsValid() ? "package accepted" : "package rejected");
|
||||
ProcessPackageResult(package_to_validate.value(), package_result);
|
||||
ProcessPackageResult(package_to_validate.value(), package_result, consider_fanout);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
@ -4294,7 +4454,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
|
|||
const TxValidationState& state = result.m_state;
|
||||
|
||||
if (result.m_result_type == MempoolAcceptResult::ResultType::VALID) {
|
||||
ProcessValidTx(pfrom.GetId(), ptx, result.m_replaced_transactions);
|
||||
ProcessValidTx(pfrom.GetId(), ptx, result.m_replaced_transactions, consider_fanout);
|
||||
pfrom.m_last_tx_time = GetTime<std::chrono::seconds>();
|
||||
}
|
||||
if (state.IsInvalid()) {
|
||||
|
@ -4302,7 +4462,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
|
|||
const auto package_result{ProcessNewPackage(m_chainman.ActiveChainstate(), m_mempool, package_to_validate->m_txns, /*test_accept=*/false, /*client_maxfeerate=*/std::nullopt)};
|
||||
LogDebug(BCLog::TXPACKAGES, "package evaluation for %s: %s\n", package_to_validate->ToString(),
|
||||
package_result.m_state.IsValid() ? "package accepted" : "package rejected");
|
||||
ProcessPackageResult(package_to_validate.value(), package_result);
|
||||
ProcessPackageResult(package_to_validate.value(), package_result, consider_fanout);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -4914,6 +5074,70 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
|
|||
return;
|
||||
}
|
||||
|
||||
if (msg_type == NetMsgType::REQTXRCNCL) {
|
||||
uint16_t peer_recon_set_size, peer_q;
|
||||
vRecv >> peer_recon_set_size >> peer_q;
|
||||
if (!m_txreconciliation->HandleReconciliationRequest(pfrom.GetId(), peer_recon_set_size, peer_q)) {
|
||||
LogInfo("Peer is requesting reconciliation while a previous reconciliation has not finished yet, %s\n", pfrom.DisconnectMsg(fLogIPs));
|
||||
pfrom.fDisconnect = true;
|
||||
};
|
||||
return;
|
||||
}
|
||||
|
||||
if (msg_type == NetMsgType::SKETCH) {
|
||||
std::vector<uint8_t> skdata;
|
||||
vRecv >> skdata;
|
||||
|
||||
std::vector<uint32_t> txs_to_request;
|
||||
std::vector<uint256> txs_to_announce;
|
||||
std::optional<bool> recon_result;
|
||||
bool valid_sketch = m_txreconciliation->HandleSketch(pfrom.GetId(), skdata, txs_to_request, txs_to_announce, recon_result);
|
||||
|
||||
if (valid_sketch) {
|
||||
if (recon_result) {
|
||||
// Handles both successful and failed reconciliation (but not the case in which
|
||||
// we want to request extension).
|
||||
MakeAndPushMessage(pfrom, NetMsgType::RECONCILDIFF, *recon_result, txs_to_request);
|
||||
} else {
|
||||
// No final result means we should request sketch extension to make another
|
||||
// reconciliation attempt without losing the initial data.
|
||||
MakeAndPushMessage(pfrom, NetMsgType::REQSKETCHEXT);
|
||||
}
|
||||
AnnounceTxs(txs_to_announce, pfrom);
|
||||
} else {
|
||||
// Disconnect peers that send reconciliation sketch violating the protocol.
|
||||
LogDebug(BCLog::NET, "sketch from peer=%d violates reconciliation protocol; disconnecting\n", pfrom.GetId());
|
||||
pfrom.fDisconnect = true;
|
||||
return;
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (msg_type == NetMsgType::REQSKETCHEXT) {
|
||||
m_txreconciliation->HandleExtensionRequest(pfrom.GetId());
|
||||
return;
|
||||
}
|
||||
|
||||
// Among transactions requested by short ID here, we should send only those transactions
|
||||
// sketched (stored in local set snapshot), because otherwise we would leak privacy (mempool content).
|
||||
if (msg_type == NetMsgType::RECONCILDIFF) {
|
||||
bool recon_result;
|
||||
std::vector<uint32_t> ask_shortids;
|
||||
vRecv >> recon_result >> ask_shortids;
|
||||
|
||||
std::vector<uint256> remote_missing;
|
||||
bool valid_finalization = m_txreconciliation->FinalizeInitByThem(pfrom.GetId(), recon_result, ask_shortids, remote_missing);
|
||||
if (valid_finalization) {
|
||||
AnnounceTxs(remote_missing, pfrom);
|
||||
} else {
|
||||
// Disconnect peers that send reconciliation finalization violating the protocol.
|
||||
LogPrintLevel(BCLog::TXRECONCILIATION, BCLog::Level::Debug, "reconcildiff from peer=%d violates reconciliation protocol; disconnecting\n", pfrom.GetId());
|
||||
pfrom.fDisconnect = true;
|
||||
return;
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// Ignore unknown commands for extensibility
|
||||
LogDebug(BCLog::NET, "Unknown command \"%s\" from peer=%d\n", SanitizeString(msg_type), pfrom.GetId());
|
||||
return;
|
||||
|
@ -5402,26 +5626,6 @@ void PeerManagerImpl::MaybeSendFeefilter(CNode& pto, Peer& peer, std::chrono::mi
|
|||
}
|
||||
}
|
||||
|
||||
namespace {
|
||||
class CompareInvMempoolOrder
|
||||
{
|
||||
CTxMemPool* mp;
|
||||
bool m_wtxid_relay;
|
||||
public:
|
||||
explicit CompareInvMempoolOrder(CTxMemPool *_mempool, bool use_wtxid)
|
||||
{
|
||||
mp = _mempool;
|
||||
m_wtxid_relay = use_wtxid;
|
||||
}
|
||||
|
||||
bool operator()(std::set<uint256>::iterator a, std::set<uint256>::iterator b)
|
||||
{
|
||||
/* As std::make_heap produces a max-heap, we want the entries with the
|
||||
* fewest ancestors/highest fee to sort later. */
|
||||
return mp->CompareDepthAndScore(*b, *a, m_wtxid_relay);
|
||||
}
|
||||
};
|
||||
} // namespace
|
||||
|
||||
bool PeerManagerImpl::RejectIncomingTxs(const CNode& peer) const
|
||||
{
|
||||
|
@ -5490,6 +5694,14 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
|
|||
|
||||
MaybeSendSendHeaders(*pto, *peer);
|
||||
|
||||
// We must look into the reconciliation queue first. Since the queue applies to all peers,
|
||||
// this peer might block other reconciliations if we don't make this call regularly and
|
||||
// unconditionally.
|
||||
bool reconcile = false;
|
||||
if (m_txreconciliation && !m_chainman.IsInitialBlockDownload()) {
|
||||
reconcile = m_txreconciliation->IsPeerNextToReconcileWith(pto->GetId(), current_time);
|
||||
}
|
||||
|
||||
{
|
||||
LOCK(cs_main);
|
||||
|
||||
|
@ -5702,10 +5914,12 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
|
|||
peer->m_blocks_for_inv_relay.clear();
|
||||
}
|
||||
|
||||
if (auto tx_relay = peer->GetTxRelay(); tx_relay != nullptr) {
|
||||
LOCK(tx_relay->m_tx_inventory_mutex);
|
||||
// Check whether periodic sends should happen
|
||||
bool fSendTrickle = pto->HasPermission(NetPermissionFlags::NoBan);
|
||||
if (auto tx_relay = peer->GetTxRelay(); tx_relay != nullptr) {
|
||||
std::vector<std::pair<uint256, uint16_t>> to_be_announced{};
|
||||
{
|
||||
LOCK(tx_relay->m_tx_inventory_mutex);
|
||||
if (tx_relay->m_next_inv_send_time < current_time) {
|
||||
fSendTrickle = true;
|
||||
if (pto->IsInboundConn()) {
|
||||
|
@ -5715,6 +5929,12 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
|
|||
}
|
||||
}
|
||||
|
||||
// Rotate inbound fanout targets if the timer has gone off
|
||||
if (m_txreconciliation && m_txreconciliation->GetNextInboundPeerRotationTime() < current_time) {
|
||||
m_txreconciliation->SetNextInboundPeerRotationTime(m_rng.rand_exp_duration(INBOUND_FANOUT_ROTATION_INTERVAL));
|
||||
m_txreconciliation->RotateInboundFanoutTargets();
|
||||
}
|
||||
|
||||
// Time to send but the peer has requested we not relay transactions.
|
||||
if (fSendTrickle) {
|
||||
LOCK(tx_relay->m_bloom_filter_mutex);
|
||||
|
@ -5757,10 +5977,10 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
|
|||
// Determine transactions to relay
|
||||
if (fSendTrickle) {
|
||||
// Produce a vector with all candidates for sending
|
||||
std::vector<std::set<uint256>::iterator> vInvTx;
|
||||
std::vector<uint256> vInvTx;
|
||||
vInvTx.reserve(tx_relay->m_tx_inventory_to_send.size());
|
||||
for (std::set<uint256>::iterator it = tx_relay->m_tx_inventory_to_send.begin(); it != tx_relay->m_tx_inventory_to_send.end(); it++) {
|
||||
vInvTx.push_back(it);
|
||||
vInvTx.push_back(*it);
|
||||
}
|
||||
const CFeeRate filterrate{tx_relay->m_fee_filter_received.load()};
|
||||
// Topologically and fee-rate sort the inventory we send for privacy and priority reasons.
|
||||
|
@ -5776,36 +5996,88 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
|
|||
while (!vInvTx.empty() && nRelayedTransactions < broadcast_max) {
|
||||
// Fetch the top element from the heap
|
||||
std::pop_heap(vInvTx.begin(), vInvTx.end(), compareInvMempoolOrder);
|
||||
std::set<uint256>::iterator it = vInvTx.back();
|
||||
uint256 hash = vInvTx.back();
|
||||
vInvTx.pop_back();
|
||||
uint256 hash = *it;
|
||||
CInv inv(peer->m_wtxid_relay ? MSG_WTX : MSG_TX, hash);
|
||||
// Remove it from the to-be-sent set
|
||||
tx_relay->m_tx_inventory_to_send.erase(it);
|
||||
// Check if not in the filter already
|
||||
if (tx_relay->m_tx_inventory_known_filter.contains(hash)) {
|
||||
tx_relay->m_tx_inventory_to_send.erase(hash);
|
||||
if (!ShouldSendTransaction(peer, hash, tx_relay, filterrate)) {
|
||||
continue;
|
||||
}
|
||||
// Not in the mempool anymore? don't bother sending it.
|
||||
auto txinfo = m_mempool.info(ToGenTxid(inv));
|
||||
if (!txinfo.tx) {
|
||||
continue;
|
||||
}
|
||||
// Peer told you to not send transactions at that feerate? Don't bother sending it.
|
||||
if (txinfo.fee < filterrate.GetFee(txinfo.vsize)) {
|
||||
continue;
|
||||
}
|
||||
if (tx_relay->m_bloom_filter && !tx_relay->m_bloom_filter->IsRelevantAndUpdate(*txinfo.tx)) continue;
|
||||
// Send
|
||||
vInv.push_back(inv);
|
||||
// Flag to be sent.
|
||||
// Initialize with the counter at 0. This is only relevant for Erlay nodes and will be updated in the
|
||||
// following context after releasing m_tx_inventory_mutex
|
||||
to_be_announced.emplace_back(hash, 0);
|
||||
nRelayedTransactions++;
|
||||
}
|
||||
}
|
||||
} // Unlock the m_tx_inventory_mutex so we can count over m_peer_map
|
||||
|
||||
// Only care about fanout count if we support Erlay
|
||||
if (m_txreconciliation) {
|
||||
LOCK(m_peer_mutex);
|
||||
for (auto& [hash, out_fanout_count] : to_be_announced) {
|
||||
for (const auto& [cur_peer_id, cur_peer] : m_peer_map) {
|
||||
if (auto peer_tx_relay = cur_peer->GetTxRelay()) {
|
||||
LOCK(peer_tx_relay->m_tx_inventory_mutex);
|
||||
if (!pto->IsInboundConn() && peer_tx_relay->m_tx_inventory_known_filter.contains(hash)) {
|
||||
out_fanout_count+=1;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Re-lock
|
||||
LOCK(tx_relay->m_tx_inventory_mutex);
|
||||
for (auto& [hash, out_fanout_count] : to_be_announced) {
|
||||
// Send
|
||||
bool should_fanout = true;
|
||||
|
||||
// FIXME: Remove. A functional test about how many peers to fanout to has failed in CI and I'm currently clueless about why.
|
||||
// Trying to get some context.
|
||||
LogDebug(BCLog::NET, "%d outbound peers currently know about %s\n", out_fanout_count, hash.ToString());
|
||||
// For non-Erlay and inbound peers, simply fanout. Erlay-enabled inbounds have already been assigned transactions to reconcile
|
||||
// in RelayTransaction, so everything that was added to m_tx_inventory_to_send is to be fanned out
|
||||
if (!pto->IsInboundConn() && m_txreconciliation && m_txreconciliation->IsPeerRegistered(pto->GetId())) {
|
||||
// For Erlay-enabled outbound peers we fanout based on how we have heard about this transaction
|
||||
// and how many announcements of this transaction we have sent and received
|
||||
// TODO: If we are the transaction source, we should reduce the threshold by 1, since this is the only case
|
||||
// where we are not accounting for at least one reception
|
||||
should_fanout = out_fanout_count <= OUTBOUND_FANOUT_THRESHOLD;
|
||||
}
|
||||
|
||||
CInv inv(peer->m_wtxid_relay ? MSG_WTX : MSG_TX, hash);
|
||||
auto add_to_inv_vec = [&](const CInv inv) EXCLUSIVE_LOCKS_REQUIRED(tx_relay->m_tx_inventory_mutex) {
|
||||
vInv.push_back(inv);
|
||||
if (vInv.size() == MAX_INV_SZ) {
|
||||
MakeAndPushMessage(*pto, NetMsgType::INV, vInv);
|
||||
vInv.clear();
|
||||
}
|
||||
tx_relay->m_tx_inventory_known_filter.insert(hash);
|
||||
tx_relay->m_tx_inventory_known_filter.insert(inv.hash);
|
||||
};
|
||||
|
||||
if (!should_fanout) {
|
||||
// Note we are not handling the case of ancestors being reconciled and descendants being fanned out.
|
||||
// This can propagate descendants faster than ancestors, making them orphans. However, transactions
|
||||
// are picked for reconciliation if we consider they have been propagated enough, so in this case
|
||||
// odds are that the peer already knows about the parent (and it's queued to be announced or reconciled to us).
|
||||
Assume(m_txreconciliation);
|
||||
const auto result = m_txreconciliation->AddToSet(pto->GetId(), Wtxid::FromUint256(hash));
|
||||
if (!result.m_succeeded) {
|
||||
should_fanout = true;
|
||||
if (const auto collision = result.m_collision; collision.has_value()) {
|
||||
// In case of a collision, this loop will increase nRelayedTransactions twice
|
||||
Assume(m_txreconciliation->TryRemovingFromSet(pto->GetId(), collision.value()));
|
||||
add_to_inv_vec(CInv(MSG_WTX, collision.value()));
|
||||
}
|
||||
}
|
||||
}
|
||||
if (should_fanout) {
|
||||
add_to_inv_vec(inv);
|
||||
}
|
||||
}
|
||||
|
||||
if (fSendTrickle) {
|
||||
// Ensure we'll respond to GETDATA requests for anything we've just announced
|
||||
LOCK(m_mempool.cs);
|
||||
tx_relay->m_last_inv_sequence = m_mempool.GetSequence();
|
||||
|
@ -5814,6 +6086,38 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
|
|||
if (!vInv.empty())
|
||||
MakeAndPushMessage(*pto, NetMsgType::INV, vInv);
|
||||
|
||||
//
|
||||
// Message: reconciliation response
|
||||
//
|
||||
if (m_txreconciliation && fSendTrickle) {
|
||||
// Transaction reconciliation requests are responded to at trickle intervals after making all
|
||||
// relevant transactions available up to this point. This applies only to inbound peers, since
|
||||
// they are the only ones allowed to request reconciliation.
|
||||
std::vector<uint8_t> skdata;
|
||||
if (m_txreconciliation->ShouldRespondToReconciliationRequest(pto->GetId(), skdata)) {
|
||||
// It's perfectly valid to send an empty sketch, because we use this behavior
|
||||
// to trigger early reconciliation termination when it won't help anyway:
|
||||
// - we have no transactions for the peer
|
||||
// - the peer has no transactions for us
|
||||
MakeAndPushMessage(*pto, NetMsgType::SKETCH, skdata);
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// Message: reconciliation request
|
||||
//
|
||||
{
|
||||
if (!m_chainman.IsInitialBlockDownload()) {
|
||||
if (reconcile) {
|
||||
const auto reconciliation_request_params = m_txreconciliation->InitiateReconciliationRequest(pto->GetId());
|
||||
if (reconciliation_request_params) {
|
||||
const auto [local_set_size, local_q_formatted] = *reconciliation_request_params;
|
||||
MakeAndPushMessage(*pto, NetMsgType::REQTXRCNCL, local_set_size, local_q_formatted);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Detect whether we're stalling
|
||||
auto stalling_timeout = m_block_stalling_timeout.load();
|
||||
if (state.m_stalling_since.count() && state.m_stalling_since < current_time - stalling_timeout) {
|
||||
|
|
|
@ -119,7 +119,10 @@ public:
|
|||
virtual PeerManagerInfo GetInfo() const = 0;
|
||||
|
||||
/** Relay transaction to all peers. */
|
||||
virtual void RelayTransaction(const uint256& txid, const uint256& wtxid) = 0;
|
||||
virtual void RelayTransaction(const uint256& txid, const uint256& wtxid, bool consider_fanout) = 0;
|
||||
|
||||
/** Get the number of inbound (first) and outbound (second) fanout peers. */
|
||||
virtual std::pair<size_t, size_t> GetFanoutPeersCount() = 0;
|
||||
|
||||
/** Send ping message to all peers */
|
||||
virtual void SendPings() = 0;
|
||||
|
|
|
@ -117,7 +117,8 @@ TransactionError BroadcastTransaction(NodeContext& node, const CTransactionRef t
|
|||
}
|
||||
|
||||
if (relay) {
|
||||
node.peerman->RelayTransaction(txid, wtxid);
|
||||
// Always consider fanout for our own transactions
|
||||
node.peerman->RelayTransaction(txid, wtxid, /*consider_fanout=*/true);
|
||||
}
|
||||
|
||||
return TransactionError::OK;
|
||||
|
|
File diff suppressed because it is too large
|
@ -10,10 +10,38 @@
|
|||
|
||||
#include <memory>
|
||||
#include <tuple>
|
||||
#include <optional>
|
||||
|
||||
/** Supported transaction reconciliation protocol version */
|
||||
static constexpr uint32_t TXRECONCILIATION_VERSION{1};
|
||||
|
||||
/**
|
||||
* Maximum number of wtxids stored in a peer local set, bounded to protect the memory use of
|
||||
* reconciliation sets and short ids mappings, and CPU used for sketch computation.
|
||||
*/
|
||||
constexpr size_t MAX_RECONSET_SIZE = 3000;
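As a rough, back-of-the-envelope illustration of that bound (my arithmetic, not a figure from the patch): a wtxid is a 32-byte hash, so the raw wtxid data of one full set is on the order of 96 KB per peer, before the short-id mapping and sketch state are counted.

    #include <cstddef>

    constexpr std::size_t kMaxReconSetSize = 3000; // mirrors MAX_RECONSET_SIZE above
    constexpr std::size_t kWtxidBytes = 32;        // a wtxid is a 256-bit hash
    static_assert(kMaxReconSetSize * kWtxidBytes == 96'000, "~96 KB of raw wtxids per full set");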
|
||||
|
||||
/**
|
||||
* Announce transactions via full wtxid to a limited number of inbound and outbound peers.
|
||||
* Justification for these values is provided here:
|
||||
* TODO: ADD link to justification based on simulation results */
|
||||
constexpr double INBOUND_FANOUT_DESTINATIONS_FRACTION = 0.1;
|
||||
constexpr size_t OUTBOUND_FANOUT_THRESHOLD = 4;
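To make the two knobs concrete (illustrative arithmetic only; the promised justification link is still a TODO above, and the fraction is presumably applied to the set of reconciling inbound peers): with, say, 80 Erlay inbound peers, roughly 8 of them would be fanout targets at any given time, while an outbound Erlay peer only receives a full INV while no more than OUTBOUND_FANOUT_THRESHOLD outbound peers are already known to have the transaction.

    #include <cstdio>

    int main()
    {
        constexpr double kInboundFraction = 0.1; // INBOUND_FANOUT_DESTINATIONS_FRACTION
        constexpr int kErlayInbounds = 80;       // hypothetical peer count
        std::printf("inbound fanout targets: %.0f\n", kErlayInbounds * kInboundFraction); // 8
    }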
|
||||
|
||||
/**
|
||||
* Interval for inbound peer fanout selection. The subset is rotated on a timer.
|
||||
*/
|
||||
static constexpr auto INBOUND_FANOUT_ROTATION_INTERVAL{10min};
|
||||
|
||||
/**
|
||||
* Interval between initiating reconciliations with peers.
|
||||
* This value allows reconciling ~(7 tx/s * 8 s) = ~56 transactions per round during normal operation.
|
||||
* More frequent reconciliations would cause significant constant bandwidth overhead
|
||||
* due to reconciliation metadata (sketch sizes etc.), which would nullify the efficiency.
|
||||
* Less frequent reconciliations would introduce high transaction relay latency.
|
||||
*/
|
||||
constexpr std::chrono::microseconds RECON_REQUEST_INTERVAL{8s};
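A small worked example of the scheduling this interval implies, consistent with the functional test later in this diff (the scheduling code itself is not visible in this excerpt, so treat this as an inference rather than a spec): the queue walks all reconciling peers once per RECON_REQUEST_INTERVAL, so consecutive REQTXRCNCL messages to different peers are spaced RECON_REQUEST_INTERVAL / n apart, while each individual peer is still reconciled only about every 8 seconds.

    #include <chrono>
    #include <cstdio>

    int main()
    {
        using namespace std::chrono_literals;
        constexpr std::chrono::seconds kReconRequestInterval{8s}; // mirrors RECON_REQUEST_INTERVAL
        for (int n_peers : {1, 2, 4}) {
            const auto gap = kReconRequestInterval / n_peers;     // spacing between consecutive requests
            std::printf("%d peer(s): a REQTXRCNCL goes out every %lld s\n",
                        n_peers, static_cast<long long>(gap.count()));
        }
    }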
|
||||
|
||||
enum class ReconciliationRegisterResult {
|
||||
NOT_FOUND,
|
||||
SUCCESS,
|
||||
|
@ -21,6 +49,23 @@ enum class ReconciliationRegisterResult {
|
|||
PROTOCOL_VIOLATION,
|
||||
};
|
||||
|
||||
/**
|
||||
* Record whether or not a wtxid was successfully added to a reconciliation set.
|
||||
* In case of failure, check whether this was due to a shortid collision and record
|
||||
* the colliding wtxid.
|
||||
*/
|
||||
class AddToSetResult
|
||||
{
|
||||
public:
|
||||
bool m_succeeded;
|
||||
std::optional<Wtxid> m_collision;
|
||||
|
||||
explicit AddToSetResult(bool added, std::optional<Wtxid> conflict);
|
||||
static AddToSetResult Succeeded();
|
||||
static AddToSetResult Failed();
|
||||
static AddToSetResult Collision(Wtxid);
|
||||
};
|
||||
|
||||
/**
|
||||
* Transaction reconciliation is a way for nodes to efficiently announce transactions.
|
||||
* This object keeps track of all txreconciliation-related communications with the peers.
|
||||
|
@ -67,6 +112,11 @@ public:
|
|||
*/
|
||||
uint64_t PreRegisterPeer(NodeId peer_id);
|
||||
|
||||
/**
|
||||
* For testing purposes only. This SHOULD NEVER be used in production.
|
||||
*/
|
||||
void PreRegisterPeerWithSalt(NodeId peer_id, uint64_t local_salt);
|
||||
|
||||
/**
|
||||
* Step 0. Once the peer agreed to reconcile txs with us, generate the state required to track
|
||||
* ongoing reconciliations. Must be called only after pre-registering the peer and only once.
|
||||
|
@ -74,6 +124,86 @@ public:
|
|||
ReconciliationRegisterResult RegisterPeer(NodeId peer_id, bool is_peer_inbound,
|
||||
uint32_t peer_recon_version, uint64_t remote_salt);
|
||||
|
||||
/**
|
||||
* Step 1. Add a to-be-announced transaction to the local reconciliation set of the target peer.
|
||||
* Returns false if the set is at capacity, or if the set contains a colliding transaction (alongside
|
||||
* the colliding wtxid). Returns true if the transaction is added to the set (or if it was already in it).
|
||||
*/
|
||||
AddToSetResult AddToSet(NodeId peer_id, const Wtxid& wtxid);
|
||||
|
||||
/**
|
||||
* Checks whether a transaction is part of the peer's reconciliation set.
|
||||
*/
|
||||
bool IsTransactionInSet(NodeId peer_id, const Wtxid& wtxid);
|
||||
|
||||
/**
|
||||
* Before Step 2, we might want to remove a wtxid from the reconciliation set, for example if
|
||||
* the peer just announced the transaction to us.
|
||||
* Returns whether the wtxid was removed.
|
||||
*/
|
||||
bool TryRemovingFromSet(NodeId peer_id, const Wtxid& wtxid);
|
||||
|
||||
/**
|
||||
* Returns whether it's time to initiate reconciliation (Step 2) with a given peer, based on:
|
||||
* - time passed since the last reconciliation;
|
||||
* - reconciliation queue;
|
||||
* - whether previous reconciliations for the given peer were finalized.
|
||||
*/
|
||||
bool IsPeerNextToReconcileWith(NodeId peer_id, std::chrono::microseconds now);
|
||||
|
||||
/**
|
||||
* Step 2. Unless the peer hasn't finished a previous reconciliation round, this function will
|
||||
* return the details of our local state, which should be communicated to the peer so that they
|
||||
* better know what we need:
|
||||
* - size of our reconciliation set for the peer
|
||||
* - our q-coefficient with the peer, formatted to be transmitted as integer value
|
||||
* Assumes the peer was previously registered for reconciliations.
|
||||
*/
|
||||
std::optional<std::pair<uint16_t, uint16_t>> InitiateReconciliationRequest(NodeId peer_id);
|
||||
|
||||
/**
|
||||
* Step 2. Record a reconciliation request with its parameters so we can respond when it's time.
|
||||
* If peer violates the protocol, disconnect.
|
||||
*/
|
||||
bool HandleReconciliationRequest(NodeId peer_id, uint16_t peer_recon_set_size, uint16_t peer_q);
|
||||
|
||||
/**
|
||||
* Step 2. Once it's time to respond to reconciliation requests, we construct a sketch from
|
||||
* the local reconciliation set, and send it to the initiator.
|
||||
* If the peer was not previously registered for reconciliations or the peer didn't request
|
||||
* to reconcile with us, return false.
|
||||
*/
|
||||
bool ShouldRespondToReconciliationRequest(NodeId peer_id, std::vector<uint8_t>& skdata);
|
||||
|
||||
/**
|
||||
* Step 3. Process a response to our reconciliation request.
|
||||
* Returns false if the peer seems to violate the protocol.
|
||||
* Populates the vectors so that we know which transactions should be requested and announced,
|
||||
* and whether reconciliation succeeded (nullopt if the reconciliation is not over yet and
|
||||
* extension should be requested).
|
||||
*/
|
||||
bool HandleSketch(NodeId peer_id, const std::vector<uint8_t>& skdata,
|
||||
// returning values
|
||||
std::vector<uint32_t>& txs_to_request, std::vector<uint256>& txs_to_announce, std::optional<bool>& result);
|
||||
|
||||
/**
|
||||
* Step 5. The peer requests a sketch extension after a reconciliation they initiated failed on their side:
|
||||
* the sketch we sent to them was not sufficient to find the difference.
|
||||
* No privacy leak can happen here because sketch extension is constructed over the snapshot.
|
||||
* If the peer seems to violate the protocol, do nothing.
|
||||
*/
|
||||
void HandleExtensionRequest(NodeId peer_id);
|
||||
|
||||
/**
|
||||
* Step 4. Once we received a signal of reconciliation finalization with a given result from the
|
||||
* initiating peer, announce the following transactions:
|
||||
* - in case of a failure, all transactions we had for that peer
|
||||
* - in case of a success, transactions the peer asked for by short id (ask_shortids)
|
||||
* Return false if the peer seems to violate the protocol.
|
||||
*/
|
||||
bool FinalizeInitByThem(NodeId peer_id, bool recon_result,
|
||||
const std::vector<uint32_t>& remote_missing_short_ids, std::vector<uint256>& remote_missing);
|
||||
|
||||
/**
|
||||
* Attempts to forget txreconciliation-related state of the peer (if we previously stored any).
|
||||
* After this, we won't be able to reconcile transactions with the peer.
|
||||
|
@ -84,6 +214,26 @@ public:
|
|||
* Check if a peer is registered to reconcile transactions with us.
|
||||
*/
|
||||
bool IsPeerRegistered(NodeId peer_id) const;
|
||||
|
||||
/**
|
||||
* Whether a given peer is currently flagged for fanout.
|
||||
*/
|
||||
bool IsInboundFanoutTarget(NodeId peer_id);
|
||||
|
||||
/**
|
||||
* Get the next time the inbound peer subset should be rotated.
|
||||
*/
|
||||
std::chrono::microseconds GetNextInboundPeerRotationTime();
|
||||
|
||||
/**
|
||||
* Update the next inbound peer rotation time.
|
||||
*/
|
||||
void SetNextInboundPeerRotationTime(std::chrono::microseconds next_time);
|
||||
|
||||
/**
|
||||
* Picks a different subset of inbound peers to fanout to.
|
||||
*/
|
||||
void RotateInboundFanoutTargets();
|
||||
};
|
||||
|
||||
#endif // BITCOIN_NODE_TXRECONCILIATION_H
|
||||
|
|
|
@ -264,6 +264,27 @@ inline constexpr const char* WTXIDRELAY{"wtxidrelay"};
|
|||
* txreconciliation, as described by BIP 330.
|
||||
*/
|
||||
inline constexpr const char* SENDTXRCNCL{"sendtxrcncl"};
|
||||
/**
|
||||
* Contains a 2-byte local reconciliation set size and a 2-byte q-coefficient
|
||||
* sent to initiate a transaction reconciliation round.
|
||||
* Peer should respond with "sketch" message constructed using these arguments.
|
||||
*/
|
||||
inline constexpr const char* REQTXRCNCL{"reqtxrcncl"};
|
||||
/**
|
||||
* Contains a sketch of the local reconciliation set,
|
||||
* used to efficiently reconcile transactions.
|
||||
* Peer should respond with "reconcildiff" or "reqsketchext" message.
|
||||
*/
|
||||
inline constexpr const char* SKETCH{"sketch"};
|
||||
/**
|
||||
* Indicates whether ongoing reconciliation has succeeded,
|
||||
* and requests the missing transactions by short ids.
|
||||
*/
|
||||
inline constexpr const char* RECONCILDIFF{"reconcildiff"};
|
||||
/**
|
||||
* Requests a sketch extension for ongoing reconciliation.
|
||||
*/
|
||||
inline constexpr const char* REQSKETCHEXT{"reqsketchext"};
|
||||
}; // namespace NetMsgType
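Taken together, the comments above describe the following round trip between an initiator and a responder. This is a condensed reading of those comments and of BIP 330, not normative text from the patch.

    // Initiator                                   Responder
    // REQTXRCNCL(set_size, q)   ----------------->
    //                           <-----------------  SKETCH(skdata)
    // RECONCILDIFF(result, short_ids) ----------->  (difference decoded: success or failure)
    //   ...or REQSKETCHEXT      ----------------->  (difference not decodable yet)
    //                           <-----------------  SKETCH(extension)
    // Any transactions still missing on either side are then announced with plain INVs.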
|
||||
|
||||
/** All known message types (see above). Keep this in the same order as the list of messages above. */
|
||||
|
@ -303,6 +324,10 @@ inline const std::array ALL_NET_MESSAGE_TYPES{std::to_array<std::string>({
|
|||
NetMsgType::CFCHECKPT,
|
||||
NetMsgType::WTXIDRELAY,
|
||||
NetMsgType::SENDTXRCNCL,
|
||||
NetMsgType::REQTXRCNCL,
|
||||
NetMsgType::SKETCH,
|
||||
NetMsgType::RECONCILDIFF,
|
||||
NetMsgType::REQSKETCHEXT,
|
||||
})};
|
||||
|
||||
/** nServices flags */
|
||||
|
|
|
@ -81,4 +81,189 @@ BOOST_AUTO_TEST_CASE(IsPeerRegisteredTest)
|
|||
BOOST_CHECK(!tracker.IsPeerRegistered(peer_id0));
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(AddToSetTest)
|
||||
{
|
||||
TxReconciliationTracker tracker(TXRECONCILIATION_VERSION);
|
||||
NodeId peer_id0 = 0;
|
||||
FastRandomContext frc{/*fDeterministic=*/true};
|
||||
|
||||
Wtxid wtxid{Wtxid::FromUint256(frc.rand256())};
|
||||
|
||||
// If the peer is not registered, adding to the set fails
|
||||
BOOST_REQUIRE(!tracker.IsPeerRegistered(peer_id0));
|
||||
auto r = tracker.AddToSet(peer_id0, wtxid);
|
||||
BOOST_REQUIRE(!r.m_succeeded);
|
||||
BOOST_REQUIRE(!r.m_collision.has_value());
|
||||
|
||||
// As long as the peer is registered, adding a new wtxid to the set should work
|
||||
tracker.PreRegisterPeer(peer_id0);
|
||||
BOOST_REQUIRE_EQUAL(tracker.RegisterPeer(peer_id0, true, 1, 1), ReconciliationRegisterResult::SUCCESS);
|
||||
BOOST_CHECK(tracker.IsPeerRegistered(peer_id0));
|
||||
|
||||
r = tracker.AddToSet(peer_id0, wtxid);
|
||||
BOOST_REQUIRE(r.m_succeeded);
|
||||
BOOST_REQUIRE(!r.m_collision.has_value());
|
||||
|
||||
// If the peer is dropped, adding wtxids to its set should fail
|
||||
tracker.ForgetPeer(peer_id0);
|
||||
Wtxid wtxid2{Wtxid::FromUint256(frc.rand256())};
|
||||
r = tracker.AddToSet(peer_id0, wtxid2);
|
||||
BOOST_REQUIRE(!r.m_succeeded);
|
||||
BOOST_REQUIRE(!r.m_collision.has_value());
|
||||
|
||||
NodeId peer_id1 = 1;
|
||||
tracker.PreRegisterPeer(peer_id1);
|
||||
BOOST_REQUIRE_EQUAL(tracker.RegisterPeer(peer_id1, true, 1, 1), ReconciliationRegisterResult::SUCCESS);
|
||||
BOOST_CHECK(tracker.IsPeerRegistered(peer_id1));
|
||||
|
||||
// As long as the peer is registered, the transaction is not in the set, and there is no short id
|
||||
// collision, adding should work
|
||||
size_t added_txs = 0;
|
||||
while (added_txs < MAX_RECONSET_SIZE) {
|
||||
wtxid = Wtxid::FromUint256(frc.rand256());
|
||||
Wtxid collision;
|
||||
|
||||
r = tracker.AddToSet(peer_id1, wtxid);
|
||||
if (r.m_succeeded) {
|
||||
BOOST_REQUIRE(!r.m_collision.has_value());
|
||||
++added_txs;
|
||||
} else {
|
||||
BOOST_REQUIRE_EQUAL(r.m_collision.value(), collision);
|
||||
}
|
||||
}
|
||||
|
||||
// Adding one more item will fail due to the set being full
|
||||
r = tracker.AddToSet(peer_id1, Wtxid::FromUint256(frc.rand256()));
|
||||
BOOST_REQUIRE(!r.m_succeeded);
|
||||
BOOST_REQUIRE(!r.m_collision.has_value());
|
||||
|
||||
// Trying to add the same item twice will just report success (the wtxid is already in the set)
|
||||
r = tracker.AddToSet(peer_id1, wtxid);
|
||||
BOOST_REQUIRE(r.m_succeeded);
|
||||
BOOST_REQUIRE(!r.m_collision.has_value());
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(AddToSetCollisionTest)
|
||||
{
|
||||
TxReconciliationTracker tracker(TXRECONCILIATION_VERSION);
|
||||
NodeId peer_id0 = 0;
|
||||
FastRandomContext frc{/*fDeterministic=*/true};
|
||||
|
||||
// Precompute collision
|
||||
Wtxid wtxid{Wtxid::FromUint256(uint256("c70d778bccef36a81aed8da0b819d2bd28bd8653e56a5d40903df1a0ade0b876"))};
|
||||
Wtxid collision{Wtxid::FromUint256(uint256("ae52a6ecb8733fba1f7af6022a8b9dd327d7825054229fafcad7e03c38ae2a50"))};
|
||||
|
||||
// Register the peer with a predefined salt so we can force the collision
|
||||
tracker.PreRegisterPeerWithSalt(peer_id0, 2);
|
||||
BOOST_REQUIRE_EQUAL(tracker.RegisterPeer(peer_id0, true, 1, 1), ReconciliationRegisterResult::SUCCESS);
|
||||
BOOST_CHECK(tracker.IsPeerRegistered(peer_id0));
|
||||
|
||||
// Once the peer is registered, we can try to add both transactions and check
|
||||
BOOST_REQUIRE(tracker.AddToSet(peer_id0, wtxid).m_succeeded);
|
||||
auto r = tracker.AddToSet(peer_id0, collision);
|
||||
BOOST_REQUIRE(!r.m_succeeded);
|
||||
BOOST_REQUIRE_EQUAL(r.m_collision.value(), wtxid);
|
||||
}
|
||||
|
||||
// Also tests AddToPeerQueue
|
||||
BOOST_AUTO_TEST_CASE(IsPeerNextToReconcileWith)
|
||||
{
|
||||
TxReconciliationTracker tracker(TXRECONCILIATION_VERSION);
|
||||
NodeId peer_id0 = 0;
|
||||
|
||||
// If the peer is not fully registered, the method will return false regardless of the current time
|
||||
BOOST_CHECK(!tracker.IsPeerNextToReconcileWith(peer_id0, GetTime<std::chrono::microseconds>()));
|
||||
tracker.PreRegisterPeer(peer_id0);
|
||||
BOOST_CHECK(!tracker.IsPeerNextToReconcileWith(peer_id0, GetTime<std::chrono::microseconds>()));
|
||||
|
||||
// When the first peer is added to the reconciliation tracker, a full RECON_REQUEST_INTERVAL
|
||||
// is given to let transactions pile up; otherwise the node would request an empty reconciliation
|
||||
// right away
|
||||
BOOST_REQUIRE_EQUAL(tracker.RegisterPeer(peer_id0, false, 1, 1), ReconciliationRegisterResult::SUCCESS);
|
||||
auto current_time = GetTime<std::chrono::microseconds>() + RECON_REQUEST_INTERVAL;
|
||||
BOOST_CHECK(tracker.IsPeerNextToReconcileWith(peer_id0, current_time));
|
||||
|
||||
// Not enough time passed.
|
||||
current_time += RECON_REQUEST_INTERVAL - 1s;
|
||||
BOOST_CHECK(!tracker.IsPeerNextToReconcileWith(peer_id0, current_time));
|
||||
|
||||
// Enough time passed, but the previous reconciliation is still pending.
|
||||
current_time += 1s;
|
||||
BOOST_CHECK(tracker.IsPeerNextToReconcileWith(peer_id0, current_time));
|
||||
|
||||
// TODO: expand these tests once there is a way to drop the pending reconciliation.
|
||||
|
||||
// Two-peer setup
|
||||
tracker.ForgetPeer(peer_id0);
|
||||
NodeId peer_id1 = 1;
|
||||
NodeId peer_id2 = 2;
|
||||
{
|
||||
tracker.PreRegisterPeer(peer_id1);
|
||||
BOOST_REQUIRE_EQUAL(tracker.RegisterPeer(peer_id1, false, 1, 1), ReconciliationRegisterResult::SUCCESS);
|
||||
tracker.PreRegisterPeer(peer_id2);
|
||||
BOOST_REQUIRE_EQUAL(tracker.RegisterPeer(peer_id2, false, 1, 1), ReconciliationRegisterResult::SUCCESS);
|
||||
|
||||
current_time += RECON_REQUEST_INTERVAL;
|
||||
bool peer1_next = tracker.IsPeerNextToReconcileWith(peer_id1, current_time);
|
||||
bool peer2_next = tracker.IsPeerNextToReconcileWith(peer_id2, current_time);
|
||||
BOOST_CHECK(peer1_next && !peer2_next);
|
||||
|
||||
current_time += RECON_REQUEST_INTERVAL/2;
|
||||
peer2_next = tracker.IsPeerNextToReconcileWith(peer_id2, current_time);
|
||||
peer1_next = tracker.IsPeerNextToReconcileWith(peer_id1, current_time);
|
||||
BOOST_CHECK(!peer1_next && peer2_next);
|
||||
|
||||
current_time += RECON_REQUEST_INTERVAL/2;
|
||||
peer1_next = tracker.IsPeerNextToReconcileWith(peer_id1, current_time);
|
||||
peer2_next = tracker.IsPeerNextToReconcileWith(peer_id2, current_time);
|
||||
BOOST_CHECK(peer1_next && !peer2_next);
|
||||
|
||||
// If the peer has pending reconciliation, it doesn't affect the global timer.
|
||||
BOOST_REQUIRE(tracker.InitiateReconciliationRequest(peer_id2) != std::nullopt);
|
||||
current_time += RECON_REQUEST_INTERVAL/2;
|
||||
peer2_next = tracker.IsPeerNextToReconcileWith(peer_id2, current_time);
|
||||
peer1_next = tracker.IsPeerNextToReconcileWith(peer_id1, current_time);
|
||||
BOOST_CHECK(peer1_next && peer2_next);
|
||||
|
||||
tracker.ForgetPeer(peer_id2);
|
||||
current_time += RECON_REQUEST_INTERVAL/2;
|
||||
peer1_next = tracker.IsPeerNextToReconcileWith(peer_id1, current_time);
|
||||
peer2_next = tracker.IsPeerNextToReconcileWith(peer_id2, current_time);
|
||||
BOOST_CHECK(peer1_next && !peer2_next);
|
||||
}
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(InitiateReconciliationRequest)
|
||||
{
|
||||
TxReconciliationTracker tracker(TXRECONCILIATION_VERSION);
|
||||
NodeId peer_id0 = 0;
|
||||
FastRandomContext frc{/*fDeterministic=*/true};
|
||||
|
||||
BOOST_CHECK(tracker.InitiateReconciliationRequest(peer_id0) == std::nullopt);
|
||||
|
||||
tracker.PreRegisterPeer(peer_id0);
|
||||
BOOST_CHECK(tracker.InitiateReconciliationRequest(peer_id0) == std::nullopt);
|
||||
|
||||
BOOST_REQUIRE_EQUAL(tracker.RegisterPeer(peer_id0, false, 1, 1), ReconciliationRegisterResult::SUCCESS);
|
||||
|
||||
const auto reconciliation_request_params = tracker.InitiateReconciliationRequest(peer_id0);
|
||||
BOOST_CHECK(reconciliation_request_params != std::nullopt);
|
||||
const auto [local_set_size, local_q_formatted] = (*reconciliation_request_params);
|
||||
BOOST_CHECK_EQUAL(local_set_size, 0);
|
||||
BOOST_CHECK_EQUAL(local_q_formatted, uint16_t(32767 * 0.25));
|
||||
|
||||
// Start fresh
|
||||
tracker.ForgetPeer(peer_id0);
|
||||
tracker.PreRegisterPeer(peer_id0);
|
||||
BOOST_REQUIRE_EQUAL(tracker.RegisterPeer(peer_id0, false, 1, 1), ReconciliationRegisterResult::SUCCESS);
|
||||
tracker.AddToSet(peer_id0, Wtxid::FromUint256(frc.rand256()));
|
||||
tracker.AddToSet(peer_id0, Wtxid::FromUint256(frc.rand256()));
|
||||
tracker.AddToSet(peer_id0, Wtxid::FromUint256(frc.rand256()));
|
||||
const auto reconciliation_request_params2 = tracker.InitiateReconciliationRequest(peer_id0);
|
||||
BOOST_CHECK(reconciliation_request_params2 != std::nullopt);
|
||||
const auto [local_set_size2, local_q_formatted2] = (*reconciliation_request_params2);
|
||||
BOOST_CHECK_EQUAL(local_set_size2, 3);
|
||||
BOOST_CHECK_EQUAL(local_q_formatted2, uint16_t(32767 * 0.25));
|
||||
}
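For reference, the q encoding these checks rely on is plain integer arithmetic; a minimal reproduction of the expected value, using the same expression as the test, is shown below.

    #include <cstdint>
    #include <cstdio>

    int main()
    {
        constexpr int kQPrecision = (2 << 14) - 1;  // 32767, same precision the test assumes
        constexpr double kDefaultQ = 0.25;          // default q used throughout this patch
        std::printf("%d\n", static_cast<int>(static_cast<uint16_t>(kQPrecision * kDefaultQ))); // 8191
    }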
|
||||
|
||||
BOOST_AUTO_TEST_SUITE_END()
|
||||
|
|
test/functional/p2p_reqtxrcncl.py (new executable file, 99 lines)
|
@ -0,0 +1,99 @@
|
|||
#!/usr/bin/env python3
|
||||
# Copyright (c) 2023 The Bitcoin Core developers
|
||||
# Distributed under the MIT software license, see the accompanying
|
||||
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
||||
"""Test REQTXRCNCL message
|
||||
"""
|
||||
|
||||
import time
|
||||
import math
|
||||
|
||||
from test_framework.blocktools import COINBASE_MATURITY
|
||||
from test_framework.p2p import P2PInterface
|
||||
from test_framework.test_framework import BitcoinTestFramework
|
||||
from test_framework.util import assert_equal
|
||||
from test_framework.wallet import MiniWallet
|
||||
|
||||
# From txreconciliation.cpp
|
||||
# Interval the node takes to reconcile with all peers. Each peer is reconciled once per interval; requests go out every 1/n-th of it.
|
||||
RECON_REQUEST_INTERVAL = 8
|
||||
Q = 0.25
|
||||
Q_PRECISION = (2 << 14) - 1
|
||||
|
||||
class ReqTxrcnclReceiver(P2PInterface):
|
||||
def __init__(self):
|
||||
super().__init__(support_txrcncl = True)
|
||||
self.reqtxrcncl_msg_received = None
|
||||
self.received_inv_items = 0
|
||||
|
||||
def on_inv(self, message):
|
||||
self.received_inv_items += len(message.inv)
|
||||
super().on_inv(message)
|
||||
|
||||
def on_reqtxrcncl(self, message):
|
||||
self.reqtxrcncl_msg_received = message
|
||||
|
||||
class ReqTxRcnclTest(BitcoinTestFramework):
|
||||
def set_test_params(self):
|
||||
self.num_nodes = 1
|
||||
self.extra_args = [['-txreconciliation']]
|
||||
|
||||
def run_test(self):
|
||||
self.nodes[0].setmocktime(int(time.time()))
|
||||
self.generate(self.nodes[0], COINBASE_MATURITY) # mature coinbase UTXO used later
|
||||
|
||||
# Check everything about *sending* REQTXRCNCL.
|
||||
self.log.info('REQTXRCNCL sent to peer 1 (outbound)')
|
||||
peer = self.nodes[0].add_outbound_p2p_connection(ReqTxrcnclReceiver(), wait_for_verack=True, p2p_idx=0)
|
||||
self.nodes[0].bumpmocktime(RECON_REQUEST_INTERVAL + 1)
|
||||
self.wait_until(lambda: peer.reqtxrcncl_msg_received)
|
||||
|
||||
# No transactions were created, so we expect the set size to be 0, and Q to be the default
|
||||
assert_equal(peer.reqtxrcncl_msg_received.set_size, 0)
|
||||
assert_equal(peer.reqtxrcncl_msg_received.q, int(Q_PRECISION * Q))
|
||||
self.nodes[0].disconnect_p2ps()
|
||||
|
||||
self.log.info('REQTXRCNCL sent to outbound peer 0 again, even though we added peer 2 (both outbound)')
|
||||
with self.nodes[0].assert_debug_log(["Register peer=1", "Register peer=2"]):
|
||||
peer1 = self.nodes[0].add_outbound_p2p_connection(ReqTxrcnclReceiver(), wait_for_verack=True, p2p_idx=1)
|
||||
peer2 = self.nodes[0].add_outbound_p2p_connection(ReqTxrcnclReceiver(), wait_for_verack=True, p2p_idx=2)
|
||||
|
||||
# The node takes RECON_REQUEST_INTERVAL to send a REQTXRCNCL to the first peer, since the timer is set when the first
|
||||
# peer connects.
|
||||
self.nodes[0].bumpmocktime(RECON_REQUEST_INTERVAL + 1)
|
||||
self.wait_until(lambda: peer1.reqtxrcncl_msg_received)
|
||||
assert not peer2.reqtxrcncl_msg_received
|
||||
|
||||
# After the timer restarts, the interval per peer is now half, since it is computed over two peers
|
||||
self.log.info('REQTXRCNCL sent to peer 2 (outbound)')
|
||||
peer1.reqtxrcncl_msg_received = None
|
||||
self.nodes[0].bumpmocktime(math.ceil(RECON_REQUEST_INTERVAL/2) + 1)
|
||||
self.wait_until(lambda: peer2.reqtxrcncl_msg_received)
|
||||
assert not peer1.reqtxrcncl_msg_received
|
||||
self.nodes[0].disconnect_p2ps()
|
||||
|
||||
self.log.info('Check transactions announced (either low-fanout or reconciliation)')
|
||||
with self.nodes[0].assert_debug_log(["Register peer=3", "Register peer=4"]):
|
||||
peer1 = self.nodes[0].add_outbound_p2p_connection(ReqTxrcnclReceiver(), wait_for_verack=True, p2p_idx=3)
|
||||
peer2 = self.nodes[0].add_outbound_p2p_connection(ReqTxrcnclReceiver(), wait_for_verack=True, p2p_idx=4)
|
||||
|
||||
wallet = MiniWallet(self.nodes[0])
|
||||
utxos = wallet.get_utxos()
|
||||
for utxo in utxos:
|
||||
# We want all transactions to be childless.
|
||||
wallet.send_self_transfer(from_node=self.nodes[0], utxo_to_spend=utxo)
|
||||
|
||||
# Since we have disconnected all peers, connecting new ones will reset the interval to RECON_REQUEST_INTERVAL per peer
|
||||
self.nodes[0].bumpmocktime(RECON_REQUEST_INTERVAL + 1)
|
||||
self.wait_until(lambda: peer1.reqtxrcncl_msg_received)
|
||||
# After one iteration, it will go down to 1/2 again
|
||||
self.nodes[0].bumpmocktime(math.ceil(RECON_REQUEST_INTERVAL/2) + 1)
|
||||
self.wait_until(lambda: peer2.reqtxrcncl_msg_received)
|
||||
# Some of the announcements may go into the next reconciliation due to the random delay.
|
||||
# TODO: improve this check once it's possible to finalize this reconciliation
|
||||
# and receive the next round.
|
||||
assert peer1.reqtxrcncl_msg_received.set_size + peer1.received_inv_items <= len(utxos)
|
||||
assert peer2.reqtxrcncl_msg_received.set_size + peer2.received_inv_items <= len(utxos)
|
||||
|
||||
if __name__ == '__main__':
|
||||
ReqTxRcnclTest(__file__).main()

test/functional/p2p_txrecon_initiator.py (new executable file, 224 lines)

@ -0,0 +1,224 @@
#!/usr/bin/env python3
# Copyright (c) 2021-2025 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Test reconciliation-based transaction relay (node initiates)"""

import time

from test_framework.messages import msg_reconcildiff

from test_framework.wallet import MiniWallet
from test_framework.util import assert_equal
from test_framework.p2p import P2PDataStore
from test_framework.p2p_txrecon import (
    create_sketch, get_short_id, ReconciliationTest,
    TxReconTestP2PConn, Q_PRECISION, RECON_Q,
    MAX_SKETCH_CAPACITY, BYTES_PER_SKETCH_CAPACITY
)

# Taken from net_processing.cpp
OUTBOUND_INVENTORY_BROADCAST_INTERVAL = 2

# Taken from txreconciliation.h
OUTBOUND_FANOUT_THRESHOLD = 4
RECON_REQUEST_INTERVAL = 8


class ReconciliationInitiatorTest(ReconciliationTest):
    def set_test_params(self):
        super().set_test_params()

    # Wait for the next REQTXRCNCL message to be received by the
    # given peer. Clear and return it.
    def wait_for_reqtxrcncl(self, peer):
        def received_reqtxrcncl():
            return (len(peer.last_reqtxrcncl) > 0)
        self.wait_until(received_reqtxrcncl)

        return peer.last_reqtxrcncl.pop()

    # Wait for the next RECONCILDIFF message to be received by the
    # given peer. Clear and return it.
    def wait_for_reconcildiff(self, peer):
        def received_reconcildiff():
            return (len(peer.last_reconcildiff) > 0)
        self.wait_until(received_reconcildiff)

        return peer.last_reconcildiff.pop()

    # Creates a sketch using the provided transactions and capacity
    # and sends it from the given peer.
    # Returns a list of the short ids contained in the sketch.
    def send_sketch_from(self, peer, unique_wtxids, shared_wtxids, capacity):
        unique_short_txids = [get_short_id(wtxid, peer.combined_salt)
                              for wtxid in unique_wtxids]
        shared_short_txids = [get_short_id(wtxid, peer.combined_salt)
                              for wtxid in shared_wtxids]

        sketch = create_sketch(unique_short_txids + shared_short_txids, capacity)
        peer.send_sketch(sketch)

        return unique_short_txids

    def test_reconciliation_initiator_flow_empty_sketch(self):
        peer = self.test_node.add_outbound_p2p_connection(TxReconTestP2PConn(), p2p_idx=0)

        # Generate transactions only on the node's end, so it has something to announce at the end
        _, node_unique_txs, _ = self.generate_txs(self.wallet, 0, 10, 0)

        # Do the reconciliation dance announcing an empty sketch.
        # Wait long enough to make sure the node adds the transactions to our tracker
        # and sends us a reconciliation request
        self.log.info('Testing reconciliation flow sending an empty sketch')
        self.test_node.bumpmocktime(OUTBOUND_INVENTORY_BROADCAST_INTERVAL * 20)
        peer.sync_with_ping()
        self.test_node.bumpmocktime(RECON_REQUEST_INTERVAL)
        peer.sync_with_ping()
        self.wait_for_reqtxrcncl(peer)
        # Sketch is empty
        self.send_sketch_from(peer, [], [], 0)
        recon_diff = self.wait_for_reconcildiff(peer)
        # The node's reply is also empty, signaling early exit
        assert_equal(recon_diff.ask_shortids, [])

        # The node simply defaults to announcing all the transactions it had for us
        node_unique_wtxids = set([tx.calc_sha256(True) for tx in node_unique_txs])
        self.wait_for_inv(peer, node_unique_wtxids)
        self.request_transactions_from(peer, node_unique_wtxids)
        self.wait_for_txs(peer, node_unique_wtxids)

        # Clear peer
        peer.peer_disconnect()
        peer.wait_for_disconnect()

    def test_reconciliation_initiator_protocol_violations(self):
        # Test disconnect on sending Erlay messages as a non-Erlay peer
        self.log.info('Testing protocol violation: erlay messages as non-erlay peer')
        peer = self.test_node.add_outbound_p2p_connection(P2PDataStore(), p2p_idx=0)
        peer.send_without_ping(msg_reconcildiff())
        peer.wait_for_disconnect()

        # Test disconnect on sending a REQTXRCNCL as a responder
        self.log.info('Testing protocol violation: sending REQTXRCNCL as a responder')
        peer = self.test_node.add_outbound_p2p_connection(TxReconTestP2PConn(), p2p_idx=0)
        peer.send_reqtxrcncl(0, int(RECON_Q * Q_PRECISION))
        peer.wait_for_disconnect()

        # Test disconnect on sending a SKETCH out of order
        self.log.info('Testing protocol violation: sending SKETCH out of order')
        peer = self.test_node.add_outbound_p2p_connection(TxReconTestP2PConn(), p2p_idx=0)
        peer.send_sketch([])
        peer.wait_for_disconnect()

        # Test disconnect on sending a RECONCILDIFF as a responder
        self.log.info('Testing protocol violation: sending RECONCILDIFF as a responder')
        peer = self.test_node.add_outbound_p2p_connection(TxReconTestP2PConn(), p2p_idx=0)
        peer.send_reconcildiff(True, [])
        peer.wait_for_disconnect()

        # Test disconnect on SKETCH that exceeds maximum capacity
        self.log.info('Testing protocol violation: sending SKETCH exceeding the maximum capacity')
        peer = self.test_node.add_outbound_p2p_connection(TxReconTestP2PConn(), p2p_idx=0)
        # Do the reconciliation dance until announcing the SKETCH
        self.test_node.bumpmocktime(RECON_REQUEST_INTERVAL)
        peer.sync_with_ping()
        self.wait_for_reqtxrcncl(peer)
        # Send an over-sized sketch (over the maximum allowed capacity)
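        # (Illustrative sizes, assuming the constants above: MAX_SKETCH_CAPACITY = 2 << 12 = 8192
        # field elements of BYTES_PER_SKETCH_CAPACITY = 4 bytes each, so the 8193 * 4-byte payload
        # below is exactly one element over the limit.)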
        peer.send_sketch([0] * ((MAX_SKETCH_CAPACITY + 1) * BYTES_PER_SKETCH_CAPACITY))
        peer.wait_for_disconnect()

    def test_reconciliation_initiator_no_extension(self, n_node, n_mininode, n_shared):
        self.log.info('Testing reconciliation flow without extensions')
        peers = [self.test_node.add_outbound_p2p_connection(TxReconTestP2PConn(), p2p_idx=i) for i in range(8)]

        # Generate and submit transactions.
        mininode_unique_txs, node_unique_txs, shared_txs = self.generate_txs(
            self.wallet, n_mininode, n_node, n_shared)

        # For transactions to have been added to the reconciliation sets of the node's outbound peers, we need to make
        # sure that the Poisson timer for all of them has ticked. Each timer ticks every 2 seconds on average. Given the
        # nature of the test we have no way of checking whether the timer has ticked, but we can work around this by
        # waiting long enough. By bumping the time to ~20 times the expected value, we have roughly a 1/100000000 chance
        # of any of the timers not ticking (i.e. failing the test later on), which should be more than acceptable
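        # (Worked probability, illustrative only: an exponential timer with a 2s mean misses a 40s
        # window with probability exp(-20) ~= 2e-9, and a union bound over the 8 peers keeps the
        # chance of any timer not ticking below ~2e-8.)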
        self.test_node.bumpmocktime(OUTBOUND_INVENTORY_BROADCAST_INTERVAL * 20)
        for peer in peers:
            peer.sync_with_ping()

        # Tick for as many peers as test_node has, so all of them receive a reconciliation request
        for peer in peers:
            self.test_node.bumpmocktime(int(RECON_REQUEST_INTERVAL/len(peers)))
            peer.sync_with_ping()

        empty_recon_requests = 0
        for peer in peers:
            # Check we have received a reconciliation request. The request contains either no
            # elements (the peer has been picked for fanout) or as many elements as transactions
            # were created by the node (n_node + n_shared)
            node_set_size = self.wait_for_reqtxrcncl(peer).set_size
            if node_set_size == 0:
                empty_recon_requests += 1
                peer.chosen_for_fanout = True
            else:
                assert_equal(node_set_size, n_node + n_shared)
                peer.chosen_for_fanout = False

        # For outbound peers, if a transaction was created by the node or received via fanout,
        # it will be fanned out to up to OUTBOUND_FANOUT_THRESHOLD of them. We will be reconciling with the rest
        assert_equal(empty_recon_requests, OUTBOUND_FANOUT_THRESHOLD + 1)

        for peer in peers:
            # If we received an empty request we can simply respond with an empty sketch;
            # the node will short-circuit and send us all transactions via fanout
            capacity = 0 if peer.chosen_for_fanout else n_node + n_mininode
            unique_wtxids = [tx.calc_sha256(True) for tx in mininode_unique_txs]
            shared_wtxids = [tx.calc_sha256(True) for tx in shared_txs]
            peer.unique_short_txids = self.send_sketch_from(peer, unique_wtxids, shared_wtxids, capacity)

        # Check that we received the expected sketch difference, based on the sketch we have sent
        for peer in peers:
            recon_diff = self.wait_for_reconcildiff(peer)
            expected_diff = msg_reconcildiff()
            if peer.chosen_for_fanout:
                # If we replied with an empty sketch, they will flag failure and reply with an
                # empty diff to signal an early exit and default to fanout
                assert_equal(recon_diff, expected_diff)
            else:
                # Otherwise, we expect the decoding to succeed and a request for all our transactions
                # (given there were no shared transactions)
                expected_diff.success = 1
                expected_diff.ask_shortids = peer.unique_short_txids
                assert_equal(recon_diff, expected_diff)

        # If we were chosen for reconciliation, the node will announce only the transactions we are missing (node_unique).
        # Otherwise, it will announce all the ones it has (node_unique + shared)
        node_unique_wtxids = [tx.calc_sha256(True) for tx in node_unique_txs]
        shared_wtxids = [tx.calc_sha256(True) for tx in shared_txs]
        for peer in peers:
            expected_wtxids = set(node_unique_wtxids + shared_wtxids) if peer.chosen_for_fanout else set(node_unique_wtxids)
            self.wait_for_inv(peer, expected_wtxids)
            self.request_transactions_from(peer, expected_wtxids)
            self.wait_for_txs(peer, expected_wtxids)

            if not peer.chosen_for_fanout:
                # If we received a populated diff, the node will be expecting
                # some transactions in return. The reconciliation flow has really
                # finished already, but we should be well-behaved
                peer.send_txs_and_test(mininode_unique_txs, self.test_node)

    def run_test(self):
        self.test_node = self.nodes[0]
        self.test_node.setmocktime(int(time.time()))
        self.wallet = MiniWallet(self.nodes[0])
        self.generate(self.wallet, 512)

        self.test_reconciliation_initiator_flow_empty_sketch()
        self.test_reconciliation_initiator_protocol_violations()
        self.test_reconciliation_initiator_no_extension(20, 15, 0)

        # TODO: Add more cases, potentially including also extensions
        # if we end up not dropping them from the PR


if __name__ == '__main__':
    ReconciliationInitiatorTest(__file__).main()

test/functional/p2p_txrecon_responder.py (new executable file, 161 lines)

@ -0,0 +1,161 @@
#!/usr/bin/env python3
# Copyright (c) 2021-2025 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Test reconciliation-based transaction relay (node responds)"""

import time

from test_framework.messages import msg_reqtxrcncl
from test_framework.p2p import P2PDataStore
from test_framework.p2p_txrecon import (
    create_sketch, get_short_id, estimate_capacity,
    Q_PRECISION, RECON_Q, ReconciliationTest, TxReconTestP2PConn
)
from test_framework.util import assert_equal
from test_framework.wallet import MiniWallet

# Taken from net_processing.cpp
INBOUND_INVENTORY_BROADCAST_INTERVAL = 5


class ReconciliationResponderTest(ReconciliationTest):
    def set_test_params(self):
        super().set_test_params()

    # Wait for the next SKETCH message to be received by the
    # given peer. Clear and return it.
    def wait_for_sketch(self, peer):
        def received_sketch():
            return (len(peer.last_sketch) > 0)
        self.wait_until(received_sketch, timeout=2)

        return peer.last_sketch.pop()

    # Check that the node announced the exact sketch we expected (of the expected capacity
    # and over the expected transactions)
    def check_sketch(self, peer, skdata, expected_wtxids, local_set_size):
        expected_short_ids = [get_short_id(wtxid, peer.combined_salt)
                              for wtxid in expected_wtxids]

        if len(expected_wtxids) == 0:
            expected_capacity = 0
        else:
            expected_capacity = estimate_capacity(len(expected_wtxids), local_set_size)
        expected_sketch = create_sketch(expected_short_ids, expected_capacity)

        assert_equal(skdata, expected_sketch)

    # Send a RECONCILDIFF message from the given peer, requesting the given
    # transactions by short id.
    def send_reconcildiff_from(self, peer, success, wtxids_to_request, sync_with_ping=False):
        ask_shortids = [get_short_id(wtxid, peer.combined_salt)
                        for wtxid in wtxids_to_request]
        peer.send_reconcildiff(success, ask_shortids, sync_with_ping)

    def test_reconciliation_responder_flow_empty_sketch(self):
        self.log.info('Testing reconciliation flow sending an empty REQTXRCNCL')
        peer = self.test_node.add_p2p_connection(TxReconTestP2PConn())
        # Send a reconciliation request without creating any transactions
        peer.send_reqtxrcncl(0, int(RECON_Q * Q_PRECISION))

        # We need to make sure the node has trickled for inbounds. Bumping the time by 20x the expected
        # interval gives us roughly a 1/1000000000 chance of failing
        self.test_node.bumpmocktime(INBOUND_INVENTORY_BROADCAST_INTERVAL * 20)
        peer.sync_with_ping()

        # Node sends us an empty sketch
        received_sketch = self.wait_for_sketch(peer)
        assert_equal(received_sketch.skdata, [])

        # It doesn't really matter what we send them here as our diff, given they have no
        # transactions for us, so nothing will match their local set and the node will simply terminate.
        self.send_reconcildiff_from(peer, True, [], sync_with_ping=True)

        # We can check this is the case by sending another reconciliation request and checking
        # how they reply to it (the node won't reply if the previous reconciliation was still pending)
        peer.send_reqtxrcncl(0, int(RECON_Q * Q_PRECISION))
        self.test_node.bumpmocktime(INBOUND_INVENTORY_BROADCAST_INTERVAL * 20)
        peer.sync_with_ping()
        received_sketch = self.wait_for_sketch(peer)

        # Clear peer
        peer.peer_disconnect()
        peer.wait_for_disconnect()

    def test_reconciliation_responder_protocol_violations(self):
        # Test disconnect on sending Erlay messages as a non-Erlay peer
        self.log.info('Testing protocol violation: erlay messages as non-erlay peer')
        peer = self.test_node.add_p2p_connection(P2PDataStore())
        peer.send_without_ping(msg_reqtxrcncl())
        peer.wait_for_disconnect()

        # Test disconnect on sending multiple REQTXRCNCL without receiving a response
        self.log.info('Testing protocol violation: sending multiple REQTXRCNCL without waiting for a response')
        peer = self.test_node.add_p2p_connection(TxReconTestP2PConn())
        peer.send_reqtxrcncl(0, int(RECON_Q * Q_PRECISION))
        peer.send_reqtxrcncl(0, int(RECON_Q * Q_PRECISION))
        peer.wait_for_disconnect()

        # Test disconnect on sending SKETCH as initiator
        self.log.info('Testing protocol violation: sending SKETCH as initiator')
        peer = self.test_node.add_p2p_connection(TxReconTestP2PConn())
        peer.send_sketch([])
        peer.wait_for_disconnect()

        # Test disconnect on sending a RECONCILDIFF out-of-order
        self.log.info('Testing protocol violation: sending RECONCILDIFF out of order')
        peer = self.test_node.add_p2p_connection(TxReconTestP2PConn())
        self.send_reconcildiff_from(peer, True, [])
        peer.wait_for_disconnect()

    def test_reconciliation_responder_flow_no_extension(self, n_mininode, n_node):
        self.log.info('Testing reconciliation flow without extensions')
        peer = self.test_node.add_p2p_connection(TxReconTestP2PConn())
        # Generate and submit transactions.
        mininode_unique_txs, node_unique_txs, _ = self.generate_txs(self.wallet, n_mininode, n_node, 0)
        node_unique_wtxids = [tx.calc_sha256(True) for tx in node_unique_txs]

        # Send a reconciliation request. The request will be queued and replied to on the next inbound trickle
        peer.send_reqtxrcncl(n_mininode, int(RECON_Q * Q_PRECISION))

        # We need to make sure the node has trickled for inbounds. Bumping the time by 20x the expected
        # interval gives us roughly a 1/1000000000 chance of failing
        self.test_node.bumpmocktime(INBOUND_INVENTORY_BROADCAST_INTERVAL * 20)

        received_sketch = self.wait_for_sketch(peer)
        self.check_sketch(peer, received_sketch.skdata, node_unique_wtxids, n_mininode)

        # The diff should be everything the node has that we don't (its unique transactions)
        self.send_reconcildiff_from(peer, True, node_unique_wtxids)

        self.wait_for_inv(peer, set(node_unique_wtxids))
        self.request_transactions_from(peer, node_unique_wtxids)
        self.wait_for_txs(peer, node_unique_wtxids)

        # Send our bit
        peer.send_txs_and_test(mininode_unique_txs, self.test_node)

        # Clear peer
        peer.peer_disconnect()
        peer.wait_for_disconnect()

    def run_test(self):
        self.test_node = self.nodes[0]
        self.test_node.setmocktime(int(time.time()))
        self.wallet = MiniWallet(self.nodes[0])
        self.generate(self.wallet, 512)

        # These peers will consume some of the low-fanout announcements.
        self.outbound_peers = [self.test_node.add_p2p_connection(TxReconTestP2PConn()) for _ in range(4)]

        self.test_reconciliation_responder_flow_empty_sketch()
        self.test_reconciliation_responder_protocol_violations()
        self.test_reconciliation_responder_flow_no_extension(20, 15)

        # TODO: Add more cases, potentially including also extensions
        # if we end up not dropping them from the PR


if __name__ == '__main__':
    ReconciliationResponderTest(__file__).main()

@ -153,10 +153,14 @@ def ser_string(s):
def deser_uint256(f):
    return int.from_bytes(f.read(32), 'little')


def ser_uint256(u):
    return u.to_bytes(32, 'little')

def deser_uint32(f):
    return int.from_bytes(f.read(4), 'little')

def ser_uint32(u):
    return u.to_bytes(4, 'little')

def uint256_from_str(s):
    return int.from_bytes(s[:32], 'little')

@ -211,6 +215,34 @@ def ser_uint256_vector(l):
        r += ser_uint256(i)
    return r

def deser_uint32_vector(f):
    nit = deser_compact_size(f)
    r = []
    for _ in range(nit):
        t = deser_uint32(f)
        r.append(t)
    return r

def ser_uint32_vector(l):
    r = ser_compact_size(len(l))
    for i in l:
        r += ser_uint32(i)
    return r

def deser_uint8_vector(f):
    nit = deser_compact_size(f)
    r = []
    for _ in range(nit):
        t = int.from_bytes(f.read(1), 'little')
        r.append(t)
    return r

def ser_uint8_vector(l):
    r = ser_compact_size(len(l))
    for i in l:
        r += i.to_bytes(1, 'little')
    return r


def deser_string_vector(f):
    nit = deser_compact_size(f)

@ -1919,6 +1951,87 @@ class msg_sendtxrcncl:
        return "msg_sendtxrcncl(version=%lu, salt=%lu)" %\
            (self.version, self.salt)

class msg_reqtxrcncl:
    __slots__ = ("set_size", "q")
    msgtype = b"reqtxrcncl"

    def __init__(self):
        self.set_size = 0
        self.q = 0

    def deserialize(self, f):
        self.set_size = int.from_bytes(f.read(2), "little")
        self.q = int.from_bytes(f.read(2), "little")

    def serialize(self):
        r = b""
        r += self.set_size.to_bytes(2, "little")
        r += self.q.to_bytes(2, "little")
        return r

    def __repr__(self):
        return "msg_reqtxrcncl(set_size=%lu, q=%lu)" %\
            (self.set_size, self.q)
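# A minimal round-trip sketch (illustrative, not part of the change): set_size and q are encoded
# as two little-endian uint16 fields, so a request with set_size=0 and q=8191 serializes to the
# four bytes 00 00 ff 1f.
#
#   m = msg_reqtxrcncl()
#   m.q = 8191
#   assert m.serialize().hex() == "0000ff1f"
#   m2 = msg_reqtxrcncl()
#   m2.deserialize(BytesIO(m.serialize()))
#   assert (m2.set_size, m2.q) == (0, 8191)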

class msg_sketch:
    __slots__ = ("skdata",)
    msgtype = b"sketch"

    def __init__(self):
        self.skdata = []

    def deserialize(self, f):
        self.skdata = deser_uint8_vector(f)

    def serialize(self):
        r = b""
        r += ser_uint8_vector(self.skdata)
        return r

    def __repr__(self):
        return "msg_sketch(sketch_size=%i)" % (len(self.skdata))

class msg_reqsketchext:
    __slots__ = ()
    msgtype = b"reqsketchext"

    def __init__(self):
        return

    def deserialize(self, f):
        return

    def serialize(self):
        r = b""
        return r

    def __repr__(self):
        return "msg_reqsketchext"

class msg_reconcildiff:
    __slots__ = ("success", "ask_shortids")
    msgtype = b"reconcildiff"

    def __init__(self):
        self.success = 0
        self.ask_shortids = []

    def __eq__(self, other):
        return self.success == other.success and set(self.ask_shortids) == set(other.ask_shortids)

    def deserialize(self, f):
        self.success = int.from_bytes(f.read(1), "little")
        self.ask_shortids = deser_uint32_vector(f)

    def serialize(self):
        r = b""
        r += self.success.to_bytes(1, "little")
        r += ser_uint32_vector(self.ask_shortids)
        return r

    def __repr__(self):
        return "msg_reconcildiff(success=%i,ask_shortids=%i)" % (self.success, len(self.ask_shortids))

class TestFrameworkScript(unittest.TestCase):
    def test_addrv2_encode_decode(self):
        def check_addrv2(ip, net):

@ -60,10 +60,14 @@ from test_framework.messages import (
    msg_notfound,
    msg_ping,
    msg_pong,
    msg_reconcildiff,
    msg_reqsketchext,
    msg_reqtxrcncl,
    msg_sendaddrv2,
    msg_sendcmpct,
    msg_sendheaders,
    msg_sendtxrcncl,
    msg_sketch,
    msg_tx,
    MSG_TX,
    MSG_TYPE_MASK,

@ -138,10 +142,14 @@ MESSAGEMAP = {
    b"notfound": msg_notfound,
    b"ping": msg_ping,
    b"pong": msg_pong,
    b"reconcildiff": msg_reconcildiff,
    b"reqsketchext": msg_reqsketchext,
    b"reqtxrcncl": msg_reqtxrcncl,
    b"sendaddrv2": msg_sendaddrv2,
    b"sendcmpct": msg_sendcmpct,
    b"sendheaders": msg_sendheaders,
    b"sendtxrcncl": msg_sendtxrcncl,
    b"sketch": msg_sketch,
    b"tx": msg_tx,
    b"verack": msg_verack,
    b"version": msg_version,

@ -454,7 +462,7 @@ class P2PInterface(P2PConnection):

    Individual testcases should subclass this and override the on_* methods
    if they want to alter message handling behaviour."""
    def __init__(self, support_addrv2=False, wtxidrelay=True):
    def __init__(self, support_addrv2=False, wtxidrelay=True, support_txrcncl=False):
        super().__init__()

        # Track number of messages of each type received.

@ -473,6 +481,7 @@ class P2PInterface(P2PConnection):
        self.nServices = 0

        self.support_addrv2 = support_addrv2
        self.support_txrcncl = support_txrcncl

        # If the peer supports wtxid-relay
        self.wtxidrelay = wtxidrelay

@ -584,6 +593,11 @@ class P2PInterface(P2PConnection):
            self.send_without_ping(msg_wtxidrelay())
        if self.support_addrv2:
            self.send_without_ping(msg_sendaddrv2())
        if self.support_txrcncl:
            sendtxrcncl_msg = msg_sendtxrcncl()
            sendtxrcncl_msg.version = 1
            sendtxrcncl_msg.salt = 2
            self.send_without_ping(sendtxrcncl_msg)
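            # (Note, an observation rather than part of the change: the fixed version=1 / salt=2
            # keep this handshake deterministic; the Erlay-specific TxReconTestP2PConn helper
            # rolls a random salt of its own instead.)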
        self.send_without_ping(msg_verack())
        self.nServices = message.nServices
        self.relay = message.relay

test/functional/test_framework/p2p_txrecon.py (new file, 237 lines)

@ -0,0 +1,237 @@
#!/usr/bin/env python3
# Copyright (c) 2021-2025 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Helpers to test reconciliation-based transaction relay, both initiator and responder roles"""

import random
from io import BytesIO


from test_framework.messages import (
    msg_wtxidrelay, msg_verack, msg_sendtxrcncl,
    msg_reqtxrcncl, msg_sketch, msg_reconcildiff,
    msg_reqsketchext, msg_inv, msg_getdata,
    MSG_WTX, MSG_BLOCK, CTransaction, CInv,
)
from test_framework.key import TaggedHash
from test_framework.p2p import P2PDataStore
from test_framework.util import assert_equal
from test_framework.crypto.siphash import siphash256
from test_framework.test_framework import BitcoinTestFramework


# These parameters are specified in BIP-0330.
Q_PRECISION = (2 << 14) - 1
FIELD_BITS = 32
FIELD_MODULUS = (1 << FIELD_BITS) + 0b10001101
BYTES_PER_SKETCH_CAPACITY = FIELD_BITS // 8
# These parameters are suggested by the Erlay paper based on analysis and
# simulations.
RECON_Q = 0.25

MAX_SKETCH_CAPACITY = 2 << 12


def mul2(x):
    """Compute 2*x in GF(2^FIELD_BITS)"""
    return (x << 1) ^ (FIELD_MODULUS if x.bit_length() >= FIELD_BITS else 0)


def mul(x, y):
    """Compute x*y in GF(2^FIELD_BITS)"""
    ret = 0
    for bit in [(x >> i) & 1 for i in range(x.bit_length())]:
        ret, y = ret ^ bit * y, mul2(y)
    return ret


def create_sketch(shortids, capacity):
    """Compute the bytes of a sketch for given shortids and given capacity."""
    odd_sums = [0 for _ in range(capacity)]
    for shortid in shortids:
        squared = mul(shortid, shortid)
        for i in range(capacity):
            odd_sums[i] ^= shortid
            shortid = mul(shortid, squared)
    sketch_bytes = []
    for odd_sum in odd_sums:
        for i in range(4):
            sketch_bytes.append((odd_sum >> (i * 8)) & 0xff)
    return sketch_bytes
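# Worked example (illustrative, not taken from the source): for capacity 1 the sketch reduces to
# the XOR of the short ids, serialized little-endian, e.g.
#
#   assert create_sketch([0x01020304, 0x01000000], 1) == [0x04, 0x03, 0x02, 0x00]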


def get_short_id(wtxid, salt):
    (k0, k1) = salt
    s = siphash256(k0, k1, wtxid)
    return 1 + (s & 0xFFFFFFFF)
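# Short ids are drawn from [1, 2^32]: the low 32 bits of SipHash(k0, k1, wtxid), shifted up by one,
# presumably so that the zero element (which a sketch cannot represent) never appears.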


def estimate_capacity(theirs, ours):
    set_size_diff = abs(theirs - ours)
    min_size = min(ours, theirs)
    weighted_min_size = int(RECON_Q * min_size)
    estimated_diff = 1 + weighted_min_size + set_size_diff

    # Poor man's minisketch_compute_capacity.
    return estimated_diff if estimated_diff <= 9 else estimated_diff - 1
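# Worked example (illustrative): estimate_capacity(15, 20) gives |15 - 20| = 5, min = 15,
# int(0.25 * 15) = 3, so the estimated difference is 1 + 3 + 5 = 9 and the returned capacity is 9.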

def generate_transaction(node, from_txid):
    to_address = node.getnewaddress()
    inputs = [{"txid": from_txid, "vout": 0}]
    outputs = {to_address: 0.0001}
    rawtx = node.createrawtransaction(inputs, outputs)
    signresult = node.signrawtransactionwithwallet(rawtx)
    tx = CTransaction()
    tx.deserialize(BytesIO(bytes.fromhex(signresult['hex'])))
    tx.rehash()
    return tx


class TxReconTestP2PConn(P2PDataStore):
    def __init__(self):
        super().__init__()
        self.recon_version = 1
        self.mininode_salt = random.randrange(0xffffff)
        self.node_salt = 0
        self.combined_salt = None
        self.last_sendtxrcncl = []
        self.last_reqtxrcncl = []
        self.last_sketch = []
        self.last_reconcildiff = []
        self.last_reqsketchext = []
        self.last_inv = []
        self.last_tx = []

    def on_version(self, message):
        if self.recon_version == 1:
            if not self.p2p_connected_to_node:
                self.send_version()
            assert message.nVersion >= 70016, "We expect the node to support WTXID relay"
            self.send_without_ping(msg_wtxidrelay())
            self.send_sendtxrcncl()
            self.send_without_ping(msg_verack())
            self.nServices = message.nServices
        else:
            super().on_version(message)

    def on_sendtxrcncl(self, message):
        self.node_salt = message.salt
        self.combined_salt = self.compute_salt()

    def on_reqtxrcncl(self, message):
        self.last_reqtxrcncl.append(message)

    def on_sketch(self, message):
        self.last_sketch.append(message)

    def on_reconcildiff(self, message):
        self.last_reconcildiff.append(message)

    def on_reqsketchext(self, message):
        self.last_reqsketchext.append(message)

    def on_inv(self, message):
        self.last_inv.append([inv.hash for inv in message.inv if inv.type != MSG_BLOCK])  # ignore block invs

    def on_tx(self, message):
        self.last_tx.append(message.tx.calc_sha256(True))

    def send_sendtxrcncl(self):
        msg = msg_sendtxrcncl()
        msg.salt = self.mininode_salt
        msg.version = self.recon_version
        self.send_without_ping(msg)

    def send_reqtxrcncl(self, set_size, q):
        msg = msg_reqtxrcncl()
        msg.set_size = set_size
        msg.q = q
        self.send_without_ping(msg)

    def send_sketch(self, skdata):
        msg = msg_sketch()
        msg.skdata = skdata
        self.send_without_ping(msg)

    def send_reconcildiff(self, success, ask_shortids, sync_with_ping=False):
        msg = msg_reconcildiff()
        msg.success = success
        msg.ask_shortids = ask_shortids
        if sync_with_ping:
            self.send_and_ping(msg)
        else:
            self.send_without_ping(msg)

    def send_reqsketchext(self):
        self.send_without_ping(msg_reqsketchext())

    def send_inv(self, inv_wtxids):
        msg = msg_inv(inv=[CInv(MSG_WTX, h=wtxid) for wtxid in inv_wtxids])
        self.send_without_ping(msg)

    def send_getdata(self, ask_wtxids):
        msg = msg_getdata(inv=[CInv(MSG_WTX, h=wtxid) for wtxid in ask_wtxids])
        self.send_without_ping(msg)

    def compute_salt(self):
        RECON_STATIC_SALT = "Tx Relay Salting"
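        # (Note, an observation rather than part of the source: the two salts are tag-hashed in
        # sorted order, so node and test peer derive identical k0/k1 regardless of which side
        # sent which salt.)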
|
||||
salt1, salt2 = self.node_salt, self.mininode_salt
|
||||
salt = min(salt1, salt2).to_bytes(8, "little") + max(salt1, salt2).to_bytes(8, "little")
|
||||
h = TaggedHash(RECON_STATIC_SALT, salt)
|
||||
k0 = int.from_bytes(h[0:8], "little")
|
||||
k1 = int.from_bytes(h[8:16], "little")
|
||||
return k0, k1


class ReconciliationTest(BitcoinTestFramework):
    def add_options(self, parser):
        self.add_wallet_options(parser)

    def skip_test_if_missing_module(self):
        self.skip_if_no_wallet()

    def set_test_params(self):
        self.setup_clean_chain = True
        self.num_nodes = 1
        self.extra_args = [['-txreconciliation']]

    def generate_txs(self, wallet, n_mininode_unique, n_node_unique, n_shared):
        mininode_unique = [wallet.create_self_transfer()["tx"] for _ in range(n_mininode_unique)]
        node_unique = [wallet.create_self_transfer()["tx"] for _ in range(n_node_unique)]
        shared = [wallet.create_self_transfer()["tx"] for _ in range(n_shared)]

        tx_submitter = self.nodes[0].add_p2p_connection(P2PDataStore())
        tx_submitter.send_txs_and_test(
            node_unique + shared, self.nodes[0], success=True)
        tx_submitter.peer_disconnect()

        return mininode_unique, node_unique, shared

    # Wait for the next INV message to be received by the given peer.
    # Clear and check it matches the expected transactions.
    def wait_for_inv(self, peer, expected_wtxids):
        def received_inv():
            return (len(peer.last_inv) > 0)
        self.wait_until(received_inv)

        received_wtxids = set(peer.last_inv.pop())
        assert_equal(expected_wtxids, received_wtxids)

    def request_transactions_from(self, peer, wtxids_to_request):
        # Make sure there were no unexpected transactions received before
        assert_equal(peer.last_tx, [])
        peer.send_getdata(wtxids_to_request)

    # Wait for the next TX message to be received by the given peer.
    # Clear and check it matches the expected transactions.
    def wait_for_txs(self, peer, expected_wtxids):
        def received_txs():
            return (len(peer.last_tx) == len(expected_wtxids))
        self.wait_until(received_txs)

        assert_equal(set(expected_wtxids), set(peer.last_tx))
        peer.last_tx.clear()

    def run_test(self):
        pass

@ -156,6 +156,8 @@ BASE_SCRIPTS = [
    'feature_reindex_readonly.py',
    'wallet_labels.py',
    'p2p_compactblocks.py',
    'p2p_txrecon_initiator.py',
    'p2p_txrecon_responder.py',
    'p2p_compactblocks_blocksonly.py',
    'wallet_hd.py',
    'wallet_blank.py',

@ -334,6 +336,7 @@ BASE_SCRIPTS = [
    'rpc_getdescriptoractivity.py',
    'rpc_scanblocks.py',
    'p2p_sendtxrcncl.py',
    'p2p_reqtxrcncl.py',
    'rpc_scantxoutset.py',
    'feature_unsupported_utxo_db.py',
    'feature_logging.py',