Skip to content

Commit 80ffee3

Browse files
Shefeek JinnahShefeek Jinnah
authored andcommitted
Adding some optimizations
1 parent 004a781 commit 80ffee3

1 file changed

Lines changed: 58 additions & 36 deletions

File tree

src/table.rs

Lines changed: 58 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use futures::StreamExt;
3333
use object_store::path::Path as ObjectPath;
3434
use parquet::arrow::ParquetRecordBatchStreamBuilder;
3535
use parquet::arrow::async_reader::ParquetObjectReader;
36+
use tokio::sync::OnceCell;
3637

3738
#[cfg(feature = "encryption")]
3839
use datafusion::execution::parquet_encryption::EncryptionFactory;
@@ -53,11 +54,13 @@ pub fn delete_file_schema() -> SchemaRef {
5354
]))
5455
}
5556

57+
/// Cached schema mapping for renamed columns
58+
type SchemaMappingCache = (SchemaRef, HashMap<String, String>);
59+
5660
/// DuckLake table provider
5761
///
5862
/// Represents a table within a DuckLake schema and provides access to data via Parquet files.
5963
/// Caches snapshot_id and uses it to load all metadata atomically.
60-
#[derive(Debug)]
6164
pub struct DuckLakeTable {
6265
#[allow(dead_code)]
6366
table_id: i64,
@@ -75,11 +78,26 @@ pub struct DuckLakeTable {
7578
columns: Vec<DuckLakeTableColumn>,
7679
/// Table files with paths as stored in metadata (resolved on-the-fly when needed)
7780
table_files: Vec<DuckLakeTableFile>,
81+
/// Cached schema mapping (read_schema, name_mapping) - computed once on first scan
82+
schema_mapping_cache: OnceCell<SchemaMappingCache>,
7883
/// Encryption factory for decrypting encrypted Parquet files (when encryption feature is enabled)
7984
#[cfg(feature = "encryption")]
8085
encryption_factory: Option<Arc<dyn EncryptionFactory>>,
8186
}
8287

88+
impl std::fmt::Debug for DuckLakeTable {
89+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
90+
f.debug_struct("DuckLakeTable")
91+
.field("table_id", &self.table_id)
92+
.field("table_name", &self.table_name)
93+
.field("table_path", &self.table_path)
94+
.field("schema", &self.schema)
95+
.field("columns", &self.columns)
96+
.field("table_files", &self.table_files)
97+
.finish_non_exhaustive()
98+
}
99+
}
100+
83101
impl DuckLakeTable {
84102
/// Create a new DuckLake table
85103
pub fn new(
@@ -134,6 +152,7 @@ impl DuckLakeTable {
134152
table_files,
135153
#[cfg(feature = "encryption")]
136154
encryption_factory,
155+
schema_mapping_cache: OnceCell::new(),
137156
})
138157
}
139158

@@ -153,36 +172,44 @@ impl DuckLakeTable {
153172
ParquetSource::default()
154173
}
155174

156-
/// Resolve the schema to use for reading a file, handling column renames.
157-
/// Returns (read_schema, name_mapping) where name_mapping is old->new for renamed columns.
158-
async fn resolve_file_schema(
175+
/// Get the cached schema mapping, computing it once from the first file if needed.
176+
/// All files in a DuckLake table have the same schema structure, so we only need to check one.
177+
async fn get_schema_mapping(
159178
&self,
160179
state: &dyn Session,
161-
file: &DuckLakeFileData,
162-
) -> DataFusionResult<(SchemaRef, HashMap<String, String>)> {
163-
let resolved_path = self.resolve_file_path(file);
164-
let object_store = state
165-
.runtime_env()
166-
.object_store(self.object_store_url.as_ref())?;
167-
let object_path = ObjectPath::from(resolved_path.as_str());
168-
169-
let reader = ParquetObjectReader::new(object_store, object_path);
170-
let builder = ParquetRecordBatchStreamBuilder::new(reader)
171-
.await
172-
.map_err(|e| DataFusionError::External(Box::new(e)))?;
173-
174-
let field_id_map = extract_parquet_field_ids(builder.metadata());
175-
176-
// No field_ids means external file - use current schema directly
177-
if field_id_map.is_empty() {
178-
return Ok((self.schema.clone(), HashMap::new()));
179-
}
180+
) -> DataFusionResult<&SchemaMappingCache> {
181+
self.schema_mapping_cache
182+
.get_or_try_init(|| async {
183+
// If no files, use current schema with no rename mapping
184+
let Some(first_file) = self.table_files.first() else {
185+
return Ok((self.schema.clone(), HashMap::new()));
186+
};
187+
188+
let resolved_path = self.resolve_file_path(&first_file.file);
189+
let object_store = state
190+
.runtime_env()
191+
.object_store(self.object_store_url.as_ref())?;
192+
let object_path = ObjectPath::from(resolved_path.as_str());
193+
194+
let reader = ParquetObjectReader::new(object_store, object_path);
195+
let builder = ParquetRecordBatchStreamBuilder::new(reader)
196+
.await
197+
.map_err(|e| DataFusionError::External(Box::new(e)))?;
198+
199+
let field_id_map = extract_parquet_field_ids(builder.metadata());
200+
201+
// No field_ids means external file - use current schema directly
202+
if field_id_map.is_empty() {
203+
return Ok((self.schema.clone(), HashMap::new()));
204+
}
180205

181-
let (read_schema, name_mapping) =
182-
build_read_schema_with_field_id_mapping(&self.columns, &field_id_map)
183-
.map_err(|e| DataFusionError::External(Box::new(e)))?;
206+
let (read_schema, name_mapping) =
207+
build_read_schema_with_field_id_mapping(&self.columns, &field_id_map)
208+
.map_err(|e| DataFusionError::External(Box::new(e)))?;
184209

185-
Ok((Arc::new(read_schema), name_mapping))
210+
Ok((Arc::new(read_schema), name_mapping))
211+
})
212+
.await
186213
}
187214

188215
/// Read a delete file and extract all deleted row positions
@@ -250,9 +277,7 @@ impl DuckLakeTable {
250277
projection: Option<&Vec<usize>>,
251278
limit: Option<usize>,
252279
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
253-
// Check if schema mapping is needed by reading first file's metadata
254-
// All files in a DuckLake table should have the same schema structure
255-
let (read_schema, name_mapping) = self.resolve_file_schema(state, &files[0].file).await?;
280+
let (read_schema, name_mapping) = self.get_schema_mapping(state).await?;
256281

257282
let partitioned_files: Vec<PartitionedFile> = files
258283
.iter()
@@ -292,15 +317,14 @@ impl DuckLakeTable {
292317

293318
// Wrap with ColumnRenameExec if column names differ
294319
if !name_mapping.is_empty() {
295-
// Build output schema with renamed columns (respecting projection)
296320
let output_schema = match projection {
297321
Some(indices) => Arc::new(self.schema.project(indices)?),
298322
None => self.schema.clone(),
299323
};
300324
Ok(Arc::new(ColumnRenameExec::new(
301325
parquet_exec,
302326
output_schema,
303-
name_mapping,
327+
name_mapping.clone(),
304328
)))
305329
} else {
306330
Ok(parquet_exec)
@@ -317,8 +341,7 @@ impl DuckLakeTable {
317341
projection: Option<&Vec<usize>>,
318342
limit: Option<usize>,
319343
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
320-
// Check if schema mapping is needed by reading file's metadata
321-
let (read_schema, name_mapping) = self.resolve_file_schema(state, &table_file.file).await?;
344+
let (read_schema, name_mapping) = self.get_schema_mapping(state).await?;
322345

323346
// Resolve the data file path for scanning
324347
let resolved_path = self.resolve_file_path(&table_file.file);
@@ -369,15 +392,14 @@ impl DuckLakeTable {
369392

370393
// Wrap with ColumnRenameExec if column names differ
371394
if !name_mapping.is_empty() {
372-
// Build output schema with renamed columns (respecting projection)
373395
let output_schema = match projection {
374396
Some(indices) => Arc::new(self.schema.project(indices)?),
375397
None => self.schema.clone(),
376398
};
377399
Ok(Arc::new(ColumnRenameExec::new(
378400
exec_after_delete,
379401
output_schema,
380-
name_mapping,
402+
name_mapping.clone(),
381403
)))
382404
} else {
383405
Ok(exec_after_delete)

0 commit comments

Comments
 (0)