fluent-logger
Advanced tools
Comparing version 1.2.1 to 2.0.0
@@ -6,7 +6,9 @@ 'use strict'; | ||
var net = require('net'); | ||
var crypto = require('crypto'); | ||
var FluentLoggerError = require('./logger-error'); | ||
function FluentSender(tag, options){ | ||
function FluentSender(tag_prefix, options){ | ||
options = options || {}; | ||
this.tag = tag; | ||
this.tag_prefix = tag_prefix; | ||
this.host = options.host || 'localhost'; | ||
@@ -17,4 +19,7 @@ this.port = options.port || 24224; | ||
this.reconnectInterval = options.reconnectInterval || 600000; // Default is 10 minutes | ||
this.requireAckResponse = options.requireAckResponse; | ||
this.ackResponseTimeout = options.ackResponseTimeout || 190000; // Default is 190 seconds | ||
this._timeResolution = options.milliseconds ? 1 : 1000; | ||
this._socket = null; | ||
this._data = null; | ||
this._sendQueue = []; // queue for items waiting for being sent. | ||
@@ -44,2 +49,9 @@ this._sendQueueTail = -1; | ||
if (item.tag === null) { | ||
var error = new FluentLoggerError.MissingTag('tag is missing', | ||
{ tag_prefix: self.tag_prefix, label: label }); | ||
self._handleError(error, 'error', callback); | ||
return; | ||
} | ||
item.callback = callback; | ||
@@ -62,9 +74,13 @@ | ||
var self = this; | ||
if( (label != null && data != null) ){ | ||
self.emit(label, data, function(err){ | ||
if ((label != null && data != null)) { | ||
self.emit(label, data, function(err) { | ||
self._close(); | ||
callback && callback(err); | ||
if (err) { | ||
self._handleError(err, 'error', callback); | ||
} else { | ||
callback && callback(); | ||
} | ||
}); | ||
}else{ | ||
process.nextTick(function(){ | ||
} else { | ||
process.nextTick(function() { | ||
self._close(); | ||
@@ -76,4 +92,4 @@ callback && callback(); | ||
FluentSender.prototype._close = function(){ | ||
if( this._socket ){ | ||
FluentSender.prototype._close = function() { | ||
if (this._socket) { | ||
this._socket.end(); | ||
@@ -87,3 +103,10 @@ this._socket = null; | ||
var self = this; | ||
var tag = label ? [self.tag, label].join('.') : self.tag; | ||
var tag = null; | ||
if (self.tag_prefix && label) { | ||
tag = [self.tag_prefix, label].join('.'); | ||
} else if (self.tag_prefix) { | ||
tag = self.tag_prefix; | ||
} else if (label) { | ||
tag = label; | ||
} | ||
@@ -95,2 +118,9 @@ if (typeof time != "number") { | ||
var packet = [tag, time, data]; | ||
var options = {}; | ||
if (self.requireAckResponse) { | ||
options = { | ||
chunk: crypto.randomBytes(16).toString('base64') | ||
}; | ||
packet.push(options); | ||
} | ||
return { | ||
@@ -100,3 +130,4 @@ packet: msgpack.encode(packet), | ||
time: time, | ||
data: data | ||
data: data, | ||
options: options | ||
}; | ||
@@ -107,32 +138,29 @@ }; | ||
var self = this; | ||
if( self._socket === null ){ | ||
if (self._socket === null) { | ||
self._socket = new net.Socket(); | ||
self._socket.setTimeout(self.timeout); | ||
self._socket.on('error', function(err){ | ||
if( self._socket ){ | ||
self._socket.on('error', function(err) { | ||
if (self._socket) { | ||
self._socket.destroy(); | ||
self._socket = null; | ||
if( self._eventEmitter.listeners('error').length > 0 ){ | ||
self._eventEmitter.emit('error', err); | ||
} | ||
self._handleError(err, 'error', null); | ||
} | ||
}); | ||
self._socket.on('data', function(data) { | ||
self._data = data; | ||
}); | ||
if (self.path) { | ||
self._socket.connect(self.path, function() { | ||
callback(); | ||
}); | ||
self._socket.connect(self.path, callback); | ||
} else { | ||
self._socket.connect(self.port, self.host, function() { | ||
callback(); | ||
}); | ||
self._socket.connect(self.port, self.host, callback); | ||
} | ||
}else{ | ||
if( !self._socket.writable ){ | ||
} else { | ||
if (!self._socket.writable) { | ||
self._socket.destroy(); | ||
self._socket = null; | ||
process.nextTick(function(){ | ||
process.nextTick(function() { | ||
self._connect(callback); | ||
}); | ||
}else{ | ||
process.nextTick(function(){ | ||
} else { | ||
process.nextTick(function() { | ||
callback(); | ||
@@ -144,17 +172,36 @@ }); | ||
FluentSender.prototype._flushSendQueue = function(){ | ||
FluentSender.prototype._flushSendQueue = function() { | ||
var self = this; | ||
var pos = self._sendQueue.length - self._sendQueueTail - 1; | ||
var item = self._sendQueue[pos]; | ||
if( item === undefined ){ | ||
if (item === undefined) { | ||
// nothing written; | ||
}else{ | ||
} else { | ||
self._sendQueueTail--; | ||
self._sendQueue.shift(); | ||
self._socket.write(new Buffer(item.packet), function(){ | ||
if (self.requireAckResponse) { | ||
var intervalId = setInterval(function() { | ||
if (self._data) { | ||
var response = msgpack.decode(self._data); | ||
self._data = null; | ||
clearInterval(intervalId); | ||
if (response.ack !== item.options.chunk) { | ||
var error = new FluentLoggerError.ResponseError('ack in response and chunk id in sent data are different', | ||
{ ack: response.ack, chunk: item.options.chunk }); | ||
self._handleError(error, 'error', item.callback); | ||
} | ||
} | ||
}, 100); | ||
setTimeout(function() { | ||
var error = new FluentLoggerError.ResponseTimeout('ack response timeout'); | ||
self._handleError(error, 'error', item.callback); | ||
clearInterval(intervalId); | ||
}, self.ackResponseTimeout); | ||
} | ||
item.callback && item.callback(); | ||
}); | ||
process.nextTick(function(){ | ||
process.nextTick(function() { | ||
// socket is still available | ||
if( self._socket && self._socket.writable ){ | ||
if (self._socket && self._socket.writable) { | ||
self._flushSendQueue(); | ||
@@ -167,2 +214,10 @@ } | ||
FluentSender.prototype._handleError = function(error, signal, callback) { | ||
var self = this; | ||
callback && callback(error); | ||
if (self._eventEmitter.listenerCount(signal) > 0) { | ||
self._eventEmitter.emit(signal, error); | ||
} | ||
}; | ||
FluentSender.prototype._setupErrorHandler = function() { | ||
@@ -169,0 +224,0 @@ var self = this; |
@@ -6,5 +6,6 @@ 'use strict'; | ||
function MockFluentdServer(){ | ||
function MockFluentdServer(options){ | ||
var self = this; | ||
this._port = null; | ||
this._options = options; | ||
this._received = []; | ||
@@ -23,4 +24,12 @@ this._clients = {}; | ||
time: m[1], | ||
data: m[2] | ||
data: m[2], | ||
options: m[3] | ||
}); | ||
var options = m[3]; | ||
if (self._options.requireAckResponse && options && options.chunk) { | ||
var response = { | ||
ack: options.chunk | ||
}; | ||
socket.write(msgpack.encode(response)); | ||
} | ||
}); | ||
@@ -70,4 +79,4 @@ }); | ||
module.exports = { | ||
runServer: function(callback){ | ||
var server = new MockFluentdServer(); | ||
runServer: function(options, callback){ | ||
var server = new MockFluentdServer(options); | ||
server.listen(function(){ | ||
@@ -74,0 +83,0 @@ callback(server, function(_callback){ |
{ | ||
"name": "fluent-logger", | ||
"version": "1.2.1", | ||
"version": "2.0.0", | ||
"main": "./lib/index.js", | ||
@@ -5,0 +5,0 @@ "scripts": { |
@@ -37,3 +37,3 @@ # fluent-logger for Node.js | ||
// The 2nd argument can be omitted. Here is a default value for options. | ||
logger.configure('tag', { | ||
logger.configure('tag_prefix', { | ||
host: 'localhost', | ||
@@ -52,3 +52,3 @@ port: 24224, | ||
```js | ||
var logger = require('fluent-logger').createFluentSender('tag', { | ||
var logger = require('fluent-logger').createFluentSender('tag_prefix', { | ||
host: 'localhost', | ||
@@ -144,5 +144,7 @@ port: 24224, | ||
**tag** | ||
**tag_prefix** | ||
The tag string. | ||
The tag prefix string. | ||
You can specify `null` when you use `FluentSender` directly. | ||
In this case, you must specify `label` when you call `emit`. | ||
@@ -183,2 +185,10 @@ **host** | ||
**requireAckResponse** | ||
Change the protocol to at-least-once. The logger waits the ack from destination. | ||
**ackResponseTimeout** | ||
This option is used when requireAckResponse is true. The default is 190. This default value is based on popular `tcp_syn_retries`. | ||
## License | ||
@@ -185,0 +195,0 @@ |
@@ -24,3 +24,3 @@ var expect = require('chai').expect; | ||
it('should send log records', function(done){ | ||
runServer(function(server, finish){ | ||
runServer({}, function(server, finish){ | ||
var appender = log4jsSupport.appender('debug', {port: server.port}); | ||
@@ -46,3 +46,3 @@ log4js.addAppender(appender); | ||
it('should not add levelTag', function(done){ | ||
runServer(function(server, finish){ | ||
runServer({}, function(server, finish){ | ||
var appender = log4jsSupport.appender('debug', {port: server.port, levelTag:false}); | ||
@@ -68,3 +68,3 @@ log4js.addAppender(appender); | ||
it('should not crash when fluentd is not running', function(done){ | ||
runServer(function(server, finish){ | ||
runServer({}, function(server, finish){ | ||
var appender = log4jsSupport.appender('debug', {port: server.port}); | ||
@@ -85,3 +85,3 @@ log4js.addAppender(appender); | ||
it('should listen error event when fluentd is down', function(done){ | ||
runServer(function(server, finish){ | ||
runServer({}, function(server, finish){ | ||
var appender = log4jsSupport.appender('debug', {port: server.port}); | ||
@@ -88,0 +88,0 @@ appender.on('error', function(err) { |
@@ -9,3 +9,3 @@ var expect = require('chai').expect; | ||
it('should send records', function(done){ | ||
runServer(function(server, finish){ | ||
runServer({}, function(server, finish){ | ||
var s1 = new sender.FluentSender('debug', { port: server.port }); | ||
@@ -47,3 +47,3 @@ var emits = []; | ||
it('should assure the sequence.', function(done){ | ||
runServer(function(server, finish){ | ||
runServer({}, function(server, finish){ | ||
var s = new sender.FluentSender('debug', {port: server.port}); | ||
@@ -67,3 +67,3 @@ s.emit('1st record', '1st data'); | ||
it('should allow to emit with a custom timestamp', function(done){ | ||
runServer(function(server, finish){ | ||
runServer({}, function(server, finish){ | ||
var s = new sender.FluentSender('debug', {port: server.port}); | ||
@@ -83,3 +83,3 @@ var timestamp = new Date(2222, 12, 04); | ||
it('should allow to emit with a custom numeric timestamp', function(done){ | ||
runServer(function(server, finish){ | ||
runServer({}, function(server, finish){ | ||
var s = new sender.FluentSender('debug', {port: server.port}); | ||
@@ -102,3 +102,3 @@ var timestamp = Math.floor(new Date().getTime() / 1000); | ||
expect(err.code).to.be.equal('ECONNREFUSED'); | ||
runServer(function(server, finish){ | ||
runServer({}, function(server, finish){ | ||
s.port = server.port; | ||
@@ -122,3 +122,3 @@ s.emit('2nd record', '2nd data'); | ||
it('should reconnect when fluentd close the client socket suddenly', function(done){ | ||
runServer(function(server, finish){ | ||
runServer({}, function(server, finish){ | ||
var s = new sender.FluentSender('debug', {port: server.port}); | ||
@@ -131,3 +131,3 @@ s.emit('foo', 'bar', function(){ | ||
if( !(s._socket && s._socket.writable) ){ | ||
runServer(function(_server2, finish){ | ||
runServer({}, function(_server2, finish){ | ||
s.port = _server2.port; // in actuall case, s.port does not need to be updated. | ||
@@ -153,2 +153,48 @@ s.emit('bar', 'hoge', function(){ | ||
it('should send records with requireAckResponse', function(done) { | ||
runServer({requireAckResponse: true}, function(server, finish) { | ||
var s1 = new sender.FluentSender('debug', { | ||
port: server.port, | ||
requireAckResponse: true | ||
}); | ||
var emits = []; | ||
function emit(k){ | ||
emits.push(function(done){ s1.emit('record', k, done); }); | ||
} | ||
for (var i=0; i<10; i++) { | ||
emit(i); | ||
} | ||
emits.push(function(){ | ||
finish(function(data){ | ||
expect(data.length).to.be.equal(10); | ||
for(var i=0; i<10; i++){ | ||
expect(data[i].tag).to.be.equal("debug.record"); | ||
expect(data[i].data).to.be.equal(i); | ||
expect(data[i].options.chunk).to.be.equal(server.messages[i].options.chunk); | ||
} | ||
done(); | ||
}); | ||
}); | ||
async.series(emits); | ||
}); | ||
}); | ||
it('should send records ackResponseTimeout', function(done) { | ||
runServer({requireAckResponse: false }, function(server, finish) { | ||
var s1 = new sender.FluentSender('debug', { | ||
port: server.port, | ||
requireAckResponse: false, | ||
ackResponseTimeout: 1000 | ||
}); | ||
s1.on('response-timeout', function(error) { | ||
expect(error).to.be.equal('ack response timeout'); | ||
}); | ||
s1.emit('record', 1); | ||
finish(function(data) { | ||
expect(data.length).to.be.equal(1); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
it('should set error handler', function(done){ | ||
@@ -189,3 +235,3 @@ var s = new sender.FluentSender('debug', { | ||
tag: 'debug.foo', | ||
data: { bar: 1 }, | ||
data: { bar: 1 } | ||
} | ||
@@ -253,3 +299,3 @@ }, | ||
it('should send records with '+testCase.name+' arguments', function(done){ | ||
runServer(function(server, finish){ | ||
runServer({}, function(server, finish){ | ||
var s1 = new sender.FluentSender('debug', { port: server.port }); | ||
@@ -278,2 +324,115 @@ s1.emit.apply(s1, testCase.args); | ||
[ | ||
{ | ||
name: 'tag and record', | ||
args: ['foo', { bar: 1 }], | ||
expect: { | ||
tag: 'foo', | ||
data: { bar: 1 } | ||
} | ||
}, | ||
{ | ||
name: 'tag, record and time', | ||
args: ['foo', { bar: 1 }, 12345], | ||
expect: { | ||
tag: 'foo', | ||
data: { bar: 1 }, | ||
time: 12345 | ||
} | ||
}, | ||
{ | ||
name: 'tag, record and callback', | ||
args: ['foo', { bar: 1 }, function cb() { cb.called = true; }], | ||
expect: { | ||
tag: 'foo', | ||
data: { bar: 1 } | ||
} | ||
}, | ||
{ | ||
name: 'tag, record, time and callback', | ||
args: ['foo', { bar: 1 }, 12345, function cb() { cb.called = true; }], | ||
expect: { | ||
tag: 'foo', | ||
data: { bar: 1 }, | ||
time: 12345 | ||
} | ||
} | ||
].forEach(function(testCase) { | ||
it('should send records with '+testCase.name+' arguments without a default tag', function(done){ | ||
runServer({}, function(server, finish){ | ||
var s1 = new sender.FluentSender(null, { port: server.port }); | ||
s1.emit.apply(s1, testCase.args); | ||
finish(function(data){ | ||
expect(data[0].tag).to.be.equal(testCase.expect.tag); | ||
expect(data[0].data).to.be.deep.equal(testCase.expect.data); | ||
if (testCase.expect.time) { | ||
expect(data[0].time).to.be.deep.equal(testCase.expect.time); | ||
} | ||
testCase.args.forEach(function(arg) { | ||
if (typeof arg === "function") { | ||
expect(arg.called, "callback must be called").to.be.true; | ||
} | ||
}); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
}); | ||
[ | ||
{ | ||
name: 'record', | ||
args: [{ bar: 1 }] | ||
}, | ||
{ | ||
name: 'record and time', | ||
args: [{ bar: 1 }, 12345] | ||
}, | ||
{ | ||
name: 'record and callback', | ||
args: [{ bar: 1 }, function cb(){ cb.called = true; }] | ||
}, | ||
{ | ||
name: 'record, time and callback', | ||
args: [{ bar: 1 }, 12345, function cb(){ cb.called = true; }] | ||
}, | ||
{ | ||
name: 'record and date object', | ||
args: [{ bar: 1 }, new Date(1384434467952)] | ||
} | ||
].forEach(function(testCase) { | ||
it('should not send records with '+testCase.name+' arguments without a default tag', function(done){ | ||
runServer({}, function(server, finish){ | ||
var s1 = new sender.FluentSender(null, { port: server.port }); | ||
s1.on('error', function(error) { | ||
expect(error.name).to.be.equal('MissingTagError'); | ||
}); | ||
s1.emit.apply(s1, testCase.args); | ||
finish(function(data){ | ||
expect(data.length).to.be.equal(0); | ||
testCase.args.forEach(function(arg) { | ||
if (typeof arg === "function") { | ||
expect(arg.called, "callback must be called").to.be.true; | ||
} | ||
}); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
}); | ||
it('should set max listeners', function(done){ | ||
@@ -295,3 +454,3 @@ var s = new sender.FluentSender('debug'); | ||
it('should not flush queue if existing connection is unavailable.', function(done){ | ||
runServer(function(server, finish){ | ||
runServer({}, function(server, finish){ | ||
var s = new sender.FluentSender('debug', {port: server.port}); | ||
@@ -298,0 +457,0 @@ s.emit('1st record', '1st data', function(){ |
@@ -17,3 +17,3 @@ var expect = require('chai').expect; | ||
it('should send log records', function(done){ | ||
runServer(function(server, finish){ | ||
runServer({}, function(server, finish){ | ||
var logger = new (winston.Logger)({ | ||
@@ -20,0 +20,0 @@ transports: [ |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
49869
15
1020
200