Skip to content

Commit 5fc6e77

Browse files
authored
Add native ADBC FlightSQL driver support for osx-arm64, fix ADBC resources disposal (#16)
* Add native ADBC FlightSQL driver for osx-arm64 Include platform-specific driver loading logic and update tests to use explicit FlightSQL endpoints. * Use WithSpiceCloud in test client builders * Dispose query result streams and keep statements alive - Wrap query result streams to ensure the underlying ADBC statement remains alive for the stream's lifetime and is disposed correctly. - Update integration tests to use 'using' for result streams. - Add StatementBoundArrowArrayStream to manage disposal order. * Remove continue-on-error from Test step in CI workflow * Handle inconsistent schema exception for empty result sets * Throw on access to Schema after disposal
1 parent ff5f092 commit 5fc6e77

7 files changed

Lines changed: 404 additions & 196 deletions

File tree

.github/workflows/pr.yaml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,6 @@ jobs:
7676
- name: Build
7777
run: dotnet build --no-restore --configuration Release
7878
- name: Test
79-
continue-on-error: ${{ startsWith(matrix.os, 'macos') }}
8079
env:
8180
SCP_SPICEAI_TPCH_API_KEY: ${{ secrets.SCP_SPICEAI_TPCH_API_KEY }}
8281
run: dotnet test --no-build --verbosity normal --configuration Release --framework ${{ matrix.dotnet-version == '8.0.x' && 'net8.0' || matrix.dotnet-version == '9.0.x' && 'net9.0' || 'net10.0' }}

Spice/Spice.csproj

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
<Project Sdk="Microsoft.NET.Sdk">
1+
<Project Sdk="Microsoft.NET.Sdk">
22

33
<PropertyGroup>
44
<PackageId>SpiceAI</PackageId>
@@ -28,6 +28,18 @@
2828
<None Include="..\README.md" Pack="true" PackagePath="\" />
2929
</ItemGroup>
3030

31+
<!--
32+
Include native ADBC FlightSQL driver for osx-arm64.
33+
This platform is not included in the upstream Apache.Arrow.Adbc.Drivers.Interop.FlightSql NuGet package.
34+
NuGet/MSBuild automatically copies the correct RID-specific binary to output at build time.
35+
36+
Native library source: Apache Arrow ADBC 1.9.0
37+
SHA-256 (wheel): edee43183625acb2fd2e2b8331f07ac516b6958ef1f27663f6308334db7f7272
38+
-->
39+
<ItemGroup>
40+
<None Include="runtimes\**\*" Pack="true" PackagePath="runtimes" CopyToOutputDirectory="PreserveNewest" Link="runtimes\%(RecursiveDir)%(Filename)%(Extension)" />
41+
</ItemGroup>
42+
3143
<ItemGroup>
3244
<PackageReference Include="Apache.Arrow" Version="22.1.0" />
3345
<PackageReference Include="Apache.Arrow.Adbc" Version="0.21.0" />
@@ -43,5 +55,4 @@
4355
</AssemblyAttribute>
4456
</ItemGroup>
4557

46-
47-
</Project>
58+
</Project>
Binary file not shown.

Spice/src/Adbc/SpiceAdbcClient.cs

Lines changed: 107 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
2020
SOFTWARE.
2121
*/
2222

23+
using System.Runtime.InteropServices;
2324
using Apache.Arrow;
2425
using Apache.Arrow.Adbc;
25-
using Apache.Arrow.Adbc.Drivers.Interop.FlightSql;
2626
using Apache.Arrow.Ipc;
2727
using Apache.Arrow.Types;
2828
using Polly.Retry;
@@ -114,23 +114,36 @@ private void InitializeIfNeeded()
114114
return;
115115
}
116116

117-
// Format the URI for ADBC FlightSQL Go driver
118-
// The Go-based driver handles grpc/grpc+tls schemes natively
117+
// Format the URI for ADBC FlightSQL driver
118+
// The driver expects grpc:// or grpc+tls:// schemes
119119
var uri = _flightAddress;
120120

