New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

aws-kinesis-writable

Package Overview
Dependencies
Maintainers
3
Versions
31
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

aws-kinesis-writable - npm Package Compare versions

Comparing version 1.3.1 to 1.4.0

95

index.js
var stream = require('stream');
var util = require('util');
var AWS = require('aws-sdk');
var immediate = require('immediate-invocation');
var _ = require('lodash');

@@ -26,3 +25,3 @@

function KinesisStream (params) {
stream.Writable.call(this);
stream.Writable.call(this, { objectMode: params.objectMode });

@@ -35,3 +34,3 @@ var defaultBuffer = {

this._params = _.defaultsDeep(params || {}, {
streamName: null,
streamName: null,
buffer: defaultBuffer,

@@ -61,7 +60,11 @@ partitionKey: function() { // by default the partition key will generate

this._queue = [];
this._queueWait = setTimeout(this._sendEntries.bind(this), this._params.buffer.timeout * 1000);
this._params.buffer.isPrioritaryMsg = this._params.buffer.isPrioritaryMsg || _.noop;
this._queueWait = this._queueSendEntries();
this._params.buffer.isPrioritaryMsg = this._params.buffer.isPrioritaryMsg;
}
this._kinesis = new AWS.Kinesis(_.pick(params, ['accessKeyId', 'secretAccessKey', 'region']));
this._kinesis = new AWS.Kinesis(_.pick(params, [
'accessKeyId',
'secretAccessKey',
'region',
'httpOptions']));
}

@@ -71,2 +74,9 @@

KinesisStream.prototype._queueSendEntries = function () {
var self = this;
return setTimeout(function(){
self._sendEntries();
}, this._params.buffer.timeout * 1000);
};
KinesisStream.prototype.setStreamName = function (streamName) {

@@ -88,13 +98,25 @@

* @param {@} msg - entry
* @return {Object} { Data, PartitionKey }
* @return {Record} { Data, PartitionKey, StreamName }
*/
KinesisStream.prototype._mapEntry = function (msg) {
return {
Data: msg,
PartitionKey: this._params.partitionKey(msg)
};
KinesisStream.prototype._mapEntry = function (msg, buffer) {
if (buffer){
return new BufferedRecord(msg, this._params.partitionKey(msg));
} else {
return new NonBufferedRecord(msg, this._params.partitionKey(msg), this._params.streamName);
}
};
var BufferedRecord = function(data, pk){
this.Data = data;
this.PartitionKey = pk;
};
var NonBufferedRecord = function(data, pk, streamName){
this.Data = data;
this.PartitionKey = pk;
this.StreamName = streamName;
};
KinesisStream.prototype._sendEntries = function () {
const pending_records = _.clone(this._queue);
const pending_records = this._queue;
const self = this;

@@ -104,3 +126,3 @@ this._queue = [];

if (pending_records.length === 0) {
self._queueWait = setTimeout(self._sendEntries.bind(self), self._params.buffer.timeout * 1000);
this._queueWait = this._queueSendEntries();
return;

@@ -110,3 +132,3 @@ }

if (!self._params.streamName) {
self.emit('error', new Error('Stream\'s name was not set.'));
this.emit('error', new Error('Stream\'s name was not set.'));
}

@@ -119,4 +141,4 @@

self._kinesis.putRecords(requestContent, function (err, result) {
self._queueWait = setTimeout(self._sendEntries.bind(self), self._params.buffer.timeout * 1000);
this._kinesis.putRecords(requestContent, function (err, result) {
self._queueWait = self._queueSendEntries();
if (err) {

@@ -138,19 +160,34 @@ return self.emit('error', err);

KinesisStream.prototype._write = function (chunk, encoding, done) {
var self = this;
if (!this._params.streamName) {
return immediate (done, new Error('Stream\'s name was not set.'));
return setImmediate (done, new Error('Stream\'s name was not set.'));
}
var isPrioMessage = this._params.buffer.isPrioritaryMsg;
try {
var msg = chunk.toString();
var record = self._mapEntry(msg);
var obj, msg;
if (Buffer.isBuffer(chunk)) {
msg = chunk;
if (isPrioMessage){
obj = JSON.parse(chunk.toString(encoding));
}
} else if (typeof chunk === 'string') {
msg = chunk;
if (isPrioMessage){
obj = JSON.parse(chunk);
}
} else {
obj = chunk;
msg = JSON.stringify(obj);
}
var record = this._mapEntry(msg, !!this._params.buffer);
if (this._params.buffer) {
this._queue.push(record);
// sends buffer when msg is prioritary
// sends buffer when current chunk is for prioritary entry
var shouldSendEntries = this._queue.length >= this._params.buffer.length || // queue reached max size
this._params.buffer.isPrioritaryMsg(msg); // msg is prioritary
(isPrioMessage && isPrioMessage(obj)); // msg is prioritary
if (shouldSendEntries) {

@@ -160,12 +197,10 @@ clearTimeout(this._queueWait);

}
return immediate(done);
return setImmediate(done);
}
self._kinesis.putRecord(_.extend({
StreamName: self._params.streamName
}, record), function (err) {
this._kinesis.putRecord(record, function (err) {
done(err);
});
} catch (e) {
immediate(done, e);
setImmediate(done, e);
}

@@ -172,0 +207,0 @@ };

{
"name": "aws-kinesis-writable",
"description": "A bunyan stream for kinesis.",
"version": "1.3.1",
"version": "1.4.0",
"author": "José F. Romaniello <jfromaniello@gmail.com> (http://joseoncode.com)",

@@ -15,3 +15,2 @@ "repository": {

"aws-sdk": "~2.1.43",
"immediate-invocation": "^1.0.0",
"lodash": "~3.10.1"

@@ -18,0 +17,0 @@ },

@@ -14,4 +14,3 @@ const KinesisStream = require('../');

function isPrioritaryMsg(msg) {
var entry = JSON.parse(msg);
function isPrioritaryMsg(entry) {
return entry.level >= 40;

@@ -40,3 +39,3 @@ }

length: 10,
isPrioritaryMsg: _.noop
isPrioritaryMsg: undefined
};

@@ -51,4 +50,4 @@

var instance = new KinesisStream({ streamName: STREAM_NAME });
assert.ok(instance._params.buffer);
assert.deepEqual(instance._params.buffer, defaultBuffer);
assert.ok(instance._params.buffer);
assert.deepEqual(instance._params.buffer, defaultBuffer);
assert.equal(typeof instance._params.partitionKey, 'function');

@@ -59,4 +58,4 @@ });

var instance = new KinesisStream({ streamName: STREAM_NAME, buffer: true });
assert.ok(instance._params.buffer);
assert.deepEqual(instance._params.buffer, defaultBuffer);
assert.ok(instance._params.buffer);
assert.deepEqual(instance._params.buffer, defaultBuffer);
});

@@ -66,3 +65,3 @@

var instance = new KinesisStream({ streamName: STREAM_NAME, buffer: false });
assert.ok(!instance._params.buffer);
assert.ok(!instance._params.buffer);
});

@@ -92,3 +91,2 @@

describe ('method setStreamName', function () {
[

@@ -222,2 +220,27 @@ null,

it('should support object events', function (done) {
var x = 1;
var bk = new KinesisStream({
region: 'us-west-1',
streamName: STREAM_NAME,
partitionKey: 'test-123',
buffer: { timeout: x }
});
var log_entry = {foo: 'bar'};
bk._write(log_entry, null, _.noop);
setTimeout(function () {
kinesis.getRecords({
ShardIterator: iterator,
Limit: 1
}, function (err, data) {
bk.stop();
if (err) return done(err);
assert.equal(data.Records.length, 1);
done();
});
}, x * 1000 + 100);
});
it('should send the events after X messages', function (done) {

@@ -250,4 +273,3 @@ var x = 3;

it('should send the events after a error level entry', function (done) {
it('should send the events after an error level entry', function (done) {
var bk = new KinesisStream({

@@ -254,0 +276,0 @@ region: 'us-west-1',

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc