From 26e7d555f3bb3aed8b9a790bc13929c632164fd0 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Wed, 23 Apr 2025 18:34:32 +0800 Subject: [PATCH 01/21] add on insert do update for upsert pg sink --- e2e_test/sink/postgres_sink.slt | 54 ++++++++++++++++++++ src/connector/src/sink/postgres.rs | 82 +++++++++++++++++++++++++++++- 2 files changed, 134 insertions(+), 2 deletions(-) diff --git a/e2e_test/sink/postgres_sink.slt b/e2e_test/sink/postgres_sink.slt index 10ce6952a4508..0bcbdfade396d 100644 --- a/e2e_test/sink/postgres_sink.slt +++ b/e2e_test/sink/postgres_sink.slt @@ -269,8 +269,62 @@ select * from postgres_query('$PGHOST', '$PGPORT', '$PGUSER', '${PGPASSWORD:post 1 Varcharvalue1 Textvalue1 123 456 789 12.34 56.78 90.12 t 2023-05-22 12:34:56 2023-05-22 12:34:56 1 day {"key": "value"} 2 Varcharvalue4 Textvalue2 234 400 890 NaN 67.89 1.23 f 2023-05-23 23:45:01 2023-05-23 23:45:01 2 days {"key": "value2"} +################### test duplicate inserts should work + +statement ok +CREATE TABLE rw_types_table_dup ( + id BIGINT PRIMARY KEY, + varchar_column VARCHAR, + text_column TEXT, + smallint_column SMALLINT, + integer_column INTEGER, + bigint_column BIGINT, + decimal_column DECIMAL, + real_column REAL, + double_column DOUBLE PRECISION, + boolean_column BOOLEAN, + date_column DATE, + time_column TIME, + interval_column INTERVAL, + jsonb_column JSONB, + timestamp_column TIMESTAMP +); + +statement ok +INSERT INTO rw_types_table (id, varchar_column, text_column, integer_column, smallint_column, bigint_column, decimal_column, real_column, double_column, boolean_column, date_column, time_column, timestamp_column, interval_column, jsonb_column) VALUES + (1, 'Varcharvalue4', 'Textvalue1', 123, 456, 789, 12.34, 56.78, 90.12, TRUE, '2023-05-22', '12:34:56', '2023-05-22 12:34:56', '1 day', '{"key": "value"}'), + (2, 'Varcharvalue6', 'Textvalue2', 234, 567, 890, 'NAN'::decimal, 67.89, 01.23, FALSE, '2023-05-23', '23:45:01', '2023-05-23 23:45:01', '2 days', '{"key": "value2"}'); + +statement ok +flush; + +statement ok +CREATE SINK postgres_rw_types_sink_dup FROM rw_types_table_dup WITH ( + connector='postgres', + host='$PGHOST', + port='$PGPORT', + user='$PGUSER', + password='$PGPASSWORD', + database='sink_test', + table='pg_types_table', + type='upsert', + primary_key='id', +); + +query I +select * from postgres_query('$PGHOST', '$PGPORT', '$PGUSER', '${PGPASSWORD:postgres}', 'sink_test', 'select * from pg_types_table order by id;'); +---- +1 Varcharvalue4 Textvalue1 123 456 789 12.34 56.78 90.12 t 2023-05-22 12:34:56 2023-05-22 12:34:56 1 day {"key": "value"} +2 Varcharvalue6 Textvalue2 234 567 890 NaN 67.89 1.23 f 2023-05-23 23:45:01 2023-05-23 23:45:01 2 days {"key": "value2"} + ################### cleanup sink +statement ok +DROP SINK postgres_rw_types_sink_dup; + +statement ok +DROP TABLE rw_types_table_dup; + statement ok DROP SINK postgres_rw_types_sink; diff --git a/src/connector/src/sink/postgres.rs b/src/connector/src/sink/postgres.rs index a03b536521fe1..dcfe65032dde3 100644 --- a/src/connector/src/sink/postgres.rs +++ b/src/connector/src/sink/postgres.rs @@ -424,6 +424,7 @@ impl PostgresSinkWriter { &self.pk_indices, parameters, remaining, + true, ) .await?; transaction.commit().await?; @@ -463,6 +464,7 @@ impl PostgresSinkWriter { &self.pk_indices, delete_parameters, delete_remaining_parameter, + false, ) .await?; let (insert_parameters, insert_remaining_parameter) = insert_parameter_buffer.into_parts(); @@ -474,6 +476,7 @@ impl PostgresSinkWriter { &self.pk_indices, insert_parameters, insert_remaining_parameter, + false, ) .await?; transaction.commit().await?; @@ -489,6 +492,7 @@ impl PostgresSinkWriter { pk_indices: &[usize], parameters: Vec>>, remaining_parameter: Vec>, + append_only: bool, ) -> Result<()> { let column_length = match op { Op::Insert => schema.len(), @@ -506,7 +510,13 @@ impl PostgresSinkWriter { schema.fields(), ); let statement = match op { - Op::Insert => create_insert_sql(schema, table_name, rows_length), + Op::Insert => { + if append_only { + create_insert_sql(schema, table_name, rows_length) + } else { + create_upsert_sql(schema, table_name, pk_indices, rows_length) + } + } Op::Delete => create_delete_sql(schema, table_name, pk_indices, rows_length), _ => unreachable!(), }; @@ -523,7 +533,13 @@ impl PostgresSinkWriter { "flattened parameters are unaligned" ); let statement = match op { - Op::Insert => create_insert_sql(schema, table_name, rows_length), + Op::Insert => { + if append_only { + create_insert_sql(schema, table_name, rows_length) + } else { + create_upsert_sql(schema, table_name, pk_indices, rows_length) + } + } Op::Delete => create_delete_sql(schema, table_name, pk_indices, rows_length), _ => unreachable!(), }; @@ -602,6 +618,46 @@ fn create_delete_sql( format!("DELETE FROM {table_name} WHERE {pk} in ({parameters})") } +fn create_upsert_sql( + schema: &Schema, + table_name: &str, + pk_indices: &[usize], + number_of_rows: usize, +) -> String { + let number_of_columns = schema.len(); + let columns: String = schema + .fields() + .iter() + .map(|field| field.name.clone()) + .collect_vec() + .join(", "); + let parameters: String = (0..number_of_rows) + .map(|i| { + let row_parameters = (0..number_of_columns) + .map(|j| format!("${}", i * number_of_columns + j + 1)) + .join(", "); + format!("({row_parameters})") + }) + .collect_vec() + .join(", "); + let pk_columns = pk_indices + .iter() + .map(|i| schema.fields()[*i].name.clone()) + .collect_vec() + .join(", "); + let update_parameters: String = (0..number_of_columns) + .filter(|i| !pk_indices.contains(i)) + .map(|i| { + let column = schema.fields()[i].name.clone(); + format!("{column} = EXCLUDED.{column}") + }) + .collect_vec() + .join(", "); + format!( + "INSERT INTO {table_name} ({columns}) VALUES {parameters} on conflict ({pk_columns}) do update set {update_parameters}" + ) +} + #[cfg(test)] mod tests { use std::fmt::Display; @@ -662,4 +718,26 @@ mod tests { expect!["DELETE FROM test_table WHERE (a, b) in (($1, $2), ($3, $4), ($5, $6))"], ); } + + #[test] + fn test_create_upsert_sql() { + let schema = Schema::new(vec![ + Field { + data_type: DataType::Int32, + name: "a".to_owned(), + }, + Field { + data_type: DataType::Int32, + name: "b".to_owned(), + }, + ]); + let table_name = "test_table"; + let sql = create_upsert_sql(&schema, table_name, &[1], 3); + check( + sql, + expect![ + "INSERT INTO test_table (a, b) VALUES ($1, $2), ($3, $4), ($5, $6) on conflict (b) do update set a = EXCLUDED.a" + ], + ); + } } From e1ce236869038da244903f8e7ea24af675db1399 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Thu, 24 Apr 2025 00:19:27 +0800 Subject: [PATCH 02/21] log failing statement --- src/connector/src/sink/postgres.rs | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/src/connector/src/sink/postgres.rs b/src/connector/src/sink/postgres.rs index dcfe65032dde3..852cf557770f1 100644 --- a/src/connector/src/sink/postgres.rs +++ b/src/connector/src/sink/postgres.rs @@ -14,7 +14,7 @@ use std::collections::{BTreeMap, HashSet}; -use anyhow::anyhow; +use anyhow::{Context, anyhow}; use async_trait::async_trait; use itertools::Itertools; use phf::phf_set; @@ -509,7 +509,7 @@ impl PostgresSinkWriter { parameters, schema.fields(), ); - let statement = match op { + let statement_str = match op { Op::Insert => { if append_only { create_insert_sql(schema, table_name, rows_length) @@ -520,9 +520,12 @@ impl PostgresSinkWriter { Op::Delete => create_delete_sql(schema, table_name, pk_indices, rows_length), _ => unreachable!(), }; - let statement = transaction.prepare(&statement).await?; + let statement = transaction.prepare(&statement_str).await?; for parameter in parameters { - transaction.execute_raw(&statement, parameter).await?; + transaction + .execute_raw(&statement, parameter) + .await + .with_context(|| format!("failed to run statement: {}", statement_str,))?; } } if !remaining_parameter.is_empty() { @@ -532,7 +535,7 @@ impl PostgresSinkWriter { 0, "flattened parameters are unaligned" ); - let statement = match op { + let statement_str = match op { Op::Insert => { if append_only { create_insert_sql(schema, table_name, rows_length) @@ -543,12 +546,13 @@ impl PostgresSinkWriter { Op::Delete => create_delete_sql(schema, table_name, pk_indices, rows_length), _ => unreachable!(), }; - tracing::trace!("binding statement: {:?}", statement); - let statement = transaction.prepare(&statement).await?; + tracing::trace!("binding statement: {:?}", statement_str); + let statement = transaction.prepare(&statement_str).await?; tracing::trace!("binding parameters: {:?}", remaining_parameter); transaction .execute_raw(&statement, remaining_parameter) - .await?; + .await + .with_context(|| format!("failed to run statement: {}", statement_str))?; } Ok(()) } From 24e432f051133b7b190f383b37e8e3bc6403cf72 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Thu, 24 Apr 2025 19:36:21 +0800 Subject: [PATCH 03/21] handle schema name, quote table,schema,column names --- src/connector/src/sink/postgres.rs | 103 ++++++++++++++++++----------- 1 file changed, 63 insertions(+), 40 deletions(-) diff --git a/src/connector/src/sink/postgres.rs b/src/connector/src/sink/postgres.rs index 852cf557770f1..1df1f31f8588b 100644 --- a/src/connector/src/sink/postgres.rs +++ b/src/connector/src/sink/postgres.rs @@ -420,6 +420,7 @@ impl PostgresSinkWriter { Op::Insert, &mut transaction, &self.schema, + &self.config.schema, &self.config.table, &self.pk_indices, parameters, @@ -460,6 +461,7 @@ impl PostgresSinkWriter { Op::Delete, &mut transaction, &self.schema, + &self.config.schema, &self.config.table, &self.pk_indices, delete_parameters, @@ -472,6 +474,7 @@ impl PostgresSinkWriter { Op::Insert, &mut transaction, &self.schema, + &self.config.schema, &self.config.table, &self.pk_indices, insert_parameters, @@ -488,6 +491,7 @@ impl PostgresSinkWriter { op: Op, transaction: &mut tokio_postgres::Transaction<'_>, schema: &Schema, + schema_name: &str, table_name: &str, pk_indices: &[usize], parameters: Vec>>, @@ -512,12 +516,14 @@ impl PostgresSinkWriter { let statement_str = match op { Op::Insert => { if append_only { - create_insert_sql(schema, table_name, rows_length) + create_insert_sql(schema, schema_name, table_name, rows_length) } else { - create_upsert_sql(schema, table_name, pk_indices, rows_length) + create_upsert_sql(schema, schema_name, table_name, pk_indices, rows_length) } } - Op::Delete => create_delete_sql(schema, table_name, pk_indices, rows_length), + Op::Delete => { + create_delete_sql(schema, schema_name, table_name, pk_indices, rows_length) + } _ => unreachable!(), }; let statement = transaction.prepare(&statement_str).await?; @@ -538,12 +544,14 @@ impl PostgresSinkWriter { let statement_str = match op { Op::Insert => { if append_only { - create_insert_sql(schema, table_name, rows_length) + create_insert_sql(schema, schema_name, table_name, rows_length) } else { - create_upsert_sql(schema, table_name, pk_indices, rows_length) + create_upsert_sql(schema, schema_name, table_name, pk_indices, rows_length) } } - Op::Delete => create_delete_sql(schema, table_name, pk_indices, rows_length), + Op::Delete => { + create_delete_sql(schema, schema_name, table_name, pk_indices, rows_length) + } _ => unreachable!(), }; tracing::trace!("binding statement: {:?}", statement_str); @@ -577,7 +585,17 @@ impl LogSinker for PostgresSinkWriter { } } -fn create_insert_sql(schema: &Schema, table_name: &str, number_of_rows: usize) -> String { +fn create_insert_sql( + schema: &Schema, + schema_name: &str, + table_name: &str, + number_of_rows: usize, +) -> String { + let normalized_table_name = format!( + "{}.{}", + quote_identifier(schema_name), + quote_identifier(table_name) + ); let number_of_columns = schema.len(); let columns: String = schema .fields() @@ -593,20 +611,26 @@ fn create_insert_sql(schema: &Schema, table_name: &str, number_of_rows: usize) - }) .collect_vec() .join(", "); - format!("INSERT INTO {table_name} ({columns}) VALUES {parameters}") + format!("INSERT INTO {normalized_table_name} ({columns}) VALUES {parameters}") } fn create_delete_sql( schema: &Schema, + schema_name: &str, table_name: &str, pk_indices: &[usize], number_of_rows: usize, ) -> String { + let normalized_table_name = format!( + "{}.{}", + quote_identifier(schema_name), + quote_identifier(table_name) + ); let number_of_pk = pk_indices.len(); let pk = { let pk_symbols = pk_indices .iter() - .map(|pk_index| &schema.fields()[*pk_index].name) + .map(|pk_index| quote_identifier(&schema.fields()[*pk_index].name)) .join(", "); format!("({})", pk_symbols) }; @@ -619,47 +643,37 @@ fn create_delete_sql( }) .collect_vec() .join(", "); - format!("DELETE FROM {table_name} WHERE {pk} in ({parameters})") + format!("DELETE FROM {normalized_table_name} WHERE {pk} in ({parameters})") } fn create_upsert_sql( schema: &Schema, + schema_name: &str, table_name: &str, pk_indices: &[usize], number_of_rows: usize, ) -> String { let number_of_columns = schema.len(); - let columns: String = schema - .fields() - .iter() - .map(|field| field.name.clone()) - .collect_vec() - .join(", "); - let parameters: String = (0..number_of_rows) - .map(|i| { - let row_parameters = (0..number_of_columns) - .map(|j| format!("${}", i * number_of_columns + j + 1)) - .join(", "); - format!("({row_parameters})") - }) - .collect_vec() - .join(", "); + let insert_sql = create_insert_sql(schema, schema_name, table_name, number_of_rows); let pk_columns = pk_indices .iter() - .map(|i| schema.fields()[*i].name.clone()) + .map(|pk_index| quote_identifier(&schema.fields()[*pk_index].name)) .collect_vec() .join(", "); let update_parameters: String = (0..number_of_columns) .filter(|i| !pk_indices.contains(i)) .map(|i| { - let column = schema.fields()[i].name.clone(); + let column = quote_identifier(&schema.fields()[i].name); format!("{column} = EXCLUDED.{column}") }) .collect_vec() .join(", "); - format!( - "INSERT INTO {table_name} ({columns}) VALUES {parameters} on conflict ({pk_columns}) do update set {update_parameters}" - ) + format!("{insert_sql} on conflict ({pk_columns}) do update set {update_parameters}") +} + +/// Quote an identifier for PostgreSQL. +fn quote_identifier(identifier: &str) -> String { + format!("\"{}\"", identifier.replace("\"", "\"\"")) } #[cfg(test)] @@ -689,11 +703,14 @@ mod tests { name: "b".to_owned(), }, ]); + let schema_name = "test_schema"; let table_name = "test_table"; - let sql = create_insert_sql(&schema, table_name, 3); + let sql = create_insert_sql(&schema, schema_name, table_name, 3); check( sql, - expect!["INSERT INTO test_table (a, b) VALUES ($1, $2), ($3, $4), ($5, $6)"], + expect![[ + r#"INSERT INTO "test_schema"."test_table" (a, b) VALUES ($1, $2), ($3, $4), ($5, $6)"# + ]], ); } @@ -709,17 +726,22 @@ mod tests { name: "b".to_owned(), }, ]); + let schema_name = "test_schema"; let table_name = "test_table"; - let sql = create_delete_sql(&schema, table_name, &[1], 3); + let sql = create_delete_sql(&schema, schema_name, table_name, &[1], 3); check( sql, - expect!["DELETE FROM test_table WHERE (b) in (($1), ($2), ($3))"], + expect![[ + r#"DELETE FROM "test_schema"."test_table" WHERE ("b") in (($1), ($2), ($3))"# + ]], ); let table_name = "test_table"; - let sql = create_delete_sql(&schema, table_name, &[0, 1], 3); + let sql = create_delete_sql(&schema, schema_name, table_name, &[0, 1], 3); check( sql, - expect!["DELETE FROM test_table WHERE (a, b) in (($1, $2), ($3, $4), ($5, $6))"], + expect![[ + r#"DELETE FROM "test_schema"."test_table" WHERE ("a", "b") in (($1, $2), ($3, $4), ($5, $6))"# + ]], ); } @@ -735,13 +757,14 @@ mod tests { name: "b".to_owned(), }, ]); + let schema_name = "test_schema"; let table_name = "test_table"; - let sql = create_upsert_sql(&schema, table_name, &[1], 3); + let sql = create_upsert_sql(&schema, schema_name, table_name, &[1], 3); check( sql, - expect![ - "INSERT INTO test_table (a, b) VALUES ($1, $2), ($3, $4), ($5, $6) on conflict (b) do update set a = EXCLUDED.a" - ], + expect![[ + r#"INSERT INTO "test_schema"."test_table" (a, b) VALUES ($1, $2), ($3, $4), ($5, $6) on conflict ("b") do update set "a" = EXCLUDED."a""# + ]], ); } } From 49532a608b25dedf04aed6fec4a0b00c8f35261e Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 25 Apr 2025 00:46:23 +0800 Subject: [PATCH 04/21] add context to query preparation --- src/connector/src/sink/postgres.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/connector/src/sink/postgres.rs b/src/connector/src/sink/postgres.rs index 1df1f31f8588b..1bf434276f98f 100644 --- a/src/connector/src/sink/postgres.rs +++ b/src/connector/src/sink/postgres.rs @@ -526,7 +526,10 @@ impl PostgresSinkWriter { } _ => unreachable!(), }; - let statement = transaction.prepare(&statement_str).await?; + let statement = transaction + .prepare(&statement_str) + .await + .with_context(|| format!("failed to run statement: {}", statement_str))?; for parameter in parameters { transaction .execute_raw(&statement, parameter) @@ -555,7 +558,10 @@ impl PostgresSinkWriter { _ => unreachable!(), }; tracing::trace!("binding statement: {:?}", statement_str); - let statement = transaction.prepare(&statement_str).await?; + let statement = transaction + .prepare(&statement_str) + .await + .with_context(|| format!("failed to run statement: {}", statement_str))?; tracing::trace!("binding parameters: {:?}", remaining_parameter); transaction .execute_raw(&statement, remaining_parameter) From 535c9ef12bf91233dd00890a9571bb6dd7d2eb0b Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 25 Apr 2025 10:42:35 +0800 Subject: [PATCH 05/21] quote insert column --- src/connector/src/sink/postgres.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/connector/src/sink/postgres.rs b/src/connector/src/sink/postgres.rs index 1bf434276f98f..21214ff9d9a11 100644 --- a/src/connector/src/sink/postgres.rs +++ b/src/connector/src/sink/postgres.rs @@ -606,7 +606,7 @@ fn create_insert_sql( let columns: String = schema .fields() .iter() - .map(|field| field.name.clone()) + .map(|field| quote_identifier(&field.name)) .join(", "); let parameters: String = (0..number_of_rows) .map(|i| { @@ -715,7 +715,7 @@ mod tests { check( sql, expect![[ - r#"INSERT INTO "test_schema"."test_table" (a, b) VALUES ($1, $2), ($3, $4), ($5, $6)"# + r#"INSERT INTO "test_schema"."test_table" ("a", "b") VALUES ($1, $2), ($3, $4), ($5, $6)"# ]], ); } @@ -769,7 +769,7 @@ mod tests { check( sql, expect![[ - r#"INSERT INTO "test_schema"."test_table" (a, b) VALUES ($1, $2), ($3, $4), ($5, $6) on conflict ("b") do update set "a" = EXCLUDED."a""# + r#"INSERT INTO "test_schema"."test_table" ("a", "b") VALUES ($1, $2), ($3, $4), ($5, $6) on conflict ("b") do update set "a" = EXCLUDED."a""# ]], ); } From 1c012ace3fb1f3f98edcb307439199fa651eecae Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 25 Apr 2025 11:06:26 +0800 Subject: [PATCH 06/21] refactor out prepare statement --- src/connector/src/sink/postgres.rs | 98 +++++++++++++++++++----------- 1 file changed, 63 insertions(+), 35 deletions(-) diff --git a/src/connector/src/sink/postgres.rs b/src/connector/src/sink/postgres.rs index 21214ff9d9a11..a85d455497ac2 100644 --- a/src/connector/src/sink/postgres.rs +++ b/src/connector/src/sink/postgres.rs @@ -498,21 +498,17 @@ impl PostgresSinkWriter { remaining_parameter: Vec>, append_only: bool, ) -> Result<()> { - let column_length = match op { - Op::Insert => schema.len(), - Op::Delete => pk_indices.len(), - _ => unreachable!(), - }; - if !parameters.is_empty() { - let parameter_length = parameters[0].len(); - let rows_length = parameter_length / column_length; - assert_eq!( - parameter_length % column_length, - 0, - "flattened parameters are unaligned, parameters={:#?} columns={:#?}", - parameters, - schema.fields(), - ); + async fn prepare_statement( + transaction: &mut tokio_postgres::Transaction<'_>, + op: Op, + schema: &Schema, + schema_name: &str, + table_name: &str, + pk_indices: &[usize], + rows_length: usize, + append_only: bool, + ) -> Result<(String, tokio_postgres::Statement)> { + assert!(rows_length > 0, "parameters are empty"); let statement_str = match op { Op::Insert => { if append_only { @@ -530,6 +526,36 @@ impl PostgresSinkWriter { .prepare(&statement_str) .await .with_context(|| format!("failed to run statement: {}", statement_str))?; + Ok((statement_str, statement)) + } + + let column_length = match op { + Op::Insert => schema.len(), + Op::Delete => pk_indices.len(), + _ => unreachable!(), + }; + + if !parameters.is_empty() { + let parameter_length = parameters[0].len(); + assert_eq!( + parameter_length % column_length, + 0, + "flattened parameters are unaligned, parameter_length={} column_length={}", + parameter_length, + column_length, + ); + let rows_length = parameter_length / column_length; + let (statement_str, statement) = prepare_statement( + transaction, + op, + schema, + schema_name, + table_name, + pk_indices, + rows_length, + append_only, + ) + .await?; for parameter in parameters { transaction .execute_raw(&statement, parameter) @@ -538,30 +564,24 @@ impl PostgresSinkWriter { } } if !remaining_parameter.is_empty() { - let rows_length = remaining_parameter.len() / column_length; + let parameter_length = remaining_parameter.len(); assert_eq!( - remaining_parameter.len() % column_length, + parameter_length % column_length, 0, "flattened parameters are unaligned" ); - let statement_str = match op { - Op::Insert => { - if append_only { - create_insert_sql(schema, schema_name, table_name, rows_length) - } else { - create_upsert_sql(schema, schema_name, table_name, pk_indices, rows_length) - } - } - Op::Delete => { - create_delete_sql(schema, schema_name, table_name, pk_indices, rows_length) - } - _ => unreachable!(), - }; - tracing::trace!("binding statement: {:?}", statement_str); - let statement = transaction - .prepare(&statement_str) - .await - .with_context(|| format!("failed to run statement: {}", statement_str))?; + let rows_length = remaining_parameter.len() / column_length; + let (statement_str, statement) = prepare_statement( + transaction, + op, + schema, + schema_name, + table_name, + pk_indices, + rows_length, + append_only, + ) + .await?; tracing::trace!("binding parameters: {:?}", remaining_parameter); transaction .execute_raw(&statement, remaining_parameter) @@ -597,6 +617,10 @@ fn create_insert_sql( table_name: &str, number_of_rows: usize, ) -> String { + assert!( + number_of_rows > 0, + "number of parameters must be greater than 0" + ); let normalized_table_name = format!( "{}.{}", quote_identifier(schema_name), @@ -627,6 +651,10 @@ fn create_delete_sql( pk_indices: &[usize], number_of_rows: usize, ) -> String { + assert!( + number_of_rows > 0, + "number of parameters must be greater than 0" + ); let normalized_table_name = format!( "{}.{}", quote_identifier(schema_name), From 44526f635933e1ab8edce6f684da08f55832ce8f Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 25 Apr 2025 11:54:00 +0800 Subject: [PATCH 07/21] use pk types instead for delete parameters --- src/connector/src/sink/postgres.rs | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/src/connector/src/sink/postgres.rs b/src/connector/src/sink/postgres.rs index a85d455497ac2..9011d27f6caa9 100644 --- a/src/connector/src/sink/postgres.rs +++ b/src/connector/src/sink/postgres.rs @@ -321,6 +321,7 @@ pub struct PostgresSinkWriter { pk_indices: Vec, is_append_only: bool, client: tokio_postgres::Client, + pk_types: Vec, schema_types: Vec, schema: Schema, } @@ -328,7 +329,7 @@ pub struct PostgresSinkWriter { impl PostgresSinkWriter { async fn new( config: PostgresConfig, - mut schema: Schema, + schema: Schema, pk_indices: Vec, is_append_only: bool, ) -> Result { @@ -343,8 +344,10 @@ impl PostgresSinkWriter { ) .await?; + let pk_indices_lookup = pk_indices.iter().copied().collect::>(); + // Rewrite schema types for serialization - let schema_types = { + let (pk_types, schema_types) = { let name_to_type = PostgresExternalTable::type_mapping( &config.user, &config.password, @@ -359,7 +362,8 @@ impl PostgresSinkWriter { ) .await?; let mut schema_types = Vec::with_capacity(schema.fields.len()); - for field in &mut schema.fields[..] { + let mut pk_types = Vec::with_capacity(pk_indices.len()); + for (i, field) in schema.fields.iter().enumerate() { let field_name = &field.name; let actual_data_type = name_to_type.get(field_name).map(|t| (*t).clone()); let actual_data_type = actual_data_type @@ -370,9 +374,12 @@ impl PostgresSinkWriter { )) })? .clone(); + if pk_indices_lookup.contains(&i) { + pk_types.push(actual_data_type.clone()) + } schema_types.push(actual_data_type); } - schema_types + (pk_types, schema_types) }; let writer = Self { @@ -380,6 +387,7 @@ impl PostgresSinkWriter { pk_indices, is_append_only, client, + pk_types, schema_types, schema, }; @@ -441,7 +449,7 @@ impl PostgresSinkWriter { chunk.cardinality() * chunk.data_types().len(), ); let mut delete_parameter_buffer = ParameterBuffer::new( - &self.schema_types, + &self.pk_types, chunk.cardinality() * self.pk_indices.len(), ); // 1d flattened array of parameters to be deleted. From 074c76868fca3dcd36d50b07cdaa7d3b208f6ba7 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 25 Apr 2025 11:55:51 +0800 Subject: [PATCH 08/21] fmt --- src/connector/src/sink/postgres.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/connector/src/sink/postgres.rs b/src/connector/src/sink/postgres.rs index 9011d27f6caa9..6ae52f096af65 100644 --- a/src/connector/src/sink/postgres.rs +++ b/src/connector/src/sink/postgres.rs @@ -448,10 +448,8 @@ impl PostgresSinkWriter { &self.schema_types, chunk.cardinality() * chunk.data_types().len(), ); - let mut delete_parameter_buffer = ParameterBuffer::new( - &self.pk_types, - chunk.cardinality() * self.pk_indices.len(), - ); + let mut delete_parameter_buffer = + ParameterBuffer::new(&self.pk_types, chunk.cardinality() * self.pk_indices.len()); // 1d flattened array of parameters to be deleted. for (op, row) in chunk.rows() { match op { From e9e70a82e4e0bce647f29e01aed4136032c74770 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 25 Apr 2025 12:03:11 +0800 Subject: [PATCH 09/21] ensure alignment when inserting row --- src/connector/src/sink/postgres.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/connector/src/sink/postgres.rs b/src/connector/src/sink/postgres.rs index 6ae52f096af65..1513076062657 100644 --- a/src/connector/src/sink/postgres.rs +++ b/src/connector/src/sink/postgres.rs @@ -285,6 +285,7 @@ impl<'a> ParameterBuffer<'a> { } fn add_row(&mut self, row: impl Row) { + assert_eq!(row.len(), self.column_length); if self.current_parameter_buffer.len() + self.column_length >= Self::MAX_PARAMETERS { self.new_buffer(); } From 861d1f2d32741c1db7cadba11d5e4c95fe536486 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 25 Apr 2025 12:31:41 +0800 Subject: [PATCH 10/21] use pk_indices_lookup --- src/connector/src/sink/postgres.rs | 31 +++++++++++++++++++++++++++--- 1 file changed, 28 insertions(+), 3 deletions(-) diff --git a/src/connector/src/sink/postgres.rs b/src/connector/src/sink/postgres.rs index 1513076062657..aebf09013f479 100644 --- a/src/connector/src/sink/postgres.rs +++ b/src/connector/src/sink/postgres.rs @@ -320,6 +320,7 @@ impl<'a> ParameterBuffer<'a> { pub struct PostgresSinkWriter { config: PostgresConfig, pk_indices: Vec, + pk_indices_lookup: HashSet, is_append_only: bool, client: tokio_postgres::Client, pk_types: Vec, @@ -386,6 +387,7 @@ impl PostgresSinkWriter { let writer = Self { config, pk_indices, + pk_indices_lookup, is_append_only, client, pk_types, @@ -432,6 +434,7 @@ impl PostgresSinkWriter { &self.config.schema, &self.config.table, &self.pk_indices, + &self.pk_indices_lookup, parameters, remaining, true, @@ -471,6 +474,7 @@ impl PostgresSinkWriter { &self.config.schema, &self.config.table, &self.pk_indices, + &self.pk_indices_lookup, delete_parameters, delete_remaining_parameter, false, @@ -484,6 +488,7 @@ impl PostgresSinkWriter { &self.config.schema, &self.config.table, &self.pk_indices, + &self.pk_indices_lookup, insert_parameters, insert_remaining_parameter, false, @@ -501,6 +506,7 @@ impl PostgresSinkWriter { schema_name: &str, table_name: &str, pk_indices: &[usize], + pk_indices_lookup: &HashSet, parameters: Vec>>, remaining_parameter: Vec>, append_only: bool, @@ -512,6 +518,7 @@ impl PostgresSinkWriter { schema_name: &str, table_name: &str, pk_indices: &[usize], + pk_indices_lookup: &HashSet, rows_length: usize, append_only: bool, ) -> Result<(String, tokio_postgres::Statement)> { @@ -521,7 +528,14 @@ impl PostgresSinkWriter { if append_only { create_insert_sql(schema, schema_name, table_name, rows_length) } else { - create_upsert_sql(schema, schema_name, table_name, pk_indices, rows_length) + create_upsert_sql( + schema, + schema_name, + table_name, + pk_indices, + pk_indices_lookup, + rows_length, + ) } } Op::Delete => { @@ -559,6 +573,7 @@ impl PostgresSinkWriter { schema_name, table_name, pk_indices, + pk_indices_lookup, rows_length, append_only, ) @@ -585,6 +600,7 @@ impl PostgresSinkWriter { schema_name, table_name, pk_indices, + pk_indices_lookup, rows_length, append_only, ) @@ -692,6 +708,7 @@ fn create_upsert_sql( schema_name: &str, table_name: &str, pk_indices: &[usize], + pk_indices_lookup: &HashSet, number_of_rows: usize, ) -> String { let number_of_columns = schema.len(); @@ -702,7 +719,7 @@ fn create_upsert_sql( .collect_vec() .join(", "); let update_parameters: String = (0..number_of_columns) - .filter(|i| !pk_indices.contains(i)) + .filter(|i| !pk_indices_lookup.contains(i)) .map(|i| { let column = quote_identifier(&schema.fields()[i].name); format!("{column} = EXCLUDED.{column}") @@ -800,7 +817,15 @@ mod tests { ]); let schema_name = "test_schema"; let table_name = "test_table"; - let sql = create_upsert_sql(&schema, schema_name, table_name, &[1], 3); + let pk_indices_lookup = HashSet::from_iter([1]); + let sql = create_upsert_sql( + &schema, + schema_name, + table_name, + &[1], + &pk_indices_lookup, + 3, + ); check( sql, expect![[ From 5f4078f9bb051f9d52a15c3f4d741739e6145793 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Mon, 28 Apr 2025 16:17:49 +0800 Subject: [PATCH 11/21] improve context --- src/connector/src/sink/postgres.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/connector/src/sink/postgres.rs b/src/connector/src/sink/postgres.rs index aebf09013f479..04050930e7449 100644 --- a/src/connector/src/sink/postgres.rs +++ b/src/connector/src/sink/postgres.rs @@ -546,7 +546,7 @@ impl PostgresSinkWriter { let statement = transaction .prepare(&statement_str) .await - .with_context(|| format!("failed to run statement: {}", statement_str))?; + .with_context(|| format!("failed to prepare statement: {}", statement_str))?; Ok((statement_str, statement)) } @@ -582,7 +582,7 @@ impl PostgresSinkWriter { transaction .execute_raw(&statement, parameter) .await - .with_context(|| format!("failed to run statement: {}", statement_str,))?; + .with_context(|| format!("failed to execute statement: {}", statement_str,))?; } } if !remaining_parameter.is_empty() { @@ -609,7 +609,7 @@ impl PostgresSinkWriter { transaction .execute_raw(&statement, remaining_parameter) .await - .with_context(|| format!("failed to run statement: {}", statement_str))?; + .with_context(|| format!("failed to execute statement: {}", statement_str))?; } Ok(()) } From 68949c55f3b9fb5fc9033a36c06c512deb7b7b9e Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Thu, 24 Apr 2025 14:06:48 +0800 Subject: [PATCH 12/21] switch --- src/config/ci-jdbc-to-native.toml | 2 +- src/stream/src/from_proto/sink.rs | 8 ++++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/src/config/ci-jdbc-to-native.toml b/src/config/ci-jdbc-to-native.toml index 368d2f0760b4c..1f4e328388f43 100644 --- a/src/config/ci-jdbc-to-native.toml +++ b/src/config/ci-jdbc-to-native.toml @@ -12,7 +12,7 @@ in_flight_barrier_nums = 10 [streaming.developer] stream_exchange_concurrent_barriers = 10 -switch_jdbc_pg_to_native = true +stream_switch_jdbc_pg_to_native = true [storage] imm_merge_threshold = 2 diff --git a/src/stream/src/from_proto/sink.rs b/src/stream/src/from_proto/sink.rs index 8133bc72913bb..3a5d1614a7fda 100644 --- a/src/stream/src/from_proto/sink.rs +++ b/src/stream/src/from_proto/sink.rs @@ -141,8 +141,10 @@ impl ExecutorBuilder for SinkExecutorBuilder { && let Some(url) = properties_with_secret.get("jdbc.url") && url.starts_with("jdbc:postgresql:") { + tracing::info!("switching to native postgres connector"); let jdbc_url = parse_jdbc_url(url) .map_err(|e| StreamExecutorError::from((SinkError::Config(e), sink_id.sink_id)))?; + properties_with_secret.insert(CONNECTOR_TYPE_KEY.to_owned(), "postgres".to_owned()); properties_with_secret.insert("host".to_owned(), jdbc_url.host); properties_with_secret.insert("port".to_owned(), jdbc_url.port.to_string()); properties_with_secret.insert("database".to_owned(), jdbc_url.db_name); @@ -331,7 +333,9 @@ fn parse_jdbc_url(url: &str) -> anyhow::Result { let port = url .port() .ok_or_else(|| anyhow!("missing port in jdbc url"))?; - let db_name = url.path(); + let Some(db_name) = url.path().strip_prefix('/') else { + bail!("missing db_name in jdbc url"); + }; let mut username = None; let mut password = None; for (key, value) in url.query_pairs() { @@ -364,7 +368,7 @@ mod tests { let jdbc_url = parse_jdbc_url(url).unwrap(); assert_eq!(jdbc_url.host, "localhost"); assert_eq!(jdbc_url.port, 5432); - assert_eq!(jdbc_url.db_name, "/test"); + assert_eq!(jdbc_url.db_name, "test"); assert_eq!(jdbc_url.username, "postgres"); assert_eq!(jdbc_url.password, "postgres"); } From 7c54906af09fa9a25e33614e92a0d27008859b29 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Thu, 24 Apr 2025 16:49:15 +0800 Subject: [PATCH 13/21] log database,schema,table on failure to connect --- src/connector/src/sink/postgres.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/connector/src/sink/postgres.rs b/src/connector/src/sink/postgres.rs index 04050930e7449..7bb3a011c405f 100644 --- a/src/connector/src/sink/postgres.rs +++ b/src/connector/src/sink/postgres.rs @@ -169,7 +169,11 @@ impl Sink for PostgresSink { &self.config.ssl_root_cert, self.is_append_only, ) - .await?; + .await + .context(format!( + "failed to connect to database: {}, schema: {}, table: {}", + &self.config.database, &self.config.schema, &self.config.table + ))?; // Check that names and types match, order of columns doesn't matter. { From f496ee3f2d75589e962405101cacf22c083474d9 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Thu, 24 Apr 2025 17:24:15 +0800 Subject: [PATCH 14/21] let username and password fallback to previous settings --- src/stream/src/from_proto/sink.rs | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/src/stream/src/from_proto/sink.rs b/src/stream/src/from_proto/sink.rs index 3a5d1614a7fda..f94b2ea704977 100644 --- a/src/stream/src/from_proto/sink.rs +++ b/src/stream/src/from_proto/sink.rs @@ -148,8 +148,12 @@ impl ExecutorBuilder for SinkExecutorBuilder { properties_with_secret.insert("host".to_owned(), jdbc_url.host); properties_with_secret.insert("port".to_owned(), jdbc_url.port.to_string()); properties_with_secret.insert("database".to_owned(), jdbc_url.db_name); - properties_with_secret.insert("user".to_owned(), jdbc_url.username); - properties_with_secret.insert("password".to_owned(), jdbc_url.password); + if let Some(username) = jdbc_url.username { + properties_with_secret.insert("user".to_owned(), username); + } + if let Some(password) = jdbc_url.password { + properties_with_secret.insert("password".to_owned(), password); + } if let Some(table_name) = properties_with_secret.get("table.name") { properties_with_secret.insert("table".to_owned(), table_name.clone()); } @@ -308,8 +312,8 @@ struct JdbcUrl { host: String, port: u16, db_name: String, - username: String, - password: String, + username: Option, + password: Option, } fn parse_jdbc_url(url: &str) -> anyhow::Result { @@ -346,15 +350,13 @@ fn parse_jdbc_url(url: &str) -> anyhow::Result { password = Some(value.to_string()); } } - let username = username.ok_or_else(|| anyhow!("missing username in jdbc url"))?; - let password = password.ok_or_else(|| anyhow!("missing password in jdbc url"))?; Ok(JdbcUrl { host: host.to_owned(), port, db_name: db_name.to_owned(), - username: username.to_owned(), - password: password.to_owned(), + username, + password, }) } From 05a6ad839a2860515ed16ed222d3b238e03babea Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Thu, 24 Apr 2025 17:39:29 +0800 Subject: [PATCH 15/21] fix test --- src/stream/src/from_proto/sink.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/stream/src/from_proto/sink.rs b/src/stream/src/from_proto/sink.rs index f94b2ea704977..90f4dd7b7877d 100644 --- a/src/stream/src/from_proto/sink.rs +++ b/src/stream/src/from_proto/sink.rs @@ -371,7 +371,7 @@ mod tests { assert_eq!(jdbc_url.host, "localhost"); assert_eq!(jdbc_url.port, 5432); assert_eq!(jdbc_url.db_name, "test"); - assert_eq!(jdbc_url.username, "postgres"); - assert_eq!(jdbc_url.password, "postgres"); + assert_eq!(jdbc_url.username, Some("postgres")); + assert_eq!(jdbc_url.password, Some("postgres")); } } From 59e4b8e784e2726dc8f22ef61f22430bf6bc20ca Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Thu, 24 Apr 2025 19:58:22 +0800 Subject: [PATCH 16/21] fix type --- src/risedevtool/src/task/postgres_service.rs | 2 +- src/stream/src/from_proto/sink.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/risedevtool/src/task/postgres_service.rs b/src/risedevtool/src/task/postgres_service.rs index abaf0574de5bf..5bfdb86052078 100644 --- a/src/risedevtool/src/task/postgres_service.rs +++ b/src/risedevtool/src/task/postgres_service.rs @@ -30,7 +30,7 @@ impl DockerServiceConfig for PostgresConfig { fn args(&self) -> Vec { // Enable CDC. - ["-c", "wal_level=logical", "-c", "max_replication_slots=30"] + ["-c", "wal_level=logical", "-c", "max_replication_slots=30", "-c", "log_statement=all"] .map(String::from) .to_vec() } diff --git a/src/stream/src/from_proto/sink.rs b/src/stream/src/from_proto/sink.rs index 90f4dd7b7877d..7f5bec50b395d 100644 --- a/src/stream/src/from_proto/sink.rs +++ b/src/stream/src/from_proto/sink.rs @@ -371,7 +371,7 @@ mod tests { assert_eq!(jdbc_url.host, "localhost"); assert_eq!(jdbc_url.port, 5432); assert_eq!(jdbc_url.db_name, "test"); - assert_eq!(jdbc_url.username, Some("postgres")); - assert_eq!(jdbc_url.password, Some("postgres")); + assert_eq!(jdbc_url.username, Some("postgres".to_owned())); + assert_eq!(jdbc_url.password, Some("postgres".to_owned())); } } From a84a983d4473511381ed9e796112a4d43f5af6ed Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Thu, 24 Apr 2025 22:32:42 +0800 Subject: [PATCH 17/21] parameterize tests on jdbc/native --- ci/scripts/e2e-sink-test.sh | 4 ++-- e2e_test/sink/remote/jdbc.check.pg.slt | 8 ++++++++ 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/ci/scripts/e2e-sink-test.sh b/ci/scripts/e2e-sink-test.sh index e915d68d3a4af..b30f17cd7f44f 100755 --- a/ci/scripts/e2e-sink-test.sh +++ b/ci/scripts/e2e-sink-test.sh @@ -66,7 +66,7 @@ echo "--- test sink: jdbc:postgres switch to postgres native" # check sink destination postgres risedev slt './e2e_test/sink/remote/jdbc.load.slt' sleep 1 -sqllogictest -h db -p 5432 -d test './e2e_test/sink/remote/jdbc.check.pg.slt' +sqllogictest -h db -p 5432 -d test './e2e_test/sink/remote/jdbc.check.pg.slt' --label 'pg-native' sleep 1 echo "--- killing risingwave cluster: ci-1cn-1fe-switch-to-pg-native" @@ -105,7 +105,7 @@ echo "--- testing remote sinks" # check sink destination postgres risedev slt './e2e_test/sink/remote/jdbc.load.slt' sleep 1 -sqllogictest -h db -p 5432 -d test './e2e_test/sink/remote/jdbc.check.pg.slt' +sqllogictest -h db -p 5432 -d test './e2e_test/sink/remote/jdbc.check.pg.slt' --label 'jdbc' sleep 1 # check sink destination mysql using shell diff --git a/e2e_test/sink/remote/jdbc.check.pg.slt b/e2e_test/sink/remote/jdbc.check.pg.slt index 1ec8c827d939b..0b9fd412af885 100644 --- a/e2e_test/sink/remote/jdbc.check.pg.slt +++ b/e2e_test/sink/remote/jdbc.check.pg.slt @@ -18,7 +18,15 @@ select * from t_remote_1 order by id; 5 Varchar value 5 Text value 5 567 890 123 56.78 90.12 34.56 t 2023-05-26 12:34:56 2023-05-26 12:34:56 2023-05-26 12:34:56+00 2 years 3 mons 4 days 05:06:07 {"key": "value5"} \xdeadbabe 6 Varchar value 6 Text value 6 789 123 456 67.89 34.56 78.91 f 2023-05-27 23:45:01 2023-05-27 23:45:01 2023-05-27 23:45:01+00 2 years 3 mons 4 days 05:06:07 {"key": "value6"} \xdeadbabe +skipif pg-native +query III +select * from biz.t_types order by id; +---- +1 Varcharvalue1 Textvalue1 123 456 789 12.34 56.78 90.12 t 2023-05-22 12:34:56 2023-05-22 12:34:56 1 day {"key": "value"} {"Value 1","Value 2"} {12.345,56.789} {1,2,3} {1,2,3} {1,2,3} {12.3,56.7} +2 Varcharvalue2 Textvalue2 234 567 890 NaN 67.89 1.23 f 2023-05-23 23:45:01 2023-05-23 23:45:01 2 days {"key": "value2"} {"Value 3","Value 4"} {43.21,65.432} {4,5,6} {4,5,6} {4,5,6} {43.2,65.4} +3 Varcharvalue1 Textvalue1 123 456 789 Infinity 56.78 90.12 t 2023-05-22 12:34:56 2023-05-22 12:34:56 1 day {"key": "value"} {"How're you?","\"hello\\ \\world\""} {12.345,56.789} {1,2,3} {1,2,3} {1,2,3} {43.2,65.4} +skipif jdbc query III select * from biz.t_types order by id; ---- From 02cc4f443af89ff65ff3cc0230abc17bb62925de Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Thu, 24 Apr 2025 22:32:49 +0800 Subject: [PATCH 18/21] dump pg logs --- src/risedevtool/src/task/postgres_service.rs | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/src/risedevtool/src/task/postgres_service.rs b/src/risedevtool/src/task/postgres_service.rs index 5bfdb86052078..fe520d486b2f8 100644 --- a/src/risedevtool/src/task/postgres_service.rs +++ b/src/risedevtool/src/task/postgres_service.rs @@ -30,9 +30,16 @@ impl DockerServiceConfig for PostgresConfig { fn args(&self) -> Vec { // Enable CDC. - ["-c", "wal_level=logical", "-c", "max_replication_slots=30", "-c", "log_statement=all"] - .map(String::from) - .to_vec() + [ + "-c", + "wal_level=logical", + "-c", + "max_replication_slots=30", + "-c", + "log_statement=all", + ] + .map(String::from) + .to_vec() } fn envs(&self) -> Vec<(String, String)> { From a3843b92d49c3ed67dedbfcea42ab7ae8f321d0e Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Thu, 24 Apr 2025 23:22:55 +0800 Subject: [PATCH 19/21] do not batch on conflict do update --- src/connector/src/sink/postgres.rs | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/src/connector/src/sink/postgres.rs b/src/connector/src/sink/postgres.rs index 7bb3a011c405f..9788a0acdfdbe 100644 --- a/src/connector/src/sink/postgres.rs +++ b/src/connector/src/sink/postgres.rs @@ -268,6 +268,8 @@ struct ParameterBuffer<'a> { estimated_parameter_size: usize, /// current parameter buffer to be filled. current_parameter_buffer: Vec>, + /// Parameter upper bound + parameter_upper_bound: usize, } impl<'a> ParameterBuffer<'a> { @@ -277,20 +279,21 @@ impl<'a> ParameterBuffer<'a> { const MAX_PARAMETERS: usize = 32768; /// `flattened_chunk_size` is the number of datums in a single chunk. - fn new(schema_types: &'a [PgType], flattened_chunk_size: usize) -> Self { - let estimated_parameter_size = usize::min(Self::MAX_PARAMETERS, flattened_chunk_size); + fn new(schema_types: &'a [PgType], parameter_upper_bound: usize) -> Self { + let estimated_parameter_size = usize::min(Self::MAX_PARAMETERS, parameter_upper_bound); Self { parameters: vec![], column_length: schema_types.len(), schema_types, estimated_parameter_size, current_parameter_buffer: Vec::with_capacity(estimated_parameter_size), + parameter_upper_bound, } } fn add_row(&mut self, row: impl Row) { assert_eq!(row.len(), self.column_length); - if self.current_parameter_buffer.len() + self.column_length >= Self::MAX_PARAMETERS { + if self.current_parameter_buffer.len() + self.column_length >= self.parameter_upper_bound { self.new_buffer(); } for (i, datum_ref) in row.iter().enumerate() { @@ -412,7 +415,6 @@ impl PostgresSinkWriter { } async fn write_batch_append_only(&mut self, chunk: StreamChunk) -> Result<()> { - let mut transaction = self.client.transaction().await?; // 1d flattened array of parameters to be inserted. let mut parameter_buffer = ParameterBuffer::new( &self.schema_types, @@ -431,6 +433,8 @@ impl PostgresSinkWriter { } } let (parameters, remaining) = parameter_buffer.into_parts(); + + let mut transaction = self.client.transaction().await?; Self::execute_parameter( Op::Insert, &mut transaction, @@ -450,11 +454,20 @@ impl PostgresSinkWriter { } async fn write_batch_non_append_only(&mut self, chunk: StreamChunk) -> Result<()> { - let mut transaction = self.client.transaction().await?; // 1d flattened array of parameters to be inserted. let mut insert_parameter_buffer = ParameterBuffer::new( &self.schema_types, - chunk.cardinality() * chunk.data_types().len(), + // NOTE(kwannoel): + // insert on conflict do update may have multiple + // rows on the same PK. + // In that case they could encounter the following PG error: + // ERROR: ON CONFLICT DO UPDATE command cannot affect row a second time + // HINT: Ensure that no rows proposed for insertion within the same command have duplicate constrained values + // Given that JDBC sink does not batch their insert on conflict do update, + // we can keep the behaviour consistent. + // + // We may opt for an optimization flag to toggle this behaviour in the future. + chunk.data_types().len(), ); let mut delete_parameter_buffer = ParameterBuffer::new(&self.pk_types, chunk.cardinality() * self.pk_indices.len()); @@ -471,6 +484,7 @@ impl PostgresSinkWriter { } let (delete_parameters, delete_remaining_parameter) = delete_parameter_buffer.into_parts(); + let mut transaction = self.client.transaction().await?; Self::execute_parameter( Op::Delete, &mut transaction, From dd502604d602ccaa3f2249c50916f4123d0b6b29 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Thu, 24 Apr 2025 23:46:44 +0800 Subject: [PATCH 20/21] use onlyif --- e2e_test/sink/remote/jdbc.check.pg.slt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/e2e_test/sink/remote/jdbc.check.pg.slt b/e2e_test/sink/remote/jdbc.check.pg.slt index 0b9fd412af885..46c76dbf6289c 100644 --- a/e2e_test/sink/remote/jdbc.check.pg.slt +++ b/e2e_test/sink/remote/jdbc.check.pg.slt @@ -18,7 +18,7 @@ select * from t_remote_1 order by id; 5 Varchar value 5 Text value 5 567 890 123 56.78 90.12 34.56 t 2023-05-26 12:34:56 2023-05-26 12:34:56 2023-05-26 12:34:56+00 2 years 3 mons 4 days 05:06:07 {"key": "value5"} \xdeadbabe 6 Varchar value 6 Text value 6 789 123 456 67.89 34.56 78.91 f 2023-05-27 23:45:01 2023-05-27 23:45:01 2023-05-27 23:45:01+00 2 years 3 mons 4 days 05:06:07 {"key": "value6"} \xdeadbabe -skipif pg-native +onlyif pg-native query III select * from biz.t_types order by id; ---- @@ -26,7 +26,7 @@ select * from biz.t_types order by id; 2 Varcharvalue2 Textvalue2 234 567 890 NaN 67.89 1.23 f 2023-05-23 23:45:01 2023-05-23 23:45:01 2 days {"key": "value2"} {"Value 3","Value 4"} {43.21,65.432} {4,5,6} {4,5,6} {4,5,6} {43.2,65.4} 3 Varcharvalue1 Textvalue1 123 456 789 Infinity 56.78 90.12 t 2023-05-22 12:34:56 2023-05-22 12:34:56 1 day {"key": "value"} {"How're you?","\"hello\\ \\world\""} {12.345,56.789} {1,2,3} {1,2,3} {1,2,3} {43.2,65.4} -skipif jdbc +onlyif jdbc query III select * from biz.t_types order by id; ---- From 8b503476ac016a4fa812eb467cfc61af4320abf0 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Fri, 25 Apr 2025 01:56:04 +0800 Subject: [PATCH 21/21] only provision new parameter buffer when overflow --- src/connector/src/sink/postgres.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/connector/src/sink/postgres.rs b/src/connector/src/sink/postgres.rs index 9788a0acdfdbe..e9c8ff9b85ee7 100644 --- a/src/connector/src/sink/postgres.rs +++ b/src/connector/src/sink/postgres.rs @@ -293,7 +293,7 @@ impl<'a> ParameterBuffer<'a> { fn add_row(&mut self, row: impl Row) { assert_eq!(row.len(), self.column_length); - if self.current_parameter_buffer.len() + self.column_length >= self.parameter_upper_bound { + if self.current_parameter_buffer.len() + self.column_length > self.parameter_upper_bound { self.new_buffer(); } for (i, datum_ref) in row.iter().enumerate() {