Skip to content

Commit 9dab1bc

Browse files
committed
Cover case where all columns are unique keys
1 parent d3bd903 commit 9dab1bc

File tree

2 files changed

+43
-15
lines changed

2 files changed

+43
-15
lines changed

sqlmesh/core/engine_adapter/mssql.py

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,7 @@ def merge(
212212
if merge_filter:
213213
on = exp.and_(merge_filter, on)
214214

215+
match_expressions = []
215216
if not when_matched:
216217
match_condition = None
217218
unique_key_names = [y.name for y in unique_key]
@@ -230,23 +231,24 @@ def merge(
230231
)
231232
)
232233

233-
match_expressions = [
234-
exp.When(
235-
matched=True,
236-
source=False,
237-
condition=match_condition,
238-
then=exp.Update(
239-
expressions=[
240-
exp.column(col, MERGE_TARGET_ALIAS).eq(
241-
exp.column(col, MERGE_SOURCE_ALIAS)
242-
)
243-
for col in columns_to_types_no_keys
244-
],
245-
),
234+
if target_columns_no_keys:
235+
match_expressions.append(
236+
exp.When(
237+
matched=True,
238+
source=False,
239+
condition=match_condition,
240+
then=exp.Update(
241+
expressions=[
242+
exp.column(col, MERGE_TARGET_ALIAS).eq(
243+
exp.column(col, MERGE_SOURCE_ALIAS)
244+
)
245+
for col in columns_to_types_no_keys
246+
],
247+
),
248+
)
246249
)
247-
]
248250
else:
249-
match_expressions = when_matched.copy().expressions
251+
match_expressions.extend(when_matched.copy().expressions)
250252

251253
match_expressions.append(
252254
exp.When(

tests/core/engine_adapter/test_mssql.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -456,6 +456,8 @@ def test_merge_pandas(
456456
temp_table_mock.return_value = make_temp_table_name(table_name, temp_table_id)
457457

458458
df = pd.DataFrame({"id": [1, 2, 3], "ts": [1, 2, 3], "val": [4, 5, 6]})
459+
460+
# 1 key
459461
adapter.merge(
460462
target_table=table_name,
461463
source_table=df,
@@ -476,6 +478,7 @@ def test_merge_pandas(
476478
f"DROP TABLE IF EXISTS [__temp_target_{temp_table_id}];",
477479
]
478480

481+
# 2 keys
479482
adapter.cursor.reset_mock()
480483
adapter._connection_pool.get().reset_mock()
481484
temp_table_mock.return_value = make_temp_table_name(table_name, temp_table_id)
@@ -499,6 +502,29 @@ def test_merge_pandas(
499502
f"DROP TABLE IF EXISTS [__temp_target_{temp_table_id}];",
500503
]
501504

505+
# all model columns are keys
506+
adapter.cursor.reset_mock()
507+
adapter._connection_pool.get().reset_mock()
508+
temp_table_mock.return_value = make_temp_table_name(table_name, temp_table_id)
509+
adapter.merge(
510+
target_table=table_name,
511+
source_table=df,
512+
columns_to_types={
513+
"id": exp.DataType.build("int"),
514+
"ts": exp.DataType.build("TIMESTAMP"),
515+
},
516+
unique_key=[exp.to_identifier("id"), exp.to_column("ts")],
517+
)
518+
adapter._connection_pool.get().bulk_copy.assert_called_with(
519+
f"__temp_target_{temp_table_id}", [(1, 1), (2, 2), (3, 3)]
520+
)
521+
522+
assert to_sql_calls(adapter) == [
523+
f"""IF NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = '__temp_target_{temp_table_id}') EXEC('CREATE TABLE [__temp_target_{temp_table_id}] ([id] INTEGER, [ts] DATETIME2)');""",
524+
f"MERGE INTO [target] AS [__MERGE_TARGET__] USING (SELECT CAST([id] AS INTEGER) AS [id], CAST([ts] AS DATETIME2) AS [ts] FROM [__temp_target_{temp_table_id}]) AS [__MERGE_SOURCE__] ON [__MERGE_TARGET__].[id] = [__MERGE_SOURCE__].[id] AND [__MERGE_TARGET__].[ts] = [__MERGE_SOURCE__].[ts] WHEN NOT MATCHED THEN INSERT ([id], [ts]) VALUES ([__MERGE_SOURCE__].[id], [__MERGE_SOURCE__].[ts]);",
525+
f"DROP TABLE IF EXISTS [__temp_target_{temp_table_id}];",
526+
]
527+
502528

503529
def test_replace_query(make_mocked_engine_adapter: t.Callable):
504530
adapter = make_mocked_engine_adapter(MSSQLEngineAdapter)

0 commit comments

Comments
 (0)