diff --git a/janusgraph-cdc-extension/src/main/java/org/sunbird/janusgraph/cdc/GraphLogProcessor.java b/janusgraph-cdc-extension/src/main/java/org/sunbird/janusgraph/cdc/GraphLogProcessor.java index 5f6d543..449f69f 100644 --- a/janusgraph-cdc-extension/src/main/java/org/sunbird/janusgraph/cdc/GraphLogProcessor.java +++ b/janusgraph-cdc-extension/src/main/java/org/sunbird/janusgraph/cdc/GraphLogProcessor.java @@ -39,6 +39,18 @@ public class GraphLogProcessor { private MessageConverter converter; private boolean isStarted = false; + // Tracks the last-processed lastUpdatedOn (epoch ms) per nodeUniqueId + // to filter out-of-order/duplicate events. + // Bounded to MAX_CACHE_SIZE entries (LRU eviction) to prevent unbounded heap growth. + private static final int MAX_CACHE_SIZE = 10_000; + private final Map lastUpdatedCache = Collections.synchronizedMap( + new LinkedHashMap(16, 0.75f, true) { + @Override + protected boolean removeEldestEntry(Map.Entry eldest) { + return size() > MAX_CACHE_SIZE; + } + }); + // Event buffering removed private GraphLogProcessor() { @@ -75,7 +87,7 @@ private void init(JanusGraph graph, Map config) { logger.info("Starting GraphLogProcessor..."); - // INitialize Converter + // Initialize Converter String converterType = (String) config.getOrDefault("graph.txn.log_processor.converter", "DEFAULT"); if ("TELEMETRY".equalsIgnoreCase(converterType)) { converter = new TelemetryMessageConverter(); @@ -142,6 +154,7 @@ private void stop() { } } sinks.clear(); + lastUpdatedCache.clear(); isStarted = false; logger.info("GraphLogProcessor stopped."); } @@ -167,33 +180,37 @@ private void processChanges(TransactionId txId, ChangeState changeState) { // This ensures edge-only changes are also discovered. 
Set changedVertices = changeState.getVertices(Change.ANY); - // Count all relations in this ChangeState (edges + properties combined) + // Cache ADDED/REMOVED vertex sets once — avoids repeated getVertices() calls + // inside the loop and prevents re-consuming potentially single-use iterables. + Set addedVertices = changeState.getVertices(Change.ADDED); + Set removedVertices = changeState.getVertices(Change.REMOVED); + + // Count all relations in this ChangeState for diagnostic logging int addedRelCount = 0; int removedRelCount = 0; for (JanusGraphRelation r : changeState.getRelations(Change.ADDED)) { addedRelCount++; if (r.isEdge()) { - logger.info(" ADDED relation (edge): type={}, vertices={}", r.getType().name(), r); + logger.debug(" ADDED relation (edge): type={}, vertices={}", r.getType().name(), r); } } for (JanusGraphRelation r : changeState.getRelations(Change.REMOVED)) { removedRelCount++; if (r.isEdge()) { - logger.info(" REMOVED relation (edge): type={}, vertices={}", r.getType().name(), r); + logger.debug(" REMOVED relation (edge): type={}, vertices={}", r.getType().name(), r); } } logger.info("ChangeState — vertices ADDED: {}, REMOVED: {}, ANY: {} | relations ADDED: {}, REMOVED: {}", - changeState.getVertices(Change.ADDED).size(), - changeState.getVertices(Change.REMOVED).size(), - changedVertices.size(), + addedVertices.size(), removedVertices.size(), changedVertices.size(), addedRelCount, removedRelCount); + for (JanusGraphVertex vertex : changedVertices) { // If it's a new vertex, we already processed it as CREATE - if (changeState.getVertices(Change.ADDED).contains(vertex)) { + if (addedVertices.contains(vertex)) { continue; } // If it's a removed vertex, we already processed it as DELETE - if (changeState.getVertices(Change.REMOVED).contains(vertex)) { + if (removedVertices.contains(vertex)) { continue; } @@ -219,8 +236,17 @@ private void processVertexChange(JanusGraphVertex vertex, ChangeState changeStat return; } + if 
(!shouldProcessEvent(event)) { + return; + } + // 4. Send event immediately (No buffering) - sendEventToSinks(vertex.id().toString(), event); + // Update cache only after a confirmed successful send to avoid + // dropping retries when the send fails. + boolean sent = sendEventToSinks(vertex.id().toString(), event); + if (sent) { + updateLastUpdatedCache(event); + } } catch (Exception e) { logger.error("Error converting/processing vertex change event", e); @@ -228,9 +254,13 @@ private void processVertexChange(JanusGraphVertex vertex, ChangeState changeStat } /** - * Send event to all configured sinks + * Send event to all configured sinks. + * + * @return true if serialization succeeded (even if individual sinks erred), + * false if the event could not be serialized (so the cache should not + * be advanced and the event may be retried). */ - private void sendEventToSinks(String key, Map event) { + private boolean sendEventToSinks(String key, Map event) { try { String json = mapper.writeValueAsString(event); for (EventSink sink : sinks) { @@ -241,31 +271,13 @@ private void sendEventToSinks(String key, Map event) { } } logger.info("Sent event: {}", json); + return true; } catch (Exception e) { logger.error("Error serializing event", e); + return false; } } - private boolean hasStatusAttribute(Map event) { - try { - if (event.containsKey("transactionData")) { - Map txData = (Map) event.get("transactionData"); - if (txData != null && txData.containsKey("properties")) { - Map props = (Map) txData.get("properties"); - return props != null && props.containsKey("status"); - } - } - // Check for flat properties (SimpleMessageConverter) - if (event.containsKey("properties")) { - Map props = (Map) event.get("properties"); - return props != null && props.containsKey("status"); - } - } catch (Exception e) { - // ignore - } - return false; - } - private String getNodeUniqueId(Map event) { if (event.containsKey("nodeUniqueId")) { return (String) event.get("nodeUniqueId"); @@ 
-340,4 +352,51 @@ private Long parseTimestamp(Object ts) {
         }
         return null;
     }
