
feat: separate out write path executor with unbounded memory limit #26455


Merged · 1 commit · Jun 2, 2025
49 changes: 45 additions & 4 deletions influxdb3/src/commands/serve.rs
@@ -1,7 +1,6 @@
//! Entrypoint for InfluxDB 3 Core Server

use anyhow::{Context, bail};
use datafusion_util::config::register_iox_object_store;
use futures::{FutureExt, future::FusedFuture, pin_mut};
use influxdb3_authz::TokenAuthenticator;
use influxdb3_cache::{
@@ -546,6 +545,20 @@ pub async fn command(config: Config) -> Result<()> {
let f = SendPanicsToTracing::new_with_metrics(&metrics);
std::mem::forget(f);

// When you have an extra executor, you need a separate metrics registry! It is not clear what
// the impact of that would be.
// TODO: confirm this is not going to mess up downstream metrics consumers
let write_path_metrics = setup_metric_registry();
Comment on lines +548 to +551
Contributor

This will likely be an issue, since the HTTP endpoint that serves Prometheus metrics (/metrics) assumes a single registry:

fn handle_metrics(&self) -> Result<Response<Body>> {
    let mut body: Vec<u8> = Default::default();
    let mut reporter = metric_exporters::PrometheusTextEncoder::new(&mut body);
    self.common_state.metrics.report(&mut reporter);
    Ok(Response::new(Body::from(body)))
}

Is the issue that using the same registry for multiple executors causes them to overwrite each other, or contend for locks with each other?
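
A minimal sketch of one way to keep the write-path registry separate without losing its metrics, assuming the Registry::report / PrometheusTextEncoder API quoted above (the handle_metrics_with_both name and its signature are illustrative, not existing code):

fn handle_metrics_with_both(
    query_metrics: &metric::Registry,
    write_path_metrics: &metric::Registry,
) -> Result<Response<Body>> {
    let mut body: Vec<u8> = Default::default();
    let mut reporter = metric_exporters::PrometheusTextEncoder::new(&mut body);
    // Report both registries into the same encoder so scrapers still see a
    // single /metrics document.
    query_metrics.report(&mut reporter);
    write_path_metrics.report(&mut reporter);
    Ok(Response::new(Body::from(body)))
}

If both registries ever register instruments under the same name, the combined output would contain duplicate metric families, so distinct instrument names (or a single registry) would still be needed.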

Contributor Author

It runs into a panic:

2025-06-02T14:49:39.205230Z ERROR panic_logging: Thread panic panic_type="unknown" panic_message="More than one execution pool created: previously existing instrument" panic_file="/home/praveen/.cargo/git/checkouts/influxdb3_core-2ede6fca005e1dcf/fd0e474/iox_query/src/exec.rs" panic_line=281 panic_column=9

thread 'main' panicked at /home/praveen/.cargo/git/checkouts/influxdb3_core-2ede6fca005e1dcf/fd0e474/iox_query/src/exec.rs:281:9:
More than one execution pool created: previously existing instrument
stack backtrace:
   0:     0x629f44b5e172 - <std::sys::backtrace::BacktraceLock::print::DisplayBacktrace as core::fmt::Display>::fmt::hc04c8f544ab24d66
   1:     0x629f44b8eb63 - core::fmt::write::hfe57b7174b7d8eab
   2:     0x629f44b595a3 - std::io::Write::write_fmt::h154385efa8565236
   3:     0x629f44b5dfc2 - std::sys::backtrace::BacktraceLock::print::h0c8f24e22f5873a8
   4:     0x629f44b5f24c - std::panicking::default_hook::{{closure}}::hd07d57e6a602c8e4
   5:     0x629f44b5f04f - std::panicking::default_hook::h63d12f7d95bd91ed
   6:     0x629f3fd807db - panic_logging::SendPanicsToTracing::new_inner::{{closure}}::h4f1478e3035af477
   7:     0x629f44b5fd43 - std::panicking::rust_panic_with_hook::h33b18b24045abff4
   8:     0x629f44b5f9c6 - std::panicking::begin_panic_handler::{{closure}}::hf8313cc2fd0126bc
   9:     0x629f44b5e679 - std::sys::backtrace::__rust_end_short_backtrace::h57fe07c8aea5c98a
  10:     0x629f44b5f68d - __rustc[95feac21a9532783]::rust_begin_unwind
  11:     0x629f44b8bac0 - core::panicking::panic_fmt::hd54fb667be51beea
  12:     0x629f414b6cb7 - iox_query::exec::Executor::new_with_config_and_executor::h3ef1059edcb25ade
  13:     0x629f3f99fd9c - influxdb3::commands::serve::command::{{closure}}::h2cdf5ca9df83df25
  14:     0x629f3f9b6fcb - influxdb3::main::{{closure}}::hc953cfc298ca6770
  15:     0x629f3f987b39 - tokio::runtime::park::CachedParkThread::block_on::h51b18ac33f8a0e4d
  16:     0x629f3fb2a7bf - tokio::runtime::runtime::Runtime::block_on::h9eb33b87acb6fa53
  17:     0x629f3fc20d55 - influxdb3::main::h75a268e75e689bc6
  18:     0x629f3fcbb256 - std::sys::backtrace::__rust_begin_short_backtrace::h5b4e77177edb3cca
  19:     0x629f3fab4321 - std::rt::lang_start::{{closure}}::hc69eb1d94c6de306
  20:     0x629f44b4e080 - std::rt::lang_start_internal::h418648f91f5be3a1
  21:     0x629f3fc3b19d - main
  22:     0x7ed71c33d488 - <unknown>
  23:     0x7ed71c33d54c - __libc_start_main
  24:     0x629f3f95b325 - _start
  25:                0x0 - <unknown>
2025-06-02T14:49:39.295724Z  WARN executor: DedicatedExecutor dropped without calling shutdown()
2025-06-02T14:49:39.296308Z  WARN executor: DedicatedExecutor dropped without calling shutdown()

I can look into the panic and see if I can address it in a different way if this is going to mess up downstream consumers.

Contributor Author

I looked into the code that runs into the panic:

https://github.com/influxdata/influxdb3_core/blob/fd0e474a6c0af5ba867399d753f5df18f59907cb/iox_query/src/exec.rs#L268-L284

It looks like the check assumes the single memory pool -> executor relationship has been violated whenever the "datafusion_pool" instrument is already registered. We don't actually break that relationship: in this branch there are two executors and each has its own memory pool, so the relationship still holds, but because the registry is shared it trips this error.

I need to spend a bit more time to see if I can create the executor without hooking it up to metrics to start with (or use a different instrument name, e.g. "datafusion_write_pool") and then experiment with how it reports.
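
For illustration only (this is not the iox_query implementation), a minimal standalone model of why a shared registry trips the check even though each executor owns its own memory pool: both executors try to create an instrument under the same fixed name.

use std::collections::HashSet;

struct SharedRegistry {
    instruments: HashSet<&'static str>,
}

impl SharedRegistry {
    fn register_instrument(&mut self, name: &'static str) {
        // Mirrors the observed panic message: a second registration of the
        // same name is treated as a second execution pool.
        assert!(
            self.instruments.insert(name),
            "More than one execution pool created: previously existing instrument"
        );
    }
}

fn main() {
    let mut registry = SharedRegistry { instruments: HashSet::new() };
    registry.register_instrument("datafusion_pool"); // query executor: ok
    registry.register_instrument("datafusion_pool"); // write-path executor: panics
}

Using a second registry (as this PR does) or a distinct instrument name for the write-path pool would both avoid the collision.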


// Install custom panic handler and forget about it.
//
// This leaks the handler and prevents it from ever being dropped during the
// lifetime of the program - this is actually a good thing, as it prevents
// the panic handler from being removed while unwinding a panic (which in
// turn, causes a panic - see #548)
let write_path_panic_handler_fn = SendPanicsToTracing::new_with_metrics(&write_path_metrics);
std::mem::forget(write_path_panic_handler_fn);

// Construct a token to trigger clean shutdown
let frontend_shutdown = CancellationToken::new();
let shutdown_manager = ShutdownManager::new(frontend_shutdown.clone());
@@ -619,8 +632,36 @@ pub async fn command(config: Config) -> Result<()> {
Arc::clone(&metrics),
),
));
let runtime_env = exec.new_context().inner().runtime_env();
register_iox_object_store(runtime_env, parquet_store.id(), Arc::clone(&object_store));

// Note: using the same metrics registry causes a runtime panic.
let write_path_executor = Arc::new(Executor::new_with_config_and_executor(
ExecutorConfig {
// should this be divided? or should this contend for threads with the executor that's
// set up for querying only?
target_query_partitions: tokio_datafusion_config.num_threads.unwrap(),
object_stores: [&parquet_store]
.into_iter()
.map(|store| (store.id(), Arc::clone(store.object_store())))
.collect(),
metric_registry: Arc::clone(&write_path_metrics),
// use as much memory as possible for persistence; can this be UnboundedMemoryPool?
mem_pool_size: usize::MAX,
// These are new additions; just skimming through the code, it does not look like we can
// achieve the same effect as having a separate executor. It looks like they apply to "all"
// queries; it'd be nice to have a filter to say "when the query matches this pattern,
// apply these limits". If that's possible, maybe we could avoid creating a separate
// executor.
per_query_mem_pool_config: PerQueryMemoryPoolConfig::Disabled,
heap_memory_limit: None,
Comment on lines +649 to +655
Contributor

Yep, these came in from iox in the most recent sync. They aren't used anywhere in our code yet. I opened #26460 to address using them.

},
DedicatedExecutor::new(
"datafusion_write_path",
tokio_datafusion_config
.builder()
.map_err(Error::TokioRuntime)?,
Arc::clone(&write_path_metrics),
),
));

let trace_header_parser = TraceHeaderParser::new()
.with_jaeger_trace_context_header_name(
@@ -685,7 +726,7 @@ pub async fn command(config: Config) -> Result<()> {
last_cache,
distinct_cache,
time_provider: Arc::<SystemProvider>::clone(&time_provider),
executor: Arc::clone(&exec),
executor: Arc::clone(&write_path_executor),
wal_config,
parquet_cache,
metric_registry: Arc::clone(&metrics),
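
On the UnboundedMemoryPool question in the comment above: a hedged sketch under the assumption that a pool could be handed to the executor directly (the ExecutorConfig used in this diff only takes mem_pool_size); the helper name is illustrative. DataFusion's GreedyMemoryPool with a usize::MAX limit and its UnboundedMemoryPool behave the same in practice, since neither will ever fail a reservation, so the explicit unbounded pool mainly documents intent.

use std::sync::Arc;

use datafusion::execution::memory_pool::{GreedyMemoryPool, MemoryPool, UnboundedMemoryPool};

// Hypothetical helper: pick the pool the write-path executor would use.
fn write_path_pool(use_unbounded: bool) -> Arc<dyn MemoryPool> {
    if use_unbounded {
        // Tracks usage but never enforces a limit.
        Arc::new(UnboundedMemoryPool::default())
    } else {
        // Equivalent to mem_pool_size: usize::MAX; only errors once
        // reservations exceed the (unreachable) limit.
        Arc::new(GreedyMemoryPool::new(usize::MAX))
    }
}
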
4 changes: 4 additions & 0 deletions influxdb3_wal/src/lib.rs
@@ -16,6 +16,7 @@ use influxdb_line_protocol::FieldValue;
use influxdb3_id::{ColumnId, DbId, SerdeVecMap, TableId};
use influxdb3_shutdown::ShutdownToken;
use iox_time::Time;
use observability_deps::tracing::error;
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use std::cmp::Ordering;
@@ -556,6 +557,9 @@ pub fn background_wal_flush<W: Wal>(
{
let snapshot_wal = Arc::clone(&wal);
tokio::spawn(async move {
// since we're using a separate executor with unlimited memory,
// the errors here will never be due to lack of resources. Only the OS
// (operating system) can OOM-kill the whole process
let snapshot_details = snapshot_complete.await.expect("snapshot failed");
assert_eq!(snapshot_info, snapshot_details);
