Skip to content

Commit c56de48

Browse files
committed
remove new_freshness_policy everywhere
1 parent bdea168 commit c56de48

File tree

11 files changed

+66
-205
lines changed

11 files changed

+66
-205
lines changed

python_modules/dagster/dagster/_core/definitions/asset_graph.py

-6
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
from dagster._core.definitions.events import AssetKey
2525
from dagster._core.definitions.freshness_policy import FreshnessPolicy
2626
from dagster._core.definitions.metadata import ArbitraryMetadataMapping
27-
from dagster._core.definitions.new_freshness_policy import NewFreshnessPolicy
2827
from dagster._core.definitions.partition import PartitionsDefinition
2928
from dagster._core.definitions.partition_mapping import PartitionMapping
3029
from dagster._core.definitions.resolved_asset_deps import ResolvedAssetDependencies
@@ -155,11 +154,6 @@ def execution_set_entity_keys(self) -> AbstractSet[EntityKey]:
155154
else:
156155
return self.assets_def.asset_and_check_keys
157156

158-
@property
159-
def new_freshness_policy(self) -> Optional[NewFreshnessPolicy]:
160-
"""Experimental, do not use."""
161-
return self._spec.new_freshness_policy
162-
163157
##### ASSET GRAPH SPECIFIC INTERFACE
164158

165159
@property

python_modules/dagster/dagster/_core/definitions/asset_out.py

-8
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
)
1818
from dagster._core.definitions.freshness_policy import FreshnessPolicy
1919
from dagster._core.definitions.input import NoValueSentinel
20-
from dagster._core.definitions.new_freshness_policy import NewFreshnessPolicy
2120
from dagster._core.definitions.output import Out
2221
from dagster._core.definitions.partition import PartitionsDefinition
2322
from dagster._core.definitions.utils import resolve_automation_condition
@@ -210,11 +209,6 @@ def tags(self) -> Optional[Mapping[str, str]]:
210209
def kinds(self) -> Optional[set[str]]:
211210
return self._spec.kinds
212211

213-
@property
214-
def new_freshness_policy(self) -> Optional[NewFreshnessPolicy]:
215-
"""Experimental, do not use."""
216-
return self._spec.new_freshness_policy
217-
218212
def to_out(self) -> Out:
219213
return Out(
220214
dagster_type=self.dagster_type,
@@ -231,15 +225,13 @@ def to_spec(
231225
deps: Sequence[AssetDep],
232226
additional_tags: Mapping[str, str] = {},
233227
partitions_def: Optional[PartitionsDefinition] = ...,
234-
new_freshness_policy: Optional[NewFreshnessPolicy] = ...,
235228
) -> AssetSpec:
236229
return self._spec.replace_attributes(
237230
key=key,
238231
tags={**additional_tags, **self.tags} if self.tags else additional_tags,
239232
kinds=self.kinds,
240233
deps=[*self._spec.deps, *deps],
241234
partitions_def=partitions_def if partitions_def is not None else ...,
242-
new_freshness_policy=new_freshness_policy if new_freshness_policy is not None else ...,
243235
)
244236

245237
@public

python_modules/dagster/dagster/_core/definitions/asset_spec.py

+5-23
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import dagster._check as check
1717
from dagster._annotations import (
1818
PublicAttr,
19-
beta_param,
2019
hidden_param,
2120
only_allow_hidden_params_in_kwargs,
2221
public,
@@ -27,11 +26,11 @@
2726
AutomationCondition,
2827
)
2928
from dagster._core.definitions.events import AssetKey, CoercibleToAssetKey
30-
from dagster._core.definitions.freshness_policy import FreshnessPolicy
31-
from dagster._core.definitions.new_freshness_policy import (
29+
from dagster._core.definitions.freshness import (
3230
INTERNAL_FRESHNESS_POLICY_METADATA_KEY,
33-
NewFreshnessPolicy,
31+
InternalFreshnessPolicy,
3432
)
33+
from dagster._core.definitions.freshness_policy import FreshnessPolicy
3534
from dagster._core.definitions.partition import PartitionsDefinition
3635
from dagster._core.definitions.partition_mapping import PartitionMapping
3736
from dagster._core.definitions.utils import (
@@ -103,10 +102,6 @@ def validate_kind_tags(kinds: Optional[AbstractSet[str]]) -> None:
103102
raise DagsterInvalidDefinitionError("Assets can have at most three kinds currently.")
104103

105104

106-
@beta_param(
107-
param="new_freshness_policy",
108-
additional_warn_text="Currently experimental. Use freshness checks instead to define asset freshness.",
109-
)
110105
@hidden_param(
111106
param="freshness_policy",
112107
breaking_version="1.10.0",
@@ -154,7 +149,6 @@ class AssetSpec(IHasInternalInit, IHaveNew, LegacyNamedTupleMixin):
154149
will be made visible in the Dagster UI.
155150
partitions_def (Optional[PartitionsDefinition]): Defines the set of partition keys that
156151
compose the asset.
157-
new_freshness_policy (Optional[NewFreshnessPolicy]): A condition that delineates when an asset is considered fresh. Currently experimental.
158152
"""
159153

160154
key: PublicAttr[AssetKey]
@@ -169,7 +163,6 @@ class AssetSpec(IHasInternalInit, IHaveNew, LegacyNamedTupleMixin):
169163
owners: PublicAttr[Sequence[str]]
170164
tags: PublicAttr[Mapping[str, str]]
171165
partitions_def: PublicAttr[Optional[PartitionsDefinition]]
172-
new_freshness_policy: Optional[NewFreshnessPolicy]
173166

174167
def __new__(
175168
cls,
@@ -186,7 +179,6 @@ def __new__(
186179
tags: Optional[Mapping[str, str]] = None,
187180
kinds: Optional[set[str]] = None,
188181
partitions_def: Optional[PartitionsDefinition] = None,
189-
new_freshness_policy: Optional[NewFreshnessPolicy] = None,
190182
**kwargs,
191183
):
192184
from dagster._core.definitions.asset_dep import coerce_to_deps_and_check_duplicates
@@ -212,7 +204,7 @@ def __new__(
212204
}
213205
validate_kind_tags(kind_tags)
214206

215-
internal_freshness_policy: Optional[NewFreshnessPolicy] = kwargs.get(
207+
internal_freshness_policy: Optional[InternalFreshnessPolicy] = kwargs.get(
216208
"internal_freshness_policy"
217209
)
218210
if internal_freshness_policy:
@@ -256,9 +248,6 @@ def __new__(
256248
partitions_def=check.opt_inst_param(
257249
partitions_def, "partitions_def", PartitionsDefinition
258250
),
259-
new_freshness_policy=check.opt_inst_param(
260-
new_freshness_policy, "new_freshness_policy", NewFreshnessPolicy
261-
),
262251
)
263252

264253
@staticmethod
@@ -293,7 +282,6 @@ def dagster_internal_init(
293282
tags=tags,
294283
kinds=kinds,
295284
partitions_def=partitions_def,
296-
new_freshness_policy=kwargs.get("new_freshness_policy"),
297285
)
298286

299287
@cached_property
@@ -333,7 +321,6 @@ def with_io_manager_key(self, io_manager_key: str) -> "AssetSpec":
333321
)
334322

335323
@public
336-
@beta_param(param="new_freshness_policy", additional_warn_text="Currently experimental.")
337324
def replace_attributes(
338325
self,
339326
*,
@@ -350,7 +337,6 @@ def replace_attributes(
350337
kinds: Optional[set[str]] = ...,
351338
partitions_def: Optional[PartitionsDefinition] = ...,
352339
freshness_policy: Optional[FreshnessPolicy] = ...,
353-
new_freshness_policy: Optional[NewFreshnessPolicy] = ...,
354340
) -> "AssetSpec":
355341
"""Returns a new AssetSpec with the specified attributes replaced."""
356342
current_tags_without_kinds = {
@@ -375,9 +361,6 @@ def replace_attributes(
375361
tags=tags if tags is not ... else current_tags_without_kinds,
376362
kinds=kinds if kinds is not ... else self.kinds,
377363
partitions_def=partitions_def if partitions_def is not ... else self.partitions_def,
378-
new_freshness_policy=new_freshness_policy
379-
if new_freshness_policy is not ...
380-
else self.new_freshness_policy,
381364
)
382365

383366
@public
@@ -425,7 +408,6 @@ def merge_attributes(
425408
tags={**current_tags_without_kinds, **(tags if tags is not ... else {})},
426409
kinds={*self.kinds, *(kinds if kinds is not ... else {})},
427410
partitions_def=self.partitions_def,
428-
new_freshness_policy=self.new_freshness_policy,
429411
)
430412

431413

@@ -482,6 +464,6 @@ def map_asset_specs(
482464
]
483465

484466

485-
def attach_internal_freshness_policy(spec: AssetSpec, policy: NewFreshnessPolicy) -> AssetSpec:
467+
def attach_internal_freshness_policy(spec: AssetSpec, policy: InternalFreshnessPolicy) -> AssetSpec:
486468
"""Apply a freshness policy to an asset spec, attaching it to the spec's metadata."""
487469
return spec.merge_attributes(metadata={INTERNAL_FRESHNESS_POLICY_METADATA_KEY: str(policy)})

python_modules/dagster/dagster/_core/definitions/assets.py

-29
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838
from dagster._core.definitions.freshness_policy import FreshnessPolicy
3939
from dagster._core.definitions.metadata import ArbitraryMetadataMapping
4040
from dagster._core.definitions.multi_dimensional_partitions import MultiPartitionsDefinition
41-
from dagster._core.definitions.new_freshness_policy import NewFreshnessPolicy
4241
from dagster._core.definitions.node_definition import NodeDefinition
4342
from dagster._core.definitions.op_definition import OpDefinition
4443
from dagster._core.definitions.op_invocation import direct_invocation_result
@@ -103,7 +102,6 @@ class AssetsDefinition(ResourceAddable, IHasInternalInit):
103102
"asset_deps",
104103
"owners_by_key",
105104
"partitions_def",
106-
"new_freshness_policy_by_key",
107105
}
108106

109107
# partition mappings are also tracked inside the AssetSpecs, but this enables faster access by
@@ -152,7 +150,6 @@ def __init__(
152150
auto_materialize_policies_by_key: Optional[Mapping[AssetKey, AutoMaterializePolicy]] = None,
153151
# if adding new fields, make sure to handle them in the with_attributes, from_graph,
154152
# from_op, and get_attributes_dict methods
155-
new_freshness_policy_by_key: Optional[Mapping[AssetKey, NewFreshnessPolicy]] = None,
156153
):
157154
from dagster._core.definitions.graph_definition import GraphDefinition
158155
from dagster._core.execution.build_resources import wrap_resources_for_execution
@@ -241,7 +238,6 @@ def __init__(
241238
check.invariant(partition_mappings is None)
242239
check.invariant(asset_deps is None)
243240
check.invariant(partitions_def is None)
244-
check.invariant(new_freshness_policy_by_key is None)
245241
resolved_specs = specs
246242

247243
else:
@@ -282,7 +278,6 @@ def __init__(
282278
descriptions_by_key=descriptions_by_key,
283279
code_versions_by_key=None,
284280
partitions_def=partitions_def,
285-
new_freshness_policy_by_key=new_freshness_policy_by_key,
286281
)
287282

288283
normalized_specs: list[AssetSpec] = []
@@ -406,7 +401,6 @@ def __call__(self, *args: object, **kwargs: object) -> object:
406401

407402
@public
408403
@beta_param(param="resource_defs")
409-
@beta_param(param="new_freshness_policy_by_output_name")
410404
@staticmethod
411405
def from_graph(
412406
graph_def: "GraphDefinition",
@@ -436,9 +430,6 @@ def from_graph(
436430
auto_materialize_policies_by_output_name: Optional[
437431
Mapping[str, Optional[AutoMaterializePolicy]]
438432
] = None,
439-
new_freshness_policy_by_output_name: Optional[
440-
Mapping[str, Optional[NewFreshnessPolicy]]
441-
] = None,
442433
) -> "AssetsDefinition":
443434
"""Constructs an AssetsDefinition from a GraphDefinition.
444435
@@ -497,10 +488,6 @@ def from_graph(
497488
backfill_policy (Optional[BackfillPolicy]): Defines this asset's BackfillPolicy
498489
owners_by_key (Optional[Mapping[AssetKey, Sequence[str]]]): Defines
499490
owners to be associated with each of the asset keys for this node.
500-
new_freshness_policy_by_output_name (Optional[Mapping[str, Optional[NewFreshnessPolicy]]]): Defines asset
501-
freshness conditions to be associated with some or all of the output assets for this node.
502-
Currently experimental, it is intended to replace the existing, deprecated `FreshnessPolicy` construct.
503-
For now, continue to use freshness checks to define asset freshness.
504491
"""
505492
return AssetsDefinition._from_node(
506493
node_def=graph_def,
@@ -526,7 +513,6 @@ def from_graph(
526513
check_specs=check_specs,
527514
owners_by_output_name=owners_by_output_name,
528515
code_versions_by_output_name=code_versions_by_output_name,
529-
new_freshness_policy_by_output_name=new_freshness_policy_by_output_name,
530516
)
531517

532518
@public
@@ -655,9 +641,6 @@ def _from_node(
655641
can_subset: bool = False,
656642
check_specs: Optional[Sequence[AssetCheckSpec]] = None,
657643
owners_by_output_name: Optional[Mapping[str, Sequence[str]]] = None,
658-
new_freshness_policy_by_output_name: Optional[
659-
Mapping[str, Optional[NewFreshnessPolicy]]
660-
] = None,
661644
) -> "AssetsDefinition":
662645
from dagster._core.definitions.decorators.decorator_assets_definition_builder import (
663646
_validate_check_specs_target_relevant_asset_keys,
@@ -762,9 +745,6 @@ def _output_dict_to_asset_dict(
762745
descriptions_by_key=_output_dict_to_asset_dict(descriptions_by_output_name),
763746
code_versions_by_key=_output_dict_to_asset_dict(code_versions_by_output_name),
764747
partitions_def=partitions_def,
765-
new_freshness_policy_by_key=_output_dict_to_asset_dict(
766-
new_freshness_policy_by_output_name
767-
),
768748
)
769749

770750
return AssetsDefinition.dagster_internal_init(
@@ -1728,7 +1708,6 @@ def _asset_specs_from_attr_key_params(
17281708
descriptions_by_key: Optional[Mapping[AssetKey, str]],
17291709
owners_by_key: Optional[Mapping[AssetKey, Sequence[str]]],
17301710
partitions_def: Optional[PartitionsDefinition],
1731-
new_freshness_policy_by_key: Optional[Mapping[AssetKey, NewFreshnessPolicy]],
17321711
) -> Sequence[AssetSpec]:
17331712
validated_group_names_by_key = check.opt_mapping_param(
17341713
group_names_by_key, "group_names_by_key", key_type=AssetKey, value_type=str
@@ -1764,13 +1743,6 @@ def _asset_specs_from_attr_key_params(
17641743
value_type=AutomationCondition,
17651744
)
17661745

1767-
validated_new_freshness_policy_by_key = check.opt_mapping_param(
1768-
new_freshness_policy_by_key,
1769-
"new_freshness_policy_by_key",
1770-
key_type=AssetKey,
1771-
value_type=NewFreshnessPolicy,
1772-
)
1773-
17741746
validated_owners_by_key = check.opt_mapping_param(
17751747
owners_by_key, "owners_by_key", key_type=AssetKey, value_type=list
17761748
)
@@ -1812,7 +1784,6 @@ def _asset_specs_from_attr_key_params(
18121784
partitions_def=check.opt_inst_param(
18131785
partitions_def, "partitions_def", PartitionsDefinition
18141786
),
1815-
new_freshness_policy=validated_new_freshness_policy_by_key.get(key),
18161787
)
18171788
)
18181789

python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py

-8
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
from dagster._core.definitions.freshness_policy import FreshnessPolicy
3838
from dagster._core.definitions.input import GraphIn
3939
from dagster._core.definitions.metadata import ArbitraryMetadataMapping, RawMetadataMapping
40-
from dagster._core.definitions.new_freshness_policy import NewFreshnessPolicy
4140
from dagster._core.definitions.output import GraphOut
4241
from dagster._core.definitions.partition import PartitionsDefinition
4342
from dagster._core.definitions.policy import RetryPolicy
@@ -91,7 +90,6 @@ def asset(
9190
owners: Optional[Sequence[str]] = ...,
9291
kinds: Optional[AbstractSet[str]] = ...,
9392
pool: Optional[str] = ...,
94-
new_freshness_policy: Optional[NewFreshnessPolicy] = ...,
9593
**kwargs,
9694
) -> Callable[[Callable[..., Any]], AssetsDefinition]: ...
9795

@@ -120,7 +118,6 @@ def _validate_hidden_non_argument_dep_param(
120118
@beta_param(param="resource_defs")
121119
@beta_param(param="io_manager_def")
122120
@beta_param(param="backfill_policy")
123-
@beta_param(param="new_freshness_policy")
124121
@hidden_param(
125122
param="non_argument_deps",
126123
breaking_version="2.0.0",
@@ -170,7 +167,6 @@ def asset(
170167
owners: Optional[Sequence[str]] = None,
171168
kinds: Optional[AbstractSet[str]] = None,
172169
pool: Optional[str] = None,
173-
new_freshness_policy: Optional[NewFreshnessPolicy] = None,
174170
**kwargs,
175171
) -> Union[AssetsDefinition, Callable[[Callable[..., Any]], AssetsDefinition]]:
176172
"""Create a definition for how to compute an asset.
@@ -329,7 +325,6 @@ def downstream_asset(conditional_asset):
329325
key=key,
330326
owners=owners,
331327
pool=pool,
332-
new_freshness_policy=new_freshness_policy,
333328
)
334329

335330
if compute_fn is not None:
@@ -403,7 +398,6 @@ class AssetDecoratorArgs(NamedTuple):
403398
check_specs: Optional[Sequence[AssetCheckSpec]]
404399
owners: Optional[Sequence[str]]
405400
pool: Optional[str]
406-
new_freshness_policy: Optional[NewFreshnessPolicy]
407401

408402

409403
class ResourceRelatedState(NamedTuple):
@@ -529,7 +523,6 @@ def create_assets_def_from_fn_and_decorator_args(
529523
decorator_name="@asset",
530524
execution_type=AssetExecutionType.MATERIALIZATION,
531525
pool=args.pool,
532-
new_freshness_policy=args.new_freshness_policy,
533526
)
534527

535528
builder = DecoratorAssetsDefinitionBuilder.from_asset_outs_in_asset_centric_decorator(
@@ -709,7 +702,6 @@ def my_function(asset0):
709702
decorator_name="@multi_asset",
710703
execution_type=AssetExecutionType.MATERIALIZATION,
711704
pool=pool,
712-
new_freshness_policy=None,
713705
)
714706

715707
def inner(fn: Callable[..., Any]) -> AssetsDefinition:

python_modules/dagster/dagster/_core/definitions/decorators/decorator_assets_definition_builder.py

-3
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
from dagster._core.definitions.backfill_policy import BackfillPolicy
2828
from dagster._core.definitions.decorators.op_decorator import _Op
2929
from dagster._core.definitions.input import In
30-
from dagster._core.definitions.new_freshness_policy import NewFreshnessPolicy
3130
from dagster._core.definitions.op_definition import OpDefinition
3231
from dagster._core.definitions.output import Out
3332
from dagster._core.definitions.partition import PartitionsDefinition
@@ -246,7 +245,6 @@ class DecoratorAssetsDefinitionBuilderArgs(NamedTuple):
246245
upstream_asset_deps: Optional[Iterable[AssetDep]]
247246
execution_type: Optional[AssetExecutionType]
248247
pool: Optional[str]
249-
new_freshness_policy: Optional[NewFreshnessPolicy] = None
250248

251249
@property
252250
def check_specs(self) -> Sequence[AssetCheckSpec]:
@@ -673,7 +671,6 @@ def _synthesize_specs(self) -> Sequence[AssetSpec]:
673671
key,
674672
deps=deps,
675673
partitions_def=self.args.partitions_def,
676-
new_freshness_policy=self.args.new_freshness_policy,
677674
)
678675
)
679676

0 commit comments

Comments
 (0)