Skip to content

Commit 24e432f

Browse files
committed
handle schema name, quote table,schema,column names
1 parent e1ce236 commit 24e432f

File tree

1 file changed

+63
-40
lines changed

1 file changed

+63
-40
lines changed

src/connector/src/sink/postgres.rs

Lines changed: 63 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -420,6 +420,7 @@ impl PostgresSinkWriter {
420420
Op::Insert,
421421
&mut transaction,
422422
&self.schema,
423+
&self.config.schema,
423424
&self.config.table,
424425
&self.pk_indices,
425426
parameters,
@@ -460,6 +461,7 @@ impl PostgresSinkWriter {
460461
Op::Delete,
461462
&mut transaction,
462463
&self.schema,
464+
&self.config.schema,
463465
&self.config.table,
464466
&self.pk_indices,
465467
delete_parameters,
@@ -472,6 +474,7 @@ impl PostgresSinkWriter {
472474
Op::Insert,
473475
&mut transaction,
474476
&self.schema,
477+
&self.config.schema,
475478
&self.config.table,
476479
&self.pk_indices,
477480
insert_parameters,
@@ -488,6 +491,7 @@ impl PostgresSinkWriter {
488491
op: Op,
489492
transaction: &mut tokio_postgres::Transaction<'_>,
490493
schema: &Schema,
494+
schema_name: &str,
491495
table_name: &str,
492496
pk_indices: &[usize],
493497
parameters: Vec<Vec<Option<ScalarAdapter>>>,
@@ -512,12 +516,14 @@ impl PostgresSinkWriter {
512516
let statement_str = match op {
513517
Op::Insert => {
514518
if append_only {
515-
create_insert_sql(schema, table_name, rows_length)
519+
create_insert_sql(schema, schema_name, table_name, rows_length)
516520
} else {
517-
create_upsert_sql(schema, table_name, pk_indices, rows_length)
521+
create_upsert_sql(schema, schema_name, table_name, pk_indices, rows_length)
518522
}
519523
}
520-
Op::Delete => create_delete_sql(schema, table_name, pk_indices, rows_length),
524+
Op::Delete => {
525+
create_delete_sql(schema, schema_name, table_name, pk_indices, rows_length)
526+
}
521527
_ => unreachable!(),
522528
};
523529
let statement = transaction.prepare(&statement_str).await?;
@@ -538,12 +544,14 @@ impl PostgresSinkWriter {
538544
let statement_str = match op {
539545
Op::Insert => {
540546
if append_only {
541-
create_insert_sql(schema, table_name, rows_length)
547+
create_insert_sql(schema, schema_name, table_name, rows_length)
542548
} else {
543-
create_upsert_sql(schema, table_name, pk_indices, rows_length)
549+
create_upsert_sql(schema, schema_name, table_name, pk_indices, rows_length)
544550
}
545551
}
546-
Op::Delete => create_delete_sql(schema, table_name, pk_indices, rows_length),
552+
Op::Delete => {
553+
create_delete_sql(schema, schema_name, table_name, pk_indices, rows_length)
554+
}
547555
_ => unreachable!(),
548556
};
549557
tracing::trace!("binding statement: {:?}", statement_str);
@@ -577,7 +585,17 @@ impl LogSinker for PostgresSinkWriter {
577585
}
578586
}
579587

580-
fn create_insert_sql(schema: &Schema, table_name: &str, number_of_rows: usize) -> String {
588+
fn create_insert_sql(
589+
schema: &Schema,
590+
schema_name: &str,
591+
table_name: &str,
592+
number_of_rows: usize,
593+
) -> String {
594+
let normalized_table_name = format!(
595+
"{}.{}",
596+
quote_identifier(schema_name),
597+
quote_identifier(table_name)
598+
);
581599
let number_of_columns = schema.len();
582600
let columns: String = schema
583601
.fields()
@@ -593,20 +611,26 @@ fn create_insert_sql(schema: &Schema, table_name: &str, number_of_rows: usize) -
593611
})
594612
.collect_vec()
595613
.join(", ");
596-
format!("INSERT INTO {table_name} ({columns}) VALUES {parameters}")
614+
format!("INSERT INTO {normalized_table_name} ({columns}) VALUES {parameters}")
597615
}
598616

599617
fn create_delete_sql(
600618
schema: &Schema,
619+
schema_name: &str,
601620
table_name: &str,
602621
pk_indices: &[usize],
603622
number_of_rows: usize,
604623
) -> String {
624+
let normalized_table_name = format!(
625+
"{}.{}",
626+
quote_identifier(schema_name),
627+
quote_identifier(table_name)
628+
);
605629
let number_of_pk = pk_indices.len();
606630
let pk = {
607631
let pk_symbols = pk_indices
608632
.iter()
609-
.map(|pk_index| &schema.fields()[*pk_index].name)
633+
.map(|pk_index| quote_identifier(&schema.fields()[*pk_index].name))
610634
.join(", ");
611635
format!("({})", pk_symbols)
612636
};
@@ -619,47 +643,37 @@ fn create_delete_sql(
619643
})
620644
.collect_vec()
621645
.join(", ");
622-
format!("DELETE FROM {table_name} WHERE {pk} in ({parameters})")
646+
format!("DELETE FROM {normalized_table_name} WHERE {pk} in ({parameters})")
623647
}
624648

625649
fn create_upsert_sql(
626650
schema: &Schema,
651+
schema_name: &str,
627652
table_name: &str,
628653
pk_indices: &[usize],
629654
number_of_rows: usize,
630655
) -> String {
631656
let number_of_columns = schema.len();
632-
let columns: String = schema
633-
.fields()
634-
.iter()
635-
.map(|field| field.name.clone())
636-
.collect_vec()
637-
.join(", ");
638-
let parameters: String = (0..number_of_rows)
639-
.map(|i| {
640-
let row_parameters = (0..number_of_columns)
641-
.map(|j| format!("${}", i * number_of_columns + j + 1))
642-
.join(", ");
643-
format!("({row_parameters})")
644-
})
645-
.collect_vec()
646-
.join(", ");
657+
let insert_sql = create_insert_sql(schema, schema_name, table_name, number_of_rows);
647658
let pk_columns = pk_indices
648659
.iter()
649-
.map(|i| schema.fields()[*i].name.clone())
660+
.map(|pk_index| quote_identifier(&schema.fields()[*pk_index].name))
650661
.collect_vec()
651662
.join(", ");
652663
let update_parameters: String = (0..number_of_columns)
653664
.filter(|i| !pk_indices.contains(i))
654665
.map(|i| {
655-
let column = schema.fields()[i].name.clone();
666+
let column = quote_identifier(&schema.fields()[i].name);
656667
format!("{column} = EXCLUDED.{column}")
657668
})
658669
.collect_vec()
659670
.join(", ");
660-
format!(
661-
"INSERT INTO {table_name} ({columns}) VALUES {parameters} on conflict ({pk_columns}) do update set {update_parameters}"
662-
)
671+
format!("{insert_sql} on conflict ({pk_columns}) do update set {update_parameters}")
672+
}
673+
674+
/// Quote an identifier for PostgreSQL.
675+
fn quote_identifier(identifier: &str) -> String {
676+
format!("\"{}\"", identifier.replace("\"", "\"\""))
663677
}
664678

665679
#[cfg(test)]
@@ -689,11 +703,14 @@ mod tests {
689703
name: "b".to_owned(),
690704
},
691705
]);
706+
let schema_name = "test_schema";
692707
let table_name = "test_table";
693-
let sql = create_insert_sql(&schema, table_name, 3);
708+
let sql = create_insert_sql(&schema, schema_name, table_name, 3);
694709
check(
695710
sql,
696-
expect!["INSERT INTO test_table (a, b) VALUES ($1, $2), ($3, $4), ($5, $6)"],
711+
expect![[
712+
r#"INSERT INTO "test_schema"."test_table" (a, b) VALUES ($1, $2), ($3, $4), ($5, $6)"#
713+
]],
697714
);
698715
}
699716

