Skip to content

Commit

Permalink
Fixed issues with type identification
Browse files Browse the repository at this point in the history
  • Loading branch information
taldekar committed Jan 17, 2025
1 parent 968aa47 commit b915d76
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.RejectedExecutionException;
Expand All @@ -22,6 +23,7 @@

import software.aws.toolkits.eclipse.amazonq.observers.EventObserver;
import software.aws.toolkits.eclipse.amazonq.observers.StreamObserver;
import software.aws.toolkits.eclipse.amazonq.plugin.Activator;

public final class EventBroker {

Expand Down Expand Up @@ -56,10 +58,10 @@ public void rejectedExecution(final Runnable runnable, final ThreadPoolExecutor

private class OrderedThreadPoolExecutor {

private final Map<Class<?>, BlockingQueue<?>> typedEventQueue;
private final Map<Class<?>, AtomicBoolean> typedJobStatus;
private final Map<Class<?>, ReentrantLock> typedJobLock;
private final Map<Class<?>, TypedCallable<?>> typedCallback;
private final Map<String, BlockingQueue<?>> typedEventQueue;
private final Map<String, AtomicBoolean> typedJobStatus;
private final Map<String, ReentrantLock> typedJobLock;
private final Map<String, TypedCallable<?>> typedCallback;

private final BlockingQueue<Runnable> workQueue;
private final ThreadPoolExecutor executor;
Expand All @@ -79,67 +81,80 @@ private class OrderedThreadPoolExecutor {
workQueue, Executors.defaultThreadFactory(), new BlockingCallerRunsPolicy(workQueue));
}

public <T, R> void registerCallback(final Class<T> interestType, final TypedCallable<R> callback) {
typedCallback.putIfAbsent(interestType, callback);
public <T, R> void registerCallback(final String interestId, final TypedCallable<R> callback) {
typedCallback.putIfAbsent(interestId, callback);
}

public <T> boolean hasRegisteredCallback(final Class<T> interestType) {
public <T> boolean hasRegisteredCallback(final String interestType) {
return typedCallback.containsKey(interestType);
}

@SuppressWarnings("unchecked")
public <T, R> void submit(final Class<T> interestType, final R event) {
BlockingQueue<R> eventQueue = (BlockingQueue<R>) typedEventQueue.computeIfAbsent(interestType,
k -> new ArrayBlockingQueue<>(eventQueueCapacity));
eventQueue.offer(event);
public <T, R> void submit(final String interestId, final R event) {
BlockingQueue<R> eventQueue = (BlockingQueue<R>) typedEventQueue.computeIfAbsent(interestId,
k -> new ArrayBlockingQueue<>(eventQueueCapacity, true));
try {
eventQueue.put(event);
} catch (InterruptedException e) {
e.printStackTrace();
}

handleScheduling(interestType, event.getClass());
handleScheduling(interestId, (Class<R>) event.getClass(), eventQueue);
}

public <T, R> void handleScheduling(final Class<T> interestType, final Class<R> eventType) {
AtomicBoolean jobStatus = typedJobStatus.computeIfAbsent(interestType, k -> new AtomicBoolean(false));
ReentrantLock jobLock = typedJobLock.computeIfAbsent(interestType, k -> new ReentrantLock(true));

if (!jobStatus.get()) {
jobLock.lock();
public <T, R> void handleScheduling(final String interestId, final Class<R> eventType,
final BlockingQueue<R> eventQueue) {
AtomicBoolean jobStatus = typedJobStatus.computeIfAbsent(interestId, k -> new AtomicBoolean(false));
ReentrantLock jobLock = typedJobLock.computeIfAbsent(interestId, k -> new ReentrantLock(true));

try {
jobLock.lock();
try {
if (!jobStatus.get() && !eventQueue.isEmpty()) {
if (jobStatus.compareAndSet(false, true)) {
executor.submit(() -> processEventQueue(interestType, eventType));
executor.submit(() -> processEventQueue(interestId, eventType,
eventQueue, jobStatus, jobLock));
}
} finally {
jobLock.unlock();
}
} finally {
jobLock.unlock();
}
}

@SuppressWarnings("unchecked")
public <T, R> void processEventQueue(final Class<T> interestType, final Class<R> eventType) {
TypedCallable<R> eventCallback = (TypedCallable<R>) typedCallback.get(interestType);
if (eventCallback == null) {
return;
}

AtomicBoolean jobStatus = typedJobStatus.get(interestType);
ReentrantLock jobLock = typedJobLock.get(interestType);
BlockingQueue<R> eventQueue = (BlockingQueue<R>) typedEventQueue.get(interestType);

public <T, R> void processEventQueue(final String interestId, final Class<R> eventType,
final BlockingQueue<R> eventQueue, final AtomicBoolean jobStatus, final ReentrantLock jobLock) {
if (jobStatus == null || jobLock == null || eventQueue == null) {
throw new NullPointerException("ThreadPoolExecutor in unexpected state");
}

jobLock.lock();
try {
TypedCallable<R> eventCallback = (TypedCallable<R>) typedCallback.get(interestId);
if (eventCallback == null) {
return;
}

while (!eventQueue.isEmpty()) {
R newEvent = eventQueue.poll();

if (newEvent != null) {
eventCallback.call(newEvent);
while (!eventQueue.isEmpty()) {
try {
R newEvent = eventQueue.take();
if (newEvent != null) {
try {
eventCallback.call(newEvent);
} catch (Exception e) {
e.printStackTrace();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} finally {
try {
jobStatus.set(false);
} finally {
jobLock.unlock();
}
}

jobStatus.set(false);
jobLock.unlock();
}
}

Expand All @@ -155,8 +170,8 @@ public <T, R> void processEventQueue(final Class<T> interestType, final Class<R>
private EventBroker() {
publishers = new ConcurrentHashMap<>();

emissionExecutor = new OrderedThreadPoolExecutor(10, 10, 10, 10, 10);
consumptionExecutor = new OrderedThreadPoolExecutor(10, 10, 10, 10, 10);
emissionExecutor = new OrderedThreadPoolExecutor(5, 30, 30, 10, 100000000);
consumptionExecutor = new OrderedThreadPoolExecutor(5, 30, 30, 10, 100000000);
}

public static EventBroker getInstance() {
Expand All @@ -170,23 +185,23 @@ public <T> void post(final T event) {
}

SubmissionPublisher<T> publisher = getPublisher((Class<T>) event.getClass());
if (!emissionExecutor.hasRegisteredCallback(event.getClass())) {
registerPublisherCallback(publisher, (Class<T>) event.getClass());
if (!emissionExecutor.hasRegisteredCallback((event.getClass().getName()))) {
registerPublisherCallback(publisher, event.getClass().getName());
}

emissionExecutor.submit(event.getClass(), event);
emissionExecutor.submit(event.getClass().getName(), event);
}

public <T> Subscription subscribe(final EventObserver<T> observer) {
SubmissionPublisher<T> publisher = getPublisher(observer.getEventType());
AtomicReference<Subscription> subscriptionReference = new AtomicReference<>();
Class<?> subscriberToken = createUniqueSubscriberClassToken();
String subscriberId = UUID.randomUUID().toString();

registerSubscriberCallback(observer, subscriberToken);
registerSubscriberCallback(observer, subscriberId);

Subscriber<T> subscriber = new Subscriber<>() {

private Subscription subscription;
private volatile Subscription subscription;

@Override
public void onSubscribe(final Subscription subscription) {
Expand All @@ -198,7 +213,7 @@ public void onSubscribe(final Subscription subscription) {

@Override
public void onNext(final T event) {
consumptionExecutor.submit(subscriberToken, event);
consumptionExecutor.submit(subscriberId, event);
this.subscription.request(1);
}

Expand All @@ -221,13 +236,13 @@ public void onComplete() {
public <T> Subscription subscribe(final StreamObserver<T> observer) {
SubmissionPublisher<T> publisher = getPublisher(observer.getEventType());
AtomicReference<Subscription> subscriptionReference = new AtomicReference<>();
Class<?> subscriberToken = createUniqueSubscriberClassToken();
String subscriberId = UUID.randomUUID().toString();

registerSubscriberCallback(observer, subscriberToken);
registerSubscriberCallback(observer, subscriberId);

Subscriber<T> subscriber = new Subscriber<>() {

private Subscription subscription;
private volatile Subscription subscription;

@Override
public void onSubscribe(final Subscription subscription) {
Expand All @@ -239,7 +254,7 @@ public void onSubscribe(final Subscription subscription) {

@Override
public void onNext(final T event) {
consumptionExecutor.submit(subscriberToken, event);
consumptionExecutor.submit(subscriberId, event);
this.subscription.request(1);
}

Expand All @@ -262,33 +277,28 @@ public void onComplete() {
@SuppressWarnings("unchecked")
private <T> SubmissionPublisher<T> getPublisher(final Class<T> eventType) {
return (SubmissionPublisher<T>) publishers.computeIfAbsent(eventType,
key -> new SubmissionPublisher<>());
}

private Class<?> createUniqueSubscriberClassToken() {
return new Object() {
private static final String ID = UUID.randomUUID().toString();
}.getClass();
key -> new SubmissionPublisher<>(Runnable::run, Flow.defaultBufferSize()));
}

private <T> void registerSubscriberCallback(final EventObserver<T> subscriber, final Class<?> subscriberToken) {
private <T> void registerSubscriberCallback(final EventObserver<T> subscriber, final String subscriberId) {
Activator.getLogger().info(subscriberId);
TypedCallable<T> eventCallback = new TypedCallable<>() {
@Override
public void call(final T event) {
subscriber.onEvent(event);
}
};
consumptionExecutor.registerCallback(subscriberToken, eventCallback);
consumptionExecutor.registerCallback(subscriberId, eventCallback);
}

private <T> void registerPublisherCallback(final SubmissionPublisher<T> publisher, final Class<T> eventType) {
private <T> void registerPublisherCallback(final SubmissionPublisher<T> publisher, final String eventId) {
TypedCallable<T> eventCallback = new TypedCallable<>() {
@Override
public void call(final T event) {
publisher.submit(event);
}
};
emissionExecutor.registerCallback(eventType, eventCallback);
emissionExecutor.registerCallback(eventId, eventCallback);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,19 @@

public final class TestEvent {
private final String message;
private final int sequenceNumber;

public TestEvent(final String message) {
public TestEvent(final String message, final int sequenceNumber) {
this.message = message;
this.sequenceNumber = sequenceNumber;
}

public String getMessage() {
return message;
}

public int getSequenceNumber() {
return sequenceNumber;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,27 @@

package software.aws.toolkits.eclipse.amazonq.plugin;

import java.util.ArrayList;
import java.util.List;

import org.eclipse.ui.plugin.AbstractUIPlugin;
import org.osgi.framework.BundleContext;

import software.aws.toolkits.eclipse.amazonq.broker.EventBroker;
import software.aws.toolkits.eclipse.amazonq.chat.ChatStateManager;
import software.aws.toolkits.eclipse.amazonq.configuration.DefaultPluginStore;
import software.aws.toolkits.eclipse.amazonq.configuration.PluginStore;
import software.aws.toolkits.eclipse.amazonq.lsp.auth.DefaultLoginService;
import software.aws.toolkits.eclipse.amazonq.lsp.auth.LoginService;
import software.aws.toolkits.eclipse.amazonq.providers.LspProvider;
import software.aws.toolkits.eclipse.amazonq.providers.LspProviderImpl;
import software.aws.toolkits.eclipse.amazonq.subscriber.TestSubscribers;
import software.aws.toolkits.eclipse.amazonq.telemetry.service.DefaultTelemetryService;
import software.aws.toolkits.eclipse.amazonq.telemetry.service.TelemetryService;
import software.aws.toolkits.eclipse.amazonq.util.PluginLogger;
import software.aws.toolkits.eclipse.amazonq.chat.ChatStateManager;
import software.aws.toolkits.eclipse.amazonq.util.CodeReferenceLoggingService;
import software.aws.toolkits.eclipse.amazonq.util.DefaultCodeReferenceLoggingService;
import software.aws.toolkits.eclipse.amazonq.util.LoggingService;
import software.aws.toolkits.eclipse.amazonq.util.PluginLogger;

public class Activator extends AbstractUIPlugin {

Expand All @@ -44,6 +49,14 @@ public Activator() {
.initializeOnStartUp()
.build();
codeReferenceLoggingService = DefaultCodeReferenceLoggingService.getInstance();

List<TestSubscribers> testSubscriberList = new ArrayList<>(3);

for (int i = 0; i < 3; ++i) {
TestSubscribers testSubsciber = new TestSubscribers();
testSubscriberList.add(testSubsciber);
EventBroker.getInstance().subscribe(testSubsciber);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ public TestPublisher() {
Thread.sleep(5000);
EventBroker eventBroker = EventBroker.getInstance();

for (int i = 0; i < 10; i++) {
eventBroker.post(new TestEvent("Test Event " + i));
for (int i = 0; i < 100000; i++) {
eventBroker.post(new TestEvent("Test Event " + i, i));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

package software.aws.toolkits.eclipse.amazonq.subscriber;

import software.aws.toolkits.eclipse.amazonq.events.TestEvent;
import software.aws.toolkits.eclipse.amazonq.observers.EventObserver;
import software.aws.toolkits.eclipse.amazonq.plugin.Activator;

public final class TestSubscribers implements EventObserver<TestEvent> {

private int previousSequenceNumber = -1;

public TestSubscribers() {

}

@Override
public void onEvent(final TestEvent event) {
Activator.getLogger().info(event.getMessage());

if (event.getSequenceNumber() - previousSequenceNumber != 1) {
Activator.getLogger().info("OUT OF ORDER: " + event.getSequenceNumber() + " " + previousSequenceNumber);
}

previousSequenceNumber = event.getSequenceNumber();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import org.eclipse.swt.widgets.Display;
import org.eclipse.ui.part.ViewPart;

import software.aws.toolkits.eclipse.amazonq.broker.EventBroker;
import software.aws.toolkits.eclipse.amazonq.controllers.AmazonQViewController;
import software.aws.toolkits.eclipse.amazonq.events.TestEvent;
import software.aws.toolkits.eclipse.amazonq.lsp.auth.AuthStatusChangedListener;
Expand All @@ -30,7 +29,7 @@ public abstract class AmazonQView extends ViewPart implements AuthStatusChangedL
protected AmazonQView() {
this.viewController = new AmazonQViewController();
new TestPublisher();
EventBroker.getInstance().subscribe(this);
// EventBroker.getInstance().subscribe(this);
}

public final Browser getBrowser() {
Expand Down

0 comments on commit b915d76

Please sign in to comment.