Skip to content
This repository was archived by the owner on Jan 2, 2026. It is now read-only.

Commit 28ae65b

Browse files
committed
Add retry logic for synchronous producers
1 parent 39631e2 commit 28ae65b

1 file changed

Lines changed: 68 additions & 7 deletions

File tree

server/mq/core/synchronous_producer.py

Lines changed: 68 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,17 @@ def __init__(self):
1313

1414
self.host = os.getenv("RABBIT_HOST", "rabbitmq-host")
1515
self._initialized = True
16-
self._connection = pika.BlockingConnection(
17-
pika.ConnectionParameters(host=self.host)
16+
17+
# Connection parameters with heartbeat and retry settings
18+
connection_params = pika.ConnectionParameters(
19+
host=self.host,
20+
heartbeat=600, # 10 minutes
21+
blocked_connection_timeout=300, # 5 minutes
22+
retry_delay=2,
23+
socket_timeout=10,
1824
)
25+
26+
self._connection = pika.BlockingConnection(connection_params)
1927
self._channel = self._connection.channel()
2028

2129
def __new__(cls):
@@ -24,9 +32,62 @@ def __new__(cls):
2432
cls._instance._initialized = False
2533
return cls._instance
2634

27-
def publish(self, routing_key, body, exchange="swecc-server-exchange"):
28-
self._channel.basic_publish(
29-
exchange=exchange,
30-
routing_key=routing_key,
31-
body=body,
35+
def _reconnect(self):
36+
"""Reconnect to RabbitMQ if connection is lost"""
37+
try:
38+
if self._connection and not self._connection.is_closed:
39+
self._connection.close()
40+
except:
41+
pass # Ignore errors when closing
42+
43+
# Use same connection parameters as __init__
44+
connection_params = pika.ConnectionParameters(
45+
host=self.host,
46+
heartbeat=600, # 10 minutes
47+
blocked_connection_timeout=300, # 5 minutes
48+
retry_delay=2,
49+
socket_timeout=10,
3250
)
51+
52+
self._connection = pika.BlockingConnection(connection_params)
53+
self._channel = self._connection.channel()
54+
55+
def close(self):
56+
"""Properly close the connection"""
57+
try:
58+
if self._channel and not self._channel.is_closed:
59+
self._channel.close()
60+
if self._connection and not self._connection.is_closed:
61+
self._connection.close()
62+
except:
63+
pass # Ignore errors when closing
64+
65+
def publish(self, routing_key, body, exchange="swecc-server-exchange"):
66+
# Check if connection/channel is still open and reconnect if needed
67+
if (
68+
not self._connection
69+
or self._connection.is_closed
70+
or not self._channel
71+
or self._channel.is_closed
72+
):
73+
self._reconnect()
74+
75+
try:
76+
self._channel.basic_publish(
77+
exchange=exchange,
78+
routing_key=routing_key,
79+
body=body,
80+
)
81+
except Exception as e:
82+
# Try to reconnect once on failure
83+
try:
84+
self._reconnect()
85+
self._channel.basic_publish(
86+
exchange=exchange,
87+
routing_key=routing_key,
88+
body=body,
89+
)
90+
except Exception as retry_e:
91+
raise Exception(
92+
f"Failed to publish message after retry: {retry_e}"
93+
) from e

0 commit comments

Comments
 (0)