Skip to content

Commit

Permalink
Simple event bus in native Java
Browse files Browse the repository at this point in the history
  • Loading branch information
taldekar committed Jan 15, 2025
1 parent ead29a6 commit 5d59e9d
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

package software.aws.toolkits.eclipse.amazonq.broker;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicReference;

import software.aws.toolkits.eclipse.amazonq.subscriber.Subscriber;

public final class EventBroker {

private static final EventBroker INSTANCE;
private final Map<Class<?>, SubmissionPublisher<?>> publishers;

static {
INSTANCE = new EventBroker();
}

private EventBroker() {
publishers = new ConcurrentHashMap<>();
}

public static EventBroker getInstance() {
return INSTANCE;
}

public <T> void post(final T event) {
if (event == null) {
return;
}

@SuppressWarnings("unchecked")
SubmissionPublisher<T> publisher = (SubmissionPublisher<T>) getPublisher(event.getClass());
publisher.submit(event);
}

public <T> Subscription subscribe(final Subscriber<T> subscriber) {
SubmissionPublisher<T> publisher = getPublisher(subscriber.getSubscriptionEventClass());
AtomicReference<Subscription> subscriptionReference = new AtomicReference<>();

java.util.concurrent.Flow.Subscriber<T> subscriberWrapper = new java.util.concurrent.Flow.Subscriber<>() {
private java.util.concurrent.Flow.Subscription subscription;

@Override
public void onSubscribe(final java.util.concurrent.Flow.Subscription subscription) {
this.subscription = subscription;
subscriptionReference.set(subscription);
this.subscription.request(1);
}

@Override
public void onNext(final T event) {
subscriber.handleEvent(event);
this.subscription.request(1);
}

@Override
public void onError(final Throwable throwable) {
subscriber.handleError(throwable);
}

@Override
public void onComplete() {
// TODO: add if required
}
};

publisher.subscribe(subscriberWrapper);
return subscriptionReference.get();
}

@SuppressWarnings("unchecked")
private <T> SubmissionPublisher<T> getPublisher(final Class<T> eventType) {
return (SubmissionPublisher<T>) publishers.computeIfAbsent(eventType,
key -> new SubmissionPublisher<>());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

package software.aws.toolkits.eclipse.amazonq.events;

public final class TestEvent {
private final String message;

public TestEvent(final String message) {
this.message = message;
}

public String getMessage() {
return message;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

package software.aws.toolkits.eclipse.amazonq.publishers;

import software.aws.toolkits.eclipse.amazonq.broker.EventBroker;
import software.aws.toolkits.eclipse.amazonq.events.TestEvent;

public final class TestPublisher {

public TestPublisher() {
Thread publisherThread = new Thread(() -> {
try {
Thread.sleep(5000);
EventBroker eventBroker = EventBroker.getInstance();

for (int i = 0; i < 10; i++) {
eventBroker.post(new TestEvent("Test Event " + i));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "TestPublisher-Thread");

publisherThread.start();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

package software.aws.toolkits.eclipse.amazonq.subscriber;

public interface Subscriber<T> {

Class<T> getSubscriptionEventClass();

void handleEvent(T event);
void handleError(Throwable error);

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,28 @@
import org.eclipse.swt.widgets.Display;
import org.eclipse.ui.part.ViewPart;

import software.aws.toolkits.eclipse.amazonq.broker.EventBroker;
import software.aws.toolkits.eclipse.amazonq.controllers.AmazonQViewController;
import software.aws.toolkits.eclipse.amazonq.events.TestEvent;
import software.aws.toolkits.eclipse.amazonq.lsp.auth.AuthStatusChangedListener;
import software.aws.toolkits.eclipse.amazonq.lsp.auth.AuthStatusProvider;
import software.aws.toolkits.eclipse.amazonq.lsp.auth.model.AuthState;
import software.aws.toolkits.eclipse.amazonq.plugin.Activator;
import software.aws.toolkits.eclipse.amazonq.publishers.TestPublisher;
import software.aws.toolkits.eclipse.amazonq.subscriber.Subscriber;
import software.aws.toolkits.eclipse.amazonq.util.ThemeDetector;
import software.aws.toolkits.eclipse.amazonq.views.actions.AmazonQCommonActions;

public abstract class AmazonQView extends ViewPart implements AuthStatusChangedListener {
public abstract class AmazonQView extends ViewPart implements AuthStatusChangedListener, Subscriber<TestEvent> {

private AmazonQViewController viewController;
private AmazonQCommonActions amazonQCommonActions;
private static final ThemeDetector THEME_DETECTOR = new ThemeDetector();

protected AmazonQView() {
this.viewController = new AmazonQViewController();
new TestPublisher();
EventBroker.getInstance().subscribe(this);
}

public final Browser getBrowser() {
Expand Down Expand Up @@ -127,4 +133,19 @@ public void dispose() {
super.dispose();
}

@Override
public final Class<TestEvent> getSubscriptionEventClass() {
return TestEvent.class;
}

@Override
public final void handleEvent(final TestEvent event) {
Activator.getLogger().info(event.getMessage());
}

@Override
public final void handleError(final Throwable error) {
error.printStackTrace();
}

}

0 comments on commit 5d59e9d

Please sign in to comment.