Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(csharp/src/Drivers/Apache/Spark): low latency test cases #1948

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

<PropertyGroup>
<TargetFrameworks>net472;net6.0</TargetFrameworks>
<NoWarn>CS8618</NoWarn>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of disabling this warning, it looks like the _statementResp field should probably just be made nullable.

</PropertyGroup>

<ItemGroup>
Expand Down
42 changes: 35 additions & 7 deletions csharp/src/Drivers/Apache/Hive2/HiveServer2Statement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Apache.Arrow.Ipc;
using Apache.Hive.Service.Rpc.Thrift;
Expand All @@ -28,6 +29,7 @@ public abstract class HiveServer2Statement : AdbcStatement
private const int BatchSizeDefault = 50000;
protected internal HiveServer2Connection connection;
protected internal TOperationHandle? operationHandle;
protected internal TExecuteStatementResp _statementResp;

protected HiveServer2Statement(HiveServer2Connection connection)
{
Expand All @@ -38,20 +40,42 @@ protected virtual void SetStatementProperties(TExecuteStatementReq statement)
{
}

protected abstract IArrowArrayStream NewReader<T>(T statement, Schema schema) where T : HiveServer2Statement;
protected abstract IArrowArrayStream NewReader<T>(T statement, Schema schema, TFetchResultsResp? firstResult) where T : HiveServer2Statement;

public override QueryResult ExecuteQuery() => ExecuteQueryAsync().AsTask().Result;

public override UpdateResult ExecuteUpdate() => ExecuteUpdateAsync().Result;

public override async ValueTask<QueryResult> ExecuteQueryAsync()
{
await ExecuteStatementAsync();
await PollForResponseAsync();
Schema schema = await GetSchemaAsync();
var resp = await ExecuteStatementAsync();

// TODO: Ensure this is set dynamically based on server capabilities
return new QueryResult(-1, NewReader(this, schema));
if (resp.OperationHandle.HasResultSet)
{
if(resp.DirectResults == null)
{
throw new InvalidOperationException();
}

if(resp.DirectResults.ResultSetMetadata == null)
{
throw new InvalidOperationException();
}

Schema schema = SchemaParser.GetArrowSchema(resp.DirectResults.ResultSetMetadata.Schema);
var firstBatch = resp.DirectResults.ResultSet;

return new QueryResult(-1, NewReader(this, schema, firstBatch));

}
else
{
await PollForResponseAsync();
Schema schema = await GetSchemaAsync();

// TODO: Ensure this is set dynamically based on server capabilities
return new QueryResult(-1, NewReader(this, schema, null));
}
}

public override async Task<UpdateResult> ExecuteUpdateAsync()
Expand Down Expand Up @@ -106,10 +130,11 @@ public override void SetOption(string key, string value)
}
}

protected async Task ExecuteStatementAsync()
protected async Task<TExecuteStatementResp> ExecuteStatementAsync()
{
TExecuteStatementReq executeRequest = new TExecuteStatementReq(this.connection.sessionHandle, this.SqlQuery);
SetStatementProperties(executeRequest);
executeRequest.GetDirectResults = new TSparkGetDirectResults(BatchSizeDefault);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Set it to the "current" value of BatchSize, in case it's been updated through configuration/programmatically.

Suggested change
executeRequest.GetDirectResults = new TSparkGetDirectResults(BatchSizeDefault);
executeRequest.GetDirectResults = new TSparkGetDirectResults(BatchSize);

TExecuteStatementResp executeResponse = await this.connection.Client.ExecuteStatement(executeRequest);
if (executeResponse.Status.StatusCode == TStatusCode.ERROR_STATUS)
{
Expand All @@ -118,11 +143,14 @@ protected async Task ExecuteStatementAsync()
.SetNativeError(executeResponse.Status.ErrorCode);
}
this.operationHandle = executeResponse.OperationHandle;

return executeResponse;
}

