Skip to content

CancelToken seems to not work properly on consumer #1085

Open
@tsebastiani

Description

@tsebastiani

Description

Hi,
I'm trying to implement a simple graceful shutdown for kafka consumer. The expected behaviour (from the api description) should be that the consume method waits until a new message is fetched or the cancel method is called on the CancellationTokenSource, but this is not actually happening.
My first try was to run the Polling from inside a Task, but it wasn't working at all so I tried with the simplest example possible, but it's still not working.

Confluent nuget version 1.3.0-PRE1 , 1.2.1, 1.2.0
Kafka version 2.12-2.3.0
DotnetCore 3.0
MacOS Catalina

How to reproduce

            var tokenSource = new CancellationTokenSource();
            var token = tokenSource.Token;
            //Binding the token cancel event to an assembly unload (issued on docker stop) and a ctrl+c
            AssemblyLoadContext.Default.Unloading += (s) => { Logger.Debug("Container closing..."); tokenSource.Cancel(); };
            Console.CancelKeyPress += (s, e) => { tokenSource.Cancel(); Logger.Debug("Container ctrl+c..."); };
            var conf = new ConsumerConfig
            {
                GroupId = "cg1",
                BootstrapServers = "localhost:9092",
                AutoOffsetReset = AutoOffsetReset.Earliest,
            };
            using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
            {
                c.Subscribe(Env.InputTopic);

                Logger.Debug("Waiting for a message.....");
                var message = c.Consume(token);

                if (token.IsCancellationRequested)
                {
                   //this log is never printed after docker stop or ctrl+c "Container Closing" and "Container ctrl+c..." is printed instead
                    Logger.Debug("Token canceled closing connection");
                }
            }
            //this log is never printed after docker stop ctrl+c "Container Closing" and "Container ctrl+c..." is printed instead
            Logger.Debug("Consumer canceled Exiting....");
            return;

Checklist

Please provide the following information:

  • A complete (i.e. we can run it), minimal program demonstrating the problem. No need to supply a project file.
  • Confluent.Kafka nuget version.
  • Apache Kafka version.
  • Client configuration.
  • Operating system.
  • Provide logs (with "debug" : "..." as necessary in configuration).
  • Provide broker log excerpts.
  • Critical issue.

Metadata

Metadata

Assignees

No one assigned

    Labels

    investigate furtherwe want to further investigate to understand properly

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions