Merged

Changes from 15 commits
2 changes: 1 addition & 1 deletion .github/workflows/unit-testing.yml
Original file line number Diff line number Diff line change
@@ -29,7 +29,7 @@ jobs:
with:
python-version: ${{ matrix.python-version }}
- name: Install pytest tool
run: pip install pytest==8.3.4 pytest-minio-mock==0.4.19
run: pip install pytest==8.3.4
- name: Install library dependencies
run: pip install -e python-lib/tc_etl_lib
- name: Test with pytest
34 changes: 17 additions & 17 deletions python-lib/tc_etl_lib/README.md
@@ -239,21 +239,20 @@ iotam: tc.iota.iotaManager = tc.iota.iotaManager(endpoint = 'http://<iota_endpoi
iotam.send_batch_http(data=[{"<key_1>": "<value_1>", "<key_2>": "<value_2>"}, {"<key_3>": "<value_3>", "<key_4>": "<value_4>"}])
```

Usage example of the minioManager class
Usage example of the objectStorageManager class

```python
# import library
import tc_etl_lib as tc

# declare minioManager and get initialized client
minio_manager = tc.minioManager(endpoint='<minio_endpoint>:<port>',
# declare objectStorageManager
object_storage_manager = tc.objectStorageManager(endpoint='<http/https>://<object_storage_endpoint>:<port>',
access_key='<user>',
secret_key='<password>')


# Upload test-file.txt to python-test-bucket/output/example.txt
# note test-file.txt must exist in the same directory where this example is run
minio_manager.upload_file(bucket_name='python-test-bucket',
object_storage_manager.upload_file(bucket_name='python-test-bucket',
destination_file='/output/example.txt',
source_file="test-file.txt")

@@ -262,13 +261,13 @@ def process_chunk(file_chunk):
print(file_chunk)

# Retrieve example.txt and apply custom method to each 3 bytes chunk
minio_manager.process_file(bucket_name='python-test-bucket',
object_storage_manager.process_file(bucket_name='python-test-bucket',
destination_file='/output/example.txt',
chunk_size=3,
processing_method=process_chunk)

# Remove the bucket created in the upload file method
minio_manager.remove_bucket(minio_client, "python-test-bucket")
object_storage_manager.remove_bucket("python-test-bucket")
```

## Available functions in the library
@@ -410,26 +409,26 @@ The library is built with different classes depending on the functionality
- :param required `data`: Data to send. It can be a list of dictionaries or a DataFrame.
- :raises SendBatchError: Raised when an exception occurs inside `send_http`. The original exception is caught, and the index where the error occurred is stored and printed.

- Class `minioManager`: this class contains the functions related to the MinIO object storage solution.
- Class `objectStorageManager`: this class contains the functions related to the object storage solution.

- `__init__`: class object constructor.
- :param required `endpoint`: access endpoint for MinIO
- :param required `access_key`: user needed to log in to MinIO
- :param required `secret_key`: password needed to log in to MinIO
- :param optional `secure`: flag indicating whether the connection to MinIO uses https (True) or http (False). Defaults to `True` if the parameter is omitted.
> Contributor: With boto, is there no option for a non-secure connection?

> Collaborator (author): Boto does not have a specific flag, but the endpoint accepts http and https, so for my demo, for example, it was enough to use http.

> Contributor: So the URL scheme itself (be it https or http) already handles it. Better than with the MinIO lib, then :)
>
> NTC (understood, all OK)

- :param required `endpoint`: access endpoint for our object storage service
- :param required `access_key`: user needed to log in to our object storage service
- :param required `secret_key`: password needed to log in to our object storage service
- :raises [ValueError](https://docs.python.org/3/library/exceptions.html#ValueError): raised when any of the required arguments is missing.
- `create_bucket`: creates the bucket if it does not exist; if it already exists, it does nothing.
- :param required `bucket_name`: name of the bucket to create.
- `remove_bucket`: removes the bucket if it exists; if it does not exist, it does nothing.
- :param required `bucket_name`: name of the bucket to remove.
- `upload_file`: uploads a file to MinIO (overwriting it if it already exists). If the target bucket does not exist, it is created first.
- :raises [Exception](https://docs.python.org/3/library/exceptions.html#Exception): raised when an exception is caught while removing the bucket
- `upload_file`: uploads a file (overwriting it if it already exists). If the target bucket does not exist, it is created first.
- :param required `bucket_name`: name of the bucket the file will be uploaded to.
- :param required `destination_file`: name of the file in MinIO (may include the path WITHOUT the bucket name at the beginning).
- :param required `destination_file`: name of the file in the bucket (may include the path WITHOUT the bucket name at the beginning).
- :param required `source_file`: name of the local file to upload (may include the path).
- :return: object with the status of the file upload.
- `process_file`: processes a file from MinIO in chunks, applying the provided function to each chunk.
- :raises [Exception](https://docs.python.org/3/library/exceptions.html#Exception): raised when an exception is caught while uploading the file
- `process_file`: processes an uploaded file in chunks, applying the provided function to each chunk.
- :param required `bucket_name`: name of the bucket where the file will be looked for.
- :param required `file`: name of the file in MinIO (may include the path WITHOUT the bucket name at the beginning).
- :param required `file`: name of the file in the bucket (may include the path WITHOUT the bucket name at the beginning).
- :param required `processing_method`: method to apply to each chunk of the file.
- :param optional `chunk_size`: size in bytes of each file chunk to retrieve. Defaults to 500000 bytes if the argument is omitted.
- :raises [Exception](https://docs.python.org/3/library/exceptions.html#Exception): raised when an exception is caught while processing the file
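The only contract `process_file` imposes on `processing_method` is that it accepts one `bytes` argument per chunk. A minimal sketch of such a callback (the `ByteCounter` name is ours, not part of the library):

```python
# Hypothetical callback for process_file: it accumulates the total number
# of bytes seen across all chunks it is called with.
class ByteCounter:
    def __init__(self):
        self.total = 0

    def __call__(self, chunk: bytes):
        self.total += len(chunk)

counter = ByteCounter()
# Simulate what process_file does internally: invoke the method once per chunk.
for chunk in (b'abc', b'de'):
    counter(chunk)
print(counter.total)  # 5
```

Any callable with this shape (a plain function, a bound method, a callable object) can be passed as `processing_method`.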
@@ -567,6 +566,7 @@ TOTAL 403 221 45%
## Changelog


- Change: replace the `minioManager` backend library with aws `boto3`
- Add: new class `minioManager` to manage MinIO connection and file processing ([#109](https://github.com/telefonicasc/etl-framework/issues/109))

0.16.0 (September 29th, 2025)
5 changes: 3 additions & 2 deletions python-lib/tc_etl_lib/setup.py
@@ -47,15 +47,16 @@
# The last numpy release before 2.0.0 is 1.26.4.
# The last numpy release compatible with Python 3.8 is 1.24.4
'numpy==1.24.4',
'minio==7.2.7'
'boto3==1.37.38'

]
INSTALL_REQUIRES_PYTHON_3_12 = [
'requests>=2.28.2,<2.33.0',
'urllib3==1.26.16',
'psycopg2-binary>=2.9.5',
'pandas==2.2.2',
'numpy==2.2.0',
'minio==7.2.18'
'boto3==1.40.55'
]

setup(
2 changes: 1 addition & 1 deletion python-lib/tc_etl_lib/tc_etl_lib/__init__.py
@@ -25,4 +25,4 @@
from .iota import iotaManager
from .store import Store, orionStore, sqlFileStore
from .normalizer import normalizer
from .minio import minioManager
from .object_storage import objectStorageManager
python-lib/tc_etl_lib/tc_etl_lib/object_storage.py (file renamed)
@@ -19,32 +19,28 @@
# along with IoT orchestrator. If not, see http://www.gnu.org/licenses/.

"""
Minio routines for Python:
- minioManager.
Object storage routines for Python:
- objectStorageManager.
"""
from minio import Minio
from typing import Optional, cast
import logging
import boto3

logger = logging.getLogger(__name__)


class minioManager:
"""Minio Manager
class objectStorageManager:
"""Object storage Manager

endpoint: define minio endpoint
access_key: user to log in to minio
secret_key: password to log in to minio
secure: flag to select if the connection to MinIO is https or http (True by default)
client: authenticated MinIO client
endpoint: define Object storage endpoint
access_key: user to log in to Object storage
secret_key: password to log in to Object storage
"""
endpoint: str
access_key: str
secret_key: str
secure: bool
client: Minio

def __init__(self, endpoint: Optional[str] = None, access_key: Optional[str] = None, secret_key: Optional[str] = None, secure=True):
def __init__(self, endpoint: Optional[str] = None, access_key: Optional[str] = None, secret_key: Optional[str] = None):

messageError = []
if endpoint is None:
@@ -61,27 +57,27 @@ def __init__(self, endpoint: Optional[str] = None, access_key: Optional[str] = N
if len(messageError) != 1:
defineParams = " and ".join(
[", ".join(messageError[:-1]), messageError[-1]])
raise ValueError(f'You must define {defineParams} in minioManager')
raise ValueError(f'You must define {defineParams} in objectStorageManager')

# At this point, all Optional[str] have been validated to be not None.
# cast them to let the type checker know.
self.endpoint = cast(str, endpoint)
self.access_key = cast(str, access_key)
self.secret_key = cast(str, secret_key)
self.secure = secure
self.client = self.__init_client()

def __init_client(self):
"""
Create a MinIO client with the class endpoint, its access key and secret key.
Create an object storage client with the class endpoint, its access key and secret key.

:return authenticated MinIO client
:return authenticated Object storage client
"""
return Minio(
self.endpoint,
self.access_key,
self.secret_key,
secure=self.secure
Comment on lines -80 to -84

> Contributor: This would make the `from minio import Minio` disappear and, in cascade, the `minio` dependency in setup.py, right?

> Collaborator (author): That is the intention, yes, but since the official aws library for s3 is more complex (or at least has a more convoluted documentation), I am trying to do it progressively.
return boto3.client(
's3',
endpoint_url=self.endpoint,
aws_access_key_id=self.access_key,
aws_secret_access_key=self.secret_key,
aws_session_token=None
)

def create_bucket(self, bucket_name):
@@ -90,25 +86,25 @@ def create_bucket(self, bucket_name):

:param bucket_name: name of the bucket where the file is located
"""
found = self.client.bucket_exists(bucket_name)
if not found:
self.client.make_bucket(bucket_name)
try:
self.client.create_bucket(Bucket=bucket_name)
logger.debug(f'Created bucket ({bucket_name})')
else:
logger.debug(f'Bucket {bucket_name} already exists')
except Exception as e:
# BucketAlreadyExists or BucketAlreadyOwnedByYou
logger.debug(f'Error creating the bucket: {e}')

def remove_bucket(self, bucket_name):
"""
Remove the bucket if it exists.

:param bucket_name: name of the bucket where the file is located
"""
found = self.client.bucket_exists(bucket_name)
if found:
self.client.remove_bucket(bucket_name)
try:
self.client.delete_bucket(Bucket=bucket_name)
logger.debug(f'Removed bucket {bucket_name}')
else:
logger.debug(f'Bucket {bucket_name} doesnt exist')
except Exception as e:
logger.debug(f'An error occurred while deleting {bucket_name}: {e}')
raise Exception(f'An error occurred while deleting {bucket_name}: {e}')

def upload_file(self, bucket_name, destination_file, source_file):
"""
@@ -117,18 +113,17 @@ def upload_file(self, bucket_name, destination_file, source_file):
:param bucket_name: name of the bucket where the file is located
:param destination_file: name of the file to create in the bucket (can include path without bucket_name)
:param source_file: name of the file to upload (can include path)
:return object with the status of the upload
"""
# Bucket must exist before uploading file
self.create_bucket(bucket_name)

logger.debug(
f'Uploading {source_file} as object {destination_file} to bucket {bucket_name}')
return self.client.fput_object(
bucket_name,
object_name=destination_file,
file_path=source_file,
)
try:
self.client.upload_file(source_file, bucket_name, destination_file)
except Exception as e:
logger.debug(f'An error occurred while uploading the file: {e}')
raise Exception(f'An error occurred while uploading the file: {e}')

def process_file(self, bucket_name, file, processing_method, chunk_size=500000):
"""Retrieves a file in chunks and applies a function to each chunk
@@ -138,22 +133,19 @@ def process_file(self, bucket_name, file, processing_method, chunk_size=500000):
:param processing_method: method to apply to each chunk of the retrieved file
:param chunk_size: size in bytes of the chunks to retrieve (500000 by default)
"""
file_size = self.client.stat_object(
bucket_name, object_name=file).size or 0
file_size = self.client.get_object_attributes(
Bucket=bucket_name, Key=file, ObjectAttributes=['ObjectSize'])['ObjectSize'] or 0

response = None
for offset in range(0, file_size, chunk_size):
# Get the file
try:
byte_range = f'bytes={offset}-{offset+chunk_size-1}'
response = self.client.get_object(
bucket_name, file, offset, chunk_size)
Bucket=bucket_name, Key=file, Range=byte_range)
# response['Body'].read() returns bytes
processing_method(response.data)
processing_method(response['Body'].read())
except Exception as e:
raise Exception(
f'An error occurred while processing the file: {e}')

logger.debug('Processing ended.')
if response:
response.close()
response.release_conn()
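The ranged-read loop in `process_file` can be checked in isolation: for an object of `file_size` bytes read in `chunk_size` steps, these are the offsets and HTTP `Range` headers it builds (the helper name is ours, for illustration):

```python
def iter_ranges(file_size: int, chunk_size: int):
    """Yield (offset, Range header) pairs as the process_file loop builds them."""
    for offset in range(0, file_size, chunk_size):
        # The last range may extend past the end of the object; HTTP range
        # semantics let the server clamp it to the actual object size.
        yield offset, f'bytes={offset}-{offset + chunk_size - 1}'

print(list(iter_ranges(7, 3)))
# [(0, 'bytes=0-2'), (3, 'bytes=3-5'), (6, 'bytes=6-8')]
```

Note the final range (`bytes=6-8`) overshoots a 7-byte object; servers return only the bytes that exist, so the loop needs no special last-chunk handling.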