Skip to content

Commit 2912d7d

Browse files
committed
[import] Convert import job batching to use multirange for efficiency and smaller parameters
This commit refactors the import job batch processing system to use `int4multirange` instead of `INTEGER[]` for passing batches of row IDs. This is a significant performance optimization that enables the use of GIST indexes for batch lookups.

Key changes include:
- All batch-oriented procedures now accept an `int4multirange` parameter.
- Queries are updated from `... = ANY($1)` to `... <@ $1` to leverage GIST indexes on `row_id`.
- A GIST index is now automatically created on the `row_id` column for new import data tables.
- A GIST index is now automatically created on the `daterange(valid_from, valid_until)` columns for new import data tables.

This refactoring also addresses several critical bugs and regressions:
- Corrects multiple `format()` calls where unnumbered placeholders (`%L`) were incorrectly mixed with `EXECUTE ... USING`, leading to "operator is not unique" errors. All such calls now use numbered placeholders (`%1$L`).
- Fixes a bug in `process_legal_unit` where the wrong variable was passed to a `USING` clause.
- Resolves a syntax error and incorrect placeholder numbering in `analyse_location` and improves its error handling to be idempotent.

Finally, extensive `RAISE DEBUG` logging has been added to all procedures to improve observability and simplify future debugging by logging the dynamic SQL being executed. The enterprise linking logic for establishments has also been moved to its own file for better separation of concerns.

Fixes: "operator is not unique: integer <@ unknown" regression.
Fixes: Syntax errors and incorrect error logic in `analyse_location`.
1 parent ff64b9d commit 2912d7d

24 files changed

+561
-508
lines changed

migrations/20250423000000_add_import_jobs.up.sql

Lines changed: 43 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
-- Migration 20250227115859: add import jobs
22
BEGIN;
33

4+
-- Enable GIST indexes on base types for efficient multirange operations
5+
CREATE EXTENSION IF NOT EXISTS btree_gist;
46

57
-- Create a separate schema for all the import related functions that are used internally
68
-- by the import job system, but not available through the API.
@@ -1449,6 +1451,14 @@ BEGIN
14491451
EXECUTE format($$CREATE INDEX ON public.%1$I (state, last_completed_priority, row_id)$$, job.data_table_name /* %1$I */);
14501452
RAISE DEBUG '[Job %] Added composite index to data table %', job.id, job.data_table_name;
14511453

1454+
-- Add GIST index on row_id for efficient multirange operations (<@)
1455+
EXECUTE format('CREATE INDEX ON public.%I USING GIST (row_id)', job.data_table_name);
1456+
RAISE DEBUG '[Job %] Added GIST index on row_id to data table %', job.id, job.data_table_name;
1457+
1458+
-- Add GIST index on daterange(valid_from, valid_until) for efficient temporal_merge lookups.
1459+
EXECUTE format('CREATE INDEX ON public.%I USING GIST (daterange(valid_from, valid_until, ''[]''))', job.data_table_name);
1460+
RAISE DEBUG '[Job %] Added GIST index on validity daterange to data table %', job.id, job.data_table_name;
1461+
14521462
-- Create indexes on uniquely identifying source_input columns to speed up lookups within analysis steps.
14531463
FOR col_rec IN
14541464
SELECT (x->>'column_name')::TEXT as column_name
@@ -1721,7 +1731,7 @@ $$;
17211731
CREATE OR REPLACE PROCEDURE import.propagate_fatal_error_to_entity_batch(
17221732
p_job_id INT,
17231733
p_data_table_name TEXT,
1724-
p_batch_row_ids INTEGER[],
1734+
p_batch_row_id_ranges int4multirange,
17251735
p_error_keys TEXT[],
17261736
p_step_code TEXT
17271737
)
@@ -1736,13 +1746,13 @@ BEGIN
17361746
EXECUTE format($$
17371747
SELECT array_agg(DISTINCT dt.founding_row_id)
17381748
FROM public.%1$I dt
1739-
WHERE dt.row_id = ANY($1) -- from the current batch
1749+
WHERE dt.row_id <@ $1 -- from the current batch
17401750
AND dt.state = 'error'
17411751
AND dt.founding_row_id IS NOT NULL
17421752
AND (dt.errors ?| %2$L::text[]); -- and the error is from this step
17431753
$$, p_data_table_name, p_error_keys)
17441754
INTO v_failed_entity_founding_rows
1745-
USING p_batch_row_ids;
1755+
USING p_batch_row_id_ranges;
17461756

