Commit 8293a3f

YuweiXiao and claude authored
feat: expose snapshot timing in duckpipe.status() (#9)
## Summary

- Add `snapshot_duration_ms` and `snapshot_rows` columns to `table_mappings` and `duckpipe.status()` so users can query snapshot performance directly via SQL instead of parsing PG logs
- Snapshot duration is always measured unconditionally (not gated on the `timing` flag), stored on SNAPSHOT→CATCHUP transition
- `resync_table()` resets both columns to NULL for clean re-snapshot semantics

## Test plan

- [x] All 24 regression tests pass (`make installcheck`)
- [x] `monitoring` test verifies new columns appear in `duckpipe.status()` with correct NULL defaults
- [x] Code reviewed for type consistency across the full stack (snapshot.rs → metadata.rs → service.rs → api.rs → bootstrap.sql)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent cd852cb commit 8293a3f

14 files changed

Lines changed: 135 additions & 71 deletions

PROGRESS.md

Lines changed: 1 addition & 1 deletion
@@ -64,7 +64,7 @@
 ### Monitoring / Observability
 - [ ] `applied_lsn` stays NULL during SNAPSHOT/CATCHUP — should be set to `snapshot_lsn` after snapshot completes
 - [ ] `worker_state` not updated during snapshot processing — stale metrics while snapshots run
-- [ ] Expose snapshot timing in `duckpipe.status()` — currently snapshot duration/rate is only logged to PG log (`DuckPipe timing: action=snapshot_table`), not queryable via SQL. Add columns like `snapshot_duration_ms` and `snapshot_rows` to `table_mappings` so users and benchmark scripts can read them directly. Would also make `analyze_results.py` Summary consistent with per-table timings (no need to parse PG logs).
+- [x] Expose snapshot timing in `duckpipe.status()` — added `snapshot_duration_ms` and `snapshot_rows` columns to `table_mappings` and `duckpipe.status()`. Snapshot duration is always measured (not gated on `timing` flag) and stored on CATCHUP transition. `resync_table()` resets both columns.
 - [x] Benchmark suite (`bench_suite.sh`) — 4 scenarios (single/multi × insert/mixed) with automated analysis report (`analyze_results.py`)

 ### Bugs

README.md

Lines changed: 4 additions & 4 deletions
@@ -72,10 +72,10 @@ Sysbench on Apple M1 Pro, 100k rows/table, 30s OLTP phase:

 | Scenario | Snapshot | OLTP TPS | Avg Lag | Consistency |
 |----------|----------|----------|---------|-------------|
-| 1 table, `oltp_insert` | 14,569 rows/s | 9,261 | 2.7 MB | PASS |
-| 4 tables, `oltp_insert` | 26,353 rows/s | 8,076 | 55 MB | PASS |
-| 1 table, `oltp_read_write` | 6,020 rows/s | 614 | 154 MB | PASS |
-| 4 tables, `oltp_read_write` | 38,858 rows/s | 444 | 332 MB | PASS |
+| 1 table, `oltp_insert` | 136,240 rows/s | 10,102 | 3.3 MB | PASS |
+| 4 tables, `oltp_insert` | 163,599 rows/s | 9,413 | 64.6 MB | PASS |
+| 1 table, `oltp_read_write` | 132,450 rows/s | 627 | 170.8 MB | PASS |
+| 4 tables, `oltp_read_write` | 150,830 rows/s | 450 | 369.3 MB | PASS |

 Full breakdown (flush latency, phase timing, snapshot per-table, WAL cycles): [benchmark/results/report.md](benchmark/results/report.md)

benchmark/analyze_results.py

Lines changed: 10 additions & 7 deletions
@@ -104,6 +104,16 @@ def parse_benchmark_log(path):
     elif "Consistency mismatches" in text:
         result.consistency = "FAIL"

+    # Per-table snapshot lines: Snapshot [public.sbtest1] : 100,000 rows in 800ms (125,000 rows/s)
+    for m in re.finditer(
+        r"Snapshot \[(\S+)\]\s*:\s*([\d,]+)\s*rows in\s*([\d,]+)ms",
+        text,
+    ):
+        table = m.group(1)
+        rows = int(m.group(2).replace(",", ""))
+        ms = float(m.group(3).replace(",", ""))
+        result.snapshot_timings.append((table, rows, ms))
+
     return result


@@ -152,13 +162,6 @@ def parse_pg_log(result, path):
                 "total": float(m.group(11)),
             })

-    # DuckPipe timing: action=snapshot_table source=... rows=N elapsed_ms=X
-    for m in re.finditer(
-        r"DuckPipe timing: action=snapshot_table\s+source=(\S+)\s+target=\S+\s+rows=(\d+)\s+elapsed_ms=([\d.]+)",
-        text,
-    ):
-        result.snapshot_timings.append((m.group(1), int(m.group(2)), float(m.group(3))))
-
     # DuckPipe timing: action=process_sync_group_streaming ... elapsed_ms=X
     for m in re.finditer(
         r"DuckPipe timing: action=process_sync_group_streaming\s+.*?elapsed_ms=([\d.]+)",
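
The per-table parser added to `parse_benchmark_log()` only works if the benchmark log uses the exact `Snapshot [table] : N rows in Mms` layout that `run_sysbench.py` prints. The following is a minimal standalone sketch of that parsing step, reusing the regular expression from the diff; the helper name `parse_snapshot_lines` and the sample log text are hypothetical and exist only for illustration.

```python
import re

# Same pattern as the one added to parse_benchmark_log() in this commit.
SNAPSHOT_LINE = re.compile(
    r"Snapshot \[(\S+)\]\s*:\s*([\d,]+)\s*rows in\s*([\d,]+)ms"
)


def parse_snapshot_lines(text):
    """Return (table, rows, ms) tuples for every per-table snapshot line."""
    timings = []
    for m in SNAPSHOT_LINE.finditer(text):
        table = m.group(1)
        rows = int(m.group(2).replace(",", ""))  # strip thousands separators
        ms = float(m.group(3).replace(",", ""))
        timings.append((table, rows, ms))
    return timings


# Hypothetical log excerpt in the format printed under "Final Results".
sample = (
    " Snapshot [public.sbtest1]  : 100,000 rows in 734ms (136,240 rows/s)\n"
    " Snapshot Throughput     : 136240 rows/sec\n"
)
print(parse_snapshot_lines(sample))
```

The aggregate `Snapshot Throughput` line is skipped because the pattern requires a bracketed table name directly after `Snapshot`.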

benchmark/results/report.md

Lines changed: 32 additions & 32 deletions
@@ -4,70 +4,70 @@

 | Metric | Single-table append (1T, oltp_insert) | Multi-table append (4T, oltp_insert) | Single-table mixed (1T, oltp_read_write) | Multi-table mixed (4T, oltp_read_write) |
 |--------|---|---|---|---|
-| Snapshot (rows/s) | 14,569 | 26,353 | 6,020 | 38,858 |
-| OLTP TPS | 9,261.0 | 8,075.7 | 613.6 | 444.4 |
-| Avg Lag (MB) | 2.7 | 55.0 | 154 | 332.1 |
-| Peak Lag (MB) | 4.0 | 126 | 171.7 | 357.3 |
-| Catch-up (s) | 2.2 | 2.5 | 65.4 | 68.5 |
-| Catch-up (rows/s) | - | 109 | - | - |
+| Snapshot (rows/s) | 136,240 | 163,599 | 132,450 | 150,830 |
+| OLTP TPS | 10,102.3 | 9,412.7 | 626.6 | 450.3 |
+| Avg Lag (MB) | 3.3 | 64.6 | 170.8 | 369.3 |
+| Peak Lag (MB) | 5.3 | 125.5 | 189.2 | 390.9 |
+| Catch-up (s) | 2.2 | 2.4 | 65.6 | 69.0 |
+| Catch-up (rows/s) | - | - | - | - |
 | Consistency | PASS | PASS | PASS | PASS |

 ## Flush Performance

 | Metric | Single-table append (1T, oltp_insert) | Multi-table append (4T, oltp_insert) | Single-table mixed (1T, oltp_read_write) | Multi-table mixed (4T, oltp_read_write) |
 |--------|---|---|---|---|
-| Flush count | 29 | 119 | 30 | 120 |
-| Avg latency (ms) | 41.3 | 24.9 | 35.0 | 33.9 |
-| P50 latency (ms) | 33.8 | 16.8 | 33.2 | 23.9 |
-| P99 latency (ms) | 229.2 | 240.9 | 109.5 | 169.4 |
-| Avg rows/flush | 9,580.6 | 2,036.0 | 3,682.2 | 666.6 |
+| Flush count | 31 | 121 | 30 | 122 |
+| Avg latency (ms) | 35.1 | 17.6 | 31.0 | 25.6 |
+| P50 latency (ms) | 33.6 | 15.2 | 30.3 | 22.8 |
+| P99 latency (ms) | 63.6 | 107 | 48.4 | 122.1 |
+| Avg rows/flush | 9,776.6 | 2,333.8 | 3,760 | 664.4 |

 ### Flush Phase Breakdown (avg ms)

 | Phase | Single-table append (1T, oltp_insert) | Multi-table append (4T, oltp_insert) | Single-table mixed (1T, oltp_read_write) | Multi-table mixed (4T, oltp_read_write) |
 |-------|---|---|---|---|
-| discover | 0.6 | 0.5 | - | - |
-| buf_create | 0.5 | 0.3 | 0.3 | 0.4 |
-| load | 18.2 | 4.4 | 6.9 | 1.7 |
-| compact | 6.6 | 3.8 | 3.9 | 3.5 |
+| discover | 0.5 | 0.5 | - | - |
+| buf_create | 0.4 | 0.3 | 0.3 | 0.3 |
+| load | 16.9 | 4.7 | 6.8 | 1.6 |
+| compact | 4.9 | 3.5 | 3.8 | 3.2 |
 | begin | 0.1 | 0.1 | 0.1 | 0.1 |
-| delete | 2.1 | 0.2 | 14.8 | 9.2 |
-| insert | 9.9 | 5.9 | 2.8 | 1.6 |
-| commit | 2.0 | 9.0 | 5.5 | 16.9 |
-| cleanup | 0.5 | 0.4 | 0.4 | 0.4 |
+| delete | 0.3 | 0.3 | 14.8 | 8.3 |
+| insert | 9.3 | 4.0 | 2.0 | 1.5 |
+| commit | 2.0 | 3.6 | 2.7 | 10.0 |
+| cleanup | 0.4 | 0.4 | 0.4 | 0.4 |

 ## Snapshot Timings

 **Single-table append (1T, oltp_insert)**

-- `public.sbtest1`: 100,000 rows in 799.7ms (125046 rows/s)
+- `public.sbtest1`: 100,000 rows in 734.0ms (136240 rows/s)

 **Multi-table append (4T, oltp_insert)**

-- `public.sbtest1`: 100,000 rows in 2903.2ms (34445 rows/s)
-- `public.sbtest2`: 100,000 rows in 3150.2ms (31744 rows/s)
-- `public.sbtest3`: 100,000 rows in 3162.5ms (31621 rows/s)
-- `public.sbtest4`: 100,000 rows in 3187.5ms (31373 rows/s)
+- `public.sbtest1`: 100,000 rows in 2423.0ms (41271 rows/s)
+- `public.sbtest2`: 100,000 rows in 2405.0ms (41580 rows/s)
+- `public.sbtest3`: 100,000 rows in 2235.0ms (44743 rows/s)
+- `public.sbtest4`: 100,000 rows in 2445.0ms (40900 rows/s)

 **Single-table mixed (1T, oltp_read_write)**

-- `public.sbtest1`: 100,000 rows in 1037.5ms (96383 rows/s)
+- `public.sbtest1`: 100,000 rows in 755.0ms (132450 rows/s)

 **Multi-table mixed (4T, oltp_read_write)**

-- `public.sbtest1`: 100,000 rows in 4152.2ms (24084 rows/s)
-- `public.sbtest3`: 100,000 rows in 4231.0ms (23635 rows/s)
-- `public.sbtest4`: 100,000 rows in 4369.0ms (22889 rows/s)
-- `public.sbtest2`: 100,000 rows in 4550.1ms (21978 rows/s)
+- `public.sbtest1`: 100,000 rows in 2545.0ms (39293 rows/s)
+- `public.sbtest2`: 100,000 rows in 2627.0ms (38066 rows/s)
+- `public.sbtest3`: 100,000 rows in 2642.0ms (37850 rows/s)
+- `public.sbtest4`: 100,000 rows in 2652.0ms (37707 rows/s)


 ## WAL Processing Cycle Times

 | Metric | Single-table append (1T, oltp_insert) | Multi-table append (4T, oltp_insert) | Single-table mixed (1T, oltp_read_write) | Multi-table mixed (4T, oltp_read_write) |
 |--------|---|---|---|---|
-| Cycles | 61 | 70 | 2,631 | 3,059 |
-| Avg (ms) | 596.8 | 581.6 | 19.4 | 14.6 |
-| P99 (ms) | 5,004.0 | 5,004.6 | 17.0 | 10.5 |
+| Cycles | 35 | 51 | 2,671 | 3,184 |
+| Avg (ms) | 1,049.3 | 804.7 | 19.1 | 13.9 |
+| P99 (ms) | 5,003.3 | 5,005.2 | 18.9 | 9.3 |

 ## Issues & Observations

benchmark/run_sysbench.py

Lines changed: 40 additions & 3 deletions
@@ -104,6 +104,28 @@ def get_total_queued_changes(db_params):
     )
     return parse_int(res, 0)

+def get_snapshot_metrics(db_params, num_tables):
+    """Query duckpipe.status() for per-table snapshot timing from SQL-exposed metrics."""
+    table_filter = ", ".join([f"'public.sbtest{i}'" for i in range(1, num_tables + 1)])
+    res = run_sql(
+        db_params,
+        f"SELECT source_table, snapshot_duration_ms, snapshot_rows "
+        f"FROM duckpipe.status() "
+        f"WHERE source_table IN ({table_filter}) "
+        f"AND snapshot_duration_ms IS NOT NULL",
+    )
+    metrics = []
+    if res:
+        for line in res.strip().split("\n"):
+            parts = line.split("|")
+            if len(parts) == 3:
+                table = parts[0].strip()
+                duration_ms = float(parts[1].strip())
+                rows = int(parts[2].strip())
+                metrics.append((table, rows, duration_ms))
+    return metrics
+
+
 def get_benchmark_rows_synced(args, db_params):
     table_filter = ", ".join([f"'public.sbtest{i}'" for i in range(1, args.tables + 1)])
     res = run_sql(
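
`get_snapshot_metrics()` splits each result line on `|`, which presumes that `run_sql()` returns unaligned, tuples-only text (the kind of output `psql -A -t` produces). `run_sql()` is not part of this diff, so that output format is an assumption here. Under that assumption, the parsing step can be sketched on its own; `parse_status_rows` and the sample string are hypothetical names used only for illustration.

```python
def parse_status_rows(res):
    """Parse 'table|duration_ms|rows' lines into (table, rows, duration_ms) tuples.

    Assumes pipe-separated, header-less text; lines with a different
    number of fields (including blank lines) are skipped.
    """
    metrics = []
    for line in (res or "").strip().split("\n"):
        parts = [p.strip() for p in line.split("|")]
        if len(parts) != 3:
            continue
        table, duration_ms, rows = parts
        # Same tuple order as get_snapshot_metrics(): (table, rows, duration_ms)
        metrics.append((table, int(rows), float(duration_ms)))
    return metrics


# Hypothetical query output for two tables.
sample = "public.sbtest1|734|100000\npublic.sbtest2|755|100000\n"
print(parse_status_rows(sample))
```

Returning `(table, rows, duration_ms)` keeps the tuple order used by `result.snapshot_timings` in `analyze_results.py`, so both scripts can share downstream formatting code.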
@@ -286,9 +308,21 @@ def benchmark_snapshot(args, db_params):

         if target_count >= total_rows and not_streaming == 0:
             elapsed = time.time() - start_time
-            snapshot_rate = target_count / elapsed if elapsed > 0 else 0
+            wall_clock_rate = target_count / elapsed if elapsed > 0 else 0
             print(f"\n[+] Snapshot complete!")
-            return snapshot_rate
+
+            # Fetch per-table snapshot metrics from duckpipe.status()
+            snapshot_metrics = get_snapshot_metrics(db_params, args.tables)
+            if snapshot_metrics:
+                # Concurrent snapshots overlap; effective rate = total rows / max duration
+                max_duration_ms = max(ms for _, _, ms in snapshot_metrics)
+                total_snapshot_rows = sum(r for _, r, _ in snapshot_metrics)
+                snapshot_rate = total_snapshot_rows / max_duration_ms * 1000 if max_duration_ms > 0 else wall_clock_rate
+            else:
+                snapshot_rate = wall_clock_rate
+                snapshot_metrics = []
+
+            return snapshot_rate, snapshot_metrics

         # If all rows are physically copied but tables are stuck in CATCHUP (no SNAPSHOT
         # remaining), nudge the WAL consumer by emitting a transactional logical message.
@@ -579,7 +613,7 @@ def main():
     if not args.skip_prepare:
         prepare_env(args, db_params)

-    snapshot_rate = benchmark_snapshot(args, db_params)
+    snapshot_rate, snapshot_metrics = benchmark_snapshot(args, db_params)
     (streaming_tps, avg_lag, max_lag, final_lag,
      catchup_elapsed, catchup_throughput,
      actual_final_count, expected_final_count,
@@ -588,6 +622,9 @@ def main():
     print("\n===========================================================")
     print(" Final Results")
     print("===========================================================")
+    for table, rows, ms in snapshot_metrics:
+        rate = rows / ms * 1000 if ms > 0 else 0
+        print(f" Snapshot [{table}] : {rows:,} rows in {ms:.0f}ms ({rate:,.0f} rows/s)")
     print(f" Snapshot Throughput : {snapshot_rate:.0f} rows/sec")
     print(f" OLTP Throughput : {streaming_tps:.2f} TPS")
     print(f" Avg Replication Lag : {avg_lag/1024/1024:.1f} MB [during OLTP]")
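
The new aggregate rate divides total rows by the longest single-table duration, on the reasoning that concurrent snapshots overlap in time. A worked sketch of that arithmetic, using the four per-table timings reported for the multi-table append scenario in `benchmark/results/report.md`, is shown below; the function name `effective_snapshot_rate` and its `fallback_rate` parameter are hypothetical stand-ins for the inline logic in `benchmark_snapshot()`.

```python
def effective_snapshot_rate(metrics, fallback_rate=0.0):
    """Rows/s for overlapping snapshots: total rows / longest duration.

    metrics is a list of (table, rows, duration_ms) tuples. fallback_rate
    stands in for the wall-clock rate used when no metrics are available.
    """
    if not metrics:
        return fallback_rate
    max_duration_ms = max(ms for _, _, ms in metrics)
    total_rows = sum(rows for _, rows, _ in metrics)
    if max_duration_ms <= 0:
        return fallback_rate
    return total_rows / max_duration_ms * 1000


# Per-table timings from the multi-table append (4T, oltp_insert) run.
metrics = [
    ("public.sbtest1", 100_000, 2423.0),
    ("public.sbtest2", 100_000, 2405.0),
    ("public.sbtest3", 100_000, 2235.0),
    ("public.sbtest4", 100_000, 2445.0),
]
rate = effective_snapshot_rate(metrics)
print(f"{rate:,.0f} rows/s")  # 400,000 rows / 2,445 ms
```

This reproduces the 163,599 rows/s figure in the updated README table. The calculation treats all snapshots as starting at the same moment, so it is an approximation whenever their start times differ.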

doc/CODE_WALKTHROUGH.md

Lines changed: 2 additions & 2 deletions
@@ -361,7 +361,7 @@ Async operations against `duckpipe.sync_groups` and `duckpipe.table_mappings` vi
 - `update_applied_lsn(id, lsn)` — per-table flush checkpoint

 **State transitions:**
-- `set_catchup_state(id, snapshot_lsn)` — SNAPSHOT → CATCHUP
+- `set_catchup_state(id, snapshot_lsn, duration_ms, snapshot_rows)` — SNAPSHOT → CATCHUP, stores snapshot timing
 - `transition_catchup_to_streaming(group_id, pending_lsn)` — bulk promote CATCHUP tables where `snapshot_lsn <= pending_lsn`

 **Error tracking:**
@@ -589,7 +589,7 @@ The top-level function called by both the bgworker and daemon. One complete cycl
 4. **Disconnect**
 5. Return whether any work was done (caller uses this to decide sleep vs. immediate re-poll)

-**`process_snapshots(meta, group, connstr, timing)`** — spawns all SNAPSHOT tables as concurrent tokio tasks. Each runs `snapshot::process_snapshot_task()` independently. On success: `set_catchup_state()`. On failure: `record_error_message()` (table stays in SNAPSHOT for retry).
+**`process_snapshots(meta, group, connstr, timing)`** — spawns all SNAPSHOT tables as concurrent tokio tasks. Each runs `snapshot::process_snapshot_task()` independently. On success: `set_catchup_state()` stores the snapshot_lsn, duration_ms, and rows_copied. On failure: `record_error_message()` (table stays in SNAPSHOT for retry).

 **`process_sync_group_streaming(client, meta, group, config, consumer, coordinator)`** — streaming WAL consumption:
 1. Check backpressure — if flush threads are lagging, skip this poll round (but still read `min(applied_lsn)` from PG and advance slot)

doc/USAGE.md

Lines changed: 2 additions & 0 deletions
@@ -96,6 +96,8 @@ FROM duckpipe.status();
 | `consecutive_failures` | Number of flush failures since last success (ERRORED triggers at 3) |
 | `retry_at` | Scheduled auto-retry time when in ERRORED state |
 | `error_message` | Last error message (empty when healthy) |
+| `snapshot_duration_ms` | Time taken by the initial snapshot (NULL before snapshot completes) |
+| `snapshot_rows` | Number of rows copied during the initial snapshot |

 ### Group Overview
duckpipe-core/src/metadata.rs

Lines changed: 9 additions & 3 deletions
@@ -560,18 +560,24 @@ impl<'a> MetadataClient<'a> {
             .collect())
     }

-    /// Update table state to CATCHUP with snapshot_lsn.
+    /// Update table state to CATCHUP with snapshot_lsn, duration, and row count.
     pub async fn set_catchup_state(
         &self,
         mapping_id: i32,
         snapshot_lsn: u64,
+        duration_ms: u64,
+        snapshot_rows: u64,
     ) -> Result<(), tokio_postgres::Error> {
         let lsn_str = format_lsn(snapshot_lsn);
+        let duration_ms_i64 = duration_ms as i64;
+        let snapshot_rows_i64 = snapshot_rows as i64;
         self.client
             .execute(
                 "UPDATE duckpipe.table_mappings SET state = 'CATCHUP', \
-                 snapshot_lsn = $1::text::pg_lsn WHERE id = $2",
-                &[&lsn_str, &mapping_id],
+                 snapshot_lsn = $1::text::pg_lsn, \
+                 snapshot_duration_ms = $2, snapshot_rows = $3 \
+                 WHERE id = $4",
+                &[&lsn_str, &duration_ms_i64, &snapshot_rows_i64, &mapping_id],
             )
             .await?;
         Ok(())

duckpipe-core/src/service.rs

Lines changed: 2 additions & 2 deletions
@@ -828,8 +828,8 @@ pub async fn run_sync_cycle(
     let snap_results = snapshot_manager.collect_results();
     for snap in &snap_results {
         match &snap.result {
-            Ok((snapshot_lsn, rows_copied)) => {
-                meta.set_catchup_state(snap.task_id, *snapshot_lsn)
+            Ok((snapshot_lsn, rows_copied, duration_ms)) => {
+                meta.set_catchup_state(snap.task_id, *snapshot_lsn, *duration_ms, *rows_copied)
                     .await
                     .map_err(|e| format!("set_catchup_state: {}", e))?;
                 if *rows_copied > 0 {

duckpipe-core/src/snapshot.rs

Lines changed: 11 additions & 9 deletions
@@ -40,16 +40,16 @@ pub struct SnapshotResult {
     pub source_table: String,
     pub target_schema: String,
     pub target_table: String,
-    /// Ok((consistent_point_lsn, rows_copied)) on success, Err(message) on failure.
-    pub result: Result<(u64, u64), String>,
+    /// Ok((consistent_point_lsn, rows_copied, duration_ms)) on success, Err(message) on failure.
+    pub result: Result<(u64, u64, u64), String>,
 }

 /// Process snapshot for a single table using a chunked COPY pipeline.
 ///
 /// Creates a temporary logical replication slot via SQL to get a consistent_point,
 /// then streams COPY TO STDOUT through CSV chunk files into DuckDB via read_csv.
 ///
-/// Returns `(consistent_point_lsn, rows_copied)` on success.
+/// Returns `(consistent_point_lsn, rows_copied, duration_ms)` on success.
 pub async fn process_snapshot_task(
     source_schema: &str,
     source_table: &str,
@@ -60,8 +60,8 @@ pub async fn process_snapshot_task(
     ducklake_schema: &str,
     timing: bool,
     task_id: i32,
-) -> Result<(u64, u64), String> {
-    let table_start = if timing { Some(Instant::now()) } else { None };
+) -> Result<(u64, u64, u64), String> {
+    let table_start = Instant::now();

     // Step 1: Open control connection — creates temp slot and runs COPY.
     let (ctrl_client, ctrl_connection) =
@@ -201,6 +201,8 @@ pub async fn process_snapshot_task(

     let rows_copied = copy_result?;

+    let elapsed_ms = table_start.elapsed().as_millis() as u64;
+
     tracing::info!(
         "DuckPipe: Snapshot complete for {}.{}: {} rows copied (consistent_point={})",
         source_schema,
@@ -209,19 +211,19 @@ pub async fn process_snapshot_task(
         format_lsn(consistent_point)
     );

-    if let Some(start) = table_start {
+    if timing {
         tracing::info!(
-            "DuckPipe timing: action=snapshot_table source={}.{} target={}.{} rows={} elapsed_ms={:.3}",
+            "DuckPipe timing: action=snapshot_table source={}.{} target={}.{} rows={} elapsed_ms={}",
             source_schema,
             source_table,
             target_schema,
             target_table,
             rows_copied,
-            start.elapsed().as_secs_f64() * 1000.0
+            elapsed_ms
         );
     }

-    Ok((consistent_point, rows_copied))
+    Ok((consistent_point, rows_copied, elapsed_ms))
 }

 /// CSV producer: streams COPY TO STDOUT bytes into chunk files with headers.
