Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 43 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,19 @@ $ cat ~/.aws/config
region = eu-central-1
```

Alternatively, you can use the following environment variables when starting postgres to configure the S3 client:
Alternatively, you can configure AWS credentials using session variables (GUCs) or environment variables:

#### Session Variables (GUCs) - Highest Priority
You can set these within a PostgreSQL session:
```sql
SET pg_parquet.aws_access_key_id = 'AKIA...';
SET pg_parquet.aws_secret_access_key = '...';
SET pg_parquet.aws_session_token = '...'; -- Optional, for temporary credentials
SET pg_parquet.aws_region = 'us-east-1';
SET pg_parquet.aws_endpoint_url = 'https://s3.amazonaws.com';
```

#### Environment Variables - Second Priority
- `AWS_ACCESS_KEY_ID`: the access key ID of the AWS account
- `AWS_SECRET_ACCESS_KEY`: the secret access key of the AWS account
- `AWS_SESSION_TOKEN`: the session token for the AWS account
Expand All @@ -234,8 +246,9 @@ Alternatively, you can use the following environment variables when starting pos
- `AWS_ALLOW_HTTP`: allows http endpoints **(only via environment variables)**

Config source priority order is shown below:
1. Environment variables,
2. Config file.
1. Session variables (GUCs),
2. Environment variables,
3. Config file.

Supported S3 uri formats are shown below:
- s3:// \<bucket\> / \<path\>
Expand Down Expand Up @@ -304,10 +317,36 @@ $ cat ~/.config/gcloud/application_default_credentials.json
}
```

Alternatively, you can use the following environment variables when starting postgres to configure the Google Cloud Storage client:
Alternatively, you can configure Google Cloud Storage credentials using session variables (GUCs) or environment variables:

#### Session Variables (GUCs) - Highest Priority
You can set these within a PostgreSQL session:

**For service account key (JSON string):**
```sql
-- Simple JSON (escape single quotes by doubling them)
SET pg_parquet.google_service_account_key = '{"type": "service_account", "project_id": "my-project"}';

-- Complex JSON with private key (escape single quotes)
SET pg_parquet.google_service_account_key = '{"type": "service_account", "project_id": "my-project", "private_key": "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----\n", "client_email": "[email protected]"}';
```

**For service account path:**
```sql
SET pg_parquet.google_service_account_path = '/path/to/service-account-key.json';
```

**Note:** When setting JSON service account keys via GUC, make sure to escape any single quotes (`'`) by doubling them (`''`).

#### Environment Variables - Second Priority
- `GOOGLE_SERVICE_ACCOUNT_KEY`: json serialized service account key **(only via environment variables)**
- `GOOGLE_SERVICE_ACCOUNT_PATH`: an alternative location for the config file **(only via environment variables)**

Config source priority order is shown below:
1. Session variables (GUCs),
2. Environment variables,
3. Default credentials file (`~/.config/gcloud/application_default_credentials.json`).

Supported Google Cloud Storage uri formats are shown below:
- gs:// \<bucket\> / \<path\>

Expand Down
96 changes: 93 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,30 @@
use pgrx::pg_sys::AsPgCStr;
use std::ffi::CStr;
use std::ffi::CString;
use std::sync::LazyLock;

use parquet_copy_hook::hook::{init_parquet_copy_hook, ENABLE_PARQUET_COPY_HOOK};
use parquet_copy_hook::pg_compat::MarkGUCPrefixReserved;
use pgrx::pg_sys::AsPgCStr;
use pgrx::{prelude::*, GucContext, GucFlags, GucRegistry};
use pgrx::{prelude::*, GucContext, GucFlags, GucRegistry, GucSetting};
use tokio::runtime::Runtime;

// AWS Configuration GUCs
pub(crate) static AWS_ACCESS_KEY_ID: GucSetting<Option<CString>> =
GucSetting::<Option<CString>>::new(None);
pub(crate) static AWS_SECRET_ACCESS_KEY: GucSetting<Option<CString>> =
GucSetting::<Option<CString>>::new(None);
pub(crate) static AWS_SESSION_TOKEN: GucSetting<Option<CString>> =
GucSetting::<Option<CString>>::new(None);
pub(crate) static AWS_ENDPOINT_URL: GucSetting<Option<CString>> =
GucSetting::<Option<CString>>::new(None);
pub(crate) static AWS_REGION: GucSetting<Option<CString>> =
GucSetting::<Option<CString>>::new(None);

