Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,21 @@
cast,
)

from dagster._core.definitions.partitions.definition.multi import (
MultiPartitionsDefinition,
)
from dagster._core.definitions.partitions.definition.time_window import (
TimeWindowPartitionsDefinition,
)
try:
from dagster._core.definitions.partitions.definition.multi import (
MultiPartitionsDefinition,
)
from dagster._core.definitions.partitions.definition.time_window import (
TimeWindowPartitionsDefinition,
)
except ModuleNotFoundError:
from dagster._core.definitions.multi_dimensional_partitions import (
MultiPartitionsDefinition,
)
from dagster._core.definitions.time_window_partitions import (
TimeWindowPartitionsDefinition,
)

from dagster._core.execution.context.input import InputContext
from dagster._core.execution.context.output import OutputContext
from dagster._core.storage.db_io_manager import DbIOManager, TablePartitionDimension, TableSlice
Expand Down
36 changes: 20 additions & 16 deletions libraries/dagster-delta/dagster_delta/_db_io_manager/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,23 @@
AssetKey,
MultiPartitionKey,
MultiPartitionsDefinition,
PartitionsDefinition,
TimeWindowPartitionsDefinition,
)
from dagster._core.definitions.partitions.utils import TimeWindow

try:
from dagster._core.definitions.partitions.utils import TimeWindow
except ModuleNotFoundError:
from dagster._core.definitions.time_window_partitions import TimeWindow

from dagster._core.storage.db_io_manager import TablePartitionDimension
from pendulum import instance as pdi


def generate_multi_partitions_dimension(
asset_partition_keys: Sequence[str],
asset_partitions_def: MultiPartitionsDefinition,
partition_expr: Mapping[str, str],
asset_partitions_def: MultiPartitionsDefinition | PartitionsDefinition,
partition_expr: Mapping[str, str] | TimeWindow | None,
asset_key: AssetKey,
) -> list[TablePartitionDimension]:
"""Generates multi partition dimensions."""
Expand All @@ -27,7 +33,7 @@ def generate_multi_partitions_dimension(
cast(MultiPartitionKey, partition_key).keys_by_dimension
for partition_key in asset_partition_keys
]
for part in asset_partitions_def.partitions_defs:
for part in asset_partitions_def.partitions_defs: # type: ignore[attr-defined]
partitions: list[TimeWindow | str] = []
for multi_partition_key_mapping in multi_partition_key_mappings:
partition_key = multi_partition_key_mapping[part.name]
Expand All @@ -38,7 +44,7 @@ def generate_multi_partitions_dimension(
else:
partitions.append(partition_key)

partition_expr_str = partition_expr.get(part.name)
partition_expr_str = partition_expr.get(part.name) # type: ignore[attr-defined]
if partition_expr_str is None:
raise ValueError(
f"Asset '{asset_key}' has partition {part.name}, but the"
Expand Down Expand Up @@ -90,7 +96,7 @@ def generate_single_partition_dimension(
if isinstance(asset_partitions_time_window, TimeWindow):
partition_dimension = TablePartitionDimension(
partition_expr=partition_expr,
partitions=(asset_partitions_time_window if asset_partition_keys else []),
partitions=(asset_partitions_time_window if asset_partition_keys else []), # type: ignore
)
else:
partition_dimension = TablePartitionDimension(
Expand Down Expand Up @@ -132,16 +138,14 @@ def hourly_delta(self) -> int:

def is_consecutive(self) -> bool:
    """Check whether the partition time windows start at consecutive, evenly spaced times.

    Builds the set of start times expected when stepping from ``self.start`` in
    increments of ``self.hourly_delta`` hours, one step per distinct partition,
    and compares it with the start times actually present in ``self._partitions``.

    Returns:
        bool: ``True`` when the expected and actual start-time sets are equal.
    """
    # Size the expected range by the number of distinct partitions rather than by
    # the number of days spanned, so that non-daily deltas are handled correctly.
    distinct_partition_count = len(set(self._partitions))
    expected_starts = {
        pdi(self.start).add(hours=self.hourly_delta * step)
        for step in range(distinct_partition_count)
    }

    actual_starts = {pdi(window.start) for window in self._partitions}

    return expected_starts == actual_starts


def date_diff(start: dt.datetime, end: dt.datetime) -> pendulum.Interval:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,15 @@
MultiPartitionsDefinition,
OutputContext,
)
from dagster._core.definitions.partitions.definition.time_window import (
TimeWindowPartitionsDefinition,
)

try:
from dagster._core.definitions.partitions.definition.time_window import (
TimeWindowPartitionsDefinition,
)
except ModuleNotFoundError:
from dagster._core.definitions.time_window_partitions import (
TimeWindowPartitionsDefinition,
)


def extract_date_format_from_partition_definition(
Expand Down Expand Up @@ -64,7 +70,7 @@ def extract_date_format_from_partition_definition(
raise ValueError(
f"Partition_expr mapping is invalid. Partition_dimension :{partition_dims_definition.name} not found in partition_expr: {partition_expr}.",
)
date_format[partition_expr_name] = partition_dims_definition.partitions_def.fmt
date_format[partition_expr_name] = partition_dims_definition.partitions_def.fmt # type: ignore[attr-defined]
else:
raise ValueError(
"MultiPartitionsDefinition provided, so partion_expr needs to be a dictionary mapping of {dimension: column}",
Expand Down
16 changes: 11 additions & 5 deletions libraries/dagster-delta/dagster_delta/_handler/utils/dnf.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
from collections.abc import Iterable, Sequence
from typing import Optional, Union, cast

from dagster._core.definitions.partitions.utils import (
TimeWindow,
)
try:
from dagster._core.definitions.partitions.utils import (
TimeWindow,
)
except ImportError:
from dagster._core.definitions.time_window_partitions import (
TimeWindow,
)

from dagster._core.storage.db_io_manager import TablePartitionDimension
from deltalake.schema import Field as DeltaField
from deltalake.schema import PrimitiveType, Schema
Expand Down Expand Up @@ -79,8 +85,8 @@ def _value_dnf(
start_dt = min(start_dts)
end_dt = max(end_dts)
else:
start_dt = table_partition.partitions.start
end_dt = table_partition.partitions.end
start_dt = table_partition.partitions.start # type: ignore[attr-defined]
end_dt = table_partition.partitions.end # type: ignore[attr-defined]

start_dt = start_dt.strftime(date_format)
end_dt = end_dt.strftime(date_format)
Expand Down
7 changes: 6 additions & 1 deletion libraries/dagster-delta/dagster_delta/io_manager/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@

from dagster import InputContext, OutputContext
from dagster._config.pythonic_config import ConfigurableIOManagerFactory
from dagster._core.definitions.partitions.utils import TimeWindow

try:
from dagster._core.definitions.partitions.utils import TimeWindow
except ModuleNotFoundError:
from dagster._core.definitions.time_window_partitions import TimeWindow

from dagster._core.storage.db_io_manager import (
DbClient,
DbTypeHandler,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,30 +44,118 @@ def daily_partitions_time_window_not_consecutive() -> list[TimeWindow]:
]


def test_multi_time_partitions_checker_consecutive(
@pytest.fixture
def hourly_partitions_time_window_consecutive() -> list[TimeWindow]:
    """Three back-to-back one-hour windows on 2022-01-01, starting at 01:00, 02:00 and 03:00."""
    start_hours = (1, 2, 3)
    return [
        TimeWindow(
            start=dt.datetime(2022, 1, 1, hour),
            end=dt.datetime(2022, 1, 1, hour + 1),
        )
        for hour in start_hours
    ]


@pytest.fixture
def hourly_partitions_time_window_not_consecutive() -> list[TimeWindow]:
    """One-hour windows on 2022-01-01 starting at 01:00, 02:00 and 04:00 (03:00 is missing)."""
    start_hours = (1, 2, 4)
    return [
        TimeWindow(
            start=dt.datetime(2022, 1, 1, hour),
            end=dt.datetime(2022, 1, 1, hour + 1),
        )
        for hour in start_hours
    ]


# NB: Dagster's MonthlyPartitionsDefinition orchestrates the partitions into separate jobs,
# which avoids issues with months having different durations in hours, so we only need to
# check a single month at a time. We also need to check that a single month doesn't collide
# with the code, as happened in the case of PR #20.
@pytest.fixture
def monthly_partitions_time_window() -> list[TimeWindow]:
    """A single window running from 2022-01-01 00:00 to 2022-02-01 00:00."""
    window_start = dt.datetime(2022, 1, 1, 0)
    window_end = dt.datetime(2022, 2, 1, 0)
    return [TimeWindow(start=window_start, end=window_end)]


def test_multi_time_partitions_monthly_checker(
    monthly_partitions_time_window: list[TimeWindow],
):
    """A single month-long window yields a 744-hour delta and counts as consecutive."""
    expected_start = dt.datetime(2022, 1, 1, 0)
    expected_end = dt.datetime(2022, 2, 1, 0)

    checker = utils.MultiTimePartitionsChecker(partitions=monthly_partitions_time_window)

    # 744 hours == 31 days * 24 hours (January).
    assert checker.hourly_delta == 744
    assert checker.start == expected_start
    assert checker.end == expected_end
    assert checker.is_consecutive()


def test_multi_time_partitions_daily_checker_consecutive(
    daily_partitions_time_window_consecutive: list[TimeWindow],
):
    """The consecutive daily fixture yields a 24-hour delta and counts as consecutive."""
    checker = utils.MultiTimePartitionsChecker(
        partitions=daily_partitions_time_window_consecutive,
    )

    # The delta is asserted once, first, matching the order used by the sibling tests.
    assert checker.hourly_delta == 24
    assert checker.start == dt.datetime(2022, 1, 1, 0)
    assert checker.end == dt.datetime(2022, 1, 4, 0)
    assert checker.is_consecutive()


def test_multi_time_partitions_checker_non_consecutive(
def test_multi_time_partitions_daily_checker_non_consecutive(
    daily_partitions_time_window_not_consecutive: list[TimeWindow],
):
    """The non-consecutive daily fixture keeps a 24-hour delta but is not consecutive."""
    expected_start = dt.datetime(2022, 1, 1, 0)
    expected_end = dt.datetime(2022, 1, 5, 0)

    windows = daily_partitions_time_window_not_consecutive
    checker = utils.MultiTimePartitionsChecker(partitions=windows)

    assert checker.hourly_delta == 24
    assert checker.start == expected_start
    assert checker.end == expected_end
    assert not checker.is_consecutive()


def test_multi_time_partitions_hourly_checker_consecutive(
    hourly_partitions_time_window_consecutive: list[TimeWindow],
):
    """Back-to-back hourly windows yield a 1-hour delta and count as consecutive."""
    expected_start = dt.datetime(2022, 1, 1, 1)
    expected_end = dt.datetime(2022, 1, 1, 4)

    windows = hourly_partitions_time_window_consecutive
    checker = utils.MultiTimePartitionsChecker(partitions=windows)

    assert checker.hourly_delta == 1
    assert checker.start == expected_start
    assert checker.end == expected_end
    assert checker.is_consecutive()


def test_multi_time_partitions_hourly_checker_non_consecutive(
    hourly_partitions_time_window_not_consecutive: list[TimeWindow],
):
    """Hourly windows with a missing hour keep a 1-hour delta but are not consecutive."""
    expected_start = dt.datetime(2022, 1, 1, 1)
    expected_end = dt.datetime(2022, 1, 1, 5)

    windows = hourly_partitions_time_window_not_consecutive
    checker = utils.MultiTimePartitionsChecker(partitions=windows)

    assert checker.hourly_delta == 1
    assert checker.start == expected_start
    assert checker.end == expected_end
    assert not checker.is_consecutive()


def test_generate_single_partition_dimension_static():
partition_dimension = utils.generate_single_partition_dimension(
partition_expr="color_column",
Expand Down
2 changes: 1 addition & 1 deletion libraries/dagster-delta/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "dagster-delta"
version = "0.5.1"
version = "0.5.2"
description = "Deltalake IO Managers for Dagster with pyarrow and Polars support."
readme = "README.md"
requires-python = ">=3.9"
Expand Down
4 changes: 2 additions & 2 deletions libraries/dagster-delta/uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.