Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sync all script optimizations #2244

Merged
merged 14 commits into from
Mar 1, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
drop table indexed_entities;
11 changes: 11 additions & 0 deletions backend/src/database/migrations/V1709228609__indexed-entities.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
create table if not exists indexed_entities (
type varchar(255) not null,
entity_id uuid not null,
tenant_id uuid not null,
indexed_at timestamptz not null default now(),

primary key (type, entity_id)
);

create index if not exists ix_indexed_entities_tenant on indexed_entities (tenant_id);
create index if not exists ix_indexed_entities_type on indexed_entities (type);
47 changes: 14 additions & 33 deletions services/libs/opensearch/src/repo/activity.repo.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { DbStore, RepositoryBase } from '@crowd/database'
import { Logger } from '@crowd/logging'
import { IDbActivitySyncData } from './activity.data'
import { IndexedEntityType } from './indexing.data'

export class ActivityRepository extends RepositoryBase<ActivityRepository> {
constructor(dbStore: DbStore, parentLog: Logger) {
Expand Down Expand Up @@ -57,40 +58,20 @@ export class ActivityRepository extends RepositoryBase<ActivityRepository> {
return results.map((r) => r.id)
}

public async getTenantActivitiesForSync(
tenantId: string,
perPage: number,
lastId?: string,
): Promise<string[]> {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
let results: any[]

if (lastId) {
results = await this.db().any(
`
select id from activities
where "tenantId" = $(tenantId) and "deletedAt" is null and id > $(lastId)
order by id
limit ${perPage};
`,
{
tenantId,
lastId,
},
)
} else {
results = await this.db().any(
`
select id from activities
where "tenantId" = $(tenantId) and "deletedAt" is null
order by id
limit ${perPage};
public async getTenantActivitiesForSync(tenantId: string, perPage: number): Promise<string[]> {
const results = await this.db().any(
`
select id from activities a
left join indexed_entities ie on
a.id = ie.entity_id and ie.type = $(type)
where a."tenantId" = $(tenantId) and a."deletedAt" is null and ie.entity_id is null
limit ${perPage}
`,
{
tenantId,
},
)
}
{
tenantId,
type: IndexedEntityType.ACTIVITY,
},
)

return results.map((r) => r.id)
}
Expand Down
10 changes: 10 additions & 0 deletions services/libs/opensearch/src/repo/indexing.data.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
export interface IEntityData {
id: string
tenantId: string
}

export enum IndexedEntityType {
ACTIVITY = 'activity',
MEMBER = 'member',
ORGANIZATION = 'organization',
}
22 changes: 22 additions & 0 deletions services/libs/opensearch/src/repo/indexing.repo.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { DbStore, RepositoryBase } from '@crowd/database'
import { Logger } from '@crowd/logging'
import { IEntityData, IndexedEntityType } from './indexing.data'

export class IndexingRepository extends RepositoryBase<IndexingRepository> {
constructor(dbStore: DbStore, parentLog: Logger) {
super(dbStore, parentLog)
}

public async markEntitiesIndexed(type: IndexedEntityType, data: IEntityData[]): Promise<void> {
if (data.length > 0) {
const values = data.map((d) => `('${type}', '${d.id}', '${d.tenantId}')`)
const query = `
insert into indexed_entities(type, entity_id, tenant_id)
values ${values.join(',\n')}
on conflict (type, entity_id)
do update set indexed_at = now()
`
await this.db().none(query)
}
}
}
Loading
Loading