diff --git a/backend/src/database/migrations/V1738589807__remove-tenantId-and-modify-indexes-on-indexed-entities.sql b/backend/src/database/migrations/V1738589807__remove-tenantId-and-modify-indexes-on-indexed-entities.sql new file mode 100644 index 0000000000..ebfd72d4c9 --- /dev/null +++ b/backend/src/database/migrations/V1738589807__remove-tenantId-and-modify-indexes-on-indexed-entities.sql @@ -0,0 +1,6 @@ +create index if not exists ix_indexed_entities_type_entity_id on indexed_entities (type, entity_id); + +drop index if exists ix_indexed_entities_tenant; + +alter table indexed_entities + drop column tenant_id; \ No newline at end of file diff --git a/backend/src/services/searchSyncService.ts b/backend/src/services/searchSyncService.ts index 95827025fb..ceea52f3ab 100644 --- a/backend/src/services/searchSyncService.ts +++ b/backend/src/services/searchSyncService.ts @@ -63,16 +63,6 @@ export default class SearchSyncService extends LoggerBase { } } - async triggerTenantMembersSync(tenantId: string) { - const client = await this.getSearchSyncClient() - - if (client instanceof SearchSyncApiClient || client instanceof SearchSyncWorkerEmitter) { - await client.triggerTenantMembersSync(tenantId) - } else { - throw new Error('Unexpected search client type!') - } - } - async triggerOrganizationMembersSync(tenantId: string, organizationId: string) { const client = await this.getSearchSyncClient() @@ -126,16 +116,6 @@ export default class SearchSyncService extends LoggerBase { } } - async triggerTenantOrganizationSync(tenantId: string) { - const client = await this.getSearchSyncClient() - - if (client instanceof SearchSyncApiClient || client instanceof SearchSyncWorkerEmitter) { - await client.triggerTenantOrganizationSync(tenantId) - } else { - throw new Error('Unexpected search client type!') - } - } - async triggerRemoveOrganization(tenantId: string, organizationId: string) { const client = await this.getSearchSyncClient() diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index ee5dbacb9c..a925a2b0f5 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1050,12 +1050,18 @@ importers: '@crowd/archetype-worker': specifier: workspace:* version: link:../../archetypes/worker + '@crowd/common': + specifier: workspace:* + version: link:../../libs/common '@crowd/data-access-layer': specifier: workspace:* version: link:../../libs/data-access-layer '@crowd/logging': specifier: workspace:* version: link:../../libs/logging + '@crowd/opensearch': + specifier: workspace:* + version: link:../../libs/opensearch '@crowd/redis': specifier: workspace:* version: link:../../libs/redis @@ -1519,6 +1525,9 @@ importers: '@crowd/logging': specifier: workspace:* version: link:../logging + '@crowd/opensearch': + specifier: workspace:* + version: link:../opensearch '@crowd/questdb': specifier: workspace:* version: link:../questdb diff --git a/services/apps/script_executor_worker/package.json b/services/apps/script_executor_worker/package.json index 713abab5cb..92980da32a 100644 --- a/services/apps/script_executor_worker/package.json +++ b/services/apps/script_executor_worker/package.json @@ -14,8 +14,10 @@ "dependencies": { "@crowd/archetype-standard": "workspace:*", "@crowd/archetype-worker": "workspace:*", + "@crowd/common": "workspace:*", "@crowd/data-access-layer": "workspace:*", "@crowd/logging": "workspace:*", + "@crowd/opensearch": "workspace:*", "@crowd/redis": "workspace:*", "@crowd/types": "workspace:*", "@temporalio/workflow": "~1.11.1", diff --git a/services/apps/script_executor_worker/src/activities.ts b/services/apps/script_executor_worker/src/activities.ts index 0ec6f3703c..b40303312e 100644 --- a/services/apps/script_executor_worker/src/activities.ts +++ b/services/apps/script_executor_worker/src/activities.ts @@ -21,6 +21,12 @@ import { findMembersWithSamePlatformIdentitiesDifferentCapitalization, findMembersWithSameVerifiedEmailsInDifferentPlatforms, } from './activities/merge-members-with-similar-identities' +import { + deleteIndexedEntities, + getMembersForSync, + markEntitiesIndexed, + syncMembersBatch, +} from './activities/sync/member' export { findMembersWithSameVerifiedEmailsInDifferentPlatforms, @@ -38,4 +44,8 @@ export { updateOrganizationIdentity, deleteOrganizationIdentity, isLfxMember, + syncMembersBatch, + getMembersForSync, + deleteIndexedEntities, + markEntitiesIndexed, } diff --git a/services/apps/script_executor_worker/src/activities/sync/member.ts b/services/apps/script_executor_worker/src/activities/sync/member.ts new file mode 100644 index 0000000000..5a178c5822 --- /dev/null +++ b/services/apps/script_executor_worker/src/activities/sync/member.ts @@ -0,0 +1,62 @@ +import { sumBy } from '@crowd/common' +import { DbStore } from '@crowd/data-access-layer/src/database' +import { MemberSyncService } from '@crowd/opensearch' +import { IndexedEntityType } from '@crowd/opensearch/src/repo/indexing.data' +import { IndexingRepository } from '@crowd/opensearch/src/repo/indexing.repo' +import { MemberRepository } from '@crowd/opensearch/src/repo/member.repo' + +import { svc } from '../../main' + +export async function deleteIndexedEntities(entityType: IndexedEntityType): Promise { + const indexingRepo = new IndexingRepository(svc.postgres.writer, svc.log) + await indexingRepo.deleteIndexedEntities(entityType) +} + +export async function markEntitiesIndexed( + entityType: IndexedEntityType, + entityIds: string[], +): Promise { + const indexingRepo = new IndexingRepository(svc.postgres.writer, svc.log) + await indexingRepo.markEntitiesIndexed(entityType, entityIds) +} + +export async function getMembersForSync(batchSize: number): Promise { + const memberRepo = new MemberRepository(svc.redis, svc.postgres.reader, svc.log) + return memberRepo.getMembersForSync(batchSize) +} + +export async function syncMembersBatch( + memberIds: string[], + withAggs: boolean, + chunkSize?: number, +): Promise<{ docCount: number; memberCount: number }> { + try { + const service = new MemberSyncService( + svc.redis, + svc.postgres.writer, + new DbStore(svc.log, svc.questdbSQL), + svc.opensearch, + svc.log, + ) + + const CHUNK_SIZE = chunkSize || 10 + + svc.log.info(`Syncing members in chunks of ${CHUNK_SIZE} members!`) + + const results = [] + for (let i = 0; i < memberIds.length; i += CHUNK_SIZE) { + const chunk = memberIds.slice(i, i + CHUNK_SIZE) + const chunkResults = await Promise.all( + chunk.map((memberId) => service.syncMembers(memberId, { withAggs })), + ) + results.push(...chunkResults) + } + + return { + docCount: sumBy(results, (r) => r.documentsIndexed), + memberCount: sumBy(results, (r) => r.membersSynced), + } + } catch (err) { + throw new Error(err) + } +} diff --git a/services/apps/script_executor_worker/src/main.ts b/services/apps/script_executor_worker/src/main.ts index 8ae0530519..ae773f0360 100644 --- a/services/apps/script_executor_worker/src/main.ts +++ b/services/apps/script_executor_worker/src/main.ts @@ -10,7 +10,7 @@ const config: Config = { enabled: true, }, questdb: { - enabled: false, + enabled: true, }, redis: { enabled: true, @@ -22,7 +22,7 @@ const options: Options = { enabled: true, }, opensearch: { - enabled: false, + enabled: true, }, } diff --git a/services/apps/script_executor_worker/src/types.ts b/services/apps/script_executor_worker/src/types.ts index 5447633312..d36fb62221 100644 --- a/services/apps/script_executor_worker/src/types.ts +++ b/services/apps/script_executor_worker/src/types.ts @@ -26,3 +26,11 @@ export interface IFixOrgIdentitiesWithWrongUrlsArgs { tenantId: string testRun?: boolean } + +export interface ISyncMembersArgs { + batchSize?: number + chunkSize?: number + clean?: boolean + withAggs?: boolean + testRun?: boolean +} diff --git a/services/apps/script_executor_worker/src/workflows.ts b/services/apps/script_executor_worker/src/workflows.ts index 783ed8e00c..ad42ec6fe2 100644 --- a/services/apps/script_executor_worker/src/workflows.ts +++ b/services/apps/script_executor_worker/src/workflows.ts @@ -2,10 +2,12 @@ import { dissectMember } from './workflows/dissectMember' import { findAndMergeMembersWithSamePlatformIdentitiesDifferentCapitalization } from './workflows/findAndMergeMembersWithSamePlatformIdentitiesDifferentCapitalization' import { findAndMergeMembersWithSameVerifiedEmailsInDifferentPlatforms } from './workflows/findAndMergeMembersWithSameVerifiedEmailsInDifferentPlatforms' import { fixOrgIdentitiesWithWrongUrls } from './workflows/fixOrgIdentitiesWithWrongUrls' +import { syncMembers } from './workflows/syncMembers' export { findAndMergeMembersWithSameVerifiedEmailsInDifferentPlatforms, findAndMergeMembersWithSamePlatformIdentitiesDifferentCapitalization, dissectMember, fixOrgIdentitiesWithWrongUrls, + syncMembers, } diff --git a/services/apps/script_executor_worker/src/workflows/syncMembers.ts b/services/apps/script_executor_worker/src/workflows/syncMembers.ts new file mode 100644 index 0000000000..5c8a39b6fb --- /dev/null +++ b/services/apps/script_executor_worker/src/workflows/syncMembers.ts @@ -0,0 +1,47 @@ +import { continueAsNew, proxyActivities } from '@temporalio/workflow' + +import { IndexedEntityType } from '@crowd/opensearch/src/repo/indexing.data' + +import * as activities from '../activities/sync/member' +import { ISyncMembersArgs } from '../types' + +const activity = proxyActivities({ + startToCloseTimeout: '45 minutes', + retry: { maximumAttempts: 3, backoffCoefficient: 3 }, +}) + +export async function syncMembers(args: ISyncMembersArgs): Promise { + const BATCH_SIZE = args.batchSize || 100 + const WITH_AGGS = args.withAggs || true + + if (args.clean) { + await activity.deleteIndexedEntities(IndexedEntityType.MEMBER) + console.log('Deleted indexed entities for members!') + } + + const memberIds = await activity.getMembersForSync(BATCH_SIZE) + + if (memberIds.length === 0) { + console.log('No more members to sync!') + return + } + + const batchStartTime = new Date() + const { memberCount } = await activity.syncMembersBatch(memberIds, WITH_AGGS, args.chunkSize) + + const diffInSeconds = (new Date().getTime() - batchStartTime.getTime()) / 1000 + + console.log( + `Synced ${memberCount} members! Speed: ${Math.round(memberCount / diffInSeconds)} members/second!`, + ) + + await activity.markEntitiesIndexed(IndexedEntityType.MEMBER, memberIds) + + if (args.testRun) { + console.log('Test run completed - stopping after first batch!') + return + } + + // Continue as new for the next batch + await continueAsNew(args) +} diff --git a/services/apps/search_sync_api/src/routes/member.ts b/services/apps/search_sync_api/src/routes/member.ts index f96e2072e6..45a8fff5e9 100644 --- a/services/apps/search_sync_api/src/routes/member.ts +++ b/services/apps/search_sync_api/src/routes/member.ts @@ -27,23 +27,6 @@ router.post( }), ) -router.post( - '/sync/tenant/members', - asyncWrap(async (req: ApiRequest, res) => { - const memberSyncService = syncService(req) - - const { tenantId } = req.body - try { - req.log.trace(`Calling memberSyncService.syncTenantMembers for tenant ${tenantId}`) - await memberSyncService.syncTenantMembers(tenantId) - res.sendStatus(200) - } catch (error) { - req.log.error(error) - res.status(500).send(error.message) - } - }), -) - router.post( '/sync/organization/members', asyncWrap(async (req: ApiRequest, res) => { diff --git a/services/apps/search_sync_api/src/routes/organization.ts b/services/apps/search_sync_api/src/routes/organization.ts index 309e6fd7fd..5ce1acbed5 100644 --- a/services/apps/search_sync_api/src/routes/organization.ts +++ b/services/apps/search_sync_api/src/routes/organization.ts @@ -28,25 +28,6 @@ router.post( }), ) -router.post( - '/sync/tenant/organizations', - asyncWrap(async (req: ApiRequest, res) => { - const organizationSyncService = syncService(req) - - const { tenantId } = req.body - try { - req.log.trace( - `Calling organizationSyncService.syncTenantOrganizations for tenant ${tenantId}`, - ) - await organizationSyncService.syncTenantOrganizations(tenantId) - res.sendStatus(200) - } catch (error) { - req.log.error(error) - res.status(500).send(error.message) - } - }), -) - router.post( '/cleanup/tenant/organizations', asyncWrap(async (req: ApiRequest, res) => { diff --git a/services/apps/search_sync_worker/src/bin/sync-all-members.ts b/services/apps/search_sync_worker/src/bin/sync-all-members.ts index 3669243eec..a6a2d25baa 100644 --- a/services/apps/search_sync_worker/src/bin/sync-all-members.ts +++ b/services/apps/search_sync_worker/src/bin/sync-all-members.ts @@ -1,6 +1,4 @@ -import { timeout } from '@crowd/common' import { DbStore, getDbConnection } from '@crowd/data-access-layer/src/database' -import { MemberRepository } from '@crowd/data-access-layer/src/old/apps/search_sync_worker/member.repo' import { getServiceLogger } from '@crowd/logging' import { MemberSyncService, OpenSearchService, getOpensearchClient } from '@crowd/opensearch' import { IndexedEntityType } from '@crowd/opensearch/src/repo/indexing.data' @@ -14,8 +12,6 @@ const log = getServiceLogger() const processArguments = process.argv.slice(2) -const MAX_CONCURRENT = 3 - setImmediate(async () => { const osClient = await getOpensearchClient(OPENSEARCH_CONFIG()) const openSearchService = new OpenSearchService(log, osClient) @@ -35,42 +31,18 @@ setImmediate(async () => { withAggs = false } - const repo = new MemberRepository(store, log) - - const tenantIds = await repo.getTenantIds() const qdbConn = await getClientSQL() const qdbStore = new DbStore(log, qdbConn) const service = new MemberSyncService(redis, store, qdbStore, openSearchService, log) - let current = 0 - for (let i = 0; i < tenantIds.length; i++) { - const tenantId = tenantIds[i] - - while (current == MAX_CONCURRENT) { - await timeout(1000) - } - - log.info(`Processing tenant ${i + 1}/${tenantIds.length}`) - current += 1 - service - .syncTenantMembers(tenantId, 500, { withAggs }) - .then(() => { - current-- - log.info(`Processed tenant ${i + 1}/${tenantIds.length}`) - }) - .catch((err) => { - current-- - log.error( - err, - { tenantId }, - `Error syncing members for tenant ${i + 1}/${tenantIds.length}!`, - ) - }) - } + log.info('Starting sync of all members') - while (current > 0) { - await timeout(1000) + try { + await service.syncAllMembers(500, { withAggs }) + log.info('Successfully synced all members') + } catch (err) { + log.error(err, 'Error syncing members!') } process.exit(0) diff --git a/services/apps/search_sync_worker/src/bin/sync-all-organizations.ts b/services/apps/search_sync_worker/src/bin/sync-all-organizations.ts index de9ebc24f1..5257ab439c 100644 --- a/services/apps/search_sync_worker/src/bin/sync-all-organizations.ts +++ b/services/apps/search_sync_worker/src/bin/sync-all-organizations.ts @@ -1,6 +1,4 @@ -import { timeout } from '@crowd/common' import { DbStore, getDbConnection } from '@crowd/data-access-layer/src/database' -import { OrganizationRepository } from '@crowd/data-access-layer/src/old/apps/search_sync_worker/organization.repo' import { getServiceLogger } from '@crowd/logging' import { OpenSearchService, OrganizationSyncService, getOpensearchClient } from '@crowd/opensearch' import { IndexedEntityType } from '@crowd/opensearch/src/repo/indexing.data' @@ -13,8 +11,6 @@ const log = getServiceLogger() const processArguments = process.argv.slice(2) -const MAX_CONCURRENT = 3 - setImmediate(async () => { const osClient = await getOpensearchClient(OPENSEARCH_CONFIG()) const openSearchService = new OpenSearchService(log, osClient) @@ -52,9 +48,6 @@ setImmediate(async () => { } const readStore = new DbStore(log, readHost) - const repo = new OrganizationRepository(readStore, log) - - const tenantIds = await repo.getTenantIds() const service = new OrganizationSyncService( qdbStore, @@ -64,34 +57,13 @@ setImmediate(async () => { readStore, ) - let current = 0 - for (let i = 0; i < tenantIds.length; i++) { - const tenantId = tenantIds[i] - - while (current == MAX_CONCURRENT) { - await timeout(1000) - } - - log.info(`Processing tenant ${i + 1}/${tenantIds.length}`) - current += 1 - service - .syncTenantOrganizations(tenantId, 500, { withAggs }) - .then(() => { - current-- - log.info(`Processed tenant ${i + 1}/${tenantIds.length}`) - }) - .catch((err) => { - current-- - log.error( - err, - { tenantId }, - `Error syncing organizations for tenant ${i + 1}/${tenantIds.length}!`, - ) - }) - } + log.info('Starting sync of all organizations') - while (current > 0) { - await timeout(1000) + try { + await service.syncAllOrganizations(500, { withAggs }) + log.info('Successfully synced all organizations') + } catch (err) { + log.error(err, 'Error syncing organizations!') } process.exit(0) diff --git a/services/apps/search_sync_worker/src/bin/sync-tenant-members.ts b/services/apps/search_sync_worker/src/bin/sync-tenant-members.ts deleted file mode 100644 index 7463066dc4..0000000000 --- a/services/apps/search_sync_worker/src/bin/sync-tenant-members.ts +++ /dev/null @@ -1,36 +0,0 @@ -import { DbStore, getDbConnection } from '@crowd/data-access-layer/src/database' -import { getServiceLogger } from '@crowd/logging' -import { MemberSyncService, OpenSearchService, getOpensearchClient } from '@crowd/opensearch' -import { getClientSQL } from '@crowd/questdb' -import { getRedisClient } from '@crowd/redis' - -import { DB_CONFIG, OPENSEARCH_CONFIG, REDIS_CONFIG } from '../conf' - -const log = getServiceLogger() - -const processArguments = process.argv.slice(2) - -if (processArguments.length !== 1) { - log.error('Expected 1 argument: tenantId') - process.exit(1) -} - -const tenantId = processArguments[0] - -setImmediate(async () => { - const osClient = await getOpensearchClient(OPENSEARCH_CONFIG()) - const openSearchService = new OpenSearchService(log, osClient) - - const redis = await getRedisClient(REDIS_CONFIG(), true) - - const dbConnection = await getDbConnection(DB_CONFIG()) - const store = new DbStore(log, dbConnection) - const qdbConn = await getClientSQL() - const qdbStore = new DbStore(log, qdbConn) - - const service = new MemberSyncService(redis, store, qdbStore, openSearchService, log) - - await service.syncTenantMembers(tenantId) - - process.exit(0) -}) diff --git a/services/apps/search_sync_worker/src/bin/sync-tenant-organizations.ts b/services/apps/search_sync_worker/src/bin/sync-tenant-organizations.ts deleted file mode 100644 index 05f5337c60..0000000000 --- a/services/apps/search_sync_worker/src/bin/sync-tenant-organizations.ts +++ /dev/null @@ -1,33 +0,0 @@ -import { DbStore, getDbConnection } from '@crowd/data-access-layer/src/database' -import { getServiceLogger } from '@crowd/logging' -import { OpenSearchService, OrganizationSyncService, getOpensearchClient } from '@crowd/opensearch' -import { getClientSQL } from '@crowd/questdb' - -import { DB_CONFIG, OPENSEARCH_CONFIG } from '../conf' - -const log = getServiceLogger() - -const processArguments = process.argv.slice(2) - -if (processArguments.length !== 1) { - log.error('Expected 1 argument: tenantId') - process.exit(1) -} - -const tenantId = processArguments[0] - -setImmediate(async () => { - const osClient = await getOpensearchClient(OPENSEARCH_CONFIG()) - const openSearchService = new OpenSearchService(log, osClient) - - const dbConnection = await getDbConnection(DB_CONFIG()) - const store = new DbStore(log, dbConnection) - const qdbConn = await getClientSQL() - const qdbStore = new DbStore(log, qdbConn) - - const service = new OrganizationSyncService(qdbStore, store, openSearchService, log) - - await service.syncTenantOrganizations(tenantId) - - process.exit(0) -}) diff --git a/services/apps/search_sync_worker/src/queue/index.ts b/services/apps/search_sync_worker/src/queue/index.ts index 06855baf30..5b31183487 100644 --- a/services/apps/search_sync_worker/src/queue/index.ts +++ b/services/apps/search_sync_worker/src/queue/index.ts @@ -98,15 +98,6 @@ export class WorkerQueueReceiver extends PrioritizedQueueReciever { }) } - break - // this one taks a while so we can't relly on it to be finished in time and the queue message might pop up again so we immediatelly return - case SearchSyncWorkerQueueMessageType.SYNC_TENANT_MEMBERS: - if (data.tenantId) { - this.initMemberService() - .syncTenantMembers(data.tenantId) - .catch((err) => this.log.error(err, 'Error while syncing tenant members!')) - } - break case SearchSyncWorkerQueueMessageType.SYNC_ORGANIZATION_MEMBERS: if (data.organizationId) { @@ -138,13 +129,6 @@ export class WorkerQueueReceiver extends PrioritizedQueueReciever { await this.initOrganizationService().syncOrganizations([data.organizationId]) } break - case SearchSyncWorkerQueueMessageType.SYNC_TENANT_ORGANIZATIONS: - if (data.tenantId) { - this.initOrganizationService() - .syncTenantOrganizations(data.tenantId) - .catch((err) => this.log.error(err, 'Error while syncing tenant organizations!')) - } - break case SearchSyncWorkerQueueMessageType.CLEANUP_TENANT_ORGANIZATIONS: if (data.tenantId) { this.initOrganizationService() diff --git a/services/libs/common_services/src/services/emitters/searchSyncWorker.emitter.ts b/services/libs/common_services/src/services/emitters/searchSyncWorker.emitter.ts index 68d2364398..c15bc1eb5d 100644 --- a/services/libs/common_services/src/services/emitters/searchSyncWorker.emitter.ts +++ b/services/libs/common_services/src/services/emitters/searchSyncWorker.emitter.ts @@ -50,16 +50,6 @@ export class SearchSyncWorkerEmitter extends QueuePriorityService { ) } - public async triggerTenantMembersSync(tenantId: string) { - if (!tenantId) { - throw new Error('tenantId is required!') - } - await this.sendMessage(tenantId, tenantId, { - type: SearchSyncWorkerQueueMessageType.SYNC_TENANT_MEMBERS, - tenantId, - }) - } - public async triggerOrganizationMembersSync( tenantId: string, organizationId: string, @@ -146,21 +136,6 @@ export class SearchSyncWorkerEmitter extends QueuePriorityService { ) } - public async triggerTenantOrganizationSync(tenantId: string) { - if (!tenantId) { - throw new Error('tenantId is required!') - } - await this.sendMessage( - tenantId, - tenantId, - { - type: SearchSyncWorkerQueueMessageType.SYNC_TENANT_ORGANIZATIONS, - tenantId, - }, - tenantId, - ) - } - public async triggerRemoveOrganization( tenantId: string, organizationId: string, diff --git a/services/libs/data-access-layer/package.json b/services/libs/data-access-layer/package.json index 819be3150d..7bacfccabd 100644 --- a/services/libs/data-access-layer/package.json +++ b/services/libs/data-access-layer/package.json @@ -16,6 +16,7 @@ "@crowd/redis": "workspace:*", "@crowd/queue": "workspace:*", "@crowd/telemetry": "workspace:*", + "@crowd/opensearch": "workspace:*", "@crowd/types": "workspace:*", "@questdb/nodejs-client": "~3.0.0", "html-to-text": "~9.0.5", diff --git a/services/libs/data-access-layer/src/members/segments.ts b/services/libs/data-access-layer/src/members/segments.ts index 56157985a7..dca0c33981 100644 --- a/services/libs/data-access-layer/src/members/segments.ts +++ b/services/libs/data-access-layer/src/members/segments.ts @@ -44,6 +44,7 @@ export async function insertMemberSegments(qx: QueryExecutor, data: IMemberSegme 'averageSentiment', ], data, + 'DO NOTHING', ), ) } catch (e) { diff --git a/services/libs/data-access-layer/src/old/apps/script_executor_worker/member.repo.ts b/services/libs/data-access-layer/src/old/apps/script_executor_worker/member.repo.ts index 126c76219d..f5b0b9e972 100644 --- a/services/libs/data-access-layer/src/old/apps/script_executor_worker/member.repo.ts +++ b/services/libs/data-access-layer/src/old/apps/script_executor_worker/member.repo.ts @@ -1,5 +1,6 @@ import { DbConnection, DbTransaction } from '@crowd/database' import { Logger } from '@crowd/logging' +import { IndexedEntityType } from '@crowd/opensearch/src/repo/indexing.data' import { IMember } from '@crowd/types' import { IFindMemberIdentitiesGroupedByPlatformResult, ISimilarMember } from './types' @@ -159,6 +160,26 @@ class MemberRepository { return member } + + public async getMembersNotInSegmentAggs(perPage: number): Promise { + const results = await this.connection.any( + ` + select distinct a."memberId" + from activities a + left join "memberSegmentsAgg" msa + on a."memberId" = msa."memberId" + and a."segmentId" = msa."segmentId" + where msa."memberId" is null + and msa."segmentId" is null + limit ${perPage}; + `, + { + type: IndexedEntityType.MEMBER, + }, + ) + + return results.map((r) => r.id) + } } export default MemberRepository diff --git a/services/libs/opensearch/src/apiClient.ts b/services/libs/opensearch/src/apiClient.ts index e505018134..f850a956c0 100644 --- a/services/libs/opensearch/src/apiClient.ts +++ b/services/libs/opensearch/src/apiClient.ts @@ -24,16 +24,6 @@ export class SearchSyncApiClient { }) } - public async triggerTenantMembersSync(tenantId: string): Promise { - if (!tenantId) { - throw new Error('tenantId is required!') - } - - await this.searchSyncApi.post('/sync/tenant/members', { - tenantId, - }) - } - public async triggerOrganizationMembersSync( tenantId: string, organizationId: string, diff --git a/services/libs/opensearch/src/repo/indexing.data.ts b/services/libs/opensearch/src/repo/indexing.data.ts index 3504097fcb..ab18886843 100644 --- a/services/libs/opensearch/src/repo/indexing.data.ts +++ b/services/libs/opensearch/src/repo/indexing.data.ts @@ -1,8 +1,3 @@ -export interface IEntityData { - id: string - tenantId: string -} - export enum IndexedEntityType { ACTIVITY = 'activity', MEMBER = 'member', diff --git a/services/libs/opensearch/src/repo/indexing.repo.ts b/services/libs/opensearch/src/repo/indexing.repo.ts index 378f9bd659..f57570165b 100644 --- a/services/libs/opensearch/src/repo/indexing.repo.ts +++ b/services/libs/opensearch/src/repo/indexing.repo.ts @@ -1,7 +1,7 @@ import { DbStore, RepositoryBase } from '@crowd/database' import { Logger } from '@crowd/logging' -import { IEntityData, IndexedEntityType } from './indexing.data' +import { IndexedEntityType } from './indexing.data' export class IndexingRepository extends RepositoryBase { constructor(dbStore: DbStore, parentLog: Logger) { @@ -19,11 +19,11 @@ export class IndexingRepository extends RepositoryBase { ) } - public async markEntitiesIndexed(type: IndexedEntityType, data: IEntityData[]): Promise { + public async markEntitiesIndexed(type: IndexedEntityType, data: string[]): Promise { if (data.length > 0) { - const values = data.map((d) => `('${type}', '${d.id}', '${d.tenantId}')`) + const values = data.map((d) => `('${type}', '${d}')`) const query = ` - insert into indexed_entities(type, entity_id, tenant_id) + insert into indexed_entities(type, entity_id) values ${values.join(',\n')} on conflict (type, entity_id) do update set indexed_at = now() diff --git a/services/libs/opensearch/src/repo/member.repo.ts b/services/libs/opensearch/src/repo/member.repo.ts index 3109cc2f6d..548e0dab44 100644 --- a/services/libs/opensearch/src/repo/member.repo.ts +++ b/services/libs/opensearch/src/repo/member.repo.ts @@ -41,22 +41,24 @@ export class MemberRepository extends RepositoryBase { return results } - public async getTenantMembersForSync(tenantId: string, perPage: number): Promise { - // eslint-disable-next-line @typescript-eslint/no-explicit-any + public async getMembersForSync(perPage: number): Promise { const results = await this.db().any( ` - select m.id - from members m - left join indexed_entities ie on m.id = ie.entity_id and ie.type = $(type) - where m."tenantId" = $(tenantId) and - ie.entity_id is null - limit ${perPage};`, + SELECT m.id + FROM members m + WHERE NOT EXISTS ( + SELECT 1 + FROM indexed_entities ie + WHERE ie.entity_id = m.id + AND ie.type = $(type) + ) + ORDER BY m.id + LIMIT ${perPage}; + `, { - tenantId, type: IndexedEntityType.MEMBER, }, ) - return results.map((r) => r.id) } diff --git a/services/libs/opensearch/src/repo/organization.repo.ts b/services/libs/opensearch/src/repo/organization.repo.ts index 01742eb07d..2b75e3a33e 100644 --- a/services/libs/opensearch/src/repo/organization.repo.ts +++ b/services/libs/opensearch/src/repo/organization.repo.ts @@ -8,19 +8,17 @@ export class OrganizationRepository extends RepositoryBase { + public async checkOrganizationsExists(orgIds: string[]): Promise { const results = await this.db().any( ` select id from organizations where - "tenantId" = $(tenantId) and id in ($(orgIds:csv)) and "deletedAt" is null `, { - tenantId, orgIds, }, ) @@ -28,8 +26,7 @@ export class OrganizationRepository extends RepositoryBase r.id) } - public async getTenantOrganizationsForSync( - tenantId: string, + public async getAllOrganizationsForSync( perPage: number, previousBatchIds: string[], ): Promise { @@ -40,13 +37,11 @@ export class OrganizationRepository extends RepositoryBase { - this.log.debug({ tenantId }, 'Syncing all tenant members!') + this.log.debug('Syncing all members!') let docCount = 0 let memberCount = 0 const now = new Date() - let memberIds = await this.memberRepo.getTenantMembersForSync(tenantId, batchSize) + let memberIds = await this.memberRepo.getMembersForSync(batchSize) while (memberIds.length > 0) { for (const memberId of memberIds) { @@ -289,29 +288,17 @@ export class MemberSyncService { const diffInSeconds = (new Date().getTime() - now.getTime()) / 1000 this.log.info( - { tenantId }, `Synced ${memberCount} members! Speed: ${Math.round( memberCount / diffInSeconds, )} members/second!`, ) - await this.indexingRepo.markEntitiesIndexed( - IndexedEntityType.MEMBER, - memberIds.map((id) => { - return { - id, - tenantId, - } - }), - ) + await this.indexingRepo.markEntitiesIndexed(IndexedEntityType.MEMBER, memberIds) - memberIds = await this.memberRepo.getTenantMembersForSync(tenantId, batchSize) + memberIds = await this.memberRepo.getMembersForSync(batchSize) } - this.log.info( - { tenantId }, - `Synced total of ${memberCount} members with ${docCount} documents!`, - ) + this.log.info(`Synced total of ${memberCount} members with ${docCount} documents!`) } public async syncOrganizationMembers( @@ -452,6 +439,8 @@ export class MemberSyncService { } if (memberData.length > 0) { + // dedup memberData so no same member-segment duplicates + memberData = distinctBy(memberData, (m) => `${m.memberId}-${m.segmentId}`) try { await this.memberRepo.transactionally( async (txRepo) => { diff --git a/services/libs/opensearch/src/service/organization.sync.service.ts b/services/libs/opensearch/src/service/organization.sync.service.ts index a611b0cabe..d46c133ee4 100644 --- a/services/libs/opensearch/src/service/organization.sync.service.ts +++ b/services/libs/opensearch/src/service/organization.sync.service.ts @@ -154,7 +154,6 @@ export class OrganizationSyncService { while (results.length > 0) { // check every organization if they exists in the database and if not remove them from the index const dbIds = await this.readOrgRepo.checkOrganizationsExists( - tenantId, results.map((r) => r._source.uuid_organizationId), ) @@ -245,12 +244,11 @@ export class OrganizationSyncService { } } - public async syncTenantOrganizations( - tenantId: string, + public async syncAllOrganizations( batchSize = 200, opts: { withAggs?: boolean } = { withAggs: true }, ): Promise { - this.log.warn({ tenantId }, 'Syncing all tenant organizations!') + this.log.warn('Syncing all organizations!') let docCount = 0 let organizationCount = 0 let previousBatchIds: string[] = [] @@ -258,8 +256,7 @@ export class OrganizationSyncService { await logExecutionTime( async () => { - let organizationIds = await this.readOrgRepo.getTenantOrganizationsForSync( - tenantId, + let organizationIds = await this.readOrgRepo.getAllOrganizationsForSync( batchSize, previousBatchIds, ) @@ -275,7 +272,6 @@ export class OrganizationSyncService { const diffInMinutes = (new Date().getTime() - now.getTime()) / 1000 / 60 this.log.info( - { tenantId }, `Synced ${organizationCount} organizations! Speed: ${Math.round( organizationCount / diffInMinutes, )} organizations/minute!`, @@ -283,17 +279,11 @@ export class OrganizationSyncService { await this.indexingRepo.markEntitiesIndexed( IndexedEntityType.ORGANIZATION, - organizationIds.map((id) => { - return { - id, - tenantId, - } - }), + organizationIds, ) previousBatchIds = organizationIds - organizationIds = await this.readOrgRepo.getTenantOrganizationsForSync( - tenantId, + organizationIds = await this.readOrgRepo.getAllOrganizationsForSync( batchSize, previousBatchIds, ) @@ -303,10 +293,7 @@ export class OrganizationSyncService { 'sync-tenant-organizations', ) - this.log.info( - { tenantId }, - `Synced total of ${organizationCount} organizations with ${docCount} documents!`, - ) + this.log.info(`Synced total of ${organizationCount} organizations with ${docCount} documents!`) } public async syncOrganizations( diff --git a/services/libs/types/src/queue/search_sync_worker/index.ts b/services/libs/types/src/queue/search_sync_worker/index.ts index ac09918f2c..1f86131106 100644 --- a/services/libs/types/src/queue/search_sync_worker/index.ts +++ b/services/libs/types/src/queue/search_sync_worker/index.ts @@ -1,12 +1,10 @@ export enum SearchSyncWorkerQueueMessageType { SYNC_MEMBER = 'sync_member', - SYNC_TENANT_MEMBERS = 'sync_tenant_members', SYNC_ORGANIZATION_MEMBERS = 'sync_organization_members', REMOVE_MEMBER = 'remove_member', CLEANUP_TENANT_MEMBERS = 'cleanup_tenant_members', SYNC_ORGANIZATION = 'sync_organization', - SYNC_TENANT_ORGANIZATIONS = 'sync_tenant_organizations', REMOVE_ORGANIZATION = 'remove_organization', CLEANUP_TENANT_ORGANIZATIONS = 'cleanup_tenant_organizations', }