
Non-atomic MongoDB count queries produce inconsistent collector statistics across UI and telemetry #25648

@etgraylog

Description


Expected Behavior

When a large number (1,000+) of Collectors ramp up simultaneously, the number of Offline Collector Instances should never be negative. Likewise, every telemetry snapshot should satisfy online_collectors ≤ total_collectors.
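Both expectations describe the same condition: offline = total − online is non-negative exactly when online ≤ total. A minimal sketch of that check, written as a hypothetical helper (`CollectorStatsInvariants` is not part of Graylog) that a test or a telemetry sanity check could apply to any (total, online) pair:

```java
// Hypothetical helper (not part of Graylog): the two expected-behavior rules
// reduce to one condition, because offline = total - online >= 0
// holds exactly when online <= total.
final class CollectorStatsInvariants {
    private CollectorStatsInvariants() {}

    /** True if a (total, online) snapshot is logically possible. */
    static boolean isConsistent(long total, long online) {
        return online >= 0 && online <= total;
    }

    /** Offline count as shown in the UI: total minus online. */
    static long offline(long total, long online) {
        return total - online;
    }

    public static void main(String[] args) {
        System.out.println(isConsistent(1300, 1300) + " " + offline(1300, 1300)); // true 0
        System.out.println(isConsistent(1298, 1300) + " " + offline(1298, 1300)); // false -2
    }
}
```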

Current Behavior

When a large number (1,000+) of Collectors ramp up simultaneously, the number of Offline Collector Instances can at times appear as a negative number because of a race condition between two separate, non-atomic MongoDB countDocuments() calls.


Race condition locations:

1. CollectorInstancesResource.java:188-201 (global stats shown in the overview header):

```java
final long totalInstances = collectorInstanceService.count();       // Query 1: snapshot at T1
final long onlineInstances = collectorInstanceService.countOnline(  // Query 2: snapshot at T2
        Instant.now().minus(getOfflineThreshold()));
return new CollectorStatsResponse(
        totalInstances,
        onlineInstances,
        totalInstances - onlineInstances,   // ← can be negative
        ...);
```

2. FleetResource.java:174-183 (per-fleet stats on the fleet detail page):

```java
final long totalInstances = instanceService.countByFleet(fleetId);          // Query 1: T1
final long onlineInstances = instanceService.countOnlineByFleet(fleetId,    // Query 2: T2
        Instant.now().minus(getOfflineThreshold()));
return new FleetStatsResponse(totalInstances, onlineInstances,
        totalInstances - onlineInstances, totalSources);  // ← can be negative
```

3. CollectorMetricsSupplier.java (telemetry event emitted to the metrics pipeline):

```java
entry("total_collectors",  collectorInstanceService.count()),      // Query 1: snapshot at T1
entry("online_collectors", collectorInstanceService.countOnline(   // Query 2: snapshot at T2
        Instant.now(clock).minus(collectorsConfig.collectorOfflineThreshold())))
```

Note on severity: Locations 1 and 2 produce a user-visible negative offline_instances value in the UI. Location 3 performs no subtraction, so nothing visibly breaks; instead it silently records a logically impossible telemetry snapshot in which online_collectors > total_collectors, corrupting the metric data without any signal to the user.

How can the number of Offline Collector Instances appear as -2 during a scaling burst?

  1. Query 1 runs at T1 → totalInstances = 1298
  2. 2 more collectors enroll between T1 and T2 (writing last_seen ≈ now)
  3. Query 2 runs at T2 → onlineInstances = 1300 (the 2 new enrollees are within the threshold)
  4. offlineInstances = 1298 − 1300 = −2

The faster new collectors enroll, the more of them can land between the two queries, and the more negative the reported value can become. With 1,300 collectors ramping up at once, this is likely to happen frequently.
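The interleaving above can be replayed deterministically. This sketch uses a toy in-memory store (`TwoQueryRace` and its methods are hypothetical stand-ins for count() and countOnline(), not Graylog code) in which each collector is represented only by its last_seen timestamp:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Toy in-memory model (not Graylog code): each collector is just a last_seen
// timestamp. Replays "count total, enroll more, count online" in that order.
final class TwoQueryRace {
    private final List<Instant> lastSeen = new ArrayList<>();

    long count() {
        return lastSeen.size();
    }

    long countOnline(Instant threshold) {
        return lastSeen.stream().filter(t -> !t.isBefore(threshold)).count();
    }

    void enroll(Instant now) {
        lastSeen.add(now);
    }

    /** Returns the offline value the two-query code path would report. */
    static long replay(int initial, int enrolledBetweenQueries) {
        final Instant now = Instant.parse("2025-01-01T00:00:00Z");
        final Instant threshold = now.minus(Duration.ofHours(1)); // high threshold: nobody ages out
        final TwoQueryRace store = new TwoQueryRace();
        for (int i = 0; i < initial; i++) {
            store.enroll(now);
        }

        final long total = store.count();                 // Query 1 at T1
        for (int i = 0; i < enrolledBetweenQueries; i++) {
            store.enroll(now);                            // enrollments between T1 and T2
        }
        final long online = store.countOnline(threshold); // Query 2 at T2
        return total - online;
    }

    public static void main(String[] args) {
        System.out.println(replay(1298, 2)); // -2
    }
}
```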

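Note: why bulkStats() is safe

FleetResource.bulkStats() correctly uses countByFleetGrouped(), which runs a single MongoDB aggregation pipeline that computes both total and online in the same $group stage (simplified):

```java
Aggregates.group("$fleet_id",
        Accumulators.sum("total", 1L),                 // every document counts toward total
        Accumulators.sum("online", new Document("$cond", List.of(
                new Document("$gte", List.of("$" + FIELD_LAST_SEEN, Date.from(threshold))),
                1L, 0L))))                             // only recent documents count toward online
```

Because both accumulators are evaluated against the same document, every document counted as online is also counted in total. Online is therefore always a subset of total, and total − online is always ≥ 0, even if collectors enroll while the aggregation runs.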
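The same per-document argument can be shown without MongoDB. Below is an in-memory analogue of that $group stage (a sketch, not the driver or server implementation; `SinglePassCounts` is a hypothetical name): total and online are incremented while visiting the same element, so no interleaving can push online above total.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;

// In-memory analogue (not the MongoDB implementation) of a $group stage with
// sum("total", 1) and sum("online", $cond(lastSeen >= threshold, 1, 0)):
// both counters are updated while visiting the same document.
final class SinglePassCounts {
    private SinglePassCounts() {}

    /** Returns {total, online}; online is only ever incremented together with total. */
    static long[] countTotalAndOnline(Iterable<Instant> lastSeenValues, Instant threshold) {
        long total = 0;
        long online = 0;
        for (Instant lastSeen : lastSeenValues) {
            total++;                               // sum("total", 1)
            if (!lastSeen.isBefore(threshold)) {   // lastSeen >= threshold
                online++;                          // sum("online", 1)
            }
        }
        return new long[] {total, online};
    }

    public static void main(String[] args) {
        final Instant now = Instant.parse("2025-01-01T00:00:00Z");
        final Instant threshold = now.minus(Duration.ofMinutes(5));
        final long[] counts = countTotalAndOnline(
                List.of(now, now.minus(Duration.ofMinutes(1)), now.minus(Duration.ofHours(2))),
                threshold);
        System.out.println(counts[0] + " total, " + counts[1] + " online"); // 3 total, 2 online
    }
}
```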

Possible Solution

Add a global-count variant of countByFleetGrouped to CollectorInstanceService, and a per-fleet single-aggregation method, then use them in all three affected locations:

  1. CollectorInstanceService.java: add two methods (sharing one private helper):
```java
// Additional imports needed in CollectorInstanceService
import com.mongodb.client.model.Accumulators;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Filters;
import org.bson.Document;
import org.bson.conversions.Bson;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

/** Returns [total, online] across all instances, computed in a single aggregation pass. */
public long[] countTotalAndOnline(Instant onlineThreshold) {
    return countTotalAndOnlineMatching(List.of(), onlineThreshold);
}

/** Returns [total, online] for a single fleet, computed in a single aggregation pass. */
public long[] countTotalAndOnlineByFleet(String fleetId, Instant onlineThreshold) {
    return countTotalAndOnlineMatching(
            List.of(Aggregates.match(Filters.eq(FIELD_FLEET_ID, fleetId))), onlineThreshold);
}

// Runs the optional filter stages, then one $group with _id: null (a single global group).
// Both sums are evaluated per document, so online can never exceed total.
private long[] countTotalAndOnlineMatching(List<Bson> filterStages, Instant onlineThreshold) {
    final List<Bson> pipeline = new ArrayList<>(filterStages);
    pipeline.add(Aggregates.group(null,
            Accumulators.sum("total", 1L),
            Accumulators.sum("online", new Document("$cond", List.of(
                    new Document("$gte", List.of("$" + FIELD_LAST_SEEN, Date.from(onlineThreshold))),
                    1L, 0L)))));
    // $group emits no document when nothing matches, so fall back to {0, 0}.
    final Document doc = collection.aggregate(pipeline, Document.class).first();
    if (doc == null) {
        return new long[] {0L, 0L};
    }
    return new long[] {
            doc.get("total", Number.class).longValue(),
            doc.get("online", Number.class).longValue()};
}
```
  2. CollectorInstancesResource.java and FleetResource.java: use the new single-aggregation methods.

CollectorInstancesResource.java:188-201:
```java
public CollectorStatsResponse stats() {
    final Instant onlineThreshold = Instant.now().minus(getOfflineThreshold());
    final long[] counts = collectorInstanceService.countTotalAndOnline(onlineThreshold);
    return new CollectorStatsResponse(
            counts[0],
            counts[1],
            counts[0] - counts[1],   // guaranteed ≥ 0
            fleetService.count(),
            sourceService.count());
}
```

FleetResource.java:174-183:
```java
public FleetStatsResponse stats(@PathParam("fleetId") String fleetId) {
    checkPermission(CollectorsPermissions.FLEET_READ, fleetId);
    if (fleetService.get(fleetId).isEmpty()) {
        throw new NotFoundException("Fleet " + fleetId + " not found");
    }
    final Instant onlineThreshold = Instant.now().minus(getOfflineThreshold());
    final long[] counts = instanceService.countTotalAndOnlineByFleet(fleetId, onlineThreshold);
    final long totalSources = sourceService.countByFleet(fleetId);
    return new FleetStatsResponse(counts[0], counts[1], counts[0] - counts[1], totalSources);
}
```
  3. CollectorMetricsSupplier.java: replace the two separate count calls with the same single-aggregation countTotalAndOnline():
```java
final Instant onlineThreshold = Instant.now(clock).minus(collectorsConfig.collectorOfflineThreshold());
final long[] collectorCounts = collectorInstanceService.countTotalAndOnline(onlineThreshold);

return Optional.of(TelemetryEvent.of(Map.ofEntries(
        entry("transactions_last_day", fleetTransactionLogService.countMarkersSince(Instant.now(clock).minus(1, ChronoUnit.DAYS))),
        entry("total_collectors",  collectorCounts[0]),  // guaranteed: total ≥ online
        entry("online_collectors", collectorCounts[1]),
        ...
)));
```

The aggregation-based approach mirrors what countByFleetGrouped already does and closes the TOCTOU (time-of-check to time-of-use) window between the two count queries at all three call sites.
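To see the difference under real concurrency, the toy harness below (hypothetical, with a ConcurrentHashMap standing in for the MongoDB collection) runs an enrolling thread while the main thread computes stats both ways. Whether the two-query path produces negatives on a given run depends on timing; the single-pass path cannot, because each visited entry contributes to both counters.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

// Toy concurrency harness (not Graylog or MongoDB code): a background thread
// keeps enrolling instances while the main thread reads stats in two ways.
final class StatsRaceHarness {
    private StatsRaceHarness() {}

    /** Returns {reads with negative offline via two queries, reads with negative offline via one pass}. */
    static long[] run(int reads, int maxEnrollments) {
        final Map<String, Instant> lastSeenByUid = new ConcurrentHashMap<>();
        final Instant threshold = Instant.now().minus(Duration.ofHours(1)); // nobody ages out
        final AtomicBoolean running = new AtomicBoolean(true);

        // Enroller: writes last_seen = now for new UIDs, bounded to keep memory small.
        final Thread enroller = new Thread(() -> {
            for (int i = 0; i < maxEnrollments && running.get(); i++) {
                lastSeenByUid.put("collector-" + i, Instant.now());
            }
        });
        enroller.start();

        long negativeTwoQuery = 0;
        long negativeSinglePass = 0;
        try {
            for (int r = 0; r < reads; r++) {
                // Two separate reads: enrollments can land between them.
                final long total = lastSeenByUid.size();
                final long online = lastSeenByUid.values().stream()
                        .filter(t -> !t.isBefore(threshold))
                        .count();
                if (total - online < 0) {
                    negativeTwoQuery++;
                }

                // One pass: each visited entry counts toward total and, if recent, online.
                long passTotal = 0;
                long passOnline = 0;
                for (Instant t : lastSeenByUid.values()) {
                    passTotal++;
                    if (!t.isBefore(threshold)) {
                        passOnline++;
                    }
                }
                if (passTotal - passOnline < 0) {
                    negativeSinglePass++;
                }
            }
        } finally {
            running.set(false);
            try {
                enroller.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return new long[] {negativeTwoQuery, negativeSinglePass};
    }

    public static void main(String[] args) {
        final long[] result = run(500, 200_000);
        System.out.println("two-query negatives: " + result[0]
                + ", single-pass negatives: " + result[1]);
    }
}
```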

Steps to Reproduce (for bugs)

  1. Set the offline threshold high (e.g. 1 hour) so no collectors age out during the test — you want online = total at steady state, making any gap immediately visible as a negative value.
  2. Start a background loop enrolling new collector instances as fast as possible (e.g. a bash loop hitting the enrollment endpoint with unique instance UIDs in parallel).
  3. While the loop is running, repeatedly poll GET /api/collectors/stats (or /api/collectors/fleets/{id}/stats).
  4. Look for offline_instances < 0 in the JSON response.
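Steps 3 and 4 can be scripted. This is a sketch under several assumptions: the base URL and `user:password` credentials are passed on the command line, the API accepts HTTP basic auth, and the response contains a top-level numeric `offline_instances` field. The regex extraction is deliberately crude, `StatsPoller` is a hypothetical name, and the enrollment loop from step 2 is not included.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.OptionalLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch of steps 3-4: poll the stats endpoint and report negative offline counts.
// Assumptions: basic auth, a top-level "offline_instances" number in the JSON body.
final class StatsPoller {
    private static final Pattern OFFLINE = Pattern.compile("\"offline_instances\"\\s*:\\s*(-?\\d+)");

    private StatsPoller() {}

    /** Extracts offline_instances with a regex (fine for a test script, not a real JSON parser). */
    static OptionalLong offlineInstances(String json) {
        final Matcher m = OFFLINE.matcher(json);
        return m.find() ? OptionalLong.of(Long.parseLong(m.group(1))) : OptionalLong.empty();
    }

    public static void main(String[] args) throws Exception {
        // Usage: java StatsPoller.java <base-url> <user:password> [polls]
        final URI uri = URI.create(args[0] + "/api/collectors/stats");
        final String auth = "Basic " + Base64.getEncoder()
                .encodeToString(args[1].getBytes(StandardCharsets.UTF_8));
        final int polls = args.length > 2 ? Integer.parseInt(args[2]) : 1000;
        final HttpClient client = HttpClient.newHttpClient();
        final HttpRequest request = HttpRequest.newBuilder(uri)
                .header("Authorization", auth)
                .header("Accept", "application/json")
                .GET()
                .build();

        int negatives = 0;
        for (int i = 0; i < polls; i++) {
            final String body = client.send(request, HttpResponse.BodyHandlers.ofString()).body();
            final OptionalLong offline = offlineInstances(body);
            if (offline.isPresent() && offline.getAsLong() < 0) {
                negatives++;
                System.out.println("poll " + i + ": offline_instances = " + offline.getAsLong());
            }
        }
        System.out.println(negatives + " of " + polls + " responses had offline_instances < 0");
    }
}
```

It would be run as, for example, `java StatsPoller.java http://localhost:9000 admin:password 5000` while the enrollment loop from step 2 is active; the per-fleet endpoint can be polled the same way by changing the path.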

Context

This issue was observed during Graylog Collector v2 benchmarks.

Your Environment

  • Graylog Version: 7.1.0 Beta 1
  • Java Version: Java v21
  • OpenSearch Version: 2.19.4
  • MongoDB Version: 8.0.20
  • Operating System: Ubuntu 24.04.4 LTS
  • Browser version: Brave 1.89.132

Checklist

[ ] This fix needs to be backported.
[ ] Does this issue have security implications?
