RepeatedTaskQueue: add interrupt_on_shutdown to abort in-flight tasks #3803
Open
sokolikp wants to merge 2 commits into
RepeatedTaskQueue.awaitTerminated() only joins the dispatch loop thread,
not the underlying taskExecutor's worker carrying the task lambda. For
tasks that perform long-blocking I/O (e.g. an SQS receiveMessage
long-poll) this means awaitTerminated returns essentially immediately
on stopAsync(), while the task lambda is still parked inside its I/O
call. Callers that want to release downstream resources (HTTP client
pools, etc.) right after awaitTerminated then race with the
still-in-flight call, often producing 'connection pool shut down' style
errors.
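The race described above can be reproduced with plain JDK primitives. This is a minimal sketch, not the RepeatedTaskQueue code: the function name, the latches, and the use of a bare Thread as the "dispatch loop" are all illustrative assumptions. It shows that joining the thread that hands work to an executor says nothing about whether the worker has finished.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// Hypothetical demo: returns true if the worker's task was still running
// after the "dispatch" thread had already been joined.
fun workerOutlivesDispatchThread(): Boolean {
  val workers = Executors.newSingleThreadExecutor()
  val taskStarted = CountDownLatch(1)
  val release = CountDownLatch(1)
  val taskFinished = CountDownLatch(1)

  // Stand-in for the dispatch loop: hands one task to the executor and exits.
  val dispatch = Thread {
    workers.execute {
      taskStarted.countDown()
      release.await() // stand-in for a long-blocking I/O call
      taskFinished.countDown()
    }
  }
  dispatch.start()
  dispatch.join() // analogous to awaitTerminated(): only the dispatch thread is joined

  taskStarted.await()
  val stillRunning = taskFinished.count == 1L // the worker is parked in its "I/O"

  // Clean up so the demo does not leak a thread.
  release.countDown()
  workers.shutdown()
  workers.awaitTermination(5, TimeUnit.SECONDS)
  return stillRunning
}
```

Calling workerOutlivesDispatchThread() returns true: the join completes while the task is still blocked, which is the window in which a caller would close a connection pool underneath it.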
This change adds an opt-in RepeatedTaskQueueConfig.interrupt_on_shutdown
flag. When true:
- triggerShutdown() additionally calls taskExecutor.shutdownNow(),
which Thread.interrupt()s any worker currently running a task
lambda. Blocking I/O calls that respect interrupts (Apache HTTP
Client, AWS SDK v2, Thread.sleep, etc.) unwind immediately.
- The catch (Throwable) blocks in schedule() and scheduleWithBackoff()
detect InterruptedException (directly, via the thread's interrupt
flag, or as a wrapped cause), restore the interrupt flag, and
return Status.NO_RESCHEDULE instead of marking the task FAILED and
rescheduling it.
- The dispatch loop's submit() call is wrapped in a try/catch for
RejectedExecutionException, since the executor may be shut down
between the running.get() check and the submit. This avoids the
dispatch thread terminating with a failure state at shutdown.
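The interrupt detection in the second bullet could look roughly like the sketch below. The enum and both function names are hypothetical stand-ins (the real code uses misk's Status and lives inside the catch blocks), and the depth limit of 32 on the cause chain is an arbitrary safety bound chosen for this example.

```kotlin
// Hypothetical stand-in for the two Status values mentioned above.
enum class SketchStatus { NO_RESCHEDULE, FAILED }

// True if the failure looks like a shutdown interrupt: the current thread's
// interrupt flag is set, or an InterruptedException appears in the cause chain
// (including the throwable itself).
fun isShutdownInterrupt(t: Throwable): Boolean {
  if (Thread.currentThread().isInterrupted) return true
  // Bounded walk so a cyclic cause chain cannot loop forever.
  return generateSequence(t) { it.cause }
    .take(32)
    .any { it is InterruptedException }
}

// Maps a caught throwable to the status the catch block would return.
fun statusAfterFailure(t: Throwable): SketchStatus {
  if (isShutdownInterrupt(t)) {
    // Restore the flag so code further up the stack can still observe it.
    Thread.currentThread().interrupt()
    return SketchStatus.NO_RESCHEDULE
  }
  return SketchStatus.FAILED
}
```

Restoring the flag matters because catching InterruptedException clears it; without the restore, the worker thread would look uninterrupted to anything that checks afterwards.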
Defaults to false. Existing callers see no behavior change.
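The RejectedExecutionException guard from the third bullet might be sketched as follows. The helper name and its Boolean return value are assumptions made for illustration; the actual change wraps the submit call inline in the dispatch loop.

```kotlin
import java.util.concurrent.ExecutorService
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical helper: returns true if the task was accepted, false if the
// executor rejected it while the queue was shutting down. A rejection while
// the queue is still running is unexpected and is rethrown.
fun submitUnlessShuttingDown(
  executor: ExecutorService,
  running: AtomicBoolean,
  task: Runnable,
): Boolean {
  return try {
    executor.submit(task)
    true
  } catch (e: RejectedExecutionException) {
    if (running.get()) throw e
    false
  }
}
```

Checking running only after the rejection keeps the guard narrow: a rejection from an executor that should still be accepting work continues to surface as a failure.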
Add RepeatedTaskQueueShutdownInterruptTest covering: interrupt-enabled
shutdown completes promptly, interrupted tasks are not rescheduled,
and interrupt-disabled (the default) preserves historical behavior.
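The "completes promptly" case can be illustrated against a bare JDK executor. This is a sketch under stated assumptions, not the test in this PR: it does not use RepeatedTaskQueue, and InterruptOutcome and interruptSleepingWorker are names invented for the example.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical result holder for the demo.
data class InterruptOutcome(val terminatedPromptly: Boolean, val sawInterrupt: Boolean)

fun interruptSleepingWorker(): InterruptOutcome {
  val executor = Executors.newSingleThreadExecutor()
  val started = CountDownLatch(1)
  val sawInterrupt = AtomicBoolean(false)

  executor.execute {
    started.countDown()
    try {
      Thread.sleep(60_000) // stand-in for a long-blocking, interruptible call
    } catch (e: InterruptedException) {
      sawInterrupt.set(true)
      Thread.currentThread().interrupt() // restore the flag
    }
  }

  started.await() // make sure the task is in flight
  executor.shutdownNow() // interrupts the worker running the task
  val terminated = executor.awaitTermination(5, TimeUnit.SECONDS)
  return InterruptOutcome(terminated, sawInterrupt.get())
}
```

With a real (non-direct) executor the sleep unwinds as soon as the worker is interrupted, so awaitTermination returns well inside the 5 second budget and both fields of the outcome are true.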
Amp-Thread-ID: https://ampcode.com/threads/T-019dfd53-8e08-768b-bbf8-866aa2fca7c6
Co-authored-by: Amp <amp@ampcode.com>
Summary
Adds an opt-in RepeatedTaskQueueConfig.interrupt_on_shutdown flag that makes RepeatedTaskQueue.triggerShutdown() interrupt in-flight task lambdas via taskExecutor.shutdownNow(), so awaitTerminated() actually waits for the lambda to unwind rather than just for the dispatch loop.

Motivation
RepeatedTaskQueue extends AbstractExecutionThreadService, which only manages the single dispatch thread that drives run(). When stopAsync() triggers shutdown:
1. running is flipped to false and a wakeup task is added to the DelayQueue.
2. The dispatch loop wakes up, sees running == false, and returns.
3. The service transitions to TERMINATED and awaitTerminated() returns.
The taskExecutor and any worker thread currently executing a previously-submitted task lambda are never touched. For tasks that perform long-blocking I/O (e.g. an SQS receiveMessage long-poll, which can park inside an HTTP call for up to 20s), this means the caller's awaitTerminated() returns essentially immediately while the lambda is still mid-I/O.
If the caller then closes a downstream resource (HTTP connection pool, AWS SDK client, DB connection, etc.) right after awaitTerminated(), which is the textbook lifecycle, it races the still-in-flight call. We've been chasing exactly this race in cash-server's SQS consumer (IllegalStateException: Connection pool shut down from SqsConsumer.processMessages at pod shutdown).
The current options are all unsatisfying:
- ReentrantLock + Condition and have the caller block on it before close. Works, but ~150 lines of bespoke concurrency code that every consumer of RepeatedTaskQueue would need to re-derive.
The standard JDK pattern for "stop the workers now" is ExecutorService.shutdownNow(), which calls Thread.interrupt() on each active worker. AWS SDK v2, Apache HTTP Client, and most blocking JDK calls (Thread.sleep, Object.wait, BlockingQueue.poll, etc.) respond to interrupts. This change wires that pattern into RepeatedTaskQueue behind an opt-in flag.

Change
RepeatedTaskQueueConfig
New field: interrupt_on_shutdown, a Boolean that defaults to false.
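As a rough illustration of the shape of the config change, the sketch below appends a defaulted Boolean to a data class whose existing parameters are two Longs and an Int (the types implied by the (JJIZ)V constructor signature). The class name is deliberately suffixed with Sketch, and the names and defaults of the first three parameters are assumptions; only interrupt_on_shutdown and its default of false come from this PR.

```kotlin
// Hypothetical stand-in for RepeatedTaskQueueConfig.
data class RepeatedTaskQueueConfigSketch(
  val default_jitter_ms: Long = 50,       // assumed name and default
  val default_max_delay_sec: Long = 60,   // assumed name and default
  val num_parallel_tasks: Int = 1,        // assumed name and default
  // New in this change: when true, shutdown interrupts in-flight tasks.
  val interrupt_on_shutdown: Boolean = false,
)
```

Callers opt in with a named argument, for example RepeatedTaskQueueConfigSketch(interrupt_on_shutdown = true); constructing the config with no arguments keeps the flag off.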
RepeatedTaskQueue.triggerShutdown()
After the existing running.set(false) + wakeup-task path, if the flag is set, call taskExecutor.shutdownNow(). This sends Thread.interrupt() to every active worker.
RepeatedTaskQueue.schedule() and scheduleWithBackoff()
Both catch (Throwable) blocks now check whether the throwable indicates a shutdown interrupt (direct InterruptedException, an InterruptedException somewhere in the cause chain, or Thread.currentThread().isInterrupted == true). If so, restore the interrupt flag and return Status.NO_RESCHEDULE instead of Status.FAILED. This prevents an interrupted shutdown from silently re-enqueueing the task and immediately re-running it on the still-shutting-down executor.
RepeatedTaskQueue.run()
Wrap taskExecutor.submit { ... } in a try/catch (RejectedExecutionException). With shutdownNow() enabled, there's a tiny window where the dispatch loop has passed its running.get() check but the executor is already shut down; submit would then throw and tear down the dispatch thread as FAILED rather than TERMINATED. We swallow the REE only when running == false (i.e. clearly during shutdown).

Why opt-in
Tasks that swallow InterruptedException without surfacing it, or perform genuinely uninterruptible work that must complete (e.g. a critical-section DB write), would have their work silently abandoned at shutdown. The default of false preserves existing behavior exactly. Only callers that explicitly opt in get the new semantics.

Tests
New RepeatedTaskQueueShutdownInterruptTest (uses real, not direct, executors so interrupt behavior actually exercises the worker thread):
- awaitTerminated returns promptly when interrupt_on_shutdown is enabled: schedules a task that sleeps for 60s, calls stopAsync() + awaitTerminated(), asserts the call returns in well under 5s and the task observed InterruptedException.
- interrupted task is not rescheduled: verifies the catch block's NO_RESCHEDULE path, where an interrupted task runs exactly once and is never re-enqueued.
- awaitTerminated may not wait for in-flight task when interrupt_on_shutdown is disabled: pins down the historical default, where the in-flight task is not interrupted by us and the service still terminates.
All 20 existing RepeatedTaskQueueTest tests still pass.

Public API
misk/api/misk.api updated. The change is the canonical Kotlin "data class + new param with default" diff: a new (JJIZ)V ctor and a copy(JJIZ) replacing copy(JJI). Source-compat is preserved for callers using named arguments; the only callers affected are those that called RepeatedTaskQueueConfig.copy(...) positionally with all three existing args, which appears to be no one in this repo.

Verification
- bin/gradle :misk:compileKotlin --warn ✅
- bin/gradle :misk:test --tests "misk.tasks.RepeatedTaskQueueShutdownInterruptTest" ✅ (3/3)
- bin/gradle :misk:test --tests "misk.tasks.RepeatedTaskQueueTest" ✅ (20/20)
- bin/gradle :misk:apiCheck --warn ✅
- bin/gradle :misk:detekt --warn ✅