Skip to content
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
4b28873
rm tenant logic in search-sync scripts and logic
skwowet Jan 29, 2025
25fa2fc
move member sync script to script-executor-worker
skwowet Jan 29, 2025
0f77c45
rm memberIds support in workflow
skwowet Jan 29, 2025
7bde72e
fix
skwowet Jan 29, 2025
e50d1cc
change startToCloseTimeout
skwowet Jan 29, 2025
5ecfca9
change startToCloseTimeout
skwowet Jan 29, 2025
0fd156a
run prettier
skwowet Jan 29, 2025
b12c575
add dedup improvement
skwowet Jan 29, 2025
6398d7f
fix ci cd
skwowet Jan 29, 2025
fb30296
fix workflow timeout
skwowet Jan 30, 2025
0b2795a
change default batchSize
skwowet Jan 30, 2025
537d325
check if parallel processing works
skwowet Jan 31, 2025
4e45aa9
increase chunk size
skwowet Jan 31, 2025
6917563
parallel syncing
skwowet Jan 31, 2025
9b39513
customize chunkSize
skwowet Jan 31, 2025
5a2cdd7
improve
skwowet Jan 31, 2025
a9031af
fix speed calculation
skwowet Jan 31, 2025
053ce54
fix calc
skwowet Jan 31, 2025
086a2d3
Merge branch 'main' into fix/CM-1999
skwowet Feb 3, 2025
968f8e6
Merge branch 'main' into fix/CM-1999
skwowet Feb 3, 2025
dfd7bc2
fix linter
skwowet Feb 3, 2025
4cc5d80
Merge branch 'main' into fix/CM-1999
skwowet Feb 3, 2025
9acb673
rm tenantId column from indexedEntities
skwowet Feb 3, 2025
ed55301
Merge branch 'main' into fix/CM-1999
skwowet Feb 3, 2025
08d2b65
Merge branch 'main' into fix/CM-1999
skwowet Feb 7, 2025
be017ec
use pg reader instance for getMembersForSync
skwowet Feb 10, 2025
f96f5c6
use continueAsNew instead of looping
skwowet Feb 12, 2025
927bc33
rm index
skwowet Feb 12, 2025
fe15e3e
Merge branch 'main' into fix/CM-1999
skwowet Feb 17, 2025
079a1e8
optimize the db query and check
skwowet Feb 17, 2025
1151b8d
Add index
skwowet Feb 17, 2025
70589ec
rename migration
skwowet Feb 17, 2025
ca35595
Merge branch 'main' into fix/CM-1999
skwowet Feb 18, 2025
fe9e34b
Merge branch 'main' into fix/CM-1999
skwowet Feb 20, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
alter table indexed_entities drop column tenant_id;
20 changes: 0 additions & 20 deletions backend/src/services/searchSyncService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,6 @@ export default class SearchSyncService extends LoggerBase {
}
}

async triggerTenantMembersSync(tenantId: string) {
const client = await this.getSearchSyncClient()

if (client instanceof SearchSyncApiClient || client instanceof SearchSyncWorkerEmitter) {
await client.triggerTenantMembersSync(tenantId)
} else {
throw new Error('Unexpected search client type!')
}
}

async triggerOrganizationMembersSync(tenantId: string, organizationId: string) {
const client = await this.getSearchSyncClient()

Expand Down Expand Up @@ -126,16 +116,6 @@ export default class SearchSyncService extends LoggerBase {
}
}

async triggerTenantOrganizationSync(tenantId: string) {
const client = await this.getSearchSyncClient()

if (client instanceof SearchSyncApiClient || client instanceof SearchSyncWorkerEmitter) {
await client.triggerTenantOrganizationSync(tenantId)
} else {
throw new Error('Unexpected search client type!')
}
}

