diff --git a/enterprise/litellm_enterprise/proxy/common_utils/check_responses_cost.py b/enterprise/litellm_enterprise/proxy/common_utils/check_responses_cost.py index 54fbc7abcc5..dc0168683c8 100644 --- a/enterprise/litellm_enterprise/proxy/common_utils/check_responses_cost.py +++ b/enterprise/litellm_enterprise/proxy/common_utils/check_responses_cost.py @@ -11,6 +11,7 @@ from litellm.constants import ( MANAGED_OBJECT_STALENESS_CUTOFF_DAYS, MAX_OBJECTS_PER_POLL_CYCLE, + STALE_OBJECT_CLEANUP_BATCH_SIZE, ) if TYPE_CHECKING: @@ -32,21 +33,49 @@ def __init__( self.prisma_client: PrismaClient = prisma_client self.llm_router: Router = llm_router + async def _expire_stale_rows( + self, cutoff: datetime, batch_size: int + ) -> int: + """Execute the bounded UPDATE that marks stale rows as 'stale_expired'. + + Isolated so it can be swapped / mocked in tests without touching the + orchestration logic in ``_cleanup_stale_managed_objects``. + + Uses PostgreSQL syntax (``$1::timestamptz``, ``LIMIT``, double-quoted + identifiers) which is the only dialect the proxy supports — every + ``schema.prisma`` in the repo sets ``provider = "postgresql"``. + Same pattern as ``spend_log_cleanup.py``. + """ + return await self.prisma_client.db.execute_raw( + """ + UPDATE "LiteLLM_ManagedObjectTable" + SET "status" = 'stale_expired' + WHERE "id" IN ( + SELECT "id" FROM "LiteLLM_ManagedObjectTable" + WHERE "file_purpose" = 'response' + AND "status" NOT IN ('completed', 'complete', 'failed', 'expired', 'cancelled', 'stale_expired') + AND "created_at" < $1::timestamptz + ORDER BY "created_at" ASC + LIMIT $2 + ) + """, + cutoff, + batch_size, + ) + async def _cleanup_stale_managed_objects(self) -> None: """ Mark managed objects older than MANAGED_OBJECT_STALENESS_CUTOFF_DAYS days in non-terminal states as 'stale_expired'. These will never complete and should not be polled. + + Runs as a single DB query with a subquery LIMIT so no rows are loaded + into Python memory. Processes at most STALE_OBJECT_CLEANUP_BATCH_SIZE + rows per invocation to avoid overwhelming the DB when there is a large + backlog. """ cutoff = datetime.now(timezone.utc) - timedelta(days=MANAGED_OBJECT_STALENESS_CUTOFF_DAYS) - result = await self.prisma_client.db.litellm_managedobjecttable.update_many( - where={ - "file_purpose": "response", - "status": {"not_in": ["completed", "complete", "failed", "expired", "cancelled", "stale_expired"]}, - "created_at": {"lt": cutoff}, - }, - data={"status": "stale_expired"}, - ) + result = await self._expire_stale_rows(cutoff, STALE_OBJECT_CLEANUP_BATCH_SIZE) if result > 0: verbose_proxy_logger.warning( f"CheckResponsesCost: marked {result} stale managed objects " diff --git a/litellm/constants.py b/litellm/constants.py index 1af53b2dae0..28c6c0cc0e3 100644 --- a/litellm/constants.py +++ b/litellm/constants.py @@ -1367,6 +1367,9 @@ MANAGED_OBJECT_STALENESS_CUTOFF_DAYS = max( 1, int(os.getenv("MANAGED_OBJECT_STALENESS_CUTOFF_DAYS", 7)) ) +STALE_OBJECT_CLEANUP_BATCH_SIZE = max( + 1, int(os.getenv("STALE_OBJECT_CLEANUP_BATCH_SIZE", 1000)) +) # Set PROXY_BATCH_POLLING_ENABLED=false to disable the CheckBatchCost and # CheckResponsesCost background polling jobs entirely (e.g. to avoid DB load on # installations with large numbers of stale managed objects).