-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathupdate_price.py
More file actions
224 lines (190 loc) · 7.21 KB
/
Copy pathupdate_price.py
File metadata and controls
224 lines (190 loc) · 7.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
import json
import re

# Prefer pandas for reading the workbook; when it cannot be imported, fall
# back to openpyxl. ENGINE records which reader was selected so that main()
# can branch on it.
try:
    import pandas as pd
    ENGINE = "pandas"
except ImportError:
    from openpyxl import load_workbook
    ENGINE = "openpyxl"

# Input workbook path and output JSON path.
EXCEL_FILE = "price.xlsx"
JSON_FILE = "price.json"
def is_lobster(name):
    """Return True when *name* contains any of the known lobster keywords."""
    keywords = ('龙虾', '澳洲龙虾', '波士顿龙虾', '小青龙', '花龙', '青龙')
    for keyword in keywords:
        if keyword in name:
            return True
    return False
def process_item(name, price, target_list):
    """Append the record(s) for one name/price pair to *target_list*.

    The price text is split on ``-``, ``~`` and the em dash. Depending on how
    many non-empty parts result:

    * 1 part  -> one record with the original name and that price.
    * 2 parts -> two records named ``base(大)`` / ``base(小)``; the higher
      price is labelled 大 and emitted first.
    * 3 parts -> three records labelled 大/中/小 by descending price, emitted
      in the order the prices appeared in the cell.
    * anything else, or parts that are not numeric -> one record with the
      original name and the unmodified price text.

    Names containing 河虾 or a lobster keyword are never split.

    Args:
        name: Product name text.
        price: Price text (a string).
        target_list: List that receives ``{"name": ..., "price": ...}``
            dicts; it is mutated in place.

    Returns:
        None.
    """
    original_name = name.strip()
    # Fold full-width parentheses (U+FF08 / U+FF09) to ASCII so that the
    # bracket-stripping regex below also removes full-width suffixes.
    cleaned_name = (
        original_name.replace('\uff08', '(').replace('\uff09', ')').strip()
    )

    # River shrimp and lobster entries are kept verbatim, never split.
    if '河虾' in cleaned_name or is_lobster(cleaned_name):
        target_list.append({"name": original_name, "price": price})
        return

    prices = [p.strip() for p in re.split(r'[-~—]', price) if p.strip()]

    if len(prices) == 1:
        target_list.append({"name": original_name, "price": prices[0]})
        return

    if len(prices) in (2, 3):
        try:
            values = [float(p) for p in prices]
        except ValueError:
            # Non-numeric parts: do not guess, keep the entry as it was.
            target_list.append({"name": original_name, "price": price})
            return

        # Drop any parenthesised suffix before attaching the size label.
        base_name = re.sub(r'\([^)]*\)', '', cleaned_name).strip()

        if len(prices) == 2:
            # Higher price is "大", lower price is "小"; ties keep cell order.
            ordered = prices if values[0] >= values[1] else prices[::-1]
            for spec, value in zip(('大', '小'), ordered):
                target_list.append(
                    {"name": f"{base_name}({spec})", "price": value}
                )
            return

        # Three prices: rank descending (high -> mid -> low maps to
        # 大 -> 中 -> 小) but emit the records in their original cell order.
        ranking = sorted(range(3), key=lambda i: values[i], reverse=True)
        spec_by_index = dict(zip(ranking, ('大', '中', '小')))
        for index, value in enumerate(prices):
            target_list.append(
                {"name": f"{base_name}({spec_by_index[index]})", "price": value}
            )
        return

    # Zero or more than three prices: keep the entry unchanged.
    target_list.append({"name": original_name, "price": price})
def process_lobster(row, target_list, last_price):
    """Append one lobster record built from *row* and return the carried price.

    Columns 1, 2 and 3 of *row* are read as name, size spec and price. Rows
    with fewer than four cells, a name without 龙虾, or an empty spec are
    ignored. An empty price cell reuses *last_price*; a non-empty one has
    ``-`` replaced by ``~`` and becomes the new carried price.

    Args:
        row: Sequence of cell values for one sheet row.
        target_list: List that receives the record dict (mutated in place).
        last_price: Price carried over from the previous lobster row.

    Returns:
        The price to carry into the next call.
    """
    if len(row) < 4:
        return last_price

    product, size, raw_price = ((cell or '').strip() for cell in row[1:4])
    if not size or '龙虾' not in product:
        return last_price

    if raw_price:
        last_price = raw_price.replace('-', '~')
    if last_price:
        target_list.append({"name": f"龙虾({size})", "price": last_price})
    return last_price
def normalize_date(raw_date):
    """Return the first ``YYYY?M?D`` date in *raw_date* as ``YYYY/MM/DD``.

    The separators ``.``, ``/`` and ``-`` are accepted. A falsy input yields
    an empty string; text without a recognisable date is returned stripped
    but otherwise unchanged.
    """
    if not raw_date:
        return ""

    text = str(raw_date).strip()
    found = re.search(r'(\d{4})[./-](\d{1,2})[./-](\d{1,2})', text)
    if found is None:
        return text

    year, month, day = found.groups()
    return "{}/{:02d}/{:02d}".format(year, int(month), int(day))
def parse_first_price(price_str):
    """Return the first numeric part of a price string as a float.

    The text is split on ``-``, ``~`` and the em dash, the same separators
    process_item() splits on, so an em-dash range parses to its first number
    instead of 0.0.

    Args:
        price_str: Price text such as ``"35"``, ``"35~40"`` or ``"35—40"``.
            Non-string values are converted with ``str()``.

    Returns:
        The first part that parses as a float, or ``0.0`` when the input is
        falsy or no part is numeric.
    """
    if not price_str:
        return 0.0
    for part in re.split(r'[-~—]', str(price_str)):
        try:
            return float(part.strip())
        except ValueError:
            # Empty or non-numeric fragment; try the next one.
            continue
    return 0.0
