mirror of
https://github.com/bitcoin/bitcoin.git
synced 2025-04-29 14:59:39 -04:00
Merge b2da764446
into c5e44a0435
This commit is contained in:
commit
ce9d3eb9b5
11 changed files with 666 additions and 6 deletions
|
@ -28,6 +28,7 @@ add_executable(bench_bitcoin
|
||||||
gcs_filter.cpp
|
gcs_filter.cpp
|
||||||
hashpadding.cpp
|
hashpadding.cpp
|
||||||
index_blockfilter.cpp
|
index_blockfilter.cpp
|
||||||
|
inputfetcher.cpp
|
||||||
load_external.cpp
|
load_external.cpp
|
||||||
lockedpool.cpp
|
lockedpool.cpp
|
||||||
logging.cpp
|
logging.cpp
|
||||||
|
|
57
src/bench/inputfetcher.cpp
Normal file
57
src/bench/inputfetcher.cpp
Normal file
|
@ -0,0 +1,57 @@
|
||||||
|
// Copyright (c) 2024-present The Bitcoin Core developers
|
||||||
|
// Distributed under the MIT software license, see the accompanying
|
||||||
|
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
||||||
|
|
||||||
|
#include <bench/bench.h>
|
||||||
|
#include <bench/data/block413567.raw.h>
|
||||||
|
#include <coins.h>
|
||||||
|
#include <common/system.h>
|
||||||
|
#include <inputfetcher.h>
|
||||||
|
#include <primitives/block.h>
|
||||||
|
#include <serialize.h>
|
||||||
|
#include <streams.h>
|
||||||
|
#include <util/time.h>
|
||||||
|
|
||||||
|
static constexpr auto QUEUE_BATCH_SIZE{128};
|
||||||
|
static constexpr auto DELAY{2ms};
|
||||||
|
|
||||||
|
//! Simulates a DB by adding a delay when calling GetCoin
|
||||||
|
class DelayedCoinsView : public CCoinsView
|
||||||
|
{
|
||||||
|
private:
|
||||||
|
std::chrono::milliseconds m_delay;
|
||||||
|
|
||||||
|
public:
|
||||||
|
DelayedCoinsView(std::chrono::milliseconds delay) : m_delay(delay) {}
|
||||||
|
|
||||||
|
std::optional<Coin> GetCoin(const COutPoint& outpoint) const override
|
||||||
|
{
|
||||||
|
UninterruptibleSleep(m_delay);
|
||||||
|
return Coin{};
|
||||||
|
}
|
||||||
|
|
||||||
|
bool BatchWrite(CoinsViewCacheCursor& cursor, const uint256 &hashBlock) override { return true; }
|
||||||
|
};
|
||||||
|
|
||||||
|
static void InputFetcherBenchmark(benchmark::Bench& bench)
|
||||||
|
{
|
||||||
|
DataStream stream{benchmark::data::block413567};
|
||||||
|
CBlock block;
|
||||||
|
stream >> TX_WITH_WITNESS(block);
|
||||||
|
|
||||||
|
DelayedCoinsView db(DELAY);
|
||||||
|
CCoinsViewCache cache(&db);
|
||||||
|
|
||||||
|
// The main thread should be counted to prevent thread oversubscription, and
|
||||||
|
// to decrease the variance of benchmark results.
|
||||||
|
const auto worker_threads_num{GetNumCores() - 1};
|
||||||
|
InputFetcher fetcher{QUEUE_BATCH_SIZE, worker_threads_num};
|
||||||
|
|
||||||
|
bench.run([&] {
|
||||||
|
const auto ok{cache.Flush()};
|
||||||
|
assert(ok);
|
||||||
|
fetcher.FetchInputs(cache, db, block);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
BENCHMARK(InputFetcherBenchmark, benchmark::PriorityLevel::HIGH);
|
|
@ -110,10 +110,15 @@ void CCoinsViewCache::AddCoin(const COutPoint &outpoint, Coin&& coin, bool possi
|
||||||
(bool)it->second.coin.IsCoinBase());
|
(bool)it->second.coin.IsCoinBase());
|
||||||
}
|
}
|
||||||
|
|
||||||
void CCoinsViewCache::EmplaceCoinInternalDANGER(COutPoint&& outpoint, Coin&& coin) {
|
void CCoinsViewCache::EmplaceCoinInternalDANGER(COutPoint&& outpoint, Coin&& coin, bool set_dirty) {
|
||||||
cachedCoinsUsage += coin.DynamicMemoryUsage();
|
const auto mem_usage{coin.DynamicMemoryUsage()};
|
||||||
auto [it, inserted] = cacheCoins.try_emplace(std::move(outpoint), std::move(coin));
|
auto [it, inserted] = cacheCoins.try_emplace(std::move(outpoint), std::move(coin));
|
||||||
if (inserted) CCoinsCacheEntry::SetDirty(*it, m_sentinel);
|
if (inserted) {
|
||||||
|
cachedCoinsUsage += mem_usage;
|
||||||
|
if (set_dirty) {
|
||||||
|
CCoinsCacheEntry::SetDirty(*it, m_sentinel);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void AddCoins(CCoinsViewCache& cache, const CTransaction &tx, int nHeight, bool check_for_overwrite) {
|
void AddCoins(CCoinsViewCache& cache, const CTransaction &tx, int nHeight, bool check_for_overwrite) {
|
||||||
|
|
|
@ -423,12 +423,13 @@ public:
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Emplace a coin into cacheCoins without performing any checks, marking
|
* Emplace a coin into cacheCoins without performing any checks, marking
|
||||||
* the emplaced coin as dirty.
|
* the emplaced coin as dirty unless `set_dirty` is `false`.
|
||||||
*
|
*
|
||||||
* NOT FOR GENERAL USE. Used only when loading coins from a UTXO snapshot.
|
* NOT FOR GENERAL USE. Used when loading coins from a UTXO snapshot, and
|
||||||
|
* in the InputFetcher.
|
||||||
* @sa ChainstateManager::PopulateAndValidateSnapshot()
|
* @sa ChainstateManager::PopulateAndValidateSnapshot()
|
||||||
*/
|
*/
|
||||||
void EmplaceCoinInternalDANGER(COutPoint&& outpoint, Coin&& coin);
|
void EmplaceCoinInternalDANGER(COutPoint&& outpoint, Coin&& coin, bool set_dirty = true);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Spend a coin. Pass moveto in order to get the deleted data.
|
* Spend a coin. Pass moveto in order to get the deleted data.
|
||||||
|
|
246
src/inputfetcher.h
Normal file
246
src/inputfetcher.h
Normal file
|
@ -0,0 +1,246 @@
|
||||||
|
// Copyright (c) 2024-present The Bitcoin Core developers
|
||||||
|
// Distributed under the MIT software license, see the accompanying
|
||||||
|
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
||||||
|
|
||||||
|
#ifndef BITCOIN_INPUTFETCHER_H
|
||||||
|
#define BITCOIN_INPUTFETCHER_H
|
||||||
|
|
||||||
|
#include <coins.h>
|
||||||
|
#include <sync.h>
|
||||||
|
#include <tinyformat.h>
|
||||||
|
#include <txdb.h>
|
||||||
|
#include <util/hasher.h>
|
||||||
|
#include <util/threadnames.h>
|
||||||
|
#include <util/transaction_identifier.h>
|
||||||
|
|
||||||
|
#include <cstdint>
|
||||||
|
#include <stdexcept>
|
||||||
|
#include <thread>
|
||||||
|
#include <unordered_set>
|
||||||
|
#include <vector>
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Input fetcher for fetching inputs from the CoinsDB and inserting
|
||||||
|
* into the CoinsTip.
|
||||||
|
*
|
||||||
|
* The main thread loops through the block and writes all input prevouts to a
|
||||||
|
* global vector. It then wakes all workers and starts working as well. Each
|
||||||
|
* thread assigns itself a range of outpoints from the shared vector, and
|
||||||
|
* fetches the coins from disk. The outpoint and coin pairs are written to a
|
||||||
|
* thread local vector of pairs. Once all outpoints are fetched, the main thread
|
||||||
|
* loops through all thread local vectors and writes the pairs to the cache.
|
||||||
|
*/
|
||||||
|
class InputFetcher
|
||||||
|
{
|
||||||
|
private:
|
||||||
|
//! Mutex to protect the inner state
|
||||||
|
Mutex m_mutex{};
|
||||||
|
//! Worker threads block on this when out of work
|
||||||
|
std::condition_variable m_worker_cv{};
|
||||||
|
//! Main thread blocks on this when out of work
|
||||||
|
std::condition_variable m_main_cv{};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The outpoints to be fetched from disk.
|
||||||
|
* This is written to on the main thread, then read from all worker
|
||||||
|
* threads only after the main thread is done writing. Hence, it doesn't
|
||||||
|
* need to be guarded by a lock.
|
||||||
|
*/
|
||||||
|
std::vector<COutPoint> m_outpoints{};
|
||||||
|
/**
|
||||||
|
* The index of the last outpoint that is being fetched. Workers assign
|
||||||
|
* themselves a range of outpoints to fetch from m_outpoints. They will use
|
||||||
|
* this index as the end of their range, and then set this index to the
|
||||||
|
* beginning of the range they take for the next worker. Once it gets to
|
||||||
|
* zero, all outpoints have been assigned and the next worker will wait.
|
||||||
|
*/
|
||||||
|
size_t m_last_outpoint_index GUARDED_BY(m_mutex){0};
|
||||||
|
|
||||||
|
//! The set of txids of the transactions in the current block being fetched.
|
||||||
|
std::unordered_set<Txid, SaltedTxidHasher> m_txids{};
|
||||||
|
//! The vector of thread local vectors of pairs to be written to the cache.
|
||||||
|
std::vector<std::vector<std::pair<COutPoint, Coin>>> m_pairs{};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Number of outpoint fetches that haven't completed yet.
|
||||||
|
* This includes outpoints that have already been assigned, but are still in
|
||||||
|
* the worker's own batches.
|
||||||
|
*/
|
||||||
|
int32_t m_in_flight_outpoints_count GUARDED_BY(m_mutex){0};
|
||||||
|
//! The number of worker threads that are waiting on m_worker_cv
|
||||||
|
int32_t m_idle_worker_count GUARDED_BY(m_mutex){0};
|
||||||
|
//! The maximum number of outpoints to be assigned in one batch
|
||||||
|
const int32_t m_batch_size;
|
||||||
|
//! DB coins view to fetch from.
|
||||||
|
const CCoinsView* m_db{nullptr};
|
||||||
|
//! The cache to check if we already have this input.
|
||||||
|
const CCoinsViewCache* m_cache{nullptr};
|
||||||
|
|
||||||
|
std::vector<std::thread> m_worker_threads;
|
||||||
|
bool m_request_stop GUARDED_BY(m_mutex){false};
|
||||||
|
|
||||||
|
//! Internal function that does the fetching from disk.
|
||||||
|
void Loop(int32_t index, bool is_main_thread = false) noexcept EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
|
||||||
|
{
|
||||||
|
auto local_batch_size{0};
|
||||||
|
auto end_index{0};
|
||||||
|
auto& cond{is_main_thread ? m_main_cv : m_worker_cv};
|
||||||
|
do {
|
||||||
|
{
|
||||||
|
WAIT_LOCK(m_mutex, lock);
|
||||||
|
// first do the clean-up of the previous loop run (allowing us to do
|
||||||
|
// it in the same critsect) local_batch_size will only be
|
||||||
|
// truthy after first run.
|
||||||
|
if (local_batch_size) {
|
||||||
|
m_in_flight_outpoints_count -= local_batch_size;
|
||||||
|
if (!is_main_thread && m_in_flight_outpoints_count == 0) {
|
||||||
|
m_main_cv.notify_one();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// logically, the do loop starts here
|
||||||
|
while (m_last_outpoint_index == 0) {
|
||||||
|
if ((is_main_thread && m_in_flight_outpoints_count == 0) || m_request_stop) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
++m_idle_worker_count;
|
||||||
|
cond.wait(lock);
|
||||||
|
--m_idle_worker_count;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Assign a batch of outpoints to this thread
|
||||||
|
local_batch_size = std::max(1, std::min(m_batch_size,
|
||||||
|
static_cast<int32_t>(m_last_outpoint_index /
|
||||||
|
(m_worker_threads.size() + 1 + m_idle_worker_count))));
|
||||||
|
end_index = m_last_outpoint_index;
|
||||||
|
m_last_outpoint_index -= local_batch_size;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto& local_pairs{m_pairs[index]};
|
||||||
|
local_pairs.reserve(local_pairs.size() + local_batch_size);
|
||||||
|
try {
|
||||||
|
for (auto i{end_index - local_batch_size}; i < end_index; ++i) {
|
||||||
|
const auto& outpoint{m_outpoints[i]};
|
||||||
|
// If an input spends an outpoint from earlier in the
|
||||||
|
// block, it won't be in the cache yet but it also won't be
|
||||||
|
// in the db either.
|
||||||
|
if (m_txids.contains(outpoint.hash)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (m_cache->HaveCoinInCache(outpoint)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (auto coin{m_db->GetCoin(outpoint)}; coin) {
|
||||||
|
local_pairs.emplace_back(outpoint, std::move(*coin));
|
||||||
|
} else {
|
||||||
|
// Missing an input. This block will fail validation.
|
||||||
|
// Skip remaining outpoints and continue so main thread
|
||||||
|
// can proceed.
|
||||||
|
LOCK(m_mutex);
|
||||||
|
m_in_flight_outpoints_count -= m_last_outpoint_index;
|
||||||
|
m_last_outpoint_index = 0;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (const std::runtime_error&) {
|
||||||
|
// Database error. This will be handled later in validation.
|
||||||
|
// Skip remaining outpoints and continue so main thread
|
||||||
|
// can proceed.
|
||||||
|
LOCK(m_mutex);
|
||||||
|
m_in_flight_outpoints_count -= m_last_outpoint_index;
|
||||||
|
m_last_outpoint_index = 0;
|
||||||
|
}
|
||||||
|
} while (true);
|
||||||
|
}
|
||||||
|
|
||||||
|
public:
|
||||||
|
|
||||||
|
//! Create a new input fetcher
|
||||||
|
explicit InputFetcher(int32_t batch_size, int32_t worker_thread_count) noexcept
|
||||||
|
: m_batch_size(batch_size)
|
||||||
|
{
|
||||||
|
if (worker_thread_count < 1) {
|
||||||
|
// Don't do anything if there are no worker threads.
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
m_pairs.reserve(worker_thread_count + 1);
|
||||||
|
for (auto n{0}; n < worker_thread_count + 1; ++n) {
|
||||||
|
m_pairs.emplace_back();
|
||||||
|
}
|
||||||
|
m_worker_threads.reserve(worker_thread_count);
|
||||||
|
for (auto n{0}; n < worker_thread_count; ++n) {
|
||||||
|
m_worker_threads.emplace_back([this, n]() {
|
||||||
|
util::ThreadRename(strprintf("inputfetch.%i", n));
|
||||||
|
Loop(n);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Since this class manages its own resources, which is a thread
|
||||||
|
// pool `m_worker_threads`, copy and move operations are not appropriate.
|
||||||
|
InputFetcher(const InputFetcher&) = delete;
|
||||||
|
InputFetcher& operator=(const InputFetcher&) = delete;
|
||||||
|
InputFetcher(InputFetcher&&) = delete;
|
||||||
|
InputFetcher& operator=(InputFetcher&&) = delete;
|
||||||
|
|
||||||
|
//! Fetch all block inputs from db, and insert into cache.
|
||||||
|
void FetchInputs(CCoinsViewCache& cache,
|
||||||
|
const CCoinsView& db,
|
||||||
|
const CBlock& block) noexcept
|
||||||
|
EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
|
||||||
|
{
|
||||||
|
if (m_worker_threads.empty() || block.vtx.size() <= 1) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set the db and cache to use for this block.
|
||||||
|
m_db = &db;
|
||||||
|
m_cache = &cache;
|
||||||
|
|
||||||
|
// Loop through the inputs of the block and add them to the queue
|
||||||
|
m_txids.reserve(block.vtx.size() - 1);
|
||||||
|
for (const auto& tx : block.vtx) {
|
||||||
|
if (tx->IsCoinBase()) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
m_outpoints.reserve(m_outpoints.size() + tx->vin.size());
|
||||||
|
for (const auto& in : tx->vin) {
|
||||||
|
m_outpoints.emplace_back(in.prevout);
|
||||||
|
}
|
||||||
|
m_txids.emplace(tx->GetHash());
|
||||||
|
}
|
||||||
|
{
|
||||||
|
LOCK(m_mutex);
|
||||||
|
m_in_flight_outpoints_count = m_outpoints.size();
|
||||||
|
m_last_outpoint_index = m_outpoints.size();
|
||||||
|
}
|
||||||
|
m_worker_cv.notify_all();
|
||||||
|
|
||||||
|
// Have the main thread work too while we wait for other threads
|
||||||
|
Loop(m_worker_threads.size(), /*is_main_thread=*/true);
|
||||||
|
|
||||||
|
// At this point all threads are done writing to m_pairs, so we can
|
||||||
|
// safely read from it and insert the fetched coins into the cache.
|
||||||
|
for (auto& local_pairs : m_pairs) {
|
||||||
|
for (auto&& [outpoint, coin] : local_pairs) {
|
||||||
|
cache.EmplaceCoinInternalDANGER(std::move(outpoint),
|
||||||
|
std::move(coin),
|
||||||
|
/*set_dirty=*/false);
|
||||||
|
}
|
||||||
|
local_pairs.clear();
|
||||||
|
}
|
||||||
|
m_txids.clear();
|
||||||
|
m_outpoints.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
~InputFetcher()
|
||||||
|
{
|
||||||
|
WITH_LOCK(m_mutex, m_request_stop = true);
|
||||||
|
m_worker_cv.notify_all();
|
||||||
|
for (std::thread& t : m_worker_threads) {
|
||||||
|
t.join();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
#endif // BITCOIN_INPUTFETCHER_H
|
|
@ -47,6 +47,7 @@ add_executable(test_bitcoin
|
||||||
headers_sync_chainwork_tests.cpp
|
headers_sync_chainwork_tests.cpp
|
||||||
httpserver_tests.cpp
|
httpserver_tests.cpp
|
||||||
i2p_tests.cpp
|
i2p_tests.cpp
|
||||||
|
inputfetcher_tests.cpp
|
||||||
interfaces_tests.cpp
|
interfaces_tests.cpp
|
||||||
key_io_tests.cpp
|
key_io_tests.cpp
|
||||||
key_tests.cpp
|
key_tests.cpp
|
||||||
|
|
|
@ -53,6 +53,7 @@ add_executable(fuzz
|
||||||
hex.cpp
|
hex.cpp
|
||||||
http_request.cpp
|
http_request.cpp
|
||||||
i2p.cpp
|
i2p.cpp
|
||||||
|
inputfetcher.cpp
|
||||||
integer.cpp
|
integer.cpp
|
||||||
key.cpp
|
key.cpp
|
||||||
key_io.cpp
|
key_io.cpp
|
||||||
|
|
151
src/test/fuzz/inputfetcher.cpp
Normal file
151
src/test/fuzz/inputfetcher.cpp
Normal file
|
@ -0,0 +1,151 @@
|
||||||
|
// Copyright (c) 2024-present The Bitcoin Core developers
|
||||||
|
// Distributed under the MIT software license, see the accompanying
|
||||||
|
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
||||||
|
|
||||||
|
#include <consensus/amount.h>
|
||||||
|
#include <inputfetcher.h>
|
||||||
|
#include <test/fuzz/FuzzedDataProvider.h>
|
||||||
|
#include <test/fuzz/fuzz.h>
|
||||||
|
#include <test/fuzz/util.h>
|
||||||
|
#include <util/transaction_identifier.h>
|
||||||
|
|
||||||
|
#include <cstdint>
|
||||||
|
#include <map>
|
||||||
|
#include <optional>
|
||||||
|
#include <stdexcept>
|
||||||
|
#include <utility>
|
||||||
|
|
||||||
|
using DbMap = std::map<const COutPoint, std::pair<std::optional<const Coin>, bool>>;
|
||||||
|
|
||||||
|
class DbCoinsView : public CCoinsView
|
||||||
|
{
|
||||||
|
private:
|
||||||
|
DbMap& m_map;
|
||||||
|
|
||||||
|
public:
|
||||||
|
DbCoinsView(DbMap& map) : m_map(map) {}
|
||||||
|
|
||||||
|
std::optional<Coin> GetCoin(const COutPoint& outpoint) const override
|
||||||
|
{
|
||||||
|
const auto it{m_map.find(outpoint)};
|
||||||
|
assert(it != m_map.end());
|
||||||
|
const auto [coin, err] = it->second;
|
||||||
|
if (err) {
|
||||||
|
throw std::runtime_error("database error");
|
||||||
|
}
|
||||||
|
return coin;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
class NoAccessCoinsView : public CCoinsView
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
std::optional<Coin> GetCoin(const COutPoint& outpoint) const override
|
||||||
|
{
|
||||||
|
abort();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
FUZZ_TARGET(inputfetcher)
|
||||||
|
{
|
||||||
|
FuzzedDataProvider fuzzed_data_provider(buffer.data(), buffer.size());
|
||||||
|
|
||||||
|
const auto batch_size{
|
||||||
|
fuzzed_data_provider.ConsumeIntegralInRange<int32_t>(0, 1024)};
|
||||||
|
const auto worker_threads{
|
||||||
|
fuzzed_data_provider.ConsumeIntegralInRange<int32_t>(2, 4)};
|
||||||
|
InputFetcher fetcher{batch_size, worker_threads};
|
||||||
|
|
||||||
|
LIMITED_WHILE(fuzzed_data_provider.ConsumeBool(), 10000) {
|
||||||
|
CBlock block;
|
||||||
|
Txid prevhash{Txid::FromUint256(ConsumeUInt256(fuzzed_data_provider))};
|
||||||
|
|
||||||
|
DbMap db_map{};
|
||||||
|
std::map<const COutPoint, const Coin> cache_map{};
|
||||||
|
|
||||||
|
DbCoinsView db(db_map);
|
||||||
|
|
||||||
|
NoAccessCoinsView back;
|
||||||
|
CCoinsViewCache cache(&back);
|
||||||
|
|
||||||
|
LIMITED_WHILE(fuzzed_data_provider.ConsumeBool(), static_cast<uint32_t>(batch_size * worker_threads * 2)) {
|
||||||
|
CMutableTransaction tx;
|
||||||
|
|
||||||
|
LIMITED_WHILE(fuzzed_data_provider.ConsumeBool(), 10) {
|
||||||
|
const auto txid{fuzzed_data_provider.ConsumeBool()
|
||||||
|
? Txid::FromUint256(ConsumeUInt256(fuzzed_data_provider))
|
||||||
|
: prevhash};
|
||||||
|
const auto index{fuzzed_data_provider.ConsumeIntegral<uint32_t>()};
|
||||||
|
const COutPoint outpoint(txid, index);
|
||||||
|
|
||||||
|
tx.vin.emplace_back(outpoint);
|
||||||
|
|
||||||
|
std::optional<Coin> maybe_coin;
|
||||||
|
if (fuzzed_data_provider.ConsumeBool()) {
|
||||||
|
Coin coin{};
|
||||||
|
coin.fCoinBase = fuzzed_data_provider.ConsumeBool();
|
||||||
|
coin.nHeight =
|
||||||
|
fuzzed_data_provider.ConsumeIntegralInRange<int32_t>(
|
||||||
|
0, std::numeric_limits<int32_t>::max());
|
||||||
|
coin.out.nValue = ConsumeMoney(fuzzed_data_provider);
|
||||||
|
maybe_coin = coin;
|
||||||
|
} else {
|
||||||
|
maybe_coin = std::nullopt;
|
||||||
|
}
|
||||||
|
db_map.try_emplace(outpoint, std::make_pair(
|
||||||
|
maybe_coin,
|
||||||
|
fuzzed_data_provider.ConsumeBool()));
|
||||||
|
|
||||||
|
// Add the coin to the cache
|
||||||
|
if (fuzzed_data_provider.ConsumeBool()) {
|
||||||
|
Coin coin{};
|
||||||
|
coin.fCoinBase = fuzzed_data_provider.ConsumeBool();
|
||||||
|
coin.nHeight =
|
||||||
|
fuzzed_data_provider.ConsumeIntegralInRange<int32_t>(
|
||||||
|
0, std::numeric_limits<int32_t>::max());
|
||||||
|
coin.out.nValue =
|
||||||
|
fuzzed_data_provider.ConsumeIntegralInRange<int64_t>(
|
||||||
|
-1, MAX_MONEY);
|
||||||
|
cache_map.try_emplace(outpoint, coin);
|
||||||
|
cache.EmplaceCoinInternalDANGER(
|
||||||
|
COutPoint(outpoint),
|
||||||
|
std::move(coin),
|
||||||
|
/*set_dirty=*/fuzzed_data_provider.ConsumeBool());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
prevhash = tx.GetHash();
|
||||||
|
block.vtx.push_back(MakeTransactionRef(tx));
|
||||||
|
}
|
||||||
|
|
||||||
|
fetcher.FetchInputs(cache, db, block);
|
||||||
|
|
||||||
|
for (const auto& [outpoint, pair] : db_map) {
|
||||||
|
// Check pre-existing coins in the cache have not been updated
|
||||||
|
const auto it{cache_map.find(outpoint)};
|
||||||
|
if (it != cache_map.end()) {
|
||||||
|
const auto& cache_coin{it->second};
|
||||||
|
const auto& coin{cache.AccessCoin(outpoint)};
|
||||||
|
assert(coin.IsSpent() == cache_coin.IsSpent());
|
||||||
|
assert(coin.fCoinBase == cache_coin.fCoinBase);
|
||||||
|
assert(coin.nHeight == cache_coin.nHeight);
|
||||||
|
assert(coin.out == cache_coin.out);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!cache.HaveCoinInCache(outpoint)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
const auto& [maybe_coin, err] = pair;
|
||||||
|
assert(maybe_coin && !err);
|
||||||
|
|
||||||
|
// Check any newly added coins in the cache are the same as the db
|
||||||
|
const auto& coin{cache.AccessCoin(outpoint)};
|
||||||
|
assert(!coin.IsSpent());
|
||||||
|
assert(coin.fCoinBase == (*maybe_coin).fCoinBase);
|
||||||
|
assert(coin.nHeight == (*maybe_coin).nHeight);
|
||||||
|
assert(coin.out == (*maybe_coin).out);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
191
src/test/inputfetcher_tests.cpp
Normal file
191
src/test/inputfetcher_tests.cpp
Normal file
|
@ -0,0 +1,191 @@
|
||||||
|
// Copyright (c) 2024-present The Bitcoin Core developers
|
||||||
|
// Distributed under the MIT software license, see the accompanying
|
||||||
|
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
||||||
|
|
||||||
|
#include <coins.h>
|
||||||
|
#include <common/system.h>
|
||||||
|
#include <inputfetcher.h>
|
||||||
|
#include <primitives/block.h>
|
||||||
|
#include <primitives/transaction.h>
|
||||||
|
#include <test/util/random.h>
|
||||||
|
#include <test/util/setup_common.h>
|
||||||
|
#include <uint256.h>
|
||||||
|
#include <util/transaction_identifier.h>
|
||||||
|
|
||||||
|
#include <boost/test/unit_test.hpp>
|
||||||
|
|
||||||
|
#include <cstdint>
|
||||||
|
#include <memory>
|
||||||
|
#include <stdexcept>
|
||||||
|
#include <string>
|
||||||
|
#include <unordered_set>
|
||||||
|
|
||||||
|
BOOST_AUTO_TEST_SUITE(inputfetcher_tests)
|
||||||
|
|
||||||
|
struct InputFetcherTest : BasicTestingSetup {
|
||||||
|
private:
|
||||||
|
std::unique_ptr<InputFetcher> m_fetcher{nullptr};
|
||||||
|
std::unique_ptr<CBlock> m_block{nullptr};
|
||||||
|
|
||||||
|
CBlock CreateBlock(int32_t num_txs)
|
||||||
|
{
|
||||||
|
CBlock block;
|
||||||
|
CMutableTransaction coinbase;
|
||||||
|
coinbase.vin.emplace_back();
|
||||||
|
block.vtx.push_back(MakeTransactionRef(coinbase));
|
||||||
|
|
||||||
|
Txid prevhash{Txid::FromUint256(uint256(1))};
|
||||||
|
|
||||||
|
for (auto i{1}; i < num_txs; ++i) {
|
||||||
|
CMutableTransaction tx;
|
||||||
|
const auto txid{m_rng.randbool() ? Txid::FromUint256(uint256(i)) : prevhash};
|
||||||
|
tx.vin.emplace_back(COutPoint(txid, 0));
|
||||||
|
prevhash = tx.GetHash();
|
||||||
|
block.vtx.push_back(MakeTransactionRef(tx));
|
||||||
|
}
|
||||||
|
|
||||||
|
return block;
|
||||||
|
}
|
||||||
|
|
||||||
|
public:
|
||||||
|
explicit InputFetcherTest(const ChainType chainType = ChainType::MAIN,
|
||||||
|
TestOpts opts = {})
|
||||||
|
: BasicTestingSetup{chainType, opts}
|
||||||
|
{
|
||||||
|
SeedRandomForTest(SeedRand::ZEROS);
|
||||||
|
|
||||||
|
const auto cores{GetNumCores()};
|
||||||
|
const auto num_txs{m_rng.randrange(cores * 10)};
|
||||||
|
m_block = std::make_unique<CBlock>(CreateBlock(num_txs));
|
||||||
|
const auto batch_size{m_rng.randrange<int32_t>(m_block->vtx.size() * 2)};
|
||||||
|
const auto worker_threads{m_rng.randrange(cores * 2) + 1};
|
||||||
|
m_fetcher = std::make_unique<InputFetcher>(batch_size, worker_threads);
|
||||||
|
}
|
||||||
|
|
||||||
|
InputFetcher& getFetcher() { return *m_fetcher; }
|
||||||
|
const CBlock& getBlock() { return *m_block; }
|
||||||
|
};
|
||||||
|
|
||||||
|
BOOST_FIXTURE_TEST_CASE(fetch_inputs, InputFetcherTest)
|
||||||
|
{
|
||||||
|
const auto& block{getBlock()};
|
||||||
|
for (auto i{0}; i < 3; ++i) {
|
||||||
|
CCoinsView dummy;
|
||||||
|
CCoinsViewCache db(&dummy);
|
||||||
|
|
||||||
|
for (const auto& tx : block.vtx) {
|
||||||
|
for (const auto& in : tx->vin) {
|
||||||
|
auto outpoint{in.prevout};
|
||||||
|
Coin coin{};
|
||||||
|
coin.out.nValue = 1;
|
||||||
|
db.EmplaceCoinInternalDANGER(std::move(outpoint), std::move(coin));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
CCoinsViewCache cache(&db);
|
||||||
|
getFetcher().FetchInputs(cache, db, block);
|
||||||
|
|
||||||
|
std::unordered_set<Txid, SaltedTxidHasher> txids{};
|
||||||
|
txids.reserve(block.vtx.size() - 1);
|
||||||
|
|
||||||
|
for (const auto& tx : block.vtx) {
|
||||||
|
if (tx->IsCoinBase()) {
|
||||||
|
BOOST_CHECK(!cache.HaveCoinInCache(tx->vin[0].prevout));
|
||||||
|
} else {
|
||||||
|
for (const auto& in : tx->vin) {
|
||||||
|
const auto& outpoint{in.prevout};
|
||||||
|
const auto have{cache.HaveCoinInCache(outpoint)};
|
||||||
|
const auto should_have{!txids.contains(outpoint.hash)};
|
||||||
|
BOOST_CHECK(should_have ? have : !have);
|
||||||
|
}
|
||||||
|
txids.emplace(tx->GetHash());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test for the case where a block spends coins that are spent in the cache, but
|
||||||
|
// the spentness has not been flushed to the db. So the input fetcher will fetch
|
||||||
|
// the coin from the db since HaveCoinInCache will return false for an existing
|
||||||
|
// but spent coin. However, the fetched coin will fail to be inserted into the
|
||||||
|
// cache because the emplace call in EmplaceCoinInternalDANGER will not insert
|
||||||
|
// the unspent coin due to the collision with the already spent coin in the map.
|
||||||
|
BOOST_FIXTURE_TEST_CASE(fetch_no_double_spend, InputFetcherTest)
|
||||||
|
{
|
||||||
|
const auto& block{getBlock()};
|
||||||
|
for (auto i{0}; i < 3; ++i) {
|
||||||
|
CCoinsView dummy;
|
||||||
|
CCoinsViewCache db(&dummy);
|
||||||
|
|
||||||
|
for (const auto& tx : block.vtx) {
|
||||||
|
for (const auto& in : tx->vin) {
|
||||||
|
auto outpoint{in.prevout};
|
||||||
|
Coin coin{};
|
||||||
|
coin.out.nValue = 1;
|
||||||
|
db.EmplaceCoinInternalDANGER(std::move(outpoint), std::move(coin));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
CCoinsViewCache cache(&db);
|
||||||
|
|
||||||
|
// Add all inputs as spent already in cache
|
||||||
|
for (const auto& tx : block.vtx) {
|
||||||
|
for (const auto& in : tx->vin) {
|
||||||
|
auto outpoint{in.prevout};
|
||||||
|
Coin coin{};
|
||||||
|
cache.EmplaceCoinInternalDANGER(std::move(outpoint), std::move(coin));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
getFetcher().FetchInputs(cache, db, block);
|
||||||
|
|
||||||
|
// Coins are still spent, even though they exist unspent in the parent db
|
||||||
|
for (const auto& tx : block.vtx) {
|
||||||
|
for (const auto& in : tx->vin) {
|
||||||
|
BOOST_CHECK(!cache.HaveCoinInCache(in.prevout));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
BOOST_FIXTURE_TEST_CASE(fetch_no_inputs, InputFetcherTest)
|
||||||
|
{
|
||||||
|
const auto& block{getBlock()};
|
||||||
|
for (auto i{0}; i < 3; ++i) {
|
||||||
|
CCoinsView db;
|
||||||
|
CCoinsViewCache cache(&db);
|
||||||
|
getFetcher().FetchInputs(cache, db, block);
|
||||||
|
|
||||||
|
for (const auto& tx : block.vtx) {
|
||||||
|
for (const auto& in : tx->vin) {
|
||||||
|
BOOST_CHECK(!cache.HaveCoinInCache(in.prevout));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class ThrowCoinsView : public CCoinsView
|
||||||
|
{
|
||||||
|
std::optional<Coin> GetCoin(const COutPoint& outpoint) const override
|
||||||
|
{
|
||||||
|
throw std::runtime_error("database error");
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
BOOST_FIXTURE_TEST_CASE(fetch_input_exceptions, InputFetcherTest)
|
||||||
|
{
|
||||||
|
const auto& block{getBlock()};
|
||||||
|
for (auto i{0}; i < 3; ++i) {
|
||||||
|
ThrowCoinsView db;
|
||||||
|
CCoinsViewCache cache(&db);
|
||||||
|
getFetcher().FetchInputs(cache, db, block);
|
||||||
|
|
||||||
|
for (const auto& tx : block.vtx) {
|
||||||
|
for (const auto& in : tx->vin) {
|
||||||
|
BOOST_CHECK(!cache.HaveCoinInCache(in.prevout));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
BOOST_AUTO_TEST_SUITE_END()
|
|
@ -3196,6 +3196,8 @@ bool Chainstate::ConnectTip(BlockValidationState& state, CBlockIndex* pindexNew,
|
||||||
LogDebug(BCLog::BENCH, " - Load block from disk: %.2fms\n",
|
LogDebug(BCLog::BENCH, " - Load block from disk: %.2fms\n",
|
||||||
Ticks<MillisecondsDouble>(time_2 - time_1));
|
Ticks<MillisecondsDouble>(time_2 - time_1));
|
||||||
{
|
{
|
||||||
|
m_chainman.GetInputFetcher().FetchInputs(CoinsTip(), CoinsDB(), blockConnecting);
|
||||||
|
|
||||||
CCoinsViewCache view(&CoinsTip());
|
CCoinsViewCache view(&CoinsTip());
|
||||||
bool rv = ConnectBlock(blockConnecting, state, pindexNew, view);
|
bool rv = ConnectBlock(blockConnecting, state, pindexNew, view);
|
||||||
if (m_chainman.m_options.signals) {
|
if (m_chainman.m_options.signals) {
|
||||||
|
@ -6303,6 +6305,7 @@ static ChainstateManager::Options&& Flatten(ChainstateManager::Options&& opts)
|
||||||
|
|
||||||
ChainstateManager::ChainstateManager(const util::SignalInterrupt& interrupt, Options options, node::BlockManager::Options blockman_options)
|
ChainstateManager::ChainstateManager(const util::SignalInterrupt& interrupt, Options options, node::BlockManager::Options blockman_options)
|
||||||
: m_script_check_queue{/*batch_size=*/128, std::clamp(options.worker_threads_num, 0, MAX_SCRIPTCHECK_THREADS)},
|
: m_script_check_queue{/*batch_size=*/128, std::clamp(options.worker_threads_num, 0, MAX_SCRIPTCHECK_THREADS)},
|
||||||
|
m_input_fetcher{/*batch_size=*/128, std::clamp(options.worker_threads_num, 0, MAX_SCRIPTCHECK_THREADS)},
|
||||||
m_interrupt{interrupt},
|
m_interrupt{interrupt},
|
||||||
m_options{Flatten(std::move(options))},
|
m_options{Flatten(std::move(options))},
|
||||||
m_blockman{interrupt, std::move(blockman_options)},
|
m_blockman{interrupt, std::move(blockman_options)},
|
||||||
|
|
|
@ -13,6 +13,7 @@
|
||||||
#include <consensus/amount.h>
|
#include <consensus/amount.h>
|
||||||
#include <cuckoocache.h>
|
#include <cuckoocache.h>
|
||||||
#include <deploymentstatus.h>
|
#include <deploymentstatus.h>
|
||||||
|
#include <inputfetcher.h>
|
||||||
#include <kernel/chain.h>
|
#include <kernel/chain.h>
|
||||||
#include <kernel/chainparams.h>
|
#include <kernel/chainparams.h>
|
||||||
#include <kernel/chainstatemanager_opts.h>
|
#include <kernel/chainstatemanager_opts.h>
|
||||||
|
@ -948,6 +949,7 @@ private:
|
||||||
|
|
||||||
//! A queue for script verifications that have to be performed by worker threads.
|
//! A queue for script verifications that have to be performed by worker threads.
|
||||||
CCheckQueue<CScriptCheck> m_script_check_queue;
|
CCheckQueue<CScriptCheck> m_script_check_queue;
|
||||||
|
InputFetcher m_input_fetcher;
|
||||||
|
|
||||||
//! Timers and counters used for benchmarking validation in both background
|
//! Timers and counters used for benchmarking validation in both background
|
||||||
//! and active chainstates.
|
//! and active chainstates.
|
||||||
|
@ -1323,6 +1325,7 @@ public:
|
||||||
void RecalculateBestHeader() EXCLUSIVE_LOCKS_REQUIRED(::cs_main);
|
void RecalculateBestHeader() EXCLUSIVE_LOCKS_REQUIRED(::cs_main);
|
||||||
|
|
||||||
CCheckQueue<CScriptCheck>& GetCheckQueue() { return m_script_check_queue; }
|
CCheckQueue<CScriptCheck>& GetCheckQueue() { return m_script_check_queue; }
|
||||||
|
InputFetcher& GetInputFetcher() { return m_input_fetcher; }
|
||||||
|
|
||||||
~ChainstateManager();
|
~ChainstateManager();
|
||||||
};
|
};
|
||||||
|
|
Loading…
Add table
Reference in a new issue