Skip to content

Commit 09183ad

Browse files
committed
Fixing pubsub's listen method to be blocking.
1 parent 8210f32 commit 09183ad

5 files changed

Lines changed: 360 additions & 4 deletions

File tree

redis/asyncio/client.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import asyncio
22
import copy
33
import inspect
4+
import math
45
import re
56
import time
67
import warnings
@@ -1276,7 +1277,21 @@ async def parse_response(self, block: bool = True, timeout: float = 0):
12761277
if not conn.is_connected:
12771278
await conn.connect()
12781279

1279-
read_timeout = None if block else timeout
1280+
# Block=True: signal "no timeout" to conn.read_response via
1281+
# math.inf. The connection treats math.inf as the per-read
1282+
# opt-in for blocking indefinitely without falling back to
1283+
# self.socket_timeout. Reconnect/AUTH/HELLO/resubscribe
1284+
# operations performed by the retry layer continue to honor
1285+
# self.socket_timeout because they do not pass math.inf.
1286+
#
1287+
# TODO(next-major): when the async Connection.read_response
1288+
# default for ``timeout`` is changed to SENTINEL, passing
1289+
# ``timeout=None`` from this method will become the natural
1290+
# "no timeout" signal and the math.inf hand-off can be
1291+
# removed. That swap is a breaking change to the
1292+
# Connection.read_response signature so it must wait for a
1293+
# major release.
1294+
read_timeout = math.inf if block else timeout
12801295
response = await self._execute(
12811296
conn,
12821297
conn.read_response,

redis/asyncio/connection.py

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import asyncio
22
import copy
33
import inspect
4+
import math
45
import socket
56
import sys
67
import time
@@ -746,8 +747,35 @@ async def read_response(
746747
disconnect_on_error: bool = True,
747748
push_request: Optional[bool] = False,
748749
):
749-
"""Read the response from a previously sent command"""
750-
read_timeout = timeout if timeout is not None else self.socket_timeout
750+
"""Read the response from a previously sent command.
751+
752+
``timeout`` semantics:
753+
- ``None`` (default): fall back to ``self.socket_timeout``.
754+
- ``math.inf``: block indefinitely with no timeout. Used by PubSub
755+
blocking reads (``listen()`` / ``get_message(timeout=None)`` /
756+
``parse_response(block=True)``) where the configured
757+
``socket_timeout`` must not abort the read.
758+
- ``float``: apply that timeout in seconds for this single read.
759+
760+
TODO(next-major): replace the ``math.inf`` opt-in with a SENTINEL
761+
default for ``timeout``. After that change, ``timeout=None`` will
762+
mean "no timeout, block until a response arrives" (matching the
763+
long-standing PubSub docstring contract) and the SENTINEL default
764+
will be the value that falls back to ``self.socket_timeout``.
765+
That swap is a breaking change, so it must wait for a major
766+
release. Until then, callers that need an indefinitely blocking
767+
read pass ``math.inf`` explicitly.
768+
"""
769+
# TODO(next-major): drop the math.inf branch. Use SENTINEL as the
770+
# default for ``timeout`` and treat ``timeout is None`` as the
771+
# "no timeout" signal (matching the PubSub docstring contract).
772+
# Match only positive infinity here. ``-math.inf`` is not a valid
773+
# "block forever" signal and historically behaved as an already-
774+
# expired timeout; preserve that.
775+
if timeout == math.inf:
776+
read_timeout = None
777+
else:
778+
read_timeout = timeout if timeout is not None else self.socket_timeout
751779
host_error = self._host_error()
752780
try:
753781
if read_timeout is not None and self.protocol in ["3", 3]:

redis/client.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1255,7 +1255,11 @@ def try_read():
12551255
read_timeout = timeout
12561256
else:
12571257
conn.connect()
1258-
read_timeout = SENTINEL # Use default socket timeout for blocking
1258+
# Block indefinitely waiting for a pubsub message. timeout=None
1259+
# makes the socket layer call sock.settimeout(None) for this read
1260+
# (and restore the original socket_timeout afterwards), so the
1261+
# configured socket_timeout does not abort the read.
1262+
read_timeout = None
12591263
return conn.read_response(
12601264
disconnect_on_error=False, push_request=True, timeout=read_timeout
12611265
)

tests/test_asyncio/test_pubsub.py

Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1618,6 +1618,193 @@ async def test_timeout_with_no_subscription(self, r):
16181618
await p.aclose()
16191619

16201620

1621+
@pytest.mark.asyncio
1622+
@pytest.mark.onlynoncluster
1623+
class TestAsyncPubSubBlockingListen:
1624+
"""
1625+
Tests that PubSub.listen() and parse_response(block=True) block
1626+
indefinitely waiting for a message, ignoring the connection's
1627+
configured socket_timeout. Regression tests for the issue where
1628+
listen() would raise TimeoutError once socket_timeout elapsed.
1629+
1630+
Retries are disabled on the subscriber so a TimeoutError from the
1631+
blocking read surfaces immediately instead of being silently retried
1632+
by the client's default retry policy (which would mask the bug).
1633+
"""
1634+
1635+
SOCKET_TIMEOUT = 0.3
1636+
PUBLISH_DELAY = SOCKET_TIMEOUT * 3
1637+
1638+
async def _make_subscriber(self, create_redis):
1639+
from redis.asyncio.retry import Retry
1640+
from redis.backoff import NoBackoff
1641+
1642+
return await create_redis(
1643+
socket_timeout=self.SOCKET_TIMEOUT,
1644+
retry=Retry(NoBackoff(), 0),
1645+
retry_on_timeout=False,
1646+
)
1647+
1648+
async def test_listen_blocks_longer_than_socket_timeout(self, create_redis):
1649+
"""
1650+
listen() must keep blocking past socket_timeout. Configure a very
1651+
short socket_timeout, then publish after a delay that exceeds it,
1652+
and assert listen() yields the message instead of raising
1653+
TimeoutError.
1654+
"""
1655+
r = await self._make_subscriber(create_redis)
1656+
publisher = await create_redis()
1657+
p = r.pubsub()
1658+
await p.subscribe("foo")
1659+
msg = await wait_for_message(p, timeout=1.0)
1660+
assert msg is not None and msg["type"] == "subscribe"
1661+
1662+
async def publish_after_delay():
1663+
await asyncio.sleep(self.PUBLISH_DELAY)
1664+
await publisher.publish("foo", "hello")
1665+
1666+
task = asyncio.create_task(publish_after_delay())
1667+
1668+
start = asyncio.get_running_loop().time()
1669+
msg = await p.listen().__anext__()
1670+
elapsed = asyncio.get_running_loop().time() - start
1671+
assert msg["type"] == "message"
1672+
assert msg["data"] == b"hello"
1673+
# Should have blocked past socket_timeout to receive the message.
1674+
assert elapsed > self.SOCKET_TIMEOUT
1675+
await task
1676+
await p.aclose()
1677+
1678+
async def test_get_message_block_indefinitely_past_socket_timeout(
1679+
self, create_redis
1680+
):
1681+
"""
1682+
get_message(timeout=None) must block past socket_timeout.
1683+
"""
1684+
r = await self._make_subscriber(create_redis)
1685+
publisher = await create_redis()
1686+
p = r.pubsub()
1687+
await p.subscribe("foo")
1688+
msg = await wait_for_message(p, timeout=1.0)
1689+
assert msg is not None
1690+
1691+
async def publish_after_delay():
1692+
await asyncio.sleep(self.PUBLISH_DELAY)
1693+
await publisher.publish("foo", "delayed")
1694+
1695+
task = asyncio.create_task(publish_after_delay())
1696+
1697+
start = asyncio.get_running_loop().time()
1698+
msg = await p.get_message(timeout=None)
1699+
elapsed = asyncio.get_running_loop().time() - start
1700+
assert msg is not None
1701+
assert msg["type"] == "message"
1702+
assert msg["data"] == b"delayed"
1703+
assert elapsed > self.SOCKET_TIMEOUT
1704+
await task
1705+
await p.aclose()
1706+
1707+
async def test_parse_response_block_true_blocks_past_socket_timeout(
1708+
self, create_redis
1709+
):
1710+
"""
1711+
parse_response(block=True) must block past socket_timeout.
1712+
"""
1713+
r = await self._make_subscriber(create_redis)
1714+
publisher = await create_redis()
1715+
p = r.pubsub(ignore_subscribe_messages=True)
1716+
await p.subscribe("foo")
1717+
# Drain the subscribe ack before calling parse_response directly.
1718+
await wait_for_message(p, timeout=1.0)
1719+
1720+
async def publish_after_delay():
1721+
await asyncio.sleep(self.PUBLISH_DELAY)
1722+
await publisher.publish("foo", "raw")
1723+
1724+
task = asyncio.create_task(publish_after_delay())
1725+
1726+
start = asyncio.get_running_loop().time()
1727+
response = await p.parse_response(block=True)
1728+
elapsed = asyncio.get_running_loop().time() - start
1729+
# Response is the raw pubsub frame [b"message", b"foo", b"raw"].
1730+
assert response is not None
1731+
assert response[0] == b"message"
1732+
assert response[-1] == b"raw"
1733+
assert elapsed > self.SOCKET_TIMEOUT
1734+
await task
1735+
await p.aclose()
1736+
1737+
async def test_blocking_listen_does_not_mutate_socket_timeout(
1738+
self, create_redis
1739+
):
1740+
"""
1741+
A blocking read must not mutate the connection's configured
1742+
socket_timeout. Otherwise reconnect/AUTH/HELLO/resubscribe paths
1743+
triggered by retries inside _execute would inherit no timeout and
1744+
could hang indefinitely. The fix uses a per-read sentinel rather
1745+
than clearing socket_timeout for the duration of the call.
1746+
"""
1747+
r = await self._make_subscriber(create_redis)
1748+
publisher = await create_redis()
1749+
p = r.pubsub()
1750+
await p.subscribe("foo")
1751+
await wait_for_message(p, timeout=1.0)
1752+
1753+
orig_timeout = p.connection.socket_timeout
1754+
assert orig_timeout == self.SOCKET_TIMEOUT
1755+
1756+
# Sample socket_timeout while the blocking read is in progress, to
1757+
# catch any mutation that happens only for the duration of the call.
1758+
async def sample_timeout_during_read():
1759+
# Yield a few times so the get_message read is in flight.
1760+
for _ in range(10):
1761+
await asyncio.sleep(0)
1762+
assert p.connection.socket_timeout == orig_timeout
1763+
1764+
async def publish_after_delay():
1765+
await asyncio.sleep(self.PUBLISH_DELAY)
1766+
await publisher.publish("foo", "msg")
1767+
1768+
sample_task = asyncio.create_task(sample_timeout_during_read())
1769+
publish_task = asyncio.create_task(publish_after_delay())
1770+
1771+
msg = await p.get_message(timeout=None)
1772+
await sample_task
1773+
assert msg is not None and msg["data"] == b"msg"
1774+
assert p.connection.socket_timeout == orig_timeout
1775+
await publish_task
1776+
await p.aclose()
1777+
1778+
async def test_block_indefinitely_does_not_alter_subsequent_reads(
1779+
self, create_redis
1780+
):
1781+
"""
1782+
After a blocking read returns, a follow-up non-blocking read must
1783+
still honor the configured ``socket_timeout`` (i.e. the per-read
1784+
``block_indefinitely`` does not bleed into later operations and
1785+
does not require mutating any connection-wide state).
1786+
"""
1787+
r = await self._make_subscriber(create_redis)
1788+
publisher = await create_redis()
1789+
p = r.pubsub()
1790+
await p.subscribe("foo")
1791+
await wait_for_message(p, timeout=1.0)
1792+
1793+
# First a blocking read.
1794+
await publisher.publish("foo", "first")
1795+
msg = await p.get_message(timeout=None)
1796+
assert msg is not None and msg["data"] == b"first"
1797+
1798+
# Then a non-blocking read with a short timeout and no message.
1799+
# If block_indefinitely leaked, this would block past the timeout.
1800+
start = asyncio.get_running_loop().time()
1801+
msg = await p.get_message(timeout=0.1)
1802+
elapsed = asyncio.get_running_loop().time() - start
1803+
assert msg is None
1804+
assert elapsed < 0.5
1805+
await p.aclose()
1806+
1807+
16211808
@pytest.mark.asyncio
16221809
class TestPubSubHandleMessageMetrics:
16231810
"""Tests for handle_message recording pubsub metrics."""

0 commit comments

Comments
 (0)