
Commit 2315891

Merge branch 'devel' into cli-collectors
2 parents 0a41a58 + aa754e2 commit 2315891

24 files changed

Lines changed: 2080 additions & 452 deletions

.github/workflows/pytest.yml

Lines changed: 2 additions & 2 deletions

@@ -91,7 +91,7 @@ jobs:
 
       - name: "Upload code coverage report"
         if: ${{ github.base_ref }}
-        uses: actions/upload-artifact@v4
+        uses: actions/upload-artifact@v7
         with:
           name: coverage-report
           path: coverage.xml
@@ -102,7 +102,7 @@ jobs:
 
       - name: "Upload PR number"
         if: github.event_name == 'pull_request'
-        uses: actions/upload-artifact@v4
+        uses: actions/upload-artifact@v7
         with:
           name: pr_number
           path: pr_number.txt
.github/workflows/sonar_checks.yml

Lines changed: 2 additions & 2 deletions

@@ -22,15 +22,15 @@ jobs:
           fetch-depth: 0
 
       - name: Fetch Code Coverage Report
-        uses: actions/download-artifact@v4
+        uses: actions/download-artifact@v8
         with:
           name: coverage-report
           path: coverage-artifact
           github-token: ${{ secrets.GITHUB_TOKEN }}
           run-id: ${{ github.event.workflow_run.id }}
 
       - name: Fetch PR Number
-        uses: actions/download-artifact@v4
+        uses: actions/download-artifact@v8
         with:
           name: pr_number
           path: pr-artifact
.pre-commit-config.yaml

Lines changed: 1 addition & 1 deletion

@@ -1,6 +1,6 @@
 repos:
   - repo: https://github.com/astral-sh/ruff-pre-commit
-    rev: v0.14.10
+    rev: v0.15.4
     hooks:
       - id: ruff-check
       - id: ruff-format
New file

Lines changed: 137 additions & 0 deletions
# Documentation of anonymized rollups

## 1. Collectors and Rollups

Each collector type has an associated anonymized rollup class that processes the collected data. Collectors are located in `metrics_utility/library/collectors/controller/`, and their corresponding rollup classes are in `metrics_utility/anonymized_rollups/`.

### Collector Types

Collectors fall into two categories (a sketch of the two call shapes follows the list):

- **Since-until collectors (time-series)**: These collectors require `since` and `until` parameters and collect data for a specific time range. By default they run hourly to collect incremental data, but the schedule is configurable.
- **Snapshot collectors**: These collectors do not require time parameters and collect a point-in-time snapshot of the current state. By default they run once per day, though this schedule is also configurable.
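
To make the distinction concrete, here is a minimal sketch of the two call shapes. The function names and signatures are illustrative assumptions, not the actual collector API:

```python
from datetime import datetime, timedelta

# Hypothetical call shapes; the real collectors live in
# metrics_utility/library/collectors/controller/ and may differ.

def collect_unified_jobs(since: datetime, until: datetime) -> list:
    """Time-series collector: returns rows whose `finished` timestamp falls in [since, until)."""
    ...

def collect_execution_environments() -> list:
    """Snapshot collector: returns the current state; no time range needed."""
    ...

# A time-series collector is typically invoked one hour at a time:
until = datetime(2024, 1, 1, 11, 0)
since = until - timedelta(hours=1)  # the 10:00-11:00 window
```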

### Collector List

#### Time-Series Collectors (since-until)

1. **`unified_jobs`**
   - **Collector**: `metrics_utility/library/collectors/controller/unified_jobs.py`
   - **Rollup**: `metrics_utility/anonymized_rollups/jobs_anonymized_rollup.py` (`JobsAnonymizedRollup`)
   - **Description**: Collects unified job data including job status, duration, execution environment, inventory, organization, ansible version, installed collections, and job template information. Filters jobs by `finished` timestamp within the time range.

2. **`job_host_summary_service`**
   - **Collector**: `metrics_utility/library/collectors/controller/job_host_summary_service.py`
   - **Rollup**: `metrics_utility/anonymized_rollups/jobhostsummary_anonymized_rollup.py` (`JobHostSummaryAnonymizedRollup`)
   - **Description**: Collects job host summary data including task execution statistics (ok, failed, skipped, unreachable, etc.) per job and host. Uses partition-optimized queries for better performance.

3. **`credentials_service`**
   - **Collector**: `metrics_utility/library/collectors/controller/credentials_service.py`
   - **Rollup**: `metrics_utility/anonymized_rollups/credentials_anonymized_rollup.py` (`CredentialsAnonymizedRollup`)
   - **Description**: Collects credential usage data showing which credential types are used in jobs within the time range.

4. **`main_jobevent_service`**
   - **Collector**: `metrics_utility/library/collectors/controller/main_jobevent_service.py`
   - **Rollup**: `metrics_utility/anonymized_rollups/events_modules_anonymized_rollup.py` (`EventModulesAnonymizedRollup`)
   - **Description**: Collects job event data including module usage, collection usage, role usage, and event statistics. This is the largest collector and uses partition-optimized queries.

#### Snapshot Collectors

5. **`execution_environments`**
   - **Collector**: `metrics_utility/library/collectors/controller/execution_environments.py`
   - **Rollup**: `metrics_utility/anonymized_rollups/execution_environments_anonymized_rollup.py` (`ExecutionEnvironmentsAnonymizedRollup`)
   - **Description**: Collects execution environment statistics, including counts of default and custom execution environments.

6. **`table_metadata`**
   - **Collector**: `metrics_utility/library/collectors/controller/table_metadata.py`
   - **Rollup**: `metrics_utility/anonymized_rollups/table_metadata_anonymized_rollup.py` (`TableMetadataAnonymizedRollup`)
   - **Description**: Collects database table metadata, including row counts and table sizes for various system tables. This is used to estimate how many rows a customer's tables hold and how much disk space those tables occupy.

7. **`controller_version_service`**
   - **Collector**: `metrics_utility/library/collectors/controller/controller_version_service.py`
   - **Rollup**: `metrics_utility/anonymized_rollups/controller_version_anonymized_rollup.py` (`ControllerVersionAnonymizedRollup`)
   - **Description**: Collects controller version information showing which versions of the controller are running.

## 2. Rollup Flow

The anonymized rollup process follows a multi-stage flow (a sketch of the full pipeline follows at the end of this section):

### Hourly Collection

1. **Collection**: Each time-series collector runs hourly, collecting data for a specific hour (e.g., 10:00-11:00). This matters for performance: computing a whole day of raw data in a single pass would be too expensive.

   The data are then processed in batches (see prepare and merge below). Each batch computes what is essentially an hourly aggregate, which is much smaller than the raw data: JSON data with summaries, total counts, total durations, and so on.

   Those summaries are updated with each batch: the results of two hourly aggregates are aggregated together. This is what we call rollups; rollups are essentially hierarchical aggregates. For example, one hour's `{"jobs_total": 10}` merged with the next hour's `{"jobs_total": 7}` yields `{"jobs_total": 17}`. The result is then aggregated with the next hour, and so on, until the whole day is covered.

   The daily rollup is sent to the analytics team, which further aggregates our daily rollups into monthly and yearly rollups; that step is not part of our metrics utility.

2. **Prepare**: The raw dataframe from the collector is passed to the rollup's `prepare()` method, which:
   - Filters and preprocesses the data (e.g., filtering out unfinished jobs)
   - Performs initial aggregations
   - Returns a serializable dictionary or list (not a dataframe)

3. **Merge**: The result from `prepare()` is merged with the partial daily rollup using the `merge()` method:
   - The partial daily rollup is initially empty (None) for the first hour
   - Each subsequent hour's prepared data is merged into the accumulating daily rollup
   - Both the partial rollup and prepared data are serializable (JSON-compatible) structures
   - The merge operation combines these structures appropriately (e.g., concatenating lists, summing counts)

### Daily Base Processing

4. **Base**: After all hours for the day have been processed, the complete daily rollup is passed to the `base()` method, which:
   - Performs final aggregations and statistics computation if needed
   - Is usually quite short
   - Returns a dictionary with a `json` key containing the final rollup data

### Final Merging

5. **Combination**: All rollup results from `base()` are combined in `anonymized_rollups.py`:
   - Each rollup's `json` output is collected
   - All rollups are merged together using the `anonymize_rollups()` function
   - The combined data is flattened into a single structure
   - Sensitive data is anonymized (see section 3)
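
Putting the stages together, here is a minimal sketch of the hourly accumulation loop under the `prepare()`/`merge()`/`base()` contract described above. The `HourlyBatchRollup` class and its aggregation rules are illustrative assumptions, not one of the real rollup classes:

```python
import json
from typing import Optional

class HourlyBatchRollup:
    """Illustrative rollup obeying the prepare/merge/base contract."""

    def prepare(self, rows: list) -> dict:
        # Reduce one hour of raw rows to a small, JSON-serializable aggregate.
        finished = [r for r in rows if r.get('status') in ('successful', 'failed')]
        return {
            'jobs_total': len(finished),
            'duration_seconds': sum(r.get('elapsed', 0) for r in finished),
        }

    def merge(self, partial: Optional[dict], prepared: dict) -> dict:
        # First hour: there is no partial daily rollup yet.
        if partial is None:
            return prepared
        # Hierarchical aggregation: two hourly aggregates sum into one.
        return {k: partial.get(k, 0) + prepared.get(k, 0) for k in prepared}

    def base(self, daily: dict) -> dict:
        # Final, usually short, post-processing step.
        return {'json': daily}

rollup = HourlyBatchRollup()
daily = None
for hourly_rows in ([{'status': 'successful', 'elapsed': 30}],
                    [{'status': 'failed', 'elapsed': 12}]):
    daily = rollup.merge(daily, rollup.prepare(hourly_rows))
print(json.dumps(rollup.base(daily)['json']))
# {"jobs_total": 2, "duration_seconds": 42}
```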

## 3. Anonymization

After all rollups are merged, the data goes through anonymization:

1. **String Filtering**: Any string value that is not a built-in Python type or part of a public collection (defined in `collections.json`) is either:
   - Set to `"Unknown"` (for module names, collection names, and role names with `collection_source == 'Unknown'`)
   - Filtered out entirely during collection (e.g., filtered by the `manage` DB column or other filters)

2. **Sanitization**: NaN and infinity values are replaced with `None` to ensure valid JSON output.

The anonymization ensures that no sensitive customer data (such as custom module names, collection names, or job template names) is exposed in the final output.
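
As an illustration of the sanitization step, here is a minimal recursive NaN/infinity replacement matching the behavior described above. This is a sketch under that assumption, not the project's actual `sanitize_json()`:

```python
import math
from typing import Any

def sanitize_json(value: Any) -> Any:
    """Replace NaN and +/- infinity with None so the structure serializes to valid JSON."""
    if isinstance(value, float) and not math.isfinite(value):
        return None
    if isinstance(value, dict):
        return {k: sanitize_json(v) for k, v in value.items()}
    if isinstance(value, list):
        return [sanitize_json(v) for v in value]
    return value

print(sanitize_json({'ok': 1.5, 'bad': float('nan'), 'worse': [float('inf')]}))
# {'ok': 1.5, 'bad': None, 'worse': [None]}
```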

## 4. Message Splitting

The final anonymized rollup JSON is split into multiple messages for transmission to Segment.com:

1. **Top-level Key Splitting**: Each top-level key in the JSON dictionary becomes a separate message chunk. For example:
   - `statistics` → one chunk
   - `module_stats` → one or more chunks (if it's a list)
   - `jobs_by_job_type` → one or more chunks (if it's a list)

2. **Array Splitting**: If a top-level key contains an array (list), that array is split into multiple chunks when it exceeds the maximum message size:
   - Maximum size: 24KB (with empty space reserved for additional metadata)
   - Each chunk contains as many array items as fit within the size limit
   - Items are never split across chunks

3. **Size Calculation**: The size of each chunk is calculated as the JSON-encoded byte size of the data.

4. **Dictionary Handling**: If a top-level key contains a dictionary (not a list), it is sent as a single chunk and cannot be split. Therefore, dictionaries must be smaller than the maximum message size.

The splitting logic is implemented in the `_split_into_chunks()` method in `metrics_utility/library/storage/segment.py`.
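
A simplified sketch of the array-splitting rule, for illustration only; the authoritative logic is the `_split_into_chunks()` method referenced above, and the greedy packing here is an assumption about its approach:

```python
import json

MAX_CHUNK_BYTES = 24 * 1024  # 24KB ceiling; metadata headroom assumed reserved elsewhere

def split_array_into_chunks(key: str, items: list) -> list:
    """Greedily pack list items into chunks whose JSON-encoded byte size
    stays under MAX_CHUNK_BYTES; items are never split across chunks."""
    chunks, current = [], []
    for item in items:
        candidate = current + [item]
        if current and len(json.dumps({key: candidate}).encode('utf-8')) > MAX_CHUNK_BYTES:
            chunks.append({key: current})  # flush the full chunk
            current = [item]
        else:
            current = candidate
    if current:
        chunks.append({key: current})
    return chunks

# A long module_stats list splits into several sub-24KB messages:
chunks = split_array_into_chunks('module_stats', [{'module': 'Unknown', 'uses': i} for i in range(3000)])
print(len(chunks))
```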

## 5. Testing

To test the anonymized rollup system, use the `run_no_events.py` script:

**Location**: `tools/anonymized_tests/run_no_events.py`

See the script itself for details.
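Assuming the script can be launched without required arguments (an assumption; its docstring is the authoritative reference), a baseline run would be `python tools/anonymized_tests/run_no_events.py` from the repository root.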

metrics_utility/anonymized_rollups/anonymized_rollups.py

Lines changed: 64 additions & 9 deletions

@@ -1,6 +1,7 @@
 import hashlib
 import json
 import os
+import shutil
 
 from typing import Any, Callable, Dict, List
 
@@ -17,6 +18,9 @@
 from metrics_utility.anonymized_rollups.table_metadata_anonymized_rollup import TableMetadataAnonymizedRollup
 
 
+OUT_BATCHES_DIR = './out/batches'
+
+
 def hash(value, salt):
     # has the value and salt, hash should be string
     combined = (salt + ':' + value).encode('utf-8')
@@ -147,23 +151,25 @@ def _get_default_host_summary_fields() -> Dict[str, int]:
         'skipped_total': 0,
         'ignored_total': 0,
         'rescued_total': 0,
-        'unique_hosts_total': 0,
         'successful_hosts_total': 0,
         'failed_hosts_total': 0,
         'unreachable_hosts_total': 0,
     }
 
 
 def _extract_host_summary_fields(jhs_data: Dict[str, Any]) -> Dict[str, Any]:
-    """Extract host summary fields from job_host_summary data."""
+    """Extract host summary fields from job_host_summary data.
+
+    Note: unique_hosts_total is not included here as it's only computed at the top level,
+    not per grouping.
+    """
     return {
         'dark_total': jhs_data.get('dark_total', 0),
         'failures_total': jhs_data.get('failures_total', 0),
         'ok_total': jhs_data.get('ok_total', 0),
         'skipped_total': jhs_data.get('skipped_total', 0),
         'ignored_total': jhs_data.get('ignored_total', 0),
         'rescued_total': jhs_data.get('rescued_total', 0),
-        'unique_hosts_total': jhs_data.get('unique_hosts_total', 0),
         'successful_hosts_total': jhs_data.get('successful_hosts_total', 0),
         'failed_hosts_total': jhs_data.get('failed_hosts_total', 0),
         'unreachable_hosts_total': jhs_data.get('unreachable_hosts_total', 0),
@@ -204,10 +210,21 @@ def _calculate_sum_from_list(items: List[Dict[str, Any]], field: str) -> Any:
     return sum(item.get(field, 0) for item in items)
 
 
-def _calculate_host_summary_totals(job_host_summary_by_job_type: List[Dict[str, Any]]) -> Dict[str, Any]:
-    """Calculate host summary totals from job_type groups."""
+def _calculate_host_summary_totals(job_host_summary_by_job_type: List[Dict[str, Any]], host_ids: List[Any] = None) -> Dict[str, Any]:
+    """Calculate host summary totals from job_type groups.
+
+    Args:
+        job_host_summary_by_job_type: List of job_type group dictionaries
+        host_ids: Top-level list of host IDs to compute unique_hosts_total from
+    """
+    # Compute unique_hosts_total from top-level host_ids list, not from groupings
+    if host_ids is not None and isinstance(host_ids, list) and len(host_ids) > 0:
+        unique_hosts_total = len(set(host_ids))
+    else:
+        unique_hosts_total = 0
+
     return {
-        'unique_hosts_total': _calculate_sum_from_list(job_host_summary_by_job_type, 'unique_hosts_total'),
+        'unique_hosts_total': unique_hosts_total,
         'successful_hosts_total': _calculate_sum_from_list(job_host_summary_by_job_type, 'successful_hosts_total'),
         'failed_hosts_total': _calculate_sum_from_list(job_host_summary_by_job_type, 'failed_hosts_total'),
         'unreachable_hosts_total': _calculate_sum_from_list(job_host_summary_by_job_type, 'unreachable_hosts_total'),
@@ -217,6 +234,7 @@ def _calculate_host_summary_totals(job_host_summary_by_job_type: List[Dict[str,
 def _calculate_job_statistics(jobs_by_job_type: List[Dict[str, Any]]) -> Dict[str, Any]:
     """Calculate job statistics by summing from all job_type groups."""
     return {
+        'rollup_period_jobs_total': _calculate_sum_from_list(jobs_by_job_type, 'jobs_total'),
         'job_templates_total': _calculate_sum_from_list(jobs_by_job_type, 'templates_total'),
         'inventories_total': _calculate_sum_from_list(jobs_by_job_type, 'inventories_total'),
         'rollup_period_jobs_successful': _calculate_sum_from_list(jobs_by_job_type, 'jobs_successful_total'),
@@ -281,8 +299,8 @@ def _build_statistics(
         'rollup_period_execution_environments_total': execution_environments_total,
         'rollup_period_EE_default_total': execution_environments.get('execution_environments_default_total'),
         'rollup_period_EE_custom_total': execution_environments.get('execution_environments_custom_total'),
-        # from jobs (top-level fields)
-        'rollup_period_jobs_total': jobs.get('jobs_total'),
+        # from jobs (computed from jobs_by_job_type aggregation)
+        'rollup_period_jobs_total': job_statistics['rollup_period_jobs_total'],
         'rollup_period_jobs_successful': job_statistics['rollup_period_jobs_successful'],
         'rollup_period_jobs_failed': job_statistics['rollup_period_jobs_failed'],
         'rollup_period_jobs_duration_all_statuses_seconds': job_statistics['rollup_period_jobs_duration_all_statuses_seconds'],
@@ -404,8 +422,11 @@ def flatten_json_report(data: Dict[str, Any]) -> Dict[str, Any]:
     job_host_summary_by_launch_type: List[Dict[str, Any]] = job_host_summary_root.get('by_launch_type', []) or []
     job_host_summary_by_ansible_version: List[Dict[str, Any]] = job_host_summary_root.get('by_ansible_version', []) or []
 
+    # Extract top-level host_ids list to compute unique_hosts_total
+    host_ids: List[Any] = job_host_summary_root.get('host_ids', []) or []
+
     # Calculate statistics using helper functions
-    host_summary_totals = _calculate_host_summary_totals(job_host_summary_by_job_type)
+    host_summary_totals = _calculate_host_summary_totals(job_host_summary_by_job_type, host_ids)
     job_statistics = _calculate_job_statistics(jobs_by_job_type)
     playbooks_total = len(events_modules.get('modules_used_per_playbook_total', {}) or {})
     execution_environments_total = _calculate_execution_environments_total(execution_environments)
@@ -516,6 +537,11 @@ def anonymize_rollups(
 
 
 def compute_anonymized_rollup_from_raw_data(input_data, salt):
+
+    # delete everything in the directory ./out/batches (including subdirectories)
+    if os.path.exists(OUT_BATCHES_DIR):
+        shutil.rmtree(OUT_BATCHES_DIR, ignore_errors=True)
+
     jobs = load_anonymized_rollup_data(JobsAnonymizedRollup(), input_data['unified_jobs'])
     jobs_result = JobsAnonymizedRollup().base(jobs)
 
@@ -549,6 +575,12 @@ def compute_anonymized_rollup_from_raw_data(input_data, salt):
     )
     # Sanitize the result to replace NaN and infinity values with None (valid JSON)
     anonymized_rollup = sanitize_json(anonymized_rollup)
+
+    # save anonymized rollup to file
+    os.makedirs(OUT_BATCHES_DIR, exist_ok=True)
+    file_name = os.path.join(OUT_BATCHES_DIR, 'anonymized_rollup.json')
+    with open(file_name, 'w') as f:
+        f.write(json.dumps(anonymized_rollup, indent=2))
     return anonymized_rollup
 
 
@@ -563,12 +595,35 @@ def load_anonymized_rollup_data(rollup_object: BaseAnonymizedRollup, dataframe_l
 
     concat_data = None
 
+    rollup_object_name = rollup_object.rollup_name
+
+    counter = -1
     for dataframe in dataframe_list:
+        counter += 1
         # compat for CSVs
         if isinstance(dataframe, str):
            dataframe = pd.read_csv(dataframe, encoding='utf-8')
 
        prepared_data = rollup_object.prepare(dataframe)
+        # serialize prepared data to json
+        prepared_data = json.dumps(prepared_data, indent=2)
+        # back to dict/list
+        prepared_data = json.loads(prepared_data)
        concat_data = rollup_object.merge(concat_data, prepared_data)
+        # concat data to json and then back
+        concat_data = json.dumps(concat_data, indent=2)
+        concat_data = json.loads(concat_data)
+
+        # Create a subdirectory for this rollup
+        rollup_batch_dir = os.path.join(OUT_BATCHES_DIR, rollup_object_name)
+        os.makedirs(rollup_batch_dir, exist_ok=True)
+        # save prepare data and concat data to separate files (as json pretty printed)
+        # Each rollup has its own folder
+        file1_name = os.path.join(rollup_batch_dir, f'{counter}_prepare.json')
+        file2_name = os.path.join(rollup_batch_dir, f'{counter}_xconcat.json')
+        with open(file1_name, 'w') as f:
+            f.write(json.dumps(prepared_data, indent=2))
+        with open(file2_name, 'w') as f:
+            f.write(json.dumps(concat_data, indent=2))
 
     return concat_data
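
A note on the `unique_hosts_total` change above: summing per-group unique counts overcounts hosts that appear under more than one job type, which is why the new code deduplicates the top-level `host_ids` list instead. A minimal illustration with made-up data:

```python
# Host 101 ran jobs of two job types. Summing the per-group unique counts
# double-counts it; deduplicating the flat host_ids list does not.
by_job_type = [
    {'job_type': 'run', 'unique_hosts_total': 2},    # hosts 101, 102
    {'job_type': 'check', 'unique_hosts_total': 2},  # hosts 101, 103
]
host_ids = [101, 102, 101, 103]

print(sum(g['unique_hosts_total'] for g in by_job_type))  # 4 (overcounted)
print(len(set(host_ids)))                                 # 3 (correct)
```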
