Skip to content

Commit 8454796

Browse files
authored
Fix RabbitMQ 4 compatibility by using passive exchange declarations (#1855)
The listener service now waits for OpenStack services to create their exchanges instead of creating them itself. This prevents PRECONDITION_FAILED errors when the listener starts before other services and creates exchanges with different properties (e.g., durable=False vs durable=True). Changes: - Add passive exchange declaration to check if exchanges exist without creating them - Wait and retry every 60 seconds until at least one exchange is available - Remove legacy fallback that could create exchanges with wrong properties AI-assisted: Claude Code Signed-off-by: Christian Berendt <[email protected]>
1 parent 182ecd5 commit 8454796

File tree

1 file changed

+78
-16
lines changed

1 file changed

+78
-16
lines changed

osism/services/listener.py

Lines changed: 78 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@
1414
from osism.tasks import netbox
1515
from osism import settings
1616

17+
# Retry interval when exchange doesn't exist yet (in seconds)
18+
EXCHANGE_RETRY_INTERVAL = 60
19+
1720
# Multiple exchanges for different OpenStack services
1821
EXCHANGES_CONFIG = {
1922
"ironic": {
@@ -174,6 +177,7 @@ def __init__(self, connection):
174177
self.osism_api_session: None | requests.Session = None
175178
self.osism_baremetal_api_url: None | str = None
176179
self.websocket_manager = None
180+
self._available_exchanges: dict[str, dict] = {}
177181

178182
if settings.OSISM_API_URL:
179183
logger.info("Setting up OSISM API")
@@ -194,13 +198,81 @@ def __init__(self, connection):
194198

195199
return
196200

201+
def _get_exchange_properties(self, channel, exchange_name: str) -> dict | None:
202+
"""
203+
Check if an exchange exists and retrieve its properties.
204+
Uses passive declaration to check existence without creating the exchange.
205+
206+
Returns exchange properties dict if exists, None otherwise.
207+
"""
208+
try:
209+
# Use exchange_declare with passive=True to check if exchange exists
210+
# This will raise an exception if the exchange doesn't exist
211+
channel.exchange_declare(
212+
exchange=exchange_name,
213+
type="topic",
214+
passive=True,
215+
)
216+
# Exchange exists, get its properties via RabbitMQ management API
217+
# or assume topic type since that's what OpenStack uses
218+
logger.info(f"Exchange '{exchange_name}' exists")
219+
return {"type": "topic", "durable": True}
220+
except Exception as e:
221+
# Exchange doesn't exist
222+
logger.debug(f"Exchange '{exchange_name}' does not exist: {e}")
223+
return None
224+
225+
def _wait_for_exchanges(self):
226+
"""
227+
Wait for at least one configured exchange to become available.
228+
Checks exchanges passively without creating them.
229+
"""
230+
while not self._available_exchanges:
231+
logger.info("Checking for available exchanges...")
232+
with self.connection.channel() as channel:
233+
for service_name, config in EXCHANGES_CONFIG.items():
234+
exchange_name = config["exchange"]
235+
props = self._get_exchange_properties(channel, exchange_name)
236+
if props:
237+
self._available_exchanges[service_name] = {
238+
**config,
239+
"exchange_props": props,
240+
}
241+
logger.info(
242+
f"Exchange '{exchange_name}' for {service_name} is available"
243+
)
244+
245+
if not self._available_exchanges:
246+
logger.warning(
247+
f"No exchanges available yet. Waiting {EXCHANGE_RETRY_INTERVAL} seconds before retry..."
248+
)
249+
time.sleep(EXCHANGE_RETRY_INTERVAL)
250+
251+
logger.info(
252+
f"Found {len(self._available_exchanges)} available exchange(s): "
253+
f"{list(self._available_exchanges.keys())}"
254+
)
255+
197256
def get_consumers(self, consumer, channel):
198257
consumers = []
199258

200-
# Create consumers for all configured exchanges
201-
for service_name, config in EXCHANGES_CONFIG.items():
259+
# Wait for exchanges to be available before creating consumers
260+
self._wait_for_exchanges()
261+
262+
# Create consumers only for available exchanges
263+
for service_name, config in self._available_exchanges.items():
202264
try:
203-
exchange = Exchange(config["exchange"], type="topic", durable=False)
265+
exchange_props = config["exchange_props"]
266+
# Create exchange object matching the existing exchange properties
267+
# Use passive=True to ensure we don't try to create/modify the exchange
268+
exchange = Exchange(
269+
config["exchange"],
270+
type=exchange_props.get("type", "topic"),
271+
durable=exchange_props.get("durable", True),
272+
passive=True,
273+
)
274+
# Create our own queue bound to the existing exchange
275+
# Our queue can have its own properties (non-durable, auto-delete)
204276
queue = Queue(
205277
config["queue"],
206278
exchange,
@@ -217,20 +289,10 @@ def get_consumers(self, consumer, channel):
217289
logger.error(f"Failed to configure consumer for {service_name}: {e}")
218290

219291
if not consumers:
220-
logger.warning(
221-
"No consumers configured, falling back to legacy ironic consumer"
222-
)
223-
# Fallback to legacy configuration
224-
exchange = Exchange(EXCHANGE_NAME, type="topic", durable=False)
225-
queue = Queue(
226-
QUEUE_NAME,
227-
exchange,
228-
routing_key=ROUTING_KEY,
229-
durable=False,
230-
auto_delete=True,
231-
no_ack=True,
292+
logger.error(
293+
"No consumers could be configured. This should not happen after "
294+
"waiting for exchanges."
232295
)
233-
consumers.append(consumer(queue, callbacks=[self.on_message]))
234296

235297
return consumers
236298

0 commit comments

Comments
 (0)