17471757
IF array_length(v_failed_entity_founding_rows, 1) > 0 THEN
17481758
RAISE DEBUG '[Job %] %s: Propagating errors for % failed entities.', p_job_id, p_step_code, array_length(v_failed_entity_founding_rows, 1);
@@ -2048,7 +2058,7 @@ $procedure$;
20482058
-- Helper procedure to process a single batch through all processing steps
20492059
CREATE PROCEDURE admin.import_job_process_batch(
20502060
job public.import_job,
2051-
batch_row_ids INTEGER[]
2061+
batch_row_id_ranges int4multirange
20522062
)
20532063
LANGUAGE plpgsql AS $import_job_process_batch$
20542064
DECLARE
@@ -2058,18 +2068,18 @@ DECLARE
20582068
error_message TEXT;
20592069
v_should_disable_triggers BOOLEAN;
20602070
BEGIN
2061-
RAISE DEBUG '[Job %] Processing batch of % rows through all process steps.', job.id, array_length(batch_row_ids, 1);
2071+
RAISE DEBUG '[Job %] Processing batch with ranges %s through all process steps.', job.id, batch_row_id_ranges::text;
20622072
targets := job.definition_snapshot->'import_step_list';
20632073

20642074
-- Check if the batch contains any operations that are not simple inserts.
20652075
-- If so, we need to disable FK triggers to allow for temporary inconsistencies.
20662076
EXECUTE format(
2067-
'SELECT EXISTS(SELECT 1 FROM public.%I WHERE row_id = ANY($1) AND operation IS DISTINCT FROM %L)',
2077+
'SELECT EXISTS(SELECT 1 FROM public.%I WHERE row_id <@ $1 AND operation IS DISTINCT FROM %L)',
20682078
job.data_table_name,
20692079
'insert'
20702080
)
20712081
INTO v_should_disable_triggers
2072-
USING batch_row_ids;
2082+
USING batch_row_id_ranges;
20732083

20742084
IF v_should_disable_triggers THEN
20752085
RAISE DEBUG '[Job %] Batch contains updates/replaces. Disabling FK triggers.', job.id;
@@ -2088,7 +2098,7 @@ BEGIN
20882098
RAISE DEBUG '[Job %] Batch processing: Calling % for step %', job.id, proc_to_call, target_rec.code;
20892099

20902100
-- Since this is one transaction, any error will roll back the entire batch.
2091-
EXECUTE format('CALL %s($1, $2, $3)', proc_to_call) USING job.id, batch_row_ids, target_rec.code;
2101+
EXECUTE format('CALL %s($1, $2, $3)', proc_to_call) USING job.id, batch_row_id_ranges, target_rec.code;
20922102
END LOOP;
20932103

20942104
-- Re-enable triggers if they were disabled.
@@ -2142,7 +2152,7 @@ DECLARE
21422152
v_steps JSONB;
21432153
v_step_rec RECORD;
21442154
v_proc_to_call REGPROC;
2145-
v_batch_row_ids INTEGER[];
2155+
v_batch_row_id_ranges int4multirange;
21462156
v_error_message TEXT;
21472157
v_rows_exist BOOLEAN;
21482158
v_rows_processed INT;
@@ -2179,7 +2189,7 @@ BEGIN
21792189

