-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapi_call_multi_thread.py
More file actions
167 lines (146 loc) · 7.36 KB
/
api_call_multi_thread.py
File metadata and controls
167 lines (146 loc) · 7.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
import requests
# from openpyxl import load_workbook
import time
import json
from concurrent.futures import ThreadPoolExecutor, as_completed
import random
import pandas as pd
import os
from dotenv import load_dotenv
load_dotenv(".env")
# Runtime configuration, read from environment variables.
def _positive_int_env(name, default):
    """Return environment variable *name* as a positive int, else *default*.

    The default is used when the variable is unset, is not a valid integer,
    or is zero/negative (a batch size or worker count of 0 is unusable).
    """
    try:
        value = int(os.getenv(name))
    except (TypeError, ValueError):
        # TypeError: variable unset (None); ValueError: not an integer.
        return default
    return value if value > 0 else default


data_id_source = os.getenv("DATA_ID_SOURCE")
fetched_data_sink = os.getenv("FETCHED_DATA_SINK")
faulty_data_sink = os.getenv("FAULTY_DATA_SINK")
api_url = os.getenv("API_URL")
BATCH_SIZE = _positive_int_env("BATCH_SIZE", 1000)
MAX_WORKERS = _positive_int_env("MAX_WORKERS", 50)
user_agent_file = os.getenv("USER_AGENT_LIST")
# Load user agents from JSON file (the list stored under the "user_agents" key).
# Explicit UTF-8 so decoding does not depend on the platform default encoding.
with open(user_agent_file, "r", encoding="utf-8") as f:
    user_agents = json.load(f).get("user_agents", [])
# -------------------------------------------------------------------------------------------------------------------------------------#
def timing_decorator(func):
    """Decorate *func* so that every call prints its wall-clock runtime.

    The runtime is measured with ``time.perf_counter`` and printed in
    milliseconds. It is printed even when *func* raises; the exception then
    propagates unchanged.

    Args:
        func: The callable to time.

    Returns:
        A wrapper that forwards all arguments to *func* and returns its result.
    """
    # Function-scope import keeps this block self-contained.
    from functools import wraps

    @wraps(func)  # preserve __name__, __doc__, etc. of the wrapped function
    def wrapper(*args, **kwargs):
        start_time = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            end_time = time.perf_counter()
            print(f"Runtime: {(end_time - start_time) * 1000:.4f} ms")

    return wrapper
# -------------------------------------------------------------------------------------------------------------------------------------#
# def yield_data_openpyxl(file_name):
# wb = load_workbook(file_name, read_only=True, data_only=True) # Read-only and data_only mode for better RAM efficiency
# sheet = wb.active # Or wb['SheetName'] if knows sheet name
# for row in sheet.iter_rows(min_row=2,max_col=1, values_only=True): # Only read the first column (id)
# if row is not None:
# yield str(row[0])
# wb.close() # Close the workbook to free up resources
# @timing_decorator
# def count_rows_openpyxl(file_name):
# row_count = 0
# for row in yield_data_openpyxl(file_name):
# row_count += 1
# return row_count
def load_data_pandas(file_name):
    """Read the "id" column of an Excel file and return its values as a list.

    Only the "id" column is loaded (via ``usecols``), using the calamine
    engine. The number of loaded rows is printed.
    """
    id_column = pd.read_excel(file_name, usecols=["id"], engine='calamine')["id"]
    print(f"Total product IDs loaded with pandas: {len(id_column)}")
    return id_column.tolist()
def batching_data(id_list, batch_size=1000):
    """Split *id_list* into consecutive slices of at most *batch_size* items.

    Prints the number of batches produced and returns them as a list; the
    last batch holds the remainder and may be shorter than the others.
    """
    offsets = range(0, len(id_list), batch_size)
    chunks = []
    for lo in offsets:
        chunks.append(id_list[lo : lo + batch_size])
    print(f"Total batches: {len(chunks)}")
    return chunks
