Skip to content

Commit 1e2f7ba

Browse files
authored
refactor: Handling ffi errors correctly (#25)
1 parent 96a8c08 commit 1e2f7ba

5 files changed

Lines changed: 211 additions & 34 deletions

File tree

rust/error.rs

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
use std::cell::RefCell;
2+
use std::ffi::{c_char, CString};
3+
use std::ptr;
4+
5+
#[repr(i32)]
6+
#[derive(Clone, Copy)]
7+
pub enum ErrorCode {
8+
InvalidArgument = 1,
9+
Utf8 = 2,
10+
Runtime = 3,
11+
DatasetOpen = 4,
12+
FragmentScan = 6,
13+
StreamCreate = 7,
14+
StreamNext = 8,
15+
SchemaExport = 9,
16+
BatchExport = 10,
17+
}
18+
19+
struct LastError {
20+
code: i32,
21+
message: CString,
22+
}
23+
24+
thread_local! {
25+
static LAST_ERROR: RefCell<Option<LastError>> = RefCell::new(None);
26+
}
27+
28+
fn sanitize_message(message: &str) -> CString {
29+
match CString::new(message) {
30+
Ok(v) => v,
31+
Err(_) => CString::new(message.replace('\0', "\\0"))
32+
.unwrap_or_else(|_| CString::new("invalid error message").unwrap()),
33+
}
34+
}
35+
36+
pub fn clear_last_error() {
37+
LAST_ERROR.with(|e| {
38+
*e.borrow_mut() = None;
39+
});
40+
}
41+
42+
pub fn set_last_error(code: ErrorCode, message: impl AsRef<str>) {
43+
let code = code as i32;
44+
let message = sanitize_message(message.as_ref());
45+
LAST_ERROR.with(|e| {
46+
*e.borrow_mut() = Some(LastError { code, message });
47+
});
48+
}
49+
50+
#[no_mangle]
51+
pub extern "C" fn lance_last_error_code() -> i32 {
52+
LAST_ERROR.with(|e| e.borrow().as_ref().map(|v| v.code).unwrap_or(0))
53+
}
54+
55+
#[no_mangle]
56+
pub extern "C" fn lance_last_error_message() -> *mut c_char {
57+
LAST_ERROR.with(|e| match e.borrow().as_ref() {
58+
Some(err) => err.message.clone().into_raw(),
59+
None => ptr::null_mut(),
60+
})
61+
}
62+
63+
#[no_mangle]
64+
pub unsafe extern "C" fn lance_free_string(s: *mut c_char) {
65+
if !s.is_null() {
66+
unsafe {
67+
let _ = CString::from_raw(s);
68+
}
69+
}
70+
}

rust/lib.rs

Lines changed: 87 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,10 @@ use lance::Dataset;
1111

1212
mod runtime;
1313
mod scanner;
14+
mod error;
1415

1516
use scanner::LanceStream;
17+
use error::{clear_last_error, set_last_error, ErrorCode};
1618

1719
// Dataset operations - just holds the dataset
1820
struct DatasetHandle {
@@ -22,22 +24,34 @@ struct DatasetHandle {
2224
#[no_mangle]
2325
pub unsafe extern "C" fn lance_open_dataset(path: *const c_char) -> *mut c_void {
2426
if path.is_null() {
27+
set_last_error(ErrorCode::InvalidArgument, "path is null");
2528
return ptr::null_mut();
2629
}
2730

2831
let path_str = unsafe {
2932
match CStr::from_ptr(path).to_str() {
3033
Ok(s) => s,
31-
Err(_) => return ptr::null_mut(),
34+
Err(err) => {
35+
set_last_error(ErrorCode::Utf8, err.to_string());
36+
return ptr::null_mut();
37+
}
3238
}
3339
};
3440

3541
let dataset = match runtime::block_on(Dataset::open(path_str)) {
3642
Ok(Ok(ds)) => Arc::new(ds),
37-
_ => return ptr::null_mut(),
43+
Ok(Err(err)) => {
44+
set_last_error(ErrorCode::DatasetOpen, err.to_string());
45+
return ptr::null_mut();
46+
}
47+
Err(err) => {
48+
set_last_error(ErrorCode::Runtime, err.to_string());
49+
return ptr::null_mut();
50+
}
3851
};
3952

4053
let handle = Box::new(DatasetHandle { dataset });
54+
clear_last_error();
4155

4256
Box::into_raw(handle) as *mut c_void
4357
}
@@ -55,6 +69,7 @@ pub unsafe extern "C" fn lance_close_dataset(dataset: *mut c_void) {
5569
#[no_mangle]
5670
pub unsafe extern "C" fn lance_get_schema(dataset: *mut c_void) -> *mut c_void {
5771
if dataset.is_null() {
72+
set_last_error(ErrorCode::InvalidArgument, "dataset is null");
5873
return ptr::null_mut();
5974
}
6075

@@ -63,6 +78,7 @@ pub unsafe extern "C" fn lance_get_schema(dataset: *mut c_void) -> *mut c_void {
6378

6479
let arrow_schema: Schema = schema.into();
6580

81+
clear_last_error();
6682
Box::into_raw(Box::new(Arc::new(arrow_schema))) as *mut c_void
6783
}
6884

@@ -81,6 +97,7 @@ pub unsafe extern "C" fn lance_schema_to_arrow(
8197
out_schema: *mut FFI_ArrowSchema,
8298
) -> i32 {
8399
if schema.is_null() || out_schema.is_null() {
100+
set_last_error(ErrorCode::InvalidArgument, "schema or out_schema is null");
84101
return -1;
85102
}
86103

@@ -89,26 +106,37 @@ pub unsafe extern "C" fn lance_schema_to_arrow(
89106

90107
let ffi_schema = match FFI_ArrowSchema::try_from(&data_type) {
91108
Ok(schema) => schema,
92-
Err(_) => return -1,
109+
Err(err) => {
110+
set_last_error(ErrorCode::SchemaExport, err.to_string());
111+
return -1;
112+
}
93113
};
94114

95115
std::ptr::write_unaligned(out_schema, ffi_schema);
116+
clear_last_error();
96117
0
97118
}
98119

99120
// Stream operations
100121
#[no_mangle]
101122
pub unsafe extern "C" fn lance_create_stream(dataset: *mut c_void) -> *mut c_void {
102123
if dataset.is_null() {
124+
set_last_error(ErrorCode::InvalidArgument, "dataset is null");
103125
return ptr::null_mut();
104126
}
105127

106128
let handle = unsafe { &*(dataset as *const DatasetHandle) };
107129

108130
let scanner = handle.dataset.scan();
109131
match LanceStream::from_scanner(scanner) {
110-
Ok(stream) => Box::into_raw(Box::new(stream)) as *mut c_void,
111-
Err(_) => ptr::null_mut(),
132+
Ok(stream) => {
133+
clear_last_error();
134+
Box::into_raw(Box::new(stream)) as *mut c_void
135+
}
136+
Err(err) => {
137+
set_last_error(ErrorCode::StreamCreate, err.to_string());
138+
ptr::null_mut()
139+
}
112140
}
113141
}
114142

@@ -118,6 +146,7 @@ pub unsafe extern "C" fn lance_dataset_list_fragments(
118146
out_len: *mut usize,
119147
) -> *mut u64 {
120148
if dataset.is_null() || out_len.is_null() {
149+
set_last_error(ErrorCode::InvalidArgument, "dataset or out_len is null");
121150
return ptr::null_mut();
122151
}
123152

@@ -132,6 +161,7 @@ pub unsafe extern "C" fn lance_dataset_list_fragments(
132161
unsafe {
133162
ptr::write_unaligned(out_len, len);
134163
}
164+
clear_last_error();
135165
data
136166
}
137167

@@ -155,18 +185,25 @@ pub unsafe extern "C" fn lance_create_fragment_stream(
155185
filter_sql: *const c_char,
156186
) -> *mut c_void {
157187
if dataset.is_null() {
188+
set_last_error(ErrorCode::InvalidArgument, "dataset is null");
158189
return ptr::null_mut();
159190
}
160191

161192
let handle = unsafe { &*(dataset as *const DatasetHandle) };
162193
let fragment_id_usize = match usize::try_from(fragment_id) {
163194
Ok(v) => v,
164-
Err(_) => return ptr::null_mut(),
195+
Err(err) => {
196+
set_last_error(ErrorCode::InvalidArgument, err.to_string());
197+
return ptr::null_mut();
198+
}
165199
};
166200

167201
let fragment = match handle.dataset.get_fragment(fragment_id_usize) {
168202
Some(f) => f,
169-
None => return ptr::null_mut(),
203+
None => {
204+
set_last_error(ErrorCode::FragmentScan, "fragment not found");
205+
return ptr::null_mut();
206+
}
170207
};
171208

172209
let mut scan = fragment.scan();
@@ -176,48 +213,76 @@ pub unsafe extern "C" fn lance_create_fragment_stream(
176213
for idx in 0..columns_len {
177214
let col_ptr = unsafe { *columns.add(idx) };
178215
if col_ptr.is_null() {
216+
set_last_error(ErrorCode::InvalidArgument, "column name is null");
179217
return ptr::null_mut();
180218
}
181219
let col_name = match unsafe { CStr::from_ptr(col_ptr) }.to_str() {
182220
Ok(v) => v,
183-
Err(_) => return ptr::null_mut(),
221+
Err(err) => {
222+
set_last_error(ErrorCode::Utf8, err.to_string());
223+
return ptr::null_mut();
224+
}
184225
};
185226
projection.push(col_name.to_string());
186227
}
187-
if scan.project(&projection).is_err() {
228+
if let Err(err) = scan.project(&projection) {
229+
set_last_error(ErrorCode::FragmentScan, err.to_string());
188230
return ptr::null_mut();
189231
}
190232
}
191233

192234
if !filter_sql.is_null() {
193235
let filter = match unsafe { CStr::from_ptr(filter_sql) }.to_str() {
194236
Ok(v) => v,
195-
Err(_) => return ptr::null_mut(),
237+
Err(err) => {
238+
set_last_error(ErrorCode::Utf8, err.to_string());
239+
return ptr::null_mut();
240+
}
196241
};
197-
if !filter.is_empty() && scan.filter(filter).is_err() {
198-
return ptr::null_mut();
242+
if !filter.is_empty() {
243+
if let Err(err) = scan.filter(filter) {
244+
set_last_error(ErrorCode::FragmentScan, err.to_string());
245+
return ptr::null_mut();
246+
}
199247
}
200248
}
201249

202250
scan.scan_in_order(false);
203251

204252
match LanceStream::from_scanner(scan) {
205-
Ok(stream) => Box::into_raw(Box::new(stream)) as *mut c_void,
206-
Err(_) => ptr::null_mut(),
253+
Ok(stream) => {
254+
clear_last_error();
255+
Box::into_raw(Box::new(stream)) as *mut c_void
256+
}
257+
Err(err) => {
258+
set_last_error(ErrorCode::StreamCreate, err.to_string());
259+
ptr::null_mut()
260+
}
207261
}
208262
}
209263

210264
#[no_mangle]
211265
pub unsafe extern "C" fn lance_stream_next(stream: *mut c_void) -> *mut c_void {
212266
if stream.is_null() {
267+
set_last_error(ErrorCode::InvalidArgument, "stream is null");
213268
return ptr::null_mut();
214269
}
215270

216271
let stream = unsafe { &mut *(stream as *mut LanceStream) };
217272

218273
match stream.next() {
219-
Some(batch) => Box::into_raw(Box::new(batch)) as *mut c_void,
220-
None => ptr::null_mut(),
274+
Ok(Some(batch)) => {
275+
clear_last_error();
276+
Box::into_raw(Box::new(batch)) as *mut c_void
277+
}
278+
Ok(None) => {
279+
clear_last_error();
280+
ptr::null_mut()
281+
}
282+
Err(err) => {
283+
set_last_error(ErrorCode::StreamNext, err.to_string());
284+
ptr::null_mut()
285+
}
221286
}
222287
}
223288

@@ -257,6 +322,7 @@ pub unsafe extern "C" fn lance_batch_to_arrow(
257322
out_schema: *mut FFI_ArrowSchema,
258323
) -> i32 {
259324
if batch.is_null() || out_array.is_null() || out_schema.is_null() {
325+
set_last_error(ErrorCode::InvalidArgument, "batch or out pointer is null");
260326
return -1;
261327
}
262328

@@ -269,11 +335,15 @@ pub unsafe extern "C" fn lance_batch_to_arrow(
269335
let array = FFI_ArrowArray::new(&data);
270336
let schema = match FFI_ArrowSchema::try_from(data.data_type()) {
271337
Ok(schema) => schema,
272-
Err(_) => return -1,
338+
Err(err) => {
339+
set_last_error(ErrorCode::BatchExport, err.to_string());
340+
return -1;
341+
}
273342
};
274343

275344
std::ptr::write_unaligned(out_array, array);
276345
std::ptr::write_unaligned(out_schema, schema);
277346

347+
clear_last_error();
278348
0
279349
}

rust/scanner.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,14 @@ impl LanceStream {
2424
}
2525

2626
/// Get the next batch from the stream
27-
pub fn next(&mut self) -> Option<RecordBatch> {
27+
pub fn next(&mut self) -> Result<Option<RecordBatch>, lance::Error> {
2828
use futures::StreamExt;
2929

3030
self.handle.block_on(async {
3131
match self.stream.next().await {
32-
Some(Ok(batch)) => Some(batch),
33-
_ => None,
32+
Some(Ok(batch)) => Ok(Some(batch)),
33+
Some(Err(err)) => Err(err),
34+
None => Ok(None),
3435
}
3536
})
3637
}

0 commit comments

Comments
 (0)