Skip to content

Commit 7af2176

Browse files
authored
Fix intermittent SIGSEGV in cross-platform FFI plugin tests (drasi-project#376)
ffi hardening
1 parent 2b4cd05 commit 7af2176

7 files changed

Lines changed: 650 additions & 338 deletions

File tree

components/host-sdk/src/identity_bridge.rs

Lines changed: 51 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -44,57 +44,66 @@ impl IdentityProviderVtableBuilder {
4444
context_json: *const u8,
4545
context_len: usize,
4646
) -> FfiCredentialsResult {
47-
let wrapper = unsafe { &*(state as *const HostIdentityProviderState) };
48-
let provider = wrapper.provider.clone();
47+
std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
48+
let wrapper = unsafe { &*(state as *const HostIdentityProviderState) };
49+
let provider = wrapper.provider.clone();
4950

50-
// Deserialize context from JSON
51-
let context = if context_json.is_null() || context_len == 0 {
52-
CredentialContext::default()
53-
} else {
54-
let json_bytes = unsafe { std::slice::from_raw_parts(context_json, context_len) };
55-
let json_str = std::str::from_utf8(json_bytes).unwrap_or("{}");
56-
let properties: std::collections::HashMap<String, String> =
57-
match serde_json::from_str(json_str) {
58-
Ok(p) => p,
59-
Err(e) => {
60-
log::warn!("Failed to deserialize credential context JSON: {e}");
61-
std::collections::HashMap::new()
62-
}
63-
};
64-
CredentialContext { properties }
65-
};
51+
// Deserialize context from JSON
52+
let context = if context_json.is_null() || context_len == 0 {
53+
CredentialContext::default()
54+
} else {
55+
let json_bytes =
56+
unsafe { std::slice::from_raw_parts(context_json, context_len) };
57+
let json_str = std::str::from_utf8(json_bytes).unwrap_or("{}");
58+
let properties: std::collections::HashMap<String, String> =
59+
match serde_json::from_str(json_str) {
60+
Ok(p) => p,
61+
Err(e) => {
62+
log::warn!("Failed to deserialize credential context JSON: {e}");
63+
std::collections::HashMap::new()
64+
}
65+
};
66+
CredentialContext { properties }
67+
};
6668

67-
// This vtable is called from the plugin side (extern "C") which may
68-
// already be inside a tokio runtime. We spawn a thread and create a
69-
// lightweight current-thread runtime to avoid nesting runtimes.
70-
let result = std::thread::spawn(move || {
71-
let rt = tokio::runtime::Builder::new_current_thread()
72-
.enable_all()
73-
.build()
74-
.map_err(|e| anyhow::anyhow!("Failed to create runtime: {e}"))?;
75-
rt.block_on(provider.get_credentials(&context))
76-
})
77-
.join();
69+
// This vtable is called from the plugin side (extern "C") which may
70+
// already be inside a tokio runtime. We spawn a thread and create a
71+
// lightweight current-thread runtime to avoid nesting runtimes.
72+
let result = std::thread::spawn(move || {
73+
let rt = tokio::runtime::Builder::new_current_thread()
74+
.enable_all()
75+
.build()
76+
.map_err(|e| anyhow::anyhow!("Failed to create runtime: {e}"))?;
77+
rt.block_on(provider.get_credentials(&context))
78+
})
79+
.join();
7880

79-
match result {
80-
Ok(Ok(creds)) => FfiCredentialsResult::ok(credentials_to_ffi(creds)),
81-
Ok(Err(e)) => FfiCredentialsResult::err(e.to_string()),
82-
Err(_) => FfiCredentialsResult::err("get_credentials thread panicked".into()),
83-
}
81+
match result {
82+
Ok(Ok(creds)) => FfiCredentialsResult::ok(credentials_to_ffi(creds)),
83+
Ok(Err(e)) => FfiCredentialsResult::err(e.to_string()),
84+
Err(_) => FfiCredentialsResult::err("get_credentials thread panicked".into()),
85+
}
86+
}))
87+
.unwrap_or_else(|_| FfiCredentialsResult::err("get_credentials_fn panicked".into()))
8488
}
8589

