aws-kinesis-writable
Advanced tools
Comparing version 1.3.1 to 1.4.0
95
index.js
var stream = require('stream'); | ||
var util = require('util'); | ||
var AWS = require('aws-sdk'); | ||
var immediate = require('immediate-invocation'); | ||
var _ = require('lodash'); | ||
@@ -26,3 +25,3 @@ | ||
function KinesisStream (params) { | ||
stream.Writable.call(this); | ||
stream.Writable.call(this, { objectMode: params.objectMode }); | ||
@@ -35,3 +34,3 @@ var defaultBuffer = { | ||
this._params = _.defaultsDeep(params || {}, { | ||
streamName: null, | ||
streamName: null, | ||
buffer: defaultBuffer, | ||
@@ -61,7 +60,11 @@ partitionKey: function() { // by default the partition key will generate | ||
this._queue = []; | ||
this._queueWait = setTimeout(this._sendEntries.bind(this), this._params.buffer.timeout * 1000); | ||
this._params.buffer.isPrioritaryMsg = this._params.buffer.isPrioritaryMsg || _.noop; | ||
this._queueWait = this._queueSendEntries(); | ||
this._params.buffer.isPrioritaryMsg = this._params.buffer.isPrioritaryMsg; | ||
} | ||
this._kinesis = new AWS.Kinesis(_.pick(params, ['accessKeyId', 'secretAccessKey', 'region'])); | ||
this._kinesis = new AWS.Kinesis(_.pick(params, [ | ||
'accessKeyId', | ||
'secretAccessKey', | ||
'region', | ||
'httpOptions'])); | ||
} | ||
@@ -71,2 +74,9 @@ | ||
KinesisStream.prototype._queueSendEntries = function () { | ||
var self = this; | ||
return setTimeout(function(){ | ||
self._sendEntries(); | ||
}, this._params.buffer.timeout * 1000); | ||
}; | ||
KinesisStream.prototype.setStreamName = function (streamName) { | ||
@@ -88,13 +98,25 @@ | ||
* @param {@} msg - entry | ||
* @return {Object} { Data, PartitionKey } | ||
* @return {Record} { Data, PartitionKey, StreamName } | ||
*/ | ||
KinesisStream.prototype._mapEntry = function (msg) { | ||
return { | ||
Data: msg, | ||
PartitionKey: this._params.partitionKey(msg) | ||
}; | ||
KinesisStream.prototype._mapEntry = function (msg, buffer) { | ||
if (buffer){ | ||
return new BufferedRecord(msg, this._params.partitionKey(msg)); | ||
} else { | ||
return new NonBufferedRecord(msg, this._params.partitionKey(msg), this._params.streamName); | ||
} | ||
}; | ||
var BufferedRecord = function(data, pk){ | ||
this.Data = data; | ||
this.PartitionKey = pk; | ||
}; | ||
var NonBufferedRecord = function(data, pk, streamName){ | ||
this.Data = data; | ||
this.PartitionKey = pk; | ||
this.StreamName = streamName; | ||
}; | ||
KinesisStream.prototype._sendEntries = function () { | ||
const pending_records = _.clone(this._queue); | ||
const pending_records = this._queue; | ||
const self = this; | ||
@@ -104,3 +126,3 @@ this._queue = []; | ||
if (pending_records.length === 0) { | ||
self._queueWait = setTimeout(self._sendEntries.bind(self), self._params.buffer.timeout * 1000); | ||
this._queueWait = this._queueSendEntries(); | ||
return; | ||
@@ -110,3 +132,3 @@ } | ||
if (!self._params.streamName) { | ||
self.emit('error', new Error('Stream\'s name was not set.')); | ||
this.emit('error', new Error('Stream\'s name was not set.')); | ||
} | ||
@@ -119,4 +141,4 @@ | ||
self._kinesis.putRecords(requestContent, function (err, result) { | ||
self._queueWait = setTimeout(self._sendEntries.bind(self), self._params.buffer.timeout * 1000); | ||
this._kinesis.putRecords(requestContent, function (err, result) { | ||
self._queueWait = self._queueSendEntries(); | ||
if (err) { | ||
@@ -138,19 +160,34 @@ return self.emit('error', err); | ||
KinesisStream.prototype._write = function (chunk, encoding, done) { | ||
var self = this; | ||
if (!this._params.streamName) { | ||
return immediate (done, new Error('Stream\'s name was not set.')); | ||
return setImmediate (done, new Error('Stream\'s name was not set.')); | ||
} | ||
var isPrioMessage = this._params.buffer.isPrioritaryMsg; | ||
try { | ||
var msg = chunk.toString(); | ||
var record = self._mapEntry(msg); | ||
var obj, msg; | ||
if (Buffer.isBuffer(chunk)) { | ||
msg = chunk; | ||
if (isPrioMessage){ | ||
obj = JSON.parse(chunk.toString(encoding)); | ||
} | ||
} else if (typeof chunk === 'string') { | ||
msg = chunk; | ||
if (isPrioMessage){ | ||
obj = JSON.parse(chunk); | ||
} | ||
} else { | ||
obj = chunk; | ||
msg = JSON.stringify(obj); | ||
} | ||
var record = this._mapEntry(msg, !!this._params.buffer); | ||
if (this._params.buffer) { | ||
this._queue.push(record); | ||
// sends buffer when msg is prioritary | ||
// sends buffer when current chunk is for prioritary entry | ||
var shouldSendEntries = this._queue.length >= this._params.buffer.length || // queue reached max size | ||
this._params.buffer.isPrioritaryMsg(msg); // msg is prioritary | ||
(isPrioMessage && isPrioMessage(obj)); // msg is prioritary | ||
if (shouldSendEntries) { | ||
@@ -160,12 +197,10 @@ clearTimeout(this._queueWait); | ||
} | ||
return immediate(done); | ||
return setImmediate(done); | ||
} | ||
self._kinesis.putRecord(_.extend({ | ||
StreamName: self._params.streamName | ||
}, record), function (err) { | ||
this._kinesis.putRecord(record, function (err) { | ||
done(err); | ||
}); | ||
} catch (e) { | ||
immediate(done, e); | ||
setImmediate(done, e); | ||
} | ||
@@ -172,0 +207,0 @@ }; |
{ | ||
"name": "aws-kinesis-writable", | ||
"description": "A bunyan stream for kinesis.", | ||
"version": "1.3.1", | ||
"version": "1.4.0", | ||
"author": "José F. Romaniello <jfromaniello@gmail.com> (http://joseoncode.com)", | ||
@@ -15,3 +15,2 @@ "repository": { | ||
"aws-sdk": "~2.1.43", | ||
"immediate-invocation": "^1.0.0", | ||
"lodash": "~3.10.1" | ||
@@ -18,0 +17,0 @@ }, |
@@ -14,4 +14,3 @@ const KinesisStream = require('../'); | ||
function isPrioritaryMsg(msg) { | ||
var entry = JSON.parse(msg); | ||
function isPrioritaryMsg(entry) { | ||
return entry.level >= 40; | ||
@@ -40,3 +39,3 @@ } | ||
length: 10, | ||
isPrioritaryMsg: _.noop | ||
isPrioritaryMsg: undefined | ||
}; | ||
@@ -51,4 +50,4 @@ | ||
var instance = new KinesisStream({ streamName: STREAM_NAME }); | ||
assert.ok(instance._params.buffer); | ||
assert.deepEqual(instance._params.buffer, defaultBuffer); | ||
assert.ok(instance._params.buffer); | ||
assert.deepEqual(instance._params.buffer, defaultBuffer); | ||
assert.equal(typeof instance._params.partitionKey, 'function'); | ||
@@ -59,4 +58,4 @@ }); | ||
var instance = new KinesisStream({ streamName: STREAM_NAME, buffer: true }); | ||
assert.ok(instance._params.buffer); | ||
assert.deepEqual(instance._params.buffer, defaultBuffer); | ||
assert.ok(instance._params.buffer); | ||
assert.deepEqual(instance._params.buffer, defaultBuffer); | ||
}); | ||
@@ -66,3 +65,3 @@ | ||
var instance = new KinesisStream({ streamName: STREAM_NAME, buffer: false }); | ||
assert.ok(!instance._params.buffer); | ||
assert.ok(!instance._params.buffer); | ||
}); | ||
@@ -92,3 +91,2 @@ | ||
describe ('method setStreamName', function () { | ||
[ | ||
@@ -222,2 +220,27 @@ null, | ||
it('should support object events', function (done) { | ||
var x = 1; | ||
var bk = new KinesisStream({ | ||
region: 'us-west-1', | ||
streamName: STREAM_NAME, | ||
partitionKey: 'test-123', | ||
buffer: { timeout: x } | ||
}); | ||
var log_entry = {foo: 'bar'}; | ||
bk._write(log_entry, null, _.noop); | ||
setTimeout(function () { | ||
kinesis.getRecords({ | ||
ShardIterator: iterator, | ||
Limit: 1 | ||
}, function (err, data) { | ||
bk.stop(); | ||
if (err) return done(err); | ||
assert.equal(data.Records.length, 1); | ||
done(); | ||
}); | ||
}, x * 1000 + 100); | ||
}); | ||
it('should send the events after X messages', function (done) { | ||
@@ -250,4 +273,3 @@ var x = 3; | ||
it('should send the events after a error level entry', function (done) { | ||
it('should send the events after an error level entry', function (done) { | ||
var bk = new KinesisStream({ | ||
@@ -254,0 +276,0 @@ region: 'us-west-1', |
New author
Supply chain riskA new npm collaborator published a version of the package for the first time. New collaborators are usually benign additions to a project, but do indicate a change to the security surface area of a package.
Found 1 instance in 1 package
26032
2
630
1
- Removedimmediate-invocation@^1.0.0
- Removedimmediate-invocation@1.0.0(transitive)