diff --git a/src/metrics/failover.rs b/src/metrics/failover.rs
index d52ac4d..e5c06d9 100644
--- a/src/metrics/failover.rs
+++ b/src/metrics/failover.rs
@@ -1,13 +1,10 @@
-use metrics::{Unit, describe_gauge, describe_histogram, gauge, histogram};
+use metrics::{Unit, describe_gauge, gauge};
 
 /// Metric name for tracking if a stream is currently in failover mode.
 ///
 /// This gauge is set to 1 when the stream is in failover mode and 0 otherwise.
 const STREAM_FAILOVER_ACTIVE: &str = "stream_failover_active";
 
-/// Metric name for recording the time spent in failover mode.
-const STREAM_FAILOVER_DURATION_MILLISECONDS: &str = "stream_failover_duration_milliseconds";
-
 /// Label key for stream identifier.
 const STREAM_ID_LABEL: &str = "stream_id";
 
@@ -18,11 +15,6 @@ pub(crate) fn register_failover_metrics() {
         Unit::Count,
         "Whether the stream is currently in failover mode (1 = failover, 0 = healthy)"
     );
-    describe_histogram!(
-        STREAM_FAILOVER_DURATION_MILLISECONDS,
-        Unit::Seconds,
-        "Time spent in failover mode before recovery"
-    );
 }
 
 /// Records that the stream has entered failover mode.
@@ -35,15 +27,10 @@ pub fn record_failover_entered(stream_id: u64) {
 }
 
 /// Records that the stream has recovered from failover mode.
-pub fn record_failover_recovered(stream_id: u64, duration_milliseconds: f64) {
+pub fn record_failover_recovered(stream_id: u64) {
     gauge!(
         STREAM_FAILOVER_ACTIVE,
         STREAM_ID_LABEL => stream_id.to_string()
     )
     .set(0.0);
-    histogram!(
-        STREAM_FAILOVER_DURATION_MILLISECONDS,
-        STREAM_ID_LABEL => stream_id.to_string()
-    )
-    .record(duration_milliseconds);
 }
diff --git a/src/stream.rs b/src/stream.rs
index e643685..82ba71e 100644
--- a/src/stream.rs
+++ b/src/stream.rs
@@ -50,18 +50,25 @@ where
         // create stream store
         let store = StreamStore::create(config.clone(), store).await?;
 
-        // run initial maintenance synchronously during startup if due
         let (status, next_maintenance_at) = store.get_stream_state().await?;
-        if let StreamStatus::Failover {
-            checkpoint_event_id,
-        } = &status
-        {
-            warn!(
-                checkpoint_event_id = %checkpoint_event_id.id,
-                "Stream starting in failover mode; recovery will be attempted on next batch"
-            );
+
+        // sync failover metric
+        match &status {
+            StreamStatus::Failover {
+                checkpoint_event_id,
+            } => {
+                warn!(
+                    checkpoint_event_id = %checkpoint_event_id.id,
+                    "Stream starting in failover mode; recovery will be attempted on next batch"
+                );
+                metrics::record_failover_entered(config.id);
+            }
+            StreamStatus::Healthy => {
+                metrics::record_failover_recovered(config.id);
+            }
         }
+
+        // run initial maintenance synchronously during startup if due
         if Utc::now() >= next_maintenance_at {
             run_maintenance(&store, next_maintenance_at).await?;
             let next = next_maintenance_at + chrono::Duration::hours(24);
@@ -216,8 +223,6 @@ where
         checkpoint_event_id: &EventIdentifier,
         current_batch_event_id: &EventIdentifier,
     ) -> EtlResult<()> {
-        let failover_start = Utc::now();
-
        let checkpoint_event = self.store.get_checkpoint_event(checkpoint_event_id).await?;
 
         // Record processing lag for checkpoint event
@@ -280,10 +285,7 @@ where
             replay_client.update_checkpoint(&last_event_id).await?;
         }
 
-        // Record successful failover recovery
-        let failover_duration_milliseconds =
-            (Utc::now() - failover_start).num_milliseconds() as f64;
-        metrics::record_failover_recovered(self.config.id, failover_duration_milliseconds);
+        metrics::record_failover_recovered(self.config.id);
 
         self.store
             .store_stream_status(StreamStatus::Healthy)
             .await?;
@@ -295,6 +297,13 @@
     }
 }
 
+#[cfg(feature = "test-utils")]
+impl<Sink, Store> PgStream<Sink, Store> {
+    pub fn store(&self) -> &StreamStore<Store> {
+        &self.store
+    }
+}
+
 impl<Sink, Store> Destination for PgStream<Sink, Store>
 where
     Sink: SinkTrait + Sync,
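Note on the metric surface this leaves behind: failover is now exposed only as a 0/1 gauge per stream, with the duration histogram gone. A minimal sketch of the resulting pattern, assuming the `metrics` crate macros used above; `set_failover_state` is a hypothetical illustration, not the crate's module:

```rust
use metrics::gauge;

/// Illustration only: the same gauge flip the PR performs in
/// record_failover_entered / record_failover_recovered.
fn set_failover_state(stream_id: u64, in_failover: bool) {
    // One labeled series per stream; 1.0 while in failover, 0.0 when healthy.
    gauge!("stream_failover_active", "stream_id" => stream_id.to_string())
        .set(if in_failover { 1.0 } else { 0.0 });
}
```

Because `PgStream::create` now syncs this gauge from the persisted `StreamStatus` on startup, the series is correct across restarts as well, which the old entered/recovered-only flow could not guarantee for a process that came up already in failover.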
diff --git a/src/test_utils/failable_sink.rs b/src/test_utils/failable_sink.rs
index 2750366..2885d9f 100644
--- a/src/test_utils/failable_sink.rs
+++ b/src/test_utils/failable_sink.rs
@@ -12,6 +12,10 @@ use crate::types::TriggeredEvent;
 pub struct FailableSink {
     events: Arc<Mutex<Vec<TriggeredEvent>>>,
     fail_on_call: Arc<AtomicUsize>,
+    /// Inclusive start of the range of calls to fail on.
+    fail_range_start: Arc<AtomicUsize>,
+    /// Exclusive end of the range of calls to fail on.
+    fail_range_end: Arc<AtomicUsize>,
     call_count: Arc<AtomicUsize>,
     should_fail: Arc<AtomicBool>,
 }
@@ -28,6 +32,8 @@ impl FailableSink {
         Self {
             events: Arc::new(Mutex::new(Vec::new())),
             fail_on_call: Arc::new(AtomicUsize::new(usize::MAX)),
+            fail_range_start: Arc::new(AtomicUsize::new(usize::MAX)),
+            fail_range_end: Arc::new(AtomicUsize::new(usize::MAX)),
             call_count: Arc::new(AtomicUsize::new(0)),
             should_fail: Arc::new(AtomicBool::new(false)),
         }
@@ -38,10 +44,18 @@ impl FailableSink {
         self.fail_on_call.store(n, Ordering::SeqCst);
     }
 
+    /// Configure sink to fail on calls in the range `[start, end)` (0-indexed)
+    pub fn fail_on_calls(&self, start: usize, end: usize) {
+        self.fail_range_start.store(start, Ordering::SeqCst);
+        self.fail_range_end.store(end, Ordering::SeqCst);
+    }
+
     /// Configure sink to succeed on all calls
     pub fn succeed_always(&self) {
         self.should_fail.store(false, Ordering::SeqCst);
         self.fail_on_call.store(usize::MAX, Ordering::SeqCst);
+        self.fail_range_start.store(usize::MAX, Ordering::SeqCst);
+        self.fail_range_end.store(usize::MAX, Ordering::SeqCst);
     }
 
     pub async fn events(&self) -> Vec<TriggeredEvent> {
@@ -62,8 +76,12 @@ impl Sink for FailableSink {
     async fn publish_events(&self, events: Vec<TriggeredEvent>) -> EtlResult<()> {
         let call_num = self.call_count.fetch_add(1, Ordering::SeqCst);
 
+        let range_start = self.fail_range_start.load(Ordering::SeqCst);
+        let range_end = self.fail_range_end.load(Ordering::SeqCst);
+
         // Check if we should fail on this specific call
         if call_num == self.fail_on_call.load(Ordering::SeqCst)
+            || (call_num >= range_start && call_num < range_end)
             || self.should_fail.load(Ordering::SeqCst)
         {
             return Err(etl_error!(
diff --git a/tests/failover_checkpoint_tests.rs b/tests/failover_checkpoint_tests.rs
index 4fc2c87..eb8ff7f 100644
--- a/tests/failover_checkpoint_tests.rs
+++ b/tests/failover_checkpoint_tests.rs
@@ -5,7 +5,6 @@ use etl::destination::Destination;
 use etl::error::{ErrorKind, EtlResult};
 use etl::store::both::postgres::PostgresStore;
 use postgres_stream::sink::Sink;
-use postgres_stream::store::StreamStore;
 use postgres_stream::stream::PgStream;
 use postgres_stream::test_utils::{
     TestDatabase, create_postgres_store_with_table_id, insert_events_to_db, make_event_with_id,
@@ -63,7 +62,7 @@ async fn test_failover_does_not_overwrite_checkpoint_on_repeated_failures() {
         create_postgres_store_with_table_id(config.id, &db.config, &db.pool).await;
 
     let stream: PgStream<FailableSink, PostgresStore> =
-        PgStream::create(config.clone(), sink, store.clone())
+        PgStream::create(config.clone(), sink, store)
             .await
             .expect("Failed to create PgStream");
 
@@ -90,8 +89,7 @@ async fn test_failover_does_not_overwrite_checkpoint_on_repeated_failures() {
         .await
         .expect("write_events should succeed even if sink fails");
 
-    let stream_store = StreamStore::create(config, store).await.unwrap();
-    let (status, _) = stream_store.get_stream_state().await.unwrap();
+    let (status, _) = stream.store().get_stream_state().await.unwrap();
 
     match status {
         StreamStatus::Failover {
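The new `fail_on_calls(start, end)` window composes with the existing `fail_on_call` and `should_fail` knobs; all three are checked on every `publish_events` call against a single shared counter. A hedged usage sketch (hypothetical test, assuming only the `FailableSink` API shown in this diff):

```rust
use postgres_stream::sink::Sink;
use postgres_stream::test_utils::FailableSink;

#[tokio::test]
async fn failure_window_sketch() {
    let sink = FailableSink::new();
    // 0-indexed, half-open: calls 1 and 2 fail, every other call succeeds.
    sink.fail_on_calls(1, 3);

    for call in 0..4 {
        let result = sink.publish_events(vec![]).await;
        assert_eq!(result.is_err(), (1..3).contains(&call), "call {call}");
    }

    // Clearing resets the single-call trigger and the range alike.
    sink.succeed_always();
    assert!(sink.publish_events(vec![]).await.is_ok());
}
```

Storing the bounds in `usize::MAX`-initialized atomics keeps the range disabled by default and makes the check lock-free on the publish path.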
diff --git a/tests/maintenance_tests.rs b/tests/maintenance_tests.rs
index 4111b84..6be82dc 100644
--- a/tests/maintenance_tests.rs
+++ b/tests/maintenance_tests.rs
@@ -2,7 +2,6 @@ use chrono::Duration;
 use etl::store::both::postgres::PostgresStore;
 use postgres_stream::maintenance::run_maintenance;
 use postgres_stream::sink::memory::MemorySink;
-use postgres_stream::store::StreamStore;
 use postgres_stream::stream::PgStream;
 use postgres_stream::test_utils::{TestDatabase, create_postgres_store, test_stream_config};
 
@@ -37,8 +36,8 @@ async fn test_maintenance_creates_future_partitions() {
     let sink = MemorySink::new();
     let store = create_postgres_store(config.id, &db.config, &db.pool).await;
 
-    let _stream: PgStream<MemorySink, PostgresStore> =
-        PgStream::create(config, sink, store.clone()).await.unwrap();
+    let stream: PgStream<MemorySink, PostgresStore> =
+        PgStream::create(config, sink, store).await.unwrap();
 
     // Manually delete all partitions except today
     sqlx::query(
@@ -72,11 +71,8 @@ async fn test_maintenance_creates_future_partitions() {
     assert_eq!(count_before.0, 1);
 
     // Run maintenance manually
-    let stream_store = StreamStore::create(test_stream_config(&db), store.clone())
-        .await
-        .unwrap();
     let scheduled_time = chrono::Utc::now();
-    run_maintenance(&stream_store, scheduled_time)
+    run_maintenance(stream.store(), scheduled_time)
         .await
         .unwrap();
 
@@ -99,8 +95,8 @@ async fn test_maintenance_drops_old_partitions() {
     let sink = MemorySink::new();
     let store = create_postgres_store(config.id, &db.config, &db.pool).await;
 
-    let _stream: PgStream<MemorySink, PostgresStore> =
-        PgStream::create(config, sink, store.clone()).await.unwrap();
+    let stream: PgStream<MemorySink, PostgresStore> =
+        PgStream::create(config, sink, store).await.unwrap();
 
     // Create an old partition (10 days ago - beyond 7 day retention)
     let old_date = chrono::Utc::now() - Duration::days(10);
@@ -130,11 +126,8 @@ async fn test_maintenance_drops_old_partitions() {
     assert!(exists_before.0);
 
     // Run maintenance
-    let stream_store = StreamStore::create(test_stream_config(&db), store.clone())
-        .await
-        .unwrap();
     let scheduled_time = chrono::Utc::now();
-    run_maintenance(&stream_store, scheduled_time)
+    run_maintenance(stream.store(), scheduled_time)
         .await
         .unwrap();
 
@@ -159,30 +152,28 @@ async fn test_maintenance_updates_next_maintenance_at() {
     let sink = MemorySink::new();
     let store = create_postgres_store(config.id, &db.config, &db.pool).await;
 
-    let _stream: PgStream<MemorySink, PostgresStore> =
-        PgStream::create(config, sink, store.clone()).await.unwrap();
+    let stream: PgStream<MemorySink, PostgresStore> =
+        PgStream::create(config, sink, store).await.unwrap();
 
     // Get initial next_maintenance_at
-    let stream_store = StreamStore::create(test_stream_config(&db), store.clone())
-        .await
-        .unwrap();
-    let (_status, next_before) = stream_store.get_stream_state().await.unwrap();
+    let (_status, next_before) = stream.store().get_stream_state().await.unwrap();
 
     // Run maintenance
     let scheduled_time = next_before;
-    let completed_at = run_maintenance(&stream_store, scheduled_time)
+    let completed_at = run_maintenance(stream.store(), scheduled_time)
         .await
         .unwrap();
 
     // Update the next maintenance time
     let next_after = next_before + Duration::hours(24);
-    stream_store
+    stream
+        .store()
         .store_next_maintenance_at(next_after)
         .await
         .unwrap();
 
     // Verify next_maintenance_at was updated
-    let (_status_after, next_after_stored) = stream_store.get_stream_state().await.unwrap();
+    let (_status_after, next_after_stored) = stream.store().get_stream_state().await.unwrap();
 
     assert!(
         next_after_stored > next_before,
@@ -201,19 +192,15 @@ async fn test_maintenance_idempotent() {
     let sink = MemorySink::new();
     let store = create_postgres_store(config.id, &db.config, &db.pool).await;
 
-    let _stream: PgStream<MemorySink, PostgresStore> =
-        PgStream::create(config, sink, store.clone()).await.unwrap();
-
-    let stream_store = StreamStore::create(test_stream_config(&db), store.clone())
-        .await
-        .unwrap();
+    let stream: PgStream<MemorySink, PostgresStore> =
+        PgStream::create(config, sink, store).await.unwrap();
 
     // Run maintenance twice
     let scheduled_time = chrono::Utc::now();
-    run_maintenance(&stream_store, scheduled_time)
+    run_maintenance(stream.store(), scheduled_time)
         .await
         .unwrap();
-    run_maintenance(&stream_store, scheduled_time)
+    run_maintenance(stream.store(), scheduled_time)
         .await
         .unwrap();
 
@@ -237,16 +224,12 @@ async fn test_run_maintenance_returns_scheduled_time() {
     let sink = MemorySink::new();
     let store = create_postgres_store(config.id, &db.config, &db.pool).await;
 
-    let _stream: PgStream<MemorySink, PostgresStore> =
-        PgStream::create(config, sink, store.clone()).await.unwrap();
-
-    let stream_store = StreamStore::create(test_stream_config(&db), store.clone())
-        .await
-        .unwrap();
+    let stream: PgStream<MemorySink, PostgresStore> =
+        PgStream::create(config, sink, store).await.unwrap();
 
     // Use a specific scheduled_time (1 hour in the past)
     let scheduled_time = chrono::Utc::now() - Duration::hours(1);
-    let returned_time = run_maintenance(&stream_store, scheduled_time)
+    let returned_time = run_maintenance(stream.store(), scheduled_time)
         .await
         .unwrap();
 
@@ -264,19 +247,15 @@ async fn test_maintenance_scheduling_uses_scheduled_time_not_execution_time() {
     let sink = MemorySink::new();
     let store = create_postgres_store(config.id, &db.config, &db.pool).await;
 
-    let _stream: PgStream<MemorySink, PostgresStore> =
-        PgStream::create(config, sink, store.clone()).await.unwrap();
-
-    let stream_store = StreamStore::create(test_stream_config(&db), store.clone())
-        .await
-        .unwrap();
+    let stream: PgStream<MemorySink, PostgresStore> =
+        PgStream::create(config, sink, store).await.unwrap();
 
     // Simulate a maintenance that was scheduled for 2 hours ago
     // This can happen if the system was down during the scheduled maintenance time
     let original_scheduled_time = chrono::Utc::now() - Duration::hours(2);
 
     // Run maintenance with the original scheduled time
-    let returned_time = run_maintenance(&stream_store, original_scheduled_time)
+    let returned_time = run_maintenance(stream.store(), original_scheduled_time)
         .await
         .unwrap();
 
@@ -284,12 +263,13 @@ async fn test_maintenance_scheduling_uses_scheduled_time_not_execution_time() {
     // The next maintenance should be scheduled 24h after the original scheduled time,
     // not 24h from now. This ensures consistent daily scheduling even if
     // maintenance runs late.
     let expected_next_maintenance = original_scheduled_time + Duration::hours(24);
-    stream_store
+    stream
+        .store()
         .store_next_maintenance_at(expected_next_maintenance)
         .await
         .unwrap();
 
-    let (_status, actual_next_maintenance) = stream_store.get_stream_state().await.unwrap();
+    let (_status, actual_next_maintenance) = stream.store().get_stream_state().await.unwrap();
 
     // Verify the next maintenance is scheduled based on the original scheduled time
     assert_eq!(
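These tests all pin the same invariant: the next run is derived from the scheduled time, not from when maintenance actually executed. Reduced to a sketch (only `chrono` assumed; `next_maintenance` is a hypothetical helper, not crate API):

```rust
use chrono::{DateTime, Duration, Utc};

/// Anchoring to the scheduled time keeps the daily cadence fixed:
/// a run scheduled for 02:00 that executes late, or after downtime,
/// still yields 02:00 the next day, with no drift.
fn next_maintenance(scheduled: DateTime<Utc>) -> DateTime<Utc> {
    scheduled + Duration::hours(24)
}
```

So a maintenance run scheduled for 02:00 that actually executes at 04:30 still schedules the following run for 02:00 the next day.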
diff --git a/tests/stream_tests.rs b/tests/stream_tests.rs
index d370ecd..80f164f 100644
--- a/tests/stream_tests.rs
+++ b/tests/stream_tests.rs
@@ -2,13 +2,13 @@ use chrono::Duration;
 use etl::destination::Destination;
 use etl::store::both::postgres::PostgresStore;
 use postgres_stream::sink::memory::MemorySink;
-use postgres_stream::store::StreamStore;
 use postgres_stream::stream::PgStream;
 use postgres_stream::test_utils::{
     FailableSink, TestDatabase, create_postgres_store, create_postgres_store_with_table_id,
     insert_events_to_db, make_event_with_id, make_test_event, test_stream_config,
 };
 use postgres_stream::types::{StreamStatus, TriggeredEvent};
+use std::collections::HashSet;
 use std::time::Duration as StdDuration;
 
 // Basic stream tests
@@ -62,7 +62,7 @@ async fn test_failover_sink_failure_enters_failover_mode() {
         create_postgres_store_with_table_id(config.id, &db.config, &db.pool).await;
 
     let stream: PgStream<FailableSink, PostgresStore> =
-        PgStream::create(config.clone(), sink.clone(), store.clone())
+        PgStream::create(config.clone(), sink.clone(), store)
             .await
             .expect("Failed to create PgStream");
 
@@ -81,8 +81,7 @@ async fn test_failover_sink_failure_enters_failover_mode() {
         .await
         .expect("write_events should succeed even if sink fails");
 
-    let stream_store = StreamStore::create(config, store).await.unwrap();
-    let (status, _) = stream_store.get_stream_state().await.unwrap();
+    let (status, _) = stream.store().get_stream_state().await.unwrap();
 
     match status {
         StreamStatus::Failover {
@@ -108,7 +107,7 @@ async fn test_failover_recovery_replays_missed_events() {
         create_postgres_store_with_table_id(config.id, &db.config, &db.pool).await;
 
     let stream: PgStream<FailableSink, PostgresStore> =
-        PgStream::create(config.clone(), sink.clone(), store.clone())
+        PgStream::create(config.clone(), sink.clone(), store)
             .await
             .expect("Failed to create PgStream");
 
@@ -162,8 +161,7 @@ async fn test_failover_recovery_replays_missed_events() {
         );
     }
 
-    let stream_store = StreamStore::create(config, store).await.unwrap();
-    let (status, _) = stream_store.get_stream_state().await.unwrap();
+    let (status, _) = stream.store().get_stream_state().await.unwrap();
     assert!(
         matches!(status, StreamStatus::Healthy),
         "Stream should return to Healthy after recovery"
@@ -181,7 +179,7 @@ async fn test_failover_with_no_missed_events() {
         create_postgres_store_with_table_id(config.id, &db.config, &db.pool).await;
 
     let stream: PgStream<FailableSink, PostgresStore> =
-        PgStream::create(config.clone(), sink.clone(), store.clone())
+        PgStream::create(config.clone(), sink.clone(), store)
             .await
             .expect("Failed to create PgStream");
 
@@ -237,8 +235,7 @@ async fn test_failover_with_no_missed_events() {
         );
     }
 
-    let stream_store = StreamStore::create(config, store).await.unwrap();
-    let (status, _) = stream_store.get_stream_state().await.unwrap();
+    let (status, _) = stream.store().get_stream_state().await.unwrap();
     assert!(
         matches!(status, StreamStatus::Healthy),
         "Stream should return to Healthy after recovery"
@@ -254,7 +251,7 @@ async fn test_failover_multiple_consecutive_failures() {
         create_postgres_store_with_table_id(config.id, &db.config, &db.pool).await;
 
     let stream: PgStream<FailableSink, PostgresStore> =
-        PgStream::create(config.clone(), sink.clone(), store.clone())
+        PgStream::create(config.clone(), sink.clone(), store)
             .await
             .expect("Failed to create PgStream");
 
@@ -316,8 +313,7 @@ async fn test_failover_multiple_consecutive_failures() {
     );
 
     // Verify all 5 unique event IDs are present
-    let published_ids: std::collections::HashSet<String> =
-        published.iter().map(|e| e.id.id.clone()).collect();
+    let published_ids: HashSet<String> = published.iter().map(|e| e.id.id.clone()).collect();
     assert_eq!(
         published_ids.len(),
         5,
@@ -332,8 +328,7 @@ async fn test_failover_multiple_consecutive_failures() {
         );
     }
 
-    let stream_store = StreamStore::create(config, store).await.unwrap();
-    let (status, _) = stream_store.get_stream_state().await.unwrap();
+    let (status, _) = stream.store().get_stream_state().await.unwrap();
     assert!(
         matches!(status, StreamStatus::Healthy),
         "Stream should recover to Healthy even after multiple failures"
@@ -349,7 +344,7 @@ async fn test_failover_large_gap_recovery() {
         create_postgres_store_with_table_id(config.id, &db.config, &db.pool).await;
 
     let stream: PgStream<FailableSink, PostgresStore> =
-        PgStream::create(config.clone(), sink.clone(), store.clone())
+        PgStream::create(config.clone(), sink.clone(), store)
             .await
             .expect("Failed to create PgStream");
 
@@ -415,8 +410,7 @@ async fn test_failover_large_gap_recovery() {
         );
     }
 
-    let stream_store = StreamStore::create(config, store).await.unwrap();
-    let (status, _) = stream_store.get_stream_state().await.unwrap();
+    let (status, _) = stream.store().get_stream_state().await.unwrap();
     assert!(
         matches!(status, StreamStatus::Healthy),
         "Stream should recover to Healthy after large gap"
@@ -432,7 +426,7 @@ async fn test_failover_recovery_does_not_hang_when_replay_exceeds_batch_size() {
         create_postgres_store_with_table_id(config.id, &db.config, &db.pool).await;
 
     let stream: PgStream<FailableSink, PostgresStore> =
-        PgStream::create(config.clone(), sink.clone(), store.clone())
+        PgStream::create(config.clone(), sink.clone(), store)
             .await
             .expect("Failed to create PgStream");
 
@@ -479,8 +473,7 @@ async fn test_failover_recovery_does_not_hang_when_replay_exceeds_batch_size() {
     );
     recovery_result.unwrap().unwrap();
 
-    let stream_store = StreamStore::create(config, store).await.unwrap();
-    let (status, _) = stream_store.get_stream_state().await.unwrap();
+    let (status, _) = stream.store().get_stream_state().await.unwrap();
     assert!(
         matches!(status, StreamStatus::Healthy),
         "Stream should return to Healthy after large replay recovery"
@@ -530,15 +523,12 @@ async fn test_failover_checkpoint_persists_across_stream_instances() {
     // Create a new stream instance - should pick up failover state
     let stream2: PgStream<FailableSink, PostgresStore> =
-        PgStream::create(config.clone(), sink.clone(), store.clone())
+        PgStream::create(config.clone(), sink.clone(), store)
             .await
             .expect("Failed to create PgStream after restart");
 
     // Verify it's in failover state
-    let stream_store = StreamStore::create(config.clone(), store.clone())
-        .await
-        .unwrap();
-    let (status, _) = stream_store.get_stream_state().await.unwrap();
+    let (status, _) = stream2.store().get_stream_state().await.unwrap();
     assert!(
         matches!(status, StreamStatus::Failover { .. }),
         "Stream should be in Failover state after restart"
     );
}), "Stream should be in Failover state after restart" @@ -571,8 +561,7 @@ async fn test_failover_checkpoint_persists_across_stream_instances() { ); } - let stream_store2 = StreamStore::create(config, store).await.unwrap(); - let (status, _) = stream_store2.get_stream_state().await.unwrap(); + let (status, _) = stream2.store().get_stream_state().await.unwrap(); assert!( matches!(status, StreamStatus::Healthy), "Stream should be Healthy after recovery" @@ -595,11 +584,9 @@ async fn test_shutdown_succeeds_and_preserves_maintenance_schedule() { .unwrap(); // Set next_maintenance_at to 1 hour in the past - let helper_store = StreamStore::create(test_stream_config(&db), store.clone()) - .await - .unwrap(); let past_maintenance_time = chrono::Utc::now() - Duration::hours(1); - helper_store + stream1 + .store() .store_next_maintenance_at(past_maintenance_time) .await .unwrap(); @@ -607,15 +594,12 @@ async fn test_shutdown_succeeds_and_preserves_maintenance_schedule() { // Drop and recreate stream - this reads the past time from DB and runs maintenance drop(stream1); let stream2: PgStream = - PgStream::create(config.clone(), MemorySink::new(), store.clone()) + PgStream::create(config.clone(), MemorySink::new(), store) .await .unwrap(); // Verify maintenance ran and scheduled next run 24h from the past scheduled time - let helper_store2 = StreamStore::create(test_stream_config(&db), store.clone()) - .await - .unwrap(); - let (_, next_maintenance) = helper_store2.get_stream_state().await.unwrap(); + let (_, next_maintenance) = stream2.store().get_stream_state().await.unwrap(); let expected_next = past_maintenance_time + Duration::hours(24); assert_eq!( @@ -628,10 +612,7 @@ async fn test_shutdown_succeeds_and_preserves_maintenance_schedule() { stream2.shutdown().await.unwrap(); // Schedule should be preserved after shutdown - let helper_store3 = StreamStore::create(test_stream_config(&db), store.clone()) - .await - .unwrap(); - let (_, next_after_shutdown) = helper_store3.get_stream_state().await.unwrap(); + let (_, next_after_shutdown) = stream2.store().get_stream_state().await.unwrap(); assert_eq!( next_after_shutdown.timestamp(), @@ -639,3 +620,185 @@ async fn test_shutdown_succeeds_and_preserves_maintenance_schedule() { "Next maintenance schedule should be preserved after shutdown" ); } + +// Flaky sink failover tests + +#[tokio::test(flavor = "multi_thread")] +async fn test_failover_flaky_sink_mid_replay_recovers() { + let db = TestDatabase::spawn().await; + let config = test_stream_config(&db); + let sink = FailableSink::new(); + let (store, table_id) = + create_postgres_store_with_table_id(config.id, &db.config, &db.pool).await; + + let stream: PgStream = + PgStream::create(config.clone(), sink.clone(), store) + .await + .expect("Failed to create PgStream"); + + let event_ids = insert_events_to_db(&db, 6).await; + + // Event 0: success + sink.succeed_always(); + stream + .write_events(vec![make_event_with_id( + table_id, + event_ids.first().expect("Should have event 0"), + serde_json::json!({"seq": 1}), + )]) + .await + .unwrap(); + + // Event 1: fail - enters failover (call 1) + sink.fail_on_call(1); + stream + .write_events(vec![make_event_with_id( + table_id, + event_ids.get(1).expect("Should have event 1"), + serde_json::json!({"seq": 2}), + )]) + .await + .unwrap(); + + let (status, _) = stream.store().get_stream_state().await.unwrap(); + assert!( + matches!(status, StreamStatus::Failover { .. 
}), + "Stream should be in failover after sink failure" + ); + + // Next tick: checkpoint republish succeeds (call 2), but replay publish fails (call 3) + sink.fail_on_call(3); + stream + .write_events(vec![make_event_with_id( + table_id, + event_ids.get(5).expect("Should have event 5"), + serde_json::json!({"seq": 6}), + )]) + .await + .unwrap(); + + // Should still be in failover because replay failed mid-way + let (status, _) = stream.store().get_stream_state().await.unwrap(); + assert!( + matches!(status, StreamStatus::Failover { .. }), + "Stream should remain in failover after mid-replay failure" + ); + + // Next tick: everything succeeds - full recovery + sink.succeed_always(); + stream + .write_events(vec![make_event_with_id( + table_id, + event_ids.get(5).expect("Should have event 5"), + serde_json::json!({"seq": 6}), + )]) + .await + .unwrap(); + + let (status, _) = stream.store().get_stream_state().await.unwrap(); + assert!( + matches!(status, StreamStatus::Healthy), + "Stream should recover to Healthy after retry" + ); + + // Verify all unique event IDs were published + let published = sink.events().await; + let published_ids: HashSet = published.iter().map(|e| e.id.id.clone()).collect(); + for event_id in &event_ids { + assert!( + published_ids.contains(&event_id.id), + "Event {} should be in published events", + event_id.id + ); + } +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_failover_flaky_sink_checkpoint_publish_retries() { + let db = TestDatabase::spawn().await; + let config = test_stream_config(&db); + let sink = FailableSink::new(); + let (store, table_id) = + create_postgres_store_with_table_id(config.id, &db.config, &db.pool).await; + + let stream: PgStream = + PgStream::create(config.clone(), sink.clone(), store) + .await + .expect("Failed to create PgStream"); + + let event_ids = insert_events_to_db(&db, 4).await; + + // Event 0: success (call 0) + sink.succeed_always(); + stream + .write_events(vec![make_event_with_id( + table_id, + event_ids.first().expect("Should have event 0"), + serde_json::json!({"seq": 1}), + )]) + .await + .unwrap(); + + // Event 1: fail - enters failover (call 1) + sink.fail_on_call(1); + stream + .write_events(vec![make_event_with_id( + table_id, + event_ids.get(1).expect("Should have event 1"), + serde_json::json!({"seq": 2}), + )]) + .await + .unwrap(); + + let (status, _) = stream.store().get_stream_state().await.unwrap(); + assert!( + matches!(status, StreamStatus::Failover { .. }), + "Stream should be in failover after sink failure" + ); + + // Next tick: checkpoint republish fails (call 2) - sink still unavailable + sink.fail_on_call(2); + stream + .write_events(vec![make_event_with_id( + table_id, + event_ids.get(3).expect("Should have event 3"), + serde_json::json!({"seq": 4}), + )]) + .await + .unwrap(); + + // Should still be in failover + let (status, _) = stream.store().get_stream_state().await.unwrap(); + assert!( + matches!(status, StreamStatus::Failover { .. 
}), + "Stream should remain in failover when checkpoint publish fails" + ); + + // Next tick: everything succeeds - full recovery + sink.succeed_always(); + stream + .write_events(vec![make_event_with_id( + table_id, + event_ids.get(3).expect("Should have event 3"), + serde_json::json!({"seq": 4}), + )]) + .await + .unwrap(); + + let (status, _) = stream.store().get_stream_state().await.unwrap(); + assert!( + matches!(status, StreamStatus::Healthy), + "Stream should recover to Healthy after checkpoint publish retry succeeds" + ); + + // Verify all unique event IDs were published + let published = sink.events().await; + let published_ids: HashSet = published.iter().map(|e| e.id.id.clone()).collect(); + for event_id in &event_ids { + assert!( + published_ids.contains(&event_id.id), + "Event {} should be in published events", + event_id.id + ); + } +}