Skip to content

Commit 9b6c762

Browse files
rkuo-danswer Richard Kuo (Onyx)
and
Richard Kuo (Onyx)
authored
Bugfix/cloud checkpoint cleanup (#4478)
* use send_task to be consistent
* add pidbox monitoring task
* add logging so we can track the task execution
* log the idletime of the pidbox

---------

Co-authored-by: Richard Kuo (Onyx) <[email protected]>
1 parent 634d990 commit 9b6c762

File tree

5 files changed

+86
-8
lines changed

5 files changed

+86
-8
lines changed

Diff for: backend/onyx/background/celery/tasks/beat_schedule.py

+11-1
Original file line numberDiff line numberDiff line change
@@ -179,14 +179,24 @@ def make_cloud_generator_task(task: dict[str, Any]) -> dict[str, Any]:
179179
},
180180
{
181181
"name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check-available-tenants",
182-
"task": OnyxCeleryTask.CHECK_AVAILABLE_TENANTS,
182+
"task": OnyxCeleryTask.CLOUD_CHECK_AVAILABLE_TENANTS,
183183
"schedule": timedelta(minutes=10),
184184
"options": {
185185
"queue": OnyxCeleryQueues.MONITORING,
186186
"priority": OnyxCeleryPriority.HIGH,
187187
"expires": BEAT_EXPIRES_DEFAULT,
188188
},
189189
},
190+
{
191+
"name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_monitor-celery-pidbox",
192+
"task": OnyxCeleryTask.CLOUD_MONITOR_CELERY_PIDBOX,
193+
"schedule": timedelta(hours=4),
194+
"options": {
195+
"queue": OnyxCeleryQueues.MONITORING,
196+
"priority": OnyxCeleryPriority.HIGH,
197+
"expires": BEAT_EXPIRES_DEFAULT,
198+
},
199+
},
190200
]
191201

192202
# tasks that only run self hosted

Diff for: backend/onyx/background/celery/tasks/indexing/tasks.py

+20-5
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
from onyx.configs.constants import CELERY_INDEXING_LOCK_TIMEOUT
4545
from onyx.configs.constants import CELERY_INDEXING_WATCHDOG_CONNECTOR_TIMEOUT
4646
from onyx.configs.constants import CELERY_TASK_WAIT_FOR_FENCE_TIMEOUT
47+
from onyx.configs.constants import OnyxCeleryPriority
4748
from onyx.configs.constants import OnyxCeleryQueues
4849
from onyx.configs.constants import OnyxCeleryTask
4950
from onyx.configs.constants import OnyxRedisConstants
@@ -1234,8 +1235,9 @@ def connector_indexing_proxy_task(
12341235
@shared_task(
12351236
name=OnyxCeleryTask.CHECK_FOR_CHECKPOINT_CLEANUP,
12361237
soft_time_limit=300,
1238+
bind=True,
12371239
)
1238-
def check_for_checkpoint_cleanup(*, tenant_id: str) -> None:
1240+
def check_for_checkpoint_cleanup(self: Task, *, tenant_id: str) -> None:
12391241
"""Clean up old checkpoints that are older than 7 days."""
12401242
locked = False
12411243
redis_client = get_redis_client(tenant_id=tenant_id)
@@ -1256,14 +1258,15 @@ def check_for_checkpoint_cleanup(*, tenant_id: str) -> None:
12561258
task_logger.info(
12571259
f"Cleaning up checkpoint for index attempt {attempt.id}"
12581260
)
1259-
cleanup_checkpoint_task.apply_async(
1261+
self.app.send_task(
1262+
OnyxCeleryTask.CLEANUP_CHECKPOINT,
12601263
kwargs={
12611264
"index_attempt_id": attempt.id,
12621265
"tenant_id": tenant_id,
12631266
},
12641267
queue=OnyxCeleryQueues.CHECKPOINT_CLEANUP,
1268+
priority=OnyxCeleryPriority.MEDIUM,
12651269
)
1266-
12671270
except Exception:
12681271
task_logger.exception("Unexpected exception during checkpoint cleanup")
12691272
return None
@@ -1287,5 +1290,17 @@ def cleanup_checkpoint_task(
12871290
self: Task, *, index_attempt_id: int, tenant_id: str | None
12881291
) -> None:
12891292
"""Clean up a checkpoint for a given index attempt"""
1290-
with get_session_with_current_tenant() as db_session:
1291-
cleanup_checkpoint(db_session, index_attempt_id)
1293+
1294+
start = time.monotonic()
1295+
1296+
try:
1297+
with get_session_with_current_tenant() as db_session:
1298+
cleanup_checkpoint(db_session, index_attempt_id)
1299+
finally:
1300+
elapsed = time.monotonic() - start
1301+
1302+
task_logger.info(
1303+
f"cleanup_checkpoint_task completed: tenant_id={tenant_id} "
1304+
f"index_attempt_id={index_attempt_id} "
1305+
f"elapsed={elapsed:.2f}"
1306+
)

Diff for: backend/onyx/background/celery/tasks/monitoring/tasks.py

+48
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from datetime import timedelta
55
from itertools import islice
66
from typing import Any
7+
from typing import cast
78
from typing import Literal
89

910
import psutil
@@ -998,3 +999,50 @@ def monitor_process_memory(self: Task, *, tenant_id: str) -> None:
998999

9991000
except Exception:
10001001
task_logger.exception("Error in monitor_process_memory task")
1002+
1003+
1004+
@shared_task(
1005+
name=OnyxCeleryTask.CLOUD_MONITOR_CELERY_PIDBOX, ignore_result=True, bind=True
1006+
)
1007+
def cloud_monitor_celery_pidbox(
1008+
self: Task,
1009+
) -> None:
1010+
"""
1011+
Celery can leave behind orphaned pidboxes from old workers that are idle and never cleaned up.
1012+
This task removes them based on idle time to avoid Redis clutter and overflowing the instance.
1013+
This is a real issue we've observed in production.
1014+
1015+
Note:
1016+
- Setting CELERY_ENABLE_REMOTE_CONTROL = False would prevent pidbox keys entirely,
1017+
but might also disable features like inspect, broadcast, and worker remote control.
1018+
Use with caution.
1019+
"""
1020+
1021+
num_deleted = 0
1022+
1023+
MAX_PIDBOX_IDLE = 24 * 3600 # 1 day in seconds
1024+
r_celery: Redis = self.app.broker_connection().channel().client # type: ignore
1025+
for key in r_celery.scan_iter("*.reply.celery.pidbox"):
1026+
key_bytes = cast(bytes, key)
1027+
key_str = key_bytes.decode("utf-8")
1028+
if key_str.startswith("_kombu"):
1029+
continue
1030+
1031+
idletime_raw = r_celery.object("idletime", key)
1032+
if idletime_raw is None:
1033+
continue
1034+
1035+
idletime = cast(int, idletime_raw)
1036+
if idletime < MAX_PIDBOX_IDLE:
1037+
continue
1038+
1039+
r_celery.delete(key)
1040+
task_logger.info(
1041+
f"Deleted idle pidbox: pidbox={key_str} "
1042+
f"idletime={idletime} "
1043+
f"max_idletime={MAX_PIDBOX_IDLE}"
1044+
)
1045+
num_deleted += 1
1046+
1047+
# Enable later in case we want some aggregate metrics
1048+
# task_logger.info(f"Deleted idle pidbox: pidbox={key_str}")

Diff for: backend/onyx/background/celery/tasks/tenant_provisioning/tasks.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535

3636

3737
@shared_task(
38-
name=OnyxCeleryTask.CHECK_AVAILABLE_TENANTS,
38+
name=OnyxCeleryTask.CLOUD_CHECK_AVAILABLE_TENANTS,
3939
queue=OnyxCeleryQueues.MONITORING,
4040
ignore_result=True,
4141
soft_time_limit=JOB_TIMEOUT,

Diff for: backend/onyx/configs/constants.py

+6-1
Original file line numberDiff line numberDiff line change
@@ -398,7 +398,12 @@ class OnyxCeleryTask:
398398
CLOUD_MONITOR_CELERY_QUEUES = (
399399
f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_monitor_celery_queues"
400400
)
401-
CHECK_AVAILABLE_TENANTS = f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check_available_tenants"
401+
CLOUD_CHECK_AVAILABLE_TENANTS = (
402+
f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check_available_tenants"
403+
)
404+
CLOUD_MONITOR_CELERY_PIDBOX = (
405+
f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_monitor_celery_pidbox"
406+
)
402407

403408
# Tenant pre-provisioning
404409
PRE_PROVISION_TENANT = f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_pre_provision_tenant"

0 commit comments

Comments
 (0)