Skip to content

Commit

Permalink
Remove unused generic types
Browse files Browse the repository at this point in the history
  • Loading branch information
taldekar committed Jan 23, 2025
1 parent 63f4cc9 commit 048ce63
Showing 1 changed file with 13 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,29 +84,29 @@ public static final class OrderedThreadPoolExecutor {
scheduledJobsQueue, Executors.defaultThreadFactory(), new CallerRunsPolicyBlocking(scheduledJobsQueue));
}

public <T, R> void registerCallbackForInterest(final String interestId, final TypedCallable<R> callback) {
public <T> void registerCallbackForInterest(final String interestId, final TypedCallable<T> callback) {
interestIdToCallbackMap.putIfAbsent(interestId, callback);
}

public <T> boolean isCallbackRegisteredForInterest(final String interestId) {
public boolean isCallbackRegisteredForInterest(final String interestId) {
return interestIdToCallbackMap.containsKey(interestId);
}

@SuppressWarnings("unchecked")
public <T, R> void submitEventForInterest(final String interestId, final R event) {
BlockingQueue<R> eventQueue = (BlockingQueue<R>) interestIdToEventQueueMap.computeIfAbsent(interestId,
public <T> void submitEventForInterest(final String interestId, final T event) {
BlockingQueue<T> eventQueue = (BlockingQueue<T>) interestIdToEventQueueMap.computeIfAbsent(interestId,
k -> new ArrayBlockingQueue<>(eventQueueCapacity, true));
try {
eventQueue.put(event);
} catch (InterruptedException e) {
e.printStackTrace();
}

handleJobScheduling(interestId, (Class<R>) event.getClass(), eventQueue);
handleJobScheduling(interestId, (Class<T>) event.getClass(), eventQueue);
}

private <T, R> void handleJobScheduling(final String interestId, final Class<R> eventType,
final BlockingQueue<R> eventQueue) {
private <T> void handleJobScheduling(final String interestId, final Class<T> eventType,
final BlockingQueue<T> eventQueue) {
AtomicBoolean jobStatus = interestIdToJobStatusMap.computeIfAbsent(interestId, k -> new AtomicBoolean(false));

if (jobStatus.compareAndSet(false, true)) {
Expand All @@ -115,20 +115,20 @@ private <T, R> void handleJobScheduling(final String interestId, final Class<R>
}

@SuppressWarnings("unchecked")
private <T, R> void processQueuedEvents(final String interestId, final Class<R> eventType,
final BlockingQueue<R> eventQueue, final AtomicBoolean jobStatus) {
private <T> void processQueuedEvents(final String interestId, final Class<T> eventType,
final BlockingQueue<T> eventQueue, final AtomicBoolean jobStatus) {
try {
TypedCallable<R> eventCallback = (TypedCallable<R>) interestIdToCallbackMap.get(interestId);
TypedCallable<T> eventCallback = (TypedCallable<T>) interestIdToCallbackMap.get(interestId);
if (eventCallback == null) {
return;
}

List<R> eventBatchQueue = new ArrayList<>(EVENT_BATCH_SIZE);
R lastEvent = Optional.ofNullable(interestIdToLastEventMap.get(interestId)).map(event -> (R) event)
List<T> eventBatchQueue = new ArrayList<>(EVENT_BATCH_SIZE);
T lastEvent = Optional.ofNullable(interestIdToLastEventMap.get(interestId)).map(event -> (T) event)
.orElse(null);

while (eventQueue.drainTo(eventBatchQueue, EVENT_BATCH_SIZE) > 0) {
for (R newEvent : eventBatchQueue) {
for (T newEvent : eventBatchQueue) {
try {
if (!newEvent.equals(lastEvent)) {
eventCallback.callWith(newEvent);
Expand All @@ -138,7 +138,6 @@ private <T, R> void processQueuedEvents(final String interestId, final Class<R>
e.printStackTrace();
}
}

eventBatchQueue.clear();
}

Expand Down

0 comments on commit 048ce63

Please sign in to comment.