diff --git a/graylog2-server/src/main/java/org/graylog2/bindings/PeriodicalBindings.java b/graylog2-server/src/main/java/org/graylog2/bindings/PeriodicalBindings.java
index e470ce92ef18..fad39f0fba80 100644
--- a/graylog2-server/src/main/java/org/graylog2/bindings/PeriodicalBindings.java
+++ b/graylog2-server/src/main/java/org/graylog2/bindings/PeriodicalBindings.java
@@ -36,6 +36,7 @@
import org.graylog2.periodical.LeaderPresenceCheckPeriodical;
import org.graylog2.periodical.NodeMetricPeriodical;
import org.graylog2.periodical.NodePingThread;
+import org.graylog2.periodical.StaleInputRuntimeStateCleanup;
import org.graylog2.periodical.OrphanedTokenCleaner;
import org.graylog2.periodical.SearchVersionCheckPeriodical;
import org.graylog2.periodical.ThrottleStateUpdaterThread;
@@ -74,5 +75,6 @@ protected void configure() {
periodicalBinder.addBinding().to(ExpiredTokenCleaner.class);
periodicalBinder.addBinding().to(OrphanedTokenCleaner.class);
periodicalBinder.addBinding().to(NodeMetricPeriodical.class);
+ periodicalBinder.addBinding().to(StaleInputRuntimeStateCleanup.class);
}
}
diff --git a/graylog2-server/src/main/java/org/graylog2/inputs/InputRuntimeStatusProvider.java b/graylog2-server/src/main/java/org/graylog2/inputs/InputRuntimeStatusProvider.java
index 659e7305ba8a..6d7238fda23c 100644
--- a/graylog2-server/src/main/java/org/graylog2/inputs/InputRuntimeStatusProvider.java
+++ b/graylog2-server/src/main/java/org/graylog2/inputs/InputRuntimeStatusProvider.java
@@ -17,52 +17,28 @@
package org.graylog2.inputs;
import jakarta.inject.Inject;
-import jakarta.inject.Named;
import jakarta.inject.Singleton;
-import org.graylog2.cluster.Node;
-import org.graylog2.cluster.NodeService;
import org.graylog2.database.filtering.ComputedFieldProvider;
+import org.graylog2.inputs.persistence.InputStateService;
import org.graylog2.plugin.IOState;
-import org.graylog2.plugin.system.NodeId;
-import org.graylog2.rest.RemoteInterfaceProvider;
-import org.graylog2.rest.resources.system.inputs.RemoteInputStatesResource;
-import org.graylog2.shared.inputs.InputRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import retrofit2.Response;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
/**
- * Provides filtering support for input runtime status by querying InputRegistry across all cluster nodes.
+ * Provides filtering support for input runtime status by querying MongoDB.
*
- * This provider enables filtering inputs by their actual runtime state (RUNNING, FAILED, STOPPED, etc.)
- * rather than just the desired_state stored in the database. It aggregates runtime status information
- * from all nodes in the cluster to provide a cluster-wide view.
- *
- *
- * Optimizations:
- *
- *
- * - Reads the local node's InputRegistry directly (no HTTP call to itself)
- * - Queries remote nodes in parallel
- * - Caches results for 5 seconds to avoid repeated cluster-wide fetches
- *
+ * Runtime state is persisted to MongoDB by {@link org.graylog2.inputs.InputStateListener},
+ * enabling efficient cluster-wide queries by state without HTTP fan-out.
*/
@Singleton
public class InputRuntimeStatusProvider implements ComputedFieldProvider {
private static final Logger LOG = LoggerFactory.getLogger(InputRuntimeStatusProvider.class);
private static final String FIELD_NAME = "runtime_status";
- private static final long CACHE_TTL_MS = 5_000;
public static final Map> STATUS_GROUPS = Map.of(
"RUNNING", Set.of(IOState.Type.RUNNING),
@@ -78,29 +54,14 @@ public class InputRuntimeStatusProvider implements ComputedFieldProvider {
"FAILED", "Failed"
);
- private final NodeService nodeService;
- private final RemoteInterfaceProvider remoteInterfaceProvider;
- private final InputRegistry inputRegistry;
+ private final InputStateService runtimeStateService;
private final InputService inputService;
- private final NodeId nodeId;
- private final ExecutorService executorService;
-
- private volatile Map> cachedStatuses;
- private volatile long cacheTimestamp = 0;
@Inject
- public InputRuntimeStatusProvider(NodeService nodeService,
- RemoteInterfaceProvider remoteInterfaceProvider,
- InputRegistry inputRegistry,
- InputService inputService,
- NodeId nodeId,
- @Named("proxiedRequestsExecutorService") ExecutorService executorService) {
- this.nodeService = nodeService;
- this.remoteInterfaceProvider = remoteInterfaceProvider;
- this.inputRegistry = inputRegistry;
+ public InputRuntimeStatusProvider(InputStateService runtimeStateService,
+ InputService inputService) {
+ this.runtimeStateService = runtimeStateService;
this.inputService = inputService;
- this.nodeId = nodeId;
- this.executorService = executorService;
}
@Override
@@ -112,27 +73,16 @@ public Set getMatchingIds(String filterValue, String authToken) {
return Set.of();
}
- // For NOT_RUNNING: query desired_state from DB since stopped inputs are absent from the InputRegistry
+ // For NOT_RUNNING: query desired_state from DB since stopped inputs have no runtime state
if ("NOT_RUNNING".equals(key)) {
final Set matching = inputService.findIdsByDesiredState(IOState.Type.STOPPED);
LOG.debug("Found {} inputs with runtime_status group={}", matching.size(), key);
return matching;
}
-
- final Set targetStrings = targetStates.stream()
- .map(IOState.Type::toString)
- .collect(Collectors.toSet());
-
- final Map> allStatuses = getClusterStatuses(authToken);
final Set matching = new HashSet<>();
- for (Map.Entry> entry : allStatuses.entrySet()) {
- for (String status : entry.getValue()) {
- if (targetStrings.contains(status)) {
- matching.add(entry.getKey());
- break;
- }
- }
+ for (IOState.Type type : targetStates) {
+ matching.addAll(runtimeStateService.getByState(type));
}
LOG.debug("Found {} inputs with runtime_status group={}", matching.size(), key);
@@ -143,63 +93,4 @@ public Set getMatchingIds(String filterValue, String authToken) {
public String getFieldName() {
return FIELD_NAME;
}
-
- private Map> getClusterStatuses(String authToken) {
- long now = System.currentTimeMillis();
- if (cachedStatuses != null && (now - cacheTimestamp) < CACHE_TTL_MS) {
- return cachedStatuses;
- }
- synchronized (this) {
- // Double-check after acquiring lock
- if (cachedStatuses != null && (System.currentTimeMillis() - cacheTimestamp) < CACHE_TTL_MS) {
- return cachedStatuses;
- }
- cachedStatuses = fetchAllClusterStatuses(authToken);
- cacheTimestamp = System.currentTimeMillis();
- return cachedStatuses;
- }
- }
-
- private Map> fetchAllClusterStatuses(String authToken) {
- final Map> result = new ConcurrentHashMap<>();
- final String localNodeId = nodeId.getNodeId();
-
- // 1. Local node: read InputRegistry directly (no HTTP)
- inputRegistry.getStatusesByInputId().forEach((inputId, status) ->
- result.computeIfAbsent(inputId, k -> ConcurrentHashMap.newKeySet()).add(status));
-
- // 2. Remote nodes: parallel HTTP calls
- final Map activeNodes = nodeService.allActive();
- final Map>> futures = new HashMap<>();
-
- for (Map.Entry entry : activeNodes.entrySet()) {
- if (entry.getKey().equals(localNodeId)) {
- continue; // skip local node, already handled above
- }
- final Node node = entry.getValue();
- futures.put(entry.getKey(), executorService.submit(() -> {
- final RemoteInputStatesResource remote = remoteInterfaceProvider.get(
- node, authToken, RemoteInputStatesResource.class);
- final Response