Skip to content

Commit c8de640

Browse files
authored
fix(flow): fail parent on failure by default (#2682)
1 parent 8260582 commit c8de640

File tree

6 files changed

+36
-49
lines changed

6 files changed

+36
-49
lines changed

src/classes/job.ts

+1-7
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ const logger = debuglog('bull');
3939

4040
const optsDecodeMap = {
4141
de: 'debounce',
42-
fpof: 'failParentOnFailure',
42+
fpof: 'failParentOnFailure', // TODO: deprecate it in next breaking change
4343
idof: 'ignoreDependencyOnFailure',
4444
kl: 'keepLogs',
4545
rdof: 'removeDependencyOnFailure',
@@ -1210,12 +1210,6 @@ export class Job<
12101210
throw new Error(`Delay and repeat options could not be used together`);
12111211
}
12121212

1213-
if (this.opts.removeDependencyOnFailure && this.opts.failParentOnFailure) {
1214-
throw new Error(
1215-
`RemoveDependencyOnFailure and failParentOnFailure options can not be used together`,
1216-
);
1217-
}
1218-
12191213
if (`${parseInt(this.id, 10)}` === this.id) {
12201214
throw new Error('Custom Ids cannot be integers');
12211215
}

src/commands/includes/moveParentFromWaitingChildrenToFailed.lua

+8-8
Original file line numberDiff line numberDiff line change
@@ -20,21 +20,21 @@ local function moveParentFromWaitingChildrenToFailed( parentQueueKey, parentKey,
2020

2121
if jobAttributes[1] then
2222
local parentData = cjson.decode(jobAttributes[1])
23-
if parentData['fpof'] then
23+
if parentData['rdof'] then
24+
local grandParentKey = parentData['queueKey'] .. ':' .. parentData['id']
25+
local grandParentDependenciesSet = grandParentKey .. ":dependencies"
26+
if rcall("SREM", grandParentDependenciesSet, parentKey) == 1 then
27+
moveParentToWaitIfNeeded(parentData['queueKey'], grandParentDependenciesSet,
28+
grandParentKey, parentData['id'], timestamp)
29+
end
30+
else
2431
moveParentFromWaitingChildrenToFailed(
2532
parentData['queueKey'],
2633
parentData['queueKey'] .. ':' .. parentData['id'],
2734
parentData['id'],
2835
parentKey,
2936
timestamp
3037
)
31-
elseif parentData['rdof'] then
32-
local grandParentKey = parentData['queueKey'] .. ':' .. parentData['id']
33-
local grandParentDependenciesSet = grandParentKey .. ":dependencies"
34-
if rcall("SREM", grandParentDependenciesSet, parentKey) == 1 then
35-
moveParentToWaitIfNeeded(parentData['queueKey'], grandParentDependenciesSet,
36-
grandParentKey, parentData['id'], timestamp)
37-
end
3838
end
3939
end
4040
end

src/commands/moveToFinished-14.lua

+6-6
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
opts - maxMetricsSize
4141
opts - fpof - fail parent on fail
4242
opts - idof - ignore dependency on fail
43-
opts - rdof - remove dependency on fail
43+
opts - rdof - remove dependency on fail TODO: remove it in next breaking change
4444
4545
Output:
4646
0 OK
@@ -140,11 +140,7 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists
140140
ARGV[4], timestamp)
141141
end
142142
else
143-
if opts['fpof'] then
144-
moveParentFromWaitingChildrenToFailed(parentQueueKey, parentKey,
145-
parentId, jobIdKey,
146-
timestamp)
147-
elseif opts['idof'] or opts['rdof'] then
143+
if opts['idof'] or opts['rdof'] then
148144
local dependenciesSet = parentKey .. ":dependencies"
149145
if rcall("SREM", dependenciesSet, jobIdKey) == 1 then
150146
moveParentToWaitIfNeeded(parentQueueKey, dependenciesSet,
@@ -154,6 +150,10 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists
154150
rcall("HSET", failedSet, jobIdKey, ARGV[4])
155151
end
156152
end
153+
else
154+
moveParentFromWaitingChildrenToFailed(parentQueueKey, parentKey,
155+
parentId, jobIdKey,
156+
timestamp)
157157
end
158158
end
159159
end

tests/test_flow.ts

+14-13
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ describe('flows', () => {
4040
});
4141

4242
describe('when removeOnFail is true in last pending child', () => {
43-
it('moves parent to wait without getting stuck', async () => {
43+
it('moves parent to failed without getting stuck', async () => {
4444
const worker = new Worker(
4545
queueName,
4646
async job => {
@@ -51,9 +51,14 @@ describe('flows', () => {
5151
{ connection, prefix },
5252
);
5353
await worker.waitUntilReady();
54+
const queueEvents = new QueueEvents(queueName, {
55+
connection,
56+
prefix,
57+
});
58+
await queueEvents.waitUntilReady();
5459

5560
const flow = new FlowProducer({ connection, prefix });
56-
await flow.add({
61+
const tree = await flow.add({
5762
name: 'parent',
5863
data: {},
5964
queueName,
@@ -74,21 +79,17 @@ describe('flows', () => {
7479
],
7580
});
7681

77-
const completed = new Promise<void>((resolve, reject) => {
78-
worker.on('completed', async (job: Job) => {
79-
try {
80-
if (job.name === 'parent') {
81-
const { processed } = await job.getDependenciesCount();
82-
expect(processed).to.equal(1);
83-
resolve();
84-
}
85-
} catch (err) {
86-
reject(err);
82+
const failed = new Promise<void>(resolve => {
83+
queueEvents.on('failed', async ({ jobId, failedReason, prev }) => {
84+
if (jobId === tree.job.id) {
85+
const { processed } = await tree.job!.getDependenciesCount();
86+
expect(processed).to.equal(1);
87+
resolve();
8788
}
8889
});
8990
});
9091

91-
await completed;
92+
await failed;
9293
await flow.close();
9394
await worker.close();
9495
});

tests/test_getters.ts

+7-2
Original file line numberDiff line numberDiff line change
@@ -809,20 +809,25 @@ describe('Jobs getters', function () {
809809
});
810810
});
811811

812+
await queue.add('test', {});
812813
const flow = new FlowProducer({ connection, prefix });
813814
await flow.add({
814815
name: 'parent-job',
815816
queueName,
816817
data: {},
817818
children: [
818-
{ name: 'child-1', data: { idx: 0, foo: 'bar' }, queueName },
819+
{
820+
name: 'child-1',
821+
data: { idx: 0, foo: 'bar' },
822+
queueName,
823+
opts: { delay: 6000 },
824+
},
819825
{ name: 'child-2', data: { idx: 1, foo: 'baz' }, queueName },
820826
{ name: 'child-3', data: { idx: 2, foo: 'bac' }, queueName },
821827
{ name: 'child-4', data: { idx: 3, foo: 'bad' }, queueName },
822828
],
823829
});
824830

825-
await queue.add('test', { idx: 2 }, { delay: 5000 });
826831
await queue.add('test', { idx: 3 }, { priority: 5 });
827832

828833
await completing;

tests/test_job.ts

-13
Original file line numberDiff line numberDiff line change
@@ -139,19 +139,6 @@ describe('Job', function () {
139139
});
140140
});
141141

142-
describe('when removeDependencyOnFailure and failParentOnFailure options are provided', () => {
143-
it('throws an error', async () => {
144-
const data = { foo: 'bar' };
145-
const opts = {
146-
removeDependencyOnFailure: true,
147-
failParentOnFailure: true,
148-
};
149-
await expect(Job.create(queue, 'test', data, opts)).to.be.rejectedWith(
150-
'RemoveDependencyOnFailure and failParentOnFailure options can not be used together',
151-
);
152-
});
153-
});
154-
155142
describe('when priority option is provided as float', () => {
156143
it('throws an error', async () => {
157144
const data = { foo: 'bar' };

0 commit comments

Comments
 (0)