Skip to content

Commit e437f06

Browse files
authored
refactor: Cleanup duplicated logic in DELETE (#73)
1 parent 89d1b5e commit e437f06

7 files changed

Lines changed: 155 additions & 224 deletions

File tree

src/include/lance_common.hpp

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,29 @@ void LanceFillS3StorageOptionsFromSecrets(ClientContext &context,
1919
const string &path,
2020
vector<string> &out_keys,
2121
vector<string> &out_values);
22+
void ResolveLanceStorageOptions(ClientContext &context, const string &path,
23+
string &out_open_path,
24+
vector<string> &out_option_keys,
25+
vector<string> &out_option_values);
26+
void BuildStorageOptionPointerArrays(const vector<string> &option_keys,
27+
const vector<string> &option_values,
28+
vector<const char *> &out_key_ptrs,
29+
vector<const char *> &out_value_ptrs);
30+
31+
static constexpr uint64_t LANCE_DEFAULT_MAX_ROWS_PER_FILE = 1024ULL * 1024ULL;
32+
static constexpr uint64_t LANCE_DEFAULT_MAX_ROWS_PER_GROUP = 1024ULL;
33+
static constexpr uint64_t LANCE_DEFAULT_MAX_BYTES_PER_FILE =
34+
90ULL * 1024ULL * 1024ULL * 1024ULL;
35+
2236
void ResolveLanceNamespaceAuth(ClientContext &context, const string &endpoint,
2337
const unordered_map<string, Value> &options,
2438
string &out_bearer_token, string &out_api_key);
2539
void ResolveLanceNamespaceAuth(ClientContext &context, const string &endpoint,
2640
const named_parameter_map_t &options,
2741
string &out_bearer_token, string &out_api_key);
42+
void ResolveLanceNamespaceAuthOverrides(
43+
const unordered_map<string, Value> &options, string &out_bearer_token,
44+
string &out_api_key);
2845

2946
bool TryLanceNamespaceListTables(ClientContext &context, const string &endpoint,
3047
const string &namespace_id,

src/lance_common.cpp

Lines changed: 57 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ void ResolveLanceNamespaceAuth(ClientContext &context, const string &endpoint,
171171
string &out_bearer_token, string &out_api_key) {
172172
FillLanceNamespaceAuthFromSecrets(context, endpoint, out_bearer_token,
173173
out_api_key);
174-
(void)options;
174+
ResolveLanceNamespaceAuthOverrides(options, out_bearer_token, out_api_key);
175175
}
176176

177177
void ResolveLanceNamespaceAuth(ClientContext &context, const string &endpoint,
@@ -193,23 +193,57 @@ void ResolveLanceNamespaceAuth(ClientContext &context, const string &endpoint,
193193
}
194194
}
195195

196-
static void ResolveStorageOptions(ClientContext &context, const string &path,
197-
string &out_open_path,
198-
vector<string> &out_keys,
199-
vector<string> &out_values) {
196+
void ResolveLanceNamespaceAuthOverrides(
197+
const unordered_map<string, Value> &options, string &out_bearer_token,
198+
string &out_api_key) {
199+
for (auto &kv : options) {
200+
if (StringUtil::CIEquals(kv.first, "token")) {
201+
ApplyAuthOverrideValue(kv.second, out_bearer_token);
202+
continue;
203+
}
204+
if (StringUtil::CIEquals(kv.first, "bearer_token")) {
205+
ApplyAuthOverrideValue(kv.second, out_bearer_token);
206+
continue;
207+
}
208+
if (StringUtil::CIEquals(kv.first, "api_key")) {
209+
ApplyAuthOverrideValue(kv.second, out_api_key);
210+
continue;
211+
}
212+
}
213+
}
214+
215+
void ResolveLanceStorageOptions(ClientContext &context, const string &path,
216+
string &out_open_path, vector<string> &out_keys,
217+
vector<string> &out_values) {
200218
out_open_path = path;
201219
out_keys.clear();
202220
out_values.clear();
203221

204-
if (StringUtil::StartsWith(out_open_path, "s3://") ||
205-
StringUtil::StartsWith(out_open_path, "s3a://") ||
206-
StringUtil::StartsWith(out_open_path, "s3n://")) {
207-
out_open_path = LanceNormalizeS3Scheme(out_open_path);
222+
out_open_path = LanceNormalizeS3Scheme(out_open_path);
223+
if (StringUtil::StartsWith(out_open_path, "s3://")) {
208224
LanceFillS3StorageOptionsFromSecrets(context, out_open_path, out_keys,
209225
out_values);
210226
}
211227
}
212228

229+
void BuildStorageOptionPointerArrays(const vector<string> &option_keys,
230+
const vector<string> &option_values,
231+
vector<const char *> &out_key_ptrs,
232+
vector<const char *> &out_value_ptrs) {
233+
if (option_keys.size() != option_values.size()) {
234+
throw InternalException(
235+
"Storage option keys/values size mismatch for Lance");
236+
}
237+
out_key_ptrs.clear();
238+
out_value_ptrs.clear();
239+
out_key_ptrs.reserve(option_keys.size());
240+
out_value_ptrs.reserve(option_values.size());
241+
for (idx_t i = 0; i < option_keys.size(); i++) {
242+
out_key_ptrs.push_back(option_keys[i].c_str());
243+
out_value_ptrs.push_back(option_values[i].c_str());
244+
}
245+
}
246+
213247
bool TryLanceNamespaceListTables(ClientContext &context, const string &endpoint,
214248
const string &namespace_id,
215249
const string &bearer_token,
@@ -255,16 +289,13 @@ bool TryLanceDirNamespaceListTables(ClientContext &context, const string &root,
255289
string open_root;
256290
vector<string> option_keys;
257291
vector<string> option_values;
258-
ResolveStorageOptions(context, root, open_root, option_keys, option_values);
292+
ResolveLanceStorageOptions(context, root, open_root, option_keys,
293+
option_values);
259294

260295
vector<const char *> key_ptrs;
261296
vector<const char *> value_ptrs;
262-
key_ptrs.reserve(option_keys.size());
263-
value_ptrs.reserve(option_values.size());
264-
for (idx_t i = 0; i < option_keys.size(); i++) {
265-
key_ptrs.push_back(option_keys[i].c_str());
266-
value_ptrs.push_back(option_values[i].c_str());
267-
}
297+
BuildStorageOptionPointerArrays(option_keys, option_values, key_ptrs,
298+
value_ptrs);
268299

269300
auto *ptr = lance_dir_namespace_list_tables(
270301
open_root.c_str(), key_ptrs.empty() ? nullptr : key_ptrs.data(),
@@ -316,46 +347,34 @@ void *LanceOpenDataset(ClientContext &context, const string &path) {
316347
string open_path;
317348
vector<string> option_keys;
318349
vector<string> option_values;
319-
ResolveStorageOptions(context, path, open_path, option_keys, option_values);
350+
ResolveLanceStorageOptions(context, path, open_path, option_keys,
351+
option_values);
320352

321353
if (option_keys.empty()) {
322354
return lance_open_dataset(open_path.c_str());
323355
}
324356

325357
vector<const char *> key_ptrs;
326358
vector<const char *> value_ptrs;
327-
key_ptrs.reserve(option_keys.size());
328-
value_ptrs.reserve(option_values.size());
329-
for (idx_t i = 0; i < option_keys.size(); i++) {
330-
key_ptrs.push_back(option_keys[i].c_str());
331-
value_ptrs.push_back(option_values[i].c_str());
332-
}
359+
BuildStorageOptionPointerArrays(option_keys, option_values, key_ptrs,
360+
value_ptrs);
333361
return lance_open_dataset_with_storage_options(
334362
open_path.c_str(), key_ptrs.data(), value_ptrs.data(),
335363
option_keys.size());
336364
}
337365

338-
static constexpr uint64_t DEFAULT_MAX_ROWS_PER_FILE = 1024ULL * 1024ULL;
339-
static constexpr uint64_t DEFAULT_MAX_ROWS_PER_GROUP = 1024ULL;
340-
static constexpr uint64_t DEFAULT_MAX_BYTES_PER_FILE =
341-
90ULL * 1024ULL * 1024ULL * 1024ULL;
342-
343366
int64_t LanceTruncateDataset(ClientContext &context,
344367
const string &dataset_uri) {
345368
string open_path;
346369
vector<string> option_keys;
347370
vector<string> option_values;
348-
ResolveStorageOptions(context, dataset_uri, open_path, option_keys,
349-
option_values);
371+
ResolveLanceStorageOptions(context, dataset_uri, open_path, option_keys,
372+
option_values);
350373

351374
vector<const char *> key_ptrs;
352375
vector<const char *> value_ptrs;
353-
key_ptrs.reserve(option_keys.size());
354-
value_ptrs.reserve(option_values.size());
355-
for (idx_t i = 0; i < option_keys.size(); i++) {
356-
key_ptrs.push_back(option_keys[i].c_str());
357-
value_ptrs.push_back(option_values[i].c_str());
358-
}
376+
BuildStorageOptionPointerArrays(option_keys, option_values, key_ptrs,
377+
value_ptrs);
359378

360379
void *dataset = nullptr;
361380
if (option_keys.empty()) {
@@ -401,8 +420,8 @@ int64_t LanceTruncateDataset(ClientContext &context,
401420
open_path.c_str(), "overwrite",
402421
key_ptrs.empty() ? nullptr : key_ptrs.data(),
403422
value_ptrs.empty() ? nullptr : value_ptrs.data(), option_keys.size(),
404-
DEFAULT_MAX_ROWS_PER_FILE, DEFAULT_MAX_ROWS_PER_GROUP,
405-
DEFAULT_MAX_BYTES_PER_FILE, &schema_root.arrow_schema);
423+
LANCE_DEFAULT_MAX_ROWS_PER_FILE, LANCE_DEFAULT_MAX_ROWS_PER_GROUP,
424+
LANCE_DEFAULT_MAX_BYTES_PER_FILE, &schema_root.arrow_schema);
406425
if (!writer) {
407426
throw IOException("Failed to open Lance writer: " + open_path +
408427
LanceFormatErrorSuffix());

src/lance_insert.cpp

Lines changed: 7 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,6 @@
1515

1616
namespace duckdb {
1717

18-
static constexpr uint64_t DEFAULT_MAX_ROWS_PER_FILE = 1024ULL * 1024ULL;
19-
static constexpr uint64_t DEFAULT_MAX_ROWS_PER_GROUP = 1024ULL;
20-
static constexpr uint64_t DEFAULT_MAX_BYTES_PER_FILE =
21-
90ULL * 1024ULL * 1024ULL * 1024ULL;
22-
2318
struct LanceInsertGlobalState : public GlobalSinkState {
2419
explicit LanceInsertGlobalState(string dataset_uri_p,
2520
vector<string> column_names_p,
@@ -102,31 +97,21 @@ class PhysicalLanceInsert final : public PhysicalOperator {
10297
gstate.column_types, gstate.column_names,
10398
props);
10499

105-
gstate.open_path = gstate.dataset_uri;
106-
if (StringUtil::StartsWith(gstate.open_path, "s3://") ||
107-
StringUtil::StartsWith(gstate.open_path, "s3a://") ||
108-
StringUtil::StartsWith(gstate.open_path, "s3n://")) {
109-
gstate.open_path = LanceNormalizeS3Scheme(gstate.open_path);
110-
LanceFillS3StorageOptionsFromSecrets(context.client, gstate.open_path,
111-
gstate.option_keys,
112-
gstate.option_values);
113-
}
100+
ResolveLanceStorageOptions(context.client, gstate.dataset_uri,
101+
gstate.open_path, gstate.option_keys,
102+
gstate.option_values);
114103

115104
vector<const char *> key_ptrs;
116105
vector<const char *> value_ptrs;
117-
key_ptrs.reserve(gstate.option_keys.size());
118-
value_ptrs.reserve(gstate.option_values.size());
119-
for (idx_t i = 0; i < gstate.option_keys.size(); i++) {
120-
key_ptrs.push_back(gstate.option_keys[i].c_str());
121-
value_ptrs.push_back(gstate.option_values[i].c_str());
122-
}
106+
BuildStorageOptionPointerArrays(gstate.option_keys, gstate.option_values,
107+
key_ptrs, value_ptrs);
123108

124109
gstate.writer = lance_open_uncommitted_writer_with_storage_options(
125110
gstate.open_path.c_str(), "append",
126111
key_ptrs.empty() ? nullptr : key_ptrs.data(),
127112
value_ptrs.empty() ? nullptr : value_ptrs.data(),
128-
gstate.option_keys.size(), DEFAULT_MAX_ROWS_PER_FILE,
129-
DEFAULT_MAX_ROWS_PER_GROUP, DEFAULT_MAX_BYTES_PER_FILE,
113+
gstate.option_keys.size(), LANCE_DEFAULT_MAX_ROWS_PER_FILE,
114+
LANCE_DEFAULT_MAX_ROWS_PER_GROUP, LANCE_DEFAULT_MAX_BYTES_PER_FILE,
130115
&gstate.schema_root.arrow_schema);
131116
if (!gstate.writer) {
132117
throw IOException("Failed to open Lance writer: " + gstate.open_path +

0 commit comments

Comments
 (0)