Skip to content

Commit 82cf0bc

Browse files
authored
refactor(backend): Clear out Notification Service code blockage (#9915)
Some of the code paths in the notification & scheduler service were synchronous HTTP calls that execute a long-running job that blocks. This makes the service threads busy waiting. ### Changes 🏗️ * Remove queue_notification API * Remove DTO * Move heavy tasks intothe executor <!-- Concisely describe all of the changes made in this pull request: --> ### Checklist 📋 #### For code changes: - [x] I have clearly listed my changes in the PR description - [x] I have made a test plan - [x] I have tested my changes according to the test plan: <!-- Put your test plan here: --> - [x] Manually executing notification service jobs through the scheduler API
1 parent 089e7aa commit 82cf0bc

File tree

4 files changed

+160
-172
lines changed

4 files changed

+160
-172
lines changed

autogpt_platform/backend/backend/data/credit.py

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
from typing import Any, cast
66

77
import stripe
8-
from autogpt_libs.utils.cache import thread_cached
98
from prisma import Json
109
from prisma.enums import (
1110
CreditRefundRequestStatus,
@@ -32,14 +31,13 @@
3231
TransactionHistory,
3332
UserTransaction,
3433
)
35-
from backend.data.notifications import NotificationEventDTO, RefundRequestData
34+
from backend.data.notifications import NotificationEventModel, RefundRequestData
3635
from backend.data.user import get_user_by_id, get_user_email_by_id
37-
from backend.notifications import NotificationManagerClient
36+
from backend.notifications.notifications import queue_notification_async
3837
from backend.server.model import Pagination
3938
from backend.server.v2.admin.model import UserHistoryResponse
4039
from backend.util.exceptions import InsufficientBalanceError
4140
from backend.util.retry import func_retry
42-
from backend.util.service import get_service_client
4341
from backend.util.settings import Settings
4442

4543
settings = Settings()
@@ -374,20 +372,17 @@ async def _add_transaction(
374372

375373

376374
class UserCredit(UserCreditBase):
377-
@thread_cached
378-
def notification_client(self) -> NotificationManagerClient:
379-
return get_service_client(NotificationManagerClient)
380375

381376
async def _send_refund_notification(
382377
self,
383378
notification_request: RefundRequestData,
384379
notification_type: NotificationType,
385380
):
386-
await self.notification_client().queue_notification_async(
387-
NotificationEventDTO(
381+
await queue_notification_async(
382+
NotificationEventModel(
388383
user_id=notification_request.user_id,
389384
type=notification_type,
390-
data=notification_request.model_dump(),
385+
data=notification_request,
391386
)
392387
)
393388

autogpt_platform/backend/backend/data/notifications.py

Lines changed: 3 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -189,26 +189,14 @@ class RefundRequestData(BaseNotificationData):
189189
]
190190

191191

192-
class NotificationEventDTO(BaseModel):
193-
user_id: str
192+
class BaseEventModel(BaseModel):
194193
type: NotificationType
195-
data: dict
196-
created_at: datetime = Field(default_factory=lambda: datetime.now(tz=timezone.utc))
197-
retry_count: int = 0
198-
199-
200-
class SummaryParamsEventDTO(BaseModel):
201194
user_id: str
202-
type: NotificationType
203-
data: dict
204195
created_at: datetime = Field(default_factory=lambda: datetime.now(tz=timezone.utc))
205196

206197

207-
class NotificationEventModel(BaseModel, Generic[NotificationDataType_co]):
208-
user_id: str
209-
type: NotificationType
198+
class NotificationEventModel(BaseEventModel, Generic[NotificationDataType_co]):
210199
data: NotificationDataType_co
211-
created_at: datetime = Field(default_factory=lambda: datetime.now(tz=timezone.utc))
212200

213201
@property
214202
def strategy(self) -> QueueType:
@@ -225,11 +213,8 @@ def template(self) -> str:
225213
return NotificationTypeOverride(self.type).template
226214

227215

228-
class SummaryParamsEventModel(BaseModel, Generic[SummaryParamsType_co]):
229-
user_id: str
230-
type: NotificationType
216+
class SummaryParamsEventModel(BaseEventModel, Generic[SummaryParamsType_co]):
231217
data: SummaryParamsType_co
232-
created_at: datetime = Field(default_factory=lambda: datetime.now(tz=timezone.utc))
233218

234219

235220
def get_notif_data_type(

autogpt_platform/backend/backend/executor/manager.py

Lines changed: 18 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,16 @@
2323
from backend.data.notifications import (
2424
AgentRunData,
2525
LowBalanceData,
26-
NotificationEventDTO,
26+
NotificationEventModel,
2727
NotificationType,
2828
)
2929
from backend.data.rabbitmq import SyncRabbitMQ
3030
from backend.executor.utils import create_execution_queue_config
31+
from backend.notifications.notifications import queue_notification
3132
from backend.util.exceptions import InsufficientBalanceError
3233

3334
if TYPE_CHECKING:
3435
from backend.executor import DatabaseManagerClient
35-
from backend.notifications.notifications import NotificationManagerClient
3636

3737
from autogpt_libs.utils.cache import thread_cached
3838
from prometheus_client import Gauge, start_http_server
@@ -580,7 +580,6 @@ def on_graph_executor_start(cls):
580580
cls.db_client = get_db_client()
581581
cls.pool_size = settings.config.num_node_workers
582582
cls.pid = os.getpid()
583-
cls.notification_service = get_notification_service()
584583
cls._init_node_executor_pool()
585584
logger.info(f"GraphExec {cls.pid} started with {cls.pool_size} node workers")
586585

@@ -905,21 +904,21 @@ def _handle_agent_run_notif(
905904
for output in outputs
906905
]
907906

908-
event = NotificationEventDTO(
909-
user_id=graph_exec.user_id,
910-
type=NotificationType.AGENT_RUN,
911-
data=AgentRunData(
912-
outputs=named_outputs,
913-
agent_name=metadata.name if metadata else "Unknown Agent",
914-
credits_used=exec_stats.cost,
915-
execution_time=exec_stats.walltime,
916-
graph_id=graph_exec.graph_id,
917-
node_count=exec_stats.node_count,
918-
).model_dump(),
907+
queue_notification(
908+
NotificationEventModel(
909+
user_id=graph_exec.user_id,
910+
type=NotificationType.AGENT_RUN,
911+
data=AgentRunData(
912+
outputs=named_outputs,
913+
agent_name=metadata.name if metadata else "Unknown Agent",
914+
credits_used=exec_stats.cost,
915+
execution_time=exec_stats.walltime,
916+
graph_id=graph_exec.graph_id,
917+
node_count=exec_stats.node_count,
918+
),
919+
)
919920
)
920921

921-
cls.notification_service.queue_notification(event)
922-
923922
@classmethod
924923
def _handle_low_balance_notif(
925924
cls,
@@ -933,16 +932,16 @@ def _handle_low_balance_notif(
933932
base_url = (
934933
settings.config.frontend_base_url or settings.config.platform_base_url
935934
)
936-
cls.notification_service.queue_notification(
937-
NotificationEventDTO(
935+
queue_notification(
936+
NotificationEventModel(
938937
user_id=user_id,
939938
type=NotificationType.LOW_BALANCE,
940939
data=LowBalanceData(
941940
current_balance=exec_stats.cost,
942941
billing_page_link=f"{base_url}/profile/credits",
943942
shortfall=shortfall,
944943
agent_name=metadata.name if metadata else "Unknown Agent",
945-
).model_dump(),
944+
),
946945
)
947946
)
948947

@@ -1139,14 +1138,6 @@ def get_db_client() -> "DatabaseManagerClient":
11391138
return get_service_client(DatabaseManagerClient, health_check=False)
11401139

11411140

1142-
@thread_cached
1143-
def get_notification_service() -> "NotificationManagerClient":
1144-
from backend.notifications import NotificationManagerClient
1145-
1146-
# Disable health check for the service client to avoid breaking process initializer.
1147-
return get_service_client(NotificationManagerClient, health_check=False)
1148-
1149-
11501141
def send_execution_update(entry: GraphExecution | NodeExecutionResult | None):
11511142
if entry is None:
11521143
return

0 commit comments

Comments
 (0)