-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
415 lines (335 loc) · 16.7 KB
/
main.py
File metadata and controls
415 lines (335 loc) · 16.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
"""
CTF Web漏洞扫描器主程序
"""
import sys
import os
import time
import json
from datetime import datetime
from typing import List, Dict, Optional
import argparse
from colorama import init, Fore, Back, Style
from tabulate import tabulate
from tqdm import tqdm
# 添加scanner模块到路径
sys.path.append(os.path.join(os.path.dirname(__file__), 'scanner'))
from scanner.web_crawler import WebCrawler
from scanner.sql_injection import SQLInjectionDetector
from scanner.xss_detector import XSSDetector
from scanner.command_injection import CommandInjectionDetector
from scanner.fuzzer import IntelligentFuzzer
# 初始化colorama
init(autoreset=True)
class VulnerabilityScanner:
    """Orchestrate one scan: crawl the target, run detectors, report findings.

    Findings returned by the detectors are accumulated in
    ``self.results['vulnerabilities']``. They are only ever inspected through
    ``getattr`` with defaults, so any object exposing some of ``url``,
    ``parameter``, ``payload``, ``confidence``, ``evidence``,
    ``response_time`` and one of the type attributes can be reported.
    """

    # Confidence cut-offs shared by the statistics and the console report.
    HIGH_CONFIDENCE = 0.8
    MEDIUM_CONFIDENCE = 0.6

    # Scan modes offered by the menu, keyed by the value the user types.
    SCAN_MODES = {
        '1': '快速扫描',
        '2': '标准扫描',
        '3': '深度扫描',
    }
    # Used when the interactive answer is not a known mode.
    DEFAULT_SCAN_MODE = '2'

    # Attributes that may carry a finding's type, in lookup priority order.
    _TYPE_ATTRIBUTES = ('injection_type', 'xss_type', 'vulnerability_type')

    def __init__(self):
        """Create the crawler, the detectors and an empty result store."""
        self.crawler = WebCrawler()
        self.sql_detector = SQLInjectionDetector()
        self.xss_detector = XSSDetector()
        self.command_detector = CommandInjectionDetector()
        self.fuzzer = IntelligentFuzzer()
        self.results = {
            'scan_info': {},
            'vulnerabilities': [],
            'statistics': {},
        }

    def print_banner(self):
        """Print the program banner, feature list and legal warning."""
        # The literal below is kept flush-left on purpose: its content is
        # emitted verbatim, so indenting it would change the printed banner.
        banner = f"""
{Fore.CYAN}╔══════════════════════════════════════════════════════════════╗
║ CTF Web漏洞扫描器 v1.0.0 ║
║ 专为CTF学习设计的安全工具 ║
╚══════════════════════════════════════════════════════════════╝{Style.RESET_ALL}
{Fore.YELLOW}支持的漏洞类型:{Style.RESET_ALL}
• SQL注入 (SQLi) • 跨站脚本 (XSS) • 命令注入
• 文件包含 • 路径遍历 • LDAP注入
• XML注入 • NoSQL注入 • 模糊测试
{Fore.RED}⚠️ 警告: 仅用于学习和授权的安全测试,请勿用于非法用途!{Style.RESET_ALL}
"""
        print(banner)

    def get_user_input(self) -> Optional[Dict]:
        """Interactively collect the scan configuration.

        Returns:
            A dict with the keys ``url``, ``scan_mode`` and ``depth``, or
            ``None`` when the user entered an empty URL.
        """
        print(f"{Fore.GREEN}请输入扫描配置:{Style.RESET_ALL}")
        url = input(f"{Fore.CYAN}目标URL: {Style.RESET_ALL}").strip()
        if not url:
            print(f"{Fore.RED}错误: URL不能为空{Style.RESET_ALL}")
            return None

        # Default to plain HTTP when the user omitted the scheme.
        if not url.startswith(('http://', 'https://')):
            url = 'http://' + url

        print(f"\n{Fore.CYAN}扫描选项:{Style.RESET_ALL}")
        print("1. 快速扫描 (仅检测常见漏洞)")
        print("2. 标准扫描 (检测所有漏洞类型)")
        print("3. 深度扫描 (包含模糊测试和机器学习)")
        scan_mode = input(f"{Fore.CYAN}请选择扫描模式 (1-3): {Style.RESET_ALL}").strip()
        if scan_mode not in self.SCAN_MODES:
            # An unknown mode would otherwise run no detector at all and
            # produce a misleading "no vulnerabilities" report.
            scan_mode = self.DEFAULT_SCAN_MODE
            print(f"{Fore.YELLOW}无效的扫描模式, 已使用默认的{self.SCAN_MODES[scan_mode]}{Style.RESET_ALL}")

        # The crawl depth is only configurable for the deep scan.
        depth = 1
        if scan_mode == '3':
            depth_input = input(f"{Fore.CYAN}爬取深度 (1-3, 默认1): {Style.RESET_ALL}").strip()
            # isdecimal() only accepts characters that int() can parse
            # (isdigit() also accepts e.g. superscript digits).
            if depth_input.isdecimal():
                # Clamp to the advertised 1-3 range on both ends.
                depth = max(1, min(int(depth_input), 3))

        return {
            'url': url,
            'scan_mode': scan_mode,
            'depth': depth,
        }

    def scan_website(self, config: Dict) -> Dict:
        """Run a complete scan described by ``config``.

        Args:
            config: Dict with the keys ``url``, ``scan_mode`` and ``depth``.

        Returns:
            ``self.results``. Statistics, end time and duration are recorded
            even when the scan stops early or an error is reported, so the
            report never shows stale or missing timing data.
        """
        url = config['url']
        scan_mode = config['scan_mode']
        depth = config['depth']
        mode_name = self._get_scan_mode_name(scan_mode)

        print(f"\n{Fore.GREEN}开始扫描: {url}{Style.RESET_ALL}")
        print(f"{Fore.CYAN}扫描模式: {mode_name}{Style.RESET_ALL}")
        print(f"{Fore.CYAN}爬取深度: {depth}{Style.RESET_ALL}")

        self.results['scan_info'] = {
            'url': url,
            'scan_mode': scan_mode,
            'depth': depth,
            'start_time': datetime.now().isoformat(),
            'scan_type': mode_name,
        }
        # Start from a clean slate so findings of an earlier scan run on the
        # same instance are not attributed to this target.
        self.results['vulnerabilities'] = []
        self.results['statistics'] = {}

        try:
            try:
                completed = self._run_scan_steps(url, depth, scan_mode)
            finally:
                # Runs on success, early stop and error alike.
                self._finalize_scan()
            if completed:
                print(f"{Fore.GREEN}✓ 扫描完成{Style.RESET_ALL}")
        except Exception as e:
            # Boundary handler: report the failure and still hand back
            # whatever was collected so far.
            print(f"{Fore.RED}扫描过程中发生错误: {e}{Style.RESET_ALL}")
        return self.results

    def _run_scan_steps(self, url: str, depth: int, scan_mode: str) -> bool:
        """Crawl, select endpoints and test them; return False on early stop."""
        # 1. Crawl the site.
        print(f"\n{Fore.YELLOW}步骤 1/4: 爬取网站信息...{Style.RESET_ALL}")
        pages = self.crawler.crawl_site(url, max_depth=depth)
        if not pages:
            print(f"{Fore.RED}错误: 无法访问目标网站{Style.RESET_ALL}")
            return False
        print(f"{Fore.GREEN}✓ 成功爬取 {len(pages)} 个页面{Style.RESET_ALL}")

        # 2. Pick the endpoints worth testing.
        print(f"\n{Fore.YELLOW}步骤 2/4: 分析可疑端点...{Style.RESET_ALL}")
        vulnerable_endpoints = self.crawler.get_vulnerable_endpoints(pages)
        if not vulnerable_endpoints:
            print(f"{Fore.YELLOW}⚠ 未发现可疑端点{Style.RESET_ALL}")
            return False
        print(f"{Fore.GREEN}✓ 发现 {len(vulnerable_endpoints)} 个可疑端点{Style.RESET_ALL}")

        # 3. Run the detectors.
        print(f"\n{Fore.YELLOW}步骤 3/4: 执行漏洞检测...{Style.RESET_ALL}")
        self._perform_vulnerability_tests(vulnerable_endpoints, scan_mode)

        # 4. The report data itself is produced by _finalize_scan().
        print(f"\n{Fore.YELLOW}步骤 4/4: 生成扫描报告...{Style.RESET_ALL}")
        return True

    def _finalize_scan(self):
        """Record statistics, end time and duration of the current scan."""
        self._generate_statistics()
        self.results['scan_info']['end_time'] = datetime.now().isoformat()
        self.results['scan_info']['duration'] = self._calculate_duration()

    def _get_scan_mode_name(self, mode: str) -> str:
        """Return the display name of ``mode`` ('未知模式' if unknown)."""
        return self.SCAN_MODES.get(mode, '未知模式')

    def _perform_vulnerability_tests(self, endpoints: List[Dict], scan_mode: str):
        """Test every endpoint, showing a progress bar.

        A failure on one endpoint is reported and does not stop the
        remaining endpoints from being tested. Endpoints whose ``type`` is
        neither ``url_parameter`` nor ``form_field`` are skipped.
        """
        handlers = {
            'url_parameter': self._test_url_parameter,
            'form_field': self._test_form_field,
        }
        with tqdm(total=len(endpoints), desc="漏洞检测", unit="端点") as pbar:
            for endpoint in endpoints:
                try:
                    handler = handlers.get(endpoint['type'])
                    if handler is not None:
                        handler(endpoint, scan_mode)
                except Exception as e:
                    # tqdm.write keeps the message from garbling the bar.
                    tqdm.write(f"{Fore.RED}测试端点失败: {e}{Style.RESET_ALL}")
                finally:
                    pbar.update(1)
                    # Throttle after failures too: a failed test has usually
                    # already sent requests to the target.
                    time.sleep(0.1)

    def _run_detectors(self, scan_mode: str, url: str, parameter: str, *extra_args):
        """Run the detectors enabled for ``scan_mode`` and store their findings.

        Mode 1 (quick) runs the SQL injection and XSS checks, mode 2 adds
        command injection and mode 3 additionally fuzzes the parameter.
        ``extra_args`` (the HTTP method for form fields) is forwarded to
        every detector unchanged.
        """
        checks = []
        if scan_mode in self.SCAN_MODES:
            checks.append(self.sql_detector.detect_sql_injection)
            checks.append(self.xss_detector.detect_xss)
        if scan_mode in ('2', '3'):
            checks.append(self.command_detector.detect_command_injection)
        if scan_mode == '3':
            checks.append(self.fuzzer.fuzz_parameter)

        for check in checks:
            self.results['vulnerabilities'].extend(check(url, parameter, *extra_args))

    def _test_url_parameter(self, endpoint: Dict, scan_mode: str):
        """Test a query-string parameter described by ``endpoint``."""
        self._run_detectors(scan_mode, endpoint['url'], endpoint['parameter'])

    def _test_form_field(self, endpoint: Dict, scan_mode: str):
        """Test a form field; the submit method defaults to GET."""
        self._run_detectors(
            scan_mode,
            endpoint['url'],
            endpoint['parameter'],
            endpoint.get('method', 'GET'),
        )

    @classmethod
    def _vuln_type(cls, vuln) -> str:
        """Return the finding's type from the first type attribute it has."""
        for attribute in cls._TYPE_ATTRIBUTES:
            if hasattr(vuln, attribute):
                return getattr(vuln, attribute)
        return 'unknown'

    @classmethod
    def _risk_level(cls, confidence) -> str:
        """Map a confidence score to 'high', 'medium' or 'low'."""
        if confidence >= cls.HIGH_CONFIDENCE:
            return 'high'
        if confidence >= cls.MEDIUM_CONFIDENCE:
            return 'medium'
        return 'low'

    def _risk_color(self, level: str) -> str:
        """Return the colorama colour used to print a risk level."""
        if level == 'high':
            return Fore.RED
        if level == 'medium':
            return Fore.YELLOW
        return Fore.GREEN

    def _generate_statistics(self):
        """Summarise the findings by type and by risk level."""
        vulnerabilities = self.results['vulnerabilities']
        vuln_types = {}
        risk_levels = {'high': 0, 'medium': 0, 'low': 0}
        for vuln in vulnerabilities:
            vuln_type = self._vuln_type(vuln)
            vuln_types[vuln_type] = vuln_types.get(vuln_type, 0) + 1
            # Findings without a confidence attribute count as low risk.
            risk_levels[self._risk_level(getattr(vuln, 'confidence', 0))] += 1

        self.results['statistics'] = {
            'total_vulnerabilities': len(vulnerabilities),
            'vulnerability_types': vuln_types,
            'risk_levels': risk_levels,
        }

    def _calculate_duration(self) -> str:
        """Format the time between start and end of the scan for display.

        Reads the ISO timestamps ``start_time`` and ``end_time`` from
        ``self.results['scan_info']``; leading zero units are omitted.
        """
        scan_info = self.results['scan_info']
        elapsed = (
            datetime.fromisoformat(scan_info['end_time'])
            - datetime.fromisoformat(scan_info['start_time'])
        )
        minutes, seconds = divmod(int(elapsed.total_seconds()), 60)
        hours, minutes = divmod(minutes, 60)
        if hours > 0:
            return f"{hours}小时{minutes}分钟{seconds}秒"
        if minutes > 0:
            return f"{minutes}分钟{seconds}秒"
        return f"{seconds}秒"

    def display_results(self, max_details: int = 10):
        """Print the scan report and save it to a JSON file.

        Args:
            max_details: Number of findings (highest confidence first) that
                are printed in full.
        """
        separator = '=' * 60
        print(f"\n{Fore.CYAN}{separator}{Style.RESET_ALL}")
        print(f"{Fore.CYAN}扫描结果报告{Style.RESET_ALL}")
        print(f"{Fore.CYAN}{separator}{Style.RESET_ALL}")

        # Scan metadata.
        scan_info = self.results['scan_info']
        print(f"\n{Fore.YELLOW}扫描信息:{Style.RESET_ALL}")
        print(f" 目标URL: {scan_info.get('url', 'N/A')}")
        print(f" 扫描模式: {scan_info.get('scan_type', 'N/A')}")
        print(f" 扫描时间: {scan_info.get('start_time', 'N/A')}")
        print(f" 持续时间: {scan_info.get('duration', 'N/A')}")

        # Totals.
        stats = self.results['statistics']
        print(f"\n{Fore.YELLOW}统计信息:{Style.RESET_ALL}")
        print(f" 总漏洞数: {stats.get('total_vulnerabilities', 0)}")

        # Breakdown by vulnerability type.
        vuln_types = stats.get('vulnerability_types', {})
        if vuln_types:
            print(f"\n{Fore.YELLOW}漏洞类型分布:{Style.RESET_ALL}")
            for vuln_type, count in vuln_types.items():
                print(f" {vuln_type}: {count}")

        # Breakdown by risk level.
        risk_levels = stats.get('risk_levels', {})
        if risk_levels:
            print(f"\n{Fore.YELLOW}风险等级分布:{Style.RESET_ALL}")
            for level, count in risk_levels.items():
                color = self._risk_color(level)
                print(f" {color}{level.upper()}: {count}{Style.RESET_ALL}")

        # Individual findings.
        vulnerabilities = self.results['vulnerabilities']
        if vulnerabilities:
            print(f"\n{Fore.YELLOW}详细漏洞信息:{Style.RESET_ALL}")
            # Sorted in place on purpose: the saved JSON report lists the
            # findings in the same highest-confidence-first order.
            vulnerabilities.sort(key=lambda v: getattr(v, 'confidence', 0), reverse=True)
            for number, vuln in enumerate(vulnerabilities[:max_details], start=1):
                confidence = getattr(vuln, 'confidence', 0)
                color = self._risk_color(self._risk_level(confidence))
                print(f"\n{color}漏洞 #{number}{Style.RESET_ALL}")
                print(f" 类型: {self._vuln_type(vuln)}")
                print(f" 置信度: {confidence:.2f}")
                print(f" 载荷: {getattr(vuln, 'payload', 'N/A')}")
                print(f" 证据: {getattr(vuln, 'evidence', 'N/A')}")

        self._save_results()

    def _save_results(self):
        """Write the results to ``scan_results_<timestamp>.json``.

        Saving is best effort: a failure is reported on the console and
        does not raise.
        """
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"scan_results_{timestamp}.json"
        try:
            # Findings are arbitrary objects; flatten them to plain dicts.
            serializable_results = {
                'scan_info': self.results['scan_info'],
                'statistics': self.results['statistics'],
                'vulnerabilities': [
                    {
                        'url': getattr(vuln, 'url', ''),
                        'parameter': getattr(vuln, 'parameter', ''),
                        'type': self._vuln_type(vuln),
                        'payload': getattr(vuln, 'payload', ''),
                        'confidence': getattr(vuln, 'confidence', 0),
                        'evidence': getattr(vuln, 'evidence', ''),
                        'response_time': getattr(vuln, 'response_time', 0),
                    }
                    for vuln in self.results['vulnerabilities']
                ],
            }
            # Serialise before opening the file so a non-serialisable value
            # cannot leave a truncated report behind.
            report = json.dumps(serializable_results, indent=2, ensure_ascii=False)
            with open(filename, 'w', encoding='utf-8') as f:
                f.write(report)
        except (OSError, TypeError, ValueError) as e:
            print(f"{Fore.RED}保存结果失败: {e}{Style.RESET_ALL}")
        else:
            print(f"\n{Fore.GREEN}扫描结果已保存到: {filename}{Style.RESET_ALL}")
