Branch data Line data Source code
1 : : // Copyright (c) 2012-2022 The Bitcoin Core developers 2 : : // Distributed under the MIT software license, see the accompanying 3 : : // file COPYING or http://www.opensource.org/licenses/mit-license.php. 4 : : 5 : : #ifndef BITCOIN_CHECKQUEUE_H 6 : : #define BITCOIN_CHECKQUEUE_H 7 : : 8 : : #include <sync.h> 9 : : #include <tinyformat.h> 10 : : #include <util/threadnames.h> 11 : : 12 : : #include <algorithm> 13 : : #include <iterator> 14 : : #include <vector> 15 : : 16 : : template <typename T> 17 : : class CCheckQueueControl; 18 : : 19 : : /** 20 : : * Queue for verifications that have to be performed. 21 : : * The verifications are represented by a type T, which must provide an 22 : : * operator(), returning a bool. 23 : : * 24 : : * One thread (the master) is assumed to push batches of verifications 25 : : * onto the queue, where they are processed by N-1 worker threads. When 26 : : * the master is done adding work, it temporarily joins the worker pool 27 : : * as an N'th worker, until all jobs are done. 28 : : */ 29 : : template <typename T> 30 : : class CCheckQueue 31 : : { 32 : : private: 33 : : //! Mutex to protect the inner state 34 : : Mutex m_mutex; 35 : : 36 : : //! Worker threads block on this when out of work 37 : : std::condition_variable m_worker_cv; 38 : : 39 : : //! Master thread blocks on this when out of work 40 : : std::condition_variable m_master_cv; 41 : : 42 : : //! The queue of elements to be processed. 43 : : //! As the order of booleans doesn't matter, it is used as a LIFO (stack) 44 : : std::vector<T> queue GUARDED_BY(m_mutex); 45 : : 46 : : //! The number of workers (including the master) that are idle. 47 : 2 : int nIdle GUARDED_BY(m_mutex){0}; 48 : : 49 : : //! The total number of workers (including the master). 50 : 2 : int nTotal GUARDED_BY(m_mutex){0}; 51 : : 52 : : //! The temporary evaluation result. 53 : 2 : bool fAllOk GUARDED_BY(m_mutex){true}; 54 : : 55 : : /** 56 : : * Number of verifications that haven't completed yet. 57 : : * This includes elements that are no longer queued, but still in the 58 : : * worker's own batches. 59 : : */ 60 : 2 : unsigned int nTodo GUARDED_BY(m_mutex){0}; 61 : : 62 : : //! The maximum number of elements to be processed in one batch 63 : : const unsigned int nBatchSize; 64 : : 65 : : std::vector<std::thread> m_worker_threads; 66 : 2 : bool m_request_stop GUARDED_BY(m_mutex){false}; 67 : : 68 : : /** Internal function that does bulk of the verification work. */ 69 : 402 : bool Loop(bool fMaster) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) 70 : : { 71 [ + + ]: 402 : std::condition_variable& cond = fMaster ? m_master_cv : m_worker_cv; 72 : 402 : std::vector<T> vChecks; 73 [ + - ]: 402 : vChecks.reserve(nBatchSize); 74 : 402 : unsigned int nNow = 0; 75 : 402 : bool fOk = true; 76 : 402 : do { 77 : : { 78 [ + - ]: 402 : WAIT_LOCK(m_mutex, lock); 79 : : // first do the clean-up of the previous loop run (allowing us to do it in the same critsect) 80 [ - + ]: 402 : if (nNow) { 81 : 0 : fAllOk &= fOk; 82 : 0 : nTodo -= nNow; 83 [ # # ][ # # ]: 0 : if (nTodo == 0 && !fMaster) 84 : : // We processed the last element; inform the master it can exit and return the result 85 : 0 : m_master_cv.notify_one(); 86 : 0 : } else { 87 : : // first iteration 88 : 402 : nTotal++; 89 : : } 90 : : // logically, the do loop starts here 91 [ - + ][ + + ]: 404 : while (queue.empty() && !m_request_stop) { 92 [ + + ][ - + ]: 402 : if (fMaster && nTodo == 0) { 93 : 400 : nTotal--; 94 : 400 : bool fRet = fAllOk; 95 : : // reset the status for new work later 96 : 400 : fAllOk = true; 97 : : // return the current status 98 : 400 : return fRet; 99 : : } 100 : 2 : nIdle++; 101 [ + - ]: 2 : cond.wait(lock); // wait 102 : 2 : nIdle--; 103 : : } 104 [ - + ]: 2 : if (m_request_stop) { 105 : 2 : return false; 106 : : } 107 : : 108 : : // Decide how many work units to process now. 109 : : // * Do not try to do everything at once, but aim for increasingly smaller batches so 110 : : // all workers finish approximately simultaneously. 111 : : // * Try to account for idle jobs which will instantly start helping. 112 : : // * Don't do batches smaller than 1 (duh), or larger than nBatchSize. 113 [ # # ][ # # ]: 0 : nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1))); 114 : 0 : auto start_it = queue.end() - nNow; 115 [ # # ][ # # ]: 0 : vChecks.assign(std::make_move_iterator(start_it), std::make_move_iterator(queue.end())); [ # # ] 116 [ # # ]: 0 : queue.erase(start_it, queue.end()); 117 : : // Check whether we need to do work at all 118 : 0 : fOk = fAllOk; 119 [ - + ]: 402 : } 120 : : // execute work 121 [ # # ]: 0 : for (T& check : vChecks) 122 [ # # ]: 0 : if (fOk) 123 [ # # ]: 0 : fOk = check(); 124 : 0 : vChecks.clear(); 125 [ # # ]: 0 : } while (true); 126 : 402 : } 127 : : 128 : : public: 129 : : //! Mutex to ensure only one concurrent CCheckQueueControl 130 : : Mutex m_control_mutex; 131 : : 132 : : //! Create a new check queue 133 : 6 : explicit CCheckQueue(unsigned int nBatchSizeIn) 134 : 2 : : nBatchSize(nBatchSizeIn) 135 : : { 136 : 2 : } 137 : : 138 : : //! Create a pool of new worker threads. 139 : 1 : void StartWorkerThreads(const int threads_num) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) 140 : : { 141 : : { 142 : 1 : LOCK(m_mutex); 143 : 1 : nIdle = 0; 144 : 1 : nTotal = 0; 145 : 1 : fAllOk = true; 146 : 1 : } 147 [ + - ]: 1 : assert(m_worker_threads.empty()); 148 [ + + ]: 3 : for (int n = 0; n < threads_num; ++n) { 149 : 4 : m_worker_threads.emplace_back([this, n]() { 150 [ + - ]: 2 : util::ThreadRename(strprintf("scriptch.%i", n)); 151 : 2 : Loop(false /* worker thread */); 152 : 2 : }); 153 : 2 : } 154 : 1 : } 155 : : 156 : : //! Wait until execution finishes, and return whether all evaluations were successful. 157 : 400 : bool Wait() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) 158 : : { 159 : 400 : return Loop(true /* master thread */); 160 : : } 161 : : 162 : : //! Add a batch of checks to the queue 163 : 0 : void Add(std::vector<T>&& vChecks) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) 164 : : { 165 [ # # ]: 0 : if (vChecks.empty()) { 166 : 0 : return; 167 : : } 168 : : 169 : : { 170 : 0 : LOCK(m_mutex); 171 [ # # ][ # # ]: 0 : queue.insert(queue.end(), std::make_move_iterator(vChecks.begin()), std::make_move_iterator(vChecks.end())); [ # # ] 172 : 0 : nTodo += vChecks.size(); 173 : 0 : } 174 : : 175 [ # # ]: 0 : if (vChecks.size() == 1) { 176 : 0 : m_worker_cv.notify_one(); 177 : 0 : } else { 178 : 0 : m_worker_cv.notify_all(); 179 : : } 180 : 0 : } 181 : : 182 : : //! Stop all of the worker threads. 183 : 1 : void StopWorkerThreads() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) 184 : : { 185 : 2 : WITH_LOCK(m_mutex, m_request_stop = true); 186 : 1 : m_worker_cv.notify_all(); 187 [ + + ]: 3 : for (std::thread& t : m_worker_threads) { 188 : 2 : t.join(); 189 : : } 190 : 1 : m_worker_threads.clear(); 191 : 2 : WITH_LOCK(m_mutex, m_request_stop = false); 192 : 1 : } 193 : : 194 : 401 : bool HasThreads() const { return !m_worker_threads.empty(); } 195 : : 196 : 2 : ~CCheckQueue() 197 : : { 198 [ + - ]: 2 : assert(m_worker_threads.empty()); 199 : 2 : } 200 : : }; 201 : : 202 : : /** 203 : : * RAII-style controller object for a CCheckQueue that guarantees the passed 204 : : * queue is finished before continuing. 205 : : */ 206 : : template <typename T> 207 : : class CCheckQueueControl 208 : : { 209 : : private: 210 : : CCheckQueue<T> * const pqueue; 211 : : bool fDone; 212 : : 213 : : public: 214 : : CCheckQueueControl() = delete; 215 : : CCheckQueueControl(const CCheckQueueControl&) = delete; 216 : : CCheckQueueControl& operator=(const CCheckQueueControl&) = delete; 217 : 400 : explicit CCheckQueueControl(CCheckQueue<T> * const pqueueIn) : pqueue(pqueueIn), fDone(false) 218 : : { 219 : : // passed queue is supposed to be unused, or nullptr 220 [ + - ]: 400 : if (pqueue != nullptr) { 221 : 400 : ENTER_CRITICAL_SECTION(pqueue->m_control_mutex); 222 : 400 : } 223 : 400 : } 224 : : 225 : 400 : bool Wait() 226 : : { 227 [ - + ]: 400 : if (pqueue == nullptr) 228 : 0 : return true; 229 : 400 : bool fRet = pqueue->Wait(); 230 : 400 : fDone = true; 231 : 400 : return fRet; 232 : 400 : } 233 : : 234 : 0 : void Add(std::vector<T>&& vChecks) 235 : : { 236 [ # # ]: 0 : if (pqueue != nullptr) { 237 : 0 : pqueue->Add(std::move(vChecks)); 238 : 0 : } 239 : 0 : } 240 : : 241 : 400 : ~CCheckQueueControl() 242 : : { 243 [ - + ]: 400 : if (!fDone) 244 [ # # ]: 0 : Wait(); 245 [ - + ]: 400 : if (pqueue != nullptr) { 246 [ + - ][ + - ]: 400 : LEAVE_CRITICAL_SECTION(pqueue->m_control_mutex); 247 : 400 : } 248 : 400 : } 249 : : }; 250 : : 251 : : #endif // BITCOIN_CHECKQUEUE_H