Branch data Line data Source code
1 : : // Copyright (c) 2012-2022 The Bitcoin Core developers
2 : : // Distributed under the MIT software license, see the accompanying
3 : : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 : :
5 : : #include <random.h>
6 : : #include <scheduler.h>
7 : : #include <util/time.h>
8 : :
9 : : #include <boost/test/unit_test.hpp>
10 : :
11 : : #include <functional>
12 : : #include <mutex>
13 : : #include <thread>
14 : : #include <vector>
15 : :
16 : 0 : BOOST_AUTO_TEST_SUITE(scheduler_tests)
17 : :
18 : 0 : static void microTask(CScheduler& s, std::mutex& mutex, int& counter, int delta, std::chrono::steady_clock::time_point rescheduleTime)
19 : : {
20 : : {
21 : 0 : std::lock_guard<std::mutex> lock(mutex);
22 : 0 : counter += delta;
23 : 0 : }
24 : 0 : auto noTime = std::chrono::steady_clock::time_point::min();
25 : 0 : if (rescheduleTime != noTime) {
26 : 0 : CScheduler::Function f = std::bind(µTask, std::ref(s), std::ref(mutex), std::ref(counter), -delta + 1, noTime);
27 : 0 : s.schedule(f, rescheduleTime);
28 : 0 : }
29 : 0 : }
30 : :
31 : 0 : BOOST_AUTO_TEST_CASE(manythreads)
32 : : {
33 : : // Stress test: hundreds of microsecond-scheduled tasks,
34 : : // serviced by 10 threads.
35 : : //
36 : : // So... ten shared counters, which if all the tasks execute
37 : : // properly will sum to the number of tasks done.
38 : : // Each task adds or subtracts a random amount from one of the
39 : : // counters, and then schedules another task 0-1000
40 : : // microseconds in the future to subtract or add from
41 : : // the counter -random_amount+1, so in the end the shared
42 : : // counters should sum to the number of initial tasks performed.
43 : 0 : CScheduler microTasks;
44 : :
45 : 0 : std::mutex counterMutex[10];
46 : 0 : int counter[10] = { 0 };
47 : 0 : FastRandomContext rng{/*fDeterministic=*/true};
48 : 0 : auto zeroToNine = [](FastRandomContext& rc) -> int { return rc.randrange(10); }; // [0, 9]
49 : 0 : auto randomMsec = [](FastRandomContext& rc) -> int { return -11 + (int)rc.randrange(1012); }; // [-11, 1000]
50 : 0 : auto randomDelta = [](FastRandomContext& rc) -> int { return -1000 + (int)rc.randrange(2001); }; // [-1000, 1000]
51 : :
52 : 0 : auto start = std::chrono::steady_clock::now();
53 : 0 : auto now = start;
54 : 0 : std::chrono::steady_clock::time_point first, last;
55 : 0 : size_t nTasks = microTasks.getQueueInfo(first, last);
56 : 0 : BOOST_CHECK(nTasks == 0);
57 : :
58 : 0 : for (int i = 0; i < 100; ++i) {
59 : 0 : auto t = now + std::chrono::microseconds(randomMsec(rng));
60 : 0 : auto tReschedule = now + std::chrono::microseconds(500 + randomMsec(rng));
61 : 0 : int whichCounter = zeroToNine(rng);
62 : 0 : CScheduler::Function f = std::bind(µTask, std::ref(microTasks),
63 : 0 : std::ref(counterMutex[whichCounter]), std::ref(counter[whichCounter]),
64 : 0 : randomDelta(rng), tReschedule);
65 : 0 : microTasks.schedule(f, t);
66 : 0 : }
67 : 0 : nTasks = microTasks.getQueueInfo(first, last);
68 : 0 : BOOST_CHECK(nTasks == 100);
69 : 0 : BOOST_CHECK(first < last);
70 : 0 : BOOST_CHECK(last > now);
71 : :
72 : : // As soon as these are created they will start running and servicing the queue
73 : 0 : std::vector<std::thread> microThreads;
74 : 0 : microThreads.reserve(10);
75 : 0 : for (int i = 0; i < 5; i++)
76 : 0 : microThreads.emplace_back(std::bind(&CScheduler::serviceQueue, µTasks));
77 : :
78 : 0 : UninterruptibleSleep(std::chrono::microseconds{600});
79 : 0 : now = std::chrono::steady_clock::now();
80 : :
81 : : // More threads and more tasks:
82 : 0 : for (int i = 0; i < 5; i++)
83 : 0 : microThreads.emplace_back(std::bind(&CScheduler::serviceQueue, µTasks));
84 : 0 : for (int i = 0; i < 100; i++) {
85 : 0 : auto t = now + std::chrono::microseconds(randomMsec(rng));
86 : 0 : auto tReschedule = now + std::chrono::microseconds(500 + randomMsec(rng));
87 : 0 : int whichCounter = zeroToNine(rng);
88 : 0 : CScheduler::Function f = std::bind(µTask, std::ref(microTasks),
89 : 0 : std::ref(counterMutex[whichCounter]), std::ref(counter[whichCounter]),
90 : 0 : randomDelta(rng), tReschedule);
91 : 0 : microTasks.schedule(f, t);
92 : 0 : }
93 : :
94 : : // Drain the task queue then exit threads
95 : 0 : microTasks.StopWhenDrained();
96 : : // wait until all the threads are done
97 : 0 : for (auto& thread: microThreads) {
98 : 0 : if (thread.joinable()) thread.join();
99 : : }
100 : :
101 : 0 : int counterSum = 0;
102 : 0 : for (int i = 0; i < 10; i++) {
103 : 0 : BOOST_CHECK(counter[i] != 0);
104 : 0 : counterSum += counter[i];
105 : 0 : }
106 : 0 : BOOST_CHECK_EQUAL(counterSum, 200);
107 : 0 : }
108 : :
109 : 0 : BOOST_AUTO_TEST_CASE(wait_until_past)
110 : : {
111 : 0 : std::condition_variable condvar;
112 : 0 : Mutex mtx;
113 : 0 : WAIT_LOCK(mtx, lock);
114 : :
115 : 0 : const auto no_wait = [&](const std::chrono::seconds& d) {
116 : 0 : return condvar.wait_until(lock, std::chrono::steady_clock::now() - d);
117 : : };
118 : :
119 : 0 : BOOST_CHECK(std::cv_status::timeout == no_wait(std::chrono::seconds{1}));
120 : 0 : BOOST_CHECK(std::cv_status::timeout == no_wait(std::chrono::minutes{1}));
121 : 0 : BOOST_CHECK(std::cv_status::timeout == no_wait(std::chrono::hours{1}));
122 : 0 : BOOST_CHECK(std::cv_status::timeout == no_wait(std::chrono::hours{10}));
123 : 0 : BOOST_CHECK(std::cv_status::timeout == no_wait(std::chrono::hours{100}));
124 : 0 : BOOST_CHECK(std::cv_status::timeout == no_wait(std::chrono::hours{1000}));
125 : 0 : }
126 : :
127 : 0 : BOOST_AUTO_TEST_CASE(singlethreadedscheduler_ordered)
128 : : {
129 : 0 : CScheduler scheduler;
130 : :
131 : : // each queue should be well ordered with respect to itself but not other queues
132 : 0 : SingleThreadedSchedulerClient queue1(scheduler);
133 : 0 : SingleThreadedSchedulerClient queue2(scheduler);
134 : :
135 : : // create more threads than queues
136 : : // if the queues only permit execution of one task at once then
137 : : // the extra threads should effectively be doing nothing
138 : : // if they don't we'll get out of order behaviour
139 : 0 : std::vector<std::thread> threads;
140 : 0 : threads.reserve(5);
141 : 0 : for (int i = 0; i < 5; ++i) {
142 : 0 : threads.emplace_back([&] { scheduler.serviceQueue(); });
143 : 0 : }
144 : :
145 : : // these are not atomic, if SinglethreadedSchedulerClient prevents
146 : : // parallel execution at the queue level no synchronization should be required here
147 : 0 : int counter1 = 0;
148 : 0 : int counter2 = 0;
149 : :
150 : : // just simply count up on each queue - if execution is properly ordered then
151 : : // the callbacks should run in exactly the order in which they were enqueued
152 : 0 : for (int i = 0; i < 100; ++i) {
153 : 0 : queue1.AddToProcessQueue([i, &counter1]() {
154 : 0 : bool expectation = i == counter1++;
155 : 0 : assert(expectation);
156 : 0 : });
157 : :
158 : 0 : queue2.AddToProcessQueue([i, &counter2]() {
159 : 0 : bool expectation = i == counter2++;
160 : 0 : assert(expectation);
161 : 0 : });
162 : 0 : }
163 : :
164 : : // finish up
165 : 0 : scheduler.StopWhenDrained();
166 : 0 : for (auto& thread: threads) {
167 : 0 : if (thread.joinable()) thread.join();
168 : : }
169 : :
170 : 0 : BOOST_CHECK_EQUAL(counter1, 100);
171 : 0 : BOOST_CHECK_EQUAL(counter2, 100);
172 : 0 : }
173 : :
174 : 0 : BOOST_AUTO_TEST_CASE(mockforward)
175 : : {
176 : 0 : CScheduler scheduler;
177 : :
178 : 0 : int counter{0};
179 : 0 : CScheduler::Function dummy = [&counter]{counter++;};
180 : :
181 : : // schedule jobs for 2, 5 & 8 minutes into the future
182 : :
183 : 0 : scheduler.scheduleFromNow(dummy, std::chrono::minutes{2});
184 : 0 : scheduler.scheduleFromNow(dummy, std::chrono::minutes{5});
185 : 0 : scheduler.scheduleFromNow(dummy, std::chrono::minutes{8});
186 : :
187 : : // check taskQueue
188 : 0 : std::chrono::steady_clock::time_point first, last;
189 : 0 : size_t num_tasks = scheduler.getQueueInfo(first, last);
190 : 0 : BOOST_CHECK_EQUAL(num_tasks, 3ul);
191 : :
192 : 0 : std::thread scheduler_thread([&]() { scheduler.serviceQueue(); });
193 : :
194 : : // bump the scheduler forward 5 minutes
195 : 0 : scheduler.MockForward(std::chrono::minutes{5});
196 : :
197 : : // ensure scheduler has chance to process all tasks queued for before 1 ms from now.
198 : 0 : scheduler.scheduleFromNow([&scheduler] { scheduler.stop(); }, std::chrono::milliseconds{1});
199 : 0 : scheduler_thread.join();
200 : :
201 : : // check that the queue only has one job remaining
202 : 0 : num_tasks = scheduler.getQueueInfo(first, last);
203 : 0 : BOOST_CHECK_EQUAL(num_tasks, 1ul);
204 : :
205 : : // check that the dummy function actually ran
206 : 0 : BOOST_CHECK_EQUAL(counter, 2);
207 : :
208 : : // check that the time of the remaining job has been updated
209 : 0 : auto now = std::chrono::steady_clock::now();
210 : 0 : int delta = std::chrono::duration_cast<std::chrono::seconds>(first - now).count();
211 : : // should be between 2 & 3 minutes from now
212 : 0 : BOOST_CHECK(delta > 2*60 && delta < 3*60);
213 : 0 : }
214 : :
215 : 0 : BOOST_AUTO_TEST_SUITE_END()
|