Skip to content

feat(pipes-targets): add SNS #34159

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 40 additions & 1 deletion packages/@aws-cdk/aws-pipes-targets-alpha/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ Pipe targets are the end point of an EventBridge Pipe. The following targets are
* `targets.LambdaFunction`: [Send event source to a Lambda function](#aws-lambda-function)
* `targets.SageMakerTarget`: [Send event source to a SageMaker pipeline](#amazon-sagemaker-pipeline)
* `targets.SfnStateMachine`: [Invoke a Step Functions state machine from an event source](#aws-step-functions-state-machine)
* `targets.SqsTarget`: [Send event source to an SQS queue](#amazon-sqs)
* `targets.SnsTarget`: [Send event source to an SNS topic](#amazon-sns-topic)
* `targets.SqsTarget`: [Send event source to an SQS queue](#amazon-sqs-queue)

### Amazon EventBridge API Destination

Expand Down Expand Up @@ -349,6 +350,44 @@ const pipe = new pipes.Pipe(this, 'Pipe', {
});
```

### Amazon SNS Topic

An SNS topic can be used as a target for a pipe.
The topic will receive the (enriched/filtered) source payload.

```ts
declare const sourceQueue: sqs.Queue;
declare const targetTopic: sns.Topic;

const pipeTarget = new targets.SnsTarget(targetTopic);

const pipe = new pipes.Pipe(this, 'Pipe', {
source: new SqsSource(sourceQueue),
target: pipeTarget
});
```

The target input can be transformed:

```ts
declare const sourceQueue: sqs.Queue;
declare const targetTopic: sns.Topic;

const pipeTarget = new targets.SnsTarget(targetTopic,
{
inputTransformation: pipes.InputTransformation.fromObject(
{
"SomeKey": pipes.DynamicInput.fromEventPath('$.body')
})
}
);

const pipe = new pipes.Pipe(this, 'Pipe', {
source: new SqsSource(sourceQueue),
target: pipeTarget
});
```

### Amazon SQS Queue

An SQS queue can be used as a target for a pipe.
Expand Down
1 change: 1 addition & 0 deletions packages/@aws-cdk/aws-pipes-targets-alpha/lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ export * from './event-bridge';
export * from './kinesis';
export * from './lambda';
export * from './sagemaker';
export * from './sns';
export * from './sqs';
export * from './stepfunctions';
49 changes: 49 additions & 0 deletions packages/@aws-cdk/aws-pipes-targets-alpha/lib/sns.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import { IInputTransformation, IPipe, ITarget, TargetConfig } from '@aws-cdk/aws-pipes-alpha';
import { IRole } from 'aws-cdk-lib/aws-iam';
import { ITopic } from 'aws-cdk-lib/aws-sns';

/**
* SNS target properties.
*/
export interface SnsTargetParameters {
/**
* The input transformation to apply to the message before sending it to the target.
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipetargetparameters.html#cfn-pipes-pipe-pipetargetparameters-inputtemplate
* @default - none
*/
readonly inputTransformation?: IInputTransformation;
}

/**
* A EventBridge Pipes target that sends messages to an SNS topic.
*/
export class SnsTarget implements ITarget {
private topic: ITopic;
private topicParameters?: SnsTargetParameters;
public readonly targetArn: string;

constructor(topic: ITopic, parameters?: SnsTargetParameters) {
this.topic = topic;
this.targetArn = topic.topicArn;
this.topicParameters = parameters;
}

grantPush(grantee: IRole): void {
this.topic.grantPublish(grantee);
}

bind(pipe: IPipe): TargetConfig {
if (!this.topicParameters?.inputTransformation) {
return {
targetParameters: {},
};
}

return {
targetParameters: {
inputTemplate: this.topicParameters.inputTransformation?.bind(pipe).inputTemplate,
},
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import * as events from 'aws-cdk-lib/aws-events';
import * as kinesis from 'aws-cdk-lib/aws-kinesis';
import * as logs from 'aws-cdk-lib/aws-logs';
import * as sagemaker from 'aws-cdk-lib/aws-sagemaker';
import * as sns from 'aws-cdk-lib/aws-sns';
import * as sqs from 'aws-cdk-lib/aws-sqs';
import * as sfn from 'aws-cdk-lib/aws-stepfunctions';
import * as lambda from 'aws-cdk-lib/aws-lambda';
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Jest Snapshot v1, https://goo.gl/fbAQLP

exports[`sns should grant pipe role publish access 1`] = `
{
"MyPipeRoleCBC8E9AB": {
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": "pipes.amazonaws.com",
},
},
],
"Version": "2012-10-17",
},
},
"Type": "AWS::IAM::Role",
},
}
`;

exports[`sns should grant pipe role publish access 2`] = `
{
"MyPipeRoleDefaultPolicy31387C20": {
"Properties": {
"PolicyDocument": {
"Statement": [
{
"Action": "sns:Publish",
"Effect": "Allow",
"Resource": {
"Ref": "MyTopic86869434",
},
},
],
"Version": "2012-10-17",
},
"PolicyName": "MyPipeRoleDefaultPolicy31387C20",
"Roles": [
{
"Ref": "MyPipeRoleCBC8E9AB",
},
],
},
"Type": "AWS::IAM::Policy",
},
}
`;
Loading
Loading