LCOV - code coverage report
Current view: top level - src - checkqueue.h (source / functions) Hit Total Coverage
Test: fuzz_coverage.info Lines: 109 110 99.1 %
Date: 2023-10-05 15:40:34 Functions: 24 24 100.0 %
Branches: 57 80 71.2 %

           Branch data     Line data    Source code
       1                 :            : // Copyright (c) 2012-2022 The Bitcoin Core developers
       2                 :            : // Distributed under the MIT software license, see the accompanying
       3                 :            : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
       4                 :            : 
       5                 :            : #ifndef BITCOIN_CHECKQUEUE_H
       6                 :            : #define BITCOIN_CHECKQUEUE_H
       7                 :            : 
       8                 :            : #include <sync.h>
       9                 :            : #include <tinyformat.h>
      10                 :            : #include <util/threadnames.h>
      11                 :            : 
      12                 :            : #include <algorithm>
      13                 :            : #include <iterator>
      14                 :            : #include <vector>
      15                 :            : 
      16                 :            : template <typename T>
      17                 :            : class CCheckQueueControl;
      18                 :            : 
      19                 :            : /**
      20                 :            :  * Queue for verifications that have to be performed.
      21                 :            :   * The verifications are represented by a type T, which must provide an
      22                 :            :   * operator(), returning a bool.
      23                 :            :   *
      24                 :            :   * One thread (the master) is assumed to push batches of verifications
      25                 :            :   * onto the queue, where they are processed by N-1 worker threads. When
      26                 :            :   * the master is done adding work, it temporarily joins the worker pool
      27                 :            :   * as an N'th worker, until all jobs are done.
      28                 :            :   */
      29                 :            : template <typename T>
      30                 :            : class CCheckQueue
      31                 :            : {
      32                 :            : private:
      33                 :            :     //! Mutex to protect the inner state
      34                 :            :     Mutex m_mutex;
      35                 :            : 
      36                 :            :     //! Worker threads block on this when out of work
      37                 :            :     std::condition_variable m_worker_cv;
      38                 :            : 
      39                 :            :     //! Master thread blocks on this when out of work
      40                 :            :     std::condition_variable m_master_cv;
      41                 :            : 
      42                 :            :     //! The queue of elements to be processed.
      43                 :            :     //! As the order of booleans doesn't matter, it is used as a LIFO (stack)
      44                 :            :     std::vector<T> queue GUARDED_BY(m_mutex);
      45                 :            : 
      46                 :            :     //! The number of workers (including the master) that are idle.
      47                 :        297 :     int nIdle GUARDED_BY(m_mutex){0};
      48                 :            : 
      49                 :            :     //! The total number of workers (including the master).
      50                 :        297 :     int nTotal GUARDED_BY(m_mutex){0};
      51                 :            : 
      52                 :            :     //! The temporary evaluation result.
      53                 :        297 :     bool fAllOk GUARDED_BY(m_mutex){true};
      54                 :            : 
      55                 :            :     /**
      56                 :            :      * Number of verifications that haven't completed yet.
      57                 :            :      * This includes elements that are no longer queued, but still in the
      58                 :            :      * worker's own batches.
      59                 :            :      */
      60                 :        297 :     unsigned int nTodo GUARDED_BY(m_mutex){0};
      61                 :            : 
      62                 :            :     //! The maximum number of elements to be processed in one batch
      63                 :            :     const unsigned int nBatchSize;
      64                 :            : 
      65                 :            :     std::vector<std::thread> m_worker_threads;
      66                 :        297 :     bool m_request_stop GUARDED_BY(m_mutex){false};
      67                 :            : 
      68                 :            :     /** Internal function that does bulk of the verification work. */
      69                 :      99958 :     bool Loop(bool fMaster) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
      70                 :            :     {
      71         [ +  + ]:      99958 :         std::condition_variable& cond = fMaster ? m_master_cv : m_worker_cv;
      72                 :      99958 :         std::vector<T> vChecks;
      73         [ +  - ]:      99958 :         vChecks.reserve(nBatchSize);
      74                 :      99958 :         unsigned int nNow = 0;
      75                 :      99958 :         bool fOk = true;
      76                 :      99958 :         do {
      77                 :            :             {
      78         [ +  - ]:     101426 :                 WAIT_LOCK(m_mutex, lock);
      79                 :            :                 // first do the clean-up of the previous loop run (allowing us to do it in the same critsect)
      80         [ +  + ]:     101426 :                 if (nNow) {
      81                 :       1468 :                     fAllOk &= fOk;
      82                 :       1468 :                     nTodo -= nNow;
      83   [ +  +  +  + ]:       1468 :                     if (nTodo == 0 && !fMaster)
      84                 :            :                         // We processed the last element; inform the master it can exit and return the result
      85                 :        187 :                         m_master_cv.notify_one();
      86                 :       1468 :                 } else {
      87                 :            :                     // first iteration
      88                 :      99958 :                     nTotal++;
      89                 :            :                 }
      90                 :            :                 // logically, the do loop starts here
      91   [ +  +  +  + ]:     103424 :                 while (queue.empty() && !m_request_stop) {
      92   [ +  +  +  + ]:     100284 :                     if (fMaster && nTodo == 0) {
      93                 :      98286 :                         nTotal--;
      94                 :      98286 :                         bool fRet = fAllOk;
      95                 :            :                         // reset the status for new work later
      96                 :      98286 :                         fAllOk = true;
      97                 :            :                         // return the current status
      98                 :      98286 :                         return fRet;
      99                 :            :                     }
     100                 :       1998 :                     nIdle++;
     101                 :       1998 :                     cond.wait(lock); // wait
     102                 :       1998 :                     nIdle--;
     103                 :            :                 }
     104         [ +  + ]:       3140 :                 if (m_request_stop) {
     105                 :       1672 :                     return false;
     106                 :            :                 }
     107                 :            : 
     108                 :            :                 // Decide how many work units to process now.
     109                 :            :                 // * Do not try to do everything at once, but aim for increasingly smaller batches so
     110                 :            :                 //   all workers finish approximately simultaneously.
     111                 :            :                 // * Try to account for idle jobs which will instantly start helping.
     112                 :            :                 // * Don't do batches smaller than 1 (duh), or larger than nBatchSize.
     113   [ +  -  +  - ]:       1468 :                 nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1)));
     114                 :       1468 :                 auto start_it = queue.end() - nNow;
     115   [ +  -  +  -  :       1468 :                 vChecks.assign(std::make_move_iterator(start_it), std::make_move_iterator(queue.end()));
                   +  - ]
     116         [ +  - ]:       1468 :                 queue.erase(start_it, queue.end());
     117                 :            :                 // Check whether we need to do work at all
     118                 :       1468 :                 fOk = fAllOk;
     119         [ +  + ]:     101426 :             }
     120                 :            :             // execute work
     121         [ +  + ]:      16586 :             for (T& check : vChecks)
     122         [ +  + ]:      15118 :                 if (fOk)
     123         [ +  - ]:       1386 :                     fOk = check();
     124                 :       1468 :             vChecks.clear();
     125         [ +  - ]:       1468 :         } while (true);
     126                 :      99958 :     }
     127                 :            : 
     128                 :            : public:
     129                 :            :     //! Mutex to ensure only one concurrent CCheckQueueControl
     130                 :            :     Mutex m_control_mutex;
     131                 :            : 
     132                 :            :     //! Create a new check queue
     133                 :        891 :     explicit CCheckQueue(unsigned int nBatchSizeIn)
     134                 :        297 :         : nBatchSize(nBatchSizeIn)
     135                 :            :     {
     136                 :        297 :     }
     137                 :            : 
     138                 :            :     //! Create a pool of new worker threads.
     139                 :        836 :     void StartWorkerThreads(const int threads_num) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
     140                 :            :     {
     141                 :            :         {
     142                 :        836 :             LOCK(m_mutex);
     143                 :        836 :             nIdle = 0;
     144                 :        836 :             nTotal = 0;
     145                 :        836 :             fAllOk = true;
     146                 :        836 :         }
     147         [ +  - ]:        836 :         assert(m_worker_threads.empty());
     148         [ +  + ]:       2508 :         for (int n = 0; n < threads_num; ++n) {
     149                 :       3344 :             m_worker_threads.emplace_back([this, n]() {
     150         [ +  - ]:       1672 :                 util::ThreadRename(strprintf("scriptch.%i", n));
     151                 :       1672 :                 Loop(false /* worker thread */);
     152                 :       1672 :             });
     153                 :       1672 :         }
     154                 :        836 :     }
     155                 :            : 
     156                 :            :     //! Wait until execution finishes, and return whether all evaluations were successful.
     157                 :      98286 :     bool Wait() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
     158                 :            :     {
     159                 :      98286 :         return Loop(true /* master thread */);
     160                 :            :     }
     161                 :            : 
     162                 :            :     //! Add a batch of checks to the queue
     163                 :       9841 :     void Add(std::vector<T>&& vChecks) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
     164                 :            :     {
     165         [ +  + ]:       9841 :         if (vChecks.empty()) {
     166                 :       9501 :             return;
     167                 :            :         }
     168                 :            : 
     169                 :            :         {
     170                 :        340 :             LOCK(m_mutex);
     171   [ +  -  +  -  :        340 :             queue.insert(queue.end(), std::make_move_iterator(vChecks.begin()), std::make_move_iterator(vChecks.end()));
                   +  - ]
     172                 :        340 :             nTodo += vChecks.size();
     173                 :        340 :         }
     174                 :            : 
     175         [ +  + ]:        340 :         if (vChecks.size() == 1) {
     176                 :        251 :             m_worker_cv.notify_one();
     177                 :        251 :         } else {
     178                 :         89 :             m_worker_cv.notify_all();
     179                 :            :         }
     180                 :       9841 :     }
     181                 :            : 
     182                 :            :     //! Stop all of the worker threads.
     183                 :        836 :     void StopWorkerThreads() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
     184                 :            :     {
     185                 :       1672 :         WITH_LOCK(m_mutex, m_request_stop = true);
     186                 :        836 :         m_worker_cv.notify_all();
     187         [ +  + ]:       2508 :         for (std::thread& t : m_worker_threads) {
     188                 :       1672 :             t.join();
     189                 :            :         }
     190                 :        836 :         m_worker_threads.clear();
     191                 :       1672 :         WITH_LOCK(m_mutex, m_request_stop = false);
     192                 :        836 :     }
     193                 :            : 
     194                 :      99089 :     bool HasThreads() const { return !m_worker_threads.empty(); }
     195                 :            : 
     196                 :        297 :     ~CCheckQueue()
     197                 :            :     {
     198         [ +  - ]:        297 :         assert(m_worker_threads.empty());
     199                 :        297 :     }
     200                 :            : };
     201                 :            : 
     202                 :            : /**
     203                 :            :  * RAII-style controller object for a CCheckQueue that guarantees the passed
     204                 :            :  * queue is finished before continuing.
     205                 :            :  */
     206                 :            : template <typename T>
     207                 :            : class CCheckQueueControl
     208                 :            : {
     209                 :            : private:
     210                 :            :     CCheckQueue<T> * const pqueue;
     211                 :            :     bool fDone;
     212                 :            : 
     213                 :            : public:
     214                 :            :     CCheckQueueControl() = delete;
     215                 :            :     CCheckQueueControl(const CCheckQueueControl&) = delete;
     216                 :            :     CCheckQueueControl& operator=(const CCheckQueueControl&) = delete;
     217                 :      98237 :     explicit CCheckQueueControl(CCheckQueue<T> * const pqueueIn) : pqueue(pqueueIn), fDone(false)
     218                 :            :     {
     219                 :            :         // passed queue is supposed to be unused, or nullptr
     220         [ +  - ]:      98237 :         if (pqueue != nullptr) {
     221                 :      98237 :             ENTER_CRITICAL_SECTION(pqueue->m_control_mutex);
     222                 :      98237 :         }
     223                 :      98237 :     }
     224                 :            : 
     225                 :      98237 :     bool Wait()
     226                 :            :     {
     227         [ -  + ]:      98237 :         if (pqueue == nullptr)
     228                 :          0 :             return true;
     229                 :      98237 :         bool fRet = pqueue->Wait();
     230                 :      98237 :         fDone = true;
     231                 :      98237 :         return fRet;
     232                 :      98237 :     }
     233                 :            : 
     234                 :       9789 :     void Add(std::vector<T>&& vChecks)
     235                 :            :     {
     236         [ +  - ]:       9789 :         if (pqueue != nullptr) {
     237                 :       9789 :             pqueue->Add(std::move(vChecks));
     238                 :       9789 :         }
     239                 :       9789 :     }
     240                 :            : 
     241                 :      98237 :     ~CCheckQueueControl()
     242                 :            :     {
     243         [ +  + ]:      98237 :         if (!fDone)
     244         [ +  - ]:       3842 :             Wait();
     245         [ -  + ]:      98237 :         if (pqueue != nullptr) {
     246   [ +  -  +  - ]:      98237 :             LEAVE_CRITICAL_SECTION(pqueue->m_control_mutex);
     247                 :      98237 :         }
     248                 :      98237 :     }
     249                 :            : };
     250                 :            : 
     251                 :            : #endif // BITCOIN_CHECKQUEUE_H

Generated by: LCOV version 1.14