Description
When using ListRedisScheduleSource with schedule_by_interval(), the schedule data is stored in Redis (schedule:data:{id}) but the schedule is never executed because it's not added to any index list.
Root Cause
In ListRedisScheduleSource.add_schedule(), only cron and time schedules are indexed:
async def add_schedule(self, schedule: "ScheduledTask") -> None:
    async with Redis(connection_pool=self._connection_pool) as redis:
        await redis.set(
            f"{self._prefix}:data:{schedule.schedule_id}",
            self._serializer.dumpb(model_dump(schedule)),
        )
        if schedule.cron is not None:
            await redis.rpush(self._get_cron_key(), schedule.schedule_id)
        elif schedule.time is not None:
            await redis.rpush(self._get_time_key(schedule.time), schedule.schedule_id)
        # ⚠️ schedule.interval is NOT handled - data is saved but never indexed

Similarly, get_schedules() only retrieves schedules from the schedule:cron and schedule:time:{datetime} lists.
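The missing branch can be illustrated with a small, Redis-free sketch of the index-key selection. Everything here is hypothetical and for illustration only: `FakeSchedule`, `index_key_for`, and the `schedule:interval` key are not part of taskiq-redis, and the time-key format is a placeholder for whatever `_get_time_key()` actually produces.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional, Union


@dataclass
class FakeSchedule:
    """Hypothetical stand-in for ScheduledTask, reduced to the fields used here."""

    schedule_id: str
    cron: Optional[str] = None
    time: Optional[datetime] = None
    interval: Optional[Union[int, timedelta]] = None


def index_key_for(schedule: FakeSchedule, prefix: str = "schedule") -> Optional[str]:
    """Return the index list a schedule would be pushed to, or None if unindexed."""
    if schedule.cron is not None:
        return f"{prefix}:cron"
    if schedule.time is not None:
        # Placeholder format; the real key comes from _get_time_key().
        return f"{prefix}:time:{schedule.time.isoformat(timespec='minutes')}"
    if schedule.interval is not None:
        # Assumed new branch: the current code has no equivalent of this.
        return f"{prefix}:interval"
    return None


if __name__ == "__main__":
    print(index_key_for(FakeSchedule("abc", interval=timedelta(seconds=300))))
```

With a branch like the last one in place, get_schedules() would also need to read the new list; otherwise the schedule would be indexed but still never returned.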
Steps to Reproduce
from datetime import timedelta

from taskiq_redis import ListRedisScheduleSource

source = ListRedisScheduleSource(url="redis://localhost:6379/0")

# `broker` is assumed to be an already configured taskiq broker
@broker.task
async def my_task():
    pass

# This creates schedule:data:{id} but NOT schedule:interval or similar
schedule = await my_task.schedule_by_interval(
    source=source,
    interval=timedelta(seconds=300),
)

# Returns empty list - schedule is lost
schedules = await source.get_schedules()  # []

Expected Behavior
schedule_by_interval should work with ListRedisScheduleSource, either by:
- Adding support for interval schedules (e.g., a schedule:interval list)
Workaround
Use schedule_by_cron() instead of schedule_by_interval():
# Convert interval to cron expression
# 300 seconds = */5 * * * *
schedule = await my_task.schedule_by_cron(
    source=source,
    cron="*/5 * * * *",
)

However, cron has minute-level granularity, so this workaround cannot express sub-minute intervals such as every 10 seconds.
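To make the limits of this workaround explicit, here is a rough sketch of a converter that maps an interval to a cron expression only when the two are equivalent in spacing. `interval_to_cron` is a hypothetical helper, not part of taskiq or taskiq-redis. It rejects sub-minute intervals and intervals that do not divide an hour or a day evenly, since those cannot be expressed as evenly spaced cron runs. Note also that cron fires on wall-clock boundaries (e.g. :00, :05, :10), not at a fixed offset from when the schedule was created.

```python
from datetime import timedelta


def interval_to_cron(interval: timedelta) -> str:
    """Convert an interval to an evenly spaced cron expression, or raise ValueError."""
    total = interval.total_seconds()
    if total < 60 or total % 60 != 0:
        raise ValueError("cron cannot express sub-minute or fractional-minute intervals")

    minutes = int(total // 60)
    if minutes < 60:
        # Only divisors of 60 give uniform spacing across the hour boundary.
        if 60 % minutes != 0:
            raise ValueError(f"{minutes} minutes does not divide an hour evenly")
        return "* * * * *" if minutes == 1 else f"*/{minutes} * * * *"

    if minutes % 60 == 0:
        hours = minutes // 60
        # Only divisors of 24 give uniform spacing across the day boundary.
        if hours <= 24 and 24 % hours == 0:
            return "0 0 * * *" if hours == 24 else f"0 */{hours} * * *"

    raise ValueError("interval cannot be expressed as an evenly spaced cron schedule")


if __name__ == "__main__":
    print(interval_to_cron(timedelta(seconds=300)))
```

An interval such as timedelta(seconds=10) raises ValueError here, which is exactly the case where only native interval support in the schedule source would help.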
Environment
- taskiq-redis version: 1.2.0
- Python version: 3.13.1