Introduce mempool changesets

Introduce the CTxMemPool::ChangeSet, a wrapper for creating (potential) new
mempool entries and removing conflicts.
This commit is contained in:
Suhas Daftuar 2024-08-23 18:45:49 -04:00
parent 87d92fa340
commit 802214c083
3 changed files with 80 additions and 21 deletions

View file

@ -1369,3 +1369,23 @@ util::Result<std::pair<std::vector<FeeFrac>, std::vector<FeeFrac>>> CTxMemPool::
std::sort(new_chunks.begin(), new_chunks.end(), std::greater()); std::sort(new_chunks.begin(), new_chunks.end(), std::greater());
return std::make_pair(old_chunks, new_chunks); return std::make_pair(old_chunks, new_chunks);
} }
CTxMemPool::ChangeSet::TxHandle CTxMemPool::ChangeSet::StageAddition(const CTransactionRef& tx, const CAmount fee, int64_t time, unsigned int entry_height, uint64_t entry_sequence, bool spends_coinbase, int64_t sigops_cost, LockPoints lp)
{
auto newit = m_to_add.emplace(tx, fee, time, entry_height, entry_sequence, spends_coinbase, sigops_cost, lp).first;
m_entry_vec.push_back(newit);
return newit;
}
void CTxMemPool::ChangeSet::Apply()
{
LOCK(m_pool->cs);
m_pool->RemoveStaged(m_to_remove, false, MemPoolRemovalReason::REPLACED);
for (size_t i=0; i<m_entry_vec.size(); ++i) {
auto tx_entry = m_entry_vec[i];
m_pool->addUnchecked(*tx_entry);
}
m_to_add.clear();
m_to_remove.clear();
m_entry_vec.clear();
}

View file

