Skip to content

Commit d6a76c9

Browse files
authored
fix: data sink worker optimizations (CM-1054) (#3952)
Signed-off-by: Uroš Marolt <uros@marolt.me>
1 parent 8ccd558 commit d6a76c9

File tree

12 files changed

+373
-173
lines changed

backend/src/database/migrations/U1774430554__covering-indexes-for-member-identity-lookups.sql

Whitespace-only changes.
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
-- Query: findMembersByVerifiedUsernames
2+
--
3+
-- Joins memberIdentities on (platform, lower(value)) with WHERE:
4+
-- verified = true AND type = 'username' AND deletedAt IS NULL
5+
--
6+
-- The existing idx_memberIdentities_platform_type_lower_value_memberId is
7+
-- missing verified = true in its partial condition, so PostgreSQL must
8+
-- heap-fetch every row to recheck verified, which is expensive when there
9+
-- are many unverified identities.
10+
--
11+
-- This index adds verified = true to the partial condition and includes memberId
12+
-- so the join to members can read memberId from the index without a heap fetch.
13+
CREATE INDEX CONCURRENTLY IF NOT EXISTS "idx_memberIdentities_verified_username_platform_lower_value"
14+
ON "memberIdentities" (platform, lower(value), "memberId")
15+
WHERE verified = true
16+
AND type = 'username'
17+
AND "deletedAt" IS NULL;
18+
19+
-- Query: findMembersByVerifiedEmails
20+
--
21+
-- Joins memberIdentities on lower(value) with WHERE:
22+
-- verified = true AND type = 'email' AND deletedAt IS NULL
23+
--
24+
-- The existing idx_memberIdentities_verified_email_lower_value has the right
25+
-- partial condition but only stores lower(value) — no memberId. Every matched
26+
-- index entry requires a heap fetch to get memberId for the join to members.
27+
-- Under concurrent insert/update load, those heap fetches queue behind buffer
28+
-- pin locks, causing multi-second delays even for small inputs.
29+
--
30+
-- This index adds memberId so the join to members can proceed without
31+
-- touching the memberIdentities heap pages.
32+
CREATE INDEX CONCURRENTLY IF NOT EXISTS "idx_memberIdentities_verified_email_lower_value_memberid"
33+
ON "memberIdentities" (lower(value), "memberId")
34+
WHERE verified = true
35+
AND type = 'email'
36+
AND "deletedAt" IS NULL;

services/apps/data_sink_worker/src/service/activity.service.ts

Lines changed: 37 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1050,6 +1050,10 @@ export default class ActivityService extends LoggerBase {
10501050
const preparedActivities: IActivityPrepareForUpsertResult[] = []
10511051

10521052
const memberService = new MemberService(this.pgStore, this.redisClient, this.temporal, this.log)
1053+
// Shared org promise cache: ensures findOrCreateOrganization is called at most once per
1054+
// unique org per batch. Concurrent member creates that reference the same org await the
1055+
// same promise instead of firing redundant DB round trips.
1056+
const orgPromiseCache = new Map<string, Promise<string | undefined>>()
10531057

10541058
// find distinct members to create
10551059
const payloadsWithoutDbMembers: IActivityProcessData[] = relevantPayloads.filter(
@@ -1145,6 +1149,8 @@ export default class ActivityService extends LoggerBase {
11451149
reach: value.member.reach,
11461150
},
11471151
value.platform,
1152+
undefined,
1153+
orgPromiseCache,
11481154
)
11491155
.then((memberId) => {
11501156
// map ids for members
@@ -1288,6 +1294,8 @@ export default class ActivityService extends LoggerBase {
12881294
payload.dbMember,
12891295
dbMemberIdentities.get(payload.dbMember.id),
12901296
payload.platform,
1297+
undefined,
1298+
orgPromiseCache,
12911299
)
12921300
.then(() => {
12931301
payload.memberId = payload.dbMember.id
@@ -1344,6 +1352,8 @@ export default class ActivityService extends LoggerBase {
13441352
payload.dbObjectMember,
13451353
dbMemberIdentities.get(payload.dbObjectMember.id),
13461354
payload.platform,
1355+
undefined,
1356+
orgPromiseCache,
13471357
)
13481358
.then(() => {
13491359
payload.objectMemberId = payload.dbObjectMember.id
@@ -1538,7 +1548,7 @@ export default class ActivityService extends LoggerBase {
15381548
this.log.trace(
15391549
`[ACTIVITY] Upserting ${relationsToUpsert.length} activity relations (filtered from ${preparedForUpsert.length}, skipped ${skippedCount})`,
15401550
)
1541-
await createOrUpdateRelations(this.pgQx, relationsToUpsert)
1551+
await createOrUpdateRelations(this.pgQx, relationsToUpsert, true)
15421552
} else {
15431553
this.log.trace(
15441554
`[ACTIVITY] No activity relations need updating (all ${preparedForUpsert.length} would only update updatedAt)`,
@@ -1580,28 +1590,36 @@ export default class ActivityService extends LoggerBase {
15801590
`${prepared.payload.platform}:${prepared.payload.channel}:${prepared.payload.segmentId}`,
15811591
)
15821592
}
1593+
}
15831594

1584-
await this.searchSyncWorkerEmitter.triggerMemberSync(
1585-
prepared.payload.memberId,
1586-
onboarding,
1587-
prepared.payload.segmentId,
1588-
)
1589-
1590-
if (prepared.payload.objectMemberId) {
1591-
await this.searchSyncWorkerEmitter.triggerMemberSync(
1592-
prepared.payload.objectMemberId,
1593-
onboarding,
1594-
prepared.payload.segmentId,
1595-
)
1596-
}
1595+
const orgIds = preparedForUpsert
1596+
.map((p) => p.payload.organizationId)
1597+
.filter((id): id is string => !!id)
1598+
if (orgIds.length > 0) {
1599+
await this.redisClient.sAdd('organizationIdsForAggComputation', orgIds)
1600+
}
15971601

1598-
if (prepared.payload.organizationId) {
1599-
await this.redisClient.sAdd(
1600-
'organizationIdsForAggComputation',
1601-
prepared.payload.organizationId,
1602-
)
1602+
// Deduplicate member sync triggers — a member may appear in many activities in the
1603+
// same batch. Emit once per unique (memberId, segmentId) pair.
1604+
const memberSyncKeys = new Set<string>()
1605+
const memberSyncPromises: Promise<void>[] = []
1606+
for (const prepared of preparedForUpsert) {
1607+
for (const memberId of [prepared.payload.memberId, prepared.payload.objectMemberId]) {
1608+
if (!memberId) continue
1609+
const key = `${memberId}:${prepared.payload.segmentId}`
1610+
if (!memberSyncKeys.has(key)) {
1611+
memberSyncKeys.add(key)
1612+
memberSyncPromises.push(
1613+
this.searchSyncWorkerEmitter.triggerMemberSync(
1614+
memberId,
1615+
onboarding,
1616+
prepared.payload.segmentId,
1617+
),
1618+
)
1619+
}
16031620
}
16041621
}
1622+
await Promise.all(memberSyncPromises)
16051623

16061624
for (const prepared of preparedActivities) {
16071625
resultMap.set(prepared.resultId, { success: true })

services/apps/data_sink_worker/src/service/member.service.ts

Lines changed: 102 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import { Client as TemporalClient } from '@crowd/temporal'
2828
import {
2929
IMemberData,
3030
IMemberIdentity,
31+
IOrganization,
3132
IOrganizationIdSource,
3233
MemberAttributeName,
3334
MemberBotDetection,
@@ -43,6 +44,22 @@ import { IMemberCreateData, IMemberUpdateData } from './member.data'
4344
import MemberAttributeService from './memberAttribute.service'
4445
import { OrganizationService } from './organization.service'
4546

47+
/**
 * Builds the key used by the org promise cache to deduplicate `findOrCreateOrganization`
 * calls across members in the same batch.
 *
 * The key is made of the org's verified identities, each rendered as
 * `platform:type:value` (value lower-cased), de-duplicated, sorted and joined with `|`, so
 * neither the order of the identities nor repeated entries change the result. When no
 * verified identity with a value is present, the lower-cased `displayName` is used instead
 * (prefixed with `name:`).
 *
 * @param org - organization payload to derive the key from
 * @returns the cache key, or `null` when the org has neither a usable verified identity nor
 *   a displayName — callers must not cache such orgs
 */
function orgCacheKey(org: IOrganization): string | null {
  const identityKeys = new Set<string>()

  for (const identity of org.identities ?? []) {
    // Only verified identities take part in the key. Identities without a value are skipped:
    // they cannot identify an org and calling toLowerCase() on them would throw.
    if (!identity.verified || !identity.value) continue

    identityKeys.add(`${identity.platform}:${identity.type}:${identity.value.toLowerCase()}`)
  }

  if (identityKeys.size > 0) {
    // Sort so that the same set of identities always yields the same key.
    return [...identityKeys].sort().join('|')
  }

  if (org.displayName) return `name:${org.displayName.toLowerCase()}`

  return null
}
62+
4663
export default class MemberService extends LoggerBase {
4764
private readonly memberRepo: MemberRepository
4865
private readonly pgQx: QueryExecutor
@@ -67,6 +84,7 @@ export default class MemberService extends LoggerBase {
6784
data: IMemberCreateData,
6885
platform: PlatformType,
6986
releaseMemberLock?: () => Promise<void>,
87+
orgPromiseCache?: Map<string, Promise<string | undefined>>,
7088
): Promise<string> {
7189
return logExecutionTimeV2(
7290
async () => {
@@ -188,11 +206,31 @@ export default class MemberService extends LoggerBase {
188206
const orgService = new OrganizationService(this.store, this.log)
189207
if (data.organizations) {
190208
for (const org of data.organizations) {
191-
const id = await logExecutionTimeV2(
192-
() => orgService.findOrCreate(platform, integrationId, org),
193-
this.log,
194-
'memberService -> create -> findOrCreateOrg',
195-
)
209+
// Temp fix: skip the individual-noaccount.com placeholder org to avoid
210+
// hot-row contention on the organizations table. Permanent fix is in
211+
// tncTransformerBase.ts to stop emitting this org entirely.
212+
if (
213+
org.identities?.some((i) => i.verified && i.value === 'individual-noaccount.com')
214+
) {
215+
continue
216+
}
217+
218+
const key = orgCacheKey(org)
219+
let orgIdPromise: Promise<string | undefined>
220+
if (key && orgPromiseCache?.has(key)) {
221+
orgIdPromise = orgPromiseCache.get(key)
222+
} else {
223+
orgIdPromise = logExecutionTimeV2(
224+
() => orgService.findOrCreate(platform, integrationId, org),
225+
this.log,
226+
'memberService -> create -> findOrCreateOrg',
227+
)
228+
if (key) {
229+
orgPromiseCache?.set(key, orgIdPromise)
230+
orgIdPromise.catch(() => orgPromiseCache?.delete(key))
231+
}
232+
}
233+
const id = await orgIdPromise
196234
organizations.push({
197235
id,
198236
source: org.source,
@@ -209,6 +247,7 @@ export default class MemberService extends LoggerBase {
209247
this.assignOrganizationByEmailDomain(
210248
integrationId,
211249
emailIdentities.map((i) => i.value),
250+
orgPromiseCache,
212251
),
213252
this.log,
214253
'memberService -> create -> assignOrganizationByEmailDomain',
@@ -220,7 +259,6 @@ export default class MemberService extends LoggerBase {
220259

221260
if (organizations.length > 0) {
222261
const uniqOrgs = uniqby(organizations, 'id')
223-
const orgService = new OrganizationService(this.store, this.log)
224262

225263
const orgsToAdd = (
226264
await Promise.all(
@@ -266,6 +304,7 @@ export default class MemberService extends LoggerBase {
266304
originalIdentities: IMemberIdentity[],
267305
platform: PlatformType,
268306
releaseMemberLock?: () => Promise<void>,
307+
orgPromiseCache?: Map<string, Promise<string | undefined>>,
269308
): Promise<void> {
270309
await logExecutionTimeV2(
271310
async () => {
@@ -398,13 +437,33 @@ export default class MemberService extends LoggerBase {
398437
const orgService = new OrganizationService(this.store, this.log)
399438
if (data.organizations) {
400439
for (const org of data.organizations) {
440+
// Temp fix: skip the individual-noaccount.com placeholder org to avoid
441+
// hot-row contention on the organizations table. Permanent fix is in
442+
// tncTransformerBase.ts to stop emitting this org entirely.
443+
if (
444+
org.identities?.some((i) => i.verified && i.value === 'individual-noaccount.com')
445+
) {
446+
continue
447+
}
448+
401449
this.log.trace({ memberId: id }, 'Finding or creating organization!')
402450

403-
const orgId = await logExecutionTimeV2(
404-
() => orgService.findOrCreate(platform, integrationId, org),
405-
this.log,
406-
'memberService -> update -> findOrCreateOrg',
407-
)
451+
const key = orgCacheKey(org)
452+
let orgIdPromise: Promise<string | undefined>
453+
if (key && orgPromiseCache?.has(key)) {
454+
orgIdPromise = orgPromiseCache.get(key)
455+
} else {
456+
orgIdPromise = logExecutionTimeV2(
457+
() => orgService.findOrCreate(platform, integrationId, org),
458+
this.log,
459+
'memberService -> update -> findOrCreateOrg',
460+
)
461+
if (key) {
462+
orgPromiseCache?.set(key, orgIdPromise)
463+
orgIdPromise.catch(() => orgPromiseCache?.delete(key))
464+
}
465+
}
466+
const orgId = await orgIdPromise
408467
organizations.push({
409468
id: orgId,
410469
source: data.source,
@@ -422,6 +481,7 @@ export default class MemberService extends LoggerBase {
422481
this.assignOrganizationByEmailDomain(
423482
integrationId,
424483
emailIdentities.map((i) => i.value),
484+
orgPromiseCache,
425485
),
426486
this.log,
427487
'memberService -> update -> assignOrganizationByEmailDomain',
@@ -433,7 +493,6 @@ export default class MemberService extends LoggerBase {
433493

434494
if (organizations.length > 0) {
435495
const uniqOrgs = uniqby(organizations, 'id')
436-
const orgService = new OrganizationService(this.store, this.log)
437496

438497
this.log.trace({ memberId: id }, 'Finding member organizations!')
439498
const orgsToAdd = (
@@ -473,6 +532,7 @@ export default class MemberService extends LoggerBase {
473532
public async assignOrganizationByEmailDomain(
474533
integrationId: string,
475534
emails: string[],
535+
orgPromiseCache?: Map<string, Promise<string | undefined>>,
476536
): Promise<IOrganizationIdSource[]> {
477537
const orgService = new OrganizationService(this.store, this.log)
478538
const organizations: IOrganizationIdSource[] = []
@@ -494,26 +554,38 @@ export default class MemberService extends LoggerBase {
494554
// Assign member to organization based on email domain
495555
for (const domain of emailDomains) {
496556
const orgSource = OrganizationSource.EMAIL_DOMAIN
497-
const orgId = await orgService.findOrCreate(
498-
OrganizationAttributeSource.EMAIL,
499-
integrationId,
500-
{
501-
attributes: {
502-
name: {
503-
integration: [domain],
504-
},
557+
const org: IOrganization = {
558+
attributes: {
559+
name: {
560+
integration: [domain],
505561
},
506-
identities: [
507-
{
508-
value: domain,
509-
type: OrganizationIdentityType.PRIMARY_DOMAIN,
510-
platform: 'email',
511-
verified: true,
512-
source: orgSource,
513-
},
514-
],
515562
},
516-
)
563+
identities: [
564+
{
565+
value: domain,
566+
type: OrganizationIdentityType.PRIMARY_DOMAIN,
567+
platform: 'email',
568+
verified: true,
569+
source: orgSource,
570+
},
571+
],
572+
}
573+
const key = orgCacheKey(org)
574+
let orgIdPromise: Promise<string | undefined>
575+
if (key && orgPromiseCache?.has(key)) {
576+
orgIdPromise = orgPromiseCache.get(key)
577+
} else {
578+
orgIdPromise = orgService.findOrCreate(
579+
OrganizationAttributeSource.EMAIL,
580+
integrationId,
581+
org,
582+
)
583+
if (key) {
584+
orgPromiseCache?.set(key, orgIdPromise)
585+
orgIdPromise.catch(() => orgPromiseCache?.delete(key))
586+
}
587+
}
588+
const orgId = await orgIdPromise
517589
if (orgId) {
518590
organizations.push({
519591
id: orgId,

0 commit comments

Comments
 (0)