Skip to content

Commit 9c592d0

Browse files
committed
Add detailed peer statistics to PerfLog for both
perf_log and server_info counters.
1 parent d494bf4 commit 9c592d0

File tree

10 files changed

+411
-36
lines changed

10 files changed

+411
-36
lines changed

src/test/core/Workers_test.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ namespace perf {
3939

4040
class PerfLogTest : public PerfLog
4141
{
42+
Peer peer;
43+
4244
void
4345
rpcStart(std::string const& method, std::uint64_t requestId) override
4446
{
@@ -95,6 +97,11 @@ class PerfLogTest : public PerfLog
9597
rotate() override
9698
{
9799
}
100+
101+
Peer&
102+
getPeerCounters() override {
103+
return peer;
104+
}
98105
};
99106

100107
} // namespace perf

src/xrpld/overlay/Message.h

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,15 @@ class Message : public std::enable_shared_from_this<Message>
9696
return validatorKey_;
9797
}
9898

99+
/** Get the message type from the payload header.
100+
* First four bytes are the compression/algorithm flag and the payload size.
101+
* Next two bytes are the message type
102+
* @param in Payload header pointer
103+
* @return Message type
104+
*/
105+
int
106+
getType(std::uint8_t const* in) const;
107+
99108
private:
100109
std::vector<uint8_t> buffer_;
101110
std::vector<uint8_t> bufferCompressed_;
@@ -125,15 +134,6 @@ class Message : public std::enable_shared_from_this<Message>
125134
*/
126135
void
127136
compress();
128-
129-
/** Get the message type from the payload header.
130-
* First four bytes are the compression/algorithm flag and the payload size.
131-
* Next two bytes are the message type
132-
* @param in Payload header pointer
133-
* @return Message type
134-
*/
135-
int
136-
getType(std::uint8_t const* in) const;
137137
};
138138

139139
} // namespace ripple

src/xrpld/overlay/detail/ConnectAttempt.cpp

Lines changed: 39 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include <xrpld/overlay/detail/ConnectAttempt.h>
2222
#include <xrpld/overlay/detail/PeerImp.h>
2323
#include <xrpld/overlay/detail/ProtocolVersion.h>
24+
#include <xrpld/perflog/PerfLog.h>
2425

2526
#include <xrpl/json/json_reader.h>
2627

@@ -52,6 +53,7 @@ ConnectAttempt::ConnectAttempt(
5253
, stream_(*stream_ptr_)
5354
, slot_(slot)
5455
{
56+
++app_.getPerfLog().getPeerCounters().connection.totalOutboundAttempts;
5557
JLOG(journal_.debug()) << "Connect " << remote_endpoint;
5658
}
5759

@@ -72,6 +74,7 @@ ConnectAttempt::stop()
7274
{
7375
JLOG(journal_.debug()) << "Stop";
7476
}
77+
++app_.getPerfLog().getPeerCounters().connection.connectCloseStop;
7578
close();
7679
}
7780

@@ -150,8 +153,10 @@ ConnectAttempt::onTimer(error_code ec)
150153
{
151154
// This should never happen
152155
JLOG(journal_.error()) << "onTimer: " << ec.message();
156+
++app_.getPerfLog().getPeerCounters().connection.connectCloseOnTimer;
153157
return close();
154158
}
159+
++app_.getPerfLog().getPeerCounters().connection.connectFailTimeouts;
155160
fail("Timeout");
156161
}
157162

@@ -165,8 +170,10 @@ ConnectAttempt::onConnect(error_code ec)
165170
endpoint_type local_endpoint;
166171
if (!ec)
167172
local_endpoint = socket_.local_endpoint(ec);
168-
if (ec)
173+
if (ec) {
174+
++app_.getPerfLog().getPeerCounters().connection.connectFailOnConnectError;
169175
return fail("onConnect", ec);
176+
}
170177
if (!socket_.is_open())
171178
return;
172179
JLOG(journal_.trace()) << "onConnect";
@@ -192,17 +199,23 @@ ConnectAttempt::onHandshake(error_code ec)
192199
endpoint_type local_endpoint;
193200
if (!ec)
194201
local_endpoint = socket_.local_endpoint(ec);
195-
if (ec)
202+
if (ec) {
203+
++app_.getPerfLog().getPeerCounters().connection.connectFailOnHandshakeError;
196204
return fail("onHandshake", ec);
205+
}
197206
JLOG(journal_.trace()) << "onHandshake";
198207

