Skip to content

Commit 047e160

Browse files
Add integration test
1 parent d53805c commit 047e160

File tree

1 file changed

+109
-0
lines changed

1 file changed

+109
-0
lines changed

tests/core/engine_adapter/integration/test_integration.py

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1897,6 +1897,115 @@ def _mutate_config(current_gateway_name: str, config: Config):
18971897
ctx.cleanup(context)
18981898

18991899

1900+
def test_incremental_by_unique_key_model_when_matched(ctx: TestContext):
    """Verify INCREMENTAL_BY_UNIQUE_KEY honours a custom ``when_matched`` clause.

    Seeds two days of data in which items 1 and 2 reappear on day 2 with new
    names and values. The model's ``when_matched`` clause updates only
    ``value`` and ``event_date`` on a key match, so after both batches run the
    day-1 ``name`` must be preserved while ``value``/``event_date`` reflect
    day 2. ``batch_size 1`` forces per-day batches so the second batch
    actually exercises the MERGE path.
    """
    if not ctx.supports_merge:
        pytest.skip(f"{ctx.dialect} on {ctx.gateway} doesn't support merge")

    # DuckDB and some other engines use logical_merge which doesn't support when_matched
    if ctx.dialect not in ["bigquery", "databricks", "postgres", "snowflake", "spark"]:
        pytest.skip(f"{ctx.dialect} doesn't support native MERGE with when_matched clause")

    context = ctx.create_context()
    schema = ctx.schema(TEST_SCHEMA)

    # Create seed data with multiple days
    seed_query = ctx.input_data(
        pd.DataFrame(
            [
                [1, "item_a", 100, "2020-01-01"],
                [2, "item_b", 200, "2020-01-01"],
                [1, "item_a_changed", 150, "2020-01-02"],  # Same item_id, different name and value
                [2, "item_b_changed", 250, "2020-01-02"],  # Same item_id, different name and value
                [3, "item_c", 300, "2020-01-02"],  # New item on day 2
            ],
            columns=["item_id", "name", "value", "event_date"],
        ),
        columns_to_types={
            "item_id": exp.DataType.build("integer"),
            "name": exp.DataType.build("text"),
            "value": exp.DataType.build("integer"),
            "event_date": exp.DataType.build("date"),
        },
    )
    context.upsert_model(
        create_sql_model(name=f"{schema}.seed_model", query=seed_query, kind="FULL")
    )

    table_format = ""
    if ctx.dialect == "athena":
        # INCREMENTAL_BY_UNIQUE_KEY uses MERGE which is only supported in Athena on Iceberg tables
        table_format = "table_format iceberg,"

    # Create model with when_matched clause that only updates the value column
    # BUT keeps the existing name column unchanged.
    # batch_size=1 is so that we trigger merge on second batch and verify behaviour of when_matched
    context.upsert_model(
        load_sql_based_model(
            d.parse(
                f"""MODEL (
                    name {schema}.test_model_when_matched,
                    kind INCREMENTAL_BY_UNIQUE_KEY (
                        unique_key item_id,
                        batch_size 1,
                        when_matched WHEN MATCHED THEN UPDATE SET target.value = source.value, target.event_date = source.event_date
                    ),
                    {table_format}
                    start '2020-01-01',
                    end '2020-01-02',
                    cron '@daily'
                );

                select item_id, name, value, event_date
                from {schema}.seed_model
                where event_date between @start_date and @end_date""",
            )
        )
    )

    try:
        # Initial plan to create the model and run it
        context.plan(auto_apply=True, no_prompts=True)

        test_model = context.get_model(f"{schema}.test_model_when_matched")

        # Verify that the model has the when_matched clause configured
        assert test_model.kind.when_matched is not None

        actual_df = (
            ctx.get_current_data(test_model.fqn).sort_values(by="item_id").reset_index(drop=True)
        )

        # Expected results after batch processing:
        # - Day 1: Items 1 and 2 are inserted (first insert)
        # - Day 2: Items 1 and 2 are merged (when_matched clause preserves names
        #   but updates values/dates); item 3 is inserted as new
        expected_df = (
            pd.DataFrame(
                [
                    [1, "item_a", 150, "2020-01-02"],  # name from day 1, value and date from day 2
                    [2, "item_b", 250, "2020-01-02"],  # name from day 1, value and date from day 2
                    [3, "item_c", 300, "2020-01-02"],  # new item from day 2
                ],
                columns=["item_id", "name", "value", "event_date"],
            )
            .sort_values(by="item_id")
            .reset_index(drop=True)
        )

        # Convert date columns to string for comparison — engines return
        # dialect-specific date types that would otherwise fail dtype checks
        actual_df["event_date"] = actual_df["event_date"].astype(str)
        expected_df["event_date"] = expected_df["event_date"].astype(str)

        pd.testing.assert_frame_equal(
            actual_df,
            expected_df,
            check_dtype=False,
        )

    finally:
        ctx.cleanup(context)
2007+
2008+
19002009
def test_managed_model_upstream_forward_only(ctx: TestContext):
19012010
"""
19022011
This scenario goes as follows:

0 commit comments

Comments
 (0)