Commit af8ce43

[Converter] Commit high watermark of converter processed up to as part of snapshot properties (#589)
* [Converter] Commit high watermark of converter processed up to as part of snapshot properties
* Remove print statement

Co-authored-by: Miranda <yiqin121@gmail.com>
1 parent 821ea59 commit af8ce43

File tree

7 files changed: +143 -17 lines changed


deltacat/__init__.py

Lines changed: 1 addition & 1 deletion
@@ -126,7 +126,7 @@

 deltacat.logs.configure_deltacat_logger(logging.getLogger(__name__))

-__version__ = "2.0.0.post6"
+__version__ = "2.0.0.post7"


 __all__ = [

deltacat/compute/converter/converter_session.py

Lines changed: 23 additions & 3 deletions
@@ -80,6 +80,8 @@ def converter_session(
             - file_system: File system instance
             - location_provider_prefix_override: Optional prefix override for file locations
             - position_delete_for_multiple_data_files: Whether to generate position deletes for multiple data files
+            - start_snapshot_id: Optional starting snapshot ID for filtering files (files from this snapshot onwards will be processed)
+            - start_sequence_number: Optional starting sequence number for filtering files (used in conjunction with start_snapshot_id)
         **kwargs: Additional keyword arguments (currently unused)

     Returns:
@@ -123,14 +125,26 @@ def converter_session(
     position_delete_for_multiple_data_files = (
         params.position_delete_for_multiple_data_files
     )
+    start_snapshot_id = params.start_snapshot_id
+    start_sequence_number = params.start_sequence_number
+    logger.info(
+        f"Converter session parameters - start_snapshot_id: {start_snapshot_id}, start_sequence_number: {start_sequence_number}"
+    )

     logger.info(f"Fetching all bucket files for table {table_identifier}...")
-    data_file_dict, equality_delete_dict, pos_delete_dict = fetch_all_bucket_files(
-        table=iceberg_table
-    )
+    (
+        data_file_dict,
+        equality_delete_dict,
+        pos_delete_dict,
+        latest_snapshot_id,
+        largest_sequence_number,
+    ) = fetch_all_bucket_files(table=iceberg_table, start_snapshot_id=start_snapshot_id)
     logger.info(
         f"Fetched files - data: {len(data_file_dict)}, equality_delete: {len(equality_delete_dict)}, pos_delete: {len(pos_delete_dict)}"
     )
+    logger.info(
+        f"Latest snapshot ID: {latest_snapshot_id}, Largest sequence number: {largest_sequence_number}"
+    )

     convert_input_files_for_all_buckets = group_all_files_to_each_bucket(
         data_file_dict=data_file_dict,
@@ -350,20 +364,26 @@ def convert_input_provider(index: int, item: Any) -> Dict[str, ConvertInput]:
         converter_snapshot_id = commit_append_snapshot(
             iceberg_table=iceberg_table,
             new_position_delete_files=to_be_added_files_list,
+            latest_snapshot_id=latest_snapshot_id,
+            largest_sequence_number=largest_sequence_number,
         )
     elif snapshot_type == SnapshotType.REPLACE:
         logger.info(f"Committing replace snapshot for {table_identifier}.")
         converter_snapshot_id = commit_replace_snapshot(
             iceberg_table=iceberg_table,
             to_be_deleted_files=to_be_deleted_files_list,
             new_position_delete_files=to_be_added_files_list,
+            latest_snapshot_id=latest_snapshot_id,
+            largest_sequence_number=largest_sequence_number,
        )
     elif snapshot_type == SnapshotType.DELETE:
         logger.info(f"Committing delete snapshot for {table_identifier}.")
         converter_snapshot_id = commit_replace_snapshot(
             iceberg_table=iceberg_table,
             to_be_deleted_files=to_be_deleted_files_list,
             new_position_delete_files=[],  # No new files to add
+            latest_snapshot_id=latest_snapshot_id,
+            largest_sequence_number=largest_sequence_number,
         )
     else:
         logger.warning(f"Unexpected snapshot type: {snapshot_type}")
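
Taken together, these converter_session changes thread the new watermark parameters from the session params into fetch_all_bucket_files and then into the final snapshot commit. A minimal invocation sketch follows; only start_snapshot_id and start_sequence_number come from this diff, while the remaining keys (catalog, iceberg_table_name, iceberg_warehouse_bucket_name, merge_keys) and all concrete values are assumptions about the pre-existing params contract, shown only for illustration.

    from pyiceberg.catalog import load_catalog

    from deltacat.compute.converter.converter_session import converter_session
    from deltacat.compute.converter.model.converter_session_params import (
        ConverterSessionParams,
    )

    # Hypothetical catalog/table names; any pre-configured PyIceberg catalog works.
    catalog = load_catalog("default")

    params = ConverterSessionParams.of(
        {
            # Pre-existing keys, assumed from the current params model (not part of this diff):
            "catalog": catalog,
            "iceberg_table_name": "db.events",
            "iceberg_warehouse_bucket_name": "s3://my-warehouse",
            "merge_keys": ["id"],
            # New in this commit: resume from the previously committed high watermark.
            "start_snapshot_id": 7640277914614175520,
            "start_sequence_number": 3,
        }
    )
    converter_session(params=params)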

deltacat/compute/converter/model/converter_session_params.py

Lines changed: 18 additions & 0 deletions
@@ -49,6 +49,8 @@ def of(params: Optional[Dict[str, Any]]) -> ConverterSessionParams:
         )
         result.s3_prefix_override = params.get("s3_prefix_override", None)
         result.fileio_override = params.get("fileio_override", None)
+        result.start_snapshot_id = params.get("start_snapshot_id", None)
+        result.start_sequence_number = params.get("start_sequence_number", None)

         return result

@@ -155,3 +157,19 @@ def fileio_override(self) -> Optional[FileIO]:
     @fileio_override.setter
     def fileio_override(self, fileio_override: Optional[FileIO]) -> None:
         self["fileio_override"] = fileio_override
+
+    @property
+    def start_snapshot_id(self) -> Optional[int]:
+        return self["start_snapshot_id"]
+
+    @start_snapshot_id.setter
+    def start_snapshot_id(self, start_snapshot_id: Optional[int]) -> None:
+        self["start_snapshot_id"] = start_snapshot_id
+
+    @property
+    def start_sequence_number(self) -> Optional[int]:
+        return self["start_sequence_number"]
+
+    @start_sequence_number.setter
+    def start_sequence_number(self, start_sequence_number: Optional[int]) -> None:
+        self["start_sequence_number"] = start_sequence_number

deltacat/compute/converter/pyiceberg/overrides.py

Lines changed: 31 additions & 3 deletions
@@ -203,7 +203,13 @@ def parquet_files_dict_to_iceberg_data_files(
 def fetch_all_bucket_files(
     table: Table,
     start_snapshot_id: Optional[int] = None,
-) -> Tuple[Dict[Any, DataFileList], Dict[Any, DataFileList], Dict[Any, DataFileList]]:
+) -> Tuple[
+    Dict[Any, DataFileList],
+    Dict[Any, DataFileList],
+    Dict[Any, DataFileList],
+    Optional[int],
+    Optional[int],
+]:
     # step 1: filter manifests using partition summaries
     # the filter depends on the partition spec used to write the manifest file, so create a cache of filters for each spec id

@@ -213,7 +219,7 @@ def fetch_all_bucket_files(
     current_snapshot = data_scan.snapshot()

     if not current_snapshot:
-        return {}, {}, {}
+        return {}, {}, {}, None, None

     snapshots = list(table.metadata.snapshots)
     expected_start_sequence_number = -1
@@ -350,11 +356,33 @@ def fetch_all_bucket_files(
     for partition_value, files_dict in positional_delete_entries_registry.items():
         positional_delete_entries[partition_value] = list(files_dict.values())

+    # Calculate latest snapshot ID and largest sequence number from snapshots
+    latest_snapshot_id = None
+    largest_sequence_number = None
+
+    if snapshots:
+        # Get the latest snapshot ID (current snapshot)
+        latest_snapshot_id = current_snapshot.snapshot_id
+
+        # Find the largest sequence number across all snapshots
+        largest_sequence_number = max(
+            snapshot.sequence_number for snapshot in snapshots
+        )
+
     logger.info(
         f"Fetched {sum(len(files) for files in data_entries.values())} data files from table, "
         f"{sum(len(files) for files in equality_data_entries.values())} equality delete files, "
         f"{sum(len(files) for files in positional_delete_entries.values())} position delete files"
     )
+    logger.info(
+        f"Latest snapshot ID: {latest_snapshot_id}, Largest sequence number: {largest_sequence_number}"
+    )
     for k, v in data_entries.items():
         logger.info(f"{len(v)} files for partition value :{k}")
-    return data_entries, equality_data_entries, positional_delete_entries
+    return (
+        data_entries,
+        equality_data_entries,
+        positional_delete_entries,
+        latest_snapshot_id,
+        largest_sequence_number,
+    )
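
For callers that invoke the override directly (as the integration tests below do), fetch_all_bucket_files now returns a five-tuple whose last two elements are the high watermark that later gets stamped onto the converter's snapshot. A minimal sketch, assuming tbl is an already-loaded PyIceberg Table and previous_watermark is a hypothetical snapshot ID recovered from an earlier run:

    from deltacat.compute.converter.pyiceberg.overrides import fetch_all_bucket_files

    # `tbl` is an already-loaded PyIceberg Table; `previous_watermark` is a
    # hypothetical snapshot ID from an earlier converter commit.
    (
        data_file_dict,
        equality_delete_dict,
        pos_delete_dict,
        latest_snapshot_id,        # snapshot_id of the table's current snapshot
        largest_sequence_number,   # max sequence_number across all snapshots
    ) = fetch_all_bucket_files(table=tbl, start_snapshot_id=previous_watermark)

    # An empty table short-circuits to ({}, {}, {}, None, None), so both watermark
    # values should be None-checked before they are passed to the commit helpers.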

deltacat/compute/converter/pyiceberg/update_snapshot_overrides.py

Lines changed: 35 additions & 3 deletions
@@ -167,7 +167,10 @@ def _get_entries(manifest: ManifestFile) -> List[ManifestEntry]:


 def commit_append_snapshot(
-    iceberg_table: Table, new_position_delete_files: List[DataFile]
+    iceberg_table: Table,
+    new_position_delete_files: List[DataFile],
+    latest_snapshot_id: int = None,
+    largest_sequence_number: int = None,
 ) -> str:
     tx = iceberg_table.transaction()
     try:
@@ -177,7 +180,21 @@ def commit_append_snapshot(
                 "schema.name-mapping.default": tx.table_metadata.schema().name_mapping.model_dump_json()
             }
         )
-        with append_delete_files_override(tx.update_snapshot()) as append_snapshot:
+
+        # Prepare snapshot properties
+        snapshot_properties = {}
+        if latest_snapshot_id is not None:
+            snapshot_properties["sourceSnapshotId"] = str(latest_snapshot_id)
+        if largest_sequence_number is not None:
+            snapshot_properties["sourceSequenceNumber"] = str(largest_sequence_number)
+
+        logger.info(
+            f"Committing append snapshot with properties: {snapshot_properties}"
+        )
+
+        with append_delete_files_override(
+            tx.update_snapshot(snapshot_properties=snapshot_properties)
+        ) as append_snapshot:
             if new_position_delete_files:
                 for data_file in new_position_delete_files:
                     append_snapshot.append_data_file(data_file)
@@ -207,6 +224,7 @@ def commit_snapshot_properties_change(iceberg_table: Table):
     except Exception as e:
         raise e
     else:
+        logger.info(f"Commit only table properties changes: {current_snapshot_id}")
         metadata = tx.commit_transaction().metadata
         logger.info(
             f"Successfully committed only table properties change with ray.converter.snapshot_id:{current_snapshot_id}"
@@ -305,6 +323,8 @@ def commit_replace_snapshot(
     iceberg_table: Table,
     new_position_delete_files: List[DataFile],
     to_be_deleted_files: List[DataFile],
+    latest_snapshot_id: int = None,
+    largest_sequence_number: int = None,
 ) -> str:
     tx = iceberg_table.transaction()
     try:
@@ -314,8 +334,20 @@ def commit_replace_snapshot(
                 "schema.name-mapping.default": tx.table_metadata.schema().name_mapping.model_dump_json()
             }
         )
+
+        # Prepare snapshot properties
+        snapshot_properties = {}
+        if latest_snapshot_id is not None:
+            snapshot_properties["sourceSnapshotId"] = str(latest_snapshot_id)
+        if largest_sequence_number is not None:
+            snapshot_properties["sourceSequenceNumber"] = str(largest_sequence_number)
+
+        logger.info(
+            f"Committing replace snapshot with properties: {snapshot_properties}"
+        )
+
         with replace_delete_files_override(
-            tx.update_snapshot()
+            tx.update_snapshot(snapshot_properties=snapshot_properties)
         ) as replace_delete_snapshot:
             if new_position_delete_files:
                 for data_file in new_position_delete_files:
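
The net effect of these overrides is that every converter commit records its source high watermark as snapshot properties (sourceSnapshotId and sourceSequenceNumber). Below is a hedged sketch of reading that watermark back to seed the next run's start_snapshot_id; it assumes the committed snapshot_properties surface through PyIceberg's Snapshot.summary mapping, which this diff itself does not show.

    from typing import Optional, Tuple

    from pyiceberg.table import Table


    def read_converter_watermark(table: Table) -> Tuple[Optional[int], Optional[int]]:
        """Return the (snapshot_id, sequence_number) watermark recorded by the
        last converter commit, or (None, None) if none has been recorded."""
        snapshot = table.current_snapshot()
        if snapshot is None or snapshot.summary is None:
            return None, None
        # Assumption: snapshot_properties passed to update_snapshot() surface via
        # the snapshot summary as string values.
        source_snapshot_id = snapshot.summary.get("sourceSnapshotId")
        source_sequence_number = snapshot.summary.get("sourceSequenceNumber")
        return (
            int(source_snapshot_id) if source_snapshot_id is not None else None,
            int(source_sequence_number) if source_sequence_number is not None else None,
        )

    # The returned pair can then be supplied as start_snapshot_id /
    # start_sequence_number for the next converter_session run.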

deltacat/tests/compute/converter/integration/test_convert_session.py

Lines changed: 28 additions & 6 deletions
@@ -490,7 +490,13 @@ def test_converter(

     # Get files and create convert input
     tbl = session_catalog.load_table(identifier)
-    data_file_dict, equality_delete_dict, pos_delete_dict = fetch_all_bucket_files(tbl)
+    (
+        data_file_dict,
+        equality_delete_dict,
+        pos_delete_dict,
+        _,
+        _,
+    ) = fetch_all_bucket_files(tbl)

     # Handle equality delete if present
     if "equality_delete_data" in test_case:
@@ -606,7 +612,13 @@ def test_converter_session_duplicate_position_deletes_spark_compatibility(

     # Load table and run converter first time
     tbl = session_catalog.load_table(identifier)
-    data_file_dict, equality_delete_dict, pos_delete_dict = fetch_all_bucket_files(tbl)
+    (
+        data_file_dict,
+        equality_delete_dict,
+        pos_delete_dict,
+        _,
+        _,
+    ) = fetch_all_bucket_files(tbl)

     convert_input_files_for_all_buckets = group_all_files_to_each_bucket(
         data_file_dict=data_file_dict,
@@ -675,7 +687,13 @@ def test_converter_session_duplicate_position_deletes_spark_compatibility(
     # This could happen in scenarios where the converter is run multiple times on the same data

     # Get files again (now includes position delete files from first run)
-    data_file_dict, equality_delete_dict, pos_delete_dict = fetch_all_bucket_files(tbl)
+    (
+        data_file_dict,
+        equality_delete_dict,
+        pos_delete_dict,
+        _,
+        _,
+    ) = fetch_all_bucket_files(tbl)

     convert_input_files_for_all_buckets = group_all_files_to_each_bucket(
         data_file_dict=data_file_dict,
@@ -850,9 +868,13 @@ def test_converter_session_no_input_files(
     )

     # Verify table is empty (no data files)
-    data_file_dict, equality_delete_dict, pos_delete_dict = fetch_all_bucket_files(
-        tbl
-    )
+    (
+        data_file_dict,
+        equality_delete_dict,
+        pos_delete_dict,
+        _,
+        _,
+    ) = fetch_all_bucket_files(tbl)

     assert (
         len(data_file_dict) == 0

deltacat/tests/compute/converter/integration/test_converter_commit_conflict_resolution.py

Lines changed: 7 additions & 1 deletion
@@ -414,7 +414,13 @@ def test_converter_commit_conflict_resolution(
     # Get files and create convert input
     tbl = session_catalog.load_table(identifier)

-    data_file_dict, equality_delete_dict, pos_delete_dict = fetch_all_bucket_files(tbl)
+    (
+        data_file_dict,
+        equality_delete_dict,
+        pos_delete_dict,
+        _,
+        _,
+    ) = fetch_all_bucket_files(tbl)

     # Handle equality delete if present
     if "equality_delete_data" in test_case:
