From 5a5c5751b9459a6eca01d2a1c5c1ba980d5a60cb Mon Sep 17 00:00:00 2001 From: keeagnsmith21 Date: Wed, 7 Jan 2026 11:34:42 +0800 Subject: [PATCH 01/16] First pass --- observatory_platform/airflow/airflow.py | 3 ++- observatory_platform/airflow/sensors.py | 2 +- observatory_platform/airflow/tasks.py | 4 ++-- observatory_platform/airflow/tests/fixtures/bad_dag.py | 4 ++-- .../airflow/tests/fixtures/good_dag.py | 4 ++-- observatory_platform/airflow/tests/test_airflow.py | 10 +++++----- observatory_platform/airflow/tests/test_sensors.py | 4 ++-- observatory_platform/airflow/workflow.py | 2 +- observatory_platform/google/tests/fixtures/bad_dag.py | 4 ++-- observatory_platform/sandbox/sandbox_environment.py | 5 +++-- observatory_platform/sandbox/test_utils.py | 4 ++-- observatory_platform/sandbox/tests/fixtures/bad_dag.py | 4 ++-- .../sandbox/tests/test_sandbox_environment.py | 5 ++--- pyproject.toml | 7 ++++--- 14 files changed, 32 insertions(+), 30 deletions(-) diff --git a/observatory_platform/airflow/airflow.py b/observatory_platform/airflow/airflow.py index ea0ed0064..be2d0dc67 100644 --- a/observatory_platform/airflow/airflow.py +++ b/observatory_platform/airflow/airflow.py @@ -27,7 +27,8 @@ import validators from airflow.exceptions import AirflowException, AirflowNotFoundException from airflow.hooks.base import BaseHook -from airflow.models import TaskInstance, XCom, DagRun, Connection +# from airflow.models import TaskInstance, XCom, DagRun, Connection +from airflow.sdk import TaskInstance, XCom, DagRun, Connection from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook from airflow.utils.db import provide_session from dateutil.relativedelta import relativedelta diff --git a/observatory_platform/airflow/sensors.py b/observatory_platform/airflow/sensors.py index 2d72dbdee..146db4bb9 100644 --- a/observatory_platform/airflow/sensors.py +++ b/observatory_platform/airflow/sensors.py @@ -22,7 +22,7 @@ import pendulum from airflow.models import DagRun 
-from airflow.sensors.external_task import ExternalTaskSensor +from airflow.providers.standard.sensors.external_task import ExternalTaskSensor from airflow.utils.db import provide_session from airflow.utils.state import State from sqlalchemy.orm.scoping import scoped_session diff --git a/observatory_platform/airflow/tasks.py b/observatory_platform/airflow/tasks.py index b4646cd87..ce844c11d 100644 --- a/observatory_platform/airflow/tasks.py +++ b/observatory_platform/airflow/tasks.py @@ -19,10 +19,10 @@ from typing import List, Optional import airflow -from airflow.decorators import task +from airflow.sdk import task from airflow.exceptions import AirflowNotFoundException from airflow.hooks.base import BaseHook -from airflow.models import Variable +from airflow.sdk import Variable from observatory_platform.google.gke import gke_create_volume, gke_delete_volume diff --git a/observatory_platform/airflow/tests/fixtures/bad_dag.py b/observatory_platform/airflow/tests/fixtures/bad_dag.py index e4ce1af0c..fbfd321c6 100644 --- a/observatory_platform/airflow/tests/fixtures/bad_dag.py +++ b/observatory_platform/airflow/tests/fixtures/bad_dag.py @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:ecff8cd6bee7e5a6dd1a4b40358484eef5c52f0efe1b3c31a00f62a1d7987f81 -size 978 +oid sha256:ca28383c36130ea31943b776b540f15f41de1f0a086e479da28fa9ad46d4c32a +size 1001 diff --git a/observatory_platform/airflow/tests/fixtures/good_dag.py b/observatory_platform/airflow/tests/fixtures/good_dag.py index 5d26c28c0..b00270014 100644 --- a/observatory_platform/airflow/tests/fixtures/good_dag.py +++ b/observatory_platform/airflow/tests/fixtures/good_dag.py @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:321e9a394cad467e2bcc41583e55e45f9ecbaeb6c82a4d518290994cac75b708 -size 1152 +oid sha256:56c228ee6ac13255d4a82270505dccd601326235751d0d0fc0dc3e1f149609f4 +size 1175 diff --git a/observatory_platform/airflow/tests/test_airflow.py 
b/observatory_platform/airflow/tests/test_airflow.py index 9d0599c5e..4da83359c 100644 --- a/observatory_platform/airflow/tests/test_airflow.py +++ b/observatory_platform/airflow/tests/test_airflow.py @@ -24,14 +24,14 @@ import unittest from unittest.mock import MagicMock, patch -from airflow.decorators import dag from airflow.exceptions import AirflowException, AirflowNotFoundException from airflow.hooks.base import BaseHook -from airflow.models.connection import Connection -from airflow.models.dag import DAG, settings +from airflow.sdk import XCom, Connection +from airflow.models.dag import settings +from airflow.sdk import DAG, dag from airflow.models.xcom import XCom -from airflow.operators.bash import BashOperator -from airflow.operators.python import PythonOperator +from airflow.providers.standard.operators.bash import BashOperator +from airflow.providers.standard.operators.python import PythonOperator from airflow.utils import db from airflow.utils.session import provide_session from airflow.utils.state import State diff --git a/observatory_platform/airflow/tests/test_sensors.py b/observatory_platform/airflow/tests/test_sensors.py index bbe31f772..dc76c8ec0 100644 --- a/observatory_platform/airflow/tests/test_sensors.py +++ b/observatory_platform/airflow/tests/test_sensors.py @@ -23,8 +23,8 @@ import time_machine from airflow.exceptions import AirflowException from airflow.models import DagRun, DagModel -from airflow.models.dag import DAG -from airflow.operators.python import PythonOperator +from airflow.sdk import DAG +from airflow.providers.standard.operators.python import PythonOperator from airflow.utils.session import provide_session from airflow.utils.state import State diff --git a/observatory_platform/airflow/workflow.py b/observatory_platform/airflow/workflow.py index 7fc12fdad..55bdae516 100644 --- a/observatory_platform/airflow/workflow.py +++ b/observatory_platform/airflow/workflow.py @@ -27,7 +27,7 @@ import pendulum from airflow import 
AirflowException -from airflow.models import DagBag, Variable +from airflow.sdk import DagBag, Variable from observatory_platform.airflow.airflow import delete_old_xcoms from observatory_platform.config import AirflowVars diff --git a/observatory_platform/google/tests/fixtures/bad_dag.py b/observatory_platform/google/tests/fixtures/bad_dag.py index e4ce1af0c..fbfd321c6 100644 --- a/observatory_platform/google/tests/fixtures/bad_dag.py +++ b/observatory_platform/google/tests/fixtures/bad_dag.py @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:ecff8cd6bee7e5a6dd1a4b40358484eef5c52f0efe1b3c31a00f62a1d7987f81 -size 978 +oid sha256:ca28383c36130ea31943b776b540f15f41de1f0a086e479da28fa9ad46d4c32a +size 1001 diff --git a/observatory_platform/sandbox/sandbox_environment.py b/observatory_platform/sandbox/sandbox_environment.py index 067a88f09..16166eb45 100644 --- a/observatory_platform/sandbox/sandbox_environment.py +++ b/observatory_platform/sandbox/sandbox_environment.py @@ -40,11 +40,12 @@ import google import pendulum import requests -from airflow import DAG, settings +from airflow import settings +from airflow.sdk import DAG from airflow.models.connection import Connection from airflow.models.dagrun import DagRun from airflow.models.taskinstance import TaskInstance -from airflow.models.variable import Variable +from airflow.sdk import Variable from airflow.timetables.base import DataInterval from airflow.utils import db from airflow.utils.state import State diff --git a/observatory_platform/sandbox/test_utils.py b/observatory_platform/sandbox/test_utils.py index eda7bb059..58cb599fb 100644 --- a/observatory_platform/sandbox/test_utils.py +++ b/observatory_platform/sandbox/test_utils.py @@ -30,10 +30,10 @@ import boto3 import httpretty import pendulum -from airflow import DAG +from airflow.sdk import DAG from airflow.exceptions import AirflowException from airflow.models import DagBag -from airflow.operators.empty import EmptyOperator +from 
airflow.providers.standard.operators.empty import EmptyOperator from deepdiff import DeepDiff from google.cloud import bigquery, storage from google.cloud.bigquery import SourceFormat diff --git a/observatory_platform/sandbox/tests/fixtures/bad_dag.py b/observatory_platform/sandbox/tests/fixtures/bad_dag.py index e4ce1af0c..fbfd321c6 100644 --- a/observatory_platform/sandbox/tests/fixtures/bad_dag.py +++ b/observatory_platform/sandbox/tests/fixtures/bad_dag.py @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:ecff8cd6bee7e5a6dd1a4b40358484eef5c52f0efe1b3c31a00f62a1d7987f81 -size 978 +oid sha256:ca28383c36130ea31943b776b540f15f41de1f0a086e479da28fa9ad46d4c32a +size 1001 diff --git a/observatory_platform/sandbox/tests/test_sandbox_environment.py b/observatory_platform/sandbox/tests/test_sandbox_environment.py index ad17f638f..67f086f90 100644 --- a/observatory_platform/sandbox/tests/test_sandbox_environment.py +++ b/observatory_platform/sandbox/tests/test_sandbox_environment.py @@ -22,11 +22,10 @@ from datetime import timedelta import pendulum -from airflow.decorators import dag, task, task_group +from airflow.sdk import dag, task, task_group, Variable from airflow.exceptions import AirflowSkipException -from airflow.models.connection import Connection +from airflow.sdk import Connection from airflow.models.dag import ScheduleArg -from airflow.models.variable import Variable from airflow.timetables.base import DataInterval from airflow.utils.state import TaskInstanceState from google.cloud.exceptions import NotFound diff --git a/pyproject.toml b/pyproject.toml index ebc29617a..35226d786 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,7 @@ build-backend = "setuptools.build_meta" name = "observatory-platform" dynamic = ["version"] description = "The Observatory Platform is an environment for fetching, processing and analysing data to understand how well universities operate as Open Knowledge Institutions." 
-requires-python = ">=3.10" +requires-python = "==3.12" license = { text = "Apache-2.0" } keywords = ["science", "data", "workflows", "academic institutes", "academic-observatory-workflows"] authors = [{ name = "Curtin University", email = "agent@observatory.academy" }] @@ -19,7 +19,7 @@ classifiers = [ "License :: OSI Approved :: Apache Software License", "Operating System :: OS Independent", "Programming Language :: Python :: 3", - "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.12", "Topic :: Scientific/Engineering", "Topic :: Software Development :: Libraries", "Topic :: Software Development :: Libraries :: Python Modules", @@ -28,7 +28,7 @@ classifiers = [ dependencies = [ "setuptools==80.9.0", # Airflow - "apache-airflow[slack]==2.10.5", + "apache-airflow[slack]==3.1.5", "apache-airflow-providers-cncf-kubernetes==7.4.0", "apache-airflow-providers-common-compat==1.9.0", "apache-airflow-providers-common-io==1.6.5", @@ -40,6 +40,7 @@ dependencies = [ "apache-airflow-providers-slack==9.1.0", "apache-airflow-providers-smtp==2.3.2", "apache-airflow-providers-sqlite==4.1.3", + "apache-airflow-providers-standard>=1.10.2", # Google Cloud "google-crc32c>=1.1.0,<2", "google-cloud-bigquery>=3.2,<4", From cd564a6c777924125ef293915a9f6194c44c65e9 Mon Sep 17 00:00:00 2001 From: keeagnsmith21 Date: Wed, 7 Jan 2026 11:44:38 +0800 Subject: [PATCH 02/16] version updates --- README.md | 2 +- pyproject.toml | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 8ae1dfd50..5bb60ce1e 100644 --- a/README.md +++ b/README.md @@ -32,7 +32,7 @@ cd observatory-platform Install dependencies: ```bash -pip install -e .[tests] --constraint https://raw.githubusercontent.com/apache/airflow/constraints-2.10.5/constraints-no-providers-3.10.txt +pip install -e .[tests] --constraint https://raw.githubusercontent.com/apache/airflow/constraints-3.1.5/constraints-no-providers-3.12.txt ``` Run unit tests: diff --git 
a/pyproject.toml b/pyproject.toml index 35226d786..71f384ce9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,7 +30,7 @@ dependencies = [ # Airflow "apache-airflow[slack]==3.1.5", "apache-airflow-providers-cncf-kubernetes==7.4.0", - "apache-airflow-providers-common-compat==1.9.0", + "apache-airflow-providers-common-compat==1.10.1", "apache-airflow-providers-common-io==1.6.5", "apache-airflow-providers-common-sql==1.29.0", "apache-airflow-providers-fab==1.5.2", @@ -59,7 +59,7 @@ dependencies = [ # SFTP "paramiko>=3,<4", # Utils - "natsort>=7.1.1,<8", + "natsort>=8,<9", "backoff>=2,<3", "validators<=0.20.0", "xmltodict", From 6442d5412767acb977fb533dd94e7ec95b4592c5 Mon Sep 17 00:00:00 2001 From: keeagnsmith21 Date: Wed, 7 Jan 2026 11:47:53 +0800 Subject: [PATCH 03/16] python version fix --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 71f384ce9..320a745d1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,7 @@ build-backend = "setuptools.build_meta" name = "observatory-platform" dynamic = ["version"] description = "The Observatory Platform is an environment for fetching, processing and analysing data to understand how well universities operate as Open Knowledge Institutions." 
-requires-python = "==3.12" +requires-python = ">==3.12<3.13" license = { text = "Apache-2.0" } keywords = ["science", "data", "workflows", "academic institutes", "academic-observatory-workflows"] authors = [{ name = "Curtin University", email = "agent@observatory.academy" }] From 83b6bd47e3ac8de3e770e4de93d2206e46b0534a Mon Sep 17 00:00:00 2001 From: keeagnsmith21 Date: Wed, 7 Jan 2026 11:49:53 +0800 Subject: [PATCH 04/16] typo fix --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 320a745d1..65a95450c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,7 @@ build-backend = "setuptools.build_meta" name = "observatory-platform" dynamic = ["version"] description = "The Observatory Platform is an environment for fetching, processing and analysing data to understand how well universities operate as Open Knowledge Institutions." -requires-python = ">==3.12<3.13" +requires-python = ">=3.12<3.13" license = { text = "Apache-2.0" } keywords = ["science", "data", "workflows", "academic institutes", "academic-observatory-workflows"] authors = [{ name = "Curtin University", email = "agent@observatory.academy" }] From f16d19cdb162c8f1fc8889b9bb63f1be2dd98004 Mon Sep 17 00:00:00 2001 From: keeagnsmith21 Date: Wed, 7 Jan 2026 11:51:58 +0800 Subject: [PATCH 05/16] typo --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 65a95450c..37fb04d00 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,7 @@ build-backend = "setuptools.build_meta" name = "observatory-platform" dynamic = ["version"] description = "The Observatory Platform is an environment for fetching, processing and analysing data to understand how well universities operate as Open Knowledge Institutions." 
-requires-python = ">=3.12<3.13" +requires-python = ">=3.12,<3.13" license = { text = "Apache-2.0" } keywords = ["science", "data", "workflows", "academic institutes", "academic-observatory-workflows"] authors = [{ name = "Curtin University", email = "agent@observatory.academy" }] From 719f6600d9d58d9e03bc88570663dac82118f95e Mon Sep 17 00:00:00 2001 From: keeagnsmith21 Date: Wed, 7 Jan 2026 12:02:43 +0800 Subject: [PATCH 06/16] kubernetes version update --- pyproject.toml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 37fb04d00..1467d8ad5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,7 +29,7 @@ dependencies = [ "setuptools==80.9.0", # Airflow "apache-airflow[slack]==3.1.5", - "apache-airflow-providers-cncf-kubernetes==7.4.0", + "apache-airflow-providers-cncf-kubernetes==10.12.0", "apache-airflow-providers-common-compat==1.10.1", "apache-airflow-providers-common-io==1.6.5", "apache-airflow-providers-common-sql==1.29.0", @@ -37,7 +37,6 @@ dependencies = [ "apache-airflow-providers-ftp==3.13.3", "apache-airflow-providers-http==5.3.2", "apache-airflow-providers-imap==3.9.4", - "apache-airflow-providers-slack==9.1.0", "apache-airflow-providers-smtp==2.3.2", "apache-airflow-providers-sqlite==4.1.3", "apache-airflow-providers-standard>=1.10.2", From decdd9d937fabffa6bfd9b84937993ce07ac3426 Mon Sep 17 00:00:00 2001 From: keeagnsmith21 Date: Wed, 7 Jan 2026 13:31:27 +0800 Subject: [PATCH 07/16] fix dagbag import --- observatory_platform/airflow/workflow.py | 3 ++- observatory_platform/load_dags.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/observatory_platform/airflow/workflow.py b/observatory_platform/airflow/workflow.py index 55bdae516..1cc86e084 100644 --- a/observatory_platform/airflow/workflow.py +++ b/observatory_platform/airflow/workflow.py @@ -27,7 +27,8 @@ import pendulum from airflow import AirflowException -from airflow.sdk import DagBag, Variable +from airflow.models 
import DagBag +from airflow.sdk import Variable from observatory_platform.airflow.airflow import delete_old_xcoms from observatory_platform.config import AirflowVars diff --git a/observatory_platform/load_dags.py b/observatory_platform/load_dags.py index 587569244..561e1e1a9 100644 --- a/observatory_platform/load_dags.py +++ b/observatory_platform/load_dags.py @@ -15,6 +15,6 @@ # The keywords airflow and DAG are required to load the DAGs from this file, see bullet 2 in the Apache Airflow FAQ: # https://airflow.apache.org/docs/stable/faq.html -from observatory.platform.refactor.workflow import load_dags_from_config +from observatory_platform.airflow.workflow import load_dags_from_config load_dags_from_config() From d89d307fe36ef519f05ed0cf6677d8b3bcc9364a Mon Sep 17 00:00:00 2001 From: keeagnsmith21 Date: Wed, 7 Jan 2026 14:39:42 +0800 Subject: [PATCH 08/16] version bumps --- observatory_platform/airflow/airflow.py | 6 +++--- pyproject.toml | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/observatory_platform/airflow/airflow.py b/observatory_platform/airflow/airflow.py index be2d0dc67..cf08f1225 100644 --- a/observatory_platform/airflow/airflow.py +++ b/observatory_platform/airflow/airflow.py @@ -27,13 +27,13 @@ import validators from airflow.exceptions import AirflowException, AirflowNotFoundException from airflow.hooks.base import BaseHook -# from airflow.models import TaskInstance, XCom, DagRun, Connection -from airflow.sdk import TaskInstance, XCom, DagRun, Connection +from airflow.models import TaskInstance, XCom, DagRun +from airflow.sdk import Connection from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook from airflow.utils.db import provide_session from dateutil.relativedelta import relativedelta from sqlalchemy import and_ -from sqlalchemy.orm import scoped_session, Session +from sqlalchemy.orm import Session from observatory_platform.config import AirflowConns ScheduleInterval = Union[str, timedelta, 
relativedelta] diff --git a/pyproject.toml b/pyproject.toml index 1467d8ad5..ec26e0d4d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,7 +30,7 @@ dependencies = [ # Airflow "apache-airflow[slack]==3.1.5", "apache-airflow-providers-cncf-kubernetes==10.12.0", - "apache-airflow-providers-common-compat==1.10.1", + "apache-airflow-providers-common-compat>=1.11.0", "apache-airflow-providers-common-io==1.6.5", "apache-airflow-providers-common-sql==1.29.0", "apache-airflow-providers-fab==1.5.2", @@ -39,7 +39,7 @@ dependencies = [ "apache-airflow-providers-imap==3.9.4", "apache-airflow-providers-smtp==2.3.2", "apache-airflow-providers-sqlite==4.1.3", - "apache-airflow-providers-standard>=1.10.2", + "apache-airflow-providers-standard>=1.11.0", # Google Cloud "google-crc32c>=1.1.0,<2", "google-cloud-bigquery>=3.2,<4", From 011f096c1a4d8c5b631793b15986c25eaf1bb971 Mon Sep 17 00:00:00 2001 From: keeagnsmith21 Date: Wed, 7 Jan 2026 14:52:07 +0800 Subject: [PATCH 09/16] unbump --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index ec26e0d4d..92547d897 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,7 +30,7 @@ dependencies = [ # Airflow "apache-airflow[slack]==3.1.5", "apache-airflow-providers-cncf-kubernetes==10.12.0", - "apache-airflow-providers-common-compat>=1.11.0", + "apache-airflow-providers-common-compat>=1.10.1", "apache-airflow-providers-common-io==1.6.5", "apache-airflow-providers-common-sql==1.29.0", "apache-airflow-providers-fab==1.5.2", From 92f4c61dbfc2ab986a98aac8b17a1cfd990b68c5 Mon Sep 17 00:00:00 2001 From: keeagnsmith21 Date: Wed, 7 Jan 2026 14:55:22 +0800 Subject: [PATCH 10/16] unbump --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 92547d897..ef68e2cd6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,7 +39,7 @@ dependencies = [ "apache-airflow-providers-imap==3.9.4", 
"apache-airflow-providers-smtp==2.3.2", "apache-airflow-providers-sqlite==4.1.3", - "apache-airflow-providers-standard>=1.11.0", + "apache-airflow-providers-standard>=1.10.2", # Google Cloud "google-crc32c>=1.1.0,<2", "google-cloud-bigquery>=3.2,<4", From 4d36dd1a742813908478b07a66d9f9b2f221872b Mon Sep 17 00:00:00 2001 From: keeagnsmith21 Date: Wed, 7 Jan 2026 15:16:34 +0800 Subject: [PATCH 11/16] slack pin --- pyproject.toml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index ef68e2cd6..1b8b94f0a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,7 +28,8 @@ classifiers = [ dependencies = [ "setuptools==80.9.0", # Airflow - "apache-airflow[slack]==3.1.5", + "apache-airflow==3.1.5", + "apache-airflow-providers-slack>=9.0.0", "apache-airflow-providers-cncf-kubernetes==10.12.0", "apache-airflow-providers-common-compat>=1.10.1", "apache-airflow-providers-common-io==1.6.5", From 00303b25af86d2d67441ce44d433634489307996 Mon Sep 17 00:00:00 2001 From: keeagnsmith21 Date: Wed, 7 Jan 2026 15:48:38 +0800 Subject: [PATCH 12/16] constraints file with providers --- README.md | 2 +- pyproject.toml | 24 ++++++++++++------------ 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/README.md b/README.md index 5bb60ce1e..769c421f8 100644 --- a/README.md +++ b/README.md @@ -32,7 +32,7 @@ cd observatory-platform Install dependencies: ```bash -pip install -e .[tests] --constraint https://raw.githubusercontent.com/apache/airflow/constraints-3.1.5/constraints-no-providers-3.12.txt +pip install -e .[tests] --constraint https://raw.githubusercontent.com/apache/airflow/constraints-3.1.5/constraints-3.12.txt ``` Run unit tests: diff --git a/pyproject.toml b/pyproject.toml index 1b8b94f0a..a9cddacd5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,18 +29,18 @@ dependencies = [ "setuptools==80.9.0", # Airflow "apache-airflow==3.1.5", - "apache-airflow-providers-slack>=9.0.0", - 
"apache-airflow-providers-cncf-kubernetes==10.12.0", - "apache-airflow-providers-common-compat>=1.10.1", - "apache-airflow-providers-common-io==1.6.5", - "apache-airflow-providers-common-sql==1.29.0", - "apache-airflow-providers-fab==1.5.2", - "apache-airflow-providers-ftp==3.13.3", - "apache-airflow-providers-http==5.3.2", - "apache-airflow-providers-imap==3.9.4", - "apache-airflow-providers-smtp==2.3.2", - "apache-airflow-providers-sqlite==4.1.3", - "apache-airflow-providers-standard>=1.10.2", + # "apache-airflow-providers-slack>=9.0.0", + # "apache-airflow-providers-cncf-kubernetes==10.12.0", + # "apache-airflow-providers-common-compat>=1.10.1", + # "apache-airflow-providers-common-io==1.6.5", + # "apache-airflow-providers-common-sql==1.29.0", + # "apache-airflow-providers-fab==1.5.2", + # "apache-airflow-providers-ftp==3.13.3", + # "apache-airflow-providers-http==5.3.2", + # "apache-airflow-providers-imap==3.9.4", + # "apache-airflow-providers-smtp==2.3.2", + # "apache-airflow-providers-sqlite==4.1.3", + # "apache-airflow-providers-standard>=1.10.2", # Google Cloud "google-crc32c>=1.1.0,<2", "google-cloud-bigquery>=3.2,<4", From e8c304d068cf9dca3a89f9a5f385c57da52dd09b Mon Sep 17 00:00:00 2001 From: keeagnsmith21 Date: Wed, 7 Jan 2026 15:55:24 +0800 Subject: [PATCH 13/16] gcs version fix --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index a9cddacd5..5b38cc486 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -45,7 +45,7 @@ dependencies = [ "google-crc32c>=1.1.0,<2", "google-cloud-bigquery>=3.2,<4", "google-api-python-client>=2,<3", - "google-cloud-storage>=2.7.0,<3", + "google-cloud-storage>=3.0.0,<4", #"google-auth-oauthlib>=0.4.5,<1", "google-cloud-compute >=1.16.0,<2.0", # File manipulation, reading, writing From 7f9ad143be79d52937bd2f6104bdbf12fa353e5c Mon Sep 17 00:00:00 2001 From: keeagnsmith21 Date: Wed, 7 Jan 2026 15:58:39 +0800 Subject: [PATCH 14/16] validators version removal --- 
pyproject.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 5b38cc486..00d30dea9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -61,7 +61,6 @@ dependencies = [ # Utils "natsort>=8,<9", "backoff>=2,<3", - "validators<=0.20.0", "xmltodict", "tenacity>=8.0.0", "python-dateutil", From f5c49985c24cdf7b667c1e7b213f4f0405f4fec4 Mon Sep 17 00:00:00 2001 From: keeagnsmith21 Date: Wed, 7 Jan 2026 16:13:42 +0800 Subject: [PATCH 15/16] use constraints for versioning where possible --- pyproject.toml | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 00d30dea9..c5024285a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,18 +29,18 @@ dependencies = [ "setuptools==80.9.0", # Airflow "apache-airflow==3.1.5", - # "apache-airflow-providers-slack>=9.0.0", - # "apache-airflow-providers-cncf-kubernetes==10.12.0", - # "apache-airflow-providers-common-compat>=1.10.1", - # "apache-airflow-providers-common-io==1.6.5", - # "apache-airflow-providers-common-sql==1.29.0", - # "apache-airflow-providers-fab==1.5.2", - # "apache-airflow-providers-ftp==3.13.3", - # "apache-airflow-providers-http==5.3.2", - # "apache-airflow-providers-imap==3.9.4", - # "apache-airflow-providers-smtp==2.3.2", - # "apache-airflow-providers-sqlite==4.1.3", - # "apache-airflow-providers-standard>=1.10.2", + "apache-airflow-providers-slack", + "apache-airflow-providers-cncf-kubernetes", + "apache-airflow-providers-common-compat", + "apache-airflow-providers-common-io", + "apache-airflow-providers-common-sql", + "apache-airflow-providers-fab", + "apache-airflow-providers-ftp", + "apache-airflow-providers-http", + "apache-airflow-providers-imap", + "apache-airflow-providers-smtp", + "apache-airflow-providers-sqlite", + "apache-airflow-providers-standard", # Google Cloud "google-crc32c>=1.1.0,<2", "google-cloud-bigquery>=3.2,<4", @@ -59,6 +59,7 @@ dependencies = [ # SFTP "paramiko>=3,<4", # 
Utils + "validators", "natsort>=8,<9", "backoff>=2,<3", "xmltodict", From 3f008bcdf3c89629dc9cfb0788799f30c20144ed Mon Sep 17 00:00:00 2001 From: keeagnsmith21 Date: Tue, 13 Jan 2026 09:22:50 +0800 Subject: [PATCH 16/16] import updates --- observatory_platform/airflow/airflow.py | 2 +- .../sandbox/sandbox_environment.py | 54 +++++++++++++++---- 2 files changed, 44 insertions(+), 12 deletions(-) diff --git a/observatory_platform/airflow/airflow.py b/observatory_platform/airflow/airflow.py index cf08f1225..80b04cc88 100644 --- a/observatory_platform/airflow/airflow.py +++ b/observatory_platform/airflow/airflow.py @@ -26,7 +26,7 @@ import six import validators from airflow.exceptions import AirflowException, AirflowNotFoundException -from airflow.hooks.base import BaseHook +from airflow.sdk.bases.hook import BaseHook from airflow.models import TaskInstance, XCom, DagRun from airflow.sdk import Connection from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook diff --git a/observatory_platform/sandbox/sandbox_environment.py b/observatory_platform/sandbox/sandbox_environment.py index 16166eb45..3122c0b9c 100644 --- a/observatory_platform/sandbox/sandbox_environment.py +++ b/observatory_platform/sandbox/sandbox_environment.py @@ -36,6 +36,7 @@ import os from tempfile import mkdtemp from typing import List, Optional, Set, Union +import uuid import google import pendulum @@ -44,8 +45,10 @@ from airflow.sdk import DAG from airflow.models.connection import Connection from airflow.models.dagrun import DagRun +from airflow.models.dag import DagModel from airflow.models.taskinstance import TaskInstance -from airflow.sdk import Variable +from airflow.models.variable import Variable +from airflow.models.dag_version import DagVersion from airflow.timetables.base import DataInterval from airflow.utils import db from airflow.utils.state import State @@ -102,6 +105,7 @@ def __init__( self.workflows = workflows self.env_vars = env_vars self.temp_dir = mkdtemp() + 
self.current_dag = None if self.create_gcp_env: self.download_bucket = self.add_bucket(roles=gcs_bucket_roles) @@ -172,6 +176,12 @@ def add_bucket(self, prefix: Optional[str] = None, roles: Optional[Union[Set[str return bucket_name + def _ensure_dag_version(self, dag): + dag_version = DagVersion.get_latest_version(dag.dag_id, session=self.session) + if dag_version is None: + dag_version = DagVersion.write_dag(dag, session=self.session) + return dag_version + def _create_bucket(self, bucket_id: str, roles: Optional[Union[str, Set[str]]] = None) -> None: """Create a Google Cloud Storage Bucket. @@ -264,6 +274,7 @@ def add_variable(self, var: Variable) -> None: :param var: the Airflow variable. :return: None. """ + try: existing_var = self.session.query(Variable).filter(Variable.key == var.key).first() if existing_var: @@ -282,6 +293,8 @@ def add_variable(self, var: Variable) -> None: self.session.add(var) self.session.commit() + os.environ[f"AIRFLOW_VAR_{var.key.upper()}"] = var.val + def add_connection(self, conn: Connection): """Add an Airflow connection to the Observatory environment. @@ -289,9 +302,17 @@ def add_connection(self, conn: Connection): :return: None. """ + # Add to the database for scheduler/legacy logic + existing_conn = self.session.query(Connection).filter(Connection.conn_id == conn.conn_id).first() + if existing_conn: + self.session.delete(existing_conn) + self.session.add(conn) self.session.commit() + env_key = f"AIRFLOW_CONN_{conn.conn_id.upper()}" + os.environ[env_key] = conn.get_uri() + def run_task(self, task_id: str, map_index: int = -1) -> TaskInstance: """Run an Airflow task. @@ -299,13 +320,14 @@ def run_task(self, task_id: str, map_index: int = -1) -> TaskInstance: :param map_index: the map index if the task is a daynamic task :return: None. 
""" - assert self.dag_run is not None, "with create_dag_run must be called before run_task" - dag = self.dag_run.dag run_id = self.dag_run.run_id - task = dag.get_task(task_id=task_id) - ti = TaskInstance(task, run_id=run_id, map_index=map_index) + task = self.current_dag.get_task(task_id=task_id) + latest_version = DagVersion.get_latest_version(self.dag_run.dag_id) + # latest_version = self._ensure_dag_version(self.current_dag) + # ti = TaskInstance(task=task, dag_version_id=uuid.uuid4(), run_id=run_id, map_index=map_index) + ti = TaskInstance(task=task, dag_version_id=latest_version, run_id=run_id, map_index=map_index) ti.refresh_from_db() # TODO: remove this when this issue fixed / PR merged: https://github.com/apache/airflow/issues/34023#issuecomment-1705761692 @@ -328,7 +350,7 @@ def get_task_instance(self, task_id: str) -> TaskInstance: assert self.dag_run is not None, "with create_dag_run must be called before get_task_instance" run_id = self.dag_run.run_id - task = self.dag_run.dag.get_task(task_id=task_id) + task = self.current_dag.get_task(task_id=task_id) ti = TaskInstance(task, run_id=run_id) ti.refresh_from_db() return ti @@ -339,7 +361,7 @@ def create_dag_run( dag: DAG, logical_date: pendulum.DateTime = None, data_interval: DataInterval = None, - run_type: DagRunType = DagRunType.SCHEDULED, + run_type: str = DagRunType.SCHEDULED, ): """Create a DagRun that can be used when running tasks. During cleanup the DAG run state is updated. 
@@ -355,19 +377,29 @@ def create_dag_run( if not logical_date: logical_date = data_interval.start elif logical_date: - data_interval = dag.infer_automated_data_interval(logical_date=logical_date) + data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date) start_date = data_interval.start else: raise ValueError("Must provide one of `data_inerval` or `logical_date`") + # dag_version = self._ensure_dag_version(dag) try: - self.dag_run = dag.create_dagrun( + self.dag_run = DagRun( + dag_id=dag.dag_id, + run_id=dag.timetable.generate_run_id( + run_type=DagRunType(run_type), run_after=logical_date, data_interval=data_interval + ), state=State.RUNNING, - execution_date=logical_date, + logical_date=logical_date, start_date=start_date, - run_type=run_type, + run_type=DagRunType(run_type), data_interval=data_interval, ) + DagVersion.write_dag(dag.dag_id, "sandbox", session=self.session) + self.session.add(self.dag_run) + self.session.commit() + self.current_dag = dag + yield self.dag_run finally: self.dag_run.update_state()