Skip to content

Do IConsumer.Pause and IConsumer.Resume not take immediate effect? #2418

Open
@Aaronontheweb

Description

@Aaronontheweb

I'm in the process of trying to troubleshoot some partition re-balancing bugs inside https://github.com/akkadotnet/Akka.Streams.Kafka and here's something I've noticed:

var currentAssignment = _consumer.Assignment.ToImmutableList();
            var initialRebalanceInProcess = _rebalanceInProgress.Value;

            var partitionsToFetch = _requests.Values.SelectMany(v => v.Topics)
                .Where(p => currentAssignment.Contains(p))
                .ToImmutableHashSet();
            
            if (partitionsToFetch.IsEmpty || _requests.IsEmpty())
            {
                if(_log.IsDebugEnabled)
                    _log.Debug("Requests are empty or no partitions to fetch - partitionsToFetch.IsEmpty={0}, _requests.IsEmpty={1}. Attempting to consume with paused partitions.", 
                        partitionsToFetch.IsEmpty, 
                        _requests.IsEmpty());
                PausePartitions(currentAssignment);
                try
                {
                    // BUG: we are expecting a `null` value here - namely we're waiting for partition rebalancing events to complete
                    var consumed = _consumer.Consume(0); // will still return events even though all partitions are paused

When we call Consume(0) here, we will still get data from the IConsumer even though we just paused all partitions. Is this something I should expect to happen or is this a bug? I couldn't find any documentation on this anywhere I looked.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions