Skip to content

feat(csharp/src/Drivers/Databricks): Support server side property passthrough #2692

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions csharp/src/Drivers/Databricks/DatabricksConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Apache.Arrow.Adbc.Drivers.Apache;
Expand All @@ -29,10 +31,22 @@ namespace Apache.Arrow.Adbc.Drivers.Databricks
{
internal class DatabricksConnection : SparkHttpConnection
{
private bool _applySSPWithQueries = true;

public DatabricksConnection(IReadOnlyDictionary<string, string> properties) : base(properties)
{
if (Properties.TryGetValue(DatabricksParameters.ApplySSPWithQueries, out string? applySSPWithQueriesStr) &&
bool.TryParse(applySSPWithQueriesStr, out bool applySSPWithQueriesValue))
{
_applySSPWithQueries = applySSPWithQueriesValue;
}
}

/// <summary>
/// Gets whether server side properties should be applied using queries.
/// </summary>
internal bool ApplySSPWithQueries => _applySSPWithQueries;

internal override IArrowArrayStream NewReader<T>(T statement, Schema schema, TGetResultSetMetadataResp? metadataResp = null)
{
// Get result format from metadata response if available
Expand Down Expand Up @@ -86,9 +100,70 @@ protected override TOpenSessionReq CreateSessionRequest()
Client_protocol_i64 = (long)TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V7,
CanUseMultipleCatalogs = true,
};

// If not using queries to set server-side properties, include them in Configuration
if (!_applySSPWithQueries)
{
req.Configuration = new Dictionary<string, string>();
var serverSideProperties = GetServerSideProperties();
foreach (var property in serverSideProperties)
{
req.Configuration[property.Key] = property.Value;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder what kind of logs we can add here. @davidhcoe Until the logging/telemetry framework is ready do you recommend we log anything?

}
return req;
}

/// <summary>
/// Gets a dictionary of server-side properties extracted from connection properties.
/// </summary>
/// <returns>Dictionary of server-side properties with prefix removed from keys.</returns>
private Dictionary<string, string> GetServerSideProperties()
{
return Properties
.Where(p => p.Key.StartsWith(DatabricksParameters.ServerSidePropertyPrefix))
.ToDictionary(
p => p.Key.Substring(DatabricksParameters.ServerSidePropertyPrefix.Length),
p => p.Value
);
}

/// <summary>
/// Applies server-side properties by executing "set key=value" queries.
/// </summary>
/// <returns>A task representing the asynchronous operation.</returns>
public async Task ApplyServerSidePropertiesAsync()
{
if (!_applySSPWithQueries)
{
return;
}

var serverSideProperties = GetServerSideProperties();

if (serverSideProperties.Count == 0)
{
return;
}

using var statement = new DatabricksStatement(this);

foreach (var property in serverSideProperties)
{
string query = $"SET {property.Key}={property.Value}";
statement.SqlQuery = query;

try
{
await statement.ExecuteUpdateAsync();
}
catch (Exception ex)
{
Debug.WriteLine($"Error setting server-side property '{property.Key}': {ex.Message}");
}
}
}

protected override Task<TGetResultSetMetadataResp> GetResultSetMetadataAsync(TGetSchemasResp response, CancellationToken cancellationToken = default) =>
Task.FromResult(response.DirectResults.ResultSetMetadata);
protected override Task<TGetResultSetMetadataResp> GetResultSetMetadataAsync(TGetCatalogsResp response, CancellationToken cancellationToken = default) =>
Expand Down
1 change: 1 addition & 0 deletions csharp/src/Drivers/Databricks/DatabricksDatabase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public override AdbcConnection Connect(IReadOnlyDictionary<string, string>? opti
.ToDictionary(kvp => kvp.Key, kvp => kvp.Value);
DatabricksConnection connection = new DatabricksConnection(mergedProperties);
connection.OpenAsync().Wait();
connection.ApplyServerSidePropertiesAsync().Wait();
return connection;
}
}
Expand Down
15 changes: 15 additions & 0 deletions csharp/src/Drivers/Databricks/DatabricksParameters.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,21 @@ public class DatabricksParameters : SparkParameters
/// Default value is 5 minutes if not specified.
/// </summary>
public const string CloudFetchTimeoutMinutes = "adbc.databricks.cloudfetch.timeout_minutes";

/// <summary>
/// Whether to apply service side properties (SSP) with queries. If false, SSP will be applied
/// by setting the Thrift configuration when the session is opened.
/// Default value is true if not specified.
/// </summary>
public const string ApplySSPWithQueries = "adbc.databricks.apply_ssp_with_queries";

/// <summary>
/// Prefix for server-side properties. Properties with this prefix will be passed to the server
/// by executing a "set key=value" query when opening a session.
/// For example, a property with key "adbc.databricks.SSP_use_cached_result"
/// and value "true" will result in executing "set use_cached_result=true" on the server.
/// </summary>
public const string ServerSidePropertyPrefix = "adbc.databricks.SSP_";
}

/// <summary>
Expand Down