def init_job(source_file, batch_size=1000):
    """Create the output directories and return the product IDs in batches.

    Args:
        source_file: Path of the Excel file passed to ``load_data_pandas``.
        batch_size: Maximum number of IDs per batch.

    Returns:
        The list of ID batches produced by ``batching_data``.
    """
    # exist_ok=True replaces the exists()-then-makedirs() pair, which can fail
    # if the directory appears between the check and the creation.
    for sink_dir in (fetched_data_sink, faulty_data_sink):
        os.makedirs(sink_dir, exist_ok=True)
    id_list = load_data_pandas(source_file)
    return batching_data(id_list, batch_size)
def write_to_json(json_data, directory, **kwargs):
    """Write *json_data* as indented UTF-8 JSON to a file inside *directory*.

    The file name is built from the keyword-argument values, converted to
    strings and joined with "_" in the order they were passed; without any
    keyword arguments the name is "data". The ".json" extension is appended.
    A confirmation line with the final path is printed after writing.
    """
    if kwargs:
        naming = "_".join(str(value) for value in kwargs.values())
    else:
        naming = "data"
    path = os.path.join(directory, f"{naming}.json")
    with open(path, "w", encoding="utf-8") as out_file:
        json.dump(json_data, out_file, ensure_ascii=False, indent=2)
    print(f"Done writing to {path}.")
# -------------------------------------------------------------------------------------------------------------------------------------#
# Worker function to fetch data from API
def worker_fetch_data(product_id, api_url, max_retries=3):
    """Fetch one product from the API, retrying on HTTP 429 and on exceptions.

    Each attempt sends a GET request to ``{api_url}/{product_id}`` with a
    randomly chosen User-Agent and a 10 second timeout.

    Args:
        product_id: Identifier appended to ``api_url`` to build the request URL.
        api_url: Base URL of the product endpoint.
        max_retries: Maximum number of attempts before giving up.

    Returns:
        A dict ``{"id": ..., "api_data": {...}}``. ``api_data`` always has the
        keys ``status_code`` and ``error``; for HTTP 200 it also has ``name``,
        ``url_key``, ``price``, ``description`` and ``images_url``. Any other
        status except 429 is returned immediately without retrying. When all
        attempts fail, ``status_code`` is 429 if the last failure was a 429
        response, otherwise the string "Timeout + retries exhausted" with the
        last exception text in ``error``. Request and parsing failures are
        reported in the result rather than raised.
    """
    # Values used for the final record if no attempt is ever made or succeeds.
    last_status = "Timeout + retries exhausted"
    last_error = "Timeout + retries exhausted"
    for attempt in range(max_retries):
        # Config rotating user-agent
        headers = {
            "User-Agent": random.choice(user_agents),
            "Accept": "application/json",
            "Referer": "https://tiki.vn/",
        }
        try:
            response = requests.get(
                url=f"{api_url}/{product_id}", headers=headers, timeout=10
            )
            status_code = response.status_code
            if status_code == 200:
                product_data = response.json()
                # "images" may be missing, null or an empty list; fall back to
                # one empty dict so indexing cannot raise.
                images = product_data.get("images") or [{}]
                payload_id = product_data.get("id")
                return {
                    # Fall back to the requested id so results never share a None key.
                    "id": product_id if payload_id is None else payload_id,
                    "api_data": {
                        "status_code": status_code,
                        "error": "None",
                        "name": product_data.get("name"),
                        "url_key": product_data.get("url_key"),
                        "price": product_data.get("price"),
                        "description": product_data.get("description"),
                        "images_url": images[0].get("base_url"),
                    }
                }
            if status_code != 429:
                return {
                    "id": product_id,
                    "api_data": {"status_code": status_code, "error": str(response.reason)}
                }
            # 429 Too Many Requests: remember it so the final record keeps the
            # real status code, then retry.
            failure = "error 429: Too Many Requests."
            last_status, last_error = status_code, str(response.reason)
        except Exception as e:
            # Deliberately broad: this runs in a pool thread and one bad
            # product must be reported as faulty instead of raising.
            failure = e
            last_status, last_error = "Timeout + retries exhausted", str(e)
        print(f"Fetching id {product_id} encounter {failure} - Attempt {attempt + 1}/{max_retries}")
        if attempt + 1 < max_retries:
            # Jittered backoff: 50 ms base plus a random part that grows
            # linearly with the attempt index. Skipped after the last attempt.
            time.sleep(0.05 + attempt*random.uniform(0, 0.1))
    # If all retries exhausted and no return yet:
    return {
        "id": product_id,
        "api_data": {"status_code": last_status, "error": last_error}
    }
