Skip to content

Commit 65fb122

Browse files
authored
feat: Add last error API for ffi (#26)
1 parent 1e2f7ba commit 65fb122

4 files changed

Lines changed: 57 additions & 35 deletions

File tree

rust/error.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,18 +53,18 @@ pub extern "C" fn lance_last_error_code() -> i32 {
5353
}
5454

5555
#[no_mangle]
56-
pub extern "C" fn lance_last_error_message() -> *mut c_char {
57-
LAST_ERROR.with(|e| match e.borrow().as_ref() {
58-
Some(err) => err.message.clone().into_raw(),
59-
None => ptr::null_mut(),
56+
pub extern "C" fn lance_last_error_message() -> *const c_char {
57+
LAST_ERROR.with(|e| match e.borrow_mut().take() {
58+
Some(err) => err.message.into_raw() as *const c_char,
59+
None => ptr::null(),
6060
})
6161
}
6262

6363
#[no_mangle]
64-
pub unsafe extern "C" fn lance_free_string(s: *mut c_char) {
64+
pub unsafe extern "C" fn lance_free_string(s: *const c_char) {
6565
if !s.is_null() {
6666
unsafe {
67-
let _ = CString::from_raw(s);
67+
let _ = CString::from_raw(s as *mut c_char);
6868
}
6969
}
7070
}

rust/lib.rs

Lines changed: 40 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ pub unsafe extern "C" fn lance_open_dataset(path: *const c_char) -> *mut c_void
3232
match CStr::from_ptr(path).to_str() {
3333
Ok(s) => s,
3434
Err(err) => {
35-
set_last_error(ErrorCode::Utf8, err.to_string());
35+
set_last_error(ErrorCode::Utf8, format!("utf8 decode: {err}"));
3636
return ptr::null_mut();
3737
}
3838
}
@@ -41,11 +41,14 @@ pub unsafe extern "C" fn lance_open_dataset(path: *const c_char) -> *mut c_void
4141
let dataset = match runtime::block_on(Dataset::open(path_str)) {
4242
Ok(Ok(ds)) => Arc::new(ds),
4343
Ok(Err(err)) => {
44-
set_last_error(ErrorCode::DatasetOpen, err.to_string());
44+
set_last_error(
45+
ErrorCode::DatasetOpen,
46+
format!("dataset open '{path_str}': {err}"),
47+
);
4548
return ptr::null_mut();
4649
}
4750
Err(err) => {
48-
set_last_error(ErrorCode::Runtime, err.to_string());
51+
set_last_error(ErrorCode::Runtime, format!("runtime: {err}"));
4952
return ptr::null_mut();
5053
}
5154
};
@@ -107,7 +110,7 @@ pub unsafe extern "C" fn lance_schema_to_arrow(
107110
let ffi_schema = match FFI_ArrowSchema::try_from(&data_type) {
108111
Ok(schema) => schema,
109112
Err(err) => {
110-
set_last_error(ErrorCode::SchemaExport, err.to_string());
113+
set_last_error(ErrorCode::SchemaExport, format!("schema export: {err}"));
111114
return -1;
112115
}
113116
};
@@ -134,7 +137,7 @@ pub unsafe extern "C" fn lance_create_stream(dataset: *mut c_void) -> *mut c_voi
134137
Box::into_raw(Box::new(stream)) as *mut c_void
135138
}
136139
Err(err) => {
137-
set_last_error(ErrorCode::StreamCreate, err.to_string());
140+
set_last_error(ErrorCode::StreamCreate, format!("stream create: {err}"));
138141
ptr::null_mut()
139142
}
140143
}
@@ -193,15 +196,18 @@ pub unsafe extern "C" fn lance_create_fragment_stream(
193196
let fragment_id_usize = match usize::try_from(fragment_id) {
194197
Ok(v) => v,
195198
Err(err) => {
196-
set_last_error(ErrorCode::InvalidArgument, err.to_string());
199+
set_last_error(ErrorCode::InvalidArgument, format!("invalid fragment id: {err}"));
197200
return ptr::null_mut();
198201
}
199202
};
200203

201204
let fragment = match handle.dataset.get_fragment(fragment_id_usize) {
202205
Some(f) => f,
203206
None => {
204-
set_last_error(ErrorCode::FragmentScan, "fragment not found");
207+
set_last_error(
208+
ErrorCode::FragmentScan,
209+
format!("fragment not found: {fragment_id}"),
210+
);
205211
return ptr::null_mut();
206212
}
207213
};
@@ -219,14 +225,14 @@ pub unsafe extern "C" fn lance_create_fragment_stream(
219225
let col_name = match unsafe { CStr::from_ptr(col_ptr) }.to_str() {
220226
Ok(v) => v,
221227
Err(err) => {
222-
set_last_error(ErrorCode::Utf8, err.to_string());
228+
set_last_error(ErrorCode::Utf8, format!("utf8 decode: {err}"));
223229
return ptr::null_mut();
224230
}
225231
};
226232
projection.push(col_name.to_string());
227233
}
228234
if let Err(err) = scan.project(&projection) {
229-
set_last_error(ErrorCode::FragmentScan, err.to_string());
235+
set_last_error(ErrorCode::FragmentScan, format!("fragment scan project: {err}"));
230236
return ptr::null_mut();
231237
}
232238
}
@@ -235,13 +241,13 @@ pub unsafe extern "C" fn lance_create_fragment_stream(
235241
let filter = match unsafe { CStr::from_ptr(filter_sql) }.to_str() {
236242
Ok(v) => v,
237243
Err(err) => {
238-
set_last_error(ErrorCode::Utf8, err.to_string());
244+
set_last_error(ErrorCode::Utf8, format!("utf8 decode: {err}"));
239245
return ptr::null_mut();
240246
}
241247
};
242248
if !filter.is_empty() {
243249
if let Err(err) = scan.filter(filter) {
244-
set_last_error(ErrorCode::FragmentScan, err.to_string());
250+
set_last_error(ErrorCode::FragmentScan, format!("fragment scan filter: {err}"));
245251
return ptr::null_mut();
246252
}
247253
}
@@ -255,33 +261,48 @@ pub unsafe extern "C" fn lance_create_fragment_stream(
255261
Box::into_raw(Box::new(stream)) as *mut c_void
256262
}
257263
Err(err) => {
258-
set_last_error(ErrorCode::StreamCreate, err.to_string());
264+
set_last_error(ErrorCode::StreamCreate, format!("stream create: {err}"));
259265
ptr::null_mut()
260266
}
261267
}
262268
}
263269

264270
#[no_mangle]
265-
pub unsafe extern "C" fn lance_stream_next(stream: *mut c_void) -> *mut c_void {
271+
pub unsafe extern "C" fn lance_stream_next(
272+
stream: *mut c_void,
273+
out_batch: *mut *mut c_void,
274+
) -> i32 {
275+
if out_batch.is_null() {
276+
set_last_error(ErrorCode::InvalidArgument, "out_batch is null");
277+
return -1;
278+
}
279+
unsafe {
280+
ptr::write_unaligned(out_batch, ptr::null_mut());
281+
}
282+
266283
if stream.is_null() {
267284
set_last_error(ErrorCode::InvalidArgument, "stream is null");
268-
return ptr::null_mut();
285+
return -1;
269286
}
270287

271288
let stream = unsafe { &mut *(stream as *mut LanceStream) };
272289

273290
match stream.next() {
274291
Ok(Some(batch)) => {
275292
clear_last_error();
276-
Box::into_raw(Box::new(batch)) as *mut c_void
293+
let batch_ptr = Box::into_raw(Box::new(batch)) as *mut c_void;
294+
unsafe {
295+
ptr::write_unaligned(out_batch, batch_ptr);
296+
}
297+
0
277298
}
278299
Ok(None) => {
279300
clear_last_error();
280-
ptr::null_mut()
301+
1
281302
}
282303
Err(err) => {
283-
set_last_error(ErrorCode::StreamNext, err.to_string());
284-
ptr::null_mut()
304+
set_last_error(ErrorCode::StreamNext, format!("stream next: {err}"));
305+
-1
285306
}
286307
}
287308
}
@@ -336,7 +357,7 @@ pub unsafe extern "C" fn lance_batch_to_arrow(
336357
let schema = match FFI_ArrowSchema::try_from(data.data_type()) {
337358
Ok(schema) => schema,
338359
Err(err) => {
339-
set_last_error(ErrorCode::BatchExport, err.to_string());
360+
set_last_error(ErrorCode::BatchExport, format!("batch export: {err}"));
340361
return -1;
341362
}
342363
};

src/lance_scan.cpp

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,12 @@ void lance_free_schema(void *schema);
3434
int32_t lance_schema_to_arrow(void *schema, ArrowSchema *out_schema);
3535

3636
void *lance_create_stream(void *dataset);
37-
void *lance_stream_next(void *stream);
37+
int32_t lance_stream_next(void *stream, void **out_batch);
3838
void lance_close_stream(void *stream);
3939

4040
int32_t lance_last_error_code();
41-
char *lance_last_error_message();
42-
void lance_free_string(char *s);
41+
const char *lance_last_error_message();
42+
void lance_free_string(const char *s);
4343

4444
uint64_t *lance_dataset_list_fragments(void *dataset, size_t *out_len);
4545
void lance_free_fragment_list(uint64_t *ptr, size_t len);
@@ -664,16 +664,17 @@ static bool LanceScanLoadNextBatch(LanceScanLocalState &local_state) {
664664
if (!local_state.stream) {
665665
return false;
666666
}
667-
auto *batch = lance_stream_next(local_state.stream);
668-
if (!batch) {
669-
auto err_suffix = LanceFormatErrorSuffix();
670-
if (!err_suffix.empty()) {
671-
throw IOException("Failed to read next Lance RecordBatch" + err_suffix);
672-
}
667+
void *batch = nullptr;
668+
auto rc = lance_stream_next(local_state.stream, &batch);
669+
if (rc == 1) {
673670
lance_close_stream(local_state.stream);
674671
local_state.stream = nullptr;
675672
return false;
676673
}
674+
if (rc != 0) {
675+
throw IOException("Failed to read next Lance RecordBatch" +
676+
LanceFormatErrorSuffix());
677+
}
677678

678679
auto new_chunk = make_shared_ptr<ArrowArrayWrapper>();
679680
ArrowSchema tmp_schema;

test/sql/lance.test

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ SELECT * FROM 'test/test_data.lance' LIMIT 1
7070
statement error
7171
SELECT * FROM lance_scan('dummy_path.lance')
7272
----
73-
Lance error:
73+
IO Error: Failed to open Lance dataset: dummy_path.lance (Lance error: dataset open 'dummy_path.lance':
7474

7575
# Test error handling - requires at least one argument
7676
statement error

0 commit comments

Comments
 (0)