18
18
19
19
20
20
class Matcher :
21
- """Matcher"""
22
21
# pylint: disable=too-many-instance-attributes
22
+ """
23
+ A class used to match or interact with an Elasticsearch index for performance scale testing.
24
+
25
+ Attributes:
26
+ index (str): Name of the Elasticsearch index to interact with.
27
+ level (int): Logging level (e.g., logging.INFO).
28
+ es_url (str): Elasticsearch endpoint, can be specified by the environment variable ES_SERVER
29
+ verify_certs (bool): Whether to verify SSL certificates when connecting to Elasticsearch.
30
+ version_field (str): Name of the field containing the OpenShift version.
31
+ uuid_field (str): Name of the field containing the UUID.
32
+ """
33
+
23
34
def __init__(
    self,
    index: str = "ospst-perf-scale-ci",
    level: int = logging.INFO,
    es_url: str = None,
    verify_certs: bool = True,
    version_field: str = "ocpVersion",
    uuid_field: str = "uuid"
):
    """Create a Matcher bound to a single Elasticsearch index.

    Args:
        index (str): name of the Elasticsearch index to query.
        level (int): logging level handed to SingletonLogger (e.g. logging.INFO).
        es_url (str, optional): Elasticsearch endpoint. When None, falls back
            to the ES_SERVER environment variable, resolved at call time.
        verify_certs (bool): whether to verify TLS certificates when connecting.
        version_field (str): document field holding the OpenShift version.
        uuid_field (str): document field holding the run UUID.
    """
    # Resolve the endpoint lazily: a default of os.getenv("ES_SERVER") in the
    # signature is evaluated once at import time, so an ES_SERVER exported
    # after this module is imported would be silently ignored.
    if es_url is None:
        es_url = os.getenv("ES_SERVER")
    self.index = index
    # Kept as an attribute so it matches the class docstring's Attributes list.
    self.es_url = es_url
    self.search_size = 10000
    self.logger = SingletonLogger(debug=level, name="Matcher")
    self.es = Elasticsearch([es_url], timeout=30, verify_certs=verify_certs)
    self.data = None
    self.version_field = version_field
    self.uuid_field = uuid_field
40
50
41
def get_metadata_by_uuid(self, uuid: str) -> dict:
    """Fetch the metadata document of a single run by its uuid.

    Args:
        uuid (str): uuid of the run

    Returns:
        dict: the matching document's ``_source``, or an empty dict
        when no document matched.
    """
    match_query = Q("match", **{self.uuid_field: {"value": f"{uuid}"}})
    search = Search(using=self.es, index=self.index).query(match_query)
    response = self.query_index(search)
    matches = response.hits.hits
    if not matches:
        return {}
    # Only the first (best) hit is of interest; copy its _source out.
    return dict(matches[0].to_dict()["_source"])
61
68
62
def query_index(self, search: Search) -> Response:
    """Execute a prepared Search against the configured index.

    Args:
        search (Search): Search object with query

    Returns:
        Response: the raw elasticsearch-dsl response.
    """
    logger = self.logger
    logger.info("Executing query against index: %s", self.index)
    logger.debug("Executing query \r\n%s", search.to_dict())
    return search.execute()
