Skip to content

Commit bb0d4dc

Browse files
committed
[python] Deduplicate input to keep last one in TableUpsertByKey
1 parent 7a76c43 commit bb0d4dc

File tree

3 files changed

+96
-15
lines changed

3 files changed

+96
-15
lines changed

docs/content/pypaimon/data-evolution.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ table_commit.close()
231231
**Notes**
232232

233233
- Execution is driven **partition-by-partition**: only one partition's key set is loaded into memory at a time.
234-
- Duplicate keys in the input data will raise an error.
234+
- Duplicate keys in the input data are automatically deduplicated — the **last occurrence** is kept.
235235
- The upsert is atomic per commit — all matched updates and new appends are included in the same commit.
236236

237237
## Update Columns By Shards

paimon-python/pypaimon/tests/table_upsert_by_key_test.py

Lines changed: 82 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -493,21 +493,93 @@ def test_empty_data_raises(self):
493493
tu.upsert_by_arrow_with_key(data, upsert_keys=['id'])
494494
self.assertIn('empty', str(ctx.exception))
495495

496-
def test_duplicate_keys_in_input_raises(self):
497-
"""Duplicate composite keys in input data should raise ValueError."""
496+
def test_duplicate_keys_in_input_keeps_last(self):
497+
"""Duplicate keys in input data should keep the last occurrence."""
498498
table = self._create_table()
499-
data = pa.Table.from_pydict({
500-
'id': [1, 1],
501-
'name': ['A', 'B'],
499+
self._write(table, pa.Table.from_pydict({
500+
'id': [1, 2],
501+
'name': ['Alice', 'Bob'],
502502
'age': [25, 30],
503503
'city': ['NYC', 'LA'],
504+
}, schema=self.pa_schema))
505+
506+
# id=1 appears twice; the second row (name='A_last') should win
507+
data = pa.Table.from_pydict({
508+
'id': [1, 1],
509+
'name': ['A_first', 'A_last'],
510+
'age': [90, 91],
511+
'city': ['X', 'Y'],
504512
}, schema=self.pa_schema)
513+
self._upsert(table, data, upsert_keys=['id'])
505514

506-
with self.assertRaises(ValueError) as ctx:
507-
wb = table.new_batch_write_builder()
508-
tu = wb.new_update()
509-
tu.upsert_by_arrow_with_key(data, upsert_keys=['id'])
510-
self.assertIn('duplicate', str(ctx.exception).lower())
515+
result = self._read_all(table)
516+
rows = {r: (n, a, c) for r, n, a, c in zip(
517+
result['id'].to_pylist(),
518+
result['name'].to_pylist(),
519+
result['age'].to_pylist(),
520+
result['city'].to_pylist(),
521+
)}
522+
# id=1 updated with last duplicate row
523+
self.assertEqual(rows[1], ('A_last', 91, 'Y'))
524+
# id=2 unchanged
525+
self.assertEqual(rows[2], ('Bob', 30, 'LA'))
526+
527+
def test_duplicate_keys_all_new_keeps_last(self):
528+
"""Duplicate keys in input on an empty table keep the last occurrence."""
529+
table = self._create_table()
530+
531+
# id=1 appears three times; last row should win
532+
data = pa.Table.from_pydict({
533+
'id': [1, 1, 1, 2],
534+
'name': ['A1', 'A2', 'A3', 'B'],
535+
'age': [10, 20, 30, 40],
536+
'city': ['X1', 'X2', 'X3', 'Y'],
537+
}, schema=self.pa_schema)
538+
self._upsert(table, data, upsert_keys=['id'])
539+
540+
result = self._read_all(table)
541+
self.assertEqual(result.num_rows, 2)
542+
rows = {r: (n, a, c) for r, n, a, c in zip(
543+
result['id'].to_pylist(),
544+
result['name'].to_pylist(),
545+
result['age'].to_pylist(),
546+
result['city'].to_pylist(),
547+
)}
548+
self.assertEqual(rows[1], ('A3', 30, 'X3'))
549+
self.assertEqual(rows[2], ('B', 40, 'Y'))
550+
551+
def test_duplicate_keys_partitioned_keeps_last(self):
552+
"""Duplicate keys in a partitioned table keep the last per partition."""
553+
table = self._create_table(
554+
pa_schema=self.partitioned_pa_schema,
555+
partition_keys=['region'],
556+
)
557+
self._write(table, pa.Table.from_pydict({
558+
'id': [1, 2],
559+
'name': ['Alice', 'Bob'],
560+
'age': [25, 30],
561+
'region': ['US', 'EU'],
562+
}, schema=self.partitioned_pa_schema))
563+
564+
# id=1 duplicated in US partition; id=2 duplicated in EU partition
565+
data = pa.Table.from_pydict({
566+
'id': [1, 1, 2, 2],
567+
'name': ['A_first', 'A_last', 'B_first', 'B_last'],
568+
'age': [50, 51, 60, 61],
569+
'region': ['US', 'US', 'EU', 'EU'],
570+
}, schema=self.partitioned_pa_schema)
571+
self._upsert(table, data, upsert_keys=['id'])
572+
573+
result = self._read_all(table)
574+
self.assertEqual(result.num_rows, 2)
575+
rows = {(r, reg): (n, a) for r, n, a, reg in zip(
576+
result['id'].to_pylist(),
577+
result['name'].to_pylist(),
578+
result['age'].to_pylist(),
579+
result['region'].to_pylist(),
580+
)}
581+
self.assertEqual(rows[(1, 'US')], ('A_last', 51))
582+
self.assertEqual(rows[(2, 'EU')], ('B_last', 61))
511583

512584
def test_partitioned_table_missing_partition_col_in_data_raises(self):
513585
"""Input data missing partition column should raise ValueError."""

paimon-python/pypaimon/write/table_upsert_by_key.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -164,12 +164,21 @@ def _upsert_partition(
164164
for i in range(partition_data.num_rows)
165165
]
166166

167-
# 2. Per-partition duplicate check
167+
# 2. Deduplicate: keep last occurrence of each key
168+
key_to_last_idx: Dict[_KeyTuple, int] = {}
169+
for i, key_tuple in enumerate(input_key_tuples):
170+
key_to_last_idx[key_tuple] = i # last write wins
171+
168172
input_key_set = set(input_key_tuples)
169173
if len(input_key_tuples) != len(input_key_set):
170-
raise ValueError(
171-
f"Input data contains duplicate values in upsert_keys columns "
172-
f"{match_keys} within partition {partition_spec}."
174+
original_count = len(input_key_tuples)
175+
dedup_indices = sorted(key_to_last_idx.values())
176+
partition_data = partition_data.take(dedup_indices)
177+
input_key_tuples = [input_key_tuples[i] for i in dedup_indices]
178+
logger.info(
179+
"Deduplicated input from %d to %d rows in partition %s "
180+
"(kept last occurrence).",
181+
original_count, len(input_key_tuples), partition_spec,
173182
)
174183

175184
# 3. Scan partition in batches, build key → _ROW_ID only for

0 commit comments

Comments (0)