Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -110,14 +110,20 @@ def __init__(
*,
src_adls: str,
dest_gcs: str,
file_system_name: str,
azure_data_lake_conn_id: str,
gcp_conn_id: str = "google_cloud_default",
replace: bool = False,
gzip: bool = False,
google_impersonation_chain: str | Sequence[str] | None = None,
**kwargs,
) -> None:
super().__init__(path=src_adls, azure_data_lake_conn_id=azure_data_lake_conn_id, **kwargs)
super().__init__(
file_system_name=file_system_name,
path=src_adls,
azure_data_lake_conn_id=azure_data_lake_conn_id,
**kwargs,
)

self.src_adls = src_adls
self.dest_gcs = dest_gcs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
TASK_ID = "test-adls-gcs-operator"
ADLS_PATH_1 = "*"
GCS_PATH = "gs://test/"
TEST_FILE_SYSTEM_NAME = "test-container"
MOCK_FILES = [
"test/TEST1.csv",
"test/TEST2.csv",
Expand All @@ -44,6 +45,7 @@ def test_init(self):
task_id=TASK_ID,
src_adls=ADLS_PATH_1,
dest_gcs=GCS_PATH,
file_system_name=TEST_FILE_SYSTEM_NAME,
replace=False,
azure_data_lake_conn_id=AZURE_CONN_ID,
gcp_conn_id=GCS_CONN_ID,
Expand All @@ -57,7 +59,7 @@ def test_init(self):
assert operator.azure_data_lake_conn_id == AZURE_CONN_ID

@mock.patch("airflow.providers.google.cloud.transfers.adls_to_gcs.AzureDataLakeHook")
@mock.patch("airflow.providers.microsoft.azure.operators.adls.AzureDataLakeHook")
@mock.patch("airflow.providers.microsoft.azure.operators.adls.AzureDataLakeStorageV2Hook")
@mock.patch("airflow.providers.google.cloud.transfers.adls_to_gcs.GCSHook")
def test_execute(self, gcs_mock_hook, adls_one_mock_hook, adls_two_mock_hook):
"""Test the execute function when the run is successful."""
Expand All @@ -66,13 +68,14 @@ def test_execute(self, gcs_mock_hook, adls_one_mock_hook, adls_two_mock_hook):
task_id=TASK_ID,
src_adls=ADLS_PATH_1,
dest_gcs=GCS_PATH,
file_system_name=TEST_FILE_SYSTEM_NAME,
replace=False,
azure_data_lake_conn_id=AZURE_CONN_ID,
gcp_conn_id=GCS_CONN_ID,
google_impersonation_chain=IMPERSONATION_CHAIN,
)

adls_one_mock_hook.return_value.list.return_value = MOCK_FILES
adls_one_mock_hook.return_value.list_files_directory.return_value = MOCK_FILES
adls_two_mock_hook.return_value.list.return_value = MOCK_FILES

# gcs_mock_hook.return_value.upload.side_effect = _assert_upload
Expand All @@ -92,7 +95,7 @@ def test_execute(self, gcs_mock_hook, adls_one_mock_hook, adls_two_mock_hook):
any_order=True,
)

