Skip to content

Commit 754e38b

Browse files
committed
.
1 parent 9c592d0 commit 754e38b

File tree

6 files changed

+258
-173
lines changed

6 files changed

+258
-173
lines changed

src/test/core/Workers_test.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,8 @@ class PerfLogTest : public PerfLog
9999
}
100100

101101
Peer&
102-
getPeerCounters() override {
102+
getPeerCounters() override
103+
{
103104
return peer;
104105
}
105106
};

src/xrpld/overlay/detail/ConnectAttempt.cpp

Lines changed: 57 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ ConnectAttempt::stop()
7474
{
7575
JLOG(journal_.debug()) << "Stop";
7676
}
77-
++app_.getPerfLog().getPeerCounters().connection.connectCloseStop;
77+
++app_.getPerfLog().getPeerCounters().connection.outboundConnectCloseStop;
7878
close();
7979
}
8080

@@ -153,10 +153,10 @@ ConnectAttempt::onTimer(error_code ec)
153153
{
154154
// This should never happen
155155
JLOG(journal_.error()) << "onTimer: " << ec.message();
156-
++app_.getPerfLog().getPeerCounters().connection.connectCloseOnTimer;
156+
++app_.getPerfLog().getPeerCounters().connection.outboundConnectCloseOnTimer;
157157
return close();
158158
}
159-
++app_.getPerfLog().getPeerCounters().connection.connectFailTimeouts;
159+
++app_.getPerfLog().getPeerCounters().connection.outboundConnectFailTimeouts;
160160
fail("Timeout");
161161
}
162162

@@ -170,8 +170,11 @@ ConnectAttempt::onConnect(error_code ec)
170170
endpoint_type local_endpoint;
171171
if (!ec)
172172
local_endpoint = socket_.local_endpoint(ec);
173-
if (ec) {
174-
++app_.getPerfLog().getPeerCounters().connection.connectFailOnConnectError;
173+
if (ec)
174+
{
175+
++app_.getPerfLog()
176+
.getPeerCounters()
177+
.connection.outboundConnectFailOnConnectError;
175178
return fail("onConnect", ec);
176179
}
177180
if (!socket_.is_open())
@@ -199,21 +202,30 @@ ConnectAttempt::onHandshake(error_code ec)
199202
endpoint_type local_endpoint;
200203
if (!ec)
201204
local_endpoint = socket_.local_endpoint(ec);
202-
if (ec) {
203-
++app_.getPerfLog().getPeerCounters().connection.connectFailOnHandshakeError;
205+
if (ec)
206+
{
207+
++app_.getPerfLog()
208+
.getPeerCounters()
209+
.connection.outboundConnectFailOnHandshakeError;
204210
return fail("onHandshake", ec);
205211
}
206212
JLOG(journal_.trace()) << "onHandshake";
207213

208214
if (!overlay_.peerFinder().onConnected(
209-
slot_, beast::IPAddressConversion::from_asio(local_endpoint))) {
210-
++app_.getPerfLog().getPeerCounters().connection.connectFailOnHandshakeDuplicate;
215+
slot_, beast::IPAddressConversion::from_asio(local_endpoint)))
216+
{
217+
++app_.getPerfLog()
218+
.getPeerCounters()
219+
.connection.outboundConnectFailOnHandshakeDuplicate;
211220
return fail("Duplicate connection");
212221
}
213222

214223
auto const sharedValue = makeSharedValue(*stream_ptr_, journal_);
215-
if (!sharedValue) {
216-
++app_.getPerfLog().getPeerCounters().connection.connectCloseOnHandshake;
224+
if (!sharedValue)
225+
{
226+
++app_.getPerfLog()
227+
.getPeerCounters()
228+
.connection.outboundConnectCloseOnHandshake;
217229
return close(); // makeSharedValue logs
218230
}
219231

@@ -250,8 +262,11 @@ ConnectAttempt::onWrite(error_code ec)
250262
return;
251263
if (ec == boost::asio::error::operation_aborted)
252264
return;
253-
if (ec) {
254-
++app_.getPerfLog().getPeerCounters().connection.connectFailOnWriteError;
265+
if (ec)
266+
{
267+
++app_.getPerfLog()
268+
.getPeerCounters()
269+
.connection.outboundConnectFailOnWriteError;
255270
return fail("onWrite", ec);
256271
}
257272
boost::beast::http::async_read(
@@ -282,8 +297,9 @@ ConnectAttempt::onRead(error_code ec)
282297
shared_from_this(),
283298
std::placeholders::_1)));
284299
}
285-
if (ec) {
286-
++app_.getPerfLog().getPeerCounters().connection.connectFailOnReadError;
300+
if (ec)
301+
{
302+
++app_.getPerfLog().getPeerCounters().connection.outboundConnectFailOnReadError;
287303
return fail("onRead", ec);
288304
}
289305
processResponse();
@@ -296,14 +312,19 @@ ConnectAttempt::onShutdown(error_code ec)
296312
if (!ec)
297313
{
298314
JLOG(journal_.error()) << "onShutdown: expected error condition";
299-
++app_.getPerfLog().getPeerCounters().connection.connectCloseOnShutdownNoError;
315+
++app_.getPerfLog()
316+
.getPeerCounters()
317+
.connection.outboundConnectCloseOnShutdownNoError;
300318
return close();
301319
}
302-
if (ec != boost::asio::error::eof) {
303-
++app_.getPerfLog().getPeerCounters().connection.connectFailOnShutdownError;
320+
if (ec != boost::asio::error::eof)
321+
{
322+
++app_.getPerfLog()
323+
.getPeerCounters()
324+
.connection.outboundConnectFailOnShutdownError;
304325
return fail("onShutdown", ec);
305326
}
306-
++app_.getPerfLog().getPeerCounters().connection.connectCloseOnShutdown;
327+
++app_.getPerfLog().getPeerCounters().connection.outboundConnectCloseOnShutdown;
307328
close();
308329
}
309330

@@ -353,7 +374,7 @@ ConnectAttempt::processResponse()
353374
JLOG(journal_.info())
354375
<< "Unable to upgrade to peer protocol: " << response_.result()
355376
<< " (" << response_.reason() << ")";
356-
++app_.getPerfLog().getPeerCounters().connection.connectCloseUpgrade;
377+
++app_.getPerfLog().getPeerCounters().connection.outboundConnectCloseUpgrade;
357378
return close();
358379
}
359380

