Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

search sync worker optimizations #2241

Merged
merged 2 commits into from
Feb 29, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions services/apps/data_sink_worker/src/bin/map-member-to-org.ts
Original file line number Diff line number Diff line change
@@ -113,10 +113,15 @@ setImmediate(async () => {
orgService.addToMember(member.tenantId, segmentId, member.id, orgs)

for (const org of orgs) {
await searchSyncWorkerEmitter.triggerOrganizationSync(member.tenantId, org.id, true)
await searchSyncWorkerEmitter.triggerOrganizationSync(
member.tenantId,
org.id,
true,
segmentId,
)
}

await searchSyncWorkerEmitter.triggerMemberSync(member.tenantId, member.id, true)
await searchSyncWorkerEmitter.triggerMemberSync(member.tenantId, member.id, true, segmentId)
log.info('Done mapping member to organizations!')
} else {
log.info('No organizations found with matching email domains!')
Original file line number Diff line number Diff line change
@@ -127,10 +127,15 @@ setImmediate(async () => {
orgService.addToMember(tenantId, segmentId, member.id, orgs)

for (const org of orgs) {
await searchSyncWorkerEmitter.triggerOrganizationSync(tenantId, org.id, true)
await searchSyncWorkerEmitter.triggerOrganizationSync(
tenantId,
org.id,
true,
segmentId,
)
}

await searchSyncWorkerEmitter.triggerMemberSync(tenantId, member.id, true)
await searchSyncWorkerEmitter.triggerMemberSync(tenantId, member.id, true, segmentId)
}
}

18 changes: 15 additions & 3 deletions services/apps/data_sink_worker/src/service/activity.service.ts
Original file line number Diff line number Diff line change
@@ -135,6 +135,7 @@ export default class ActivityService extends LoggerBase {
tenantId,
activity.memberId,
onboarding,
segmentId,
)
await this.searchSyncWorkerEmitter.triggerActivitySync(tenantId, id, onboarding)
}
@@ -222,6 +223,7 @@ export default class ActivityService extends LoggerBase {
tenantId,
activity.memberId,
onboarding,
segmentId,
)
await this.searchSyncWorkerEmitter.triggerActivitySync(tenantId, id, onboarding)
}
@@ -429,9 +431,9 @@ export default class ActivityService extends LoggerBase {
let memberId: string
let objectMemberId: string | undefined
let activityId: string
let segmentId: string

await this.store.transactionally(async (txStore) => {
let segmentId: string
try {
const txRepo = new ActivityRepository(txStore, this.log)
const txMemberRepo = new MemberRepository(txStore, this.log)
@@ -928,10 +930,20 @@ export default class ActivityService extends LoggerBase {
})

if (memberId) {
await this.searchSyncWorkerEmitter.triggerMemberSync(tenantId, memberId, onboarding)
await this.searchSyncWorkerEmitter.triggerMemberSync(
tenantId,
memberId,
onboarding,
segmentId,
)
}
if (objectMemberId) {
await this.searchSyncWorkerEmitter.triggerMemberSync(tenantId, objectMemberId, onboarding)
await this.searchSyncWorkerEmitter.triggerMemberSync(
tenantId,
objectMemberId,
onboarding,
segmentId,
)
}
if (activityId) {
await this.searchSyncWorkerEmitter.triggerActivitySync(tenantId, activityId, onboarding)
18 changes: 14 additions & 4 deletions services/apps/data_sink_worker/src/service/member.service.ts
Original file line number Diff line number Diff line change
@@ -172,11 +172,16 @@ export default class MemberService extends LoggerBase {
}

if (fireSync) {
await this.searchSyncWorkerEmitter.triggerMemberSync(tenantId, id, onboarding)
await this.searchSyncWorkerEmitter.triggerMemberSync(tenantId, id, onboarding, segmentId)
}

for (const org of organizations) {
await this.searchSyncWorkerEmitter.triggerOrganizationSync(tenantId, org.id, onboarding)
await this.searchSyncWorkerEmitter.triggerOrganizationSync(
tenantId,
org.id,
onboarding,
segmentId,
)
}

return id
@@ -301,11 +306,16 @@ export default class MemberService extends LoggerBase {
})

if (updated && fireSync) {
await this.searchSyncWorkerEmitter.triggerMemberSync(tenantId, id, onboarding)
await this.searchSyncWorkerEmitter.triggerMemberSync(tenantId, id, onboarding, segmentId)
}

for (const org of organizations) {
await this.searchSyncWorkerEmitter.triggerOrganizationSync(tenantId, org.id, onboarding)
await this.searchSyncWorkerEmitter.triggerOrganizationSync(
tenantId,
org.id,
onboarding,
segmentId,
)
}
} catch (err) {
this.log.error(err, { memberId: id }, 'Error while updating a member!')
2 changes: 1 addition & 1 deletion services/apps/search_sync_worker/src/main.ts
Original file line number Diff line number Diff line change
@@ -10,7 +10,7 @@ import { WorkerQueueReceiver } from './queue'
const tracer = getServiceTracer()
const log = getServiceLogger()

const MAX_CONCURRENT_PROCESSING = 2
const MAX_CONCURRENT_PROCESSING = 5

setImmediate(async () => {
log.info('Starting search sync worker...')
72 changes: 40 additions & 32 deletions services/apps/search_sync_worker/src/queue/index.ts
Original file line number Diff line number Diff line change
@@ -19,9 +19,9 @@ import { SERVICE_CONFIG } from '../conf'

/* eslint-disable @typescript-eslint/no-explicit-any */
export class WorkerQueueReceiver extends SqsPrioritizedQueueReciever {
private readonly memberBatchProcessor: BatchProcessor<string>
// private readonly memberBatchProcessor: BatchProcessor<string>
private readonly activityBatchProcessor: BatchProcessor<string>
private readonly organizationBatchProcessor: BatchProcessor<string>
// private readonly organizationBatchProcessor: BatchProcessor<string>

constructor(
level: QueuePriorityLevel,
@@ -45,20 +45,20 @@ export class WorkerQueueReceiver extends SqsPrioritizedQueueReciever {
10,
)

this.memberBatchProcessor = new BatchProcessor(
100,
30,
async (memberIds) => {
const distinct = Array.from(new Set(memberIds))
if (distinct.length > 0) {
this.log.info({ batchSize: distinct.length }, 'Processing batch of members!')
await this.initMemberService().syncMembers(distinct)
}
},
async (memberIds, err) => {
this.log.error(err, { memberIds }, 'Error while processing batch of members!')
},
)
// this.memberBatchProcessor = new BatchProcessor(
// 100,
// 30,
// async (memberIds) => {
// const distinct = Array.from(new Set(memberIds))
// if (distinct.length > 0) {
// this.log.info({ batchSize: distinct.length }, 'Processing batch of members!')
// await this.initMemberService().syncMembers(distinct)
// }
// },
// async (memberIds, err) => {
// this.log.error(err, { memberIds }, 'Error while processing batch of members!')
// },
// )

this.activityBatchProcessor = new BatchProcessor(
200,
@@ -75,20 +75,20 @@ export class WorkerQueueReceiver extends SqsPrioritizedQueueReciever {
},
)

this.organizationBatchProcessor = new BatchProcessor(
20,
30,
async (organizationIds) => {
const distinct = Array.from(new Set(organizationIds))
if (distinct.length > 0) {
this.log.info({ batchSize: distinct.length }, 'Processing batch of organizations!')
await this.initOrganizationService().syncOrganizations(distinct)
}
},
async (organizationIds, err) => {
this.log.error(err, { organizationIds }, 'Error while processing batch of organizations!')
},
)
// this.organizationBatchProcessor = new BatchProcessor(
// 20,
// 30,
// async (organizationIds) => {
// const distinct = Array.from(new Set(organizationIds))
// if (distinct.length > 0) {
// this.log.info({ batchSize: distinct.length }, 'Processing batch of organizations!')
// await this.initOrganizationService().syncOrganizations(distinct)
// }
// },
// async (organizationIds, err) => {
// this.log.error(err, { organizationIds }, 'Error while processing batch of organizations!')
// },
// )
}

private initMemberService(): MemberSyncService {
@@ -130,7 +130,11 @@ export class WorkerQueueReceiver extends SqsPrioritizedQueueReciever {
// members
case SearchSyncWorkerQueueMessageType.SYNC_MEMBER:
if (data.memberId) {
await this.memberBatchProcessor.addToBatch(data.memberId)
// await this.memberBatchProcessor.addToBatch(data.memberId)
await this.initMemberService().syncMembers(
[data.memberId],
data.segmentId ? [data.segmentId] : undefined,
)
}

break
@@ -202,7 +206,11 @@ export class WorkerQueueReceiver extends SqsPrioritizedQueueReciever {
// organizations
case SearchSyncWorkerQueueMessageType.SYNC_ORGANIZATION:
if (data.organizationId) {
await this.organizationBatchProcessor.addToBatch(data.organizationId)
// await this.organizationBatchProcessor.addToBatch(data.organizationId)
await this.initOrganizationService().syncOrganizations(
[data.organizationId],
data.segmentId ? [data.segmentId] : undefined,
)
}
break
case SearchSyncWorkerQueueMessageType.SYNC_TENANT_ORGANIZATIONS:
Original file line number Diff line number Diff line change
@@ -27,7 +27,12 @@ export class SearchSyncWorkerEmitter extends QueuePriorityService {
)
}

public async triggerMemberSync(tenantId: string, memberId: string, onboarding: boolean) {
public async triggerMemberSync(
tenantId: string,
memberId: string,
onboarding: boolean,
segmentId?: string,
) {
if (!tenantId) {
throw new Error('tenantId is required!')
}
@@ -41,8 +46,9 @@ export class SearchSyncWorkerEmitter extends QueuePriorityService {
{
type: SearchSyncWorkerQueueMessageType.SYNC_MEMBER,
memberId,
segmentId,
},
memberId,
`${memberId}:${segmentId}`,
{
onboarding,
},
@@ -211,6 +217,7 @@ export class SearchSyncWorkerEmitter extends QueuePriorityService {
tenantId: string,
organizationId: string,
onboarding: boolean,
segmentId?: string,
) {
if (!tenantId) {
throw new Error('tenantId is required!')
@@ -225,8 +232,9 @@ export class SearchSyncWorkerEmitter extends QueuePriorityService {
{
type: SearchSyncWorkerQueueMessageType.SYNC_ORGANIZATION,
organizationId,
segmentId,
},
undefined,
`${organizationId}:${segmentId}`,
{
onboarding,
},
9 changes: 0 additions & 9 deletions services/libs/opensearch/src/repo/activity.repo.ts
Original file line number Diff line number Diff line change
@@ -57,15 +57,6 @@ export class ActivityRepository extends RepositoryBase<ActivityRepository> {
return results.map((r) => r.id)
}

/**
 * Stamps `"searchSyncedAt" = now()` on every row of `activities` whose id is
 * listed in `activityIds`.
 *
 * @param activityIds ids of the activities to mark as synced; an empty list is a no-op
 */
public async markSynced(activityIds: string[]): Promise<void> {
  // An empty array formatted through `:csv` yields an empty string, which would
  // turn the statement into `where id in ()` - a SQL syntax error. Nothing to
  // update in that case, so skip the round trip entirely.
  if (activityIds.length === 0) {
    return
  }

  await this.db().none(
    `update activities set "searchSyncedAt" = now() where id in ($(activityIds:csv))`,
    {
      activityIds,
    },
  )
}

public async getTenantActivitiesForSync(
tenantId: string,
perPage: number,
9 changes: 0 additions & 9 deletions services/libs/opensearch/src/repo/member.repo.ts
Original file line number Diff line number Diff line change
@@ -555,15 +555,6 @@ export class MemberRepository extends RepositoryBase<MemberRepository> {
return results
}

/**
 * Stamps `"searchSyncedAt" = now()` on every row of `members` whose id is
 * listed in `memberIds`.
 *
 * @param memberIds ids of the members to mark as synced; an empty list is a no-op
 */
public async markSynced(memberIds: string[]): Promise<void> {
  // An empty array formatted through `:csv` yields an empty string, which would
  // turn the statement into `where id in ()` - a SQL syntax error. Nothing to
  // update in that case, so skip the round trip entirely.
  if (memberIds.length === 0) {
    return
  }

  await this.db().none(
    `update members set "searchSyncedAt" = now() where id in ($(memberIds:csv))`,
    {
      memberIds,
    },
  )
}

public async checkMembersExists(tenantId: string, memberIds: string[]): Promise<string[]> {
const results = await this.db().any(
`
2 changes: 0 additions & 2 deletions services/libs/opensearch/src/repo/organization.data.ts
Original file line number Diff line number Diff line change
@@ -68,8 +68,6 @@ export interface IDbOrganizationSyncData {
activityCount: number
memberCount: number
identities: IOrganizationIdentity[]
toMergeIds: string[]
noMergeIds: string[]
memberIds?: string[]
}

77 changes: 26 additions & 51 deletions services/libs/opensearch/src/repo/organization.repo.ts
Original file line number Diff line number Diff line change
@@ -127,26 +127,6 @@ export class OrganizationRepository extends RepositoryBase<OrganizationRepositor
$(segmentId)::uuid AS "segmentId",
$(organizationId)::uuid AS "organizationId"
),
to_merge_data AS (
SELECT
otm."organizationId",
array_agg(DISTINCT otm."toMergeId"::text) AS to_merge_ids
FROM "organizationToMerge" otm
INNER JOIN organizations o2 ON otm."toMergeId" = o2.id
WHERE otm."organizationId" = $(organizationId)
AND o2."deletedAt" IS NULL
GROUP BY otm."organizationId"
),
no_merge_data AS (
SELECT
onm."organizationId",
array_agg(DISTINCT onm."noMergeId"::text) AS no_merge_ids
FROM "organizationNoMerge" onm
INNER JOIN organizations o2 ON onm."noMergeId" = o2.id
WHERE onm."organizationId" = $(organizationId)
AND o2."deletedAt" IS NULL
GROUP BY onm."organizationId"
),
activities_1 AS (
SELECT
a.id,
@@ -156,11 +136,6 @@ export class OrganizationRepository extends RepositoryBase<OrganizationRepositor
a.timestamp,
a.platform::TEXT
FROM activities a
JOIN members m ON a."memberId" = m.id
AND m."deletedAt" IS NULL
JOIN "memberOrganizations" mo ON m.id = mo."memberId"
AND a."organizationId" = mo."organizationId"
AND mo."deletedAt" IS NULL
WHERE a."organizationId" = $(organizationId)
),
member_data AS (
@@ -254,14 +229,10 @@ export class OrganizationRepository extends RepositoryBase<OrganizationRepositor
md."activityCount"::integer,
md."memberCount"::integer,
md."memberIds",
coalesce(i.identities, '[]'::jsonb) as "identities",
coalesce(tmd.to_merge_ids, array []::text[]) as "toMergeIds",
coalesce(nmd.no_merge_ids, array []::text[]) as "noMergeIds"
coalesce(i.identities, '[]'::jsonb) as "identities"
FROM organizations o
LEFT JOIN member_data md ON o.id = md."organizationId"
LEFT JOIN identities i ON o.id = i."organizationId"
LEFT JOIN to_merge_data tmd on o.id = tmd."organizationId"
LEFT JOIN no_merge_data nmd on o.id = nmd."organizationId"
WHERE o.id = $(organizationId)
AND o."deletedAt" IS NULL;
`,
@@ -294,15 +265,6 @@ export class OrganizationRepository extends RepositoryBase<OrganizationRepositor
return results.map((r) => r.id)
}

/**
 * Stamps `"searchSyncedAt" = now()` on every row of `organizations` whose id
 * is listed in `orgIds`.
 *
 * @param orgIds ids of the organizations to mark as synced; an empty list is a no-op
 */
public async markSynced(orgIds: string[]): Promise<void> {
  // An empty array formatted through `:csv` yields an empty string, which would
  // turn the statement into `where id in ()` - a SQL syntax error. Nothing to
  // update in that case, so skip the round trip entirely.
  if (orgIds.length === 0) {
    return
  }

  await this.db().none(
    `update organizations set "searchSyncedAt" = now() where id in ($(orgIds:csv))`,
    {
      orgIds,
    },
  )
}

public async getTenantOrganizationsForSync(
tenantId: string,
page: number,
@@ -335,18 +297,31 @@ export class OrganizationRepository extends RepositoryBase<OrganizationRepositor
return results.map((r) => r.tenantId)
}

public async getOrganizationSegmentCouples(ids): Promise<IOrganizationSegmentMatrix> {
const results = await this.db().any(
`
select distinct mo."organizationId", a."segmentId"
from "memberOrganizations" mo
inner join activities a on mo."memberId" = a."memberId"
where mo."organizationId" in ($(ids:csv));
`,
{
ids,
},
)
public async getOrganizationSegmentCouples(
organizationIds: string[],
segmentIds?: string[],
): Promise<IOrganizationSegmentMatrix> {
let results = []
if (segmentIds && segmentIds.length > 0) {
for (const organizationId of organizationIds) {
for (const segmentId of segmentIds) {
results.push({
organizationId,
segmentId,
})
}
}
} else {
results = await this.db().any(
`
select distinct a."organizationId", a."segmentId"
from activities a
where a."organizationId" in ($(organizationIds:csv)); `,
{
organizationIds,
},
)
}

const matrix = {}

2 changes: 0 additions & 2 deletions services/libs/opensearch/src/service/activity.sync.service.ts
Original file line number Diff line number Diff line change
@@ -252,8 +252,6 @@ export class ActivitySyncService {
}
}),
)

await this.activityRepo.markSynced(activities.map((m) => m.id))
}

return activities.length
2 changes: 0 additions & 2 deletions services/libs/opensearch/src/service/init.service.ts
Original file line number Diff line number Diff line change
@@ -141,8 +141,6 @@ export class InitService {
grossAdditionsByMonth: { '2022-05': 7, '2022-06': 6, '2022-07': 1, '2022-08': 1 },
grossDeparturesByMonth: { '2022-06': 2, '2022-07': 1, '2022-08': 2, '2022-09': 2 },
directSubsidiaries: ['Fake direct subsidiary 1', 'Fake direct subsidiary 2'],
toMergeIds: ['0ab4c62a-8dd4-4ecf-9c61-cf4c49311d49'],
noMergeIds: ['7cb770ab-0d6c-411f-b1e1-259ae6ade057'],
}

const prepared = OrganizationSyncService.prefixData(fakeOrg)
8 changes: 1 addition & 7 deletions services/libs/opensearch/src/service/member.sync.service.ts
Original file line number Diff line number Diff line change
@@ -313,7 +313,7 @@ export class MemberSyncService {
}

public async syncMembers(memberIds: string[], segmentIds?: string[]): Promise<IMemberSyncResult> {
const CONCURRENT_DATABASE_QUERIES = 25
const CONCURRENT_DATABASE_QUERIES = 5
const BULK_INDEX_DOCUMENT_BATCH_SIZE = 2500

// get all memberId-segmentId couples
@@ -446,10 +446,6 @@ export class MemberSyncService {
}
}

if (successfullySyncedMembers.length > 0) {
await this.memberRepo.markSynced(successfullySyncedMembers)
}

return {
membersSynced: memberIds.length,
documentsIndexed,
@@ -545,8 +541,6 @@ export class MemberSyncService {
memberCount += memberIds.length
}

await this.memberRepo.markSynced(memberIds)

return {
membersSynced: memberCount,
documentsIndexed: docCount,
Original file line number Diff line number Diff line change
@@ -257,13 +257,16 @@ export class OrganizationSyncService {
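* @param organizationIds organizationIds to be synced to opensearch
* @param segmentIds optional segmentIds; forwarded to getOrganizationSegmentCouples when building the orgId-segmentId couples
* @returns sync result holding the number of organizations synced and documents indexed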
*/
public async syncOrganizations(organizationIds: string[]): Promise<IOrganizationSyncResult> {
const CONCURRENT_DATABASE_QUERIES = 25
public async syncOrganizations(
organizationIds: string[],
segmentIds?: string[],
): Promise<IOrganizationSyncResult> {
const CONCURRENT_DATABASE_QUERIES = 5
const BULK_INDEX_DOCUMENT_BATCH_SIZE = 2500

// get all orgId-segmentId couples
const orgSegmentCouples: IOrganizationSegmentMatrix =
await this.orgRepo.getOrganizationSegmentCouples(organizationIds)
await this.orgRepo.getOrganizationSegmentCouples(organizationIds, segmentIds)
let databaseStream = []
let syncStream = []
let documentsIndexed = 0
@@ -386,8 +389,6 @@ export class OrganizationSyncService {
}
}

await this.orgRepo.markSynced(organizationIds)

return {
organizationsSynced: organizationIds.length,
documentsIndexed,