LCOV - code coverage report
Current view: top level - src - checkqueue.h (source / functions) Hit Total Coverage
Test: fuzz_coverage.info Lines: 85 110 77.3 %
Date: 2023-09-26 12:08:55 Functions: 15 24 62.5 %

          Line data    Source code
       1             : // Copyright (c) 2012-2022 The Bitcoin Core developers
       2             : // Distributed under the MIT software license, see the accompanying
       3             : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
       4             : 
       5             : #ifndef BITCOIN_CHECKQUEUE_H
       6             : #define BITCOIN_CHECKQUEUE_H
       7             : 
       8             : #include <sync.h>
       9             : #include <tinyformat.h>
      10             : #include <util/threadnames.h>
      11             : 
      12             : #include <algorithm>
      13             : #include <iterator>
      14             : #include <vector>
      15             : 
      16             : template <typename T>
      17             : class CCheckQueueControl;
      18             : 
      19             : /**
      20             :  * Queue for verifications that have to be performed.
      21             :  * The verifications are represented by a type T, which must provide an
      22             :  * operator() that returns a bool.
      23             :  *
      24             :  * One thread (the master) is assumed to push batches of verifications
      25             :  * onto the queue, where they are processed by N-1 worker threads. When
      26             :  * the master is done adding work, it temporarily joins the worker pool
      27             :  * as an N'th worker, until all jobs are done.
      28             :  */
      29             : template <typename T>
      30             : class CCheckQueue
      31             : {
      32             : private:
      33             :     //! Mutex protecting the queue and the GUARDED_BY(m_mutex) counters and flags below
      34             :     Mutex m_mutex;
      35             : 
      36             :     //! Worker threads block on this when out of work; notified by Add() and StopWorkerThreads()
      37             :     std::condition_variable m_worker_cv;
      38             : 
      39             :     //! Master thread blocks on this while the queue is empty but nTodo != 0; notified by the worker that brings nTodo to 0
      40             :     std::condition_variable m_master_cv;
      41             : 
      42             :     //! The queue of elements to be processed.
      43             :     //! As the order in which the checks are run doesn't matter, it is used as a LIFO (stack)
      44             :     std::vector<T> queue GUARDED_BY(m_mutex);
      45             : 
      46             :     //! The number of workers (including the master) that are idle, i.e. currently blocked in Loop() waiting to be notified.
      47           2 :     int nIdle GUARDED_BY(m_mutex){0};
      48             : 
      49             :     //! The total number of threads currently inside Loop(): the workers, plus the master while it is in Wait().
      50           2 :     int nTotal GUARDED_BY(m_mutex){0};
      51             : 
      52             :     //! The temporary evaluation result: becomes false once any check fails, and is reset to true when the master returns it.
      53           2 :     bool fAllOk GUARDED_BY(m_mutex){true};
      54             : 
      55             :     /**
      56             :      * Number of verifications that haven't completed yet.
      57             :      * This includes elements that are no longer queued, but still in the
      58             :      * workers' own batches.
      59             :      */
      60           2 :     unsigned int nTodo GUARDED_BY(m_mutex){0};
      61             : 
      62             :     //! The maximum number of elements to be processed in one batch
      63             :     const unsigned int nBatchSize;
      64             : 
      65             :     std::vector<std::thread> m_worker_threads; //!< Threads created by StartWorkerThreads() and joined by StopWorkerThreads()
      66           2 :     bool m_request_stop GUARDED_BY(m_mutex){false}; //!< Set by StopWorkerThreads() to make Loop() return
      67             : 
      68             :     /** Internal function that does the bulk of the verification work. Run by every worker (fMaster == false) and by the master from Wait() (fMaster == true). Returns the accumulated result to the master once nothing is left to do, or false when a stop has been requested. */
      69        5261 :     bool Loop(bool fMaster) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
      70             :     {
      71        5261 :         std::condition_variable& cond = fMaster ? m_master_cv : m_worker_cv;
      72        5261 :         std::vector<T> vChecks;
      73        5261 :         vChecks.reserve(nBatchSize);
      74        5261 :         unsigned int nNow = 0;
      75        5261 :         bool fOk = true;
      76        5261 :         do {
      77             :             {
      78        5261 :                 WAIT_LOCK(m_mutex, lock);
      79             :                 // First do the clean-up of the previous loop iteration (so that it happens in the same critical section)
      80        5261 :                 if (nNow) {
      81           0 :                     fAllOk &= fOk;
      82           0 :                     nTodo -= nNow;
      83           0 :                     if (nTodo == 0 && !fMaster)
      84             :                         // We processed the last element; inform the master it can exit and return the result
      85           0 :                         m_master_cv.notify_one();
      86           0 :                 } else {
      87             :                     // first iteration: count this thread as a participant
      88        5261 :                     nTotal++;
      89             :                 }
      90             :                 // logically, the do loop starts here
      91        5263 :                 while (queue.empty() && !m_request_stop) {
      92        5261 :                     if (fMaster && nTodo == 0) {
      93        5259 :                         nTotal--;
      94        5259 :                         bool fRet = fAllOk;
      95             :                         // reset the status for new work later
      96        5259 :                         fAllOk = true;
      97             :                         // return the current status
      98        5259 :                         return fRet;
      99             :                     }
     100           2 :                     nIdle++;
     101           2 :                     cond.wait(lock); // sleep until notified; the enclosing while loop then re-checks its condition
     102           2 :                     nIdle--;
     103             :                 }
     104           2 :                 if (m_request_stop) {
     105           2 :                     return false;
     106             :                 }
     107             : 
     108             :                 // Decide how many work units to process now.
     109             :                 // * Do not try to do everything at once, but aim for increasingly smaller batches so
     110             :                 //   all workers finish approximately simultaneously.
     111             :                 // * Try to account for idle workers which will instantly start helping.
     112             :                 // * Don't do batches smaller than 1 (duh), or larger than nBatchSize.
     113           0 :                 nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1)));
     114           0 :                 auto start_it = queue.end() - nNow;
     115           0 :                 vChecks.assign(std::make_move_iterator(start_it), std::make_move_iterator(queue.end()));
     116           0 :                 queue.erase(start_it, queue.end());
     117             :                 // If an earlier check already failed, fOk starts out false and the batch below is not executed
     118           0 :                 fOk = fAllOk;
     119        5261 :             }
     120             :             // execute work (outside the scope in which the lock was taken)
     121           0 :             for (T& check : vChecks)
     122           0 :                 if (fOk)
     123           0 :                     fOk = check();
     124           0 :             vChecks.clear();
     125           0 :         } while (true);
     126        5261 :     }
     127             : 
     128             : public:
     129             :     //! Mutex to ensure only one concurrent CCheckQueueControl
     130             :     Mutex m_control_mutex;
     131             : 
     132             :     //! Create a new check queue whose threads take at most nBatchSizeIn checks per batch
     133           6 :     explicit CCheckQueue(unsigned int nBatchSizeIn)
     134           2 :         : nBatchSize(nBatchSizeIn)
     135             :     {
     136           2 :     }
     137             : 
     138             :     //! Create a pool of threads_num new worker threads (named "scriptch.<n>"); asserts that no worker threads exist yet.
     139           1 :     void StartWorkerThreads(const int threads_num) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
     140             :     {
     141             :         {
     142           1 :             LOCK(m_mutex);
     143           1 :             nIdle = 0;
     144           1 :             nTotal = 0;
     145           1 :             fAllOk = true;
     146           1 :         }
     147           1 :         assert(m_worker_threads.empty());
     148           3 :         for (int n = 0; n < threads_num; ++n) {
     149           4 :             m_worker_threads.emplace_back([this, n]() {
     150           2 :                 util::ThreadRename(strprintf("scriptch.%i", n));
     151           2 :                 Loop(false /* worker thread */);
     152           2 :             });
     153           2 :         }
     154           1 :     }
     155             : 
     156             :     //! Wait until execution finishes, and return whether all evaluations were successful. The calling thread takes part in the work as the master.
     157        5259 :     bool Wait() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
     158             :     {
     159        5259 :         return Loop(true /* master thread */);
     160             :     }
     161             : 
     162             :     //! Add a batch of checks to the queue and notify the worker thread(s); an empty batch is a no-op
     163         815 :     void Add(std::vector<T>&& vChecks) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
     164             :     {
     165         815 :         if (vChecks.empty()) {
     166         815 :             return;
     167             :         }
     168             : 
     169             :         {
     170           0 :             LOCK(m_mutex);
     171           0 :             queue.insert(queue.end(), std::make_move_iterator(vChecks.begin()), std::make_move_iterator(vChecks.end()));
     172           0 :             nTodo += vChecks.size();
     173           0 :         }
     174             : 
     175           0 :         if (vChecks.size() == 1) {
     176           0 :             m_worker_cv.notify_one();
     177           0 :         } else {
     178           0 :             m_worker_cv.notify_all();
     179             :         }
     180         815 :     }
     181             : 
     182             :     //! Stop all of the worker threads: request a stop, wake and join every worker, then reset the stop flag.
     183           1 :     void StopWorkerThreads() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
     184             :     {
     185           2 :         WITH_LOCK(m_mutex, m_request_stop = true);
     186           1 :         m_worker_cv.notify_all();
     187           3 :         for (std::thread& t : m_worker_threads) {
     188           2 :             t.join();
     189             :         }
     190           1 :         m_worker_threads.clear();
     191           2 :         WITH_LOCK(m_mutex, m_request_stop = false);
     192           1 :     }
     193             : 
     194        5260 :     bool HasThreads() const { return !m_worker_threads.empty(); } //!< Whether any worker threads currently exist
     195             : 
     196           2 :     ~CCheckQueue()
     197             :     {
     198           2 :         assert(m_worker_threads.empty());
     199           2 :     }
     200             : };
     201             : 
     202             : /**
     203             :  * RAII-style controller object for a CCheckQueue that guarantees the passed
     204             :  * queue is finished before continuing: the destructor calls Wait() if the caller has not. A non-null queue's m_control_mutex is entered on construction and left on destruction.
     205             :  */
     206             : template <typename T>
     207             : class CCheckQueueControl
     208             : {
     209             : private:
     210             :     CCheckQueue<T> * const pqueue;
     211             :     bool fDone;
     212             : 
     213             : public:
     214             :     CCheckQueueControl() = delete;
     215             :     CCheckQueueControl(const CCheckQueueControl&) = delete;
     216             :     CCheckQueueControl& operator=(const CCheckQueueControl&) = delete;
     217        5259 :     explicit CCheckQueueControl(CCheckQueue<T> * const pqueueIn) : pqueue(pqueueIn), fDone(false)
     218             :     {
     219             :         // The passed queue is supposed to be unused, or nullptr (in which case Add() does nothing and Wait() returns true)
     220        5259 :         if (pqueue != nullptr) {
     221        5259 :             ENTER_CRITICAL_SECTION(pqueue->m_control_mutex);
     222        5259 :         }
     223        5259 :     }
     224             : 
     225        5259 :     bool Wait()
     226             :     {
     227        5259 :         if (pqueue == nullptr)
     228           0 :             return true;
     229        5259 :         bool fRet = pqueue->Wait();
     230        5259 :         fDone = true;
     231        5259 :         return fRet;
     232        5259 :     }
     233             : 
     234         815 :     void Add(std::vector<T>&& vChecks)
     235             :     {
     236         815 :         if (pqueue != nullptr) {
     237         815 :             pqueue->Add(std::move(vChecks));
     238         815 :         }
     239         815 :     }
     240             : 
     241        5259 :     ~CCheckQueueControl()
     242             :     {
     243        5259 :         if (!fDone)
     244           0 :             Wait();
     245        5259 :         if (pqueue != nullptr) {
     246        5259 :             LEAVE_CRITICAL_SECTION(pqueue->m_control_mutex);
     247        5259 :         }
     248        5259 :     }
     249             : };
     250             : 
     251             : #endif // BITCOIN_CHECKQUEUE_H

Generated by: LCOV version 1.14