Skip to content

feat(pipes-targets): add Firehose #34160

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions packages/@aws-cdk/aws-pipes-targets-alpha/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ Pipe targets are the end point of an EventBridge Pipe. The following targets are
* `targets.ApiGatewayTarget`: [Send event source to an API Gateway REST API](#amazon-api-gateway-rest-api)
* `targets.CloudWatchLogsTarget`: [Send event source to a CloudWatch Logs log group](#amazon-cloudwatch-logs-log-group)
* `targets.EventBridgeTarget`: [Send event source to an EventBridge event bus](#amazon-eventbridge-event-bus)
* `targets.FirehoseTarget`: [Send event source to a Firehose stream](#amazon-data-firehose-stream)
* `targets.KinesisTarget`: [Send event source to a Kinesis data stream](#amazon-kinesis-data-stream)
* `targets.LambdaFunction`: [Send event source to a Lambda function](#aws-lambda-function)
* `targets.SageMakerTarget`: [Send event source to a SageMaker pipeline](#amazon-sagemaker-pipeline)
Expand Down Expand Up @@ -179,6 +180,44 @@ const pipe = new pipes.Pipe(this, 'Pipe', {
});
```

### Amazon Data Firehose Stream

A Firehose stream can be used as a target for a pipe.
The topic will receive the (enriched/filtered) source payload.

```ts
declare const sourceQueue: sqs.Queue;
declare const targetStream: firehose.DeliveryStream;

const pipeTarget = new targets.FirehoseTarget(targetStream);

const pipe = new pipes.Pipe(this, 'Pipe', {
source: new SqsSource(sourceQueue),
target: pipeTarget
});
```

The target input can be transformed:

```ts
declare const sourceQueue: sqs.Queue;
declare const targetStream: firehose.DeliveryStream;

const pipeTarget = new targets.FirehoseTarget(targetStream,
{
inputTransformation: pipes.InputTransformation.fromObject(
{
"SomeKey": pipes.DynamicInput.fromEventPath('$.body')
})
}
);

const pipe = new pipes.Pipe(this, 'Pipe', {
source: new SqsSource(sourceQueue),
target: pipeTarget
});
```

### Amazon Kinesis Data Stream

A Kinesis data stream can be used as a target for a pipe.
Expand Down
49 changes: 49 additions & 0 deletions packages/@aws-cdk/aws-pipes-targets-alpha/lib/firehose.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import { IInputTransformation, IPipe, ITarget, TargetConfig } from '@aws-cdk/aws-pipes-alpha';
import { IRole } from 'aws-cdk-lib/aws-iam';
import { IDeliveryStream } from 'aws-cdk-lib/aws-kinesisfirehose';

/**
* Firehose target properties.
*/
export interface FirehoseTargetParameters {
/**
* The input transformation to apply to the message before sending it to the target.
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipetargetparameters.html#cfn-pipes-pipe-pipetargetparameters-inputtemplate
* @default - none
*/
readonly inputTransformation?: IInputTransformation;
}

/**
* A EventBridge Pipes target that sends messages to a Firehose stream.
*/
export class FirehoseTarget implements ITarget {
private stream: IDeliveryStream;
private streamParameters?: FirehoseTargetParameters;
public readonly targetArn: string;

constructor(stream: IDeliveryStream, parameters?: FirehoseTargetParameters) {
this.stream = stream;
this.targetArn = stream.deliveryStreamArn;
this.streamParameters = parameters;
}

grantPush(grantee: IRole): void {
this.stream.grantPutRecords(grantee);
}

bind(pipe: IPipe): TargetConfig {
if (!this.streamParameters?.inputTransformation) {
return {
targetParameters: {},
};
}

return {
targetParameters: {
inputTemplate: this.streamParameters.inputTransformation?.bind(pipe).inputTemplate,
},
};
}
}
1 change: 1 addition & 0 deletions packages/@aws-cdk/aws-pipes-targets-alpha/lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ export * from './api-destination';
export * from './api-gateway';
export * from './cloudwatch-logs';
export * from './event-bridge';
export * from './firehose';
export * from './kinesis';
export * from './lambda';
export * from './sagemaker';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import * as cdk from 'aws-cdk-lib';
import * as api from 'aws-cdk-lib/aws-apigateway';
import * as events from 'aws-cdk-lib/aws-events';
import * as firehose from 'aws-cdk-lib/aws-kinesisfirehose';
import * as kinesis from 'aws-cdk-lib/aws-kinesis';
import * as logs from 'aws-cdk-lib/aws-logs';
import * as sagemaker from 'aws-cdk-lib/aws-sagemaker';
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// Jest Snapshot v1, https://goo.gl/fbAQLP

exports[`firehose should grant pipe role put access 1`] = `
{
"MyPipeRoleCBC8E9AB": {
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": "pipes.amazonaws.com",
},
},
],
"Version": "2012-10-17",
},
},
"Type": "AWS::IAM::Role",
},
"MyStreamS3DestinationRole5E0BA960": {
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": "firehose.amazonaws.com",
},
},
],
"Version": "2012-10-17",
},
},
"Type": "AWS::IAM::Role",
},
}
`;

exports[`firehose should grant pipe role put access 2`] = `
{
"MyPipeRoleDefaultPolicy31387C20": {
"Properties": {
"PolicyDocument": {
"Statement": [
{
"Action": [
"firehose:PutRecord",
"firehose:PutRecordBatch",
],
"Effect": "Allow",
"Resource": {
"Fn::GetAtt": [
"MyStream5C050E93",
"Arn",
],
},
},
],
"Version": "2012-10-17",
},
"PolicyName": "MyPipeRoleDefaultPolicy31387C20",
"Roles": [
{
"Ref": "MyPipeRoleCBC8E9AB",
},
],
},
"Type": "AWS::IAM::Policy",
},
"MyStreamS3DestinationRoleDefaultPolicy401EF6F2": {
"Properties": {
"PolicyDocument": {
"Statement": [
{
"Action": [
"s3:GetObject*",
"s3:GetBucket*",
"s3:List*",
"s3:DeleteObject*",
"s3:PutObject",
"s3:PutObjectLegalHold",
"s3:PutObjectRetention",
"s3:PutObjectTagging",
"s3:PutObjectVersionTagging",
"s3:Abort*",
],
"Effect": "Allow",
"Resource": [
{
"Fn::GetAtt": [
"Bucket83908E77",
"Arn",
],
},
{
"Fn::Join": [
"",
[
{
"Fn::GetAtt": [
"Bucket83908E77",
"Arn",
],
},
"/*",
],
],
},
],
},
{
"Action": [
"logs:CreateLogStream",
"logs:PutLogEvents",
],
"Effect": "Allow",
"Resource": {
"Fn::GetAtt": [
"MyStreamLogGroupAB67AB09",
"Arn",
],
},
},
],
"Version": "2012-10-17",
},
"PolicyName": "MyStreamS3DestinationRoleDefaultPolicy401EF6F2",
"Roles": [
{
"Ref": "MyStreamS3DestinationRole5E0BA960",
},
],
},
"Type": "AWS::IAM::Policy",
},
}
`;
118 changes: 118 additions & 0 deletions packages/@aws-cdk/aws-pipes-targets-alpha/test/firehose.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import { InputTransformation, Pipe } from '@aws-cdk/aws-pipes-alpha';
import { App, Stack } from 'aws-cdk-lib';
import { Template } from 'aws-cdk-lib/assertions';
import { DeliveryStream, S3Bucket } from 'aws-cdk-lib/aws-kinesisfirehose';
import { Bucket } from 'aws-cdk-lib/aws-s3';
import { TestSource } from './test-classes';
import { FirehoseTarget } from '../lib';

describe('firehose', () => {
it('should have only target arn', () => {
// ARRANGE
const app = new App();
const stack = new Stack(app, 'TestStack');
const bucket = new Bucket(stack, 'Bucket');
const stream = new DeliveryStream(stack, 'MyStream', {
destination: new S3Bucket(bucket),
});
const target = new FirehoseTarget(stream);

new Pipe(stack, 'MyPipe', {
source: new TestSource(),
target,
});

// ACT
const template = Template.fromStack(stack);

// ASSERT
template.hasResourceProperties('AWS::Pipes::Pipe', {
Target: {
'Fn::GetAtt': [
'MyStream5C050E93',
'Arn',
],
},
TargetParameters: {},
});
});

it('should have input transformation', () => {
// ARRANGE
const app = new App();
const stack = new Stack(app, 'TestStack');
const bucket = new Bucket(stack, 'Bucket');
const stream = new DeliveryStream(stack, 'MyStream', {
destination: new S3Bucket(bucket),
});

const inputTransformation = InputTransformation.fromObject({
key: 'value',
});
const target = new FirehoseTarget(stream, {
inputTransformation,
});

new Pipe(stack, 'MyPipe', {
source: new TestSource(),
target,
});

// ACT
const template = Template.fromStack(stack);

// ASSERT
template.hasResourceProperties('AWS::Pipes::Pipe', {
TargetParameters: {
InputTemplate: '{"key":"value"}',
},
});
});

it('should handle an empty parameter object', () => {
// ARRANGE
const app = new App();
const stack = new Stack(app, 'TestStack');
const bucket = new Bucket(stack, 'Bucket');
const stream = new DeliveryStream(stack, 'MyStream', {
destination: new S3Bucket(bucket),
});
const target = new FirehoseTarget(stream, {});

new Pipe(stack, 'MyPipe', {
source: new TestSource(),
target,
});

// ACT
const template = Template.fromStack(stack);

// ASSERT
template.hasResourceProperties('AWS::Pipes::Pipe', {
TargetParameters: {},
});
});

it('should grant pipe role put access', () => {
// ARRANGE
const app = new App();
const stack = new Stack(app, 'TestStack');
const bucket = new Bucket(stack, 'Bucket');
const stream = new DeliveryStream(stack, 'MyStream', {
destination: new S3Bucket(bucket),
});
const target = new FirehoseTarget(stream);

new Pipe(stack, 'MyPipe', {
source: new TestSource(),
target,
});

// ACT
const template = Template.fromStack(stack);

// ASSERT
expect(template.findResources('AWS::IAM::Role')).toMatchSnapshot();
expect(template.findResources('AWS::IAM::Policy')).toMatchSnapshot();
});
});

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading