Skip to content

Commit 90965c5

Browse files
authored
Combine activity moving and affiliation recalculation (#2807)
1 parent 9952017 commit 90965c5

File tree

9 files changed

+173
-107
lines changed

9 files changed

+173
-107
lines changed

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,6 @@
44
},
55
"devDependencies": {
66
"husky": "^9.1.7"
7-
}
7+
},
8+
"packageManager": "pnpm@9.11.0+sha512.0a203ffaed5a3f63242cd064c8fb5892366c103e328079318f78062f24ea8c9d50bc6a47aa3567cabefd824d170e78fa2745ed1f16b132e16436146b7688f19b"
89
}

pnpm-lock.yaml

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

services/apps/entity_merging_worker/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
"@crowd/opensearch": "workspace:*",
1919
"@crowd/redis": "workspace:*",
2020
"@crowd/types": "workspace:*",
21+
"@crowd/queue": "workspace:*",
2122
"@temporalio/workflow": "~1.11.1",
2223
"tsx": "^4.7.1",
2324
"typescript": "^5.6.3"

services/apps/entity_merging_worker/src/activities.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
export {
22
deleteMember,
3-
moveActivitiesBetweenMembers,
43
moveActivitiesWithIdentityToAnotherMember,
54
recalculateActivityAffiliationsOfMemberAsync,
65
syncMember,
76
notifyFrontendMemberMergeSuccessful,
87
notifyFrontendMemberUnmergeSuccessful,
98
syncRemoveMember,
9+
finishMemberMergingUpdateActivities,
10+
finishMemberUnmergingUpdateActivities,
1011
} from './activities/members'
1112

