Skip to content

Commit

Permalink
Add auth state dedup logic
Browse files Browse the repository at this point in the history
  • Loading branch information
taldekar committed Jan 22, 2025
1 parent dc8b198 commit 98841fd
Showing 1 changed file with 12 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
Expand Down Expand Up @@ -61,6 +62,7 @@ public static final class OrderedThreadPoolExecutor {
private final Map<String, BlockingQueue<?>> interestIdToEventQueueMap;
private final Map<String, AtomicBoolean> interestIdToJobStatusMap;
private final Map<String, TypedCallable<?>> interestIdToCallbackMap;
private final Map<String, Object> interestIdToLastEventMap;

private final BlockingQueue<Runnable> scheduledJobsQueue;
private final ThreadPoolExecutor executor;
Expand All @@ -74,6 +76,7 @@ public static final class OrderedThreadPoolExecutor {
interestIdToEventQueueMap = new ConcurrentHashMap<>();
interestIdToJobStatusMap = new ConcurrentHashMap<>();
interestIdToCallbackMap = new ConcurrentHashMap<>();
interestIdToLastEventMap = new ConcurrentHashMap<>();

this.eventQueueCapacity = eventQueueCapacity;

Expand Down Expand Up @@ -121,18 +124,25 @@ private <T, R> void processQueuedEvents(final String interestId, final Class<R>
}

List<R> eventBatchQueue = new ArrayList<>(EVENT_BATCH_SIZE);
R lastEvent = Optional.ofNullable(interestIdToLastEventMap.get(interestId)).map(event -> (R) event)
.orElse(null);

while (eventQueue.drainTo(eventBatchQueue) > 0) {
while (eventQueue.drainTo(eventBatchQueue, EVENT_BATCH_SIZE) > 0) {
for (R newEvent : eventBatchQueue) {
try {
eventCallback.callWith(newEvent);
if (!newEvent.equals(lastEvent)) {
eventCallback.callWith(newEvent);
}
lastEvent = newEvent;
} catch (Exception e) {
e.printStackTrace();
}
}

eventBatchQueue.clear();
}

interestIdToLastEventMap.put(interestId, lastEvent);
} finally {
jobStatus.set(false);
}
Expand Down

0 comments on commit 98841fd

Please sign in to comment.