diff --git a/metrics_utility/library/dataframes/base_traditional.py b/metrics_utility/library/dataframes/base_traditional.py
index f4aa2b930..c4213594a 100644
--- a/metrics_utility/library/dataframes/base_traditional.py
+++ b/metrics_utility/library/dataframes/base_traditional.py
@@ -11,24 +11,6 @@
 # a dataframe class with logic for merges based on lists of indexes and merge operations
 # used by DataframeMainJobevent, DataframeMainHost and DataframeJobHostSummary
 class BaseTraditional(BaseDataframe):
-    def cast_dataframe(self, df):
-        types = self.cast_types()
-        levels = []
-        if len(self.unique_index_columns()) == 1:
-            # Special behavior if the index is not composite, but only 1 column
-            # Casting index field to object
-            df.index = df.index.astype(object)
-        else:
-            # Composite index branch
-            # Casting index field to object
-            for index, _level in enumerate(df.index.levels):
-                casted_level = df.index.levels[index].astype(object)
-                levels.append(casted_level)
-
-            df.index = df.index.set_levels(levels)
-
-        return df.astype(types)
-
     def dedup(self, dataframe, hostname_mapping=None, **kwargs):
         if dataframe is None or dataframe.empty:
             return self.empty()
@@ -45,41 +27,13 @@ def dedup(self, dataframe, hostname_mapping=None, **kwargs):
         df_grouped = self.regroup(df)
 
         # cast types to match the table
-        df_grouped = self.cast_dataframe(df_grouped)
-        return df_grouped.reset_index()
+        df_grouped = df_grouped.astype(self.cast_types())
 
-    def summarize_merged_dataframes(self, df, columns, operations={}):
-        for col in columns:
-            if operations.get(col) == 'min':
-                df[col] = df[[f'{col}_x', f'{col}_y']].min(axis=1)
-            elif operations.get(col) == 'max':
-                df[col] = df[[f'{col}_x', f'{col}_y']].max(axis=1)
-            elif operations.get(col) == 'combine_set':
-                df[col] = df.apply(lambda row: combine_set(row.get(f'{col}_x'), row.get(f'{col}_y')), axis=1)
-            elif operations.get(col) == 'combine_json':
-                df[col] = df.apply(lambda row: combine_json(row.get(f'{col}_x'), row.get(f'{col}_y')), axis=1)
-            elif operations.get(col) == 'combine_json_values':
-                df[col] = df.apply(lambda row: combine_json_values(row.get(f'{col}_x'), row.get(f'{col}_y')), axis=1)
-            else:
-                df[col] = df[[f'{col}_x', f'{col}_y']].sum(axis=1)
-            del df[f'{col}_x']
-            del df[f'{col}_y']
-        return df
+        return df_grouped.reset_index()
 
     def empty(self):
         return pd.DataFrame(columns=self.unique_index_columns() + self.data_columns())
 
-    # Multipart collection, merge the dataframes and sum counts
-    # used by BaseDataframe.add_rollup
-    def merge(self, rollup, new_group):
-        if rollup is None:
-            return new_group
-
-        rollup = pd.merge(rollup.loc[:,], new_group.loc[:,], on=self.unique_index_columns(), how='outer')
-        rollup = self.summarize_merged_dataframes(rollup, self.data_columns(), operations=self.operations())
-        rollup = self.cast_dataframe(rollup)
-        return rollup
-
     @staticmethod
     def cast_types():
         pass
@@ -88,10 +42,6 @@ def cast_types():
     def data_columns():
         pass
 
-    @staticmethod
-    def operations():
-        pass
-
     @staticmethod
     def unique_index_columns():
        pass
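Reviewer note: the per-class merge() methods added below all inline the same `_x`/`_y` suffix pattern that the removed summarize_merged_dataframes() applied generically. A minimal, self-contained sketch of that pattern (the key and column names here are illustrative, not the real table schema):

import pandas as pd

a = pd.DataFrame({'host_name': ['h1', 'h2'], 'task_runs': [3, 1]})
b = pd.DataFrame({'host_name': ['h2', 'h3'], 'task_runs': [2, 5]})

# An outer merge on the index columns suffixes the overlapping data
# columns as task_runs_x / task_runs_y, with NaN where a key is missing.
df = pd.merge(a, b, on=['host_name'], how='outer')

# Each data column is then combined row-wise (sum here) and the
# suffixed columns are dropped, mirroring the inlined merge() bodies.
df['task_runs'] = df[['task_runs_x', 'task_runs_y']].sum(axis=1)
df = df.drop(columns=['task_runs_x', 'task_runs_y'])

print(df)
#   host_name  task_runs
# 0        h1        3.0
# 1        h2        5.0
# 2        h3        5.0

The NaN padding from the outer join promotes integer columns to float, which is why each new merge() ends with an astype() back to the table types.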
diff --git a/metrics_utility/library/dataframes/job_host_summary.py b/metrics_utility/library/dataframes/job_host_summary.py
index da9c39bbd..4a8a1ed59 100644
--- a/metrics_utility/library/dataframes/job_host_summary.py
+++ b/metrics_utility/library/dataframes/job_host_summary.py
@@ -2,6 +2,8 @@
 
 from metrics_utility.library.dataframes.base_traditional import (
     BaseTraditional,
+    combine_json_values,
+    combine_set,
     merge_arrays,
     merge_json_sets,
     merge_setdicts,
@@ -97,7 +99,17 @@ def group(self, dataframe):
             facts=('facts', merge_json_sets),
             host_names_before_dedup=('host_names_before_dedup', set),
         )
-        return self.cast_dataframe(group)
+
+        return group.astype(
+            {
+                'task_runs': int,
+                'host_runs': int,
+                'managed_node_type': int,
+                'first_automation': 'datetime64[ns]',
+                'last_automation': 'datetime64[ns]',
+                'job_created': 'datetime64[ns]',
+            }
+        )
 
     # Merge pre-aggregated
     def regroup(self, dataframe):
@@ -115,6 +127,68 @@ def regroup(self, dataframe):
             host_names_before_dedup=('host_names_before_dedup', merge_sets),
         )
 
+    def merge(self, rollup, new_group):
+        if rollup is None:
+            return new_group
+
+        df = pd.merge(rollup.loc[:,], new_group.loc[:,], on=self.unique_index_columns(), how='outer')
+
+        # Apply aggregations directly
+        df['host_runs'] = df[['host_runs_x', 'host_runs_y']].sum(axis=1)
+        df['task_runs'] = df[['task_runs_x', 'task_runs_y']].sum(axis=1)
+        df['first_automation'] = df[['first_automation_x', 'first_automation_y']].min(axis=1)
+        df['last_automation'] = df[['last_automation_x', 'last_automation_y']].max(axis=1)
+        df['job_created'] = df[['job_created_x', 'job_created_y']].max(axis=1)
+        df['managed_node_type'] = df[['managed_node_type_x', 'managed_node_type_y']].min(axis=1)
+        df['managed_node_types_set'] = df.apply(
+            lambda row: combine_set(row.get('managed_node_types_set_x'), row.get('managed_node_types_set_y')), axis=1
+        )
+        df['events'] = df.apply(lambda row: combine_set(row.get('events_x'), row.get('events_y')), axis=1)
+        df['canonical_facts'] = df.apply(lambda row: combine_json_values(row.get('canonical_facts_x'), row.get('canonical_facts_y')), axis=1)
+        df['facts'] = df.apply(lambda row: combine_json_values(row.get('facts_x'), row.get('facts_y')), axis=1)
+        df['host_names_before_dedup'] = df.apply(
+            lambda row: combine_set(row.get('host_names_before_dedup_x'), row.get('host_names_before_dedup_y')), axis=1
+        )
+
+        # Drop the _x and _y columns
+        df = df.drop(
+            columns=[
+                'host_runs_x',
+                'host_runs_y',
+                'task_runs_x',
+                'task_runs_y',
+                'first_automation_x',
+                'first_automation_y',
+                'last_automation_x',
+                'last_automation_y',
+                'job_created_x',
+                'job_created_y',
+                'managed_node_type_x',
+                'managed_node_type_y',
+                'managed_node_types_set_x',
+                'managed_node_types_set_y',
+                'events_x',
+                'events_y',
+                'canonical_facts_x',
+                'canonical_facts_y',
+                'facts_x',
+                'facts_y',
+                'host_names_before_dedup_x',
+                'host_names_before_dedup_y',
+            ]
+        )
+
+        return df.astype(
+            {
+                'task_runs': int,
+                'host_runs': int,
+                'managed_node_type': int,
+                'first_automation': 'datetime64[ns]',
+                'last_automation': 'datetime64[ns]',
+                'job_created': 'datetime64[ns]',
+            }
+        )
+
     @staticmethod
     def unique_index_columns():
         return ['organization_name', 'job_template_name', 'host_name', 'original_host_name', 'install_uuid', 'job_remote_id']
@@ -146,20 +220,6 @@ def cast_types():
             'job_created': 'datetime64[ns]',
         }
 
-    @staticmethod
-    def operations():
-        return {
-            'first_automation': 'min',
-            'last_automation': 'max',
-            'job_created': 'max',
-            'managed_node_type': 'min',
-            'managed_node_types_set': 'combine_set',
-            'events': 'combine_set',
-            'canonical_facts': 'combine_json_values',
-            'facts': 'combine_json_values',
-            'host_names_before_dedup': 'combine_set',
-        }
-
     def dedup(self, dataframe, hostname_mapping=None, scope_dataframe=None, deduplicator=None):
         """
         Override dedup method to enrich canonical facts and facts from scope_dataframe
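Reviewer note: the row-wise df.apply combinators above delegate to helpers in base_traditional whose implementations are not shown in this diff. A sketch of the call shape, under the assumption that combine_set unions two possibly-missing set values; the combine_set below is a hypothetical stand-in, not the real helper:

import pandas as pd


def combine_set(left, right):
    # Assumed behavior: union of two set values, tolerating the None/NaN
    # that an outer join introduces. The real helper lives in base_traditional.
    left = left if isinstance(left, set) else set()
    right = right if isinstance(right, set) else set()
    return left | right


df = pd.DataFrame({'events_x': [{'e1'}, None], 'events_y': [{'e2'}, {'e3'}]})
df['events'] = df.apply(lambda row: combine_set(row.get('events_x'), row.get('events_y')), axis=1)
print(df['events'].tolist())  # [{'e1', 'e2'}, {'e3'}]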
diff --git a/metrics_utility/library/dataframes/main_host.py b/metrics_utility/library/dataframes/main_host.py
index d180e797b..0bd872fba 100644
--- a/metrics_utility/library/dataframes/main_host.py
+++ b/metrics_utility/library/dataframes/main_host.py
@@ -1,6 +1,14 @@
 import pandas as pd
 
-from metrics_utility.library.dataframes.base_traditional import BaseTraditional, merge_json_sets, merge_setdicts, merge_sets, parse_json
+from metrics_utility.library.dataframes.base_traditional import (
+    BaseTraditional,
+    combine_json_values,
+    combine_set,
+    merge_json_sets,
+    merge_setdicts,
+    merge_sets,
+    parse_json,
+)
 
 
 def compute_serial(row):
@@ -52,7 +60,8 @@ def group(self, dataframe):
             serials=('serial', set),
             host_names_before_dedup=('host_names_before_dedup', set),
         )
-        return self.cast_dataframe(group)
+
+        return group.astype({'last_automation': 'datetime64[ns]'})
 
     # Merge pre-aggregated
     def regroup(self, dataframe):
@@ -66,6 +75,45 @@ def regroup(self, dataframe):
             host_names_before_dedup=('host_names_before_dedup', merge_sets),
         )
 
+    def merge(self, rollup, new_group):
+        if rollup is None:
+            return new_group
+
+        df = pd.merge(rollup.loc[:,], new_group.loc[:,], on=self.unique_index_columns(), how='outer')
+
+        # Apply aggregations directly
+        df['last_automation'] = df[['last_automation_x', 'last_automation_y']].max(axis=1)
+        df['organizations'] = df.apply(lambda row: combine_set(row.get('organizations_x'), row.get('organizations_y')), axis=1)
+        df['inventories'] = df.apply(lambda row: combine_set(row.get('inventories_x'), row.get('inventories_y')), axis=1)
+        df['canonical_facts'] = df.apply(lambda row: combine_json_values(row.get('canonical_facts_x'), row.get('canonical_facts_y')), axis=1)
+        df['facts'] = df.apply(lambda row: combine_json_values(row.get('facts_x'), row.get('facts_y')), axis=1)
+        df['serials'] = df.apply(lambda row: combine_set(row.get('serials_x'), row.get('serials_y')), axis=1)
+        df['host_names_before_dedup'] = df.apply(
+            lambda row: combine_set(row.get('host_names_before_dedup_x'), row.get('host_names_before_dedup_y')), axis=1
+        )
+
+        # Drop the _x and _y columns
+        df = df.drop(
+            columns=[
+                'last_automation_x',
+                'last_automation_y',
+                'organizations_x',
+                'organizations_y',
+                'inventories_x',
+                'inventories_y',
+                'canonical_facts_x',
+                'canonical_facts_y',
+                'facts_x',
+                'facts_y',
+                'serials_x',
+                'serials_y',
+                'host_names_before_dedup_x',
+                'host_names_before_dedup_y',
+            ]
+        )
+
+        return df.astype({'last_automation': 'datetime64[ns]'})
+
     @staticmethod
     def unique_index_columns():
         return ['host_name', 'install_uuid']
@@ -77,15 +125,3 @@ def data_columns():
     @staticmethod
     def cast_types():
         return {'last_automation': 'datetime64[ns]'}
-
-    @staticmethod
-    def operations():
-        return {
-            'last_automation': 'max',
-            'organizations': 'combine_set',
-            'inventories': 'combine_set',
-            'canonical_facts': 'combine_json_values',
-            'facts': 'combine_json_values',
-            'serials': 'combine_set',
-            'host_names_before_dedup': 'combine_set',
-        }
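Reviewer note: the row-wise min()/max() used for the timestamp columns skips NaT by default, so a host that appears in only one chunk keeps its single timestamp instead of being wiped out by the outer join. A small sketch with a recent pandas (the column name is borrowed from the diff, the data is invented):

import pandas as pd

df = pd.DataFrame(
    {
        'last_automation_x': pd.to_datetime(['2024-01-01', None]),
        'last_automation_y': pd.to_datetime(['2024-02-01', '2024-03-01']),
    }
)

# Row-wise max ignores NaT (skipna defaults to True), so the second row
# resolves to its only timestamp rather than NaT.
df['last_automation'] = df[['last_automation_x', 'last_automation_y']].max(axis=1)
print(df['last_automation'].tolist())
# [Timestamp('2024-02-01 00:00:00'), Timestamp('2024-03-01 00:00:00')]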
diff --git a/metrics_utility/library/dataframes/main_jobevent.py b/metrics_utility/library/dataframes/main_jobevent.py
index c53c798b7..c9c27eaf7 100644
--- a/metrics_utility/library/dataframes/main_jobevent.py
+++ b/metrics_utility/library/dataframes/main_jobevent.py
@@ -1,5 +1,7 @@
 import re
 
+import pandas as pd
+
 from metrics_utility.library.dataframes.base_traditional import BaseTraditional
 
 
@@ -54,8 +56,7 @@ def group(self, dataframe):
         # Duration is null in older versions of Controller
         group['duration'] = group.duration.fillna(0)
 
-        # Tweak types to match the table
-        return self.cast_dataframe(group)
+        return group.astype({'duration': 'float64', 'task_runs': 'int64'})
 
     # Merge pre-aggregated
     def regroup(self, dataframe):
@@ -64,6 +65,21 @@ def regroup(self, dataframe):
             duration=('duration', 'sum'),
         )
 
+    def merge(self, rollup, new_group):
+        if rollup is None:
+            return new_group
+
+        df = pd.merge(rollup.loc[:,], new_group.loc[:,], on=self.unique_index_columns(), how='outer')
+
+        # Apply aggregations directly (both use sum)
+        df['task_runs'] = df[['task_runs_x', 'task_runs_y']].sum(axis=1)
+        df['duration'] = df[['duration_x', 'duration_y']].sum(axis=1)
+
+        # Drop the _x and _y columns
+        df = df.drop(columns=['task_runs_x', 'task_runs_y', 'duration_x', 'duration_y'])
+
+        return df.astype({'duration': 'float64', 'task_runs': 'int64'})
+
     @staticmethod
     def extract_collection_name(x):
         if x is None:
@@ -98,7 +114,3 @@ def data_columns():
     @staticmethod
     def cast_types():
         return {'duration': 'float64', 'task_runs': 'int64'}
-
-    @staticmethod
-    def operations():
-        return {}
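Reviewer note: end-to-end, the rollup loop threads each new chunk through merge(), starting from None. A standalone sketch of the jobevent variant; merge_chunks is a hypothetical stand-in for DataframeMainJobevent.merge, and 'task_name' is an illustrative key, not the class's real unique_index_columns():

import pandas as pd


def merge_chunks(rollup, new_group, index_columns):
    # Same shape as the new per-class merge(): outer join, combine the
    # suffixed data columns, drop them, cast back to the table types.
    if rollup is None:
        return new_group
    df = pd.merge(rollup, new_group, on=index_columns, how='outer')
    df['task_runs'] = df[['task_runs_x', 'task_runs_y']].sum(axis=1)
    df['duration'] = df[['duration_x', 'duration_y']].sum(axis=1)
    df = df.drop(columns=['task_runs_x', 'task_runs_y', 'duration_x', 'duration_y'])
    return df.astype({'duration': 'float64', 'task_runs': 'int64'})


chunk1 = pd.DataFrame({'task_name': ['copy'], 'task_runs': [2], 'duration': [1.5]})
chunk2 = pd.DataFrame({'task_name': ['copy', 'template'], 'task_runs': [1, 4], 'duration': [0.5, 3.0]})

rollup = merge_chunks(None, chunk1, ['task_name'])
rollup = merge_chunks(rollup, chunk2, ['task_name'])
print(rollup)
#   task_name  task_runs  duration
# 0      copy          3       2.0
# 1  template          4       3.0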