Skip to content

Commit a9739b7

Browse files
committed
Add detailed peer statistics to PerfLog for both
perf_log and server_info counters.
1 parent d494bf4 commit a9739b7

File tree

10 files changed

+502
-19
lines changed

10 files changed

+502
-19
lines changed

src/test/core/Workers_test.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ namespace perf {
3939

4040
class PerfLogTest : public PerfLog
4141
{
42+
Peer peer;
43+
4244
void
4345
rpcStart(std::string const& method, std::uint64_t requestId) override
4446
{
@@ -95,6 +97,12 @@ class PerfLogTest : public PerfLog
9597
rotate() override
9698
{
9799
}
100+
101+
Peer&
102+
getPeerCounters() override
103+
{
104+
return peer;
105+
}
98106
};
99107

100108
} // namespace perf

src/xrpld/overlay/Message.h

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,15 @@ class Message : public std::enable_shared_from_this<Message>
9696
return validatorKey_;
9797
}
9898

99+
/** Get the message type from the payload header.
100+
* First four bytes are the compression/algorithm flag and the payload size.
101+
* Next two bytes are the message type
102+
* @param in Payload header pointer
103+
* @return Message type
104+
*/
105+
int
106+
getType(std::uint8_t const* in) const;
107+
99108
private:
100109
std::vector<uint8_t> buffer_;
101110
std::vector<uint8_t> bufferCompressed_;
@@ -125,15 +134,6 @@ class Message : public std::enable_shared_from_this<Message>
125134
*/
126135
void
127136
compress();
128-
129-
/** Get the message type from the payload header.
130-
* First four bytes are the compression/algorithm flag and the payload size.
131-
* Next two bytes are the message type
132-
* @param in Payload header pointer
133-
* @return Message type
134-
*/
135-
int
136-
getType(std::uint8_t const* in) const;
137137
};
138138

139139
} // namespace ripple

src/xrpld/overlay/detail/ConnectAttempt.cpp

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include <xrpld/overlay/detail/ConnectAttempt.h>
2222
#include <xrpld/overlay/detail/PeerImp.h>
2323
#include <xrpld/overlay/detail/ProtocolVersion.h>
24+
#include <xrpld/perflog/PerfLog.h>
2425

2526
#include <xrpl/json/json_reader.h>
2627

@@ -52,6 +53,7 @@ ConnectAttempt::ConnectAttempt(
5253
, stream_(*stream_ptr_)
5354
, slot_(slot)
5455
{
56+
++app_.getPerfLog().getPeerCounters().connection.totalOutboundAttempts;
5557
JLOG(journal_.debug()) << "Connect " << remote_endpoint;
5658
}
5759

@@ -72,6 +74,7 @@ ConnectAttempt::stop()
7274
{
7375
JLOG(journal_.debug()) << "Stop";
7476
}
77+
++app_.getPerfLog().getPeerCounters().connection.outboundConnectCloseStop;
7578
close();
7679
}
7780

@@ -150,8 +153,14 @@ ConnectAttempt::onTimer(error_code ec)
150153
{
151154
// This should never happen
152155
JLOG(journal_.error()) << "onTimer: " << ec.message();
156+
++app_.getPerfLog()
157+
.getPeerCounters()
158+
.connection.outboundConnectCloseOnTimer;
153159
return close();
154160
}
161+
++app_.getPerfLog()
162+
.getPeerCounters()
163+
.connection.outboundConnectFailTimeouts;
155164
fail("Timeout");
156165
}
157166

@@ -166,7 +175,12 @@ ConnectAttempt::onConnect(error_code ec)
166175
if (!ec)
167176
local_endpoint = socket_.local_endpoint(ec);
168177
if (ec)
178+
{
179+
++app_.getPerfLog()
180+
.getPeerCounters()
181+
.connection.outboundConnectFailOnConnectError;
169182
return fail("onConnect", ec);
183+
}
170184
if (!socket_.is_open())
171185
return;
172186
JLOG(journal_.trace()) << "onConnect";
@@ -193,16 +207,31 @@ ConnectAttempt::onHandshake(error_code ec)
193207
if (!ec)
194208
local_endpoint = socket_.local_endpoint(ec);
195209
if (ec)
210+
{
211+
++app_.getPerfLog()
212+
.getPeerCounters()
213+
.connection.outboundConnectFailOnHandshakeError;
196214
return fail("onHandshake", ec);
215+
}
197216
JLOG(journal_.trace()) << "onHandshake";
198217

199218
if (!overlay_.peerFinder().onConnected(
200219
slot_, beast::IPAddressConversion::from_asio(local_endpoint)))
220+
{
221+
++app_.getPerfLog()
222+
.getPeerCounters()
223+
.connection.outboundConnectFailOnHandshakeDuplicate;
201224
return fail("Duplicate connection");
225+
}
202226

