-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwebhook.py
343 lines (300 loc) · 12.8 KB
/
webhook.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
# 标准库
import asyncio
import base64
import re
import time
import hmac
import hashlib
import logging
# 第三方库
from fastapi import APIRouter, Request, HTTPException, Header, status, BackgroundTasks
# 应用程序自定义模块
from app.config import settings, init_db_pool
from app.utils import git_api, gitee_tool
from app.utils.client import silicon_client
from app.utils import euler_maker_api as maker
# Router on which this module's webhook endpoint is registered.
router = APIRouter()
logger = logging.getLogger(__name__)
# Upper bound on automatic repair retries after a failed build. With the
# value 0, the retry branch in handle_build_retries (retry_count < MAX_RETRIES)
# is never taken for any non-negative retry_count.
MAX_RETRIES = 0
# Database connection pool, created once when this module is imported.
db_pool = init_db_pool()
def verify_signature(body: bytes, signature: str) -> bool:
    """Check a ``sha256=<hex>`` signature against the HMAC-SHA256 of *body*.

    The key is ``settings.webhook_secret``. The comparison is done with
    ``hmac.compare_digest`` so it runs in constant time.

    Args:
        body: Raw request payload that was signed.
        signature: Signature string supplied by the sender.

    Returns:
        True when the signature matches; False when it does not or when
        any error occurs while computing or comparing it (the error is
        logged).
    """
    try:
        key = settings.webhook_secret.encode()
        mac = hmac.new(key, body, hashlib.sha256)
        expected = "sha256=" + mac.hexdigest()
        matches = hmac.compare_digest(expected, signature)
    except Exception as e:
        logger.error(f"Signature verification failed: {e}")
        return False
    return matches
def extract_pr_data(data: dict) -> dict:
    """Validate a comment-event payload and pull out the PR fields we need.

    The payload is accepted only when all of the following hold:
    its ``noteable_type`` is ``"PullRequest"``, its ``note`` text
    (trimmed, case-insensitive) is one of ``settings.accept_cmds``, and
    the pull request carries a label named ``ci_failed``.

    Args:
        data: Decoded webhook payload.

    Returns:
        A dict with the keys ``repo_url``, ``source_url``, ``pr_number``,
        ``repo_name`` and ``pr_url``; missing values default to ``""``.

    Raises:
        ValueError: If any of the acceptance conditions above fails.
    """
    if data.get("noteable_type", "") != "PullRequest":
        raise ValueError("Not a pull request event")

    command = data.get("note", "").strip().lower()
    accepted_commands = {cmd.strip().lower() for cmd in settings.accept_cmds}
    if command not in accepted_commands:
        raise ValueError("Unsupported command")

    pull_request = data.get("pull_request", {})

    # Only PRs flagged with the ci_failed label are processed.
    labels = pull_request.get("labels", [])
    logger.info(f"labels is {labels}")
    for label in labels:
        if label.get("name") == "ci_failed":
            break
    else:
        raise ValueError("PR does not have ci_failed label")

    project = data.get("project", {})
    source_repo = pull_request.get("head", {}).get("repo", {})

    extracted = {}
    extracted["repo_url"] = project.get("url", "")
    extracted["source_url"] = source_repo.get("url", "")
    extracted["pr_number"] = pull_request.get("number", "")
    extracted["repo_name"] = project.get("name", "")
    extracted["pr_url"] = pull_request.get("html_url", "")
    return extracted
def compute_signature(timestamp: str):
    """Compute the signature expected for a Gitee webhook request.

    The signed message is ``"<timestamp>\\n<secret>"`` and the HMAC key is
    the same secret (``settings.webhook_secret``), using SHA-256. The
    timestamp is kept as a string because it comes straight from an HTTP
    header.

    Args:
        timestamp: Timestamp text taken from the request header.

    Returns:
        The Base64-encoded HMAC digest as a ``str``. No URL-escaping is
        applied to the result.
    """
    secret = settings.webhook_secret
    key = secret.encode('utf-8')
    message = f"{timestamp}\n{secret}".encode('utf-8')
    raw_digest = hmac.new(key, message, hashlib.sha256).digest()
    encoded = base64.b64encode(raw_digest)
    return encoded.decode('utf-8')
