New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

aws-kinesis-writable

Package Overview
Dependencies
Maintainers
3
Versions
31
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

aws-kinesis-writable - npm Package Compare versions

Comparing version 1.4.3 to 1.4.4

57

index.js

@@ -118,2 +118,3 @@ var stream = require('stream');

const self = this;
this._queue = [];

@@ -135,8 +136,12 @@

this._putRecords(requestContent);
};
KinesisStream.prototype._putRecords = function(requestContent) {
const self = this;
this._kinesis.putRecords(requestContent, function (err, result) {
self._queueWait = self._queueSendEntries();
if (err) {
err.streamName = requestContent.StreamName;
err.records = requestContent.Records;
return self.emit('error', err);
return self._retryValidRecords(requestContent, err);
}

@@ -156,2 +161,29 @@

KinesisStream.prototype._retryValidRecords = function(requestContent, err) {
const self = this;
// By default asumes that all records filed
var failedRecords = requestContent.Records;
// try to find within the error, whih records have fail.
var failedRecordIndexes = getRecordIndexesFromError(err);
if (failedRecordIndexes.length > 0) {
// failed records found, extract them from collection of records
failedRecords = _.pullAt(requestContent.Records, failedRecordIndexes);
// now, try one more time with records that didn't fail.
self._putRecords({
StreamName: requestContent.StreamName,
Records: requestContent.Records
});
}
// send error with failed records
err.streamName = requestContent.StreamName;
err.records = failedRecords;
self.emit('error', err);
};
KinesisStream.prototype._write = function (chunk, encoding, done) {

@@ -218,2 +250,19 @@ if (!this._params.streamName) {

module.exports = KinesisStream;
module.exports = KinesisStream;
const RECORD_REGEXP = /records\.(\d+)\.member\.data/g
function getRecordIndexesFromError (err) {
var matches = [];
if (err && _.isString(err.message)) {
var match = RECORD_REGEXP.exec(err.message);
while (match !== null) {
matches.push(parseInt(match[1], 10) - 1);
match = RECORD_REGEXP.exec(err.message);
}
}
return matches;
}

2

package.json
{
"name": "aws-kinesis-writable",
"description": "A bunyan stream for kinesis.",
"version": "1.4.3",
"version": "1.4.4",
"author": "José F. Romaniello <jfromaniello@gmail.com> (http://joseoncode.com)",

@@ -6,0 +6,0 @@ "repository": {

@@ -19,6 +19,10 @@ const KinesisStream = require('../');

function get_iterator (callback) {
kinesis.describeStream({
var options = {
StreamName: STREAM_NAME
}, function (err, stream) {
};
kinesis.describeStream(options, function (err, stream) {
if (err) return callback(err);
var params = {

@@ -29,3 +33,6 @@ ShardId: stream.StreamDescription.Shards[0].ShardId,

};
kinesis.getShardIterator(params, callback);
kinesis.getShardIterator(params, function (err, data) {
setImmediate(callback, err, data);
});
});

@@ -238,3 +245,3 @@ }

});
}, x * 1000 + 100);
}, x * 1000 + 300);
});

@@ -401,6 +408,6 @@

sinon.stub(bk._kinesis, 'putRecords')
var stub = sinon.stub(bk._kinesis, 'putRecords')
.onFirstCall()
.yields(new Error("some error from AWS"));
bk.on('error', function (err) {

@@ -413,2 +420,3 @@ assert.ok(err instanceof Error);

assert.deepEqual(err.records[0], { Data: 'foo', PartitionKey: 'foo' });
stub.stub.restore();
done();

@@ -449,3 +457,3 @@ });

sinon.stub(bk._kinesis, 'putRecords')
var stub = sinon.stub(bk._kinesis, 'putRecords')
.onFirstCall()

@@ -478,2 +486,3 @@ .yields(null, response);

});
stub.stub.restore();
done();

@@ -491,3 +500,55 @@ break;

});
it ('should retries with valid records after an error ocurred. (this test takes like a minute)', function (done) {
this.timeout(120000);
var bk = new KinesisStream({
region: 'us-west-1',
buffer: { length: 3, timeout: 2 },
streamName: STREAM_NAME,
partitionKey: "bar"
});
var largerThan1MB = JSON.stringify({ x : _.repeat('*', 1024*1024) });
var shorterThan1MB = JSON.stringify({ x: 1 });
bk.on('error', function (err) {
// should receive only one record, the one that es larger than 1MB
assert.ok(err instanceof Error);
assert.equal(err.streamName, STREAM_NAME);
assert.ok(err.records);
assert.equal(err.records.length, 1);
assert.equal(err.records[0].Data, largerThan1MB);
setTimeout(validateRecordsAtKinesis, 1000);
});
// write 3 records,
bk._write(shorterThan1MB, null, function (err) {
assert.ok(!err);
});
bk._write(largerThan1MB, null, function (err) {
assert.ok(!err);
});
bk._write(shorterThan1MB, null, function (err) {
assert.ok(!err);
});
function validateRecordsAtKinesis() {
// two records shorter than 1MB must be stored at Kinesis
kinesis.getRecords({
ShardIterator: iterator,
Limit: 10
}, function (err, result) {
bk.stop();
if (err) return done(err);
assert.equal(result.Records.length, 2);
assert.equal(result.Records[0].Data, shorterThan1MB);
assert.equal(result.Records[1].Data, shorterThan1MB);
done();
});
}
});
});
});
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc