-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathauth.py
More file actions
142 lines (111 loc) · 3.98 KB
/
auth.py
File metadata and controls
142 lines (111 loc) · 3.98 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
"""
认证模块
处理用户认证和 JWT 令牌
"""
import jwt
import bcrypt
from datetime import datetime, timedelta
from functools import wraps
from flask import request, jsonify
from config import Config
from database import get_connection, dict_factory
def hash_password(password: str) -> str:
"""加密密码"""
return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')
def verify_password(password: str, password_hash: str) -> bool:
"""验证密码"""
return bcrypt.checkpw(password.encode('utf-8'), password_hash.encode('utf-8'))
def generate_token(user_id: int, username: str) -> str:
"""生成 JWT 令牌"""
payload = {
'user_id': user_id,
'username': username,
'exp': datetime.utcnow() + Config.JWT_EXPIRATION_DELTA,
'iat': datetime.utcnow()
}
return jwt.encode(payload, Config.JWT_SECRET_KEY, algorithm=Config.JWT_ALGORITHM)
def verify_token(token: str) -> dict:
"""验证 JWT 令牌"""
try:
payload = jwt.decode(token, Config.JWT_SECRET_KEY, algorithms=[Config.JWT_ALGORITHM])
return payload
except jwt.ExpiredSignatureError:
return None
except jwt.InvalidTokenError:
return None
def get_current_user():
"""从请求中获取当前用户"""
auth_header = request.headers.get('Authorization')
if not auth_header:
return None
try:
token = auth_header.split(' ')[1] # Bearer <token>
payload = verify_token(token)
if not payload:
return None
# 从数据库获取用户信息
conn = get_connection()
conn.row_factory = dict_factory
cursor = conn.cursor()
cursor.execute('SELECT * FROM users WHERE id = ?', (payload['user_id'],))
user = cursor.fetchone()
conn.close()
return user
except (IndexError, KeyError):
return None
def login_required(f):
"""需要登录的装饰器"""
@wraps(f)
def decorated_function(*args, **kwargs):
user = get_current_user()
if not user:
return jsonify({'error': '需要认证', 'message': '请提供有效的 JWT 令牌'}), 401
request.current_user = user
return f(*args, **kwargs)
return decorated_function
def register_user(username: str, email: str, password: str) -> dict:
"""注册新用户"""
conn = get_connection()
cursor = conn.cursor()
# 检查用户名是否已存在
cursor.execute('SELECT id FROM users WHERE username = ?', (username,))
if cursor.fetchone():
conn.close()
return {'error': '用户名已存在'}
# 检查邮箱是否已存在
cursor.execute('SELECT id FROM users WHERE email = ?', (email,))
if cursor.fetchone():
conn.close()
return {'error': '邮箱已被注册'}
# 创建新用户
password_hash = hash_password(password)
now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
cursor.execute('''
INSERT INTO users (username, email, password_hash, created_at)
VALUES (?, ?, ?, ?)
''', (username, email, password_hash, now))
user_id = cursor.lastrowid
conn.commit()
conn.close()
return {'user_id': user_id, 'username': username, 'email': email}
def authenticate_user(username: str, password: str) -> dict:
"""验证用户登录"""
conn = get_connection()
conn.row_factory = dict_factory
cursor = conn.cursor()
cursor.execute('SELECT * FROM users WHERE username = ?', (username,))
user = cursor.fetchone()
conn.close()
if not user:
return {'error': '用户名或密码错误'}
if not verify_password(password, user['password_hash']):
return {'error': '用户名或密码错误'}
token = generate_token(user['id'], user['username'])
return {
'user': {
'id': user['id'],
'username': user['username'],
'email': user['email']
},
'token': token
}