misk-aws: the sqs client should be a managed guava service#3655
misk-aws: the sqs client should be a managed guava service#3655
Conversation
5426a5f to
64f2864
Compare
There was a problem hiding this comment.
Pull request overview
This pull request introduces a managed Guava service wrapper for AWS SQS clients to ensure proper shutdown ordering and prevent concurrent shutdown issues. The changes establish a clear dependency hierarchy where services that use SQS clients (like RepeatedTaskQueue) now depend on AwsSqsClientService, ensuring in-flight tasks complete before HTTP connection pools are closed.
Changes:
- Introduced AwsSqsClientService as a managed AbstractIdleService to control SQS client lifecycle
- Added service dependency: RepeatedTaskQueue now depends on AwsSqsClientService for proper shutdown ordering
- Updated test module to bind AwsSqsClientService with shutdownClients=false to avoid shutting down shared test clients
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| misk-aws/src/main/kotlin/misk/jobqueue/sqs/AwsSqsClientService.kt | New service wrapper that manages SQS client lifecycle with configurable shutdown behavior for testing |
| misk-aws/src/main/kotlin/misk/jobqueue/sqs/AwsSqsJobQueueModule.kt | Installs AwsSqsClientService and adds it as a dependency for RepeatedTaskQueue to enforce shutdown ordering |
| misk-aws/src/test/kotlin/misk/jobqueue/sqs/SqsJobQueueTestModule.kt | Binds test instance of AwsSqsClientService with shutdownClients=false for shared test clients |
| misk-aws/api/misk-aws.api | Added public API surface for new AwsSqsClientService class |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| open class AwsSqsClientService( | ||
| private val sqsClient: AmazonSQS, | ||
| @ForSqsReceiving private val sqsReceivingClient: AmazonSQS, | ||
| private val shutdownClients: Boolean, | ||
| ) : AbstractIdleService() { | ||
|
|
||
| @Inject constructor( | ||
| sqsClient: AmazonSQS, | ||
| @ForSqsReceiving sqsReceivingClient: AmazonSQS, | ||
| ) : this(sqsClient, sqsReceivingClient, shutdownClients = true) |
There was a problem hiding this comment.
The AwsSqsClientService manages only the default SQS clients (sqsClient and sqsReceivingClient) but does not handle region-specific clients that are created for external queues. These region-specific clients are created via AmazonSQSProvider and injected into QueueResolver as crossRegionSQS and crossRegionForReceivingSQS maps.
While this may be a pre-existing issue, since the goal of this PR is to ensure proper shutdown ordering for SQS clients, consider whether these region-specific clients should also be managed by this service to prevent resource leaks (unclosed HTTP connection pools). If these clients are intentionally not being shut down (e.g., because they're rarely used or for other reasons), please document this decision in the code or PR description.
| sqsClient.shutdown() | ||
| if (sqsReceivingClient !== sqsClient) { | ||
| sqsReceivingClient.shutdown() | ||
| } |
There was a problem hiding this comment.
If sqsClient.shutdown() throws an exception, the sqsReceivingClient.shutdown() will not be called, potentially leading to resource leaks. Consider wrapping each shutdown call in a try-catch block to ensure both clients are attempted to be shut down even if one fails. This is especially important since these are separate clients that might fail independently.
| sqsClient.shutdown() | |
| if (sqsReceivingClient !== sqsClient) { | |
| sqsReceivingClient.shutdown() | |
| } | |
| var error: RuntimeException? = null | |
| try { | |
| sqsClient.shutdown() | |
| } catch (e: RuntimeException) { | |
| log.error(e) { "Error shutting down primary AWS SQS client" } | |
| error = e | |
| } | |
| if (sqsReceivingClient !== sqsClient) { | |
| try { | |
| sqsReceivingClient.shutdown() | |
| } catch (e: RuntimeException) { | |
| log.error(e) { "Error shutting down receiving AWS SQS client" } | |
| if (error == null) { | |
| error = e | |
| } else { | |
| error.addSuppressed(e) | |
| } | |
| } | |
| } | |
| if (error != null) { | |
| throw error | |
| } |
| package misk.jobqueue.sqs | ||
|
|
||
| import com.amazonaws.services.sqs.AmazonSQS | ||
| import com.google.common.util.concurrent.AbstractIdleService | ||
| import jakarta.inject.Inject | ||
| import jakarta.inject.Singleton | ||
| import misk.logging.getLogger | ||
|
|
||
| /** | ||
| * A service wrapper for [AmazonSQS] clients that ensures proper shutdown ordering. | ||
| * | ||
| * This service doesn't do anything on startup, but on shutdown it closes the SQS clients. | ||
| * By making other services (like [misk.tasks.RepeatedTaskQueue]) depend on this service, | ||
| * we ensure those services complete their in-flight tasks before the HTTP connection pool | ||
| * is closed. | ||
| * | ||
| * @param shutdownClients If false, the clients will not be shut down when this service stops. | ||
| * This is useful for testing where clients are shared across tests. | ||
| */ | ||
| @Singleton | ||
| open class AwsSqsClientService( | ||
| private val sqsClient: AmazonSQS, | ||
| @ForSqsReceiving private val sqsReceivingClient: AmazonSQS, | ||
| private val shutdownClients: Boolean, | ||
| ) : AbstractIdleService() { | ||
|
|
||
| @Inject constructor( | ||
| sqsClient: AmazonSQS, | ||
| @ForSqsReceiving sqsReceivingClient: AmazonSQS, | ||
| ) : this(sqsClient, sqsReceivingClient, shutdownClients = true) | ||
|
|
||
| override fun startUp() { | ||
| log.info { "AWS SQS client service started" } | ||
| } | ||
|
|
||
| override fun shutDown() { | ||
| if (!shutdownClients) { | ||
| log.info { "AWS SQS client service stopped (clients not shut down)" } | ||
| return | ||
| } | ||
|
|
||
| log.info { "Shutting down AWS SQS clients" } | ||
| sqsClient.shutdown() | ||
| if (sqsReceivingClient !== sqsClient) { | ||
| sqsReceivingClient.shutdown() | ||
| } | ||
| log.info { "AWS SQS clients shut down" } | ||
| } | ||
|
|
||
| companion object { | ||
| private val log = getLogger<AwsSqsClientService>() | ||
| } | ||
| } |
There was a problem hiding this comment.
The test coverage for AwsSqsClientService appears to be indirect - it's only tested as part of the broader integration tests via SqsJobQueueTestModule. Consider adding a dedicated unit test for AwsSqsClientService that verifies:
- The service starts up correctly
- When shutdownClients is true, both clients are shut down (and handles the case where they're the same instance)
- When shutdownClients is false, clients are not shut down
- Proper logging occurs in each scenario
This would ensure the service lifecycle management works correctly in isolation.
This allows us to order the shutdown by adding dependencies between services instead of the guice concurrent shutdown we see. Amp-Thread-ID: https://ampcode.com/threads/T-019c2aac-43dd-709c-86a9-5e168bd909b6 Co-authored-by: Amp <amp@ampcode.com>
64f2864 to
1db08a5
Compare
natetarrh
left a comment
There was a problem hiding this comment.
change LGTM but did you commit an instructions file by mistake?
There was a problem hiding this comment.
intentional to include this?
https://linear.app/squareup/issue/FRAME-1630/misk-aws-sqs-jobqueue-handle-in-flight-tasks-during-shutdown
This allows us to order the shutdown by adding dependencies between
services instead of the guice concurrent shutdown we see.
See slack bug report
Description
Testing Strategy
Checklist
Thank you for contributing to Misk! 🎉