Skip to content

Commit 37a00a2

Browse files
committed
update jsonSerializer and flusher_file
1 parent e20e879 commit 37a00a2

File tree

3 files changed

+79
-91
lines changed

3 files changed

+79
-91
lines changed

Diff for: core/collection_pipeline/serializer/JsonSerializer.cpp

+63-41
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,28 @@
1818
// TODO: the following dependencies should be removed
1919
#include "protobuf/sls/LogGroupSerializer.h"
2020

21+
#include "rapidjson/writer.h"
22+
#include "rapidjson/stringbuffer.h"
23+
2124
using namespace std;
2225

2326
namespace logtail {
2427

25-
const string JSON_KEY_TIME = "__time__";
28+
29+
// JSON member name under which each event's timestamp is written.
// constexpr array: immutable and file-local (internal linkage), and it still
// decays to `const char*` wherever a C string is expected.
constexpr char JSON_KEY_TIME[] = "__time__";
30+
31+
// Helper function to serialize common fields (tags and time)
32+
template <typename WriterType>
33+
void SerializeCommonFields(const SizedMap& tags, uint64_t timestamp, WriterType& writer) {
34+
// Serialize tags
35+
for (const auto& tag : tags.mInner) {
36+
writer.Key(tag.first.to_string().c_str());
37+
writer.String(tag.second.to_string().c_str());
38+
}
39+
// Serialize time
40+
writer.Key(JSON_KEY_TIME);
41+
writer.Uint64(timestamp);
42+
}
2643

2744
bool JsonEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, string& errorMsg) {
2845
if (group.mEvents.empty()) {
@@ -37,29 +54,31 @@ bool JsonEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, str
3754
return false;
3855
}
3956

40-
Json::Value groupTags;
41-
for (const auto& tag : group.mTags.mInner) {
42-
groupTags[tag.first.to_string()] = tag.second.to_string();
43-
}
57+
// Create reusable StringBuffer and Writer
58+
rapidjson::StringBuffer jsonBuffer;
59+
rapidjson::Writer<rapidjson::StringBuffer> writer(jsonBuffer);
60+
auto resetBuffer = [&jsonBuffer, &writer]() {
61+
jsonBuffer.Clear(); // Clear the buffer for reuse
62+
writer.Reset(jsonBuffer);
63+
};
4464

4565
// TODO: should support nano second
46-
ostringstream oss;
4766
switch (eventType) {
4867
case PipelineEvent::Type::LOG:
4968
for (const auto& item : group.mEvents) {
5069
const auto& e = item.Cast<LogEvent>();
51-
Json::Value eventJson;
52-
// tags
53-
eventJson.copy(groupTags);
54-
// time
55-
eventJson[JSON_KEY_TIME] = e.GetTimestamp();
70+
resetBuffer();
71+
72+
writer.StartObject();
73+
SerializeCommonFields(group.mTags, e.GetTimestamp(), writer);
5674
// contents
5775
for (const auto& kv : e) {
58-
eventJson[kv.first.to_string()] = kv.second.to_string();
76+
writer.Key(kv.first.to_string().c_str());
77+
writer.String(kv.second.to_string().c_str());
5978
}
60-
Json::StreamWriterBuilder writer;
61-
writer["indentation"] = "";
62-
oss << Json::writeString(writer, eventJson) << endl;
79+
writer.EndObject();
80+
res.append(jsonBuffer.GetString());
81+
res.append("\n");
6382
}
6483
break;
6584
case PipelineEvent::Type::METRIC:
@@ -69,34 +88,38 @@ bool JsonEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, str
6988
if (e.Is<std::monostate>()) {
7089
continue;
7190
}
72-
Json::Value eventJson;
73-
// tags
74-
eventJson.copy(groupTags);
75-
// time
76-
eventJson[JSON_KEY_TIME] = e.GetTimestamp();
91+
resetBuffer();
92+
93+
writer.StartObject();
94+
SerializeCommonFields(group.mTags, e.GetTimestamp(), writer);
7795
// __labels__
78-
eventJson[METRIC_RESERVED_KEY_LABELS] = Json::objectValue;
79-
auto& labels = eventJson[METRIC_RESERVED_KEY_LABELS];
96+
writer.Key(METRIC_RESERVED_KEY_LABELS.c_str());
97+
writer.StartObject();
8098
for (auto tag = e.TagsBegin(); tag != e.TagsEnd(); tag++) {
81-
labels[tag->first.to_string()] = tag->second.to_string();
99+
writer.Key(tag->first.to_string().c_str());
100+
writer.String(tag->second.to_string().c_str());
82101
}
102+
writer.EndObject();
83103
// __name__
84-
eventJson[METRIC_RESERVED_KEY_NAME] = e.GetName().to_string();
104+
writer.Key(METRIC_RESERVED_KEY_NAME.c_str());
105+
writer.String(e.GetName().to_string().c_str());
85106
// __value__
107+
writer.Key(METRIC_RESERVED_KEY_VALUE.c_str());
86108
if (e.Is<UntypedSingleValue>()) {
87-
eventJson[METRIC_RESERVED_KEY_VALUE] = e.GetValue<UntypedSingleValue>()->mValue;
109+
writer.Double(e.GetValue<UntypedSingleValue>()->mValue);
88110
} else if (e.Is<UntypedMultiDoubleValues>()) {
89-
eventJson[METRIC_RESERVED_KEY_VALUE] = Json::objectValue;
90-
auto& values = eventJson[METRIC_RESERVED_KEY_VALUE];
111+
writer.StartObject();
91112
for (auto value = e.GetValue<UntypedMultiDoubleValues>()->ValuesBegin();
92113
value != e.GetValue<UntypedMultiDoubleValues>()->ValuesEnd();
93114
value++) {
94-
values[value->first.to_string()] = value->second.Value;
115+
writer.Key(value->first.to_string().c_str());
116+
writer.Double(value->second.Value);
95117
}
118+
writer.EndObject();
96119
}
97-
Json::StreamWriterBuilder writer;
98-
writer["indentation"] = "";
99-
oss << Json::writeString(writer, eventJson) << endl;
120+
writer.EndObject();
121+
res.append(jsonBuffer.GetString());
122+
res.append("\n");
100123
}
101124
break;
102125
case PipelineEvent::Type::SPAN:
@@ -108,22 +131,21 @@ bool JsonEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, str
108131
case PipelineEvent::Type::RAW:
109132
for (const auto& item : group.mEvents) {
110133
const auto& e = item.Cast<RawEvent>();
111-
Json::Value eventJson;
112-
// tags
113-
eventJson.copy(groupTags);
114-
// time
115-
eventJson[JSON_KEY_TIME] = e.GetTimestamp();
134+
resetBuffer();
135+
136+
writer.StartObject();
137+
SerializeCommonFields(group.mTags, e.GetTimestamp(), writer);
116138
// content
117-
eventJson[DEFAULT_CONTENT_KEY] = e.GetContent().to_string();
118-
Json::StreamWriterBuilder writer;
119-
writer["indentation"] = "";
120-
oss << Json::writeString(writer, eventJson) << endl;
139+
writer.Key(DEFAULT_CONTENT_KEY.c_str());
140+
writer.String(e.GetContent().to_string().c_str());
141+
writer.EndObject();
142+
res.append(jsonBuffer.GetString());
143+
res.append("\n");
121144
}
122145
break;
123146
default:
124147
break;
125148
}
126-
res = oss.str();
127149
return true;
128150
}
129151

