Skip to content

Commit 90a4eef

Browse files
committed
optimization attempt
1 parent 057b262 commit 90a4eef

1 file changed

Lines changed: 25 additions & 19 deletions

File tree

src/database.rs

Lines changed: 25 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -482,36 +482,42 @@ impl Database {
482482
}
483483

484484
/// Creates standard writer properties used across different operations
485-
fn create_writer_properties(&self, sorting_columns: Vec<SortingColumn>) -> WriterProperties {
486-
use deltalake::datafusion::parquet::basic::{Compression, ZstdLevel};
485+
fn create_writer_properties(&self, sorting_columns: Vec<SortingColumn>, fields: &[crate::schema_loader::FieldDef]) -> WriterProperties {
486+
use deltalake::datafusion::parquet::basic::{Compression, Encoding, ZstdLevel};
487487
use deltalake::datafusion::parquet::file::properties::EnabledStatistics;
488+
use deltalake::datafusion::parquet::schema::types::ColumnPath;
488489

489490
let page_row_count_limit = self.config.parquet.timefusion_page_row_count_limit;
490491
let compression_level = self.config.parquet.timefusion_zstd_compression_level;
491492
let max_row_group_size = self.config.parquet.timefusion_max_row_group_size;
492493

493-
WriterProperties::builder()
494-
// Use ZSTD compression with high level for maximum compression ratio
494+
let mut builder = WriterProperties::builder()
495495
.set_compression(Compression::ZSTD(
496496
ZstdLevel::try_new(compression_level).unwrap_or_else(|_| ZstdLevel::try_new(ZSTD_COMPRESSION_LEVEL).unwrap()),
497497
))
498-
// Set max row group size for better compression and query performance
499498
.set_max_row_group_size(max_row_group_size)
500-
// Enable dictionary encoding for better compression of repetitive values
501499
.set_dictionary_enabled(true)
502-
// Dictionary page size - 8MB allows larger dictionaries for better compression
503-
.set_dictionary_page_size_limit(8388608) // 8MB
504-
// Enable statistics for better query optimization
500+
.set_dictionary_page_size_limit(8388608)
505501
.set_statistics_enabled(EnabledStatistics::Page)
506-
// Enable bloom filters for predicate pushdown (read-side already enabled)
507502
.set_bloom_filter_enabled(!self.config.parquet.timefusion_bloom_filter_disabled)
508503
.set_bloom_filter_fpp(0.01)
509504
.set_bloom_filter_ndv(100_000)
510-
// Set page row count limit for better compression
511505
.set_data_page_row_count_limit(page_row_count_limit)
512-
// Set sorting columns for better query performance on sorted data
513-
.set_sorting_columns(if sorting_columns.is_empty() { None } else { Some(sorting_columns) })
514-
.build()
506+
.set_sorting_columns(if sorting_columns.is_empty() { None } else { Some(sorting_columns) });
507+
508+
for field in fields {
509+
let dt = field.data_type.as_str();
510+
let col = ColumnPath::from(field.name.as_str());
511+
if dt.starts_with("Timestamp") || dt == "Date32" {
512+
builder = builder
513+
.set_column_encoding(col.clone(), Encoding::DELTA_BINARY_PACKED)
514+
.set_column_dictionary_enabled(col, false);
515+
} else if matches!(dt, "Int32" | "Int64" | "UInt32" | "UInt64") {
516+
builder = builder.set_column_encoding(col, Encoding::DELTA_BINARY_PACKED);
517+
}
518+
}
519+
520+
builder.build()
515521
}
516522

517523
/// Updates a DeltaTable and handles errors consistently
@@ -977,7 +983,7 @@ impl Database {
977983

978984
// Time-series optimized settings
979985
// Larger batch size for better throughput with time-series data
980-
let _ = options.set("datafusion.execution.batch_size", "8192");
986+
let _ = options.set("datafusion.execution.batch_size", "65536");
981987

982988
// Optimize for sorted data (timestamps are typically sorted)
983989
let _ = options.set("datafusion.optimizer.prefer_existing_sort", "true");
@@ -998,7 +1004,7 @@ impl Database {
9981004

9991005
// Memory management for large time-series queries
10001006
let _ = options.set("datafusion.execution.coalesce_batches", "true");
1001-
let _ = options.set("datafusion.execution.coalesce_target_batch_size", "8192");
1007+
let _ = options.set("datafusion.execution.coalesce_target_batch_size", "65536");
10021008

10031009
// Enable all optimizer rules for maximum optimization
10041010
let _ = options.set("datafusion.optimizer.max_passes", "5");
@@ -1638,7 +1644,7 @@ impl Database {
16381644
// Get the appropriate schema for this table
16391645
let schema = get_schema(&table_name).unwrap_or_else(get_default_schema);
16401646

1641-
let writer_properties = self.create_writer_properties(schema.sorting_columns());
1647+
let writer_properties = self.create_writer_properties(schema.sorting_columns(), &schema.fields);
16421648

16431649
// Retry logic for concurrent writes
16441650
let max_retries = 5;
@@ -1758,7 +1764,7 @@ impl Database {
17581764
info!("Optimizing files from {} date partitions", partition_filters.len());
17591765

17601766
let schema = get_schema(table_name).unwrap_or_else(get_default_schema);
1761-
let writer_properties = self.create_writer_properties(schema.sorting_columns());
1767+
let writer_properties = self.create_writer_properties(schema.sorting_columns(), &schema.fields);
17621768

17631769
let optimize_result = table_clone
17641770
.optimize()
@@ -1819,7 +1825,7 @@ impl Database {
18191825
.with_filters(&partition_filters)
18201826
.with_type(deltalake::operations::optimize::OptimizeType::Compact)
18211827
.with_target_size(target_size as u64)
1822-
.with_writer_properties(self.create_writer_properties(schema.sorting_columns()))
1828+
.with_writer_properties(self.create_writer_properties(schema.sorting_columns(), &schema.fields))
18231829
.with_min_commit_interval(tokio::time::Duration::from_secs(30))
18241830
.await;
18251831

0 commit comments

Comments
 (0)