| Status | |
|---|---|
| Stability | development: profiles |
| | beta: traces, metrics, logs |
| Distributions | core, contrib |
| Code Owners | @pavolloffay, @MovieStoreGuy, @axw, @paulojmdias |
The Kafka exporter exports logs, metrics, and traces to Kafka. The exporter uses a synchronous producer that blocks and does not batch messages; for higher throughput and resiliency, use it together with batch and queued retry processors. Message payload encoding is configurable.
> **Note**
> The Kafka exporter uses the franz-go client library, which provides better performance and support for modern Kafka features.
There are no required settings.
The following settings can be optionally configured:
- `brokers` (default = localhost:9092): The list of kafka brokers.
- `protocol_version` (default = 2.1.0): Kafka protocol version.
- `resolve_canonical_bootstrap_servers_only` (default = false): Whether to resolve then reverse-lookup broker IPs during startup.
- `client_id` (default = "otel-collector"): The client ID to configure the Kafka client with. The client ID will be used for all produce requests.
- `conn_idle_timeout` (default = 9m): The time after which idle connections to Kafka brokers are not reused and may be closed.
- `logs`
  - `topic` (default = otlp_logs): The name of the Kafka topic to which logs will be exported.
  - `encoding` (default = otlp_proto): The encoding for logs. See Supported encodings.
  - `topic_from_metadata_key` (default = ""): The name of the metadata key whose value should be used as the message's topic. Useful to dynamically produce to topics based on request inputs. It takes precedence over the `topic_from_attribute` and `topic` settings.
- `metrics`
  - `topic` (default = otlp_metrics): The name of the Kafka topic to publish metrics to.
  - `encoding` (default = otlp_proto): The encoding for metrics. See Supported encodings.
  - `topic_from_metadata_key` (default = ""): The name of the metadata key whose value should be used as the message's topic. Useful to dynamically produce to topics based on request inputs. It takes precedence over the `topic_from_attribute` and `topic` settings.
- `traces`
  - `topic` (default = otlp_spans): The name of the Kafka topic to publish traces to.
  - `encoding` (default = otlp_proto): The encoding for traces. See Supported encodings.
  - `topic_from_metadata_key` (default = ""): The name of the metadata key whose value should be used as the message's topic. Useful to dynamically produce to topics based on request inputs. It takes precedence over the `topic_from_attribute` and `topic` settings.
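For example, per-signal topics and encodings can be overridden like this (a minimal sketch; the topic names shown are illustrative, not defaults):

```yaml
exporters:
  kafka:
    brokers:
      - localhost:9092
    logs:
      topic: app_logs        # illustrative topic name
      encoding: otlp_json
    metrics:
      topic: app_metrics     # illustrative topic name
    traces:
      topic: app_spans       # illustrative topic name
```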
- `topic_from_attribute` (default = ""): Specify the resource attribute whose value should be used as the message's topic. See Destination Topic below for more details.
- `include_metadata_keys` (default = []): Specifies a list of metadata keys to propagate as Kafka message headers. If one or more keys aren't found in the metadata, they are ignored. When `sending_queue::batch` is enabled, `sending_queue::batch::partition::metadata_keys` must be configured and include all values configured in `include_metadata_keys`.
- `partition_traces_by_id` (default = false): configures the exporter to include the trace ID as the message key in trace messages sent to kafka. Please note: this setting does not have any effect on Jaeger encoding exporters since Jaeger exporters include trace ID as the message key by default.
- `partition_metrics_by_resource_attributes` (default = false): configures the exporter to include the hash of sorted resource attributes as the message partitioning key in metric messages sent to kafka.
- `partition_logs_by_resource_attributes` (default = false): configures the exporter to include the hash of sorted resource attributes as the message partitioning key in log messages sent to kafka.
- `partition_logs_by_trace_id` (default = false): configures the exporter to partition log messages by trace ID, if the log record has one associated. Note: `partition_logs_by_resource_attributes` and `partition_logs_by_trace_id` are mutually exclusive, and enabling both will lead to an error.
- `record_partitioner`: configures the Kafka-level record partitioner. When unset, the default sarama-compatible sticky key partitioner is used.
  - `type`: The partitioner strategy. Valid values are:
    - `sarama_compatible` (default): Sticky key partitioner using Sarama-compatible FNV-1a hashing.
    - `round_robin`: Distributes records evenly across all partitions.
    - `least_backup`: Sends records to the partition with the fewest in-flight messages.
    - `custom`: Delegates partitioning to a custom extension.
  - `extension`: The ID of a custom partitioner extension to be used.
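For instance, keying trace messages by trace ID and metric messages by a hash of their resource attributes (a sketch of the options above):

```yaml
exporters:
  kafka:
    partition_traces_by_id: true                 # trace ID becomes the message key
    partition_metrics_by_resource_attributes: true
```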
- `tls`: see TLS Configuration Settings for the full set of available options. Set `tls::insecure: false` explicitly when using `AWS_MSK_IAM_OAUTHBEARER` as the authentication method.
- `auth`
  - `plain_text` (Deprecated in v0.123.0: use `sasl` with `mechanism` set to `PLAIN` instead.)
    - `username`: The username to use.
    - `password`: The password to use.
  - `sasl`
    - `username`: The username to use.
    - `password`: The password to use.
    - `mechanism`: The SASL mechanism to use (`SCRAM-SHA-256`, `SCRAM-SHA-512`, `AWS_MSK_IAM_OAUTHBEARER`, or `PLAIN`)
    - `version` (default = 0): The SASL protocol version to use (0 or 1)
    - `aws_msk`
      - `region`: AWS Region in case of the `AWS_MSK_IAM_OAUTHBEARER` mechanism
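A SASL configuration might look like this (a sketch; the username and environment variable are placeholders):

```yaml
exporters:
  kafka:
    auth:
      sasl:
        username: otel                      # placeholder credential
        password: ${env:KAFKA_PASSWORD}     # placeholder; avoid hard-coding secrets
        mechanism: SCRAM-SHA-512
```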
  - `tls` (Deprecated in v0.124.0: configure `tls` at the top level): this is an alias for `tls` at the top level.
  - `kerberos`
    - `service_name`: Kerberos service name
    - `realm`: Kerberos realm
    - `use_keytab`: Use a keytab instead of a password; if this is true, the keytab file will be used instead of the password
    - `username`: The Kerberos username used to authenticate with the KDC
    - `password`: The Kerberos password used to authenticate with the KDC
    - `config_file`: Path to the Kerberos configuration, e.g. /etc/krb5.conf
    - `keytab_file`: Path to the keytab file, e.g. /etc/security/kafka.keytab
    - `disable_fast_negotiation`: Disable PA-FX-FAST negotiation (Pre-Authentication Framework - Fast). Some common Kerberos implementations do not support PA-FX-FAST negotiation. This is set to `false` by default.
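A keytab-based Kerberos setup could be sketched as follows (the principal name, realm, and file paths are illustrative assumptions):

```yaml
exporters:
  kafka:
    auth:
      kerberos:
        service_name: kafka                      # assumed service principal name
        realm: EXAMPLE.COM                       # illustrative realm
        use_keytab: true
        username: otel-collector                 # illustrative principal
        keytab_file: /etc/security/kafka.keytab
        config_file: /etc/krb5.conf
```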
- `metadata`
  - `full` (default = true): Whether to maintain a full set of metadata. When disabled, the client does not make the initial request to the broker at startup.
  - `refresh_interval` (default = 10m): Controls the frequency at which cluster metadata is refreshed in the background.
  - `retry`
    - `max` (default = 3): The number of retries to get metadata
    - `backoff` (default = 250ms): How long to wait between metadata retries
- `timeout` (default = 5s): Time to wait per individual attempt to produce data to Kafka.
- `retry_on_failure`
  - `enabled` (default = true)
  - `initial_interval` (default = 5s): Time to wait after the first failure before retrying; ignored if `enabled` is `false`
  - `randomization_factor` (default = 0.5): The random factor used to calculate the next backoff.
  - `multiplier` (default = 1.5): The value multiplied by the backoff interval bounds.
  - `max_interval` (default = 30s): The upper bound on backoff; ignored if `enabled` is `false`
  - `max_elapsed_time` (default = 300s): The maximum amount of time spent trying to send a batch; ignored if `enabled` is `false`
- `sending_queue`: see Exporter Helper for the full set of available options.
  - `enabled` (default = true)
  - `num_consumers` (default = 10): Number of consumers that dequeue batches; ignored if `enabled` is `false`
  - `queue_size` (default = 1000): Maximum number of batches kept in memory before dropping data; ignored if `enabled` is `false`. Users should calculate this as `num_seconds * requests_per_second`, where:
    - `num_seconds` is the number of seconds to buffer in case of a backend outage
    - `requests_per_second` is the average number of requests per second.
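As a worked instance of the sizing rule above: to survive a 60-second backend outage at roughly 50 requests per second, the queue needs about 60 × 50 = 3000 entries (the traffic numbers are assumptions for illustration):

```yaml
exporters:
  kafka:
    retry_on_failure:
      enabled: true
      max_interval: 30s
    sending_queue:
      enabled: true
      num_consumers: 10
      queue_size: 3000   # 60 s outage buffer * 50 requests/s (assumed traffic)
```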
- `producer`
  - `max_message_bytes` (default = 1000000): the maximum permitted size of a message in bytes, calculated before compression.
  - `required_acks` (default = 1): controls when a message is regarded as transmitted. See https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#acks
  - `compression` (default = 'none'): the compression used when producing messages to kafka. The options are: `none`, `gzip`, `snappy`, `lz4`, and `zstd`. See https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#compression-type
  - `compression_params`
    - `level` (default = -1): the compression level used when producing messages to kafka. The following are valid combinations of `compression` and `level`:
      - `gzip`: 1 (BestSpeed), 9 (BestCompression), -1 (DefaultCompression)
      - `zstd`: 1 (SpeedFastest), 3 (SpeedDefault), 6 (SpeedBetterCompression), 11 (SpeedBestCompression)
      - `lz4`: only supports fast levels
      - `snappy`: no compression levels supported yet
  - `flush_max_messages` (default = 10000): The maximum number of messages the producer will send in a single broker request.
  - `allow_auto_topic_creation` (default = true): whether the broker is allowed to automatically create topics when they are referenced but do not already exist.
  - `linger` (default = 10ms): How long individual topic partitions will linger waiting for more records before triggering a request to be built.
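Putting the producer options together, a throughput-oriented setup could look like this (a sketch; the `linger` value is an assumption trading a little latency for larger batches):

```yaml
exporters:
  kafka:
    producer:
      compression: zstd
      compression_params:
        level: 3          # zstd SpeedDefault
      linger: 20ms        # assumed value; default is 10ms
      flush_max_messages: 10000
```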
The Kafka exporter supports encoding extensions, as well as the following built-in encodings.
Available for all signals:
- `otlp_proto`: data is encoded as OTLP Protobuf
- `otlp_json`: data is encoded as OTLP JSON
Available only for traces:
- `jaeger_proto`: the payload is serialized to a single Jaeger proto `Span`, and keyed by TraceID.
- `jaeger_json`: the payload is serialized to a single Jaeger JSON Span using `jsonpb`, and keyed by TraceID.
- `zipkin_proto`: the payload is serialized to Zipkin v2 proto Span.
- `zipkin_json`: the payload is serialized to Zipkin v2 JSON Span.
Available only for logs:
- `raw`: if the log record body is a byte array, it is sent as is. Otherwise, it is serialized to JSON. Resource and record attributes are discarded.
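For example, to ship bare log bodies without OTLP framing (a sketch; the topic name is illustrative):

```yaml
exporters:
  kafka:
    logs:
      topic: raw_logs    # illustrative topic name
      encoding: raw      # byte-array bodies pass through as-is
```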
Example configuration:

```yaml
exporters:
  kafka:
    brokers:
      - localhost:9092
```

The destination topic can be defined in a few different ways and takes priority in the following order:
1. When `<signal>::topic_from_metadata_key` is set to use a key from the request metadata, the value of this key is used as the signal-specific topic.
2. Otherwise, if `topic_from_attribute` is configured, and the corresponding attribute is found on the ingested data, the value of this attribute is used.
3. If a prior component in the collector pipeline sets the topic on the context via the `topic.WithTopic` function (from the `github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/topic` package), the value set in the context is used.
4. Finally, the `<signal>::topic` configuration is used for the signal-specific destination topic.
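As a sketch of attribute-based routing, with a signal-specific topic as the fallback (the attribute name `kafka_topic` is a hypothetical example, not a convention):

```yaml
exporters:
  kafka:
    topic_from_attribute: kafka_topic   # hypothetical resource attribute name
    traces:
      topic: otlp_spans                 # used when the attribute is absent
```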
Kafka topics are partitioned, meaning a topic is spread over a number of "buckets" located on different Kafka brokers. The exporter supports multiple strategies to control how records are distributed across Kafka partitions within a topic.
Available strategies for partitioning are `sarama_compatible`, `round_robin`, `least_backup`, and `custom`.
The Kafka exporter allows you to define a custom partitioning strategy via an extension. A sample config for a custom partitioner would look like:

```yaml
exporters:
  kafka:
    brokers:
      - localhost:9092
    record_partitioner:
      type: custom
      extension: my_custom_partitioner

extensions:
  my_custom_partitioner:
    # your extension-specific configuration here

# rest of the pipeline config
```

Use a custom partitioner if:
- Built-in strategies (`round_robin`, `least_backup`, etc.) don't fit your needs
- You require domain-specific routing logic
> **Note**
> The custom partitioner extension must implement the `RecordPartitionerExtension` interface (see partitioner.go).