From 33e98f5499dcd0528d599cad3636f4e3848efded Mon Sep 17 00:00:00 2001 From: Ishan Taldekar Date: Fri, 17 Jan 2025 19:37:31 -0500 Subject: [PATCH] Improve internal API readability --- .../eclipse/amazonq/broker/EventBroker.java | 122 +++++++++--------- 1 file changed, 61 insertions(+), 61 deletions(-) diff --git a/plugin/src/software/aws/toolkits/eclipse/amazonq/broker/EventBroker.java b/plugin/src/software/aws/toolkits/eclipse/amazonq/broker/EventBroker.java index c9f347726..8bf412c48 100644 --- a/plugin/src/software/aws/toolkits/eclipse/amazonq/broker/EventBroker.java +++ b/plugin/src/software/aws/toolkits/eclipse/amazonq/broker/EventBroker.java @@ -29,7 +29,7 @@ public final class EventBroker { @FunctionalInterface private interface TypedCallable { - void call(T event); + void callWith(T event); } private final class BlockingCallerRunsPolicy implements RejectedExecutionHandler { @@ -58,40 +58,40 @@ public void rejectedExecution(final Runnable runnable, final ThreadPoolExecutor private class OrderedThreadPoolExecutor { - private final Map> typedEventQueue; - private final Map typedJobStatus; - private final Map typedJobLock; - private final Map> typedCallback; + private final Map> interestIdToEventQueueMap; + private final Map interestIdToJobStatusMap; + private final Map interestIdToJobLockMap; + private final Map> interestIdToCallbackMap; - private final BlockingQueue workQueue; + private final BlockingQueue scheduledJobsQueue; private final ThreadPoolExecutor executor; private final int eventQueueCapacity; - OrderedThreadPoolExecutor(final int coreThreadCount, final int maxThreadCount, final int queueCapacity, - final int keepAliveTime, final int eventQueueCapacity) { - workQueue = new ArrayBlockingQueue<>(queueCapacity); - typedEventQueue = new ConcurrentHashMap<>(); - typedJobStatus = new ConcurrentHashMap<>(); - typedJobLock = new ConcurrentHashMap<>(); - typedCallback = new ConcurrentHashMap<>(); + OrderedThreadPoolExecutor(final int coreThreadCount, final int maxThreadCount, final int jobQueueCapacity, + final int eventQueueCapacity, final int keepAliveTime, final TimeUnit keepAliveTimeUnit) { + scheduledJobsQueue = new ArrayBlockingQueue<>(jobQueueCapacity); + interestIdToEventQueueMap = new ConcurrentHashMap<>(); + interestIdToJobStatusMap = new ConcurrentHashMap<>(); + interestIdToJobLockMap = new ConcurrentHashMap<>(); + interestIdToCallbackMap = new ConcurrentHashMap<>(); this.eventQueueCapacity = eventQueueCapacity; - executor = new ThreadPoolExecutor(coreThreadCount, maxThreadCount, keepAliveTime, TimeUnit.MILLISECONDS, - workQueue, Executors.defaultThreadFactory(), new BlockingCallerRunsPolicy(workQueue)); + executor = new ThreadPoolExecutor(coreThreadCount, maxThreadCount, keepAliveTime, keepAliveTimeUnit, + scheduledJobsQueue, Executors.defaultThreadFactory(), new BlockingCallerRunsPolicy(scheduledJobsQueue)); } - public void registerCallback(final String interestId, final TypedCallable callback) { - typedCallback.putIfAbsent(interestId, callback); + public void registerCallbackForInterest(final String interestId, final TypedCallable callback) { + interestIdToCallbackMap.putIfAbsent(interestId, callback); } - public boolean hasRegisteredCallback(final String interestType) { - return typedCallback.containsKey(interestType); + public boolean isCallbackRegisteredForInterest(final String interestId) { + return interestIdToCallbackMap.containsKey(interestId); } @SuppressWarnings("unchecked") - public void submit(final String interestId, final R event) { - BlockingQueue eventQueue = (BlockingQueue) typedEventQueue.computeIfAbsent(interestId, + public void submitEventForInterest(final String interestId, final R event) { + BlockingQueue eventQueue = (BlockingQueue) interestIdToEventQueueMap.computeIfAbsent(interestId, k -> new ArrayBlockingQueue<>(eventQueueCapacity, true)); try { eventQueue.put(event); @@ -99,19 +99,19 @@ public void submit(final String interestId, final R event) { e.printStackTrace(); } - handleScheduling(interestId, (Class) event.getClass(), eventQueue); + handleJobScheduling(interestId, (Class) event.getClass(), eventQueue); } - public void handleScheduling(final String interestId, final Class eventType, + public void handleJobScheduling(final String interestId, final Class eventType, final BlockingQueue eventQueue) { - AtomicBoolean jobStatus = typedJobStatus.computeIfAbsent(interestId, k -> new AtomicBoolean(false)); - ReentrantLock jobLock = typedJobLock.computeIfAbsent(interestId, k -> new ReentrantLock(true)); + AtomicBoolean jobStatus = interestIdToJobStatusMap.computeIfAbsent(interestId, k -> new AtomicBoolean(false)); + ReentrantLock jobLock = interestIdToJobLockMap.computeIfAbsent(interestId, k -> new ReentrantLock(true)); jobLock.lock(); try { if (!jobStatus.get() && !eventQueue.isEmpty()) { if (jobStatus.compareAndSet(false, true)) { - executor.submit(() -> processEventQueue(interestId, eventType, + executor.submit(() -> processQueuedEvents(interestId, eventType, eventQueue, jobStatus, jobLock)); } } @@ -121,7 +121,7 @@ public void handleScheduling(final String interestId, final Class even } @SuppressWarnings("unchecked") - public void processEventQueue(final String interestId, final Class eventType, + public void processQueuedEvents(final String interestId, final Class eventType, final BlockingQueue eventQueue, final AtomicBoolean jobStatus, final ReentrantLock jobLock) { if (jobStatus == null || jobLock == null || eventQueue == null) { throw new NullPointerException("ThreadPoolExecutor in unexpected state"); @@ -129,7 +129,7 @@ public void processEventQueue(final String interestId, final Class eve jobLock.lock(); try { - TypedCallable eventCallback = (TypedCallable) typedCallback.get(interestId); + TypedCallable eventCallback = (TypedCallable) interestIdToCallbackMap.get(interestId); if (eventCallback == null) { return; } @@ -139,7 +139,7 @@ public void processEventQueue(final String interestId, final Class eve R newEvent = eventQueue.take(); if (newEvent != null) { try { - eventCallback.call(newEvent); + eventCallback.callWith(newEvent); } catch (Exception e) { e.printStackTrace(); } @@ -159,19 +159,18 @@ public void processEventQueue(final String interestId, final Class eve } private static final EventBroker INSTANCE; - private final Map, SubmissionPublisher> publishers; - private final OrderedThreadPoolExecutor emissionExecutor; - private final OrderedThreadPoolExecutor consumptionExecutor; + private final Map, SubmissionPublisher> eventTypeToPublisherMap; + private final OrderedThreadPoolExecutor eventEmissionExecutor; + private final OrderedThreadPoolExecutor eventConsumptionExecutor; static { INSTANCE = new EventBroker(); } private EventBroker() { - publishers = new ConcurrentHashMap<>(); - - emissionExecutor = new OrderedThreadPoolExecutor(5, 30, 30, 10, 100000000); - consumptionExecutor = new OrderedThreadPoolExecutor(5, 30, 30, 10, 100000000); + eventTypeToPublisherMap = new ConcurrentHashMap<>(); + eventEmissionExecutor = new OrderedThreadPoolExecutor(5, 30, 30, 100000000, 10, TimeUnit.MILLISECONDS); + eventConsumptionExecutor = new OrderedThreadPoolExecutor(5, 30, 30, 100000000, 10, TimeUnit.MILLISECONDS); } public static EventBroker getInstance() { @@ -184,20 +183,22 @@ public void post(final T event) { return; } - SubmissionPublisher publisher = getPublisher((Class) event.getClass()); - if (!emissionExecutor.hasRegisteredCallback((event.getClass().getName()))) { - registerPublisherCallback(publisher, event.getClass().getName()); + SubmissionPublisher publisher = getPublisherForEventType((Class) event.getClass()); + if (!eventEmissionExecutor.isCallbackRegisteredForInterest((event.getClass().getName()))) { + registerPublisherCallbackForInterest(event.getClass().getName(), publisher); } - emissionExecutor.submit(event.getClass().getName(), event); + eventEmissionExecutor.submitEventForInterest(event.getClass().getName(), event); } public Subscription subscribe(final EventObserver observer) { - SubmissionPublisher publisher = getPublisher(observer.getEventType()); + SubmissionPublisher publisher = getPublisherForEventType(observer.getEventType()); AtomicReference subscriptionReference = new AtomicReference<>(); String subscriberId = UUID.randomUUID().toString(); - registerSubscriberCallback(observer, subscriberId); + Activator.getLogger().info(subscriberId); + + registerSubscriberCallbackForInterest(subscriberId, observer); Subscriber subscriber = new Subscriber<>() { @@ -207,19 +208,18 @@ public Subscription subscribe(final EventObserver observer) { public void onSubscribe(final Subscription subscription) { this.subscription = subscription; subscriptionReference.set(subscription); - this.subscription.request(1); } @Override public void onNext(final T event) { - consumptionExecutor.submit(subscriberId, event); - this.subscription.request(1); + eventConsumptionExecutor.submitEventForInterest(subscriberId, event); + subscription.request(1); } @Override public void onError(final Throwable throwable) { - return; + throwable.printStackTrace(); } @Override @@ -234,11 +234,11 @@ public void onComplete() { } public Subscription subscribe(final StreamObserver observer) { - SubmissionPublisher publisher = getPublisher(observer.getEventType()); + SubmissionPublisher publisher = getPublisherForEventType(observer.getEventType()); AtomicReference subscriptionReference = new AtomicReference<>(); String subscriberId = UUID.randomUUID().toString(); - registerSubscriberCallback(observer, subscriberId); + registerSubscriberCallbackForInterest(subscriberId, observer); Subscriber subscriber = new Subscriber<>() { @@ -248,14 +248,13 @@ public Subscription subscribe(final StreamObserver observer) { public void onSubscribe(final Subscription subscription) { this.subscription = subscription; subscriptionReference.set(subscription); - this.subscription.request(1); } @Override public void onNext(final T event) { - consumptionExecutor.submit(subscriberId, event); - this.subscription.request(1); + eventConsumptionExecutor.submitEventForInterest(subscriberId, event); + subscription.request(1); } @Override @@ -275,30 +274,31 @@ public void onComplete() { } @SuppressWarnings("unchecked") - private SubmissionPublisher getPublisher(final Class eventType) { - return (SubmissionPublisher) publishers.computeIfAbsent(eventType, + private SubmissionPublisher getPublisherForEventType(final Class eventType) { + return (SubmissionPublisher) eventTypeToPublisherMap.computeIfAbsent(eventType, key -> new SubmissionPublisher<>(Runnable::run, Flow.defaultBufferSize())); } - private void registerSubscriberCallback(final EventObserver subscriber, final String subscriberId) { - Activator.getLogger().info(subscriberId); + private void registerSubscriberCallbackForInterest(final String interestId, + final EventObserver callback) { TypedCallable eventCallback = new TypedCallable<>() { @Override - public void call(final T event) { - subscriber.onEvent(event); + public void callWith(final T event) { + callback.onEvent(event); } }; - consumptionExecutor.registerCallback(subscriberId, eventCallback); + eventConsumptionExecutor.registerCallbackForInterest(interestId, eventCallback); } - private void registerPublisherCallback(final SubmissionPublisher publisher, final String eventId) { + private void registerPublisherCallbackForInterest(final String interestId, + final SubmissionPublisher publisher) { TypedCallable eventCallback = new TypedCallable<>() { @Override - public void call(final T event) { + public void callWith(final T event) { publisher.submit(event); } }; - emissionExecutor.registerCallback(eventId, eventCallback); + eventEmissionExecutor.registerCallbackForInterest(interestId, eventCallback); } }