Description
Describe the issue
我想实现一个 这样的功能,在k8s 中,根据不同的命名空间,生成不同的tag(这一步已实现),然后fluentd 把tag 作为变量,传递给 kafka,作为kafka 的 topic ,以此来实现动态的 topic 主题,我这边如何配置都不生效
How did you install fluent operator?
helm 方式安装,版本v2.5.0
Additional context
我写的配置文件
[root@k8s-master1 fluent-operator]# cat fluentd-clusteroutput.yaml
apiVersion: fluentd.fluent.io/v1alpha1
kind: ClusterOutput
metadata:
name: cluster-fluentd-output
labels:
output.fluentd.fluent.io/enabled: "true"
spec:
outputs:
- customPlugin:
config: |
<match kube.**>
@type rewrite_tag_filter
key $.app_name
pattern ^(.+)$
tag $1
- kafka:
brokers: apple74-broker-headless.kafka.svc.cluster.local:9092
defaultTopic: ${tag}
requiredAcks: -1
topicKey: demo-log-demo
useEventTime: true
buffer:
type: file
path: /buffers/${tag}.buffer
flushMode: interval
flushInterval: 5s
flushThreadCount: "4"
chunkLimitSize: 2MB
retryType: exponential_backoff
retryMaxTimes: 5
retryMaxInterval: 30s
retryTimeout: 1m
totalLimitSize: 20MB
timezone: "Asia/Shanghai"
overflowAction: drop_oldest_chunk
运行后但是报错: failed to flush the buffer. retry_times=1 next_retry_time=2023-11-23 07:36:49 +0000 chunk="60acce2c6238a4887f4299ba3d8a72b8" error_class=Kafka::DeliveryFailed error="Failed to assign partitions to 16 messages"
2023-11-23T15:36:47.417400092+08:00 2023-11-23 07:36:47 +0000 [warn]: #0 suppressed same stacktrace
2023-11-23T15:36:47.812006150+08:00 2023-11-23 07:36:47 +0000 [warn]: #0 [ClusterFluentdConfig-cluster-fluentd-config::cluster::clusteroutput::cluster-fluentd-output-1] Send exception occurred: Failed to assign partitions to 37 messages