Skip to content

Commit 6e8ab28

Browse files
authored
fix: allow metrics batch in different minutes (#2160)
* fix: allow metrics batch in different minutes * chore: update * chore: add ut * chore: add ut * chore: add explanation
1 parent 5c286ac commit 6e8ab28

File tree

2 files changed

+21
-0
lines changed

2 files changed

+21
-0
lines changed

Diff for: core/collection_pipeline/batch/FlushStrategy.cpp

+9
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,22 @@
1414

1515
#include "collection_pipeline/batch/FlushStrategy.h"
1616

17+
#include <cstdlib>
18+
1719
using namespace std;
1820

1921
namespace logtail {
2022

2123
template <>
2224
bool EventFlushStrategy<SLSEventBatchStatus>::NeedFlushByTime(const SLSEventBatchStatus& status,
2325
const PipelineEventPtr& e) {
26+
if (e.Is<MetricEvent>()) {
27+
// It is necessary to flush, if the event timestamp and the batch creation time differ by more than 300 seconds.
28+
// The 300 seconds is to avoid frequent batching to reduce the flusher traffic, because metrics such as cAdvisor
29+
// has out-of-order situations.
30+
return time(nullptr) - status.GetCreateTime() > mTimeoutSecs
31+
|| abs(status.GetCreateTime() - e->GetTimestamp()) > 300;
32+
}
2433
return time(nullptr) - status.GetCreateTime() > mTimeoutSecs
2534
|| status.GetCreateTimeMinute() != e->GetTimestamp() / 60;
2635
}

Diff for: core/unittest/batch/FlushStrategyUnittest.cpp

+12
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
// limitations under the License.
1414

1515

16+
#include "PipelineEventGroup.h"
17+
#include "PipelineEventPtr.h"
1618
#include "collection_pipeline/batch/BatchStatus.h"
1719
#include "collection_pipeline/batch/FlushStrategy.h"
1820
#include "unittest/Unittest.h"
@@ -113,6 +115,8 @@ void SLSEventFlushStrategyUnittest::TestNeedFlush() {
113115
PipelineEventGroup eventGroup(make_shared<SourceBuffer>());
114116
PipelineEventPtr event(eventGroup.CreateLogEvent(), false, nullptr);
115117
event->SetTimestamp(1717398001);
118+
PipelineEventPtr metricEvent(eventGroup.CreateMetricEvent(), false, nullptr);
119+
metricEvent->SetTimestamp(time(nullptr));
116120

117121
SLSEventBatchStatus status;
118122
status.mCnt = 2;
@@ -122,6 +126,7 @@ void SLSEventFlushStrategyUnittest::TestNeedFlush() {
122126
APSARA_TEST_TRUE(mStrategy.NeedFlushByCnt(status));
123127
APSARA_TEST_FALSE(mStrategy.NeedFlushBySize(status));
124128
APSARA_TEST_FALSE(mStrategy.NeedFlushByTime(status, event));
129+
APSARA_TEST_FALSE(mStrategy.NeedFlushByTime(status, metricEvent));
125130

126131
status.mCnt = 1;
127132
status.mSizeBytes = 100;
@@ -130,6 +135,7 @@ void SLSEventFlushStrategyUnittest::TestNeedFlush() {
130135
APSARA_TEST_FALSE(mStrategy.NeedFlushByCnt(status));
131136
APSARA_TEST_TRUE(mStrategy.NeedFlushBySize(status));
132137
APSARA_TEST_FALSE(mStrategy.NeedFlushByTime(status, event));
138+
APSARA_TEST_FALSE(mStrategy.NeedFlushByTime(status, metricEvent));
133139

134140
status.mCnt = 1;
135141
status.mSizeBytes = 50;
@@ -138,6 +144,7 @@ void SLSEventFlushStrategyUnittest::TestNeedFlush() {
138144
APSARA_TEST_FALSE(mStrategy.NeedFlushByCnt(status));
139145
APSARA_TEST_FALSE(mStrategy.NeedFlushBySize(status));
140146
APSARA_TEST_TRUE(mStrategy.NeedFlushByTime(status, event));
147+
APSARA_TEST_TRUE(mStrategy.NeedFlushByTime(status, metricEvent));
141148

142149
status.mCnt = 1;
143150
status.mSizeBytes = 50;
@@ -146,6 +153,11 @@ void SLSEventFlushStrategyUnittest::TestNeedFlush() {
146153
APSARA_TEST_FALSE(mStrategy.NeedFlushByCnt(status));
147154
APSARA_TEST_FALSE(mStrategy.NeedFlushBySize(status));
148155
APSARA_TEST_TRUE(mStrategy.NeedFlushByTime(status, event));
156+
APSARA_TEST_FALSE(mStrategy.NeedFlushByTime(status, metricEvent));
157+
metricEvent->SetTimestamp(time(nullptr) - 302);
158+
APSARA_TEST_TRUE(mStrategy.NeedFlushByTime(status, metricEvent));
159+
metricEvent->SetTimestamp(time(nullptr) + 301);
160+
APSARA_TEST_TRUE(mStrategy.NeedFlushByTime(status, metricEvent));
149161
}
150162

151163
UNIT_TEST_CASE(SLSEventFlushStrategyUnittest, TestNeedFlush)

0 commit comments

Comments
 (0)