Skip to content

Commit

Permalink
Integrate event broker to publish auth state
Browse files Browse the repository at this point in the history
  • Loading branch information
taldekar committed Jan 21, 2025
1 parent 8fc1f43 commit 1c682f3
Show file tree
Hide file tree
Showing 10 changed files with 67 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@

package software.aws.toolkits.eclipse.amazonq.broker;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
Expand All @@ -21,6 +19,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;

import software.aws.toolkits.eclipse.amazonq.observers.EventObserver;
import software.aws.toolkits.eclipse.amazonq.observers.StreamObserver;
Expand Down Expand Up @@ -61,18 +60,19 @@ public static final class OrderedThreadPoolExecutor {
private final Map<String, BlockingQueue<?>> interestIdToEventQueueMap;
private final Map<String, AtomicBoolean> interestIdToJobStatusMap;
private final Map<String, TypedCallable<?>> interestIdToCallbackMap;
private final Map<String, ReentrantLock> interestIdToJobLockMap;

private final BlockingQueue<Runnable> scheduledJobsQueue;
private final ThreadPoolExecutor executor;
private final int eventQueueCapacity;
private static final int BATCH_PROCESSING_QUEUE_SIZE = 250;

OrderedThreadPoolExecutor(final int coreThreadCount, final int maxThreadCount, final int jobQueueCapacity,
final int eventQueueCapacity, final int keepAliveTime, final TimeUnit keepAliveTimeUnit) {
scheduledJobsQueue = new ArrayBlockingQueue<>(jobQueueCapacity);
interestIdToEventQueueMap = new ConcurrentHashMap<>();
interestIdToJobStatusMap = new ConcurrentHashMap<>();
interestIdToCallbackMap = new ConcurrentHashMap<>();
interestIdToJobLockMap = new ConcurrentHashMap<>();

this.eventQueueCapacity = eventQueueCapacity;

Expand All @@ -92,10 +92,14 @@ public <T> boolean isCallbackRegisteredForInterest(final String interestId) {
public <T, R> void submitEventForInterest(final String interestId, final R event) {
BlockingQueue<R> eventQueue = (BlockingQueue<R>) interestIdToEventQueueMap.computeIfAbsent(interestId,
k -> new ArrayBlockingQueue<>(eventQueueCapacity, true));
ReentrantLock jobLock = interestIdToJobLockMap.computeIfAbsent(interestId, k -> new ReentrantLock(true));
jobLock.lock();
try {
eventQueue.put(event);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
jobLock.unlock();
}

handleJobScheduling(interestId, (Class<R>) event.getClass(), eventQueue);
Expand All @@ -119,20 +123,17 @@ private <T, R> void processQueuedEvents(final String interestId, final Class<R>
return;
}

List<R> batchedEvents = new ArrayList<>(BATCH_PROCESSING_QUEUE_SIZE);

while (!eventQueue.isEmpty()) {
eventQueue.drainTo(batchedEvents);

for (R newEvent : batchedEvents) {
try {
try {
R newEvent = eventQueue.poll();
if (newEvent != null) {
eventCallback.callWith(newEvent);
} catch (Exception e) {
e.printStackTrace();
} else {
break;
}
} catch (Exception e) {
e.printStackTrace();
}

batchedEvents.clear();
}
} finally {
jobStatus.set(false);
Expand All @@ -152,8 +153,8 @@ private <T, R> void processQueuedEvents(final String interestId, final Class<R>

private EventBroker() {
eventTypeToPublisherMap = new ConcurrentHashMap<>();
publisherExecutor = new OrderedThreadPoolExecutor(3, 10, 10, 100, 10, TimeUnit.MILLISECONDS);
subscriberExecutor = new OrderedThreadPoolExecutor(3, 10, 10, 100, 10, TimeUnit.MILLISECONDS);
publisherExecutor = new OrderedThreadPoolExecutor(5, 20, 50, 100, 10, TimeUnit.MILLISECONDS);
subscriberExecutor = new OrderedThreadPoolExecutor(5, 20, 50, 100, 10, TimeUnit.MILLISECONDS);
}

public static EventBroker getInstance() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Flow.Subscription;

import org.eclipse.swt.widgets.Display;
import org.eclipse.ui.AbstractSourceProvider;
Expand All @@ -13,7 +14,9 @@
import org.eclipse.ui.PlatformUI;
import org.eclipse.ui.services.ISourceProviderService;

import software.aws.toolkits.eclipse.amazonq.broker.EventBroker;
import software.aws.toolkits.eclipse.amazonq.lsp.auth.model.AuthState;
import software.aws.toolkits.eclipse.amazonq.observers.EventObserver;
import software.aws.toolkits.eclipse.amazonq.plugin.Activator;

/**
Expand All @@ -30,12 +33,13 @@
* @see software.aws.toolkits.eclipse.amazonq.toolbar
* @see software.aws.toolkits.eclipse.amazonq.toolbar-unauthenticated
*/
public final class AuthSourceProvider extends AbstractSourceProvider implements AuthStatusChangedListener {
public final class AuthSourceProvider extends AbstractSourceProvider implements EventObserver<AuthState> {
public static final String IS_LOGGED_IN_VARIABLE_ID = "is_logged_in";
private boolean isLoggedIn = false;
private Subscription authStateSubscription;

public AuthSourceProvider() {
AuthStatusProvider.addAuthStatusChangeListener(this);
authStateSubscription = EventBroker.getInstance().subscribe(this);
isLoggedIn = Activator.getLoginService().getAuthState().isLoggedIn();
}

Expand All @@ -53,8 +57,7 @@ public void dispose() {

// Notify listeners that this provider is being disposed
fireSourceChanged(ISources.WORKBENCH, IS_LOGGED_IN_VARIABLE_ID, null);

AuthStatusProvider.removeAuthStatusChangeListener(this);
authStateSubscription.cancel();
}

@Override
Expand All @@ -69,15 +72,15 @@ public void setIsLoggedIn(final Boolean isLoggedIn) {

public static AuthSourceProvider getProvider() {
IWorkbench workbench = PlatformUI.getWorkbench();
ISourceProviderService sourceProviderService = (ISourceProviderService) workbench
ISourceProviderService sourceProviderService = workbench
.getService(ISourceProviderService.class);
AuthSourceProvider provider = (AuthSourceProvider) sourceProviderService
.getSourceProvider(AuthSourceProvider.IS_LOGGED_IN_VARIABLE_ID);
return provider;
}

@Override
public void onAuthStatusChanged(final AuthState authState) {
public void onEvent(final AuthState authState) {
boolean isLoggedIn = authState.isLoggedIn();
Display.getDefault().asyncExec(() -> {
setIsLoggedIn(isLoggedIn);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

package software.aws.toolkits.eclipse.amazonq.lsp.auth;

import software.aws.toolkits.eclipse.amazonq.broker.EventBroker;
import software.aws.toolkits.eclipse.amazonq.configuration.PluginStore;
import software.aws.toolkits.eclipse.amazonq.lsp.auth.model.AuthState;
import software.aws.toolkits.eclipse.amazonq.lsp.auth.model.AuthStateType;
Expand Down Expand Up @@ -119,7 +120,7 @@ private void updateState(final AuthStateType authStatusType, final LoginType log
* This notification is critical for ensuring all plugin components reflect the current
* authentication state.
*/
AuthStatusProvider.notifyAuthStatusChanged(getAuthState());
EventBroker.getInstance().post(getAuthState());
}

private void syncAuthStateWithPluginStore() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public void completed(final ProgressEvent event) {
});

// Check if user is authenticated and build view accordingly
onAuthStatusChanged(authState);
onEvent(authState);
}

private Browser getAndUpdateStateManager() {
Expand All @@ -127,7 +127,7 @@ private Browser getAndUpdateStateManager() {
}

@Override
public final void onAuthStatusChanged(final AuthState authState) {
public final void onEvent(final AuthState authState) {
Display.getDefault().asyncExec(() -> {
amazonQCommonActions.updateActionVisibility(authState, getViewSite());
if (authState.isExpired()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,34 @@

package software.aws.toolkits.eclipse.amazonq.views;

import java.util.concurrent.Flow.Subscription;

import org.eclipse.swt.SWT;
import org.eclipse.swt.browser.Browser;
import org.eclipse.swt.graphics.Color;
import org.eclipse.swt.widgets.Composite;
import org.eclipse.swt.widgets.Display;
import org.eclipse.ui.part.ViewPart;

import software.aws.toolkits.eclipse.amazonq.broker.EventBroker;
import software.aws.toolkits.eclipse.amazonq.controllers.AmazonQViewController;
import software.aws.toolkits.eclipse.amazonq.events.TestEvent;
import software.aws.toolkits.eclipse.amazonq.lsp.auth.AuthStatusChangedListener;
import software.aws.toolkits.eclipse.amazonq.lsp.auth.AuthStatusProvider;
import software.aws.toolkits.eclipse.amazonq.lsp.auth.model.AuthState;
import software.aws.toolkits.eclipse.amazonq.observers.StreamObserver;
import software.aws.toolkits.eclipse.amazonq.observers.EventObserver;
import software.aws.toolkits.eclipse.amazonq.plugin.Activator;
import software.aws.toolkits.eclipse.amazonq.publishers.TestPublisher;
import software.aws.toolkits.eclipse.amazonq.util.ThemeDetector;
import software.aws.toolkits.eclipse.amazonq.views.actions.AmazonQCommonActions;

public abstract class AmazonQView extends ViewPart implements AuthStatusChangedListener, StreamObserver<TestEvent> {
public abstract class AmazonQView extends ViewPart implements EventObserver<AuthState> {

private AmazonQViewController viewController;
private AmazonQCommonActions amazonQCommonActions;
private static final ThemeDetector THEME_DETECTOR = new ThemeDetector();
private Subscription authStateSubscription;

protected AmazonQView() {
this.viewController = new AmazonQViewController();
new TestPublisher();
// EventBroker.getInstance().subscribe(this);
}

public final Browser getBrowser() {
Expand Down Expand Up @@ -86,10 +86,10 @@ private void setupActions(final AuthState authState) {
}

private void setupAuthStatusListeners() {
AuthStatusProvider.addAuthStatusChangeListener(this);
AuthStatusProvider.addAuthStatusChangeListener(amazonQCommonActions.getSignoutAction());
AuthStatusProvider.addAuthStatusChangeListener(amazonQCommonActions.getFeedbackDialogContributionAction());
AuthStatusProvider.addAuthStatusChangeListener(amazonQCommonActions.getCustomizationDialogContributionAction());
authStateSubscription = EventBroker.getInstance().subscribe(this);
EventBroker.getInstance().subscribe(amazonQCommonActions.getSignoutAction());
EventBroker.getInstance().subscribe(amazonQCommonActions.getFeedbackDialogContributionAction());
EventBroker.getInstance().subscribe(amazonQCommonActions.getCustomizationDialogContributionAction());
}

@Override
Expand Down Expand Up @@ -128,23 +128,8 @@ function waitForFunction(functionName, timeout = 30000) {
*/
@Override
public void dispose() {
AuthStatusProvider.removeAuthStatusChangeListener(this);
authStateSubscription.cancel();
super.dispose();
}

@Override
public final void onEvent(final TestEvent event) {
Activator.getLogger().info(event.getMessage());
}

@Override
public final void onError(final Throwable error) {
error.printStackTrace();
}

@Override
public final void onComplete() {
System.out.println("Complete");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package software.aws.toolkits.eclipse.amazonq.views;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow.Subscription;

import org.eclipse.swt.SWT;
import org.eclipse.swt.events.SelectionAdapter;
Expand All @@ -18,30 +19,31 @@
import org.eclipse.swt.widgets.Display;
import org.eclipse.swt.widgets.Link;

import software.aws.toolkits.eclipse.amazonq.lsp.auth.AuthStatusChangedListener;
import software.aws.toolkits.eclipse.amazonq.lsp.auth.AuthStatusProvider;
import software.aws.toolkits.eclipse.amazonq.broker.EventBroker;
import software.aws.toolkits.eclipse.amazonq.lsp.auth.model.AuthState;
import software.aws.toolkits.eclipse.amazonq.observers.EventObserver;
import software.aws.toolkits.eclipse.amazonq.plugin.Activator;
import software.aws.toolkits.eclipse.amazonq.telemetry.UiTelemetryProvider;
import software.aws.toolkits.eclipse.amazonq.util.Constants;
import software.aws.toolkits.eclipse.amazonq.util.PluginUtils;
import software.aws.toolkits.eclipse.amazonq.util.ThreadingUtils;
import software.aws.toolkits.eclipse.amazonq.views.actions.SignoutAction;
import software.aws.toolkits.eclipse.amazonq.telemetry.UiTelemetryProvider;


public final class ReauthenticateView extends CallToActionView implements AuthStatusChangedListener {
public final class ReauthenticateView extends CallToActionView implements EventObserver<AuthState> {
public static final String ID = "software.aws.toolkits.eclipse.amazonq.views.ReauthenticateView";

private static final String ICON_PATH = "icons/AmazonQ64.png";
private static final String HEADER_LABEL = "Connection to Amazon Q Expired";
private static final String DETAIL_MESSAGE = "Please re-authenticate to continue";
private static final String BUTTON_LABEL = "Re-authenticate";
private static final String LINK_LABEL = "Sign out";
private Subscription authStateSubscription;

public ReauthenticateView() {
// It is necessary for this view to be an `AuthStatusChangedListener` to switch the view back to Q Chat after the authentication
// flow is successful. Without this listener, the re-authentication will succeed but the view will remain present.
AuthStatusProvider.addAuthStatusChangeListener(this);
authStateSubscription = EventBroker.getInstance().subscribe(this);
}

@Override
Expand Down Expand Up @@ -98,7 +100,7 @@ public void widgetSelected(final SelectionEvent e) {
}

@Override
public void onAuthStatusChanged(final AuthState authState) {
public void onEvent(final AuthState authState) {
Display.getDefault().asyncExec(() -> {
if (authState.isLoggedIn()) {
ViewVisibilityManager.showChatView("update");
Expand Down Expand Up @@ -138,7 +140,7 @@ protected void updateButtonStyle(final Button button) {

@Override
public void dispose() {
AuthStatusProvider.removeAuthStatusChangeListener(this);
authStateSubscription.cancel();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,11 @@ public Object function(final Object[] arguments) {
amazonQCommonActions = getAmazonQCommonActions();

// Check if user is authenticated and build view accordingly
onAuthStatusChanged(authState);
onEvent(authState);
}

@Override
public void onAuthStatusChanged(final AuthState authState) {
public void onEvent(final AuthState authState) {
var browser = getBrowser();
Display.getDefault().asyncExec(() -> {
amazonQCommonActions.updateActionVisibility(authState, getViewSite());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package software.aws.toolkits.eclipse.amazonq.views.actions;

import java.util.Objects;

import org.eclipse.jface.action.ContributionItem;
import org.eclipse.swt.SWT;
import org.eclipse.swt.events.SelectionAdapter;
Expand All @@ -13,18 +14,19 @@
import org.eclipse.swt.widgets.MenuItem;
import org.eclipse.swt.widgets.Shell;
import org.eclipse.ui.IViewSite;

import jakarta.inject.Inject;
import software.aws.toolkits.eclipse.amazonq.lsp.auth.AuthStatusChangedListener;
import software.aws.toolkits.eclipse.amazonq.lsp.auth.model.AuthState;
import software.aws.toolkits.eclipse.amazonq.lsp.auth.model.LoginType;
import software.aws.toolkits.eclipse.amazonq.observers.EventObserver;
import software.aws.toolkits.eclipse.amazonq.plugin.Activator;
import software.aws.toolkits.eclipse.amazonq.telemetry.UiTelemetryProvider;
import software.aws.toolkits.eclipse.amazonq.util.Constants;
import software.aws.toolkits.eclipse.amazonq.views.CustomizationDialog;
import software.aws.toolkits.eclipse.amazonq.views.model.Customization;
import software.aws.toolkits.eclipse.amazonq.views.CustomizationDialog.ResponseSelection;
import software.aws.toolkits.eclipse.amazonq.plugin.Activator;
import software.aws.toolkits.eclipse.amazonq.telemetry.UiTelemetryProvider;
import software.aws.toolkits.eclipse.amazonq.views.model.Customization;

public final class CustomizationDialogContributionItem extends ContributionItem implements AuthStatusChangedListener {
public final class CustomizationDialogContributionItem extends ContributionItem implements EventObserver<AuthState> {
private static final String CUSTOMIZATION_MENU_ITEM_TEXT = "Select Customization";

@Inject
Expand All @@ -45,7 +47,7 @@ public void updateVisibility(final AuthState authState) {
}

@Override
public void onAuthStatusChanged(final AuthState authState) {
public void onEvent(final AuthState authState) {
updateVisibility(authState);
}

Expand Down
Loading

0 comments on commit 1c682f3

Please sign in to comment.