2 changes: 1 addition & 1 deletion metrics_utility/anonymized_rollups/anonymized_rollups.py
@@ -275,7 +275,7 @@ def _merge_ansible_versions(jobs_by_job_type: List[Dict[str, Any]]) -> List[str]
ansible_versions = job.get('ansible_versions', [])
if isinstance(ansible_versions, list):
ansible_versions_set.update(ansible_versions)
- return sorted(list(ansible_versions_set)) if ansible_versions_set else []
+ return sorted(ansible_versions_set) if ansible_versions_set else []
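This `sorted(list(...))` → `sorted(...)` cleanup recurs throughout the PR: `sorted()` accepts any iterable and always returns a new list, so materializing the set first is redundant work. A minimal sketch of the equivalence, with made-up version strings:

```python
# sorted() consumes any iterable and returns a fresh list,
# so the intermediate list() copy adds nothing.
versions = {"2.16.3", "2.14.1", "2.15.0"}

assert sorted(list(versions)) == sorted(versions)
print(sorted(versions))  # ['2.14.1', '2.15.0', '2.16.3']
```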


def _calculate_execution_environments_total(execution_environments: Dict[str, Any]) -> Any:
9 changes: 2 additions & 7 deletions metrics_utility/anonymized_rollups/base_anonymized_rollup.py
@@ -48,7 +48,7 @@ def prepare(self, dataframe):
return dataframe

# Base receives the full daily rollup and computes some final statistics for the day
- def base(self, dataframe):
+ def base(self, _dataframe):
return pd.DataFrame()

def save_rollup(self, rollup_data: dict, base_path: str, since: datetime, until: datetime, packed: bool = True) -> None:
@@ -95,12 +95,7 @@ def save_rollup(self, rollup_data: dict, base_path: str, since: datetime, until:
df.to_csv(csv_buffer, index=False)
tar_files[f'{key}.csv'] = csv_buffer.getvalue().encode('utf-8')

- elif isinstance(value, list):
-     # Sanitize and store JSON data in memory for tar
-     sanitized_value = sanitize_json(value)
-     tar_files[f'{filename}.json'] = json.dumps(sanitized_value, indent=2).encode('utf-8')
-
- elif isinstance(value, dict):
+ elif isinstance(value, (list, dict)):
# Sanitize and store JSON data in memory for tar
sanitized_value = sanitize_json(value)
tar_files[f'{filename}.json'] = json.dumps(sanitized_value, indent=2).encode('utf-8')
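The two removed branches had identical bodies, so they collapse into one test: `isinstance()` accepts a tuple of types and matches any member. A small illustration of the consolidated check:

```python
import json

# A tuple of types lets one branch cover both JSON container types.
for value in ([1, 2], {"a": 1}, "plain text"):
    if isinstance(value, (list, dict)):
        print(json.dumps(value))   # serializes [1, 2] and {"a": 1}
    else:
        print("skipped:", value)   # the string falls through
```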
@@ -67,7 +67,7 @@ def merge(self, data_all, data_new):
credential_types_new = set(data_new.get('credential_types', []))

# Union the sets and convert back to sorted list
- credential_types_merged = sorted(list(credential_types_all.union(credential_types_new)))
+ credential_types_merged = sorted(credential_types_all.union(credential_types_new))

return {
'credential_types': credential_types_merged,
@@ -11,6 +11,7 @@

# Regex pattern to match collection names (e.g., namespace.collection.role or namespace.collection.role.task)
# Pattern is safe from reDOS: uses non-capturing groups and non-nested quantifiers
+ # Uses explicit ASCII character class [A-Za-z0-9_] rather than \w to avoid matching Unicode word chars
_COLLECTION_RE = re.compile(r'^([A-Za-z0-9_]+)\.([A-Za-z0-9_]+)\.[A-Za-z0-9_]+(?:\.[A-Za-z0-9_]+)*$')
_COLLECTION_PATTERN = r'^([A-Za-z0-9_]+\.[A-Za-z0-9_]+)\.[A-Za-z0-9_]+(?:\.[A-Za-z0-9_]+)*$'
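The new comment captures a real subtlety: for `str` patterns, Python's `\w` matches Unicode word characters by default, while the explicit `[A-Za-z0-9_]` class stays ASCII-only. A quick check of the behavior, reusing the compiled pattern above:

```python
import re

_COLLECTION_RE = re.compile(r'^([A-Za-z0-9_]+)\.([A-Za-z0-9_]+)\.[A-Za-z0-9_]+(?:\.[A-Za-z0-9_]+)*$')

# ASCII collection paths match; Unicode word characters do not.
assert _COLLECTION_RE.match('ansible.builtin.copy')
assert _COLLECTION_RE.match('community.general.ufw.rule')   # extra dotted segment allowed
assert _COLLECTION_RE.match('namespacé.builtin.copy') is None

# \w would have accepted the accented namespace:
assert re.match(r'^\w+\.\w+\.\w+$', 'namespacé.builtin.copy')
```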

@@ -138,7 +139,7 @@ def _merge_list_columns(self, item_all, item_new, merged_item):
list_new = item_new.get(col) if item_new.get(col) is not None else []
set_all = set(list_all) if isinstance(list_all, list) else set()
set_new = set(list_new) if isinstance(list_new, list) else set()
- merged_item[col] = sorted(list(set_all.union(set_new)))
+ merged_item[col] = sorted(set_all.union(set_new))

def _merge_single_item(self, item_all, item_new):
"""Merge a single item from all and new data."""
@@ -184,7 +185,7 @@ def _merge_unique_modules(self, data_all, data_new):
"""Merge unique_modules lists (union and sort)."""
unique_modules_all = set(data_all.get('unique_modules', []))
unique_modules_new = set(data_new.get('unique_modules', []))
- return sorted(list(unique_modules_all.union(unique_modules_new)))
+ return sorted(unique_modules_all.union(unique_modules_new))

def _merge_modules_per_playbook(self, data_all, data_new):
"""Merge modules_per_playbook dicts (union lists per playbook)."""
@@ -197,14 +198,14 @@ def _merge_modules_per_playbook(self, data_all, data_new):
list_new = modules_per_playbook_new.get(playbook, []) or []
set_all = set(list_all) if isinstance(list_all, list) else set()
set_new = set(list_new) if isinstance(list_new, list) else set()
- modules_per_playbook[playbook] = sorted(list(set_all.union(set_new)))
+ modules_per_playbook[playbook] = sorted(set_all.union(set_new))
return modules_per_playbook

def _merge_unique_hosts(self, data_all, data_new):
"""Merge unique_hosts lists (union and sort)."""
unique_hosts_all = set(data_all.get('unique_hosts', []))
unique_hosts_new = set(data_new.get('unique_hosts', []))
- return sorted(list(unique_hosts_all.union(unique_hosts_new)))
+ return sorted(unique_hosts_all.union(unique_hosts_new))

def merge(self, data_all, data_new):
"""
@@ -326,7 +327,7 @@ def _parse_and_check_json_array(x):
if isinstance(parsed, list):
return len(parsed) > 0
return bool(parsed)
- except (json.JSONDecodeError, TypeError, ValueError):
+ except (TypeError, ValueError):
return False
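Dropping `json.JSONDecodeError` from the tuple is safe because it is a subclass of `ValueError`, which remains in the clause, so malformed JSON is still caught. A minimal demonstration:

```python
import json

# JSONDecodeError inherits from ValueError, so catching
# ValueError alone already covers malformed-JSON failures.
assert issubclass(json.JSONDecodeError, ValueError)

try:
    json.loads('not json')
except (TypeError, ValueError):
    print('caught')  # the JSONDecodeError lands here
```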

def _parse_warnings_deprecations(self, dataframe):
@@ -515,7 +516,7 @@ def _compute_all_stats(self, task_summary):
def _convert_set_or_list_to_sorted_list(value):
"""Convert set or list to sorted list, return empty list for other types."""
if isinstance(value, set):
- return sorted(list(value))
+ return sorted(value)
if isinstance(value, list):
return value
return []
@@ -557,15 +558,15 @@ def _convert_stats_to_json(self, module_stats, collection_stats, role_stats):

def _compute_unique_metadata(self, task_summary):
"""Compute unique_modules, modules_per_playbook, and unique_hosts."""
- unique_modules = sorted(list(set(task_summary['module_name'].dropna().unique())))
+ unique_modules = sorted(set(task_summary['module_name'].dropna().unique()))

modules_per_playbook = {}
for playbook in task_summary['playbook'].dropna().unique():
- modules_in_playbook = sorted(list(set(task_summary[task_summary['playbook'] == playbook]['module_name'].dropna().unique())))
+ modules_in_playbook = sorted(set(task_summary[task_summary['playbook'] == playbook]['module_name'].dropna().unique()))
modules_per_playbook[playbook] = modules_in_playbook

host_sets = [s for s in task_summary['host_ids'].dropna() if isinstance(s, set)]
- unique_hosts = sorted(list(set().union(*host_sets))) if host_sets else []
+ unique_hosts = sorted(set().union(*host_sets)) if host_sets else []

return unique_modules, modules_per_playbook, unique_hosts

@@ -6,6 +6,8 @@

from dateutil.relativedelta import relativedelta

+ from metrics_utility.dataframe_schema import DataframeSchemaMixin
+

def granularity_cast(date, granularity):
if granularity == 'monthly':
@@ -95,14 +97,14 @@ def combine_json_values(val1, val2):
return merged


- class Base:
+ class Base(DataframeSchemaMixin):
def __init__(self, extractor, month, extra_params):
self.extractor = extractor
self.month = month
self.extra_params = extra_params

def build_dataframe(self):
- pass
+ return None

def dates(self):
if self.extra_params.get('since_date') is not None:
@@ -139,11 +141,11 @@ def summarize_merged_dataframes(self, df, columns, operations={}):
elif operations.get(col) == 'max':
df[col] = df[[f'{col}_x', f'{col}_y']].max(axis=1)
elif operations.get(col) == 'combine_set':
- df[col] = df.apply(lambda row: combine_set(row.get(f'{col}_x'), row.get(f'{col}_y')), axis=1)
+ df[col] = df.apply(lambda row, c=col: combine_set(row.get(f'{c}_x'), row.get(f'{c}_y')), axis=1)
elif operations.get(col) == 'combine_json':
- df[col] = df.apply(lambda row: combine_json(row.get(f'{col}_x'), row.get(f'{col}_y')), axis=1)
+ df[col] = df.apply(lambda row, c=col: combine_json(row.get(f'{c}_x'), row.get(f'{c}_y')), axis=1)
elif operations.get(col) == 'combine_json_values':
- df[col] = df.apply(lambda row: combine_json_values(row.get(f'{col}_x'), row.get(f'{col}_y')), axis=1)
+ df[col] = df.apply(lambda row, c=col: combine_json_values(row.get(f'{c}_x'), row.get(f'{c}_y')), axis=1)
else:
df[col] = df[[f'{col}_x', f'{col}_y']].sum(axis=1)
del df[f'{col}_x']
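Binding `c=col` as a default argument freezes the loop variable's current value at definition time. Since each lambda here is consumed immediately by `df.apply`, the change mainly silences late-binding lint warnings (e.g. ruff's B023), but the pitfall it guards against is real whenever such lambdas outlive their loop iteration. A minimal sketch of the difference:

```python
# A closure looks up its free variable at call time, so lambdas
# that outlive the loop all see the variable's final value.
late_bound = [lambda: col for col in ('a', 'b', 'c')]
print([f() for f in late_bound])   # ['c', 'c', 'c']

# A default argument is evaluated at definition time, capturing
# the value each lambda was created with.
early_bound = [lambda c=col: c for col in ('a', 'b', 'c')]
print([f() for f in early_bound])  # ['a', 'b', 'c']
```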
@@ -158,7 +160,7 @@ def merge(self, rollup, new_group):
if rollup is None:
return new_group

- rollup = pd.merge(rollup.loc[:,], new_group.loc[:,], on=self.unique_index_columns(), how='outer')
+ rollup = pd.merge(rollup.loc[:,], new_group.loc[:,], on=self.unique_index_columns(), how='outer', validate='one_to_one')
rollup = self.summarize_merged_dataframes(rollup, self.data_columns(), operations=self.operations())
return self.cast_dataframe(rollup, self.cast_types())
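`validate='one_to_one'` makes `pd.merge` raise `pandas.errors.MergeError` when either side carries duplicate join keys, turning a silent row-fanout bug into a loud failure. A sketch of the guard on synthetic data:

```python
import pandas as pd

left = pd.DataFrame({'host': ['a', 'b'], 'runs_x': [1, 2]})
right = pd.DataFrame({'host': ['a', 'a'], 'runs_y': [3, 4]})  # duplicate key

try:
    # Without validate=, the duplicate would silently fan out into two 'a' rows.
    pd.merge(left, right, on='host', how='outer', validate='one_to_one')
except pd.errors.MergeError as exc:
    print('rejected:', exc)
```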

@@ -180,19 +182,3 @@ def dedup(self, dataframe, hostname_mapping=None):
# cast types to match the table
df_grouped = self.cast_dataframe(df_grouped, self.cast_types())
return df_grouped.reset_index()

- @staticmethod
- def unique_index_columns():
-     pass
-
- @staticmethod
- def data_columns():
-     pass
-
- @staticmethod
- def cast_types():
-     pass
-
- @staticmethod
- def operations():
-     pass
@@ -2,11 +2,12 @@

from metrics_utility.automation_controller_billing.dataframe_engine.base import Base, merge_setdicts, merge_sets
from metrics_utility.automation_controller_billing.helpers import merge_arrays, merge_json_sets, parse_json_array
+ from metrics_utility.dataframe_schema import JobHostSummarySchema
from metrics_utility.metric_utils import DIRECT, INDIRECT, MANAGED_NODE_TYPES


# dataframe for job_host_summary / main_indirectmanagednodeaudit
- class DataframeJobhostSummaryUsage(Base):
+ class DataframeJobhostSummaryUsage(JobHostSummarySchema, Base):
def build_dataframe(self):
# A daily rollup dataframe
billing_data_monthly_rollup = None
@@ -113,8 +114,6 @@ def group(self, dataframe):
job_created=('job_created', 'max'),
managed_node_type=('managed_node_type', 'min'),
managed_node_types_set=('managed_node_type_string', set),
- # TODO: optimize the aggregation to keep less rows around
- # job_ids=('inventory_name', set),
events=('events', merge_arrays),
canonical_facts=('canonical_facts', merge_json_sets),
facts=('facts', merge_json_sets),
@@ -138,51 +137,6 @@ def regroup(self, dataframe):
host_names_before_dedup=('host_names_before_dedup', merge_sets),
)

- @staticmethod
- def unique_index_columns():
-     return ['organization_name', 'job_template_name', 'host_name', 'original_host_name', 'install_uuid', 'job_remote_id']
-
- @staticmethod
- def data_columns():
-     return [
-         'host_runs',
-         'task_runs',
-         'first_automation',
-         'last_automation',
-         'job_created',
-         'managed_node_type',
-         'managed_node_types_set',
-         'canonical_facts',
-         'facts',
-         'events',
-         'host_names_before_dedup',
-     ]
-
- @staticmethod
- def cast_types():
-     return {
-         'task_runs': int,
-         'host_runs': int,
-         'managed_node_type': int,
-         'first_automation': 'datetime64[ns]',
-         'last_automation': 'datetime64[ns]',
-         'job_created': 'datetime64[ns]',
-     }
-
- @staticmethod
- def operations():
-     return {
-         'first_automation': 'min',
-         'last_automation': 'max',
-         'job_created': 'max',
-         'managed_node_type': 'min',
-         'managed_node_types_set': 'combine_set',
-         'events': 'combine_set',
-         'canonical_facts': 'combine_json_values',
-         'facts': 'combine_json_values',
-         'host_names_before_dedup': 'combine_set',
-     }

def dedup(self, dataframe, hostname_mapping=None, scope_dataframe=None):
"""
Override dedup method to enrich canonical facts and facts from scope_dataframe
@@ -47,7 +47,7 @@ def _build_deduped_record(self, dupes, latest_hostname, dupes_clean=None):

def stringify(self, value):
"""Convert a set of values to a comma-separated string, filtering out None."""
- return ', '.join([v for v in list(value) if v is not None])
+ return ', '.join([v for v in value if v is not None])

def run(self):
"""Abstract method to be implemented by subclasses."""
@@ -76,7 +76,7 @@ def run(self):

# Iterative search to cover indirect relationships
iterations = int(self.extra_params['report_renewal_guidance_dedup_iterations'])
- for i in range(iterations):
+ for _ in range(iterations):
# Hostname dupe lookup
dupes = self.find_dupes(dupes, 'hostname', dupes['hostname'])

@@ -312,7 +312,7 @@ def _group_by_individual_serial(self, expanded_df, serial_groups, processed_host
if row['hostname_group'] not in processed_hostname_groups:
for serial in row['individual_serials']:
if serial:
- serial_matches = expanded_df[expanded_df['individual_serials'].apply(lambda x: serial in x if x else False)]
+ serial_matches = expanded_df[expanded_df['individual_serials'].apply(lambda x, s=serial: s in x if x else False)]
hostname_groups_in_serial = serial_matches['hostname_group'].unique()
if len(hostname_groups_in_serial) > 1:
canonical_group = hostname_groups_in_serial[0]
@@ -49,8 +49,8 @@ def shipping_auth_mode(self):

def is_shipping_configured(self):
# TODO: move to base, or children
- ret = super()
- if ret is False:
+ ret = super().is_shipping_configured()
+ if not ret:
return False

if self.shipping_auth_mode() == self.SHIPPING_AUTH_SERVICE_ACCOUNT:
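The old code never invoked the parent method: a bare `super()` returns a proxy object, which is always truthy, so `ret is False` could never hold and the early return was dead code. The fix calls `is_shipping_configured()` on the parent and loosens the check to any falsy result. A minimal reproduction with hypothetical class names:

```python
class Parent:
    def is_shipping_configured(self):
        return False

class Child(Parent):
    def is_shipping_configured(self):
        ret = super()            # a super proxy object -- always truthy
        print(ret is False)      # False: the old guard could never fire
        ret = super().is_shipping_configured()  # the actual parent result
        return bool(ret)

assert Child().is_shipping_configured() is False
```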
9 changes: 4 additions & 5 deletions metrics_utility/automation_controller_billing/report/base.py
@@ -86,14 +86,14 @@ def add_dedup_labels_if_needed(self, labels, column_names):
def convert_cell(self, cell):
# If the cell is a dictionary, convert each set value to a sorted list, then dump as a JSON string.
if isinstance(cell, dict):
- new_cell = {k: sorted(list(v)) if isinstance(v, set) else v for k, v in cell.items()}
+ new_cell = {k: sorted(v) if isinstance(v, set) else v for k, v in cell.items()}
return json.dumps(new_cell)
# If the cell itself is a set, convert it to a sorted list and then to a JSON string.
elif isinstance(cell, set):
- return json.dumps(sorted(list(cell)))
+ return json.dumps(sorted(cell))
# If the cell is a list, convert any set elements inside to sorted lists and dump as a JSON string.
elif isinstance(cell, list):
- new_cell = [sorted(list(item)) if isinstance(item, set) else item for item in cell]
+ new_cell = [sorted(item) if isinstance(item, set) else item for item in cell]
# Sort the list itself if it contains strings
if new_cell and all(isinstance(item, str) for item in new_cell):
new_cell = sorted(new_cell)
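`convert_cell` exists because `json.dumps` raises `TypeError` on sets; converting them to sorted lists both serializes cleanly and makes the output deterministic across runs. A short sketch:

```python
import json

cell = {'versions': {'2.15', '2.14'}}

try:
    json.dumps(cell)
except TypeError as exc:
    print('sets are not JSON-serializable:', exc)

# Sorted lists serialize cleanly and deterministically.
converted = {k: sorted(v) if isinstance(v, set) else v for k, v in cell.items()}
print(json.dumps(converted))  # {"versions": ["2.14", "2.15"]}
```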
@@ -135,7 +135,7 @@ def apply_mapping(row):

return destination_dataframe

- def _build_data_section_scope(self, current_row, ws, dataframe, mode=None):
+ def _build_data_section_scope(self, current_row, ws, dataframe):
header_font = Font(name=self.FONT, size=10, color=self.BLACK_COLOR_HEX, bold=True)
value_font = Font(name=self.FONT, size=10, color=self.BLACK_COLOR_HEX)

@@ -505,7 +505,6 @@ def _build_data_section_usage_by_roles(self, current_row, ws, dataframe):
for c_idx, value in enumerate(row, 1):
cell = ws.cell(row=r_idx, column=c_idx)
cell.value = value
- # cell.border = dotted_border

if row_counter == 0:
# set header style
@@ -528,14 +528,11 @@ def _build_heading_h1(self, current_row, ws):
h1_heading_cell = ws.cell(row=current_row, column=1)
h1_heading_cell.value = self.config['h1_heading']['value']

- # h1_heading_cell.fill = PatternFill("solid", fgColor=self.BLACK_COLOR_HEX)
h1_heading_cell.font = Font(
name=self.FONT,
size=12,
bold=True,
- ) # color=self.WHITE_COLOR_HEX)
-
- # h1_heading_cell.alignment = Alignment(horizontal='center')
+ )

current_row += 1
return current_row
@@ -16,7 +16,7 @@ def __init__(self, extra_params):
self.s3_handler = S3Handler(params=self.extra_params)

def report_exist(self):
- return len([file for file in self.s3_handler.list_files(self.report_spreadsheet_destination_path)]) > 0
+ return len(list(self.s3_handler.list_files(self.report_spreadsheet_destination_path))) > 0

def save(self, report_spreadsheet):
with tempfile.TemporaryDirectory(prefix='report_saver_billing_data_') as temp_dir:
4 changes: 2 additions & 2 deletions metrics_utility/base/package.py
@@ -57,7 +57,7 @@ def __init__(self, collector):
self.total_data_size = 0

@classmethod
- def max_data_size(cls):
+ def get_max_data_size(cls):
return cls.MAX_DATA_SIZE

def add_collection(self, collection):
@@ -83,7 +83,7 @@ def get_s3_configured(self):
pass

def has_free_space(self, requested_size):
- return self.total_data_size + requested_size <= self.max_data_size()
+ return self.total_data_size + requested_size <= self.get_max_data_size()

def is_shipping_configured(self):
if not self.tar_path: