Skip to content

Commit c59e5fe

Browse files
STARGAZERSTARGAZER
authored andcommitted
refactor: harden V3 native async operation lifecycle
1 parent 304e80c commit c59e5fe

6 files changed

Lines changed: 328 additions & 145 deletions

File tree

docs/architecture/native-interop.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,22 @@ The public API is asynchronous, and the main V3 native operations use callback-n
7575

7676
Schema reads, partition planning, table creation, protocol upgrade, SQL DML operations, table/query/CDF/partition stream setup, and insert/merge setup use native async operation handles with completion notification. Managed code starts the relevant `*_async_with_callback` export, awaits a `TaskCompletionSource`, takes the result string with `dts_async_operation_take_result`, the schema with `dts_async_operation_take_schema`, or the stream with `dts_async_operation_take_stream` after the native callback fires, and releases the handle through `dts_async_operation_destroy`. Cancellation requests call `dts_async_operation_cancel` before managed code surfaces `OperationCanceledException`. This keeps the public API shapes unchanged while moving those one-shot operations onto the shared Tokio runtime instead of blocking the managed caller thread for the whole native operation.
7777

78+
Native async operations have a small state machine exposed through stable integer status values. Managed code mirrors those values with an internal enum and treats unknown values as native failures.
79+
80+
| Native state | Status | Result ownership |
81+
| --- | --- | --- |
82+
| `Pending` | `0` | Tokio task may still complete. No result, schema, stream, or error may be taken. |
83+
| `Succeeded` | `1` | JSON result is owned by Rust until `dts_async_operation_take_result` transfers it exactly once to managed code, which frees it with `dts_free_string`. |
84+
| `SucceededSchema` | `1` | Arrow schema is owned by Rust until `dts_async_operation_take_schema` writes it exactly once into caller-provided `FFI_ArrowSchema` storage. |
85+
| `SucceededStream` | `1` | Arrow stream reader is owned by Rust until `dts_async_operation_take_stream` writes it exactly once into caller-provided `FFI_ArrowArrayStream` storage. |
86+
| `Failed` | `2` | Error message pointer and error code are borrowed from the operation state and remain valid until the operation is destroyed or mutated. |
87+
| `Cancelled` | `3` | Error message pointer and `Cancelled` code are borrowed from the operation state. Managed cancellation detection prefers this typed code. |
88+
| destroyed | n/a | `dts_async_operation_destroy` clears the callback operation pointer and aborts any pending task. Late native completion must not invoke the managed callback. |
89+
90+
The native callback only signals that a terminal state is available; managed code must still query status and take the appropriate result. Result, schema, and stream take operations are single-use. Destroying an operation before completion suppresses late callbacks by clearing the stored operation pointer before aborting the task.
91+
92+
Native async task bodies catch panics and convert them to `Failed` with an internal error code before notifying the callback. This keeps managed callers from waiting forever if a native async task exits abnormally before producing a normal result.
93+
7894
For async insert and merge, Rust imports the caller-provided Arrow C Stream before spawning the write task. Managed code keeps both the exported `IArrowArrayStream` adapter and the `CArrowArrayStream` storage alive until native completion is signaled, then disposes and frees them. Cancellation waits for the aborted native task to drop the imported reader before notifying managed code, which prevents the native writer from reading through released managed stream state while still avoiding a blocking write FFI call.
7995

8096
Synchronous Rust C ABI exports are retained for native ABI compatibility, direct Rust unit coverage, and diagnostics. Managed SDK production paths should prefer the callback exports for operations with meaningful native work.
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
namespace DeltaLakeSharp.Client.Internal.Native
5+
{
6+
/// <summary>
7+
/// Stable native async operation status values exposed by the Rust V3 ABI.
8+
/// </summary>
9+
/// <remarks>
10+
/// Numeric values must match ASYNC_OPERATION_* constants in src/DeltaLakeSharp.Server/v3/src/interop/native.rs.
11+
/// </remarks>
12+
internal enum NativeAsyncOperationStatus
13+
{
14+
Pending = 0,
15+
Succeeded = 1,
16+
Failed = 2,
17+
Cancelled = 3,
18+
}
19+
}

src/DeltaLakeSharp.Client/Internal/NativeRustBackend.cs

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,6 @@ namespace DeltaLakeSharp.Client.Internal
2626
/// </summary>
2727
internal sealed class NativeRustBackend : IDeltaLakeBackend
2828
{
29-
private const int NativeAsyncOperationPending = 0;
30-
private const int NativeAsyncOperationSucceeded = 1;
31-
private const int NativeAsyncOperationFailed = 2;
32-
private const int NativeAsyncOperationCancelled = 3;
33-
3429
private static readonly NativeMethods.NativeAsyncOperationCompletedCallback NativeAsyncOperationCompleted = OnNativeAsyncOperationCompleted;
3530

3631
private delegate IntPtr StartNativeAsyncOperationWithCallback(
@@ -1365,14 +1360,15 @@ public void Complete(IntPtr operation)
13651360

13661361
try
13671362
{
1368-
int status = NativeMethods.AsyncOperationStatus(operation);
1363+
int rawStatus = NativeMethods.AsyncOperationStatus(operation);
1364+
var status = (NativeAsyncOperationStatus)rawStatus;
13691365
switch (status)
13701366
{
1371-
case NativeAsyncOperationSucceeded:
1367+
case NativeAsyncOperationStatus.Succeeded:
13721368
_completion.TrySetResult(_takeResult(operation));
13731369
break;
13741370

1375-
case NativeAsyncOperationFailed:
1371+
case NativeAsyncOperationStatus.Failed:
13761372
if (_cancellationToken.IsCancellationRequested && IsNativeCancellationFailure(operation))
13771373
{
13781374
_completion.TrySetCanceled(_cancellationToken);
@@ -1384,7 +1380,7 @@ public void Complete(IntPtr operation)
13841380
_operationName));
13851381
break;
13861382

1387-
case NativeAsyncOperationCancelled:
1383+
case NativeAsyncOperationStatus.Cancelled:
13881384
if (_cancellationToken.CanBeCanceled)
13891385
{
13901386
_completion.TrySetCanceled(_cancellationToken);
@@ -1395,14 +1391,14 @@ public void Complete(IntPtr operation)
13951391
}
13961392
break;
13971393

1398-
case NativeAsyncOperationPending:
1394+
case NativeAsyncOperationStatus.Pending:
13991395
_completion.TrySetException(new InvalidOperationException(
14001396
$"Native async {_nativeOperationName} completion was signaled before terminal state was available."));
14011397
break;
14021398

14031399
default:
14041400
_completion.TrySetException(new InvalidOperationException(
1405-
$"Native async {_nativeOperationName} returned unknown status {status}."));
1401+
$"Native async {_nativeOperationName} returned unknown status {rawStatus}."));
14061402
break;
14071403
}
14081404
}

src/DeltaLakeSharp.Server/v3/src/error.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,8 +144,10 @@ mod tests {
144144
);
145145
assert_eq!(
146146
ServiceErrorCode::Delta,
147-
ServiceError::Delta(deltalake::DeltaTableError::Generic("delta failure".to_string()))
148-
.code()
147+
ServiceError::Delta(deltalake::DeltaTableError::Generic(
148+
"delta failure".to_string()
149+
))
150+
.code()
149151
);
150152
assert_eq!(
151153
ServiceErrorCode::DataFusion,

0 commit comments

Comments
 (0)