Skip to content

Commit f969e63

Browse files
committed
Fix connection access in triggerer for deferrable operators (#57154)
When deferrable operators run in the triggerer's async event loop and synchronously access connections (e.g., via @cached_property), the `ExecutionAPISecretsBackend` failed silently. This occurred because `SUPERVISOR_COMMS.send()` uses `async_to_sync`, which raises `RuntimeError` when called within an existing event loop in a greenback portal context. Add specific RuntimeError handling in `ExecutionAPISecretsBackend` that detects this scenario and uses `greenback.await_()` to call the async versions (aget_connection/aget_variable) as a fallback. It was originally fixed in #55799 for 3.1.0 but #56602 introduced a bug. Ideally all providers handle this better and have better written Triggers. Example PR for Databricks: #55568 Fixes #57145 (cherry picked from commit da32b68)
1 parent 51817f0 commit f969e63

File tree

3 files changed

+58
-19
lines changed

3 files changed

+58
-19
lines changed

task-sdk/src/airflow/sdk/definitions/connection.py

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
# under the License.
1818
from __future__ import annotations
1919

20-
import asyncio
2120
import json
2221
import logging
2322
from json import JSONDecodeError
@@ -226,24 +225,6 @@ def get(cls, conn_id: str) -> Any:
226225
return _get_connection(conn_id)
227226
except AirflowRuntimeError as e:
228227
cls._handle_connection_error(e, conn_id)
229-
except RuntimeError as e:
230-
# The error from async_to_sync is a RuntimeError, so we have to fall back to text matching
231-
if str(e).startswith("You cannot use AsyncToSync in the same thread as an async event loop"):
232-
import greenback
233-
234-
task = asyncio.current_task()
235-
if greenback.has_portal(task):
236-
import warnings
237-
238-
warnings.warn(
239-
"You should not use sync calls here -- use `await Conn.async_get` instead",
240-
stacklevel=2,
241-
)
242-
243-
return greenback.await_(cls.async_get(conn_id))
244-
245-
log.exception("async_to_sync failed")
246-
raise
247228

248229
@classmethod
249230
async def async_get(cls, conn_id: str) -> Any:

task-sdk/src/airflow/sdk/execution_time/secrets/execution_api.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,26 @@ def get_connection(self, conn_id: str) -> Connection | None: # type: ignore[ove
6363

6464
# Convert ExecutionAPI response to SDK Connection
6565
return _process_connection_result_conn(msg)
66+
except RuntimeError as e:
67+
# TriggerCommsDecoder.send() uses async_to_sync internally, which raises RuntimeError
68+
# when called within an async event loop. In greenback portal contexts (triggerer),
69+
# we catch this and use greenback to call the async version instead.
70+
if str(e).startswith("You cannot use AsyncToSync in the same thread as an async event loop"):
71+
import asyncio
72+
73+
import greenback
74+
75+
task = asyncio.current_task()
76+
if greenback.has_portal(task):
77+
import warnings
78+
79+
warnings.warn(
80+
"You should not use sync calls here -- use `await aget_connection` instead",
81+
stacklevel=2,
82+
)
83+
return greenback.await_(self.aget_connection(conn_id))
84+
# Fall through to the general exception handler for other RuntimeErrors
85+
return None
6686
except Exception:
6787
# If SUPERVISOR_COMMS fails for any reason, return None
6888
# to allow fallback to other backends

task-sdk/tests/task_sdk/execution_time/test_secrets.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import pytest
2121

22+
from airflow.sdk.definitions.connection import Connection
2223
from airflow.sdk.execution_time.secrets.execution_api import ExecutionAPISecretsBackend
2324

2425

@@ -120,6 +121,43 @@ def test_get_conn_value_not_implemented(self):
120121
with pytest.raises(NotImplementedError, match="Use get_connection instead"):
121122
backend.get_conn_value("test_conn")
122123

124+
def test_runtime_error_triggers_greenback_fallback(self, mocker, mock_supervisor_comms):
125+
"""
126+
Test that RuntimeError from async_to_sync triggers greenback fallback.
127+
128+
This test verifies the fix for issue #57145: when SUPERVISOR_COMMS.send()
129+
raises the specific RuntimeError about async_to_sync in an event loop,
130+
the backend catches it and uses greenback to call aget_connection().
131+
"""
132+
133+
# Expected connection to be returned
134+
expected_conn = Connection(
135+
conn_id="databricks_default",
136+
conn_type="databricks",
137+
host="example.databricks.com",
138+
)
139+
140+
# Simulate the RuntimeError that triggers greenback fallback
141+
mock_supervisor_comms.send.side_effect = RuntimeError(
142+
"You cannot use AsyncToSync in the same thread as an async event loop"
143+
)
144+
145+
# Mock the greenback and asyncio modules that are imported inside the exception handler
146+
mocker.patch("greenback.has_portal", return_value=True)
147+
mock_greenback_await = mocker.patch("greenback.await_", return_value=expected_conn)
148+
mocker.patch("asyncio.current_task")
149+
150+
backend = ExecutionAPISecretsBackend()
151+
conn = backend.get_connection("databricks_default")
152+
153+
# Verify we got the expected connection
154+
assert conn is not None
155+
assert conn.conn_id == "databricks_default"
156+
# Verify the greenback fallback was called
157+
mock_greenback_await.assert_called_once()
158+
# Verify send was attempted first (and raised RuntimeError)
159+
mock_supervisor_comms.send.assert_called_once()
160+
123161

124162
class TestContextDetection:
125163
"""Test context detection in ensure_secrets_backend_loaded."""

0 commit comments

Comments
 (0)