Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@
"redis": "^4.6.13",
"reflect-metadata": "^0.1.13",
"replicate": "^1.1.0",
"resend": "^4.0.1",
"resend": "^6.6.0",
"rxjs": "^7.2.0",
"sharp": "^0.33.5",
"strip-ansi": "^7.1.2",
Expand Down
3 changes: 3 additions & 0 deletions apps/api/src/modules/config/app.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ export default () => ({
sender: process.env.EMAIL_SENDER || 'Refly <[email protected]>',
payloadMode: process.env.EMAIL_PAYLOAD_MODE || 'base64', // 'url' or 'base64'
resendApiKey: process.env.RESEND_API_KEY || 're_123',
maxRetries: Number.parseInt(process.env.EMAIL_MAX_RETRIES) || 3,
baseDelayMs: Number.parseInt(process.env.EMAIL_BASE_DELAY_MS) || 500,
minTimeBetweenEmailsMs: Number.parseInt(process.env.EMAIL_MIN_TIME_BETWEEN_MS) || 500, // 2 QPS = 500ms between emails
},
auth: {
skipVerification: process.env.AUTH_SKIP_VERIFICATION === 'true' || false,
Expand Down
103 changes: 94 additions & 9 deletions apps/api/src/modules/notification/notification.service.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,111 @@
import { Injectable, Logger } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { Attachment, Resend } from 'resend';
import { Attachment, ErrorResponse, Resend } from 'resend';
import { SendEmailRequest, User } from '@refly/openapi-schema';
import { PrismaService } from '../common/prisma.service';
import { ParamsError } from '@refly/errors';
import { MiscService } from '../misc/misc.service';
import { guard } from '../../utils/guard';

@Injectable()
export class NotificationService {
private readonly logger = new Logger(NotificationService.name);
private readonly resend: Resend;
private readonly maxRetries: number;
private readonly baseDelayMs: number;
private lastEmailSentAt = 0;
private readonly minTimeBetweenEmailsMs: number;

constructor(
private readonly configService: ConfigService,
private readonly prisma: PrismaService,
private readonly miscService: MiscService,
) {
this.resend = new Resend(this.configService.get('email.resendApiKey'));
this.maxRetries = this.configService.get<number>('email.maxRetries') ?? 3;
this.baseDelayMs = this.configService.get<number>('email.baseDelayMs') ?? 500;
this.minTimeBetweenEmailsMs =
this.configService.get<number>('email.minTimeBetweenEmailsMs') ?? 500;
}

/**
* Ensure minimum time between email sends to respect rate limits
*/
private async enforceRateLimit(): Promise<void> {
const now = Date.now();
const timeSinceLastEmail = now - this.lastEmailSentAt;

if (timeSinceLastEmail < this.minTimeBetweenEmailsMs) {
const delayNeeded = this.minTimeBetweenEmailsMs - timeSinceLastEmail;
this.logger.debug(`Rate limiting: waiting ${delayNeeded}ms before sending next email`);
await new Promise((resolve) => setTimeout(resolve, delayNeeded));
}

this.lastEmailSentAt = Date.now();
}

/**
* Check if error is rate limit related
*/
private isRateLimitError(error: ErrorResponse): boolean {
return (
error?.name === 'rate_limit_exceeded' ||
error?.message?.toLowerCase().includes('rate') ||
error?.message?.toLowerCase().includes('limit')
);
}

/**
* Send email with retry logic for rate limit errors using guard.retry
* @param emailData - Email data to send
* @returns Resend response
*/
private async sendEmailWithRetry(emailData: {
from: string;
to: string;
subject: string;
html: string;
attachments?: Attachment[];
}): Promise<any> {
return guard
.retry(
async () => {
// Enforce rate limit before sending
await this.enforceRateLimit();

const res = await this.resend.emails.send(emailData);

// Check for rate limit error in response
if (res.error) {
if (this.isRateLimitError(res.error)) {
// Throw to trigger retry
throw new Error(`Rate limit exceeded: ${res.error.message}`);
}
// Other errors should not be retried
throw new Error(res.error.message);
}

return res;
},
{
maxAttempts: this.maxRetries,
initialDelay: this.baseDelayMs,
maxDelay: this.baseDelayMs * 2 ** (this.maxRetries - 1),
backoffFactor: 2,
retryIf: (error: any) => this.isRateLimitError(error),
onRetry: (error: any, attempt: number) => {
this.logger.warn(
`Rate limit error detected. Retrying... (attempt ${attempt}/${this.maxRetries}): ${error.message}`,
);
},
},
)
.orThrow((error: any) => {
this.logger.error(
`Failed to send email after ${this.maxRetries} retries: ${error.message}`,
);
return new Error(`Failed to send email: ${error.message}`);
});
}

/**
Expand Down Expand Up @@ -171,20 +260,16 @@ export class NotificationService {
attachments = await Promise.all(attachmentUrls.map((url) => this.processAttachmentURL(url)));
}

const res = await this.resend.emails.send({
await this.sendEmailWithRetry({
from: sender,
to: receiver,
subject,
html,
attachments,
});

this.logger.log(`Email sent successfully to ${receiver}`);

if (res.error) {
throw new Error(res.error?.message);
}

this.logger.log(`Email sent to successfully in ${new Date().getTime() - now.getTime()}ms`);
this.logger.log(
`Email sent successfully to ${receiver} in ${new Date().getTime() - now.getTime()}ms`,
);
}
}
Loading