199208
if (!overlay_.peerFinder().onConnected(
200-
slot_, beast::IPAddressConversion::from_asio(local_endpoint)))
209+
slot_, beast::IPAddressConversion::from_asio(local_endpoint))) {
210+
++app_.getPerfLog().getPeerCounters().connection.connectFailOnHandshakeDuplicate;
201211
return fail("Duplicate connection");
212+
}
202213

203214
auto const sharedValue = makeSharedValue(*stream_ptr_, journal_);
204-
if (!sharedValue)
215+
if (!sharedValue) {
216+
++app_.getPerfLog().getPeerCounters().connection.connectCloseOnHandshake;
205217
return close(); // makeSharedValue logs
218+
}
206219

207220
req_ = makeRequest(
208221
!overlay_.peerFinder().config().peerPrivate,
@@ -237,8 +250,10 @@ ConnectAttempt::onWrite(error_code ec)
237250
return;
238251
if (ec == boost::asio::error::operation_aborted)
239252
return;
240-
if (ec)
253+
if (ec) {
254+
++app_.getPerfLog().getPeerCounters().connection.connectFailOnWriteError;
241255
return fail("onWrite", ec);
256+
}
242257
boost::beast::http::async_read(
243258
stream_,
244259
read_buf_,
@@ -267,8 +282,10 @@ ConnectAttempt::onRead(error_code ec)
267282
shared_from_this(),
268283
std::placeholders::_1)));
269284
}
270-
if (ec)
285+
if (ec) {
286+
++app_.getPerfLog().getPeerCounters().connection.connectFailOnReadError;
271287
return fail("onRead", ec);
288+
}
272289
processResponse();
273290
}
274291

@@ -279,10 +296,14 @@ ConnectAttempt::onShutdown(error_code ec)
279296
if (!ec)
280297
{
281298
JLOG(journal_.error()) << "onShutdown: expected error condition";
299+
++app_.getPerfLog().getPeerCounters().connection.connectCloseOnShutdownNoError;
282300
return close();
283301
}
284-
if (ec != boost::asio::error::eof)
302+
if (ec != boost::asio::error::eof) {
303+
++app_.getPerfLog().getPeerCounters().connection.connectFailOnShutdownError;
285304
return fail("onShutdown", ec);
305+
}
306+
++app_.getPerfLog().getPeerCounters().connection.connectCloseOnShutdown;
286307
close();
287308
}
288309

@@ -332,6 +353,7 @@ ConnectAttempt::processResponse()
332353
JLOG(journal_.info())
333354
<< "Unable to upgrade to peer protocol: " << response_.result()
334355
<< " (" << response_.reason() << ")";
356+
++app_.getPerfLog().getPeerCounters().connection.connectCloseUpgrade;
335357
return close();
336358
}
337359

@@ -345,14 +367,18 @@ ConnectAttempt::processResponse()
345367
if (pvs.size() == 1 && isProtocolSupported(pvs[0]))
346368
negotiatedProtocol = pvs[0];
347369

348-
if (!negotiatedProtocol)
370+
if (!negotiatedProtocol) {
371+
++app_.getPerfLog().getPeerCounters().connection.connectFailProtocol;
349372
return fail(
350373
"processResponse: Unable to negotiate protocol version");
374+
}
351375
}
352376

353377
auto const sharedValue = makeSharedValue(*stream_ptr_, journal_);
354-
if (!sharedValue)
378+
if (!sharedValue) {
379+
++app_.getPerfLog().getPeerCounters().connection.connectCloseShared;
355380
return close(); // makeSharedValue logs
381+
}
356382

