
feat(csharp/src/Drivers/Apache/Spark): Add prefetch for direct result #2666


Open: wants to merge 5 commits into main

Conversation

eric-wang-1990
Contributor

This PR adds prefetching for direct results: while the application is processing the current batch, the driver can already start fetching the next batch.
Perf gain tested with Power BI Desktop:
ADBC without Prefetch:
[screenshot: query timing without prefetch]
ADBC with Prefetch:
[screenshot: query timing with prefetch]
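
For reviewers unfamiliar with the pattern, below is a minimal, hypothetical sketch of the prefetch idea (`PrefetchingReader` and `FetchBatchAsync` are illustrative names, not the driver's actual API): while the caller processes batch N, the fetch for batch N+1 is already in flight.

```csharp
// Illustrative sketch only -- not the actual driver code. Error handling,
// disposal, and cancellation on shutdown are omitted for brevity.
using System;
using System.Threading;
using System.Threading.Tasks;
using Apache.Arrow; // RecordBatch

class PrefetchingReader
{
    private Task<RecordBatch?>? _prefetchTask;

    public async Task<RecordBatch?> ReadNextAsync(CancellationToken cancellationToken)
    {
        // Await the batch requested on the previous call, or start the
        // first fetch if this is the first call.
        Task<RecordBatch?> current = _prefetchTask ?? FetchBatchAsync(cancellationToken);
        RecordBatch? batch = await current;

        // Immediately start fetching the next batch so the server round trip
        // overlaps with the caller's processing of the current batch.
        _prefetchTask = batch is null ? null : FetchBatchAsync(cancellationToken);
        return batch;
    }

    // Hypothetical: issue the Thrift FetchResults call and decode the payload
    // into an Arrow RecordBatch; returns null once the result set is drained.
    private Task<RecordBatch?> FetchBatchAsync(CancellationToken cancellationToken)
        => throw new NotImplementedException();
}
```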

@github-actions bot added this to the ADBC Libraries 18 milestone Apr 2, 2025
@eric-wang-1990 changed the title from feat(cshap/Drivers/Apache/Spark): Add prefetch for direct result to feat(cshap/src/Drivers/Apache/Spark): Add prefetch for direct result Apr 2, 2025
@birschick-bq
Contributor

@eric-wang-1990 - I tried this for the Apache Spark reader and had to revert.
#2273

I'll test your changes to see if there is an issue with interleaved use of the buffer in the Thrift library.

@CurtHagenlocher changed the title from feat(cshap/src/Drivers/Apache/Spark): Add prefetch for direct result to feat(csharp/src/Drivers/Apache/Spark): Add prefetch for direct result Apr 2, 2025
@birschick-bq
Contributor

> @eric-wang-1990 - I tried this for the Apache Spark reader and had to revert. #2273
>
> I'll test your changes to see if there is an issue with interleaved use of the buffer in the Thrift library.

As I suspected, I am able to reproduce the problem ...

 Apache.Arrow.Adbc.Tests.Drivers.Apache.Spark.DriverTests.CanExecuteQuery(batchSizeFactor: 0.10000000000000001)
  No source available
   Duration: 44.2 sec

  Message: 
System.InvalidOperationException : Unexpectedly reached the end of the stream before a full buffer was read.

  Stack Trace: 
ArrowStreamReaderImplementation.EnsureFullRead(Memory`1 buffer, Int32 bytesRead)
ArrowStreamReaderImplementation.ReadMessageAsync(CancellationToken cancellationToken)
ArrowStreamReaderImplementation.ReadRecordBatchAsync(CancellationToken cancellationToken)
ArrowStreamReaderImplementation.ReadNextRecordBatchAsync(CancellationToken cancellationToken)
SparkDatabricksReader.TryReadFromCurrentReader(CancellationToken cancellationToken) line 94
SparkDatabricksReader.ReadNextRecordBatchAsync(CancellationToken cancellationToken) line 66
DriverTests.CanExecuteQuery(QueryResult queryResult, Int64 expectedNumberOfResults, String environmentName) line 50
DriverTests`2.CanExecuteQuery(Nullable`1 batchSizeFactor) line 555
RuntimeMethodHandle.InvokeMethod(Object target, Void** arguments, Signature sig, Boolean isConstructor)
MethodBaseInvoker.InvokeDirectByRefWithFewArgs(Object obj, Span`1 copyOfArgs, BindingFlags invokeAttr)

  Standard Output: 
BatchSize: 5474. ExpectedResultCount: 54748
SELECT * FROM `hive_metastore`.`default`.`nyctaxigreen`

From PR #2273

The attempt to use “look-ahead” buffering in the HiveServer2Reader is not supported by the Thrift library. The issue is that the Thrift library uses a shared buffer on the Client/Protocol/Transport object, so interleaving Fetch and ExecuteStatement will fail because the buffer will get closed unexpectedly.

I created a ticket with Apache Thrift: https://issues.apache.org/jira/browse/THRIFT-5830
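
To make the failure mode concrete, here is an illustrative sketch (simplified; the request objects are placeholders) of the interleaving that a shared transport buffer cannot tolerate, using the generated Hive TCLIService client types:

```csharp
// Illustrative only: two in-flight calls on one Thrift client, whose
// Protocol/Transport share a single internal buffer.
using System.Threading;
using System.Threading.Tasks;

static class InterleavingHazard
{
    static async Task UnsafeAsync(
        TCLIService.Client client,
        TFetchResultsReq nextReq,
        TFetchResultsReq otherReq,
        CancellationToken cancellationToken)
    {
        // Prefetch: start the next FetchResults but do not await it yet.
        Task<TFetchResultsResp> prefetch = client.FetchResults(nextReq, cancellationToken);

        // A second call on the SAME client now reads and writes through the
        // same shared buffer while the first is still in flight.
        TFetchResultsResp other = await client.FetchResults(otherReq, cancellationToken);

        // When this await resumes, the shared buffer may already have been
        // reset or disposed by the second call, which surfaces as
        // "Unexpectedly reached the end of the stream before a full buffer was read."
        TFetchResultsResp next = await prefetch;
    }
}
```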

@CurtHagenlocher
Contributor

@birschick-bq, do I understand correctly that there's no reliable or safe way to get this ~40% perf improvement because of limitations in the Thrift library? What would we need to do to work around it, open multiple connections to the server?

@birschick-bq
Contributor

birschick-bq commented Apr 3, 2025

> @birschick-bq, do I understand correctly that there's no reliable or safe way to get this ~40% perf improvement because of limitations in the Thrift library? What would we need to do to work around it, open multiple connections to the server?

@CurtHagenlocher / @eric-wang-1990

Yes, the limitation is in the Thrift library. The Transport layer allocates/disposes the buffers independently of the higher-level call. When FetchNext awaits the results (buffer) it expects to have, they may already have been destroyed by the next interleaved call.

My idea for solving this problem is to have the buffers allocated/disposed at the Client layer and passed to the Transport layer. They would be created before the send_ call and disposed after the high-level recv_ call.

For example, the generated code for FetchResults would change to look something like this ...

```csharp
public async Task<TFetchResultsResp> FetchResults(TFetchResultsReq @req, CancellationToken cancellationToken = default)
{
  using TransportBuffer inputBuffer = this.InputProtocol.Transport.AllocateBuffer();
  await send_FetchResults(@req, buffer: inputBuffer, cancellationToken);
  using TransportBuffer outputBuffer = this.OutputProtocol.Transport.AllocateBuffer();
  return await recv_FetchResults(buffer: outputBuffer, cancellationToken);
}
```

The challenge is changing all the code along the way to pass the buffers through the layers.
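
Purely as a sketch of what that could look like (hypothetical; Thrift has no such type today), the caller-owned TransportBuffer handed out by AllocateBuffer() might be as simple as a pooled array with deterministic disposal:

```csharp
// Hypothetical supporting type for the example above: allocated by the
// Transport on behalf of the Client, disposed by the Client after recv_.
using System;
using System.Buffers;

public sealed class TransportBuffer : IDisposable
{
    private byte[]? _bytes;

    public TransportBuffer(int size) => _bytes = ArrayPool<byte>.Shared.Rent(size);

    public Memory<byte> Memory =>
        _bytes is null
            ? throw new ObjectDisposedException(nameof(TransportBuffer))
            : _bytes.AsMemory();

    public void Dispose()
    {
        if (_bytes is not null)
        {
            ArrayPool<byte>.Shared.Return(_bytes);
            _bytes = null;
        }
    }
}
```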

@davidhcoe
Contributor

I know this one still needs some work, so just putting this out there, but I think this should wait on the decision about #2672 and then move to the Databricks driver.
