Line data Source code
1 : // Copyright (c) 2012-2022 The Bitcoin Core developers 2 : // Distributed under the MIT software license, see the accompanying 3 : // file COPYING or http://www.opensource.org/licenses/mit-license.php. 4 : 5 : #ifndef BITCOIN_CHECKQUEUE_H 6 : #define BITCOIN_CHECKQUEUE_H 7 : 8 : #include <sync.h> 9 : #include <tinyformat.h> 10 : #include <util/threadnames.h> 11 : 12 : #include <algorithm> 13 : #include <iterator> 14 : #include <vector> 15 : 16 : template <typename T> 17 : class CCheckQueueControl; 18 : 19 : /** 20 : * Queue for verifications that have to be performed. 21 : * The verifications are represented by a type T, which must provide an 22 : * operator(), returning a bool. 23 : * 24 : * One thread (the master) is assumed to push batches of verifications 25 : * onto the queue, where they are processed by N-1 worker threads. When 26 : * the master is done adding work, it temporarily joins the worker pool 27 : * as an N'th worker, until all jobs are done. 28 : */ 29 : template <typename T> 30 : class CCheckQueue 31 : { 32 : private: 33 : //! Mutex to protect the inner state 34 : Mutex m_mutex; 35 : 36 : //! Worker threads block on this when out of work 37 : std::condition_variable m_worker_cv; 38 : 39 : //! Master thread blocks on this when out of work 40 : std::condition_variable m_master_cv; 41 : 42 : //! The queue of elements to be processed. 43 : //! As the order of booleans doesn't matter, it is used as a LIFO (stack) 44 : std::vector<T> queue GUARDED_BY(m_mutex); 45 : 46 : //! The number of workers (including the master) that are idle. 47 2 : int nIdle GUARDED_BY(m_mutex){0}; 48 : 49 : //! The total number of workers (including the master). 50 2 : int nTotal GUARDED_BY(m_mutex){0}; 51 : 52 : //! The temporary evaluation result. 53 2 : bool fAllOk GUARDED_BY(m_mutex){true}; 54 : 55 : /** 56 : * Number of verifications that haven't completed yet. 57 : * This includes elements that are no longer queued, but still in the 58 : * worker's own batches. 59 : */ 60 2 : unsigned int nTodo GUARDED_BY(m_mutex){0}; 61 : 62 : //! The maximum number of elements to be processed in one batch 63 : const unsigned int nBatchSize; 64 : 65 : std::vector<std::thread> m_worker_threads; 66 2 : bool m_request_stop GUARDED_BY(m_mutex){false}; 67 : 68 : /** Internal function that does bulk of the verification work. */ 69 5261 : bool Loop(bool fMaster) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) 70 : { 71 5261 : std::condition_variable& cond = fMaster ? m_master_cv : m_worker_cv; 72 5261 : std::vector<T> vChecks; 73 5261 : vChecks.reserve(nBatchSize); 74 5261 : unsigned int nNow = 0; 75 5261 : bool fOk = true; 76 5261 : do { 77 : { 78 5261 : WAIT_LOCK(m_mutex, lock); 79 : // first do the clean-up of the previous loop run (allowing us to do it in the same critsect) 80 5261 : if (nNow) { 81 0 : fAllOk &= fOk; 82 0 : nTodo -= nNow; 83 0 : if (nTodo == 0 && !fMaster) 84 : // We processed the last element; inform the master it can exit and return the result 85 0 : m_master_cv.notify_one(); 86 0 : } else { 87 : // first iteration 88 5261 : nTotal++; 89 : } 90 : // logically, the do loop starts here 91 5263 : while (queue.empty() && !m_request_stop) { 92 5261 : if (fMaster && nTodo == 0) { 93 5259 : nTotal--; 94 5259 : bool fRet = fAllOk; 95 : // reset the status for new work later 96 5259 : fAllOk = true; 97 : // return the current status 98 5259 : return fRet; 99 : } 100 2 : nIdle++; 101 2 : cond.wait(lock); // wait 102 2 : nIdle--; 103 : } 104 2 : if (m_request_stop) { 105 2 : return false; 106 : } 107 : 108 : // Decide how many work units to process now. 109 : // * Do not try to do everything at once, but aim for increasingly smaller batches so 110 : // all workers finish approximately simultaneously. 111 : // * Try to account for idle jobs which will instantly start helping. 112 : // * Don't do batches smaller than 1 (duh), or larger than nBatchSize. 113 0 : nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1))); 114 0 : auto start_it = queue.end() - nNow; 115 0 : vChecks.assign(std::make_move_iterator(start_it), std::make_move_iterator(queue.end())); 116 0 : queue.erase(start_it, queue.end()); 117 : // Check whether we need to do work at all 118 0 : fOk = fAllOk; 119 5261 : } 120 : // execute work 121 0 : for (T& check : vChecks) 122 0 : if (fOk) 123 0 : fOk = check(); 124 0 : vChecks.clear(); 125 0 : } while (true); 126 5261 : } 127 : 128 : public: 129 : //! Mutex to ensure only one concurrent CCheckQueueControl 130 : Mutex m_control_mutex; 131 : 132 : //! Create a new check queue 133 6 : explicit CCheckQueue(unsigned int nBatchSizeIn) 134 2 : : nBatchSize(nBatchSizeIn) 135 : { 136 2 : } 137 : 138 : //! Create a pool of new worker threads. 139 1 : void StartWorkerThreads(const int threads_num) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) 140 : { 141 : { 142 1 : LOCK(m_mutex); 143 1 : nIdle = 0; 144 1 : nTotal = 0; 145 1 : fAllOk = true; 146 1 : } 147 1 : assert(m_worker_threads.empty()); 148 3 : for (int n = 0; n < threads_num; ++n) { 149 4 : m_worker_threads.emplace_back([this, n]() { 150 2 : util::ThreadRename(strprintf("scriptch.%i", n)); 151 2 : Loop(false /* worker thread */); 152 2 : }); 153 2 : } 154 1 : } 155 : 156 : //! Wait until execution finishes, and return whether all evaluations were successful. 157 5259 : bool Wait() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) 158 : { 159 5259 : return Loop(true /* master thread */); 160 : } 161 : 162 : //! Add a batch of checks to the queue 163 815 : void Add(std::vector<T>&& vChecks) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) 164 : { 165 815 : if (vChecks.empty()) { 166 815 : return; 167 : } 168 : 169 : { 170 0 : LOCK(m_mutex); 171 0 : queue.insert(queue.end(), std::make_move_iterator(vChecks.begin()), std::make_move_iterator(vChecks.end())); 172 0 : nTodo += vChecks.size(); 173 0 : } 174 : 175 0 : if (vChecks.size() == 1) { 176 0 : m_worker_cv.notify_one(); 177 0 : } else { 178 0 : m_worker_cv.notify_all(); 179 : } 180 815 : } 181 : 182 : //! Stop all of the worker threads. 183 1 : void StopWorkerThreads() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) 184 : { 185 2 : WITH_LOCK(m_mutex, m_request_stop = true); 186 1 : m_worker_cv.notify_all(); 187 3 : for (std::thread& t : m_worker_threads) { 188 2 : t.join(); 189 : } 190 1 : m_worker_threads.clear(); 191 2 : WITH_LOCK(m_mutex, m_request_stop = false); 192 1 : } 193 : 194 5260 : bool HasThreads() const { return !m_worker_threads.empty(); } 195 : 196 2 : ~CCheckQueue() 197 : { 198 2 : assert(m_worker_threads.empty()); 199 2 : } 200 : }; 201 : 202 : /** 203 : * RAII-style controller object for a CCheckQueue that guarantees the passed 204 : * queue is finished before continuing. 205 : */ 206 : template <typename T> 207 : class CCheckQueueControl 208 : { 209 : private: 210 : CCheckQueue<T> * const pqueue; 211 : bool fDone; 212 : 213 : public: 214 : CCheckQueueControl() = delete; 215 : CCheckQueueControl(const CCheckQueueControl&) = delete; 216 : CCheckQueueControl& operator=(const CCheckQueueControl&) = delete; 217 5259 : explicit CCheckQueueControl(CCheckQueue<T> * const pqueueIn) : pqueue(pqueueIn), fDone(false) 218 : { 219 : // passed queue is supposed to be unused, or nullptr 220 5259 : if (pqueue != nullptr) { 221 5259 : ENTER_CRITICAL_SECTION(pqueue->m_control_mutex); 222 5259 : } 223 5259 : } 224 : 225 5259 : bool Wait() 226 : { 227 5259 : if (pqueue == nullptr) 228 0 : return true; 229 5259 : bool fRet = pqueue->Wait(); 230 5259 : fDone = true; 231 5259 : return fRet; 232 5259 : } 233 : 234 815 : void Add(std::vector<T>&& vChecks) 235 : { 236 815 : if (pqueue != nullptr) { 237 815 : pqueue->Add(std::move(vChecks)); 238 815 : } 239 815 : } 240 : 241 5259 : ~CCheckQueueControl() 242 : { 243 5259 : if (!fDone) 244 0 : Wait(); 245 5259 : if (pqueue != nullptr) { 246 5259 : LEAVE_CRITICAL_SECTION(pqueue->m_control_mutex); 247 5259 : } 248 5259 : } 249 : }; 250 : 251 : #endif // BITCOIN_CHECKQUEUE_H