Skip to content

Commit 2b3c604

Browse files
authored
feat: Set expiry to Bgtask metadata id sets (#5736)
1 parent 9cf4cf3 commit 2b3c604

4 files changed

Lines changed: 27 additions & 6 deletions

File tree

changes/5736.feature.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Set expiry to set records of Bgtask metadata ids

src/ai/backend/common/bgtask/bgtask.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -460,7 +460,11 @@ async def _heartbeat_loop(self) -> None:
460460
for task_id, bg_task in self._ongoing_tasks.items():
461461
if not bg_task.done():
462462
alive_task_ids.append(task_id)
463-
await self._valkey_client.heartbeat(alive_task_ids)
463+
await self._valkey_client.heartbeat(
464+
alive_task_ids,
465+
server_id=self._server_id,
466+
tags=self._tags,
467+
)
464468
except Exception as e:
465469
log.exception("Exception in heartbeat loop: {}", e)
466470
await asyncio.sleep(_HEARTBEAT_INTERVAL)

src/ai/backend/common/clients/valkey_client/valkey_bgtask/client.py

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -118,9 +118,11 @@ async def register_task(self, metadata: BackgroundTaskMetadata) -> None:
118118
for tag in metadata.tags:
119119
tag_key = self._get_tag_key(tag)
120120
batch.sadd(tag_key, [metadata.task_id.hex])
121+
batch.expire(tag_key, TASK_METADATA_TTL)
121122

122123
server_key = self._get_server_key(metadata.server_id)
123124
batch.sadd(server_key, [metadata.task_id.hex])
125+
batch.expire(server_key, TASK_METADATA_TTL)
124126
await self._client.client.exec(batch, raise_on_error=True)
125127

126128
@valkey_decorator()
@@ -240,14 +242,28 @@ def _resolve_script_result(
240242
return result_type, metadata
241243

242244
@valkey_decorator()
243-
async def heartbeat(self, task_ids: Collection[TaskID]) -> None:
245+
async def heartbeat(
246+
self,
247+
task_ids: Collection[TaskID],
248+
server_id: Optional[str],
249+
tags: Collection[str],
250+
) -> None:
244251
"""
245252
Extend TTL to 24 hours for active tasks. Non-existent tasks are ignored.
246253
"""
247-
if not task_ids:
254+
keys: list[str] = []
255+
256+
task_keys = [self._get_task_key(task_id) for task_id in task_ids]
257+
keys.extend(task_keys)
258+
if server_id is not None:
259+
server_key = self._get_server_key(server_id)
260+
keys.append(server_key)
261+
tag_keys = [self._get_tag_key(tag) for tag in tags]
262+
keys.extend(tag_keys)
263+
264+
if not keys:
248265
return
249266
batch = self._create_batch()
250-
keys = [self._get_task_key(task_id) for task_id in task_ids]
251267
for key in keys:
252268
batch.expire(key, TASK_METADATA_TTL)
253269
await self._client.client.exec(batch, raise_on_error=True)
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
TASK_METADATA_TTL = 7200 # 2 hours
2-
TASK_TTL_THRESHOLD = 6900 # 1 hour 55 minutes
1+
TASK_METADATA_TTL = 60 * 60 * 24 # 24 hours
2+
TASK_TTL_THRESHOLD = TASK_METADATA_TTL - 60 * 5 # 5 minutes margin

0 commit comments

Comments
 (0)