Skip to content

Commit c75d4a3

Browse files
kibertoadclaude
andcommitted
feat(sqs): queue lifecycle helpers and Redis-first guidance
Adds opt-in queue lifecycle management for the SNS/SQS adapter so users who pick the per-instance queue model have first-class tools to keep queues from accumulating across restarts: - consumer.lifecycle.deleteQueueOnClose / unsubscribeOnClose run on close() for graceful-shutdown cleanup - consumer.lifecycle.heartbeat writes a layered-loader:heartbeat tag on the consumer's own queue on a timer - reapStaleQueues({ ... }) helper scans queues by prefix and deletes ones whose heartbeat is older than the threshold, optionally also unsubscribing orphan SNS subscriptions All opt-in and default off; existing users see no behaviour change. Documentation rewrites the Update notifications and Flexible invalidation triggers sections to strongly recommend Redis pub/sub whenever feasible, and to lead with the Redis-publisher-plus-SQS-trigger hybrid pattern (shape C) for the upstream-events use case. That hybrid uses a single shared SQS queue across all instances, avoiding the per-instance queue churn problem entirely. The package README now opens with a "Picking your shape" table making the three deployment shapes (pure Redis, pure SNS/SQS, hybrid) explicit and recommending shape C whenever both Redis and AWS upstream events are in play. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 8377943 commit c75d4a3

10 files changed

Lines changed: 1278 additions & 35 deletions

README.md

Lines changed: 166 additions & 30 deletions
Large diffs are not rendered by default.

packages/sqs/README.md

Lines changed: 226 additions & 1 deletion
Large diffs are not rendered by default.

