Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 89 additions & 4 deletions .github/workflows/Build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ concurrency:
jobs:
build:
name: Build
runs-on: ubuntu-latest
runs-on: ubuntu-24.04
env:
CUSTOM_LINKER: mold
CCACHE_BASEDIR: ${{ github.workspace }}
Expand Down Expand Up @@ -63,9 +63,15 @@ jobs:
- name: Build
run: |
GEN=ninja make release -j 4
- name: Test
run: |
GEN=ninja make test

- name: Upload build artifacts
uses: actions/upload-artifact@v4
with:
name: build-release-${{ github.sha }}
path: |
build/release/test/unittest
build/release/src/libduckdb.so*
if-no-files-found: error

- name: Save ccache
if: github.ref == 'refs/heads/main' && success()
Expand All @@ -74,3 +80,82 @@ jobs:
path: |
.ccache
key: ccache-${{ runner.os }}-${{ steps.duckdb-sha.outputs.sha }}-${{ steps.build-sha.outputs.sha }}-${{ github.sha }}

# Runs the unit tests in a separate job from compilation. `needs: build` makes
# this job wait for the build job before it starts.
test:
name: Test
runs-on: ubuntu-24.04
needs: build
steps:
- uses: actions/checkout@v5

# The artifact name is keyed on the commit SHA and matches the name used by the
# build job's upload step; its contents are placed under build/release.
- name: Download build artifacts
uses: actions/download-artifact@v4
with:
name: build-release-${{ github.sha }}
path: build/release

# NOTE(review): presumably required because the artifact round-trip does not
# preserve the executable bit - confirm against the upload-artifact docs.
- name: Ensure test runner is executable
run: |
chmod +x ./build/release/test/unittest

# Invokes the downloaded test runner with the "test/*" filter.
- name: Run tests
run: |
./build/release/test/unittest "test/*"

# Runs the same test binary as the `test` job, but with LANCE_TEST_S3=1 and a
# local MinIO container listening on port 9000 that holds the test dataset.
test_s3:
name: Test S3 (MinIO)
runs-on: ubuntu-24.04
needs: build
steps:
- uses: actions/checkout@v5

# The artifact name is keyed on the commit SHA and matches the name used by the
# build job's upload step; its contents are placed under build/release.
- name: Download build artifacts
uses: actions/download-artifact@v4
with:
name: build-release-${{ github.sha }}
path: build/release

# NOTE(review): presumably required because the artifact round-trip does not
# preserve the executable bit - confirm against the upload-artifact docs.
- name: Ensure test runner is executable
run: |
chmod +x ./build/release/test/unittest

# Starts a detached, auto-removed MinIO container from a pinned release tag,
# with root credentials minioadmin/minioadmin and port 9000 published.
- name: Start MinIO
run: |
docker run -d --rm \
--name lance-minio \
-p 9000:9000 \
-e MINIO_ROOT_USER=minioadmin \
-e MINIO_ROOT_PASSWORD=minioadmin \
minio/minio:RELEASE.2025-01-20T14-49-07Z \
server /data

# Polls the readiness endpoint once per second, up to 60 attempts, and fails
# the job if MinIO never reports ready.
- name: Wait for MinIO
run: |
for i in $(seq 1 60); do
if curl -fsS "http://127.0.0.1:9000/minio/health/ready" >/dev/null; then
exit 0
fi
sleep 1
done
echo "MinIO did not become ready" >&2
exit 1

# Downloads the MinIO client, creates the lance-test bucket (a failure of the
# create command is ignored via `|| true`) and mirrors test/test_data.lance into it.
# NOTE(review): the mc download URL is not pinned to a version or checksum.
- name: Upload test dataset to MinIO
run: |
curl -fsSL "https://dl.min.io/client/mc/release/linux-amd64/mc" -o mc
chmod +x mc
./mc alias set local http://127.0.0.1:9000 minioadmin minioadmin
./mc mb -p local/lance-test || true
./mc mirror --overwrite test/test_data.lance local/lance-test/test_data.lance

