Description
Apache Airflow version
Other Airflow 2 version (please specify below)
What happened
This issue happens in Airflow 2.6.1 with Postgres 13.
I have enabled the DAG, and the scheduled run executed successfully.
However, backfilling causes a variety of issues.
Here is the initial command I ran: airflow dags backfill stale_data_test --start-date 2023-07-01 --end-date 2023-08-01 -B
Sometimes I get the following:
[2023-09-01T12:51:02.784+0000] {dagbag.py:541} INFO - Filling up the DagBag from /usr/local/airflow/dags/sem/stale_data_test.py
[2023-09-01T12:51:02.785+0000] {dagbag.py:541} INFO - Filling up the DagBag from /usr/local/airflow/dags/sem/stale_data_test.py
[2023-09-01T12:51:02.787+0000] {local_executor.py:135} ERROR - Failed to execute task No map_index passed to mapped task.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/airflow/executors/local_executor.py", line 131, in _execute_work_in_fork
    args.func(args)
  File "/usr/local/lib/python3.10/site-packages/airflow/cli/cli_config.py", line 51, in command
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/airflow/utils/cli.py", line 112, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/airflow/cli/commands/task_command.py", line 405, in task_run
    ti, _ = _get_ti(task, args.map_index, exec_date_or_run_id=args.execution_date_or_run_id, pool=args.pool)
  File "/usr/local/lib/python3.10/site-packages/airflow/utils/session.py", line 76, in wrapper
    return func(*args, session=session, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/airflow/cli/commands/task_command.py", line 166, in _get_ti
    raise RuntimeError("No map_index passed to mapped task")
RuntimeError: No map_index passed to mapped task
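For reference, the LocalExecutor forks a CLI task-run command for each task instance, and for an instance of a mapped task that command has to be told which expansion to execute. By comparison, running a single expanded instance by hand looks roughly like this (the run_id is a placeholder):

airflow tasks run stale_data_test insert_new_daily_data.cleanup <run_id> --map-index 0

The error above suggests the forked command reached _get_ti for a mapped task instance without a usable map index.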
After re-running the command, the backfill finished successfully. However, when I ran it with a different start and end date, the issue appeared again.
When running the backfill with this command:
airflow dags backfill stale_data_test --start-date 2023-05-01 --end-date 2023-06-01 -B
I got this error:
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/airflow/jobs/backfill_job_runner.py", line 912, in _execute
    self._execute_dagruns(
  File "/usr/local/lib/python3.10/site-packages/airflow/utils/session.py", line 73, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/airflow/jobs/backfill_job_runner.py", line 799, in _execute_dagruns
    processed_dag_run_dates = self._process_backfill_task_instances(
  File "/usr/local/lib/python3.10/site-packages/airflow/jobs/backfill_job_runner.py", line 695, in _process_backfill_task_instances
    run.update_state(session=session)
  File "/usr/local/lib/python3.10/site-packages/airflow/utils/session.py", line 73, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/airflow/models/dagrun.py", line 572, in update_state
    info = self.task_instance_scheduling_decisions(session)
  File "/usr/local/lib/python3.10/site-packages/airflow/utils/session.py", line 73, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/airflow/models/dagrun.py", line 719, in task_instance_scheduling_decisions
    schedulable_tis, changed_tis, expansion_happened = self._get_ready_tis(
  File "/usr/local/lib/python3.10/site-packages/airflow/models/dagrun.py", line 821, in _get_ready_tis
    new_tis = _expand_mapped_task_if_needed(schedulable)
  File "/usr/local/lib/python3.10/site-packages/airflow/models/dagrun.py", line 799, in _expand_mapped_task_if_needed
    expanded_tis, _ = ti.task.expand_mapped_task(self.run_id, session=session)
  File "/usr/local/lib/python3.10/site-packages/airflow/models/abstractoperator.py", line 471, in expand_mapped_task
    session.flush()
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3449, in flush
    self._flush(objects)
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3588, in _flush
    with util.safe_reraise():
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3549, in _flush
    flush_context.execute()
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
    rec.execute(self)
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/unitofwork.py", line 630, in execute
    util.preloaded.orm_persistence.save_obj(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/persistence.py", line 237, in save_obj
    _emit_update_statements(
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/persistence.py", line 1035, in _emit_update_statements
    raise orm_exc.StaleDataError(
sqlalchemy.orm.exc.StaleDataError: UPDATE statement on table 'task_instance' expected to update 1 row(s); 0 were matched.
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/home/astro/.local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/usr/local/lib/python3.10/site-packages/airflow/__main__.py", line 48, in main
    args.func(args)
  File "/usr/local/lib/python3.10/site-packages/airflow/cli/cli_config.py", line 51, in command
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/airflow/utils/cli.py", line 112, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/airflow/cli/commands/dag_command.py", line 139, in dag_backfill
    _run_dag_backfill(dags, args)
  File "/usr/local/lib/python3.10/site-packages/airflow/cli/commands/dag_command.py", line 92, in _run_dag_backfill
    dag.run(
  File "/usr/local/lib/python3.10/site-packages/airflow/models/dag.py", line 2490, in run
    run_job(job=job, execute_callable=job_runner._execute)
  File "/usr/local/lib/python3.10/site-packages/airflow/utils/session.py", line 76, in wrapper
    return func(*args, session=session, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/airflow/jobs/job.py", line 284, in run_job
    return execute_job(job, execute_callable=execute_callable)
  File "/usr/local/lib/python3.10/site-packages/airflow/jobs/job.py", line 313, in execute_job
    ret = execute_callable()
  File "/usr/local/lib/python3.10/site-packages/airflow/utils/session.py", line 76, in wrapper
    return func(*args, session=session, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/airflow/jobs/backfill_job_runner.py", line 941, in _execute
    session.commit()
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 1454, in commit
    self._transaction.commit(_to_root=self.future)
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 830, in commit
    self._assert_active(prepared_ok=True)
  File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 604, in _assert_active
    raise sa_exc.PendingRollbackError(
sqlalchemy.exc.PendingRollbackError: This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: UPDATE statement on table 'task_instance' expected to update 1 row(s); 0 were matched. (Background on this error at: https://sqlalche.me/e/14/7s2a)
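For context, SQLAlchemy raises StaleDataError when a flushed UPDATE matches fewer rows than the ORM expected, typically because another transaction changed or deleted the row underneath the session. A minimal, self-contained sketch of that mechanism (plain SQLAlchemy 1.4 against SQLite, not Airflow code; the model is a hypothetical stand-in):

import sqlalchemy as sa
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class TaskInstance(Base):
    # Hypothetical stand-in for Airflow's task_instance table.
    __tablename__ = "task_instance"
    id = sa.Column(sa.Integer, primary_key=True)
    state = sa.Column(sa.String)


engine = sa.create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine, expire_on_commit=False) as session:
    ti = TaskInstance(id=1, state="scheduled")
    session.add(ti)
    session.commit()

    # Simulate a concurrent actor deleting the row behind the session's back.
    with engine.begin() as conn:
        conn.execute(sa.text("DELETE FROM task_instance WHERE id = 1"))

    ti.state = "queued"
    # The flush emits an UPDATE that matches 0 rows, raising
    # StaleDataError: "UPDATE statement on table 'task_instance'
    # expected to update 1 row(s); 0 were matched."
    session.commit()

In the backfill traceback, the flush inside expand_mapped_task appears to hit the same situation on the task_instance table.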
This second, StaleDataError-based issue might be related to this.
I have also tried limiting max_active_runs to 8, but then I got a deadlock (which I could not reproduce at the time of writing).
That is probably related to the issue here.
What you think should happen instead
The backfill should finish successfully, without intermittent errors.
How to reproduce
To reproduce the issue, I made a simple DAG:
import pendulum
from airflow import DAG
from airflow.decorators import task_group
from airflow.models import DagRun
from airflow.operators.python import PythonOperator
from airflow.utils.types import DagRunType

from dags.include.helpers.dag_helpers import merge_with_default_args

_QUERY_INTERVAL_START_OFFSET = 14
_QUERY_INTERVAL_END_OFFSET = 2


def _get_start_end_dates(dag_run: DagRun, data_interval_end: pendulum.DateTime):
    # Backfill and manual runs get a single window covering the whole offset
    # range; scheduled runs get one window per day offset.
    if dag_run.run_type in [DagRunType.BACKFILL_JOB, DagRunType.MANUAL]:
        start_date = data_interval_end.subtract(days=_QUERY_INTERVAL_START_OFFSET).date()
        end_date = data_interval_end.subtract(days=_QUERY_INTERVAL_END_OFFSET).date()
        return [
            {
                "start_date": start_date.isoformat(),
                "end_date": end_date.isoformat(),
            }
        ]
    return [
        {
            "start_date": data_interval_end.subtract(days=i).date().isoformat(),
            "end_date": data_interval_end.subtract(days=i).date().isoformat(),
        }
        for i in range(_QUERY_INTERVAL_END_OFFSET, _QUERY_INTERVAL_START_OFFSET + 1)
    ]


def _get_insert_run_data(
    dag_run: DagRun,
    data_interval_end: pendulum.DateTime,
):
    current_date = data_interval_end.date().isoformat()
    return [
        {"current_date": current_date, **dates}
        for dates in _get_start_end_dates(dag_run, data_interval_end)
    ]


def _print(start_date: str, end_date: str, current_date: str):
    print(f"start_date: {start_date}")
    print(f"end_date: {end_date}")
    print(f"current_date: {current_date}")


with DAG(
    dag_id="stale_data_test",
    catchup=False,
    start_date=pendulum.datetime(2023, 6, 7),
    template_searchpath=[
        "/usr/local/airflow/dags/include",
        "/usr/local/airflow/dags/sem",
    ],
    default_args=merge_with_default_args(),
    schedule="0 6 * * *",  # At 06:00 UTC every day
    # max_active_runs=8,
    # max_active_tasks=8,
):
    get_run_data = PythonOperator(
        task_id="get_run_data",
        python_callable=_get_insert_run_data,
    )

    @task_group(group_id="insert_new_daily_data")
    def insert_new_daily_data(start_date: str, end_date: str, current_date: str):
        cleanup = PythonOperator(
            task_id="cleanup",
            python_callable=_print,
            op_kwargs={
                "start_date": start_date,
                "end_date": end_date,
                "current_date": current_date,
            },
        )
        insert = PythonOperator(
            task_id="insert",
            python_callable=_print,
            op_kwargs={
                "start_date": start_date,
                "end_date": end_date,
                "current_date": current_date,
            },
        )
        cleanup >> insert

    # Map the task group over the list of dicts returned by get_run_data.
    insert_new_daily_data.expand_kwargs(kwargs=get_run_data.output)
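For clarity, insert_new_daily_data expands over the XCom returned by get_run_data. On a scheduled run the callable returns 13 entries (one per day offset from 2 through 14), while on a backfill or manual run it returns a single entry, so the mapped task group expands to a different number of instances depending on the run type. The shape for a scheduled run with data_interval_end of 2023-08-01 looks like this (values are illustrative):

[
    {"current_date": "2023-08-01", "start_date": "2023-07-30", "end_date": "2023-07-30"},
    {"current_date": "2023-08-01", "start_date": "2023-07-29", "end_date": "2023-07-29"},
    # ... 13 entries in total, for offsets 2 through 14
]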
Then I opened a shell in the Docker container and ran the following commands (waiting for each to finish, or interrupting it when a specific error occurred):
1. airflow dags backfill stale_data_test --start-date 2023-07-01 --end-date 2023-08-01 -B
2. airflow dags backfill stale_data_test --start-date 2023-07-01 --end-date 2023-08-01 -B
3. airflow dags backfill stale_data_test --start-date 2023-06-01 --end-date 2023-07-01 -B
4. airflow dags backfill stale_data_test --start-date 2023-05-01 --end-date 2023-06-01 -B
Try more date ranges, or re-run with --reset-dagruns -y.
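For example, the following re-runs the May window after clearing the existing backfill DAG runs (--reset-dagruns and -y are standard airflow dags backfill options):

airflow dags backfill stale_data_test --start-date 2023-05-01 --end-date 2023-06-01 --reset-dagruns -y -B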
Operating System
OS X (Linux in Docker)
Versions of Apache Airflow Providers
No response
Deployment
Docker-Compose
Deployment details
Docker:
Engine: 24.0.2
Compose: v2.19.1
Docker Desktop: 4.21.1 (114176)
Astro CLI Version: 1.17.1
Anything else
These problems occur regularly.
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct