1515# along with this program. If not, see <https://www.gnu.org/licenses/>.
1616
1717from __future__ import annotations
18- from typing import Sequence , MutableMapping , Union , Final , ClassVar , Optional , Any
18+ from typing import Sequence , MutableMapping , Union , Final , ClassVar , Optional , Any , Callable
1919
2020import asyncio
2121from collections import defaultdict , Counter
2626from ._stat import NotifierStat
2727from .. import db , env , web
2828from ..command import inner
29- from ..command .utils import unsub_all_and_leave_chat , escape_html
29+ from ..command .utils import default_leave_chat_helper , escape_html
3030from ..compat import nullcontext
3131from ..errors_collection import EntityNotFoundError , UserBlockedErrors
3232from ..helpers .bg import bg
@@ -42,8 +42,9 @@ class Notifier:
4242 _stat : ClassVar [NotifierStat ] = NotifierStat ()
4343
4444 # it may cause memory leak, but they are too small that leaking thousands of that is still not a big deal!
45- _user_unsub_all_lock_bucket : ClassVar [dict [int , asyncio .Lock ]] = defaultdict (asyncio .Lock )
45+ _on_blocked_lock_bucket : ClassVar [dict [int , asyncio .Lock ]] = defaultdict (asyncio .Lock )
4646 _user_blocked_counter : ClassVar [Counter ] = Counter ()
47+ _on_blocked_cb : Callable [[int ], None ] = default_leave_chat_helper
4748
4849 def __init__ (
4950 self ,
@@ -273,7 +274,7 @@ async def _send(self, sub: db.Sub, post: Union[str, Post]) -> None:
273274 try :
274275 await env .bot .get_input_entity (user_id ) # verify that the input entity can be gotten first
275276 except ValueError : # cannot get the input entity, the user may have banned the bot
276- return await self ._locked_unsub_all_and_leave_chat (
277+ return await self ._on_blocked (
277278 user_id = user_id ,
278279 err_msg = type (EntityNotFoundError ).__name__ ,
279280 )
@@ -285,10 +286,10 @@ async def _send(self, sub: db.Sub, post: Union[str, Post]) -> None:
285286 if self ._user_blocked_counter [user_id ]: # reset the counter if success
286287 del self ._user_blocked_counter [user_id ]
287288 except UserBlockedErrors as e :
288- return await self ._locked_unsub_all_and_leave_chat (user_id = user_id , err_msg = type (e ).__name__ )
289+ return await self ._on_blocked (user_id = user_id , err_msg = type (e ).__name__ )
289290 except BadRequestError as e :
290291 if e .message == 'TOPIC_CLOSED' :
291- return await self ._locked_unsub_all_and_leave_chat (user_id = user_id , err_msg = e .message )
292+ return await self ._on_blocked (user_id = user_id , err_msg = e .message )
292293 except Exception as e :
293294 logger .error (f'Failed to send { post .link } (feed: { post .feed_link } , user: { sub .user_id } ):' , exc_info = e )
294295 try :
@@ -314,18 +315,18 @@ async def _send(self, sub: db.Sub, post: Union[str, Post]) -> None:
314315 )
315316 return None
316317
317- async def _locked_unsub_all_and_leave_chat (self , user_id : int , err_msg : str ) -> None :
318- user_unsub_all_lock = self ._user_unsub_all_lock_bucket [user_id ]
319- if user_unsub_all_lock .locked ():
318+ async def _on_blocked (self , user_id : int , err_msg : str ):
319+ on_blocked_lock = self ._on_blocked_lock_bucket [user_id ]
320+ if on_blocked_lock .locked ():
320321 return # no need to unsub twice!
321- async with user_unsub_all_lock :
322+ async with on_blocked_lock :
322323 if self ._user_blocked_counter [user_id ] < 5 :
323324 self ._user_blocked_counter [user_id ] += 1
324325 return # skip once
325326 # fail for 5 times, consider been banned
326327 del self ._user_blocked_counter [user_id ]
327328 logger .error (f'User blocked ({ err_msg } ): { user_id } ' )
328- await unsub_all_and_leave_chat (user_id )
329+ await self . _on_blocked_cb (user_id )
329330 if self ._raise_stop_pipeline_after_leave_chat :
330331 raise StopPipeline ()
331332
0 commit comments