aws-kinesis-writable
Comparing version 2.0.0 to 4.0.0
index.js
@@ -1,7 +0,9 @@
var stream = require('stream');
var util = require('util');
var AWS = require('aws-sdk');
var retry = require('retry');
var _ = require('lodash');
const util = require('util');
const assert = require('assert');
const Writable = require('stream').Writable;
const retry = require('retry');
const AWS = require('aws-sdk');
const merge = require('lodash.merge');
/**
@@ -14,8 +16,4 @@ * [KinesisStream description]
* @param {string} params.streamName AWS Kinesis stream name
* @param {string|function} params.partitionKey Constant string to use as partitionKey
* or a function that returns the partitionKey
* based on a msg passed by argument
* @param {object} [params.getCredentialsFromIAMRole=false] Explicitly tells `aws-sdk` to get credentials using an IAM role
* @param {function} params.partitionKey function that returns the partitionKey based on a msg passed by argument
* @param {object} [params.httpOptions={}] HTTP options that will be used on `aws-sdk` (e.g. timeout values)
* @param {boolean|object} [params.buffer=true]
* @param {number} [params.buffer.timeout] Max. number of seconds
@@ -25,348 +23,138 @@ * to wait before sending msgs to the stream
* before sending them to the stream.
* @param {number} [params.buffer.maxBatchSize] Max. size in bytes of the batch sent to Kinesis. Default 5242880 (5MiB)
* @param {function} [params.buffer.isPrioritaryMsg] Evaluates a message and returns true
* when the msg has priority
* @param {object} [params.retryConfiguration={}]
* @param {number} [params.retryConfiguration.retries=0] Number of retries to perform after a failed attempt
* @param {number} [params.retryConfiguration.factor=2] The exponential factor to use
* @param {number} [params.retryConfiguration.minTimeout=1000] The number of milliseconds before starting the first retry
* @param {boolean} [params.retryConfiguration.randomize=false] Randomizes the timeouts by multiplying with a factor between 1 and 2
* @param {function} [params.buffer.isPrioritaryMsg] Evaluates a message and returns true if msg has priority (to be deprecated)
* @param {function} [params.buffer.hasPriority] Evaluates a message and returns true if msg has priority
* @param {number} [params.buffer.retry.retries] Attempts to be made to flush a batch
* @param {number} [params.buffer.retry.minTimeout] Min time to wait between attempts
* @param {number} [params.buffer.retry.maxTimeout] Max time to wait between attempts
*/
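// --- Editor's note: illustrative usage sketch, not part of the module source.
// It shows one way the 4.0.0 options documented above could be supplied; the
// stream name, region and priority rule below are assumptions for the example.
const KinesisWritable = require('aws-kinesis-writable');

const exampleWritable = new KinesisWritable({
  streamName: 'example-stream',                  // required (assumed name)
  region: 'us-west-2',                           // assumed region
  partitionKey: () => Date.now().toString(),     // optional; mirrors the default
  buffer: {
    timeout: 5,                                  // flush at most every 5 seconds
    length: 10,                                  // or once 10 records are queued
    hasPriority: (msg) => msg.priority === true, // assumed priority rule
    retry: { retries: 2, minTimeout: 300, maxTimeout: 500 }
  }
});
// --- End of editor's note. ---------------------------------------------------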
var MAX_BATCH_SIZE = 5*1024*1024 - 1024; // 4.99MiB
const defaultBuffer = {
timeout: 5,
length: 10,
hasPriority: function() {
return false;
},
retry: {
retries: 2,
minTimeout: 300,
maxTimeout: 500
}
};
function KinesisStream (params) {
stream.Writable.call(this, { objectMode: params.objectMode });
assert(params.streamName, 'streamName required');
var defaultBuffer = {
timeout: 5,
length: 10,
maxBatchSize: MAX_BATCH_SIZE
this.streamName = params.streamName;
this.buffer = merge(defaultBuffer, params.buffer);
this.partitionKey = params.partitionKey || function getPartitionKey() {
return Date.now().toString();
};
this._retryConfiguration = params.retryConfiguration || {
retries: 0,
factor: 2,
minTimeout: 1000,
randomize: false
};
this.hasPriority = this.buffer.isPrioritaryMsg || this.buffer.hasPriority;
this._params = _.defaultsDeep(params || {}, {
streamName: null,
buffer: defaultBuffer,
partitionKey: function() { // by default the partition key will generate
return Date.now().toString(); // a "random" distribution between all shards
}
// increase the timeout to get credentials from the EC2 Metadata Service
AWS.config.credentials = new AWS.EC2MetadataCredentials({
httpOptions: params.httpOptions || { timeout: 5000 }
});
// partitionKey must be a string or a function
if (!this._params.partitionKey ||
typeof this._params.partitionKey !== 'string' &&
typeof this._params.partitionKey !== 'function') {
throw new Error("'partitionKey' property should be a string or a function.");
}
this.recordsQueue = [];
// if partitionKey is a string, converts it to a function that always returns the string
if (typeof this._params.partitionKey === 'string') {
this._params.partitionKey = _.constant(this._params.partitionKey);
}
this.kinesis = params.kinesis || new AWS.Kinesis({
accessKeyId: params.accessKeyId,
secretAccessKey: params.secretAccessKey,
region: params.region,
httpOptions: params.httpOptions
});
if (this._params.buffer && typeof this._params.buffer === 'boolean') {
this._params.buffer = defaultBuffer;
}
Writable.call(this, { objectMode: params.objectMode });
}
if (this._params.buffer) {
this._queue = [];
this._queueWait = this._queueSendEntries();
this._params.buffer.isPrioritaryMsg = this._params.buffer.isPrioritaryMsg;
}
util.inherits(KinesisStream, Writable);
if (params.getCredentialsFromIAMRole) {
// increase the timeout to get credentials from the EC2 Metadata Service
AWS.config.credentials = new AWS.EC2MetadataCredentials({
httpOptions: params.httpOptions || { timeout: 5000 }
});
function parseChunk(chunk) {
if (Buffer.isBuffer(chunk)) {
chunk = chunk.toString();
}
this._logger = console;
this._kinesis = new AWS.Kinesis(_.pick(params, [
'accessKeyId',
'secretAccessKey',
'region',
'httpOptions']));
if (typeof chunk === 'string') {
chunk = JSON.parse(chunk);
}
return chunk;
}
util.inherits(KinesisStream, stream.Writable);
KinesisStream.prototype._write = function(chunk, enc, next) {
chunk = parseChunk(chunk);
KinesisStream.prototype._emitError = function (records, err, attempts) {
err.records = records;
err.attempts = attempts;
this.emit('error', err);
};
const hasPriority = this.hasPriority(chunk);
if (hasPriority) {
this.recordsQueue.unshift(chunk);
} else {
this.recordsQueue.push(chunk);
}
KinesisStream.prototype._queueSendEntries = function () {
var self = this;
return setTimeout(function(){
self._sendEntries();
}, this._params.buffer.timeout * 1000);
};
KinesisStream.prototype.setStreamName = function (streamName) {
if (!streamName || typeof streamName !== 'string') {
throw new Error('\'streamName\' must be a valid string.');
if (this.timer) {
clearTimeout(this.timer);
}
this._params.streamName = streamName;
};
KinesisStream.prototype.getStreamName = function () {
return this._params.streamName;
};
/**
* Map a msg to a record structure
* @param {@} msg - entry
* @return {Record} { Data, PartitionKey, StreamName }
*/
KinesisStream.prototype._mapEntry = function (msg, buffer) {
if (buffer){
return new BufferedRecord(msg, this._params.partitionKey(msg));
if (this.recordsQueue.length >= this.buffer.length || hasPriority) {
this.flush();
} else {
return new NonBufferedRecord(msg, this._params.partitionKey(msg), this._params.streamName);
this.timer = setTimeout(this.flush.bind(this), this.buffer.timeout * 1000);
}
};
var BufferedRecord = function(data, pk){
this.Data = data;
this.PartitionKey = pk;
return next();
};
var NonBufferedRecord = function(data, pk, streamName){
this.Data = data;
this.PartitionKey = pk;
this.StreamName = streamName;
};
KinesisStream.prototype._sendEntries = function () {
const pending_records = this._queue;
const self = this;
this._queue = [];
this._batch_size = 0;
if (pending_records.length === 0) {
this._queueWait = this._queueSendEntries();
return;
KinesisStream.prototype.dispatch = function(records, cb) {
if (records.length === 0) {
return cb ? cb() : null;
}
if (!self._params.streamName) {
this.emit('error', new Error('Stream\'s name was not set.'));
}
const operation = retry.operation(this.buffer.retry);
var requestContent = {
StreamName: this._params.streamName,
Records: pending_records
};
const partitionKey = this.partitionKey();
this._putRecords(requestContent);
};
const formattedRecords = records.map((record) => {
return { Data: JSON.stringify(record), PartitionKey: partitionKey };
});
KinesisStream.prototype._putRecords = function(requestContent) {
const self = this;
var operation = this._getRetryOperation();
operation.attempt(function(currentAttempt) {
try {
var req = self._kinesis.putRecords(requestContent, function (err, result) {
try {
self._queueWait = self._queueSendEntries();
if (err) {
if (!err.records) err.records = requestContent.Records;
throw err;
}
if (result && result.FailedRecordCount) {
result.Records
.forEach(function (recordResult, index) {
if (recordResult.ErrorCode) {
recordResult.Record = requestContent.Records[index];
self.emit('errorRecord', recordResult);
}
});
}
} catch(err) {
if (operation.retry(err)) {
return;
} else {
self._emitError(requestContent, err, currentAttempt);
}
}
})
.on('complete', function() {
req.removeAllListeners();
var response_stream = req.response.httpResponse.stream;
if (response_stream) {
response_stream.removeAllListeners();
}
var request_stream = req.httpRequest.stream;
if (request_stream) {
request_stream.removeAllListeners();
}
});
} catch(err) {
operation.attempt(() => {
this.putRecords(formattedRecords, (err) => {
if (operation.retry(err)) {
return;
} else {
self._emitError(requestContent, err, currentAttempt);
}
}
});
};
KinesisStream.prototype._retryValidRecords = function(requestContent, err) {
const self = this;
if (err) {
this.emitRecordError(err, records);
}
// By default assumes that all records failed
var failedRecords = requestContent.Records;
// try to find within the error, which records have failed.
var failedRecordIndexes = getRecordIndexesFromError(err);
if (failedRecordIndexes.length > 0) {
// failed records found, extract them from collection of records
failedRecords = _.pullAt(requestContent.Records, failedRecordIndexes);
// now, try one more time with records that didn't fail.
self._putRecords({
StreamName: requestContent.StreamName,
Records: requestContent.Records
if (cb) {
return cb(err ? operation.mainError() : null);
}
});
}
// send error with failed records
err.streamName = requestContent.StreamName;
err.records = failedRecords;
self.emit('error', err);
});
};
KinesisStream.prototype._write = function (chunk, encoding, done) {
var self = this;
if (!this._params.streamName) {
return setImmediate(done, new Error('Stream\'s name was not set.'));
}
KinesisStream.prototype.putRecords = function(records, cb) {
const req = this.kinesis.putRecords({
StreamName: this.streamName,
Records: records
}, cb);
var isPrioMessage = this._params.buffer.isPrioritaryMsg;
var operation = this._getRetryOperation();
operation.attempt(function(currentAttempt) {
try {
var obj, msg;
if (Buffer.isBuffer(chunk)) {
msg = chunk;
if (isPrioMessage){
if (encoding === 'buffer'){
obj = JSON.parse(chunk.toString());
} else {
obj = JSON.parse(chunk.toString(encoding));
}
}
} else if (typeof chunk === 'string') {
msg = chunk;
if (isPrioMessage){
obj = JSON.parse(chunk);
}
} else {
obj = chunk;
msg = JSON.stringify(obj);
}
var record = self._mapEntry(msg, !!self._params.buffer);
if (self._params.buffer) {
// sends buffer when the current record would exceed the max batch size
self._batch_size = (self._batch_size || 0) + msg.length;
if (self._batch_size > self._params.buffer.maxBatchSize) {
clearTimeout(self._queueWait);
self._sendEntries();
}
self._queue.push(record);
// sends buffer when the current chunk is a priority entry
var shouldSendEntries = self._queue.length >= self._params.buffer.length || // queue reached max size
(isPrioMessage && isPrioMessage(obj)); // msg has priority
if (shouldSendEntries) {
clearTimeout(self._queueWait);
self._sendEntries();
}
return setImmediate(done);
}
var req = self._kinesis.putRecord(record, function (err) {
if (err) {
err.streamName = record.StreamName;
err.records = [ _.omit(record, 'StreamName') ];
}
throw err;
})
.on('complete', function() {
req.removeAllListeners();
req.response.httpResponse.stream.removeAllListeners();
req.httpRequest.stream.removeAllListeners();
});
} catch(err) {
if (operation.retry(err)) {
return;
} else {
self._emitError(err.records, err, currentAttempt);
setImmediate(done, err);
}
}
// remove all listeners which end up leaking
req.on('complete', function() {
req.removeAllListeners();
req.response.httpResponse.stream.removeAllListeners();
req.httpRequest.stream.removeAllListeners();
});
};
KinesisStream.prototype.stop = function () {
clearTimeout(this._queueWait);
this._queue = [];
KinesisStream.prototype.flush = function() {
this.dispatch(this.recordsQueue.splice(0, this.buffer.length));
};
KinesisStream.prototype._getRetryOperation = function () {
if (this._retryConfiguration && this._retryConfiguration.retries > 0) {
return retry.operation(this._retryConfiguration);
} else {
return {
attempt: function(cb) {
return cb(1);
},
retry: function () {
return false;
}
};
}
KinesisStream.prototype.emitRecordError = function (err, records) {
err.records = records;
this.emit('error', err);
};
module.exports = KinesisStream;
module.exports.pool = require('./pool');
const RECORD_REGEXP = /records\.(\d+)\.member\.data/g;
function getRecordIndexesFromError (err) {
var matches = [];
if (!err) return matches;
if (err && _.isString(err.message)) {
var match = RECORD_REGEXP.exec(err.message);
while (match !== null) {
matches.push(parseInt(match[1], 10) - 1);
match = RECORD_REGEXP.exec(err.message);
}
}
return matches;
}
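Taken together, the 4.0.0 side of this diff replaces the 2.x `_queue`/`_sendEntries` machinery with a simpler `recordsQueue` that `flush()` drains and `dispatch()` sends via `kinesis.putRecords`, retried according to `buffer.retry`. A rough usage sketch of that flow follows; the stream name and payload are assumptions, not taken from the package:

```javascript
const KinesisWritable = require('aws-kinesis-writable');

const writable = new KinesisWritable({
  streamName: 'example-stream',        // assumed stream name
  buffer: { length: 10, timeout: 5 }   // batch of 10, or 5 s after the last write
});

// Each chunk is JSON-parsed, queued, and sent in a batch by dispatch();
// batches that still fail after the configured retries surface as 'error'.
writable.write(JSON.stringify({ level: 30, msg: 'hello world' }));

writable.on('error', (err) => {
  console.error('failed to deliver', err.records ? err.records.length : 0, 'records');
});
```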
package.json
{
"name": "aws-kinesis-writable",
"description": "A bunyan stream for kinesis.",
"version": "2.0.0",
"description": "A stream implementation for kinesis.",
"version": "4.0.0",
"author": "José F. Romaniello <jfromaniello@gmail.com> (http://joseoncode.com)",
@@ -15,12 +15,12 @@ "repository": {
"dependencies": {
"aws-sdk": "^2.4.3",
"lodash": "~3.10.1",
"retry": "^0.9.0"
"aws-sdk": "^2.94.0",
"lodash.merge": "^4.6.0",
"retry": "^0.10.1"
},
"devDependencies": {
"chai": "~2.3.0",
"istanbul": "^0.3.19",
"mocha": "~2.2.5",
"sinon": "^1.16.1"
"chai": "^4.1.0",
"istanbul": "^0.4.5",
"mocha": "^3.5.0",
"sinon": "^3.0.0"
}
}
@@ -45,3 +45,3 @@ Kinesis writable stream for [bunyan](http://npmjs.com/package/bunyan).
length: 100, // or when 100 messages are in the queue
isPrioritaryMsg: function (msg) { // or the message has a type > 40
hasPriority: function (msg) { // or the message has a type > 40
var entry = JSON.parse(msg);
@@ -69,48 +69,8 @@ return entry.type > 40;
### Methods
* `getStreamName()`: returns the stream's name.
* `setStreamName(name)`: sets the name of the stream where messages will be sent (see the sketch below).
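A minimal sketch of how these methods were called in 2.x (the 4.0.0 diff above suggests they were dropped in the rewrite); `kinesis` is assumed to be an existing `KinesisWritable` instance:

```javascript
// 2.x only: inspect and switch the destination stream at runtime
console.log(kinesis.getStreamName());     // current stream name
kinesis.setStreamName('another-stream');  // hypothetical stream name
```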
### Events
* `errorRecord`: Emitted once for each failed record in the `aws.kinesis.putRecords` response.
* `error`: Emitted every time an uncaught error is thrown.
* `error`: Emitted every time records fail to be written (see the example below).
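For example, a consumer might attach handlers for both events. The handler bodies and the `kinesis` instance are illustrative; `errorRecord` comes from the 2.x `putRecords` handling shown in the diff above:

```javascript
kinesis.on('errorRecord', (record) => {
  // one event per record rejected in the putRecords response (2.x)
  console.warn('record rejected:', record.ErrorCode);
});

kinesis.on('error', (err) => {
  // err.records carries the batch that could not be delivered
  console.error('failed to write', err.records ? err.records.length : 0, 'records');
});
```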
**Note**: Amazon credentials are not required. The stream will use environment variables, `~/.aws/credentials`, or IAM roles, like every other AWS SDK client.
## Kinesis Pool
You can write to a pool of Kinesis streams: only one of them will be used at a time, and the current stream will automatically switch to another in case of an error. This can be used as a failover mechanism in case Kinesis fails in one AWS region.
### Usage:
```javascript
var streams = [
  new KinesisWritable({
    region: 'us-west-2',
    streamName: 'foo'
  }),
  new KinesisWritable({
    region: 'us-east-1',
    streamName: 'bar'
  }),
  new KinesisWritable({
    region: 'sa-east-1',
    streamName: 'baz'
  })
];
streams[0].primary = true;
var kinesis = new KinesisStreamPool({
  streams: streams
});
process.stdin.resume();
process.stdin.pipe(kinesis);
```
### Events
* `poolFailure`: Emitted every time you try to write and no stream is available.
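A minimal sketch of reacting to that event, reusing the `kinesis` pool from the snippet above (the handler body is illustrative):

```javascript
kinesis.on('poolFailure', () => {
  // no stream in the pool accepted the write; alert or fall back here
  console.error('kinesis pool exhausted');
});
```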
## Issue Reporting
@@ -117,0 +77,0 @@
Major refactor
Supply chain risk: Package has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
New author
Supply chain risk: A new npm collaborator published a version of the package for the first time. New collaborators are usually benign additions to a project, but do indicate a change to the security surface area of a package.
Found 1 instance in 1 package
+ Added lodash.merge@^4.6.0
+ Added lodash.merge@4.6.2 (transitive)
+ Added retry@0.10.1 (transitive)
- Removed lodash@~3.10.1
- Removed lodash@3.10.1 (transitive)
- Removed retry@0.9.0 (transitive)
Updated aws-sdk@^2.94.0
Updated retry@^0.10.1