Skip to content

Commit c8c2e3b

Browse files
committed
review: surface backpressure reject/force-flush stats, preserve restored bucket range
Addresses code-review findings on the backpressure change:

- `StatsSnapshot` and the stats table now expose `backpressure_rejected_total` and `backpressure_force_flush_total` (previously OTel-only). The change says "page on rejected", so operators must be able to see it via `SELECT * FROM timefusion_stats` when telemetry isn't wired — especially on first deploys.
- `FlushableBucket` carries the rows' real min/max timestamp; the take path captures it before resetting the bucket atomics, and `restore_taken_bucket` replays it (monotonic widen). Previously, restore collapsed the range to the bucket start, hiding restored rows from time-range pruning until the next insert. Regression test: `restore_taken_bucket_preserves_timestamp_range`.
- `DEFAULT_BUCKET_DURATION_MICROS` changes from 600s to 300s to track the config default, so the test-only `BUCKET_DURATION_MICROS` const can't diverge from the process-global runtime value once any test pins the `OnceLock`.
- Clarify the intentional `drop(commit_guard)`-before-backoff in both the append and dedup retry paths so it isn't removed as apparently redundant (holding it across the sleep would serialize all writers behind one writer's backoff).
1 parent 8ee32b6 commit c8c2e3b

4 files changed

Lines changed: 126 additions & 41 deletions

File tree

src/buffered_write_layer.rs

Lines changed: 60 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -110,32 +110,40 @@ fn quarantine_entry(quarantine_dir: &std::path::Path, entry: &WalEntry, kind: &s
110110
/// `snapshot_stats()` and rendered as rows by `timefusion.stats()`.
111111
#[derive(Debug, Clone)]
112112
pub struct StatsSnapshot {
113-
pub mem_project_count: usize,
114-
pub mem_total_buckets: usize,
115-
pub mem_total_rows: usize,
116-
pub mem_total_batches: usize,
117-
pub mem_estimated_bytes: usize,
118-
pub reserved_bytes: usize,
119-
pub max_memory_bytes: usize,
120-
pub pressure_pct: u32,
121-
pub wal_files: usize,
122-
pub wal_disk_bytes: u64,
123-
pub wal_shards_per_topic: usize,
124-
pub wal_known_topics: usize,
125-
pub bucket_duration_micros: i64,
113+
pub mem_project_count: usize,
114+
pub mem_total_buckets: usize,
115+
pub mem_total_rows: usize,
116+
pub mem_total_batches: usize,
117+
pub mem_estimated_bytes: usize,
118+
pub reserved_bytes: usize,
119+
pub max_memory_bytes: usize,
120+
pub pressure_pct: u32,
121+
pub wal_files: usize,
122+
pub wal_disk_bytes: u64,
123+
pub wal_shards_per_topic: usize,
124+
pub wal_known_topics: usize,
125+
pub bucket_duration_micros: i64,
126126
/// Age of the oldest bucket in MemBuffer (seconds, computed from
127127
/// `now - min(bucket.min_timestamp)`). None when MemBuffer is empty.
128128
/// Alerting target: alert at > 2× `flush_interval_secs`.
129-
pub oldest_bucket_age_secs: Option<u64>,
129+
pub oldest_bucket_age_secs: Option<u64>,
130130
/// Cumulative flush successes/failures since process start. Mirrors the
131131
/// OTel `timefusion.flush.completed`/`failed` counters so tests can
132132
/// assert without configuring OTel.
133-
pub flush_completed_total: u64,
134-
pub flush_failed_total: u64,
133+
pub flush_completed_total: u64,
134+
pub flush_failed_total: u64,
135135
/// Times an insert hit the memory hard limit and applied backpressure
136136
/// (synchronous flush-to-Delta) instead of rejecting. Sustained growth =
137137
/// ingest outpacing flush; the matching OTel counter is the alert target.
138-
pub backpressure_engaged_total: u64,
138+
pub backpressure_engaged_total: u64,
139+
/// Inserts rejected after the backpressure window expired without freeing
140+
/// memory — Delta flush isn't keeping up. PAGE on any growth (data is still
141+
/// in the WAL but ingest is now dropping). Mirrored from OTel so operators
142+
/// can watch it via the stats table when telemetry isn't wired.
143+
pub backpressure_rejected_total: u64,
144+
/// Open-bucket force-flush escalations (a single busy window was itself the
145+
/// pressure). Sustained growth = windows too large for the budget.
146+
pub backpressure_force_flush_total: u64,
139147
}
140148

141149
#[derive(Debug, Default)]
@@ -193,6 +201,10 @@ struct CoalescedGroup {
193201
wal_positions: Vec<Option<walrus_rust::WalPosition>>,
194202
/// Source bucket_ids; drained from MemBuffer after the combined commit succeeds.
195203
source_bucket_ids: Vec<i64>,
204+
/// Min/max timestamp across absorbed buckets (Option so the derived Default's
205+
/// 0 can't corrupt the min). Carried onto the combined FlushableBucket.
206+
min_timestamp: Option<i64>,
207+
max_timestamp: Option<i64>,
196208
}
197209

198210
struct CombinedBucket {
@@ -225,6 +237,8 @@ impl CoalescedGroup {
225237
}
226238
// Merge per-shard positions (max).
227239
self.wal_positions = merge_wal_positions(std::mem::take(&mut self.wal_positions), b.wal_positions);
240+
self.min_timestamp = Some(self.min_timestamp.map_or(b.min_timestamp, |m| m.min(b.min_timestamp)));
241+
self.max_timestamp = Some(self.max_timestamp.map_or(b.max_timestamp, |m| m.max(b.max_timestamp)));
228242
self.source_bucket_ids.push(b.bucket_id);
229243
}
230244

@@ -236,6 +250,8 @@ impl CoalescedGroup {
236250
wal_shard_counts,
237251
wal_positions,
238252
source_bucket_ids,
253+
min_timestamp,
254+
max_timestamp,
239255
} = self;
240256
// `absorb` is only called via `groups.entry(..).or_default().absorb(b)`
241257
// so `key` is always set by the time we collapse the group.
@@ -251,6 +267,8 @@ impl CoalescedGroup {
251267
row_count,
252268
wal_shard_counts,
253269
wal_positions,
270+
min_timestamp: min_timestamp.unwrap_or(i64::MAX),
271+
max_timestamp: max_timestamp.unwrap_or(i64::MIN),
254272
};
255273
CombinedBucket { combined, source_bucket_ids }
256274
}
@@ -267,38 +285,40 @@ pub type TantivyIndexCallback =
267285
Arc<dyn Fn(String, String, Vec<RecordBatch>, Vec<String>) -> futures::future::BoxFuture<'static, anyhow::Result<()>> + Send + Sync>;
268286

