29 changes: 29 additions & 0 deletions .chloggen/elasticsearchexporter-traces-dynamic-id.yaml
@@ -0,0 +1,29 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
component: exporter/elasticsearch

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add support for dynamic document IDs for traces and span events

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [43649]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
  Adds the `traces_dynamic_id` configuration option, which sets document IDs from the `elasticsearch.document_id` attribute on spans and span events.
  This prevents duplicate documents from being created when the same span is sent multiple times, similar to the existing `logs_dynamic_id` feature.

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
38 changes: 34 additions & 4 deletions exporter/elasticsearchexporter/README.md
@@ -145,6 +145,9 @@ This can be customised through the following settings:
- `logs_dynamic_id` (optional): Dynamically determines the document ID to be used in Elasticsearch based on a log record attribute.
  - `enabled` (default=false): Enable/Disable dynamic ID for log records. If `elasticsearch.document_id` exists and is not an empty string in the log record attributes, it will be used as the document ID. Otherwise, the document ID will be generated by Elasticsearch. The attribute `elasticsearch.document_id` is removed from the final document when the `otel` mapping mode is used. See [Setting a document id dynamically](#setting-a-document-id-dynamically).

- `traces_dynamic_id` (optional): Dynamically determines the document ID to be used in Elasticsearch based on a span or span event attribute.
  - `enabled` (default=false): Enable/Disable dynamic ID for spans and span events. If `elasticsearch.document_id` exists and is not an empty string in the span attributes, it will be used as the document ID. Otherwise, the document ID will be generated by Elasticsearch. For span events, the ID is read from the span event's own attributes, and this only applies in `otel` mapping mode, where span events are stored as separate documents. The attribute `elasticsearch.document_id` is removed from the final document when the `otel` mapping mode is used. See [Setting a document id dynamically](#setting-a-document-id-dynamically).



#### Document routing exceptions for OTel data mode
@@ -593,12 +596,13 @@ In case the record contains `timestamp`, this value is used. Otherwise, the `obs

## Setting a document id dynamically

-The `logs_dynamic_id` setting allows users to set the document ID dynamically based on a log record attribute.
-Besides the ability to control the document ID, this setting also works as a deduplication mechanism, as Elasticsearch will refuse to index a document with the same ID.
+The `logs_dynamic_id` and `traces_dynamic_id` settings allow users to set the document ID dynamically based on log record, span, or span event attributes.
+Besides the ability to control the document ID, these settings also work as a deduplication mechanism, as Elasticsearch will refuse to index a document whose ID already exists in the target index.

-The log record attribute `elasticsearch.document_id` can be set explicitly by a processor based on the log record.
+For logs, the log record attribute `elasticsearch.document_id` can be set explicitly by a processor based on the log record.
+For traces, the span attribute `elasticsearch.document_id` (or, for span events, the span event attribute) can be set explicitly by a processor based on the span or span event.

-As an example, the `transform` processor can create this attribute dynamically:
+As an example, the `transform` processor can create this attribute dynamically for logs:

```yaml
processors:
# ... (lines collapsed by the diff view: @@ -611,6 +615,32 @@) ...
- set(attributes["elasticsearch.document_id"], Concat(["log", attributes["event_name"], attributes["event_creation_time"]], "-"))
```

For traces, you can use the `transform` processor to set the document ID based on trace and span IDs to ensure uniqueness:

```yaml
exporters:
  elasticsearch:
    mapping:
      mode: otel # Required for span events to be separate documents
    traces_dynamic_id:
      enabled: true

processors:
  transform/es-doc-id-traces:
    error_mode: ignore
    trace_statements:
      - context: span
        statements:
          # Set ID for spans
          - set(attributes["elasticsearch.document_id"], Concat([trace_id.string, span_id.string], "-"))
      - context: spanevent
        statements:
          # Set ID for span events (only works in otel mapping mode).
          # In the spanevent context the parent span's IDs are reached through the `span.` prefix.
          - set(attributes["elasticsearch.document_id"], Concat([span.trace_id.string, span.span_id.string, name], "-"))
```
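
To make the resulting ID shape concrete, here is a minimal Go sketch of what the `Concat` statements above are expected to produce. It assumes that `trace_id.string` and `span_id.string` render as lowercase hex strings; the function names `spanDocumentID` and `spanEventDocumentID` are hypothetical and are not part of the exporter or of OTTL.

```go
package main

import (
	"encoding/hex"
	"fmt"
)

// spanDocumentID joins the hex-encoded trace ID and span ID with "-".
// Hypothetical helper: it mirrors the shape of the Concat statement for spans.
func spanDocumentID(traceID [16]byte, spanID [8]byte) string {
	return hex.EncodeToString(traceID[:]) + "-" + hex.EncodeToString(spanID[:])
}

// spanEventDocumentID appends the span event name to the span's document ID.
// Hypothetical helper: it mirrors the shape of the Concat statement for span events.
func spanEventDocumentID(traceID [16]byte, spanID [8]byte, eventName string) string {
	return spanDocumentID(traceID, spanID) + "-" + eventName
}

func main() {
	traceID := [16]byte{0x01, 0x02} // remaining bytes are zero
	spanID := [8]byte{0xaa}         // remaining bytes are zero

	fmt.Println(spanDocumentID(traceID, spanID))
	fmt.Println(spanEventDocumentID(traceID, spanID, "exception"))
}
```

One consequence of this scheme: two span events with the same name on the same span would share an ID, so only the first would be indexed. If a span can carry repeated event names, adding a further distinguishing field such as the event timestamp may be worth considering.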

**Note**: Span events are only stored as separate documents in `otel` mapping mode. In other mapping modes (`ecs`, `bodymap`, `raw`), span events are embedded within the span document and will not have separate document IDs.

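The deduplication behavior follows from the exporter indexing with the `create` action (`docappender.ActionCreate` in `exporter.go`). The sketch below is a toy in-memory model of that behavior, not the Elasticsearch client: the `index` type, its `create` method, and the `auto-N` ID format are all hypothetical stand-ins, and the error merely borrows the name of the `version_conflict_engine_exception` that Elasticsearch reports.

```go
package main

import (
	"errors"
	"fmt"
)

// errVersionConflict stands in for Elasticsearch's version_conflict_engine_exception.
var errVersionConflict = errors.New("version_conflict_engine_exception")

// index is a toy, in-memory stand-in for a single Elasticsearch index.
type index struct {
	docs   map[string]string
	nextID int
}

func newIndex() *index {
	return &index{docs: make(map[string]string)}
}

// create models the create action: an empty ID gets a generated one,
// and an ID that already exists is rejected instead of overwritten.
func (ix *index) create(id, doc string) (string, error) {
	if id == "" {
		// Generate an unused ID, as Elasticsearch does when none is supplied.
		for {
			ix.nextID++
			id = fmt.Sprintf("auto-%d", ix.nextID)
			if _, taken := ix.docs[id]; !taken {
				break
			}
		}
	}
	if _, exists := ix.docs[id]; exists {
		return id, errVersionConflict
	}
	ix.docs[id] = doc
	return id, nil
}

func main() {
	ix := newIndex()
	id := "4bf92f35-00f067aa" // hypothetical dynamic document ID

	_, err := ix.create(id, `{"name":"GET /"}`)
	fmt.Println("first send:", err)

	_, err = ix.create(id, `{"name":"GET /"}`)
	fmt.Println("retry:", err)

	fmt.Println("documents stored:", len(ix.docs))
}
```

With dynamic IDs disabled, every send takes the generated-ID path, so a retried span is stored twice; with a stable ID, the retry is rejected and only one document remains.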
## Known issues

### version_conflict_engine_exception
3 changes: 3 additions & 0 deletions exporter/elasticsearchexporter/config.go
@@ -66,6 +66,9 @@ type Config struct {
 	// LogsDynamicID configures whether log record attribute `elasticsearch.document_id` is set as the document ID in ES.
 	LogsDynamicID DynamicIDSettings `mapstructure:"logs_dynamic_id"`

+	// TracesDynamicID configures whether the span (or span event) attribute `elasticsearch.document_id` is set as the document ID in ES.
+	TracesDynamicID DynamicIDSettings `mapstructure:"traces_dynamic_id"`
+
 	// LogsDynamicPipeline configures whether log record attribute `elasticsearch.document_pipeline` is set as the document ingest pipeline for ES.
 	LogsDynamicPipeline DynamicPipelineSettings `mapstructure:"logs_dynamic_pipeline"`

9 changes: 9 additions & 0 deletions exporter/elasticsearchexporter/config_test.go
@@ -85,6 +85,9 @@ func TestConfig(t *testing.T) {
 LogsDynamicID: DynamicIDSettings{
 	Enabled: false,
 },
+TracesDynamicID: DynamicIDSettings{
+	Enabled: false,
+},
 LogsDynamicPipeline: DynamicPipelineSettings{
 	Enabled: false,
 },
@@ -168,6 +171,9 @@ func TestConfig(t *testing.T) {
 LogsDynamicID: DynamicIDSettings{
 	Enabled: false,
 },
+TracesDynamicID: DynamicIDSettings{
+	Enabled: false,
+},
 LogsDynamicPipeline: DynamicPipelineSettings{
 	Enabled: false,
 },
@@ -241,6 +247,9 @@ func TestConfig(t *testing.T) {
 LogsDynamicID: DynamicIDSettings{
 	Enabled: false,
 },
+TracesDynamicID: DynamicIDSettings{
+	Enabled: false,
+},
 LogsDynamicPipeline: DynamicPipelineSettings{
 	Enabled: false,
 },
25 changes: 17 additions & 8 deletions exporter/elasticsearchexporter/exporter.go
@@ -164,7 +164,10 @@ func (e *elasticsearchExporter) pushLogRecord(
 	}

 	buf := e.bufferPool.NewPooledBuffer()
-	docID := e.extractDocumentIDAttribute(record.Attributes())
+	var docID string
+	if e.config.LogsDynamicID.Enabled {
+		docID = extractDocumentIDAttribute(record.Attributes())
+	}
 	pipeline := e.extractDocumentPipelineAttribute(record.Attributes())
 	if err := encoder.encodeLog(ec, record, index, buf.Buffer); err != nil {
 		buf.Recycle()
@@ -422,12 +425,16 @@ func (e *elasticsearchExporter) pushTraceRecord(
 	}

 	buf := e.bufferPool.NewPooledBuffer()
+	var docID string
+	if e.config.TracesDynamicID.Enabled {
+		docID = extractDocumentIDAttribute(span.Attributes())
+	}
 	if err := encoder.encodeSpan(ec, span, index, buf.Buffer); err != nil {
 		buf.Recycle()
 		return fmt.Errorf("failed to encode trace record: %w", err)
 	}
 	// not recycling after Add returns an error as we don't know if it's already recycled
-	return bulkIndexerSession.Add(ctx, index.Index, "", "", buf, nil, docappender.ActionCreate)
+	return bulkIndexerSession.Add(ctx, index.Index, docID, "", buf, nil, docappender.ActionCreate)
 }

 func (e *elasticsearchExporter) pushSpanEvent(
@@ -445,19 +452,21 @@ func (e *elasticsearchExporter) pushSpanEvent(
 	}

 	buf := e.bufferPool.NewPooledBuffer()
+	var docID string
+	if e.config.TracesDynamicID.Enabled {
+		docID = extractDocumentIDAttribute(spanEvent.Attributes())
+	}
 	if err := encoder.encodeSpanEvent(ec, span, spanEvent, index, buf.Buffer); err != nil || buf.Buffer.Len() == 0 {
 		buf.Recycle()
 		return err
 	}
 	// not recycling after Add returns an error as we don't know if it's already recycled
-	return bulkIndexerSession.Add(ctx, index.Index, "", "", buf, nil, docappender.ActionCreate)
+	return bulkIndexerSession.Add(ctx, index.Index, docID, "", buf, nil, docappender.ActionCreate)
 }

-func (e *elasticsearchExporter) extractDocumentIDAttribute(m pcommon.Map) string {
-	if !e.config.LogsDynamicID.Enabled {
-		return ""
-	}
-
+// extractDocumentIDAttribute extracts the document ID from the given attributes map.
+// Returns empty string if the attribute is not present or is empty.
+func extractDocumentIDAttribute(m pcommon.Map) string {
 	v, ok := m.Get(elasticsearch.DocumentIDAttributeName)
 	if !ok {
 		return ""
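
The refactoring in `exporter.go` moves the "is the feature enabled?" check out of the helper and into each caller, so one helper can serve both `LogsDynamicID` and `TracesDynamicID`. The following self-contained sketch illustrates that split using a plain `map[string]any` in place of `pcommon.Map`. The names `dynamicIDSettings`, `extractDocumentID`, and `resolveDocID` are hypothetical, and treating a non-string value as absent is an assumption, since the tail of the real helper is not visible in this diff.

```go
package main

import "fmt"

// documentIDAttributeName is the attribute the exporter reads the ID from.
const documentIDAttributeName = "elasticsearch.document_id"

// dynamicIDSettings is a simplified stand-in for the exporter's DynamicIDSettings.
type dynamicIDSettings struct {
	Enabled bool
}

// extractDocumentID returns the document ID attribute, or "" when it is
// absent or empty. Assumption: non-string values are treated as absent.
func extractDocumentID(attrs map[string]any) string {
	v, ok := attrs[documentIDAttributeName]
	if !ok {
		return ""
	}
	s, _ := v.(string) // s is "" when v is not a string
	return s
}

// resolveDocID applies the per-signal gate at the call site, as the
// pushLogRecord, pushTraceRecord, and pushSpanEvent hunks do.
func resolveDocID(cfg dynamicIDSettings, attrs map[string]any) string {
	if !cfg.Enabled {
		return "" // an empty ID lets Elasticsearch generate one
	}
	return extractDocumentID(attrs)
}

func main() {
	attrs := map[string]any{documentIDAttributeName: "trace-span"}

	fmt.Printf("enabled:  %q\n", resolveDocID(dynamicIDSettings{Enabled: true}, attrs))
	fmt.Printf("disabled: %q\n", resolveDocID(dynamicIDSettings{Enabled: false}, attrs))
}
```

Keeping the helper free of configuration also makes it a plain function rather than a method, which matches the change from `e.extractDocumentIDAttribute` to `extractDocumentIDAttribute` in the diff.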