Skip to content

Commit 56a9375

Browse files
committed
db:Improve import progress reporting
1 parent 86ea3ca commit 56a9375

20 files changed

+255
-190
lines changed

doc/import-system.md

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -163,19 +163,20 @@ The import system is built around several key database tables and concepts:
163163
* If no hard error is found by the current step, it updates `last_completed_priority = step.priority`. The `state` remains `'analysing'` (or advances if this is the last analysis step for the row). The `action` (e.g., 'insert', 'replace', 'update') determined by `analyse_external_idents` (or potentially modified to 'skip' by a prior analysis step that found a hard error) is preserved if no new hard error is found by the current step.
164164
* When a step successfully processes data or resolves a previously reported issue, it should clear *only its specific keys* from the `error` and `invalid_codes` columns (e.g., `dt.error - 'my_error_key'`, `dt.invalid_codes - 'my_code_key'`). This ensures that errors or invalid codes reported by other steps are preserved.
165165
* *Note: The `edit_info` step populates `edit_by_user_id` and `edit_at`. The `analyse_external_idents` step is primarily responsible for determining the initial `operation` and `action` based on identifier lookups and job strategy. It also identifies hard errors related to identifiers, including "unstable identifiers." An unstable identifier error occurs if an input row identifies an existing unit but attempts to *change the value* of one of its existing external identifiers of a specific type; this results in `action = 'skip'`. Adding a new identifier type to an existing unit, or omitting an identifier type (which might imply removal by a `process_` step), are not considered unstable identifier errors by this specific check.*
166-
* Continues processing batches across steps until `max_batches_per_transaction` is reached or no more rows are found for analysis in the current state/priority range.
167-
* If work remains (either batch limit hit or rows still in `analysing` state), the job is rescheduled by the worker system.
168-
* Once all analysis steps for all rows are complete (no rows left in `analysing` state, or all remaining rows are in `error` state with `action = 'skip'`), the job state moves to `processing_data` (or `waiting_for_review` if `job.review=true`). The worker is rescheduled if moving to `processing_data`.
166+
* The function processes one batch per available step in a single transaction. If any batch is found and processed, the function returns `true`, signaling that work was done and the job should be rescheduled to process subsequent batches.
167+
* This continues until a full pass over all analysis steps finds no more rows ready for processing. At this point, the function returns `false`.
168+
* Once the analysis phase is complete (i.e., the function returns `false`), the `import_job_process` procedure transitions the job's state to `processing_data` (or `waiting_for_review` if `job.review=true`). The worker is rescheduled if moving to `processing_data`.
169169
* **Process (`admin.import_job_process_phase('process')`)**:
170170
* Job state is `processing_data`.
171171
* Iterates through `import_step`s (from the `definition_snapshot`'s `import_step_list`) in `priority` order.
172172
* For each step with a `process_procedure`:
173173
* Selects batches of rows from `_data` (identified by their `row_id`) where `state = 'processing'` and `last_completed_priority < step.priority` and crucially `action != 'skip'` using `FOR UPDATE SKIP LOCKED`. This ensures rows marked as error (and thus skip) in the analysis phase are not processed.
174174
* Calls the step's `process_procedure` with the batch of `row_id`s.
175175
* The procedure reads data (including `internal` results, audit info, and `action`), performs the final `INSERT` (for `action='insert'`) or `REPLACE` (for `action='replace'`, using `admin.batch_insert_or_replace_generic_valid_time_table` for temporal data), updates `pk_id` columns, and sets `last_completed_priority = step.priority`. If a `process_procedure` encounters an unrecoverable error for a row (which should be rare if analysis is robust), it should set that row's `state` to `'error'` and `action` to `'skip'`.
176-
* Continues processing batches across steps until `max_batches_per_transaction` is reached or no more rows are found for processing in the current state/priority range.
177-
* If work remains, the job is rescheduled.
178-
* Once all operation steps for all rows are complete (no rows left in `processing` state), the job state moves to `finished`.
176+
* Immediately after a batch is processed by the final step in the sequence, its rows are transitioned from `state = 'processing'` to `state = 'processed'`, ensuring that state transitions are tied directly to the batch they belong to.
177+
* The function processes one batch per available step in a single transaction. If any batch is found and processed, the function returns `true`, indicating the job should be rescheduled to process subsequent batches.
178+
* This continues until a full pass over all processing steps finds no more rows ready for processing.
179+
* Once all operation steps for all rows are complete (the function returns `false`), the `import_job_process` procedure transitions the job's state to `finished`.
179180

180181
## Defining a New Import Type
181182

0 commit comments

Comments
 (0)