@@ -367,16 +388,20 @@ ConnectAttempt::processResponse()
367388
if (pvs.size() == 1 && isProtocolSupported(pvs[0]))
368389
negotiatedProtocol = pvs[0];
369390

370-
if (!negotiatedProtocol) {
371-
++app_.getPerfLog().getPeerCounters().connection.connectFailProtocol;
391+
if (!negotiatedProtocol)
392+
{
393+
++app_.getPerfLog()
394+
.getPeerCounters()
395+
.connection.outboundConnectFailProtocol;
372396
return fail(
373397
"processResponse: Unable to negotiate protocol version");
374398
}
375399
}
376400

377401
auto const sharedValue = makeSharedValue(*stream_ptr_, journal_);
378-
if (!sharedValue) {
379-
++app_.getPerfLog().getPeerCounters().connection.connectCloseShared;
402+
if (!sharedValue)
403+
{
404+
++app_.getPerfLog().getPeerCounters().connection.outboundConnectCloseShared;
380405
return close(); // makeSharedValue logs
381406
}
382407

@@ -404,8 +429,11 @@ ConnectAttempt::processResponse()
404429

405430
auto const result = overlay_.peerFinder().activate(
406431
slot_, publicKey, static_cast<bool>(member));
407-
if (result != PeerFinder::Result::success) {
408-
++app_.getPerfLog().getPeerCounters().connection.connectFailSlotsFull;
432+
if (result != PeerFinder::Result::success)
433+
{
434+
++app_.getPerfLog()
435+
.getPeerCounters()
436+
.connection.outboundConnectFailSlotsFull;
409437
return fail("Outbound slots full");
410438
}
411439

@@ -425,7 +453,9 @@ ConnectAttempt::processResponse()
425453
}
426454
catch (std::exception const& e)
427455
{
428-
++app_.getPerfLog().getPeerCounters().connection.connectFailOnHandshakeFailure;
456+
++app_.getPerfLog()
457+
.getPeerCounters()
458+
.connection.outboundConnectFailOnHandshakeFailure;
429459
return fail(std::string("Handshake failure (") + e.what() + ")");
430460
}
431461
}

src/xrpld/overlay/detail/PeerImp.cpp

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -243,11 +243,13 @@ PeerImp::send(std::shared_ptr<Message> const& m)
243243
if (!strand_.running_in_this_thread())
244244
return post(strand_, std::bind(&PeerImp::send, shared_from_this(), m));
245245
auto& peerCounters = app_.getPerfLog().getPeerCounters();
246-
if (gracefulClose_) {
246+
if (gracefulClose_)
247+
{
247248
++peerCounters.send.sendQueueFailedGracefulClose;
248249
return;
249250
}
250-
if (detaching_) {
251+
if (detaching_)
252+
{
251253
++peerCounters.send.sendQueueFailedDetaching;
252254
return;
253255
}
@@ -270,8 +272,7 @@ PeerImp::send(std::shared_ptr<Message> const& m)
270272

271273
// report total outgoing traffic
272274
overlay_.reportOutboundTraffic(
273-
TrafficCount::category::total,
274-
static_cast<int>(msgSize));
275+
TrafficCount::category::total, static_cast<int>(msgSize));
275276

276277
auto sendq_size = send_queue_.size();
277278

@@ -291,8 +292,10 @@ PeerImp::send(std::shared_ptr<Message> const& m)
291292
}
292293