packages/sqs/index.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,19 @@ export {
2727
createGroupNotificationPair,
2828
type SqsGroupNotificationConfig,
2929
} from './lib/SqsGroupNotificationFactory.js'
30+
export {
31+
DEFAULT_HEARTBEAT_INTERVAL_MS,
32+
DEFAULT_REAPER_IDLE_THRESHOLD_MS,
33+
HEARTBEAT_TAG_KEY,
34+
reapStaleQueues,
35+
resolveQueueUrl,
36+
startQueueHeartbeat,
37+
type HeartbeatOptions,
38+
type HeartbeatRunner,
39+
type QueueLifecycleOptions,
40+
type ReapStaleQueuesParams,
41+
type ReapStaleQueuesResult,
42+
} from './lib/queueLifecycle.js'
3043
export {
3144
CLEAR_COMMAND,
3245
DELETE_COMMAND,

packages/sqs/lib/SqsGroupNotificationConsumer.ts

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import { UnsubscribeCommand } from '@aws-sdk/client-sns'
2+
import { DeleteQueueCommand } from '@aws-sdk/client-sqs'
13
import { MessageHandlerConfigBuilder } from '@message-queue-toolkit/core'
24
import {
35
AbstractSnsSqsConsumer,
@@ -8,6 +10,11 @@ import {
810
} from '@message-queue-toolkit/sns'
911
import type { ConsumerErrorHandler, SynchronousGroupCache } from 'layered-loader'
1012
import { AbstractNotificationConsumer } from 'layered-loader'
13+
import {
14+
type HeartbeatRunner,
15+
type QueueLifecycleOptions,
16+
startQueueHeartbeat,
17+
} from './queueLifecycle.js'
1118
import type { SqsSubscriptionOptions } from './SqsNotificationConsumer.js'
1219
import {
1320
CLEAR_GROUP_NOTIFICATION_SCHEMA,
@@ -28,6 +35,12 @@ export type SqsGroupNotificationConsumerParams = {
2835
errorHandler?: ConsumerErrorHandler
2936
dependencies: SNSSQSConsumerDependencies
3037
subscriptionConfig?: SqsSubscriptionOptions
38+
/**
39+
* Optional queue-lifecycle behaviour. When set, controls automatic queue
40+
* cleanup on close and/or periodic heartbeat tagging used by
41+
* `reapStaleQueues`. See {@link QueueLifecycleOptions}.
42+
*/
43+
lifecycle?: QueueLifecycleOptions
3144
} & SqsGroupNotificationConsumerConfig
3245

3346
type ConsumerContext<LoadedValue> = {
@@ -71,6 +84,7 @@ export class SqsGroupNotificationConsumer<LoadedValue> extends AbstractNotificat
7184
private readonly params: SqsGroupNotificationConsumerParams
7285
private internalConsumer?: SnsSqsGroupInvalidationConsumer<LoadedValue>
7386
private subscribePromise?: Promise<SnsSqsGroupInvalidationConsumer<LoadedValue>>
87+
private heartbeat?: HeartbeatRunner
7488

7589
constructor(params: SqsGroupNotificationConsumerParams) {
7690
super(params.serverUuid, params.errorHandler)
@@ -143,6 +157,14 @@ export class SqsGroupNotificationConsumer<LoadedValue> extends AbstractNotificat
143157
await consumer.init()
144158
await consumer.start()
145159
this.internalConsumer = consumer
160+
if (this.params.lifecycle?.heartbeat) {
161+
this.heartbeat = startQueueHeartbeat({
162+
sqsClient: this.params.dependencies.sqsClient,
163+
queueUrl: consumer.publicQueueUrl,
164+
intervalMs: this.params.lifecycle.heartbeat.intervalMs,
165+
errorHandler: this.params.lifecycle.heartbeat.errorHandler,
166+
})
167+
}
146168
return consumer
147169
} catch (err) {
148170
await consumer.close().catch(() => undefined)
@@ -161,7 +183,35 @@ export class SqsGroupNotificationConsumer<LoadedValue> extends AbstractNotificat
161183
if (!this.internalConsumer) return
162184
const consumer = this.internalConsumer
163185
this.internalConsumer = undefined
186+
187+
this.heartbeat?.stop()
188+
this.heartbeat = undefined
189+
190+
const queueUrl = consumer.publicQueueUrl
191+
const subscriptionArn = consumer.publicSubscriptionArn
192+
const unsubscribeOnClose = this.params.lifecycle?.unsubscribeOnClose ?? false
193+
const deleteQueueOnClose = this.params.lifecycle?.deleteQueueOnClose ?? false
194+
164195
await consumer.close()
196+
197+
if (unsubscribeOnClose && subscriptionArn) {
198+
try {
199+
await this.params.dependencies.snsClient.send(
200+
new UnsubscribeCommand({ SubscriptionArn: subscriptionArn }),
201+
)
202+
} catch (err) {
203+
this.params.lifecycle?.onCleanupError?.(err as Error, 'unsubscribe')
204+
}
205+
}
206+
if (deleteQueueOnClose && queueUrl) {
207+
try {
208+
await this.params.dependencies.sqsClient.send(
209+
new DeleteQueueCommand({ QueueUrl: queueUrl }),
210+
)
211+
} catch (err) {
212+
this.params.lifecycle?.onCleanupError?.(err as Error, 'deleteQueue')
213+
}
214+
}
165215
}
166216

167217
get topicArn(): string | undefined {

packages/sqs/lib/SqsGroupNotificationFactory.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { randomUUID } from 'node:crypto'
22
import type { SNSDependencies, SNSSQSConsumerDependencies } from '@message-queue-toolkit/sns'
33
import type { ConsumerErrorHandler, PublisherErrorHandler } from 'layered-loader'
4+
import type { QueueLifecycleOptions } from './queueLifecycle.js'
45
import {
56
SqsGroupNotificationConsumer,
67
type SqsGroupNotificationConsumerConfig,
@@ -22,6 +23,13 @@ export type SqsGroupNotificationConfig = {
2223
consumer: {
2324
dependencies: SNSSQSConsumerDependencies
2425
subscriptionConfig?: SqsSubscriptionOptions
26+
/**
27+
* Optional queue-lifecycle behaviour. Only relevant when each instance owns
28+
* its own SQS queue (the default shape for full SNS/SQS fanout). Setups
29+
* that pair this consumer with a Redis publisher and a shared queue do not
30+
* need any of these knobs.
31+
*/
32+
lifecycle?: QueueLifecycleOptions
2533
} & SqsGroupNotificationConsumerConfig
2634
}
2735

@@ -59,13 +67,15 @@ export function createGroupNotificationPair<LoadedValue>(config: SqsGroupNotific
5967
errorHandler: config.consumerErrorHandler,
6068
dependencies: consumerConfig.dependencies,
6169
subscriptionConfig: consumerConfig.subscriptionConfig,
70+
lifecycle: consumerConfig.lifecycle,
6271
creationConfig: consumerConfig.creationConfig,
6372
}
6473
: {
6574
serverUuid,
6675
errorHandler: config.consumerErrorHandler,
6776
dependencies: consumerConfig.dependencies,
6877
subscriptionConfig: consumerConfig.subscriptionConfig,
78+
lifecycle: consumerConfig.lifecycle,
6979
locatorConfig: consumerConfig.locatorConfig,
7080
},
7181
)

packages/sqs/lib/SqsNotificationConsumer.ts

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
import type { SubscribeCommandInput } from '@aws-sdk/client-sns'
1+
import { type SubscribeCommandInput, UnsubscribeCommand } from '@aws-sdk/client-sns'
2+
import { DeleteQueueCommand } from '@aws-sdk/client-sqs'
23
import { MessageHandlerConfigBuilder } from '@message-queue-toolkit/core'
34
import {
45
AbstractSnsSqsConsumer,
@@ -9,6 +10,11 @@ import {
910
} from '@message-queue-toolkit/sns'
1011
import type { ConsumerErrorHandler, SynchronousCache } from 'layered-loader'
1112
import { AbstractNotificationConsumer } from 'layered-loader'
13+
import {
14+
type HeartbeatRunner,
15+
type QueueLifecycleOptions,
16+
startQueueHeartbeat,
17+
} from './queueLifecycle.js'
1218
import {
1319
CLEAR_NOTIFICATION_SCHEMA,
1420
DELETE_MANY_NOTIFICATION_SCHEMA,
@@ -40,6 +46,12 @@ export type SqsNotificationConsumerParams = {
4046
errorHandler?: ConsumerErrorHandler
4147
dependencies: SNSSQSConsumerDependencies
4248
subscriptionConfig?: SqsSubscriptionOptions
49+
/**
50+
* Optional queue-lifecycle behaviour. When set, controls automatic queue
51+
* cleanup on close and/or periodic heartbeat tagging used by
52+
* {@link reapStaleQueues}. See {@link QueueLifecycleOptions}.
53+
*/
54+
lifecycle?: QueueLifecycleOptions
4355
} & SqsNotificationConsumerConfig
4456

4557
type ConsumerContext<LoadedValue> = {
@@ -76,6 +88,7 @@ export class SqsNotificationConsumer<LoadedValue> extends AbstractNotificationCo
7688
private readonly params: SqsNotificationConsumerParams
7789
private internalConsumer?: SnsSqsInvalidationConsumer<LoadedValue>
7890
private subscribePromise?: Promise<SnsSqsInvalidationConsumer<LoadedValue>>
91+
private heartbeat?: HeartbeatRunner
7992

8093
constructor(params: SqsNotificationConsumerParams) {
8194
super(params.serverUuid, params.errorHandler)
@@ -151,6 +164,14 @@ export class SqsNotificationConsumer<LoadedValue> extends AbstractNotificationCo
151164
await consumer.init()
152165
await consumer.start()
153166
this.internalConsumer = consumer
167+
if (this.params.lifecycle?.heartbeat) {
168+
this.heartbeat = startQueueHeartbeat({
169+
sqsClient: this.params.dependencies.sqsClient,
170+
queueUrl: consumer.publicQueueUrl,
171+
intervalMs: this.params.lifecycle.heartbeat.intervalMs,
172+
errorHandler: this.params.lifecycle.heartbeat.errorHandler,
173+
})
174+
}
154175
return consumer
155176
} catch (err) {
156177
await consumer.close().catch(() => undefined)
@@ -169,7 +190,38 @@ export class SqsNotificationConsumer<LoadedValue> extends AbstractNotificationCo
169190
if (!this.internalConsumer) return
170191
const consumer = this.internalConsumer
171192
this.internalConsumer = undefined
193+
194+
this.heartbeat?.stop()
195+
this.heartbeat = undefined
196+
197+
const queueUrl = consumer.publicQueueUrl
198+
const subscriptionArn = consumer.publicSubscriptionArn
199+
const unsubscribeOnClose = this.params.lifecycle?.unsubscribeOnClose ?? false
200+
const deleteQueueOnClose = this.params.lifecycle?.deleteQueueOnClose ?? false
201+
172202
await consumer.close()
203+
204+
// Cleanup is best-effort: missing resources (already deleted, never created)
205+
// must not break shutdown. The user opts in to this behaviour, so swallowing
206+
// errors is the right default — they can re-run reapStaleQueues if needed.
207+
if (unsubscribeOnClose && subscriptionArn) {
208+
try {
209+
await this.params.dependencies.snsClient.send(
210+
new UnsubscribeCommand({ SubscriptionArn: subscriptionArn }),
211+
)
212+
} catch (err) {
213+
this.params.lifecycle?.onCleanupError?.(err as Error, 'unsubscribe')
214+
}
215+
}
216+
if (deleteQueueOnClose && queueUrl) {
217+
try {
218+
await this.params.dependencies.sqsClient.send(
219+
new DeleteQueueCommand({ QueueUrl: queueUrl }),
220+
)
221+
} catch (err) {
222+
this.params.lifecycle?.onCleanupError?.(err as Error, 'deleteQueue')
223+
}
224+
}
173225
}
174226

175227
/**

packages/sqs/lib/SqsNotificationFactory.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { randomUUID } from 'node:crypto'
22
import type { SNSDependencies, SNSSQSConsumerDependencies } from '@message-queue-toolkit/sns'
33
import type { ConsumerErrorHandler, PublisherErrorHandler } from 'layered-loader'
4+
import type { QueueLifecycleOptions } from './queueLifecycle.js'
45
import {
56
SqsNotificationConsumer,
67
type SqsNotificationConsumerConfig,
@@ -30,6 +31,13 @@ export type SqsNotificationConfig = {
3031
consumer: {
3132
dependencies: SNSSQSConsumerDependencies
3233
subscriptionConfig?: SqsSubscriptionOptions
34+
/**
35+
* Optional queue-lifecycle behaviour. Only relevant when each instance owns
36+
* its own SQS queue (the default shape for full SNS/SQS fanout). Setups
37+
* that pair this consumer with a Redis publisher and a shared queue do not
38+
* need any of these knobs.
39+
*/
40+
lifecycle?: QueueLifecycleOptions
3341
} & SqsNotificationConsumerConfig
3442
}
3543

@@ -67,13 +75,15 @@ export function createNotificationPair<LoadedValue>(config: SqsNotificationConfi
6775
errorHandler: config.consumerErrorHandler,
6876
dependencies: consumerConfig.dependencies,
6977
subscriptionConfig: consumerConfig.subscriptionConfig,
78+
lifecycle: consumerConfig.lifecycle,
7079
creationConfig: consumerConfig.creationConfig,
7180
}
7281
: {
7382
serverUuid,
7483
errorHandler: config.consumerErrorHandler,
7584
dependencies: consumerConfig.dependencies,
7685
subscriptionConfig: consumerConfig.subscriptionConfig,
86+
lifecycle: consumerConfig.lifecycle,
7787
locatorConfig: consumerConfig.locatorConfig,
7888
},
7989
)

0 commit comments

Comments
 (0)