Applicable when
- Firehose delivery stream is used
Implementation
The code below creates a new nested stack containing a configured Firehose delivery stream and the IAM role that Firehose assumes for delivery.
import { Integration, IntegrationType, MethodOptions } from '@aws-cdk/aws-apigateway';
import { NestedStack, NestedStackProps } from '@aws-cdk/aws-cloudformation';
import { PolicyDocument, Role, ServicePrincipal } from '@aws-cdk/aws-iam';
import { CfnDeliveryStream } from '@aws-cdk/aws-kinesisfirehose';
import { withEnv } from '../util/consts';
import { getEventRequestTemplate } from '../util/vtl-templates';
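/**
 * Nested stack that provisions the Kinesis Data Firehose delivery stream for
 * events, together with the IAM role Firehose assumes to deliver them.
 *
 * Delivery path configured here: DirectPut -> Lambda processor ->
 * JSON-to-Parquet conversion using a Glue table schema -> S3 event bucket.
 *
 * NOTE(review): `BackendStack` is referenced but not imported in this listing;
 * import it from the project's own module.
 */
export class FirehoseStack extends NestedStack {
  public deliveryStreamName: string;
  public deliveryStreamRoleName: string;
  public eventRoleName: string;
  public deliveryStreamRole: Role;
  public deliveryStream: CfnDeliveryStream;

  // Parent stack exposing the event bucket, the processing Lambda and the Glue
  // stack. The methods below read it, so it has to be stored on the instance.
  private readonly backendStack: BackendStack;

  /**
   * @param scope Parent backend stack; also the source of the bucket, Lambda
   *   and Glue resources wired into the delivery stream.
   * @param id Construct id of this nested stack.
   * @param props Optional nested-stack properties, passed through unchanged.
   */
  constructor(scope: BackendStack, id: string, props?: NestedStackProps) {
    super(scope, id, props);

    this.backendStack = scope;

    this.deliveryStreamName = withEnv('delivery-stream');
    this.deliveryStreamRoleName = withEnv('delivery-stream-role');
    this.eventRoleName = withEnv('event-role');

    // The role must exist first: the stream references its ARN.
    this.createDeliveryRole();
    this.setupDeliveryStream();
  }

  /**
   * Creates the role assumed by the Firehose service and stores it in
   * `deliveryStreamRole`.
   *
   * The Lambda and Glue grants are limited to the actions Firehose uses for
   * record transformation and schema lookup, instead of `lambda:*` / `glue:*`.
   */
  public createDeliveryRole(): void {
    const eventBucketArn = this.backendStack.eventBucket.bucketArn;
    const processorArn = this.backendStack.eventProcessingLambda.functionArn;

    // Add policies to the role as and when needed (inside this method)
    this.deliveryStreamRole = new Role(this, this.deliveryStreamRoleName, {
      roleName: this.deliveryStreamRoleName,
      assumedBy: new ServicePrincipal('firehose.amazonaws.com'),
      inlinePolicies: {
        kinesisS3Policy: PolicyDocument.fromJson({
          Version: '2012-10-17',
          Statement: [
            {
              Effect: 'Allow',
              Action: [
                's3:AbortMultipartUpload',
                's3:GetBucketLocation',
                's3:GetObject',
                's3:ListBucket',
                's3:ListBucketMultipartUploads',
                's3:PutObject',
              ],
              // Bucket-level actions (GetBucketLocation, ListBucket,
              // ListBucketMultipartUploads) are evaluated against the bucket
              // ARN itself; the `/*` form only matches objects. Both are
              // needed for every action in this statement to take effect.
              Resource: [eventBucketArn, `${eventBucketArn}/*`],
            },
          ],
        }),
        kinesisLambdaPolicy: PolicyDocument.fromJson({
          Version: '2012-10-17',
          Statement: [
            {
              Effect: 'Allow',
              // Firehose only invokes the processor and reads its
              // configuration; it does not need to manage Lambda functions.
              Action: ['lambda:InvokeFunction', 'lambda:GetFunctionConfiguration'],
              // Unqualified ARN plus any version/alias qualifier of the same
              // function, and nothing else in the account.
              Resource: [processorArn, `${processorArn}:*`],
            },
          ],
        }),
        gluePolicy: PolicyDocument.fromJson({
          Version: '2012-10-17',
          Statement: [
            {
              Effect: 'Allow',
              // Read-only schema lookups used by data format conversion.
              Action: ['glue:GetTable', 'glue:GetTableVersion', 'glue:GetTableVersions'],
              // NOTE(review): narrow this to the catalog, database and table
              // ARNs of the Glue resources referenced in setupDeliveryStream()
              // once those ARNs are available from the Glue stack.
              Resource: '*',
            },
          ],
        }),
        firehosePolicy: PolicyDocument.fromJson({
          Version: '2012-10-17',
          Statement: [
            {
              Effect: 'Allow',
              // NOTE(review): nothing configured in this stack (S3 delivery,
              // Lambda processing, Glue schema lookup) calls Firehose APIs
              // through this role. Confirm whether any consumer relies on
              // this grant; if not, drop the statement, otherwise scope it to
              // this delivery stream's ARN and the specific actions needed.
              Action: 'firehose:*',
              Resource: '*',
            },
          ],
        }),
      },
    });

    // Provides Put, Get access to S3 and full access to CloudWatch Logs.
    // NOTE(review): this AWS-managed policy is not limited to the event
    // bucket, so it widens the S3 grant above beyond that bucket, and the
    // stream below does not enable CloudWatch logging. Consider replacing it
    // with log permissions scoped to a specific log group if logging is added.
    this.deliveryStreamRole.addManagedPolicy({
      managedPolicyArn: 'arn:aws:iam::aws:policy/AWSLambdaExecute',
    });
  }

  /**
   * Creates the DirectPut delivery stream and stores it in `deliveryStream`.
   *
   * Records are transformed by the event-processing Lambda, converted from
   * JSON to Parquet using the Glue table schema, and written to the event
   * bucket under `data/` (failures under `errors/`).
   *
   * NOTE(review): no `encryptionConfiguration` or `cloudWatchLoggingOptions`
   * is set here, so object encryption depends on the bucket's own settings and
   * delivery errors are only visible under the error prefix.
   */
  public setupDeliveryStream(): void {
    const { eventBucket, eventProcessingLambda, glueStack } = this.backendStack;

    this.deliveryStream = new CfnDeliveryStream(this, this.deliveryStreamName, {
      deliveryStreamName: this.deliveryStreamName,
      deliveryStreamType: 'DirectPut',
      extendedS3DestinationConfiguration: {
        bucketArn: eventBucket.bucketArn,
        roleArn: this.deliveryStreamRole.roleArn,
        bufferingHints: {
          sizeInMBs: 64,
          intervalInSeconds: 60,
        },
        prefix: 'data/',
        errorOutputPrefix: 'errors/',
        processingConfiguration: {
          enabled: true,
          processors: [
            {
              type: 'Lambda',
              parameters: [
                {
                  parameterName: 'LambdaArn',
                  parameterValue: eventProcessingLambda.functionArn,
                },
              ],
            },
          ],
        },
        dataFormatConversionConfiguration: {
          enabled: true,
          inputFormatConfiguration: {
            deserializer: {
              // These settings might need to be changed based on the use case
              // This is the default settings when configured through the console
              openXJsonSerDe: {
                caseInsensitive: false,
                // Add hive keywords (e.g. timestamp) if they are added to events schema
                columnToJsonKeyMappings: {},
                convertDotsInJsonKeysToUnderscores: false,
              },
            },
          },
          outputFormatConfiguration: {
            serializer: {
              // Add parquet options if anything specific is required
              parquetSerDe: {
                compression: 'SNAPPY',
              },
            },
          },
          schemaConfiguration: {
            databaseName: glueStack.database.databaseName, // Target Glue database name
            // Same role as delivery: it needs the Glue read actions granted in
            // createDeliveryRole().
            roleArn: this.deliveryStreamRole.roleArn,
            tableName: glueStack.eventsTable.tableName, // Target Glue table name
          },
        },
      },
    });
  }
}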