Skip to content

Commit 10a4ceb

Browse files
fix: Handle reserved words in db, schema and table names with the normalize_name dialect helper
Co-authored-by: Pat Nadolny <[email protected]>
1 parent 0365442 commit 10a4ceb

File tree

5 files changed

+80
-12
lines changed

5 files changed

+80
-12
lines changed

Diff for: .github/workflows/test.yml

+8-5
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,15 @@ jobs:
2121
fail-fast: false
2222
matrix:
2323
python-version:
24-
- "3.8"
25-
- "3.9"
26-
- "3.10"
27-
- "3.11"
2824
- "3.12"
29-
os: ["ubuntu-latest", "macos-latest", "windows-latest"]
25+
# - "3.11"
26+
# - "3.10"
27+
# - "3.9"
28+
# - "3.8"
29+
os:
30+
- "ubuntu-latest"
31+
# - "macos-latest"
32+
# - "windows-latest"
3033
steps:
3134
- uses: actions/checkout@v4
3235
- name: Set up Python ${{ matrix.python-version }}

Diff for: target_snowflake/connector.py

+7-2
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
from snowflake.sqlalchemy import URL
1313
from snowflake.sqlalchemy.base import SnowflakeIdentifierPreparer
1414
from snowflake.sqlalchemy.snowdialect import SnowflakeDialect
15-
from sqlalchemy.sql import quoted_name, text
15+
from sqlalchemy.sql import text
1616

1717
from target_snowflake.snowflake_types import NUMBER, TIMESTAMP_NTZ, VARIANT
1818

@@ -61,6 +61,11 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
6161
self.schema_cache: dict = {}
6262
super().__init__(*args, **kwargs)
6363

