Skip to content

Commit b7d4efe

Browse files
authored
Merge branch 'main' into support/merge-error
2 parents 5a8036e + 90e1aa5 commit b7d4efe

File tree

17 files changed

+478
-56
lines changed

17 files changed

+478
-56
lines changed

backend/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
"script:refreshGithubRepoSettings": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true tsx src/bin/scripts/refresh-github-repo-settings.ts"
3737
},
3838
"lint-staged": {
39-
"**/*.*": [
39+
"**/*.ts": [
4040
"eslint",
4141
"prettier --write"
4242
]

backend/src/database/migrations/U1737705711__activityRelations.sql

Whitespace-only changes.
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
create table public."activityRelations" (
2+
"activityId" uuid not null primary key,
3+
"memberId" uuid not null,
4+
"objectMemberId" uuid null,
5+
"organizationId" uuid null,
6+
"conversationId" uuid null,
7+
"parentId" uuid null,
8+
"segmentId" uuid not null,
9+
"platform" text not null,
10+
"username" text not null,
11+
"objectMemberUsername" text null,
12+
"createdAt" timestamp with time zone default now() not null,
13+
"updatedAt" timestamp with time zone default now() not null,
14+
foreign key ("memberId") references members (id) on delete cascade,
15+
foreign key ("organizationId") references organizations (id) on delete set null,
16+
foreign key ("objectMemberId") references members (id) on delete set null,
17+
foreign key ("conversationId") references conversations (id) on delete set null,
18+
foreign key ("segmentId") references segments (id) on delete cascade,
19+
unique ("activityId", "memberId")
20+
);
21+
create index "ix_activityRelations_memberId" on "activityRelations"("memberId");
22+
create index "ix_activityRelations_organizationId" on "activityRelations"("organizationId");
23+
create index "ix_activityRelations_platform_username" on "activityRelations"("platform", "username");
24+
25+
26+
DO
27+
$$
28+
DECLARE
29+
batch_size INT := 100000;
30+
last_processed_id UUID := '00000000-0000-0000-0000-000000000000';
31+
total_processed INT := 0;
32+
rows_inserted INT;
33+
BEGIN
34+
LOOP
35+
INSERT INTO "activityRelations" ("activityId", "memberId", "segmentId", "objectMemberId", "organizationId", "conversationId", "parentId", "platform", "username", "objectMemberUsername")
36+
SELECT id, "memberId", "segmentId", "objectMemberId", "organizationId", "conversationId", "parentId", "platform", "username", "objectMemberUsername"
37+
FROM activities
38+
WHERE id > last_processed_id
39+
AND "segmentId" IS NOT NULL
40+
ORDER BY id::TEXT
41+
LIMIT batch_size;
42+
43+
GET DIAGNOSTICS rows_inserted = ROW_COUNT;
44+
45+
total_processed := total_processed + rows_inserted;
46+
RAISE NOTICE 'Batch processed: % rows. Total processed: % rows.', rows_inserted, total_processed;
47+
48+
EXIT WHEN rows_inserted = 0;
49+
50+
SELECT id::UUID INTO last_processed_id
51+
FROM activities
52+
WHERE id > last_processed_id
53+
ORDER BY id::TEXT DESC
54+
LIMIT 1;
55+
56+
END LOOP;
57+
58+
RAISE NOTICE 'All rows processed. Total rows inserted: %.', total_processed;
59+
END
60+
$$;

backend/src/database/repositories/member/memberOrganizationAffiliationOverridesRepository.ts

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
import {
22
changeOverride as changeMemberOrganizationAffiliationOverride,
33
findOverrides as findMemberOrganizationAffiliationOverrides,
4+
findPrimaryWorkExperiencesOfMember,
45
} from '@crowd/data-access-layer/src/member_organization_affiliation_overrides'
5-
import { IChangeAffiliationOverrideData } from '@crowd/types'
6+
import {
7+
IChangeAffiliationOverrideData,
8+
IMemberOrganizationAffiliationOverride,
9+
} from '@crowd/types'
610

711
import { IRepositoryOptions } from '../IRepositoryOptions'
812
import SequelizeRepository from '../sequelizeRepository'
@@ -17,6 +21,14 @@ class MemberOrganizationAffiliationOverridesRepository {
1721
])
1822
return overrides[0]
1923
}
24+
25+
static async findPrimaryWorkExperiences(
26+
memberId: string,
27+
options: IRepositoryOptions,
28+
): Promise<IMemberOrganizationAffiliationOverride[]> {
29+
const qx = SequelizeRepository.getQueryExecutor(options)
30+
return findPrimaryWorkExperiencesOfMember(qx, memberId)
31+
}
2032
}
2133

