You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
reacted with thumbs up emoji reacted with thumbs down emoji reacted with laugh emoji reacted with hooray emoji reacted with confused emoji reacted with heart emoji reacted with rocket emoji reacted with eyes emoji
Uh oh!
There was an error while loading. Please reload this page.
Uh oh!
There was an error while loading. Please reload this page.
-
flusher配置参数
参数来自rdkafka,实现loongcollector native时按照当前社区的实现格式转换参数名
基础配置
bootstrap.servers""host1:port1,host2:port2)。topic""message.max.bytes1000000(1MB)receive.message.max.bytes100000000(100MB)message.max.bytes)。queue.buffering.max.messages100000queue.buffering.max.kbytes1048576(1GB)queue.buffering.max.messages(消息数量限制)linger.ms5message.send.max.retries2147483647enable.idempotence=true)。retry.backoff.msbatch.num.messages10000batch.size(字节限制)和message.max.bytes(单条消息最大字节数)约束。batch.size1000000batch.num.messages(消息数量限制)和message.max.bytes(单条消息最大字节数)约束。acks-1•
0:Leader无需返回响应,消息可能丢失。•
1(默认):Leader写入本地日志后立即响应,无需ISR副本确认。•
-1/all:Leader需等待所有ISR副本提交消息后才响应。若ISR副本数不足min.insync.replicas(Broker配置),请求将失败。request.timeout.ms30000(30s)request.required.acks(即acks)≠0时生效。超时后生产者会重试或抛出异常。message.timeout.ms300000(5m)0表示无限等待。若配置了transactional.id,该值自动对齐transaction.timeout.ms。partitioner"consistent_random"consistent - 基于Key的CRC32哈希值分配(空Key和NULL Key会被固定分配到同一个分区)
consistent_random - 基于Key的CRC32哈希值分配(空Key和NULL Key会被随机分配到不同分区)
murmur2 - 与Java生产者兼容的Murmur2哈希分配(NULL Key会被固定分配到同一个分区)
murmur2_random - 与Java生产者兼容的Murmur2哈希分配(NULL Key会被随机分配到不同分区,功能上与Java生产者的默认分区器等效)
fnv1a - 基于Key的FNV-1a哈希值分配(NULL Key会被固定分配到同一个分区)
fnv1a_random - 基于Key的FNV-1a哈希值分配(NULL Key会被随机分配到不同分区)
compression.type"none""none"、"gzip"、"snappy"、"lz4"、"zstd"。compression.level-1compression.codec选定的算法)。数值越高压缩率越好,但会消耗更多CPU资源。可用范围取决于算法:gzip为[0-9];lz4为[0-12];snappy仅支持0;-1表示采用编解码器默认压缩级别。权限认证
rbkafka的权限认证配置项
SASL认证
sasl.mechanism"PLAIN"""""ssl认证
certificate_verify_cb来扩展此验证功能""""Kerberos认证
sasl.mechanisms"GSSAPI"权限部分实现目标
多组件接入权限认证配置统一化
Authentication.TLS.Enabledsecurity.protocol=sslAuthentication.TLS.CAFilessl.ca.locationAuthentication.TLS.CertFilessl.certificate.location(与 KeyFile 必须成对配置,否则将视为配置错误)Authentication.TLS.KeyFilessl.key.location(与 CertFile 必须成对配置,否则将视为配置错误)Authentication.TLS.KeyPasswordssl.key.password(可选)Authentication.Kerberos.EnabledAuthentication.Kerberos.Mechanisms"GSSAPI"sasl.mechanismsAuthentication.Kerberos.ServiceName"kafka"sasl.kerberos.service.nameAuthentication.Kerberos.Principalsasl.kerberos.principalAuthentication.Kerberos.Keytabsasl.kerberos.keytabAuthentication.Kerberos.KinitCmdsasl.kerberos.kinit.cmdAuthentication.SASL.Mechanismsasl.mechanismsAuthentication.SASL.Usernamesasl.usernameAuthentication.SASL.Passwordsasl.passwordAuthConfig参考实现下面的代码仅做实现指导,并未验证
文件:
auth_config.h文件:
auth_config.cppKafka 组件:自行映射
压缩配置参考说明
kafka日志发送标准
在开源生态中95%的消息中间件消费组件都是基于单条消息来消费的处理的,并且各语言的sdk也提供了buffer来实现消息的批量发送,因此社区不实现多条日志合并打包成一条message的功能,在go版本flusher中也不提供这样的能力。
Beta Was this translation helpful? Give feedback.
All reactions