-
Notifications
You must be signed in to change notification settings - Fork 200
Description
Checklist
- I have included information about relevant versions - all versions
- I have verified that the issue persists when using the
masterbranch of Faust.
Steps to reproduce
The problem is that faust can get stuck in recovery forever when auto.offset.reset is "latest" (https://faust-streaming.github.io/faust/userguide/settings.html#consumer-auto-offset-reset). In recovery, the fetcher keeps fetching records from the changelog topics to rebuild its in-memory state. Say these changelog topic partitions are so large that recovery will take some time. During this recovery time, Kafka may independently remove some topic partition segments in line with the retention policy of the changelog topic. All this while the faust client is still fetching records from the changelog topic partitions. For example:
time t - changelog segments: earliest |----------------------------|----------------------| latest offset
time t+1 - recovery progress: earliest |............R
time t+2 - changelog segments: new earliest |----------------------| latest offset
time t+3 - recovery has offset R, but offset R+1 no longer exists in the changelog topic
Therefore at time t+3 since offset R no longer exists in the changelog topic the fetcher will jump to an offset as configured by auto.offset.reset. This is the normal Kafka consumer behavior when the offset of a consumer group is no longer in the topic. See: https://github.com/aio-libs/aiokafka/blob/49a557334584b5a09e96e78609b111777e14b2fb/aiokafka/consumer/fetcher.py#L794-L796
If auto.offset.reset=latest the fetcher jumps to latest offset + 1, instead of the new earliest. Therefore, the fetcher will wait for one more record to be published to the changelog topic. But we're still in recovery mode, so, by construction, nothing is publishing to the changelog. Recovery cannot finish and faust is stuck in recovery forever. See:
faust/faust/tables/recovery.py
Lines 888 to 890 in ff75c0b
| def need_recovery(self) -> bool: | |
| """Return :const:`True` if recovery is required.""" | |
| return any(v > 0 for v in self.active_remaining().values()) |
Remember that, faust applies the same auto.offset.reset to all input topics (including changelogs), and does not make any distinction while fetching.
faust/faust/transport/drivers/aiokafka.py
Lines 1010 to 1030 in ff75c0b
| async def _fetch_records( | |
| self, | |
| consumer: aiokafka.AIOKafkaConsumer, | |
| active_partitions: Set[TP], | |
| timeout: Optional[float] = None, | |
| max_records: Optional[int] = None, | |
| ) -> RecordMap: | |
| if not self.consumer.flow_active: | |
| return {} | |
| fetcher = consumer._fetcher | |
| if consumer._closed or fetcher._closed: | |
| raise ConsumerStoppedError() | |
| with fetcher._subscriptions.fetch_context(): | |
| try: | |
| return await fetcher.fetched_records( | |
| active_partitions, | |
| timeout=timeout, | |
| max_records=max_records, | |
| ) | |
| finally: | |
| pass |
This behavior is difficult to reproduce. If we set auto.offset.reset to latest. One would need to modify the retention policy of a changelog topic during recovery (maybe adding some asyncio.sleep in the recovery task to buy some time).
Expected behavior
Recovery should eventually finish.
Actual behavior
Faust is stuck in recovery.