Line data Source code
1 : // Copyright (c) 2015-2022 The Bitcoin Core developers 2 : // Distributed under the MIT software license, see the accompanying 3 : // file COPYING or http://www.opensource.org/licenses/mit-license.php. 4 : 5 : #include <scheduler.h> 6 : 7 : #include <sync.h> 8 : #include <util/time.h> 9 : 10 : #include <cassert> 11 : #include <functional> 12 : #include <utility> 13 : 14 2 : CScheduler::CScheduler() = default; 15 : 16 1 : CScheduler::~CScheduler() 17 : { 18 1 : assert(nThreadsServicingQueue == 0); 19 1 : if (stopWhenEmpty) assert(taskQueue.empty()); 20 1 : } 21 : 22 : 23 1 : void CScheduler::serviceQueue() 24 : { 25 1 : WAIT_LOCK(newTaskMutex, lock); 26 1 : ++nThreadsServicingQueue; 27 : 28 : // newTaskMutex is locked throughout this loop EXCEPT 29 : // when the thread is waiting or when the user's function 30 : // is called. 31 17025 : while (!shouldStop()) { 32 : try { 33 31249 : while (!shouldStop() && taskQueue.empty()) { 34 : // Wait until there is something to do. 35 14225 : newTaskScheduled.wait(lock); 36 : } 37 : 38 : // Wait until either there is a new task, or until 39 : // the time of the first item on the queue: 40 : 41 17024 : while (!shouldStop() && !taskQueue.empty()) { 42 17023 : std::chrono::steady_clock::time_point timeToWaitFor = taskQueue.begin()->first; 43 17023 : if (newTaskScheduled.wait_until(lock, timeToWaitFor) == std::cv_status::timeout) { 44 17023 : break; // Exit loop after timeout, it means we reached the time of the event 45 : } 46 : } 47 : 48 : // If there are multiple threads, the queue can empty while we're waiting (another 49 : // thread may service the task we were waiting on). 50 17024 : if (shouldStop() || taskQueue.empty()) 51 1 : continue; 52 : 53 17023 : Function f = taskQueue.begin()->second; 54 17023 : taskQueue.erase(taskQueue.begin()); 55 : 56 : { 57 : // Unlock before calling f, so it can reschedule itself or another task 58 : // without deadlocking: 59 17023 : REVERSE_LOCK(lock); 60 17023 : f(); 61 17023 : } 62 17023 : } catch (...) { 63 0 : --nThreadsServicingQueue; 64 0 : throw; 65 0 : } 66 : } 67 1 : --nThreadsServicingQueue; 68 1 : newTaskScheduled.notify_one(); 69 1 : } 70 : 71 17023 : void CScheduler::schedule(CScheduler::Function f, std::chrono::steady_clock::time_point t) 72 : { 73 : { 74 17023 : LOCK(newTaskMutex); 75 17023 : taskQueue.insert(std::make_pair(t, f)); 76 17023 : } 77 17023 : newTaskScheduled.notify_one(); 78 17023 : } 79 : 80 0 : void CScheduler::MockForward(std::chrono::seconds delta_seconds) 81 : { 82 0 : assert(delta_seconds > 0s && delta_seconds <= 1h); 83 : 84 : { 85 0 : LOCK(newTaskMutex); 86 : 87 : // use temp_queue to maintain updated schedule 88 0 : std::multimap<std::chrono::steady_clock::time_point, Function> temp_queue; 89 : 90 0 : for (const auto& element : taskQueue) { 91 0 : temp_queue.emplace_hint(temp_queue.cend(), element.first - delta_seconds, element.second); 92 : } 93 : 94 : // point taskQueue to temp_queue 95 0 : taskQueue = std::move(temp_queue); 96 0 : } 97 : 98 : // notify that the taskQueue needs to be processed 99 0 : newTaskScheduled.notify_one(); 100 0 : } 101 : 102 0 : static void Repeat(CScheduler& s, CScheduler::Function f, std::chrono::milliseconds delta) 103 : { 104 0 : f(); 105 0 : s.scheduleFromNow([=, &s] { Repeat(s, f, delta); }, delta); 106 0 : } 107 1 : 108 1 : void CScheduler::scheduleEvery(CScheduler::Function f, std::chrono::milliseconds delta) 109 1 : { 110 0 : scheduleFromNow([this, f, delta] { Repeat(*this, f, delta); }, delta); 111 0 : } 112 : 113 0 : size_t CScheduler::getQueueInfo(std::chrono::steady_clock::time_point& first, 114 : std::chrono::steady_clock::time_point& last) const 115 : { 116 0 : LOCK(newTaskMutex); 117 0 : size_t result = taskQueue.size(); 118 0 : if (!taskQueue.empty()) { 119 0 : first = taskQueue.begin()->first; 120 0 : last = taskQueue.rbegin()->first; 121 0 : } 122 0 : return result; 123 0 : } 124 : 125 1 : bool CScheduler::AreThreadsServicingQueue() const 126 : { 127 1 : LOCK(newTaskMutex); 128 1 : return nThreadsServicingQueue; 129 1 : } 130 : 131 : 132 31278 : void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue() 133 : { 134 : { 135 31278 : LOCK(m_callbacks_mutex); 136 : // Try to avoid scheduling too many copies here, but if we 137 : // accidentally have two ProcessQueue's scheduled at once its 138 : // not a big deal. 139 31278 : if (m_are_callbacks_running) return; 140 31271 : if (m_callbacks_pending.empty()) return; 141 31278 : } 142 34046 : m_scheduler.schedule([this] { this->ProcessQueue(); }, std::chrono::steady_clock::now()); 143 31278 : } 144 : 145 17024 : void SingleThreadedSchedulerClient::ProcessQueue() 146 : { 147 17024 : std::function<void()> callback; 148 : { 149 17024 : LOCK(m_callbacks_mutex); 150 17024 : if (m_are_callbacks_running) return; 151 17024 : if (m_callbacks_pending.empty()) return; 152 15639 : m_are_callbacks_running = true; 153 : 154 15639 : callback = std::move(m_callbacks_pending.front()); 155 15639 : m_callbacks_pending.pop_front(); 156 17024 : } 157 : 158 : // RAII the setting of fCallbacksRunning and calling MaybeScheduleProcessQueue 159 : // to ensure both happen safely even if callback() throws. 160 : struct RAIICallbacksRunning { 161 : SingleThreadedSchedulerClient* instance; 162 15639 : explicit RAIICallbacksRunning(SingleThreadedSchedulerClient* _instance) : instance(_instance) {} 163 15639 : ~RAIICallbacksRunning() 164 : { 165 : { 166 15639 : LOCK(instance->m_callbacks_mutex); 167 15639 : instance->m_are_callbacks_running = false; 168 15639 : } 169 15639 : instance->MaybeScheduleProcessQueue(); 170 15639 : } 171 15639 : } raiicallbacksrunning(this); 172 : 173 15639 : callback(); 174 17024 : } 175 : 176 15639 : void SingleThreadedSchedulerClient::AddToProcessQueue(std::function<void()> func) 177 : { 178 : { 179 15639 : LOCK(m_callbacks_mutex); 180 15639 : m_callbacks_pending.emplace_back(std::move(func)); 181 15639 : } 182 15639 : MaybeScheduleProcessQueue(); 183 15639 : } 184 : 185 1 : void SingleThreadedSchedulerClient::EmptyQueue() 186 : { 187 1 : assert(!m_scheduler.AreThreadsServicingQueue()); 188 1 : bool should_continue = true; 189 2 : while (should_continue) { 190 1 : ProcessQueue(); 191 1 : LOCK(m_callbacks_mutex); 192 1 : should_continue = !m_callbacks_pending.empty(); 193 1 : } 194 1 : } 195 : 196 201 : size_t SingleThreadedSchedulerClient::CallbacksPending() 197 : { 198 201 : LOCK(m_callbacks_mutex); 199 201 : return m_callbacks_pending.size(); 200 201 : }