forked from open-telemetry/opentelemetry-collector-contrib
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathconfig.go
More file actions
228 lines (193 loc) · 9.52 KB
/
config.go
File metadata and controls
228 lines (193 loc) · 9.52 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package kafkaexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter"
import (
"errors"
"fmt"
"maps"
"slices"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configoptional"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/configkafka"
)
// Compile-time assertion that *Config satisfies component.Config.
var _ component.Config = (*Config)(nil)

// Sentinel errors returned by Config.Validate and its helpers.
var (
	errRecordPartitionerUnknownType = errors.New("unknown partitioner type")
	errRecordPartitionerExtRequired = errors.New("partitioner.extension must be set when type is \"extension\"")

	errLogsPartitionExclusive = errors.New("partition_logs_by_resource_attributes and partition_logs_by_trace_id cannot both be enabled")

	errTopicMetadataKeyNotIncluded        = errors.New("topic_from_metadata_key must be present in sending_queue::batch::partition::metadata_keys if batching is enabled")
	errBatchPartitionMetadataKeysRequired = errors.New("sending_queue::batch::partition::metadata_keys must be configured when include_metadata_keys is set and batching is enabled")
	errIncludeMetadataKeysNotPartitioned  = errors.New("sending_queue::batch::partition::metadata_keys must include all include_metadata_keys values")
)
// RecordPartitionerConfig configures the strategy used to assign Kafka records to partitions.
type RecordPartitionerConfig struct {
	// Type is the partitioning strategy. Valid values are:
	// - "" or "sarama_compatible" (default): sticky key partitioner with Sarama-compatible FNV-1a hashing
	// - "sticky": franz-go StickyKeyPartitioner with murmur2 hashing
	// - "round_robin": round-robin across all available partitions
	// - "least_backup": routes to the partition with fewest in-flight records
	// - "extension": delegates to an extension implementing RecordPartitionerExtension
	Type string `mapstructure:"type"`
	// Extension is the component ID of an extension implementing RecordPartitionerExtension.
	// Required when Type is "extension"; must be empty for all other types.
	// A pointer is used so that "unset" is distinguishable from the zero component.ID.
	Extension *component.ID `mapstructure:"extension,omitempty"`
}
// Validate checks that Type names a supported partitioning strategy and that
// Extension is set when Type is "extension". It returns
// errRecordPartitionerUnknownType (wrapped with the offending value) for any
// unrecognized type.
func (c *RecordPartitionerConfig) Validate() error {
	switch c.Type {
	case "", RecordPartitionerTypeSaramaCompatible,
		// "sticky" is documented as a valid type on RecordPartitionerConfig.Type;
		// accept it here so such configurations do not fail validation.
		"sticky",
		RecordPartitionerTypeRoundRobin,
		RecordPartitionerTypeLeastBackup:
		// Built-in partitioners require no additional configuration.
		// NOTE(review): Extension is documented as "must be empty for all other
		// types" but that is not enforced here — confirm whether it should be.
	case RecordPartitionerTypeExtension:
		if c.Extension == nil {
			return errRecordPartitionerExtRequired
		}
	default:
		return fmt.Errorf("%w: %q", errRecordPartitionerUnknownType, c.Type)
	}
	return nil
}
// Config defines configuration for Kafka exporter.
type Config struct {
	TimeoutSettings exporterhelper.TimeoutConfig `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
	// QueueBatchConfig holds optional sending-queue and batching settings.
	QueueBatchConfig configoptional.Optional[exporterhelper.QueueBatchConfig] `mapstructure:"sending_queue"`
	configretry.BackOffConfig `mapstructure:"retry_on_failure"`
	configkafka.ClientConfig `mapstructure:",squash"`
	// Producer holds Kafka producer settings (e.g. compression, acks).
	Producer configkafka.ProducerConfig `mapstructure:"producer"`
	// Logs holds configuration about how logs should be sent to Kafka.
	Logs SignalConfig `mapstructure:"logs"`
	// Metrics holds configuration about how metrics should be sent to Kafka.
	Metrics SignalConfig `mapstructure:"metrics"`
	// Traces holds configuration about how traces should be sent to Kafka.
	Traces SignalConfig `mapstructure:"traces"`
	// Profiles holds configuration about how profiles should be sent to Kafka.
	Profiles SignalConfig `mapstructure:"profiles"`
	// IncludeMetadataKeys indicates the receiver's client metadata keys to propagate as Kafka message headers.
	IncludeMetadataKeys []string `mapstructure:"include_metadata_keys"`
	// TopicFromAttribute is the name of the attribute to use as the topic name.
	TopicFromAttribute string `mapstructure:"topic_from_attribute"`
	// PartitionTracesByID sets the message key of outgoing trace messages to the trace ID.
	//
	// NOTE: this does not have any effect for Jaeger encodings. Jaeger encodings always
	// use the trace ID for the message key.
	PartitionTracesByID bool `mapstructure:"partition_traces_by_id"`
	// PartitionMetricsByResourceAttributes controls the partitioning of metrics messages by
	// resource. If this is true, then the message key will be set to a hash of the resource's
	// identifying attributes.
	PartitionMetricsByResourceAttributes bool `mapstructure:"partition_metrics_by_resource_attributes"`
	// PartitionLogsByResourceAttributes controls the partitioning of logs messages by resource.
	// If this is true, then the message key will be set to a hash of the resource's identifying
	// attributes. Mutually exclusive with PartitionLogsByTraceID (enforced by Validate).
	PartitionLogsByResourceAttributes bool `mapstructure:"partition_logs_by_resource_attributes"`
	// PartitionLogsByTraceID controls partitioning of log messages by trace ID only.
	// When enabled, the exporter splits incoming logs per TraceID (using SplitLogs)
	// and sets the Kafka message key to the 16-byte hex string of that TraceID.
	// If a LogRecord has an empty TraceID, the key may be empty and partition
	// selection falls back to the Kafka client's default strategy. Resource
	// attributes are not used for the key when this option is enabled.
	PartitionLogsByTraceID bool `mapstructure:"partition_logs_by_trace_id"`
	// RecordPartitioner configures how Kafka records are assigned to partitions.
	// The default ("sarama_compatible") retains the legacy Sarama-compatible hashing
	// behavior. Set to "sticky", "round_robin", or "least_backup" to use one of the
	// built-in franz-go partitioners, or "extension" to delegate to a custom extension.
	RecordPartitioner RecordPartitionerConfig `mapstructure:"record_partitioner"`
}
// Validate reports the first configuration problem found: conflicting log
// partitioning flags, an invalid record partitioner, or batch/partition
// metadata-key requirements that are not met.
func (c *Config) Validate() error {
	// The two log-partitioning strategies produce different message keys and
	// therefore cannot be combined.
	if c.PartitionLogsByResourceAttributes && c.PartitionLogsByTraceID {
		return errLogsPartitionExclusive
	}
	if err := c.RecordPartitioner.Validate(); err != nil {
		return fmt.Errorf("record_partitioner: %w", err)
	}
	return validateBatchPartitionerKeys(c)
}
// SignalConfig holds signal-specific configuration for the Kafka exporter.
// One instance exists per signal type (logs, metrics, traces, profiles) on Config.
type SignalConfig struct {
	// Topic holds the name of the Kafka topic to which messages of the
	// signal type should be produced.
	//
	// The default depends on the signal type:
	// - "otlp_spans" for traces
	// - "otlp_metrics" for metrics
	// - "otlp_logs" for logs
	// - "otlp_profiles" for profiles
	Topic string `mapstructure:"topic"`
	// TopicFromMetadataKey holds the name of the metadata key to use as the
	// topic name for this signal type. If this is set, it takes precedence
	// over the topic name set in the topic field. When batching is enabled,
	// this key must also appear in the batch partition metadata keys
	// (see validateBatchPartitionerKeys).
	TopicFromMetadataKey string `mapstructure:"topic_from_metadata_key"`
	// Encoding holds the encoding of messages for the signal type.
	//
	// Defaults to "otlp_proto".
	Encoding string `mapstructure:"encoding"`
}
// validateBatchPartitionerKeys validates the partition keys if sending_queue::batch is enabled.
// The exporter relies on a few client metadata keys to be present, if configured, in the final
// batch that needs to be exported, however, since batching removes all client metadata keys by
// default we need to ensure proper partitioning is configured to keep the required metadata.
func validateBatchPartitionerKeys(c *Config) error {
	if !isBatchingEnabled(c.QueueBatchConfig) {
		return nil
	}

	keys := c.QueueBatchConfig.Get().Batch.Get().Partition.MetadataKeys
	keySet := make(map[string]struct{}, len(keys))
	for _, k := range keys {
		keySet[k] = struct{}{}
	}

	// Every include_metadata_keys entry must survive batching, i.e. be one of
	// the partition metadata keys.
	if len(c.IncludeMetadataKeys) != 0 {
		if len(keys) == 0 {
			return errBatchPartitionMetadataKeysRequired
		}
		for _, want := range c.IncludeMetadataKeys {
			if _, ok := keySet[want]; !ok {
				return fmt.Errorf("%w: missing %q from sending_queue::batch::partition::metadata_keys=%v",
					errIncludeMetadataKeysNotPartitioned,
					want,
					keys,
				)
			}
		}
	}

	// Each signal's topic_from_metadata_key (when set) must likewise be a
	// partition metadata key; check in a fixed order so the first offending
	// signal is reported.
	signals := []struct {
		name string
		key  string
	}{
		{"logs", c.Logs.TopicFromMetadataKey},
		{"metrics", c.Metrics.TopicFromMetadataKey},
		{"traces", c.Traces.TopicFromMetadataKey},
		{"profiles", c.Profiles.TopicFromMetadataKey},
	}
	for _, sig := range signals {
		if err := validateTopicFromMetadataKey(sig.key, keySet); err != nil {
			return fmt.Errorf("%s::topic_from_metadata_key: %w", sig.name, err)
		}
	}
	return nil
}
// isBatchingEnabled reports whether both the sending queue and its nested
// batch configuration are present.
func isBatchingEnabled(queueBatchConfig configoptional.Optional[exporterhelper.QueueBatchConfig]) bool {
	// Short-circuit guards the Get() call when the queue config is absent.
	return queueBatchConfig.HasValue() && queueBatchConfig.Get().Batch.HasValue()
}
// validateTopicFromMetadataKey checks that topicFromMetadataKey, when set, is
// one of the configured batch partition keys. An empty key is always valid
// (the feature is simply unused). On failure it returns
// errTopicMetadataKeyNotIncluded wrapped with the offending key and the
// configured partition keys.
func validateTopicFromMetadataKey(topicFromMetadataKey string, partitionKeysSet map[string]struct{}) error {
	if topicFromMetadataKey == "" {
		return nil
	}
	if _, ok := partitionKeysSet[topicFromMetadataKey]; !ok {
		// Sort the keys so the error message is deterministic; map iteration
		// order would otherwise vary between runs.
		return fmt.Errorf("%w: %q not found in partition keys=%v",
			errTopicMetadataKeyNotIncluded,
			topicFromMetadataKey,
			slices.Sorted(maps.Keys(partitionKeysSet)),
		)
	}
	return nil
}