11# SPDX-License-Identifier: Apache-2.0
22
33import os
4+ import threading
45import time
56from collections .abc import Callable
67from typing import Any
1718# Retry interval when exchange doesn't exist yet (in seconds)
1819EXCHANGE_RETRY_INTERVAL = 60
1920
21+ # Interval for checking for new exchanges after initial connection (in seconds)
22+ EXCHANGE_DISCOVERY_INTERVAL = 60
23+
2024# Multiple exchanges for different OpenStack services
2125EXCHANGES_CONFIG = {
2226 "ironic" : {
@@ -178,6 +182,9 @@ def __init__(self, connection):
178182 self .osism_baremetal_api_url : None | str = None
179183 self .websocket_manager = None
180184 self ._available_exchanges : dict [str , dict ] = {}
185+ self ._discovery_thread : threading .Thread | None = None
186+ self ._stop_discovery = threading .Event ()
187+ self ._new_exchanges_found = threading .Event ()
181188
182189 if settings .OSISM_API_URL :
183190 logger .info ("Setting up OSISM API" )
@@ -222,15 +229,18 @@ def _get_exchange_properties(self, channel, exchange_name: str) -> dict | None:
222229 logger .debug (f"Exchange '{ exchange_name } ' does not exist: { e } " )
223230 return None
224231
225- def _wait_for_exchanges (self ):
232+ def _check_for_new_exchanges (self ):
226233 """
227- Wait for at least one configured exchange to become available .
228- Checks exchanges passively without creating them .
234+ Check for newly available exchanges that aren't yet being consumed .
235+ Returns True if new exchanges were found .
229236 """
230- while not self . _available_exchanges :
231- logger . info ( "Checking for available exchanges..." )
237+ new_found = False
238+ try :
232239 with self .connection .channel () as channel :
233240 for service_name , config in EXCHANGES_CONFIG .items ():
241+ if service_name in self ._available_exchanges :
242+ # Already consuming this exchange
243+ continue
234244 exchange_name = config ["exchange" ]
235245 props = self ._get_exchange_properties (channel , exchange_name )
236246 if props :
@@ -239,8 +249,73 @@ def _wait_for_exchanges(self):
239249 "exchange_props" : props ,
240250 }
241251 logger .info (
242- f"Exchange '{ exchange_name } ' for { service_name } is available"
252+ f"New exchange '{ exchange_name } ' for { service_name } is now available"
243253 )
254+ new_found = True
255+ except Exception as e :
256+ logger .warning (f"Error checking for new exchanges: { e } " )
257+ return new_found
258+
259+ def _exchange_discovery_loop (self ):
260+ """
261+ Background thread that periodically checks for new exchanges.
262+ When new exchanges are found, signals the main consumer to restart.
263+ """
264+ logger .info ("Starting exchange discovery thread" )
265+ while not self ._stop_discovery .is_set ():
266+ # Wait for the discovery interval
267+ if self ._stop_discovery .wait (timeout = EXCHANGE_DISCOVERY_INTERVAL ):
268+ # Stop was requested
269+ break
270+
271+ # Check if all exchanges are already available
272+ if len (self ._available_exchanges ) >= len (EXCHANGES_CONFIG ):
273+ logger .info (
274+ "All configured exchanges are now available. "
275+ "Stopping exchange discovery."
276+ )
277+ break
278+
279+ logger .debug ("Checking for new exchanges..." )
280+ if self ._check_for_new_exchanges ():
281+ logger .info (
282+ "New exchanges found. Signaling consumer restart to add new consumers."
283+ )
284+ self ._new_exchanges_found .set ()
285+ # Signal the consumer to stop so it can restart with new exchanges
286+ self .should_stop = True
287+
288+ logger .info ("Exchange discovery thread stopped" )
289+
290+ def _start_exchange_discovery (self ):
291+ """Start the background exchange discovery thread."""
292+ if len (self ._available_exchanges ) >= len (EXCHANGES_CONFIG ):
293+ # All exchanges already available, no need for discovery
294+ logger .info ("All exchanges available, skipping discovery thread" )
295+ return
296+
297+ self ._stop_discovery .clear ()
298+ self ._discovery_thread = threading .Thread (
299+ target = self ._exchange_discovery_loop ,
300+ name = "exchange-discovery" ,
301+ daemon = True ,
302+ )
303+ self ._discovery_thread .start ()
304+
305+ def _stop_exchange_discovery (self ):
306+ """Stop the background exchange discovery thread."""
307+ self ._stop_discovery .set ()
308+ if self ._discovery_thread and self ._discovery_thread .is_alive ():
309+ self ._discovery_thread .join (timeout = 5 )
310+
311+ def _wait_for_exchanges (self ):
312+ """
313+ Wait for at least one configured exchange to become available.
314+ Checks exchanges passively without creating them.
315+ """
316+ while not self ._available_exchanges :
317+ logger .info ("Checking for available exchanges..." )
318+ self ._check_for_new_exchanges ()
244319
245320 if not self ._available_exchanges :
246321 logger .warning (
@@ -259,6 +334,9 @@ def get_consumers(self, consumer, channel):
259334 # Wait for exchanges to be available before creating consumers
260335 self ._wait_for_exchanges ()
261336
337+ # Start background discovery for remaining exchanges
338+ self ._start_exchange_discovery ()
339+
262340 # Create consumers only for available exchanges
263341 for service_name , config in self ._available_exchanges .items ():
264342 try :
@@ -406,11 +484,31 @@ def on_message(self, body, message):
406484
407485
408486def main ():
487+ # Track available exchanges across restarts
488+ available_exchanges : dict [str , dict ] = {}
489+
409490 while True :
410491 try :
411492 with Connection (BROKER_URI , connect_timeout = 30.0 ) as connection :
412493 connection .connect ()
413- NotificationsDump (connection ).run ()
494+ consumer = NotificationsDump (connection )
495+ # Restore previously discovered exchanges
496+ consumer ._available_exchanges = available_exchanges
497+ consumer .run ()
498+ # Save discovered exchanges for next iteration
499+ available_exchanges = consumer ._available_exchanges
500+ # Stop discovery thread if running
501+ consumer ._stop_exchange_discovery ()
502+
503+ # Check if we stopped due to new exchanges being found
504+ if consumer ._new_exchanges_found .is_set ():
505+ logger .info (
506+ "Restarting consumer to add new exchange consumers. "
507+ f"Total exchanges: { len (available_exchanges )} "
508+ )
509+ consumer ._new_exchanges_found .clear ()
510+ continue
511+
414512 except ConnectionRefusedError :
415513 logger .error ("Connection with broker refused. Retry in 60 seconds." )
416514 time .sleep (60 )
0 commit comments