refactor: remove boost::thread_group usage

This commit is contained in:
fanquake 2021-01-27 15:04:34 +08:00
parent c8b83510f4
commit dc8be12510
No known key found for this signature in database
GPG key ID: 2EEB9F5CC09526C1
12 changed files with 44 additions and 39 deletions

View file

@ -9,6 +9,8 @@
#include <logging.h> #include <logging.h>
#include <serialize.h> #include <serialize.h>
#include <cmath>
int CAddrInfo::GetTriedBucket(const uint256& nKey, const std::vector<bool> &asmap) const int CAddrInfo::GetTriedBucket(const uint256& nKey, const std::vector<bool> &asmap) const
{ {
uint64_t hash1 = (CHashWriter(SER_GETHASH, 0) << nKey << GetKey()).GetCheapHash(); uint64_t hash1 = (CHashWriter(SER_GETHASH, 0) << nKey << GetKey()).GetCheapHash();

View file

@ -21,6 +21,7 @@
#include <util/url.h> #include <util/url.h>
#include <algorithm> #include <algorithm>
#include <cmath>
#include <functional> #include <functional>
#include <memory> #include <memory>
#include <stdio.h> #include <stdio.h>

View file

@ -68,6 +68,8 @@
#include <set> #include <set>
#include <stdint.h> #include <stdint.h>
#include <stdio.h> #include <stdio.h>
#include <thread>
#include <vector>
#ifndef WIN32 #ifndef WIN32
#include <attributes.h> #include <attributes.h>
@ -78,7 +80,6 @@
#include <boost/algorithm/string/replace.hpp> #include <boost/algorithm/string/replace.hpp>
#include <boost/signals2/signal.hpp> #include <boost/signals2/signal.hpp>
#include <boost/thread/thread.hpp>
#if ENABLE_ZMQ #if ENABLE_ZMQ
#include <zmq/zmqabstractnotifier.h> #include <zmq/zmqabstractnotifier.h>
@ -155,8 +156,6 @@ static std::unique_ptr<ECCVerifyHandle> globalVerifyHandle;
static std::thread g_load_block; static std::thread g_load_block;
static boost::thread_group threadGroup;
void Interrupt(NodeContext& node) void Interrupt(NodeContext& node)
{ {
InterruptHTTPServer(); InterruptHTTPServer();
@ -218,11 +217,9 @@ void Shutdown(NodeContext& node)
StopTorControl(); StopTorControl();
// After everything has been shut down, but before things get flushed, stop the // After everything has been shut down, but before things get flushed, stop the
// CScheduler/checkqueue, threadGroup and load block thread. // CScheduler/checkqueue, scheduler and load block thread.
if (node.scheduler) node.scheduler->stop(); if (node.scheduler) node.scheduler->stop();
if (g_load_block.joinable()) g_load_block.join(); if (g_load_block.joinable()) g_load_block.join();
threadGroup.interrupt_all();
threadGroup.join_all();
StopScriptCheckWorkerThreads(); StopScriptCheckWorkerThreads();
// After the threads that potentially access these pointers have been stopped, // After the threads that potentially access these pointers have been stopped,
@ -1342,7 +1339,7 @@ bool AppInitMain(const util::Ref& context, NodeContext& node, interfaces::BlockA
node.scheduler = MakeUnique<CScheduler>(); node.scheduler = MakeUnique<CScheduler>();
// Start the lightweight task scheduler thread // Start the lightweight task scheduler thread
threadGroup.create_thread([&] { TraceThread("scheduler", [&] { node.scheduler->serviceQueue(); }); }); node.scheduler->m_service_thread = std::thread([&] { TraceThread("scheduler", [&] { node.scheduler->serviceQueue(); }); });
// Gather some entropy once per minute. // Gather some entropy once per minute.
node.scheduler->scheduleEvery([]{ node.scheduler->scheduleEvery([]{

View file

@ -9,6 +9,7 @@
#include <functional> #include <functional>
#include <list> #include <list>
#include <map> #include <map>
#include <thread>
#include <sync.h> #include <sync.h>
@ -35,6 +36,8 @@ public:
CScheduler(); CScheduler();
~CScheduler(); ~CScheduler();
std::thread m_service_thread;
typedef std::function<void()> Function; typedef std::function<void()> Function;
/** Call func at/after time t */ /** Call func at/after time t */
@ -62,8 +65,7 @@ public:
void MockForward(std::chrono::seconds delta_seconds); void MockForward(std::chrono::seconds delta_seconds);
/** /**
* Services the queue 'forever'. Should be run in a thread, * Services the queue 'forever'. Should be run in a thread.
* and interrupted using boost::interrupt_thread
*/ */
void serviceQueue(); void serviceQueue();
@ -72,12 +74,14 @@ public:
{ {
WITH_LOCK(newTaskMutex, stopRequested = true); WITH_LOCK(newTaskMutex, stopRequested = true);
newTaskScheduled.notify_all(); newTaskScheduled.notify_all();
if (m_service_thread.joinable()) m_service_thread.join();
} }
/** Tell any threads running serviceQueue to stop when there is no work left to be done */ /** Tell any threads running serviceQueue to stop when there is no work left to be done */
void StopWhenDrained() void StopWhenDrained()
{ {
WITH_LOCK(newTaskMutex, stopWhenEmpty = true); WITH_LOCK(newTaskMutex, stopWhenEmpty = true);
newTaskScheduled.notify_all(); newTaskScheduled.notify_all();
if (m_service_thread.joinable()) m_service_thread.join();
} }
/** /**

View file

@ -11,6 +11,8 @@
#include <util/system.h> #include <util/system.h>
#include <cuckoocache.h> #include <cuckoocache.h>
#include <boost/thread/lock_types.hpp>
#include <boost/thread/shared_mutex.hpp> #include <boost/thread/shared_mutex.hpp>
namespace { namespace {

View file

@ -10,7 +10,6 @@
#include <util/time.h> #include <util/time.h>
#include <boost/test/unit_test.hpp> #include <boost/test/unit_test.hpp>
#include <boost/thread/thread.hpp>
#include <atomic> #include <atomic>
#include <condition_variable> #include <condition_variable>
@ -363,11 +362,11 @@ BOOST_AUTO_TEST_CASE(test_CheckQueueControl_Locks)
{ {
auto queue = MakeUnique<Standard_Queue>(QUEUE_BATCH_SIZE); auto queue = MakeUnique<Standard_Queue>(QUEUE_BATCH_SIZE);
{ {
boost::thread_group tg; std::vector<std::thread> tg;
std::atomic<int> nThreads {0}; std::atomic<int> nThreads {0};
std::atomic<int> fails {0}; std::atomic<int> fails {0};
for (size_t i = 0; i < 3; ++i) { for (size_t i = 0; i < 3; ++i) {
tg.create_thread( tg.emplace_back(
[&]{ [&]{
CCheckQueueControl<FakeCheck> control(queue.get()); CCheckQueueControl<FakeCheck> control(queue.get());
// While sleeping, no other thread should execute to this point // While sleeping, no other thread should execute to this point
@ -376,11 +375,13 @@ BOOST_AUTO_TEST_CASE(test_CheckQueueControl_Locks)
fails += observed != nThreads; fails += observed != nThreads;
}); });
} }
tg.join_all(); for (auto& thread: tg) {
if (thread.joinable()) thread.join();
}
BOOST_REQUIRE_EQUAL(fails, 0); BOOST_REQUIRE_EQUAL(fails, 0);
} }
{ {
boost::thread_group tg; std::vector<std::thread> tg;
std::mutex m; std::mutex m;
std::condition_variable cv; std::condition_variable cv;
bool has_lock{false}; bool has_lock{false};
@ -389,7 +390,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueueControl_Locks)
bool done_ack{false}; bool done_ack{false};
{ {
std::unique_lock<std::mutex> l(m); std::unique_lock<std::mutex> l(m);
tg.create_thread([&]{ tg.emplace_back([&]{
CCheckQueueControl<FakeCheck> control(queue.get()); CCheckQueueControl<FakeCheck> control(queue.get());
std::unique_lock<std::mutex> ll(m); std::unique_lock<std::mutex> ll(m);
has_lock = true; has_lock = true;
@ -415,7 +416,9 @@ BOOST_AUTO_TEST_CASE(test_CheckQueueControl_Locks)
cv.notify_one(); cv.notify_one();
BOOST_REQUIRE(!fails); BOOST_REQUIRE(!fails);
} }
tg.join_all(); for (auto& thread: tg) {
if (thread.joinable()) thread.join();
}
} }
} }
BOOST_AUTO_TEST_SUITE_END() BOOST_AUTO_TEST_SUITE_END()

View file

@ -2,6 +2,8 @@
// Distributed under the MIT software license, see the accompanying // Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php. // file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include <boost/test/unit_test.hpp> #include <boost/test/unit_test.hpp>
#include <boost/thread/lock_types.hpp>
#include <boost/thread/shared_mutex.hpp>
#include <cuckoocache.h> #include <cuckoocache.h>
#include <deque> #include <deque>
#include <random.h> #include <random.h>

View file

@ -7,10 +7,11 @@
#include <util/time.h> #include <util/time.h>
#include <boost/test/unit_test.hpp> #include <boost/test/unit_test.hpp>
#include <boost/thread/thread.hpp>
#include <functional> #include <functional>
#include <mutex> #include <mutex>
#include <thread>
#include <vector>
BOOST_AUTO_TEST_SUITE(scheduler_tests) BOOST_AUTO_TEST_SUITE(scheduler_tests)
@ -69,16 +70,16 @@ BOOST_AUTO_TEST_CASE(manythreads)
BOOST_CHECK(last > now); BOOST_CHECK(last > now);
// As soon as these are created they will start running and servicing the queue // As soon as these are created they will start running and servicing the queue
boost::thread_group microThreads; std::vector<std::thread> microThreads;
for (int i = 0; i < 5; i++) for (int i = 0; i < 5; i++)
microThreads.create_thread(std::bind(&CScheduler::serviceQueue, &microTasks)); microThreads.emplace_back(std::bind(&CScheduler::serviceQueue, &microTasks));
UninterruptibleSleep(std::chrono::microseconds{600}); UninterruptibleSleep(std::chrono::microseconds{600});
now = std::chrono::system_clock::now(); now = std::chrono::system_clock::now();
// More threads and more tasks: // More threads and more tasks:
for (int i = 0; i < 5; i++) for (int i = 0; i < 5; i++)
microThreads.create_thread(std::bind(&CScheduler::serviceQueue, &microTasks)); microThreads.emplace_back(std::bind(&CScheduler::serviceQueue, &microTasks));
for (int i = 0; i < 100; i++) { for (int i = 0; i < 100; i++) {
std::chrono::system_clock::time_point t = now + std::chrono::microseconds(randomMsec(rng)); std::chrono::system_clock::time_point t = now + std::chrono::microseconds(randomMsec(rng));
std::chrono::system_clock::time_point tReschedule = now + std::chrono::microseconds(500 + randomMsec(rng)); std::chrono::system_clock::time_point tReschedule = now + std::chrono::microseconds(500 + randomMsec(rng));
@ -91,7 +92,10 @@ BOOST_AUTO_TEST_CASE(manythreads)
// Drain the task queue then exit threads // Drain the task queue then exit threads
microTasks.StopWhenDrained(); microTasks.StopWhenDrained();
microThreads.join_all(); // ... wait until all the threads are done // wait until all the threads are done
for (auto& thread: microThreads) {
if (thread.joinable()) thread.join();
}
int counterSum = 0; int counterSum = 0;
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
@ -131,9 +135,9 @@ BOOST_AUTO_TEST_CASE(singlethreadedscheduler_ordered)
// if the queues only permit execution of one task at once then // if the queues only permit execution of one task at once then
// the extra threads should effectively be doing nothing // the extra threads should effectively be doing nothing
// if they don't we'll get out of order behaviour // if they don't we'll get out of order behaviour
boost::thread_group threads; std::vector<std::thread> threads;
for (int i = 0; i < 5; ++i) { for (int i = 0; i < 5; ++i) {
threads.create_thread(std::bind(&CScheduler::serviceQueue, &scheduler)); threads.emplace_back(std::bind(&CScheduler::serviceQueue, &scheduler));
} }
// these are not atomic, if SinglethreadedSchedulerClient prevents // these are not atomic, if SinglethreadedSchedulerClient prevents
@ -157,7 +161,9 @@ BOOST_AUTO_TEST_CASE(singlethreadedscheduler_ordered)
// finish up // finish up
scheduler.StopWhenDrained(); scheduler.StopWhenDrained();
threads.join_all(); for (auto& thread: threads) {
if (thread.joinable()) thread.join();
}
BOOST_CHECK_EQUAL(counter1, 100); BOOST_CHECK_EQUAL(counter1, 100);
BOOST_CHECK_EQUAL(counter2, 100); BOOST_CHECK_EQUAL(counter2, 100);

View file

@ -131,7 +131,7 @@ ChainTestingSetup::ChainTestingSetup(const std::string& chainName, const std::ve
// We have to run a scheduler thread to prevent ActivateBestChain // We have to run a scheduler thread to prevent ActivateBestChain
// from blocking due to queue overrun. // from blocking due to queue overrun.
m_node.scheduler = MakeUnique<CScheduler>(); m_node.scheduler = MakeUnique<CScheduler>();
threadGroup.create_thread([&] { TraceThread("scheduler", [&] { m_node.scheduler->serviceQueue(); }); }); m_node.scheduler->m_service_thread = std::thread([&] { TraceThread("scheduler", [&] { m_node.scheduler->serviceQueue(); }); });
GetMainSignals().RegisterBackgroundSignalScheduler(*m_node.scheduler); GetMainSignals().RegisterBackgroundSignalScheduler(*m_node.scheduler);
pblocktree.reset(new CBlockTreeDB(1 << 20, true)); pblocktree.reset(new CBlockTreeDB(1 << 20, true));
@ -150,8 +150,6 @@ ChainTestingSetup::ChainTestingSetup(const std::string& chainName, const std::ve
ChainTestingSetup::~ChainTestingSetup() ChainTestingSetup::~ChainTestingSetup()
{ {
if (m_node.scheduler) m_node.scheduler->stop(); if (m_node.scheduler) m_node.scheduler->stop();
threadGroup.interrupt_all();
threadGroup.join_all();
StopScriptCheckWorkerThreads(); StopScriptCheckWorkerThreads();
GetMainSignals().FlushBackgroundCallbacks(); GetMainSignals().FlushBackgroundCallbacks();
GetMainSignals().UnregisterBackgroundSignalScheduler(); GetMainSignals().UnregisterBackgroundSignalScheduler();

View file

@ -17,8 +17,7 @@
#include <util/string.h> #include <util/string.h>
#include <type_traits> #include <type_traits>
#include <vector>
#include <boost/thread/thread.hpp>
/** This is connected to the logger. Can be used to redirect logs to any other log */ /** This is connected to the logger. Can be used to redirect logs to any other log */
extern const std::function<void(const std::string&)> G_TEST_LOG_FUN; extern const std::function<void(const std::string&)> G_TEST_LOG_FUN;
@ -88,7 +87,6 @@ private:
* initialization behaviour. * initialization behaviour.
*/ */
struct ChainTestingSetup : public BasicTestingSetup { struct ChainTestingSetup : public BasicTestingSetup {
boost::thread_group threadGroup;
explicit ChainTestingSetup(const std::string& chainName = CBaseChainParams::MAIN, const std::vector<const char*>& extra_args = {}); explicit ChainTestingSetup(const std::string& chainName = CBaseChainParams::MAIN, const std::vector<const char*>& extra_args = {});
~ChainTestingSetup(); ~ChainTestingSetup();

View file

@ -35,8 +35,6 @@
#include <utility> #include <utility>
#include <vector> #include <vector>
#include <boost/thread/condition_variable.hpp> // for boost::thread_interrupted
class UniValue; class UniValue;
// Application startup time (used for uptime calculation) // Application startup time (used for uptime calculation)
@ -450,11 +448,6 @@ template <typename Callable> void TraceThread(const char* name, Callable func)
func(); func();
LogPrintf("%s thread exit\n", name); LogPrintf("%s thread exit\n", name);
} }
catch (const boost::thread_interrupted&)
{
LogPrintf("%s thread interrupt\n", name);
throw;
}
catch (const std::exception& e) { catch (const std::exception& e) {
PrintExceptionContinue(&e, name); PrintExceptionContinue(&e, name);
throw; throw;

View file

@ -67,9 +67,8 @@ EXPECTED_BOOST_INCLUDES=(
boost/signals2/optional_last_value.hpp boost/signals2/optional_last_value.hpp
boost/signals2/signal.hpp boost/signals2/signal.hpp
boost/test/unit_test.hpp boost/test/unit_test.hpp
boost/thread/condition_variable.hpp boost/thread/lock_types.hpp
boost/thread/shared_mutex.hpp boost/thread/shared_mutex.hpp
boost/thread/thread.hpp
) )
for BOOST_INCLUDE in $(git grep '^#include <boost/' -- "*.cpp" "*.h" | cut -f2 -d: | cut -f2 -d'<' | cut -f1 -d'>' | sort -u); do for BOOST_INCLUDE in $(git grep '^#include <boost/' -- "*.cpp" "*.h" | cut -f2 -d: | cut -f2 -d'<' | cut -f1 -d'>' | sort -u); do