Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Event Bus in Native Java #324

Closed
wants to merge 16 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

package software.aws.toolkits.eclipse.amazonq.events;

public final class TestEvent {
private final String message;
private final int sequenceNumber;

public TestEvent(final String message, final int sequenceNumber) {
this.message = message;
this.sequenceNumber = sequenceNumber;
}

public String getMessage() {
return message;
}

public int getSequenceNumber() {
return sequenceNumber;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Flow.Subscription;

import org.eclipse.swt.widgets.Display;
import org.eclipse.ui.AbstractSourceProvider;
Expand All @@ -13,7 +14,9 @@
import org.eclipse.ui.PlatformUI;
import org.eclipse.ui.services.ISourceProviderService;

import software.aws.toolkits.eclipse.amazonq.broker.EventBroker;
import software.aws.toolkits.eclipse.amazonq.lsp.auth.model.AuthState;
import software.aws.toolkits.eclipse.amazonq.observers.EventObserver;
import software.aws.toolkits.eclipse.amazonq.plugin.Activator;

/**
Expand All @@ -30,12 +33,13 @@
* @see software.aws.toolkits.eclipse.amazonq.toolbar
* @see software.aws.toolkits.eclipse.amazonq.toolbar-unauthenticated
*/
public final class AuthSourceProvider extends AbstractSourceProvider implements AuthStatusChangedListener {
public final class AuthSourceProvider extends AbstractSourceProvider implements EventObserver<AuthState> {
public static final String IS_LOGGED_IN_VARIABLE_ID = "is_logged_in";
private boolean isLoggedIn = false;
private Subscription authStateSubscription;

public AuthSourceProvider() {
AuthStatusProvider.addAuthStatusChangeListener(this);
authStateSubscription = EventBroker.getInstance().subscribe(this);
isLoggedIn = Activator.getLoginService().getAuthState().isLoggedIn();
}

Expand All @@ -53,8 +57,7 @@ public void dispose() {

// Notify listeners that this provider is being disposed
fireSourceChanged(ISources.WORKBENCH, IS_LOGGED_IN_VARIABLE_ID, null);

AuthStatusProvider.removeAuthStatusChangeListener(this);
authStateSubscription.cancel();
}

@Override
Expand All @@ -69,15 +72,15 @@ public void setIsLoggedIn(final Boolean isLoggedIn) {

public static AuthSourceProvider getProvider() {
IWorkbench workbench = PlatformUI.getWorkbench();
ISourceProviderService sourceProviderService = (ISourceProviderService) workbench
ISourceProviderService sourceProviderService = workbench
.getService(ISourceProviderService.class);
AuthSourceProvider provider = (AuthSourceProvider) sourceProviderService
.getSourceProvider(AuthSourceProvider.IS_LOGGED_IN_VARIABLE_ID);
return provider;
}

@Override
public void onAuthStatusChanged(final AuthState authState) {
public void onEvent(final AuthState authState) {
boolean isLoggedIn = authState.isLoggedIn();
Display.getDefault().asyncExec(() -> {
setIsLoggedIn(isLoggedIn);
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

package software.aws.toolkits.eclipse.amazonq.lsp.auth;

import software.aws.toolkits.eclipse.amazonq.broker.EventBroker;
import software.aws.toolkits.eclipse.amazonq.configuration.PluginStore;
import software.aws.toolkits.eclipse.amazonq.lsp.auth.model.AuthState;
import software.aws.toolkits.eclipse.amazonq.lsp.auth.model.AuthStateType;
Expand Down Expand Up @@ -119,7 +120,7 @@ private void updateState(final AuthStateType authStatusType, final LoginType log
* This notification is critical for ensuring all plugin components reflect the current
* authentication state.
*/
AuthStatusProvider.notifyAuthStatusChanged(getAuthState());
EventBroker.getInstance().post(getAuthState());
}

private void syncAuthStateWithPluginStore() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

package software.aws.toolkits.eclipse.amazonq.observers;

import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public interface EventObserver<T> {

@SuppressWarnings("unchecked")
default Class<T> getEventType() {
Class<?> currentClass = getClass();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there any references that you took inspiration from to compose this class? If yes, those would be useful to link in the description here

Copy link
Contributor Author

@taldekar taldekar Jan 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I used stack overflow for this, but it wasn't exactly this. I can try to find it. It think it was this one: https://stackoverflow.com/questions/3437897/how-do-i-get-a-class-instance-of-generic-type-t. I'll add it to the code.

while (currentClass != null) {
for (Type type : currentClass.getGenericInterfaces()) {
if (type instanceof ParameterizedType paramType && (paramType.getRawType() == EventObserver.class)) {
Type typeArg = paramType.getActualTypeArguments()[0];
if (typeArg instanceof Class<?>) {
return (Class<T>) typeArg;
}
throw new IllegalStateException("Generic type parameter is not a Class");
}
}
currentClass = currentClass.getSuperclass();
}
throw new IllegalStateException("Could not determine generic type");
}

void onEvent(T event);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

package software.aws.toolkits.eclipse.amazonq.observers;

import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public interface StreamObserver<T> extends EventObserver<T> {

@Override
@SuppressWarnings("unchecked")
default Class<T> getEventType() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the difference between StreamObserver and EventObserver? I see a few new functions in this class but the getEventType looks the same

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The EventObserver is the base bones version of a subscriber that only cares about events on the data channel. The StreamObserver extends this functionality with hooks for when the stream errors out or completes. The interfaces were introduced because the standard subscriber method onNext wasn't descriptive or I felt like it didn't communicate that we were listening to/handling events. Also we'd be forced to also implement the onError and onComplete methods everytime.

Class<?> currentClass = getClass();
while (currentClass != null) {
for (Type type : currentClass.getGenericInterfaces()) {
if (type instanceof ParameterizedType paramType && (paramType.getRawType() == StreamObserver.class)) {
Type typeArg = paramType.getActualTypeArguments()[0];
if (typeArg instanceof Class<?>) {
return (Class<T>) typeArg;
}
throw new IllegalStateException("Generic type parameter is not a Class");
}
}
currentClass = currentClass.getSuperclass();
}
throw new IllegalStateException("Could not determine generic type");
}

void onError(Throwable error);
void onComplete();

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,28 @@

package software.aws.toolkits.eclipse.amazonq.plugin;

import java.util.ArrayList;
import java.util.List;

import org.eclipse.ui.plugin.AbstractUIPlugin;
import org.osgi.framework.BundleContext;

import software.aws.toolkits.eclipse.amazonq.broker.EventBroker;
import software.aws.toolkits.eclipse.amazonq.chat.ChatStateManager;
import software.aws.toolkits.eclipse.amazonq.configuration.DefaultPluginStore;
import software.aws.toolkits.eclipse.amazonq.configuration.PluginStore;
import software.aws.toolkits.eclipse.amazonq.lsp.auth.DefaultLoginService;
import software.aws.toolkits.eclipse.amazonq.lsp.auth.LoginService;
import software.aws.toolkits.eclipse.amazonq.providers.LspProvider;
import software.aws.toolkits.eclipse.amazonq.providers.LspProviderImpl;
import software.aws.toolkits.eclipse.amazonq.publishers.TestPublisher;
import software.aws.toolkits.eclipse.amazonq.subscriber.TestSubscribers;
import software.aws.toolkits.eclipse.amazonq.telemetry.service.DefaultTelemetryService;
import software.aws.toolkits.eclipse.amazonq.telemetry.service.TelemetryService;
import software.aws.toolkits.eclipse.amazonq.util.PluginLogger;
import software.aws.toolkits.eclipse.amazonq.chat.ChatStateManager;
import software.aws.toolkits.eclipse.amazonq.util.CodeReferenceLoggingService;
import software.aws.toolkits.eclipse.amazonq.util.DefaultCodeReferenceLoggingService;
import software.aws.toolkits.eclipse.amazonq.util.LoggingService;
import software.aws.toolkits.eclipse.amazonq.util.PluginLogger;

public class Activator extends AbstractUIPlugin {

Expand All @@ -44,6 +50,16 @@ public Activator() {
.initializeOnStartUp()
.build();
codeReferenceLoggingService = DefaultCodeReferenceLoggingService.getInstance();

List<TestSubscribers> testSubscriberList = new ArrayList<>(3);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we expect Activator to be the source of truth for event broker subscriptions?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, this was just to test that the event bus is working. Subscriptions can come from anywhere in the codebase.


for (int i = 0; i < 3; ++i) {
TestSubscribers testSubsciber = new TestSubscribers();
testSubscriberList.add(testSubsciber);
EventBroker.getInstance().subscribe(testSubsciber);
}

new TestPublisher();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

package software.aws.toolkits.eclipse.amazonq.publishers;

import software.aws.toolkits.eclipse.amazonq.broker.EventBroker;
import software.aws.toolkits.eclipse.amazonq.events.TestEvent;

public final class TestPublisher {

public TestPublisher() {
Thread publisherThread = new Thread(() -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we not map this to the auth events that get posted?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have separate publishers for auth events. This was just to demo that the event bus was performant and could deliver events in the order they were posted.

try {
Thread.sleep(5000);
EventBroker eventBroker = EventBroker.getInstance();

for (int i = 0; i < 100000; i++) {
eventBroker.post(new TestEvent("Test Event " + i, i));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "TestPublisher-Thread");

publisherThread.start();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

package software.aws.toolkits.eclipse.amazonq.subscriber;

import software.aws.toolkits.eclipse.amazonq.events.TestEvent;
import software.aws.toolkits.eclipse.amazonq.observers.EventObserver;
import software.aws.toolkits.eclipse.amazonq.plugin.Activator;

public final class TestSubscribers implements EventObserver<TestEvent> {

private int previousSequenceNumber = -1;

public TestSubscribers() {

}

@Override
public void onEvent(final TestEvent event) {
// Activator.getLogger().info(event.getMessage());

if (event.getSequenceNumber() - previousSequenceNumber != 1) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am unclear on the significance of the sequence number here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The sequence numbers are more for testing the POC on my end. They helped ensure that the events arrived in the order in which they were published to all subscribers.

Activator.getLogger().info("OUT OF ORDER: " + event.getSequenceNumber() + " " + previousSequenceNumber);
}

previousSequenceNumber = event.getSequenceNumber();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public void completed(final ProgressEvent event) {
});

// Check if user is authenticated and build view accordingly
onAuthStatusChanged(authState);
onEvent(authState);
}

private Browser getAndUpdateStateManager() {
Expand All @@ -127,7 +127,7 @@ private Browser getAndUpdateStateManager() {
}

@Override
public final void onAuthStatusChanged(final AuthState authState) {
public final void onEvent(final AuthState authState) {
Display.getDefault().asyncExec(() -> {
amazonQCommonActions.updateActionVisibility(authState, getViewSite());
if (authState.isExpired()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,29 @@

package software.aws.toolkits.eclipse.amazonq.views;

import java.util.concurrent.Flow.Subscription;

import org.eclipse.swt.SWT;
import org.eclipse.swt.browser.Browser;
import org.eclipse.swt.graphics.Color;
import org.eclipse.swt.widgets.Composite;
import org.eclipse.swt.widgets.Display;
import org.eclipse.ui.part.ViewPart;

import software.aws.toolkits.eclipse.amazonq.broker.EventBroker;
import software.aws.toolkits.eclipse.amazonq.controllers.AmazonQViewController;
import software.aws.toolkits.eclipse.amazonq.lsp.auth.AuthStatusChangedListener;
import software.aws.toolkits.eclipse.amazonq.lsp.auth.AuthStatusProvider;
import software.aws.toolkits.eclipse.amazonq.lsp.auth.model.AuthState;
import software.aws.toolkits.eclipse.amazonq.observers.EventObserver;
import software.aws.toolkits.eclipse.amazonq.plugin.Activator;
import software.aws.toolkits.eclipse.amazonq.util.ThemeDetector;
import software.aws.toolkits.eclipse.amazonq.views.actions.AmazonQCommonActions;

public abstract class AmazonQView extends ViewPart implements AuthStatusChangedListener {
public abstract class AmazonQView extends ViewPart implements EventObserver<AuthState> {

private AmazonQViewController viewController;
private AmazonQCommonActions amazonQCommonActions;
private static final ThemeDetector THEME_DETECTOR = new ThemeDetector();
private Subscription authStateSubscription;

protected AmazonQView() {
this.viewController = new AmazonQViewController();
Expand Down Expand Up @@ -81,10 +84,10 @@ private void setupActions(final AuthState authState) {
}

private void setupAuthStatusListeners() {
AuthStatusProvider.addAuthStatusChangeListener(this);
AuthStatusProvider.addAuthStatusChangeListener(amazonQCommonActions.getSignoutAction());
AuthStatusProvider.addAuthStatusChangeListener(amazonQCommonActions.getFeedbackDialogContributionAction());
AuthStatusProvider.addAuthStatusChangeListener(amazonQCommonActions.getCustomizationDialogContributionAction());
authStateSubscription = EventBroker.getInstance().subscribe(this);
EventBroker.getInstance().subscribe(amazonQCommonActions.getSignoutAction());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cancels are missing for l88-90 right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, but they were not implemented in the original code either. To provide cancel functionality, we'd just need to keep track of the Subscription returned.

EventBroker.getInstance().subscribe(amazonQCommonActions.getFeedbackDialogContributionAction());
EventBroker.getInstance().subscribe(amazonQCommonActions.getCustomizationDialogContributionAction());
}

@Override
Expand Down Expand Up @@ -123,7 +126,7 @@ function waitForFunction(functionName, timeout = 30000) {
*/
@Override
public void dispose() {
AuthStatusProvider.removeAuthStatusChangeListener(this);
authStateSubscription.cancel();
super.dispose();
}

Expand Down
Loading
Loading