diff --git a/.github/workflows/unit-testing.yml b/.github/workflows/unit-testing.yml index eaf8a20..e1f8eaf 100644 --- a/.github/workflows/unit-testing.yml +++ b/.github/workflows/unit-testing.yml @@ -29,7 +29,7 @@ jobs: with: python-version: ${{ matrix.python-version }} - name: Install pytest tool - run: pip install pytest==8.3.4 pytest-minio-mock==0.4.19 + run: pip install pytest==8.3.4 - name: Install library dependencies run: pip install -e python-lib/tc_etl_lib - name: Test with pytest diff --git a/python-lib/tc_etl_lib/README.md b/python-lib/tc_etl_lib/README.md index 97af2b2..cd79b57 100644 --- a/python-lib/tc_etl_lib/README.md +++ b/python-lib/tc_etl_lib/README.md @@ -239,36 +239,35 @@ iotam: tc.iota.iotaManager = tc.iota.iotaManager(endpoint = 'http://": "", "": ""}, {"": "", "": ""}]) ``` -Ejemplo de uso de la clase minioManager +Ejemplo de uso de la clase objectStorageManager ```python # import library import tc_etl_lib as tc -# declare minioManager and get initialized client -minio_manager = tc.minioManager(endpoint=':', - access_key='', - secret_key='') - +# declare objectStorageManager +object_storage_manager = tc.objectStorageManager(endpoint='://:', + access_key='', + secret_key='') # Upload test-file.txt to python-test-bucket/output/example.txt # note test-file.txt must exist in the same directory where this example is run -minio_manager.upload_file(bucket_name='python-test-bucket', - destination_file='/output/example.txt', - source_file="test-file.txt") +object_storage_manager.upload_file(bucket_name='python-test-bucket', + destination_file='/output/example.txt', + source_file="test-file.txt") # You can define your own custom processing method and use it in the processing_method argument of the process_file method def process_chunk(file_chunk): print(file_chunk) # Retrieve example.txt and apply custom method to each 3 bytes chunk -minio_manager.process_file(bucket_name='python-test-bucket', - destination_file='/output/example.txt', - chunk_size=3, 
- processing_method=process_chunk) +object_storage_manager.process_file(bucket_name='python-test-bucket', + destination_file='/output/example.txt', + chunk_size=3, + processing_method=process_chunk) # Remove the bucket created in the upload file method -minio_manager.remove_bucket(minio_client, "python-test-bucket") +object_storage_manager.remove_bucket("python-test-bucket") ``` ## Funciones disponibles en la librería @@ -410,26 +409,26 @@ La librería está creada con diferentes clases dependiendo de la funcionalidad - :param obligatorio: `data`: Datos a enviar. Puede ser una lista de diccionarios o un DataFrame. - :raises SendBatchError: Se levanta cuando se produce una excepción dentro de `send_http`. Atrapa la excepción original y se guarda y se imprime el índice donde se produjo el error. -- Clase `minioManager`: En esta clase están las funciones relacionadas con la solución de almacenamiento de objetos MinIO. +- Clase `objectStorageManager`: En esta clase están las funciones relacionadas con la solución de almacenamiento de objetos. - `__init__`: constructor de objetos de la clase. - - :param obligatorio `endpoint`: enpoint de acceso a MinIO - - :param obligatorio `access_key`: usuario necesario para hacer login en MinIO - - :param obligatorio `secret_key`: contraseña necesaria para hacer login en MinIO - - :param optional `secure`: flag para indicar si la conexión con MinIO usa https (True) o http (False). Por defecto se considera `True` si se omite el parámetro. + - :param obligatorio `endpoint`: endpoint de acceso a nuestro servicio de object storage + - :param obligatorio `access_key`: usuario necesario para hacer login en nuestro servicio de object storage + - :param obligatorio `secret_key`: contraseña necesaria para hacer login en nuestro servicio de object storage - :raises [ValueError](https://docs.python.org/3/library/exceptions.html#ValueError): Se lanza cuando le falta alguno de los argumentos obligatorios. 
- `create_bucket`: crea el bucket si no existe, si existe no hace nada. - :param obligatorio `bucket_name`: nombre del bucket a crear. - `remove_bucket`: borra el bucket si existe, si no existe no hace nada. - :param obligatorio `bucket_name`: nombre del bucket a borrar. - - `upload_file`: sube un fichero a MinIO (si ya existe lo sobreescribe). Si el bucket al que se sube no existe se crea previamente. + - :raises [Exception](https://docs.python.org/3/library/exceptions.html#Exception): Se lanza cuando se captura una excepción en el borrado del bucket + - `upload_file`: sube un fichero (si ya existe lo sobreescribe). Si el bucket al que se sube no existe se crea previamente. - :param obligatorio `bucket_name`: nombre del bucket donde se va a subir el fichero. - - :param obligatorio `destination_file`: nombre del fichero en MinIO (puede incluir el path SIN el nombre del bucket al inicio). + - :param obligatorio `destination_file`: nombre del fichero en el bucket (puede incluir el path SIN el nombre del bucket al inicio). - :param obligatorio `source_file`: nombre del fichero local a subir (puede incluir el path). - - :return: objeto con el estado de la subida del fichero. - - `process_file`: procesa un fichero de MinIO por fragmentos y le aplica a cada fragmento la función provista. + - :raises [Exception](https://docs.python.org/3/library/exceptions.html#Exception): Se lanza cuando se captura una excepción en la subida del fichero + - `process_file`: procesa por fragmentos un fichero subido y le aplica a cada fragmento la función provista. - :param obligatorio `bucket_name`: nombre del bucket donde se va a buscar el fichero. - - :param obligatorio `file`: nombre del fichero en MinIO (puede incluir el path SIN el nombre del bucket al inicio). + - :param obligatorio `file`: nombre del fichero en el bucket (puede incluir el path SIN el nombre del bucket al inicio). - :param obligatorio `processing_method`: método a aplicar a cada fragmento del fichero. 
- :param optional `chunk_size`: tamaño en bytes de cada fragmento del fichero a recuperar. Por defecto 500000 bytes si se omite el argumento - :raises [Exception](https://docs.python.org/3/library/exceptions.html#Exception): Se lanza cuando se captura una excepción en el procesamiento del fichero @@ -567,7 +566,7 @@ TOTAL 403 221 45% ## Changelog -- Add: new class `minioManager` to manage MinIO connection and file processing ([#109](https://github.com/telefonicasc/etl-framework/issues/109)) +- Add: new class `objectStorageManager` to manage bucket based object storage compatible with S3 API (such as AWS S3 or MinIO) ([#109](https://github.com/telefonicasc/etl-framework/issues/109)) 0.16.0 (September 29th, 2025) diff --git a/python-lib/tc_etl_lib/setup.py b/python-lib/tc_etl_lib/setup.py index 16042b3..d201ac7 100644 --- a/python-lib/tc_etl_lib/setup.py +++ b/python-lib/tc_etl_lib/setup.py @@ -47,7 +47,8 @@ # La última release de numpy antes de 2.0.0 es 1.26.4. # La última release de numpy compatible con python 3.8 es 1.24.4 'numpy==1.24.4', - 'minio==7.2.7' + 'boto3==1.37.38' + ] INSTALL_REQUIRES_PYTHON_3_12 = [ 'requests>=2.28.2,<2.33.0', @@ -55,7 +56,7 @@ 'psycopg2-binary>=2.9.5', 'pandas==2.2.2', 'numpy==2.2.0', - 'minio==7.2.18' + 'boto3==1.40.55' ] setup( diff --git a/python-lib/tc_etl_lib/tc_etl_lib/__init__.py b/python-lib/tc_etl_lib/tc_etl_lib/__init__.py index 7f903a9..c717680 100644 --- a/python-lib/tc_etl_lib/tc_etl_lib/__init__.py +++ b/python-lib/tc_etl_lib/tc_etl_lib/__init__.py @@ -25,4 +25,4 @@ from .iota import iotaManager from .store import Store, orionStore, sqlFileStore from .normalizer import normalizer -from .minio import minioManager +from .object_storage import objectStorageManager diff --git a/python-lib/tc_etl_lib/tc_etl_lib/minio.py b/python-lib/tc_etl_lib/tc_etl_lib/object_storage.py similarity index 68% rename from python-lib/tc_etl_lib/tc_etl_lib/minio.py rename to python-lib/tc_etl_lib/tc_etl_lib/object_storage.py index 
495d84b..eee92ed 100644 --- a/python-lib/tc_etl_lib/tc_etl_lib/minio.py +++ b/python-lib/tc_etl_lib/tc_etl_lib/object_storage.py @@ -19,32 +19,28 @@ # along with IoT orchestrator. If not, see http://www.gnu.org/licenses/. """ -Minio routines for Python: - - minioManager. +Object storage routines for Python: + - objectStorageManager. """ -from minio import Minio from typing import Optional, cast import logging +import boto3 logger = logging.getLogger(__name__) -class minioManager: - """Minio Manager +class objectStorageManager: + """Object storage Manager - endpoint: define minio endpoint - access_key: user to log in to minio - secret_key: password to log in to minio - secure: flag to select if the connection to MinIO is https or http (True by default) - client: authenticated MinIO client + endpoint: define Object storage endpoint + access_key: user to log in to Object storage + secret_key: password to log in to Object storage """ endpoint: str access_key: str secret_key: str - secure: bool - client: Minio - def __init__(self, endpoint: Optional[str] = None, access_key: Optional[str] = None, secret_key: Optional[str] = None, secure=True): + def __init__(self, endpoint: Optional[str] = None, access_key: Optional[str] = None, secret_key: Optional[str] = None): messageError = [] if endpoint is None: @@ -61,27 +57,27 @@ def __init__(self, endpoint: Optional[str] = None, access_key: Optional[str] = N if len(messageError) != 1: defineParams = " and ".join( [", ".join(messageError[:-1]), messageError[-1]]) - raise ValueError(f'You must define {defineParams} in minioManager') + raise ValueError(f'You must define {defineParams} in objectStorageManager') # At this point, all Optional[str] have been validated to be not None. # cast them to let type checker knows. 
self.endpoint = cast(str, endpoint) self.access_key = cast(str, access_key) self.secret_key = cast(str, secret_key) - self.secure = secure self.client = self.__init_client() def __init_client(self): """ - Create a MinIO client with the class endpoint, its access key and secret key. + Create an Object storage client with the class endpoint, its access key and secret key. - :return authenticated MinIO client + :return authenticated Object storage client """ - return Minio( - self.endpoint, - self.access_key, - self.secret_key, - secure=self.secure + return boto3.client( + 's3', + endpoint_url=self.endpoint, + aws_access_key_id=self.access_key, + aws_secret_access_key=self.secret_key, + aws_session_token=None ) def create_bucket(self, bucket_name): @@ -90,12 +86,12 @@ def create_bucket(self, bucket_name): :param bucket_name: name of the bucket where the file is located """ - found = self.client.bucket_exists(bucket_name) - if not found: - self.client.make_bucket(bucket_name) + try: + self.client.create_bucket(Bucket=bucket_name) logger.debug(f'Created bucket ({bucket_name})') - else: - logger.debug(f'Bucket {bucket_name} already exists') + except Exception as e: + # BucketAlreadyExists or BucketAlreadyOwnedByYou + logger.debug(f'Error creating the bucket: {e}') def remove_bucket(self, bucket_name): """ @@ -103,12 +99,12 @@ :param bucket_name: name of the bucket where the file is located """ - found = self.client.bucket_exists(bucket_name) - if found: - self.client.remove_bucket(bucket_name) + try: + self.client.delete_bucket(Bucket=bucket_name) logger.debug(f'Removed bucket {bucket_name}') - else: - logger.debug(f'Bucket {bucket_name} doesnt exist') + except Exception as e: + logger.debug(f'An error occurred while deleting {bucket_name}: {e}') + raise Exception(f'An error occurred while deleting {bucket_name}: {e}') def upload_file(self, bucket_name, destination_file, source_file): """ @@ -117,18 +113,17 @@ def upload_file(self, 
bucket_name, destination_file, source_file): :param bucket_name: name of the bucket where the file is located :param destination_file: name of the file to retrieve (can include path without bucket_name) :param source_file: name of the file to upload (can include path) - :return object with the status of the upload """ # Bucket must exist before uploading file self.create_bucket(bucket_name) logger.debug( f'Uploading {source_file} as object {destination_file} to bucket {bucket_name}') - return self.client.fput_object( - bucket_name, - object_name=destination_file, - file_path=source_file, - ) + try: + self.client.upload_file(source_file, bucket_name, destination_file) + except Exception as e: + logger.debug(f'An error occurred while uploading the file: {e}') + raise Exception(f'An error occurred while uploading the file: {e}') def process_file(self, bucket_name, file, processing_method, chunk_size=500000): """Retrieves a file in chunks and applies a function to each chunk @@ -138,22 +133,19 @@ def process_file(self, bucket_name, file, processing_method, chunk_size=500000): :param processing_method: method to apply to each chunk of the retrieved file :param chunk_size: size in bytes of the chunks to retrieve (500000 by default) """ - file_size = self.client.stat_object( - bucket_name, object_name=file).size or 0 + file_size = self.client.get_object_attributes( + Bucket=bucket_name, Key=file, ObjectAttributes=['ObjectSize'])['ObjectSize'] or 0 - response = None for offset in range(0, file_size, chunk_size): # Get the file try: + byte_range = f'bytes={offset}-{offset+chunk_size-1}' response = self.client.get_object( - bucket_name, file, offset, chunk_size) + Bucket=bucket_name, Key=file, Range=byte_range) # response.data returns bytes - processing_method(response.data) + processing_method(response['Body'].read()) except Exception as e: raise Exception( f'An error occured while processing the file: {e}') logger.debug(f'Processing ended.') - if response: - response.close() 
- response.release_conn() diff --git a/python-lib/tc_etl_lib/tc_etl_lib/test_minio_manager.py b/python-lib/tc_etl_lib/tc_etl_lib/test_minio_manager.py deleted file mode 100644 index ebac4ea..0000000 --- a/python-lib/tc_etl_lib/tc_etl_lib/test_minio_manager.py +++ /dev/null @@ -1,117 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Copyright 2023 Telefónica Soluciones de Informática y Comunicaciones de España, S.A.U. -# -# This file is part of tc_etl_lib -# -# tc_etl_lib is free software: you can redistribute it and/or -# modify it under the terms of the GNU Affero General Public License as -# published by the Free Software Foundation, either version 3 of the -# License, or (at your option) any later version. -# -# tc_etl_lib is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero -# General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with IoT orchestrator. If not, see http://www.gnu.org/licenses/. - -''' -MinIO Manager tests. 
-''' - -from pytest_minio_mock.plugin import minio_mock -from unittest import mock -from tc_etl_lib.minio import minioManager -import os - - -def init_minio_manager(): - return minioManager( - endpoint='localhost:9000', - access_key='admin', - secret_key='admin123') - - -def test_create_bucket(minio_mock): - minio_manager = init_minio_manager() - - minio_manager.create_bucket("test_bucket") - buckets = minio_manager.client.list_buckets() - assert len(buckets) == 1 - - -def test_remove_bucket(minio_mock): - minio_manager = init_minio_manager() - - minio_manager.create_bucket("test_bucket") - minio_manager.remove_bucket("test_bucket") - buckets = minio_manager.client.list_buckets() - assert len(buckets) == 0 - - -def test_upload_file(minio_mock): - minio_manager = init_minio_manager() - bucket_name = 'test-bucket' - file = 'test_minioManager_file.txt' - - # Create the test file if it doesnt exist - fichero_test = open(file, "w") - fichero_test.write("Test text") - fichero_test.close() - - minio_manager.create_bucket(bucket_name) - result = minio_manager.upload_file(bucket_name, - destination_file=file, - source_file=file) - - # Remove the test file - os.remove(file) - # pytest_minio_mock returns a string while real minio returns an object - assert result == "Upload successful" - - -def test_process_file(minio_mock): - minio_manager = init_minio_manager() - bucket_name = 'test-bucket' - file = "test-minioManager-file.txt" - out_file_name = "out.txt" - - # Create the test file if it doesnt exist - fichero_test = open(file, "w") - fichero_test.write("Test text") - fichero_test.close() - - minio_manager.create_bucket(bucket_name) - - minio_manager.upload_file(bucket_name, - destination_file=file, - source_file=file) - - # Custom processing method that saves locally the minio file - def test_processingMethod(file_chunk): - fichero_procesado = open(out_file_name, "ab") - fichero_procesado.write(file_chunk) - fichero_procesado.close() - - class obectStat: - size = 9 - - 
mocked_return = obectStat() - with mock.patch('pytest_minio_mock.plugin.MockMinioObject.stat_object', return_value=mocked_return) as irrelevant: - minio_manager.process_file(bucket_name, - file=file, - chunk_size=9, - processing_method=test_processingMethod) - - # Reads the out file - out_file = open(out_file_name, "r") - result = out_file.read() - out_file.close() - - # Remove the created files - os.remove(file) - os.remove(out_file_name) - # Check the downloaded file content is equal to the uploaded one - assert result == "Test text" diff --git a/python-lib/tc_etl_lib/tc_etl_lib/test_object_storage.py b/python-lib/tc_etl_lib/tc_etl_lib/test_object_storage.py new file mode 100644 index 0000000..0edac9e --- /dev/null +++ b/python-lib/tc_etl_lib/tc_etl_lib/test_object_storage.py @@ -0,0 +1,128 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2023 Telefónica Soluciones de Informática y Comunicaciones de España, S.A.U. +# +# This file is part of tc_etl_lib +# +# tc_etl_lib is free software: you can redistribute it and/or +# modify it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# tc_etl_lib is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero +# General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with IoT orchestrator. If not, see http://www.gnu.org/licenses/. + +''' +Object storage Manager tests. 
+''' + +from unittest import mock, TestCase +from unittest.mock import patch +from tc_etl_lib.object_storage import objectStorageManager + + +class TestObjectStorageService(TestCase): + + def init_object_storage_manager(self): + return objectStorageManager( + endpoint='http://localhost:9000', + access_key='admin', + secret_key='admin123') + + @patch('tc_etl_lib.object_storage.boto3.client') + def test_create_bucket(self, mock_boto_client): + mock_object_storage_client = mock.MagicMock() + mock_boto_client.return_value = mock_object_storage_client + object_storage_manager = self.init_object_storage_manager() + object_storage_manager.create_bucket("test-bucket") + + mock_object_storage_client.create_bucket.assert_called_once_with( + Bucket='test-bucket' + ) + + @patch('tc_etl_lib.object_storage.boto3.client') + def test_remove_bucket(self, mock_boto_client): + mock_object_storage_client = mock.MagicMock() + mock_boto_client.return_value = mock_object_storage_client + object_storage_manager = self.init_object_storage_manager() + object_storage_manager.remove_bucket("test-bucket") + + mock_object_storage_client.delete_bucket.assert_called_once_with( + Bucket='test-bucket' + ) + + @patch('tc_etl_lib.object_storage.boto3.client') + def test_remove_bucket_error(self, mock_boto_client): + mock_object_storage_client = mock.MagicMock() + mock_boto_client.return_value = mock_object_storage_client + + mock_object_storage_client.delete_bucket.side_effect = Exception( + "Test error removing bucket") + with self.assertRaises(Exception): + object_storage_manager = self.init_object_storage_manager() + object_storage_manager.remove_bucket("test-bucket") + + @patch('tc_etl_lib.object_storage.boto3.client') + def test_upload_file(self, mock_boto_client): + mock_object_storage_client = mock.MagicMock() + mock_boto_client.return_value = mock_object_storage_client + object_storage_manager = self.init_object_storage_manager() + object_storage_manager.upload_file("test-bucket", 
"destination", "source") + + mock_object_storage_client.upload_file.assert_called_once_with( + "source", "test-bucket", "destination" + ) + + @patch('tc_etl_lib.object_storage.boto3.client') + def test_upload_file_error(self, mock_boto_client): + mock_object_storage_client = mock.MagicMock() + mock_boto_client.return_value = mock_object_storage_client + + mock_object_storage_client.upload_file.side_effect = Exception( + "Test error uploading file") + with self.assertRaises(Exception): + object_storage_manager = self.init_object_storage_manager() + object_storage_manager.upload_file("test-bucket", "destination", "source") + + @patch('tc_etl_lib.object_storage.boto3.client') + def test_process_file(self, mock_boto_client): + mock_object_storage_client = mock.MagicMock() + mock_boto_client.return_value = mock_object_storage_client + fake_attr = {'ObjectSize': 10} + mock_object_storage_client.get_object_attributes.return_value(fake_attr) + + def custom_processing_method(chunk): + pass + object_storage_manager = self.init_object_storage_manager() + object_storage_manager.process_file(bucket_name='test-bucket', + file='file', + processing_method=custom_processing_method, + chunk_size=10) + + mock_object_storage_client.get_object.assert_called_once_with( + Bucket='test-bucket', Key='file', Range='bytes=0-9' + ) + + @patch('tc_etl_lib.object_storage.boto3.client') + def test_process_file_error(self, mock_boto_client): + mock_object_storage_client = mock.MagicMock() + mock_boto_client.return_value = mock_object_storage_client + fake_attr = {'ObjectSize': 10} + mock_object_storage_client.get_object_attributes.return_value(fake_attr) + + def custom_processing_method(chunk): + pass + + mock_object_storage_client.get_object.side_effect = Exception( + "Test error uploading file") + with self.assertRaises(Exception): + object_storage_manager = self.init_object_storage_manager() + object_storage_manager.process_file(bucket_name='test-bucket', + file='file', + 
processing_method=custom_processing_method, + chunk_size=10) diff --git a/python-lib/test-etl-lib-minio.py b/python-lib/test-etl-lib-object_storage.py similarity index 51% rename from python-lib/test-etl-lib-minio.py rename to python-lib/test-etl-lib-object_storage.py index cdede36..f66d9fa 100644 --- a/python-lib/test-etl-lib-minio.py +++ b/python-lib/test-etl-lib-object_storage.py @@ -20,21 +20,21 @@ import tc_etl_lib as tc -# declare minioManager -minio_manager = tc.minioManager(endpoint=':', - access_key='', - secret_key='') +# declare objectStorageManager +object_storage_manager = tc.objectStorageManager(endpoint='://:', + access_key='', + secret_key='') # Upload test-file.txt to python-test-bucket/output/example.txt -minio_manager.upload_file(bucket_name='python-test-bucket', - destination_file='/output/example.txt', - source_file="test-file.txt") +object_storage_manager.upload_file(bucket_name='python-test-bucket', + destination_file='/output/example.txt', + source_file="test-file.txt") # Retrieve example.txt and apply print method to each 3 bytes -minio_manager.process_file(bucket_name='python-test-bucket', - file='/output/example.txt', - processing_method=print, - chunk_size=3) +object_storage_manager.process_file(bucket_name='python-test-bucket', + file='/output/example.txt', + processing_method=print, + chunk_size=3) # Custom method that writes the file chunks in a CSV (he receives and writes bytes) def customCSVProcessingMethod(file_chunk): @@ -43,12 +43,12 @@ def customCSVProcessingMethod(file_chunk): processed_file.close() # Upload CSV -minio_manager.upload_file(bucket_name='python-test-bucket', - destination_file='/output/reallyBigFile.csv', - source_file="movimientos_padronales_20250822_v2.csv") +object_storage_manager.upload_file(bucket_name='python-test-bucket', + destination_file='/output/reallyBigFile.csv', + source_file="movimientos_padronales_20250822_v2.csv") # Retrieve reallyBigFile.csv and apply customCSVProcessingMethod method to each 
1000000 bytes -minio_manager.process_file(bucket_name='python-test-bucket', - file='/output/reallyBigFile.csv', - processing_method=customCSVProcessingMethod, - chunk_size=1000000) +object_storage_manager.process_file(bucket_name='python-test-bucket', + file='/output/reallyBigFile.csv', + processing_method=customCSVProcessingMethod, + chunk_size=1000000)