Branch data Line data Source code
1 : : // Copyright (c) 2015-2022 The Bitcoin Core developers 2 : : // Distributed under the MIT software license, see the accompanying 3 : : // file COPYING or http://www.opensource.org/licenses/mit-license.php. 4 : : 5 : : #ifndef BITCOIN_SCHEDULER_H 6 : : #define BITCOIN_SCHEDULER_H 7 : : 8 : : #include <attributes.h> 9 : : #include <sync.h> 10 : : #include <threadsafety.h> 11 : : 12 : : #include <chrono> 13 : : #include <condition_variable> 14 : : #include <cstddef> 15 : : #include <functional> 16 : : #include <list> 17 : : #include <map> 18 : : #include <thread> 19 : : #include <utility> 20 : : 21 : : /** 22 : : * Simple class for background tasks that should be run 23 : : * periodically or once "after a while" 24 : : * 25 : : * Usage: 26 : : * 27 : : * CScheduler* s = new CScheduler(); 28 : : * s->scheduleFromNow(doSomething, std::chrono::milliseconds{11}); // Assuming a: void doSomething() { } 29 : : * s->scheduleFromNow([=] { this->func(argument); }, std::chrono::milliseconds{3}); 30 : : * std::thread* t = new std::thread([&] { s->serviceQueue(); }); 31 : : * 32 : : * ... then at program shutdown, make sure to call stop() to clean up the thread(s) running serviceQueue: 33 : : * s->stop(); 34 : : * t->join(); 35 : : * delete t; 36 : : * delete s; // Must be done after thread is interrupted/joined. 37 : : */ 38 : : class CScheduler 39 : : { 40 : : public: 41 : : CScheduler(); 42 : : ~CScheduler(); 43 : : 44 : : std::thread m_service_thread; 45 : : 46 : : typedef std::function<void()> Function; 47 : : 48 : : /** Call func at/after time t */ 49 : : void schedule(Function f, std::chrono::steady_clock::time_point t) EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex); 50 : : 51 : : /** Call f once after the delta has passed */ 52 : 1 : void scheduleFromNow(Function f, std::chrono::milliseconds delta) EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex) 53 : : { 54 [ + - ][ + - ]: 1 : schedule(std::move(f), std::chrono::steady_clock::now() + delta); 55 : 1 : } 56 : : 57 : : /** 58 : : * Repeat f until the scheduler is stopped. First run is after delta has passed once. 59 : : * 60 : : * The timing is not exact: Every time f is finished, it is rescheduled to run again after delta. If you need more 61 : : * accurate scheduling, don't use this method. 62 : : */ 63 : : void scheduleEvery(Function f, std::chrono::milliseconds delta) EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex); 64 : : 65 : : /** 66 : : * Mock the scheduler to fast forward in time. 67 : : * Iterates through items on taskQueue and reschedules them 68 : : * to be delta_seconds sooner. 69 : : */ 70 : : void MockForward(std::chrono::seconds delta_seconds) EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex); 71 : : 72 : : /** 73 : : * Services the queue 'forever'. Should be run in a thread. 74 : : */ 75 : : void serviceQueue() EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex); 76 : : 77 : : /** Tell any threads running serviceQueue to stop as soon as the current task is done */ 78 : 1 : void stop() EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex) 79 : : { 80 : 2 : WITH_LOCK(newTaskMutex, stopRequested = true); 81 : 1 : newTaskScheduled.notify_all(); 82 [ - + ]: 1 : if (m_service_thread.joinable()) m_service_thread.join(); 83 : 1 : } 84 : : /** Tell any threads running serviceQueue to stop when there is no work left to be done */ 85 : : void StopWhenDrained() EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex) 86 : : { 87 : : WITH_LOCK(newTaskMutex, stopWhenEmpty = true); 88 : : newTaskScheduled.notify_all(); 89 : : if (m_service_thread.joinable()) m_service_thread.join(); 90 : : } 91 : : 92 : : /** 93 : : * Returns number of tasks waiting to be serviced, 94 : : * and first and last task times 95 : : */ 96 : : size_t getQueueInfo(std::chrono::steady_clock::time_point& first, 97 : : std::chrono::steady_clock::time_point& last) const 98 : : EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex); 99 : : 100 : : /** Returns true if there are threads actively running in serviceQueue() */ 101 : : bool AreThreadsServicingQueue() const EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex); 102 : : 103 : : private: 104 : : mutable Mutex newTaskMutex; 105 : : std::condition_variable newTaskScheduled; 106 : : std::multimap<std::chrono::steady_clock::time_point, Function> taskQueue GUARDED_BY(newTaskMutex); 107 : : int nThreadsServicingQueue GUARDED_BY(newTaskMutex){0}; 108 : : bool stopRequested GUARDED_BY(newTaskMutex){false}; 109 : : bool stopWhenEmpty GUARDED_BY(newTaskMutex){false}; 110 [ + + ][ + - ]: 4391 : bool shouldStop() const EXCLUSIVE_LOCKS_REQUIRED(newTaskMutex) { return stopRequested || (stopWhenEmpty && taskQueue.empty()); } 111 : : }; 112 : : 113 : : /** 114 : : * Class used by CScheduler clients which may schedule multiple jobs 115 : : * which are required to be run serially. Jobs may not be run on the 116 : : * same thread, but no two jobs will be executed 117 : : * at the same time and memory will be release-acquire consistent 118 : : * (the scheduler will internally do an acquire before invoking a callback 119 : : * as well as a release at the end). In practice this means that a callback 120 : : * B() will be able to observe all of the effects of callback A() which executed 121 : : * before it. 122 : : */ 123 : 0 : class SingleThreadedSchedulerClient 124 : : { 125 : : private: 126 : : CScheduler& m_scheduler; 127 : : 128 : : Mutex m_callbacks_mutex; 129 : : std::list<std::function<void()>> m_callbacks_pending GUARDED_BY(m_callbacks_mutex); 130 : 1 : bool m_are_callbacks_running GUARDED_BY(m_callbacks_mutex) = false; 131 : : 132 : : void MaybeScheduleProcessQueue() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex); 133 : : void ProcessQueue() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex); 134 : : 135 : : public: 136 : 2 : explicit SingleThreadedSchedulerClient(CScheduler& scheduler LIFETIMEBOUND) : m_scheduler{scheduler} {} 137 : : 138 : : /** 139 : : * Add a callback to be executed. Callbacks are executed serially 140 : : * and memory is release-acquire consistent between callback executions. 141 : : * Practically, this means that callbacks can behave as if they are executed 142 : : * in order by a single thread. 143 : : */ 144 : : void AddToProcessQueue(std::function<void()> func) EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex); 145 : : 146 : : /** 147 : : * Processes all remaining queue members on the calling thread, blocking until queue is empty 148 : : * Must be called after the CScheduler has no remaining processing threads! 149 : : */ 150 : : void EmptyQueue() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex); 151 : : 152 : : size_t CallbacksPending() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex); 153 : : }; 154 : : 155 : : #endif // BITCOIN_SCHEDULER_H