@router.post("/webhooks/spec", status_code=status.HTTP_202_ACCEPTED)
async def handle_webhook(
    request: Request,
    x_gitee_token: str = Header(None),  # signature sent by Gitee
    x_gitee_timestamp: str = Header(None),  # timestamp sent by Gitee
):
    """Authenticate a Gitee webhook call and queue the PR for repair.

    Steps:
      1. Require both signature headers and compare the supplied token
         with the one computed from the timestamp.
      2. Parse the JSON payload and extract the PR fields; payloads that
         ``extract_pr_data`` rejects with ``ValueError`` are ignored.
      3. Fetch the ``<repo_name>.spec`` content for the PR.
      4. Insert one row into ``pending_requests``.

    Returns:
        ``{"status": "处理已启动"}`` when the row was stored, or ``None``
        when the event was ignored.

    Raises:
        HTTPException: 400 when headers are missing or the payload/spec
            cannot be processed, 403 when the signature does not match,
            500 when the database write fails.
    """
    if not x_gitee_token or not x_gitee_timestamp:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Missing required headers"
        )

    server_signature = compute_signature(x_gitee_timestamp)
    # Constant-time comparison to avoid leaking information through timing.
    if not hmac.compare_digest(server_signature, x_gitee_token):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Invalid signature"
        )

    try:
        data = await request.json()
        comment = data.get("note", "").strip().lower()
        logger.info(f"Received webhook request, note: {comment}")
        pr_data = extract_pr_data(data)
    except ValueError as e:
        # Not an event we act on; acknowledge it without doing any work.
        logger.info(f"忽略不支持的事件: {e}")
        return
    except Exception as e:
        logger.error(f"Payload processing failed: {e}")
        raise HTTPException(status.HTTP_400_BAD_REQUEST, "Invalid payload")

    try:
        # Fetch spec file
        spec_content = await git_api.get_spec_content(
            pr_data["repo_url"],
            pr_data["pr_number"],
            f'{pr_data["repo_name"]}.spec'
        )
    except ValueError as e:
        logger.info(f"忽略不支持的事件: {e}")
        return
    except Exception as e:
        logger.error(f"数据解析失败: {e}")
        raise HTTPException(status.HTTP_400_BAD_REQUEST, "无效负载")

    # Pre-bind both handles so the cleanup paths below stay valid even when
    # acquiring the connection or the cursor is what failed.
    conn = None
    cursor = None
    try:
        logger.info("开始入库")
        conn = db_pool.get_connection()
        cursor = conn.cursor()
        cursor.execute(
            "INSERT INTO pending_requests (repo_url, source_url, pr_number, repo_name, pr_url, spec_content) "
            "VALUES (%s, %s, %s, %s, %s, %s)",
            (
                pr_data["repo_url"],
                pr_data["source_url"],
                pr_data["pr_number"],
                pr_data["repo_name"],
                pr_data["pr_url"],
                spec_content
            )
        )
        conn.commit()
        return {"status": "处理已启动"}
    except Exception as e:
        logger.error(f"数据库写入异常: {e}")
        if conn is not None:
            conn.rollback()
        raise HTTPException(status.HTTP_500_INTERNAL_SERVER_ERROR, "数据库写入异常") from e
    finally:
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()
async def wait_for_build_completion(build_id: str, interval: int = 30, timeout: int = 36000) -> bool:
    """Poll the build status until it is decided or *timeout* elapses.

    Args:
        build_id: Identifier passed to ``maker.get_build_status``.
        interval: Seconds to sleep between two polls.
        timeout: Maximum number of seconds to keep polling. With a value
            of zero or less no poll is made at all.

    Returns:
        True when the reported ``status`` is 201; False when it is 202 or
        when the timeout expires first.
        NOTE(review): 201/202 presumably mean build success/failure in
        the EulerMaker API - confirm against its documentation.
    """
    # A monotonic clock keeps the timeout correct even if the system
    # wall-clock is adjusted while we are waiting.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status_data = await maker.get_build_status(build_id)
        if status_data:
            build_status = status_data.get('status')
            if build_status == 201:
                return True
            if build_status == 202:
                return False
        # Empty response or any other status: wait and poll again.
        await asyncio.sleep(interval)
    return False
