Skip to content

Commit 806ee87

Browse files
YuweiXiaoclaude
andcommitted
style: apply cargo fmt formatting
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 00de5ff commit 806ee87

13 files changed

Lines changed: 248 additions & 163 deletions

File tree

duckpipe-core/src/duckdb_flush.rs

Lines changed: 16 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -113,14 +113,12 @@ fn discover_lake_table_info(
113113

114114
let mut column_types = Vec::with_capacity(expected_attnames.len());
115115
for name in expected_attnames {
116-
let dtype = lake_col_map
117-
.get(&name.to_lowercase())
118-
.ok_or_else(|| {
119-
format!(
120-
"column '{}' not found in DuckLake table {}.{}",
121-
name, lake_schema, target_table
122-
)
123-
})?;
116+
let dtype = lake_col_map.get(&name.to_lowercase()).ok_or_else(|| {
117+
format!(
118+
"column '{}' not found in DuckLake table {}.{}",
119+
name, lake_schema, target_table
120+
)
121+
})?;
124122
column_types.push(dtype.clone());
125123
}
126124

@@ -175,7 +173,7 @@ impl FlushWorker {
175173
db.execute_batch(
176174
"SET ducklake_retry_wait_ms = 100; \
177175
SET ducklake_retry_backoff = 2.0; \
178-
SET ducklake_max_retry_count = 10;"
176+
SET ducklake_max_retry_count = 10;",
179177
)
180178
.map_err(|e| format!("duckdb set retry: {}", e))?;
181179

@@ -236,10 +234,7 @@ impl FlushWorker {
236234
// pgoutput always includes every column value in UPDATE WAL records.
237235
// Any 'u'-status (TOAST unchanged) column reaching the flush path means
238236
// the source table had its REPLICA IDENTITY changed after add_table().
239-
if changes
240-
.iter()
241-
.any(|c| c.col_unchanged.iter().any(|&u| u))
242-
{
237+
if changes.iter().any(|c| c.col_unchanged.iter().any(|&u| u)) {
243238
return Err(
244239
"TOAST unchanged column detected in WAL — source table must have \
245240
REPLICA IDENTITY FULL. Run: ALTER TABLE <name> REPLICA IDENTITY FULL"
@@ -403,12 +398,10 @@ impl FlushWorker {
403398
let deleted_count: usize = if skip_delete {
404399
0
405400
} else {
406-
self.db
407-
.execute(&delete_sql, [])
408-
.map_err(|e| {
409-
let _ = self.db.execute_batch("ROLLBACK");
410-
format!("duckdb delete from {}: {}", target_key, e)
411-
})?
401+
self.db.execute(&delete_sql, []).map_err(|e| {
402+
let _ = self.db.execute_batch("ROLLBACK");
403+
format!("duckdb delete from {}: {}", target_key, e)
404+
})?
412405
};
413406
let t_delete_ms = t_phase.elapsed().as_secs_f64() * 1000.0;
414407

@@ -427,12 +420,10 @@ impl FlushWorker {
427420
target_ref = target_ref,
428421
cols = all_cols.join(", ")
429422
);
430-
self.db
431-
.execute_batch(&insert_sql)
432-
.map_err(|e| {
433-
let _ = self.db.execute_batch("ROLLBACK");
434-
format!("duckdb insert into {}: {}", target_key, e)
435-
})?;
423+
self.db.execute_batch(&insert_sql).map_err(|e| {
424+
let _ = self.db.execute_batch("ROLLBACK");
425+
format!("duckdb insert into {}: {}", target_key, e)
426+
})?;
436427
let t_insert_ms = t_phase.elapsed().as_secs_f64() * 1000.0;
437428

438429
let t_phase = Instant::now();
@@ -486,7 +477,6 @@ impl FlushWorker {
486477
}
487478
}
488479

489-
490480
/// Result of a DuckDB-based flush.
491481
#[derive(Debug)]
492482
pub struct DuckDbFlushResult {

duckpipe-core/src/flush_coordinator.rs

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -160,18 +160,19 @@ impl FlushCoordinator {
160160
) {
161161
// Seed in-memory LSN from the persistent PG value if we haven't seen this table yet.
162162
// `or_insert` preserves any higher value already tracked from a completed flush.
163-
self.per_table_lsn.entry(mapping_id).or_insert(initial_applied_lsn);
163+
self.per_table_lsn
164+
.entry(mapping_id)
165+
.or_insert(initial_applied_lsn);
164166
// Cache target_key → mapping_id for lock-free lookup in get_min_applied_lsn_in_coordinator.
165-
self.target_to_mapping.insert(target_key.to_string(), mapping_id);
167+
self.target_to_mapping
168+
.insert(target_key.to_string(), mapping_id);
166169
// Check if entry exists and thread is alive
167170
let needs_spawn = match self.threads.get(target_key) {
168171
None => true,
169-
Some(entry) => {
170-
match &entry.join_handle {
171-
Some(h) => h.is_finished(),
172-
None => true,
173-
}
174-
}
172+
Some(entry) => match &entry.join_handle {
173+
Some(h) => h.is_finished(),
174+
None => true,
175+
},
175176
};
176177

177178
if !needs_spawn {
@@ -264,7 +265,9 @@ impl FlushCoordinator {
264265
guard.last_lsn = change.lsn;
265266
}
266267
guard.changes.push(change);
267-
self.backpressure.total_queued.fetch_add(1, Ordering::Relaxed);
268+
self.backpressure
269+
.total_queued
270+
.fetch_add(1, Ordering::Relaxed);
268271
entry.queue_handle.condvar.notify_one();
269272
}
270273
}
@@ -413,7 +416,12 @@ impl FlushCoordinator {
413416
pub fn collect_results(&mut self) -> Vec<FlushThreadResult> {
414417
let mut results = Vec::new();
415418
while let Ok(r) = self.result_rx.try_recv() {
416-
if let FlushThreadResult::Success { mapping_id, last_lsn, .. } = &r {
419+
if let FlushThreadResult::Success {
420+
mapping_id,
421+
last_lsn,
422+
..
423+
} = &r
424+
{
417425
let entry = self.per_table_lsn.entry(*mapping_id).or_insert(0);
418426
if *last_lsn > *entry {
419427
*entry = *last_lsn;
@@ -539,7 +547,9 @@ fn flush_thread_main(
539547
if control.shutdown.load(Ordering::Acquire) {
540548
// Drop in-memory changes — confirmed_lsn was never advanced past them,
541549
// so PostgreSQL will re-deliver them on the next startup.
542-
backpressure.total_queued.fetch_sub(accumulated_count, Ordering::Relaxed);
550+
backpressure
551+
.total_queued
552+
.fetch_sub(accumulated_count, Ordering::Relaxed);
543553
pending_local.store(0, Ordering::Relaxed);
544554
if control.drain_requested.load(Ordering::Acquire) {
545555
signal_drain_complete(&drain_complete, &control);
@@ -622,7 +632,9 @@ fn flush_thread_main(
622632
);
623633
}
624634
accumulated.clear();
625-
backpressure.total_queued.fetch_sub(accumulated_count, Ordering::Relaxed);
635+
backpressure
636+
.total_queued
637+
.fetch_sub(accumulated_count, Ordering::Relaxed);
626638
accumulated_count = 0;
627639
pending_local.store(0, Ordering::Relaxed);
628640
accumulated_lsn = 0;
@@ -697,7 +709,8 @@ fn do_flush(
697709
)) {
698710
tracing::error!(
699711
"pg_duckpipe: metrics update failed for {}: {}",
700-
result.target_key, e
712+
result.target_key,
713+
e
701714
);
702715
}
703716
// Clear error state on success

duckpipe-core/src/flush_worker.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,9 @@ pub async fn update_error_state(
7272
.await;
7373
tracing::warn!(
7474
"pg_duckpipe: mapping {} transitioned to ERRORED after {} failures, retry in {}s",
75-
mapping_id, count, backoff
75+
mapping_id,
76+
count,
77+
backoff
7678
);
7779
}
7880
}
@@ -84,10 +86,7 @@ pub async fn update_error_state(
8486
}
8587

8688
/// Clear error state on successful flush: reset consecutive_failures and error_message.
87-
pub async fn clear_error_on_success(
88-
connstr: &str,
89-
mapping_id: i32,
90-
) -> Result<(), String> {
89+
pub async fn clear_error_on_success(connstr: &str, mapping_id: i32) -> Result<(), String> {
9190
let (client, connection) = tokio_postgres::connect(connstr, NoTls)
9291
.await
9392
.map_err(|e| format!("clear_error connect: {}", e))?;

duckpipe-core/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
pub mod decoder;
2-
pub mod log;
32
pub mod duckdb_flush;
43
pub mod error;
54
pub mod flush_coordinator;
65
pub mod flush_worker;
6+
pub mod log;
77
pub mod metadata;
88
pub mod queue;
99
pub mod service;

duckpipe-core/src/metadata.rs

Lines changed: 6 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use std::collections::HashMap;
44

55
use tokio_postgres::Client;
66

7-
use crate::types::{SyncGroup, TableMapping, format_lsn, parse_lsn};
7+
use crate::types::{format_lsn, parse_lsn, SyncGroup, TableMapping};
88

99
/// Async metadata client wrapping a tokio-postgres connection.
1010
pub struct MetadataClient<'a> {
@@ -35,9 +35,7 @@ impl<'a> MetadataClient<'a> {
3535
let publication: String = row.get(2);
3636
let slot_name: String = row.get(3);
3737
let confirmed_lsn_str: Option<String> = row.get(4);
38-
let confirmed_lsn = confirmed_lsn_str
39-
.map(|s| parse_lsn(&s))
40-
.unwrap_or(0);
38+
let confirmed_lsn = confirmed_lsn_str.map(|s| parse_lsn(&s)).unwrap_or(0);
4139
SyncGroup {
4240
id,
4341
name,
@@ -81,16 +79,12 @@ impl<'a> MetadataClient<'a> {
8179
let target_table: String = row.get(4);
8280
let state: String = row.get(5);
8381
let snapshot_lsn_str: Option<String> = row.get(6);
84-
let snapshot_lsn = snapshot_lsn_str
85-
.map(|s| parse_lsn(&s))
86-
.unwrap_or(0);
82+
let snapshot_lsn = snapshot_lsn_str.map(|s| parse_lsn(&s)).unwrap_or(0);
8783
let enabled: bool = row.get(7);
8884
let source_oid: Option<i64> = row.get(8);
8985
let error_message: Option<String> = row.get(9);
9086
let applied_lsn_str: Option<String> = row.get(10);
91-
let applied_lsn = applied_lsn_str
92-
.map(|s| parse_lsn(&s))
93-
.unwrap_or(0);
87+
let applied_lsn = applied_lsn_str.map(|s| parse_lsn(&s)).unwrap_or(0);
9488

9589
Ok(Some(TableMapping {
9690
id,
@@ -247,10 +241,7 @@ impl<'a> MetadataClient<'a> {
247241
}
248242

249243
/// Auto-retry: transition ERRORED table back to STREAMING, clear error and failures.
250-
pub async fn retry_errored_table(
251-
&self,
252-
mapping_id: i32,
253-
) -> Result<(), tokio_postgres::Error> {
244+
pub async fn retry_errored_table(&self, mapping_id: i32) -> Result<(), tokio_postgres::Error> {
254245
self.client
255246
.execute(
256247
"UPDATE duckpipe.table_mappings SET state = 'STREAMING', \
@@ -376,10 +367,7 @@ impl<'a> MetadataClient<'a> {
376367
/// Get the minimum applied_lsn across all active (STREAMING/CATCHUP) tables in a group.
377368
/// Returns 0 if any active table has NULL applied_lsn (not yet flushed), or if
378369
/// there are no active tables.
379-
pub async fn get_min_applied_lsn(
380-
&self,
381-
group_id: i32,
382-
) -> Result<u64, tokio_postgres::Error> {
370+
pub async fn get_min_applied_lsn(&self, group_id: i32) -> Result<u64, tokio_postgres::Error> {
383371
// For CATCHUP tables with NULL applied_lsn, use snapshot_lsn as the
384372
// effective floor: everything up to snapshot_lsn is either in the
385373
// snapshot copy or will be skipped by CATCHUP skip logic, so the slot

duckpipe-core/src/queue.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,4 +58,3 @@ impl TableQueue {
5858
self.changes.is_empty()
5959
}
6060
}
61-

0 commit comments

Comments
 (0)