203227
auto const sharedValue = makeSharedValue(*stream_ptr_, journal_);
204228
if (!sharedValue)
229+
{
230+
++app_.getPerfLog()
231+
.getPeerCounters()
232+
.connection.outboundConnectCloseOnHandshake;
205233
return close(); // makeSharedValue logs
234+
}
206235

207236
req_ = makeRequest(
208237
!overlay_.peerFinder().config().peerPrivate,
@@ -238,7 +267,12 @@ ConnectAttempt::onWrite(error_code ec)
238267
if (ec == boost::asio::error::operation_aborted)
239268
return;
240269
if (ec)
270+
{
271+
++app_.getPerfLog()
272+
.getPeerCounters()
273+
.connection.outboundConnectFailOnWriteError;
241274
return fail("onWrite", ec);
275+
}
242276
boost::beast::http::async_read(
243277
stream_,
244278
read_buf_,
@@ -268,7 +302,12 @@ ConnectAttempt::onRead(error_code ec)
268302
std::placeholders::_1)));
269303
}
270304
if (ec)
305+
{
306+
++app_.getPerfLog()
307+
.getPeerCounters()
308+
.connection.outboundConnectFailOnReadError;
271309
return fail("onRead", ec);
310+
}
272311
processResponse();
273312
}
274313

@@ -279,10 +318,21 @@ ConnectAttempt::onShutdown(error_code ec)
279318
if (!ec)
280319
{
281320
JLOG(journal_.error()) << "onShutdown: expected error condition";
321+
++app_.getPerfLog()
322+
.getPeerCounters()
323+
.connection.outboundConnectCloseOnShutdownNoError;
282324
return close();
283325
}
284326
if (ec != boost::asio::error::eof)
327+
{
328+
++app_.getPerfLog()
329+
.getPeerCounters()
330+
.connection.outboundConnectFailOnShutdownError;
285331
return fail("onShutdown", ec);
332+
}
333+
++app_.getPerfLog()
334+
.getPeerCounters()
335+
.connection.outboundConnectCloseOnShutdown;
286336
close();
287337
}
288338

@@ -332,6 +382,9 @@ ConnectAttempt::processResponse()
332382
JLOG(journal_.info())
333383
<< "Unable to upgrade to peer protocol: " << response_.result()
334384
<< " (" << response_.reason() << ")";
385+
++app_.getPerfLog()
386+
.getPeerCounters()
387+
.connection.outboundConnectCloseUpgrade;
335388
return close();
336389
}
337390

@@ -346,13 +399,23 @@ ConnectAttempt::processResponse()
346399
negotiatedProtocol = pvs[0];
347400

348401
if (!negotiatedProtocol)
402+
{
403+
++app_.getPerfLog()
404+
.getPeerCounters()
405+
.connection.outboundConnectFailProtocol;
349406
return fail(
350407
"processResponse: Unable to negotiate protocol version");
408+
}
351409
}
352410

353411
auto const sharedValue = makeSharedValue(*stream_ptr_, journal_);
354412
if (!sharedValue)
413+
{
414+
++app_.getPerfLog()
415+
.getPeerCounters()
416+
.connection.outboundConnectCloseShared;
355417
return close(); // makeSharedValue logs
418+
}
356419

357420
try
358421
{
@@ -379,7 +442,12 @@ ConnectAttempt::processResponse()
379442
auto const result = overlay_.peerFinder().activate(
380443
slot_, publicKey, static_cast<bool>(member));
381444
if (result != PeerFinder::Result::success)
445+
{
446+
++app_.getPerfLog()
447+
.getPeerCounters()
448+
.connection.outboundConnectFailSlotsFull;
382449
return fail("Outbound slots full");
450+
}
383451

384452
auto const peer = std::make_shared<PeerImp>(
385453
app_,
@@ -397,6 +465,9 @@ ConnectAttempt::processResponse()
397465
}
398466
catch (std::exception const& e)
399467
{
468+
++app_.getPerfLog()
469+
.getPeerCounters()
470+
.connection.outboundConnectFailOnHandshakeFailure;
400471
return fail(std::string("Handshake failure (") + e.what() + ")");
401472
}
402473
}