async def handle_build_retries(pr_data: dict, current_spec: str, srcDir: str, build_id: str, retry_count: int,
                               commit_url: str,
                               maker_url: str):
    """Wait for a repair build and react to its outcome.

    * Build succeeded: post the success comment on the PR.
    * Build failed and ``retry_count < MAX_RETRIES``: analyse the new build
      log, push a revised spec, start another build and recurse with
      ``retry_count + 1``.
    * Build failed and no retries are left: post the failure comment,
      extended with an issue link when one was created.

    Any exception raised while doing the above is logged and reported on
    the PR via ``settings.fix_error_comment``. The version lookup at the
    very top runs outside that handler, so its errors reach the caller.

    Args:
        pr_data: PR fields; reads ``repo_url``, ``pr_number``,
            ``repo_name`` and, when retrying, ``source_url``.
        current_spec: Spec text used for the build being awaited.
        srcDir: Source directory info forwarded to the log analysis.
        build_id: Identifier of the build to wait for.
        retry_count: Number of retries already performed.
        commit_url: URL of the commit that produced this build.
        maker_url: URL of the build record shown in PR comments.
    """
    repo_url = pr_data["repo_url"]
    pr_number = pr_data["pr_number"]
    package = pr_data["repo_name"]
    old_version, new_version = git_api.get_upgrade_versions(repo_url, pr_number, f"{package}.spec")

    try:
        succeeded = await wait_for_build_completion(build_id)
        logger.info(f"the build result is {succeeded}")

        if succeeded:
            # Build passed: tell the PR where the fix and the build live.
            success_note = settings.fix_success_comment.format(commit_url=commit_url, maker_url=maker_url)
            git_api.comment_on_pr(repo_url, pr_number, success_note)
            logger.info(f"PR #{pr_number} 构建成功,重试次数: {retry_count}")
            return

        if retry_count >= MAX_RETRIES:
            # Retry budget exhausted: report the failure on the PR.
            logger.info(f"old_version is {old_version}, new_version is {new_version}")
            failure_note = settings.fix_failure_comment.format(
                package=package,
                old_version=old_version,
                new_version=new_version,
                commit_url=commit_url,
                maker_url=maker_url,
            )
            issue_url = await analyze_error_and_create_issue(pr_data, old_version, new_version)
            if issue_url:
                failure_note += (f"\n升级后,发现缺少依赖包情况,openEuler-AutoRepair已经提出Issue,请留意处理进度.\n"
                                 f"Issue链接:{issue_url}。")
            git_api.comment_on_pr(repo_url, pr_number, failure_note)
            logger.error(f"PR #{pr_number} 构建失败,已达最大重试次数")
            return

        # Retry path: fetch the log of the failed build.
        failed_job_id = maker.get_job_id(settings.os_repair_project, package)
        result_root = maker.get_result_root(failed_job_id)
        log_content = maker.get_build_log(maker.get_log_url(result_root))

        # Ask the model for a revised spec based on the new log.
        chat = silicon_client.SiliconFlowChat(settings.silicon_token)
        new_spec, _ = chat.analyze_build_log(package, current_spec, log_content, srcDir)

        # Push the revised spec.
        fork_url, commit_sha, _ = git_api.check_and_push(pr_data["source_url"], new_spec, pr_number)

        # Start the next build and assemble the links for its comments.
        new_build_id = maker.start_build_single(settings.os_repair_project, package)
        repair_job_id = maker.get_job_id(settings.os_repair_project, package)
        next_commit_url = f"{fork_url}/commit/{commit_sha}"
        next_maker_url = (
            f"https://eulermaker.compass-ci.openeuler.openatom.cn/package/build-record?"
            f"osProject={settings.os_repair_project}&"
            f"packageName={package}&"
            f"jobId={repair_job_id}"
        )

        # Handle the outcome of the new build recursively.
        await handle_build_retries(
            pr_data, new_spec, srcDir, new_build_id, retry_count + 1, next_commit_url, next_maker_url
        )
    except Exception as e:
        logger.error(f"处理重试时发生异常: {e}")
        error_note = settings.fix_error_comment.format(error=str(e))
        git_api.comment_on_pr(repo_url, pr_number, error_note)
