Skip to content

feat(csharp/src/Drivers/Databricks): Support server side property passthrough #2692

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 95 additions & 0 deletions csharp/src/Drivers/Databricks/DatabricksConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Apache.Arrow.Adbc.Drivers.Apache;
Expand All @@ -29,10 +31,22 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
{
internal class DatabricksConnection : SparkHttpConnection
{
// When true, server-side properties are applied by executing "SET" queries
// (see ApplyServerSidePropertiesAsync); when false (the default), they are
// placed in the open-session request's Configuration instead.
private bool _applySSPWithQueries = false;

/// <summary>
/// Creates a connection from the supplied properties and reads the optional
/// <see cref="DatabricksParameters.ApplySSPWithQueries"/> flag. A missing or
/// unparsable value leaves the flag at its default of false.
/// </summary>
/// <param name="properties">Connection properties forwarded to the base connection.</param>
public DatabricksConnection(IReadOnlyDictionary<string, string> properties) : base(properties)
{
    bool hasSetting = Properties.TryGetValue(DatabricksParameters.ApplySSPWithQueries, out string? rawSetting);
    if (hasSetting && bool.TryParse(rawSetting, out bool parsedSetting))
    {
        _applySSPWithQueries = parsedSetting;
    }
}

/// <summary>
/// Indicates whether server-side properties are applied by running queries
/// rather than being sent with the open-session request.
/// </summary>
internal bool ApplySSPWithQueries
{
    get { return _applySSPWithQueries; }
}

internal override IArrowArrayStream NewReader<T>(T statement, Schema schema, TGetResultSetMetadataResp? metadataResp = null)
{
// Get result format from metadata response if available
Expand Down Expand Up @@ -86,9 +100,90 @@ protected override TOpenSessionReq CreateSessionRequest()
Client_protocol_i64 = (long)TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V7,
CanUseMultipleCatalogs = true,
};

// If not using queries to set server-side properties, include them in Configuration
if (!_applySSPWithQueries)
{
req.Configuration = new Dictionary<string, string>();
var serverSideProperties = GetServerSideProperties();
foreach (var property in serverSideProperties)
{
req.Configuration[property.Key] = property.Value;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder what kind of logs we can add here. @davidhcoe Until the logging/telemetry framework is ready, do you recommend we log anything?

}
return req;
}

/// <summary>
/// Gets a dictionary of server-side properties extracted from connection properties.
/// </summary>
/// <remarks>
/// A connection property is treated as server-side when its key begins with
/// <see cref="DatabricksParameters.ServerSidePropertyPrefix"/>. The prefix is matched
/// ordinally (case-sensitive, culture-independent) because property keys are
/// identifiers, not linguistic text.
/// </remarks>
/// <returns>Dictionary of server-side properties with prefix removed from keys.</returns>
private Dictionary<string, string> GetServerSideProperties()
{
    string prefix = DatabricksParameters.ServerSidePropertyPrefix;

    return Properties
        .Where(p => p.Key.StartsWith(prefix, StringComparison.Ordinal))
        .ToDictionary(
            p => p.Key.Substring(prefix.Length),
            p => p.Value
        );
}

/// <summary>
/// Applies server-side properties by executing "SET key=value" queries, one per property.
/// </summary>
/// <remarks>
/// Does nothing unless applying server-side properties with queries is enabled and at
/// least one server-side property is present. Properties whose names fail
/// <see cref="IsValidPropertyName"/> are skipped. Application is best-effort: a failure
/// to set one property is written to the debug output and does not stop the remaining
/// properties from being applied, nor is it propagated to the caller.
/// </remarks>
/// <returns>A task representing the asynchronous operation.</returns>
public async Task ApplyServerSidePropertiesAsync()
{
    if (!_applySSPWithQueries)
    {
        return;
    }

    var serverSideProperties = GetServerSideProperties();

    if (serverSideProperties.Count == 0)
    {
        return;
    }

    using var statement = new DatabricksStatement(this);

    foreach (var property in serverSideProperties)
    {
        // The name is interpolated directly into the query text, so reject anything
        // that is not a plain identifier.
        if (!IsValidPropertyName(property.Key))
        {
            Debug.WriteLine($"Skipping invalid property name: {property.Key}");
            continue;
        }

        string escapedValue = EscapeSqlString(property.Value);
        string query = $"SET {property.Key}={escapedValue}";
        statement.SqlQuery = query;

        try
        {
            // ConfigureAwait(false): this is library code and callers may block on the
            // returned task; do not require resuming on the captured context.
            await statement.ExecuteUpdateAsync().ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            Debug.WriteLine($"Error setting server-side property '{property.Key}': {ex.Message}");
        }
    }
}

/// <summary>
/// Checks whether a property name consists solely of ASCII letters and underscores.
/// </summary>
/// <param name="propertyName">The property name to check.</param>
/// <returns>True when the name matches the allowed pattern; otherwise false.</returns>
private bool IsValidPropertyName(string propertyName) =>
    System.Text.RegularExpressions.Regex.IsMatch(propertyName, @"^[a-zA-Z_]+$");

/// <summary>
/// Wraps a value in backticks, doubling any backticks it already contains.
/// </summary>
/// <param name="value">The raw value.</param>
/// <returns>The backtick-quoted value.</returns>
private string EscapeSqlString(string value)
{
    string doubled = value.Replace("`", "``");
    return $"`{doubled}`";
}

// Returns the result-set metadata carried in the response's DirectResults,
// wrapped in an already-completed task.
protected override Task<TGetResultSetMetadataResp> GetResultSetMetadataAsync(TGetSchemasResp response, CancellationToken cancellationToken = default)
{
    var metadata = response.DirectResults.ResultSetMetadata;
    return Task.FromResult(metadata);
}
protected override Task<TGetResultSetMetadataResp> GetResultSetMetadataAsync(TGetCatalogsResp response, CancellationToken cancellationToken = default) =>
Expand Down
1 change: 1 addition & 0 deletions csharp/src/Drivers/Databricks/DatabricksDatabase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public override AdbcConnection Connect(IReadOnlyDictionary<string, string>? opti
.ToDictionary(kvp => kvp.Key, kvp => kvp.Value);
DatabricksConnection connection = new DatabricksConnection(mergedProperties);
connection.OpenAsync().Wait();
connection.ApplyServerSidePropertiesAsync().Wait();
return connection;
}
}
Expand Down
15 changes: 15 additions & 0 deletions csharp/src/Drivers/Databricks/DatabricksParameters.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,21 @@ public class DatabricksParameters : SparkParameters
/// Default value is 5 minutes if not specified.
/// </summary>
public const string CloudFetchTimeoutMinutes = "adbc.databricks.cloudfetch.timeout_minutes";

/// <summary>
/// Whether to apply server-side properties (SSP) with queries. If false, SSP will be applied
/// by setting the Thrift configuration when the session is opened.
/// Default value is false if not specified.
/// </summary>
public const string ApplySSPWithQueries = "adbc.databricks.apply_ssp_with_queries";

/// <summary>
/// Prefix for server-side properties. Properties with this prefix will be passed to the server
/// with the prefix removed: by default in the Thrift configuration when the session is opened,
/// or, when <see cref="ApplySSPWithQueries"/> is true, by executing a "set key=value" query.
/// For example, a property with key "adbc.databricks.SSP_use_cached_result"
/// and value "true" is passed to the server as "use_cached_result" with value "true".
/// </summary>
public const string ServerSidePropertyPrefix = "adbc.databricks.SSP_";
}

