Skip to content

Commit 0152791

Browse files
authored
Merge pull request #34 from AthennaIO/develop
feat(driver): lock jobs inside drivers
2 parents 006d06d + 3622a51 commit 0152791

6 files changed

Lines changed: 159 additions & 78 deletions

File tree

package-lock.json

Lines changed: 66 additions & 69 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@athenna/queue",
3-
"version": "5.17.0",
3+
"version": "5.18.0",
44
"description": "The Athenna queue handler.",
55
"license": "MIT",
66
"author": "João Lenon <lenon@athenna.io>",

src/drivers/AwsSqsDriver.ts

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -387,20 +387,52 @@ export class AwsSqsDriver extends Driver<SQSClient> {
387387

388388
AwsSqsDriver.ackedIds.delete(job.id)
389389

390+
const heartbeatDelay = this.calculateHeartbeatDelay()
391+
392+
let heartbeatTimeout: NodeJS.Timeout
393+
394+
const startHeartbeat = () => {
395+
if (heartbeatDelay <= 0) {
396+
return
397+
}
398+
399+
heartbeatTimeout = setInterval(() => {
400+
this.changeJobVisibility(
401+
job.id,
402+
this.msToS(this.visibilityTimeout)
403+
).catch(() => {})
404+
}, heartbeatDelay)
405+
}
406+
407+
const stopHeartbeat = () => {
408+
if (!heartbeatTimeout) {
409+
return
410+
}
411+
412+
clearInterval(heartbeatTimeout)
413+
heartbeatTimeout = undefined
414+
}
415+
390416
try {
417+
startHeartbeat()
418+
391419
await processor({
392420
id: job.id,
393421
attempts: job.attempts,
394422
data: job.data
395423
})
396424

425+
stopHeartbeat()
426+
397427
if (!AwsSqsDriver.ackedIds.has(job.id)) {
398428
await this.changeJobVisibility(
399429
job.id,
400430
this.msToS(this.noAckDelayMs + requeueJitterMs)
401431
)
402432
}
403433
} catch (err) {
434+
stopHeartbeat()
435+
404436
const receiveCount = Number(
405437
job.metadata.Attributes?.ApproximateReceiveCount ?? '1'
406438
)

0 commit comments

Comments
 (0)