@ -816,6 +816,37 @@ public:
assert(m_epoch.guarded()); // verify guard even when it==nullopt assert(m_epoch.guarded()); // verify guard even when it==nullopt
return !it || visited(*it); return !it || visited(*it);
} }
class ChangeSet {
public:
explicit ChangeSet(CTxMemPool* pool) : m_pool(pool) {}
~ChangeSet() = default;
ChangeSet(const ChangeSet&) = delete;
ChangeSet& operator=(const ChangeSet&) = delete;
using TxHandle = CTxMemPool::txiter;
TxHandle StageAddition(const CTransactionRef& tx, const CAmount fee, int64_t time, unsigned int entry_height, uint64_t entry_sequence, bool spends_coinbase, int64_t sigops_cost, LockPoints lp);
void StageRemoval(CTxMemPool::txiter it) { m_to_remove.insert(it); }
util::Result<CTxMemPool::setEntries> CalculateMemPoolAncestors(TxHandle tx, const Limits& limits)
{
LOCK(m_pool->cs);
auto ret{m_pool->CalculateMemPoolAncestors(*tx, limits)};
return ret;
}
void Apply() EXCLUSIVE_LOCKS_REQUIRED(cs_main);
private:
CTxMemPool* m_pool;
CTxMemPool::indexed_transaction_set m_to_add;
std::vector<CTxMemPool::txiter> m_entry_vec; // track the added transactions' insertion order
CTxMemPool::setEntries m_to_remove;
};
std::unique_ptr<ChangeSet> GetChangeSet() EXCLUSIVE_LOCKS_REQUIRED(cs) { return std::make_unique<ChangeSet>(this); }
}; };
/** /**

View file

@ -633,9 +633,9 @@ private:
CTxMemPool::setEntries m_iters_conflicting; CTxMemPool::setEntries m_iters_conflicting;
/** All mempool ancestors of this transaction. */ /** All mempool ancestors of this transaction. */
CTxMemPool::setEntries m_ancestors; CTxMemPool::setEntries m_ancestors;
/** Mempool entry constructed for this transaction. Constructed in PreChecks() but not /* Changeset representing adding a transaction and removing its conflicts. */
* inserted into the mempool until Finalize(). */ std::unique_ptr<CTxMemPool::ChangeSet> m_changeset;
std::unique_ptr<CTxMemPoolEntry> m_entry; CTxMemPool::ChangeSet::TxHandle m_tx_handle;
/** Whether RBF-related data structures (m_conflicts, m_iters_conflicting, m_all_conflicting, /** Whether RBF-related data structures (m_conflicts, m_iters_conflicting, m_all_conflicting,
* m_replaced_transactions) include a sibling in addition to txns with conflicting inputs. */ * m_replaced_transactions) include a sibling in addition to txns with conflicting inputs. */
bool m_sibling_eviction{false}; bool m_sibling_eviction{false};
@ -780,7 +780,6 @@ bool MemPoolAccept::PreChecks(ATMPArgs& args, Workspace& ws)
// Alias what we need out of ws // Alias what we need out of ws
TxValidationState& state = ws.m_state; TxValidationState& state = ws.m_state;
std::unique_ptr<CTxMemPoolEntry>& entry = ws.m_entry;
if (!CheckTransaction(tx, state)) { if (!CheckTransaction(tx, state)) {
return false; // state filled in by CheckTransaction return false; // state filled in by CheckTransaction
@ -909,9 +908,10 @@ bool MemPoolAccept::PreChecks(ATMPArgs& args, Workspace& ws)
// Set entry_sequence to 0 when bypass_limits is used; this allows txs from a block // Set entry_sequence to 0 when bypass_limits is used; this allows txs from a block
// reorg to be marked earlier than any child txs that were already in the mempool. // reorg to be marked earlier than any child txs that were already in the mempool.
const uint64_t entry_sequence = bypass_limits ? 0 : m_pool.GetSequence(); const uint64_t entry_sequence = bypass_limits ? 0 : m_pool.GetSequence();
entry.reset(new CTxMemPoolEntry(ptx, ws.m_base_fees, nAcceptTime, m_active_chainstate.m_chain.Height(), entry_sequence, ws.m_changeset = m_pool.GetChangeSet();
fSpendsCoinbase, nSigOpsCost, lock_points.value())); ws.m_tx_handle = ws.m_changeset->StageAddition(ptx, ws.m_base_fees, nAcceptTime, m_active_chainstate.m_chain.Height(), entry_sequence, fSpendsCoinbase, nSigOpsCost, lock_points.value());
ws.m_vsize = entry->GetTxSize();
ws.m_vsize = ws.m_tx_handle->GetTxSize();
// Enforces 0-fee for dust transactions, no incentive to be mined alone // Enforces 0-fee for dust transactions, no incentive to be mined alone
if (m_pool.m_opts.require_standard) { if (m_pool.m_opts.require_standard) {
@ -983,7 +983,7 @@ bool MemPoolAccept::PreChecks(ATMPArgs& args, Workspace& ws)
maybe_rbf_limits.descendant_size_vbytes += conflict->GetSizeWithDescendants(); maybe_rbf_limits.descendant_size_vbytes += conflict->GetSizeWithDescendants();
} }
if (auto ancestors{m_pool.CalculateMemPoolAncestors(*entry, maybe_rbf_limits)}) { if (auto ancestors{ws.m_changeset->CalculateMemPoolAncestors(ws.m_tx_handle, maybe_rbf_limits)}) {
ws.m_ancestors = std::move(*ancestors); ws.m_ancestors = std::move(*ancestors);
} else { } else {
// If CalculateMemPoolAncestors fails second time, we want the original error string. // If CalculateMemPoolAncestors fails second time, we want the original error string.
@ -1015,7 +1015,7 @@ bool MemPoolAccept::PreChecks(ATMPArgs& args, Workspace& ws)
if (ws.m_vsize > EXTRA_DESCENDANT_TX_SIZE_LIMIT || ws.m_ptx->version == TRUC_VERSION) { if (ws.m_vsize > EXTRA_DESCENDANT_TX_SIZE_LIMIT || ws.m_ptx->version == TRUC_VERSION) {
return state.Invalid(TxValidationResult::TX_MEMPOOL_POLICY, "too-long-mempool-chain", error_message); return state.Invalid(TxValidationResult::TX_MEMPOOL_POLICY, "too-long-mempool-chain", error_message);
} }
if (auto ancestors_retry{m_pool.CalculateMemPoolAncestors(*entry, cpfp_carve_out_limits)}) { if (auto ancestors_retry{ws.m_changeset->CalculateMemPoolAncestors(ws.m_tx_handle, cpfp_carve_out_limits)}) {
ws.m_ancestors = std::move(*ancestors_retry); ws.m_ancestors = std::move(*ancestors_retry);
} else { } else {
return state.Invalid(TxValidationResult::TX_MEMPOOL_POLICY, "too-long-mempool-chain", error_message); return state.Invalid(TxValidationResult::TX_MEMPOOL_POLICY, "too-long-mempool-chain", error_message);
@ -1114,6 +1114,11 @@ bool MemPoolAccept::ReplacementChecks(Workspace& ws)
return state.Invalid(TxValidationResult::TX_RECONSIDERABLE, return state.Invalid(TxValidationResult::TX_RECONSIDERABLE,
strprintf("insufficient fee%s", ws.m_sibling_eviction ? " (including sibling eviction)" : ""), *err_string); strprintf("insufficient fee%s", ws.m_sibling_eviction ? " (including sibling eviction)" : ""), *err_string);
} }
// Add all the to-be-removed transactions to the changeset.
for (auto it : m_subpackage.m_all_conflicts) {
ws.m_changeset->StageRemoval(it);
}
return true; return true;
} }
@ -1173,7 +1178,9 @@ bool MemPoolAccept::PackageMempoolChecks(const std::vector<CTransactionRef>& txn
"package RBF failed: too many potential replacements", *err_string); "package RBF failed: too many potential replacements", *err_string);
} }
for (CTxMemPool::txiter it : m_subpackage.m_all_conflicts) { for (CTxMemPool::txiter it : m_subpackage.m_all_conflicts) {
parent_ws.m_changeset->StageRemoval(it);
m_subpackage.m_conflicting_fees += it->GetModifiedFee(); m_subpackage.m_conflicting_fees += it->GetModifiedFee();
m_subpackage.m_conflicting_size += it->GetTxSize(); m_subpackage.m_conflicting_size += it->GetTxSize();
} }
@ -1283,7 +1290,6 @@ bool MemPoolAccept::Finalize(const ATMPArgs& args, Workspace& ws)
const uint256& hash = ws.m_hash; const uint256& hash = ws.m_hash;
TxValidationState& state = ws.m_state; TxValidationState& state = ws.m_state;
const bool bypass_limits = args.m_bypass_limits; const bool bypass_limits = args.m_bypass_limits;
std::unique_ptr<CTxMemPoolEntry>& entry = ws.m_entry;
if (!m_subpackage.m_all_conflicts.empty()) Assume(args.m_allow_replacement); if (!m_subpackage.m_all_conflicts.empty()) Assume(args.m_allow_replacement);
// Remove conflicting transactions from the mempool // Remove conflicting transactions from the mempool
@ -1296,25 +1302,23 @@ bool MemPoolAccept::Finalize(const ATMPArgs& args, Workspace& ws)
it->GetTxSize(), it->GetTxSize(),
hash.ToString(), hash.ToString(),
tx.GetWitnessHash().ToString(), tx.GetWitnessHash().ToString(),
entry->GetFee(), ws.m_tx_handle->GetFee(),
entry->GetTxSize()); ws.m_tx_handle->GetTxSize());
TRACEPOINT(mempool, replaced, TRACEPOINT(mempool, replaced,
it->GetTx().GetHash().data(), it->GetTx().GetHash().data(),
it->GetTxSize(), it->GetTxSize(),
it->GetFee(), it->GetFee(),
std::chrono::duration_cast<std::chrono::duration<std::uint64_t>>(it->GetTime()).count(), std::chrono::duration_cast<std::chrono::duration<std::uint64_t>>(it->GetTime()).count(),
hash.data(), hash.data(),
entry->GetTxSize(), ws.m_tx_handle->GetTxSize(),
entry->GetFee() ws.m_tx_handle->GetFee()
); );
m_subpackage.m_replaced_transactions.push_back(it->GetSharedTx()); m_subpackage.m_replaced_transactions.push_back(it->GetSharedTx());
} }
m_pool.RemoveStaged(m_subpackage.m_all_conflicts, false, MemPoolRemovalReason::REPLACED); ws.m_changeset->Apply();
// Don't attempt to process the same conflicts repeatedly during subpackage evaluation: // Don't attempt to process the same conflicts repeatedly during subpackage evaluation:
// they no longer exist on subsequent calls to Finalize() post-RemoveStaged // they no longer exist on subsequent calls to Finalize() post-Apply()
m_subpackage.m_all_conflicts.clear(); m_subpackage.m_all_conflicts.clear();
// Store transaction in memory
m_pool.addUnchecked(*entry, ws.m_ancestors);
// trim mempool and check if tx was trimmed // trim mempool and check if tx was trimmed
// If we are validating a package, don't trim here because we could evict a previous transaction // If we are validating a package, don't trim here because we could evict a previous transaction
@ -1359,7 +1363,7 @@ bool MemPoolAccept::SubmitPackage(const ATMPArgs& args, std::vector<Workspace>&
// Re-calculate mempool ancestors to call addUnchecked(). They may have changed since the // Re-calculate mempool ancestors to call addUnchecked(). They may have changed since the
// last calculation done in PreChecks, since package ancestors have already been submitted. // last calculation done in PreChecks, since package ancestors have already been submitted.
{ {
auto ancestors{m_pool.CalculateMemPoolAncestors(*ws.m_entry, m_pool.m_opts.limits)}; auto ancestors{ws.m_changeset->CalculateMemPoolAncestors(ws.m_tx_handle, m_pool.m_opts.limits)};
if(!ancestors) { if(!ancestors) {
results.emplace(ws.m_ptx->GetWitnessHash(), MempoolAcceptResult::Failure(ws.m_state)); results.emplace(ws.m_ptx->GetWitnessHash(), MempoolAcceptResult::Failure(ws.m_state));
// Since PreChecks() and PackageMempoolChecks() both enforce limits, this should never fail. // Since PreChecks() and PackageMempoolChecks() both enforce limits, this should never fail.
@ -1400,6 +1404,8 @@ bool MemPoolAccept::SubmitPackage(const ATMPArgs& args, std::vector<Workspace>&
// Add successful results. The returned results may change later if LimitMempoolSize() evicts them. // Add successful results. The returned results may change later if LimitMempoolSize() evicts them.
for (Workspace& ws : workspaces) { for (Workspace& ws : workspaces) {
auto iter = m_pool.GetIter(ws.m_ptx->GetHash());
Assume(iter.has_value());
const auto effective_feerate = args.m_package_feerates ? ws.m_package_feerate : const auto effective_feerate = args.m_package_feerates ? ws.m_package_feerate :
CFeeRate{ws.m_modified_fees, static_cast<uint32_t>(ws.m_vsize)}; CFeeRate{ws.m_modified_fees, static_cast<uint32_t>(ws.m_vsize)};
const auto effective_feerate_wtxids = args.m_package_feerates ? all_package_wtxids : const auto effective_feerate_wtxids = args.m_package_feerates ? all_package_wtxids :
@ -1410,7 +1416,7 @@ bool MemPoolAccept::SubmitPackage(const ATMPArgs& args, std::vector<Workspace>&
if (!m_pool.m_opts.signals) continue; if (!m_pool.m_opts.signals) continue;
const CTransaction& tx = *ws.m_ptx; const CTransaction& tx = *ws.m_ptx;
const auto tx_info = NewMempoolTransactionInfo(ws.m_ptx, ws.m_base_fees, const auto tx_info = NewMempoolTransactionInfo(ws.m_ptx, ws.m_base_fees,
ws.m_vsize, ws.m_entry->GetHeight(), ws.m_vsize, (*iter)->GetHeight(),
args.m_bypass_limits, args.m_package_submission, args.m_bypass_limits, args.m_package_submission,
IsCurrentForFeeEstimation(m_active_chainstate), IsCurrentForFeeEstimation(m_active_chainstate),
m_pool.HasNoInputsOf(tx)); m_pool.HasNoInputsOf(tx));
@ -1481,8 +1487,10 @@ MempoolAcceptResult MemPoolAccept::AcceptSingleTransaction(const CTransactionRef
if (m_pool.m_opts.signals) { if (m_pool.m_opts.signals) {
const CTransaction& tx = *ws.m_ptx; const CTransaction& tx = *ws.m_ptx;
auto iter = m_pool.GetIter(tx.GetHash());
Assume(iter.has_value());
const auto tx_info = NewMempoolTransactionInfo(ws.m_ptx, ws.m_base_fees, const auto tx_info = NewMempoolTransactionInfo(ws.m_ptx, ws.m_base_fees,
ws.m_vsize, ws.m_entry->GetHeight(), ws.m_vsize, (*iter)->GetHeight(),
args.m_bypass_limits, args.m_package_submission, args.m_bypass_limits, args.m_package_submission,
IsCurrentForFeeEstimation(m_active_chainstate), IsCurrentForFeeEstimation(m_active_chainstate),
m_pool.HasNoInputsOf(tx)); m_pool.HasNoInputsOf(tx));