Skip to content

Commit a2744fe

Browse files
authored
feat: enable input_static_file_onetime and add basic e2e test (alibaba#2369)
* e2e support one time config * fix StaticFileServer do not start * add basic input_static_file_onetime e2e test * fix ut * change config priority * update DefaultLogQueueSize * polish * polish
1 parent 2e9db4e commit a2744fe

17 files changed

Lines changed: 113 additions & 22 deletions

File tree

core/application/Application.cpp

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -216,17 +216,29 @@ void Application::Start() { // GCOVR_EXCL_START
216216

217217
// config provider
218218
{
219-
// add local config dir
220-
filesystem::path localConfigPath = filesystem::path(AppConfig::GetInstance()->GetLoongcollectorConfDir())
221-
/ GetContinuousPipelineConfigDir() / "local";
222-
error_code ec;
223-
filesystem::create_directories(localConfigPath, ec);
224-
if (ec) {
219+
// add local continuous config dir
220+
filesystem::path localContinuousConfigPath
221+
= filesystem::path(AppConfig::GetInstance()->GetLoongcollectorConfDir()) / GetContinuousPipelineConfigDir()
222+
/ "local";
223+
error_code ec1;
224+
filesystem::create_directories(localContinuousConfigPath, ec1);
225+
if (ec1) {
225226
LOG_WARNING(sLogger,
226227
("failed to create dir for local continuous_pipeline_config",
227-
"manual creation may be required")("error code", ec.value())("error msg", ec.message()));
228+
"manual creation may be required")("error code", ec1.value())("error msg", ec1.message()));
228229
}
229-
PipelineConfigWatcher::GetInstance()->AddSource(localConfigPath.string());
230+
PipelineConfigWatcher::GetInstance()->AddSource(localContinuousConfigPath.string());
231+
// add local onetime config dir
232+
filesystem::path localOnetimeConfigPath = filesystem::path(AppConfig::GetInstance()->GetLoongcollectorConfDir())
233+
/ "onetime_pipeline_config" / "local";
234+
error_code ec2;
235+
filesystem::create_directories(localOnetimeConfigPath, ec2);
236+
if (ec2) {
237+
LOG_WARNING(sLogger,
238+
("failed to create dir for local onetime_pipeline_config",
239+
"manual creation may be required")("error code", ec2.value())("error msg", ec2.message()));
240+
}
241+
PipelineConfigWatcher::GetInstance()->AddSource(localOnetimeConfigPath.string());
230242
}
231243
#ifdef __ENTERPRISE__
232244
EnterpriseConfigProvider::GetInstance()->Start();

core/collection_pipeline/CollectionPipeline.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ bool CollectionPipeline::Init(CollectionConfig&& config) {
9090
// for special treatment below
9191
const InputFile* inputFile = nullptr;
9292
const InputContainerStdio* inputContainerStdio = nullptr;
93+
const InputStaticFile* inputStaticFile = nullptr;
9394
bool hasFlusherSLS = false;
9495

9596
// to send alarm and init MetricsRecord before flusherSLS is built, a temporary object is made, which will be
@@ -122,6 +123,8 @@ bool CollectionPipeline::Init(CollectionConfig&& config) {
122123
inputFile = static_cast<const InputFile*>(mInputs[0]->GetPlugin());
123124
} else if (pluginType == InputContainerStdio::sName) {
124125
inputContainerStdio = static_cast<const InputContainerStdio*>(mInputs[0]->GetPlugin());
126+
} else if (pluginType == InputStaticFile::sName) {
127+
inputStaticFile = static_cast<const InputStaticFile*>(mInputs[0]->GetPlugin());
125128
}
126129
} else {
127130
AddPluginToGoPipeline(pluginType, detail, "inputs", mGoPipelineWithInput);
@@ -251,7 +254,8 @@ bool CollectionPipeline::Init(CollectionConfig&& config) {
251254
}
252255

253256
// mandatory override global.DefaultLogQueueSize in Go pipeline when input_file and Go processing coexist.
254-
if ((inputFile != nullptr || inputContainerStdio != nullptr) && IsFlushingThroughGoPipeline()) {
257+
if ((inputFile != nullptr || inputContainerStdio != nullptr || inputStaticFile != nullptr)
258+
&& IsFlushingThroughGoPipeline()) {
255259
mGoPipelineWithoutInput["global"]["DefaultLogQueueSize"]
256260
= Json::Value(INT32_FLAG(default_plugin_log_queue_size));
257261
}

core/collection_pipeline/CollectionPipeline.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
#include "monitor/MetricManager.h"
3535
#include "plugin/input/InputContainerStdio.h"
3636
#include "plugin/input/InputFile.h"
37+
#include "plugin/input/InputStaticFile.h"
3738

3839
namespace logtail {
3940

core/collection_pipeline/CollectionPipelineContext.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ class CollectionPipelineContext {
4949
bool InitGlobalConfig(const Json::Value& config, Json::Value& extendedParams) {
5050
return mGlobalConfig.Init(config, *this, extendedParams);
5151
}
52+
void SetConfigPriority(uint32_t priority) { mGlobalConfig.mPriority = priority; }
5253
void SetProcessQueueKey(QueueKey key) { mProcessQueueKey = key; }
5354
QueueKey GetProcessQueueKey() const { return mProcessQueueKey; }
5455
void SetIsOnetimePipelineRunningBeforeStart(bool flag) { mIsOnetimePipelineRunningBeforeStart = flag; }

core/collection_pipeline/GlobalConfig.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ struct GlobalConfig {
3636

3737
TopicType mTopicType = TopicType::NONE;
3838
std::string mTopicFormat;
39-
uint32_t mPriority = 1U;
39+
uint32_t mPriority = 1U; // highest priority is 0, lowest priority is 2, default is 1
4040
bool mEnableTimestampNanosecond = false;
4141
bool mUsingOldContentTag = false;
4242
};

core/config/CollectionConfig.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -253,12 +253,14 @@ bool CollectionConfig::Parse() {
253253
mInputs.push_back(&plugin);
254254
#ifndef APSARA_UNIT_TEST_MAIN
255255
// TODO: remove these special restrictions
256-
if (pluginType == "input_file" || pluginType == "input_container_stdio") {
256+
if (pluginType == "input_file" || pluginType == "input_container_stdio"
257+
|| pluginType == "input_static_file_onetime") {
257258
hasFileInput = true;
258259
}
259260
#else
260261
// TODO: remove these special restrictions after all C++ inputs support Go processors
261-
if (pluginType.find("input_file") != string::npos || pluginType.find("input_container_stdio") != string::npos) {
262+
if (pluginType.find("input_file") != string::npos || pluginType.find("input_container_stdio") != string::npos
263+
|| pluginType.find("input_static_file_onetime") != string::npos) {
262264
hasFileInput = true;
263265
}
264266
#endif

core/file_server/StaticFileServer.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,11 @@ void StaticFileServer::AddInput(const string& configName,
8383
const MultilineOptions* multilineOpts,
8484
const FileTagOptions* fileTagOpts,
8585
const CollectionPipelineContext* ctx) {
86+
// check if the server is started, if not, start it
87+
if (!mThreadRes.valid()) {
88+
Init();
89+
}
90+
8691
InputStaticFileCheckpointManager::GetInstance()->CreateCheckpoint(configName, idx, files);
8792
{
8893
lock_guard<mutex> lock(mUpdateMux);
@@ -259,6 +264,7 @@ FileTagConfig StaticFileServer::GetFileTagConfig(const std::string& name, size_t
259264

260265
#ifdef APSARA_UNIT_TEST_MAIN
261266
void StaticFileServer::Clear() {
267+
Stop();
262268
lock_guard<mutex> lock(mUpdateMux);
263269
mInputFileDiscoveryConfigsMap.clear();
264270
mInputFileReaderConfigsMap.clear();

core/plugin/input/InputStaticFile.cpp

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@
2424
#include "collection_pipeline/CollectionPipeline.h"
2525
#include "collection_pipeline/plugin/PluginRegistry.h"
2626
#include "common/ParamExtractor.h"
27+
#include "file_server/FileServer.h"
2728
#include "file_server/StaticFileServer.h"
29+
#include "monitor/metric_constants/MetricConstants.h"
2830
#include "plugin/processor/inner/ProcessorSplitLogStringNative.h"
2931
#include "plugin/processor/inner/ProcessorSplitMultilineLogStringNative.h"
3032

@@ -57,6 +59,9 @@ const string InputStaticFile::sName = "input_static_file_onetime";
5759
bool InputStaticFile::Init(const Json::Value& config, Json::Value& optionalGoPipeline) {
5860
string errorMsg;
5961

62+
// SetConfigPriority must be called before GlobalConfig::Init() to avoid overriding the priority set by the user
63+
mContext->SetConfigPriority(2);
64+
6065
if (!mFileDiscovery.Init(config, *mContext, sName)) {
6166
return false;
6267
}
@@ -121,6 +126,19 @@ bool InputStaticFile::Init(const Json::Value& config, Json::Value& optionalGoPip
121126
return false;
122127
}
123128

129+
// Initialize metrics
130+
mMonitorFileTotal = GetMetricsRecordRef().CreateIntGauge(METRIC_PLUGIN_MONITOR_FILE_TOTAL);
131+
static const std::unordered_map<std::string, MetricType> inputStaticFileMetricKeys = {
132+
{METRIC_PLUGIN_OUT_EVENTS_TOTAL, MetricType::METRIC_TYPE_COUNTER},
133+
{METRIC_PLUGIN_OUT_EVENT_GROUPS_TOTAL, MetricType::METRIC_TYPE_COUNTER},
134+
{METRIC_PLUGIN_OUT_SIZE_BYTES, MetricType::METRIC_TYPE_COUNTER},
135+
{METRIC_PLUGIN_SOURCE_SIZE_BYTES, MetricType::METRIC_TYPE_INT_GAUGE},
136+
{METRIC_PLUGIN_SOURCE_READ_OFFSET_BYTES, MetricType::METRIC_TYPE_INT_GAUGE},
137+
};
138+
mPluginMetricManager = std::make_shared<PluginMetricManager>(
139+
GetMetricsRecordRef()->GetLabels(), inputStaticFileMetricKeys, MetricCategory::METRIC_CATEGORY_PLUGIN_SOURCE);
140+
mPluginMetricManager->RegisterSizeGauge(mMonitorFileTotal);
141+
124142
return CreateInnerProcessors();
125143
}
126144

@@ -129,6 +147,10 @@ bool InputStaticFile::Start() {
129147
// TODO: get container info
130148
// mFileDiscovery.SetContainerInfo();
131149
}
150+
151+
// Add plugin metric manager
152+
FileServer::GetInstance()->AddPluginMetricManager(mContext->GetConfigName(), mPluginMetricManager);
153+
132154
optional<vector<filesystem::path>> files;
133155
if (!mContext->IsOnetimePipelineRunningBeforeStart()) {
134156
files = GetFiles();
@@ -140,6 +162,10 @@ bool InputStaticFile::Start() {
140162

141163
bool InputStaticFile::Stop(bool isPipelineRemoving) {
142164
StaticFileServer::GetInstance()->RemoveInput(mContext->GetConfigName(), mIndex);
165+
166+
// Remove plugin metric manager
167+
FileServer::GetInstance()->RemovePluginMetricManager(mContext->GetConfigName());
168+
143169
return true;
144170
}
145171

core/plugin/input/InputStaticFile.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
#include "file_server/FileTagOptions.h"
3030
#include "file_server/MultilineOptions.h"
3131
#include "file_server/reader/FileReaderOptions.h"
32+
#include "monitor/metric_models/ReentrantMetricsRecord.h"
3233

3334
namespace logtail {
3435

@@ -50,6 +51,9 @@ class InputStaticFile : public Input {
5051
FileTagOptions mFileTag;
5152

5253
private:
54+
PluginMetricManagerPtr mPluginMetricManager;
55+
IntGaugePtr mMonitorFileTotal;
56+
5357
void GetValidBaseDirs(const std::filesystem::path& dir,
5458
uint32_t depth,
5559
std::vector<std::filesystem::path>& filepaths) const;

scripts/gen_build_scripts.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ function generateCopyScript() {
125125
echo 'mkdir -p $BINDIR/conf/instance_config/local/' >>$COPY_SCRIPT_FILE
126126
echo 'echo -e "{\n}" > $BINDIR/conf/instance_config/local/loongcollector_config.json' >>$COPY_SCRIPT_FILE
127127
echo 'mkdir -p $BINDIR/conf/continuous_pipeline_config/local' >>$COPY_SCRIPT_FILE
128+
echo 'mkdir -p $BINDIR/conf/onetime_pipeline_config/local' >>$COPY_SCRIPT_FILE
128129
echo 'docker rm -v "$id"' >>$COPY_SCRIPT_FILE
129130
}
130131

0 commit comments

Comments
 (0)