Skip to content

Commit 3a4a9ab

Browse files
feat: schema mismatch alignment
This PR addresses the OOM issue (or reduces the chances of running into OOM when snapshotting) by making the following main changes: it defaults the gen 1 duration to 1m (instead of 10m), snapshot chunks are built lazily, and the sort/dedupe step itself is done serially (i.e. one at a time). As an optimisation, when _not_ forcing a snapshot, it aggregates up to 10m worth of chunks and writes them in parallel; the assumption is that, since it is a normal snapshot, there is enough memory to run it. Closes: #25991
1 parent 7ddd888 commit 3a4a9ab

File tree

9 files changed

+868
-283
lines changed

9 files changed

+868
-283
lines changed

Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

influxdb3/src/commands/serve.rs

+2-4
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ pub struct Config {
181181
#[clap(
182182
long = "gen1-duration",
183183
env = "INFLUXDB3_GEN1_DURATION",
184-
default_value = "10m",
184+
default_value = "1m",
185185
action
186186
)]
187187
pub gen1_duration: Gen1Duration,
@@ -577,9 +577,7 @@ pub async fn command(config: Config) -> Result<()> {
577577

578578
info!("setting up background mem check for query buffer");
579579
background_buffer_checker(
580-
// config.force_snapshot_mem_threshold.bytes(),
581-
734003200,
582-
// 536870912,
580+
config.force_snapshot_mem_threshold.as_num_bytes(),
583581
&write_buffer_impl,
584582
)
585583
.await;

influxdb3_wal/src/lib.rs

+11-1
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,16 @@ impl Gen1Duration {
207207
self.0.as_nanos() as i64
208208
}
209209

210+
pub fn as_10m(&self) -> u64 {
211+
let duration_secs = self.0.as_secs();
212+
let ten_min_secs = 600;
213+
if duration_secs >= ten_min_secs {
214+
1
215+
} else {
216+
ten_min_secs / duration_secs
217+
}
218+
}
219+
210220
pub fn new_1m() -> Self {
211221
Self(Duration::from_secs(60))
212222
}
@@ -237,7 +247,7 @@ impl Default for Gen1Duration {
237247

238248
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
239249
pub struct NoopDetails {
240-
timestamp_ns: i64,
250+
pub timestamp_ns: i64,
241251
}
242252

243253
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]

influxdb3_write/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ serde_json.workspace = true
6363
serde_with.workspace = true
6464
sha2.workspace = true
6565
snap.workspace = true
66+
sysinfo.workspace = true
6667
thiserror.workspace = true
6768
tokio.workspace = true
6869
url.workspace = true

influxdb3_write/src/paths.rs

+6-21
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@ pub struct ParquetFilePath(ObjPath);
5555
impl ParquetFilePath {
5656
/// Generate a parquet file path using the given arguments. This will convert the provided
5757
/// `chunk_time` into a date time string with format `'YYYY-MM-DD/HH-MM'`
58-
#[allow(clippy::too_many_arguments)]
5958
pub fn new(
6059
host_prefix: &str,
6160
db_name: &str,
@@ -64,26 +63,14 @@ impl ParquetFilePath {
6463
table_id: u32,
6564
chunk_time: i64,
6665
wal_file_sequence_number: WalFileSequenceNumber,
67-
sub_chunk_index: Option<u64>,
6866
) -> Self {
6967
let date_time = DateTime::<Utc>::from_timestamp_nanos(chunk_time);
70-
let path = if sub_chunk_index.is_some() {
71-
ObjPath::from(format!(
72-
"{host_prefix}/dbs/{db_name}-{db_id}/{table_name}-{table_id}/{date_string}/{wal_seq:010}-{chunk_idx}.{ext}",
73-
date_string = date_time.format("%Y-%m-%d/%H-%M"),
74-
wal_seq = wal_file_sequence_number.as_u64(),
75-
chunk_idx = sub_chunk_index.unwrap(),
76-
ext = PARQUET_FILE_EXTENSION
77-
))
78-
79-
} else {
80-
ObjPath::from(format!(
81-
"{host_prefix}/dbs/{db_name}-{db_id}/{table_name}-{table_id}/{date_string}/{wal_seq:010}.{ext}",
82-
date_string = date_time.format("%Y-%m-%d/%H-%M"),
83-
wal_seq = wal_file_sequence_number.as_u64(),
84-
ext = PARQUET_FILE_EXTENSION
85-
))
86-
};
68+
let path = ObjPath::from(format!(
69+
"{host_prefix}/dbs/{db_name}-{db_id}/{table_name}-{table_id}/{date_string}/{wal_seq:010}.{ext}",
70+
date_string = date_time.format("%Y-%m-%d/%H-%M"),
71+
wal_seq = wal_file_sequence_number.as_u64(),
72+
ext = PARQUET_FILE_EXTENSION
73+
));
8774
Self(path)
8875
}
8976
}
@@ -156,7 +143,6 @@ fn parquet_file_path_new() {
156143
.timestamp_nanos_opt()
157144
.unwrap(),
158145
WalFileSequenceNumber::new(1337),
159-
None,
160146
),
161147
ObjPath::from("my_host/dbs/my_db-0/my_table-0/2038-01-19/03-14/0000001337.parquet")
162148
);
@@ -176,7 +162,6 @@ fn parquet_file_percent_encoded() {
176162
.timestamp_nanos_opt()
177163
.unwrap(),
178164
WalFileSequenceNumber::new(100),
179-
None,
180165
)
181166
.as_ref()
182167
.as_ref(),

influxdb3_write/src/persister.rs

-1
Original file line numberDiff line numberDiff line change
@@ -968,7 +968,6 @@ mod tests {
968968
0,
969969
Utc::now().timestamp_nanos_opt().unwrap(),
970970
WalFileSequenceNumber::new(1),
971-
None,
972971
);
973972
let (bytes_written, meta, _) = persister
974973
.persist_parquet_file(path.clone(), stream_builder.build())

influxdb3_write/src/write_buffer/mod.rs

+6-7
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,8 @@ impl WriteBufferImpl {
221221
distinct_cache_provider: Arc::clone(&distinct_cache),
222222
persisted_files: Arc::clone(&persisted_files),
223223
parquet_cache: parquet_cache.clone(),
224+
gen1_duration: wal_config.gen1_duration,
225+
max_size_per_parquet_file_bytes: 50_000_000,
224226
}));
225227

