aws-kinesis-writable
Advanced tools
Comparing version 1.4.4 to 1.4.5
20
index.js
@@ -21,5 +21,9 @@ var stream = require('stream'); | ||
* before send them to stream. | ||
* @param {number} [params.buffer.maxBatchSize] Max. size in bytes of the batch sent to Kinesis. Default 5242880 (5MiB) | ||
* @param {@function} [params.buffer.isPrioritaryMsg] Evaluates a message and returns true | ||
* when msg is prioritary | ||
*/ | ||
var MAX_BATCH_SIZE = 5*1024*1024 - 1024; // 4.99MiB | ||
function KinesisStream (params) { | ||
@@ -30,3 +34,4 @@ stream.Writable.call(this, { objectMode: params.objectMode }); | ||
timeout: 5, | ||
length: 10 | ||
length: 10, | ||
maxBatchSize: MAX_BATCH_SIZE | ||
}; | ||
@@ -122,2 +127,3 @@ | ||
this._queue = []; | ||
this._batch_size = 0; | ||
@@ -219,3 +225,11 @@ if (pending_records.length === 0) { | ||
if (this._params.buffer) { | ||
if (this._params.buffer) { | ||
// sends buffer when current current record will exceed bmax batch size | ||
this._batch_size = (this._batch_size || 0) + msg.length; | ||
if (this._batch_size > this._params.buffer.maxBatchSize) { | ||
clearTimeout(this._queueWait); | ||
this._sendEntries(); | ||
} | ||
this._queue.push(record); | ||
@@ -254,3 +268,3 @@ | ||
const RECORD_REGEXP = /records\.(\d+)\.member\.data/g | ||
const RECORD_REGEXP = /records\.(\d+)\.member\.data/g; | ||
@@ -257,0 +271,0 @@ function getRecordIndexesFromError (err) { |
{ | ||
"name": "aws-kinesis-writable", | ||
"description": "A bunyan stream for kinesis.", | ||
"version": "1.4.4", | ||
"version": "1.4.5", | ||
"author": "José F. Romaniello <jfromaniello@gmail.com> (http://joseoncode.com)", | ||
@@ -6,0 +6,0 @@ "repository": { |
@@ -45,2 +45,3 @@ const KinesisStream = require('../'); | ||
length: 10, | ||
maxBatchSize: 5241856, | ||
isPrioritaryMsg: undefined | ||
@@ -222,2 +223,32 @@ }; | ||
it('should send the events before buffer exceeds max batch size', function (done) { | ||
var bk = new KinesisStream({ | ||
region: 'us-west-1', | ||
streamName: STREAM_NAME, | ||
partitionKey: 'test-123', | ||
buffer: { length: 1000, timeout: 1, maxBatchSize: 5 * 1024 /* 5KiB */ } | ||
}); | ||
// the max batch size es 5KiB | ||
// so 12 logs larger than 512B (aprox 6KB) will exceed the batch size | ||
for(var i=0; i<12; i++) { | ||
var log_entry = JSON.stringify({i:i, foo_500K: _.repeat('*', 512)}); | ||
bk._write(log_entry, null, _.noop); | ||
} | ||
setTimeout(function () { | ||
kinesis.getRecords({ | ||
ShardIterator: iterator, | ||
Limit: 20 | ||
}, function (err, data) { | ||
bk.stop(); | ||
if (err) return done(err); | ||
// all logs entries should arrive to kinesis | ||
assert.equal(data.Records.length, 12); | ||
done(); | ||
}); | ||
}, 3000); | ||
}); | ||
it('should support object events', function (done) { | ||
@@ -224,0 +255,0 @@ var x = 1; |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
33044
827