def main():
    """Parse the command line, run one scan and display its report.

    With ``-u/--url`` the scan is configured from the command-line options;
    without it the configuration is requested interactively. ``--headless``
    suppresses the banner.
    """
    parser = argparse.ArgumentParser(description='CTF Web漏洞扫描器')
    parser.add_argument('-u', '--url', help='目标URL')
    parser.add_argument('-m', '--mode', choices=['1', '2', '3'], default='2', help='扫描模式')
    parser.add_argument('-d', '--depth', type=int, default=1, help='爬取深度')
    parser.add_argument('--headless', action='store_true', help='无头模式')
    args = parser.parse_args()

    scanner = VulnerabilityScanner()
    if not args.headless:
        scanner.print_banner()

    cli_url = (args.url or '').strip()
    if cli_url:
        # Command-line mode. Apply the same scheme default as the
        # interactive prompt so that "-u example.com" works as well.
        if not cli_url.startswith(('http://', 'https://')):
            cli_url = 'http://' + cli_url
        config = {
            'url': cli_url,
            'scan_mode': args.mode,
            'depth': args.depth,
        }
    else:
        # Interactive mode; a falsy result means the input was rejected.
        config = scanner.get_user_input()
        if not config:
            return

    # The scanner keeps the results itself; display_results() reads them.
    scanner.scan_website(config)
    scanner.display_results()
if __name__ == "__main__":
    # Top-level boundary: turn an interrupt or an unexpected error into a
    # readable message and a non-zero exit status, so that shells and
    # scripts calling this program can tell the run did not succeed.
    try:
        main()
    except KeyboardInterrupt:
        print(f"\n{Fore.YELLOW}扫描被用户中断{Style.RESET_ALL}")
        sys.exit(130)  # conventional status for termination by Ctrl+C
    except Exception as e:
        print(f"\n{Fore.RED}程序发生错误: {e}{Style.RESET_ALL}")
        sys.exit(1)