Skip to content

Commit 7ac2e3b

Browse files
authored
fix(get_recurringtasks_to_process): Add last_picked_at (#37)
* fix(get_recurringtasks_to_process): Add last_picked_at (Closes: #35)
* make last_picked_at nullable
* remove task_runs var
1 parent 3adfffa commit 7ac2e3b

File tree

4 files changed

+101
-0
lines changed

4 files changed

+101
-0
lines changed
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
# Generated by Django 4.2.18 on 2025-04-03 07:34
2+
3+
from django.db import migrations, models
4+
import django.utils.timezone
5+
import os
6+
from common.migrations.helpers import PostgresOnlyRunSQL
7+
8+
9+
def _sql_file(filename):
    """Return the path of *filename* inside the ``sql`` directory next to this module."""
    return os.path.join(os.path.dirname(__file__), "sql", filename)


class Migration(migrations.Migration):
    """Add ``last_picked_at`` to ``recurringtask`` and load the 0013 SQL file.

    The forward step runs ``0013_get_recurringtasks_to_process.sql``; the
    path of ``0012_get_recurringtasks_to_process.sql`` is handed over as the
    reverse SQL.
    """

    dependencies = [
        ("task_processor", "0012_add_locked_at_and_timeout"),
    ]

    operations = [
        # Nullable column, so existing rows need no backfill value.
        migrations.AddField(
            model_name="recurringtask",
            name="last_picked_at",
            field=models.DateTimeField(blank=True, null=True),
            preserve_default=False,
        ),
        # NOTE(review): presumably only executed on PostgreSQL, judging by the
        # helper's name -- confirm against common.migrations.helpers.
        PostgresOnlyRunSQL.from_sql_file(
            _sql_file("0013_get_recurringtasks_to_process.sql"),
            reverse_sql=_sql_file("0012_get_recurringtasks_to_process.sql"),
        ),
    ]
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
-- Returns at most one recurring task to process, locking it for the caller.
--
-- A task is eligible when it is not locked, or when its lock is older than its
-- `timeout` plus a one-minute grace period. Eligible tasks are ordered by
-- `last_picked_at` (never-picked tasks first), so repeated calls cycle through
-- the tasks instead of always returning the same one.
CREATE OR REPLACE FUNCTION get_recurringtasks_to_process()
RETURNS SETOF task_processor_recurringtask AS $$
DECLARE
    row_to_return task_processor_recurringtask;
BEGIN
    -- Select the task that needs to be processed
    FOR row_to_return IN
        SELECT *
        FROM task_processor_recurringtask
        -- Add one minute to the timeout as a grace period for overhead.
        -- The parentheses matter: `NOW() - timeout + INTERVAL '1 minute'` would
        -- shorten the effective timeout by a minute instead of extending it.
        WHERE is_locked = FALSE
           OR (locked_at IS NOT NULL AND locked_at < NOW() - (timeout + INTERVAL '1 minute'))
        -- Least recently picked first; tasks that were never picked come before all others
        ORDER BY last_picked_at NULLS FIRST
        LIMIT 1
        -- Select for update to ensure that no other workers can select these tasks while in this transaction block
        FOR UPDATE SKIP LOCKED
    LOOP
        -- Lock every selected task (by updating `is_locked` to true)
        UPDATE task_processor_recurringtask
        -- Lock this row by setting is_locked True, so that no other workers can select these tasks after this
        -- transaction is complete (but the tasks are still being executed by the current worker)
        SET is_locked = TRUE, locked_at = NOW(), last_picked_at = NOW()
        WHERE id = row_to_return.id;
        -- If we don't explicitly update the columns here, the client will receive a row
        -- that is locked but still shows the pre-update values of `is_locked`,
        -- `locked_at` and `last_picked_at`.
        row_to_return.is_locked := TRUE;
        row_to_return.locked_at := NOW();
        row_to_return.last_picked_at := NOW();
        RETURN NEXT row_to_return;
    END LOOP;

    RETURN;
END;
$$ LANGUAGE plpgsql
33+

src/task_processor/models.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ class RecurringTask(AbstractBaseTask):
163163
locked_at = models.DateTimeField(blank=True, null=True)
164164
timeout = models.DurationField(default=timedelta(minutes=30))
165165

166+
last_picked_at = models.DateTimeField(blank=True, null=True)
166167
objects: RecurringTaskManager = RecurringTaskManager()
167168

168169
class Meta:

tests/unit/task_processor/test_unit_task_processor_processor.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,39 @@ def _dummy_recurring_task() -> None:
303303
assert task_run.error_details is None
304304

305305

306+
@pytest.mark.django_db(transaction=True)
def test_run_recurring_tasks_loops_over_all_tasks(
    db: None,
    run_by_processor: None,
) -> None:
    """Repeated ``run_recurring_tasks`` calls should give every task one run."""

    # Given - three registered recurring tasks sharing the same interval
    @register_recurring_task(run_every=timedelta(milliseconds=200))
    def _dummy_recurring_task_1() -> None:
        pass

    @register_recurring_task(run_every=timedelta(milliseconds=200))
    def _dummy_recurring_task_2() -> None:
        pass

    @register_recurring_task(run_every=timedelta(milliseconds=200))
    def _dummy_recurring_task_3() -> None:
        pass

    initialise()

    # The identifiers asserted on below, one per task defined above.
    expected_identifiers = [
        f"test_unit_task_processor_processor._dummy_recurring_task_{i}"
        for i in range(1, 4)
    ]

    # When - run_recurring_tasks is invoked one more time than there are tasks
    for _ in range(4):
        run_recurring_tasks()

    # Then - each task has been run exactly once
    for identifier in expected_identifiers:
        recurring_task = RecurringTask.objects.get(task_identifier=identifier)
        run_count = RecurringTaskRun.objects.filter(task=recurring_task).count()
        assert run_count == 1
337+
338+
306339
def test_run_recurring_tasks_only_executes_tasks_after_interval_set_by_run_every(
307340
db: None,
308341
run_by_processor: None,

0 commit comments

Comments
 (0)