Skip to content
This repository was archived by the owner on Apr 12, 2024. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions exporter/opensearchexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ Supports standard TLS settings as part of HTTP settings. See [TLS Configuration/

### Bulk Indexer Options
- `bulk_action` (optional): the [action](https://opensearch.org/docs/2.9/api-reference/document-apis/bulk/) for ingesting data. Only `create` and `index` are allowed here.
- `pipeline` (optional): Optional [Ingest Node](https://opensearch.org/docs/latest/opensearch/rest-api/ingest-apis/get-ingest/) pipeline ID used for processing documents published by the exporter.

## Example

```yaml
Expand Down
4 changes: 4 additions & 0 deletions exporter/opensearchexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ type Config struct {
// BulkAction configures the action for ingesting data. Only `create` and `index` are allowed here.
// If not specified, the default value `create` will be used.
BulkAction string `mapstructure:"bulk_action"`

// Pipeline configures the ingest node pipeline name that should be used to process the events.
// https://opensearch.org/docs/latest/opensearch/rest-api/ingest-apis/get-ingest/
Pipeline string `mapstructure:"pipeline"`
}

var (
Expand Down
9 changes: 9 additions & 0 deletions exporter/opensearchexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,15 @@ func TestLoadConfig(t *testing.T) {
return assert.ErrorContains(t, err, errBulkActionInvalid.Error())
},
},
{
id: component.NewIDWithName(metadata.Type, "pipeline"),
expected: withDefaultConfig(func(config *Config) {
config.Endpoint = sampleEndpoint
config.BulkAction = defaultBulkAction
config.Pipeline = "testpipeline"
}),
configValidateAssert: assert.NoError,
},
}

for _, tt := range tests {
Expand Down
10 changes: 6 additions & 4 deletions exporter/opensearchexporter/log_bulk_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,18 @@ type logBulkIndexer struct {
index string
bulkAction string
model mappingModel
pipeline string
errs []error
bulkIndexer opensearchutil.BulkIndexer
}

func newLogBulkIndexer(index, bulkAction string, model mappingModel) *logBulkIndexer {
return &logBulkIndexer{index, bulkAction, model, nil, nil}
func newLogBulkIndexer(index, bulkAction string, model mappingModel, pipeline string) *logBulkIndexer {
return &logBulkIndexer{index, bulkAction, model, pipeline, nil, nil}
}

func (lbi *logBulkIndexer) start(client *opensearch.Client) error {
var startErr error
lbi.bulkIndexer, startErr = newLogOpenSearchBulkIndexer(client, lbi.onIndexerError)
lbi.bulkIndexer, startErr = newLogOpenSearchBulkIndexer(client, lbi.pipeline, lbi.onIndexerError)
return startErr
}

Expand Down Expand Up @@ -115,10 +116,11 @@ func (lbi *logBulkIndexer) newBulkIndexerItem(document []byte) opensearchutil.Bu
return item
}

func newLogOpenSearchBulkIndexer(client *opensearch.Client, onIndexerError func(context.Context, error)) (opensearchutil.BulkIndexer, error) {
func newLogOpenSearchBulkIndexer(client *opensearch.Client, pipeline string, onIndexerError func(context.Context, error)) (opensearchutil.BulkIndexer, error) {
return opensearchutil.NewBulkIndexer(opensearchutil.BulkIndexerConfig{
NumWorkers: 1,
Client: client,
Pipeline: pipeline,
OnError: onIndexerError,
})
}
Expand Down
4 changes: 3 additions & 1 deletion exporter/opensearchexporter/sso_log_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type logExporter struct {
model mappingModel
httpSettings confighttp.ClientConfig
telemetry component.TelemetrySettings
pipeline string
}

func newLogExporter(cfg *Config, set exporter.CreateSettings) (*logExporter, error) {
Expand All @@ -45,6 +46,7 @@ func newLogExporter(cfg *Config, set exporter.CreateSettings) (*logExporter, err
bulkAction: cfg.BulkAction,
httpSettings: cfg.ClientConfig,
model: model,
pipeline: cfg.Pipeline,
}, nil
}

Expand All @@ -64,7 +66,7 @@ func (l *logExporter) Start(ctx context.Context, host component.Host) error {
}

func (l *logExporter) pushLogData(ctx context.Context, ld plog.Logs) error {
indexer := newLogBulkIndexer(l.Index, l.bulkAction, l.model)
indexer := newLogBulkIndexer(l.Index, l.bulkAction, l.model, l.pipeline)
startErr := indexer.start(l.client)
if startErr != nil {
return startErr
Expand Down
5 changes: 5 additions & 0 deletions exporter/opensearchexporter/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,8 @@ opensearch/trace:
enabled: true
initial_interval: 100000000
randomization_factor: 0.5

opensearch/pipeline:
http:
endpoint: https://opensearch.example.com:9200
pipeline: testpipeline