-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtravel_planner_engine.py
More file actions
457 lines (383 loc) · 22.5 KB
/
travel_planner_engine.py
File metadata and controls
457 lines (383 loc) · 22.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
# travel_planner_engine_v6_final.py
import math
import itertools
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional
import pandas as pd
from collections import defaultdict
class TravelPlannerEngine:
"""
通用旅游行程优化引擎 (V6.0 Final)。
该引擎集成了所有核心功能,包括从Excel加载全量数据、交通优化、
住宿优化(性别感知、同址)、地点剪枝、动态时序排列以及
全局成本决策,旨在生成一个成本最低且完全可行的旅行方案。
"""
def __init__(self):
"""
初始化引擎,准备存储从Excel加载的各种数据。
"""
# 从 '配置参数' Sheet 加载
self.num_people: int = 0
self.num_luggage: int = 0
self.min_days: int = 0
self.max_days: int = 0
# 从 '人员信息' Sheet 加载
self.members: List[Dict] = []
# 从 '地点' Sheet 加载
self.locations: List[Dict] = []
# 从 '交通矩阵' Sheet 加载
self.time_matrix: Dict[str, Dict[str, float]] = {}
# 从 '住宿方案' 和 '房型详情' Sheets 加载
self.accommodation_providers: List[Dict] = []
self.room_types_by_provider: Dict[str, List[Dict]] = defaultdict(list)
# 从 '机票价格' Sheet 加载
self.flight_costs: Dict[int, float] = {}
# 内置或可配置的车辆库
self.vehicle_options: List[Dict] = [
{'name': '经济轿车', 'seats': 4, 'luggage': 2, 'price_per_day': 200},
{'name': '舒适商务', 'seats': 7, 'luggage_base': 1, 'price_per_day': 500},
{'name': '中巴', 'seats': 15, 'luggage': 15, 'price_per_day': 900},
]
@classmethod
def from_excel(cls, file_path: str) -> 'TravelPlannerEngine':
"""
[工厂方法] 从一个结构化的Excel文件加载所有数据,并创建一个完全配置好的引擎实例。
Args:
file_path (str): Excel文件的路径。
Returns:
TravelPlannerEngine: 一个完全初始化的引擎实例。
Raises:
FileNotFoundError: 如果文件不存在。
ValueError: 如果Excel数据缺失或格式不正确。
"""
try:
engine = cls()
xls = pd.ExcelFile(file_path)
# 1. 读取配置参数
config_df = pd.read_excel(xls, sheet_name='配置参数')
config = dict(zip(config_df['Parameter'], config_df['Value']))
engine.num_people = int(config['num_people'])
engine.num_luggage = int(config['num_luggage'])
engine.min_days = int(config['min_days'])
engine.max_days = int(config['max_days'])
# 2. 读取人员信息
personnel_df = pd.read_excel(xls, sheet_name='人员信息')
engine.members = personnel_df.to_dict('records')
# 校验人数是否与配置一致
if len(engine.members) != engine.num_people:
raise ValueError("'人员信息' Sheet中的人数与'配置参数'中的 'num_people' 不匹配。")
# 3. 读取地点
locations_df = pd.read_excel(xls, sheet_name='地点')
engine.locations = locations_df.to_dict('records')
# 4. 读取交通矩阵
matrix_df = pd.read_excel(xls, sheet_name='交通矩阵', index_col=0)
engine.time_matrix = matrix_df.to_dict('index')
# 5. 读取住宿方案与房型详情
providers_df = pd.read_excel(xls, sheet_name='住宿方案')
engine.accommodation_providers = providers_df.to_dict('records')
room_types_df = pd.read_excel(xls, sheet_name='房型详情')
for _, row in room_types_df.iterrows():
engine.room_types_by_provider[row['provider_name']].append(row.to_dict())
# 6. 读取机票价格
flight_df = pd.read_excel(xls, sheet_name='机票价格')
required_flight_cols = ['天数', '出发时间段', '返回时间段', '价格']
if not all(col in flight_df.columns for col in required_flight_cols):
raise ValueError("Excel '机票价格' Sheet中缺少必要的列。")
# 将整个表格存为记录列表,每条记录都是一个完整的航班选项
engine.flight_options = flight_df.to_dict('records')
# --- 关键修复:添加返回语句和成功提示 ---
print("✅ 引擎已成功从Excel文件初始化 (V6.0 Final)。")
return engine
except FileNotFoundError:
raise FileNotFoundError(f"❌ 错误: 无法找到Excel文件 '{file_path}'")
except Exception as e:
raise ValueError(f"❌ 解析Excel文件时出错: {e}")
def _optimize_transport(self) -> Dict[str, Any]:
"""
【交通模块】计算满足人数和行李的最低每日车辆成本。
采用穷举搜索找到最优车辆组合,包含商务车特殊逻辑。
"""
max_counts = [math.ceil(self.num_people / v['seats']) + 1 for v in self.vehicle_options]
possible_combinations = itertools.product(*(range(c + 1) for c in max_counts))
valid_solutions = []
for combo in possible_combinations:
if sum(combo) == 0: continue
total_seats, total_luggage, total_cost = 0, 0, 0
business_van_idx = next((i for i, v in enumerate(self.vehicle_options) if v['name'] == '舒适商务'), -1)
seats_from_other_vehicles, luggage_from_other_vehicles = 0, 0
for i, count in enumerate(combo):
if i == business_van_idx: continue
vehicle = self.vehicle_options[i]
seats_from_other_vehicles += count * vehicle['seats']
luggage_from_other_vehicles += count * vehicle.get('luggage', 0)
num_business_van = combo[business_van_idx] if business_van_idx != -1 else 0
business_van = self.vehicle_options[business_van_idx] if business_van_idx != -1 else {}
total_seats = seats_from_other_vehicles + num_business_van * business_van.get('seats', 0)
if total_seats < self.num_people: continue
people_in_van = max(0, self.num_people - seats_from_other_vehicles)
if business_van and people_in_van > num_business_van * business_van['seats']: continue
empty_seats_in_van = (num_business_van * business_van.get('seats', 0)) - people_in_van
luggage_from_van = (num_business_van * business_van.get('luggage_base', 0)) + empty_seats_in_van
total_luggage = luggage_from_other_vehicles + luggage_from_van
if total_luggage < self.num_luggage: continue
for i, count in enumerate(combo):
total_cost += count * self.vehicle_options[i]['price_per_day']
valid_solutions.append({'cost': total_cost, 'combination': combo})
if not valid_solutions: raise ValueError("无法找到满足条件的车辆组合!")
return min(valid_solutions, key=lambda x: x['cost'])
def _get_best_subsets(self, max_skip: int = 2) -> List[List[Dict]]:
"""
【地点模块】生成所有可能的地点组合(包含所有必去点,最多跳过2个可选点)。
"""
must_go = [loc for loc in self.locations if loc['is_must_go']]
optional = [loc for loc in self.locations if not loc['is_must_go']]
possible_subsets = []
for num_to_keep in range(len(optional), len(optional) - max_skip - 1, -1):
if num_to_keep < 0: continue
for kept_optionals in itertools.combinations(optional, num_to_keep):
possible_subsets.append(must_go + list(kept_optionals))
# 初步剪枝:如果仅游玩时间就超过最大天数,则该组合无效
valid_subsets = [
subset for subset in possible_subsets
if math.ceil(sum(loc['play_time_hours'] for loc in subset) / 10) <= self.max_days
]
if not valid_subsets: raise ValueError("没有任何地点组合能在指定天数内完成基础游玩。")
return valid_subsets
def _solve_room_assignment_for_group(self, num_people_in_group: int, available_rooms: List[Dict]) -> Dict:
"""【住宿辅助】使用动态规划为特定性别分组计算最优房间组合。"""
if num_people_in_group == 0: return {'cost': 0, 'combination': {}, 'total_capacity': 0}
n = num_people_in_group
rooms = sorted(available_rooms, key=lambda x: x['capacity'])
max_cap = rooms[-1]['capacity'] if rooms else 1
dp_size = n + max_cap
dp = {i: (float('inf'), {}) for i in range(dp_size)}
dp[0] = (0, defaultdict(int))
for i in range(1, dp_size):
for room in rooms:
cap, price, name = room['capacity'], room['price_per_night'], room['room_name']
if i >= cap:
prev_cost, prev_combo = dp[i - cap]
if prev_cost != float('inf'):
new_cost = prev_cost + price
if new_cost < dp[i][0]:
new_combo = prev_combo.copy()
new_combo[name] += 1
dp[i] = (new_cost, new_combo)
best_cost, best_combination, final_capacity = float('inf'), {}, 0
for i in range(n, dp_size):
cost, combo = dp[i]
if cost < best_cost:
best_cost, best_combination, final_capacity = cost, dict(combo), i
if best_cost == float('inf'): raise ValueError(f"无法为 {n} 人的分组找到合适的房间组合。")
return {'cost': best_cost, 'combination': best_combination, 'total_capacity': final_capacity}
def _optimize_accommodation(self) -> Dict[str, Any]:
"""【住宿模块】遍历所有住宿方案,为整个团体找到最低成本的同址、性别感知住宿方案。"""
males = [p for p in self.members if p['gender'] == '男']
females = [p for p in self.members if p['gender'] == '女']
best_overall_solution = {'total_cost': float('inf')}
for provider in self.accommodation_providers:
provider_name = provider['provider_name']
current_solution = {'provider_name': provider_name, 'type': provider['type'], 'total_cost': 0, 'details': {}}
if provider['type'] == 'Hotel':
hotel_rooms = self.room_types_by_provider.get(provider_name, [])
if not hotel_rooms: continue
male_solution = self._solve_room_assignment_for_group(len(males), hotel_rooms)
female_solution = self._solve_room_assignment_for_group(len(females), hotel_rooms)
current_solution['total_cost'] = male_solution['cost'] + female_solution['cost']
current_solution['details']['males'] = male_solution
current_solution['details']['females'] = female_solution
elif provider['type'] == 'Villa':
if self.num_people <= provider['capacity']:
current_solution['total_cost'] = provider['price_per_night']
current_solution['details']['info'] = f"整栋别墅,可容纳 {provider['capacity']} 人。"
else:
current_solution['total_cost'] = float('inf')
if current_solution['total_cost'] < best_overall_solution['total_cost']:
best_overall_solution = current_solution
if best_overall_solution['total_cost'] == float('inf'):
raise ValueError("在所有住宿方案中,都无法找到能容纳所有人的有效组合。")
return best_overall_solution
def _solve_tsp_and_schedule(self, subset: List[Dict], days: int, departure_slot: str, return_slot: str) -> Optional[Dict]:
"""
【V7 升级版排期模块】为给定的地点子集找到最优路径(TSP),并根据具体的
出发和返回时间段,在指定天数内进行贪心排期。
Args:
subset (List[Dict]): 待安排的地点组合。
days (int): 本次排期尝试的总天数。
departure_slot (str): 出发时间段 (如 '上午出发', '下午出发')。
return_slot (str): 返回时间段 (如 '上午返回', '晚上返回')。
Returns:
Optional[Dict]: 如果排期成功,返回包含行程单的字典,否则返回 None。
"""
# 1. 路径优化 (TSP),此部分逻辑不变
start_node, location_names = "Hotel", [loc['name'] for loc in subset]
best_path, min_travel_time = [], float('inf')
# 对于地点较多的情况,应换用更高效的TSP近似算法,但全排列对旅游场景(<=10个点)足够
for path_perm in itertools.permutations(location_names):
current_time = self.time_matrix[start_node][path_perm[0]]
for i in range(len(path_perm) - 1):
current_time += self.time_matrix[path_perm[i]][path_perm[i+1]]
current_time += self.time_matrix[path_perm[-1]][start_node]
if current_time < min_travel_time:
min_travel_time, best_path = current_time, [start_node] + list(path_perm)
visit_order = sorted(subset, key=lambda x: best_path.index(x['name']))
# 2. 贪心排期,引入航班时间感知逻辑
itinerary, current_day, schedule_possible = {}, 1, True
# 2.1 根据出发时间段,动态设置第一天的开始时间
if departure_slot == '上午出发':
current_time = datetime.strptime("12:00", "%H:%M") # 上午用于飞行和交通,中午开始活动
elif departure_slot == '下午出发':
current_time = datetime.strptime("18:00", "%H:%M") # 下午用于飞行和交通,晚上入住或晚餐
else: # 默认情况
current_time = datetime.strptime("12:00", "%H:%M")
day_start_time_default = datetime.strptime("09:00", "%H:%M")
day_end_time_default = datetime.strptime("21:00", "%H:%M")
last_location = "Hotel"
# 3. 循环安排每个地点
for loc in visit_order:
# 3.1 动态设置当天的结束时间,特别是最后一天
day_end_time = day_end_time_default
if current_day == days: # 如果是行程的最后一天
if return_slot == '上午返回':
day_end_time = datetime.strptime("12:00", "%H:%M") # 上午必须前往机场
elif return_slot == '下午返回':
day_end_time = datetime.strptime("16:00", "%H:%M") # 留出下午的时间前往机场
# '晚上返回' 则使用默认的 21:00,表示有一整天可用
# 3.2 计算活动时间
travel_time = self.time_matrix[last_location][loc['name']]
start = current_time + timedelta(hours=travel_time)
end = start + timedelta(hours=loc['play_time_hours'])
# 3.3 检查是否超出当天可用窗口,如果超出则顺延
if end > day_end_time and last_location != "Hotel":
current_day += 1
if current_day > days: # 如果顺延后总天数超出,则此排期失败
schedule_possible = False
break
current_time = day_start_time_default # 新的一天从早上9点开始
travel_time = self.time_matrix["Hotel"][loc['name']] # 从酒店重新出发
start = current_time + timedelta(hours=travel_time)
end = start + timedelta(hours=loc['play_time_hours'])
# 顺延后再次检查是否能在新的一天排下 (处理单个活动时间超长的情况)
if end > day_end_time_default:
schedule_possible = False
break
# 3.4 将活动添加到行程单
day_key = f"Day {current_day}"
if day_key not in itinerary: itinerary[day_key] = []
itinerary[day_key].append(f"[{start.strftime('%H:%M')}-{end.strftime('%H:%M')}] {loc['name']}")
current_time, last_location = end, loc['name']
# 4. 返回结果
if schedule_possible:
return {'itinerary': itinerary, 'subset': subset}
return None
def generate_itinerary(self) -> Dict:
"""
【V7 升级版主决策引擎】整合所有模块,现在会遍历所有[地点组合] x [航班选项]
的笛卡尔积,以寻找全局成本最低的最终方案。
"""
try:
print("⚙️ 开始计算最优交通方案...")
daily_vehicle_cost = self._optimize_transport()['cost']
print(f"🚗 每日最优交通成本: {daily_vehicle_cost:.2f} 元")
print("⚙️ 开始计算最优住宿方案...")
accommodation_solution = self._optimize_accommodation()
daily_hotel_cost = accommodation_solution['total_cost']
print(f"🏨 每日最优住宿成本: {daily_hotel_cost:.2f} 元 (选择: {accommodation_solution['provider_name']})")
print("⚙️ 开始筛选地点组合...")
candidate_subsets = self._get_best_subsets()
print(f"🗺️ 共生成 {len(candidate_subsets)} 个候选地点组合。")
except ValueError as e:
return {"status": "Error", "message": str(e)}
all_valid_plans = []
print(f"⚙️ 正在为 {len(candidate_subsets)} 个地点组合与 {len(self.flight_options)} 个航班选项的所有组合进行评估...")
# 核心修改:双重循环,遍历所有可能性
for i, subset in enumerate(candidate_subsets):
for flight_option in self.flight_options:
days = flight_option['天数']
# 检查航班选项的天数是否在用户设定的范围内
if not (self.min_days <= days <= self.max_days):
continue
# 调用升级后的排期算法
schedule_result = self._solve_tsp_and_schedule(
subset,
days,
flight_option['出发时间段'],
flight_option['返回时间段']
)
# 如果在该航班约束下,行程可以被成功排完
if schedule_result:
flight_per_person = flight_option['价格']
# 严格按总价计算
ticket_total = sum(loc['ticket_price'] for loc in subset) * self.num_people
hotel_total = daily_hotel_cost * (days - 1)
vehicle_total = daily_vehicle_cost * days
flight_total = flight_per_person * self.num_people
total_cost = ticket_total + hotel_total + vehicle_total + flight_total
all_valid_plans.append({
'total_cost': total_cost,
'per_person_cost': total_cost / self.num_people,
'days': days,
'itinerary': schedule_result['itinerary'],
'cost_breakdown': {
'机票总价': flight_total, '住宿总价': hotel_total,
'交通总价': vehicle_total, '门票总价': ticket_total
},
'accommodation_details': accommodation_solution,
'flight_details': flight_option # 将航班详情也存入方案
})
print(f"📊 评估完成,共找到 {len(all_valid_plans)} 个完全可行的方案。")
if not all_valid_plans:
return {"status": "Error", "message": f"在所有组合评估后,未能找到任何可行的行程方案。"}
# 从所有可行方案中,返回总成本最低的那个
best_plan = min(all_valid_plans, key=lambda x: x['total_cost'])
best_plan['status'] = 'Success'
return best_plan
# --- 演示代码 ---
if __name__ == "__main__":
print("="*60)
print(" 通用旅游行程优化引擎 V6.0 Final - 演示 ")
print("="*60)
EXCEL_FILE = "travel_data_v6.xlsx"
# 确保演示文件存在
try:
from create_excel_data import create_travel_data_excel_v6
create_travel_data_excel_v6(EXCEL_FILE)
except ImportError:
print("警告: 未找到 create_excel_data.py, 请确保 'travel_data_v6.xlsx' 文件已存在。")
try:
# 1. 从Excel文件一键初始化引擎
planner = TravelPlannerEngine.from_excel(EXCEL_FILE)
# 2. 生成最终行程
final_itinerary = planner.generate_itinerary()
# 3. 打印结果
print("\n" + "="*60)
if final_itinerary['status'] == 'Success':
print("🎉 【最优行程方案已生成】 🎉")
print("="*60)
plan = final_itinerary
print(f"📅 总天数: {plan['days']} 天")
print(f"💰 总成本: {plan['total_cost']:.2f} 元")
print(f"👤 人均成本: {plan['per_person_cost']:.2f} 元/人")
print("\n--- 📊 成本明细 ---")
for item, cost in plan['cost_breakdown'].items():
print(f" - {item:<10}: {cost:>10.2f} 元")
print("\n--- 🏨 住宿方案 ---")
accom = plan['accommodation_details']
print(f" - 选择地点: {accom['provider_name']} ({accom['type']})")
if accom['type'] == 'Hotel':
males_plan = accom['details']['males']
females_plan = accom['details']['females']
print(f" - 男性分组: {males_plan['cost']:.2f}元, 房间: {males_plan['combination']}")
print(f" - 女性分组: {females_plan['cost']:.2f}元, 房间: {females_plan['combination']}")
else:
print(f" - {accom['details']['info']}")
print("\n--- 🗺️ 详细路书 (Itinerary) ---")
for day, activities in sorted(plan['itinerary'].items()):
print(f" {day}:")
for activity in activities:
print(f" - {activity}")
else:
print(f"❌ 行程生成失败: {final_itinerary['message']}")
print("="*60)
except (FileNotFoundError, ValueError) as e:
print(f"\n❌ 程序运行出错: {e}")