Skip to content

Commit fa70414

Browse files
jhfclaude
andcommitted
fix: Prevent stale pipeline_progress rows for child tasks
Step A in process_tasks inserted pipeline_progress rows for ALL analytics tasks, including children. Concurrent child cleanup (step D) has a race condition where each sibling sees others as still 'processing', so none deletes the orphaned row. Guard step A to top-level tasks only — child progress is already tracked via parent's total/completed counters. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent fc6a2e4 commit fa70414

File tree

3 files changed

+808
-29
lines changed

3 files changed

+808
-29
lines changed
Lines changed: 379 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,379 @@
1+
-- Down Migration 20260224131241: fix_stale_pipeline_progress_for_child_tasks
2+
-- Restores original process_tasks that inserts pipeline_progress for all analytics tasks
3+
BEGIN;
4+
5+
CREATE OR REPLACE PROCEDURE worker.process_tasks(IN p_batch_size integer DEFAULT NULL::integer, IN p_max_runtime_ms integer DEFAULT NULL::integer, IN p_queue text DEFAULT NULL::text, IN p_max_priority bigint DEFAULT NULL::bigint, IN p_mode worker.process_mode DEFAULT NULL::worker.process_mode)
6+
LANGUAGE plpgsql
7+
AS $procedure$
8+
DECLARE
9+
task_record RECORD;
10+
start_time TIMESTAMPTZ;
11+
batch_start_time TIMESTAMPTZ;
12+
elapsed_ms NUMERIC;
13+
processed_count INT := 0;
14+
v_inside_transaction BOOLEAN;
15+
v_waiting_parent_id BIGINT;
16+
-- Retry-on-deadlock configuration
17+
v_max_retries CONSTANT INT := 3;
18+
v_retry_count INT;
19+
v_backoff_base_ms CONSTANT NUMERIC := 100; -- 100ms base backoff
20+
BEGIN
21+
-- Check if we're inside a transaction
22+
SELECT pg_current_xact_id_if_assigned() IS NOT NULL INTO v_inside_transaction;
23+
RAISE DEBUG 'Running worker.process_tasks inside transaction: %, queue: %, mode: %', v_inside_transaction, p_queue, COALESCE(p_mode::text, 'NULL');
24+
25+
batch_start_time := clock_timestamp();
26+
27+
-- Process tasks in a loop until we hit limits or run out of tasks
28+
LOOP
29+
-- Check for time limit
30+
IF p_max_runtime_ms IS NOT NULL AND
31+
EXTRACT(EPOCH FROM (clock_timestamp() - batch_start_time)) * 1000 > p_max_runtime_ms THEN
32+
RAISE DEBUG 'Exiting worker loop: Time limit of % ms reached', p_max_runtime_ms;
33+
EXIT;
34+
END IF;
35+
36+
-- STRUCTURED CONCURRENCY: Check if there's a waiting parent task
37+
-- If so, we're in concurrent mode and should pick its children
38+
SELECT t.id INTO v_waiting_parent_id
39+
FROM worker.tasks AS t
40+
JOIN worker.command_registry AS cr ON t.command = cr.command
41+
WHERE t.state = 'waiting'::worker.task_state
42+
AND (p_queue IS NULL OR cr.queue = p_queue)
43+
ORDER BY t.priority, t.id
44+
LIMIT 1;
45+
46+
-- MODE-SPECIFIC BEHAVIOR
47+
IF p_mode = 'top' THEN
48+
-- TOP MODE: Only process top-level tasks
49+
-- If a waiting parent exists, children need processing first - return immediately
50+
IF v_waiting_parent_id IS NOT NULL THEN
51+
RAISE DEBUG 'Top mode: waiting parent % exists, returning to let children process', v_waiting_parent_id;
52+
EXIT;
53+
END IF;
54+
55+
-- Pick a top-level pending task (parent_id IS NULL)
56+
RAISE DEBUG 'Top mode: picking top-level task';
57+
58+
SELECT t.*, cr.handler_procedure, cr.before_procedure, cr.after_procedure, cr.queue
59+
INTO task_record
60+
FROM worker.tasks AS t
61+
JOIN worker.command_registry AS cr ON t.command = cr.command
62+
WHERE t.state = 'pending'::worker.task_state
63+
AND t.parent_id IS NULL
64+
AND (t.scheduled_at IS NULL OR t.scheduled_at <= clock_timestamp())
65+
AND (p_queue IS NULL OR cr.queue = p_queue)
66+
AND (p_max_priority IS NULL OR t.priority <= p_max_priority)
67+
ORDER BY
68+
CASE WHEN t.scheduled_at IS NULL THEN 0 ELSE 1 END,
69+
t.scheduled_at,
70+
t.priority ASC NULLS LAST,
71+
t.id
72+
LIMIT 1
73+
FOR UPDATE OF t SKIP LOCKED;
74+
75+
ELSIF p_mode = 'child' THEN
76+
-- CHILD MODE: Only process children of waiting parents
77+
-- If no waiting parent exists, there's nothing for children to do - return immediately
78+
IF v_waiting_parent_id IS NULL THEN
79+
RAISE DEBUG 'Child mode: no waiting parent, returning';
80+
EXIT;
81+
END IF;
82+
83+
-- Pick a pending child of the waiting parent
84+
RAISE DEBUG 'Child mode: picking child of waiting parent %', v_waiting_parent_id;
85+
86+
SELECT t.*, cr.handler_procedure, cr.before_procedure, cr.after_procedure, cr.queue
87+
INTO task_record
88+
FROM worker.tasks AS t
89+
JOIN worker.command_registry AS cr ON t.command = cr.command
90+
WHERE t.state = 'pending'::worker.task_state
91+
AND t.parent_id = v_waiting_parent_id
92+
AND (t.scheduled_at IS NULL OR t.scheduled_at <= clock_timestamp())
93+
AND (p_max_priority IS NULL OR t.priority <= p_max_priority)
94+
ORDER BY t.priority ASC NULLS LAST, t.id
95+
LIMIT 1
96+
FOR UPDATE OF t SKIP LOCKED;
97+
98+
ELSE
99+
-- NULL MODE (backward compatible): Original behavior
100+
-- Pick children if waiting parent exists, otherwise top-level task
101+
IF v_waiting_parent_id IS NOT NULL THEN
102+
-- CONCURRENT MODE: Pick a pending child of the waiting parent
103+
RAISE DEBUG 'Concurrent mode: picking child of waiting parent %', v_waiting_parent_id;
104+
105+
SELECT t.*, cr.handler_procedure, cr.before_procedure, cr.after_procedure, cr.queue
106+
INTO task_record
107+
FROM worker.tasks AS t
108+
JOIN worker.command_registry AS cr ON t.command = cr.command
109+
WHERE t.state = 'pending'::worker.task_state
110+
AND t.parent_id = v_waiting_parent_id
111+
AND (t.scheduled_at IS NULL OR t.scheduled_at <= clock_timestamp())
112+
AND (p_max_priority IS NULL OR t.priority <= p_max_priority)
113+
ORDER BY t.priority ASC NULLS LAST, t.id
114+
LIMIT 1
115+
FOR UPDATE OF t SKIP LOCKED;
116+
ELSE
117+
-- SERIAL MODE: Pick a top-level pending task (parent_id IS NULL)
118+
RAISE DEBUG 'Serial mode: picking top-level task';
119+
120+
SELECT t.*, cr.handler_procedure, cr.before_procedure, cr.after_procedure, cr.queue
121+
INTO task_record
122+
FROM worker.tasks AS t
123+
JOIN worker.command_registry AS cr ON t.command = cr.command
124+
WHERE t.state = 'pending'::worker.task_state
125+
AND t.parent_id IS NULL
126+
AND (t.scheduled_at IS NULL OR t.scheduled_at <= clock_timestamp())
127+
AND (p_queue IS NULL OR cr.queue = p_queue)
128+
AND (p_max_priority IS NULL OR t.priority <= p_max_priority)
129+
ORDER BY
130+
CASE WHEN t.scheduled_at IS NULL THEN 0 ELSE 1 END,
131+
t.scheduled_at,
132+
t.priority ASC NULLS LAST,
133+
t.id
134+
LIMIT 1
135+
FOR UPDATE OF t SKIP LOCKED;
136+
END IF;
137+
END IF;
138+
139+
-- Exit if no more tasks
140+
IF NOT FOUND THEN
141+
RAISE DEBUG 'Exiting worker loop: No more pending tasks found';
142+
EXIT;
143+
END IF;
144+
145+
-- Process the task
146+
start_time := clock_timestamp();
147+
148+
-- Mark as processing and record the current backend PID
149+
UPDATE worker.tasks AS t
150+
SET state = 'processing'::worker.task_state,
151+
worker_pid = pg_backend_pid()
152+
WHERE t.id = task_record.id;
153+
154+
-- PIPELINE PROGRESS (A): Track task start for analytics queue
155+
IF task_record.queue = 'analytics' THEN
156+
INSERT INTO worker.pipeline_progress (step, total, completed, updated_at)
157+
VALUES (task_record.command, 0, 0, clock_timestamp())
158+
ON CONFLICT (step) DO NOTHING;
159+
END IF;
160+
161+
-- Call before_procedure if defined
162+
IF task_record.before_procedure IS NOT NULL THEN
163+
BEGIN
164+
RAISE DEBUG 'Calling before_procedure: % for task % (%)', task_record.before_procedure, task_record.id, task_record.command;
165+
EXECUTE format('CALL %s()', task_record.before_procedure);
166+
EXCEPTION WHEN OTHERS THEN
167+
RAISE WARNING 'Error in before_procedure % for task %: %', task_record.before_procedure, task_record.id, SQLERRM;
168+
END;
169+
END IF;
170+
171+
-- Commit to see state change (only if not in a test transaction)
172+
IF NOT v_inside_transaction THEN
173+
COMMIT;
174+
END IF;
175+
176+
DECLARE
177+
v_state worker.task_state;
178+
v_processed_at TIMESTAMPTZ;
179+
v_completed_at TIMESTAMPTZ;
180+
v_duration_ms NUMERIC;
181+
v_error TEXT DEFAULT NULL;
182+
v_has_children BOOLEAN;
183+
v_child_count INT;
184+
BEGIN
185+
-- Initialize retry counter for this task
186+
v_retry_count := 0;
187+
188+
-- RETRY LOOP for handling transient errors (deadlocks, serialization failures)
189+
<<retry_loop>>
190+
LOOP
191+
DECLARE
192+
v_message_text TEXT;
193+
v_pg_exception_detail TEXT;
194+
v_pg_exception_hint TEXT;
195+
v_pg_exception_context TEXT;
196+
BEGIN
197+
-- Execute the handler procedure
198+
IF task_record.handler_procedure IS NOT NULL THEN
199+
EXECUTE format('CALL %s($1)', task_record.handler_procedure)
200+
USING task_record.payload;
201+
ELSE
202+
RAISE EXCEPTION 'No handler procedure found for command: %', task_record.command;
203+
END IF;
204+
205+
-- Handler completed successfully - exit retry loop
206+
elapsed_ms := EXTRACT(EPOCH FROM (clock_timestamp() - start_time)) * 1000;
207+
v_processed_at := clock_timestamp();
208+
v_duration_ms := elapsed_ms;
209+
210+
-- STRUCTURED CONCURRENCY: Check if handler spawned children
211+
SELECT EXISTS (
212+
SELECT 1 FROM worker.tasks WHERE parent_id = task_record.id
213+
) INTO v_has_children;
214+
215+
IF v_has_children THEN
216+
-- Task spawned children: go to 'waiting' state
217+
v_state := 'waiting'::worker.task_state;
218+
v_completed_at := NULL; -- Not completed yet, waiting for children
219+
220+
-- PIPELINE PROGRESS (B): Parent going to waiting, track child count
221+
IF task_record.queue = 'analytics' THEN
222+
SELECT count(*)::int INTO v_child_count
223+
FROM worker.tasks WHERE parent_id = task_record.id;
224+
225+
UPDATE worker.pipeline_progress
226+
SET total = total + v_child_count,
227+
updated_at = clock_timestamp()
228+
WHERE step = task_record.command;
229+
END IF;
230+
231+
RAISE DEBUG 'Task % (%) spawned % children, entering waiting state', task_record.id, task_record.command, v_child_count;
232+
ELSE
233+
-- No children: task is completed
234+
v_state := 'completed'::worker.task_state;
235+
v_completed_at := clock_timestamp();
236+
RAISE DEBUG 'Task % (%) completed in % ms', task_record.id, task_record.command, elapsed_ms;
237+
END IF;
238+
239+
EXIT retry_loop; -- Success, exit the retry loop
240+
241+
EXCEPTION
242+
WHEN deadlock_detected THEN
243+
-- DEADLOCK: Retry with exponential backoff
244+
v_retry_count := v_retry_count + 1;
245+
IF v_retry_count <= v_max_retries THEN
246+
RAISE WARNING 'Task % (%) deadlock detected, retry %/% after % ms',
247+
task_record.id, task_record.command, v_retry_count, v_max_retries,
248+
round(v_backoff_base_ms * power(2, v_retry_count - 1));
249+
-- Exponential backoff with jitter
250+
PERFORM pg_sleep((v_backoff_base_ms * power(2, v_retry_count - 1) + (random() * 50)) / 1000.0);
251+
CONTINUE retry_loop; -- Retry the task
252+
ELSE
253+
-- Max retries exceeded - fall through to error handling
254+
RAISE WARNING 'Task % (%) max retries (%) exceeded for deadlock',
255+
task_record.id, task_record.command, v_max_retries;
256+
END IF;
257+
258+
-- Capture error details for failed task
259+
elapsed_ms := EXTRACT(EPOCH FROM (clock_timestamp() - start_time)) * 1000;
260+
v_state := 'failed'::worker.task_state;
261+
v_processed_at := clock_timestamp();
262+
v_completed_at := clock_timestamp();
263+
v_duration_ms := elapsed_ms;
264+
v_error := format('Deadlock detected after %s retries', v_retry_count);
265+
EXIT retry_loop;
266+
267+
WHEN serialization_failure THEN
268+
-- SERIALIZATION FAILURE: Retry with exponential backoff
269+
v_retry_count := v_retry_count + 1;
270+
IF v_retry_count <= v_max_retries THEN
271+
RAISE WARNING 'Task % (%) serialization failure, retry %/% after % ms',
272+
task_record.id, task_record.command, v_retry_count, v_max_retries,
273+
round(v_backoff_base_ms * power(2, v_retry_count - 1));
274+
PERFORM pg_sleep((v_backoff_base_ms * power(2, v_retry_count - 1) + (random() * 50)) / 1000.0);
275+
CONTINUE retry_loop;
276+
ELSE
277+
RAISE WARNING 'Task % (%) max retries (%) exceeded for serialization failure',
278+
task_record.id, task_record.command, v_max_retries;
279+
END IF;
280+
281+
elapsed_ms := EXTRACT(EPOCH FROM (clock_timestamp() - start_time)) * 1000;
282+
v_state := 'failed'::worker.task_state;
283+
v_processed_at := clock_timestamp();
284+
v_completed_at := clock_timestamp();
285+
v_duration_ms := elapsed_ms;
286+
v_error := format('Serialization failure after %s retries', v_retry_count);
287+
EXIT retry_loop;
288+
289+
WHEN OTHERS THEN
290+
-- OTHER ERRORS: Don't retry, fail immediately
291+
elapsed_ms := EXTRACT(EPOCH FROM (clock_timestamp() - start_time)) * 1000;
292+
v_state := 'failed'::worker.task_state;
293+
v_processed_at := clock_timestamp();
294+
v_completed_at := clock_timestamp();
295+
v_duration_ms := elapsed_ms;
296+
297+
GET STACKED DIAGNOSTICS
298+
v_message_text = MESSAGE_TEXT,
299+
v_pg_exception_detail = PG_EXCEPTION_DETAIL,
300+
v_pg_exception_hint = PG_EXCEPTION_HINT,
301+
v_pg_exception_context = PG_EXCEPTION_CONTEXT;
302+
303+
v_error := format(
304+
'Error: %s%sContext: %s%sDetail: %s%sHint: %s',
305+
v_message_text, E'\n',
306+
v_pg_exception_context, E'\n',
307+
COALESCE(v_pg_exception_detail, ''), E'\n',
308+
COALESCE(v_pg_exception_hint, '')
309+
);
310+
311+
RAISE WARNING 'Task % (%) failed in % ms: %', task_record.id, task_record.command, elapsed_ms, v_error;
312+
EXIT retry_loop;
313+
END;
314+
END LOOP retry_loop;
315+
316+
-- Update the task with results
317+
UPDATE worker.tasks AS t
318+
SET state = v_state,
319+
processed_at = v_processed_at,
320+
completed_at = v_completed_at,
321+
duration_ms = v_duration_ms,
322+
error = v_error
323+
WHERE t.id = task_record.id;
324+
325+
-- PIPELINE PROGRESS (D): Clean up completed leaf tasks
326+
IF task_record.queue = 'analytics' AND v_state IN ('completed', 'failed') AND NOT v_has_children THEN
327+
DELETE FROM worker.pipeline_progress
328+
WHERE step = task_record.command
329+
AND NOT EXISTS (
330+
SELECT 1 FROM worker.tasks
331+
WHERE command = task_record.command
332+
AND state IN ('pending', 'processing', 'waiting')
333+
AND id != task_record.id
334+
);
335+
END IF;
336+
337+
-- STRUCTURED CONCURRENCY: For test transactions, check parent inline
338+
-- (all changes are visible within the same transaction)
339+
IF v_inside_transaction AND task_record.parent_id IS NOT NULL AND v_state IN ('completed', 'failed') THEN
340+
PERFORM worker.complete_parent_if_ready(task_record.id);
341+
END IF;
342+
343+
-- Call after_procedure if defined
344+
IF task_record.after_procedure IS NOT NULL THEN
345+
BEGIN
346+
RAISE DEBUG 'Calling after_procedure: % for task % (%)', task_record.after_procedure, task_record.id, task_record.command;
347+
EXECUTE format('CALL %s()', task_record.after_procedure);
348+
EXCEPTION WHEN OTHERS THEN
349+
RAISE WARNING 'Error in after_procedure % for task %: %', task_record.after_procedure, task_record.id, SQLERRM;
350+
END;
351+
END IF;
352+
353+
-- Commit if not in transaction
354+
IF NOT v_inside_transaction THEN
355+
COMMIT;
356+
END IF;
357+
358+
-- STRUCTURED CONCURRENCY: Post-commit parent check (RACE CONDITION FIX)
359+
-- After COMMIT, a new READ COMMITTED snapshot sees all sibling completions.
360+
-- The last fiber to reach this point will see all siblings as completed and
361+
-- will complete the parent. If two fibers reach it simultaneously, both try
362+
-- UPDATE ... WHERE state = 'waiting' — one succeeds, the other matches 0 rows.
363+
IF NOT v_inside_transaction AND task_record.parent_id IS NOT NULL AND v_state IN ('completed', 'failed') THEN
364+
PERFORM worker.complete_parent_if_ready(task_record.id);
365+
COMMIT;
366+
END IF;
367+
END;
368+
369+
-- Increment processed count and check batch limit
370+
processed_count := processed_count + 1;
371+
IF p_batch_size IS NOT NULL AND processed_count >= p_batch_size THEN
372+
RAISE DEBUG 'Exiting worker loop: Batch size limit of % reached', p_batch_size;
373+
EXIT;
374+
END IF;
375+
END LOOP;
376+
END;
377+
$procedure$;
378+
379+
END;

0 commit comments

Comments
 (0)