New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

aws-kinesis-writable

Package Overview
Dependencies
Maintainers
5
Versions
31
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

aws-kinesis-writable - npm Package Compare versions

Comparing version 2.0.0 to 4.0.0

.eslintrc.json

406

index.js

@@ -1,7 +0,9 @@

var stream = require('stream');
var util = require('util');
var AWS = require('aws-sdk');
var retry = require('retry');
var _ = require('lodash');
const util = require('util');
const assert = require('assert');
const Writable = require('stream').Writable;
const retry = require('retry');
const AWS = require('aws-sdk');
const merge = require('lodash.merge');
/**

@@ -14,8 +16,4 @@ * [KinesisStream description]

* @param {string} params.streamName AWS Kinesis stream name
* @param {string|function} params.partitionKey Constant string to use as partitionKey
* or a function that return the partitionKey
* based on a msg passed by argument
* @param {object} [params.getCredentialsFromIAMRole=false] Explicitly tells `aws-sdk` to get credentials using an IAM role
* @param {function} params.partitionKey function that return the partitionKey based on a msg passed by argument
* @param {object} [params.httpOptions={}] HTTP options that will be used on `aws-sdk` (e.g. timeout values)
* @param {boolean|object} [params.buffer=true]
* @param {number} [params.buffer.timeout] Max. number of seconds

@@ -25,348 +23,138 @@ * to wait before send msgs to stream

* before send them to stream.
* @param {number} [params.buffer.maxBatchSize] Max. size in bytes of the batch sent to Kinesis. Default 5242880 (5MiB)
* @param {@function} [params.buffer.isPrioritaryMsg] Evaluates a message and returns true
* when msg is prioritary
* @param {object} [params.retryConfiguration={}]
* @param {number} [params.retryConfiguration.retries=0] Number of retries to perform after a failed attempt
* @param {number} [params.retryConfiguration.factor=2] The exponential factor to use
* @param {number} [params.retryConfiguration.minTimeout=1000] The number of milliseconds before starting the first retry
* @param {boolean} [params.retryConfiguration.randomize=false] Randomizes the timeouts by multiplying with a factor between 1 to 2
* @param {@function} [params.buffer.isPrioritaryMsg] Evaluates a message and returns true if msg has priority (to be deprecated)
* @param {@function} [params.buffer.hasPriority] Evaluates a message and returns true if msg has priority
* @param {number} [params.buffer.retry.retries] Attempts to be made to flush a batch
* @param {number} [params.buffer.retry.minTimeout] Min time to wait between attempts
* @param {number} [params.buffer.retry.maxTimeout] Max time to wait between attempts
*/
var MAX_BATCH_SIZE = 5*1024*1024 - 1024; // 4.99MiB
/**
 * Baseline buffering options for the stream.
 *
 * - `timeout`: delay, in seconds, used when arming the flush timer.
 * - `length`: number of queued records at which the queue is flushed.
 * - `hasPriority`: default predicate; treats no message as having priority.
 * - `retry`: options object handed to `retry.operation` when dispatching.
 */
