Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airflow-core/docs/img/airflow_erd.sha256
Original file line number Diff line number Diff line change
@@ -1 +1 @@
f0833aded6ef9098ee8b4f22800b72310d33ae51800a1aeda8edc0360d3195ca
203aa3570578ef6e24b0f6725545e3ab830b349a9ca43f8f238ee9588245adc0
172 changes: 86 additions & 86 deletions airflow-core/docs/img/airflow_erd.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
5 changes: 4 additions & 1 deletion airflow-core/docs/migrations-ref.rst
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ Here's the list of all the Database Migrations that are executed via when you ru
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| Revision ID | Revises ID | Airflow Version | Description |
+=========================+==================+===================+==============================================================+
| ``82dbd68e6171`` (head) | ``55297ae24532`` | ``3.2.0`` | Add index to task_reschedule ti_id . |
| ``a5a3e5eb9b8d`` (head) | ``82dbd68e6171`` | ``3.2.0`` | Make external_executor_id TEXT to allow for longer |
| | | | external_executor_ids. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``82dbd68e6171`` | ``55297ae24532`` | ``3.2.0`` | Add index to task_reschedule ti_id . |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``55297ae24532`` | ``e79fc784f145`` | ``3.2.0`` | Add required fields to enable UI integrations for the |
| | | | Deadline Alerts feature. |
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Make external_executor_id TEXT to allow for longer external_executor_ids.

Revision ID: a5a3e5eb9b8d
Revises: 82dbd68e6171
Create Date: 2026-01-28 16:35:00.000000

"""

from __future__ import annotations

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "a5a3e5eb9b8d"
down_revision = "82dbd68e6171"
branch_labels = None
depends_on = None
airflow_version = "3.2.0"


def upgrade():
"""Change external_executor_id column from VARCHAR(250) to TEXT."""
with op.batch_alter_table("task_instance", schema=None) as batch_op:
batch_op.alter_column(
"external_executor_id",
existing_type=sa.VARCHAR(length=250),
type_=sa.Text(),
existing_nullable=True,
)

with op.batch_alter_table("task_instance_history", schema=None) as batch_op:
batch_op.alter_column(
"external_executor_id",
existing_type=sa.VARCHAR(length=250),
type_=sa.Text(),
existing_nullable=True,
)


def downgrade():
"""Revert external_executor_id column from TEXT to VARCHAR(250)."""
with op.batch_alter_table("task_instance_history", schema=None) as batch_op:
batch_op.alter_column(
"external_executor_id",
existing_type=sa.Text(),
type_=sa.VARCHAR(length=250),
existing_nullable=True,
)

with op.batch_alter_table("task_instance", schema=None) as batch_op:
batch_op.alter_column(
"external_executor_id",
existing_type=sa.Text(),
type_=sa.VARCHAR(length=250),
existing_nullable=True,
)
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ class TaskInstance(Base, LoggingMixin):
String(250), server_default=SpanStatus.NOT_STARTED, nullable=False
)

external_executor_id: Mapped[str | None] = mapped_column(StringID(), nullable=True)
external_executor_id: Mapped[str | None] = mapped_column(Text(), nullable=True)

# The trigger to resume on if we are in state DEFERRED
trigger_id: Mapped[int | None] = mapped_column(Integer, nullable=True)
Expand Down
3 changes: 2 additions & 1 deletion airflow-core/src/airflow/models/taskinstancehistory.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
Index,
Integer,
String,
Text,
UniqueConstraint,
func,
select,
Expand Down Expand Up @@ -105,7 +106,7 @@ class TaskInstanceHistory(Base):
String(250), server_default=SpanStatus.NOT_STARTED, nullable=False
)

external_executor_id: Mapped[str | None] = mapped_column(StringID(), nullable=True)
external_executor_id: Mapped[str | None] = mapped_column(Text(), nullable=True)
trigger_id: Mapped[int | None] = mapped_column(Integer, nullable=True)
trigger_timeout: Mapped[DateTime | None] = mapped_column(DateTime, nullable=True)
next_method: Mapped[str | None] = mapped_column(String(1000), nullable=True)
Expand Down
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/utils/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ class MappedClassProtocol(Protocol):
"3.0.0": "29ce7909c52b",
"3.0.3": "fe199e1abd77",
"3.1.0": "cc92b33c6709",
"3.2.0": "82dbd68e6171",
"3.2.0": "a5a3e5eb9b8d",
}

# Prefix used to identify tables holding data moved during migration.
Expand Down
16 changes: 16 additions & 0 deletions airflow-core/tests/unit/models/test_taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -1441,6 +1441,22 @@ def test_check_and_change_state_before_execution_with_exec_id(self, create_task_
assert ti_from_deserialized_task.state == State.RUNNING
assert ti_from_deserialized_task.try_number == 0

@provide_session
def test_external_executor_id_accepts_long_values(self, create_task_instance, session):
"""Test that external_executor_id can store values exceeding 250 characters."""
# Kubernetes pod names and other executor IDs can exceed 250 chars
long_executor_id = "k8s-pod-" + "a" * 300 # 308 characters total

ti = create_task_instance(dag_id="test_long_external_executor_id")
ti.external_executor_id = long_executor_id
session.merge(ti)
session.commit()

# Verify the value persists without truncation
ti.refresh_from_db()
assert ti.external_executor_id == long_executor_id
assert len(ti.external_executor_id) == 308

def test_check_and_change_state_before_execution_dep_not_met(self, dag_maker):
with dag_maker(dag_id="test_check_and_change_state_before_execution") as dag:
task1 = EmptyOperator(task_id="task1")
Expand Down