Skip to content

Commit 2f7b9c0

Browse files
test: add test cases for keyspace and keyevent notifications (#336)
1 parent 3fe654b commit 2f7b9c0

File tree

1 file changed

+51
-0
lines changed

1 file changed

+51
-0
lines changed
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
import pytest
2+
import time
3+
from redis.client import PubSub
4+
5+
def wait_for_message(pubsub: PubSub, timeout=0.5, ignore_subscribe_messages=False):
6+
now = time.time()
7+
timeout = now + timeout
8+
while now < timeout:
9+
message = pubsub.get_message(ignore_subscribe_messages=ignore_subscribe_messages)
10+
if message is not None:
11+
return message
12+
time.sleep(0.01)
13+
now = time.time()
14+
return None
15+
16+
def test_keyspace_notifications():
17+
import fakeredis
18+
19+
server = fakeredis.FakeServer()
20+
r = fakeredis.FakeRedis(server=server)
21+
22+
# Enable keyspace notifications in redis
23+
r.config_set('notify-keyspace-events', 'KEA')
24+
25+
p = r.pubsub()
26+
p.psubscribe('__keyspace@0__:*')
27+
p.psubscribe('__keyevent@0__:*')
28+
29+
msg1 = wait_for_message(p)
30+
msg2 = wait_for_message(p)
31+
assert msg1['type'] == 'psubscribe'
32+
assert msg2['type'] == 'psubscribe'
33+
34+
r.set('foo', 'bar')
35+
36+
msgs = []
37+
msg = wait_for_message(p, timeout=1.0)
38+
while msg is not None:
39+
msgs.append(msg)
40+
msg = wait_for_message(p, timeout=0.1)
41+
42+
assert len(msgs) >= 2
43+
44+
keyspace_msg = next((m for m in msgs if m['channel'] == b'__keyspace@0__:foo'), None)
45+
keyevent_msg = next((m for m in msgs if m['channel'] == b'__keyevent@0__:set'), None)
46+
47+
assert keyspace_msg is not None
48+
assert keyspace_msg['data'] == b'set'
49+
50+
assert keyevent_msg is not None
51+
assert keyevent_msg['data'] == b'foo'

0 commit comments

Comments
 (0)