From 4b288733184c477d7c2abe8e49d0195dda5412a4 Mon Sep 17 00:00:00 2001 From: Yeganathan S Date: Wed, 29 Jan 2025 13:52:43 +0530 Subject: [PATCH 01/26] rm tenant logic in search-sync scripts and logic --- .../src/workflows/syncAllMembers.ts | 0 .../apps/search_sync_api/src/routes/member.ts | 17 -------- .../src/routes/organization.ts | 19 --------- .../src/bin/sync-all-members.ts | 40 +++--------------- .../src/bin/sync-all-organizations.ts | 41 ++++--------------- .../src/bin/sync-tenant-members.ts | 36 ---------------- .../src/bin/sync-tenant-organizations.ts | 33 --------------- .../search_sync_worker/src/queue/index.ts | 16 -------- .../emitters/searchSyncWorker.emitter.ts | 25 ----------- services/libs/opensearch/src/apiClient.ts | 10 ----- .../libs/opensearch/src/repo/indexing.data.ts | 5 --- .../libs/opensearch/src/repo/indexing.repo.ts | 8 ++-- .../libs/opensearch/src/repo/member.repo.ts | 6 +-- .../opensearch/src/repo/organization.repo.ts | 11 ++--- .../src/service/member.sync.service.ts | 21 +++------- .../src/service/organization.sync.service.ts | 21 +++------- .../src/queue/search_sync_worker/index.ts | 2 - 17 files changed, 32 insertions(+), 279 deletions(-) create mode 100644 services/apps/script_executor_worker/src/workflows/syncAllMembers.ts delete mode 100644 services/apps/search_sync_worker/src/bin/sync-tenant-members.ts delete mode 100644 services/apps/search_sync_worker/src/bin/sync-tenant-organizations.ts diff --git a/services/apps/script_executor_worker/src/workflows/syncAllMembers.ts b/services/apps/script_executor_worker/src/workflows/syncAllMembers.ts new file mode 100644 index 0000000000..e69de29bb2 diff --git a/services/apps/search_sync_api/src/routes/member.ts b/services/apps/search_sync_api/src/routes/member.ts index f96e2072e6..45a8fff5e9 100644 --- a/services/apps/search_sync_api/src/routes/member.ts +++ b/services/apps/search_sync_api/src/routes/member.ts @@ -27,23 +27,6 @@ router.post( }), ) -router.post( - '/sync/tenant/members', - asyncWrap(async (req: ApiRequest, res) => { - const memberSyncService = syncService(req) - - const { tenantId } = req.body - try { - req.log.trace(`Calling memberSyncService.syncTenantMembers for tenant ${tenantId}`) - await memberSyncService.syncTenantMembers(tenantId) - res.sendStatus(200) - } catch (error) { - req.log.error(error) - res.status(500).send(error.message) - } - }), -) - router.post( '/sync/organization/members', asyncWrap(async (req: ApiRequest, res) => { diff --git a/services/apps/search_sync_api/src/routes/organization.ts b/services/apps/search_sync_api/src/routes/organization.ts index 309e6fd7fd..5ce1acbed5 100644 --- a/services/apps/search_sync_api/src/routes/organization.ts +++ b/services/apps/search_sync_api/src/routes/organization.ts @@ -28,25 +28,6 @@ router.post( }), ) -router.post( - '/sync/tenant/organizations', - asyncWrap(async (req: ApiRequest, res) => { - const organizationSyncService = syncService(req) - - const { tenantId } = req.body - try { - req.log.trace( - `Calling organizationSyncService.syncTenantOrganizations for tenant ${tenantId}`, - ) - await organizationSyncService.syncTenantOrganizations(tenantId) - res.sendStatus(200) - } catch (error) { - req.log.error(error) - res.status(500).send(error.message) - } - }), -) - router.post( '/cleanup/tenant/organizations', asyncWrap(async (req: ApiRequest, res) => { diff --git a/services/apps/search_sync_worker/src/bin/sync-all-members.ts b/services/apps/search_sync_worker/src/bin/sync-all-members.ts index 3669243eec..a6a2d25baa 100644 --- a/services/apps/search_sync_worker/src/bin/sync-all-members.ts +++ b/services/apps/search_sync_worker/src/bin/sync-all-members.ts @@ -1,6 +1,4 @@ -import { timeout } from '@crowd/common' import { DbStore, getDbConnection } from '@crowd/data-access-layer/src/database' -import { MemberRepository } from '@crowd/data-access-layer/src/old/apps/search_sync_worker/member.repo' import { getServiceLogger } from '@crowd/logging' import { MemberSyncService, OpenSearchService, getOpensearchClient } from '@crowd/opensearch' import { IndexedEntityType } from '@crowd/opensearch/src/repo/indexing.data' @@ -14,8 +12,6 @@ const log = getServiceLogger() const processArguments = process.argv.slice(2) -const MAX_CONCURRENT = 3 - setImmediate(async () => { const osClient = await getOpensearchClient(OPENSEARCH_CONFIG()) const openSearchService = new OpenSearchService(log, osClient) @@ -35,42 +31,18 @@ setImmediate(async () => { withAggs = false } - const repo = new MemberRepository(store, log) - - const tenantIds = await repo.getTenantIds() const qdbConn = await getClientSQL() const qdbStore = new DbStore(log, qdbConn) const service = new MemberSyncService(redis, store, qdbStore, openSearchService, log) - let current = 0 - for (let i = 0; i < tenantIds.length; i++) { - const tenantId = tenantIds[i] - - while (current == MAX_CONCURRENT) { - await timeout(1000) - } - - log.info(`Processing tenant ${i + 1}/${tenantIds.length}`) - current += 1 - service - .syncTenantMembers(tenantId, 500, { withAggs }) - .then(() => { - current-- - log.info(`Processed tenant ${i + 1}/${tenantIds.length}`) - }) - .catch((err) => { - current-- - log.error( - err, - { tenantId }, - `Error syncing members for tenant ${i + 1}/${tenantIds.length}!`, - ) - }) - } + log.info('Starting sync of all members') - while (current > 0) { - await timeout(1000) + try { + await service.syncAllMembers(500, { withAggs }) + log.info('Successfully synced all members') + } catch (err) { + log.error(err, 'Error syncing members!') } process.exit(0) diff --git a/services/apps/search_sync_worker/src/bin/sync-all-organizations.ts b/services/apps/search_sync_worker/src/bin/sync-all-organizations.ts index de9ebc24f1..6908b5011f 100644 --- a/services/apps/search_sync_worker/src/bin/sync-all-organizations.ts +++ b/services/apps/search_sync_worker/src/bin/sync-all-organizations.ts @@ -1,6 +1,4 @@ -import { timeout } from '@crowd/common' import { DbStore, getDbConnection } from '@crowd/data-access-layer/src/database' -import { OrganizationRepository } from '@crowd/data-access-layer/src/old/apps/search_sync_worker/organization.repo' import { getServiceLogger } from '@crowd/logging' import { OpenSearchService, OrganizationSyncService, getOpensearchClient } from '@crowd/opensearch' import { IndexedEntityType } from '@crowd/opensearch/src/repo/indexing.data' @@ -13,8 +11,6 @@ const log = getServiceLogger() const processArguments = process.argv.slice(2) -const MAX_CONCURRENT = 3 - setImmediate(async () => { const osClient = await getOpensearchClient(OPENSEARCH_CONFIG()) const openSearchService = new OpenSearchService(log, osClient) @@ -52,9 +48,6 @@ setImmediate(async () => { } const readStore = new DbStore(log, readHost) - const repo = new OrganizationRepository(readStore, log) - - const tenantIds = await repo.getTenantIds() const service = new OrganizationSyncService( qdbStore, @@ -64,35 +57,15 @@ setImmediate(async () => { readStore, ) - let current = 0 - for (let i = 0; i < tenantIds.length; i++) { - const tenantId = tenantIds[i] - - while (current == MAX_CONCURRENT) { - await timeout(1000) - } - - log.info(`Processing tenant ${i + 1}/${tenantIds.length}`) - current += 1 - service - .syncTenantOrganizations(tenantId, 500, { withAggs }) - .then(() => { - current-- - log.info(`Processed tenant ${i + 1}/${tenantIds.length}`) - }) - .catch((err) => { - current-- - log.error( - err, - { tenantId }, - `Error syncing organizations for tenant ${i + 1}/${tenantIds.length}!`, - ) - }) - } + log.info('Starting sync of all organizations') - while (current > 0) { - await timeout(1000) + try { + await service.syncAllOrganizations(500, { withAggs }) + log.info('Successfully synced all organizations') + } catch (err) { + log.error(err, 'Error syncing organizations!') } process.exit(0) }) + diff --git a/services/apps/search_sync_worker/src/bin/sync-tenant-members.ts b/services/apps/search_sync_worker/src/bin/sync-tenant-members.ts deleted file mode 100644 index 7463066dc4..0000000000 --- a/services/apps/search_sync_worker/src/bin/sync-tenant-members.ts +++ /dev/null @@ -1,36 +0,0 @@ -import { DbStore, getDbConnection } from '@crowd/data-access-layer/src/database' -import { getServiceLogger } from '@crowd/logging' -import { MemberSyncService, OpenSearchService, getOpensearchClient } from '@crowd/opensearch' -import { getClientSQL } from '@crowd/questdb' -import { getRedisClient } from '@crowd/redis' - -import { DB_CONFIG, OPENSEARCH_CONFIG, REDIS_CONFIG } from '../conf' - -const log = getServiceLogger() - -const processArguments = process.argv.slice(2) - -if (processArguments.length !== 1) { - log.error('Expected 1 argument: tenantId') - process.exit(1) -} - -const tenantId = processArguments[0] - -setImmediate(async () => { - const osClient = await getOpensearchClient(OPENSEARCH_CONFIG()) - const openSearchService = new OpenSearchService(log, osClient) - - const redis = await getRedisClient(REDIS_CONFIG(), true) - - const dbConnection = await getDbConnection(DB_CONFIG()) - const store = new DbStore(log, dbConnection) - const qdbConn = await getClientSQL() - const qdbStore = new DbStore(log, qdbConn) - - const service = new MemberSyncService(redis, store, qdbStore, openSearchService, log) - - await service.syncTenantMembers(tenantId) - - process.exit(0) -}) diff --git a/services/apps/search_sync_worker/src/bin/sync-tenant-organizations.ts b/services/apps/search_sync_worker/src/bin/sync-tenant-organizations.ts deleted file mode 100644 index 05f5337c60..0000000000 --- a/services/apps/search_sync_worker/src/bin/sync-tenant-organizations.ts +++ /dev/null @@ -1,33 +0,0 @@ -import { DbStore, getDbConnection } from '@crowd/data-access-layer/src/database' -import { getServiceLogger } from '@crowd/logging' -import { OpenSearchService, OrganizationSyncService, getOpensearchClient } from '@crowd/opensearch' -import { getClientSQL } from '@crowd/questdb' - -import { DB_CONFIG, OPENSEARCH_CONFIG } from '../conf' - -const log = getServiceLogger() - -const processArguments = process.argv.slice(2) - -if (processArguments.length !== 1) { - log.error('Expected 1 argument: tenantId') - process.exit(1) -} - -const tenantId = processArguments[0] - -setImmediate(async () => { - const osClient = await getOpensearchClient(OPENSEARCH_CONFIG()) - const openSearchService = new OpenSearchService(log, osClient) - - const dbConnection = await getDbConnection(DB_CONFIG()) - const store = new DbStore(log, dbConnection) - const qdbConn = await getClientSQL() - const qdbStore = new DbStore(log, qdbConn) - - const service = new OrganizationSyncService(qdbStore, store, openSearchService, log) - - await service.syncTenantOrganizations(tenantId) - - process.exit(0) -}) diff --git a/services/apps/search_sync_worker/src/queue/index.ts b/services/apps/search_sync_worker/src/queue/index.ts index 06855baf30..5b31183487 100644 --- a/services/apps/search_sync_worker/src/queue/index.ts +++ b/services/apps/search_sync_worker/src/queue/index.ts @@ -98,15 +98,6 @@ export class WorkerQueueReceiver extends PrioritizedQueueReciever { }) } - break - // this one taks a while so we can't relly on it to be finished in time and the queue message might pop up again so we immediatelly return - case SearchSyncWorkerQueueMessageType.SYNC_TENANT_MEMBERS: - if (data.tenantId) { - this.initMemberService() - .syncTenantMembers(data.tenantId) - .catch((err) => this.log.error(err, 'Error while syncing tenant members!')) - } - break case SearchSyncWorkerQueueMessageType.SYNC_ORGANIZATION_MEMBERS: if (data.organizationId) { @@ -138,13 +129,6 @@ export class WorkerQueueReceiver extends PrioritizedQueueReciever { await this.initOrganizationService().syncOrganizations([data.organizationId]) } break - case SearchSyncWorkerQueueMessageType.SYNC_TENANT_ORGANIZATIONS: - if (data.tenantId) { - this.initOrganizationService() - .syncTenantOrganizations(data.tenantId) - .catch((err) => this.log.error(err, 'Error while syncing tenant organizations!')) - } - break case SearchSyncWorkerQueueMessageType.CLEANUP_TENANT_ORGANIZATIONS: if (data.tenantId) { this.initOrganizationService() diff --git a/services/libs/common_services/src/services/emitters/searchSyncWorker.emitter.ts b/services/libs/common_services/src/services/emitters/searchSyncWorker.emitter.ts index 68d2364398..c15bc1eb5d 100644 --- a/services/libs/common_services/src/services/emitters/searchSyncWorker.emitter.ts +++ b/services/libs/common_services/src/services/emitters/searchSyncWorker.emitter.ts @@ -50,16 +50,6 @@ export class SearchSyncWorkerEmitter extends QueuePriorityService { ) } - public async triggerTenantMembersSync(tenantId: string) { - if (!tenantId) { - throw new Error('tenantId is required!') - } - await this.sendMessage(tenantId, tenantId, { - type: SearchSyncWorkerQueueMessageType.SYNC_TENANT_MEMBERS, - tenantId, - }) - } - public async triggerOrganizationMembersSync( tenantId: string, organizationId: string, @@ -146,21 +136,6 @@ export class SearchSyncWorkerEmitter extends QueuePriorityService { ) } - public async triggerTenantOrganizationSync(tenantId: string) { - if (!tenantId) { - throw new Error('tenantId is required!') - } - await this.sendMessage( - tenantId, - tenantId, - { - type: SearchSyncWorkerQueueMessageType.SYNC_TENANT_ORGANIZATIONS, - tenantId, - }, - tenantId, - ) - } - public async triggerRemoveOrganization( tenantId: string, organizationId: string, diff --git a/services/libs/opensearch/src/apiClient.ts b/services/libs/opensearch/src/apiClient.ts index 24e60256f0..35bd4144fb 100644 --- a/services/libs/opensearch/src/apiClient.ts +++ b/services/libs/opensearch/src/apiClient.ts @@ -27,16 +27,6 @@ export class SearchSyncApiClient { }) } - public async triggerTenantMembersSync(tenantId: string): Promise { - if (!tenantId) { - throw new Error('tenantId is required!') - } - - await this.searchSyncApi.post('/sync/tenant/members', { - tenantId, - }) - } - public async triggerOrganizationMembersSync( tenantId: string, organizationId: string, diff --git a/services/libs/opensearch/src/repo/indexing.data.ts b/services/libs/opensearch/src/repo/indexing.data.ts index 3504097fcb..ab18886843 100644 --- a/services/libs/opensearch/src/repo/indexing.data.ts +++ b/services/libs/opensearch/src/repo/indexing.data.ts @@ -1,8 +1,3 @@ -export interface IEntityData { - id: string - tenantId: string -} - export enum IndexedEntityType { ACTIVITY = 'activity', MEMBER = 'member', diff --git a/services/libs/opensearch/src/repo/indexing.repo.ts b/services/libs/opensearch/src/repo/indexing.repo.ts index 378f9bd659..f57570165b 100644 --- a/services/libs/opensearch/src/repo/indexing.repo.ts +++ b/services/libs/opensearch/src/repo/indexing.repo.ts @@ -1,7 +1,7 @@ import { DbStore, RepositoryBase } from '@crowd/database' import { Logger } from '@crowd/logging' -import { IEntityData, IndexedEntityType } from './indexing.data' +import { IndexedEntityType } from './indexing.data' export class IndexingRepository extends RepositoryBase { constructor(dbStore: DbStore, parentLog: Logger) { @@ -19,11 +19,11 @@ export class IndexingRepository extends RepositoryBase { ) } - public async markEntitiesIndexed(type: IndexedEntityType, data: IEntityData[]): Promise { + public async markEntitiesIndexed(type: IndexedEntityType, data: string[]): Promise { if (data.length > 0) { - const values = data.map((d) => `('${type}', '${d.id}', '${d.tenantId}')`) + const values = data.map((d) => `('${type}', '${d}')`) const query = ` - insert into indexed_entities(type, entity_id, tenant_id) + insert into indexed_entities(type, entity_id) values ${values.join(',\n')} on conflict (type, entity_id) do update set indexed_at = now() diff --git a/services/libs/opensearch/src/repo/member.repo.ts b/services/libs/opensearch/src/repo/member.repo.ts index 3109cc2f6d..5cbffa15bb 100644 --- a/services/libs/opensearch/src/repo/member.repo.ts +++ b/services/libs/opensearch/src/repo/member.repo.ts @@ -41,18 +41,16 @@ export class MemberRepository extends RepositoryBase { return results } - public async getTenantMembersForSync(tenantId: string, perPage: number): Promise { + public async getMembersForSync(perPage: number): Promise { // eslint-disable-next-line @typescript-eslint/no-explicit-any const results = await this.db().any( ` select m.id from members m left join indexed_entities ie on m.id = ie.entity_id and ie.type = $(type) - where m."tenantId" = $(tenantId) and - ie.entity_id is null + where ie.entity_id is null limit ${perPage};`, { - tenantId, type: IndexedEntityType.MEMBER, }, ) diff --git a/services/libs/opensearch/src/repo/organization.repo.ts b/services/libs/opensearch/src/repo/organization.repo.ts index 01742eb07d..2b75e3a33e 100644 --- a/services/libs/opensearch/src/repo/organization.repo.ts +++ b/services/libs/opensearch/src/repo/organization.repo.ts @@ -8,19 +8,17 @@ export class OrganizationRepository extends RepositoryBase { + public async checkOrganizationsExists(orgIds: string[]): Promise { const results = await this.db().any( ` select id from organizations where - "tenantId" = $(tenantId) and id in ($(orgIds:csv)) and "deletedAt" is null `, { - tenantId, orgIds, }, ) @@ -28,8 +26,7 @@ export class OrganizationRepository extends RepositoryBase r.id) } - public async getTenantOrganizationsForSync( - tenantId: string, + public async getAllOrganizationsForSync( perPage: number, previousBatchIds: string[], ): Promise { @@ -40,13 +37,11 @@ export class OrganizationRepository extends RepositoryBase { - this.log.debug({ tenantId }, 'Syncing all tenant members!') + this.log.debug('Syncing all members!') let docCount = 0 let memberCount = 0 const now = new Date() - let memberIds = await this.memberRepo.getTenantMembersForSync(tenantId, batchSize) + let memberIds = await this.memberRepo.getMembersForSync(batchSize) while (memberIds.length > 0) { for (const memberId of memberIds) { @@ -289,27 +288,17 @@ export class MemberSyncService { const diffInSeconds = (new Date().getTime() - now.getTime()) / 1000 this.log.info( - { tenantId }, `Synced ${memberCount} members! Speed: ${Math.round( memberCount / diffInSeconds, )} members/second!`, ) - await this.indexingRepo.markEntitiesIndexed( - IndexedEntityType.MEMBER, - memberIds.map((id) => { - return { - id, - tenantId, - } - }), - ) + await this.indexingRepo.markEntitiesIndexed(IndexedEntityType.MEMBER, memberIds) - memberIds = await this.memberRepo.getTenantMembersForSync(tenantId, batchSize) + memberIds = await this.memberRepo.getMembersForSync(batchSize) } this.log.info( - { tenantId }, `Synced total of ${memberCount} members with ${docCount} documents!`, ) } diff --git a/services/libs/opensearch/src/service/organization.sync.service.ts b/services/libs/opensearch/src/service/organization.sync.service.ts index a611b0cabe..741a75c2ef 100644 --- a/services/libs/opensearch/src/service/organization.sync.service.ts +++ b/services/libs/opensearch/src/service/organization.sync.service.ts @@ -154,7 +154,6 @@ export class OrganizationSyncService { while (results.length > 0) { // check every organization if they exists in the database and if not remove them from the index const dbIds = await this.readOrgRepo.checkOrganizationsExists( - tenantId, results.map((r) => r._source.uuid_organizationId), ) @@ -245,12 +244,11 @@ export class OrganizationSyncService { } } - public async syncTenantOrganizations( - tenantId: string, + public async syncAllOrganizations( batchSize = 200, opts: { withAggs?: boolean } = { withAggs: true }, ): Promise { - this.log.warn({ tenantId }, 'Syncing all tenant organizations!') + this.log.warn('Syncing all organizations!') let docCount = 0 let organizationCount = 0 let previousBatchIds: string[] = [] @@ -258,8 +256,7 @@ export class OrganizationSyncService { await logExecutionTime( async () => { - let organizationIds = await this.readOrgRepo.getTenantOrganizationsForSync( - tenantId, + let organizationIds = await this.readOrgRepo.getAllOrganizationsForSync( batchSize, previousBatchIds, ) @@ -275,7 +272,6 @@ export class OrganizationSyncService { const diffInMinutes = (new Date().getTime() - now.getTime()) / 1000 / 60 this.log.info( - { tenantId }, `Synced ${organizationCount} organizations! Speed: ${Math.round( organizationCount / diffInMinutes, )} organizations/minute!`, @@ -283,17 +279,11 @@ export class OrganizationSyncService { await this.indexingRepo.markEntitiesIndexed( IndexedEntityType.ORGANIZATION, - organizationIds.map((id) => { - return { - id, - tenantId, - } - }), + organizationIds, ) previousBatchIds = organizationIds - organizationIds = await this.readOrgRepo.getTenantOrganizationsForSync( - tenantId, + organizationIds = await this.readOrgRepo.getAllOrganizationsForSync( batchSize, previousBatchIds, ) @@ -304,7 +294,6 @@ export class OrganizationSyncService { ) this.log.info( - { tenantId }, `Synced total of ${organizationCount} organizations with ${docCount} documents!`, ) } diff --git a/services/libs/types/src/queue/search_sync_worker/index.ts b/services/libs/types/src/queue/search_sync_worker/index.ts index ac09918f2c..1f86131106 100644 --- a/services/libs/types/src/queue/search_sync_worker/index.ts +++ b/services/libs/types/src/queue/search_sync_worker/index.ts @@ -1,12 +1,10 @@ export enum SearchSyncWorkerQueueMessageType { SYNC_MEMBER = 'sync_member', - SYNC_TENANT_MEMBERS = 'sync_tenant_members', SYNC_ORGANIZATION_MEMBERS = 'sync_organization_members', REMOVE_MEMBER = 'remove_member', CLEANUP_TENANT_MEMBERS = 'cleanup_tenant_members', SYNC_ORGANIZATION = 'sync_organization', - SYNC_TENANT_ORGANIZATIONS = 'sync_tenant_organizations', REMOVE_ORGANIZATION = 'remove_organization', CLEANUP_TENANT_ORGANIZATIONS = 'cleanup_tenant_organizations', } From 25fa2fc13831606e1890bf3f62e71e5abf53c86d Mon Sep 17 00:00:00 2001 From: Yeganathan S Date: Wed, 29 Jan 2025 14:25:17 +0530 Subject: [PATCH 02/26] move member sync script to script-executor-worker --- pnpm-lock.yaml | 3 ++ .../apps/script_executor_worker/package.json | 1 + .../script_executor_worker/src/activities.ts | 3 ++ .../src/activities/search-sync/index.ts | 43 +++++++++++++++++++ .../apps/script_executor_worker/src/main.ts | 4 +- .../apps/script_executor_worker/src/types.ts | 6 +++ .../script_executor_worker/src/workflows.ts | 2 + .../src/workflows/syncAllMembers.ts | 0 .../src/workflows/syncMembers.ts | 26 +++++++++++ 9 files changed, 86 insertions(+), 2 deletions(-) create mode 100644 services/apps/script_executor_worker/src/activities/search-sync/index.ts delete mode 100644 services/apps/script_executor_worker/src/workflows/syncAllMembers.ts create mode 100644 services/apps/script_executor_worker/src/workflows/syncMembers.ts diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 78632c4162..823538987a 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1044,6 +1044,9 @@ importers: '@crowd/logging': specifier: workspace:* version: link:../../libs/logging + '@crowd/opensearch': + specifier: workspace:* + version: link:../../libs/opensearch '@crowd/redis': specifier: workspace:* version: link:../../libs/redis diff --git a/services/apps/script_executor_worker/package.json b/services/apps/script_executor_worker/package.json index 713abab5cb..bb436ec729 100644 --- a/services/apps/script_executor_worker/package.json +++ b/services/apps/script_executor_worker/package.json @@ -16,6 +16,7 @@ "@crowd/archetype-worker": "workspace:*", "@crowd/data-access-layer": "workspace:*", "@crowd/logging": "workspace:*", + "@crowd/opensearch": "workspace:*", "@crowd/redis": "workspace:*", "@crowd/types": "workspace:*", "@temporalio/workflow": "~1.11.1", diff --git a/services/apps/script_executor_worker/src/activities.ts b/services/apps/script_executor_worker/src/activities.ts index 0ec6f3703c..e61f92a542 100644 --- a/services/apps/script_executor_worker/src/activities.ts +++ b/services/apps/script_executor_worker/src/activities.ts @@ -21,6 +21,7 @@ import { findMembersWithSamePlatformIdentitiesDifferentCapitalization, findMembersWithSameVerifiedEmailsInDifferentPlatforms, } from './activities/merge-members-with-similar-identities' +import { syncAllMembers, syncMember } from './activities/search-sync' export { findMembersWithSameVerifiedEmailsInDifferentPlatforms, @@ -38,4 +39,6 @@ export { updateOrganizationIdentity, deleteOrganizationIdentity, isLfxMember, + syncMember, + syncAllMembers, } diff --git a/services/apps/script_executor_worker/src/activities/search-sync/index.ts b/services/apps/script_executor_worker/src/activities/search-sync/index.ts new file mode 100644 index 0000000000..032ba5a216 --- /dev/null +++ b/services/apps/script_executor_worker/src/activities/search-sync/index.ts @@ -0,0 +1,43 @@ +import { DbStore } from '@crowd/data-access-layer/src/database' +import { MemberRepository } from '@crowd/data-access-layer/src/old/apps/search_sync_worker/member.repo' +import { MemberSyncService } from '@crowd/opensearch' + +import { svc } from '../../main' + +export async function syncMember(memberId: string, withAggs: boolean): Promise { + try { + const memberRepo = new MemberRepository(svc.postgres.writer, svc.log) + const service = new MemberSyncService( + svc.redis, + svc.postgres.writer, + new DbStore(svc.log, svc.questdbSQL), + svc.opensearch, + svc.log, + ) + + const results = await memberRepo.checkMembersExist([memberId]) + if (results.length === 0) { + svc.log.error(`Member ${memberId} not found!`) + return + } + + await service.syncMembers(memberId, { withAggs }) + } catch (err) { + throw new Error(err) + } +} + +export async function syncAllMembers(batchSize: number, withAggs: boolean): Promise { + try { + const service = new MemberSyncService( + svc.redis, + svc.postgres.writer, + new DbStore(svc.log, svc.questdbSQL), + svc.opensearch, + svc.log, + ) + await service.syncAllMembers(batchSize, { withAggs }) + } catch (err) { + throw new Error(err) + } +} diff --git a/services/apps/script_executor_worker/src/main.ts b/services/apps/script_executor_worker/src/main.ts index 8ae0530519..ae773f0360 100644 --- a/services/apps/script_executor_worker/src/main.ts +++ b/services/apps/script_executor_worker/src/main.ts @@ -10,7 +10,7 @@ const config: Config = { enabled: true, }, questdb: { - enabled: false, + enabled: true, }, redis: { enabled: true, @@ -22,7 +22,7 @@ const options: Options = { enabled: true, }, opensearch: { - enabled: false, + enabled: true, }, } diff --git a/services/apps/script_executor_worker/src/types.ts b/services/apps/script_executor_worker/src/types.ts index 5447633312..6da6994b3d 100644 --- a/services/apps/script_executor_worker/src/types.ts +++ b/services/apps/script_executor_worker/src/types.ts @@ -26,3 +26,9 @@ export interface IFixOrgIdentitiesWithWrongUrlsArgs { tenantId: string testRun?: boolean } + +export interface ISyncMembersArgs { + batchSize?: number + withAggs?: boolean + memberIds?: string[] +} diff --git a/services/apps/script_executor_worker/src/workflows.ts b/services/apps/script_executor_worker/src/workflows.ts index 783ed8e00c..ad42ec6fe2 100644 --- a/services/apps/script_executor_worker/src/workflows.ts +++ b/services/apps/script_executor_worker/src/workflows.ts @@ -2,10 +2,12 @@ import { dissectMember } from './workflows/dissectMember' import { findAndMergeMembersWithSamePlatformIdentitiesDifferentCapitalization } from './workflows/findAndMergeMembersWithSamePlatformIdentitiesDifferentCapitalization' import { findAndMergeMembersWithSameVerifiedEmailsInDifferentPlatforms } from './workflows/findAndMergeMembersWithSameVerifiedEmailsInDifferentPlatforms' import { fixOrgIdentitiesWithWrongUrls } from './workflows/fixOrgIdentitiesWithWrongUrls' +import { syncMembers } from './workflows/syncMembers' export { findAndMergeMembersWithSameVerifiedEmailsInDifferentPlatforms, findAndMergeMembersWithSamePlatformIdentitiesDifferentCapitalization, dissectMember, fixOrgIdentitiesWithWrongUrls, + syncMembers, } diff --git a/services/apps/script_executor_worker/src/workflows/syncAllMembers.ts b/services/apps/script_executor_worker/src/workflows/syncAllMembers.ts deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/services/apps/script_executor_worker/src/workflows/syncMembers.ts b/services/apps/script_executor_worker/src/workflows/syncMembers.ts new file mode 100644 index 0000000000..0c026c89ad --- /dev/null +++ b/services/apps/script_executor_worker/src/workflows/syncMembers.ts @@ -0,0 +1,26 @@ +import { proxyActivities } from '@temporalio/workflow' + +import * as activities from '../activities/search-sync' +import { ISyncMembersArgs } from '../types' + +const activity = proxyActivities({ + startToCloseTimeout: '1 hour', + retry: { maximumAttempts: 3 }, +}) + +export async function syncMembers(args: ISyncMembersArgs): Promise { + const BATCH_SIZE = args.batchSize || 500 + const WITH_AGGS = args.withAggs || true + + console.log('Starting members sync!') + + if (args.memberIds.length) { + for (const memberId of args.memberIds) { + await activity.syncMember(memberId, WITH_AGGS) + } + console.log(`Successfully synced ${args.memberIds.length} members`) + } else { + await activity.syncAllMembers(BATCH_SIZE, WITH_AGGS) + console.log(`Successfully synced all members`) + } +} From 0f77c45d9e0213d5acad45652c2cfdeb930011fa Mon Sep 17 00:00:00 2001 From: Yeganathan S Date: Wed, 29 Jan 2025 14:39:06 +0530 Subject: [PATCH 03/26] rm memberIds support in workflow --- .../script_executor_worker/src/activities.ts | 4 +-- .../src/activities/search-sync/index.ts | 27 ++++--------------- .../apps/script_executor_worker/src/types.ts | 2 +- .../script_executor_worker/src/workflows.ts | 4 +-- .../{syncMembers.ts => syncAllMembers.ts} | 16 +++++------ 5 files changed, 17 insertions(+), 36 deletions(-) rename services/apps/script_executor_worker/src/workflows/{syncMembers.ts => syncAllMembers.ts} (50%) diff --git a/services/apps/script_executor_worker/src/activities.ts b/services/apps/script_executor_worker/src/activities.ts index e61f92a542..7148bdc131 100644 --- a/services/apps/script_executor_worker/src/activities.ts +++ b/services/apps/script_executor_worker/src/activities.ts @@ -21,7 +21,7 @@ import { findMembersWithSamePlatformIdentitiesDifferentCapitalization, findMembersWithSameVerifiedEmailsInDifferentPlatforms, } from './activities/merge-members-with-similar-identities' -import { syncAllMembers, syncMember } from './activities/search-sync' +import { deleteIndexedEntities, syncAllMembers } from './activities/search-sync' export { findMembersWithSameVerifiedEmailsInDifferentPlatforms, @@ -39,6 +39,6 @@ export { updateOrganizationIdentity, deleteOrganizationIdentity, isLfxMember, - syncMember, syncAllMembers, + deleteIndexedEntities, } diff --git a/services/apps/script_executor_worker/src/activities/search-sync/index.ts b/services/apps/script_executor_worker/src/activities/search-sync/index.ts index 032ba5a216..b9cfe4d272 100644 --- a/services/apps/script_executor_worker/src/activities/search-sync/index.ts +++ b/services/apps/script_executor_worker/src/activities/search-sync/index.ts @@ -1,30 +1,13 @@ import { DbStore } from '@crowd/data-access-layer/src/database' -import { MemberRepository } from '@crowd/data-access-layer/src/old/apps/search_sync_worker/member.repo' import { MemberSyncService } from '@crowd/opensearch' +import { IndexedEntityType } from '@crowd/opensearch/src/repo/indexing.data' +import { IndexingRepository } from '@crowd/opensearch/src/repo/indexing.repo' import { svc } from '../../main' -export async function syncMember(memberId: string, withAggs: boolean): Promise { - try { - const memberRepo = new MemberRepository(svc.postgres.writer, svc.log) - const service = new MemberSyncService( - svc.redis, - svc.postgres.writer, - new DbStore(svc.log, svc.questdbSQL), - svc.opensearch, - svc.log, - ) - - const results = await memberRepo.checkMembersExist([memberId]) - if (results.length === 0) { - svc.log.error(`Member ${memberId} not found!`) - return - } - - await service.syncMembers(memberId, { withAggs }) - } catch (err) { - throw new Error(err) - } +export async function deleteIndexedEntities(): Promise { + const indexingRepo = new IndexingRepository(svc.postgres.writer, svc.log) + await indexingRepo.deleteIndexedEntities(IndexedEntityType.MEMBER) } export async function syncAllMembers(batchSize: number, withAggs: boolean): Promise { diff --git a/services/apps/script_executor_worker/src/types.ts b/services/apps/script_executor_worker/src/types.ts index 6da6994b3d..6949e5f0d6 100644 --- a/services/apps/script_executor_worker/src/types.ts +++ b/services/apps/script_executor_worker/src/types.ts @@ -29,6 +29,6 @@ export interface IFixOrgIdentitiesWithWrongUrlsArgs { export interface ISyncMembersArgs { batchSize?: number + clean?: boolean withAggs?: boolean - memberIds?: string[] } diff --git a/services/apps/script_executor_worker/src/workflows.ts b/services/apps/script_executor_worker/src/workflows.ts index ad42ec6fe2..9e5d852bd4 100644 --- a/services/apps/script_executor_worker/src/workflows.ts +++ b/services/apps/script_executor_worker/src/workflows.ts @@ -2,12 +2,12 @@ import { dissectMember } from './workflows/dissectMember' import { findAndMergeMembersWithSamePlatformIdentitiesDifferentCapitalization } from './workflows/findAndMergeMembersWithSamePlatformIdentitiesDifferentCapitalization' import { findAndMergeMembersWithSameVerifiedEmailsInDifferentPlatforms } from './workflows/findAndMergeMembersWithSameVerifiedEmailsInDifferentPlatforms' import { fixOrgIdentitiesWithWrongUrls } from './workflows/fixOrgIdentitiesWithWrongUrls' -import { syncMembers } from './workflows/syncMembers' +import { syncAllMembers } from './workflows/syncAllMembers' export { findAndMergeMembersWithSameVerifiedEmailsInDifferentPlatforms, findAndMergeMembersWithSamePlatformIdentitiesDifferentCapitalization, dissectMember, fixOrgIdentitiesWithWrongUrls, - syncMembers, + syncAllMembers, } diff --git a/services/apps/script_executor_worker/src/workflows/syncMembers.ts b/services/apps/script_executor_worker/src/workflows/syncAllMembers.ts similarity index 50% rename from services/apps/script_executor_worker/src/workflows/syncMembers.ts rename to services/apps/script_executor_worker/src/workflows/syncAllMembers.ts index 0c026c89ad..dfe2c9db22 100644 --- a/services/apps/script_executor_worker/src/workflows/syncMembers.ts +++ b/services/apps/script_executor_worker/src/workflows/syncAllMembers.ts @@ -8,19 +8,17 @@ const activity = proxyActivities({ retry: { maximumAttempts: 3 }, }) -export async function syncMembers(args: ISyncMembersArgs): Promise { +export async function syncAllMembers(args: ISyncMembersArgs): Promise { const BATCH_SIZE = args.batchSize || 500 const WITH_AGGS = args.withAggs || true console.log('Starting members sync!') - if (args.memberIds.length) { - for (const memberId of args.memberIds) { - await activity.syncMember(memberId, WITH_AGGS) - } - console.log(`Successfully synced ${args.memberIds.length} members`) - } else { - await activity.syncAllMembers(BATCH_SIZE, WITH_AGGS) - console.log(`Successfully synced all members`) + if (args.clean) { + await activity.deleteIndexedEntities() + console.log('Deleted indexed entities for members!') } + + await activity.syncAllMembers(BATCH_SIZE, WITH_AGGS) + console.log(`Successfully synced all members`) } From 7bde72e66efc36b4ed11b02d943ee66fd017f8d0 Mon Sep 17 00:00:00 2001 From: Yeganathan S Date: Wed, 29 Jan 2025 14:42:12 +0530 Subject: [PATCH 04/26] fix --- services/apps/script_executor_worker/src/activities.ts | 2 +- .../src/activities/{search-sync/index.ts => member-sync.ts} | 2 +- .../apps/script_executor_worker/src/workflows/syncAllMembers.ts | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) rename services/apps/script_executor_worker/src/activities/{search-sync/index.ts => member-sync.ts} (96%) diff --git a/services/apps/script_executor_worker/src/activities.ts b/services/apps/script_executor_worker/src/activities.ts index 7148bdc131..3867db6f63 100644 --- a/services/apps/script_executor_worker/src/activities.ts +++ b/services/apps/script_executor_worker/src/activities.ts @@ -17,11 +17,11 @@ import { isLfxMember, updateOrganizationIdentity, } from './activities/fix-organization-identities-with-wrong-urls' +import { deleteIndexedEntities, syncAllMembers } from './activities/member-sync' import { findMembersWithSamePlatformIdentitiesDifferentCapitalization, findMembersWithSameVerifiedEmailsInDifferentPlatforms, } from './activities/merge-members-with-similar-identities' -import { deleteIndexedEntities, syncAllMembers } from './activities/search-sync' export { findMembersWithSameVerifiedEmailsInDifferentPlatforms, diff --git a/services/apps/script_executor_worker/src/activities/search-sync/index.ts b/services/apps/script_executor_worker/src/activities/member-sync.ts similarity index 96% rename from services/apps/script_executor_worker/src/activities/search-sync/index.ts rename to services/apps/script_executor_worker/src/activities/member-sync.ts index b9cfe4d272..c3cf73902b 100644 --- a/services/apps/script_executor_worker/src/activities/search-sync/index.ts +++ b/services/apps/script_executor_worker/src/activities/member-sync.ts @@ -3,7 +3,7 @@ import { MemberSyncService } from '@crowd/opensearch' import { IndexedEntityType } from '@crowd/opensearch/src/repo/indexing.data' import { IndexingRepository } from '@crowd/opensearch/src/repo/indexing.repo' -import { svc } from '../../main' +import { svc } from '../main' export async function deleteIndexedEntities(): Promise { const indexingRepo = new IndexingRepository(svc.postgres.writer, svc.log) diff --git a/services/apps/script_executor_worker/src/workflows/syncAllMembers.ts b/services/apps/script_executor_worker/src/workflows/syncAllMembers.ts index dfe2c9db22..8618b982a2 100644 --- a/services/apps/script_executor_worker/src/workflows/syncAllMembers.ts +++ b/services/apps/script_executor_worker/src/workflows/syncAllMembers.ts @@ -1,6 +1,6 @@ import { proxyActivities } from '@temporalio/workflow' -import * as activities from '../activities/search-sync' +import * as activities from '../activities/member-sync' import { ISyncMembersArgs } from '../types' const activity = proxyActivities({ From e50d1cc4403315f1d5de6f2d1743b6daed28b2ff Mon Sep 17 00:00:00 2001 From: Yeganathan S Date: Wed, 29 Jan 2025 14:48:09 +0530 Subject: [PATCH 05/26] change startToCloseTimeout --- .../apps/script_executor_worker/src/workflows/syncAllMembers.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/apps/script_executor_worker/src/workflows/syncAllMembers.ts b/services/apps/script_executor_worker/src/workflows/syncAllMembers.ts index 8618b982a2..ec4a8c6131 100644 --- a/services/apps/script_executor_worker/src/workflows/syncAllMembers.ts +++ b/services/apps/script_executor_worker/src/workflows/syncAllMembers.ts @@ -4,7 +4,7 @@ import * as activities from '../activities/member-sync' import { ISyncMembersArgs } from '../types' const activity = proxyActivities({ - startToCloseTimeout: '1 hour', + startToCloseTimeout: '5 minute', retry: { maximumAttempts: 3 }, }) From 5ecfca979fe35c2257a62dc6f34f64616a5e0a81 Mon Sep 17 00:00:00 2001 From: Yeganathan S Date: Wed, 29 Jan 2025 15:04:08 +0530 Subject: [PATCH 06/26] change startToCloseTimeout --- .../script_executor_worker/src/workflows/syncAllMembers.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/services/apps/script_executor_worker/src/workflows/syncAllMembers.ts b/services/apps/script_executor_worker/src/workflows/syncAllMembers.ts index ec4a8c6131..bd6ff073e5 100644 --- a/services/apps/script_executor_worker/src/workflows/syncAllMembers.ts +++ b/services/apps/script_executor_worker/src/workflows/syncAllMembers.ts @@ -4,7 +4,7 @@ import * as activities from '../activities/member-sync' import { ISyncMembersArgs } from '../types' const activity = proxyActivities({ - startToCloseTimeout: '5 minute', + startToCloseTimeout: '10 minute', retry: { maximumAttempts: 3 }, }) @@ -20,5 +20,5 @@ export async function syncAllMembers(args: ISyncMembersArgs): Promise { } await activity.syncAllMembers(BATCH_SIZE, WITH_AGGS) - console.log(`Successfully synced all members`) + console.log(`Synced all members!`) } From 0fd156abf428eb943ef217a9a325ea2ea3c5ad67 Mon Sep 17 00:00:00 2001 From: Yeganathan S Date: Wed, 29 Jan 2025 15:21:50 +0530 Subject: [PATCH 07/26] run prettier --- services/libs/opensearch/src/service/member.sync.service.ts | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/services/libs/opensearch/src/service/member.sync.service.ts b/services/libs/opensearch/src/service/member.sync.service.ts index 0001f79ea6..1217366d42 100644 --- a/services/libs/opensearch/src/service/member.sync.service.ts +++ b/services/libs/opensearch/src/service/member.sync.service.ts @@ -298,9 +298,7 @@ export class MemberSyncService { memberIds = await this.memberRepo.getMembersForSync(batchSize) } - this.log.info( - `Synced total of ${memberCount} members with ${docCount} documents!`, - ) + this.log.info(`Synced total of ${memberCount} members with ${docCount} documents!`) } public async syncOrganizationMembers( From b12c575b68835113889c809bd867d56658e88b3d Mon Sep 17 00:00:00 2001 From: Yeganathan S Date: Wed, 29 Jan 2025 21:59:19 +0530 Subject: [PATCH 08/26] add dedup improvement --- services/libs/opensearch/src/service/member.sync.service.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/services/libs/opensearch/src/service/member.sync.service.ts b/services/libs/opensearch/src/service/member.sync.service.ts index 1217366d42..f413a06cf0 100644 --- a/services/libs/opensearch/src/service/member.sync.service.ts +++ b/services/libs/opensearch/src/service/member.sync.service.ts @@ -1,4 +1,4 @@ -import { distinct, trimUtf8ToMaxByteLength } from '@crowd/common' +import { distinct, distinctBy, trimUtf8ToMaxByteLength } from '@crowd/common' import { MemberField, fetchMemberIdentities, @@ -439,6 +439,8 @@ export class MemberSyncService { } if (memberData.length > 0) { + // dedup memberData so no same member-segment duplicates + memberData = distinctBy(memberData, (m) => `${m.memberId}-${m.segmentId}`) try { await this.memberRepo.transactionally( async (txRepo) => { From 6398d7f063d841c1cf57571caabad6e31fdb2560 Mon Sep 17 00:00:00 2001 From: Yeganathan S Date: Wed, 29 Jan 2025 22:14:33 +0530 Subject: [PATCH 09/26] fix ci cd --- backend/src/services/searchSyncService.ts | 20 ------------------- .../src/service/organization.sync.service.ts | 4 +--- 2 files changed, 1 insertion(+), 23 deletions(-) diff --git a/backend/src/services/searchSyncService.ts b/backend/src/services/searchSyncService.ts index 946ee7e983..110a94deb4 100644 --- a/backend/src/services/searchSyncService.ts +++ b/backend/src/services/searchSyncService.ts @@ -63,16 +63,6 @@ export default class SearchSyncService extends LoggerBase { } } - async triggerTenantMembersSync(tenantId: string) { - const client = await this.getSearchSyncClient() - - if (client instanceof SearchSyncApiClient || client instanceof SearchSyncWorkerEmitter) { - await client.triggerTenantMembersSync(tenantId) - } else { - throw new Error('Unexpected search client type!') - } - } - async triggerOrganizationMembersSync(tenantId: string, organizationId: string) { const client = await this.getSearchSyncClient() @@ -126,16 +116,6 @@ export default class SearchSyncService extends LoggerBase { } } - async triggerTenantOrganizationSync(tenantId: string) { - const client = await this.getSearchSyncClient() - - if (client instanceof SearchSyncApiClient || client instanceof SearchSyncWorkerEmitter) { - await client.triggerTenantOrganizationSync(tenantId) - } else { - throw new Error('Unexpected search client type!') - } - } - async triggerRemoveOrganization(tenantId: string, organizationId: string) { const client = await this.getSearchSyncClient() diff --git a/services/libs/opensearch/src/service/organization.sync.service.ts b/services/libs/opensearch/src/service/organization.sync.service.ts index 741a75c2ef..d46c133ee4 100644 --- a/services/libs/opensearch/src/service/organization.sync.service.ts +++ b/services/libs/opensearch/src/service/organization.sync.service.ts @@ -293,9 +293,7 @@ export class OrganizationSyncService { 'sync-tenant-organizations', ) - this.log.info( - `Synced total of ${organizationCount} organizations with ${docCount} documents!`, - ) + this.log.info(`Synced total of ${organizationCount} organizations with ${docCount} documents!`) } public async syncOrganizations( From fb302969e694a8a9c1b8ec9760ae67fa8866bf27 Mon Sep 17 00:00:00 2001 From: Yeganathan S Date: Thu, 30 Jan 2025 14:17:09 +0530 Subject: [PATCH 10/26] fix workflow timeout --- pnpm-lock.yaml | 3 + .../script_executor_worker/src/activities.ts | 11 +++- .../src/activities/member-sync.ts | 26 -------- .../src/activities/sync/member.ts | 59 +++++++++++++++++++ .../script_executor_worker/src/workflows.ts | 4 +- .../src/workflows/syncAllMembers.ts | 24 -------- .../src/workflows/syncMembers.ts | 52 ++++++++++++++++ .../src/bin/sync-all-organizations.ts | 1 - services/libs/data-access-layer/package.json | 1 + .../data-access-layer/src/members/segments.ts | 1 + .../script_executor_worker/member.repo.ts | 17 ++++++ 11 files changed, 144 insertions(+), 55 deletions(-) delete mode 100644 services/apps/script_executor_worker/src/activities/member-sync.ts create mode 100644 services/apps/script_executor_worker/src/activities/sync/member.ts delete mode 100644 services/apps/script_executor_worker/src/workflows/syncAllMembers.ts create mode 100644 services/apps/script_executor_worker/src/workflows/syncMembers.ts diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 823538987a..6aabd0a7ed 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1510,6 +1510,9 @@ importers: '@crowd/logging': specifier: workspace:* version: link:../logging + '@crowd/opensearch': + specifier: workspace:* + version: link:../opensearch '@crowd/questdb': specifier: workspace:* version: link:../questdb diff --git a/services/apps/script_executor_worker/src/activities.ts b/services/apps/script_executor_worker/src/activities.ts index 3867db6f63..b40303312e 100644 --- a/services/apps/script_executor_worker/src/activities.ts +++ b/services/apps/script_executor_worker/src/activities.ts @@ -17,11 +17,16 @@ import { isLfxMember, updateOrganizationIdentity, } from './activities/fix-organization-identities-with-wrong-urls' -import { deleteIndexedEntities, syncAllMembers } from './activities/member-sync' import { findMembersWithSamePlatformIdentitiesDifferentCapitalization, findMembersWithSameVerifiedEmailsInDifferentPlatforms, } from './activities/merge-members-with-similar-identities' +import { + deleteIndexedEntities, + getMembersForSync, + markEntitiesIndexed, + syncMembersBatch, +} from './activities/sync/member' export { findMembersWithSameVerifiedEmailsInDifferentPlatforms, @@ -39,6 +44,8 @@ export { updateOrganizationIdentity, deleteOrganizationIdentity, isLfxMember, - syncAllMembers, + syncMembersBatch, + getMembersForSync, deleteIndexedEntities, + markEntitiesIndexed, } diff --git a/services/apps/script_executor_worker/src/activities/member-sync.ts b/services/apps/script_executor_worker/src/activities/member-sync.ts deleted file mode 100644 index c3cf73902b..0000000000 --- a/services/apps/script_executor_worker/src/activities/member-sync.ts +++ /dev/null @@ -1,26 +0,0 @@ -import { DbStore } from '@crowd/data-access-layer/src/database' -import { MemberSyncService } from '@crowd/opensearch' -import { IndexedEntityType } from '@crowd/opensearch/src/repo/indexing.data' -import { IndexingRepository } from '@crowd/opensearch/src/repo/indexing.repo' - -import { svc } from '../main' - -export async function deleteIndexedEntities(): Promise { - const indexingRepo = new IndexingRepository(svc.postgres.writer, svc.log) - await indexingRepo.deleteIndexedEntities(IndexedEntityType.MEMBER) -} - -export async function syncAllMembers(batchSize: number, withAggs: boolean): Promise { - try { - const service = new MemberSyncService( - svc.redis, - svc.postgres.writer, - new DbStore(svc.log, svc.questdbSQL), - svc.opensearch, - svc.log, - ) - await service.syncAllMembers(batchSize, { withAggs }) - } catch (err) { - throw new Error(err) - } -} diff --git a/services/apps/script_executor_worker/src/activities/sync/member.ts b/services/apps/script_executor_worker/src/activities/sync/member.ts new file mode 100644 index 0000000000..4040812f2b --- /dev/null +++ b/services/apps/script_executor_worker/src/activities/sync/member.ts @@ -0,0 +1,59 @@ +import { DbStore } from '@crowd/data-access-layer/src/database' +import { MemberSyncService } from '@crowd/opensearch' +import { IndexedEntityType } from '@crowd/opensearch/src/repo/indexing.data' +import { IndexingRepository } from '@crowd/opensearch/src/repo/indexing.repo' +import { MemberRepository } from '@crowd/opensearch/src/repo/member.repo' + +import { svc } from '../../main' + +export async function deleteIndexedEntities(entityType: IndexedEntityType): Promise { + const indexingRepo = new IndexingRepository(svc.postgres.writer, svc.log) + await indexingRepo.deleteIndexedEntities(entityType) +} + +export async function markEntitiesIndexed( + entityType: IndexedEntityType, + entityIds: string[], +): Promise { + const indexingRepo = new IndexingRepository(svc.postgres.writer, svc.log) + await indexingRepo.markEntitiesIndexed(entityType, entityIds) +} + +export async function getMembersForSync(batchSize: number): Promise { + const memberRepo = new MemberRepository(svc.redis, svc.postgres.writer, svc.log) + return memberRepo.getMembersForSync(batchSize) +} + +export async function syncMembersBatch( + memberIds: string[], + withAggs: boolean, +): Promise<{ docCount: number; memberCount: number }> { + let docCount = 0 + let memberCount = 0 + + try { + const service = new MemberSyncService( + svc.redis, + svc.postgres.writer, + new DbStore(svc.log, svc.questdbSQL), + svc.opensearch, + svc.log, + ) + + for (const memberId of memberIds) { + const { membersSynced, documentsIndexed } = await service.syncMembers(memberId, { + withAggs, + }) + + docCount += documentsIndexed + memberCount += membersSynced + } + + return { + docCount, + memberCount, + } + } catch (err) { + throw new Error(err) + } +} diff --git a/services/apps/script_executor_worker/src/workflows.ts b/services/apps/script_executor_worker/src/workflows.ts index 9e5d852bd4..ad42ec6fe2 100644 --- a/services/apps/script_executor_worker/src/workflows.ts +++ b/services/apps/script_executor_worker/src/workflows.ts @@ -2,12 +2,12 @@ import { dissectMember } from './workflows/dissectMember' import { findAndMergeMembersWithSamePlatformIdentitiesDifferentCapitalization } from './workflows/findAndMergeMembersWithSamePlatformIdentitiesDifferentCapitalization' import { findAndMergeMembersWithSameVerifiedEmailsInDifferentPlatforms } from './workflows/findAndMergeMembersWithSameVerifiedEmailsInDifferentPlatforms' import { fixOrgIdentitiesWithWrongUrls } from './workflows/fixOrgIdentitiesWithWrongUrls' -import { syncAllMembers } from './workflows/syncAllMembers' +import { syncMembers } from './workflows/syncMembers' export { findAndMergeMembersWithSameVerifiedEmailsInDifferentPlatforms, findAndMergeMembersWithSamePlatformIdentitiesDifferentCapitalization, dissectMember, fixOrgIdentitiesWithWrongUrls, - syncAllMembers, + syncMembers, } diff --git a/services/apps/script_executor_worker/src/workflows/syncAllMembers.ts b/services/apps/script_executor_worker/src/workflows/syncAllMembers.ts deleted file mode 100644 index bd6ff073e5..0000000000 --- a/services/apps/script_executor_worker/src/workflows/syncAllMembers.ts +++ /dev/null @@ -1,24 +0,0 @@ -import { proxyActivities } from '@temporalio/workflow' - -import * as activities from '../activities/member-sync' -import { ISyncMembersArgs } from '../types' - -const activity = proxyActivities({ - startToCloseTimeout: '10 minute', - retry: { maximumAttempts: 3 }, -}) - -export async function syncAllMembers(args: ISyncMembersArgs): Promise { - const BATCH_SIZE = args.batchSize || 500 - const WITH_AGGS = args.withAggs || true - - console.log('Starting members sync!') - - if (args.clean) { - await activity.deleteIndexedEntities() - console.log('Deleted indexed entities for members!') - } - - await activity.syncAllMembers(BATCH_SIZE, WITH_AGGS) - console.log(`Synced all members!`) -} diff --git a/services/apps/script_executor_worker/src/workflows/syncMembers.ts b/services/apps/script_executor_worker/src/workflows/syncMembers.ts new file mode 100644 index 0000000000..b577749ac8 --- /dev/null +++ b/services/apps/script_executor_worker/src/workflows/syncMembers.ts @@ -0,0 +1,52 @@ +import { proxyActivities } from '@temporalio/workflow' + +import { IndexedEntityType } from '@crowd/opensearch/src/repo/indexing.data' + +import * as activities from '../activities/sync/member' +import { ISyncMembersArgs } from '../types' + +const activity = proxyActivities({ + startToCloseTimeout: '30 minute', + retry: { maximumAttempts: 3, backoffCoefficient: 3 }, +}) + +export async function syncMembers(args: ISyncMembersArgs): Promise { + const BATCH_SIZE = args.batchSize || 500 + const WITH_AGGS = args.withAggs || true + + console.log(`Starting members sync! (batchSize: ${BATCH_SIZE}, withAggs: ${WITH_AGGS})`) + + if (args.clean) { + await activity.deleteIndexedEntities(IndexedEntityType.MEMBER) + console.log('Deleted indexed entities for members!') + } + + let totalMembersSynced = 0 + let totalDocumentsIndexed = 0 + + const now = new Date() + + let memberIds = await activity.getMembersForSync(BATCH_SIZE) + + while (memberIds.length > 0) { + const { docCount, memberCount } = await activity.syncMembersBatch(memberIds, WITH_AGGS) + + totalMembersSynced += memberCount + totalDocumentsIndexed += docCount + + const diffInSeconds = (new Date().getTime() - now.getTime()) / 1000 + console.log( + `Synced ${memberCount} members! Speed: ${Math.round( + memberCount / diffInSeconds, + )} members/second!`, + ) + + await activity.markEntitiesIndexed(IndexedEntityType.MEMBER, memberIds) + + memberIds = await activity.getMembersForSync(BATCH_SIZE) + } + + console.log( + `Synced total of ${totalMembersSynced} members with ${totalDocumentsIndexed} documents!`, + ) +} diff --git a/services/apps/search_sync_worker/src/bin/sync-all-organizations.ts b/services/apps/search_sync_worker/src/bin/sync-all-organizations.ts index 6908b5011f..5257ab439c 100644 --- a/services/apps/search_sync_worker/src/bin/sync-all-organizations.ts +++ b/services/apps/search_sync_worker/src/bin/sync-all-organizations.ts @@ -68,4 +68,3 @@ setImmediate(async () => { process.exit(0) }) - diff --git a/services/libs/data-access-layer/package.json b/services/libs/data-access-layer/package.json index 819be3150d..7bacfccabd 100644 --- a/services/libs/data-access-layer/package.json +++ b/services/libs/data-access-layer/package.json @@ -16,6 +16,7 @@ "@crowd/redis": "workspace:*", "@crowd/queue": "workspace:*", "@crowd/telemetry": "workspace:*", + "@crowd/opensearch": "workspace:*", "@crowd/types": "workspace:*", "@questdb/nodejs-client": "~3.0.0", "html-to-text": "~9.0.5", diff --git a/services/libs/data-access-layer/src/members/segments.ts b/services/libs/data-access-layer/src/members/segments.ts index 56157985a7..dca0c33981 100644 --- a/services/libs/data-access-layer/src/members/segments.ts +++ b/services/libs/data-access-layer/src/members/segments.ts @@ -44,6 +44,7 @@ export async function insertMemberSegments(qx: QueryExecutor, data: IMemberSegme 'averageSentiment', ], data, + 'DO NOTHING', ), ) } catch (e) { diff --git a/services/libs/data-access-layer/src/old/apps/script_executor_worker/member.repo.ts b/services/libs/data-access-layer/src/old/apps/script_executor_worker/member.repo.ts index 126c76219d..c383e25f7f 100644 --- a/services/libs/data-access-layer/src/old/apps/script_executor_worker/member.repo.ts +++ b/services/libs/data-access-layer/src/old/apps/script_executor_worker/member.repo.ts @@ -1,5 +1,6 @@ import { DbConnection, DbTransaction } from '@crowd/database' import { Logger } from '@crowd/logging' +import { IndexedEntityType } from '@crowd/opensearch/src/repo/indexing.data' import { IMember } from '@crowd/types' import { IFindMemberIdentitiesGroupedByPlatformResult, ISimilarMember } from './types' @@ -159,6 +160,22 @@ class MemberRepository { return member } + + public async getMembersForSync(perPage: number): Promise { + const results = await this.connection.any( + ` + select m.id + from members m + left join indexed_entities ie on m.id = ie.entity_id and ie.type = $(type) + where ie.entity_id is null + limit ${perPage};`, + { + type: IndexedEntityType.MEMBER, + }, + ) + + return results.map((r) => r.id) + } } export default MemberRepository From 0b2795a5929c736dea878c386e0f70afb736d93c Mon Sep 17 00:00:00 2001 From: Yeganathan S Date: Thu, 30 Jan 2025 20:26:55 +0530 Subject: [PATCH 11/26] change default batchSize --- .../src/workflows/syncMembers.ts | 2 +- .../apps/script_executor_worker/member.repo.ts | 16 ++++++++++------ 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/services/apps/script_executor_worker/src/workflows/syncMembers.ts b/services/apps/script_executor_worker/src/workflows/syncMembers.ts index b577749ac8..7598163b36 100644 --- a/services/apps/script_executor_worker/src/workflows/syncMembers.ts +++ b/services/apps/script_executor_worker/src/workflows/syncMembers.ts @@ -11,7 +11,7 @@ const activity = proxyActivities({ }) export async function syncMembers(args: ISyncMembersArgs): Promise { - const BATCH_SIZE = args.batchSize || 500 + const BATCH_SIZE = args.batchSize || 100 const WITH_AGGS = args.withAggs || true console.log(`Starting members sync! (batchSize: ${BATCH_SIZE}, withAggs: ${WITH_AGGS})`) diff --git a/services/libs/data-access-layer/src/old/apps/script_executor_worker/member.repo.ts b/services/libs/data-access-layer/src/old/apps/script_executor_worker/member.repo.ts index c383e25f7f..f5b0b9e972 100644 --- a/services/libs/data-access-layer/src/old/apps/script_executor_worker/member.repo.ts +++ b/services/libs/data-access-layer/src/old/apps/script_executor_worker/member.repo.ts @@ -161,14 +161,18 @@ class MemberRepository { return member } - public async getMembersForSync(perPage: number): Promise { + public async getMembersNotInSegmentAggs(perPage: number): Promise { const results = await this.connection.any( ` - select m.id - from members m - left join indexed_entities ie on m.id = ie.entity_id and ie.type = $(type) - where ie.entity_id is null - limit ${perPage};`, + select distinct a."memberId" + from activities a + left join "memberSegmentsAgg" msa + on a."memberId" = msa."memberId" + and a."segmentId" = msa."segmentId" + where msa."memberId" is null + and msa."segmentId" is null + limit ${perPage}; + `, { type: IndexedEntityType.MEMBER, }, From 537d325bff2d3879de717cf0ab83ad802e4011c9 Mon Sep 17 00:00:00 2001 From: Yeganathan S Date: Fri, 31 Jan 2025 14:45:05 +0530 Subject: [PATCH 12/26] check if parallel processing works --- .../src/activities/sync/member.ts | 72 +++++++++++++++++-- .../apps/script_executor_worker/src/types.ts | 1 + .../src/workflows/syncMembers.ts | 5 ++ 3 files changed, 71 insertions(+), 7 deletions(-) diff --git a/services/apps/script_executor_worker/src/activities/sync/member.ts b/services/apps/script_executor_worker/src/activities/sync/member.ts index 4040812f2b..d4368efbee 100644 --- a/services/apps/script_executor_worker/src/activities/sync/member.ts +++ b/services/apps/script_executor_worker/src/activities/sync/member.ts @@ -28,8 +28,8 @@ export async function syncMembersBatch( memberIds: string[], withAggs: boolean, ): Promise<{ docCount: number; memberCount: number }> { - let docCount = 0 - let memberCount = 0 + let totalDocCount = 0 + let totalMemberCount = 0 try { const service = new MemberSyncService( @@ -40,18 +40,76 @@ export async function syncMembersBatch( svc.log, ) - for (const memberId of memberIds) { + // Split members into two groups + const midpoint = Math.floor(memberIds.length / 2) + const firstHalf = memberIds.slice(0, midpoint) + const secondHalf = memberIds.slice(midpoint) + + // Process first half sequentially + svc.log.info(`Processing first ${firstHalf.length} members sequentially...`) + const sequentialStartTime = Date.now() + let sequentialDocCount = 0 + let sequentialMemberCount = 0 + + for (const memberId of firstHalf) { const { membersSynced, documentsIndexed } = await service.syncMembers(memberId, { withAggs, }) + sequentialDocCount += documentsIndexed + sequentialMemberCount += membersSynced + } + + const sequentialEndTime = Date.now() + const sequentialDuration = (sequentialEndTime - sequentialStartTime) / 1000 + svc.log.info( + `Sequential processing of ${firstHalf.length} members took ${sequentialDuration.toFixed(2)}s (${( + sequentialMemberCount / sequentialDuration + ).toFixed(2)} members/s)`, + ) + + // Process second half in parallel with chunks of 5 + svc.log.info(`Processing second ${secondHalf.length} members in parallel...`) + const parallelStartTime = Date.now() + let parallelDocCount = 0 + let parallelMemberCount = 0 + const CHUNK_SIZE = 5 - docCount += documentsIndexed - memberCount += membersSynced + for (let i = 0; i < secondHalf.length; i += CHUNK_SIZE) { + const chunk = secondHalf.slice(i, i + CHUNK_SIZE) + const promises = chunk.map(async (memberId) => { + const result = await service.syncMembers(memberId, { withAggs }) + return result + }) + const results = await Promise.all(promises) + for (const result of results) { + parallelDocCount += result.documentsIndexed + parallelMemberCount += result.membersSynced + } } + const parallelEndTime = Date.now() + const parallelDuration = (parallelEndTime - parallelStartTime) / 1000 + svc.log.info( + `Parallel processing of ${secondHalf.length} members took ${parallelDuration.toFixed(2)}s (${( + parallelMemberCount / parallelDuration + ).toFixed(2)} members/s)`, + ) + + // Log performance comparison + const speedupRatio = sequentialDuration / parallelDuration + svc.log.info( + `Performance comparison for ${secondHalf.length} members each: + Sequential: ${sequentialDuration.toFixed(2)}s (${(sequentialMemberCount / sequentialDuration).toFixed(2)} members/s) + Parallel : ${parallelDuration.toFixed(2)}s (${(parallelMemberCount / parallelDuration).toFixed(2)} members/s) + Parallel processing was ${speedupRatio.toFixed(2)}x ${speedupRatio > 1 ? 'faster' : 'slower'}`, + ) + + totalDocCount = sequentialDocCount + parallelDocCount + totalMemberCount = sequentialMemberCount + parallelMemberCount + return { - docCount, - memberCount, + docCount: totalDocCount, + memberCount: totalMemberCount, } } catch (err) { throw new Error(err) diff --git a/services/apps/script_executor_worker/src/types.ts b/services/apps/script_executor_worker/src/types.ts index 6949e5f0d6..b07fd41810 100644 --- a/services/apps/script_executor_worker/src/types.ts +++ b/services/apps/script_executor_worker/src/types.ts @@ -31,4 +31,5 @@ export interface ISyncMembersArgs { batchSize?: number clean?: boolean withAggs?: boolean + testRun?: boolean } diff --git a/services/apps/script_executor_worker/src/workflows/syncMembers.ts b/services/apps/script_executor_worker/src/workflows/syncMembers.ts index 7598163b36..a7c8cbb577 100644 --- a/services/apps/script_executor_worker/src/workflows/syncMembers.ts +++ b/services/apps/script_executor_worker/src/workflows/syncMembers.ts @@ -43,6 +43,11 @@ export async function syncMembers(args: ISyncMembersArgs): Promise { await activity.markEntitiesIndexed(IndexedEntityType.MEMBER, memberIds) + if (args.testRun) { + console.log('Test run completed - stopping after first batch!') + break + } + memberIds = await activity.getMembersForSync(BATCH_SIZE) } From 4e45aa963f6ab6b89f42a937fc0b39454496fdf9 Mon Sep 17 00:00:00 2001 From: Yeganathan S Date: Fri, 31 Jan 2025 15:06:10 +0530 Subject: [PATCH 13/26] increase chunk size --- .../apps/script_executor_worker/src/activities/sync/member.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/apps/script_executor_worker/src/activities/sync/member.ts b/services/apps/script_executor_worker/src/activities/sync/member.ts index d4368efbee..cdb93a183a 100644 --- a/services/apps/script_executor_worker/src/activities/sync/member.ts +++ b/services/apps/script_executor_worker/src/activities/sync/member.ts @@ -72,7 +72,7 @@ export async function syncMembersBatch( const parallelStartTime = Date.now() let parallelDocCount = 0 let parallelMemberCount = 0 - const CHUNK_SIZE = 5 + const CHUNK_SIZE = 10 for (let i = 0; i < secondHalf.length; i += CHUNK_SIZE) { const chunk = secondHalf.slice(i, i + CHUNK_SIZE) From 6917563578e1fe5f6939e62883f3f7cb2fb6c656 Mon Sep 17 00:00:00 2001 From: Yeganathan S Date: Fri, 31 Jan 2025 16:21:37 +0530 Subject: [PATCH 14/26] parallel syncing --- pnpm-lock.yaml | 3 + .../apps/script_executor_worker/package.json | 1 + .../src/activities/sync/member.ts | 80 +++---------------- 3 files changed, 15 insertions(+), 69 deletions(-) diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 6aabd0a7ed..1e43a72351 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1038,6 +1038,9 @@ importers: '@crowd/archetype-worker': specifier: workspace:* version: link:../../archetypes/worker + '@crowd/common': + specifier: workspace:* + version: link:../../libs/common '@crowd/data-access-layer': specifier: workspace:* version: link:../../libs/data-access-layer diff --git a/services/apps/script_executor_worker/package.json b/services/apps/script_executor_worker/package.json index bb436ec729..92980da32a 100644 --- a/services/apps/script_executor_worker/package.json +++ b/services/apps/script_executor_worker/package.json @@ -14,6 +14,7 @@ "dependencies": { "@crowd/archetype-standard": "workspace:*", "@crowd/archetype-worker": "workspace:*", + "@crowd/common": "workspace:*", "@crowd/data-access-layer": "workspace:*", "@crowd/logging": "workspace:*", "@crowd/opensearch": "workspace:*", diff --git a/services/apps/script_executor_worker/src/activities/sync/member.ts b/services/apps/script_executor_worker/src/activities/sync/member.ts index cdb93a183a..19d2629da2 100644 --- a/services/apps/script_executor_worker/src/activities/sync/member.ts +++ b/services/apps/script_executor_worker/src/activities/sync/member.ts @@ -1,3 +1,4 @@ +import { sumBy } from '@crowd/common' import { DbStore } from '@crowd/data-access-layer/src/database' import { MemberSyncService } from '@crowd/opensearch' import { IndexedEntityType } from '@crowd/opensearch/src/repo/indexing.data' @@ -28,9 +29,6 @@ export async function syncMembersBatch( memberIds: string[], withAggs: boolean, ): Promise<{ docCount: number; memberCount: number }> { - let totalDocCount = 0 - let totalMemberCount = 0 - try { const service = new MemberSyncService( svc.redis, @@ -40,76 +38,20 @@ export async function syncMembersBatch( svc.log, ) - // Split members into two groups - const midpoint = Math.floor(memberIds.length / 2) - const firstHalf = memberIds.slice(0, midpoint) - const secondHalf = memberIds.slice(midpoint) - - // Process first half sequentially - svc.log.info(`Processing first ${firstHalf.length} members sequentially...`) - const sequentialStartTime = Date.now() - let sequentialDocCount = 0 - let sequentialMemberCount = 0 - - for (const memberId of firstHalf) { - const { membersSynced, documentsIndexed } = await service.syncMembers(memberId, { - withAggs, - }) - sequentialDocCount += documentsIndexed - sequentialMemberCount += membersSynced - } - - const sequentialEndTime = Date.now() - const sequentialDuration = (sequentialEndTime - sequentialStartTime) / 1000 - svc.log.info( - `Sequential processing of ${firstHalf.length} members took ${sequentialDuration.toFixed(2)}s (${( - sequentialMemberCount / sequentialDuration - ).toFixed(2)} members/s)`, - ) - - // Process second half in parallel with chunks of 5 - svc.log.info(`Processing second ${secondHalf.length} members in parallel...`) - const parallelStartTime = Date.now() - let parallelDocCount = 0 - let parallelMemberCount = 0 + // Process in chunks of 10 members in parallel const CHUNK_SIZE = 10 - - for (let i = 0; i < secondHalf.length; i += CHUNK_SIZE) { - const chunk = secondHalf.slice(i, i + CHUNK_SIZE) - const promises = chunk.map(async (memberId) => { - const result = await service.syncMembers(memberId, { withAggs }) - return result - }) - const results = await Promise.all(promises) - for (const result of results) { - parallelDocCount += result.documentsIndexed - parallelMemberCount += result.membersSynced - } + const results = [] + for (let i = 0; i < memberIds.length; i += CHUNK_SIZE) { + const chunk = memberIds.slice(i, i + CHUNK_SIZE) + const chunkResults = await Promise.all( + chunk.map((memberId) => service.syncMembers(memberId, { withAggs })), + ) + results.push(...chunkResults) } - const parallelEndTime = Date.now() - const parallelDuration = (parallelEndTime - parallelStartTime) / 1000 - svc.log.info( - `Parallel processing of ${secondHalf.length} members took ${parallelDuration.toFixed(2)}s (${( - parallelMemberCount / parallelDuration - ).toFixed(2)} members/s)`, - ) - - // Log performance comparison - const speedupRatio = sequentialDuration / parallelDuration - svc.log.info( - `Performance comparison for ${secondHalf.length} members each: - Sequential: ${sequentialDuration.toFixed(2)}s (${(sequentialMemberCount / sequentialDuration).toFixed(2)} members/s) - Parallel : ${parallelDuration.toFixed(2)}s (${(parallelMemberCount / parallelDuration).toFixed(2)} members/s) - Parallel processing was ${speedupRatio.toFixed(2)}x ${speedupRatio > 1 ? 'faster' : 'slower'}`, - ) - - totalDocCount = sequentialDocCount + parallelDocCount - totalMemberCount = sequentialMemberCount + parallelMemberCount - return { - docCount: totalDocCount, - memberCount: totalMemberCount, + docCount: sumBy(results, (r) => r.documentsIndexed), + memberCount: sumBy(results, (r) => r.membersSynced), } } catch (err) { throw new Error(err) From 9b39513d8b0e2b8df6ce1ba83a0b5bf58e6a8967 Mon Sep 17 00:00:00 2001 From: Yeganathan S Date: Fri, 31 Jan 2025 18:09:28 +0530 Subject: [PATCH 15/26] customize chunkSize --- .../src/activities/sync/member.ts | 13 +++++++++++-- services/apps/script_executor_worker/src/types.ts | 1 + .../src/workflows/syncMembers.ts | 12 ++++++++---- 3 files changed, 20 insertions(+), 6 deletions(-) diff --git a/services/apps/script_executor_worker/src/activities/sync/member.ts b/services/apps/script_executor_worker/src/activities/sync/member.ts index 19d2629da2..2ee237e6d1 100644 --- a/services/apps/script_executor_worker/src/activities/sync/member.ts +++ b/services/apps/script_executor_worker/src/activities/sync/member.ts @@ -28,7 +28,9 @@ export async function getMembersForSync(batchSize: number): Promise { export async function syncMembersBatch( memberIds: string[], withAggs: boolean, + chunkSize?: number, ): Promise<{ docCount: number; memberCount: number }> { + const startTime = new Date() try { const service = new MemberSyncService( svc.redis, @@ -38,8 +40,10 @@ export async function syncMembersBatch( svc.log, ) - // Process in chunks of 10 members in parallel - const CHUNK_SIZE = 10 + const CHUNK_SIZE = chunkSize || 10 + + svc.log.info(`Syncing members in chunks of ${CHUNK_SIZE} members!`) + const results = [] for (let i = 0; i < memberIds.length; i += CHUNK_SIZE) { const chunk = memberIds.slice(i, i + CHUNK_SIZE) @@ -49,6 +53,11 @@ export async function syncMembersBatch( results.push(...chunkResults) } + const totalDiffInMinutes = (new Date().getTime() - startTime.getTime()) / (1000 * 60) + svc.log.info( + `Completed sync of ${memberIds.length} members in ${totalDiffInMinutes.toFixed(2)} minutes`, + ) + return { docCount: sumBy(results, (r) => r.documentsIndexed), memberCount: sumBy(results, (r) => r.membersSynced), diff --git a/services/apps/script_executor_worker/src/types.ts b/services/apps/script_executor_worker/src/types.ts index b07fd41810..d36fb62221 100644 --- a/services/apps/script_executor_worker/src/types.ts +++ b/services/apps/script_executor_worker/src/types.ts @@ -29,6 +29,7 @@ export interface IFixOrgIdentitiesWithWrongUrlsArgs { export interface ISyncMembersArgs { batchSize?: number + chunkSize?: number clean?: boolean withAggs?: boolean testRun?: boolean diff --git a/services/apps/script_executor_worker/src/workflows/syncMembers.ts b/services/apps/script_executor_worker/src/workflows/syncMembers.ts index a7c8cbb577..864ea03b9c 100644 --- a/services/apps/script_executor_worker/src/workflows/syncMembers.ts +++ b/services/apps/script_executor_worker/src/workflows/syncMembers.ts @@ -29,16 +29,20 @@ export async function syncMembers(args: ISyncMembersArgs): Promise { let memberIds = await activity.getMembersForSync(BATCH_SIZE) while (memberIds.length > 0) { - const { docCount, memberCount } = await activity.syncMembersBatch(memberIds, WITH_AGGS) + const { docCount, memberCount } = await activity.syncMembersBatch( + memberIds, + WITH_AGGS, + args.chunkSize, + ) totalMembersSynced += memberCount totalDocumentsIndexed += docCount - const diffInSeconds = (new Date().getTime() - now.getTime()) / 1000 + const diffInMinutes = (new Date().getTime() - now.getTime()) / (1000 * 60) console.log( `Synced ${memberCount} members! Speed: ${Math.round( - memberCount / diffInSeconds, - )} members/second!`, + memberCount / diffInMinutes, + )} members/minute!`, ) await activity.markEntitiesIndexed(IndexedEntityType.MEMBER, memberIds) From 5a2cdd752130f8b93f89b5bfea6edc981e27d737 Mon Sep 17 00:00:00 2001 From: Yeganathan S Date: Fri, 31 Jan 2025 23:00:01 +0530 Subject: [PATCH 16/26] improve --- .../script_executor_worker/src/activities/sync/member.ts | 6 ------ .../script_executor_worker/src/workflows/syncMembers.ts | 8 +++----- 2 files changed, 3 insertions(+), 11 deletions(-) diff --git a/services/apps/script_executor_worker/src/activities/sync/member.ts b/services/apps/script_executor_worker/src/activities/sync/member.ts index 2ee237e6d1..8a923abf0c 100644 --- a/services/apps/script_executor_worker/src/activities/sync/member.ts +++ b/services/apps/script_executor_worker/src/activities/sync/member.ts @@ -30,7 +30,6 @@ export async function syncMembersBatch( withAggs: boolean, chunkSize?: number, ): Promise<{ docCount: number; memberCount: number }> { - const startTime = new Date() try { const service = new MemberSyncService( svc.redis, @@ -53,11 +52,6 @@ export async function syncMembersBatch( results.push(...chunkResults) } - const totalDiffInMinutes = (new Date().getTime() - startTime.getTime()) / (1000 * 60) - svc.log.info( - `Completed sync of ${memberIds.length} members in ${totalDiffInMinutes.toFixed(2)} minutes`, - ) - return { docCount: sumBy(results, (r) => r.documentsIndexed), memberCount: sumBy(results, (r) => r.membersSynced), diff --git a/services/apps/script_executor_worker/src/workflows/syncMembers.ts b/services/apps/script_executor_worker/src/workflows/syncMembers.ts index 864ea03b9c..de4f4f7d7d 100644 --- a/services/apps/script_executor_worker/src/workflows/syncMembers.ts +++ b/services/apps/script_executor_worker/src/workflows/syncMembers.ts @@ -39,11 +39,9 @@ export async function syncMembers(args: ISyncMembersArgs): Promise { totalDocumentsIndexed += docCount const diffInMinutes = (new Date().getTime() - now.getTime()) / (1000 * 60) - console.log( - `Synced ${memberCount} members! Speed: ${Math.round( - memberCount / diffInMinutes, - )} members/minute!`, - ) + const speed = Math.round(totalMembersSynced / diffInMinutes) + + console.log(`Synced ${memberCount} members! Speed: ${speed} members/minute!`) await activity.markEntitiesIndexed(IndexedEntityType.MEMBER, memberIds) From a9031af723ebff865b64a1db024a4e6dfe8cfdf2 Mon Sep 17 00:00:00 2001 From: Yeganathan S Date: Fri, 31 Jan 2025 23:14:02 +0530 Subject: [PATCH 17/26] fix speed calculation --- .../script_executor_worker/src/workflows/syncMembers.ts | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/services/apps/script_executor_worker/src/workflows/syncMembers.ts b/services/apps/script_executor_worker/src/workflows/syncMembers.ts index de4f4f7d7d..a8930da428 100644 --- a/services/apps/script_executor_worker/src/workflows/syncMembers.ts +++ b/services/apps/script_executor_worker/src/workflows/syncMembers.ts @@ -24,11 +24,10 @@ export async function syncMembers(args: ISyncMembersArgs): Promise { let totalMembersSynced = 0 let totalDocumentsIndexed = 0 - const now = new Date() - let memberIds = await activity.getMembersForSync(BATCH_SIZE) while (memberIds.length > 0) { + const batchStartTime = Date.now() const { docCount, memberCount } = await activity.syncMembersBatch( memberIds, WITH_AGGS, @@ -38,8 +37,7 @@ export async function syncMembers(args: ISyncMembersArgs): Promise { totalMembersSynced += memberCount totalDocumentsIndexed += docCount - const diffInMinutes = (new Date().getTime() - now.getTime()) / (1000 * 60) - const speed = Math.round(totalMembersSynced / diffInMinutes) + const speed = Math.round(memberCount / ((Date.now() - batchStartTime) / 60000)) console.log(`Synced ${memberCount} members! Speed: ${speed} members/minute!`) From 053ce54050d21e06300e5d7bb8b276846a9a2b71 Mon Sep 17 00:00:00 2001 From: Yeganathan S Date: Fri, 31 Jan 2025 23:24:18 +0530 Subject: [PATCH 18/26] fix calc --- .../src/workflows/syncMembers.ts | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/services/apps/script_executor_worker/src/workflows/syncMembers.ts b/services/apps/script_executor_worker/src/workflows/syncMembers.ts index a8930da428..7a6288c5d7 100644 --- a/services/apps/script_executor_worker/src/workflows/syncMembers.ts +++ b/services/apps/script_executor_worker/src/workflows/syncMembers.ts @@ -27,7 +27,7 @@ export async function syncMembers(args: ISyncMembersArgs): Promise { let memberIds = await activity.getMembersForSync(BATCH_SIZE) while (memberIds.length > 0) { - const batchStartTime = Date.now() + const batchStartTime = new Date() const { docCount, memberCount } = await activity.syncMembersBatch( memberIds, WITH_AGGS, @@ -37,9 +37,13 @@ export async function syncMembers(args: ISyncMembersArgs): Promise { totalMembersSynced += memberCount totalDocumentsIndexed += docCount - const speed = Math.round(memberCount / ((Date.now() - batchStartTime) / 60000)) + const diffInSeconds = (new Date().getTime() - batchStartTime.getTime()) / 1000 - console.log(`Synced ${memberCount} members! Speed: ${speed} members/minute!`) + console.log( + `Synced ${memberCount} members! Speed: ${Math.round( + memberCount / diffInSeconds, + )} members/second!`, + ) await activity.markEntitiesIndexed(IndexedEntityType.MEMBER, memberIds) From dfd7bc2d11d948759221835bd429276168bbc9cb Mon Sep 17 00:00:00 2001 From: Yeganathan S Date: Mon, 3 Feb 2025 16:04:01 +0530 Subject: [PATCH 19/26] fix linter --- services/libs/queue/src/vendors/kafka/client.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/libs/queue/src/vendors/kafka/client.ts b/services/libs/queue/src/vendors/kafka/client.ts index 770b229510..af050902f5 100644 --- a/services/libs/queue/src/vendors/kafka/client.ts +++ b/services/libs/queue/src/vendors/kafka/client.ts @@ -383,7 +383,7 @@ export class KafkaQueueService extends LoggerBase implements IQueue { await consumer.run({ eachMessage: async ({ message }) => { if (message && message.value) { - let startWait = performance.now() + const startWait = performance.now() while (!this.isAvailable(maxConcurrentMessageProcessing)) { const diff = performance.now() - startWait From 9acb673e22a8ede7c7a45d31aad48df5c5c62616 Mon Sep 17 00:00:00 2001 From: Yeganathan S Date: Mon, 3 Feb 2025 19:08:13 +0530 Subject: [PATCH 20/26] rm tenantId column from indexedEntities --- .../migrations/V1738589807__rmTenantIdinIndexedEntities.sql | 1 + 1 file changed, 1 insertion(+) create mode 100644 backend/src/database/migrations/V1738589807__rmTenantIdinIndexedEntities.sql diff --git a/backend/src/database/migrations/V1738589807__rmTenantIdinIndexedEntities.sql b/backend/src/database/migrations/V1738589807__rmTenantIdinIndexedEntities.sql new file mode 100644 index 0000000000..42cfe8aca6 --- /dev/null +++ b/backend/src/database/migrations/V1738589807__rmTenantIdinIndexedEntities.sql @@ -0,0 +1 @@ +alter table indexed_entities drop column tenant_id; \ No newline at end of file From be017ecba750527a4b752d8b71fbf1c3aa3d465d Mon Sep 17 00:00:00 2001 From: Yeganathan S Date: Mon, 10 Feb 2025 13:49:57 +0530 Subject: [PATCH 21/26] use pg reader instance for getMembersForSync --- .../apps/script_executor_worker/src/activities/sync/member.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/apps/script_executor_worker/src/activities/sync/member.ts b/services/apps/script_executor_worker/src/activities/sync/member.ts index 8a923abf0c..5a178c5822 100644 --- a/services/apps/script_executor_worker/src/activities/sync/member.ts +++ b/services/apps/script_executor_worker/src/activities/sync/member.ts @@ -21,7 +21,7 @@ export async function markEntitiesIndexed( } export async function getMembersForSync(batchSize: number): Promise { - const memberRepo = new MemberRepository(svc.redis, svc.postgres.writer, svc.log) + const memberRepo = new MemberRepository(svc.redis, svc.postgres.reader, svc.log) return memberRepo.getMembersForSync(batchSize) } From f96f5c62b11b51cbc5132f48d381889b3699a43c Mon Sep 17 00:00:00 2001 From: Yeganathan S Date: Wed, 12 Feb 2025 14:21:49 +0530 Subject: [PATCH 22/26] use continueAsNew instead of looping --- .../src/workflows/syncMembers.ts | 50 +++++++------------ 1 file changed, 18 insertions(+), 32 deletions(-) diff --git a/services/apps/script_executor_worker/src/workflows/syncMembers.ts b/services/apps/script_executor_worker/src/workflows/syncMembers.ts index 7a6288c5d7..9b5d58033a 100644 --- a/services/apps/script_executor_worker/src/workflows/syncMembers.ts +++ b/services/apps/script_executor_worker/src/workflows/syncMembers.ts @@ -1,4 +1,4 @@ -import { proxyActivities } from '@temporalio/workflow' +import { continueAsNew, proxyActivities } from '@temporalio/workflow' import { IndexedEntityType } from '@crowd/opensearch/src/repo/indexing.data' @@ -14,48 +14,34 @@ export async function syncMembers(args: ISyncMembersArgs): Promise { const BATCH_SIZE = args.batchSize || 100 const WITH_AGGS = args.withAggs || true - console.log(`Starting members sync! (batchSize: ${BATCH_SIZE}, withAggs: ${WITH_AGGS})`) - if (args.clean) { await activity.deleteIndexedEntities(IndexedEntityType.MEMBER) console.log('Deleted indexed entities for members!') } - let totalMembersSynced = 0 - let totalDocumentsIndexed = 0 - - let memberIds = await activity.getMembersForSync(BATCH_SIZE) + const memberIds = await activity.getMembersForSync(BATCH_SIZE) - while (memberIds.length > 0) { - const batchStartTime = new Date() - const { docCount, memberCount } = await activity.syncMembersBatch( - memberIds, - WITH_AGGS, - args.chunkSize, - ) - - totalMembersSynced += memberCount - totalDocumentsIndexed += docCount + if (memberIds.length === 0) { + console.log('No more members to sync!') + return + } - const diffInSeconds = (new Date().getTime() - batchStartTime.getTime()) / 1000 + const batchStartTime = new Date() + const { memberCount } = await activity.syncMembersBatch(memberIds, WITH_AGGS, args.chunkSize) - console.log( - `Synced ${memberCount} members! Speed: ${Math.round( - memberCount / diffInSeconds, - )} members/second!`, - ) + const diffInSeconds = (new Date().getTime() - batchStartTime.getTime()) / 1000 - await activity.markEntitiesIndexed(IndexedEntityType.MEMBER, memberIds) + console.log( + `Synced ${memberCount} members! Speed: ${Math.round(memberCount / diffInSeconds)} members/second!`, + ) - if (args.testRun) { - console.log('Test run completed - stopping after first batch!') - break - } + await activity.markEntitiesIndexed(IndexedEntityType.MEMBER, memberIds) - memberIds = await activity.getMembersForSync(BATCH_SIZE) + if (args.testRun) { + console.log('Test run completed - stopping after first batch!') + return } - console.log( - `Synced total of ${totalMembersSynced} members with ${totalDocumentsIndexed} documents!`, - ) + // Continue as new for the next batch + await continueAsNew(args) } From 927bc3325dde3eb0cfb09cfc4e3b9ce068f00531 Mon Sep 17 00:00:00 2001 From: Yeganathan S Date: Wed, 12 Feb 2025 14:42:57 +0530 Subject: [PATCH 23/26] rm index --- .../migrations/V1738589807__rmTenantIdinIndexedEntities.sql | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/backend/src/database/migrations/V1738589807__rmTenantIdinIndexedEntities.sql b/backend/src/database/migrations/V1738589807__rmTenantIdinIndexedEntities.sql index 42cfe8aca6..f94564c1f3 100644 --- a/backend/src/database/migrations/V1738589807__rmTenantIdinIndexedEntities.sql +++ b/backend/src/database/migrations/V1738589807__rmTenantIdinIndexedEntities.sql @@ -1 +1,4 @@ -alter table indexed_entities drop column tenant_id; \ No newline at end of file +drop index if exists ix_indexed_entities_tenant; + +alter table indexed_entities + drop column tenant_id; \ No newline at end of file From 079a1e8dadb6a4f4dc1b1162e0affda576eb6317 Mon Sep 17 00:00:00 2001 From: Yeganathan S Date: Mon, 17 Feb 2025 19:42:23 +0530 Subject: [PATCH 24/26] optimize the db query and check --- .../src/workflows/syncMembers.ts | 2 +- .../libs/opensearch/src/repo/member.repo.ts | 18 +++++++++++------- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/services/apps/script_executor_worker/src/workflows/syncMembers.ts b/services/apps/script_executor_worker/src/workflows/syncMembers.ts index 9b5d58033a..5c8a39b6fb 100644 --- a/services/apps/script_executor_worker/src/workflows/syncMembers.ts +++ b/services/apps/script_executor_worker/src/workflows/syncMembers.ts @@ -6,7 +6,7 @@ import * as activities from '../activities/sync/member' import { ISyncMembersArgs } from '../types' const activity = proxyActivities({ - startToCloseTimeout: '30 minute', + startToCloseTimeout: '45 minutes', retry: { maximumAttempts: 3, backoffCoefficient: 3 }, }) diff --git a/services/libs/opensearch/src/repo/member.repo.ts b/services/libs/opensearch/src/repo/member.repo.ts index 5cbffa15bb..548e0dab44 100644 --- a/services/libs/opensearch/src/repo/member.repo.ts +++ b/services/libs/opensearch/src/repo/member.repo.ts @@ -42,19 +42,23 @@ export class MemberRepository extends RepositoryBase { } public async getMembersForSync(perPage: number): Promise { - // eslint-disable-next-line @typescript-eslint/no-explicit-any const results = await this.db().any( ` - select m.id - from members m - left join indexed_entities ie on m.id = ie.entity_id and ie.type = $(type) - where ie.entity_id is null - limit ${perPage};`, + SELECT m.id + FROM members m + WHERE NOT EXISTS ( + SELECT 1 + FROM indexed_entities ie + WHERE ie.entity_id = m.id + AND ie.type = $(type) + ) + ORDER BY m.id + LIMIT ${perPage}; + `, { type: IndexedEntityType.MEMBER, }, ) - return results.map((r) => r.id) } From 1151b8d839ee6a6a68c1efa751793615c23987da Mon Sep 17 00:00:00 2001 From: Yeganathan S Date: Mon, 17 Feb 2025 21:05:06 +0530 Subject: [PATCH 25/26] Add index --- ... => V1738589807__rmTenantIdAndAddIndexInIndexedEntities.sql} | 2 ++ 1 file changed, 2 insertions(+) rename backend/src/database/migrations/{V1738589807__rmTenantIdinIndexedEntities.sql => V1738589807__rmTenantIdAndAddIndexInIndexedEntities.sql} (50%) diff --git a/backend/src/database/migrations/V1738589807__rmTenantIdinIndexedEntities.sql b/backend/src/database/migrations/V1738589807__rmTenantIdAndAddIndexInIndexedEntities.sql similarity index 50% rename from backend/src/database/migrations/V1738589807__rmTenantIdinIndexedEntities.sql rename to backend/src/database/migrations/V1738589807__rmTenantIdAndAddIndexInIndexedEntities.sql index f94564c1f3..ebfd72d4c9 100644 --- a/backend/src/database/migrations/V1738589807__rmTenantIdinIndexedEntities.sql +++ b/backend/src/database/migrations/V1738589807__rmTenantIdAndAddIndexInIndexedEntities.sql @@ -1,3 +1,5 @@ +create index if not exists ix_indexed_entities_type_entity_id on indexed_entities (type, entity_id); + drop index if exists ix_indexed_entities_tenant; alter table indexed_entities From 70589ec4a0a1d941e1bdd63b4f1bcfc6ffca8d48 Mon Sep 17 00:00:00 2001 From: Yeganathan S Date: Mon, 17 Feb 2025 21:09:48 +0530 Subject: [PATCH 26/26] rename migration --- ...7__remove-tenantId-and-modify-indexes-on-indexed-entities.sql} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename backend/src/database/migrations/{V1738589807__rmTenantIdAndAddIndexInIndexedEntities.sql => V1738589807__remove-tenantId-and-modify-indexes-on-indexed-entities.sql} (100%) diff --git a/backend/src/database/migrations/V1738589807__rmTenantIdAndAddIndexInIndexedEntities.sql b/backend/src/database/migrations/V1738589807__remove-tenantId-and-modify-indexes-on-indexed-entities.sql similarity index 100% rename from backend/src/database/migrations/V1738589807__rmTenantIdAndAddIndexInIndexedEntities.sql rename to backend/src/database/migrations/V1738589807__remove-tenantId-and-modify-indexes-on-indexed-entities.sql