mirror of
https://github.com/bitcoin/bitcoin.git
synced 2025-01-25 02:33:24 -03:00
scheduler: switch from boost to std
Changes from boost::chrono to std::chrono, boost::condition_var to std::condition_var, boost::mutex to sync.h Mutex, and reverselock.h to sync.h REVERSE_LOCK. Also adds threadsafety annotations to CScheduler members.
This commit is contained in:
parent
b9c4260127
commit
d0ebd93270
5 changed files with 52 additions and 70 deletions
|
@ -393,7 +393,7 @@ static UniValue mockscheduler(const JSONRPCRequest& request)
|
|||
// protect against null pointer dereference
|
||||
CHECK_NONFATAL(g_rpc_node);
|
||||
CHECK_NONFATAL(g_rpc_node->scheduler);
|
||||
g_rpc_node->scheduler->MockForward(boost::chrono::seconds(delta_seconds));
|
||||
g_rpc_node->scheduler->MockForward(std::chrono::seconds(delta_seconds));
|
||||
|
||||
return NullUniValue;
|
||||
}
|
||||
|
|
|
@ -5,7 +5,6 @@
|
|||
#include <scheduler.h>
|
||||
|
||||
#include <random.h>
|
||||
#include <reverselock.h>
|
||||
|
||||
#include <assert.h>
|
||||
#include <utility>
|
||||
|
@ -20,18 +19,9 @@ CScheduler::~CScheduler()
|
|||
}
|
||||
|
||||
|
||||
#if BOOST_VERSION < 105000
|
||||
static boost::system_time toPosixTime(const boost::chrono::system_clock::time_point& t)
|
||||
{
|
||||
// Creating the posix_time using from_time_t loses sub-second precision. So rather than exporting the time_point to time_t,
|
||||
// start with a posix_time at the epoch (0) and add the milliseconds that have passed since then.
|
||||
return boost::posix_time::from_time_t(0) + boost::posix_time::milliseconds(boost::chrono::duration_cast<boost::chrono::milliseconds>(t.time_since_epoch()).count());
|
||||
}
|
||||
#endif
|
||||
|
||||
void CScheduler::serviceQueue()
|
||||
{
|
||||
boost::unique_lock<boost::mutex> lock(newTaskMutex);
|
||||
WAIT_LOCK(newTaskMutex, lock);
|
||||
++nThreadsServicingQueue;
|
||||
|
||||
// newTaskMutex is locked throughout this loop EXCEPT
|
||||
|
@ -40,7 +30,7 @@ void CScheduler::serviceQueue()
|
|||
while (!shouldStop()) {
|
||||
try {
|
||||
if (!shouldStop() && taskQueue.empty()) {
|
||||
reverse_lock<boost::unique_lock<boost::mutex> > rlock(lock);
|
||||
REVERSE_LOCK(lock);
|
||||
}
|
||||
while (!shouldStop() && taskQueue.empty()) {
|
||||
// Wait until there is something to do.
|
||||
|
@ -50,21 +40,13 @@ void CScheduler::serviceQueue()
|
|||
// Wait until either there is a new task, or until
|
||||
// the time of the first item on the queue:
|
||||
|
||||
// wait_until needs boost 1.50 or later; older versions have timed_wait:
|
||||
#if BOOST_VERSION < 105000
|
||||
while (!shouldStop() && !taskQueue.empty() &&
|
||||
newTaskScheduled.timed_wait(lock, toPosixTime(taskQueue.begin()->first))) {
|
||||
// Keep waiting until timeout
|
||||
}
|
||||
#else
|
||||
// Some boost versions have a conflicting overload of wait_until that returns void.
|
||||
// Explicitly use a template here to avoid hitting that overload.
|
||||
while (!shouldStop() && !taskQueue.empty()) {
|
||||
boost::chrono::system_clock::time_point timeToWaitFor = taskQueue.begin()->first;
|
||||
if (newTaskScheduled.wait_until<>(lock, timeToWaitFor) == boost::cv_status::timeout)
|
||||
std::chrono::system_clock::time_point timeToWaitFor = taskQueue.begin()->first;
|
||||
if (newTaskScheduled.wait_until(lock, timeToWaitFor) == std::cv_status::timeout) {
|
||||
break; // Exit loop after timeout, it means we reached the time of the event
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
// If there are multiple threads, the queue can empty while we're waiting (another
|
||||
// thread may service the task we were waiting on).
|
||||
if (shouldStop() || taskQueue.empty())
|
||||
|
@ -76,7 +58,7 @@ void CScheduler::serviceQueue()
|
|||
{
|
||||
// Unlock before calling f, so it can reschedule itself or another task
|
||||
// without deadlocking:
|
||||
reverse_lock<boost::unique_lock<boost::mutex> > rlock(lock);
|
||||
REVERSE_LOCK(lock);
|
||||
f();
|
||||
}
|
||||
} catch (...) {
|
||||
|
@ -91,7 +73,7 @@ void CScheduler::serviceQueue()
|
|||
void CScheduler::stop(bool drain)
|
||||
{
|
||||
{
|
||||
boost::unique_lock<boost::mutex> lock(newTaskMutex);
|
||||
LOCK(newTaskMutex);
|
||||
if (drain)
|
||||
stopWhenEmpty = true;
|
||||
else
|
||||
|
@ -100,10 +82,10 @@ void CScheduler::stop(bool drain)
|
|||
newTaskScheduled.notify_all();
|
||||
}
|
||||
|
||||
void CScheduler::schedule(CScheduler::Function f, boost::chrono::system_clock::time_point t)
|
||||
void CScheduler::schedule(CScheduler::Function f, std::chrono::system_clock::time_point t)
|
||||
{
|
||||
{
|
||||
boost::unique_lock<boost::mutex> lock(newTaskMutex);
|
||||
LOCK(newTaskMutex);
|
||||
taskQueue.insert(std::make_pair(t, f));
|
||||
}
|
||||
newTaskScheduled.notify_one();
|
||||
|
@ -111,18 +93,18 @@ void CScheduler::schedule(CScheduler::Function f, boost::chrono::system_clock::t
|
|||
|
||||
void CScheduler::scheduleFromNow(CScheduler::Function f, int64_t deltaMilliSeconds)
|
||||
{
|
||||
schedule(f, boost::chrono::system_clock::now() + boost::chrono::milliseconds(deltaMilliSeconds));
|
||||
schedule(f, std::chrono::system_clock::now() + std::chrono::milliseconds(deltaMilliSeconds));
|
||||
}
|
||||
|
||||
void CScheduler::MockForward(boost::chrono::seconds delta_seconds)
|
||||
void CScheduler::MockForward(std::chrono::seconds delta_seconds)
|
||||
{
|
||||
assert(delta_seconds.count() > 0 && delta_seconds < boost::chrono::hours{1});
|
||||
assert(delta_seconds.count() > 0 && delta_seconds < std::chrono::hours{1});
|
||||
|
||||
{
|
||||
boost::unique_lock<boost::mutex> lock(newTaskMutex);
|
||||
LOCK(newTaskMutex);
|
||||
|
||||
// use temp_queue to maintain updated schedule
|
||||
std::multimap<boost::chrono::system_clock::time_point, Function> temp_queue;
|
||||
std::multimap<std::chrono::system_clock::time_point, Function> temp_queue;
|
||||
|
||||
for (const auto& element : taskQueue) {
|
||||
temp_queue.emplace_hint(temp_queue.cend(), element.first - delta_seconds, element.second);
|
||||
|
@ -147,10 +129,10 @@ void CScheduler::scheduleEvery(CScheduler::Function f, int64_t deltaMilliSeconds
|
|||
scheduleFromNow(std::bind(&Repeat, this, f, deltaMilliSeconds), deltaMilliSeconds);
|
||||
}
|
||||
|
||||
size_t CScheduler::getQueueInfo(boost::chrono::system_clock::time_point &first,
|
||||
boost::chrono::system_clock::time_point &last) const
|
||||
size_t CScheduler::getQueueInfo(std::chrono::system_clock::time_point &first,
|
||||
std::chrono::system_clock::time_point &last) const
|
||||
{
|
||||
boost::unique_lock<boost::mutex> lock(newTaskMutex);
|
||||
LOCK(newTaskMutex);
|
||||
size_t result = taskQueue.size();
|
||||
if (!taskQueue.empty()) {
|
||||
first = taskQueue.begin()->first;
|
||||
|
@ -160,7 +142,7 @@ size_t CScheduler::getQueueInfo(boost::chrono::system_clock::time_point &first,
|
|||
}
|
||||
|
||||
bool CScheduler::AreThreadsServicingQueue() const {
|
||||
boost::unique_lock<boost::mutex> lock(newTaskMutex);
|
||||
LOCK(newTaskMutex);
|
||||
return nThreadsServicingQueue;
|
||||
}
|
||||
|
||||
|
@ -174,7 +156,7 @@ void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue() {
|
|||
if (m_are_callbacks_running) return;
|
||||
if (m_callbacks_pending.empty()) return;
|
||||
}
|
||||
m_pscheduler->schedule(std::bind(&SingleThreadedSchedulerClient::ProcessQueue, this));
|
||||
m_pscheduler->schedule(std::bind(&SingleThreadedSchedulerClient::ProcessQueue, this), std::chrono::system_clock::now());
|
||||
}
|
||||
|
||||
void SingleThreadedSchedulerClient::ProcessQueue() {
|
||||
|
|
|
@ -7,11 +7,12 @@
|
|||
|
||||
//
|
||||
// NOTE:
|
||||
// boost::thread / boost::chrono should be ported to std::thread / std::chrono
|
||||
// boost::thread should be ported to std::thread
|
||||
// when we support C++11.
|
||||
//
|
||||
#include <boost/chrono/chrono.hpp>
|
||||
#include <boost/thread.hpp>
|
||||
#include <condition_variable>
|
||||
#include <functional>
|
||||
#include <list>
|
||||
#include <map>
|
||||
|
||||
#include <sync.h>
|
||||
|
@ -27,8 +28,8 @@
|
|||
// s->scheduleFromNow(std::bind(Class::func, this, argument), 3);
|
||||
// boost::thread* t = new boost::thread(std::bind(CScheduler::serviceQueue, s));
|
||||
//
|
||||
// ... then at program shutdown, clean up the thread running serviceQueue:
|
||||
// t->interrupt();
|
||||
// ... then at program shutdown, make sure to call stop() to clean up the thread(s) running serviceQueue:
|
||||
// s->stop();
|
||||
// t->join();
|
||||
// delete t;
|
||||
// delete s; // Must be done after thread is interrupted/joined.
|
||||
|
@ -43,7 +44,7 @@ public:
|
|||
typedef std::function<void()> Function;
|
||||
|
||||
// Call func at/after time t
|
||||
void schedule(Function f, boost::chrono::system_clock::time_point t=boost::chrono::system_clock::now());
|
||||
void schedule(Function f, std::chrono::system_clock::time_point t);
|
||||
|
||||
// Convenience method: call f once deltaMilliSeconds from now
|
||||
void scheduleFromNow(Function f, int64_t deltaMilliSeconds);
|
||||
|
@ -60,7 +61,7 @@ public:
|
|||
* Iterates through items on taskQueue and reschedules them
|
||||
* to be delta_seconds sooner.
|
||||
*/
|
||||
void MockForward(boost::chrono::seconds delta_seconds);
|
||||
void MockForward(std::chrono::seconds delta_seconds);
|
||||
|
||||
// To keep things as simple as possible, there is no unschedule.
|
||||
|
||||
|
@ -75,20 +76,20 @@ public:
|
|||
|
||||
// Returns number of tasks waiting to be serviced,
|
||||
// and first and last task times
|
||||
size_t getQueueInfo(boost::chrono::system_clock::time_point &first,
|
||||
boost::chrono::system_clock::time_point &last) const;
|
||||
size_t getQueueInfo(std::chrono::system_clock::time_point &first,
|
||||
std::chrono::system_clock::time_point &last) const;
|
||||
|
||||
// Returns true if there are threads actively running in serviceQueue()
|
||||
bool AreThreadsServicingQueue() const;
|
||||
|
||||
private:
|
||||
std::multimap<boost::chrono::system_clock::time_point, Function> taskQueue;
|
||||
boost::condition_variable newTaskScheduled;
|
||||
mutable boost::mutex newTaskMutex;
|
||||
int nThreadsServicingQueue;
|
||||
bool stopRequested;
|
||||
bool stopWhenEmpty;
|
||||
bool shouldStop() const { return stopRequested || (stopWhenEmpty && taskQueue.empty()); }
|
||||
mutable Mutex newTaskMutex;
|
||||
std::condition_variable newTaskScheduled;
|
||||
std::multimap<std::chrono::system_clock::time_point, Function> taskQueue GUARDED_BY(newTaskMutex);
|
||||
int nThreadsServicingQueue GUARDED_BY(newTaskMutex);
|
||||
bool stopRequested GUARDED_BY(newTaskMutex);
|
||||
bool stopWhenEmpty GUARDED_BY(newTaskMutex);
|
||||
bool shouldStop() const EXCLUSIVE_LOCKS_REQUIRED(newTaskMutex) { return stopRequested || (stopWhenEmpty && taskQueue.empty()); }
|
||||
};
|
||||
|
||||
/**
|
||||
|
|
|
@ -11,13 +11,13 @@
|
|||
|
||||
BOOST_AUTO_TEST_SUITE(scheduler_tests)
|
||||
|
||||
static void microTask(CScheduler& s, boost::mutex& mutex, int& counter, int delta, boost::chrono::system_clock::time_point rescheduleTime)
|
||||
static void microTask(CScheduler& s, boost::mutex& mutex, int& counter, int delta, std::chrono::system_clock::time_point rescheduleTime)
|
||||
{
|
||||
{
|
||||
boost::unique_lock<boost::mutex> lock(mutex);
|
||||
counter += delta;
|
||||
}
|
||||
boost::chrono::system_clock::time_point noTime = boost::chrono::system_clock::time_point::min();
|
||||
std::chrono::system_clock::time_point noTime = std::chrono::system_clock::time_point::min();
|
||||
if (rescheduleTime != noTime) {
|
||||
CScheduler::Function f = std::bind(µTask, std::ref(s), std::ref(mutex), std::ref(counter), -delta + 1, noTime);
|
||||
s.schedule(f, rescheduleTime);
|
||||
|
@ -45,15 +45,15 @@ BOOST_AUTO_TEST_CASE(manythreads)
|
|||
auto randomMsec = [](FastRandomContext& rc) -> int { return -11 + (int)rc.randrange(1012); }; // [-11, 1000]
|
||||
auto randomDelta = [](FastRandomContext& rc) -> int { return -1000 + (int)rc.randrange(2001); }; // [-1000, 1000]
|
||||
|
||||
boost::chrono::system_clock::time_point start = boost::chrono::system_clock::now();
|
||||
boost::chrono::system_clock::time_point now = start;
|
||||
boost::chrono::system_clock::time_point first, last;
|
||||
std::chrono::system_clock::time_point start = std::chrono::system_clock::now();
|
||||
std::chrono::system_clock::time_point now = start;
|
||||
std::chrono::system_clock::time_point first, last;
|
||||
size_t nTasks = microTasks.getQueueInfo(first, last);
|
||||
BOOST_CHECK(nTasks == 0);
|
||||
|
||||
for (int i = 0; i < 100; ++i) {
|
||||
boost::chrono::system_clock::time_point t = now + boost::chrono::microseconds(randomMsec(rng));
|
||||
boost::chrono::system_clock::time_point tReschedule = now + boost::chrono::microseconds(500 + randomMsec(rng));
|
||||
std::chrono::system_clock::time_point t = now + std::chrono::microseconds(randomMsec(rng));
|
||||
std::chrono::system_clock::time_point tReschedule = now + std::chrono::microseconds(500 + randomMsec(rng));
|
||||
int whichCounter = zeroToNine(rng);
|
||||
CScheduler::Function f = std::bind(µTask, std::ref(microTasks),
|
||||
std::ref(counterMutex[whichCounter]), std::ref(counter[whichCounter]),
|
||||
|
@ -71,14 +71,14 @@ BOOST_AUTO_TEST_CASE(manythreads)
|
|||
microThreads.create_thread(std::bind(&CScheduler::serviceQueue, µTasks));
|
||||
|
||||
UninterruptibleSleep(std::chrono::microseconds{600});
|
||||
now = boost::chrono::system_clock::now();
|
||||
now = std::chrono::system_clock::now();
|
||||
|
||||
// More threads and more tasks:
|
||||
for (int i = 0; i < 5; i++)
|
||||
microThreads.create_thread(std::bind(&CScheduler::serviceQueue, µTasks));
|
||||
for (int i = 0; i < 100; i++) {
|
||||
boost::chrono::system_clock::time_point t = now + boost::chrono::microseconds(randomMsec(rng));
|
||||
boost::chrono::system_clock::time_point tReschedule = now + boost::chrono::microseconds(500 + randomMsec(rng));
|
||||
std::chrono::system_clock::time_point t = now + std::chrono::microseconds(randomMsec(rng));
|
||||
std::chrono::system_clock::time_point tReschedule = now + std::chrono::microseconds(500 + randomMsec(rng));
|
||||
int whichCounter = zeroToNine(rng);
|
||||
CScheduler::Function f = std::bind(µTask, std::ref(microTasks),
|
||||
std::ref(counterMutex[whichCounter]), std::ref(counter[whichCounter]),
|
||||
|
@ -157,14 +157,14 @@ BOOST_AUTO_TEST_CASE(mockforward)
|
|||
scheduler.scheduleFromNow(dummy, 8*min_in_milli);
|
||||
|
||||
// check taskQueue
|
||||
boost::chrono::system_clock::time_point first, last;
|
||||
std::chrono::system_clock::time_point first, last;
|
||||
size_t num_tasks = scheduler.getQueueInfo(first, last);
|
||||
BOOST_CHECK_EQUAL(num_tasks, 3ul);
|
||||
|
||||
std::thread scheduler_thread([&]() { scheduler.serviceQueue(); });
|
||||
|
||||
// bump the scheduler forward 5 minutes
|
||||
scheduler.MockForward(boost::chrono::seconds(5*60));
|
||||
scheduler.MockForward(std::chrono::seconds(5*60));
|
||||
|
||||
// ensure scheduler has chance to process all tasks queued for before 1 ms from now.
|
||||
scheduler.scheduleFromNow([&scheduler]{ scheduler.stop(false); }, 1);
|
||||
|
@ -178,8 +178,8 @@ BOOST_AUTO_TEST_CASE(mockforward)
|
|||
BOOST_CHECK_EQUAL(counter, 2);
|
||||
|
||||
// check that the time of the remaining job has been updated
|
||||
boost::chrono::system_clock::time_point now = boost::chrono::system_clock::now();
|
||||
int delta = boost::chrono::duration_cast<boost::chrono::seconds>(first - now).count();
|
||||
std::chrono::system_clock::time_point now = std::chrono::system_clock::now();
|
||||
int delta = std::chrono::duration_cast<std::chrono::seconds>(first - now).count();
|
||||
// should be between 2 & 3 minutes from now
|
||||
BOOST_CHECK(delta > 2*60 && delta < 3*60);
|
||||
}
|
||||
|
|
|
@ -53,7 +53,6 @@ EXPECTED_BOOST_INCLUDES=(
|
|||
boost/algorithm/string/classification.hpp
|
||||
boost/algorithm/string/replace.hpp
|
||||
boost/algorithm/string/split.hpp
|
||||
boost/chrono/chrono.hpp
|
||||
boost/date_time/posix_time/posix_time.hpp
|
||||
boost/filesystem.hpp
|
||||
boost/filesystem/fstream.hpp
|
||||
|
|
Loading…
Add table
Reference in a new issue