Commit cb7c93e

Merge pull request awslabs#8 from tomsmaddox/master
Configurable stream sources
2 parents a929426 + e576645 commit cb7c93e

7 files changed

+242
-45
lines changed

README.md

Lines changed: 22 additions & 7 deletions
@@ -11,25 +11,40 @@ In order to effectively use this function, you should already have configured an
1111

1212
# Configuration
1313

14-
This AWS Lambda function uses either Tag information from Amazon Kinesis Stream, or a convention to determine which Delivery Stream to forward to. If Amazon Kinesis Streams are the source, the Delivery Stream can have any name, and to Tags are used to specify the Delivery Stream target. To Tag the Stream for Amazon Kinesis Firehose Delivery simply run the ```tagKinesisStream.sh``` script:
14+
This Lambda function can map stream sources to Kinesis Firehose Delivery Streams in a few different ways (listed in order of preference):
15+
* Manually specified configuration (see [index.js:59](index.js#L59))
16+
* A DynamoDB stream naming convention to determine which Delivery Stream to forward to
17+
* A Kinesis Stream Tagging convention
18+
* (Optionally) A default delivery stream.
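
The order of preference above can be sketched roughly as follows. This is a minimal illustration rather than the code in index.js: `resolveDeliveryStream` and the shape of its `opts` argument are hypothetical, the tag value is assumed to have been fetched already, and the check that the chosen Delivery Stream actually exists is left out.

```javascript
// Hypothetical sketch of the lookup order; not the code in index.js.
function resolveDeliveryStream(opts) {
    var mapping = opts.mapping || {};

    // 1. a manually specified mapping takes precedence
    if (mapping[opts.streamName]) {
        return mapping[opts.streamName];
    }
    // 2. DynamoDB convention: the Delivery Stream shares the table name
    if (opts.isDynamoDB) {
        return opts.streamName;
    }
    // 3. Kinesis convention: value of the stream's forwarding tag, if any
    if (opts.tagValue) {
        return opts.tagValue;
    }
    // 4. optional default Delivery Stream
    if (opts.useDefault && mapping.DEFAULT) {
        return mapping.DEFAULT;
    }
    // nothing matched; the caller should fail the invocation
    return null;
}

// Example: an untagged Kinesis stream with the default enabled
console.log(resolveDeliveryStream({
    streamName: 'MyStream',
    mapping: { DEFAULT: 'LambdaStreamsDefaultDeliveryStream' },
    useDefault: true
}));
```

Keeping the decision in one pure function like this makes the precedence easy to test without calling any AWS API.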
19+
20+
## Using the Default Delivery Stream
21+
In order to make sure that data will always be accepted by a Kinesis Firehose Delivery Stream, this Lambda function can fall back to a default Delivery Stream if neither the manual configuration nor the other lookups return a result.
22+
23+
This can be particularly helpful when developing and testing the integration of new data sources. In such cases you could use the Default Delivery Stream to forward data to an S3 bucket with a one-day retention period, as specified in an [S3 Lifecycle Policy](http://docs.aws.amazon.com/AmazonS3/latest/dev/object-lifecycle-mgmt.html).
24+
25+
The Default Delivery Stream is enabled by default in the Lambda function; however, to use it there must be a Kinesis Firehose Delivery Stream with a matching name. You can use the [createDefaultDeliveryStream.sh](createDefaultDeliveryStream.sh) script to orchestrate its creation.
26+
27+
*Note: We recommend using default delivery streams only for non-production workloads. They can be disabled by setting ```USE_DEFAULT_DELIVERY_STREAMS = false``` (see [index.js:50](index.js#L50))*
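
The fallback behaviour described in this section can be illustrated with a small synchronous sketch. The names `chooseWithFallback` and `exists` are hypothetical: `exists` stands in for whatever check confirms that a Delivery Stream is present (the function itself uses an asynchronous Firehose describe call), and the sketch is an assumption about the intent, not the author's implementation.

```javascript
// Hypothetical sketch of falling back to the default Delivery Stream.
// `exists` is a stand-in predicate for an existence check against Firehose.
function chooseWithFallback(candidate, defaultStream, useDefault, exists) {
    // prefer the stream found by configuration, naming convention or tag
    if (candidate && exists(candidate)) {
        return candidate;
    }
    // otherwise use the default, but only if enabled and actually present
    if (useDefault && defaultStream && exists(defaultStream)) {
        return defaultStream;
    }
    // no usable Delivery Stream; the caller should fail the invocation
    return null;
}

// Example with an in-memory list standing in for the streams in the account
var known = ['LambdaStreamsDefaultDeliveryStream'];
var exists = function(name) { return known.indexOf(name) !== -1; };
console.log(chooseWithFallback('MissingStream', 'LambdaStreamsDefaultDeliveryStream', true, exists));
```

Returning `null` when the default is also missing avoids retrying the same lookup indefinitely.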
28+
29+
## Specifying a Delivery Stream for a Kinesis Stream Source
30+
If Amazon Kinesis Streams are the source, the Delivery Stream can be specified in configuration, or tags can be used to specify the Delivery Stream target. To tag the Stream for Amazon Kinesis Firehose delivery, run the ```tagKinesisStream.sh``` script:
1531

1632
```
1733
tagStream.sh <My Kinesis Stream> <My Firehose Delivery Stream> <region>
18-
34+
where
1935
<My Kinesis Stream> - The Amazon Kinesis Stream for which an event source has been created to the Forwarder Lambda function
2036
<My Firehose Delivery Stream> - The Amazon Kinesis Firehose Delivery Stream which you've configured to deliver to the required destination
2137
<region> - The region in which the Kinesis Stream & Firehose Delivery Stream have been created. Today only single region operation is permitted
2238
```
2339

2440
This will add a new Stream Tag named ```ForwardToFirehoseStream``` on the Kinesis Stream with the value you supply. This is limited to delivery in the same region as the Kinesis Stream or DynamoDB table. You can run the script any time to update this value. To view the Tags configured on the Stream, simply run ```aws kinesis list-tags-for-stream --stream-name <My Kinesis Stream> --region <region>```
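
For illustration, reading that tag back from a list of stream tags might look like the sketch below. It assumes the tags arrive as an array of `{ Key, Value }` objects, which is the shape index.js iterates over; the helper name `findForwardingTarget` is hypothetical.

```javascript
// Tag name as documented above
var FORWARD_TO_FIREHOSE_STREAM = 'ForwardToFirehoseStream';

// Hypothetical helper: returns the Delivery Stream named by the tag,
// or undefined when the stream has not been tagged for forwarding.
function findForwardingTarget(tags) {
    var target;
    (tags || []).forEach(function(item) {
        if (item.Key === FORWARD_TO_FIREHOSE_STREAM) {
            target = item.Value;
        }
    });
    return target;
}

// Example input in the assumed { Key, Value } shape
console.log(findForwardingTarget([
    { Key: 'Owner', Value: 'analytics' },
    { Key: 'ForwardToFirehoseStream', Value: 'MyDeliveryStream' }
]));
```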
2541

26-
If you are using Amazon DynamoDB, then *the Firehose Delivery Stream must be the same name as the Amazon DynamoDB Table*.
27-
28-
Only single region deployments are supported today.
42+
## Specifying a Delivery Stream for a DynamoDB Stream Source
43+
If you are using Amazon DynamoDB, then either use manual configuration or give the Firehose Delivery Stream the same name as the Amazon DynamoDB Table.
2944

3045
# Deploying
3146

32-
To use this function, simply deploy the [LambdaStreamToFirehose-1.2.0.zip](https://github.com/awslabs/lambda-streams-to-firehose/blob/master/dist/LambdaStreamToFirehose-1.2.0.zip) to AWS Lambda with handler `index.handler`. You must ensure that it is deployed with an invocation role that includes the ability to write Amazon CloudWatch Logs, Read from Amazon Kinesis or Amazon DynamoDB Streams, and Write to Amazon Kinesis Firehose:
47+
To use this function, simply deploy the [LambdaStreamToFirehose-1.3.0.zip](https://github.com/awslabs/lambda-streams-to-firehose/blob/master/dist/LambdaStreamToFirehose-1.3.0.zip) to AWS Lambda with handler `index.handler`. You must ensure that it is deployed with an invocation role that includes the ability to write Amazon CloudWatch Logs, Read from Amazon Kinesis or Amazon DynamoDB Streams, and Write to Amazon Kinesis Firehose:
3348

3449
```
3550
{
@@ -92,7 +107,7 @@ To use this function, simply deploy the [LambdaStreamToFirehose-1.2.0.zip](https
92107
}
93108
```
94109

95-
You may choose to restrict the IAM role to be specific to a subset of Kinesis or DynamoDB Update Streams and Firehose endpoints.
110+
You may choose to restrict the IAM role to be specific to a subset of Kinesis or DynamoDB Update Streams and Firehose endpoints.
96111

97112
Finally, create an Event Source (http://docs.aws.amazon.com/lambda/latest/dg/intro-core-components.html) for this function from the Stream to be forwarded to Firehose.
98113

build.sh

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ functionName=LambdaStreamToFirehose
2222
filename=$functionName-$version.zip
2323
region=eu-west-1
2424

25+
npm install
26+
2527
rm $filename > /dev/null 2>&1
2628

2729
zip -r $filename index.js package.json node_modules/ README.md LICENSE && mv -f $filename dist/$filename

createDefaultDeliveryStream.sh

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
#!/bin/bash
2+
3+
# Kinesis Streams to Firehose
4+
#
5+
# Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
6+
#
7+
# Licensed under the Apache License, Version 2.0 (the "License");
8+
# you may not use this file except in compliance with the License.
9+
# You may obtain a copy of the License at
10+
#
11+
# http://www.apache.org/licenses/LICENSE-2.0
12+
#
13+
# Unless required by applicable law or agreed to in writing, software
14+
# distributed under the License is distributed on an "AS IS" BASIS,
15+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
# See the License for the specific language governing permissions and
17+
# limitations under the License.
18+
19+
region=eu-west-1
20+
21+
s3BucketNamePrefix="LambdaStreamsDefaultDeliveryBucket"
22+
iamRoleName="LambdaStreamsDefaultDeliveryRole"
23+
iamPolicyName="LambdaStreamsDefaultDeliveryPolicy"
24+
deliveryStreamName="LambdaStreamsDefaultDeliveryStream"
25+
26+
which aws > /dev/null 2>&1
27+
28+
if [ $? != 0 ]; then
29+
echo "This utility requires the AWS Cli, which can be installed using instructions found at http://docs.aws.amazon.com/cli/latest/userguide/installing.html"
30+
exit -2
31+
fi
32+
33+
randomChars=$(openssl rand -base64 8 | tr -dc 'a-zA-Z0-9')
34+
suggestedBucketName="$s3BucketNamePrefix-$randomChars"
35+
read -p "Please enter default stream destination bucket name [$suggestedBucketName]: " bucketName
36+
bucketName=${bucketName:-$suggestedBucketName}
37+
38+
echo -n "Creating S3 Bucket: $bucketName.. "
39+
aws s3 mb --region $region s3://$bucketName --output text
40+
if [ $? -ne 0 ]; then
41+
exit 1
42+
fi
43+
echo "OK"
44+
45+
echo -n "Creating IAM Role: $iamRoleName.. "
46+
roleArn=$(aws iam create-role --query "Role.Arn" --output text \
47+
--role-name $iamRoleName \
48+
--assume-role-policy-document "{
49+
\"Version\": \"2012-10-17\",
50+
\"Statement\": [
51+
{
52+
\"Sid\": \"PermitFirehoseAccess\",
53+
\"Effect\": \"Allow\",
54+
\"Principal\": {
55+
\"Service\": \"firehose.amazonaws.com\"
56+
},
57+
\"Action\": \"sts:AssumeRole\"
58+
}
59+
]
60+
}")
61+
if [ $? -ne 0 ]; then
62+
aws s3api delete-bucket --region $region --bucket $bucketName
63+
exit 1
64+
fi
65+
echo "OK"
66+
67+
echo -n "Creating IAM Policy: $iamPolicyName.. "
68+
aws iam put-role-policy \
69+
--role-name $iamRoleName \
70+
--policy-name $iamPolicyName \
71+
--policy-document "{
72+
\"Version\": \"2012-10-17\",
73+
\"Statement\": [
74+
{
75+
\"Sid\": \"PermitFirehoseUsage\",
76+
\"Effect\": \"Allow\",
77+
\"Action\": [
78+
\"s3:AbortMultipartUpload\",
79+
\"s3:GetBucketLocation\",
80+
\"s3:GetObject\",
81+
\"s3:ListBucket\",
82+
\"s3:ListBucketMultipartUploads\",
83+
\"s3:PutObject\"
84+
],
85+
\"Resource\": [
86+
\"arn:aws:s3:::$bucketName\",
87+
\"arn:aws:s3:::$bucketName/*\"
88+
]
89+
}
90+
]
91+
}"
92+
if [ $? -ne 0 ]; then
93+
aws iam delete-role --role-name $iamRoleName
94+
aws s3api delete-bucket --region $region --bucket $bucketName
95+
exit 1
96+
fi
97+
echo "OK"
98+
99+
100+
echo "Waiting..."
101+
sleep 30
102+
103+
echo -n "Creating Kinesis Firehose Delivery Stream: $deliveryStreamName with role arn $roleArn and bucket $bucketName.. "
104+
deliveryStreamArn=$(aws firehose create-delivery-stream --region $region --query "DeliveryStreamARN" --output text \
105+
--delivery-stream-name $deliveryStreamName \
106+
--s3-destination-configuration "RoleARN=$roleArn,BucketARN=arn:aws:s3:::$bucketName")
107+
if [ $? -ne 0 ]; then
108+
aws iam delete-role-policy \
109+
--role-name $iamRoleName \
110+
--policy-name $iamPolicyName
111+
aws iam delete-role --role-name $iamRoleName
112+
aws s3api delete-bucket --region $region --bucket $bucketName
113+
exit 1
114+
fi
115+
echo "OK"
116+
117+
echo "Delivery Stream ARN: $deliveryStreamArn"

dist/LambdaStreamToFirehose-1.1.1.zip

25.5 KB
Binary file not shown.

dist/LambdaStreamToFirehose-1.3.0.zip

1.17 MB
Binary file not shown.

index.js

Lines changed: 99 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,23 @@ var FIREHOSE_MAX_BATCH_BYTES = 4 * 1024 * 1024;
4242
// should KPL checksums be calculated?
4343
var computeChecksums = true;
4444

45-
var deliveryStreamMapping = {};
45+
/*
46+
* If the source Kinesis Stream's tags or DynamoDB Stream Name don't resolve to
47+
* an existing Firehose, allow usage of a default delivery stream, or fail with
48+
* an error.
49+
*/
50+
var USE_DEFAULT_DELIVERY_STREAMS = true;
51+
/*
52+
* Delivery stream mappings can be specified here to overwrite values provided
53+
* by Kinesis Stream tags or DynamoDB stream name. (Helpful for debugging)
54+
* Format:
55+
* DDBStreamName: deliveryStreamName
56+
* Or:
57+
* FORWARD_TO_FIREHOSE_STREAM tag value: deliveryStreamName
58+
*/
59+
var deliveryStreamMapping = {
60+
'DEFAULT': 'LambdaStreamsDefaultDeliveryStream'
61+
};
4662

4763
var start;
4864

@@ -227,6 +243,9 @@ exports.handler = function(event, context) {
227243
* record.
228244
*/
229245
exports.processTransformedRecords = function(transformed, streamName, deliveryStreamName) {
246+
if (debug) {
247+
console.log('Processing transformed records');
248+
}
230249
// get the set of batch offsets based on the transformed record sizes
231250
var batches = exports.getBatchRanges(transformed);
232251

@@ -270,6 +289,9 @@ exports.handler = function(event, context) {
270289
* stream
271290
*/
272291
exports.writeToFirehose = function(firehoseBatch, streamName, deliveryStreamName, callback) {
292+
if (debug) {
293+
console.log('Writing to firehose');
294+
}
273295
// write the batch to firehose with putRecordBatch
274296
var putRecordBatchParams = {
275297
DeliveryStreamName : deliveryStreamName,
@@ -296,7 +318,10 @@ exports.handler = function(event, context) {
296318
* requests to forward to Firehose
297319
*/
298320
exports.processEvent = function(event, serviceName, streamName) {
299-
// look up the delivery stream name in the mapping cache
321+
if (debug) {
322+
console.log('Processing event');
323+
}
324+
// look up the delivery stream name in the mapping cache
300325
var deliveryStreamName = deliveryStreamMapping[streamName];
301326

302327
if (debug) {
@@ -378,11 +403,19 @@ exports.handler = function(event, context) {
378403
* specified Kinesis Stream Name, using Tags
379404
*/
380405
exports.buildDeliveryMap = function(streamName, serviceName, event, callback) {
381-
if (serviceName === DDB_SERVICE_NAME) {
406+
if (debug) {
407+
console.log('Building delivery stream mapping');
408+
}
409+
if (deliveryStreamMapping[streamName]) {
410+
// A delivery stream has already been specified in configuration
411+
// This could be indicative of debug usage.
412+
USE_DEFAULT_DELIVERY_STREAMS = false;
413+
exports.verifyDeliveryStreamMapping(streamName, event, callback);
414+
} else if (serviceName === DDB_SERVICE_NAME) {
382415
// dynamodb streams need the firehose delivery stream to match
383416
// the table name
384417
deliveryStreamMapping[streamName] = streamName;
385-
callback();
418+
exports.verifyDeliveryStreamMapping(streamName, event, callback);
386419
} else {
387420
// get the delivery stream name from Kinesis tag
388421
exports.kinesis.listTagsForStream({
@@ -395,45 +428,73 @@ exports.handler = function(event, context) {
395428
// name item
396429
data.Tags.map(function(item) {
397430
if (item.Key === FORWARD_TO_FIREHOSE_STREAM) {
431+
/* Disable fallback to a default delivery stream as
432+
* a FORWARD_TO_FIREHOSE_STREAM has been specifically
433+
* set.
434+
*/
435+
USE_DEFAULT_DELIVERY_STREAMS = false;
398436
deliveryStreamMapping[streamName] = item.Value;
399437
}
400438
});
401439

402-
if (!deliveryStreamMapping[streamName]) {
403-
// fail as the stream isn't tagged for delivery, but
404-
// since there is an event source configured we think
405-
// this
406-
// should have been done and is probably a
407-
// misconfiguration
408-
finish(event, ERROR, "Warning: Kinesis Stream " + streamName + " not tagged for Firehose delivery with Tag name " + FORWARD_TO_FIREHOSE_STREAM);
409-
} else {
410-
// validate the delivery stream name provided
411-
var params = {
412-
DeliveryStreamName : deliveryStreamMapping[streamName]
413-
};
414-
exports.firehose.describeDeliveryStream(params, function(err, data) {
415-
if (err) {
416-
// do not continue with the cached mapping
417-
var deliveryStream = deliveryStreamMapping[streamName];
418-
delete deliveryStreamMapping[streamName];
419-
420-
finish(event, ERROR, "Delivery Stream " + deliveryStream + " does not exist in region " + setRegion);
421-
} else {
422-
// call the specified callback - should have
423-
// already been prepared by the calling function
424-
callback();
425-
}
426-
});
427-
}
440+
exports.verifyDeliveryStreamMapping(streamName, event, callback);
428441
}
429442
});
430443
}
431-
432444
};
433445

434-
/*
435-
* if (debug) { console.log(JSON.stringify(event)); }
436-
*/
446+
exports.verifyDeliveryStreamMapping = function(streamName, event, callback) {
447+
if (debug) {
448+
console.log('Verifying delivery stream');
449+
}
450+
if (!deliveryStreamMapping[streamName]) {
451+
if (USE_DEFAULT_DELIVERY_STREAMS) {
452+
/* No delivery stream has been specified, probably as it's not
453+
* configured in stream tags. Using default delivery stream.
454+
* To prevent accidental forwarding of streams to a firehose set
455+
* USE_DEFAULT_DELIVERY_STREAMS = false.
456+
*/
457+
deliveryStreamMapping[streamName] = deliveryStreamMapping['DEFAULT'];
458+
} else {
459+
/*
460+
* Fail as no delivery stream mapping has been specified and we
461+
* have not configured to use a default.
462+
* Kinesis Streams should be tagged with
463+
* ForwardToFirehoseStream = <DeliveryStreamName>
464+
*/
465+
finish(event, ERROR, "Warning: Kinesis Stream " + streamName + " not tagged for Firehose delivery with Tag name " + FORWARD_TO_FIREHOSE_STREAM);
466+
return;
467+
}
468+
}
469+
// validate the delivery stream name provided
470+
var params = {
471+
DeliveryStreamName : deliveryStreamMapping[streamName]
472+
};
473+
exports.firehose.describeDeliveryStream(params, function(err, data) {
474+
if (err) {
475+
// do not continue with the cached mapping
476+
var deliveryStream = deliveryStreamMapping[streamName];
477+
delete deliveryStreamMapping[streamName];
478+
if (!USE_DEFAULT_DELIVERY_STREAMS || deliveryStream === deliveryStreamMapping['DEFAULT']) {
479+
finish(event, ERROR, "Could not find suitable delivery stream for " + streamName + " and the " +
480+
"default delivery stream (" + deliveryStreamMapping['DEFAULT'] + ") either doesn't exist or is disabled.");
481+
} else {
482+
deliveryStreamMapping[streamName] = deliveryStreamMapping['DEFAULT'];
483+
exports.verifyDeliveryStreamMapping(streamName, event, callback);
484+
}
485+
} else {
486+
// call the specified callback - should have
487+
// already
488+
// been prepared by the calling function
489+
callback();
490+
}
491+
});
492+
}
493+
494+
/** End Runtime Functions */
495+
if (debug) {
496+
console.log(JSON.stringify(event));
497+
}
437498

438499
// fail the function if the wrong event source type is being sent, or if
439500
// there is no data, etc
@@ -467,8 +528,10 @@ exports.handler = function(event, context) {
467528
// parse the stream name out of the event
468529
var streamName = exports.getStreamName(event.Records[0].eventSourceARN);
469530

470-
if (!streamName) {
471-
finish(event, ERROR, "Malformed Kinesis Stream ARN");
531+
if (!deliveryStreamMapping[streamName]) {
532+
// no delivery stream cached so far, so add this stream's tag value
533+
// to the delivery map, and continue with processEvent
534+
exports.buildDeliveryMap(streamName, serviceName, event, exports.processEvent.bind(undefined, event, serviceName, streamName));
472535
} else {
473536

474537
if (deliveryStreamMapping.length === 0 || !deliveryStreamMapping[streamName]) {

package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"name": "lambda-stream-to-firehose",
33
"description": "An AWS Lambda function that forwards data from a Kinesis or DynamoDB Update Stream to a Kinesis Firehose Delivery Stream",
4-
"version": "1.2.0",
4+
"version": "1.3.0",
55
"dependencies": {
66
"async":"1.5.2",
77
"aws-kpl-deagg":"2.1.1"
@@ -28,4 +28,4 @@
2828
"LICENSE",
2929
"NOTICE.txt"
3030
]
31-
}
31+
}