2234
export default MemberOrganizationAffiliationOverridesRepository

backend/src/services/member/memberAffiliationsService.ts

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
/* eslint-disable no-continue */
22
import { uniq } from 'lodash'
33

4-
import { groupBy } from '@crowd/common'
4+
import { Error400, dateIntersects, groupBy } from '@crowd/common'
55
import { findMaintainerRoles } from '@crowd/data-access-layer/src/maintainers'
66
import { fetchManySegments } from '@crowd/data-access-layer/src/segments'
77
import { LoggerBase } from '@crowd/logging'
@@ -18,6 +18,8 @@ import SequelizeRepository from '@/database/repositories/sequelizeRepository'
1818
import { IServiceOptions } from '../IServiceOptions'
1919
import MemberAffiliationService from '../memberAffiliationService'
2020

21+
import MemberOrganizationsService from './memberOrganizationsService'
22+
2123
export default class MemberAffiliationsService extends LoggerBase {
2224
options: IServiceOptions
2325

@@ -66,6 +68,41 @@ export default class MemberAffiliationsService extends LoggerBase {
6668
async changeAffiliationOverride(
6769
data: IChangeAffiliationOverrideData,
6870
): Promise<IMemberOrganizationAffiliationOverride> {
71+
if (data.isPrimaryWorkExperience) {
72+
const memberOrgService = new MemberOrganizationsService(this.options)
73+
// check if any other work experience in intersecting date range was marked as primary
74+
// we don't allow this because "isPrimaryWorkExperience" decides which work exp to pick on date conflicts
75+
const allWorkExperiencesOfMember = (await memberOrgService.list(data.memberId)).map(
76+
(mo) => mo.memberOrganizations,
77+
)
78+
79+
const currentlyEditedWorkExperience = allWorkExperiencesOfMember.find(
80+
(w) => w.id === data.memberOrganizationId,
81+
)
82+
83+
const primaryWorkExperiencesOfMember = allWorkExperiencesOfMember.filter(
84+
(w) => w.affiliationOverride.isPrimaryWorkExperience,
85+
)
86+
87+
if (currentlyEditedWorkExperience.affiliationOverride.isPrimaryWorkExperience === false) {
88+
for (const existingPrimaryWorkExp of primaryWorkExperiencesOfMember) {
89+
if (
90+
dateIntersects(
91+
existingPrimaryWorkExp.dateStart as string,
92+
existingPrimaryWorkExp.dateEnd as string,
93+
currentlyEditedWorkExperience.dateStart as string,
94+
currentlyEditedWorkExperience.dateEnd as string,
95+
)
96+
) {
97+
throw new Error400(
98+
this.options.language,
99+
`Date range conflicts with another primary work experience id = ${existingPrimaryWorkExp.id}`,
100+
)
101+
}
102+
}
103+
}
104+
}
105+
69106
const override = MemberOrganizationAffiliationOverridesRepository.changeOverride(
70107
data,
71108
this.options,

services/apps/data_sink_worker/src/service/activity.service.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import {
1212
} from '@crowd/common'
1313
import { SearchSyncWorkerEmitter } from '@crowd/common_services'
1414
import {
15+
createOrUpdateRelations,
1516
findCommitsForPRSha,
1617
findMatchingPullRequestNodeId,
1718
insertActivities,
@@ -31,6 +32,7 @@ import { IDbMember } from '@crowd/data-access-layer/src/old/apps/data_sink_worke
3132
import MemberRepository from '@crowd/data-access-layer/src/old/apps/data_sink_worker/repo/member.repo'
3233
import RequestedForErasureMemberIdentitiesRepository from '@crowd/data-access-layer/src/old/apps/data_sink_worker/repo/requestedForErasureMemberIdentities.repo'
3334
import SettingsRepository from '@crowd/data-access-layer/src/old/apps/data_sink_worker/repo/settings.repo'
35+
import { dbStoreQx } from '@crowd/data-access-layer/src/queryExecutor'
3436
import { DEFAULT_ACTIVITY_TYPE_SETTINGS, GithubActivityType } from '@crowd/integrations'
3537
import { GitActivityType } from '@crowd/integrations/src/integrations/git/types'
3638
import { Logger, LoggerBase, getChildLogger } from '@crowd/logging'
@@ -84,6 +86,7 @@ export default class ActivityService extends LoggerBase {
8486
})
8587

8688
const id = await this.pgStore.transactionally(async (txStore) => {
89+
const queryExecutor = dbStoreQx(txStore)
8790
const txSettingsRepo = new SettingsRepository(txStore, this.log)
8891

8992
await txSettingsRepo.createActivityType(
@@ -132,6 +135,16 @@ export default class ActivityService extends LoggerBase {
132135
importHash: activity.importHash,
133136
},
134137
])
138+
await createOrUpdateRelations(queryExecutor, {
139+
activityId: activity.id,
140+
segmentId,
141+
memberId: activity.memberId,
142+
objectMemberId: activity.objectMemberId,
143+
organizationId: activity.organizationId,
144+
platform: activity.platform,
145+
username: activity.username,
146+
objectMemberUsername: activity.objectMemberUsername,
147+
})
135148
} catch (error) {
136149
this.log.error('Error creating activity in QuestDB:', error)
137150
throw error
@@ -160,6 +173,7 @@ export default class ActivityService extends LoggerBase {
160173
try {
161174
let toUpdate: IDbActivityUpdateData
162175
const updated = await this.pgStore.transactionally(async (txStore) => {
176+
const queryExecutor = dbStoreQx(txStore)
163177
const txSettingsRepo = new SettingsRepository(txStore, this.log)
164178

165179
toUpdate = await this.mergeActivityData(activity, original)
@@ -215,6 +229,16 @@ export default class ActivityService extends LoggerBase {
215229
importHash: original.importHash,
216230
},
217231
])
232+
await createOrUpdateRelations(queryExecutor, {
233+
activityId: id,
234+
segmentId,
235+
memberId: toUpdate.memberId || original.memberId,
236+
objectMemberId: toUpdate.objectMemberId || original.objectMemberId,
237+
organizationId: toUpdate.organizationId || original.organizationId,
238+
platform: toUpdate.platform || (original.platform as PlatformType),
239+
username: toUpdate.username || original.username,
240+
objectMemberUsername: toUpdate.objectMemberUsername || original.objectMemberUsername,
241+
})
218242
} catch (error) {
219243
this.log.error('Error updating (by inserting) activity in QuestDB:', error)
220244
throw error
@@ -1338,6 +1362,7 @@ export default class ActivityService extends LoggerBase {
13381362
}) => {
13391363
await updateActivities(
13401364
this.qdbStore.connection(),
1365+
dbStoreQx(this.pgStore),
13411366
this.client,
13421367
async (activity) => ({
13431368
attributes: {
@@ -1388,6 +1413,7 @@ export default class ActivityService extends LoggerBase {
13881413

13891414
await updateActivities(
13901415
this.qdbStore.connection(),
1416+
dbStoreQx(this.pgStore),
13911417
this.client,
13921418
async () => ({
13931419
sourceParentId: activity.sourceId,

services/apps/entity_merging_worker/src/activities.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
export {
22
deleteMember,
3-
moveActivitiesWithIdentityToAnotherMember,
43
recalculateActivityAffiliationsOfMemberAsync,
54
syncMember,
65
notifyFrontendMemberMergeSuccessful,

services/apps/entity_merging_worker/src/activities/members.ts

Lines changed: 8 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,6 @@ import { IDbActivityCreateData } from '@crowd/data-access-layer/src/old/apps/dat
66
import {
77
cleanupMember,
88
deleteMemberSegments,
9-
findMemberById,
10-
getIdentitiesWithActivity,
11-
moveIdentityActivitiesToNewMember,
129
} from '@crowd/data-access-layer/src/old/apps/entity_merging_worker'
1310
import { figureOutNewOrgId } from '@crowd/data-access-layer/src/old/apps/profiles_worker'
1411
import { prepareMemberAffiliationsUpdate } from '@crowd/data-access-layer/src/old/apps/profiles_worker'
@@ -31,42 +28,6 @@ export async function deleteMember(memberId: string): Promise<void> {
3128
await cleanupMember(svc.postgres.writer, memberId)
3229
}
3330

34-
export async function moveActivitiesWithIdentityToAnotherMember(
35-
fromId: string,
36-
toId: string,
37-
identities: IMemberIdentity[],
38-
tenantId: string,
39-
): Promise<void> {
40-
const memberExists = await findMemberById(svc.postgres.writer, toId, tenantId)
41-
42-
if (!memberExists) {
43-
return
44-
}
45-
46-
const identitiesWithActivity = await getIdentitiesWithActivity(
47-
svc.postgres.writer,
48-
fromId,
49-
tenantId,
50-
identities,
51-
)
52-
53-
for (const identity of identities.filter(
54-
(i) =>
55-
i.type === MemberIdentityType.USERNAME &&
56-
identitiesWithActivity.some((ai) => ai.platform === i.platform && ai.username === i.value),
57-
)) {
58-
await moveIdentityActivitiesToNewMember(
59-
svc.questdbSQL,
60-
svc.queue,
61-
tenantId,
62-
fromId,
63-
toId,
64-
identity.value,
65-
identity.platform,
66-
)
67-
}
68-
}
69-
7031
export async function recalculateActivityAffiliationsOfMemberAsync(
7132
memberId: string,
7233
tenantId: string,
@@ -184,14 +145,17 @@ export async function finishMemberMergingUpdateActivities(memberId: string, newM
184145
const queueClient = svc.queue
185146

186147
const qx = pgpQx(pgDb.connection())
187-
const { orgCases } = await prepareMemberAffiliationsUpdate(qx, memberId)
148+
const { orgCases, fallbackOrganizationId } = await prepareMemberAffiliationsUpdate(qx, memberId)
188149

189150
await updateActivities(
190151
qDb,
152+
pgpQx(svc.postgres.writer.connection()),
191153
queueClient,
192154
[
193155
async () => ({ memberId: newMemberId }),
194-
async (activity) => ({ organizationId: figureOutNewOrgId(activity, orgCases) }),
156+
async (activity) => ({
157+
organizationId: figureOutNewOrgId(activity, orgCases, fallbackOrganizationId),
158+
}),
195159
],
196160
`"memberId" = $(memberId)`,
197161
{
@@ -232,15 +196,16 @@ export async function finishMemberUnmergingUpdateActivities({
232196
const queueClient = svc.queue
233197

234198
const qx = pgpQx(pgDb.connection())
235-
const { orgCases } = await prepareMemberAffiliationsUpdate(qx, memberId)
199+
const { orgCases, fallbackOrganizationId } = await prepareMemberAffiliationsUpdate(qx, memberId)
236200

237201
await updateActivities(
238202
qDb,
203+
pgpQx(svc.postgres.writer.connection()),
239204
queueClient,
240205
[
241206
async (activity) => moveByIdentities({ activity, identities, newMemberId }),
242207
async (activity) => ({
243-
organizationId: figureOutNewOrgId(activity, orgCases),
208+
organizationId: figureOutNewOrgId(activity, orgCases, fallbackOrganizationId),
244209
}),
245210
],
246211
`"memberId" = $(memberId)`,

services/apps/entity_merging_worker/src/activities/organizations.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,14 @@ export async function moveActivitiesBetweenOrgs(
2929
secondaryId: string,
3030
tenantId: string,
3131
): Promise<void> {
32-
await moveActivitiesToNewOrg(svc.questdbSQL, svc.queue, primaryId, secondaryId, tenantId)
32+
await moveActivitiesToNewOrg(
33+
svc.questdbSQL,
34+
svc.postgres.writer.connection(),
35+
svc.queue,
36+
primaryId,
37+
secondaryId,
38+
tenantId,
39+
)
3340
}
3441

3542
export async function recalculateActivityAffiliationsOfOrganizationSynchronous(

services/libs/common/src/timing.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,3 +57,25 @@ export const getLongestDateRange = <T extends IMemberOrganization>(orgs: T[]) =>
5757

5858
return sortedByDateRange[0]
5959
}
60+
61+
export const dateIntersects = (
62+
d1Start?: string | null,
63+
d1End?: string | null,
64+
d2Start?: string | null,
65+
d2End?: string | null,
66+
): boolean => {
67+
// If both periods have no dates at all, we consider they span all time
68+
if ((!d1Start && !d1End) || (!d2Start && !d2End)) {
69+
return true
70+
}
71+
72+
// Convert strings to timestamps, using fallbacks for missing dates
73+
const start1 = d1Start ? new Date(d1Start).getTime() : -Infinity
74+
const end1 = d1End ? new Date(d1End).getTime() : Infinity
75+
const start2 = d2Start ? new Date(d2Start).getTime() : -Infinity
76+
const end2 = d2End ? new Date(d2End).getTime() : Infinity
77+
78+
// Periods intersect if one period's start is before other period's end
79+
// and that same period's end is after the other period's start
80+
return start1 <= end2 && end1 >= start2
81+
}

0 commit comments

Comments
 (0)