Skip to content

Commit da3334f

Browse files
committed
refactor(messaging): use a long-lived Kafka producer
1 parent a24ff29 commit da3334f

2 files changed

Lines changed: 87 additions & 44 deletions

File tree

cts/messaging.py

Lines changed: 38 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -92,51 +92,61 @@ def _retry_with_backoff(func, max_retries=3, initial_delay=1.0, backoff_multipli
9292
raise last_exception
9393

9494

95+
# Module-level cache for the shared producer; None until first use.
_kafka_producer = None


def _get_kafka_producer():
    """Return the shared Kafka producer, building it on first use.

    The instance is stored in the module-level ``_kafka_producer`` and
    handed back unchanged on later calls, which avoids repeated
    TCP/TLS/SASL handshakes.

    :return: The cached ``KafkaProducer`` instance.
    """
    global _kafka_producer
    if _kafka_producer is not None:
        return _kafka_producer

    # Imported here rather than at module level, so the dependency is
    # only loaded when a producer is actually needed.
    from kafka import KafkaProducer

    settings = {
        "bootstrap_servers": conf.messaging_broker_urls,
        "compression_type": conf.messaging_kafka_compression_type,
        "security_protocol": conf.messaging_kafka_security_protocol,
        "sasl_mechanism": conf.messaging_kafka_sasl_mechanism,
        "sasl_plain_username": conf.messaging_kafka_username,
        "sasl_plain_password": conf.messaging_kafka_password,
        # Messages are passed to send() as dicts and serialized to JSON bytes.
        "value_serializer": lambda v: json.dumps(v).encode("utf-8"),
    }
    _kafka_producer = KafkaProducer(**settings)
    return _kafka_producer
118+
119+
95120
def _kafka_send_msg(msgs):
    """Send messages to Kafka with retry logic.

    Uses a persistent producer that is reused across calls. On failure,
    the producer is closed and discarded, so the next retry attempt
    creates a fresh one.

    :param list[dict] msgs: List of messages to be sent. The topic of each
        message is ``conf.messaging_topic_prefix`` followed by the message's
        ``"event"`` value (``"event"`` when the key is missing).
    :raises Exception: If Kafka operations fail after retries
    """

    def _send():
        """Queue all messages and flush once (retried on failure)."""
        global _kafka_producer
        try:
            producer = _get_kafka_producer()
            for msg in msgs:
                event = msg.get("event", "event")
                topic = "%s%s" % (conf.messaging_topic_prefix, event)
                producer.send(topic, msg)

            # send() only buffers the record. Flush once after the whole
            # batch so the records are actually transmitted before this
            # function returns, instead of sitting in the long-lived
            # producer's buffer where they could be lost at process exit.
            producer.flush()
        except Exception as e:
            log.error("Failed to send messages to Kafka: %s", str(e))
            # Close and discard the broken producer so the next retry
            # creates a fresh connection.
            if _kafka_producer is not None:
                try:
                    _kafka_producer.close()
                except Exception as close_error:
                    # Best effort: report the problem, but let the original
                    # send error propagate to the retry wrapper.
                    log.warning(
                        "Error closing Kafka producer: %s", str(close_error)
                    )
                finally:
                    _kafka_producer = None
            raise

    _retry_with_backoff(_send)
141151

142152

tests/test_events.py

Lines changed: 49 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030

3131
from cts import conf
3232
from cts import app, db
33+
import cts.messaging
3334
from cts.messaging import _retry_with_backoff, _kafka_send_msg, _umb_send_msg, publish
3435
from cts.models import Compose, User, Tag
3536
from utils import ModelsBaseTest
@@ -95,6 +96,14 @@ class TestKafkaSendMessageWhenComposeIsCreated(ModelsBaseTest):
9596

9697
disable_event_handlers = False
9798

99+
def setUp(self):
    """Run the base fixture setup, then clear the cached Kafka producer."""
    super(TestKafkaSendMessageWhenComposeIsCreated, self).setUp()
    # Reset the module-level cache so no producer object carries over
    # from a previous test.
    cts.messaging._kafka_producer = None
102+
103+
def tearDown(self):
    """Run the base fixture teardown, then clear the cached Kafka producer."""
    super(TestKafkaSendMessageWhenComposeIsCreated, self).tearDown()
    # Do not leave a producer object behind in the module-level cache.
    cts.messaging._kafka_producer = None
106+
98107
def setup_composes(self):
99108
User.create_user(username="odcs")
100109
db.session.commit()
@@ -130,28 +139,45 @@ def test_send_message(self, KafkaProducer):
130139
},
131140
)
132141

