Branch data Line data Source code
1 : : // Copyright (c) 2015-2022 The Bitcoin Core developers 2 : : // Distributed under the MIT software license, see the accompanying 3 : : // file COPYING or http://www.opensource.org/licenses/mit-license.php. 4 : : 5 : : #include <scheduler.h> 6 : : 7 : : #include <sync.h> 8 : : #include <util/time.h> 9 : : 10 : : #include <cassert> 11 : : #include <functional> 12 : : #include <utility> 13 : : 14 : 2 : CScheduler::CScheduler() = default; 15 : : 16 : 1 : CScheduler::~CScheduler() 17 : : { 18 [ + - ]: 1 : assert(nThreadsServicingQueue == 0); 19 [ + - ][ # # ]: 1 : if (stopWhenEmpty) assert(taskQueue.empty()); 20 : 1 : } 21 : : 22 : : 23 : 1 : void CScheduler::serviceQueue() 24 : : { 25 : 1 : WAIT_LOCK(newTaskMutex, lock); 26 : 1 : ++nThreadsServicingQueue; 27 : : 28 : : // newTaskMutex is locked throughout this loop EXCEPT 29 : : // when the thread is waiting or when the user's function 30 : : // is called. 31 [ - + ][ + + ]: 975 : while (!shouldStop()) { 32 : : try { 33 [ + - ][ - + ]: 976 : while (!shouldStop() && taskQueue.empty()) { [ + + ] 34 : : // Wait until there is something to do. 35 [ + - ]: 2 : newTaskScheduled.wait(lock); 36 : : } 37 : : 38 : : // Wait until either there is a new task, or until 39 : : // the time of the first item on the queue: 40 : : 41 [ + - ][ + + ]: 1466 : while (!shouldStop() && !taskQueue.empty()) { [ + + ] 42 : 1465 : std::chrono::steady_clock::time_point timeToWaitFor = taskQueue.begin()->first; 43 [ + - ][ + + ]: 1465 : if (newTaskScheduled.wait_until(lock, timeToWaitFor) == std::cv_status::timeout) { 44 : 973 : break; // Exit loop after timeout, it means we reached the time of the event 45 : : } 46 : : } 47 : : 48 : : // If there are multiple threads, the queue can empty while we're waiting (another 49 : : // thread may service the task we were waiting on). 50 [ + - ][ + + ]: 974 : if (shouldStop() || taskQueue.empty()) [ + - ] 51 : 1 : continue; 52 : : 53 [ + - ]: 973 : Function f = taskQueue.begin()->second; 54 [ + - ]: 973 : taskQueue.erase(taskQueue.begin()); 55 : : 56 : : { 57 : : // Unlock before calling f, so it can reschedule itself or another task 58 : : // without deadlocking: 59 [ + - ]: 973 : REVERSE_LOCK(lock); 60 [ + - ]: 973 : f(); 61 : 973 : } 62 : 973 : } catch (...) { 63 : 0 : --nThreadsServicingQueue; 64 [ # # ]: 0 : throw; 65 [ # # ]: 0 : } 66 : : } 67 : 1 : --nThreadsServicingQueue; 68 : 1 : newTaskScheduled.notify_one(); 69 : 1 : } 70 : : 71 : 974 : void CScheduler::schedule(CScheduler::Function f, std::chrono::steady_clock::time_point t) 72 : : { 73 : : { 74 : 974 : LOCK(newTaskMutex); 75 [ + - ][ - + ]: 974 : taskQueue.insert(std::make_pair(t, f)); 76 : 974 : } 77 : 974 : newTaskScheduled.notify_one(); 78 : 974 : } 79 : : 80 : 0 : void CScheduler::MockForward(std::chrono::seconds delta_seconds) 81 : : { 82 [ # # ][ # # ]: 0 : assert(delta_seconds > 0s && delta_seconds <= 1h); 83 : : 84 : : { 85 : 0 : LOCK(newTaskMutex); 86 : : 87 : : // use temp_queue to maintain updated schedule 88 : 0 : std::multimap<std::chrono::steady_clock::time_point, Function> temp_queue; 89 : : 90 [ # # ]: 0 : for (const auto& element : taskQueue) { 91 [ # # ][ # # ]: 0 : temp_queue.emplace_hint(temp_queue.cend(), element.first - delta_seconds, element.second); 92 : : } 93 : : 94 : : // point taskQueue to temp_queue 95 : 0 : taskQueue = std::move(temp_queue); 96 : 0 : } 97 : : 98 : : // notify that the taskQueue needs to be processed 99 : 0 : newTaskScheduled.notify_one(); 100 : 0 : } 101 : : 102 : 0 : static void Repeat(CScheduler& s, CScheduler::Function f, std::chrono::milliseconds delta) 103 : : { 104 : 0 : f(); 105 [ # # ][ # # ]: 0 : s.scheduleFromNow([=, &s] { Repeat(s, f, delta); }, delta); [ # # ] 106 : 0 : } 107 : 1 : 108 : 2 : void CScheduler::scheduleEvery(CScheduler::Function f, std::chrono::milliseconds delta) 109 : 1 : { 110 [ + - ][ - + ]: 1 : scheduleFromNow([this, f, delta] { Repeat(*this, f, delta); }, delta); [ # # ] 111 : 1 : } 112 : : 113 : 0 : size_t CScheduler::getQueueInfo(std::chrono::steady_clock::time_point& first, 114 : : std::chrono::steady_clock::time_point& last) const 115 : : { 116 : 0 : LOCK(newTaskMutex); 117 : 0 : size_t result = taskQueue.size(); 118 [ # # ]: 0 : if (!taskQueue.empty()) { 119 : 0 : first = taskQueue.begin()->first; 120 [ # # ]: 0 : last = taskQueue.rbegin()->first; 121 : 0 : } 122 : 0 : return result; 123 : 0 : } 124 : : 125 : 1 : bool CScheduler::AreThreadsServicingQueue() const 126 : : { 127 : 1 : LOCK(newTaskMutex); 128 : 1 : return nThreadsServicingQueue; 129 : 1 : } 130 : : 131 : : 132 : 1490 : void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue() 133 : : { 134 : : { 135 : 1490 : LOCK(m_callbacks_mutex); 136 : : // Try to avoid scheduling too many copies here, but if we 137 : : // accidentally have two ProcessQueue's scheduled at once its 138 : : // not a big deal. 139 [ + + ]: 1490 : if (m_are_callbacks_running) return; 140 [ + + ]: 1486 : if (m_callbacks_pending.empty()) return; 141 [ - + + ]: 1490 : } 142 [ + - ]: 1946 : m_scheduler.schedule([this] { this->ProcessQueue(); }, std::chrono::steady_clock::now()); 143 : 1490 : } 144 : : 145 : 974 : void SingleThreadedSchedulerClient::ProcessQueue() 146 : : { 147 : 974 : std::function<void()> callback; 148 : : { 149 [ + - ][ + - ]: 974 : LOCK(m_callbacks_mutex); 150 [ - + ]: 974 : if (m_are_callbacks_running) return; 151 [ + + ]: 974 : if (m_callbacks_pending.empty()) return; 152 : 745 : m_are_callbacks_running = true; 153 : : 154 : 745 : callback = std::move(m_callbacks_pending.front()); 155 : 745 : m_callbacks_pending.pop_front(); 156 [ + + ]: 974 : } 157 : : 158 : : // RAII the setting of fCallbacksRunning and calling MaybeScheduleProcessQueue 159 : : // to ensure both happen safely even if callback() throws. 160 : : struct RAIICallbacksRunning { 161 : : SingleThreadedSchedulerClient* instance; 162 : 745 : explicit RAIICallbacksRunning(SingleThreadedSchedulerClient* _instance) : instance(_instance) {} 163 : 745 : ~RAIICallbacksRunning() 164 : : { 165 : : { 166 [ + - ]: 745 : LOCK(instance->m_callbacks_mutex); 167 : 745 : instance->m_are_callbacks_running = false; 168 : 745 : } 169 [ + - ]: 745 : instance->MaybeScheduleProcessQueue(); 170 : 745 : } 171 [ + - ]: 745 : } raiicallbacksrunning(this); 172 : : 173 [ + - ]: 745 : callback(); 174 [ - + ]: 974 : } 175 : : 176 : 745 : void SingleThreadedSchedulerClient::AddToProcessQueue(std::function<void()> func) 177 : : { 178 : : { 179 : 745 : LOCK(m_callbacks_mutex); 180 [ + - ]: 745 : m_callbacks_pending.emplace_back(std::move(func)); 181 : 745 : } 182 : 745 : MaybeScheduleProcessQueue(); 183 : 745 : } 184 : : 185 : 1 : void SingleThreadedSchedulerClient::EmptyQueue() 186 : : { 187 [ + - ]: 1 : assert(!m_scheduler.AreThreadsServicingQueue()); 188 : 1 : bool should_continue = true; 189 [ + + ]: 2 : while (should_continue) { 190 : 1 : ProcessQueue(); 191 : 1 : LOCK(m_callbacks_mutex); 192 : 1 : should_continue = !m_callbacks_pending.empty(); 193 : 1 : } 194 : 1 : } 195 : : 196 : 201 : size_t SingleThreadedSchedulerClient::CallbacksPending() 197 : : { 198 : 201 : LOCK(m_callbacks_mutex); 199 : 201 : return m_callbacks_pending.size(); 200 : 201 : }