Skip to content

Commit c3d6bb5

Browse files
committed
opensearch sync issues fixed when syncing unmerged members with no activities
1 parent 0ddb7f6 commit c3d6bb5

File tree

9 files changed

+104
-35
lines changed

9 files changed

+104
-35
lines changed

services/apps/entity_merging_worker/src/activities/members.ts

+21-2
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ import {
1010
findMemberById,
1111
moveActivitiesToNewMember,
1212
moveIdentityActivitiesToNewMember,
13+
findMemberSegments,
14+
markMemberAsManuallyCreated,
1315
} from '@crowd/data-access-layer/src/old/apps/entity_merging_worker'
1416

1517
export async function deleteMember(memberId: string): Promise<void> {
@@ -78,12 +80,29 @@ export async function recalculateActivityAffiliations(
7880
})
7981
}
8082

81-
export async function syncMember(memberId: string): Promise<void> {
83+
export async function syncMember(memberId: string, secondaryMemberId: string): Promise<void> {
8284
const syncApi = new SearchSyncApiClient({
8385
baseUrl: process.env['CROWD_SEARCH_SYNC_API_URL'],
8486
})
8587

86-
await syncApi.triggerMemberSync(memberId)
88+
// check if member has any activities
89+
const result = await findMemberSegments(svc.postgres.writer, memberId)
90+
91+
if (result.segmentIds) {
92+
// segment information can be deduced from activities, no need to send segmentIds explicitly on merging
93+
await syncApi.triggerMemberSync(memberId)
94+
return
95+
}
96+
97+
// check if secondary member has any activities
98+
const secondaryResult = await findMemberSegments(svc.postgres.writer, secondaryMemberId)
99+
100+
if (secondaryResult.segmentIds) {
101+
// mark member as manually created
102+
await markMemberAsManuallyCreated(svc.postgres.writer, memberId)
103+
// member doesn't have any activity to deduce segmentIds for syncing, use the secondary member's activity segments
104+
await syncApi.triggerMemberSync(memberId, secondaryResult.segmentIds)
105+
}
87106
}
88107

89108
export async function notifyFrontendMemberUnmergeSuccessful(

services/apps/entity_merging_worker/src/workflows/all.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ export async function finishMemberUnmerging(
3939
userId: string,
4040
): Promise<void> {
4141
await moveActivitiesWithIdentityToAnotherMember(primaryId, secondaryId, identities, tenantId)
42-
await syncMember(primaryId)
43-
await syncMember(secondaryId)
42+
await syncMember(primaryId, secondaryId)
43+
await syncMember(secondaryId, primaryId)
4444
await recalculateActivityAffiliations(primaryId, tenantId)
4545
await recalculateActivityAffiliations(secondaryId, tenantId)
4646
await setMergeActionState(primaryId, secondaryId, tenantId, 'unmerged' as MergeActionState)

services/apps/search_sync_api/src/routes/member.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@ router.post(
1818
serviceConfig,
1919
)
2020

21-
const { memberIds } = req.body
21+
const { memberIds, segmentIds } = req.body
2222
try {
2323
req.log.trace(`Calling memberSyncService.syncMembers for ${memberIds}`)
24-
await memberSyncService.syncMembers(memberIds)
24+
await memberSyncService.syncMembers(memberIds, segmentIds)
2525
res.sendStatus(200)
2626
} catch (error) {
2727
res.status(500).send(error.message)

services/libs/data-access-layer/src/old/apps/entity_merging_worker/index.ts

+23
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { DbStore } from '@crowd/database'
22
import { MergeActionState } from '@crowd/types'
3+
import { ISegmentIds } from './types'
34

45
export async function deleteMemberSegments(db: DbStore, memberId: string) {
56
return db.connection().query(
@@ -90,3 +91,25 @@ export async function moveIdentityActivitiesToNewMember(
9091
[toId, fromId, tenantId, username, platform],
9192
)
9293
}
94+
95+
export async function findMemberSegments(db: DbStore, memberId: string): Promise<ISegmentIds> {
96+
const result = await db.connection().one(
97+
`
98+
SELECT array_agg(distinct "segmentId") as "segmentIds"
99+
FROM activities
100+
WHERE "memberId" = $1
101+
`,
102+
[memberId],
103+
)
104+
return result as ISegmentIds
105+
}
106+
107+
export async function markMemberAsManuallyCreated(db: DbStore, memberId: string): Promise<void> {
108+
return db.connection().query(
109+
`
110+
UPDATE members set "manuallyCreated" = true
111+
WHERE "id" = $1
112+
`,
113+
[memberId],
114+
)
115+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
export interface ISegmentIds {
2+
segmentIds: string[]
3+
}

services/libs/opensearch/src/apiClient.ts

+2-1
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,14 @@ export class SearchSyncApiClient {
1212
})
1313
}
1414

15-
public async triggerMemberSync(memberId: string): Promise<void> {
15+
public async triggerMemberSync(memberId: string, segmentIds?: string[]): Promise<void> {
1616
if (!memberId) {
1717
throw new Error('memberId is required!')
1818
}
1919

2020
await this.searchSyncApi.post('/sync/members', {
2121
memberIds: [memberId],
22+
segmentIds,
2223
})
2324
}
2425

services/libs/opensearch/src/repo/member.data.ts

+5
Original file line numberDiff line numberDiff line change
@@ -102,3 +102,8 @@ export interface IMemberSegmentMatrixItem {
102102
export interface IMemberSegmentMatrix {
103103
[key: string]: IMemberSegmentMatrixItem[]
104104
}
105+
106+
export interface IMemberSegment {
107+
memberId: string
108+
segmentId: string
109+
}

services/libs/opensearch/src/repo/member.repo.ts

+44-26
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { DbStore, RepositoryBase } from '@crowd/database'
22
import { Logger } from '@crowd/logging'
33
import { RedisCache, RedisClient } from '@crowd/redis'
44
import { IMemberAttribute } from '@crowd/types'
5-
import { IDbMemberSyncData, IMemberSegmentMatrix } from './member.data'
5+
import { IDbMemberSyncData, IMemberSegment, IMemberSegmentMatrix } from './member.data'
66

77
export class MemberRepository extends RepositoryBase<MemberRepository> {
88
private readonly cache: RedisCache
@@ -586,40 +586,58 @@ export class MemberRepository extends RepositoryBase<MemberRepository> {
586586
return results.map((r) => r.id)
587587
}
588588

589-
public async getMemberSegmentCouples(ids): Promise<IMemberSegmentMatrix> {
590-
let results = await this.db().any(
591-
`
592-
select distinct a."segmentId", a."memberId"
593-
from activities a
594-
where a."memberId" in ($(ids:csv));
595-
`,
596-
{
597-
ids,
598-
},
599-
)
600-
601-
// Manually created members don't have any activities,
602-
// filter out those memberIds that are not in the results
603-
const manuallyCreatedIds = ids.filter((id) => !results.some((r) => r.memberId === id))
604-
605-
// memberSegments aren't maintained well, so use as a fallback for manuallyCreated members
606-
if (manuallyCreatedIds.length > 0) {
607-
const missingResults = await this.db().any(
589+
public async getMemberSegmentCouples(
590+
memberIds: string[],
591+
segmentIds?: string[],
592+
): Promise<IMemberSegmentMatrix> {
593+
let memberSegments: IMemberSegment[] = []
594+
595+
if (segmentIds && segmentIds.length > 0) {
596+
for (const memberId of memberIds) {
597+
for (const segmentId of segmentIds) {
598+
memberSegments.push({
599+
memberId,
600+
segmentId,
601+
})
602+
}
603+
}
604+
} else {
605+
memberSegments = await this.db().any(
608606
`
609-
select distinct ms."segmentId", ms."memberId"
610-
from "memberSegments" ms
611-
where ms."memberId" in ($(manuallyCreatedIds:csv));
607+
select distinct a."segmentId", a."memberId"
608+
from activities a
609+
where a."memberId" in ($(ids:csv));
612610
`,
613611
{
614-
manuallyCreatedIds,
612+
ids: memberIds,
615613
},
616614
)
617-
results = [...results, ...missingResults]
615+
616+
// Manually created members don't have any activities,
617+
// filter out those memberIds that are not in the results
618+
const manuallyCreatedIds = memberIds.filter(
619+
(id) => !memberSegments.some((r) => r.memberId === id),
620+
)
621+
622+
// memberSegments aren't maintained well, so use as a fallback for manuallyCreated members
623+
if (manuallyCreatedIds.length > 0) {
624+
const missingResults = await this.db().any(
625+
`
626+
select distinct ms."segmentId", ms."memberId"
627+
from "memberSegments" ms
628+
where ms."memberId" in ($(manuallyCreatedIds:csv));
629+
`,
630+
{
631+
manuallyCreatedIds,
632+
},
633+
)
634+
memberSegments = [...memberSegments, ...missingResults]
635+
}
618636
}
619637

620638
const matrix = {}
621639

622-
for (const memberSegment of results) {
640+
for (const memberSegment of memberSegments) {
623641
if (!matrix[memberSegment.memberId]) {
624642
matrix[memberSegment.memberId] = [
625643
{

services/libs/opensearch/src/service/member.sync.service.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -312,13 +312,13 @@ export class MemberSyncService {
312312
)
313313
}
314314

315-
public async syncMembers(memberIds: string[]): Promise<IMemberSyncResult> {
315+
public async syncMembers(memberIds: string[], segmentIds?: string[]): Promise<IMemberSyncResult> {
316316
const CONCURRENT_DATABASE_QUERIES = 25
317317
const BULK_INDEX_DOCUMENT_BATCH_SIZE = 2500
318318

319319
// get all memberId-segmentId couples
320320
const memberSegmentCouples: IMemberSegmentMatrix =
321-
await this.memberRepo.getMemberSegmentCouples(memberIds)
321+
await this.memberRepo.getMemberSegmentCouples(memberIds, segmentIds)
322322
let databaseStream = []
323323
let syncStream = []
324324
let documentsIndexed = 0

0 commit comments

Comments
 (0)