diff --git a/src/bench/checkqueue.cpp b/src/bench/checkqueue.cpp index ffa772d8c1..d7b8c1badc 100644 --- a/src/bench/checkqueue.cpp +++ b/src/bench/checkqueue.cpp @@ -10,8 +10,6 @@ #include #include -#include - #include static const size_t BATCHES = 101; @@ -44,12 +42,9 @@ static void CCheckQueueSpeedPrevectorJob(benchmark::Bench& bench) void swap(PrevectorJob& x){p.swap(x.p);}; }; CCheckQueue queue {QUEUE_BATCH_SIZE}; - boost::thread_group tg; // The main thread should be counted to prevent thread oversubscription, and // to decrease the variance of benchmark results. - for (auto x = 0; x < GetNumCores() - 1; ++x) { - tg.create_thread([&]{queue.Thread();}); - } + queue.StartWorkerThreads(GetNumCores() - 1); // create all the data once, then submit copies in the benchmark. FastRandomContext insecure_rand(true); @@ -70,8 +65,7 @@ static void CCheckQueueSpeedPrevectorJob(benchmark::Bench& bench) // it is done explicitly here for clarity control.Wait(); }); - tg.interrupt_all(); - tg.join_all(); + queue.StopWorkerThreads(); ECC_Stop(); } BENCHMARK(CCheckQueueSpeedPrevectorJob); diff --git a/src/checkqueue.h b/src/checkqueue.h index e3faa1dec0..4ceeb3600a 100644 --- a/src/checkqueue.h +++ b/src/checkqueue.h @@ -6,13 +6,12 @@ #define BITCOIN_CHECKQUEUE_H #include +#include +#include #include #include -#include -#include - template class CCheckQueueControl; @@ -31,61 +30,64 @@ class CCheckQueue { private: //! Mutex to protect the inner state - boost::mutex mutex; + Mutex m_mutex; //! Worker threads block on this when out of work - boost::condition_variable condWorker; + std::condition_variable m_worker_cv; //! Master thread blocks on this when out of work - boost::condition_variable condMaster; + std::condition_variable m_master_cv; //! The queue of elements to be processed. //! As the order of booleans doesn't matter, it is used as a LIFO (stack) - std::vector queue; + std::vector queue GUARDED_BY(m_mutex); //! The number of workers (including the master) that are idle. - int nIdle; + int nIdle GUARDED_BY(m_mutex){0}; //! The total number of workers (including the master). - int nTotal; + int nTotal GUARDED_BY(m_mutex){0}; //! The temporary evaluation result. - bool fAllOk; + bool fAllOk GUARDED_BY(m_mutex){true}; /** * Number of verifications that haven't completed yet. * This includes elements that are no longer queued, but still in the * worker's own batches. */ - unsigned int nTodo; + unsigned int nTodo GUARDED_BY(m_mutex){0}; //! The maximum number of elements to be processed in one batch - unsigned int nBatchSize; + const unsigned int nBatchSize; + + std::vector m_worker_threads; + bool m_request_stop GUARDED_BY(m_mutex){false}; /** Internal function that does bulk of the verification work. */ - bool Loop(bool fMaster = false) + bool Loop(bool fMaster) { - boost::condition_variable& cond = fMaster ? condMaster : condWorker; + std::condition_variable& cond = fMaster ? m_master_cv : m_worker_cv; std::vector vChecks; vChecks.reserve(nBatchSize); unsigned int nNow = 0; bool fOk = true; do { { - boost::unique_lock lock(mutex); + WAIT_LOCK(m_mutex, lock); // first do the clean-up of the previous loop run (allowing us to do it in the same critsect) if (nNow) { fAllOk &= fOk; nTodo -= nNow; if (nTodo == 0 && !fMaster) // We processed the last element; inform the master it can exit and return the result - condMaster.notify_one(); + m_master_cv.notify_one(); } else { // first iteration nTotal++; } // logically, the do loop starts here - while (queue.empty()) { + while (queue.empty() && !m_request_stop) { if (fMaster && nTodo == 0) { nTotal--; bool fRet = fAllOk; @@ -98,6 +100,10 @@ private: cond.wait(lock); // wait nIdle--; } + if (m_request_stop) { + return false; + } + // Decide how many work units to process now. // * Do not try to do everything at once, but aim for increasingly smaller batches so // all workers finish approximately simultaneously. @@ -106,7 +112,7 @@ private: nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1))); vChecks.resize(nNow); for (unsigned int i = 0; i < nNow; i++) { - // We want the lock on the mutex to be as short as possible, so swap jobs from the global + // We want the lock on the m_mutex to be as short as possible, so swap jobs from the global // queue to the local batch vector instead of copying. vChecks[i].swap(queue.back()); queue.pop_back(); @@ -124,40 +130,68 @@ private: public: //! Mutex to ensure only one concurrent CCheckQueueControl - boost::mutex ControlMutex; + Mutex m_control_mutex; //! Create a new check queue - explicit CCheckQueue(unsigned int nBatchSizeIn) : nIdle(0), nTotal(0), fAllOk(true), nTodo(0), nBatchSize(nBatchSizeIn) {} - - //! Worker thread - void Thread() + explicit CCheckQueue(unsigned int nBatchSizeIn) + : nBatchSize(nBatchSizeIn) { - Loop(); + } + + //! Create a pool of new worker threads. + void StartWorkerThreads(const int threads_num) + { + { + LOCK(m_mutex); + nIdle = 0; + nTotal = 0; + fAllOk = true; + } + assert(m_worker_threads.empty()); + for (int n = 0; n < threads_num; ++n) { + m_worker_threads.emplace_back([this, n]() { + util::ThreadRename(strprintf("scriptch.%i", n)); + Loop(false /* worker thread */); + }); + } } //! Wait until execution finishes, and return whether all evaluations were successful. bool Wait() { - return Loop(true); + return Loop(true /* master thread */); } //! Add a batch of checks to the queue void Add(std::vector& vChecks) { - boost::unique_lock lock(mutex); + LOCK(m_mutex); for (T& check : vChecks) { queue.push_back(T()); check.swap(queue.back()); } nTodo += vChecks.size(); if (vChecks.size() == 1) - condWorker.notify_one(); + m_worker_cv.notify_one(); else if (vChecks.size() > 1) - condWorker.notify_all(); + m_worker_cv.notify_all(); + } + + //! Stop all of the worker threads. + void StopWorkerThreads() + { + WITH_LOCK(m_mutex, m_request_stop = true); + m_worker_cv.notify_all(); + for (std::thread& t : m_worker_threads) { + t.join(); + } + m_worker_threads.clear(); + WITH_LOCK(m_mutex, m_request_stop = false); } ~CCheckQueue() { + assert(m_worker_threads.empty()); } }; @@ -181,7 +215,7 @@ public: { // passed queue is supposed to be unused, or nullptr if (pqueue != nullptr) { - ENTER_CRITICAL_SECTION(pqueue->ControlMutex); + ENTER_CRITICAL_SECTION(pqueue->m_control_mutex); } } @@ -205,7 +239,7 @@ public: if (!fDone) Wait(); if (pqueue != nullptr) { - LEAVE_CRITICAL_SECTION(pqueue->ControlMutex); + LEAVE_CRITICAL_SECTION(pqueue->m_control_mutex); } } }; diff --git a/src/init.cpp b/src/init.cpp index 09eb76eaee..8e37300c99 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -223,6 +223,7 @@ void Shutdown(NodeContext& node) if (g_load_block.joinable()) g_load_block.join(); threadGroup.interrupt_all(); threadGroup.join_all(); + StopScriptCheckWorkerThreads(); // After the threads that potentially access these pointers have been stopped, // destruct and reset all to nullptr. @@ -1334,9 +1335,7 @@ bool AppInitMain(const util::Ref& context, NodeContext& node, interfaces::BlockA LogPrintf("Script verification uses %d additional threads\n", script_threads); if (script_threads >= 1) { g_parallel_script_checks = true; - for (int i = 0; i < script_threads; ++i) { - threadGroup.create_thread([i]() { return ThreadScriptCheck(i); }); - } + StartScriptCheckWorkerThreads(script_threads); } assert(!node.scheduler); diff --git a/src/test/checkqueue_tests.cpp b/src/test/checkqueue_tests.cpp index 5dbf07b420..76d6727044 100644 --- a/src/test/checkqueue_tests.cpp +++ b/src/test/checkqueue_tests.cpp @@ -148,10 +148,7 @@ typedef CCheckQueue FrozenCleanup_Queue; static void Correct_Queue_range(std::vector range) { auto small_queue = MakeUnique(QUEUE_BATCH_SIZE); - boost::thread_group tg; - for (auto x = 0; x < SCRIPT_CHECK_THREADS; ++x) { - tg.create_thread([&]{small_queue->Thread();}); - } + small_queue->StartWorkerThreads(SCRIPT_CHECK_THREADS); // Make vChecks here to save on malloc (this test can be slow...) std::vector vChecks; for (const size_t i : range) { @@ -168,8 +165,7 @@ static void Correct_Queue_range(std::vector range) BOOST_REQUIRE_EQUAL(FakeCheckCheckCompletion::n_calls, i); } } - tg.interrupt_all(); - tg.join_all(); + small_queue->StopWorkerThreads(); } /** Test that 0 checks is correct @@ -212,11 +208,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_Correct_Random) BOOST_AUTO_TEST_CASE(test_CheckQueue_Catches_Failure) { auto fail_queue = MakeUnique(QUEUE_BATCH_SIZE); - - boost::thread_group tg; - for (auto x = 0; x < SCRIPT_CHECK_THREADS; ++x) { - tg.create_thread([&]{fail_queue->Thread();}); - } + fail_queue->StartWorkerThreads(SCRIPT_CHECK_THREADS); for (size_t i = 0; i < 1001; ++i) { CCheckQueueControl control(fail_queue.get()); @@ -237,18 +229,14 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_Catches_Failure) BOOST_REQUIRE(success); } } - tg.interrupt_all(); - tg.join_all(); + fail_queue->StopWorkerThreads(); } // Test that a block validation which fails does not interfere with // future blocks, ie, the bad state is cleared. BOOST_AUTO_TEST_CASE(test_CheckQueue_Recovers_From_Failure) { auto fail_queue = MakeUnique(QUEUE_BATCH_SIZE); - boost::thread_group tg; - for (auto x = 0; x < SCRIPT_CHECK_THREADS; ++x) { - tg.create_thread([&]{fail_queue->Thread();}); - } + fail_queue->StartWorkerThreads(SCRIPT_CHECK_THREADS); for (auto times = 0; times < 10; ++times) { for (const bool end_fails : {true, false}) { @@ -263,8 +251,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_Recovers_From_Failure) BOOST_REQUIRE(r != end_fails); } } - tg.interrupt_all(); - tg.join_all(); + fail_queue->StopWorkerThreads(); } // Test that unique checks are actually all called individually, rather than @@ -273,11 +260,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_Recovers_From_Failure) BOOST_AUTO_TEST_CASE(test_CheckQueue_UniqueCheck) { auto queue = MakeUnique(QUEUE_BATCH_SIZE); - boost::thread_group tg; - for (auto x = 0; x < SCRIPT_CHECK_THREADS; ++x) { - tg.create_thread([&]{queue->Thread();}); - - } + queue->StartWorkerThreads(SCRIPT_CHECK_THREADS); size_t COUNT = 100000; size_t total = COUNT; @@ -300,8 +283,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_UniqueCheck) } BOOST_REQUIRE(r); } - tg.interrupt_all(); - tg.join_all(); + queue->StopWorkerThreads(); } @@ -313,10 +295,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_UniqueCheck) BOOST_AUTO_TEST_CASE(test_CheckQueue_Memory) { auto queue = MakeUnique(QUEUE_BATCH_SIZE); - boost::thread_group tg; - for (auto x = 0; x < SCRIPT_CHECK_THREADS; ++x) { - tg.create_thread([&]{queue->Thread();}); - } + queue->StartWorkerThreads(SCRIPT_CHECK_THREADS); for (size_t i = 0; i < 1000; ++i) { size_t total = i; { @@ -335,8 +314,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_Memory) } BOOST_REQUIRE_EQUAL(MemoryCheck::fake_allocated_memory, 0U); } - tg.interrupt_all(); - tg.join_all(); + queue->StopWorkerThreads(); } // Test that a new verification cannot occur until all checks @@ -344,11 +322,8 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_Memory) BOOST_AUTO_TEST_CASE(test_CheckQueue_FrozenCleanup) { auto queue = MakeUnique(QUEUE_BATCH_SIZE); - boost::thread_group tg; bool fails = false; - for (auto x = 0; x < SCRIPT_CHECK_THREADS; ++x) { - tg.create_thread([&]{queue->Thread();}); - } + queue->StartWorkerThreads(SCRIPT_CHECK_THREADS); std::thread t0([&]() { CCheckQueueControl control(queue.get()); std::vector vChecks(1); @@ -367,7 +342,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_FrozenCleanup) } // Try to get control of the queue a bunch of times for (auto x = 0; x < 100 && !fails; ++x) { - fails = queue->ControlMutex.try_lock(); + fails = queue->m_control_mutex.try_lock(); } { // Unfreeze (we need lock n case of spurious wakeup) @@ -378,9 +353,8 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_FrozenCleanup) FrozenCleanupCheck::cv.notify_one(); // Wait for control to finish t0.join(); - tg.interrupt_all(); - tg.join_all(); BOOST_REQUIRE(!fails); + queue->StopWorkerThreads(); } @@ -431,7 +405,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueueControl_Locks) cv.wait(l, [&](){return has_lock;}); bool fails = false; for (auto x = 0; x < 100 && !fails; ++x) { - fails = queue->ControlMutex.try_lock(); + fails = queue->m_control_mutex.try_lock(); } has_tried = true; cv.notify_one(); @@ -445,4 +419,3 @@ BOOST_AUTO_TEST_CASE(test_CheckQueueControl_Locks) } } BOOST_AUTO_TEST_SUITE_END() - diff --git a/src/test/transaction_tests.cpp b/src/test/transaction_tests.cpp index 5b35ed6976..3d06568be1 100644 --- a/src/test/transaction_tests.cpp +++ b/src/test/transaction_tests.cpp @@ -428,12 +428,10 @@ BOOST_AUTO_TEST_CASE(test_big_witness_transaction) // check all inputs concurrently, with the cache PrecomputedTransactionData txdata(tx); - boost::thread_group threadGroup; CCheckQueue scriptcheckqueue(128); CCheckQueueControl control(&scriptcheckqueue); - for (int i=0; i<20; i++) - threadGroup.create_thread(std::bind(&CCheckQueue::Thread, std::ref(scriptcheckqueue))); + scriptcheckqueue.StartWorkerThreads(20); std::vector coins; for(uint32_t i = 0; i < mtx.vin.size(); i++) { @@ -455,9 +453,7 @@ BOOST_AUTO_TEST_CASE(test_big_witness_transaction) bool controlCheck = control.Wait(); assert(controlCheck); - - threadGroup.interrupt_all(); - threadGroup.join_all(); + scriptcheckqueue.StopWorkerThreads(); } SignatureData CombineSignatures(const CMutableTransaction& input1, const CMutableTransaction& input2, const CTransactionRef tx) diff --git a/src/test/util/setup_common.cpp b/src/test/util/setup_common.cpp index 738f414cd0..8a9694f55b 100644 --- a/src/test/util/setup_common.cpp +++ b/src/test/util/setup_common.cpp @@ -143,9 +143,7 @@ ChainTestingSetup::ChainTestingSetup(const std::string& chainName, const std::ve // Start script-checking threads. Set g_parallel_script_checks to true so they are used. constexpr int script_check_threads = 2; - for (int i = 0; i < script_check_threads; ++i) { - threadGroup.create_thread([i]() { return ThreadScriptCheck(i); }); - } + StartScriptCheckWorkerThreads(script_check_threads); g_parallel_script_checks = true; } @@ -154,6 +152,7 @@ ChainTestingSetup::~ChainTestingSetup() if (m_node.scheduler) m_node.scheduler->stop(); threadGroup.interrupt_all(); threadGroup.join_all(); + StopScriptCheckWorkerThreads(); GetMainSignals().FlushBackgroundCallbacks(); GetMainSignals().UnregisterBackgroundSignalScheduler(); m_node.connman.reset(); diff --git a/src/validation.cpp b/src/validation.cpp index a22c75a686..eeb99f0f0a 100644 --- a/src/validation.cpp +++ b/src/validation.cpp @@ -1763,9 +1763,14 @@ static bool WriteUndoDataForBlock(const CBlockUndo& blockundo, BlockValidationSt static CCheckQueue scriptcheckqueue(128); -void ThreadScriptCheck(int worker_num) { - util::ThreadRename(strprintf("scriptch.%i", worker_num)); - scriptcheckqueue.Thread(); +void StartScriptCheckWorkerThreads(int threads_num) +{ + scriptcheckqueue.StartWorkerThreads(threads_num); +} + +void StopScriptCheckWorkerThreads() +{ + scriptcheckqueue.StopWorkerThreads(); } VersionBitsCache versionbitscache GUARDED_BY(cs_main); diff --git a/src/validation.h b/src/validation.h index 1b4e40cd53..d3fbabe1a2 100644 --- a/src/validation.h +++ b/src/validation.h @@ -149,8 +149,10 @@ void LoadExternalBlockFile(const CChainParams& chainparams, FILE* fileIn, FlatFi bool LoadGenesisBlock(const CChainParams& chainparams); /** Unload database information */ void UnloadBlockIndex(CTxMemPool* mempool, ChainstateManager& chainman); -/** Run an instance of the script checking thread */ -void ThreadScriptCheck(int worker_num); +/** Run instances of script checking worker threads */ +void StartScriptCheckWorkerThreads(int threads_num); +/** Stop all of the script checking worker threads */ +void StopScriptCheckWorkerThreads(); /** * Return transaction from the block at block_index. * If block_index is not provided, fall back to mempool. diff --git a/test/lint/lint-includes.sh b/test/lint/lint-includes.sh index dc032665e4..262e9d4c79 100755 --- a/test/lint/lint-includes.sh +++ b/test/lint/lint-includes.sh @@ -68,7 +68,6 @@ EXPECTED_BOOST_INCLUDES=( boost/signals2/signal.hpp boost/test/unit_test.hpp boost/thread/condition_variable.hpp - boost/thread/mutex.hpp boost/thread/shared_mutex.hpp boost/thread/thread.hpp )