-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfind_parallel_workflows.py
More file actions
244 lines (202 loc) · 9.18 KB
/
find_parallel_workflows.py
File metadata and controls
244 lines (202 loc) · 9.18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
#!/usr/bin/env python3
"""
Find highly parallel workflows to evaluate the effect of deep reinforcement
learning based scheduling.

Loads recent successful workflow instances from a DolphinScheduler MySQL
database, measures each workflow's DAG parallelism, and compares simple
scheduling heuristics on the most parallel ones.
"""
import pandas as pd
from sqlalchemy import create_engine
import networkx as nx
import numpy as np
# NOTE(review): credentials are hard-coded (root, empty password) for a local
# analysis database — do not reuse this connection string outside a sandbox.
engine = create_engine('mysql+pymysql://root:@localhost:3306/whalesb')
def analyze_workflow(process_id):
    """Analyze one workflow instance: build its task DAG and measure parallelism.

    Reads the DolphinScheduler tables (process instance, task instances,
    task relations, task definitions) for the given process instance, builds
    a DiGraph over the successfully finished tasks (state = 7), and computes
    the critical path, total work, average parallelism, and the observed
    makespan of the real run.

    Args:
        process_id: id of a row in t_ds_process_instance (coerced to int).

    Returns:
        dict of metrics plus the graph itself ('G') and the instance row
        ('process'), or None when the instance is missing, has fewer than
        5 finished tasks, the relation table yields a cycle, or any DB /
        parsing error occurs (best-effort scan over many workflows).
    """
    try:
        # Defensive cast so the interpolated SQL below cannot be injected
        # with a non-integer value.
        process_id = int(process_id)
        process = pd.read_sql(f'SELECT * FROM t_ds_process_instance WHERE id = {process_id}', engine)
        if len(process) == 0:
            return None
        pdc = int(process.iloc[0]['process_definition_code'])
        # state = 7 is "success" in DolphinScheduler.
        tasks = pd.read_sql(f'SELECT * FROM t_ds_task_instance WHERE process_instance_id = {process_id} AND state = 7', engine)
        if len(tasks) < 5:  # require at least 5 tasks to be interesting
            return None
        deps = pd.read_sql(f'SELECT * FROM t_ds_process_task_relation WHERE process_definition_code = {pdc}', engine)
        task_defs = pd.read_sql(f'''
            SELECT code, name, task_type FROM t_ds_task_definition
            WHERE code IN (SELECT pre_task_code FROM t_ds_process_task_relation WHERE process_definition_code = {pdc}
            UNION SELECT post_task_code FROM t_ds_process_task_relation WHERE process_definition_code = {pdc})
        ''', engine)
        def_code_to_name = dict(zip(task_defs['code'], task_defs['name']))
        # Build the DAG: nodes are task names, edges come from the relation table.
        G = nx.DiGraph()
        for _, t in tasks.iterrows():
            G.add_node(t['name'], task_data=t.to_dict())
        for _, d in deps.iterrows():
            if d['pre_task_code'] != 0:  # 0 marks a root task (no predecessor)
                pre = def_code_to_name.get(d['pre_task_code'])
                post = def_code_to_name.get(d['post_task_code'])
                if pre in G.nodes and post in G.nodes:
                    G.add_edge(pre, post)

        def get_duration(name):
            """Observed runtime of a task in seconds (min 1s; 1 if unknown)."""
            if name in G.nodes and 'task_data' in G.nodes[name]:
                t = G.nodes[name]['task_data']
                start = pd.to_datetime(t['start_time'])
                end = pd.to_datetime(t['end_time'])
                return max(1, (end - start).total_seconds())
            return 1

        # Critical path = longest weighted path, computed in topological order.
        critical_path_length = {}
        try:
            for node in nx.topological_sort(G):
                preds = list(G.predecessors(node))
                if not preds:
                    critical_path_length[node] = get_duration(node)
                else:
                    critical_path_length[node] = max(critical_path_length[p] for p in preds) + get_duration(node)
        except nx.NetworkXUnfeasible:
            # The relation table produced a cycle; not a valid DAG.
            return None
        cp_length = max(critical_path_length.values()) if critical_path_length else 0
        total_work = sum(get_duration(n) for n in G.nodes)
        # Average parallelism = total work / critical path (>= 1 for a DAG).
        parallelism = total_work / cp_length if cp_length > 0 else 1
        # Observed end-to-end makespan of the real execution.
        original_start = pd.to_datetime(process.iloc[0]['start_time'])
        original_end = pd.to_datetime(process.iloc[0]['end_time'])
        original_makespan = (original_end - original_start).total_seconds()
        return {
            'process_id': process_id,
            'name': process.iloc[0]['name'][:60],
            'tasks': len(G.nodes),
            'edges': len(G.edges),
            'critical_path': cp_length,
            'total_work': total_work,
            'parallelism': parallelism,
            'original_makespan': original_makespan,
            'theoretical_improvement': (original_makespan - cp_length) / original_makespan * 100 if original_makespan > 0 else 0,
            'G': G,
            'process': process
        }
    except Exception:
        # Best-effort: any malformed or unreadable instance is skipped.
        return None
