@@ -7,20 +7,25 @@ import {
7
7
ReceiveMessageCommandOutput ,
8
8
} from '@aws-sdk/client-sqs' ;
9
9
import * as Sentry from '@sentry/node' ;
10
- import { config } from '../config' ;
11
- import { SqsMessage } from '../routes/queueDelete' ;
12
10
import { setTimeout } from 'timers/promises' ;
13
- import { SeverityLevel } from '@sentry/types' ;
14
- import { unleash } from '../unleash' ;
15
11
import { type Unleash } from 'unleash-client' ;
16
12
import { serverLogger } from '@pocket-tools/ts-logger' ;
17
- import { QueueConfig } from '../types' ;
18
13
import * as otel from '@opentelemetry/api' ;
19
14
20
- export abstract class QueueHandler {
21
- readonly sqsClient : SQSClient ;
15
+ export interface QueueConfig {
16
+ batchSize : number ;
17
+ url : string ;
18
+ visibilityTimeout : number ;
19
+ maxMessages : number ;
20
+ waitTimeSeconds : number ;
21
+ defaultPollIntervalSeconds : number ;
22
+ afterMessagePollIntervalSeconds : number ;
23
+ messageRetentionSeconds : number ;
24
+ name : string ;
25
+ }
26
+
27
+ export abstract class QueuePoller < TMessageBody > {
22
28
private tracer : otel . Tracer ;
23
- protected unleashClient : Unleash ;
24
29
25
30
/**
26
31
* Class for deleting records in batches from the database,
@@ -43,23 +48,26 @@ export abstract class QueueHandler {
43
48
* in the future.
44
49
*/
45
50
constructor (
46
- public readonly emitter : EventEmitter ,
47
- public readonly eventName : string ,
48
- public readonly queueConfig : QueueConfig ,
49
- pollOnInit = true ,
50
- unleashClient ?: Unleash ,
51
+ protected events : {
52
+ emitter : EventEmitter ;
53
+ eventName : string ;
54
+ } ,
55
+ protected sqs : {
56
+ config : QueueConfig ;
57
+ client : SQSClient ;
58
+ } ,
59
+ protected opts : {
60
+ pollOnInit ?: boolean ;
61
+ } ,
51
62
) {
52
- this . sqsClient = new SQSClient ( {
53
- region : config . aws . region ,
54
- endpoint : config . aws . endpoint ,
55
- maxAttempts : 3 ,
56
- } ) ;
57
- this . unleashClient = unleashClient ?? unleash ( ) ;
63
+ // Default to polling when class is instantiated (really only when
64
+ // testing do you want it otherwise)
65
+ const pollOnInit = opts . pollOnInit != null ? opts . pollOnInit : true ;
58
66
this . tracer = otel . trace . getTracer ( 'queue-tracer' ) ;
59
- emitter . on ( this . eventName , async ( ) => await this . pollQueue ( ) ) ;
67
+ events . emitter . on ( events . eventName , async ( ) => await this . pollQueue ( ) ) ;
60
68
// Start the polling by emitting an initial event
61
69
if ( pollOnInit ) {
62
- emitter . emit ( this . eventName ) ;
70
+ events . emitter . emit ( events . eventName ) ;
63
71
}
64
72
}
65
73
@@ -70,20 +78,22 @@ export abstract class QueueHandler {
70
78
*/
71
79
async deleteMessage ( message : Message ) {
72
80
const deleteParams = {
73
- QueueUrl : this . queueConfig . url ,
81
+ QueueUrl : this . sqs . config . url ,
74
82
ReceiptHandle : message . ReceiptHandle ,
75
83
} ;
76
84
try {
77
- await this . sqsClient . send ( new DeleteMessageCommand ( deleteParams ) ) ;
85
+ await this . sqs . client . send ( new DeleteMessageCommand ( deleteParams ) ) ;
78
86
} catch ( error ) {
79
87
const errorMessage = 'Error deleting message from queue' ;
80
88
serverLogger . error ( {
81
89
message : errorMessage ,
82
90
error : error ,
83
91
errorData : message ,
92
+ queue : this . sqs . config . url ,
93
+ } ) ;
94
+ Sentry . captureException ( error , {
95
+ data : { ...message , queue : this . sqs . config . url } ,
84
96
} ) ;
85
- Sentry . addBreadcrumb ( { message : errorMessage , data : message } ) ;
86
- Sentry . captureException ( error ) ;
87
97
}
88
98
}
89
99
@@ -95,7 +105,7 @@ export abstract class QueueHandler {
95
105
* @returns whether or not the message was successfully handled
96
106
* (underlying call to AccountDeleteDataService completed without error)
97
107
*/
98
- abstract handleMessage ( body : object ) : Promise < boolean > ;
108
+ abstract handleMessage ( body : TMessageBody ) : Promise < boolean > ;
99
109
/**
100
110
* Set a timeout to emit another poll event which will be handled
101
111
* by the listener.
@@ -106,7 +116,7 @@ export abstract class QueueHandler {
106
116
serverLogger . info ( `Set next poll timeout at ${ timeout } ` ) ;
107
117
await setTimeout ( timeout ) ;
108
118
}
109
- this . emitter . emit ( this . eventName ) ;
119
+ this . events . emitter . emit ( this . events . eventName ) ;
110
120
}
111
121
112
122
/**
@@ -118,7 +128,7 @@ export abstract class QueueHandler {
118
128
async pollQueue ( ) {
119
129
return await Sentry . withIsolationScope ( async ( ) => {
120
130
return await this . tracer . startActiveSpan (
121
- `poll-queue-${ this . queueConfig . name } ` ,
131
+ `poll-queue-${ this . sqs . config . name } ` ,
122
132
{ root : true } ,
123
133
async ( span : otel . Span ) => {
124
134
await this . __pollQueue ( span ) ;
@@ -140,20 +150,20 @@ export abstract class QueueHandler {
140
150
const params = {
141
151
// https://github.com/aws/aws-sdk/issues/233
142
152
AttributeNames : [ 'SentTimestamp' ] as any , // see issue above - bug in the SDK
143
- MaxNumberOfMessages : this . queueConfig . maxMessages ,
153
+ MaxNumberOfMessages : this . sqs . config . maxMessages ,
144
154
MessageAttributeNames : [ 'All' ] ,
145
- QueueUrl : this . queueConfig . url ,
146
- VisibilityTimeout : this . queueConfig . visibilityTimeout ,
147
- WaitTimeSeconds : this . queueConfig . waitTimeSeconds ,
155
+ QueueUrl : this . sqs . config . url ,
156
+ VisibilityTimeout : this . sqs . config . visibilityTimeout ,
157
+ WaitTimeSeconds : this . sqs . config . waitTimeSeconds ,
148
158
} ;
149
159
150
- serverLogger . info ( `Begining polling of ${ this . queueConfig . url } ` ) ;
160
+ serverLogger . info ( `Begining polling of ${ this . sqs . config . url } ` ) ;
151
161
152
162
let data : ReceiveMessageCommandOutput | null = null ;
153
- let body : SqsMessage | null = null ;
163
+ let body : TMessageBody | null = null ;
154
164
155
165
try {
156
- data = await this . sqsClient . send ( new ReceiveMessageCommand ( params ) ) ;
166
+ data = await this . sqs . client . send ( new ReceiveMessageCommand ( params ) ) ;
157
167
body =
158
168
data . Messages &&
159
169
data . Messages . length > 0 &&
@@ -162,9 +172,14 @@ export abstract class QueueHandler {
162
172
: null ;
163
173
} catch ( error ) {
164
174
const receiveError = 'PollQueue: Error receiving messages from queue' ;
165
- serverLogger . error ( { message : receiveError , error : error } ) ;
166
- Sentry . addBreadcrumb ( { message : receiveError } ) ;
167
- Sentry . captureException ( error , { level : 'fatal' as SeverityLevel } ) ;
175
+ serverLogger . error ( {
176
+ message : receiveError ,
177
+ error : error ,
178
+ queue : this . sqs . config . url ,
179
+ } ) ;
180
+ Sentry . captureException ( error , {
181
+ data : { queue : this . sqs . config . url } ,
182
+ } ) ;
168
183
span . recordException ( error ) ;
169
184
span . setStatus ( { code : otel . SpanStatusCode . ERROR } ) ;
170
185
}
@@ -176,12 +191,12 @@ export abstract class QueueHandler {
176
191
}
177
192
// Schedule next message poll
178
193
await this . scheduleNextPoll (
179
- this . queueConfig . afterMessagePollIntervalSeconds * 1000 ,
194
+ this . sqs . config . afterMessagePollIntervalSeconds * 1000 ,
180
195
) ;
181
196
} else {
182
197
// If no messages were found, schedule another poll after a short time
183
198
await this . scheduleNextPoll (
184
- this . queueConfig . defaultPollIntervalSeconds * 1000 ,
199
+ this . sqs . config . defaultPollIntervalSeconds * 1000 ,
185
200
) ;
186
201
}
187
202
}
0 commit comments