Skip to content

Commit 33ac43f

Browse files
committed
Feat(trino): Introduce custom timestamp type mapping
1 parent 8efc553 commit 33ac43f

File tree

4 files changed

+314
-11
lines changed

4 files changed

+314
-11
lines changed

sqlmesh/core/config/connection.py

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from packaging import version
1818
from sqlglot import exp
1919
from sqlglot.helper import subclasses
20+
from sqlglot.errors import ParseError
2021

2122
from sqlmesh.core import engine_adapter
2223
from sqlmesh.core.config.base import BaseConfig
@@ -1890,6 +1891,7 @@ class TrinoConnectionConfig(ConnectionConfig):
18901891

18911892
# SQLMesh options
18921893
schema_location_mapping: t.Optional[dict[re.Pattern, str]] = None
1894+
trino_to_delta_timestamp_mapping: t.Optional[dict[exp.DataType, exp.DataType]] = None
18931895
concurrent_tasks: int = 4
18941896
register_comments: bool = True
18951897
pre_ping: t.Literal[False] = False
@@ -1914,6 +1916,47 @@ def _validate_regex_keys(
19141916
)
19151917
return compiled
19161918

1919+
@field_validator("trino_to_delta_timestamp_mapping", mode="before")
@classmethod
def _validate_trino_to_delta_timestamp_mapping(
    cls, value: t.Optional[dict[str, str]]
) -> t.Optional[dict[exp.DataType, exp.DataType]]:
    """Parse and validate the Trino -> Delta Lake timestamp type mapping.

    Every key (source type) and every value (target type) is parsed with
    ``exp.DataType.build``. A target is accepted only if it is equal to the
    parsed ``TIMESTAMP`` or ``TIMESTAMPNTZ`` type.

    Args:
        value: Raw mapping of SQL type strings, or ``None`` when the option
            is not configured.

    Returns:
        ``None`` when ``value`` is ``None``; otherwise a dict mapping each
        parsed source type to its parsed target type.

    Raises:
        ConfigError: If a key or value is not a parseable SQL data type, or
            if a target type is not one of the accepted Delta Lake
            timestamp types.
    """
    if value is None:
        return value

    def _parse(type_str: str) -> exp.DataType:
        # Chain the ParseError so the parser's own message stays attached
        # as the explicit cause of the ConfigError.
        try:
            return exp.DataType.build(type_str)
        except ParseError as ex:
            raise ConfigError(
                "Invalid SQL type string in trino_to_delta_timestamp_mapping: "
                f"'{type_str}' is not a valid SQL data type."
            ) from ex

    # Delta Lake only supports these timestamp types
    valid_delta_types = {
        exp.DataType.build("TIMESTAMP"),
        exp.DataType.build("TIMESTAMPNTZ"),
    }

    result: dict[exp.DataType, exp.DataType] = {}
    for source_type, target_type in value.items():
        source_datatype = _parse(source_type)
        target_datatype = _parse(target_type)
        # Membership is an equality check against the two parsed types, so
        # a parameterized target does not match a bare TIMESTAMP.
        if target_datatype not in valid_delta_types:
            raise ConfigError(
                "Invalid target type in trino_to_delta_timestamp_mapping: "
                f"'{target_type}' is not a valid Delta Lake timestamp type. "
                "Valid types are: TIMESTAMP, TIMESTAMP_NTZ, or TIMESTAMPNTZ."
            )
        result[source_datatype] = target_datatype

    return result
1959+
19171960
@model_validator(mode="after")
19181961
def _root_validator(self) -> Self:
19191962
port = self.port
@@ -2016,7 +2059,10 @@ def _static_connection_kwargs(self) -> t.Dict[str, t.Any]:
20162059

20172060
@property
20182061
def _extra_engine_config(self) -> t.Dict[str, t.Any]:
2019-
return {"schema_location_mapping": self.schema_location_mapping}
2062+
return {
2063+
"schema_location_mapping": self.schema_location_mapping,
2064+
"trino_to_delta_timestamp_mapping": self.trino_to_delta_timestamp_mapping,
2065+
}
20202066

20212067

20222068
class ClickhouseConnectionConfig(ConnectionConfig):

