From 664da195a5d3572fed792f7c96cba24eda06af47 Mon Sep 17 00:00:00 2001
From: nailo2c
Date: Wed, 28 Jan 2026 15:03:18 -0800
Subject: [PATCH 1/3] Migrate ADLSListOperator from ADLS Gen1 to Gen2 (#44228)

---
 .../microsoft/azure/operators/adls.py       | 22 ++++++++++++++-----
 .../azure/operators/test_adls_list.py       | 14 +++++++-----
 2 files changed, 26 insertions(+), 10 deletions(-)

diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/adls.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/adls.py
index 5e6e5e68b329d..1beaa290549fd 100644
--- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/adls.py
+++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/adls.py
@@ -126,7 +126,8 @@ class ADLSListOperator(BaseOperator):
     For more information on how to use this operator, take a look at the guide:
     :ref:`howto/operator:ADLSListOperator`

-    :param path: The Azure Data Lake path to find the objects. Supports glob strings (templated)
+    :param file_system_name: Name of the file system (container) in ADLS Gen2.
+    :param path: The directory path within the file system to list files from (templated).
     :param azure_data_lake_conn_id: Reference to the :ref:`Azure Data Lake connection`.
     """

@@ -134,13 +135,24 @@ class ADLSListOperator(BaseOperator):
     ui_color = "#901dd2"

     def __init__(
-        self, *, path: str, azure_data_lake_conn_id: str = DEFAULT_AZURE_DATA_LAKE_CONN_ID, **kwargs
+        self,
+        *,
+        file_system_name: str,
+        path: str,
+        azure_data_lake_conn_id: str = DEFAULT_AZURE_DATA_LAKE_CONN_ID,
+        **kwargs,
     ) -> None:
         super().__init__(**kwargs)
+        self.file_system_name = file_system_name
         self.path = path
         self.azure_data_lake_conn_id = azure_data_lake_conn_id

     def execute(self, context: Context) -> list:
-        hook = AzureDataLakeHook(azure_data_lake_conn_id=self.azure_data_lake_conn_id)
-        self.log.info("Getting list of ADLS files in path: %s", self.path)
-        return hook.list(path=self.path)
+        hook = AzureDataLakeStorageV2Hook(adls_conn_id=self.azure_data_lake_conn_id)
+        self.log.info(
+            "Getting list of ADLS files in file system %s, path: %s", self.file_system_name, self.path
+        )
+        return hook.list_files_directory(
+            file_system_name=self.file_system_name,
+            directory_name=self.path,
+        )
diff --git a/providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_adls_list.py b/providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_adls_list.py
index e73b1e4200e80..e292b6c8da0a9 100644
--- a/providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_adls_list.py
+++ b/providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_adls_list.py
@@ -22,7 +22,8 @@
 from airflow.providers.microsoft.azure.operators.adls import ADLSListOperator

 TASK_ID = "test-adls-list-operator"
-TEST_PATH = "test/*"
+TEST_FILE_SYSTEM_NAME = "test-container"
+TEST_PATH = "test/path"
 MOCK_FILES = [
     "test/TEST1.csv",
     "test/TEST2.csv",
@@ -33,12 +34,15 @@


 class TestAzureDataLakeStorageListOperator:
-    @mock.patch("airflow.providers.microsoft.azure.operators.adls.AzureDataLakeHook")
+    @mock.patch("airflow.providers.microsoft.azure.operators.adls.AzureDataLakeStorageV2Hook")
     def test_execute(self, mock_hook):
-        mock_hook.return_value.list.return_value = MOCK_FILES
+        mock_hook.return_value.list_files_directory.return_value = MOCK_FILES

-        operator = ADLSListOperator(task_id=TASK_ID, path=TEST_PATH)
+        operator = ADLSListOperator(task_id=TASK_ID, file_system_name=TEST_FILE_SYSTEM_NAME, path=TEST_PATH)

         files = operator.execute(None)
-        mock_hook.return_value.list.assert_called_once_with(path=TEST_PATH)
+        mock_hook.return_value.list_files_directory.assert_called_once_with(
+            file_system_name=TEST_FILE_SYSTEM_NAME,
+            directory_name=TEST_PATH,
+        )
         assert sorted(files) == sorted(MOCK_FILES)
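With patch 1 applied, the operator is constructed against a Gen2 file system
instead of a Gen1 glob path. A minimal usage sketch, not part of the patch;
the container name and directory below are illustrative placeholders:

    from airflow.providers.microsoft.azure.operators.adls import ADLSListOperator

    # "my-container" and "raw/events" are hypothetical example values.
    list_adls_files = ADLSListOperator(
        task_id="list_adls_files",
        file_system_name="my-container",  # ADLS Gen2 file system (container)
        path="raw/events",                # directory inside the file system
        azure_data_lake_conn_id="azure_data_lake_default",
    )

Note that path is now forwarded to list_files_directory as a directory name,
so Gen1-style glob strings such as "test/*" no longer apply.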
From d0270a0ca5d6efa238609415dd620204e5e299c5 Mon Sep 17 00:00:00 2001
From: nailo2c
Date: Wed, 28 Jan 2026 15:56:16 -0800
Subject: [PATCH 2/3] Update ADLSListOperator example dag

---
 .../azure/tests/system/microsoft/azure/example_adls_list.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/providers/microsoft/azure/tests/system/microsoft/azure/example_adls_list.py b/providers/microsoft/azure/tests/system/microsoft/azure/example_adls_list.py
index 26e65e0cf8d9c..9c71a1e4bba5c 100644
--- a/providers/microsoft/azure/tests/system/microsoft/azure/example_adls_list.py
+++ b/providers/microsoft/azure/tests/system/microsoft/azure/example_adls_list.py
@@ -37,7 +37,8 @@
     # [START howto_operator_adls_list]
     adls_files = ADLSListOperator(
         task_id="adls_files",
-        path="folder/output/*.parquet",
+        file_system_name="mycontainer",
+        path="folder/output",
         azure_data_lake_conn_id="azure_data_lake_default",
     )
     # [END howto_operator_adls_list]
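Since ADLSListOperator.execute returns the file listing, Airflow publishes it
as the task's return_value XCom. A sketch of a downstream consumer, assuming
the TaskFlow API and the adls_files task from the example DAG above; the
count_files task is hypothetical:

    from airflow.decorators import task

    @task
    def count_files(files: list[str]) -> None:
        # Receives the list returned by ADLSListOperator.execute via XCom.
        print(f"found {len(files)} files in ADLS")

    count_files(adls_files.output)  # .output resolves the return_value XCom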
@mock.patch("airflow.providers.microsoft.azure.operators.adls.AzureDataLakeHook") + @mock.patch("airflow.providers.microsoft.azure.operators.adls.AzureDataLakeStorageV2Hook") @mock.patch("airflow.providers.google.cloud.transfers.adls_to_gcs.GCSHook") def test_execute(self, gcs_mock_hook, adls_one_mock_hook, adls_two_mock_hook): """Test the execute function when the run is successful.""" @@ -66,13 +68,14 @@ def test_execute(self, gcs_mock_hook, adls_one_mock_hook, adls_two_mock_hook): task_id=TASK_ID, src_adls=ADLS_PATH_1, dest_gcs=GCS_PATH, + file_system_name=TEST_FILE_SYSTEM_NAME, replace=False, azure_data_lake_conn_id=AZURE_CONN_ID, gcp_conn_id=GCS_CONN_ID, google_impersonation_chain=IMPERSONATION_CHAIN, ) - adls_one_mock_hook.return_value.list.return_value = MOCK_FILES + adls_one_mock_hook.return_value.list_files_directory.return_value = MOCK_FILES adls_two_mock_hook.return_value.list.return_value = MOCK_FILES # gcs_mock_hook.return_value.upload.side_effect = _assert_upload @@ -92,7 +95,7 @@ def test_execute(self, gcs_mock_hook, adls_one_mock_hook, adls_two_mock_hook): any_order=True, ) - adls_one_mock_hook.assert_called_once_with(azure_data_lake_conn_id=AZURE_CONN_ID) + adls_one_mock_hook.assert_called_once_with(adls_conn_id=AZURE_CONN_ID) adls_two_mock_hook.assert_called_once_with(azure_data_lake_conn_id=AZURE_CONN_ID) gcs_mock_hook.assert_called_once_with( gcp_conn_id=GCS_CONN_ID, @@ -103,7 +106,7 @@ def test_execute(self, gcs_mock_hook, adls_one_mock_hook, adls_two_mock_hook): assert sorted(MOCK_FILES) == sorted(uploaded_files) @mock.patch("airflow.providers.google.cloud.transfers.adls_to_gcs.AzureDataLakeHook") - @mock.patch("airflow.providers.microsoft.azure.operators.adls.AzureDataLakeHook") + @mock.patch("airflow.providers.microsoft.azure.operators.adls.AzureDataLakeStorageV2Hook") @mock.patch("airflow.providers.google.cloud.transfers.adls_to_gcs.GCSHook") def test_execute_with_gzip(self, gcs_mock_hook, adls_one_mock_hook, adls_two_mock_hook): """Test the execute function when the run is successful.""" @@ -112,13 +115,14 @@ def test_execute_with_gzip(self, gcs_mock_hook, adls_one_mock_hook, adls_two_moc task_id=TASK_ID, src_adls=ADLS_PATH_1, dest_gcs=GCS_PATH, + file_system_name=TEST_FILE_SYSTEM_NAME, replace=False, azure_data_lake_conn_id=AZURE_CONN_ID, gcp_conn_id=GCS_CONN_ID, gzip=True, ) - adls_one_mock_hook.return_value.list.return_value = MOCK_FILES + adls_one_mock_hook.return_value.list_files_directory.return_value = MOCK_FILES adls_two_mock_hook.return_value.list.return_value = MOCK_FILES # gcs_mock_hook.return_value.upload.side_effect = _assert_upload