-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapi_call_async.py
More file actions
168 lines (141 loc) · 6.57 KB
/
api_call_async.py
File metadata and controls
168 lines (141 loc) · 6.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
import asyncio
import aiohttp
import time
import os
from dotenv import load_dotenv
import json
import random
import pandas as pd
load_dotenv(".env")
data_id_source = os.getenv("DATA_ID_SOURCE")
fetched_data_sink = os.getenv("FETCHED_DATA_SINK")
faulty_data_sink = os.getenv("FAULTY_DATA_SINK")
api_url = os.getenv("API_URL")
BATCH_SIZE = int(os.getenv("BATCH_SIZE"))
CONCURRENT_LIMIT = int(os.getenv("ASYNC_CONCURRENT_LIMIT"))
user_agent_file = os.getenv("USER_AGENT_LIST")
# Load user agents from JSON file
with open(user_agent_file, "r") as f:
user_agents = json.load(f).get("user_agents", [])
# Limit concurrent requests
semaphore = asyncio.Semaphore(CONCURRENT_LIMIT)
# -------------------------------------------------------------------------------------------------------------------------------------#
## Decorator functions
def timing_decorator(func):
def wrapper(*args, **kwargs):
start_time = time.perf_counter()
result = func(*args, **kwargs)
end_time = time.perf_counter()
print(f"Runtime: {(end_time - start_time) * 1000:.4f} ms")
return result
return wrapper
# -------------------------------------------------------------------------------------------------------------------------------------#
## Utility functions
def load_excel_data(file_name):
df = pd.read_excel(file_name, usecols=["id"], engine='calamine')
print(f"Total product IDs loaded with pandas: {df.shape[0]}")
# print(f"Checking data: {df.head()}")
id_data = df["id"].tolist()
return id_data
def batching_data(data_list, batch_size=1000):
batch_list = [data_list[i : i + batch_size] for i in range(0, len(data_list), batch_size)]
print(f"Total batches: {len(batch_list)}")
return batch_list
def init_job(source_file, batch_size=1000):
if not os.path.exists(fetched_data_sink):
os.makedirs(fetched_data_sink)
if not os.path.exists(faulty_data_sink):
os.makedirs(faulty_data_sink)
id_list = load_excel_data(source_file)
id_batch_list = batching_data(id_list, batch_size)
return id_batch_list
async def write_to_json(json_data, directory, **kwargs):
naming = "_".join([str(kwargs[arg]) for arg in kwargs]) if kwargs else "data"
path = os.path.join(directory, f"{naming}.json")
# Synchronous file writing wrapped in asyncio.to_thread to avoid blocking the event loop
def sync_write():
with open(path, "w", encoding="utf-8") as f:
json.dump(json_data, f, ensure_ascii=False, indent=2)
await asyncio.to_thread(sync_write)
print(f"Done writing to {path}.")
# -------------------------------------------------------------------------------------------------------------------------------------#
## API call functions
# api task
async def api_task(session, product_id, api_url, max_retries=3):
headers = {
"User-Agent": random.choice(user_agents),
"Accept": "application/json",
"Referer": "https://tiki.vn/",
}
for attempt in range(max_retries):
try:
async with semaphore:
async with session.get(
url=f"{api_url}/{product_id}", headers=headers, timeout=10
) as response:
status_code = response.status
if status_code == 200:
data = await response.json()
else:
reason = response.reason
await response.release()
if status_code == 200:
product_data = data
return {
"id": product_data.get("id"),
"api_data": {
"status_code": status_code,
"error:": "None",
"name": product_data.get("name"),
"url_key": product_data.get("url_key"),
"price": product_data.get("price"),
"description": product_data.get("description"),
"images_url": product_data.get("images", [{}])[0].get("base_url"),
}
}
elif status_code == 429: # Too Many Requests
raise Exception("error 429: Too Many Requests.")
else:
return {
"id": product_id,
"api_data": {"status_code": status_code,"error": str(response.reason)}
}
except Exception as e:
print(f"Fetching id {product_id} encounter {e} - Attempt {attempt + 1}/{max_retries}")
await asyncio.sleep(0.05 + attempt*random.uniform(0, 0.1)) # random exponential backoff > 50 ms
# If all retries exhausted and no return yet:
return {
"id": product_id,
"api_data": {"status_code": status_code,"error": str(reason)}
}
async def main():
id_batch_list = init_job(data_id_source, batch_size=BATCH_SIZE)
async with aiohttp.ClientSession() as session:
for idx, id_batch in enumerate(id_batch_list[:1]): # For testing, only process the first batch. Change to id_batch_list for full run.
fetched_data = {}
faulty_data = {}
error_429_count = 0
print(f"Fetching data of batch {idx + 1}...")
api_tasks_list = [
api_task(session, product_id, api_url) for product_id in id_batch
]
results = await asyncio.gather(*api_tasks_list)
for result in results:
if result and result.get("api_data", {}).get("status_code") == 200:
fetched_data[result.get("id")] = result.get("api_data")
else:
faulty_data[result.get("id")] = result.get("api_data")
if result.get("api_data", {}).get("status_code") == 429:
error_429_count += 1
await write_to_json(fetched_data, fetched_data_sink, pre_fix="fetched", batch_no=idx+1)
if faulty_data:
await write_to_json(faulty_data, faulty_data_sink, pre_fix="faulty", batch_no=idx+1)
print(f"Batch {idx + 1:02d} performance:")
print(f" - Fetched data: {len(fetched_data)}")
print(f" - Faulty data: {len(faulty_data)}")
print(f" - Error 429 (Too Many Requests): {error_429_count}")
# -------------------------------------------------------------------------------------------------------------------------------------#
if __name__ == "__main__":
start = time.perf_counter()
asyncio.run(main())
print(f"Total runtime: {(time.perf_counter() - start) * 1000:.4f} ms")