269287
pub struct BufferedWriteLayer {
270-
config: Arc<AppConfig>,
271-
wal: Arc<WalManager>,
272-
mem_buffer: Arc<MemBuffer>,
273-
shutdown: CancellationToken,
274-
delta_write_callback: Option<DeltaWriteCallback>,
275-
tantivy_index_callback: Option<TantivyIndexCallback>,
276-
background_tasks: Mutex<Vec<JoinHandle<()>>>,
277-
flush_lock: Mutex<()>,
278-
reserved_bytes: AtomicUsize, // Memory reserved for in-flight writes
279-
pressure_notify: Arc<Notify>, // Wakes flush task when pressure threshold crossed
288+
config: Arc<AppConfig>,
289+
wal: Arc<WalManager>,
290+
mem_buffer: Arc<MemBuffer>,
291+
shutdown: CancellationToken,
292+
delta_write_callback: Option<DeltaWriteCallback>,
293+
tantivy_index_callback: Option<TantivyIndexCallback>,
294+
background_tasks: Mutex<Vec<JoinHandle<()>>>,
295+
flush_lock: Mutex<()>,
296+
reserved_bytes: AtomicUsize, // Memory reserved for in-flight writes
297+
pressure_notify: Arc<Notify>, // Wakes flush task when pressure threshold crossed
280298
/// Notified at the end of every flush task iteration (success or failure).
281299
/// Test hook: lets E2E harnesses await actual completion of background work
282300
/// instead of racing wall-clock sleeps.
283-
flush_tick_notify: Arc<Notify>,
301+
flush_tick_notify: Arc<Notify>,
284302
/// Notified at the end of every eviction task iteration.
285-
eviction_tick_notify: Arc<Notify>,
303+
eviction_tick_notify: Arc<Notify>,
286304
/// Cumulative flush counters mirrored alongside OTel `record_flush`.
287305
/// OTel global metric state is opt-in (only initialized when telemetry is
288306
/// configured), so these atomics give the harness an in-process way to
289307
/// assert on what the global counters would be.
290-
flush_completed_total: AtomicU64,
291-
flush_failed_total: AtomicU64,
292-
backpressure_engaged_total: AtomicU64,
308+
flush_completed_total: AtomicU64,
309+
flush_failed_total: AtomicU64,
310+
backpressure_engaged_total: AtomicU64,
311+
backpressure_rejected_total: AtomicU64,
312+
backpressure_force_flush_total: AtomicU64,
293313
// Required for WAL replay of UPDATE/DELETE whose SQL references UDFs.
294-
function_registry: Arc<crate::functions::FnRegistry>,
314+
function_registry: Arc<crate::functions::FnRegistry>,
295315
/// Caps concurrent detached tantivy sidecar builds so a fast flush cycle
296316
/// (post-F4 — one build per (project, table) per cycle) can't fan out
297317
/// past S3 connection / memory limits when many tables flush together.
298318
/// FOLLOW-UP: handles aren't stored; graceful shutdown does not await
299319
/// in-flight tantivy uploads. Acceptable for now because the sidecar is
300320
/// best-effort and the index can be rebuilt from Delta on demand.
301-
tantivy_spawn_sem: Arc<tokio::sync::Semaphore>,
321+
tantivy_spawn_sem: Arc<tokio::sync::Semaphore>,
302322
}
303323

