Skip to content

Commit

Permalink
Improved API and event handling
Browse files Browse the repository at this point in the history
  • Loading branch information
taldekar committed Jan 17, 2025
1 parent a0dacfe commit 968aa47
Show file tree
Hide file tree
Showing 5 changed files with 167 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@
package software.aws.toolkits.eclipse.amazonq.broker;

import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
Expand All @@ -21,13 +20,14 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;

import software.aws.toolkits.eclipse.amazonq.subscriber.Subscriber;
import software.aws.toolkits.eclipse.amazonq.observers.EventObserver;
import software.aws.toolkits.eclipse.amazonq.observers.StreamObserver;

public final class EventBroker {

@FunctionalInterface
private interface TypedCallable<T> {
void runCallback(T event);
void call(T event);
}

private final class BlockingCallerRunsPolicy implements RejectedExecutionHandler {
Expand All @@ -51,6 +51,7 @@ public void rejectedExecution(final Runnable runnable, final ThreadPoolExecutor
throw new RejectedExecutionException("Task " + runnable + " rejected from " + executor);
}
}

}

private class OrderedThreadPoolExecutor {
Expand Down Expand Up @@ -78,6 +79,14 @@ private class OrderedThreadPoolExecutor {
workQueue, Executors.defaultThreadFactory(), new BlockingCallerRunsPolicy(workQueue));
}

public <T, R> void registerCallback(final Class<T> interestType, final TypedCallable<R> callback) {
typedCallback.putIfAbsent(interestType, callback);
}

public <T> boolean hasRegisteredCallback(final Class<T> interestType) {
return typedCallback.containsKey(interestType);
}

