Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 45 additions & 11 deletions packages/@aws-cdk/aws-pipes-targets-alpha/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ Pipe targets are the end point of an EventBridge Pipe. The following targets are
* `targets.ApiGatewayTarget`: [Send event source to an API Gateway REST API](#amazon-api-gateway-rest-api)
* `targets.CloudWatchLogsTarget`: [Send event source to a CloudWatch Logs log group](#amazon-cloudwatch-logs-log-group)
* `targets.EventBridgeTarget`: [Send event source to an EventBridge event bus](#amazon-eventbridge-event-bus)
* `targets.FirehoseTarget`: [Send event source to an Amazon Data Firehose delivery stream](#amazon-data-firehose-delivery-stream)
* `targets.KinesisTarget`: [Send event source to a Kinesis data stream](#amazon-kinesis-data-stream)
* `targets.LambdaFunction`: [Send event source to a Lambda function](#aws-lambda-function)
* `targets.SageMakerTarget`: [Send event source to a SageMaker pipeline](#amazon-sagemaker-pipeline)
Expand All @@ -37,7 +38,7 @@ Pipe targets are the end point of an EventBridge Pipe. The following targets are

### Amazon EventBridge API Destination

An EventBridge API destination can be used as a target for a pipe.
An EventBridge API destination can be used as a target for a pipe.
The API destination will receive the (enriched/filtered) source payload.

```ts
Expand Down Expand Up @@ -70,7 +71,7 @@ const pipe = new pipes.Pipe(this, 'Pipe', {

### Amazon API Gateway Rest API

A REST API can be used as a target for a pipe.
A REST API can be used as a target for a pipe.
The REST API will receive the (enriched/filtered) source payload.

```ts
Expand Down Expand Up @@ -115,7 +116,7 @@ const pipe = new pipes.Pipe(this, 'Pipe', {

### Amazon CloudWatch Logs Log Group

A CloudWatch Logs log group can be used as a target for a pipe.
A CloudWatch Logs log group can be used as a target for a pipe.
The log group will receive the (enriched/filtered) source payload.

```ts
Expand Down Expand Up @@ -148,7 +149,7 @@ const pipe = new pipes.Pipe(this, 'Pipe', {

### Amazon EventBridge Event Bus

An EventBridge event bus can be used as a target for a pipe.
An EventBridge event bus can be used as a target for a pipe.
The event bus will receive the (enriched/filtered) source payload.

```ts
Expand Down Expand Up @@ -179,9 +180,42 @@ const pipe = new pipes.Pipe(this, 'Pipe', {
});
```

### Amazon Data Firehose Delivery Stream

An Amazon Data Firehose delivery stream can be used as a target for a pipe.
The delivery stream will receive the (enriched/filtered) source payload.

```ts
declare const sourceQueue: sqs.Queue;
declare const targetDeliveryStream: firehose.DeliveryStream;

const deliveryStreamTarget = new targets.FirehoseTarget(targetDeliveryStream);

const pipe = new pipes.Pipe(this, 'Pipe', {
source: new SqsSource(sourceQueue),
target: deliveryStreamTarget,
});
```

The input to the target delivery stream can be transformed:

```ts
declare const sourceQueue: sqs.Queue;
declare const targetDeliveryStream: firehose.DeliveryStream;

const deliveryStreamTarget = new targets.FirehoseTarget(targetDeliveryStream, {
inputTransformation: pipes.InputTransformation.fromObject({ body: "👀" }),
});

const pipe = new pipes.Pipe(this, 'Pipe', {
source: new SqsSource(sourceQueue),
target: deliveryStreamTarget,
});
```

### Amazon Kinesis Data Stream

A Kinesis data stream can be used as a target for a pipe.
A Kinesis data stream can be used as a target for a pipe.
The data stream will receive the (enriched/filtered) source payload.

```ts
Expand Down Expand Up @@ -217,7 +251,7 @@ const pipe = new pipes.Pipe(this, 'Pipe', {

### AWS Lambda Function

A Lambda function can be used as a target for a pipe.
A Lambda function can be used as a target for a pipe.
The Lambda function will be invoked with the (enriched/filtered) source payload.

```ts
Expand Down Expand Up @@ -266,7 +300,7 @@ const pipe = new pipes.Pipe(this, 'Pipe', {

### Amazon SageMaker Pipeline

A SageMaker pipeline can be used as a target for a pipe.
A SageMaker pipeline can be used as a target for a pipe.
The pipeline will receive the (enriched/filtered) source payload.

```ts
Expand Down Expand Up @@ -299,7 +333,7 @@ const pipe = new pipes.Pipe(this, 'Pipe', {

### AWS Step Functions State Machine

A Step Functions state machine can be used as a target for a pipe.
A Step Functions state machine can be used as a target for a pipe.
The state machine will be invoked with the (enriched/filtered) source payload.

```ts
Expand Down Expand Up @@ -351,7 +385,7 @@ const pipe = new pipes.Pipe(this, 'Pipe', {

### Amazon SQS Queue

An SQS queue can be used as a target for a pipe.
An SQS queue can be used as a target for a pipe.
The queue will receive the (enriched/filtered) source payload.

```ts
Expand All @@ -374,8 +408,8 @@ declare const targetQueue: sqs.Queue;

const pipeTarget = new targets.SqsTarget(targetQueue,
{
inputTransformation: pipes.InputTransformation.fromObject(
{
inputTransformation: pipes.InputTransformation.fromObject(
{
"SomeKey": pipes.DynamicInput.fromEventPath('$.body')
})
}
Expand Down
43 changes: 43 additions & 0 deletions packages/@aws-cdk/aws-pipes-targets-alpha/lib/firehose.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import { IInputTransformation, IPipe, ITarget, TargetConfig } from '@aws-cdk/aws-pipes-alpha';
import { IRole } from 'aws-cdk-lib/aws-iam';
import { IDeliveryStream } from 'aws-cdk-lib/aws-kinesisfirehose';

/**
* Amazon Data Firehose target properties.
*/
export interface FirehoseTargetParameters {
/**
* The input transformation to apply to the message before sending it to the target.
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipetargetparameters.html#cfn-pipes-pipe-pipetargetparameters-inputtemplate
* @default - none
*/
readonly inputTransformation?: IInputTransformation;
}

/**
* An EventBridge Pipes target that sends messages to an Amazon Data Firehose delivery stream.
*/
export class FirehoseTarget implements ITarget {
private deliveryStream: IDeliveryStream;
private deliveryStreamParameters: FirehoseTargetParameters;
public readonly targetArn: string;

constructor(deliveryStream: IDeliveryStream, parameters: FirehoseTargetParameters = {}) {
this.deliveryStream = deliveryStream;
this.targetArn = deliveryStream.deliveryStreamArn;
this.deliveryStreamParameters = parameters;
}

grantPush(grantee: IRole): void {
this.deliveryStream.grantPutRecords(grantee);
}

bind(pipe: IPipe): TargetConfig {
return {
targetParameters: {
inputTemplate: this.deliveryStreamParameters.inputTransformation?.bind(pipe).inputTemplate,
},
};
}
}
1 change: 1 addition & 0 deletions packages/@aws-cdk/aws-pipes-targets-alpha/lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ export * from './api-destination';
export * from './api-gateway';
export * from './cloudwatch-logs';
export * from './event-bridge';
export * from './firehose';
export * from './kinesis';
export * from './lambda';
export * from './sagemaker';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import * as cdk from 'aws-cdk-lib';
import * as api from 'aws-cdk-lib/aws-apigateway';
import * as events from 'aws-cdk-lib/aws-events';
import * as kinesis from 'aws-cdk-lib/aws-kinesis';
import * as firehose from 'aws-cdk-lib/aws-kinesisfirehose';
import * as logs from 'aws-cdk-lib/aws-logs';
import * as sagemaker from 'aws-cdk-lib/aws-sagemaker';
import * as sqs from 'aws-cdk-lib/aws-sqs';
Expand Down
104 changes: 104 additions & 0 deletions packages/@aws-cdk/aws-pipes-targets-alpha/test/firehose.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import { InputTransformation, Pipe } from '@aws-cdk/aws-pipes-alpha';
import { App, Stack } from 'aws-cdk-lib';
import { Template } from 'aws-cdk-lib/assertions';
import { DeliveryStream, S3Bucket } from 'aws-cdk-lib/aws-kinesisfirehose';
import { Bucket } from 'aws-cdk-lib/aws-s3';
import { TestSource } from './test-classes';
import { FirehoseTarget } from '../lib/firehose';

describe('Firehose', () => {
it('should have target arn', () => {
// ARRANGE
const app = new App();
const stack = new Stack(app, 'TestStack');
const bucket = new Bucket(stack, 'Bucket');
const deliveryStream = new DeliveryStream(stack, 'MyDeliveryStream', {
destination: new S3Bucket(bucket),
});

const target = new FirehoseTarget(deliveryStream);

new Pipe(stack, 'MyPipe', {
source: new TestSource(),
target,
});

// ACT
const template = Template.fromStack(stack);

// ASSERT
template.hasResourceProperties('AWS::Pipes::Pipe', {
Target: {
'Fn::GetAtt': [
'MyDeliveryStream79822137',
'Arn',
],
},
});
});

it('should have input transformation', () => {
// ARRANGE
const app = new App();
const stack = new Stack(app, 'TestStack');
const bucket = new Bucket(stack, 'Bucket');
const deliveryStream = new DeliveryStream(stack, 'MyDeliveryStream', {
destination: new S3Bucket(bucket),
});

const inputTransformation = InputTransformation.fromObject({
key: 'value',
});
const target = new FirehoseTarget(deliveryStream, {
inputTransformation,
});

new Pipe(stack, 'MyPipe', {
source: new TestSource(),
target,
});

// ACT
const template = Template.fromStack(stack);

// ASSERT
template.hasResourceProperties('AWS::Pipes::Pipe', {
TargetParameters: {
InputTemplate: '{"key":"value"}',
},
});
});

it('should grant pipe role putRecord access', () => {
// ARRANGE
const app = new App();
const stack = new Stack(app, 'TestStack');
const bucket = new Bucket(stack, 'Bucket');
const deliveryStream = new DeliveryStream(stack, 'MyDeliveryStream', {
destination: new S3Bucket(bucket),
});

const target = new FirehoseTarget(deliveryStream);

new Pipe(stack, 'MyPipe', {
source: new TestSource(),
target,
});

// ACT
const template = Template.fromStack(stack);

// ASSERT
template.hasResourceProperties('AWS::IAM::Policy', {
PolicyDocument: {
Statement: [
{
Action: ['firehose:PutRecord', 'firehose:PutRecordBatch'],
Resource: { 'Fn::GetAtt': ['MyDeliveryStream79822137', 'Arn'] },
},
],
},
Roles: [{ Ref: 'MyPipeRoleCBC8E9AB' }],
});
});
});

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading