Airflow's Task Execution API uses Cadwyn for API versioning with CalVer. This allows us to maintain backward compatibility while evolving the API over time.
Airflow components (e.g., workers, API servers) could be deployed independently. This can lead to version mismatches—for example, a worker using Task SDK 1.0.0 (requiring Airflow >=3.0.0) while the API server has been upgraded to 3.0.1. Without versioning, such mismatches can cause runtime failures or subtle bugs. Enforcing a clear versioning contract ensures backward compatibility and allows for safe, incremental upgrades across different components.
Note: We're prioritizing backward compatibility, older clients should continue to work seamlessly with newer servers. Since the client (Task SDK) and server (API) are still coupled, we can look at forward compatibility once we have a clear versioning strategy between Task SDK and Airflow Core.
- Any change to the API schema or behavior that affects existing clients requires a new version.
- All versions are maintained in the codebase
- Migrations encapsulate all schema changes.
- Core code logic remains version-agnostic.
When making changes to the Execution API, you must:
- Add a new migration module (e.g.,
v2025_04_28.py). Pick a likely date for when the Airflow version will be released.- Use the
vYYYY_MM_DDformat. - New migrations can be added to an existing unreleased version; not every migration needs a new version.
- For unreleased versions, pick a likely date for when it will be released.
- The version number should be unique and not conflict with any existing versions.
- Use the
- Document the changes in the migration.
- Update tests to cover both old and new versions.
- Ensure backward compatibility wherever possible.
Here's an example of how to add a new version migration that adds a new field consumed_asset_events to the
DagRun model.
from cadwyn import ResponseInfo, VersionChange, convert_response_to_previous_version_for, schema
from airflow.api_fastapi.execution_api.datamodels.taskinstance import DagRun, TIRunContext
class AddConsumedAssetEventsField(VersionChange):
"""Add the `consumed_asset_events` to DagRun model."""
description = __doc__
instructions_to_migrate_to_previous_version = (schema(DagRun).field("consumed_asset_events").didnt_exist,)
@convert_response_to_previous_version_for(TIRunContext) # type: ignore
def remove_consumed_asset_events(response: ResponseInfo): # type: ignore
response.body["dag_run"].pop("consumed_asset_events")The Execution API versioning code is organized as follows:
airflow-core/src/airflow/api_fastapi/execution_api/versions/- Contains version migrationsairflow-core/src/airflow/api_fastapi/execution_api/versions/head/- Contains the latest version modelsairflow-core/src/airflow/api_fastapi/execution_api/versions/v2025_04_28/- Contains version-specific models and migrationsairflow-core/tests/unit/api_fastapi/execution_api/versions/head- Contains tests for the latest versionairflow-core/tests/unit/api_fastapi/execution_api/versions/v2025_04_28- Contains tests for the version-specific models
For an example of how to implement version changes, see: - PR #50528 - PR #48125