From 4da2793069d4b38c0f7d6cfdf6ae3e4c2d339337 Mon Sep 17 00:00:00 2001 From: Akrama Baig Mirza Date: Thu, 6 Feb 2025 11:20:53 -0800 Subject: [PATCH] Add credits_available quantile counter (p1, p50, p99) Summary: Add quantile counter (p1, p50, and p99) for number of stream credits the server currently has. The value is updated each time the server receives more credits or if it runs out of credits. Reviewed By: praihan Differential Revision: D68467565 fbshipit-source-id: ecc316562572badf057b94fb460af4164e6c4f4b --- thrift/lib/cpp2/server/metrics/StreamMetricCallback.h | 3 +++ .../transport/rocket/server/RocketStreamClientCallback.cpp | 4 +++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/thrift/lib/cpp2/server/metrics/StreamMetricCallback.h b/thrift/lib/cpp2/server/metrics/StreamMetricCallback.h index 49af713246b..39a4afac9e3 100644 --- a/thrift/lib/cpp2/server/metrics/StreamMetricCallback.h +++ b/thrift/lib/cpp2/server/metrics/StreamMetricCallback.h @@ -31,6 +31,8 @@ class StreamMetricCallback { virtual void onStreamCancel(std::string_view /* methodName */) = 0; virtual void onStreamRequestN( std::string_view /* methodName */, uint32_t /* credits */) = 0; + virtual void recordCreditsAvailable( + std::string_view /* methodName */, uint32_t /* creditsAvailable */) = 0; }; class NoopStreamMetricCallback final : public StreamMetricCallback { @@ -41,6 +43,7 @@ class NoopStreamMetricCallback final : public StreamMetricCallback { void onStreamComplete(std::string_view) override {} void onStreamCancel(std::string_view) override {} void onStreamRequestN(std::string_view, uint32_t) override {} + void recordCreditsAvailable(std::string_view, uint32_t) override {} }; } // namespace apache::thrift diff --git a/thrift/lib/cpp2/transport/rocket/server/RocketStreamClientCallback.cpp b/thrift/lib/cpp2/transport/rocket/server/RocketStreamClientCallback.cpp index 47263c726ed..b1d08ce0ec8 100644 --- a/thrift/lib/cpp2/transport/rocket/server/RocketStreamClientCallback.cpp +++ b/thrift/lib/cpp2/transport/rocket/server/RocketStreamClientCallback.cpp @@ -226,8 +226,9 @@ bool RocketStreamClientCallback::request(uint32_t tokens) { } cancelTimeout(); - tokens_ += tokens; + streamMetricCallback_.recordCreditsAvailable(rpcMethodName_, tokens_); streamMetricCallback_.onStreamRequestN(rpcMethodName_, tokens); + tokens_ += tokens; return serverCallback()->onStreamRequestN(tokens); } @@ -292,6 +293,7 @@ void RocketStreamClientCallback::scheduleTimeout() { timeoutCallback_ = std::make_unique(*this); } connection_.scheduleStreamTimeout(timeoutCallback_.get()); + streamMetricCallback_.recordCreditsAvailable(rpcMethodName_, tokens_); } void RocketStreamClientCallback::cancelTimeout() {