Skip to content

Commit 2e2740d

Browse files
authored
feat: Published subscription message counters (#1618)
This PR adds counters to track the amount of published messages for each subscription stream.
1 parent 5004dc4 commit 2e2740d

File tree

7 files changed

+34
-2
lines changed

7 files changed

+34
-2
lines changed

src/feed/impl/ProposedTransactionFeed.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,13 +104,17 @@ ProposedTransactionFeed::pub(boost::json::object const& receivedTxJson)
104104
boost::asio::post(strand_, [this, pubMsg = std::move(pubMsg), affectedAccounts = std::move(affectedAccounts)]() {
105105
notified_.clear();
106106
signal_.emit(pubMsg);
107+
107108
// Prevent the same connection from receiving the same message twice if it is subscribed to multiple accounts
108109
// However, if the same connection subscribe both stream and account, it will still receive the message twice.
109110
// notified_ can be cleared before signal_ emit to improve this, but let's keep it as is for now, since rippled
110111
// acts like this.
111112
notified_.clear();
113+
112114
for (auto const& account : affectedAccounts)
113115
accountSignal_.emit(account, pubMsg);
116+
117+
++pubCount_.get();
114118
});
115119
}
116120

src/feed/impl/ProposedTransactionFeed.hpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#include "feed/impl/TrackableSignalMap.hpp"
2525
#include "feed/impl/Util.hpp"
2626
#include "util/log/Logger.hpp"
27+
#include "util/prometheus/Counter.hpp"
2728
#include "util/prometheus/Gauge.hpp"
2829