1213
export {

services/apps/entity_merging_worker/src/activities/members.ts

Lines changed: 75 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,18 @@
11
import { WorkflowIdReusePolicy } from '@temporalio/workflow'
22

3+
import { updateActivities } from '@crowd/data-access-layer/src/activities/update'
34
import { cleanupMemberAggregates } from '@crowd/data-access-layer/src/members/segments'
5+
import { IDbActivityCreateData } from '@crowd/data-access-layer/src/old/apps/data_sink_worker/repo/activity.data'
46
import {
57
cleanupMember,
68
deleteMemberSegments,
79
findMemberById,
810
getIdentitiesWithActivity,
9-
moveActivitiesToNewMember,
1011
moveIdentityActivitiesToNewMember,
1112
} from '@crowd/data-access-layer/src/old/apps/entity_merging_worker'
12-
import { dbStoreQx } from '@crowd/data-access-layer/src/queryExecutor'
13+
import { figureOutNewOrgId } from '@crowd/data-access-layer/src/old/apps/profiles_worker'
14+
import { prepareMemberAffiliationsUpdate } from '@crowd/data-access-layer/src/old/apps/profiles_worker'
15+
import { dbStoreQx, pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
1316
import { SearchSyncApiClient } from '@crowd/opensearch'
1417
import { RedisPubSubEmitter } from '@crowd/redis'
1518
import {
@@ -28,19 +31,6 @@ export async function deleteMember(memberId: string): Promise<void> {
2831
await cleanupMember(svc.postgres.writer, memberId)
2932
}
3033

31-
export async function moveActivitiesBetweenMembers(
32-
primaryId: string,
33-
secondaryId: string,
34-
tenantId: string,
35-
): Promise<void> {
36-
const memberExists = await findMemberById(svc.postgres.writer, primaryId, tenantId)
37-
38-
if (!memberExists) {
39-
return
40-
}
41-
await moveActivitiesToNewMember(svc.questdbSQL, svc.queue, primaryId, secondaryId, tenantId)
42-
}
43-
4434
export async function moveActivitiesWithIdentityToAnotherMember(
4535
fromId: string,
4636
toId: string,
@@ -187,3 +177,73 @@ export async function notifyFrontendMemberUnmergeSuccessful(
187177
),
188178
)
189179
}
180+
181+
export async function finishMemberMergingUpdateActivities(memberId: string, newMemberId: string) {
182+
const pgDb = svc.postgres.reader
183+
const qDb = svc.questdbSQL
184+
const queueClient = svc.queue
185+
186+
const qx = pgpQx(pgDb.connection())
187+
const { orgCases } = await prepareMemberAffiliationsUpdate(qx, memberId)
188+
189+
await updateActivities(
190+
qDb,
191+
queueClient,
192+
[
193+
async () => ({ memberId: newMemberId }),
194+
async (activity) => ({ organizationId: figureOutNewOrgId(activity, orgCases) }),
195+
],
196+
`"memberId" = $(memberId)`,
197+
{
198+
memberId,
199+
},
200+
)
201+
}
202+
203+
function moveByIdentities({
204+
activity,
205+
identities,
206+
newMemberId,
207+
}: {
208+
activity: IDbActivityCreateData
209+
identities: IMemberIdentity[]
210+
newMemberId: string
211+
}): Partial<IDbActivityCreateData> {
212+
const { platform, username } = activity
213+
const activityMatches = identities.some(
214+
(i) =>
215+
i.type === MemberIdentityType.USERNAME && i.platform === platform && i.value === username,
216+
)
217+
218+
return activityMatches ? { memberId: newMemberId } : {}
219+
}
220+
221+
export async function finishMemberUnmergingUpdateActivities({
222+
memberId,
223+
newMemberId,
224+
identities,
225+
}: {
226+
memberId: string
227+
newMemberId: string
228+
identities: IMemberIdentity[]
229+
}) {
230+
const pgDb = svc.postgres.reader
231+
const qDb = svc.questdbSQL
232+
const queueClient = svc.queue
233+
234+
const qx = pgpQx(pgDb.connection())
235+
const { orgCases } = await prepareMemberAffiliationsUpdate(qx, memberId)
236+
237+
await updateActivities(
238+
qDb,
239+
queueClient,
240+
[
241+
async (activity) => moveByIdentities({ activity, identities, newMemberId }),
242+
async (activity) => ({
243+
organizationId: figureOutNewOrgId(activity, orgCases),
244+
}),
245+
],
246+
`"memberId" = $(memberId)`,
247+
{ memberId },
248+
)
249+
}

services/apps/entity_merging_worker/src/workflows/all.ts

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,10 @@ import * as activities from '../activities'
66

77
const {
88
deleteMember,
9-
moveActivitiesBetweenMembers,
109
deleteOrganization,
1110
moveActivitiesBetweenOrgs,
1211
notifyFrontendOrganizationMergeSuccessful,
1312
notifyFrontendOrganizationUnmergeSuccessful,
14-
moveActivitiesWithIdentityToAnotherMember,
1513
recalculateActivityAffiliationsOfMemberAsync,
1614
recalculateActivityAffiliationsOfOrganizationSynchronous,
1715
setMergeAction,
@@ -20,6 +18,8 @@ const {
2018
notifyFrontendMemberMergeSuccessful,
2119
notifyFrontendMemberUnmergeSuccessful,
2220
syncRemoveMember,
21+
finishMemberMergingUpdateActivities,
22+
finishMemberUnmergingUpdateActivities,
2323
} = proxyActivities<typeof activities>({
2424
startToCloseTimeout: '60 minutes',
2525
})
@@ -35,8 +35,9 @@ export async function finishMemberMerging(
3535
await setMergeAction(primaryId, secondaryId, tenantId, {
3636
step: MergeActionStep.MERGE_ASYNC_STARTED,
3737
})
38-
await moveActivitiesBetweenMembers(primaryId, secondaryId, tenantId)
39-
await recalculateActivityAffiliationsOfMemberAsync(primaryId, tenantId)
38+
39+
await finishMemberMergingUpdateActivities(primaryId, secondaryId)
40+
4041
await syncMember(primaryId)
4142
await syncRemoveMember(secondaryId)
4243
await deleteMember(secondaryId)
@@ -66,7 +67,12 @@ export async function finishMemberUnmerging(
6667
await setMergeAction(primaryId, secondaryId, tenantId, {
6768
step: MergeActionStep.UNMERGE_ASYNC_STARTED,
6869
})
69-
await moveActivitiesWithIdentityToAnotherMember(primaryId, secondaryId, identities, tenantId)
70+
71+
await finishMemberUnmergingUpdateActivities({
72+
memberId: primaryId,
73+
newMemberId: secondaryId,
74+
identities,
75+
})
7076
await syncMember(primaryId)
7177
await syncMember(secondaryId)
7278
await recalculateActivityAffiliationsOfMemberAsync(primaryId, tenantId)

services/libs/data-access-layer/src/activities/update.ts

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -53,26 +53,42 @@ export async function streamActivities(
5353
})
5454
}
5555

56+
export type MapActivityFunction = (
57+
activity: IDbActivityCreateData,
58+
) => Promise<Partial<IDbActivityCreateData>>
59+
5660
export async function updateActivities(
5761
qdb: DbConnOrTx,
5862
queueClient: IQueue,
59-
mapActivity: (activity: IDbActivityCreateData) => Promise<Partial<IDbActivityCreateData>>,
63+
mapActivity: MapActivityFunction | MapActivityFunction[],
6064
where: string,
6165
params?: Record<string, unknown>,
6266
): Promise<{ processed: number; duration: number }> {
67+
async function mapNewActivity(
68+
activity: IDbActivityCreateData,
69+
mapActivity: MapActivityFunction | MapActivityFunction[],
70+
): Promise<IDbActivityCreateData> {
71+
let newActivity = activity
72+
73+
if (!Array.isArray(mapActivity)) {
74+
mapActivity = [mapActivity]
75+
}
76+
77+
for (const map of mapActivity) {
78+
newActivity = {
79+
...newActivity,
80+
...(await map(newActivity)),
81+
}
82+
}
83+
84+
return newActivity
85+
}
86+
6387
return streamActivities(
6488
qdb,
6589
async (activity) => {
66-
await insertActivities(
67-
queueClient,
68-
[
69-
{
70-
...activity,
71-
...(await mapActivity(activity)),
72-
},
73-
],
74-
true,
75-
)
90+
const newActivity = await mapNewActivity(activity, mapActivity)
91+
await insertActivities(queueClient, [newActivity], true)
7692
},
7793
where,
7894
params,

services/libs/data-access-layer/src/old/apps/entity_merging_worker/index.ts

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -40,25 +40,6 @@ export async function findMemberById(db: DbStore, primaryId: string, tenantId: s
4040
)
4141
}
4242

43-
export async function moveActivitiesToNewMember(
44-
qdb: DbConnOrTx,
45-
queueClient: IQueue,
46-
primaryId: string,
47-
secondaryId: string,
48-
tenantId: string,
49-
) {
50-
await updateActivities(
51-
qdb,
52-
queueClient,
53-
async () => ({ memberId: primaryId }),
54-
`"memberId" = $(memberId) AND "tenantId" = $(tenantId)`,
55-
{
56-
memberId: secondaryId,
57-
tenantId,
58-
},
59-
)
60-
}
61-
6243
export async function updateMergeActionState(
6344
db: DbStore,
6445
primaryId: string,

0 commit comments

Comments
 (0)