Skip to content

Commit 2e4d9ee

Browse files
committed
chunking inserts 1k each
1 parent bdc193d commit 2e4d9ee

File tree

3 files changed

+49
-17
lines changed
  • services
    • apps/script_executor_worker/src/activities/populate-activity-relations
    • libs

3 files changed

+49
-17
lines changed

services/apps/script_executor_worker/src/activities/populate-activity-relations/index.ts

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1+
import { chunkArray } from '@crowd/common'
12
import {
3+
IActivityRelationsCreateData,
24
createOrUpdateRelations,
35
getActivityRelationsSortedByTimestamp,
46
} from '@crowd/data-access-layer'
@@ -49,23 +51,30 @@ export async function getActivitiesToCopy(latestSyncedActivityTimestamp: string,
4951
}
5052

5153
export async function createRelations(activitiesRedisKey): Promise<void> {
52-
const activities = await getActivitiyDataFromRedis(activitiesRedisKey)
53-
const promises = activities.map(async (activity) =>
54-
createOrUpdateRelations(pgpQx(svc.postgres.writer.connection()), {
55-
activityId: activity.id,
56-
memberId: activity.memberId,
57-
platform: activity.platform,
58-
segmentId: activity.segmentId,
59-
username: activity.username,
60-
conversationId: activity.conversationId,
61-
objectMemberId: activity.objectMemberId,
62-
objectMemberUsername: activity.objectMemberUsername,
63-
organizationId: activity.organizationId,
64-
parentId: activity.parentId,
65-
}),
66-
)
54+
const activities: IActivityRelationsCreateData[] =
55+
await getActivitiyDataFromRedis(activitiesRedisKey)
56+
57+
const chunkSize = 1000
58+
const activityChunks = chunkArray(activities, chunkSize)
6759

68-
await Promise.all(promises)
60+
for (const chunk of activityChunks) {
61+
const promises = chunk.map((activity) =>
62+
createOrUpdateRelations(pgpQx(svc.postgres.writer.connection()), {
63+
activityId: activity.id,
64+
memberId: activity.memberId,
65+
platform: activity.platform,
66+
segmentId: activity.segmentId,
67+
username: activity.username,
68+
conversationId: activity.conversationId,
69+
objectMemberId: activity.objectMemberId,
70+
objectMemberUsername: activity.objectMemberUsername,
71+
organizationId: activity.organizationId,
72+
parentId: activity.parentId,
73+
}),
74+
)
75+
76+
await Promise.all(promises)
77+
}
6978
}
7079

7180
export async function saveActivityDataToRedis(key: string, activities): Promise<void> {

services/libs/common/src/array.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,3 +134,11 @@ export const hasIntersection = (arr1: string[], arr2: string[]): boolean => {
134134
const set1 = new Set(arr1)
135135
return arr2.some((item) => set1.has(item))
136136
}
137+
138+
export function chunkArray<T>(array: T[], chunkSize: number): T[][] {
139+
const chunks = []
140+
for (let i = 0; i < array.length; i += chunkSize) {
141+
chunks.push(array.slice(i, i + chunkSize))
142+
}
143+
return chunks
144+
}

services/libs/data-access-layer/src/activities/sql.ts

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1741,11 +1741,26 @@ export async function moveActivityRelationsToAnotherOrganization(
17411741
} while (rowsUpdated === batchSize)
17421742
}
17431743

1744+
export interface IActivityRelationsCreateData {
1745+
id: string
1746+
memberId: string
1747+
timestamp: string
1748+
createdAt: string
1749+
objectMemberId?: string
1750+
organizationId?: string
1751+
conversationId?: string
1752+
parentId?: string
1753+
segmentId: string
1754+
platform: string
1755+
username: string
1756+
objectMemberUsername?: string
1757+
}
1758+
17441759
export async function getActivityRelationsSortedByTimestamp(
17451760
qdbConn: DbConnOrTx,
17461761
cursorActivityTimestamp?: string,
17471762
limit = 100,
1748-
) {
1763+
): Promise<IActivityRelationsCreateData[]> {
17491764
let cursorQuery = ''
17501765

17511766
if (cursorActivityTimestamp) {

0 commit comments

Comments
 (0)