Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 35 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,9 @@ parquet = "57.0.0"
paste = "1.0.15"
pem = "3.0.4"
percent-encoding = "2.3.1"
pgwire-replication = { version = "0.3", default-features = false, features = ["tls-rustls", "scram"] }
postgres-native-tls = "0.5"
native-tls = "0.2"
pin-project = "1.1"
pingora-lru = "0.7"
postcard = { version = "1.1.3", features = ["use-std"] }
Expand Down
9 changes: 7 additions & 2 deletions crates/data-connectors/connector-postgres/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,19 @@ license.workspace = true
description = "PostgreSQL data connector for Spice.ai runtime"

[dependencies]
async-stream.workspace = true
async-trait.workspace = true
data_components = { path = "../../data_components", features = ["postgres"] }
datafusion.workspace = true
datafusion-table-providers = { workspace = true, features = ["postgres", "postgres-federation"] }
linkme.workspace = true
paste.workspace = true
fundu.workspace = true
futures.workspace = true
opentelemetry.workspace = true
runtime = { path = "../../runtime" }
secrecy.workspace = true
snafu.workspace = true
tokio.workspace = true
tokio-postgres.workspace = true

[features]
default = []
Expand Down
93 changes: 79 additions & 14 deletions crates/data-connectors/connector-postgres/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@ limitations under the License.
//! `PostgreSQL` data connector for Spice.ai runtime.
//!
//! This crate provides the `PostgreSQL` connector implementation, allowing
//! Spice.ai to connect to `PostgreSQL` databases as data sources.
//!
//! This connector is extracted from the runtime crate to enable faster
//! incremental builds - changes to this connector only require rebuilding
//! this crate, not the entire runtime.
//! Spice.ai to connect to `PostgreSQL` databases as data sources. It also
//! exposes a direct WAL-based `ChangesStream`, so users can set
//! `acceleration.refresh_mode: changes` on a Postgres dataset and get
//! change-by-change replication into the local accelerator without Debezium.

