-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpipeline.py
148 lines (120 loc) · 4.35 KB
/
pipeline.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
import pandas as pd
import psycopg2
from psycopg2 import extras
from unidecode import unidecode
def log(message) -> None:
"""Logs the operations
Args:
message (_type_): The messages on the functions.
"""
print(f'\n {message}')
def get_data(path:str) -> pd.DataFrame:
"""Extracts the data from json
Args:
path (str): Path to the data.
Returns:
pd.DataFrame
"""
# Extracts the data from json
data = pd.read_json(path)
log("Dados carregados com sucesso.")
return data
def process_data(data: pd.DataFrame) -> pd.DataFrame:
"""Processes the data.
Args:
data (pd.DataFrame): The dataframe created in the first task.
Raises:
error: Raises an error if something goes wrong.
Returns:
pd.DataFrame: Treated data dataframe.
"""
try:
df = pd.DataFrame(data)
# Split the address into separate columns
df[['logradouro', 'numero', 'bairro', 'cidade', 'estado', 'cep']] = df['endereco'].apply(pd.Series)
# Removes unnecessary column
df = df.drop('endereco', axis=1)
# Remove special characters from the name and address
df['nome'] = df['nome'].apply(unidecode)
df['logradouro'] = df['logradouro'].apply(unidecode)
df['bairro'] = df['bairro'].apply(unidecode)
df['cidade'] = df['cidade'].apply(unidecode)
# Remove special characters from the phone number and cep
df['telefone'] = df['telefone'].str.replace('-','')
df['telefone'] = df['telefone'].str.replace('(00)','')
df['cep'] = df['cep'].str.replace('-','')
except Exception as error:
raise error
log('Dados transformados com sucesso.')
return df
def load_data_to_postgres(data: pd.DataFrame) -> None:
"""Loads data to a postgres table.
Args:
data (pd.DataFrame): Target dataframe.
"""
# connection string
connection_string = psycopg2.connect(
host="localhost",
port="5432",
database="postgres",
user="postgres",
password="postgres",
)
with connection_string.cursor() as cursor:
# creates the table
create_table = """
CREATE TABLE IF NOT EXISTS address_data(
id SERIAL PRIMARY KEY,
nome VARCHAR,
idade INT,
email VARCHAR,
telefone VARCHAR,
logradouro VARCHAR,
numero VARCHAR,
bairro VARCHAR,
cidade VARCHAR,
estado VARCHAR,
cep VARCHAR
)
"""
cursor.execute(create_table)
# insert non duplicated data
insert_query = """
INSERT INTO address_data (id, nome, idade, email, telefone, logradouro, numero, bairro, cidade, estado, cep)
VALUES %s
ON CONFLICT (id) DO UPDATE SET
nome = EXCLUDED.nome,
idade = EXCLUDED.idade,
telefone = EXCLUDED.telefone,
logradouro = EXCLUDED.logradouro,
numero = EXCLUDED.numero,
bairro = EXCLUDED.bairro,
cidade = EXCLUDED.cidade,
estado = EXCLUDED.estado,
cep = EXCLUDED.cep
"""
extras.execute_values(cursor, insert_query, data.values)
# Checa se os dados foram inseridos no banco
cursor.execute("SELECT COUNT(*) FROM address_data")
count = cursor.fetchone()[0]
if count == len(data):
log("Dados inseridos com sucesso.")
# Consulta para selecionar os dados inseridos
cursor.execute("SELECT * FROM address_data")
rows = cursor.fetchall() # Recupera todos os registros retornados pela consulta
# Exibe os dados recuperados
for row in rows:
print(row)
else:
log("Erro ao inserir dados.")
def run_pipeline(path: str) -> None:
"""Pipeline executor.
Args:
path (str): Where the data is stored.
"""
data = get_data(path)
dataframe_address_data = process_data(data)
load_data_to_postgres(dataframe_address_data)
if __name__ == "__main__":
path = '/home/carol/Documentos/data_eng_test_capim/data.json'
run_pipeline(path)