Skip to content

Commit dd3e8ea

Browse files
authored
organization aggs sync improvements (#2860)
1 parent 17d0ab9 commit dd3e8ea

File tree

15 files changed

+283
-113
lines changed

15 files changed

+283
-113
lines changed

backend/src/services/member/memberOrganizationsService.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ export default class MemberOrganizationsService extends LoggerBase {
169169
memberId,
170170
[memberOrganizationToBeDeleted.organizationId],
171171
repositoryOptions,
172+
true,
172173
)
173174

174175
const result = await this.list(memberId)

services/apps/profiles_worker/src/activities/member/memberUpdate.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@ export async function updateMemberAffiliations(input: MemberUpdateInput): Promis
2121
}
2222
}
2323

24-
export async function syncMember(memberId: string): Promise<void> {
24+
export async function syncMember(memberId: string, withAggs: boolean): Promise<void> {
2525
const syncApi = new SearchSyncApiClient({
2626
baseUrl: process.env['CROWD_SEARCH_SYNC_API_URL'],
2727
})
2828

29-
await syncApi.triggerMemberSync(memberId)
29+
await syncApi.triggerMemberSync(memberId, { withAggs })
3030
}

services/apps/profiles_worker/src/workflows/member/memberUpdate.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ export async function memberUpdate(input: MemberUpdateInput): Promise<void> {
2020
await updateMemberAffiliations(input)
2121
if (input.syncToOpensearch) {
2222
// sync member
23-
await syncMember(input.member.id)
23+
await syncMember(input.member.id, input.syncToOpensearch)
2424
// sync all member organizations
2525
const organizationIds = input.memberOrganizationIds || []
2626
for (const orgId of organizationIds) {

services/apps/script_executor_worker/src/activities.ts

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,9 @@ import {
2828
markActivitiesAsIndexed,
2929
resetIndexedIdentities,
3030
} from './activities/populate-activity-relations'
31-
import {
32-
deleteIndexedEntities,
33-
getMembersForSync,
34-
markEntitiesIndexed,
35-
syncMembersBatch,
36-
} from './activities/sync/member'
31+
import { deleteIndexedEntities, markEntitiesIndexed } from './activities/sync/entity-index'
32+
import { getMembersForSync, syncMembersBatch } from './activities/sync/member'
33+
import { getOrganizationsForSync, syncOrganizationsBatch } from './activities/sync/organization'
3734

3835
export {
3936
findMembersWithSameVerifiedEmailsInDifferentPlatforms,
@@ -58,6 +55,8 @@ export {
5855
markActivitiesAsIndexed,
5956
syncMembersBatch,
6057
getMembersForSync,
58+
getOrganizationsForSync,
59+
syncOrganizationsBatch,
6160
deleteIndexedEntities,
6261
markEntitiesIndexed,
6362
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
import { IndexedEntityType } from '@crowd/opensearch/src/repo/indexing.data'
2+
import { IndexingRepository } from '@crowd/opensearch/src/repo/indexing.repo'
3+
4+
import { svc } from '../../main'
5+
6+
export async function deleteIndexedEntities(entityType: IndexedEntityType): Promise<void> {
7+
try {
8+
const indexingRepo = new IndexingRepository(svc.postgres.writer, svc.log)
9+
await indexingRepo.deleteIndexedEntities(entityType)
10+
} catch (error) {
11+
svc.log.error(error, 'Error deleting indexed entities')
12+
throw error
13+
}
14+
}
15+
16+
export async function markEntitiesIndexed(
17+
entityType: IndexedEntityType,
18+
entityIds: string[],
19+
): Promise<void> {
20+
try {
21+
const indexingRepo = new IndexingRepository(svc.postgres.writer, svc.log)
22+
await indexingRepo.markEntitiesIndexed(entityType, entityIds)
23+
} catch (error) {
24+
svc.log.error(error, 'Error marking entities indexed')
25+
throw error
26+
}
27+
}

services/apps/script_executor_worker/src/activities/sync/member.ts

Lines changed: 8 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,18 @@
11
import { sumBy } from '@crowd/common'
22
import { DbStore } from '@crowd/data-access-layer/src/database'
33
import { MemberSyncService } from '@crowd/opensearch'
4-
import { IndexedEntityType } from '@crowd/opensearch/src/repo/indexing.data'
5-
import { IndexingRepository } from '@crowd/opensearch/src/repo/indexing.repo'
64
import { MemberRepository } from '@crowd/opensearch/src/repo/member.repo'
75

86
import { svc } from '../../main'
97

10-
export async function deleteIndexedEntities(entityType: IndexedEntityType): Promise<void> {
11-
const indexingRepo = new IndexingRepository(svc.postgres.writer, svc.log)
12-
await indexingRepo.deleteIndexedEntities(entityType)
13-
}
14-
15-
export async function markEntitiesIndexed(
16-
entityType: IndexedEntityType,
17-
entityIds: string[],
18-
): Promise<void> {
19-
const indexingRepo = new IndexingRepository(svc.postgres.writer, svc.log)
20-
await indexingRepo.markEntitiesIndexed(entityType, entityIds)
21-
}
22-
238
export async function getMembersForSync(batchSize: number): Promise<string[]> {
24-
const memberRepo = new MemberRepository(svc.redis, svc.postgres.reader, svc.log)
25-
return memberRepo.getMembersForSync(batchSize)
9+
try {
10+
const memberRepo = new MemberRepository(svc.redis, svc.postgres.reader, svc.log)
11+
return memberRepo.getMembersForSync(batchSize)
12+
} catch (error) {
13+
svc.log.error(error, 'Error getting members for sync')
14+
throw error
15+
}
2616
}
2717

2818
export async function syncMembersBatch(
@@ -41,7 +31,7 @@ export async function syncMembersBatch(
4131

4232
const CHUNK_SIZE = chunkSize || 10
4333

44-
svc.log.info(`Syncing members in chunks of ${CHUNK_SIZE} members!`)
34+
svc.log.info(`Syncing members in chunks of ${CHUNK_SIZE}!`)
4535

4636
const results = []
4737
for (let i = 0; i < memberIds.length; i += CHUNK_SIZE) {
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
import { sumBy } from '@crowd/common'
2+
import { DbStore } from '@crowd/data-access-layer/src/database'
3+
import { OrganizationSyncService } from '@crowd/opensearch'
4+
import { OrganizationRepository } from '@crowd/opensearch/src/repo/organization.repo'
5+
6+
import { svc } from '../../main'
7+
8+
export async function getOrganizationsForSync(batchSize: number): Promise<string[]> {
9+
try {
10+
const organizationRepo = new OrganizationRepository(svc.postgres.reader, svc.log)
11+
return organizationRepo.getOrganizationsForSync(batchSize)
12+
} catch (error) {
13+
svc.log.error(error, 'Error getting organizations for sync')
14+
throw error
15+
}
16+
}
17+
18+
export async function syncOrganizationsBatch(
19+
organizationIds: string[],
20+
withAggs: boolean,
21+
chunkSize?: number,
22+
): Promise<{ docCount: number; organizationCount: number }> {
23+
try {
24+
const service = new OrganizationSyncService(
25+
new DbStore(svc.log, svc.questdbSQL),
26+
svc.postgres.writer,
27+
svc.opensearch,
28+
svc.log,
29+
)
30+
31+
const CHUNK_SIZE = chunkSize || 10
32+
33+
svc.log.info(`Syncing orgs in chunks of ${CHUNK_SIZE}!`)
34+
35+
const results = []
36+
for (let i = 0; i < organizationIds.length; i += CHUNK_SIZE) {
37+
const chunk = organizationIds.slice(i, i + CHUNK_SIZE)
38+
const chunkResults = await Promise.all(
39+
chunk.map((organizationId) => service.syncOrganizations([organizationId], { withAggs })),
40+
)
41+
results.push(...chunkResults)
42+
}
43+
44+
return {
45+
docCount: sumBy(results, (r) => r.documentsIndexed),
46+
organizationCount: sumBy(results, (r) => r.organizationsSynced),
47+
}
48+
} catch (err) {
49+
throw new Error(err)
50+
}
51+
}

services/apps/script_executor_worker/src/main.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ const options: Options = {
2626
opensearch: {
2727
enabled: true,
2828
},
29+
queue: {
30+
enabled: true,
31+
},
2932
}
3033

3134
export const svc = new ServiceWorker(config, options)

services/apps/script_executor_worker/src/types.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,14 @@ export interface IPopulateActivityRelationsArgs {
2525
deleteIndexedEntities?: boolean
2626
latestSyncedActivityTimestamp?: string
2727
}
28-
export interface ISyncMembersArgs {
28+
export interface ISyncArgs {
2929
batchSize?: number
3030
chunkSize?: number
3131
clean?: boolean
3232
withAggs?: boolean
3333
testRun?: boolean
3434
}
35+
36+
export interface IUnlinkDeletedMemberOrgArgs {
37+
memberId: string
38+
}

services/apps/script_executor_worker/src/workflows.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { findAndMergeMembersWithSameVerifiedEmailsInDifferentPlatforms } from '.
44
import { fixOrgIdentitiesWithWrongUrls } from './workflows/fixOrgIdentitiesWithWrongUrls'
55
import { populateActivityRelations } from './workflows/populateActivityRelations'
66
import { syncMembers } from './workflows/syncMembers'
7+
import { syncOrganizations } from './workflows/syncOrganizations'
78

89
export {
910
findAndMergeMembersWithSameVerifiedEmailsInDifferentPlatforms,
@@ -12,4 +13,5 @@ export {
1213
fixOrgIdentitiesWithWrongUrls,
1314
populateActivityRelations,
1415
syncMembers,
16+
syncOrganizations,
1517
}

0 commit comments

Comments
 (0)