Skip to content

Commit 14f33d4

Browse files
committed
restructure and fix merge-conflicts
1 parent df41e03 commit 14f33d4

File tree

9 files changed

+39
-83
lines changed

9 files changed

+39
-83
lines changed

services/apps/script_executor_worker/src/activities.ts

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,10 @@
11
import { excludeEntityFromCleanup, hasActivityRecords } from './activities/cleanup/helpers'
22
import { deleteMember, getMembersToCleanup } from './activities/cleanup/member'
3-
import {
4-
deleteOrganization,
5-
getOrganizationsToCleanup,
6-
queueOrgForAggComputation,
7-
} from './activities/cleanup/organization'
3+
import { deleteOrganization, getOrganizationsToCleanup } from './activities/cleanup/organization'
84
import {
95
mergeMembers,
106
mergeOrganizations,
7+
queueOrganizationForAggComputation,
118
unmergeMembers,
129
unmergeMembersPreview,
1310
waitForTemporalWorkflowExecutionFinish,
@@ -25,13 +22,10 @@ import {
2522
findMemberMergeActions,
2623
} from './activities/dissect-member'
2724
import {
28-
addOrgIdToRedisCache,
2925
calculateMemberAffiliations,
30-
copyActivitiesFromPgToQuestDb,
3126
deleteProcessedMemberOrgAffiliations,
3227
getMembersWithDeletedOrgAffilations,
3328
getProcessedMemberOrgAffiliations,
34-
hasActivityInQuestDb,
3529
markMemberOrgAffiliationAsProcessed,
3630
} from './activities/fix-deleted-member-org-affilations'
3731
import {
@@ -86,19 +80,16 @@ export {
8680
excludeEntityFromCleanup,
8781
getOrganizationsToCleanup,
8882
hasActivityRecords,
89-
queueOrgForAggComputation,
9083
syncMembersBatch,
9184
getMembersForSync,
9285
getOrganizationsForSync,
9386
syncOrganizationsBatch,
9487
deleteIndexedEntities,
9588
markEntitiesIndexed,
89+
queueOrganizationForAggComputation,
9690
getActivitiesToCopyToTinybird,
9791
markActivitiesAsIndexedForSyncingActivitiesToTinybird,
9892
getMembersWithDeletedOrgAffilations,
99-
hasActivityInQuestDb,
100-
copyActivitiesFromPgToQuestDb,
101-
addOrgIdToRedisCache,
10293
calculateMemberAffiliations,
10394
markMemberOrgAffiliationAsProcessed,
10495
getProcessedMemberOrgAffiliations,

services/apps/script_executor_worker/src/activities/cleanup/organization.ts

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,3 @@ export async function deleteOrganization(orgId: string): Promise<void> {
2121
throw error
2222
}
2323
}
24-
25-
export async function queueOrgForAggComputation(orgId: string): Promise<void> {
26-
try {
27-
await svc.redis.sAdd('organizationIdsForAggComputation', orgId)
28-
} catch (error) {
29-
svc.log.error(error, 'Error adding organization to redis set!')
30-
throw error
31-
}
32-
}

services/apps/script_executor_worker/src/activities/common.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,3 +134,12 @@ export function timeout(ms: number, workflowId: string): Promise<void> {
134134
}, ms)
135135
})
136136
}
137+
138+
export async function queueOrganizationForAggComputation(orgId: string): Promise<void> {
139+
try {
140+
await svc.redis.sAdd('organizationIdsForAggComputation', orgId)
141+
} catch (error) {
142+
svc.log.error(error, 'Error adding organization to redis set!')
143+
throw error
144+
}
145+
}

services/apps/script_executor_worker/src/activities/fix-deleted-member-org-affilations/index.ts

Lines changed: 0 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,4 @@
1-
import { insertActivities } from '@crowd/data-access-layer'
2-
import { IDbActivity } from '@crowd/data-access-layer/src/old/apps/data_sink_worker/repo/activity.data'
31
import { runMemberAffiliationsUpdate } from '@crowd/data-access-layer/src/old/apps/profiles_worker'
4-
import ActivityRepository from '@crowd/data-access-layer/src/old/apps/script_executor_worker/activity.repo'
52
import MemberRepository from '@crowd/data-access-layer/src/old/apps/script_executor_worker/member.repo'
63
import TempRepository from '@crowd/data-access-layer/src/old/apps/script_executor_worker/temp.repo'
74

@@ -18,36 +15,6 @@ export async function getMembersWithDeletedOrgAffilations(
1815
}
1916
}
2017

