From bfd2ed316713875220e44407b0231b3701c70dd7 Mon Sep 17 00:00:00 2001 From: Patrick D'Souza Date: Thu, 11 Apr 2024 11:49:40 -0400 Subject: [PATCH] Add support for pipeline config --- exporter/opensearchexporter/README.md | 2 ++ exporter/opensearchexporter/config.go | 4 ++++ exporter/opensearchexporter/config_test.go | 9 +++++++++ exporter/opensearchexporter/log_bulk_indexer.go | 10 ++++++---- exporter/opensearchexporter/sso_log_exporter.go | 4 +++- exporter/opensearchexporter/testdata/config.yaml | 5 +++++ 6 files changed, 29 insertions(+), 5 deletions(-) diff --git a/exporter/opensearchexporter/README.md b/exporter/opensearchexporter/README.md index 47c7bd705f87c..b51c29a004449 100644 --- a/exporter/opensearchexporter/README.md +++ b/exporter/opensearchexporter/README.md @@ -38,6 +38,8 @@ Supports standard TLS settings as part of HTTP settings. See [TLS Configuration/ ### Bulk Indexer Options - `bulk_action` (optional): the [action](https://opensearch.org/docs/2.9/api-reference/document-apis/bulk/) for ingesting data. Only `create` and `index` are allowed here. +- `pipeline` (optional): Optional [Ingest Node](https://opensearch.org/docs/latest/opensearch/rest-api/ingest-apis/get-ingest/) pipeline ID used for processing documents published by the exporter. + ## Example ```yaml diff --git a/exporter/opensearchexporter/config.go b/exporter/opensearchexporter/config.go index c8546a6d407ae..cf6c02a340f9c 100644 --- a/exporter/opensearchexporter/config.go +++ b/exporter/opensearchexporter/config.go @@ -47,6 +47,10 @@ type Config struct { // BulkAction configures the action for ingesting data. Only `create` and `index` are allowed here. // If not specified, the default value `create` will be used. BulkAction string `mapstructure:"bulk_action"` + + // Pipeline configures the ingest node pipeline name that should be used to process the events. + // https://opensearch.org/docs/latest/opensearch/rest-api/ingest-apis/get-ingest/ + Pipeline string `mapstructure:"pipeline"` } var ( diff --git a/exporter/opensearchexporter/config_test.go b/exporter/opensearchexporter/config_test.go index 68d9abd6b6816..ff64126b405c8 100644 --- a/exporter/opensearchexporter/config_test.go +++ b/exporter/opensearchexporter/config_test.go @@ -106,6 +106,15 @@ func TestLoadConfig(t *testing.T) { return assert.ErrorContains(t, err, errBulkActionInvalid.Error()) }, }, + { + id: component.NewIDWithName(metadata.Type, "pipeline"), + expected: withDefaultConfig(func(config *Config) { + config.Endpoint = sampleEndpoint + config.BulkAction = defaultBulkAction + config.Pipeline = "testpipeline" + }), + configValidateAssert: assert.NoError, + }, } for _, tt := range tests { diff --git a/exporter/opensearchexporter/log_bulk_indexer.go b/exporter/opensearchexporter/log_bulk_indexer.go index ccf548bb8d67c..32d3bdebc3fb9 100644 --- a/exporter/opensearchexporter/log_bulk_indexer.go +++ b/exporter/opensearchexporter/log_bulk_indexer.go @@ -19,17 +19,18 @@ type logBulkIndexer struct { index string bulkAction string model mappingModel + pipeline string errs []error bulkIndexer opensearchutil.BulkIndexer } -func newLogBulkIndexer(index, bulkAction string, model mappingModel) *logBulkIndexer { - return &logBulkIndexer{index, bulkAction, model, nil, nil} +func newLogBulkIndexer(index, bulkAction string, model mappingModel, pipeline string) *logBulkIndexer { + return &logBulkIndexer{index, bulkAction, model, pipeline, nil, nil} } func (lbi *logBulkIndexer) start(client *opensearch.Client) error { var startErr error - lbi.bulkIndexer, startErr = newLogOpenSearchBulkIndexer(client, lbi.onIndexerError) + lbi.bulkIndexer, startErr = newLogOpenSearchBulkIndexer(client, lbi.pipeline, lbi.onIndexerError) return startErr } @@ -115,10 +116,11 @@ func (lbi *logBulkIndexer) newBulkIndexerItem(document []byte) opensearchutil.Bu return item } -func newLogOpenSearchBulkIndexer(client *opensearch.Client, onIndexerError func(context.Context, error)) (opensearchutil.BulkIndexer, error) { +func newLogOpenSearchBulkIndexer(client *opensearch.Client, pipeline string, onIndexerError func(context.Context, error)) (opensearchutil.BulkIndexer, error) { return opensearchutil.NewBulkIndexer(opensearchutil.BulkIndexerConfig{ NumWorkers: 1, Client: client, + Pipeline: pipeline, OnError: onIndexerError, }) } diff --git a/exporter/opensearchexporter/sso_log_exporter.go b/exporter/opensearchexporter/sso_log_exporter.go index 7ab0ec9447e5a..67c4e44ebbebf 100644 --- a/exporter/opensearchexporter/sso_log_exporter.go +++ b/exporter/opensearchexporter/sso_log_exporter.go @@ -21,6 +21,7 @@ type logExporter struct { model mappingModel httpSettings confighttp.ClientConfig telemetry component.TelemetrySettings + pipeline string } func newLogExporter(cfg *Config, set exporter.CreateSettings) (*logExporter, error) { @@ -45,6 +46,7 @@ func newLogExporter(cfg *Config, set exporter.CreateSettings) (*logExporter, err bulkAction: cfg.BulkAction, httpSettings: cfg.ClientConfig, model: model, + pipeline: cfg.Pipeline, }, nil } @@ -64,7 +66,7 @@ func (l *logExporter) Start(ctx context.Context, host component.Host) error { } func (l *logExporter) pushLogData(ctx context.Context, ld plog.Logs) error { - indexer := newLogBulkIndexer(l.Index, l.bulkAction, l.model) + indexer := newLogBulkIndexer(l.Index, l.bulkAction, l.model, l.pipeline) startErr := indexer.start(l.client) if startErr != nil { return startErr diff --git a/exporter/opensearchexporter/testdata/config.yaml b/exporter/opensearchexporter/testdata/config.yaml index a187af23318e1..b10c8e7f0ac00 100644 --- a/exporter/opensearchexporter/testdata/config.yaml +++ b/exporter/opensearchexporter/testdata/config.yaml @@ -41,3 +41,8 @@ opensearch/trace: enabled: true initial_interval: 100000000 randomization_factor: 0.5 + +opensearch/pipeline: + http: + endpoint: https://opensearch.example.com:9200 + pipeline: testpipeline