Diff for: core/plugin/flusher/file/FlusherFile.cpp

+16-46
Original file line numberDiff line numberDiff line change
@@ -43,18 +43,16 @@ bool FlusherFile::Init(const Json::Value& config, Json::Value& optionalGoPipelin
4343
mContext->GetLogstoreName(),
4444
mContext->GetRegion());
4545
}
46-
// Pattern
47-
// GetMandatoryStringParam(config, "Pattern", mPattern, errorMsg);
4846
// MaxFileSize
49-
// GetMandatoryUIntParam(config, "MaxFileSize", mMaxFileSize, errorMsg);
47+
GetMandatoryUIntParam(config, "MaxFileSize", mMaxFileSize, errorMsg);
5048
// MaxFiles
51-
// GetMandatoryUIntParam(config, "MaxFiles", mMaxFileSize, errorMsg);
49+
// The rotated-file count belongs in mMaxFiles (passed to the rotating sink
// below); storing it in mMaxFileSize would overwrite the size limit just parsed.
GetMandatoryUIntParam(config, "MaxFiles", mMaxFiles, errorMsg);
5250

5351
// create file writer
54-
auto file_sink = std::make_shared<spdlog::sinks::rotating_file_sink_mt>(mFilePath, mMaxFileSize, mMaxFiles, true);
52+
auto fileSink = std::make_shared<spdlog::sinks::rotating_file_sink_mt>(mFilePath, mMaxFileSize, mMaxFiles, true);
5553
mFileWriter = std::make_shared<spdlog::async_logger>(
56-
sName, file_sink, spdlog::thread_pool(), spdlog::async_overflow_policy::block);
57-
mFileWriter->set_pattern(mPattern);
54+
sName, fileSink, spdlog::thread_pool(), spdlog::async_overflow_policy::block);
55+
mFileWriter->set_pattern("%v");
5856

