Skip to content

Commit 64aedba

Browse files
committed
feat: bulk retry iterations
1 parent 1ddb6cb commit 64aedba

File tree

8 files changed

+360
-33
lines changed

8 files changed

+360
-33
lines changed

packages/backend/src/graphql/mutation-resolvers.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import type { MutationResolvers } from './__generated__/types.generated'
22
import bulkRetryExecutions from './mutations/bulk-retry-executions'
3+
import bulkRetryIterations from './mutations/bulk-retry-iterations'
34
import createConnection from './mutations/create-connection'
45
import createFlow from './mutations/create-flow'
56
import createFlowTransfer from './mutations/create-flow-transfer'
@@ -51,6 +52,7 @@ import verifyOtp from './mutations/verify-otp'
5152

5253
export default {
5354
bulkRetryExecutions,
55+
bulkRetryIterations,
5456
createConnection,
5557
generateAuthUrl,
5658
updateConnection,
Lines changed: 237 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,237 @@
1+
import { chunk } from 'lodash'
2+
import { raw } from 'objection'
3+
4+
import { DEFAULT_JOB_OPTIONS } from '@/helpers/default-job-configuration'
5+
import logger from '@/helpers/logger'
6+
import Execution from '@/models/execution'
7+
import ExecutionStep from '@/models/execution-step'
8+
import { enqueueActionJob, getActionJob } from '@/queues/action'
9+
10+
import type { MutationResolvers } from '../__generated__/types.generated'
11+
12+
const CHUNK_SIZE = 100
13+
14+
async function getAllFailedIterations(executionId: string) {
15+
const failedExecutionSteps = await ExecutionStep.query()
16+
.with('latest_attempts', (builder) => {
17+
builder
18+
.distinctOn([raw('step_id'), raw("metadata->>'iteration'")])
19+
.select('*')
20+
.from('execution_steps')
21+
.where('execution_id', executionId)
22+
.orderBy('step_id')
23+
.orderBy(raw("metadata->>'iteration'"))
24+
.orderBy('created_at', 'desc')
25+
})
26+
.select(
27+
'id',
28+
'execution_id',
29+
'step_id',
30+
'status',
31+
'job_id',
32+
'app_key',
33+
'key',
34+
'metadata',
35+
)
36+
.from('latest_attempts')
37+
.where('status', '!=', 'success')
38+
.withSoftDeleted()
39+
40+
return failedExecutionSteps
41+
}
42+
43+
const bulkRetryIterations: MutationResolvers['bulkRetryIterations'] = async (
44+
_parent,
45+
params,
46+
) => {
47+
if (!params.input.executionId) {
48+
throw new Error('Execution ID is required')
49+
}
50+
51+
let failedExecutionSteps = await getAllFailedIterations(
52+
params.input.executionId,
53+
)
54+
55+
/**
56+
* NOTE: this filters out execution steps that are not failed or have no job id
57+
* if there is no job id, we will skip the retry
58+
*/
59+
failedExecutionSteps = failedExecutionSteps.filter((executionStep) => {
60+
const {
61+
id: executionStepId,
62+
executionId,
63+
status,
64+
jobId,
65+
metadata,
66+
} = executionStep
67+
68+
const defaultLoggerMetadata = {
69+
executionId: executionId,
70+
executionStepId: executionStepId,
71+
iteration: metadata.iteration,
72+
}
73+
74+
if (status !== 'failure') {
75+
logger.error(
76+
'Latest execution step is not failed for a failed execution',
77+
{
78+
event: 'bulk-retry-iteration-step-status-mismatch',
79+
...defaultLoggerMetadata,
80+
},
81+
)
82+
return false
83+
}
84+
85+
if (jobId === null || jobId === undefined) {
86+
// For fresh per-app queues, job ID can be 0.
87+
logger.error('Latest execution step does not have a job ID', {
88+
event: 'bulk-retry-iteration-step-no-job-id',
89+
...defaultLoggerMetadata,
90+
})
91+
return false
92+
}
93+
94+
if (executionId !== params.input.executionId) {
95+
return false
96+
}
97+
98+
return true
99+
})
100+
101+
// Nothing to do if no steps to retry
102+
if (failedExecutionSteps.length === 0) {
103+
return {
104+
numFailedIterations: 0,
105+
allSuccessfullyRetried: true,
106+
}
107+
}
108+
109+
// Retry each failed iteration
110+
const retryAttempts: PromiseSettledResult<void>[] = []
111+
const chunkedIterations = chunk(failedExecutionSteps, CHUNK_SIZE)
112+
113+
for (const currChunk of chunkedIterations) {
114+
const promises = currChunk.map(async (executionStep) => {
115+
const {
116+
id: executionStepId,
117+
executionId,
118+
jobId,
119+
appKey,
120+
metadata,
121+
} = executionStep
122+
123+
const defaultLoggerMetadata = {
124+
executionId: executionId,
125+
executionStepId: executionStepId,
126+
iteration: metadata.iteration,
127+
}
128+
129+
const job = await getActionJob(jobId)
130+
if (!job) {
131+
// if job cannot be found anymore, remove the job id from the execution step so it cannot be retried again
132+
await executionStep.$query().patch({ jobId: null })
133+
logger.error('Bulk retrying iteration - no job', {
134+
event: 'bulk-retry-iteration-no-job',
135+
...defaultLoggerMetadata,
136+
oldJobId: jobId,
137+
})
138+
throw new Error(
139+
`Job for ${executionId}-${executionStepId}-${metadata.iteration} not found or has expired`,
140+
)
141+
}
142+
143+
try {
144+
const jobState = await job.getState()
145+
if (jobState !== 'failed') {
146+
logger.warn(
147+
`Bulk retrying iteration ${metadata.iteration} - job not failed`,
148+
{
149+
event: 'bulk-retry-iteration-job-not-failed',
150+
...defaultLoggerMetadata,
151+
jobId: jobId,
152+
jobState,
153+
},
154+
)
155+
throw new Error(
156+
`Job for ${executionId}-${executionStepId}-${metadata.iteration} (JOB: ${jobId}) is not in a failed state`,
157+
)
158+
}
159+
} catch (error) {
160+
logger.error('Bulk retrying execution step - job get state error', {
161+
event: 'bulk-retry-iteration-job-getstate-error',
162+
...defaultLoggerMetadata,
163+
oldJobData: job.data,
164+
oldJobId: job.id,
165+
error,
166+
})
167+
168+
throw error
169+
}
170+
171+
logger.info('Bulk retrying execution step - start', {
172+
event: 'bulk-retry-iteration-start',
173+
...defaultLoggerMetadata,
174+
oldJobData: job.data,
175+
oldJobId: job.id,
176+
})
177+
178+
try {
179+
await job.remove()
180+
181+
const newJob = await enqueueActionJob({
182+
appKey: appKey,
183+
jobName: job.name,
184+
jobData: job.data,
185+
jobOptions: DEFAULT_JOB_OPTIONS,
186+
})
187+
await Execution.query().findById(executionId).patch({ status: null })
188+
await executionStep.$query().patch({ jobId: newJob.id })
189+
190+
logger.info('Bulk retrying iterations - done', {
191+
event: 'bulk-retry-iteration-done',
192+
...defaultLoggerMetadata,
193+
oldJobData: job.data,
194+
oldJobId: job.id,
195+
newJobId: '123',
196+
})
197+
} catch (error) {
198+
logger.error('Bulk retrying iterations - ERROR', {
199+
event: 'bulk-retry-iteration-failed',
200+
...defaultLoggerMetadata,
201+
oldJobData: job.data,
202+
oldJobId: job.id,
203+
error,
204+
})
205+
206+
throw error
207+
}
208+
})
209+
const currRetryAttempts = await Promise.allSettled(promises)
210+
retryAttempts.push(...currRetryAttempts)
211+
}
212+
213+
const allSuccessfullyRetried = !retryAttempts.find(
214+
(attempt) => attempt.status === 'rejected',
215+
)
216+
217+
if (!allSuccessfullyRetried) {
218+
// Actually we can do some more processing to see which IDs failed but nvm.
219+
logger.warn('Some attempts in bulk iteration retry failed', {
220+
event: 'bulk-retry-iteration-some-attempts-failed',
221+
executionId: params.input.executionId,
222+
})
223+
} else {
224+
logger.info('Bulk iteration retry succeeded', {
225+
event: 'bulk-retry-iteration-success',
226+
executionId: params.input.executionId,
227+
numRetried: failedExecutionSteps.length,
228+
})
229+
}
230+
231+
return {
232+
numFailedIterations: failedExecutionSteps.length,
233+
allSuccessfullyRetried,
234+
}
235+
}
236+
237+
export default bulkRetryIterations

packages/backend/src/graphql/schema.graphql

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,9 @@ type Mutation {
9090
bulkRetryExecutions(
9191
input: BulkRetryExecutionsInput
9292
): BulkRetryExecutionsResult!
93+
bulkRetryIterations(
94+
input: BulkRetryIterationsInput
95+
): BulkRetryIterationsResult!
9396
logout: Boolean
9497
loginWithSgid(input: LoginWithSgidInput!): LoginWithSgidResult!
9598
loginWithSelectedSgid(
@@ -413,6 +416,11 @@ type BulkRetryExecutionsResult {
413416
allSuccessfullyRetried: Boolean!
414417
}
415418

419+
type BulkRetryIterationsResult {
420+
numFailedIterations: Int!
421+
allSuccessfullyRetried: Boolean!
422+
}
423+
416424
input CreateConnectionInput {
417425
key: String!
418426
formattedData: JSONObject!
@@ -562,6 +570,11 @@ input BulkRetryExecutionsInput {
562570
executionIds: [String!]
563571
}
564572

573+
input BulkRetryIterationsInput {
574+
flowId: String!
575+
executionId: String!
576+
}
577+
565578
"""
566579
The `JSONObject` scalar type represents JSON objects as specified by [ECMA-404](http://www.ecma-international.org/publications/files/ECMA-ST/ECMA-404.pdf).
567580
"""

packages/frontend/src/components/ExecutionGroup/IterationSelector.tsx

Lines changed: 36 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,39 @@
1-
// import { IExecution } from '@plumber/types'
1+
import { IExecution, IExecutionStep } from '@plumber/types'
22

33
import { useMemo } from 'react'
44
import { Flex, Tag } from '@chakra-ui/react'
55

66
import { SingleSelect } from '@/components/SingleSelect'
77
import { type GroupedSteps } from '@/helpers/processExecutionSteps'
88

9-
// import { RetryAllButton } from '../ExecutionStep/components/RetryAllButton'
9+
import { RetryAllButton } from '../ExecutionStep/components/RetryAllButton'
10+
import RetryButton from '../ExecutionStep/components/RetryButton'
11+
1012
import { GroupStatusType } from './GroupStatusFilter'
1113

1214
interface IterationSelectorProps {
13-
// canRetryAll: boolean
14-
// execution: IExecution
15-
groupedSteps: GroupedSteps
15+
canRetryAll: boolean
16+
execution: IExecution
17+
iterations: GroupedSteps
1618
selectedIteration: string
19+
selectedIterationStep: {
20+
iteration: number
21+
steps: IExecutionStep[]
22+
status: string
23+
}
1724
setSelectedIteration: (iteration: string) => void
1825
}
1926

2027
export default function IterationSelector({
21-
// canRetryAll,
22-
// execution,
23-
groupedSteps,
28+
canRetryAll,
29+
execution,
30+
iterations,
2431
selectedIteration,
32+
selectedIterationStep,
2533
setSelectedIteration,
2634
}: IterationSelectorProps) {
2735
const items = useMemo(() => {
28-
return groupedSteps.map(({ iteration, status }) => ({
36+
return iterations.map(({ iteration, status }) => ({
2937
label: (
3038
<Flex alignItems="center" gap={4}>
3139
Item {iteration}
@@ -50,7 +58,19 @@ export default function IterationSelector({
5058
),
5159
value: iteration.toString(),
5260
}))
53-
}, [groupedSteps])
61+
}, [iterations])
62+
63+
const executionStepId: string | null = useMemo(() => {
64+
if (selectedIterationStep.status !== GroupStatusType.Failure) {
65+
return null
66+
}
67+
const executionSteps = selectedIterationStep.steps
68+
const executionStepId = executionSteps[executionSteps.length - 1].id
69+
return executionStepId
70+
}, [selectedIterationStep])
71+
72+
const canRetry =
73+
selectedIterationStep.status === GroupStatusType.Failure && executionStepId
5474

5575
return (
5676
<Flex justifyContent="space-between" alignItems="center">
@@ -64,8 +84,12 @@ export default function IterationSelector({
6484
isClearable={false}
6585
colorScheme="secondary"
6686
/>
67-
{/* TODO: add retry buttons */}
68-
{/* {canRetryAll && <RetryAllButton execution={execution} type="iteration" />} */}
87+
<Flex gap={2}>
88+
{canRetry && <RetryButton executionStepId={executionStepId} />}
89+
{canRetry && canRetryAll && (
90+
<RetryAllButton execution={execution} type="iteration" />
91+
)}
92+
</Flex>
6993
</Flex>
7094
)
7195
}

packages/frontend/src/components/ExecutionGroup/index.tsx

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,8 +114,11 @@ export default function ExecutionGroup(props: ExecutionGroupProps) {
114114
</HStack>
115115
<Flex p={4} pt={0} direction="column" gap={4}>
116116
<IterationSelector
117-
groupedSteps={iterationsToShow}
117+
canRetryAll={groupStats.failure > 0}
118+
execution={execution}
119+
iterations={iterationsToShow}
118120
selectedIteration={selectedIteration}
121+
selectedIterationStep={selectedIterationStep}
119122
setSelectedIteration={setSelectedIteration}
120123
/>
121124
<Grid mb={{ base: '16px', sm: '40px' }} rowGap={6}>

0 commit comments

Comments
 (0)