8690
extern "C" fn clone_fn(state: *const c_void) -> *mut c_void {
87-
let wrapper = unsafe { &*(state as *const HostIdentityProviderState) };
88-
let cloned = Box::new(HostIdentityProviderState {
89-
provider: wrapper.provider.clone(),
90-
});
91-
Box::into_raw(cloned) as *mut c_void
91+
std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
92+
let wrapper = unsafe { &*(state as *const HostIdentityProviderState) };
93+
let cloned = Box::new(HostIdentityProviderState {
94+
provider: wrapper.provider.clone(),
95+
});
96+
Box::into_raw(cloned) as *mut c_void
97+
}))
98+
.unwrap_or(std::ptr::null_mut())
9299
}
93100

94101
extern "C" fn drop_fn(state: *mut c_void) {
95-
if !state.is_null() {
96-
unsafe { drop(Box::from_raw(state as *mut HostIdentityProviderState)) };
97-
}
102+
let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
103+
if !state.is_null() {
104+
unsafe { drop(Box::from_raw(state as *mut HostIdentityProviderState)) };
105+
}
106+
}));
98107
}
99108

100109
let wrapper = Box::new(HostIdentityProviderState { provider });

components/host-sdk/src/proxies/bootstrap_provider.rs

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -155,26 +155,30 @@ fn build_ffi_bootstrap_sender(event_tx: BootstrapEventSender) -> FfiBootstrapSen
155155
let state = Box::into_raw(Box::new(std_tx)) as *mut c_void;
156156

157157
extern "C" fn send_fn(state: *mut c_void, event: *mut FfiBootstrapEvent) -> i32 {
158-
let tx = unsafe { &*(state as *const std::sync::mpsc::Sender<BootstrapEvent>) };
159-
if event.is_null() {
160-
return -1;
161-
}
162-
let ffi_event = unsafe { &*event };
163-
let bootstrap_event = unsafe { *Box::from_raw(ffi_event.opaque as *mut BootstrapEvent) };
164-
// Free the FFI envelope but not the opaque (we took ownership)
165-
unsafe { drop(Box::from_raw(event)) };
166-
match tx.send(bootstrap_event) {
167-
Ok(()) => 0,
168-
Err(_) => -1,
169-
}
158+
std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
159+
let tx = unsafe { &*(state as *const std::sync::mpsc::Sender<BootstrapEvent>) };
160+
if event.is_null() {
161+
return -1;
162+
}
163+
let ffi_event = unsafe { &*event };
164+
let bootstrap_event =
165+
unsafe { *Box::from_raw(ffi_event.opaque as *mut BootstrapEvent) };
166+
// Free the FFI envelope but not the opaque (we took ownership)
167+
unsafe { drop(Box::from_raw(event)) };
168+
match tx.send(bootstrap_event) {
169+
Ok(()) => 0,
170+
Err(_) => -1,
171+
}
172+
}))
173+
.unwrap_or(-1)
170174
}
171175

172176
extern "C" fn drop_fn(state: *mut c_void) {
173-
unsafe {
177+
let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| unsafe {
174178
drop(Box::from_raw(
175179
state as *mut std::sync::mpsc::Sender<BootstrapEvent>,
176180
))
177-
};
181+
}));
178182
}
179183

