diff --git a/ci/scripts/e2e-sink-test.sh b/ci/scripts/e2e-sink-test.sh
index e915d68d3a4af..b30f17cd7f44f 100755
--- a/ci/scripts/e2e-sink-test.sh
+++ b/ci/scripts/e2e-sink-test.sh
@@ -66,7 +66,7 @@ echo "--- test sink: jdbc:postgres switch to postgres native"
 # check sink destination postgres
 risedev slt './e2e_test/sink/remote/jdbc.load.slt'
 sleep 1
-sqllogictest -h db -p 5432 -d test './e2e_test/sink/remote/jdbc.check.pg.slt'
+sqllogictest -h db -p 5432 -d test './e2e_test/sink/remote/jdbc.check.pg.slt' --label 'pg-native'
 sleep 1
 
 echo "--- killing risingwave cluster: ci-1cn-1fe-switch-to-pg-native"
@@ -105,7 +105,7 @@ echo "--- testing remote sinks"
 # check sink destination postgres
 risedev slt './e2e_test/sink/remote/jdbc.load.slt'
 sleep 1
-sqllogictest -h db -p 5432 -d test './e2e_test/sink/remote/jdbc.check.pg.slt'
+sqllogictest -h db -p 5432 -d test './e2e_test/sink/remote/jdbc.check.pg.slt' --label 'jdbc'
 sleep 1
 
 # check sink destination mysql using shell
diff --git a/e2e_test/sink/postgres_sink.slt b/e2e_test/sink/postgres_sink.slt
index 10ce6952a4508..0bcbdfade396d 100644
--- a/e2e_test/sink/postgres_sink.slt
+++ b/e2e_test/sink/postgres_sink.slt
@@ -269,8 +269,62 @@ select * from postgres_query('$PGHOST', '$PGPORT', '$PGUSER', '${PGPASSWORD:postgres}', 'sink_test', 'select * from pg_types_table order by id;');
 1 Varcharvalue1 Textvalue1 123 456 789 12.34 56.78 90.12 t 2023-05-22 12:34:56 2023-05-22 12:34:56 1 day {"key": "value"}
 2 Varcharvalue4 Textvalue2 234 400 890 NaN 67.89 1.23 f 2023-05-23 23:45:01 2023-05-23 23:45:01 2 days {"key": "value2"}
 
+################### test duplicate inserts should work
+
+statement ok
+CREATE TABLE rw_types_table_dup (
+    id BIGINT PRIMARY KEY,
+    varchar_column VARCHAR,
+    text_column TEXT,
+    smallint_column SMALLINT,
+    integer_column INTEGER,
+    bigint_column BIGINT,
+    decimal_column DECIMAL,
+    real_column REAL,
+    double_column DOUBLE PRECISION,
+    boolean_column BOOLEAN,
+    date_column DATE,
+    time_column TIME,
+    interval_column INTERVAL,
+    jsonb_column JSONB,
+    timestamp_column TIMESTAMP
+);
+
+statement ok
+INSERT INTO rw_types_table_dup (id, varchar_column, text_column, integer_column, smallint_column, bigint_column, decimal_column, real_column, double_column, boolean_column, date_column, time_column, timestamp_column, interval_column, jsonb_column) VALUES
+    (1, 'Varcharvalue4', 'Textvalue1', 123, 456, 789, 12.34, 56.78, 90.12, TRUE, '2023-05-22', '12:34:56', '2023-05-22 12:34:56', '1 day', '{"key": "value"}'),
+    (2, 'Varcharvalue6', 'Textvalue2', 234, 567, 890, 'NAN'::decimal, 67.89, 01.23, FALSE, '2023-05-23', '23:45:01', '2023-05-23 23:45:01', '2 days', '{"key": "value2"}');
+
+statement ok
+flush;
+
+statement ok
+CREATE SINK postgres_rw_types_sink_dup FROM rw_types_table_dup WITH (
+    connector='postgres',
+    host='$PGHOST',
+    port='$PGPORT',
+    user='$PGUSER',
+    password='$PGPASSWORD',
+    database='sink_test',
+    table='pg_types_table',
+    type='upsert',
+    primary_key='id',
+);
+
+query I
+select * from postgres_query('$PGHOST', '$PGPORT', '$PGUSER', '${PGPASSWORD:postgres}', 'sink_test', 'select * from pg_types_table order by id;');
+----
+1 Varcharvalue4 Textvalue1 123 456 789 12.34 56.78 90.12 t 2023-05-22 12:34:56 2023-05-22 12:34:56 1 day {"key": "value"}
+2 Varcharvalue6 Textvalue2 234 567 890 NaN 67.89 1.23 f 2023-05-23 23:45:01 2023-05-23 23:45:01 2 days {"key": "value2"}
+
 ################### cleanup sink
 
+statement ok
+DROP SINK postgres_rw_types_sink_dup;
+
+statement ok
+DROP TABLE rw_types_table_dup;
+
 statement ok
 DROP SINK postgres_rw_types_sink;
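The new `postgres_rw_types_sink_dup` sink writes rows whose primary keys already exist in `pg_types_table`, so its initial load only succeeds if the native sink emits upserts rather than plain inserts. A minimal standalone sketch of the behaviour this test relies on, driving Postgres directly with `tokio_postgres` (the `demo` table and connection string are placeholders, not part of this patch):

```rust
use tokio_postgres::NoTls;

#[tokio::main]
async fn main() -> Result<(), tokio_postgres::Error> {
    // Hypothetical connection string; adjust to your environment.
    let (client, conn) =
        tokio_postgres::connect("host=localhost user=postgres dbname=sink_test", NoTls).await?;
    tokio::spawn(conn);

    client
        .batch_execute("CREATE TABLE IF NOT EXISTS demo (id BIGINT PRIMARY KEY, v TEXT)")
        .await?;
    let upsert = "INSERT INTO demo (id, v) VALUES ($1, $2) \
                  ON CONFLICT (id) DO UPDATE SET v = EXCLUDED.v";
    // The first write inserts; the second write with the same PK updates
    // instead of failing with a unique-constraint violation.
    client.execute(upsert, &[&1i64, &"Varcharvalue1"]).await?;
    client.execute(upsert, &[&1i64, &"Varcharvalue4"]).await?;
    Ok(())
}
```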
diff --git a/e2e_test/sink/remote/jdbc.check.pg.slt b/e2e_test/sink/remote/jdbc.check.pg.slt
index 1ec8c827d939b..46c76dbf6289c 100644
--- a/e2e_test/sink/remote/jdbc.check.pg.slt
+++ b/e2e_test/sink/remote/jdbc.check.pg.slt
@@ -18,7 +18,15 @@ select * from t_remote_1 order by id;
 5 Varchar value 5 Text value 5 567 890 123 56.78 90.12 34.56 t 2023-05-26 12:34:56 2023-05-26 12:34:56 2023-05-26 12:34:56+00 2 years 3 mons 4 days 05:06:07 {"key": "value5"} \xdeadbabe
 6 Varchar value 6 Text value 6 789 123 456 67.89 34.56 78.91 f 2023-05-27 23:45:01 2023-05-27 23:45:01 2023-05-27 23:45:01+00 2 years 3 mons 4 days 05:06:07 {"key": "value6"} \xdeadbabe
 
+onlyif pg-native
+query III
+select * from biz.t_types order by id;
+----
+1 Varcharvalue1 Textvalue1 123 456 789 12.34 56.78 90.12 t 2023-05-22 12:34:56 2023-05-22 12:34:56 1 day {"key": "value"} {"Value 1","Value 2"} {12.345,56.789} {1,2,3} {1,2,3} {1,2,3} {12.3,56.7}
+2 Varcharvalue2 Textvalue2 234 567 890 NaN 67.89 1.23 f 2023-05-23 23:45:01 2023-05-23 23:45:01 2 days {"key": "value2"} {"Value 3","Value 4"} {43.21,65.432} {4,5,6} {4,5,6} {4,5,6} {43.2,65.4}
+3 Varcharvalue1 Textvalue1 123 456 789 Infinity 56.78 90.12 t 2023-05-22 12:34:56 2023-05-22 12:34:56 1 day {"key": "value"} {"How're you?","\"hello\\ \\world\""} {12.345,56.789} {1,2,3} {1,2,3} {1,2,3} {43.2,65.4}
+
+onlyif jdbc
 query III
 select * from biz.t_types order by id;
 ----
diff --git a/src/config/ci-jdbc-to-native.toml b/src/config/ci-jdbc-to-native.toml
index 368d2f0760b4c..1f4e328388f43 100644
--- a/src/config/ci-jdbc-to-native.toml
+++ b/src/config/ci-jdbc-to-native.toml
@@ -12,7 +12,7 @@ in_flight_barrier_nums = 10
 
 [streaming.developer]
 stream_exchange_concurrent_barriers = 10
-switch_jdbc_pg_to_native = true
+stream_switch_jdbc_pg_to_native = true
 
 [storage]
 imm_merge_threshold = 2
diff --git a/src/connector/src/sink/postgres.rs b/src/connector/src/sink/postgres.rs
index a03b536521fe1..e9c8ff9b85ee7 100644
--- a/src/connector/src/sink/postgres.rs
+++ b/src/connector/src/sink/postgres.rs
@@ -14,7 +14,7 @@
 
 use std::collections::{BTreeMap, HashSet};
 
-use anyhow::anyhow;
+use anyhow::{Context, anyhow};
 use async_trait::async_trait;
 use itertools::Itertools;
 use phf::phf_set;
@@ -169,7 +169,11 @@ impl Sink for PostgresSink {
             &self.config.ssl_root_cert,
             self.is_append_only,
         )
-        .await?;
+        .await
+        .context(format!(
+            "failed to connect to database: {}, schema: {}, table: {}",
+            &self.config.database, &self.config.schema, &self.config.table
+        ))?;
 
         // Check that names and types match, order of columns doesn't matter.
         {
@@ -264,6 +268,8 @@ struct ParameterBuffer<'a> {
     estimated_parameter_size: usize,
    /// current parameter buffer to be filled.
     current_parameter_buffer: Vec<Option<ScalarAdapter>>,
+    /// Parameter upper bound
+    parameter_upper_bound: usize,
 }
 
 impl<'a> ParameterBuffer<'a> {
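The buffer splits a chunk into multiple statements because the Postgres wire protocol encodes the bind-parameter count as a 16-bit integer; the sink stays well under that with the `MAX_PARAMETERS = 32768` cap seen in the next hunk. A quick sketch of the resulting batch arithmetic (standalone, not code from this patch):

```rust
/// Max bind parameters the sink allows per statement
/// (see `ParameterBuffer::MAX_PARAMETERS` below).
const MAX_PARAMETERS: usize = 32768;

/// How many whole rows fit into one statement for a given column count.
fn rows_per_statement(column_count: usize) -> usize {
    MAX_PARAMETERS / column_count
}

fn main() {
    // A 15-column table (like pg_types_table) batches up to 2184 rows per INSERT.
    assert_eq!(rows_per_statement(15), 2184);
    // A 2-column table batches 16384 rows per INSERT.
    assert_eq!(rows_per_statement(2), 16384);
}
```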
@@ -273,19 +279,21 @@ impl<'a> ParameterBuffer<'a> {
     const MAX_PARAMETERS: usize = 32768;
 
-    /// `flattened_chunk_size` is the number of datums in a single chunk.
-    fn new(schema_types: &'a [PgType], flattened_chunk_size: usize) -> Self {
-        let estimated_parameter_size = usize::min(Self::MAX_PARAMETERS, flattened_chunk_size);
+    /// `parameter_upper_bound` is the maximum number of datums to buffer per statement.
+    fn new(schema_types: &'a [PgType], parameter_upper_bound: usize) -> Self {
+        let estimated_parameter_size = usize::min(Self::MAX_PARAMETERS, parameter_upper_bound);
         Self {
             parameters: vec![],
             column_length: schema_types.len(),
             schema_types,
             estimated_parameter_size,
             current_parameter_buffer: Vec::with_capacity(estimated_parameter_size),
+            parameter_upper_bound,
         }
     }
 
     fn add_row(&mut self, row: impl Row) {
-        if self.current_parameter_buffer.len() + self.column_length >= Self::MAX_PARAMETERS {
+        assert_eq!(row.len(), self.column_length);
+        if self.current_parameter_buffer.len() + self.column_length > self.parameter_upper_bound {
             self.new_buffer();
         }
         for (i, datum_ref) in row.iter().enumerate() {
@@ -319,8 +327,10 @@
 pub struct PostgresSinkWriter {
     config: PostgresConfig,
     pk_indices: Vec<usize>,
+    pk_indices_lookup: HashSet<usize>,
     is_append_only: bool,
     client: tokio_postgres::Client,
+    pk_types: Vec<PgType>,
     schema_types: Vec<PgType>,
     schema: Schema,
 }
@@ -328,7 +338,7 @@ pub struct PostgresSinkWriter {
 impl PostgresSinkWriter {
     async fn new(
         config: PostgresConfig,
-        mut schema: Schema,
+        schema: Schema,
         pk_indices: Vec<usize>,
         is_append_only: bool,
     ) -> Result<Self> {
@@ -343,8 +353,10 @@ impl PostgresSinkWriter {
         )
         .await?;
 
+        let pk_indices_lookup = pk_indices.iter().copied().collect::<HashSet<_>>();
+
         // Rewrite schema types for serialization
-        let schema_types = {
+        let (pk_types, schema_types) = {
             let name_to_type = PostgresExternalTable::type_mapping(
                 &config.user,
                 &config.password,
@@ -359,7 +371,8 @@ impl PostgresSinkWriter {
             )
             .await?;
             let mut schema_types = Vec::with_capacity(schema.fields.len());
-            for field in &mut schema.fields[..] {
+            let mut pk_types = Vec::with_capacity(pk_indices.len());
+            for (i, field) in schema.fields.iter().enumerate() {
                 let field_name = &field.name;
                 let actual_data_type = name_to_type.get(field_name).map(|t| (*t).clone());
                 let actual_data_type = actual_data_type
@@ -370,16 +383,21 @@ impl PostgresSinkWriter {
                     ))
                     })?
                     .clone();
+                if pk_indices_lookup.contains(&i) {
+                    pk_types.push(actual_data_type.clone())
+                }
                 schema_types.push(actual_data_type);
             }
-            schema_types
+            (pk_types, schema_types)
         };
 
         let writer = Self {
             config,
             pk_indices,
+            pk_indices_lookup,
             is_append_only,
             client,
+            pk_types,
             schema_types,
             schema,
         };
@@ -397,7 +415,6 @@ impl PostgresSinkWriter {
     }
 
     async fn write_batch_append_only(&mut self, chunk: StreamChunk) -> Result<()> {
-        let mut transaction = self.client.transaction().await?;
         // 1d flattened array of parameters to be inserted.
         let mut parameter_buffer = ParameterBuffer::new(
             &self.schema_types,
@@ -416,14 +433,19 @@ impl PostgresSinkWriter {
             }
         }
         let (parameters, remaining) = parameter_buffer.into_parts();
+
+        let mut transaction = self.client.transaction().await?;
         Self::execute_parameter(
             Op::Insert,
             &mut transaction,
             &self.schema,
+            &self.config.schema,
             &self.config.table,
             &self.pk_indices,
+            &self.pk_indices_lookup,
             parameters,
             remaining,
+            true,
         )
         .await?;
         transaction.commit().await?;
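Both write paths funnel into `execute_parameter`, which prepares one statement per distinct row count and binds each flattened batch with `execute_raw`. A minimal standalone sketch of that prepare-once, bind-flattened pattern with `tokio_postgres` (the `demo` table is hypothetical):

```rust
use tokio_postgres::types::ToSql;

/// Insert `rows` (each with two columns) in one round trip, assuming a
/// hypothetical table `demo(id BIGINT, v TEXT)`.
async fn insert_batch(
    client: &tokio_postgres::Client,
    rows: &[(i64, String)],
) -> Result<(), tokio_postgres::Error> {
    // One placeholder pair per row: ($1, $2), ($3, $4), ...
    let placeholders: Vec<String> = (0..rows.len())
        .map(|i| format!("(${}, ${})", 2 * i + 1, 2 * i + 2))
        .collect();
    let sql = format!("INSERT INTO demo (id, v) VALUES {}", placeholders.join(", "));
    let statement = client.prepare(&sql).await?;

    // execute_raw accepts any iterator of parameters, matching the
    // flattened 1d layout that ParameterBuffer produces.
    let params: Vec<&(dyn ToSql + Sync)> = rows
        .iter()
        .flat_map(|(id, v)| [id as &(dyn ToSql + Sync), v as &(dyn ToSql + Sync)])
        .collect();
    client.execute_raw(&statement, params).await?;
    Ok(())
}
```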
@@ -432,16 +454,23 @@ impl PostgresSinkWriter {
 
     async fn write_batch_non_append_only(&mut self, chunk: StreamChunk) -> Result<()> {
-        let mut transaction = self.client.transaction().await?;
         // 1d flattened array of parameters to be inserted.
         let mut insert_parameter_buffer = ParameterBuffer::new(
             &self.schema_types,
-            chunk.cardinality() * chunk.data_types().len(),
-        );
-        let mut delete_parameter_buffer = ParameterBuffer::new(
-            &self.schema_types,
-            chunk.cardinality() * self.pk_indices.len(),
+            // NOTE(kwannoel):
+            // insert on conflict do update may have multiple
+            // rows with the same PK.
+            // In that case they could encounter the following PG error:
+            // ERROR: ON CONFLICT DO UPDATE command cannot affect row a second time
+            // HINT: Ensure that no rows proposed for insertion within the same command have duplicate constrained values
+            // Given that the JDBC sink does not batch its insert on conflict do update,
+            // we can keep the behaviour consistent.
+            //
+            // We may opt for an optimization flag to toggle this behaviour in the future.
+            chunk.data_types().len(),
         );
+        let mut delete_parameter_buffer =
+            ParameterBuffer::new(&self.pk_types, chunk.cardinality() * self.pk_indices.len());
         // 1d flattened array of parameters to be deleted.
         for (op, row) in chunk.rows() {
             match op {
@@ -455,14 +484,18 @@ impl PostgresSinkWriter {
             }
         }
         let (delete_parameters, delete_remaining_parameter) = delete_parameter_buffer.into_parts();
+        let mut transaction = self.client.transaction().await?;
         Self::execute_parameter(
             Op::Delete,
             &mut transaction,
             &self.schema,
+            &self.config.schema,
             &self.config.table,
             &self.pk_indices,
+            &self.pk_indices_lookup,
             delete_parameters,
             delete_remaining_parameter,
+            false,
         )
         .await?;
         let (insert_parameters, insert_remaining_parameter) = insert_parameter_buffer.into_parts();
@@ -470,10 +503,13 @@ impl PostgresSinkWriter {
             Op::Insert,
             &mut transaction,
             &self.schema,
+            &self.config.schema,
             &self.config.table,
             &self.pk_indices,
+            &self.pk_indices_lookup,
             insert_parameters,
             insert_remaining_parameter,
+            false,
         )
         .await?;
         transaction.commit().await?;
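The NOTE above is the crux of the batching change: a single multi-row `INSERT ... ON CONFLICT DO UPDATE` fails outright if two proposed rows share a PK. A standalone sketch that reproduces the error the comment quotes (the `demo` table and connection string are hypothetical):

```rust
use tokio_postgres::{NoTls, types::ToSql};

#[tokio::main]
async fn main() -> Result<(), tokio_postgres::Error> {
    let (client, conn) =
        tokio_postgres::connect("host=localhost user=postgres", NoTls).await?;
    tokio::spawn(conn);
    client
        .batch_execute("CREATE TABLE IF NOT EXISTS demo (id BIGINT PRIMARY KEY, v TEXT)")
        .await?;

    // Two rows with the same PK in ONE statement: PG rejects this with
    // "ON CONFLICT DO UPDATE command cannot affect row a second time".
    let err = client
        .execute(
            "INSERT INTO demo (id, v) VALUES ($1, $2), ($3, $4) \
             ON CONFLICT (id) DO UPDATE SET v = EXCLUDED.v",
            &[&1i64 as &(dyn ToSql + Sync), &"a", &1i64, &"b"],
        )
        .await
        .unwrap_err();
    println!("expected failure: {err}");

    // The same two rows in separate statements succeed, which is why the
    // upsert path now buffers one row per statement.
    for (id, v) in [(1i64, "a"), (1i64, "b")] {
        client
            .execute(
                "INSERT INTO demo (id, v) VALUES ($1, $2) \
                 ON CONFLICT (id) DO UPDATE SET v = EXCLUDED.v",
                &[&id as &(dyn ToSql + Sync), &v],
            )
            .await?;
    }
    Ok(())
}
```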
@@ -485,54 +521,113 @@ impl PostgresSinkWriter {
         op: Op,
         transaction: &mut tokio_postgres::Transaction<'_>,
         schema: &Schema,
+        schema_name: &str,
         table_name: &str,
         pk_indices: &[usize],
+        pk_indices_lookup: &HashSet<usize>,
         parameters: Vec<Vec<Option<ScalarAdapter>>>,
         remaining_parameter: Vec<Option<ScalarAdapter>>,
+        append_only: bool,
     ) -> Result<()> {
+        async fn prepare_statement(
+            transaction: &mut tokio_postgres::Transaction<'_>,
+            op: Op,
+            schema: &Schema,
+            schema_name: &str,
+            table_name: &str,
+            pk_indices: &[usize],
+            pk_indices_lookup: &HashSet<usize>,
+            rows_length: usize,
+            append_only: bool,
+        ) -> Result<(String, tokio_postgres::Statement)> {
+            assert!(rows_length > 0, "parameters are empty");
+            let statement_str = match op {
+                Op::Insert => {
+                    if append_only {
+                        create_insert_sql(schema, schema_name, table_name, rows_length)
+                    } else {
+                        create_upsert_sql(
+                            schema,
+                            schema_name,
+                            table_name,
+                            pk_indices,
+                            pk_indices_lookup,
+                            rows_length,
+                        )
+                    }
+                }
+                Op::Delete => {
+                    create_delete_sql(schema, schema_name, table_name, pk_indices, rows_length)
+                }
+                _ => unreachable!(),
+            };
+            let statement = transaction
+                .prepare(&statement_str)
+                .await
+                .with_context(|| format!("failed to prepare statement: {}", statement_str))?;
+            Ok((statement_str, statement))
+        }
+
         let column_length = match op {
             Op::Insert => schema.len(),
             Op::Delete => pk_indices.len(),
             _ => unreachable!(),
         };
+
         if !parameters.is_empty() {
             let parameter_length = parameters[0].len();
-            let rows_length = parameter_length / column_length;
             assert_eq!(
                 parameter_length % column_length,
                 0,
-                "flattened parameters are unaligned, parameters={:#?} columns={:#?}",
-                parameters,
-                schema.fields(),
+                "flattened parameters are unaligned, parameter_length={} column_length={}",
+                parameter_length,
+                column_length,
             );
-            let statement = match op {
-                Op::Insert => create_insert_sql(schema, table_name, rows_length),
-                Op::Delete => create_delete_sql(schema, table_name, pk_indices, rows_length),
-                _ => unreachable!(),
-            };
-            let statement = transaction.prepare(&statement).await?;
+            let rows_length = parameter_length / column_length;
+            let (statement_str, statement) = prepare_statement(
+                transaction,
+                op,
+                schema,
+                schema_name,
+                table_name,
+                pk_indices,
+                pk_indices_lookup,
+                rows_length,
+                append_only,
+            )
+            .await?;
             for parameter in parameters {
-                transaction.execute_raw(&statement, parameter).await?;
+                transaction
+                    .execute_raw(&statement, parameter)
+                    .await
+                    .with_context(|| format!("failed to execute statement: {}", statement_str))?;
             }
         }
         if !remaining_parameter.is_empty() {
-            let rows_length = remaining_parameter.len() / column_length;
+            let parameter_length = remaining_parameter.len();
             assert_eq!(
-                remaining_parameter.len() % column_length,
+                parameter_length % column_length,
                 0,
                 "flattened parameters are unaligned"
             );
-            let statement = match op {
-                Op::Insert => create_insert_sql(schema, table_name, rows_length),
-                Op::Delete => create_delete_sql(schema, table_name, pk_indices, rows_length),
-                _ => unreachable!(),
-            };
-            tracing::trace!("binding statement: {:?}", statement);
-            let statement = transaction.prepare(&statement).await?;
+            let rows_length = remaining_parameter.len() / column_length;
+            let (statement_str, statement) = prepare_statement(
+                transaction,
+                op,
+                schema,
+                schema_name,
+                table_name,
+                pk_indices,
+                pk_indices_lookup,
+                rows_length,
+                append_only,
+            )
+            .await?;
             tracing::trace!("binding parameters: {:?}", remaining_parameter);
             transaction
                 .execute_raw(&statement, remaining_parameter)
-                .await?;
+                .await
+                .with_context(|| format!("failed to execute statement: {}", statement_str))?;
         }
         Ok(())
     }
@@ -557,12 +652,26 @@ impl LogSinker for PostgresSinkWriter {
     }
 }
 
-fn create_insert_sql(schema: &Schema, table_name: &str, number_of_rows: usize) -> String {
+fn create_insert_sql(
+    schema: &Schema,
+    schema_name: &str,
+    table_name: &str,
+    number_of_rows: usize,
+) -> String {
+    assert!(
+        number_of_rows > 0,
+        "number of rows must be greater than 0"
+    );
+    let normalized_table_name = format!(
+        "{}.{}",
+        quote_identifier(schema_name),
+        quote_identifier(table_name)
+    );
     let number_of_columns = schema.len();
     let columns: String = schema
         .fields()
         .iter()
-        .map(|field| field.name.clone())
+        .map(|field| quote_identifier(&field.name))
         .join(", ");
     let parameters: String = (0..number_of_rows)
         .map(|i| {
@@ -573,20 +682,30 @@ fn create_insert_sql(schema: &Schema, table_name: &str, number_of_rows: usize) -> String {
         })
         .collect_vec()
         .join(", ");
-    format!("INSERT INTO {table_name} ({columns}) VALUES {parameters}")
+    format!("INSERT INTO {normalized_table_name} ({columns}) VALUES {parameters}")
 }
 
 fn create_delete_sql(
     schema: &Schema,
+    schema_name: &str,
     table_name: &str,
     pk_indices: &[usize],
     number_of_rows: usize,
 ) -> String {
+    assert!(
+        number_of_rows > 0,
+        "number of rows must be greater than 0"
+    );
+    let normalized_table_name = format!(
+        "{}.{}",
+        quote_identifier(schema_name),
+        quote_identifier(table_name)
+    );
     let number_of_pk = pk_indices.len();
     let pk = {
         let pk_symbols = pk_indices
             .iter()
-            .map(|pk_index| &schema.fields()[*pk_index].name)
+            .map(|pk_index| quote_identifier(&schema.fields()[*pk_index].name))
             .join(", ");
         format!("({})", pk_symbols)
     };
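Quoting the schema name, table name, and every column makes the generated SQL safe for mixed-case and reserved-word identifiers. The doubling rule is standard PostgreSQL identifier escaping; a small self-contained check mirroring the `quote_identifier` introduced in this patch:

```rust
/// Standard PostgreSQL identifier quoting: wrap in double quotes and
/// double any embedded double quote (same logic as `quote_identifier` below).
fn quote_identifier(identifier: &str) -> String {
    format!("\"{}\"", identifier.replace('"', "\"\""))
}

fn main() {
    // Mixed case survives quoting instead of being folded to lower case.
    assert_eq!(quote_identifier("MyTable"), r#""MyTable""#);
    // Reserved words become usable as table or column names.
    assert_eq!(quote_identifier("user"), r#""user""#);
    // Embedded quotes are doubled: wei"rd -> "wei""rd"
    assert_eq!(quote_identifier(r#"wei"rd"#), r#""wei""rd""#);
}
```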
@@ -599,7 +718,38 @@ fn create_delete_sql(
         })
         .collect_vec()
         .join(", ");
-    format!("DELETE FROM {table_name} WHERE {pk} in ({parameters})")
+    format!("DELETE FROM {normalized_table_name} WHERE {pk} in ({parameters})")
+}
+
+fn create_upsert_sql(
+    schema: &Schema,
+    schema_name: &str,
+    table_name: &str,
+    pk_indices: &[usize],
+    pk_indices_lookup: &HashSet<usize>,
+    number_of_rows: usize,
+) -> String {
+    let number_of_columns = schema.len();
+    let insert_sql = create_insert_sql(schema, schema_name, table_name, number_of_rows);
+    let pk_columns = pk_indices
+        .iter()
+        .map(|pk_index| quote_identifier(&schema.fields()[*pk_index].name))
+        .collect_vec()
+        .join(", ");
+    let update_parameters: String = (0..number_of_columns)
+        .filter(|i| !pk_indices_lookup.contains(i))
+        .map(|i| {
+            let column = quote_identifier(&schema.fields()[i].name);
+            format!("{column} = EXCLUDED.{column}")
+        })
+        .collect_vec()
+        .join(", ");
+    format!("{insert_sql} on conflict ({pk_columns}) do update set {update_parameters}")
+}
+
+/// Quote an identifier for PostgreSQL.
+fn quote_identifier(identifier: &str) -> String {
+    format!("\"{}\"", identifier.replace("\"", "\"\""))
 }
 
 #[cfg(test)]
@@ -629,11 +779,14 @@ mod tests {
                 name: "b".to_owned(),
             },
         ]);
+        let schema_name = "test_schema";
         let table_name = "test_table";
-        let sql = create_insert_sql(&schema, table_name, 3);
+        let sql = create_insert_sql(&schema, schema_name, table_name, 3);
         check(
             sql,
-            expect!["INSERT INTO test_table (a, b) VALUES ($1, $2), ($3, $4), ($5, $6)"],
+            expect![[
+                r#"INSERT INTO "test_schema"."test_table" ("a", "b") VALUES ($1, $2), ($3, $4), ($5, $6)"#
+            ]],
         );
     }
 
@@ -649,17 +802,53 @@ mod tests {
                 name: "b".to_owned(),
             },
         ]);
+        let schema_name = "test_schema";
         let table_name = "test_table";
-        let sql = create_delete_sql(&schema, table_name, &[1], 3);
+        let sql = create_delete_sql(&schema, schema_name, table_name, &[1], 3);
         check(
             sql,
-            expect!["DELETE FROM test_table WHERE (b) in (($1), ($2), ($3))"],
+            expect![[
+                r#"DELETE FROM "test_schema"."test_table" WHERE ("b") in (($1), ($2), ($3))"#
+            ]],
         );
 
         let table_name = "test_table";
-        let sql = create_delete_sql(&schema, table_name, &[0, 1], 3);
+        let sql = create_delete_sql(&schema, schema_name, table_name, &[0, 1], 3);
+        check(
+            sql,
+            expect![[
+                r#"DELETE FROM "test_schema"."test_table" WHERE ("a", "b") in (($1, $2), ($3, $4), ($5, $6))"#
+            ]],
+        );
+    }
+
+    #[test]
+    fn test_create_upsert_sql() {
+        let schema = Schema::new(vec![
+            Field {
+                data_type: DataType::Int32,
+                name: "a".to_owned(),
+            },
+            Field {
+                data_type: DataType::Int32,
+                name: "b".to_owned(),
+            },
+        ]);
+        let schema_name = "test_schema";
+        let table_name = "test_table";
+        let pk_indices_lookup = HashSet::from_iter([1]);
+        let sql = create_upsert_sql(
+            &schema,
+            schema_name,
+            table_name,
+            &[1],
+            &pk_indices_lookup,
+            3,
+        );
         check(
             sql,
-            expect!["DELETE FROM test_table WHERE (a, b) in (($1, $2), ($3, $4), ($5, $6))"],
+            expect![[
+                r#"INSERT INTO "test_schema"."test_table" ("a", "b") VALUES ($1, $2), ($3, $4), ($5, $6) on conflict ("b") do update set "a" = EXCLUDED."a""#
+            ]],
         );
     }
 }
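For composite keys the same construction applies: every non-PK column gets an `EXCLUDED` assignment. A hypothetical extra test (not part of this patch) illustrating the shape with `pk_indices = [0, 1]` on a three-column schema:

```rust
#[test]
fn test_create_upsert_sql_composite_pk() {
    let schema = Schema::new(vec![
        Field {
            data_type: DataType::Int32,
            name: "a".to_owned(),
        },
        Field {
            data_type: DataType::Int32,
            name: "b".to_owned(),
        },
        Field {
            data_type: DataType::Int32,
            name: "c".to_owned(),
        },
    ]);
    // Columns "a" and "b" form the key; only "c" is updated on conflict.
    let pk_indices_lookup = HashSet::from_iter([0, 1]);
    let sql = create_upsert_sql(&schema, "test_schema", "test_table", &[0, 1], &pk_indices_lookup, 1);
    assert_eq!(
        sql,
        r#"INSERT INTO "test_schema"."test_table" ("a", "b", "c") VALUES ($1, $2, $3) on conflict ("a", "b") do update set "c" = EXCLUDED."c""#
    );
}
```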
- ["-c", "wal_level=logical", "-c", "max_replication_slots=30"] - .map(String::from) - .to_vec() + [ + "-c", + "wal_level=logical", + "-c", + "max_replication_slots=30", + "-c", + "log_statement=all", + ] + .map(String::from) + .to_vec() } fn envs(&self) -> Vec<(String, String)> { diff --git a/src/stream/src/from_proto/sink.rs b/src/stream/src/from_proto/sink.rs index 8133bc72913bb..7f5bec50b395d 100644 --- a/src/stream/src/from_proto/sink.rs +++ b/src/stream/src/from_proto/sink.rs @@ -141,13 +141,19 @@ impl ExecutorBuilder for SinkExecutorBuilder { && let Some(url) = properties_with_secret.get("jdbc.url") && url.starts_with("jdbc:postgresql:") { + tracing::info!("switching to native postgres connector"); let jdbc_url = parse_jdbc_url(url) .map_err(|e| StreamExecutorError::from((SinkError::Config(e), sink_id.sink_id)))?; + properties_with_secret.insert(CONNECTOR_TYPE_KEY.to_owned(), "postgres".to_owned()); properties_with_secret.insert("host".to_owned(), jdbc_url.host); properties_with_secret.insert("port".to_owned(), jdbc_url.port.to_string()); properties_with_secret.insert("database".to_owned(), jdbc_url.db_name); - properties_with_secret.insert("user".to_owned(), jdbc_url.username); - properties_with_secret.insert("password".to_owned(), jdbc_url.password); + if let Some(username) = jdbc_url.username { + properties_with_secret.insert("user".to_owned(), username); + } + if let Some(password) = jdbc_url.password { + properties_with_secret.insert("password".to_owned(), password); + } if let Some(table_name) = properties_with_secret.get("table.name") { properties_with_secret.insert("table".to_owned(), table_name.clone()); } @@ -306,8 +312,8 @@ struct JdbcUrl { host: String, port: u16, db_name: String, - username: String, - password: String, + username: Option, + password: Option, } fn parse_jdbc_url(url: &str) -> anyhow::Result { @@ -331,7 +337,9 @@ fn parse_jdbc_url(url: &str) -> anyhow::Result { let port = url .port() .ok_or_else(|| anyhow!("missing port in jdbc url"))?; - let db_name = url.path(); + let Some(db_name) = url.path().strip_prefix('/') else { + bail!("missing db_name in jdbc url"); + }; let mut username = None; let mut password = None; for (key, value) in url.query_pairs() { @@ -342,15 +350,13 @@ fn parse_jdbc_url(url: &str) -> anyhow::Result { password = Some(value.to_string()); } } - let username = username.ok_or_else(|| anyhow!("missing username in jdbc url"))?; - let password = password.ok_or_else(|| anyhow!("missing password in jdbc url"))?; Ok(JdbcUrl { host: host.to_owned(), port, db_name: db_name.to_owned(), - username: username.to_owned(), - password: password.to_owned(), + username, + password, }) } @@ -364,8 +370,8 @@ mod tests { let jdbc_url = parse_jdbc_url(url).unwrap(); assert_eq!(jdbc_url.host, "localhost"); assert_eq!(jdbc_url.port, 5432); - assert_eq!(jdbc_url.db_name, "/test"); - assert_eq!(jdbc_url.username, "postgres"); - assert_eq!(jdbc_url.password, "postgres"); + assert_eq!(jdbc_url.db_name, "test"); + assert_eq!(jdbc_url.username, Some("postgres".to_owned())); + assert_eq!(jdbc_url.password, Some("postgres".to_owned())); } }