Skip to content

Commit

Permalink
Introduce ViewRouter Component (#337)
Browse files Browse the repository at this point in the history
* improved handling of lsp failure state

* add static mock to lsp connection test case

* Integrate Event Broker in LspStatusManager

* Add ViewRouter POC

* Fix code formatting bug

* Add method to retrieve observables

* Add listeners for combined state streams and view update request

* Add listeners for combined state streams and view update request

* Add listeners for combined state streams and view update request

* Add functionality for hot event streams that track latest event (#340)

* Add functionality for hot subscribers that track latest event

* Add test to ensure event stream isolation

* Fix missing subscription disposal in test

* Use autoConnect to implicitly manage stream connection

* Remove active view update request listener

* Add ViewRouter tests

* Remove public constructor

* Remove public constructor and unused event in tests

* Add comments

* Add documentation for EventBroker

* Remove ViewRouter initialization

* Remove LspInitializingView from ID enum

* Add documentation for ViewRouter

* Refactor and enhance EventBroker tests

* Refactor ViewRouter tests for clarity

* Remove PluginState class into separate file

* Add documentation to subscription management logic

* Add support for notifying  multiple late-subscribers over time of latest state (#342)

* Remove CODE_REFERENCE_VIEW

* Rename newActiveViewId to newActiveView

* Rename ViewId class to AmazonQViewType

* Revert "Integrate Event Broker in LspStatusManager"

This reverts commit 374e549.

* Revert "improved handling of lsp failure state"

This reverts commit 13c55ac.

* Refactor EventBroker and enhance tests (#343)

---------

Co-authored-by: Nicolas Borges <[email protected]>
  • Loading branch information
taldekar and Nicolas Borges authored Feb 4, 2025
1 parent 0767c4d commit f975e6f
Show file tree
Hide file tree
Showing 8 changed files with 473 additions and 49 deletions.
2 changes: 1 addition & 1 deletion plugin/META-INF/MANIFEST.MF
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ Require-Bundle: org.eclipse.core.runtime;bundle-version="3.31.0",
org.apache.commons.logging;bundle-version="1.2.0",
slf4j.api;bundle-version="2.0.13",
org.apache.commons.lang3;bundle-version="3.14.0"
Bundle-Classpath: target/classes/,
Bundle-Classpath: .,
target/dependency/annotations-2.28.26.jar,
target/dependency/apache-client-2.28.26.jar,
target/dependency/auth-2.28.26.jar,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,35 +3,112 @@

package software.aws.toolkits.eclipse.amazonq.broker;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.disposables.CompositeDisposable;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.subjects.PublishSubject;
import io.reactivex.rxjava3.subjects.BehaviorSubject;
import io.reactivex.rxjava3.subjects.Subject;
import software.aws.toolkits.eclipse.amazonq.broker.api.EventObserver;

/**
* A thread-safe event broker that implements the publish-subscribe pattern
* using RxJava.
*
* This broker manages event distribution using BehaviorSubjects, which cache
* the most recent event for each event type. It provides type-safe event
* publishing and subscription, with automatic resource management for
* subscriptions. Events are published and consumed on dedicated threads so
* operations are non-blocking.
*/
public final class EventBroker {

private final Subject<Object> eventBus = PublishSubject.create().toSerialized();
/** Maps event types to their corresponding subjects for event distribution. */
private final Map<Class<?>, Subject<Object>> subjectsForType;

/** Tracks all subscriptions for proper cleanup. */
private final CompositeDisposable disposableSubscriptions;

public EventBroker() {
subjectsForType = new ConcurrentHashMap<>();
disposableSubscriptions = new CompositeDisposable();
}

public <T> void post(final T event) {
/**
* Posts an event of the specified type to all subscribers and caches it for
* late-subscribers.
*
* @param <T> The type of the event
* @param eventType The class object representing the event type
* @param event The event to publish
*/
public <T> void post(final Class<T> eventType, final T event) {
if (event == null) {
return;
}
eventBus.onNext(event);
getOrCreateSubject(eventType).onNext(event);
}

/**
* Gets or creates a Subject for the specified event type. Creates a new
* serialized BehaviorSubject if none exists.
*
* @param <T> The type of events the subject will handle
* @param eventType The class object representing the event type
* @return A Subject that handles events of the specified type
*/
private <T> Subject<Object> getOrCreateSubject(final Class<T> eventType) {
return subjectsForType.computeIfAbsent(eventType, k -> {
Subject<Object> subject = BehaviorSubject.create().toSerialized();
subject.subscribeOn(Schedulers.computation());
return subject;
});
}

/**
* Subscribes an observer to events of a specific type. The observer will
* receive events on a computation thread by default. The subscription is
* automatically tracked for disposal management.
*
* @param <T> the type of events to observe
* @param eventType the Class object representing the event type
* @param observer the observer that will handle emitted events
* @return a Disposable that can be used to unsubscribe from the events
*/
public <T> Disposable subscribe(final Class<T> eventType, final EventObserver<T> observer) {
Consumer<T> consumer = new Consumer<>() {
@Override
public void accept(final T event) {
observer.onEvent(event);
}
};

return eventBus.ofType(eventType)
.observeOn(Schedulers.computation())
.subscribe(consumer);
}
Disposable subscription = ofObservable(eventType)
.observeOn(Schedulers.computation()) // subscribe on dedicated thread
.subscribe(observer::onEvent);
disposableSubscriptions.add(subscription); // track subscription for dispose call
return subscription;
}

/**
* Returns an Observable for the specified event type. This Observable can be
* used to create custom subscription chains with additional operators.
*
* @param <T> the type of events the Observable will emit
* @param eventType the Class object representing the event type
* @return an Observable that emits events of the specified type
*/
public <T> Observable<T> ofObservable(final Class<T> eventType) {
return getOrCreateSubject(eventType).ofType(eventType);
}

/**
* Disposes of all subscriptions managed by this broker by clearing the disposable subscriptions collection.
* This method should be called when the broker is no longer needed to prevent memory leaks.
* After disposal, any existing subscriptions will be terminated and new events will not be delivered
* to their observers.
*
* Note: This only disposes of the subscriptions, not the underlying Observables.
* The EventBroker can be reused after disposal by creating new subscriptions.
*/
public void dispose() {
disposableSubscriptions.clear();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ private void updateState(final AuthStateType authStatusType, final LoginType log
*/
AuthState newAuthState = getAuthState();
if (previousAuthState == null || newAuthState.authStateType() != previousAuthState.authStateType()) {
Activator.getEventBroker().post(newAuthState);
Activator.getEventBroker().post(AuthState.class, newAuthState);
}
previousAuthState = newAuthState;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

package software.aws.toolkits.eclipse.amazonq.views.router;

public enum AmazonQViewType {
TOOLKIT_LOGIN_VIEW, CHAT_VIEW, DEPENDENCY_MISSING_VIEW, RE_AUTHENTICATE_VIEW, CHAT_ASSET_MISSING_VIEW,
LSP_STARTUP_FAILED_VIEW
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
// Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

package software.aws.toolkits.eclipse.amazonq.views.router;

import software.aws.toolkits.eclipse.amazonq.lsp.auth.model.AuthState;
import software.aws.toolkits.eclipse.amazonq.lsp.manager.LspState;

public record PluginState(AuthState authState, LspState lspState) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
// Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

package software.aws.toolkits.eclipse.amazonq.views.router;

import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import software.aws.toolkits.eclipse.amazonq.broker.api.EventObserver;
import software.aws.toolkits.eclipse.amazonq.lsp.auth.model.AuthState;
import software.aws.toolkits.eclipse.amazonq.lsp.manager.LspState;
import software.aws.toolkits.eclipse.amazonq.plugin.Activator;

/**
* Routes to appropriate views based on the combined auth and lsp states (plugin
* state). This router observes plugin state changes and updates the active view
* accordingly, broadcasting view changes through the event broker.
*/
public final class ViewRouter implements EventObserver<PluginState> {

private AmazonQViewType activeView;

/**
* Constructs a ViewRouter with the specified builder configuration. Initializes
* state observation and sets up view routing logic. Primarily useful for
* testing and injecting observables. When none are passed, the router get the
* observables directly from the event broker and combines them to create the
* PluginState stream.
*
* @param builder The builder containing auth and lsp state observables
*/
private ViewRouter(final Builder builder) {
if (builder.authStateObservable == null) {
builder.authStateObservable = Activator.getEventBroker().ofObservable(AuthState.class);
}

if (builder.lspStateObservable == null) {
builder.lspStateObservable = Activator.getEventBroker().ofObservable(LspState.class);
}

/*
* Combine auth and lsp streams and publish combined state updates on changes to
* either stream consisting of the latest events from both streams (this will
* happen only after one event has been published to both streams):
*/
Observable.combineLatest(builder.authStateObservable, builder.lspStateObservable, PluginState::new)
.observeOn(Schedulers.computation()).subscribe(this::onEvent);
}

public static Builder builder() {
return new Builder();
}

/**
* Handles plugin state changes by refreshing the active view.
*
* @param pluginState The current combined state auth and lsp state of the plugin
*/
@Override
public void onEvent(final PluginState pluginState) {
refreshActiveView(pluginState);
}

/**
* Determines and sets the appropriate view based on the order of resolution.
* View selection follows a priority order:
* 1. Dependency Missing: can browsers be created.
* 2. LSP Startup Failed: has the language server initialization failed (not pending/active).
* 3. Chat UI Asset Missing: have chat assets been fetched and available?
* 4. Authentication Logged out: if user logged out, needs to login again.
* 5. Authentication Expired: if auth has expired, needs to be refreshed.
* 5. Chat View: happy path.
*
* @param pluginState The current combined auth and lsp state of the plugin
*/
private void refreshActiveView(final PluginState pluginState) {
AmazonQViewType newActiveView;

if (isDependencyMissing()) { // TODO: dependency missing check logic needs to be implemented
newActiveView = AmazonQViewType.DEPENDENCY_MISSING_VIEW;
} else if (pluginState.lspState() == LspState.FAILED) {
newActiveView = AmazonQViewType.LSP_STARTUP_FAILED_VIEW;
} else if (isChatUIAssetMissing()) { // TODO: chat missing logic needs to be implemented
newActiveView = AmazonQViewType.CHAT_ASSET_MISSING_VIEW;
} else if (pluginState.authState().isLoggedOut()) {
newActiveView = AmazonQViewType.TOOLKIT_LOGIN_VIEW;
} else if (pluginState.authState().isExpired()) {
newActiveView = AmazonQViewType.RE_AUTHENTICATE_VIEW;
} else {
newActiveView = AmazonQViewType.CHAT_VIEW;
}

updateActiveView(newActiveView);
}

/**
* Updates the active view if it has changed and notifies observers of the
* change.
*
* @param newActiveViewId The new view to be activated
*/
private void updateActiveView(final AmazonQViewType newActiveViewId) {
if (activeView != newActiveViewId) {
activeView = newActiveViewId;
notifyActiveViewChange();
}
}

/**
* Broadcasts the active view change through the event broker.
*/
private void notifyActiveViewChange() {
Activator.getEventBroker().post(AmazonQViewType.class, activeView);
}

/**
* Checks if browsers available are compatible or is dependency missing.
* TODO: Implement actual dependency checking logic
*
* @return true if dependencies are missing, false otherwise
*/
private boolean isDependencyMissing() {
return false;
}

/**
* Checks if required chat UI assets are missing.
* TODO: Implement actual asset checking logic
*
* @return true if chat UI assets are missing, false otherwise
*/
private boolean isChatUIAssetMissing() {
return false;
}

public static final class Builder {

private Observable<AuthState> authStateObservable;
private Observable<LspState> lspStateObservable;

public Builder withAuthStateObservable(final Observable<AuthState> authStateObservable) {
this.authStateObservable = authStateObservable;
return this;
}

public Builder withLspStateObservable(final Observable<LspState> lspStateObservable) {
this.lspStateObservable = lspStateObservable;
return this;
}

public ViewRouter build() {
return new ViewRouter(this);
}

}

}
Loading

0 comments on commit f975e6f

Please sign in to comment.