Skip to content

Commit a4e79f0

Browse files
committed
improve multi thread performance for multiline log split
1 parent 7d44414 commit a4e79f0

File tree

4 files changed

+62
-22
lines changed

4 files changed

+62
-22
lines changed

Diff for: core/plugin/processor/inner/ProcessorSplitMultilineLogStringNative.cpp

+44-22
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,16 @@
2020

2121
#include "boost/regex.hpp"
2222

23-
#include "PipelineEventGroup.h"
24-
#include "TagConstants.h"
2523
#include "app_config/AppConfig.h"
2624
#include "collection_pipeline/plugin/instance/ProcessorInstance.h"
2725
#include "common/ParamExtractor.h"
2826
#include "constants/Constants.h"
27+
#include "constants/TagConstants.h"
2928
#include "logger/Logger.h"
3029
#include "models/LogEvent.h"
30+
#include "models/PipelineEventGroup.h"
3131
#include "monitor/metric_constants/MetricConstants.h"
32+
#include "runner/ProcessorRunner.h"
3233

3334
namespace logtail {
3435

@@ -67,6 +68,18 @@ bool ProcessorSplitMultilineLogStringNative::Init(const Json::Value& config) {
6768
mContext->GetRegion());
6869
}
6970

71+
for (int i = 0; i < AppConfig::GetInstance()->GetProcessThreadCount(); ++i) {
72+
if (!mMultiline.mStartPattern.empty()) {
73+
mStartPatternReg.emplace_back(mMultiline.mStartPattern);
74+
}
75+
if (!mMultiline.mContinuePattern.empty()) {
76+
mContinuePatternReg.emplace_back(mMultiline.mContinuePattern);
77+
}
78+
if (!mMultiline.mEndPattern.empty()) {
79+
mEndPatternReg.emplace_back(mMultiline.mEndPattern);
80+
}
81+
}
82+
7083
mMatchedEventsTotal = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_MATCHED_EVENTS_TOTAL);
7184
mMatchedLinesTotal = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_MATCHED_LINES_TOTAL);
7285
mUnmatchedLinesTotal = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_UNMATCHED_LINES_TOTAL);
@@ -150,8 +163,7 @@ void ProcessorSplitMultilineLogStringNative::ProcessEvent(PipelineEventGroup& lo
150163
std::string exception;
151164
const char* multiStartIndex = nullptr;
152165
bool isPartialLog = false;
153-
if (mMultiline.GetStartPatternReg() == nullptr && mMultiline.GetContinuePatternReg() == nullptr
154-
&& mMultiline.GetEndPatternReg() != nullptr) {
166+
if (!HasStartPattern() && !HasContinuePattern() && HasEndPattern()) {
155167
// if only end pattern is given, then it will stick to this state
156168
isPartialLog = true;
157169
multiStartIndex = sourceVal.data();
@@ -165,17 +177,16 @@ void ProcessorSplitMultilineLogStringNative::ProcessEvent(PipelineEventGroup& lo
165177
if (!isPartialLog) {
166178
// it is impossible to enter this state if only end pattern is given
167179
boost::regex regex;
168-
if (mMultiline.GetStartPatternReg() != nullptr) {
169-
regex = *mMultiline.GetStartPatternReg();
180+
if (HasStartPattern()) {
181+
regex = GetStartPatternReg();
170182
} else {
171-
regex = *mMultiline.GetContinuePatternReg();
183+
regex = GetContinuePatternReg();
172184
}
173185
if (BoostRegexSearch(content.data(), content.size(), regex, exception)) {
174186
multiStartIndex = content.data();
175187
isPartialLog = true;
176-
} else if (mMultiline.GetEndPatternReg() != nullptr && mMultiline.GetStartPatternReg() == nullptr
177-
&& mMultiline.GetContinuePatternReg() != nullptr
178-
&& BoostRegexSearch(content.data(), content.size(), *mMultiline.GetEndPatternReg(), exception)) {
188+
} else if (HasEndPattern() && !HasStartPattern() && HasContinuePattern()
189+
&& BoostRegexSearch(content.data(), content.size(), GetEndPatternReg(), exception)) {
179190
// case: continue + end
180191
CreateNewEvent(content, isLastLog, sourceKey, sourceEvent, logGroup, newEvents);
181192
multiStartIndex = content.data() + content.size() + 1;
@@ -186,17 +197,17 @@ void ProcessorSplitMultilineLogStringNative::ProcessEvent(PipelineEventGroup& lo
186197
}
187198
} else {
188199
// case: start + continue or continue + end
189-
if (mMultiline.GetContinuePatternReg() != nullptr
190-
&& BoostRegexSearch(content.data(), content.size(), *mMultiline.GetContinuePatternReg(), exception)) {
200+
if (HasContinuePattern()
201+
&& BoostRegexSearch(content.data(), content.size(), GetContinuePatternReg(), exception)) {
191202
begin += content.size() + 1;
192203
continue;
193204
}
194-
if (mMultiline.GetEndPatternReg() != nullptr) {
205+
if (HasEndPattern()) {
195206
// case: start + end or continue + end or end
196-
if (mMultiline.GetContinuePatternReg() != nullptr) {
207+
if (HasContinuePattern()) {
197208
// current line is not matched against the continue pattern, so the end pattern will decide
198209
// if the current log is a match or not
199-
if (BoostRegexSearch(content.data(), content.size(), *mMultiline.GetEndPatternReg(), exception)) {
210+
if (BoostRegexSearch(content.data(), content.size(), GetEndPatternReg(), exception)) {
200211
CreateNewEvent(StringView(multiStartIndex, content.data() + content.size() - multiStartIndex),
201212
isLastLog,
202213
sourceKey,
@@ -218,14 +229,14 @@ void ProcessorSplitMultilineLogStringNative::ProcessEvent(PipelineEventGroup& lo
218229
isPartialLog = false;
219230
} else {
220231
// case: start + end or end
221-
if (BoostRegexSearch(content.data(), content.size(), *mMultiline.GetEndPatternReg(), exception)) {
232+
if (BoostRegexSearch(content.data(), content.size(), GetEndPatternReg(), exception)) {
222233
CreateNewEvent(StringView(multiStartIndex, content.data() + content.size() - multiStartIndex),
223234
isLastLog,
224235
sourceKey,
225236
sourceEvent,
226237
logGroup,
227238
newEvents);
228-
if (mMultiline.GetStartPatternReg() != nullptr) {
239+
if (HasStartPattern()) {
229240
isPartialLog = false;
230241
} else {
231242
multiStartIndex = content.data() + content.size() + 1;
@@ -237,9 +248,9 @@ void ProcessorSplitMultilineLogStringNative::ProcessEvent(PipelineEventGroup& lo
237248
// so wait for the next line
238249
}
239250
} else {
240-
if (mMultiline.GetContinuePatternReg() == nullptr) {
251+
if (!HasContinuePattern()) {
241252
// case: start
242-
if (BoostRegexSearch(content.data(), content.size(), *mMultiline.GetStartPatternReg(), exception)) {
253+
if (BoostRegexSearch(content.data(), content.size(), GetStartPatternReg(), exception)) {
243254
CreateNewEvent(StringView(multiStartIndex, content.data() - 1 - multiStartIndex),
244255
isLastLog,
245256
sourceKey,
@@ -259,8 +270,7 @@ void ProcessorSplitMultilineLogStringNative::ProcessEvent(PipelineEventGroup& lo
259270
logGroup,
260271
newEvents);
261272
ADD_COUNTER(mMatchedEventsTotal, 1);
262-
if (!BoostRegexSearch(
263-
content.data(), content.size(), *mMultiline.GetStartPatternReg(), exception)) {
273+
if (!BoostRegexSearch(content.data(), content.size(), GetStartPatternReg(), exception)) {
264274
// when no end pattern is given, the only chance to enter unmatched state is when both
265275
// start and continue pattern are given, and the current line is not matched against the
266276
// start pattern
@@ -278,7 +288,7 @@ void ProcessorSplitMultilineLogStringNative::ProcessEvent(PipelineEventGroup& lo
278288
// when in unmatched state, the unmatched log is handled one by one, so there is no need for additional handle
279289
// here
280290
if (isPartialLog && multiStartIndex - sourceVal.data() < static_cast<int64_t>(sourceVal.size())) {
281-
if (mMultiline.GetEndPatternReg() == nullptr) {
291+
if (!HasEndPattern()) {
282292
CreateNewEvent(StringView(multiStartIndex, sourceVal.data() + sourceVal.size() - multiStartIndex),
283293
true,
284294
sourceKey,
@@ -383,4 +393,16 @@ StringView ProcessorSplitMultilineLogStringNative::GetNextLine(StringView log, s
383393
return StringView(log.data() + begin, log.size() - begin);
384394
}
385395

396+
const boost::regex& ProcessorSplitMultilineLogStringNative::GetStartPatternReg() const {
397+
return mStartPatternReg[ProcessorRunner::GetThreadNo()];
398+
}
399+
400+
const boost::regex& ProcessorSplitMultilineLogStringNative::GetContinuePatternReg() const {
401+
return mContinuePatternReg[ProcessorRunner::GetThreadNo()];
402+
}
403+
404+
const boost::regex& ProcessorSplitMultilineLogStringNative::GetEndPatternReg() const {
405+
return mEndPatternReg[ProcessorRunner::GetThreadNo()];
406+
}
407+
386408
} // namespace logtail

Diff for: core/plugin/processor/inner/ProcessorSplitMultilineLogStringNative.h

+13
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,19 @@ class ProcessorSplitMultilineLogStringNative : public Processor {
6565
int* unmatchLines);
6666
StringView GetNextLine(StringView log, size_t begin);
6767

68+
bool HasStartPattern() const { return !mStartPatternReg.empty(); }
69+
bool HasContinuePattern() const { return !mContinuePatternReg.empty(); }
70+
bool HasEndPattern() const { return !mEndPatternReg.empty(); }
71+
const boost::regex& GetStartPatternReg() const;
72+
const boost::regex& GetContinuePatternReg() const;
73+
const boost::regex& GetEndPatternReg() const;
74+
75+
// boost::regex object shared by multi-thread leads to performance degradation. Therefore, each thread should be
76+
// allocated a different copy.
77+
std::vector<boost::regex> mStartPatternReg;
78+
std::vector<boost::regex> mContinuePatternReg;
79+
std::vector<boost::regex> mEndPatternReg;
80+
6881
CounterPtr mMatchedEventsTotal;
6982
CounterPtr mMatchedLinesTotal;
7083
CounterPtr mUnmatchedLinesTotal;

Diff for: core/runner/ProcessorRunner.cpp

+2
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ using namespace std;
3434

3535
namespace logtail {
3636

37+
thread_local uint32_t ProcessorRunner::sThreadNo;
3738
thread_local MetricsRecordRef ProcessorRunner::sMetricsRecordRef;
3839
thread_local CounterPtr ProcessorRunner::sInGroupsCnt;
3940
thread_local CounterPtr ProcessorRunner::sInEventsCnt;
@@ -89,6 +90,7 @@ void ProcessorRunner::Run(uint32_t threadNo) {
8990
LOG_INFO(sLogger, ("processor runner", "started")("thread no", threadNo));
9091

9192
// thread local metrics should be initialized in each thread
93+
sThreadNo = threadNo;
9294
WriteMetrics::GetInstance()->PrepareMetricsRecordRef(
9395
sMetricsRecordRef,
9496
MetricCategory::METRIC_CATEGORY_RUNNER,

Diff for: core/runner/ProcessorRunner.h

+3
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ class ProcessorRunner {
3838
static ProcessorRunner instance;
3939
return &instance;
4040
}
41+
static uint32_t GetThreadNo() { return sThreadNo; }
4142

4243
void Init();
4344
void Stop();
@@ -60,6 +61,8 @@ class ProcessorRunner {
6061
std::vector<std::future<void>> mThreadRes;
6162
std::atomic_bool mIsFlush = false;
6263

64+
thread_local static uint32_t sThreadNo;
65+
6366
thread_local static MetricsRecordRef sMetricsRecordRef;
6467
thread_local static CounterPtr sInGroupsCnt;
6568
thread_local static CounterPtr sInEventsCnt;

0 commit comments

Comments
 (0)