72
78
73
79
def get_uuid_by_metadata (
74
80
self ,
75
81
meta : Dict [str , Any ],
76
- index : str = None ,
77
82
lookback_date : datetime = None ,
78
83
lookback_size : int = 10000 ,
79
84
timestamp_field : str = "timestamp"
@@ -82,37 +87,30 @@ def get_uuid_by_metadata(
82
87
83
88
Args:
84
89
meta (Dict[str, Any]): metadata of the runs
85
- index (str, optional): Index to search. Defaults to None.
86
- lookback_date (datetime, optional):
90
+ lookback_date (datetime, optional):
87
91
The cutoff date to get the uuids from. Defaults to None.
88
- lookback_size (int, optional):
92
+ lookback_size (int, optional):
89
93
Maximum number of runs to get, gets the latest. Defaults to 10000.
90
94
91
- lookback_size and lookback_date get the data on the
95
+ lookback_size and lookback_date get the data on the
92
96
precedency of whichever cutoff comes first.
93
97
Similar to a car manufacturer's warranty limits.
94
98
95
99
Returns:
96
100
List[Dict[str, str]]: _description_
97
101
"""
102
+ must_clause = []
98
103
must_not_clause = []
99
- if index is None :
100
- index = self .index
101
-
102
104
version = str (meta [self .version_field ])[:4 ]
103
105
104
- must_clause = [
105
- (
106
- Q ("match" , ** {field : str (value )})
107
- if isinstance (value , str )
108
- else Q ("match" , ** {field : value })
109
- )
110
- for field , value in meta .items ()
111
- if field not in [self .version_field , "ocpMajorVersion" , "not" ]
112
- ]
113
-
114
- for field , value in meta .get ("not" , {}).items ():
115
- must_not_clause .append (Q ("match" , ** {field : str (value )}))
106
+ for field , value in meta .items ():
107
+ if field in ["ocpVersion" , "ocpMajorVersion" ]:
108
+ continue
109
+ if field != "not" :
110
+ must_clause .append (Q ("match" , ** {field : str (value )}))
111
+ else :
112
+ for not_field , not_value in meta ["not" ].items ():
113
+ must_not_clause .append (Q ("match" , ** {not_field : str (not_value )}))
116
114
117
115
if "ocpMajorVersion" in meta :
118
116
version = meta ["ocpMajorVersion" ]
@@ -135,26 +133,32 @@ def get_uuid_by_metadata(
135
133
filter = filter_clause ,
136
134
)
137
135
s = (
138
- Search (using = self .es , index = index )
136
+ Search (using = self .es , index = self . index )
139
137
.query (query )
140
138
.sort ({timestamp_field : {"order" : "desc" }})
141
139
.extra (size = lookback_size )
142
140
)
143
- result = self .query_index (index , s )
141
+ result = self .query_index (s )
144
142
hits = result .hits .hits
145
143
uuids_docs = []
146
144
for hit in hits :
147
145
if "buildUrl" in hit ["_source" ]:
148
- uuids_docs .append ({
146
+ uuids_docs .append (
147
+ {
149
148
self .uuid_field : hit .to_dict ()["_source" ][self .uuid_field ],
150
- "buildUrl" : hit .to_dict ()["_source" ]["buildUrl" ]})
149
+ "buildUrl" : hit .to_dict ()["_source" ]["buildUrl" ],
150
+ }
151
+ )
151
152
else :
152
- uuids_docs .append ({
153
+ uuids_docs .append (
154
+ {
153
155
self .uuid_field : hit .to_dict ()["_source" ][self .uuid_field ],
154
- "buildUrl" : "http://bogus-url" })
156
+ "buildUrl" : "http://bogus-url" ,
157
+ }
158
+ )
155
159
return uuids_docs
156
160
157
- def match_kube_burner (self , uuids : List [str ], index : str ) -> List [Dict [str , Any ]]:
161
+ def match_kube_burner (self , uuids : List [str ]) -> List [Dict [str , Any ]]:
158
162
"""match kube burner runs
159
163
Args:
160
164
uuids (list): list of uuids
@@ -170,9 +174,11 @@ def match_kube_burner(self, uuids: List[str], index: str) -> List[Dict[str, Any]
170
174
],
171
175
)
172
176
search = (
173
- Search (using = self .es , index = index ).query (query ).extra (size = self .search_size )
177
+ Search (using = self .es , index = self .index )
178
+ .query (query )
179
+ .extra (size = self .search_size )
174
180
)
175
- result = self .query_index (index , search )
181
+ result = self .query_index (search )
176
182
runs = [item .to_dict ()["_source" ] for item in result .hits .hits ]
177
183
return runs
178
184
@@ -193,10 +199,9 @@ def filter_runs(self, pdata: Dict[Any, Any], data: Dict[Any, Any]) -> List[str]:
193
199
ids_df = ndf .loc [df ["jobConfig.jobIterations" ] == iterations ]
194
200
return ids_df ["uuid" ].to_list ()
195
201
196
- def getResults (
202
+ def get_results (
197
203
self , uuid : str ,
198
204
uuids : List [str ],
199
- index_str : str ,
200
205
metrics : Dict [str , Any ]
201
206
) -> Dict [Any , Any ]:
202
207
"""
@@ -205,7 +210,6 @@ def getResults(
205
210
Args:
206
211
uuid (str): _description_
207
212
uuids (list): _description_
208
- index_str (str): _description_
209
213
metrics (dict): _description_
210
214
211
215
Returns:
@@ -232,24 +236,23 @@ def getResults(
232
236
],
233
237
)
234
238
search = (
235
- Search (using = self .es , index = index_str )
239
+ Search (using = self .es , index = self . index )
236
240
.query (query )
237
241
.extra (size = self .search_size )
238
242
)
239
- result = self .query_index (index_str , search )
243
+ result = self .query_index (search )
240
244
runs = [item .to_dict ()["_source" ] for item in result .hits .hits ]
241
245
return runs
242
246
243
247
def get_agg_metric_query (
244
248
self , uuids : List [str ],
245
- index : str ,
246
249
metrics : Dict [str , Any ],
247
- timestamp_field : str = "timestamp" ):
250
+ timestamp_field : str = "timestamp"
251
+ ):
248
252
"""burner_metric_query will query for specific metrics data.
249
253
250
254
Args:
251
255
uuids (list): List of uuids
252
- index (str): ES/OS Index to query from
253
256
metrics (dict): metrics defined in es index metrics
254
257
"""
255
258
metric_queries = []
@@ -266,12 +269,14 @@ def get_agg_metric_query(
266
269
query = Q (
267
270
"bool" ,
268
271
must = [
269
- Q ("terms" , ** {self .uuid_field + ".keyword" : uuids }),
272
+ Q ("terms" , ** {self .uuid_field + ".keyword" : uuids }),
270
273
metric_query ,
271
274
],
272
275
)
273
276
search = (
274
- Search (using = self .es , index = index ).query (query ).extra (size = self .search_size )
277
+ Search (using = self .es , index = self .index )
278
+ .query (query )
279
+ .extra (size = self .search_size )
275
280
)
276
281
agg_value = metrics ["agg" ]["value" ]
277
282
agg_type = metrics ["agg" ]["agg_type" ]
@@ -281,15 +286,15 @@ def get_agg_metric_query(
281
286
search .aggs .bucket (
282
287
"uuid" , "terms" , field = self .uuid_field + ".keyword" , size = self .search_size
283
288
).metric (agg_value , agg_type , field = metrics ["metric_of_interest" ])
284
- result = self .query_index (index , search )
289
+ result = self .query_index (search )
285
290
data = self .parse_agg_results (result , agg_value , agg_type , timestamp_field )
286
291
return data
287
292
288
293
def parse_agg_results (
289
294
self , data : Dict [Any , Any ],
290
295
agg_value : str ,
291
296
agg_type : str ,
292
- timestap_field : str = "timestamp"
297
+ timestamp_field : str = "timestamp"
293
298
) -> List [Dict [Any , Any ]]:
294
299
"""parse out CPU data from kube-burner query
295
300
Args:
@@ -306,7 +311,7 @@ def parse_agg_results(
306
311
for stamp in stamps :
307
312
dat = {}
308
313
dat [self .uuid_field ] = stamp .key
309
- dat [timestap_field ] = stamp .time .value_as_string
314
+ dat [timestamp_field ] = stamp .time .value_as_string
310
315
agg_values = next (
311
316
(item for item in agg_buckets if item .key == stamp .key ), None
312
317
)
@@ -320,7 +325,7 @@ def parse_agg_results(
320
325
def convert_to_df (
321
326
self , data : Dict [Any , Any ],
322
327
columns : List [str ] = None ,
323
- timestamp_field : str = "timestamp"
328
+ timestamp_field : str = "timestamp"
324
329
) -> pd .DataFrame :
325
330
"""convert to a dataframe
326
331
Args:
0 commit comments