[POC] [WIP] [After 2.7] Add batch processing support for collectors with configurable gather intervals and batch size#379

Open
cshiels-ie wants to merge 3 commits into ansible:devel from cshiels-ie:BatchCollector

Conversation


@cshiels-ie cshiels-ie commented Apr 27, 2026

[AAP-XXXXX]
Don't forget to link the issue back to this PR.

Description

Addresses scale-lab findings (AAP-42304 / AAP-67301) where the job_host_summary COPY query takes 13–17 minutes on large deployments (~500k hosts, 215k+ jobs accumulated over months), and main_jobevent can have millions of rows within a single hour.

What is being changed?
Two new environment variables give operators per-deployment control over gather query size:

  • METRICS_UTILITY_GATHER_INTERVAL_HOURS (default 24): splits the daily_slicing gather window into smaller time slices without crossing calendar-day boundaries. Each slice becomes its own tarball via the existing ship_immediately() mechanism — no downstream changes to report building are needed.
  • METRICS_UTILITY_GATHER_BATCH_SIZE (default 0 = disabled): applies ID-range keyset pagination to large COPY queries. The ID filter is pushed inside the original query's WHERE clause (and into any CTE that scans the same table) so each batch pays only for its own rows — no OFFSET/LIMIT overhead.
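The ID-range keyset pagination described above can be sketched roughly as follows. This is a minimal illustration only; `iter_id_batches` is a hypothetical helper, not this PR's actual API:

```python
def iter_id_batches(min_id, max_id, batch_size):
    """Yield half-open [start, end) ID ranges covering [min_id, max_id]."""
    start = min_id
    while start <= max_id:
        yield start, start + batch_size
        start += batch_size

# Each range becomes a predicate pushed inside the query's WHERE clause,
# e.g. "id >= 100001 AND id < 200001", so the planner walks the primary-key
# index for just that slice instead of paying OFFSET/LIMIT scan costs.
print(list(iter_id_batches(1, 250000, 100000)))
# → [(1, 100001), (100001, 200001), (200001, 300001)]
```

Because the ranges are half-open and keyed on the primary key, each batch query is independent of how many rows preceded it, which is what keeps per-batch cost flat on large tables.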

Why is this change needed?
Background analytics queries should not run for 10+ minutes. The batching approach makes incremental progress and keeps individual queries under 2–3 minutes even on high-scale deployments.

How does this change address the issue?

  • daily_slicing is refactored to emit configurable sub-day slices while preserving day-aligned tarball storage.
  • Six high-volume collectors (job_host_summary, main_jobevent, job_host_summary_service, main_host_daily, main_indirectmanagednodeaudit, unified_jobs) are updated to support batch_sql via ID-range pagination.
  • DataframeOutput.batch_sql() is added so the same batching path works when metrics-utility is used as a library (returns a single concatenated DataFrame).
  • tempfile.mktemp replaced with mkdtemp (Sonar security fix).
  • get_batch_size() and get_gather_interval_hours() include input validation and consistent logging.
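As a rough sketch of the validation behaviour described for get_batch_size(): a hypothetical stand-alone version is shown below (the real function also logs invalid values; `get_batch_size_sketch` is not the PR's actual name):

```python
import os

def get_batch_size_sketch():
    """Parse METRICS_UTILITY_GATHER_BATCH_SIZE; return 0 (disabled) on bad input."""
    raw = os.environ.get("METRICS_UTILITY_GATHER_BATCH_SIZE", "0")
    try:
        value = int(raw)
    except (TypeError, ValueError):
        return 0  # invalid input disables batching rather than raising
    return value if value > 0 else 0  # negatives are treated as disabled

os.environ["METRICS_UTILITY_GATHER_BATCH_SIZE"] = "100000"
print(get_batch_size_sketch())  # → 100000
os.environ["METRICS_UTILITY_GATHER_BATCH_SIZE"] = "-5"
print(get_batch_size_sketch())  # → 0
```

Clamping negatives to 0 matters because downstream `if batch_size:` guards would otherwise pass a negative value into the batching loop.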

Testing

Prerequisites

  • Python 3.12+, uv installed
  • Postgres + MinIO running: make compose
  • Or against a real Controller DB: set METRICS_UTILITY_DB_* env vars

Steps to Test

  1. Run the unit test suite: make test
  2. Verify time-window batching: set METRICS_UTILITY_GATHER_INTERVAL_HOURS=4 and run gather_automation_controller_billing_data — confirm 6 tarballs are produced per 24-hour window instead of 1.
  3. Verify row-count batching: set METRICS_UTILITY_GATHER_BATCH_SIZE=100000 and run gather — confirm multiple COPY queries execute per time slice and total row counts match an unpatched run.
  4. Run build_report on the batched output — confirm XLSX row counts match a baseline.
  5. Verify backward compatibility: with no env vars set, confirm output is identical to the previous behaviour.
  6. On scale-lab data with METRICS_UTILITY_GATHER_INTERVAL_HOURS=4 + METRICS_UTILITY_GATHER_BATCH_SIZE=100000, confirm no individual query exceeds ~3 minutes.

Expected Results

  • make test: 814 tests pass, 49 fail (pre-existing DB/S3 infrastructure failures only).
  • Batched output produces the same final XLSX as non-batched.
  • No individual DB query exceeds 2–3 minutes on scale-lab data.

Required Actions

  • Requires documentation updates
  • Requires downstream repository changes
  • Requires infrastructure/deployment changes
  • Requires coordination with other teams
  • Blocked by PR/MR: #XXX

Self-Review Checklist

Code Quality

  • Code is properly linted and formatted.
  • All tests (existing and new) pass successfully.
  • Tests are added or updated as needed.
  • Code includes relevant comments for complex sections.
  • Changes are reviewed and approved by at least two team members.
  • Documentation is updated where applicable.

Notes for Reviewers

Both env vars default to their existing behaviour (24 h interval, no row batching), so no configuration changes are required on existing deployments. Operators on high-scale deployments should start with METRICS_UTILITY_GATHER_INTERVAL_HOURS=4 and add METRICS_UTILITY_GATHER_BATCH_SIZE=100000 if sub-hour windows still produce large queries.
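The suggested starting configuration above could be set like this (the values are the ones recommended in this PR for high-scale deployments, not universal defaults):

```shell
export METRICS_UTILITY_GATHER_INTERVAL_HOURS=4   # six 4-hour slices per day
export METRICS_UTILITY_GATHER_BATCH_SIZE=100000  # 100k-row COPY batches
```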

credentials_service and main_jobevent_service intentionally do not support METRICS_UTILITY_GATHER_BATCH_SIZE: the former is a SELECT DISTINCT aggregation with no useful primary key, and the latter already uses custom Python-level partition pruning with explicit job IDs and hourly ranges.


Note

Medium Risk
Touches core gather slicing and several high-volume SQL collectors; batching/time-slicing changes could alter query boundaries and output chunking if misconfigured, though defaults preserve existing behavior.

Overview
Adds two operator-tunable knobs to reduce gather query size: sub-day time slicing via METRICS_UTILITY_GATHER_INTERVAL_HOURS (refactors daily_slicing to emit multiple slices per day without crossing midnight) and ID-range COPY batching via METRICS_UTILITY_GATHER_BATCH_SIZE.
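The day-aligned slicing described above might look roughly like this. This is an illustrative sketch, not the PR's daily_slicing implementation, and `iter_sub_day_slices` is a hypothetical name:

```python
from datetime import datetime, timedelta

def iter_sub_day_slices(start, end, interval_hours):
    """Split [start, end) into interval_hours-sized windows, clamping each
    slice so it never crosses a calendar-day (midnight) boundary."""
    current = start
    while current < end:
        next_midnight = (current + timedelta(days=1)).replace(
            hour=0, minute=0, second=0, microsecond=0
        )
        slice_end = min(current + timedelta(hours=interval_hours), next_midnight, end)
        yield current, slice_end
        current = slice_end

slices = list(
    iter_sub_day_slices(datetime(2026, 4, 27, 22, 0), datetime(2026, 4, 28, 6, 0), 4)
)
# The first slice is clamped at midnight: (22:00 → 00:00), then (00:00 → 04:00),
# then (04:00 → 06:00), so every tarball still belongs to exactly one calendar day.
```

The midnight clamp is what lets each sub-day slice ship as its own tarball while keeping the existing day-aligned storage layout.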

Implements batch_sql support in collector outputs (CollectionOutput streaming CSV and DataframeOutput concatenating DataFrames) and updates several large Controller collectors (job_host_summary, main_jobevent, job_host_summary_service, main_host_daily, main_indirectmanagednodeaudit, unified_jobs) to optionally run in primary-key batches with corresponding MIN/MAX(id) probes.

Also replaces insecure tempfile.mktemp usage with mkdtemp, documents the new env vars, and adds unit tests covering interval validation, daily slicing behavior, and batch execution/header handling.

Reviewed by Cursor Bugbot for commit f64792b.

…ariables

- Introduced `METRICS_UTILITY_GATHER_BATCH_SIZE` and `METRICS_UTILITY_GATHER_INTERVAL_HOURS` to control batch size and interval for data gathering.
- Updated collectors to utilize these new settings for improved performance and scalability.
- Refactored SQL queries in multiple collectors to support ID-range batching, enhancing data retrieval efficiency.
- Enhanced documentation to reflect the new environment variables and their usage in collectors.

coderabbitai Bot commented Apr 27, 2026

📝 Walkthrough

The changes introduce batch-aware data collection to the metrics utility. New environment variables configure gather intervals and batch sizes. Multiple collectors are refactored to conditionally execute queries in ID-range batches via keyset pagination, and core utilities are added to support batch SQL execution with DataFrame and CSV output formats.

Changes

Cohort / File(s) → Summary:

  • Configuration Documentation (docs/environment.md): Added two new environment variables: METRICS_UTILITY_GATHER_BATCH_SIZE for controlling COPY query row batching via keyset pagination, and METRICS_UTILITY_GATHER_INTERVAL_HOURS for splitting daily gather ranges into smaller time windows for daily_slicing collectors.
  • Core Utility Functions (metrics_utility/base/utils.py): New get_gather_interval_hours() function reads the gather interval from the environment with a 24-hour default and includes exception handling for invalid values.
  • Batch Processing Infrastructure (metrics_utility/library/collectors/util.py): Added get_batch_size() to read batch configuration, and new batch_sql() methods on the DataframeOutput and CollectionOutput classes that fetch min/max ID bounds, paginate by ranges, and concatenate results while handling headers appropriately for CSV outputs.
  • Daily Slicing Interval Refactor (metrics_utility/automation_controller_billing/collectors.py): Refactored daily_slicing to derive slicing steps from the configurable gather interval instead of fixed 1-day increments, removing special-case first-day alignment logic.
  • Batch-Enabled Collectors (metrics_utility/library/collectors/controller/: job_host_summary.py, job_host_summary_service.py, main_host.py, main_indirectmanagednodeaudit.py, main_jobevent.py, unified_jobs.py): Each collector now builds queries with injectable batch filters and conditionally executes via batch_sql() when get_batch_size() returns a value. Batching computes ID bounds within time windows and iterates over ID ranges; otherwise it falls back to single query execution.

Sequence Diagram(s)

sequenceDiagram
    participant Collector
    participant Utils
    participant Database
    participant Output

    Collector->>Utils: get_batch_size()
    Utils-->>Collector: batch_size (or 0)
    
    alt Batching Enabled
        Collector->>Database: SELECT MIN(id), MAX(id) WHERE time_filter
        Database-->>Collector: min_id, max_id
        Collector->>Output: batch_sql(query_fn, min_max_query, batch_size)
        
        loop For each ID range [start, end)
            Output->>Database: query_fn(batch_start, batch_end)
            Database-->>Output: rows with id in range
            Output->>Output: Accumulate/write batch
        end
        
        Output-->>Collector: Combined result
    else Batching Disabled
        Collector->>Database: Execute full query (batch_filter='TRUE')
        Database-->>Collector: All rows
        Output-->>Collector: Result
    end

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~30 minutes

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage ⚠️ Warning: docstring coverage is 26.92%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.

✅ Passed checks (4 passed)

  • Linked Issues check ✅ Passed: check skipped because no linked issues were found for this pull request.
  • Out of Scope Changes check ✅ Passed: check skipped because no linked issues were found for this pull request.
  • Description check ✅ Passed: the PR description is comprehensive and well-structured, covering all key aspects: clear explanation of what is being changed, why it is needed, and how it addresses the issue.
  • Title check ✅ Passed: the title accurately describes the main objective of the pull request: adding batch processing support to collectors with configurable gather intervals and batch size.


@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 6

🧹 Nitpick comments (2)
metrics_utility/library/collectors/util.py (1)

184-195: Defensive guard against batch_size <= 0 in the loop.

Even with get_batch_size() clamped, this helper is reachable from any caller that constructs its own batch_size. A non-positive value here would loop forever. A cheap guard at the top of the function (or assert batch_size > 0) keeps the contract explicit.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@metrics_utility/library/collectors/util.py` around lines 184 - 195, The loop
can hang if batch_size <= 0; add a defensive guard at the start of the function
that contains this loop (check batch_size) — e.g., assert batch_size > 0 or
raise ValueError("batch_size must be > 0") — so callers that pass a non-positive
batch_size (used with variables batch_start, batch_end, max_id and query_fn)
fail fast and avoid an infinite loop.
metrics_utility/library/collectors/controller/job_host_summary_service.py (1)

8-57: Optional: narrow filtered_jobs to the batch's job_ids to avoid re-scanning the full time window per batch.

The filtered_jobs CTE re-scans every finished-in-window job on every batch (the comment at lines 61–62 acknowledges this). For large time windows with many jobs this is O(num_batches × jobs_in_window) work that can dominate. Since the per-batch mjs.id range maps to a small subset of job_ids, you can correlate the two by adding the batch filter at the JOIN level, e.g.:

Proposed tweak
-            FROM filtered_jobs fj
-            JOIN main_jobhostsummary mjs ON mjs.job_id = fj.id
+            FROM main_jobhostsummary mjs
+            JOIN filtered_jobs fj ON fj.id = mjs.job_id
             ...
-            WHERE ({batch_filter})
+            WHERE ({batch_filter})

Postgres can already pick this plan, but if EXPLAIN shows the CTE materializing first, restructuring as a subquery (or LATERAL) keyed off the batched mjs rows will avoid the repeated scan. Skip if profiling shows it's not a hot spot.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@metrics_utility/library/collectors/controller/job_host_summary_service.py`
around lines 8 - 57, The filtered_jobs CTE in build_query currently scans all
finished-in-window jobs every batch; restrict it to only the job_ids that appear
in the current batch (the rows from main_jobhostsummary mjs filtered by
batch_filter) or replace the CTE with a subquery/LATERAL join so Postgres can
push the batch_filter into the job selection. Concretely: modify the
filtered_jobs CTE (or remove it) so it selects mu.id FROM main_unifiedjob mu
JOIN (SELECT DISTINCT job_id FROM main_jobhostsummary WHERE ({batch_filter}))
bjs ON mu.id = bjs.job_id AND mu.finished IS NOT NULL, or rewrite the FROM to
JOIN LATERAL (SELECT mu.id FROM main_unifiedjob mu WHERE mu.id = mjs.job_id AND
mu.finished IS NOT NULL) so the planner avoids materializing the full window;
update references to filtered_jobs accordingly.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@docs/environment.md`:
- Line 98: The docs entry for METRICS_UTILITY_GATHER_BATCH_SIZE lists only
job_host_summary and main_jobevent but is missing other collectors updated to
use batching; update the sentence to enumerate all collectors that now call
output.batch_sql(...) — unified_jobs, main_indirectmanagednodeaudit,
main_host_daily, main_jobevent, and job_host_summary — so operators understand
which collectors will be limited by the batch size and how to combine with
METRICS_UTILITY_GATHER_INTERVAL_HOURS for large deployments.

In `@metrics_utility/base/utils.py`:
- Around line 21-31: get_gather_interval_hours currently allows 0 or negative
values which will stall daily_slicing; after parsing the env var in
get_gather_interval_hours() validate that the integer is >= 1, and if not, log a
clear error via logger.error and raise a ValueError (or clamp to 1 if you prefer
a silent recovery) so callers (e.g., daily_slicing in
metrics_utility/automation_controller_billing/collectors.py) never receive
non-positive intervals; keep the existing default of 24 when the env var is
absent or invalid.

In `@metrics_utility/library/collectors/controller/job_host_summary_service.py`:
- Around line 56-74: The current batched path uses mjs.id ranges (min_max_query
on mjs.id and build_query predicate 'mjs.id >= s AND mjs.id < e') which
preserves ORDER BY mu.finished only within a batch but not globally; either
confirm no consumer depends on global mu.finished ordering, or change batching
to range by the finished timestamp instead: replace min_max_query to SELECT
MIN(mu.finished), MAX(mu.finished) and change batch_sql's query_fn to
build_query(f"mu.finished >= '{s}' AND mu.finished < '{e}'") (and keep ORDER BY
mu.finished ASC in build_query) so concatenating batches preserves global
mu.finished order, referencing batch_size, min_max_query, build_query,
output.batch_sql, mjs.id, and mu.finished.

In `@metrics_utility/library/collectors/controller/job_host_summary.py`:
- Around line 77-90: The batching path currently batches by id (min_max_query on
main_jobhostsummary.id and build_query predicates like "main_jobhostsummary.id
>= {s} AND ...") which causes final output to be ordered by id rather than the
global "ORDER BY main_jobhostsummary.modified ASC"; change the batching to
preserve global modified ordering by switching to modified-range batching:
compute min_max_query over main_jobhostsummary.modified (e.g., SELECT
MIN(modified), MAX(modified)), update the lambda passed to output.batch_sql to
build_query with predicates on main_jobhostsummary.modified (e.g.,
"main_jobhostsummary.modified >= {s} AND main_jobhostsummary.modified < {e}"),
and ensure per-batch queries still include "ORDER BY
main_jobhostsummary.modified ASC" (or remove per-batch sort only if you confirm
consumers don’t need global ordering and document that); update references to
batch_size, min_max_query, build_query, and output.batch_sql accordingly.

In
`@metrics_utility/library/collectors/controller/main_indirectmanagednodeaudit.py`:
- Around line 36-49: The batched path for main_indirectmanagednodeaudit slices
by id ranges but the query ORDER BY uses main_indirectmanagednodeaudit.created
ASC, causing global ordering divergence versus the non-batched output; to fix,
change the ordering in the build_query used for both paths to ORDER BY
main_indirectmanagednodeaudit.id ASC (or otherwise make the batch_sql
build_query include an explicit id ASC order) so that batch_sql and output.sql
return results with the same global order; update the build_query definition or
the lambda passed to batch_sql (and ensure min_max_query remains as-is) so both
the batched and non-batched branches use id-based ordering.

In `@metrics_utility/library/collectors/util.py`:
- Around line 9-18: get_batch_size currently returns any int (including
negatives) and swallows parse errors, which lets negative batch_size bypass "if
batch_size" checks and cause infinite loops in callers like batch_sql and
_batch_copy_table_files; change get_batch_size to parse the env var, ensure the
value is a non-negative integer (if parsed value < 0 treat as invalid), and on
any invalid parse or negative value log the error via the project logger (same
style as get_max_gather_period_days / get_gather_interval_hours) and return 0
(disabled) instead of propagating negatives or silently swallowing exceptions.


📥 Commits

Reviewing files that changed from the base of the PR and between 828c8c1 and bddc46b.

📒 Files selected for processing (10)
  • docs/environment.md
  • metrics_utility/automation_controller_billing/collectors.py
  • metrics_utility/base/utils.py
  • metrics_utility/library/collectors/controller/job_host_summary.py
  • metrics_utility/library/collectors/controller/job_host_summary_service.py
  • metrics_utility/library/collectors/controller/main_host.py
  • metrics_utility/library/collectors/controller/main_indirectmanagednodeaudit.py
  • metrics_utility/library/collectors/controller/main_jobevent.py
  • metrics_utility/library/collectors/controller/unified_jobs.py
  • metrics_utility/library/collectors/util.py

Comment thread docs/environment.md Outdated
Comment thread metrics_utility/base/utils.py
Comment on lines +56 to +74
            ORDER BY mu.finished ASC
        """

    batch_size = get_batch_size()
    if batch_size:
        # ID range from jobhostsummary rows matching the job time window.
        # The filtered_jobs CTE remains full (it's small — one row per job).
        min_max_query = f"""
            SELECT MIN(mjs.id), MAX(mjs.id)
            FROM main_jobhostsummary mjs
            JOIN main_unifiedjob mu ON mu.id = mjs.job_id
            WHERE {jobs_where} AND mu.finished IS NOT NULL
        """
        return output.batch_sql(
            db,
            query_fn=lambda s, e: build_query(f'mjs.id >= {s} AND mjs.id < {e}'),
            min_max_query=min_max_query,
            batch_size=batch_size,
        )

⚠️ Potential issue | 🟠 Major

Same global-order regression as job_host_summary.py.

ORDER BY mu.finished ASC is only enforced within each ID-range batch; across batches the output is concatenated in mjs.id order, not mu.finished order. Since mjs.id is independent of mu.finished (job_id and host-summary ids aren't aligned with finish time), the global ordering is effectively arbitrary in batched mode. Please confirm consumers don't rely on finished ordering, or batch by a key that matches the desired order.


Comment on lines +77 to +90
            ORDER BY main_jobhostsummary.modified ASC
        """

    batch_size = get_batch_size()
    if batch_size:
        # ID-range batching: filter is pushed into filtered_hosts CTE and the
        # final WHERE so each batch only scans its share of rows.
        min_max_query = f'SELECT MIN(id), MAX(id) FROM main_jobhostsummary WHERE {where}'
        return output.batch_sql(
            db,
            query_fn=lambda s, e: build_query(f'main_jobhostsummary.id >= {s} AND main_jobhostsummary.id < {e}'),
            min_max_query=min_max_query,
            batch_size=batch_size,
        )

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Scripts executed against ansible/metrics-utility:

#!/bin/bash
# Find consumers of the job_host_summary collector output to assess ordering dependence.
rg -nP -C3 '\bjob_host_summary\b'
rg -nP -C3 'modified.*ASC|sort.*modified|sorted.*modified'

# Examine the actual batch_sql function to understand how it handles ordering.
rg -n "def batch_sql" --type py

# Look at how workers/3-report-ccspv2.py uses the job_host_summary data.
sed -n '20,50p' workers/3-report-ccspv2.py

# Check whether any tests verify ordering behavior or the modified field.
rg -n "modified.*order|order.*modified" --type py -A 2 -B 2

# Look at the _batch_copy_table_files function mentioned in the review.
rg -n "_batch_copy_table_files" --type py -A 5 -B 2

# Examine the job_host_summary collector itself for full context.
sed -n '60,95p' metrics_utility/library/collectors/controller/job_host_summary.py

# Get the full _batch_copy_table_files implementation.
sed -n '163,210p' metrics_utility/library/collectors/util.py

# Look at CsvFileSplitter to understand how it concatenates batches.
rg -n "class CsvFileSplitter" --type py -A 15

# Check for integration tests that verify the ordering behavior.
rg -n "job_host_summary.*modified|modified.*job_host_summary" --type py -B 3 -A 3

# Verify whether the CSV output is actually ordered by modified when batching is disabled.
sed -n '50,100p' metrics_utility/library/collectors/controller/job_host_summary.py

# Check what the downstream report building does with the extracted job_host_summary.
rg -n "ReportCCSPv2" --type py -A 10 -B 5

# Find files referencing ReportCCSPv2.
fd -t f "*.py" -exec grep -l "ReportCCSPv2" {} \;

# Search for any sorting/ordering logic in report processing.
rg -n "sort.*modified|sort_values.*modified" --type py

# Check the batch_sql at line 92 in util.py - might be a different implementation.
sed -n '92,120p' metrics_utility/library/collectors/util.py

# Verify: does the non-batched path return globally ordered-by-modified data? Check the sql() method.
rg -n "def sql\(" --type py -A 10


Global ORDER BY modified is not preserved when batching is enabled.

When batching is enabled, each ID-range batch is executed with ORDER BY modified ASC, so rows are sorted within each batch. However, since batches are processed sequentially in ID order (from MIN(id) to MAX(id)), the final concatenated output is globally ordered by ID, not by modified time. This differs from the non-batched path, which returns a single globally ordered result.

Since id and modified can diverge (rows get updated), this is a silent behavior change. If downstream consumers rely on global modified ordering (e.g., for incremental processing or time-series analysis), this is a regression. If they don't, the per-batch ORDER BY adds unnecessary sort cost and could be dropped.

Recommend: (1) verify whether consumers depend on global modified order, (2) if yes, change batching to iterate in modified order or document the change, or (3) if no, remove the per-batch sort.


Comment thread metrics_utility/library/collectors/controller/main_indirectmanagednodeaudit.py Outdated
Comment thread metrics_utility/library/collectors/util.py
Comment thread metrics_utility/library/collectors/controller/main_jobevent.py
Comment thread metrics_utility/automation_controller_billing/collectors.py
@cshiels-ie cshiels-ie changed the title Add support for batch processing in collectors with new environment v… [POC] Add support for batch processing in collectors with new environment v… Apr 27, 2026
…ariables

- Updated `METRICS_UTILITY_GATHER_INTERVAL_HOURS` to enforce a minimum value of 1, preventing invalid configurations.
- Improved `METRICS_UTILITY_GATHER_BATCH_SIZE` to return 0 for negative or non-integer values, ensuring robust error handling.
- Refactored SQL queries in collectors to maintain consistency with new batch processing logic.
- Added comprehensive unit tests for both `get_gather_interval_hours` and `get_batch_size` functions, validating expected behavior and error conditions.
- Introduced new tests for daily slicing functionality, ensuring correct behavior with configurable intervals.
@cshiels-ie cshiels-ie changed the title [POC] Add support for batch processing in collectors with new environment v… Add batch processing support for collectors with configurable gather intervals and batch size Apr 27, 2026
- Added comprehensive docstrings to various collector functions, explaining their purpose, data handling, and batch processing capabilities.
- Updated SQL queries in collectors to ensure consistent ordering and efficient data retrieval.
- Improved documentation to reflect the new functionality and usage of environment variables for batch processing.
@sonarqubecloud

@cursor cursor Bot left a comment


Cursor Bugbot has reviewed your changes and found 2 potential issues.



        dfs.append(_copy_table_pandas(db, query_fn(batch_start, batch_end)))
        batch_start = batch_end

    return pd.concat(dfs, ignore_index=True) if dfs else pd.DataFrame()

Missing batch_size validation enables infinite loop

Medium Severity

DataframeOutput.batch_sql lacks the batch_size > 0 validation that _batch_copy_table_files has. If called with batch_size=0, the while batch_start <= max_id loop never advances (batch_end = batch_start + 0), causing an infinite loop. While internal callers guard with if batch_size:, DataframeOutput is the public default output class for library consumers who could call batch_sql directly.
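A fail-fast guard along the lines Bugbot suggests could look like this (a hypothetical sketch, not the PR's actual DataframeOutput.batch_sql; `batch_sql_guarded` and its signature are invented for illustration):

```python
def batch_sql_guarded(query_fn, min_id, max_id, batch_size):
    """Run query_fn over half-open [start, end) ID ranges, failing fast on bad sizes."""
    if batch_size <= 0:
        # With batch_size == 0, batch_start would never advance past max_id,
        # so reject non-positive sizes up front instead of looping forever.
        raise ValueError("batch_size must be > 0")
    results = []
    start = min_id
    while start <= max_id:
        results.append(query_fn(start, start + batch_size))
        start += batch_size
    return results

print(batch_sql_guarded(lambda s, e: (s, e), 1, 10, 5))  # → [(1, 6), (6, 11)]
```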



ON jhs.job_id = je.job_id AND jhs.host_name = je.host_name
WHERE {min_max_where}
AND je.event IN ({_JOBEVENT_TYPES_SQL})
"""

Expensive min_max query negates jobevent batching benefit

Medium Severity

The min_max_query for main_jobevent joins the large partitioned main_jobevent table with main_jobhostsummary without including job_created in the join condition. On a partitioned table, this prevents partition pruning, forcing PostgreSQL to scan all partitions just to determine the ID range. On the scale-lab deployments this PR targets (millions of rows per hour), this single setup query could itself run for many minutes, partially defeating the purpose of batching.



@cshiels-ie cshiels-ie changed the title Add batch processing support for collectors with configurable gather intervals and batch size [POC] Add batch processing support for collectors with configurable gather intervals and batch size Apr 27, 2026
@cshiels-ie cshiels-ie changed the title [POC] Add batch processing support for collectors with configurable gather intervals and batch size [POC] [WIP] [After 2.7] Add batch processing support for collectors with configurable gather intervals and batch size Apr 28, 2026
