@@ -6,10 +6,10 @@ use crate::write_buffer::table_buffer::TableBuffer;
 use crate::{ChunkFilter, ParquetFile, ParquetFileId, PersistedSnapshot};
 use anyhow::Context;
 use arrow::{
-    array::AsArray,
+    array::{AsArray, UInt64Array},
+    compute::take,
     datatypes::TimestampNanosecondType,
     record_batch::RecordBatch,
-    row::{RowConverter, SortField},
 };
 use async_trait::async_trait;
 use data_types::{
@@ -203,8 +203,7 @@ impl QueryableBuffer {
                         let table_name =
                             db_schema.table_id_to_name(table_id).expect("table exists");
                         // mapping between time to main record batch array's index
-                        let mut smaller_chunks: HashMap<i64, (MinMax, Vec<usize>)> =
-                            HashMap::new();
+                        let mut smaller_chunks: HashMap<i64, (MinMax, Vec<u64>)> = HashMap::new();
                         let smaller_duration = Duration::from_secs(10).as_nanos() as i64;
                         let all_times = chunk
                             .record_batch
@@ -214,59 +213,62 @@ impl QueryableBuffer {
                             .values();
                         for (idx, time) in all_times.iter().enumerate() {
                             let smaller_chunk_time = time - (time % smaller_duration);
-                            let (min_max, vec_indices) =
-                                smaller_chunks.entry(smaller_chunk_time).or_insert_with(|| {
-                                    (MinMax::new(i64::MAX, i64::MIN), Vec::new())
-                                });
+                            let (min_max, vec_indices) = smaller_chunks
+                                .entry(smaller_chunk_time)
+                                .or_insert_with(|| (MinMax::new(i64::MAX, i64::MIN), Vec::new()));
 
                             min_max.update(*time);
-                            vec_indices.push(idx);
+                            vec_indices.push(idx as u64);
+                        }
+
+                        let total_row_count = chunk.record_batch.column(0).len();
+
+                        for (smaller_chunk_time, (min_max, all_indexes)) in smaller_chunks.iter() {
+                            debug!(
+                                ?smaller_chunk_time,
+                                ?min_max,
+                                num_indexes = ?all_indexes.len(),
+                                ?total_row_count,
+                                ">>> number of small chunks");
                         }
 
                         // at this point we have a bucket for each 10 sec block, we can create
                         // smaller record batches here but maybe wasteful if we ever needed one
                         // batch (let's see how this works first and then decide what can happen)
                         let batch_schema = chunk.record_batch.schema();
+                        debug!(schema = ?chunk.schema, ">>> influx schema");
+                        debug!(arrow_schema = ?batch_schema, ">>> batch schema");
                         let parent_cols = chunk.record_batch.columns();
-                        let fields = batch_schema
-                            .fields()
-                            .iter()
-                            .map(|field| SortField::new(field.data_type().clone()))
-                            .collect();
-                        debug!(?fields, ">>> schema fields");
-
-                        let converter =
-                            RowConverter::new(fields).expect("row converter created from fields");
-                        let rows = converter
-                            .convert_columns(parent_cols)
-                            .expect("convert cols to rows to succeed");
-
-                        for (smaller_chunk_time, (min_max, all_indexes)) in smaller_chunks.iter() {
-                            // create a record batch using just all_indexes from parent recordbatch
-                            let all_rows = all_indexes
-                                .iter()
-                                .map(|idx| rows.row(*idx))
-                                .collect::<Vec<_>>();
-
-                            let child_cols = converter
-                                .convert_rows(all_rows)
-                                .expect("should convert rows back to cols");
 
+                        for (smaller_chunk_time, (min_max, all_indexes)) in
+                            smaller_chunks.into_iter()
+                        {
+                            let mut smaller_chunk_cols = vec![];
+                            let indices = UInt64Array::from_iter(all_indexes);
+                            for arr in parent_cols {
+                                let filtered =
+                                    take(&arr, &indices, None)
+                                        .expect("index should be accessible in parent cols");
+
+                                debug!(smaller_chunk_len = ?filtered.len(), ">>> filtered size");
+                                smaller_chunk_cols.push(filtered);
+                            }
+                            debug!(smaller_chunk_len = ?smaller_chunk_cols.len(), ">>> smaller chunks size");
                             let smaller_rec_batch =
-                                RecordBatch::try_new(Arc::clone(&batch_schema), child_cols)
+                                RecordBatch::try_new(Arc::clone(&batch_schema), smaller_chunk_cols)
                                     .expect("create smaller record batch");
                             let persist_job = PersistJob {
                                 database_id: *database_id,
                                 table_id: *table_id,
                                 table_name: Arc::clone(&table_name),
-                                chunk_time: *smaller_chunk_time,
+                                chunk_time: smaller_chunk_time,
                                 path: ParquetFilePath::new(
                                     self.persister.node_identifier_prefix(),
                                     db_schema.name.as_ref(),
                                     database_id.as_u32(),
                                     table_name.as_ref(),
                                     table_id.as_u32(),
-                                    *smaller_chunk_time,
+                                    smaller_chunk_time,
                                     snapshot_details.last_wal_sequence_number,
                                 ),
                                 batch: smaller_rec_batch,
@@ -277,6 +279,63 @@ impl QueryableBuffer {
                             };
                             persisting_chunks.push(persist_job);
                         }
+                        // let fields = batch_schema
+                        //     .fields()
+                        //     .iter()
+                        //     .map(|field| SortField::new(field.data_type().clone()))
+                        //     .collect();
+                        // debug!(?fields, ">>> schema fields");
+                        //
+                        // let converter =
+                        //     RowConverter::new(fields).expect("row converter created from fields");
+                        // debug!(?converter, ">>> converter");
+                        //
+                        // let rows = converter
+                        //     .convert_columns(parent_cols)
+                        //     .expect("convert cols to rows to succeed");
+                        // debug!(?rows, ">>> all rows");
+                        //
+                        // for (smaller_chunk_time, (min_max, all_indexes)) in smaller_chunks.iter() {
+                        //
+                        //     // create a record batch using just all_indexes from parent recordbatch
+                        //     let all_rows = all_indexes
+                        //         .iter()
+                        //         .map(|idx| rows.row(*idx))
+                        //         .collect::<Vec<_>>();
+                        //     debug!(?rows, ">>> all filtered child rows");
+                        //
+                        //     // hmmm this conversion turns Dictionary types to StringArray, not sure
+                        //     // why
+                        //     let child_cols = converter
+                        //         .convert_rows(all_rows)
+                        //         .expect("should convert rows back to cols");
+                        //     debug!(?child_cols, ">>> all child cols");
+                        //
+                        //     let smaller_rec_batch =
+                        //         RecordBatch::try_new(Arc::clone(&batch_schema), child_cols)
+                        //             .expect("create smaller record batch");
+                        //     let persist_job = PersistJob {
+                        //         database_id: *database_id,
+                        //         table_id: *table_id,
+                        //         table_name: Arc::clone(&table_name),
+                        //         chunk_time: *smaller_chunk_time,
+                        //         path: ParquetFilePath::new(
+                        //             self.persister.node_identifier_prefix(),
+                        //             db_schema.name.as_ref(),
+                        //             database_id.as_u32(),
+                        //             table_name.as_ref(),
+                        //             table_id.as_u32(),
+                        //             *smaller_chunk_time,
+                        //             snapshot_details.last_wal_sequence_number,
+                        //         ),
+                        //         batch: smaller_rec_batch,
+                        //         // this schema.clone() can be avoided?
+                        //         schema: chunk.schema.clone(),
+                        //         timestamp_min_max: min_max.to_ts_min_max(),
+                        //         sort_key: table_buffer.sort_key.clone(),
+                        //     };
+                        //     persisting_chunks.push(persist_job);
+                        // }
                     }
                 }
             }
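
Note on the approach above: the new code buckets row indices into 10 second windows and then materializes each bucket with arrow's `take` kernel, instead of round-tripping every row through `RowConverter` (which, per the comment kept in the disabled block, turned dictionary columns into plain `StringArray`s). Below is a minimal, self-contained sketch of that bucketing-plus-`take` pattern; the function name `split_into_ten_second_chunks`, the `time_col_idx` parameter, and the panic messages are illustrative and not part of this crate.

```rust
use std::collections::HashMap;
use std::time::Duration;

use arrow::array::{AsArray, UInt64Array};
use arrow::compute::take;
use arrow::datatypes::TimestampNanosecondType;
use arrow::record_batch::RecordBatch;

/// Split `batch` into one smaller batch per 10 second window of its
/// timestamp column (`time_col_idx` is assumed to point at a
/// nanosecond-precision timestamp column).
fn split_into_ten_second_chunks(
    batch: &RecordBatch,
    time_col_idx: usize,
) -> Vec<(i64, RecordBatch)> {
    let smaller_duration = Duration::from_secs(10).as_nanos() as i64;
    let times = batch
        .column(time_col_idx)
        .as_primitive::<TimestampNanosecondType>()
        .values();

    // chunk start time -> indices of the rows that fall into that window
    let mut buckets: HashMap<i64, Vec<u64>> = HashMap::new();
    for (idx, time) in times.iter().enumerate() {
        let chunk_time = time - (time % smaller_duration);
        buckets.entry(chunk_time).or_default().push(idx as u64);
    }

    buckets
        .into_iter()
        .map(|(chunk_time, indexes)| {
            let indices = UInt64Array::from(indexes);
            // `take` copies only the selected rows out of every parent column,
            // keeping the original data types (dictionaries stay dictionaries).
            let cols = batch
                .columns()
                .iter()
                .map(|col| take(col, &indices, None).expect("indices are in bounds"))
                .collect::<Vec<_>>();
            let smaller = RecordBatch::try_new(batch.schema(), cols)
                .expect("same schema and column count as the parent batch");
            (chunk_time, smaller)
        })
        .collect()
}
```

Because `take` selects rows column by column, the smaller batches keep the parent schema untouched, which avoids the row/column conversion entirely.
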
@@ -494,6 +553,7 @@ impl QueryableBuffer {
     }
 }
 
+#[derive(Debug)]
 struct MinMax {
     min: i64,
     max: i64,
@@ -505,10 +565,7 @@ impl MinMax {
        // it's good to start with i64::MAX for min and i64::MIN
        // for max in loops so this type unlike TimestampMinMax
        // doesn't check this pre-condition
-        Self {
-            min,
-            max
-        }
+        Self { min, max }
     }
 
     fn update(&mut self, other: i64) {
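
For context, `MinMax` is just a running accumulator seeded with `i64::MAX`/`i64::MIN`. Here is a sketch of how the full type plausibly fits together; the `update` body, the `to_ts_min_max` helper (referenced by the commented-out code above), and the `TimestampMinMax::new` constructor that enforces `min <= max` are assumptions, not code shown in this diff.

```rust
use data_types::TimestampMinMax;

#[derive(Debug)]
struct MinMax {
    min: i64,
    max: i64,
}

impl MinMax {
    fn new(min: i64, max: i64) -> Self {
        // callers seed this with (i64::MAX, i64::MIN) so the first update wins;
        // unlike TimestampMinMax, no min <= max pre-condition is checked here
        Self { min, max }
    }

    fn update(&mut self, other: i64) {
        // keep the smallest/largest timestamp seen so far
        self.min = self.min.min(other);
        self.max = self.max.max(other);
    }

    fn to_ts_min_max(&self) -> TimestampMinMax {
        // assumed constructor on data_types::TimestampMinMax (checks min <= max)
        TimestampMinMax::new(self.min, self.max)
    }
}
```
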