Skip to content

Commit c1a99ef

Browse files
authored
Fix commit lsn send (#2178)
1 parent f3070ad commit c1a99ef

File tree

9 files changed

+84
-66
lines changed

9 files changed

+84
-66
lines changed

src/moonlink/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
pub mod error;
22
pub mod event_sync;
3+
pub mod lsn_state;
34
pub mod mooncake_table_id;
45
mod observability;
56
pub mod row;
@@ -11,6 +12,7 @@ mod union_read;
1112

1213
pub use error::*;
1314
pub use event_sync::EventSyncSender;
15+
pub use lsn_state::{CommitState, ReplicationState};
1416
pub use mooncake_table_id::MooncakeTableId;
1517
pub use storage::mooncake_table::batch_id_counter::BatchIdCounter;
1618
pub use storage::mooncake_table::data_batches::ColumnStoreBuffer;
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ impl LsnState {
5454
}
5555

5656
pub type ReplicationState = LsnState;
57-
pub type CommitStatus = LsnState;
57+
pub type CommitState = LsnState;
5858

5959
#[cfg(test)]
6060
mod tests {

src/moonlink_connectors/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
pub mod error;
2-
pub mod lsn_state;
32
pub mod pg_replicate;
43
mod replication_connection;
54
mod replication_manager;

src/moonlink_connectors/src/pg_replicate.rs

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ pub mod table;
1010
pub mod table_init;
1111
pub mod util;
1212

13-
use crate::lsn_state::ReplicationState;
1413
use crate::pg_replicate::clients::postgres::{build_tls_connector, ReplicationClient};
1514
use crate::pg_replicate::conversions::cdc_event::{CdcEvent, CdcEventConversionError};
1615
use crate::pg_replicate::initial_copy::copy_table_stream;
@@ -24,8 +23,8 @@ use crate::pg_replicate::table_init::{build_table_components, TableComponents};
2423
use crate::Result;
2524
use futures::StreamExt;
2625
use moonlink::{
27-
MooncakeTableId, MoonlinkTableConfig, ObjectStorageCache, ReadStateFilepathRemap, TableEvent,
28-
WalManager,
26+
CommitState, MooncakeTableId, MoonlinkTableConfig, ObjectStorageCache, ReadStateFilepathRemap,
27+
ReplicationState, TableEvent, WalManager,
2928
};
3029
use native_tls::{Certificate, TlsConnector};
3130
use pg_escape::{quote_identifier, quote_literal};
@@ -66,7 +65,7 @@ pub enum PostgresReplicationCommand {
6665
src_table_id: SrcTableId,
6766
schema: TableSchema,
6867
event_sender: mpsc::Sender<TableEvent>,
69-
commit_lsn_tx: watch::Sender<u64>,
68+
commit_state: Arc<CommitState>,
7069
flush_lsn_rx: watch::Receiver<u64>,
7170
wal_flush_lsn_rx: watch::Receiver<u64>,
7271
ready_tx: oneshot::Sender<()>,
@@ -247,7 +246,7 @@ impl PostgresConnection {
247246
schema: &TableSchema,
248247
event_sender: mpsc::Sender<TableEvent>,
249248
is_recovery: bool,
250-
commit_lsn_tx: watch::Sender<u64>,
249+
commit_lsn_tx: Arc<CommitState>,
251250
table_base_path: &str,
252251
) -> Result<(bool)> {
253252
let src_table_id = schema.src_table_id;
@@ -296,9 +295,7 @@ impl PostgresConnection {
296295
}
297296

298297
// Notify read state manager with the commit LSN for the initial copy boundary.
299-
if let Err(e) = commit_lsn_tx.send(progress.boundary_lsn.into()) {
300-
warn!(error = ?e, table_id = src_table_id, "failed to send initial copy commit lsn");
301-
}
298+
commit_lsn_tx.mark(progress.boundary_lsn.into());
302299
self.replication_state.mark(progress.boundary_lsn.into());
303300

304301
Ok(true)
@@ -398,7 +395,7 @@ impl PostgresConnection {
398395
src_table_id: SrcTableId,
399396
schema: TableSchema,
400397
event_sender: mpsc::Sender<TableEvent>,
401-
commit_lsn_tx: watch::Sender<u64>,
398+
commit_state: Arc<CommitState>,
402399
flush_lsn_rx: watch::Receiver<u64>,
403400
wal_flush_lsn_rx: watch::Receiver<u64>,
404401
) -> Result<oneshot::Receiver<()>> {
@@ -407,7 +404,7 @@ impl PostgresConnection {
407404
src_table_id,
408405
schema,
409406
event_sender,
410-
commit_lsn_tx,
407+
commit_state,
411408
flush_lsn_rx,
412409
wal_flush_lsn_rx,
413410
ready_tx,
@@ -522,7 +519,7 @@ impl PostgresConnection {
522519

523520
// Send command to add table to replication
524521
let commit_lsn_tx = table_resources
525-
.commit_lsn_tx
522+
.commit_state
526523
.take()
527524
.expect("commit_lsn_tx is None");
528525
let commit_lsn_tx_for_copy = commit_lsn_tx.clone();
@@ -763,8 +760,8 @@ pub async fn run_event_loop(
763760
}
764761
},
765762
Some(cmd) = cmd_rx.recv() => match cmd {
766-
PostgresReplicationCommand::AddTable { src_table_id, schema, event_sender, commit_lsn_tx, flush_lsn_rx, wal_flush_lsn_rx, ready_tx } => {
767-
sink.add_table(src_table_id, event_sender, commit_lsn_tx, &schema);
763+
PostgresReplicationCommand::AddTable { src_table_id, schema, event_sender, commit_state, flush_lsn_rx, wal_flush_lsn_rx, ready_tx } => {
764+
sink.add_table(src_table_id, event_sender, commit_state, &schema);
768765
flush_lsn_rxs.insert(src_table_id, flush_lsn_rx);
769766
wal_flush_lsn_rxs.insert(src_table_id, wal_flush_lsn_rx);
770767
stream.as_mut().add_table_schema(schema);

src/moonlink_connectors/src/pg_replicate/moonlink_sink.rs

Lines changed: 46 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1-
use crate::lsn_state::ReplicationState;
21
use crate::pg_replicate::util::PostgresTableRow;
32
use crate::pg_replicate::{
43
conversions::{cdc_event::CdcEvent, table_row::TableRow},
54
table::{SrcTableId, TableSchema},
65
};
76
use moonlink::TableEvent;
7+
use moonlink::{CommitState, ReplicationState};
88
use more_asserts as ma;
99
use postgres_replication::protocol::Column as ReplicationColumn;
1010
use std::collections::HashMap;
@@ -35,7 +35,7 @@ struct ColumnInfo {
3535
}
3636
pub struct Sink {
3737
event_senders: HashMap<SrcTableId, Sender<TableEvent>>,
38-
commit_lsn_txs: HashMap<SrcTableId, watch::Sender<u64>>,
38+
commit_lsn_txs: HashMap<SrcTableId, Arc<CommitState>>,
3939
streaming_transactions_state: HashMap<u32, TransactionState>,
4040
transaction_state: TransactionState,
4141
replication_state: Arc<ReplicationState>,
@@ -98,7 +98,7 @@ impl Sink {
9898
&mut self,
9999
src_table_id: SrcTableId,
100100
event_sender: Sender<TableEvent>,
101-
commit_lsn_tx: watch::Sender<u64>,
101+
commit_lsn_tx: Arc<CommitState>,
102102
table_schema: &TableSchema,
103103
) {
104104
self.event_senders.insert(src_table_id, event_sender);
@@ -215,10 +215,8 @@ impl Sink {
215215
ma::assert_ge!(commit_body.end_lsn(), self.max_keepalive_lsn_seen);
216216
for table_id in &self.transaction_state.touched_tables {
217217
let event_sender = self.event_senders.get(table_id);
218-
if let Some(commit_lsn_tx) = self.commit_lsn_txs.get(table_id).cloned() {
219-
if let Err(e) = commit_lsn_tx.send(commit_body.end_lsn()) {
220-
warn!(error = ?e, "failed to send commit lsn");
221-
}
218+
if let Some(commit_lsn_tx) = self.commit_lsn_txs.get(table_id) {
219+
commit_lsn_tx.mark(commit_body.end_lsn());
222220
}
223221
if let Some(event_sender) = event_sender {
224222
if let Err(e) = Self::send_table_event(
@@ -252,10 +250,8 @@ impl Sink {
252250
if let Some(tables_in_txn) = self.streaming_transactions_state.get(&xact_id) {
253251
for table_id in &tables_in_txn.touched_tables {
254252
let event_sender = self.event_senders.get(table_id);
255-
if let Some(commit_lsn_tx) = self.commit_lsn_txs.get(table_id).cloned() {
256-
if let Err(e) = commit_lsn_tx.send(stream_commit_body.end_lsn()) {
257-
warn!(error = ?e, "failed to send stream commit lsn");
258-
}
253+
if let Some(commit_lsn_tx) = self.commit_lsn_txs.get(table_id) {
254+
commit_lsn_tx.mark(stream_commit_body.end_lsn());
259255
}
260256
if let Some(event_sender) = event_sender {
261257
if let Err(e) = Self::send_table_event(
@@ -443,9 +439,9 @@ mod tests {
443439
// Setup one table with event sender and commit lsn channel
444440
let table_id: SrcTableId = 1;
445441
let (tx, mut rx) = mpsc::channel::<TableEvent>(64);
446-
let (commit_tx, _commit_rx) = watch::channel::<u64>(0);
442+
let commit_state = CommitState::new();
447443
let schema = make_table_schema(table_id);
448-
sink.add_table(table_id, tx, commit_tx, &schema);
444+
sink.add_table(table_id, tx, commit_state, &schema);
449445

450446
// Many inserts for the same (xid, table) pair
451447
let xid = Some(42u32);
@@ -493,10 +489,10 @@ mod tests {
493489
let b: SrcTableId = 12;
494490
let (tx_a, mut rx_a) = mpsc::channel::<TableEvent>(8);
495491
let (tx_b, mut rx_b) = mpsc::channel::<TableEvent>(8);
496-
let (commit_tx_a, _rx_a) = watch::channel::<u64>(0);
497-
let (commit_tx_b, _rx_b) = watch::channel::<u64>(0);
498-
sink.add_table(a, tx_a, commit_tx_a, &make_table_schema(a));
499-
sink.add_table(b, tx_b, commit_tx_b, &make_table_schema(b));
492+
let commit_state_a = CommitState::new();
493+
let commit_state_b = CommitState::new();
494+
sink.add_table(a, tx_a, commit_state_a.clone(), &make_table_schema(a));
495+
sink.add_table(b, tx_b, commit_state_b.clone(), &make_table_schema(b));
500496

501497
// Many inserts into A then into B within the same non-streaming transaction
502498
for _ in 0..5 {
@@ -528,12 +524,17 @@ mod tests {
528524
#[tokio::test]
529525
async fn cached_sender_cleared_on_drop_table() {
530526
let replication_state = ReplicationState::new();
527+
let commit_state = CommitState::new();
531528
let mut sink = Sink::new(replication_state);
532529

533530
let table_id: SrcTableId = 21;
534531
let (tx, _rx) = mpsc::channel::<TableEvent>(4);
535-
let (commit_tx, _commit_rx) = watch::channel::<u64>(0);
536-
sink.add_table(table_id, tx, commit_tx, &make_table_schema(table_id));
532+
sink.add_table(
533+
table_id,
534+
tx,
535+
commit_state.clone(),
536+
&make_table_schema(table_id),
537+
);
537538

538539
// Populate sender cache
539540
let _ = sink.get_event_sender_for(table_id);
@@ -547,12 +548,18 @@ mod tests {
547548
#[tokio::test]
548549
async fn interleaved_streams_do_not_use_stale_cache() {
549550
let replication_state = ReplicationState::new();
551+
let commit_state = CommitState::new();
550552
let mut sink = Sink::new(replication_state);
551553

552554
let table_id: SrcTableId = 31;
553555
let (tx, mut rx) = mpsc::channel::<TableEvent>(16);
554556
let (commit_tx, _commit_rx) = watch::channel::<u64>(0);
555-
sink.add_table(table_id, tx, commit_tx, &make_table_schema(table_id));
557+
sink.add_table(
558+
table_id,
559+
tx,
560+
commit_state.clone(),
561+
&make_table_schema(table_id),
562+
);
556563

557564
let xid1 = Some(100u32);
558565
let xid2 = Some(200u32);
@@ -611,16 +618,17 @@ mod tests {
611618
#[tokio::test]
612619
async fn cache_updates_on_table_change_same_xid() {
613620
let replication_state = ReplicationState::new();
621+
let commit_state = CommitState::new();
614622
let mut sink = Sink::new(replication_state);
615623

616624
let a: SrcTableId = 41;
617625
let b: SrcTableId = 42;
618626
let (tx_a, mut rx_a) = mpsc::channel::<TableEvent>(8);
619627
let (tx_b, mut rx_b) = mpsc::channel::<TableEvent>(8);
620-
let (commit_tx_a, _rx_a) = watch::channel::<u64>(0);
621-
let (commit_tx_b, _rx_b) = watch::channel::<u64>(0);
622-
sink.add_table(a, tx_a, commit_tx_a, &make_table_schema(a));
623-
sink.add_table(b, tx_b, commit_tx_b, &make_table_schema(b));
628+
let commit_state_a = CommitState::new();
629+
let commit_state_b = CommitState::new();
630+
sink.add_table(a, tx_a, commit_state_a.clone(), &make_table_schema(a));
631+
sink.add_table(b, tx_b, commit_state_b.clone(), &make_table_schema(b));
624632

625633
let xid = Some(777u32);
626634
// A then B under same xid
@@ -651,12 +659,17 @@ mod tests {
651659
#[tokio::test]
652660
async fn sender_cache_persists_across_xid_and_stream_like_boundaries() {
653661
let replication_state = ReplicationState::new();
662+
let commit_state = CommitState::new();
654663
let mut sink = Sink::new(replication_state);
655664

656665
let table_id: SrcTableId = 51;
657666
let (tx, mut rx) = mpsc::channel::<TableEvent>(8);
658-
let (commit_tx, _commit_rx) = watch::channel::<u64>(0);
659-
sink.add_table(table_id, tx, commit_tx, &make_table_schema(table_id));
667+
sink.add_table(
668+
table_id,
669+
tx,
670+
commit_state.clone(),
671+
&make_table_schema(table_id),
672+
);
660673

661674
let xid1 = Some(1u32);
662675
let xid2 = Some(2u32);
@@ -692,12 +705,17 @@ mod tests {
692705
#[tokio::test]
693706
async fn non_streaming_state_resets_between_transactions() {
694707
let replication_state = ReplicationState::new();
708+
let commit_state = CommitState::new();
695709
let mut sink = Sink::new(replication_state);
696710

697711
let table_id: SrcTableId = 61;
698712
let (tx, mut rx) = mpsc::channel::<TableEvent>(8);
699-
let (commit_tx, _commit_rx) = watch::channel::<u64>(0);
700-
sink.add_table(table_id, tx, commit_tx, &make_table_schema(table_id));
713+
sink.add_table(
714+
table_id,
715+
tx,
716+
commit_state.clone(),
717+
&make_table_schema(table_id),
718+
);
701719

702720
// First transaction: several inserts (non-streaming)
703721
for _ in 0..3 {

src/moonlink_connectors/src/pg_replicate/table_init.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use crate::lsn_state::ReplicationState;
21
use crate::pg_replicate::table::SrcTableId;
32
use crate::{Error, Result};
43
use arrow_schema::Schema as ArrowSchema;
@@ -12,6 +11,7 @@ use moonlink::{
1211
MoonlinkTableConfig, MoonlinkTableSecret, ObjectStorageCache, ReadStateManager, StorageConfig,
1312
TableEvent, TableEventManager, TableHandler, TableStatusReader, WalConfig, WalManager,
1413
};
14+
use moonlink::{CommitState, ReplicationState};
1515

1616
use std::io::ErrorKind;
1717
use std::path::{Path, PathBuf};
@@ -44,7 +44,7 @@ pub struct TableResources {
4444
pub read_state_manager: ReadStateManager,
4545
pub table_event_manager: TableEventManager,
4646
pub table_status_reader: TableStatusReader,
47-
pub commit_lsn_tx: Option<watch::Sender<u64>>,
47+
pub commit_state: Option<Arc<CommitState>>,
4848
pub flush_lsn_rx: Option<watch::Receiver<u64>>,
4949
pub wal_flush_lsn_rx: Option<watch::Receiver<u64>>,
5050
pub wal_file_accessor: Arc<dyn BaseFileSystemAccess>,
@@ -156,17 +156,18 @@ pub async fn build_table_components(
156156

157157
let last_persistence_snapshot_lsn = table.get_persistence_snapshot_lsn();
158158

159-
let (commit_lsn_tx, commit_lsn_rx) = watch::channel(0u64);
159+
let commit_state = CommitState::new();
160160
// Make a receiver first before possible mark operation, otherwise all receiver initializes with 0.
161-
let replication_lsn_tx = replication_state.subscribe();
161+
let replication_lsn_rx = replication_state.subscribe();
162+
let commit_lsn_rx = commit_state.subscribe();
162163
if let Some(persistence_snapshot_lsn) = last_persistence_snapshot_lsn {
163-
commit_lsn_tx.send(persistence_snapshot_lsn).unwrap();
164+
commit_state.mark(persistence_snapshot_lsn);
164165
replication_state.mark(persistence_snapshot_lsn);
165166
}
166167

167168
let read_state_manager = ReadStateManager::new(
168169
&table,
169-
replication_lsn_tx,
170+
replication_lsn_rx,
170171
commit_lsn_rx,
171172
table_components.read_state_filepath_remap,
172173
);
@@ -196,7 +197,7 @@ pub async fn build_table_components(
196197
read_state_manager,
197198
table_status_reader,
198199
table_event_manager,
199-
commit_lsn_tx: Some(commit_lsn_tx),
200+
commit_state: Some(commit_state),
200201
flush_lsn_rx: Some(flush_lsn_rx),
201202
wal_flush_lsn_rx: Some(wal_flush_lsn_rx),
202203
wal_file_accessor,

src/moonlink_connectors/src/replication_connection.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,7 @@ impl ReplicationConnection {
287287
std::sync::Arc::new(arrow_schema),
288288
table_resources.event_sender.clone(),
289289
table_resources
290-
.commit_lsn_tx
290+
.commit_state
291291
.take()
292292
.expect("commit_lsn_tx not set"),
293293
table_resources

src/moonlink_connectors/src/rest_ingest.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ pub mod rest_event;
88
pub mod rest_source;
99
pub mod schema_util;
1010

11-
use crate::lsn_state::ReplicationState;
1211
use crate::rest_ingest::event_request::EventRequest;
1312
use crate::rest_ingest::moonlink_rest_sink::RestSink;
1413
use crate::rest_ingest::moonlink_rest_sink::TableStatus;
@@ -17,6 +16,8 @@ use crate::Result;
1716
use apache_avro::schema::Schema as AvroSchema;
1817
use arrow_schema::Schema;
1918
use futures::StreamExt;
19+
use moonlink::CommitState;
20+
use moonlink::ReplicationState;
2021
use moonlink::TableEvent;
2122
use more_asserts as ma;
2223
use std::sync::atomic::{AtomicU32, Ordering};
@@ -35,7 +36,7 @@ pub enum RestCommand {
3536
src_table_id: SrcTableId,
3637
schema: Arc<Schema>,
3738
event_sender: mpsc::Sender<TableEvent>,
38-
commit_lsn_tx: watch::Sender<u64>,
39+
commit_lsn_tx: Arc<CommitState>,
3940
flush_lsn_rx: watch::Receiver<u64>,
4041
wal_flush_lsn_rx: watch::Receiver<u64>,
4142
/// Persist LSN, only assigned for tables to recovery; used to indicate and update replication LSN.
@@ -102,7 +103,7 @@ impl RestApiConnection {
102103
src_table_id: SrcTableId,
103104
schema: Arc<Schema>,
104105
event_sender: mpsc::Sender<TableEvent>,
105-
commit_lsn_tx: watch::Sender<u64>,
106+
commit_lsn_tx: Arc<CommitState>,
106107
flush_lsn_rx: watch::Receiver<u64>,
107108
wal_flush_lsn_rx: watch::Receiver<u64>,
108109
persist_lsn: Option<u64>,

0 commit comments

Comments
 (0)