fluent-logger
Advanced tools
Comparing version 2.4.1 to 2.4.2
@@ -27,5 +27,3 @@ 'use strict'; | ||
this._timeResolution = options.milliseconds ? 1 : 1000; | ||
this._socket = null; | ||
this._data = null; | ||
this._sendQueue = []; // queue for items waiting for being sent. | ||
this._sendQueue = []; // queue for items waiting for being sent. | ||
this._eventEmitter = new EventEmitter(); | ||
@@ -38,3 +36,3 @@ } | ||
// Label must be string always | ||
if (typeof args[0] === "string") label = args.shift(); | ||
if (typeof args[0] === 'string') label = args.shift(); | ||
@@ -45,11 +43,9 @@ // Data can be almost anything | ||
// Date can be either timestamp number or Date object | ||
if (typeof args[0] !== "function") timestamp = args.shift(); | ||
if (typeof args[0] !== 'function') timestamp = args.shift(); | ||
// Last argument is an optional callback | ||
if (typeof args[0] === "function") callback = args.shift(); | ||
if (typeof args[0] === 'function') callback = args.shift(); | ||
var item = this._makePacketItem(label, data, timestamp); | ||
var self = this; | ||
var item = self._makePacketItem(label, data, timestamp); | ||
var error; | ||
@@ -59,12 +55,12 @@ var options; | ||
options = { | ||
tag_prefix: self.tag_prefix, | ||
tag_prefix: this.tag_prefix, | ||
label: label | ||
}; | ||
error = new FluentLoggerError.MissingTag('tag is missing', options); | ||
self._handleEvent('error', error, callback); | ||
this._handleEvent('error', error, callback); | ||
return; | ||
} | ||
if (typeof item.data !== "object") { | ||
if (typeof item.data !== 'object') { | ||
options = { | ||
tag_prefix: self.tag_prefix, | ||
tag_prefix: this.tag_prefix, | ||
label: label, | ||
@@ -74,3 +70,3 @@ record: item.data | ||
error = new FluentLoggerError.DataTypeError('data must be an object', options); | ||
self._handleEvent('error', error, callback); | ||
this._handleEvent('error', error, callback); | ||
return; | ||
@@ -81,5 +77,5 @@ } | ||
self._sendQueue.push(item); | ||
self._connect(function(){ | ||
self._flushSendQueue(); | ||
this._sendQueue.push(item); | ||
this._connect((socket) => { | ||
this._flushSendQueue(socket); | ||
}); | ||
@@ -95,8 +91,7 @@ }; | ||
FluentSender.prototype.end = function(label, data, callback){ | ||
var self = this; | ||
if ((label != null && data != null)) { | ||
self.emit(label, data, function(err) { | ||
self._close(); | ||
this.emit(label, data, (err) => { | ||
this._close(); | ||
if (err) { | ||
self._handleEvent('error', err, callback); | ||
this._handleEvent('error', err, callback); | ||
} else { | ||
@@ -107,4 +102,4 @@ callback && callback(); | ||
} else { | ||
process.nextTick(function() { | ||
self._close(); | ||
process.nextTick(() => { | ||
this._close(); | ||
callback && callback(); | ||
@@ -116,6 +111,2 @@ }); | ||
FluentSender.prototype._close = function() { | ||
if (this._socket) { | ||
this._socket.end(); | ||
this._socket = null; | ||
} | ||
}; | ||
@@ -125,8 +116,7 @@ | ||
FluentSender.prototype._makePacketItem = function(label, data, time){ | ||
var self = this; | ||
var tag = null; | ||
if (self.tag_prefix && label) { | ||
tag = [self.tag_prefix, label].join('.'); | ||
} else if (self.tag_prefix) { | ||
tag = self.tag_prefix; | ||
if (this.tag_prefix && label) { | ||
tag = [this.tag_prefix, label].join('.'); | ||
} else if (this.tag_prefix) { | ||
tag = this.tag_prefix; | ||
} else if (label) { | ||
@@ -136,3 +126,3 @@ tag = label; | ||
if (typeof time !== "number" && !(time instanceof EventTime)) { | ||
if (typeof time !== 'number' && !(time instanceof EventTime)) { | ||
time = Math.floor((time ? time.getTime() : Date.now()) / this._timeResolution); | ||
@@ -143,3 +133,3 @@ } | ||
var options = {}; | ||
if (self.requireAckResponse) { | ||
if (this.requireAckResponse) { | ||
options = { | ||
@@ -160,55 +150,26 @@ chunk: crypto.randomBytes(16).toString('base64') | ||
FluentSender.prototype._connect = function(callback){ | ||
var self = this; | ||
if (self._socket === null) { | ||
self._socket = new net.Socket(); | ||
self._socket.setTimeout(self.timeout); | ||
self._socket.on('error', function(err) { | ||
if (self._socket) { | ||
self._socket.destroy(); | ||
self._socket = null; | ||
self._handleEvent('error', err); | ||
} | ||
}); | ||
self._socket.on('connect', function() { | ||
self._handleEvent('connect'); | ||
}); | ||
self._socket.on('data', function(data) { | ||
self._data = data; | ||
}); | ||
if (self.path) { | ||
self._socket.connect(self.path, callback); | ||
} else { | ||
self._socket.connect(self.port, self.host, callback); | ||
} | ||
var socket = new net.Socket(); | ||
socket.setTimeout(this.timeout); | ||
socket.on('error', (err) => { | ||
this._handleEvent('error', err); | ||
}); | ||
socket.on('connect', () => { | ||
this._handleEvent('connect'); | ||
}); | ||
if (this.path) { | ||
socket.connect(this.path, () => callback(socket)); | ||
} else { | ||
if (!self._socket.writable) { | ||
self._socket.destroy(); | ||
self._socket = null; | ||
process.nextTick(function() { | ||
self._connect(callback); | ||
}); | ||
} else { | ||
process.nextTick(function() { | ||
callback(); | ||
}); | ||
} | ||
socket.connect(this.port, this.host, () => callback(socket)); | ||
} | ||
}; | ||
FluentSender.prototype._flushSendQueue = function() { | ||
var self = this; | ||
if (self._flushingSendQueue) | ||
FluentSender.prototype._flushSendQueue = function(socket) { | ||
if (this._sendQueue.length === 0) | ||
return; | ||
self._flushingSendQueue = true; | ||
process.nextTick(function waitToWrite() { | ||
if (!self._socket) { | ||
self._flushingSendQueue = false; | ||
return; | ||
} | ||
if (self._socket.writable) { | ||
self._doFlushSendQueue(); | ||
process.nextTick(() => { | ||
if (socket.writable && socket.readable) { | ||
this._doFlushSendQueue(socket); | ||
} else { | ||
process.nextTick(waitToWrite); | ||
process.nextTick(arguments.callee); | ||
} | ||
@@ -218,37 +179,41 @@ }); | ||
FluentSender.prototype._doFlushSendQueue = function() { | ||
var self = this; | ||
var item = self._sendQueue.shift(); | ||
FluentSender.prototype._doFlushSendQueue = function(socket) { | ||
var item = this._sendQueue.shift(); | ||
var timeoutId = null; | ||
if (item === undefined) { | ||
self._flushingSendQueue = false; | ||
// nothing written; | ||
socket && socket.destroy(); | ||
socket = null; | ||
} else { | ||
self._socket.write(new Buffer(item.packet), function(){ | ||
if (self.requireAckResponse) { | ||
var intervalId = setInterval(function() { | ||
if (self._data) { | ||
var response = msgpack.decode(self._data, { codec: codec }); | ||
self._data = null; | ||
clearInterval(intervalId); | ||
clearTimeout(timeoutId); | ||
if (response.ack !== item.options.chunk) { | ||
var error = new FluentLoggerError.ResponseError('ack in response and chunk id in sent data are different', | ||
{ ack: response.ack, chunk: item.options.chunk }); | ||
self._handleEvent('error', error, item.callback); | ||
} | ||
item.callback && item.callback(); | ||
process.nextTick(function() { | ||
self._doFlushSendQueue(); // if socket is still available | ||
}); | ||
if (socket === null) { | ||
this._sendQueue.unshift(item); | ||
this._connect((socket) => this._doFlushSendQueue(socket)); | ||
return; | ||
} | ||
socket.write(new Buffer(item.packet), () => { | ||
if (this.requireAckResponse) { | ||
socket.once('data', (data) => { | ||
timeoutId && clearTimeout(timeoutId); | ||
var response = msgpack.decode(data, { codec: codec }); | ||
if (response.ack !== item.options.chunk) { | ||
var error = new FluentLoggerError.ResponseError('ack in response and chunk id in sent data are different', | ||
{ ack: response.ack, chunk: item.options.chunk }); | ||
this._handleEvent('error', error, item.callback); | ||
} | ||
}, 10); | ||
var timeoutId = setTimeout(function() { | ||
item.callback && item.callback(); | ||
socket && socket.destroy(); | ||
socket = null; | ||
process.nextTick(() => { | ||
this._doFlushSendQueue(socket); | ||
}); | ||
}); | ||
timeoutId = setTimeout(() => { | ||
var error = new FluentLoggerError.ResponseTimeout('ack response timeout'); | ||
self._handleEvent('error', error, item.callback); | ||
clearInterval(intervalId); | ||
}, self.ackResponseTimeout); | ||
this._handleEvent('error', error, item.callback); | ||
}, this.ackResponseTimeout); | ||
} else { | ||
item.callback && item.callback(); | ||
process.nextTick(function() { | ||
self._doFlushSendQueue(); // if socket is still available | ||
socket && socket.destroy(); | ||
socket = null; | ||
process.nextTick(() => { | ||
this._doFlushSendQueue(socket); | ||
}); | ||
@@ -262,6 +227,5 @@ } | ||
FluentSender.prototype._handleEvent = function _handleEvent(signal, data, callback) { | ||
var self = this; | ||
callback && callback(data); | ||
if (self._eventEmitter.listenerCount(signal) > 0) { | ||
self._eventEmitter.emit(signal, data); | ||
if (this._eventEmitter.listenerCount(signal) > 0) { | ||
this._eventEmitter.emit(signal, data); | ||
} | ||
@@ -271,15 +235,15 @@ }; | ||
FluentSender.prototype._setupErrorHandler = function _setupErrorHandler() { | ||
var self = this; | ||
if (!self.reconnectInterval) { | ||
if (!this.reconnectInterval) { | ||
return; | ||
} | ||
self.on('error', function(error) { | ||
self.internalLogger.error('Fluentd error', error); | ||
self.internalLogger.info('Fluentd will reconnect after ' + self.reconnectInterval / 1000 + ' seconds'); | ||
setTimeout(function() { | ||
self.internalLogger.info("Fluentd is reconnecting..."); | ||
self._connect(function() { | ||
self.internalLogger.info('Fluentd reconnection finished!!'); | ||
this.on('error', (error) => { | ||
this.internalLogger.error('Fluentd error', error); | ||
this.internalLogger.info('Fluentd will reconnect after ' + this.reconnectInterval / 1000 + ' seconds'); | ||
setTimeout(() => { | ||
this.internalLogger.info('Fluentd is reconnecting...'); | ||
this._connect((socket) => { | ||
this.internalLogger.info('Fluentd reconnection finished!!'); | ||
this._flushSendQueue(socket); | ||
}); | ||
}, self.reconnectInterval); | ||
}, this.reconnectInterval); | ||
}); | ||
@@ -299,8 +263,7 @@ }; | ||
var defaultEncoding = options.encoding || 'UTF-8'; | ||
var self = this; | ||
var writable = new stream.Writable(); | ||
var dataString = ''; | ||
writable._write = function(chunk, encoding, callback) { | ||
writable._write = (chunk, encoding, callback) => { | ||
var dataArray = chunk.toString(defaultEncoding).split(/\n/); | ||
function next() { | ||
var next = () => { | ||
if (dataArray.length) { | ||
@@ -313,5 +276,5 @@ dataString += dataArray.shift(); | ||
} | ||
self.emit(label, { message: dataString }, function(err) { | ||
this.emit(label, { message: dataString }, (err) => { | ||
if (err) { | ||
self._handleEvent('error', err, callback); | ||
this._handleEvent('error', err, callback); | ||
return; | ||
@@ -318,0 +281,0 @@ } |
@@ -6,4 +6,3 @@ 'use strict'; | ||
function MockFluentdServer(options){ | ||
var self = this; | ||
function MockFluentdServer(options) { | ||
this._port = null; | ||
@@ -13,11 +12,11 @@ this._options = options; | ||
this._clients = {}; | ||
this._server = net.createServer(function(socket){ | ||
this._server = net.createServer((socket) => { | ||
var clientKey = socket.remoteAddress + ":" + socket.remotePort; | ||
self._clients[clientKey] = socket; | ||
socket.on('end', function(){ | ||
delete(self._clients[clientKey]); | ||
this._clients[clientKey] = socket; | ||
socket.on('end', () => { | ||
delete(this._clients[clientKey]); | ||
}); | ||
var stream = msgpack.createDecodeStream(); | ||
socket.pipe(stream).on('data', function(m){ | ||
self._received.push({ | ||
socket.pipe(stream).on('data', (m) => { | ||
this._received.push({ | ||
tag: m[0], | ||
@@ -29,3 +28,3 @@ time: m[1], | ||
var options = m[3]; | ||
if (self._options.requireAckResponse && options && options.chunk) { | ||
if (this._options.requireAckResponse && options && options.chunk) { | ||
var response = { | ||
@@ -40,7 +39,7 @@ ack: options.chunk | ||
MockFluentdServer.prototype.__defineGetter__('port', function(){ | ||
MockFluentdServer.prototype.__defineGetter__('port', function() { | ||
return this._port; | ||
}); | ||
MockFluentdServer.prototype.__defineGetter__('messages', function(){ | ||
MockFluentdServer.prototype.__defineGetter__('messages', function() { | ||
return this._received; | ||
@@ -51,5 +50,4 @@ }); | ||
MockFluentdServer.prototype.listen = function(callback){ | ||
var self = this; | ||
this._server.listen(function(){ | ||
self._port = self._server.address().port; | ||
this._server.listen(() => { | ||
this._port = this._server.address().port; | ||
callback(); | ||
@@ -59,33 +57,21 @@ }); | ||
MockFluentdServer.prototype.close = function(callback){ | ||
var self = this; | ||
if( process.version.match(/^v0\.6\./) ){ // 0.6.x does not support callback for server.close(); | ||
this._server.close(); | ||
(function waitForClose(){ | ||
if( Object.keys(self._clients).length > 0 ){ | ||
setTimeout(waitForClose, 100); | ||
}else{ | ||
callback(); | ||
} | ||
})(); | ||
}else{ | ||
this._server.close(function(){ | ||
callback(); | ||
}); | ||
MockFluentdServer.prototype.close = function(callback) { | ||
this._server.close(function() { | ||
callback(); | ||
}); | ||
for (var i in this._clients) { | ||
this._clients[i].end(); | ||
// this._clients[i].destroy(); | ||
} | ||
for(var i in self._clients){ | ||
self._clients[i].end(); | ||
// self._clients[i].destroy(); | ||
} | ||
}; | ||
module.exports = { | ||
runServer: function(options, callback){ | ||
runServer: function(options, callback) { | ||
var server = new MockFluentdServer(options); | ||
server.listen(function(){ | ||
callback(server, function(_callback){ | ||
server.listen(function() { | ||
callback(server, function(_callback) { | ||
// wait 100 ms to receive all messages and then close | ||
setTimeout(function(){ | ||
setTimeout(function() { | ||
var messages = server.messages; | ||
server.close(function(){ | ||
server.close(function() { | ||
_callback && _callback(messages); | ||
@@ -92,0 +78,0 @@ }); |
@@ -32,4 +32,3 @@ 'use strict'; | ||
fluentTransport.prototype.log = function(level, message, meta, callback) { | ||
var self = this; | ||
var sender = self.sender; | ||
var sender = this.sender; | ||
@@ -43,7 +42,7 @@ var data = { | ||
sender.emit(data, function(error) { | ||
sender.emit(data, (error) => { | ||
if (error) { | ||
self.emit('error', error); | ||
this.emit('error', error); | ||
} else { | ||
self.emit('logged'); | ||
this.emit('logged'); | ||
} | ||
@@ -50,0 +49,0 @@ }); |
{ | ||
"name": "fluent-logger", | ||
"version": "2.4.1", | ||
"version": "2.4.2", | ||
"main": "./lib/index.js", | ||
@@ -5,0 +5,0 @@ "scripts": { |
@@ -522,21 +522,2 @@ var expect = require('chai').expect; | ||
// Internal behavior test. | ||
it('should not flush queue if existing connection is unavailable.', function(done){ | ||
runServer({}, function(server, finish){ | ||
var s = new sender.FluentSender('debug', {port: server.port}); | ||
s.emit('1st record', { message: '1st data' }, function(){ | ||
s._socket.destroy(); | ||
s.emit('2nd record', { message: '2nd data' }, function(){ | ||
finish(function(data){ | ||
expect(data[0].tag).to.be.equal("debug.1st record"); | ||
expect(data[0].data.message).to.be.equal("1st data"); | ||
expect(data[1].tag).to.be.equal("debug.2nd record"); | ||
expect(data[1].data.message).to.be.equal("2nd data"); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
}); | ||
}); | ||
it('should write stream.', function(done) { | ||
@@ -543,0 +524,0 @@ runServer({}, function(server, finish) { |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
70626
1221