Socket
Socket
Sign inDemoInstall

fluent-logger

Package Overview
Dependencies
Maintainers
4
Versions
46
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

fluent-logger - npm Package Compare versions

Comparing version 2.4.1 to 2.4.2

217

lib/sender.js

@@ -27,5 +27,3 @@ 'use strict';

this._timeResolution = options.milliseconds ? 1 : 1000;
this._socket = null;
this._data = null;
this._sendQueue = []; // queue for items waiting for being sent.
this._sendQueue = []; // queue for items waiting for being sent.
this._eventEmitter = new EventEmitter();

@@ -38,3 +36,3 @@ }

// Label must be string always
if (typeof args[0] === "string") label = args.shift();
if (typeof args[0] === 'string') label = args.shift();

@@ -45,11 +43,9 @@ // Data can be almost anything

// Date can be either timestamp number or Date object
if (typeof args[0] !== "function") timestamp = args.shift();
if (typeof args[0] !== 'function') timestamp = args.shift();
// Last argument is an optional callback
if (typeof args[0] === "function") callback = args.shift();
if (typeof args[0] === 'function') callback = args.shift();
var item = this._makePacketItem(label, data, timestamp);
var self = this;
var item = self._makePacketItem(label, data, timestamp);
var error;

@@ -59,12 +55,12 @@ var options;