357383
try
358384
{
@@ -378,8 +404,10 @@ ConnectAttempt::processResponse()
378404

379405
auto const result = overlay_.peerFinder().activate(
380406
slot_, publicKey, static_cast<bool>(member));
381-
if (result != PeerFinder::Result::success)
407+
if (result != PeerFinder::Result::success) {
408+
++app_.getPerfLog().getPeerCounters().connection.connectFailSlotsFull;
382409
return fail("Outbound slots full");
410+
}
383411

384412
auto const peer = std::make_shared<PeerImp>(
385413
app_,
@@ -397,6 +425,7 @@ ConnectAttempt::processResponse()
397425
}
398426
catch (std::exception const& e)
399427
{
428+
++app_.getPerfLog().getPeerCounters().connection.connectFailOnHandshakeFailure;
400429
return fail(std::string("Handshake failure (") + e.what() + ")");
401430
}
402431
}

src/xrpld/overlay/detail/OverlayImpl.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,7 @@ OverlayImpl::onHandoff(
165165
http_request_type&& request,
166166
endpoint_type remote_endpoint)
167167
{
168+
++app_.getPerfLog().getPeerCounters().connection.totalInboundAttempts;
168169
auto const id = next_id_++;
169170
beast::WrappedSink sink(app_.logs()["Peer"], makePrefix(id));
170171
beast::Journal journal(sink);

src/xrpld/overlay/detail/PeerImp.cpp

Lines changed: 37 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ PeerImp::PeerImp(
119119
app_.config().LEDGER_REPLAY))
120120
, ledgerReplayMsgHandler_(app, app.getLedgerReplayer())
121121
{
122+
++app_.getPerfLog().getPeerCounters().connection.totalInboundConnects;
122123
JLOG(journal_.info())
123124
<< "compression enabled " << (compressionEnabled_ == Compressed::On)
124125
<< " vp reduce-relay base squelch enabled "
@@ -241,29 +242,36 @@ PeerImp::send(std::shared_ptr<Message> const& m)
241242
{
242243
if (!strand_.running_in_this_thread())
243244
return post(strand_, std::bind(&PeerImp::send, shared_from_this(), m));
244-
if (gracefulClose_)
245+
auto& peerCounters = app_.getPerfLog().getPeerCounters();
246+
if (gracefulClose_) {
247+
++peerCounters.send.sendQueueFailedGracefulClose;
245248
return;
246-
if (detaching_)
249+
}
250+
if (detaching_) {
251+
++peerCounters.send.sendQueueFailedDetaching;
247252
return;
253+
}
248254

255+
std::size_t const msgSize = m->getBuffer(compressionEnabled_).size();
249256
auto validator = m->getValidatorKey();
250257
if (validator && !squelch_.expireSquelch(*validator))
251258
{
259+
++peerCounters.send.sendQueueFailedSquelch;
252260
overlay_.reportOutboundTraffic(
253261
TrafficCount::category::squelch_suppressed,
254-
static_cast<int>(m->getBuffer(compressionEnabled_).size()));
262+
static_cast<int>(msgSize));
255263
return;
256264
}
257265

258266
// report categorized outgoing traffic
259267
overlay_.reportOutboundTraffic(
260268
safe_cast<TrafficCount::category>(m->getCategory()),
261-
static_cast<int>(m->getBuffer(compressionEnabled_).size()));
269+
static_cast<int>(msgSize));
262270

263271
// report total outgoing traffic
264272
overlay_.reportOutboundTraffic(
265273
TrafficCount::category::total,
266-
static_cast<int>(m->getBuffer(compressionEnabled_).size()));
274+
static_cast<int>(msgSize));
267275

268276
auto sendq_size = send_queue_.size();
269277

@@ -283,6 +291,8 @@ PeerImp::send(std::shared_ptr<Message> const& m)
283291
}
284292