21-
export async function hasActivityInQuestDb(
22-
memberId: string,
23-
organizationId: string,
24-
): Promise<boolean> {
25-
const repo = new ActivityRepository(svc.postgres.reader.connection(), svc.log, svc.questdbSQL)
26-
return repo.hasActivity(memberId, organizationId)
27-
}
28-
29-
export async function copyActivitiesFromPgToQuestDb(
30-
memberId: string,
31-
orgId: string,
32-
): Promise<void> {
33-
let offset = 0
34-
let activities: IDbActivity[] = []
35-
36-
const repo = new ActivityRepository(svc.postgres.reader.connection(), svc.log, svc.questdbSQL)
37-
38-
// Get first batch
39-
activities = await repo.findActivitiesPg(memberId, orgId, { offset: 0 })
40-
41-
while (activities.length > 0) {
42-
// Insert current batch
43-
await insertActivities(svc.queue, activities)
44-
45-
// Move to next batch
46-
offset += activities.length
47-
activities = await repo.findActivitiesPg(memberId, orgId, { offset })
48-
}
49-
}
50-
5118
export async function calculateMemberAffiliations(memberId: string): Promise<void> {
5219
try {
5320
await runMemberAffiliationsUpdate(svc.postgres.writer, svc.questdbSQL, svc.queue, memberId)
@@ -56,10 +23,6 @@ export async function calculateMemberAffiliations(memberId: string): Promise<voi
5623
}
5724
}
5825

