feat(csharp/src/Drivers/Apache): Add prefetch functionality to CloudFetch in Spark ADBC driver #2678

Open: jadewang-db wants to merge 4 commits into main

@jadewang-db jadewang-db commented Apr 7, 2025

Add Prefetch Functionality to CloudFetch in Spark ADBC Driver

This PR enhances the CloudFetch feature in the Spark ADBC driver by implementing prefetch functionality, which improves performance by fetching multiple batches of results ahead of time.

Changes

CloudFetchResultFetcher Enhancements

  • Initial Prefetch: Added code to perform an initial prefetch of multiple batches when the fetcher starts, ensuring data is available immediately when needed.
  • State Management: Added tracking for current batch offset and size, with proper state reset when starting the fetcher.
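The prefetch and state-reset behavior described above can be illustrated with a minimal sketch. This is not the PR's implementation: `PrefetchingFetcherSketch`, its constructor parameters, and the `fetchBatch` delegate are hypothetical names chosen for illustration, and a bounded `BlockingCollection<T>` stands in for whatever queue the driver actually uses.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch: a background worker fetches batches ahead of the reader.
// The bounded queue caps how far ahead the worker may run.
public sealed class PrefetchingFetcherSketch<T> : IDisposable
{
    // (startOffset, maxItems) -> items; an empty list signals the end of results.
    private readonly Func<long, int, IReadOnlyList<T>> _fetchBatch;
    private readonly int _batchSize;
    private readonly BlockingCollection<T> _queue;
    private readonly CancellationTokenSource _cts = new CancellationTokenSource();
    private Task _worker = Task.CompletedTask;
    private long _offset;

    public PrefetchingFetcherSketch(
        Func<long, int, IReadOnlyList<T>> fetchBatch, int batchSize, int prefetchCapacity)
    {
        _fetchBatch = fetchBatch;
        _batchSize = batchSize;
        _queue = new BlockingCollection<T>(prefetchCapacity);
    }

    public void Start()
    {
        _offset = 0; // reset tracked state before fetching begins
        _worker = Task.Run(() => FetchLoop(_cts.Token));
    }

    private void FetchLoop(CancellationToken token)
    {
        try
        {
            while (!token.IsCancellationRequested)
            {
                IReadOnlyList<T> batch = _fetchBatch(_offset, _batchSize);
                if (batch.Count == 0)
                {
                    break; // no more results
                }
                foreach (T item in batch)
                {
                    _queue.Add(item, token); // blocks while the queue is full
                }
                _offset += batch.Count;
            }
        }
        catch (OperationCanceledException)
        {
            // Cancellation requested by Dispose; stop quietly.
        }
        finally
        {
            _queue.CompleteAdding(); // lets the consumer's enumeration finish
        }
    }

    // Yields items as they become available and ends once the worker completes.
    public IEnumerable<T> ReadAll() => _queue.GetConsumingEnumerable();

    public void Dispose()
    {
        _cts.Cancel();
        try { _worker.Wait(); } catch (AggregateException) { /* worker fault ignored in this sketch */ }
        _queue.Dispose();
        _cts.Dispose();
    }
}
```

A caller would construct the fetcher, call `Start()`, and iterate `ReadAll()`. A real fetcher would also surface worker errors to the reader rather than swallowing them as this sketch does.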

Interface Updates

  • Added new methods to ICloudFetchResultFetcher interface:

Testing Infrastructure

  • Created ITestableHiveServer2Statement interface to facilitate testing
  • Updated tests to account for prefetch behavior
  • Ensured all tests pass with the new prefetch functionality

Benefits

  • Improved Performance: By prefetching multiple batches, data is available sooner, reducing wait times.
  • Better Reliability: Enhanced error handling and state management make the system more robust.
  • More Efficient Resource Usage: Link caching reduces unnecessary server requests.

This implementation maintains backward compatibility while providing significant performance improvements for CloudFetch operations.

@github-actions github-actions bot added this to the ADBC Libraries 18 milestone Apr 7, 2025
@jadewang-db jadewang-db force-pushed the cloudfetch-pipeline branch from f59743b to 01daf70 Compare April 14, 2025 19:18
refactor

some code refactoring

refactor

Delete CloudFetchDownloadManagerTest.cs

Initial changes
@jadewang-db jadewang-db force-pushed the cloudfetch-pipeline branch from 01daf70 to a388213 Compare April 14, 2025 19:49

@CurtHagenlocher CurtHagenlocher left a comment

Thanks! I'm still reviewing this logic but thought I'd give some initial feedback.

Also, please take a look at the linter output and make changes accordingly.

@@ -0,0 +1,60 @@
The current CloudFetch implementation downloads each cloud result file inline with the reader, which causes a performance problem: the reader stalls whenever it needs to download the next result file.

This file needs an Apache copyright notice. See some of the other Markdown files in this project for how to add one.

/// <summary>
/// Interface for accessing HiveServer2Statement properties needed by CloudFetchResultFetcher.
/// </summary>
public interface IHiveServer2Statement

This should not be public.

It looks like all of the Thrift classes were mistakenly generated as public instead of internal. I'll make sure to fix that. We don't want internal implementation details to be part of the public API.

/// <summary>
/// Represents a downloaded result file with its associated metadata.
/// </summary>
public interface IDownloadResult : IDisposable

These interfaces should not be public contracts.

/// Maximum number of parallel downloads for CloudFetch operations.
/// Default value is 3 if not specified.
/// </summary>
public const string CloudFetchParallelDownloads = "adbc.spark.cloudfetch.parallel_downloads";

The existing CloudFetch properties are named adbc.databricks.* instead of adbc.spark.*.

@@ -61,81 +50,45 @@ internal sealed class CloudFetchReader : IArrowArrayStream
/// <param name="isLz4Compressed">Whether the results are LZ4 compressed.</param>
public CloudFetchReader(DatabricksStatement statement, Schema schema, bool isLz4Compressed)
{
this.statement = statement;
- this.schema = schema;
+ this.schema = schema ?? throw new ArgumentNullException(nameof(schema));

This isn't strictly needed because it's an internal class and nullable is enabled and the schema argument is already declared as non-nullable.

_statement,
_memoryManager,
_downloadQueue,
2000000);

Should this be configurable? Can it be lifted to a named constant?
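One way to address both questions is sketched below. It makes several assumptions: the snippet does not say what `2000000` represents, so the constant name `DefaultFetchLimit`, the property key `example.cloudfetch.fetch_limit`, and the helper `ResolveFetchLimit` are all hypothetical and do not come from the driver.

```csharp
using System.Collections.Generic;
using System.Globalization;

// Hypothetical sketch: lift the magic number into a named default and
// allow an optional connection property to override it.
internal static class CloudFetchOptionsSketch
{
    // Named constant replacing the bare literal 2000000.
    internal const long DefaultFetchLimit = 2_000_000;

    // Illustrative key only; not an actual driver property.
    internal const string FetchLimitKey = "example.cloudfetch.fetch_limit";

    internal static long ResolveFetchLimit(IReadOnlyDictionary<string, string> properties)
    {
        // Use the override only when it parses as a positive integer.
        if (properties.TryGetValue(FetchLimitKey, out var raw)
            && long.TryParse(raw, NumberStyles.Integer, CultureInfo.InvariantCulture, out long parsed)
            && parsed > 0)
        {
            return parsed;
        }
        return DefaultFetchLimit;
    }
}
```

Falling back to the default on a missing or malformed value keeps existing behavior unchanged for callers that never set the property.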

/// </summary>
internal sealed class EndOfResultsGuard : IDownloadResult
{
private static readonly Task<bool> CompletedTask = Task.FromResult(true);

Can we use Task.CompletedTask instead of defining a new one?
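A minimal sketch of what this suggestion could look like follows. It assumes the property in question is typed as the non-generic `Task`, which `Task.CompletedTask` satisfies directly; a `Task<bool>` member would still need `Task.FromResult`. The type names carry a `Sketch` suffix because they are illustrative stand-ins, not the PR's types.

```csharp
using System;
using System.Threading.Tasks;

// Illustrative stand-in for the interface under review.
internal interface IDownloadResultSketch : IDisposable
{
    Task DownloadCompletedTask { get; }
}

// Sentinel marking the end of results; it holds no resources.
internal sealed class EndOfResultsGuardSketch : IDownloadResultSketch
{
    public static readonly EndOfResultsGuardSketch Instance = new EndOfResultsGuardSketch();

    private EndOfResultsGuardSketch() { }

    // The framework's shared, already-completed task replaces a private static field.
    public Task DownloadCompletedTask => Task.CompletedTask;

    // Nothing to release.
    public void Dispose() { }
}
```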

/// <summary>
/// Gets a task that completes when the download is finished.
/// </summary>
Task DownloadCompletedTask { get; }

Nothing seems to use this.

Comment on lines +196 to +208
try
{
    return await _downloader.GetNextDownloadedFileAsync(cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
{
    // If there's an error in the downloader, check if there's also an error in the fetcher
    if (_resultFetcher.HasError)
    {
        throw new AggregateException("Errors in download pipeline", new[] { ex, _resultFetcher.Error! });
    }
    throw;
}

Suggested change
- try
- {
-     return await _downloader.GetNextDownloadedFileAsync(cancellationToken).ConfigureAwait(false);
- }
- catch (Exception ex)
- {
-     // If there's an error in the downloader, check if there's also an error in the fetcher
-     if (_resultFetcher.HasError)
-     {
-         throw new AggregateException("Errors in download pipeline", new[] { ex, _resultFetcher.Error! });
-     }
-     throw;
- }
+ try
+ {
+     return await _downloader.GetNextDownloadedFileAsync(cancellationToken).ConfigureAwait(false);
+ }
+ catch (Exception ex) when (_resultFetcher.HasError)
+ {
+     throw new AggregateException("Errors in download pipeline", new[] { ex, _resultFetcher.Error! });
+ }
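The suggestion relies on C# exception filters: when the `when` condition is false, the `catch` block is never entered and the original exception continues to propagate, so no explicit `throw;` is needed. The standalone sketch below demonstrates that behavior; `ExceptionFilterSketch`, `Run`, and the `fetcherHasError` flag are hypothetical stand-ins for the PR's downloader and fetcher.

```csharp
using System;

public static class ExceptionFilterSketch
{
    // Returns "aggregate:2" when the filter matches, "original" when it does not.
    public static string Run(bool fetcherHasError)
    {
        try
        {
            try
            {
                // Stand-in for a failing download.
                throw new InvalidOperationException("download failed");
            }
            catch (Exception ex) when (fetcherHasError)
            {
                // Entered only if the filter is true; both errors are combined.
                throw new AggregateException(
                    "Errors in download pipeline",
                    new[] { ex, new TimeoutException("fetch failed") });
            }
        }
        catch (AggregateException agg)
        {
            return "aggregate:" + agg.InnerExceptions.Count;
        }
        catch (InvalidOperationException)
        {
            // The filter was false, so the original exception arrived here unchanged.
            return "original";
        }
    }
}
```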

if (result != EndOfResultsGuard.Instance)
{
    result.Dispose();
}

Given that EndOfResultsGuard.Dispose() is a no-op, we could just call .Dispose() without the if guard.
