Commit 7fedb5e

refactor(backend): Un-share resource initializations from AppService + Remove Pyro (#9750)
This is a prerequisite infra change for #9714. We will need a service where we can maintain our own clients (db, redis, rabbitmq, be they async or sync) and configure our own cadence of initialization and cleanup. While refactoring service.py, the option to use Pyro as an RPC protocol was also removed.

### Changes 🏗️

* Decoupled resource initialization and cleanup from the parent AppService logic.
* Removed Pyro.

### Checklist 📋

#### For code changes:

- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  - [x] CI
1 parent d316ed2 commit 7fedb5e
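
Every diff below follows the same shape: instead of flipping shared `use_db`/`use_redis`-style flags on `AppService`, each service now acquires its own clients inside `run_service()` and releases them in `cleanup()`. A self-contained sketch of that shape, where only `AppService`, `service_name`, `run_service()`, `cleanup()`, and `run_and_wait()` mirror names from the diffs and everything else (`FakeDB`, `MetricsService`) is hypothetical:

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


class FakeDB:
    # Hypothetical async client; stands in for backend.data.db in the diffs.
    async def connect(self) -> None: ...

    async def disconnect(self) -> None: ...


class AppService:
    # Stand-in for the real base class; only the method names mirror the diffs.
    @property
    def service_name(self) -> str:
        return type(self).__name__

    def run_and_wait(self, coro):
        # The real helper bridges the service's own event loop; asyncio.run is
        # just the simplest stand-in for a sketch.
        return asyncio.run(coro)

    def run_service(self) -> None: ...

    def cleanup(self) -> None: ...


class MetricsService(AppService):  # hypothetical service
    def __init__(self) -> None:
        super().__init__()
        self.db = FakeDB()

    def run_service(self) -> None:
        # Acquire exactly the resources this service owns, in order.
        logger.info(f"[{self.service_name}] ⏳ Connecting to Database...")
        self.run_and_wait(self.db.connect())
        super().run_service()  # then enter the normal service loop

    def cleanup(self) -> None:
        super().cleanup()
        # Release in reverse order of acquisition.
        logger.info(f"[{self.service_name}] ⏳ Disconnecting Database...")
        self.run_and_wait(self.db.disconnect())
```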

12 files changed (+76 / -349 lines)

autogpt_platform/backend/backend/executor/database.py

Lines changed: 18 additions & 2 deletions
```diff
@@ -1,3 +1,6 @@
+import logging
+
+from backend.data import db, redis
 from backend.data.credit import UsageTransactionMetadata, get_user_credit_model
 from backend.data.execution import (
     GraphExecution,
@@ -44,6 +47,7 @@
 
 config = Config()
 _user_credit_model = get_user_credit_model()
+logger = logging.getLogger(__name__)
 
 
 async def _spend_credits(
@@ -55,10 +59,22 @@ async def _spend_credits(
 class DatabaseManager(AppService):
     def __init__(self):
         super().__init__()
-        self.use_db = True
-        self.use_redis = True
         self.execution_event_bus = RedisExecutionEventBus()
 
+    def run_service(self) -> None:
+        logger.info(f"[{self.service_name}] ⏳ Connecting to Database...")
+        self.run_and_wait(db.connect())
+        logger.info(f"[{self.service_name}] ⏳ Connecting to Redis...")
+        redis.connect()
+        super().run_service()
+
+    def cleanup(self):
+        super().cleanup()
+        logger.info(f"[{self.service_name}] ⏳ Disconnecting Redis...")
+        redis.disconnect()
+        logger.info(f"[{self.service_name}] ⏳ Disconnecting Database...")
+        self.run_and_wait(db.disconnect())
+
     @classmethod
     def get_port(cls) -> int:
         return config.database_api_port
```
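`db.connect()`/`db.disconnect()` are coroutines while `run_service()` and `cleanup()` are synchronous, which is why the diff wraps them in `self.run_and_wait(...)`. The helper's implementation is not part of this commit; one common way to build such a bridge (purely illustrative, not `AppService`'s actual code) is to push the coroutine onto a background event loop and block on the resulting future:

```python
import asyncio
import threading
from typing import Any, Coroutine, TypeVar

T = TypeVar("T")


def run_and_wait(loop: asyncio.AbstractEventLoop, coro: Coroutine[Any, Any, T]) -> T:
    """Block a synchronous caller until `coro` finishes on `loop`'s thread."""
    # Schedule the coroutine on the loop's thread and block on the returned
    # concurrent.futures.Future.
    future = asyncio.run_coroutine_threadsafe(coro, loop)
    return future.result()


# Demo: a background event loop driven from synchronous code.
loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()


async def connect() -> str:
    await asyncio.sleep(0.1)
    return "connected"


print(run_and_wait(loop, connect()))  # prints "connected"
loop.call_soon_threadsafe(loop.stop)
```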

autogpt_platform/backend/backend/executor/manager.py

Lines changed: 11 additions & 7 deletions
```diff
@@ -930,8 +930,6 @@ def _handle_low_balance_notif(
 class ExecutionManager(AppService):
     def __init__(self):
         super().__init__()
-        self.use_redis = True
-        self.use_supabase = True
         self.pool_size = settings.config.num_graph_workers
         self.queue = ExecutionQueue[GraphExecutionEntry]()
         self.active_graph_runs: dict[str, tuple[Future, threading.Event]] = {}
@@ -944,14 +942,17 @@ def run_service(self):
         from backend.integrations.credentials_store import IntegrationCredentialsStore
 
         self.credentials_store = IntegrationCredentialsStore()
+
+        logger.info(f"[{self.service_name}] ⏳ Spawn max-{self.pool_size} workers...")
         self.executor = ProcessPoolExecutor(
             max_workers=self.pool_size,
             initializer=Executor.on_graph_executor_start,
         )
+
+        logger.info(f"[{self.service_name}] ⏳ Connecting to Redis...")
+        redis.connect()
+
         sync_manager = multiprocessing.Manager()
-        logger.info(
-            f"[{self.service_name}] Started with max-{self.pool_size} graph workers"
-        )
         while True:
             graph_exec_data = self.queue.get()
             graph_exec_id = graph_exec_data.graph_exec_id
@@ -968,10 +969,13 @@
         )
 
     def cleanup(self):
-        logger.info(f"[{__class__.__name__}] ⏳ Shutting down graph executor pool...")
+        super().cleanup()
+
+        logger.info(f"[{self.service_name}] ⏳ Shutting down graph executor pool...")
         self.executor.shutdown(cancel_futures=True)
 
-        super().cleanup()
+        logger.info(f"[{self.service_name}] ⏳ Disconnecting Redis...")
+        redis.disconnect()
 
     @property
     def db_client(self) -> "DatabaseManager":
```

autogpt_platform/backend/backend/executor/scheduler.py

Lines changed: 6 additions & 0 deletions
```diff
@@ -206,6 +206,12 @@ def run_service(self):
         self.scheduler.add_listener(job_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
         self.scheduler.start()
 
+    def cleanup(self):
+        super().cleanup()
+        logger.info(f"[{self.service_name}] ⏳ Shutting down scheduler...")
+        if self.scheduler:
+            self.scheduler.shutdown(wait=False)
+
     @expose
     def add_execution_schedule(
         self,
```

autogpt_platform/backend/backend/notifications/notifications.py

Lines changed: 22 additions & 1 deletion
```diff
@@ -9,6 +9,7 @@
 from prisma.enums import NotificationType
 from pydantic import BaseModel
 
+from backend.data import rabbitmq
 from backend.data.notifications import (
     BaseSummaryData,
     BaseSummaryParams,
@@ -128,6 +129,20 @@ def __init__(self):
         self.running = True
         self.email_sender = EmailSender()
 
+    @property
+    def rabbit(self) -> rabbitmq.AsyncRabbitMQ:
+        """Access the RabbitMQ service. Will raise if not configured."""
+        if not self.rabbitmq_service:
+            raise RuntimeError("RabbitMQ not configured for this service")
+        return self.rabbitmq_service
+
+    @property
+    def rabbit_config(self) -> rabbitmq.RabbitMQConfig:
+        """Access the RabbitMQ config. Will raise if not configured."""
+        if not self.rabbitmq_config:
+            raise RuntimeError("RabbitMQ not configured for this service")
+        return self.rabbitmq_config
+
     @classmethod
     def get_port(cls) -> int:
         return settings.config.notification_service_port
@@ -688,10 +703,14 @@ def _run_queue(
         )
 
     def run_service(self):
+        logger.info(f"[{self.service_name}] ⏳ Configuring RabbitMQ...")
+        self.rabbitmq_service = rabbitmq.AsyncRabbitMQ(self.rabbitmq_config)
+        self.run_and_wait(self.rabbitmq_service.connect())
+
         logger.info(f"[{self.service_name}] Started notification service")
 
         # Set up scheduler for batch processing of all notification types
-        # this can be changed later to spawn differnt cleanups on different schedules
+        # this can be changed later to spawn different cleanups on different schedules
         try:
             get_scheduler().add_batched_notification_schedule(
                 notification_types=list(NotificationType),
@@ -753,3 +772,5 @@ def cleanup(self):
         """Cleanup service resources"""
         self.running = False
         super().cleanup()
+        logger.info(f"[{self.service_name}] ⏳ Disconnecting RabbitMQ...")
+        self.run_and_wait(self.rabbitmq_service.disconnect())
```
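The new `rabbit`/`rabbit_config` properties turn a missing RabbitMQ setup into an immediate `RuntimeError` instead of a later `None`-attribute failure, since `rabbitmq_service` is only populated once `run_service()` has connected. A minimal, hypothetical illustration of that guard (not code from the diff):

```python
class QueueBackedService:  # hypothetical class mirroring the guard pattern above
    def __init__(self) -> None:
        self.rabbitmq_service = None  # only populated once run_service() connects

    @property
    def rabbit(self):
        """Access the RabbitMQ service. Will raise if not configured."""
        if not self.rabbitmq_service:
            raise RuntimeError("RabbitMQ not configured for this service")
        return self.rabbitmq_service


service = QueueBackedService()
try:
    service.rabbit  # touched before run_service(): fails with a clear message
except RuntimeError as err:
    print(err)  # -> RabbitMQ not configured for this service
```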

autogpt_platform/backend/backend/server/rest_api.py

Lines changed: 4 additions & 0 deletions
```diff
@@ -145,6 +145,10 @@ def run(self):
             log_config=generate_uvicorn_config(),
         )
 
+    def cleanup(self):
+        super().cleanup()
+        logger.info(f"[{self.service_name}] ⏳ Shutting down Agent Server...")
+
     @staticmethod
     async def test_execute_graph(
         graph_id: str,
```

autogpt_platform/backend/backend/server/ws_api.py

Lines changed: 4 additions & 4 deletions
```diff
@@ -10,7 +10,6 @@
 from fastapi import Depends, FastAPI, WebSocket, WebSocketDisconnect
 from starlette.middleware.cors import CORSMiddleware
 
-from backend.data import redis
 from backend.data.execution import AsyncRedisExecutionEventBus
 from backend.data.user import DEFAULT_USER_ID
 from backend.server.conn_manager import ConnectionManager
@@ -56,15 +55,12 @@ def get_db_client():
 
 async def event_broadcaster(manager: ConnectionManager):
     try:
-        redis.connect()
         event_queue = AsyncRedisExecutionEventBus()
         async for event in event_queue.listen("*"):
             await manager.send_execution_update(event)
     except Exception as e:
         logger.exception(f"Event broadcaster error: {e}")
         raise
-    finally:
-        redis.disconnect()
 
 
 async def authenticate_websocket(websocket: WebSocket) -> str:
@@ -294,3 +290,7 @@ def run(self):
             port=Config().websocket_server_port,
             log_config=generate_uvicorn_config(),
         )
+
+    def cleanup(self):
+        super().cleanup()
+        logger.info(f"[{self.service_name}] ⏳ Shutting down WebSocket Server...")
```

autogpt_platform/backend/backend/util/process.py

Lines changed: 1 addition & 0 deletions
```diff
@@ -48,6 +48,7 @@ def run(self):
     def service_name(cls) -> str:
         return cls.__name__
 
+    @abstractmethod
     def cleanup(self):
         """
         Implement this method on a subclass to do post-execution cleanup,
```
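
With `cleanup()` now marked `@abstractmethod`, every process subclass has to spell out its own teardown (even if it is only a log line, as in `rest_api.py` and `ws_api.py` above), and a subclass that forgets can no longer be instantiated. A small sketch of the effect, using a hypothetical `BaseProcess` stand-in for the real base class:

```python
from abc import ABC, abstractmethod


class BaseProcess(ABC):  # stand-in for the real process base class
    @abstractmethod
    def cleanup(self) -> None:
        """Post-execution cleanup; every subclass must implement this."""


class GoodService(BaseProcess):
    def cleanup(self) -> None:
        print("releasing resources")


class BadService(BaseProcess):
    pass  # no cleanup() override


GoodService().cleanup()  # fine
try:
    BadService()          # TypeError: can't instantiate abstract class
except TypeError as err:
    print(err)
```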
