Commit d5d5067

dmunchion-elgreco authored and committed
feat: configurable column encoding for parquet checkpoint files
Signed-off-by: Daniel Münch <[email protected]>
1 parent 1befab9

File tree: 3 files changed, +125 -18 lines changed

crates/core/src/protocol/checkpoints.rs (16 additions, 9 deletions)

@@ -13,6 +13,7 @@ use itertools::Itertools;
 use object_store::{Error, ObjectStore};
 use parquet::arrow::ArrowWriter;
 use parquet::basic::Compression;
+use parquet::basic::Encoding;
 use parquet::errors::ParquetError;
 use parquet::file::properties::WriterProperties;
 use regex::Regex;
@@ -354,17 +355,23 @@ fn parquet_bytes_from_state(
     );
 
     debug!("Writing to checkpoint parquet buffer...");
+
+    let writer_properties = if state.table_config().use_checkpoint_rle() {
+        WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .build()
+    } else {
+        WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .set_dictionary_enabled(false)
+            .set_encoding(Encoding::PLAIN)
+            .build()
+    };
+
     // Write the Checkpoint parquet file.
     let mut bytes = vec![];
-    let mut writer = ArrowWriter::try_new(
-        &mut bytes,
-        arrow_schema.clone(),
-        Some(
-            WriterProperties::builder()
-                .set_compression(Compression::SNAPPY)
-                .build(),
-        ),
-    )?;
+    let mut writer =
+        ArrowWriter::try_new(&mut bytes, arrow_schema.clone(), Some(writer_properties))?;
     let mut decoder = ReaderBuilder::new(arrow_schema)
         .with_batch_size(CHECKPOINT_RECORD_BATCH_SIZE)
         .build_decoder()?;
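The branch above is the heart of the change: when delta-rs.checkpoint.useRunLengthEncoding is true (the default), the writer keeps parquet's standard dictionary encoding, which the new test below observes as RLE_DICTIONARY in the column-chunk metadata; when false, the dictionary is disabled and PLAIN is pinned for every column. A minimal standalone sketch of that difference, not part of this commit and assuming arrow_array, parquet, and bytes as direct dependencies:

use std::sync::Arc;

use arrow_array::{ArrayRef, RecordBatch, StringArray};
use bytes::Bytes;
use parquet::arrow::ArrowWriter;
use parquet::basic::{Compression, Encoding};
use parquet::file::properties::WriterProperties;
use parquet::file::reader::{FileReader, SerializedFileReader};

// Standalone sketch: write one small batch under each of the two property
// sets the checkpoint writer chooses between, then print the encodings the
// parquet writer actually recorded in the column-chunk metadata.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    // A low-cardinality string column, the kind that benefits from dictionaries.
    let col: ArrayRef = Arc::new(StringArray::from(vec!["add"; 64]));
    let batch = RecordBatch::try_from_iter(vec![("action", col)])?;

    let dict_props = WriterProperties::builder()
        .set_compression(Compression::SNAPPY)
        .build();
    let plain_props = WriterProperties::builder()
        .set_compression(Compression::SNAPPY)
        .set_dictionary_enabled(false)
        .set_encoding(Encoding::PLAIN)
        .build();

    for (label, props) in [("default (dictionary)", dict_props), ("plain", plain_props)] {
        let mut buf = Vec::new();
        let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), Some(props))?;
        writer.write(&batch)?;
        writer.close()?;

        let reader = SerializedFileReader::new(Bytes::from(buf))?;
        let encodings = reader.metadata().row_group(0).column(0).encodings();
        println!("{label}: {encodings:?}");
    }
    Ok(())
}

On recent parquet-rs versions the first line should include RLE_DICTIONARY and the second should not, which is exactly the property the integration test below asserts.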

crates/core/src/table/config.rs (13 additions, 0 deletions)

@@ -36,6 +36,10 @@ pub enum TableProperty {
     /// stats_parsed column and to write partition values as a struct for partitionValues_parsed.
     CheckpointWriteStatsAsStruct,
 
+    /// true for Delta Lake to write checkpoint files using run length encoding (RLE).
+    /// Some readers don't support run length encoding (e.g. Fabric), so this can be disabled.
+    CheckpointUseRunLengthEncoding,
+
     /// Whether column mapping is enabled for Delta table columns and the corresponding
     /// Parquet columns that use different names.
     ColumnMappingMode,
@@ -126,6 +130,7 @@ impl AsRef<str> for TableProperty {
             Self::AutoOptimizeOptimizeWrite => "delta.autoOptimize.optimizeWrite",
             Self::CheckpointWriteStatsAsJson => "delta.checkpoint.writeStatsAsJson",
             Self::CheckpointWriteStatsAsStruct => "delta.checkpoint.writeStatsAsStruct",
+            Self::CheckpointUseRunLengthEncoding => "delta-rs.checkpoint.useRunLengthEncoding",
             Self::CheckpointPolicy => "delta.checkpointPolicy",
             Self::ColumnMappingMode => "delta.columnMapping.mode",
             Self::DataSkippingNumIndexedCols => "delta.dataSkippingNumIndexedCols",
@@ -158,6 +163,7 @@ impl FromStr for TableProperty {
             "delta.autoOptimize.optimizeWrite" => Ok(Self::AutoOptimizeOptimizeWrite),
             "delta.checkpoint.writeStatsAsJson" => Ok(Self::CheckpointWriteStatsAsJson),
             "delta.checkpoint.writeStatsAsStruct" => Ok(Self::CheckpointWriteStatsAsStruct),
+            "delta-rs.checkpoint.useRunLengthEncoding" => Ok(Self::CheckpointUseRunLengthEncoding),
             "delta.checkpointPolicy" => Ok(Self::CheckpointPolicy),
             "delta.columnMapping.mode" => Ok(Self::ColumnMappingMode),
             "delta.dataSkippingNumIndexedCols" => Ok(Self::DataSkippingNumIndexedCols),
@@ -238,6 +244,13 @@ impl TableConfig<'_> {
             bool,
             false
         ),
+        (
+            "true for Delta Lake to write checkpoint files using run length encoding (RLE)",
+            TableProperty::CheckpointUseRunLengthEncoding,
+            use_checkpoint_rle,
+            bool,
+            true
+        ),
         (
             "The target file size in bytes or higher units for file tuning",
             TableProperty::TargetFileSize,
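With the property wired into TableProperty and TableConfig, toggling it from user code is a two-step operation: persist the property, then write a checkpoint. A hypothetical end-to-end sketch (checkpoint_without_rle and table_uri are illustrative names, not APIs from this commit), mirroring the new integration test:

use std::collections::HashMap;

use deltalake_core::protocol::checkpoints;
use deltalake_core::{open_table, DeltaOps};

// Hypothetical example: opt a table out of RLE checkpoints, then write one.
// Note the delta-rs.* prefix: this key is delta-rs specific, not part of the
// Delta protocol's delta.* property namespace.
async fn checkpoint_without_rle(table_uri: &str) -> Result<(), Box<dyn std::error::Error>> {
    let table = open_table(table_uri).await?;
    // Persist delta-rs.checkpoint.useRunLengthEncoding=false
    // (this commits a new table version).
    let table = DeltaOps(table)
        .set_tbl_properties()
        .with_properties(HashMap::from([(
            "delta-rs.checkpoint.useRunLengthEncoding".to_string(),
            "false".to_string(),
        )]))
        .await?;
    // The checkpoint writer now picks the PLAIN, no-dictionary WriterProperties.
    checkpoints::create_checkpoint(&table, None).await?;
    Ok(())
}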

crates/core/tests/checkpoint_writer.rs (96 additions, 9 deletions)

@@ -6,8 +6,11 @@ use deltalake_core::protocol::DeltaOperation;
 
 mod simple_checkpoint {
     use deltalake_core::*;
+    use parquet::basic::Encoding;
+    use parquet::file::reader::{FileReader, SerializedFileReader};
     use pretty_assertions::assert_eq;
-    use std::fs;
+    use regex::Regex;
+    use std::fs::{self, File};
     use std::path::{Path, PathBuf};
 
     #[tokio::test]
@@ -31,6 +34,9 @@ mod simple_checkpoint {
         let checkpoint_path = log_path.join("00000000000000000005.checkpoint.parquet");
         assert!(checkpoint_path.as_path().exists());
 
+        // Check that the checkpoint does use run length encoding
+        assert_column_rle_encoding(checkpoint_path, true);
+
         // _last_checkpoint should exist and point to the correct version
         let version = get_last_checkpoint_version(&log_path);
         assert_eq!(5, version);
@@ -42,6 +48,9 @@ mod simple_checkpoint {
         let checkpoint_path = log_path.join("00000000000000000010.checkpoint.parquet");
         assert!(checkpoint_path.as_path().exists());
 
+        // Check that the checkpoint does use run length encoding
+        assert_column_rle_encoding(checkpoint_path, true);
+
         // _last_checkpoint should exist and point to the correct version
         let version = get_last_checkpoint_version(&log_path);
         assert_eq!(10, version);
@@ -53,6 +62,77 @@ mod simple_checkpoint {
         assert_eq!(12, files.count());
     }
 
+    #[tokio::test]
+    async fn checkpoint_run_length_encoding_test() {
+        let table_location = "../test/tests/data/checkpoints";
+        let table_path = PathBuf::from(table_location);
+        let log_path = table_path.join("_delta_log");
+
+        // Delete checkpoint files from previous runs
+        cleanup_checkpoint_files(log_path.as_path());
+
+        // Load the delta table
+        let base_table = deltalake_core::open_table(table_location).await.unwrap();
+
+        // Set the table properties to disable run length encoding;
+        // this alters the table version and should be done in a more principled way
+        let table = DeltaOps(base_table)
+            .set_tbl_properties()
+            .with_properties(std::collections::HashMap::<String, String>::from([(
+                "delta-rs.checkpoint.useRunLengthEncoding".to_string(),
+                "false".to_string(),
+            )]))
+            .await
+            .unwrap();
+
+        // Write a checkpoint
+        checkpoints::create_checkpoint(&table, None).await.unwrap();
+
+        // checkpoint should exist
+        let checkpoint_path = log_path.join("00000000000000000013.checkpoint.parquet");
+        assert!(checkpoint_path.as_path().exists());
+
+        // Check that the checkpoint does not use run length encoding
+        assert_column_rle_encoding(checkpoint_path, false);
+
+        // _last_checkpoint should exist and point to the correct version
+        let version = get_last_checkpoint_version(&log_path);
+        assert_eq!(table.version(), version);
+
+        // delta table should load just fine with the checkpoint in place
+        let table_result = deltalake_core::open_table(table_location).await.unwrap();
+        let table = table_result;
+        let files = table.get_files_iter().unwrap();
+        assert_eq!(12, files.count());
+    }
+
+    fn assert_column_rle_encoding(file_path: PathBuf, should_be_rle: bool) {
+        let file = File::open(&file_path).unwrap();
+        let reader = SerializedFileReader::new(file).unwrap();
+        let meta = reader.metadata();
+        let mut found_rle = false;
+
+        for i in 0..meta.num_row_groups() {
+            let row_group = meta.row_group(i);
+            for j in 0..row_group.num_columns() {
+                let column_chunk: &parquet::file::metadata::ColumnChunkMetaData =
+                    row_group.column(j);
+
+                for encoding in column_chunk.encodings() {
+                    if *encoding == Encoding::RLE_DICTIONARY {
+                        found_rle = true;
+                    }
+                }
+            }
+        }
+
+        if should_be_rle {
+            assert!(found_rle, "Expected RLE_DICTIONARY encoding");
+        } else {
+            assert!(!found_rle, "Expected no RLE_DICTIONARY encoding");
        }
+    }
+
     fn get_last_checkpoint_version(log_path: &Path) -> i64 {
         let last_checkpoint_path = log_path.join("_last_checkpoint");
         assert!(last_checkpoint_path.as_path().exists());
@@ -69,15 +149,22 @@ mod simple_checkpoint {
     }
 
     fn cleanup_checkpoint_files(log_path: &Path) {
-        let paths = fs::read_dir(log_path).unwrap();
-        for d in paths.flatten() {
-            let path = d.path();
-
-            if path.file_name().unwrap() == "_last_checkpoint"
-                || (path.extension().is_some() && path.extension().unwrap() == "parquet")
-            {
-                fs::remove_file(path).unwrap();
+        let re = Regex::new(r"^(\d{20})\.json$").unwrap();
+        for entry in fs::read_dir(log_path).unwrap().flatten() {
+            let path = entry.path();
+            let filename = match path.file_name().and_then(|n| n.to_str()) {
+                Some(name) => name,
+                None => continue,
+            };
+
+            if let Some(caps) = re.captures(filename) {
+                if let Ok(num) = caps[1].parse::<u64>() {
+                    if num <= 12 {
+                        continue;
+                    }
+                }
             }
+            let _ = fs::remove_file(path);
         }
     }
 }

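The assert_column_rle_encoding helper above only answers a yes/no question about RLE_DICTIONARY. When debugging a reader that rejects a checkpoint, it can help to dump all recorded encodings instead; a hypothetical utility (not part of the commit) built on the same parquet reader APIs the test uses:

use std::fs::File;
use std::path::Path;

use parquet::file::reader::{FileReader, SerializedFileReader};

// Hypothetical helper distilled from assert_column_rle_encoding: print every
// encoding recorded for every column chunk of a parquet checkpoint file.
fn print_checkpoint_encodings(path: &Path) {
    let reader = SerializedFileReader::new(File::open(path).unwrap()).unwrap();
    let meta = reader.metadata();
    for rg in 0..meta.num_row_groups() {
        for column in meta.row_group(rg).columns() {
            // A column written with dictionary encoding will list RLE_DICTIONARY here.
            println!(
                "row group {rg}, column {}: {:?}",
                column.column_path(),
                column.encodings()
            );
        }
    }
}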