@@ -3,7 +3,7 @@ use crate::persister::Persister;
 use crate::write_buffer::persisted_files::PersistedFiles;
 use crate::write_buffer::table_buffer::TableBuffer;
 use crate::{ChunkFilter, ParquetFile, ParquetFileId, PersistedSnapshot};
-use crate::{chunk::BufferChunk, write_buffer::table_buffer::SnaphotChunkIter};
+use crate::{chunk::BufferChunk, write_buffer::table_buffer::SnapshotChunkIter};
 use anyhow::Context;
 use arrow::record_batch::RecordBatch;
 use arrow_util::util::ensure_schema;
@@ -215,7 +215,7 @@ impl QueryableBuffer {
 
             let chunk_time_to_chunk = &mut table_buffer.chunk_time_to_chunks;
             let snapshot_chunks = &mut table_buffer.snapshotting_chunks;
-            let snapshot_chunks_iter = SnaphotChunkIter {
+            let snapshot_chunks_iter = SnapshotChunkIter {
                 keys_to_remove: all_keys_to_remove.iter(),
                 map: chunk_time_to_chunk,
                 table_def,
@@ -453,7 +453,6 @@ impl QueryableBuffer {
     }
 }
 
-#[allow(clippy::too_many_arguments)]
 async fn sort_dedupe_parallel<I: Iterator<Item = PersistJob>>(
     iterator: I,
     persister: &Arc<Persister>,
@@ -505,7 +504,6 @@ async fn sort_dedupe_parallel<I: Iterator<Item = PersistJob>>(
     }
 }
 
-#[allow(clippy::too_many_arguments)]
 async fn sort_dedupe_serial<I: Iterator<Item = PersistJob>>(
     iterator: I,
     persister: &Arc<Persister>,
@@ -839,6 +837,19 @@ impl<'a> PersistJobGroupedIterator<'a> {
             ),
         }
     }
+
+    fn free_mem_hint(&mut self) -> (u64, u64) {
+        self.system.refresh_memory();
+        let system_mem_bytes = self.system.free_memory() - 100_000_000;
+        let cgroup_free_mem_bytes = self
+            .system
+            .cgroup_limits()
+            .map(|limit| limit.free_memory)
+            .unwrap_or(u64::MAX);
+        let system_mem_bytes = system_mem_bytes.min(cgroup_free_mem_bytes);
+        let max_size_bytes = self.max_size_bytes.min(system_mem_bytes);
+        (system_mem_bytes, max_size_bytes)
+    }
 }
 
 impl Iterator for PersistJobGroupedIterator<'_> {
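A note on the new `free_mem_hint` helper: it clamps the configured per-group byte budget (`max_size_bytes`) to what is actually free, and the `cgroup_limits()` lookup matters when the process runs in a container, where the cgroup's free memory can be far smaller than the host's. A minimal standalone sketch of the same clamping, assuming `self.system` is a `sysinfo::System` (its `refresh_memory`/`free_memory`/`cgroup_limits` API matches the calls in the diff):

```rust
use sysinfo::System;

/// Clamp a configured byte budget to the memory that is actually available.
fn clamped_budget(system: &mut System, configured_max: u64) -> u64 {
    system.refresh_memory();
    // Keep ~100 MB of headroom. saturating_sub avoids underflow when free
    // memory drops below the headroom; the diff subtracts directly, which
    // would panic in debug builds in that situation.
    let free = system.free_memory().saturating_sub(100_000_000);
    // Inside a container the cgroup's free memory is the real ceiling.
    let cgroup_free = system
        .cgroup_limits()
        .map(|limits| limits.free_memory)
        .unwrap_or(u64::MAX);
    configured_max.min(free.min(cgroup_free))
}
```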
@@ -856,18 +867,18 @@ impl Iterator for PersistJobGroupedIterator<'_> {
         let mut min_chunk_time = current_data.chunk_time;
         let mut current_size_bytes = current_data.total_batch_size();
         debug!(?current_size_bytes, table_name = ?current_data.table_name, ">>> current_size_bytes for table");
-        self.system.refresh_memory();
-        let system_mem_bytes = self.system.free_memory() - 100_000_000;
-        let max_size_bytes = self.max_size_bytes.min(system_mem_bytes);
+        let (system_mem_bytes, max_size_bytes) = self.free_mem_hint();
         debug!(
             max_size_bytes,
             system_mem_bytes, ">>> max size bytes/system mem bytes"
         );
 
         while all_batches.len() < self.chunk_size && current_size_bytes < max_size_bytes {
-            debug!(?current_size_bytes, ">>> current_size_bytes");
+            trace!(?current_size_bytes, ">>> current_size_bytes");
             if let Some(next_data) = self.iter.peek() {
-                if next_data.table_id == *current_table_id {
+                if next_data.table_id == *current_table_id
+                    && (current_size_bytes + next_data.total_batch_size()) < max_size_bytes
+                {
                     let next = self.iter.next().unwrap();
                     ts_min_max = ts_min_max.union(&next.timestamp_min_max);
                     min_chunk_time = min_chunk_time.min(next.chunk_time);
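The loop above implements peek-and-accumulate grouping: jobs are pulled off a `Peekable` iterator while they belong to the same table, and the new `&& (current_size_bytes + next_data.total_batch_size()) < max_size_bytes` guard checks the budget *before* consuming the next job, so a group can no longer overshoot the cap by one chunk. A self-contained sketch of the pattern, with illustrative `(key, size)` items standing in for `PersistJob`s:

```rust
use std::iter::Peekable;

/// Drain items that share the first item's key while the running size total
/// stays under `max_bytes` (taking at most `max_items`).
fn take_group<I>(iter: &mut Peekable<I>, max_items: usize, max_bytes: u64) -> Vec<(u32, u64)>
where
    I: Iterator<Item = (u32, u64)>,
{
    let mut group = Vec::new();
    let Some((key, size)) = iter.next() else {
        return group;
    };
    let mut total = size;
    group.push((key, size));
    while group.len() < max_items {
        match iter.peek() {
            // Only take the next item if it stays in budget *before* consuming it.
            Some(&(k, s)) if k == key && total + s < max_bytes => {
                total += s;
                group.push(iter.next().unwrap());
            }
            _ => break,
        }
    }
    group
}

// e.g. 40-byte items under a 100-byte budget split into groups of two:
// take_group(&mut [(1, 40), (1, 40), (1, 40)].into_iter().peekable(), 10, 100)
// returns [(1, 40), (1, 40)] and leaves the third item for the next group.
```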
@@ -880,18 +891,17 @@ impl Iterator for PersistJobGroupedIterator<'_> {
                 break;
             }
         }
+        debug!(?current_size_bytes, ">>> final batch size in bytes");
 
         let table_defn = self
            .catalog
            .db_schema_by_id(&current_data.database_id)?
            .table_definition_by_id(&current_data.table_id)?;
 
+        let arrow = table_defn.schema.as_arrow();
         let all_schema_aligned_batches: Vec<RecordBatch> = all_batches
             .iter()
-            .map(|batch| {
-                ensure_schema(&table_defn.schema.as_arrow(), batch)
-                    .expect("batches should have same schema")
-            })
+            .map(|batch| ensure_schema(&arrow, batch).expect("batches should have same schema"))
             .collect();
 
         Some(PersistJob {
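Two changes in this hunk: a `debug!` now records the final grouped size, and `table_defn.schema.as_arrow()` is hoisted out of the `.map()` closure, so the Arrow schema is built once instead of once per batch. For readers unfamiliar with `ensure_schema` (from `arrow_util`), alignment conceptually means projecting each batch onto the target schema. A hedged sketch in plain arrow-rs of what such a projection can look like (illustrative only, not `arrow_util`'s actual implementation):

```rust
use std::sync::Arc;
use arrow::array::new_null_array;
use arrow::datatypes::Schema;
use arrow::record_batch::RecordBatch;

// Project a batch onto `target`, filling columns the batch lacks with nulls.
// Real code would require those fields to be nullable and would propagate
// errors instead of expecting.
fn align_to_schema(target: &Arc<Schema>, batch: &RecordBatch) -> RecordBatch {
    let columns = target
        .fields()
        .iter()
        .map(|field| match batch.column_by_name(field.name()) {
            Some(col) => Arc::clone(col),
            None => new_null_array(field.data_type(), batch.num_rows()),
        })
        .collect();
    RecordBatch::try_new(Arc::clone(target), columns).expect("aligned batch")
}
```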
@@ -918,8 +928,8 @@ impl Iterator for PersistJobGroupedIterator<'_> {
 }
 
 pub(crate) struct SortDedupePersistSummary {
-    pub file_size_bytes: u64,
-    pub file_meta_data: FileMetaData,
+    pub(crate) file_size_bytes: u64,
+    pub(crate) file_meta_data: FileMetaData,
 }
 
 impl SortDedupePersistSummary {
@@ -1528,7 +1538,7 @@ mod tests {
             persisted_files: Arc::new(PersistedFiles::new()),
             parquet_cache: None,
             gen1_duration: Gen1Duration::new_1m(),
-            max_size_per_parquet_file_bytes: 100_000,
+            max_size_per_parquet_file_bytes: 150_000,
         };
         let queryable_buffer = QueryableBuffer::new(queryable_buffer_args);
 
@@ -1537,7 +1547,9 @@ mod tests {
         for i in 0..2 {
             // create another write, this time with two tags, in a different gen1 block
             let ts = Gen1Duration::new_1m().as_duration().as_nanos() as i64 + (i * 240_000_000_000);
-            let lp = format!("foo,t1=a,t2=b f1=3i,f2=3 {}", ts);
+            // keep these tags different from bar's so that it's easier to spot the byte
+            // differences in the logs; otherwise foo and bar report exactly the same usage in bytes
+            let lp = format!("foo,t1=foo_a f1={}i,f2={} {}", i, i, ts);
             debug!(?lp, ">>> writing line");
             let val = WriteValidator::initialize(db.clone(), Arc::clone(&catalog), 0).unwrap();
 
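For reference when reading the byte counts later in the test: each generated string is InfluxDB line protocol, `measurement,tag=value field=value <timestamp>`, so `foo,t1=foo_a f1=0i,f2=0 <ts>` is one row with a single tag and two fields, while each bar write below produces three rows with two tags each.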
@@ -1566,9 +1578,8 @@ mod tests {
         for i in 0..10 {
             // create another write, this time with two tags, in a different gen1 block
             let ts = Gen1Duration::new_1m().as_duration().as_nanos() as i64 + (i * 240_000_000_000);
-            // let line = format!("bar,t1=a,t2=b f1=3i,f2=3 {}", ts);
             let lp = format!(
-                "bar,t1=a,t2=b f1=3i,f2=3 {}\nbar,t1=a,t2=c f1=4i,f2=3 {}\nbar,t1=ab,t2=b f1=5i,f2=3 {}",
+                "bar,t1=br_a,t2=br_b f1=3i,f2=3 {}\nbar,t1=br_a,t2=br_c f1=4i,f2=3 {}\nbar,t1=ab,t2=bb f1=5i,f2=3 {}",
                 ts, ts, ts
             );
             debug!(?lp, ">>> writing line");
@@ -1637,12 +1648,27 @@ mod tests {
             assert_eq!(2, foo_file.row_count);
         }
 
-        // bar had 10 writes with 3 lines, should write 4 files each with 9 rows or 3 row in them
+        // bar had 10 writes (each in a separate chunk) with 3 lines per write,
+        // so these are grouped; but because the writes are larger and max memory
+        // is set to 150_000 bytes at the top, we end up with 4 files.
         let table = db.table_definition("bar").unwrap();
         let files = queryable_buffer
             .persisted_files
             .get_files(db.id, table.table_id);
         debug!(?files, ">>> test: queryable buffer persisted files");
+
+        // Below is the growth in memory (bytes) as reported by arrow record batches:
+        //
+        // >>> current_size_bytes for table current_size_bytes=43952 table_name="bar"
+        // >>> final batch size in bytes current_size_bytes=131856
+        // >>> current_size_bytes for table current_size_bytes=43952 table_name="bar"
+        // >>> final batch size in bytes current_size_bytes=131856
+        // >>> current_size_bytes for table current_size_bytes=43952 table_name="bar"
+        // >>> final batch size in bytes current_size_bytes=131856
+        // >>> current_size_bytes for table current_size_bytes=43952 table_name="bar"
+        // >>> final batch size in bytes current_size_bytes=43952
+        // >>> current_size_bytes for table current_size_bytes=34408 table_name="foo"
+        // >>> final batch size in bytes current_size_bytes=68816
         assert_eq!(4, files.len());
         for bar_file in files {
             debug!(?bar_file, ">>> test: bar_file");
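Cross-checking the log excerpt in that comment against the assertions: each bar chunk reports 43,952 bytes, three chunks total 3 × 43,952 = 131,856, and a fourth would reach 175,808, past the 150,000-byte `max_size_per_parquet_file_bytes` configured at the top of the test. The 10 bar chunks therefore group as 3 + 3 + 3 + 1, matching the asserted 4 files, while foo's two 34,408-byte chunks fit in a single 68,816-byte group.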