Skip to content

Commit 6c82419

Browse files
committed
Merge branch 'main' of https://github.com/CrowdDotDev/crowd.dev into CM-2082
2 parents 089c460 + 923573d commit 6c82419

File tree

10 files changed

+141
-96
lines changed

10 files changed

+141
-96
lines changed

services/apps/cache_worker/src/activities.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import {
22
checkOrganizationExists,
33
dropOrgIdFromRedis,
4-
getOrgIdsFromRedis,
4+
getOrganizationIdsFromRedis,
55
syncOrganization,
66
} from './activities/computeAggs/organization'
77
import {
@@ -44,7 +44,7 @@ export {
4444
getActivePlatforms,
4545
findNewActivityPlatforms,
4646
updateMemberMergeSuggestionsLastGeneratedAt,
47-
getOrgIdsFromRedis,
47+
getOrganizationIdsFromRedis,
4848
dropOrgIdFromRedis,
4949
checkOrganizationExists,
5050
syncOrganization,

services/apps/cache_worker/src/activities/computeAggs/organization.ts

+12-8
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,22 @@ import { OrganizationSyncService } from '@crowd/opensearch'
55
import { svc } from '../../main'
66

77
interface IScanResult {
8-
cursor: string
98
organizationIds: string[]
9+
totalSize: number
1010
}
1111

12-
export async function getOrgIdsFromRedis(cursor = '0', count = 100): Promise<IScanResult> {
12+
export async function getOrganizationIdsFromRedis(batchSize = 100): Promise<IScanResult> {
1313
try {
14-
const result = await svc.redis.sScan('organizationIdsForAggComputation', Number(cursor), {
15-
COUNT: count,
16-
})
17-
return { organizationIds: result.members, cursor: result.cursor.toString() }
14+
const totalSize = await svc.redis.sCard('organizationIdsForAggComputation')
15+
const organizationIds = await svc.redis.sendCommand<string[]>([
16+
'SRANDMEMBER',
17+
'organizationIdsForAggComputation',
18+
batchSize.toString(),
19+
])
20+
21+
return { organizationIds, totalSize }
1822
} catch (e) {
19-
this.log.error(e, 'Failed to get organization IDs from Redis!')
23+
svc.log.error(e, 'Failed to get organization IDs from Redis!')
2024
throw e
2125
}
2226
}
@@ -33,7 +37,7 @@ export async function checkOrganizationExists(orgId: string): Promise<boolean> {
3337
const results = await repo.checkOrganizationsExists([orgId])
3438
exists = results.length > 0
3539
} catch (e) {
36-
this.log.error(e, 'Failed to check if organization exists!')
40+
svc.log.error(e, 'Failed to check if organization exists!')
3741
}
3842

3943
return exists

services/apps/cache_worker/src/schedules/computeOrgAggsDaily.ts

+3-4
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
import { ScheduleAlreadyRunning, ScheduleOverlapPolicy } from '@temporalio/client'
22

33
import { svc } from '../main'
4-
import { dailyGetAndComputeOrgAggs } from '../workflows'
4+
import { spawnOrganizationAggregatesComputation } from '../workflows'
55

66
export const scheduleComputeOrgAggsDaily = async () => {
77
try {
88
await svc.temporal.schedule.create({
9-
scheduleId: 'compute-org-aggs-daily',
9+
scheduleId: 'computeOrgAggsDaily',
1010
spec: {
1111
cronExpressions: ['0 8 * * *'],
1212
},
@@ -16,9 +16,8 @@ export const scheduleComputeOrgAggsDaily = async () => {
1616
},
1717
action: {
1818
type: 'startWorkflow',
19-
workflowType: dailyGetAndComputeOrgAggs,
19+
workflowType: spawnOrganizationAggregatesComputation,
2020
taskQueue: 'cache',
21-
workflowExecutionTimeout: '5 minutes',
2221
},
2322
})
2423
} catch (err) {
+4-4
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
1-
import { computeOrgAggsAndUpdate } from './workflows/compute-orgs-agg/computeOrgAggsAndUpdate'
2-
import { dailyGetAndComputeOrgAggs } from './workflows/compute-orgs-agg/getAndComputeOrgAggs'
1+
import { processOrganizationAggregates } from './workflows/processOrganizationAggregates'
32
import { refreshDashboardCache } from './workflows/refreshDashboardCache'
43
import { spawnDashboardCacheRefresh } from './workflows/spawnDashboardCacheRefresh'
4+
import { spawnOrganizationAggregatesComputation } from './workflows/spawnOrganizationAggregatesComputation'
55

66
export {
77
spawnDashboardCacheRefresh,
88
refreshDashboardCache,
9-
dailyGetAndComputeOrgAggs,
10-
computeOrgAggsAndUpdate,
9+
spawnOrganizationAggregatesComputation,
10+
processOrganizationAggregates,
1111
}

services/apps/cache_worker/src/workflows/compute-orgs-agg/getAndComputeOrgAggs.ts

-50
This file was deleted.

services/apps/cache_worker/src/workflows/compute-orgs-agg/computeOrgAggsAndUpdate.ts services/apps/cache_worker/src/workflows/processOrganizationAggregates.ts

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
import { proxyActivities } from '@temporalio/workflow'
22

3-
import * as activities from '../../activities/computeAggs/organization'
4-
import { IProcessComputeOrgAggs } from '../../types'
3+
import * as activities from '../activities/computeAggs/organization'
4+
import { IProcessComputeOrgAggs } from '../types'
55

6-
const activity = proxyActivities<typeof activities>({ startToCloseTimeout: '10 minutes' })
6+
const activity = proxyActivities<typeof activities>({ startToCloseTimeout: '30 minutes' })
77

8-
export async function computeOrgAggsAndUpdate(args: IProcessComputeOrgAggs): Promise<void> {
8+
export async function processOrganizationAggregates(args: IProcessComputeOrgAggs): Promise<void> {
99
const orgId = args.organizationId
1010

1111
const orgExists = await activity.checkOrganizationExists(orgId)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
import {
2+
ChildWorkflowCancellationType,
3+
ParentClosePolicy,
4+
continueAsNew,
5+
executeChild,
6+
proxyActivities,
7+
workflowInfo,
8+
} from '@temporalio/workflow'
9+
10+
import * as activities from '../activities/computeAggs/organization'
11+
12+
import { processOrganizationAggregates } from './processOrganizationAggregates'
13+
14+
const activity = proxyActivities<typeof activities>({ startToCloseTimeout: '15 minutes' })
15+
16+
/*
17+
spawnOrganizationAggregatesComputation is a Temporal workflow that:
18+
- [Activity]: Get organization IDs from Redis.
19+
- [Child Workflow]: Re-compute and update aggregates for each organization
20+
in batches of 100. Child workflows run independently and won't be
21+
cancelled if the parent workflow stops.
22+
*/
23+
export async function spawnOrganizationAggregatesComputation(): Promise<void> {
24+
const { organizationIds, totalSize } = await activity.getOrganizationIdsFromRedis()
25+
26+
if (!totalSize) {
27+
console.log('No organizations found - finishing workflow!')
28+
return
29+
}
30+
31+
console.log(`Found ${totalSize} organizations for aggs computation!`)
32+
33+
const info = workflowInfo()
34+
35+
await Promise.all(
36+
organizationIds.map((organizationId) => {
37+
return executeChild(processOrganizationAggregates, {
38+
workflowId: `${info.workflowId}/${organizationId}`,
39+
cancellationType: ChildWorkflowCancellationType.ABANDON,
40+
parentClosePolicy: ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON,
41+
retry: {
42+
backoffCoefficient: 2,
43+
initialInterval: 2 * 1000,
44+
maximumInterval: 30 * 1000,
45+
maximumAttempts: 3,
46+
},
47+
args: [{ organizationId }],
48+
})
49+
}),
50+
)
51+
52+
// Continue with the next batch
53+
await continueAsNew<typeof spawnOrganizationAggregatesComputation>()
54+
}

services/apps/script_executor_worker/src/activities/populate-activity-relations/index.ts

+25-16
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1+
import { partition } from '@crowd/common'
12
import {
3+
IActivityRelationsCreateData,
24
createOrUpdateRelations,
35
getActivityRelationsSortedByTimestamp,
46
} from '@crowd/data-access-layer'
@@ -49,23 +51,30 @@ export async function getActivitiesToCopy(latestSyncedActivityTimestamp: string,
4951
}
5052

5153
export async function createRelations(activitiesRedisKey): Promise<void> {
52-
const activities = await getActivitiyDataFromRedis(activitiesRedisKey)
53-
const promises = activities.map(async (activity) =>
54-
createOrUpdateRelations(pgpQx(svc.postgres.writer.connection()), {
55-
activityId: activity.id,
56-
memberId: activity.memberId,
57-
platform: activity.platform,
58-
segmentId: activity.segmentId,
59-
username: activity.username,
60-
conversationId: activity.conversationId,
61-
objectMemberId: activity.objectMemberId,
62-
objectMemberUsername: activity.objectMemberUsername,
63-
organizationId: activity.organizationId,
64-
parentId: activity.parentId,
65-
}),
66-
)
54+
const activities: IActivityRelationsCreateData[] =
55+
await getActivitiyDataFromRedis(activitiesRedisKey)
56+
57+
const chunkSize = 1000
58+
const activityChunks = partition(activities, chunkSize)
6759

68-
await Promise.all(promises)
60+
for (const chunk of activityChunks) {
61+
const promises = chunk.map((activity) =>
62+
createOrUpdateRelations(pgpQx(svc.postgres.writer.connection()), {
63+
activityId: activity.id,
64+
memberId: activity.memberId,
65+
platform: activity.platform,
66+
segmentId: activity.segmentId,
67+
username: activity.username,
68+
conversationId: activity.conversationId,
69+
objectMemberId: activity.objectMemberId,
70+
objectMemberUsername: activity.objectMemberUsername,
71+
organizationId: activity.organizationId,
72+
parentId: activity.parentId,
73+
}),
74+
)
75+
76+
await Promise.all(promises)
77+
}
6978
}
7079

7180
export async function saveActivityDataToRedis(key: string, activities): Promise<void> {

services/apps/script_executor_worker/src/workflows/populateActivityRelations.ts

+21-7
Original file line numberDiff line numberDiff line change
@@ -22,20 +22,34 @@ export async function populateActivityRelations(
2222
args.latestSyncedActivityTimestamp || (await activity.getLatestSyncedActivityTimestamp())
2323
}
2424

25-
const { activitiesLength, activitiesRedisKey, lastTimestamp } =
26-
await activity.getActivitiesToCopy(
27-
latestSyncedActivityTimestamp ?? undefined,
28-
BATCH_SIZE_PER_RUN,
29-
)
25+
let { activitiesLength, activitiesRedisKey, lastTimestamp } = await activity.getActivitiesToCopy(
26+
latestSyncedActivityTimestamp ?? undefined,
27+
BATCH_SIZE_PER_RUN,
28+
)
3029

3130
if (activitiesLength === 0) {
3231
return
3332
}
3433

35-
if (activitiesLength < BATCH_SIZE_PER_RUN) {
36-
if (lastTimestamp === args.latestSyncedActivityTimestamp) {
34+
if (lastTimestamp === args.latestSyncedActivityTimestamp) {
35+
if (activitiesLength < BATCH_SIZE_PER_RUN) {
3736
return
3837
}
38+
39+
// All activities returned have the same timestamp, we need a bigger batch size
40+
// than the default one to pass this point
41+
let batchSizeMultiplier = 2
42+
43+
while (lastTimestamp === args.latestSyncedActivityTimestamp) {
44+
const result = await activity.getActivitiesToCopy(
45+
lastTimestamp,
46+
BATCH_SIZE_PER_RUN * batchSizeMultiplier,
47+
)
48+
activitiesLength = result.activitiesLength
49+
activitiesRedisKey = result.activitiesRedisKey
50+
lastTimestamp = result.lastTimestamp
51+
batchSizeMultiplier += 1
52+
}
3953
}
4054

4155
await activity.createRelations(activitiesRedisKey)

services/libs/data-access-layer/src/activities/sql.ts

+16-1
Original file line numberDiff line numberDiff line change
@@ -1741,11 +1741,26 @@ export async function moveActivityRelationsToAnotherOrganization(
17411741
} while (rowsUpdated === batchSize)
17421742
}
17431743

1744+
export interface IActivityRelationsCreateData {
1745+
id: string
1746+
memberId: string
1747+
timestamp: string
1748+
createdAt: string
1749+
objectMemberId?: string
1750+
organizationId?: string
1751+
conversationId?: string
1752+
parentId?: string
1753+
segmentId: string
1754+
platform: string
1755+
username: string
1756+
objectMemberUsername?: string
1757+
}
1758+
17441759
export async function getActivityRelationsSortedByTimestamp(
17451760
qdbConn: DbConnOrTx,
17461761
cursorActivityTimestamp?: string,
17471762
limit = 100,
1748-
) {
1763+
): Promise<IActivityRelationsCreateData[]> {
17491764
let cursorQuery = ''
17501765

17511766
if (cursorActivityTimestamp) {

0 commit comments

Comments
 (0)