Skip to content

Commit eb7a053

Browse files
authored
fix: step position race condition (#860)
Changes 1. Create step - Refactor PATCH query - SET ISOLATION LEVEL - Unit tests 2. Delete step - SET ISOLATION LEVEL 3. Publish/unpublish flow - Set published at for all pipes - Check that positions are correct - Unit tests
1 parent 63d169d commit eb7a053

File tree

5 files changed

+372
-24
lines changed

5 files changed

+372
-24
lines changed
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
import { beforeEach, describe, expect, it } from 'vitest'
2+
3+
import createStep from '@/graphql/mutations/create-step'
4+
import Flow from '@/models/flow'
5+
import Step from '@/models/step'
6+
import User from '@/models/user'
7+
import Context from '@/types/express/context'
8+
9+
describe('createStep mutation integration tests', async () => {
10+
let testFlow: Flow
11+
let existingSteps: Step[]
12+
let context: Context
13+
14+
// Clean up (and seed) database before each test.
15+
beforeEach(async () => {
16+
// Clear out all rows. Adjust deletion order if using foreign keys.
17+
await Step.query().delete()
18+
await Flow.query().delete()
19+
20+
const testUser = await User.query().findOne({ email: '[email protected]' })
21+
context = {
22+
req: null,
23+
currentUser: testUser,
24+
res: null,
25+
isAdminOperation: false,
26+
}
27+
28+
// Create a flow associated with the test user.
29+
testFlow = await testUser.$relatedQuery('flows').insertAndFetch({
30+
name: 'Test Flow',
31+
// additional flow properties as needed
32+
})
33+
34+
// Create a "previous" step in the flow with position 1.
35+
existingSteps = await testFlow.$relatedQuery('steps').insertAndFetch([
36+
{
37+
key: 'newSubmission',
38+
appKey: 'formsg',
39+
type: 'trigger',
40+
position: 1,
41+
parameters: { foo: 'bar' },
42+
},
43+
{
44+
key: 'key',
45+
appKey: 'appKey',
46+
type: 'action',
47+
position: 2,
48+
parameters: {},
49+
},
50+
{
51+
key: 'key',
52+
appKey: 'appKey',
53+
type: 'action',
54+
position: 3,
55+
parameters: {},
56+
},
57+
])
58+
})
59+
60+
it('creates a new step correctly and shift later steps down', async () => {
61+
const params = {
62+
input: {
63+
flow: { id: testFlow.id },
64+
previousStep: { id: existingSteps[0].id },
65+
key: 'newStep',
66+
appKey: 'test-app',
67+
parameters: { newParam: 'value' },
68+
},
69+
}
70+
71+
const newStep = await createStep(null, params, context)
72+
73+
// Ensure the new step is returned as expected.
74+
expect(newStep).toBeDefined()
75+
expect(newStep.type).toBe('action')
76+
expect(newStep.key).toBe('newStep')
77+
expect(newStep.appKey).toBe('test-app')
78+
// New step's position should be previousStep.position + 1.
79+
expect(newStep.position).toBe(existingSteps[0].position + 1)
80+
81+
// Verify overall that there are three steps related to the flow.
82+
const steps = await testFlow
83+
.$relatedQuery('steps')
84+
.orderBy('position', 'asc')
85+
expect(steps).toHaveLength(4)
86+
expect(steps.map((step) => step.id)).toEqual([
87+
existingSteps[0].id,
88+
newStep.id,
89+
existingSteps[1].id,
90+
existingSteps[2].id,
91+
])
92+
// Also, check the ordering.
93+
expect(steps.map((step) => step.position)).toEqual([1, 2, 3, 4])
94+
})
95+
96+
it('should not shift steps if the previous step is the last step', async () => {
97+
const params = {
98+
input: {
99+
flow: { id: testFlow.id },
100+
previousStep: { id: existingSteps[2].id },
101+
},
102+
}
103+
104+
const newStep = await createStep(null, params, context)
105+
106+
expect(newStep).toBeDefined()
107+
expect(newStep.position).toBe(existingSteps[2].position + 1)
108+
const steps = await testFlow
109+
.$relatedQuery('steps')
110+
.orderBy('position', 'asc')
111+
expect(steps).toHaveLength(4)
112+
expect(steps.map((step) => step.id)).toEqual([
113+
existingSteps[0].id,
114+
existingSteps[1].id,
115+
existingSteps[2].id,
116+
newStep.id,
117+
])
118+
// Also, check the ordering.
119+
expect(steps.map((step) => step.position)).toEqual([1, 2, 3, 4])
120+
})
121+
122+
it('throws an error if the flow does not belong to the current user', async () => {
123+
// Create another user who does not own the existing flow.
124+
const otherUser = await User.query().insertAndFetch({
125+
126+
})
127+
128+
context.currentUser = otherUser
129+
130+
const params = {
131+
input: {
132+
flow: { id: testFlow.id },
133+
previousStep: { id: existingSteps[0].id },
134+
key: 'key',
135+
appKey: 'appKey',
136+
parameters: {},
137+
},
138+
}
139+
140+
await expect(createStep(null, params, context)).rejects.toThrow()
141+
})
142+
143+
it('throws an error if the previous step is not found', async () => {
144+
const params = {
145+
input: {
146+
flow: { id: testFlow.id },
147+
previousStep: { id: 'invalid-id' }, // Non-existent step id
148+
key: 'newStep',
149+
appKey: 'test-app',
150+
parameters: { newParam: 'value' },
151+
},
152+
}
153+
154+
await expect(createStep(null, params, context)).rejects.toThrow()
155+
})
156+
})
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
import { beforeEach, describe, expect, it, vi } from 'vitest'
2+
3+
import cronTimes from '@/apps/scheduler/common/cron-times'
4+
import updateFlowStatus from '@/graphql/mutations/update-flow-status'
5+
import {
6+
REMOVE_AFTER_7_DAYS_OR_50_JOBS,
7+
REMOVE_AFTER_30_DAYS,
8+
} from '@/helpers/default-job-configuration'
9+
import flowQueue from '@/queues/flow'
10+
11+
// In these tests we simulate the chaining of queries on currentUser.$relatedQuery('flows')
12+
// and the additional methods on the flow: $query, getTriggerStep etc.
13+
describe('updateFlowStatus', () => {
14+
let fakeFlow: any
15+
let fakeQuery: any
16+
let context: any
17+
let fakeTriggerStep: any
18+
let patchSpy: ReturnType<typeof vi.fn>
19+
20+
beforeEach(() => {
21+
vi.resetAllMocks()
22+
23+
// Create a fake flow object with default values.
24+
fakeFlow = {
25+
id: 'flow-1',
26+
active: false,
27+
steps: [{ position: 1 }, { position: 2 }], // contiguous by default
28+
// we will override $query to simulate patch operations
29+
$query: vi.fn(),
30+
getTriggerStep: vi.fn(),
31+
}
32+
patchSpy = vi.fn().mockResolvedValue(undefined)
33+
fakeFlow.$query.mockReturnValue({ patch: patchSpy })
34+
35+
// Fake the chained query methods on context.currentUser.$relatedQuery('flows')
36+
fakeQuery = {
37+
findOne: vi.fn().mockReturnThis(),
38+
withGraphJoined: vi.fn().mockReturnThis(),
39+
orderBy: vi.fn().mockReturnThis(),
40+
throwIfNotFound: vi.fn().mockResolvedValue(fakeFlow),
41+
}
42+
43+
context = {
44+
currentUser: {
45+
$relatedQuery: vi.fn().mockReturnValue(fakeQuery),
46+
},
47+
}
48+
49+
// Set up a default fake trigger step returning a trigger command.
50+
fakeTriggerStep = {
51+
parameters: {},
52+
getTriggerCommand: vi.fn().mockResolvedValue({
53+
getInterval: vi.fn().mockReturnValue(cronTimes.everyHour),
54+
}),
55+
}
56+
fakeFlow.getTriggerStep.mockResolvedValue(fakeTriggerStep)
57+
58+
// Mock flowQueue methods.
59+
flowQueue.add = vi.fn().mockResolvedValue(undefined)
60+
flowQueue.getRepeatableJobs = vi
61+
.fn()
62+
.mockResolvedValue([{ id: fakeFlow.id, key: 'repeat-key' }])
63+
flowQueue.removeRepeatableByKey = vi.fn().mockResolvedValue(undefined)
64+
})
65+
66+
it('returns the flow without changes if the active status did not change', async () => {
67+
// Set the flow status to true and provide the same value in input.
68+
fakeFlow.active = true
69+
70+
const params = { input: { id: fakeFlow.id, active: true } }
71+
const result = await updateFlowStatus({}, params, context)
72+
73+
expect(result).toEqual(fakeFlow)
74+
// The patch/update should not be triggered
75+
expect(fakeFlow.$query).not.toHaveBeenCalled()
76+
expect(fakeFlow.getTriggerStep).not.toHaveBeenCalled()
77+
})
78+
79+
it('throws an error when activating a flow with non-contiguous step positions', async () => {
80+
// Prepare a flow with steps that are not contiguous.
81+
fakeFlow.active = false
82+
fakeFlow.steps = [{ position: 1 }, { position: 3 }]
83+
84+
const params = { input: { id: fakeFlow.id, active: true } }
85+
86+
await expect(updateFlowStatus({}, params, context)).rejects.toThrow(
87+
'Step positions are out of order. Please contact [email protected] for help.',
88+
)
89+
})
90+
91+
it('activates the flow and enqueues a job for non-webhook triggers', async () => {
92+
// Starting state where the flow is inactive and input sets it to active.
93+
fakeFlow.active = false
94+
fakeFlow.steps = [{ position: 1 }, { position: 2 }]
95+
96+
const params = { input: { id: fakeFlow.id, active: true } }
97+
const result = await updateFlowStatus({}, params, context)
98+
99+
// Validate that we patched the flow with active true and publishedAt set to an ISO string.
100+
expect(patchSpy).toHaveBeenCalledWith({
101+
active: true,
102+
publishedAt: expect.any(String),
103+
})
104+
105+
// jobName is constructed as "flow-<flow.id>"
106+
expect(flowQueue.add).toHaveBeenCalledWith(
107+
`flow-${fakeFlow.id}`,
108+
{ flowId: fakeFlow.id },
109+
{
110+
repeat: { pattern: '0 * * * *' },
111+
jobId: fakeFlow.id,
112+
removeOnComplete: REMOVE_AFTER_7_DAYS_OR_50_JOBS,
113+
removeOnFail: REMOVE_AFTER_30_DAYS,
114+
},
115+
)
116+
117+
expect(result).toEqual(fakeFlow)
118+
})
119+
120+
it('deactivates the flow and removes the repeatable job for non-webhook triggers', async () => {
121+
// For deactivation, ensure the current flow is active and we are setting it to inactive.
122+
fakeFlow.active = true
123+
124+
// Simulate that a repeatable job exists for this flow.
125+
flowQueue.getRepeatableJobs = vi
126+
.fn()
127+
.mockResolvedValue([{ id: fakeFlow.id, key: 'repeat-key' }])
128+
129+
const params = { input: { id: fakeFlow.id, active: false } }
130+
const result = await updateFlowStatus({}, params, context)
131+
132+
expect(patchSpy).toHaveBeenCalledWith({
133+
active: false,
134+
publishedAt: null,
135+
})
136+
137+
expect(flowQueue.removeRepeatableByKey).toHaveBeenCalledWith('repeat-key')
138+
expect(result).toEqual(fakeFlow)
139+
})
140+
141+
it('does not perform any queue actions when the trigger type is webhook on activation', async () => {
142+
// Flow activation but the trigger is of type webhook.
143+
fakeFlow.active = false
144+
fakeFlow.steps = [{ position: 1 }, { position: 2 }]
145+
146+
fakeTriggerStep.getTriggerCommand.mockResolvedValue({
147+
type: 'webhook',
148+
})
149+
150+
const params = { input: { id: fakeFlow.id, active: true } }
151+
const result = await updateFlowStatus({}, params, context)
152+
153+
// The patch should still occur.
154+
expect(patchSpy).toHaveBeenCalledWith({
155+
active: true,
156+
publishedAt: expect.any(String),
157+
})
158+
159+
// But no job should be added when trigger type is webhook.
160+
expect(flowQueue.add).not.toHaveBeenCalled()
161+
expect(result).toEqual(fakeFlow)
162+
})
163+
164+
it('does not perform any queue actions when the trigger type is webhook on deactivation', async () => {
165+
// Flow deactivation but the trigger is of type webhook.
166+
fakeFlow.active = true
167+
168+
fakeTriggerStep.getTriggerCommand.mockResolvedValue({
169+
type: 'webhook',
170+
})
171+
172+
const params = { input: { id: fakeFlow.id, active: false } }
173+
const result = await updateFlowStatus({}, params, context)
174+
175+
expect(patchSpy).toHaveBeenCalledWith({
176+
active: false,
177+
publishedAt: null,
178+
})
179+
180+
// For webhook triggers no removal of a repeatable job should be attempted.
181+
expect(flowQueue.getRepeatableJobs).not.toHaveBeenCalled()
182+
expect(flowQueue.removeRepeatableByKey).not.toHaveBeenCalled()
183+
expect(result).toEqual(fakeFlow)
184+
})
185+
})

packages/backend/src/graphql/mutations/create-step.ts

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import { raw } from 'objection'
2+
13
import Step from '@/models/step'
24

35
import type { MutationResolvers } from '../__generated__/types.generated'
@@ -10,6 +12,8 @@ const createStep: MutationResolvers['createStep'] = async (
1012
const { input } = params
1113

1214
return await Step.transaction(async (trx) => {
15+
await trx.raw('SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;')
16+
1317
// Put SELECTs in transaction just in case there's concurrent modification.
1418
const flow = await context.currentUser
1519
.$relatedQuery('flows', trx)
@@ -25,6 +29,13 @@ const createStep: MutationResolvers['createStep'] = async (
2529
})
2630
.throwIfNotFound()
2731

32+
await flow
33+
.$relatedQuery('steps', trx)
34+
.patch({
35+
position: raw(`position + 1`),
36+
})
37+
.where('position', '>=', previousStep.position + 1)
38+
2839
const step = await flow.$relatedQuery('steps', trx).insertAndFetch({
2940
key: input.key,
3041
appKey: input.appKey,
@@ -33,19 +44,6 @@ const createStep: MutationResolvers['createStep'] = async (
3344
parameters: input.parameters,
3445
})
3546

36-
const nextSteps = await flow
37-
.$relatedQuery('steps', trx)
38-
.where('position', '>=', step.position)
39-
.whereNot('id', step.id)
40-
41-
const nextStepQueries = nextSteps.map(async (nextStep, index) => {
42-
await nextStep.$query(trx).patchAndFetch({
43-
position: step.position + index + 1,
44-
})
45-
})
46-
47-
await Promise.all(nextStepQueries)
48-
4947
return step
5048
})
5149
}

0 commit comments

Comments
 (0)