121-
// Ensure proper scheme if not already present
122-
if (!uri.StartsWith("grpc://", StringComparison.OrdinalIgnoreCase) &&
123-
!uri.StartsWith("grpc+tls://", StringComparison.OrdinalIgnoreCase) &&
124-
!uri.StartsWith("grpc+tcp://", StringComparison.OrdinalIgnoreCase) &&
125-
!uri.StartsWith("http://", StringComparison.OrdinalIgnoreCase) &&
126-
!uri.StartsWith("https://", StringComparison.OrdinalIgnoreCase))
121+
// Convert http/https schemes to grpc/grpc+tls
122+
if (uri.StartsWith("https://", StringComparison.OrdinalIgnoreCase))
123+
{
124+
#if NETSTANDARD2_0
125+
uri = "grpc+tls://" + uri.Substring(8);
126+
#else
127+
uri = string.Concat("grpc+tls://", uri.AsSpan(8));
128+
#endif
129+
}
130+
else if (uri.StartsWith("http://", StringComparison.OrdinalIgnoreCase))
131+
{
132+
#if NETSTANDARD2_0
133+
uri = "grpc://" + uri.Substring(7);
134+
#else
135+
uri = string.Concat("grpc://", uri.AsSpan(7));
136+
#endif
137+
}
138+
else if (!uri.StartsWith("grpc://", StringComparison.OrdinalIgnoreCase) &&
139+
!uri.StartsWith("grpc+tls://", StringComparison.OrdinalIgnoreCase) &&
140+
!uri.StartsWith("grpc+tcp://", StringComparison.OrdinalIgnoreCase))
127141
{
128142
// No scheme provided - add grpc or grpc+tls based on TLS setting
129143
uri = _useTls ? string.Concat("grpc+tls://", uri) : string.Concat("grpc://", uri);
130144
}
131145

132-
// Build database parameters for the Go driver
133-
// The Go driver uses "uri" as the connection parameter
146+
// Build database parameters for the ADBC FlightSQL driver
134147
var databaseParams = new Dictionary<string, string>
135148
{
136149
{ "uri", uri }
@@ -143,11 +156,12 @@ private void InitializeIfNeeded()
143156
databaseParams["password"] = _apiKey!;
144157
}
145158

146-
// Add user agent header using the Go driver's header prefix
159+
// Add user agent header
147160
databaseParams["adbc.flight.sql.rpc.call_header.user-agent"] = UserAgentHelper.BuildUserAgent(_userAgent);
148161

149-
// Create the Go-based interop driver and database
150-
var driver = FlightSqlDriverLoader.LoadDriver();
162+
// Load the ADBC FlightSQL driver from the application base directory
163+
var driverPath = ResolveNativeDriverPath();
164+
var driver = AdbcDriverLoader.LoadDriver(driverPath, "AdbcDriverFlightsqlInit");
151165
_database = driver.Open(databaseParams);
152166
_connection = _database.Connect(new Dictionary<string, string>());
153167
}
@@ -170,22 +184,34 @@ private void InitializeIfNeeded()
170184
{
171185
InitializeIfNeeded();
172186

173-
using var statement = _connection!.CreateStatement();
174-
statement.SqlQuery = sql;
187+
var statement = _connection!.CreateStatement();
188+
try
189+
{
190+
statement.SqlQuery = sql;
191+
192+
// Prepare the statement
193+
statement.Prepare();
175194

176-
// Prepare the statement
177-
statement.Prepare();
195+
// Bind parameters if provided
196+
if (parameters.Length > 0)
197+
{
198+
var parameterBatch = CreateParameterBatch(parameters);
199+
statement.Bind(parameterBatch, parameterBatch.Schema);
200+
}
178201

179-
// Bind parameters if provided
180-
if (parameters.Length > 0)
202+
// Execute the query
203+
var result = statement.ExecuteQuery();
204+
205+
// Wrap the stream to keep the statement alive for the stream's lifetime
206+
var wrappedStream = new StatementBoundArrowArrayStream(result.Stream!, statement);
207+
return Task.FromResult<IArrowArrayStream?>(wrappedStream);
208+
}
209+
catch
181210
{
182-
var parameterBatch = CreateParameterBatch(parameters);
183-
statement.Bind(parameterBatch, parameterBatch.Schema);
211+
// If anything fails before we wrap the stream, dispose the statement
212+
statement.Dispose();
213+
throw;
184214
}
185-
186-
// Execute the query
187-
var result = statement.ExecuteQuery();
188-
return Task.FromResult(result.Stream);
189215
});
190216
}
191217

