kernel: Remove dependency on CScheduler

By defining a virtual interface class for the scheduler client, users of
the kernel can now define their own event consuming infrastructure,
without having to spawn threads or rely on the scheduler design.

Removing CScheduler also allows removing the thread and
exception modules from the kernel library.
This commit is contained in:
TheCharlatan 2023-11-27 17:15:11 +01:00
parent 06069b3913
commit d5228efb53
No known key found for this signature in database
GPG key ID: 9B79B45691DB4173
8 changed files with 85 additions and 37 deletions

View file

@ -326,6 +326,7 @@ BITCOIN_CORE_H = \
util/spanparsing.h \
util/string.h \
util/syserror.h \
util/task_runner.h \
util/thread.h \
util/threadinterrupt.h \
util/threadnames.h \
@ -975,7 +976,6 @@ libbitcoinkernel_la_SOURCES = \
pubkey.cpp \
random.cpp \
randomenv.cpp \
scheduler.cpp \
script/interpreter.cpp \
script/script.cpp \
script/script_error.cpp \
@ -992,7 +992,6 @@ libbitcoinkernel_la_SOURCES = \
util/batchpriority.cpp \
util/chaintype.cpp \
util/check.cpp \
util/exception.cpp \
util/fs.cpp \
util/fs_helpers.cpp \
util/hasher.cpp \
@ -1003,7 +1002,6 @@ libbitcoinkernel_la_SOURCES = \
util/strencodings.cpp \
util/string.cpp \
util/syserror.cpp \
util/thread.cpp \
util/threadnames.cpp \
util/time.cpp \
util/tokenpipe.cpp \

View file

@ -23,11 +23,10 @@
#include <node/caches.h>
#include <node/chainstate.h>
#include <random.h>
#include <scheduler.h>
#include <script/sigcache.h>
#include <util/chaintype.h>
#include <util/fs.h>
#include <util/thread.h>
#include <util/task_runner.h>
#include <validation.h>
#include <validationinterface.h>
@ -68,16 +67,7 @@ int main(int argc, char* argv[])
Assert(InitSignatureCache(validation_cache_sizes.signature_cache_bytes));
Assert(InitScriptExecutionCache(validation_cache_sizes.script_execution_cache_bytes));
// SETUP: Scheduling and Background Signals
CScheduler scheduler{};
// Start the lightweight task scheduler thread
scheduler.m_service_thread = std::thread(util::TraceThread, "scheduler", [&] { scheduler.serviceQueue(); });
ValidationSignals validation_signals{scheduler};
// Gather some entropy once per minute.
scheduler.scheduleEvery(RandAddPeriodic, std::chrono::minutes{1});
ValidationSignals validation_signals{std::make_unique<util::ImmediateTaskRunner>()};
class KernelNotifications : public kernel::Notifications
{
@ -288,7 +278,6 @@ int main(int argc, char* argv[])
epilogue:
// Without this precise shutdown sequence, there will be a lot of nullptr
// dereferencing and UB.
scheduler.stop();
if (chainman.m_thread_load.joinable()) chainman.m_thread_load.join();
validation_signals.FlushBackgroundCallbacks();

View file

@ -1164,7 +1164,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
}, std::chrono::minutes{5});
assert(!node.validation_signals);
node.validation_signals = std::make_unique<ValidationSignals>(scheduler);
node.validation_signals = std::make_unique<ValidationSignals>(std::make_unique<SerialTaskRunner>(scheduler));
auto& validation_signals = *node.validation_signals;
// Create client interfaces for wallets that are supposed to be loaded

View file

@ -8,6 +8,7 @@
#include <attributes.h>
#include <sync.h>
#include <threadsafety.h>
#include <util/task_runner.h>
#include <chrono>
#include <condition_variable>
@ -120,12 +121,16 @@ private:
* B() will be able to observe all of the effects of callback A() which executed
* before it.
*/
class SerialTaskRunner
class SerialTaskRunner : public util::TaskRunnerInterface
{
private:
CScheduler& m_scheduler;
Mutex m_callbacks_mutex;
// We are not allowed to assume the scheduler only runs in one thread,
// but must ensure all callbacks happen in-order, so we end up creating
// our own queue here :(
std::list<std::function<void()>> m_callbacks_pending GUARDED_BY(m_callbacks_mutex);
bool m_are_callbacks_running GUARDED_BY(m_callbacks_mutex) = false;
@ -141,15 +146,15 @@ public:
* Practically, this means that callbacks can behave as if they are executed
* in order by a single thread.
*/
void insert(std::function<void()> func) EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);
void insert(std::function<void()> func) override EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);
/**
* Processes all remaining queue members on the calling thread, blocking until queue is empty
* Must be called after the CScheduler has no remaining processing threads!
*/
void flush() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);
void flush() override EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);
size_t size() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);
size_t size() override EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex);
};
#endif // BITCOIN_SCHEDULER_H

View file

