diff --git a/apps/api/src/app/activity/usecases/build-workflow-by-volume-chart/build-workflow-by-volume-chart.usecase.ts b/apps/api/src/app/activity/usecases/build-workflow-by-volume-chart/build-workflow-by-volume-chart.usecase.ts index 487d9f8c46d..70548b76f37 100644 --- a/apps/api/src/app/activity/usecases/build-workflow-by-volume-chart/build-workflow-by-volume-chart.usecase.ts +++ b/apps/api/src/app/activity/usecases/build-workflow-by-volume-chart/build-workflow-by-volume-chart.usecase.ts @@ -1,5 +1,13 @@ import { Injectable } from '@nestjs/common'; -import { InstrumentUsecase, PinoLogger, WorkflowRunRepository } from '@novu/application-generic'; +import { + FeatureFlagsService, + InstrumentUsecase, + PinoLogger, + WorkflowRunCountRepository, + WorkflowRunRepository, +} from '@novu/application-generic'; +import { NotificationTemplateRepository } from '@novu/dal'; +import { FeatureFlagsKeysEnum } from '@novu/shared'; import { WorkflowVolumeDataPointDto } from '../../dtos/get-charts.response.dto'; import { BuildWorkflowByVolumeChartCommand } from './build-workflow-by-volume-chart.command'; @@ -7,6 +15,9 @@ import { BuildWorkflowByVolumeChartCommand } from './build-workflow-by-volume-ch export class BuildWorkflowByVolumeChart { constructor( private workflowRunRepository: WorkflowRunRepository, + private workflowRunCountRepository: WorkflowRunCountRepository, + private featureFlagsService: FeatureFlagsService, + private notificationTemplateRepository: NotificationTemplateRepository, private logger: PinoLogger ) { this.logger.setContext(BuildWorkflowByVolumeChart.name); @@ -16,6 +27,66 @@ export class BuildWorkflowByVolumeChart { async execute(command: BuildWorkflowByVolumeChartCommand): Promise { const { environmentId, organizationId, startDate, endDate, workflowIds } = command; + const isWorkflowRunCountEnabled = await this.featureFlagsService.getFlag({ + key: FeatureFlagsKeysEnum.IS_WORKFLOW_RUN_COUNT_ENABLED, + defaultValue: false, + organization: { _id: organizationId }, + environment: { _id: environmentId }, + }); + + if (isWorkflowRunCountEnabled) { + return this.buildChartFromWorkflowRunCount(startDate, endDate, environmentId, organizationId); + } + + return this.buildChartFromWorkflowRuns(startDate, endDate, environmentId, organizationId, workflowIds); + } + + private async buildChartFromWorkflowRunCount( + startDate: Date, + endDate: Date, + environmentId: string, + organizationId: string + ): Promise { + const workflowVolumes = await this.workflowRunCountRepository.getWorkflowVolumeData( + environmentId, + organizationId, + startDate, + endDate + ); + + if (workflowVolumes.length === 0) { + return []; + } + + const triggerIdentifiers = workflowVolumes.map((row) => row.workflow_run_id); + + const templates = await this.notificationTemplateRepository.findByTriggerIdentifierBulk( + environmentId, + triggerIdentifiers, + { select: ['name', 'triggers'] } + ); + + const nameByIdentifier = new Map(); + for (const template of templates) { + const identifier = template.triggers?.[0]?.identifier; + if (identifier) { + nameByIdentifier.set(identifier, template.name); + } + } + + return workflowVolumes.map((row) => ({ + workflowName: nameByIdentifier.get(row.workflow_run_id) ?? row.workflow_run_id, + count: parseInt(row.count, 10), + })); + } + + private async buildChartFromWorkflowRuns( + startDate: Date, + endDate: Date, + environmentId: string, + organizationId: string, + workflowIds?: string[] + ): Promise { const workflowRuns = await this.workflowRunRepository.getWorkflowVolumeData( environmentId, organizationId, @@ -24,11 +95,9 @@ export class BuildWorkflowByVolumeChart { workflowIds ); - const chartData: WorkflowVolumeDataPointDto[] = workflowRuns.map((workflowRun) => ({ + return workflowRuns.map((workflowRun) => ({ workflowName: workflowRun.workflow_name, count: parseInt(workflowRun.count, 10), })); - - return chartData; } } diff --git a/apps/api/src/app/activity/usecases/build-workflow-runs-trend-chart/build-workflow-runs-trend-chart.usecase.ts b/apps/api/src/app/activity/usecases/build-workflow-runs-trend-chart/build-workflow-runs-trend-chart.usecase.ts index afe37253eaa..2a4ca723ea7 100644 --- a/apps/api/src/app/activity/usecases/build-workflow-runs-trend-chart/build-workflow-runs-trend-chart.usecase.ts +++ b/apps/api/src/app/activity/usecases/build-workflow-runs-trend-chart/build-workflow-runs-trend-chart.usecase.ts @@ -3,7 +3,7 @@ import { FeatureFlagsService, InstrumentUsecase, PinoLogger, - TraceLogRepository, + WorkflowRunCountRepository, WorkflowRunRepository, } from '@novu/application-generic'; import { FeatureFlagsKeysEnum } from '@novu/shared'; @@ -14,7 +14,7 @@ import { BuildWorkflowRunsTrendChartCommand } from './build-workflow-runs-trend- export class BuildWorkflowRunsTrendChart { constructor( private workflowRunRepository: WorkflowRunRepository, - private traceLogRepository: TraceLogRepository, + private workflowRunCountRepository: WorkflowRunCountRepository, private featureFlagsService: FeatureFlagsService, private logger: PinoLogger ) { @@ -25,33 +25,31 @@ export class BuildWorkflowRunsTrendChart { async execute(command: BuildWorkflowRunsTrendChartCommand): Promise { const { environmentId, organizationId, startDate, endDate, workflowIds } = command; - const isTraceBasedEnabled = await this.featureFlagsService.getFlag({ - key: FeatureFlagsKeysEnum.IS_WORKFLOW_RUN_TREND_FROM_TRACES_ENABLED, + const isWorkflowRunCountEnabled = await this.featureFlagsService.getFlag({ + key: FeatureFlagsKeysEnum.IS_WORKFLOW_RUN_COUNT_ENABLED, defaultValue: false, organization: { _id: organizationId }, environment: { _id: environmentId }, }); - if (isTraceBasedEnabled) { - return this.buildChartFromTraces(startDate, endDate, environmentId, organizationId, workflowIds); + if (isWorkflowRunCountEnabled) { + return this.buildChartFromWorkflowRunCount(startDate, endDate, environmentId, organizationId); } return this.buildChartFromWorkflowRuns(startDate, endDate, environmentId, organizationId, workflowIds); } - private async buildChartFromTraces( + private async buildChartFromWorkflowRunCount( startDate: Date, endDate: Date, environmentId: string, - organizationId: string, - workflowIds?: string[] + organizationId: string ): Promise { - const workflowRuns = await this.traceLogRepository.getWorkflowRunsTrendData( + const workflowRuns = await this.workflowRunCountRepository.getWorkflowRunsTrendData( environmentId, organizationId, startDate, - endDate, - workflowIds + endDate ); const dataByDate = new Map(); diff --git a/apps/api/src/app/shared/shared.module.ts b/apps/api/src/app/shared/shared.module.ts index a8b04b0bcce..84ceba31c50 100644 --- a/apps/api/src/app/shared/shared.module.ts +++ b/apps/api/src/app/shared/shared.module.ts @@ -26,6 +26,7 @@ import { storageService, TraceLogRepository, TraceRollupRepository, + WorkflowRunCountRepository, WorkflowRunRepository, } from '@novu/application-generic'; import { @@ -126,6 +127,7 @@ const ANALYTICS_PROVIDERS = [ TraceLogRepository, StepRunRepository, WorkflowRunRepository, + WorkflowRunCountRepository, TraceRollupRepository, DeliveryTrendCountsRepository, diff --git a/apps/dashboard/src/components/analytics/charts/workflow-runs-trend-chart.tsx b/apps/dashboard/src/components/analytics/charts/workflow-runs-trend-chart.tsx index 37b70ee445f..18909076904 100644 --- a/apps/dashboard/src/components/analytics/charts/workflow-runs-trend-chart.tsx +++ b/apps/dashboard/src/components/analytics/charts/workflow-runs-trend-chart.tsx @@ -282,7 +282,7 @@ function WorkflowRunsTrendChartInner({ } export function WorkflowRunsTrendChart(props: WorkflowRunsTrendChartProps) { - const isFinalStatusOnly = useFeatureFlag(FeatureFlagsKeysEnum.IS_WORKFLOW_RUN_TREND_FROM_TRACES_ENABLED); + const isFinalStatusOnly = useFeatureFlag(FeatureFlagsKeysEnum.IS_WORKFLOW_RUN_COUNT_ENABLED); return ; } diff --git a/libs/application-generic/src/services/analytic-logs/workflow-run-count/workflow-run-count.repository.ts b/libs/application-generic/src/services/analytic-logs/workflow-run-count/workflow-run-count.repository.ts index f2fdab3a211..aec6b3f35f0 100644 --- a/libs/application-generic/src/services/analytic-logs/workflow-run-count/workflow-run-count.repository.ts +++ b/libs/application-generic/src/services/analytic-logs/workflow-run-count/workflow-run-count.repository.ts @@ -210,4 +210,87 @@ export class WorkflowRunCountRepository extends LogRepository row.organization_id); } + + async getWorkflowVolumeData( + environmentId: string, + organizationId: string, + startDate: Date, + endDate: Date, + limit: number = 5 + ): Promise> { + const query = ` + SELECT + workflow_run_id, + sum(count) as count + FROM ${WORKFLOW_RUN_COUNT_TABLE_NAME} + WHERE + environment_id = {environmentId:String} + AND organization_id = {organizationId:String} + AND date >= {startDate:Date} + AND date <= {endDate:Date} + AND event_type = 'workflow_run_status_processing' + GROUP BY workflow_run_id + ORDER BY count DESC + LIMIT {limit:UInt32} + `; + + const params: Record = { + environmentId, + organizationId, + startDate: startDate.toISOString().split('T')[0], + endDate: endDate.toISOString().split('T')[0], + limit, + }; + + const result = await this.clickhouseService.query<{ + workflow_run_id: string; + count: string; + }>({ + query, + params, + }); + + return result.data; + } + + async getWorkflowRunsTrendData( + environmentId: string, + organizationId: string, + startDate: Date, + endDate: Date + ): Promise> { + const query = ` + SELECT + date, + event_type, + sum(count) as count + FROM ${WORKFLOW_RUN_COUNT_TABLE_NAME} + WHERE + environment_id = {environmentId:String} + AND organization_id = {organizationId:String} + AND date >= {startDate:Date} + AND date <= {endDate:Date} + AND event_type IN ('workflow_run_status_processing', 'workflow_run_status_completed', 'workflow_run_status_error') + GROUP BY date, event_type + ORDER BY date, event_type + `; + + const params: Record = { + environmentId, + organizationId, + startDate: startDate.toISOString().split('T')[0], + endDate: endDate.toISOString().split('T')[0], + }; + + const result = await this.clickhouseService.query<{ + date: string; + event_type: string; + count: string; + }>({ + query, + params, + }); + + return result.data; + } } diff --git a/libs/dal/src/repositories/notification-template/notification-template.repository.ts b/libs/dal/src/repositories/notification-template/notification-template.repository.ts index 612d4ac826b..8eaf907143e 100644 --- a/libs/dal/src/repositories/notification-template/notification-template.repository.ts +++ b/libs/dal/src/repositories/notification-template/notification-template.repository.ts @@ -63,17 +63,38 @@ export class NotificationTemplateRepository extends BaseRepository< return this.mapEntities(items); } - async findByTriggerIdentifierBulk(environmentId: string, identifiers: string[], session?: ClientSession | null) { + async findByTriggerIdentifierBulk( + environmentId: string, + identifiers: string[], + options?: { session?: ClientSession | null } + ): Promise; + + async findByTriggerIdentifierBulk( + environmentId: string, + identifiers: string[], + options: { session?: ClientSession | null; select: K[] } + ): Promise[]>; + + async findByTriggerIdentifierBulk( + environmentId: string, + identifiers: string[], + options: { session?: ClientSession | null; select?: K[] } = {} + ): Promise[]> { + const { session, select } = options; + const requestQuery: NotificationTemplateQuery = { _environmentId: environmentId, 'triggers.identifier': { $in: identifiers }, }; - const query = this.MongooseModel.find(requestQuery, undefined, { session }).populate('steps.template'); + const projection = select ? Object.fromEntries(select.map((field) => [field, 1])) : undefined; + + const baseQuery = this.MongooseModel.find(requestQuery, projection, { session }); + const query = !select || select.includes('steps' as K) ? baseQuery.populate('steps.template') : baseQuery; const items = await query; - return this.mapEntities(items); + return this.mapEntities(items) as NotificationTemplateEntity[] | Pick[]; } async findByTriggerIdentifier( diff --git a/packages/shared/src/types/feature-flags.ts b/packages/shared/src/types/feature-flags.ts index 15ee13f3919..67cf5eb47b6 100644 --- a/packages/shared/src/types/feature-flags.ts +++ b/packages/shared/src/types/feature-flags.ts @@ -60,7 +60,7 @@ export enum FeatureFlagsKeysEnum { IS_WORKFLOW_RUN_LOGS_READ_ENABLED = 'IS_WORKFLOW_RUN_LOGS_READ_ENABLED', IS_WORKFLOW_RUN_TRACES_WRITE_ENABLED = 'IS_WORKFLOW_RUN_TRACES_WRITE_ENABLED', IS_WORKFLOW_RUN_PAGE_MIGRATION_ENABLED = 'IS_WORKFLOW_RUN_PAGE_MIGRATION_ENABLED', - IS_WORKFLOW_RUN_TREND_FROM_TRACES_ENABLED = 'IS_WORKFLOW_RUN_TREND_FROM_TRACES_ENABLED', + IS_WORKFLOW_RUN_COUNT_ENABLED = 'IS_WORKFLOW_RUN_COUNT_ENABLED', IS_DELIVERY_LIFECYCLE_TRANSITION_ENABLED = 'IS_DELIVERY_LIFECYCLE_TRANSITION_ENABLED', IS_EXECUTION_DETAILS_CLICKHOUSE_ONLY_ENABLED = 'IS_EXECUTION_DETAILS_CLICKHOUSE_ONLY_ENABLED', IS_GET_PREFERENCES_DISABLED = 'IS_GET_PREFERENCES_DISABLED',