59-
export async function addOrgIdToRedisCache(orgId: string): Promise<void> {
60-
await svc.redis.sAdd('organizationIdsForAggComputation', orgId)
61-
}
62-
6326
export async function markMemberOrgAffiliationAsProcessed(
6427
memberId: string,
6528
organizationId: string,

services/apps/script_executor_worker/src/workflows.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ import { findAndMergeMembersWithSameVerifiedEmailsInDifferentPlatforms } from '.
77
import { fixDeletedMemberOrgAffilations } from './workflows/fixDeletedMemberOrgAffilations'
88
import { fixOrgIdentitiesWithWrongUrls } from './workflows/fixOrgIdentitiesWithWrongUrls'
99
import { populateActivityRelations } from './workflows/populateActivityRelations'
10-
import { syncMembers } from './workflows/syncMembers'
11-
import { syncOrganizations } from './workflows/syncOrganizations'
10+
import { syncMembers } from './workflows/sync/members'
11+
import { syncOrganizations } from './workflows/sync/organizations'
1212

1313
export {
1414
findAndMergeMembersWithSameVerifiedEmailsInDifferentPlatforms,

services/apps/script_executor_worker/src/workflows/cleanup/organizations.ts

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,11 @@ import { continueAsNew, proxyActivities } from '@temporalio/workflow'
33
import { EntityType } from '@crowd/data-access-layer/src/old/apps/script_executor_worker/types'
44

55
import * as cleanupHelpers from '../../activities/cleanup/helpers'
6-
import * as activities from '../../activities/cleanup/organization'
6+
import * as cleanupActivities from '../../activities/cleanup/organization'
7+
import * as commonActivities from '../../activities/common'
78
import { ICleanupArgs } from '../../types'
89

9-
const activity = proxyActivities<typeof activities>({
10+
const cleanupActivity = proxyActivities<typeof cleanupActivities>({
1011
startToCloseTimeout: '30 minutes',
1112
retry: { maximumAttempts: 3, backoffCoefficient: 3 },
1213
})
@@ -16,10 +17,15 @@ const cleanupHelper = proxyActivities<typeof cleanupHelpers>({
1617
retry: { maximumAttempts: 3, backoffCoefficient: 3 },
1718
})
1819

20+
const commonActivity = proxyActivities<typeof commonActivities>({
21+
startToCloseTimeout: '30 minutes',
22+
retry: { maximumAttempts: 3, backoffCoefficient: 3 },
23+
})
24+
1925
export async function cleanupOrganizations(args: ICleanupArgs): Promise<void> {
2026
const BATCH_SIZE = args.batchSize ?? 100
2127

22-
const organizationIds = await activity.getOrganizationsToCleanup(BATCH_SIZE)
28+
const organizationIds = await cleanupActivity.getOrganizationsToCleanup(BATCH_SIZE)
2329

2430
if (organizationIds.length === 0) {
2531
console.log('No more organizations to cleanup!')
@@ -41,8 +47,8 @@ export async function cleanupOrganizations(args: ICleanupArgs): Promise<void> {
4147

4248
console.log(`Deleting organization ${orgId} from database!`)
4349

44-
await activity.deleteOrganization(orgId)
45-
return activity.queueOrgForAggComputation(orgId)
50+
await cleanupActivity.deleteOrganization(orgId)
51+
return commonActivity.queueOrganizationForAggComputation(orgId)
4652
})
4753

4854
await Promise.all(cleanupTasks)

services/apps/script_executor_worker/src/workflows/fixDeletedMemberOrgAffilations.ts

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { continueAsNew, proxyActivities } from '@temporalio/workflow'
22

3+
import * as commonActivities from '../activities/common'
34
import * as activities from '../activities/fix-deleted-member-org-affilations'
45
import * as syncActivities from '../activities/sync/member'
56
import { IFixDeletedMemberOrgAffilationsArgs } from '../types'
@@ -14,6 +15,11 @@ const syncActivity = proxyActivities<typeof syncActivities>({
1415
retry: { maximumAttempts: 3, backoffCoefficient: 3 },
1516
})
1617

18+
const commonActivity = proxyActivities<typeof commonActivities>({
19+
startToCloseTimeout: '30 minutes',
20+
retry: { maximumAttempts: 3, backoffCoefficient: 3 },
21+
})
22+
1723
export async function fixDeletedMemberOrgAffilations(
1824
args: IFixDeletedMemberOrgAffilationsArgs,
1925
): Promise<void> {
@@ -23,7 +29,7 @@ export async function fixDeletedMemberOrgAffilations(
2329
`Fixing deleted member org affiliations with params: testRun=${args.testRun}, batchSize=${args.batchSize}`,
2430
)
2531

26-
// 1. Find affected memberId and orgId
32+
// Find affected memberId and orgId
2733
const affectedMembers = await activity.getProcessedMemberOrgAffiliations(BATCH_SIZE)
2834

2935
if (affectedMembers.length === 0) {
@@ -36,16 +42,6 @@ export async function fixDeletedMemberOrgAffilations(
3642
const chunk = affectedMembers.slice(i, i + CHUNK_SIZE)
3743
await Promise.all(
3844
chunk.map(async ({ memberId, organizationId }) => {
39-
// 2. Check if they have activity in questDb
40-
// const hasActivity = await activity.hasActivityInQuestDb(memberId, organizationId)
41-
42-
// 2.1 If no activities found, we need to get and insert them
43-
// if (!hasActivity) {
44-
// console.log(`Copying activities for member ${memberId} and org ${organizationId}`)
45-
46-
// await activity.copyActivitiesFromPgToQuestDb(memberId, organizationId)
47-
// }
48-
4945
// Calculate affiliation
5046
await activity.calculateMemberAffiliations(memberId)
5147

@@ -54,7 +50,7 @@ export async function fixDeletedMemberOrgAffilations(
5450

5551
// Add orgId to redisCache
5652
// It will be picked up by the spawnOrganizationAggregatesComputation workflow
57-
await activity.addOrgIdToRedisCache(organizationId)
53+
await commonActivity.queueOrganizationForAggComputation(organizationId)
5854

5955
await activity.deleteProcessedMemberOrgAffiliations(memberId, organizationId)
6056
}),

services/apps/script_executor_worker/src/workflows/syncMembers.ts renamed to services/apps/script_executor_worker/src/workflows/sync/members.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@ import { continueAsNew, proxyActivities } from '@temporalio/workflow'
22

33
import { IndexedEntityType } from '@crowd/opensearch/src/repo/indexing.data'
44

5-
import * as entityIndexActivities from '../activities/sync/entity-index'
6-
import * as memberSyncActivities from '../activities/sync/member'
7-
import { ISyncArgs } from '../types'
5+
import * as entityIndexActivities from '../../activities/sync/entity-index'
6+
import * as memberSyncActivities from '../../activities/sync/member'
7+
import { ISyncArgs } from '../../types'
88

99
const memberSyncActivity = proxyActivities<typeof memberSyncActivities>({
1010
startToCloseTimeout: '30 minutes',

services/apps/script_executor_worker/src/workflows/syncOrganizations.ts renamed to services/apps/script_executor_worker/src/workflows/sync/organizations.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@ import { continueAsNew, proxyActivities } from '@temporalio/workflow'
22

33
import { IndexedEntityType } from '@crowd/opensearch/src/repo/indexing.data'
44

5-
import * as entityIndexActivities from '../activities/sync/entity-index'
6-
import * as orgSyncActivities from '../activities/sync/organization'
7-
import { ISyncArgs } from '../types'
5+
import * as entityIndexActivities from '../../activities/sync/entity-index'
6+
import * as orgSyncActivities from '../../activities/sync/organization'
7+
import { ISyncArgs } from '../../types'
88

99
const orgSyncActivity = proxyActivities<typeof orgSyncActivities>({
1010
startToCloseTimeout: '30 minutes',

0 commit comments

Comments
 (0)