[POC] [WIP] [After 2.7] Add batch processing support for collectors with configurable gather intervals and batch size #379
Open

cshiels-ie wants to merge 3 commits into ansible:devel from cshiels-ie:BatchCollector
+885 −270
Changes from all commits (3 commits)
- bddc46b Add support for batch processing in collectors with new environment v… (cshiels-ie)
- be77f55 Enhance metrics utility with validation and testing for environment v… (cshiels-ie)
- f64792b Enhance collectors with detailed docstrings for improved clarity (cshiels-ie)
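The second commit mentions validation and testing for the environment variable that drives batching. Its implementation is not shown in this diff, but a minimal sketch of what such a helper might look like follows; the helper name `get_batch_size` is taken from the import in the diff below, the variable name `METRICS_UTILITY_GATHER_BATCH_SIZE` from the collector docstring, and the exact validation rules are assumptions:

```python
import os


def get_batch_size(env=os.environ):
    """Return a positive int batch size, or None when batching is disabled.

    Sketch only: the real validation rules in metrics_utility may differ.
    """
    raw = env.get('METRICS_UTILITY_GATHER_BATCH_SIZE')
    if raw is None or raw.strip() == '':
        # Unset or empty means batching is disabled; collectors fall back
        # to a single query.
        return None
    try:
        value = int(raw)
    except ValueError:
        raise ValueError(
            f'METRICS_UTILITY_GATHER_BATCH_SIZE must be an integer, got {raw!r}'
        )
    if value <= 0:
        raise ValueError('METRICS_UTILITY_GATHER_BATCH_SIZE must be positive')
    return value
```

Returning `None` rather than a default lets each collector treat "no value" as "run the original unbatched query", which matches the `if batch_size:` branch in the diff.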
metrics_utility/library/collectors/controller/job_host_summary.py (159 changes: 90 additions & 69 deletions)
The diff replaces the old version of `job_host_summary`, which built the query inline (ordered by `main_jobhostsummary.modified ASC`) and returned it directly via `output.sql(db, query)`, with a nested `build_query()` helper that accepts an ID-range `batch_filter` and orders by `main_jobhostsummary.id ASC`. The resulting file:

```python
from ..util import DataframeOutput, collector, date_where, ensure_functions, get_batch_size


@collector
def job_host_summary(*, db=None, since=None, until=None, output=DataframeOutput()):
    """Collect job-host summary rows from the Controller DB for the given time window.

    Joins main_jobhostsummary with host variables, job metadata, inventory, and
    organization. When METRICS_UTILITY_GATHER_BATCH_SIZE is set, executes the query
    in ID-range batches so the ID filter is pushed into the filtered_hosts CTE and the
    final WHERE clause, keeping each batch cheap.
    """
    where = date_where('main_jobhostsummary.modified', since, until)

    # ensure_functions writes to DB, cannot be used in service (readonly DB)
    ensure_functions(db)

    def build_query(batch_filter='TRUE'):
        # TODO: controller needs to have an index on main_jobhostsummary.modified
        return f"""
        WITH
        filtered_hosts AS (
            SELECT DISTINCT main_jobhostsummary.host_id
            FROM main_jobhostsummary
            WHERE {where} AND ({batch_filter})
        ),
        hosts_variables AS (
            SELECT
                filtered_hosts.host_id,
                CASE
                    WHEN (metrics_utility_is_valid_json(main_host.variables))
                        THEN main_host.variables::jsonb->>'ansible_host'
                    ELSE metrics_utility_parse_yaml_field(main_host.variables, 'ansible_host')
                END AS ansible_host_variable,
                CASE
                    WHEN (metrics_utility_is_valid_json(main_host.variables))
                        THEN main_host.variables::jsonb->>'ansible_connection'
                    ELSE metrics_utility_parse_yaml_field(main_host.variables, 'ansible_connection')
                END AS ansible_connection_variable
            FROM filtered_hosts
            LEFT JOIN main_host ON main_host.id = filtered_hosts.host_id
        )
        SELECT
            main_jobhostsummary.id,
            main_jobhostsummary.created,
            main_jobhostsummary.modified,
            main_jobhostsummary.host_name,
            main_jobhostsummary.host_id AS host_remote_id,
            hosts_variables.ansible_host_variable,
            hosts_variables.ansible_connection_variable,
            main_jobhostsummary.changed,
            main_jobhostsummary.dark,
            main_jobhostsummary.failures,
            main_jobhostsummary.ok,
            main_jobhostsummary.processed,
            main_jobhostsummary.skipped,
            main_jobhostsummary.failed,
            main_jobhostsummary.ignored,
            main_jobhostsummary.rescued,
            main_unifiedjob.created AS job_created,
            main_jobhostsummary.job_id AS job_remote_id,
            main_unifiedjob.unified_job_template_id AS job_template_remote_id,
            main_unifiedjob.name AS job_template_name,
            main_inventory.id AS inventory_remote_id,
            main_inventory.name AS inventory_name,
            main_organization.id AS organization_remote_id,
            main_organization.name AS organization_name,
            main_unifiedjobtemplate_project.id AS project_remote_id,
            main_unifiedjobtemplate_project.name AS project_name
        FROM main_jobhostsummary
        -- connect to main_job, that has connections into inventory and project
        LEFT JOIN main_job ON main_jobhostsummary.job_id = main_job.unifiedjob_ptr_id
        -- get project name from project_options
        LEFT JOIN main_unifiedjobtemplate AS main_unifiedjobtemplate_project ON main_unifiedjobtemplate_project.id = main_job.project_id
        -- get inventory name from main_inventory
        LEFT JOIN main_inventory ON main_inventory.id = main_job.inventory_id
        -- get job name from main_unifiedjob
        LEFT JOIN main_unifiedjob ON main_unifiedjob.id = main_jobhostsummary.job_id
        -- get organization name from main_organization
        LEFT JOIN main_organization ON main_organization.id = main_unifiedjob.organization_id
        -- get variables from precomputed hosts_variables
        LEFT JOIN hosts_variables ON hosts_variables.host_id = main_jobhostsummary.host_id
        WHERE {where} AND ({batch_filter})
        ORDER BY main_jobhostsummary.id ASC
        """

    batch_size = get_batch_size()
    if batch_size:
        # ID-range batching: filter is pushed into filtered_hosts CTE and the
        # final WHERE so each batch only scans its share of rows.
        min_max_query = f'SELECT MIN(id), MAX(id) FROM main_jobhostsummary WHERE {where}'
        return output.batch_sql(
            db,
            query_fn=lambda s, e: build_query(f'main_jobhostsummary.id >= {s} AND main_jobhostsummary.id < {e}'),
            min_max_query=min_max_query,
            batch_size=batch_size,
        )

    return output.sql(db, build_query())
```
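The batching path delegates to `output.batch_sql(...)`, whose implementation is not part of this diff. A minimal sketch of how such an ID-range driver might work, with an in-memory stand-in for the database so it runs standalone (everything here is an assumption about the real helper, not its actual code):

```python
def batch_sql(execute, query_fn, min_max_query, batch_size):
    """Run query_fn over half-open ID ranges [start, start + batch_size)
    between MIN(id) and MAX(id), concatenating the per-batch results.

    Sketch only: the real output.batch_sql in metrics_utility may differ
    in signature and in how it merges batch results.
    """
    min_id, max_id = execute(min_max_query)[0]
    if min_id is None:  # no rows match the date window
        return []
    results = []
    start = min_id
    while start <= max_id:
        end = start + batch_size
        # query_fn builds the SQL with the filter: id >= start AND id < end,
        # mirroring the lambda passed to output.batch_sql above.
        results.extend(execute(query_fn(start, end)))
        start = end
    return results


# Tiny in-memory stand-in for a DB cursor, to exercise the driver.
rows = list(range(1, 11))  # pretend table with IDs 1..10


def execute(sql):
    if sql == 'MINMAX':
        return [(min(rows), max(rows))]
    # Here "sql" is the (start, end) tuple produced by the toy query_fn.
    start, end = sql
    return [r for r in rows if start <= r < end]


out = batch_sql(execute, lambda s, e: (s, e), 'MINMAX', batch_size=4)
```

The half-open ranges (`>= start`, `< end`) mean no row is counted twice and no row is skipped at batch boundaries, which is why the collector's lambda uses the same `>=`/`<` pair.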