@@ -69,19 +69,21 @@ def get_metric_data(
6969 self .logger .info ("Collecting %s" , metric_name )
7070 try :
7171 if "agg" in metric :
72- metric_df , metric_dataframe_name = self .process_aggregation_metric (
72+ metric_df , metric_dataframe_names = self .process_aggregation_metric (
7373 uuids , metric , match , timestamp_field
7474 )
7575 else :
76- metric_df , metric_dataframe_name = self .process_standard_metric (
76+ metric_df , single_name = self .process_standard_metric (
7777 uuids , metric , match , metric_value_field , timestamp_field
7878 )
79+ metric_dataframe_names = [single_name ]
7980 metric ["labels" ] = labels
8081 metric ["direction" ] = direction
8182 metric ["threshold" ] = threshold
8283 metric ["correlation" ] = correlation
8384 metric ["context" ] = context
84- metrics_config [metric_dataframe_name ] = metric
85+ for metric_dataframe_name in metric_dataframe_names :
86+ metrics_config [metric_dataframe_name ] = metric
8587 dataframe_list .append (metric_df )
8688 self .logger .debug (metric_df )
8789 except Exception as e :
@@ -96,39 +98,64 @@ def get_metric_data(
def process_aggregation_metric(
    self,
    uuids: List[str],
    metric: Dict[str, Any],
    match: "Matcher",
    timestamp_field: str = "timestamp",
) -> "Tuple[pd.DataFrame, List[str]]":
    """Build an aggregated dataframe for a single metric configuration.

    Runs the aggregation query described by ``metric["agg"]`` against the
    given UUIDs and returns a deduplicated dataframe whose aggregate
    column(s) are renamed to ``<metric name>_<agg_type>`` (with a
    trailing ``_<percentile>`` suffix for percentile aggregations).

    Args:
        uuids (List[str]): UUIDs to include in the aggregation query.
        metric (Dict[str, Any]): Metric configuration; must contain "name"
            and an "agg" dict with "value" and "agg_type" keys.
        match (Matcher): Matcher instance used to run the aggregation query
            and convert raw results to a dataframe.
        timestamp_field (str, optional): Name of the timestamp field in the
            raw results. Defaults to "timestamp"; any other name is renamed
            back to "timestamp" in the returned dataframe.

    Returns:
        Tuple[pd.DataFrame, List[str]]: The aggregated dataframe and the
        list of renamed aggregate column names (one per percentile when
        agg_type is "percentiles", otherwise exactly one name).
    """
    self.logger.debug("process_aggregation_metric")
    aggregated_metric_data = match.get_agg_metric_query(uuids, metric, timestamp_field)
    # Full query payloads can be large; log them at debug, not info.
    self.logger.debug("aggregated_metric_data %s", aggregated_metric_data)
    aggregation_value = metric["agg"]["value"]
    aggregation_type = metric["agg"]["agg_type"]

    # Percentile aggregations fan out into one column per percentile
    # (e.g. "duration_percentiles_50.0"); discover them from the first row.
    percentile_prefix = f"{aggregation_value}_{aggregation_type}_"
    if aggregation_type == "percentiles":
        if aggregated_metric_data:
            agg_columns = [
                k for k in aggregated_metric_data[0] if k.startswith(percentile_prefix)
            ]
        else:
            agg_columns = []
        self.logger.debug("percentile columns found: %s", agg_columns)
    else:
        agg_columns = [f"{aggregation_value}_{aggregation_type}"]

    all_columns = [self.uuid_field, timestamp_field] + agg_columns

    if not aggregated_metric_data:
        # No results: keep an empty frame with the expected schema so
        # downstream merges still see the right columns.
        aggregated_df = pd.DataFrame(columns=all_columns)
    else:
        aggregated_df = match.convert_to_df(
            aggregated_metric_data,
            columns=all_columns,
            timestamp_field=timestamp_field,
        )
        aggregated_df.loc[:, timestamp_field] = aggregated_df[timestamp_field].apply(
            self.standardize_timestamp
        )

    # Keep only the first row seen per UUID.
    aggregated_df = aggregated_df.drop_duplicates(subset=[self.uuid_field], keep="first")

    # Rename aggregate columns to "<metric name>_<agg_type>[_<percentile>]".
    rename_map = {}
    aggregated_metric_names = []
    for col in agg_columns:
        if aggregation_type == "percentiles":
            suffix = col[len(percentile_prefix):]
            new_name = f"{metric['name']}_{aggregation_type}_{suffix}"
        else:
            new_name = f"{metric['name']}_{aggregation_type}"
        rename_map[col] = new_name
        aggregated_metric_names.append(new_name)

    aggregated_df = aggregated_df.rename(columns=rename_map)
    if timestamp_field != "timestamp":
        # Normalize the timestamp column name for downstream consumers.
        aggregated_df = aggregated_df.rename(columns={timestamp_field: "timestamp"})

    return aggregated_df, aggregated_metric_names
132159
133160 def standardize_timestamp (self , timestamp : Any ) -> str :
134161 """Method to standardize timestamp formats
0 commit comments