Skip to content

Commit 73f96ce

Browse files
refactor: tidy ups
1 parent 0e3bc05 commit 73f96ce

File tree

2 files changed

+19
-27
lines changed

2 files changed

+19
-27
lines changed

influxdb3/src/commands/serve.rs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
//! Entrypoint for InfluxDB 3 Core Server
22
33
use anyhow::{Context, bail};
4-
use datafusion_util::config::register_iox_object_store;
54
use futures::{FutureExt, future::FusedFuture, pin_mut};
65
use influxdb3_authz::TokenAuthenticator;
76
use influxdb3_cache::{
@@ -546,8 +545,9 @@ pub async fn command(config: Config) -> Result<()> {
546545
let f = SendPanicsToTracing::new_with_metrics(&metrics);
547546
std::mem::forget(f);
548547

549-
// hmmm when you have extra executor, you need extra metrics! This is expected to be a
550-
// singleton
548+
// When you have extra executor, you need separate metrics registry! It is not clear what
549+
// the impact would be
550+
// TODO: confirm this is not going to mess up downstream metrics consumers
551551
let write_path_metrics = setup_metric_registry();
552552

553553
// Install custom panic handler and forget about it.
@@ -646,6 +646,13 @@ pub async fn command(config: Config) -> Result<()> {
646646
metric_registry: Arc::clone(&write_path_metrics),
647647
// use as much memory for persistence, can this be UnboundedMemoryPool?
648648
mem_pool_size: usize::MAX,
649+
// These are new additions, just skimming through the code it does not look like we can
650+
// achieve the same effect as having a separate executor. It looks like it's for "all"
651+
// queries, it'd be nice to have a filter to say when the query matches this pattern
652+
// apply these limits. If that's possible maybe we could avoid creating a separate
653+
// executor.
654+
per_query_mem_pool_config: PerQueryMemoryPoolConfig::Disabled,
655+
heap_memory_limit: None,
649656
},
650657
DedicatedExecutor::new(
651658
"datafusion_write_path",
@@ -656,9 +663,6 @@ pub async fn command(config: Config) -> Result<()> {
656663
),
657664
));
658665

659-
let runtime_env = exec.new_context().inner().runtime_env();
660-
register_iox_object_store(runtime_env, parquet_store.id(), Arc::clone(&object_store));
661-
662666
let trace_header_parser = TraceHeaderParser::new()
663667
.with_jaeger_trace_context_header_name(
664668
config

influxdb3_wal/src/lib.rs

Lines changed: 9 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ use serde::{Deserialize, Serialize};
2121
use serde_with::serde_as;
2222
use std::cmp::Ordering;
2323
use std::fmt::Debug;
24-
use std::process;
2524
use std::str::FromStr;
2625
use std::sync::Arc;
2726
use std::time::Duration;
@@ -558,26 +557,15 @@ pub fn background_wal_flush<W: Wal>(
558557
{
559558
let snapshot_wal = Arc::clone(&wal);
560559
tokio::spawn(async move {
561-
let snapshot_details = snapshot_complete.await;
562-
match snapshot_details {
563-
Ok(snapshot_details) => {
564-
if snapshot_info != snapshot_details {
565-
drop(snapshot_permit);
566-
panic!("snapshot details are different");
567-
}
568-
snapshot_wal
569-
.cleanup_snapshot(snapshot_info, snapshot_permit)
570-
.await;
571-
572-
},
573-
Err(e) => {
574-
drop(snapshot_permit);
575-
error!(error = ?e, "snapshotting failed with error");
576-
// shutdown the whole process if snapshot fails (defaulting to
577-
// 137 as OOMs are the most common reason to fail at this point)
578-
process::exit(137);
579-
}
580-
}
560+
// since we're using separate executor with unlimited memory,
561+
// the errors here will never be due to lack of resources. Only OS
562+
// (operating system) can OOM kill the whole process
563+
let snapshot_details = snapshot_complete.await.expect("snapshot failed");
564+
assert_eq!(snapshot_info, snapshot_details);
565+
566+
snapshot_wal
567+
.cleanup_snapshot(snapshot_info, snapshot_permit)
568+
.await;
581569
});
582570
}
583571
}

0 commit comments

Comments
 (0)