What happened?
This is a code snippet from the EventConsumer implementation:
event = await asyncio.wait_for(
self.queue.dequeue_event(), timeout=self._timeout
)
It forcefully times out the dequeue_event() coroutine which might have already picked up the event internally. This will result in a loss of that event.
I'm experiencing this behaviour with Redis implementation of the EventQueue. It does a blocking XREADGROUP followed by XACK. It appears that even XREADGROUP (as implemented in redis-py) in isolation isn't capable of handling the timeout.
Code of Conduct