async def process_initial_repair(pr_data: dict, original_spec: str):
    """Run the first repair attempt for a PR whose build failed.

    Fetches the build log of the PR's project, asks the model for a fixed
    spec, posts the failure reason on the PR, pushes the fixed spec,
    registers the package and build target in the repair project, starts
    a build there and hands over to ``handle_build_retries`` with a retry
    count of 0.

    Any exception is logged and reported on the PR via
    ``settings.fix_error_comment``.

    Args:
        pr_data: PR fields; reads ``repo_name``, ``pr_number``, ``pr_url``,
            ``repo_url`` and ``source_url``.
        original_spec: Spec text that failed to build.
    """
    try:
        # Locate and download the log of the failed build.
        project_template = settings.os_project
        package = pr_data["repo_name"]
        pr_number = pr_data["pr_number"]
        os_project = project_template.format(repo=package, pr_number=pr_number)
        failed_job_id = maker.get_job_id(os_project, package)
        result_root = maker.get_result_root(failed_job_id)
        log_content = maker.get_build_log(maker.get_log_url(result_root))
        srcDir = gitee_tool.get_dir_json(pr_data["pr_url"], settings.gitee_token)

        # Let the model analyse the log and propose a fixed spec.
        chat = silicon_client.SiliconFlowChat(settings.silicon_token)
        fixed_spec, fail_reason = chat.analyze_build_log(package, original_spec, log_content, srcDir)

        # Report the failure reason on the PR.
        repo_url = pr_data["repo_url"]
        git_api.comment_on_pr(repo_url, pr_number, fail_reason)

        # Push the fixed spec to the fork.
        fork_url, commit_sha, branch = git_api.check_and_push(pr_data["source_url"], fixed_spec, pr_number)

        # Register the package in the repair project.
        logger.info("start euler maker build")
        maker.add_software_package(settings.os_repair_project, package, "", fork_url, branch)

        logger.info("add build target")
        maker.add_build_target(
            settings.os_repair_project,
            package,
            settings.os_variant,
            settings.os_arch,
            settings.ground_projects,
            settings.flag_build,
            settings.flag_publish,
        )

        logger.info("start build single")
        repair_build_id = maker.start_build_single(settings.os_repair_project, package)
        repair_job_id = maker.get_job_id(settings.os_repair_project, package)

        commit_url = f"{fork_url}/commit/{commit_sha}"
        maker_url = (
            f"https://eulermaker.compass-ci.openeuler.openatom.cn/package/build-record?"
            f"osProject={settings.os_repair_project}&"
            f"packageName={package}&"
            f"jobId={repair_job_id}"
        )
        await handle_build_retries(pr_data, fixed_spec, srcDir, repair_build_id, 0, commit_url, maker_url)
    except Exception as e:
        logger.error(f"初始修复流程失败: {e}")
        error_note = settings.fix_error_comment.format(error=str(e))
        git_api.comment_on_pr(pr_data["repo_url"], pr_data["pr_number"], error_note)
async def analyze_error_and_create_issue(pr_data: dict, old_version, new_version):
    """Scan the repair build log for warnings and open an issue if needed.

    The log of the package's job in ``settings.os_repair_project`` is
    searched for warning-like lines; the matches are passed to the model,
    and when it returns both a title and a body an issue is created in
    the PR's repository.

    Args:
        pr_data: PR fields; reads ``repo_name`` and ``repo_url``.
        old_version: Version before the upgrade, used in the issue text.
        new_version: Version after the upgrade, used in the issue text.

    Returns:
        The value returned by ``git_api.create_issue`` when an issue was
        created; ``""`` when the model produced no title/body or when any
        step raised (the error is logged).
    """
    try:
        # Get build log
        job_id = maker.get_job_id(settings.os_repair_project, pr_data["repo_name"])
        log_url = maker.get_log_url(maker.get_result_root(job_id))
        log_content = maker.get_build_log(log_url)

        # One pattern per element. Every element needs its own trailing
        # comma: adjacent string literals are silently concatenated into a
        # single pattern otherwise.
        warning_patterns = [
            r"Warning:.*",
            r"skipped:.*",
            r"warning:.*",
            r"WARNING:.*",
            r"No matching package to install:.*",
            r".*is not installed.*",
        ]
        warnings = []
        for pattern in warning_patterns:
            warnings.extend(re.findall(pattern, log_content))
        logger.info(f"the build warning info : {warnings}")

        chat = silicon_client.SiliconFlowChat(settings.silicon_token)
        title, content = chat.analyze_missing_package(warnings)
        if title and content:
            issue_body = settings.missing_package_comment.format(
                old_version=old_version,
                new_version=new_version,
                missing_packages=content,
            )
            return git_api.create_issue(pr_data["repo_url"], title, issue_body)
        return ""
    except Exception as e:
        logger.error(f"获取构建日志失败: {e}")
        return ""