133-
# Verify flush and close were called
134-
mock_producer.flush.assert_called_once()
135-
mock_producer.close.assert_called_once()
142+
# Producer should not be closed on success (long-lived)
143+
mock_producer.close.assert_not_called()
136144

137145
@patch.object(conf, "messaging_broker_urls", new=["localhost:9092"])
@patch.object(conf, "messaging_kafka_username", new="test_user")
@patch.object(conf, "messaging_kafka_password", new="test_password")
@patch("kafka.KafkaProducer")
def test_kafka_producer_closed_on_exception(self, KafkaProducer):
    """A failing send() closes the producer on every attempt and clears the cache."""
    failing_producer = KafkaProducer.return_value
    failing_producer.send.side_effect = Exception("Send failed")
    payload = [{"event": "test", "data": "test_data"}]

    # time.sleep is patched so the retry backoff does not slow the test.
    with patch("time.sleep"), self.assertRaises(Exception):
        _kafka_send_msg(payload)

    # One initial attempt plus the default three retries gives four
    # attempts, and each failed attempt closes the producer.
    self.assertEqual(failing_producer.close.call_count, 4)
    # Once the retries are exhausted, no producer may remain cached.
    self.assertIsNone(cts.messaging._kafka_producer)
165+
166+
@patch.object(conf, "messaging_broker_urls", new=["localhost:9092"])
@patch.object(conf, "messaging_kafka_username", new="test_user")
@patch.object(conf, "messaging_kafka_password", new="test_password")
@patch("kafka.KafkaProducer")
def test_kafka_producer_reused_across_calls(self, KafkaProducer):
    """Two consecutive sends share one producer instance."""
    shared_producer = KafkaProducer.return_value

    for event in ("compose-created", "compose-tagged"):
        _kafka_send_msg([{"event": event, "compose": {}}])

    # The producer is constructed only once...
    KafkaProducer.assert_called_once()
    # ...while each call sends its single message through it.
    self.assertEqual(shared_producer.send.call_count, 2)
155181

156182

157183
@patch("cts.messaging.publish")
@@ -510,25 +536,32 @@ def producer_side_effect(*args, **kwargs):
510536
class TestKafkaRetries(unittest.TestCase):
    """Test Kafka-specific retry behavior"""

    def setUp(self):
        """Start each test without a cached producer."""
        cts.messaging._kafka_producer = None

    def tearDown(self):
        """Clear the producer cache after each test."""
        cts.messaging._kafka_producer = None

    @patch.object(conf, "messaging_broker_urls", new=["localhost:9092"])
    @patch.object(conf, "messaging_kafka_username", new="test_user")
    @patch.object(conf, "messaging_kafka_password", new="test_password")
    @patch.object(conf, "messaging_topic_prefix", new="cts.")
    def test_kafka_send_msg_retries_on_transient_failure(self):
        """Test that _kafka_send_msg retries on transient failures"""
        flaky_producer = Mock()
        # The first send() raises, the second returns None (success).
        flaky_producer.send.side_effect = [
            ConnectionError("Transient network error"),
            None,
        ]

        # time.sleep is patched so the retry backoff does not slow the test.
        with patch("time.sleep"), patch(
            "kafka.KafkaProducer", return_value=flaky_producer
        ):
            _kafka_send_msg([{"event": "test", "data": "test"}])

        # First send fails, producer is closed and reset, second send succeeds
        self.assertEqual(flaky_producer.send.call_count, 2)

0 commit comments

Comments
 (0)