From 7b5285aebdc16a1773b24361ab62192ab8b5d246 Mon Sep 17 00:00:00 2001 From: jkay Date: Wed, 25 Oct 2023 10:50:40 +1300 Subject: [PATCH] remove-stale-msg-flag --- src/config/feature-flags.ts | 1 - src/sqs/sqs.test.ts | 122 +----------------------------------- src/sqs/sqs.ts | 43 +------------ 3 files changed, 2 insertions(+), 164 deletions(-) diff --git a/src/config/feature-flags.ts b/src/config/feature-flags.ts index 4a78130dc0..4b5a5365eb 100644 --- a/src/config/feature-flags.ts +++ b/src/config/feature-flags.ts @@ -17,7 +17,6 @@ export enum BooleanFlags { VERBOSE_LOGGING = "verbose-logging", SEND_PR_COMMENTS_TO_JIRA = "send-pr-comments-to-jira_zy5ib", JIRA_ADMIN_CHECK = "jira-admin-check", - REMOVE_STALE_MESSAGES = "remove-stale-messages", USE_NEW_PULL_ALGO = "use-new-pull-algo", USE_DYNAMODB_FOR_DEPLOYMENT_WEBHOOK = "use-dynamodb-for-deployment-webhook", USE_DYNAMODB_FOR_DEPLOYMENT_BACKFILL = "use-dynamodb-for-deployment-backfill", diff --git a/src/sqs/sqs.test.ts b/src/sqs/sqs.test.ts index 20a0a1b382..1b21e930a4 100644 --- a/src/sqs/sqs.test.ts +++ b/src/sqs/sqs.test.ts @@ -4,10 +4,9 @@ import { waitUntil } from "test/utils/wait-until"; import { statsd } from "config/statsd"; import { sqsQueueMetrics } from "config/metric-names"; import { AWSError, Request as AwsRequest, Service, Response } from "aws-sdk"; -import { BaseMessagePayload, SQSMessageContext } from "~/src/sqs/sqs.types"; +import { BaseMessagePayload } from "~/src/sqs/sqs.types"; import { preemptiveRateLimitCheck } from "utils/preemptive-rate-limit"; import { when } from "jest-when"; -import { booleanFlag, BooleanFlags } from "config/feature-flags"; import { SendMessageResult } from "aws-sdk/clients/sqs"; jest.mock("config/feature-flags"); @@ -59,10 +58,6 @@ describe("SQS", () => { // eslint-disable-next-line @typescript-eslint/no-unsafe-argument .calledWith(expect.anything(), expect.anything()) .mockResolvedValue({ isExceedThreshold: false }); - when(booleanFlag).calledWith( - BooleanFlags.REMOVE_STALE_MESSAGES, - jiraHost - ).mockResolvedValue(true); }); afterEach(async () => { @@ -252,119 +247,4 @@ describe("SQS", () => { })); }); }); - - describe("deleteStaleMessages", () => { - - // Mock the SQSMessageContext object - const context = { - log: { - warn: jest.fn(), - error: jest.fn() - } - } as unknown as SQSMessageContext; - - beforeEach(() => { - queue = createSqsQueue(1); - queue.start(); - when(booleanFlag).calledWith( - BooleanFlags.REMOVE_STALE_MESSAGES, - jiraHost - ).mockResolvedValue(true); - }); - - // Test case for when feature flag is turned off - it("should return false when feature flag is false", async () => { - when(booleanFlag).calledWith( - BooleanFlags.REMOVE_STALE_MESSAGES, - jiraHost - ).mockResolvedValue(false); - const message = { - Body: JSON.stringify({ - webhookReceived: Date.now() - 2 * 24 * 60 * 60 * 1000 // Two days ago - }), - MessageId: "12345" - }; - - const result = await queue.deleteStaleMessages(message, context, jiraHost); - expect(result).toBe(false); - }); - - // Test case for when the message is not from the targeted queue - it("should return false when message is not from targeted queue", async () => { - const message = { - Body: JSON.stringify({}), - MessageId: "12345" - }; - const result = await queue.deleteStaleMessages(message, context, jiraHost); - expect(result).toBe(false); - }); - - // Test case for when the message does not have a body - it("should return false when message has no body", async () => { - const message = { - MessageId: "12345" - }; - const result = await queue.deleteStaleMessages(message, context, jiraHost); - expect(result).toBe(false); - }); - - // Test case for when the message is from the targeted queue and is stale - it("should delete stale message and return true", async () => { - const message = { - Body: JSON.stringify({ - webhookReceived: Date.now() - 2 * 24 * 60 * 60 * 1000 // Two days ago - }), - MessageId: "12345" - }; - const deleteMessage = jest.fn(); - const mockThis = { - queueName: "deployment", - deleteMessage - }; - const result = await queue.deleteStaleMessages.call(mockThis, message, context, jiraHost); - expect(result).toBe(true); - expect(deleteMessage).toHaveBeenCalledWith(context); - // eslint-disable-next-line @typescript-eslint/unbound-method - expect(context.log.warn).toHaveBeenCalledWith( - { deletedMessageId: "12345" }, - "Deleted stale message from deployment queue" - ); - }); - - // Test case for when the message is from the targeted queue and is not stale - it("should return false when message is not stale", async () => { - const message = { - Body: JSON.stringify({ - webhookReceived: Date.now() - 12 * 60 * 60 * 1000 // 12 hours ago - }), - MessageId: "12345" - }; - const result = await queue.deleteStaleMessages(message, context, jiraHost); - expect(result).toBe(false); - }); - - // Test case for when deleting the message fails - it("should return false and log an error when deleting the message fails", async () => { - const message = { - Body: JSON.stringify({ - webhookReceived: Date.now() - 2 * 24 * 60 * 60 * 1000 // Two days ago - }), - MessageId: "12345" - }; - const deleteMessage = jest.fn().mockRejectedValue(new Error("Failed to delete message")); - const mockThis = { - queueName: "deployment", - deleteMessage - }; - const result = await queue.deleteStaleMessages.call(mockThis, message, context, jiraHost); - expect(result).toBe(false); - expect(deleteMessage).toHaveBeenCalledWith(context); - // eslint-disable-next-line @typescript-eslint/unbound-method - expect(context.log.error).toHaveBeenCalledWith( - // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment - { error: expect.any(Error), deletedMessageId: "12345" }, - "Failed to delete stale message from deployment queue" - ); - }); - }); }); diff --git a/src/sqs/sqs.ts b/src/sqs/sqs.ts index d5d868f87f..66a70a5cfe 100644 --- a/src/sqs/sqs.ts +++ b/src/sqs/sqs.ts @@ -6,7 +6,7 @@ import { v4 as uuidv4 } from "uuid"; import { statsd } from "config/statsd"; import { sqsQueueMetrics } from "config/metric-names"; import { ErrorHandler, ErrorHandlingResult, MessageHandler, QueueSettings, SQSContext, SQSMessageContext, BaseMessagePayload, SqsTimeoutError } from "~/src/sqs/sqs.types"; -import { booleanFlag, BooleanFlags, stringFlag, StringFlags } from "config/feature-flags"; +import { stringFlag, StringFlags } from "config/feature-flags"; import { preemptiveRateLimitCheck } from "utils/preemptive-rate-limit"; //Maximum SQS Delay according to SQS docs https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-delay-queues.html @@ -17,7 +17,6 @@ const MAX_MESSAGE_VISIBILITY_TIMEOUT_SEC: number = 12 * 60 * 60 - 1; const DEFAULT_LONG_POLLING_INTERVAL = 4; const PROCESSING_DURATION_HISTOGRAM_BUCKETS = "10_100_500_1000_2000_3000_5000_10000_30000_60000"; const EXTRA_VISIBILITY_TIMEOUT_DELAY = 2; -const ONE_DAY_MILLI = 24 * 60 * 60 * 1000; const isNotAFailure = (errorHandlingResult: ErrorHandlingResult) => { return !errorHandlingResult.isFailure; @@ -244,45 +243,6 @@ export class SqsQueue { } } - public async deleteStaleMessages(message: Message, context: SQSMessageContext, jiraHost?: string): Promise { - if (!await booleanFlag(BooleanFlags.REMOVE_STALE_MESSAGES, jiraHost)) { - return false; - } - const TARGETED_QUEUES = ["deployment"]; - if (!message?.Body || !TARGETED_QUEUES.includes(this.queueName)) { - return false; - } - - const messageBody = JSON.parse(message.Body) as { webhookReceived?: number }; - const webhookReceived = messageBody?.webhookReceived; - if (!webhookReceived) { - context.log.warn( - { deletedMessageId: message.MessageId }, - `No webhookReceived timestamp found in message from ${this.queueName} queue` - ); - return false; - } - - if (Date.now() - webhookReceived > ONE_DAY_MILLI) { - try { - await this.deleteMessage(context); - context.log.warn( - { deletedMessageId: message.MessageId }, - `Deleted stale message from ${this.queueName} queue` - ); - return true; - } catch (error: unknown) { - context.log.error( - { error, deletedMessageId: message.MessageId }, - `Failed to delete stale message from ${this.queueName} queue` - ); - return false; - } - } - - return false; - } - private async executeMessage(message: Message, listenerContext: SQSContext): Promise { // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment const payload: MessagePayload = message.Body ? JSON.parse(message.Body) : {}; @@ -324,7 +284,6 @@ export class SqsQueue { try { const messageProcessingStartTime = Date.now(); - if (await this.deleteStaleMessages(message, context, payload?.jiraHost)) return; const rateLimitCheckResult = await preemptiveRateLimitCheck(context, this); if (rateLimitCheckResult.isExceedThreshold) {