@@ -2239,11 +2239,20 @@ fn metrics() -> String {
22392239 let mut group_entries = Vec :: new ( ) ;
22402240
22412241 Spi :: connect ( |client| {
2242+ // Current WAL position for source_lag_bytes on local groups.
2243+ let current_wal_lsn: u64 = client
2244+ . select ( "SELECT pg_current_wal_lsn()::text" , None , & [ ] )
2245+ . ok ( )
2246+ . and_then ( |t| t. into_iter ( ) . next ( ) )
2247+ . and_then ( |r| r. get :: < String > ( 1 ) . ok ( ) . flatten ( ) )
2248+ . map ( |s| duckpipe_core:: types:: parse_lsn ( & s) )
2249+ . unwrap_or ( 0 ) ;
2250+
22422251 // Tables
22432252 let result = client. select (
22442253 "SELECT g.name, m.source_schema || '.' || m.source_table, \
22452254 m.state, m.rows_synced, m.consecutive_failures, \
2246- m.snapshot_duration_ms, m.snapshot_rows, m.applied_lsn::text, m.id \
2255+ m.snapshot_duration_ms, m.snapshot_rows, m.applied_lsn::text, m.id, m.group_id \
22472256 FROM duckpipe.table_mappings m \
22482257 JOIN duckpipe.sync_groups g ON m.group_id = g.id \
22492258 ORDER BY g.name, m.source_schema, m.source_table",
@@ -2261,15 +2270,33 @@ fn metrics() -> String {
22612270 let snapshot_rows: Option < i64 > = row. get ( 7 ) . unwrap ( ) ;
22622271 let applied_lsn: Option < String > = row. get ( 8 ) . unwrap ( ) ;
22632272 let mapping_id: i32 = row. get :: < i32 > ( 9 ) . unwrap ( ) . unwrap_or ( 0 ) ;
2273+ let group_id: i32 = row. get :: < i32 > ( 10 ) . unwrap ( ) . unwrap_or ( 0 ) ;
22642274
22652275 let tm = shm_table_map. get ( & mapping_id) . copied ( ) . unwrap_or_default ( ) ;
22662276
2277+ let pending_lsn = shm_group_map
2278+ . get ( & group_id)
2279+ . map ( |g| g. pending_lsn )
2280+ . unwrap_or ( 0 ) ;
2281+ let applied_lsn_u64 = applied_lsn
2282+ . as_ref ( )
2283+ . map ( |s| duckpipe_core:: types:: parse_lsn ( s) )
2284+ . unwrap_or ( 0 ) ;
2285+ let flush_lag_bytes: Option < i64 > = if pending_lsn == 0 || applied_lsn_u64 == 0 {
2286+ None
2287+ } else if tm. queued_changes == 0 {
2288+ Some ( 0 )
2289+ } else {
2290+ Some ( pending_lsn. saturating_sub ( applied_lsn_u64) as i64 )
2291+ } ;
2292+
22672293 table_entries. push ( format ! (
22682294 "{{\" group\" :{},\" source_table\" :{},\" state\" :{},\" rows_synced\" :{},\
22692295 \" queued_changes\" :{},\" duckdb_memory_bytes\" :{},\
22702296 \" consecutive_failures\" :{},\" flush_count\" :{},\" flush_duration_ms\" :{},\
22712297 \" avg_row_bytes\" :{},\
2272- \" snapshot_duration_ms\" :{},\" snapshot_rows\" :{},\" applied_lsn\" :{}}}",
2298+ \" snapshot_duration_ms\" :{},\" snapshot_rows\" :{},\" applied_lsn\" :{},\
2299+ \" flush_lag_bytes\" :{}}}",
22732300 json_str( & group_name) ,
22742301 json_str( & source_table) ,
22752302 json_str( & state) ,
@@ -2283,32 +2310,44 @@ fn metrics() -> String {
22832310 json_opt_i64( snapshot_duration_ms) ,
22842311 json_opt_i64( snapshot_rows) ,
22852312 json_opt_str( applied_lsn. as_deref( ) ) ,
2313+ json_opt_i64( flush_lag_bytes) ,
22862314 ) ) ;
22872315 }
22882316 }
22892317
22902318 // Groups
22912319 let result = client. select (
2292- "SELECT g.id, g.name FROM duckpipe.sync_groups g ORDER BY g.name" ,
2320+ "SELECT g.id, g.name, (g.conninfo IS NULL) as is_local \
2321+ FROM duckpipe.sync_groups g ORDER BY g.name",
22932322 None ,
22942323 & [ ] ,
22952324 ) ;
22962325 if let Ok ( tuptable) = result {
22972326 for row in tuptable {
22982327 let group_id: i32 = row. get :: < i32 > ( 1 ) . unwrap ( ) . unwrap_or ( 0 ) ;
22992328 let name: String = row. get :: < String > ( 2 ) . unwrap ( ) . unwrap_or_default ( ) ;
2329+ let is_local: bool = row. get :: < bool > ( 3 ) . unwrap ( ) . unwrap_or ( false ) ;
23002330
23012331 let gm = shm_group_map. get ( & group_id) . copied ( ) . unwrap_or_default ( ) ;
23022332
2333+ let pending_lsn = gm. pending_lsn ;
2334+ let source_lag_bytes: Option < i64 > =
2335+ if is_local && current_wal_lsn != 0 && pending_lsn != 0 {
2336+ Some ( current_wal_lsn. saturating_sub ( pending_lsn) as i64 )
2337+ } else {
2338+ None
2339+ } ;
2340+
23032341 group_entries. push ( format ! (
23042342 "{{\" name\" :{},\" total_queued_changes\" :{},\" is_backpressured\" :{},\" active_flushes\" :{},\
2305- \" gate_wait_avg_ms\" :{},\" gate_timeouts\" :{}}}",
2343+ \" gate_wait_avg_ms\" :{},\" gate_timeouts\" :{}, \" source_lag_bytes \" :{} }}",
23062344 json_str( & name) ,
23072345 gm. total_queued_changes,
23082346 gm. is_backpressured,
23092347 gm. active_flushes,
23102348 gm. gate_wait_avg_ms,
23112349 gm. gate_timeouts,
2350+ json_opt_i64( source_lag_bytes) ,
23122351 ) ) ;
23132352 }
23142353 }
0 commit comments