diff --git a/plugin/src/software/aws/toolkits/eclipse/amazonq/broker/EventBroker.java b/plugin/src/software/aws/toolkits/eclipse/amazonq/broker/EventBroker.java index be85f617..8ae02b32 100644 --- a/plugin/src/software/aws/toolkits/eclipse/amazonq/broker/EventBroker.java +++ b/plugin/src/software/aws/toolkits/eclipse/amazonq/broker/EventBroker.java @@ -26,6 +26,35 @@ import software.aws.toolkits.eclipse.amazonq.observers.EventObserver; import software.aws.toolkits.eclipse.amazonq.observers.StreamObserver; + +/** + * A thread-safe event broker that implements the publish-subscribe pattern for asynchronous event handling. + * This singleton class manages event publication and subscription using a concurrent execution model. + * + * The broker provides: + * - Thread-safe event publishing and subscription + * - Ordered event processing with batching support + * - Configurable thread pools for publishers and subscribers + * - De-duplication of consecutive identical events + * + * Thread Safety: This implementation is thread-safe and can handle concurrent publications + * and subscriptions from multiple threads. + * + * Example usage: + * EventBroker broker = EventBroker.getInstance(); + * + * // Subscribe to events + * EventObserver observer = new EventObserver<>() { + * @Override + * public void onEvent(final MyEvent event) { + * // handle event + * } + * }; + * Subscription subscription = broker.subscribe(observer); + * + * // Publish events + * broker.post(new MyEvent("data")); + */ public final class EventBroker { @FunctionalInterface @@ -33,23 +62,48 @@ private interface TypedCallable { void callWith(T event); } - public static final class CallerRunsPolicyBlocking implements RejectedExecutionHandler { + /* + * A rejection handler that defers task queueing by creating a new thread when the executor's queue is full. + * Instead of dropping tasks or blocking the submitting thread, this policy creates a dedicated + * "JobSubmissionPopUpThread" to handle the queueing of rejected tasks. + * + * When a task is rejected (due to queue capacity being reached): + * 1. If the executor is running: + * - Creates a new thread to handle the rejected task + * - The new thread attempts to put the task into the work queue + * - If the put operation is interrupted, preserves the interrupt state and + * throws RejectedExecutionException with the original exception as cause + * 2. If the executor is shutdown: + * - Immediately throws RejectedExecutionException + * + * This approach ensures: + * - Tasks are not lost during high load + * - The submitting thread is not blocked + * - Task ordering is maintained through the queue + * + * Note: Each rejection creates a new thread, which should be considered when + * using this handler in scenarios with frequent queue rejections. + */ + public static final class DeferredPopUpThreadQueueingPolicy implements RejectedExecutionHandler { private final BlockingQueue workQueue; - CallerRunsPolicyBlocking(final BlockingQueue workQueue) { + DeferredPopUpThreadQueueingPolicy(final BlockingQueue workQueue) { this.workQueue = workQueue; } @Override public void rejectedExecution(final Runnable runnable, final ThreadPoolExecutor executor) { if (!executor.isShutdown()) { - try { - workQueue.put(runnable); - } catch (InterruptedException exception) { - Thread.currentThread().interrupt(); - throw new RejectedExecutionException("Task " + runnable + " rejected from " + executor, exception); - } + // Create new thread for the blocking put operation + new Thread(() -> { + try { + workQueue.put(runnable); + } catch (InterruptedException exception) { + Thread.currentThread().interrupt(); + throw new RejectedExecutionException("Task " + runnable + " rejected from " + executor, exception); + } + }, "JobSubmissionPopUpThread").start(); } else { throw new RejectedExecutionException("Task " + runnable + " rejected from " + executor); } @@ -57,22 +111,57 @@ public void rejectedExecution(final Runnable runnable, final ThreadPoolExecutor } + /* + * A specialized ThreadPoolExecutor that guarantees ordered execution of tasks while providing + * configurable concurrency control. This executor ensures that tasks are processed in the + * order they were submitted, even when using multiple worker threads. + * + * Key features: + * - Maintains FIFO (First-In-First-Out) task execution order + * - Supports custom rejection policies for queue overflow scenarios + * - Provides configurable core and maximum thread pool sizes + * - Uses a blocking queue for task management + * - Handles task rejection through customizable policies + * + * Thread Management: + * - Core threads are retained even when idle + * - Additional threads are created up to maxThreads when needed + * - Excess threads terminate after being idle for keepAliveTime + * + * Job Handling: + * - Jobs are submitted to a bounded BlockingQueue + * - When queue is full, jobs are handled by the configured RejectedExecutionHandler + * - Default rejection policy creates pop-up threads to defer job queuing. + * + * Example usage: + * OrderedThreadPoolExecutor executor = new OrderedThreadPoolExecutor( + * coreThreads, // minimum number of threads + * maxThreads, // maximum number of threads + * jobQueueCapacity, // queue size to buffer jobs when all threads are busy + * queueCapacity, // maximum queue size + * keepAliveTime, // time to keep excess threads alive + * timeUnit // unit for keepAliveTime + * ); + * + * Thread Safety: This class is thread-safe and can handle concurrent task submissions + * from multiple threads while maintaining execution order. + */ public static final class OrderedThreadPoolExecutor { - private final Map> bufferedEventsForInterest; - private final Map jobStatusForInterest; - private final Map> callbackForInterest; - private final Map lastEventForInterest; + private final Map> bufferedEventsForInterest; // events that need to be processed for a particular interest + private final Map jobStatusForInterest; // flag specifying whether a job handling the queued events is running for specified interest + private final Map> callbackForInterest; // callback to handle queued events for specified interest + private final Map lastEventForInterest; // last event handled for specified interest private final BlockingQueue scheduledJobs; private final ThreadPoolExecutor executor; - private final int eventQueueCapacity; + private final int eventQueueCapacity; // size of the event buffer public static final int EVENT_BATCH_SIZE = 250; OrderedThreadPoolExecutor(final int coreThreadCount, final int maxThreadCount, final int jobQueueCapacity, final int eventQueueCapacity, final int keepAliveTime, final TimeUnit keepAliveTimeUnit) { - scheduledJobs = new ArrayBlockingQueue<>(jobQueueCapacity); + scheduledJobs = new ArrayBlockingQueue<>(jobQueueCapacity, true); bufferedEventsForInterest = new ConcurrentHashMap<>(); jobStatusForInterest = new ConcurrentHashMap<>(); callbackForInterest = new ConcurrentHashMap<>(); @@ -81,7 +170,7 @@ public static final class OrderedThreadPoolExecutor { this.eventQueueCapacity = eventQueueCapacity; executor = new ThreadPoolExecutor(coreThreadCount, maxThreadCount, keepAliveTime, keepAliveTimeUnit, - scheduledJobs, Executors.defaultThreadFactory(), new CallerRunsPolicyBlocking(scheduledJobs)); + scheduledJobs, Executors.defaultThreadFactory(), new DeferredPopUpThreadQueueingPolicy(scheduledJobs)); } public void registerCallbackForInterest(final String interestId, final TypedCallable callback) { @@ -92,6 +181,25 @@ public boolean isCallbackRegisteredForInterest(final String interestId) { return callbackForInterest.containsKey(interestId); } + /** + * Submits an event for processing based on a specific interest identifier (analogous to a topic). + * This method handles event buffering and scheduling in a thread-safe manner. + * + * @param The type of event being submitted + * @param interestId The identifier for the interest category/topic this event belongs to + * @param event The event object to be processed + * + * The method performs the following operations: + * 1. Creates or retrieves a blocking queue specific to the interestId + * 2. Buffers the event in the queue, blocking if the queue is full + * 3. Triggers job scheduling for the buffered events + * + * Note: This method uses a fair queuing policy to maintain FIFO ordering of events + * within each interest category. + * + * @throws RuntimeException if the thread is interrupted while putting the event + * into the queue (wraps InterruptedException) + */ @SuppressWarnings("unchecked") public void submitEventForInterest(final String interestId, final T event) { BlockingQueue bufferedEvents = (BlockingQueue) bufferedEventsForInterest.computeIfAbsent(interestId, @@ -105,6 +213,29 @@ public void submitEventForInterest(final String interestId, final T event) { handleJobScheduling(interestId, (Class) event.getClass(), bufferedEvents); } + /** + * Manages the scheduling of event processing jobs for a specific interest. + * This method ensures that only one job is actively processing events for each interest, + * preventing duplicate processing while maintaining event ordering. + * + * @param The type of events being processed + * @param interestId The identifier for the interest category being processed + * @param eventType The class type of the events in the queue + * @param bufferedEvents The queue containing events waiting to be processed + * + * Operation: + * 1. Maintains a job status flag (AtomicBoolean) for each interest + * 2. Uses atomic compare-and-set to ensure only one job is scheduled at a time + * 3. If no job is running (status is false), submits a new job to the executor + * + * Thread Safety: + * - Uses AtomicBoolean for thread-safe job status tracking + * - Employs CAS (Compare-And-Set) operations to prevent race conditions + * - Safe for concurrent access from multiple submitting threads + * + * Note: This method is non-blocking. If a job is already running for the given + * interest, subsequent calls will return immediately without scheduling a new job. + */ private void handleJobScheduling(final String interestId, final Class eventType, final BlockingQueue bufferedEvents) { AtomicBoolean jobStatus = jobStatusForInterest.computeIfAbsent(interestId, k -> new AtomicBoolean(false)); @@ -114,6 +245,40 @@ private void handleJobScheduling(final String interestId, final Class eve } } + /** + * Processes queued events for a specific interest in batches, ensuring duplicate events + * are not processed and maintaining the last processed event state. + * + * @param The type of events being processed + * @param interestId The identifier for the interest category being processed + * @param eventType The class type of the events in the queue + * @param bufferedEvents The queue containing events to be processed + * @param jobStatus Flag indicating the processing status for this interest + * + * Processing Logic: + * 1. Retrieves the callback registered for this interest + * 2. Processes events in batches of size EVENT_BATCH_SIZE + * 3. Skips duplicate events by comparing with the last processed event + * 4. Updates the last processed event after successful processing + * + * Error Handling: + * - Individual event processing failures are caught and logged + * - Processing continues with the next event if an error occurs + * - Job status is always reset in the finally block + * + * Thread Safety: + * - Safe for concurrent access through synchronized collections + * - Maintains atomic job status updates + * - Preserves event ordering within batches + * + * Note: This method suppresses unchecked warnings due to type casting + * requirements in the event handling system. + * + * Performance Considerations: + * - Uses batch processing to improve throughput + * - Implements duplicate event filtering + * - Clears batch buffer after processing to manage memory + */ @SuppressWarnings("unchecked") private void processQueuedEvents(final String interestId, final Class eventType, final BlockingQueue bufferedEvents, final AtomicBoolean jobStatus) {