Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions packages/@aws-cdk/aws-pipes-targets-alpha/lib/firehose.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import { IInputTransformation, IPipe, ITarget, TargetConfig } from '@aws-cdk/aws-pipes-alpha';
import { IRole } from 'aws-cdk-lib/aws-iam';
import { IDeliveryStream } from 'aws-cdk-lib/aws-kinesisfirehose';

/**
* Amazon Data Firehose target properties.
*/
export interface FirehoseTargetParameters {
/**
* The input transformation to apply to the message before sending it to the target.
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipetargetparameters.html#cfn-pipes-pipe-pipetargetparameters-inputtemplate
* @default - none
*/
readonly inputTransformation?: IInputTransformation;
}

/**
* An EventBridge Pipes target that sends messages to an Amazon Data Firehose delivery stream.
*/
export class FirehoseTarget implements ITarget {
private stream: IDeliveryStream;
private streamParameters: FirehoseTargetParameters;
public readonly targetArn: string;

constructor(stream: IDeliveryStream, parameters: FirehoseTargetParameters = {}) {
this.stream = stream;
this.targetArn = stream.deliveryStreamArn;
this.streamParameters = parameters;
}

grantPush(grantee: IRole): void {
this.stream.grantPutRecords(grantee);
}

bind(pipe: IPipe): TargetConfig {
return {
targetParameters: {
inputTemplate: this.streamParameters.inputTransformation?.bind(pipe).inputTemplate,
},
};
}
}
1 change: 1 addition & 0 deletions packages/@aws-cdk/aws-pipes-targets-alpha/lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ export * from './lambda';
export * from './sagemaker';
export * from './sqs';
export * from './stepfunctions';
export * from './firehose';
104 changes: 104 additions & 0 deletions packages/@aws-cdk/aws-pipes-targets-alpha/test/firehose.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import { InputTransformation, Pipe } from '@aws-cdk/aws-pipes-alpha';
import { App, Stack } from 'aws-cdk-lib';
import { Template } from 'aws-cdk-lib/assertions';
import { DeliveryStream, S3Bucket } from 'aws-cdk-lib/aws-kinesisfirehose';
import { Bucket } from 'aws-cdk-lib/aws-s3';
import { TestSource } from './test-classes';
import { FirehoseTarget } from '../lib/firehose';

describe('Firehose', () => {
it('should have target arn', () => {
// ARRANGE
const app = new App();
const stack = new Stack(app, 'TestStack');
const bucket = new Bucket(stack, 'Bucket');
const stream = new DeliveryStream(stack, 'MyStream', {
destination: new S3Bucket(bucket),
});

const target = new FirehoseTarget(stream);

new Pipe(stack, 'MyPipe', {
source: new TestSource(),
target,
});

// ACT
const template = Template.fromStack(stack);

// ASSERT
template.hasResourceProperties('AWS::Pipes::Pipe', {
Target: {
'Fn::GetAtt': [
'MyStream5C050E93',
'Arn',
],
},
});
});

it('should have input transformation', () => {
// ARRANGE
const app = new App();
const stack = new Stack(app, 'TestStack');
const bucket = new Bucket(stack, 'Bucket');
const stream = new DeliveryStream(stack, 'MyStream', {
destination: new S3Bucket(bucket),
});

const inputTransformation = InputTransformation.fromObject({
key: 'value',
});
const target = new FirehoseTarget(stream, {
inputTransformation,
});

new Pipe(stack, 'MyPipe', {
source: new TestSource(),
target,
});

// ACT
const template = Template.fromStack(stack);

// ASSERT
template.hasResourceProperties('AWS::Pipes::Pipe', {
TargetParameters: {
InputTemplate: '{"key":"value"}',
},
});
});

it('should grant pipe role putRecord access', () => {
// ARRANGE
const app = new App();
const stack = new Stack(app, 'TestStack');
const bucket = new Bucket(stack, 'Bucket');
const stream = new DeliveryStream(stack, 'MyStream', {
destination: new S3Bucket(bucket),
});

const target = new FirehoseTarget(stream);

new Pipe(stack, 'MyPipe', {
source: new TestSource(),
target,
});

// ACT
const template = Template.fromStack(stack);

// ASSERT
template.hasResourceProperties('AWS::IAM::Policy', {
PolicyDocument: {
Statement: [
{
Action: ['firehose:PutRecord', 'firehose:PutRecordBatch'],
Resource: { 'Fn::GetAtt': ['MyStream5C050E93', 'Arn'] },
},
],
},
Roles: [{ Ref: 'MyPipeRoleCBC8E9AB' }],
});
});
});

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading