Skip to content

Commit 867a482

Browse files
authored
add dynamic store connection APIs (#28655)
## Summary & Motivation

Adds paginated connection API methods for fetching partition keys from the dynamic partitions store (backed by the instance / event log storage).

## How I Tested These Changes

BK
1 parent 43722d7 commit 867a482

File tree

10 files changed

+209
-0
lines changed

10 files changed

+209
-0
lines changed

python_modules/dagster/dagster/_core/definitions/partition.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -479,6 +479,14 @@ def __init__(self, instance: DagsterInstance):
479479
def get_dynamic_partitions(self, partitions_def_name: str) -> Sequence[str]:
480480
return self._instance.get_dynamic_partitions(partitions_def_name)
481481

482+
@cached_method
def get_paginated_dynamic_partitions(
    self,
    partitions_def_name: str,
    limit: int,
    ascending: bool,
    cursor: Optional[str] = None,
) -> PaginatedResults[str]:
    """Return one page of partition keys for a dynamic partitions definition.

    This is a thin pass-through to the wrapped instance; all arguments are
    forwarded unchanged as keyword arguments.

    Args:
        partitions_def_name: Name of the dynamic partitions definition.
        limit: Page size forwarded to the instance.
        ascending: Ordering flag forwarded to the instance.
        cursor: Optional pagination cursor forwarded to the instance.
    """
    page = self._instance.get_paginated_dynamic_partitions(
        partitions_def_name=partitions_def_name,
        limit=limit,
        ascending=ascending,
        cursor=cursor,
    )
    return page
489+
482490
@cached_method
483491
def has_dynamic_partition(self, partitions_def_name: str, partition_key: str) -> bool:
484492
return self._instance.has_dynamic_partition(partitions_def_name, partition_key)

python_modules/dagster/dagster/_core/definitions/run_request.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
ASSET_PARTITION_RANGE_START_TAG,
2929
PARTITION_NAME_TAG,
3030
)
31+
from dagster._core.types.pagination import PaginatedResults
3132
from dagster._record import IHaveNew, LegacyNamedTupleMixin, record, record_custom
3233
from dagster._utils.cached_method import cached_method
3334
from dagster._utils.error import SerializableErrorInfo
@@ -294,6 +295,15 @@ def get_dynamic_partitions(self, partitions_def_name: str) -> Sequence[str]:
294295
)
295296
return list((partition_keys | added_partition_keys) - deleted_partition_keys)
296297

298+
@cached_method
def get_paginated_dynamic_partitions(
    self,
    partitions_def_name: str,
    limit: int,
    ascending: bool,
    cursor: Optional[str] = None,
) -> PaginatedResults[str]:
    """Return one page of the partition keys reported by ``get_dynamic_partitions``.

    The full key list is fetched first and then sliced into a page by
    ``PaginatedResults.create_from_sequence``.

    Args:
        partitions_def_name: Name of the dynamic partitions definition.
        limit: Page size handed to ``create_from_sequence``.
        ascending: Ordering flag handed to ``create_from_sequence``.
        cursor: Optional pagination cursor handed to ``create_from_sequence``.
    """
    # NOTE(review): get_dynamic_partitions appears to build its list from set
    # arithmetic, so the order being paginated may not be stable -- confirm.
    all_keys = self.get_dynamic_partitions(partitions_def_name)
    return PaginatedResults.create_from_sequence(
        seq=all_keys,
        limit=limit,
        ascending=ascending,
        cursor=cursor,
    )
306+
297307
def has_dynamic_partition(self, partitions_def_name: str, partition_key: str) -> bool:
298308
return partition_key not in self.deleted_partition_keys_by_partitions_def_name.get(
299309
partitions_def_name, set()

python_modules/dagster/dagster/_core/instance/__init__.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@
8181
TAGS_TO_MAYBE_OMIT_ON_RETRY,
8282
WILL_RETRY_TAG,
8383
)
84+
from dagster._core.types.pagination import PaginatedResults
8485
from dagster._serdes import ConfigurableClass
8586
from dagster._time import get_current_datetime, get_current_timestamp
8687
from dagster._utils import PrintFn, is_uuid, traced
@@ -336,6 +337,11 @@ class DynamicPartitionsStore(Protocol):
336337
@abstractmethod
337338
def get_dynamic_partitions(self, partitions_def_name: str) -> Sequence[str]: ...
338339

340+
@abstractmethod
def get_paginated_dynamic_partitions(
    self,
    partitions_def_name: str,
    limit: int,
    ascending: bool,
    cursor: Optional[str] = None,
) -> PaginatedResults[str]:
    """Return one page of partition keys for the named dynamic partitions definition.

    Protocol stub: implementers supply the body.
    """
    ...
