Skip to content

Commit

Permalink
Remove unused generic types
Browse files Browse the repository at this point in the history
  • Loading branch information
taldekar committed Jan 23, 2025
1 parent 048ce63 commit 89934a8
Showing 1 changed file with 31 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,76 +59,76 @@ public void rejectedExecution(final Runnable runnable, final ThreadPoolExecutor

public static final class OrderedThreadPoolExecutor {

private final Map<String, BlockingQueue<?>> interestIdToEventQueueMap;
private final Map<String, AtomicBoolean> interestIdToJobStatusMap;
private final Map<String, TypedCallable<?>> interestIdToCallbackMap;
private final Map<String, Object> interestIdToLastEventMap;
private final Map<String, BlockingQueue<?>> bufferedEventsForInterest;
private final Map<String, AtomicBoolean> jobStatusForInterest;
private final Map<String, TypedCallable<?>> callbackForInterest;
private final Map<String, Object> lastEventForInterest;

private final BlockingQueue<Runnable> scheduledJobsQueue;
private final BlockingQueue<Runnable> scheduledJobs;
private final ThreadPoolExecutor executor;
private final int eventQueueCapacity;

public static final int EVENT_BATCH_SIZE = 250;

OrderedThreadPoolExecutor(final int coreThreadCount, final int maxThreadCount, final int jobQueueCapacity,
final int eventQueueCapacity, final int keepAliveTime, final TimeUnit keepAliveTimeUnit) {
scheduledJobsQueue = new ArrayBlockingQueue<>(jobQueueCapacity);
interestIdToEventQueueMap = new ConcurrentHashMap<>();
interestIdToJobStatusMap = new ConcurrentHashMap<>();
interestIdToCallbackMap = new ConcurrentHashMap<>();
interestIdToLastEventMap = new ConcurrentHashMap<>();
scheduledJobs = new ArrayBlockingQueue<>(jobQueueCapacity);
bufferedEventsForInterest = new ConcurrentHashMap<>();
jobStatusForInterest = new ConcurrentHashMap<>();
callbackForInterest = new ConcurrentHashMap<>();
lastEventForInterest = new ConcurrentHashMap<>();

this.eventQueueCapacity = eventQueueCapacity;

executor = new ThreadPoolExecutor(coreThreadCount, maxThreadCount, keepAliveTime, keepAliveTimeUnit,
scheduledJobsQueue, Executors.defaultThreadFactory(), new CallerRunsPolicyBlocking(scheduledJobsQueue));
scheduledJobs, Executors.defaultThreadFactory(), new CallerRunsPolicyBlocking(scheduledJobs));
}

public <T> void registerCallbackForInterest(final String interestId, final TypedCallable<T> callback) {
interestIdToCallbackMap.putIfAbsent(interestId, callback);
callbackForInterest.putIfAbsent(interestId, callback);
}

public boolean isCallbackRegisteredForInterest(final String interestId) {
return interestIdToCallbackMap.containsKey(interestId);
return callbackForInterest.containsKey(interestId);
}

@SuppressWarnings("unchecked")
public <T> void submitEventForInterest(final String interestId, final T event) {
BlockingQueue<T> eventQueue = (BlockingQueue<T>) interestIdToEventQueueMap.computeIfAbsent(interestId,
BlockingQueue<T> bufferedEvents = (BlockingQueue<T>) bufferedEventsForInterest.computeIfAbsent(interestId,
k -> new ArrayBlockingQueue<>(eventQueueCapacity, true));
try {
eventQueue.put(event);
bufferedEvents.put(event);
} catch (InterruptedException e) {
e.printStackTrace();
}

handleJobScheduling(interestId, (Class<T>) event.getClass(), eventQueue);
handleJobScheduling(interestId, (Class<T>) event.getClass(), bufferedEvents);
}

private <T> void handleJobScheduling(final String interestId, final Class<T> eventType,
final BlockingQueue<T> eventQueue) {
AtomicBoolean jobStatus = interestIdToJobStatusMap.computeIfAbsent(interestId, k -> new AtomicBoolean(false));
final BlockingQueue<T> bufferedEvents) {
AtomicBoolean jobStatus = jobStatusForInterest.computeIfAbsent(interestId, k -> new AtomicBoolean(false));

if (jobStatus.compareAndSet(false, true)) {
executor.submit(() -> processQueuedEvents(interestId, eventType, eventQueue, jobStatus));
executor.submit(() -> processQueuedEvents(interestId, eventType, bufferedEvents, jobStatus));
}
}

@SuppressWarnings("unchecked")
private <T> void processQueuedEvents(final String interestId, final Class<T> eventType,
final BlockingQueue<T> eventQueue, final AtomicBoolean jobStatus) {
final BlockingQueue<T> bufferedEvents, final AtomicBoolean jobStatus) {
try {
TypedCallable<T> eventCallback = (TypedCallable<T>) interestIdToCallbackMap.get(interestId);
TypedCallable<T> eventCallback = (TypedCallable<T>) callbackForInterest.get(interestId);
if (eventCallback == null) {
return;
}

List<T> eventBatchQueue = new ArrayList<>(EVENT_BATCH_SIZE);
T lastEvent = Optional.ofNullable(interestIdToLastEventMap.get(interestId)).map(event -> (T) event)
List<T> batchedEvents = new ArrayList<>(EVENT_BATCH_SIZE);
T lastEvent = Optional.ofNullable(lastEventForInterest.get(interestId)).map(event -> (T) event)
.orElse(null);

while (eventQueue.drainTo(eventBatchQueue, EVENT_BATCH_SIZE) > 0) {
for (T newEvent : eventBatchQueue) {
while (bufferedEvents.drainTo(batchedEvents, EVENT_BATCH_SIZE) > 0) {
for (T newEvent : batchedEvents) {
try {
if (!newEvent.equals(lastEvent)) {
eventCallback.callWith(newEvent);
Expand All @@ -138,10 +138,11 @@ private <T> void processQueuedEvents(final String interestId, final Class<T> eve
e.printStackTrace();
}
}
eventBatchQueue.clear();

batchedEvents.clear();
}

interestIdToLastEventMap.put(interestId, lastEvent);
lastEventForInterest.put(interestId, lastEvent);
} finally {
jobStatus.set(false);
}
Expand All @@ -150,7 +151,7 @@ private <T> void processQueuedEvents(final String interestId, final Class<T> eve
}

private static final EventBroker INSTANCE;
private final Map<Class<?>, SubmissionPublisher<?>> eventTypeToPublisherMap;
private final Map<Class<?>, SubmissionPublisher<?>> publisherForEventType;
private final OrderedThreadPoolExecutor publisherExecutor;
private final OrderedThreadPoolExecutor subscriberExecutor;

Expand All @@ -159,7 +160,7 @@ private <T> void processQueuedEvents(final String interestId, final Class<T> eve
}

private EventBroker() {
eventTypeToPublisherMap = new ConcurrentHashMap<>();
publisherForEventType = new ConcurrentHashMap<>();
publisherExecutor = new OrderedThreadPoolExecutor(5, 20, 50, 100, 10, TimeUnit.MILLISECONDS);
subscriberExecutor = new OrderedThreadPoolExecutor(5, 20, 50, 100, 10, TimeUnit.MILLISECONDS);
}
Expand Down Expand Up @@ -264,7 +265,7 @@ public void onComplete() {

@SuppressWarnings("unchecked")
private <T> SubmissionPublisher<T> getPublisherForEventType(final Class<T> eventType) {
return (SubmissionPublisher<T>) eventTypeToPublisherMap.computeIfAbsent(eventType,
return (SubmissionPublisher<T>) publisherForEventType.computeIfAbsent(eventType,
key -> new SubmissionPublisher<>(Runnable::run, Flow.defaultBufferSize()));
}

Expand Down

0 comments on commit 89934a8

Please sign in to comment.