285293
send_queue_.push(m);
294+
peerCounters.queuedPeerMessage(m->getType(
295+
m->getBuffer(compressionEnabled_).data()), msgSize, journal_);
286296

287297
if (sendq_size != 0)
288298
return;
@@ -356,6 +366,10 @@ PeerImp::charge(Resource::Charge const& fee, std::string const& context)
356366
{
357367
// Sever the connection
358368
overlay_.incPeerDisconnectCharges();
369+
if (inbound_)
370+
++app_.getPerfLog().getPeerCounters().connection.disconnectInboundResources;
371+
else
372+
++app_.getPerfLog().getPeerCounters().connection.disconnectOutboundResources;
359373
fail("charge: Resources");
360374
}
361375
}
@@ -588,10 +602,12 @@ PeerImp::close()
588602
if (inbound_)
589603
{
590604
JLOG(journal_.debug()) << "Closed";
605+
++app_.getPerfLog().getPeerCounters().connection.totalInboundDisconnects;
591606
}
592607
else
593608
{
594609
JLOG(journal_.info()) << "Closed";
610+
++app_.getPerfLog().getPeerCounters().connection.totalOutboundDisconnects;
595611
}
596612
}
597613
}
@@ -889,6 +905,8 @@ PeerImp::doProtocolStart()
889905
void
890906
PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred)
891907
{
908+
auto& peerCounters = app_.getPerfLog().getPeerCounters();
909+
++peerCounters.receive.receivePackets;
892910
if (!socket_.is_open())
893911
return;
894912
if (ec == boost::asio::error::operation_aborted)
@@ -921,7 +939,8 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred)
921939
using namespace std::chrono_literals;
922940
std::tie(bytes_consumed, ec) = perf::measureDurationAndLog(
923941
[&]() {
924-
return invokeProtocolMessage(read_buffer_.data(), *this, hint);
942+
return invokeProtocolMessage(read_buffer_.data(), *this, hint,
943+
peerCounters, journal_);
925944
},
926945
"invokeProtocolMessage",
927946
350ms,
@@ -953,12 +972,21 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred)
953972
void
954973
PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred)
955974
{
956-
if (!socket_.is_open())
975+
auto& peerCounters = app_.getPerfLog().getPeerCounters();
976+
if (!socket_.is_open()) {
977+
++peerCounters.send.sendFailedClosed;
957978
return;
958-
if (ec == boost::asio::error::operation_aborted)
979+
}
980+
if (ec == boost::asio::error::operation_aborted) {
981+
++peerCounters.send.sendFailedAborted;
959982
return;
960-
if (ec)
983+
}
984+
if (ec) {
985+
++peerCounters.send.sendFailedOther;
961986
return fail("onWriteMessage", ec);
987+
}
988+
++peerCounters.send.sent;
989+
peerCounters.send.sentBytes += bytes_transferred;
962990
if (auto stream = journal_.trace())
963991
{
964992
if (bytes_transferred > 0)

src/xrpld/overlay/detail/PeerImp.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include <xrpld/overlay/detail/OverlayImpl.h>
2727
#include <xrpld/overlay/detail/ProtocolVersion.h>
2828
#include <xrpld/peerfinder/PeerfinderManager.h>
29+
#include <xrpld/perflog/PerfLog.h>
2930

3031
#include <xrpl/basics/Log.h>
3132
#include <xrpl/basics/UnorderedContainers.h>
@@ -703,6 +704,7 @@ PeerImp::PeerImp(
703704
app_.config().LEDGER_REPLAY))
704705
, ledgerReplayMsgHandler_(app, app.getLedgerReplayer())
705706
{
707+
++app_.getPerfLog().getPeerCounters().connection.totalOutboundConnects;
706708
read_buffer_.commit(boost::asio::buffer_copy(
707709
read_buffer_.prepare(boost::asio::buffer_size(buffers)), buffers));
708710
JLOG(journal_.info())

0 commit comments

Comments
 (0)