Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/moonlink/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod error;
pub mod event_sync;
pub mod lsn_state;
pub mod mooncake_table_id;
mod observability;
pub mod row;
Expand All @@ -11,6 +12,7 @@ mod union_read;

pub use error::*;
pub use event_sync::EventSyncSender;
pub use lsn_state::{CommitState, ReplicationState};
pub use mooncake_table_id::MooncakeTableId;
pub use storage::mooncake_table::batch_id_counter::BatchIdCounter;
pub use storage::mooncake_table::data_batches::ColumnStoreBuffer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl LsnState {
}

pub type ReplicationState = LsnState;
pub type CommitStatus = LsnState;
pub type CommitState = LsnState;

#[cfg(test)]
mod tests {
Expand Down
1 change: 0 additions & 1 deletion src/moonlink_connectors/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
pub mod error;
pub mod lsn_state;
pub mod pg_replicate;
mod replication_connection;
mod replication_manager;
Expand Down
23 changes: 10 additions & 13 deletions src/moonlink_connectors/src/pg_replicate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ pub mod table;
pub mod table_init;
pub mod util;

use crate::lsn_state::ReplicationState;
use crate::pg_replicate::clients::postgres::{build_tls_connector, ReplicationClient};
use crate::pg_replicate::conversions::cdc_event::{CdcEvent, CdcEventConversionError};
use crate::pg_replicate::initial_copy::copy_table_stream;
Expand All @@ -24,8 +23,8 @@ use crate::pg_replicate::table_init::{build_table_components, TableComponents};
use crate::Result;
use futures::StreamExt;
use moonlink::{
MooncakeTableId, MoonlinkTableConfig, ObjectStorageCache, ReadStateFilepathRemap, TableEvent,
WalManager,
CommitState, MooncakeTableId, MoonlinkTableConfig, ObjectStorageCache, ReadStateFilepathRemap,
ReplicationState, TableEvent, WalManager,
};
use native_tls::{Certificate, TlsConnector};
use pg_escape::{quote_identifier, quote_literal};
Expand Down Expand Up @@ -66,7 +65,7 @@ pub enum PostgresReplicationCommand {
src_table_id: SrcTableId,
schema: TableSchema,
event_sender: mpsc::Sender<TableEvent>,
commit_lsn_tx: watch::Sender<u64>,
commit_state: Arc<CommitState>,
flush_lsn_rx: watch::Receiver<u64>,
wal_flush_lsn_rx: watch::Receiver<u64>,
ready_tx: oneshot::Sender<()>,
Expand Down Expand Up @@ -247,7 +246,7 @@ impl PostgresConnection {
schema: &TableSchema,
event_sender: mpsc::Sender<TableEvent>,
is_recovery: bool,
commit_lsn_tx: watch::Sender<u64>,
commit_lsn_tx: Arc<CommitState>,
table_base_path: &str,
) -> Result<(bool)> {
let src_table_id = schema.src_table_id;
Expand Down Expand Up @@ -296,9 +295,7 @@ impl PostgresConnection {
}

// Notify read state manager with the commit LSN for the initial copy boundary.
if let Err(e) = commit_lsn_tx.send(progress.boundary_lsn.into()) {
warn!(error = ?e, table_id = src_table_id, "failed to send initial copy commit lsn");
}
commit_lsn_tx.mark(progress.boundary_lsn.into());
self.replication_state.mark(progress.boundary_lsn.into());

Ok(true)
Expand Down Expand Up @@ -398,7 +395,7 @@ impl PostgresConnection {
src_table_id: SrcTableId,
schema: TableSchema,
event_sender: mpsc::Sender<TableEvent>,
commit_lsn_tx: watch::Sender<u64>,
commit_state: Arc<CommitState>,
flush_lsn_rx: watch::Receiver<u64>,
wal_flush_lsn_rx: watch::Receiver<u64>,
) -> Result<oneshot::Receiver<()>> {
Expand All @@ -407,7 +404,7 @@ impl PostgresConnection {
src_table_id,
schema,
event_sender,
commit_lsn_tx,
commit_state,
flush_lsn_rx,
wal_flush_lsn_rx,
ready_tx,
Expand Down Expand Up @@ -522,7 +519,7 @@ impl PostgresConnection {

// Send command to add table to replication
let commit_lsn_tx = table_resources
.commit_lsn_tx
.commit_state
.take()
.expect("commit_lsn_tx is None");
let commit_lsn_tx_for_copy = commit_lsn_tx.clone();
Expand Down Expand Up @@ -763,8 +760,8 @@ pub async fn run_event_loop(
}
},
Some(cmd) = cmd_rx.recv() => match cmd {
PostgresReplicationCommand::AddTable { src_table_id, schema, event_sender, commit_lsn_tx, flush_lsn_rx, wal_flush_lsn_rx, ready_tx } => {
sink.add_table(src_table_id, event_sender, commit_lsn_tx, &schema);
PostgresReplicationCommand::AddTable { src_table_id, schema, event_sender, commit_state, flush_lsn_rx, wal_flush_lsn_rx, ready_tx } => {
sink.add_table(src_table_id, event_sender, commit_state, &schema);
flush_lsn_rxs.insert(src_table_id, flush_lsn_rx);
wal_flush_lsn_rxs.insert(src_table_id, wal_flush_lsn_rx);
stream.as_mut().add_table_schema(schema);
Expand Down
71 changes: 43 additions & 28 deletions src/moonlink_connectors/src/pg_replicate/moonlink_sink.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use crate::lsn_state::ReplicationState;
use crate::pg_replicate::util::PostgresTableRow;
use crate::pg_replicate::{
conversions::{cdc_event::CdcEvent, table_row::TableRow},
table::{SrcTableId, TableSchema},
};
use moonlink::TableEvent;
use moonlink::{CommitState, ReplicationState};
use more_asserts as ma;
use postgres_replication::protocol::Column as ReplicationColumn;
use std::collections::HashMap;
Expand Down Expand Up @@ -35,7 +35,7 @@ struct ColumnInfo {
}
pub struct Sink {
event_senders: HashMap<SrcTableId, Sender<TableEvent>>,
commit_lsn_txs: HashMap<SrcTableId, watch::Sender<u64>>,
commit_lsn_txs: HashMap<SrcTableId, Arc<CommitState>>,
streaming_transactions_state: HashMap<u32, TransactionState>,
transaction_state: TransactionState,
replication_state: Arc<ReplicationState>,
Expand Down Expand Up @@ -98,7 +98,7 @@ impl Sink {
&mut self,
src_table_id: SrcTableId,
event_sender: Sender<TableEvent>,
commit_lsn_tx: watch::Sender<u64>,
commit_lsn_tx: Arc<CommitState>,
table_schema: &TableSchema,
) {
self.event_senders.insert(src_table_id, event_sender);
Expand Down Expand Up @@ -215,10 +215,8 @@ impl Sink {
ma::assert_ge!(commit_body.end_lsn(), self.max_keepalive_lsn_seen);
for table_id in &self.transaction_state.touched_tables {
let event_sender = self.event_senders.get(table_id);
if let Some(commit_lsn_tx) = self.commit_lsn_txs.get(table_id).cloned() {
if let Err(e) = commit_lsn_tx.send(commit_body.end_lsn()) {
warn!(error = ?e, "failed to send commit lsn");
}
if let Some(commit_lsn_tx) = self.commit_lsn_txs.get(table_id) {
commit_lsn_tx.mark(commit_body.end_lsn());
}
if let Some(event_sender) = event_sender {
if let Err(e) = Self::send_table_event(
Expand Down Expand Up @@ -252,10 +250,8 @@ impl Sink {
if let Some(tables_in_txn) = self.streaming_transactions_state.get(&xact_id) {
for table_id in &tables_in_txn.touched_tables {
let event_sender = self.event_senders.get(table_id);
if let Some(commit_lsn_tx) = self.commit_lsn_txs.get(table_id).cloned() {
if let Err(e) = commit_lsn_tx.send(stream_commit_body.end_lsn()) {
warn!(error = ?e, "failed to send stream commit lsn");
}
if let Some(commit_lsn_tx) = self.commit_lsn_txs.get(table_id) {
commit_lsn_tx.mark(stream_commit_body.end_lsn());
}
if let Some(event_sender) = event_sender {
if let Err(e) = Self::send_table_event(
Expand Down Expand Up @@ -443,9 +439,9 @@ mod tests {
// Set up one table with an event sender and a commit LSN channel
let table_id: SrcTableId = 1;
let (tx, mut rx) = mpsc::channel::<TableEvent>(64);
let (commit_tx, _commit_rx) = watch::channel::<u64>(0);
let commit_state = CommitState::new();
let schema = make_table_schema(table_id);
sink.add_table(table_id, tx, commit_tx, &schema);
sink.add_table(table_id, tx, commit_state, &schema);

// Many inserts for the same (xid, table) pair
let xid = Some(42u32);
Expand Down Expand Up @@ -486,17 +482,16 @@ mod tests {
#[tokio::test]
async fn hot_path_non_streaming_vec_dedupe_across_tables() {
let replication_state = ReplicationState::new();
let commit_state = CommitState::new();
let mut sink = Sink::new(replication_state);

// Two tables
let a: SrcTableId = 11;
let b: SrcTableId = 12;
let (tx_a, mut rx_a) = mpsc::channel::<TableEvent>(8);
let (tx_b, mut rx_b) = mpsc::channel::<TableEvent>(8);
let (commit_tx_a, _rx_a) = watch::channel::<u64>(0);
let (commit_tx_b, _rx_b) = watch::channel::<u64>(0);
sink.add_table(a, tx_a, commit_tx_a, &make_table_schema(a));
sink.add_table(b, tx_b, commit_tx_b, &make_table_schema(b));
sink.add_table(a, tx_a, commit_state.clone(), &make_table_schema(a));
sink.add_table(b, tx_b, commit_state.clone(), &make_table_schema(b));

// Many inserts into A then into B within the same non-streaming transaction
for _ in 0..5 {
Expand Down Expand Up @@ -528,12 +523,17 @@ mod tests {
#[tokio::test]
async fn cached_sender_cleared_on_drop_table() {
let replication_state = ReplicationState::new();
let commit_state = CommitState::new();
let mut sink = Sink::new(replication_state);

let table_id: SrcTableId = 21;
let (tx, _rx) = mpsc::channel::<TableEvent>(4);
let (commit_tx, _commit_rx) = watch::channel::<u64>(0);
sink.add_table(table_id, tx, commit_tx, &make_table_schema(table_id));
sink.add_table(
table_id,
tx,
commit_state.clone(),
&make_table_schema(table_id),
);

// Populate sender cache
let _ = sink.get_event_sender_for(table_id);
Expand All @@ -547,12 +547,18 @@ mod tests {
#[tokio::test]
async fn interleaved_streams_do_not_use_stale_cache() {
let replication_state = ReplicationState::new();
let commit_state = CommitState::new();
let mut sink = Sink::new(replication_state);

let table_id: SrcTableId = 31;
let (tx, mut rx) = mpsc::channel::<TableEvent>(16);
let (commit_tx, _commit_rx) = watch::channel::<u64>(0);
sink.add_table(table_id, tx, commit_tx, &make_table_schema(table_id));
sink.add_table(
table_id,
tx,
commit_state.clone(),
&make_table_schema(table_id),
);

let xid1 = Some(100u32);
let xid2 = Some(200u32);
Expand Down Expand Up @@ -611,16 +617,15 @@ mod tests {
#[tokio::test]
async fn cache_updates_on_table_change_same_xid() {
let replication_state = ReplicationState::new();
let commit_state = CommitState::new();
let mut sink = Sink::new(replication_state);

let a: SrcTableId = 41;
let b: SrcTableId = 42;
let (tx_a, mut rx_a) = mpsc::channel::<TableEvent>(8);
let (tx_b, mut rx_b) = mpsc::channel::<TableEvent>(8);
let (commit_tx_a, _rx_a) = watch::channel::<u64>(0);
let (commit_tx_b, _rx_b) = watch::channel::<u64>(0);
sink.add_table(a, tx_a, commit_tx_a, &make_table_schema(a));
sink.add_table(b, tx_b, commit_tx_b, &make_table_schema(b));
sink.add_table(a, tx_a, commit_state.clone(), &make_table_schema(a));
sink.add_table(b, tx_b, commit_state.clone(), &make_table_schema(b));

let xid = Some(777u32);
// A then B under same xid
Expand Down Expand Up @@ -651,12 +656,17 @@ mod tests {
#[tokio::test]
async fn sender_cache_persists_across_xid_and_stream_like_boundaries() {
let replication_state = ReplicationState::new();
let commit_state = CommitState::new();
let mut sink = Sink::new(replication_state);

let table_id: SrcTableId = 51;
let (tx, mut rx) = mpsc::channel::<TableEvent>(8);
let (commit_tx, _commit_rx) = watch::channel::<u64>(0);
sink.add_table(table_id, tx, commit_tx, &make_table_schema(table_id));
sink.add_table(
table_id,
tx,
commit_state.clone(),
&make_table_schema(table_id),
);

let xid1 = Some(1u32);
let xid2 = Some(2u32);
Expand Down Expand Up @@ -692,12 +702,17 @@ mod tests {
#[tokio::test]
async fn non_streaming_state_resets_between_transactions() {
let replication_state = ReplicationState::new();
let commit_state = CommitState::new();
let mut sink = Sink::new(replication_state);

let table_id: SrcTableId = 61;
let (tx, mut rx) = mpsc::channel::<TableEvent>(8);
let (commit_tx, _commit_rx) = watch::channel::<u64>(0);
sink.add_table(table_id, tx, commit_tx, &make_table_schema(table_id));
sink.add_table(
table_id,
tx,
commit_state.clone(),
&make_table_schema(table_id),
);

// First transaction: several inserts (non-streaming)
for _ in 0..3 {
Expand Down
15 changes: 8 additions & 7 deletions src/moonlink_connectors/src/pg_replicate/table_init.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crate::lsn_state::ReplicationState;
use crate::pg_replicate::table::SrcTableId;
use crate::{Error, Result};
use arrow_schema::Schema as ArrowSchema;
Expand All @@ -12,6 +11,7 @@ use moonlink::{
MoonlinkTableConfig, MoonlinkTableSecret, ObjectStorageCache, ReadStateManager, StorageConfig,
TableEvent, TableEventManager, TableHandler, TableStatusReader, WalConfig, WalManager,
};
use moonlink::{CommitState, ReplicationState};

use std::io::ErrorKind;
use std::path::{Path, PathBuf};
Expand Down Expand Up @@ -44,7 +44,7 @@ pub struct TableResources {
pub read_state_manager: ReadStateManager,
pub table_event_manager: TableEventManager,
pub table_status_reader: TableStatusReader,
pub commit_lsn_tx: Option<watch::Sender<u64>>,
pub commit_state: Option<Arc<CommitState>>,
pub flush_lsn_rx: Option<watch::Receiver<u64>>,
pub wal_flush_lsn_rx: Option<watch::Receiver<u64>>,
pub wal_file_accessor: Arc<dyn BaseFileSystemAccess>,
Expand Down Expand Up @@ -156,17 +156,18 @@ pub async fn build_table_components(

let last_persistence_snapshot_lsn = table.get_persistence_snapshot_lsn();

let (commit_lsn_tx, commit_lsn_rx) = watch::channel(0u64);
let commit_state = CommitState::new();
// Create the receivers before any possible mark operation; otherwise all receivers are initialized with 0.
let replication_lsn_tx = replication_state.subscribe();
let replication_lsn_rx = replication_state.subscribe();
let commit_lsn_rx = commit_state.subscribe();
if let Some(persistence_snapshot_lsn) = last_persistence_snapshot_lsn {
commit_lsn_tx.send(persistence_snapshot_lsn).unwrap();
commit_state.mark(persistence_snapshot_lsn);
replication_state.mark(persistence_snapshot_lsn);
}

let read_state_manager = ReadStateManager::new(
&table,
replication_lsn_tx,
replication_lsn_rx,
commit_lsn_rx,
table_components.read_state_filepath_remap,
);
Expand Down Expand Up @@ -196,7 +197,7 @@ pub async fn build_table_components(
read_state_manager,
table_status_reader,
table_event_manager,
commit_lsn_tx: Some(commit_lsn_tx),
commit_state: Some(commit_state),
flush_lsn_rx: Some(flush_lsn_rx),
wal_flush_lsn_rx: Some(wal_flush_lsn_rx),
wal_file_accessor,
Expand Down
2 changes: 1 addition & 1 deletion src/moonlink_connectors/src/replication_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ impl ReplicationConnection {
std::sync::Arc::new(arrow_schema),
table_resources.event_sender.clone(),
table_resources
.commit_lsn_tx
.commit_state
.take()
.expect("commit_lsn_tx not set"),
table_resources
Expand Down
Loading
Loading