Skip to content
Original file line number Diff line number Diff line change
@@ -1,12 +1,23 @@
import { Injectable } from '@nestjs/common';
import { InstrumentUsecase, PinoLogger, WorkflowRunRepository } from '@novu/application-generic';
import {
FeatureFlagsService,
InstrumentUsecase,
PinoLogger,
WorkflowRunCountRepository,
WorkflowRunRepository,
} from '@novu/application-generic';
import { NotificationTemplateRepository } from '@novu/dal';
import { FeatureFlagsKeysEnum } from '@novu/shared';
import { WorkflowVolumeDataPointDto } from '../../dtos/get-charts.response.dto';
import { BuildWorkflowByVolumeChartCommand } from './build-workflow-by-volume-chart.command';

@Injectable()
export class BuildWorkflowByVolumeChart {
constructor(
private workflowRunRepository: WorkflowRunRepository,
private workflowRunCountRepository: WorkflowRunCountRepository,
private featureFlagsService: FeatureFlagsService,
private notificationTemplateRepository: NotificationTemplateRepository,
private logger: PinoLogger
) {
this.logger.setContext(BuildWorkflowByVolumeChart.name);
Expand All @@ -16,6 +27,66 @@ export class BuildWorkflowByVolumeChart {
async execute(command: BuildWorkflowByVolumeChartCommand): Promise<WorkflowVolumeDataPointDto[]> {
const { environmentId, organizationId, startDate, endDate, workflowIds } = command;

const isRollupEnabled = await this.featureFlagsService.getFlag({
key: FeatureFlagsKeysEnum.IS_WORKFLOW_RUN_TREND_FROM_ROLLUP_ENABLED,
defaultValue: false,
organization: { _id: organizationId },
environment: { _id: environmentId },
});

if (isRollupEnabled) {
return this.buildChartFromWorkflowRunCount(startDate, endDate, environmentId, organizationId);
}

return this.buildChartFromWorkflowRuns(startDate, endDate, environmentId, organizationId, workflowIds);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Same observation as the trend chart: workflowIds is silently ignored when rollup is enabled.

When the rollup flag is on, buildChartFromWorkflowRunCount ignores the workflowIds parameter. This is the same pattern as in BuildWorkflowRunsTrendChart. If this is expected, consider at minimum adding a log when workflowIds is non-empty but unused.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@apps/api/src/app/activity/usecases/build-workflow-by-volume-chart/build-workflow-by-volume-chart.usecase.ts`
around lines 30 - 41, The code silently ignores workflowIds when the rollup
feature flag is enabled; update the call-site and implementation so workflowIds
are not dropped: change buildChartFromWorkflowRunCount to accept an optional
workflowIds parameter and apply that filter inside its logic (or if filtering by
workflow is not supported, at minimum add a clear log via processLogger.warn
inside build-workflow-by-volume-chart.usecase.ts before returning to state that
non-empty workflowIds are being ignored when isRollupEnabled is true),
referencing the isRollupEnabled check and the functions
buildChartFromWorkflowRunCount and buildChartFromWorkflowRuns so the behavior is
explicit.

}

private async buildChartFromWorkflowRunCount(
startDate: Date,
endDate: Date,
environmentId: string,
organizationId: string
): Promise<WorkflowVolumeDataPointDto[]> {
const workflowVolumes = await this.workflowRunCountRepository.getWorkflowVolumeData(
environmentId,
organizationId,
startDate,
endDate
);

if (workflowVolumes.length === 0) {
return [];
}

const triggerIdentifiers = workflowVolumes.map((row) => row.workflow_run_id);

const templates = await this.notificationTemplateRepository.findByTriggerIdentifierBulk(
environmentId,
triggerIdentifiers,
{ select: ['name', 'triggers'] }
);

const nameByIdentifier = new Map<string, string>();
for (const template of templates) {
const identifier = template.triggers?.[0]?.identifier;
if (identifier) {
nameByIdentifier.set(identifier, template.name);
}
Comment on lines +70 to +74
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Template name mapping assumes the matching trigger is always first

Line 71 only reads template.triggers?.[0]?.identifier. If the matched identifier is in another trigger entry, the chart falls back to raw workflow_run_id even though a template name exists.

Suggested fix
-const nameByIdentifier = new Map<string, string>();
+const nameByIdentifier = new Map<string, string>();
+const requestedIdentifierSet = new Set(triggerIdentifiers);
 for (const template of templates) {
-  const identifier = template.triggers?.[0]?.identifier;
-  if (identifier) {
-    nameByIdentifier.set(identifier, template.name);
+  for (const trigger of template.triggers ?? []) {
+    if (trigger?.identifier && requestedIdentifierSet.has(trigger.identifier)) {
+      nameByIdentifier.set(trigger.identifier, template.name);
+    }
   }
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@apps/api/src/app/activity/usecases/build-workflow-by-volume-chart/build-workflow-by-volume-chart.usecase.ts`
around lines 70 - 74, The current mapping only reads
template.triggers?.[0]?.identifier so it misses identifiers not at index 0;
update the logic in the loop over templates to iterate all trigger entries
(e.g., for each template in templates, loop through template.triggers) and call
nameByIdentifier.set(identifier, template.name) for each trigger.identifier
found so every trigger identifier is mapped to the template name (use the
existing templates, template.triggers and nameByIdentifier symbols to locate and
fix the code).

}

return workflowVolumes.map((row) => ({
workflowName: nameByIdentifier.get(row.workflow_run_id) ?? row.workflow_run_id,
count: parseInt(row.count, 10),
}));
}

private async buildChartFromWorkflowRuns(
startDate: Date,
endDate: Date,
environmentId: string,
organizationId: string,
workflowIds?: string[]
): Promise<WorkflowVolumeDataPointDto[]> {
const workflowRuns = await this.workflowRunRepository.getWorkflowVolumeData(
environmentId,
organizationId,
Expand All @@ -24,11 +95,9 @@ export class BuildWorkflowByVolumeChart {
workflowIds
);

const chartData: WorkflowVolumeDataPointDto[] = workflowRuns.map((workflowRun) => ({
return workflowRuns.map((workflowRun) => ({
workflowName: workflowRun.workflow_name,
count: parseInt(workflowRun.count, 10),
}));

return chartData;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {
FeatureFlagsService,
InstrumentUsecase,
PinoLogger,
TraceLogRepository,
WorkflowRunCountRepository,
WorkflowRunRepository,
} from '@novu/application-generic';
import { FeatureFlagsKeysEnum } from '@novu/shared';
Expand All @@ -14,7 +14,7 @@ import { BuildWorkflowRunsTrendChartCommand } from './build-workflow-runs-trend-
export class BuildWorkflowRunsTrendChart {
constructor(
private workflowRunRepository: WorkflowRunRepository,
private traceLogRepository: TraceLogRepository,
private workflowRunCountRepository: WorkflowRunCountRepository,
private featureFlagsService: FeatureFlagsService,
private logger: PinoLogger
) {
Expand All @@ -25,33 +25,31 @@ export class BuildWorkflowRunsTrendChart {
async execute(command: BuildWorkflowRunsTrendChartCommand): Promise<WorkflowRunsTrendDataPointDto[]> {
const { environmentId, organizationId, startDate, endDate, workflowIds } = command;

const isTraceBasedEnabled = await this.featureFlagsService.getFlag({
key: FeatureFlagsKeysEnum.IS_WORKFLOW_RUN_TREND_FROM_TRACES_ENABLED,
const isRollupEnabled = await this.featureFlagsService.getFlag({
key: FeatureFlagsKeysEnum.IS_WORKFLOW_RUN_TREND_FROM_ROLLUP_ENABLED,
defaultValue: false,
organization: { _id: organizationId },
environment: { _id: environmentId },
});

if (isTraceBasedEnabled) {
return this.buildChartFromTraces(startDate, endDate, environmentId, organizationId, workflowIds);
if (isRollupEnabled) {
return this.buildChartFromWorkflowRunCount(startDate, endDate, environmentId, organizationId);
}

return this.buildChartFromWorkflowRuns(startDate, endDate, environmentId, organizationId, workflowIds);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Note: workflowIds filter is silently ignored when rollup is enabled.

When isRollupEnabled is true, buildChartFromWorkflowRunCount doesn't accept or use the workflowIds parameter from the command, meaning any workflow-specific filtering the caller requested is silently dropped. If this is intentional (rollup table doesn't support per-workflow filtering), consider logging a warning when workflowIds is provided but ignored, so callers aren't surprised by unfiltered results.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@apps/api/src/app/activity/usecases/build-workflow-runs-trend-chart/build-workflow-runs-trend-chart.usecase.ts`
around lines 35 - 39, When isRollupEnabled is true the code calls
buildChartFromWorkflowRunCount and silently drops the workflowIds filter; update
the branch that currently returns buildChartFromWorkflowRunCount(startDate,
endDate, environmentId, organizationId) to check if workflowIds is non-empty and
emit a warning via the use case's logger (e.g., this.logger.warn or existing
logging utility) stating that workflowIds are ignored for rollup mode, or
alternatively make buildChartFromWorkflowRunCount accept and apply workflowIds
if the rollup data supports it; reference the isRollupEnabled flag and the
buildChartFromWorkflowRunCount / buildChartFromWorkflowRuns call sites when
implementing the change.

}

private async buildChartFromTraces(
private async buildChartFromWorkflowRunCount(
startDate: Date,
endDate: Date,
environmentId: string,
organizationId: string,
workflowIds?: string[]
organizationId: string
): Promise<WorkflowRunsTrendDataPointDto[]> {
const workflowRuns = await this.traceLogRepository.getWorkflowRunsTrendData(
const workflowRuns = await this.workflowRunCountRepository.getWorkflowRunsTrendData(
environmentId,
organizationId,
startDate,
endDate,
workflowIds
endDate
);

const dataByDate = new Map<string, WorkflowRunsTrendDataPointDto>();
Expand Down
2 changes: 2 additions & 0 deletions apps/api/src/app/shared/shared.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import {
storageService,
TraceLogRepository,
TraceRollupRepository,
WorkflowRunCountRepository,
WorkflowRunRepository,
} from '@novu/application-generic';
import {
Expand Down Expand Up @@ -123,6 +124,7 @@ const ANALYTICS_PROVIDERS = [
TraceLogRepository,
StepRunRepository,
WorkflowRunRepository,
WorkflowRunCountRepository,
TraceRollupRepository,
DeliveryTrendCountsRepository,

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ function FinalStatusWorkflowRunsTrendChart({ data, isLoading, error }: WorkflowR
}

export function WorkflowRunsTrendChart(props: WorkflowRunsTrendChartProps) {
const isFinalStatusOnly = useFeatureFlag(FeatureFlagsKeysEnum.IS_WORKFLOW_RUN_TREND_FROM_TRACES_ENABLED);
const isFinalStatusOnly = useFeatureFlag(FeatureFlagsKeysEnum.IS_WORKFLOW_RUN_TREND_FROM_ROLLUP_ENABLED);

if (isFinalStatusOnly) {
return <FinalStatusWorkflowRunsTrendChart {...props} />;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,4 +217,87 @@ export class WorkflowRunCountRepository extends LogRepository<typeof workflowRun

return result.data.map((row) => row.organization_id);
}

async getWorkflowVolumeData(
environmentId: string,
organizationId: string,
startDate: Date,
endDate: Date,
limit: number = 5
): Promise<Array<{ workflow_run_id: string; count: string }>> {
const query = `
SELECT
workflow_run_id,
sum(count) as count
FROM ${WORKFLOW_RUN_COUNT_TABLE_NAME}
WHERE
environment_id = {environmentId:String}
AND organization_id = {organizationId:String}
AND date >= {startDate:Date}
AND date <= {endDate:Date}
AND event_type = 'workflow_run_status_processing'
GROUP BY workflow_run_id
ORDER BY count DESC
LIMIT {limit:UInt32}
`;

const params: Record<string, unknown> = {
environmentId,
organizationId,
startDate: startDate.toISOString().split('T')[0],
endDate: endDate.toISOString().split('T')[0],
limit,
};

const result = await this.clickhouseService.query<{
workflow_run_id: string;
count: string;
}>({
query,
params,
});

return result.data;
}

async getWorkflowRunsTrendData(
environmentId: string,
organizationId: string,
startDate: Date,
endDate: Date
): Promise<Array<{ date: string; event_type: string; count: string }>> {
const query = `
SELECT
date,
event_type,
sum(count) as count
FROM ${WORKFLOW_RUN_COUNT_TABLE_NAME}
WHERE
environment_id = {environmentId:String}
AND organization_id = {organizationId:String}
AND date >= {startDate:Date}
AND date <= {endDate:Date}
AND event_type IN ('workflow_run_status_processing', 'workflow_run_status_completed', 'workflow_run_status_error')
GROUP BY date, event_type
ORDER BY date, event_type
`;

const params: Record<string, unknown> = {
environmentId,
organizationId,
startDate: startDate.toISOString().split('T')[0],
endDate: endDate.toISOString().split('T')[0],
};

const result = await this.clickhouseService.query<{
date: string;
event_type: string;
count: string;
}>({
query,
params,
});

return result.data;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,17 +63,38 @@ export class NotificationTemplateRepository extends BaseRepository<
return this.mapEntities(items);
}

async findByTriggerIdentifierBulk(environmentId: string, identifiers: string[], session?: ClientSession | null) {
async findByTriggerIdentifierBulk(
environmentId: string,
identifiers: string[],
options?: { session?: ClientSession | null }
): Promise<NotificationTemplateEntity[]>;

async findByTriggerIdentifierBulk<K extends keyof NotificationTemplateEntity>(
environmentId: string,
identifiers: string[],
options: { session?: ClientSession | null; select: K[] }
): Promise<Pick<NotificationTemplateEntity, K>[]>;

async findByTriggerIdentifierBulk<K extends keyof NotificationTemplateEntity>(
environmentId: string,
identifiers: string[],
options: { session?: ClientSession | null; select?: K[] } = {}
): Promise<NotificationTemplateEntity[] | Pick<NotificationTemplateEntity, K>[]> {
const { session, select } = options;

const requestQuery: NotificationTemplateQuery = {
_environmentId: environmentId,
'triggers.identifier': { $in: identifiers },
};

const query = this.MongooseModel.find(requestQuery, undefined, { session }).populate('steps.template');
const projection = select ? Object.fromEntries(select.map((field) => [field, 1])) : undefined;

const baseQuery = this.MongooseModel.find(requestQuery, projection, { session });
const query = !select || select.includes('steps' as K) ? baseQuery.populate('steps.template') : baseQuery;

const items = await query;

return this.mapEntities(items);
return this.mapEntities(items) as NotificationTemplateEntity[] | Pick<NotificationTemplateEntity, K>[];
}

async findByTriggerIdentifier(
Expand Down
2 changes: 1 addition & 1 deletion packages/shared/src/types/feature-flags.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ export enum FeatureFlagsKeysEnum {
IS_WORKFLOW_RUN_LOGS_READ_ENABLED = 'IS_WORKFLOW_RUN_LOGS_READ_ENABLED',
IS_WORKFLOW_RUN_TRACES_WRITE_ENABLED = 'IS_WORKFLOW_RUN_TRACES_WRITE_ENABLED',
IS_WORKFLOW_RUN_PAGE_MIGRATION_ENABLED = 'IS_WORKFLOW_RUN_PAGE_MIGRATION_ENABLED',
IS_WORKFLOW_RUN_TREND_FROM_TRACES_ENABLED = 'IS_WORKFLOW_RUN_TREND_FROM_TRACES_ENABLED',
IS_WORKFLOW_RUN_TREND_FROM_ROLLUP_ENABLED = 'IS_WORKFLOW_RUN_TREND_FROM_ROLLUP_ENABLED',
IS_DELIVERY_LIFECYCLE_TRANSITION_ENABLED = 'IS_DELIVERY_LIFECYCLE_TRANSITION_ENABLED',
IS_EXECUTION_DETAILS_CLICKHOUSE_ONLY_ENABLED = 'IS_EXECUTION_DETAILS_CLICKHOUSE_ONLY_ENABLED',
IS_SUBSCRIBERS_SCHEDULE_ENABLED = 'IS_SUBSCRIBERS_SCHEDULE_ENABLED',
Expand Down
Loading