Skip to content

Commit 1855dc0

Browse files
authored
Merge pull request #104 from supabase/tls
Enable TLS when a replicator connects to the database
2 parents dd064ac + 4b6a466 commit 1855dc0

File tree

15 files changed

+247
-21
lines changed

15 files changed

+247
-21
lines changed

Diff for: Cargo.toml

+2
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ prost = { version = "0.13.1", default-features = false }
3232
rand = { version = "0.8.5", default-features = false }
3333
reqwest = { version = "0.12", default-features = false }
3434
rustls = { version = "0.23.12", default-features = false }
35+
rustls-pemfile = { version = "2.2.0", default-features = false }
3536
rustyline = { version = "14.0.0", default-features = false }
3637
secrecy = { version = "0.8.0", default-features = false }
3738
serde = { version = "1.0", default-features = false }
@@ -40,6 +41,7 @@ sqlx = { version = "0.8.2", default-features = false }
4041
thiserror = "1.0"
4142
tokio = { version = "1.38", default-features = false }
4243
tokio-postgres = { git = "https://github.com/imor/rust-postgres", default-features = false, rev = "20265ef38e32a06f76b6f9b678e2077fc2211f6b" }
44+
tokio-postgres-rustls = { git = "https://github.com/imor/tokio-postgres-rustls", default-features = false }
4345
tracing = { version = "0.1", default-features = false }
4446
tracing-actix-web = { version = "0.7", default-features = false }
4547
tracing-bunyan-formatter = { version = "0.3", default-features = false }

Diff for: api/src/k8s_client.rs

+15
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ pub trait K8sClient {
6060

6161
async fn delete_bq_secret(&self, prefix: &str) -> Result<(), K8sError>;
6262

63+
async fn get_config_map(&self, config_map_name: &str) -> Result<ConfigMap, K8sError>;
64+
6365
async fn create_or_update_config_map(
6466
&self,
6567
prefix: &str,
@@ -95,6 +97,7 @@ const CONFIG_MAP_NAME_SUFFIX: &str = "replicator-config";
9597
const STATEFUL_SET_NAME_SUFFIX: &str = "replicator";
9698
const CONTAINER_NAME_SUFFIX: &str = "replicator";
9799
const NAMESPACE_NAME: &str = "replicator-data-plane";
100+
pub const TRUSTED_ROOT_CERT_CONFIG_MAP_NAME: &str = "trusted-root-certs-config";
98101

99102
impl HttpK8sClient {
100103
pub async fn new() -> Result<HttpK8sClient, K8sError> {
@@ -218,6 +221,18 @@ impl K8sClient for HttpK8sClient {
218221
Ok(())
219222
}
220223

224+
async fn get_config_map(&self, config_map_name: &str) -> Result<ConfigMap, K8sError> {
225+
info!("getting config map");
226+
let config_map = match self.config_maps_api.get(config_map_name).await {
227+
Ok(config_map) => config_map,
228+
Err(e) => {
229+
return Err(e.into());
230+
}
231+
};
232+
info!("got config map");
233+
Ok(config_map)
234+
}
235+
221236
async fn create_or_update_config_map(
222237
&self,
223238
prefix: &str,

Diff for: api/src/replicator_config.rs

+24-2
Original file line numberDiff line numberDiff line change
@@ -89,16 +89,26 @@ pub struct BatchConfig {
8989
pub max_fill_secs: u64,
9090
}
9191

92+
#[derive(Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
93+
pub struct TlsConfig {
94+
/// trusted root certificates in PEM format
95+
pub trusted_root_certs: String,
96+
97+
/// true when TLS is enabled
98+
pub enabled: bool,
99+
}
100+
92101
#[derive(Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
93102
pub struct Config {
94103
pub source: SourceConfig,
95104
pub sink: SinkConfig,
96105
pub batch: BatchConfig,
106+
pub tls: TlsConfig,
97107
}
98108

99109
#[cfg(test)]
100110
mod tests {
101-
use crate::replicator_config::{BatchConfig, Config, SinkConfig, SourceConfig};
111+
use crate::replicator_config::{BatchConfig, Config, SinkConfig, SourceConfig, TlsConfig};
102112

103113
#[test]
104114
pub fn deserialize_settings_test() {
@@ -122,6 +132,10 @@ mod tests {
122132
"batch": {
123133
"max_size": 1000,
124134
"max_fill_secs": 10
135+
},
136+
"tls": {
137+
"trusted_root_certs": "",
138+
"enabled": false
125139
}
126140
}"#;
127141
let actual = serde_json::from_str::<Config>(settings);
@@ -143,6 +157,10 @@ mod tests {
143157
max_size: 1000,
144158
max_fill_secs: 10,
145159
},
160+
tls: TlsConfig {
161+
trusted_root_certs: "".to_string(),
162+
enabled: false,
163+
},
146164
};
147165
assert!(actual.is_ok());
148166
assert_eq!(expected, actual.unwrap());
@@ -168,8 +186,12 @@ mod tests {
168186
max_size: 1000,
169187
max_fill_secs: 10,
170188
},
189+
tls: TlsConfig {
190+
trusted_root_certs: "".to_string(),
191+
enabled: false,
192+
},
171193
};
172-
let expected = r#"{"source":{"postgres":{"host":"localhost","port":5432,"name":"postgres","username":"postgres","slot_name":"replicator_slot","publication":"replicator_publication"}},"sink":{"big_query":{"project_id":"project-id","dataset_id":"dataset-id"}},"batch":{"max_size":1000,"max_fill_secs":10}}"#;
194+
let expected = r#"{"source":{"postgres":{"host":"localhost","port":5432,"name":"postgres","username":"postgres","slot_name":"replicator_slot","publication":"replicator_publication"}},"sink":{"big_query":{"project_id":"project-id","dataset_id":"dataset-id"}},"batch":{"max_size":1000,"max_fill_secs":10},"tls":{"trusted_root_certs":"","enabled":false}}"#;
173195
let actual = serde_json::to_string(&actual);
174196
assert!(actual.is_ok());
175197
assert_eq!(expected, actual.unwrap());

Diff for: api/src/routes/pipelines.rs

+26-4
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use crate::{
2222
sources::{source_exists, Source, SourceConfig, SourcesDbError},
2323
},
2424
encryption::EncryptionKey,
25-
k8s_client::{HttpK8sClient, K8sClient, K8sError, PodPhase},
25+
k8s_client::{HttpK8sClient, K8sClient, K8sError, PodPhase, TRUSTED_ROOT_CERT_CONFIG_MAP_NAME},
2626
replicator_config,
2727
routes::extract_tenant_id,
2828
};
@@ -72,6 +72,9 @@ enum PipelineError {
7272

7373
#[error("sinks db error: {0}")]
7474
SinksDb(#[from] SinksDbError),
75+
76+
#[error("trusted root certs config not found")]
77+
TrustedRootCertsConfigMissing,
7578
}
7679

7780
impl PipelineError {
@@ -95,7 +98,8 @@ impl ResponseError for PipelineError {
9598
| PipelineError::NoDefaultImageFound
9699
| PipelineError::SourcesDb(_)
97100
| PipelineError::SinksDb(_)
98-
| PipelineError::K8sError(_) => StatusCode::INTERNAL_SERVER_ERROR,
101+
| PipelineError::K8sError(_)
102+
| PipelineError::TrustedRootCertsConfigMissing => StatusCode::INTERNAL_SERVER_ERROR,
99103
PipelineError::PipelineNotFound(_) => StatusCode::NOT_FOUND,
100104
PipelineError::TenantId(_)
101105
| PipelineError::SourceNotFound(_)
@@ -362,7 +366,8 @@ pub async fn start_pipeline(
362366
let (pipeline, replicator, image, source, sink) =
363367
read_data(&pool, tenant_id, pipeline_id, &encryption_key).await?;
364368

365-
let (secrets, config) = create_configs(source.config, sink.config, pipeline)?;
369+
let (secrets, config) =
370+
create_configs(&k8s_client, source.config, sink.config, pipeline).await?;
366371
let prefix = create_prefix(tenant_id, replicator.id);
367372

368373
create_or_update_secrets(&k8s_client, &prefix, secrets).await?;
@@ -497,7 +502,8 @@ async fn read_data(
497502
Ok((pipeline, replicator, image, source, sink))
498503
}
499504

500-
fn create_configs(
505+
async fn create_configs(
506+
k8s_client: &Arc<HttpK8sClient>,
501507
source_config: SourceConfig,
502508
sink_config: SinkConfig,
503509
pipeline: Pipeline,
@@ -546,10 +552,26 @@ fn create_configs(
546552
max_fill_secs: batch_config.max_fill_secs,
547553
};
548554

555+
let trusted_root_certs_cm = k8s_client
556+
.get_config_map(TRUSTED_ROOT_CERT_CONFIG_MAP_NAME)
557+
.await?;
558+
let data = trusted_root_certs_cm
559+
.data
560+
.ok_or(PipelineError::TrustedRootCertsConfigMissing)?;
561+
let trusted_root_certs = data
562+
.get("trusted_root_certs")
563+
.ok_or(PipelineError::TrustedRootCertsConfigMissing)?;
564+
565+
let tls_config = replicator_config::TlsConfig {
566+
trusted_root_certs: trusted_root_certs.clone(),
567+
enabled: true,
568+
};
569+
549570
let config = replicator_config::Config {
550571
source: source_config,
551572
sink: sink_config,
552573
batch: batch_config,
574+
tls: tls_config,
553575
};
554576

555577
Ok((secrets, config))

Diff for: pg_replicate/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ tokio-postgres = { workspace = true, features = [
4545
"with-uuid-1",
4646
"with-serde_json-1",
4747
] }
48+
tokio-postgres-rustls = { workspace = true }
4849
tracing = { workspace = true, default-features = true }
4950
uuid = { workspace = true, features = ["v4"] }
5051

Diff for: pg_replicate/examples/bigquery.rs

+5
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use pg_replicate::{
99
PipelineAction,
1010
},
1111
table::TableName,
12+
SslMode,
1213
};
1314
use tracing::error;
1415
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
@@ -129,6 +130,8 @@ async fn main_impl() -> Result<(), Box<dyn Error>> {
129130
&db_args.db_name,
130131
&db_args.db_username,
131132
db_args.db_password,
133+
SslMode::Disable,
134+
vec![],
132135
None,
133136
TableNamesFrom::Vec(table_names),
134137
)
@@ -145,6 +148,8 @@ async fn main_impl() -> Result<(), Box<dyn Error>> {
145148
&db_args.db_name,
146149
&db_args.db_username,
147150
db_args.db_password,
151+
SslMode::Disable,
152+
vec![],
148153
Some(slot_name),
149154
TableNamesFrom::Publication(publication),
150155
)

Diff for: pg_replicate/examples/duckdb.rs

+5
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use pg_replicate::{
99
PipelineAction,
1010
},
1111
table::TableName,
12+
SslMode,
1213
};
1314
use tracing::error;
1415
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
@@ -123,6 +124,8 @@ async fn main_impl() -> Result<(), Box<dyn Error>> {
123124
&db_args.db_name,
124125
&db_args.db_username,
125126
db_args.db_password,
127+
SslMode::Disable,
128+
vec![],
126129
None,
127130
TableNamesFrom::Vec(table_names),
128131
)
@@ -139,6 +142,8 @@ async fn main_impl() -> Result<(), Box<dyn Error>> {
139142
&db_args.db_name,
140143
&db_args.db_username,
141144
db_args.db_password,
145+
SslMode::Disable,
146+
vec![],
142147
Some(slot_name),
143148
TableNamesFrom::Publication(publication),
144149
)

Diff for: pg_replicate/examples/stdout.rs

+5
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use pg_replicate::{
99
PipelineAction,
1010
},
1111
table::TableName,
12+
SslMode,
1213
};
1314
use tracing::error;
1415
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
@@ -99,6 +100,8 @@ async fn main_impl() -> Result<(), Box<dyn Error>> {
99100
&db_args.db_name,
100101
&db_args.db_username,
101102
db_args.db_password,
103+
SslMode::Disable,
104+
vec![],
102105
None,
103106
TableNamesFrom::Vec(table_names),
104107
)
@@ -115,6 +118,8 @@ async fn main_impl() -> Result<(), Box<dyn Error>> {
115118
&db_args.db_name,
116119
&db_args.db_username,
117120
db_args.db_password,
121+
SslMode::Disable,
122+
vec![],
118123
Some(slot_name),
119124
TableNamesFrom::Publication(publication),
120125
)

Diff for: pg_replicate/src/clients/postgres.rs

+59-2
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,14 @@ use std::collections::HashMap;
22

33
use pg_escape::{quote_identifier, quote_literal};
44
use postgres_replication::LogicalReplicationStream;
5+
use rustls::{pki_types::CertificateDer, ClientConfig};
56
use thiserror::Error;
67
use tokio_postgres::{
7-
config::ReplicationMode,
8+
config::{ReplicationMode, SslMode},
89
types::{Kind, PgLsn, Type},
910
Client as PostgresClient, Config, CopyOutStream, NoTls, SimpleQueryMessage,
1011
};
12+
use tokio_postgres_rustls::MakeRustlsConnect;
1113
use tracing::{info, warn};
1214

1315
use crate::table::{ColumnSchema, TableId, TableName, TableSchema};
@@ -53,6 +55,9 @@ pub enum ReplicationClientError {
5355

5456
#[error("failed to create slot")]
5557
FailedToCreateSlot,
58+
59+
#[error("rustls error: {0}")]
60+
RustlsError(#[from] rustls::Error),
5661
}
5762

5863
impl ReplicationClient {
@@ -64,7 +69,7 @@ impl ReplicationClient {
6469
username: &str,
6570
password: Option<String>,
6671
) -> Result<ReplicationClient, ReplicationClientError> {
67-
info!("connecting to postgres");
72+
info!("connecting to postgres without TLS");
6873

6974
let mut config = Config::new();
7075
config
@@ -95,6 +100,58 @@ impl ReplicationClient {
95100
})
96101
}
97102

103+
/// Connect to a postgres database in logical replication mode with TLS
104+
pub async fn connect_tls(
105+
host: &str,
106+
port: u16,
107+
database: &str,
108+
username: &str,
109+
password: Option<String>,
110+
ssl_mode: SslMode,
111+
trusted_root_certs: Vec<CertificateDer<'static>>,
112+
) -> Result<ReplicationClient, ReplicationClientError> {
113+
info!("connecting to postgres with TLS");
114+
115+
let mut config = Config::new();
116+
config
117+
.host(host)
118+
.port(port)
119+
.dbname(database)
120+
.user(username)
121+
.ssl_mode(ssl_mode)
122+
.replication_mode(ReplicationMode::Logical);
123+
124+
if let Some(password) = password {
125+
config.password(password);
126+
}
127+
128+
let mut root_store = rustls::RootCertStore::empty();
129+
for trusted_root_cert in trusted_root_certs {
130+
root_store.add(trusted_root_cert)?;
131+
}
132+
let tls_config = ClientConfig::builder()
133+
.with_root_certificates(root_store)
134+
.with_no_client_auth();
135+
136+
let tls = MakeRustlsConnect::new(tls_config);
137+
138+
let (postgres_client, connection) = config.connect(tls).await?;
139+
140+
tokio::spawn(async move {
141+
info!("waiting for connection to terminate");
142+
if let Err(e) = connection.await {
143+
warn!("connection error: {}", e);
144+
}
145+
});
146+
147+
info!("successfully connected to postgres");
148+
149+
Ok(ReplicationClient {
150+
postgres_client,
151+
in_txn: false,
152+
})
153+
}
154+
98155
/// Starts a read-only trasaction with repeatable read isolation level
99156
pub async fn begin_readonly_transaction(&mut self) -> Result<(), ReplicationClientError> {
100157
self.postgres_client

Diff for: pg_replicate/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,4 @@ pub mod clients;
22
pub mod conversions;
33
pub mod pipeline;
44
pub mod table;
5+
pub use tokio_postgres::config::SslMode;

0 commit comments

Comments
 (0)