Skip to content

Commit 595a32c

Browse files
Some more optimizations
1 parent 380dcfa commit 595a32c

4 files changed

Lines changed: 14 additions & 37 deletions

File tree

src/catalog.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@ use crate::metadata_writer::MetadataWriter;
2020
struct WriteConfig {
2121
/// Metadata writer for catalog operations
2222
writer: Arc<dyn MetadataWriter>,
23-
/// Base data path for writing files
24-
data_path: String,
2523
}
2624

2725
/// DuckLake catalog provider
@@ -123,7 +121,6 @@ impl DuckLakeCatalog {
123121
catalog_path,
124122
write_config: Some(WriteConfig {
125123
writer,
126-
data_path: data_path_str,
127124
}),
128125
})
129126
}
@@ -197,7 +194,7 @@ impl CatalogProvider for DuckLakeCatalog {
197194
// Configure writer if this catalog is writable
198195
#[cfg(feature = "write")]
199196
let schema = if let Some(ref config) = self.write_config {
200-
schema.with_writer(Arc::clone(&config.writer), config.data_path.clone())
197+
schema.with_writer(Arc::clone(&config.writer))
201198
} else {
202199
schema
203200
};

src/insert_exec.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ pub struct DuckLakeInsertExec {
3939
arrow_schema: SchemaRef,
4040
write_mode: WriteMode,
4141
object_store_url: Arc<ObjectStoreUrl>,
42-
data_path: String,
4342
cache: PlanProperties,
4443
}
4544

@@ -53,7 +52,6 @@ impl DuckLakeInsertExec {
5352
arrow_schema: SchemaRef,
5453
write_mode: WriteMode,
5554
object_store_url: Arc<ObjectStoreUrl>,
56-
data_path: String,
5755
) -> Self {
5856
let cache = Self::compute_properties();
5957
Self {
@@ -64,7 +62,6 @@ impl DuckLakeInsertExec {
6462
arrow_schema,
6563
write_mode,
6664
object_store_url,
67-
data_path,
6865
cache,
6966
}
7067
}
@@ -85,7 +82,6 @@ impl Debug for DuckLakeInsertExec {
8582
.field("schema_name", &self.schema_name)
8683
.field("table_name", &self.table_name)
8784
.field("write_mode", &self.write_mode)
88-
.field("data_path", &self.data_path)
8985
.finish_non_exhaustive()
9086
}
9187
}
@@ -140,7 +136,6 @@ impl ExecutionPlan for DuckLakeInsertExec {
140136
Arc::clone(&self.arrow_schema),
141137
self.write_mode,
142138
self.object_store_url.clone(),
143-
self.data_path.clone(),
144139
)))
145140
}
146141

src/table.rs

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -95,9 +95,6 @@ pub struct DuckLakeTable {
9595
/// Metadata writer for write operations (when write feature is enabled)
9696
#[cfg(feature = "write")]
9797
writer: Option<Arc<dyn MetadataWriter>>,
98-
/// Data path for write operations (when write feature is enabled)
99-
#[cfg(feature = "write")]
100-
data_path: Option<String>,
10198
}
10299

103100
impl std::fmt::Debug for DuckLakeTable {
@@ -172,8 +169,6 @@ impl DuckLakeTable {
172169
schema_name: None,
173170
#[cfg(feature = "write")]
174171
writer: None,
175-
#[cfg(feature = "write")]
176-
data_path: None,
177172
})
178173
}
179174

@@ -395,17 +390,10 @@ impl DuckLakeTable {
395390
/// # Arguments
396391
/// * `schema_name` - Name of the schema this table belongs to
397392
/// * `writer` - Metadata writer for catalog operations
398-
/// * `data_path` - Base path for data files
399393
#[cfg(feature = "write")]
400-
pub fn with_writer(
401-
mut self,
402-
schema_name: String,
403-
writer: Arc<dyn MetadataWriter>,
404-
data_path: String,
405-
) -> Self {
394+
pub fn with_writer(mut self, schema_name: String, writer: Arc<dyn MetadataWriter>) -> Self {
406395
self.schema_name = Some(schema_name);
407396
self.writer = Some(writer);
408-
self.data_path = Some(data_path);
409397
self
410398
}
411399

@@ -588,10 +576,6 @@ impl TableProvider for DuckLakeTable {
588576
DataFusionError::Internal("Schema name not set for writable table".to_string())
589577
})?;
590578

591-
let data_path = self.data_path.as_ref().ok_or_else(|| {
592-
DataFusionError::Internal("Data path not set for writable table".to_string())
593-
})?;
594-
595579
let write_mode = match insert_op {
596580
InsertOp::Append => WriteMode::Append,
597581
InsertOp::Overwrite | InsertOp::Replace => WriteMode::Replace,
@@ -605,7 +589,6 @@ impl TableProvider for DuckLakeTable {
605589
self.schema(),
606590
write_mode,
607591
self.object_store_url.clone(),
608-
data_path.clone(),
609592
)))
610593
}
611594
}

tests/concurrent_write_tests.rs

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -159,11 +159,12 @@ async fn test_write_session_cleanup_on_drop() {
159159
let (writer, _): (SqliteMetadataWriter, _) = create_test_writer(&temp_dir).await;
160160
let writer: Arc<dyn MetadataWriter> = Arc::new(writer);
161161
let schema = create_user_schema();
162+
let object_store = create_object_store();
162163

163164
// Dropped session should NOT upload data (buffer is just dropped)
164165
let file_path_str = {
165166
let table_writer =
166-
DuckLakeTableWriter::new(Arc::clone(&writer), create_object_store()).unwrap();
167+
DuckLakeTableWriter::new(Arc::clone(&writer), Arc::clone(&object_store)).unwrap();
167168
let mut session = table_writer
168169
.begin_write("main", "dropped_table", &schema, WriteMode::Replace)
169170
.unwrap();
@@ -172,16 +173,16 @@ async fn test_write_session_cleanup_on_drop() {
172173
session.file_path().to_string()
173174
};
174175
// With buffer approach, no file is created until finish() is called
175-
let path = std::path::Path::new(&file_path_str);
176+
let dropped_path = object_store::path::Path::from(file_path_str);
176177
assert!(
177-
!path.exists(),
178-
"No file should exist since session was dropped without finish()"
178+
object_store.get(&dropped_path).await.is_err(),
179+
"No object should exist since session was dropped without finish()"
179180
);
180181

181-
// Finished session should upload and keep file
182+
// Finished session should upload the file
182183
let finished_path_str = {
183184
let table_writer =
184-
DuckLakeTableWriter::new(Arc::clone(&writer), create_object_store()).unwrap();
185+
DuckLakeTableWriter::new(Arc::clone(&writer), Arc::clone(&object_store)).unwrap();
185186
let mut session = table_writer
186187
.begin_write("main", "finished_table", &schema, WriteMode::Replace)
187188
.unwrap();
@@ -191,10 +192,11 @@ async fn test_write_session_cleanup_on_drop() {
191192
session.finish().await.unwrap();
192193
p
193194
};
194-
// LocalFileSystem stores files at /<object_path>, so prepend /
195-
let finished_fs_path = format!("/{}", finished_path_str);
196-
let finished_path = std::path::Path::new(&finished_fs_path);
197-
assert!(finished_path.exists(), "Finished file should exist");
195+
let finished_path = object_store::path::Path::from(finished_path_str);
196+
assert!(
197+
object_store.get(&finished_path).await.is_ok(),
198+
"Finished file should exist in object store"
199+
);
198200
}
199201

200202
#[tokio::test(flavor = "multi_thread")]

0 commit comments

Comments
 (0)