5957
mBatcher.Init(Json::Value(), this, DefaultFlushStrategyOptions{});
6058
mGroupSerializer = make_unique<JsonEventGroupSerializer>(this);
@@ -63,63 +61,35 @@ bool FlusherFile::Init(const Json::Value& config, Json::Value& optionalGoPipelin
6361
}
6462

6563
bool FlusherFile::Send(PipelineEventGroup&& g) {
66-
if (g.IsReplay()) {
67-
return SerializeAndPush(std::move(g));
68-
} else {
69-
vector<BatchedEventsList> res;
70-
mBatcher.Add(std::move(g), res);
71-
return SerializeAndPush(std::move(res));
72-
}
64+
return SerializeAndPush(std::move(g));
7365
}
7466

7567
bool FlusherFile::Flush(size_t key) {
76-
BatchedEventsList res;
77-
mBatcher.FlushQueue(key, res);
78-
return SerializeAndPush(std::move(res));
68+
return true;
7969
}
8070

8171
bool FlusherFile::FlushAll() {
82-
vector<BatchedEventsList> res;
83-
mBatcher.FlushAll(res);
84-
return SerializeAndPush(std::move(res));
72+
return true;
8573
}
8674

8775
bool FlusherFile::SerializeAndPush(PipelineEventGroup&& group) {
88-
string serializedData, errorMsg;
76+
string serializedData;
77+
string errorMsg;
8978
BatchedEvents g(std::move(group.MutableEvents()),
9079
std::move(group.GetSizedTags()),
9180
std::move(group.GetSourceBuffer()),
9281
group.GetMetadata(EventGroupMetaKey::SOURCE_ID),
9382
std::move(group.GetExactlyOnceCheckpoint()));
94-
mGroupSerializer->DoSerialize(move(g), serializedData, errorMsg);
83+
mGroupSerializer->DoSerialize(std::move(g), serializedData, errorMsg);
9584
if (errorMsg.empty()) {
96-
mFileWriter->info(serializedData);
85+
if (!serializedData.empty() && serializedData.back() == '\n') {
86+
serializedData.pop_back();
87+
}
88+
mFileWriter->info(std::move(serializedData));
89+
mFileWriter->flush();
9790
} else {
9891
LOG_ERROR(sLogger, ("serialize pipeline event group error", errorMsg));
9992
}
100-
mFileWriter->flush();
101-
return true;
102-
}
103-
104-
bool FlusherFile::SerializeAndPush(BatchedEventsList&& groupList) {
105-
string serializedData;
106-
for (auto& group : groupList) {
107-
string errorMsg;
108-
mGroupSerializer->DoSerialize(move(group), serializedData, errorMsg);
109-
if (errorMsg.empty()) {
110-
mFileWriter->info(serializedData);
111-
} else {
112-
LOG_ERROR(sLogger, ("serialize pipeline event group error", errorMsg));
113-
}
114-
}
115-
mFileWriter->flush();
116-
return true;
117-
}
118-
119-
bool FlusherFile::SerializeAndPush(vector<BatchedEventsList>&& groupLists) {
120-
for (auto& groupList : groupLists) {
121-
SerializeAndPush(std::move(groupList));
122-
}
12393
return true;
12494
}
12595

Diff for: core/plugin/flusher/file/FlusherFile.h

-4
Original file line numberDiff line numberDiff line change
@@ -38,15 +38,11 @@ class FlusherFile : public Flusher {
3838

3939
private:
4040
bool SerializeAndPush(PipelineEventGroup&& group);
41-
bool SerializeAndPush(BatchedEventsList&& groupList);
42-
bool SerializeAndPush(std::vector<BatchedEventsList>&& groupLists);
4341

4442
std::shared_ptr<spdlog::logger> mFileWriter;
4543
std::string mFilePath;
46-
std::string mPattern = "%v";
4744
uint32_t mMaxFileSize = 1024 * 1024 * 10;
4845
uint32_t mMaxFiles = 10;
49-
Batcher<EventBatchStatus> mBatcher;
5046
std::unique_ptr<EventGroupSerializer> mGroupSerializer;
5147

5248
CounterPtr mSendCnt;

0 commit comments

Comments
 (0)