344+
339345
@abstractmethod
340346
def has_dynamic_partition(self, partitions_def_name: str, partition_key: str) -> bool: ...
341347

@@ -2397,6 +2403,26 @@ def get_dynamic_partitions(self, partitions_def_name: str) -> Sequence[str]:
23972403
check.str_param(partitions_def_name, "partitions_def_name")
23982404
return self._event_storage.get_dynamic_partitions(partitions_def_name)
23992405

2406+
@traced
def get_paginated_dynamic_partitions(
    self,
    partitions_def_name: str,
    limit: int,
    ascending: bool,
    cursor: Optional[str] = None,
) -> PaginatedResults[str]:
    """Fetch one page of partition keys for the specified :py:class:`DynamicPartitionsDefinition`.

    Args:
        partitions_def_name (str): The name of the `DynamicPartitionsDefinition`.
        limit (int): Maximum number of partition keys to return.
        ascending (bool): The order of dynamic partitions to return.
        cursor (Optional[str]): Cursor to use for pagination. Defaults to None.

    Returns:
        PaginatedResults[str]: The page produced by the event log storage.
    """
    # Validate every argument before touching storage.
    check.str_param(partitions_def_name, "partitions_def_name")
    check.int_param(limit, "limit")
    check.bool_param(ascending, "ascending")
    check.opt_str_param(cursor, "cursor")

    event_storage = self._event_storage
    return event_storage.get_paginated_dynamic_partitions(
        partitions_def_name=partitions_def_name,
        limit=limit,
        ascending=ascending,
        cursor=cursor,
    )
2425+
24002426
@public
24012427
@traced
24022428
def add_dynamic_partitions(

python_modules/dagster/dagster/_core/storage/event_log/base.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
from dagster._core.storage.partition_status_cache import get_and_update_asset_status_cache_value
3737
from dagster._core.storage.sql import AlembicVersion
3838
from dagster._core.storage.tags import MULTIDIMENSIONAL_PARTITION_PREFIX
39+
from dagster._core.types.pagination import PaginatedResults
3940
from dagster._record import record
4041
from dagster._utils import PrintFn
4142
from dagster._utils.concurrency import ConcurrencyClaimStatus, ConcurrencyKeyInfo
@@ -524,6 +525,12 @@ def get_dynamic_partitions(self, partitions_def_name: str) -> Sequence[str]:
524525
"""Get the list of partition keys for a dynamic partitions definition."""
525526
raise NotImplementedError()
526527

528+
@abstractmethod
def get_paginated_dynamic_partitions(
    self,
    partitions_def_name: str,
    limit: int,
    ascending: bool,
    cursor: Optional[str] = None,
) -> PaginatedResults[str]:
    """Get one page of partition keys for a dynamic partitions definition.

    Concrete storages must override this; the base implementation always raises.

    Raises:
        NotImplementedError: Always, when not overridden.
    """
    raise NotImplementedError()
533+
527534
@abstractmethod
528535
def has_dynamic_partition(self, partitions_def_name: str, partition_key: str) -> bool:
529536
"""Check if a dynamic partition exists."""

python_modules/dagster/dagster/_core/storage/event_log/sql_event_log.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@
9595
db_select,
9696
db_subquery,
9797
)
98+
from dagster._core.types.pagination import PaginatedResults, StorageIdCursor
9899
from dagster._serdes import deserialize_value, serialize_value
99100
from dagster._time import datetime_from_timestamp, get_current_timestamp, utc_datetime_from_naive
100101
from dagster._utils import PrintFn
@@ -2027,6 +2028,47 @@ def get_dynamic_partitions(self, partitions_def_name: str) -> Sequence[str]:
20272028

20282029
return [cast("str", row[1]) for row in rows]
20292030

2031+
def get_paginated_dynamic_partitions(
    self, partitions_def_name: str, limit: int, ascending: bool, cursor: Optional[str] = None
) -> PaginatedResults[str]:
    """Return one page of partition keys, ordered by the row id of the partitions table.

    Args:
        partitions_def_name: Only rows with this definition name are returned.
        limit: Maximum number of rows fetched for the page.
        ascending: If True, order by id ascending and page forward from the
            cursor (id > cursor id); otherwise order descending and page
            backward (id < cursor id).
        cursor: Optional serialized ``StorageIdCursor`` from a previous page.
            A falsy value (``None`` or ``""``) starts from the beginning.

    Returns:
        PaginatedResults[str]: The partition keys, a cursor for the next call,
        and ``has_more``, which is True exactly when a full page was fetched.
    """
    self._check_partitions_table()

    id_column = DynamicPartitionsTable.c.id
    ordering = id_column.asc() if ascending else id_column.desc()
    query = (
        db_select([id_column, DynamicPartitionsTable.c.partition])
        .where(DynamicPartitionsTable.c.partitions_def_name == partitions_def_name)
        .order_by(ordering)
        .limit(limit)
    )

    if cursor:
        # Resume strictly past the last row id handed out by the previous page.
        last_storage_id = StorageIdCursor.from_cursor(cursor).storage_id
        query = query.where(
            id_column > last_storage_id if ascending else id_column < last_storage_id
        )

    with self.index_connection() as conn:
        rows = conn.execute(query).fetchall()

    if rows:
        # Point the next cursor at the last row of this page.
        next_cursor = StorageIdCursor(storage_id=cast(int, rows[-1][0])).to_string()
    else:
        # Empty page: keep the caller's cursor if there was one, otherwise emit
        # a sentinel cursor with storage id -1.
        # NOTE(review): with ascending=False a -1 cursor filters on id < -1,
        # which presumably never matches -- confirm this is intended.
        next_cursor = cursor or StorageIdCursor(storage_id=-1).to_string()

    partition_keys = [cast(str, row[1]) for row in rows]
    # A full page is treated as "there may be more"; an exact fit still reports True.
    return PaginatedResults(
        results=partition_keys,
        cursor=next_cursor,
        has_more=len(rows) == limit,
    )
2071+
20302072
def has_dynamic_partition(self, partitions_def_name: str, partition_key: str) -> bool:
20312073
self._check_partitions_table()
20322074
query = (

python_modules/dagster/dagster/_core/storage/legacy_storage.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
from dagster._core.storage.runs.base import RunStorage
3030
from dagster._core.storage.schedules.base import ScheduleStorage
3131
from dagster._core.storage.sql import AlembicVersion
32+
from dagster._core.types.pagination import PaginatedResults
3233
from dagster._serdes import ConfigurableClass, ConfigurableClassData
3334
from dagster._utils import PrintFn
3435
from dagster._utils.concurrency import ConcurrencyClaimStatus, ConcurrencyKeyInfo
@@ -621,6 +622,13 @@ def get_latest_asset_partition_materialization_attempts_without_materializations
621622
def get_dynamic_partitions(self, partitions_def_name: str) -> Sequence[str]:
622623
return self._storage.event_log_storage.get_dynamic_partitions(partitions_def_name)
623624

625+
def get_paginated_dynamic_partitions(
    self,
    partitions_def_name: str,
    limit: int,
    ascending: bool,
    cursor: Optional[str] = None,
) -> PaginatedResults[str]:
    """Forward the paginated partition-key lookup to the wrapped event log storage."""
    event_log_storage = self._storage.event_log_storage
    return event_log_storage.get_paginated_dynamic_partitions(
        partitions_def_name=partitions_def_name,
        limit=limit,
        ascending=ascending,
        cursor=cursor,
    )
631+
624632
def has_dynamic_partition(self, partitions_def_name: str, partition_key: str) -> bool:
625633
return self._storage.event_log_storage.has_dynamic_partition(
626634
partitions_def_name, partition_key

python_modules/dagster/dagster/_core/types/pagination.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,3 +83,22 @@ def to_string(self) -> str:
8383
@classmethod
8484
def from_cursor(cls, cursor: str):
8585
return deserialize_value(base64.b64decode(cursor).decode("utf-8"), cls)
86+
87+
88+
@whitelist_for_serdes
@record
class StorageIdCursor:
    """Pagination cursor that wraps a single integer storage id.

    The string form is the serialized record, UTF-8 encoded and then
    base64 encoded; ``from_cursor`` reverses those steps.
    """

    # Integer id that the cursor points at.
    storage_id: int

    def __str__(self) -> str:
        return self.to_string()

    def to_string(self) -> str:
        """Serialize this cursor and return it as a base64 text string."""
        serialized = serialize_value(self)
        encoded = base64.b64encode(serialized.encode("utf-8"))
        return encoded.decode("utf-8")

    @classmethod
    def from_cursor(cls, cursor: str):
        """Decode a string produced by ``to_string`` back into a cursor."""
        decoded = base64.b64decode(cursor).decode("utf-8")
        return deserialize_value(decoded, cls)

python_modules/dagster/dagster/_utils/caching_instance_queryer.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
RunRecord,
3535
)
3636
from dagster._core.storage.tags import PARTITION_NAME_TAG
37+
from dagster._core.types.pagination import PaginatedResults
3738
from dagster._time import get_current_datetime
3839
from dagster._utils.cached_method import cached_method
3940

@@ -644,6 +645,23 @@ def get_dynamic_partitions(self, partitions_def_name: str) -> Sequence[str]:
644645
)
645646
return self._dynamic_partitions_cache[partitions_def_name]
646647

648+
def get_paginated_dynamic_partitions(
    self, partitions_def_name: str, limit: int, ascending: bool, cursor: Optional[str] = None
) -> PaginatedResults[str]:
    """Return one page of partition keys, using the in-memory cache when it is populated.

    If the keys for ``partitions_def_name`` are already in the dynamic
    partitions cache, the page is sliced from that cached sequence; otherwise
    the request is forwarded to the instance. This method does not populate
    the cache itself.
    """
    if partitions_def_name in self._dynamic_partitions_cache:
        # The full set of partition keys is cached, so build the page from the
        # cached sequence.
        cached_keys = self._dynamic_partitions_cache[partitions_def_name]
        return PaginatedResults.create_from_sequence(
            seq=cached_keys,
            limit=limit,
            ascending=ascending,
            cursor=cursor,
        )

    return self.instance.get_paginated_dynamic_partitions(
        partitions_def_name=partitions_def_name,
        limit=limit,
        ascending=ascending,
        cursor=cursor,
    )
664+
647665
def has_dynamic_partition(self, partitions_def_name: str, partition_key: str) -> bool:
648666
return partition_key in self.get_dynamic_partitions(partitions_def_name)
649667

python_modules/dagster/dagster_tests/definitions_tests/test_dynamic_partitions.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,9 @@ def my_asset():
8383

8484
instance.add_dynamic_partitions("foo", ["a"])
8585
assert partitions_def.get_partition_keys(dynamic_partitions_store=instance) == ["a"]
86+
assert get_paginated_partition_keys(partitions_def, dynamic_partitions_store=instance) == [
87+
"a"
88+
]
8689
assert materialize([my_asset], instance=instance, partition_key="a").success
8790
materialization = instance.get_latest_materialization_event(AssetKey("my_asset"))
8891
assert materialization

python_modules/dagster/dagster_tests/storage_tests/utils/event_log_storage.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6713,3 +6713,71 @@ def test_fetch_failed_materializations(self, test_run_id, storage: EventLogStora
67136713
failed_record.asset_key == asset_key_1
67146714
for failed_record in failed_records_for_partitions.records
67156715
)
6716+
6717+
def test_dynamic_store_pagination(self, storage: EventLogStorage):
    """Exercise paginated dynamic-partition reads: empty store, full walks in
    both directions, single-page fetches, and cursor use after deletions.
    """
    # Nothing has been added yet, so the first page is empty.
    empty_page = storage.get_paginated_dynamic_partitions(
        partitions_def_name="foo", limit=1, ascending=True
    )
    assert empty_page.results == []

    def get_paginated_partitions(partitions_def_name, ascending=True):
        # Walk the whole store one key per page, following the returned cursor
        # until the storage reports that there is nothing more.
        collected = []
        next_cursor = None
        while True:
            page = storage.get_paginated_dynamic_partitions(
                partitions_def_name=partitions_def_name,
                limit=1,
                ascending=ascending,
                cursor=next_cursor,
            )
            next_cursor = page.cursor
            collected.extend(page.results)
            if not page.has_more:
                return collected

    storage.add_dynamic_partitions(
        partitions_def_name="foo", partition_keys=["foo", "bar", "baz"]
    )

    # paginated results
    assert get_paginated_partitions("foo") == ["foo", "bar", "baz"]

    # Test for idempotency
    storage.add_dynamic_partitions(partitions_def_name="foo", partition_keys=["foo"])
    assert get_paginated_partitions("foo") == ["foo", "bar", "baz"]

    # add more partitions
    storage.add_dynamic_partitions(partitions_def_name="foo", partition_keys=["foo", "qux"])
    assert get_paginated_partitions("foo") == ["foo", "bar", "baz", "qux"]

    assert get_paginated_partitions("foo", ascending=False) == ["qux", "baz", "bar", "foo"]

    # partial paginated results
    first_page = storage.get_paginated_dynamic_partitions(
        partitions_def_name="foo", limit=1, ascending=True
    )
    assert first_page.results == ["foo"]
    assert first_page.cursor
    post_foo_cursor = first_page.cursor
    assert first_page.has_more

    # partial reverse paginated results
    last_page = storage.get_paginated_dynamic_partitions(
        partitions_def_name="foo", limit=1, ascending=False
    )
    assert last_page.results == ["qux"]
    assert last_page.cursor
    assert last_page.has_more

    # try cursored fetching when the keys before and after the cursor are removed
    for removed_key in ("foo", "bar"):
        storage.delete_dynamic_partition(partitions_def_name="foo", partition_key=removed_key)

    after_cursor = storage.get_paginated_dynamic_partitions(
        partitions_def_name="foo", limit=1, ascending=True, cursor=post_foo_cursor
    )
    assert after_cursor.results == ["baz"]

    from_start = storage.get_paginated_dynamic_partitions(
        partitions_def_name="foo", limit=1, ascending=True
    )
    assert from_start.results == ["baz"]

0 commit comments

Comments
 (0)