Kafka brokers unavailable is not a fatal error, even when trying to consume #982

Open
@rakker91

Description

If no brokers are running when you try to consume, the registered error handler receives broker-down errors, but none of them is flagged as fatal. You end up blocked in Consume indefinitely, and the client never appears to retry.

How to reproduce

Don't start Kafka.
Create a consumer (we're using IConsumer<byte[], byte[]>) and wire up an error handler:
this.ConsumerBuilder.SetErrorHandler(this.OnKafkaConsumerError);

Call Consume:
var consumeResult = this.Consumer.Consume(token);

You'll get broker down errors like the following:

2019-06-14 10:31:14,126 |7| (Tws.Client.Event.Channels.Clients.Kafka.KafkaProvider) [ERROR] - Kafka has registered a non-fatal error and will try to recover. Error: 127.0.0.1:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: No connection could be made because the target machine actively refused it... (after 3001ms in state CONNECT)
2019-06-14 10:31:14,129 |7| (Tws.Client.Event.Channels.Clients.Kafka.KafkaProvider) [ERROR] - Kafka has registered a non-fatal error and will try to recover. Error: 1/1 brokers are down

But they're not reported as fatal, and no retry ever happens. Consume stays blocked even if you start the broker afterwards.

Checklist

Please provide the following information:

  • A complete (i.e. we can run it), minimal program demonstrating the problem. No need to supply a project file.
    using System;
    using System.Collections.Generic;
    using Confluent.Kafka;

    namespace KafkaDemo
    {
        public class TestClass
        {
            public void Run()
            {
                var consumerBuilder = new ConsumerBuilder<byte[], byte[]>(
                    new Dictionary<string, string>
                    {
                        { "bootstrap.servers", "192.168.1.99:9092" },
                        { "group.id", "group1" }
                    });
                consumerBuilder.SetErrorHandler(this.ErrorHandler);
                var consumer = consumerBuilder.Build();

                while (true)
                {
                    consumer.Consume();
                }
            }

            private void ErrorHandler(IConsumer<byte[], byte[]> consumer, Error error)
            {
                Console.WriteLine($"Error Received.  Fatal: {error.IsFatal}, Error: {error}");
            }
        }
    }

  • Confluent.Kafka nuget version.
    1.0.1.1
  • Apache Kafka version.
    2.12
  • Client configuration.
  • Operating system.
    windows 10 connecting to ubuntu 18.04 for kafka
  • Provide logs (with "debug" : "..." as necessary in configuration).
    See above
  • Provide broker log excerpts.
    See above
  • Critical issue.

Metadata

    Labels

    investigate further: we want to further investigate to understand properly