async triggerRemoveOrganization(tenantId: string, organizationId: string) {
const client = await this.getSearchSyncClient()

Expand Down
9 changes: 9 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions services/apps/script_executor_worker/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
"dependencies": {
"@crowd/archetype-standard": "workspace:*",
"@crowd/archetype-worker": "workspace:*",
"@crowd/common": "workspace:*",
"@crowd/data-access-layer": "workspace:*",
"@crowd/logging": "workspace:*",
"@crowd/opensearch": "workspace:*",
"@crowd/redis": "workspace:*",
"@crowd/types": "workspace:*",
"@temporalio/workflow": "~1.11.1",
Expand Down
10 changes: 10 additions & 0 deletions services/apps/script_executor_worker/src/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ import {
findMembersWithSamePlatformIdentitiesDifferentCapitalization,
findMembersWithSameVerifiedEmailsInDifferentPlatforms,
} from './activities/merge-members-with-similar-identities'
import {
deleteIndexedEntities,
getMembersForSync,
markEntitiesIndexed,
syncMembersBatch,
} from './activities/sync/member'

export {
findMembersWithSameVerifiedEmailsInDifferentPlatforms,
Expand All @@ -38,4 +44,8 @@ export {
updateOrganizationIdentity,
deleteOrganizationIdentity,
isLfxMember,
syncMembersBatch,
getMembersForSync,
deleteIndexedEntities,
markEntitiesIndexed,
}
62 changes: 62 additions & 0 deletions services/apps/script_executor_worker/src/activities/sync/member.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import { sumBy } from '@crowd/common'
import { DbStore } from '@crowd/data-access-layer/src/database'
import { MemberSyncService } from '@crowd/opensearch'
import { IndexedEntityType } from '@crowd/opensearch/src/repo/indexing.data'
import { IndexingRepository } from '@crowd/opensearch/src/repo/indexing.repo'
import { MemberRepository } from '@crowd/opensearch/src/repo/member.repo'

import { svc } from '../../main'

export async function deleteIndexedEntities(entityType: IndexedEntityType): Promise<void> {
const indexingRepo = new IndexingRepository(svc.postgres.writer, svc.log)
await indexingRepo.deleteIndexedEntities(entityType)
}

export async function markEntitiesIndexed(
entityType: IndexedEntityType,
entityIds: string[],
): Promise<void> {
const indexingRepo = new IndexingRepository(svc.postgres.writer, svc.log)
await indexingRepo.markEntitiesIndexed(entityType, entityIds)
}

export async function getMembersForSync(batchSize: number): Promise<string[]> {
const memberRepo = new MemberRepository(svc.redis, svc.postgres.reader, svc.log)
return memberRepo.getMembersForSync(batchSize)
}

export async function syncMembersBatch(
memberIds: string[],
withAggs: boolean,
chunkSize?: number,
): Promise<{ docCount: number; memberCount: number }> {
try {
const service = new MemberSyncService(
svc.redis,
svc.postgres.writer,
new DbStore(svc.log, svc.questdbSQL),
svc.opensearch,
svc.log,
)

const CHUNK_SIZE = chunkSize || 10

svc.log.info(`Syncing members in chunks of ${CHUNK_SIZE} members!`)

const results = []
for (let i = 0; i < memberIds.length; i += CHUNK_SIZE) {
const chunk = memberIds.slice(i, i + CHUNK_SIZE)
const chunkResults = await Promise.all(
chunk.map((memberId) => service.syncMembers(memberId, { withAggs })),
)
results.push(...chunkResults)
}

return {
docCount: sumBy(results, (r) => r.documentsIndexed),
memberCount: sumBy(results, (r) => r.membersSynced),
}
} catch (err) {
throw new Error(err)
}
}
4 changes: 2 additions & 2 deletions services/apps/script_executor_worker/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ const config: Config = {
enabled: true,
},
questdb: {
enabled: false,
enabled: true,
},
redis: {
enabled: true,
Expand All @@ -22,7 +22,7 @@ const options: Options = {
enabled: true,
},
opensearch: {
enabled: false,
enabled: true,
},
}

Expand Down
8 changes: 8 additions & 0 deletions services/apps/script_executor_worker/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,11 @@ export interface IFixOrgIdentitiesWithWrongUrlsArgs {
tenantId: string
testRun?: boolean
}

export interface ISyncMembersArgs {
batchSize?: number
chunkSize?: number
clean?: boolean
withAggs?: boolean
testRun?: boolean
}
2 changes: 2 additions & 0 deletions services/apps/script_executor_worker/src/workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ import { dissectMember } from './workflows/dissectMember'
import { findAndMergeMembersWithSamePlatformIdentitiesDifferentCapitalization } from './workflows/findAndMergeMembersWithSamePlatformIdentitiesDifferentCapitalization'
import { findAndMergeMembersWithSameVerifiedEmailsInDifferentPlatforms } from './workflows/findAndMergeMembersWithSameVerifiedEmailsInDifferentPlatforms'
import { fixOrgIdentitiesWithWrongUrls } from './workflows/fixOrgIdentitiesWithWrongUrls'
import { syncMembers } from './workflows/syncMembers'

export {
findAndMergeMembersWithSameVerifiedEmailsInDifferentPlatforms,
findAndMergeMembersWithSamePlatformIdentitiesDifferentCapitalization,
dissectMember,
fixOrgIdentitiesWithWrongUrls,
syncMembers,
}
61 changes: 61 additions & 0 deletions services/apps/script_executor_worker/src/workflows/syncMembers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import { proxyActivities } from '@temporalio/workflow'

import { IndexedEntityType } from '@crowd/opensearch/src/repo/indexing.data'

import * as activities from '../activities/sync/member'
import { ISyncMembersArgs } from '../types'

const activity = proxyActivities<typeof activities>({
startToCloseTimeout: '30 minute',
retry: { maximumAttempts: 3, backoffCoefficient: 3 },
})

export async function syncMembers(args: ISyncMembersArgs): Promise<void> {
const BATCH_SIZE = args.batchSize || 100
const WITH_AGGS = args.withAggs || true

console.log(`Starting members sync! (batchSize: ${BATCH_SIZE}, withAggs: ${WITH_AGGS})`)

if (args.clean) {
await activity.deleteIndexedEntities(IndexedEntityType.MEMBER)
console.log('Deleted indexed entities for members!')
}

let totalMembersSynced = 0
let totalDocumentsIndexed = 0

let memberIds = await activity.getMembersForSync(BATCH_SIZE)

while (memberIds.length > 0) {
const batchStartTime = new Date()
const { docCount, memberCount } = await activity.syncMembersBatch(
memberIds,
WITH_AGGS,
args.chunkSize,
)

totalMembersSynced += memberCount
totalDocumentsIndexed += docCount

const diffInSeconds = (new Date().getTime() - batchStartTime.getTime()) / 1000

console.log(
`Synced ${memberCount} members! Speed: ${Math.round(
memberCount / diffInSeconds,
)} members/second!`,
)

await activity.markEntitiesIndexed(IndexedEntityType.MEMBER, memberIds)

if (args.testRun) {
console.log('Test run completed - stopping after first batch!')
break
}

memberIds = await activity.getMembersForSync(BATCH_SIZE)
}

console.log(
`Synced total of ${totalMembersSynced} members with ${totalDocumentsIndexed} documents!`,
)
}
17 changes: 0 additions & 17 deletions services/apps/search_sync_api/src/routes/member.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,6 @@ router.post(
}),
)

router.post(
'/sync/tenant/members',
asyncWrap(async (req: ApiRequest, res) => {
const memberSyncService = syncService(req)

const { tenantId } = req.body
try {
req.log.trace(`Calling memberSyncService.syncTenantMembers for tenant ${tenantId}`)
await memberSyncService.syncTenantMembers(tenantId)
res.sendStatus(200)
} catch (error) {
req.log.error(error)
res.status(500).send(error.message)
}
}),
)

router.post(
'/sync/organization/members',
asyncWrap(async (req: ApiRequest, res) => {
Expand Down
19 changes: 0 additions & 19 deletions services/apps/search_sync_api/src/routes/organization.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,25 +28,6 @@ router.post(
}),
)

router.post(
'/sync/tenant/organizations',
asyncWrap(async (req: ApiRequest, res) => {
const organizationSyncService = syncService(req)

const { tenantId } = req.body
try {
req.log.trace(
`Calling organizationSyncService.syncTenantOrganizations for tenant ${tenantId}`,
)
await organizationSyncService.syncTenantOrganizations(tenantId)
res.sendStatus(200)
} catch (error) {
req.log.error(error)
res.status(500).send(error.message)
}
}),
)

router.post(
'/cleanup/tenant/organizations',
asyncWrap(async (req: ApiRequest, res) => {
Expand Down
40 changes: 6 additions & 34 deletions services/apps/search_sync_worker/src/bin/sync-all-members.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import { timeout } from '@crowd/common'
import { DbStore, getDbConnection } from '@crowd/data-access-layer/src/database'
import { MemberRepository } from '@crowd/data-access-layer/src/old/apps/search_sync_worker/member.repo'
import { getServiceLogger } from '@crowd/logging'
import { MemberSyncService, OpenSearchService, getOpensearchClient } from '@crowd/opensearch'
import { IndexedEntityType } from '@crowd/opensearch/src/repo/indexing.data'
Expand All @@ -14,8 +12,6 @@ const log = getServiceLogger()

const processArguments = process.argv.slice(2)

const MAX_CONCURRENT = 3

setImmediate(async () => {
const osClient = await getOpensearchClient(OPENSEARCH_CONFIG())
const openSearchService = new OpenSearchService(log, osClient)
Expand All @@ -35,42 +31,18 @@ setImmediate(async () => {
withAggs = false
}

const repo = new MemberRepository(store, log)

const tenantIds = await repo.getTenantIds()
const qdbConn = await getClientSQL()
const qdbStore = new DbStore(log, qdbConn)

const service = new MemberSyncService(redis, store, qdbStore, openSearchService, log)

let current = 0
for (let i = 0; i < tenantIds.length; i++) {
const tenantId = tenantIds[i]

while (current == MAX_CONCURRENT) {
await timeout(1000)
}

log.info(`Processing tenant ${i + 1}/${tenantIds.length}`)
current += 1
service
.syncTenantMembers(tenantId, 500, { withAggs })
.then(() => {
current--
log.info(`Processed tenant ${i + 1}/${tenantIds.length}`)
})
.catch((err) => {
current--
log.error(
err,
{ tenantId },
`Error syncing members for tenant ${i + 1}/${tenantIds.length}!`,
)
})
}
log.info('Starting sync of all members')

while (current > 0) {
await timeout(1000)
try {
await service.syncAllMembers(500, { withAggs })
log.info('Successfully synced all members')
} catch (err) {
log.error(err, 'Error syncing members!')
}

process.exit(0)
Expand Down
Loading