Commit 946233a
feat: separate out write path executor with unbounded memory limit
closes: #26422
1 parent a967e23 commit 946233a

2 files changed (+60, -7 lines)


influxdb3/src/commands/serve.rs

Lines changed: 38 additions & 1 deletion
```diff
@@ -543,6 +543,19 @@ pub async fn command(config: Config) -> Result<()> {
     let f = SendPanicsToTracing::new_with_metrics(&metrics);
     std::mem::forget(f);
 
+    // hmmm when you have extra executor, you need extra metrics! This is expected to be a
+    // singleton
+    let write_path_metrics = setup_metric_registry();
+
+    // Install custom panic handler and forget about it.
+    //
+    // This leaks the handler and prevents it from ever being dropped during the
+    // lifetime of the program - this is actually a good thing, as it prevents
+    // the panic handler from being removed while unwinding a panic (which in
+    // turn, causes a panic - see #548)
+    let write_path_panic_handler_fn = SendPanicsToTracing::new_with_metrics(&write_path_metrics);
+    std::mem::forget(write_path_panic_handler_fn);
+
     // Construct a token to trigger clean shutdown
     let frontend_shutdown = CancellationToken::new();
     let shutdown_manager = ShutdownManager::new(frontend_shutdown.clone());
```
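
The hunk above installs a second `SendPanicsToTracing` handler and immediately `std::mem::forget`s it, so its `Drop` never runs and the installed hook stays in place for the life of the process. A minimal standalone sketch of that leak-on-purpose pattern (the `Guard` type here is hypothetical, not the iox type):

```rust
struct Guard;

impl Drop for Guard {
    fn drop(&mut self) {
        // In SendPanicsToTracing this is where the previous panic hook
        // would be restored - exactly what the commit wants to avoid.
        println!("guard dropped, hook uninstalled");
    }
}

fn main() {
    let g = Guard;
    // Leak the guard: Drop is never invoked, so whatever the guard
    // installed stays active until the process exits.
    std::mem::forget(g);
}
```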
```diff
@@ -612,6 +625,30 @@ pub async fn command(config: Config) -> Result<()> {
             Arc::clone(&metrics),
         ),
     ));
+
+    // Note: using same metrics registry causes runtime panic.
+    let write_path_executor = Arc::new(Executor::new_with_config_and_executor(
+        ExecutorConfig {
+            // should this be divided? or should this contend for threads with executor that's
+            // setup for querying only
+            target_query_partitions: tokio_datafusion_config.num_threads.unwrap(),
+            object_stores: [&parquet_store]
+                .into_iter()
+                .map(|store| (store.id(), Arc::clone(store.object_store())))
+                .collect(),
+            metric_registry: Arc::clone(&write_path_metrics),
+            // use as much memory for persistence, can this be UnboundedMemoryPool?
+            mem_pool_size: usize::MAX,
+        },
+        DedicatedExecutor::new(
+            "datafusion_write_path",
+            tokio_datafusion_config
+                .builder()
+                .map_err(Error::TokioRuntime)?,
+            Arc::clone(&write_path_metrics),
+        ),
+    ));
+
     let runtime_env = exec.new_context().inner().runtime_env();
     register_iox_object_store(runtime_env, parquet_store.id(), Arc::clone(&object_store));
 
```
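The comment in the hunk above asks whether the new executor should split threads with the query executor or contend with it. For context, a small sketch with plain tokio (not the iox `DedicatedExecutor` wiring; the thread counts are made-up stand-ins for `tokio_datafusion_config.num_threads`): two independently built multi-thread runtimes each own their worker pool, so reusing the same `num_threads` value for both roughly doubles the total DataFusion worker threads rather than dividing them.

```rust
use tokio::runtime::Builder;

fn main() {
    // One runtime per dedicated-executor-like pool; each owns its threads.
    let query_rt = Builder::new_multi_thread()
        .worker_threads(4) // stand-in for tokio_datafusion_config.num_threads
        .thread_name("datafusion")
        .enable_all()
        .build()
        .unwrap();

    let write_rt = Builder::new_multi_thread()
        .worker_threads(4) // same value again: 8 worker threads in total
        .thread_name("datafusion_write_path")
        .enable_all()
        .build()
        .unwrap();

    // Work submitted to one runtime never runs on the other's workers, so
    // the write path does not contend with queries for these threads.
    query_rt.block_on(async { println!("query pool ready") });
    write_rt.block_on(async { println!("write pool ready") });
}
```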
```diff
@@ -678,7 +715,7 @@ pub async fn command(config: Config) -> Result<()> {
         last_cache,
         distinct_cache,
         time_provider: Arc::<SystemProvider>::clone(&time_provider),
-        executor: Arc::clone(&exec),
+        executor: Arc::clone(&write_path_executor),
         wal_config,
         parquet_cache,
         metric_registry: Arc::clone(&metrics),
```
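
The executor hunk above also asks whether `mem_pool_size: usize::MAX` could instead be an `UnboundedMemoryPool`. A minimal comparison using DataFusion's memory-pool API directly (how iox's `Executor` maps `mem_pool_size` onto a pool internally is not shown here and is an assumption): a greedy pool with a `usize::MAX` budget admits the same reservations as a truly unbounded pool, it just still performs the budget check.

```rust
use std::sync::Arc;

use datafusion::execution::memory_pool::{
    GreedyMemoryPool, MemoryConsumer, MemoryPool, UnboundedMemoryPool,
};

fn main() -> datafusion::error::Result<()> {
    // Bounded pool with an effectively infinite budget: reservations are
    // checked against usize::MAX, which never rejects in practice.
    let greedy: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(usize::MAX));
    let mut persist = MemoryConsumer::new("write_path_persist").register(&greedy);
    persist.try_grow(64 * 1024 * 1024)?; // 64 MiB, always admitted

    // Truly unbounded pool: same observable behaviour, no budget to enforce.
    let unbounded: Arc<dyn MemoryPool> = Arc::new(UnboundedMemoryPool::default());
    let mut persist2 = MemoryConsumer::new("write_path_persist").register(&unbounded);
    persist2.try_grow(64 * 1024 * 1024)?;

    Ok(())
}
```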

influxdb3_wal/src/lib.rs

Lines changed: 22 additions & 6 deletions
```diff
@@ -17,10 +17,12 @@ use influxdb_line_protocol::v3::SeriesValue;
 use influxdb3_id::{ColumnId, DbId, SerdeVecMap, TableId};
 use influxdb3_shutdown::ShutdownToken;
 use iox_time::Time;
+use observability_deps::tracing::error;
 use serde::{Deserialize, Serialize};
 use serde_with::serde_as;
 use std::cmp::Ordering;
 use std::fmt::Debug;
+use std::process;
 use std::str::FromStr;
 use std::sync::Arc;
 use std::time::Duration;
```
```diff
@@ -566,12 +568,26 @@ pub fn background_wal_flush<W: Wal>(
            {
                let snapshot_wal = Arc::clone(&wal);
                tokio::spawn(async move {
-                    let snapshot_details = snapshot_complete.await.expect("snapshot failed");
-                    assert_eq!(snapshot_info, snapshot_details);
-
-                    snapshot_wal
-                        .cleanup_snapshot(snapshot_info, snapshot_permit)
-                        .await;
+                    let snapshot_details = snapshot_complete.await;
+                    match snapshot_details {
+                        Ok(snapshot_details) => {
+                            if snapshot_info != snapshot_details {
+                                drop(snapshot_permit);
+                                panic!("snapshot details are different");
+                            }
+                            snapshot_wal
+                                .cleanup_snapshot(snapshot_info, snapshot_permit)
+                                .await;
+
+                        },
+                        Err(e) => {
+                            drop(snapshot_permit);
+                            error!(error = ?e, "snapshotting failed with error");
+                            // shutdown the whole process if snapshot fails (defaulting to
+                            // 137 as OOMs are the most common reason to fail at this point)
+                            process::exit(137);
+                        }
+                    }
                });
            }
        }
```
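
The WAL change above replaces the `expect`/`assert_eq!` pair with explicit handling: on success the snapshot is cleaned up, on failure the permit is dropped, the error is logged, and the whole process exits with 137 (128 + SIGKILL(9), the status a supervisor typically reports after an OOM kill). A standalone sketch of that spawn-then-exit-on-failure pattern, with a oneshot channel and a semaphore permit standing in for the WAL's `snapshot_complete` future and `snapshot_permit` (illustrative names, not the influxdb3_wal types):

```rust
use std::{process, sync::Arc};

use tokio::sync::{oneshot, Semaphore};

#[tokio::main]
async fn main() {
    // Stand-ins for the WAL's snapshot permit and completion channel.
    let semaphore = Arc::new(Semaphore::new(1));
    let permit = semaphore.clone().acquire_owned().await.unwrap();
    let (tx, rx) = oneshot::channel::<u64>();

    // Simulate the snapshot task dying without reporting a result.
    drop(tx);

    let handle = tokio::spawn(async move {
        match rx.await {
            Ok(details) => {
                // Happy path: clean up, then release the permit.
                println!("snapshot {details} complete");
                drop(permit);
            }
            Err(e) => {
                // Release the permit first so nothing waits on it forever,
                // log, then take the whole process down with exit code 137.
                drop(permit);
                eprintln!("snapshotting failed: {e}");
                process::exit(137);
            }
        }
    });

    handle.await.unwrap();
}
```

Dropping the permit before exiting keeps the failure path consistent with the success path, even though the process is about to terminate anyway.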
