@@ -1897,6 +1897,115 @@ def _mutate_config(current_gateway_name: str, config: Config):
18971897 ctx .cleanup (context )
18981898
18991899
def test_incremental_by_unique_key_model_when_matched(ctx: TestContext):
    """Verify a custom ``when_matched`` clause on an INCREMENTAL_BY_UNIQUE_KEY model.

    The model is run over two daily batches (batch_size 1). The second batch
    triggers a MERGE for the already-present keys; the ``when_matched`` clause
    updates only ``value`` and ``event_date``, so ``name`` must keep the value
    inserted by the first batch.
    """
    if not ctx.supports_merge:
        pytest.skip(f"{ctx.dialect} on {ctx.gateway} doesnt support merge")

    # DuckDB and some other engines use logical_merge which doesn't support when_matched
    if ctx.dialect not in ("bigquery", "databricks", "postgres", "snowflake", "spark"):
        pytest.skip(f"{ctx.dialect} doesn't support native MERGE with when_matched clause")

    context = ctx.create_context()
    schema = ctx.schema(TEST_SCHEMA)

    # Seed spans two days; day 2 repeats item_ids 1 and 2 with changed name/value
    # and introduces a brand-new item_id 3.
    seed_frame = pd.DataFrame(
        {
            "item_id": [1, 2, 1, 2, 3],
            "name": ["item_a", "item_b", "item_a_changed", "item_b_changed", "item_c"],
            "value": [100, 200, 150, 250, 300],
            "event_date": ["2020-01-01", "2020-01-01", "2020-01-02", "2020-01-02", "2020-01-02"],
        }
    )
    seed_query = ctx.input_data(
        seed_frame,
        columns_to_types={
            "item_id": exp.DataType.build("integer"),
            "name": exp.DataType.build("text"),
            "value": exp.DataType.build("integer"),
            "event_date": exp.DataType.build("date"),
        },
    )
    context.upsert_model(
        create_sql_model(name=f"{schema}.seed_model", query=seed_query, kind="FULL")
    )

    table_format = ""
    if ctx.dialect == "athena":
        # INCREMENTAL_BY_UNIQUE_KEY uses MERGE which is only supported in Athena on Iceberg tables
        table_format = "table_format iceberg,"

    # when_matched updates value and event_date but deliberately leaves name
    # untouched. batch_size=1 forces the second day into its own batch so the
    # MERGE path (and thus when_matched) is actually exercised.
    context.upsert_model(
        load_sql_based_model(
            d.parse(
                f"""MODEL (
                name {schema}.test_model_when_matched,
                kind INCREMENTAL_BY_UNIQUE_KEY (
                    unique_key item_id,
                    batch_size 1,
                    when_matched WHEN MATCHED THEN UPDATE SET target.value = source.value, target.event_date = source.event_date
                ),
                {table_format}
                start '2020-01-01',
                end '2020-01-02',
                cron '@daily'
            );

            select item_id, name, value, event_date
            from {schema}.seed_model
            where event_date between @start_date and @end_date""",
            )
        )
    )

    try:
        # Initial plan to create the model and run it
        context.plan(auto_apply=True, no_prompts=True)

        test_model = context.get_model(f"{schema}.test_model_when_matched")

        # Sanity check: the when_matched clause survived model loading
        assert test_model.kind.when_matched is not None

        actual_df = (
            ctx.get_current_data(test_model.fqn).sort_values(by="item_id").reset_index(drop=True)
        )

        # Day 1 inserts items 1 and 2; day 2 merges them (name preserved,
        # value/event_date refreshed) and inserts item 3.
        expected_df = (
            pd.DataFrame(
                {
                    "item_id": [1, 2, 3],
                    "name": ["item_a", "item_b", "item_c"],
                    "value": [150, 250, 300],
                    "event_date": ["2020-01-02", "2020-01-02", "2020-01-02"],
                }
            )
            .sort_values(by="item_id")
            .reset_index(drop=True)
        )

        # Normalize date columns to strings so engine-specific date dtypes compare equal
        actual_df["event_date"] = actual_df["event_date"].astype(str)
        expected_df["event_date"] = expected_df["event_date"].astype(str)

        pd.testing.assert_frame_equal(
            actual_df,
            expected_df,
            check_dtype=False,
        )

    finally:
        ctx.cleanup(context)
2007+
2008+
19002009def test_managed_model_upstream_forward_only (ctx : TestContext ):
19012010 """
19022011 This scenario goes as follows:
0 commit comments