Skip to content

Commit

Permalink
Issue deprecation warning for plugins registering ti_deps (#45742)
Browse files Browse the repository at this point in the history
This is removed in Airflow3 via #45713
  • Loading branch information
ashb authored Jan 29, 2025
1 parent a5726a5 commit a2f302d
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 0 deletions.
12 changes: 12 additions & 0 deletions airflow/plugins_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import os
import sys
import types
import warnings
from pathlib import Path
from typing import TYPE_CHECKING, Any, Iterable

Expand Down Expand Up @@ -431,6 +432,17 @@ def initialize_ti_deps_plugins():
registered_ti_dep_classes = {}

for plugin in plugins:
if not plugin.ti_deps:
continue

from airflow.exceptions import RemovedInAirflow3Warning

warnings.warn(
"Using custom `ti_deps` on operators has been removed in Airflow 3.0",
RemovedInAirflow3Warning,
stacklevel=1,
)

registered_ti_dep_classes.update(
{qualname(ti_dep.__class__): ti_dep.__class__ for ti_dep in plugin.ti_deps}
)
Expand Down
17 changes: 17 additions & 0 deletions tests/plugins/test_plugins_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import pytest

from airflow.exceptions import RemovedInAirflow3Warning
from airflow.hooks.base import BaseHook
from airflow.listeners.listener import get_listener_manager
from airflow.plugins_manager import AirflowPlugin
Expand Down Expand Up @@ -174,6 +175,11 @@ def clean_plugins(self):

plugins_manager.loaded_plugins = set()
plugins_manager.plugins = []
yield
plugins_manager.loaded_plugins = set()

plugins_manager.registered_ti_dep_classes = None
plugins_manager.plugins = None

def test_no_log_when_no_plugins(self, caplog):
with mock_plugin_manager(plugins=[]):
Expand Down Expand Up @@ -270,6 +276,17 @@ class AirflowAdminMenuLinksPlugin(AirflowPlugin):
),
]

def test_deprecate_ti_deps(self):
class DeprecatedTIDeps(AirflowPlugin):
name = "ti_deps"

ti_deps = [mock.MagicMock()]

with mock_plugin_manager(plugins=[DeprecatedTIDeps()]), pytest.warns(RemovedInAirflow3Warning):
from airflow import plugins_manager

plugins_manager.initialize_ti_deps_plugins()

def test_should_not_warning_about_fab_plugins(self, caplog):
class AirflowAdminViewsPlugin(AirflowPlugin):
name = "test_admin_views_plugin"
Expand Down
20 changes: 20 additions & 0 deletions tests/serialization/test_dag_serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,21 @@ def timetable_plugin(monkeypatch):
)


@pytest.fixture
def custom_ti_dep(monkeypatch):
"""Patch plugins manager to always and only return our custom timetable."""
from test_plugin import CustomTestTriggerRule

from airflow import plugins_manager

monkeypatch.setattr(plugins_manager, "initialize_ti_deps_plugins", lambda: None)
monkeypatch.setattr(
plugins_manager,
"registered_ti_dep_classes",
{"test_plugin.CustomTestTriggerRule": CustomTestTriggerRule},
)


# TODO: (potiuk) - AIP-44 - check why this test hangs
@pytest.mark.skip_if_database_isolation_mode
class TestStringifiedDAGs:
Expand All @@ -430,6 +445,7 @@ def setup_test_cases(self):
)

@pytest.mark.db_test
@pytest.mark.filterwarnings("ignore::airflow.exceptions.RemovedInAirflow3Warning")
def test_serialization(self):
"""Serialization and deserialization should work for every DAG and Operator."""
dags = collect_dags()
Expand Down Expand Up @@ -539,6 +555,7 @@ def sorted_serialized_dag(dag_dict: dict):
return actual, expected

@pytest.mark.db_test
@pytest.mark.filterwarnings("ignore::airflow.exceptions.RemovedInAirflow3Warning")
def test_deserialization_across_process(self):
"""A serialized DAG can be deserialized in another process."""

Expand Down Expand Up @@ -1596,6 +1613,7 @@ def test_deps_sorted(self):
"airflow.ti_deps.deps.trigger_rule_dep.TriggerRuleDep",
]

@pytest.mark.filterwarnings("ignore::airflow.exceptions.RemovedInAirflow3Warning")
def test_error_on_unregistered_ti_dep_serialization(self):
# trigger rule not registered through the plugin system will not be serialized
class DummyTriggerRule(BaseTIDep):
Expand Down Expand Up @@ -1634,6 +1652,8 @@ def test_error_on_unregistered_ti_dep_deserialization(self):
SerializedBaseOperator.deserialize_operator(serialize_op)

@pytest.mark.db_test
@pytest.mark.usefixtures("custom_ti_dep")
@pytest.mark.filterwarnings("ignore::airflow.exceptions.RemovedInAirflow3Warning")
def test_serialize_and_deserialize_custom_ti_deps(self):
from test_plugin import CustomTestTriggerRule

Expand Down

0 comments on commit a2f302d

Please sign in to comment.