|
1 | | -from dataclasses import dataclass |
2 | | -import typing as t |
3 | 1 | import abc |
4 | | -import string |
5 | 2 | import datetime |
| 3 | +import string |
| 4 | +import typing as t |
| 5 | +from dataclasses import dataclass |
6 | 6 | from datetime import timezone |
7 | 7 |
|
| 8 | +from django.contrib.auth import get_user_model, login, logout |
8 | 9 | from django.core.mail import EmailMessage |
9 | | -from django.contrib.auth import get_user_model |
10 | | - |
11 | | -from django.contrib.auth import login, logout |
12 | | -from . import token |
13 | | - |
14 | | - |
15 | | -class EmailLoginInfo(object): |
16 | | - subject: str |
17 | | - message: str |
18 | | - welcome_text = "Welcome to meterhub! Please click the link below to login.<br>" |
19 | | - url: str = "http://127.0.0.1:8000/account/verify?token=" |
20 | | - login_message: string.Template = string.Template('Click <a href="$url$token">Link</a>') |
21 | | - from_email: str |
22 | | - |
23 | | - def __init__(self) -> None: |
24 | | - self.init_variables() |
25 | | - |
26 | | - def init_variables(self): |
27 | | - raise NotImplementedError("You must set the subject and from_email.") |
28 | | - |
29 | | - def set_token(self, value): |
30 | | - self.message = self.welcome_text + self.login_message.substitute(url=self.url, token=value) |
31 | 10 |
|
32 | | - |
33 | | -class TimeLimit(object): |
34 | | - minutes: int = 10 |
35 | | - |
36 | | - |
37 | | -@dataclass |
38 | | -class MailRecord(object): |
39 | | - expired_time: t.Optional[datetime.datetime] |
40 | | - email: str |
41 | | - validated: bool |
42 | | - sault: str |
| 11 | +from . import errors, token |
43 | 12 |
|
44 | 13 |
|
45 | | -class MailRecordMixin(abc.ABC): |
| 14 | +class EmailInfo(object): |
| 15 | + """Email info.""" |
46 | 16 |
|
47 | | - @abc.abstractmethod |
48 | | - def get_mail_record(self, mail: str) -> MailRecord: |
49 | | - # for easy to change. use a function. |
50 | | - raise NotImplementedError("You must implement get_mail_record") |
| 17 | + subject: str |
| 18 | + message: str |
| 19 | + from_email: str |
| 20 | + welcome_text: str |
| 21 | + system_name: str |
51 | 22 |
|
52 | | - @abc.abstractmethod |
53 | | - def save_token(self, token: token.TokenDict): |
54 | | - """to save token in the database or somewhere.""" |
55 | | - raise NotImplementedError("You must implement save_token") |
| 23 | + url: str = "http://127.0.0.1:8000/account/verify?token=" |
| 24 | + login_message: string.Template = string.Template('Click <a href="$url$token">Link</a>') |
56 | 25 |
|
57 | | - @abc.abstractmethod |
58 | | - def disable_token(self, token: token.TokenDict): |
59 | | - """Every token should only login once""" |
60 | | - raise NotImplementedError("") |
| 26 | + def set_token(self, value): |
| 27 | + self.message = self.welcome_text + self.login_message.substitute( |
| 28 | + url=self.url, token=value |
| 29 | + ) |
61 | 30 |
|
62 | 31 |
|
63 | | -class EmailInfoMixin(MailRecordMixin): |
64 | | - email_info_class: t.Type[EmailLoginInfo] |
| 32 | +class EmailLoginInfo(EmailInfo): |
| 33 | + """Email info for login.""" |
65 | 34 |
|
66 | | - tl: TimeLimit |
| 35 | + def __init__(self): |
| 36 | + self.subject = f"Welcome to {self.system_name}! Please click the link below to login." |
| 37 | + self.from_email = "noreply@example.com" |
| 38 | + self.welcome_text: str = ( |
| 39 | + f"Welcome to {self.system_name}! Please click the link below to login.<br>" |
| 40 | + ) |
67 | 41 |
|
68 | | - def check_user(self, email): |
69 | | - User = get_user_model() |
70 | | - u = User.objects.get(email=email) |
71 | | - return u |
72 | 42 |
|
73 | | - def check_could_send(self, email): |
74 | | - re = self.get_mail_record(email) |
75 | | - # TODO: if other user send email, the current user could not sign in. |
76 | | - if (re.expired_time is None) or (re.expired_time <= datetime.datetime.now(tz=timezone.utc)): |
77 | | - return True |
78 | | - return False |
| 43 | +class EmailRegisterInfo(EmailInfo): |
| 44 | + """Email info for register.""" |
79 | 45 |
|
80 | | - def send_valid(self, email: str): |
81 | | - e = self.email_info_class() |
82 | | - m = token.TokenManager(self.tl.minutes) |
| 46 | + def __init__(self): |
| 47 | + self.subject = ( |
| 48 | + f"Welcome to {self.system_name}! Please click the link below to register." |
| 49 | + ) |
| 50 | + self.from_email = "noreply@example.com" |
| 51 | + self.welcome_text: str = ( |
| 52 | + f"Welcome to {self.system_name}! Please click the link below to register.<br>" |
| 53 | + ) |
83 | 54 |
|
84 | | - encrypt_token = m.encrypt_mail(email, self.save_token) |
85 | | - e.set_token(encrypt_token) |
86 | 55 |
|
87 | | - msg = EmailMessage(e.subject, e.message, e.from_email, [email]) |
88 | | - msg.content_subtype = "html" |
89 | | - msg.send() |
| 56 | +def get_info_class(sys_name: str) -> t.Tuple[EmailLoginInfo, EmailRegisterInfo]: |
| 57 | + class MyLoginInfo(EmailLoginInfo): |
| 58 | + system_name = sys_name |
90 | 59 |
|
91 | | - def send_login_mail(self, email: str): |
92 | | - self.check_user(email) |
93 | | - if not self.check_could_send(email=email): |
94 | | - raise Exception(f"Cannot send. Wait {self.tl.minutes} minutes.") |
| 60 | + class MyRegisterInfo(EmailRegisterInfo): |
| 61 | + system_name = sys_name |
95 | 62 |
|
96 | | - self.send_valid(email) |
| 63 | + return MyLoginInfo, MyRegisterInfo |
97 | 64 |
|
98 | 65 |
|
99 | | -class EmailValidateMixin(MailRecordMixin): |
100 | | - tl: TimeLimit |
101 | | - |
102 | | - def verify_login_mail(self, request, token_v: str): |
103 | | - m = token.TokenManager(self.tl.minutes) |
104 | | - token_str = m.decrypt_token(token=token_v) |
105 | | - token_d = m.transform_token(token_str) |
106 | | - |
107 | | - mr = self.get_mail_record(m.get_mail(token_d)) |
108 | | - if mr.validated: |
109 | | - raise Exception("Already validated.") |
| 66 | +@dataclass |
| 67 | +class TimeLimit(object): |
| 68 | + minutes: int = 10 |
110 | 69 |
|
111 | | - token_d = m.check_token(token_d, lambda: mr.sault) |
112 | | - if token_d is None: |
113 | | - raise Exception("Invalid token.") |
114 | 70 |
|
115 | | - User = get_user_model() |
116 | | - u = User.objects.get(email=m.get_mail(token_d)) |
117 | | - if not u.is_active: |
118 | | - raise Exception("Inactive user, disallow login.") |
| 71 | +@dataclass |
| 72 | +class MailRecord(object): |
| 73 | + expired_time: t.Optional[datetime.datetime] |
| 74 | + email: str |
| 75 | + validated: bool |
| 76 | + sault: str |
| 77 | + |
| 78 | + |
| 79 | +class MailRecordAPI(abc.ABC): |
| 80 | + """Mixin to get mail record.""" |
| 81 | + |
| 82 | + @abc.abstractmethod |
| 83 | + def get_mail_record(self, mail: str) -> MailRecord: |
| 84 | + """Get the mail record from the database.""" |
| 85 | + raise NotImplementedError("You must implement get_mail_record") |
| 86 | + |
| 87 | + @abc.abstractmethod |
| 88 | + def save_token(self, token: token.TokenDict): |
| 89 | + """to save token in the database or somewhere.""" |
| 90 | + raise NotImplementedError("You must implement save_token") |
| 91 | + |
| 92 | + @abc.abstractmethod |
| 93 | + def disable_token(self, token: token.TokenDict): |
| 94 | + """Every token should only login once""" |
| 95 | + raise NotImplementedError("") |
| 96 | + |
| 97 | + |
| 98 | +class EmailFunc(MailRecordAPI): |
| 99 | + """Mixin to send email.""" |
| 100 | + |
| 101 | + login_info_class: t.Type[EmailLoginInfo] |
| 102 | + register_info_class: t.Type[EmailRegisterInfo] |
| 103 | + |
| 104 | + tl: TimeLimit |
| 105 | + |
| 106 | + def check_user(self, email) -> bool: |
| 107 | + """check if the user exists.""" |
| 108 | + User = get_user_model() |
| 109 | + u = User.objects.filter(email=email) |
| 110 | + return u.exists() |
| 111 | + |
| 112 | + def check_could_send(self, email) -> bool: |
| 113 | + """check if the email could send.""" |
| 114 | + re = self.get_mail_record(email) |
| 115 | + # TODO: if other user send email, the current user could not sign in. |
| 116 | + if (re.expired_time is None) or ( |
| 117 | + re.expired_time <= datetime.datetime.now(tz=timezone.utc) |
| 118 | + ): |
| 119 | + return True |
| 120 | + return False |
| 121 | + |
| 122 | + def get_token_manager(self) -> token.TokenManager: |
| 123 | + return token.TokenManager(self.tl.minutes) |
| 124 | + |
| 125 | + def send_valid(self, email: str, mail_type: str): |
| 126 | + """send login/register mail.""" |
| 127 | + if mail_type == "login": |
| 128 | + e = self.login_info_class() |
| 129 | + elif mail_type == "register": |
| 130 | + e = self.register_info_class() |
| 131 | + else: |
| 132 | + raise ValueError(f"Invalid mail type: {mail_type}") |
| 133 | + |
| 134 | + m = self.get_token_manager() |
| 135 | + encrypt_token = m.encrypt_mail(email, mail_type, self.save_token) |
| 136 | + e.set_token(encrypt_token) |
| 137 | + |
| 138 | + msg = EmailMessage(e.subject, e.message, e.from_email, [email]) |
| 139 | + msg.content_subtype = "html" |
| 140 | + msg.send() |
| 141 | + |
| 142 | + def send_login_mail(self, email: str): |
| 143 | + """ |
| 144 | + send login mail. |
| 145 | + """ |
| 146 | + # if user not exist, send register mail. |
| 147 | + if not self.check_user(email): |
| 148 | + mail_type = "register" |
| 149 | + else: |
| 150 | + mail_type = "login" |
| 151 | + |
| 152 | + # if the email could not send, raise exception. |
| 153 | + if not self.check_could_send(email=email): |
| 154 | + raise Exception(f"Cannot send. Wait {self.tl.minutes} minutes.") |
| 155 | + |
| 156 | + self.send_valid(email, mail_type) |
| 157 | + |
| 158 | + |
| 159 | +class EmailVerifyMixin(MailRecordAPI): |
| 160 | + """verify the token in url""" |
| 161 | + |
| 162 | + tl: TimeLimit |
| 163 | + |
| 164 | + def verify_token(self, token_v: str): |
| 165 | + m = token.TokenManager(self.tl.minutes) |
| 166 | + token_str = m.decrypt_token(token=token_v) |
| 167 | + token_d = m.transform_token(token_str) |
| 168 | + |
| 169 | + mr = self.get_mail_record(m.get_mail(token_d)) |
| 170 | + if mr.validated: |
| 171 | + raise errors.ValidatedError("Token already validated.") |
| 172 | + |
| 173 | + token_d = m.check_token(token_d, lambda: mr.sault) |
| 174 | + if token_d is None: |
| 175 | + raise errors.TokenError("Invalid token.") |
119 | 176 |
|
120 | | - self.disable_token(token=token_d) |
121 | | - login(request, u) |
| 177 | + User = get_user_model() |
| 178 | + u = User.objects.filter(email=m.get_mail(token_d)).first() |
| 179 | + if not u: |
| 180 | + # if user not exist, create a new user. |
| 181 | + # support register by email. |
| 182 | + u = User.objects.create(username=m.get_mail(token_d), email=m.get_mail(token_d)) |
| 183 | + |
| 184 | + if not u.is_active: |
| 185 | + raise Exception("Inactive user, disallow login.") |
| 186 | + |
| 187 | + self.disable_token(token=token_d) |
| 188 | + return u |
| 189 | + |
| 190 | + def verify_login_mail(self, request, token_v: str): |
| 191 | + """ |
| 192 | + verify the login mail. |
| 193 | + if user not exist, create a new user. |
| 194 | + """ |
| 195 | + u = self.verify_token(token_v=token_v) |
| 196 | + login(request, u) |
122 | 197 |
|
123 | 198 |
|
124 | 199 | class EmailLogoutMixin(object): |
125 | | - def logout(self, request): |
126 | | - logout(request) |
| 200 | + def logout(self, request): |
| 201 | + logout(request) |
0 commit comments