1111from prometheus_client import start_http_server , Counter
1212from collections import defaultdict
1313
14- messages_processed_total = Counter ("influxdbloader_messages_processed_total" , "Total messages processed by InfluxDB loader." )
15- messages_processed_bytes = Counter ("influxdbloader_messages_processed_bytes" , "Total message bytes processed by InfluxDB loader." )
16-
17- messages_processed_total_by_node_total = Counter ("influxdbloader_messages_processed_total_by_node_total" , "Total messages processed by InfluxDB loader by VSN." , ["node" , "vsn" ])
18- messages_processed_bytes_by_node = Counter ("influxdbloader_messages_processed_bytes_by_node" , "Total message bytes processed by InfluxDB loader by VSN." , ["node" , "vsn" ])
14+ messages_processed_total = Counter (
15+ "influxdbloader_messages_processed_total" ,
16+ "Total messages processed by InfluxDB loader." ,
17+ )
18+ messages_processed_bytes = Counter (
19+ "influxdbloader_messages_processed_bytes" ,
20+ "Total message bytes processed by InfluxDB loader." ,
21+ )
22+
23+ messages_processed_total_by_node_total = Counter (
24+ "influxdbloader_messages_processed_total_by_node_total" ,
25+ "Total messages processed by InfluxDB loader by VSN." ,
26+ ["node" , "vsn" ],
27+ )
28+ messages_processed_bytes_by_node = Counter (
29+ "influxdbloader_messages_processed_bytes_by_node" ,
30+ "Total message bytes processed by InfluxDB loader by VSN." ,
31+ ["node" , "vsn" ],
32+ )
1933
2034
2135def assert_type (obj , t ):
@@ -50,9 +64,15 @@ def coerce_value(x):
5064
5165
5266class MessageHandler :
53-
54- def __init__ (self , rabbitmq_conn : pika .BlockingConnection , influxdb_client : InfluxDBClient , influxdb_bucket : str ,
55- influxdb_org : str , max_flush_interval : float , max_batch_size : int ):
67+ def __init__ (
68+ self ,
69+ rabbitmq_conn : pika .BlockingConnection ,
70+ influxdb_client : InfluxDBClient ,
71+ influxdb_bucket : str ,
72+ influxdb_org : str ,
73+ max_flush_interval : float ,
74+ max_batch_size : int ,
75+ ):
5676 self .rabbitmq_conn = rabbitmq_conn
5777 self .influxdb_client = influxdb_client
5878 self .influxdb_bucket = influxdb_bucket
@@ -86,19 +106,27 @@ def flush(self):
86106 continue
87107
88108 # check that meta["node"] matches user_id
89- if "node-" + msg .meta ["node" ] != properties .user_id :
90- logging .info ("dropping invalid message: username (%s) doesn't match node meta (%s) - " , msg .meta ["node" ], properties .user_id )
109+ if "node-" + msg .meta ["node" ] != properties .user_id :
110+ logging .info (
111+ "dropping invalid message: username (%s) doesn't match node meta (%s) - " ,
112+ msg .meta ["node" ],
113+ properties .user_id ,
114+ )
91115 continue
92116
93- logging .debug ("creating record for msg: %s value-type: %s" , msg , type (msg .value ))
94- records .append ({
95- "measurement" : msg .name ,
96- "tags" : msg .meta ,
97- "fields" : {
98- "value" : coerce_value (msg .value ),
99- },
100- "time" : msg .timestamp ,
101- })
117+ logging .debug (
118+ "creating record for msg: %s value-type: %s" , msg , type (msg .value )
119+ )
120+ records .append (
121+ {
122+ "measurement" : msg .name ,
123+ "tags" : msg .meta ,
124+ "fields" : {
125+ "value" : coerce_value (msg .value ),
126+ },
127+ "time" : msg .timestamp ,
128+ }
129+ )
102130
103131 # update per node metrics
104132 # TODO(sean) clean up and better isolate metrics aggregation
@@ -111,7 +139,12 @@ def flush(self):
111139 logging .info ("writing %d records to influxdb" , len (records ))
112140 with self .influxdb_client .write_api (write_options = SYNCHRONOUS ) as write_api :
113141 try :
114- write_api .write (self .influxdb_bucket , self .influxdb_org , records , write_precision = WritePrecision .NS )
142+ write_api .write (
143+ self .influxdb_bucket ,
144+ self .influxdb_org ,
145+ records ,
146+ write_precision = WritePrecision .NS ,
147+ )
115148 except InfluxDBError as exc :
116149 # TODO(sean) InfluxDB only responds with single invalid data point message.
117150 # Although the write goes through for the valid data points, getting this info
@@ -169,28 +202,56 @@ def get_ssl_options(args):
169202def main ():
170203 parser = argparse .ArgumentParser ()
171204 parser .add_argument ("--debug" , action = "store_true" )
172- parser .add_argument ("--rabbitmq_host" ,default = getenv ("RABBITMQ_HOST" , "localhost" ))
173- parser .add_argument ("--rabbitmq_port" , default = getenv ("RABBITMQ_PORT" , "5672" ), type = int )
205+ parser .add_argument ("--rabbitmq_host" , default = getenv ("RABBITMQ_HOST" , "localhost" ))
206+ parser .add_argument (
207+ "--rabbitmq_port" , default = getenv ("RABBITMQ_PORT" , "5672" ), type = int
208+ )
174209 parser .add_argument ("--rabbitmq_username" , default = getenv ("RABBITMQ_USERNAME" , "" ))
175210 parser .add_argument ("--rabbitmq_password" , default = getenv ("RABBITMQ_PASSWORD" , "" ))
176- parser .add_argument ("--rabbitmq_cacertfile" , default = getenv ("RABBITMQ_CACERTFILE" , "" ))
211+ parser .add_argument (
212+ "--rabbitmq_cacertfile" , default = getenv ("RABBITMQ_CACERTFILE" , "" )
213+ )
177214 parser .add_argument ("--rabbitmq_certfile" , default = getenv ("RABBITMQ_CERTFILE" , "" ))
178215 parser .add_argument ("--rabbitmq_keyfile" , default = getenv ("RABBITMQ_KEYFILE" , "" ))
179- parser .add_argument ("--rabbitmq_exchange" , default = getenv ("RABBITMQ_EXCHANGE" , "waggle.msg" ))
180- parser .add_argument ("--rabbitmq_queue" , default = getenv ("RABBITMQ_QUEUE" , "influx-messages" ))
181- parser .add_argument ("--influxdb_url" , default = getenv ("INFLUXDB_URL" , "http://localhost:8086" ))
216+ parser .add_argument (
217+ "--rabbitmq_exchange" , default = getenv ("RABBITMQ_EXCHANGE" , "waggle.msg" )
218+ )
219+ parser .add_argument (
220+ "--rabbitmq_queue" , default = getenv ("RABBITMQ_QUEUE" , "influx-messages" )
221+ )
222+ parser .add_argument (
223+ "--influxdb_url" , default = getenv ("INFLUXDB_URL" , "http://localhost:8086" )
224+ )
182225 parser .add_argument ("--influxdb_token" , default = getenv ("INFLUXDB_TOKEN" ))
183- parser .add_argument ("--influxdb_bucket" , default = getenv ("INFLUXDB_BUCKET" , "waggle" ))
226+ parser .add_argument (
227+ "--influxdb_bucket" , default = getenv ("INFLUXDB_BUCKET" , "waggle" )
228+ )
184229 parser .add_argument ("--influxdb_org" , default = getenv ("INFLUXDB_ORG" , "waggle" ))
185- parser .add_argument ("--max_flush_interval" , default = getenv ("MAX_FLUSH_INTERVAL" , "1.0" ), type = float , help = "max flush interval" )
186- parser .add_argument ("--max_batch_size" , default = getenv ("MAX_BATCH_SIZE" , "5000" ), type = int , help = "max batch size" )
187- parser .add_argument ("--metrics_port" , default = getenv ("METRICS_PORT" , "8080" ), type = int , help = "port to expose metrics" )
230+ parser .add_argument (
231+ "--max_flush_interval" ,
232+ default = getenv ("MAX_FLUSH_INTERVAL" , "1.0" ),
233+ type = float ,
234+ help = "max flush interval" ,
235+ )
236+ parser .add_argument (
237+ "--max_batch_size" ,
238+ default = getenv ("MAX_BATCH_SIZE" , "5000" ),
239+ type = int ,
240+ help = "max batch size" ,
241+ )
242+ parser .add_argument (
243+ "--metrics_port" ,
244+ default = getenv ("METRICS_PORT" , "8080" ),
245+ type = int ,
246+ help = "port to expose metrics" ,
247+ )
188248 args = parser .parse_args ()
189249
190250 logging .basicConfig (
191251 level = logging .DEBUG if args .debug else logging .INFO ,
192252 format = "%(asctime)s %(message)s" ,
193- datefmt = "%Y/%m/%d %H:%M:%S" )
253+ datefmt = "%Y/%m/%d %H:%M:%S" ,
254+ )
194255 # pika logging is too verbose, so we turn it down.
195256 logging .getLogger ("pika" ).setLevel (logging .CRITICAL )
196257
@@ -203,18 +264,21 @@ def main():
203264 credentials = credentials ,
204265 ssl_options = ssl_options ,
205266 retry_delay = 60 ,
206- socket_timeout = 10.0 )
267+ socket_timeout = 10.0 ,
268+ )
207269
208270 start_http_server (args .metrics_port )
209271
210272 with ExitStack () as es :
211273 logging .info ("connecting to influxdb at %s" , args .influxdb_url )
212- client = es .enter_context (InfluxDBClient (
213- url = args .influxdb_url ,
214- token = args .influxdb_token ,
215- org = args .influxdb_org ,
216- enable_gzip = True ,
217- ))
274+ client = es .enter_context (
275+ InfluxDBClient (
276+ url = args .influxdb_url ,
277+ token = args .influxdb_token ,
278+ org = args .influxdb_org ,
279+ enable_gzip = True ,
280+ )
281+ )
218282 logging .info ("connected to influxdb" )
219283
220284 logging .info ("connecting to rabbitmq" )
0 commit comments