11import asyncio
22from multiprocessing import Process , Queue
3+ from os .path import join
4+ from queue import Empty
35from typing import Optional
46
57from loguru import logger
68
9+ from app .config import settings
710from app .db .base import Session
811from app .db .data_table import Channel , ChannelHistory , Thumbnail
912from app .db .repository import YoutubeDataRepository
1013from app .integrations .ytapi import YTApiClient
1114from app .integrations .ytdlp import YTChannelDownloader
12- from app .schema import ChannelAPIInfoSchema , ChannelInfoSchema , NewVideoSchema , VideoSchema
15+ from app .schema import ChannelAPIInfoSchema , ChannelInfoSchema , NewVideoSchema , VideoDownloadSchema , VideoSchema
1316
1417
1518class YTMonitorService :
@@ -28,6 +31,7 @@ def __init__(
2831 self ._history_timeout = history_timeout
2932 self ._queue = new_videos_queue # Очередь для обработки новых видео
3033 self ._shorts_publish = shorts_publish
34+ self ._download_queue = Queue ()
3135
3236 def run (
3337 self , monitor_new : bool = True , monitor_history : bool = True , monitor_video_formats : bool = True
@@ -50,6 +54,11 @@ def run(
5054 processes .append (video_formats_process )
5155 video_formats_process .start ()
5256
57+ if self ._shorts_publish :
58+ shorts_publish_process = Process (target = self ._start_async_loop , args = (self ._shorts_downloader ,))
59+ processes .append (shorts_publish_process )
60+ shorts_publish_process .start ()
61+
5362 return processes
5463
5564 def _start_async_loop (self , coro_func , * args , ** kwargs ):
@@ -96,6 +105,21 @@ async def _update_video_formats(self):
96105 logger .info (f"(FORMATS) Waiting for { self ._history_timeout } seconds" )
97106 await asyncio .sleep (self ._history_timeout )
98107
108+ async def _shorts_downloader (self , delay : int = 5 ):
109+ while True :
110+ try :
111+ video : VideoDownloadSchema = self ._download_queue .get (block = False , timeout = 5 )
112+ logger .debug (f"video={ video } " )
113+ await YTChannelDownloader .download_video (video )
114+ # Задержка между скачиванием файлов
115+ await asyncio .sleep (delay )
116+ except Empty :
117+ # Если очередь пуста после таймаута, ничего не делаем и продолжаем ждать
118+ await asyncio .sleep (delay )
119+ continue
120+ except Exception as e :
121+ logger .error (f"Ошибка при отправке сообщения: { e } " )
122+
99123 async def _process_channel_videos (self , channel_url : str , process_new : bool = False , process_old : bool = False ):
100124 """Обработка новых и старых видео для канала."""
101125 # Получение информации о канале через yt-dlp
@@ -178,11 +202,26 @@ async def _process_channel_videos(self, channel_url: str, process_new: bool = Fa
178202 channel_url = ytdlp_channel_info .channel_url ,
179203 video_title = video .title ,
180204 video_url = video .url ,
205+ video_id = video .id ,
181206 )
182207 ) # add video to queue for telegram bot
183- # elif self._shorts_publish:
184- # self._queue.put(
185- # )
208+ elif self ._shorts_publish :
209+ video_file_name = (
210+ (ytdlp_channel_info .original_url or ytdlp_channel_info .channel )
211+ + "_shorts_"
212+ + video .id
213+ + ".mp4"
214+ )
215+ self ._download_queue .put (
216+ VideoDownloadSchema (
217+ file_name = video_file_name ,
218+ video_download_path = join (
219+ settings .storage_path , settings .shorts_download_path , video_file_name
220+ ),
221+ video_url = video .url ,
222+ video_id = video .id ,
223+ )
224+ )
186225 if process_old and old_videos :
187226 self ._process_old_videos (old_videos )
188227
0 commit comments