options = {
tag_prefix: self.tag_prefix,
tag_prefix: this.tag_prefix,
label: label
};
error = new FluentLoggerError.MissingTag('tag is missing', options);
self._handleEvent('error', error, callback);
this._handleEvent('error', error, callback);
return;
}
if (typeof item.data !== "object") {
if (typeof item.data !== 'object') {
options = {
tag_prefix: self.tag_prefix,
tag_prefix: this.tag_prefix,
label: label,

@@ -74,3 +70,3 @@ record: item.data

error = new FluentLoggerError.DataTypeError('data must be an object', options);
self._handleEvent('error', error, callback);
this._handleEvent('error', error, callback);
return;

@@ -81,5 +77,5 @@ }

self._sendQueue.push(item);
self._connect(function(){
self._flushSendQueue();
this._sendQueue.push(item);
this._connect((socket) => {
this._flushSendQueue(socket);
});

@@ -95,8 +91,7 @@ };

FluentSender.prototype.end = function(label, data, callback){
var self = this;
if ((label != null && data != null)) {
self.emit(label, data, function(err) {
self._close();
this.emit(label, data, (err) => {
this._close();
if (err) {
self._handleEvent('error', err, callback);
this._handleEvent('error', err, callback);
} else {

@@ -107,4 +102,4 @@ callback && callback();

} else {
process.nextTick(function() {
self._close();
process.nextTick(() => {
this._close();
callback && callback();

@@ -116,6 +111,2 @@ });

FluentSender.prototype._close = function() {
if (this._socket) {
this._socket.end();
this._socket = null;
}
};

@@ -125,8 +116,7 @@

FluentSender.prototype._makePacketItem = function(label, data, time){
var self = this;
var tag = null;
if (self.tag_prefix && label) {
tag = [self.tag_prefix, label].join('.');
} else if (self.tag_prefix) {
tag = self.tag_prefix;
if (this.tag_prefix && label) {
tag = [this.tag_prefix, label].join('.');
} else if (this.tag_prefix) {
tag = this.tag_prefix;
} else if (label) {

@@ -136,3 +126,3 @@ tag = label;

if (typeof time !== "number" && !(time instanceof EventTime)) {
if (typeof time !== 'number' && !(time instanceof EventTime)) {
time = Math.floor((time ? time.getTime() : Date.now()) / this._timeResolution);

@@ -143,3 +133,3 @@ }

var options = {};
if (self.requireAckResponse) {
if (this.requireAckResponse) {
options = {

@@ -160,55 +150,26 @@ chunk: crypto.randomBytes(16).toString('base64')

FluentSender.prototype._connect = function(callback){
var self = this;
if (self._socket === null) {
self._socket = new net.Socket();
self._socket.setTimeout(self.timeout);
self._socket.on('error', function(err) {
if (self._socket) {
self._socket.destroy();
self._socket = null;
self._handleEvent('error', err);
}
});
self._socket.on('connect', function() {
self._handleEvent('connect');
});
self._socket.on('data', function(data) {
self._data = data;
});
if (self.path) {
self._socket.connect(self.path, callback);
} else {
self._socket.connect(self.port, self.host, callback);
}
var socket = new net.Socket();
socket.setTimeout(this.timeout);
socket.on('error', (err) => {
this._handleEvent('error', err);
});
socket.on('connect', () => {
this._handleEvent('connect');
});
if (this.path) {
socket.connect(this.path, () => callback(socket));
} else {
if (!self._socket.writable) {
self._socket.destroy();
self._socket = null;
process.nextTick(function() {
self._connect(callback);
});
} else {
process.nextTick(function() {
callback();
});
}
socket.connect(this.port, this.host, () => callback(socket));
}
};
FluentSender.prototype._flushSendQueue = function() {
var self = this;
if (self._flushingSendQueue)
FluentSender.prototype._flushSendQueue = function(socket) {
if (this._sendQueue.length === 0)
return;
self._flushingSendQueue = true;
process.nextTick(function waitToWrite() {
if (!self._socket) {
self._flushingSendQueue = false;
return;
}
if (self._socket.writable) {
self._doFlushSendQueue();
process.nextTick(() => {
if (socket.writable && socket.readable) {
this._doFlushSendQueue(socket);
} else {
process.nextTick(waitToWrite);
process.nextTick(arguments.callee);
}

@@ -218,37 +179,41 @@ });

FluentSender.prototype._doFlushSendQueue = function() {
var self = this;
var item = self._sendQueue.shift();
FluentSender.prototype._doFlushSendQueue = function(socket) {
var item = this._sendQueue.shift();
var timeoutId = null;
if (item === undefined) {
self._flushingSendQueue = false;
// nothing written;
socket && socket.destroy();
socket = null;
} else {
self._socket.write(new Buffer(item.packet), function(){
if (self.requireAckResponse) {
var intervalId = setInterval(function() {
if (self._data) {
var response = msgpack.decode(self._data, { codec: codec });
self._data = null;
clearInterval(intervalId);
clearTimeout(timeoutId);
if (response.ack !== item.options.chunk) {
var error = new FluentLoggerError.ResponseError('ack in response and chunk id in sent data are different',
{ ack: response.ack, chunk: item.options.chunk });
self._handleEvent('error', error, item.callback);
}
item.callback && item.callback();
process.nextTick(function() {
self._doFlushSendQueue(); // if socket is still available
});
if (socket === null) {
this._sendQueue.unshift(item);
this._connect((socket) => this._doFlushSendQueue(socket));
return;
}
socket.write(new Buffer(item.packet), () => {
if (this.requireAckResponse) {
socket.once('data', (data) => {
timeoutId && clearTimeout(timeoutId);
var response = msgpack.decode(data, { codec: codec });
if (response.ack !== item.options.chunk) {
var error = new FluentLoggerError.ResponseError('ack in response and chunk id in sent data are different',
{ ack: response.ack, chunk: item.options.chunk });
this._handleEvent('error', error, item.callback);
}
}, 10);
var timeoutId = setTimeout(function() {
item.callback && item.callback();
socket && socket.destroy();
socket = null;
process.nextTick(() => {
this._doFlushSendQueue(socket);
});
});
timeoutId = setTimeout(() => {
var error = new FluentLoggerError.ResponseTimeout('ack response timeout');
self._handleEvent('error', error, item.callback);
clearInterval(intervalId);
}, self.ackResponseTimeout);
this._handleEvent('error', error, item.callback);
}, this.ackResponseTimeout);
} else {
item.callback && item.callback();
process.nextTick(function() {
self._doFlushSendQueue(); // if socket is still available
socket && socket.destroy();
socket = null;
process.nextTick(() => {
this._doFlushSendQueue(socket);
});

@@ -262,6 +227,5 @@ }

FluentSender.prototype._handleEvent = function _handleEvent(signal, data, callback) {
var self = this;
callback && callback(data);
if (self._eventEmitter.listenerCount(signal) > 0) {
self._eventEmitter.emit(signal, data);
if (this._eventEmitter.listenerCount(signal) > 0) {
this._eventEmitter.emit(signal, data);
}

@@ -271,15 +235,15 @@ };

FluentSender.prototype._setupErrorHandler = function _setupErrorHandler() {
var self = this;
if (!self.reconnectInterval) {
if (!this.reconnectInterval) {
return;
}
self.on('error', function(error) {
self.internalLogger.error('Fluentd error', error);
self.internalLogger.info('Fluentd will reconnect after ' + self.reconnectInterval / 1000 + ' seconds');
setTimeout(function() {
self.internalLogger.info("Fluentd is reconnecting...");
self._connect(function() {
self.internalLogger.info('Fluentd reconnection finished!!');
this.on('error', (error) => {
this.internalLogger.error('Fluentd error', error);
this.internalLogger.info('Fluentd will reconnect after ' + this.reconnectInterval / 1000 + ' seconds');
setTimeout(() => {
this.internalLogger.info('Fluentd is reconnecting...');
this._connect((socket) => {
this.internalLogger.info('Fluentd reconnection finished!!');
this._flushSendQueue(socket);
});
}, self.reconnectInterval);
}, this.reconnectInterval);
});

@@ -299,8 +263,7 @@ };

var defaultEncoding = options.encoding || 'UTF-8';
var self = this;
var writable = new stream.Writable();
var dataString = '';
writable._write = function(chunk, encoding, callback) {
writable._write = (chunk, encoding, callback) => {
var dataArray = chunk.toString(defaultEncoding).split(/\n/);
function next() {
var next = () => {
if (dataArray.length) {

@@ -313,5 +276,5 @@ dataString += dataArray.shift();

}
self.emit(label, { message: dataString }, function(err) {
this.emit(label, { message: dataString }, (err) => {
if (err) {
self._handleEvent('error', err, callback);
this._handleEvent('error', err, callback);
return;

@@ -318,0 +281,0 @@ }

@@ -6,4 +6,3 @@ 'use strict';

function MockFluentdServer(options){
var self = this;
function MockFluentdServer(options) {
this._port = null;

@@ -13,11 +12,11 @@ this._options = options;

this._clients = {};
this._server = net.createServer(function(socket){
this._server = net.createServer((socket) => {
var clientKey = socket.remoteAddress + ":" + socket.remotePort;
self._clients[clientKey] = socket;
socket.on('end', function(){
delete(self._clients[clientKey]);
this._clients[clientKey] = socket;
socket.on('end', () => {
delete(this._clients[clientKey]);
});
var stream = msgpack.createDecodeStream();
socket.pipe(stream).on('data', function(m){
self._received.push({
socket.pipe(stream).on('data', (m) => {
this._received.push({
tag: m[0],

@@ -29,3 +28,3 @@ time: m[1],

var options = m[3];
if (self._options.requireAckResponse && options && options.chunk) {
if (this._options.requireAckResponse && options && options.chunk) {
var response = {

@@ -40,7 +39,7 @@ ack: options.chunk

MockFluentdServer.prototype.__defineGetter__('port', function(){
MockFluentdServer.prototype.__defineGetter__('port', function() {
return this._port;
});
MockFluentdServer.prototype.__defineGetter__('messages', function(){
MockFluentdServer.prototype.__defineGetter__('messages', function() {
return this._received;

@@ -51,5 +50,4 @@ });

MockFluentdServer.prototype.listen = function(callback){
var self = this;
this._server.listen(function(){
self._port = self._server.address().port;
this._server.listen(() => {
this._port = this._server.address().port;
callback();

@@ -59,33 +57,21 @@ });

MockFluentdServer.prototype.close = function(callback){
var self = this;
if( process.version.match(/^v0\.6\./) ){ // 0.6.x does not support callback for server.close();
this._server.close();
(function waitForClose(){
if( Object.keys(self._clients).length > 0 ){
setTimeout(waitForClose, 100);
}else{
callback();
}
})();
}else{
this._server.close(function(){
callback();
});
MockFluentdServer.prototype.close = function(callback) {
this._server.close(function() {
callback();
});
for (var i in this._clients) {
this._clients[i].end();
// this._clients[i].destroy();
}
for(var i in self._clients){
self._clients[i].end();
// self._clients[i].destroy();
}
};
module.exports = {
runServer: function(options, callback){
runServer: function(options, callback) {
var server = new MockFluentdServer(options);
server.listen(function(){
callback(server, function(_callback){
server.listen(function() {
callback(server, function(_callback) {
// wait 100 ms to receive all messages and then close
setTimeout(function(){
setTimeout(function() {
var messages = server.messages;
server.close(function(){
server.close(function() {
_callback && _callback(messages);

@@ -92,0 +78,0 @@ });

@@ -32,4 +32,3 @@ 'use strict';

fluentTransport.prototype.log = function(level, message, meta, callback) {
var self = this;
var sender = self.sender;
var sender = this.sender;

@@ -43,7 +42,7 @@ var data = {

sender.emit(data, function(error) {
sender.emit(data, (error) => {
if (error) {
self.emit('error', error);
this.emit('error', error);
} else {
self.emit('logged');
this.emit('logged');
}

@@ -50,0 +49,0 @@ });

{
"name": "fluent-logger",
"version": "2.4.1",
"version": "2.4.2",
"main": "./lib/index.js",

@@ -5,0 +5,0 @@ "scripts": {

@@ -522,21 +522,2 @@ var expect = require('chai').expect;

// Internal behavior test.
it('should not flush queue if existing connection is unavailable.', function(done){
runServer({}, function(server, finish){
var s = new sender.FluentSender('debug', {port: server.port});
s.emit('1st record', { message: '1st data' }, function(){
s._socket.destroy();
s.emit('2nd record', { message: '2nd data' }, function(){
finish(function(data){
expect(data[0].tag).to.be.equal("debug.1st record");
expect(data[0].data.message).to.be.equal("1st data");
expect(data[1].tag).to.be.equal("debug.2nd record");
expect(data[1].data.message).to.be.equal("2nd data");
done();
});
});
});
});
});
it('should write stream.', function(done) {

@@ -543,0 +524,0 @@ runServer({}, function(server, finish) {

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc