Skip to content

Commit d39236c

Browse files
Fix: Don't normalize aliases in merge and when matched
1 parent 95b1f6e commit d39236c

File tree

5 files changed

+57
-83
lines changed

5 files changed

+57
-83
lines changed

sqlmesh/core/dialect.py

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1412,18 +1412,10 @@ def replace_merge_table_aliases(
14121412
"""
14131413
from sqlmesh.core.engine_adapter.base import MERGE_SOURCE_ALIAS, MERGE_TARGET_ALIAS
14141414

1415-
normalized_merge_source_alias = quote_identifiers(
1416-
normalize_identifiers(exp.to_identifier(MERGE_SOURCE_ALIAS), dialect), dialect=dialect
1417-
)
1418-
1419-
normalized_merge_target_alias = quote_identifiers(
1420-
normalize_identifiers(exp.to_identifier(MERGE_TARGET_ALIAS), dialect), dialect=dialect
1421-
)
1422-
14231415
if isinstance(expression, exp.Column) and (first_part := expression.parts[0]):
14241416
if first_part.this.lower() in ("target", "dbt_internal_dest", "__merge_target__"):
1425-
first_part.replace(normalized_merge_target_alias)
1417+
first_part.replace(exp.to_identifier(MERGE_TARGET_ALIAS, quoted=True))
14261418
elif first_part.this.lower() in ("source", "dbt_internal_source", "__merge_source__"):
1427-
first_part.replace(normalized_merge_source_alias)
1419+
first_part.replace(exp.to_identifier(MERGE_SOURCE_ALIAS, quoted=True))
14281420

14291421
return expression

sqlmesh/core/model/kind.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -478,10 +478,9 @@ def _when_matched_validator(
478478
v = v[1:-1]
479479

480480
v = t.cast(exp.Whens, d.parse_one(v, into=exp.Whens, dialect=dialect))
481-
else:
482-
v = t.cast(exp.Whens, v.transform(d.replace_merge_table_aliases, dialect=dialect))
483481

484-
return validate_expression(v, dialect=dialect)
482+
v = validate_expression(v, dialect=dialect)
483+
return t.cast(exp.Whens, v.transform(d.replace_merge_table_aliases, dialect=dialect))
485484

486485
@field_validator("merge_filter", mode="before")
487486
def _merge_filter_validator(
@@ -497,10 +496,9 @@ def _merge_filter_validator(
497496
if isinstance(v, str):
498497
v = v.strip()
499498
v = d.parse_one(v, dialect=dialect)
500-
else:
501-
v = v.transform(d.replace_merge_table_aliases, dialect=dialect)
502499

503-
return validate_expression(v, dialect=dialect)
500+
v = validate_expression(v, dialect=dialect)
501+
return v.transform(d.replace_merge_table_aliases, dialect=dialect)
504502

505503
@property
506504
def data_hash_values(self) -> t.List[t.Optional[str]]:

tests/core/test_model.py

Lines changed: 25 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -5370,7 +5370,7 @@ def test_when_matched():
53705370
"""
53715371
)
53725372

5373-
expected_when_matched = "(WHEN MATCHED THEN UPDATE SET `__merge_target__`.`salary` = COALESCE(`__merge_source__`.`salary`, `__merge_target__`.`salary`))"
5373+
expected_when_matched = "(WHEN MATCHED THEN UPDATE SET `__MERGE_TARGET__`.`salary` = COALESCE(`__MERGE_SOURCE__`.`salary`, `__MERGE_TARGET__`.`salary`))"
53745374

53755375
model = load_sql_based_model(expressions, dialect="hive")
53765376
assert model.kind.when_matched.sql(dialect="hive") == expected_when_matched
@@ -5404,9 +5404,9 @@ def test_when_matched():
54045404
kind INCREMENTAL_BY_UNIQUE_KEY (
54055405
unique_key ("purchase_order_id"),
54065406
when_matched (
5407-
WHEN MATCHED AND "__merge_source__"."_operation" = 1 THEN DELETE
5408-
WHEN MATCHED AND "__merge_source__"."_operation" <> 1 THEN UPDATE SET
5409-
"__merge_target__"."purchase_order_id" = 1
5407+
WHEN MATCHED AND "__MERGE_SOURCE__"."_operation" = 1 THEN DELETE
5408+
WHEN MATCHED AND "__MERGE_SOURCE__"."_operation" <> 1 THEN UPDATE SET
5409+
"__MERGE_TARGET__"."purchase_order_id" = 1
54105410
),
54115411
batch_concurrency 1,
54125412
forward_only FALSE,
@@ -5457,7 +5457,7 @@ def fingerprint_merge(
54575457
kind INCREMENTAL_BY_UNIQUE_KEY (
54585458
unique_key ("purchase_order_id"),
54595459
when_matched (
5460-
WHEN MATCHED AND "__merge_source__"."salary" <> "__merge_target__"."salary" THEN UPDATE SET
5460+
WHEN MATCHED AND "__MERGE_SOURCE__"."salary" <> "__MERGE_TARGET__"."salary" THEN UPDATE SET
54615461
ARRAY('target.update_datetime = source.update_datetime', 'target.salary = source.salary')
54625462
),
54635463
batch_concurrency 1,
@@ -5491,8 +5491,8 @@ def test_when_matched_multiple():
54915491
)
54925492

54935493
expected_when_matched = [
5494-
"WHEN MATCHED AND `__merge_source__`.`x` = 1 THEN UPDATE SET `__merge_target__`.`salary` = COALESCE(`__merge_source__`.`salary`, `__merge_target__`.`salary`)",
5495-
"WHEN MATCHED THEN UPDATE SET `__merge_target__`.`salary` = COALESCE(`__merge_source__`.`salary`, `__merge_target__`.`salary`)",
5494+
"WHEN MATCHED AND `__MERGE_SOURCE__`.`x` = 1 THEN UPDATE SET `__MERGE_TARGET__`.`salary` = COALESCE(`__MERGE_SOURCE__`.`salary`, `__MERGE_TARGET__`.`salary`)",
5495+
"WHEN MATCHED THEN UPDATE SET `__MERGE_TARGET__`.`salary` = COALESCE(`__MERGE_SOURCE__`.`salary`, `__MERGE_TARGET__`.`salary`)",
54965496
]
54975497

54985498
model = load_sql_based_model(expressions, dialect="hive", variables={"schema": "db"})
@@ -5533,13 +5533,13 @@ def test_when_matched_merge_filter_multi_part_columns():
55335533
)
55345534

55355535
expected_when_matched = [
5536-
"WHEN MATCHED AND `__merge_source__`.`record`.`nested_record`.`field` = 1 THEN UPDATE SET `__merge_target__`.`repeated_record`.`sub_repeated_record`.`sub_field` = COALESCE(`__merge_source__`.`repeated_record`.`sub_repeated_record`.`sub_field`, `__merge_target__`.`repeated_record`.`sub_repeated_record`.`sub_field`)",
5537-
"WHEN MATCHED THEN UPDATE SET `__merge_target__`.`repeated_record`.`sub_repeated_record`.`sub_field` = COALESCE(`__merge_source__`.`repeated_record`.`sub_repeated_record`.`sub_field`, `__merge_target__`.`repeated_record`.`sub_repeated_record`.`sub_field`)",
5536+
"WHEN MATCHED AND `__MERGE_SOURCE__`.`record`.`nested_record`.`field` = 1 THEN UPDATE SET `__MERGE_TARGET__`.`repeated_record`.`sub_repeated_record`.`sub_field` = COALESCE(`__MERGE_SOURCE__`.`repeated_record`.`sub_repeated_record`.`sub_field`, `__MERGE_TARGET__`.`repeated_record`.`sub_repeated_record`.`sub_field`)",
5537+
"WHEN MATCHED THEN UPDATE SET `__MERGE_TARGET__`.`repeated_record`.`sub_repeated_record`.`sub_field` = COALESCE(`__MERGE_SOURCE__`.`repeated_record`.`sub_repeated_record`.`sub_field`, `__MERGE_TARGET__`.`repeated_record`.`sub_repeated_record`.`sub_field`)",
55385538
]
55395539

55405540
expected_merge_filter = (
5541-
"`__merge_source__`.`record`.`nested_record`.`field` < `__merge_target__`.`record`.`nested_record`.`field` AND "
5542-
"`__merge_target__`.`repeated_record`.`sub_repeated_record`.`sub_field` > `__merge_source__`.`repeated_record`.`sub_repeated_record`.`sub_field`"
5541+
"`__MERGE_SOURCE__`.`record`.`nested_record`.`field` < `__MERGE_TARGET__`.`record`.`nested_record`.`field` AND "
5542+
"`__MERGE_TARGET__`.`repeated_record`.`sub_repeated_record`.`sub_field` > `__MERGE_SOURCE__`.`repeated_record`.`sub_repeated_record`.`sub_field`"
55435543
)
55445544

55455545
model = load_sql_based_model(expressions, dialect="bigquery", variables={"schema": "db"})
@@ -5568,7 +5568,7 @@ def test_when_matched_normalization() -> None:
55685568
when_matched (
55695569
WHEN MATCHED THEN UPDATE SET
55705570
target.key_a = source.key_a,
5571-
target.key_b = source.key_b,
5571+
target.key_b = source.key_b,
55725572
)
55735573
)
55745574
);
@@ -6569,7 +6569,7 @@ def test_unrendered_macros_sql_model(mocker: MockerFixture) -> None:
65696569
assert model.unique_key[0] == exp.column("a", quoted=True)
65706570
assert (
65716571
t.cast(exp.Expression, model.merge_filter).sql()
6572-
== '"__merge_source__"."id" > 0 AND "__merge_target__"."updated_at" < @end_ds AND "__merge_source__"."updated_at" > @start_ds AND @merge_filter_var'
6572+
== '"__MERGE_SOURCE__"."id" > 0 AND "__MERGE_TARGET__"."updated_at" < @end_ds AND "__MERGE_SOURCE__"."updated_at" > @start_ds AND @merge_filter_var'
65736573
)
65746574

