
Commit 42813f5

Merge branch 'feat/concurrent-change-detection-and-analysis'

2 parents 614ce95 + 23a5bbf

64 files changed: +4411, -3587 lines


cli/src/cli.cr

Lines changed: 6 additions & 0 deletions

@@ -101,6 +101,12 @@ module Statbus
       end
       parser.on("worker", "Run Statbus Worker for background processing") do
        @mode = Mode::Worker
+        parser.on("--stop-when-idle", "Exit when all queues are idle (for testing)") do
+          @worker.stop_when_idle = true
+        end
+        parser.on("--database DB", "Override database name") do |db|
+          @config.postgres_db = db
+        end
       end
       parser.on("import", "Import into installed StatBus") do
        @mode = Mode::Import

cli/src/worker.cr

Lines changed: 197 additions & 260 deletions
Large diffs are not rendered by default.

doc/data-model.md

Lines changed: 2 additions & 1 deletion

@@ -195,7 +195,8 @@ Handles background processing. A long-running worker process calls `worker.proce
 - `command_registry(command, created_at, handler_procedure, before_procedure, after_procedure, description, queue, batches_per_wave)`
   - Key FKs: queue.
 - `queue_registry(queue, description, default_concurrency)`
-- `last_processed(table_name, transaction_id)`
+- `base_change_log(establishment_ids, legal_unit_ids, enterprise_ids, edited_by_valid_range)`
+- `base_change_log_has_pending(has_pending)`
 
 ## Auth & System Tables/Views
 

doc/derive-pipeline.md

Lines changed: 68 additions & 31 deletions

@@ -74,7 +74,7 @@ group. The queues are fully independent — work on one queue never blocks anoth
 ```
 
 **Import queue** (1 fiber): Processes `import_job_process` one at a time.
-When an import modifies data, it triggers `check_table` on the analytics queue.
+When an import modifies data, it triggers `collect_changes` on the analytics queue.
 
 **Analytics queue** (4 fibers = 1 top + 3 child): Derives all statistical
 tables. The top fiber runs each pipeline stage sequentially; when a stage
@@ -102,9 +102,9 @@ all downstream tables. Each box below is a **top-level task** — they run
                    │ lifecycle trigger
 
 ┌─────────────────────────────────────────────┐
-① │ check_table                               │
-│ Detects changed legal_unit / establishment  │
-│ rows. Enqueues derive_statistical_unit.     │
+① │ collect_changes                           │
+│ Drains base_change_log accumulator and      │
+│ enqueues derive_statistical_unit.           │
 └──────────────────┬──────────────────────────┘
                    │ (strictly sequential — next task)
 
@@ -160,13 +160,25 @@ all downstream tables. Each box below is a **top-level task** — they run
 
 
 ┌─────────────────────────────────────────────┐
-⑥ │ derive_statistical_unit_facet  MONOLITHIC │
+⑥ │ derive_statistical_unit_facet      PARENT │
 │                                             │
-│ Aggregates statistical_unit into facets     │
-│ for the drilldown UI. Runs as a single      │
-│ operation (see "Why monolithic?" below).    │
+│ Spawns partition children (parallel):       │
+│ ┌──────────┐ ┌──────────┐ ┌──────────┐      │
+│ │ part 0   │ │ part 1   │ │ part N   │...   │
+│ └──────────┘ └──────────┘ └──────────┘      │
+│ derive_statistical_unit_facet_partition     │
 │                                             │
+│ Enqueues → statistical_unit_facet_reduce    │
 │ Enqueues → derive_statistical_history_facet │
+│                                             │
+│ Parent "waiting" → children → done          │
+└──────────────────┬──────────────────────────┘
+
+
+┌─────────────────────────────────────────────┐
+⑥b │ statistical_unit_facet_reduce    SERIAL  │
+│ Merges partition staging data into          │
+│ main statistical_unit_facet table.          │
 └──────────────────┬──────────────────────────┘
 
 
@@ -195,15 +207,36 @@ all children finish. This is structured concurrency — concurrency is scoped
 inside the parent, never between top-level tasks.
 
 
-## Why derive_statistical_unit_facet Is Monolithic
+## Staging Pattern and Race Safety
+
+Step ② (`derive_statistical_unit`) writes to an UNLOGGED staging table
+(`statistical_unit_staging`) via batch children, then step ③
+(`statistical_unit_flush_staging`) merges staging into the main table and
+TRUNCATEs staging.
+
+**Why there is no TRUNCATE at the start of derive_statistical_unit:**
+An earlier version TRUNCATEd staging at the start of each derive cycle to
+"clean up interrupted runs." This created a latent race: if `collect_changes`
+enqueued a new `derive_statistical_unit` before the previous cycle's
+`flush_staging` ran, the TRUNCATE would destroy all staged data. The race was
+nearly triggered in concurrent testing (priority gaps as small as 2 sequence
+values). The TRUNCATE was removed because:
+
+1. Batch children already do `DELETE FROM staging WHERE unit_type/unit_id`
+   before inserting — stale data from a previous cycle gets overwritten
+2. `flush_staging` TRUNCATEs at the end after merging staging → main
+3. UNLOGGED tables auto-truncate on unclean shutdown (PostgreSQL guarantee)
+
+
+## Partitioning derive_statistical_unit_facet
 
 The three derived tables have different data models:
 
 | Table                      | Keyed by                               | Natural partition |
 |----------------------------|----------------------------------------|-------------------|
 | `statistical_history`      | `(resolution, year, month, unit_type)` | period            |
 | `statistical_history_facet`| `(resolution, year, month, ...dims)`   | period            |
-| `statistical_unit_facet`   | `(valid_from, valid_until, ...dims)`   | date range        |
+| `statistical_unit_facet`   | `(valid_from, valid_until, ...dims)`   | `(unit_type, unit_id)` |
 
 `statistical_history` and `statistical_history_facet` use period-based keys
 (`year`, `month`), so each period child touches **disjoint rows** — perfect
@@ -214,10 +247,13 @@ A facet row with `valid_from=2020, valid_until=2025` would overlap **65
 periods** (5 year + 60 month). Splitting by period would cause each child to
 redundantly DELETE and re-INSERT the same row. Correct but 65x wasteful.
 
-At current scale (3.1M statistical units), DSUF takes 30–180 seconds as a
-monolithic operation. This is fast enough. If it becomes a bottleneck with
-larger datasets, the right approach is splitting by `(unit_type, unit_id)`
-using a map-reduce pattern (not by period).
+Instead, DSUF partitions by `(unit_type, unit_id)` using a **map-reduce**
+pattern: each partition child writes partial aggregations to an UNLOGGED
+staging table, then `statistical_unit_facet_reduce` merges and swaps the
+results into the main table in a single transaction.
+
+Only **dirty partitions** (those with changed data tracked in
+`statistical_unit_facet_dirty_partitions`) are recomputed.
 
 
 ## Production Performance (1.1M LU + 826K ES = 3.1M stat units)
@@ -254,23 +290,24 @@ the next-largest costs. The reporting stages (DSH, DSHF) take seconds.
 
 All commands and their queue assignments:
 
-| Queue       | Command                                   | Role        | Notes                        |
-|-------------|-------------------------------------------|-------------|------------------------------|
-| analytics   | `check_table`                             | top-level   | Detects changed rows         |
-| analytics   | `derive_statistical_unit`                 | parent      | Spawns batch children        |
-| analytics   | `statistical_unit_refresh_batch`          | child       | Parallel batch processing    |
-| analytics   | `derive_statistical_unit_continue`        | top-level   | ANALYZE sync point           |
-| analytics   | `statistical_unit_flush_staging`          | top-level   | Merge staging → main table   |
-| analytics   | `derive_reports`                          | top-level   | Enqueues DSH                 |
-| analytics   | `derive_statistical_history`              | parent      | Spawns period children       |
-| analytics   | `derive_statistical_history_period`       | child       | Per-period aggregation       |
-| analytics   | `derive_statistical_unit_facet`           | top-level   | Monolithic facet derivation  |
-| analytics   | `derive_statistical_history_facet`        | parent      | Spawns period children       |
-| analytics   | `derive_statistical_history_facet_period` | child       | Per-period facet aggregation |
-| analytics   | `deleted_row`                             | top-level   | Handle deletions             |
-| import      | `import_job_process`                      | top-level   | One import at a time         |
-| maintenance | `task_cleanup`                            | top-level   | Clean old tasks              |
-| maintenance | `import_job_cleanup`                      | top-level   | Clean expired imports        |
+| Queue       | Command                                   | Role        | Notes                        |
+|-------------|-------------------------------------------|-------------|------------------------------|
+| analytics   | `collect_changes`                         | top-level   | Drains base_change_log       |
+| analytics   | `derive_statistical_unit`                 | parent      | Spawns batch children        |
+| analytics   | `statistical_unit_refresh_batch`          | child       | Parallel batch processing    |
+| analytics   | `derive_statistical_unit_continue`        | top-level   | ANALYZE sync point           |
+| analytics   | `statistical_unit_flush_staging`          | top-level   | Merge staging → main table   |
+| analytics   | `derive_reports`                          | top-level   | Enqueues DSH                 |
+| analytics   | `derive_statistical_history`              | parent      | Spawns period children       |
+| analytics   | `derive_statistical_history_period`       | child       | Per-period aggregation       |
+| analytics   | `derive_statistical_unit_facet`           | parent      | Spawns partition children    |
+| analytics   | `derive_statistical_unit_facet_partition` | child       | Per-partition facet compute  |
+| analytics   | `statistical_unit_facet_reduce`           | top-level   | Merge partitions → main      |
+| analytics   | `derive_statistical_history_facet`        | parent      | Spawns period children       |
+| analytics   | `derive_statistical_history_facet_period` | child       | Per-period facet aggregation |
+| import      | `import_job_process`                      | top-level   | One import at a time         |
+| maintenance | `task_cleanup`                            | top-level   | Clean old tasks              |
+| maintenance | `import_job_cleanup`                      | top-level   | Clean expired imports        |
 
 
 ## Frontend Status Detection
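The map-reduce split that the derive-pipeline.md changes describe can be sketched in a few lines. This is a Python stand-in for the SQL implementation, not the real code: partition children compute partial facet aggregates in parallel (the "map"), and a single serial reduce step merges them, mirroring `derive_statistical_unit_facet_partition` followed by `statistical_unit_facet_reduce`. The `sector` dimension and row shapes are illustrative assumptions.

```python
from collections import Counter

def map_partition(units):
    """One partition child: aggregate its units into partial facet counts."""
    staging = Counter()
    for unit in units:
        # Facet key: validity range plus a dimension (illustrative).
        key = (unit["valid_from"], unit["valid_until"], unit["sector"])
        staging[key] += 1
    return staging

def reduce_partitions(partials):
    """The serial reduce step: merge partial aggregates into the final facet."""
    facet = Counter()
    for partial in partials:
        facet.update(partial)
    return facet

# Two (unit_type, unit_id) partitions, processed independently.
partitions = [
    [{"valid_from": 2020, "valid_until": 2025, "sector": "A"}],
    [{"valid_from": 2020, "valid_until": 2025, "sector": "A"},
     {"valid_from": 2021, "valid_until": 2022, "sector": "B"}],
]
facet = reduce_partitions(map_partition(p) for p in partitions)
```

Because each partition touches disjoint `(unit_type, unit_id)` rows, the map steps never conflict; only the reduce needs to run serially, which is why it gets its own top-level `statistical_unit_facet_reduce` task.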

doc/worker-structured-concurrency.md

Lines changed: 1 addition & 1 deletion

@@ -160,7 +160,7 @@ Uncle tasks:
 For the full pipeline diagram, see [derive-pipeline.md](./derive-pipeline.md).
 
 ```
-1. check_table detects changes → enqueues derive_statistical_unit
+1. collect_changes drains base_change_log → enqueues derive_statistical_unit
 
 2. derive_statistical_unit runs:
    - Computes closed groups of affected enterprises
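The drain-and-enqueue handoff in step 1 above can be sketched as follows. This is a hedged Python stand-in: the real worker drains `base_change_log` in SQL, and the row shape and helper name here are illustrative assumptions.

```python
def collect_changes(change_log, task_queue):
    """Drain the change accumulator and enqueue one derive task covering it.

    In the real pipeline this is a SQL step (read-and-clear in one
    transaction); lists stand in for tables here.
    """
    if not change_log:
        return None  # nothing pending, enqueue nothing
    # Drain: collect every changed unit id, then clear the accumulator.
    changed_ids = sorted({uid for row in change_log for uid in row["unit_ids"]})
    change_log.clear()
    task = {"command": "derive_statistical_unit", "unit_ids": changed_ids}
    task_queue.append(task)
    return task

log = [{"unit_ids": [1, 2]}, {"unit_ids": [2, 3]}]
queue = []
collect_changes(log, queue)
```

The key property is that draining and enqueuing happen as one step, so a change logged after the drain lands in the next cycle rather than being lost.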
Lines changed: 44 additions & 0 deletions

@@ -0,0 +1,44 @@
+BEGIN;
+
+-- Restore the original function (without datname filter)
+CREATE OR REPLACE FUNCTION worker.reset_abandoned_processing_tasks()
+RETURNS integer
+LANGUAGE plpgsql
+AS $function$
+DECLARE
+    v_reset_count int := 0;
+    v_task RECORD;
+    v_stale_pid INT;
+BEGIN
+    -- Terminate all other lingering worker backends.
+    -- The current worker holds the global advisory lock, so any other process with
+    -- application_name = 'worker' is a stale remnant from a previous crash.
+    FOR v_stale_pid IN
+        SELECT pid FROM pg_stat_activity
+        WHERE application_name = 'worker' AND pid <> pg_backend_pid()
+    LOOP
+        RAISE LOG 'Terminating stale worker PID %', v_stale_pid;
+        PERFORM pg_terminate_backend(v_stale_pid);
+    END LOOP;
+
+    -- Find tasks stuck in 'processing' and reset their status to 'pending'.
+    -- The backends have already been terminated above.
+    FOR v_task IN
+        SELECT id FROM worker.tasks WHERE state = 'processing'::worker.task_state FOR UPDATE
+    LOOP
+        -- Reset the task to pending state.
+        UPDATE worker.tasks
+        SET state = 'pending'::worker.task_state,
+            worker_pid = NULL,
+            processed_at = NULL,
+            error = NULL,
+            duration_ms = NULL
+        WHERE id = v_task.id;
+
+        v_reset_count := v_reset_count + 1;
+    END LOOP;
+    RETURN v_reset_count;
+END;
+$function$;
+
+END;
Lines changed: 58 additions & 0 deletions

@@ -0,0 +1,58 @@
+BEGIN;
+
+-- Fix: reset_abandoned_processing_tasks was killing worker connections
+-- across ALL databases (pg_stat_activity is cluster-wide).
+-- When running multiple workers on different databases (e.g., test worker
+-- on test_concurrent_* alongside production worker on statbus_local),
+-- they would enter a death spiral — each one's reset function killing
+-- the other's connections.
+--
+-- Fix: Add datname = current_database() filter so workers only terminate
+-- stale connections to their OWN database.
+CREATE OR REPLACE FUNCTION worker.reset_abandoned_processing_tasks()
+RETURNS integer
+LANGUAGE plpgsql
+AS $function$
+DECLARE
+    v_reset_count int := 0;
+    v_task RECORD;
+    v_stale_pid INT;
+BEGIN
+    -- Terminate all other lingering worker backends FOR THIS DATABASE ONLY.
+    -- The current worker holds the global advisory lock, so any other process with
+    -- application_name = 'worker' connected to the same database is a stale remnant
+    -- from a previous crash.
+    -- CRITICAL: Filter by datname = current_database() because pg_stat_activity is
+    -- cluster-wide. Without this filter, workers on different databases (e.g., test
+    -- databases) would kill each other's connections.
+    FOR v_stale_pid IN
+        SELECT pid FROM pg_stat_activity
+        WHERE application_name = 'worker'
+          AND pid <> pg_backend_pid()
+          AND datname = current_database()
+    LOOP
+        RAISE LOG 'Terminating stale worker PID %', v_stale_pid;
+        PERFORM pg_terminate_backend(v_stale_pid);
+    END LOOP;
+
+    -- Find tasks stuck in 'processing' and reset their status to 'pending'.
+    -- The backends have already been terminated above.
+    FOR v_task IN
+        SELECT id FROM worker.tasks WHERE state = 'processing'::worker.task_state FOR UPDATE
+    LOOP
+        -- Reset the task to pending state.
+        UPDATE worker.tasks
+        SET state = 'pending'::worker.task_state,
+            worker_pid = NULL,
+            processed_at = NULL,
+            error = NULL,
+            duration_ms = NULL
+        WHERE id = v_task.id;
+
+        v_reset_count := v_reset_count + 1;
+    END LOOP;
+    RETURN v_reset_count;
+END;
+$function$;
+
+END;
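The effect of the added `datname = current_database()` filter can be shown with a small simulation of `pg_stat_activity` rows. Plain Python dicts stand in for the catalog view (the column names `pid`, `application_name`, and `datname` mirror the real columns; the helper function itself is hypothetical):

```python
def stale_worker_pids(activity, my_pid, my_database):
    """Return backend pids the reset function would terminate.

    Mirrors the fixed SQL predicate: same application_name, not our own
    backend, and — the fix — connected to the SAME database only.
    """
    return [
        row["pid"] for row in activity
        if row["application_name"] == "worker"
        and row["pid"] != my_pid
        and row["datname"] == my_database  # the added datname filter
    ]

# Simulated cluster-wide pg_stat_activity contents:
activity = [
    {"pid": 10, "application_name": "worker", "datname": "statbus_local"},
    {"pid": 20, "application_name": "worker", "datname": "test_concurrent_1"},
    {"pid": 30, "application_name": "psql",   "datname": "statbus_local"},
]
```

With the filter, the production worker (pid 10 on `statbus_local`) no longer sees the test worker (pid 20) as stale, which breaks the death spiral described in the migration comment.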