const defaultBuffer = {
  timeout: 5,
  length: 10,
  hasPriority() {
    return false;
  },
  retry: { retries: 2, minTimeout: 300, maxTimeout: 500 }
};
function KinesisStream (params) {
stream.Writable.call(this, { objectMode: params.objectMode });
assert(params.streamName, 'streamName required');
var defaultBuffer = {
timeout: 5,
length: 10,
maxBatchSize: MAX_BATCH_SIZE
this.streamName = params.streamName;
this.buffer = merge(defaultBuffer, params.buffer);
this.partitionKey = params.partitionKey || function getPartitionKey() {
return Date.now().toString();
};
this._retryConfiguration = params.retryConfiguration || {
retries: 0,
factor: 2,
minTimeout: 1000,
randomize: false
};
this.hasPriority = this.buffer.isPrioritaryMsg || this.buffer.hasPriority;
this._params = _.defaultsDeep(params || {}, {
streamName: null,
buffer: defaultBuffer,
partitionKey: function() { // by default the partition key will generate
return Date.now().toString(); // a "random" distribution between all shards
}
// increase the timeout to get credentials from the EC2 Metadata Service
AWS.config.credentials = new AWS.EC2MetadataCredentials({
httpOptions: params.httpOptions || { timeout: 5000 }
});
// partitionKey must be a string or a function
if (!this._params.partitionKey ||
typeof this._params.partitionKey !== 'string' &&
typeof this._params.partitionKey !== 'function') {
throw new Error("'partitionKey' property should be a string or a function.");
}
this.recordsQueue = [];
// if partitionKey is a string, converts it to a function that always returns the string
if (typeof this._params.partitionKey === 'string') {
this._params.partitionKey = _.constant(this._params.partitionKey);
}
this.kinesis = params.kinesis || new AWS.Kinesis({
accessKeyId: params.accessKeyId,
secretAccessKey: params.secretAccessKey,
region: params.region,
httpOptions: params.httpOptions
});
if (this._params.buffer && typeof this._params.buffer === 'boolean') {
this._params.buffer = defaultBuffer;
}
Writable.call(this, { objectMode: params.objectMode });
}
if (this._params.buffer) {
this._queue = [];
this._queueWait = this._queueSendEntries();
this._params.buffer.isPrioritaryMsg = this._params.buffer.isPrioritaryMsg;
}
util.inherits(KinesisStream, Writable);
if (params.getCredentialsFromIAMRole) {
// increase the timeout to get credentials from the EC2 Metadata Service
AWS.config.credentials = new AWS.EC2MetadataCredentials({
httpOptions: params.httpOptions || { timeout: 5000 }
});
function parseChunk(chunk) {
if (Buffer.isBuffer(chunk) ) {
chunk = chunk.toString();
}
this._logger = console;
this._kinesis = new AWS.Kinesis(_.pick(params, [
'accessKeyId',
'secretAccessKey',
'region',
'httpOptions']));
if (typeof chunk === 'string') {
chunk = JSON.parse(chunk);
}
return chunk;
}
util.inherits(KinesisStream, stream.Writable);
KinesisStream.prototype._write = function(chunk, enc, next) {
chunk = parseChunk(chunk);
/**
 * Annotates an error with the records it relates to and the attempt count,
 * then emits it as an `error` event on this stream.
 *
 * @param {*} records value stored on `err.records`
 * @param {Error} err error object that is mutated and emitted
 * @param {number} attempts value stored on `err.attempts`
 */
KinesisStream.prototype._emitError = function (records, err, attempts) {
  Object.assign(err, { records: records, attempts: attempts });
  this.emit('error', err);
};
const hasPriority = this.hasPriority(chunk);
if (hasPriority) {
this.recordsQueue.unshift(chunk);
} else {
this.recordsQueue.push(chunk);
}
/**
 * Arms a one-shot timer that calls `_sendEntries` once the configured
 * buffer timeout (given in seconds) has elapsed.
 *
 * @return {*} the handle returned by `setTimeout`
 */
KinesisStream.prototype._queueSendEntries = function () {
  const delayMs = this._params.buffer.timeout * 1000;
  return setTimeout(() => {
    this._sendEntries();
  }, delayMs);
};
KinesisStream.prototype.setStreamName = function (streamName) {
if (!streamName || typeof streamName !== 'string') {
throw new Error('\'streamName\' must be a valid string.');
if (this.timer) {
clearTimeout(this.timer);
}
this._params.streamName = streamName;
};
/**
 * Reads the stream name currently stored in this instance's parameters.
 *
 * @return {*} the value of `this._params.streamName`
 */
KinesisStream.prototype.getStreamName = function () {
  const { streamName } = this._params;
  return streamName;
};
/**
* Map a msg to a record structure
* @param {@} msg - entry
* @return {Record} { Data, PartitionKey, StreamName }
*/
KinesisStream.prototype._mapEntry = function (msg, buffer) {
if (buffer){
return new BufferedRecord(msg, this._params.partitionKey(msg));
if (this.recordsQueue.length >= this.buffer.length || hasPriority) {
this.flush();
} else {
return new NonBufferedRecord(msg, this._params.partitionKey(msg), this._params.streamName);
this.timer = setTimeout(this.flush.bind(this), this.buffer.timeout * 1000);
}
};
var BufferedRecord = function(data, pk){
this.Data = data;
this.PartitionKey = pk;
return next();
};
/**
 * Record constructor that carries its own stream name next to the payload
 * and partition key.
 *
 * @param {*} data value stored as `Data`
 * @param {*} pk value stored as `PartitionKey`
 * @param {*} streamName value stored as `StreamName`
 */
var NonBufferedRecord = function(data, pk, streamName){
  Object.assign(this, {
    Data: data,
    PartitionKey: pk,
    StreamName: streamName
  });
};
KinesisStream.prototype._sendEntries = function () {
const pending_records = this._queue;
const self = this;
this._queue = [];
this._batch_size = 0;
if (pending_records.length === 0) {
this._queueWait = this._queueSendEntries();
return;
KinesisStream.prototype.dispatch = function(records, cb) {
if (records.length === 0) {
return cb ? cb() : null;
}
if (!self._params.streamName) {
this.emit('error', new Error('Stream\'s name was not set.'));
}
const operation = retry.operation(this.buffer.retry);
var requestContent = {
StreamName: this._params.streamName,
Records: pending_records
};
const partitionKey = this.partitionKey();
this._putRecords(requestContent);
};
const formattedRecords = records.map((record) => {
return { Data: JSON.stringify(record), PartitionKey: partitionKey };
});
KinesisStream.prototype._putRecords = function(requestContent) {
const self = this;
var operation = this._getRetryOperation();
operation.attempt(function(currentAttempt) {
try {
var req = self._kinesis.putRecords(requestContent, function (err, result) {
try {
self._queueWait = self._queueSendEntries();
if (err) {
if (!err.records) err.records = requestContent.Records;
throw err;
}
if (result && result.FailedRecordCount) {
result.Records
.forEach(function (recordResult, index) {
if (recordResult.ErrorCode) {
recordResult.Record = requestContent.Records[index];
self.emit('errorRecord', recordResult);
}
});
}
} catch(err) {
if (operation.retry(err)) {
return;
} else {
self._emitError(requestContent, err, currentAttempt);
}
}
})
.on('complete', function() {
req.removeAllListeners();
var response_stream = req.response.httpResponse.stream;
if (response_stream) {
response_stream.removeAllListeners();
}
var request_stream = req.httpRequest.stream;
if (request_stream) {
request_stream.removeAllListeners();
}
});
} catch(err) {
operation.attempt(() => {
this.putRecords(formattedRecords, (err) => {
if (operation.retry(err)) {
return;
} else {
self._emitError(requestContent, err, currentAttempt);
}
}
});
};
KinesisStream.prototype._retryValidRecords = function(requestContent, err) {
const self = this;
if (err) {
this.emitRecordError(err, records);
}
// By default assumes that all records failed
var failedRecords = requestContent.Records;
// try to find within the error, which records have failed.
var failedRecordIndexes = getRecordIndexesFromError(err);
if (failedRecordIndexes.length > 0) {
// failed records found, extract them from collection of records
failedRecords = _.pullAt(requestContent.Records, failedRecordIndexes);
// now, try one more time with records that didn't fail.
self._putRecords({
StreamName: requestContent.StreamName,
Records: requestContent.Records
if (cb) {
return cb(err ? operation.mainError() : null);
}
});
}
// send error with failed records
err.streamName = requestContent.StreamName;
err.records = failedRecords;
self.emit('error', err);
});
};
KinesisStream.prototype._write = function (chunk, encoding, done) {
var self = this;
if (!this._params.streamName) {
return setImmediate (done, new Error('Stream\'s name was not set.'));
}
KinesisStream.prototype.putRecords = function(records, cb) {
const req = this.kinesis.putRecords({
StreamName: this.streamName,
Records: records
}, cb);
var isPrioMessage = this._params.buffer.isPrioritaryMsg;
var operation = this._getRetryOperation();
operation.attempt(function(currentAttempt) {
try {
var obj, msg;
if (Buffer.isBuffer(chunk)) {
msg = chunk;
if (isPrioMessage){
if (encoding === 'buffer'){
obj = JSON.parse(chunk.toString());
} else {
obj = JSON.parse(chunk.toString(encoding));
}
}
} else if (typeof chunk === 'string') {
msg = chunk;
if (isPrioMessage){
obj = JSON.parse(chunk);
}
} else {
obj = chunk;
msg = JSON.stringify(obj);
}
var record = self._mapEntry(msg, !!self._params.buffer);
if (self._params.buffer) {
// sends the buffer when the current record would exceed the max batch size
self._batch_size = (self._batch_size || 0) + msg.length;
if (self._batch_size > self._params.buffer.maxBatchSize) {
clearTimeout(self._queueWait);
self._sendEntries();
}
self._queue.push(record);
// sends buffer when current chunk is for prioritary entry
var shouldSendEntries = self._queue.length >= self._params.buffer.length || // queue reached max size
(isPrioMessage && isPrioMessage(obj)); // msg is prioritary
if (shouldSendEntries) {
clearTimeout(self._queueWait);
self._sendEntries();
}
return setImmediate(done);
}
var req = self._kinesis.putRecord(record, function (err) {
if (err) {
err.streamName = record.StreamName;
err.records = [ _.omit(record, 'StreamName') ];
}
throw err;
})
.on('complete', function() {
req.removeAllListeners();
req.response.httpResponse.stream.removeAllListeners();
req.httpRequest.stream.removeAllListeners();
});
} catch(err) {
if (operation.retry(err)) {
return;
} else {
self._emitError(err.records, err, currentAttempt);
setImmediate(done, err);
}
}
// remove all listeners which end up leaking
req.on('complete', function() {
req.removeAllListeners();
req.response.httpResponse.stream.removeAllListeners();
req.httpRequest.stream.removeAllListeners();
});
};
KinesisStream.prototype.stop = function () {
clearTimeout(this._queueWait);
this._queue = [];
/**
 * Removes up to `buffer.length` records from the front of `recordsQueue`
 * and passes them to `dispatch`. No callback is supplied.
 */
KinesisStream.prototype.flush = function() {
  const batch = this.recordsQueue.splice(0, this.buffer.length);
  this.dispatch(batch);
};
KinesisStream.prototype._getRetryOperation = function () {
if (this._retryConfiguration && this._retryConfiguration.retries > 0) {
return retry.operation(this._retryConfiguration);
} else {
return {
attempt: function(cb) {
return cb(1);
},
retry: function () {
return false;
}
};
}
/**
 * Attaches the given records to an error and emits it as an `error` event
 * on this stream.
 *
 * @param {Error} err error object that is mutated and emitted
 * @param {*} records value stored on `err.records`
 */
KinesisStream.prototype.emitRecordError = function (err, records) {
  Object.assign(err, { records: records });
  this.emit('error', err);
};
module.exports = KinesisStream;
module.exports.pool = require('./pool');
// Captures the number inside every `records.<n>.member.data` fragment of a message.
const RECORD_REGEXP = /records\.(\d+)\.member\.data/g;

/**
 * Extracts the record positions mentioned in an error's message.
 *
 * Each number found in the message is reduced by one before it is returned,
 * i.e. the message is treated as 1-based and the result as 0-based.
 *
 * @param {Error} [err] error whose `message` is scanned
 * @return {number[]} positions in order of appearance; empty when `err` is
 *                    falsy or its message is not a string
 */
function getRecordIndexesFromError (err) {
  if (!err || !_.isString(err.message)) {
    return [];
  }

  const indexes = [];
  // The regex is global and shared: exec() walks the message via lastIndex and
  // resets it to 0 when it finally returns null, so the loop must run to the end.
  for (
    let hit = RECORD_REGEXP.exec(err.message);
    hit !== null;
    hit = RECORD_REGEXP.exec(err.message)
  ) {
    indexes.push(parseInt(hit[1], 10) - 1);
  }
  return indexes;
}
{
"name": "aws-kinesis-writable",
"description": "A bunyan stream for kinesis.",
"version": "2.0.0",
"description": "A stream implementation for kinesis.",
"version": "4.0.0",
"author": "José F. Romaniello <jfromaniello@gmail.com> (http://joseoncode.com)",

@@ -15,12 +15,12 @@ "repository": {

"dependencies": {
"aws-sdk": "^2.4.3",
"lodash": "~3.10.1",
"retry": "^0.9.0"
"aws-sdk": "^2.94.0",
"lodash.merge": "^4.6.0",
"retry": "^0.10.1"
},
"devDependencies": {
"chai": "~2.3.0",
"istanbul": "^0.3.19",
"mocha": "~2.2.5",
"sinon": "^1.16.1"
"chai": "^4.1.0",
"istanbul": "^0.4.5",
"mocha": "^3.5.0",
"sinon": "^3.0.0"
}
}

@@ -45,3 +45,3 @@ Kinesis writable stream for [bunyan](http://npmjs.com/package/bunyan).

length: 100, // or when 100 messages are in the queue
isPrioritaryMsg: function (msg) { // or the message has a type > 40
hasPriority: function (msg) { // or the message has a type > 40
var entry = JSON.parse(msg);

@@ -69,48 +69,8 @@ return entry.type > 40;

### Methods
* `getStreamName()`: returns the stream's name.
* `setStreamName(name)`: set the name of the stream where messages will be sent.
### Events
* `errorRecord`: Emitted once for each failed record at the `aws.kinesis.putRecords`'s response.
* `error`: Emitted every time an uncaught error is thrown.
* `error`: Emitted every time records fail to be written.
**Note**: Amazon Credentials are not required. It will either use the environment variables, `~/.aws/credentials` or roles as every other aws sdk.
## Kinesis Pool
You can write to a pool of kinesis streams: only one of them will be used at a time, and the current stream will automatically switch to another in case of an error. This can be used as a failover mechanism in case Kinesis fails in one AWS region.
### Usage:
```javascript
var streams = [
new KinesisWritable({
region: 'us-west-2',
streamName: 'foo'
}),
new KinesisWritable({
region: 'us-east-1',
streamName: 'bar'
}),
new KinesisWritable({
region: 'sa-east-1',
streamName: 'baz'
})
];
streams[0].primary = true;
var kinesis = new KinesisStreamPool({
streams: streams
});
process.stdin.resume();
process.stdin.pipe(kinesis);
```
### Events
* `poolFailure`: Emitted every time you try to write and no stream is available.
## Issue Reporting

@@ -117,0 +77,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc