Description
In Airflow 2.x we were able to bulk-read all Variables from the metastore using the ORM:
from airflow.utils.session import create_session
from airflow.models import Variable

with create_session() as session:
    airflow_vars_from_db = session.query(Variable).all()

In Airflow 3.x this pattern fails (in our case the error happens during DAG parsing) because direct access to the metastore via the ORM is not allowed and results in:
RuntimeError: Direct database access via the ORM is not allowed in Airflow 3.0
The Task SDK provides Variable.get(key) but no supported way to list Variable keys. This makes it impossible to implement patterns where keys must be discovered dynamically (e.g. listing all keys, or all keys with a given prefix) because the set of keys is not known ahead of time.
Use case/motivation
We have multiple DAGs configured via many Airflow Variables created/updated by external systems. The set of variable keys is not known at DAG authoring time and can change over time.
We want to collect Variables at DAG parse time and pass the collected “snapshot” into a task (e.g. PythonOperator) via op_kwargs, so the task can operate on a consistent set of configuration values.
Minimal example showing the desired pattern:
from __future__ import annotations

from pendulum import datetime

from airflow.sdk import dag, Variable
from airflow.providers.standard.operators.python import PythonOperator


def use_vars_snapshot(vars_snapshot: dict[str, str], **context):
    print(f"Loaded {len(vars_snapshot)} variables")


@dag(start_date=datetime(2025, 1, 1), schedule=None, catchup=False)
def demo_pythonoperator_pass_all_vars():
    # Desired: discover ALL Variable keys (or keys by prefix) during DAG parsing.
    # Problem: Task SDK has only Variable.get(key) and no supported key listing API.
    all_keys = ???  # keys unknown ahead of time; no supported way to discover them
    vars_snapshot = {k: Variable.get(k) for k in all_keys}

    PythonOperator(
        task_id="use_vars",
        python_callable=use_vars_snapshot,
        op_kwargs={"vars_snapshot": vars_snapshot},
    )


demo_pythonoperator_pass_all_vars()

Workarounds tried
Public REST API v2 GET /api/v2/variables:
- introduces a network dependency and pagination logic
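For reference, the pagination logic this workaround requires looks roughly like the sketch below. It is a minimal illustration, not a recommended implementation. It assumes the endpoint accepts `limit` and `offset` query parameters and returns a JSON body with `variables` and `total_entries` fields, and that a bearer token is already available. The helper names `fetch_page_http` and `list_variables` are hypothetical.

```python
from __future__ import annotations

import json
import urllib.parse
import urllib.request
from typing import Callable


def fetch_page_http(base_url: str, token: str, limit: int, offset: int) -> dict:
    """Fetch one page of Variables over HTTP (hypothetical helper).

    Assumes GET /api/v2/variables takes `limit`/`offset` and bearer-token auth.
    """
    query = urllib.parse.urlencode({"limit": limit, "offset": offset})
    request = urllib.request.Request(
        f"{base_url}/api/v2/variables?{query}",
        headers={"Authorization": f"Bearer {token}", "Accept": "application/json"},
    )
    with urllib.request.urlopen(request, timeout=10) as response:
        return json.load(response)


def list_variables(
    fetch_page: Callable[[int, int], dict],
    prefix: str = "",
    limit: int = 100,
) -> dict[str, str]:
    """Walk every page and collect key -> value for keys starting with `prefix`.

    `fetch_page(limit, offset)` must return a dict shaped like the assumed
    response: {"variables": [{"key": ..., "value": ...}], "total_entries": N}.
    """
    snapshot: dict[str, str] = {}
    offset = 0
    while True:
        page = fetch_page(limit, offset)
        items = page.get("variables", [])
        for item in items:
            if item["key"].startswith(prefix):
                snapshot[item["key"]] = item["value"]
        offset += len(items)
        # Stop on an empty page or once every reported entry has been seen.
        if not items or offset >= page.get("total_entries", 0):
            break
    return snapshot
```

In a DAG file this could be called as `list_variables(lambda limit, offset: fetch_page_http(base_url, token, limit, offset), prefix="my_prefix_")`, where `base_url`, `token`, and the prefix are placeholders. Every DAG parse would then issue one or more HTTP requests, which is the network dependency noted above.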
Proposed solution
Provide a supported API to list Variable keys and/or bulk-retrieve their values.
Environment
Apache Airflow 3.1.0
Related issues
No response
Are you willing to submit a PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct