Socket
Socket
Sign inDemoInstall

fluent-logger

Package Overview
Dependencies
Maintainers
4
Versions
46
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

fluent-logger - npm Package Compare versions

Comparing version 2.4.2 to 2.4.3

90

lib/sender.js

@@ -27,2 +27,3 @@ 'use strict';

this._timeResolution = options.milliseconds ? 1 : 1000;
this._socket = null;
this._sendQueue = []; // queue for items waiting for being sent.

@@ -74,4 +75,4 @@ this._eventEmitter = new EventEmitter();

this._sendQueue.push(item);
this._connect((socket) => {
this._flushSendQueue(socket);
this._connect(() => {
this._flushSendQueue();
});

@@ -105,2 +106,6 @@ };

FluentSender.prototype._close = function() {
if (this._socket) {
this._socket.end();
this._socket = null;
}
};

@@ -141,26 +146,48 @@

FluentSender.prototype._connect = function(callback){
var socket = new net.Socket();
socket.setTimeout(this.timeout);
socket.on('error', (err) => {
this._handleEvent('error', err);
});
socket.on('connect', () => {
this._handleEvent('connect');
});
if (this.path) {
socket.connect(this.path, () => callback(socket));
if (this._socket === null) {
this._socket = new net.Socket();
this._socket.setTimeout(this.timeout);
this._socket.on('error', (err) => {
if (this._socket) {
this._socket.destroy();
this._socket = null;
this._handleEvent('error', err);
}
});
this._socket.on('connect', () => {
this._handleEvent('connect');
});
if (this.path) {
this._socket.connect(this.path, callback);
} else {
this._socket.connect(this.port, this.host, callback);
}
} else {
socket.connect(this.port, this.host, () => callback(socket));
if (!this._socket.writable) {
this._socket.destroy();
this._socket = null;
process.nextTick(() => {
this._connect(callback);
});
} else {
process.nextTick(callback);
}
}
};
FluentSender.prototype._flushSendQueue = function(socket) {
if (this._sendQueue.length === 0)
FluentSender.prototype._flushSendQueue = function() {
if (this._flushingSendQueue)
return;
this._flushingSendQueue = true;
process.nextTick(() => {
if (socket.writable && socket.readable) {
this._doFlushSendQueue(socket);
if (!this._socket) {
this._flushingSendQueue = false;
return;
}
if (this._socket.writable) {
this._doFlushSendQueue();
} else {
process.nextTick(arguments.callee);
process.nextTick(waitToWrite);
}

@@ -170,17 +197,12 @@ });

FluentSender.prototype._doFlushSendQueue = function(socket) {
FluentSender.prototype._doFlushSendQueue = function() {
var item = this._sendQueue.shift();
var timeoutId = null;
if (item === undefined) {
socket && socket.destroy();
socket = null;
this._flushingSendQueue = false;
// nothing written;
} else {
if (socket === null) {
this._sendQueue.unshift(item);
this._connect((socket) => this._doFlushSendQueue(socket));
return;
}
socket.write(new Buffer(item.packet), () => {
this._socket.write(new Buffer(item.packet), () => {
if (this.requireAckResponse) {
socket.once('data', (data) => {
this._socket.once('data', (data) => {
timeoutId && clearTimeout(timeoutId);

@@ -194,6 +216,4 @@ var response = msgpack.decode(data, { codec: codec });

item.callback && item.callback();
socket && socket.destroy();
socket = null;
process.nextTick(() => {
this._doFlushSendQueue(socket);
this._doFlushSendQueue(); // if socket is still available
});

@@ -207,6 +227,4 @@ });

item.callback && item.callback();
socket && socket.destroy();
socket = null;
process.nextTick(() => {
this._doFlushSendQueue(socket);
this._doFlushSendQueue(); // if socket is still available
});

@@ -231,2 +249,3 @@ }

this.on('error', (error) => {
this._flushingSendQueue = false;
this.internalLogger.error('Fluentd error', error);

@@ -236,5 +255,4 @@ this.internalLogger.info('Fluentd will reconnect after ' + this.reconnectInterval / 1000 + ' seconds');

this.internalLogger.info('Fluentd is reconnecting...');
this._connect((socket) => {
this._connect(() => {
this.internalLogger.info('Fluentd reconnection finished!!');
this._flushSendQueue(socket);
});

@@ -241,0 +259,0 @@ }, this.reconnectInterval);

2

package.json
{
"name": "fluent-logger",
"version": "2.4.2",
"version": "2.4.3",
"main": "./lib/index.js",

@@ -5,0 +5,0 @@ "scripts": {

@@ -522,2 +522,21 @@ var expect = require('chai').expect;

// Internal behavior test.
it('should not flush queue if existing connection is unavailable.', function(done){
runServer({}, function(server, finish){
var s = new sender.FluentSender('debug', {port: server.port});
s.emit('1st record', { message: '1st data' }, function(){
s._socket.destroy();
s.emit('2nd record', { message: '2nd data' }, function(){
finish(function(data){
expect(data[0].tag).to.be.equal("debug.1st record");
expect(data[0].data.message).to.be.equal("1st data");
expect(data[1].tag).to.be.equal("debug.2nd record");
expect(data[1].data.message).to.be.equal("2nd data");
done();
});
});
});
});
});
it('should write stream.', function(done) {

@@ -524,0 +543,0 @@ runServer({}, function(server, finish) {

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc