Skip to content

Commit 54336fe

Browse files
author
Richard Kuo (Onyx)
committed
Merge branch 'main' of https://github.com/onyx-dot-app/onyx into bugfix/dependency-updates
2 parents bd79f68 + 9b6c762 commit 54336fe

File tree

7 files changed

+112
-15
lines changed

7 files changed

+112
-15
lines changed

Diff for: backend/ee/onyx/main.py

+19-5
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
from collections.abc import AsyncGenerator
2+
from contextlib import asynccontextmanager
3+
14
from fastapi import FastAPI
25
from httpx_oauth.clients.google import GoogleOAuth2
36
from httpx_oauth.clients.openid import BASE_SCOPES
@@ -44,21 +47,36 @@
4447
from onyx.main import get_application as get_application_base
4548
from onyx.main import include_auth_router_with_prefix
4649
from onyx.main import include_router_with_global_prefix_prepended
50+
from onyx.main import lifespan as lifespan_base
4751
from onyx.utils.logger import setup_logger
4852
from onyx.utils.variable_functionality import global_version
4953
from shared_configs.configs import MULTI_TENANT
5054

5155
logger = setup_logger()
5256

5357

58+
@asynccontextmanager
59+
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
60+
"""Small wrapper around the lifespan of the MIT application.
61+
Basically just calls the base lifespan, and then adds EE-only
62+
steps after."""
63+
64+
async with lifespan_base(app):
65+
# seed the Onyx environment with LLMs, Assistants, etc. based on an optional
66+
# environment variable. Used to automate deployment for multiple environments.
67+
seed_db()
68+
69+
yield
70+
71+
5472
def get_application() -> FastAPI:
5573
# Anything that happens at import time is not guaranteed to be running ee-version
5674
# Anything after the server startup will be running ee version
5775
global_version.set_ee()
5876

5977
test_encryption()
6078

61-
application = get_application_base()
79+
application = get_application_base(lifespan_override=lifespan)
6280

6381
if MULTI_TENANT:
6482
add_tenant_id_middleware(application, logger)
@@ -166,10 +184,6 @@ def get_application() -> FastAPI:
166184
# Ensure all routes have auth enabled or are explicitly marked as public
167185
check_ee_router_auth(application)
168186

169-
# seed the Onyx environment with LLMs, Assistants, etc. based on an optional
170-
# environment variable. Used to automate deployment for multiple environments.
171-
seed_db()
172-
173187
# for debugging discovered routes
174188
# for route in application.router.routes:
175189
# print(f"Path: {route.path}, Methods: {route.methods}")

Diff for: backend/onyx/background/celery/tasks/beat_schedule.py

+11-1
Original file line numberDiff line numberDiff line change
@@ -179,14 +179,24 @@ def make_cloud_generator_task(task: dict[str, Any]) -> dict[str, Any]:
179179
},
180180
{
181181
"name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check-available-tenants",
182-
"task": OnyxCeleryTask.CHECK_AVAILABLE_TENANTS,
182+
"task": OnyxCeleryTask.CLOUD_CHECK_AVAILABLE_TENANTS,
183183
"schedule": timedelta(minutes=10),
184184
"options": {
185185
"queue": OnyxCeleryQueues.MONITORING,
186186
"priority": OnyxCeleryPriority.HIGH,
187187
"expires": BEAT_EXPIRES_DEFAULT,
188188
},
189189
},
190+
{
191+
"name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_monitor-celery-pidbox",
192+
"task": OnyxCeleryTask.CLOUD_MONITOR_CELERY_PIDBOX,
193+
"schedule": timedelta(hours=4),
194+
"options": {
195+
"queue": OnyxCeleryQueues.MONITORING,
196+
"priority": OnyxCeleryPriority.HIGH,
197+
"expires": BEAT_EXPIRES_DEFAULT,
198+
},
199+
},
190200
]
191201

192202
# tasks that only run self hosted