@SuppressWarnings("unchecked")
public <T, R> void submit(final Class<T> interestType, final R event) {
BlockingQueue<R> eventQueue = (BlockingQueue<R>) typedEventQueue.computeIfAbsent(interestType,
Expand All @@ -87,20 +96,16 @@ public <T, R> void submit(final Class<T> interestType, final R event) {
handleScheduling(interestType, event.getClass());
}

public <T, R> void registerCallback(final Class<T> interestType, final TypedCallable<R> callback) {
typedCallback.putIfAbsent(interestType, callback);
}

public <T, R> void handleScheduling(final Class<T> interestType, final Class<R> eventType) {
AtomicBoolean jobStatus = typedJobStatus.computeIfAbsent(interestType, k -> new AtomicBoolean(false));
ReentrantLock jobLock = typedJobLock.computeIfAbsent(interestType, k -> new ReentrantLock());
ReentrantLock jobLock = typedJobLock.computeIfAbsent(interestType, k -> new ReentrantLock(true));

if (!jobStatus.get()) {
jobLock.lock();

try {
if (jobStatus.compareAndSet(false, true)) {
processEventQueue(interestType, eventType);
executor.submit(() -> processEventQueue(interestType, eventType));
}
} finally {
jobLock.unlock();
Expand All @@ -120,41 +125,35 @@ public <T, R> void processEventQueue(final Class<T> interestType, final Class<R>
BlockingQueue<R> eventQueue = (BlockingQueue<R>) typedEventQueue.get(interestType);

if (jobStatus == null || jobLock == null || eventQueue == null) {
throw new NullPointerException("ThreadPoolExecutor in unexpected state.");
throw new NullPointerException("ThreadPoolExecutor in unexpected state");
}

R newEvent = eventQueue.poll();
jobLock.lock();

if (newEvent != null) {
boolean hasJobStatusUpdated = jobStatus.compareAndSet(true, !eventQueue.isEmpty());
eventCallback.runCallback(newEvent);
while (!eventQueue.isEmpty()) {
R newEvent = eventQueue.poll();

if (!hasJobStatusUpdated) {
processEventQueue(interestType, eventType);
if (newEvent != null) {
eventCallback.call(newEvent);
}
} else {
jobStatus.set(false);
}

jobStatus.set(false);
jobLock.unlock();
}

}

private static final EventBroker INSTANCE;
private final Map<Class<?>, SubmissionPublisher<?>> publishers;
private final OrderedThreadPoolExecutor emissionExecutor;
private final OrderedThreadPoolExecutor consumptionExecutor;
private final Set<Class<?>> publishEventCallbacksRegistered;

static {
INSTANCE = new EventBroker();
}

private EventBroker() {
publishers = new ConcurrentHashMap<>();
publishEventCallbacksRegistered = ConcurrentHashMap.newKeySet();

emissionExecutor = new OrderedThreadPoolExecutor(10, 10, 10, 10, 10);
consumptionExecutor = new OrderedThreadPoolExecutor(10, 10, 10, 10, 10);
Expand All @@ -164,48 +163,74 @@ public static EventBroker getInstance() {
return INSTANCE;
}

@SuppressWarnings("unchecked")
public <T> void post(final T event) {
if (event == null) {
return;
}

@SuppressWarnings("unchecked")
SubmissionPublisher<T> publisher = getPublisher((Class<T>) event.getClass());

if (!publishEventCallbacksRegistered.contains(event.getClass())) {
TypedCallable<T> eventCallback = new TypedCallable<>() {
@Override
public void runCallback(final T event) {
publisher.submit(event);
}
};

emissionExecutor.registerCallback(event.getClass(), eventCallback);
if (!emissionExecutor.hasRegisteredCallback(event.getClass())) {
registerPublisherCallback(publisher, (Class<T>) event.getClass());
}

emissionExecutor.submit(event.getClass(), event);
}

public <T> Subscription subscribe(final Subscriber<T> subscriber) {
SubmissionPublisher<T> publisher = getPublisher(subscriber.getEventType());
public <T> Subscription subscribe(final EventObserver<T> observer) {
SubmissionPublisher<T> publisher = getPublisher(observer.getEventType());
AtomicReference<Subscription> subscriptionReference = new AtomicReference<>();
Class<?> customSubscriberToken = createUniqueSubscriberClassToken();
Class<?> subscriberToken = createUniqueSubscriberClassToken();

registerSubscriberCallback(observer, subscriberToken);

Subscriber<T> subscriber = new Subscriber<>() {

private Subscription subscription;

@Override
public void onSubscribe(final Subscription subscription) {
this.subscription = subscription;
subscriptionReference.set(subscription);

this.subscription.request(1);
}

@Override
public void onNext(final T event) {
consumptionExecutor.submit(subscriberToken, event);
this.subscription.request(1);
}

@Override
public void onError(final Throwable throwable) {
return;
}

TypedCallable<T> eventCallback = new TypedCallable<>() {
@Override
public void runCallback(final T event) {
subscriber.handleEvent(event);
public void onComplete() {
return;
}

};
consumptionExecutor.registerCallback(customSubscriberToken, eventCallback);

java.util.concurrent.Flow.Subscriber<T> subscriberWrapper = new java.util.concurrent.Flow.Subscriber<>() {
publisher.subscribe(subscriber);
return subscriptionReference.get();
}

public <T> Subscription subscribe(final StreamObserver<T> observer) {
SubmissionPublisher<T> publisher = getPublisher(observer.getEventType());
AtomicReference<Subscription> subscriptionReference = new AtomicReference<>();
Class<?> subscriberToken = createUniqueSubscriberClassToken();

private java.util.concurrent.Flow.Subscription subscription;
private Class<?> subscriberToken = customSubscriberToken;
registerSubscriberCallback(observer, subscriberToken);

Subscriber<T> subscriber = new Subscriber<>() {

private Subscription subscription;

@Override
public void onSubscribe(final java.util.concurrent.Flow.Subscription subscription) {
public void onSubscribe(final Subscription subscription) {
this.subscription = subscription;
subscriptionReference.set(subscription);

Expand All @@ -220,29 +245,50 @@ public void onNext(final T event) {

@Override
public void onError(final Throwable throwable) {
subscriber.handleError(throwable);
observer.onError(throwable);
}

@Override
public void onComplete() {
// TODO: add if required
observer.onComplete();
}

};

publisher.subscribe(subscriberWrapper);
publisher.subscribe(subscriber);
return subscriptionReference.get();
}

@SuppressWarnings("unchecked")
private <T> SubmissionPublisher<T> getPublisher(final Class<T> eventType) {
return (SubmissionPublisher<T>) publishers.computeIfAbsent(eventType,
key -> new SubmissionPublisher<>(Executors.newSingleThreadExecutor(), Flow.defaultBufferSize()));
key -> new SubmissionPublisher<>());
}

private Class<?> createUniqueSubscriberClassToken() {
return new Object() {
private static final String ID = UUID.randomUUID().toString();
}.getClass();
}

private <T> void registerSubscriberCallback(final EventObserver<T> subscriber, final Class<?> subscriberToken) {
TypedCallable<T> eventCallback = new TypedCallable<>() {
@Override
public void call(final T event) {
subscriber.onEvent(event);
}
};
consumptionExecutor.registerCallback(subscriberToken, eventCallback);
}

private <T> void registerPublisherCallback(final SubmissionPublisher<T> publisher, final Class<T> eventType) {
TypedCallable<T> eventCallback = new TypedCallable<>() {
@Override
public void call(final T event) {
publisher.submit(event);
}
};
emissionExecutor.registerCallback(eventType, eventCallback);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

package software.aws.toolkits.eclipse.amazonq.observers;

import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public interface EventObserver<T> {

@SuppressWarnings("unchecked")
default Class<T> getEventType() {
Class<?> currentClass = getClass();
while (currentClass != null) {
for (Type type : currentClass.getGenericInterfaces()) {
if (type instanceof ParameterizedType paramType && (paramType.getRawType() == EventObserver.class)) {
Type typeArg = paramType.getActualTypeArguments()[0];
if (typeArg instanceof Class<?>) {
return (Class<T>) typeArg;
}
throw new IllegalStateException("Generic type parameter is not a Class");
}
}
currentClass = currentClass.getSuperclass();
}
throw new IllegalStateException("Could not determine generic type");
}

void onEvent(T event);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

package software.aws.toolkits.eclipse.amazonq.observers;

import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public interface StreamObserver<T> extends EventObserver<T> {

@Override
@SuppressWarnings("unchecked")
default Class<T> getEventType() {
Class<?> currentClass = getClass();
while (currentClass != null) {
for (Type type : currentClass.getGenericInterfaces()) {
if (type instanceof ParameterizedType paramType && (paramType.getRawType() == StreamObserver.class)) {
Type typeArg = paramType.getActualTypeArguments()[0];
if (typeArg instanceof Class<?>) {
return (Class<T>) typeArg;
}
throw new IllegalStateException("Generic type parameter is not a Class");
}
}
currentClass = currentClass.getSuperclass();
}
throw new IllegalStateException("Could not determine generic type");
}

void onError(Throwable error);
void onComplete();

}

This file was deleted.

Loading

0 comments on commit 968aa47

Please sign in to comment.