2930
#include <boost/asio/io_context.hpp>
@@ -54,6 +55,7 @@ class ProposedTransactionFeed {
5455
boost::asio::strand<boost::asio::io_context::executor_type> strand_;
5556
std::reference_wrapper<util::prometheus::GaugeInt> subAllCount_;
5657
std::reference_wrapper<util::prometheus::GaugeInt> subAccountCount_;
58+
std::reference_wrapper<util::prometheus::CounterInt> pubCount_;
5759

5860
TrackableSignalMap<ripple::AccountID, Subscriber, std::shared_ptr<std::string>> accountSignal_;
5961
TrackableSignal<Subscriber, std::shared_ptr<std::string>> signal_;
@@ -67,7 +69,7 @@ class ProposedTransactionFeed {
6769
: strand_(boost::asio::make_strand(ioContext))
6870
, subAllCount_(getSubscriptionsGaugeInt("tx_proposed"))
6971
, subAccountCount_(getSubscriptionsGaugeInt("account_proposed"))
70-
72+
, pubCount_(getPublishedMessagesCounterInt("tx_proposed"))
7173
{
7274
}
7375

src/feed/impl/SingleFeedBase.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,10 @@
3636
namespace feed::impl {
3737

3838
SingleFeedBase::SingleFeedBase(boost::asio::io_context& ioContext, std::string const& name)
39-
: strand_(boost::asio::make_strand(ioContext)), subCount_(getSubscriptionsGaugeInt(name)), name_(name)
39+
: strand_(boost::asio::make_strand(ioContext))
40+
, subCount_(getSubscriptionsGaugeInt(name))
41+
, pubCount_(getPublishedMessagesCounterInt(name))
42+
, name_(name)
4043
{
4144
}
4245

@@ -70,6 +73,7 @@ SingleFeedBase::pub(std::string msg) const
7073
boost::asio::post(strand_, [this, msg = std::move(msg)]() mutable {
7174
auto const msgPtr = std::make_shared<std::string>(std::move(msg));
7275
signal_.emit(msgPtr);
76+
++pubCount_.get();
7377
});
7478
}
7579

src/feed/impl/SingleFeedBase.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include "feed/Types.hpp"
2323
#include "feed/impl/TrackableSignal.hpp"
2424
#include "util/log/Logger.hpp"
25+
#include "util/prometheus/Counter.hpp"
2526
#include "util/prometheus/Gauge.hpp"
2627

2728
#include <boost/asio/io_context.hpp>
@@ -40,6 +41,7 @@ namespace feed::impl {
4041
class SingleFeedBase {
4142
boost::asio::strand<boost::asio::io_context::executor_type> strand_;
4243
std::reference_wrapper<util::prometheus::GaugeInt> subCount_;
44+
std::reference_wrapper<util::prometheus::CounterInt> pubCount_;
4345
TrackableSignal<Subscriber, std::shared_ptr<std::string> const&> signal_;
4446
util::Logger logger_{"Subscriptions"};
4547
std::string name_;

src/feed/impl/TransactionFeed.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,23 +284,29 @@ TransactionFeed::pub(
284284
affectedBooks = std::move(affectedBooks)]() {
285285
notified_.clear();
286286
signal_.emit(allVersionsMsgs);
287+
287288
// clear the notified set. If the same connection subscribes both transactions + proposed_transactions,
288289
// rippled SENDS the same message twice
289290
notified_.clear();
290291
txProposedsignal_.emit(allVersionsMsgs);
291292
notified_.clear();
293+
292294
// check duplicate for account and proposed_account, this prevents sending the same message multiple times
293295
// if it affects multiple accounts watched by the same connection
294296
for (auto const& account : affectedAccounts) {
295297
accountSignal_.emit(account, allVersionsMsgs);
296298
accountProposedSignal_.emit(account, allVersionsMsgs);
297299
}
300+
298301
notified_.clear();
302+
299303
// check duplicate for books, this prevents sending the same message multiple times if it affects multiple
300304
// books watched by the same connection
301305
for (auto const& book : affectedBooks) {
302306
bookSignal_.emit(book, allVersionsMsgs);
303307
}
308+
309+
++pubCount_.get();
304310
}
305311
);
306312
}

src/feed/impl/TransactionFeed.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include "feed/impl/TrackableSignalMap.hpp"
2727
#include "feed/impl/Util.hpp"
2828
#include "util/log/Logger.hpp"
29+
#include "util/prometheus/Counter.hpp"
2930
#include "util/prometheus/Gauge.hpp"
3031

3132
#include <boost/asio/io_context.hpp>
@@ -67,6 +68,7 @@ class TransactionFeed {
6768
std::reference_wrapper<util::prometheus::GaugeInt> subAllCount_;
6869
std::reference_wrapper<util::prometheus::GaugeInt> subAccountCount_;
6970
std::reference_wrapper<util::prometheus::GaugeInt> subBookCount_;
71+
std::reference_wrapper<util::prometheus::CounterInt> pubCount_;
7072

7173
TrackableSignalMap<ripple::AccountID, Subscriber, AllVersionTransactionsType const&> accountSignal_;
7274
TrackableSignalMap<ripple::Book, Subscriber, AllVersionTransactionsType const&> bookSignal_;
@@ -89,6 +91,7 @@ class TransactionFeed {
8991
, subAllCount_(getSubscriptionsGaugeInt("tx"))
9092
, subAccountCount_(getSubscriptionsGaugeInt("account"))
9193
, subBookCount_(getSubscriptionsGaugeInt("book"))
94+
, pubCount_(getPublishedMessagesCounterInt("tx"))
9295
{
9396
}
9497

src/feed/impl/Util.hpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
#pragma once
2121

22+
#include "util/prometheus/Counter.hpp"
2223
#include "util/prometheus/Gauge.hpp"
2324
#include "util/prometheus/Label.hpp"
2425
#include "util/prometheus/Prometheus.hpp"
@@ -38,4 +39,14 @@ getSubscriptionsGaugeInt(std::string const& counterName)
3839
fmt::format("Current subscribers number on the {} stream", counterName)
3940
);
4041
}
42+
43+
inline util::prometheus::CounterInt&
44+
getPublishedMessagesCounterInt(std::string const& counterName)
45+
{
46+
return PrometheusService::counterInt(
47+
"subscriptions_published_count",
48+
util::prometheus::Labels({util::prometheus::Label{"stream", counterName}}),
49+
fmt::format("Total published messages on the {} stream", counterName)
50+
);
51+
}
4152
} // namespace feed::impl

0 commit comments

Comments
 (0)