Skip to content

Commit e5ee298

Browse files
committed
Improve MSSQL merge performance
1 parent 5228105 commit e5ee298

File tree

2 files changed

+87
-4
lines changed

2 files changed

+87
-4
lines changed

sqlmesh/core/engine_adapter/mssql.py

Lines changed: 85 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,13 @@
66

77
from sqlglot import exp
88

9-
from sqlmesh.core.dialect import to_schema
9+
from sqlmesh.core.dialect import to_schema, add_table
1010
from sqlmesh.core.engine_adapter.base import (
1111
EngineAdapterWithIndexSupport,
1212
EngineAdapter,
1313
InsertOverwriteStrategy,
14+
MERGE_SOURCE_ALIAS,
15+
MERGE_TARGET_ALIAS,
1416
)
1517
from sqlmesh.core.engine_adapter.mixins import (
1618
GetCurrentCatalogFromFunctionMixin,
@@ -32,7 +34,7 @@
3234

3335
if t.TYPE_CHECKING:
3436
from sqlmesh.core._typing import SchemaName, TableName
35-
from sqlmesh.core.engine_adapter._typing import DF, Query
37+
from sqlmesh.core.engine_adapter._typing import DF, Query, QueryOrDF
3638

3739

3840
@set_catalog()
@@ -188,6 +190,87 @@ def drop_schema(
188190
)
189191
super().drop_schema(schema_name, ignore_if_not_exists=ignore_if_not_exists, cascade=False)
190192

193+
def merge(
194+
self,
195+
target_table: TableName,
196+
source_table: QueryOrDF,
197+
columns_to_types: t.Optional[t.Dict[str, exp.DataType]],
198+
unique_key: t.Sequence[exp.Expression],
199+
when_matched: t.Optional[exp.Whens] = None,
200+
merge_filter: t.Optional[exp.Expression] = None,
201+
) -> None:
202+
source_queries, columns_to_types = self._get_source_queries_and_columns_to_types(
203+
source_table, columns_to_types, target_table=target_table
204+
)
205+
columns_to_types = columns_to_types or self.columns(target_table)
206+
on = exp.and_(
207+
*(
208+
add_table(part, MERGE_TARGET_ALIAS).eq(add_table(part, MERGE_SOURCE_ALIAS))
209+
for part in unique_key
210+
)
211+
)
212+
if merge_filter:
213+
on = exp.and_(merge_filter, on)
214+
215+
if not when_matched:
216+
match_condition = None
217+
unique_key_names = [y.name for y in unique_key]
218+
columns_to_types_no_keys = [c for c in columns_to_types if c not in unique_key_names]
219+
220+
target_columns_no_keys = [
221+
exp.column(c, MERGE_TARGET_ALIAS) for c in columns_to_types_no_keys
222+
]
223+
source_columns_no_keys = [
224+
exp.column(c, MERGE_SOURCE_ALIAS) for c in columns_to_types_no_keys
225+
]
226+
227+
match_condition = exp.Exists(
228+
this=exp.select(*target_columns_no_keys).except_(
229+
exp.select(*source_columns_no_keys)
230+
)
231+
)
232+
233+
match_expressions = [
234+
exp.When(
235+
matched=True,
236+
source=False,
237+
condition=match_condition,
238+
then=exp.Update(
239+
expressions=[
240+
exp.column(col, MERGE_TARGET_ALIAS).eq(
241+
exp.column(col, MERGE_SOURCE_ALIAS)
242+
)
243+
for col in columns_to_types_no_keys
244+
],
245+
),
246+
)
247+
]
248+
else:
249+
match_expressions = when_matched.copy().expressions
250+
251+
match_expressions.append(
252+
exp.When(
253+
matched=False,
254+
source=False,
255+
then=exp.Insert(
256+
this=exp.Tuple(expressions=[exp.column(col) for col in columns_to_types]),
257+
expression=exp.Tuple(
258+
expressions=[
259+
exp.column(col, MERGE_SOURCE_ALIAS) for col in columns_to_types
260+
]
261+
),
262+
),
263+
)
264+
)
265+
for source_query in source_queries:
266+
with source_query as query:
267+
self._merge(
268+
target_table=target_table,
269+
query=query,
270+
on=on,
271+
whens=exp.Whens(expressions=match_expressions),
272+
)
273+
191274
def _convert_df_datetime(self, df: DF, columns_to_types: t.Dict[str, exp.DataType]) -> None:
192275
import pandas as pd
193276
from pandas.api.types import is_datetime64_any_dtype # type: ignore

tests/core/engine_adapter/test_mssql.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -472,7 +472,7 @@ def test_merge_pandas(
472472

473473
assert to_sql_calls(adapter) == [
474474
f"""IF NOT EXISTS (SELECT * FROM information_schema.tables WHERE table_name = '__temp_target_{temp_table_id}') EXEC('CREATE TABLE [__temp_target_{temp_table_id}] ([id] INTEGER, [ts] DATETIME2, [val] INTEGER)');""",
475-
f"MERGE INTO [target] AS [__MERGE_TARGET__] USING (SELECT CAST([id] AS INTEGER) AS [id], CAST([ts] AS DATETIME2) AS [ts], CAST([val] AS INTEGER) AS [val] FROM [__temp_target_{temp_table_id}]) AS [__MERGE_SOURCE__] ON [__MERGE_TARGET__].[id] = [__MERGE_SOURCE__].[id] WHEN MATCHED THEN UPDATE SET [__MERGE_TARGET__].[id] = [__MERGE_SOURCE__].[id], [__MERGE_TARGET__].[ts] = [__MERGE_SOURCE__].[ts], [__MERGE_TARGET__].[val] = [__MERGE_SOURCE__].[val] WHEN NOT MATCHED THEN INSERT ([id], [ts], [val]) VALUES ([__MERGE_SOURCE__].[id], [__MERGE_SOURCE__].[ts], [__MERGE_SOURCE__].[val]);",
475+
f"MERGE INTO [target] AS [__MERGE_TARGET__] USING (SELECT CAST([id] AS INTEGER) AS [id], CAST([ts] AS DATETIME2) AS [ts], CAST([val] AS INTEGER) AS [val] FROM [__temp_target_{temp_table_id}]) AS [__MERGE_SOURCE__] ON [__MERGE_TARGET__].[id] = [__MERGE_SOURCE__].[id] WHEN MATCHED AND EXISTS(SELECT [__MERGE_TARGET__].[ts], [__MERGE_TARGET__].[val] EXCEPT SELECT [__MERGE_SOURCE__].[ts], [__MERGE_SOURCE__].[val]) THEN UPDATE SET [__MERGE_TARGET__].[ts] = [__MERGE_SOURCE__].[ts], [__MERGE_TARGET__].[val] = [__MERGE_SOURCE__].[val] WHEN NOT MATCHED THEN INSERT ([id], [ts], [val]) VALUES ([__MERGE_SOURCE__].[id], [__MERGE_SOURCE__].[ts], [__MERGE_SOURCE__].[val]);",
476476
f"DROP TABLE IF EXISTS [__temp_target_{temp_table_id}];",
477477
]
478478

@@ -495,7 +495,7 @@ def test_merge_pandas(
495495

496496
assert to_sql_calls(adapter) == [
497497
f"""IF NOT EXISTS (SELECT * FROM information_schema.tables WHERE table_name = '__temp_target_{temp_table_id}') EXEC('CREATE TABLE [__temp_target_{temp_table_id}] ([id] INTEGER, [ts] DATETIME2, [val] INTEGER)');""",
498-
f"MERGE INTO [target] AS [__MERGE_TARGET__] USING (SELECT CAST([id] AS INTEGER) AS [id], CAST([ts] AS DATETIME2) AS [ts], CAST([val] AS INTEGER) AS [val] FROM [__temp_target_{temp_table_id}]) AS [__MERGE_SOURCE__] ON [__MERGE_TARGET__].[id] = [__MERGE_SOURCE__].[id] AND [__MERGE_TARGET__].[ts] = [__MERGE_SOURCE__].[ts] WHEN MATCHED THEN UPDATE SET [__MERGE_TARGET__].[id] = [__MERGE_SOURCE__].[id], [__MERGE_TARGET__].[ts] = [__MERGE_SOURCE__].[ts], [__MERGE_TARGET__].[val] = [__MERGE_SOURCE__].[val] WHEN NOT MATCHED THEN INSERT ([id], [ts], [val]) VALUES ([__MERGE_SOURCE__].[id], [__MERGE_SOURCE__].[ts], [__MERGE_SOURCE__].[val]);",
498+
f"MERGE INTO [target] AS [__MERGE_TARGET__] USING (SELECT CAST([id] AS INTEGER) AS [id], CAST([ts] AS DATETIME2) AS [ts], CAST([val] AS INTEGER) AS [val] FROM [__temp_target_{temp_table_id}]) AS [__MERGE_SOURCE__] ON [__MERGE_TARGET__].[id] = [__MERGE_SOURCE__].[id] AND [__MERGE_TARGET__].[ts] = [__MERGE_SOURCE__].[ts] WHEN MATCHED AND EXISTS(SELECT [__MERGE_TARGET__].[val] EXCEPT SELECT [__MERGE_SOURCE__].[val]) THEN UPDATE SET [__MERGE_TARGET__].[val] = [__MERGE_SOURCE__].[val] WHEN NOT MATCHED THEN INSERT ([id], [ts], [val]) VALUES ([__MERGE_SOURCE__].[id], [__MERGE_SOURCE__].[ts], [__MERGE_SOURCE__].[val]);",
499499
f"DROP TABLE IF EXISTS [__temp_target_{temp_table_id}];",
500500
]
501501

0 commit comments

Comments
 (0)