@@ -709,17 +726,22 @@ mod tests {
709726
name: "b".to_owned(),
710727
},
711728
]);
729+
let schema_name = "test_schema";
712730
let table_name = "test_table";
713-
let sql = create_delete_sql(&schema, table_name, &[1], 3);
731+
let sql = create_delete_sql(&schema, schema_name, table_name, &[1], 3);
714732
check(
715733
sql,
716-
expect!["DELETE FROM test_table WHERE (b) in (($1), ($2), ($3))"],
734+
expect![[
735+
r#"DELETE FROM "test_schema"."test_table" WHERE ("b") in (($1), ($2), ($3))"#
736+
]],
717737
);
718738
let table_name = "test_table";
719-
let sql = create_delete_sql(&schema, table_name, &[0, 1], 3);
739+
let sql = create_delete_sql(&schema, schema_name, table_name, &[0, 1], 3);
720740
check(
721741
sql,
722-
expect!["DELETE FROM test_table WHERE (a, b) in (($1, $2), ($3, $4), ($5, $6))"],
742+
expect![[
743+
r#"DELETE FROM "test_schema"."test_table" WHERE ("a", "b") in (($1, $2), ($3, $4), ($5, $6))"#
744+
]],
723745
);
724746
}
725747

@@ -735,13 +757,14 @@ mod tests {
735757
name: "b".to_owned(),
736758
},
737759
]);
760+
let schema_name = "test_schema";
738761
let table_name = "test_table";
739-
let sql = create_upsert_sql(&schema, table_name, &[1], 3);
762+
let sql = create_upsert_sql(&schema, schema_name, table_name, &[1], 3);
740763
check(
741764
sql,
742-
expect![
743-
"INSERT INTO test_table (a, b) VALUES ($1, $2), ($3, $4), ($5, $6) on conflict (b) do update set a = EXCLUDED.a"
744-
],
765+
expect![[
766+
r#"INSERT INTO "test_schema"."test_table" (a, b) VALUES ($1, $2), ($3, $4), ($5, $6) on conflict ("b") do update set "a" = EXCLUDED."a""#
767+
]],
745768
);
746769
}
747770
}

0 commit comments

Comments
 (0)