Skip to content

Commit 57d78d1

Browse files
committed
improve pagination by using timestamp instead
1 parent a01da2b commit 57d78d1

File tree

6 files changed

+96
-12
lines changed

6 files changed

+96
-12
lines changed

services/apps/script_executor_worker/src/activities/populate-activity-relations/index.ts

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
import { getDefaultTenantId } from '@crowd/common'
2-
import { createOrUpdateRelations, getActivitiesSortedById } from '@crowd/data-access-layer'
2+
import {
3+
createOrUpdateRelations,
4+
getActivityRelationsSortedByTimestamp,
5+
getActivityTimestampById,
6+
} from '@crowd/data-access-layer'
37
import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
48
import { IndexedEntityType, IndexingRepository } from '@crowd/opensearch'
59

@@ -26,7 +30,19 @@ export async function markActivitiesAsIndexed(activityIds: string[]): Promise<vo
2630
}
2731

2832
export async function getActivitiesToCopy(latestSyncedActivityId: string, limit: number) {
29-
const activities = await getActivitiesSortedById(svc.questdbSQL, latestSyncedActivityId, limit)
33+
let latestSyncedActivityTimestamp = undefined
34+
if (latestSyncedActivityId) {
35+
latestSyncedActivityTimestamp = await getActivityTimestampById(
36+
svc.questdbSQL,
37+
latestSyncedActivityId,
38+
)
39+
}
40+
41+
const activities = await getActivityRelationsSortedByTimestamp(
42+
svc.questdbSQL,
43+
latestSyncedActivityTimestamp ? latestSyncedActivityTimestamp.timestamp : undefined,
44+
limit,
45+
)
3046
return activities
3147
}
3248

services/apps/script_executor_worker/src/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,4 +30,5 @@ export interface IFixOrgIdentitiesWithWrongUrlsArgs {
3030
export interface IPopulateActivityRelationsArgs {
3131
batchSizePerRun: number
3232
deleteIndexedEntities?: boolean
33+
lastIndexedActivityId?: string
3334
}

services/apps/script_executor_worker/src/workflows/populateActivityRelations.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ export async function populateActivityRelations(
1717
await activity.deleteActivityIdsFromIndexedEntities()
1818
}
1919

20-
const latestSyncedActivityId = await activity.getLatestSyncedActivityId()
20+
const latestSyncedActivityId =
21+
args.lastIndexedActivityId || (await activity.getLatestSyncedActivityId())
2122

2223
const activitiesToCopy = await activity.getActivitiesToCopy(
2324
latestSyncedActivityId ?? undefined,
@@ -28,11 +29,19 @@ export async function populateActivityRelations(
2829
return
2930
}
3031

32+
if (activitiesToCopy.length < BATCH_SIZE_PER_RUN) {
33+
const lastSyncedActivityId = await activity.getLatestSyncedActivityId()
34+
if (lastSyncedActivityId === args.lastIndexedActivityId) {
35+
return
36+
}
37+
}
38+
3139
await activity.createRelations(activitiesToCopy)
3240

3341
await activity.markActivitiesAsIndexed(activitiesToCopy.map((a) => a.id))
3442

3543
await continueAsNew<typeof populateActivityRelations>({
3644
batchSizePerRun: args.batchSizePerRun,
45+
lastIndexedActivityId: activitiesToCopy[activitiesToCopy.length - 1].id,
3746
})
3847
}

services/libs/data-access-layer/src/activities/sql.ts

Lines changed: 42 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ import {
1010
ActivityDisplayVariant,
1111
IActivityBySentimentMoodResult,
1212
IActivityByTypeAndPlatformResult,
13-
IActivityCreateData,
1413
IActivityData,
14+
IActivityDbBase,
1515
IEnrichableMemberIdentityActivityAggregate,
1616
IMemberIdentity,
1717
ITimeseriesDatapoint,
@@ -75,6 +75,30 @@ export async function getActivitiesById(
7575
return data.rows
7676
}
7777

78+
/**
79+
* Finds activity timestamp by id, without tenant or segment filters
80+
* @param qdbConn
81+
* @param id
82+
* @returns IActivityCreateData
83+
*/
84+
export async function getActivityTimestampById(
85+
qdbConn: DbConnOrTx,
86+
id: string,
87+
): Promise<Partial<IActivityDbBase>> {
88+
const query = `
89+
SELECT timestamp
90+
FROM activities
91+
WHERE "deletedAt" IS NULL
92+
and id = $(id)
93+
`
94+
95+
const rows = await qdbConn.any(query, {
96+
id,
97+
})
98+
99+
return rows.length > 0 ? rows[0] : null
100+
}
101+
78102
const ACTIVITY_UPDATABLE_COLUMNS: ActivityColumn[] = [
79103
'type',
80104
'isContribution',
@@ -1714,28 +1738,38 @@ export async function moveActivityRelationsToAnotherOrganization(
17141738
} while (rowsUpdated === batchSize)
17151739
}
17161740

1717-
export async function getActivitiesSortedById(
1741+
export async function getActivityRelationsSortedByTimestamp(
17181742
qdbConn: DbConnOrTx,
1719-
cursorActivityId?: string,
1743+
cursorActivityTimestamp?: string,
17201744
limit = 100,
17211745
) {
17221746
let cursorQuery = ''
17231747

1724-
if (cursorActivityId) {
1725-
cursorQuery = `AND id > $(cursorActivityId)::uuid`
1748+
if (cursorActivityTimestamp) {
1749+
cursorQuery = `AND timestamp >= $(cursorActivityTimestamp)`
17261750
}
17271751

17281752
const query = `
1729-
SELECT *
1753+
SELECT
1754+
id,
1755+
"memberId",
1756+
"objectMemberId",
1757+
"organizationId",
1758+
"conversationId",
1759+
"parentId",
1760+
"segmentId",
1761+
platform,
1762+
username,
1763+
"objectMemberUsername"
17301764
FROM activities
17311765
WHERE "deletedAt" IS NULL
17321766
${cursorQuery}
1733-
ORDER BY id::text asc
1767+
ORDER BY timestamp asc
17341768
LIMIT ${limit}
17351769
`
17361770

17371771
const rows = await qdbConn.any(query, {
1738-
cursorActivityId,
1772+
cursorActivityTimestamp,
17391773
limit,
17401774
})
17411775

services/libs/opensearch/src/repo/indexing.repo.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ export class IndexingRepository extends RepositoryBase<IndexingRepository> {
3838
select entity_id
3939
from indexed_entities
4040
where type = $(type)
41-
order by entity_id desc
41+
order by indexed_at desc
4242
limit 1
4343
`,
4444
{

services/libs/types/src/activities.ts

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,30 @@
11
import { PlatformType } from './enums/platforms'
22
import { IMemberData } from './members'
33

4+
5+
export interface IActivityDbBase {
6+
id: string
7+
type: string
8+
isContribution: boolean
9+
score: number
10+
timestamp: string
11+
platform: string
12+
sourceId: string
13+
sourceParentId?: string
14+
memberId: string
15+
username: string
16+
objectMemberId?: string
17+
objectMemberUsername?: string
18+
attributes: Record<string, unknown>
19+
body?: string
20+
title?: string
21+
channel?: string
22+
url?: string
23+
tenantId?: string
24+
organizationId?: string
25+
segmentId?: string
26+
}
27+
428
export interface IActivityCreateData {
529
id?: string
630
type: string

0 commit comments

Comments
 (0)