diff --git a/backend/src/database/migrations/U1709228609__indexed-entities.sql b/backend/src/database/migrations/U1709228609__indexed-entities.sql new file mode 100644 index 0000000000..21bca3ecc4 --- /dev/null +++ b/backend/src/database/migrations/U1709228609__indexed-entities.sql @@ -0,0 +1 @@ +drop table indexed_entities; \ No newline at end of file diff --git a/backend/src/database/migrations/V1709228609__indexed-entities.sql b/backend/src/database/migrations/V1709228609__indexed-entities.sql new file mode 100644 index 0000000000..1bd9867ee3 --- /dev/null +++ b/backend/src/database/migrations/V1709228609__indexed-entities.sql @@ -0,0 +1,11 @@ +create table if not exists indexed_entities ( + type varchar(255) not null, + entity_id uuid not null, + tenant_id uuid not null, + indexed_at timestamptz not null default now(), + + primary key (type, entity_id) +); + +create index if not exists ix_indexed_entities_tenant on indexed_entities (tenant_id); +create index if not exists ix_indexed_entities_type on indexed_entities (type); \ No newline at end of file diff --git a/services/apps/search_sync_worker/src/bin/sync-all-activities.ts b/services/apps/search_sync_worker/src/bin/sync-all-activities.ts index e87455d770..4b4976ae59 100644 --- a/services/apps/search_sync_worker/src/bin/sync-all-activities.ts +++ b/services/apps/search_sync_worker/src/bin/sync-all-activities.ts @@ -4,9 +4,13 @@ import { ActivityRepository } from '@crowd/data-access-layer/src/old/apps/search import { timeout } from '@crowd/common' import { DbStore, getDbConnection } from '@crowd/data-access-layer/src/database' import { getServiceLogger } from '@crowd/logging' +import { IndexingRepository } from '@crowd/opensearch/src/repo/indexing.repo' +import { IndexedEntityType } from '@crowd/opensearch/src/repo/indexing.data' const log = getServiceLogger() +const processArguments = process.argv.slice(2) + const MAX_CONCURRENT = 3 setImmediate(async () => { @@ -15,6 +19,11 @@ setImmediate(async () => { const dbConnection = await getDbConnection(DB_CONFIG()) const store = new DbStore(log, dbConnection) + if (processArguments.includes('--clean')) { + const indexingRepo = new IndexingRepository(store, log) + await indexingRepo.deleteIndexedEntities(IndexedEntityType.ACTIVITY) + } + const repo = new ActivityRepository(store, log) const tenantIds = await repo.getTenantIds() diff --git a/services/apps/search_sync_worker/src/bin/sync-all-members.ts b/services/apps/search_sync_worker/src/bin/sync-all-members.ts index 0bbba3893b..c990b3cba4 100644 --- a/services/apps/search_sync_worker/src/bin/sync-all-members.ts +++ b/services/apps/search_sync_worker/src/bin/sync-all-members.ts @@ -5,9 +5,13 @@ import { DbStore, getDbConnection } from '@crowd/data-access-layer/src/database' import { getServiceLogger } from '@crowd/logging' import { getRedisClient } from '@crowd/redis' import { timeout } from '@crowd/common' +import { IndexingRepository } from '@crowd/opensearch/src/repo/indexing.repo' +import { IndexedEntityType } from '@crowd/opensearch/src/repo/indexing.data' const log = getServiceLogger() +const processArguments = process.argv.slice(2) + const MAX_CONCURRENT = 3 setImmediate(async () => { @@ -18,6 +22,11 @@ setImmediate(async () => { const dbConnection = await getDbConnection(DB_CONFIG()) const store = new DbStore(log, dbConnection) + if (processArguments.includes('--clean')) { + const indexingRepo = new IndexingRepository(store, log) + await indexingRepo.deleteIndexedEntities(IndexedEntityType.MEMBER) + } + const repo = new MemberRepository(redis, store, log) const tenantIds = await repo.getTenantIds() diff --git a/services/apps/search_sync_worker/src/bin/sync-all-organizations.ts b/services/apps/search_sync_worker/src/bin/sync-all-organizations.ts index bb031a35be..0ef55f5279 100644 --- a/services/apps/search_sync_worker/src/bin/sync-all-organizations.ts +++ b/services/apps/search_sync_worker/src/bin/sync-all-organizations.ts @@ -4,9 +4,13 @@ import { DbStore, getDbConnection } from '@crowd/data-access-layer/src/database' import { getServiceLogger } from '@crowd/logging' import { OrganizationRepository } from '@crowd/data-access-layer/src/old/apps/search_sync_worker/organization.repo' import { timeout } from '@crowd/common' +import { IndexingRepository } from '@crowd/opensearch/src/repo/indexing.repo' +import { IndexedEntityType } from '@crowd/opensearch/src/repo/indexing.data' const log = getServiceLogger() +const processArguments = process.argv.slice(2) + const MAX_CONCURRENT = 3 setImmediate(async () => { @@ -15,6 +19,11 @@ setImmediate(async () => { const dbConnection = await getDbConnection(DB_CONFIG()) const store = new DbStore(log, dbConnection) + if (processArguments.includes('--clean')) { + const indexingRepo = new IndexingRepository(store, log) + await indexingRepo.deleteIndexedEntities(IndexedEntityType.ORGANIZATION) + } + const repo = new OrganizationRepository(store, log) const tenantIds = await repo.getTenantIds() diff --git a/services/libs/opensearch/src/repo/activity.repo.ts b/services/libs/opensearch/src/repo/activity.repo.ts index 9b1105751b..25d48210e5 100644 --- a/services/libs/opensearch/src/repo/activity.repo.ts +++ b/services/libs/opensearch/src/repo/activity.repo.ts @@ -1,6 +1,7 @@ import { DbStore, RepositoryBase } from '@crowd/database' import { Logger } from '@crowd/logging' import { IDbActivitySyncData } from './activity.data' +import { IndexedEntityType } from './indexing.data' export class ActivityRepository extends RepositoryBase { constructor(dbStore: DbStore, parentLog: Logger) { @@ -57,40 +58,20 @@ export class ActivityRepository extends RepositoryBase { return results.map((r) => r.id) } - public async getTenantActivitiesForSync( - tenantId: string, - perPage: number, - lastId?: string, - ): Promise { - // eslint-disable-next-line @typescript-eslint/no-explicit-any - let results: any[] - - if (lastId) { - results = await this.db().any( - ` - select id from activities - where "tenantId" = $(tenantId) and "deletedAt" is null and id > $(lastId) - order by id - limit ${perPage}; - `, - { - tenantId, - lastId, - }, - ) - } else { - results = await this.db().any( - ` - select id from activities - where "tenantId" = $(tenantId) and "deletedAt" is null - order by id - limit ${perPage}; + public async getTenantActivitiesForSync(tenantId: string, perPage: number): Promise { + const results = await this.db().any( + ` + select id from activities a + left join indexed_entities ie on + a.id = ie.entity_id and ie.type = $(type) + where a."tenantId" = $(tenantId) and a."deletedAt" is null and ie.entity_id is null + limit ${perPage} `, - { - tenantId, - }, - ) - } + { + tenantId, + type: IndexedEntityType.ACTIVITY, + }, + ) return results.map((r) => r.id) } diff --git a/services/libs/opensearch/src/repo/indexing.data.ts b/services/libs/opensearch/src/repo/indexing.data.ts new file mode 100644 index 0000000000..3504097fcb --- /dev/null +++ b/services/libs/opensearch/src/repo/indexing.data.ts @@ -0,0 +1,10 @@ +export interface IEntityData { + id: string + tenantId: string +} + +export enum IndexedEntityType { + ACTIVITY = 'activity', + MEMBER = 'member', + ORGANIZATION = 'organization', +} diff --git a/services/libs/opensearch/src/repo/indexing.repo.ts b/services/libs/opensearch/src/repo/indexing.repo.ts new file mode 100644 index 0000000000..0e38b12aab --- /dev/null +++ b/services/libs/opensearch/src/repo/indexing.repo.ts @@ -0,0 +1,33 @@ +import { DbStore, RepositoryBase } from '@crowd/database' +import { Logger } from '@crowd/logging' +import { IEntityData, IndexedEntityType } from './indexing.data' + +export class IndexingRepository extends RepositoryBase { + constructor(dbStore: DbStore, parentLog: Logger) { + super(dbStore, parentLog) + } + + public async deleteIndexedEntities(type: IndexedEntityType): Promise { + await this.db().none( + ` + delete from indexed_entities where type = $(type) + `, + { + type, + }, + ) + } + + public async markEntitiesIndexed(type: IndexedEntityType, data: IEntityData[]): Promise { + if (data.length > 0) { + const values = data.map((d) => `('${type}', '${d.id}', '${d.tenantId}')`) + const query = ` + insert into indexed_entities(type, entity_id, tenant_id) + values ${values.join(',\n')} + on conflict (type, entity_id) + do update set indexed_at = now() + ` + await this.db().none(query) + } + } +} diff --git a/services/libs/opensearch/src/repo/member.repo.ts b/services/libs/opensearch/src/repo/member.repo.ts index ce1f634144..2df91ad69f 100644 --- a/services/libs/opensearch/src/repo/member.repo.ts +++ b/services/libs/opensearch/src/repo/member.repo.ts @@ -3,6 +3,7 @@ import { Logger } from '@crowd/logging' import { RedisCache, RedisClient } from '@crowd/redis' import { IMemberAttribute } from '@crowd/types' import { IDbMemberSyncData, IMemberSegment, IMemberSegmentMatrix } from './member.data' +import { IndexedEntityType } from './indexing.data' export class MemberRepository extends RepositoryBase { private readonly cache: RedisCache @@ -38,53 +39,21 @@ export class MemberRepository extends RepositoryBase { return results } - public async getTenantMembersForSync( - tenantId: string, - perPage: number, - lastId?: string, - ): Promise { + public async getTenantMembersForSync(tenantId: string, perPage: number): Promise { // eslint-disable-next-line @typescript-eslint/no-explicit-any - let results: any[] - - if (lastId) { - results = await this.db().any( - ` - select m.id - from members m - where m."tenantId" = $(tenantId) and - m."deletedAt" is null and - m.id > $(lastId) and - ( - exists (select 1 from activities where "memberId" = m.id) or - m."manuallyCreated" - ) and - exists (select 1 from "memberIdentities" where "memberId" = m.id) - order by m.id - limit ${perPage};`, - { - tenantId, - lastId, - }, - ) - } else { - results = await this.db().any( - ` - select m.id - from members m - where m."tenantId" = $(tenantId) and - m."deletedAt" is null and - ( - exists (select 1 from activities where "memberId" = m.id) or - m."manuallyCreated" - ) and - exists (select 1 from "memberIdentities" where "memberId" = m.id) - order by m.id - limit ${perPage};`, - { - tenantId, - }, - ) - } + const results = await this.db().any( + ` + select m.id + from members m + left join indexed_entities ie on m.id = ie.entity_id and ie.type = $(type) + where m."tenantId" = $(tenantId) and + ie.entity_id is null + limit ${perPage};`, + { + tenantId, + type: IndexedEntityType.MEMBER, + }, + ) return results.map((r) => r.id) } @@ -365,187 +334,187 @@ export class MemberRepository extends RepositoryBase { ): Promise { const results = await this.db().oneOrNone( ` - with to_merge_data as ( - select mtm."memberId", - array_agg(distinct mtm."toMergeId"::text) as to_merge_ids - from "memberToMerge" mtm - inner join members m2 on mtm."toMergeId" = m2.id - where mtm."memberId" = $(memberId) - and m2."deletedAt" is null - group by mtm."memberId"), - no_merge_data as ( - select mnm."memberId", - array_agg(distinct mnm."noMergeId"::text) as no_merge_ids - from "memberNoMerge" mnm - inner join members m2 on mnm."noMergeId" = m2.id - where mnm."memberId" = $(memberId) - and m2."deletedAt" is null - group by mnm."memberId"), - member_tags as ( - select mt."memberId", - json_agg( - json_build_object( - 'id', t.id, - 'name', t.name + with to_merge_data as ( + select mtm."memberId", + array_agg(distinct mtm."toMergeId"::text) as to_merge_ids + from "memberToMerge" mtm + inner join members m2 on mtm."toMergeId" = m2.id + where mtm."memberId" = $(memberId) + and m2."deletedAt" is null + group by mtm."memberId"), + no_merge_data as ( + select mnm."memberId", + array_agg(distinct mnm."noMergeId"::text) as no_merge_ids + from "memberNoMerge" mnm + inner join members m2 on mnm."noMergeId" = m2.id + where mnm."memberId" = $(memberId) + and m2."deletedAt" is null + group by mnm."memberId"), + member_tags as ( + select mt."memberId", + json_agg( + json_build_object( + 'id', t.id, + 'name', t.name + ) + ) as all_tags, + jsonb_agg(t.id) as all_ids + from "memberTags" mt + inner join tags t on mt."tagId" = t.id + where mt."memberId" = $(memberId) + and t."deletedAt" is null + group by mt."memberId"), + member_organizations as ( + select mo."memberId", + json_agg( + json_build_object( + 'id', o.id, + 'logo', o.logo, + 'website', o.website, + 'displayName', o."displayName", + 'memberOrganizations', json_build_object( + 'dateStart', mo."dateStart", + 'dateEnd', mo."dateEnd", + 'source', mo."source", + 'title', mo.title ) - ) as all_tags, - jsonb_agg(t.id) as all_ids - from "memberTags" mt - inner join tags t on mt."tagId" = t.id - where mt."memberId" = $(memberId) - and t."deletedAt" is null - group by mt."memberId"), - member_organizations as ( - select mo."memberId", - json_agg( - json_build_object( - 'id', o.id, - 'logo', o.logo, - 'website', o.website, - 'displayName', o."displayName", - 'memberOrganizations', json_build_object( - 'dateStart', mo."dateStart", - 'dateEnd', mo."dateEnd", - 'source', mo."source", - 'title', mo.title - ) - ) - ) as all_organizations, - jsonb_agg(o.id) as all_ids - from "memberOrganizations" mo - inner join organizations o on mo."organizationId" = o.id - where mo."memberId" = $(memberId) - and mo."deletedAt" is null - and o."deletedAt" is null - group by mo."memberId"), - identities as ( - select mi."memberId", + ) + ) as all_organizations, + jsonb_agg(o.id) as all_ids + from "memberOrganizations" mo + inner join organizations o on mo."organizationId" = o.id + where mo."memberId" = $(memberId) + and mo."deletedAt" is null + and o."deletedAt" is null + group by mo."memberId"), + identities as ( + select mi."memberId", + json_agg( + json_build_object( + 'platform', mi.platform, + 'username', mi.username + ) + ) as identities + from "memberIdentities" mi + where mi."memberId" = $(memberId) + group by mi."memberId"), + activity_data as ( + select a."memberId", + count(a.id) as "activityCount", + max(a.timestamp) as "lastActive", + array_agg(distinct concat(a.platform, ':', a.type)) + filter (where a.platform is not null) as "activityTypes", + array_agg(distinct a.platform) filter (where a.platform is not null) as "activeOn", + count(distinct a."timestamp"::date) as "activeDaysCount", + round(avg( + case + when (a.sentiment ->> 'sentiment'::text) is not null + then (a.sentiment ->> 'sentiment'::text)::double precision + else null::double precision + end)::numeric, 2) as "averageSentiment" + from activities a + where a."memberId" = $(memberId) + and a."segmentId" = $(segmentId) + group by a."memberId"), + member_affiliations as ( + select msa."memberId", json_agg( - json_build_object( - 'platform', mi.platform, - 'username', mi.username - ) - ) as identities - from "memberIdentities" mi - where mi."memberId" = $(memberId) - group by mi."memberId"), - activity_data as ( - select a."memberId", - count(a.id) as "activityCount", - max(a.timestamp) as "lastActive", - array_agg(distinct concat(a.platform, ':', a.type)) - filter (where a.platform is not null) as "activityTypes", - array_agg(distinct a.platform) filter (where a.platform is not null) as "activeOn", - count(distinct a."timestamp"::date) as "activeDaysCount", - round(avg( - case - when (a.sentiment ->> 'sentiment'::text) is not null - then (a.sentiment ->> 'sentiment'::text)::double precision - else null::double precision - end)::numeric, 2) as "averageSentiment" - from activities a - where a."memberId" = $(memberId) - and a."segmentId" = $(segmentId) - group by a."memberId"), - member_affiliations as ( - select msa."memberId", - json_agg( - json_build_object( - 'id', msa.id, - 'segmentId', s.id, - 'segmentSlug', s.slug, - 'segmentName', s.name, - 'segmentParentName', s."parentName", - 'organizationId', o.id, - 'organizationName', o."displayName", - 'organizationLogo', o.logo, - 'dateStart', msa."dateStart", - 'dateEnd', msa."dateEnd" - ) - ) as all_affiliations, - jsonb_agg(msa.id) as all_ids - from "memberSegmentAffiliations" msa - left join organizations o on o.id = msa."organizationId" - inner join segments s on s.id = msa."segmentId" - where msa."memberId" = $(memberId) - group by msa."memberId"), - member_notes as ( - select mn."memberId", - json_agg( - json_build_object( - 'id', n.id, - 'body', n.body - ) - ) as all_notes, - jsonb_agg(n.id) as all_ids - from "memberNotes" mn - inner join notes n on mn."noteId" = n.id - where mn."memberId" = $(memberId) - and n."deletedAt" is null - group by mn."memberId"), - member_tasks as ( - select mtk."memberId", - json_agg( json_build_object( - 'id', tk.id, - 'name', tk.name, - 'body', tk.body, - 'status', tk.status, - 'dueDate', tk."dueDate", - 'type', tk.type + 'id', msa.id, + 'segmentId', s.id, + 'segmentSlug', s.slug, + 'segmentName', s.name, + 'segmentParentName', s."parentName", + 'organizationId', o.id, + 'organizationName', o."displayName", + 'organizationLogo', o.logo, + 'dateStart', msa."dateStart", + 'dateEnd', msa."dateEnd" ) - ) as all_tasks, - jsonb_agg(tk.id) as all_ids - from "memberTasks" mtk - inner join tasks tk on mtk."taskId" = tk.id - where mtk."memberId" = $(memberId) - and tk."deletedAt" is null - group by mtk."memberId") - select - m.id, - m."tenantId", - $(segmentId) as "segmentId", - m."displayName", - m.attributes, - coalesce(m.contributions, '[]'::jsonb) as contributions, - m.emails, - m.score, - m."lastEnriched", - m."joinedAt", - m."manuallyCreated", - m."createdAt", - m.reach, - coalesce(jsonb_array_length(m.contributions), 0) as "numberOfOpenSourceContributions", - - ad."activeOn", - ad."activityCount"::integer, - ad."activityTypes", - ad."activeDaysCount"::integer, - ad."lastActive", - ad."averageSentiment", - - coalesce(i.identities, '[]'::json) as "identities", - coalesce(m."weakIdentities", '[]'::jsonb) as "weakIdentities", - coalesce(mo.all_organizations, json_build_array()) as organizations, - coalesce(mt.all_tags, json_build_array()) as tags, - coalesce(ma.all_affiliations, json_build_array()) as affiliations, - coalesce(mn.all_notes, json_build_array()) as notes, - coalesce(mtk.all_tasks, json_build_array()) as tasks, - coalesce(tmd.to_merge_ids, array []::text[]) as "toMergeIds", - coalesce(nmd.no_merge_ids, array []::text[]) as "noMergeIds" - from members m - inner join identities i on m.id = i."memberId" - left join activity_data ad on m.id = ad."memberId" - left join to_merge_data tmd on m.id = tmd."memberId" - left join no_merge_data nmd on m.id = nmd."memberId" - left join member_tags mt on m.id = mt."memberId" - left join member_affiliations ma on m.id = ma."memberId" - left join member_notes mn on m.id = mn."memberId" - left join member_tasks mtk on m.id = mtk."memberId" - left join member_organizations mo on m.id = mo."memberId" - where m.id = $(memberId) - and m."deletedAt" is null - and (ad."memberId" is not null or m."manuallyCreated");`, + ) as all_affiliations, + jsonb_agg(msa.id) as all_ids + from "memberSegmentAffiliations" msa + left join organizations o on o.id = msa."organizationId" + inner join segments s on s.id = msa."segmentId" + where msa."memberId" = $(memberId) + group by msa."memberId"), + member_notes as ( + select mn."memberId", + json_agg( + json_build_object( + 'id', n.id, + 'body', n.body + ) + ) as all_notes, + jsonb_agg(n.id) as all_ids + from "memberNotes" mn + inner join notes n on mn."noteId" = n.id + where mn."memberId" = $(memberId) + and n."deletedAt" is null + group by mn."memberId"), + member_tasks as ( + select mtk."memberId", + json_agg( + json_build_object( + 'id', tk.id, + 'name', tk.name, + 'body', tk.body, + 'status', tk.status, + 'dueDate', tk."dueDate", + 'type', tk.type + ) + ) as all_tasks, + jsonb_agg(tk.id) as all_ids + from "memberTasks" mtk + inner join tasks tk on mtk."taskId" = tk.id + where mtk."memberId" = $(memberId) + and tk."deletedAt" is null + group by mtk."memberId") + select + m.id, + m."tenantId", + $(segmentId) as "segmentId", + m."displayName", + m.attributes, + coalesce(m.contributions, '[]'::jsonb) as contributions, + m.emails, + m.score, + m."lastEnriched", + m."joinedAt", + m."manuallyCreated", + m."createdAt", + m.reach, + coalesce(jsonb_array_length(m.contributions), 0) as "numberOfOpenSourceContributions", + + coalesce(ad."activeOn", array []::text[]) as "activeOn", + coalesce(ad."activityCount", 0)::integer as "activityCount", + coalesce(ad."activityTypes", array []::text[]) as "activityTypes", + coalesce(ad."activeDaysCount", 0)::integer as "activeDaysCount", + ad."lastActive", + ad."averageSentiment", + + coalesce(i.identities, '[]'::json) as "identities", + coalesce(m."weakIdentities", '[]'::jsonb) as "weakIdentities", + coalesce(mo.all_organizations, json_build_array()) as organizations, + coalesce(mt.all_tags, json_build_array()) as tags, + coalesce(ma.all_affiliations, json_build_array()) as affiliations, + coalesce(mn.all_notes, json_build_array()) as notes, + coalesce(mtk.all_tasks, json_build_array()) as tasks, + coalesce(tmd.to_merge_ids, array []::text[]) as "toMergeIds", + coalesce(nmd.no_merge_ids, array []::text[]) as "noMergeIds" + from members m + inner join identities i on m.id = i."memberId" + left join activity_data ad on m.id = ad."memberId" + left join to_merge_data tmd on m.id = tmd."memberId" + left join no_merge_data nmd on m.id = nmd."memberId" + left join member_tags mt on m.id = mt."memberId" + left join member_affiliations ma on m.id = ma."memberId" + left join member_notes mn on m.id = mn."memberId" + left join member_tasks mtk on m.id = mtk."memberId" + left join member_organizations mo on m.id = mo."memberId" + where m.id = $(memberId) + and m."deletedAt" is null + and (ad."memberId" is not null or m."manuallyCreated");`, { memberId, segmentId, diff --git a/services/libs/opensearch/src/repo/organization.repo.ts b/services/libs/opensearch/src/repo/organization.repo.ts index 524538522f..d90ed780b2 100644 --- a/services/libs/opensearch/src/repo/organization.repo.ts +++ b/services/libs/opensearch/src/repo/organization.repo.ts @@ -1,6 +1,7 @@ import { DbStore, RepositoryBase } from '@crowd/database' import { Logger } from '@crowd/logging' import { IDbOrganizationSyncData, IOrganizationSegmentMatrix } from './organization.data' +import { IndexedEntityType } from './indexing.data' export class OrganizationRepository extends RepositoryBase { constructor(dbStore: DbStore, parentLog: Logger) { @@ -265,26 +266,19 @@ export class OrganizationRepository extends RepositoryBase r.id) } - public async getTenantOrganizationsForSync( - tenantId: string, - page: number, - perPage: number, - cutoffDate: string, - ): Promise { + public async getTenantOrganizationsForSync(tenantId: string, perPage: number): Promise { const results = await this.db().any( ` - select o.id - from organizations o - where o."tenantId" = $(tenantId) and - o."deletedAt" is null and - ( - o."searchSyncedAt" is null or - o."searchSyncedAt" < $(cutoffDate) - ) - limit ${perPage} offset ${(page - 1) * perPage};`, + select o.id + from organizations o + left join indexed_entities ie on o.id = ie.entity_id and ie.type = $(type) + where o."tenantId" = $(tenantId) and + o."deletedAt" is null and + ie.entity_id is null + limit ${perPage}`, { tenantId, - cutoffDate, + type: IndexedEntityType.ORGANIZATION, }, ) diff --git a/services/libs/opensearch/src/service/activity.sync.service.ts b/services/libs/opensearch/src/service/activity.sync.service.ts index 255a043de6..6690c1e826 100644 --- a/services/libs/opensearch/src/service/activity.sync.service.ts +++ b/services/libs/opensearch/src/service/activity.sync.service.ts @@ -6,11 +6,14 @@ import { DbStore } from '@crowd/database' import { Logger, getChildLogger, logExecutionTime } from '@crowd/logging' import { IPagedSearchResponse, ISearchHit } from './opensearch.data' import { OpenSearchService } from './opensearch.service' +import { IndexingRepository } from '../repo/indexing.repo' +import { IndexedEntityType } from '../repo/indexing.data' export class ActivitySyncService { private static MAX_BYTE_LENGTH = 25000 private log: Logger private readonly activityRepo: ActivityRepository + private readonly indexingRepo: IndexingRepository constructor( store: DbStore, @@ -20,6 +23,7 @@ export class ActivitySyncService { this.log = getChildLogger('activity-sync-service', parentLog) this.activityRepo = new ActivityRepository(store, this.log) + this.indexingRepo = new IndexingRepository(store, this.log) } public async getAllIndexedTenantIds( @@ -138,7 +142,6 @@ export class ActivitySyncService { this.log.debug({ tenantId }, 'Syncing all tenant activities!') let count = 0 const now = new Date() - const cutoffDate = now.toISOString() await logExecutionTime( async () => { @@ -155,37 +158,17 @@ export class ActivitySyncService { )} activities/second!`, ) - activityIds = await this.activityRepo.getTenantActivitiesForSync( - tenantId, - batchSize, - activityIds[activityIds.length - 1], + await this.indexingRepo.markEntitiesIndexed( + IndexedEntityType.ACTIVITY, + activityIds.map((id) => { + return { + id, + tenantId, + } + }), ) - } - - activityIds = await this.activityRepo.getRemainingTenantActivitiesForSync( - tenantId, - 1, - batchSize, - cutoffDate, - ) - while (activityIds.length > 0) { - count += await this.syncActivities(activityIds) - - const diffInSeconds = (new Date().getTime() - now.getTime()) / 1000 - this.log.info( - { tenantId }, - `Synced ${count} activities! Speed: ${Math.round( - count / diffInSeconds, - )} activities/second!`, - ) - - activityIds = await this.activityRepo.getRemainingTenantActivitiesForSync( - tenantId, - 1, - batchSize, - cutoffDate, - ) + activityIds = await this.activityRepo.getTenantActivitiesForSync(tenantId, batchSize) } }, this.log, diff --git a/services/libs/opensearch/src/service/member.sync.service.ts b/services/libs/opensearch/src/service/member.sync.service.ts index 972ee9d52e..004af4339d 100644 --- a/services/libs/opensearch/src/service/member.sync.service.ts +++ b/services/libs/opensearch/src/service/member.sync.service.ts @@ -5,12 +5,14 @@ import { SegmentRepository } from '../repo/segment.repo' import { OpenSearchIndex } from '../types' import { distinct, distinctBy, groupBy, trimUtf8ToMaxByteLength } from '@crowd/common' import { DbStore } from '@crowd/database' -import { Logger, getChildLogger, logExecutionTime } from '@crowd/logging' +import { Logger, getChildLogger } from '@crowd/logging' import { RedisClient } from '@crowd/redis' import { Edition, IMemberAttribute, IServiceConfig, MemberAttributeType } from '@crowd/types' import { IMemberSyncResult } from './member.sync.data' import { IIndexRequest, IPagedSearchResponse, ISearchHit } from './opensearch.data' import { OpenSearchService } from './opensearch.service' +import { IndexingRepository } from '../repo/indexing.repo' +import { IndexedEntityType } from '../repo/indexing.data' export class MemberSyncService { private static MAX_BYTE_LENGTH = 25000 @@ -18,6 +20,7 @@ export class MemberSyncService { private readonly memberRepo: MemberRepository private readonly segmentRepo: SegmentRepository private readonly serviceConfig: IServiceConfig + private readonly indexingRepo: IndexingRepository constructor( redisClient: RedisClient, @@ -31,6 +34,7 @@ export class MemberSyncService { this.memberRepo = new MemberRepository(redisClient, store, this.log) this.segmentRepo = new SegmentRepository(store, this.log) + this.indexingRepo = new IndexingRepository(store, this.log) } public async getAllIndexedTenantIds( @@ -203,64 +207,35 @@ export class MemberSyncService { let memberCount = 0 const now = new Date() - const cutoffDate = now.toISOString() - await logExecutionTime( - async () => { - let memberIds = await this.memberRepo.getTenantMembersForSync(tenantId, batchSize) + let memberIds = await this.memberRepo.getTenantMembersForSync(tenantId, batchSize) - while (memberIds.length > 0) { - const { membersSynced, documentsIndexed } = await this.syncMembers(memberIds) + while (memberIds.length > 0) { + const { membersSynced, documentsIndexed } = await this.syncMembers(memberIds) - docCount += documentsIndexed - memberCount += membersSynced + docCount += documentsIndexed + memberCount += membersSynced - const diffInSeconds = (new Date().getTime() - now.getTime()) / 1000 - this.log.info( - { tenantId }, - `Synced ${memberCount} members! Speed: ${Math.round( - memberCount / diffInSeconds, - )} members/second!`, - ) - memberIds = await this.memberRepo.getTenantMembersForSync( - tenantId, - batchSize, - memberIds[memberIds.length - 1], - ) - } - - memberIds = await this.memberRepo.getRemainingTenantMembersForSync( - tenantId, - 1, - batchSize, - cutoffDate, - ) - - while (memberIds.length > 0) { - const { membersSynced, documentsIndexed } = await this.syncMembers(memberIds) - - memberCount += membersSynced - docCount += documentsIndexed - - const diffInSeconds = (new Date().getTime() - now.getTime()) / 1000 - this.log.info( - { tenantId }, - `Synced ${memberCount} members! Speed: ${Math.round( - memberCount / diffInSeconds, - )} members/second!`, - ) + const diffInSeconds = (new Date().getTime() - now.getTime()) / 1000 + this.log.info( + { tenantId }, + `Synced ${memberCount} members! Speed: ${Math.round( + memberCount / diffInSeconds, + )} members/second!`, + ) - memberIds = await this.memberRepo.getRemainingTenantMembersForSync( + await this.indexingRepo.markEntitiesIndexed( + IndexedEntityType.MEMBER, + memberIds.map((id) => { + return { + id, tenantId, - 1, - batchSize, - cutoffDate, - ) - } - }, - this.log, - 'sync-tenant-members', - ) + } + }), + ) + + memberIds = await this.memberRepo.getTenantMembersForSync(tenantId, batchSize) + } this.log.info( { tenantId }, @@ -275,36 +250,27 @@ export class MemberSyncService { const now = new Date() - await logExecutionTime( - async () => { - let memberIds = await this.memberRepo.getOrganizationMembersForSync( - organizationId, - batchSize, - ) - - while (memberIds.length > 0) { - const { membersSynced, documentsIndexed } = await this.syncMembers(memberIds) - - docCount += documentsIndexed - memberCount += membersSynced - - const diffInSeconds = (new Date().getTime() - now.getTime()) / 1000 - this.log.info( - { organizationId }, - `Synced ${memberCount} members! Speed: ${Math.round( - memberCount / diffInSeconds, - )} members/second!`, - ) - memberIds = await this.memberRepo.getOrganizationMembersForSync( - organizationId, - batchSize, - memberIds[memberIds.length - 1], - ) - } - }, - this.log, - 'sync-organization-members', - ) + let memberIds = await this.memberRepo.getOrganizationMembersForSync(organizationId, batchSize) + + while (memberIds.length > 0) { + const { membersSynced, documentsIndexed } = await this.syncMembers(memberIds) + + docCount += documentsIndexed + memberCount += membersSynced + + const diffInSeconds = (new Date().getTime() - now.getTime()) / 1000 + this.log.info( + { organizationId }, + `Synced ${memberCount} members! Speed: ${Math.round( + memberCount / diffInSeconds, + )} members/second!`, + ) + memberIds = await this.memberRepo.getOrganizationMembersForSync( + organizationId, + batchSize, + memberIds[memberIds.length - 1], + ) + } this.log.info( { organizationId }, @@ -313,7 +279,7 @@ export class MemberSyncService { } public async syncMembers(memberIds: string[], segmentIds?: string[]): Promise { - const CONCURRENT_DATABASE_QUERIES = 5 + const CONCURRENT_DATABASE_QUERIES = 10 const BULK_INDEX_DOCUMENT_BATCH_SIZE = 2500 // get all memberId-segmentId couples @@ -334,6 +300,12 @@ export class MemberSyncService { while (results.length > 0) { const result = results.shift() + + if (!result) { + index += 1 + continue + } + const { memberId, segmentId } = databaseStream[index] const memberSegments = memberSegmentCouples[memberId] diff --git a/services/libs/opensearch/src/service/organization.sync.service.ts b/services/libs/opensearch/src/service/organization.sync.service.ts index ed3048dabf..566f5c321e 100644 --- a/services/libs/opensearch/src/service/organization.sync.service.ts +++ b/services/libs/opensearch/src/service/organization.sync.service.ts @@ -10,12 +10,15 @@ import { IPagedSearchResponse, ISearchHit } from './opensearch.data' import { OpenSearchService } from './opensearch.service' import { IOrganizationSyncResult } from './organization.sync.data' import { IServiceConfig } from '@crowd/types' +import { IndexingRepository } from '../repo/indexing.repo' +import { IndexedEntityType } from '../repo/indexing.data' export class OrganizationSyncService { private log: Logger private readonly orgRepo: OrganizationRepository private readonly segmentRepo: SegmentRepository private readonly serviceConfig: IServiceConfig + private readonly indexingRepo: IndexingRepository constructor( store: DbStore, @@ -28,6 +31,7 @@ export class OrganizationSyncService { this.orgRepo = new OrganizationRepository(store, this.log) this.segmentRepo = new SegmentRepository(store, this.log) + this.indexingRepo = new IndexingRepository(store, this.log) } public async getAllIndexedTenantIds( @@ -197,25 +201,14 @@ export class OrganizationSyncService { } } - public async syncTenantOrganizations( - tenantId: string, - batchSize = 100, - syncCutoffTime?: string, - ): Promise { - const cutoffDate = syncCutoffTime ? syncCutoffTime : new Date().toISOString() - - this.log.warn({ tenantId, cutoffDate }, 'Syncing all tenant organizations!') + public async syncTenantOrganizations(tenantId: string, batchSize = 100): Promise { + this.log.warn({ tenantId }, 'Syncing all tenant organizations!') let docCount = 0 let organizationCount = 0 await logExecutionTime( async () => { - let organizationIds = await this.orgRepo.getTenantOrganizationsForSync( - tenantId, - 1, - batchSize, - cutoffDate, - ) + let organizationIds = await this.orgRepo.getTenantOrganizationsForSync(tenantId, batchSize) while (organizationIds.length > 0) { const { organizationsSynced, documentsIndexed } = await this.syncOrganizations( @@ -229,12 +222,17 @@ export class OrganizationSyncService { { tenantId }, `Synced ${organizationCount} organizations with ${docCount} documents!`, ) - organizationIds = await this.orgRepo.getTenantOrganizationsForSync( - tenantId, - 1, - batchSize, - cutoffDate, + + await this.indexingRepo.markEntitiesIndexed( + IndexedEntityType.ORGANIZATION, + organizationIds.map((id) => { + return { + id, + tenantId, + } + }), ) + organizationIds = await this.orgRepo.getTenantOrganizationsForSync(tenantId, batchSize) } }, this.log,