Skip to content

Commit 021ab7f

Browse files
authored
feat(queue): emit internal duplicated event (#2754)
1 parent 4cea99e commit 021ab7f

File tree

3 files changed

+25
-3
lines changed

3 files changed

+25
-3
lines changed

lib/commands/addJob-6.lua

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ else
5151
jobId = ARGV[2]
5252
jobIdKey = ARGV[1] .. jobId
5353
if rcall("EXISTS", jobIdKey) == 1 then
54-
rcall("PUBLISH", ARGV[1] .. "duplicated", jobId)
54+
rcall("PUBLISH", ARGV[1] .. "duplicated@" .. ARGV[11], jobId)
5555
return jobId .. "" -- convert to string
5656
end
5757
end

lib/queue.js

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -440,6 +440,9 @@ Queue.prototype._setupQueueEventListeners = function() {
440440
utils.emitSafe(this, 'global:stalled', message);
441441
break;
442442
case duplicatedKey:
443+
if (this.token === token) {
444+
utils.emitSafe(this, 'duplicated', message);
445+
}
443446
utils.emitSafe(this, 'global:duplicated', message);
444447
break;
445448
}
@@ -510,7 +513,7 @@ Queue.prototype._setupQueueEventListeners = function() {
510513
};
511514

512515
Queue.prototype._registerEvent = function(eventName) {
513-
const internalEvents = ['waiting', 'delayed'];
516+
const internalEvents = ['waiting', 'delayed', 'duplicated'];
514517

515518
if (
516519
eventName.startsWith('global:') ||

test/test_queue.js

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1098,7 +1098,7 @@ describe('Queue', () => {
10981098
});
10991099

11001100
describe('when job has been added again', () => {
1101-
it('emits duplicated event', async () => {
1101+
it('emits global duplicated event', async () => {
11021102
queue.process(
11031103
async () => {
11041104
await delay(50);
@@ -1116,6 +1116,25 @@ describe('Queue', () => {
11161116
});
11171117
});
11181118
});
1119+
1120+
it('emits duplicated event', async () => {
1121+
queue.process(
1122+
async () => {
1123+
await delay(50);
1124+
await queue.add({ foo: 'bar' }, { jobId: 'a1' });
1125+
await delay(50);
1126+
}
1127+
);
1128+
1129+
await queue.add({ foo: 'bar' }, { jobId: 'a1' });
1130+
1131+
await new Promise(resolve => {
1132+
queue.once('duplicated', (jobId) => {
1133+
expect(jobId).to.be.equal('a1');
1134+
resolve();
1135+
});
1136+
});
1137+
});
11191138
});
11201139

11211140
it('process a job that updates progress', done => {

0 commit comments

Comments
 (0)