Este projeto implementa um pipeline de dados para processar arquivos da Bovespa usando AWS Glue (Spark), armazenar os dados refinados no S3 particionados, e consultar via Athena. Uma função Lambda orquestra a execução do job Glue quando novos dados chegam no bucket.
- Ingestão: arquivos brutos (Parquet/CSV) em
s3://<bucket>/raw/ - Processamento: job Glue
bovespa-etl-jobaplica padronização de colunas, cálculos de janelas (7d, 30d), agregações diárias e escreve ems3://<bucket>/refined/particionado poryear/month/day/Simbolo - Catálogo: tabela
bovespa_refined_datano databasebovespa_database - Consulta: Athena com
MSCK REPAIR TABLEpara sincronizar partições - Orquestração: Lambda dispara o job Glue com argumentos apropriados
glue_etl_job.py: script do job Glue com transformações e escritadeploy_glue_job.sh: script para publicar o código no S3 e criar/atualizar o job Gluelambda-package/lambda_function.py: função Lambda que inicia o job Glues3_setup.sh,aws_setup_commands.sh: utilitários de configuraçãoathena_queries.sql: consultas de exemplo
- Conta AWS com Glue, S3, Athena e Lambda habilitados
- CLI da AWS configurada (
aws configure) e permissões adequadas - Python 3.9+ localmente (para empacotar/deploy)
-
Defina variáveis no arquivo
.envou no ambiente:S3_BUCKET_NAME— nome do bucket S3 (ex.:bovespa-pipeline-dados-YYYYMMDDHHMMSS)JOB_NAME— padrão:bovespa-etl-jobDATABASE_NAME— padrão:bovespa_databaseTABLE_NAME— padrão:bovespa_refined_data
-
Instale dependências locais (opcional):
pip install -r requirements.txt
Execute:
bash deploy_glue_job.shIsso irá:
- Enviar
glue_etl_job.pyparas3://<bucket>/scripts/glue_etl_job.py - Criar/atualizar o job
bovespa-etl-job - Validar o database Glue
aws glue start-job-run --job-name bovespa-etl-job --arguments '{
"--bucket_name": "<bucket>",
"--input_path": "s3://<bucket>/raw/",
"--output_path": "s3://<bucket>/refined/",
"--partition_year": "2025",
"--partition_month": "01",
"--partition_day": "06",
"--database_name": "bovespa_database",
"--table_name": "bovespa_refined_data"
}'Se não fornecer partições, o script derivará de Data ou da data atual.
Após finalizar o job, sincronize partições:
MSCK REPAIR TABLE bovespa_database.bovespa_refined_data;Exemplo de consulta:
SELECT simbolo, data, fechamento_final, volume_total
FROM bovespa_database.bovespa_refined_data
WHERE year='2025' AND month='01' AND day='06' AND simbolo='MRVE3'
LIMIT 20;- Evite
input_pathcom encoding deyear%3D...; use o diretório baseraw/ - Logs do CloudWatch ajudam a verificar schema e contagens (
df.count(),df.printSchema()) - O script é tolerante a nulos e variações de schema, padronizando colunas essenciais
- Não faça commit de
.envou chaves/segredos - Use
.env.examplecomo modelo
Adapte conforme sua necessidade.
Criado e Desenvolvido por Guilherme Favaron www.guilhermefavaron.com.br