Branch data Line data Source code
1 : : // Copyright (c) 2020-2022 The Bitcoin Core developers
2 : : // Distributed under the MIT software license, see the accompanying
3 : : // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 : :
5 : : #include <test/util/net.h>
6 : :
7 : : #include <chainparams.h>
8 : : #include <node/eviction.h>
9 : : #include <net.h>
10 : : #include <net_processing.h>
11 : : #include <netmessagemaker.h>
12 : : #include <span.h>
13 : : #include <sync.h>
14 : :
15 : : #include <chrono>
16 : : #include <optional>
17 [ + - ]: 2 : #include <vector>
18 [ + - ]: 2 :
19 : 0 : void ConnmanTestMsg::Handshake(CNode& node,
20 : : bool successfully_connected,
21 : : ServiceFlags remote_services,
22 : : ServiceFlags local_services,
23 : : int32_t version,
24 : : bool relay_txs)
25 : : {
26 : 0 : auto& peerman{static_cast<PeerManager&>(*m_msgproc)};
27 : 0 : auto& connman{*this};
28 : 0 : const CNetMsgMaker mm{0};
29 : :
30 : 0 : peerman.InitializeNode(node, local_services);
31 : 0 : FlushSendBuffer(node); // Drop the version message added by InitializeNode.
32 : :
33 : 0 : CSerializedNetMsg msg_version{
34 [ # # ][ # # ]: 0 : mm.Make(NetMsgType::VERSION,
35 : : version, //
36 [ # # ]: 0 : Using<CustomUintFormatter<8>>(remote_services), //
37 : 0 : int64_t{}, // dummy time
38 : 0 : int64_t{}, // ignored service bits
39 [ # # ][ # # ]: 0 : CNetAddr::V1(CService{}), // dummy
40 : 0 : int64_t{}, // ignored service bits
41 [ # # ][ # # ]: 0 : CNetAddr::V1(CService{}), // ignored
42 : 0 : uint64_t{1}, // dummy nonce
43 : 0 : std::string{}, // dummy subver
44 : 0 : int32_t{}, // dummy starting_height
45 : : relay_txs),
46 : : };
47 : :
48 [ # # ]: 0 : (void)connman.ReceiveMsgFrom(node, std::move(msg_version));
49 : 0 : node.fPauseSend = false;
50 [ # # ]: 0 : connman.ProcessMessagesOnce(node);
51 [ # # ]: 0 : peerman.SendMessages(&node);
52 [ # # ]: 0 : FlushSendBuffer(node); // Drop the verack message added by SendMessages.
53 [ # # ]: 0 : if (node.fDisconnect) return;
54 [ # # ]: 0 : assert(node.nVersion == version);
55 [ # # ][ # # ]: 0 : assert(node.GetCommonVersion() == std::min(version, PROTOCOL_VERSION));
[ # # ]
56 : 0 : CNodeStateStats statestats;
57 [ # # ][ # # ]: 0 : assert(peerman.GetNodeStateStats(node.GetId(), statestats));
[ # # ]
58 [ # # ][ # # ]: 0 : assert(statestats.m_relay_txs == (relay_txs && !node.IsBlockOnlyConn()));
[ # # ]
59 [ # # ]: 0 : assert(statestats.their_services == remote_services);
60 [ # # ]: 0 : if (successfully_connected) {
61 [ # # ][ # # ]: 0 : CSerializedNetMsg msg_verack{mm.Make(NetMsgType::VERACK)};
62 [ # # ]: 0 : (void)connman.ReceiveMsgFrom(node, std::move(msg_verack));
63 : 0 : node.fPauseSend = false;
64 [ # # ]: 0 : connman.ProcessMessagesOnce(node);
65 [ # # ]: 0 : peerman.SendMessages(&node);
66 [ # # ]: 0 : assert(node.fSuccessfullyConnected == true);
67 : 0 : }
68 [ # # ]: 0 : }
69 : :
70 : 0 : void ConnmanTestMsg::NodeReceiveMsgBytes(CNode& node, Span<const uint8_t> msg_bytes, bool& complete) const
71 : : {
72 [ # # ]: 0 : assert(node.ReceiveMsgBytes(msg_bytes, complete));
73 [ # # ]: 0 : if (complete) {
74 : 0 : node.MarkReceivedMsgsForProcessing();
75 : 0 : }
76 : 0 : }
77 : :
78 : 0 : void ConnmanTestMsg::FlushSendBuffer(CNode& node) const
79 : : {
80 : 0 : LOCK(node.cs_vSend);
81 : 0 : node.vSendMsg.clear();
82 : 0 : node.m_send_memusage = 0;
83 [ + - ]: 2 : while (true) {
84 : 0 : const auto& [to_send, _more, _msg_type] = node.m_transport->GetBytesToSend(false);
85 [ # # ]: 0 : if (to_send.empty()) break;
86 : 0 : node.m_transport->MarkBytesSent(to_send.size());
87 : : }
88 : 0 : }
89 : :
90 : 0 : bool ConnmanTestMsg::ReceiveMsgFrom(CNode& node, CSerializedNetMsg&& ser_msg) const
91 : : {
92 : 0 : bool queued = node.m_transport->SetMessageToSend(ser_msg);
93 [ # # ]: 0 : assert(queued);
94 : 0 : bool complete{false};
95 : 0 : while (true) {
96 : 0 : const auto& [to_send, _more, _msg_type] = node.m_transport->GetBytesToSend(false);
97 [ # # ]: 0 : if (to_send.empty()) break;
98 : 0 : NodeReceiveMsgBytes(node, to_send, complete);
99 : 0 : node.m_transport->MarkBytesSent(to_send.size());
100 : : }
101 : 0 : return complete;
102 : : }
103 : :
104 : 0 : std::vector<NodeEvictionCandidate> GetRandomNodeEvictionCandidates(int n_candidates, FastRandomContext& random_context)
105 : : {
106 : 0 : std::vector<NodeEvictionCandidate> candidates;
107 [ # # ]: 0 : candidates.reserve(n_candidates);
108 [ # # ]: 0 : for (int id = 0; id < n_candidates; ++id) {
109 [ # # ][ # # ]: 0 : candidates.push_back({
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
110 : 0 : /*id=*/id,
111 [ # # ]: 0 : /*m_connected=*/std::chrono::seconds{random_context.randrange(100)},
112 [ # # ]: 0 : /*m_min_ping_time=*/std::chrono::microseconds{random_context.randrange(100)},
113 [ # # ]: 0 : /*m_last_block_time=*/std::chrono::seconds{random_context.randrange(100)},
114 [ # # ]: 0 : /*m_last_tx_time=*/std::chrono::seconds{random_context.randrange(100)},
115 : 0 : /*fRelevantServices=*/random_context.randbool(),
116 : 0 : /*m_relay_txs=*/random_context.randbool(),
117 : 0 : /*fBloomFilter=*/random_context.randbool(),
118 : 0 : /*nKeyedNetGroup=*/random_context.randrange(100),
119 : 0 : /*prefer_evict=*/random_context.randbool(),
120 : 0 : /*m_is_local=*/random_context.randbool(),
121 : 0 : /*m_network=*/ALL_NETWORKS[random_context.randrange(ALL_NETWORKS.size())],
122 : : /*m_noban=*/false,
123 : : /*m_conn_type=*/ConnectionType::INBOUND,
124 : : });
125 : 0 : }
126 : 0 : return candidates;
127 [ # # ]: 0 : }
128 : :
129 : 144 : ZeroSock::ZeroSock() : Sock{INVALID_SOCKET} {}
130 : :
131 : 144 : ZeroSock::~ZeroSock() {}
132 : :
133 : 0 : ssize_t ZeroSock::Send(const void*, size_t len, int) const { return len; }
134 : :
135 : 0 : ssize_t ZeroSock::Recv(void* buf, size_t len, int flags) const
136 : : {
137 : 0 : memset(buf, 0x0, len);
138 : 0 : return len;
139 : : }
140 : :
141 : 144 : int ZeroSock::Connect(const sockaddr*, socklen_t) const { return 0; }
142 : :
143 : 0 : int ZeroSock::Bind(const sockaddr*, socklen_t) const { return 0; }
144 : :
145 : 0 : int ZeroSock::Listen(int) const { return 0; }
146 : :
147 : 0 : std::unique_ptr<Sock> ZeroSock::Accept(sockaddr* addr, socklen_t* addr_len) const
148 : : {
149 [ # # ]: 0 : if (addr != nullptr) {
150 : : // Pretend all connections come from 5.5.5.5:6789
151 : 0 : memset(addr, 0x00, *addr_len);
152 : 0 : const socklen_t write_len = static_cast<socklen_t>(sizeof(sockaddr_in));
153 [ # # ]: 0 : if (*addr_len >= write_len) {
154 : 0 : *addr_len = write_len;
155 : 0 : sockaddr_in* addr_in = reinterpret_cast<sockaddr_in*>(addr);
156 : 0 : addr_in->sin_family = AF_INET;
157 : 0 : memset(&addr_in->sin_addr, 0x05, sizeof(addr_in->sin_addr));
158 : 0 : addr_in->sin_port = htons(6789);
159 : 0 : }
160 : 0 : }
161 : 0 : return std::make_unique<ZeroSock>();
162 : : }
163 : :
164 : 0 : int ZeroSock::GetSockOpt(int level, int opt_name, void* opt_val, socklen_t* opt_len) const
165 : : {
166 : 0 : std::memset(opt_val, 0x0, *opt_len);
167 : 0 : return 0;
168 : : }
169 : :
170 : 0 : int ZeroSock::SetSockOpt(int, int, const void*, socklen_t) const { return 0; }
171 : :
172 : 144 : int ZeroSock::GetSockName(sockaddr* name, socklen_t* name_len) const
173 : : {
174 : 144 : std::memset(name, 0x0, *name_len);
175 : 144 : return 0;
176 : : }
177 : :
178 : 0 : bool ZeroSock::SetNonBlocking() const { return true; }
179 : 0 :
180 : 0 : bool ZeroSock::IsSelectable() const { return true; }
181 : :
182 : 0 : bool ZeroSock::Wait(std::chrono::milliseconds timeout, Event requested, Event* occurred) const
183 : : {
184 [ # # ]: 0 : if (occurred != nullptr) {
185 : 0 : *occurred = requested;
186 : 0 : }
187 : 0 : return true;
188 : : }
189 : :
190 : 0 : bool ZeroSock::WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) const
191 : : {
192 [ # # ]: 0 : for (auto& [sock, events] : events_per_sock) {
193 : : (void)sock;
194 : 0 : events.occurred = events.requested;
195 : : }
196 : 0 : return true;
197 : : }
198 : :
199 : 0 : ZeroSock& ZeroSock::operator=(Sock&& other)
200 : : {
201 : 0 : assert(false && "Move of Sock into ZeroSock not allowed.");
202 : : return *this;
203 : : }
204 : :
205 : 0 : StaticContentsSock::StaticContentsSock(const std::string& contents)
206 [ # # ]: 0 : : m_contents{contents}
207 : 0 : {
208 : 0 : }
209 : :
210 : 0 : ssize_t StaticContentsSock::Recv(void* buf, size_t len, int flags) const
211 : : {
212 : 0 : const size_t consume_bytes{std::min(len, m_contents.size() - m_consumed)};
213 : 0 : std::memcpy(buf, m_contents.data() + m_consumed, consume_bytes);
214 [ # # ]: 0 : if ((flags & MSG_PEEK) == 0) {
215 : 0 : m_consumed += consume_bytes;
216 : 0 : }
217 : 0 : return consume_bytes;
218 : : }
219 : :
220 : 0 : StaticContentsSock& StaticContentsSock::operator=(Sock&& other)
221 : : {
222 : 0 : assert(false && "Move of Sock into StaticContentsSock not allowed.");
223 : : return *this;
224 : : }
225 : :
226 : 5732 : ssize_t DynSock::Pipe::GetBytes(void* buf, size_t len, int flags)
227 : : {
228 : 5732 : WAIT_LOCK(m_mutex, lock);
229 : :
230 [ + + ]: 5732 : if (m_data.empty()) {
231 [ + + ]: 5248 : if (m_eof) {
232 : 2 : return 0;
233 : : }
234 : 5246 : errno = EAGAIN; // Same as recv(2) on a non-blocking socket.
235 : 5246 : return -1;
236 : : }
237 : :
238 [ + - ]: 484 : const size_t read_bytes{std::min(len, m_data.size())};
239 : :
240 : 484 : std::memcpy(buf, m_data.data(), read_bytes);
241 [ + + ]: 484 : if ((flags & MSG_PEEK) == 0) {
242 [ + - ]: 314 : m_data.erase(m_data.begin(), m_data.begin() + read_bytes);
243 : 314 : }
244 : :
245 : 484 : return read_bytes;
246 : 5732 : }
247 : :
248 : 206 : std::optional<CNetMessage> DynSock::Pipe::GetNetMsg()
249 : : {
250 : 206 : V1Transport transport{/*node_id=*/0, /*nTypeIn=*/SER_NETWORK, /*nVersionIn=*/INIT_PROTO_VERSION};
251 : :
252 : : {
253 [ + - ][ + - ]: 206 : WAIT_LOCK(m_mutex, lock);
254 : :
255 : 413 : auto WaitForDataOrEof = [&]() {
256 : 596 : m_cond.wait(lock, [&]() EXCLUSIVE_LOCKS_REQUIRED(m_mutex) {
257 : 389 : AssertLockHeld(m_mutex);
258 [ + + ]: 389 : return !m_data.empty() || m_eof;
259 : : });
260 : 207 : };
261 : :
262 [ + - ]: 206 : WaitForDataOrEof();
263 [ + + ][ + + ]: 206 : if (m_eof && m_data.empty()) {
264 : 141 : return std::nullopt;
265 : : }
266 : :
267 : 94 : for (;;) {
268 [ + - ]: 94 : Span<const uint8_t> s{m_data};
269 [ + - ][ + - ]: 94 : if (!transport.ReceivedBytes(s)) { // Consumed bytes are removed from the front of s.
270 : 0 : return std::nullopt;
271 : : }
272 [ + - ]: 94 : m_data.erase(m_data.begin(), m_data.begin() + m_data.size() - s.size());
273 [ + - ][ + + ]: 94 : if (transport.ReceivedMessageComplete()) {
274 : 64 : break;
275 : : }
276 [ + + ]: 30 : if (m_data.empty()) {
277 [ + - ]: 1 : WaitForDataOrEof();
278 [ + - ][ - + ]: 1 : if (m_eof && m_data.empty()) {
279 : 1 : return std::nullopt;
280 : : }
281 : 0 : }
282 : : }
283 [ + + ]: 206 : }
284 : :
285 : 64 : bool reject{false};
286 [ + - ]: 64 : CNetMessage msg{transport.GetReceivedMessage(/*time=*/{}, reject)};
287 [ - + ]: 64 : if (reject) {
288 : 0 : return std::nullopt;
289 : : }
290 : 64 : return std::make_optional<CNetMessage>(std::move(msg));
291 : 206 : }
292 : :
293 : 538 : void DynSock::Pipe::PushBytes(const void* buf, size_t len)
294 : : {
295 : 538 : LOCK(m_mutex);
296 : 538 : const uint8_t* b = static_cast<const uint8_t*>(buf);
297 [ + - ]: 538 : m_data.insert(m_data.end(), b, b + len);
298 : 538 : m_cond.notify_all();
299 : 538 : }
300 : :
301 : 144 : void DynSock::Pipe::Eof()
302 : : {
303 : 144 : LOCK(m_mutex);
304 : 144 : m_eof = true;
305 : 144 : m_cond.notify_all();
306 : 144 : }
307 : :
308 : 144 : DynSock::DynSock(std::shared_ptr<Pipes> pipes) : m_pipes{pipes} {}
309 : :
310 : 288 : DynSock::~DynSock()
311 : 288 : {
312 [ + - ]: 144 : m_pipes->send.Eof();
313 : 288 : }
314 : :
315 : 170 : ssize_t DynSock::Recv(void* buf, size_t len, int flags) const
316 : : {
317 : 170 : return m_pipes->recv.GetBytes(buf, len, flags);
318 : : }
319 : :
320 : 396 : ssize_t DynSock::Send(const void* buf, size_t len, int) const
321 : : {
322 : 396 : m_pipes->send.PushBytes(buf, len);
323 : 396 : return len;
324 : : }
325 : :
326 : 0 : bool DynSock::Wait(std::chrono::milliseconds timeout,
327 : : Event requested,
328 : : Event* occurred) const
329 : : {
330 : 0 : EventsPerSock ev;
331 [ # # ][ # # ]: 0 : ev.emplace(this, Events{requested});
332 [ # # ]: 0 : const bool ret{WaitMany(timeout, ev)};
333 [ # # ]: 0 : if (occurred != nullptr) {
334 : 0 : *occurred = ev.begin()->second.occurred;
335 : 0 : }
336 : 0 : return ret;
337 : 0 : }
338 : :
339 : 170 : bool DynSock::WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) const
340 : : {
341 : 170 : const auto deadline = std::chrono::steady_clock::now() + timeout;
342 : 170 : bool at_least_one_event_occurred{false};
343 : :
344 : 171 : for (;;) {
345 : : // Check all sockets for readiness without waiting.
346 [ + + ]: 342 : for (auto& [sock, events] : events_per_sock) {
347 [ + - ]: 171 : if ((events.requested & Sock::SEND) != 0) {
348 : : // Always ready for Send().
349 : 0 : events.occurred |= Sock::SEND;
350 : 0 : at_least_one_event_occurred = true;
351 : 0 : }
352 : :
353 [ - + ]: 171 : if ((events.requested & Sock::RECV) != 0) {
354 : 171 : auto dyn_sock = reinterpret_cast<const DynSock*>(sock.get());
355 : : uint8_t b;
356 [ + + ]: 171 : if (dyn_sock->m_pipes->recv.GetBytes(&b, 1, MSG_PEEK) == 1) {
357 : 170 : events.occurred |= Sock::RECV;
358 : 170 : at_least_one_event_occurred = true;
359 : 170 : }
360 : 171 : }
361 : : }
362 : :
363 [ + + ][ + + ]: 171 : if (at_least_one_event_occurred || std::chrono::steady_clock::now() > deadline) {
364 : 170 : break;
365 : : }
366 : :
367 : 1 : std::this_thread::sleep_for(10ms);
368 : : }
369 : :
370 : 170 : return true;
371 : : }
372 : :
373 : 0 : DynSock& DynSock::operator=(Sock&&)
374 : : {
375 : 0 : assert(false && "Move of Sock into DynSock not allowed.");
376 : : return *this;
377 : : }
|