Skip to content

Commit 07dbdb4

Browse files
authored
feat: add lance_dataset_write for create/append/overwrite from ArrowArrayStream (#16)
## Summary - Adds `lance_dataset_write(uri, schema, stream, mode, storage_opts, out_dataset)` — writes an `ArrowArrayStream` into a Lance dataset with a committed manifest - `LanceWriteMode` covers `CREATE` / `APPEND` / `OVERWRITE` - Optional `out_dataset` hands back an open `LanceDataset*` at the new version so callers don't need to reopen - Matching `lance::Dataset::write(...)` static method in `lance.hpp` with full RAII (`StreamGuard` + `SchemaGuard`) ## Motivation Until now the C/C++ path only produced uncommitted fragment files (#5). `lance_dataset_write` closes the primary write path and unblocks the rest of Phase 3 (delete, update, merge-insert, schema evolution), which all need a way to create a dataset first. ## FFI contract notes - `mode` parameter is `int32_t` (not `LanceWriteMode`) on the wire — defends against `-fshort-enums` ABI mismatch. Validated in Rust via `LanceWriteMode::from_raw` before any unsafe enum construction. - Stream is consumed via `ArrowArrayStreamReader::from_raw` **before** uri/schema NULL checks, so the "consumed on every return path" contract holds for every error branch — verified by `test_dataset_write_releases_stream_on_every_error_path` (drop-counter on the boxed reader). - Schema is read by shared reference; the function does NOT call `schema->release`. Caller (or C++ `SchemaGuard`) retains ownership. Documented in the header. - `*out_dataset` is written only on success; error paths leave it untouched. Verified by sentinel-pointer test. - C++ wrapper builds `SchemaGuard` BEFORE `get_schema` so a non-conforming producer that partially populates the schema before reporting failure still has its `release` fired on unwind. `StreamGuard` covers `std::bad_alloc` during `kv` construction; `disarm()`s right before the C call. ## Test plan - `cargo test` — 87 integration tests, 13 covering the writer (CREATE/APPEND/OVERWRITE happy paths, OVERWRITE on a missing path, CREATE on an existing path, declared/append schema mismatches, empty stream, NULL args, invalid mode, `out_dataset` propagation, stream-release-on-every-error-path, out_dataset-untouched-on-error) - `cargo clippy --all-targets -- -D warnings` clean - `cargo fmt --check` clean - `cargo test --test compile_and_run_test -- --ignored` — C and C++ scan→write round-trips pass Closes #14.
1 parent a663d53 commit 07dbdb4

8 files changed

Lines changed: 1219 additions & 12 deletions

File tree

include/lance.h

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -481,6 +481,62 @@ int32_t lance_scanner_full_text_search(
481481
uint32_t max_fuzzy_distance
482482
);
483483

484+
/* ─── Dataset writer ─── */
485+
486+
/**
487+
* Write mode for lance_dataset_write. Values are ABI-stable.
488+
*
489+
* The `mode` parameter on the FFI call is a fixed-width int32_t — not this
490+
* enum type — so callers built with `-fshort-enums` or non-default enum
491+
* sizing cannot mismatch the Rust ABI. The Rust implementation validates the
492+
* received integer and rejects any out-of-range value with
493+
* LANCE_ERR_INVALID_ARGUMENT.
494+
*/
495+
typedef enum {
496+
LANCE_WRITE_CREATE = 0, /* Create new dataset; fail if path exists. */
497+
LANCE_WRITE_APPEND = 1, /* Append; fail if the new schema is incompatible. */
498+
LANCE_WRITE_OVERWRITE = 2, /* Overwrite existing, or create if missing. */
499+
} LanceWriteMode;
500+
501+
/**
502+
* Write an Arrow record batch stream to a Lance dataset at `uri`, committing
503+
* a manifest.
504+
*
505+
* @param uri Dataset URI (file://, s3://, memory://, etc.). Must not
506+
* be NULL or an empty string.
507+
* @param schema Required Arrow schema. The stream schema must match or
508+
* the call fails with LANCE_ERR_INVALID_ARGUMENT. This
509+
* function does NOT call schema->release; the caller
510+
* retains ownership and must release the schema after the
511+
* call returns (success or failure).
512+
* @param stream Arrow C Data Interface stream consumed by this call.
513+
* Do not use the stream after returning, regardless of
514+
* the return code.
515+
* @param mode CREATE / APPEND / OVERWRITE (see LanceWriteMode).
516+
* @param storage_opts NULL-terminated key-value pairs ["k","v",NULL], or NULL.
517+
* @param out_dataset If non-NULL, on success receives an open LanceDataset*
518+
* at the newly-committed version (caller must
519+
* lance_dataset_close it). Pass NULL to discard. On error
520+
* *out_dataset is left unchanged — do not read or free it.
521+
* On entry `*out_dataset` should be NULL or a pointer
522+
* whose previous value is no longer needed; this function
523+
* overwrites the slot on success without releasing any
524+
* prior handle.
525+
* @return 0 on success, -1 on error. Possible error codes include
526+
* LANCE_ERR_DATASET_ALREADY_EXISTS (CREATE on an existing path),
527+
* LANCE_ERR_INVALID_ARGUMENT (NULL/empty args, invalid mode,
528+
* schema mismatch),
529+
* LANCE_ERR_COMMIT_CONFLICT (concurrent writer).
530+
*/
531+
int32_t lance_dataset_write(
532+
const char* uri,
533+
const struct ArrowSchema* schema,
534+
struct ArrowArrayStream* stream,
535+
int32_t mode,
536+
const char* const* storage_opts,
537+
LanceDataset** out_dataset
538+
);
539+
484540
#ifdef __cplusplus
485541
} /* extern "C" */
486542
#endif

