Description
I have a service with fully working Confluent Kafka consumers and producers, with SSL enabled, in the environment where I hit the following bug. I then added the goka package, which only supports Sarama.
When I try to turn on Sarama's TLS (without certificate verification, the same as the Confluent consumer), I get the following error:
```
Error creating the kafka client: kafka: client has run out of available brokers to talk to: 3 errors occurred:
	* unexpected EOF
	* unexpected EOF
	* unexpected EOF
```
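For what it's worth, here is a quick diagnostic sketch (not part of the service, broker hostname taken from the logs below) that I can use to confirm the listener on 9094 actually completes a TLS handshake with the same settings Sarama is given:

```go
package main

import (
	"crypto/tls"
	"fmt"
)

func main() {
	// Dial one broker directly with the same TLS settings handed to Sarama
	// (InsecureSkipVerify, no client certs). If the handshake fails here
	// too, the problem is below Sarama.
	conn, err := tls.Dial("tcp", "b-1.our-server:9094", &tls.Config{
		InsecureSkipVerify: true,
	})
	if err != nil {
		fmt.Println("TLS handshake failed:", err)
		return
	}
	defer conn.Close()
	fmt.Println("TLS OK, version:", tls.VersionName(conn.ConnectionState().Version))
}
```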
Versions
| Sarama | Kafka | Go |
|---|---|---|
| 1.41.3 | 3.4.0 | 1.23.3 |
I also tried different protocol versions (`sarama.V0_11_0_0`, `V2_8_*`, etc.). Nothing worked.
Configuration
```go
import (
	"crypto/tls"

	"github.com/IBM/sarama"
)

func getSaramaConfig(tlsEnabled bool) *sarama.Config {
	config := sarama.NewConfig()
	config.Version = sarama.V0_11_0_0
	config.Net.TLS.Enable = tlsEnabled
	// Skip certificate verification, matching the Confluent consumer setup.
	config.Net.TLS.Config = &tls.Config{
		InsecureSkipVerify: true,
	}
	return config
}
```
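To take goka out of the picture, one could build a bare Sarama client with the exact same config (broker list copied from the logs below; this is a sketch, I have not wired it into the service):

```go
package main

import (
	"fmt"

	"github.com/IBM/sarama"
)

func main() {
	brokers := []string{
		"b-1.our-server:9094",
		"b-3.our-server:9094",
		"b-2.our-server:9094",
	}
	// getSaramaConfig is the function shown above; TLS on.
	// This is where a "run out of available brokers" error would surface.
	client, err := sarama.NewClient(brokers, getSaramaConfig(true))
	if err != nil {
		fmt.Println("client error:", err)
		return
	}
	defer client.Close()
	fmt.Println("connected, brokers:", len(client.Brokers()))
}
```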
The code that uses Sarama and fails to start:
```go
func NewStreamProcessor(cfg *config.KafkaStreams, logger *zerolog.Logger) (*StreamProcessor, error) {
	sp := &StreamProcessor{
		cfg:    cfg,
		logger: logger,
	}
	sp.logger.Info().Msg("Initializing StreamProcessor")

	// Sarama Kafka config
	saramaCfg := getSaramaConfig(cfg.TLSEnabled)

	requestCodec := new(RequestCodec)
	mediaResultCodec := new(MediaResultCodec)
	finalResponseCodec := new(FinalResponseCodec)
	stateCodec := new(ProcessingStateCodec)

	tmc := goka.NewTopicManagerConfig()
	tmc.Table.Replication = 1
	tmc.Stream.Replication = 1

	group := goka.Group(cfg.ConsumerGroup)
	requestsStream := goka.Stream(cfg.InputTopic)
	mediaStream := goka.Stream(cfg.MediaTopic)
	outputStream := goka.Stream(cfg.OutputTopic)

	builder := goka.DefineGroup(group,
		goka.Input(requestsStream, requestCodec, sp.handleRequest),
		goka.Input(mediaStream, mediaResultCodec, sp.handleMediaResult),
		goka.Output(outputStream, finalResponseCodec),
		goka.Persist(stateCodec),
	)

	sp.logger.Info().Str("bootstrap_servers", cfg.BootstrapServers).Msg("Kafka bootstrap servers")
	cfg.BootstrapServers = strings.ReplaceAll(cfg.BootstrapServers, " ", "")
	bsServers := strings.Split(cfg.BootstrapServers, ",")
	sp.logger.Info().Strs("bootstrap_servers_split", bsServers).Msg("Kafka bootstrap servers split")
	sp.logger.Info().
		Str("input_topic", cfg.InputTopic).
		Str("output_topic", cfg.OutputTopic).
		Str("media_topic", cfg.MediaTopic).
		Msg("Topics")

	emitter, err := goka.NewEmitter(
		bsServers,
		goka.Stream(cfg.OutputTopic),
		finalResponseCodec,
		goka.WithEmitterProducerBuilder(goka.ProducerBuilderWithConfig(saramaCfg)),
	)
	if err != nil {
		return nil, fmt.Errorf("failed to create emitter: %w", err)
	}
	sp.logger.Info().Msg("Kafka Streams Emitter created")

	processor, err := goka.NewProcessor(bsServers,
		builder,
		goka.WithStorageBuilder(func(topic string, partition int32) (storage.Storage, error) {
			// TODO: replace if will be memory consuming
			return storage.NewMemory(), nil
		}),
		goka.WithHasher(goka.DefaultHasher()),
		goka.WithConsumerGroupBuilder(goka.ConsumerGroupBuilderWithConfig(saramaCfg)),
		goka.WithTopicManagerBuilder(goka.TopicManagerBuilderWithConfig(saramaCfg, tmc)),
	)
	if err != nil {
		return nil, fmt.Errorf("failed to create processor: %w", err)
	}
	sp.logger.Info().Msg("Kafka streams Processor (consumer) created")

	sp.processor = processor
	sp.emitter = emitter
	return sp, nil
}
```
Logs
{"level":"info","caller":"/build/internal/infrastructure/repository/kafka/producer.go:40","time":"2024-12-12T09:45:15Z","message":"Creating Kafka Producer"}
{"level":"info","bootstrap.servers":"b-1.our-server:9094,b-3.our-server:9094,b-2.our-server:9094","security_protocol":"ssl","caller":"/build/internal/infrastructure/repository/kafka/producer.go:41","time":"2024-12-12T09:45:15Z","message":"Kafka Producer configuration"}
{"level":"info","caller":"/build/internal/infrastructure/repository/kafka/consumer.go:31","time":"2024-12-12T09:45:15Z","message":"Creating Kafka Consumer"}
{"level":"info","bootstrap.servers":"b-1.our-server:9094,b-3.our-server:9094,b-2.our-server:9094","security_protocol":"ssl","caller":"/build/internal/infrastructure/repository/kafka/consumer.go:32","time":"2024-12-12T09:45:15Z","message":"Kafka Consumer configuration"}
{"level":"info","caller":"/build/internal/infrastructure/repository/kafka/producer.go:40","time":"2024-12-12T09:45:15Z","message":"Creating Kafka Producer"}
{"level":"info","bootstrap.servers":"b-1.our-server:9094,b-3.our-server:9094,b-2.our-server:9094","security_protocol":"ssl","caller":"/build/internal/infrastructure/repository/kafka/producer.go:41","time":"2024-12-12T09:45:15Z","message":"Kafka Producer configuration"}
{"level":"info","caller":"/build/internal/app/app.go:152","time":"2024-12-12T09:45:15Z","message":"Application start"}
{"level":"debug","caller":"/build/internal/usecase/msgbroker/msgbroker.go:79","time":"2024-12-12T09:45:15Z","message":"MessageBroker is starting"}
{"level":"debug","caller":"/build/internal/infrastructure/repository/kafka/consumer.go:130","time":"2024-12-12T09:45:15Z","message":"Get all queue names"}
{"level":"debug","topicCount":16,"caller":"/build/internal/infrastructure/repository/kafka/consumer.go:136","time":"2024-12-12T09:45:15Z","message":"Get topic metadata"}
{"level":"debug","queue":"sc_input_with_media","caller":"/build/internal/infrastructure/repository/kafka/consumer.go:83","time":"2024-12-12T09:45:15Z","message":"Subscribing to queue"}
{"level":"debug","caller":"/build/internal/usecase/msgbroker/msgbroker.go:99","time":"2024-12-12T09:45:15Z","message":"MessageBroker finished initialization"}
{"level":"debug","caller":"/build/internal/usecase/msgbroker/msgbroker.go:102","time":"2024-12-12T09:45:15Z","message":"MessageBroker is starting Consumer"}
{"level":"debug","max_workers":20,"caller":"/build/internal/usecase/msgbroker/msgbroker.go:114","time":"2024-12-12T09:45:15Z","message":"Started message consuming loop"}
{"level":"info","caller":"/build/internal/usecase/streamprocessor/streamprocessor.go:49","time":"2024-12-12T09:45:15Z","message":"Initializing StreamProcessor"}
{"level":"info","bootstrap_servers":"b-1.our-server:9094,b-3.our-server:9094,b-2.our-server:9094","caller":"/build/internal/usecase/streamprocessor/streamprocessor.go:73","time":"2024-12-12T09:45:15Z","message":"Kafka bootstrap servers"}
{"level":"info","bootstrap_servers_split":["b-1.our-server:9094","b-3.our-server:9094","b-2.our-server:9094"[],"caller":"/build/internal/usecase/streamprocessor/streamprocessor.go:76","time":"2024-12-12T09:45:15Z","message":"Kafka bootstrap servers split"}
{"level":"info","input_topic":"messages_engine_input_request","output_topic":"sc_input_with_media","media_topic":"media_descriptions","caller":"/build/internal/usecase/streamprocessor/streamprocessor.go:81","time":"2024-12-12T09:45:15Z","message":"Topics"}
{"level":"info","client_id":"172.18.39.81","method":"GET","status_code":200,"body_size":-1,"path":"/ready","latency":"47.05µs","caller":"/build/internal/controller/http/v1/middleware/logger.go:62","time":"2024-12-12T09:45:16Z"}
{"level":"fatal","error":"failed to create emitter: error creating Kafka producer: Failed to start Sarama producer: kafka: client has run out of available brokers to talk to: 3 errors occurred:\n\t* unexpected EOF\n\t* unexpected EOF\n\t* unexpected EOF\n","caller":"/build/internal/app/app.go:142","time":"2024-12-12T09:45:16Z","message":"Unable to initialize StreamProcessor"}
Additional Context
As you can see from the logs, the Confluent Kafka clients start absolutely fine (first part of the logs), but Sarama does not.
Additionally, I wonder why the bootstrap servers string split by comma (`bootstrap_servers_split` in the logs) is wrapped in `[...]` (Confluent accepts a single comma-separated string, while goka accepts a slice). But that is probably just zerolog's representation of a slice in the logs; otherwise this slice would not work locally (without TLS) either.
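A tiny sketch confirming that suspicion: zerolog's `Strs` renders a `[]string` as a JSON array, so the brackets are purely log formatting, not part of the broker addresses.

```go
package main

import (
	"os"
	"strings"

	"github.com/rs/zerolog"
)

func main() {
	log := zerolog.New(os.Stdout)
	bs := strings.Split("b-1.our-server:9094,b-3.our-server:9094,b-2.our-server:9094", ",")
	// Prints the slice as a JSON array, e.g.
	// {"level":"info","bootstrap_servers_split":["b-1.our-server:9094",...],"message":"split"}
	log.Info().Strs("bootstrap_servers_split", bs).Msg("split")
}
```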