Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 21 additions & 3 deletions redis/asyncio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1430,12 +1430,30 @@ def sunsubscribe(self, *args, target_node=None) -> Awaitable:
self.pending_unsubscribe_shard_channels.update(s_channels)
return self.execute_command("SUNSUBSCRIBE", *args)

async def listen(self) -> AsyncIterator:
"""Listen for messages on channels this client has been subscribed to"""
async def listen(self, timeout: Optional[float] = None) -> AsyncIterator:
"""
Listen for messages on channels this client has been subscribed to.

If timeout is specified, the system will wait for `timeout` seconds
before returning. Timeout should be specified as a floating point
number or None to wait indefinitely.
"""
if timeout is not None:
start = time.monotonic()
remaining = timeout
else:
remaining = None
while self.subscribed:
response = await self.handle_message(await self.parse_response(block=True))
response = await self.handle_message(
await self.parse_response(block=(timeout is None), timeout=remaining)
)
if response is not None:
yield response
if timeout is not None:
elapsed = time.monotonic() - start
if elapsed >= timeout:
break
remaining = timeout - elapsed

async def get_message(
self, ignore_subscribe_messages: bool = False, timeout: Optional[float] = 0.0
Expand Down
24 changes: 21 additions & 3 deletions redis/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1439,12 +1439,30 @@ def sunsubscribe(self, *args, target_node=None):
self.pending_unsubscribe_shard_channels.update(s_channels)
return self.execute_command("SUNSUBSCRIBE", *args)

def listen(self):
"Listen for messages on channels this client has been subscribed to"
def listen(self, timeout: Optional[float] = None):
"""
Listen for messages on channels this client has been subscribed to.

If timeout is specified, the system will wait for `timeout` seconds
before returning. Timeout should be specified as a floating point
number, or None, to wait indefinitely.
"""
if timeout is not None:
start = time.monotonic()
remaining = timeout
else:
remaining = None
while self.subscribed:
response = self.handle_message(self.parse_response(block=True))
response = self.handle_message(
self.parse_response(block=(timeout is None), timeout=remaining)
)
Comment thread
cursor[bot] marked this conversation as resolved.
Comment thread
cursor[bot] marked this conversation as resolved.
if response is not None:
yield response
if timeout is not None:
elapsed = time.monotonic() - start
if elapsed >= timeout:
break
Comment thread
cursor[bot] marked this conversation as resolved.
remaining = timeout - elapsed

def get_message(
self, ignore_subscribe_messages: bool = False, timeout: float = 0.0
Expand Down
80 changes: 80 additions & 0 deletions tests/test_pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -2230,6 +2230,86 @@ def test_timeout_with_no_subscription(self, r):
msg = p.get_message(timeout=0.1)
assert msg is None

def test_listen_with_timeout_returns_none_when_no_message(self, r):
"""
Test that listen() with timeout parameter respects the timeout
and yields nothing when no message arrives within the timeout period.
Fixes redis/redis-py#4098.
"""
p = r.pubsub()
p.subscribe("foo")
# Read subscription message
msg = wait_for_message(p, timeout=1.0)
assert msg is not None
assert msg["type"] == "subscribe"

# listen with timeout should yield nothing and return quickly
start = time.monotonic()
messages = list(p.listen(timeout=0.1))
elapsed = time.monotonic() - start
assert len(messages) == 0
# Verify timeout was actually respected (within reasonable bounds)
assert elapsed < 0.5

def test_listen_with_timeout_yields_message_when_published(self, r):
"""
Test that listen() with timeout yields a message if one arrives
before the timeout expires.
Fixes redis/redis-py#4098.
"""
p = r.pubsub()
p.subscribe("foo")
# Read subscription message
msg = wait_for_message(p, timeout=1.0)
assert msg is not None

# Publish a message
r.publish("foo", "hello")

# listen with timeout should yield the message
messages = list(p.listen(timeout=1.0))
assert len(messages) == 1
assert messages[0]["type"] == "message"
assert messages[0]["data"] == b"hello"

def test_listen_timeout_none_blocks_until_message(self, r):
"""
Test that listen(timeout=None) blocks indefinitely until a message arrives.
We test this by using a thread to publish a message after a delay.
Fixes redis/redis-py#4098.
"""
p = r.pubsub()
p.subscribe("foo")
# Read subscription message
msg = wait_for_message(p, timeout=1.0)
assert msg is not None

# Publish a message after a short delay in a thread
start_publishing = threading.Event()

def publish_after_delay():
start_publishing.wait()
time.sleep(0.2)
r.publish("foo", "delayed_message")

thread = threading.Thread(target=publish_after_delay, daemon=True)
thread.start()

# listen with timeout=None should block until message arrives
start = time.monotonic()
start_publishing.set()
messages = []
for msg in p.listen(timeout=None):
messages.append(msg)
break
elapsed = time.monotonic() - start
assert len(messages) == 1
assert messages[0]["type"] == "message"
assert messages[0]["data"] == b"delayed_message"
# Should have waited at least 0.2 seconds
assert elapsed >= 0.15
thread.join(timeout=1.0)


class TestClusterPubSubTimeoutPropagation:
"""
Expand Down