-
Notifications
You must be signed in to change notification settings - Fork 12
Expand file tree
/
Copy pathCloudFetchConfiguration.cs
More file actions
160 lines (142 loc) · 7.7 KB
/
CloudFetchConfiguration.cs
File metadata and controls
160 lines (142 loc) · 7.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
/*
* Copyright (c) 2025 ADBC Drivers Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Buffers;
using System.Collections.Generic;
using Apache.Arrow;
using Microsoft.IO;
namespace AdbcDrivers.Databricks.Reader.CloudFetch
{
/// <summary>
/// Configuration for the CloudFetch download pipeline.
/// Protocol-agnostic - works with both Thrift and REST implementations.
/// </summary>
internal sealed class CloudFetchConfiguration
{
// Default values
internal const int DefaultParallelDownloads = 3;
internal const int DefaultPrefetchCount = 2;
internal const int DefaultMemoryBufferSizeMB = 200;
internal const int DefaultTimeoutMinutes = 5;
internal const int DefaultMaxRetries = -1; // -1 = not set (use timeout only)
internal const int DefaultRetryTimeoutSeconds = 300; // 5 minutes
internal const int DefaultRetryDelayMs = 500;
internal const int DefaultMaxUrlRefreshAttempts = 3;
internal const int DefaultUrlExpirationBufferSeconds = 60;
/// <summary>
/// Number of parallel downloads to perform.
/// </summary>
public int ParallelDownloads { get; set; } = DefaultParallelDownloads;
/// <summary>
/// Number of files to prefetch ahead of the reader.
/// </summary>
public int PrefetchCount { get; set; } = DefaultPrefetchCount;
/// <summary>
/// Memory buffer size limit in MB for buffered files.
/// </summary>
public int MemoryBufferSizeMB { get; set; } = DefaultMemoryBufferSizeMB;
/// <summary>
/// HTTP client timeout for downloads (in minutes).
/// </summary>
public int TimeoutMinutes { get; set; } = DefaultTimeoutMinutes;
/// <summary>
/// Maximum retry attempts for failed downloads.
/// -1 means not set (use timeout only). When set to a non-negative value,
/// the retry loop exits if either this count or the timeout is reached.
/// </summary>
public int MaxRetries { get; set; } = DefaultMaxRetries;
/// <summary>
/// Maximum time in seconds to retry failed downloads before giving up.
/// Uses exponential backoff with jitter within this time budget.
/// </summary>
public int RetryTimeoutSeconds { get; set; } = DefaultRetryTimeoutSeconds;
/// <summary>
/// Delay between retry attempts (in milliseconds).
/// </summary>
public int RetryDelayMs { get; set; } = DefaultRetryDelayMs;
/// <summary>
/// Maximum attempts to refresh expired URLs.
/// </summary>
public int MaxUrlRefreshAttempts { get; set; } = DefaultMaxUrlRefreshAttempts;
/// <summary>
/// Buffer time before URL expiration to trigger refresh (in seconds).
/// </summary>
public int UrlExpirationBufferSeconds { get; set; } = DefaultUrlExpirationBufferSeconds;
/// <summary>
/// Whether the result data is LZ4 compressed.
/// </summary>
public bool IsLz4Compressed { get; set; }
/// <summary>
/// The Arrow schema for the results.
/// </summary>
public Schema Schema { get; set; }
/// <summary>
/// RecyclableMemoryStreamManager for LZ4 decompression output streams.
/// If not provided, a new instance will be created when needed.
/// For optimal memory pooling, this should be shared from the connection/database level.
/// </summary>
public RecyclableMemoryStreamManager? MemoryStreamManager { get; set; }
/// <summary>
/// ArrayPool for LZ4 decompression buffers.
/// If not provided, ArrayPool<byte>.Shared will be used.
/// For optimal memory pooling, this should be shared from the connection/database level.
/// </summary>
public ArrayPool<byte>? Lz4BufferPool { get; set; }
/// <summary>
/// Creates configuration from connection properties.
/// Works with UNIFIED properties that are shared across ALL protocols (Thrift, REST, future protocols).
/// Same property names (e.g., "adbc.databricks.cloudfetch.parallel_downloads") work for all protocols.
/// </summary>
/// <param name="properties">Connection properties from either Thrift or REST connection.</param>
/// <param name="schema">Arrow schema for the results.</param>
/// <param name="isLz4Compressed">Whether results are LZ4 compressed.</param>
/// <returns>CloudFetch configuration parsed from unified properties.</returns>
public static CloudFetchConfiguration FromProperties(
IReadOnlyDictionary<string, string> properties,
Schema schema,
bool isLz4Compressed)
{
// Parse MaxRetries separately: -1 (default) = not set, >= 0 = explicit limit.
// Throw on non-integer values to surface misconfiguration.
int parsedMaxRetries = DefaultMaxRetries;
if (properties.TryGetValue(DatabricksParameters.CloudFetchMaxRetries, out string? maxRetriesStr))
{
if (!int.TryParse(maxRetriesStr, out int maxRetries) || maxRetries < -1)
{
throw new ArgumentException(
$"Invalid value '{maxRetriesStr}' for {DatabricksParameters.CloudFetchMaxRetries}. " +
$"Expected -1 (no limit) or a non-negative integer.");
}
parsedMaxRetries = maxRetries;
}
var config = new CloudFetchConfiguration
{
Schema = schema ?? throw new ArgumentNullException(nameof(schema)),
IsLz4Compressed = isLz4Compressed,
ParallelDownloads = PropertyHelper.GetPositiveIntPropertyWithValidation(properties, DatabricksParameters.CloudFetchParallelDownloads, DefaultParallelDownloads),
PrefetchCount = PropertyHelper.GetPositiveIntPropertyWithValidation(properties, DatabricksParameters.CloudFetchPrefetchCount, DefaultPrefetchCount),
MemoryBufferSizeMB = PropertyHelper.GetPositiveIntPropertyWithValidation(properties, DatabricksParameters.CloudFetchMemoryBufferSize, DefaultMemoryBufferSizeMB),
TimeoutMinutes = PropertyHelper.GetPositiveIntPropertyWithValidation(properties, DatabricksParameters.CloudFetchTimeoutMinutes, DefaultTimeoutMinutes),
MaxRetries = parsedMaxRetries,
RetryTimeoutSeconds = PropertyHelper.GetPositiveIntPropertyWithValidation(properties, DatabricksParameters.CloudFetchRetryTimeoutSeconds, DefaultRetryTimeoutSeconds),
RetryDelayMs = PropertyHelper.GetPositiveIntPropertyWithValidation(properties, DatabricksParameters.CloudFetchRetryDelayMs, DefaultRetryDelayMs),
MaxUrlRefreshAttempts = PropertyHelper.GetPositiveIntPropertyWithValidation(properties, DatabricksParameters.CloudFetchMaxUrlRefreshAttempts, DefaultMaxUrlRefreshAttempts),
UrlExpirationBufferSeconds = PropertyHelper.GetPositiveIntPropertyWithValidation(properties, DatabricksParameters.CloudFetchUrlExpirationBufferSeconds, DefaultUrlExpirationBufferSeconds)
};
return config;
}
}
}