Skip to content

Commit

Permalink
Fix bug and improve performance
Browse files Browse the repository at this point in the history
  • Loading branch information
taldekar committed Jan 22, 2025
1 parent 1c682f3 commit dc8b198
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

package software.aws.toolkits.eclipse.amazonq.broker;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
Expand All @@ -19,7 +21,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;

import software.aws.toolkits.eclipse.amazonq.observers.EventObserver;
import software.aws.toolkits.eclipse.amazonq.observers.StreamObserver;
Expand Down Expand Up @@ -60,19 +61,19 @@ public static final class OrderedThreadPoolExecutor {
private final Map<String, BlockingQueue<?>> interestIdToEventQueueMap;
private final Map<String, AtomicBoolean> interestIdToJobStatusMap;
private final Map<String, TypedCallable<?>> interestIdToCallbackMap;
private final Map<String, ReentrantLock> interestIdToJobLockMap;

private final BlockingQueue<Runnable> scheduledJobsQueue;
private final ThreadPoolExecutor executor;
private final int eventQueueCapacity;

public static final int EVENT_BATCH_SIZE = 250;

OrderedThreadPoolExecutor(final int coreThreadCount, final int maxThreadCount, final int jobQueueCapacity,
final int eventQueueCapacity, final int keepAliveTime, final TimeUnit keepAliveTimeUnit) {
scheduledJobsQueue = new ArrayBlockingQueue<>(jobQueueCapacity);
interestIdToEventQueueMap = new ConcurrentHashMap<>();
interestIdToJobStatusMap = new ConcurrentHashMap<>();
interestIdToCallbackMap = new ConcurrentHashMap<>();
interestIdToJobLockMap = new ConcurrentHashMap<>();

this.eventQueueCapacity = eventQueueCapacity;

Expand All @@ -92,14 +93,10 @@ public <T> boolean isCallbackRegisteredForInterest(final String interestId) {
public <T, R> void submitEventForInterest(final String interestId, final R event) {
BlockingQueue<R> eventQueue = (BlockingQueue<R>) interestIdToEventQueueMap.computeIfAbsent(interestId,
k -> new ArrayBlockingQueue<>(eventQueueCapacity, true));
ReentrantLock jobLock = interestIdToJobLockMap.computeIfAbsent(interestId, k -> new ReentrantLock(true));
jobLock.lock();
try {
eventQueue.put(event);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
jobLock.unlock();
}

handleJobScheduling(interestId, (Class<R>) event.getClass(), eventQueue);
Expand All @@ -109,7 +106,7 @@ private <T, R> void handleJobScheduling(final String interestId, final Class<R>
final BlockingQueue<R> eventQueue) {
AtomicBoolean jobStatus = interestIdToJobStatusMap.computeIfAbsent(interestId, k -> new AtomicBoolean(false));

if (!jobStatus.get() && !eventQueue.isEmpty() && jobStatus.compareAndSet(false, true)) {
if (jobStatus.compareAndSet(false, true)) {
executor.submit(() -> processQueuedEvents(interestId, eventType, eventQueue, jobStatus));
}
}
Expand All @@ -123,17 +120,18 @@ private <T, R> void processQueuedEvents(final String interestId, final Class<R>
return;
}

while (!eventQueue.isEmpty()) {
try {
R newEvent = eventQueue.poll();
if (newEvent != null) {
List<R> eventBatchQueue = new ArrayList<>(EVENT_BATCH_SIZE);

while (eventQueue.drainTo(eventBatchQueue) > 0) {
for (R newEvent : eventBatchQueue) {
try {
eventCallback.callWith(newEvent);
} else {
break;
} catch (Exception e) {
e.printStackTrace();
}
} catch (Exception e) {
e.printStackTrace();
}

eventBatchQueue.clear();
}
} finally {
jobStatus.set(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import software.aws.toolkits.eclipse.amazonq.lsp.auth.LoginService;
import software.aws.toolkits.eclipse.amazonq.providers.LspProvider;
import software.aws.toolkits.eclipse.amazonq.providers.LspProviderImpl;
import software.aws.toolkits.eclipse.amazonq.publishers.TestPublisher;
import software.aws.toolkits.eclipse.amazonq.subscriber.TestSubscribers;
import software.aws.toolkits.eclipse.amazonq.telemetry.service.DefaultTelemetryService;
import software.aws.toolkits.eclipse.amazonq.telemetry.service.TelemetryService;
Expand Down Expand Up @@ -57,6 +58,8 @@ public Activator() {
testSubscriberList.add(testSubsciber);
EventBroker.getInstance().subscribe(testSubsciber);
}

new TestPublisher();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public TestSubscribers() {

@Override
public void onEvent(final TestEvent event) {
Activator.getLogger().info(event.getMessage());
// Activator.getLogger().info(event.getMessage());

if (event.getSequenceNumber() - previousSequenceNumber != 1) {
Activator.getLogger().info("OUT OF ORDER: " + event.getSequenceNumber() + " " + previousSequenceNumber);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import software.aws.toolkits.eclipse.amazonq.lsp.auth.model.AuthState;
import software.aws.toolkits.eclipse.amazonq.observers.EventObserver;
import software.aws.toolkits.eclipse.amazonq.plugin.Activator;
import software.aws.toolkits.eclipse.amazonq.publishers.TestPublisher;
import software.aws.toolkits.eclipse.amazonq.util.ThemeDetector;
import software.aws.toolkits.eclipse.amazonq.views.actions.AmazonQCommonActions;

Expand All @@ -29,7 +28,6 @@ public abstract class AmazonQView extends ViewPart implements EventObserver<Auth

protected AmazonQView() {
this.viewController = new AmazonQViewController();
new TestPublisher();
}

public final Browser getBrowser() {
Expand Down

0 comments on commit dc8b198

Please sign in to comment.