print("=" * 100)
print("搜索高并行度工作流")
print("=" * 100)
# Load the most recent successful workflow instances (state = 7).
print("\n正在从数据库加载工作流...")
workflows = pd.read_sql("""
SELECT id, name FROM t_ds_process_instance
WHERE state = 7
ORDER BY id DESC
LIMIT 200
""", engine)
print(f"加载了 {len(workflows)} 个工作流,正在分析...")
results = []
# Analyze each instance; keep only non-trivial, sufficiently parallel DAGs.
for i, (_, wf) in enumerate(workflows.iterrows()):
    if i % 20 == 0:
        # Progress indicator every 20 workflows.
        print(f" 已分析 {i}/{len(workflows)}...")
    r = analyze_workflow(wf['id'])
    if r and r['parallelism'] >= 2.0 and r['tasks'] >= 10:
        results.append(r)
print(f"\n找到 {len(results)} 个高并行度工作流 (并行度 >= 2.0, 任务数 >= 10)")
# Sort by parallelism, highest first.
results.sort(key=lambda x: -x['parallelism'])
print("\n" + "=" * 100)
print("Top 10 高并行度工作流:")
print("=" * 100)
print(f"{'ID':>8} | {'任务数':>6} | {'边数':>5} | {'并行度':>6} | {'理论改进':>8} | 名称")
print("-" * 100)
for r in results[:10]:
    print(f"{r['process_id']:>8} | {r['tasks']:>6} | {r['edges']:>5} | {r['parallelism']:>6.1f} | {r['theoretical_improvement']:>7.1f}% | {r['name'][:50]}")
# Next: detailed scheduling comparison for the top high-parallelism workflows.
print("\n" + "=" * 100)
print("对高并行度工作流进行调度算法比较")
print("=" * 100)
def schedule_workflow(G, num_resources, strategy='eft'):
    """Simulate list scheduling of DAG G onto `num_resources` identical resources.

    Tasks are visited in topological order; each task starts no earlier than
    all of its predecessors have finished and no earlier than its chosen
    resource becomes free.

    Args:
        G: networkx.DiGraph whose nodes may carry a 'task_data' dict with
           'start_time'/'end_time' (used only to derive task durations).
        num_resources: number of identical resources to schedule onto.
        strategy: resource-selection policy —
            'fifo'        always resource 0 (fully serializes execution);
            'round_robin' cycle through resources by task index;
            'sjf'         least-loaded resource (NOTE: despite the name this
                          is min-load selection, not shortest-job-first);
            'eft'         resource giving the earliest finish time;
            'fe_iddqn'    EFT plus a small load-balance penalty, used as a
                          stand-in for a learned scheduling policy.
            Any other value falls back to resource 0.

    Returns:
        The makespan (seconds) of the simulated schedule, 0 for an empty
        graph, or float('inf') when G is not a DAG.
    """
    def get_duration(name):
        """Observed runtime of a task in seconds (min 1s; 1 if unknown)."""
        if name in G.nodes and 'task_data' in G.nodes[name]:
            t = G.nodes[name]['task_data']
            start = pd.to_datetime(t['start_time'])
            end = pd.to_datetime(t['end_time'])
            return max(1, (end - start).total_seconds())
        return 1

    try:
        sorted_tasks = list(nx.topological_sort(G))
    except nx.NetworkXUnfeasible:
        # Cyclic graph: no valid schedule exists.
        return float('inf')
    resource_avail = {i: 0 for i in range(num_resources)}  # resource -> time it becomes free
    task_finish = {}  # task name -> finish time
    for task_name in sorted_tasks:
        duration = get_duration(task_name)
        # Earliest possible start: all predecessors must have finished.
        earliest = 0
        for pred in G.predecessors(task_name):
            if pred in task_finish:
                earliest = max(earliest, task_finish[pred])
        # Pick a resource according to the strategy.
        if strategy == 'fifo':
            selected = 0
        elif strategy == 'round_robin':
            selected = len(task_finish) % num_resources
        elif strategy == 'sjf':
            # Least-loaded resource (earliest available time).
            selected = min(resource_avail.items(), key=lambda x: x[1])[0]
        elif strategy == 'eft':
            # Resource on which this task would finish earliest.
            best = 0
            best_finish = float('inf')
            for r, avail in resource_avail.items():
                finish = max(avail, earliest) + duration
                if finish < best_finish:
                    best_finish = finish
                    best = r
            selected = best
        elif strategy == 'fe_iddqn':
            # Proxy for an intelligent scheduler: earliest finish time plus a
            # small penalty for deviating from the average resource load.
            # avg_load is invariant across candidate resources — hoisted out
            # of the scoring loop.
            avg_load = sum(resource_avail.values()) / num_resources
            scores = {}
            for r, avail in resource_avail.items():
                start_time = max(avail, earliest)
                finish_time = start_time + duration
                balance_penalty = abs(avail - avg_load)
                scores[r] = finish_time + 0.1 * balance_penalty
            selected = min(scores.items(), key=lambda x: x[1])[0]
        else:
            # Unknown strategy: fall back to resource 0.
            selected = 0
        start = max(resource_avail[selected], earliest)
        finish = start + duration
        resource_avail[selected] = finish
        task_finish[task_name] = finish
    return max(task_finish.values()) if task_finish else 0