64+
@property
65+
def dialect(self) -> SnowflakeDialect:
66+
"""Return a Snowflake dialect instance."""
67+
return self._engine.dialect
68+
6469
def get_table_columns(
6570
self,
6671
full_table_name: str,
@@ -388,7 +393,7 @@ def _get_merge_from_stage_statement( # noqa: ANN202
388393
dedup = f"QUALIFY ROW_NUMBER() OVER (PARTITION BY {dedup_cols} ORDER BY SEQ8() DESC) = 1"
389394
return (
390395
text(
391-
f"merge into {quoted_name(full_table_name, quote=True)} d using " # noqa: ISC003
396+
f"merge into {full_table_name} d using " # noqa: ISC003
392397
+ f"(select {json_casting_selects} from '@~/target-snowflake/{sync_id}'" # noqa: S608
393398
+ f"(file_format => {file_format}) {dedup}) s "
394399
+ f"on {join_expr} "

Diff for: target_snowflake/sinks.py

+5-4
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from singer_sdk.sinks import SQLSink
1818
from snowflake.sqlalchemy.base import SnowflakeIdentifierPreparer
1919
from snowflake.sqlalchemy.snowdialect import SnowflakeDialect
20+
from sqlalchemy.sql import quoted_name
2021

2122
from target_snowflake.connector import SnowflakeConnector
2223

@@ -29,7 +30,7 @@
2930
}
3031

3132

32-
class SnowflakeSink(SQLSink):
33+
class SnowflakeSink(SQLSink[SnowflakeConnector]):
3334
"""Snowflake target sink class."""
3435

3536
connector_class = SnowflakeConnector
@@ -55,16 +56,16 @@ def __init__(
5556
@property
5657
def schema_name(self) -> str | None:
5758
schema = super().schema_name or self.config.get("schema")
58-
return schema.upper() if schema else None
59+
return quoted_name(self.connector.dialect.normalize_name(schema.upper() if schema else None), quote=True)
5960

6061
@property
6162
def database_name(self) -> str | None:
6263
db = super().database_name or self.config.get("database")
63-
return db.upper() if db else None
64+
return quoted_name(self.connector.dialect.normalize_name(db.upper() if db else None), quote=True)
6465

6566
@property
6667
def table_name(self) -> str:
67-
return super().table_name.upper()
68+
return quoted_name(self.connector.dialect.normalize_name(super().table_name.upper()), quote=True)
6869

6970
def setup(self) -> None:
7071
"""Set up Sink.

Diff for: tests/core.py

+58-1
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ def validate(self) -> None:
6666
class SnowflakeTargetCamelcaseComplexSchema(TargetCamelcaseComplexSchema):
6767
def validate(self) -> None:
6868
connector = self.target.default_sink_class.connector_class(self.target.config)
69-
table = f"{self.target.config['database']}.{self.target.config['default_target_schema']}.ForecastingTypeToCategory".upper() # noqa: E501
69+
table = f"{self.target.config['database']}.{self.target.config['default_target_schema']}.\"ForecastingTypeToCategory\"" # noqa: E501
7070
table_schema = connector.get_table(table)
7171
expected_types = {
7272
"id": sqlalchemy.VARCHAR,
@@ -458,6 +458,61 @@ def setup(self) -> None:
458458
)
459459

460460

461+
class SnowflakeTargetExistingReservedNameTableAlter(TargetFileTestTemplate):
462+
name = "existing_reserved_name_table_alter"
463+
# This sends a schema that will request altering from TIMESTAMP_NTZ to VARCHAR
464+
465+
@property
466+
def singer_filepath(self) -> Path:
467+
current_dir = Path(__file__).resolve().parent
468+
return current_dir / "target_test_streams" / "reserved_words_in_table.singer"
469+
470+
def setup(self) -> None:
471+
connector = self.target.default_sink_class.connector_class(self.target.config)
472+
table = f"{self.target.config['database']}.{self.target.config['default_target_schema']}.\"order\"".upper()
473+
connector.connection.execute(
474+
f"""
475+
CREATE OR REPLACE TABLE {table} (
476+
ID VARCHAR(16777216),
477+
COL_STR VARCHAR(16777216),
478+
COL_TS TIMESTAMP_NTZ(9),
479+
COL_INT STRING,
480+
COL_BOOL BOOLEAN,
481+
COL_VARIANT VARIANT,
482+
_SDC_BATCHED_AT TIMESTAMP_NTZ(9),
483+
_SDC_DELETED_AT VARCHAR(16777216),
484+
_SDC_EXTRACTED_AT TIMESTAMP_NTZ(9),
485+
_SDC_RECEIVED_AT TIMESTAMP_NTZ(9),
486+
_SDC_SEQUENCE NUMBER(38,0),
487+
_SDC_TABLE_VERSION NUMBER(38,0),
488+
PRIMARY KEY (ID)
489+
)
490+
""",
491+
)
492+
493+
494+
class SnowflakeTargetReservedWordsInTable(TargetFileTestTemplate):
495+
# Contains reserved words from
496+
# https://docs.snowflake.com/en/sql-reference/reserved-keywords
497+
# Syncs records then alters schema by adding a non-reserved word column.
498+
name = "reserved_words_in_table"
499+
500+
@property
501+
def singer_filepath(self) -> Path:
502+
current_dir = Path(__file__).resolve().parent
503+
return current_dir / "target_test_streams" / "reserved_words_in_table.singer"
504+
505+
def validate(self) -> None:
506+
connector = self.target.default_sink_class.connector_class(self.target.config)
507+
table = f"{self.target.config['database']}.{self.target.config['default_target_schema']}.\"order\"".upper()
508+
result = connector.connection.execute(
509+
f"select * from {table}",
510+
)
511+
assert result.rowcount == 1
512+
row = result.first()
513+
assert len(row) == 13, f"Row has unexpected length {len(row)}"
514+
515+
461516
class SnowflakeTargetTypeEdgeCasesTest(TargetFileTestTemplate):
462517
name = "type_edge_cases"
463518

@@ -540,6 +595,8 @@ def singer_filepath(self) -> Path:
540595
SnowflakeTargetColonsInColName,
541596
SnowflakeTargetExistingTable,
542597
SnowflakeTargetExistingTableAlter,
598+
SnowflakeTargetExistingReservedNameTableAlter,
599+
SnowflakeTargetReservedWordsInTable,
543600
SnowflakeTargetTypeEdgeCasesTest,
544601
SnowflakeTargetColumnOrderMismatch,
545602
],
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
{ "type": "SCHEMA", "stream": "order", "schema": { "properties": { "id": { "type": [ "string", "null" ] }, "col_str": { "type": [ "string", "null" ] }, "col_ts": { "format": "date-time", "type": [ "string", "null" ] }, "col_int": { "type": "integer" }, "col_bool": { "type": [ "boolean", "null" ] }, "col_variant": {"type": "object"} }, "type": "object" }, "key_properties": [ "id" ], "bookmark_properties": [ "col_ts" ] }
2+
{ "type": "RECORD", "stream": "order", "record": { "id": "123", "col_str": "foo", "col_ts": "2023-06-13 11:50:04.072", "col_int": 5, "col_bool": true, "col_variant": {"key": "val"} }, "time_extracted": "2023-06-14T18:08:23.074716+00:00" }

0 commit comments

Comments
 (0)