
Commit 8598a99

feat: break down persist jobs
closes: #25991
1 parent 1f72bfc commit 8598a99

2 files changed: +113 -24 lines

influxdb3/src/commands/serve.rs (+2 -1)

@@ -577,7 +577,8 @@ pub async fn command(config: Config) -> Result<()> {
     info!("setting up background mem check for query buffer");
     background_buffer_checker(
-        config.force_snapshot_mem_threshold.as_num_bytes(),
+        // config.force_snapshot_mem_threshold.bytes(),
+        734003200,
         &write_buffer_impl,
     )
     .await;
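
The hard-coded value replacing the config-driven threshold is exactly 700 MiB expressed in bytes; a quick sanity check of that arithmetic (the constant name here is illustrative only, not part of the patch):

// 700 MiB in bytes matches the literal hard-coded in the diff above
const FORCE_SNAPSHOT_MEM_THRESHOLD_BYTES: u64 = 700 * 1024 * 1024;

fn main() {
    assert_eq!(FORCE_SNAPSHOT_MEM_THRESHOLD_BYTES, 734_003_200);
}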

influxdb3_write/src/write_buffer/queryable_buffer.rs (+111 -23)

@@ -5,7 +5,12 @@ use crate::write_buffer::persisted_files::PersistedFiles;
 use crate::write_buffer::table_buffer::TableBuffer;
 use crate::{ChunkFilter, ParquetFile, ParquetFileId, PersistedSnapshot};
 use anyhow::Context;
-use arrow::record_batch::RecordBatch;
+use arrow::{
+    array::AsArray,
+    datatypes::TimestampNanosecondType,
+    record_batch::RecordBatch,
+    row::{RowConverter, SortField},
+};
 use async_trait::async_trait;
 use data_types::{
     ChunkId, ChunkOrder, PartitionHashId, PartitionId, PartitionKey, TimestampMinMax,

@@ -25,7 +30,7 @@ use iox_query::chunk_statistics::{NoColumnRanges, create_chunk_statistics};
 use iox_query::exec::Executor;
 use iox_query::frontend::reorg::ReorgPlanner;
 use object_store::path::Path;
-use observability_deps::tracing::{error, info};
+use observability_deps::tracing::{debug, error, info};
 use parking_lot::Mutex;
 use parking_lot::RwLock;
 use parquet::format::FileMetaData;
@@ -197,27 +202,81 @@ impl QueryableBuffer {
             for chunk in snapshot_chunks {
                 let table_name =
                     db_schema.table_id_to_name(table_id).expect("table exists");
-                let persist_job = PersistJob {
-                    database_id: *database_id,
-                    table_id: *table_id,
-                    table_name: Arc::clone(&table_name),
-                    chunk_time: chunk.chunk_time,
-                    path: ParquetFilePath::new(
-                        self.persister.node_identifier_prefix(),
-                        db_schema.name.as_ref(),
-                        database_id.as_u32(),
-                        table_name.as_ref(),
-                        table_id.as_u32(),
-                        chunk.chunk_time,
-                        snapshot_details.last_wal_sequence_number,
-                    ),
-                    batch: chunk.record_batch,
-                    schema: chunk.schema,
-                    timestamp_min_max: chunk.timestamp_min_max,
-                    sort_key: table_buffer.sort_key.clone(),
-                };
-
-                persisting_chunks.push(persist_job);
+                // mapping between time to main record batch array's index
+                let mut smaller_chunks: HashMap<i64, (MinMax, Vec<usize>)> =
+                    HashMap::new();
+                let smaller_duration = Duration::from_secs(10).as_nanos() as i64;
+                let all_times = chunk
+                    .record_batch
+                    .column_by_name("time")
+                    .expect("time col to be present")
+                    .as_primitive::<TimestampNanosecondType>()
+                    .values();
+                for (idx, time) in all_times.iter().enumerate() {
+                    let smaller_chunk_time = time - (time % smaller_duration);
+                    let (min_max, vec_indices) =
+                        smaller_chunks.entry(smaller_chunk_time).or_insert_with(|| {
+                            (MinMax::new(i64::MAX, i64::MIN), Vec::new())
+                        });
+
+                    min_max.update(*time);
+                    vec_indices.push(idx);
+                }
+
+                // at this point we have a bucket for each 10 sec block, we can create
+                // smaller record batches here but maybe wasteful if we ever needed one
+                // batch (let's see how this works first and then decide what can happen)
+                let batch_schema = chunk.record_batch.schema();
+                let parent_cols = chunk.record_batch.columns();
+                let fields = batch_schema
+                    .fields()
+                    .iter()
+                    .map(|field| SortField::new(field.data_type().clone()))
+                    .collect();
+                debug!(?fields, ">>> schema fields");
+
+                let converter =
+                    RowConverter::new(fields).expect("row converter created from fields");
+                let rows = converter
+                    .convert_columns(parent_cols)
+                    .expect("convert cols to rows to succeed");
+
+                for (smaller_chunk_time, (min_max, all_indexes)) in smaller_chunks.iter() {
+                    // create a record batch using just all_indexes from parent recordbatch
+                    let all_rows = all_indexes
+                        .iter()
+                        .map(|idx| rows.row(*idx))
+                        .collect::<Vec<_>>();
+
+                    let child_cols = converter
+                        .convert_rows(all_rows)
+                        .expect("should convert rows back to cols");
+
+                    let smaller_rec_batch =
+                        RecordBatch::try_new(Arc::clone(&batch_schema), child_cols)
+                            .expect("create smaller record batch");
+                    let persist_job = PersistJob {
+                        database_id: *database_id,
+                        table_id: *table_id,
+                        table_name: Arc::clone(&table_name),
+                        chunk_time: *smaller_chunk_time,
+                        path: ParquetFilePath::new(
+                            self.persister.node_identifier_prefix(),
+                            db_schema.name.as_ref(),
+                            database_id.as_u32(),
+                            table_name.as_ref(),
+                            table_id.as_u32(),
+                            *smaller_chunk_time,
+                            snapshot_details.last_wal_sequence_number,
+                        ),
+                        batch: smaller_rec_batch,
+                        // this schema.clone() can be avoided?
+                        schema: chunk.schema.clone(),
+                        timestamp_min_max: min_max.to_ts_min_max(),
+                        sort_key: table_buffer.sort_key.clone(),
+                    };
+                    persisting_chunks.push(persist_job);
+                }
             }
         }
     }
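
Taken together, the new code in this hunk splits every snapshotted chunk into 10-second windows: row indexes are grouped by `time - (time % 10s)`, the min/max timestamp is tracked per bucket, the parent batch is converted to arrow's row format once, and one smaller RecordBatch (and one PersistJob) is rebuilt per bucket. Below is a minimal, self-contained sketch of just that bucketing step, assuming only the arrow crate; the helper name `bucket_batch` and the sample data are hypothetical, not part of the patch.

use std::{collections::HashMap, sync::Arc, time::Duration};

use arrow::{
    array::{AsArray, Int64Array, TimestampNanosecondArray},
    datatypes::{DataType, Field, Schema, TimeUnit, TimestampNanosecondType},
    record_batch::RecordBatch,
    row::{RowConverter, SortField},
};

/// Split `batch` into one RecordBatch per 10-second window of its "time" column.
fn bucket_batch(batch: &RecordBatch) -> HashMap<i64, RecordBatch> {
    let bucket_width = Duration::from_secs(10).as_nanos() as i64;
    let times = batch
        .column_by_name("time")
        .expect("time column present")
        .as_primitive::<TimestampNanosecondType>()
        .values();

    // group row indexes by their truncated bucket time
    let mut buckets: HashMap<i64, Vec<usize>> = HashMap::new();
    for (idx, &t) in times.iter().enumerate() {
        buckets.entry(t - (t % bucket_width)).or_default().push(idx);
    }

    // convert the whole batch to row format once, then re-materialise the
    // columns of each bucket from its selected rows
    let fields = batch
        .schema()
        .fields()
        .iter()
        .map(|f| SortField::new(f.data_type().clone()))
        .collect::<Vec<_>>();
    let converter = RowConverter::new(fields).expect("row converter from fields");
    let rows = converter
        .convert_columns(batch.columns())
        .expect("columns convert to rows");

    buckets
        .into_iter()
        .map(|(bucket_time, idxs)| {
            let selected = idxs.iter().map(|i| rows.row(*i));
            let cols = converter.convert_rows(selected).expect("rows convert back");
            let small = RecordBatch::try_new(batch.schema(), cols).expect("child batch");
            (bucket_time, small)
        })
        .collect()
}

fn main() {
    // hypothetical two-column batch whose rows fall into three 10-second windows
    let schema = Arc::new(Schema::new(vec![
        Field::new("val", DataType::Int64, false),
        Field::new(
            "time",
            DataType::Timestamp(TimeUnit::Nanosecond, None),
            false,
        ),
    ]));
    let vals = Int64Array::from(vec![1, 2, 3]);
    let times = TimestampNanosecondArray::from(vec![
        1_000_000_000,  // 0s  bucket
        11_000_000_000, // 10s bucket
        25_000_000_000, // 20s bucket
    ]);
    let batch =
        RecordBatch::try_new(schema, vec![Arc::new(vals), Arc::new(times)]).unwrap();

    for (bucket_time, small) in bucket_batch(&batch) {
        println!("bucket {bucket_time}: {} rows", small.num_rows());
    }
}

The actual patch additionally carries the per-bucket min/max timestamps (via the MinMax helper added below) so that each smaller PersistJob gets an accurate TimestampMinMax instead of the parent chunk's range.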
@@ -435,6 +494,34 @@ impl QueryableBuffer {
     }
 }
 
+struct MinMax {
+    min: i64,
+    max: i64,
+}
+
+impl MinMax {
+    fn new(min: i64, max: i64) -> Self {
+        // this doesn't check if min < max, a lot of the times
+        // it's good to start with i64::MAX for min and i64::MIN
+        // for max in loops so this type unlike TimestampMinMax
+        // doesn't check this pre-condition
+        Self {
+            min,
+            max
+        }
+    }
+
+    fn update(&mut self, other: i64) {
+        self.min = other.min(self.min);
+        self.max = other.max(self.max);
+    }
+
+    fn to_ts_min_max(&self) -> TimestampMinMax {
+        // at this point min < max
+        TimestampMinMax::new(self.min, self.max)
+    }
+}
+
 #[async_trait]
 impl WalFileNotifier for QueryableBuffer {
     async fn notify(&self, write: Arc<WalContents>) {
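
The MinMax helper follows the usual fold-from-extremes pattern: start at (i64::MAX, i64::MIN) so the first update always tightens both bounds, and only convert to the checked TimestampMinMax once at least one timestamp has been folded in. A small illustrative use, assuming it runs in the same module (the values are made up):

let mut mm = MinMax::new(i64::MAX, i64::MIN);
for t in [25_000_000_000_i64, 5_000_000_000, 15_000_000_000] {
    mm.update(t);
}
assert_eq!((mm.min, mm.max), (5_000_000_000, 25_000_000_000));
// after at least one update min <= max, so mm.to_ts_min_max() is safe to call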
@@ -653,6 +740,7 @@ async fn sort_dedupe_persist(
         persist_job.path.to_string()
     );
 
+    // TODO: this is a good place to use multiple batches
     let chunk_stats = create_chunk_statistics(
        Some(row_count),
        &persist_job.schema,