304324
impl std::fmt::Debug for BufferedWriteLayer {
@@ -342,6 +362,8 @@ impl BufferedWriteLayer {
342362
flush_completed_total: AtomicU64::new(0),
343363
flush_failed_total: AtomicU64::new(0),
344364
backpressure_engaged_total: AtomicU64::new(0),
365+
backpressure_rejected_total: AtomicU64::new(0),
366+
backpressure_force_flush_total: AtomicU64::new(0),
345367
function_registry,
346368
// 16 is well above realistic per-cycle table fan-out for the
347369
// monoscope workload (~5 distinct table names) while still
@@ -507,6 +529,7 @@ impl BufferedWriteLayer {
507529
last_mem = now_mem;
508530
if std::time::Instant::now() >= deadline {
509531
crate::metrics::record_backpressure_rejected();
532+
self.backpressure_rejected_total.fetch_add(1, Ordering::Relaxed);
510533
error!(
511534
"Write backpressure exhausted after {:?}: used={}MB still over hard limit — Delta flush is not freeing memory; rejecting (data remains in WAL)",
512535
timeout,
@@ -563,6 +586,7 @@ impl BufferedWriteLayer {
563586
return Ok(());
564587
}
565588
crate::metrics::record_backpressure_force_flush();
589+
self.backpressure_force_flush_total.fetch_add(1, Ordering::Relaxed);
566590
for (project_id, table_name, bucket_id) in self.mem_buffer.current_bucket_keys(current) {
567591
let Some(bucket) = self.mem_buffer.take_bucket_for_flush(&project_id, &table_name, bucket_id) else {
568592
continue;
@@ -1428,6 +1452,8 @@ impl BufferedWriteLayer {
14281452
flush_completed_total: self.flush_completed_total.load(Ordering::Relaxed),
14291453
flush_failed_total: self.flush_failed_total.load(Ordering::Relaxed),
14301454
backpressure_engaged_total: self.backpressure_engaged_total.load(Ordering::Relaxed),
1455+
backpressure_rejected_total: self.backpressure_rejected_total.load(Ordering::Relaxed),
1456+
backpressure_force_flush_total: self.backpressure_force_flush_total.load(Ordering::Relaxed),
14311457
}
14321458
}
14331459

src/database.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2715,8 +2715,11 @@ impl Database {
27152715
last_error = Some(e);
27162716
debug!("Delta write conflict detected, retrying... (attempt {}/{})", retry_count, max_retries);
27172717

2718-
// Release the commit lock before backing off — peers
2719-
// shouldn't queue behind this writer's sleep.
2718+
// Intentionally release the commit lock BEFORE the
2719+
// backoff sleep — do not remove. Holding it across the
2720+
// sleep would serialize every other writer behind this
2721+
// writer's backoff, converting commit contention into a
2722+
// throughput collapse.
27202723
drop(commit_guard);
27212724
// Exponential backoff for better handling of concurrent writes
27222725
let backoff_ms = 100 * (2_u64.pow(retry_count.min(5)));
@@ -3328,6 +3331,9 @@ impl Database {
33283331
break;
33293332
}
33303333
Err(e) => {
3334+
// Drop BEFORE the backoff sleep below — do not remove.
3335+
// Holding the commit lock across the sleep would block
3336+
// every concurrent append behind this dedup retry.
33313337
drop(commit_guard);
33323338
let msg = e.to_string();
33333339
// "Transaction failed" covers the conflict checker erroring

src/mem_buffer.rs

Lines changed: 48 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,12 @@ use crate::functions::FnRegistry;
2929
// longer = larger Delta files. Matches default flush interval for aligned boundaries.
3030
// Note: Timestamps before 1970 (negative microseconds) produce negative bucket IDs,
3131
// which is supported but may result in unexpected ordering if mixed with post-1970 data.
32-
const DEFAULT_BUCKET_DURATION_MICROS: i64 = 10 * 60 * 1_000_000;
32+
// Fallback when `set_bucket_duration_micros` is never called (i.e. unit tests
33+
// that build a MemBuffer directly). MUST track `d_bucket_duration_secs` in
34+
// config.rs — prod always overrides via bootstrap, but keeping the two in sync
35+
// avoids the test-only `BUCKET_DURATION_MICROS` const diverging from the
36+
// process-global runtime value once any test pins the OnceLock.
37+
const DEFAULT_BUCKET_DURATION_MICROS: i64 = 5 * 60 * 1_000_000;
3338
#[cfg(test)]
3439
const BUCKET_DURATION_MICROS: i64 = DEFAULT_BUCKET_DURATION_MICROS;
3540

@@ -55,7 +60,7 @@ const MAX_BATCH_COUNT_PER_BUCKET: usize = 8;
5560
const MAX_BATCH_BYTES_FOR_COALESCE: usize = 4 * 1024 * 1024;
5661

5762
/// Configured bucket window in microseconds. Set once at startup via
58-
/// `set_bucket_duration_micros`; defaults to 10 minutes when unset. Smaller
63+
/// `set_bucket_duration_micros`; defaults to 5 minutes when unset. Smaller
5964
/// windows free MemBuffer memory sooner (because the previous bucket becomes
6065
/// flushable sooner) at the cost of more, smaller Delta commits.
6166
pub fn bucket_duration_micros() -> i64 {
@@ -253,6 +258,12 @@ pub struct FlushableBucket {
253258
/// Written into Delta commit metadata so a crash between Delta commit
254259
/// and `advance_by_counts` can recover the cursor from Delta on restart.
255260
pub wal_positions: Vec<Option<walrus_rust::WalPosition>>,
261+
/// Actual min/max timestamp of the taken rows, captured before the source
262+
/// bucket's atomics were reset. `restore_taken_bucket` replays these so a
263+
/// restored bucket keeps its true time range (and stays visible to
264+
/// time-range pruning) rather than collapsing to the bucket's start.
265+
pub min_timestamp: i64,
266+
pub max_timestamp: i64,
256267
}
257268

258269
#[derive(Debug, Default)]
@@ -1161,6 +1172,8 @@ impl MemBuffer {
11611172
wal_positions,
11621173
row_count: bucket.row_count.load(Ordering::Relaxed),
11631174
wal_shard_counts,
1175+
min_timestamp: bucket.min_timestamp.load(Ordering::Relaxed),
1176+
max_timestamp: bucket.max_timestamp.load(Ordering::Relaxed),
11641177
});
11651178
}
11661179
}
@@ -1216,8 +1229,10 @@ impl MemBuffer {
12161229
let wal_state = std::mem::take(&mut *wal_g);
12171230
let freed = bucket.memory_bytes.swap(0, Ordering::Relaxed);
12181231
let row_count = bucket.row_count.swap(0, Ordering::Relaxed);
1219-
bucket.min_timestamp.store(i64::MAX, Ordering::Relaxed);
1220-
bucket.max_timestamp.store(i64::MIN, Ordering::Relaxed);
1232+
// Capture the real range as we reset the sentinels so a restore (on
1233+
// Delta commit failure) can replay it instead of guessing bucket-start.
1234+
let min_timestamp = bucket.min_timestamp.swap(i64::MAX, Ordering::Relaxed);
1235+
let max_timestamp = bucket.max_timestamp.swap(i64::MIN, Ordering::Relaxed);
12211236
drop(wal_g);
12221237
drop(batches_g);
12231238
drop(bucket_ref);
@@ -1249,6 +1264,8 @@ impl MemBuffer {
12491264
row_count,
12501265
wal_shard_counts: counts,
12511266
wal_positions: positions,
1267+
min_timestamp,
1268+
max_timestamp,
12521269
})
12531270
}
12541271

@@ -1282,7 +1299,11 @@ impl MemBuffer {
12821299
}
12831300
bucket.memory_bytes.fetch_add(added, Ordering::Relaxed);
12841301
bucket.row_count.fetch_add(b.row_count, Ordering::Relaxed);
1285-
bucket.update_timestamps(b.bucket_id * bucket_duration_micros());
1302+
// Replay the true range (monotonic widen) so restored rows stay visible
1303+
// to time-range pruning; concurrent inserts into the same open bucket
1304+
// are preserved since fetch_min/max only widens.
1305+
bucket.update_timestamps(b.min_timestamp);
1306+
bucket.update_timestamps(b.max_timestamp);
12861307
drop(wal_g);
12871308
drop(batches_g);
12881309
self.estimated_bytes.fetch_add(added, Ordering::Relaxed);
@@ -2399,6 +2420,28 @@ mod tests {
23992420
assert_eq!(total, 2, "no text_match preds → all rows returned");
24002421
}
24012422

2423+
/// Regression: `restore_taken_bucket` (the Delta-commit-failure path of the
2424+
/// open-bucket force-flush) used to reset the bucket's min/max to the bucket
2425+
/// *start* (`bucket_id * duration`), hiding restored rows from time-range
2426+
/// pruning until the next insert. It must replay the rows' real range.
2427+
#[test]
2428+
fn restore_taken_bucket_preserves_timestamp_range() {
2429+
use crate::test_utils::test_helpers::{json_to_batch, test_span};
2430+
let buffer = MemBuffer::new();
2431+
let dur = bucket_duration_micros();
2432+
let ts = 7 * dur + 12_345; // mid-bucket — distinct from the bucket-start sentinel
2433+
let bucket_id = MemBuffer::compute_bucket_id(ts);
2434+
buffer.insert("p1", "otel_logs_and_spans", json_to_batch(vec![test_span("a", "svc", "p1")]).unwrap(), ts).unwrap();
2435+
2436+
let taken = buffer.take_bucket_for_flush("p1", "otel_logs_and_spans", bucket_id).expect("bucket taken");
2437+
assert_eq!((taken.min_timestamp, taken.max_timestamp), (ts, ts), "take must capture the real row range");
2438+
2439+
buffer.restore_taken_bucket(&taken); // simulate Delta commit failure
2440+
let again = buffer.take_bucket_for_flush("p1", "otel_logs_and_spans", bucket_id).expect("restored bucket present");
2441+
assert_eq!((again.min_timestamp, again.max_timestamp), (ts, ts), "restore must preserve the true range");
2442+
assert_ne!(again.min_timestamp, bucket_id * dur, "must not collapse to bucket start");
2443+
}
2444+
24022445
#[test]
24032446
fn test_bucket_partitioning() {
24042447
let buffer = MemBuffer::new();

src/stats_table.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,16 @@ impl StatsTableProvider {
111111
));
112112
rows.push(("buffered_layer", "pressure_pct".into(), s.pressure_pct.to_string()));
113113
rows.push(("buffered_layer", "backpressure_engaged_total".into(), s.backpressure_engaged_total.to_string()));
114+
rows.push((
115+
"buffered_layer",
116+
"backpressure_rejected_total".into(),
117+
s.backpressure_rejected_total.to_string(),
118+
));
119+
rows.push((
120+
"buffered_layer",
121+
"backpressure_force_flush_total".into(),
122+
s.backpressure_force_flush_total.to_string(),
123+
));
114124
rows.push(("wal", "files".into(), s.wal_files.to_string()));
115125
rows.push(("wal", "disk_bytes".into(), s.wal_disk_bytes.to_string()));
116126
rows.push(("wal", "disk_mb".into(), format!("{:.1}", s.wal_disk_bytes as f64 / (1024.0 * 1024.0))));

0 commit comments

Comments
 (0)