@ -171,7 +171,7 @@ ChainTestingSetup::ChainTestingSetup(const ChainType chainType, const std::vecto
// from blocking due to queue overrun.
m_node.scheduler = std::make_unique<CScheduler>();
m_node.scheduler->m_service_thread = std::thread(util::TraceThread, "scheduler", [&] { m_node.scheduler->serviceQueue(); });
m_node.validation_signals = std::make_unique<ValidationSignals>(*m_node.scheduler);
m_node.validation_signals = std::make_unique<ValidationSignals>(std::make_unique<SerialTaskRunner>(*m_node.scheduler));
m_node.fee_estimator = std::make_unique<CBlockPolicyEstimator>(FeeestPath(*m_node.args), DEFAULT_ACCEPT_STALE_FEE_ESTIMATES);
m_node.mempool = std::make_unique<CTxMemPool>(MemPoolOptionsForTest(m_node));

52
src/util/task_runner.h Normal file
View file

@ -0,0 +1,52 @@
// Copyright (c) 2024-present The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#ifndef BITCOIN_UTIL_TASK_RUNNER_H
#define BITCOIN_UTIL_TASK_RUNNER_H
#include <cstddef>
#include <functional>
namespace util {
/** @file
* This header provides an interface and simple implementation for a task
* runner. Another threaded, serial implementation using a queue is available in
* the scheduler module's SerialTaskRunner.
*/
class TaskRunnerInterface
{
public:
virtual ~TaskRunnerInterface() {}
/**
* The callback can either be queued for later/asynchronous/threaded
* processing, or be executed immediately for synchronous processing.
*/
virtual void insert(std::function<void()> func) = 0;
/**
* Forces the processing of all pending events.
*/
virtual void flush() = 0;
/**
* Returns the number of currently pending events.
*/
virtual size_t size() = 0;
};
class ImmediateTaskRunner : public TaskRunnerInterface
{
public:
void insert(std::function<void()> func) override { func(); }
void flush() override {}
size_t size() override { return 0; }
};
} // namespace util
#endif // BITCOIN_UTIL_TASK_RUNNER_H

View file

@ -5,7 +5,6 @@
#include <validationinterface.h>
#include <attributes.h>
#include <chain.h>
#include <consensus/validation.h>
#include <kernel/chain.h>
@ -13,7 +12,8 @@
#include <logging.h>
#include <primitives/block.h>
#include <primitives/transaction.h>
#include <scheduler.h>
#include <util/check.h>
#include <util/task_runner.h>
#include <future>
#include <unordered_map>
@ -42,12 +42,10 @@ private:
std::unordered_map<CValidationInterface*, std::list<ListEntry>::iterator> m_map GUARDED_BY(m_mutex);
public:
// We are not allowed to assume the scheduler only runs in one thread,
// but must ensure all callbacks happen in-order, so we end up creating
// our own queue here :(
SerialTaskRunner m_task_runner;
std::unique_ptr<util::TaskRunnerInterface> m_task_runner;
explicit ValidationSignalsImpl(CScheduler& scheduler LIFETIMEBOUND) : m_task_runner(scheduler) {}
explicit ValidationSignalsImpl(std::unique_ptr<util::TaskRunnerInterface> task_runner)
: m_task_runner{std::move(Assert(task_runner))} {}
void Register(std::shared_ptr<CValidationInterface> callbacks) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
{
@ -94,19 +92,19 @@ public:
}
};
ValidationSignals::ValidationSignals(CScheduler& scheduler)
: m_internals{std::make_unique<ValidationSignalsImpl>(scheduler)} {}
ValidationSignals::ValidationSignals(std::unique_ptr<util::TaskRunnerInterface> task_runner)
: m_internals{std::make_unique<ValidationSignalsImpl>(std::move(task_runner))} {}
ValidationSignals::~ValidationSignals() {}
void ValidationSignals::FlushBackgroundCallbacks()
{
m_internals->m_task_runner.flush();
m_internals->m_task_runner->flush();
}
size_t ValidationSignals::CallbacksPending()
{
return m_internals->m_task_runner.size();
return m_internals->m_task_runner->size();
}
void ValidationSignals::RegisterSharedValidationInterface(std::shared_ptr<CValidationInterface> callbacks)
@ -140,7 +138,7 @@ void ValidationSignals::UnregisterAllValidationInterfaces()
void ValidationSignals::CallFunctionInValidationInterfaceQueue(std::function<void()> func)
{
m_internals->m_task_runner.insert(std::move(func));
m_internals->m_task_runner->insert(std::move(func));
}
void ValidationSignals::SyncWithValidationInterfaceQueue()
@ -162,7 +160,7 @@ void ValidationSignals::SyncWithValidationInterfaceQueue()
do { \
auto local_name = (name); \
LOG_EVENT("Enqueuing " fmt, local_name, __VA_ARGS__); \
m_internals->m_task_runner.insert([=] { \
m_internals->m_task_runner->insert([=] { \
LOG_EVENT(fmt, local_name, __VA_ARGS__); \
event(); \
}); \

View file

@ -17,11 +17,14 @@
#include <memory>
#include <vector>
namespace util {
class TaskRunnerInterface;
} // namespace util
class BlockValidationState;
class CBlock;
class CBlockIndex;
struct CBlockLocator;
class CScheduler;
enum class MemPoolRemovalReason;
struct RemovedMempoolTransactionInfo;
struct NewMempoolTransactionInfo;
@ -160,7 +163,10 @@ private:
std::unique_ptr<ValidationSignalsImpl> m_internals;
public:
ValidationSignals(CScheduler& scheduler LIFETIMEBOUND);
// The task runner will block validation if it calls its insert method's
// func argument synchronously. In this class func contains a loop that
// dispatches a single validation event to all subscribers sequentially.
explicit ValidationSignals(std::unique_ptr<util::TaskRunnerInterface> task_runner);
~ValidationSignals();