diff --git a/plugin/src/software/aws/toolkits/eclipse/amazonq/broker/EventBroker.java b/plugin/src/software/aws/toolkits/eclipse/amazonq/broker/EventBroker.java index 5dd2940b..c9f34772 100644 --- a/plugin/src/software/aws/toolkits/eclipse/amazonq/broker/EventBroker.java +++ b/plugin/src/software/aws/toolkits/eclipse/amazonq/broker/EventBroker.java @@ -9,6 +9,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; +import java.util.concurrent.Flow; import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.Flow.Subscription; import java.util.concurrent.RejectedExecutionException; @@ -22,6 +23,7 @@ import software.aws.toolkits.eclipse.amazonq.observers.EventObserver; import software.aws.toolkits.eclipse.amazonq.observers.StreamObserver; +import software.aws.toolkits.eclipse.amazonq.plugin.Activator; public final class EventBroker { @@ -56,10 +58,10 @@ public void rejectedExecution(final Runnable runnable, final ThreadPoolExecutor private class OrderedThreadPoolExecutor { - private final Map, BlockingQueue> typedEventQueue; - private final Map, AtomicBoolean> typedJobStatus; - private final Map, ReentrantLock> typedJobLock; - private final Map, TypedCallable> typedCallback; + private final Map> typedEventQueue; + private final Map typedJobStatus; + private final Map typedJobLock; + private final Map> typedCallback; private final BlockingQueue workQueue; private final ThreadPoolExecutor executor; @@ -79,67 +81,80 @@ private class OrderedThreadPoolExecutor { workQueue, Executors.defaultThreadFactory(), new BlockingCallerRunsPolicy(workQueue)); } - public void registerCallback(final Class interestType, final TypedCallable callback) { - typedCallback.putIfAbsent(interestType, callback); + public void registerCallback(final String interestId, final TypedCallable callback) { + typedCallback.putIfAbsent(interestId, callback); } - public boolean hasRegisteredCallback(final Class interestType) { + public boolean hasRegisteredCallback(final String interestType) { return typedCallback.containsKey(interestType); } @SuppressWarnings("unchecked") - public void submit(final Class interestType, final R event) { - BlockingQueue eventQueue = (BlockingQueue) typedEventQueue.computeIfAbsent(interestType, - k -> new ArrayBlockingQueue<>(eventQueueCapacity)); - eventQueue.offer(event); + public void submit(final String interestId, final R event) { + BlockingQueue eventQueue = (BlockingQueue) typedEventQueue.computeIfAbsent(interestId, + k -> new ArrayBlockingQueue<>(eventQueueCapacity, true)); + try { + eventQueue.put(event); + } catch (InterruptedException e) { + e.printStackTrace(); + } - handleScheduling(interestType, event.getClass()); + handleScheduling(interestId, (Class) event.getClass(), eventQueue); } - public void handleScheduling(final Class interestType, final Class eventType) { - AtomicBoolean jobStatus = typedJobStatus.computeIfAbsent(interestType, k -> new AtomicBoolean(false)); - ReentrantLock jobLock = typedJobLock.computeIfAbsent(interestType, k -> new ReentrantLock(true)); - - if (!jobStatus.get()) { - jobLock.lock(); + public void handleScheduling(final String interestId, final Class eventType, + final BlockingQueue eventQueue) { + AtomicBoolean jobStatus = typedJobStatus.computeIfAbsent(interestId, k -> new AtomicBoolean(false)); + ReentrantLock jobLock = typedJobLock.computeIfAbsent(interestId, k -> new ReentrantLock(true)); - try { + jobLock.lock(); + try { + if (!jobStatus.get() && !eventQueue.isEmpty()) { if (jobStatus.compareAndSet(false, true)) { - executor.submit(() -> processEventQueue(interestType, eventType)); + executor.submit(() -> processEventQueue(interestId, eventType, + eventQueue, jobStatus, jobLock)); } - } finally { - jobLock.unlock(); } + } finally { + jobLock.unlock(); } } @SuppressWarnings("unchecked") - public void processEventQueue(final Class interestType, final Class eventType) { - TypedCallable eventCallback = (TypedCallable) typedCallback.get(interestType); - if (eventCallback == null) { - return; - } - - AtomicBoolean jobStatus = typedJobStatus.get(interestType); - ReentrantLock jobLock = typedJobLock.get(interestType); - BlockingQueue eventQueue = (BlockingQueue) typedEventQueue.get(interestType); - + public void processEventQueue(final String interestId, final Class eventType, + final BlockingQueue eventQueue, final AtomicBoolean jobStatus, final ReentrantLock jobLock) { if (jobStatus == null || jobLock == null || eventQueue == null) { throw new NullPointerException("ThreadPoolExecutor in unexpected state"); } jobLock.lock(); + try { + TypedCallable eventCallback = (TypedCallable) typedCallback.get(interestId); + if (eventCallback == null) { + return; + } - while (!eventQueue.isEmpty()) { - R newEvent = eventQueue.poll(); - - if (newEvent != null) { - eventCallback.call(newEvent); + while (!eventQueue.isEmpty()) { + try { + R newEvent = eventQueue.take(); + if (newEvent != null) { + try { + eventCallback.call(newEvent); + } catch (Exception e) { + e.printStackTrace(); + } + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } finally { + try { + jobStatus.set(false); + } finally { + jobLock.unlock(); } } - - jobStatus.set(false); - jobLock.unlock(); } } @@ -155,8 +170,8 @@ public void processEventQueue(final Class interestType, final Class private EventBroker() { publishers = new ConcurrentHashMap<>(); - emissionExecutor = new OrderedThreadPoolExecutor(10, 10, 10, 10, 10); - consumptionExecutor = new OrderedThreadPoolExecutor(10, 10, 10, 10, 10); + emissionExecutor = new OrderedThreadPoolExecutor(5, 30, 30, 10, 100000000); + consumptionExecutor = new OrderedThreadPoolExecutor(5, 30, 30, 10, 100000000); } public static EventBroker getInstance() { @@ -170,23 +185,23 @@ public void post(final T event) { } SubmissionPublisher publisher = getPublisher((Class) event.getClass()); - if (!emissionExecutor.hasRegisteredCallback(event.getClass())) { - registerPublisherCallback(publisher, (Class) event.getClass()); + if (!emissionExecutor.hasRegisteredCallback((event.getClass().getName()))) { + registerPublisherCallback(publisher, event.getClass().getName()); } - emissionExecutor.submit(event.getClass(), event); + emissionExecutor.submit(event.getClass().getName(), event); } public Subscription subscribe(final EventObserver observer) { SubmissionPublisher publisher = getPublisher(observer.getEventType()); AtomicReference subscriptionReference = new AtomicReference<>(); - Class subscriberToken = createUniqueSubscriberClassToken(); + String subscriberId = UUID.randomUUID().toString(); - registerSubscriberCallback(observer, subscriberToken); + registerSubscriberCallback(observer, subscriberId); Subscriber subscriber = new Subscriber<>() { - private Subscription subscription; + private volatile Subscription subscription; @Override public void onSubscribe(final Subscription subscription) { @@ -198,7 +213,7 @@ public void onSubscribe(final Subscription subscription) { @Override public void onNext(final T event) { - consumptionExecutor.submit(subscriberToken, event); + consumptionExecutor.submit(subscriberId, event); this.subscription.request(1); } @@ -221,13 +236,13 @@ public void onComplete() { public Subscription subscribe(final StreamObserver observer) { SubmissionPublisher publisher = getPublisher(observer.getEventType()); AtomicReference subscriptionReference = new AtomicReference<>(); - Class subscriberToken = createUniqueSubscriberClassToken(); + String subscriberId = UUID.randomUUID().toString(); - registerSubscriberCallback(observer, subscriberToken); + registerSubscriberCallback(observer, subscriberId); Subscriber subscriber = new Subscriber<>() { - private Subscription subscription; + private volatile Subscription subscription; @Override public void onSubscribe(final Subscription subscription) { @@ -239,7 +254,7 @@ public void onSubscribe(final Subscription subscription) { @Override public void onNext(final T event) { - consumptionExecutor.submit(subscriberToken, event); + consumptionExecutor.submit(subscriberId, event); this.subscription.request(1); } @@ -262,33 +277,28 @@ public void onComplete() { @SuppressWarnings("unchecked") private SubmissionPublisher getPublisher(final Class eventType) { return (SubmissionPublisher) publishers.computeIfAbsent(eventType, - key -> new SubmissionPublisher<>()); - } - - private Class createUniqueSubscriberClassToken() { - return new Object() { - private static final String ID = UUID.randomUUID().toString(); - }.getClass(); + key -> new SubmissionPublisher<>(Runnable::run, Flow.defaultBufferSize())); } - private void registerSubscriberCallback(final EventObserver subscriber, final Class subscriberToken) { + private void registerSubscriberCallback(final EventObserver subscriber, final String subscriberId) { + Activator.getLogger().info(subscriberId); TypedCallable eventCallback = new TypedCallable<>() { @Override public void call(final T event) { subscriber.onEvent(event); } }; - consumptionExecutor.registerCallback(subscriberToken, eventCallback); + consumptionExecutor.registerCallback(subscriberId, eventCallback); } - private void registerPublisherCallback(final SubmissionPublisher publisher, final Class eventType) { + private void registerPublisherCallback(final SubmissionPublisher publisher, final String eventId) { TypedCallable eventCallback = new TypedCallable<>() { @Override public void call(final T event) { publisher.submit(event); } }; - emissionExecutor.registerCallback(eventType, eventCallback); + emissionExecutor.registerCallback(eventId, eventCallback); } } diff --git a/plugin/src/software/aws/toolkits/eclipse/amazonq/events/TestEvent.java b/plugin/src/software/aws/toolkits/eclipse/amazonq/events/TestEvent.java index 11e938ca..3366b1c6 100644 --- a/plugin/src/software/aws/toolkits/eclipse/amazonq/events/TestEvent.java +++ b/plugin/src/software/aws/toolkits/eclipse/amazonq/events/TestEvent.java @@ -5,13 +5,19 @@ public final class TestEvent { private final String message; + private final int sequenceNumber; - public TestEvent(final String message) { + public TestEvent(final String message, final int sequenceNumber) { this.message = message; + this.sequenceNumber = sequenceNumber; } public String getMessage() { return message; } + public int getSequenceNumber() { + return sequenceNumber; + } + } diff --git a/plugin/src/software/aws/toolkits/eclipse/amazonq/plugin/Activator.java b/plugin/src/software/aws/toolkits/eclipse/amazonq/plugin/Activator.java index 827081d9..398c4680 100644 --- a/plugin/src/software/aws/toolkits/eclipse/amazonq/plugin/Activator.java +++ b/plugin/src/software/aws/toolkits/eclipse/amazonq/plugin/Activator.java @@ -3,22 +3,27 @@ package software.aws.toolkits.eclipse.amazonq.plugin; +import java.util.ArrayList; +import java.util.List; + import org.eclipse.ui.plugin.AbstractUIPlugin; import org.osgi.framework.BundleContext; +import software.aws.toolkits.eclipse.amazonq.broker.EventBroker; +import software.aws.toolkits.eclipse.amazonq.chat.ChatStateManager; import software.aws.toolkits.eclipse.amazonq.configuration.DefaultPluginStore; import software.aws.toolkits.eclipse.amazonq.configuration.PluginStore; import software.aws.toolkits.eclipse.amazonq.lsp.auth.DefaultLoginService; import software.aws.toolkits.eclipse.amazonq.lsp.auth.LoginService; import software.aws.toolkits.eclipse.amazonq.providers.LspProvider; import software.aws.toolkits.eclipse.amazonq.providers.LspProviderImpl; +import software.aws.toolkits.eclipse.amazonq.subscriber.TestSubscribers; import software.aws.toolkits.eclipse.amazonq.telemetry.service.DefaultTelemetryService; import software.aws.toolkits.eclipse.amazonq.telemetry.service.TelemetryService; -import software.aws.toolkits.eclipse.amazonq.util.PluginLogger; -import software.aws.toolkits.eclipse.amazonq.chat.ChatStateManager; import software.aws.toolkits.eclipse.amazonq.util.CodeReferenceLoggingService; import software.aws.toolkits.eclipse.amazonq.util.DefaultCodeReferenceLoggingService; import software.aws.toolkits.eclipse.amazonq.util.LoggingService; +import software.aws.toolkits.eclipse.amazonq.util.PluginLogger; public class Activator extends AbstractUIPlugin { @@ -44,6 +49,14 @@ public Activator() { .initializeOnStartUp() .build(); codeReferenceLoggingService = DefaultCodeReferenceLoggingService.getInstance(); + + List testSubscriberList = new ArrayList<>(3); + + for (int i = 0; i < 3; ++i) { + TestSubscribers testSubsciber = new TestSubscribers(); + testSubscriberList.add(testSubsciber); + EventBroker.getInstance().subscribe(testSubsciber); + } } @Override diff --git a/plugin/src/software/aws/toolkits/eclipse/amazonq/publishers/TestPublisher.java b/plugin/src/software/aws/toolkits/eclipse/amazonq/publishers/TestPublisher.java index 88743528..ef38577e 100644 --- a/plugin/src/software/aws/toolkits/eclipse/amazonq/publishers/TestPublisher.java +++ b/plugin/src/software/aws/toolkits/eclipse/amazonq/publishers/TestPublisher.java @@ -14,8 +14,8 @@ public TestPublisher() { Thread.sleep(5000); EventBroker eventBroker = EventBroker.getInstance(); - for (int i = 0; i < 10; i++) { - eventBroker.post(new TestEvent("Test Event " + i)); + for (int i = 0; i < 100000; i++) { + eventBroker.post(new TestEvent("Test Event " + i, i)); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); diff --git a/plugin/src/software/aws/toolkits/eclipse/amazonq/subscriber/TestSubscribers.java b/plugin/src/software/aws/toolkits/eclipse/amazonq/subscriber/TestSubscribers.java new file mode 100644 index 00000000..02b6a7c9 --- /dev/null +++ b/plugin/src/software/aws/toolkits/eclipse/amazonq/subscriber/TestSubscribers.java @@ -0,0 +1,28 @@ +// Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +package software.aws.toolkits.eclipse.amazonq.subscriber; + +import software.aws.toolkits.eclipse.amazonq.events.TestEvent; +import software.aws.toolkits.eclipse.amazonq.observers.EventObserver; +import software.aws.toolkits.eclipse.amazonq.plugin.Activator; + +public final class TestSubscribers implements EventObserver { + + private int previousSequenceNumber = -1; + + public TestSubscribers() { + + } + + @Override + public void onEvent(final TestEvent event) { + Activator.getLogger().info(event.getMessage()); + + if (event.getSequenceNumber() - previousSequenceNumber != 1) { + Activator.getLogger().info("OUT OF ORDER: " + event.getSequenceNumber() + " " + previousSequenceNumber); + } + + previousSequenceNumber = event.getSequenceNumber(); + } +} diff --git a/plugin/src/software/aws/toolkits/eclipse/amazonq/views/AmazonQView.java b/plugin/src/software/aws/toolkits/eclipse/amazonq/views/AmazonQView.java index 52bcf65d..6bb9844c 100644 --- a/plugin/src/software/aws/toolkits/eclipse/amazonq/views/AmazonQView.java +++ b/plugin/src/software/aws/toolkits/eclipse/amazonq/views/AmazonQView.java @@ -9,7 +9,6 @@ import org.eclipse.swt.widgets.Display; import org.eclipse.ui.part.ViewPart; -import software.aws.toolkits.eclipse.amazonq.broker.EventBroker; import software.aws.toolkits.eclipse.amazonq.controllers.AmazonQViewController; import software.aws.toolkits.eclipse.amazonq.events.TestEvent; import software.aws.toolkits.eclipse.amazonq.lsp.auth.AuthStatusChangedListener; @@ -30,7 +29,7 @@ public abstract class AmazonQView extends ViewPart implements AuthStatusChangedL protected AmazonQView() { this.viewController = new AmazonQViewController(); new TestPublisher(); - EventBroker.getInstance().subscribe(this); +// EventBroker.getInstance().subscribe(this); } public final Browser getBrowser() {