Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/databricks/labs/remorph/reconcile/constants.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from enum import Enum, auto

COLUMN_PLACEHOLDER = "{}"

class AutoName(Enum):
"""
Expand Down
36 changes: 36 additions & 0 deletions src/databricks/labs/remorph/reconcile/metadata/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from dataclasses import dataclass, field
from typing import Any


@dataclass
class TableFQN:
catalog: str | None
schema: str
name: str

@property
def fqn(self) -> str:
if self.catalog:
return f"{self.catalog}.{self.schema}.{self.name}"
else:
return f"{self.schema}.{self.name}"


@dataclass
class FieldInfo:
name: str
data_type: str
nullable: bool | None = None
metadata: dict[str, Any] | None = None


@dataclass
class TableDefinition:
fqn: TableFQN
location: str | None = None
table_format: str | None = None
view_text: str | None = None
columns: list[FieldInfo] = field(default_factory=list)
primary_keys: list[str] | None = None
size_gb: int = 0
comment: str | None = None
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from sqlglot import parse_one
from sqlglot.dialects import Dialect
from sqlglot.expressions import Cast, Column, Expression, DataType

from databricks.labs.remorph.recon.normalization.normalizer import Normalizer


class DatabricksNormalizer(Normalizer):
def __init__(self, dialect: Dialect, default_normalizations: dict[str, str]):
super().__init__(dialect, default_normalizations)

def normalize(self, data_type: DataType, column: Column) -> Expression:
if data_type.this == DataType.Type.BOOLEAN:
return self.to_string(Cast(this=column, to=DataType.Type.INT))
if data_type.this == DataType.Type.DATE:
return parse_one(f"date_format({column.sql()}, 'yyyy-MM-dd')", dialect=self.dialect)
# if TIMESTAMP type
# convert to a string with 'yyyy-MM-dd HH:mm:ss.SSSSSS' format
# if fractional number type
# value = parse_one(f"cast({column.sql()} as decimal(38, {scale}))", dialect=self.dialect)
# if scale > 0:
# value = parse_one(f"format_number({value.sql()}, {scale})", dialect=self.dialect)
# return parse_one(f"replace({self.to_string(value)}, ',', '')", dialect=self.dialect)
return self.apply_default_normalizations(data_type, column)
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from abc import abstractmethod, ABC

from sqlglot import parse_one
from sqlglot.dialects import Dialect
from sqlglot.expressions import Cast, Column, Expression, DataType

from databricks.labs.remorph.reconcile.constants import COLUMN_PLACEHOLDER


class Normalizer(ABC):
def __init__(
self,
dialect: Dialect,
default_normalizations: dict[str, str],
):
self._dialect = dialect
self._default_normalizations = self._prep_default_normalizations(default_normalizations)

@abstractmethod
def normalize(self, data_type: DataType, column: Column) -> Expression:
pass

@property
def dialect(self) -> Dialect:
return self._dialect

def apply_default_normalizations(self, data_type: DataType, column: Column) -> Expression:
transformation = self._default_normalizations.get(data_type)
if not transformation:
return column
if COLUMN_PLACEHOLDER in transformation:
return parse_one(transformation.format(column.sql()), dialect=self.dialect)
return parse_one(transformation, dialect=self.dialect)

def to_string(self, expr: Expression):
return Cast(this=expr, to=DataType.Type.TEXT)

def _prep_default_normalizations(self, default_normalizations: dict[str, str]) -> dict[DataType, str]:
if not default_normalizations:
return {}

normalizations = {}
for dt, expr in default_normalizations.items():
data_type = DataType.build(dt, dialect=self.dialect, udt=True)
normalizations[data_type] = expr
return normalizations
25 changes: 25 additions & 0 deletions src/databricks/labs/remorph/reconcile/normalization/oracle.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from sqlglot.dialects import Dialect
from sqlglot.expressions import Cast, Column, Expression, DataType, Trim

from databricks.labs.remorph.recon.normalization.normalizer import Normalizer


class OracleNormalizer(Normalizer):
def __init__(self, dialect: Dialect, default_normalizations: dict[str, str]):
super().__init__(dialect, default_normalizations)

def normalize(self, data_type: DataType, column: Column) -> Expression:
if data_type.this == DataType.Type.BOOLEAN:
return self.to_string(column)
if data_type.this == DataType.Type.UUID:
return Cast(this=Trim(this=column), to=DataType.build("VARCHAR(36)", dialect=self.dialect))
if data_type.this == DataType.Type.DATE:
return self.to_string(column)
# if TIMESTAMP type
# convert to a string with 'YYYY-MM-DD HH24:MI:SS.FF6' format
# if fractional number type
# convert to a string with FM999.9990 based format based on precision and scale
return self.apply_default_normalizations(data_type, column)

def to_string(self, expr: Expression):
return Cast(this=expr, to=DataType.build("VARCHAR(1024)", dialect=self.dialect))
22 changes: 22 additions & 0 deletions src/databricks/labs/remorph/reconcile/normalization/snow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from sqlglot.dialects import Dialect
from sqlglot.expressions import Cast, Column, Expression, DataType

from databricks.labs.remorph.recon.normalization.normalizer import Normalizer


class SnowNormalizer(Normalizer):
def __init__(self, dialect: Dialect, default_normalizations: dict[str, str]):
super().__init__(dialect, default_normalizations)

def normalize(self, data_type: DataType, column: Column) -> Expression:
if data_type.this == DataType.Type.BOOLEAN:
return self.to_string(Cast(this=column, to=DataType.Type.INT))
if data_type.this == DataType.Type.UUID:
return self.to_string(column)
if data_type.this == DataType.Type.DATE:
return self.to_string(column)
# if TIMESTAMP type
# convert to a string with 'YYYY-MM-DD HH24:MI:SS.FF6' format
# if fractional number type
# convert to a string with CAST(column AS DECIMAL(38, scale))
return self.apply_default_normalizations(data_type, column)
Loading