-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcontroller.py
107 lines (76 loc) · 3.06 KB
/
controller.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
from settings import (
BOT_API_TOKEN,
DEFAULT_SERVER_ID,
)
import telegram.monitoring as monitoring
import outline.api as outline
import amnezia.controller as amnezia
from settings import BOT_API_TOKEN, DEFAULT_SERVER_ID
from helpers.exceptions import (
KeyCreationError,
KeyRenamingError,
InvalidServerIdError,
)
from helpers.aliases import ServerId
assert BOT_API_TOKEN is not None
def create_new_key(message, server_id: ServerId = DEFAULT_SERVER_ID,
key_name: str | None = None, type: str = "outline"):
try:
if key_name is None:
key_name = form_key_name(message)
if type == "outline":
key = outline.get_new_key(key_name, server_id)
elif type == "amnezia-warp":
key = amnezia.generate_amnezia_key(type=type);
else:
#TODO
print("Key type is unknown.")
raise Exception
return key
except KeyCreationError:
error_message = "API error: cannot create the key"
send_monitoring_error_message(message, error_message)
print(error_message)
raise KeyCreationError
except KeyRenamingError:
error_message = "API error: cannot rename the key"
send_monitoring_error_message(message, error_message)
print(error_message)
raise KeyRenamingError
except InvalidServerIdError:
error_message = "The server id does not exist."
send_monitoring_error_message(message, error_message)
print(error_message)
raise InvalidServerIdError
def form_key_name(message) -> str:
if message.text.startswith("https://portal.itgen.io/"):
key_name = _get_gena_user_id_from_link(message.text)
else:
key_name = message.from_user.username
return key_name
def report_blacklist_attempt(username: str, chat_id: str) -> None:
monitoring.report_blacklist_attempt(username, chat_id)
def report_not_in_whitelist(username: str, chat_id: str) -> None:
monitoring.report_not_in_whitelist(username, chat_id)
def send_api_status():
monitoring.send_api_status()
def send_monitoring_start_message():
monitoring.send_start_message()
def send_monitoring_error_message(message, error_message):
monitoring.send_error(error_message=error_message,
username=message.from_user.username,
firstname=message.from_user.first_name,
lastname=message.from_user.last_name)
def send_monitoring_new_key_created(key, message):
monitoring.new_key_created(
key_id=key.kid,
key_name=key.name,
chat_id=message.chat.id,
username=message.from_user.username,
server_id=key.server_id)
def _get_gena_user_id_from_link(link: str) -> str:
if not link.startswith("https://portal.itgen.io/profile/"):
raise InvalidGenaProfileLink
gena_profile_id = link.split('/')[-1]
# in case we have something like '?tab=home' at the end
return gena_profile_id.split("?")[0]