180184
FfiBootstrapSender {

components/host-sdk/src/proxies/reaction.rs

Lines changed: 43 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,16 @@ pub struct ReactionProxy {
3636
_library: Arc<Library>,
3737
cached_id: String,
3838
cached_type_name: String,
39+
/// Per-instance callback context for plugin-emitted log/lifecycle callbacks.
40+
///
41+
/// Stored as an `Arc` whose strong count was bumped by `Arc::into_raw` when
42+
/// the raw pointer was handed to the plugin. The host's `Arc` is kept here
43+
/// so the proxy holds at least two strong references; on Drop the host's
44+
/// `Arc` is `mem::forget`-ed unconditionally so any **late** log/lifecycle
45+
/// callback emitted by the plugin (after `stop()` returns) still finds a
46+
/// valid pointer. The cdylib itself is intentionally leaked process-wide
47+
/// (see `host-sdk/src/loader.rs`), so the small per-instance `Arc` leak is
48+
/// acceptable in exchange for closing the late-callback UAF window.
3949
_callback_ctx: std::sync::Mutex<Option<Arc<crate::callbacks::InstanceCallbackContext>>>,
4050
/// Channel for push-based result delivery. Created on start, closed on stop/drop.
4151
result_tx:
@@ -195,7 +205,13 @@ impl Reaction for ReactionProxy {
195205
update_tx: context.update_tx.clone(),
196206
});
197207

198-
let ctx_ptr = Arc::as_ptr(&per_instance_ctx) as *mut c_void;
208+
// Bug C fix: hand the plugin a strong reference (Arc::into_raw bumps
209+
// the refcount) so log/lifecycle callbacks emitted late by the plugin
210+
// (e.g. from inside stop_fn or from internal tasks shutting down) do
211+
// not deref freed memory. The matching `mem::forget` happens in Drop
212+
// and intentionally leaks one strong ref per instance.
213+
let ctx_for_plugin = per_instance_ctx.clone();
214+
let ctx_ptr = Arc::into_raw(ctx_for_plugin) as *mut c_void;
199215

200216
if let Ok(mut guard) = self._callback_ctx.lock() {
201217
*guard = Some(per_instance_ctx);
@@ -257,19 +273,18 @@ impl Reaction for ReactionProxy {
257273
}
258274

259275
async fn stop(&self) -> anyhow::Result<()> {
260-
// Close the sender so the forwarder's callback returns null
276+
// Bug B fix: close ONLY the sender. Dropping the sender is sufficient
277+
// to unblock the forwarder's `rx.recv()` (it returns Err, the callback
278+
// returns null, and the forwarder breaks). Do NOT also drop the
279+
// receiver here — the callback may still be holding `context.rx.lock()`
280+
// for a recv() that is racing this stop, and removing the receiver
281+
// mid-flight creates a race against the forwarder's in-flight
282+
// `enqueue_query_result(qr).await` against the reaction's
283+
// shutting-down priority queue.
261284
{
262285
let mut guard = self.result_tx.lock().expect("result_tx lock poisoned");
263286
*guard = None;
264287
}
265-
// Also drop the receiver to unblock the callback if it's blocked in recv()
266-
if let Ok(guard) = self._push_ctx.lock() {
267-
if let Some(ref ctx) = *guard {
268-
if let Ok(mut rx_guard) = ctx.rx.lock() {
269-
*rx_guard = None;
270-
}
271-
}
272-
}
273288

274289
let state = drasi_plugin_sdk::ffi::SendMutPtr(self.vtable.state);
275290
let stop_fn = self.vtable.stop_fn;
@@ -327,15 +342,10 @@ impl Drop for ReactionProxy {
327342
if let Ok(mut guard) = self.result_tx.lock() {
328343
*guard = None;
329344
}
330-
// Also drop the receiver inside the push context to unblock the callback
331-
// if it's blocked in recv() (belt-and-suspenders with the sender drop).
332-
if let Ok(guard) = self._push_ctx.lock() {
333-
if let Some(ref ctx) = *guard {
334-
if let Ok(mut rx_guard) = ctx.rx.lock() {
335-
*rx_guard = None;
336-
}
337-
}
338-
}
345+
// Bug B fix: do NOT drop the receiver here. Sender drop alone unblocks
346+
// recv(); leaving the receiver in place avoids racing a callback that
347+
// is currently holding `context.rx.lock()`. The receiver lives until
348+
// the leaked push-ctx Arc is collected (see the `mem::forget` below).
339349

340350
// Wait for the forwarder task to fully exit its processing loop.
341351
//
@@ -381,6 +391,20 @@ impl Drop for ReactionProxy {
381391
std::mem::forget(ctx);
382392
}
383393
}
394+
395+
// Bug C fix: leak the per-instance callback context Arc unconditionally.
396+
// The strong reference handed to the plugin via `Arc::into_raw` in
397+
// initialize() is never reclaimed — late log/lifecycle callbacks
398+
// emitted by the plugin (during stop_fn or from internal tasks) must
399+
// still find a valid pointer. The cdylib itself is intentionally
400+
// leaked process-wide (see host-sdk/src/loader.rs), so this small
401+
// per-instance Arc leak is the price of closing the late-callback
402+
// UAF window.
403+
if let Ok(mut guard) = self._callback_ctx.lock() {
404+
if let Some(ctx) = guard.take() {
405+
std::mem::forget(ctx);
406+
}
407+
}
384408
}
385409
}
386410

components/host-sdk/src/proxies/source.rs

Lines changed: 44 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -40,23 +40,30 @@ use crate::state_store_bridge::StateStoreVtableBuilder;
4040
/// Runs the pinned future on a new OS thread with a current-thread tokio runtime.
4141
/// This avoids nesting issues with the host's multi-thread runtime.
4242
extern "C" fn host_executor(future_ptr: *mut c_void) -> *mut c_void {
43-
// Wrap the raw pointer to make it Send-safe for std::thread::spawn
44-
let send_ptr = drasi_plugin_sdk::ffi::SendMutPtr(future_ptr);
45-
let result = std::thread::spawn(move || {
46-
let boxed_future = unsafe {
47-
Box::from_raw(send_ptr.as_ptr()
48-
as *mut std::pin::Pin<Box<dyn std::future::Future<Output = *mut c_void>>>)
49-
};
50-
let rt = tokio::runtime::Builder::new_current_thread()
51-
.enable_all()
52-
.build()
53-
.expect("failed to build tokio runtime for source proxy");
54-
// Wrap the result in SendMutPtr to satisfy Send bound
55-
drasi_plugin_sdk::ffi::SendMutPtr(rt.block_on(*boxed_future))
56-
})
57-
.join()
58-
.expect("host executor thread panicked");
59-
result.as_ptr()
43+
std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
44+
// Wrap the raw pointer to make it Send-safe for std::thread::spawn
45+
let send_ptr = drasi_plugin_sdk::ffi::SendMutPtr(future_ptr);
46+
let result = std::thread::spawn(move || {
47+
let boxed_future = unsafe {
48+
Box::from_raw(send_ptr.as_ptr()
49+
as *mut std::pin::Pin<Box<dyn std::future::Future<Output = *mut c_void>>>)
50+
};
51+
let rt = match tokio::runtime::Builder::new_current_thread()
52+
.enable_all()
53+
.build()
54+
{
55+
Ok(rt) => rt,
56+
Err(_) => return drasi_plugin_sdk::ffi::SendMutPtr(std::ptr::null_mut()),
57+
};
58+
// Wrap the result in SendMutPtr to satisfy Send bound
59+
drasi_plugin_sdk::ffi::SendMutPtr(rt.block_on(*boxed_future))
60+
})
61+
.join()
62+
.map(|p| p.as_ptr())
63+
.unwrap_or(std::ptr::null_mut());
64+
result
65+
}))
66+
.unwrap_or(std::ptr::null_mut())
6067
}
6168