+
+    /**
+     * Returns true if this event should be emitted; false if it is older than (or
+     * the same age as) an already-processed event for the same vertex.
+     * An event is always let through when getNodeUniqueId or getLastUpdatedOn
+     * returns null for it, since there is nothing to compare against.
+     * Does NOT update the cache — call {@link #updateLastUpdatedCache} after a
+     * confirmed successful send.
+     */
+    private boolean shouldProcessEvent(Map event) {
+        String nodeId = getNodeUniqueId(event);
+        if (nodeId == null) {
+            return true;
+        }
+        Long eventTs = getLastUpdatedOn(event);
+        if (eventTs == null) {
+            return true;
+        }
+        Long cachedTs = lastUpdatedCache.get(nodeId);
+        if (cachedTs == null || eventTs > cachedTs) {
+            return true;
+        }
+        logger.debug("Dropping out-of-order/duplicate event for node {} (event ts={}, cached ts={})",
+                nodeId, eventTs, cachedTs);
+        return false;
+    }
+
+    /**
+     * Advances the cache for the event's nodeUniqueId to its lastUpdatedOn
+     * timestamp. Call this only after the event has been successfully sent.
+     * The stored value never moves backwards: if a newer timestamp was recorded
+     * for the same node after this event passed shouldProcessEvent, the newer
+     * timestamp is kept. Does nothing when the node id or timestamp is null.
+     */
+    private void updateLastUpdatedCache(Map event) {
+        String nodeId = getNodeUniqueId(event);
+        if (nodeId == null) {
+            return;
+        }
+        Long eventTs = getLastUpdatedOn(event);
+        if (eventTs == null) {
+            return;
+        }
+        // lastUpdatedCache is a Collections.synchronizedMap, so merge runs as one
+        // atomic step; a plain put could overwrite a newer timestamp with an older one.
+        lastUpdatedCache.merge(nodeId, eventTs, Long::max);
+    }
 }