Skip to content

Commit f3e1e14

Browse files
Refactor to respect single-responsibility in transform module
1 parent b6352a6 commit f3e1e14

File tree

2 files changed

+15
-56
lines changed

2 files changed

+15
-56
lines changed

etl/models/transform/ResponseSplit.py

+6-51
Original file line numberDiff line numberDiff line change
@@ -1,61 +1,16 @@
1-
import os
2-
3-
import pandas as pd
4-
import concurrent.futures
5-
from tqdm import tqdm
6-
7-
from etl.common.utils.common import (
8-
DefaultTimestampStr,
9-
DefaultOutputFolder,
10-
DefaultUTCDatetime,
11-
)
121
from etl.common.utils.logs import loggingInfo
132
from etl.config.logFile import logFileName
143

154
WORK_DIR = logFileName(file=__file__)
165

176

187
class transformation:
19-
def __init__(self, json_response: dict, params) -> None:
20-
self.output_path = DefaultOutputFolder()
21-
self.insert_timestamp = DefaultTimestampStr()
22-
self.extracted_files = []
8+
def __init__(self, json_response: dict, params, fila: object):
239
self.json_response = json_response
24-
self.totalParams = len(json_response)
2510
self.validParams = params
11+
self.fila = fila
2612

27-
## Parallel Processing data
28-
with concurrent.futures.ThreadPoolExecutor(os.cpu_count()) as executor:
29-
list(
30-
tqdm(
31-
executor.map(self.__process_param__, enumerate(self.validParams)),
32-
total=self.totalParams,
33-
desc="Processing files",
34-
)
35-
)
36-
37-
loggingInfo(
38-
f"{self.totalParams} files extracted in: {self.output_path}", WORK_DIR
39-
)
40-
41-
def __process_param__(self, args):
42-
43-
index, param = args
44-
dic = self.json_response[param.replace("-", "")]
45-
46-
# Convert 'dic' to a Pandas DataFrame
47-
df = pd.DataFrame([dic])
48-
49-
# Add new columns to the DataFrame
50-
df["symbol"] = param
51-
52-
# Add two columns with the current date and time
53-
df["extracted_at"] = DefaultUTCDatetime()
54-
55-
# Write the DataFrame to a Parquet file
56-
df.to_parquet(f"{self.output_path}{param}-{self.insert_timestamp}.parquet")
57-
58-
# Append list with the file path
59-
self.extracted_files.append(f"{param}-{self.insert_timestamp}.parquet")
60-
61-
return None
13+
def publish(self):
14+
for param in self.validParams:
15+
dic = self.json_response[param.replace("-", "")]
16+
self.fila.put(dic) # type: ignore

etl/models/transform/test_transform.py

+9-5
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
import pytest
22
from etl.models.transform.ResponseSplit import transformation
3+
import queue
4+
5+
fila = queue.Queue()
36

47

58
@pytest.fixture
@@ -41,10 +44,11 @@ def valid_params() -> list:
4144

4245
def test_transformation_process_param(json_response, valid_params):
4346
# Create an instance of the transformation class
44-
transform = transformation(json_response, valid_params)
47+
transform = transformation(json_response, valid_params, fila)
48+
transform.publish()
4549

46-
assert len(transform.extracted_files) == 2
50+
assert fila.qsize() == 2
4751

48-
for extracted_file in transform.extracted_files:
49-
assert extracted_file.startswith("BRL-")
50-
assert extracted_file.endswith(".parquet")
52+
# for extracted_file in transform.extracted_files:
53+
# assert extracted_file.startswith("BRL-")
54+
# assert extracted_file.endswith(".parquet")

0 commit comments

Comments
 (0)