21802190
IF v_rows_exist THEN
21812191
RAISE DEBUG '[Job %] Executing HOLISTIC step % (priority %)', job.id, v_step_rec.code, v_step_rec.priority;
2182-
EXECUTE format('CALL %s($1, $2, $3)', v_proc_to_call) USING job.id, NULL::INTEGER[], v_step_rec.code;
2192+
EXECUTE format('CALL %s($1, $2, $3)', v_proc_to_call) USING job.id, NULL::int4multirange, v_step_rec.code;
21832193
v_rows_processed := 1; -- Mark as having done work.
21842194
END IF;
21852195
ELSE
@@ -2193,27 +2203,27 @@ BEGIN
21932203
WHERE state = %2$L AND last_completed_priority < %3$L
21942204
ORDER BY entity_root_id
21952205
LIMIT %4$L
2196-
)
2197-
SELECT array_agg(t.row_id)
2198-
FROM (
2206+
),
2207+
batch_rows AS (
21992208
SELECT dt.row_id
22002209
FROM public.%1$I dt
22012210
JOIN entity_batch eb ON COALESCE(dt.founding_row_id, dt.row_id) = eb.entity_root_id
22022211
WHERE dt.state = %2$L AND dt.last_completed_priority < %3$L
22032212
ORDER BY dt.row_id
22042213
FOR UPDATE SKIP LOCKED
2205-
) t
2214+
)
2215+
SELECT public.array_to_int4multirange(array_agg(row_id)) FROM batch_rows
22062216
$$,
22072217
job.data_table_name, /* %1$I */
22082218
v_current_phase_data_state, /* %2$L */
22092219
v_step_rec.priority, /* %3$L */
22102220
job.analysis_batch_size /* %4$L */
2211-
) INTO v_batch_row_ids;
2221+
) INTO v_batch_row_id_ranges;
22122222

2213-
IF v_batch_row_ids IS NOT NULL AND array_length(v_batch_row_ids, 1) > 0 THEN
2214-
RAISE DEBUG '[Job %] Executing BATCHED step % (priority %), found % rows.', job.id, v_step_rec.code, v_step_rec.priority, array_length(v_batch_row_ids, 1);
2215-
EXECUTE format('CALL %s($1, $2, $3)', v_proc_to_call) USING job.id, v_batch_row_ids, v_step_rec.code;
2216-
v_rows_processed := array_length(v_batch_row_ids, 1);
2223+
IF v_batch_row_id_ranges IS NOT NULL AND NOT isempty(v_batch_row_id_ranges) THEN
2224+
RAISE DEBUG '[Job %] Executing BATCHED step % (priority %), found ranges: %s.', job.id, v_step_rec.code, v_step_rec.priority, v_batch_row_id_ranges::text;
2225+
EXECUTE format('CALL %s($1, $2, $3)', v_proc_to_call) USING job.id, v_batch_row_id_ranges, v_step_rec.code;
2226+
v_rows_processed := (SELECT count(*) FROM unnest(v_batch_row_id_ranges));
22172227
END IF;
22182228
END IF;
22192229
EXCEPTION WHEN OTHERS THEN
@@ -2269,7 +2279,7 @@ CREATE FUNCTION admin.import_job_processing_phase(
22692279
) RETURNS BOOLEAN -- Returns TRUE if work was done and rescheduling is needed
22702280
LANGUAGE plpgsql AS $import_job_processing_phase$
22712281
DECLARE
2272-
v_batch_row_ids INTEGER[];
2282+
v_batch_row_id_ranges int4multirange;
22732283
BEGIN
22742284
RAISE DEBUG '[Job %] Processing phase: checking for a batch.', job.id;
22752285

