- A real-time cybersecurity threat detection system using machine learning to identify and automatically respond to authentication attacks, application-level attacks, and insider threats.
Authentication Attacks
- Brute force detection
- Credential stuffing identification
- Anomalous login pattern recognition
Application-Level Attacks
- SQL Injection detection
- Cross-Site Scripting (XSS) detection
- Path traversal detection
- Command injection detection
- Rate limit abuse / DoS detection
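As a rough illustration of how signature-based detection of these application-level attacks can work, the sketch below matches a request payload against a few regular expressions per category. The pattern set, the `ATTACK_PATTERNS` table, and the `classify_payload` helper are assumptions made for this example and are not taken from `ml_threat_models.py`.

```python
import re
from urllib.parse import unquote

# Illustrative signatures only; a real deployment needs a larger, tested set.
ATTACK_PATTERNS = {
    "sql_injection": [
        re.compile(r"\bunion\b.+\bselect\b", re.IGNORECASE),
        re.compile(r"'\s*or\s+'?\d+'?\s*=\s*'?\d+", re.IGNORECASE),
    ],
    "xss": [
        re.compile(r"<\s*script\b", re.IGNORECASE),
        re.compile(r"\bon(?:error|load|click)\s*=", re.IGNORECASE),
    ],
    "path_traversal": [
        re.compile(r"\.\./|\.\.\\"),
    ],
    "command_injection": [
        re.compile(r"[;&|]\s*(?:cat|ls|rm|wget|curl)\b", re.IGNORECASE),
    ],
}


def classify_payload(payload: str) -> list:
    """Return the sorted names of every attack category whose pattern matches."""
    decoded = unquote(payload)  # undo one layer of URL encoding before matching
    return sorted(
        name
        for name, patterns in ATTACK_PATTERNS.items()
        if any(p.search(decoded) for p in patterns)
    )
```

Decoding once before matching catches simple `%2F`-style evasion; payloads that are encoded several times would need repeated decoding, which this sketch does not attempt.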
Insider Threats
- Data exfiltration detection
- Unusual access time monitoring
- Anomalous user behavior analysis
- Behavioral baseline establishment
Features
- Real-time event processing
- Machine learning-based anomaly detection
- Automated threat response system
- Comprehensive logging and audit trail
- Dashboard analytics
- Rule-based and ML-hybrid detection
┌─────────────────────────────────────────────────────────┐
│                      Event Sources                      │
│         (Auth Logs, API Requests, User Actions)         │
└────────────────┬────────────────────────────────────────┘
                 │
                 ▼
┌─────────────────────────────────────────────────────────┐
│                    Detection Engine                     │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐   │
│  │     Auth     │  │     App      │  │   Insider    │   │
│  │   Detector   │  │   Detector   │  │   Detector   │   │
│  └──────────────┘  └──────────────┘  └──────────────┘   │
└────────────────┬────────────────────────────────────────┘
                 │
                 ▼
┌─────────────────────────────────────────────────────────┐
│                     SQLite Database                     │
│      • Events  • Threats  • Predictions  • Actions      │
└────────────────┬────────────────────────────────────────┘
                 │
                 ▼
┌─────────────────────────────────────────────────────────┐
│                     Response System                     │
│   • Block IP  • Rate Limit  • Alert  • Freeze Account   │
└─────────────────────────────────────────────────────────┘
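One way the Response System stage might choose its actions is a lookup keyed by threat type and severity, which is the shape the `response_rules` dictionary in this README suggests. The sketch below is an assumption-laden illustration: the rule table contents, the `rate_limit` and `freeze_account` identifiers, `DEFAULT_ACTIONS`, and the `handlers` mapping are all hypothetical.

```python
# Hypothetical rule table; only the threat-type -> severity -> actions shape
# is taken from this README.
RESPONSE_RULES = {
    "sql_injection": {
        "high": ["block_ip", "alert_admin"],
        "medium": ["rate_limit", "alert_admin"],
    },
    "brute_force": {
        "high": ["block_ip", "freeze_account"],
    },
}
DEFAULT_ACTIONS = ["alert_admin"]  # assumed fallback when no rule matches


def select_actions(threat_type, severity):
    """Look up the configured actions, falling back to an alert."""
    return RESPONSE_RULES.get(threat_type, {}).get(severity, DEFAULT_ACTIONS)


def respond(threat_type, severity, handlers):
    """Run the handler for each selected action; return the names that ran."""
    executed = []
    for action in select_actions(threat_type, severity):
        handler = handlers.get(action)
        if handler is None:
            continue  # no handler registered for this action
        handler()
        executed.append(action)
    return executed
```

Keeping the rules in plain data makes them easy to audit and to change at runtime without touching the dispatch logic.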
threat-detection-system/
│
├── threat_detection_db.py     # Database schema and operations
├── ml_threat_models.py        # ML models for each threat type
├── detection_engine.py        # Real-time detection orchestration
├── demo_system.py             # Complete demo with examples
│
├── demo files/
│   ├── threat_detection.db    # SQLite database (created on first run)
│   └── threat_demo.db
│
└── README.md                  # This file
# Python 3.8 or higher
python --version
# Required packages
pip install numpy scikit-learn
- Clone or download the project files: threat_detection_db.py, ml_threat_models.py, detection_engine.py, demo_system.py
- Install dependencies:
  pip install numpy scikit-learn
- Run the demo:
  python demo_system.py
The demo will:
- Initialize the database
- Generate synthetic training data
- Train ML models
- Demonstrate all threat detection capabilities
- Show automated responses
- Display dashboard statistics
from detection_engine import ThreatDetectionEngine

# Initialize engine
engine = ThreatDetectionEngine()

# Train models on historical data
engine.train_models(days_of_history=30)

# Process authentication event
result = engine.process_auth_event(
    username="user123",
    ip_address="192.168.1.100",
    success=False,
    failure_reason="Invalid password"
)

# Process application event
result = engine.process_app_event(
    ip_address="10.0.0.50",
    endpoint="/api/users",
    method="POST",
    payload='{"username": "admin"}',
    response_code=200
)

# Process user behavior
result = engine.process_user_behavior(
    user_id="user123",
    action_type="file_download",
    resource_accessed="/sensitive/data.csv",
    data_volume=1048576
)

# Modify response rules in detection_engine.py
engine.response_system.response_rules['sql_injection']['high'] = [
    'block_ip',
    'block_endpoint',
    'alert_admin',
    'custom_action'
]

# Get dashboard statistics
stats = engine.get_dashboard_stats()
print(f"Threats in last 24h: {stats['total_threats']}")
# Get top attacking IPs
top_ips = engine.db.get_top_attacking_ips(limit=10)
# Get threat statistics
threat_stats = engine.db.get_threat_statistics(hours=24)

Auth Detector
- Algorithm: Isolation Forest + Rule-based
- Features: Failure rate, unique usernames, time patterns, business hours
- Detection: Brute force, credential stuffing, anomalous patterns

App Detector
- Algorithm: Regex-based pattern matching
- Detection: SQL injection, XSS, path traversal, command injection
- Patterns: 20+ malicious payload signatures

Insider Detector
- Algorithm: Isolation Forest with behavioral baselines
- Features: Data volume deviation, session patterns, resource access, time anomalies
- Detection: Data exfiltration, unusual access times, anomalous behavior
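To make the "data volume deviation" feature concrete, the sketch below scores a new transfer against a user's historical baseline with a plain z-score. This is not the Isolation Forest itself, only one plausible way to compute a feature that such a model could consume; `build_baseline`, `volume_deviation`, and the sample history are assumptions for illustration.

```python
from statistics import mean, pstdev


def build_baseline(volumes):
    """Summarise a user's historical transfer sizes (bytes) as mean and spread."""
    return {"mean": mean(volumes), "std": pstdev(volumes)}


def volume_deviation(baseline, volume):
    """Z-score of a new transfer against the baseline (0.0 if history is flat)."""
    if baseline["std"] == 0:
        return 0.0
    return (volume - baseline["mean"]) / baseline["std"]


# Hypothetical history: five earlier downloads of roughly 1 KB each.
history = [1000, 1200, 900, 1100, 800]
baseline = build_baseline(history)

# A 1 MiB download, as in the usage example, deviates far from this baseline.
score = volume_deviation(baseline, 1_048_576)
```

Returning 0.0 for a flat history avoids division by zero, at the cost of never flagging a user whose past transfers were all identical; a production version would likely handle that case separately.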
- auth_events: Authentication attempts with success/failure
- app_events: Application requests with payloads
- user_behavior: User actions and resource access
- threats: Detected threats with severity and confidence
- ml_predictions: Model predictions with features
- response_actions: Automated responses taken
- blocked_ips: Currently blocked IP addresses
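For orientation, a `threats` table along these lines could be declared as follows. Only the table name and the `severity`, `confidence`, and `timestamp` columns are suggested by this README; every other column, the allowed severity values, and the index are assumptions, and the actual schema in `threat_detection_db.py` may differ.

```python
import sqlite3

# Columns other than timestamp, severity and confidence are assumptions.
SCHEMA = """
CREATE TABLE IF NOT EXISTS threats (
    id          INTEGER PRIMARY KEY AUTOINCREMENT,
    timestamp   TEXT NOT NULL DEFAULT (datetime('now')),
    threat_type TEXT NOT NULL,
    severity    TEXT NOT NULL CHECK (severity IN ('low', 'medium', 'high', 'critical')),
    confidence  REAL NOT NULL CHECK (confidence BETWEEN 0.0 AND 1.0),
    source_ip   TEXT
);
CREATE INDEX IF NOT EXISTS idx_threats_timestamp ON threats (timestamp);
"""

conn = sqlite3.connect(":memory:")  # in-memory database for the example
conn.executescript(SCHEMA)

# Parameterised insert keeps attacker-controlled values out of the SQL text.
conn.execute(
    "INSERT INTO threats (threat_type, severity, confidence, source_ip) VALUES (?, ?, ?, ?)",
    ("sql_injection", "high", 0.92, "10.0.0.50"),
)
rows = conn.execute(
    "SELECT threat_type, severity, confidence FROM threats ORDER BY timestamp DESC LIMIT 10"
).fetchall()
```

The `CHECK` constraints reject malformed rows at write time, and the timestamp index supports the "most recent threats" style of query.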
# Authentication
FAILED_LOGIN_THRESHOLD = 5
TIME_WINDOW_MINUTES = 10
CREDENTIAL_STUFFING_THRESHOLD = 3
# Rate limiting
RATE_LIMIT_THRESHOLD = 100  # requests per time window

# Enable/disable auto-response
engine.auto_response_enabled = True
# Modify time window
engine.time_window_minutes = 10
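The two authentication settings above lend themselves to a sliding-window counter. The sketch below shows one possible reading, in which an IP is flagged once it accumulates `FAILED_LOGIN_THRESHOLD` failures within `TIME_WINDOW_MINUTES`. The `FailedLoginTracker` class is hypothetical, and whether the real engine compares with "at least" or "more than" is an assumption.

```python
from collections import defaultdict, deque
from datetime import datetime, timedelta

FAILED_LOGIN_THRESHOLD = 5
TIME_WINDOW_MINUTES = 10


class FailedLoginTracker:
    """Counts failed logins per IP inside a sliding time window."""

    def __init__(self, threshold=FAILED_LOGIN_THRESHOLD, window_minutes=TIME_WINDOW_MINUTES):
        self.threshold = threshold
        self.window = timedelta(minutes=window_minutes)
        self._failures = defaultdict(deque)  # ip -> timestamps of recent failures

    def record_failure(self, ip_address, when):
        """Record one failure and report whether the IP has reached the threshold."""
        recent = self._failures[ip_address]
        recent.append(when)
        # Drop failures that have aged out of the window.
        while recent and when - recent[0] > self.window:
            recent.popleft()
        return len(recent) >= self.threshold


tracker = FailedLoginTracker()
start = datetime(2024, 1, 1, 12, 0, 0)
# Five failures one minute apart: only the fifth reaches the threshold.
flags = [
    tracker.record_failure("192.168.1.100", start + timedelta(minutes=i))
    for i in range(5)
]
```

Passing the timestamp in explicitly, rather than reading the clock inside the method, keeps the tracker easy to test and to replay against historical logs.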
Database Security
- Use encrypted SQLite with SQLCipher
- Implement proper access controls
- Regular backups with encryption
API Integration
- Use authentication for all endpoints
- Rate limit the detection API itself
- Validate all inputs before processing
Response Actions
- Implement approval workflows for critical actions
- Add rollback mechanisms
- Log all automated responses
False Positive Management
- Implement feedback loops
- Regular model retraining
- Whitelist trusted IPs/users
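As one way to approach the trusted-IP item above, the sketch below checks a source address against an allowlist of networks and strips blocking actions for trusted sources while still alerting. The networks listed, `is_trusted`, and `filter_actions` are hypothetical; the action names reuse those from the response rules example in this README.

```python
import ipaddress

# Hypothetical allowlist; the networks below are placeholders.
TRUSTED_NETWORKS = [
    ipaddress.ip_network("10.0.0.0/24"),
    ipaddress.ip_network("192.168.1.100/32"),
]


def is_trusted(ip_address: str) -> bool:
    """Return True if the address falls inside any trusted network."""
    try:
        address = ipaddress.ip_address(ip_address)
    except ValueError:
        return False  # malformed input is never trusted
    return any(address in network for network in TRUSTED_NETWORKS)


def filter_actions(ip_address, actions):
    """Drop blocking actions for trusted sources but keep alerts."""
    if is_trusted(ip_address):
        return [a for a in actions if a == "alert_admin"]
    return list(actions)
```

Keeping the alert for trusted sources means a compromised internal host still surfaces to an administrator even though it is not blocked automatically.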
# Create new detector class
from sklearn.ensemble import IsolationForest

class NewThreatDetector:
    def __init__(self):
        self.model = IsolationForest()

    def train(self, data):
        # Training logic
        pass

    def predict(self, event):
        # Detection logic: set 'detected' to True or False and add any other fields
        return {'detected': False}

# Integrate into engine
engine.new_detector = NewThreatDetector()

# Add to ThreatResponseSystem
def custom_action(self, threat_id, details):
    # Your custom response logic
    # e.g., integrate with SIEM, send Slack alert, etc.
    return True

# Add to detection_engine.py
def check_threat_intel(self, ip_address):
    # Query threat intelligence APIs
    # e.g., AbuseIPDB, VirusTotal
    # Update threat_intel table
    pass

- Batch Processing: Process events in batches
- Async Operations: Use async/await for I/O operations
- Caching: Cache model predictions and threat intel
- Database Indexing: Optimize indexes for query patterns
- Model Optimization: Use lightweight models for real-time detection
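The batch-processing suggestion could be realised by buffering events and writing each batch with a single `executemany` call, as sketched below. The `BatchedEventWriter` class and the three-column `app_events` table used here are simplified assumptions, not the project's actual schema.

```python
import sqlite3


class BatchedEventWriter:
    """Buffers events and writes them with one executemany call per batch."""

    def __init__(self, conn, batch_size=100):
        self.conn = conn
        self.batch_size = batch_size
        self._buffer = []

    def add(self, ip_address, endpoint, method):
        """Queue one event; flush automatically when the batch is full."""
        self._buffer.append((ip_address, endpoint, method))
        if len(self._buffer) >= self.batch_size:
            self.flush()

    def flush(self):
        """Write all buffered events in a single transaction."""
        if not self._buffer:
            return
        with self.conn:  # commits once for the whole batch
            self.conn.executemany(
                "INSERT INTO app_events (ip_address, endpoint, method) VALUES (?, ?, ?)",
                self._buffer,
            )
        self._buffer.clear()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE app_events (ip_address TEXT, endpoint TEXT, method TEXT)")

writer = BatchedEventWriter(conn, batch_size=3)
for i in range(7):
    writer.add("10.0.0.50", f"/api/items/{i}", "GET")
written_before_flush = conn.execute("SELECT COUNT(*) FROM app_events").fetchone()[0]

writer.flush()  # write the one event still sitting in the buffer
written_after_flush = conn.execute("SELECT COUNT(*) FROM app_events").fetchone()[0]
```

Batching trades a little latency for far fewer commits; a real-time detector would still want to run detection on each event immediately and batch only the logging.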
# Run the demo
python demo_system.py
# Test individual components
python threat_detection_db.py
python ml_threat_models.py
python detection_engine.py

All events are logged to the SQLite database:
- Authentication attempts
- Application requests
- User behaviors
- Threats detected
- ML predictions
- Response actions taken
Query logs:
cursor = engine.db.conn.cursor()
cursor.execute("SELECT * FROM threats ORDER BY timestamp DESC LIMIT 10")
recent_threats = cursor.fetchall()

To extend this system:
- Add new detection patterns
- Improve ML models
- Add new response actions
- Enhance the dashboard
- Integrate with external systems
This system is designed for educational and demonstration purposes; adapt it as needed for your use case. For production use:
- Conduct thorough security audits
- Implement proper monitoring
- Test extensively with your data
- Consider compliance requirements (GDPR, etc.)
- Have incident response procedures in place
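# Flask integration (auth_successful is the result of your own credential check)
from flask import request, jsonify

@app.route('/api/login', methods=['POST'])
def login():
    # Process login
    result = engine.process_auth_event(
        username=request.json['username'],
        ip_address=request.remote_addr,
        success=auth_successful,
        user_agent=request.headers.get('User-Agent')
    )
    if result.get('blocked'):
        return jsonify({'error': 'Access denied'}), 403
    # Continue with normal login

# FastAPI integration
from fastapi.responses import JSONResponse

@app.middleware("http")
async def threat_detection_middleware(request, call_next):
    body = await request.body()
    result = engine.process_app_event(
        ip_address=request.client.host,
        endpoint=request.url.path,
        method=request.method,
        # Decode so the payload is text, as in the usage example
        payload=body.decode('utf-8', errors='replace'),
        query_params=str(request.query_params)
    )
    if result.get('threat_detected'):
        # Return the response directly: an HTTPException raised inside
        # middleware is not handled by FastAPI's exception handlers
        return JSONResponse({'error': 'Access denied'}, status_code=403)
    return await call_next(request)

For questions or issues: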
- Review the demo output
- Check the database logs
- Examine model predictions
- Adjust thresholds as needed
Built with Python, SQLite, and scikit-learn