Skip to content

Commit 9c9cd4c

Browse files
committed
use lastTimestamp
1 parent 8c424ff commit 9c9cd4c

File tree

4 files changed

+36
-21
lines changed

4 files changed

+36
-21
lines changed

services/apps/script_executor_worker/src/activities/fix-member-affilations/index.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@ import { svc } from '../../main'
66
export async function getSegmentMembers(
77
segmentId: string,
88
limit: number,
9-
offset: number,
10-
): Promise<string[]> {
9+
lastTimestamp?: string,
10+
): Promise<{ memberIds: string[]; lastTimestamp: string }> {
1111
const memberRepo = new MemberRepository(svc.postgres.reader.connection(), svc.log, svc.questdbSQL)
12-
return memberRepo.getSegmentMembers(segmentId, limit, offset)
12+
return memberRepo.getSegmentMembers(segmentId, limit, lastTimestamp)
1313
}
1414

1515
export async function calculateMemberAffiliations(memberId: string): Promise<void> {

services/apps/script_executor_worker/src/types.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,6 @@ export interface ISyncMembersArgs {
3636
export interface IFixMemberAffilationsArgs {
3737
segmentId: string
3838
limit?: number
39-
offset?: number
39+
lastTimestamp?: string
4040
testRun?: boolean
4141
}

services/apps/script_executor_worker/src/workflows/fixMemberAffilations.ts

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,19 @@ const syncActivity = proxyActivities<typeof syncActivities>({
1616

1717
export async function fixMemberAffilations(args: IFixMemberAffilationsArgs): Promise<void> {
1818
const limit = args.limit ?? 200
19-
const offset = args.offset ?? 0
19+
const lastTimestamp = args.lastTimestamp
2020

2121
console.log(
22-
`Fixing member affiliations for segment ${args.segmentId} with limit ${limit} and offset ${offset}`,
22+
`Fixing member affiliations for segment ${args.segmentId} with limit ${limit}${
23+
lastTimestamp ? ` and lastTimestamp ${lastTimestamp}` : ''
24+
}`,
2325
)
2426

25-
const memberIds = await activity.getSegmentMembers(args.segmentId, limit, offset)
27+
const { memberIds, lastTimestamp: newLastTimestamp } = await activity.getSegmentMembers(
28+
args.segmentId,
29+
limit,
30+
lastTimestamp,
31+
)
2632

2733
if (memberIds.length === 0) {
2834
console.log('No more members to fix!')
@@ -43,6 +49,6 @@ export async function fixMemberAffilations(args: IFixMemberAffilationsArgs): Pro
4349

4450
await continueAsNew<typeof fixMemberAffilations>({
4551
...args,
46-
offset: offset + limit,
52+
lastTimestamp: newLastTimestamp,
4753
})
4854
}

services/libs/data-access-layer/src/old/apps/script_executor_worker/member.repo.ts

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -173,24 +173,33 @@ class MemberRepository {
173173
return results.map((r) => r.id)
174174
}
175175

176-
public async getSegmentMembers(segmentId: string, limit = 200, offset = 0): Promise<string[]> {
176+
public async getSegmentMembers(
177+
segmentId: string,
178+
limit = 200,
179+
lastTimestamp?: string,
180+
): Promise<{ memberIds: string[]; lastTimestamp: string }> {
181+
const timestampFilter = lastTimestamp ? `AND "timestamp" < '${lastTimestamp}'` : ''
182+
177183
const results = await this.questdbSQL.query(
178184
`
179-
SELECT DISTINCT "memberId"
180-
FROM activities
181-
WHERE "segmentId" = $(segmentId)
182-
AND "deletedAt" IS NULL
183-
ORDER BY "timestamp" DESC
184-
LIMIT $(limit), $(offset)
185+
WITH distinct_members AS (
186+
SELECT DISTINCT "memberId", max("timestamp") as last_activity
187+
FROM activities
188+
WHERE "segmentId" = '${segmentId}'
189+
${timestampFilter}
190+
GROUP BY "memberId"
191+
)
192+
SELECT "memberId", last_activity as "lastTimestamp"
193+
FROM distinct_members
194+
ORDER BY last_activity DESC
195+
LIMIT ${limit}
185196
`,
186-
{
187-
segmentId,
188-
limit,
189-
offset,
190-
},
191197
)
192198

193-
return results.map((r) => r.memberId)
199+
return {
200+
memberIds: results.map((r) => r.memberId),
201+
lastTimestamp: results.length > 0 ? results[results.length - 1].lastTimestamp : undefined,
202+
}
194203
}
195204
}
196205

0 commit comments

Comments
 (0)