## main API call function
@timing_decorator
def get_product_data(id_batch, batch_index, api_url, max_workers=MAX_WORKERS, retries=3):
    """Fetch every product of one batch concurrently and write the results to JSON.

    One ``worker_fetch_data`` task is submitted per product ID to a thread
    pool. Results with status code 200 are collected as fetched data, all
    others as faulty data. Fetched data is always written to
    ``fetched_data_sink``; faulty data is written to ``faulty_data_sink`` only
    when there is any. A short summary of the batch is printed.

    Args:
        id_batch: Iterable of product IDs to fetch.
        batch_index: Zero-based batch index; output uses ``batch_index + 1``.
        api_url: Base URL passed through to ``worker_fetch_data``.
        max_workers: Number of threads in the pool.
        retries: Maximum attempts per product, passed to the worker.

    Returns:
        A dict ``{"fetched": {...}, "faulty": {...}}`` mapping product id to
        its ``api_data`` record.
    """
    fetched_data = {}
    faulty_data = {}
    error_429_count = 0
    print(f"Start fetching data for batch {batch_index + 1:02d}.")
    # ThreadPoolExecutor create multiple threads to fetch product data concurrently
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to its product id so a failed task can still be
        # attributed to the right product.
        future_to_id = {
            executor.submit(worker_fetch_data, product_id, api_url, retries): product_id
            for product_id in id_batch
        }
        for future in as_completed(future_to_id):
            product_id = future_to_id[future]
            try:
                api_result = future.result()
            except Exception as exc:
                # A crashed worker must not abort the batch: record it as
                # faulty so the other results are still written.
                api_result = {
                    "id": product_id,
                    "api_data": {"status_code": "Worker exception", "error": str(exc)},
                }
            if api_result is None:
                api_result = {
                    "id": product_id,
                    "api_data": {"status_code": "No result", "error": "Worker returned no result"},
                }
            api_data = api_result.get("api_data")
            status_code = (api_data or {}).get("status_code")
            if status_code == 200:
                fetched_data[api_result.get("id")] = api_data
            else:
                faulty_data[api_result.get("id")] = api_data
                if status_code == 429:
                    error_429_count += 1
    write_to_json(fetched_data, fetched_data_sink, pre_fix="fetched", batch_no=batch_index + 1)
    if faulty_data:
        write_to_json(faulty_data, faulty_data_sink, pre_fix="faulty", batch_no=batch_index + 1)
    print(f"Batch {batch_index + 1:02d} performance:")
    print(f" - Fetched data: {len(fetched_data)} records.")
    print(f" - Faulty data: {len(faulty_data)} records.")
    print(f" - Error 429 (Too Many Requests): {error_429_count} records.")
    return {"fetched": fetched_data, "faulty": faulty_data}
# ----------------------------------------------------------------------------#
if __name__ == "__main__":
    # Read the product IDs from the Excel source and split them into batches.
    id_batch_list = init_job(data_id_source, batch_size=BATCH_SIZE)
    # Testing mode: only the first batch is fetched and written to JSON.
    # Iterate over id_batch_list itself for a full run.
    test_batches = id_batch_list[:1]
    for idx, id_batch in enumerate(test_batches):
        processed_data = get_product_data(id_batch, idx, api_url)