/// <summary>
Expand Down
85 changes: 85 additions & 0 deletions csharp/test/Drivers/Databricks/ServerSidePropertyE2ETest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Apache.Arrow.Adbc.Drivers.Databricks;
using Xunit;
using Xunit.Abstractions;

namespace Apache.Arrow.Adbc.Tests.Drivers.Databricks
{
/// <summary>
/// End-to-end tests for the server-side property passthrough feature in the Databricks ADBC driver.
/// </summary>
public class ServerSidePropertyE2ETest : TestBase<DatabricksTestConfiguration, DatabricksTestEnvironment>
{
    public ServerSidePropertyE2ETest(ITestOutputHelper? outputHelper)
        : base(outputHelper, new DatabricksTestEnvironment.Factory())
    {
        // Skip the test if the DATABRICKS_TEST_CONFIG_FILE environment variable is not set
        Skip.IfNot(Utils.CanExecuteTestConfig(TestConfigVariable));
    }

    /// <summary>
    /// Tests setting server-side properties, both via "SET" queries and via the
    /// open-session configuration, and verifies them by reading back the output of "SET".
    /// </summary>
    /// <param name="applyWithQueries">Value passed for <see cref="DatabricksParameters.ApplySSPWithQueries"/>.</param>
    // SkippableTheory (not Theory) is required so that Skip.IfNot in the constructor
    // is reported as a skipped test instead of a failure.
    [SkippableTheory]
    [InlineData(true)]
    [InlineData(false)]
    public async Task TestServerSideProperty(bool applyWithQueries)
    {
        var additionalConnectionParams = new Dictionary<string, string>()
        {
            [DatabricksParameters.ServerSidePropertyPrefix + "use_cached_result"] = "false",
            [DatabricksParameters.ServerSidePropertyPrefix + "statement_timeout"] = "12345",
            // Invariant lowering: the value is machine-readable, not user-facing text.
            [DatabricksParameters.ApplySSPWithQueries] = applyWithQueries.ToString().ToLowerInvariant()
        };
        using var connection = NewConnection(TestConfiguration, additionalConnectionParams);

        // Verify the server-side property was set by querying it
        using var statement = connection.CreateStatement();
        statement.SqlQuery = "SET";

        var result = await statement.ExecuteQueryAsync();
        Assert.NotNull(result.Stream);

        // Read every batch: the properties under test are not guaranteed to be
        // in the first batch the server returns.
        var returnedProperties = new Dictionary<string, string>();
        int totalRows = 0;
        while (true)
        {
            using var batch = await result.Stream.ReadNextRecordBatchAsync();
            if (batch is null)
            {
                break;
            }

            Assert.Equal(2, batch.ColumnCount);

            var keys = (StringArray)batch.Column(0);
            var values = (StringArray)batch.Column(1);
            for (int i = 0; i < batch.Length; i++)
            {
                string key = keys.GetString(i);
                string value = values.GetString(i);
                returnedProperties[key] = value;
                Console.WriteLine($"Property: {key} = {value}");
            }

            totalRows += batch.Length;
        }

        Assert.True(totalRows > 0);

        Assert.True(returnedProperties.TryGetValue("use_cached_result", out string? useCachedResult));
        Assert.Equal("false", useCachedResult);

        Assert.True(returnedProperties.TryGetValue("statement_timeout", out string? statementTimeout));
        Assert.Equal("12345", statementTimeout);
    }
}
}
Loading