Skip to content

Commit d3e614a

Browse files
authored
feat: forward service self metric (#2489)
* feat: forward service self metric Change-Id: I994caa69c976c9d112b78d8ec53eb1ccf7b667e5 Co-developed-by: Cursor <noreply@cursor.com> * fix Change-Id: If80c18eff78e36812f6a0e37c5fdb8666c219b55 Co-developed-by: Cursor <noreply@cursor.com>
1 parent ad298d6 commit d3e614a

File tree

9 files changed

+41
-7
lines changed

9 files changed

+41
-7
lines changed

core/forward/BaseService.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,15 @@ namespace logtail {
2222

2323
class BaseService {
2424
public:
25+
BaseService(const std::string& address) : mAddress(address) {}
2526
virtual ~BaseService() = default;
2627

2728
virtual bool Update(std::string configName, const Json::Value& config) = 0;
2829
virtual bool Remove(std::string configName, const Json::Value& config) = 0;
2930
[[nodiscard]] virtual const std::string& Name() const = 0;
31+
32+
protected:
33+
std::string mAddress;
3034
};
3135

3236
}; // namespace logtail

core/forward/GrpcInputManager.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ bool GrpcInputManager::AddListenInput(const std::string& configName,
7676
std::lock_guard<std::mutex> lock(mListenAddressToInputMapMutex);
7777
auto it = mListenAddressToInputMap.find(address);
7878
// generate a new service instance to get name
79-
std::unique_ptr<T> service = std::make_unique<T>();
79+
std::unique_ptr<T> service = std::make_unique<T>(address);
8080
if (it != mListenAddressToInputMap.end()) {
8181
if (it->second.mServer == nullptr || it->second.mService == nullptr) {
8282
// should never happen

core/forward/loongsuite/LoongSuiteForwardService.cpp

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
#include "common/Flags.h"
2424
#include "common/ParamExtractor.h"
25+
#include "common/TimeKeeper.h"
2526
#include "grpcpp/support/status.h"
2627
#include "logger/Logger.h"
2728
#include "models/PipelineEventGroup.h"
@@ -69,6 +70,20 @@ bool LoongSuiteForwardServiceImpl::Update(std::string configName, const Json::Va
6970
return true;
7071
}
7172

73+
LoongSuiteForwardServiceImpl::LoongSuiteForwardServiceImpl(const std::string& address) : BaseService(address) {
74+
WriteMetrics::GetInstance()->CreateMetricsRecordRef(mMetricsRecordRef,
75+
MetricCategory::METRIC_CATEGORY_COMPONENT,
76+
{
77+
{METRIC_LABEL_KEY_COMPONENT_NAME, "loongsuite_forward"},
78+
{METRIC_LABEL_KEY_SERVICE_ADDRESS, address},
79+
});
80+
mInEventsTotal = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_IN_EVENTS_TOTAL);
81+
mInSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_IN_SIZE_BYTES);
82+
mTotalDelayMs = mMetricsRecordRef.CreateTimeCounter(METRIC_COMPONENT_TOTAL_DELAY_MS);
83+
mDiscardedEventsTotal = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_DISCARDED_ITEMS_TOTAL);
84+
WriteMetrics::GetInstance()->CommitMetricsRecordRef(mMetricsRecordRef);
85+
}
86+
7287
bool LoongSuiteForwardServiceImpl::Remove(std::string configName, const Json::Value& config) {
7388
std::string errorMsg;
7489

@@ -89,6 +104,9 @@ grpc::ServerUnaryReactor* LoongSuiteForwardServiceImpl::Forward(grpc::CallbackSe
89104
LoongSuiteForwardResponse* response) {
90105
auto* reactor = context->DefaultReactor();
91106
grpc::Status status(grpc::StatusCode::NOT_FOUND, "No matching config found for forward request");
107+
ADD_COUNTER(mInEventsTotal, 1);
108+
ADD_COUNTER(mInSizeBytes, request->data_size());
109+
auto before = TimeKeeper::GetInstance()->NowMs();
92110

93111
std::shared_ptr<ForwardConfig> config;
94112
if (FindMatchingConfig(context, config)) {
@@ -99,10 +117,13 @@ grpc::ServerUnaryReactor* LoongSuiteForwardServiceImpl::Forward(grpc::CallbackSe
99117
if (status.ok()) {
100118
mRetryTimeController.UpRetryTimes(config->configName);
101119
} else {
120+
ADD_COUNTER(mDiscardedEventsTotal, 1);
102121
mRetryTimeController.DownRetryTimes(config->configName);
103122
}
123+
} else {
124+
ADD_COUNTER(mDiscardedEventsTotal, 1);
104125
}
105-
126+
ADD_COUNTER(mTotalDelayMs, std::chrono::milliseconds(TimeKeeper::GetInstance()->NowMs() - before));
106127
reactor->Finish(status);
107128
return reactor;
108129
}

core/forward/loongsuite/LoongSuiteForwardService.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
#include "collection_pipeline/queue/QueueKey.h"
2626
#include "forward/BaseService.h"
27+
#include "monitor/MetricManager.h"
2728
#include "protobuf/forward/loongsuite.grpc.pb.h"
2829

2930
namespace logtail {
@@ -78,7 +79,7 @@ class RetryTimeController {
7879

7980
class LoongSuiteForwardServiceImpl : public BaseService, public LoongSuiteForwardService::CallbackService {
8081
public:
81-
LoongSuiteForwardServiceImpl() = default;
82+
LoongSuiteForwardServiceImpl(const std::string& address);
8283
~LoongSuiteForwardServiceImpl() override = default;
8384

8485
bool Update(std::string configName, const Json::Value& config) override;
@@ -99,6 +100,12 @@ class LoongSuiteForwardServiceImpl : public BaseService, public LoongSuiteForwar
99100

100101
RetryTimeController mRetryTimeController;
101102

103+
MetricsRecordRef mMetricsRecordRef;
104+
CounterPtr mInEventsTotal;
105+
CounterPtr mInSizeBytes;
106+
TimeCounterPtr mTotalDelayMs;
107+
CounterPtr mDiscardedEventsTotal;
108+
102109
bool AddToIndex(std::string& configName, ForwardConfig&& config, std::string& errorMsg);
103110
bool FindMatchingConfig(grpc::CallbackServerContext* context, std::shared_ptr<ForwardConfig>& config) const;
104111
void ProcessForwardRequest(const LoongSuiteForwardRequest* request,

core/monitor/metric_constants/ComponentMetrics.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ namespace logtail {
2222
// label keys
2323
const string METRIC_LABEL_KEY_COMPONENT_NAME = "component_name";
2424
const string METRIC_LABEL_KEY_FLUSHER_PLUGIN_ID = "flusher_plugin_id";
25+
const string METRIC_LABEL_KEY_SERVICE_ADDRESS = "service_address";
2526

2627
/**********************************************************
2728
* queue

core/monitor/metric_constants/MetricConstants.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,7 @@ extern const std::string METRIC_LABEL_KEY_EXACTLY_ONCE_ENABLED;
234234
extern const std::string METRIC_LABEL_KEY_QUEUE_TYPE;
235235
extern const std::string METRIC_LABEL_KEY_TARGET;
236236
extern const std::string METRIC_LABEL_KEY_GROUP_BATCH_ENABLED;
237+
extern const std::string METRIC_LABEL_KEY_SERVICE_ADDRESS;
237238

238239
// label values
239240
extern const std::string METRIC_LABEL_VALUE_COMPONENT_NAME_BATCHER;

core/unittest/forward/GrpcInputManagerUnittest.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,8 @@ void GrpcInputManagerUnittest::TestUpdateListenInputExistingAddressSameTypeUpdat
9090
}
9191

9292
void GrpcInputManagerUnittest::TestUpdateListenInputExistingAddressDifferentTypeError() const {
93-
MockServiceImpl mockService;
93+
const std::string address = "0.0.0.0:50053";
94+
MockServiceImpl mockService(address);
9495
auto* runner = GrpcInputManager::GetInstance();
9596
runner->Init();
9697
Json::Value config;
@@ -99,7 +100,6 @@ void GrpcInputManagerUnittest::TestUpdateListenInputExistingAddressDifferentType
99100
config["QueueKey"] = 1;
100101
config["InputIndex"] = 1;
101102
config["Protocol"] = "LoongSuite";
102-
const std::string address = "0.0.0.0:50053";
103103
bool ret = runner->AddListenInput<LoongSuiteForwardServiceImpl>("configA", address, config);
104104
APSARA_TEST_TRUE_FATAL(ret);
105105
ret = runner->AddListenInput<MockServiceImpl>("configB", address, config);

core/unittest/forward/LoongSuiteForwardServiceUnittest.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ class LoongSuiteForwardServiceUnittest : public testing::Test {
5757

5858
protected:
5959
void SetUp() override {
60-
service = std::unique_ptr<LoongSuiteForwardServiceImpl>(new LoongSuiteForwardServiceImpl());
60+
service = std::unique_ptr<LoongSuiteForwardServiceImpl>(new LoongSuiteForwardServiceImpl("0.0.0.0:50054"));
6161
// Initialize ProcessorRunner for testing
6262
ProcessorRunner::GetInstance()->Init();
6363
}

core/unittest/forward/MockServiceImpl.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ namespace logtail {
2121

2222
class MockServiceImpl : public BaseService, public LoongSuiteForwardService::CallbackService {
2323
public:
24-
MockServiceImpl() = default;
24+
explicit MockServiceImpl(const std::string& address) : BaseService(address) {}
2525
~MockServiceImpl() override = default;
2626

2727
bool Update(std::string configName, const Json::Value& config) override { return true; }

0 commit comments

Comments
 (0)