File tree 3 files changed +33
-0
lines changed
3 files changed +33
-0
lines changed Original file line number Diff line number Diff line change 27
27
import os
28
28
import sys
29
29
import types
30
+ import warnings
30
31
from pathlib import Path
31
32
from typing import TYPE_CHECKING , Any , Iterable
32
33
@@ -431,6 +432,17 @@ def initialize_ti_deps_plugins():
431
432
registered_ti_dep_classes = {}
432
433
433
434
for plugin in plugins :
435
+ if not plugin .ti_deps :
436
+ continue
437
+
438
+ from airflow .exceptions import RemovedInAirflow3Warning
439
+
440
+ warnings .warn (
441
+ "Using custom `ti_deps` on operators has been removed in Airflow 3.0" ,
442
+ RemovedInAirflow3Warning ,
443
+ stacklevel = 1 ,
444
+ )
445
+
434
446
registered_ti_dep_classes .update (
435
447
{qualname (ti_dep .__class__ ): ti_dep .__class__ for ti_dep in plugin .ti_deps }
436
448
)
Original file line number Diff line number Diff line change 28
28
29
29
import pytest
30
30
31
+ from airflow .exceptions import RemovedInAirflow3Warning
31
32
from airflow .hooks .base import BaseHook
32
33
from airflow .listeners .listener import get_listener_manager
33
34
from airflow .plugins_manager import AirflowPlugin
@@ -174,6 +175,11 @@ def clean_plugins(self):
174
175
175
176
plugins_manager .loaded_plugins = set ()
176
177
plugins_manager .plugins = []
178
+ yield
179
+ plugins_manager .loaded_plugins = set ()
180
+
181
+ plugins_manager .registered_ti_dep_classes = None
182
+ plugins_manager .plugins = None
177
183
178
184
def test_no_log_when_no_plugins (self , caplog ):
179
185
with mock_plugin_manager (plugins = []):
@@ -270,6 +276,17 @@ class AirflowAdminMenuLinksPlugin(AirflowPlugin):
270
276
),
271
277
]
272
278
279
+ def test_deprecate_ti_deps (self ):
280
+ class DeprecatedTIDeps (AirflowPlugin ):
281
+ name = "ti_deps"
282
+
283
+ ti_deps = [mock .MagicMock ()]
284
+
285
+ with mock_plugin_manager (plugins = [DeprecatedTIDeps ()]), pytest .warns (RemovedInAirflow3Warning ):
286
+ from airflow import plugins_manager
287
+
288
+ plugins_manager .initialize_ti_deps_plugins ()
289
+
273
290
def test_should_not_warning_about_fab_plugins (self , caplog ):
274
291
class AirflowAdminViewsPlugin (AirflowPlugin ):
275
292
name = "test_admin_views_plugin"
Original file line number Diff line number Diff line change @@ -430,6 +430,7 @@ def setup_test_cases(self):
430
430
)
431
431
432
432
@pytest .mark .db_test
433
+ @pytest .mark .filterwarnings ("ignore::airflow.exceptions.RemovedInAirflow3Warning" )
433
434
def test_serialization (self ):
434
435
"""Serialization and deserialization should work for every DAG and Operator."""
435
436
dags = collect_dags ()
@@ -539,6 +540,7 @@ def sorted_serialized_dag(dag_dict: dict):
539
540
return actual , expected
540
541
541
542
@pytest .mark .db_test
543
+ @pytest .mark .filterwarnings ("ignore::airflow.exceptions.RemovedInAirflow3Warning" )
542
544
def test_deserialization_across_process (self ):
543
545
"""A serialized DAG can be deserialized in another process."""
544
546
@@ -1596,6 +1598,7 @@ def test_deps_sorted(self):
1596
1598
"airflow.ti_deps.deps.trigger_rule_dep.TriggerRuleDep" ,
1597
1599
]
1598
1600
1601
+ @pytest .mark .filterwarnings ("ignore::airflow.exceptions.RemovedInAirflow3Warning" )
1599
1602
def test_error_on_unregistered_ti_dep_serialization (self ):
1600
1603
# trigger rule not registered through the plugin system will not be serialized
1601
1604
class DummyTriggerRule (BaseTIDep ):
@@ -1634,6 +1637,7 @@ def test_error_on_unregistered_ti_dep_deserialization(self):
1634
1637
SerializedBaseOperator .deserialize_operator (serialize_op )
1635
1638
1636
1639
@pytest .mark .db_test
1640
+ @pytest .mark .filterwarnings ("ignore::airflow.exceptions.RemovedInAirflow3Warning" )
1637
1641
def test_serialize_and_deserialize_custom_ti_deps (self ):
1638
1642
from test_plugin import CustomTestTriggerRule
1639
1643
You can’t perform that action at this time.
0 commit comments