Skip to content

Commit e79af19

Browse files
feat: produce snapshot chunks lazily
1 parent 09abf42 commit e79af19

File tree

5 files changed

+284
-163
lines changed

5 files changed

+284
-163
lines changed

influxdb3/src/commands/serve.rs

+1
Original file line number | Diff line number | Diff line change
@@ -579,6 +579,7 @@ pub async fn command(config: Config) -> Result<()> {
579579
background_buffer_checker(
580580
// config.force_snapshot_mem_threshold.bytes(),
581581
734003200,
582+
// 536870912,
582583
&write_buffer_impl,
583584
)
584585
.await;

influxdb3_write/src/paths.rs

+21-6
Original file line number | Diff line number | Diff line change
@@ -55,6 +55,7 @@ pub struct ParquetFilePath(ObjPath);
5555
impl ParquetFilePath {
5656
/// Generate a parquet file path using the given arguments. This will convert the provided
5757
/// `chunk_time` into a date time string with format `'YYYY-MM-DD/HH-MM'`
58+
#[allow(clippy::too_many_arguments)]
5859
pub fn new(
5960
host_prefix: &str,
6061
db_name: &str,
@@ -63,14 +64,26 @@ impl ParquetFilePath {
6364
table_id: u32,
6465
chunk_time: i64,
6566
wal_file_sequence_number: WalFileSequenceNumber,
67+
sub_chunk_index: Option<u64>,
6668
) -> Self {
6769
let date_time = DateTime::<Utc>::from_timestamp_nanos(chunk_time);
68-
let path = ObjPath::from(format!(
69-
"{host_prefix}/dbs/{db_name}-{db_id}/{table_name}-{table_id}/{date_string}/{wal_seq:010}.{ext}",
70-
date_string = date_time.format("%Y-%m-%d/%H-%M"),
71-
wal_seq = wal_file_sequence_number.as_u64(),
72-
ext = PARQUET_FILE_EXTENSION
73-
));
70+
let path = if sub_chunk_index.is_some() {
71+
ObjPath::from(format!(
72+
"{host_prefix}/dbs/{db_name}-{db_id}/{table_name}-{table_id}/{date_string}/{wal_seq:010}-{chunk_idx}.{ext}",
73+
date_string = date_time.format("%Y-%m-%d/%H-%M"),
74+
wal_seq = wal_file_sequence_number.as_u64(),
75+
chunk_idx = sub_chunk_index.unwrap(),
76+
ext = PARQUET_FILE_EXTENSION
77+
))
78+
79+
} else {
80+
ObjPath::from(format!(
81+
"{host_prefix}/dbs/{db_name}-{db_id}/{table_name}-{table_id}/{date_string}/{wal_seq:010}.{ext}",
82+
date_string = date_time.format("%Y-%m-%d/%H-%M"),
83+
wal_seq = wal_file_sequence_number.as_u64(),
84+
ext = PARQUET_FILE_EXTENSION
85+
))
86+
};
7487
Self(path)
7588
}
7689
}
@@ -143,6 +156,7 @@ fn parquet_file_path_new() {
143156
.timestamp_nanos_opt()
144157
.unwrap(),
145158
WalFileSequenceNumber::new(1337),
159+
None,
146160
),
147161
ObjPath::from("my_host/dbs/my_db-0/my_table-0/2038-01-19/03-14/0000001337.parquet")
148162
);
@@ -162,6 +176,7 @@ fn parquet_file_percent_encoded() {
162176
.timestamp_nanos_opt()
163177
.unwrap(),
164178
WalFileSequenceNumber::new(100),
179+
None,
165180
)
166181
.as_ref()
167182
.as_ref(),

influxdb3_write/src/persister.rs

+1
Original file line number | Diff line number | Diff line change
@@ -968,6 +968,7 @@ mod tests {
968968
0,
969969
Utc::now().timestamp_nanos_opt().unwrap(),
970970
WalFileSequenceNumber::new(1),
971+
None,
971972
);
972973
let (bytes_written, meta, _) = persister
973974
.persist_parquet_file(path.clone(), stream_builder.build())

0 commit comments

Comments (0)