88
99
1010Encoder = Callable [[Any ], bytes ]
11+ MAX_TTL = 30 * 24 * 3600
1112
1213
1314class RedisJobTracker :
@@ -17,7 +18,12 @@ class RedisJobTracker:
1718 key <prefix>{task_id}
1819 and keeps a Set <prefix>__all with the ids for quick listing.
1920 """
20- def __init__ (self , url : str = None , prefix : str = 'job:' ) -> None :
21+ def __init__ (
22+ self ,
23+ url : str = None ,
24+ prefix : str = 'job:' ,
25+ ttl_seconds : int = 30 * 24 * 3600 , # 30 days
26+ ) -> None :
2127 self ._url = url or CACHE_URL
2228 self ._redis : redis .Redis = redis .from_url (
2329 self ._url ,
@@ -28,6 +34,7 @@ def __init__(self, url: str = None, prefix: str = 'job:') -> None:
2834 self ._decoder : Encoder = self ._decode_model
2935 self .prefix = prefix if prefix .endswith (":" ) else f"{ prefix } :"
3036 self ._lock = asyncio .Lock ()
37+ self ._ttl = min (ttl_seconds , MAX_TTL )
3138
3239 def _key (self , task_id : str ) -> str :
3340 return f"{ self .prefix } { task_id } "
@@ -49,7 +56,9 @@ async def create_job(self, job: JobRecord, **kwargs) -> JobRecord:
4956 ) from exc
5057 key = self ._key (job .task_id )
5158 async with self ._lock :
52- await self ._redis .set (key , self ._encoder (job ))
59+ await self ._redis .set (
60+ key , self ._encoder (job ), ex = self ._ttl
61+ )
5362 await self ._redis .sadd (self ._set_key , job .task_id )
5463
5564 # Create secondary index for attributes if provided
@@ -75,7 +84,9 @@ async def _update(self, job_id: str, **patch) -> None:
7584 rec : JobRecord = self ._decoder (payload )
7685 for k , v in patch .items ():
7786 setattr (rec , k , v )
78- await self ._redis .set (key , self ._encoder (rec ))
87+ await self ._redis .set (
88+ key , self ._encoder (rec ), keepttl = True
89+ ) # keep the TTL
7990
8091 # Update secondary index for attributes if they are part of the patch
8192 if 'attributes' in patch :
@@ -178,7 +189,11 @@ async def forget(self, job_id: str) -> None:
178189 if payload :
179190 rec : JobRecord = self ._decoder (payload )
180191 for k , v in rec .attributes .items ():
181- await self ._redis .srem (self ._attr_key (k , v ), job_id )
192+ akey = self ._attr_key (k , v )
193+ await self ._redis .srem (akey , job_id )
194+ # set attr-set to expire in 24 h if now empty
195+ if await self ._redis .scard (akey ) == 0 :
196+ await self ._redis .expire (akey , 86400 )
182197
183198 await self ._redis .delete (self ._key (job_id ))
184199 await self ._redis .srem (self ._set_key , job_id )
0 commit comments