@@ -513,6 +539,61 @@ private static Decimal256Array CreateDecimal256Array(object? value, Decimal256Ty
513539

514540
// ============ Disposal ============
515541

542+
/// <summary>
543+
/// Resolves the path to the native ADBC FlightSQL driver based on the current platform.
544+
/// Uses standard .NET conventions: AppContext.BaseDirectory and runtimes/{rid}/native/ layout.
545+
/// </summary>
546+
private static string ResolveNativeDriverPath()
547+
{
548+
var rid = GetRuntimeIdentifier();
549+
var driverFileName = GetDriverFileName();
550+
var driverPath = Path.Combine(AppContext.BaseDirectory, "runtimes", rid, "native", driverFileName);
551+
552+
if (!File.Exists(driverPath))
553+
{
554+
throw new FileNotFoundException(
555+
$"Could not find native ADBC FlightSQL driver at {driverPath}. " +
556+
$"Ensure the SpiceAI NuGet package is properly installed.",
557+
driverPath);
558+
}
559+
560+
return driverPath;
561+
}
562+
563+
/// <summary>
564+
/// Gets the runtime identifier for the current platform.
565+
/// </summary>
566+
private static string GetRuntimeIdentifier()
567+
{
568+
var arch = RuntimeInformation.OSArchitecture.ToString().ToLowerInvariant();
569+
570+
if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
571+
return $"win-{arch}";
572+
if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux))
573+
return $"linux-{arch}";
574+
if (RuntimeInformation.IsOSPlatform(OSPlatform.OSX))
575+
return $"osx-{arch}";
576+
577+
throw new PlatformNotSupportedException(
578+
$"Unsupported platform: {RuntimeInformation.OSDescription}");
579+
}
580+
581+
/// <summary>
582+
/// Gets the platform-specific driver filename.
583+
/// </summary>
584+
private static string GetDriverFileName()
585+
{
586+
if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
587+
return "libadbc_driver_flightsql.dll";
588+
if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux))
589+
return "libadbc_driver_flightsql.so";
590+
if (RuntimeInformation.IsOSPlatform(OSPlatform.OSX))
591+
return "libadbc_driver_flightsql.dylib";
592+
593+
throw new PlatformNotSupportedException(
594+
$"Unsupported platform: {RuntimeInformation.OSDescription}");
595+
}
596+
516597
private bool _disposed;
517598

518599
public void Dispose()
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
Copyright 2024 The Spice.ai OSS Authors
3+
4+
Permission is hereby granted, free of charge, to any person obtaining a copy
5+
of this software and associated documentation files (the "Software"), to deal
6+
in the Software without restriction, including without limitation the rights
7+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8+
copies of the Software, and to permit persons to whom the Software is
9+
furnished to do so, subject to the following conditions:
10+
11+
The above copyright notice and this permission notice shall be included in all
12+
copies or substantial portions of the Software.
13+
14+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17+
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18+
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20+
SOFTWARE.
21+
*/
22+
23+
using System;
24+
using System.Threading;
25+
using System.Threading.Tasks;
26+
using Apache.Arrow;
27+
using Apache.Arrow.Adbc;
28+
using Apache.Arrow.Ipc;
29+
30+
namespace Spice.Adbc;
31+
32+
/// <summary>
33+
/// A wrapper around IArrowArrayStream that keeps the ADBC statement alive
34+
/// for the lifetime of the stream. When this wrapper is disposed, it disposes
35+
/// both the underlying stream and the statement in the correct order.
36+
/// </summary>
37+
internal sealed class StatementBoundArrowArrayStream : IArrowArrayStream
38+
{
39+
private readonly IArrowArrayStream _innerStream;
40+
private readonly AdbcStatement _statement;
41+
private bool _disposed;
42+
43+
/// <summary>
44+
/// Creates a new StatementBoundArrowArrayStream that wraps the given stream
45+
/// and keeps the statement alive until disposal.
46+
/// </summary>
47+
/// <param name="innerStream">The underlying Arrow array stream</param>
48+
/// <param name="statement">The ADBC statement to keep alive and dispose with the stream</param>
49+
public StatementBoundArrowArrayStream(IArrowArrayStream innerStream, AdbcStatement statement)
50+
{
51+
_innerStream = innerStream ?? throw new ArgumentNullException(nameof(innerStream));
52+
_statement = statement ?? throw new ArgumentNullException(nameof(statement));
53+
}
54+
55+
/// <inheritdoc/>
56+
public Schema Schema
57+
{
58+
get
59+
{
60+
#if NET8_0_OR_GREATER
61+
ObjectDisposedException.ThrowIf(_disposed, this);
62+
#else
63+
if (_disposed) throw new ObjectDisposedException(GetType().FullName);
64+
#endif
65+
return _innerStream.Schema;
66+
}
67+
}
68+
69+
/// <inheritdoc/>
70+
public ValueTask<RecordBatch?> ReadNextRecordBatchAsync(CancellationToken cancellationToken = default)
71+
{
72+
#if NET8_0_OR_GREATER
73+
ObjectDisposedException.ThrowIf(_disposed, this);
74+
#else
75+
if (_disposed) throw new ObjectDisposedException(GetType().FullName);
76+
#endif
77+
return _innerStream.ReadNextRecordBatchAsync(cancellationToken);
78+
}
79+
80+
/// <inheritdoc/>
81+
public void Dispose()
82+
{
83+
if (_disposed) return;
84+
_disposed = true;
85+
86+
// Dispose in the correct order:
87+
// 1. First dispose the stream (finishes reading/closes the stream)
88+
// 2. Then dispose the statement (releases server-side resources)
89+
try
90+
{
91+
_innerStream.Dispose();
92+
}
93+
finally
94+
{
95+
_statement.Dispose();
96+
}
97+
}
98+
}

0 commit comments

Comments
 (0)