Is your feature request related to a problem? Please describe.
After sub-issues 01 and 02 land, an operation can transition to `pending` with `next_retry_at` set, but nothing reads that timestamp or dispatches the actual retry. Without the pipeline proposed here, pending ops just sit in the database forever, so persistence is non-functional.
I want two new Celery tasks: a Beat-driven scanner that finds due pending ops every 10 minutes, and a per-op worker that flips the status back to `in-progress` and re-dispatches the existing `upgrade_firmware` task. The worker is the idempotency point: concurrent calls from Beat and the monitoring signal (sub-issue 05) must not double-fire the same op. I'm using the database as the schedule rather than Celery's `eta=`, because ETA tasks stay queued (and, once prefetched, held in worker memory) for the whole retry window, can be lost on a Redis restart, and aren't queryable from the admin.
Describe the solution I would implement
I would like to introduce the database-driven periodic retry pipeline and the worker that dispatches individual retries idempotently.
Add `retry_pending_upgrade(operation_id)` as a `@shared_task(base=OpenwispCeleryTask)` in `tasks.py`:

- Atomic compare-and-swap: `UpgradeOperation.objects.filter(pk=operation_id, status="pending").update(status="in-progress")`. If the update returns 0 (another worker already claimed the op, or it was cancelled), bail out silently.
- Call `log_line()` with the current `retry_count` so the operation's log reflects the retry lineage.
- After the status flip, check `device.is_deactivated()` (already used elsewhere at `api/views.py:290`):
  - If the device was deactivated while the op was pending, set `status="failed"` and log the reason, saving through the model (`op.save()`) because queryset `.update()` does not send `post_save`. Sub-issue 06's `post_save` handler then fires the failure-needs-attention notification. Since the compare-and-swap has already moved the op to `in-progress`, the transition that handler observes is `in-progress → failed`, not `pending → failed`, so it has to match on that. No firmware dispatch.
  - Otherwise dispatch `upgrade_firmware.delay(op.pk)`. The existing per-device pipeline takes over from there: it succeeds, fails terminally, or cycles back through sub-issue 02's persistent branch with `retry_count` bumped.
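A minimal, framework-free sketch of the worker's control flow, assuming an in-memory SQLite table as a stand-in for `UpgradeOperation` and plain callables in place of `upgrade_firmware.delay` and `log_line()`. The table layout, the `device_deactivated` column and the `dispatch`/`log` parameters are hypothetical; what it illustrates is that the row count of `UPDATE ... WHERE status = 'pending'` is what makes the claim idempotent.

```python
import sqlite3
from typing import Callable

# Hypothetical stand-in for the UpgradeOperation table; not the real schema.
SCHEMA = (
    "CREATE TABLE upgrade_operation ("
    " id INTEGER PRIMARY KEY,"
    " status TEXT NOT NULL,"
    " retry_count INTEGER NOT NULL DEFAULT 0,"
    " device_deactivated INTEGER NOT NULL DEFAULT 0)"
)


def retry_pending_upgrade(
    conn: sqlite3.Connection,
    operation_id: int,
    dispatch: Callable[[int], None],
    log: Callable[[int, str], None],
) -> str:
    """Claim one pending op, then either re-dispatch it or fail it."""
    # Compare-and-swap: only a row that is still "pending" gets claimed,
    # like filter(pk=..., status="pending").update(status="in-progress").
    with conn:
        claimed = conn.execute(
            "UPDATE upgrade_operation SET status = 'in-progress'"
            " WHERE id = ? AND status = 'pending'",
            (operation_id,),
        ).rowcount
    if claimed == 0:
        return "skipped"  # another worker won the race, or the op was cancelled

    retry_count, deactivated = conn.execute(
        "SELECT retry_count, device_deactivated FROM upgrade_operation WHERE id = ?",
        (operation_id,),
    ).fetchone()
    log(operation_id, f"Retrying upgrade (retry #{retry_count})")

    if deactivated:
        # Device was deactivated while the op was pending: fail, don't dispatch.
        with conn:
            conn.execute(
                "UPDATE upgrade_operation SET status = 'failed' WHERE id = ?",
                (operation_id,),
            )
        log(operation_id, "Device deactivated while pending, not retrying")
        return "failed"

    dispatch(operation_id)  # stands in for upgrade_firmware.delay(op.pk)
    return "dispatched"
```

Calling it twice for the same `operation_id` updates 0 rows the second time and returns without dispatching, which is what lets Beat and the monitoring signal race safely.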
Add `check_pending_upgrades()` as a periodic task:

- Query `UpgradeOperation.objects.filter(status="pending", next_retry_at__lte=timezone.now()).values_list("pk", flat=True)`. This is fast because of sub-issue 01's `db_index=True` on `next_retry_at`: the `<= now` range scan never touches ops that haven't entered `pending`, since their `next_retry_at` is NULL. (A plain B-tree index still stores those NULL entries; it is only truly sparse if declared as a partial index.)
- For each due op, dispatch `retry_pending_upgrade.apply_async(args=[pk], countdown=random.uniform(0, jitter))` so a batch of simultaneously due retries is spread across the jitter window instead of hitting the broker in one burst.
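A minimal sketch of the scanner loop, assuming an in-memory SQLite table (with `next_retry_at` stored as a POSIX timestamp) in place of the model, and an `apply_async` argument that is any callable accepting the `args=`/`countdown=` keywords Celery's `Task.apply_async` takes. The `rng` parameter is hypothetical and exists only to make the jitter reproducible in tests.

```python
import random
import sqlite3
from datetime import datetime
from typing import Callable, List, Optional, Tuple


def check_pending_upgrades(
    conn: sqlite3.Connection,
    now: datetime,
    jitter: float,
    apply_async: Callable[..., None],
    rng: Optional[random.Random] = None,
) -> List[Tuple[int, float]]:
    """Dispatch every pending op whose next_retry_at has passed, with jitter."""
    if rng is None:
        rng = random.Random()
    # Mirrors filter(status="pending", next_retry_at__lte=now).values_list("pk", flat=True).
    # Rows with a NULL next_retry_at never satisfy "<= ?", just as in Django.
    due = [
        pk
        for (pk,) in conn.execute(
            "SELECT id FROM upgrade_operation"
            " WHERE status = 'pending' AND next_retry_at <= ? ORDER BY id",
            (now.timestamp(),),
        )
    ]
    scheduled = []
    for pk in due:
        # Spread simultaneously due retries over [0, jitter] seconds.
        countdown = rng.uniform(0, jitter)
        apply_async(args=[pk], countdown=countdown)
        scheduled.append((pk, countdown))
    return scheduled
```

In the task itself the module-level `random.uniform` is enough; seeding only matters when a test wants exact countdown values.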
Two configurable settings, read through the existing `getattr(settings, "OPENWISP_FIRMWARE_UPGRADER_*", default)` pattern:

| Setting | Default (seconds) | Purpose |
| --- | --- | --- |
| `..._CHECK_PENDING_UPGRADES_PERIOD` | `600` (10 min) | Beat scan cadence; coarse because retry deltas are measured in hours |
| `..._PERSISTENT_RETRY_DISPATCH_JITTER` | `300` (5 min) | Maximum `countdown` used to smooth dispatch bursts |
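Assuming the `...` in the table stands for the `OPENWISP_FIRMWARE_UPGRADER_` prefix named in the pattern above, the two settings could be read as follows. `SimpleNamespace` is only a stand-in for `django.conf.settings` so the snippet runs on its own.

```python
from types import SimpleNamespace

# Hypothetical stand-in for django.conf.settings: this deployment overrides
# only the scan period and leaves the jitter at its default.
settings = SimpleNamespace(
    OPENWISP_FIRMWARE_UPGRADER_CHECK_PENDING_UPGRADES_PERIOD=900,
)

# Both values are in seconds; getattr falls back to the default when unset.
CHECK_PENDING_UPGRADES_PERIOD = getattr(
    settings, "OPENWISP_FIRMWARE_UPGRADER_CHECK_PENDING_UPGRADES_PERIOD", 600
)
PERSISTENT_RETRY_DISPATCH_JITTER = getattr(
    settings, "OPENWISP_FIRMWARE_UPGRADER_PERSISTENT_RETRY_DISPATCH_JITTER", 300
)
```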
Register `check_pending_upgrades` in the test settings' `CELERY_BEAT_SCHEDULE` so the test suite exercises the same wiring deployers will use. The docs ticket (10) covers the production-side `CELERY_BEAT_SCHEDULE` snippet.
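One possible shape for the test-settings entry, using Celery Beat's standard `task`/`schedule` keys with a `timedelta` schedule. The dotted task name is an assumption based on the default name `@shared_task` gives a function (module path plus function name), with `tasks.py` living in the `openwisp_firmware_upgrader` package.

```python
from datetime import timedelta

# Sketch of a test-settings entry. The task name assumes the default naming
# for a function defined in openwisp_firmware_upgrader/tasks.py.
CELERY_BEAT_SCHEDULE = {
    "check_pending_upgrades": {
        "task": "openwisp_firmware_upgrader.tasks.check_pending_upgrades",
        # 600 s matches the default ..._CHECK_PENDING_UPGRADES_PERIOD.
        "schedule": timedelta(seconds=600),
    },
}
```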
Unit tests covering:

- No pending ops → no dispatch.
- Mixed past/future `next_retry_at` → only past-due ops dispatched, with countdowns in `[0, jitter]`.
- Happy path: status flips and `upgrade_firmware.delay` fires.
- Raced out (already claimed) → returns silently, no `upgrade_firmware` call.
- Deactivated device → flips to `failed`, no `upgrade_firmware` call.
- All tests run with `CELERY_TASK_ALWAYS_EAGER=True` (matching the existing convention) and mock `timezone.now()` for time-dependent assertions.
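A self-contained sketch of how the first two scanner tests could be laid out with `unittest.mock`. The in-memory `ops` list, `scan()` and `Clock` are hypothetical stand-ins for `UpgradeOperation`, `check_pending_upgrades()` and `django.utils.timezone`; in the real suite the patch target would be the `timezone.now` that `tasks.py` actually calls, and the `dispatch` mock would replace `retry_pending_upgrade.apply_async`.

```python
import random
import unittest
from datetime import datetime, timedelta, timezone
from unittest import mock

JITTER = 300  # seconds; mirrors the ..._PERSISTENT_RETRY_DISPATCH_JITTER default
FIXED_NOW = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)


class Clock:
    """Hypothetical stand-in for django.utils.timezone, patched in the tests."""

    @staticmethod
    def now() -> datetime:
        return datetime.now(timezone.utc)


def scan(ops, dispatch):
    """Hypothetical in-memory scanner: dispatch each pending op that is due."""
    current = Clock.now()
    for op in ops:
        due = op["next_retry_at"] is not None and op["next_retry_at"] <= current
        if op["status"] == "pending" and due:
            dispatch(args=[op["pk"]], countdown=random.uniform(0, JITTER))


class CheckPendingUpgradesTests(unittest.TestCase):
    @mock.patch.object(Clock, "now", return_value=FIXED_NOW)
    def test_no_pending_ops_no_dispatch(self, _now):
        dispatch = mock.Mock()
        scan([], dispatch)
        dispatch.assert_not_called()

    @mock.patch.object(Clock, "now", return_value=FIXED_NOW)
    def test_only_past_due_ops_dispatched(self, _now):
        ops = [
            {"pk": 1, "status": "pending", "next_retry_at": FIXED_NOW - timedelta(minutes=5)},
            {"pk": 2, "status": "pending", "next_retry_at": FIXED_NOW + timedelta(hours=2)},
        ]
        dispatch = mock.Mock()
        scan(ops, dispatch)
        dispatch.assert_called_once()
        kwargs = dispatch.call_args.kwargs
        self.assertEqual(kwargs["args"], [1])
        self.assertGreaterEqual(kwargs["countdown"], 0)
        self.assertLessEqual(kwargs["countdown"], JITTER)
```

Pinning the clock to a fixed instant keeps the past/future split deterministic, while the countdown assertion only checks bounds because the jitter itself is random.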