Commit eb2ffda

[python] Introduce TableUpsertByKey for data-evolution table

1 parent 040d4d6

File tree

4 files changed: +1240 −0 lines changed


docs/content/pypaimon/data-evolution.md

Lines changed: 128 additions & 0 deletions

@@ -106,6 +106,134 @@

```python
rb = table.new_read_builder().with_filter(pb.equal('_ROW_ID', 0))
result = rb.new_read().to_arrow(rb.new_scan().plan().splits())
```

## Upsert By Key

If you want to **upsert** (update-or-insert) rows by one or more business key columns, without manually providing `_ROW_ID`, use `upsert_by_arrow_with_key`. For each input row:

- **Key matches** an existing row → update that row in place.
- **No match** → append as a new row.
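
The update-or-insert decision above can be illustrated with a small, self-contained sketch. This is plain Python with dicts standing in for table rows; `upsert_by_key` is a hypothetical helper for illustration, not the pypaimon API:

```python
def upsert_by_key(existing_rows, new_rows, upsert_keys):
    """Update-or-insert: rows are matched on the values of `upsert_keys`;
    matches are updated in place, everything else is appended."""
    # map each existing row's key tuple to its position
    index = {tuple(row[k] for k in upsert_keys): i
             for i, row in enumerate(existing_rows)}
    result = [dict(row) for row in existing_rows]
    for row in new_rows:
        key = tuple(row[k] for k in upsert_keys)
        if key in index:
            result[index[key]].update(row)   # key matches -> update in place
        else:
            result.append(dict(row))         # no match -> append as new row
    return result

table_rows = [{'id': 1, 'name': 'Alice', 'age': 30},
              {'id': 2, 'name': 'Bob', 'age': 25}]
incoming = [{'id': 1, 'name': 'Alice_v2', 'age': 31},
            {'id': 3, 'name': 'Charlie', 'age': 28}]
merged = upsert_by_key(table_rows, incoming, upsert_keys=['id'])
# merged: id=1 updated, id=2 unchanged, id=3 appended
```
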

**Requirements**

- The table must have `data-evolution.enabled = true` and `row-tracking.enabled = true`.
- All `upsert_keys` must exist in both the table schema and the input data.
- For **partitioned tables**, the input data must contain all partition key columns. Partition keys are **automatically stripped** from `upsert_keys` during matching (since each partition is processed independently), so you do **not** need to include them in `upsert_keys`.
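
A minimal sketch of how these requirements could be checked, assuming a hypothetical `normalize_upsert_keys` helper (not the actual pypaimon validation code):

```python
def normalize_upsert_keys(upsert_keys, table_columns, input_columns,
                          partition_keys=()):
    """Validate upsert keys against schema and input, then strip partition
    keys, which are matched implicitly because each partition is processed
    independently."""
    for key in upsert_keys:
        if key not in table_columns:
            raise ValueError(f"upsert key '{key}' not in table schema")
        if key not in input_columns:
            raise ValueError(f"upsert key '{key}' not in input data")
    missing = [p for p in partition_keys if p not in input_columns]
    if missing:
        raise ValueError(f"input data is missing partition columns: {missing}")
    # partition keys never participate in the in-partition match
    return [k for k in upsert_keys if k not in partition_keys]

cols = ['id', 'name', 'region']
effective = normalize_upsert_keys(['region', 'id'], cols, cols,
                                  partition_keys=['region'])
# effective == ['id']: 'region' was stripped automatically
```
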

**Example: basic upsert**

```python
import pyarrow as pa
from pypaimon import CatalogFactory, Schema

catalog = CatalogFactory.create({'warehouse': '/tmp/warehouse'})
catalog.create_database('default', False)

pa_schema = pa.schema([
    ('id', pa.int32()),
    ('name', pa.string()),
    ('age', pa.int32()),
])
schema = Schema.from_pyarrow_schema(
    pa_schema,
    options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'},
)
catalog.create_table('default.users', schema, False)
table = catalog.get_table('default.users')

# write initial data
write_builder = table.new_batch_write_builder()
write = write_builder.new_write()
commit = write_builder.new_commit()
write.write_arrow(pa.Table.from_pydict(
    {'id': [1, 2], 'name': ['Alice', 'Bob'], 'age': [30, 25]},
    schema=pa_schema,
))
commit.commit(write.prepare_commit())
write.close()
commit.close()

# upsert: update id=1, insert id=3
write_builder = table.new_batch_write_builder()
table_update = write_builder.new_update()
table_commit = write_builder.new_commit()

upsert_data = pa.Table.from_pydict(
    {'id': [1, 3], 'name': ['Alice_v2', 'Charlie'], 'age': [31, 28]},
    schema=pa_schema,
)
cmts = table_update.upsert_by_arrow_with_key(upsert_data, upsert_keys=['id'])
table_commit.commit(cmts)
table_commit.close()

# content should be:
# id=1: name='Alice_v2', age=31 (updated)
# id=2: name='Bob', age=25 (unchanged)
# id=3: name='Charlie', age=28 (new)
```
177+
**Example: partial-column upsert with `update_cols`**
178+
179+
Combine `with_update_type` with `upsert_by_arrow_with_key` to update only specific columns for
180+
matched rows while still appending full rows for new keys:
181+
182+
```python
183+
write_builder = table.new_batch_write_builder()
184+
table_update = write_builder.new_update().with_update_type(['age'])
185+
table_commit = write_builder.new_commit()
186+
187+
upsert_data = pa.Table.from_pydict(
188+
{'id': [1, 4], 'name': ['ignored', 'David'], 'age': [99, 22]},
189+
schema=pa_schema,
190+
)
191+
cmts = table_update.upsert_by_arrow_with_key(upsert_data, upsert_keys=['id'])
192+
table_commit.commit(cmts)
193+
table_commit.close()
194+
195+
# id=1: only 'age' is updated to 99; 'name' remains 'Alice_v2'
196+
# id=4: appended as a full new row
197+
```
198+
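
The partial-column behavior can be sketched in plain Python. `upsert_partial` is a hypothetical helper mirroring the semantics above, not the pypaimon implementation:

```python
def upsert_partial(existing_rows, new_rows, upsert_keys, update_cols):
    """On a key match, copy only `update_cols` from the incoming row;
    unmatched incoming rows are appended as full rows."""
    index = {tuple(r[k] for k in upsert_keys): i
             for i, r in enumerate(existing_rows)}
    result = [dict(r) for r in existing_rows]
    for row in new_rows:
        key = tuple(row[k] for k in upsert_keys)
        if key in index:
            for col in update_cols:          # matched: touch listed columns only
                result[index[key]][col] = row[col]
        else:
            result.append(dict(row))         # new key: append the whole row
    return result

rows = upsert_partial(
    [{'id': 1, 'name': 'Alice_v2', 'age': 31}],
    [{'id': 1, 'name': 'ignored', 'age': 99},
     {'id': 4, 'name': 'David', 'age': 22}],
    upsert_keys=['id'], update_cols=['age'])
# id=1 keeps name='Alice_v2' but takes age=99; id=4 is appended whole
```
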
199+
**Example: partitioned table with composite key**
200+
201+
```python
202+
partitioned_schema = pa.schema([
203+
('id', pa.int32()),
204+
('name', pa.string()),
205+
('region', pa.string()),
206+
])
207+
schema = Schema.from_pyarrow_schema(
208+
partitioned_schema,
209+
partition_keys=['region'],
210+
options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'},
211+
)
212+
catalog.create_table('default.users_partitioned', schema, False)
213+
table = catalog.get_table('default.users_partitioned')
214+
215+
# ... write initial data ...
216+
217+
write_builder = table.new_batch_write_builder()
218+
table_update = write_builder.new_update()
219+
table_commit = write_builder.new_commit()
220+
221+
upsert_data = pa.Table.from_pydict(
222+
{'id': [1, 3], 'name': ['Alice_v2', 'Charlie'], 'region': ['US', 'EU']},
223+
schema=partitioned_schema,
224+
)
225+
# upsert_keys=['id'] only; partition key 'region' is auto-stripped
226+
cmts = table_update.upsert_by_arrow_with_key(upsert_data, upsert_keys=['id'])
227+
table_commit.commit(cmts)
228+
table_commit.close()
229+
```
230+
231+
**Notes**
232+
233+
- Execution is driven **partition-by-partition**: only one partition's key set is loaded into memory at a time.
234+
- Duplicate keys in the input data will raise an error.
235+
- The upsert is atomic per commit — all matched updates and new appends are included in the same commit.
236+
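
The first two notes can be sketched with stdlib Python. `partition_batches` and `check_no_duplicate_keys` are hypothetical helpers illustrating the idea, not pypaimon internals:

```python
from itertools import groupby

def partition_batches(rows, partition_keys):
    """Group incoming rows by partition value so that only one partition's
    key set needs to be resident in memory at a time."""
    def keyfn(r):
        return tuple(r[k] for k in partition_keys)
    for part, group in groupby(sorted(rows, key=keyfn), key=keyfn):
        yield part, list(group)

def check_no_duplicate_keys(rows, upsert_keys):
    """Duplicate keys within one input batch are rejected."""
    seen = set()
    for row in rows:
        key = tuple(row[k] for k in upsert_keys)
        if key in seen:
            raise ValueError(f"duplicate upsert key in input: {key}")
        seen.add(key)

batch = [{'id': 1, 'region': 'US'},
         {'id': 3, 'region': 'EU'},
         {'id': 2, 'region': 'US'}]
parts = dict(partition_batches(batch, ['region']))
# parts maps ('EU',) to one row and ('US',) to two rows
check_no_duplicate_keys(batch, ['id'])  # passes: all ids are unique
```
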

## Update Columns By Shards

If you want to **compute a derived column** (or **update an existing column based on other columns**) without providing
