Skip to content

Commit e9cb71b

Browse files
Developergoingforstudying-ctrl
authored andcommitted
fix(pubsub): use remaining timeout in listen() to prevent cumulative timeout drift
- Track remaining timeout across loop iterations instead of passing the full timeout each time. This ensures listen(timeout=N) returns within approximately N seconds total, not N seconds per iteration. - Addresses cursor[bot] review comments about full timeout being passed each loop iteration.
1 parent 91b3cd2 commit e9cb71b

2 files changed

Lines changed: 10 additions & 2 deletions

File tree

redis/asyncio/client.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1440,16 +1440,20 @@ async def listen(self, timeout: Optional[float] = None) -> AsyncIterator:
14401440
"""
14411441
if timeout is not None:
14421442
start = time.monotonic()
1443+
remaining = timeout
1444+
else:
1445+
remaining = None
14431446
while self.subscribed:
14441447
response = await self.handle_message(
1445-
await self.parse_response(block=(timeout is None), timeout=timeout)
1448+
await self.parse_response(block=(timeout is None), timeout=remaining)
14461449
)
14471450
if response is not None:
14481451
yield response
14491452
if timeout is not None:
14501453
elapsed = time.monotonic() - start
14511454
if elapsed >= timeout:
14521455
break
1456+
remaining = timeout - elapsed
14531457

14541458
async def get_message(
14551459
self, ignore_subscribe_messages: bool = False, timeout: Optional[float] = 0.0

redis/client.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1449,16 +1449,20 @@ def listen(self, timeout: Optional[float] = None):
14491449
"""
14501450
if timeout is not None:
14511451
start = time.monotonic()
1452+
remaining = timeout
1453+
else:
1454+
remaining = None
14521455
while self.subscribed:
14531456
response = self.handle_message(
1454-
self.parse_response(block=(timeout is None), timeout=timeout)
1457+
self.parse_response(block=(timeout is None), timeout=remaining)
14551458
)
14561459
if response is not None:
14571460
yield response
14581461
if timeout is not None:
14591462
elapsed = time.monotonic() - start
14601463
if elapsed >= timeout:
14611464
break
1465+
remaining = timeout - elapsed
14621466

14631467
def get_message(
14641468
self, ignore_subscribe_messages: bool = False, timeout: float = 0.0

0 commit comments

Comments
 (0)