-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathmain.py
More file actions
357 lines (298 loc) · 14.1 KB
/
main.py
File metadata and controls
357 lines (298 loc) · 14.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
from os import name as os_name, getenv
from asyncio import run, wait, create_task, FIRST_COMPLETED, Event, get_running_loop
from logging import getLogger
import signal
from datetime import datetime, time as t
from io import BytesIO
import aiohttp
from dotenv import load_dotenv
from aiogram import Bot, Dispatcher, types
from aiogram.filters import Command
from aiogram.types import BufferedInputFile
from pymax import SocketMaxClient, MaxClient, Message
from pymax.types import FileAttach, PhotoAttach, VideoAttach
import data_handler
from logger import setup_logger
# --- Initial Setup ---
setup_logger()
l = getLogger("api_logger")
load_dotenv()
# --- Constants & Configuration ---
CHECK_TIME = False # проверять ли время перед отправкой сообщения (если да, то давать ошибку если START_TIME <= now <= END_TIME)
START_TIME = t(7, 0)
END_TIME = t(22, 0)
BOT_POST_MESSAGE = None # доп текст в сообщении от бота
BOT_MESSAGE_PREFIX = "⫻" # префикс для отпарвляемых сообщений
BOT_START_MESSAGE = None # стартовое сообщение бота отпарвляемое в макс при запуске (если None, то не отпралвять)
REQUESTS_TIMEOUT = 15 # таймаут запросов
# --- Environment Variables ---
try:
USE_SOCKET_CLIENT = eval(getenv('USE_SOCKET_CLIENT', 'False').title())
MAX_PHONE = getenv('VK_PHONE')
MAX_CHAT_ID = int(getenv('VK_CHAT_ID', 0))
MAX_TOKEN = getenv('VK_COOKIE')
TG_CHAT_ID = int(getenv('TG_CHAT_ID', 0))
TG_TOKEN = getenv('TG_TOKEN')
ADMIN_USER_ID = int(getenv('ADMIN_USER_ID', 0))
if not all([MAX_CHAT_ID, TG_CHAT_ID, TG_TOKEN, MAX_TOKEN, MAX_PHONE]):
raise ValueError("One or more environment variables are not set.")
assert TG_TOKEN
assert MAX_PHONE
except (ValueError, TypeError) as e:
l.critical(f"FATAL: Configuration error - {e}. Please check your .env file.")
quit(1)
msgs_map = data_handler.load('msgs') or {}
last_sender_id = None
bot = Bot(token=TG_TOKEN)
dp = Dispatcher()
# Reconnect=True effectively replaces the "Watchdog" thread
if USE_SOCKET_CLIENT:
client = SocketMaxClient(MAX_PHONE, token=MAX_TOKEN, work_dir="data/cache", reconnect=True)
else:
client = MaxClient(MAX_PHONE, token=MAX_TOKEN, work_dir="data/cache", reconnect=True)
# --- Helper Functions ---
async def download_content(url: str) -> BytesIO:
"""Download content from URL into memory."""
async with aiohttp.ClientSession() as session:
async with session.get(url, timeout=REQUESTS_TIMEOUT) as response: # pyright: ignore[reportArgumentType]
response.raise_for_status()
content = await response.read()
file_bytes = BytesIO(content)
# Attempt to set a name, though Telegram often overrides logic based on method
file_bytes.name = response.headers.get("X-File-Name", "file")
return file_bytes
async def get_sender_name(user_id: int) -> str:
"""Fetch user name via PyMax."""
try:
user = await client.get_user(user_id=user_id)
if user and user.names:
return user.names[0].name or ''
except Exception as e:
l.error(f"Could not fetch profile for ID {user_id}: {e}")
return f"User {user_id}"
# --- Logic: Max -> Telegram ---
async def get_smart_sender_info(user_id: int):
"""Fetches name and determines gender-specific verb suffix."""
try:
user = await client.get_user(user_id=user_id)
if user:
name = f"{user.names[0].name}" if user.names else f"User {user_id}"
# Sex: 1 is Female, 2 is Male. Default to 'л' (male/neutral)
suffix = "ла" if user.gender == 1 else "л"
return name, suffix
except Exception as e:
l.error(f"Error fetching user {user_id}: {e}")
return f"User {user_id}", "л(-а)"
# --- Logic: Max -> Telegram ---
async def process_max_message(message: Message, forwarded: bool = False) -> int | None:
"""
Handles messages. Returns the Telegram Message ID of the first part sent.
"""
global last_sender_id
assert message.sender
assert message.chat_id
# 1. Top-level filter
l.debug(message)
if not forwarded and message.chat_id != MAX_CHAT_ID:
return None
if message.text and message.text.startswith(BOT_MESSAGE_PREFIX):
return None
msg_id_str = str(message.id) if message.id else "FWD_PART"
l.info(f"Processing Max Message ID: {msg_id_str} (Forwarded: {forwarded})")
# This will track the FIRST Telegram ID associated with this Max message
first_tg_id = None
try:
sender_name, gender_suffix = await get_smart_sender_info(message.sender)
# 2. Header Logic
if not forwarded and last_sender_id != message.sender:
header_text = f"{BOT_MESSAGE_PREFIX} *{sender_name} написа{gender_suffix}:*"
sent_header = await bot.send_message(TG_CHAT_ID, header_text, parse_mode="Markdown")
first_tg_id = sent_header.message_id
last_sender_id = message.sender
# 3. Reply Mapping (Lookup)
reply_to_tg_id = None
if message.link and message.link.type == 'REPLY':
replied_max_id = str(message.link.message.id)
reply_to_tg_id = msgs_map.get(replied_max_id)
if reply_to_tg_id:
l.info(f"Reply Link: Max[{replied_max_id}] -> TG[{reply_to_tg_id}]")
# 4. Forward Recursion
fwds_to_process = []
if message.link and message.link.type == 'FORWARD':
fwds_to_process.append(message.link.message)
if hasattr(message, 'fwd_messages') and message.fwd_messages: # pyright: ignore[reportAttributeAccessIssue]
fwds_to_process.extend(message.fwd_messages) # pyright: ignore[reportAttributeAccessIssue]
for fwd_msg in fwds_to_process:
# Recursive call returns the TG ID of the forwarded message
fwd_tg_id = await process_max_message(fwd_msg, forwarded=True)
# If our container doesn't have a TG ID yet (no header), use the first forward's ID
if first_tg_id is None:
first_tg_id = fwd_tg_id
# 5. Content Prep
text_content = message.text or ""
if forwarded:
text_content = f"↪ Переслано от {sender_name}:_\n{text_content}"
# 6. Attachments
if message.attaches:
for attach in message.attaches:
sent = None
try:
if isinstance(attach, PhotoAttach):
f_bytes = await download_content(attach.base_url)
sent = await bot.send_photo(
TG_CHAT_ID,
photo=BufferedInputFile(f_bytes.getvalue(), filename="photo.jpg"),
caption=text_content if text_content else None,
reply_to_message_id=reply_to_tg_id,
parse_mode="Markdown"
)
elif isinstance(attach, VideoAttach):
vid_info = await client.get_video_by_id(message.chat_id, message.id, attach.video_id)
if vid_info and vid_info.url:
f_bytes = await download_content(vid_info.url)
sent = await bot.send_video(
TG_CHAT_ID,
video=BufferedInputFile(f_bytes.getvalue(), filename="video.mp4"),
caption=text_content if text_content else None,
reply_to_message_id=reply_to_tg_id,
parse_mode="Markdown"
)
elif isinstance(attach, FileAttach):
file_info = await client.get_file_by_id(message.chat_id, message.id, attach.file_id)
if file_info and file_info.url:
f_bytes = await download_content(file_info.url)
sent = await bot.send_document(
TG_CHAT_ID,
document=BufferedInputFile(f_bytes.getvalue(), filename=getattr(file_info, 'name', 'file')),
caption=text_content if text_content else None,
reply_to_message_id=reply_to_tg_id,
parse_mode="Markdown"
)
if sent:
if first_tg_id is None: first_tg_id = sent.message_id
text_content = "" # Only send caption once
except Exception as e:
l.error(f"Attachment error: {e}")
# 7. Remaining Text
if text_content.strip():
sent_msg = await bot.send_message(
TG_CHAT_ID,
text_content,
reply_to_message_id=reply_to_tg_id,
parse_mode="Markdown"
)
if first_tg_id is None: first_tg_id = sent_msg.message_id
# 8. Save Mapping
# We save mapping for both forwarded items and top-level containers
if first_tg_id and message.id:
msgs_map[str(message.id)] = first_tg_id
data_handler.save('msgs', msgs_map)
l.info(f"Mapping Saved: Max[{message.id}] == TG[{first_tg_id}]")
return first_tg_id
except Exception as e:
l.error(f"Error: {e}", exc_info=True)
return None
@client.on_message()
async def max_message_handler(message: Message):
# PyMax entry point
await process_max_message(message)
# --- Logic: Telegram -> Max ---
@dp.message(Command("send"))
async def send_handler(message: types.Message):
"""Handles /send command."""
assert message.from_user
try:
# Check time
now = datetime.now().time()
if ADMIN_USER_ID and message.from_user.id != ADMIN_USER_ID:
await message.reply('Отправка сообщений доступна только администратору')
return
if not (START_TIME <= now <= END_TIME) and CHECK_TIME:
await message.reply(f"Можно отправлять сообщения только между {START_TIME:%H:%M} и {END_TIME:%H:%M}")
return
# Check empty message
text_to_send = (message.text or '').replace("/send", "", 1).strip()
if not text_to_send:
await message.reply("Нельзя отправить пустое сообщение.")
return
# Get username
username = message.from_user.full_name or message.from_user.username
# Create full text
full_text = f"{BOT_MESSAGE_PREFIX} *{username} написал(-а):*\n{text_to_send}"
if BOT_POST_MESSAGE:
full_text += f"\n{BOT_MESSAGE_PREFIX} {BOT_POST_MESSAGE}"
# Get id of replied message in MAX
reply_to_max_id = None
if message.reply_to_message:
tg_reply_id = message.reply_to_message.message_id
# Reverse lookup
for mid, tid in msgs_map.items():
if tid == tg_reply_id:
reply_to_max_id = mid
break
# Send message
sent_msg = await client.send_message(
chat_id=MAX_CHAT_ID,
text=full_text,
reply_to=reply_to_max_id
)
# Map message
if sent_msg and sent_msg.id:
msgs_map[str(sent_msg.id)] = message.message_id
await message.reply("Отправлено!")
except Exception as e:
l.error(f"Error in send_handler: {e}", exc_info=True)
await message.reply('Произошла ошибка при отправке.')
# --- Lifecycle ---
async def on_startup():
l.info("Bot started. Transfer is active.")
# Send startup message (invite link) logic
if BOT_START_MESSAGE and not data_handler.load("started"):
try:
invite = await bot.create_chat_invite_link(TG_CHAT_ID)
msg = BOT_START_MESSAGE.replace("TG_CHAT_INVITE_LINK", invite.invite_link)
await client.send_message(msg, MAX_CHAT_ID)
data_handler.save("started", True)
except Exception as e:
l.error(f"Failed to send startup message: {e}")
async def main():
# 1. Setup Signal Handling
stop_event = Event()
loop = get_running_loop()
if os_name != 'nt':
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, stop_event.set)
# 2. Start Telegram Poller FIRST (as a background task)
l.info("Starting Telegram Polling...")
# This creates the task but doesn't block execution
tg_task = create_task(dp.start_polling(bot))
# 3. Run startup logic (invite links, etc.)
await on_startup()
# 4. Start Max Client (This blocks and keeps the script alive)
l.info("Initializing Max Client...")
max_task = create_task(client.start())
l.debug('inited')
try:
# We use a task for Max as well to allow clean shutdowns
# Wait for either the stop signal or the tasks to fail
stop_task = create_task(stop_event.wait())
await wait(
[tg_task, max_task, stop_task],
return_when=FIRST_COMPLETED
)
except Exception as e:
l.error(f"Critical error in main loop: {e}")
finally:
l.info("Shutting down...")
data_handler.save('msgs', msgs_map)
# Clean up tasks
tg_task.cancel()
max_task.cancel()
await client.close()
await bot.session.close()
l.info("Shutdown complete.")
if __name__ == '__main__':
try:
run(main())
except (KeyboardInterrupt, SystemExit):
l.info("Bot stopped.")