1- import datetime
1+ import json
2+ import logging
23import logging
34import os
5+ import socket
46import sys
57import time
68from collections import OrderedDict
9+ # noinspection PyUnresolvedReferences
10+ from logging .handlers import WatchedFileHandler
711from queue import Queue
8- # noinspection PyPackageRequirements
9- # from elasticsearch import Elasticsearch, helpers # 性能导入时间消耗2秒,实例化时候再导入。
10- from threading import Thread
12+ from threading import Lock , Thread
1113
12- from funboost . core . current_task import funboost_current_task
14+ import requests
1315from funboost .core .task_id_logger import TaskIdLogger
1416from nb_log import LogManager
17+ # noinspection PyPackageRequirements
1518from nb_log .monkey_print import nb_print
1619
1720from nb_log_config import IS_ADD_ELASTIC_HANDLER , \
18- NB_LOG_FORMATER_INDEX_FOR_CONSUMER_AND_PUBLISHER , ELASTIC_HOST , computer_name
21+ NB_LOG_FORMATER_INDEX_FOR_CONSUMER_AND_PUBLISHER , ELASTIC_HOST , computer_name , IS_ADD_DING_TALK_HANDLER , \
22+ DING_TALK_TOKEN , DING_TALK_SECRET , TIME_INTERVAL , DING_TALK_MSG_TEMPLATE
1923
2024very_nb_print = nb_print
2125
26+ host_name = socket .gethostname ()
27+
2228
2329def get (name : str = '' , tag : str = '' ) -> logging .Logger :
2430 """
@@ -33,10 +39,17 @@ def get(name: str = '', tag: str = '') -> logging.Logger:
3339 logger_name = name
3440 logger = LogManager (logger_name , logger_cls = TaskIdLogger ).get_logger_and_add_handlers (
3541 formatter_template = NB_LOG_FORMATER_INDEX_FOR_CONSUMER_AND_PUBLISHER )
42+
3643 if IS_ADD_ELASTIC_HANDLER :
3744 handler = ElasticHandler ([ELASTIC_HOST ], "" )
3845 handler .setLevel (10 )
3946 logger .addHandler (handler )
47+
48+ if IS_ADD_DING_TALK_HANDLER :
49+ handler = DingTalkHandler (DING_TALK_TOKEN , TIME_INTERVAL , DING_TALK_SECRET )
50+ # warnings
51+ handler .setLevel (30 )
52+ logger .addHandler (handler )
4053 return logger
4154
4255
@@ -85,6 +98,76 @@ def for_task(name: str) -> logging.Logger:
8598 return get ('task' , name )
8699
87100
101+ class DingTalkHandler (logging .Handler ):
102+ _lock_for_remove_handlers = Lock ()
103+
104+ def __init__ (self , ding_talk_token : str = None , time_interval : int = 60 , ding_talk_secret : str = None ):
105+ super ().__init__ ()
106+ self .ding_talk_token = ding_talk_token
107+ self .ding_talk_secret = ding_talk_secret
108+ self ._ding_talk_url = f'https://oapi.dingtalk.com/robot/send?access_token={ ding_talk_token } '
109+ self ._current_time = 0
110+ self ._time_interval = time_interval # 最好别频繁发。
111+ self ._msg_template = r'{"msgtype":"markdown","markdown":{"title":"discovery-syncer-python","text":"**时间:** %(asctime)s\n\n**任务:** %(task_id)s\n\n**脚本:** %(pathname)s\n\n**函数:** %(funcName)s\n\n**行号:** %(lineno)s\n\n**信息:** %(msg)s"}}'
112+ self ._lock = Lock ()
113+
114+ def emit (self , record ):
115+ # from threading import Thread
116+ with self ._lock :
117+ if time .time () - self ._current_time > self ._time_interval :
118+ # very_nb_print(self._current_time)
119+ self ._current_time = time .time ()
120+ self .__emit (record )
121+ # Thread(target=self.__emit, args=(record,)).start()
122+
123+ else :
124+ very_nb_print (
125+ f'此次离上次发送钉钉消息时间间隔不足 { self ._time_interval } 秒,此次不发送这个钉钉内容: { record .msg } ' )
126+
127+ def __emit (self , record ):
128+ data = (DING_TALK_MSG_TEMPLATE or self ._msg_template ) % record .__dict__
129+ try :
130+ # 因为钉钉发送也是使用requests实现的,如果requests调用的urllib3命名空间也加上了钉钉日志,将会造成循环,程序卡住。一般情况是在根日志加了钉钉handler。
131+ self ._remove_urllib_hanlder ()
132+ resp = requests .post (self ._ding_talk_url + self .sign (),
133+ json = json .loads (data .replace ("\\ " , "\\ \\ " ).replace ("\\ \\ n" , "\\ n" )), timeout = (5 , 5 ))
134+ very_nb_print (f'钉钉返回 : { resp .text } ' )
135+ except requests .RequestException as e :
136+ very_nb_print (f"发送消息给钉钉机器人失败 { e } " )
137+
138+ def __repr__ (self ):
139+ level = logging .getLevelName (self .level )
140+ return '<%s (%s)>' % (self .__class__ .__name__ , level ) + ' dingtalk token is ' + self .ding_talk_token
141+
142+ def sign (self ):
143+ if not self .ding_talk_secret :
144+ return ""
145+ import time
146+ import hmac
147+ import hashlib
148+ import base64
149+ import urllib .parse
150+ timestamp = str (round (time .time () * 1000 ))
151+ secret_enc = self .ding_talk_secret .encode ('utf-8' )
152+ string_to_sign = '{}\n {}' .format (timestamp , self .ding_talk_secret )
153+ string_to_sign_enc = string_to_sign .encode ('utf-8' )
154+ hmac_code = hmac .new (secret_enc , string_to_sign_enc , digestmod = hashlib .sha256 ).digest ()
155+ sign = urllib .parse .quote_plus (base64 .b64encode (hmac_code ))
156+ return "×tamp={}&sign={}" .format (timestamp , sign )
157+
158+ @classmethod
159+ def _remove_urllib_hanlder (cls ):
160+ for name in ['root' , 'urllib3' , 'requests' ]:
161+ cls .__remove_urllib_hanlder_by_name (name )
162+
163+ @classmethod
164+ def __remove_urllib_hanlder_by_name (cls , logger_name ):
165+ with cls ._lock_for_remove_handlers :
166+ for index , hdlr in enumerate (logging .getLogger (logger_name ).handlers ):
167+ if 'DingTalkHandler' in str (hdlr ):
168+ logging .getLogger (logger_name ).handlers .pop (index )
169+
170+
88171# noinspection PyUnresolvedReferences
89172class ElasticHandler (logging .Handler ):
90173 """
@@ -171,8 +254,7 @@ def emit(self, record):
171254 log_info_dict ['log_level' ] = level_str
172255 log_info_dict ['msg' ] = str (record .msg )
173256 log_info_dict ['script' ] = self .script_name
174- if record .task_id :
175- log_info_dict ['task_id' ] = record .task_id
257+ log_info_dict ['task_id' ] = record .task_id
176258 self .__add_task_to_bulk ({
177259 "_index" : f'{ self ._index_prefix } { time .strftime ("%Y.%m.%d" )} ' ,
178260 "_source" : log_info_dict
0 commit comments