-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserver.py
More file actions
123 lines (106 loc) · 3.46 KB
/
server.py
File metadata and controls
123 lines (106 loc) · 3.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import asyncio
import logging
import redis.asyncio as redis
import uvicorn
import time
from rich.logging import RichHandler
import fastapi
import json
import multiprocessing as mp
from contextlib import asynccontextmanager
import anyio
from fastapi import FastAPI
from fastapi.websockets import WebSocket
from starlette.websockets import WebSocketDisconnect
from tasks import send, read
# Root logging goes to the console through Rich; WARNING and above only.
logging.basicConfig(
    level=logging.WARNING,  # one of DEBUG, INFO, WARNING, ERROR, CRITICAL
    format="%(asctime)s:%(name)s:%(funcName)s:%(lineno)d %(message)s",
    handlers=[RichHandler()],  # renders the log records in the console
)
logger = logging.getLogger(__name__)

# Switch off the framework loggers so only this module's records show up.
for _framework_logger in ("uvicorn.access", "uvicorn.error", "fastapi"):
    logging.getLogger(_framework_logger).disabled = True
# Address of the Redis server used by every client created in this module.
redis_host = "127.0.0.1"
redis_port = 6379
# Worker processes started and stopped by the lifespan handler.
# NOTE(review): nothing in this file adds to the list; presumably it is
# filled elsewhere — confirm.
processus = []
# Open websockets, keyed by the sender id taken from the /chat/{sender} path.
active_connections = {}
# Names that are unsubscribed on shutdown.
# NOTE(review): nothing in this file adds to the set — confirm who does.
message_queues = set()
### CLEAN START-UP AND SHUTDOWN OF THE WORKER PROCESSES ###
@asynccontextmanager
async def start_processus(app: FastAPI):
    """
    Lifespan handler: keep the worker processes running while the app is up.

    On start-up every process in ``processus`` is started. On shutdown each
    process that is still alive is terminated and joined, then an
    unsubscribe is issued for every name in ``message_queues``.

    :param app: the FastAPI application the lifespan is attached to (unused).
    :return: an async context manager that yields once, while the app runs.
    """
    for proces in processus:
        proces.start()
        logger.info(f"✅ Process {proces.pid} running")
    try:
        yield
    finally:
        # Cleanup lives in ``finally`` so the child processes are stopped
        # even when the application leaves the lifespan through an exception.
        for proces in processus:
            if proces and proces.is_alive():
                proces.terminate()
                proces.join()
                logger.info("🔴 Redis process stopped.")
        # ``async with`` closes the pub/sub object and the client (and their
        # connections) once the unsubscribe calls are done; the original
        # left both open.
        async with redis.Redis(host=redis_host, port=redis_port) as client:
            async with client.pubsub() as pubsub:
                for name in message_queues:
                    await pubsub.unsubscribe(name)
        logger.info("✅ Reset messages")
# The ASGI application; start_processus is registered as its lifespan handler.
app = fastapi.FastAPI(lifespan=start_processus)
@app.get("/account")
async def account():
    """Placeholder for the account endpoint: no logic yet, returns ``None``."""
    return None
async def _cancel_and_drain(tasks):
    """Cancel every task in *tasks* that is still running and wait for it to stop."""
    unfinished = [task for task in tasks if not task.done()]
    for task in unfinished:
        logger.debug(f"⏳ Cancelling: {task.get_name()}")
        task.cancel()
    if unfinished:
        # return_exceptions=True keeps the resulting CancelledError (or any
        # error raised while unwinding) from masking the caller's own outcome.
        await asyncio.gather(*unfinished, return_exceptions=True)


@app.websocket("/chat/{sender}")
async def chat(websocket: WebSocket, sender: int):
    """
    Serve one chat websocket for ``sender``.

    The socket is accepted and registered in ``active_connections``, then the
    ``send`` and ``read`` coroutines from ``tasks`` run concurrently against
    a dedicated Redis client until both have finished. Whatever the outcome,
    the registration is removed and the Redis client is closed.

    :param websocket: the incoming websocket connection.
    :param sender: id of the connecting user, taken from the URL path.
    :return: None
    :raises Exception: any error other than ``WebSocketDisconnect`` raised by
        one of the two tasks is propagated.
    """
    await websocket.accept()
    active_connections[sender] = websocket
    logger.info(f"👶✅ Sender {sender} connected")
    logger.warning(f"active connections:{len(active_connections)}")

    try:
        # ``async with`` initialises the client like ``await redis.Redis(...)``
        # did, and additionally closes it when the chat ends.
        async with redis.Redis(host=redis_host, port=redis_port) as redis_client:
            tasks = [
                # SEND STEP
                asyncio.create_task(
                    send(websocket, sender, redis_client, active_connections),
                    name=f"send-{sender}",
                ),
                # RECEIVE STEP FOR MESSAGE IN MEMORY
                asyncio.create_task(
                    read(websocket, sender, redis_client, active_connections),
                    name=f"read-{sender}",
                ),
            ]
            try:
                done, _ = await asyncio.wait(
                    tasks, return_when=asyncio.ALL_COMPLETED
                )
                for task in done:
                    try:
                        task.result()
                        logger.debug(f"✅ Completed: {task.get_name()}")
                    except WebSocketDisconnect:
                        # The peer closing the socket is the normal way out.
                        pass
            finally:
                # With ALL_COMPLETED nothing is pending after a normal wait;
                # tasks are only still running here when this handler was
                # itself interrupted (e.g. cancelled), so stop them before
                # the Redis client they use is closed.
                await _cancel_and_drain(tasks)
    finally:
        logger.info(f"👶🔴 Sender {sender} is gone")
        # Unregister exactly once, and only our own socket: the same sender
        # may have reconnected and replaced the entry in the meantime.
        if active_connections.get(sender) is websocket:
            del active_connections[sender]
@app.get("/test")
def hello_world():
    """Smoke-test endpoint: always answers with the string ``"ok"``."""
    reply = "ok"
    return reply
if __name__ == "__main__":
    # uvicorn only honours ``reload=True`` when the application is given as
    # an import string ("module:attribute"). Handed the app object it logs a
    # warning on "uvicorn.error" (disabled in this file) and exits, so the
    # server never started. The module name matches this file, server.py.
    uvicorn.run(
        "server:app",
        host="0.0.0.0",
        port=5000,
        log_level="warning",
        reload=True,
    )