Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.graylog2.periodical.LeaderPresenceCheckPeriodical;
import org.graylog2.periodical.NodeMetricPeriodical;
import org.graylog2.periodical.NodePingThread;
import org.graylog2.periodical.StaleInputRuntimeStateCleanup;
import org.graylog2.periodical.OrphanedTokenCleaner;
import org.graylog2.periodical.SearchVersionCheckPeriodical;
import org.graylog2.periodical.ThrottleStateUpdaterThread;
Expand Down Expand Up @@ -74,5 +75,6 @@ protected void configure() {
periodicalBinder.addBinding().to(ExpiredTokenCleaner.class);
periodicalBinder.addBinding().to(OrphanedTokenCleaner.class);
periodicalBinder.addBinding().to(NodeMetricPeriodical.class);
periodicalBinder.addBinding().to(StaleInputRuntimeStateCleanup.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,52 +17,28 @@
package org.graylog2.inputs;

import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.graylog2.cluster.Node;
import org.graylog2.cluster.NodeService;
import org.graylog2.database.filtering.ComputedFieldProvider;
import org.graylog2.inputs.persistence.InputStateService;
import org.graylog2.plugin.IOState;
import org.graylog2.plugin.system.NodeId;
import org.graylog2.rest.RemoteInterfaceProvider;
import org.graylog2.rest.resources.system.inputs.RemoteInputStatesResource;
import org.graylog2.shared.inputs.InputRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import retrofit2.Response;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
* Provides filtering support for input runtime status by querying InputRegistry across all cluster nodes.
* Provides filtering support for input runtime status by querying MongoDB.
* <p>
* This provider enables filtering inputs by their actual runtime state (RUNNING, FAILED, STOPPED, etc.)
* rather than just the desired_state stored in the database. It aggregates runtime status information
* from all nodes in the cluster to provide a cluster-wide view.
* </p>
* <p>
* <b>Optimizations:</b>
* </p>
* <ul>
* <li>Reads the local node's InputRegistry directly (no HTTP call to itself)</li>
* <li>Queries remote nodes in parallel</li>
* <li>Caches results for 5 seconds to avoid repeated cluster-wide fetches</li>
* </ul>
* Runtime state is persisted to MongoDB by {@link org.graylog2.inputs.InputStateListener},
* enabling efficient cluster-wide queries by state without HTTP fan-out.
*/
@Singleton
public class InputRuntimeStatusProvider implements ComputedFieldProvider {
private static final Logger LOG = LoggerFactory.getLogger(InputRuntimeStatusProvider.class);
private static final String FIELD_NAME = "runtime_status";
private static final long CACHE_TTL_MS = 5_000;

public static final Map<String, Set<IOState.Type>> STATUS_GROUPS = Map.of(
"RUNNING", Set.of(IOState.Type.RUNNING),
Expand All @@ -78,29 +54,14 @@ public class InputRuntimeStatusProvider implements ComputedFieldProvider {
"FAILED", "Failed"
);

private final NodeService nodeService;
private final RemoteInterfaceProvider remoteInterfaceProvider;
private final InputRegistry inputRegistry;
private final InputStateService runtimeStateService;
private final InputService inputService;
private final NodeId nodeId;
private final ExecutorService executorService;

private volatile Map<String, Set<String>> cachedStatuses;
private volatile long cacheTimestamp = 0;

@Inject
public InputRuntimeStatusProvider(NodeService nodeService,
RemoteInterfaceProvider remoteInterfaceProvider,
InputRegistry inputRegistry,
InputService inputService,
NodeId nodeId,
@Named("proxiedRequestsExecutorService") ExecutorService executorService) {
this.nodeService = nodeService;
this.remoteInterfaceProvider = remoteInterfaceProvider;
this.inputRegistry = inputRegistry;
public InputRuntimeStatusProvider(InputStateService runtimeStateService,
InputService inputService) {
this.runtimeStateService = runtimeStateService;
this.inputService = inputService;
this.nodeId = nodeId;
this.executorService = executorService;
}

@Override
Expand All @@ -112,27 +73,16 @@ public Set<String> getMatchingIds(String filterValue, String authToken) {
return Set.of();
}

// For NOT_RUNNING: query desired_state from DB since stopped inputs are absent from the InputRegistry
// For NOT_RUNNING: query desired_state from DB since stopped inputs have no runtime state
if ("NOT_RUNNING".equals(key)) {
final Set<String> matching = inputService.findIdsByDesiredState(IOState.Type.STOPPED);
LOG.debug("Found {} inputs with runtime_status group={}", matching.size(), key);
return matching;
}


final Set<String> targetStrings = targetStates.stream()
.map(IOState.Type::toString)
.collect(Collectors.toSet());

final Map<String, Set<String>> allStatuses = getClusterStatuses(authToken);
final Set<String> matching = new HashSet<>();
for (Map.Entry<String, Set<String>> entry : allStatuses.entrySet()) {
for (String status : entry.getValue()) {
if (targetStrings.contains(status)) {
matching.add(entry.getKey());
break;
}
}
for (IOState.Type type : targetStates) {
matching.addAll(runtimeStateService.getByState(type));
}

LOG.debug("Found {} inputs with runtime_status group={}", matching.size(), key);
Expand All @@ -143,63 +93,4 @@ public Set<String> getMatchingIds(String filterValue, String authToken) {
public String getFieldName() {
return FIELD_NAME;
}

private Map<String, Set<String>> getClusterStatuses(String authToken) {
long now = System.currentTimeMillis();
if (cachedStatuses != null && (now - cacheTimestamp) < CACHE_TTL_MS) {
return cachedStatuses;
}
synchronized (this) {
// Double-check after acquiring lock
if (cachedStatuses != null && (System.currentTimeMillis() - cacheTimestamp) < CACHE_TTL_MS) {
return cachedStatuses;
}
cachedStatuses = fetchAllClusterStatuses(authToken);
cacheTimestamp = System.currentTimeMillis();
return cachedStatuses;
}
}

private Map<String, Set<String>> fetchAllClusterStatuses(String authToken) {
final Map<String, Set<String>> result = new ConcurrentHashMap<>();
final String localNodeId = nodeId.getNodeId();

// 1. Local node: read InputRegistry directly (no HTTP)
inputRegistry.getStatusesByInputId().forEach((inputId, status) ->
result.computeIfAbsent(inputId, k -> ConcurrentHashMap.newKeySet()).add(status));

// 2. Remote nodes: parallel HTTP calls
final Map<String, Node> activeNodes = nodeService.allActive();
final Map<String, Future<Map<String, String>>> futures = new HashMap<>();

for (Map.Entry<String, Node> entry : activeNodes.entrySet()) {
if (entry.getKey().equals(localNodeId)) {
continue; // skip local node, already handled above
}
final Node node = entry.getValue();
futures.put(entry.getKey(), executorService.submit(() -> {
final RemoteInputStatesResource remote = remoteInterfaceProvider.get(
node, authToken, RemoteInputStatesResource.class);
final Response<Map<String, String>> response = remote.getLocalStatuses().execute();
if (response.isSuccessful() && response.body() != null) {
return response.body();
}
LOG.debug("Failed to get response from node {}: {}", node.getNodeId(), response.code());
return Map.<String, String>of();
}));
}

// 3. Collect results with timeout
for (Map.Entry<String, Future<Map<String, String>>> entry : futures.entrySet()) {
try {
final Map<String, String> nodeStatuses = entry.getValue().get(30, TimeUnit.SECONDS);
nodeStatuses.forEach((id, status) ->
result.computeIfAbsent(id, k -> ConcurrentHashMap.newKeySet()).add(status));
} catch (Exception e) {
LOG.debug("Error fetching input states from node {}: {}", entry.getKey(), e.getMessage());
}
}

return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import org.apache.commons.lang3.ObjectUtils;
import org.graylog2.inputs.persistence.InputStateService;
import org.graylog2.notifications.Notification;
import org.graylog2.notifications.NotificationService;
import org.graylog2.plugin.IOState;
Expand All @@ -43,15 +44,18 @@ public class InputStateListener {
private final NotificationService notificationService;
private final ActivityWriter activityWriter;
private final ServerStatus serverStatus;
private final InputStateService inputStateService;

@Inject
public InputStateListener(EventBus eventBus,
NotificationService notificationService,
ActivityWriter activityWriter,
ServerStatus serverStatus) {
ServerStatus serverStatus,
InputStateService inputStateService) {
this.notificationService = notificationService;
this.activityWriter = activityWriter;
this.serverStatus = serverStatus;
this.inputStateService = inputStateService;
eventBus.register(this);
}

Expand Down Expand Up @@ -88,5 +92,24 @@ public void inputStateChanged(IOStateChangedEvent<MessageInput> event) {

LOG.debug("Input State of {} changed: {} -> {}", input.toIdentifier(), event.oldState(), event.newState());
LOG.info("Input {} is now {}", input.toIdentifier(), event.newState());

final String inputId = input.getId();
if (inputId != null) {
try {
if (event.newState() == IOState.Type.TERMINATED) {
inputStateService.removeState(inputId);
} else {
inputStateService.upsertState(
inputId,
state.getState(),
state.getStartedAt(),
state.getLastFailedAt(),
state.getDetailedMessage());
}
} catch (Exception e) {
LOG.warn("Failed to persist runtime state for input {}: {}", inputId, e.getMessage());
LOG.debug("Exception details:", e);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog2.inputs.persistence;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.google.auto.value.AutoValue;
import org.graylog2.database.MongoEntity;
import org.joda.time.DateTime;
import org.mongojack.Id;
import org.mongojack.ObjectId;

import javax.annotation.Nullable;

@AutoValue
@JsonDeserialize(builder = InputStateDto.Builder.class)
public abstract class InputStateDto implements MongoEntity {
static final String FIELD_ID = "id";
static final String FIELD_INPUT_ID = "input_id";
static final String FIELD_NODE_ID = "node_id";
static final String FIELD_STATE = "state";
static final String FIELD_STARTED_AT = "started_at";
static final String FIELD_LAST_FAILED_AT = "last_failed_at";
static final String FIELD_DETAILED_MESSAGE = "detailed_message";
static final String FIELD_UPDATED_AT = "updated_at";

@Id
@ObjectId
@Nullable
@JsonProperty(FIELD_ID)
public abstract String id();

@JsonProperty(FIELD_INPUT_ID)
public abstract String inputId();

@JsonProperty(FIELD_NODE_ID)
public abstract String nodeId();

@JsonProperty(FIELD_STATE)
public abstract String state();

@Nullable
@JsonProperty(FIELD_STARTED_AT)
public abstract DateTime startedAt();

@Nullable
@JsonProperty(FIELD_LAST_FAILED_AT)
public abstract DateTime lastFailedAt();

@Nullable
@JsonProperty(FIELD_DETAILED_MESSAGE)
public abstract String detailedMessage();

@JsonProperty(FIELD_UPDATED_AT)
public abstract DateTime updatedAt();

public static Builder builder() {
return Builder.create();
}

public abstract Builder toBuilder();

@AutoValue.Builder
public static abstract class Builder {
@JsonCreator
public static Builder create() {
return new AutoValue_InputStateDto.Builder();
}

@Id
@ObjectId
@JsonProperty(FIELD_ID)
public abstract Builder id(String id);

@JsonProperty(FIELD_INPUT_ID)
public abstract Builder inputId(String inputId);

@JsonProperty(FIELD_NODE_ID)
public abstract Builder nodeId(String nodeId);

@JsonProperty(FIELD_STATE)
public abstract Builder state(String state);

@JsonProperty(FIELD_STARTED_AT)
public abstract Builder startedAt(DateTime startedAt);

@JsonProperty(FIELD_LAST_FAILED_AT)
public abstract Builder lastFailedAt(DateTime lastFailedAt);

@JsonProperty(FIELD_DETAILED_MESSAGE)
public abstract Builder detailedMessage(String detailedMessage);

@JsonProperty(FIELD_UPDATED_AT)
public abstract Builder updatedAt(DateTime updatedAt);

public abstract InputStateDto build();
}
}
Loading
Loading