fluent-logger
Advanced tools
Comparing version 2.4.2 to 2.4.3
@@ -27,2 +27,3 @@ 'use strict'; | ||
this._timeResolution = options.milliseconds ? 1 : 1000; | ||
this._socket = null; | ||
this._sendQueue = []; // queue for items waiting for being sent. | ||
@@ -74,4 +75,4 @@ this._eventEmitter = new EventEmitter(); | ||
this._sendQueue.push(item); | ||
this._connect((socket) => { | ||
this._flushSendQueue(socket); | ||
this._connect(() => { | ||
this._flushSendQueue(); | ||
}); | ||
@@ -105,2 +106,6 @@ }; | ||
FluentSender.prototype._close = function() { | ||
if (this._socket) { | ||
this._socket.end(); | ||
this._socket = null; | ||
} | ||
}; | ||
@@ -141,26 +146,48 @@ | ||
FluentSender.prototype._connect = function(callback){ | ||
var socket = new net.Socket(); | ||
socket.setTimeout(this.timeout); | ||
socket.on('error', (err) => { | ||
this._handleEvent('error', err); | ||
}); | ||
socket.on('connect', () => { | ||
this._handleEvent('connect'); | ||
}); | ||
if (this.path) { | ||
socket.connect(this.path, () => callback(socket)); | ||
if (this._socket === null) { | ||
this._socket = new net.Socket(); | ||
this._socket.setTimeout(this.timeout); | ||
this._socket.on('error', (err) => { | ||
if (this._socket) { | ||
this._socket.destroy(); | ||
this._socket = null; | ||
this._handleEvent('error', err); | ||
} | ||
}); | ||
this._socket.on('connect', () => { | ||
this._handleEvent('connect'); | ||
}); | ||
if (this.path) { | ||
this._socket.connect(this.path, callback); | ||
} else { | ||
this._socket.connect(this.port, this.host, callback); | ||
} | ||
} else { | ||
socket.connect(this.port, this.host, () => callback(socket)); | ||
if (!this._socket.writable) { | ||
this._socket.destroy(); | ||
this._socket = null; | ||
process.nextTick(() => { | ||
this._connect(callback); | ||
}); | ||
} else { | ||
process.nextTick(callback); | ||
} | ||
} | ||
}; | ||
FluentSender.prototype._flushSendQueue = function(socket) { | ||
if (this._sendQueue.length === 0) | ||
FluentSender.prototype._flushSendQueue = function() { | ||
if (this._flushingSendQueue) | ||
return; | ||
this._flushingSendQueue = true; | ||
process.nextTick(() => { | ||
if (socket.writable && socket.readable) { | ||
this._doFlushSendQueue(socket); | ||
if (!this._socket) { | ||
this._flushingSendQueue = false; | ||
return; | ||
} | ||
if (this._socket.writable) { | ||
this._doFlushSendQueue(); | ||
} else { | ||
process.nextTick(arguments.callee); | ||
process.nextTick(waitToWrite); | ||
} | ||
@@ -170,17 +197,12 @@ }); | ||
FluentSender.prototype._doFlushSendQueue = function(socket) { | ||
FluentSender.prototype._doFlushSendQueue = function() { | ||
var item = this._sendQueue.shift(); | ||
var timeoutId = null; | ||
if (item === undefined) { | ||
socket && socket.destroy(); | ||
socket = null; | ||
this._flushingSendQueue = false; | ||
// nothing written; | ||
} else { | ||
if (socket === null) { | ||
this._sendQueue.unshift(item); | ||
this._connect((socket) => this._doFlushSendQueue(socket)); | ||
return; | ||
} | ||
socket.write(new Buffer(item.packet), () => { | ||
this._socket.write(new Buffer(item.packet), () => { | ||
if (this.requireAckResponse) { | ||
socket.once('data', (data) => { | ||
this._socket.once('data', (data) => { | ||
timeoutId && clearTimeout(timeoutId); | ||
@@ -194,6 +216,4 @@ var response = msgpack.decode(data, { codec: codec }); | ||
item.callback && item.callback(); | ||
socket && socket.destroy(); | ||
socket = null; | ||
process.nextTick(() => { | ||
this._doFlushSendQueue(socket); | ||
this._doFlushSendQueue(); // if socket is still available | ||
}); | ||
@@ -207,6 +227,4 @@ }); | ||
item.callback && item.callback(); | ||
socket && socket.destroy(); | ||
socket = null; | ||
process.nextTick(() => { | ||
this._doFlushSendQueue(socket); | ||
this._doFlushSendQueue(); // if socket is still available | ||
}); | ||
@@ -231,2 +249,3 @@ } | ||
this.on('error', (error) => { | ||
this._flushingSendQueue = false; | ||
this.internalLogger.error('Fluentd error', error); | ||
@@ -236,5 +255,4 @@ this.internalLogger.info('Fluentd will reconnect after ' + this.reconnectInterval / 1000 + ' seconds'); | ||
this.internalLogger.info('Fluentd is reconnecting...'); | ||
this._connect((socket) => { | ||
this._connect(() => { | ||
this.internalLogger.info('Fluentd reconnection finished!!'); | ||
this._flushSendQueue(socket); | ||
}); | ||
@@ -241,0 +259,0 @@ }, this.reconnectInterval); |
{ | ||
"name": "fluent-logger", | ||
"version": "2.4.2", | ||
"version": "2.4.3", | ||
"main": "./lib/index.js", | ||
@@ -5,0 +5,0 @@ "scripts": { |
@@ -522,2 +522,21 @@ var expect = require('chai').expect; | ||
// Internal behavior test. | ||
it('should not flush queue if existing connection is unavailable.', function(done){ | ||
runServer({}, function(server, finish){ | ||
var s = new sender.FluentSender('debug', {port: server.port}); | ||
s.emit('1st record', { message: '1st data' }, function(){ | ||
s._socket.destroy(); | ||
s.emit('2nd record', { message: '2nd data' }, function(){ | ||
finish(function(data){ | ||
expect(data[0].tag).to.be.equal("debug.1st record"); | ||
expect(data[0].data.message).to.be.equal("1st data"); | ||
expect(data[1].tag).to.be.equal("debug.2nd record"); | ||
expect(data[1].data.message).to.be.equal("2nd data"); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
}); | ||
}); | ||
it('should write stream.', function(done) { | ||
@@ -524,0 +543,0 @@ runServer({}, function(server, finish) { |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
71730
1256