Skip to content

Commit 9f2c551

Browse files
Cappu7inoSTARGAZER
andauthored
Complete V3 native async operation migration (#16)
* Add native async partition planning * Generalize native async JSON operations * Harden native async operation lifecycle * Add async native read stream setup * Add async native query stream setup * Add async native CDF stream setup * Add async native partition stream setup * Add async native insert setup * Add async native merge setup * Add async native schema setup * Clean up managed native sync bindings --------- Co-authored-by: STARGAZER <stargazer@XingdeMacBook-Air.local>
1 parent 2a407fa commit 9f2c551

12 files changed

Lines changed: 3226 additions & 372 deletions

File tree

docs/ai/limitations.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ This file is intentionally direct. Use it to prevent incorrect SDK integrations
3939

4040
- The public APIs are asynchronous, but backend execution does not imply unlimited parallelism.
4141
- Flight clients own a gRPC channel per `DeltaTableServiceClient` instance.
42-
- V3 native calls cross a synchronous FFI boundary and use a native engine handle.
42+
- V3 native uses a native engine handle; main operations use callback-notified async operation handles, while batch pulls and imported write sources still follow Arrow C Stream ownership rules.
4343
- Avoid issuing multiple concurrent native operations through the same client unless the caller has validated backend behavior for that scenario.
4444
- Partitioned reads are the preferred pattern for independent parallel reads on V3.
4545

docs/ai/overview.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ Most SDK operations are asynchronous and stream Arrow `RecordBatch` values. Help
5858
- Treat `IAsyncEnumerable<RecordBatch>` and `IArrowArrayStream` results as streaming resources.
5959
- Avoid buffering full tables unless data volume is known to be small.
6060
- Use per-request storage options for SAS/OneLake authentication instead of global mutable state.
61-
- For V3 native operations, assume a synchronous FFI boundary inside the backend call path even when the public API is asynchronous.
61+
- For V3 native operations, schema reads, partition planning, JSON-result table operations, table/query/CDF/partition stream setup, and insert/merge setup use callback-notified native async operation handles. Batch consumption and write source consumption still use Arrow C Stream ownership rules.
6262

6363
## Major Tradeoffs
6464

docs/architecture/cdf-and-partitioned-reads.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,11 @@ Constraints:
3030
- The table must have CDF enabled.
3131
- V1/V2 Flight wrappers throw for CDF APIs.
3232

33+
Runtime behavior:
34+
35+
- V3 CDF stream setup uses the native async operation handle path, matching table and SQL stream setup.
36+
- Batch consumption remains Arrow C Stream based; consumers should dispose stream and reader resources promptly.
37+
3338
## CDF Result Shape
3439

3540
CDF rows include regular table columns plus metadata such as:
@@ -64,6 +69,12 @@ Descriptor fields:
6469
- total partition count
6570
- file count
6671

72+
Runtime behavior:
73+
74+
- V3 partition planning returns JSON through the native async operation handle path.
75+
- V3 partition stream setup also uses native async operation handles before returning an Arrow C Stream.
76+
- Partition stream batch consumption remains Arrow C Stream based.
77+
6778
## Partition Token Rules
6879

6980
- Tokens are opaque.

docs/architecture/execution-model.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,11 +66,14 @@ Properties:
6666
- Uses Arrow C Data/C Stream interfaces for schemas and batches.
6767
- Uses direct Arrow C Stream pulls by default, with bounded Rust-side prefetch available through `DeltaTableServiceClientOptions.EnableNativeReadPrefetch`.
6868
- Owns a native engine handle through a `SafeHandle` wrapper.
69+
- Uses a native async operation `SafeHandle` for V3 schema reads, partition planning, JSON-result table operations, read stream setup, and insert/merge setup.
6970
- Shares one process-wide Tokio runtime across native engine handles.
7071
- Avoids the Flight service boundary but requires native runtime assets.
7172

7273
Each `NativeRustBackend` still owns its native engine handle and per-engine error state. The shared runtime reduces thread and stack reservation overhead when multiple V3 clients are created in the same process. Native merge work is scheduled onto Tokio worker threads so deep delta-rs/DataFusion merge execution does not run on the .NET caller stack.
7374

75+
Schema reads, `GetReadPartitionsAsync`, table creation, protocol upgrade, SQL DML operations, table/query/CDF/partition stream setup, and insert/merge setup start callback-notified native operations on the shared Tokio runtime. The managed backend awaits a `TaskCompletionSource`, takes the schema, JSON result, or Arrow stream once after native completion is signaled, and cancels the native operation if the managed cancellation token is signaled. The public API and result models are unchanged.
76+
7477
When read-stream prefetch is enabled, production is bounded in two ways: each exported stream has a small native queue, and active backend batch production is capped process-wide. These limits provide backpressure for high-concurrency readers without changing the public `IAsyncEnumerable<RecordBatch>` or `IArrowArrayStream` shapes. The default read path remains direct batch pulling because local benchmarks showed prefetch overhead can dominate small/local reads.
7578

7679
## Streaming First

docs/architecture/native-interop.md

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ Managed V3 execution flows through:
1010

1111
- [../../src/DeltaLakeSharp.Client/Internal/NativeRustBackend.cs](../../src/DeltaLakeSharp.Client/Internal/NativeRustBackend.cs)
1212
- [../../src/DeltaLakeSharp.Client/Internal/Native/NativeEngineHandle.cs](../../src/DeltaLakeSharp.Client/Internal/Native/NativeEngineHandle.cs)
13+
- [../../src/DeltaLakeSharp.Client/Internal/Native/NativeAsyncOperationHandle.cs](../../src/DeltaLakeSharp.Client/Internal/Native/NativeAsyncOperationHandle.cs)
1314
- [../../src/DeltaLakeSharp.Client/Internal/Native/NativeMethods.net8.cs](../../src/DeltaLakeSharp.Client/Internal/Native/NativeMethods.net8.cs)
1415
- [../../src/DeltaLakeSharp.Client/Internal/Native/NativeMethods.net472.cs](../../src/DeltaLakeSharp.Client/Internal/Native/NativeMethods.net472.cs)
1516

@@ -43,9 +44,10 @@ Native merge work runs on Tokio worker threads instead of polling the whole merg
4344
| Data | Representation | Ownership Rule |
4445
| --- | --- | --- |
4546
| command metadata | JSON string | Managed code builds command payload; Rust parses it. |
46-
| schema | Arrow C Data schema | Managed code imports schema and frees temporary native structures. |
47+
| schema | Arrow C Data schema | Managed code imports schema and frees temporary native structures; async schema reads take the schema result exactly once. |
4748
| read batches | Arrow C Stream | Imported managed stream owns the release callback; Rust can use bounded prefetch behind the stream when enabled. |
48-
| write batches | Arrow C Stream | Managed stream is exported to Rust for operation duration. |
49+
| write batches | Arrow C Stream | Managed stream is exported to Rust for operation duration; async insert and merge keep the exported stream and native storage alive until completion notification. |
50+
| one-shot async operation | native operation pointer | Managed code awaits a `TaskCompletionSource`, takes the owned result string or Arrow stream once after native completion notification, and destroys the operation handle. |
4951
| string results | native string pointer | Managed code frees returned native strings. |
5052

5153
## Native Library Discovery
@@ -69,7 +71,13 @@ Common failure modes:
6971

7072
## Concurrency Expectations
7173

72-
The public API is asynchronous, but V3 crosses a synchronous FFI boundary for native calls. Do not assume unlimited parallelism through a single client instance. For parallel reads, prefer V3 partition planning and independent partition consumption.
74+
The public API is asynchronous, and the main V3 native operations use callback-notified native operation handles. Do not assume unlimited parallelism through a single client instance. For parallel reads, prefer V3 partition planning and independent partition consumption.
75+
76+
Schema reads, partition planning, table creation, protocol upgrade, SQL DML operations, table/query/CDF/partition stream setup, and insert/merge setup use native async operation handles with completion notification. Managed code starts the relevant `*_async_with_callback` export, awaits a `TaskCompletionSource`, takes the result string with `dts_async_operation_take_result`, the schema with `dts_async_operation_take_schema`, or the stream with `dts_async_operation_take_stream` after the native callback fires, and releases the handle through `dts_async_operation_destroy`. Cancellation requests call `dts_async_operation_cancel` before managed code surfaces `OperationCanceledException`. This keeps the public API shapes unchanged while moving those one-shot operations onto the shared Tokio runtime instead of blocking the managed caller thread for the whole native operation.
77+
78+
For async insert and merge, Rust imports the caller-provided Arrow C Stream before spawning the write task. Managed code keeps both the exported `IArrowArrayStream` adapter and the `CArrowArrayStream` storage alive until native completion is signaled, then disposes and frees them. Cancellation waits for the aborted native task to drop the imported reader before notifying managed code, which prevents the native writer from reading through released managed stream state while still avoiding a blocking write FFI call.
79+
80+
Synchronous Rust C ABI exports are retained for native ABI compatibility, direct Rust unit coverage, and diagnostics. Managed SDK production paths should prefer the callback exports for operations with meaningful native work.
7381

7482
By default, V3 read streams pull each batch through the Arrow C Stream callback and synchronously bridge to the async DataFusion stream. `DeltaTableServiceClientOptions.EnableNativeReadPrefetch` enables an experimental prefetch mode that places a small Rust-owned bounded queue behind the exported Arrow C Stream. In that mode, a Tokio producer task advances the DataFusion stream and sends ready batch results into the queue, while the Arrow C Stream pull side drains queued batches. The queue is bounded per stream, and native read production is guarded by a process-wide active-production limit so full per-stream queues do not monopolize global read capacity.
7583

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
using System;
5+
using System.Runtime.InteropServices;
6+
7+
namespace DeltaLakeSharp.Client.Internal.Native
8+
{
9+
internal sealed class NativeAsyncOperationHandle : SafeHandle
10+
{
11+
public NativeAsyncOperationHandle()
12+
: base(IntPtr.Zero, ownsHandle: true)
13+
{
14+
}
15+
16+
public override bool IsInvalid => handle == IntPtr.Zero;
17+
18+
public static NativeAsyncOperationHandle FromIntPtr(IntPtr ptr)
19+
{
20+
if (ptr == IntPtr.Zero)
21+
{
22+
throw new InvalidOperationException("Failed to create native async operation.");
23+
}
24+
25+
var handle = new NativeAsyncOperationHandle();
26+
handle.SetHandle(ptr);
27+
return handle;
28+
}
29+
30+
protected override bool ReleaseHandle()
31+
{
32+
NativeMethods.AsyncOperationDestroy(handle);
33+
return true;
34+
}
35+
}
36+
}

src/DeltaLakeSharp.Client/Internal/Native/NativeMethods.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@ internal static partial class NativeMethods
1212
internal const string LibraryName = "delta_table_service_native";
1313
private static IntPtr _loadedHandle;
1414

15+
[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
16+
internal delegate void NativeAsyncOperationCompletedCallback(IntPtr operation, IntPtr userData);
17+
1518
private static string[] GetCandidateLibraryPaths()
1619
{
1720
string fileName = GetPlatformLibraryFileName();

0 commit comments

Comments
 (0)