|
15 | 15 |
|
16 | 16 | app = Flask(__name__) |
17 | 17 |
|
18 | | -BACKOFF_BASE = float(os.getenv('BACKOFF_BASE', '1.0')) # seconds |
19 | | -BACKOFF_MAX = float(os.getenv('BACKOFF_MAX', '30.0')) # seconds |
20 | 18 | PREFETCH_COUNT = int(os.getenv('RABBITMQ_PREFETCH_COUNT', '1')) # messages |
21 | 19 | MAX_JOB_RETRIES = int(os.getenv('MAX_JOB_RETRIES', '10')) # messages |
22 | 20 |
|
23 | | -stop_event = threading.Event() |
24 | 21 | worker_thread: threading.Thread | None = None |
25 | 22 | worker_running = threading.Event() |
26 | 23 |
|
@@ -138,45 +135,38 @@ def process_job(self, channel, method, header, body): |
138 | 135 | logging.info(f"Job {job_id} failed after {MAX_JOB_RETRIES} retries. Discarding job.") |
139 | 136 |
|
140 | 137 | def listen_for_jobs(self): |
141 | | - backoff = BACKOFF_BASE |
142 | | - while not stop_event.is_set(): |
143 | | - logging.info("Worker connecting to RabbitMQ...") |
144 | | - if not self.is_connected(): |
145 | | - logging.error("RabbitMQ is offline") |
146 | | - return |
147 | | - |
148 | | - logging.info("Worker connected to RabbitMQ") |
149 | | - self.consumer = self.channel.basic_consume( |
150 | | - queue=self.rabbitmq_queue, |
151 | | - on_message_callback=self.process_job, |
152 | | - auto_ack=False |
153 | | - ) |
154 | | - |
155 | | - # start listening |
156 | | - logging.info("Waiting for messages. To exit press CTRL+C") |
157 | | - worker_running.set() # mark healthy |
158 | | - |
159 | | - try: |
160 | | - # pylint: disable=protected-access |
161 | | - while self.channel and self.channel.is_open and self.channel._consumer_infos: |
162 | | - self.channel.connection.process_data_events(time_limit=1) # 1 second |
163 | | - except SystemExit: |
164 | | - raise |
165 | | - except KeyboardInterrupt: |
166 | | - raise |
167 | | - except GeneratorExit: |
168 | | - raise |
169 | | - except Exception: # pylint: disable=broad-except |
170 | | - logging.error("Worker crashed/disconnected:\n%s", traceback.format_exc()) |
171 | | - finally: |
172 | | - logging.info("Stopped listening for messages.") |
173 | | - self.close() |
174 | | - self.connection = None |
175 | | - |
176 | | - # backoff with cap |
177 | | - if stop_event.wait(backoff): |
178 | | - break |
179 | | - backoff = min(backoff * 2, BACKOFF_MAX) |
| 138 | + logging.info("Worker connecting to RabbitMQ...") |
| 139 | + if not self.is_connected(): |
| 140 | + logging.error("RabbitMQ is offline") |
| 141 | + return |
| 142 | + |
| 143 | + logging.info("Worker connected to RabbitMQ") |
| 144 | + self.consumer = self.channel.basic_consume( |
| 145 | + queue=self.rabbitmq_queue, |
| 146 | + on_message_callback=self.process_job, |
| 147 | + auto_ack=False |
| 148 | + ) |
| 149 | + |
| 150 | + # start listening |
| 151 | + logging.info("Waiting for messages. To exit press CTRL+C") |
| 152 | + worker_running.set() # mark healthy |
| 153 | + |
| 154 | + try: |
| 155 | + # pylint: disable=protected-access |
| 156 | + while self.channel and self.channel.is_open and self.channel._consumer_infos: |
| 157 | + self.channel.connection.process_data_events(time_limit=1) # 1 second |
| 158 | + except SystemExit: |
| 159 | + raise |
| 160 | + except KeyboardInterrupt: |
| 161 | + raise |
| 162 | + except GeneratorExit: |
| 163 | + raise |
| 164 | + except Exception: # pylint: disable=broad-except |
| 165 | + logging.error("Worker crashed/disconnected:\n%s", traceback.format_exc()) |
| 166 | + finally: |
| 167 | + logging.info("Stopped listening for messages.") |
| 168 | + self.close() |
| 169 | + self.connection = None |
180 | 170 |
|
181 | 171 | # final cleanup |
182 | 172 | worker_running.clear() |
@@ -208,6 +198,5 @@ def return_health(): |
208 | 198 | app.run(host='0.0.0.0', port=8001, threaded=True) |
209 | 199 | finally: |
210 | 200 | # Graceful shutdown |
211 | | - stop_event.set() |
212 | 201 | if worker_thread: |
213 | 202 | worker_thread.join(timeout=10) |
0 commit comments