From 37a00a2196be03a73f1eea96cb0b5707c04f34ef Mon Sep 17 00:00:00 2001 From: Takuka0311 <1914426213@qq.com> Date: Tue, 15 Apr 2025 16:55:19 +0800 Subject: [PATCH 1/7] update jsonSerializer and flusher_file --- .../serializer/JsonSerializer.cpp | 104 +++++++++++------- core/plugin/flusher/file/FlusherFile.cpp | 62 +++-------- core/plugin/flusher/file/FlusherFile.h | 4 - 3 files changed, 79 insertions(+), 91 deletions(-) diff --git a/core/collection_pipeline/serializer/JsonSerializer.cpp b/core/collection_pipeline/serializer/JsonSerializer.cpp index e05b5a7827..1fdea0c347 100644 --- a/core/collection_pipeline/serializer/JsonSerializer.cpp +++ b/core/collection_pipeline/serializer/JsonSerializer.cpp @@ -18,11 +18,28 @@ // TODO: the following dependencies should be removed #include "protobuf/sls/LogGroupSerializer.h" +#include "rapidjson/writer.h" +#include "rapidjson/stringbuffer.h" + using namespace std; namespace logtail { -const string JSON_KEY_TIME = "__time__"; + + const char* JSON_KEY_TIME = "__time__"; + + // Helper function to serialize common fields (tags and time) + template + void SerializeCommonFields(const SizedMap& tags, uint64_t timestamp, WriterType& writer) { + // Serialize tags + for (const auto& tag : tags.mInner) { + writer.Key(tag.first.to_string().c_str()); + writer.String(tag.second.to_string().c_str()); + } + // Serialize time + writer.Key(JSON_KEY_TIME); + writer.Uint64(timestamp); + } bool JsonEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, string& errorMsg) { if (group.mEvents.empty()) { @@ -37,29 +54,31 @@ bool JsonEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, str return false; } - Json::Value groupTags; - for (const auto& tag : group.mTags.mInner) { - groupTags[tag.first.to_string()] = tag.second.to_string(); - } + // Create reusable StringBuffer and Writer + rapidjson::StringBuffer jsonBuffer; + rapidjson::Writer writer(jsonBuffer); + auto resetBuffer = [&jsonBuffer, &writer]() { + jsonBuffer.Clear(); // Clear the buffer for reuse + writer.Reset(jsonBuffer); + }; // TODO: should support nano second - ostringstream oss; switch (eventType) { case PipelineEvent::Type::LOG: for (const auto& item : group.mEvents) { const auto& e = item.Cast(); - Json::Value eventJson; - // tags - eventJson.copy(groupTags); - // time - eventJson[JSON_KEY_TIME] = e.GetTimestamp(); + resetBuffer(); + + writer.StartObject(); + SerializeCommonFields(group.mTags, e.GetTimestamp(), writer); // contents for (const auto& kv : e) { - eventJson[kv.first.to_string()] = kv.second.to_string(); + writer.Key(kv.first.to_string().c_str()); + writer.String(kv.second.to_string().c_str()); } - Json::StreamWriterBuilder writer; - writer["indentation"] = ""; - oss << Json::writeString(writer, eventJson) << endl; + writer.EndObject(); + res.append(jsonBuffer.GetString()); + res.append("\n"); } break; case PipelineEvent::Type::METRIC: @@ -69,34 +88,38 @@ bool JsonEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, str if (e.Is()) { continue; } - Json::Value eventJson; - // tags - eventJson.copy(groupTags); - // time - eventJson[JSON_KEY_TIME] = e.GetTimestamp(); + resetBuffer(); + + writer.StartObject(); + SerializeCommonFields(group.mTags, e.GetTimestamp(), writer); // __labels__ - eventJson[METRIC_RESERVED_KEY_LABELS] = Json::objectValue; - auto& labels = eventJson[METRIC_RESERVED_KEY_LABELS]; + writer.Key(METRIC_RESERVED_KEY_LABELS.c_str()); + writer.StartObject(); for (auto tag = e.TagsBegin(); tag != e.TagsEnd(); tag++) { - 
labels[tag->first.to_string()] = tag->second.to_string(); + writer.Key(tag->first.to_string().c_str()); + writer.String(tag->second.to_string().c_str()); } + writer.EndObject(); // __name__ - eventJson[METRIC_RESERVED_KEY_NAME] = e.GetName().to_string(); + writer.Key(METRIC_RESERVED_KEY_NAME.c_str()); + writer.String(e.GetName().to_string().c_str()); // __value__ + writer.Key(METRIC_RESERVED_KEY_VALUE.c_str()); if (e.Is()) { - eventJson[METRIC_RESERVED_KEY_VALUE] = e.GetValue()->mValue; + writer.Double(e.GetValue()->mValue); } else if (e.Is()) { - eventJson[METRIC_RESERVED_KEY_VALUE] = Json::objectValue; - auto& values = eventJson[METRIC_RESERVED_KEY_VALUE]; + writer.StartObject(); for (auto value = e.GetValue()->ValuesBegin(); value != e.GetValue()->ValuesEnd(); value++) { - values[value->first.to_string()] = value->second.Value; + writer.Key(value->first.to_string().c_str()); + writer.Double(value->second.Value); } + writer.EndObject(); } - Json::StreamWriterBuilder writer; - writer["indentation"] = ""; - oss << Json::writeString(writer, eventJson) << endl; + writer.EndObject(); + res.append(jsonBuffer.GetString()); + res.append("\n"); } break; case PipelineEvent::Type::SPAN: @@ -108,22 +131,21 @@ bool JsonEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, str case PipelineEvent::Type::RAW: for (const auto& item : group.mEvents) { const auto& e = item.Cast(); - Json::Value eventJson; - // tags - eventJson.copy(groupTags); - // time - eventJson[JSON_KEY_TIME] = e.GetTimestamp(); + resetBuffer(); + + writer.StartObject(); + SerializeCommonFields(group.mTags, e.GetTimestamp(), writer); // content - eventJson[DEFAULT_CONTENT_KEY] = e.GetContent().to_string(); - Json::StreamWriterBuilder writer; - writer["indentation"] = ""; - oss << Json::writeString(writer, eventJson) << endl; + writer.Key(DEFAULT_CONTENT_KEY.c_str()); + writer.String(e.GetContent().to_string().c_str()); + writer.EndObject(); + res.append(jsonBuffer.GetString()); + res.append("\n"); } break; default: break; } - res = oss.str(); return true; } diff --git a/core/plugin/flusher/file/FlusherFile.cpp b/core/plugin/flusher/file/FlusherFile.cpp index 1f7117bb3d..a7b86adf8a 100644 --- a/core/plugin/flusher/file/FlusherFile.cpp +++ b/core/plugin/flusher/file/FlusherFile.cpp @@ -43,18 +43,16 @@ bool FlusherFile::Init(const Json::Value& config, Json::Value& optionalGoPipelin mContext->GetLogstoreName(), mContext->GetRegion()); } - // Pattern - // GetMandatoryStringParam(config, "Pattern", mPattern, errorMsg); // MaxFileSize - // GetMandatoryUIntParam(config, "MaxFileSize", mMaxFileSize, errorMsg); + GetMandatoryUIntParam(config, "MaxFileSize", mMaxFileSize, errorMsg); // MaxFiles - // GetMandatoryUIntParam(config, "MaxFiles", mMaxFileSize, errorMsg); + GetMandatoryUIntParam(config, "MaxFiles", mMaxFileSize, errorMsg); // create file writer - auto file_sink = std::make_shared(mFilePath, mMaxFileSize, mMaxFiles, true); + auto fileSink = std::make_shared(mFilePath, mMaxFileSize, mMaxFiles, true); mFileWriter = std::make_shared( - sName, file_sink, spdlog::thread_pool(), spdlog::async_overflow_policy::block); - mFileWriter->set_pattern(mPattern); + sName, fileSink, spdlog::thread_pool(), spdlog::async_overflow_policy::block); + mFileWriter->set_pattern("%v"); mBatcher.Init(Json::Value(), this, DefaultFlushStrategyOptions{}); mGroupSerializer = make_unique(this); @@ -63,63 +61,35 @@ bool FlusherFile::Init(const Json::Value& config, Json::Value& optionalGoPipelin } bool FlusherFile::Send(PipelineEventGroup&& g) { - if 
(g.IsReplay()) { - return SerializeAndPush(std::move(g)); - } else { - vector res; - mBatcher.Add(std::move(g), res); - return SerializeAndPush(std::move(res)); - } + return SerializeAndPush(std::move(g)); } bool FlusherFile::Flush(size_t key) { - BatchedEventsList res; - mBatcher.FlushQueue(key, res); - return SerializeAndPush(std::move(res)); + return true; } bool FlusherFile::FlushAll() { - vector res; - mBatcher.FlushAll(res); - return SerializeAndPush(std::move(res)); + return true; } bool FlusherFile::SerializeAndPush(PipelineEventGroup&& group) { - string serializedData, errorMsg; + string serializedData; + string errorMsg; BatchedEvents g(std::move(group.MutableEvents()), std::move(group.GetSizedTags()), std::move(group.GetSourceBuffer()), group.GetMetadata(EventGroupMetaKey::SOURCE_ID), std::move(group.GetExactlyOnceCheckpoint())); - mGroupSerializer->DoSerialize(move(g), serializedData, errorMsg); + mGroupSerializer->DoSerialize(std::move(g), serializedData, errorMsg); if (errorMsg.empty()) { - mFileWriter->info(serializedData); + if (!serializedData.empty() && serializedData.back() == '\n') { + serializedData.pop_back(); + } + mFileWriter->info(std::move(serializedData)); + mFileWriter->flush(); } else { LOG_ERROR(sLogger, ("serialize pipeline event group error", errorMsg)); } - mFileWriter->flush(); - return true; -} - -bool FlusherFile::SerializeAndPush(BatchedEventsList&& groupList) { - string serializedData; - for (auto& group : groupList) { - string errorMsg; - mGroupSerializer->DoSerialize(move(group), serializedData, errorMsg); - if (errorMsg.empty()) { - mFileWriter->info(serializedData); - } else { - LOG_ERROR(sLogger, ("serialize pipeline event group error", errorMsg)); - } - } - mFileWriter->flush(); - return true; -} - -bool FlusherFile::SerializeAndPush(vector&& groupLists) { - for (auto& groupList : groupLists) { - SerializeAndPush(std::move(groupList)); - } return true; } diff --git a/core/plugin/flusher/file/FlusherFile.h b/core/plugin/flusher/file/FlusherFile.h index ab2abd4533..eb5176d01c 100644 --- a/core/plugin/flusher/file/FlusherFile.h +++ b/core/plugin/flusher/file/FlusherFile.h @@ -38,15 +38,11 @@ class FlusherFile : public Flusher { private: bool SerializeAndPush(PipelineEventGroup&& group); - bool SerializeAndPush(BatchedEventsList&& groupList); - bool SerializeAndPush(std::vector&& groupLists); std::shared_ptr mFileWriter; std::string mFilePath; - std::string mPattern = "%v"; uint32_t mMaxFileSize = 1024 * 1024 * 10; uint32_t mMaxFiles = 10; - Batcher mBatcher; std::unique_ptr mGroupSerializer; CounterPtr mSendCnt; From 2c12da3e17b16bbc539bc36a3c1f2fa745672bdb Mon Sep 17 00:00:00 2001 From: Takuka0311 <1914426213@qq.com> Date: Tue, 15 Apr 2025 17:23:04 +0800 Subject: [PATCH 2/7] polish --- .../serializer/JsonSerializer.cpp | 27 +++++++++---------- core/plugin/flusher/file/FlusherFile.cpp | 1 - 2 files changed, 13 insertions(+), 15 deletions(-) diff --git a/core/collection_pipeline/serializer/JsonSerializer.cpp b/core/collection_pipeline/serializer/JsonSerializer.cpp index 1fdea0c347..94c1e59e92 100644 --- a/core/collection_pipeline/serializer/JsonSerializer.cpp +++ b/core/collection_pipeline/serializer/JsonSerializer.cpp @@ -25,21 +25,20 @@ using namespace std; namespace logtail { - - const char* JSON_KEY_TIME = "__time__"; - - // Helper function to serialize common fields (tags and time) - template - void SerializeCommonFields(const SizedMap& tags, uint64_t timestamp, WriterType& writer) { - // Serialize tags - for (const auto& tag : tags.mInner) { - 
writer.Key(tag.first.to_string().c_str()); - writer.String(tag.second.to_string().c_str()); - } - // Serialize time - writer.Key(JSON_KEY_TIME); - writer.Uint64(timestamp); +const char* JSON_KEY_TIME = "__time__"; + +// Helper function to serialize common fields (tags and time) +template +void SerializeCommonFields(const SizedMap& tags, uint64_t timestamp, WriterType& writer) { + // Serialize tags + for (const auto& tag : tags.mInner) { + writer.Key(tag.first.to_string().c_str()); + writer.String(tag.second.to_string().c_str()); } + // Serialize time + writer.Key(JSON_KEY_TIME); + writer.Uint64(timestamp); +} bool JsonEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, string& errorMsg) { if (group.mEvents.empty()) { diff --git a/core/plugin/flusher/file/FlusherFile.cpp b/core/plugin/flusher/file/FlusherFile.cpp index a7b86adf8a..2e8e3d2c8a 100644 --- a/core/plugin/flusher/file/FlusherFile.cpp +++ b/core/plugin/flusher/file/FlusherFile.cpp @@ -54,7 +54,6 @@ bool FlusherFile::Init(const Json::Value& config, Json::Value& optionalGoPipelin sName, fileSink, spdlog::thread_pool(), spdlog::async_overflow_policy::block); mFileWriter->set_pattern("%v"); - mBatcher.Init(Json::Value(), this, DefaultFlushStrategyOptions{}); mGroupSerializer = make_unique(this); mSendCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_OUT_EVENT_GROUPS_TOTAL); return true; From 43b5eb1ba2fec6d205ce953165097f9bad48b6ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=84=E9=A3=8F?= Date: Tue, 15 Apr 2025 11:43:31 +0000 Subject: [PATCH 3/7] fix lint --- .../serializer/JsonSerializer.cpp | 16 +++++----------- core/plugin/flusher/file/FlusherFile.cpp | 1 - 2 files changed, 5 insertions(+), 12 deletions(-) diff --git a/core/collection_pipeline/serializer/JsonSerializer.cpp b/core/collection_pipeline/serializer/JsonSerializer.cpp index 94c1e59e92..2a2b00252e 100644 --- a/core/collection_pipeline/serializer/JsonSerializer.cpp +++ b/core/collection_pipeline/serializer/JsonSerializer.cpp @@ -14,19 +14,13 @@ #include "collection_pipeline/serializer/JsonSerializer.h" -#include "constants/SpanConstants.h" -// TODO: the following dependencies should be removed -#include "protobuf/sls/LogGroupSerializer.h" - -#include "rapidjson/writer.h" #include "rapidjson/stringbuffer.h" +#include "rapidjson/writer.h" using namespace std; namespace logtail { -const char* JSON_KEY_TIME = "__time__"; - // Helper function to serialize common fields (tags and time) template void SerializeCommonFields(const SizedMap& tags, uint64_t timestamp, WriterType& writer) { @@ -36,7 +30,7 @@ void SerializeCommonFields(const SizedMap& tags, uint64_t timestamp, WriterType& writer.String(tag.second.to_string().c_str()); } // Serialize time - writer.Key(JSON_KEY_TIME); + writer.Key("__time__"); writer.Uint64(timestamp); } @@ -92,7 +86,7 @@ bool JsonEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, str writer.StartObject(); SerializeCommonFields(group.mTags, e.GetTimestamp(), writer); // __labels__ - writer.Key(METRIC_RESERVED_KEY_LABELS.c_str()); + writer.Key("__labels__"); writer.StartObject(); for (auto tag = e.TagsBegin(); tag != e.TagsEnd(); tag++) { writer.Key(tag->first.to_string().c_str()); @@ -100,10 +94,10 @@ bool JsonEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, str } writer.EndObject(); // __name__ - writer.Key(METRIC_RESERVED_KEY_NAME.c_str()); + writer.Key("__name__"); writer.String(e.GetName().to_string().c_str()); // __value__ - writer.Key(METRIC_RESERVED_KEY_VALUE.c_str()); + 
writer.Key("__value__"); if (e.Is()) { writer.Double(e.GetValue()->mValue); } else if (e.Is()) { diff --git a/core/plugin/flusher/file/FlusherFile.cpp b/core/plugin/flusher/file/FlusherFile.cpp index 2e8e3d2c8a..cdbd90c647 100644 --- a/core/plugin/flusher/file/FlusherFile.cpp +++ b/core/plugin/flusher/file/FlusherFile.cpp @@ -16,7 +16,6 @@ #include "spdlog/async.h" #include "spdlog/sinks/rotating_file_sink.h" -#include "spdlog/sinks/stdout_color_sinks.h" #include "collection_pipeline/queue/SenderQueueManager.h" From f84170df27af6cbf67c26bbdd96320af03b1250b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=84=E9=A3=8F?= Date: Tue, 15 Apr 2025 11:55:06 +0000 Subject: [PATCH 4/7] Optimize spdlog --- core/plugin/flusher/file/FlusherFile.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/plugin/flusher/file/FlusherFile.cpp b/core/plugin/flusher/file/FlusherFile.cpp index cdbd90c647..53868fa3c3 100644 --- a/core/plugin/flusher/file/FlusherFile.cpp +++ b/core/plugin/flusher/file/FlusherFile.cpp @@ -48,9 +48,10 @@ bool FlusherFile::Init(const Json::Value& config, Json::Value& optionalGoPipelin GetMandatoryUIntParam(config, "MaxFiles", mMaxFileSize, errorMsg); // create file writer + auto threadPool = std::make_shared(10, 1); auto fileSink = std::make_shared(mFilePath, mMaxFileSize, mMaxFiles, true); - mFileWriter = std::make_shared( - sName, fileSink, spdlog::thread_pool(), spdlog::async_overflow_policy::block); + mFileWriter + = std::make_shared(sName, fileSink, threadPool, spdlog::async_overflow_policy::block); mFileWriter->set_pattern("%v"); mGroupSerializer = make_unique(this); From a0a8ba8ab9fe92a5a9d78475a258aeff07e2fafa Mon Sep 17 00:00:00 2001 From: Takuka0311 <1914426213@qq.com> Date: Sun, 27 Apr 2025 10:38:39 +0800 Subject: [PATCH 5/7] Update core/plugin/flusher/file/FlusherFile.cpp Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- core/plugin/flusher/file/FlusherFile.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/plugin/flusher/file/FlusherFile.cpp b/core/plugin/flusher/file/FlusherFile.cpp index 53868fa3c3..0cfbea0fb5 100644 --- a/core/plugin/flusher/file/FlusherFile.cpp +++ b/core/plugin/flusher/file/FlusherFile.cpp @@ -45,7 +45,7 @@ bool FlusherFile::Init(const Json::Value& config, Json::Value& optionalGoPipelin // MaxFileSize GetMandatoryUIntParam(config, "MaxFileSize", mMaxFileSize, errorMsg); // MaxFiles - GetMandatoryUIntParam(config, "MaxFiles", mMaxFileSize, errorMsg); + GetMandatoryUIntParam(config, "MaxFiles", mMaxFiles, errorMsg); // create file writer auto threadPool = std::make_shared(10, 1); From 2fdcae4a6a0d189bae26aa449ae89875a6aec095 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=84=E9=A3=8F?= Date: Sun, 27 Apr 2025 10:39:30 +0000 Subject: [PATCH 6/7] add ut --- .github/workflows/benchmark.yaml | 1 - .../serializer/JsonSerializer.cpp | 17 +- core/unittest/serializer/CMakeLists.txt | 4 + .../serializer/JsonSerializerUnittest.cpp | 357 ++++++++++++++++++ 4 files changed, 371 insertions(+), 8 deletions(-) create mode 100644 core/unittest/serializer/JsonSerializerUnittest.cpp diff --git a/.github/workflows/benchmark.yaml b/.github/workflows/benchmark.yaml index 5d3258d23e..577eb206aa 100644 --- a/.github/workflows/benchmark.yaml +++ b/.github/workflows/benchmark.yaml @@ -82,7 +82,6 @@ jobs: make benchmark DOCKER_REPOSITORY=ghcr.io/$REPO_OWNER/loongcollector DOCKER_PUSH=true VERSION=edge Benchmark: - if: github.event.pull_request.merged == true runs-on: ${{ matrix.runner }} timeout-minutes: 60 
strategy: diff --git a/core/collection_pipeline/serializer/JsonSerializer.cpp b/core/collection_pipeline/serializer/JsonSerializer.cpp index 2a2b00252e..358bd47eac 100644 --- a/core/collection_pipeline/serializer/JsonSerializer.cpp +++ b/core/collection_pipeline/serializer/JsonSerializer.cpp @@ -45,6 +45,9 @@ bool JsonEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, str // should not happen errorMsg = "unsupported event type in event group"; return false; + } else if (eventType == PipelineEvent::Type::SPAN) { + errorMsg = "invalid event type, span type is not yet supported"; + return false; } // Create reusable StringBuffer and Writer @@ -60,6 +63,9 @@ bool JsonEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, str case PipelineEvent::Type::LOG: for (const auto& item : group.mEvents) { const auto& e = item.Cast(); + if (e.Empty()) { + continue; + } resetBuffer(); writer.StartObject(); @@ -115,15 +121,12 @@ bool JsonEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, str res.append("\n"); } break; - case PipelineEvent::Type::SPAN: - // TODO: implement span serializer - LOG_ERROR( - sLogger, - ("invalid event type", "span type is not supported")("config", mFlusher->GetContext().GetConfigName())); - break; case PipelineEvent::Type::RAW: for (const auto& item : group.mEvents) { const auto& e = item.Cast(); + if (e.GetContent().empty()) { + continue; + } resetBuffer(); writer.StartObject(); @@ -139,7 +142,7 @@ bool JsonEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, str default: break; } - return true; + return !res.empty(); } } // namespace logtail diff --git a/core/unittest/serializer/CMakeLists.txt b/core/unittest/serializer/CMakeLists.txt index b471e3b81a..4b063c8c47 100644 --- a/core/unittest/serializer/CMakeLists.txt +++ b/core/unittest/serializer/CMakeLists.txt @@ -21,6 +21,10 @@ target_link_libraries(serializer_unittest ${UT_BASE_TARGET}) add_executable(sls_serializer_unittest SLSSerializerUnittest.cpp) target_link_libraries(sls_serializer_unittest ${UT_BASE_TARGET}) +add_executable(json_serializer_unittest JsonSerializerUnittest.cpp) +target_link_libraries(json_serializer_unittest ${UT_BASE_TARGET}) + include(GoogleTest) gtest_discover_tests(serializer_unittest) gtest_discover_tests(sls_serializer_unittest) +gtest_discover_tests(json_serializer_unittest) diff --git a/core/unittest/serializer/JsonSerializerUnittest.cpp b/core/unittest/serializer/JsonSerializerUnittest.cpp new file mode 100644 index 0000000000..3c06110839 --- /dev/null +++ b/core/unittest/serializer/JsonSerializerUnittest.cpp @@ -0,0 +1,357 @@ +// Copyright 2025 iLogtail Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ + +#include "collection_pipeline/serializer/JsonSerializer.h" +#include "unittest/Unittest.h" +#include "unittest/plugin/PluginMock.h" + +DECLARE_FLAG_INT32(max_send_log_group_size); + +using namespace std; + +namespace logtail { + +class JsonSerializerUnittest : public ::testing::Test { +public: + void TestSerializeEventGroup(); + +protected: + static void SetUpTestCase() { sFlusher = make_unique(); } + + void SetUp() override { + mCtx.SetConfigName("test_config"); + sFlusher->SetContext(mCtx); + sFlusher->SetMetricsRecordRef(FlusherMock::sName, "1"); + } + +private: + BatchedEvents + createBatchedLogEvents(bool enableNanosecond, bool withEmptyContent = false, bool withNonEmptyContent = true); + BatchedEvents + createBatchedMetricEvents(bool enableNanosecond, uint32_t nanoTimestamp, bool emptyValue, bool onlyOneTag, bool multiValue = false); + BatchedEvents + createBatchedRawEvents(bool enableNanosecond, bool withEmptyContent = false, bool withNonEmptyContent = true); + BatchedEvents createBatchedSpanEvents(); + + static unique_ptr sFlusher; + + CollectionPipelineContext mCtx; +}; + +unique_ptr JsonSerializerUnittest::sFlusher; + +void JsonSerializerUnittest::TestSerializeEventGroup() { + JsonEventGroupSerializer serializer(sFlusher.get()); + { // log + { // nano second disabled, and set + string res; + string errorMsg; + APSARA_TEST_TRUE(serializer.DoSerialize(createBatchedLogEvents(false), res, errorMsg)); + APSARA_TEST_EQUAL("{\"__machine_uuid__\":\"machine_uuid\",\"__pack_id__\":\"pack_id\",\"__source__\":\"source\",\"__topic__\":\"topic\",\"__time__\":1234567890,\"key\":\"value\"}\n", res); + APSARA_TEST_EQUAL("", errorMsg); + } + { // nano second enabled, and set + // Todo + } + { // nano second enabled, not set + const_cast(mCtx.GetGlobalConfig()).mEnableTimestampNanosecond = true; + string res; + string errorMsg; + APSARA_TEST_TRUE(serializer.DoSerialize(createBatchedLogEvents(false), res, errorMsg)); + APSARA_TEST_EQUAL("{\"__machine_uuid__\":\"machine_uuid\",\"__pack_id__\":\"pack_id\",\"__source__\":\"source\",\"__topic__\":\"topic\",\"__time__\":1234567890,\"key\":\"value\"}\n", res); + APSARA_TEST_EQUAL("", errorMsg); + } + { // with empty event + string res; + string errorMsg; + APSARA_TEST_TRUE(serializer.DoSerialize(createBatchedLogEvents(false, true, true), res, errorMsg)); + APSARA_TEST_EQUAL("{\"__machine_uuid__\":\"machine_uuid\",\"__pack_id__\":\"pack_id\",\"__source__\":\"source\",\"__topic__\":\"topic\",\"__time__\":1234567890,\"key\":\"value\"}\n", res); + APSARA_TEST_EQUAL("", errorMsg); + } + { // only empty event + string res; + string errorMsg; + APSARA_TEST_FALSE(serializer.DoSerialize(createBatchedLogEvents(false, true, false), res, errorMsg)); + APSARA_TEST_EQUAL("", res); + APSARA_TEST_EQUAL("", errorMsg); + } + } + { // metric + { // only 1 tag + string res; + string errorMsg; + APSARA_TEST_TRUE(serializer.DoSerialize(createBatchedMetricEvents(false, 0, false, true), res, errorMsg)); + APSARA_TEST_EQUAL("{\"__machine_uuid__\":\"machine_uuid\",\"__pack_id__\":\"pack_id\",\"__source__\":\"source\",\"__topic__\":\"topic\",\"__time__\":1234567890,\"__labels__\":{\"key1\":\"value1\"},\"__name__\":\"test_gauge\",\"__value__\":0.1}\n", res); + APSARA_TEST_EQUAL("", errorMsg); + } + { // multi value + string res; + string errorMsg; + APSARA_TEST_TRUE(serializer.DoSerialize(createBatchedMetricEvents(false, 0, false, false, true), res, errorMsg)); + 
APSARA_TEST_EQUAL("{\"__machine_uuid__\":\"machine_uuid\",\"__pack_id__\":\"pack_id\",\"__source__\":\"source\",\"__topic__\":\"topic\",\"__time__\":1234567890,\"__labels__\":{\"key1\":\"value1\",\"key2\":\"value2\"},\"__name__\":\"test_gauge\",\"__value__\":{\"test-1\":10.0,\"test-2\":2.0}}\n", res); + APSARA_TEST_EQUAL("", errorMsg); + } + { // nano second disabled + string res; + string errorMsg; + APSARA_TEST_TRUE(serializer.DoSerialize(createBatchedMetricEvents(false, 0, false, false), res, errorMsg)); + APSARA_TEST_EQUAL("{\"__machine_uuid__\":\"machine_uuid\",\"__pack_id__\":\"pack_id\",\"__source__\":\"source\",\"__topic__\":\"topic\",\"__time__\":1234567890,\"__labels__\":{\"key1\":\"value1\",\"key2\":\"value2\"},\"__name__\":\"test_gauge\",\"__value__\":0.1}\n", res); + APSARA_TEST_EQUAL("", errorMsg); + } + { // nano second enabled + // Todo + } + { // empty metric value + string res; + string errorMsg; + APSARA_TEST_FALSE(serializer.DoSerialize(createBatchedMetricEvents(false, 0, true, false), res, errorMsg)); + APSARA_TEST_EQUAL("", res); + APSARA_TEST_EQUAL("", errorMsg); + } + } + { // span + string res; + string errorMsg; + auto events = createBatchedSpanEvents(); + APSARA_TEST_EQUAL(events.mEvents.size(), 1U); + APSARA_TEST_TRUE(events.mEvents[0]->GetType() == PipelineEvent::Type::SPAN); + APSARA_TEST_FALSE(serializer.DoSerialize(std::move(events), res, errorMsg)); + APSARA_TEST_EQUAL("", res); + APSARA_TEST_EQUAL("invalid event type, span type is not yet supported", errorMsg); + } + { // raw + { // nano second disabled, and set + string res; + string errorMsg; + APSARA_TEST_TRUE(serializer.DoSerialize(createBatchedRawEvents(false), res, errorMsg)); + APSARA_TEST_EQUAL("{\"__machine_uuid__\":\"machine_uuid\",\"__pack_id__\":\"pack_id\",\"__source__\":\"source\",\"__topic__\":\"topic\",\"__time__\":1234567890,\"content\":\"value\"}\n", res); + APSARA_TEST_EQUAL("", errorMsg); + } + { // nano second enabled, and set + // Todo + } + { // nano second enabled, not set + const_cast(mCtx.GetGlobalConfig()).mEnableTimestampNanosecond = true; + string res; + string errorMsg; + APSARA_TEST_TRUE(serializer.DoSerialize(createBatchedRawEvents(false), res, errorMsg)); + APSARA_TEST_EQUAL("{\"__machine_uuid__\":\"machine_uuid\",\"__pack_id__\":\"pack_id\",\"__source__\":\"source\",\"__topic__\":\"topic\",\"__time__\":1234567890,\"content\":\"value\"}\n", res); + APSARA_TEST_EQUAL("", errorMsg); + } + { // with empty event + string res; + string errorMsg; + APSARA_TEST_TRUE(serializer.DoSerialize(createBatchedRawEvents(false, true, true), res, errorMsg)); + APSARA_TEST_EQUAL("{\"__machine_uuid__\":\"machine_uuid\",\"__pack_id__\":\"pack_id\",\"__source__\":\"source\",\"__topic__\":\"topic\",\"__time__\":1234567890,\"content\":\"value\"}\n", res); + APSARA_TEST_EQUAL("", errorMsg); + } + { // only empty event + string res; + string errorMsg; + APSARA_TEST_FALSE(serializer.DoSerialize(createBatchedRawEvents(false, true, false), res, errorMsg)); + APSARA_TEST_EQUAL("", res); + APSARA_TEST_EQUAL("", errorMsg); + } + } + { // empty log group + PipelineEventGroup group(make_shared()); + BatchedEvents batch(std::move(group.MutableEvents()), + std::move(group.GetSizedTags()), + std::move(group.GetSourceBuffer()), + group.GetMetadata(EventGroupMetaKey::SOURCE_ID), + std::move(group.GetExactlyOnceCheckpoint())); + string res; + string errorMsg; + APSARA_TEST_FALSE(serializer.DoSerialize(std::move(batch), res, errorMsg)); + APSARA_TEST_EQUAL("", res); + APSARA_TEST_EQUAL("empty event group", 
errorMsg); + } +} + + +BatchedEvents +JsonSerializerUnittest::createBatchedLogEvents(bool enableNanosecond, bool withEmptyContent, bool withNonEmptyContent) { + PipelineEventGroup group(make_shared()); + group.SetTag(LOG_RESERVED_KEY_TOPIC, "topic"); + group.SetTag(LOG_RESERVED_KEY_SOURCE, "source"); + group.SetTag(LOG_RESERVED_KEY_MACHINE_UUID, "machine_uuid"); + group.SetTag(LOG_RESERVED_KEY_PACKAGE_ID, "pack_id"); + StringBuffer b = group.GetSourceBuffer()->CopyString(string("pack_id")); + group.SetMetadataNoCopy(EventGroupMetaKey::SOURCE_ID, StringView(b.data, b.size)); + group.SetExactlyOnceCheckpoint(RangeCheckpointPtr(new RangeCheckpoint)); + if (withNonEmptyContent) { + LogEvent* e = group.AddLogEvent(); + e->SetContent(string("key"), string("value")); + if (enableNanosecond) { + e->SetTimestamp(1234567890, 1); + } else { + e->SetTimestamp(1234567890); + } + } + if (withEmptyContent) { + LogEvent* e = group.AddLogEvent(); + if (enableNanosecond) { + e->SetTimestamp(1234567890, 1); + } else { + e->SetTimestamp(1234567890); + } + } + BatchedEvents batch(std::move(group.MutableEvents()), + std::move(group.GetSizedTags()), + std::move(group.GetSourceBuffer()), + group.GetMetadata(EventGroupMetaKey::SOURCE_ID), + std::move(group.GetExactlyOnceCheckpoint())); + return batch; +} + +BatchedEvents JsonSerializerUnittest::createBatchedMetricEvents(bool enableNanosecond, + uint32_t nanoTimestamp, + bool emptyValue, + bool onlyOneTag, + bool multiValue) { + PipelineEventGroup group(make_shared()); + group.SetTag(LOG_RESERVED_KEY_TOPIC, "topic"); + group.SetTag(LOG_RESERVED_KEY_SOURCE, "source"); + group.SetTag(LOG_RESERVED_KEY_MACHINE_UUID, "machine_uuid"); + group.SetTag(LOG_RESERVED_KEY_PACKAGE_ID, "pack_id"); + + StringBuffer b = group.GetSourceBuffer()->CopyString(string("pack_id")); + group.SetMetadataNoCopy(EventGroupMetaKey::SOURCE_ID, StringView(b.data, b.size)); + group.SetExactlyOnceCheckpoint(RangeCheckpointPtr(new RangeCheckpoint)); + MetricEvent* e = group.AddMetricEvent(); + e->SetTag(string("key1"), string("value1")); + if (!onlyOneTag) { + e->SetTag(string("key2"), string("value2")); + } + if (enableNanosecond) { + e->SetTimestamp(1234567890, nanoTimestamp); + } else { + e->SetTimestamp(1234567890); + } + + if (!emptyValue) { + if (!multiValue) { + double value = 0.1; + e->SetValue(value); + } else { + UntypedMultiDoubleValues v({{"test-1", {UntypedValueMetricType::MetricTypeCounter, 10.0}}, + {"test-2", {UntypedValueMetricType::MetricTypeGauge, 2.0}}}, + nullptr); + e->SetValue(v); + } + } + e->SetName("test_gauge"); + BatchedEvents batch(std::move(group.MutableEvents()), + std::move(group.GetSizedTags()), + std::move(group.GetSourceBuffer()), + group.GetMetadata(EventGroupMetaKey::SOURCE_ID), + std::move(group.GetExactlyOnceCheckpoint())); + return batch; +} + +BatchedEvents +JsonSerializerUnittest::createBatchedRawEvents(bool enableNanosecond, bool withEmptyContent, bool withNonEmptyContent) { + PipelineEventGroup group(make_shared()); + group.SetTag(LOG_RESERVED_KEY_TOPIC, "topic"); + group.SetTag(LOG_RESERVED_KEY_SOURCE, "source"); + group.SetTag(LOG_RESERVED_KEY_MACHINE_UUID, "machine_uuid"); + group.SetTag(LOG_RESERVED_KEY_PACKAGE_ID, "pack_id"); + StringBuffer b = group.GetSourceBuffer()->CopyString(string("pack_id")); + group.SetMetadataNoCopy(EventGroupMetaKey::SOURCE_ID, StringView(b.data, b.size)); + group.SetExactlyOnceCheckpoint(RangeCheckpointPtr(new RangeCheckpoint)); + if (withNonEmptyContent) { + RawEvent* e = group.AddRawEvent(); + 
e->SetContent(string("value")); + if (enableNanosecond) { + e->SetTimestamp(1234567890, 1); + } else { + e->SetTimestamp(1234567890); + } + } + if (withEmptyContent) { + RawEvent* e = group.AddRawEvent(); + e->SetContent(string("")); + if (enableNanosecond) { + e->SetTimestamp(1234567890, 1); + } else { + e->SetTimestamp(1234567890); + } + } + BatchedEvents batch(std::move(group.MutableEvents()), + std::move(group.GetSizedTags()), + std::move(group.GetSourceBuffer()), + group.GetMetadata(EventGroupMetaKey::SOURCE_ID), + std::move(group.GetExactlyOnceCheckpoint())); + return batch; +} + +BatchedEvents JsonSerializerUnittest::createBatchedSpanEvents() { + PipelineEventGroup group(make_shared()); + group.SetTag(LOG_RESERVED_KEY_TOPIC, "topic"); + group.SetTag(LOG_RESERVED_KEY_SOURCE, "source"); + group.SetTag(LOG_RESERVED_KEY_MACHINE_UUID, "aaa"); + group.SetTag(LOG_RESERVED_KEY_PACKAGE_ID, "bbb"); + auto now = std::chrono::system_clock::now(); + auto duration = now.time_since_epoch(); + auto seconds = std::chrono::duration_cast(duration).count(); + // auto nano = std::chrono::duration_cast(duration).count(); + StringBuffer b = group.GetSourceBuffer()->CopyString(string("pack_id")); + group.SetMetadataNoCopy(EventGroupMetaKey::SOURCE_ID, StringView(b.data, b.size)); + group.SetExactlyOnceCheckpoint(RangeCheckpointPtr(new RangeCheckpoint)); + SpanEvent* spanEvent = group.AddSpanEvent(); + spanEvent->SetScopeTag(std::string("scope-tag-0"), std::string("scope-value-0")); + spanEvent->SetTag(std::string("workloadName"), std::string("arms-oneagent-test-ql")); + spanEvent->SetTag(std::string("workloadKind"), std::string("faceless")); + spanEvent->SetTag(std::string("source_ip"), std::string("10.54.0.33")); + spanEvent->SetTag(std::string("host"), std::string("10.54.0.33")); + spanEvent->SetTag(std::string("rpc"), std::string("/oneagent/qianlu/local/1")); + spanEvent->SetTag(std::string("rpcType"), std::string("25")); + spanEvent->SetTag(std::string("callType"), std::string("http-client")); + spanEvent->SetTag(std::string("statusCode"), std::string("200")); + spanEvent->SetTag(std::string("version"), std::string("HTTP1.1")); + auto innerEvent = spanEvent->AddEvent(); + innerEvent->SetTag(std::string("innner-event-key-0"), std::string("inner-event-value-0")); + innerEvent->SetTag(std::string("innner-event-key-1"), std::string("inner-event-value-1")); + innerEvent->SetName("inner-event"); + innerEvent->SetTimestampNs(1000); + auto innerLink = spanEvent->AddLink(); + innerLink->SetTag(std::string("innner-link-key-0"), std::string("inner-link-value-0")); + innerLink->SetTag(std::string("innner-link-key-1"), std::string("inner-link-value-1")); + innerLink->SetTraceId("inner-link-traceid"); + innerLink->SetSpanId("inner-link-spanid"); + innerLink->SetTraceState("inner-link-trace-state"); + spanEvent->SetName("/oneagent/qianlu/local/1"); + spanEvent->SetKind(SpanEvent::Kind::Client); + spanEvent->SetStatus(SpanEvent::StatusCode::Ok); + spanEvent->SetSpanId("span-1-2-3-4-5"); + spanEvent->SetTraceId("trace-1-2-3-4-5"); + spanEvent->SetParentSpanId("parent-1-2-3-4-5"); + spanEvent->SetTraceState("test-state"); + spanEvent->SetStartTimeNs(1000); + spanEvent->SetEndTimeNs(2000); + spanEvent->SetTimestamp(seconds); + BatchedEvents batch(std::move(group.MutableEvents()), + std::move(group.GetSizedTags()), + std::move(group.GetSourceBuffer()), + group.GetMetadata(EventGroupMetaKey::SOURCE_ID), + std::move(group.GetExactlyOnceCheckpoint())); + return batch; +} + +UNIT_TEST_CASE(JsonSerializerUnittest, 
TestSerializeEventGroup) + +} // namespace logtail + +UNIT_TEST_MAIN From 0d9a482c95571a4a60367f4df342ba75e2a0fd49 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=84=E9=A3=8F?= Date: Sun, 27 Apr 2025 12:16:14 +0000 Subject: [PATCH 7/7] fix lint --- .../serializer/JsonSerializerUnittest.cpp | 70 ++++++++++++------- 1 file changed, 45 insertions(+), 25 deletions(-) diff --git a/core/unittest/serializer/JsonSerializerUnittest.cpp b/core/unittest/serializer/JsonSerializerUnittest.cpp index 3c06110839..d734ae03e1 100644 --- a/core/unittest/serializer/JsonSerializerUnittest.cpp +++ b/core/unittest/serializer/JsonSerializerUnittest.cpp @@ -39,8 +39,8 @@ class JsonSerializerUnittest : public ::testing::Test { private: BatchedEvents createBatchedLogEvents(bool enableNanosecond, bool withEmptyContent = false, bool withNonEmptyContent = true); - BatchedEvents - createBatchedMetricEvents(bool enableNanosecond, uint32_t nanoTimestamp, bool emptyValue, bool onlyOneTag, bool multiValue = false); + BatchedEvents createBatchedMetricEvents( + bool enableNanosecond, uint32_t nanoTimestamp, bool emptyValue, bool onlyOneTag, bool multiValue = false); BatchedEvents createBatchedRawEvents(bool enableNanosecond, bool withEmptyContent = false, bool withNonEmptyContent = true); BatchedEvents createBatchedSpanEvents(); @@ -59,25 +59,31 @@ void JsonSerializerUnittest::TestSerializeEventGroup() { string res; string errorMsg; APSARA_TEST_TRUE(serializer.DoSerialize(createBatchedLogEvents(false), res, errorMsg)); - APSARA_TEST_EQUAL("{\"__machine_uuid__\":\"machine_uuid\",\"__pack_id__\":\"pack_id\",\"__source__\":\"source\",\"__topic__\":\"topic\",\"__time__\":1234567890,\"key\":\"value\"}\n", res); + APSARA_TEST_EQUAL("{\"__machine_uuid__\":\"machine_uuid\",\"__pack_id__\":\"pack_id\",\"__source__\":" + "\"source\",\"__topic__\":\"topic\",\"__time__\":1234567890,\"key\":\"value\"}\n", + res); APSARA_TEST_EQUAL("", errorMsg); } - { // nano second enabled, and set - // Todo + { // nano second enabled, and set + // Todo } { // nano second enabled, not set const_cast(mCtx.GetGlobalConfig()).mEnableTimestampNanosecond = true; string res; string errorMsg; APSARA_TEST_TRUE(serializer.DoSerialize(createBatchedLogEvents(false), res, errorMsg)); - APSARA_TEST_EQUAL("{\"__machine_uuid__\":\"machine_uuid\",\"__pack_id__\":\"pack_id\",\"__source__\":\"source\",\"__topic__\":\"topic\",\"__time__\":1234567890,\"key\":\"value\"}\n", res); + APSARA_TEST_EQUAL("{\"__machine_uuid__\":\"machine_uuid\",\"__pack_id__\":\"pack_id\",\"__source__\":" + "\"source\",\"__topic__\":\"topic\",\"__time__\":1234567890,\"key\":\"value\"}\n", + res); APSARA_TEST_EQUAL("", errorMsg); } { // with empty event string res; string errorMsg; APSARA_TEST_TRUE(serializer.DoSerialize(createBatchedLogEvents(false, true, true), res, errorMsg)); - APSARA_TEST_EQUAL("{\"__machine_uuid__\":\"machine_uuid\",\"__pack_id__\":\"pack_id\",\"__source__\":\"source\",\"__topic__\":\"topic\",\"__time__\":1234567890,\"key\":\"value\"}\n", res); + APSARA_TEST_EQUAL("{\"__machine_uuid__\":\"machine_uuid\",\"__pack_id__\":\"pack_id\",\"__source__\":" + "\"source\",\"__topic__\":\"topic\",\"__time__\":1234567890,\"key\":\"value\"}\n", + res); APSARA_TEST_EQUAL("", errorMsg); } { // only empty event @@ -93,25 +99,36 @@ void JsonSerializerUnittest::TestSerializeEventGroup() { string res; string errorMsg; APSARA_TEST_TRUE(serializer.DoSerialize(createBatchedMetricEvents(false, 0, false, true), res, errorMsg)); - 
APSARA_TEST_EQUAL("{\"__machine_uuid__\":\"machine_uuid\",\"__pack_id__\":\"pack_id\",\"__source__\":\"source\",\"__topic__\":\"topic\",\"__time__\":1234567890,\"__labels__\":{\"key1\":\"value1\"},\"__name__\":\"test_gauge\",\"__value__\":0.1}\n", res); + APSARA_TEST_EQUAL("{\"__machine_uuid__\":\"machine_uuid\",\"__pack_id__\":\"pack_id\",\"__source__\":" + "\"source\",\"__topic__\":\"topic\",\"__time__\":1234567890,\"__labels__\":{\"key1\":" + "\"value1\"},\"__name__\":\"test_gauge\",\"__value__\":0.1}\n", + res); APSARA_TEST_EQUAL("", errorMsg); } { // multi value string res; string errorMsg; - APSARA_TEST_TRUE(serializer.DoSerialize(createBatchedMetricEvents(false, 0, false, false, true), res, errorMsg)); - APSARA_TEST_EQUAL("{\"__machine_uuid__\":\"machine_uuid\",\"__pack_id__\":\"pack_id\",\"__source__\":\"source\",\"__topic__\":\"topic\",\"__time__\":1234567890,\"__labels__\":{\"key1\":\"value1\",\"key2\":\"value2\"},\"__name__\":\"test_gauge\",\"__value__\":{\"test-1\":10.0,\"test-2\":2.0}}\n", res); + APSARA_TEST_TRUE( + serializer.DoSerialize(createBatchedMetricEvents(false, 0, false, false, true), res, errorMsg)); + APSARA_TEST_EQUAL( + "{\"__machine_uuid__\":\"machine_uuid\",\"__pack_id__\":\"pack_id\",\"__source__\":\"source\",\"__" + "topic__\":\"topic\",\"__time__\":1234567890,\"__labels__\":{\"key1\":\"value1\",\"key2\":\"value2\"}," + "\"__name__\":\"test_gauge\",\"__value__\":{\"test-1\":10.0,\"test-2\":2.0}}\n", + res); APSARA_TEST_EQUAL("", errorMsg); } { // nano second disabled string res; string errorMsg; APSARA_TEST_TRUE(serializer.DoSerialize(createBatchedMetricEvents(false, 0, false, false), res, errorMsg)); - APSARA_TEST_EQUAL("{\"__machine_uuid__\":\"machine_uuid\",\"__pack_id__\":\"pack_id\",\"__source__\":\"source\",\"__topic__\":\"topic\",\"__time__\":1234567890,\"__labels__\":{\"key1\":\"value1\",\"key2\":\"value2\"},\"__name__\":\"test_gauge\",\"__value__\":0.1}\n", res); + APSARA_TEST_EQUAL("{\"__machine_uuid__\":\"machine_uuid\",\"__pack_id__\":\"pack_id\",\"__source__\":" + "\"source\",\"__topic__\":\"topic\",\"__time__\":1234567890,\"__labels__\":{\"key1\":" + "\"value1\",\"key2\":\"value2\"},\"__name__\":\"test_gauge\",\"__value__\":0.1}\n", + res); APSARA_TEST_EQUAL("", errorMsg); } { // nano second enabled - // Todo + // Todo } { // empty metric value string res; @@ -136,25 +153,31 @@ void JsonSerializerUnittest::TestSerializeEventGroup() { string res; string errorMsg; APSARA_TEST_TRUE(serializer.DoSerialize(createBatchedRawEvents(false), res, errorMsg)); - APSARA_TEST_EQUAL("{\"__machine_uuid__\":\"machine_uuid\",\"__pack_id__\":\"pack_id\",\"__source__\":\"source\",\"__topic__\":\"topic\",\"__time__\":1234567890,\"content\":\"value\"}\n", res); + APSARA_TEST_EQUAL("{\"__machine_uuid__\":\"machine_uuid\",\"__pack_id__\":\"pack_id\",\"__source__\":" + "\"source\",\"__topic__\":\"topic\",\"__time__\":1234567890,\"content\":\"value\"}\n", + res); APSARA_TEST_EQUAL("", errorMsg); } { // nano second enabled, and set - // Todo + // Todo } { // nano second enabled, not set const_cast(mCtx.GetGlobalConfig()).mEnableTimestampNanosecond = true; string res; string errorMsg; APSARA_TEST_TRUE(serializer.DoSerialize(createBatchedRawEvents(false), res, errorMsg)); - APSARA_TEST_EQUAL("{\"__machine_uuid__\":\"machine_uuid\",\"__pack_id__\":\"pack_id\",\"__source__\":\"source\",\"__topic__\":\"topic\",\"__time__\":1234567890,\"content\":\"value\"}\n", res); + APSARA_TEST_EQUAL("{\"__machine_uuid__\":\"machine_uuid\",\"__pack_id__\":\"pack_id\",\"__source__\":" + 
"\"source\",\"__topic__\":\"topic\",\"__time__\":1234567890,\"content\":\"value\"}\n", + res); APSARA_TEST_EQUAL("", errorMsg); } { // with empty event string res; string errorMsg; APSARA_TEST_TRUE(serializer.DoSerialize(createBatchedRawEvents(false, true, true), res, errorMsg)); - APSARA_TEST_EQUAL("{\"__machine_uuid__\":\"machine_uuid\",\"__pack_id__\":\"pack_id\",\"__source__\":\"source\",\"__topic__\":\"topic\",\"__time__\":1234567890,\"content\":\"value\"}\n", res); + APSARA_TEST_EQUAL("{\"__machine_uuid__\":\"machine_uuid\",\"__pack_id__\":\"pack_id\",\"__source__\":" + "\"source\",\"__topic__\":\"topic\",\"__time__\":1234567890,\"content\":\"value\"}\n", + res); APSARA_TEST_EQUAL("", errorMsg); } { // only empty event @@ -165,7 +188,7 @@ void JsonSerializerUnittest::TestSerializeEventGroup() { APSARA_TEST_EQUAL("", errorMsg); } } - { // empty log group + { // empty log group PipelineEventGroup group(make_shared()); BatchedEvents batch(std::move(group.MutableEvents()), std::move(group.GetSizedTags()), @@ -173,7 +196,7 @@ void JsonSerializerUnittest::TestSerializeEventGroup() { group.GetMetadata(EventGroupMetaKey::SOURCE_ID), std::move(group.GetExactlyOnceCheckpoint())); string res; - string errorMsg; + string errorMsg; APSARA_TEST_FALSE(serializer.DoSerialize(std::move(batch), res, errorMsg)); APSARA_TEST_EQUAL("", res); APSARA_TEST_EQUAL("empty event group", errorMsg); @@ -216,11 +239,8 @@ JsonSerializerUnittest::createBatchedLogEvents(bool enableNanosecond, bool withE return batch; } -BatchedEvents JsonSerializerUnittest::createBatchedMetricEvents(bool enableNanosecond, - uint32_t nanoTimestamp, - bool emptyValue, - bool onlyOneTag, - bool multiValue) { +BatchedEvents JsonSerializerUnittest::createBatchedMetricEvents( + bool enableNanosecond, uint32_t nanoTimestamp, bool emptyValue, bool onlyOneTag, bool multiValue) { PipelineEventGroup group(make_shared()); group.SetTag(LOG_RESERVED_KEY_TOPIC, "topic"); group.SetTag(LOG_RESERVED_KEY_SOURCE, "source"); @@ -247,8 +267,8 @@ BatchedEvents JsonSerializerUnittest::createBatchedMetricEvents(bool enableNanos e->SetValue(value); } else { UntypedMultiDoubleValues v({{"test-1", {UntypedValueMetricType::MetricTypeCounter, 10.0}}, - {"test-2", {UntypedValueMetricType::MetricTypeGauge, 2.0}}}, - nullptr); + {"test-2", {UntypedValueMetricType::MetricTypeGauge, 2.0}}}, + nullptr); e->SetValue(v); } }