-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapp.py
More file actions
158 lines (130 loc) · 4.7 KB
/
app.py
File metadata and controls
158 lines (130 loc) · 4.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
from flask import Flask, request, jsonify, render_template
import logging
from urllib.parse import urlparse
from url_analyzer import URLAnalyzer
import sqlite3
from typing import Dict, Optional
import os
# Configure logging: set the root logger to INFO and create the module-level
# logger that the functions below use for error reporting.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Flask application object; the @app.route decorators below register on it.
app = Flask(__name__)
# One URLAnalyzer instance, created at import time and reused by every request
# (check_url calls url_analyzer.analyze_url).
url_analyzer = URLAnalyzer()
def get_db():
    """Open the whitelist database and return the connection.

    The connection's row factory is set to ``sqlite3.Row`` so that fetched
    rows can be indexed by column name. The caller is responsible for
    closing the returned connection.
    """
    connection = sqlite3.connect('data/whitelist.db')
    connection.row_factory = sqlite3.Row
    return connection
def check_whitelist(domain: str) -> Dict:
    """Check if a domain is in the whitelist.

    Looks the domain up (exact match) in the ``umbrella`` table of
    ``data/whitelist.db`` using a connection opened just for this call.

    Args:
        domain: Domain to check

    Returns:
        Dict with whitelist status and details:

        * ``{"status": "whitelisted", "rank": ..., "last_updated": ...}``
          when a matching row exists,
        * ``{"status": "not_whitelisted"}`` when no row matches,
        * ``{"status": "error", "error": <message>}`` when the lookup fails.

        The function does not raise; failures are logged and reported
        through the ``"error"`` status.
    """
    try:
        # Create a new connection for this request
        db = sqlite3.connect('data/whitelist.db')
        try:
            db.row_factory = sqlite3.Row
            # Check umbrella table
            row = db.execute(
                """
                SELECT rank, last_updated
                FROM umbrella
                WHERE domain = ?
                """,
                (domain,),
            ).fetchone()
        finally:
            # Release the connection even when the query raises, so a
            # failing lookup does not leak an open database handle.
            db.close()
    except Exception as e:
        logger.error("Failed to check whitelist for %s: %s", domain, e)
        return {"status": "error", "error": str(e)}

    if row is None:
        return {"status": "not_whitelisted"}
    return {
        "status": "whitelisted",
        "rank": row['rank'],
        "last_updated": row['last_updated'],
    }
@app.route('/')
def index():
    """Serve the main page by rendering the ``index.html`` template."""
    page = render_template('index.html')
    return page
@app.route('/api/check', methods=['POST'])
def check_url():
    """Check if URL is phishing.

    Expects a JSON object body with:
        url: The URL to check (required, non-empty string).
        detailed: Optional flag; when truthy, a "details" object is added
            to the analysis response.

    Returns:
        A JSON response, or a ``(response, status)`` tuple on error:

        * whitelisted domain: ``is_phishing`` False, ``confidence`` 1.0 and
          whitelist details (always included, regardless of ``detailed``);
        * otherwise: ``url``, ``is_phishing`` and ``confidence`` taken from
          ``url_analyzer.analyze_url``, plus ``details`` when requested;
        * 400 with ``{"error": ...}`` for a missing/malformed body or URL;
        * 500 with ``{"error": ...}`` on any unexpected failure.
    """
    try:
        # silent=True returns None for a missing or unparsable JSON body
        # instead of raising, so bad input is answered with 400 below rather
        # than being swallowed by the generic 500 handler.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({"error": "Request body must be a JSON object"}), 400

        url = data.get('url')
        detailed = data.get('detailed', False)
        if not url:
            return jsonify({"error": "URL is required"}), 400
        if not isinstance(url, str):
            return jsonify({"error": "URL must be a string"}), 400

        # Parse domain for whitelist check
        try:
            domain = urlparse(url).netloc
        except ValueError:
            # urlparse rejects some malformed inputs (e.g. a broken IPv6
            # literal); that is a client error, not a server fault.
            return jsonify({"error": "Invalid URL"}), 400
        if domain.startswith('www.'):
            domain = domain[4:]

        # Check whitelist first
        whitelist_result = check_whitelist(domain)

        # If whitelisted, return early
        if whitelist_result["status"] == "whitelisted":
            return jsonify({
                "url": url,
                "is_phishing": False,
                "confidence": 1.0,
                "details": {
                    "whitelist_status": "whitelisted",
                    "whitelist_rank": whitelist_result["rank"],
                    "whitelist_updated": whitelist_result["last_updated"]
                }
            })

        # If not whitelisted (or the whitelist lookup failed), analyze URL
        analysis = url_analyzer.analyze_url(url)
        response = {
            "url": url,
            "is_phishing": analysis["is_phishing"],
            "confidence": analysis["confidence"]
        }
        if detailed:
            response["details"] = {
                "whitelist_status": whitelist_result["status"],
                "model_score": analysis["confidence"],
                "features": analysis["features"],
                "risk_factors": analysis["analysis"]["risk_factors"]
            }
        return jsonify(response)
    except Exception as e:
        # Top-level boundary for the endpoint: log with traceback and report.
        logger.exception("Error processing request: %s", e)
        return jsonify({"error": str(e)}), 500
@app.route('/api/batch', methods=['POST'])
def check_batch():
    """Check multiple URLs for phishing.

    Expects a JSON object body with:
        urls: Non-empty list of URLs to check.

    Each URL is run through ``check_url`` with ``detailed`` set to True.

    Returns:
        A JSON response with:

        * ``results``: one entry per input URL, in input order; an entry is
          either a ``check_url`` result or its ``{"error": ...}`` payload;
        * ``total``: number of results;
        * ``phishing_count``: number of results flagged ``is_phishing``
          (error entries are not counted).

        400 with ``{"error": ...}`` when the body or ``urls`` is missing or
        malformed, 500 with ``{"error": ...}`` on any unexpected failure.
    """
    try:
        # silent=True returns None for a missing or unparsable JSON body so
        # it can be rejected with 400 instead of surfacing as a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({"error": "Request body must be a JSON object"}), 400

        urls = data.get('urls', [])
        # A bare string would otherwise be iterated character by character.
        if not urls or not isinstance(urls, list):
            return jsonify({"error": "URLs list is required"}), 400

        results = []
        for url in urls:
            # check_url reads its input from the Flask request object, so a
            # synthetic request context is pushed to reuse it per URL.
            with app.test_request_context(json={'url': url, 'detailed': True}):
                # Call check_url directly
                response = check_url()
                # Error responses come back as (response, status) tuples
                payload = response[0] if isinstance(response, tuple) else response
                results.append(payload.get_json())

        return jsonify({
            "results": results,
            "total": len(results),
            # Error entries have no "is_phishing" key; .get keeps one bad URL
            # from failing the whole batch with a KeyError.
            "phishing_count": sum(1 for r in results if r.get("is_phishing"))
        })
    except Exception as e:
        # Top-level boundary for the endpoint: log with traceback and report.
        logger.exception("Error processing batch request: %s", e)
        return jsonify({"error": str(e)}), 500
if __name__ == '__main__':
    # Ensure data directory exists
    os.makedirs('data', exist_ok=True)
    # Debug mode turns on the interactive debugger, which lets anyone who can
    # reach the server execute code. Keep it off unless explicitly requested
    # through the FLASK_DEBUG environment variable.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in (
        '1', 'true', 'yes', 'on'
    )
    # Start Flask app
    app.run(debug=debug_enabled)