src/xrpld/overlay/detail/OverlayImpl.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,7 @@ OverlayImpl::onHandoff(
165165
http_request_type&& request,
166166
endpoint_type remote_endpoint)
167167
{
168+
++app_.getPerfLog().getPeerCounters().connection.totalInboundAttempts;
168169
auto const id = next_id_++;
169170
beast::WrappedSink sink(app_.logs()["Peer"], makePrefix(id));
170171
beast::Journal journal(sink);

src/xrpld/overlay/detail/PeerImp.cpp

Lines changed: 47 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ PeerImp::PeerImp(
119119
app_.config().LEDGER_REPLAY))
120120
, ledgerReplayMsgHandler_(app, app.getLedgerReplayer())
121121
{
122+
++app_.getPerfLog().getPeerCounters().connection.totalInboundConnects;
122123
JLOG(journal_.info())
123124
<< "compression enabled " << (compressionEnabled_ == Compressed::On)
124125
<< " vp reduce-relay base squelch enabled "
@@ -241,29 +242,37 @@ PeerImp::send(std::shared_ptr<Message> const& m)
241242
{
242243
if (!strand_.running_in_this_thread())
243244
return post(strand_, std::bind(&PeerImp::send, shared_from_this(), m));
245+
auto& peerCounters = app_.getPerfLog().getPeerCounters();
244246
if (gracefulClose_)
247+
{
248+
++peerCounters.send.sendQueueFailedGracefulClose;
245249
return;
250+
}
246251
if (detaching_)
252+
{
253+
++peerCounters.send.sendQueueFailedDetaching;
247254
return;
255+
}
248256

257+
std::size_t const msgSize = m->getBuffer(compressionEnabled_).size();
249258
auto validator = m->getValidatorKey();
250259
if (validator && !squelch_.expireSquelch(*validator))
251260
{
261+
++peerCounters.send.sendQueueFailedSquelch;
252262
overlay_.reportOutboundTraffic(
253263
TrafficCount::category::squelch_suppressed,
254-
static_cast<int>(m->getBuffer(compressionEnabled_).size()));
264+
static_cast<int>(msgSize));
255265
return;
256266
}
257267

258268
// report categorized outgoing traffic
259269
overlay_.reportOutboundTraffic(
260270
safe_cast<TrafficCount::category>(m->getCategory()),
261-
static_cast<int>(m->getBuffer(compressionEnabled_).size()));
271+
static_cast<int>(msgSize));
262272

263273
// report total outgoing traffic
264274
overlay_.reportOutboundTraffic(
265-
TrafficCount::category::total,
266-
static_cast<int>(m->getBuffer(compressionEnabled_).size()));
275+
TrafficCount::category::total, static_cast<int>(msgSize));
267276

268277
auto sendq_size = send_queue_.size();
269278

@@ -283,6 +292,10 @@ PeerImp::send(std::shared_ptr<Message> const& m)
283292
}
284293

285294
send_queue_.push(m);
295+
peerCounters.queuedPeerMessage(
296+
m->getType(m->getBuffer(compressionEnabled_).data()),
297+
msgSize,
298+
journal_);
286299

287300
if (sendq_size != 0)
288301
return;
@@ -356,6 +369,14 @@ PeerImp::charge(Resource::Charge const& fee, std::string const& context)
356369
{
357370
// Sever the connection
358371
overlay_.incPeerDisconnectCharges();
372+
if (inbound_)
373+
++app_.getPerfLog()
374+
.getPeerCounters()
375+
.connection.disconnectInboundResources;
376+
else
377+
++app_.getPerfLog()
378+
.getPeerCounters()
379+
.connection.disconnectOutboundResources;
359380
fail("charge: Resources");
360381
}
361382
}
@@ -588,10 +609,16 @@ PeerImp::close()
588609
if (inbound_)
589610
{
590611
JLOG(journal_.debug()) << "Closed";
612+
++app_.getPerfLog()
613+
.getPeerCounters()
614+
.connection.totalInboundDisconnects;
591615
}
592616
else
593617
{
594618
JLOG(journal_.info()) << "Closed";
619+
++app_.getPerfLog()
620+
.getPeerCounters()
621+
.connection.totalOutboundDisconnects;
595622
}
596623
}
597624
}
@@ -889,6 +916,8 @@ PeerImp::doProtocolStart()
889916
void
890917
PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred)
891918
{
919+
auto& peerCounters = app_.getPerfLog().getPeerCounters();
920+
++peerCounters.receive.receivePackets;
892921
if (!socket_.is_open())
893922
return;
894923
if (ec == boost::asio::error::operation_aborted)
@@ -921,7 +950,8 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred)
921950
using namespace std::chrono_literals;
922951
std::tie(bytes_consumed, ec) = perf::measureDurationAndLog(
923952
[&]() {
924-
return invokeProtocolMessage(read_buffer_.data(), *this, hint);
953+
return invokeProtocolMessage(
954+
read_buffer_.data(), *this, hint, peerCounters, journal_);
925955
},
926956
"invokeProtocolMessage",
927957
350ms,
@@ -953,12 +983,24 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred)
953983
void
954984
PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred)
955985
{
986+
auto& peerCounters = app_.getPerfLog().getPeerCounters();
956987
if (!socket_.is_open())
988+
{
989+
++peerCounters.send.sendFailedClosed;
957990
return;
991+
}
958992
if (ec == boost::asio::error::operation_aborted)
993+
{
994+
++peerCounters.send.sendFailedAborted;
959995
return;
996+
}
960997
if (ec)
998+
{
999+
++peerCounters.send.sendFailedOther;
9611000
return fail("onWriteMessage", ec);
1001+
}
1002+
++peerCounters.send.sent;
1003+
peerCounters.send.sentBytes += bytes_transferred;
9621004
if (auto stream = journal_.trace())
9631005
{
9641006
if (bytes_transferred > 0)

0 commit comments

Comments
 (0)