Commit 9b3c23b

do not batch on conflict do update
1 parent 5ae82f7 commit 9b3c23b

File tree

1 file changed: +20 -6 lines changed


src/connector/src/sink/postgres.rs

Lines changed: 20 additions & 6 deletions
@@ -268,6 +268,8 @@ struct ParameterBuffer<'a> {
     estimated_parameter_size: usize,
     /// current parameter buffer to be filled.
     current_parameter_buffer: Vec<Option<ScalarAdapter>>,
+    /// Parameter upper bound
+    parameter_upper_bound: usize,
 }

 impl<'a> ParameterBuffer<'a> {
@@ -277,19 +279,20 @@ impl<'a> ParameterBuffer<'a> {
     const MAX_PARAMETERS: usize = 32768;

     /// `flattened_chunk_size` is the number of datums in a single chunk.
-    fn new(schema_types: &'a [PgType], flattened_chunk_size: usize) -> Self {
-        let estimated_parameter_size = usize::min(Self::MAX_PARAMETERS, flattened_chunk_size);
+    fn new(schema_types: &'a [PgType], parameter_upper_bound: usize) -> Self {
+        let estimated_parameter_size = usize::min(Self::MAX_PARAMETERS, parameter_upper_bound);
         Self {
             parameters: vec![],
             column_length: schema_types.len(),
             schema_types,
             estimated_parameter_size,
             current_parameter_buffer: Vec::with_capacity(estimated_parameter_size),
+            parameter_upper_bound,
         }
     }

     fn add_row(&mut self, row: impl Row) {
-        if self.current_parameter_buffer.len() + self.column_length >= Self::MAX_PARAMETERS {
+        if self.current_parameter_buffer.len() + self.column_length >= self.parameter_upper_bound {
             self.new_buffer();
         }
         for (i, datum_ref) in row.iter().enumerate() {
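
As an aside, here is a standalone toy sketch of what the new `parameter_upper_bound` does in `add_row` (the `ToyBuffer` type, its fields, and its methods are hypothetical stand-ins, not the real `ParameterBuffer`, whose remaining methods sit outside this diff): when the bound equals a single row's datum count, every row is flushed into its own batch.

```rust
// Hypothetical stand-in for ParameterBuffer: a per-buffer upper bound splits
// incoming rows into separate parameter batches.
struct ToyBuffer {
    column_count: usize,
    upper_bound: usize,
    batches: Vec<Vec<i32>>,
    current: Vec<i32>,
}

impl ToyBuffer {
    fn new(column_count: usize, upper_bound: usize) -> Self {
        Self { column_count, upper_bound, batches: vec![], current: vec![] }
    }

    /// Mirrors the patched `add_row` check: start a new buffer when adding the
    /// next row would reach or exceed the bound.
    fn add_row(&mut self, row: &[i32]) {
        assert_eq!(row.len(), self.column_count);
        if self.current.len() + self.column_count >= self.upper_bound {
            self.new_buffer();
        }
        self.current.extend_from_slice(row);
    }

    fn new_buffer(&mut self) {
        if !self.current.is_empty() {
            self.batches.push(std::mem::take(&mut self.current));
        }
    }
}

fn main() {
    // Two columns per row, and the bound set to one row's worth of datums,
    // mirroring `ParameterBuffer::new(&self.schema_types, chunk.data_types().len())`
    // in the non-append-only path below.
    let mut buf = ToyBuffer::new(2, 2);
    for row in [[1, 10], [2, 20], [3, 30]] {
        buf.add_row(&row);
    }
    buf.new_buffer();
    // Every row lands in its own batch, i.e. one statement per row.
    assert_eq!(buf.batches.len(), 3);
}
```
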
@@ -401,7 +404,6 @@ impl PostgresSinkWriter {
     }

     async fn write_batch_append_only(&mut self, chunk: StreamChunk) -> Result<()> {
-        let mut transaction = self.client.transaction().await?;
         // 1d flattened array of parameters to be inserted.
         let mut parameter_buffer = ParameterBuffer::new(
             &self.schema_types,
@@ -420,6 +422,8 @@ impl PostgresSinkWriter {
             }
         }
         let (parameters, remaining) = parameter_buffer.into_parts();
+
+        let mut transaction = self.client.transaction().await?;
         Self::execute_parameter(
             Op::Insert,
             &mut transaction,
@@ -438,11 +442,20 @@ impl PostgresSinkWriter {
     }

     async fn write_batch_non_append_only(&mut self, chunk: StreamChunk) -> Result<()> {
-        let mut transaction = self.client.transaction().await?;
         // 1d flattened array of parameters to be inserted.
         let mut insert_parameter_buffer = ParameterBuffer::new(
             &self.schema_types,
-            chunk.cardinality() * chunk.data_types().len(),
+            // NOTE(kwannoel):
+            // insert on conflict do update may have multiple
+            // rows on the same PK.
+            // In that case they could encounter the following PG error:
+            // ERROR: ON CONFLICT DO UPDATE command cannot affect row a second time
+            // HINT: Ensure that no rows proposed for insertion within the same command have duplicate constrained values
+            // Given that JDBC sink does not batch their insert on conflict do update,
+            // we can keep the behaviour consistent.
+            //
+            // We may opt for an optimization flag to toggle this behaviour in the future.
+            chunk.data_types().len(),
         );
         let mut delete_parameter_buffer = ParameterBuffer::new(
             &self.schema_types,
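
The NOTE above describes a PostgreSQL-level restriction rather than anything sink-specific. A minimal repro sketch with tokio_postgres (the connection string, table `t`, and its columns are made up for illustration; this is not the sink's code) shows the batched multi-row upsert failing and the per-row variant, which this commit switches to, succeeding:

```rust
use tokio_postgres::NoTls;

#[tokio::main]
async fn main() -> Result<(), tokio_postgres::Error> {
    // Assumed local Postgres instance; adjust the connection string as needed.
    let (client, connection) =
        tokio_postgres::connect("host=localhost user=postgres", NoTls).await?;
    tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!("connection error: {e}");
        }
    });

    client
        .batch_execute("CREATE TABLE IF NOT EXISTS t (id INT PRIMARY KEY, v INT)")
        .await?;

    // Two rows sharing a PK in one upsert statement fail with:
    // ERROR: ON CONFLICT DO UPDATE command cannot affect row a second time
    let err = client
        .execute(
            "INSERT INTO t (id, v) VALUES ($1, $2), ($3, $4) \
             ON CONFLICT (id) DO UPDATE SET v = EXCLUDED.v",
            &[&1i32, &10i32, &1i32, &20i32],
        )
        .await
        .unwrap_err();
    println!("batched upsert failed as expected: {err}");

    // One statement per row succeeds, at the cost of losing batching.
    for (id, v) in [(1i32, 10i32), (1, 20)] {
        client
            .execute(
                "INSERT INTO t (id, v) VALUES ($1, $2) \
                 ON CONFLICT (id) DO UPDATE SET v = EXCLUDED.v",
                &[&id, &v],
            )
            .await?;
    }
    Ok(())
}
```
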
@@ -461,6 +474,7 @@
         }

         let (delete_parameters, delete_remaining_parameter) = delete_parameter_buffer.into_parts();
+        let mut transaction = self.client.transaction().await?;
         Self::execute_parameter(
             Op::Delete,
             &mut transaction,
