Skip to content

Commit 4a078c2

Browse files
authored
Handle activity relations without username or platform or segmentId (#2863)
1 parent 7444c75 commit 4a078c2

File tree

6 files changed

+43
-30
lines changed

6 files changed

+43
-30
lines changed

services/apps/script_executor_worker/src/activities.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import {
2424
import {
2525
createRelations,
2626
getActivitiesToCopy,
27-
getLatestSyncedActivityCreatedAt,
27+
getLatestSyncedActivityTimestamp,
2828
markActivitiesAsIndexed,
2929
resetIndexedIdentities,
3030
} from './activities/populate-activity-relations'
@@ -54,7 +54,7 @@ export {
5454
createRelations,
5555
resetIndexedIdentities,
5656
getActivitiesToCopy,
57-
getLatestSyncedActivityCreatedAt,
57+
getLatestSyncedActivityTimestamp,
5858
markActivitiesAsIndexed,
5959
syncMembersBatch,
6060
getMembersForSync,

services/apps/script_executor_worker/src/activities/populate-activity-relations/index.ts

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import {
22
createOrUpdateRelations,
3-
getActivityRelationsSortedByCreatedAt,
3+
getActivityRelationsSortedByTimestamp,
44
} from '@crowd/data-access-layer'
55
import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
66
import { RedisCache } from '@crowd/redis'
@@ -9,27 +9,27 @@ import { svc } from '../../main'
99

1010
export async function resetIndexedIdentities(): Promise<void> {
1111
const redisCache = new RedisCache(`activity-relations-data`, svc.redis, svc.log)
12-
await redisCache.delete('latest-synced-activity-created-at')
12+
await redisCache.delete('last-synced-activity-timestamp')
1313
}
1414

15-
export async function getLatestSyncedActivityCreatedAt(): Promise<string> {
15+
export async function getLatestSyncedActivityTimestamp(): Promise<string> {
1616
const redisCache = new RedisCache(`activity-relations-data`, svc.redis, svc.log)
17-
const result = await redisCache.get('latest-synced-activity-created-at')
17+
const result = await redisCache.get('last-synced-activity-timestamp')
1818
return result || null
1919
}
2020

2121
export async function markActivitiesAsIndexed(activitiesRedisKey: string): Promise<string> {
2222
const activities = await getActivitiyDataFromRedis(activitiesRedisKey)
2323
const redisCache = new RedisCache(`activity-relations-data`, svc.redis, svc.log)
24-
const lastSyncedCreatedAt = activities[activities.length - 1].createdAt
25-
await redisCache.set('latest-synced-activity-created-at', lastSyncedCreatedAt)
26-
return lastSyncedCreatedAt
24+
const lastSyncedTimestamp = activities[activities.length - 1].timestamp
25+
await redisCache.set('last-synced-activity-timestamp', lastSyncedTimestamp)
26+
return lastSyncedTimestamp
2727
}
2828

29-
export async function getActivitiesToCopy(latestSyncedActivityCreatedAt: string, limit: number) {
30-
const activities = await getActivityRelationsSortedByCreatedAt(
29+
export async function getActivitiesToCopy(latestSyncedActivityTimestamp: string, limit: number) {
30+
const activities = await getActivityRelationsSortedByTimestamp(
3131
svc.questdbSQL,
32-
latestSyncedActivityCreatedAt,
32+
latestSyncedActivityTimestamp,
3333
limit,
3434
)
3535

@@ -44,7 +44,7 @@ export async function getActivitiesToCopy(latestSyncedActivityCreatedAt: string,
4444
return {
4545
activitiesRedisKey: key,
4646
activitiesLength: activities.length,
47-
lastCreatedAt: activities[activities.length - 1].createdAt,
47+
lastTimestamp: activities[activities.length - 1].timestamp,
4848
}
4949
}
5050

@@ -70,7 +70,7 @@ export async function createRelations(activitiesRedisKey): Promise<void> {
7070

7171
export async function saveActivityDataToRedis(key: string, activities): Promise<void> {
7272
const redisCache = new RedisCache(`activity-relations-data`, svc.redis, svc.log)
73-
await redisCache.set(key, JSON.stringify(activities), 30)
73+
await redisCache.set(key, JSON.stringify(activities), 360)
7474
}
7575

7676
export async function getActivitiyDataFromRedis(key: string) {

services/apps/script_executor_worker/src/schedules/schedulePopulateActivityRelations.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ export const schedulePopulateActivityRelations = async () => {
2525
},
2626
args: [
2727
{
28-
batchSizePerRun: 10000,
28+
batchSizePerRun: 50000,
2929
deleteIndexedEntities: false,
3030
},
3131
],

services/apps/script_executor_worker/src/types.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ export interface IFixOrgIdentitiesWithWrongUrlsArgs {
2323
export interface IPopulateActivityRelationsArgs {
2424
batchSizePerRun: number
2525
deleteIndexedEntities?: boolean
26-
latestSyncedActivityCreatedAt?: string
26+
latestSyncedActivityTimestamp?: string
2727
}
2828
export interface ISyncMembersArgs {
2929
batchSize?: number

services/apps/script_executor_worker/src/workflows/populateActivityRelations.ts

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,19 +12,19 @@ export async function populateActivityRelations(
1212
args: IPopulateActivityRelationsArgs,
1313
): Promise<void> {
1414
const BATCH_SIZE_PER_RUN = args.batchSizePerRun || 1000
15-
let latestSyncedActivityCreatedAt
15+
let latestSyncedActivityTimestamp
1616

1717
if (args.deleteIndexedEntities) {
1818
await activity.resetIndexedIdentities()
19-
latestSyncedActivityCreatedAt = null
19+
latestSyncedActivityTimestamp = null
2020
} else {
21-
latestSyncedActivityCreatedAt =
22-
args.latestSyncedActivityCreatedAt || (await activity.getLatestSyncedActivityCreatedAt())
21+
latestSyncedActivityTimestamp =
22+
args.latestSyncedActivityTimestamp || (await activity.getLatestSyncedActivityTimestamp())
2323
}
2424

25-
const { activitiesLength, activitiesRedisKey, lastCreatedAt } =
25+
const { activitiesLength, activitiesRedisKey, lastTimestamp } =
2626
await activity.getActivitiesToCopy(
27-
latestSyncedActivityCreatedAt ?? undefined,
27+
latestSyncedActivityTimestamp ?? undefined,
2828
BATCH_SIZE_PER_RUN,
2929
)
3030

@@ -33,7 +33,7 @@ export async function populateActivityRelations(
3333
}
3434

3535
if (activitiesLength < BATCH_SIZE_PER_RUN) {
36-
if (lastCreatedAt === args.latestSyncedActivityCreatedAt) {
36+
if (lastTimestamp === args.latestSyncedActivityTimestamp) {
3737
return
3838
}
3939
}
@@ -44,6 +44,6 @@ export async function populateActivityRelations(
4444

4545
await continueAsNew<typeof populateActivityRelations>({
4646
batchSizePerRun: args.batchSizePerRun,
47-
latestSyncedActivityCreatedAt: lastCreatedAt,
47+
latestSyncedActivityTimestamp: lastTimestamp,
4848
})
4949
}

services/libs/data-access-layer/src/activities/sql.ts

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1423,6 +1423,18 @@ export async function createOrUpdateRelations(
14231423
qe: QueryExecutor,
14241424
data: IActivityRelationCreateOrUpdateData,
14251425
): Promise<void> {
1426+
if (data.username === undefined || data.username === null) {
1427+
return
1428+
}
1429+
1430+
if (data.platform === undefined || data.platform === null) {
1431+
return
1432+
}
1433+
1434+
if (data.segmentId === undefined || data.segmentId === null) {
1435+
return
1436+
}
1437+
14261438
// check objectMember exists
14271439
if (data.objectMemberId !== undefined && data.objectMemberId !== null) {
14281440
let objectMember = await qe.select(
@@ -1729,21 +1741,22 @@ export async function moveActivityRelationsToAnotherOrganization(
17291741
} while (rowsUpdated === batchSize)
17301742
}
17311743

1732-
export async function getActivityRelationsSortedByCreatedAt(
1744+
export async function getActivityRelationsSortedByTimestamp(
17331745
qdbConn: DbConnOrTx,
1734-
cursorActivityCreatedAt?: string,
1746+
cursorActivityTimestamp?: string,
17351747
limit = 100,
17361748
) {
17371749
let cursorQuery = ''
17381750

1739-
if (cursorActivityCreatedAt) {
1740-
cursorQuery = `AND "createdAt" >= $(cursorActivityCreatedAt)`
1751+
if (cursorActivityTimestamp) {
1752+
cursorQuery = `AND "timestamp" >= $(cursorActivityTimestamp)`
17411753
}
17421754

17431755
const query = `
17441756
SELECT
17451757
id,
17461758
"memberId",
1759+
timestamp,
17471760
"createdAt",
17481761
"objectMemberId",
17491762
"organizationId",
@@ -1756,12 +1769,12 @@ export async function getActivityRelationsSortedByCreatedAt(
17561769
FROM activities
17571770
WHERE "deletedAt" IS NULL
17581771
${cursorQuery}
1759-
ORDER BY "createdAt" asc
1772+
ORDER BY "timestamp" asc
17601773
LIMIT ${limit}
17611774
`
17621775

17631776
const rows = await qdbConn.any(query, {
1764-
cursorActivityCreatedAt,
1777+
cursorActivityTimestamp,
17651778
limit,
17661779
})
17671780

0 commit comments

Comments
 (0)