@@ -2282,29 +2292,29 @@ BEGIN
22822292
WHERE state = 'processing' AND action = 'use'
22832293
ORDER BY entity_root_id
22842294
LIMIT %2$L
2285-
)
2286-
SELECT array_agg(t.row_id)
2287-
FROM (
2295+
),
2296+
batch_rows AS (
22882297
SELECT dt.row_id
22892298
FROM public.%1$I dt
22902299
JOIN entity_batch eb ON COALESCE(dt.founding_row_id, dt.row_id) = eb.entity_root_id
22912300
WHERE dt.state = 'processing' AND dt.action = 'use'
22922301
ORDER BY dt.row_id
22932302
FOR UPDATE SKIP LOCKED
2294-
) t
2303+
)
2304+
SELECT public.array_to_int4multirange(array_agg(row_id)) FROM batch_rows
22952305
$$,
22962306
job.data_table_name, /* %1$I */
22972307
job.processing_batch_size /* %2$L */
2298-
) INTO v_batch_row_ids;
2308+
) INTO v_batch_row_id_ranges;
22992309

2300-
IF v_batch_row_ids IS NOT NULL AND array_length(v_batch_row_ids, 1) > 0 THEN
2301-
RAISE DEBUG '[Job %] Found batch of % rows to process.', job.id, array_length(v_batch_row_ids, 1);
2310+
IF v_batch_row_id_ranges IS NOT NULL AND NOT isempty(v_batch_row_id_ranges) THEN
2311+
RAISE DEBUG '[Job %] Found batch of ranges to process: %s.', job.id, v_batch_row_id_ranges::text;
23022312
BEGIN
2303-
CALL admin.import_job_process_batch(job, v_batch_row_ids);
2313+
CALL admin.import_job_process_batch(job, v_batch_row_id_ranges);
23042314

2305-
EXECUTE format($$UPDATE public.%1$I SET state = 'processed' WHERE row_id = ANY($1)$$,
2306-
job.data_table_name /* %1$I */) USING v_batch_row_ids;
2307-
RAISE DEBUG '[Job %] Batch successfully processed. Marked % rows as processed.', job.id, array_length(v_batch_row_ids, 1);
2315+
EXECUTE format($$UPDATE public.%1$I SET state = 'processed' WHERE row_id <@ $1$$,
2316+
job.data_table_name /* %1$I */) USING v_batch_row_id_ranges;
2317+
RAISE DEBUG '[Job %] Batch successfully processed. Marked rows in ranges %s as processed.', job.id, v_batch_row_id_ranges::text;
23082318
EXCEPTION WHEN OTHERS THEN
23092319
DECLARE
23102320
error_message TEXT;
@@ -2313,8 +2323,8 @@ BEGIN
23132323
GET STACKED DIAGNOSTICS error_message = MESSAGE_TEXT,
23142324
error_context = PG_EXCEPTION_CONTEXT;
23152325
RAISE WARNING '[Job %] Error processing batch: %. Context: %. Marking batch rows as error and failing job.', job.id, error_message, error_context;
2316-
EXECUTE format($$UPDATE public.%1$I SET state = 'error', errors = COALESCE(errors, '{}'::jsonb) || %2$L WHERE row_id = ANY($1)$$,
2317-
job.data_table_name /* %1$I */, jsonb_build_object('process_batch_error', error_message, 'context', error_context) /* %2$L */) USING v_batch_row_ids;
2326+
EXECUTE format($$UPDATE public.%1$I SET state = 'error', errors = COALESCE(errors, '{}'::jsonb) || %2$L WHERE row_id <@ $1$$,
2327+
job.data_table_name /* %1$I */, jsonb_build_object('process_batch_error', error_message, 'context', error_context) /* %2$L */) USING v_batch_row_id_ranges;
23182328
UPDATE public.import_job SET error = jsonb_build_object('error_in_processing_batch', error_message, 'context', error_context), state = 'finished' WHERE id = job.id;
23192329
-- On error, do not reschedule.
23202330
RETURN FALSE;

0 commit comments

Comments
 (0)