67 changes: 49 additions & 18 deletions libraries/dagster-delta/README.md
@@ -1,5 +1,5 @@
# dagster-delta
Dagster Delta Lake implementation for PyArrow & Polars. Originally forked from dagster-deltalake, with customizations.

The IO Managers support partition mapping, custom write modes, and special metadata configuration for advanced use cases.

@@ -20,6 +20,7 @@ dagster-delta supports MERGE execution with a couple pre-defined MERGE types (da
- **update_only** <- updates only the matched records
- **upsert** <- updates existing matches and inserts non-matched records
- **replace_and_delete_unmatched** <- updates existing matches and deletes unmatched
- **custom** <- custom Merge with MergeOperationsConfig

Example:
```python
@@ -47,6 +48,36 @@ defs = Definitions(
)
```
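
For reference, a minimal sketch of a non-custom merge setup (an upsert on a single key column); the bucket URI, schema, and the join column `a` are placeholder assumptions rather than values from this PR:

```python
import polars as pl
from dagster import Definitions, asset
from dagster_delta import MergeConfig, MergeType, WriteMode
from dagster_delta_polars import DeltaLakePolarsIOManager

@asset(
    key_prefix=["my_schema"]  # used as the schema (parent folder) in Delta Lake
)
def my_table() -> pl.DataFrame:  # the asset name becomes the table name
    ...

defs = Definitions(
    assets=[my_table],
    resources={
        "io_manager": DeltaLakePolarsIOManager(
            root_uri="s3://bucket",        # placeholder bucket
            mode=WriteMode.merge,          # or just "merge"
            merge_config=MergeConfig(
                merge_type=MergeType.upsert,
                predicate="s.a = t.a",     # placeholder join predicate
                source_alias="s",
                target_alias="t",
            ),
        )
    },
)
```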

Custom merge (gives full control):
```python
import polars as pl
from dagster import Definitions, asset
from dagster_delta import (
    MergeConfig,
    MergeOperationsConfig,
    MergeType,
    WhenMatchedUpdateAll,
    WhenNotMatchedInsertAll,
    WriteMode,
)
from dagster_delta_polars import DeltaLakePolarsIOManager

@asset(
key_prefix=["my_schema"] # will be used as the schema (parent folder) in Delta Lake
)
def my_table() -> pl.DataFrame: # the name of the asset will be the table name
...

defs = Definitions(
assets=[my_table],
resources={"io_manager": DeltaLakePolarsIOManager(
root_uri="s3://bucket",
mode=WriteMode.merge, # or just "merge"
merge_config=MergeConfig(
merge_type=MergeType.custom,
predicate="s.a = t.a",
source_alias="s",
target_alias="t",
merge_operations_config=MergeOperationsConfig(
when_not_matched_insert_all=[WhenNotMatchedInsertAll(predicate="s.price > 600")],
when_matched_update_all=[WhenMatchedUpdateAll()],
),
)
)}
)
```
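
The output handler in this PR also reads `merge_predicate` and `merge_operations_config` from asset definition metadata, so either can be overridden per asset. A small sketch of that idea; the asset name and predicate are illustrative, not taken from the PR:

```python
import polars as pl
from dagster import asset

# Per-asset override: the IO manager's output handler looks up
# "merge_predicate" (and "merge_operations_config") in definition metadata.
@asset(
    key_prefix=["my_schema"],
    metadata={"merge_predicate": "s.a = t.a AND s.date = t.date"},  # illustrative predicate
)
def my_other_table() -> pl.DataFrame:
    ...
```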

## Special metadata configurations

### **Add** additional `table_configuration`
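
A minimal sketch of the idea, assuming the `table_configuration` metadata key read by the output handler; the table property shown is illustrative:

```python
import polars as pl
from dagster import asset

# Extra table properties are merged with the connection's table_config
# when the IO manager creates the Delta table.
@asset(
    key_prefix=["my_schema"],
    metadata={"table_configuration": {"delta.appendOnly": "true"}},  # illustrative property
)
def my_configured_table() -> pl.DataFrame:
    ...
```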
@@ -190,28 +221,28 @@ def asset_partitioned_1(upstream_1: pl.DataFrame, upstream_2: pl.DataFrame) -> p
def asset_partitioned_2(upstream_3: pl.DataFrame, upstream_4: pl.DataFrame) -> pl.DataFrame:
...

```

Effectively this would be the flow:

```
{static_partition_def: [a,b]}
┌───────────┐
│upstream 1 ├─┐   ┌────────────────────────┐
└───────────┘ │   │                        │ write to storage on partition (a,b)
┌───────────┐ └──►│  asset_partitioned_1   ├──────────────────────┐
│upstream 2 ├────►│                        │                      │
└───────────┘     └────────────────────────┘       ┌──────────────▼──────────────────┐
                                                   │ partitions                      │
                                                   │ asset_partitioned:              │
                                                   │ [a,b,c,d]                       │
┌───────────┐     ┌────────────────────────┐       └──────────────▲──────────────────┘
│upstream 3 ├──┐  │                        │                      │
└───────────┘  └─►│  asset_partitioned_2   │                      │
┌───────────┐  ┌─►│                        ├──────────────────────┘
│upstream 4 ├──┘  └────────────────────────┘  write to storage on partition (c,d)
└───────────┘
{static_partition_def: [c,d]}
```
16 changes: 16 additions & 0 deletions libraries/dagster-delta/dagster_delta/__init__.py
@@ -5,8 +5,16 @@
GcsConfig,
LocalConfig,
MergeConfig,
MergeOperationsConfig,
MergeType,
S3Config,
WhenMatchedDelete,
WhenMatchedUpdate,
WhenMatchedUpdateAll,
WhenNotMatchedBySourceDelete,
WhenNotMatchedBySourceUpdate,
WhenNotMatchedInsert,
WhenNotMatchedInsertAll,
)
from dagster_delta.io_manager.arrow import DeltaLakePyarrowIOManager
from dagster_delta.io_manager.base import (
@@ -32,6 +40,14 @@
"DeltaTableResource",
"BaseDeltaLakeIOManager",
"DeltaLakePyarrowIOManager",
"WhenMatchedDelete",
"WhenMatchedUpdate",
"WhenMatchedUpdateAll",
"WhenNotMatchedBySourceDelete",
"WhenNotMatchedBySourceUpdate",
"WhenNotMatchedInsert",
"WhenNotMatchedInsertAll",
"MergeOperationsConfig",
]


7 changes: 6 additions & 1 deletion libraries/dagster-delta/dagster_delta/_handler/base.py
@@ -25,6 +25,7 @@
partition_dimensions_to_dnf,
read_table,
)
from dagster_delta.config import MergeConfig
from dagster_delta.io_manager.base import (
TableConnection,
_DeltaTableIOManagerResourceConfig,
@@ -63,7 +64,10 @@ def handle_output(
logger = logging.getLogger()
logger.setLevel("DEBUG")
definition_metadata = context.definition_metadata or {}

merge_predicate_from_metadata = definition_metadata.get("merge_predicate")
merge_operations_config_from_metadata = definition_metadata.get("merge_operations_config")

additional_table_config = definition_metadata.get("table_configuration", {})
if connection.table_config is not None:
table_config = additional_table_config | connection.table_config
@@ -179,12 +183,13 @@ def handle_output(
merge_stats = merge_execute(
dt,
data,
merge_config,
MergeConfig.model_validate(merge_config),
writer_properties=writer_properties,
commit_properties=commit_properties,
custom_metadata=custom_metadata,
delta_params=delta_params,
merge_predicate_from_metadata=merge_predicate_from_metadata,
merge_operations_config=merge_operations_config_from_metadata,
partition_filters=partition_filters,
)

82 changes: 72 additions & 10 deletions libraries/dagster-delta/dagster_delta/_handler/merge.py
@@ -4,10 +4,10 @@
import pyarrow as pa
import pyarrow.dataset as ds
from deltalake import CommitProperties, DeltaTable, WriterProperties
from deltalake.table import FilterLiteralType
from deltalake.table import FilterLiteralType, TableMerger

from dagster_delta._handler.utils import create_predicate
from dagster_delta.config import MergeType
from dagster_delta.config import MergeConfig, MergeOperationsConfig, MergeType

T = TypeVar("T")
ArrowTypes = Union[pa.Table, pa.RecordBatchReader, ds.Dataset]
@@ -16,25 +16,26 @@
def merge_execute(
dt: DeltaTable,
data: Union[pa.RecordBatchReader, pa.Table],
merge_config: dict[str, Any],
merge_config: MergeConfig,
writer_properties: Optional[WriterProperties],
commit_properties: Optional[CommitProperties],
custom_metadata: Optional[dict[str, str]],
delta_params: dict[str, Any],
merge_predicate_from_metadata: Optional[str],
merge_operations_config: Optional[MergeOperationsConfig],
partition_filters: Optional[list[FilterLiteralType]] = None,
) -> dict[str, Any]:
merge_type = merge_config.get("merge_type")
error_on_type_mismatch = merge_config.get("error_on_type_mismatch", True)
merge_type = merge_config.merge_type
error_on_type_mismatch = merge_config.error_on_type_mismatch

if merge_predicate_from_metadata is not None:
predicate = str(merge_predicate_from_metadata)
elif merge_config.get("predicate") is not None:
predicate = str(merge_config.get("predicate"))
predicate = merge_predicate_from_metadata
elif merge_config.predicate is not None:
predicate = merge_config.predicate
else:
raise Exception("merge predicate was not provided")

target_alias = merge_config.get("target_alias")
target_alias = merge_config.target_alias

if partition_filters is not None:
partition_predicate = create_predicate(partition_filters, target_alias=target_alias)
@@ -47,7 +48,7 @@ def merge_execute(
merger = dt.merge(
source=data,
predicate=predicate,
source_alias=merge_config.get("source_alias"),
source_alias=merge_config.source_alias,
target_alias=target_alias,
error_on_type_mismatch=error_on_type_mismatch,
writer_properties=writer_properties,
@@ -64,5 +65,66 @@ def merge_execute(
return merger.when_matched_update_all().when_not_matched_insert_all().execute()
elif merge_type == MergeType.replace_delete_unmatched:
return merger.when_matched_update_all().when_not_matched_by_source_delete().execute()
elif merge_type == MergeType.custom:
if merge_operations_config is not None:
operations_config = merge_operations_config
elif merge_config.merge_operations_config is not None:
operations_config = merge_config.merge_operations_config
else:
raise Exception("merge operations config was not provided")
operations_config = MergeOperationsConfig.model_validate(operations_config)
return apply_merge_operations(merger, operations_config).execute()
else:
raise NotImplementedError


def apply_merge_operations(
merger: TableMerger,
operations_config: MergeOperationsConfig,
) -> TableMerger:
if operations_config.when_not_matched_insert is not None:
for match in operations_config.when_not_matched_insert:
merger = merger.when_not_matched_insert(
predicate=match.predicate,
updates=match.updates,
)
if operations_config.when_not_matched_insert_all is not None:
for match in operations_config.when_not_matched_insert_all:
merger = merger.when_not_matched_insert_all(
predicate=match.predicate,
except_cols=match.except_cols,
)
if operations_config.when_matched_update is not None:
for match in operations_config.when_matched_update:
merger = merger.when_matched_update(
predicate=match.predicate,
updates=match.updates,
)

if operations_config.when_matched_update_all is not None:
for match in operations_config.when_matched_update_all:
merger = merger.when_matched_update_all(
predicate=match.predicate,
except_cols=match.except_cols,
)

if operations_config.when_matched_delete is not None:
for match in operations_config.when_matched_delete:
merger = merger.when_matched_delete(
predicate=match.predicate,
)

if operations_config.when_not_matched_by_source_delete is not None:
for match in operations_config.when_not_matched_by_source_delete:
merger = merger.when_not_matched_by_source_delete(
predicate=match.predicate,
)

if operations_config.when_not_matched_by_source_update is not None:
for match in operations_config.when_not_matched_by_source_update:
merger = merger.when_not_matched_by_source_update(
updates=match.updates,
predicate=match.predicate,
)

return merger
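
As a rough illustration of how the new custom path chains onto deltalake's `TableMerger`, the sketch below drives `apply_merge_operations` directly; the table URI, source data, and predicates are placeholders, not part of the PR (in normal use the IO manager's handler runs this path):

```python
import pyarrow as pa
from deltalake import DeltaTable

from dagster_delta._handler.merge import apply_merge_operations
from dagster_delta.config import (
    MergeOperationsConfig,
    WhenMatchedUpdateAll,
    WhenNotMatchedInsertAll,
)

dt = DeltaTable("s3://bucket/my_schema/my_table")  # placeholder table URI
source = pa.table({"a": [1, 2], "price": [700, 500]})

merger = dt.merge(
    source=source,
    predicate="s.a = t.a",  # placeholder join predicate
    source_alias="s",
    target_alias="t",
)

# Same operations as the README's custom example: insert unmatched rows
# only when price > 600, update all matched rows.
ops = MergeOperationsConfig(
    when_not_matched_insert_all=[WhenNotMatchedInsertAll(predicate="s.price > 600")],
    when_matched_update_all=[WhenMatchedUpdateAll()],
)
merge_stats = apply_merge_operations(merger, ops).execute()
```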
82 changes: 82 additions & 0 deletions libraries/dagster-delta/dagster_delta/config.py
@@ -252,12 +252,91 @@ class MergeType(str, Enum):
- "update_only" <- updates only the matches records
- "upsert" <- updates existing matches and inserts non matched records
- "replace_and_delete_unmatched" <- updates existing matches and deletes unmatched
- "custom" <- requires MergeOperationsConfig to be provided
"""

deduplicate_insert = "deduplicate_insert" # Deduplicates on write
update_only = "update_only" # updates only the records
upsert = "upsert" # updates and inserts
replace_delete_unmatched = "replace_and_delete_unmatched"
custom = "custom"


class OperationConfig(Config):
"""Basic operation config"""

predicate: Optional[str] = None


class OperationAllConfig(Config):
"""Configuration for `_all` operations"""

predicate: Optional[str] = None
except_cols: Optional[list[str]] = None


class OperationWithUpdatesConfig(Config):
"""Configuration for operations that allow specific column updates"""

updates: dict[str, str]
predicate: Optional[str] = None


class WhenNotMatchedInsert(OperationWithUpdatesConfig):
"""When not matched statement"""

pass


class WhenNotMatchedInsertAll(OperationAllConfig):
"""When not matched insert all statement"""

pass


class WhenMatchedUpdate(OperationWithUpdatesConfig):
"""When matched update statement"""

pass


class WhenMatchedUpdateAll(OperationAllConfig):
"""When matched update all statement"""

pass


class WhenMatchedDelete(OperationConfig):
"""When matched delete statement"""

pass


class WhenNotMatchedBySourceDelete(OperationConfig):
"""When not matched by source delete statement"""

pass


class WhenNotMatchedBySourceUpdate(OperationWithUpdatesConfig):
"""When not matched by source update statement"""

pass


class MergeOperationsConfig(Config):
"""Configuration for each merge operation. Only used with merge_type 'custom'.

If you provide multiple when statements for a single operation, they are evaluated in the order given in the list.
"""

when_not_matched_insert: Optional[list[WhenNotMatchedInsert]] = None
when_not_matched_insert_all: Optional[list[WhenNotMatchedInsertAll]] = None
when_matched_update: Optional[list[WhenMatchedUpdate]] = None
when_matched_update_all: Optional[list[OperationAllConfig]] = None
when_matched_delete: Optional[list[WhenMatchedDelete]] = None
when_not_matched_by_source_delete: Optional[list[WhenNotMatchedBySourceDelete]] = None
when_not_matched_by_source_update: Optional[list[WhenNotMatchedBySourceUpdate]] = None


class MergeConfig(Config):
@@ -279,3 +358,6 @@ class MergeConfig(Config):

error_on_type_mismatch: bool = True
"""specify if merge will return error if data types are mismatching"""

merge_operations_config: Optional[MergeOperationsConfig] = None
"""Full configuration of each merge operation, only use with merge_type='custom'"""