diff --git a/.eslintignore b/.eslintignore index de1e10e..5afcfba 100644 --- a/.eslintignore +++ b/.eslintignore @@ -1,2 +1,3 @@ dist .eslintrc +extern/ diff --git a/.gitignore b/.gitignore index 8048c4b..6208801 100644 --- a/.gitignore +++ b/.gitignore @@ -47,3 +47,4 @@ data/scheme-size* notes/ archive-processor/downloaded-rdf archive-processor/local-kms-csv +extern diff --git a/bin/env/local_env.sh b/bin/env/local_env.sh index 23dfeac..ee5dd7b 100644 --- a/bin/env/local_env.sh +++ b/bin/env/local_env.sh @@ -17,5 +17,5 @@ export LOCALSTACK_CONTAINER_NAME="${LOCALSTACK_CONTAINER_NAME:-kms-localstack}" export LOCALSTACK_IMAGE="${LOCALSTACK_IMAGE:-localstack/localstack:3.8.1}" export LOCALSTACK_PORT="${LOCALSTACK_PORT:-4566}" export AWS_ENDPOINT_URL="${AWS_ENDPOINT_URL:-http://localstack:${LOCALSTACK_PORT}}" -export SAM_WARM_CONTAINERS="${SAM_WARM_CONTAINERS:-EAGER}" +export SAM_WARM_CONTAINERS="${SAM_WARM_CONTAINERS:-LAZY}" export SAM_LOCAL_WATCH="${SAM_LOCAL_WATCH:-false}" diff --git a/cdk/app/lib/CmrEventProcessingStack.ts b/cdk/app/lib/CmrEventProcessingStack.ts index 5197ecc..1c60be2 100644 --- a/cdk/app/lib/CmrEventProcessingStack.ts +++ b/cdk/app/lib/CmrEventProcessingStack.ts @@ -1,16 +1,10 @@ -import * as path from 'path' - import * as cdk from 'aws-cdk-lib' -import * as iam from 'aws-cdk-lib/aws-iam' -import * as lambda from 'aws-cdk-lib/aws-lambda' -import * as eventsources from 'aws-cdk-lib/aws-lambda-event-sources' -import { NodejsFunction } from 'aws-cdk-lib/aws-lambda-nodejs' import * as sns from 'aws-cdk-lib/aws-sns' -import * as subscriptions from 'aws-cdk-lib/aws-sns-subscriptions' -import * as sqs from 'aws-cdk-lib/aws-sqs' import { Construct } from 'constructs' +import { CmrKeywordEventsListenerSetup } from './helper/CmrKeywordEventsListenerSetup' import { LogForwardingSetup } from './helper/LogForwardingSetup' +import { MetadataCorrectionSetup } from './helper/MetadataCorrectionSetup' /** * Properties for the CMR event processing stack. 
@@ -29,10 +23,10 @@ export interface CmrEventProcessingStackProps extends cdk.StackProps { * that will process those events for downstream CMR business logic. */ export class CmrEventProcessingStack extends cdk.Stack { - public readonly keywordEventsQueueUrlOutput: cdk.CfnOutput + private readonly logForwardingSetup?: LogForwardingSetup /** - * Creates the CMR queue, subscription, listener, and queue output. + * Creates the CMR listener resources and metadata correction messaging resources. * * @param {Construct} scope - Parent construct. * @param {string} id - Stack identifier. @@ -42,58 +36,32 @@ export class CmrEventProcessingStack extends cdk.Stack { super(scope, id, props) const useLocalstack = this.node.tryGetContext('useLocalstack') === 'true' - const queueName = `${props.prefix}-${props.stage}-cmr-keyword-events` const topic = sns.Topic.fromTopicArn(this, 'KeywordEventsTopic', props.topicArn) - const queue = new sqs.Queue(this, 'CmrKeywordEventsQueue', { - queueName - }) - - topic.addSubscription(new subscriptions.SqsSubscription(queue)) - - const listenerRole = new iam.Role(this, 'CmrKeywordEventsProcessorRole', { - assumedBy: new iam.ServicePrincipal('lambda.amazonaws.com'), - managedPolicies: [ - iam.ManagedPolicy.fromAwsManagedPolicyName('service-role/AWSLambdaBasicExecutionRole') - ] + const metadataCorrectionSetup = new MetadataCorrectionSetup(this, 'MetadataCorrection', { + prefix: props.prefix, + stage: props.stage }) - const listenerLambda = new NodejsFunction(this, `${props.prefix}-cmr-keyword-events-processor`, { - functionName: `${props.prefix}-${props.stage}-cmr-keyword-events-processor`, - entry: path.join(__dirname, '../../../serverless/src/cmrKeywordEventsListener/handler.js'), - handler: 'cmrKeywordEventsListener', - runtime: lambda.Runtime.NODEJS_22_X, - timeout: cdk.Duration.seconds(30), - memorySize: 1024, - role: listenerRole, - depsLockFilePath: path.join(__dirname, '../../../package-lock.json'), - projectRoot: path.join(__dirname, 
'../../..') + const listenerSetup = new CmrKeywordEventsListenerSetup(this, 'CmrKeywordEventsListener', { + prefix: props.prefix, + stage: props.stage, + keywordEventsTopic: topic, + metadataCorrectionRequestsTopic: metadataCorrectionSetup.metadataCorrectionRequestsTopic }) - listenerLambda.addEventSource(new eventsources.SqsEventSource(queue, { - batchSize: 1 - })) - - queue.grantConsumeMessages(listenerLambda) - // Set up CloudWatch Logs forwarding to Splunk via NGAP SecLog account // Skip log forwarding for localstack deployments if (!useLocalstack) { - // eslint-disable-next-line no-new - new LogForwardingSetup(this, 'LogForwarding', { + this.logForwardingSetup = new LogForwardingSetup(this, 'LogForwarding', { prefix: props.prefix, stage: props.stage, logDestinationArn: props.logDestinationArn, lambdas: { - 'cmrKeywordEventsListener/handler.js::cmr-keyword-events-processor': listenerLambda + 'cmrKeywordEventsListener/handler.js::cmr-keyword-events-processor': listenerSetup.listenerLambda, + 'metadataCorrectionService/handler.js::metadata-correction-service': metadataCorrectionSetup.metadataCorrectionServiceLambda } }) } - - this.keywordEventsQueueUrlOutput = new cdk.CfnOutput(this, 'CmrKeywordEventsQueueUrl', { - description: 'Queue URL for CMR keyword event processing', - exportName: `${props.prefix}-CmrKeywordEventsQueueUrl`, - value: queue.queueUrl - }) } } diff --git a/cdk/app/lib/helper/CmrKeywordEventsListenerSetup.ts b/cdk/app/lib/helper/CmrKeywordEventsListenerSetup.ts new file mode 100644 index 0000000..7951784 --- /dev/null +++ b/cdk/app/lib/helper/CmrKeywordEventsListenerSetup.ts @@ -0,0 +1,77 @@ +import * as path from 'path' + +import * as cdk from 'aws-cdk-lib' +import * as eventsources from 'aws-cdk-lib/aws-lambda-event-sources' +import { NodejsFunction } from 'aws-cdk-lib/aws-lambda-nodejs' +import * as sns from 'aws-cdk-lib/aws-sns' +import * as subscriptions from 'aws-cdk-lib/aws-sns-subscriptions' +import * as sqs from 'aws-cdk-lib/aws-sqs' 
+import { Construct } from 'constructs' + +import { NODE_LAMBDA_RUNTIME } from './NodeLambdaRuntime' + +/** + * Properties for the CMR keyword events listener infrastructure. + */ +interface CmrKeywordEventsListenerSetupProps { + prefix: string + stage: string + keywordEventsTopic: sns.ITopic + metadataCorrectionRequestsTopic: sns.ITopic +} + +/** + * Creates the CMR keyword events queue, listener Lambda, and related wiring. + */ +export class CmrKeywordEventsListenerSetup extends Construct { + public readonly queue: sqs.Queue + + public readonly listenerLambda: NodejsFunction + + public readonly queueUrlOutput: cdk.CfnOutput + + /** + * @param {Construct} scope - Parent construct. + * @param {string} id - Construct identifier. + * @param {CmrKeywordEventsListenerSetupProps} props - Listener configuration. + */ + constructor(scope: Construct, id: string, props: CmrKeywordEventsListenerSetupProps) { + super(scope, id) + + const queueName = `${props.prefix}-${props.stage}-cmr-keyword-events` + const projectRoot = path.join(__dirname, '../../../..') + + this.queue = new sqs.Queue(this, 'CmrKeywordEventsQueue', { + queueName + }) + + props.keywordEventsTopic.addSubscription(new subscriptions.SqsSubscription(this.queue)) + + this.listenerLambda = new NodejsFunction(this, `${props.prefix}-cmr-keyword-events-processor`, { + functionName: `${props.prefix}-${props.stage}-cmr-keyword-events-processor`, + entry: path.join(projectRoot, 'serverless/src/cmrKeywordEventsListener/handler.js'), + handler: 'cmrKeywordEventsListener', + runtime: NODE_LAMBDA_RUNTIME, + timeout: cdk.Duration.seconds(30), + memorySize: 1024, + environment: { + METADATA_CORRECTION_REQUESTS_TOPIC_ARN: props.metadataCorrectionRequestsTopic.topicArn + }, + depsLockFilePath: path.join(projectRoot, 'package-lock.json'), + projectRoot + }) + + this.listenerLambda.addEventSource(new eventsources.SqsEventSource(this.queue, { + batchSize: 1 + })) + + this.queue.grantConsumeMessages(this.listenerLambda) + 
props.metadataCorrectionRequestsTopic.grantPublish(this.listenerLambda) + + this.queueUrlOutput = new cdk.CfnOutput(this, 'CmrKeywordEventsQueueUrl', { + description: 'Queue URL for CMR keyword event processing', + exportName: `${props.prefix}-CmrKeywordEventsQueueUrl`, + value: this.queue.queueUrl + }) + } +} diff --git a/cdk/app/lib/helper/KmsLambdaFunctions.ts b/cdk/app/lib/helper/KmsLambdaFunctions.ts index 85a3037..8c4c77a 100644 --- a/cdk/app/lib/helper/KmsLambdaFunctions.ts +++ b/cdk/app/lib/helper/KmsLambdaFunctions.ts @@ -11,6 +11,7 @@ import { NodejsFunction, NodejsFunctionProps } from 'aws-cdk-lib/aws-lambda-node import { Construct } from 'constructs' import { ApiResources } from './ApiResources' +import { NODE_LAMBDA_RUNTIME } from './NodeLambdaRuntime' /** * Interface for LambdaFunctions constructor properties @@ -582,7 +583,7 @@ export class LambdaFunctions { functionName: `${this.props.prefix}-${this.props.stage}-${functionName}`, entry: path.join(__dirname, '../../../../serverless/src', handlerPath), handler: handlerName, - runtime: lambda.Runtime.NODEJS_22_X, + runtime: NODE_LAMBDA_RUNTIME, timeout, memorySize, role: this.props.lambdaRole, diff --git a/cdk/app/lib/helper/MetadataCorrectionSetup.ts b/cdk/app/lib/helper/MetadataCorrectionSetup.ts new file mode 100644 index 0000000..5eb3d59 --- /dev/null +++ b/cdk/app/lib/helper/MetadataCorrectionSetup.ts @@ -0,0 +1,144 @@ +import * as path from 'path' + +import * as cdk from 'aws-cdk-lib' +import * as eventsources from 'aws-cdk-lib/aws-lambda-event-sources' +import { NodejsFunction } from 'aws-cdk-lib/aws-lambda-nodejs' +import * as sns from 'aws-cdk-lib/aws-sns' +import * as subscriptions from 'aws-cdk-lib/aws-sns-subscriptions' +import * as sqs from 'aws-cdk-lib/aws-sqs' +import { Construct } from 'constructs' + +import { NODE_LAMBDA_RUNTIME } from './NodeLambdaRuntime' + +/** + * Properties for metadata correction infrastructure. 
+ */ +interface MetadataCorrectionSetupProps { + prefix: string + stage: string +} + +/** + * Creates the metadata correction SNS/SQS/Lambda plumbing and exports its endpoints. + */ +export class MetadataCorrectionSetup extends Construct { + public readonly metadataCorrectionRequestsTopic: sns.Topic + + public readonly metadataCorrectionRequestsQueue: sqs.Queue + + public readonly metadataCorrectionRequestsDlq: sqs.Queue + + public readonly metadataCorrectionServiceLambda: NodejsFunction + + public readonly metadataCorrectionRequestsTopicArnOutput: cdk.CfnOutput + + public readonly metadataCorrectionRequestsQueueUrlOutput: cdk.CfnOutput + + public readonly metadataCorrectionRequestsQueueArnOutput: cdk.CfnOutput + + public readonly metadataCorrectionRequestsDlqUrlOutput: cdk.CfnOutput + + public readonly metadataCorrectionRequestsDlqArnOutput: cdk.CfnOutput + + /** + * @param {Construct} scope - Parent construct. + * @param {string} id - Construct identifier. + * @param {MetadataCorrectionSetupProps} props - Metadata correction configuration. + */ + constructor(scope: Construct, id: string, props: MetadataCorrectionSetupProps) { + super(scope, id) + + const metadataCorrectionRequestsBaseName = `${props.prefix}-${props.stage}-metadata-correction-requests` + const metadataCorrectionRequestsName = `${metadataCorrectionRequestsBaseName}.fifo` + const projectRoot = path.join(__dirname, '../../../..') + + // TODO: Create a follow-up ticket for DLQ handling. This DLQ is only the + // redrive target today; before adding a consumer, decide whether failures + // should be alarmed on, manually inspected, or redriven by an operator. 
+ this.metadataCorrectionRequestsDlq = new sqs.Queue(this, 'MetadataCorrectionRequestsDlq', { + queueName: `${metadataCorrectionRequestsBaseName}-dlq.fifo`, + fifo: true, + contentBasedDeduplication: true, + retentionPeriod: cdk.Duration.days(14) + }) + + this.metadataCorrectionRequestsQueue = new sqs.Queue(this, 'MetadataCorrectionRequestsQueue', { + queueName: metadataCorrectionRequestsName, + fifo: true, + contentBasedDeduplication: true, + deadLetterQueue: { + queue: this.metadataCorrectionRequestsDlq, + maxReceiveCount: 3 + }, + retentionPeriod: cdk.Duration.days(14), + visibilityTimeout: cdk.Duration.minutes(5) + }) + + this.metadataCorrectionRequestsTopic = new sns.Topic(this, 'MetadataCorrectionRequestsTopic', { + contentBasedDeduplication: true, + fifo: true, + topicName: metadataCorrectionRequestsName + }) + + this.metadataCorrectionRequestsTopic.addSubscription(new subscriptions.SqsSubscription( + this.metadataCorrectionRequestsQueue, + { + rawMessageDelivery: true + } + )) + + this.metadataCorrectionServiceLambda = new NodejsFunction( + this, + `${props.prefix}-metadata-correction-service`, + { + functionName: `${props.prefix}-${props.stage}-metadata-correction-service`, + entry: path.join(projectRoot, 'serverless/src/metadataCorrectionService/handler.js'), + handler: 'metadataCorrectionService', + runtime: NODE_LAMBDA_RUNTIME, + timeout: cdk.Duration.seconds(30), + memorySize: 1024, + depsLockFilePath: path.join(projectRoot, 'package-lock.json'), + projectRoot + } + ) + + this.metadataCorrectionServiceLambda.addEventSource(new eventsources.SqsEventSource( + this.metadataCorrectionRequestsQueue, + { + batchSize: 1 + } + )) + + this.metadataCorrectionRequestsQueue.grantConsumeMessages(this.metadataCorrectionServiceLambda) + + this.metadataCorrectionRequestsTopicArnOutput = new cdk.CfnOutput(this, 'MetadataCorrectionRequestsTopicArn', { + description: 'SNS topic ARN for metadata correction request publishing', + exportName: 
`${props.prefix}-MetadataCorrectionRequestsTopicArn`, + value: this.metadataCorrectionRequestsTopic.topicArn + }) + + this.metadataCorrectionRequestsQueueUrlOutput = new cdk.CfnOutput(this, 'MetadataCorrectionRequestsQueueUrl', { + description: 'Queue URL for metadata correction request processing', + exportName: `${props.prefix}-MetadataCorrectionRequestsQueueUrl`, + value: this.metadataCorrectionRequestsQueue.queueUrl + }) + + this.metadataCorrectionRequestsQueueArnOutput = new cdk.CfnOutput(this, 'MetadataCorrectionRequestsQueueArn', { + description: 'Queue ARN for metadata correction request processing', + exportName: `${props.prefix}-MetadataCorrectionRequestsQueueArn`, + value: this.metadataCorrectionRequestsQueue.queueArn + }) + + this.metadataCorrectionRequestsDlqUrlOutput = new cdk.CfnOutput(this, 'MetadataCorrectionRequestsDlqUrl', { + description: 'DLQ URL for failed metadata correction request processing', + exportName: `${props.prefix}-MetadataCorrectionRequestsDlqUrl`, + value: this.metadataCorrectionRequestsDlq.queueUrl + }) + + this.metadataCorrectionRequestsDlqArnOutput = new cdk.CfnOutput(this, 'MetadataCorrectionRequestsDlqArn', { + description: 'DLQ ARN for failed metadata correction request processing', + exportName: `${props.prefix}-MetadataCorrectionRequestsDlqArn`, + value: this.metadataCorrectionRequestsDlq.queueArn + }) + } +} diff --git a/cdk/app/lib/helper/NodeLambdaRuntime.ts b/cdk/app/lib/helper/NodeLambdaRuntime.ts new file mode 100644 index 0000000..3538b43 --- /dev/null +++ b/cdk/app/lib/helper/NodeLambdaRuntime.ts @@ -0,0 +1,9 @@ +import * as lambda from 'aws-cdk-lib/aws-lambda' + +/** + * Shared runtime for all Node.js Lambda functions in the CDK app. + * + * Keeping this in one place makes Node runtime upgrades explicit and consistent + * across stacks and helper constructs. 
+ */ +export const NODE_LAMBDA_RUNTIME = lambda.Runtime.NODEJS_22_X diff --git a/scripts/localstack/bridge.js b/scripts/localstack/bridge.js index 7b21f26..1d8076a 100644 --- a/scripts/localstack/bridge.js +++ b/scripts/localstack/bridge.js @@ -272,15 +272,31 @@ const getQueueResourceName = (registration) => ( `${normalizeName(getRegistrationKey(registration))}-to-${normalizeName(registration.handler)}` ) +/** + * Adds the AWS FIFO suffix to a local resource name when the bridge registration asks for + * FIFO semantics. + * + * @param {string} name - Local resource name. + * @param {Object} registration - Bridge registration definition. + * @returns {string} Resource name, optionally suffixed for FIFO. + */ +const applyFifoSuffix = (name, registration) => ( + registration.eventPattern?.fifo ? `${name}.fifo` : name +) + /** * Ensures a LocalStack queue exists and returns its URL and ARN. * * @param {string} queueName - Fully qualified queue name to create or retrieve. + * @param {Object | undefined} attributes - Queue attributes to apply during creation. 
* @returns {Promise<{queueUrl: string | undefined, queueArn: string | undefined}>} */ -const ensureQueue = async (queueName) => { +const ensureQueue = async (queueName, attributes) => { const { QueueUrl: queueUrl } = await sqsClient.send(new CreateQueueCommand({ - QueueName: queueName + QueueName: queueName, + ...(attributes && { + Attributes: attributes + }) })) const queueAttributes = await sqsClient.send(new GetQueueAttributesCommand({ @@ -303,16 +319,34 @@ const ensureQueue = async (queueName) => { */ const ensureSnsToSqsResources = async (registration) => { const name = getRegistrationKey(registration) - const topicName = formatResourceName(normalizeName(registration.eventPattern.topicName)) - const queueName = formatResourceName(getQueueResourceName(registration)) + const topicName = applyFifoSuffix( + formatResourceName(normalizeName(registration.eventPattern.topicName)), + registration + ) + const queueName = applyFifoSuffix( + formatResourceName(getQueueResourceName(registration)), + registration + ) + const fifoAttributes = registration.eventPattern?.fifo + ? 
{ + ContentBasedDeduplication: 'true', + FifoQueue: 'true' + } + : undefined const { TopicArn: topicArn } = await snsClient.send(new CreateTopicCommand({ - Name: topicName + Name: topicName, + ...(registration.eventPattern?.fifo && { + Attributes: { + ContentBasedDeduplication: 'true', + FifoTopic: 'true' + } + }) })) const { queueArn, queueUrl - } = await ensureQueue(queueName) + } = await ensureQueue(queueName, fifoAttributes) const queuePolicy = { Version: '2012-10-17', @@ -353,7 +387,12 @@ const ensureSnsToSqsResources = async (registration) => { await snsClient.send(new SubscribeCommand({ TopicArn: topicArn, Protocol: 'sqs', - Endpoint: queueArn + Endpoint: queueArn, + ...(registration.eventPattern.rawMessageDelivery && { + Attributes: { + RawMessageDelivery: 'true' + } + }) })) } diff --git a/scripts/localstack/run_bridge.sh b/scripts/localstack/run_bridge.sh index b6ca1ce..fb7f398 100755 --- a/scripts/localstack/run_bridge.sh +++ b/scripts/localstack/run_bridge.sh @@ -3,6 +3,9 @@ SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" PROJECT_ROOT="$(cd "${SCRIPT_DIR}/../.." 
&& pwd)" +# Default local topic ARN: arn:aws:sns:us-east-1:000000000000:kms-dev-metadata-correction-requests.fifo +export METADATA_CORRECTION_REQUESTS_TOPIC_ARN="${METADATA_CORRECTION_REQUESTS_TOPIC_ARN:-arn:aws:sns:${AWS_REGION:-us-east-1}:${LOCALSTACK_ACCOUNT_ID:-000000000000}:${STACK_PREFIX:-kms}-${STAGE_NAME:-dev}-metadata-correction-requests.fifo}" + export BRIDGE_REGISTRY_JSON="${BRIDGE_REGISTRY_JSON:-$(cat <<'EOF' [ { @@ -20,6 +23,15 @@ export BRIDGE_REGISTRY_JSON="${BRIDGE_REGISTRY_JSON:-$(cat <<'EOF' "topicName": "keyword-events" } }, + { + "handler": "metadataCorrectionService", + "sourceType": "sns-to-sqs", + "eventPattern": { + "fifo": true, + "rawMessageDelivery": true, + "topicName": "metadata-correction-requests" + } + }, { "handler": "primeConceptsCache", "sourceType": "eventbridge-to-sqs", diff --git a/serverless/src/cmrKeywordEventsListener/__tests__/handler.test.js b/serverless/src/cmrKeywordEventsListener/__tests__/handler.test.js index 3600ac4..6160546 100644 --- a/serverless/src/cmrKeywordEventsListener/__tests__/handler.test.js +++ b/serverless/src/cmrKeywordEventsListener/__tests__/handler.test.js @@ -7,6 +7,7 @@ import { } from 'vitest' import { logger } from '@/shared/logger' +import { publishMetadataCorrectionRequest } from '@/shared/publishMetadataCorrectionRequest' import { cmrKeywordEventsListener } from '../handler' @@ -17,9 +18,18 @@ vi.mock('@/shared/logger', () => ({ } })) +vi.mock('@/shared/publishMetadataCorrectionRequest', () => ({ + publishMetadataCorrectionRequest: vi.fn() +})) + describe('when the CMR keyword events processor is invoked', () => { beforeEach(() => { vi.clearAllMocks() + vi.mocked(publishMetadataCorrectionRequest).mockResolvedValue({ + messageId: 'metadata-correction-message-123', + message: '{}', + topicArn: 'arn:aws:sns:us-east-1:000000000000:kms-dev-metadata-correction-requests.fifo' + }) }) describe('when the invocation is successful', () => { @@ -32,8 +42,12 @@ describe('when the CMR keyword events 
processor is invoked', () => { body: JSON.stringify({ Type: 'Notification', Message: JSON.stringify({ - event_type: 'keyword_updated', - uuid: '1234' + EventType: 'UPDATED', + Scheme: 'sciencekeywords', + UUID: '1234', + OldKeywordPath: 'Old > Keyword', + NewKeywordPath: 'New > Keyword', + Timestamp: '2026-04-21T00:00:00.000Z' }) }) } @@ -45,12 +59,33 @@ describe('when the CMR keyword events processor is invoked', () => { expect.objectContaining({ messageId: 'message-123', keywordEvent: expect.objectContaining({ - event_type: 'keyword_updated', - uuid: '1234' + EventType: 'UPDATED', + UUID: '1234' }) }) ) + expect(publishMetadataCorrectionRequest).toHaveBeenCalledWith({ + source: 'cmrKeywordEventsListener', + collectionConceptId: 'C0000000000-KMS', + keywordEvent: { + eventType: 'UPDATED', + scheme: 'sciencekeywords', + uuid: '1234', + oldKeywordPath: 'Old > Keyword', + newKeywordPath: 'New > Keyword', + timestamp: '2026-04-21T00:00:00.000Z' + } + }) + + expect(logger.info).toHaveBeenCalledWith( + '[consumer] Published metadata correction request', + expect.objectContaining({ + collectionConceptId: 'C0000000000-KMS', + messageId: 'metadata-correction-message-123' + }) + ) + expect(result).toEqual({ batchItemFailures: [] }) @@ -81,6 +116,8 @@ describe('when the CMR keyword events processor is invoked', () => { expect(result).toEqual({ batchItemFailures: [] }) + + expect(publishMetadataCorrectionRequest).not.toHaveBeenCalled() }) }) @@ -105,6 +142,8 @@ describe('when the CMR keyword events processor is invoked', () => { expect(result).toEqual({ batchItemFailures: [] }) + + expect(publishMetadataCorrectionRequest).not.toHaveBeenCalled() }) }) }) @@ -132,5 +171,28 @@ describe('when the CMR keyword events processor is invoked', () => { expect(logger.error).toHaveBeenCalled() }) }) + + describe('when publishing the metadata correction request fails', () => { + test('should log the error and throw', async () => { + 
vi.mocked(publishMetadataCorrectionRequest).mockRejectedValue(new Error('SNS unavailable')) + + await expect(cmrKeywordEventsListener({ + Records: [ + { + messageId: 'message-123', + body: JSON.stringify({ + Type: 'Notification', + Message: JSON.stringify({ + EventType: 'UPDATED', + UUID: '1234' + }) + }) + } + ] + })).rejects.toThrow('SNS unavailable') + + expect(logger.error).toHaveBeenCalled() + }) + }) }) }) diff --git a/serverless/src/cmrKeywordEventsListener/handler.js b/serverless/src/cmrKeywordEventsListener/handler.js index 8d44d63..d316377 100644 --- a/serverless/src/cmrKeywordEventsListener/handler.js +++ b/serverless/src/cmrKeywordEventsListener/handler.js @@ -1,4 +1,37 @@ import { logger } from '@/shared/logger' +import { publishMetadataCorrectionRequest } from '@/shared/publishMetadataCorrectionRequest' + +/** + * Placeholder collection concept id used until KMS-675A discovers real concept ids from CMR. + * KMS-675A will replace this with concept ids discovered from CMR. + * + * @type {string} + */ +const METADATA_CORRECTION_CONCEPT_ID = 'C0000000000-KMS' + +/** + * Builds a metadata correction request so the SNS/SQS/consumer path can be tested before + * CMR concept id lookup is implemented. + * + * TODO: Create a follow-up ticket to query CMR for every collection concept id that uses + * the changed keyword. The listener should publish one metadata correction request per + * collection concept id so FIFO ordering can protect corrections for the same collection. + * + * @param {Record} keywordEvent - Parsed KMS keyword event. + * @returns {Record} Metadata correction request payload. 
+ */ +const buildMetadataCorrectionRequest = (keywordEvent) => ({ + source: 'cmrKeywordEventsListener', + collectionConceptId: METADATA_CORRECTION_CONCEPT_ID, + keywordEvent: { + eventType: keywordEvent.EventType, + scheme: keywordEvent.Scheme, + uuid: keywordEvent.UUID, + oldKeywordPath: keywordEvent.OldKeywordPath, + newKeywordPath: keywordEvent.NewKeywordPath, + timestamp: keywordEvent.Timestamp + } +}) /** * CMR event processor that consumes SNS notifications delivered through SQS. @@ -13,7 +46,7 @@ import { logger } from '@/shared/logger' export const cmrKeywordEventsListener = async (event) => { const records = event?.Records || [] - records.forEach((record) => { + await Promise.all(records.map(async (record) => { try { const snsEnvelope = JSON.parse(record.body || '{}') const keywordEvent = snsEnvelope.Message @@ -24,11 +57,22 @@ export const cmrKeywordEventsListener = async (event) => { messageId: record.messageId, keywordEvent }) + + if (keywordEvent) { + const metadataCorrectionRequest = buildMetadataCorrectionRequest(keywordEvent) + const publishResult = await publishMetadataCorrectionRequest(metadataCorrectionRequest) + + logger.info('[consumer] Published metadata correction request', { + collectionConceptId: metadataCorrectionRequest.collectionConceptId, + messageId: publishResult.messageId, + topicArn: publishResult.topicArn + }) + } } catch (error) { - logger.error('Failed to parse keyword event record', error) + logger.error('Failed to process keyword event record', error) throw error } - }) + })) return { batchItemFailures: [] diff --git a/serverless/src/getConcepts/handler.js b/serverless/src/getConcepts/handler.js index fa774cf..9a2585e 100644 --- a/serverless/src/getConcepts/handler.js +++ b/serverless/src/getConcepts/handler.js @@ -399,7 +399,7 @@ export const getConcepts = async (event, context) => { const endTime = performance.now() performanceMetrics.totalTime = (endTime - startTime).toFixed(2) - logger.info('get concepts performance=', 
JSON.stringify(performanceMetrics)) + logger.debug('get concepts performance=', JSON.stringify(performanceMetrics)) // API Gateway has a hard limit of responses at 6MB const SIZE_THRESHOLD = 5 * 1024 * 1024 // Set threshold to 5MB to have some buffer diff --git a/serverless/src/metadataCorrectionService/__tests__/handler.test.js b/serverless/src/metadataCorrectionService/__tests__/handler.test.js new file mode 100644 index 0000000..8539f86 --- /dev/null +++ b/serverless/src/metadataCorrectionService/__tests__/handler.test.js @@ -0,0 +1,106 @@ +import { + beforeEach, + describe, + expect, + test, + vi +} from 'vitest' + +import { logger } from '@/shared/logger' + +import { metadataCorrectionService } from '../handler' + +vi.mock('@/shared/logger', () => ({ + logger: { + info: vi.fn(), + error: vi.fn() + } +})) + +describe('when the metadata correction service is invoked', () => { + beforeEach(() => { + vi.clearAllMocks() + }) + + describe('when the invocation is successful', () => { + test('should log the parsed metadata correction request and acknowledge the batch', async () => { + const result = await metadataCorrectionService({ + Records: [ + { + messageId: 'message-123', + body: JSON.stringify({ + source: 'cmrKeywordEventsListener', + collectionConceptId: 'C0000000000-KMS', + keywordEvent: { + eventType: 'UPDATED', + uuid: '1234' + } + }) + } + ] + }) + + expect(logger.info).toHaveBeenCalledWith( + '[metadata-correction] Received metadata correction request', + expect.objectContaining({ + collectionConceptId: 'C0000000000-KMS', + messageId: 'message-123', + metadataCorrectionRequest: expect.objectContaining({ + source: 'cmrKeywordEventsListener', + keywordEvent: expect.objectContaining({ + eventType: 'UPDATED', + uuid: '1234' + }) + }) + }) + ) + + expect(result).toEqual({ + batchItemFailures: [] + }) + }) + + test('should acknowledge an empty batch', async () => { + await expect(metadataCorrectionService()).resolves.toEqual({ + batchItemFailures: [] + }) + }) + 
+ test('should treat a missing record body as an empty request', async () => { + const result = await metadataCorrectionService({ + Records: [ + { + messageId: 'message-456' + } + ] + }) + + expect(logger.info).toHaveBeenCalledWith( + '[metadata-correction] Received metadata correction request', + expect.objectContaining({ + messageId: 'message-456', + metadataCorrectionRequest: {} + }) + ) + + expect(result).toEqual({ + batchItemFailures: [] + }) + }) + }) + + describe('when the invocation is unsuccessful', () => { + test('should log the error and throw when the record body cannot be parsed', async () => { + await expect(metadataCorrectionService({ + Records: [ + { + messageId: 'message-123', + body: 'not-json' + } + ] + })).rejects.toThrow() + + expect(logger.error).toHaveBeenCalled() + }) + }) +}) diff --git a/serverless/src/metadataCorrectionService/handler.js b/serverless/src/metadataCorrectionService/handler.js new file mode 100644 index 0000000..eb69564 --- /dev/null +++ b/serverless/src/metadataCorrectionService/handler.js @@ -0,0 +1,53 @@ +import { logger } from '@/shared/logger' + +/** + * Metadata correction service placeholder that consumes metadata correction requests from SQS. + * + * This proves the SNS/SQS/Lambda plumbing for KMS-676. Follow-on ticket will replace the stubbed + * logging behavior with real metadata fetch, keyword resolution, and native metadata updates. + * + * TODO: Create a follow-up ticket for targeted correction requests. When a request includes + * the affected keyword event, fetch the collection's native metadata from CMR, detect the + * metadata format, and delegate the specific keyword replacement to the appropriate updater + * for ISO, ECHO10, DIF10, or UMM. Each updater should modify only the affected keyword fields + * for its metadata format. + * + * TODO: Create a follow-up ticket for untargeted correction requests. 
If the request does not + * include a targeted keyword event, fetch the collection's UMM metadata and identify invalid + * keyword paths by validating against current KMS, or by asking CMR to validate and return the + * invalid keyword report. Use historical KMS lookup to map stale keyword paths to current KMS + * values, then call the native metadata updater with historical -> current keyword + * replacements. + * + * TODO: Consider making the correction service collection-level by default. Even when a + * keyword event is present, treat it as context and validate all keywords in the collection so + * the service can fix every stale keyword in one metadata update instead of issuing multiple + * targeted updates for the same collection. + * + * @param {{ Records?: Array<{ body?: string, messageId?: string }> }} event - SQS batch event. + * @returns {Promise<{batchItemFailures: Array}>} Empty batch failures for acknowledged messages. + */ +export const metadataCorrectionService = async (event) => { + const records = event?.Records || [] + + records.forEach((record) => { + try { + const metadataCorrectionRequest = JSON.parse(record.body || '{}') + + logger.info('[metadata-correction] Received metadata correction request', { + collectionConceptId: metadataCorrectionRequest.collectionConceptId, + messageId: record.messageId, + metadataCorrectionRequest + }) + } catch (error) { + logger.error('[metadata-correction] Failed to parse metadata correction request', error) + throw error + } + }) + + return { + batchItemFailures: [] + } +} + +export default metadataCorrectionService diff --git a/serverless/src/primeConceptsCache/handler.js b/serverless/src/primeConceptsCache/handler.js index eb96b99..cbb52fc 100644 --- a/serverless/src/primeConceptsCache/handler.js +++ b/serverless/src/primeConceptsCache/handler.js @@ -288,6 +288,19 @@ export const primeConceptsCache = async (event) => { await redisClient.set(CONCEPTS_CACHE_VERSION_KEY, versionMarker) + logger.info( + 
'[cache-prime] complete ' + + `status=${failed > 0 ? 'failed' : 'success'} ` + + `versionMarker=${versionMarker} ` + + `schemes=${schemes.length} ` + + `deletedKeys=${deletedKeys} ` + + `warmed=${warmed} ` + + `failed=${failed} ` + + `conceptsWarmed=${conceptResultsSummary.warmed} ` + + `schemeConceptsWarmed=${schemeResultsSummary.warmed} ` + + `treesWarmed=${treeResultsSummary.warmed}` + ) + return { statusCode: failed > 0 ? 500 : 200, body: JSON.stringify({ diff --git a/serverless/src/publisher/__tests__/handler.test.js b/serverless/src/publisher/__tests__/handler.test.js index 312c0bc..20a107a 100644 --- a/serverless/src/publisher/__tests__/handler.test.js +++ b/serverless/src/publisher/__tests__/handler.test.js @@ -237,6 +237,9 @@ describe('publisher handler', () => { expect(getConceptSchemeDetails).toHaveBeenCalledWith({ version: 'published' }) expect(getConceptSchemeDetails).toHaveBeenCalledWith({ version: 'draft' }) expect(downloadConcepts).toHaveBeenCalledTimes(4) // 2 schemes × 2 versions + expect(logger.info).toHaveBeenCalledWith( + '[publisher] Keyword changes summary schemes=2 processed=2 failed=0 added=0 removed=0 changed=0' + ) }) test('should process concept schemes sequentially', async () => { @@ -550,9 +553,6 @@ describe('publisher handler', () => { // Should have succeeded after retry expect(result.size).toBe(1) expect(result.has('sciencekeywords')).toBe(true) - expect(logger.info).toHaveBeenCalledWith( - expect.stringContaining('Retrying sciencekeywords (attempt 2/4)') - ) vi.useRealTimers() }) @@ -642,11 +642,6 @@ describe('publisher handler', () => { // Verify deletedscheme has removed keywords expect(result.get('deletedscheme').removedKeywords.size).toBe(1) - // Verify appropriate logging - expect(logger.info).toHaveBeenCalledWith( - 'Scheme deletedscheme does not exist in draft version (scheme removed). All keywords will be marked as DELETED.' 
- ) - // Verify compare was called with empty string for deletedscheme expect(mockComparator.compare).toHaveBeenCalledWith('published csv content', '') }) @@ -822,11 +817,6 @@ describe('publisher handler', () => { // Verify newscheme has added keywords expect(result.get('newscheme').addedKeywords.size).toBe(1) - // Verify appropriate logging - expect(logger.info).toHaveBeenCalledWith( - 'Scheme newscheme is new in draft version. All keywords will be marked as INSERTED.' - ) - // Verify compare was called with empty string for newscheme published expect(mockComparator.compare).toHaveBeenCalledWith('', 'draft csv content for newscheme') }) @@ -875,15 +865,6 @@ describe('publisher handler', () => { // Verify comparator was called 3 times expect(mockComparator.compare).toHaveBeenCalledTimes(3) - - // Verify appropriate logs for each case - expect(logger.info).toHaveBeenCalledWith( - 'Scheme deletedscheme does not exist in draft version (scheme removed). All keywords will be marked as DELETED.' - ) - - expect(logger.info).toHaveBeenCalledWith( - 'Scheme newscheme is new in draft version. All keywords will be marked as INSERTED.' 
- ) }) test('should use Unknown error fallback when retries exhaust without an error message and blocking is enabled', async () => { @@ -1586,7 +1567,6 @@ describe('publisher handler', () => { await publisher(mockEvent) - expect(logger.info).toHaveBeenCalledWith('[publisher] Created 1 keyword events') expect(callOrder).toEqual([ 'build-publish-query', 'execute-publish', diff --git a/serverless/src/publisher/handler.js b/serverless/src/publisher/handler.js index 1a6bf70..cf52545 100644 --- a/serverless/src/publisher/handler.js +++ b/serverless/src/publisher/handler.js @@ -141,8 +141,6 @@ export const createKeywordEvents = (keywordChangesMap) => { }) }) - logger.info(`Created ${keywordEvents.length} keyword events from ${keywordChangesMap.size} schemes`) - return keywordEvents } @@ -194,11 +192,6 @@ export const getKeywordChanges = async () => { return new Map() } - logger.info( - `Found ${allNotations.size} total concept schemes to process ` - + `(${publishedNotations.size} in published, ${draftNotations.size} in draft)` - ) - // Initialize CSV comparator const csvComparator = new CsvComparator() const failedSchemes = [] @@ -208,8 +201,6 @@ export const getKeywordChanges = async () => { const results = await Array.from(allNotations).reduce(async (resultsPromise, notation) => { const sequentialResults = await resultsPromise const result = await (async () => { - logger.info(`Processing concept scheme: ${notation}`) - const inPublished = publishedNotations.has(notation) const inDraft = draftNotations.has(notation) let comparison @@ -219,13 +210,8 @@ export const getKeywordChanges = async () => { /* eslint-disable no-await-in-loop */ for (let attempt = 0; attempt <= maxRetries; attempt += 1) { try { - if (attempt > 0) { - logger.info(`Retrying ${notation} (attempt ${attempt + 1}/${maxRetries + 1})`) - } - if (inPublished && inDraft) { // Normal case: scheme exists in both versions - logger.debug(`Downloading both versions for ${notation}`) const [publishedCsv, draftCsv] 
= await Promise.all([ downloadConcepts({ conceptScheme: notation, @@ -242,8 +228,6 @@ export const getKeywordChanges = async () => { comparison = csvComparator.compare(publishedCsv, draftCsv) } else if (inPublished && !inDraft) { // Scheme removed: all keywords marked as DELETED - logger.info(`Scheme ${notation} does not exist in draft version (scheme removed). All keywords will be marked as DELETED.`) - const publishedCsv = await downloadConcepts({ conceptScheme: notation, format: 'csv', @@ -255,8 +239,6 @@ export const getKeywordChanges = async () => { } else { // All notations come from the union of published and draft scheme sets, // so reaching this branch means the scheme only exists in draft. - logger.info(`Scheme ${notation} is new in draft version. All keywords will be marked as INSERTED.`) - const draftCsv = await downloadConcepts({ conceptScheme: notation, format: 'csv', @@ -269,15 +251,9 @@ export const getKeywordChanges = async () => { const summary = csvComparator.getSummary(comparison) - logger.info( - `Successfully processed ${notation}: ` - + `Found ${summary.addedCount} keywords added, ` - + `${summary.removedCount} keywords removed, ` - + `${summary.changedCount} keywords changed` - ) - return { notation, + summary, comparison } } catch (error) { @@ -291,7 +267,6 @@ export const getKeywordChanges = async () => { // Wait before retrying (exponential backoff: 1s, 2s, 4s) const delayMs = 2 ** attempt * 1000 - logger.info(`Waiting ${delayMs}ms before retry for ${notation}`) await new Promise((resolve) => { setTimeout(resolve, delayMs) }) @@ -341,7 +316,31 @@ export const getKeywordChanges = async () => { .map((result) => [result.notation, result.comparison]) ) - logger.info(`Keyword changes detection completed. 
Processed ${keywordChangesMap.size} concept schemes.`) + const keywordChangeSummary = results.reduce((summary, result) => { + if (!result) { + return summary + } + + return { + addedCount: summary.addedCount + result.summary.addedCount, + removedCount: summary.removedCount + result.summary.removedCount, + changedCount: summary.changedCount + result.summary.changedCount + } + }, { + addedCount: 0, + removedCount: 0, + changedCount: 0 + }) + + logger.info( + '[publisher] Keyword changes summary ' + + `schemes=${allNotations.size} ` + + `processed=${keywordChangesMap.size} ` + + `failed=${failedSchemes.length} ` + + `added=${keywordChangeSummary.addedCount} ` + + `removed=${keywordChangeSummary.removedCount} ` + + `changed=${keywordChangeSummary.changedCount}` + ) return keywordChangesMap } @@ -490,16 +489,9 @@ export const publisher = async (event) => { const keywordChanges = await getKeywordChanges() const keywordChangesDetected = countKeywordChanges(keywordChanges) - // Log summary of all changes - const totalSchemes = keywordChanges.size - logger.info(`[publisher] Analysis completed. 
Processed ${totalSchemes} concept schemes.`) - const keywordEvents = createKeywordEvents(keywordChanges) const keywordEventsGenerated = keywordEvents.length - logger.info(`[publisher] Created ${keywordEvents.length} keyword events`) - logger.info('[publisher] Keyword Events:', keywordEvents) - // Execute the publish operation logger.info(`[publisher] Executing publish update for version=${versionName}`) const publishQuery = getPublishUpdateQuery(versionName, publishDate) diff --git a/serverless/src/shared/__tests__/publishMetadataCorrectionRequest.test.js b/serverless/src/shared/__tests__/publishMetadataCorrectionRequest.test.js new file mode 100644 index 0000000..63512d6 --- /dev/null +++ b/serverless/src/shared/__tests__/publishMetadataCorrectionRequest.test.js @@ -0,0 +1,111 @@ +import { + beforeEach, + describe, + expect, + test, + vi +} from 'vitest' + +const { sendMock, snsClientMock } = vi.hoisted(() => ({ + sendMock: vi.fn(), + snsClientMock: vi.fn(() => ({ + send: sendMock + })) +})) + +vi.mock('@aws-sdk/client-sns', () => ({ + SNSClient: snsClientMock, + PublishCommand: vi.fn((input) => input) +})) + +describe('when the metadata correction request publisher is used', () => { + beforeEach(() => { + vi.resetModules() + vi.clearAllMocks() + process.env.METADATA_CORRECTION_REQUESTS_TOPIC_ARN = 'arn:aws:sns:us-east-1:000000000000:kms-dev-metadata-correction-requests.fifo' + delete process.env.AWS_ENDPOINT_URL + delete process.env.AWS_REGION + delete process.env.AWS_ACCESS_KEY_ID + delete process.env.AWS_SECRET_ACCESS_KEY + }) + + describe('when the request is successful', () => { + test('should publish the expected payload', async () => { + sendMock.mockResolvedValue({ MessageId: 'message-123' }) + const { publishMetadataCorrectionRequest } = await import('../publishMetadataCorrectionRequest') + + const payload = { + collectionConceptId: 'C0000000000-KMS', + keywordEvent: { + eventType: 'UPDATED', + uuid: '1234' + } + } + const result = await 
publishMetadataCorrectionRequest(payload) + + expect(sendMock).toHaveBeenCalledWith({ + TopicArn: 'arn:aws:sns:us-east-1:000000000000:kms-dev-metadata-correction-requests.fifo', + Message: JSON.stringify(payload), + MessageGroupId: 'C0000000000-KMS' + }) + + expect(result).toMatchObject({ + messageGroupId: 'C0000000000-KMS', + messageId: 'message-123', + topicArn: 'arn:aws:sns:us-east-1:000000000000:kms-dev-metadata-correction-requests.fifo' + }) + }) + + test('should create the SNS client with the LocalStack override when configured', async () => { + process.env.AWS_ENDPOINT_URL = 'http://localstack:4566' + process.env.AWS_REGION = 'us-east-1' + sendMock.mockResolvedValue({ MessageId: 'message-123' }) + + await import('../publishMetadataCorrectionRequest') + + expect(snsClientMock).toHaveBeenCalledWith({ + endpoint: 'http://localstack:4566', + region: 'us-east-1', + credentials: { + accessKeyId: 'test', + secretAccessKey: 'test' + } + }) + }) + + test('should default the LocalStack region and credentials when they are not configured', async () => { + process.env.AWS_ENDPOINT_URL = 'http://localstack:4566' + sendMock.mockResolvedValue({ MessageId: 'message-123' }) + + await import('../publishMetadataCorrectionRequest') + + expect(snsClientMock).toHaveBeenCalledWith({ + endpoint: 'http://localstack:4566', + region: 'us-east-1', + credentials: { + accessKeyId: 'test', + secretAccessKey: 'test' + } + }) + }) + }) + + describe('when the request is unsuccessful', () => { + test('should throw an error when the topic ARN is missing', async () => { + delete process.env.METADATA_CORRECTION_REQUESTS_TOPIC_ARN + const { publishMetadataCorrectionRequest } = await import('../publishMetadataCorrectionRequest') + + await expect(publishMetadataCorrectionRequest({ collectionConceptId: 'C0000000000-KMS' })) + .rejects + .toThrow('Missing METADATA_CORRECTION_REQUESTS_TOPIC_ARN') + }) + + test('should throw an error when the collection concept id is missing', async () => { + const { 
import { PublishCommand, SNSClient } from '@aws-sdk/client-sns'

/**
 * Creates an SNS client for either real AWS or a LocalStack endpoint override.
 *
 * When AWS_ENDPOINT_URL is set (LocalStack), region and credentials fall back to the
 * conventional LocalStack test defaults so local runs need no extra configuration.
 *
 * @returns {SNSClient} Configured SNS client instance.
 */
const createSnsClient = () => {
  const endpoint = process.env.AWS_ENDPOINT_URL
  const config = endpoint
    ? {
      endpoint,
      region: process.env.AWS_REGION || 'us-east-1',
      credentials: {
        accessKeyId: process.env.AWS_ACCESS_KEY_ID || 'test',
        secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY || 'test'
      }
    }
    : {}

  return new SNSClient(config)
}

// Module-level singleton so the client (and its connection pool) is reused across
// warm Lambda invocations.
const snsClient = createSnsClient()

/**
 * Publishes a metadata correction request to the configured metadata correction topic.
 *
 * @param {Record<string, unknown>} metadataCorrectionRequest - Request payload to publish.
 *   Must include a `collectionConceptId`, which is used as the FIFO message group id.
 * @returns {Promise<{messageId: string | undefined, message: string, messageGroupId: string, topicArn: string}>}
 *   Metadata describing the publish request and SNS response.
 * @throws {Error} When the metadata correction requests topic ARN is not configured.
 * @throws {Error} When the collection concept id is missing.
 */
export const publishMetadataCorrectionRequest = async (metadataCorrectionRequest) => {
  const topicArn = process.env.METADATA_CORRECTION_REQUESTS_TOPIC_ARN

  if (!topicArn) {
    throw new Error('Missing METADATA_CORRECTION_REQUESTS_TOPIC_ARN')
  }

  /*
   * FIFO ordering is scoped by MessageGroupId. Grouping by collection concept id prevents
   * concurrent correction writes for the same collection while still allowing different
   * collections to be corrected in parallel.
   *
   * Same collection group, processed in order:
   *   C123 -> message 1
   *   C123 -> message 2
   *   C123 -> message 3
   *
   * Different collection groups, processed independently:
   *   C123 -> message 1
   *   C456 -> message 1
   *   C789 -> message 1
   *
   * Keep this queue contract to one collectionConceptId per message. If a keyword event
   * affects multiple collections, publish one message per collection so separate keyword
   * events that touch the same collection are serialized through the same group.
   */
  // Optional chaining so a nullish payload raises the descriptive error below instead of
  // a bare TypeError on property access.
  const collectionConceptId = metadataCorrectionRequest?.collectionConceptId

  if (!collectionConceptId) {
    throw new Error('Missing metadata correction collectionConceptId')
  }

  // Coerce once; the same string is sent to SNS and echoed back to the caller.
  const messageGroupId = String(collectionConceptId)
  const message = JSON.stringify(metadataCorrectionRequest)

  const response = await snsClient.send(new PublishCommand({
    TopicArn: topicArn,
    Message: message,
    MessageGroupId: messageGroupId
  }))

  return {
    messageId: response.MessageId,
    message,
    messageGroupId,
    topicArn
  }
}

export default publishMetadataCorrectionRequest