17 changes: 2 additions & 15 deletions src/metrics/failover.rs
@@ -1,13 +1,10 @@
-use metrics::{Unit, describe_gauge, describe_histogram, gauge, histogram};
+use metrics::{Unit, describe_gauge, gauge};
 
 /// Metric name for tracking if a stream is currently in failover mode.
 ///
 /// This gauge is set to 1 when the stream is in failover mode and 0 otherwise.
 const STREAM_FAILOVER_ACTIVE: &str = "stream_failover_active";
 
-/// Metric name for recording the time spent in failover mode.
-const STREAM_FAILOVER_DURATION_MILLISECONDS: &str = "stream_failover_duration_milliseconds";
-
 /// Label key for stream identifier.
 const STREAM_ID_LABEL: &str = "stream_id";
 
@@ -18,11 +15,6 @@ pub(crate) fn register_failover_metrics() {
         Unit::Count,
         "Whether the stream is currently in failover mode (1 = failover, 0 = healthy)"
     );
-    describe_histogram!(
-        STREAM_FAILOVER_DURATION_MILLISECONDS,
-        Unit::Seconds,
-        "Time spent in failover mode before recovery"
-    );
 }
 
 /// Records that the stream has entered failover mode.
@@ -35,15 +27,10 @@ pub fn record_failover_entered(stream_id: u64) {
 }
 
 /// Records that the stream has recovered from failover mode.
-pub fn record_failover_recovered(stream_id: u64, duration_milliseconds: f64) {
+pub fn record_failover_recovered(stream_id: u64) {
     gauge!(
         STREAM_FAILOVER_ACTIVE,
         STREAM_ID_LABEL => stream_id.to_string()
     )
     .set(0.0);
-    histogram!(
-        STREAM_FAILOVER_DURATION_MILLISECONDS,
-        STREAM_ID_LABEL => stream_id.to_string()
-    )
-    .record(duration_milliseconds);
 }
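With the duration histogram removed, this module only flips the `stream_failover_active` gauge. A minimal sketch of the resulting call pattern (the wrapper function and its arguments are hypothetical, not part of this PR); time spent in failover can instead be derived from the gauge's 0/1 transitions in the metrics backend:

    // Hypothetical caller; only record_failover_entered and
    // record_failover_recovered are real functions from this module.
    fn sync_failover_gauge(stream_id: u64, in_failover: bool) {
        if in_failover {
            metrics::record_failover_entered(stream_id); // gauge -> 1
        } else {
            metrics::record_failover_recovered(stream_id); // gauge -> 0
        }
    }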
39 changes: 24 additions & 15 deletions src/stream.rs
@@ -50,18 +50,25 @@ where
         // create stream store
         let store = StreamStore::create(config.clone(), store).await?;
 
-        // run initial maintenance synchronously during startup if due
         let (status, next_maintenance_at) = store.get_stream_state().await?;
-        if let StreamStatus::Failover {
-            checkpoint_event_id,
-        } = &status
-        {
-            warn!(
-                checkpoint_event_id = %checkpoint_event_id.id,
-                "Stream starting in failover mode; recovery will be attempted on next batch"
-            );
+
+        // sync failover metric
+        match &status {
+            StreamStatus::Failover {
+                checkpoint_event_id,
+            } => {
+                warn!(
+                    checkpoint_event_id = %checkpoint_event_id.id,
+                    "Stream starting in failover mode; recovery will be attempted on next batch"
+                );
+                metrics::record_failover_entered(config.id);
+            }
+            StreamStatus::Healthy => {
+                metrics::record_failover_recovered(config.id);
+            }
         }
 
+        // run initial maintenance synchronously during startup if due
         if Utc::now() >= next_maintenance_at {
             run_maintenance(&store, next_maintenance_at).await?;
             let next = next_maintenance_at + chrono::Duration::hours(24);
@@ -216,8 +223,6 @@ where
         checkpoint_event_id: &EventIdentifier,
         current_batch_event_id: &EventIdentifier,
     ) -> EtlResult<bool> {
-        let failover_start = Utc::now();
-
         let checkpoint_event = self.store.get_checkpoint_event(checkpoint_event_id).await?;
 
         // Record processing lag for checkpoint event
@@ -280,10 +285,7 @@
             replay_client.update_checkpoint(&last_event_id).await?;
         }
 
-        // Record successful failover recovery
-        let failover_duration_milliseconds =
-            (Utc::now() - failover_start).num_milliseconds() as f64;
-        metrics::record_failover_recovered(self.config.id, failover_duration_milliseconds);
+        metrics::record_failover_recovered(self.config.id);
 
         self.store
             .store_stream_status(StreamStatus::Healthy)
@@ -295,6 +297,13 @@
     }
 }
 
+#[cfg(feature = "test-utils")]
+impl<Sink, SchemaStore> PgStream<Sink, SchemaStore> {
+    pub fn store(&self) -> &StreamStore<SchemaStore> {
+        &self.store
+    }
+}
+
 impl<Sink, SchemaStore> Destination for PgStream<Sink, SchemaStore>
 where
     Sink: SinkTrait + Sync,
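This test-only accessor is what lets the test files below drop their second `StreamStore`. A minimal usage sketch (assuming the crate's `test-utils` feature is enabled for test builds):

    // In a test: reuse the stream's own store instead of constructing another.
    let stream: PgStream<MemorySink, PostgresStore> =
        PgStream::create(config, sink, store).await?;
    let (status, next_maintenance_at) = stream.store().get_stream_state().await?;

Reusing the internal store also lets callers move `store` into `PgStream::create` instead of cloning it.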
18 changes: 18 additions & 0 deletions src/test_utils/failable_sink.rs
@@ -12,6 +12,10 @@ use crate::types::TriggeredEvent;
 pub struct FailableSink {
     events: Arc<Mutex<Vec<TriggeredEvent>>>,
     fail_on_call: Arc<AtomicUsize>,
+    /// Inclusive start of the range of calls to fail on.
+    fail_range_start: Arc<AtomicUsize>,
+    /// Exclusive end of the range of calls to fail on.
+    fail_range_end: Arc<AtomicUsize>,
     call_count: Arc<AtomicUsize>,
     should_fail: Arc<AtomicBool>,
 }
@@ -28,6 +32,8 @@ impl FailableSink {
         Self {
             events: Arc::new(Mutex::new(Vec::new())),
             fail_on_call: Arc::new(AtomicUsize::new(usize::MAX)),
+            fail_range_start: Arc::new(AtomicUsize::new(usize::MAX)),
+            fail_range_end: Arc::new(AtomicUsize::new(usize::MAX)),
             call_count: Arc::new(AtomicUsize::new(0)),
             should_fail: Arc::new(AtomicBool::new(false)),
         }
@@ -38,10 +44,18 @@
         self.fail_on_call.store(n, Ordering::SeqCst);
     }
 
+    /// Configure sink to fail on calls in the range `[start, end)` (0-indexed)
+    pub fn fail_on_calls(&self, start: usize, end: usize) {
+        self.fail_range_start.store(start, Ordering::SeqCst);
+        self.fail_range_end.store(end, Ordering::SeqCst);
+    }
+
     /// Configure sink to succeed on all calls
     pub fn succeed_always(&self) {
         self.should_fail.store(false, Ordering::SeqCst);
         self.fail_on_call.store(usize::MAX, Ordering::SeqCst);
+        self.fail_range_start.store(usize::MAX, Ordering::SeqCst);
+        self.fail_range_end.store(usize::MAX, Ordering::SeqCst);
     }
 
     pub async fn events(&self) -> Vec<TriggeredEvent> {
@@ -62,8 +76,12 @@ impl Sink for FailableSink {
     async fn publish_events(&self, events: Vec<TriggeredEvent>) -> EtlResult<()> {
         let call_num = self.call_count.fetch_add(1, Ordering::SeqCst);
 
+        let range_start = self.fail_range_start.load(Ordering::SeqCst);
+        let range_end = self.fail_range_end.load(Ordering::SeqCst);
+
         // Check if we should fail on this specific call
         if call_num == self.fail_on_call.load(Ordering::SeqCst)
+            || (call_num >= range_start && call_num < range_end)
             || self.should_fail.load(Ordering::SeqCst)
         {
             return Err(etl_error!(
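A minimal usage sketch of the new half-open range knob (the `new()` constructor name is assumed from the elided impl; call indices are 0-based):

    let sink = FailableSink::new();
    sink.fail_on_calls(1, 3); // calls 1 and 2 to publish_events fail; 0, 3, 4, ... succeed
    // ...exercise the stream, then clear every failure mode:
    sink.succeed_always();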
6 changes: 2 additions & 4 deletions tests/failover_checkpoint_tests.rs
@@ -5,7 +5,6 @@ use etl::destination::Destination;
 use etl::error::{ErrorKind, EtlResult};
 use etl::store::both::postgres::PostgresStore;
 use postgres_stream::sink::Sink;
-use postgres_stream::store::StreamStore;
 use postgres_stream::stream::PgStream;
 use postgres_stream::test_utils::{
     TestDatabase, create_postgres_store_with_table_id, insert_events_to_db, make_event_with_id,
@@ -63,7 +62,7 @@ async fn test_failover_does_not_overwrite_checkpoint_on_repeated_failures() {
         create_postgres_store_with_table_id(config.id, &db.config, &db.pool).await;
 
     let stream: PgStream<FailFirstNSink, PostgresStore> =
-        PgStream::create(config.clone(), sink, store.clone())
+        PgStream::create(config.clone(), sink, store)
             .await
             .expect("Failed to create PgStream");
 
@@ -90,8 +89,7 @@
         .await
         .expect("write_events should succeed even if sink fails");
 
-    let stream_store = StreamStore::create(config, store).await.unwrap();
-    let (status, _) = stream_store.get_stream_state().await.unwrap();
+    let (status, _) = stream.store().get_stream_state().await.unwrap();
 
     match status {
         StreamStatus::Failover {
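For context, the invariant this test pins down, as a hedged sketch (the `first_checkpoint` binding is hypothetical; the real assertions live in the elided test body): after repeated sink failures, the stored checkpoint must still identify the event that first triggered failover, not the latest failure.

    match status {
        StreamStatus::Failover {
            checkpoint_event_id,
        } => {
            // Still points at the first failed event, not the most recent one.
            assert_eq!(checkpoint_event_id.id, first_checkpoint.id);
        }
        _ => panic!("expected stream to remain in failover"),
    }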
72 changes: 26 additions & 46 deletions tests/maintenance_tests.rs
@@ -2,7 +2,6 @@ use chrono::Duration;
 use etl::store::both::postgres::PostgresStore;
 use postgres_stream::maintenance::run_maintenance;
 use postgres_stream::sink::memory::MemorySink;
-use postgres_stream::store::StreamStore;
 use postgres_stream::stream::PgStream;
 use postgres_stream::test_utils::{TestDatabase, create_postgres_store, test_stream_config};
 
@@ -37,8 +36,8 @@ async fn test_maintenance_creates_future_partitions() {
     let sink = MemorySink::new();
     let store = create_postgres_store(config.id, &db.config, &db.pool).await;
 
-    let _stream: PgStream<MemorySink, PostgresStore> =
-        PgStream::create(config, sink, store.clone()).await.unwrap();
+    let stream: PgStream<MemorySink, PostgresStore> =
+        PgStream::create(config, sink, store).await.unwrap();
 
     // Manually delete all partitions except today
     sqlx::query(
@@ -72,11 +71,8 @@
     assert_eq!(count_before.0, 1);
 
     // Run maintenance manually
-    let stream_store = StreamStore::create(test_stream_config(&db), store.clone())
-        .await
-        .unwrap();
     let scheduled_time = chrono::Utc::now();
-    run_maintenance(&stream_store, scheduled_time)
+    run_maintenance(stream.store(), scheduled_time)
         .await
         .unwrap();
 
@@ -99,8 +95,8 @@ async fn test_maintenance_drops_old_partitions() {
     let sink = MemorySink::new();
     let store = create_postgres_store(config.id, &db.config, &db.pool).await;
 
-    let _stream: PgStream<MemorySink, PostgresStore> =
-        PgStream::create(config, sink, store.clone()).await.unwrap();
+    let stream: PgStream<MemorySink, PostgresStore> =
+        PgStream::create(config, sink, store).await.unwrap();
 
     // Create an old partition (10 days ago - beyond 7 day retention)
     let old_date = chrono::Utc::now() - Duration::days(10);
@@ -130,11 +126,8 @@
     assert!(exists_before.0);
 
     // Run maintenance
-    let stream_store = StreamStore::create(test_stream_config(&db), store.clone())
-        .await
-        .unwrap();
     let scheduled_time = chrono::Utc::now();
-    run_maintenance(&stream_store, scheduled_time)
+    run_maintenance(stream.store(), scheduled_time)
         .await
         .unwrap();
 
@@ -159,30 +152,28 @@ async fn test_maintenance_updates_next_maintenance_at() {
     let sink = MemorySink::new();
     let store = create_postgres_store(config.id, &db.config, &db.pool).await;
 
-    let _stream: PgStream<MemorySink, PostgresStore> =
-        PgStream::create(config, sink, store.clone()).await.unwrap();
+    let stream: PgStream<MemorySink, PostgresStore> =
+        PgStream::create(config, sink, store).await.unwrap();
 
     // Get initial next_maintenance_at
-    let stream_store = StreamStore::create(test_stream_config(&db), store.clone())
-        .await
-        .unwrap();
-    let (_status, next_before) = stream_store.get_stream_state().await.unwrap();
+    let (_status, next_before) = stream.store().get_stream_state().await.unwrap();
 
     // Run maintenance
     let scheduled_time = next_before;
-    let completed_at = run_maintenance(&stream_store, scheduled_time)
+    let completed_at = run_maintenance(stream.store(), scheduled_time)
         .await
         .unwrap();
 
     // Update the next maintenance time
     let next_after = next_before + Duration::hours(24);
-    stream_store
+    stream
+        .store()
         .store_next_maintenance_at(next_after)
         .await
         .unwrap();
 
     // Verify next_maintenance_at was updated
-    let (_status_after, next_after_stored) = stream_store.get_stream_state().await.unwrap();
+    let (_status_after, next_after_stored) = stream.store().get_stream_state().await.unwrap();
 
     assert!(
         next_after_stored > next_before,
@@ -201,19 +192,15 @@ async fn test_maintenance_idempotent() {
     let sink = MemorySink::new();
     let store = create_postgres_store(config.id, &db.config, &db.pool).await;
 
-    let _stream: PgStream<MemorySink, PostgresStore> =
-        PgStream::create(config, sink, store.clone()).await.unwrap();
-
-    let stream_store = StreamStore::create(test_stream_config(&db), store.clone())
-        .await
-        .unwrap();
+    let stream: PgStream<MemorySink, PostgresStore> =
+        PgStream::create(config, sink, store).await.unwrap();
 
     // Run maintenance twice
     let scheduled_time = chrono::Utc::now();
-    run_maintenance(&stream_store, scheduled_time)
+    run_maintenance(stream.store(), scheduled_time)
         .await
         .unwrap();
-    run_maintenance(&stream_store, scheduled_time)
+    run_maintenance(stream.store(), scheduled_time)
         .await
         .unwrap();
 
@@ -237,16 +224,12 @@ async fn test_run_maintenance_returns_scheduled_time() {
     let sink = MemorySink::new();
     let store = create_postgres_store(config.id, &db.config, &db.pool).await;
 
-    let _stream: PgStream<MemorySink, PostgresStore> =
-        PgStream::create(config, sink, store.clone()).await.unwrap();
-
-    let stream_store = StreamStore::create(test_stream_config(&db), store.clone())
-        .await
-        .unwrap();
+    let stream: PgStream<MemorySink, PostgresStore> =
+        PgStream::create(config, sink, store).await.unwrap();
 
     // Use a specific scheduled_time (1 hour in the past)
     let scheduled_time = chrono::Utc::now() - Duration::hours(1);
-    let returned_time = run_maintenance(&stream_store, scheduled_time)
+    let returned_time = run_maintenance(stream.store(), scheduled_time)
         .await
         .unwrap();
 
@@ -264,32 +247,29 @@ async fn test_maintenance_scheduling_uses_scheduled_time_not_execution_time() {
     let sink = MemorySink::new();
     let store = create_postgres_store(config.id, &db.config, &db.pool).await;
 
-    let _stream: PgStream<MemorySink, PostgresStore> =
-        PgStream::create(config, sink, store.clone()).await.unwrap();
-
-    let stream_store = StreamStore::create(test_stream_config(&db), store.clone())
-        .await
-        .unwrap();
+    let stream: PgStream<MemorySink, PostgresStore> =
+        PgStream::create(config, sink, store).await.unwrap();
 
     // Simulate a maintenance that was scheduled for 2 hours ago
     // This can happen if the system was down during the scheduled maintenance time
    let original_scheduled_time = chrono::Utc::now() - Duration::hours(2);
 
     // Run maintenance with the original scheduled time
-    let returned_time = run_maintenance(&stream_store, original_scheduled_time)
+    let returned_time = run_maintenance(stream.store(), original_scheduled_time)
         .await
         .unwrap();
 
     // The next maintenance should be scheduled 24h from the ORIGINAL scheduled time,
     // not 24h from now. This ensures consistent daily scheduling even if
     // maintenance runs late.
     let expected_next_maintenance = original_scheduled_time + Duration::hours(24);
-    stream_store
+    stream
+        .store()
         .store_next_maintenance_at(expected_next_maintenance)
         .await
         .unwrap();
 
-    let (_status, actual_next_maintenance) = stream_store.get_stream_state().await.unwrap();
+    let (_status, actual_next_maintenance) = stream.store().get_stream_state().await.unwrap();
 
     // Verify the next maintenance is scheduled based on the original scheduled time
     assert_eq!(
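The last test encodes the anti-drift scheduling rule; as a one-line sketch, the next run is always anchored to the previous scheduled time rather than the wall clock:

    // A late run keeps the daily cadence: anchor to scheduled_time, not Utc::now().
    let next_maintenance_at = scheduled_time + chrono::Duration::hours(24);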