aws-kinesis-writable
Advanced tools
Comparing version 1.4.3 to 1.4.4
57
index.js
@@ -118,2 +118,3 @@ var stream = require('stream'); | ||
const self = this; | ||
this._queue = []; | ||
@@ -135,8 +136,12 @@ | ||
this._putRecords(requestContent); | ||
}; | ||
KinesisStream.prototype._putRecords = function(requestContent) { | ||
const self = this; | ||
this._kinesis.putRecords(requestContent, function (err, result) { | ||
self._queueWait = self._queueSendEntries(); | ||
if (err) { | ||
err.streamName = requestContent.StreamName; | ||
err.records = requestContent.Records; | ||
return self.emit('error', err); | ||
return self._retryValidRecords(requestContent, err); | ||
} | ||
@@ -156,2 +161,29 @@ | ||
KinesisStream.prototype._retryValidRecords = function(requestContent, err) { | ||
const self = this; | ||
// By default asumes that all records filed | ||
var failedRecords = requestContent.Records; | ||
// try to find within the error, whih records have fail. | ||
var failedRecordIndexes = getRecordIndexesFromError(err); | ||
if (failedRecordIndexes.length > 0) { | ||
// failed records found, extract them from collection of records | ||
failedRecords = _.pullAt(requestContent.Records, failedRecordIndexes); | ||
// now, try one more time with records that didn't fail. | ||
self._putRecords({ | ||
StreamName: requestContent.StreamName, | ||
Records: requestContent.Records | ||
}); | ||
} | ||
// send error with failed records | ||
err.streamName = requestContent.StreamName; | ||
err.records = failedRecords; | ||
self.emit('error', err); | ||
}; | ||
KinesisStream.prototype._write = function (chunk, encoding, done) { | ||
@@ -218,2 +250,19 @@ if (!this._params.streamName) { | ||
module.exports = KinesisStream; | ||
module.exports = KinesisStream; | ||
const RECORD_REGEXP = /records\.(\d+)\.member\.data/g | ||
function getRecordIndexesFromError (err) { | ||
var matches = []; | ||
if (err && _.isString(err.message)) { | ||
var match = RECORD_REGEXP.exec(err.message); | ||
while (match !== null) { | ||
matches.push(parseInt(match[1], 10) - 1); | ||
match = RECORD_REGEXP.exec(err.message); | ||
} | ||
} | ||
return matches; | ||
} |
{ | ||
"name": "aws-kinesis-writable", | ||
"description": "A bunyan stream for kinesis.", | ||
"version": "1.4.3", | ||
"version": "1.4.4", | ||
"author": "José F. Romaniello <jfromaniello@gmail.com> (http://joseoncode.com)", | ||
@@ -6,0 +6,0 @@ "repository": { |
@@ -19,6 +19,10 @@ const KinesisStream = require('../'); | ||
function get_iterator (callback) { | ||
kinesis.describeStream({ | ||
var options = { | ||
StreamName: STREAM_NAME | ||
}, function (err, stream) { | ||
}; | ||
kinesis.describeStream(options, function (err, stream) { | ||
if (err) return callback(err); | ||
var params = { | ||
@@ -29,3 +33,6 @@ ShardId: stream.StreamDescription.Shards[0].ShardId, | ||
}; | ||
kinesis.getShardIterator(params, callback); | ||
kinesis.getShardIterator(params, function (err, data) { | ||
setImmediate(callback, err, data); | ||
}); | ||
}); | ||
@@ -238,3 +245,3 @@ } | ||
}); | ||
}, x * 1000 + 100); | ||
}, x * 1000 + 300); | ||
}); | ||
@@ -401,6 +408,6 @@ | ||
sinon.stub(bk._kinesis, 'putRecords') | ||
var stub = sinon.stub(bk._kinesis, 'putRecords') | ||
.onFirstCall() | ||
.yields(new Error("some error from AWS")); | ||
bk.on('error', function (err) { | ||
@@ -413,2 +420,3 @@ assert.ok(err instanceof Error); | ||
assert.deepEqual(err.records[0], { Data: 'foo', PartitionKey: 'foo' }); | ||
stub.stub.restore(); | ||
done(); | ||
@@ -449,3 +457,3 @@ }); | ||
sinon.stub(bk._kinesis, 'putRecords') | ||
var stub = sinon.stub(bk._kinesis, 'putRecords') | ||
.onFirstCall() | ||
@@ -478,2 +486,3 @@ .yields(null, response); | ||
}); | ||
stub.stub.restore(); | ||
done(); | ||
@@ -491,3 +500,55 @@ break; | ||
}); | ||
it ('should retries with valid records after an error ocurred. (this test takes like a minute)', function (done) { | ||
this.timeout(120000); | ||
var bk = new KinesisStream({ | ||
region: 'us-west-1', | ||
buffer: { length: 3, timeout: 2 }, | ||
streamName: STREAM_NAME, | ||
partitionKey: "bar" | ||
}); | ||
var largerThan1MB = JSON.stringify({ x : _.repeat('*', 1024*1024) }); | ||
var shorterThan1MB = JSON.stringify({ x: 1 }); | ||
bk.on('error', function (err) { | ||
// should receive only one record, the one that es larger than 1MB | ||
assert.ok(err instanceof Error); | ||
assert.equal(err.streamName, STREAM_NAME); | ||
assert.ok(err.records); | ||
assert.equal(err.records.length, 1); | ||
assert.equal(err.records[0].Data, largerThan1MB); | ||
setTimeout(validateRecordsAtKinesis, 1000); | ||
}); | ||
// write 3 records, | ||
bk._write(shorterThan1MB, null, function (err) { | ||
assert.ok(!err); | ||
}); | ||
bk._write(largerThan1MB, null, function (err) { | ||
assert.ok(!err); | ||
}); | ||
bk._write(shorterThan1MB, null, function (err) { | ||
assert.ok(!err); | ||
}); | ||
function validateRecordsAtKinesis() { | ||
// two records shorter than 1MB must be stored at Kinesis | ||
kinesis.getRecords({ | ||
ShardIterator: iterator, | ||
Limit: 10 | ||
}, function (err, result) { | ||
bk.stop(); | ||
if (err) return done(err); | ||
assert.equal(result.Records.length, 2); | ||
assert.equal(result.Records[0].Data, shorterThan1MB); | ||
assert.equal(result.Records[1].Data, shorterThan1MB); | ||
done(); | ||
}); | ||
} | ||
}); | ||
}); | ||
}); |
31545
790