diff --git a/.github/workflows/benchmark.yaml b/.github/workflows/benchmark.yaml
index 5d3258d23e..577eb206aa 100644
--- a/.github/workflows/benchmark.yaml
+++ b/.github/workflows/benchmark.yaml
@@ -82,7 +82,6 @@ jobs:
           make benchmark DOCKER_REPOSITORY=ghcr.io/$REPO_OWNER/loongcollector DOCKER_PUSH=true VERSION=edge
 
   Benchmark:
-    if: github.event.pull_request.merged == true
    runs-on: ${{ matrix.runner }}
    timeout-minutes: 60
    strategy:
diff --git a/core/collection_pipeline/serializer/JsonSerializer.cpp b/core/collection_pipeline/serializer/JsonSerializer.cpp
index e05b5a7827..358bd47eac 100644
--- a/core/collection_pipeline/serializer/JsonSerializer.cpp
+++ b/core/collection_pipeline/serializer/JsonSerializer.cpp
@@ -14,15 +14,25 @@
 
 #include "collection_pipeline/serializer/JsonSerializer.h"
 
-#include "constants/SpanConstants.h"
-// TODO: the following dependencies should be removed
-#include "protobuf/sls/LogGroupSerializer.h"
+#include "rapidjson/stringbuffer.h"
+#include "rapidjson/writer.h"
 
 using namespace std;
 
 namespace logtail {
 
-const string JSON_KEY_TIME = "__time__";
+// Helper function to serialize common fields (tags and time)
+template <typename WriterType>
+void SerializeCommonFields(const SizedMap& tags, uint64_t timestamp, WriterType& writer) {
+    // Serialize tags
+    for (const auto& tag : tags.mInner) {
+        writer.Key(tag.first.to_string().c_str());
+        writer.String(tag.second.to_string().c_str());
+    }
+    // Serialize time
+    writer.Key("__time__");
+    writer.Uint64(timestamp);
+}
 
 bool JsonEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, string& errorMsg) {
     if (group.mEvents.empty()) {
@@ -35,31 +45,39 @@ bool JsonEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, str
         // should not happen
         errorMsg = "unsupported event type in event group";
         return false;
+    } else if (eventType == PipelineEvent::Type::SPAN) {
+        errorMsg = "invalid event type, span type is not yet supported";
+        return false;
     }
 
-    Json::Value groupTags;
-    for (const auto& tag : group.mTags.mInner) {
-        groupTags[tag.first.to_string()] = tag.second.to_string();
-    }
+    // Create reusable StringBuffer and Writer
+    rapidjson::StringBuffer jsonBuffer;
+    rapidjson::Writer<rapidjson::StringBuffer> writer(jsonBuffer);
+    auto resetBuffer = [&jsonBuffer, &writer]() {
+        jsonBuffer.Clear(); // Clear the buffer for reuse
+        writer.Reset(jsonBuffer);
+    };
 
     // TODO: should support nano second
-    ostringstream oss;
     switch (eventType) {
         case PipelineEvent::Type::LOG:
             for (const auto& item : group.mEvents) {
                 const auto& e = item.Cast<LogEvent>();
-                Json::Value eventJson;
-                // tags
-                eventJson.copy(groupTags);
-                // time
-                eventJson[JSON_KEY_TIME] = e.GetTimestamp();
+                if (e.Empty()) {
+                    continue;
+                }
+                resetBuffer();
+
+                writer.StartObject();
+                SerializeCommonFields(group.mTags, e.GetTimestamp(), writer);
                 // contents
                 for (const auto& kv : e) {
-                    eventJson[kv.first.to_string()] = kv.second.to_string();
+                    writer.Key(kv.first.to_string().c_str());
+                    writer.String(kv.second.to_string().c_str());
                 }
-                Json::StreamWriterBuilder writer;
-                writer["indentation"] = "";
-                oss << Json::writeString(writer, eventJson) << endl;
+                writer.EndObject();
+                res.append(jsonBuffer.GetString());
+                res.append("\n");
             }
             break;
         case PipelineEvent::Type::METRIC:
@@ -69,62 +87,62 @@ bool JsonEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, str
             for (const auto& item : group.mEvents) {
                 const auto& e = item.Cast<MetricEvent>();
                 if (e.Is<std::monostate>()) {
                     continue;
                 }
-                Json::Value eventJson;
-                // tags
-                eventJson.copy(groupTags);
-                // time
-                eventJson[JSON_KEY_TIME] = e.GetTimestamp();
+                resetBuffer();
+
+                writer.StartObject();
+                SerializeCommonFields(group.mTags, e.GetTimestamp(), writer);
                 // __labels__
-                eventJson[METRIC_RESERVED_KEY_LABELS] = Json::objectValue;
-                auto& labels = eventJson[METRIC_RESERVED_KEY_LABELS];
+                writer.Key("__labels__");
+                writer.StartObject();
                 for (auto tag = e.TagsBegin(); tag != e.TagsEnd(); tag++) {
-                    labels[tag->first.to_string()] = tag->second.to_string();
+                    writer.Key(tag->first.to_string().c_str());
+                    writer.String(tag->second.to_string().c_str());
                 }
+                writer.EndObject();
                 // __name__
-                eventJson[METRIC_RESERVED_KEY_NAME] = e.GetName().to_string();
+                writer.Key("__name__");
+                writer.String(e.GetName().to_string().c_str());
                 // __value__
+                writer.Key("__value__");
                 if (e.Is<UntypedSingleValue>()) {
-                    eventJson[METRIC_RESERVED_KEY_VALUE] = e.GetValue<UntypedSingleValue>()->mValue;
+                    writer.Double(e.GetValue<UntypedSingleValue>()->mValue);
                 } else if (e.Is<UntypedMultiDoubleValues>()) {
-                    eventJson[METRIC_RESERVED_KEY_VALUE] = Json::objectValue;
-                    auto& values = eventJson[METRIC_RESERVED_KEY_VALUE];
+                    writer.StartObject();
                     for (auto value = e.GetValue<UntypedMultiDoubleValues>()->ValuesBegin(); value != e.GetValue<UntypedMultiDoubleValues>()->ValuesEnd(); value++) {
-                        values[value->first.to_string()] = value->second.Value;
+                        writer.Key(value->first.to_string().c_str());
+                        writer.Double(value->second.Value);
                     }
+                    writer.EndObject();
                 }
-                Json::StreamWriterBuilder writer;
-                writer["indentation"] = "";
-                oss << Json::writeString(writer, eventJson) << endl;
+                writer.EndObject();
+                res.append(jsonBuffer.GetString());
+                res.append("\n");
             }
             break;
-        case PipelineEvent::Type::SPAN:
-            // TODO: implement span serializer
-            LOG_ERROR(
-                sLogger,
-                ("invalid event type", "span type is not supported")("config", mFlusher->GetContext().GetConfigName()));
-            break;
         case PipelineEvent::Type::RAW:
             for (const auto& item : group.mEvents) {
                 const auto& e = item.Cast<RawEvent>();
-                Json::Value eventJson;
-                // tags
-                eventJson.copy(groupTags);
-                // time
-                eventJson[JSON_KEY_TIME] = e.GetTimestamp();
+                if (e.GetContent().empty()) {
+                    continue;
+                }
+                resetBuffer();
+
+                writer.StartObject();
+                SerializeCommonFields(group.mTags, e.GetTimestamp(), writer);
                 // content
-                eventJson[DEFAULT_CONTENT_KEY] = e.GetContent().to_string();
-                Json::StreamWriterBuilder writer;
-                writer["indentation"] = "";
-                oss << Json::writeString(writer, eventJson) << endl;
+                writer.Key(DEFAULT_CONTENT_KEY.c_str());
+                writer.String(e.GetContent().to_string().c_str());
+                writer.EndObject();
+                res.append(jsonBuffer.GetString());
+                res.append("\n");
             }
             break;
         default:
             break;
     }
-    res = oss.str();
-    return true;
+    return !res.empty();
 }
 
 } // namespace logtail
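
For reference, a minimal standalone sketch (not part of the patch) of the rapidjson buffer-reuse pattern the new serializer relies on: one StringBuffer and one Writer per event group, recycled per event via Clear() and Reset(), appending one JSON object per output line. The event data and timestamp below are illustrative.

// Standalone sketch: demonstrates the StringBuffer/Writer reuse pattern used above.
#include <cstdint>
#include <iostream>
#include <map>
#include <string>

#include "rapidjson/stringbuffer.h"
#include "rapidjson/writer.h"

int main() {
    std::map<std::string, std::string> events[] = {{{"key", "value"}}, {{"key2", "value2"}}};
    std::string res;

    rapidjson::StringBuffer jsonBuffer;
    rapidjson::Writer<rapidjson::StringBuffer> writer(jsonBuffer);
    for (const auto& event : events) {
        jsonBuffer.Clear();       // drop the previous event's bytes
        writer.Reset(jsonBuffer); // the writer must be reset after the buffer is cleared
        writer.StartObject();
        writer.Key("__time__");
        writer.Uint64(1234567890);
        for (const auto& kv : event) {
            writer.Key(kv.first.c_str());
            writer.String(kv.second.c_str());
        }
        writer.EndObject();
        res.append(jsonBuffer.GetString()); // GetString() stays valid until the next Clear()
        res.append("\n");
    }
    std::cout << res; // one JSON object per line, matching the serializer's output format
    return 0;
}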
diff --git a/core/plugin/flusher/file/FlusherFile.cpp b/core/plugin/flusher/file/FlusherFile.cpp
index 1f7117bb3d..0cfbea0fb5 100644
--- a/core/plugin/flusher/file/FlusherFile.cpp
+++ b/core/plugin/flusher/file/FlusherFile.cpp
@@ -16,7 +16,6 @@
 
 #include "spdlog/async.h"
 #include "spdlog/sinks/rotating_file_sink.h"
-#include "spdlog/sinks/stdout_color_sinks.h"
 
 #include "collection_pipeline/queue/SenderQueueManager.h"
 
@@ -43,83 +42,53 @@ bool FlusherFile::Init(const Json::Value& config, Json::Value& optionalGoPipelin
                              mContext->GetLogstoreName(),
                              mContext->GetRegion());
     }
-    // Pattern
-    // GetMandatoryStringParam(config, "Pattern", mPattern, errorMsg);
     // MaxFileSize
-    // GetMandatoryUIntParam(config, "MaxFileSize", mMaxFileSize, errorMsg);
+    GetMandatoryUIntParam(config, "MaxFileSize", mMaxFileSize, errorMsg);
     // MaxFiles
-    // GetMandatoryUIntParam(config, "MaxFiles", mMaxFileSize, errorMsg);
+    GetMandatoryUIntParam(config, "MaxFiles", mMaxFiles, errorMsg);
 
     // create file writer
-    auto file_sink = std::make_shared<spdlog::sinks::rotating_file_sink_mt>(mFilePath, mMaxFileSize, mMaxFiles, true);
-    mFileWriter = std::make_shared<spdlog::async_logger>(
-        sName, file_sink, spdlog::thread_pool(), spdlog::async_overflow_policy::block);
-    mFileWriter->set_pattern(mPattern);
+    auto threadPool = std::make_shared<spdlog::details::thread_pool>(10, 1);
+    auto fileSink = std::make_shared<spdlog::sinks::rotating_file_sink_mt>(mFilePath, mMaxFileSize, mMaxFiles, true);
+    mFileWriter
+        = std::make_shared<spdlog::async_logger>(sName, fileSink, threadPool, spdlog::async_overflow_policy::block);
+    mFileWriter->set_pattern("%v");
 
-    mBatcher.Init(Json::Value(), this, DefaultFlushStrategyOptions{});
     mGroupSerializer = make_unique<JsonEventGroupSerializer>(this);
     mSendCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_OUT_EVENT_GROUPS_TOTAL);
     return true;
 }
 
 bool FlusherFile::Send(PipelineEventGroup&& g) {
-    if (g.IsReplay()) {
-        return SerializeAndPush(std::move(g));
-    } else {
-        vector<BatchedEventsList> res;
-        mBatcher.Add(std::move(g), res);
-        return SerializeAndPush(std::move(res));
-    }
+    return SerializeAndPush(std::move(g));
 }
 
 bool FlusherFile::Flush(size_t key) {
-    BatchedEventsList res;
-    mBatcher.FlushQueue(key, res);
-    return SerializeAndPush(std::move(res));
+    return true;
 }
 
 bool FlusherFile::FlushAll() {
-    vector<BatchedEventsList> res;
-    mBatcher.FlushAll(res);
-    return SerializeAndPush(std::move(res));
+    return true;
 }
 
 bool FlusherFile::SerializeAndPush(PipelineEventGroup&& group) {
-    string serializedData, errorMsg;
+    string serializedData;
+    string errorMsg;
     BatchedEvents g(std::move(group.MutableEvents()),
                     std::move(group.GetSizedTags()),
                     std::move(group.GetSourceBuffer()),
                     group.GetMetadata(EventGroupMetaKey::SOURCE_ID),
                     std::move(group.GetExactlyOnceCheckpoint()));
-    mGroupSerializer->DoSerialize(move(g), serializedData, errorMsg);
+    mGroupSerializer->DoSerialize(std::move(g), serializedData, errorMsg);
     if (errorMsg.empty()) {
-        mFileWriter->info(serializedData);
+        if (!serializedData.empty() && serializedData.back() == '\n') {
+            serializedData.pop_back();
+        }
+        mFileWriter->info(std::move(serializedData));
+        mFileWriter->flush();
     } else {
         LOG_ERROR(sLogger, ("serialize pipeline event group error", errorMsg));
     }
-    mFileWriter->flush();
-    return true;
-}
-
-bool FlusherFile::SerializeAndPush(BatchedEventsList&& groupList) {
-    string serializedData;
-    for (auto& group : groupList) {
-        string errorMsg;
-        mGroupSerializer->DoSerialize(move(group), serializedData, errorMsg);
-        if (errorMsg.empty()) {
-            mFileWriter->info(serializedData);
-        } else {
-            LOG_ERROR(sLogger, ("serialize pipeline event group error", errorMsg));
-        }
-    }
-    mFileWriter->flush();
-    return true;
-}
-
-bool FlusherFile::SerializeAndPush(vector<BatchedEventsList>&& groupLists) {
-    for (auto& groupList : groupLists) {
-        SerializeAndPush(std::move(groupList));
-    }
     return true;
 }
diff --git a/core/plugin/flusher/file/FlusherFile.h b/core/plugin/flusher/file/FlusherFile.h
index ab2abd4533..eb5176d01c 100644
--- a/core/plugin/flusher/file/FlusherFile.h
+++ b/core/plugin/flusher/file/FlusherFile.h
@@ -38,15 +38,11 @@ class FlusherFile : public Flusher {
 
 private:
     bool SerializeAndPush(PipelineEventGroup&& group);
-    bool SerializeAndPush(BatchedEventsList&& groupList);
-    bool SerializeAndPush(std::vector<BatchedEventsList>&& groupLists);
 
     std::shared_ptr<spdlog::logger> mFileWriter;
     std::string mFilePath;
-    std::string mPattern = "%v";
     uint32_t mMaxFileSize = 1024 * 1024 * 10;
     uint32_t mMaxFiles = 10;
-    Batcher<EventBatchStatus> mBatcher;
     std::unique_ptr<EventGroupSerializer> mGroupSerializer;
 
     CounterPtr mSendCnt;
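
For reference, a minimal standalone sketch (not part of the patch) of the spdlog setup that FlusherFile::Init now performs: a dedicated thread pool feeding an async_logger over a rotating file sink, with the "%v" pattern so each serialized batch is written verbatim. The file name, queue size, and message below are illustrative; the patch itself wires in mFilePath, mMaxFileSize, and mMaxFiles from the plugin config.

// Standalone sketch: async rotating-file logger as configured by FlusherFile::Init above.
#include <memory>

#include "spdlog/async.h"
#include "spdlog/sinks/rotating_file_sink.h"

int main() {
    // Dedicated queue with one background thread (the old code used the global spdlog thread pool).
    auto threadPool = std::make_shared<spdlog::details::thread_pool>(8192, 1);
    // Rotate at 10 MB, keep 10 files, rotate_on_open = true.
    auto fileSink = std::make_shared<spdlog::sinks::rotating_file_sink_mt>(
        "flusher_file_demo.log", 10 * 1024 * 1024, 10, true);
    auto fileWriter = std::make_shared<spdlog::async_logger>(
        "flusher_file_demo", fileSink, threadPool, spdlog::async_overflow_policy::block);
    // "%v" writes the message body only, so each call emits exactly one serialized line.
    fileWriter->set_pattern("%v");

    fileWriter->info("{\"__time__\":1234567890,\"key\":\"value\"}");
    fileWriter->flush();
    return 0; // the thread pool must outlive the logger, which holding both in main guarantees
}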
diff --git a/core/unittest/serializer/CMakeLists.txt b/core/unittest/serializer/CMakeLists.txt
index b471e3b81a..4b063c8c47 100644
--- a/core/unittest/serializer/CMakeLists.txt
+++ b/core/unittest/serializer/CMakeLists.txt
@@ -21,6 +21,10 @@ target_link_libraries(serializer_unittest
     ${UT_BASE_TARGET})
 add_executable(sls_serializer_unittest SLSSerializerUnittest.cpp)
 target_link_libraries(sls_serializer_unittest ${UT_BASE_TARGET})
+add_executable(json_serializer_unittest JsonSerializerUnittest.cpp)
+target_link_libraries(json_serializer_unittest ${UT_BASE_TARGET})
+
 include(GoogleTest)
 gtest_discover_tests(serializer_unittest)
 gtest_discover_tests(sls_serializer_unittest)
+gtest_discover_tests(json_serializer_unittest)
diff --git a/core/unittest/serializer/JsonSerializerUnittest.cpp b/core/unittest/serializer/JsonSerializerUnittest.cpp
new file mode 100644
index 0000000000..d734ae03e1
--- /dev/null
+++ b/core/unittest/serializer/JsonSerializerUnittest.cpp
@@ -0,0 +1,377 @@
+// Copyright 2025 iLogtail Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+
+#include "collection_pipeline/serializer/JsonSerializer.h"
+#include "unittest/Unittest.h"
+#include "unittest/plugin/PluginMock.h"
+
+DECLARE_FLAG_INT32(max_send_log_group_size);
+
+using namespace std;
+
+namespace logtail {
+
+class JsonSerializerUnittest : public ::testing::Test {
+public:
+    void TestSerializeEventGroup();
+
+protected:
+    static void SetUpTestCase() { sFlusher = make_unique<FlusherMock>(); }
+
+    void SetUp() override {
+        mCtx.SetConfigName("test_config");
+        sFlusher->SetContext(mCtx);
+        sFlusher->SetMetricsRecordRef(FlusherMock::sName, "1");
+    }
+
+private:
+    BatchedEvents
+    createBatchedLogEvents(bool enableNanosecond, bool withEmptyContent = false, bool withNonEmptyContent = true);
+    BatchedEvents createBatchedMetricEvents(
+        bool enableNanosecond, uint32_t nanoTimestamp, bool emptyValue, bool onlyOneTag, bool multiValue = false);
+    BatchedEvents
+    createBatchedRawEvents(bool enableNanosecond, bool withEmptyContent = false, bool withNonEmptyContent = true);
+    BatchedEvents createBatchedSpanEvents();
+
+    static unique_ptr<FlusherMock> sFlusher;
+
+    CollectionPipelineContext mCtx;
+};
+
+unique_ptr<FlusherMock> JsonSerializerUnittest::sFlusher;
+
+void JsonSerializerUnittest::TestSerializeEventGroup() {
+    JsonEventGroupSerializer serializer(sFlusher.get());
+    { // log
+        { // nano second disabled, and set
+            string res;
+            string errorMsg;
+            APSARA_TEST_TRUE(serializer.DoSerialize(createBatchedLogEvents(false), res, errorMsg));
+            APSARA_TEST_EQUAL("{\"__machine_uuid__\":\"machine_uuid\",\"__pack_id__\":\"pack_id\",\"__source__\":"
+                              "\"source\",\"__topic__\":\"topic\",\"__time__\":1234567890,\"key\":\"value\"}\n",
+                              res);
+            APSARA_TEST_EQUAL("", errorMsg);
+        }
+        { // nano second enabled, and set
+            // Todo
+        }
+        { // nano second enabled, not set
+            const_cast<GlobalConfig&>(mCtx.GetGlobalConfig()).mEnableTimestampNanosecond = true;
+            string res;
+            string errorMsg;
+            APSARA_TEST_TRUE(serializer.DoSerialize(createBatchedLogEvents(false), res, errorMsg));
+            APSARA_TEST_EQUAL("{\"__machine_uuid__\":\"machine_uuid\",\"__pack_id__\":\"pack_id\",\"__source__\":"
+                              "\"source\",\"__topic__\":\"topic\",\"__time__\":1234567890,\"key\":\"value\"}\n",
+                              res);
+            APSARA_TEST_EQUAL("", errorMsg);
+        }
+        { // with empty event
+            string res;
+            string errorMsg;
+            APSARA_TEST_TRUE(serializer.DoSerialize(createBatchedLogEvents(false, true, true), res, errorMsg));
+            APSARA_TEST_EQUAL("{\"__machine_uuid__\":\"machine_uuid\",\"__pack_id__\":\"pack_id\",\"__source__\":"
+                              "\"source\",\"__topic__\":\"topic\",\"__time__\":1234567890,\"key\":\"value\"}\n",
+                              res);
+            APSARA_TEST_EQUAL("", errorMsg);
+        }
+        { // only empty event
+            string res;
+            string errorMsg;
+            APSARA_TEST_FALSE(serializer.DoSerialize(createBatchedLogEvents(false, true, false), res, errorMsg));
+            APSARA_TEST_EQUAL("", res);
+            APSARA_TEST_EQUAL("", errorMsg);
+        }
+    }
+    { // metric
+        { // only 1 tag
+            string res;
+            string errorMsg;
+            APSARA_TEST_TRUE(serializer.DoSerialize(createBatchedMetricEvents(false, 0, false, true), res, errorMsg));
+            APSARA_TEST_EQUAL("{\"__machine_uuid__\":\"machine_uuid\",\"__pack_id__\":\"pack_id\",\"__source__\":"
+                              "\"source\",\"__topic__\":\"topic\",\"__time__\":1234567890,\"__labels__\":{\"key1\":"
+                              "\"value1\"},\"__name__\":\"test_gauge\",\"__value__\":0.1}\n",
+                              res);
+            APSARA_TEST_EQUAL("", errorMsg);
+        }
+        { // multi value
+            string res;
+            string errorMsg;
+            APSARA_TEST_TRUE(
+                serializer.DoSerialize(createBatchedMetricEvents(false, 0, false, false, true), res, errorMsg));
+            APSARA_TEST_EQUAL(
+                "{\"__machine_uuid__\":\"machine_uuid\",\"__pack_id__\":\"pack_id\",\"__source__\":\"source\",\"__"
+                "topic__\":\"topic\",\"__time__\":1234567890,\"__labels__\":{\"key1\":\"value1\",\"key2\":\"value2\"},"
+                "\"__name__\":\"test_gauge\",\"__value__\":{\"test-1\":10.0,\"test-2\":2.0}}\n",
+                res);
+            APSARA_TEST_EQUAL("", errorMsg);
+        }
+        { // nano second disabled
+            string res;
+            string errorMsg;
+            APSARA_TEST_TRUE(serializer.DoSerialize(createBatchedMetricEvents(false, 0, false, false), res, errorMsg));
+            APSARA_TEST_EQUAL("{\"__machine_uuid__\":\"machine_uuid\",\"__pack_id__\":\"pack_id\",\"__source__\":"
+                              "\"source\",\"__topic__\":\"topic\",\"__time__\":1234567890,\"__labels__\":{\"key1\":"
+                              "\"value1\",\"key2\":\"value2\"},\"__name__\":\"test_gauge\",\"__value__\":0.1}\n",
+                              res);
+            APSARA_TEST_EQUAL("", errorMsg);
+        }
+        { // nano second enabled
+            // Todo
+        }
+        { // empty metric value
+            string res;
+            string errorMsg;
+            APSARA_TEST_FALSE(serializer.DoSerialize(createBatchedMetricEvents(false, 0, true, false), res, errorMsg));
+            APSARA_TEST_EQUAL("", res);
+            APSARA_TEST_EQUAL("", errorMsg);
+        }
+    }
+    { // span
+        string res;
+        string errorMsg;
+        auto events = createBatchedSpanEvents();
+        APSARA_TEST_EQUAL(events.mEvents.size(), 1U);
+        APSARA_TEST_TRUE(events.mEvents[0]->GetType() == PipelineEvent::Type::SPAN);
+        APSARA_TEST_FALSE(serializer.DoSerialize(std::move(events), res, errorMsg));
+        APSARA_TEST_EQUAL("", res);
+        APSARA_TEST_EQUAL("invalid event type, span type is not yet supported", errorMsg);
+    }
+    { // raw
+        { // nano second disabled, and set
+            string res;
+            string errorMsg;
+            APSARA_TEST_TRUE(serializer.DoSerialize(createBatchedRawEvents(false), res, errorMsg));
+            APSARA_TEST_EQUAL("{\"__machine_uuid__\":\"machine_uuid\",\"__pack_id__\":\"pack_id\",\"__source__\":"
+                              "\"source\",\"__topic__\":\"topic\",\"__time__\":1234567890,\"content\":\"value\"}\n",
+                              res);
+            APSARA_TEST_EQUAL("", errorMsg);
+        }
+        { // nano second enabled, and set
+            // Todo
+        }
+        { // nano second enabled, not set
+            const_cast<GlobalConfig&>(mCtx.GetGlobalConfig()).mEnableTimestampNanosecond = true;
+            string res;
+            string errorMsg;
+            APSARA_TEST_TRUE(serializer.DoSerialize(createBatchedRawEvents(false), res, errorMsg));
APSARA_TEST_EQUAL("{\"__machine_uuid__\":\"machine_uuid\",\"__pack_id__\":\"pack_id\",\"__source__\":" + "\"source\",\"__topic__\":\"topic\",\"__time__\":1234567890,\"content\":\"value\"}\n", + res); + APSARA_TEST_EQUAL("", errorMsg); + } + { // with empty event + string res; + string errorMsg; + APSARA_TEST_TRUE(serializer.DoSerialize(createBatchedRawEvents(false, true, true), res, errorMsg)); + APSARA_TEST_EQUAL("{\"__machine_uuid__\":\"machine_uuid\",\"__pack_id__\":\"pack_id\",\"__source__\":" + "\"source\",\"__topic__\":\"topic\",\"__time__\":1234567890,\"content\":\"value\"}\n", + res); + APSARA_TEST_EQUAL("", errorMsg); + } + { // only empty event + string res; + string errorMsg; + APSARA_TEST_FALSE(serializer.DoSerialize(createBatchedRawEvents(false, true, false), res, errorMsg)); + APSARA_TEST_EQUAL("", res); + APSARA_TEST_EQUAL("", errorMsg); + } + } + { // empty log group + PipelineEventGroup group(make_shared()); + BatchedEvents batch(std::move(group.MutableEvents()), + std::move(group.GetSizedTags()), + std::move(group.GetSourceBuffer()), + group.GetMetadata(EventGroupMetaKey::SOURCE_ID), + std::move(group.GetExactlyOnceCheckpoint())); + string res; + string errorMsg; + APSARA_TEST_FALSE(serializer.DoSerialize(std::move(batch), res, errorMsg)); + APSARA_TEST_EQUAL("", res); + APSARA_TEST_EQUAL("empty event group", errorMsg); + } +} + + +BatchedEvents +JsonSerializerUnittest::createBatchedLogEvents(bool enableNanosecond, bool withEmptyContent, bool withNonEmptyContent) { + PipelineEventGroup group(make_shared()); + group.SetTag(LOG_RESERVED_KEY_TOPIC, "topic"); + group.SetTag(LOG_RESERVED_KEY_SOURCE, "source"); + group.SetTag(LOG_RESERVED_KEY_MACHINE_UUID, "machine_uuid"); + group.SetTag(LOG_RESERVED_KEY_PACKAGE_ID, "pack_id"); + StringBuffer b = group.GetSourceBuffer()->CopyString(string("pack_id")); + group.SetMetadataNoCopy(EventGroupMetaKey::SOURCE_ID, StringView(b.data, b.size)); + group.SetExactlyOnceCheckpoint(RangeCheckpointPtr(new RangeCheckpoint)); + if (withNonEmptyContent) { + LogEvent* e = group.AddLogEvent(); + e->SetContent(string("key"), string("value")); + if (enableNanosecond) { + e->SetTimestamp(1234567890, 1); + } else { + e->SetTimestamp(1234567890); + } + } + if (withEmptyContent) { + LogEvent* e = group.AddLogEvent(); + if (enableNanosecond) { + e->SetTimestamp(1234567890, 1); + } else { + e->SetTimestamp(1234567890); + } + } + BatchedEvents batch(std::move(group.MutableEvents()), + std::move(group.GetSizedTags()), + std::move(group.GetSourceBuffer()), + group.GetMetadata(EventGroupMetaKey::SOURCE_ID), + std::move(group.GetExactlyOnceCheckpoint())); + return batch; +} + +BatchedEvents JsonSerializerUnittest::createBatchedMetricEvents( + bool enableNanosecond, uint32_t nanoTimestamp, bool emptyValue, bool onlyOneTag, bool multiValue) { + PipelineEventGroup group(make_shared()); + group.SetTag(LOG_RESERVED_KEY_TOPIC, "topic"); + group.SetTag(LOG_RESERVED_KEY_SOURCE, "source"); + group.SetTag(LOG_RESERVED_KEY_MACHINE_UUID, "machine_uuid"); + group.SetTag(LOG_RESERVED_KEY_PACKAGE_ID, "pack_id"); + + StringBuffer b = group.GetSourceBuffer()->CopyString(string("pack_id")); + group.SetMetadataNoCopy(EventGroupMetaKey::SOURCE_ID, StringView(b.data, b.size)); + group.SetExactlyOnceCheckpoint(RangeCheckpointPtr(new RangeCheckpoint)); + MetricEvent* e = group.AddMetricEvent(); + e->SetTag(string("key1"), string("value1")); + if (!onlyOneTag) { + e->SetTag(string("key2"), string("value2")); + } + if (enableNanosecond) { + e->SetTimestamp(1234567890, nanoTimestamp); + 
+    } else {
+        e->SetTimestamp(1234567890);
+    }
+
+    if (!emptyValue) {
+        if (!multiValue) {
+            double value = 0.1;
+            e->SetValue(value);
+        } else {
+            UntypedMultiDoubleValues v({{"test-1", {UntypedValueMetricType::MetricTypeCounter, 10.0}},
+                                        {"test-2", {UntypedValueMetricType::MetricTypeGauge, 2.0}}},
+                                       nullptr);
+            e->SetValue(v);
+        }
+    }
+    e->SetName("test_gauge");
+    BatchedEvents batch(std::move(group.MutableEvents()),
+                        std::move(group.GetSizedTags()),
+                        std::move(group.GetSourceBuffer()),
+                        group.GetMetadata(EventGroupMetaKey::SOURCE_ID),
+                        std::move(group.GetExactlyOnceCheckpoint()));
+    return batch;
+}
+
+BatchedEvents
+JsonSerializerUnittest::createBatchedRawEvents(bool enableNanosecond, bool withEmptyContent, bool withNonEmptyContent) {
+    PipelineEventGroup group(make_shared<SourceBuffer>());
+    group.SetTag(LOG_RESERVED_KEY_TOPIC, "topic");
+    group.SetTag(LOG_RESERVED_KEY_SOURCE, "source");
+    group.SetTag(LOG_RESERVED_KEY_MACHINE_UUID, "machine_uuid");
+    group.SetTag(LOG_RESERVED_KEY_PACKAGE_ID, "pack_id");
+    StringBuffer b = group.GetSourceBuffer()->CopyString(string("pack_id"));
+    group.SetMetadataNoCopy(EventGroupMetaKey::SOURCE_ID, StringView(b.data, b.size));
+    group.SetExactlyOnceCheckpoint(RangeCheckpointPtr(new RangeCheckpoint));
+    if (withNonEmptyContent) {
+        RawEvent* e = group.AddRawEvent();
+        e->SetContent(string("value"));
+        if (enableNanosecond) {
+            e->SetTimestamp(1234567890, 1);
+        } else {
+            e->SetTimestamp(1234567890);
+        }
+    }
+    if (withEmptyContent) {
+        RawEvent* e = group.AddRawEvent();
+        e->SetContent(string(""));
+        if (enableNanosecond) {
+            e->SetTimestamp(1234567890, 1);
+        } else {
+            e->SetTimestamp(1234567890);
+        }
+    }
+    BatchedEvents batch(std::move(group.MutableEvents()),
+                        std::move(group.GetSizedTags()),
+                        std::move(group.GetSourceBuffer()),
+                        group.GetMetadata(EventGroupMetaKey::SOURCE_ID),
+                        std::move(group.GetExactlyOnceCheckpoint()));
+    return batch;
+}
+
+BatchedEvents JsonSerializerUnittest::createBatchedSpanEvents() {
+    PipelineEventGroup group(make_shared<SourceBuffer>());
+    group.SetTag(LOG_RESERVED_KEY_TOPIC, "topic");
+    group.SetTag(LOG_RESERVED_KEY_SOURCE, "source");
+    group.SetTag(LOG_RESERVED_KEY_MACHINE_UUID, "aaa");
+    group.SetTag(LOG_RESERVED_KEY_PACKAGE_ID, "bbb");
+    auto now = std::chrono::system_clock::now();
+    auto duration = now.time_since_epoch();
+    auto seconds = std::chrono::duration_cast<std::chrono::seconds>(duration).count();
+    // auto nano = std::chrono::duration_cast<std::chrono::nanoseconds>(duration).count();
+    StringBuffer b = group.GetSourceBuffer()->CopyString(string("pack_id"));
+    group.SetMetadataNoCopy(EventGroupMetaKey::SOURCE_ID, StringView(b.data, b.size));
+    group.SetExactlyOnceCheckpoint(RangeCheckpointPtr(new RangeCheckpoint));
+    SpanEvent* spanEvent = group.AddSpanEvent();
+    spanEvent->SetScopeTag(std::string("scope-tag-0"), std::string("scope-value-0"));
+    spanEvent->SetTag(std::string("workloadName"), std::string("arms-oneagent-test-ql"));
+    spanEvent->SetTag(std::string("workloadKind"), std::string("faceless"));
+    spanEvent->SetTag(std::string("source_ip"), std::string("10.54.0.33"));
+    spanEvent->SetTag(std::string("host"), std::string("10.54.0.33"));
+    spanEvent->SetTag(std::string("rpc"), std::string("/oneagent/qianlu/local/1"));
+    spanEvent->SetTag(std::string("rpcType"), std::string("25"));
+    spanEvent->SetTag(std::string("callType"), std::string("http-client"));
+    spanEvent->SetTag(std::string("statusCode"), std::string("200"));
+    spanEvent->SetTag(std::string("version"), std::string("HTTP1.1"));
+    auto innerEvent = spanEvent->AddEvent();
+    innerEvent->SetTag(std::string("innner-event-key-0"), std::string("inner-event-value-0"));
+    innerEvent->SetTag(std::string("innner-event-key-1"), std::string("inner-event-value-1"));
+    innerEvent->SetName("inner-event");
+    innerEvent->SetTimestampNs(1000);
+    auto innerLink = spanEvent->AddLink();
+    innerLink->SetTag(std::string("innner-link-key-0"), std::string("inner-link-value-0"));
+    innerLink->SetTag(std::string("innner-link-key-1"), std::string("inner-link-value-1"));
+    innerLink->SetTraceId("inner-link-traceid");
+    innerLink->SetSpanId("inner-link-spanid");
+    innerLink->SetTraceState("inner-link-trace-state");
+    spanEvent->SetName("/oneagent/qianlu/local/1");
+    spanEvent->SetKind(SpanEvent::Kind::Client);
+    spanEvent->SetStatus(SpanEvent::StatusCode::Ok);
+    spanEvent->SetSpanId("span-1-2-3-4-5");
+    spanEvent->SetTraceId("trace-1-2-3-4-5");
+    spanEvent->SetParentSpanId("parent-1-2-3-4-5");
+    spanEvent->SetTraceState("test-state");
+    spanEvent->SetStartTimeNs(1000);
+    spanEvent->SetEndTimeNs(2000);
+    spanEvent->SetTimestamp(seconds);
+    BatchedEvents batch(std::move(group.MutableEvents()),
+                        std::move(group.GetSizedTags()),
+                        std::move(group.GetSourceBuffer()),
+                        group.GetMetadata(EventGroupMetaKey::SOURCE_ID),
+                        std::move(group.GetExactlyOnceCheckpoint()));
+    return batch;
+}
+
+UNIT_TEST_CASE(JsonSerializerUnittest, TestSerializeEventGroup)
+
+} // namespace logtail
+
+UNIT_TEST_MAIN
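
For reference, a consumer-side sketch (not part of the patch) of the output contract the tests above exercise: the serializer emits newline-delimited JSON, one self-contained object per event, with a trailing '\n' that FlusherFile pops before writing. The sample payload below is illustrative.

// Standalone sketch: verify that each emitted line parses as a complete JSON object.
#include <iostream>
#include <sstream>
#include <string>

#include "rapidjson/document.h"

int main() {
    // Shape of JsonEventGroupSerializer output for a two-event group (illustrative values).
    const std::string res = "{\"__time__\":1234567890,\"key\":\"value\"}\n"
                            "{\"__time__\":1234567891,\"key\":\"value2\"}\n";
    std::istringstream in(res);
    std::string line;
    while (std::getline(in, line)) { // getline drops the '\n', so no empty trailing record
        rapidjson::Document doc;
        doc.Parse(line.c_str());
        if (doc.HasParseError() || !doc.IsObject()) {
            std::cerr << "malformed line: " << line << "\n";
            return 1;
        }
    }
    std::cout << "all lines are self-contained JSON objects\n";
    return 0;
}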