include/lance.hpp

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
#include "lance.h"
1919

20+
#include <cstdint>
2021
#include <memory>
2122
#include <stdexcept>
2223
#include <string>
@@ -94,6 +95,14 @@ struct VersionInfo {
9495
int64_t timestamp_ms;
9596
};
9697

98+
// ─── Write mode ──────────────────────────────────────────────────────────────
99+
100+
enum class WriteMode : int32_t {
101+
Create = LANCE_WRITE_CREATE,
102+
Append = LANCE_WRITE_APPEND,
103+
Overwrite = LANCE_WRITE_OVERWRITE,
104+
};
105+
97106
// ─── Dataset ─────────────────────────────────────────────────────────────────
98107

99108
class Dataset {
@@ -122,6 +131,118 @@ class Dataset {
122131
return Dataset(ds);
123132
}
124133

134+
/// Write an Arrow record batch stream to a Lance dataset and return the
135+
/// open dataset at the committed version.
136+
///
137+
/// The stream must be self-describing; its own schema is used. Treat the
138+
/// stream as consumed once this call returns or throws — do not reuse it.
139+
/// Throws lance::Error on failure (including if `stream` is null).
140+
static Dataset write(
141+
const std::string& uri,
142+
ArrowArrayStream* stream,
143+
WriteMode mode,
144+
const std::vector<std::pair<std::string, std::string>>& storage_opts = {}) {
145+
146+
if (stream == nullptr) {
147+
throw Error(LANCE_ERR_INVALID_ARGUMENT, "stream must not be null");
148+
}
149+
150+
// RAII guard for the stream. Until `lance_dataset_write` is called,
151+
// any exception (failed `get_schema`, `std::bad_alloc` while building
152+
// `kv`, etc.) must release the stream. After that call Rust owns it,
153+
// so we `disarm()` immediately before invoking the C API.
154+
struct StreamGuard {
155+
ArrowArrayStream* s;
156+
bool armed = true;
157+
// Explicit constructor: `= delete`d copy/move ctors disqualify
158+
// this from being an aggregate under C++20, so brace-init like
159+
// `StreamGuard{stream}` would otherwise fail to compile there.
160+
explicit StreamGuard(ArrowArrayStream* p) noexcept : s(p) {}
161+
~StreamGuard() noexcept {
162+
if (armed && s && s->release) s->release(s);
163+
}
164+
void disarm() noexcept { armed = false; }
165+
StreamGuard(const StreamGuard&) = delete;
166+
StreamGuard& operator=(const StreamGuard&) = delete;
167+
StreamGuard(StreamGuard&&) = delete;
168+
StreamGuard& operator=(StreamGuard&&) = delete;
169+
} stream_guard{stream};
170+
171+
// Defensive: a non-conforming or already-released producer may have a
172+
// null `get_schema`. Without this guard a bad caller would crash with
173+
// a null function-pointer dereference on the next line.
174+
if (stream->get_schema == nullptr) {
175+
throw Error(LANCE_ERR_INVALID_ARGUMENT,
176+
"stream get_schema callback is null");
177+
}
178+
179+
// Arm SchemaGuard before calling `get_schema` so a non-conforming
180+
// producer that partially populates the schema before returning an
181+
// error still has its `release` fired on unwind. The zero-init keeps
182+
// the destructor a no-op on the clean-error path (release == null).
183+
struct SchemaGuard {
184+
ArrowSchema* s;
185+
// Explicit constructor for the same C++20 aggregate-init reason
186+
// documented on StreamGuard above.
187+
explicit SchemaGuard(ArrowSchema* p) noexcept : s(p) {}
188+
~SchemaGuard() noexcept {
189+
if (s && s->release) s->release(s);
190+
}
191+
SchemaGuard(const SchemaGuard&) = delete;
192+
SchemaGuard& operator=(const SchemaGuard&) = delete;
193+
SchemaGuard(SchemaGuard&&) = delete;
194+
SchemaGuard& operator=(SchemaGuard&&) = delete;
195+
};
196+
ArrowSchema schema = {};
197+
SchemaGuard schema_guard{&schema};
198+
199+
// On failure, StreamGuard releases the stream and SchemaGuard
200+
// releases any partial schema state — preserving the "consumed on
201+
// return or throw" contract for both resources.
202+
if (stream->get_schema(stream, &schema) != 0) {
203+
const char* err = stream->get_last_error
204+
? stream->get_last_error(stream)
205+
: nullptr;
206+
std::string msg = std::string("failed to read stream schema: ") +
207+
(err ? err : "unknown");
208+
throw Error(LANCE_ERR_INVALID_ARGUMENT, msg);
209+
}
210+
211+
std::vector<const char*> kv;
212+
for (auto& [k, v] : storage_opts) {
213+
kv.push_back(k.c_str());
214+
kv.push_back(v.c_str());
215+
}
216+
kv.push_back(nullptr);
217+
const char* const* opts_ptr =
218+
storage_opts.empty() ? nullptr : kv.data();
219+
220+
// The C API consumes the stream on every return path, so disarm the
221+
// guard before calling. After this point the stream pointer is logically
222+
// owned by Rust and any C++-side exception must not re-release it.
223+
stream_guard.disarm();
224+
225+
LanceDataset* out = nullptr;
226+
int32_t rc = lance_dataset_write(
227+
uri.c_str(),
228+
&schema,
229+
stream,
230+
static_cast<int32_t>(mode),
231+
opts_ptr,
232+
&out);
233+
if (rc != 0) check_error();
234+
// Defensive null guard: a conforming Rust impl never returns rc == 0
235+
// with `out == nullptr`, but constructing a Dataset around a null
236+
// handle would silently crash on the first method call. Throw
237+
// explicitly rather than going through `check_error()` because the
238+
// thread-local code is `LANCE_OK` on this path (rc == 0).
239+
if (!out) {
240+
throw Error(LANCE_ERR_INTERNAL,
241+
"lance_dataset_write returned success with null out_dataset");
242+
}
243+
return Dataset(out);
244+
}
245+
125246
/// Number of rows in the dataset.
126247
uint64_t count_rows() const {
127248
uint64_t n = lance_dataset_count_rows(handle_.get());

src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ mod index;
2525
pub mod runtime;
2626
mod scanner;
2727
mod versions;
28+
mod writer;
2829

2930
// Re-export all extern "C" symbols so they appear in the cdylib.
3031
pub use batch::*;
@@ -36,3 +37,4 @@ pub use fragment_writer::*;
3637
pub use index::*;
3738
pub use scanner::*;
3839
pub use versions::*;
40+
pub use writer::*;

0 commit comments

Comments
 (0)