Optimize JsonSerializer and Flusher_file #2184

Open · wants to merge 4 commits into base: main

103 changes: 59 additions & 44 deletions core/collection_pipeline/serializer/JsonSerializer.cpp
@@ -14,15 +14,25 @@

#include "collection_pipeline/serializer/JsonSerializer.h"

#include "constants/SpanConstants.h"
// TODO: the following dependencies should be removed
#include "protobuf/sls/LogGroupSerializer.h"
#include "rapidjson/stringbuffer.h"
#include "rapidjson/writer.h"

using namespace std;

namespace logtail {

const string JSON_KEY_TIME = "__time__";
// Helper function to serialize common fields (tags and time)
template <typename WriterType>
void SerializeCommonFields(const SizedMap& tags, uint64_t timestamp, WriterType& writer) {
// Serialize tags
for (const auto& tag : tags.mInner) {
writer.Key(tag.first.to_string().c_str());
writer.String(tag.second.to_string().c_str());
}
// Serialize time
writer.Key("__time__");
writer.Uint64(timestamp);
}

bool JsonEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, string& errorMsg) {
if (group.mEvents.empty()) {
@@ -37,29 +47,31 @@ bool JsonEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, str
return false;
}

Json::Value groupTags;
for (const auto& tag : group.mTags.mInner) {
groupTags[tag.first.to_string()] = tag.second.to_string();
}
// Create reusable StringBuffer and Writer
rapidjson::StringBuffer jsonBuffer;
rapidjson::Writer<rapidjson::StringBuffer> writer(jsonBuffer);
auto resetBuffer = [&jsonBuffer, &writer]() {
jsonBuffer.Clear(); // Clear the buffer for reuse
writer.Reset(jsonBuffer);
};

// TODO: should support nano second
ostringstream oss;
switch (eventType) {
case PipelineEvent::Type::LOG:
for (const auto& item : group.mEvents) {
const auto& e = item.Cast<LogEvent>();
Json::Value eventJson;
// tags
eventJson.copy(groupTags);
// time
eventJson[JSON_KEY_TIME] = e.GetTimestamp();
resetBuffer();

writer.StartObject();
SerializeCommonFields(group.mTags, e.GetTimestamp(), writer);
// contents
for (const auto& kv : e) {
eventJson[kv.first.to_string()] = kv.second.to_string();
writer.Key(kv.first.to_string().c_str());
writer.String(kv.second.to_string().c_str());
}
Json::StreamWriterBuilder writer;
writer["indentation"] = "";
oss << Json::writeString(writer, eventJson) << endl;
writer.EndObject();
res.append(jsonBuffer.GetString());
res.append("\n");
}
break;
case PipelineEvent::Type::METRIC:
@@ -69,34 +81,38 @@ bool JsonEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, str
if (e.Is<std::monostate>()) {
continue;
}
Json::Value eventJson;
// tags
eventJson.copy(groupTags);
// time
eventJson[JSON_KEY_TIME] = e.GetTimestamp();
resetBuffer();

writer.StartObject();
SerializeCommonFields(group.mTags, e.GetTimestamp(), writer);
// __labels__
eventJson[METRIC_RESERVED_KEY_LABELS] = Json::objectValue;
auto& labels = eventJson[METRIC_RESERVED_KEY_LABELS];
writer.Key("__labels__");
writer.StartObject();
for (auto tag = e.TagsBegin(); tag != e.TagsEnd(); tag++) {
labels[tag->first.to_string()] = tag->second.to_string();
writer.Key(tag->first.to_string().c_str());
writer.String(tag->second.to_string().c_str());
}
writer.EndObject();
// __name__
eventJson[METRIC_RESERVED_KEY_NAME] = e.GetName().to_string();
writer.Key("__name__");
writer.String(e.GetName().to_string().c_str());
// __value__
writer.Key("__value__");
if (e.Is<UntypedSingleValue>()) {
eventJson[METRIC_RESERVED_KEY_VALUE] = e.GetValue<UntypedSingleValue>()->mValue;
writer.Double(e.GetValue<UntypedSingleValue>()->mValue);
} else if (e.Is<UntypedMultiDoubleValues>()) {
eventJson[METRIC_RESERVED_KEY_VALUE] = Json::objectValue;
auto& values = eventJson[METRIC_RESERVED_KEY_VALUE];
writer.StartObject();
for (auto value = e.GetValue<UntypedMultiDoubleValues>()->ValuesBegin();
value != e.GetValue<UntypedMultiDoubleValues>()->ValuesEnd();
value++) {
values[value->first.to_string()] = value->second.Value;
writer.Key(value->first.to_string().c_str());
writer.Double(value->second.Value);
}
writer.EndObject();
}
Json::StreamWriterBuilder writer;
writer["indentation"] = "";
oss << Json::writeString(writer, eventJson) << endl;
writer.EndObject();
res.append(jsonBuffer.GetString());
res.append("\n");
}
break;
case PipelineEvent::Type::SPAN:
@@ -108,22 +124,21 @@ bool JsonEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, str
case PipelineEvent::Type::RAW:
for (const auto& item : group.mEvents) {
const auto& e = item.Cast<RawEvent>();
Json::Value eventJson;
// tags
eventJson.copy(groupTags);
// time
eventJson[JSON_KEY_TIME] = e.GetTimestamp();
resetBuffer();

writer.StartObject();
SerializeCommonFields(group.mTags, e.GetTimestamp(), writer);
// content
eventJson[DEFAULT_CONTENT_KEY] = e.GetContent().to_string();
Json::StreamWriterBuilder writer;
writer["indentation"] = "";
oss << Json::writeString(writer, eventJson) << endl;
writer.Key(DEFAULT_CONTENT_KEY.c_str());
writer.String(e.GetContent().to_string().c_str());
writer.EndObject();
res.append(jsonBuffer.GetString());
res.append("\n");
}
break;
default:
break;
}
res = oss.str();
return true;
}

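For readers unfamiliar with the SAX-style rapidjson API this PR switches to (replacing jsoncpp's DOM-based Json::Value), here is a minimal, self-contained sketch of the Clear()/Reset() reuse pattern from the resetBuffer lambda above. The field names and timestamps are illustrative only:

    #include <cstdint>
    #include <initializer_list>
    #include <iostream>
    #include <string>

    #include "rapidjson/stringbuffer.h"
    #include "rapidjson/writer.h"

    int main() {
        rapidjson::StringBuffer jsonBuffer;
        rapidjson::Writer<rapidjson::StringBuffer> writer(jsonBuffer);
        std::string res;

        // One JSON object per event, reusing the same buffer/writer pair.
        for (uint64_t timestamp : {1700000000ULL, 1700000001ULL}) {
            jsonBuffer.Clear();       // drop the previous event's bytes
            writer.Reset(jsonBuffer); // the writer must be re-bound after Clear()

            writer.StartObject();
            writer.Key("__time__");
            writer.Uint64(timestamp);
            writer.Key("content");
            writer.String("hello");
            writer.EndObject();

            res.append(jsonBuffer.GetString());
            res.append("\n");
        }
        // Prints one line per event, e.g. {"__time__":1700000000,"content":"hello"}
        std::cout << res;
        return 0;
    }

Compared with building a Json::Value per event and stringifying it through a StreamWriterBuilder, the writer emits directly into the buffer and skips the intermediate DOM allocations, which is presumably the optimization this PR is after.
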
67 changes: 18 additions & 49 deletions core/plugin/flusher/file/FlusherFile.cpp
@@ -16,7 +16,6 @@

#include "spdlog/async.h"
#include "spdlog/sinks/rotating_file_sink.h"
#include "spdlog/sinks/stdout_color_sinks.h"

#include "collection_pipeline/queue/SenderQueueManager.h"

@@ -43,83 +42,53 @@ bool FlusherFile::Init(const Json::Value& config, Json::Value& optionalGoPipelin
mContext->GetLogstoreName(),
mContext->GetRegion());
}
// Pattern
// GetMandatoryStringParam(config, "Pattern", mPattern, errorMsg);
// MaxFileSize
// GetMandatoryUIntParam(config, "MaxFileSize", mMaxFileSize, errorMsg);
GetMandatoryUIntParam(config, "MaxFileSize", mMaxFileSize, errorMsg);
// MaxFiles
// GetMandatoryUIntParam(config, "MaxFiles", mMaxFileSize, errorMsg);
GetMandatoryUIntParam(config, "MaxFiles", mMaxFileSize, errorMsg);

// create file writer
auto file_sink = std::make_shared<spdlog::sinks::rotating_file_sink_mt>(mFilePath, mMaxFileSize, mMaxFiles, true);
mFileWriter = std::make_shared<spdlog::async_logger>(
sName, file_sink, spdlog::thread_pool(), spdlog::async_overflow_policy::block);
mFileWriter->set_pattern(mPattern);
auto threadPool = std::make_shared<spdlog::details::thread_pool>(10, 1);
auto fileSink = std::make_shared<spdlog::sinks::rotating_file_sink_mt>(mFilePath, mMaxFileSize, mMaxFiles, true);
mFileWriter
= std::make_shared<spdlog::async_logger>(sName, fileSink, threadPool, spdlog::async_overflow_policy::block);
mFileWriter->set_pattern("%v");

mBatcher.Init(Json::Value(), this, DefaultFlushStrategyOptions{});
mGroupSerializer = make_unique<JsonEventGroupSerializer>(this);
mSendCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_OUT_EVENT_GROUPS_TOTAL);
return true;
}

bool FlusherFile::Send(PipelineEventGroup&& g) {
if (g.IsReplay()) {
return SerializeAndPush(std::move(g));
} else {
vector<BatchedEventsList> res;
mBatcher.Add(std::move(g), res);
return SerializeAndPush(std::move(res));
}
return SerializeAndPush(std::move(g));
}

bool FlusherFile::Flush(size_t key) {
BatchedEventsList res;
mBatcher.FlushQueue(key, res);
return SerializeAndPush(std::move(res));
return true;
}

bool FlusherFile::FlushAll() {
vector<BatchedEventsList> res;
mBatcher.FlushAll(res);
return SerializeAndPush(std::move(res));
return true;
}

bool FlusherFile::SerializeAndPush(PipelineEventGroup&& group) {
string serializedData, errorMsg;
string serializedData;
string errorMsg;
BatchedEvents g(std::move(group.MutableEvents()),
std::move(group.GetSizedTags()),
std::move(group.GetSourceBuffer()),
group.GetMetadata(EventGroupMetaKey::SOURCE_ID),
std::move(group.GetExactlyOnceCheckpoint()));
mGroupSerializer->DoSerialize(move(g), serializedData, errorMsg);
mGroupSerializer->DoSerialize(std::move(g), serializedData, errorMsg);
if (errorMsg.empty()) {
mFileWriter->info(serializedData);
if (!serializedData.empty() && serializedData.back() == '\n') {
serializedData.pop_back();
}
mFileWriter->info(std::move(serializedData));
mFileWriter->flush();
} else {
LOG_ERROR(sLogger, ("serialize pipeline event group error", errorMsg));
}
mFileWriter->flush();
return true;
}

bool FlusherFile::SerializeAndPush(BatchedEventsList&& groupList) {
string serializedData;
for (auto& group : groupList) {
string errorMsg;
mGroupSerializer->DoSerialize(move(group), serializedData, errorMsg);
if (errorMsg.empty()) {
mFileWriter->info(serializedData);
} else {
LOG_ERROR(sLogger, ("serialize pipeline event group error", errorMsg));
}
}
mFileWriter->flush();
return true;
}

bool FlusherFile::SerializeAndPush(vector<BatchedEventsList>&& groupLists) {
for (auto& groupList : groupLists) {
SerializeAndPush(std::move(groupList));
}
return true;
}

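For context, here is a minimal standalone sketch of the spdlog setup FlusherFile::Init now performs (the file name, queue size, and payload are illustrative). One caveat worth noting in review: spdlog::async_logger holds only a weak_ptr to its thread pool, so the shared_ptr created here must stay alive for as long as the logger is used; a dedicated pool created as a local and not stored anywhere is destroyed when it goes out of scope:

    #include <memory>

    #include "spdlog/async.h"
    #include "spdlog/sinks/rotating_file_sink.h"

    int main() {
        // Dedicated pool: an 8192-slot queue drained by one background thread.
        auto threadPool = std::make_shared<spdlog::details::thread_pool>(8192, 1);

        // Rotate at 10 MiB, keep at most 10 files, rotate_on_open = true.
        auto fileSink = std::make_shared<spdlog::sinks::rotating_file_sink_mt>(
            "flusher_test.log", 10 * 1024 * 1024, 10, true);

        auto writer = std::make_shared<spdlog::async_logger>(
            "flusher_file", fileSink, threadPool, spdlog::async_overflow_policy::block);
        writer->set_pattern("%v"); // "%v" writes the raw payload, no timestamp/level prefix

        writer->info(R"({"__time__":1700000000,"content":"hello"})");
        writer->flush();
        return 0; // threadPool joins its worker thread on destruction
    }
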
4 changes: 0 additions & 4 deletions core/plugin/flusher/file/FlusherFile.h
@@ -38,15 +38,11 @@ class FlusherFile : public Flusher {

private:
bool SerializeAndPush(PipelineEventGroup&& group);
bool SerializeAndPush(BatchedEventsList&& groupList);
bool SerializeAndPush(std::vector<BatchedEventsList>&& groupLists);

std::shared_ptr<spdlog::logger> mFileWriter;
std::string mFilePath;
std::string mPattern = "%v";
uint32_t mMaxFileSize = 1024 * 1024 * 10;
uint32_t mMaxFiles = 10;
Batcher<EventBatchStatus> mBatcher;
std::unique_ptr<EventGroupSerializer> mGroupSerializer;

CounterPtr mSendCnt;