# Evaluate each strategy on the top-5 workflows across several resource counts.
for r in results[:5]:
    print(f"\n工作流 {r['process_id']}: {r['name'][:50]}")
    print(f" 任务数: {r['tasks']}, 边数: {r['edges']}, 并行度: {r['parallelism']:.1f}")
    print(f" 关键路径: {r['critical_path']:.0f}s, 原始执行: {r['original_makespan']:.0f}s")
    G = r['G']
    print(f"\n {'资源数':>6} | {'FIFO':>10} | {'RR':>10} | {'SJF':>10} | {'EFT':>10} | {'FE-IDDQN':>10} | {'理论最优':>10}")
    print(f" {'-'*80}")
    # Theoretical optimum = critical path; independent of the resource count,
    # so it is hoisted out of the inner loop.
    theoretical = r['critical_path']
    for num_res in [1, 2, 3, 5, 10]:
        fifo = schedule_workflow(G, num_res, 'fifo')
        rr = schedule_workflow(G, num_res, 'round_robin')
        sjf = schedule_workflow(G, num_res, 'sjf')
        eft = schedule_workflow(G, num_res, 'eft')
        fe_iddqn = schedule_workflow(G, num_res, 'fe_iddqn')
        # Best makespan among the strategies at this resource count.
        results_dict = {'FIFO': fifo, 'RR': rr, 'SJF': sjf, 'EFT': eft, 'FE-IDDQN': fe_iddqn}
        best = min(results_dict.values())

        def format_result(val, is_best):
            """Right-align a makespan; star it when it ties the best."""
            # Fix: the is_best parameter was previously ignored in favor of the
            # closed-over `best` (same output, but the parameter was dead).
            mark = '*' if is_best else ' '
            return f"{val:>9.0f}{mark}"

        print(f" {num_res:>6} | {format_result(fifo, fifo==best)} | {format_result(rr, rr==best)} | {format_result(sjf, sjf==best)} | {format_result(eft, eft==best)} | {format_result(fe_iddqn, fe_iddqn==best)} | {theoretical:>10.0f}")
print("\n" + "=" * 100)
print("结论")
print("=" * 100)
print("""
* 标记表示该资源数下的最优算法
观察:
1. 当资源数 = 1 时,所有算法结果相同(只能串行执行)
2. 当资源数增加时,差异开始显现
3. EFT (Earliest Finish Time) 通常表现最好
4. 深度强化学习的优势在于:
- 可以学习更复杂的调度策略
- 适应动态变化的任务执行时间
- 考虑更多因素(如资源异构性、通信成本等)
""")