1
1
import { continueAsNew , proxyActivities } from '@temporalio/workflow'
2
2
3
- import * as commonActivities from '../activities/common'
4
- import * as activities from '../activities/fix-deleted-member-org-affilations'
5
- import * as syncActivities from '../activities/sync/member'
3
+ import * as activities from '../activities'
6
4
import { IFixDeletedMemberOrgAffilationsArgs } from '../types'
7
5
8
- const activity = proxyActivities < typeof activities > ( {
6
+ const {
7
+ getProcessedMemberOrgAffiliations,
8
+ calculateMemberAffiliations,
9
+ syncMembersBatch,
10
+ queueOrgForAggComputation,
11
+ deleteProcessedMemberOrgAffiliations,
12
+ } = proxyActivities < typeof activities > ( {
9
13
startToCloseTimeout : '45 minutes' ,
10
14
} )
11
15
12
- const syncActivity = proxyActivities < typeof syncActivities > ( {
13
- startToCloseTimeout : '30 minutes' ,
14
- } )
15
-
16
- const commonActivity = proxyActivities < typeof commonActivities > ( {
17
- startToCloseTimeout : '30 minutes' ,
18
- } )
19
-
20
16
export async function fixDeletedMemberOrgAffilations (
21
17
args : IFixDeletedMemberOrgAffilationsArgs ,
22
18
) : Promise < void > {
@@ -27,29 +23,29 @@ export async function fixDeletedMemberOrgAffilations(
27
23
)
28
24
29
25
// Find affected memberId and orgId
30
- const affectedMembers = await activity . getProcessedMemberOrgAffiliations ( BATCH_SIZE )
26
+ const affectedMembers = await getProcessedMemberOrgAffiliations ( BATCH_SIZE )
31
27
32
28
if ( affectedMembers . length === 0 ) {
33
29
console . log ( 'No processed member org affiliations found!' )
34
30
return
35
31
}
36
32
37
- const CHUNK_SIZE = 10
33
+ const CHUNK_SIZE = 25
38
34
for ( let i = 0 ; i < affectedMembers . length ; i += CHUNK_SIZE ) {
39
35
const chunk = affectedMembers . slice ( i , i + CHUNK_SIZE )
40
36
await Promise . all (
41
37
chunk . map ( async ( { memberId, organizationId } ) => {
42
38
// Calculate affiliation
43
- await activity . calculateMemberAffiliations ( memberId )
39
+ await calculateMemberAffiliations ( memberId )
44
40
45
41
// Sync member
46
- await syncActivity . syncMembersBatch ( [ memberId ] , true )
42
+ await syncMembersBatch ( [ memberId ] , true )
47
43
48
44
// Add orgId to redisCache
49
45
// It will be picked up by the spawnOrganizationAggregatesComputation workflow
50
- await commonActivity . queueOrganizationForAggComputation ( organizationId )
46
+ await queueOrgForAggComputation ( organizationId )
51
47
52
- await activity . deleteProcessedMemberOrgAffiliations ( memberId , organizationId )
48
+ await deleteProcessedMemberOrgAffiliations ( memberId , organizationId )
53
49
} ) ,
54
50
)
55
51
}
0 commit comments