Para esse case, foi desenvolvido uma arquitetura batch para facilitar a extração de dados parquets, transformação e gravação na bronze. Isso tudo só preenchendo um json. Segue mais contexto abaixo:
- O usuário irá preencher o template json.
- Com o json preenchido corretamente, automaticamente o "motor" irá realizar extração, transformação e gravação na bronze.
- Com os dados disponíveis e particionados em formato delta na bronze, os usuários vão conseguir consumir e modelar da melhor forma.
Estrutura do código abaixo:
📁case_tecnico_data_architect/
├── 📁analysis
│ ├── pergunta1.ipynb #RESPOSTA PARA PERGUNTA 1 - SQL
│ └── pergunta2.ipynb #RESPOSTA PARA PERGUNTA 2 - SQL
├── main.py #CÓDIGO PRINCIPAL, QUE USA TODAS ÀS FUNÇÕES DO ARQUIVO utils_etl.py PARA REALIZAR A ETL.
├── docker-compose.yml #
├── Dockerfile #
├── README.md #
├── requirements.txt # BIBLIOTECAS NECESSÁRIAS PARA FUNCIONAMENTO DO CÓDIGO
│
└── 📁src
├── 📁schema #NESTA PASTA, É ONDE ESTÁ OS JSON PADRONIZADO PARA O MOTOR. CADA JSON É UMA INGESTÃO, E É ATRAVÉS DO JSON QUE É REALIZADO EXTRAÇÃO, TRANSFORMAÇÃO E CARGA.
│ ├── bronze_layer_fhv_tripdata.json
│ ├── bronze_layer_green_tripdata.json
│ └── bronze_layer_yellow_tripdata.json
│
└── utils_etl.py #CÓDIGO ONDE ESTÁ CENTRALIZADO TODAS ÀS FUNÇÕES UTILIZADAS.
└── __init__.py #
Esse é o template que o JSON deve seguir. É através de cada arquivo JSON, o motor irá entender que é uma ingestão e irá fazer toda etapa de extração, transformação e carga em fila (batch ), seguindo os parâmetros preenchidos no JSON.
{
"catalogo": "nome-do-catalogo",
"schema": "nome_do_schema",
"tabela": "nome_da_tabela",
"mode": "modo_da_ingestão",
"descricao": "Descrição da tabela",
"source": ["origem do dado (nesse caso, é uma url (www.link.com.br))"],
"columns": {
"coluna_a": {
"type": "int",
"description": "Descrição que será adicionado na coluna_a"
},
"coluna_b": {
"type": "string",
"description": "Descrição que será adicionado na coluna_b"
}
},
"unique_keys": [
"coluna_a",
"coluna_b"
],
"partition_column": {
"source": "coluna_a",
"target": "coluna_a_partition"
}| chave_do_json | descrição |
|---|---|
| catalogo | Nome do catálogo que será gravado os dados. |
| schema | Nome do schema que será gravado os dados |
| tabela | Nome da tabela que será gravado os dados |
| mode | É permitido 3 tipos de modo: - full_refresh: apaga tudo e recria a tabela. - incremental_delete_and_insert: deleta registros filtrados pela partição (start_date e end_date) e insere novos dados. - incremental_merge: faz merge dos dados filtrados pela partição. |
| columns | Nome das colunas, seu tipo e sua descrição |
| unique_keys | É um dicionário, onde é possível adicionar uma ou colunas para gerar um ID ÚNICO. É esse campo que irá ser usado como id único no MERGE. |
| partition_column | Recebe uma coluna na chave 'source' no formato de timestamp, datetime ou date, convertemos para DATE e renomeamos com o valor da chave 'target' |
-
Clone esse repositório
git clone [email protected]:fsfer01/case_tecnico_data_architect.git
-
No diretório do repositório, suba o container:
docker compose up -d --build
-
Acesse o terminal do seu container, através do comando no terminal:
docker compose exec spark bash -
No terminal do container, execute:
pip install --upgrade pip pip install -r requirements.txt
-
Após instalação do requirements, execute o main.py
python main.py
-
Para executar os jupyter notebook, execute no terminal do container
jupyter nbconvert --to notebook --execute /app/analysis/pergunta1.ipynb --inplace jupyter nbconvert --to notebook --execute /app/analysis/pergunta2.ipynb --inplace
Após isso, você executou todo esse projeto. Repare que na pasta raiz do seu projeto, teremos novos diretórios, que estão sendo ignorado pelo .gitignore:
📁case_tecnico_data_architect/
├── 📁s3_camada_raw # ONDE ESTÁ SENDO GRAVADO OS ARQUIVOS PARQUETS QUE É EXTRAÍDO
├── 📁catalogo # ONDE ESTÁ SENDO ESCRITO AS TABELAS NO FORMATO DELTA
├── 📁analysis
├──────── 📁spark-warehouse # CRIANDO QUANDO EXECUTAMOS O JYPTER NOTEBOOK PARA REALIZAR ÀS CONSULTAS SQL
- Para sair do terminal do container,execute no terminal
CTRL + q e CTRL + p
- Você vai voltar para o terminal normal, aí, execute:
docker stop $(docker ps -aq) docker rm $(docker ps -aq) docker rmi $(docker images -aq)
O main.py é o ponto de entrada do pipeline ETL da camada Bronze, automatizando extração, transformação e carga (ETL) dos dados do TLC Taxi usando Spark e Delta Lake.
O pipeline suporta tanto tabelas locais (pastas) quanto tabelas registradas em catálogo Spark SQL.
- Python 3.9+
- PySpark 3.4.x
- Delta Lake 2.4.x
- Requests
- PyArrow
- DeltaTables
- Lista arquivos schema JSON em
src/schema/. - Para cada schema:
- Carrega metadados (colunas, chaves, partições, modo de carga).
- Gera URLs dos arquivos Parquet.
- Baixa os Parquets para a camada raw.
- Lê os Parquets em DataFrames Spark.
- Aplica transformações:
- Cast das colunas conforme schema.
- Criação de chave única (
unique_id) com fallback se não houver colunas únicas. - Criação de coluna de partição por data.
- Filtragem do DataFrame apenas para o período correspondente ao arquivo.
- Inclusão de colunas ausentes no final do DataFrame.
- Adição da coluna de data de processamento (
data_processamento).
- Persiste os dados na camada Bronze com Delta Lake (modo configurado no JSON ou sobrescrito no
main.py).
| Função | Descrição |
|---|---|
get_schemas_bronze_layer(path) |
Lê o schema JSON e retorna os metadados (colunas, chaves, partições). |
generate_tlc_taxi_urls(path_json, mode) |
Gera URLs dos arquivos Parquet, período de processamento e modo de carga. |
save_parquet_over_url(url, output_path, spark) |
Baixa Parquet da URL e salva na camada raw. |
read_parquet_files(path, spark) |
Lê arquivos Parquet locais em um DataFrame Spark unificado. |
get_period_from_filename(file_name) |
Extrai start_date e end_date do nome do arquivo (YYYY-MM). |
filter_df_by_period(df, partition_col, start_date, end_date) |
Filtra DataFrame por período. |
transform_df_and_create_unique_key_and_column_date_partition(df, schema_metadata, ...) |
Transformações principais: - Cast de tipos conforme o schema - Criação de unique_id com fallback - Criação de coluna de partição de data - Filtragem do período do arquivo - Inclusão de colunas ausentes no final - Coluna data_processamento |
load_to_bronze_layer(spark, df, metadata, local) |
Persiste DataFrame na camada Bronze (Delta Lake) com suporte a: - full refresh: sobrescreve tudo - incremental delete+insert: deleta registros da partição e insere novos - incremental merge: atualiza registros existentes e insere novos com base no unique_id |
- full_refresh: sobrescreve todos os dados da tabela com os novos dados do DataFrame.
- incremental_delete_and_insert: deleta registros existentes filtrados pela coluna de partição (
start_dateeend_date) e insere os novos registros. - incremental_merge: atualiza registros existentes (baseado em
unique_id) e insere novos registros filtrados pela partição.
Durante a execução, o pipeline exibe mensagens detalhadas, incluindo:
- Schemas JSON encontrados e arquivos processados.
- Status de download dos Parquets.
- Contagem de registros e colunas após transformação.
- Sucesso ou erro durante a carga na camada Bronze.