aws-kpl-deagg
Node.js module to simplify processing of Amazon Kinesis Records which have been created with the Kinesis Producer Library
The Amazon Kinesis Producer Library (KPL) lets you write data to Amazon Kinesis with a highly efficient, asynchronous delivery model that can improve performance. When you write to the Producer, you can also turn on Aggregation, which packs many smaller producer events into a single Kinesis Record of up to 1 MB. When you use Aggregation, the KPL serialises data to the Kinesis stream using Google Protocol Buffers, and consumer applications must be able to deserialise this protobuf message. This module lets you process KPL serialised data in any Node.js consumer, including AWS Lambda, the Node.js KCL, a multi-lang KCL application using Node.js, or even an Express.js application running in Elastic Beanstalk. The Java KCL can also deaggregate and process KPL encoded data automatically.
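An aggregated record can be recognised without decoding its protobuf body. Assuming the KPL's published aggregation format (this is not part of this module's API), an aggregated record starts with the 4-byte magic number 0xF3 0x89 0x9A 0xC2, continues with the protobuf-encoded AggregatedRecord, and ends with a 16-byte MD5 digest of that protobuf. A minimal sketch of such a check, using a hypothetical isKplAggregated helper and only Node's built-in crypto module:

```javascript
const crypto = require('crypto');

// Assumed KPL aggregation envelope: magic prefix, protobuf body, MD5 suffix.
const KPL_MAGIC = Buffer.from([0xf3, 0x89, 0x9a, 0xc2]);
const MD5_LENGTH = 16;

/**
 * Hypothetical helper: returns true if `data` (a Buffer) has the KPL
 * aggregated envelope and its trailing MD5 matches the protobuf body.
 * It only checks the envelope; it does not decode the protobuf itself.
 */
function isKplAggregated(data) {
  if (!Buffer.isBuffer(data) || data.length <= KPL_MAGIC.length + MD5_LENGTH) {
    return false;
  }
  // the first four bytes must be the magic number
  if (!data.subarray(0, KPL_MAGIC.length).equals(KPL_MAGIC)) {
    return false;
  }
  const payload = data.subarray(KPL_MAGIC.length, data.length - MD5_LENGTH);
  const checksum = data.subarray(data.length - MD5_LENGTH);
  // the last 16 bytes must be the MD5 digest of the protobuf body
  const expected = crypto.createHash('md5').update(payload).digest();
  return checksum.equals(expected);
}
```

A record that fails this check is treated as an ordinary, non-aggregated record, which matches the pass-through behaviour this module describes.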
The Node KPL Deaggregation module provides a simple interface for working with KPL encoded data in a consumer application. You can easily integrate it into existing applications that do not yet support KPL Aggregation, and the programming model supports both synchronous and asynchronous processing.
When using deaggregation, you provide a Kinesis Record and get back multiple Kinesis User Records. If the Kinesis Record you provide is not a KPL encoded message, that's fine: you'll simply get a single User Record output from the single record input. A Kinesis User Record returned by kpl-deagg looks like:
{
partitionKey : String - The Partition Key provided when the record was submitted
explicitPartitionKey : String - The Partition Key provided for the purposes of Shard Allocation
sequenceNumber : BigInt - The sequence number assigned to the record on submission to Kinesis
subSequenceNumber : Int - The sub-sequence number for the User Record in the aggregated record, if aggregation was in use by the producer
data : Buffer - The original data transmitted by the producer (base64 encoded)
}
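For the non-aggregated case above, the one-to-one mapping can be sketched as follows. This is a hypothetical illustration, not the module's code: it assumes the field names of a Lambda Kinesis event record (partitionKey, sequenceNumber, base64 data), keeps sequenceNumber as a decimal string because Kinesis sequence numbers exceed Number.MAX_SAFE_INTEGER, and treats data as a base64 string, which is how the Lambda examples in this README decode it.

```javascript
/**
 * @typedef {Object} UserRecord
 * @property {string} partitionKey - Partition Key supplied by the producer
 * @property {string} [explicitPartitionKey] - optional, used for Shard allocation
 * @property {string} sequenceNumber - decimal string (too large for a JS Number)
 * @property {number} [subSequenceNumber] - only meaningful for aggregated records
 * @property {string} data - base64-encoded payload
 */

/**
 * Hypothetical helper: wrap a non-aggregated Kinesis record (for example the
 * `record.kinesis` object of a Lambda event) as a single User Record.
 * @returns {UserRecord}
 */
function toSingleUserRecord(kinesisRecord) {
  return {
    partitionKey: kinesisRecord.partitionKey,
    explicitPartitionKey: undefined, // left unset in this sketch
    sequenceNumber: String(kinesisRecord.sequenceNumber),
    subSequenceNumber: undefined, // no aggregation, so no sub-sequence
    data: kinesisRecord.data,
  };
}

// Hypothetical helper: decode a User Record's base64 payload to text.
function userRecordText(userRecord, encoding = 'utf8') {
  return Buffer.from(userRecord.data, 'base64').toString(encoding);
}
```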
To get started, install the aws-kpl-deagg module from npm (npm install aws-kpl-deagg) and require it in your existing Node.js based Kinesis consumer:
var deagg = require('aws-kpl-deagg');
Next, when your consumer application receives a Kinesis Record, extract the User Records using one of the deaggregation methods in the kpl-deagg module.
The synchronous model of deaggregation extracts all the Kinesis User Records from the received Kinesis Record and accumulates them into an array. The method then invokes a callback with any error encountered and the array of User Records that were deaggregated:
deaggregateSync = function(kinesisRecord, afterRecordCallback(err, UserRecord[]));
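Because deaggregateSync follows Node's error-first callback convention with the callback as the last argument, Node's built-in util.promisify should be able to wrap it for use with async/await. A minimal sketch; the promisifyDeaggregateSync name is hypothetical, and the module is passed in so the wrapper can be tested with a stub:

```javascript
const util = require('util');

// Hypothetical helper: expose deaggregateSync as a Promise-returning function.
// deaggregateSync(kinesisRecord, callback(err, userRecords)) is error-first,
// so util.promisify resolves with userRecords or rejects with err.
// bind() keeps `this` pointing at the module in case the method relies on it.
function promisifyDeaggregateSync(deaggModule) {
  return util.promisify(deaggModule.deaggregateSync.bind(deaggModule));
}

// Usage (inside an async function):
//   const deaggregateAll = promisifyDeaggregateSync(require('aws-kpl-deagg'));
//   const userRecords = await deaggregateAll(record.kinesis);
```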
The asynchronous model of deaggregation lets you provide a callback which is invoked for each User Record extracted from the Kinesis Record. When all User Records have been extracted from the Kinesis Record, an afterRecordCallback is invoked, which allows you to continue processing additional Kinesis Records that your consumer received:
deaggregate = function(kinesisRecord, perRecordCallback(err, UserRecord), afterRecordCallback(err, errorKinesisRecord));
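The callback-per-record interface can also be adapted to a Promise that resolves once afterRecordCallback fires. The following is a sketch under stated assumptions: collectUserRecords is a hypothetical name, and it assumes, as described for this interface, that any per-record error is reported again through afterRecordCallback, so the Promise settles exactly once, from afterRecordCallback.

```javascript
// Hypothetical helper: run the callback-based deaggregate() and resolve with
// every User Record once afterRecordCallback fires. On failure the Promise
// rejects with the error, with the error record attached as `errorRecord`.
function collectUserRecords(deaggModule, kinesisRecord) {
  return new Promise((resolve, reject) => {
    const userRecords = [];
    deaggModule.deaggregate(
      kinesisRecord,
      (err, userRecord) => {
        // assumed: errors also reach afterRecordCallback, so only
        // successfully extracted User Records are collected here
        if (!err) userRecords.push(userRecord);
      },
      (err, errorKinesisRecord) => {
        if (err) {
          const failure = err instanceof Error ? err : new Error(String(err));
          failure.errorRecord = errorKinesisRecord;
          reject(failure);
        } else {
          resolve(userRecords);
        }
      }
    );
  });
}
```

Settling only from afterRecordCallback keeps the caller from seeing a partial result when a later sub-record fails.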
If any errors are encountered during processing of the perRecordCallback, then the afterRecordCallback is called with the err plus an error Record, which contains the failing subSequenceNumber from the aggregated data along with details about the enclosing Kinesis Record:
{
partitionKey : String - The Partition Key of the enclosing Kinesis Aggregated Record
explicitPartitionKey : String - The Partition Key provided for the purposes of Shard Allocation
sequenceNumber : BigInt - The sequence number assigned to the record on submission of the Aggregated Record by the KPL
subSequenceNumber : Int - The sub-sequence number for the failing User Record in the aggregated record, if aggregation was in use by the producer
data : Buffer - The original protobuf message transmitted by the producer (base64 encoded)
}
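Since the error record identifies both the enclosing Kinesis Record and the failing sub-record, it can be turned into a log line that pinpoints the failure. A small sketch with a hypothetical describeDeaggregationFailure helper, assuming the error record fields listed above:

```javascript
// Hypothetical helper: turn the (err, errorRecord) pair passed to
// afterRecordCallback into one log line identifying the failing User Record.
function describeDeaggregationFailure(err, errorRecord) {
  const message = err instanceof Error ? err.message : String(err);
  if (!errorRecord) {
    return 'Deaggregation failed: ' + message;
  }
  return 'Deaggregation failed at subSequenceNumber ' + errorRecord.subSequenceNumber +
    ' of Kinesis record ' + errorRecord.sequenceNumber +
    ' (partitionKey ' + errorRecord.partitionKey + '): ' + message;
}
```

Because the error record's data field still holds the original protobuf message, a consumer could also store the whole error record for later inspection or reprocessing.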
This module includes example AWS Lambda functions in the index.js file, which make it easy to build new functions that process KPL encoded data. Both examples use async.js to process the received Kinesis Records.
// Assumed setup: index.js also defines the helpers handleNoProcess, finish,
// common, ok and error used below; they are not shown here.
var async = require('async');
var deagg = require('aws-kpl-deagg');

/**
 * Example lambda function which uses the KPL synchronous deaggregation
 * interface to process Kinesis Records from the Event Source
 */
exports.exampleSync = function(event, context) {
  console.log("Processing KPL Aggregated Messages using kpl-deagg(sync)");
  handleNoProcess(event, function() {
    console.log("Processing " + event.Records.length + " Kinesis Input Records");
    var totalUserRecords = 0;
    async.map(event.Records, function(record, asyncCallback) {
      // use the deaggregateSync interface, which receives a single
      // callback with an error and an array of Kinesis User Records
      deagg.deaggregateSync(record.kinesis, function(err, userRecords) {
        if (err) {
          console.log(err);
          asyncCallback(err);
        } else {
          console.log("Received " + userRecords.length + " Kinesis User Records");
          totalUserRecords += userRecords.length;
          userRecords.forEach(function(userRecord) {
            // Buffer.from replaces the deprecated new Buffer() constructor
            var recordData = Buffer.from(userRecord.data, 'base64');
            console.log("Kinesis Aggregated User Record: " + JSON.stringify(userRecord) + " " + recordData.toString('utf8'));
            // you can do something else useful with each kinesis user record here
          });
          asyncCallback(null);
        }
      });
    }, function(err, results) {
      if (common.debug) {
        console.log("Completed processing " + totalUserRecords + " Kinesis User Records");
      }
      if (err) {
        finish(event, context, error, err);
      } else {
        finish(event, context, ok, "Success");
      }
    });
  });
};
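On Node.js Lambda runtimes that accept handlers returning a Promise, the same synchronous-interface flow can be written without async.js or the finish helper. A sketch under those assumptions: makeSyncHandler and processUserRecord are hypothetical names, and the deaggregation module is injected so the handler can be exercised with a stub.

```javascript
const util = require('util');

// Hypothetical factory: build an async Lambda handler around an injected
// deaggregation module (e.g. require('aws-kpl-deagg')) and a per-record
// processing function that receives each User Record's decoded payload.
function makeSyncHandler(deaggModule, processUserRecord) {
  const deaggregateSync = util.promisify(deaggModule.deaggregateSync.bind(deaggModule));
  return async function handler(event) {
    const records = (event && event.Records) || [];
    // deaggregate every Kinesis Record in parallel, like async.map above;
    // Promise.all keeps the batches in the original record order
    const batches = await Promise.all(records.map((r) => deaggregateSync(r.kinesis)));
    for (const userRecords of batches) {
      for (const userRecord of userRecords) {
        processUserRecord(Buffer.from(userRecord.data, 'base64'));
      }
    }
    return { processedUserRecords: batches.reduce((n, b) => n + b.length, 0) };
  };
}
```

If any Kinesis Record fails to deaggregate, the returned Promise rejects and Lambda reports the invocation as failed, which plays the role of finish(event, context, error, err) in the example above.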
The asynchronous example accumulates User Records into an enclosing array, similar to how the synchronous interface works:
/**
 * Example lambda function which uses the KPL asynchronous deaggregation
 * interface to process Kinesis Records from the Event Source
 */
exports.exampleAsync = function(event, context) {
  console.log("Processing KPL Aggregated Messages using kpl-deagg(async)");
  handleNoProcess(event, function() {
    // process all records in parallel
    var realRecords = [];
    console.log("Processing " + event.Records.length + " Kinesis Input Records");
    async.map(event.Records, function(record, asyncCallback) {
      // use the async deaggregate interface. the per-record callback
      // appends the records to an array, and the final callback calls
      // the async callback to mark the kinesis record as completed within
      // async.js
      deagg.deaggregate(record.kinesis, function(err, userRecord) {
        if (err) {
          // don't call asyncCallback here: the afterRecordCallback below also
          // receives the error, and async.js must only be called back once
          console.log("Error on Record: " + err);
        } else {
          var recordData = Buffer.from(userRecord.data, 'base64');
          console.log("Per Record Callback Invoked with Record: " + JSON.stringify(userRecord) + " " + recordData.toString('utf8'));
          realRecords.push(userRecord);
          // you can do something else useful with each kinesis user record here
        }
      }, function(err, errorRecord) {
        if (err) {
          console.log(err);
          console.log("Failed Record: " + JSON.stringify(errorRecord));
        }
        asyncCallback(err);
      });
    }, function(err, results) {
      if (common.debug) {
        console.log("Kinesis Record Processing Completed");
        console.log("Processed " + realRecords.length + " Kinesis User Records");
      }
      if (err) {
        finish(event, context, error, err);
      } else {
        finish(event, context, ok, "Success");
      }
    });
  });
};
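To exercise the consumer logic locally, it helps to build a synthetic Lambda Kinesis event. The sketch below uses a hypothetical buildKinesisEvent helper; only the fields the examples read (Records[].kinesis.partitionKey, sequenceNumber and base64 data) are meaningful, and the remaining fields mirror the usual event shape with placeholder values. Plain payloads like these are not KPL aggregated, so each one should come back as a single User Record.

```javascript
// Hypothetical helper for local testing: build a minimal Lambda Kinesis event
// whose records carry the given payloads (strings or Buffers).
function buildKinesisEvent(payloads, partitionKey = 'test-partition') {
  return {
    Records: payloads.map((payload, i) => ({
      eventSource: 'aws:kinesis',
      eventName: 'aws:kinesis:record',
      kinesis: {
        kinesisSchemaVersion: '1.0',
        partitionKey: partitionKey,
        sequenceNumber: String(i + 1), // placeholder, not a real sequence number
        data: Buffer.from(payload).toString('base64'),
        approximateArrivalTimestamp: Date.now() / 1000,
      },
    })),
  };
}
```

Passing such an event to exampleSync or exampleAsync also requires a stub context object compatible with the finish helper in index.js, which is not shown here.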
Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
Licensed under the Amazon Software License (the "License"). You may not use this file except in compliance with the License. A copy of the License is located at
http://aws.amazon.com/asl/
or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions and limitations under the License.