Skip to content

Commit 58d6bdd

Browse files
construct dml merge and copy into
1 parent 03727a7 commit 58d6bdd

File tree

2 files changed

+25
-2
lines changed

2 files changed

+25
-2
lines changed

.github/workflows/test.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,4 +41,4 @@ jobs:
4141
poetry install
4242
- name: Test with pytest
4343
run: |
44-
poetry run pytest -n auto
44+
poetry run pytest -s -n auto

target_snowflake/connector.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from __future__ import annotations
22

3+
import sys
34
from operator import contains, eq
45
from typing import TYPE_CHECKING, Any, Iterable, Sequence, cast
56

@@ -9,8 +10,8 @@
910
from cryptography.hazmat.primitives import serialization
1011
from singer_sdk import typing as th
1112
from singer_sdk.connectors import SQLConnector
12-
from snowflake.sqlalchemy import URL
1313
from snowflake.sqlalchemy.base import SnowflakeIdentifierPreparer
14+
from snowflake.sqlalchemy.custom_commands import CopyInto, MergeInto
1415
from snowflake.sqlalchemy.snowdialect import SnowflakeDialect
1516
from sqlalchemy.sql import quoted_name, text
1617

@@ -386,6 +387,19 @@ def _get_merge_from_stage_statement( # noqa: ANN202
386387
)
387388
dedup_cols = ", ".join(list(formatted_key_properties))
388389
dedup = f"QUALIFY ROW_NUMBER() OVER (PARTITION BY {dedup_cols} ORDER BY SEQ8() DESC) = 1"
390+
391+
merge = MergeInto(
392+
target=full_table_name,
393+
source=text(f"""
394+
(select {json_casting_selects} from '@~/target-snowflake/{sync_id}'
395+
(file_format => {file_format}) {dedup}) s
396+
"""), # noqa: S608
397+
on=join_expr,
398+
)
399+
merge.when_matched_then_update()
400+
merge.when_not_matched_then_insert()
401+
print(">>> MERGE INTO:", merge.compile(self._engine), file=sys.stderr)
402+
389403
return (
390404
text(
391405
f"merge into {quoted_name(full_table_name, quote=True)} d using " # noqa: ISC003
@@ -411,6 +425,15 @@ def _get_copy_statement(self, full_table_name, schema, sync_id, file_format): #
411425
column_selections,
412426
"col_alias",
413427
)
428+
copy_into = CopyInto(
429+
from_=text(f"""
430+
(select {json_casting_selects} from '@~/target-snowflake/{sync_id}')
431+
"""), # noqa: S608
432+
into=full_table_name,
433+
formatter=formatter,
434+
)
435+
print(">>> COPY INTO:", copy_into.compile(self._engine), file=sys.stderr)
436+
414437
return (
415438
text(
416439
f"copy into {full_table_name} {col_alias_selects} from " # noqa: ISC003

0 commit comments

Comments
 (0)