pub(crate) static GOOGLE_SERVICE_ACCOUNT_KEY: GucSetting<Option<CString>> =
GucSetting::<Option<CString>>::new(None);
pub(crate) static GOOGLE_SERVICE_ACCOUNT_PATH: GucSetting<Option<CString>> =
GucSetting::<Option<CString>>::new(None);

mod arrow_parquet;
mod object_store;
mod parquet_copy_hook;
Expand Down Expand Up @@ -49,7 +67,79 @@ pub extern "C-unwind" fn _PG_init() {
&ENABLE_PARQUET_COPY_HOOK,
GucContext::Userset,
GucFlags::default(),
)
);

// AWS Configuration GUCs
GucRegistry::define_string_guc(
CStr::from_ptr("pg_parquet.aws_access_key_id".as_pg_cstr()),
CStr::from_ptr("AWS Access Key ID for S3 authentication".as_pg_cstr()),
CStr::from_ptr(
"AWS Access Key ID used for authenticating with S3-compatible storage".as_pg_cstr(),
),
&AWS_ACCESS_KEY_ID,
GucContext::Userset,
GucFlags::default(),
);

GucRegistry::define_string_guc(
CStr::from_ptr("pg_parquet.aws_secret_access_key".as_pg_cstr()),
CStr::from_ptr("AWS Secret Access Key for S3 authentication".as_pg_cstr()),
CStr::from_ptr(
"AWS Secret Access Key used for authenticating with S3-compatible storage"
.as_pg_cstr(),
),
&AWS_SECRET_ACCESS_KEY,
GucContext::Userset,
GucFlags::default(),
);

GucRegistry::define_string_guc(
CStr::from_ptr("pg_parquet.aws_session_token".as_pg_cstr()),
CStr::from_ptr("AWS Session Token for S3 authentication".as_pg_cstr()),
CStr::from_ptr(
"AWS Session Token used for temporary credentials with S3-compatible storage"
.as_pg_cstr(),
),
&AWS_SESSION_TOKEN,
GucContext::Userset,
GucFlags::default(),
);

GucRegistry::define_string_guc(
CStr::from_ptr("pg_parquet.aws_endpoint_url".as_pg_cstr()),
CStr::from_ptr("AWS S3 Endpoint URL".as_pg_cstr()),
CStr::from_ptr("Custom endpoint URL for S3-compatible storage services".as_pg_cstr()),
&AWS_ENDPOINT_URL,
GucContext::Userset,
GucFlags::default(),
);

GucRegistry::define_string_guc(
CStr::from_ptr("pg_parquet.aws_region".as_pg_cstr()),
CStr::from_ptr("AWS Region for S3 operations".as_pg_cstr()),
CStr::from_ptr("AWS region for S3 bucket operations".as_pg_cstr()),
&AWS_REGION,
GucContext::Userset,
GucFlags::default(),
);

GucRegistry::define_string_guc(
CStr::from_ptr("pg_parquet.google_service_account_key".as_pg_cstr()),
CStr::from_ptr("Google Service Account Key JSON".as_pg_cstr()),
CStr::from_ptr("Google Cloud service account key used for authentication".as_pg_cstr()),
&GOOGLE_SERVICE_ACCOUNT_KEY,
GucContext::Userset,
GucFlags::default(),
);

GucRegistry::define_string_guc(
CStr::from_ptr("pg_parquet.google_service_account_path".as_pg_cstr()),
CStr::from_ptr("Google Service Account Key Path".as_pg_cstr()),
CStr::from_ptr("Path to Google Cloud service account key file".as_pg_cstr()),
&GOOGLE_SERVICE_ACCOUNT_PATH,
GucContext::Userset,
GucFlags::default(),
);
};

MarkGUCPrefixReserved("pg_parquet");
Expand Down
83 changes: 58 additions & 25 deletions src/object_store/aws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ use aws_credential_types::provider::ProvideCredentials;
use object_store::aws::AmazonS3Builder;
use url::Url;

use crate::PG_BACKEND_TOKIO_RUNTIME;
use crate::{
AWS_ACCESS_KEY_ID, AWS_ENDPOINT_URL, AWS_REGION, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN,
PG_BACKEND_TOKIO_RUNTIME,
};

use super::object_store_cache::ObjectStoreWithExpiration;

Expand Down Expand Up @@ -115,43 +118,73 @@ struct AwsS3Config {
}