6269
/// Wraps a `SourceVtable` into a DrasiLib `Source` trait implementation.
@@ -265,10 +272,15 @@ impl Source for SourceProxy {
265272
update_tx: context.update_tx.clone(),
266273
});
267274

268-
// Use as_ptr (no refcount increment) — the Arc in _callback_ctx keeps
269-
// the context alive for the proxy's lifetime. This avoids the memory
270-
// leak that Arc::into_raw would cause (no matching from_raw).
271-
let ctx_ptr = Arc::as_ptr(&per_instance_ctx) as *mut c_void;
275+
// Bug C fix: hand the plugin a strong reference (Arc::into_raw bumps
276+
// the refcount) so log/lifecycle callbacks emitted late by the plugin
277+
// (e.g. from inside stop_fn or from internal tasks shutting down) do
278+
// not deref freed memory. The matching `mem::forget` happens in Drop
279+
// and intentionally leaks one strong ref per instance — acceptable
280+
// because the cdylib itself is intentionally process-leaked (see
281+
// host-sdk/src/loader.rs).
282+
let ctx_for_plugin = per_instance_ctx.clone();
283+
let ctx_ptr = Arc::into_raw(ctx_for_plugin) as *mut c_void;
272284

273285
// Store the Arc so it stays alive as long as this proxy
274286
if let Ok(mut guard) = self._callback_ctx.lock() {
@@ -316,6 +328,17 @@ impl Drop for SourceProxy {
316328
let drop_fn = self.vtable.drop_fn;
317329
let state = drasi_plugin_sdk::ffi::SendMutPtr(self.vtable.state);
318330
let _ = std::thread::spawn(move || (drop_fn)(state.as_ptr())).join();
331+
332+
// Bug C fix: leak the per-instance callback context Arc unconditionally.
333+
// The strong reference handed to the plugin via `Arc::into_raw` in
334+
// initialize() is never reclaimed — late log/lifecycle callbacks
335+
// emitted by the plugin (during stop_fn or from internal tasks) must
336+
// still find a valid pointer. Matches the pattern in ReactionProxy.
337+
if let Ok(mut guard) = self._callback_ctx.lock() {
338+
if let Some(ctx) = guard.take() {
339+
std::mem::forget(ctx);
340+
}
341+
}
319342
}
320343
}
321344

0 commit comments

Comments
 (0)