diff --git a/python/tests/test_merge.py b/python/tests/test_merge.py
index efec90900a..3e1ccc8a7e 100644
--- a/python/tests/test_merge.py
+++ b/python/tests/test_merge.py
@@ -2297,3 +2297,81 @@ def test_merge(tmp_path: pathlib.Path):
 
     new_df = pl.read_delta(str(tmp_path))
    assert_frame_equal(df, new_df, check_row_order=False)
+
+
+@pytest.mark.pyarrow
+@pytest.mark.pandas
+def test_merge_file_pruning_regression_3636(tmp_path: pathlib.Path):
+    """
+    https://github.com/delta-io/delta-rs/issues/3636
+    """
+    import pyarrow as pa
+    import pandas as pd
+    import numpy as np
+    import deltalake
+    from deltalake import DeltaTable
+
+    # create table once
+    dt = DeltaTable.create(
+        tmp_path,
+        pa.schema(
+            [
+                pa.field("ts", pa.timestamp("us"), nullable=False),
+                pa.field("direction", pa.string(), nullable=False),  # S, W, N, E
+                pa.field("distance", pa.float64(), nullable=False),
+                pa.field("updated_on", pa.int64(), nullable=False),  # timestamp in ms
+                pa.field("date", pa.string(), nullable=False),
+            ]
+        ),
+        partition_by=["date"],
+        mode="ignore",
+    )
+
+    def mock_data(size: int):
+        tss = pd.date_range(start="2025-01-01", freq="2s", periods=size // 4)
+
+        init_upd_df = {
+            "ts": [*tss] * 4,
+            "direction": [i for i in ("S", "W", "N", "E") for _ in range(len(tss))],
+            "distance": np.random.randint(0, 10000, size=len(tss) * 4),
+            "updated_on": list(
+                (tss.floor("D") + pd.Timedelta(hours=25)).astype(int) // 1000000
+            )
+            * 4,
+            "date": list(tss.strftime("%Y-%m-%d")) * 4,
+        }
+        return pd.DataFrame(init_upd_df)
+
+    # Prefill the table with enough rows to spread the data across
+    # multiple files
+    init_df = mock_data(500_000)
+
+    dt = DeltaTable(tmp_path)
+
+    dt.merge(
+        init_df,
+        predicate="s.date = t.date and s.ts = t.ts and s.direction = t.direction",
+        source_alias="s",
+        target_alias="t",
+    ).when_not_matched_insert_all().execute()
+
+    second_upd = mock_data(40)
+
+    dt = DeltaTable(tmp_path)
+
+    stats = (
+        dt.merge(
+            second_upd,
+            predicate="s.date = t.date and s.ts = t.ts and s.direction = t.direction",
+            source_alias="s",
+            target_alias="t",
+        )
+        .when_not_matched_insert_all()
+        .execute()
+    )
+
+    files_scanned = stats["num_target_files_scanned"]
+
+    assert files_scanned <= 1, (
+        f"The number of target files scanned was too large! {files_scanned}"
+    )