@@ -79,48 +79,63 @@ static final class QueuedLARequest {
7979 }
8080
8181 private void processQueue () {
82- while (running || !requestQueue .isEmpty ()) {
83- SlotPermit slotPermit = null ;
84- QueuedLARequest request = null ;
85- try {
86- request = requestQueue .take ();
87-
88- SlotSupplierFuture future = slotSupplier .reserveSlot (request .data );
82+ try {
83+ while (running || !requestQueue .isEmpty ()) {
84+ SlotPermit slotPermit = null ;
85+ QueuedLARequest request = null ;
8986 try {
90- slotPermit = future .get ();
91- } catch (InterruptedException e ) {
92- SlotPermit maybePermitAnyway = future .abortReservation ();
93- if (maybePermitAnyway != null ) {
94- slotSupplier .releaseSlot (SlotReleaseReason .neverUsed (), maybePermitAnyway );
87+ // Use poll with timeout instead of take() to avoid being blocked indefinitely
88+ request = requestQueue .poll (100 , TimeUnit .MILLISECONDS );
89+ if (request == null ) {
90+ continue ;
9591 }
92+
93+ SlotSupplierFuture future = slotSupplier .reserveSlot (request .data );
94+ try {
95+ slotPermit = future .get ();
96+ } catch (InterruptedException e ) {
97+ SlotPermit maybePermitAnyway = future .abortReservation ();
98+ if (maybePermitAnyway != null ) {
99+ slotSupplier .releaseSlot (SlotReleaseReason .neverUsed (), maybePermitAnyway );
100+ }
101+ Thread .currentThread ().interrupt ();
102+ return ;
103+ } catch (ExecutionException e ) {
104+ log .error (
105+ "Error reserving local activity slot, dropped activity id {}" ,
106+ request .task .getActivityId (),
107+ e );
108+ continue ;
109+ }
110+
111+ request .task .getExecutionContext ().setPermit (slotPermit );
112+ afterReservedCallback .apply (request .task );
113+ } catch (InterruptedException e ) {
96114 Thread .currentThread ().interrupt ();
97115 return ;
98- } catch (ExecutionException e ) {
99- log .error (
100- "Error reserving local activity slot, dropped activity id {}" ,
101- request .task .getActivityId (),
102- e );
103- continue ;
104- }
105-
106- request .task .getExecutionContext ().setPermit (slotPermit );
107- afterReservedCallback .apply (request .task );
108- } catch (InterruptedException e ) {
109- Thread .currentThread ().interrupt ();
110- } catch (Throwable e ) {
111- // Fail the workflow task if something went wrong executing the local activity (at the
112- // executor level, otherwise, the LA handler itself should be handling errors)
113- log .error ("Unexpected error submitting local activity task to worker" , e );
114- if (slotPermit != null ) {
115- slotSupplier .releaseSlot (SlotReleaseReason .error (new RuntimeException (e )), slotPermit );
116- }
117- if (request != null ) {
118- LocalActivityExecutionContext executionContext = request .task .getExecutionContext ();
119- executionContext .callback (
120- LocalActivityResult .processingFailed (
121- executionContext .getActivityId (), request .task .getAttemptTask ().getAttempt (), e ));
116+ } catch (Throwable e ) {
117+ // Fail the workflow task if something went wrong executing the local activity (at the
118+ // executor level, otherwise, the LA handler itself should be handling errors)
119+ log .error ("Unexpected error submitting local activity task to worker" , e );
120+ if (slotPermit != null ) {
121+ slotSupplier .releaseSlot (SlotReleaseReason .error (new RuntimeException (e )), slotPermit );
122+ }
123+ if (request != null ) {
124+ LocalActivityExecutionContext executionContext = request .task .getExecutionContext ();
125+ executionContext .callback (
126+ LocalActivityResult .processingFailed (
127+ executionContext .getActivityId (),
128+ request .task .getAttemptTask ().getAttempt (),
129+ e ));
130+ }
131+ if (e .getCause () instanceof InterruptedException ) {
132+ Thread .currentThread ().interrupt ();
133+ return ;
134+ }
122135 }
123136 }
137+ } finally {
138+ log .debug ("LocalActivitySlotSupplierQueue thread exiting" );
124139 }
125140 }
126141
@@ -170,11 +185,10 @@ public boolean isTerminated() {
170185 @ Override
171186 public CompletableFuture <Void > shutdown (ShutdownManager shutdownManager , boolean interruptTasks ) {
172187 running = false ;
173- if (requestQueue .isEmpty ()) {
174- // Just interrupt the thread, so that if we're waiting on blocking take the thread will
175- // be interrupted and exit. Otherwise the loop will exit once the queue is empty.
176- queueThreadService .shutdownNow ();
177- }
188+
189+ queueThreadService .shutdownNow ();
190+
191+ log .debug ("LocalActivitySlotSupplierQueue shutdown initiated" );
178192
179193 return interruptTasks
180194 ? shutdownManager .shutdownExecutorNowUntimed (
@@ -190,6 +204,15 @@ public void awaitTermination(long timeout, TimeUnit unit) {
190204 // timeout duration if no task was ever submitted.
191205 return ;
192206 }
193- ShutdownManager .awaitTermination (queueThreadService , unit .toMillis (timeout ));
207+
208+ try {
209+ boolean terminated = queueThreadService .awaitTermination (timeout , unit );
210+ if (!terminated ) {
211+ log .warn ("LocalActivitySlotSupplierQueue did not terminate within the timeout period" );
212+ }
213+ } catch (InterruptedException e ) {
214+ Thread .currentThread ().interrupt ();
215+ log .warn ("Interrupted while waiting for LocalActivitySlotSupplierQueue termination" );
216+ }
194217 }
195218}
0 commit comments