Skip to content

Commit 19eebcd

Browse files
Some fixes
1 parent e2293a3 commit 19eebcd

2 files changed

Lines changed: 105 additions & 7 deletions

File tree

src/metadata_writer_sqlite.rs

Lines changed: 81 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,33 @@
11
//! SQLite metadata writer for DuckLake catalogs.
2+
//!
3+
//! This module provides [`SqliteMetadataWriter`], an implementation of the
4+
//! [`MetadataWriter`] trait using SQLite as the catalog backend.
5+
//!
6+
//! # Runtime Requirements
7+
//!
8+
//! This implementation uses `tokio::task::block_in_place` internally, which requires
9+
//! a **multi-threaded Tokio runtime**. Using `#[tokio::test]` without
10+
//! `flavor = "multi_thread"` will panic.
11+
//!
12+
//! # Example
13+
//!
14+
//! ```ignore
15+
//! use datafusion_ducklake::SqliteMetadataWriter;
16+
//!
17+
//! // Create and initialize a new catalog
18+
//! let writer = SqliteMetadataWriter::new_with_init("sqlite:catalog.db?mode=rwc").await?;
19+
//! writer.set_data_path("/path/to/data")?;
20+
//! ```
221
322
use crate::Result;
423
use crate::metadata_provider::block_on;
524
use crate::metadata_writer::{ColumnDef, DataFileInfo, MetadataWriter};
625
use sqlx::Row;
726
use sqlx::sqlite::{SqlitePool, SqlitePoolOptions};
827

28+
/// Default maximum number of connections in the pool.
29+
const DEFAULT_MAX_CONNECTIONS: u32 = 5;
30+
931
/// SQL to create DuckLake schema tables
1032
const SQL_CREATE_SCHEMA: &str = r#"
1133
CREATE TABLE IF NOT EXISTS ducklake_metadata (
@@ -78,6 +100,19 @@ CREATE TABLE IF NOT EXISTS ducklake_delete_file (
78100
"#;
79101

80102
/// SQLite-based metadata writer for DuckLake catalogs.
103+
///
104+
/// This writer manages DuckLake catalog metadata stored in a SQLite database,
105+
/// including snapshots, schemas, tables, columns, and data file registrations.
106+
///
107+
/// # Thread Safety
108+
///
109+
/// This type is `Clone`, `Send`, and `Sync`. The underlying connection pool
110+
/// handles concurrent access safely.
111+
///
112+
/// # Connection Pool
113+
///
114+
/// Uses a connection pool with configurable size (default: 5 connections).
115+
/// For embedded single-threaded use cases, consider using `with_max_connections(1)`.
81116
#[derive(Debug, Clone)]
82117
pub struct SqliteMetadataWriter {
83118
pool: SqlitePool,
@@ -86,11 +121,43 @@ pub struct SqliteMetadataWriter {
86121
impl SqliteMetadataWriter {
87122
/// Creates a new writer for a DuckLake catalog.
88123
///
89-
/// Connection string format: `sqlite:///path/to/catalog.db?mode=rwc`
90-
/// Use `?mode=rwc` to create the file if it doesn't exist.
124+
/// # Connection String Format
125+
///
126+
/// - `sqlite:path/to/catalog.db` - Open existing database
127+
/// - `sqlite:path/to/catalog.db?mode=rwc` - Create if not exists
128+
/// - `sqlite::memory:` - In-memory database (for testing)
129+
///
130+
/// # Example
131+
///
132+
/// ```ignore
133+
/// let writer = SqliteMetadataWriter::new("sqlite:catalog.db?mode=rwc").await?;
134+
/// ```
91135
pub async fn new(connection_string: &str) -> Result<Self> {
136+
Self::with_max_connections(connection_string, DEFAULT_MAX_CONNECTIONS).await
137+
}
138+
139+
/// Creates a new writer with a custom connection pool size.
140+
///
141+
/// # Arguments
142+
///
143+
/// * `connection_string` - SQLite connection string
144+
/// * `max_connections` - Maximum number of connections in the pool
145+
///
146+
/// # Example
147+
///
148+
/// ```ignore
149+
/// // Single connection for embedded use
150+
/// let writer = SqliteMetadataWriter::with_max_connections(
151+
/// "sqlite:catalog.db?mode=rwc",
152+
/// 1
153+
/// ).await?;
154+
/// ```
155+
pub async fn with_max_connections(
156+
connection_string: &str,
157+
max_connections: u32,
158+
) -> Result<Self> {
92159
let pool = SqlitePoolOptions::new()
93-
.max_connections(5)
160+
.max_connections(max_connections)
94161
.connect(connection_string)
95162
.await?;
96163

@@ -99,7 +166,17 @@ impl SqliteMetadataWriter {
99166
})
100167
}
101168

102-
/// Creates a new writer and initializes the schema if needed.
169+
/// Creates a new writer and initializes the DuckLake schema tables.
170+
///
171+
/// This is a convenience method that combines `new()` and `initialize_schema()`.
172+
/// Use this when creating a new catalog from scratch.
173+
///
174+
/// # Example
175+
///
176+
/// ```ignore
177+
/// let writer = SqliteMetadataWriter::new_with_init("sqlite:catalog.db?mode=rwc").await?;
178+
/// writer.set_data_path("/data/warehouse")?;
179+
/// ```
103180
pub async fn new_with_init(connection_string: &str) -> Result<Self> {
104181
let writer = Self::new(connection_string).await?;
105182
writer.initialize_schema()?;

src/table_writer.rs

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,7 @@ impl DuckLakeTableWriter {
317317

318318
/// A streaming write session for writing batches incrementally to a DuckLake table.
319319
///
320-
/// Created by `DuckLakeTableWriter::begin_write()` or `begin_write_to_path()`.
320+
/// Created by [`DuckLakeTableWriter::begin_write()`] or [`DuckLakeTableWriter::begin_write_to_path()`].
321321
///
322322
/// The session holds the Parquet writer and catalog metadata. Batches are written
323323
/// incrementally via `write_batch()`, and the file is registered in the catalog
@@ -329,8 +329,29 @@ impl DuckLakeTableWriter {
329329
/// begin_write() → write_batch()* → finish()
330330
/// ```
331331
///
332-
/// If the session is dropped without calling `finish()`, the file is written but
333-
/// NOT registered in the catalog (effectively discarded).
332+
/// # Error Handling and Cleanup
333+
///
334+
/// - **Dropped without `finish()`**: The Parquet file may be partially written but is
335+
/// NOT registered in the catalog. The orphaned file remains on disk and should be
336+
/// cleaned up by the caller or a separate garbage collection process.
337+
///
338+
/// - **Error during `write_batch()`**: The session remains valid and you can continue
339+
/// writing. The Parquet file may contain partial data.
340+
///
341+
/// - **Error during `finish()`**: The Parquet file is closed but may not be registered
342+
/// in the catalog. The file remains on disk.
343+
///
344+
/// For robust error handling in production, consider:
345+
/// 1. Wrapping writes in a try block
346+
/// 2. Implementing cleanup logic for orphaned files
347+
/// 3. Using the file path from `file_path()` to clean up on error
348+
///
349+
/// # Atomicity
350+
///
351+
/// The catalog registration in `finish()` is the commit point. Until `finish()`
352+
/// completes successfully, the data is not visible to readers. However, the
353+
/// snapshot, schema, table, and columns are created during `begin_write()`,
354+
/// so a failed write may leave empty catalog entries.
334355
#[derive(Debug)]
335356
pub struct TableWriteSession {
336357
metadata: Arc<dyn MetadataWriter>,

0 commit comments

Comments
 (0)