Skip to content

Commit 5f48cf2

Browse files
Add S3/ObjectStore write support for DuckLake catalogs (#49)
1 parent 7824e6b commit 5f48cf2

9 files changed

Lines changed: 289 additions & 248 deletions

File tree

src/catalog.rs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,13 @@ use datafusion::datasource::object_store::ObjectStoreUrl;
1313

1414
#[cfg(feature = "write")]
1515
use crate::metadata_writer::MetadataWriter;
16-
#[cfg(feature = "write")]
17-
use std::path::PathBuf;
1816

1917
/// Configuration for write operations (when write feature is enabled)
2018
#[cfg(feature = "write")]
2119
#[derive(Debug, Clone)]
2220
struct WriteConfig {
2321
/// Metadata writer for catalog operations
2422
writer: Arc<dyn MetadataWriter>,
25-
/// Base data path for writing files
26-
data_path: PathBuf,
2723
}
2824

2925
/// DuckLake catalog provider
@@ -125,7 +121,6 @@ impl DuckLakeCatalog {
125121
catalog_path,
126122
write_config: Some(WriteConfig {
127123
writer,
128-
data_path: PathBuf::from(&data_path_str),
129124
}),
130125
})
131126
}
@@ -199,7 +194,7 @@ impl CatalogProvider for DuckLakeCatalog {
199194
// Configure writer if this catalog is writable
200195
#[cfg(feature = "write")]
201196
let schema = if let Some(ref config) = self.write_config {
202-
schema.with_writer(Arc::clone(&config.writer), config.data_path.clone())
197+
schema.with_writer(Arc::clone(&config.writer))
203198
} else {
204199
schema
205200
};

src/error.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,10 @@ pub enum DuckLakeError {
5555
#[error("Unsupported feature: {0}")]
5656
Unsupported(String),
5757

58+
/// ObjectStore error
59+
#[error("ObjectStore error: {0}")]
60+
ObjectStore(#[from] object_store::Error),
61+
5862
/// IO error
5963
#[error("IO error: {0}")]
6064
Io(#[from] std::io::Error),

src/insert_exec.rs

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,12 @@
66
77
use std::any::Any;
88
use std::fmt::{self, Debug};
9-
use std::path::PathBuf;
109
use std::sync::Arc;
1110

1211
use arrow::array::{ArrayRef, RecordBatch, UInt64Array};
1312
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
1413
use datafusion::error::{DataFusionError, Result as DataFusionResult};
14+
use datafusion::execution::object_store::ObjectStoreUrl;
1515
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
1616
use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
1717
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
@@ -38,7 +38,7 @@ pub struct DuckLakeInsertExec {
3838
table_name: String,
3939
arrow_schema: SchemaRef,
4040
write_mode: WriteMode,
41-
data_path: PathBuf,
41+
object_store_url: Arc<ObjectStoreUrl>,
4242
cache: PlanProperties,
4343
}
4444

@@ -51,7 +51,7 @@ impl DuckLakeInsertExec {
5151
table_name: String,
5252
arrow_schema: SchemaRef,
5353
write_mode: WriteMode,
54-
data_path: PathBuf,
54+
object_store_url: Arc<ObjectStoreUrl>,
5555
) -> Self {
5656
let cache = Self::compute_properties();
5757
Self {
@@ -61,7 +61,7 @@ impl DuckLakeInsertExec {
6161
table_name,
6262
arrow_schema,
6363
write_mode,
64-
data_path,
64+
object_store_url,
6565
cache,
6666
}
6767
}
@@ -82,7 +82,6 @@ impl Debug for DuckLakeInsertExec {
8282
.field("schema_name", &self.schema_name)
8383
.field("table_name", &self.table_name)
8484
.field("write_mode", &self.write_mode)
85-
.field("data_path", &self.data_path)
8685
.finish_non_exhaustive()
8786
}
8887
}
@@ -136,7 +135,7 @@ impl ExecutionPlan for DuckLakeInsertExec {
136135
self.table_name.clone(),
137136
Arc::clone(&self.arrow_schema),
138137
self.write_mode,
139-
self.data_path.clone(),
138+
self.object_store_url.clone(),
140139
)))
141140
}
142141

@@ -158,19 +157,24 @@ impl ExecutionPlan for DuckLakeInsertExec {
158157
let table_name = self.table_name.clone();
159158
let arrow_schema = Arc::clone(&self.arrow_schema);
160159
let write_mode = self.write_mode;
161-
let data_path = self.data_path.clone();
160+
let object_store_url = self.object_store_url.clone();
162161
let output_schema = make_insert_count_schema();
163162

164163
let stream = stream::once(async move {
165-
let input_stream = input.execute(0, context)?;
164+
let input_stream = input.execute(0, Arc::clone(&context))?;
166165
let batches: Vec<RecordBatch> = input_stream.try_collect().await?;
167166

168167
if batches.is_empty() {
169168
let count_array: ArrayRef = Arc::new(UInt64Array::from(vec![0u64]));
170169
return Ok(RecordBatch::try_new(output_schema, vec![count_array])?);
171170
}
172171

173-
let table_writer = DuckLakeTableWriter::new(writer)
172+
// Get object store from runtime environment
173+
let object_store = context
174+
.runtime_env()
175+
.object_store(object_store_url.as_ref())?;
176+
177+
let table_writer = DuckLakeTableWriter::new(writer, object_store)
174178
.map_err(|e| DataFusionError::External(Box::new(e)))?;
175179

176180
let schema_without_metadata =
@@ -185,8 +189,6 @@ impl ExecutionPlan for DuckLakeInsertExec {
185189
)
186190
.map_err(|e| DataFusionError::External(Box::new(e)))?;
187191

188-
let _ = data_path;
189-
190192
for batch in &batches {
191193
session
192194
.write_batch(batch)
@@ -197,6 +199,7 @@ impl ExecutionPlan for DuckLakeInsertExec {
197199

198200
session
199201
.finish()
202+
.await
200203
.map_err(|e| DataFusionError::External(Box::new(e)))?;
201204

202205
let count_array: ArrayRef = Arc::new(UInt64Array::from(vec![row_count]));

src/schema.rs

Lines changed: 5 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@ use crate::table::DuckLakeTable;
1616
use crate::metadata_writer::{ColumnDef, MetadataWriter, WriteMode};
1717
#[cfg(feature = "write")]
1818
use datafusion::error::DataFusionError;
19-
#[cfg(feature = "write")]
20-
use std::path::PathBuf;
2119

2220
/// Validate table name to prevent path traversal attacks.
2321
/// Table names are used to construct file paths, so we must ensure they
@@ -64,9 +62,6 @@ pub struct DuckLakeSchema {
6462
/// Metadata writer for write operations (when write feature is enabled)
6563
#[cfg(feature = "write")]
6664
writer: Option<Arc<dyn MetadataWriter>>,
67-
/// Data path for write operations (when write feature is enabled)
68-
#[cfg(feature = "write")]
69-
data_path: Option<PathBuf>,
7065
}
7166

7267
impl DuckLakeSchema {
@@ -88,23 +83,19 @@ impl DuckLakeSchema {
8883
schema_path,
8984
#[cfg(feature = "write")]
9085
writer: None,
91-
#[cfg(feature = "write")]
92-
data_path: None,
9386
}
9487
}
9588

9689
/// Configure this schema for write operations.
9790
///
98-
/// This method enables write support by attaching a metadata writer and data path.
91+
/// This method enables write support by attaching a metadata writer.
9992
/// Once configured, the schema can handle CREATE TABLE AS and tables can handle INSERT INTO.
10093
///
10194
/// # Arguments
10295
/// * `writer` - Metadata writer for catalog operations
103-
/// * `data_path` - Base path for data files
10496
#[cfg(feature = "write")]
105-
pub fn with_writer(mut self, writer: Arc<dyn MetadataWriter>, data_path: PathBuf) -> Self {
97+
pub fn with_writer(mut self, writer: Arc<dyn MetadataWriter>) -> Self {
10698
self.writer = Some(writer);
107-
self.data_path = Some(data_path);
10899
self
109100
}
110101
}
@@ -157,14 +148,8 @@ impl SchemaProvider for DuckLakeSchema {
157148

158149
// Configure writer if this schema is writable
159150
#[cfg(feature = "write")]
160-
let table = if let (Some(writer), Some(data_path)) =
161-
(self.writer.as_ref(), self.data_path.as_ref())
162-
{
163-
table.with_writer(
164-
self.schema_name.clone(),
165-
Arc::clone(writer),
166-
data_path.clone(),
167-
)
151+
let table = if let Some(writer) = self.writer.as_ref() {
152+
table.with_writer(self.schema_name.clone(), Arc::clone(writer))
168153
} else {
169154
table
170155
};
@@ -203,10 +188,6 @@ impl SchemaProvider for DuckLakeSchema {
203188
)
204189
})?;
205190

206-
let data_path = self.data_path.as_ref().ok_or_else(|| {
207-
DataFusionError::Internal("Data path not set for writable schema".to_string())
208-
})?;
209-
210191
// Convert Arrow schema to ColumnDefs
211192
let arrow_schema = table.schema();
212193
let columns: Vec<ColumnDef> = arrow_schema
@@ -236,11 +217,7 @@ impl SchemaProvider for DuckLakeSchema {
236217
table_path,
237218
)
238219
.map_err(|e| DataFusionError::External(Box::new(e)))?
239-
.with_writer(
240-
self.schema_name.clone(),
241-
Arc::clone(writer),
242-
data_path.clone(),
243-
);
220+
.with_writer(self.schema_name.clone(), Arc::clone(writer));
244221

245222
Ok(Some(Arc::new(writable_table) as Arc<dyn TableProvider>))
246223
}

src/table.rs

Lines changed: 2 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@ use crate::types::{
1919
use crate::insert_exec::DuckLakeInsertExec;
2020
#[cfg(feature = "write")]
2121
use crate::metadata_writer::{MetadataWriter, WriteMode};
22-
#[cfg(feature = "write")]
23-
use std::path::PathBuf;
2422

2523
#[cfg(feature = "encryption")]
2624
use crate::encryption::EncryptionFactoryBuilder;
@@ -97,9 +95,6 @@ pub struct DuckLakeTable {
9795
/// Metadata writer for write operations (when write feature is enabled)
9896
#[cfg(feature = "write")]
9997
writer: Option<Arc<dyn MetadataWriter>>,
100-
/// Data path for write operations (when write feature is enabled)
101-
#[cfg(feature = "write")]
102-
data_path: Option<PathBuf>,
10398
}
10499

105100
impl std::fmt::Debug for DuckLakeTable {
@@ -174,8 +169,6 @@ impl DuckLakeTable {
174169
schema_name: None,
175170
#[cfg(feature = "write")]
176171
writer: None,
177-
#[cfg(feature = "write")]
178-
data_path: None,
179172
})
180173
}
181174

@@ -397,17 +390,10 @@ impl DuckLakeTable {
397390
/// # Arguments
398391
/// * `schema_name` - Name of the schema this table belongs to
399392
/// * `writer` - Metadata writer for catalog operations
400-
/// * `data_path` - Base path for data files
401393
#[cfg(feature = "write")]
402-
pub fn with_writer(
403-
mut self,
404-
schema_name: String,
405-
writer: Arc<dyn MetadataWriter>,
406-
data_path: PathBuf,
407-
) -> Self {
394+
pub fn with_writer(mut self, schema_name: String, writer: Arc<dyn MetadataWriter>) -> Self {
408395
self.schema_name = Some(schema_name);
409396
self.writer = Some(writer);
410-
self.data_path = Some(data_path);
411397
self
412398
}
413399

@@ -590,10 +576,6 @@ impl TableProvider for DuckLakeTable {
590576
DataFusionError::Internal("Schema name not set for writable table".to_string())
591577
})?;
592578

593-
let data_path = self.data_path.as_ref().ok_or_else(|| {
594-
DataFusionError::Internal("Data path not set for writable table".to_string())
595-
})?;
596-
597579
let write_mode = match insert_op {
598580
InsertOp::Append => WriteMode::Append,
599581
InsertOp::Overwrite | InsertOp::Replace => WriteMode::Replace,
@@ -606,7 +588,7 @@ impl TableProvider for DuckLakeTable {
606588
self.table_name.clone(),
607589
self.schema(),
608590
write_mode,
609-
data_path.clone(),
591+
self.object_store_url.clone(),
610592
)))
611593
}
612594
}

0 commit comments

Comments
 (0)