226228
// create the wal instance, which will replay into the queryable buffer and start
@@ -2721,10 +2723,7 @@ mod tests {
27212723
#[test_log::test(tokio::test)]
27222724
async fn test_out_of_order_data() {
27232725
let tmp_dir = test_helpers::tmp_dir().unwrap();
2724-
debug!(
2725-
?tmp_dir,
2726-
">>> using tmp dir for test_check_mem_and_force_snapshot"
2727-
);
2726+
debug!(?tmp_dir, ">>> using tmp dir");
27282727
let obj_store: Arc<dyn ObjectStore> =
27292728
Arc::new(LocalFileSystem::new_with_prefix(tmp_dir).unwrap());
27302729
let (write_buffer, _, _) = setup(
@@ -2795,6 +2794,9 @@ mod tests {
27952794
"| a | us | 1970-01-01T00:00:28Z | 10.0 |",
27962795
"| a | us | 1970-01-01T00:00:29Z | 10.0 |",
27972796
"| a | us | 1970-01-01T00:00:30Z | 10.0 |",
2797+
"| a | us | 1970-01-01T00:00:20Z | 10.0 |",
2798+
"| a | us | 1970-01-01T00:00:21Z | 10.0 |",
2799+
"| a | us | 1970-01-01T00:00:22Z | 10.0 |",
27982800
"| a | us | 1970-01-01T00:01:40Z | 10.0 |",
27992801
"| a | us | 1970-01-01T00:01:41Z | 10.0 |",
28002802
"| a | us | 1970-01-01T00:01:42Z | 10.0 |",
@@ -2807,9 +2809,6 @@ mod tests {
28072809
"| a | us | 1970-01-01T00:01:49Z | 10.0 |",
28082810
"| a | us | 1970-01-01T00:01:50Z | 10.0 |",
28092811
"| a | us | 1970-01-01T00:01:51Z | 10.0 |",
2810-
"| a | us | 1970-01-01T00:00:20Z | 10.0 |",
2811-
"| a | us | 1970-01-01T00:00:21Z | 10.0 |",
2812-
"| a | us | 1970-01-01T00:00:22Z | 10.0 |",
28132812
"+------+--------+----------------------+-------+",
28142813
],
28152814
&actual

0 commit comments

Comments
 (0)