-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathredis_queue.py
More file actions
35 lines (31 loc) · 1.1 KB
/
redis_queue.py
File metadata and controls
35 lines (31 loc) · 1.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import logging
import time
import json
import redis
logger = logging.getLogger(__name__)
MESSAGE_QUEUE = "message_queue"
redis_host = "127.0.0.1"
redis_port=6379
redis_client = redis.Redis(host=redis_host, port=redis_port, db=0)
async def serve_message(active_connections, client):
"""
Utilisation de redis pour redistribuer les messages aux destinataires abonnés au canal
:return:
"""
logger.info("start listen from redis")
pubsub = client.pubsub()
pubsub.subscribe(MESSAGE_QUEUE)
try:
for message in pubsub.listen():
time.sleep(0.5)
if message['type'] == "message":
message_data = json.loads(message["data"].decode('utf-8'))
if message_data['receiver'] in active_connections:
logger.info(f"Send message {message_data} to {message_data['receiver']}")
else:
client.publish(MESSAGE_QUEUE, message["data"])
# logger.info(f"Waiting receiver {message_data['receiver']}")
except Exception as e:
raise e
finally:
pubsub.close()