1
1
from prometheus_client import start_http_server , Gauge
2
2
import pandas as pd
3
3
import time
4
+ import argparse
4
5
import os
5
- from concurrent .futures import ThreadPoolExecutor
6
6
7
7
8
- output_path = '../output/'
8
+ execution_time_gauge = Gauge (
9
+ 's3_specs_time_metrics' ,
10
+ 'Tests time metrics' ,
11
+ ['execution_name' , 'execution_type' , 'time_metric' ]
12
+ )
13
+
14
+ # Gauge setup
15
+ execution_status_gauge = Gauge (
16
+ 's3_specs_status_data' ,
17
+ 'Test statuses' ,
18
+ ['name' , 'category' ]
19
+ )
20
+
21
+
22
+ parser = argparse .ArgumentParser ()
23
+ parser .add_argument ('--parquet_path' ,
24
+ required = True ,
25
+ help = 'Path of folder containing the execution_time and test parquet artifacts' )
26
+
27
+ parser = parser .parse_args () # Parse the arguments
9
28
10
29
def execution_metrics_exporter ():
11
- file_path = output_path + 'execution_time.parquet'
30
+ file_path = 'execution_time.parquet'
31
+ file_path = os .path .join (parser .parquet_path , file_path )
12
32
df = pd .read_parquet (file_path )
13
33
14
34
# Drop unnecessary columns and take the first 5 rows
@@ -19,30 +39,24 @@ def execution_metrics_exporter():
19
39
cleaned_time_metric_df ,
20
40
id_vars = ['execution_name' , 'execution_type' ],
21
41
value_vars = ['avg_time' , 'min_time' , 'total_time' ],
22
- var_name = 'type_time ' ,
42
+ var_name = 'time_metric ' ,
23
43
value_name = 'time_values'
24
44
).reset_index (drop = True ).drop_duplicates ()
25
45
26
- # Gauge setup
27
- execution_time_gauge = Gauge (
28
- 's3_specs_time_metrics' ,
29
- 'Tests time metrics' ,
30
- ['execution_name' , 'execution_type' , 'type_time' ]
31
- )
32
-
33
46
# Set gauge values
34
47
for record in melted_df .to_dict ('records' ):
35
48
execution_time_gauge .labels (
36
- Execution_name = record ['execution_name' ],
37
- Execution_type = record ['execution_type' ],
38
- type_time = record ['type_time ' ]
49
+ execution_name = record ['execution_name' ],
50
+ execution_type = record ['execution_type' ],
51
+ time_metric = record ['time_metric ' ]
39
52
).set (record ['time_values' ])
40
53
41
54
print ('Time metrics exported...' )
42
55
43
56
def test_metrics_exporter ():
44
57
# Test data gauge
45
- file_path = output_path + 'tests.parquet'
58
+ file_path = 'tests.parquet'
59
+ file_path = os .path .join (parser .parquet_path , file_path )
46
60
df = pd .read_parquet (file_path )
47
61
48
62
status_to_numeric = {
@@ -55,13 +69,6 @@ def test_metrics_exporter():
55
69
cleaned_status_df ['status' ] = cleaned_status_df ['status' ].map (status_to_numeric ).drop_duplicates ()
56
70
dicts_time_metric_df = cleaned_status_df .to_dict (orient = 'records' )
57
71
58
- # Gauge setup
59
- execution_status_gauge = Gauge (
60
- 's3_specs_status_data' ,
61
- 'Test statuses' ,
62
- ['name' , 'category' ]
63
- )
64
-
65
72
# Set gauge values
66
73
for record in dicts_time_metric_df :
67
74
execution_status_gauge .labels (
0 commit comments