use async_trait::async_trait;
use datafusion::datasource::TableProvider;
Expand All @@ -32,6 +31,7 @@ use datafusion_table_providers::sql::db_connection_pool::{
postgrespool::{self, PostgresConnectionPool},
};
use runtime::component::dataset::Dataset;
use runtime::component::metrics::MetricsProvider;
use runtime::dataconnector::{
ConnectorComponent, ConnectorParams, DataConnector, DataConnectorError, DataConnectorFactory,
DataConnectorResult, NewDataConnectorResult,
Expand All @@ -44,6 +44,8 @@ use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;

mod replication;

#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Unable to create Postgres connection pool: {source}"))]
Expand All @@ -52,7 +54,10 @@ pub enum Error {

/// `PostgreSQL` data connector.
///
/// Bundles the table factory used to build table providers with the
/// connector parameters and the replication metrics collector that are
/// handed to the WAL-based changes stream.
pub struct Postgres {
// NOTE(review): both `postgres_factory` and `factory` are listed in this
// view — presumably the former is the pre-rename spelling of the latter;
// confirm that only one field remains in the final file.
postgres_factory: PostgresTableFactory,
/// Table factory used to create read-only and read-write table providers.
factory: PostgresTableFactory,
/// Copy of the connector parameters, passed to the replication module when
/// a changes stream is built.
params: runtime::parameters::Parameters,
/// Replication metrics collector shared between the changes stream and the
/// metrics provider.
replication_metrics:
std::sync::Arc<data_components::postgres_replication::ReplicationMetricsCollector>,
}

impl std::fmt::Debug for Postgres {
Expand Down Expand Up @@ -92,6 +97,35 @@ const PARAMETERS: &[ParameterSpec] = &[
ParameterSpec::runtime("connection_pool_size")
.description("The maximum number of connections created in the connection pool.")
.default("5"),
// --- Logical replication (WAL streaming) ---
ParameterSpec::component("replication_slot").description(
"Name of the Postgres replication slot to create/reuse for this dataset. \
Defaults to `spice_<dataset>_<dataset-hash>_<instance-hash>`. Each Spice replica \
MUST have its own unique slot.",
),
ParameterSpec::component("publication").description(
"Name of the Postgres publication to create/reuse for this dataset. \
Defaults to `spice_<dataset>_<dataset-hash>_pub`. Shared across replicas for the \
same dataset.",
),
ParameterSpec::component("replication_initial_snapshot")
.description(
"Whether to take an initial snapshot of the table's existing rows on first \
connection, before streaming WAL changes. Default: true.",
)
.default("true"),
ParameterSpec::component("replication_temporary_slot")
.description(
"If true, create a temporary replication slot that is dropped when the \
Spice process disconnects. Default: false (durable slot).",
)
.default("false"),
ParameterSpec::component("replication_status_interval")
.description(
"How often to send StandbyStatusUpdate to Postgres (e.g. '10s'). \
Default: 10s.",
)
.default("10s"),
];

impl DataConnectorFactory for PostgresFactory {
Expand All @@ -111,15 +145,23 @@ impl DataConnectorFactory for PostgresFactory {
SecretBox::from(format!("Spice.ai {}", env!("CARGO_PKG_VERSION"))),
);

let params_for_replication = params.parameters.clone();

match PostgresConnectionPool::new(param_map).await {
Ok(pool) => {
let unsupported_type_action = params
.unsupported_type_action
.unwrap_or(datafusion_table_providers::UnsupportedTypeAction::String);
let pool = pool.with_unsupported_type_action(unsupported_type_action);

let postgres_factory = PostgresTableFactory::new(Arc::new(pool));
Ok(Arc::new(Postgres { postgres_factory }) as Arc<dyn DataConnector>)
let factory = PostgresTableFactory::new(Arc::new(pool));
Ok(Arc::new(Postgres {
factory,
params: params_for_replication,
replication_metrics:
data_components::postgres_replication::ReplicationMetricsCollector::new(
),
}) as Arc<dyn DataConnector>)
}
Err(e) => match e {
postgrespool::Error::InvalidUsernameOrPassword { .. } => Err(
Expand Down Expand Up @@ -178,7 +220,7 @@ impl DataConnector for Postgres {
dataset: &Dataset,
) -> Option<DataConnectorResult<Arc<dyn TableProvider>>> {
match self
.postgres_factory
.factory
.read_write_table_provider(dataset.path().into())
.await
{
Expand Down Expand Up @@ -224,11 +266,7 @@ impl DataConnector for Postgres {
&self,
dataset: &Dataset,
) -> DataConnectorResult<Arc<dyn TableProvider>> {
match self
.postgres_factory
.table_provider(dataset.path().into())
.await
{
match self.factory.table_provider(dataset.path().into()).await {
Ok(provider) => Ok(provider),
Err(e) => {
if let Some(err_source) = e.source() {
Expand Down Expand Up @@ -266,6 +304,33 @@ impl DataConnector for Postgres {
}
}
}

/// Reports that this connector can provide a changes stream.
///
/// Always returns `true`.
fn supports_changes_stream(&self) -> bool {
true
}

/// Builds the changes stream for `dataset`.
///
/// Delegates to `replication::build_changes_stream`, passing the connector
/// parameters, the dataset, the federated table, and a new handle to the
/// connector's replication metrics collector. The accelerated table provider
/// and the accelerator write mutex are accepted but not used here.
///
/// Always returns `Some`.
fn changes_stream(
    &self,
    federated_table: Arc<runtime::federated_table::FederatedTable>,
    dataset: &Dataset,
    _accelerated_table_provider: Arc<dyn TableProvider>,
    _accelerator_write_mutex: Arc<tokio::sync::Mutex<()>>,
) -> Option<data_components::cdc::ChangesStream> {
    // The stream gets its own handle to the shared collector.
    let metrics = Arc::clone(&self.replication_metrics);
    let stream =
        replication::build_changes_stream(&self.params, dataset, federated_table, metrics);
    Some(stream)
}

/// Returns a metrics provider backed by the connector's replication metrics.
///
/// A new `replication::PostgresMetricsProvider` is constructed on every call,
/// wrapping a `ReplicationMetrics` built from a new handle to the shared
/// collector. Always returns `Some`.
fn metrics_provider(&self) -> Option<Arc<dyn MetricsProvider>> {
    let collector = Arc::clone(&self.replication_metrics);
    let metrics = data_components::postgres_replication::ReplicationMetrics::new(collector);
    let provider: Arc<dyn MetricsProvider> =
        Arc::new(replication::PostgresMetricsProvider::new(metrics));
    Some(provider)
}
}

/// The name used to identify this connector in configuration.
Expand Down
Loading
Loading