
Commit dcb9e5e

jhfclaude committed
fix: Remove TRUNCATE staging race in derive_statistical_unit
Remove the TRUNCATE at the start of derive_statistical_unit that could destroy staged data from a previous cycle's batch children if a new collect_changes -> derive_statistical_unit cycle started before flush_staging completed. Also remove dead code: v_uncle_priority (allocated via nextval but never used).

The TRUNCATE is unnecessary because:

1. Batch children DELETE before INSERT for their unit_type/unit_id
2. flush_staging TRUNCATEs at the end, after merging
3. UNLOGGED tables auto-truncate on unclean shutdown

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent e51401f commit dcb9e5e

File tree

3 files changed: +461 −31 lines


doc/derive-pipeline.md

Lines changed: 68 additions & 31 deletions
````diff
@@ -74,7 +74,7 @@ group. The queues are fully independent — work on one queue never blocks anoth
 ```
 
 **Import queue** (1 fiber): Processes `import_job_process` one at a time.
-When an import modifies data, it triggers `check_table` on the analytics queue.
+When an import modifies data, it triggers `collect_changes` on the analytics queue.
 
 **Analytics queue** (4 fibers = 1 top + 3 child): Derives all statistical
 tables. The top fiber runs each pipeline stage sequentially; when a stage
````
```diff
@@ -102,9 +102,9 @@ all downstream tables. Each box below is a **top-level task** — they run
                    │ lifecycle trigger
 
 ┌─────────────────────────────────────────────┐
-① │ check_table                               │
-  │ Detects changed legal_unit / establishment│
-  │ rows. Enqueues derive_statistical_unit.   │
+① │ collect_changes                           │
+  │ Drains base_change_log accumulator and    │
+  │ enqueues derive_statistical_unit.         │
 └──────────────────┬──────────────────────────┘
                    │ (strictly sequential — next task)
 
```
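To make the renamed first stage concrete, the drain-and-enqueue pattern it describes can be sketched in a few lines. This is an illustrative Python model, not the real implementation: the deques, the lock, and `record_change` are stand-ins for the `base_change_log` accumulator, its trigger, and the analytics task queue.

```python
import threading
from collections import deque

change_log = deque()   # stand-in for the base_change_log accumulator
task_queue = deque()   # stand-in for the analytics queue
log_lock = threading.Lock()

def record_change(unit_type, unit_id):
    """Trigger side: data modifications append to the accumulator."""
    with log_lock:
        change_log.append((unit_type, unit_id))

def collect_changes():
    """Atomically take-and-clear the accumulator, then enqueue a single
    derive_statistical_unit task covering the drained changes."""
    with log_lock:
        drained = list(change_log)
        change_log.clear()
    if drained:
        task_queue.append(("derive_statistical_unit", drained))
    return drained

record_change("legal_unit", 42)
record_change("establishment", 7)
drained = collect_changes()
assert drained == [("legal_unit", 42), ("establishment", 7)]
assert collect_changes() == []   # nothing accumulated since the drain
assert len(task_queue) == 1
```

The key property is the take-and-clear under one lock: changes recorded during a derive cycle land in the accumulator and are picked up by the next `collect_changes`, never lost and never double-processed.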
```diff
@@ -160,13 +160,25 @@ all downstream tables. Each box below is a **top-level task** — they run
 
 
 ┌─────────────────────────────────────────────┐
-⑥ │ derive_statistical_unit_facet  MONOLITHIC │
+⑥ │ derive_statistical_unit_facet      PARENT │
   │                                           │
-  │ Aggregates statistical_unit into facets   │
-  │ for the drilldown UI. Runs as a single    │
-  │ operation (see "Why monolithic?" below).  │
+  │ Spawns partition children (parallel):     │
+  │ ┌──────────┐ ┌──────────┐ ┌──────────┐    │
+  │ │ part 0   │ │ part 1   │ │ part N   │... │
+  │ └──────────┘ └──────────┘ └──────────┘    │
+  │ derive_statistical_unit_facet_partition   │
   │                                           │
+  │ Enqueues → statistical_unit_facet_reduce  │
   │ Enqueues → derive_statistical_history_facet│
+  │                                           │
+  │ Parent "waiting" → children → done        │
+└──────────────────┬──────────────────────────┘
+
+
+┌─────────────────────────────────────────────┐
+⑥b │ statistical_unit_facet_reduce    SERIAL │
+  │ Merges partition staging data into        │
+  │ main statistical_unit_facet table.        │
 └──────────────────┬──────────────────────────┘
 
 
```
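The parent/children/reduce shape in the box above can be sketched with plain threads. This is a hedged illustration of the control flow only (the real system uses fibers and a task queue; the function and result names here are stand-ins): the parent spawns partition children in parallel, waits for all of them, and only then hands off to the serial reduce step.

```python
import threading

results = {}
results_lock = threading.Lock()

def facet_partition_child(part_no):
    """Stand-in for derive_statistical_unit_facet_partition: compute a
    partial aggregation for one partition."""
    with results_lock:
        results[part_no] = f"partial-{part_no}"

def derive_statistical_unit_facet(n_parts):
    """Parent task: spawn partition children in parallel, wait until all
    are done, then enqueue the serial reduce step."""
    children = [threading.Thread(target=facet_partition_child, args=(p,))
                for p in range(n_parts)]
    for c in children:
        c.start()
    for c in children:   # parent sits "waiting" until every child joins
        c.join()
    # Only reached after all children finished: structured concurrency.
    return "statistical_unit_facet_reduce"

next_task = derive_statistical_unit_facet(3)
assert next_task == "statistical_unit_facet_reduce"
assert set(results) == {0, 1, 2}
```

Because the reduce task is produced only after every join returns, concurrency stays scoped inside the parent, matching the "Parent waiting → children → done" lifecycle in the diagram.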
```diff
@@ -195,15 +207,36 @@ all children finish. This is structured concurrency — concurrency is scoped
 inside the parent, never between top-level tasks.
 
 
-## Why derive_statistical_unit_facet Is Monolithic
+## Staging Pattern and Race Safety
+
+Step ② (`derive_statistical_unit`) writes to an UNLOGGED staging table
+(`statistical_unit_staging`) via batch children, then step ③
+(`statistical_unit_flush_staging`) merges staging into the main table and
+TRUNCATEs staging.
+
+**Why there is no TRUNCATE at the start of derive_statistical_unit:**
+An earlier version TRUNCATEd staging at the start of each derive cycle to
+"clean up interrupted runs." This created a latent race: if `collect_changes`
+enqueued a new `derive_statistical_unit` before the previous cycle's
+`flush_staging` ran, the TRUNCATE would destroy all staged data. The race was
+nearly triggered in concurrent testing (priority gaps as small as 2 sequence
+values). The TRUNCATE was removed because:
+
+1. Batch children already do `DELETE FROM staging WHERE unit_type/unit_id`
+   before inserting — stale data from a previous cycle gets overwritten
+2. `flush_staging` TRUNCATEs at the end after merging staging → main
+3. UNLOGGED tables auto-truncate on unclean shutdown (PostgreSQL guarantee)
+
+
+## Partitioning derive_statistical_unit_facet
 
 The three derived tables have different data models:
 
 | Table                      | Keyed by                               | Natural partition |
 |----------------------------|----------------------------------------|-------------------|
 | `statistical_history`      | `(resolution, year, month, unit_type)` | period            |
 | `statistical_history_facet`| `(resolution, year, month, ...dims)`   | period            |
-| `statistical_unit_facet`   | `(valid_from, valid_until, ...dims)`   | date range        |
+| `statistical_unit_facet`   | `(valid_from, valid_until, ...dims)`   | `(unit_type, unit_id)` |
 
 `statistical_history` and `statistical_history_facet` use period-based keys
 (`year`, `month`), so each period child touches **disjoint rows** — perfect
```
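The race-safety argument added in this hunk can be checked on a toy model. The Python below is a deliberately simplified stand-in (dicts for the staging and main tables, strings for derived rows; none of these names are the real schema): batch children DELETE-then-INSERT only their own keys, and flush merges then clears staging at the end, so a row left over from an interrupted cycle is either overwritten or merged, and a TRUNCATE at derive start would only ever destroy data.

```python
# Toy model of the staging cycle. Dict keys play the role of
# (unit_type, unit_id); values are the derived rows.
staging = {}   # stand-in for UNLOGGED statistical_unit_staging
main = {}      # stand-in for statistical_unit

def refresh_batch(units):
    """Batch child: DELETE before INSERT, but only for its own keys."""
    for key, row in units.items():
        staging.pop(key, None)   # DELETE FROM staging WHERE unit_type/unit_id
        staging[key] = row       # INSERT the freshly derived row

def flush_staging():
    """Merge staging into main, then TRUNCATE staging at the END."""
    main.update(staging)
    staging.clear()

# Cycle 1 stages two units but is interrupted before flush_staging runs.
refresh_batch({("lu", 1): "v1", ("lu", 2): "v1"})

# Cycle 2 starts WITHOUT a TRUNCATE. Its batch overwrites the key it owns;
# the untouched key from cycle 1 survives and is merged by the next flush.
refresh_batch({("lu", 1): "v2"})
flush_staging()

assert main == {("lu", 1): "v2", ("lu", 2): "v1"}
assert staging == {}
```

With a TRUNCATE at the start of cycle 2, the `("lu", 2)` row would have been silently lost even though nothing re-enqueued it, which is exactly the race this commit removes.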
```diff
@@ -214,10 +247,13 @@ A facet row with `valid_from=2020, valid_until=2025` would overlap **65
 periods** (5 year + 60 month). Splitting by period would cause each child to
 redundantly DELETE and re-INSERT the same row. Correct but 65x wasteful.
 
-At current scale (3.1M statistical units), DSUF takes 30–180 seconds as a
-monolithic operation. This is fast enough. If it becomes a bottleneck with
-larger datasets, the right approach is splitting by `(unit_type, unit_id)`
-using a map-reduce pattern (not by period).
+Instead, DSUF partitions by `(unit_type, unit_id)` using a **map-reduce**
+pattern: each partition child writes partial aggregations to an UNLOGGED
+staging table, then `statistical_unit_facet_reduce` merges and swaps the
+results into the main table in a single transaction.
+
+Only **dirty partitions** (those with changed data tracked in
+`statistical_unit_facet_dirty_partitions`) are recomputed.
 
 
 ## Production Performance (1.1M LU + 826K ES = 3.1M stat units)
```
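The map-reduce-with-dirty-tracking idea in this hunk can be sketched as follows. Everything here is an assumption for illustration, not the real implementation: the partition count, the `unit_id % N` assignment, and `Counter` aggregates standing in for facet rows.

```python
from collections import Counter

N_PARTITIONS = 4

def partition_of(unit_type, unit_id):
    # Stable partition assignment; the real scheme is an assumption here.
    return unit_id % N_PARTITIONS

# Units keyed by (unit_type, unit_id); the value stands in for that
# unit's facet dimensions.
units = {("lu", i): ("region-A" if i % 2 == 0 else "region-B")
         for i in range(8)}

# Map phase: each dirty-partition child aggregates only its own units
# into staging (stand-in for the UNLOGGED staging table).
dirty_partitions = {0, 2}   # tracked in a dirty-partitions table
staging = {
    p: Counter(dim for (ut, uid), dim in units.items()
               if partition_of(ut, uid) == p)
    for p in dirty_partitions
}

# Reduce phase: merge/swap the partial results into the main facet
# table, replacing only rows owned by the recomputed partitions.
main_facet = {p: Counter() for p in range(N_PARTITIONS)}  # pre-existing
for p, counts in staging.items():
    main_facet[p] = counts   # swap in, conceptually one transaction
staging.clear()              # staging is cleared after the merge

assert main_facet[0] == Counter({"region-A": 2})
assert main_facet[1] == Counter()   # clean partition left untouched
```

Because partitions are keyed by `(unit_type, unit_id)`, each child owns disjoint rows, and skipping clean partitions avoids the 65x redundancy that period-splitting would cause for long-lived facet rows.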
```diff
@@ -254,23 +290,24 @@ the next-largest costs. The reporting stages (DSH, DSHF) take seconds.
 
 All commands and their queue assignments:
 
-| Queue       | Command                                | Role        | Notes                        |
-|-------------|----------------------------------------|-------------|------------------------------|
-| analytics   | `check_table`                          | top-level   | Detects changed rows         |
-| analytics   | `derive_statistical_unit`              | parent      | Spawns batch children        |
-| analytics   | `statistical_unit_refresh_batch`       | child       | Parallel batch processing    |
-| analytics   | `derive_statistical_unit_continue`     | top-level   | ANALYZE sync point           |
-| analytics   | `statistical_unit_flush_staging`       | top-level   | Merge staging → main table   |
-| analytics   | `derive_reports`                       | top-level   | Enqueues DSH                 |
-| analytics   | `derive_statistical_history`           | parent      | Spawns period children       |
-| analytics   | `derive_statistical_history_period`    | child       | Per-period aggregation       |
-| analytics   | `derive_statistical_unit_facet`        | top-level   | Monolithic facet derivation  |
-| analytics   | `derive_statistical_history_facet`     | parent      | Spawns period children       |
-| analytics   | `derive_statistical_history_facet_period` | child       | Per-period facet aggregation |
-| analytics   | `deleted_row`                          | top-level   | Handle deletions             |
-| import      | `import_job_process`                   | top-level   | One import at a time         |
-| maintenance | `task_cleanup`                         | top-level   | Clean old tasks              |
-| maintenance | `import_job_cleanup`                   | top-level   | Clean expired imports        |
+| Queue       | Command                                   | Role        | Notes                        |
+|-------------|-------------------------------------------|-------------|------------------------------|
+| analytics   | `collect_changes`                         | top-level   | Drains base_change_log       |
+| analytics   | `derive_statistical_unit`                 | parent      | Spawns batch children        |
+| analytics   | `statistical_unit_refresh_batch`          | child       | Parallel batch processing    |
+| analytics   | `derive_statistical_unit_continue`        | top-level   | ANALYZE sync point           |
+| analytics   | `statistical_unit_flush_staging`          | top-level   | Merge staging → main table   |
+| analytics   | `derive_reports`                          | top-level   | Enqueues DSH                 |
+| analytics   | `derive_statistical_history`              | parent      | Spawns period children       |
+| analytics   | `derive_statistical_history_period`       | child       | Per-period aggregation       |
+| analytics   | `derive_statistical_unit_facet`           | parent      | Spawns partition children    |
+| analytics   | `derive_statistical_unit_facet_partition` | child       | Per-partition facet compute  |
+| analytics   | `statistical_unit_facet_reduce`           | top-level   | Merge partitions → main      |
+| analytics   | `derive_statistical_history_facet`        | parent      | Spawns period children       |
+| analytics   | `derive_statistical_history_facet_period` | child       | Per-period facet aggregation |
+| import      | `import_job_process`                      | top-level   | One import at a time         |
+| maintenance | `task_cleanup`                            | top-level   | Clean old tasks              |
+| maintenance | `import_job_cleanup`                      | top-level   | Clean expired imports        |
 
 
 ## Frontend Status Detection
```
