txgraph: Track multiple potential would-be clusters in Trim (improvement)

In the Trim function, for any given would-be group of clusters, a (rudimentary)
linearization for the would-be cluster is constructed on the fly by adding
eligible transactions to a heap. This continues until the total count or
size of the picked transactions exceeds a configured limit. Any transactions
which appear later in this linearization are discarded.
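
For illustration, here is a minimal self-contained sketch of that selection
loop, using hypothetical simplified types (a plain Tx record with a scalar
feerate and explicit limits) rather than the actual txgraph code:

    // Hypothetical simplification of the pre-existing selection loop: pop
    // eligible transactions by feerate until one shared count/size budget
    // for the whole group is exhausted; whatever is not picked is discarded.
    #include <algorithm>
    #include <cstddef>
    #include <cstdint>
    #include <vector>

    struct Tx {
        double feerate;               // simplified; the real code compares chunk feerates
        uint32_t size;
        uint32_t deps_left;           // number of unmet dependencies
        std::vector<size_t> children; // indices of transactions that depend on this one
        bool included{false};
    };

    void TrimWithSingleBudget(std::vector<Tx>& txs, uint32_t max_count, uint64_t max_size)
    {
        auto cmp = [&](size_t a, size_t b) noexcept { return txs[a].feerate < txs[b].feerate; };
        // Seed the heap with all transactions whose dependencies are already met.
        std::vector<size_t> heap;
        for (size_t i = 0; i < txs.size(); ++i) {
            if (txs[i].deps_left == 0) heap.push_back(i);
        }
        std::make_heap(heap.begin(), heap.end(), cmp);
        uint32_t count{0};
        uint64_t size{0};
        while (!heap.empty()) {
            std::pop_heap(heap.begin(), heap.end(), cmp);
            size_t best = heap.back();
            heap.pop_back();
            count += 1;
            size += txs[best].size;
            // A single budget for the whole group: the first violation ends the
            // linearization, even if the remainder would split into separate
            // clusters with plenty of room left.
            if (count > max_count || size > max_size) break;
            txs[best].included = true;
            // Unlock children whose last unmet dependency was just satisfied.
            for (size_t chl : txs[best].children) {
                if (--txs[chl].deps_left == 0) {
                    heap.push_back(chl);
                    std::push_heap(heap.begin(), heap.end(), cmp);
                }
            }
        }
        // Transactions left with included == false are the ones discarded.
    }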

However, given that transactions at the end are discarded, it is possible that
the would-be cluster splits apart into multiple clusters. And those clusters
may well permit far more transactions before their limits are reached.

Take this into account by using a union-find structure inside TrimTxData to
keep track of the count/size of all would-be clusters that would be formed
at any point.
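
A minimal sketch of such a structure, with illustrative names rather than the
actual TrimTxData fields (the diff below implements the same idea through
m_uf_parent, m_uf_count and m_uf_size):

    // Union-find with path splitting and union by size, where each root also
    // carries the aggregate transaction count and total size of its partition.
    #include <cstdint>
    #include <utility>

    struct UfNode {
        UfNode* parent{this}; // a root points to itself
        uint32_t count{1};    // partition transaction count (valid at the root)
        uint64_t size{0};     // partition total size (valid at the root; seed with the tx's size)
    };

    UfNode* Find(UfNode* n) noexcept
    {
        while (n != n->parent) {
            // Path splitting: make each visited node point to its grandparent.
            UfNode* par = n->parent;
            n->parent = par->parent;
            n = par;
        }
        return n;
    }

    UfNode* Union(UfNode* a, UfNode* b) noexcept
    {
        UfNode* ra = Find(a);
        UfNode* rb = Find(b);
        if (ra == rb) return ra; // already in the same partition
        // Union by size: attach the smaller partition's root under the larger's.
        if (ra->count < rb->count) std::swap(ra, rb);
        rb->parent = ra;
        // Aggregate the statistics into the surviving root.
        ra->count += rb->count;
        ra->size += rb->size;
        return ra;
    }

With this in place, accepting a transaction amounts to Find-ing the roots of
the partitions its parents belong to, summing their count/size aggregates plus
the transaction's own, and only Union-ing everything together when the result
stays within the cluster limits. That is exactly the shape of the code below.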

This is not an optimization in terms of CPU usage or memory; it just
improves the quality of the transactions removed by Trim().
Pieter Wuille 2024-12-19 23:06:07 -05:00
parent c4c96fb3e3
commit 3566dca6c9


@@ -62,12 +62,29 @@ struct TrimTxData
TxGraph::GraphIndex m_index;
/** Number of unmet dependencies this transaction has. -1 if the transaction is included. */
uint32_t m_deps_left;
+/** Number of dependencies that apply to this transaction as child. */
+uint32_t m_parent_count;
+/** Where in deps_by_child those dependencies begin. */
+uint32_t m_parent_offset;
/** Number of dependencies that apply to this transaction as parent. */
uint32_t m_children_count;
-/** Where in deps those dependencies begin. */
+/** Where in deps_by_parent those dependencies begin. */
uint32_t m_children_offset;
/** Size of the transaction. */
uint32_t m_tx_size;
+// As transactions get processed, they get organized into trees which form partitions
+// representing the would-be clusters up to that point. The root of each tree is a
+// representative for that partition.
+// See https://en.wikipedia.org/wiki/Disjoint-set_data_structure.
+/** Pointer to another TrimTxData, towards the root of the tree. If this is a root, m_uf_parent
+ * is equal to this itself. */
+TrimTxData* m_uf_parent;
+/** If this is a root, the total number of transactions in the partition. */
+uint32_t m_uf_count;
+/** If this is a root, the total size of transactions in the partition. */
+uint64_t m_uf_size;
};
/** A grouping of connected transactions inside a TxGraphImpl::ClusterSet. */
@@ -2619,21 +2636,26 @@ std::vector<TxGraph::Ref*> TxGraphImpl::Trim() noexcept
// but respecting the dependencies being added.
//
// This rudimentary linearization is computed lazily, by putting all eligible (no unmet
-// dependencies) transactions in a heap, and popping the highest-feerate one from it. This
-// continues as long the number or size of all picked transactions together does not exceed the
-// graph's configured cluster limits. All remaining transactions are then marked as removed.
+// dependencies) transactions in a heap, and popping the highest-feerate one from it. Along the
+// way, the counts and sizes of the would-be clusters up to that point are tracked (by
+// partitioning the involved transactions using a union-find structure). Any transaction whose
+// addition would cause a violation is removed, along with all their descendants.
//
// A next invocation of GroupClusters (after applying the removals) will compute the new
// resulting clusters, and none of them will violate the limits.
/** All dependencies (both to be added ones, and implicit ones between consecutive transactions
- * in existing cluster linearizations). */
-std::vector<std::pair<GraphIndex, GraphIndex>> deps;
+ * in existing cluster linearizations), sorted by parent. */
+std::vector<std::pair<GraphIndex, GraphIndex>> deps_by_parent;
+/** Same, but sorted by child. */
+std::vector<std::pair<GraphIndex, GraphIndex>> deps_by_child;
/** Information about all transactions involved in a Cluster group to be trimmed, sorted by
* GraphIndex. */
std::vector<TrimTxData> trim_data;
/** Iterators into trim_data, treated as a max heap according to cmp_fn below. */
std::vector<std::vector<TrimTxData>::iterator> trim_heap;
+/** The list of representatives of the partitions a given transaction depends on. */
+std::vector<TrimTxData*> current_deps;
/** Function to define the ordering of trim_heap. */
static constexpr auto cmp_fn = [](auto a, auto b) noexcept {
@@ -2644,6 +2666,38 @@ std::vector<TxGraph::Ref*> TxGraphImpl::Trim() noexcept
return a->m_chunk_feerate < b->m_chunk_feerate;
};
+/** Given a TrimTxData entry, find the representative of the partition it is in. */
+static constexpr auto find_fn = [](TrimTxData* arg) noexcept {
+while (arg != arg->m_uf_parent) {
+// Replace pointer to parent with pointer to grandparent (path splitting).
+// See https://en.wikipedia.org/wiki/Disjoint-set_data_structure#Finding_set_representatives.
+auto par = arg->m_uf_parent;
+arg->m_uf_parent = par->m_uf_parent;
+arg = par;
+}
+return arg;
+};
+/** Given two TrimTxData entries, union the partitions they are in, and return the
+ * representative. */
+static constexpr auto union_fn = [](TrimTxData* arg1, TrimTxData* arg2) noexcept {
+// Replace arg1 and arg2 by their representatives.
+auto rep1 = find_fn(arg1);
+auto rep2 = find_fn(arg2);
+// Bail out if both representatives are the same, because that means arg1 and arg2 are in
+// the same partition already.
+if (rep1 == rep2) return rep1;
+// Pick the lower-count root to become a child of the higher-count one.
+// See https://en.wikipedia.org/wiki/Disjoint-set_data_structure#Union_by_size.
+if (rep1->m_uf_count < rep2->m_uf_count) std::swap(rep1, rep2);
+rep2->m_uf_parent = rep1;
+// Add the statistics of arg2 (which is no longer a representative) to those of arg1 (which
+// is now the representative for both).
+rep1->m_uf_size += rep2->m_uf_size;
+rep1->m_uf_count += rep2->m_uf_count;
+return rep1;
+};
/** Get iterator to TrimTxData entry for a given index. */
auto locate_fn = [&](GraphIndex index) noexcept {
auto it = std::lower_bound(trim_data.begin(), trim_data.end(), index, [](TrimTxData& elem, GraphIndex idx) noexcept {
@@ -2657,14 +2711,15 @@ std::vector<TxGraph::Ref*> TxGraphImpl::Trim() noexcept
for (const auto& group_data : clusterset.m_group_data->m_groups) {
trim_data.clear();
trim_heap.clear();
-deps.clear();
+deps_by_child.clear();
+deps_by_parent.clear();
// Gather trim data from all involved Clusters.
auto cluster_span = std::span{clusterset.m_group_data->m_group_clusters}
.subspan(group_data.m_cluster_offset, group_data.m_cluster_count);
uint64_t size{0};
for (Cluster* cluster : cluster_span) {
-size += cluster->AppendTrimData(trim_data, deps);
+size += cluster->AppendTrimData(trim_data, deps_by_child);
}
// If this group of Clusters does not violate any limits, continue to the next group.
if (trim_data.size() <= m_max_cluster_count && size <= m_max_cluster_size) continue;
@@ -2672,44 +2727,52 @@ std::vector<TxGraph::Ref*> TxGraphImpl::Trim() noexcept
// a map from GraphIndex to TrimTxData, and its ordering will not change anymore.
std::sort(trim_data.begin(), trim_data.end(), [](auto& a, auto& b) noexcept { return a.m_index < b.m_index; });
-// Construct deps, and sort it by child.
-deps.insert(deps.end(),
-clusterset.m_deps_to_add.begin() + group_data.m_deps_offset,
-clusterset.m_deps_to_add.begin() + group_data.m_deps_offset + group_data.m_deps_count);
-std::sort(deps.begin(), deps.end(), [](auto& a, auto& b) noexcept { return a.second < b.second; });
-// Fill m_deps_left in trim_data. Because of the sort above, all
+// Construct deps_by_child.
+deps_by_child.insert(deps_by_child.end(),
+clusterset.m_deps_to_add.begin() + group_data.m_deps_offset,
+clusterset.m_deps_to_add.begin() + group_data.m_deps_offset + group_data.m_deps_count);
+std::sort(deps_by_child.begin(), deps_by_child.end(), [](auto& a, auto& b) noexcept { return a.second < b.second; });
+// Fill m_parent_count and m_parent_offset in trim_data. Because of the sort above, all
// dependencies involving the same child are grouped together, so a single linear scan
// suffices.
-auto deps_it = deps.begin();
+auto deps_it = deps_by_child.begin();
for (auto trim_it = trim_data.begin(); trim_it != trim_data.end(); ++trim_it) {
+trim_it->m_parent_offset = deps_it - deps_by_child.begin();
trim_it->m_deps_left = 0;
-while (deps_it != deps.end() && deps_it->second == trim_it->m_index) {
+while (deps_it != deps_by_child.end() && deps_it->second == trim_it->m_index) {
++trim_it->m_deps_left;
++deps_it;
}
+trim_it->m_parent_count = trim_it->m_deps_left;
// If this transaction has no unmet dependencies, and is not oversized, add it to the
// heap (just append for now, the heapification happens below).
if (trim_it->m_deps_left == 0 && trim_it->m_tx_size <= m_max_cluster_size) {
+// Initialize it as a singleton partition.
+trim_it->m_uf_parent = &*trim_it;
+trim_it->m_uf_count = 1;
+trim_it->m_uf_size = trim_it->m_tx_size;
+// Add to heap.
trim_heap.push_back(trim_it);
}
}
-Assume(deps_it == deps.end());
+Assume(deps_it == deps_by_child.end());
-// Sort deps by parent.
-std::sort(deps.begin(), deps.end(), [](auto& a, auto& b) noexcept { return a.first < b.first; });
+// Construct deps_by_parent.
+deps_by_parent = deps_by_child;
+std::sort(deps_by_parent.begin(), deps_by_parent.end(), [](auto& a, auto& b) noexcept { return a.first < b.first; });
// Fill m_children_offset and m_children_count in trim_data. Because of the sort above, all
// dependencies involving the same parent are grouped together, so a single linear scan
// suffices.
-deps_it = deps.begin();
+deps_it = deps_by_parent.begin();
for (auto& trim_entry : trim_data) {
trim_entry.m_children_count = 0;
-trim_entry.m_children_offset = deps_it - deps.begin();
-while (deps_it != deps.end() && deps_it->first == trim_entry.m_index) {
+trim_entry.m_children_offset = deps_it - deps_by_parent.begin();
+while (deps_it != deps_by_parent.end() && deps_it->first == trim_entry.m_index) {
++trim_entry.m_children_count;
++deps_it;
}
}
-Assume(deps_it == deps.end());
+Assume(deps_it == deps_by_parent.end());
// Build a heap of all transactions with 0 unmet dependencies.
std::make_heap(trim_heap.begin(), trim_heap.end(), cmp_fn);
@@ -2723,8 +2786,6 @@ std::vector<TxGraph::Ref*> TxGraphImpl::Trim() noexcept
// reorganizations that would violate cluster limits, as all added dependencies are in the
// same direction (from old mempool transactions to new from-block transactions); cycles
// require dependencies in both directions to be added.
-uint32_t total_count{0};
-uint64_t total_size{0};
while (!trim_heap.empty()) {
// Move the best remaining transaction to the end of trim_heap.
std::pop_heap(trim_heap.begin(), trim_heap.end(), cmp_fn);
@@ -2732,22 +2793,43 @@ std::vector<TxGraph::Ref*> TxGraphImpl::Trim() noexcept
auto& entry = *trim_heap.back();
trim_heap.pop_back();
-// Compute resource counts.
-total_count += 1;
-total_size += entry.m_tx_size;
-// Stop if this would violate any limit.
-if (total_count > m_max_cluster_count || total_size > m_max_cluster_size) break;
+// Find the distinct transaction partitions this entry depends on.
+current_deps.clear();
+for (auto& [par, chl] : std::span{deps_by_child}.subspan(entry.m_parent_offset, entry.m_parent_count)) {
+Assume(chl == entry.m_index);
+current_deps.push_back(find_fn(&*locate_fn(par)));
+}
+std::sort(current_deps.begin(), current_deps.end());
+current_deps.erase(std::unique(current_deps.begin(), current_deps.end()), current_deps.end());
+// Compute resource counts.
+uint32_t new_count = 1;
+uint64_t new_size = entry.m_tx_size;
+for (TrimTxData* ptr : current_deps) {
+new_count += ptr->m_uf_count;
+new_size += ptr->m_uf_size;
+}
+// Skip the entry if this would violate any limit.
+if (new_count > m_max_cluster_count || new_size > m_max_cluster_size) continue;
+// Union the partitions this transaction and all its dependencies are in together.
+auto rep = &entry;
+for (TrimTxData* ptr : current_deps) rep = union_fn(ptr, rep);
// Mark the entry as included (so the loop below will not remove the transaction).
entry.m_deps_left = uint32_t(-1);
// Mark each to-be-added dependency involving this transaction as parent satisfied.
-for (auto& [par, chl] : std::span{deps}.subspan(entry.m_children_offset, entry.m_children_count)) {
+for (auto& [par, chl] : std::span{deps_by_parent}.subspan(entry.m_children_offset, entry.m_children_count)) {
Assume(par == entry.m_index);
auto chl_it = locate_fn(chl);
// Reduce the number of unmet dependencies of chl_it, and if that brings the number
// to zero, add it to the heap.
Assume(chl_it->m_deps_left > 0);
if (--chl_it->m_deps_left == 0) {
+// Initialize as a singleton partition.
+chl_it->m_uf_parent = &*chl_it;
+chl_it->m_uf_count = 1;
+chl_it->m_uf_size = chl_it->m_tx_size;
// Add it to the heap.
trim_heap.push_back(chl_it);
std::push_heap(trim_heap.begin(), trim_heap.end(), cmp_fn);
}