Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,18 @@ public class GraphLogProcessor {
private MessageConverter converter;
private boolean isStarted = false;

// Tracks the last-processed lastUpdatedOn (epoch ms) per nodeUniqueId
// to filter out-of-order/duplicate events.
// Bounded to MAX_CACHE_SIZE entries (LRU eviction) to prevent unbounded heap growth.
private static final int MAX_CACHE_SIZE = 10_000;
private final Map<String, Long> lastUpdatedCache = Collections.synchronizedMap(
new LinkedHashMap<String, Long>(16, 0.75f, true) {
@Override
protected boolean removeEldestEntry(Map.Entry<String, Long> eldest) {
return size() > MAX_CACHE_SIZE;
}
});

// Event buffering removed

private GraphLogProcessor() {
Expand Down Expand Up @@ -75,7 +87,7 @@ private void init(JanusGraph graph, Map<String, Object> config) {

logger.info("Starting GraphLogProcessor...");

// INitialize Converter
// Initialize Converter
String converterType = (String) config.getOrDefault("graph.txn.log_processor.converter", "DEFAULT");
if ("TELEMETRY".equalsIgnoreCase(converterType)) {
converter = new TelemetryMessageConverter();
Expand Down Expand Up @@ -142,6 +154,7 @@ private void stop() {
}
}
sinks.clear();
lastUpdatedCache.clear();
isStarted = false;
logger.info("GraphLogProcessor stopped.");
}
Expand All @@ -167,33 +180,37 @@ private void processChanges(TransactionId txId, ChangeState changeState) {
// This ensures edge-only changes are also discovered.
Set<JanusGraphVertex> changedVertices = changeState.getVertices(Change.ANY);

// Count all relations in this ChangeState (edges + properties combined)
// Cache ADDED/REMOVED vertex sets once — avoids repeated getVertices() calls
// inside the loop and prevents re-consuming potentially single-use iterables.
Set<JanusGraphVertex> addedVertices = changeState.getVertices(Change.ADDED);
Set<JanusGraphVertex> removedVertices = changeState.getVertices(Change.REMOVED);

// Count all relations in this ChangeState for diagnostic logging
int addedRelCount = 0;
int removedRelCount = 0;
for (JanusGraphRelation r : changeState.getRelations(Change.ADDED)) {
addedRelCount++;
if (r.isEdge()) {
logger.info(" ADDED relation (edge): type={}, vertices={}", r.getType().name(), r);
logger.debug(" ADDED relation (edge): type={}, vertices={}", r.getType().name(), r);
}
}
for (JanusGraphRelation r : changeState.getRelations(Change.REMOVED)) {
removedRelCount++;
if (r.isEdge()) {
logger.info(" REMOVED relation (edge): type={}, vertices={}", r.getType().name(), r);
logger.debug(" REMOVED relation (edge): type={}, vertices={}", r.getType().name(), r);
}
}
logger.info("ChangeState — vertices ADDED: {}, REMOVED: {}, ANY: {} | relations ADDED: {}, REMOVED: {}",
changeState.getVertices(Change.ADDED).size(),
changeState.getVertices(Change.REMOVED).size(),
changedVertices.size(),
addedVertices.size(), removedVertices.size(), changedVertices.size(),
addedRelCount, removedRelCount);

for (JanusGraphVertex vertex : changedVertices) {
// If it's a new vertex, we already processed it as CREATE
if (changeState.getVertices(Change.ADDED).contains(vertex)) {
if (addedVertices.contains(vertex)) {
continue;
}
// If it's a removed vertex, we already processed it as DELETE
if (changeState.getVertices(Change.REMOVED).contains(vertex)) {
if (removedVertices.contains(vertex)) {
continue;
}

Expand All @@ -219,18 +236,31 @@ private void processVertexChange(JanusGraphVertex vertex, ChangeState changeStat
return;
}

if (!shouldProcessEvent(event)) {
return;
}

// 4. Send event immediately (No buffering)
sendEventToSinks(vertex.id().toString(), event);
// Update cache only after a confirmed successful send to avoid
// dropping retries when the send fails.
boolean sent = sendEventToSinks(vertex.id().toString(), event);
if (sent) {
updateLastUpdatedCache(event);
}

} catch (Exception e) {
logger.error("Error converting/processing vertex change event", e);
}
}

/**
* Send event to all configured sinks
* Send event to all configured sinks.
*
* @return true if serialization succeeded (even if individual sinks erred),
* false if the event could not be serialized (so the cache should not
* be advanced and the event may be retried).
*/
private void sendEventToSinks(String key, Map<String, Object> event) {
private boolean sendEventToSinks(String key, Map<String, Object> event) {
try {
String json = mapper.writeValueAsString(event);
for (EventSink sink : sinks) {
Expand All @@ -241,31 +271,13 @@ private void sendEventToSinks(String key, Map<String, Object> event) {
}
}
logger.info("Sent event: {}", json);
return true;
} catch (Exception e) {
logger.error("Error serializing event", e);
return false;
}
}

private boolean hasStatusAttribute(Map<String, Object> event) {
try {
if (event.containsKey("transactionData")) {
Map<String, Object> txData = (Map<String, Object>) event.get("transactionData");
if (txData != null && txData.containsKey("properties")) {
Map<String, Object> props = (Map<String, Object>) txData.get("properties");
return props != null && props.containsKey("status");
}
}
// Check for flat properties (SimpleMessageConverter)
if (event.containsKey("properties")) {
Map<String, Object> props = (Map<String, Object>) event.get("properties");
return props != null && props.containsKey("status");
}
} catch (Exception e) {
// ignore
}
return false;
}

private String getNodeUniqueId(Map<String, Object> event) {
if (event.containsKey("nodeUniqueId")) {
return (String) event.get("nodeUniqueId");
Expand Down Expand Up @@ -340,4 +352,44 @@ private Long parseTimestamp(Object ts) {
}
return null;
}

/**
* Returns true if this event should be emitted; false if it is older than (or
* the same age as) an already-processed event for the same vertex.
* Does NOT update the cache — call {@link #updateLastUpdatedCache} after a
* confirmed successful send.
*/
private boolean shouldProcessEvent(Map<String, Object> event) {
String nodeId = getNodeUniqueId(event);
if (nodeId == null) {
return true;
}
Long eventTs = getLastUpdatedOn(event);
if (eventTs == null) {
return true;
}
Long cachedTs = lastUpdatedCache.get(nodeId);
if (cachedTs == null || eventTs > cachedTs) {
return true;
}
logger.debug("Dropping out-of-order/duplicate event for node {} (event ts={}, cached ts={})",
nodeId, eventTs, cachedTs);
return false;
}

/**
* Advances the cache for the event's nodeUniqueId to its lastUpdatedOn
* timestamp. Call this only after the event has been successfully sent.
*/
private void updateLastUpdatedCache(Map<String, Object> event) {
String nodeId = getNodeUniqueId(event);
if (nodeId == null) {
return;
}
Long eventTs = getLastUpdatedOn(event);
if (eventTs == null) {
return;
}
lastUpdatedCache.put(nodeId, eventTs);
}
}