adls_one_mock_hook.assert_called_once_with(azure_data_lake_conn_id=AZURE_CONN_ID)
adls_one_mock_hook.assert_called_once_with(adls_conn_id=AZURE_CONN_ID)
adls_two_mock_hook.assert_called_once_with(azure_data_lake_conn_id=AZURE_CONN_ID)
gcs_mock_hook.assert_called_once_with(
gcp_conn_id=GCS_CONN_ID,
Expand All @@ -103,7 +106,7 @@ def test_execute(self, gcs_mock_hook, adls_one_mock_hook, adls_two_mock_hook):
assert sorted(MOCK_FILES) == sorted(uploaded_files)

@mock.patch("airflow.providers.google.cloud.transfers.adls_to_gcs.AzureDataLakeHook")
@mock.patch("airflow.providers.microsoft.azure.operators.adls.AzureDataLakeHook")
@mock.patch("airflow.providers.microsoft.azure.operators.adls.AzureDataLakeStorageV2Hook")
@mock.patch("airflow.providers.google.cloud.transfers.adls_to_gcs.GCSHook")
def test_execute_with_gzip(self, gcs_mock_hook, adls_one_mock_hook, adls_two_mock_hook):
"""Test the execute function when the run is successful."""
Expand All @@ -112,13 +115,14 @@ def test_execute_with_gzip(self, gcs_mock_hook, adls_one_mock_hook, adls_two_moc
task_id=TASK_ID,
src_adls=ADLS_PATH_1,
dest_gcs=GCS_PATH,
file_system_name=TEST_FILE_SYSTEM_NAME,
replace=False,
azure_data_lake_conn_id=AZURE_CONN_ID,
gcp_conn_id=GCS_CONN_ID,
gzip=True,
)

adls_one_mock_hook.return_value.list.return_value = MOCK_FILES
adls_one_mock_hook.return_value.list_files_directory.return_value = MOCK_FILES
adls_two_mock_hook.return_value.list.return_value = MOCK_FILES

# gcs_mock_hook.return_value.upload.side_effect = _assert_upload
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,21 +126,33 @@ class ADLSListOperator(BaseOperator):
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:ADLSListOperator`

:param path: The Azure Data Lake path to find the objects. Supports glob strings (templated)
:param file_system_name: Name of the file system (container) in ADLS Gen2.
:param path: The directory path within the file system to list files from (templated).
:param azure_data_lake_conn_id: Reference to the :ref:`Azure Data Lake connection<howto/connection:adl>`.
"""

template_fields: Sequence[str] = ("path",)
ui_color = "#901dd2"

def __init__(
self, *, path: str, azure_data_lake_conn_id: str = DEFAULT_AZURE_DATA_LAKE_CONN_ID, **kwargs
self,
*,
file_system_name: str,
path: str,
azure_data_lake_conn_id: str = DEFAULT_AZURE_DATA_LAKE_CONN_ID,
**kwargs,
) -> None:
super().__init__(**kwargs)
self.file_system_name = file_system_name
self.path = path
self.azure_data_lake_conn_id = azure_data_lake_conn_id

def execute(self, context: Context) -> list:
hook = AzureDataLakeHook(azure_data_lake_conn_id=self.azure_data_lake_conn_id)
self.log.info("Getting list of ADLS files in path: %s", self.path)
return hook.list(path=self.path)
hook = AzureDataLakeStorageV2Hook(adls_conn_id=self.azure_data_lake_conn_id)
self.log.info(
"Getting list of ADLS files in file system %s, path: %s", self.file_system_name, self.path
)
return hook.list_files_directory(
file_system_name=self.file_system_name,
directory_name=self.path,
)
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@
# [START howto_operator_adls_list]
adls_files = ADLSListOperator(
task_id="adls_files",
path="folder/output/*.parquet",
file_system_name="mycontainer",
path="folder/output",
azure_data_lake_conn_id="azure_data_lake_default",
)
# [END howto_operator_adls_list]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
from airflow.providers.microsoft.azure.operators.adls import ADLSListOperator

TASK_ID = "test-adls-list-operator"
TEST_PATH = "test/*"
TEST_FILE_SYSTEM_NAME = "test-container"
TEST_PATH = "test/path"
MOCK_FILES = [
"test/TEST1.csv",
"test/TEST2.csv",
Expand All @@ -33,12 +34,15 @@


class TestAzureDataLakeStorageListOperator:
@mock.patch("airflow.providers.microsoft.azure.operators.adls.AzureDataLakeHook")
@mock.patch("airflow.providers.microsoft.azure.operators.adls.AzureDataLakeStorageV2Hook")
def test_execute(self, mock_hook):
mock_hook.return_value.list.return_value = MOCK_FILES
mock_hook.return_value.list_files_directory.return_value = MOCK_FILES

operator = ADLSListOperator(task_id=TASK_ID, path=TEST_PATH)
operator = ADLSListOperator(task_id=TASK_ID, file_system_name=TEST_FILE_SYSTEM_NAME, path=TEST_PATH)

files = operator.execute(None)
mock_hook.return_value.list.assert_called_once_with(path=TEST_PATH)
mock_hook.return_value.list_files_directory.assert_called_once_with(
file_system_name=TEST_FILE_SYSTEM_NAME,
directory_name=TEST_PATH,
)
assert sorted(files) == sorted(MOCK_FILES)