1
+ from typing import Optional , Dict
2
+
1
3
import atexit
2
4
import gevent
3
5
import logging
6
8
from datetime import datetime
7
9
8
10
from influxdb import InfluxDBClient
9
- from locust .exception import InterruptTaskSet
10
11
from requests .exceptions import HTTPError
11
12
import locust .env
13
+ from urllib3 import HTTPConnectionPool
12
14
13
- log = logging .getLogger ('locust_influx ' )
15
+ log = logging .getLogger ('locust_influxdb_listener ' )
14
16
15
17
16
18
class InfluxDBSettings :
17
19
"""
18
- Store influxdb settings
20
+ Store InfluxDB settings for a data connection.
19
21
"""
22
+
20
23
def __init__ (
21
24
self ,
22
- influx_host : str = 'localhost' ,
23
- influx_port : int = 8086 ,
25
+ host : str = 'localhost' ,
26
+ port : int = 8086 ,
24
27
user : str = 'admin' ,
25
28
pwd : str = 'pass' ,
26
29
database : str = 'default' ,
27
30
interval_ms : int = 1000 ,
28
31
ssl : bool = False ,
29
32
verify_ssl : bool = False ,
30
- create_database : bool = False ,
31
- tags : dict = {}
33
+ additional_tags : dict = {},
34
+ influx_host : Optional [str ] = 'localhost' ,
35
+ influx_port : Optional [int ] = 8086 ,
32
36
):
33
- self .influx_host = influx_host
34
- self .influx_port = influx_port
37
+ """
38
+ Initialize the InfluxDBSettings object with provided or default settings.
39
+
40
+ :param host: InfluxDB host address or hostname.
41
+ :param port: InfluxDB HTTP API port.
42
+ :param user: InfluxDB username for authentication.
43
+ :param pwd: InfluxDB password for authentication.
44
+ :param database: InfluxDB database name for storing data.
45
+ :param interval_ms: Data sending interval in milliseconds.
46
+ :param ssl: Enable SSL/TLS for secure data transmission.
47
+ :param verify_ssl: Verify SSL certificates (only if SSL is enabled).
48
+ :param additional_tags: Additional tags to include in globally for all data points.
49
+ """
50
+ self .host = host if host else influx_host # Renamed from influx_host
51
+ self .port = port if port else influx_port # Renamed from influx_port
35
52
self .user = user
36
53
self .pwd = pwd
37
54
self .database = database
38
55
self .interval_ms = interval_ms
39
56
self .ssl = ssl
40
57
self .verify_ssl = verify_ssl
41
- self .create_database = create_database
42
- self .tags = tags
58
+ self .additional_tags = additional_tags
43
59
44
60
45
61
class InfluxDBListener :
@@ -53,28 +69,35 @@ def __init__(
53
69
env : locust .env .Environment ,
54
70
influxDbSettings : InfluxDBSettings
55
71
):
72
+ """
73
+ Initialize the InfluxDBListener with the provided Locust environment and InfluxDB settings.
56
74
57
- # flush related attributes
75
+ :param env: The Locust environment to listen for events in.
76
+ :param influxDbSettings: Settings for the InfluxDB connection.
77
+ """
78
+
58
79
self .env = env
59
80
self .cache = []
60
81
self .stop_flag = False
61
82
self .interval_ms = influxDbSettings .interval_ms
62
- self .tags = influxDbSettings .tags
83
+ self .additional_tags = influxDbSettings .additional_tags
63
84
# influxdb settings
64
85
try :
86
+ # try to connect create the database and switch to it
65
87
self .influxdb_client = InfluxDBClient (
66
- host = influxDbSettings .influx_host ,
67
- port = influxDbSettings .influx_port ,
88
+ host = influxDbSettings .host ,
89
+ port = influxDbSettings .port ,
68
90
username = influxDbSettings .user ,
69
91
password = influxDbSettings .pwd ,
70
- database = influxDbSettings .database ,
71
92
ssl = influxDbSettings .ssl ,
72
93
verify_ssl = influxDbSettings .verify_ssl
73
94
)
74
- if influxDbSettings .create_database :
75
- self .influxdb_client .create_database (influxDbSettings .database )
76
- except :
77
- logging .error ('Could not connect to influxdb.' )
95
+ # database is mandatory so we should always try to create it
96
+ self .influxdb_client .create_database (influxDbSettings .database )
97
+ self .influxdb_client .switch_database (influxDbSettings .database )
98
+
99
+ except Exception as ex :
100
+ logging .error (f'Unexpected error: { ex } ' )
78
101
return
79
102
80
103
# determine if worker or master
@@ -85,7 +108,7 @@ def __init__(
85
108
# TODO: Get real ID of slaves form locust somehow
86
109
self .node_id = 'worker'
87
110
88
- # start background event to push data to influx
111
+ # start background event to push data to influxdb
89
112
self .flush_worker = gevent .spawn (self .__flush_cached_points_worker )
90
113
self .test_start (0 )
91
114
@@ -133,14 +156,13 @@ def __register_event(self, node_id: str, user_count: int, event: str, **_kwargs)
133
156
"""
134
157
135
158
time = datetime .utcnow ()
136
- tags = self .tags
137
159
fields = {
138
160
'node_id' : node_id ,
139
161
'event' : event ,
140
162
'user_count' : user_count
141
163
}
142
164
143
- point = self .__make_data_point ('locust_events' , tags , fields , time )
165
+ point = self .__make_data_point ('locust_events' , fields , time )
144
166
self .cache .append (point )
145
167
146
168
def __listen_for_requests_events (self , node_id , measurement , request_type , name ,
@@ -154,9 +176,8 @@ def __listen_for_requests_events(self, node_id, measurement, request_type, name,
154
176
155
177
time = datetime .utcnow ()
156
178
was_successful = True
157
- if response :
158
- # override with response code
159
- was_successful = 199 < response .status_code < 400
179
+ if response is not None :
180
+ was_successful = (199 < response .status_code < 400 ) and exception is None
160
181
tags = {
161
182
'node_id' : node_id ,
162
183
'request_type' : request_type ,
@@ -176,7 +197,7 @@ def __listen_for_requests_events(self, node_id, measurement, request_type, name,
176
197
'counter' : self .env .stats .num_requests , # TODO: Review the need of this field
177
198
'user_count' : self .env .runner .user_count
178
199
}
179
- point = self .__make_data_point (measurement , tags , fields , time )
200
+ point = self .__make_data_point (measurement , fields , time , tags = tags )
180
201
self .cache .append (point )
181
202
182
203
def __listen_for_locust_errors (self , node_id , user_instance , exception : Exception = None , tb = None ) -> None :
@@ -194,7 +215,7 @@ def __listen_for_locust_errors(self, node_id, user_instance, exception: Exceptio
194
215
'exception' : repr (exception ),
195
216
'traceback' : "" .join (traceback .format_tb (tb )),
196
217
}
197
- point = self .__make_data_point ('locust_exceptions' , tags , fields , time )
218
+ point = self .__make_data_point ('locust_exceptions' , fields , time , tags = tags )
198
219
self .cache .append (point )
199
220
200
221
@@ -210,15 +231,15 @@ def __flush_cached_points_worker(self) -> None:
210
231
self .__flush_points (self .influxdb_client )
211
232
gevent .sleep (self .interval_ms / 1000 )
212
233
213
- def __make_data_point (self , measurement : str , tags : dict , fields : dict , time : datetime ) -> dict :
234
+ def __make_data_point (self , measurement : str , fields : dict , time : datetime , tags : Optional [ Dict [ str , str ]] = {} ) -> dict :
214
235
"""
215
236
Create a list with a single point to be saved to influxdb.
216
237
:param measurement: The measurement where to save this point.
217
- :param tags: Dictionary of tags to be saved in the measurement.
218
238
:param fields: Dictionary of field to be saved to measurement.
219
239
:param time: The time os this point.
240
+ :param tags: Dictionary of tags to be saved in the measurement default to None.
220
241
"""
221
- return {"measurement" : measurement , "tags" : {** tags , ** self .tags }, "time" : time , "fields" : fields }
242
+ return {"measurement" : measurement , "tags" : {** tags , ** self .additional_tags }, "time" : time , "fields" : fields }
222
243
223
244
224
245
def last_flush_on_quitting (self ):
0 commit comments