-
Notifications
You must be signed in to change notification settings - Fork 741
/
Copy pathmap-tenant-members-to-org.ts
157 lines (131 loc) · 4.63 KB
/
map-tenant-members-to-org.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
import { DB_CONFIG, REDIS_CONFIG, SQS_CONFIG, TEMPORAL_CONFIG, UNLEASH_CONFIG } from '../conf'
import { DbStore, getDbConnection } from '@crowd/data-access-layer/src/database'
import { getServiceTracer } from '@crowd/tracing'
import { getServiceLogger } from '@crowd/logging'
import { getSqsClient } from '@crowd/sqs'
import MemberRepository from '@crowd/data-access-layer/src/old/apps/data_sink_worker/repo/member.repo'
import DataSinkRepository from '@crowd/data-access-layer/src/old/apps/data_sink_worker/repo/dataSink.repo'
import MemberService from '../service/member.service'
import { OrganizationService } from '../service/organization.service'
import { getUnleashClient } from '@crowd/feature-flags'
import { Client as TemporalClient, getTemporalClient } from '@crowd/temporal'
import { getRedisClient } from '@crowd/redis'
import {
DataSinkWorkerEmitter,
NodejsWorkerEmitter,
PriorityLevelContextRepository,
QueuePriorityContextLoader,
SearchSyncWorkerEmitter,
} from '@crowd/common_services'
const tracer = getServiceTracer()
const log = getServiceLogger()
const processArguments = process.argv.slice(2)
if (processArguments.length !== 1) {
log.error('Expected 1 argument: tenantId')
process.exit(1)
}
const tenantId = processArguments[0]
setImmediate(async () => {
const unleash = await getUnleashClient(UNLEASH_CONFIG())
let temporal: TemporalClient | undefined
// temp for production
if (TEMPORAL_CONFIG().serverUrl) {
temporal = await getTemporalClient(TEMPORAL_CONFIG())
}
const dbConnection = await getDbConnection(DB_CONFIG())
const store = new DbStore(log, dbConnection)
const priorityLevelRepo = new PriorityLevelContextRepository(new DbStore(log, dbConnection), log)
const loader: QueuePriorityContextLoader = (tenantId: string) =>
priorityLevelRepo.loadPriorityLevelContext(tenantId)
const redis = await getRedisClient(REDIS_CONFIG())
const sqsClient = getSqsClient(SQS_CONFIG())
const emitter = new DataSinkWorkerEmitter(sqsClient, redis, tracer, unleash, loader, log)
await emitter.init()
const dataSinkRepo = new DataSinkRepository(store, log)
const memberRepo = new MemberRepository(store, log)
const segmentIds = await dataSinkRepo.getSegmentIds(tenantId)
const segmentId = segmentIds[segmentIds.length - 1] // leaf segment id
const nodejsWorkerEmitter = new NodejsWorkerEmitter(
sqsClient,
redis,
tracer,
unleash,
loader,
log,
)
await nodejsWorkerEmitter.init()
const searchSyncWorkerEmitter = new SearchSyncWorkerEmitter(
sqsClient,
redis,
tracer,
unleash,
loader,
log,
)
await searchSyncWorkerEmitter.init()
const memberService = new MemberService(
store,
nodejsWorkerEmitter,
searchSyncWorkerEmitter,
unleash,
temporal,
redis,
log,
)
const orgService = new OrganizationService(store, log)
const limit = 100
let offset = 0
let processedMembers = 0
let currentMemberId = null
let currentEmails = null
try {
const { totalCount } = await memberRepo.getMemberIdsAndEmailsAndCount(tenantId, segmentIds, {
limit,
offset,
countOnly: true,
})
log.info({ tenantId }, `Total members found in the tenant: ${totalCount}`)
do {
const { members } = await memberRepo.getMemberIdsAndEmailsAndCount(tenantId, segmentIds, {
limit,
offset,
})
// member -> organization based on email domain
for (const member of members) {
currentMemberId = member.id
currentEmails = member.emails
if (member.emails) {
const orgs = await memberService.assignOrganizationByEmailDomain(
tenantId,
segmentId,
null,
member.emails,
)
if (orgs.length > 0) {
orgService.addToMember(tenantId, segmentId, member.id, orgs)
for (const org of orgs) {
await searchSyncWorkerEmitter.triggerOrganizationSync(
tenantId,
org.id,
true,
segmentId,
)
}
await searchSyncWorkerEmitter.triggerMemberSync(tenantId, member.id, true, segmentId)
}
}
processedMembers++
log.info(`Processed member ${member.id}. Progress: ${processedMembers}/${totalCount}`)
}
offset += limit
} while (totalCount > offset)
log.info(`Member to organization association completed for the tenant ${tenantId}`)
process.exit(0)
} catch (err) {
log.error(
`Failed to assign member to organizations for the tenant ${tenantId}. Member ID: ${currentMemberId}, Emails: ${currentEmails}`,
err,
)
process.exit(1)
}
})