Commit 4feea31

fix: preserve failover checkpoint under repeated failures (#46)
## Summary

- Prevent checkpoint drift in `PgStream::tick` by preserving the existing failover checkpoint when publishing fails while already in failover, instead of overwriting it with a newer batch checkpoint.
- Harden slot recovery to keep the earliest safe recovery point: compare the existing failover checkpoint against the LSN-derived checkpoint and take whichever is earlier, so recovery never advances the checkpoint past undelivered events.
- Make replay deterministic and non-blocking by ordering replay COPY rows (`ORDER BY created_at, id`) and using a separate connection for replay checkpoint updates.
- Add regression coverage for repeated failover failures, checkpoint updates during an active COPY, large replay windows, and slot recovery when a failover checkpoint is already present.

## Why

- Without these guards, the failover and slot-recovery flows could advance checkpoints during ongoing failure conditions and skip undelivered events.

## Testing

- `cargo test --features test-utils --test failover_checkpoint_tests`
- `cargo test --features test-utils --test slot_recovery_tests test_slot_recovery_preserves_existing_failover_checkpoint`
- `cargo test --features test-utils slot_recovery::tests::`
- `cargo test --features test-utils --test replay_client_tests test_get_events_copy_stream_with_events`
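The rule in the first summary bullet reduces to a small state transition. A minimal, self-contained sketch with stand-in types (the real code uses the crate's `StreamStatus` and event identifiers; see the `src/stream.rs` diff below):

```rust
#[derive(Debug, Clone, PartialEq)]
enum Status {
    Healthy,
    Failover { checkpoint: u64 }, // stand-in for the real event identifier
}

// On publish failure: enter failover only from Healthy; if already in
// failover, keep the older checkpoint instead of the newer batch one.
fn on_publish_failure(current: Status, batch_checkpoint: u64) -> Status {
    match current {
        Status::Healthy => Status::Failover { checkpoint: batch_checkpoint },
        failover @ Status::Failover { .. } => failover,
    }
}

fn main() {
    // First failure records checkpoint 10; a second failure at 25 must not advance it.
    let s = on_publish_failure(Status::Healthy, 10);
    let s = on_publish_failure(s, 25);
    assert_eq!(s, Status::Failover { checkpoint: 10 });
}
```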
1 parent: 076b76d

7 files changed

Lines changed: 608 additions & 31 deletions

src/replay_client.rs

Lines changed: 37 additions & 15 deletions
@@ -2,7 +2,9 @@ use std::io::BufReader;
 use std::num::NonZeroI32;
 use std::sync::Arc;

-use etl::config::{ETL_REPLICATION_OPTIONS, IntoConnectOptions, PgConnectionConfig};
+use etl::config::{
+    ETL_REPLICATION_OPTIONS, ETL_STATE_MANAGEMENT_OPTIONS, IntoConnectOptions, PgConnectionConfig,
+};
 use etl::error::EtlResult;
 use etl_postgres::replication::extract_server_version;
 use tokio_postgres::tls::MakeTlsConnect;
@@ -38,7 +40,10 @@ where
 #[derive(Debug, Clone)]
 pub struct ReplayClient {
     stream_id: StreamId,
-    client: Arc<Client>,
+    /// Connection used for COPY streaming of replay events.
+    copy_client: Arc<Client>,
+    /// Separate connection used for checkpoint updates during replay.
+    checkpoint_client: Arc<Client>,
     /// Server version extracted from connection - reserved for future version-specific logic
     #[allow(dead_code)]
     server_version: Option<NonZeroI32>,
@@ -67,22 +72,28 @@ impl ReplayClient {
         stream_id: StreamId,
         pg_connection_config: PgConnectionConfig,
     ) -> EtlResult<Self> {
-        let config: Config = pg_connection_config
+        let copy_config: Config = pg_connection_config
             .clone()
             .with_db(Some(&ETL_REPLICATION_OPTIONS));

-        let (client, connection) = config.connect(NoTls).await?;
+        let (copy_client, copy_connection) = copy_config.connect(NoTls).await?;

-        let server_version = connection
+        let server_version = copy_connection
             .parameter("server_version")
             .and_then(extract_server_version);

-        spawn_postgres_connection::<NoTls>(connection);
+        spawn_postgres_connection::<NoTls>(copy_connection);
+
+        let checkpoint_config: Config =
+            pg_connection_config.with_db(Some(&ETL_STATE_MANAGEMENT_OPTIONS));
+        let (checkpoint_client, checkpoint_connection) = checkpoint_config.connect(NoTls).await?;
+        spawn_postgres_connection::<NoTls>(checkpoint_connection);

         info!("successfully connected to postgres without tls");

         Ok(ReplayClient {
-            client: Arc::new(client),
+            copy_client: Arc::new(copy_client),
+            checkpoint_client: Arc::new(checkpoint_client),
             server_version,
             stream_id,
         })
@@ -95,7 +106,7 @@ impl ReplayClient {
         stream_id: StreamId,
         pg_connection_config: PgConnectionConfig,
     ) -> EtlResult<Self> {
-        let config: Config = pg_connection_config
+        let copy_config: Config = pg_connection_config
             .clone()
             .with_db(Some(&ETL_REPLICATION_OPTIONS));

@@ -113,18 +124,28 @@ impl ReplayClient {
             .with_root_certificates(root_store)
             .with_no_client_auth();

-        let (client, connection) = config.connect(MakeRustlsConnect::new(tls_config)).await?;
+        let (copy_client, copy_connection) = copy_config
+            .connect(MakeRustlsConnect::new(tls_config.clone()))
+            .await?;

-        let server_version = connection
+        let server_version = copy_connection
             .parameter("server_version")
             .and_then(extract_server_version);

-        spawn_postgres_connection::<MakeRustlsConnect>(connection);
+        spawn_postgres_connection::<MakeRustlsConnect>(copy_connection);
+
+        let checkpoint_config: Config =
+            pg_connection_config.with_db(Some(&ETL_STATE_MANAGEMENT_OPTIONS));
+        let (checkpoint_client, checkpoint_connection) = checkpoint_config
+            .connect(MakeRustlsConnect::new(tls_config))
+            .await?;
+        spawn_postgres_connection::<MakeRustlsConnect>(checkpoint_connection);

         info!("successfully connected to postgres with tls");

         Ok(ReplayClient {
-            client: Arc::new(client),
+            copy_client: Arc::new(copy_client),
+            checkpoint_client: Arc::new(checkpoint_client),
             server_version,
             stream_id,
         })
@@ -133,7 +154,7 @@ impl ReplayClient {
     /// Checks if the underlying connection is closed.
     #[must_use]
     pub fn is_closed(&self) -> bool {
-        self.client.is_closed()
+        self.copy_client.is_closed() || self.checkpoint_client.is_closed()
     }

     /// Gets events between two checkpoints (exclusive on both ends).
@@ -150,12 +171,13 @@ impl ReplayClient {
             where (created_at, id) > ('{}'::timestamptz, '{}'::uuid)
             and (created_at, id) < ('{}'::timestamptz, '{}'::uuid)
            and stream_id = {}
+            order by created_at, id
             ) to stdout with (format text);
             "#,
             from.created_at, from.id, to.created_at, to.id, self.stream_id as i64
         );

-        let stream = self.client.copy_out_simple(&copy_query).await?;
+        let stream = self.copy_client.copy_out_simple(&copy_query).await?;

         Ok(stream)
     }
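Why the added `order by created_at, id` matters: replay checkpoints the last delivered event and resumes from a `(created_at, id) > checkpoint` predicate, which is lossless only if delivery follows that same order. A self-contained sketch with stand-in `(timestamp, id)` tuples:

```rust
fn main() {
    // Events as (created_at, id) stand-ins; Rust tuple ordering mirrors
    // the SQL row-value comparison used in the COPY query above.
    let mut events = vec![(3, "c"), (1, "a"), (2, "b")];
    events.sort(); // ORDER BY created_at, id

    // Suppose replay crashes after delivering the first two events and
    // checkpoints the last delivered one.
    let checkpoint = events[1];

    // Resuming with "(created_at, id) > checkpoint" picks up exactly the
    // undelivered tail, but only because delivery was ordered.
    let resumed: Vec<_> = events.iter().filter(|e| **e > checkpoint).collect();
    assert_eq!(resumed, vec![&(3, "c")]);
}
```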
@@ -164,7 +186,7 @@ impl ReplayClient {
     ///
     /// This is duplicated from the [`StreamStore`] because we want to use a persistent connection during failover.
     pub async fn update_checkpoint(&self, checkpoint: &EventIdentifier) -> EtlResult<()> {
-        self.client
+        self.checkpoint_client
             .execute(
                 r#"
                 update pgstream.streams
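The motivation for the second connection: a single Postgres connection serves one command at a time, so a checkpoint `UPDATE` issued during a long-running COPY would queue behind it. A rough model of the separation, using threads and channels in place of real connections (all names here are illustrative, not the crate's API):

```rust
use std::sync::mpsc;
use std::thread;

fn main() {
    // "copy connection": streams replay events without interruption.
    let (event_tx, event_rx) = mpsc::channel::<u32>();
    let streamer = thread::spawn(move || {
        for id in 1..=100 {
            event_tx.send(id).unwrap();
        }
    });

    // "checkpoint connection": records progress independently, so a
    // checkpoint write never waits for the copy stream to finish.
    let (ckpt_tx, ckpt_rx) = mpsc::channel::<u32>();
    let checkpointer = thread::spawn(move || ckpt_rx.iter().last());

    for id in event_rx {
        ckpt_tx.send(id).unwrap(); // would be checkpoint_client.execute(...)
    }
    drop(ckpt_tx);

    streamer.join().unwrap();
    assert_eq!(checkpointer.join().unwrap(), Some(100));
}
```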

src/slot_recovery.rs

Lines changed: 104 additions & 6 deletions
@@ -24,6 +24,32 @@ use tracing::{info, warn};

 use crate::types::SlotName;

+type Checkpoint = (String, DateTime<Utc>);
+
+#[must_use]
+fn checkpoint_is_earlier(a: &Checkpoint, b: &Checkpoint) -> bool {
+    a.1 < b.1 || (a.1 == b.1 && a.0 < b.0)
+}
+
+#[must_use]
+fn select_recovery_checkpoint(
+    existing_checkpoint: Option<Checkpoint>,
+    lsn_checkpoint: Option<Checkpoint>,
+) -> Option<Checkpoint> {
+    match (existing_checkpoint, lsn_checkpoint) {
+        (Some(existing), Some(from_lsn)) => {
+            if checkpoint_is_earlier(&existing, &from_lsn) {
+                Some(existing)
+            } else {
+                Some(from_lsn)
+            }
+        }
+        (Some(existing), None) => Some(existing),
+        (None, Some(from_lsn)) => Some(from_lsn),
+        (None, None) => None,
+    }
+}
+
 /// Checks if an error indicates a replication slot has been invalidated.
 ///
 /// Postgres returns error code 55000 (OBJECT_NOT_IN_PREREQUISITE_STATE) with the message
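A note on `checkpoint_is_earlier` above: it is plain lexicographic order on `(created_at, id)`, i.e. Rust's built-in tuple ordering with the fields swapped. A self-contained check with stand-in types (`i64` timestamps, `String` ids):

```rust
fn checkpoint_is_earlier(a: &(String, i64), b: &(String, i64)) -> bool {
    a.1 < b.1 || (a.1 == b.1 && a.0 < b.0)
}

fn main() {
    let a = ("b".to_string(), 1); // later id, earlier timestamp
    let b = ("a".to_string(), 2);
    // The timestamp is compared first, so `a` is earlier; this matches
    // comparing (timestamp, id) tuples directly.
    assert!(checkpoint_is_earlier(&a, &b));
    assert_eq!(checkpoint_is_earlier(&a, &b), (&a.1, &a.0) < (&b.1, &b.0));
}
```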
@@ -66,6 +92,24 @@ pub async fn handle_slot_recovery(pool: &PgPool, stream_id: u64) -> EtlResult<()
     // Start a transaction for the checkpoint update
     let mut tx = pool.begin().await?;

+    // Preserve an existing failover checkpoint if we are already in failover mode.
+    let existing_checkpoint_row: Option<(Option<String>, Option<DateTime<Utc>>)> = sqlx::query_as(
+        "SELECT failover_checkpoint_id, failover_checkpoint_ts FROM pgstream.streams WHERE id = $1",
+    )
+    .bind(stream_id as i64)
+    .fetch_optional(&mut *tx)
+    .await?;
+
+    let existing_checkpoint = existing_checkpoint_row.and_then(|(id, ts)| id.zip(ts));
+
+    if let Some((id, created_at)) = &existing_checkpoint {
+        info!(
+            event_id = %id,
+            event_created_at = %created_at,
+            "existing failover checkpoint found"
+        );
+    }
+
     // 1. Get confirmed_flush_lsn BEFORE dropping the slot
     let confirmed_lsn: Option<String> = sqlx::query_scalar(
         "SELECT confirmed_flush_lsn::text FROM pg_replication_slots WHERE slot_name = $1",
@@ -91,7 +135,7 @@ pub async fn handle_slot_recovery(pool: &PgPool, stream_id: u64) -> EtlResult<()
     );

     // 2. Find the first event after the confirmed LSN
-    let checkpoint: Option<(String, DateTime<Utc>)> = sqlx::query_as(
+    let lsn_checkpoint: Option<Checkpoint> = sqlx::query_as(
         "SELECT id::text, created_at FROM pgstream.events
         WHERE lsn > $1::pg_lsn AND stream_id = $2
         ORDER BY created_at, id LIMIT 1",
@@ -101,8 +145,23 @@ pub async fn handle_slot_recovery(pool: &PgPool, stream_id: u64) -> EtlResult<()
     .fetch_optional(&mut *tx)
     .await?;

-    // 3. Set failover checkpoint BEFORE dropping slot (crash-safe ordering)
-    if let Some((id, created_at)) = checkpoint {
+    // 3. Choose the earliest safe checkpoint.
+    // If we are already in failover mode, keep the existing checkpoint unless
+    // the LSN-derived checkpoint is earlier.
+    let checkpoint = select_recovery_checkpoint(existing_checkpoint.clone(), lsn_checkpoint);
+
+    // 4. Set failover checkpoint BEFORE dropping slot (crash-safe ordering)
+    if checkpoint == existing_checkpoint {
+        if let Some((id, created_at)) = checkpoint {
+            info!(
+                event_id = %id,
+                event_created_at = %created_at,
+                "preserving existing failover checkpoint during slot recovery"
+            );
+        } else {
+            info!("no events found after confirmed_flush_lsn, pipeline will start fresh");
+        }
+    } else if let Some((id, created_at)) = checkpoint {
         info!(
             event_id = %id,
             event_created_at = %created_at,
@@ -124,7 +183,7 @@ pub async fn handle_slot_recovery(pool: &PgPool, stream_id: u64) -> EtlResult<()
         info!("no events found after confirmed_flush_lsn, pipeline will start fresh");
     }

-    // 4. Delete ETL replication state so ETL will create a fresh slot on restart
+    // 5. Delete ETL replication state so ETL will create a fresh slot on restart
     // This triggers DataSync, but we skip it by returning Ok(()) from write_table_rows.
     // The failover checkpoint ensures we COPY missed events when replication starts.
     let deleted = sqlx::query("DELETE FROM etl.replication_state WHERE pipeline_id = $1")
@@ -137,10 +196,10 @@ pub async fn handle_slot_recovery(pool: &PgPool, stream_id: u64) -> EtlResult<()
         "deleted ETL replication state to trigger fresh slot creation"
     );

-    // 5. Commit the transaction - checkpoint is now durable
+    // 6. Commit the transaction - checkpoint is now durable
     tx.commit().await?;

-    // 6. Drop the invalidated slot AFTER commit (non-transactional operation)
+    // 7. Drop the invalidated slot AFTER commit (non-transactional operation)
     // This ordering ensures crash safety: if we crash here, the checkpoint is
     // already saved, and the next recovery attempt will simply drop the slot.
     let drop_result = sqlx::query("SELECT pg_drop_replication_slot($1)")
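Steps 5 through 7 encode a crash-safety ordering: make the checkpoint durable inside the transaction first, then perform the non-transactional slot drop. A minimal sketch of the pattern, with stand-in closures in place of the real `sqlx` calls:

```rust
// If we crash between the two steps, the checkpoint is already committed
// and the next recovery run simply retries the slot drop; the reverse
// order could lose the checkpoint after the slot is already gone.
fn recover(
    commit_checkpoint: impl FnOnce() -> Result<(), String>,
    drop_slot: impl FnOnce() -> Result<(), String>,
) -> Result<(), String> {
    commit_checkpoint()?; // durable from here on
    drop_slot() // safe to retry on failure
}

fn main() {
    let result = recover(|| Ok(()), || Err("connection lost".into()));
    // The checkpoint survived even though the drop failed.
    assert_eq!(result, Err("connection lost".to_string()));
}
```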
@@ -164,6 +223,7 @@ pub async fn handle_slot_recovery(pool: &PgPool, stream_id: u64) -> EtlResult<()
 #[cfg(test)]
 mod tests {
     use super::*;
+    use chrono::TimeZone;

     #[test]
     fn test_is_slot_invalidation_error_matches() {
@@ -206,4 +266,42 @@ mod tests {
         let error = etl::etl_error!(etl::error::ErrorKind::InvalidState, "connection refused");
         assert!(!is_slot_invalidation_error(&error));
     }
+
+    #[test]
+    fn test_select_recovery_checkpoint_prefers_earlier_existing_checkpoint() {
+        let ts = Utc.with_ymd_and_hms(2024, 3, 15, 12, 0, 0).unwrap();
+        let existing = Some(("00000000-0000-0000-0000-000000000001".to_string(), ts));
+        let from_lsn = Some((
+            "00000000-0000-0000-0000-000000000002".to_string(),
+            ts + chrono::Duration::seconds(1),
+        ));
+
+        assert_eq!(
+            select_recovery_checkpoint(existing.clone(), from_lsn),
+            existing
+        );
+    }
+
+    #[test]
+    fn test_select_recovery_checkpoint_prefers_earlier_lsn_checkpoint() {
+        let ts = Utc.with_ymd_and_hms(2024, 3, 15, 12, 0, 0).unwrap();
+        let existing = Some((
+            "00000000-0000-0000-0000-000000000002".to_string(),
+            ts + chrono::Duration::seconds(1),
+        ));
+        let from_lsn = Some(("00000000-0000-0000-0000-000000000001".to_string(), ts));
+
+        assert_eq!(
+            select_recovery_checkpoint(existing, from_lsn.clone()),
+            from_lsn
+        );
+    }
+
+    #[test]
+    fn test_select_recovery_checkpoint_uses_existing_when_lsn_checkpoint_missing() {
+        let ts = Utc.with_ymd_and_hms(2024, 3, 15, 12, 0, 0).unwrap();
+        let existing = Some(("00000000-0000-0000-0000-000000000001".to_string(), ts));
+
+        assert_eq!(select_recovery_checkpoint(existing.clone(), None), existing);
+    }
 }

src/stream.rs

Lines changed: 25 additions & 10 deletions
@@ -103,16 +103,31 @@ where

         let result = self.sink.publish_events(events).await;
         if result.is_err() {
-            info!(
-                "Publishing events failed, entering failover at checkpoint event id: {:?}",
-                checkpoint_id
-            );
-            metrics::record_failover_entered(self.config.id);
-            self.store
-                .store_stream_status(StreamStatus::Failover {
-                    checkpoint_event_id: checkpoint_id,
-                })
-                .await?;
+            let (current_status, _) = self.store.get_stream_state().await?;
+
+            match current_status {
+                StreamStatus::Healthy => {
+                    info!(
+                        "Publishing events failed, entering failover at checkpoint event id: {:?}",
+                        checkpoint_id
+                    );
+                    metrics::record_failover_entered(self.config.id);
+                    self.store
+                        .store_stream_status(StreamStatus::Failover {
+                            checkpoint_event_id: checkpoint_id,
+                        })
+                        .await?;
+                }
+                StreamStatus::Failover {
+                    checkpoint_event_id,
+                } => {
+                    info!(
+                        "Publishing events failed while already in failover, preserving checkpoint event id: {:?}",
+                        checkpoint_event_id
+                    );
+                }
+            }
+
             return Ok(());
         }
