
Commit 793d056

fix(backend/executor): Make executor continuously running and retrying message consumption (#9999)
The executor can sometimes become dangling: it stops processing messages, but the process is not fully killed. This PR avoids that scenario by simply retrying message consumption continuously.

### Changes 🏗️

Introduced a `continuous_retry` decorator and applied it to executor message consumption.

### Checklist 📋

#### For code changes:

- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  - [x] Run executor service and execute some agents.
1 parent 5e7b66d commit 793d056
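
At its core, the change is a retry-forever pattern: the message-consuming methods are wrapped in a decorator that re-runs them whenever they raise, and they deliberately raise if consuming ever stops. Below is a minimal, runnable sketch of that pattern; it mirrors the `continuous_retry` decorator added in `backend/util/retry.py` (shown in the diff further down), while `flaky_consumer` is a hypothetical stand-in used only to demonstrate the behavior.

import logging
import time
from functools import wraps

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def continuous_retry(*, retry_delay: float = 1.0):
    """Re-run the wrapped function forever, sleeping between failures."""

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            while True:
                try:
                    return func(*args, **kwargs)
                except Exception as exc:
                    logger.exception(
                        "%s failed with %s, retrying in %.2f s",
                        func.__name__, exc, retry_delay,
                    )
                    time.sleep(retry_delay)

        return wrapper

    return decorator


@continuous_retry(retry_delay=0.5)
def flaky_consumer(state: dict) -> str:
    # Hypothetical stand-in for a message consumer: fails twice
    # (e.g. the broker connection drops), then succeeds.
    state["attempts"] = state.get("attempts", 0) + 1
    if state["attempts"] < 3:
        raise ConnectionError("broker connection dropped")
    return "consuming"


if __name__ == "__main__":
    print(flaky_consumer({}))  # logs two failures, then prints "consuming"

Because the wrapper only exits on a successful return, an exception from the consumer triggers a reconnect-and-retry cycle instead of leaving the process dangling.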

File tree: 2 files changed (+43, -19 lines)


autogpt_platform/backend/backend/executor/manager.py (21 additions, 19 deletions)

@@ -67,7 +67,7 @@
 from backend.util.file import clean_exec_files
 from backend.util.logging import TruncatedLogger, configure_logging
 from backend.util.process import AppProcess, set_service_name
-from backend.util.retry import func_retry
+from backend.util.retry import continuous_retry, func_retry
 from backend.util.service import get_service_client
 from backend.util.settings import Settings

@@ -938,8 +938,6 @@ def __init__(self):
         self.pool_size = settings.config.num_graph_workers
         self.running = True
         self.active_graph_runs: dict[str, tuple[Future, threading.Event]] = {}
-        signal.signal(signal.SIGTERM, lambda sig, frame: self._on_sigterm())
-        signal.signal(signal.SIGINT, lambda sig, frame: self._on_sigterm())

     def run(self):
         pool_size_gauge.set(self.pool_size)

@@ -965,22 +963,29 @@ def _run(self):
         logger.info(f"[{self.service_name}] ⏳ Connecting to Redis...")
         redis.connect()

+        threading.Thread(
+            target=lambda: self._consume_execution_cancel(),
+            daemon=True,
+        ).start()
+
+        self._consume_execution_run()
+
+    @continuous_retry()
+    def _consume_execution_cancel(self):
         cancel_client = SyncRabbitMQ(create_execution_queue_config())
         cancel_client.connect()
         cancel_channel = cancel_client.get_channel()
         logger.info(f"[{self.service_name}] ⏳ Starting cancel message consumer...")
-        threading.Thread(
-            target=lambda: (
-                cancel_channel.basic_consume(
-                    queue=GRAPH_EXECUTION_CANCEL_QUEUE_NAME,
-                    on_message_callback=self._handle_cancel_message,
-                    auto_ack=True,
-                ),
-                cancel_channel.start_consuming(),
-            ),
-            daemon=True,
-        ).start()
+        cancel_channel.basic_consume(
+            queue=GRAPH_EXECUTION_CANCEL_QUEUE_NAME,
+            on_message_callback=self._handle_cancel_message,
+            auto_ack=True,
+        )
+        cancel_channel.start_consuming()
+        raise RuntimeError(f"❌ cancel message consumer is stopped: {cancel_channel}")

+    @continuous_retry()
+    def _consume_execution_run(self):
         run_client = SyncRabbitMQ(create_execution_queue_config())
         run_client.connect()
         run_channel = run_client.get_channel()

@@ -992,6 +997,7 @@ def _run(self):
         )
         logger.info(f"[{self.service_name}] ⏳ Starting to consume run messages...")
         run_channel.start_consuming()
+        raise RuntimeError(f"❌ run message consumer is stopped: {run_channel}")

     def _handle_cancel_message(
         self,

@@ -1090,10 +1096,6 @@ def cleanup(self):
         super().cleanup()
         self._on_cleanup()

-    def _on_sigterm(self):
-        llprint(f"[{self.service_name}] ⚠️ GraphExec SIGTERM received")
-        self._on_cleanup(log=llprint)
-
     def _on_cleanup(self, log=logger.info):
         prefix = f"[{self.service_name}][on_graph_executor_stop {os.getpid()}]"
         log(f"{prefix} ⏳ Shutting down service loop...")

@@ -1110,7 +1112,7 @@ def _on_cleanup(self, log=logger.info):
         redis.disconnect()

         log(f"{prefix} ✅ Finished GraphExec cleanup")
-        exit(0)
+        sys.exit(0)


 # ------- UTILITIES ------- #

autogpt_platform/backend/backend/util/retry.py (22 additions, 0 deletions)

@@ -2,6 +2,7 @@
 import logging
 import os
 import threading
+import time
 from functools import wraps
 from uuid import uuid4

@@ -80,3 +81,24 @@ async def async_wrapper(*args, **kwargs):
     stop=stop_after_attempt(5),
     wait=wait_exponential(multiplier=1, min=1, max=30),
 )
+
+
+def continuous_retry(*, retry_delay: float = 1.0):
+    def decorator(func):
+        @wraps(func)
+        def wrapper(*args, **kwargs):
+            while True:
+                try:
+                    return func(*args, **kwargs)
+                except Exception as exc:
+                    logger.exception(
+                        "%s failed with %s — retrying in %.2f s",
+                        func.__name__,
+                        exc,
+                        retry_delay,
+                    )
+                    time.sleep(retry_delay)
+
+        return wrapper
+
+    return decorator
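
One design detail worth noting: `continuous_retry` only re-invokes the wrapped function when it raises, while a clean return ends the `while True` loop. That is presumably why the consumer methods in `manager.py` now raise `RuntimeError` after `start_consuming()` returns, turning a silently stopped consumer into a failure the decorator will restart. A hedged usage sketch follows; only the `continuous_retry` import reflects the actual module, while `consume_notifications` and its `poll_once` argument are hypothetical illustrations.

from backend.util.retry import continuous_retry


@continuous_retry(retry_delay=5.0)
def consume_notifications(poll_once) -> None:
    """Hypothetical long-running worker kept alive by continuous_retry.

    `poll_once` is a stand-in callable that blocks while the consumer is
    healthy and returns (or raises) once the underlying connection is lost.
    """
    poll_once()
    # Returning normally would end the retry loop, so mirror the executor's
    # pattern: raise to force a reconnect-and-resume cycle.
    raise RuntimeError("notification consumer stopped unexpectedly")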
