From 93be320cbeec531d9be91791d6fb13d0b488ad13 Mon Sep 17 00:00:00 2001 From: neo773 Date: Tue, 4 Nov 2025 02:34:31 +0530 Subject: [PATCH 1/5] refactor reconnect account logic --- .../constants/standard-field-ids.ts | 1 + .../calendar-event-import-manager.module.ts | 9 +- ...launch-failed-calendar-channels.command.ts | 85 -------- ...h-failed-calendar-channels.cron.command.ts | 35 +++ ...aunch-failed-calendar-channels.cron.job.ts | 74 +++++++ ...ar-relaunch-failed-calendar-channel.job.ts | 69 ++++++ .../google-api-refresh-tokens.service.ts | 34 +-- .../microsoft-api-refresh-tokens.service.ts | 65 +++--- ...is-access-token-expired-or-invalid.util.ts | 25 --- ...nected-account-refresh-tokens.exception.ts | 1 + ...ted-account-refresh-tokens.service.spec.ts | 203 ++++++++++++++++-- ...onnected-account-refresh-tokens.service.ts | 52 ++--- .../connected-account.workspace-entity.ts | 10 + .../message-channel-sync-status.service.ts | 48 +++-- ...elaunch-failed-message-channels.command.ts | 84 -------- ...ch-failed-message-channels.cron.command.ts | 35 +++ ...launch-failed-message-channels.cron.job.ts | 74 +++++++ ...ing-relaunch-failed-message-channel.job.ts | 69 ++++++ .../messaging-import-manager.module.ts | 9 +- ...essaging-account-authentication.service.ts | 1 + 20 files changed, 671 insertions(+), 312 deletions(-) delete mode 100644 packages/twenty-server/src/modules/calendar/calendar-event-import-manager/commands/calendar-relaunch-failed-calendar-channels.command.ts create mode 100644 packages/twenty-server/src/modules/calendar/calendar-event-import-manager/crons/commands/calendar-relaunch-failed-calendar-channels.cron.command.ts create mode 100644 packages/twenty-server/src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-relaunch-failed-calendar-channels.cron.job.ts create mode 100644 packages/twenty-server/src/modules/calendar/calendar-event-import-manager/jobs/calendar-relaunch-failed-calendar-channel.job.ts delete mode 100644 packages/twenty-server/src/modules/connected-account/refresh-tokens-manager/drivers/microsoft/utils/is-access-token-expired-or-invalid.util.ts delete mode 100644 packages/twenty-server/src/modules/messaging/message-import-manager/commands/messaging-relaunch-failed-message-channels.command.ts create mode 100644 packages/twenty-server/src/modules/messaging/message-import-manager/crons/commands/messaging-relaunch-failed-message-channels.cron.command.ts create mode 100644 packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-relaunch-failed-message-channels.cron.job.ts create mode 100644 packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-relaunch-failed-message-channel.job.ts diff --git a/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids.ts b/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids.ts index 9ba624f8c686d..7a797bc4e1e56 100644 --- a/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids.ts +++ b/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids.ts @@ -149,6 +149,7 @@ export const CONNECTED_ACCOUNT_STANDARD_FIELD_IDS = { accountOwner: '20202020-3517-4896-afac-b1d0aa362af6', lastSyncHistoryId: '20202020-115c-4a87-b50f-ac4367a971b9', authFailedAt: '20202020-d268-4c6b-baff-400d402b430a', + lastCredentialsRefreshedAt: '20202020-aa5e-4e85-903b-fdf90a941941', messageChannels: '20202020-24f7-4362-8468-042204d1e445', calendarChannels: '20202020-af4a-47bb-99ec-51911c1d3977', handleAliases: '20202020-8a3d-46be-814f-6228af16c47b', diff --git a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/calendar-event-import-manager.module.ts b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/calendar-event-import-manager.module.ts index 09c7dfbc51f34..2df94eb6430f3 100644 --- a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/calendar-event-import-manager.module.ts +++ b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/calendar-event-import-manager.module.ts @@ -10,19 +10,21 @@ import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repos import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/workspace-datasource.module'; import { BlocklistWorkspaceEntity } from 'src/modules/blocklist/standard-objects/blocklist.workspace-entity'; import { CalendarEventCleanerModule } from 'src/modules/calendar/calendar-event-cleaner/calendar-event-cleaner.module'; -import { CalendarRelaunchFailedCalendarChannelsCommand } from 'src/modules/calendar/calendar-event-import-manager/commands/calendar-relaunch-failed-calendar-channels.command'; import { CalendarEventListFetchCronCommand } from 'src/modules/calendar/calendar-event-import-manager/crons/commands/calendar-event-list-fetch.cron.command'; import { CalendarEventsImportCronCommand } from 'src/modules/calendar/calendar-event-import-manager/crons/commands/calendar-import.cron.command'; import { CalendarOngoingStaleCronCommand } from 'src/modules/calendar/calendar-event-import-manager/crons/commands/calendar-ongoing-stale.cron.command'; +import { CalendarRelaunchFailedCalendarChannelsCronCommand } from 'src/modules/calendar/calendar-event-import-manager/crons/commands/calendar-relaunch-failed-calendar-channels.cron.command'; import { CalendarEventListFetchCronJob } from 'src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-event-list-fetch.cron.job'; import { CalendarEventsImportCronJob } from 'src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-events-import.cron.job'; import { CalendarOngoingStaleCronJob } from 'src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-ongoing-stale.cron.job'; +import { CalendarRelaunchFailedCalendarChannelsCronJob } from 'src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-relaunch-failed-calendar-channels.cron.job'; import { CalDavDriverModule } from 'src/modules/calendar/calendar-event-import-manager/drivers/caldav/caldav-driver.module'; import { GoogleCalendarDriverModule } from 'src/modules/calendar/calendar-event-import-manager/drivers/google-calendar/google-calendar-driver.module'; import { MicrosoftCalendarDriverModule } from 'src/modules/calendar/calendar-event-import-manager/drivers/microsoft-calendar/microsoft-calendar-driver.module'; import { CalendarEventListFetchJob } from 'src/modules/calendar/calendar-event-import-manager/jobs/calendar-event-list-fetch.job'; import { CalendarEventsImportJob } from 'src/modules/calendar/calendar-event-import-manager/jobs/calendar-events-import.job'; import { CalendarOngoingStaleJob } from 'src/modules/calendar/calendar-event-import-manager/jobs/calendar-ongoing-stale.job'; +import { CalendarRelaunchFailedCalendarChannelJob } from 'src/modules/calendar/calendar-event-import-manager/jobs/calendar-relaunch-failed-calendar-channel.job'; import { CalendarAccountAuthenticationService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-account-authentication.service'; import { CalendarEventImportErrorHandlerService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-event-import-exception-handler.service'; import { CalendarEventsImportService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-events-import.service'; @@ -71,8 +73,10 @@ import { RefreshTokensManagerModule } from 'src/modules/connected-account/refres CalendarEventsImportJob, CalendarOngoingStaleCronJob, CalendarOngoingStaleCronCommand, - CalendarRelaunchFailedCalendarChannelsCommand, CalendarOngoingStaleJob, + CalendarRelaunchFailedCalendarChannelsCronJob, + CalendarRelaunchFailedCalendarChannelsCronCommand, + CalendarRelaunchFailedCalendarChannelJob, ], exports: [ CalendarEventsImportService, @@ -80,6 +84,7 @@ import { RefreshTokensManagerModule } from 'src/modules/connected-account/refres CalendarEventListFetchCronCommand, CalendarEventsImportCronCommand, CalendarOngoingStaleCronCommand, + CalendarRelaunchFailedCalendarChannelsCronCommand, ], }) export class CalendarEventImportManagerModule {} diff --git a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/commands/calendar-relaunch-failed-calendar-channels.command.ts b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/commands/calendar-relaunch-failed-calendar-channels.command.ts deleted file mode 100644 index 09eef091b5076..0000000000000 --- a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/commands/calendar-relaunch-failed-calendar-channels.command.ts +++ /dev/null @@ -1,85 +0,0 @@ -import { InjectRepository } from '@nestjs/typeorm'; - -import { Command } from 'nest-commander'; -import { Repository } from 'typeorm'; - -import { - ActiveOrSuspendedWorkspacesMigrationCommandRunner, - type RunOnWorkspaceArgs, -} from 'src/database/commands/command-runners/active-or-suspended-workspaces-migration.command-runner'; -import { WorkspaceEntity } from 'src/engine/core-modules/workspace/workspace.entity'; -import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; -import { - CalendarChannelSyncStage, - CalendarChannelSyncStatus, - CalendarChannelWorkspaceEntity, -} from 'src/modules/calendar/common/standard-objects/calendar-channel.workspace-entity'; -import { AccountsToReconnectService } from 'src/modules/connected-account/services/accounts-to-reconnect.service'; - -@Command({ - name: 'calendar:relaunch-failed-calendar-channels', - description: 'Relaunch failed message channels', -}) -export class CalendarRelaunchFailedCalendarChannelsCommand extends ActiveOrSuspendedWorkspacesMigrationCommandRunner { - constructor( - @InjectRepository(WorkspaceEntity) - protected readonly workspaceRepository: Repository, - protected readonly twentyORMGlobalManager: TwentyORMGlobalManager, - protected readonly accountsToReconnectService: AccountsToReconnectService, - ) { - super(workspaceRepository, twentyORMGlobalManager); - } - - override async runOnWorkspace({ - workspaceId, - options, - }: RunOnWorkspaceArgs): Promise { - try { - const calendarChannelRepository = - await this.twentyORMGlobalManager.getRepositoryForWorkspace( - workspaceId, - 'calendarChannel', - { shouldBypassPermissionChecks: true }, - ); - - const failedCalendarChannels = await calendarChannelRepository.find({ - where: { - syncStage: CalendarChannelSyncStage.FAILED, - }, - relations: { - connectedAccount: { - accountOwner: true, - }, - }, - }); - - if (!options.dryRun && failedCalendarChannels.length > 0) { - await calendarChannelRepository.update( - failedCalendarChannels.map(({ id }) => id), - { - syncStage: - CalendarChannelSyncStage.CALENDAR_EVENT_LIST_FETCH_PENDING, - syncStatus: CalendarChannelSyncStatus.ACTIVE, - }, - ); - - for (const failedCalendarChannel of failedCalendarChannels) { - await this.accountsToReconnectService.removeAccountToReconnect( - failedCalendarChannel.connectedAccount.accountOwner.userId, - failedCalendarChannel.connectedAccountId, - workspaceId, - ); - } - } - - this.logger.log( - `${options.dryRun ? ' (DRY RUN): ' : ''}Relaunched ${failedCalendarChannels.length} failed calendar channels`, - ); - } catch (error) { - this.logger.error( - 'Error while relaunching failed message channels', - error, - ); - } - } -} diff --git a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/crons/commands/calendar-relaunch-failed-calendar-channels.cron.command.ts b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/crons/commands/calendar-relaunch-failed-calendar-channels.cron.command.ts new file mode 100644 index 0000000000000..44417b2d76b5c --- /dev/null +++ b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/crons/commands/calendar-relaunch-failed-calendar-channels.cron.command.ts @@ -0,0 +1,35 @@ +import { Command, CommandRunner } from 'nest-commander'; + +import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator'; +import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants'; +import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service'; +import { + CALENDAR_RELAUNCH_FAILED_CALENDAR_CHANNELS_CRON_PATTERN, + CalendarRelaunchFailedCalendarChannelsCronJob, +} from 'src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-relaunch-failed-calendar-channels.cron.job'; + +@Command({ + name: 'cron:calendar:relaunch-failed-calendar-channels', + description: + 'Starts a cron job to relaunch failed calendar channels every 30 minutes', +}) +export class CalendarRelaunchFailedCalendarChannelsCronCommand extends CommandRunner { + constructor( + @InjectMessageQueue(MessageQueue.cronQueue) + private readonly messageQueueService: MessageQueueService, + ) { + super(); + } + + async run(): Promise { + await this.messageQueueService.addCron({ + jobName: CalendarRelaunchFailedCalendarChannelsCronJob.name, + data: undefined, + options: { + repeat: { + pattern: CALENDAR_RELAUNCH_FAILED_CALENDAR_CHANNELS_CRON_PATTERN, + }, + }, + }); + } +} diff --git a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-relaunch-failed-calendar-channels.cron.job.ts b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-relaunch-failed-calendar-channels.cron.job.ts new file mode 100644 index 0000000000000..05658cb491e2f --- /dev/null +++ b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-relaunch-failed-calendar-channels.cron.job.ts @@ -0,0 +1,74 @@ +import { InjectDataSource, InjectRepository } from '@nestjs/typeorm'; + +import { WorkspaceActivationStatus } from 'twenty-shared/workspace'; +import { DataSource, Repository } from 'typeorm'; + +import { SentryCronMonitor } from 'src/engine/core-modules/cron/sentry-cron-monitor.decorator'; +import { ExceptionHandlerService } from 'src/engine/core-modules/exception-handler/exception-handler.service'; +import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator'; +import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator'; +import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator'; +import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants'; +import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service'; +import { WorkspaceEntity } from 'src/engine/core-modules/workspace/workspace.entity'; +import { getWorkspaceSchemaName } from 'src/engine/workspace-datasource/utils/get-workspace-schema-name.util'; +import { + CalendarRelaunchFailedCalendarChannelJob, + type CalendarRelaunchFailedCalendarChannelJobData, +} from 'src/modules/calendar/calendar-event-import-manager/jobs/calendar-relaunch-failed-calendar-channel.job'; +import { CalendarChannelSyncStage } from 'src/modules/calendar/common/standard-objects/calendar-channel.workspace-entity'; + +export const CALENDAR_RELAUNCH_FAILED_CALENDAR_CHANNELS_CRON_PATTERN = + '*/30 * * * *'; + +@Processor(MessageQueue.cronQueue) +export class CalendarRelaunchFailedCalendarChannelsCronJob { + constructor( + @InjectRepository(WorkspaceEntity) + private readonly workspaceRepository: Repository, + @InjectMessageQueue(MessageQueue.calendarQueue) + private readonly messageQueueService: MessageQueueService, + @InjectDataSource() + private readonly coreDataSource: DataSource, + private readonly exceptionHandlerService: ExceptionHandlerService, + ) {} + + @Process(CalendarRelaunchFailedCalendarChannelsCronJob.name) + @SentryCronMonitor( + CalendarRelaunchFailedCalendarChannelsCronJob.name, + CALENDAR_RELAUNCH_FAILED_CALENDAR_CHANNELS_CRON_PATTERN, + ) + async handle(): Promise { + const activeWorkspaces = await this.workspaceRepository.find({ + where: { + activationStatus: WorkspaceActivationStatus.ACTIVE, + }, + }); + + for (const activeWorkspace of activeWorkspaces) { + try { + const schemaName = getWorkspaceSchemaName(activeWorkspace.id); + + const failedCalendarChannels = await this.coreDataSource.query( + `SELECT * FROM ${schemaName}."calendarChannel" WHERE "syncStage" = '${CalendarChannelSyncStage.FAILED}'`, + ); + + for (const calendarChannel of failedCalendarChannels) { + await this.messageQueueService.add( + CalendarRelaunchFailedCalendarChannelJob.name, + { + workspaceId: activeWorkspace.id, + calendarChannelId: calendarChannel.id, + }, + ); + } + } catch (error) { + this.exceptionHandlerService.captureExceptions([error], { + workspace: { + id: activeWorkspace.id, + }, + }); + } + } + } +} diff --git a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/jobs/calendar-relaunch-failed-calendar-channel.job.ts b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/jobs/calendar-relaunch-failed-calendar-channel.job.ts new file mode 100644 index 0000000000000..fb64cf69f6e0d --- /dev/null +++ b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/jobs/calendar-relaunch-failed-calendar-channel.job.ts @@ -0,0 +1,69 @@ +import { Scope } from '@nestjs/common'; + +import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator'; +import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator'; +import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants'; +import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; +import { + CalendarChannelSyncStage, + CalendarChannelSyncStatus, + CalendarChannelWorkspaceEntity, +} from 'src/modules/calendar/common/standard-objects/calendar-channel.workspace-entity'; +import { AccountsToReconnectService } from 'src/modules/connected-account/services/accounts-to-reconnect.service'; + +export type CalendarRelaunchFailedCalendarChannelJobData = { + workspaceId: string; + calendarChannelId: string; +}; + +@Processor({ + queueName: MessageQueue.calendarQueue, + scope: Scope.REQUEST, +}) +export class CalendarRelaunchFailedCalendarChannelJob { + constructor( + private readonly twentyORMGlobalManager: TwentyORMGlobalManager, + private readonly accountsToReconnectService: AccountsToReconnectService, + ) {} + + @Process(CalendarRelaunchFailedCalendarChannelJob.name) + async handle(data: CalendarRelaunchFailedCalendarChannelJobData) { + const { workspaceId, calendarChannelId } = data; + + const calendarChannelRepository = + await this.twentyORMGlobalManager.getRepositoryForWorkspace( + workspaceId, + 'calendarChannel', + { shouldBypassPermissionChecks: true }, + ); + + const calendarChannel = await calendarChannelRepository.findOne({ + where: { + id: calendarChannelId, + }, + relations: { + connectedAccount: { + accountOwner: true, + }, + }, + }); + + if ( + !calendarChannel || + calendarChannel.syncStage !== CalendarChannelSyncStage.FAILED + ) { + return; + } + + await calendarChannelRepository.update(calendarChannelId, { + syncStage: CalendarChannelSyncStage.CALENDAR_EVENT_LIST_FETCH_PENDING, + syncStatus: CalendarChannelSyncStatus.ACTIVE, + }); + + await this.accountsToReconnectService.removeAccountToReconnect( + calendarChannel.connectedAccount.accountOwner.userId, + calendarChannel.connectedAccountId, + workspaceId, + ); + } +} diff --git a/packages/twenty-server/src/modules/connected-account/refresh-tokens-manager/drivers/google/services/google-api-refresh-tokens.service.ts b/packages/twenty-server/src/modules/connected-account/refresh-tokens-manager/drivers/google/services/google-api-refresh-tokens.service.ts index 59101ad2fc147..14b7ffc30e7f0 100644 --- a/packages/twenty-server/src/modules/connected-account/refresh-tokens-manager/drivers/google/services/google-api-refresh-tokens.service.ts +++ b/packages/twenty-server/src/modules/connected-account/refresh-tokens-manager/drivers/google/services/google-api-refresh-tokens.service.ts @@ -1,6 +1,6 @@ import { Injectable } from '@nestjs/common'; -import { isDefined } from 'class-validator'; +import { GaxiosError } from 'gaxios'; import { google } from 'googleapis'; import { TwentyConfigService } from 'src/engine/core-modules/twenty-config/twenty-config.service'; @@ -23,19 +23,25 @@ export class GoogleAPIRefreshAccessTokenService { oAuth2Client.setCredentials({ refresh_token: refreshToken, }); - - const { token } = await oAuth2Client.getAccessToken(); - - if (!isDefined(token)) { - throw new ConnectedAccountRefreshAccessTokenException( - 'Failed to refresh google access token', - ConnectedAccountRefreshAccessTokenExceptionCode.REFRESH_ACCESS_TOKEN_FAILED, - ); + try { + const { token } = await oAuth2Client.getAccessToken(); + + return { + accessToken: token as string, + refreshToken, + }; + } catch (error) { + if ( + error instanceof GaxiosError && + error.response?.data?.error === 'invalid_grant' + ) { + throw new ConnectedAccountRefreshAccessTokenException( + 'Error refreshing Google tokens: Invalid refresh token', + ConnectedAccountRefreshAccessTokenExceptionCode.INVALID_REFRESH_TOKEN, + ); + } + + throw error; } - - return { - accessToken: token as string, - refreshToken, - }; } } diff --git a/packages/twenty-server/src/modules/connected-account/refresh-tokens-manager/drivers/microsoft/services/microsoft-api-refresh-tokens.service.ts b/packages/twenty-server/src/modules/connected-account/refresh-tokens-manager/drivers/microsoft/services/microsoft-api-refresh-tokens.service.ts index badbc2c51a131..06e86698ec238 100644 --- a/packages/twenty-server/src/modules/connected-account/refresh-tokens-manager/drivers/microsoft/services/microsoft-api-refresh-tokens.service.ts +++ b/packages/twenty-server/src/modules/connected-account/refresh-tokens-manager/drivers/microsoft/services/microsoft-api-refresh-tokens.service.ts @@ -1,9 +1,12 @@ import { Injectable } from '@nestjs/common'; -import axios from 'axios'; -import { z } from 'zod'; +import axios, { AxiosError } from 'axios'; import { TwentyConfigService } from 'src/engine/core-modules/twenty-config/twenty-config.service'; +import { + ConnectedAccountRefreshAccessTokenException, + ConnectedAccountRefreshAccessTokenExceptionCode, +} from 'src/modules/connected-account/refresh-tokens-manager/exceptions/connected-account-refresh-tokens.exception'; import { type ConnectedAccountTokens } from 'src/modules/connected-account/refresh-tokens-manager/services/connected-account-refresh-tokens.service'; export type MicrosoftTokens = { @@ -19,36 +22,46 @@ interface MicrosoftRefreshTokenResponse { expires_in: number; id_token?: string; } + @Injectable() export class MicrosoftAPIRefreshAccessTokenService { constructor(private readonly twentyConfigService: TwentyConfigService) {} async refreshTokens(refreshToken: string): Promise { - const response = await axios.post( - 'https://login.microsoftonline.com/common/oauth2/v2.0/token', - new URLSearchParams({ - client_id: this.twentyConfigService.get('AUTH_MICROSOFT_CLIENT_ID'), - client_secret: this.twentyConfigService.get( - 'AUTH_MICROSOFT_CLIENT_SECRET', - ), - refresh_token: refreshToken, - grant_type: 'refresh_token', - }), - { - headers: { - 'Content-Type': 'application/x-www-form-urlencoded', + try { + const response = await axios.post( + 'https://login.microsoftonline.com/common/oauth2/v2.0/token', + new URLSearchParams({ + client_id: this.twentyConfigService.get('AUTH_MICROSOFT_CLIENT_ID'), + client_secret: this.twentyConfigService.get( + 'AUTH_MICROSOFT_CLIENT_SECRET', + ), + refresh_token: refreshToken, + grant_type: 'refresh_token', + }), + { + headers: { + 'Content-Type': 'application/x-www-form-urlencoded', + }, }, - }, - ); - - z.object({ - access_token: z.string(), - refresh_token: z.string(), - }).parse(response.data); + ); + const responseData = response.data as MicrosoftRefreshTokenResponse; - return { - accessToken: response.data.access_token, - refreshToken: response.data.refresh_token, - }; + return { + accessToken: responseData.access_token, + refreshToken: responseData.refresh_token, + }; + } catch (error) { + if ( + error instanceof AxiosError && + error.response?.data?.error === 'invalid_grant' + ) { + throw new ConnectedAccountRefreshAccessTokenException( + `Failed to refresh Microsoft token: ${error.response?.data?.error} - ${error.response?.data?.error_description}`, + ConnectedAccountRefreshAccessTokenExceptionCode.INVALID_REFRESH_TOKEN, + ); + } + throw error; + } } } diff --git a/packages/twenty-server/src/modules/connected-account/refresh-tokens-manager/drivers/microsoft/utils/is-access-token-expired-or-invalid.util.ts b/packages/twenty-server/src/modules/connected-account/refresh-tokens-manager/drivers/microsoft/utils/is-access-token-expired-or-invalid.util.ts deleted file mode 100644 index 110cbb20d2d50..0000000000000 --- a/packages/twenty-server/src/modules/connected-account/refresh-tokens-manager/drivers/microsoft/utils/is-access-token-expired-or-invalid.util.ts +++ /dev/null @@ -1,25 +0,0 @@ -import { isNonEmptyString } from '@sniptt/guards'; -import jwt from 'jsonwebtoken'; - -export const isAccessTokenExpiredOrInvalid = ( - token: string, - expirationBufferInSeconds = 5 * 60, -): boolean => { - if (!isNonEmptyString(token)) { - return true; - } - - try { - const payload = jwt.decode(token) as { exp?: number } | null; - - if (!payload || typeof payload.exp !== 'number') { - return true; - } - - const currentTime = Math.floor(Date.now() / 1000); - - return payload.exp < currentTime + expirationBufferInSeconds; - } catch { - return true; - } -}; diff --git a/packages/twenty-server/src/modules/connected-account/refresh-tokens-manager/exceptions/connected-account-refresh-tokens.exception.ts b/packages/twenty-server/src/modules/connected-account/refresh-tokens-manager/exceptions/connected-account-refresh-tokens.exception.ts index 86d225b7449f3..86aa4fdca3d53 100644 --- a/packages/twenty-server/src/modules/connected-account/refresh-tokens-manager/exceptions/connected-account-refresh-tokens.exception.ts +++ b/packages/twenty-server/src/modules/connected-account/refresh-tokens-manager/exceptions/connected-account-refresh-tokens.exception.ts @@ -4,6 +4,7 @@ export class ConnectedAccountRefreshAccessTokenException extends CustomException export enum ConnectedAccountRefreshAccessTokenExceptionCode { REFRESH_TOKEN_NOT_FOUND = 'REFRESH_TOKEN_NOT_FOUND', + INVALID_REFRESH_TOKEN = 'INVALID_REFRESH_TOKEN', REFRESH_ACCESS_TOKEN_FAILED = 'REFRESH_ACCESS_TOKEN_FAILED', PROVIDER_NOT_SUPPORTED = 'PROVIDER_NOT_SUPPORTED', TEMPORARY_NETWORK_ERROR = 'TEMPORARY_NETWORK_ERROR', diff --git a/packages/twenty-server/src/modules/connected-account/refresh-tokens-manager/services/connected-account-refresh-tokens.service.spec.ts b/packages/twenty-server/src/modules/connected-account/refresh-tokens-manager/services/connected-account-refresh-tokens.service.spec.ts index afb5b0e5613f1..664bfa8f65948 100644 --- a/packages/twenty-server/src/modules/connected-account/refresh-tokens-manager/services/connected-account-refresh-tokens.service.spec.ts +++ b/packages/twenty-server/src/modules/connected-account/refresh-tokens-manager/services/connected-account-refresh-tokens.service.spec.ts @@ -5,7 +5,6 @@ import { ConnectedAccountProvider } from 'twenty-shared/types'; import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { GoogleAPIRefreshAccessTokenService } from 'src/modules/connected-account/refresh-tokens-manager/drivers/google/services/google-api-refresh-tokens.service'; import { MicrosoftAPIRefreshAccessTokenService } from 'src/modules/connected-account/refresh-tokens-manager/drivers/microsoft/services/microsoft-api-refresh-tokens.service'; -import { isAccessTokenExpiredOrInvalid } from 'src/modules/connected-account/refresh-tokens-manager/drivers/microsoft/utils/is-access-token-expired-or-invalid.util'; import { ConnectedAccountRefreshAccessTokenException, ConnectedAccountRefreshAccessTokenExceptionCode, @@ -14,12 +13,9 @@ import { type ConnectedAccountWorkspaceEntity } from 'src/modules/connected-acco import { ConnectedAccountRefreshTokensService } from './connected-account-refresh-tokens.service'; -jest.mock( - 'src/modules/connected-account/refresh-tokens-manager/drivers/microsoft/utils/is-access-token-expired-or-invalid.util', -); - describe('ConnectedAccountRefreshTokensService', () => { let service: ConnectedAccountRefreshTokensService; + let googleAPIRefreshAccessTokenService: GoogleAPIRefreshAccessTokenService; let microsoftAPIRefreshAccessTokenService: MicrosoftAPIRefreshAccessTokenService; let twentyORMManager: TwentyORMManager; @@ -57,6 +53,10 @@ describe('ConnectedAccountRefreshTokensService', () => { service = module.get( ConnectedAccountRefreshTokensService, ); + googleAPIRefreshAccessTokenService = + module.get( + GoogleAPIRefreshAccessTokenService, + ); microsoftAPIRefreshAccessTokenService = module.get( MicrosoftAPIRefreshAccessTokenService, @@ -69,16 +69,15 @@ describe('ConnectedAccountRefreshTokensService', () => { }); describe('refreshAndSaveTokens', () => { - it('should reuse valid access token without refreshing', async () => { + it('should reuse valid access token without refreshing when lastCredentialsRefreshedAt is recent', async () => { const connectedAccount = { id: mockConnectedAccountId, provider: ConnectedAccountProvider.MICROSOFT, accessToken: mockAccessToken, refreshToken: mockRefreshToken, + lastCredentialsRefreshedAt: new Date(Date.now() - 30 * 60 * 1000), // 30 minutes ago } as ConnectedAccountWorkspaceEntity; - (isAccessTokenExpiredOrInvalid as jest.Mock).mockReturnValue(false); - const result = await service.refreshAndSaveTokens( connectedAccount, mockWorkspaceId, @@ -94,12 +93,93 @@ describe('ConnectedAccountRefreshTokensService', () => { expect(twentyORMManager.getRepository).not.toHaveBeenCalled(); }); - it('should refresh and save new Microsoft token when expired', async () => { + it('should refresh and save new Microsoft token when expired (lastCredentialsRefreshedAt is old)', async () => { + const connectedAccount = { + id: mockConnectedAccountId, + provider: ConnectedAccountProvider.MICROSOFT, + accessToken: mockAccessToken, + refreshToken: mockRefreshToken, + lastCredentialsRefreshedAt: new Date(Date.now() - 2 * 60 * 60 * 1000), // 2 hours ago + } as ConnectedAccountWorkspaceEntity; + + const mockRepository = { update: jest.fn() }; + const newTokens = { + accessToken: mockNewAccessToken, + refreshToken: mockRefreshToken, + }; + + jest + .spyOn(microsoftAPIRefreshAccessTokenService, 'refreshTokens') + .mockResolvedValue(newTokens); + jest + .spyOn(twentyORMManager, 'getRepository') + .mockResolvedValue(mockRepository as any); + + const result = await service.refreshAndSaveTokens( + connectedAccount, + mockWorkspaceId, + ); + + expect(result).toEqual(newTokens); + expect( + microsoftAPIRefreshAccessTokenService.refreshTokens, + ).toHaveBeenCalledWith(mockRefreshToken); + expect(mockRepository.update).toHaveBeenCalledWith( + { id: mockConnectedAccountId }, + expect.objectContaining({ + ...newTokens, + lastCredentialsRefreshedAt: expect.any(Date), + }), + ); + }); + + it('should refresh and save new Google token when expired (lastCredentialsRefreshedAt is old)', async () => { + const connectedAccount = { + id: mockConnectedAccountId, + provider: ConnectedAccountProvider.GOOGLE, + accessToken: mockAccessToken, + refreshToken: mockRefreshToken, + lastCredentialsRefreshedAt: new Date(Date.now() - 2 * 60 * 60 * 1000), // 2 hours ago + } as ConnectedAccountWorkspaceEntity; + + const mockRepository = { update: jest.fn() }; + const newTokens = { + accessToken: mockNewAccessToken, + refreshToken: mockRefreshToken, + }; + + jest + .spyOn(googleAPIRefreshAccessTokenService, 'refreshTokens') + .mockResolvedValue(newTokens); + jest + .spyOn(twentyORMManager, 'getRepository') + .mockResolvedValue(mockRepository as any); + + const result = await service.refreshAndSaveTokens( + connectedAccount, + mockWorkspaceId, + ); + + expect(result).toEqual(newTokens); + expect( + googleAPIRefreshAccessTokenService.refreshTokens, + ).toHaveBeenCalledWith(mockRefreshToken); + expect(mockRepository.update).toHaveBeenCalledWith( + { id: mockConnectedAccountId }, + expect.objectContaining({ + ...newTokens, + lastCredentialsRefreshedAt: expect.any(Date), + }), + ); + }); + + it('should refresh token when lastCredentialsRefreshedAt is null', async () => { const connectedAccount = { id: mockConnectedAccountId, provider: ConnectedAccountProvider.MICROSOFT, accessToken: mockAccessToken, refreshToken: mockRefreshToken, + lastCredentialsRefreshedAt: null, } as ConnectedAccountWorkspaceEntity; const mockRepository = { update: jest.fn() }; @@ -108,7 +188,6 @@ describe('ConnectedAccountRefreshTokensService', () => { refreshToken: mockRefreshToken, }; - (isAccessTokenExpiredOrInvalid as jest.Mock).mockReturnValue(true); jest .spyOn(microsoftAPIRefreshAccessTokenService, 'refreshTokens') .mockResolvedValue(newTokens); @@ -127,7 +206,10 @@ describe('ConnectedAccountRefreshTokensService', () => { ).toHaveBeenCalledWith(mockRefreshToken); expect(mockRepository.update).toHaveBeenCalledWith( { id: mockConnectedAccountId }, - newTokens, + expect.objectContaining({ + ...newTokens, + lastCredentialsRefreshedAt: expect.any(Date), + }), ); }); @@ -137,6 +219,7 @@ describe('ConnectedAccountRefreshTokensService', () => { provider: ConnectedAccountProvider.GOOGLE, accessToken: mockAccessToken, refreshToken: null, + lastCredentialsRefreshedAt: new Date(Date.now() - 2 * 60 * 60 * 1000), } as unknown as ConnectedAccountWorkspaceEntity; await expect( @@ -149,30 +232,106 @@ describe('ConnectedAccountRefreshTokensService', () => { ); }); - it('should throw when Microsoft refresh fails with axios error', async () => { + it('should throw exception when Microsoft refresh fails with invalid_grant', async () => { const connectedAccount = { id: mockConnectedAccountId, provider: ConnectedAccountProvider.MICROSOFT, accessToken: mockAccessToken, refreshToken: mockRefreshToken, + lastCredentialsRefreshedAt: new Date(Date.now() - 2 * 60 * 60 * 1000), // 2 hours ago } as ConnectedAccountWorkspaceEntity; - const axiosError = { - message: 'Request failed', - response: { - status: 400, - data: { error: 'invalid_grant', error_description: 'Token expired' }, - }, - }; + const invalidGrantError = new ConnectedAccountRefreshAccessTokenException( + 'Microsoft OAuth error: invalid_grant - Token has been revoked', + ConnectedAccountRefreshAccessTokenExceptionCode.INVALID_REFRESH_TOKEN, + ); - (isAccessTokenExpiredOrInvalid as jest.Mock).mockReturnValue(true); jest .spyOn(microsoftAPIRefreshAccessTokenService, 'refreshTokens') - .mockRejectedValue(axiosError); + .mockRejectedValue(invalidGrantError); await expect( service.refreshAndSaveTokens(connectedAccount, mockWorkspaceId), - ).rejects.toThrow(ConnectedAccountRefreshAccessTokenException); + ).rejects.toMatchObject({ + message: expect.stringContaining( + 'Microsoft OAuth error: invalid_grant - Token has been revoked', + ), + code: ConnectedAccountRefreshAccessTokenExceptionCode.REFRESH_ACCESS_TOKEN_FAILED, + }); + }); + + it('should throw TEMPORARY_NETWORK_ERROR when refresh fails with network error', async () => { + const connectedAccount = { + id: mockConnectedAccountId, + provider: ConnectedAccountProvider.GOOGLE, + accessToken: mockAccessToken, + refreshToken: mockRefreshToken, + lastCredentialsRefreshedAt: new Date(Date.now() - 2 * 60 * 60 * 1000), // 2 hours ago + } as ConnectedAccountWorkspaceEntity; + + const networkError = new Error('Network error'); + + (networkError as any).code = 'ECONNRESET'; + + jest + .spyOn(googleAPIRefreshAccessTokenService, 'refreshTokens') + .mockRejectedValue(networkError); + + await expect( + service.refreshAndSaveTokens(connectedAccount, mockWorkspaceId), + ).rejects.toMatchObject({ + code: ConnectedAccountRefreshAccessTokenExceptionCode.TEMPORARY_NETWORK_ERROR, + }); + }); + }); + + describe('isAccessTokenStillValid', () => { + it('should return true when lastCredentialsRefreshedAt is within the valid window (30 minutes ago)', async () => { + const connectedAccount = { + id: mockConnectedAccountId, + provider: ConnectedAccountProvider.MICROSOFT, + lastCredentialsRefreshedAt: new Date(Date.now() - 30 * 60 * 1000), // 30 minutes ago + } as ConnectedAccountWorkspaceEntity; + + const result = await service.isAccessTokenStillValid(connectedAccount); + + expect(result).toBe(true); + }); + + it('should return false when lastCredentialsRefreshedAt is outside the valid window (2 hours ago)', async () => { + const connectedAccount = { + id: mockConnectedAccountId, + provider: ConnectedAccountProvider.GOOGLE, + lastCredentialsRefreshedAt: new Date(Date.now() - 2 * 60 * 60 * 1000), // 2 hours ago + } as ConnectedAccountWorkspaceEntity; + + const result = await service.isAccessTokenStillValid(connectedAccount); + + expect(result).toBe(false); + }); + + it('should return false when lastCredentialsRefreshedAt is null', async () => { + const connectedAccount = { + id: mockConnectedAccountId, + provider: ConnectedAccountProvider.MICROSOFT, + lastCredentialsRefreshedAt: null, + } as ConnectedAccountWorkspaceEntity; + + const result = await service.isAccessTokenStillValid(connectedAccount); + + expect(result).toBe(false); + }); + + it('should return true for IMAP_SMTP_CALDAV provider regardless of lastCredentialsRefreshedAt', async () => { + const connectedAccount = { + id: mockConnectedAccountId, + provider: ConnectedAccountProvider.IMAP_SMTP_CALDAV, + lastCredentialsRefreshedAt: null, + } as ConnectedAccountWorkspaceEntity; + + const result = await service.isAccessTokenStillValid(connectedAccount); + + expect(result).toBe(true); }); }); }); diff --git a/packages/twenty-server/src/modules/connected-account/refresh-tokens-manager/services/connected-account-refresh-tokens.service.ts b/packages/twenty-server/src/modules/connected-account/refresh-tokens-manager/services/connected-account-refresh-tokens.service.ts index 661af8644616c..b9b30c4eb6949 100644 --- a/packages/twenty-server/src/modules/connected-account/refresh-tokens-manager/services/connected-account-refresh-tokens.service.ts +++ b/packages/twenty-server/src/modules/connected-account/refresh-tokens-manager/services/connected-account-refresh-tokens.service.ts @@ -6,7 +6,6 @@ import { assertUnreachable } from 'twenty-shared/utils'; import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { GoogleAPIRefreshAccessTokenService } from 'src/modules/connected-account/refresh-tokens-manager/drivers/google/services/google-api-refresh-tokens.service'; import { MicrosoftAPIRefreshAccessTokenService } from 'src/modules/connected-account/refresh-tokens-manager/drivers/microsoft/services/microsoft-api-refresh-tokens.service'; -import { isAccessTokenExpiredOrInvalid } from 'src/modules/connected-account/refresh-tokens-manager/drivers/microsoft/utils/is-access-token-expired-or-invalid.util'; import { ConnectedAccountRefreshAccessTokenException, ConnectedAccountRefreshAccessTokenExceptionCode, @@ -19,6 +18,8 @@ export type ConnectedAccountTokens = { refreshToken: string; }; +const MESSAGING_CONNECTED_ACCOUNT_ACCESS_TOKEN_EXPIRATION = 1000 * 60 * 60; + @Injectable() export class ConnectedAccountRefreshTokensService { private readonly logger = new Logger( @@ -44,10 +45,8 @@ export class ConnectedAccountRefreshTokensService { ); } - const isAccessTokenValid = await this.isAccessTokenStillValid( - connectedAccount, - accessToken, - ); + const isAccessTokenValid = + await this.isAccessTokenStillValid(connectedAccount); if (isAccessTokenValid) { this.logger.debug( @@ -77,7 +76,10 @@ export class ConnectedAccountRefreshTokensService { await connectedAccountRepository.update( { id: connectedAccount.id }, - connectedAccountTokens, + { + ...connectedAccountTokens, + lastCredentialsRefreshedAt: new Date(), + }, ); return connectedAccountTokens; @@ -85,17 +87,23 @@ export class ConnectedAccountRefreshTokensService { async isAccessTokenStillValid( connectedAccount: ConnectedAccountWorkspaceEntity, - accessToken: string, ): Promise { switch (connectedAccount.provider) { - case ConnectedAccountProvider.GOOGLE: { - // Google's access tokens are opaque and needs network calls to check if they are valid we default to false for now - return false; - } + case ConnectedAccountProvider.GOOGLE: case ConnectedAccountProvider.MICROSOFT: { - const isExpired = isAccessTokenExpiredOrInvalid(accessToken); + if (!connectedAccount.lastCredentialsRefreshedAt) { + return false; + } - return !isExpired; + const BUFFER_TIME = 5 * 60 * 1000; + + const tokenExpirationTime = + MESSAGING_CONNECTED_ACCOUNT_ACCESS_TOKEN_EXPIRATION - BUFFER_TIME; + + return ( + connectedAccount.lastCredentialsRefreshedAt > + new Date(Date.now() - tokenExpirationTime) + ); } case ConnectedAccountProvider.IMAP_SMTP_CALDAV: return true; @@ -134,19 +142,6 @@ export class ConnectedAccountRefreshTokensService { ); } } catch (error) { - if (error?.name === 'AggregateError') { - const firstError = error?.errors?.[0]; - - this.logger.log(firstError); - - if (isAxiosTemporaryError(error)) { - throw new ConnectedAccountRefreshAccessTokenException( - `Error refreshing tokens for connected account ${connectedAccount.id.slice(0, 7)} in workspace ${workspaceId.slice(0, 7)}: ${firstError.code}`, - ConnectedAccountRefreshAccessTokenExceptionCode.TEMPORARY_NETWORK_ERROR, - ); - } - } - if (isAxiosTemporaryError(error)) { throw new ConnectedAccountRefreshAccessTokenException( `Error refreshing tokens for connected account ${connectedAccount.id.slice(0, 7)} in workspace ${workspaceId.slice(0, 7)}: ${error.code}`, @@ -158,10 +153,7 @@ export class ConnectedAccountRefreshTokensService { `Error while refreshing tokens on connected account ${connectedAccount.id.slice(0, 7)} in workspace ${workspaceId.slice(0, 7)}`, error, ); - throw new ConnectedAccountRefreshAccessTokenException( - `Error refreshing tokens for connected account ${connectedAccount.id.slice(0, 7)} in workspace ${workspaceId.slice(0, 7)}: ${error.message} ${error?.response?.data?.error_description}`, - ConnectedAccountRefreshAccessTokenExceptionCode.REFRESH_ACCESS_TOKEN_FAILED, - ); + throw error; } } } diff --git a/packages/twenty-server/src/modules/connected-account/standard-objects/connected-account.workspace-entity.ts b/packages/twenty-server/src/modules/connected-account/standard-objects/connected-account.workspace-entity.ts index de18b2a148107..44375eb607eb7 100644 --- a/packages/twenty-server/src/modules/connected-account/standard-objects/connected-account.workspace-entity.ts +++ b/packages/twenty-server/src/modules/connected-account/standard-objects/connected-account.workspace-entity.ts @@ -70,6 +70,16 @@ export class ConnectedAccountWorkspaceEntity extends BaseWorkspaceEntity { }) refreshToken: string; + @WorkspaceField({ + standardId: CONNECTED_ACCOUNT_STANDARD_FIELD_IDS.lastCredentialsRefreshedAt, + type: FieldMetadataType.DATE_TIME, + label: msg`Last credentials refreshed at`, + description: msg`Last credentials refreshed at`, + icon: 'IconHistory', + }) + @WorkspaceIsNullable() + lastCredentialsRefreshedAt: Date | null; + @WorkspaceField({ standardId: CONNECTED_ACCOUNT_STANDARD_FIELD_IDS.lastSyncHistoryId, type: FieldMetadataType.TEXT, diff --git a/packages/twenty-server/src/modules/messaging/common/services/message-channel-sync-status.service.ts b/packages/twenty-server/src/modules/messaging/common/services/message-channel-sync-status.service.ts index 51bc3ae09242c..094e4858f98a2 100644 --- a/packages/twenty-server/src/modules/messaging/common/services/message-channel-sync-status.service.ts +++ b/packages/twenty-server/src/modules/messaging/common/services/message-channel-sync-status.service.ts @@ -190,31 +190,35 @@ export class MessageChannelSyncStatusService { eventIds: messageChannelIds, }); - const connectedAccountRepository = - await this.twentyORMManager.getRepository( - 'connectedAccount', + if ( + syncStatus === MessageChannelSyncStatus.FAILED_INSUFFICIENT_PERMISSIONS + ) { + const connectedAccountRepository = + await this.twentyORMManager.getRepository( + 'connectedAccount', + ); + + const messageChannels = await messageChannelRepository.find({ + select: ['id', 'connectedAccountId'], + where: { id: Any(messageChannelIds) }, + }); + + const connectedAccountIds = messageChannels.map( + (messageChannel) => messageChannel.connectedAccountId, ); - const messageChannels = await messageChannelRepository.find({ - select: ['id', 'connectedAccountId'], - where: { id: Any(messageChannelIds) }, - }); - - const connectedAccountIds = messageChannels.map( - (messageChannel) => messageChannel.connectedAccountId, - ); - - await connectedAccountRepository.update( - { id: Any(connectedAccountIds) }, - { - authFailedAt: new Date(), - }, - ); + await connectedAccountRepository.update( + { id: Any(connectedAccountIds) }, + { + authFailedAt: new Date(), + }, + ); - await this.addToAccountsToReconnect( - messageChannels.map((messageChannel) => messageChannel.id), - workspaceId, - ); + await this.addToAccountsToReconnect( + messageChannels.map((messageChannel) => messageChannel.id), + workspaceId, + ); + } } private async addToAccountsToReconnect( diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/commands/messaging-relaunch-failed-message-channels.command.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/commands/messaging-relaunch-failed-message-channels.command.ts deleted file mode 100644 index 420bcc293247c..0000000000000 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/commands/messaging-relaunch-failed-message-channels.command.ts +++ /dev/null @@ -1,84 +0,0 @@ -import { InjectRepository } from '@nestjs/typeorm'; - -import { Command } from 'nest-commander'; -import { Repository } from 'typeorm'; - -import { - ActiveOrSuspendedWorkspacesMigrationCommandRunner, - type RunOnWorkspaceArgs, -} from 'src/database/commands/command-runners/active-or-suspended-workspaces-migration.command-runner'; -import { WorkspaceEntity } from 'src/engine/core-modules/workspace/workspace.entity'; -import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; -import { AccountsToReconnectService } from 'src/modules/connected-account/services/accounts-to-reconnect.service'; -import { - MessageChannelSyncStage, - MessageChannelSyncStatus, - MessageChannelWorkspaceEntity, -} from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; - -@Command({ - name: 'messaging:relaunch-failed-message-channels', - description: 'Relaunch failed message channels', -}) -export class MessagingRelaunchFailedMessageChannelsCommand extends ActiveOrSuspendedWorkspacesMigrationCommandRunner { - constructor( - @InjectRepository(WorkspaceEntity) - protected readonly workspaceRepository: Repository, - protected readonly twentyORMGlobalManager: TwentyORMGlobalManager, - protected readonly accountsToReconnectService: AccountsToReconnectService, - ) { - super(workspaceRepository, twentyORMGlobalManager); - } - - override async runOnWorkspace({ - workspaceId, - options, - }: RunOnWorkspaceArgs): Promise { - try { - const messageChannelRepository = - await this.twentyORMGlobalManager.getRepositoryForWorkspace( - workspaceId, - 'messageChannel', - { shouldBypassPermissionChecks: true }, - ); - - const failedMessageChannels = await messageChannelRepository.find({ - where: { - syncStage: MessageChannelSyncStage.FAILED, - }, - relations: { - connectedAccount: { - accountOwner: true, - }, - }, - }); - - if (!options.dryRun && failedMessageChannels.length > 0) { - await messageChannelRepository.update( - failedMessageChannels.map(({ id }) => id), - { - syncStage: MessageChannelSyncStage.MESSAGE_LIST_FETCH_PENDING, - syncStatus: MessageChannelSyncStatus.ACTIVE, - }, - ); - - for (const failedMessageChannel of failedMessageChannels) { - await this.accountsToReconnectService.removeAccountToReconnect( - failedMessageChannel.connectedAccount.accountOwner.userId, - failedMessageChannel.connectedAccountId, - workspaceId, - ); - } - } - - this.logger.log( - `${options.dryRun ? ' (DRY RUN) ' : ''}Relaunched ${failedMessageChannels.length} failed message channels`, - ); - } catch (error) { - this.logger.error( - 'Error while relaunching failed message channels', - error, - ); - } - } -} diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/crons/commands/messaging-relaunch-failed-message-channels.cron.command.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/crons/commands/messaging-relaunch-failed-message-channels.cron.command.ts new file mode 100644 index 0000000000000..08cb88ed65751 --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/crons/commands/messaging-relaunch-failed-message-channels.cron.command.ts @@ -0,0 +1,35 @@ +import { Command, CommandRunner } from 'nest-commander'; + +import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator'; +import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants'; +import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service'; +import { + MESSAGING_RELAUNCH_FAILED_MESSAGE_CHANNELS_CRON_PATTERN, + MessagingRelaunchFailedMessageChannelsCronJob, +} from 'src/modules/messaging/message-import-manager/crons/jobs/messaging-relaunch-failed-message-channels.cron.job'; + +@Command({ + name: 'cron:messaging:relaunch-failed-message-channels', + description: + 'Starts a cron job to relaunch failed message channels every 30 minutes', +}) +export class MessagingRelaunchFailedMessageChannelsCronCommand extends CommandRunner { + constructor( + @InjectMessageQueue(MessageQueue.cronQueue) + private readonly messageQueueService: MessageQueueService, + ) { + super(); + } + + async run(): Promise { + await this.messageQueueService.addCron({ + jobName: MessagingRelaunchFailedMessageChannelsCronJob.name, + data: undefined, + options: { + repeat: { + pattern: MESSAGING_RELAUNCH_FAILED_MESSAGE_CHANNELS_CRON_PATTERN, + }, + }, + }); + } +} diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-relaunch-failed-message-channels.cron.job.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-relaunch-failed-message-channels.cron.job.ts new file mode 100644 index 0000000000000..037c159442bb1 --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-relaunch-failed-message-channels.cron.job.ts @@ -0,0 +1,74 @@ +import { InjectDataSource, InjectRepository } from '@nestjs/typeorm'; + +import { WorkspaceActivationStatus } from 'twenty-shared/workspace'; +import { DataSource, Repository } from 'typeorm'; + +import { SentryCronMonitor } from 'src/engine/core-modules/cron/sentry-cron-monitor.decorator'; +import { ExceptionHandlerService } from 'src/engine/core-modules/exception-handler/exception-handler.service'; +import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator'; +import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator'; +import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator'; +import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants'; +import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service'; +import { WorkspaceEntity } from 'src/engine/core-modules/workspace/workspace.entity'; +import { getWorkspaceSchemaName } from 'src/engine/workspace-datasource/utils/get-workspace-schema-name.util'; +import { MessageChannelSyncStage } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; +import { + MessagingRelaunchFailedMessageChannelJob, + type MessagingRelaunchFailedMessageChannelJobData, +} from 'src/modules/messaging/message-import-manager/jobs/messaging-relaunch-failed-message-channel.job'; + +export const MESSAGING_RELAUNCH_FAILED_MESSAGE_CHANNELS_CRON_PATTERN = + '*/30 * * * *'; + +@Processor(MessageQueue.cronQueue) +export class MessagingRelaunchFailedMessageChannelsCronJob { + constructor( + @InjectRepository(WorkspaceEntity) + private readonly workspaceRepository: Repository, + @InjectMessageQueue(MessageQueue.messagingQueue) + private readonly messageQueueService: MessageQueueService, + @InjectDataSource() + private readonly coreDataSource: DataSource, + private readonly exceptionHandlerService: ExceptionHandlerService, + ) {} + + @Process(MessagingRelaunchFailedMessageChannelsCronJob.name) + @SentryCronMonitor( + MessagingRelaunchFailedMessageChannelsCronJob.name, + MESSAGING_RELAUNCH_FAILED_MESSAGE_CHANNELS_CRON_PATTERN, + ) + async handle(): Promise { + const activeWorkspaces = await this.workspaceRepository.find({ + where: { + activationStatus: WorkspaceActivationStatus.ACTIVE, + }, + }); + + for (const activeWorkspace of activeWorkspaces) { + try { + const schemaName = getWorkspaceSchemaName(activeWorkspace.id); + + const failedMessageChannels = await this.coreDataSource.query( + `SELECT * FROM ${schemaName}."messageChannel" WHERE "syncStage" = '${MessageChannelSyncStage.FAILED}'`, + ); + + for (const messageChannel of failedMessageChannels) { + await this.messageQueueService.add( + MessagingRelaunchFailedMessageChannelJob.name, + { + workspaceId: activeWorkspace.id, + messageChannelId: messageChannel.id, + }, + ); + } + } catch (error) { + this.exceptionHandlerService.captureExceptions([error], { + workspace: { + id: activeWorkspace.id, + }, + }); + } + } + } +} diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-relaunch-failed-message-channel.job.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-relaunch-failed-message-channel.job.ts new file mode 100644 index 0000000000000..9532d8b2ac91e --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-relaunch-failed-message-channel.job.ts @@ -0,0 +1,69 @@ +import { Scope } from '@nestjs/common'; + +import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator'; +import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator'; +import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants'; +import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; +import { AccountsToReconnectService } from 'src/modules/connected-account/services/accounts-to-reconnect.service'; +import { + MessageChannelSyncStage, + MessageChannelSyncStatus, + MessageChannelWorkspaceEntity, +} from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; + +export type MessagingRelaunchFailedMessageChannelJobData = { + workspaceId: string; + messageChannelId: string; +}; + +@Processor({ + queueName: MessageQueue.messagingQueue, + scope: Scope.REQUEST, +}) +export class MessagingRelaunchFailedMessageChannelJob { + constructor( + private readonly twentyORMGlobalManager: TwentyORMGlobalManager, + private readonly accountsToReconnectService: AccountsToReconnectService, + ) {} + + @Process(MessagingRelaunchFailedMessageChannelJob.name) + async handle(data: MessagingRelaunchFailedMessageChannelJobData) { + const { workspaceId, messageChannelId } = data; + + const messageChannelRepository = + await this.twentyORMGlobalManager.getRepositoryForWorkspace( + workspaceId, + 'messageChannel', + { shouldBypassPermissionChecks: true }, + ); + + const messageChannel = await messageChannelRepository.findOne({ + where: { + id: messageChannelId, + }, + relations: { + connectedAccount: { + accountOwner: true, + }, + }, + }); + + if ( + !messageChannel || + messageChannel.syncStage !== MessageChannelSyncStage.FAILED + ) { + return; + } + + await messageChannelRepository.update(messageChannelId, { + syncStage: MessageChannelSyncStage.MESSAGE_LIST_FETCH_PENDING, + syncStatus: MessageChannelSyncStatus.ACTIVE, + }); + + await this.accountsToReconnectService.removeAccountToReconnect( + messageChannel.connectedAccount.accountOwner.userId, + messageChannel.connectedAccountId, + workspaceId, + ); + } +} diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/messaging-import-manager.module.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/messaging-import-manager.module.ts index b7b846df91ba7..885afbf4aeed6 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/messaging-import-manager.module.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/messaging-import-manager.module.ts @@ -14,14 +14,15 @@ import { RefreshTokensManagerModule } from 'src/modules/connected-account/refres import { MessagingCommonModule } from 'src/modules/messaging/common/messaging-common.module'; import { MessagingMessageCleanerModule } from 'src/modules/messaging/message-cleaner/messaging-message-cleaner.module'; import { MessagingFolderSyncManagerModule } from 'src/modules/messaging/message-folder-manager/messaging-folder-sync-manager.module'; -import { MessagingRelaunchFailedMessageChannelsCommand } from 'src/modules/messaging/message-import-manager/commands/messaging-relaunch-failed-message-channels.command'; import { MessagingSingleMessageImportCommand } from 'src/modules/messaging/message-import-manager/commands/messaging-single-message-import.command'; import { MessagingMessageListFetchCronCommand } from 'src/modules/messaging/message-import-manager/crons/commands/messaging-message-list-fetch.cron.command'; import { MessagingMessagesImportCronCommand } from 'src/modules/messaging/message-import-manager/crons/commands/messaging-messages-import.cron.command'; import { MessagingOngoingStaleCronCommand } from 'src/modules/messaging/message-import-manager/crons/commands/messaging-ongoing-stale.cron.command'; +import { MessagingRelaunchFailedMessageChannelsCronCommand } from 'src/modules/messaging/message-import-manager/crons/commands/messaging-relaunch-failed-message-channels.cron.command'; import { MessagingMessageListFetchCronJob } from 'src/modules/messaging/message-import-manager/crons/jobs/messaging-message-list-fetch.cron.job'; import { MessagingMessagesImportCronJob } from 'src/modules/messaging/message-import-manager/crons/jobs/messaging-messages-import.cron.job'; import { MessagingOngoingStaleCronJob } from 'src/modules/messaging/message-import-manager/crons/jobs/messaging-ongoing-stale.cron.job'; +import { MessagingRelaunchFailedMessageChannelsCronJob } from 'src/modules/messaging/message-import-manager/crons/jobs/messaging-relaunch-failed-message-channels.cron.job'; import { MessagingGmailDriverModule } from 'src/modules/messaging/message-import-manager/drivers/gmail/messaging-gmail-driver.module'; import { MessagingIMAPDriverModule } from 'src/modules/messaging/message-import-manager/drivers/imap/messaging-imap-driver.module'; import { MessagingMicrosoftDriverModule } from 'src/modules/messaging/message-import-manager/drivers/microsoft/messaging-microsoft-driver.module'; @@ -31,6 +32,7 @@ import { MessagingCleanCacheJob } from 'src/modules/messaging/message-import-man import { MessagingMessageListFetchJob } from 'src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job'; import { MessagingMessagesImportJob } from 'src/modules/messaging/message-import-manager/jobs/messaging-messages-import.job'; import { MessagingOngoingStaleJob } from 'src/modules/messaging/message-import-manager/jobs/messaging-ongoing-stale.job'; +import { MessagingRelaunchFailedMessageChannelJob } from 'src/modules/messaging/message-import-manager/jobs/messaging-relaunch-failed-message-channel.job'; import { MessagingMessageImportManagerMessageChannelListener } from 'src/modules/messaging/message-import-manager/listeners/messaging-import-manager-message-channel.listener'; import { MessagingAccountAuthenticationService } from 'src/modules/messaging/message-import-manager/services/messaging-account-authentication.service'; import { MessagingCursorService } from 'src/modules/messaging/message-import-manager/services/messaging-cursor.service'; @@ -72,14 +74,16 @@ import { MessagingMonitoringModule } from 'src/modules/messaging/monitoring/mess MessagingMessageListFetchCronCommand, MessagingMessagesImportCronCommand, MessagingOngoingStaleCronCommand, + MessagingRelaunchFailedMessageChannelsCronCommand, MessagingSingleMessageImportCommand, - MessagingRelaunchFailedMessageChannelsCommand, MessagingMessageListFetchJob, MessagingMessagesImportJob, MessagingOngoingStaleJob, + MessagingRelaunchFailedMessageChannelJob, MessagingMessageListFetchCronJob, MessagingMessagesImportCronJob, MessagingOngoingStaleCronJob, + MessagingRelaunchFailedMessageChannelsCronJob, MessagingAddSingleMessageToCacheForImportJob, MessagingMessageImportManagerMessageChannelListener, MessagingCleanCacheJob, @@ -99,6 +103,7 @@ import { MessagingMonitoringModule } from 'src/modules/messaging/monitoring/mess MessagingMessageListFetchCronCommand, MessagingMessagesImportCronCommand, MessagingOngoingStaleCronCommand, + MessagingRelaunchFailedMessageChannelsCronCommand, ], }) export class MessagingImportManagerModule {} diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-account-authentication.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-account-authentication.service.ts index bbc018bdf0173..82f561a1d8818 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-account-authentication.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-account-authentication.service.ts @@ -98,6 +98,7 @@ export class MessagingAccountAuthenticationService { ); case ConnectedAccountRefreshAccessTokenExceptionCode.REFRESH_ACCESS_TOKEN_FAILED: case ConnectedAccountRefreshAccessTokenExceptionCode.REFRESH_TOKEN_NOT_FOUND: + case ConnectedAccountRefreshAccessTokenExceptionCode.INVALID_REFRESH_TOKEN: await this.messagingMonitoringService.track({ eventName: `refresh_token.error.insufficient_permissions`, workspaceId, From 3b133fb9f8afb6a5871ff85b47cac9287aa0e466 Mon Sep 17 00:00:00 2001 From: neo773 Date: Tue, 4 Nov 2025 05:00:50 +0530 Subject: [PATCH 2/5] rename var --- .../services/connected-account-refresh-tokens.service.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/twenty-server/src/modules/connected-account/refresh-tokens-manager/services/connected-account-refresh-tokens.service.ts b/packages/twenty-server/src/modules/connected-account/refresh-tokens-manager/services/connected-account-refresh-tokens.service.ts index b9b30c4eb6949..3b3853f46d2e5 100644 --- a/packages/twenty-server/src/modules/connected-account/refresh-tokens-manager/services/connected-account-refresh-tokens.service.ts +++ b/packages/twenty-server/src/modules/connected-account/refresh-tokens-manager/services/connected-account-refresh-tokens.service.ts @@ -18,7 +18,7 @@ export type ConnectedAccountTokens = { refreshToken: string; }; -const MESSAGING_CONNECTED_ACCOUNT_ACCESS_TOKEN_EXPIRATION = 1000 * 60 * 60; +const CONNECTED_ACCOUNT_ACCESS_TOKEN_EXPIRATION = 1000 * 60 * 60; @Injectable() export class ConnectedAccountRefreshTokensService { @@ -98,7 +98,7 @@ export class ConnectedAccountRefreshTokensService { const BUFFER_TIME = 5 * 60 * 1000; const tokenExpirationTime = - MESSAGING_CONNECTED_ACCOUNT_ACCESS_TOKEN_EXPIRATION - BUFFER_TIME; + CONNECTED_ACCOUNT_ACCESS_TOKEN_EXPIRATION - BUFFER_TIME; return ( connectedAccount.lastCredentialsRefreshedAt > From db1779f9b812f19f7fc9edaa1d461ae66c2979fd Mon Sep 17 00:00:00 2001 From: neo773 Date: Tue, 4 Nov 2025 05:11:05 +0530 Subject: [PATCH 3/5] fix test --- .../services/connected-account-refresh-tokens.service.spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/twenty-server/src/modules/connected-account/refresh-tokens-manager/services/connected-account-refresh-tokens.service.spec.ts b/packages/twenty-server/src/modules/connected-account/refresh-tokens-manager/services/connected-account-refresh-tokens.service.spec.ts index 664bfa8f65948..f176eb2f9341d 100644 --- a/packages/twenty-server/src/modules/connected-account/refresh-tokens-manager/services/connected-account-refresh-tokens.service.spec.ts +++ b/packages/twenty-server/src/modules/connected-account/refresh-tokens-manager/services/connected-account-refresh-tokens.service.spec.ts @@ -256,7 +256,7 @@ describe('ConnectedAccountRefreshTokensService', () => { message: expect.stringContaining( 'Microsoft OAuth error: invalid_grant - Token has been revoked', ), - code: ConnectedAccountRefreshAccessTokenExceptionCode.REFRESH_ACCESS_TOKEN_FAILED, + code: ConnectedAccountRefreshAccessTokenExceptionCode.INVALID_REFRESH_TOKEN, }); }); From d5073cfd10296f12ed72448c8475ce6b09b2279d Mon Sep 17 00:00:00 2001 From: neo773 Date: Wed, 5 Nov 2025 01:42:53 +0530 Subject: [PATCH 4/5] change --- .../services/google-api-refresh-tokens.service.ts | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/packages/twenty-server/src/modules/connected-account/refresh-tokens-manager/drivers/google/services/google-api-refresh-tokens.service.ts b/packages/twenty-server/src/modules/connected-account/refresh-tokens-manager/drivers/google/services/google-api-refresh-tokens.service.ts index 14b7ffc30e7f0..14a8b7100d91f 100644 --- a/packages/twenty-server/src/modules/connected-account/refresh-tokens-manager/drivers/google/services/google-api-refresh-tokens.service.ts +++ b/packages/twenty-server/src/modules/connected-account/refresh-tokens-manager/drivers/google/services/google-api-refresh-tokens.service.ts @@ -2,6 +2,7 @@ import { Injectable } from '@nestjs/common'; import { GaxiosError } from 'gaxios'; import { google } from 'googleapis'; +import { isDefined } from 'twenty-shared/utils'; import { TwentyConfigService } from 'src/engine/core-modules/twenty-config/twenty-config.service'; import { @@ -26,8 +27,15 @@ export class GoogleAPIRefreshAccessTokenService { try { const { token } = await oAuth2Client.getAccessToken(); + if (!isDefined(token)) { + throw new ConnectedAccountRefreshAccessTokenException( + 'Error refreshing Google tokens: Invalid refresh token', + ConnectedAccountRefreshAccessTokenExceptionCode.INVALID_REFRESH_TOKEN, + ); + } + return { - accessToken: token as string, + accessToken: token, refreshToken, }; } catch (error) { From d5fed734e88742ae620da4a1bf7a26959cafc0b2 Mon Sep 17 00:00:00 2001 From: neo773 Date: Wed, 5 Nov 2025 01:46:13 +0530 Subject: [PATCH 5/5] remove accountsToReconnectService --- .../jobs/calendar-relaunch-failed-calendar-channel.job.ts | 8 -------- .../jobs/messaging-relaunch-failed-message-channel.job.ts | 8 -------- 2 files changed, 16 deletions(-) diff --git a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/jobs/calendar-relaunch-failed-calendar-channel.job.ts b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/jobs/calendar-relaunch-failed-calendar-channel.job.ts index fb64cf69f6e0d..619d6dfd12de4 100644 --- a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/jobs/calendar-relaunch-failed-calendar-channel.job.ts +++ b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/jobs/calendar-relaunch-failed-calendar-channel.job.ts @@ -9,7 +9,6 @@ import { CalendarChannelSyncStatus, CalendarChannelWorkspaceEntity, } from 'src/modules/calendar/common/standard-objects/calendar-channel.workspace-entity'; -import { AccountsToReconnectService } from 'src/modules/connected-account/services/accounts-to-reconnect.service'; export type CalendarRelaunchFailedCalendarChannelJobData = { workspaceId: string; @@ -23,7 +22,6 @@ export type CalendarRelaunchFailedCalendarChannelJobData = { export class CalendarRelaunchFailedCalendarChannelJob { constructor( private readonly twentyORMGlobalManager: TwentyORMGlobalManager, - private readonly accountsToReconnectService: AccountsToReconnectService, ) {} @Process(CalendarRelaunchFailedCalendarChannelJob.name) @@ -59,11 +57,5 @@ export class CalendarRelaunchFailedCalendarChannelJob { syncStage: CalendarChannelSyncStage.CALENDAR_EVENT_LIST_FETCH_PENDING, syncStatus: CalendarChannelSyncStatus.ACTIVE, }); - - await this.accountsToReconnectService.removeAccountToReconnect( - calendarChannel.connectedAccount.accountOwner.userId, - calendarChannel.connectedAccountId, - workspaceId, - ); } } diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-relaunch-failed-message-channel.job.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-relaunch-failed-message-channel.job.ts index 9532d8b2ac91e..6326eb64868de 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-relaunch-failed-message-channel.job.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-relaunch-failed-message-channel.job.ts @@ -4,7 +4,6 @@ import { Process } from 'src/engine/core-modules/message-queue/decorators/proces import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator'; import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants'; import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; -import { AccountsToReconnectService } from 'src/modules/connected-account/services/accounts-to-reconnect.service'; import { MessageChannelSyncStage, MessageChannelSyncStatus, @@ -23,7 +22,6 @@ export type MessagingRelaunchFailedMessageChannelJobData = { export class MessagingRelaunchFailedMessageChannelJob { constructor( private readonly twentyORMGlobalManager: TwentyORMGlobalManager, - private readonly accountsToReconnectService: AccountsToReconnectService, ) {} @Process(MessagingRelaunchFailedMessageChannelJob.name) @@ -59,11 +57,5 @@ export class MessagingRelaunchFailedMessageChannelJob { syncStage: MessageChannelSyncStage.MESSAGE_LIST_FETCH_PENDING, syncStatus: MessageChannelSyncStatus.ACTIVE, }); - - await this.accountsToReconnectService.removeAccountToReconnect( - messageChannel.connectedAccount.accountOwner.userId, - messageChannel.connectedAccountId, - workspaceId, - ); } }