Skip to content
Original file line number Diff line number Diff line change
@@ -1,12 +1,23 @@
import { Injectable } from '@nestjs/common';
import { InstrumentUsecase, PinoLogger, WorkflowRunRepository } from '@novu/application-generic';
import {
FeatureFlagsService,
InstrumentUsecase,
PinoLogger,
WorkflowRunCountRepository,
WorkflowRunRepository,
} from '@novu/application-generic';
import { NotificationTemplateRepository } from '@novu/dal';
import { FeatureFlagsKeysEnum } from '@novu/shared';
import { WorkflowVolumeDataPointDto } from '../../dtos/get-charts.response.dto';
import { BuildWorkflowByVolumeChartCommand } from './build-workflow-by-volume-chart.command';

@Injectable()
export class BuildWorkflowByVolumeChart {
constructor(
private workflowRunRepository: WorkflowRunRepository,
private workflowRunCountRepository: WorkflowRunCountRepository,
private featureFlagsService: FeatureFlagsService,
private notificationTemplateRepository: NotificationTemplateRepository,
private logger: PinoLogger
) {
this.logger.setContext(BuildWorkflowByVolumeChart.name);
Expand All @@ -16,6 +27,66 @@ export class BuildWorkflowByVolumeChart {
async execute(command: BuildWorkflowByVolumeChartCommand): Promise<WorkflowVolumeDataPointDto[]> {
const { environmentId, organizationId, startDate, endDate, workflowIds } = command;

const isWorkflowRunCountEnabled = await this.featureFlagsService.getFlag({
key: FeatureFlagsKeysEnum.IS_WORKFLOW_RUN_COUNT_ENABLED,
defaultValue: false,
organization: { _id: organizationId },
environment: { _id: environmentId },
});

if (isWorkflowRunCountEnabled) {
return this.buildChartFromWorkflowRunCount(startDate, endDate, environmentId, organizationId);
}

return this.buildChartFromWorkflowRuns(startDate, endDate, environmentId, organizationId, workflowIds);
}

private async buildChartFromWorkflowRunCount(
startDate: Date,
endDate: Date,
environmentId: string,
organizationId: string
): Promise<WorkflowVolumeDataPointDto[]> {
const workflowVolumes = await this.workflowRunCountRepository.getWorkflowVolumeData(
environmentId,
organizationId,
startDate,
endDate
);

if (workflowVolumes.length === 0) {
return [];
}

const triggerIdentifiers = workflowVolumes.map((row) => row.workflow_run_id);

const templates = await this.notificationTemplateRepository.findByTriggerIdentifierBulk(
environmentId,
triggerIdentifiers,
{ select: ['name', 'triggers'] }
);

const nameByIdentifier = new Map<string, string>();
for (const template of templates) {
const identifier = template.triggers?.[0]?.identifier;
if (identifier) {
nameByIdentifier.set(identifier, template.name);
}
Comment on lines +70 to +74
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Template name mapping assumes the matching trigger is always first

Line 71 only reads template.triggers?.[0]?.identifier. If the matched identifier is in another trigger entry, the chart falls back to raw workflow_run_id even though a template name exists.

Suggested fix
-const nameByIdentifier = new Map<string, string>();
+const nameByIdentifier = new Map<string, string>();
+const requestedIdentifierSet = new Set(triggerIdentifiers);
 for (const template of templates) {
-  const identifier = template.triggers?.[0]?.identifier;
-  if (identifier) {
-    nameByIdentifier.set(identifier, template.name);
+  for (const trigger of template.triggers ?? []) {
+    if (trigger?.identifier && requestedIdentifierSet.has(trigger.identifier)) {
+      nameByIdentifier.set(trigger.identifier, template.name);
+    }
   }
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@apps/api/src/app/activity/usecases/build-workflow-by-volume-chart/build-workflow-by-volume-chart.usecase.ts`
around lines 70 - 74, The current mapping only reads
template.triggers?.[0]?.identifier so it misses identifiers not at index 0;
update the logic in the loop over templates to iterate all trigger entries
(e.g., for each template in templates, loop through template.triggers) and call
nameByIdentifier.set(identifier, template.name) for each trigger.identifier
found so every trigger identifier is mapped to the template name (use the
existing templates, template.triggers and nameByIdentifier symbols to locate and
fix the code).

}

return workflowVolumes.map((row) => ({
workflowName: nameByIdentifier.get(row.workflow_run_id) ?? row.workflow_run_id,
count: parseInt(row.count, 10),
}));
}

private async buildChartFromWorkflowRuns(
startDate: Date,
endDate: Date,
environmentId: string,
organizationId: string,
workflowIds?: string[]
): Promise<WorkflowVolumeDataPointDto[]> {
const workflowRuns = await this.workflowRunRepository.getWorkflowVolumeData(
environmentId,
organizationId,
Expand All @@ -24,11 +95,9 @@ export class BuildWorkflowByVolumeChart {
workflowIds
);

const chartData: WorkflowVolumeDataPointDto[] = workflowRuns.map((workflowRun) => ({
return workflowRuns.map((workflowRun) => ({
workflowName: workflowRun.workflow_name,
count: parseInt(workflowRun.count, 10),
}));

return chartData;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {
FeatureFlagsService,
InstrumentUsecase,
PinoLogger,
TraceLogRepository,
WorkflowRunCountRepository,
WorkflowRunRepository,
} from '@novu/application-generic';
import { FeatureFlagsKeysEnum } from '@novu/shared';
Expand All @@ -14,7 +14,7 @@ import { BuildWorkflowRunsTrendChartCommand } from './build-workflow-runs-trend-
export class BuildWorkflowRunsTrendChart {
constructor(
private workflowRunRepository: WorkflowRunRepository,
private traceLogRepository: TraceLogRepository,
private workflowRunCountRepository: WorkflowRunCountRepository,
private featureFlagsService: FeatureFlagsService,
private logger: PinoLogger
) {
Expand All @@ -25,33 +25,31 @@ export class BuildWorkflowRunsTrendChart {
async execute(command: BuildWorkflowRunsTrendChartCommand): Promise<WorkflowRunsTrendDataPointDto[]> {
const { environmentId, organizationId, startDate, endDate, workflowIds } = command;

const isTraceBasedEnabled = await this.featureFlagsService.getFlag({
key: FeatureFlagsKeysEnum.IS_WORKFLOW_RUN_TREND_FROM_TRACES_ENABLED,
const isWorkflowRunCountEnabled = await this.featureFlagsService.getFlag({
key: FeatureFlagsKeysEnum.IS_WORKFLOW_RUN_COUNT_ENABLED,
defaultValue: false,
organization: { _id: organizationId },
environment: { _id: environmentId },
});

if (isTraceBasedEnabled) {
return this.buildChartFromTraces(startDate, endDate, environmentId, organizationId, workflowIds);
if (isWorkflowRunCountEnabled) {
return this.buildChartFromWorkflowRunCount(startDate, endDate, environmentId, organizationId);
}

return this.buildChartFromWorkflowRuns(startDate, endDate, environmentId, organizationId, workflowIds);
}

private async buildChartFromTraces(
private async buildChartFromWorkflowRunCount(
startDate: Date,
endDate: Date,
environmentId: string,
organizationId: string,
workflowIds?: string[]
organizationId: string
): Promise<WorkflowRunsTrendDataPointDto[]> {
const workflowRuns = await this.traceLogRepository.getWorkflowRunsTrendData(
const workflowRuns = await this.workflowRunCountRepository.getWorkflowRunsTrendData(
environmentId,
organizationId,
startDate,
endDate,
workflowIds
endDate
);

const dataByDate = new Map<string, WorkflowRunsTrendDataPointDto>();
Expand Down
2 changes: 2 additions & 0 deletions apps/api/src/app/shared/shared.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import {
storageService,
TraceLogRepository,
TraceRollupRepository,
WorkflowRunCountRepository,
WorkflowRunRepository,
} from '@novu/application-generic';
import {
Expand Down Expand Up @@ -123,6 +124,7 @@ const ANALYTICS_PROVIDERS = [
TraceLogRepository,
StepRunRepository,
WorkflowRunRepository,
WorkflowRunCountRepository,
TraceRollupRepository,
DeliveryTrendCountsRepository,

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,9 @@ function FinalStatusWorkflowRunsTrendChart({ data, isLoading, error }: WorkflowR
}

export function WorkflowRunsTrendChart(props: WorkflowRunsTrendChartProps) {
const isFinalStatusOnly = useFeatureFlag(FeatureFlagsKeysEnum.IS_WORKFLOW_RUN_TREND_FROM_TRACES_ENABLED);
const isWorkflowRunCountEnabled = useFeatureFlag(FeatureFlagsKeysEnum.IS_WORKFLOW_RUN_COUNT_ENABLED);

if (isFinalStatusOnly) {
if (isWorkflowRunCountEnabled) {
return <FinalStatusWorkflowRunsTrendChart {...props} />;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,4 +217,87 @@ export class WorkflowRunCountRepository extends LogRepository<typeof workflowRun

return result.data.map((row) => row.organization_id);
}

async getWorkflowVolumeData(
environmentId: string,
organizationId: string,
startDate: Date,
endDate: Date,
limit: number = 5
): Promise<Array<{ workflow_run_id: string; count: string }>> {
const query = `
SELECT
workflow_run_id,
sum(count) as count
FROM ${WORKFLOW_RUN_COUNT_TABLE_NAME}
WHERE
environment_id = {environmentId:String}
AND organization_id = {organizationId:String}
AND date >= {startDate:Date}
AND date <= {endDate:Date}
AND event_type = 'workflow_run_status_processing'
GROUP BY workflow_run_id
ORDER BY count DESC
LIMIT {limit:UInt32}
`;

const params: Record<string, unknown> = {
environmentId,
organizationId,
startDate: startDate.toISOString().split('T')[0],
endDate: endDate.toISOString().split('T')[0],
limit,
};

const result = await this.clickhouseService.query<{
workflow_run_id: string;
count: string;
}>({
query,
params,
});

return result.data;
}

async getWorkflowRunsTrendData(
environmentId: string,
organizationId: string,
startDate: Date,
endDate: Date
): Promise<Array<{ date: string; event_type: string; count: string }>> {
const query = `
SELECT
date,
event_type,
sum(count) as count
FROM ${WORKFLOW_RUN_COUNT_TABLE_NAME}
WHERE
environment_id = {environmentId:String}
AND organization_id = {organizationId:String}
AND date >= {startDate:Date}
AND date <= {endDate:Date}
AND event_type IN ('workflow_run_status_processing', 'workflow_run_status_completed', 'workflow_run_status_error')
GROUP BY date, event_type
ORDER BY date, event_type
`;

const params: Record<string, unknown> = {
environmentId,
organizationId,
startDate: startDate.toISOString().split('T')[0],
endDate: endDate.toISOString().split('T')[0],
};

const result = await this.clickhouseService.query<{
date: string;
event_type: string;
count: string;
}>({
query,
params,
});

return result.data;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,17 +63,38 @@ export class NotificationTemplateRepository extends BaseRepository<
return this.mapEntities(items);
}

async findByTriggerIdentifierBulk(environmentId: string, identifiers: string[], session?: ClientSession | null) {
async findByTriggerIdentifierBulk(
environmentId: string,
identifiers: string[],
options?: { session?: ClientSession | null }
): Promise<NotificationTemplateEntity[]>;

async findByTriggerIdentifierBulk<K extends keyof NotificationTemplateEntity>(
environmentId: string,
identifiers: string[],
options: { session?: ClientSession | null; select: K[] }
): Promise<Pick<NotificationTemplateEntity, K>[]>;

async findByTriggerIdentifierBulk<K extends keyof NotificationTemplateEntity>(
environmentId: string,
identifiers: string[],
options: { session?: ClientSession | null; select?: K[] } = {}
): Promise<NotificationTemplateEntity[] | Pick<NotificationTemplateEntity, K>[]> {
const { session, select } = options;

const requestQuery: NotificationTemplateQuery = {
_environmentId: environmentId,
'triggers.identifier': { $in: identifiers },
};

const query = this.MongooseModel.find(requestQuery, undefined, { session }).populate('steps.template');
const projection = select ? Object.fromEntries(select.map((field) => [field, 1])) : undefined;

const baseQuery = this.MongooseModel.find(requestQuery, projection, { session });
const query = !select || select.includes('steps' as K) ? baseQuery.populate('steps.template') : baseQuery;

const items = await query;

return this.mapEntities(items);
return this.mapEntities(items) as NotificationTemplateEntity[] | Pick<NotificationTemplateEntity, K>[];
}

async findByTriggerIdentifier(
Expand Down
2 changes: 1 addition & 1 deletion packages/shared/src/types/feature-flags.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ export enum FeatureFlagsKeysEnum {
IS_WORKFLOW_RUN_LOGS_READ_ENABLED = 'IS_WORKFLOW_RUN_LOGS_READ_ENABLED',
IS_WORKFLOW_RUN_TRACES_WRITE_ENABLED = 'IS_WORKFLOW_RUN_TRACES_WRITE_ENABLED',
IS_WORKFLOW_RUN_PAGE_MIGRATION_ENABLED = 'IS_WORKFLOW_RUN_PAGE_MIGRATION_ENABLED',
IS_WORKFLOW_RUN_TREND_FROM_TRACES_ENABLED = 'IS_WORKFLOW_RUN_TREND_FROM_TRACES_ENABLED',
IS_WORKFLOW_RUN_COUNT_ENABLED = 'IS_WORKFLOW_RUN_COUNT_ENABLED',
IS_DELIVERY_LIFECYCLE_TRANSITION_ENABLED = 'IS_DELIVERY_LIFECYCLE_TRANSITION_ENABLED',
IS_EXECUTION_DETAILS_CLICKHOUSE_ONLY_ENABLED = 'IS_EXECUTION_DETAILS_CLICKHOUSE_ONLY_ENABLED',
IS_SUBSCRIBERS_SCHEDULE_ENABLED = 'IS_SUBSCRIBERS_SCHEDULE_ENABLED',
Expand Down
Loading