Branch data Line data Source code
1 : : // Copyright (c) 2012-2022 The Bitcoin Core developers 2 : : // Distributed under the MIT software license, see the accompanying 3 : : // file COPYING or http://www.opensource.org/licenses/mit-license.php. 4 : : 5 : : #ifndef BITCOIN_CHECKQUEUE_H 6 : : #define BITCOIN_CHECKQUEUE_H 7 : : 8 : : #include <sync.h> 9 : : #include <tinyformat.h> 10 : : #include <util/threadnames.h> 11 : : 12 : : #include <algorithm> 13 : : #include <iterator> 14 : : #include <vector> 15 : : 16 : : /** 17 : : * Queue for verifications that have to be performed. 18 : : * The verifications are represented by a type T, which must provide an 19 : : * operator(), returning a bool. 20 : : * 21 : : * One thread (the master) is assumed to push batches of verifications 22 : : * onto the queue, where they are processed by N-1 worker threads. When 23 : : * the master is done adding work, it temporarily joins the worker pool 24 : : * as an N'th worker, until all jobs are done. 25 : : */ 26 : : template <typename T> 27 : : class CCheckQueue 28 : : { 29 : : private: 30 : : //! Mutex to protect the inner state 31 : : Mutex m_mutex; 32 : : 33 : : //! Worker threads block on this when out of work 34 : : std::condition_variable m_worker_cv; 35 : : 36 : : //! Master thread blocks on this when out of work 37 : : std::condition_variable m_master_cv; 38 : : 39 : : //! The queue of elements to be processed. 40 : : //! As the order of booleans doesn't matter, it is used as a LIFO (stack) 41 : : std::vector<T> queue GUARDED_BY(m_mutex); 42 : : 43 : : //! The number of workers (including the master) that are idle. 44 : 0 : int nIdle GUARDED_BY(m_mutex){0}; 45 : : 46 : : //! The total number of workers (including the master). 47 : 0 : int nTotal GUARDED_BY(m_mutex){0}; 48 : : 49 : : //! The temporary evaluation result. 50 : 0 : bool fAllOk GUARDED_BY(m_mutex){true}; 51 : : 52 : : /** 53 : : * Number of verifications that haven't completed yet. 54 : : * This includes elements that are no longer queued, but still in the 55 : : * worker's own batches. 56 : : */ 57 : 0 : unsigned int nTodo GUARDED_BY(m_mutex){0}; 58 : : 59 : : //! The maximum number of elements to be processed in one batch 60 : : const unsigned int nBatchSize; 61 : : 62 : : std::vector<std::thread> m_worker_threads; 63 : 0 : bool m_request_stop GUARDED_BY(m_mutex){false}; 64 : : 65 : : /** Internal function that does bulk of the verification work. */ 66 : 0 : bool Loop(bool fMaster) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) 67 : : { 68 [ # # ]: 0 : std::condition_variable& cond = fMaster ? m_master_cv : m_worker_cv; 69 : 0 : std::vector<T> vChecks; 70 [ # # ]: 0 : vChecks.reserve(nBatchSize); 71 : 0 : unsigned int nNow = 0; 72 : 0 : bool fOk = true; 73 : 0 : do { 74 : : { 75 [ # # ][ # # ]: 0 : WAIT_LOCK(m_mutex, lock); 76 : : // first do the clean-up of the previous loop run (allowing us to do it in the same critsect) 77 [ # # ]: 0 : if (nNow) { 78 : 0 : fAllOk &= fOk; 79 : 0 : nTodo -= nNow; 80 [ # # ][ # # ]: 0 : if (nTodo == 0 && !fMaster) 81 : : // We processed the last element; inform the master it can exit and return the result 82 : 0 : m_master_cv.notify_one(); 83 : 0 : } else { 84 : : // first iteration 85 : 0 : nTotal++; 86 : : } 87 : : // logically, the do loop starts here 88 [ # # ][ # # ]: 0 : while (queue.empty() && !m_request_stop) { 89 [ # # ][ # # ]: 0 : if (fMaster && nTodo == 0) { 90 : 0 : nTotal--; 91 : 0 : bool fRet = fAllOk; 92 : : // reset the status for new work later 93 : 0 : fAllOk = true; 94 : : // return the current status 95 : 0 : return fRet; 96 : : } 97 : 0 : nIdle++; 98 [ # # ]: 0 : cond.wait(lock); // wait 99 : 0 : nIdle--; 100 : : } 101 [ # # ]: 0 : if (m_request_stop) { 102 : 0 : return false; 103 : : } 104 : : 105 : : // Decide how many work units to process now. 106 : : // * Do not try to do everything at once, but aim for increasingly smaller batches so 107 : : // all workers finish approximately simultaneously. 108 : : // * Try to account for idle jobs which will instantly start helping. 109 : : // * Don't do batches smaller than 1 (duh), or larger than nBatchSize. 110 [ # # ][ # # ]: 0 : nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1))); 111 : 0 : auto start_it = queue.end() - nNow; 112 [ # # ][ # # ]: 0 : vChecks.assign(std::make_move_iterator(start_it), std::make_move_iterator(queue.end())); [ # # ] 113 [ # # ]: 0 : queue.erase(start_it, queue.end()); 114 : : // Check whether we need to do work at all 115 : 0 : fOk = fAllOk; 116 [ # # ]: 0 : } 117 : : // execute work 118 [ # # ]: 0 : for (T& check : vChecks) 119 [ # # ]: 0 : if (fOk) 120 [ # # ]: 0 : fOk = check(); 121 : 0 : vChecks.clear(); 122 [ # # ]: 0 : } while (true); 123 : 0 : } 124 : : 125 : : public: 126 : : //! Mutex to ensure only one concurrent CCheckQueueControl 127 : : Mutex m_control_mutex; 128 : : 129 : : //! Create a new check queue 130 : 0 : explicit CCheckQueue(unsigned int batch_size, int worker_threads_num) 131 : 0 : : nBatchSize(batch_size) 132 : : { 133 [ # # ]: 0 : m_worker_threads.reserve(worker_threads_num); 134 [ # # ]: 0 : for (int n = 0; n < worker_threads_num; ++n) { 135 [ # # ]: 0 : m_worker_threads.emplace_back([this, n]() { 136 [ # # ]: 0 : util::ThreadRename(strprintf("scriptch.%i", n)); 137 : 0 : Loop(false /* worker thread */); 138 : 0 : }); 139 : 0 : } 140 : 0 : } 141 : : 142 : : // Since this class manages its own resources, which is a thread 143 : : // pool `m_worker_threads`, copy and move operations are not appropriate. 144 : : CCheckQueue(const CCheckQueue&) = delete; 145 : : CCheckQueue& operator=(const CCheckQueue&) = delete; 146 : : CCheckQueue(CCheckQueue&&) = delete; 147 : : CCheckQueue& operator=(CCheckQueue&&) = delete; 148 : : 149 : : //! Wait until execution finishes, and return whether all evaluations were successful. 150 : 0 : bool Wait() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) 151 : : { 152 : 0 : return Loop(true /* master thread */); 153 : : } 154 : : 155 : : //! Add a batch of checks to the queue 156 : 0 : void Add(std::vector<T>&& vChecks) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) 157 : : { 158 [ # # ]: 0 : if (vChecks.empty()) { 159 : 0 : return; 160 : : } 161 : : 162 : : { 163 : 0 : LOCK(m_mutex); 164 [ # # ][ # # ]: 0 : queue.insert(queue.end(), std::make_move_iterator(vChecks.begin()), std::make_move_iterator(vChecks.end())); [ # # ] 165 : 0 : nTodo += vChecks.size(); 166 : 0 : } 167 : : 168 [ # # ]: 0 : if (vChecks.size() == 1) { 169 : 0 : m_worker_cv.notify_one(); 170 : 0 : } else { 171 : 0 : m_worker_cv.notify_all(); 172 : : } 173 : 0 : } 174 : : 175 : 0 : ~CCheckQueue() 176 : : { 177 [ # # ]: 0 : WITH_LOCK(m_mutex, m_request_stop = true); 178 : 0 : m_worker_cv.notify_all(); 179 [ # # ]: 0 : for (std::thread& t : m_worker_threads) { 180 [ # # ]: 0 : t.join(); 181 : : } 182 : 0 : } 183 : : 184 : 0 : bool HasThreads() const { return !m_worker_threads.empty(); } 185 : : }; 186 : : 187 : : /** 188 : : * RAII-style controller object for a CCheckQueue that guarantees the passed 189 : : * queue is finished before continuing. 190 : : */ 191 : : template <typename T> 192 : : class CCheckQueueControl 193 : : { 194 : : private: 195 : : CCheckQueue<T> * const pqueue; 196 : : bool fDone; 197 : : 198 : : public: 199 : : CCheckQueueControl() = delete; 200 : : CCheckQueueControl(const CCheckQueueControl&) = delete; 201 : : CCheckQueueControl& operator=(const CCheckQueueControl&) = delete; 202 : 0 : explicit CCheckQueueControl(CCheckQueue<T> * const pqueueIn) : pqueue(pqueueIn), fDone(false) 203 : : { 204 : : // passed queue is supposed to be unused, or nullptr 205 [ # # ]: 0 : if (pqueue != nullptr) { 206 : 0 : ENTER_CRITICAL_SECTION(pqueue->m_control_mutex); 207 : 0 : } 208 : 0 : } 209 : : 210 : 0 : bool Wait() 211 : : { 212 [ # # ]: 0 : if (pqueue == nullptr) 213 : 0 : return true; 214 : 0 : bool fRet = pqueue->Wait(); 215 : 0 : fDone = true; 216 : 0 : return fRet; 217 : 0 : } 218 : : 219 : 0 : void Add(std::vector<T>&& vChecks) 220 : : { 221 [ # # ]: 0 : if (pqueue != nullptr) { 222 : 0 : pqueue->Add(std::move(vChecks)); 223 : 0 : } 224 : 0 : } 225 : : 226 : 0 : ~CCheckQueueControl() 227 : : { 228 [ # # ]: 0 : if (!fDone) 229 [ # # ]: 0 : Wait(); 230 [ # # ]: 0 : if (pqueue != nullptr) { 231 [ # # ][ # # ]: 0 : LEAVE_CRITICAL_SECTION(pqueue->m_control_mutex); 232 : 0 : } 233 : 0 : } 234 : : }; 235 : : 236 : : #endif // BITCOIN_CHECKQUEUE_H