sqlmesh/core/engine_adapter/trino.py

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,12 @@ class TrinoEngineAdapter(
7474
def schema_location_mapping(self) -> t.Optional[t.Dict[re.Pattern, str]]:
7575
return self._extra_config.get("schema_location_mapping")
7676

77+
@property
def trino_to_delta_timestamp_mapping(
    self,
) -> t.Optional[t.Dict[exp.DataType, exp.DataType]]:
    """Return the ``trino_to_delta_timestamp_mapping`` entry of the extra config.

    The value is looked up with ``dict.get``, so ``None`` is returned when
    the key is absent.
    """
    configured_mapping = self._extra_config.get("trino_to_delta_timestamp_mapping")
    return configured_mapping
82+
7783
@property
7884
def catalog_support(self) -> CatalogSupport:
7985
return CatalogSupport.FULL_SUPPORT
@@ -117,7 +123,7 @@ def session(self, properties: SessionProperties) -> t.Iterator[None]:
117123
try:
118124
yield
119125
finally:
120-
self.execute(f"RESET SESSION AUTHORIZATION")
126+
self.execute("RESET SESSION AUTHORIZATION")
121127

122128
def replace_query(
123129
self,
@@ -351,15 +357,25 @@ def _to_delta_ts(
351357
ts6 = exp.DataType.build("timestamp(6)")
352358
ts3_tz = exp.DataType.build("timestamp(3) with time zone")
353359

354-
delta_columns_to_types = {
355-
k: ts6 if v.is_type(exp.DataType.Type.TIMESTAMP) else v
356-
for k, v in columns_to_types.items()
357-
}
358-
359-
delta_columns_to_types = {
360-
k: ts3_tz if v.is_type(exp.DataType.Type.TIMESTAMPTZ) else v
361-
for k, v in delta_columns_to_types.items()
362-
}
360+
delta_columns_to_types = {}
361+
362+
for column, column_type in columns_to_types.items():
363+
target_type = column_type
364+
if (
365+
self.trino_to_delta_timestamp_mapping
366+
and target_type in self.trino_to_delta_timestamp_mapping
367+
):
368+
target_type = self.trino_to_delta_timestamp_mapping[target_type]
369+
# Delta lake's TIMESTAMP type is time zone aware
370+
if target_type.is_type(exp.DataType.Type.TIMESTAMP):
371+
target_type = exp.DataType.build("timestamptz")
372+
elif target_type.is_type(exp.DataType.Type.TIMESTAMPNTZ):
373+
target_type = exp.DataType.build("timestamp")
374+
if target_type.is_type(exp.DataType.Type.TIMESTAMP):
375+
target_type = ts6
376+
elif target_type.is_type(exp.DataType.Type.TIMESTAMPTZ):
377+
target_type = ts3_tz
378+
delta_columns_to_types[column] = target_type
363379

364380
return delta_columns_to_types
365381

tests/core/engine_adapter/test_trino.py

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,109 @@ def test_delta_timestamps(make_mocked_engine_adapter: t.Callable):
404404
}
405405

406406

407+
def test_trino_to_delta_timestamp_mapping():
    """Test that trino_to_delta_timestamp_mapping is exposed on the engine adapter.

    Covers the unset case (``None``) and a two-entry mapping, checking every
    configured entry rather than only the first.
    """
    config = TrinoConnectionConfig(
        user="user",
        host="host",
        catalog="catalog",
    )

    adapter = config.create_engine_adapter()
    assert adapter.trino_to_delta_timestamp_mapping is None

    config = TrinoConnectionConfig(
        user="user",
        host="host",
        catalog="catalog",
        trino_to_delta_timestamp_mapping={
            "TIMESTAMP": "TIMESTAMP",
            "TIMESTAMP(3)": "TIMESTAMP_NTZ",
        },
    )
    adapter = config.create_engine_adapter()
    mapping = adapter.trino_to_delta_timestamp_mapping
    assert mapping is not None
    assert len(mapping) == 2
    assert mapping[exp.DataType.build("TIMESTAMP")] == exp.DataType.build("TIMESTAMP")
    # Expected value is built from the same string that was configured.
    assert mapping[exp.DataType.build("TIMESTAMP(3)")] == exp.DataType.build("TIMESTAMP_NTZ")
432+
433+
434+
def test_delta_timestamps_with_custom_mapping(make_mocked_engine_adapter: t.Callable):
    """Test that _to_delta_ts respects custom trino_to_delta_timestamp_mapping.

    Exercises both Delta Lake targets: TIMESTAMP (time zone aware) and
    TIMESTAMP_NTZ (no time zone).
    """
    # Create config with custom timestamp mapping using valid Delta Lake types
    # Delta Lake only supports: TIMESTAMP (TZ-aware) and TIMESTAMP_NTZ (no TZ)
    config = TrinoConnectionConfig(
        user="user",
        host="host",
        catalog="catalog",
        trino_to_delta_timestamp_mapping={
            "TIMESTAMP": "TIMESTAMP",
            "TIMESTAMP(1)": "TIMESTAMP",
            "TIMESTAMP WITH TIME ZONE": "TIMESTAMP",
            "TIMESTAMP(1) WITH TIME ZONE": "TIMESTAMP",
            "TIMESTAMP(3) WITH TIME ZONE": "TIMESTAMP_NTZ",
        },
    )

    adapter = make_mocked_engine_adapter(
        TrinoEngineAdapter, trino_to_delta_timestamp_mapping=config.trino_to_delta_timestamp_mapping
    )

    ts6 = exp.DataType.build("timestamp(6)")
    ts3_tz = exp.DataType.build("timestamp(3) with time zone")

    columns_to_types = {
        "ts": exp.DataType.build("TIMESTAMP"),
        "ts_1": exp.DataType.build("TIMESTAMP(1)"),
        "ts_tz": exp.DataType.build("TIMESTAMP WITH TIME ZONE"),
        "ts_tz_1": exp.DataType.build("TIMESTAMP(1) WITH TIME ZONE"),
        "ts_tz_3": exp.DataType.build("TIMESTAMP(3) WITH TIME ZONE"),
    }

    delta_columns_to_types = adapter._to_delta_ts(columns_to_types)

    # Types mapped to Delta's TIMESTAMP get converted to TIMESTAMPTZ then ts3_tz
    # Types mapped to Delta's TIMESTAMP_NTZ get converted to TIMESTAMP then ts6
    assert delta_columns_to_types == {
        "ts": ts3_tz,
        "ts_1": ts3_tz,
        "ts_tz": ts3_tz,
        "ts_tz_1": ts3_tz,
        "ts_tz_3": ts6,
    }
472+
473+
474+
def test_delta_timestamps_with_partial_mapping(make_mocked_engine_adapter: t.Callable):
    """Check that _to_delta_ts applies the custom mapping only to listed types."""
    build = exp.DataType.build

    connection_config = TrinoConnectionConfig(
        user="user",
        host="host",
        catalog="catalog",
        trino_to_delta_timestamp_mapping={"TIMESTAMP": "TIMESTAMP"},
    )
    engine_adapter = make_mocked_engine_adapter(
        TrinoEngineAdapter,
        trino_to_delta_timestamp_mapping=connection_config.trino_to_delta_timestamp_mapping,
    )

    source_types = {
        "ts": build("TIMESTAMP"),
        "ts_1": build("TIMESTAMP(1)"),
        "ts_tz": build("TIMESTAMP WITH TIME ZONE"),
    }

    default_ts = build("timestamp(6)")
    default_ts_tz = build("timestamp(3) with time zone")
    expected = {
        # In the mapping: TIMESTAMP -> Delta TIMESTAMP -> TIMESTAMPTZ -> timestamp(3) with time zone
        "ts": default_ts_tz,
        # Mapping keys match exactly, so TIMESTAMP(1) is unmapped and gets the default timestamp(6)
        "ts_1": default_ts,
        # Not in the mapping: default TIMESTAMPTZ -> timestamp(3) with time zone
        "ts_tz": default_ts_tz,
    }

    assert engine_adapter._to_delta_ts(source_types) == expected
508+
509+
407510
def test_table_format(trino_mocked_engine_adapter: TrinoEngineAdapter, mocker: MockerFixture):
408511
adapter = trino_mocked_engine_adapter
409512
mocker.patch(
@@ -755,3 +858,73 @@ def test_insert_overwrite_time_partition_iceberg(
755858
'DELETE FROM "my_catalog"."schema"."test_table" WHERE "b" BETWEEN \'2022-01-01\' AND \'2022-01-02\'',
756859
'INSERT INTO "my_catalog"."schema"."test_table" ("a", "b") SELECT "a", "b" FROM (SELECT "a", "b" FROM "tbl") AS "_subquery" WHERE "b" BETWEEN \'2022-01-01\' AND \'2022-01-02\'',
757860
]
861+
862+
863+
def test_delta_timestamps_with_non_timestamp_columns(make_mocked_engine_adapter: t.Callable):
    """Check that _to_delta_ts leaves non-timestamp columns untouched when a mapping is set."""
    build = exp.DataType.build

    connection_config = TrinoConnectionConfig(
        user="user",
        host="host",
        catalog="catalog",
        trino_to_delta_timestamp_mapping={"TIMESTAMP": "TIMESTAMP"},
    )
    engine_adapter = make_mocked_engine_adapter(
        TrinoEngineAdapter,
        trino_to_delta_timestamp_mapping=connection_config.trino_to_delta_timestamp_mapping,
    )

    source_types = {
        "ts": build("TIMESTAMP"),
        "ts_1": build("TIMESTAMP(1)"),
        "int_col": build("INT"),
        "varchar_col": build("VARCHAR(100)"),
        "decimal_col": build("DECIMAL(10,2)"),
    }

    expected = {
        # In the mapping: TIMESTAMP -> Delta TIMESTAMP -> TIMESTAMPTZ -> timestamp(3) with time zone
        "ts": build("timestamp(3) with time zone"),
        # Mapping keys match exactly, so TIMESTAMP(1) is unmapped and gets the default timestamp(6)
        "ts_1": build("timestamp(6)"),
        # Non-timestamp columns pass through unchanged
        "int_col": build("INT"),
        "varchar_col": build("VARCHAR(100)"),
        "decimal_col": build("DECIMAL(10,2)"),
    }

    assert engine_adapter._to_delta_ts(source_types) == expected
901+
902+
903+
def test_delta_timestamps_with_empty_mapping(make_mocked_engine_adapter: t.Callable):
    """Check that an empty custom mapping makes _to_delta_ts use its default conversions."""
    build = exp.DataType.build

    connection_config = TrinoConnectionConfig(
        user="user",
        host="host",
        catalog="catalog",
        trino_to_delta_timestamp_mapping={},
    )
    engine_adapter = make_mocked_engine_adapter(
        TrinoEngineAdapter,
        trino_to_delta_timestamp_mapping=connection_config.trino_to_delta_timestamp_mapping,
    )

    source_types = {
        "ts": build("TIMESTAMP"),
        "ts_tz": build("TIMESTAMP WITH TIME ZONE"),
    }

    # Nothing is mapped, so both columns take the default conversions.
    expected = {
        "ts": build("timestamp(6)"),
        "ts_tz": build("timestamp(3) with time zone"),
    }

    assert engine_adapter._to_delta_ts(source_types) == expected

tests/core/test_connection_config.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import pytest
66
from _pytest.fixtures import FixtureRequest
7+
from sqlglot import exp
78
from unittest.mock import patch, MagicMock
89

910
from sqlmesh.core.config.connection import (
@@ -444,6 +445,73 @@ def test_trino_catalog_type_override(make_config):
444445
assert config.catalog_type_overrides == {"my_catalog": "iceberg"}
445446

446447

448+
def test_trino_trino_to_delta_timestamp_mapping(make_config):
    """Check parsing and validation of the Trino trino_to_delta_timestamp_mapping option."""
    base_kwargs = dict(
        type="trino",
        user="user",
        host="host",
        catalog="catalog",
    )

    def configure(mapping):
        # Build a Trino config that differs from the base only by the mapping.
        return make_config(**base_kwargs, trino_to_delta_timestamp_mapping=mapping)

    # Option not provided
    assert make_config(**base_kwargs).trino_to_delta_timestamp_mapping is None

    # Valid Delta Lake target types
    parsed = configure(
        {
            "TIMESTAMP": "TIMESTAMP",
            "TIMESTAMP(3)": "TIMESTAMP_NTZ",
        }
    ).trino_to_delta_timestamp_mapping
    assert parsed is not None
    assert parsed[exp.DataType.build("TIMESTAMP")] == exp.DataType.build("TIMESTAMP")

    rejected_cases = [
        # Invalid source type
        ({"INVALID_TYPE": "TIMESTAMP"}, ("Invalid SQL type string", "INVALID_TYPE")),
        # Invalid target type (not a valid SQL type)
        (
            {"TIMESTAMP": "INVALID_TARGET_TYPE"},
            ("Invalid SQL type string", "INVALID_TARGET_TYPE"),
        ),
        # Valid SQL type that is not a valid Delta Lake target
        ({"TIMESTAMP": "TIMESTAMP(6)"}, ("not a valid Delta Lake timestamp type",)),
    ]
    for mapping, expected_fragments in rejected_cases:
        with pytest.raises(ConfigError) as exc_info:
            configure(mapping)
        message = str(exc_info.value)
        for fragment in expected_fragments:
            assert fragment in message

    # Empty mapping is kept as an empty dict, not turned into None
    empty = configure({}).trino_to_delta_timestamp_mapping
    assert empty is not None
    assert empty == {}
513+
514+
447515
def test_duckdb(make_config):
448516
config = make_config(
449517
type="duckdb",

0 commit comments

Comments
 (0)