# Same runner invocation as the plain test job, with LANCE_TEST_S3 set to "1".
# NOTE(review): presumably this variable switches on the S3-specific test
# cases - confirm in the test sources.
- name: Run S3 tests
env:
LANCE_TEST_S3: "1"
run: |
./build/release/test/unittest "test/*"

# `if: always()` runs this step even when an earlier step failed; it prints the
# container logs and then force-removes the container. Both commands are
# best-effort (`|| true`).
- name: Cleanup MinIO
if: always()
run: |
docker logs lance-minio || true
docker rm -f lance-minio || true
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,18 @@ SELECT *
LIMIT 10;
```

### S3 authentication via DuckDB Secrets

For `s3://` paths, the extension can use DuckDB's native Secrets mechanism to obtain credentials:

```sql
CREATE SECRET (TYPE S3, provider credential_chain);

SELECT *
FROM 's3://bucket/path/to/dataset.lance'
LIMIT 10;
```

## Install

### Install from DuckDB Community Extensions (recommended)
Expand Down
80 changes: 80 additions & 0 deletions rust/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@

use std::ffi::{c_char, c_void, CStr};
use std::ptr;
use std::collections::HashMap;
use std::sync::Arc;

use arrow::array::{Array, RecordBatch, StructArray};
use arrow::datatypes::{DataType, Schema};
use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema};
use lance::Dataset;
use lance::dataset::builder::DatasetBuilder;

mod runtime;
mod scanner;
Expand Down Expand Up @@ -71,6 +73,84 @@ pub unsafe extern "C" fn lance_open_dataset(path: *const c_char) -> *mut c_void
Box::into_raw(handle) as *mut c_void
}

#[no_mangle]
pub unsafe extern "C" fn lance_open_dataset_with_storage_options(
path: *const c_char,
option_keys: *const *const c_char,
option_values: *const *const c_char,
options_len: usize,
) -> *mut c_void {
if path.is_null() {
set_last_error(ErrorCode::InvalidArgument, "path is null");
return ptr::null_mut();
}
if options_len > 0 && (option_keys.is_null() || option_values.is_null()) {
set_last_error(
ErrorCode::InvalidArgument,
"option_keys/option_values is null with non-zero length",
);
return ptr::null_mut();
}

let path_str = match CStr::from_ptr(path).to_str() {
Ok(s) => s,
Err(err) => {
set_last_error(ErrorCode::Utf8, format!("utf8 decode: {err}"));
return ptr::null_mut();
}
};

let mut storage_options = HashMap::<String, String>::new();
for i in 0..options_len {
let key_ptr = *option_keys.add(i);
let value_ptr = *option_values.add(i);
if key_ptr.is_null() || value_ptr.is_null() {
set_last_error(ErrorCode::InvalidArgument, "option key/value is null");
return ptr::null_mut();
}
let key = match CStr::from_ptr(key_ptr).to_str() {
Ok(s) => s,
Err(err) => {
set_last_error(ErrorCode::Utf8, format!("utf8 decode key: {err}"));
return ptr::null_mut();
}
};
let value = match CStr::from_ptr(value_ptr).to_str() {
Ok(s) => s,
Err(err) => {
set_last_error(ErrorCode::Utf8, format!("utf8 decode value: {err}"));
return ptr::null_mut();
}
};
storage_options.insert(key.to_string(), value.to_string());
}

let dataset = match runtime::block_on(async {
DatasetBuilder::from_uri(path_str)
.with_storage_options(storage_options)
.load()
.await
}) {
Ok(Ok(ds)) => Arc::new(ds),
Ok(Err(err)) => {
set_last_error(
ErrorCode::DatasetOpen,
format!("dataset open '{path_str}': {err}"),
);
return ptr::null_mut();
}
Err(err) => {
set_last_error(ErrorCode::Runtime, format!("runtime: {err}"));
return ptr::null_mut();
}
};

let handle = Box::new(DatasetHandle { dataset });
clear_last_error();

Box::into_raw(handle) as *mut c_void
}

#[no_mangle]
pub unsafe extern "C" fn lance_close_dataset(dataset: *mut c_void) {
if !dataset.is_null() {
Expand Down
115 changes: 114 additions & 1 deletion src/lance_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@
#include "duckdb/common/arrow/arrow.hpp"
#include "duckdb/common/exception.hpp"
#include "duckdb/common/string_util.hpp"
#include "duckdb/catalog/catalog_transaction.hpp"
#include "duckdb/execution/expression_executor.hpp"
#include "duckdb/function/table/arrow.hpp"
#include "duckdb/function/table_function.hpp"
#include "duckdb/main/config.hpp"
#include "duckdb/main/extension/extension_loader.hpp"
#include "duckdb/main/secret/secret_manager.hpp"
#include "duckdb/planner/expression/bound_between_expression.hpp"
#include "duckdb/planner/expression/bound_cast_expression.hpp"
#include "duckdb/planner/expression/bound_columnref_expression.hpp"
Expand Down Expand Up @@ -42,6 +44,10 @@
// not call `release` unless the caller initialized them to a valid value.
extern "C" {
void *lance_open_dataset(const char *path);
void *lance_open_dataset_with_storage_options(const char *path,
const char **option_keys,
const char **option_values,
size_t options_len);
void lance_close_dataset(void *dataset);

void *lance_get_schema(void *dataset);
Expand Down Expand Up @@ -100,6 +106,87 @@ static string LanceFormatErrorSuffix() {
return " (Lance error: " + err + ")";
}

// Rewrites an `s3a://` or `s3n://` prefix to `s3://`; every other path is
// returned unchanged.
static string NormalizeS3Scheme(const string &path) {
	const bool has_alias_scheme = StringUtil::StartsWith(path, "s3a://") ||
	                              StringUtil::StartsWith(path, "s3n://");
	if (!has_alias_scheme) {
		return path;
	}
	// Both alias prefixes are six characters long.
	return "s3://" + path.substr(6);
}

// Converts a secret field to text, mapping a NULL value to the empty string so
// callers can treat "unset" and "empty" the same way.
static string SecretValueToString(const Value &value) {
	return value.IsNull() ? string() : value.ToString();
}

// Appends `key`/`value` to the two parallel output lists, but only when
// `value` is non-empty; an empty value leaves both lists untouched.
static void AddIfNotEmpty(vector<string> &keys, vector<string> &values,
                          const string &key, const string &value) {
	if (!value.empty()) {
		keys.emplace_back(key);
		values.emplace_back(value);
	}
}

// Looks up the DuckDB secret of type "s3" that matches `path` and translates
// its fields into storage-option key/value pairs, appended to the parallel
// lists `out_keys` / `out_values`.
//
// Nothing is appended when no secret matches or when the matched secret is not
// a KeyValueSecret. Fields that are NULL or empty in the secret are skipped.
//
// Field mapping:
//   key_id / secret / session_token -> access_key_id / secret_access_key /
//                                      session_token
//   (both key_id and secret empty)  -> skip_signature=true
//   region                          -> region
//   endpoint                        -> endpoint (scheme added when missing)
//   url_style vhost|virtual_hosted  -> virtual_hosted_style_request=true
//   url_style path                  -> virtual_hosted_style_request=false
//   use_ssl false|0                 -> allow_http=true
static void FillS3StorageOptionsFromSecrets(ClientContext &context,
                                            const string &path,
                                            vector<string> &out_keys,
                                            vector<string> &out_values) {
	auto &secret_manager = SecretManager::Get(context);
	auto transaction = CatalogTransaction::GetSystemCatalogTransaction(context);
	auto secret_match = secret_manager.LookupSecret(transaction, path, "s3");
	if (!secret_match.HasMatch() || !secret_match.secret_entry ||
	    !secret_match.secret_entry->secret) {
		return;
	}

	auto *kv_secret = dynamic_cast<const KeyValueSecret *>(
	    secret_match.secret_entry->secret.get());
	if (!kv_secret) {
		return;
	}

	auto key_id = SecretValueToString(kv_secret->TryGetValue("key_id"));
	auto secret_access_key =
	    SecretValueToString(kv_secret->TryGetValue("secret"));
	auto session_token =
	    SecretValueToString(kv_secret->TryGetValue("session_token"));
	auto region = SecretValueToString(kv_secret->TryGetValue("region"));
	auto endpoint = SecretValueToString(kv_secret->TryGetValue("endpoint"));
	auto url_style = SecretValueToString(kv_secret->TryGetValue("url_style"));
	auto use_ssl = SecretValueToString(kv_secret->TryGetValue("use_ssl"));

	// An empty use_ssl compares unequal to both literals, so "unset" counts as
	// SSL enabled.
	const bool ssl_disabled = StringUtil::CIEquals(use_ssl, "false") ||
	                          StringUtil::CIEquals(use_ssl, "0");

	if (key_id.empty() && secret_access_key.empty()) {
		// No credentials in the secret: request unsigned (anonymous) access.
		AddIfNotEmpty(out_keys, out_values, "skip_signature", "true");
	} else {
		AddIfNotEmpty(out_keys, out_values, "access_key_id", key_id);
		AddIfNotEmpty(out_keys, out_values, "secret_access_key", secret_access_key);
		AddIfNotEmpty(out_keys, out_values, "session_token", session_token);
	}

	AddIfNotEmpty(out_keys, out_values, "region", region);

	// DuckDB S3 secrets conventionally store the endpoint as a bare
	// `host[:port]` (the scheme is derived from use_ssl), whereas the storage
	// layer expects a full URL. Add the scheme when the secret did not carry
	// one; an endpoint that already has a scheme is forwarded untouched.
	if (!endpoint.empty() && endpoint.find("://") == string::npos) {
		endpoint = (ssl_disabled ? "http://" : "https://") + endpoint;
	}
	AddIfNotEmpty(out_keys, out_values, "endpoint", endpoint);

	if (StringUtil::CIEquals(url_style, "vhost") ||
	    StringUtil::CIEquals(url_style, "virtual_hosted")) {
		AddIfNotEmpty(out_keys, out_values, "virtual_hosted_style_request", "true");
	} else if (StringUtil::CIEquals(url_style, "path")) {
		AddIfNotEmpty(out_keys, out_values, "virtual_hosted_style_request",
		              "false");
	}

	if (ssl_disabled) {
		// Plain-HTTP endpoints must be allowed explicitly.
		AddIfNotEmpty(out_keys, out_values, "allow_http", "true");
	}
}

struct LanceScanBindData : public TableFunctionData {
string file_path;
void *dataset = nullptr;
Expand Down Expand Up @@ -565,7 +652,33 @@ static unique_ptr<FunctionData> LanceScanBind(ClientContext &context,
auto result = make_uniq<LanceScanBindData>();
result->file_path = input.inputs[0].GetValue<string>();

result->dataset = lance_open_dataset(result->file_path.c_str());
auto open_path = result->file_path;
vector<string> option_keys;
vector<string> option_values;

if (StringUtil::StartsWith(open_path, "s3://") ||
StringUtil::StartsWith(open_path, "s3a://") ||
StringUtil::StartsWith(open_path, "s3n://")) {
open_path = NormalizeS3Scheme(open_path);
FillS3StorageOptionsFromSecrets(context, open_path, option_keys,
option_values);
}

if (!option_keys.empty()) {
vector<const char *> key_ptrs;
vector<const char *> value_ptrs;
key_ptrs.reserve(option_keys.size());
value_ptrs.reserve(option_values.size());
for (idx_t i = 0; i < option_keys.size(); i++) {
key_ptrs.push_back(option_keys[i].c_str());
value_ptrs.push_back(option_values[i].c_str());
}
result->dataset = lance_open_dataset_with_storage_options(
open_path.c_str(), key_ptrs.data(), value_ptrs.data(),
option_keys.size());
} else {
result->dataset = lance_open_dataset(open_path.c_str());
}
if (!result->dataset) {
throw IOException("Failed to open Lance dataset: " + result->file_path +
LanceFormatErrorSuffix());
Expand Down
Loading