Skip to content

Commit a0dfd18

Browse files
Add PasswordProvider trait for dynamic Postgres credentials (#548)
* add password provider to postgres connector * Fix clippy lint for PasswordProvider PR * Use SecretString::clone in StaticPasswordProvider --------- Co-authored-by: Phillip LeBlanc <phillip@leblanc.tech>
1 parent 692138f commit a0dfd18

6 files changed

Lines changed: 421 additions & 80 deletions

File tree

core/src/postgres.rs

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use crate::sql::arrow_sql_gen::statement::{
22
CreateTableBuilder, Error as SqlGenError, IndexBuilder, InsertBuilder,
33
};
4+
use crate::sql::db_connection_pool::dbconnection::postgresconn::PostgresPooledConnection;
45
use crate::sql::db_connection_pool::{
56
self,
67
dbconnection::{postgresconn::PostgresConnection, DbConnection},
@@ -15,10 +16,7 @@ use arrow::{
1516
datatypes::{Schema, SchemaRef},
1617
};
1718
use async_trait::async_trait;
18-
use bb8_postgres::{
19-
tokio_postgres::{types::ToSql, Transaction},
20-
PostgresConnectionManager,
21-
};
19+
use bb8_postgres::tokio_postgres::{types::ToSql, Transaction};
2220
use datafusion::catalog::Session;
2321
use datafusion::sql::unparser::dialect::PostgreSqlDialect;
2422
use datafusion::{
@@ -29,7 +27,6 @@ use datafusion::{
2927
logical_expr::CreateExternalTable,
3028
sql::TableReference,
3129
};
32-
use postgres_native_tls::MakeTlsConnector;
3330
use snafu::prelude::*;
3431
use std::{collections::HashMap, sync::Arc};
3532

@@ -47,15 +44,10 @@ use self::write::PostgresTableWriter;
4744

4845
pub mod write;
4946

50-
pub type DynPostgresConnectionPool = dyn DbConnectionPool<
51-
bb8::PooledConnection<'static, PostgresConnectionManager<MakeTlsConnector>>,
52-
&'static (dyn ToSql + Sync),
53-
> + Send
54-
+ Sync;
55-
pub type DynPostgresConnection = dyn DbConnection<
56-
bb8::PooledConnection<'static, PostgresConnectionManager<MakeTlsConnector>>,
57-
&'static (dyn ToSql + Sync),
58-
>;
47+
pub type DynPostgresConnectionPool =
48+
dyn DbConnectionPool<PostgresPooledConnection, &'static (dyn ToSql + Sync)> + Send + Sync;
49+
pub type DynPostgresConnection =
50+
dyn DbConnection<PostgresPooledConnection, &'static (dyn ToSql + Sync)>;
5951

6052
#[derive(Debug, Snafu)]
6153
pub enum Error {

core/src/sql/db_connection_pool/dbconnection/postgresconn.rs

Lines changed: 16 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use std::sync::Arc;
55
use crate::sql::arrow_sql_gen::postgres::rows_to_arrow;
66
use crate::sql::arrow_sql_gen::postgres::schema::pg_data_type_to_arrow_type;
77
use crate::sql::arrow_sql_gen::postgres::schema::ParseContext;
8+
use crate::sql::db_connection_pool::postgrespool::ConnectionManager;
89
use crate::util::handle_unsupported_type_error;
910
use crate::util::schema::SchemaValidator;
1011
use arrow::datatypes::Field;
@@ -13,14 +14,21 @@ use arrow::datatypes::SchemaRef;
1314
use arrow_schema::DataType;
1415
use async_stream::stream;
1516
use bb8_postgres::tokio_postgres::types::ToSql;
16-
use bb8_postgres::PostgresConnectionManager;
17+
18+
/// A pooled Postgres connection obtained from a [`PostgresConnectionPool`](crate::sql::db_connection_pool::postgrespool::PostgresConnectionPool).
19+
///
20+
/// Dereferences to [`tokio_postgres::Client`](bb8_postgres::tokio_postgres::Client) for executing queries.
21+
// Defined here rather than in `postgrespool` to avoid a type-resolution cycle
22+
// between the two modules (postgrespool imports PostgresConnection, which uses
23+
// this alias).
24+
pub type PostgresPooledConnection = bb8::PooledConnection<'static, ConnectionManager>;
1725
use datafusion::error::DataFusionError;
1826
use datafusion::execution::SendableRecordBatchStream;
1927
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
2028
use datafusion::sql::TableReference;
2129
use futures::stream;
2230
use futures::StreamExt;
23-
use postgres_native_tls::MakeTlsConnector;
31+
2432
use snafu::prelude::*;
2533

2634
use crate::UnsupportedTypeAction;
@@ -132,7 +140,7 @@ pub enum PostgresError {
132140
}
133141

134142
pub struct PostgresConnection {
135-
pub conn: bb8::PooledConnection<'static, PostgresConnectionManager<MakeTlsConnector>>,
143+
pub conn: PostgresPooledConnection,
136144
unsupported_type_action: UnsupportedTypeAction,
137145
}
138146

@@ -151,12 +159,7 @@ impl SchemaValidator for PostgresConnection {
151159
}
152160
}
153161

154-
impl<'a>
155-
DbConnection<
156-
bb8::PooledConnection<'static, PostgresConnectionManager<MakeTlsConnector>>,
157-
&'a (dyn ToSql + Sync),
158-
> for PostgresConnection
159-
{
162+
impl<'a> DbConnection<PostgresPooledConnection, &'a (dyn ToSql + Sync)> for PostgresConnection {
160163
fn as_any(&self) -> &dyn Any {
161164
self
162165
}
@@ -167,26 +170,16 @@ impl<'a>
167170

168171
fn as_async(
169172
&self,
170-
) -> Option<
171-
&dyn AsyncDbConnection<
172-
bb8::PooledConnection<'static, PostgresConnectionManager<MakeTlsConnector>>,
173-
&'a (dyn ToSql + Sync),
174-
>,
175-
> {
173+
) -> Option<&dyn AsyncDbConnection<PostgresPooledConnection, &'a (dyn ToSql + Sync)>> {
176174
Some(self)
177175
}
178176
}
179177

180178
#[async_trait::async_trait]
181-
impl<'a>
182-
AsyncDbConnection<
183-
bb8::PooledConnection<'static, PostgresConnectionManager<MakeTlsConnector>>,
184-
&'a (dyn ToSql + Sync),
185-
> for PostgresConnection
179+
impl<'a> AsyncDbConnection<PostgresPooledConnection, &'a (dyn ToSql + Sync)>
180+
for PostgresConnection
186181
{
187-
fn new(
188-
conn: bb8::PooledConnection<'static, PostgresConnectionManager<MakeTlsConnector>>,
189-
) -> Self {
182+
fn new(conn: PostgresPooledConnection) -> Self {
190183
PostgresConnection {
191184
conn,
192185
unsupported_type_action: UnsupportedTypeAction::default(),

core/src/sql/db_connection_pool/mod.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use async_trait::async_trait;
22
use dbconnection::DbConnection;
3+
use secrecy::SecretString;
34
use std::sync::Arc;
45

56
pub mod dbconnection;
@@ -23,6 +24,41 @@ pub mod sqlitepool;
2324
pub type Error = Box<dyn std::error::Error + Send + Sync>;
2425
type Result<T, E = Error> = std::result::Result<T, E>;
2526

27+
/// A trait for providing passwords dynamically to database connection pools.
28+
///
29+
/// Implementations can fetch credentials from secret managers, IAM services,
30+
/// or other dynamic sources. Called each time a new connection is created in the pool.
31+
///
32+
/// Implementations that cache or rate-limit credentials should use interior
33+
/// mutability (e.g., `tokio::sync::RwLock`) since the trait requires `&self`.
34+
#[async_trait]
35+
pub trait PasswordProvider: Send + Sync {
36+
/// Returns the current password/token for authentication.
37+
/// Called each time a new connection is created in the pool.
38+
async fn get_password(&self) -> Result<SecretString>;
39+
}
40+
41+
/// A password provider that always returns the same static password.
42+
///
43+
/// This is the default provider used by [`PostgresConnectionPool::new()`](crate::sql::db_connection_pool::postgrespool::PostgresConnectionPool::new)
44+
/// when a `pass` parameter is supplied. It can also be used explicitly with
45+
/// [`new_with_password_provider()`](crate::sql::db_connection_pool::postgrespool::PostgresConnectionPool::new_with_password_provider).
46+
pub struct StaticPasswordProvider(SecretString);
47+
48+
impl StaticPasswordProvider {
49+
/// Creates a new `StaticPasswordProvider` with the given password.
50+
pub fn new(password: SecretString) -> Self {
51+
Self(password)
52+
}
53+
}
54+
55+
#[async_trait]
56+
impl PasswordProvider for StaticPasswordProvider {
57+
async fn get_password(&self) -> Result<SecretString> {
58+
Ok(self.0.clone())
59+
}
60+
}
61+
2662
/// Controls whether join pushdown is allowed, and under what conditions
2763
#[derive(Clone, Debug, PartialEq, Eq)]
2864
pub enum JoinPushDown {

0 commit comments

Comments
 (0)