impl AwsS3Config {
// load reads the s3 config from the environment variables first and config files as fallback.
// load reads the s3 config from GUCs first, then environment variables, then config files as fallback.
fn load() -> Self {
let allow_http = if let Ok(allow_http) = std::env::var("AWS_ALLOW_HTTP") {
allow_http.parse().unwrap_or(false)
} else {
false
};

// first tries environment variables and then the config files
let sdk_config = PG_BACKEND_TOKIO_RUNTIME
.block_on(async { aws_config::defaults(BehaviorVersion::latest()).load().await });

let mut access_key_id = None;
let mut secret_access_key = None;
let mut session_token = None;
// Check GUC values first
let mut access_key_id = AWS_ACCESS_KEY_ID.get().map(|s| {
let key = s.to_string_lossy().to_string();
key
});
let mut secret_access_key = AWS_SECRET_ACCESS_KEY
.get()
.map(|s| s.to_string_lossy().to_string());
let mut session_token = AWS_SESSION_TOKEN
.get()
.map(|s| s.to_string_lossy().to_string());
let mut endpoint_url = AWS_ENDPOINT_URL
.get()
.map(|s| s.to_string_lossy().to_string());
let mut region = AWS_REGION.get().map(|s| s.to_string_lossy().to_string());
//ToDo: Add credential expiry handling when using session variables.
let mut expire_at = None;

if let Some(credential_provider) = sdk_config.credentials_provider() {
let cred_res = PG_BACKEND_TOKIO_RUNTIME
.block_on(async { credential_provider.provide_credentials().await });

if let Ok(credentials) = cred_res {
access_key_id = Some(credentials.access_key_id().to_string());
secret_access_key = Some(credentials.secret_access_key().to_string());
session_token = credentials.session_token().map(|t| t.to_string());
expire_at = credentials.expiry();
} else {
pgrx::error!(
"failed to load aws credentials: {:?}",
cred_res.unwrap_err()
);
// If GUCs are not set, fall back to environment variables and config files
if access_key_id.is_none() || secret_access_key.is_none() {
let sdk_config = PG_BACKEND_TOKIO_RUNTIME
.block_on(async { aws_config::defaults(BehaviorVersion::latest()).load().await });

if let Some(credential_provider) = sdk_config.credentials_provider() {
let cred_res = PG_BACKEND_TOKIO_RUNTIME
.block_on(async { credential_provider.provide_credentials().await });

if let Ok(credentials) = cred_res {
if access_key_id.is_none() {
let key = credentials.access_key_id().to_string();
access_key_id = Some(key);
}
if secret_access_key.is_none() {
secret_access_key = Some(credentials.secret_access_key().to_string());
}
if session_token.is_none() {
session_token = credentials.session_token().map(|t| t.to_string());
}
expire_at = credentials.expiry();
} else {
pgrx::error!(
"failed to load aws credentials: {:?}",
cred_res.unwrap_err()
);
}
}
}

let endpoint_url = sdk_config.endpoint_url().map(|u| u.to_string());
if region.is_none() {
let sdk_config = PG_BACKEND_TOKIO_RUNTIME
.block_on(async { aws_config::defaults(BehaviorVersion::latest()).load().await });
region = sdk_config.region().map(|r| r.to_string());
}

let region = sdk_config.region().map(|r| r.as_ref().to_string());
if endpoint_url.is_none() {
if let Ok(env_endpoint) = std::env::var("AWS_ENDPOINT_URL") {
endpoint_url = Some(env_endpoint);
}
}

Self {
region,
Expand Down
11 changes: 10 additions & 1 deletion src/object_store/gcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ use std::sync::Arc;
use object_store::gcp::GoogleCloudStorageBuilder;
use url::Url;

use crate::GOOGLE_SERVICE_ACCOUNT_KEY;

use super::object_store_cache::ObjectStoreWithExpiration;

// create_gcs_object_store a GoogleCloudStorage object store from given uri.
Expand Down Expand Up @@ -61,8 +63,15 @@ struct GoogleStorageConfig {
impl GoogleStorageConfig {
// load loads the Google Storage configuration from the environment.
fn load() -> Self {
let mut key = GOOGLE_SERVICE_ACCOUNT_KEY
.get()
.map(|s| s.to_string_lossy().to_string());

if key.is_none() {
key = std::env::var("GOOGLE_SERVICE_ACCOUNT_KEY").ok();
}
Self {
service_account_key: std::env::var("GOOGLE_SERVICE_ACCOUNT_KEY").ok(),
service_account_key: key,
service_account_path: std::env::var("GOOGLE_SERVICE_ACCOUNT_PATH").ok(),
}
}
Expand Down
Loading
Loading