Diff for: backend/onyx/background/celery/tasks/indexing/tasks.py

+20-5
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
from onyx.configs.constants import CELERY_INDEXING_LOCK_TIMEOUT
4545
from onyx.configs.constants import CELERY_INDEXING_WATCHDOG_CONNECTOR_TIMEOUT
4646
from onyx.configs.constants import CELERY_TASK_WAIT_FOR_FENCE_TIMEOUT
47+
from onyx.configs.constants import OnyxCeleryPriority
4748
from onyx.configs.constants import OnyxCeleryQueues
4849
from onyx.configs.constants import OnyxCeleryTask
4950
from onyx.configs.constants import OnyxRedisConstants
@@ -1234,8 +1235,9 @@ def connector_indexing_proxy_task(
12341235
@shared_task(
12351236
name=OnyxCeleryTask.CHECK_FOR_CHECKPOINT_CLEANUP,
12361237
soft_time_limit=300,
1238+
bind=True,
12371239
)
1238-
def check_for_checkpoint_cleanup(*, tenant_id: str) -> None:
1240+
def check_for_checkpoint_cleanup(self: Task, *, tenant_id: str) -> None:
12391241
"""Clean up old checkpoints that are older than 7 days."""
12401242
locked = False
12411243
redis_client = get_redis_client(tenant_id=tenant_id)
@@ -1256,14 +1258,15 @@ def check_for_checkpoint_cleanup(*, tenant_id: str) -> None:
12561258
task_logger.info(
12571259
f"Cleaning up checkpoint for index attempt {attempt.id}"
12581260
)
1259-
cleanup_checkpoint_task.apply_async(
1261+
self.app.send_task(
1262+
OnyxCeleryTask.CLEANUP_CHECKPOINT,
12601263
kwargs={
12611264
"index_attempt_id": attempt.id,
12621265
"tenant_id": tenant_id,
12631266
},
12641267
queue=OnyxCeleryQueues.CHECKPOINT_CLEANUP,
1268+
priority=OnyxCeleryPriority.MEDIUM,
12651269
)
1266-
12671270
except Exception:
12681271
task_logger.exception("Unexpected exception during checkpoint cleanup")
12691272
return None
@@ -1287,5 +1290,17 @@ def cleanup_checkpoint_task(
12871290
self: Task, *, index_attempt_id: int, tenant_id: str | None
12881291
) -> None:
12891292
"""Clean up a checkpoint for a given index attempt"""
1290-
with get_session_with_current_tenant() as db_session:
1291-
cleanup_checkpoint(db_session, index_attempt_id)
1293+
1294+
start = time.monotonic()
1295+
1296+
try:
1297+
with get_session_with_current_tenant() as db_session:
1298+
cleanup_checkpoint(db_session, index_attempt_id)
1299+
finally:
1300+
elapsed = time.monotonic() - start
1301+
1302+
task_logger.info(
1303+
f"cleanup_checkpoint_task completed: tenant_id={tenant_id} "
1304+
f"index_attempt_id={index_attempt_id} "
1305+
f"elapsed={elapsed:.2f}"
1306+
)

Diff for: backend/onyx/background/celery/tasks/monitoring/tasks.py

+48
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from datetime import timedelta
55
from itertools import islice
66
from typing import Any
7+
from typing import cast
78
from typing import Literal
89

910
import psutil
@@ -1000,3 +1001,50 @@ def monitor_process_memory(self: Task, *, tenant_id: str) -> None:
10001001

10011002
except Exception:
10021003
task_logger.exception("Error in monitor_process_memory task")
1004+
1005+
1006+
@shared_task(
1007+
name=OnyxCeleryTask.CLOUD_MONITOR_CELERY_PIDBOX, ignore_result=True, bind=True
1008+
)
1009+
def cloud_monitor_celery_pidbox(
1010+
self: Task,
1011+
) -> None:
1012+
"""
1013+
Celery can leave behind orphaned pidboxes from old workers that are idle and never cleaned up.
1014+
This task removes them based on idle time to avoid Redis clutter and overflowing the instance.
1015+
This is a real issue we've observed in production.
1016+
1017+
Note:
1018+
- Setting CELERY_ENABLE_REMOTE_CONTROL = False would prevent pidbox keys entirely,
1019+
but might also disable features like inspect, broadcast, and worker remote control.
1020+
Use with caution.
1021+
"""
1022+
1023+
num_deleted = 0
1024+
1025+
MAX_PIDBOX_IDLE = 24 * 3600 # 1 day in seconds
1026+
r_celery: Redis = self.app.broker_connection().channel().client # type: ignore
1027+
for key in r_celery.scan_iter("*.reply.celery.pidbox"):
1028+
key_bytes = cast(bytes, key)
1029+
key_str = key_bytes.decode("utf-8")
1030+
if key_str.startswith("_kombu"):
1031+
continue
1032+
1033+
idletime_raw = r_celery.object("idletime", key)
1034+
if idletime_raw is None:
1035+
continue
1036+
1037+
idletime = cast(int, idletime_raw)
1038+
if idletime < MAX_PIDBOX_IDLE:
1039+
continue
1040+
1041+
r_celery.delete(key)
1042+
task_logger.info(
1043+
f"Deleted idle pidbox: pidbox={key_str} "
1044+
f"idletime={idletime} "
1045+
f"max_idletime={MAX_PIDBOX_IDLE}"
1046+
)
1047+
num_deleted += 1
1048+
1049+
# Enable later in case we want some aggregate metrics
1050+
# task_logger.info(f"Deleted idle pidbox: pidbox={key_str}")

Diff for: backend/onyx/background/celery/tasks/tenant_provisioning/tasks.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535

3636

3737
@shared_task(
38-
name=OnyxCeleryTask.CHECK_AVAILABLE_TENANTS,
38+
name=OnyxCeleryTask.CLOUD_CHECK_AVAILABLE_TENANTS,
3939
queue=OnyxCeleryQueues.MONITORING,
4040
ignore_result=True,
4141
soft_time_limit=JOB_TIMEOUT,

Diff for: backend/onyx/configs/constants.py

+6-1
Original file line numberDiff line numberDiff line change
@@ -398,7 +398,12 @@ class OnyxCeleryTask:
398398
CLOUD_MONITOR_CELERY_QUEUES = (
399399
f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_monitor_celery_queues"
400400
)
401-
CHECK_AVAILABLE_TENANTS = f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check_available_tenants"
401+
CLOUD_CHECK_AVAILABLE_TENANTS = (
402+
f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check_available_tenants"
403+
)
404+
CLOUD_MONITOR_CELERY_PIDBOX = (
405+
f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_monitor_celery_pidbox"
406+
)
402407

403408
# Tenant pre-provisioning
404409
PRE_PROVISION_TENANT = f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_pre_provision_tenant"

Diff for: backend/onyx/main.py

+7-2
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
from sentry_sdk.integrations.fastapi import FastApiIntegration
2222
from sentry_sdk.integrations.starlette import StarletteIntegration
2323
from sqlalchemy.orm import Session
24+
from starlette.types import Lifespan
2425

2526
from onyx import __version__
2627
from onyx.auth.schemas import UserCreate
@@ -275,8 +276,12 @@ def log_http_error(request: Request, exc: Exception) -> JSONResponse:
275276
)
276277

277278

278-
def get_application() -> FastAPI:
279-
application = FastAPI(title="Onyx Backend", version=__version__, lifespan=lifespan)
279+
def get_application(lifespan_override: Lifespan | None = None) -> FastAPI:
280+
application = FastAPI(
281+
title="Onyx Backend",
282+
version=__version__,
283+
lifespan=lifespan_override or lifespan,
284+
)
280285
if SENTRY_DSN:
281286
sentry_sdk.init(
282287
dsn=SENTRY_DSN,

0 commit comments

Comments
 (0)