def extract_base_name(item_name):
    """Return *item_name* without its trailing size or count suffix.

    First a trailing ``(大)`` / ``(中)`` / ``(小)`` suffix is removed, then a
    trailing ``(<digits>头)`` suffix. Both ASCII and full-width parentheses
    (U+FF08 / U+FF09) are recognised, so either spelling of the same product
    yields the same base name.

    Args:
        item_name: Item name text.

    Returns:
        The stripped base name.
    """
    name = item_name.strip()
    name = re.sub(r'[(\uff08][大中小][)\uff09]$', '', name).strip()
    name = re.sub(r'[(\uff08]\d+头[)\uff09]$', '', name).strip()
    return name
def sort_items(items):
    """Return *items* grouped by base name and ordered within each group.

    Items are grouped by ``extract_base_name(item["name"])``; groups keep the
    order in which their first item appeared. Inside a group whose base name
    contains 河虾 the items are sorted by ``parse_first_price`` descending;
    every other group is sorted 大 -> 中 -> 小 by the trailing size suffix
    (ASCII or full-width parentheses), with unlabelled items last. A group
    whose base name is exactly 河虾 is moved to the end.

    Args:
        items: List of ``{"name": ..., "price": ...}`` dicts.

    Returns:
        A new list containing the same dict objects in sorted order.
    """
    spec_order = {'大': 0, '中': 1, '小': 2}
    spec_pattern = re.compile(r'[(\uff08]([大中小])[)\uff09]$')

    def spec_key(item):
        # Items without a recognised size suffix sort after the labelled ones.
        match = spec_pattern.search(item["name"])
        return spec_order[match.group(1)] if match else 99

    grouped = {}
    group_order = []
    for item in items:
        base = extract_base_name(item["name"])
        if base not in grouped:
            grouped[base] = []
            group_order.append(base)
        grouped[base].append(item)

    for base, members in grouped.items():
        if '河虾' in base:
            members.sort(
                key=lambda x: parse_first_price(x["price"]), reverse=True
            )
        else:
            members.sort(key=spec_key)

    # The plain river-shrimp group always goes last.
    if '河虾' in group_order:
        group_order.remove('河虾')
        group_order.append('河虾')

    result = []
    for base in group_order:
        result.extend(grouped[base])
    return result
def main():
    """Convert the price workbook into the JSON price list.

    Reads EXCEL_FILE with the reader named by ENGINE, walks the rows to pick
    up the date line, the category headers (cells containing 批发价) and the
    item rows beneath them, sorts each category with sort_items(), writes the
    result to JSON_FILE and prints a one-line summary.

    Both the ASCII colon and the full-width colon (U+FF1A) are accepted in
    the date marker and in category titles.
    """
    if ENGINE == "pandas":
        df = pd.read_excel(EXCEL_FILE, header=None, dtype=str)
        rows = df.values.tolist()
        is_blank = pd.isna
    else:
        wb = load_workbook(EXCEL_FILE, data_only=True)
        try:
            ws = wb.active
            rows = [
                [str(c).strip() if c is not None else '' for c in row]
                for row in ws.iter_rows(min_row=1, values_only=True)
            ]
        finally:
            # Close the workbook even if reading the rows fails.
            wb.close()

        def is_blank(cell):
            return cell == ''

    categories = []
    date_str = ""
    current_cat = None
    in_lobster = False
    lobster_last_price = ""

    for row in rows:
        if not row or all(is_blank(c) for c in row):
            continue
        row = ['' if is_blank(c) else str(c).strip() for c in row]

        # Fold the full-width colon to ASCII so either spelling of the
        # "日期:" marker is recognised.
        row_text = ' '.join(row).replace('\uff1a', ':')
        if '日期:' in row_text:
            date_str = normalize_date(row_text.split('日期:')[1])
            continue

        first_cell = row[0]
        if '批发价' in first_cell:
            # A category header: the title is the text before the first colon.
            title = first_cell.replace('\uff1a', ':').split(':')[0].strip()
            current_cat = {"title": title, "items": []}
            categories.append(current_cat)
            in_lobster = '龙虾' in title
            if in_lobster:
                # Each lobster section starts with no carried-over price.
                lobster_last_price = ""
            continue

        # Skip column-header rows.
        if first_cell in ('编号', '品名'):
            continue
        if current_cat is None:
            continue

        if in_lobster:
            lobster_last_price = process_lobster(
                row, current_cat["items"], lobster_last_price
            )
            continue

        # NOTE(review): the indentation of this file was reconstructed; both
        # name/price column pairs are assumed to apply only outside the
        # lobster section. Confirm against the original layout.
        if len(row) >= 3 and row[1] and row[2]:
            process_item(row[1], row[2], current_cat["items"])
        if len(row) >= 6 and row[4] and row[5]:
            process_item(row[4], row[5], current_cat["items"])

    for cat in categories:
        cat["items"] = sort_items(cat["items"])

    result = {"date": date_str, "categories": categories}
    with open(JSON_FILE, 'w', encoding='utf-8') as f:
        json.dump(result, f, ensure_ascii=False, indent=2)

    total_items = sum(len(c['items']) for c in categories)
    print(f"✅ 成功生成 {JSON_FILE},共 {len(categories)} 个分类,{total_items} 条记录")
# Run the conversion only when this file is executed as a script.
if __name__ == "__main__":
    main()