293294
send_queue_.push(m);
294-
peerCounters.queuedPeerMessage(m->getType(
295-
m->getBuffer(compressionEnabled_).data()), msgSize, journal_);
295+
peerCounters.queuedPeerMessage(
296+
m->getType(m->getBuffer(compressionEnabled_).data()),
297+
msgSize,
298+
journal_);
296299

297300
if (sendq_size != 0)
298301
return;
@@ -367,9 +370,13 @@ PeerImp::charge(Resource::Charge const& fee, std::string const& context)
367370
// Sever the connection
368371
overlay_.incPeerDisconnectCharges();
369372
if (inbound_)
370-
++app_.getPerfLog().getPeerCounters().connection.disconnectInboundResources;
373+
++app_.getPerfLog()
374+
.getPeerCounters()
375+
.connection.disconnectInboundResources;
371376
else
372-
++app_.getPerfLog().getPeerCounters().connection.disconnectOutboundResources;
377+
++app_.getPerfLog()
378+
.getPeerCounters()
379+
.connection.disconnectOutboundResources;
373380
fail("charge: Resources");
374381
}
375382
}
@@ -602,12 +609,16 @@ PeerImp::close()
602609
if (inbound_)
603610
{
604611
JLOG(journal_.debug()) << "Closed";
605-
++app_.getPerfLog().getPeerCounters().connection.totalInboundDisconnects;
612+
++app_.getPerfLog()
613+
.getPeerCounters()
614+
.connection.totalInboundDisconnects;
606615
}
607616
else
608617
{
609618
JLOG(journal_.info()) << "Closed";
610-
++app_.getPerfLog().getPeerCounters().connection.totalOutboundDisconnects;
619+
++app_.getPerfLog()
620+
.getPeerCounters()
621+
.connection.totalOutboundDisconnects;
611622
}
612623
}
613624
}
@@ -939,8 +950,8 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred)
939950
using namespace std::chrono_literals;
940951
std::tie(bytes_consumed, ec) = perf::measureDurationAndLog(
941952
[&]() {
942-
return invokeProtocolMessage(read_buffer_.data(), *this, hint,
943-
peerCounters, journal_);
953+
return invokeProtocolMessage(
954+
read_buffer_.data(), *this, hint, peerCounters, journal_);
944955
},
945956
"invokeProtocolMessage",
946957
350ms,
@@ -973,15 +984,18 @@ void
973984
PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred)
974985
{
975986
auto& peerCounters = app_.getPerfLog().getPeerCounters();
976-
if (!socket_.is_open()) {
987+
if (!socket_.is_open())
988+
{
977989
++peerCounters.send.sendFailedClosed;
978990
return;
979991
}
980-
if (ec == boost::asio::error::operation_aborted) {
992+
if (ec == boost::asio::error::operation_aborted)
993+
{
981994
++peerCounters.send.sendFailedAborted;
982995
return;
983996
}
984-
if (ec) {
997+
if (ec)
998+
{
985999
++peerCounters.send.sendFailedOther;
9861000
return fail("onWriteMessage", ec);
9871001
}

src/xrpld/overlay/detail/ProtocolMessage.h

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#include <xrpld/overlay/Message.h>
2525
#include <xrpld/overlay/detail/ZeroCopyStream.h>
2626
#include <xrpld/perflog/PerfLog.h>
27+
2728
#include <xrpl/beast/utility/instrumentation.h>
2829
#include <xrpl/protocol/messages.h>
2930

@@ -344,7 +345,8 @@ invokeProtocolMessage(
344345

345346
auto const size = boost::asio::buffer_size(buffers);
346347

347-
if (size == 0) {
348+
if (size == 0)
349+
{
348350
++peerCounters.receive.receiveFailedZeroSize;
349351
return result;
350352
}
@@ -356,7 +358,8 @@ invokeProtocolMessage(
356358
// Otherwise we failed to match the header's marker (error_code is set to
357359
// no_message) or the compression algorithm is invalid (error_code is
358360
// protocol_error) and signal an error.
359-
if (!header) {
361+
if (!header)
362+
{
360363
++peerCounters.receive.receiveFailedHeader;
361364
return result;
362365
}
@@ -392,7 +395,8 @@ invokeProtocolMessage(
392395

393396
bool success;
394397

395-
peerCounters.receivedPeerMessage(header->message_type, header->payload_wire_size, j);
398+
peerCounters.receivedPeerMessage(
399+
header->message_type, header->payload_wire_size, j);
396400
switch (header->message_type)
397401
{
398402
case protocol::mtMANIFESTS:

0 commit comments

Comments
 (0)