
Commit ed5d40c
feat: separate out write path executor with unbounded memory
Currently, when an OOM occurs while snapshotting, the process keeps going without crashing. This behaviour is observed on main (commit: be25c6f), and it means the WAL files keep accumulating to the point that a restart can never replay them all. The cause is how memory is distributed: in Enterprise especially, there is no reason for an ingester to allocate only 20% of memory to the DataFusion memory pool (which runs the snapshot), since the parquet cache is not in use at all. That 20% is too conservative for an ingester. So, instead of redistributing the memory settings based on the mode the server runs in, this commit introduces a separate write path executor with no bound on memory (it still uses `GreedyMemoryPool` under the hood, with `usize::MAX` as the upper limit). As a result, when the write path executor does run out of memory, it is a real OOM that stops the whole process. It is also important to let the snapshotting process use as much memory as it needs: without that, the buffer keeps growing and runs into OOM anyway.

closes: #26422
1 parent be25c6f commit ed5d40c
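
For context on the memory claim above, here is a minimal standalone sketch of how a `GreedyMemoryPool` capped at `usize::MAX` behaves, using DataFusion's memory-pool API directly. The consumer name and reservation size are illustrative, not from the commit; influxdb3 wires the pool up through iox's `Executor` rather than constructing it by hand like this.

```rust
use std::sync::Arc;

use datafusion::execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};

fn main() {
    // A GreedyMemoryPool with usize::MAX as its limit still tracks
    // reservations, but will never refuse one in practice, so DataFusion
    // operators never see a resources-exhausted error. If memory truly
    // runs out, the OS OOM-kills the whole process instead.
    let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(usize::MAX));

    // "snapshot" is an illustrative consumer name, not one from the commit.
    let mut reservation = MemoryConsumer::new("snapshot").register(&pool);
    reservation
        .try_grow(10 * 1024 * 1024 * 1024) // a 10 GiB reservation succeeds
        .expect("effectively unbounded pool should not refuse growth");

    println!("reserved: {} bytes", pool.reserved());
}
```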

2 files changed: +49 −4 lines

influxdb3/src/commands/serve.rs

Lines changed: 45 additions & 4 deletions
```diff
@@ -1,7 +1,6 @@
 //! Entrypoint for InfluxDB 3 Core Server
 
 use anyhow::{Context, bail};
-use datafusion_util::config::register_iox_object_store;
 use futures::{FutureExt, future::FusedFuture, pin_mut};
 use influxdb3_authz::TokenAuthenticator;
 use influxdb3_cache::{
```
```diff
@@ -546,6 +545,20 @@ pub async fn command(config: Config) -> Result<()> {
     let f = SendPanicsToTracing::new_with_metrics(&metrics);
     std::mem::forget(f);
 
+    // When you have an extra executor, you need a separate metrics registry!
+    // It is not clear what the impact would be.
+    // TODO: confirm this is not going to mess up downstream metrics consumers
+    let write_path_metrics = setup_metric_registry();
+
+    // Install custom panic handler and forget about it.
+    //
+    // This leaks the handler and prevents it from ever being dropped during the
+    // lifetime of the program - this is actually a good thing, as it prevents
+    // the panic handler from being removed while unwinding a panic (which in
+    // turn, causes a panic - see #548)
+    let write_path_panic_handler_fn = SendPanicsToTracing::new_with_metrics(&write_path_metrics);
+    std::mem::forget(write_path_panic_handler_fn);
+
     // Construct a token to trigger clean shutdown
     let frontend_shutdown = CancellationToken::new();
     let shutdown_manager = ShutdownManager::new(frontend_shutdown.clone());
```
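
The two `std::mem::forget` calls above are the usual install-and-leak guard pattern. A minimal sketch of the idea follows; `PanicHookGuard` is a hypothetical stand-in for `SendPanicsToTracing`, whose definition is not part of this diff.

```rust
// Hypothetical stand-in for SendPanicsToTracing: installs a panic hook on
// construction and would restore the previous hook on drop.
struct PanicHookGuard;

impl PanicHookGuard {
    fn install() -> Self {
        std::panic::set_hook(Box::new(|info| eprintln!("panic observed: {info}")));
        PanicHookGuard
    }
}

impl Drop for PanicHookGuard {
    fn drop(&mut self) {
        // Removing the hook here is exactly what forgetting the guard avoids:
        // the hook must stay installed even while a panic is unwinding, or
        // reporting the panic can itself panic.
        let _ = std::panic::take_hook();
    }
}

fn main() {
    let guard = PanicHookGuard::install();
    // Leak the guard so Drop never runs and the hook lives as long as the
    // process does - the same reason serve.rs forgets both handler guards.
    std::mem::forget(guard);
}
```
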
```diff
@@ -619,8 +632,36 @@ pub async fn command(config: Config) -> Result<()> {
             Arc::clone(&metrics),
         ),
     ));
-    let runtime_env = exec.new_context().inner().runtime_env();
-    register_iox_object_store(runtime_env, parquet_store.id(), Arc::clone(&object_store));
+
+    // Note: using the same metrics registry causes a runtime panic.
+    let write_path_executor = Arc::new(Executor::new_with_config_and_executor(
+        ExecutorConfig {
+            // should this be divided? or should this contend for threads with the
+            // executor that's set up for querying only?
+            target_query_partitions: tokio_datafusion_config.num_threads.unwrap(),
+            object_stores: [&parquet_store]
+                .into_iter()
+                .map(|store| (store.id(), Arc::clone(store.object_store())))
+                .collect(),
+            metric_registry: Arc::clone(&write_path_metrics),
+            // use as much memory as needed for persistence; could this be an
+            // UnboundedMemoryPool?
+            mem_pool_size: usize::MAX,
+            // These are new additions; just skimming through the code, it does not
+            // look like we can achieve the same effect as having a separate executor.
+            // It looks like it applies to "all" queries; it'd be nice to have a filter
+            // to say "when the query matches this pattern, apply these limits". If
+            // that's possible, maybe we could avoid creating a separate executor.
+            per_query_mem_pool_config: PerQueryMemoryPoolConfig::Disabled,
+            heap_memory_limit: None,
+        },
+        DedicatedExecutor::new(
+            "datafusion_write_path",
+            tokio_datafusion_config
+                .builder()
+                .map_err(Error::TokioRuntime)?,
+            Arc::clone(&write_path_metrics),
+        ),
+    ));
 
     let trace_header_parser = TraceHeaderParser::new()
         .with_jaeger_trace_context_header_name(
```
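
As a rough, hypothetical illustration (not part of the commit): the runtime that `mem_pool_size: usize::MAX` produces can be inspected through the executor's session context, via the same `new_context().inner().runtime_env()` path the removed lines used. This assumes `Executor` comes from `iox_query::exec`, as elsewhere in this crate.

```rust
use std::sync::Arc;

use iox_query::exec::Executor;

// Hypothetical helper, not in the commit: peek at the DataFusion runtime
// behind an executor the way the removed register_iox_object_store code
// did, and report the memory pool's current usage.
fn log_write_path_pool(write_path_executor: &Arc<Executor>) {
    let runtime_env = write_path_executor.new_context().inner().runtime_env();
    // nothing is reserved yet; with usize::MAX the pool effectively never
    // refuses growth, so failures surface as a process-level OOM instead
    println!("reserved: {} bytes", runtime_env.memory_pool.reserved());
}
```
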
```diff
@@ -685,7 +726,7 @@ pub async fn command(config: Config) -> Result<()> {
         last_cache,
         distinct_cache,
         time_provider: Arc::<SystemProvider>::clone(&time_provider),
-        executor: Arc::clone(&exec),
+        executor: Arc::clone(&write_path_executor),
         wal_config,
         parquet_cache,
         metric_registry: Arc::clone(&metrics),
```

influxdb3_wal/src/lib.rs

Lines changed: 4 additions & 0 deletions
```diff
@@ -16,6 +16,7 @@ use influxdb_line_protocol::FieldValue;
 use influxdb3_id::{ColumnId, DbId, SerdeVecMap, TableId};
 use influxdb3_shutdown::ShutdownToken;
 use iox_time::Time;
+use observability_deps::tracing::error;
 use serde::{Deserialize, Serialize};
 use serde_with::serde_as;
 use std::cmp::Ordering;
```
```diff
@@ -556,6 +557,9 @@ pub fn background_wal_flush<W: Wal>(
         {
             let snapshot_wal = Arc::clone(&wal);
             tokio::spawn(async move {
+                // since we're using a separate executor with unlimited memory,
+                // the errors here will never be due to lack of resources; only
+                // the OS can OOM-kill the whole process
                 let snapshot_details = snapshot_complete.await.expect("snapshot failed");
                 assert_eq!(snapshot_info, snapshot_details);
```

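The `expect` in that spawned task is the fail-fast lever: once resource errors are off the table, a snapshot failure means a bug, and the process should crash loudly rather than keep accumulating WAL files. Below is a self-contained sketch of the same await-and-panic pattern, with an illustrative oneshot channel standing in for the real snapshot completion future.

```rust
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    // Illustrative stand-in for the snapshot completion future.
    let (tx, snapshot_complete) = oneshot::channel::<u64>();

    tokio::spawn(async move {
        // pretend the snapshot work finished and report its sequence number
        let _ = tx.send(42);
    });

    // Mirrors background_wal_flush: since the write path executor can no
    // longer return resource-exhausted errors, any failure here is a bug,
    // so panic (and let the leaked panic hook report it) instead of
    // carrying on with an ever-growing buffer.
    let snapshot_details = snapshot_complete.await.expect("snapshot failed");
    assert_eq!(snapshot_details, 42);
}
```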