protected async Task PollForResponseAsync()
{
TGetOperationStatusResp? statusResponse = null;

do
{
if (statusResponse != null) { await Task.Delay(PollTimeMilliseconds); }
Expand Down
3 changes: 2 additions & 1 deletion csharp/src/Drivers/Apache/Impala/ImpalaStatement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -40,7 +41,7 @@ public override object GetValue(IArrowArray arrowArray, int index)
throw new NotSupportedException();
}

protected override IArrowArrayStream NewReader<T>(T statement, Schema schema) => new HiveServer2Reader(statement, schema);
protected override IArrowArrayStream NewReader<T>(T statement, Schema schema, TFetchResultsResp? firstBatch) => new HiveServer2Reader(statement, schema);

/// <summary>
/// Provides the constant string key values to the <see cref="AdbcStatement.SetOption(string, string)" /> method.
Expand Down
55 changes: 45 additions & 10 deletions csharp/src/Drivers/Apache/Spark/SparkConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ public class SparkConnection : HiveServer2Connection
private readonly Lazy<string> _productVersion;

internal static TSparkGetDirectResults sparkGetDirectResults = new TSparkGetDirectResults(1000);
HttpClient _httpClient;

public String TraceId;

internal static readonly Dictionary<string, string> timestampConfig = new Dictionary<string, string>
{
Expand Down Expand Up @@ -256,6 +259,8 @@ internal SparkConnection(IReadOnlyDictionary<string, string> properties)
: base(properties)
{
_productVersion = new Lazy<string>(() => GetProductVersion(), LazyThreadSafetyMode.PublicationOnly);
_httpClient = new HttpClient();
TraceId = string.Empty;
}

protected string ProductVersion => _productVersion.Value;
Expand All @@ -278,23 +283,53 @@ protected override async ValueTask<TProtocol> CreateProtocolAsync()
else
token = properties[SparkParameters.Password];

HttpClient httpClient = new HttpClient();
httpClient.BaseAddress = new UriBuilder(Uri.UriSchemeHttps, hostName, -1, path).Uri;
httpClient.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", token);
httpClient.DefaultRequestHeaders.UserAgent.ParseAdd(UserAgent);
httpClient.DefaultRequestHeaders.AcceptEncoding.Clear();
httpClient.DefaultRequestHeaders.AcceptEncoding.Add(new StringWithQualityHeaderValue("identity"));
httpClient.DefaultRequestHeaders.ExpectContinue = false;

TConfiguration config = new TConfiguration();

ThriftHttpTransport transport = new ThriftHttpTransport(httpClient, config);
_httpClient.BaseAddress = new UriBuilder(Uri.UriSchemeHttps, hostName, -1, path).Uri;
_httpClient.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", token);
_httpClient.DefaultRequestHeaders.UserAgent.ParseAdd("SimbaSparkJDBCDriver/2.06.15 Python/PyHive");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the best choice for a user-agent? Are we driving some specific server behavior by sending this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd update the UserAgent value and leave a comment as to why this needs to be particular constant. Ideally, the ADBC driver agent name could also be added to your whitelist.

_httpClient.DefaultRequestHeaders.AcceptEncoding.Clear();
_httpClient.DefaultRequestHeaders.AcceptEncoding.Add(new StringWithQualityHeaderValue("identity"));
_httpClient.DefaultRequestHeaders.ExpectContinue = false;
_httpClient.DefaultRequestHeaders.Add("traceparent", Generate());
ThriftHttpTransport transport = new ThriftHttpTransport(_httpClient, config);
// can switch to the one below if want to use the experimental one with IPeekableTransport
// ThriftHttpTransport transport = new ThriftHttpTransport(httpClient, config);
await transport.OpenAsync(CancellationToken.None);
return new TBinaryProtocol(transport);
}

private static readonly char[] Characters = "0123456789abcdef".ToCharArray();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private static readonly char[] Characters = "0123456789abcdef".ToCharArray();
private static readonly char[] HexCharacters = "0123456789abcdef".ToCharArray();

private static readonly Random Random = new Random();

public string Generate()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needs a more descriptive name

Suggested change
public string Generate()
public string GenerateTraceId()

{
StringBuilder result = new StringBuilder("00-");

for (int i = 0; i < 32; i++)
{
result.Append(Characters[Random.Next(Characters.Length)]);
}
Comment on lines +308 to +311
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be extracted to a function with the length as the parameter and then resused for lines 315-318.


result.Append('-');

for (int i = 0; i < 16; i++)
{
result.Append(Characters[Random.Next(Characters.Length)]);
}

result.Append("-01");

TraceId = result.ToString();

return TraceId;
}

public void ResetTraceId()
{
    // Swap out the W3C "traceparent" header for a freshly generated value so
    // the next request carries a new trace id.
    const string headerName = "traceparent";
    var headers = _httpClient.DefaultRequestHeaders;
    headers.Remove(headerName);
    headers.Add(headerName, Generate());
}

protected override TOpenSessionReq CreateSessionRequest()
{
return new TOpenSessionReq(TProtocolVersion.SPARK_CLI_SERVICE_PROTOCOL_V7)
Expand Down
26 changes: 23 additions & 3 deletions csharp/src/Drivers/Apache/Spark/SparkStatement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,17 @@ internal SparkStatement(SparkConnection connection)
: base(connection)
{
}
public override QueryResult ExecuteQuery()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: empty line

Suggested change
public override QueryResult ExecuteQuery()
public override QueryResult ExecuteQuery()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this override code be applied to ExecuteQueryAsync, too?

{
var conn = (connection as SparkConnection);

if(conn != null)
{
conn.ResetTraceId();
}
Comment on lines +37 to +42
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More concise?

 ```suggestion
if (connection is SparkConnection sparkConnection) sparkConnection.ResetTraceId();


return base.ExecuteQuery();
}

protected override void SetStatementProperties(TExecuteStatementReq statement)
{
Expand All @@ -55,7 +66,7 @@ protected override void SetStatementProperties(TExecuteStatementReq statement)
};
}

protected override IArrowArrayStream NewReader<T>(T statement, Schema schema) => new SparkReader(statement, schema);
protected override IArrowArrayStream NewReader<T>(T statement, Schema schema, TFetchResultsResp? firstBatch) => new SparkReader(statement, schema, firstBatch);

/// <summary>
/// Provides the constant string key values to the <see cref="AdbcStatement.SetOption(string, string)" /> method.
Expand All @@ -72,11 +83,18 @@ sealed class SparkReader : IArrowArrayStream
List<TSparkArrowBatch>? batches;
int index;
IArrowReader? reader;
bool hasMoreRows = false;

public SparkReader(HiveServer2Statement statement, Schema schema)
public SparkReader(HiveServer2Statement statement, Schema schema, TFetchResultsResp? firstBatch)
{
this.statement = statement;
this.schema = schema;

if (firstBatch != null)
{
this.batches = firstBatch.Results.ArrowBatches;
this.hasMoreRows = firstBatch.HasMoreRows;
}
}

public Schema Schema { get { return schema; } }
Expand Down Expand Up @@ -104,7 +122,7 @@ public SparkReader(HiveServer2Statement statement, Schema schema)
this.batches = null;
this.index = 0;

if (this.statement == null)
if (this.statement == null || !hasMoreRows)
{
return null;
}
Expand All @@ -117,6 +135,8 @@ public SparkReader(HiveServer2Statement statement, Schema schema)
{
this.statement = null;
}

hasMoreRows = response.HasMoreRows;
}
}

Expand Down
74 changes: 68 additions & 6 deletions csharp/test/Drivers/Apache/Spark/DriverTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,15 @@
*/

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using Apache.Arrow.Adbc.Drivers.Apache.Spark;
using Apache.Arrow.Adbc.Tests.Metadata;
using Apache.Arrow.Adbc.Tests.Xunit;
using Apache.Arrow.Ipc;
Expand Down Expand Up @@ -505,14 +510,71 @@ public async Task CanGetTableTypes()
[SkippableFact, Order(10)]
public void CanExecuteQuery()
{
using AdbcConnection adbcConnection = NewConnection();
long count = 0;
long target = 10;
Queue<String> log = new Queue<string>();
string FileName = $@"c:\temp\hmsmeta\{DateTime.Now.ToString("yyyyMMdd_HHmm")}__{target}.log";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of hardcoding a path, please use something like Path.GetTempPath and consider deleting the file once the test has finished.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would add a logFolder property into the configuration file properties. This is not likely portable or desirable.


using AdbcStatement statement = adbcConnection.CreateStatement();
statement.SqlQuery = TestConfiguration.Query;

QueryResult queryResult = statement.ExecuteQuery();
for (int i = 0; i < target; i++)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test could be refactored to handle any test "kernel" (lambda) to wrap the test such that we could capture performance information in a log file. What do you think?

{
ThreadPool.QueueUserWorkItem((_) =>
{
using AdbcConnection adbcConnection = NewConnection();
var sparkConn = adbcConnection as SparkConnection;
if(sparkConn == null)
{
throw new InvalidCastException();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As this is test code and we're just throwing a cast exception anyway, the code is a little cleaner as just

var sparkConn = (SparkConnection)adbcConnection;

(The cast will throw if it's not the right type.)

}

for (int i = 0; i < 10000; i++)
{
var sw = new Stopwatch();
sw.Start();
bool succeed = false;
try
{
using AdbcStatement statement = adbcConnection.CreateStatement();
statement.SqlQuery = TestConfiguration.Query;

QueryResult queryResult = statement.ExecuteQuery();
Tests.DriverTests.CanExecuteQuery(queryResult, TestConfiguration.ExpectedResultsCount);
Comment on lines +536 to +540
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be the lambda fed into the performance logger.

succeed = true;
}
catch (Exception)
{
succeed = false;
}
sw.Stop();
var logStr = $"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff")},{sparkConn.TraceId},{sw.ElapsedMilliseconds},{succeed}";

lock(log)
{
log.Enqueue(logStr);
if(log.Count > 1000)
{
File.AppendAllLines(FileName, log.ToArray());
log.Clear();
}
}
}

lock (log)
{
if (log.Count > 0)
{
File.AppendAllLines(FileName, log.ToArray());
log.Clear();
}
}

Interlocked.Increment(ref count);
});
}

Tests.DriverTests.CanExecuteQuery(queryResult, TestConfiguration.ExpectedResultsCount);
while(Interlocked.Read(ref count) < target)
{
Thread.Sleep(1000);
}
}

/// <summary>
Expand Down