From 08096bbbc6d6fef86943ca8ce5e6de18744d58ea Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Mon, 10 Apr 2017 14:55:49 -0400 Subject: [PATCH] Support more than one CScheduler thread for serial clients This will be used by CValidationInterface soon. This requires a bit of work as we need to ensure that most of our callbacks happen in-order (to avoid synchronization issues in wallet) - we keep our own internal queue and push things onto it, scheduling a queue-draining function immediately upon new callbacks. --- src/scheduler.cpp | 52 +++++++++++++++++++++++++++++++++++++ src/scheduler.h | 24 +++++++++++++++++ src/validationinterface.cpp | 22 ++++++++++------ src/validationinterface.h | 2 -- 4 files changed, 90 insertions(+), 10 deletions(-) diff --git a/src/scheduler.cpp b/src/scheduler.cpp index 923ba2c231..a76a87e10a 100644 --- a/src/scheduler.cpp +++ b/src/scheduler.cpp @@ -139,3 +139,55 @@ size_t CScheduler::getQueueInfo(boost::chrono::system_clock::time_point &first, } return result; } + + +void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue() { + { + LOCK(m_cs_callbacks_pending); + // Try to avoid scheduling too many copies here, but if we + // accidentally have two ProcessQueue's scheduled at once its + // not a big deal. + if (m_are_callbacks_running) return; + if (m_callbacks_pending.empty()) return; + } + m_pscheduler->schedule(std::bind(&SingleThreadedSchedulerClient::ProcessQueue, this)); +} + +void SingleThreadedSchedulerClient::ProcessQueue() { + std::function callback; + { + LOCK(m_cs_callbacks_pending); + if (m_are_callbacks_running) return; + if (m_callbacks_pending.empty()) return; + m_are_callbacks_running = true; + + callback = std::move(m_callbacks_pending.front()); + m_callbacks_pending.pop_front(); + } + + // RAII the setting of fCallbacksRunning and calling MaybeScheduleProcessQueue + // to ensure both happen safely even if callback() throws. + struct RAIICallbacksRunning { + SingleThreadedSchedulerClient* instance; + RAIICallbacksRunning(SingleThreadedSchedulerClient* _instance) : instance(_instance) {} + ~RAIICallbacksRunning() { + { + LOCK(instance->m_cs_callbacks_pending); + instance->m_are_callbacks_running = false; + } + instance->MaybeScheduleProcessQueue(); + } + } raiicallbacksrunning(this); + + callback(); +} + +void SingleThreadedSchedulerClient::AddToProcessQueue(std::function func) { + assert(m_pscheduler); + + { + LOCK(m_cs_callbacks_pending); + m_callbacks_pending.emplace_back(std::move(func)); + } + MaybeScheduleProcessQueue(); +} diff --git a/src/scheduler.h b/src/scheduler.h index 5da6e6f69f..82036afdf0 100644 --- a/src/scheduler.h +++ b/src/scheduler.h @@ -14,6 +14,8 @@ #include #include +#include "sync.h" + // // Simple class for background tasks that should be run // periodically or once "after a while" @@ -79,4 +81,26 @@ private: bool shouldStop() { return stopRequested || (stopWhenEmpty && taskQueue.empty()); } }; +/** + * Class used by CScheduler clients which may schedule multiple jobs + * which are required to be run serially. Does not require such jobs + * to be executed on the same thread, but no two jobs will be executed + * at the same time. + */ +class SingleThreadedSchedulerClient { +private: + CScheduler *m_pscheduler; + + CCriticalSection m_cs_callbacks_pending; + std::list> m_callbacks_pending; + bool m_are_callbacks_running = false; + + void MaybeScheduleProcessQueue(); + void ProcessQueue(); + +public: + SingleThreadedSchedulerClient(CScheduler *pschedulerIn) : m_pscheduler(pschedulerIn) {} + void AddToProcessQueue(std::function func); +}; + #endif diff --git a/src/validationinterface.cpp b/src/validationinterface.cpp index a17a08eee2..8edc7c398d 100644 --- a/src/validationinterface.cpp +++ b/src/validationinterface.cpp @@ -6,6 +6,11 @@ #include "validationinterface.h" #include "init.h" #include "scheduler.h" +#include "sync.h" +#include "util.h" + +#include +#include #include @@ -20,22 +25,23 @@ struct MainSignalsInstance { boost::signals2::signal BlockChecked; boost::signals2::signal&)> NewPoWValidBlock; - CScheduler *m_scheduler = NULL; + // We are not allowed to assume the scheduler only runs in one thread, + // but must ensure all callbacks happen in-order, so we end up creating + // our own queue here :( + SingleThreadedSchedulerClient m_schedulerClient; + + MainSignalsInstance(CScheduler *pscheduler) : m_schedulerClient(pscheduler) {} }; static CMainSignals g_signals; -CMainSignals::CMainSignals() { - m_internals.reset(new MainSignalsInstance()); -} - void CMainSignals::RegisterBackgroundSignalScheduler(CScheduler& scheduler) { - assert(!m_internals->m_scheduler); - m_internals->m_scheduler = &scheduler; + assert(!m_internals); + m_internals.reset(new MainSignalsInstance(&scheduler)); } void CMainSignals::UnregisterBackgroundSignalScheduler() { - m_internals->m_scheduler = NULL; + m_internals.reset(nullptr); } CMainSignals& GetMainSignals() diff --git a/src/validationinterface.h b/src/validationinterface.h index 8cae3c6db4..fbfe273b10 100644 --- a/src/validationinterface.h +++ b/src/validationinterface.h @@ -75,8 +75,6 @@ private: friend void ::UnregisterAllValidationInterfaces(); public: - CMainSignals(); - /** Register a CScheduler to give callbacks which should run in the background (may only be called once) */ void RegisterBackgroundSignalScheduler(CScheduler& scheduler); /** Unregister a CScheduler to give callbacks which should run in the background - these callbacks will now be dropped! */