Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions services/apps/script_executor_worker/src/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
findMemberIdentitiesGroupedByPlatform,
findMemberMergeActions,
} from './activities/dissect-member'
import { calculateMemberAffiliations, getSegmentMembers } from './activities/fix-member-affilations'
import {
deleteOrganizationIdentity,
findOrganizationIdentity,
Expand Down Expand Up @@ -60,4 +61,6 @@ export {
getMembersForSync,
deleteIndexedEntities,
markEntitiesIndexed,
getSegmentMembers,
calculateMemberAffiliations,
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import { runMemberAffiliationsUpdate } from '@crowd/data-access-layer/src/old/apps/profiles_worker'
import MemberRepository from '@crowd/data-access-layer/src/old/apps/script_executor_worker/member.repo'

import { svc } from '../../main'

export async function getSegmentMembers(
segmentId: string,
limit: number,
offset: number,
): Promise<string[]> {
const memberRepo = new MemberRepository(svc.postgres.reader.connection(), svc.log)
return memberRepo.getSegmentMembers(segmentId, limit, offset)
}

export async function calculateMemberAffiliations(memberId: string): Promise<void> {
try {
await runMemberAffiliationsUpdate(svc.postgres.writer, svc.questdbSQL, svc.queue, memberId)
} catch (err) {
throw new Error(err)
}
}
7 changes: 7 additions & 0 deletions services/apps/script_executor_worker/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,10 @@ export interface ISyncMembersArgs {
withAggs?: boolean
testRun?: boolean
}

export interface IFixMemberAffilationsArgs {
segmentId: string
limit?: number
offset?: number
testRun?: boolean
}
2 changes: 2 additions & 0 deletions services/apps/script_executor_worker/src/workflows.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { dissectMember } from './workflows/dissectMember'
import { findAndMergeMembersWithSamePlatformIdentitiesDifferentCapitalization } from './workflows/findAndMergeMembersWithSamePlatformIdentitiesDifferentCapitalization'
import { findAndMergeMembersWithSameVerifiedEmailsInDifferentPlatforms } from './workflows/findAndMergeMembersWithSameVerifiedEmailsInDifferentPlatforms'
import { fixMemberAffilations } from './workflows/fixMemberAffilations'
import { fixOrgIdentitiesWithWrongUrls } from './workflows/fixOrgIdentitiesWithWrongUrls'
import { populateActivityRelations } from './workflows/populateActivityRelations'
import { syncMembers } from './workflows/syncMembers'
Expand All @@ -12,4 +13,5 @@ export {
fixOrgIdentitiesWithWrongUrls,
populateActivityRelations,
syncMembers,
fixMemberAffilations,
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import { continueAsNew, proxyActivities } from '@temporalio/workflow'

import * as activities from '../activities/fix-member-affilations'
import * as syncActivities from '../activities/sync/member'
import { IFixMemberAffilationsArgs } from '../types'

const activity = proxyActivities<typeof activities>({
startToCloseTimeout: '30 minutes',
retry: { maximumAttempts: 3, backoffCoefficient: 3 },
})

const syncActivity = proxyActivities<typeof syncActivities>({
startToCloseTimeout: '30 minutes',
retry: { maximumAttempts: 3, backoffCoefficient: 3 },
})

export async function fixMemberAffilations(args: IFixMemberAffilationsArgs): Promise<void> {
const limit = args.limit ?? 200
const offset = args.offset ?? 0

console.log(
`Fixing member affiliations for segment ${args.segmentId} with limit ${limit} and offset ${offset}`,
)

const memberIds = await activity.getSegmentMembers(args.segmentId, limit, offset)

if (memberIds.length === 0) {
console.log('No more members to fix!')
return
}

for (const memberId of memberIds) {
// 1. Calculate the affiliations
await activity.calculateMemberAffiliations(memberId)
// 2. Resync the member
await syncActivity.syncMembersBatch([memberId], true)
}

if (args.testRun) {
console.log('Test run completed - stopping after first batch!')
return
}

await continueAsNew<typeof fixMemberAffilations>({
...args,
offset: offset + limit,
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,26 @@ class MemberRepository {

return results.map((r) => r.id)
}

public async getSegmentMembers(segmentId: string, limit = 200, offset = 0): Promise<string[]> {
const results = await this.connection.query(
`
select distinct "memberId"
from activities
where "segmentId" = $(segmentId)
order by "memberId"
limit $(limit)
offset $(offset)
`,
{
segmentId,
limit,
offset,
},
)

return results.map((r) => r.memberId)
}
}

export default MemberRepository
Loading