65756575

@@ -6665,7 +6665,7 @@ def model_with_macros(evaluator, **kwargs):
66656665
assert python_sql_model.unique_key[0] == exp.column("a", quoted=True)
66666666
assert (
66676667
python_sql_model.merge_filter.sql()
6668-
== '"__merge_source__"."id" > 0 AND "__merge_target__"."updated_at" < @end_ds AND "__merge_source__"."updated_at" > @start_ds AND @merge_filter_var'
6668+
== '"__MERGE_SOURCE__"."id" > 0 AND "__MERGE_TARGET__"."updated_at" < @end_ds AND "__MERGE_SOURCE__"."updated_at" > @start_ds AND @merge_filter_var'
66696669
)
66706670

66716671

@@ -7752,7 +7752,7 @@ def test_model_kind_to_expression():
77527752
.sql()
77537753
== """INCREMENTAL_BY_UNIQUE_KEY (
77547754
unique_key ("a"),
7755-
when_matched (WHEN MATCHED THEN UPDATE SET "__merge_target__"."b" = COALESCE("__merge_source__"."b", "__merge_target__"."b")),
7755+
when_matched (WHEN MATCHED THEN UPDATE SET "__MERGE_TARGET__"."b" = COALESCE("__MERGE_SOURCE__"."b", "__MERGE_TARGET__"."b")),
77567756
batch_concurrency 1,
77577757
forward_only FALSE,
77587758
disable_restatement FALSE,
@@ -7780,7 +7780,7 @@ def test_model_kind_to_expression():
77807780
.sql()
77817781
== """INCREMENTAL_BY_UNIQUE_KEY (
77827782
unique_key ("a"),
7783-
when_matched (WHEN MATCHED AND "__merge_source__"."x" = 1 THEN UPDATE SET "__merge_target__"."b" = COALESCE("__merge_source__"."b", "__merge_target__"."b") WHEN MATCHED THEN UPDATE SET "__merge_target__"."b" = COALESCE("__merge_source__"."b", "__merge_target__"."b")),
7783+
when_matched (WHEN MATCHED AND "__MERGE_SOURCE__"."x" = 1 THEN UPDATE SET "__MERGE_TARGET__"."b" = COALESCE("__MERGE_SOURCE__"."b", "__MERGE_TARGET__"."b") WHEN MATCHED THEN UPDATE SET "__MERGE_TARGET__"."b" = COALESCE("__MERGE_SOURCE__"."b", "__MERGE_TARGET__"."b")),
77847784
batch_concurrency 1,
77857785
forward_only FALSE,
77867786
disable_restatement FALSE,
@@ -8041,7 +8041,7 @@ def test_merge_filter():
80418041
"""
80428042
)
80438043

8044-
expected_incremental_predicate = f"`{MERGE_SOURCE_ALIAS.lower()}`.`salary` > 0"
8044+
expected_incremental_predicate = f"`{MERGE_SOURCE_ALIAS}`.`salary` > 0"
80458045

80468046
model = load_sql_based_model(expressions, dialect="hive")
80478047
assert model.kind.merge_filter.sql(dialect="hive") == expected_incremental_predicate
@@ -8084,19 +8084,19 @@ def test_merge_filter():
80848084
kind INCREMENTAL_BY_UNIQUE_KEY (
80858085
unique_key ("purchase_order_id"),
80868086
when_matched (
8087-
WHEN MATCHED AND "{MERGE_SOURCE_ALIAS.lower()}"."_operation" = 1 THEN DELETE
8088-
WHEN MATCHED AND "{MERGE_SOURCE_ALIAS.lower()}"."_operation" <> 1 THEN UPDATE SET
8089-
"{MERGE_TARGET_ALIAS.lower()}"."purchase_order_id" = 1
8087+
WHEN MATCHED AND "{MERGE_SOURCE_ALIAS}"."_operation" = 1 THEN DELETE
8088+
WHEN MATCHED AND "{MERGE_SOURCE_ALIAS}"."_operation" <> 1 THEN UPDATE SET
8089+
"{MERGE_TARGET_ALIAS}"."purchase_order_id" = 1
80908090
),
80918091
merge_filter (
8092-
"{MERGE_SOURCE_ALIAS.lower()}"."ds" > (
8092+
"{MERGE_SOURCE_ALIAS}"."ds" > (
80938093
SELECT
80948094
MAX("ds")
80958095
FROM "db"."test"
80968096
)
8097-
AND "{MERGE_SOURCE_ALIAS.lower()}"."ds" > @start_ds
8098-
AND "{MERGE_SOURCE_ALIAS.lower()}"."_operation" <> 1
8099-
AND "{MERGE_TARGET_ALIAS.lower()}"."start_date" > CURRENT_DATE + INTERVAL '7' DAY
8097+
AND "{MERGE_SOURCE_ALIAS}"."ds" > @start_ds
8098+
AND "{MERGE_SOURCE_ALIAS}"."_operation" <> 1
8099+
AND "{MERGE_TARGET_ALIAS}"."start_date" > CURRENT_DATE + INTERVAL '7' DAY
81008100
),
81018101
batch_concurrency 1,
81028102
forward_only FALSE,
@@ -8114,7 +8114,7 @@ def test_merge_filter():
81148114
rendered_merge_filters = model.render_merge_filter(start="2023-01-01", end="2023-01-02")
81158115
assert (
81168116
rendered_merge_filters.sql(dialect="hive")
8117-
== "(`__merge_source__`.`ds` > (SELECT MAX(`ds`) FROM `db`.`test`) AND `__merge_source__`.`ds` > '2023-01-01' AND `__merge_source__`.`_operation` <> 1 AND `__merge_target__`.`start_date` > CURRENT_DATE + INTERVAL '7' DAY)"
8117+
== "(`__MERGE_SOURCE__`.`ds` > (SELECT MAX(`ds`) FROM `db`.`test`) AND `__MERGE_SOURCE__`.`ds` > '2023-01-01' AND `__MERGE_SOURCE__`.`_operation` <> 1 AND `__MERGE_TARGET__`.`start_date` > CURRENT_DATE + INTERVAL '7' DAY)"
81188118
)
81198119

81208120

tests/core/test_snapshot_evaluator.py

Lines changed: 25 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -2200,18 +2200,14 @@ def test_create_incremental_by_unique_key_updated_at_exp(adapter_mock, make_snap
22002200
source=False,
22012201
then=exp.Update(
22022202
expressions=[
2203-
exp.column("name", MERGE_TARGET_ALIAS.lower(), quoted=True).eq(
2204-
exp.column("name", MERGE_SOURCE_ALIAS.lower(), quoted=True)
2203+
exp.column("name", MERGE_TARGET_ALIAS, quoted=True).eq(
2204+
exp.column("name", MERGE_SOURCE_ALIAS, quoted=True)
22052205
),
2206-
exp.column("updated_at", MERGE_TARGET_ALIAS.lower(), quoted=True).eq(
2206+
exp.column("updated_at", MERGE_TARGET_ALIAS, quoted=True).eq(
22072207
exp.Coalesce(
2208-
this=exp.column(
2209-
"updated_at", MERGE_SOURCE_ALIAS.lower(), quoted=True
2210-
),
2208+
this=exp.column("updated_at", MERGE_SOURCE_ALIAS, quoted=True),
22112209
expressions=[
2212-
exp.column(
2213-
"updated_at", MERGE_TARGET_ALIAS.lower(), quoted=True
2214-
)
2210+
exp.column("updated_at", MERGE_TARGET_ALIAS, quoted=True)
22152211
],
22162212
)
22172213
),
@@ -2269,23 +2265,19 @@ def test_create_incremental_by_unique_key_multiple_updated_at_exp(adapter_mock,
22692265
expressions=[
22702266
exp.When(
22712267
matched=True,
2272-
condition=exp.column("id", MERGE_SOURCE_ALIAS.lower(), quoted=True).eq(
2268+
condition=exp.column("id", MERGE_SOURCE_ALIAS, quoted=True).eq(
22732269
exp.Literal.number(1)
22742270
),
22752271
then=exp.Update(
22762272
expressions=[
2277-
exp.column("name", MERGE_TARGET_ALIAS.lower(), quoted=True).eq(
2278-
exp.column("name", MERGE_SOURCE_ALIAS.lower(), quoted=True)
2273+
exp.column("name", MERGE_TARGET_ALIAS, quoted=True).eq(
2274+
exp.column("name", MERGE_SOURCE_ALIAS, quoted=True)
22792275
),
2280-
exp.column("updated_at", MERGE_TARGET_ALIAS.lower(), quoted=True).eq(
2276+
exp.column("updated_at", MERGE_TARGET_ALIAS, quoted=True).eq(
22812277
exp.Coalesce(
2282-
this=exp.column(
2283-
"updated_at", MERGE_SOURCE_ALIAS.lower(), quoted=True
2284-
),
2278+
this=exp.column("updated_at", MERGE_SOURCE_ALIAS, quoted=True),
22852279
expressions=[
2286-
exp.column(
2287-
"updated_at", MERGE_TARGET_ALIAS.lower(), quoted=True
2288-
)
2280+
exp.column("updated_at", MERGE_TARGET_ALIAS, quoted=True)
22892281
],
22902282
)
22912283
),
@@ -2297,18 +2289,14 @@ def test_create_incremental_by_unique_key_multiple_updated_at_exp(adapter_mock,
22972289
source=False,
22982290
then=exp.Update(
22992291
expressions=[
2300-
exp.column("name", MERGE_TARGET_ALIAS.lower(), quoted=True).eq(
2301-
exp.column("name", MERGE_SOURCE_ALIAS.lower(), quoted=True)
2292+
exp.column("name", MERGE_TARGET_ALIAS, quoted=True).eq(
2293+
exp.column("name", MERGE_SOURCE_ALIAS, quoted=True)
23022294
),
2303-
exp.column("updated_at", MERGE_TARGET_ALIAS.lower(), quoted=True).eq(
2295+
exp.column("updated_at", MERGE_TARGET_ALIAS, quoted=True).eq(
23042296
exp.Coalesce(
2305-
this=exp.column(
2306-
"updated_at", MERGE_SOURCE_ALIAS.lower(), quoted=True
2307-
),
2297+
this=exp.column("updated_at", MERGE_SOURCE_ALIAS, quoted=True),
23082298
expressions=[
2309-
exp.column(
2310-
"updated_at", MERGE_TARGET_ALIAS.lower(), quoted=True
2311-
)
2299+
exp.column("updated_at", MERGE_TARGET_ALIAS, quoted=True)
23122300
],
23132301
)
23142302
),
@@ -2395,16 +2383,16 @@ def test_create_incremental_by_unique_key_merge_filter(adapter_mock, make_snapsh
23952383
assert model.merge_filter == exp.And(
23962384
this=exp.And(
23972385
this=exp.GT(
2398-
this=exp.column("id", MERGE_SOURCE_ALIAS.lower(), quoted=True),
2386+
this=exp.column("id", MERGE_SOURCE_ALIAS, quoted=True),
23992387
expression=exp.Literal(this="0", is_string=False),
24002388
),
24012389
expression=exp.LT(
2402-
this=exp.column("updated_at", MERGE_TARGET_ALIAS.lower(), quoted=True),
2390+
this=exp.column("updated_at", MERGE_TARGET_ALIAS, quoted=True),
24032391
expression=d.MacroVar(this="end_ds"),
24042392
),
24052393
),
24062394
expression=exp.GT(
2407-
this=exp.column("updated_at", MERGE_SOURCE_ALIAS.lower(), quoted=True),
2395+
this=exp.column("updated_at", MERGE_SOURCE_ALIAS, quoted=True),
24082396
expression=d.MacroVar(this="start_ds"),
24092397
),
24102398
)
@@ -2436,15 +2424,11 @@ def test_create_incremental_by_unique_key_merge_filter(adapter_mock, make_snapsh
24362424
matched=True,
24372425
then=exp.Update(
24382426
expressions=[
2439-
exp.column("updated_at", MERGE_TARGET_ALIAS.lower(), quoted=True).eq(
2427+
exp.column("updated_at", MERGE_TARGET_ALIAS, quoted=True).eq(
24402428
exp.Coalesce(
2441-
this=exp.column(
2442-
"updated_at", MERGE_SOURCE_ALIAS.lower(), quoted=True
2443-
),
2429+
this=exp.column("updated_at", MERGE_SOURCE_ALIAS, quoted=True),
24442430
expressions=[
2445-
exp.column(
2446-
"updated_at", MERGE_TARGET_ALIAS.lower(), quoted=True
2447-
)
2431+
exp.column("updated_at", MERGE_TARGET_ALIAS, quoted=True)
24482432
],
24492433
)
24502434
),
@@ -2456,16 +2440,16 @@ def test_create_incremental_by_unique_key_merge_filter(adapter_mock, make_snapsh
24562440
merge_filter=exp.And(
24572441
this=exp.And(
24582442
this=exp.GT(
2459-
this=exp.column("id", MERGE_SOURCE_ALIAS.lower(), quoted=True),
2443+
this=exp.column("id", MERGE_SOURCE_ALIAS, quoted=True),
24602444
expression=exp.Literal(this="0", is_string=False),
24612445
),
24622446
expression=exp.LT(
2463-
this=exp.column("updated_at", MERGE_TARGET_ALIAS.lower(), quoted=True),
2447+
this=exp.column("updated_at", MERGE_TARGET_ALIAS, quoted=True),
24642448
expression=exp.Literal(this="2020-01-02", is_string=True),
24652449
),
24662450
),
24672451
expression=exp.GT(
2468-
this=exp.column("updated_at", MERGE_SOURCE_ALIAS.lower(), quoted=True),
2452+
this=exp.column("updated_at", MERGE_SOURCE_ALIAS, quoted=True),
24692453
expression=exp.Literal(this="2020-01-01", is_string=True),
24702454
),
24712455
),

tests/dbt/test_config.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ def test_model_to_sqlmesh_fields():
135135
assert kind.on_destructive_change == OnDestructiveChange.ALLOW
136136
assert (
137137
kind.merge_filter.sql(dialect=model.dialect)
138-
== """55 > "__merge_source__"."b" AND "__merge_target__"."session_start" > CURRENT_DATE + INTERVAL '7' DAY"""
138+
== """55 > "__MERGE_SOURCE__"."b" AND "__MERGE_TARGET__"."session_start" > CURRENT_DATE + INTERVAL '7' DAY"""
139139
)
140140

141141
model = model_config.update_with({"dialect": "snowflake"}).to_sqlmesh(context)

0 commit comments

Comments
 (0)