Line data Source code
1 : // Copyright (c) 2015-2022 The Bitcoin Core developers 2 : // Distributed under the MIT software license, see the accompanying 3 : // file COPYING or http://www.opensource.org/licenses/mit-license.php. 4 : 5 : #ifndef BITCOIN_SCHEDULER_H 6 : #define BITCOIN_SCHEDULER_H 7 : 8 : #include <attributes.h> 9 : #include <sync.h> 10 : #include <threadsafety.h> 11 : 12 : #include <chrono> 13 : #include <condition_variable> 14 : #include <cstddef> 15 : #include <functional> 16 : #include <list> 17 : #include <map> 18 : #include <thread> 19 : #include <utility> 20 : 21 : /** 22 : * Simple class for background tasks that should be run 23 : * periodically or once "after a while" 24 : * 25 : * Usage: 26 : * 27 : * CScheduler* s = new CScheduler(); 28 : * s->scheduleFromNow(doSomething, std::chrono::milliseconds{11}); // Assuming a: void doSomething() { } 29 : * s->scheduleFromNow([=] { this->func(argument); }, std::chrono::milliseconds{3}); 30 : * std::thread* t = new std::thread([&] { s->serviceQueue(); }); 31 : * 32 : * ... then at program shutdown, make sure to call stop() to clean up the thread(s) running serviceQueue: 33 : * s->stop(); 34 : * t->join(); 35 : * delete t; 36 : * delete s; // Must be done after thread is interrupted/joined. 37 : */ 38 : class CScheduler 39 : { 40 : public: 41 : CScheduler(); 42 : ~CScheduler(); 43 : 44 : std::thread m_service_thread; 45 : 46 : typedef std::function<void()> Function; 47 : 48 : /** Call func at/after time t */ 49 : void schedule(Function f, std::chrono::steady_clock::time_point t) EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex); 50 : 51 : /** Call f once after the delta has passed */ 52 0 : void scheduleFromNow(Function f, std::chrono::milliseconds delta) EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex) 53 : { 54 0 : schedule(std::move(f), std::chrono::steady_clock::now() + delta); 55 0 : } 56 : 57 : /** 58 : * Repeat f until the scheduler is stopped. First run is after delta has passed once. 59 : * 60 : * The timing is not exact: Every time f is finished, it is rescheduled to run again after delta. If you need more 61 : * accurate scheduling, don't use this method. 62 : */ 63 : void scheduleEvery(Function f, std::chrono::milliseconds delta) EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex); 64 : 65 : /** 66 : * Mock the scheduler to fast forward in time. 67 : * Iterates through items on taskQueue and reschedules them 68 : * to be delta_seconds sooner. 69 : */ 70 : void MockForward(std::chrono::seconds delta_seconds) EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex); 71 : 72 : /** 73 : * Services the queue 'forever'. Should be run in a thread. 74 : */ 75 : void serviceQueue() EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex); 76 : 77 : /** Tell any threads running serviceQueue to stop as soon as the current task is done */ 78 1 : void stop() EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex) 79 : { 80 2 : WITH_LOCK(newTaskMutex, stopRequested = true); 81 1 : newTaskScheduled.notify_all(); 82 1 : if (m_service_thread.joinable()) m_service_thread.join(); 83 1 : } 84 : /** Tell any threads running serviceQueue to stop when there is no work left to be done */ 85 : void StopWhenDrained() EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex) 86 : { 87 : WITH_LOCK(newTaskMutex, stopWhenEmpty = true); 88 : newTaskScheduled.notify_all(); 89 : if (m_service_thread.joinable()) m_service_thread.join(); 90 : } 91 : 92 : /** 93 : * Returns number of tasks waiting to be serviced, 94 : * and first and last task times 95 : */ 96 : size_t getQueueInfo(std::chrono::steady_clock::time_point& first, 97 : std::chrono::steady_clock::time_point& last) const 98 : EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex); 99 : 100 : /** Returns true if there are threads actively running in serviceQueue() */ 101 : bool AreThreadsServicingQueue() const EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex); 102 : 103 : private: 104 : mutable Mutex newTaskMutex; 105 : std::condition_variable newTaskScheduled; 106 : std::multimap<std::chrono::steady_clock::time_point, Function> taskQueue GUARDED_BY(newTaskMutex); 107 : int nThreadsServicingQueue GUARDED_BY(newTaskMutex){0}; 108 : bool stopRequested GUARDED_BY(newTaskMutex){false}; 109 : bool stopWhenEmpty GUARDED_BY(newTaskMutex){false}; 110 82322 : bool shouldStop() const EXCLUSIVE_LOCKS_REQUIRED(newTaskMutex) { return stopRequested || (stopWhenEmpty && taskQueue.empty()); } 111 : }; 112 : 113 : /** 114 : * Class used by CScheduler clients which may schedule multiple jobs 115 : * which are required to be run serially. Jobs may not be run on the 116 : * same thread, but no two jobs will be executed 117 : * at the same time and memory will be release-acquire consistent 118 : * (the scheduler will internally do an acquire before invoking a callback 119 : * as well as a release at the end). In practice this means that a callback 120 : * B() will be able to observe all of the effects of callback A() which executed 121 : * before it. 122 : */ 123 : class SingleThreadedSchedulerClient 124 : { 125 : private: 126 : CScheduler& m_scheduler; 127 : 128 : Mutex m_callbacks_mutex; 129 : std::list<std::function<void()>> m_callbacks_pending GUARDED_BY(m_callbacks_mutex); 130 1 : bool m_are_callbacks_running GUARDED_BY(m_callbacks_mutex) = false; 131 : 132 : void MaybeScheduleProcessQueue() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex); 133 : void ProcessQueue() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex); 134 : 135 : public: 136 2 : explicit SingleThreadedSchedulerClient(CScheduler& scheduler LIFETIMEBOUND) : m_scheduler{scheduler} {} 137 : 138 : /** 139 : * Add a callback to be executed. Callbacks are executed serially 140 : * and memory is release-acquire consistent between callback executions. 141 : * Practically, this means that callbacks can behave as if they are executed 142 : * in order by a single thread. 143 : */ 144 : void AddToProcessQueue(std::function<void()> func) EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex); 145 : 146 : /** 147 : * Processes all remaining queue members on the calling thread, blocking until queue is empty 148 : * Must be called after the CScheduler has no remaining processing threads! 149 : */ 150 : void EmptyQueue() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex); 151 : 152 : size_t CallbacksPending() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex); 153 : }; 154 : 155 : #endif // BITCOIN_SCHEDULER_H