
Commit 0e3bc05

feat: separate out write path executor with unbounded memory limit
closes: #26422
1 parent be25c6f commit 0e3bc05

File tree

influxdb3/src/commands/serve.rs
influxdb3_wal/src/lib.rs

2 files changed: +60 -7 lines changed


influxdb3/src/commands/serve.rs

Lines changed: 38 additions & 1 deletion

@@ -546,6 +546,19 @@ pub async fn command(config: Config) -> Result<()> {
     let f = SendPanicsToTracing::new_with_metrics(&metrics);
     std::mem::forget(f);
 
+    // hmmm when you have extra executor, you need extra metrics! This is expected to be a
+    // singleton
+    let write_path_metrics = setup_metric_registry();
+
+    // Install custom panic handler and forget about it.
+    //
+    // This leaks the handler and prevents it from ever being dropped during the
+    // lifetime of the program - this is actually a good thing, as it prevents
+    // the panic handler from being removed while unwinding a panic (which in
+    // turn, causes a panic - see #548)
+    let write_path_panic_handler_fn = SendPanicsToTracing::new_with_metrics(&write_path_metrics);
+    std::mem::forget(write_path_panic_handler_fn);
+
     // Construct a token to trigger clean shutdown
     let frontend_shutdown = CancellationToken::new();
     let shutdown_manager = ShutdownManager::new(frontend_shutdown.clone());
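
The two std::mem::forget calls above intentionally leak the panic-handler guards so they are never dropped, and therefore never uninstalled, for the lifetime of the process. A minimal sketch of that install-and-leak pattern, using a hypothetical PanicHookGuard in place of the real SendPanicsToTracing:

use std::panic;

/// Hypothetical guard standing in for SendPanicsToTracing: installing it
/// replaces the panic hook, and dropping it would restore the previous hook.
struct PanicHookGuard;

impl PanicHookGuard {
    fn install() -> Self {
        let previous = panic::take_hook();
        panic::set_hook(Box::new(move |info| {
            // The real handler would emit a tracing event and bump metrics here.
            eprintln!("panic observed: {info}");
            previous(info);
        }));
        PanicHookGuard
    }
}

impl Drop for PanicHookGuard {
    fn drop(&mut self) {
        // Swapping hooks while a panic is already unwinding can itself panic,
        // which is why the guard is leaked instead of ever being dropped.
        let _ = panic::take_hook();
    }
}

fn main() {
    let guard = PanicHookGuard::install();
    // Leak the guard so the custom hook stays installed for the whole process.
    std::mem::forget(guard);
}

Leaking the guard trades a one-time allocation for the guarantee that the hook can never be removed mid-unwind, which is the scenario the comment's reference to #548 describes.
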
@@ -619,6 +632,30 @@ pub async fn command(config: Config) -> Result<()> {
             Arc::clone(&metrics),
         ),
     ));
+
+    // Note: using same metrics registry causes runtime panic.
+    let write_path_executor = Arc::new(Executor::new_with_config_and_executor(
+        ExecutorConfig {
+            // should this be divided? or should this contend for threads with executor that's
+            // setup for querying only
+            target_query_partitions: tokio_datafusion_config.num_threads.unwrap(),
+            object_stores: [&parquet_store]
+                .into_iter()
+                .map(|store| (store.id(), Arc::clone(store.object_store())))
+                .collect(),
+            metric_registry: Arc::clone(&write_path_metrics),
+            // use as much memory for persistence, can this be UnboundedMemoryPool?
+            mem_pool_size: usize::MAX,
+        },
+        DedicatedExecutor::new(
+            "datafusion_write_path",
+            tokio_datafusion_config
+                .builder()
+                .map_err(Error::TokioRuntime)?,
+            Arc::clone(&write_path_metrics),
+        ),
+    ));
+
     let runtime_env = exec.new_context().inner().runtime_env();
     register_iox_object_store(runtime_env, parquet_store.id(), Arc::clone(&object_store));
 
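
The write-path executor is configured with mem_pool_size: usize::MAX, i.e. an effectively unbounded memory budget for persistence work. The inline question about UnboundedMemoryPool points at the same idea: a size-limited DataFusion pool whose limit is usize::MAX behaves, for practical purposes, like the unbounded pool. A rough sketch of that comparison using DataFusion's memory-pool API directly (the pool types are DataFusion's; whether the influxdb3 Executor builds exactly these internally is an assumption here):

use std::sync::Arc;

use datafusion::execution::memory_pool::{
    GreedyMemoryPool, MemoryConsumer, MemoryPool, UnboundedMemoryPool,
};

fn main() -> datafusion::error::Result<()> {
    // What a `usize::MAX` pool size effectively configures: a bounded pool
    // whose limit is so large it never rejects a reservation in practice.
    let effectively_unbounded: Arc<dyn MemoryPool> =
        Arc::new(GreedyMemoryPool::new(usize::MAX));

    // The alternative hinted at in the diff comment: a pool with no limit at all.
    let unbounded: Arc<dyn MemoryPool> = Arc::new(UnboundedMemoryPool::default());

    for pool in [&effectively_unbounded, &unbounded] {
        let mut reservation = MemoryConsumer::new("write_path_sort").register(pool);
        // A 1 GiB reservation succeeds on both pools; a small GreedyMemoryPool
        // would return a ResourcesExhausted error here instead.
        reservation.try_grow(1024 * 1024 * 1024)?;
        // Dropping the reservation at the end of the iteration returns the
        // reserved bytes to the pool.
    }
    Ok(())
}

The trade-off is the same either way: write-path plans are never rejected for exceeding a memory budget, which is why the WAL change below treats a failed snapshot as a likely OOM and exits with code 137.
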

@@ -685,7 +722,7 @@ pub async fn command(config: Config) -> Result<()> {
         last_cache,
         distinct_cache,
         time_provider: Arc::<SystemProvider>::clone(&time_provider),
-        executor: Arc::clone(&exec),
+        executor: Arc::clone(&write_path_executor),
         wal_config,
         parquet_cache,
         metric_registry: Arc::clone(&metrics),

influxdb3_wal/src/lib.rs

Lines changed: 22 additions & 6 deletions

@@ -16,10 +16,12 @@ use influxdb_line_protocol::FieldValue;
 use influxdb3_id::{ColumnId, DbId, SerdeVecMap, TableId};
 use influxdb3_shutdown::ShutdownToken;
 use iox_time::Time;
+use observability_deps::tracing::error;
 use serde::{Deserialize, Serialize};
 use serde_with::serde_as;
 use std::cmp::Ordering;
 use std::fmt::Debug;
+use std::process;
 use std::str::FromStr;
 use std::sync::Arc;
 use std::time::Duration;
@@ -556,12 +558,26 @@ pub fn background_wal_flush<W: Wal>(
         {
             let snapshot_wal = Arc::clone(&wal);
             tokio::spawn(async move {
-                let snapshot_details = snapshot_complete.await.expect("snapshot failed");
-                assert_eq!(snapshot_info, snapshot_details);
-
-                snapshot_wal
-                    .cleanup_snapshot(snapshot_info, snapshot_permit)
-                    .await;
+                let snapshot_details = snapshot_complete.await;
+                match snapshot_details {
+                    Ok(snapshot_details) => {
+                        if snapshot_info != snapshot_details {
+                            drop(snapshot_permit);
+                            panic!("snapshot details are different");
+                        }
+                        snapshot_wal
+                            .cleanup_snapshot(snapshot_info, snapshot_permit)
+                            .await;
+
+                    },
+                    Err(e) => {
+                        drop(snapshot_permit);
+                        error!(error = ?e, "snapshotting failed with error");
+                        // shutdown the whole process if snapshot fails (defaulting to
+                        // 137 as OOMs are the most common reason to fail at this point)
+                        process::exit(137);
+                    }
+                }
             });
         }
     }
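
The new match arm releases the cleanup permit before either panicking or exiting, and exit code 137 mirrors what an OOM-killed process reports (128 + SIGKILL), matching the comment that OOMs are the most likely cause of failure here. A self-contained sketch of the same pattern with plain tokio primitives, where SnapshotDetails, the oneshot channel, and the semaphore permit are stand-ins for the WAL's own types:

use std::process;
use std::sync::Arc;

use tokio::sync::{oneshot, Semaphore};

#[derive(Debug, PartialEq)]
struct SnapshotDetails {
    sequence: u64,
}

#[tokio::main]
async fn main() {
    let semaphore = Arc::new(Semaphore::new(1));
    let permit = semaphore.clone().acquire_owned().await.unwrap();
    let (tx, rx) = oneshot::channel::<SnapshotDetails>();

    // Simulate the snapshotting task completing (dropping `tx` instead would
    // surface as an Err on the receiver, like a failed snapshot).
    tokio::spawn(async move {
        let _ = tx.send(SnapshotDetails { sequence: 42 });
    });

    let expected = SnapshotDetails { sequence: 42 };
    match rx.await {
        Ok(details) => {
            if details != expected {
                // Release the permit before panicking so other holders are not blocked.
                drop(permit);
                panic!("snapshot details are different");
            }
            // ... cleanup would run here while still holding the permit ...
            drop(permit);
        }
        Err(e) => {
            drop(permit);
            eprintln!("snapshotting failed: {e}");
            // Exit with 137 (128 + SIGKILL), the code an OOM-killed process reports.
            process::exit(137);
        }
    }
}

Dropping the permit on every exit path mirrors the diff above, so no other work is left waiting on a permit owned by a task that is about to abort the process.
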
