@@ -3,14 +3,20 @@ import { proxyActivities } from '@temporalio/workflow'
33import { updateActivities } from '@crowd/data-access-layer/src/activities/update'
44import { DbStore } from '@crowd/data-access-layer/src/database'
55import { DbConnOrTx } from '@crowd/data-access-layer/src/database'
6+ import { IDbActivityCreateData } from '@crowd/data-access-layer/src/old/apps/data_sink_worker/repo/activity.data'
67import {
78 figureOutNewOrgId ,
89 prepareMemberAffiliationsUpdate ,
910} from '@crowd/data-access-layer/src/old/apps/profiles_worker'
1011import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
11- import { IMemberIdentity , MergeActionState , MergeActionStep } from '@crowd/types'
12+ import { IQueue } from '@crowd/queue'
13+ import {
14+ IMemberIdentity ,
15+ MemberIdentityType ,
16+ MergeActionState ,
17+ MergeActionStep ,
18+ } from '@crowd/types'
1219
13- import { IQueue } from '../../../../libs/queue/src/types'
1420import * as activities from '../activities'
1521import { svc } from '../main'
1622
@@ -20,7 +26,6 @@ const {
2026 moveActivitiesBetweenOrgs,
2127 notifyFrontendOrganizationMergeSuccessful,
2228 notifyFrontendOrganizationUnmergeSuccessful,
23- moveActivitiesWithIdentityToAnotherMember,
2429 recalculateActivityAffiliationsOfMemberAsync,
2530 recalculateActivityAffiliationsOfOrganizationSynchronous,
2631 setMergeAction,
@@ -94,6 +99,56 @@ export async function finishMemberMerging(
9499 )
95100}
96101
102+ function moveByIdentities ( {
103+ activity,
104+ identities,
105+ newMemberId,
106+ } : {
107+ activity : IDbActivityCreateData
108+ identities : IMemberIdentity [ ]
109+ newMemberId : string
110+ } ) : Partial < IDbActivityCreateData > {
111+ const { platform, username } = activity
112+ const activityMatches = identities . some (
113+ ( i ) =>
114+ i . type === MemberIdentityType . USERNAME && i . platform === platform && i . value === username ,
115+ )
116+
117+ return activityMatches ? { memberId : newMemberId } : { }
118+ }
119+
120+ export async function finishMemberUnmergingUpdateActivities ( {
121+ pgDb,
122+ qDb,
123+ queueClient,
124+ memberId,
125+ newMemberId,
126+ identities,
127+ } : {
128+ pgDb : DbStore
129+ qDb : DbConnOrTx
130+ queueClient : IQueue
131+ memberId : string
132+ newMemberId : string
133+ identities : IMemberIdentity [ ]
134+ } ) {
135+ const qx = pgpQx ( pgDb . connection ( ) )
136+ const { orgCases } = await prepareMemberAffiliationsUpdate ( qx , memberId )
137+
138+ await updateActivities (
139+ qDb ,
140+ queueClient ,
141+ [
142+ async ( activity ) => moveByIdentities ( { activity, identities, newMemberId } ) ,
143+ async ( activity ) => ( {
144+ organizationId : figureOutNewOrgId ( activity , orgCases ) ,
145+ } ) ,
146+ ] ,
147+ `"memberId" = $(memberId)` ,
148+ { memberId } ,
149+ )
150+ }
151+
97152export async function finishMemberUnmerging (
98153 primaryId : string ,
99154 secondaryId : string ,
@@ -106,7 +161,15 @@ export async function finishMemberUnmerging(
106161 await setMergeAction ( primaryId , secondaryId , tenantId , {
107162 step : MergeActionStep . UNMERGE_ASYNC_STARTED ,
108163 } )
109- await moveActivitiesWithIdentityToAnotherMember ( primaryId , secondaryId , identities , tenantId )
164+
165+ await finishMemberUnmergingUpdateActivities ( {
166+ pgDb : svc . postgres . reader ,
167+ qDb : svc . questdbSQL ,
168+ queueClient : svc . queue ,
169+ memberId : primaryId ,
170+ newMemberId : secondaryId ,
171+ identities,
172+ } )
110173 await syncMember ( primaryId )
111174 await syncMember ( secondaryId )
112175 await recalculateActivityAffiliationsOfMemberAsync ( primaryId , tenantId )
0 commit comments