Socket
Socket
Sign inDemoInstall

fluent-logger

Package Overview
Dependencies
Maintainers
4
Versions
46
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

fluent-logger - npm Package Compare versions

Comparing version 1.2.1 to 2.0.0

lib/logger-error.js

121

lib/sender.js

@@ -6,7 +6,9 @@ 'use strict';

var net = require('net');
var crypto = require('crypto');
var FluentLoggerError = require('./logger-error');
function FluentSender(tag, options){
function FluentSender(tag_prefix, options){
options = options || {};
this.tag = tag;
this.tag_prefix = tag_prefix;
this.host = options.host || 'localhost';

@@ -17,4 +19,7 @@ this.port = options.port || 24224;

this.reconnectInterval = options.reconnectInterval || 600000; // Default is 10 minutes
this.requireAckResponse = options.requireAckResponse;
this.ackResponseTimeout = options.ackResponseTimeout || 190000; // Default is 190 seconds
this._timeResolution = options.milliseconds ? 1 : 1000;
this._socket = null;
this._data = null;
this._sendQueue = []; // queue for items waiting for being sent.

@@ -44,2 +49,9 @@ this._sendQueueTail = -1;

if (item.tag === null) {
var error = new FluentLoggerError.MissingTag('tag is missing',
{ tag_prefix: self.tag_prefix, label: label });
self._handleError(error, 'error', callback);
return;
}
item.callback = callback;

@@ -62,9 +74,13 @@

var self = this;
if( (label != null && data != null) ){
self.emit(label, data, function(err){
if ((label != null && data != null)) {
self.emit(label, data, function(err) {
self._close();
callback && callback(err);
if (err) {
self._handleError(err, 'error', callback);
} else {
callback && callback();
}
});
}else{
process.nextTick(function(){
} else {
process.nextTick(function() {
self._close();

@@ -76,4 +92,4 @@ callback && callback();

FluentSender.prototype._close = function(){
if( this._socket ){
FluentSender.prototype._close = function() {
if (this._socket) {
this._socket.end();

@@ -87,3 +103,10 @@ this._socket = null;

var self = this;
var tag = label ? [self.tag, label].join('.') : self.tag;
var tag = null;
if (self.tag_prefix && label) {
tag = [self.tag_prefix, label].join('.');
} else if (self.tag_prefix) {
tag = self.tag_prefix;
} else if (label) {
tag = label;
}

@@ -95,2 +118,9 @@ if (typeof time != "number") {

var packet = [tag, time, data];
var options = {};
if (self.requireAckResponse) {
options = {
chunk: crypto.randomBytes(16).toString('base64')
};
packet.push(options);
}
return {

@@ -100,3 +130,4 @@ packet: msgpack.encode(packet),

time: time,
data: data
data: data,
options: options
};

@@ -107,32 +138,29 @@ };

var self = this;
if( self._socket === null ){
if (self._socket === null) {
self._socket = new net.Socket();
self._socket.setTimeout(self.timeout);
self._socket.on('error', function(err){
if( self._socket ){
self._socket.on('error', function(err) {
if (self._socket) {
self._socket.destroy();
self._socket = null;
if( self._eventEmitter.listeners('error').length > 0 ){
self._eventEmitter.emit('error', err);
}
self._handleError(err, 'error', null);
}
});
self._socket.on('data', function(data) {
self._data = data;
});
if (self.path) {
self._socket.connect(self.path, function() {
callback();
});
self._socket.connect(self.path, callback);
} else {
self._socket.connect(self.port, self.host, function() {
callback();
});
self._socket.connect(self.port, self.host, callback);
}
}else{
if( !self._socket.writable ){
} else {
if (!self._socket.writable) {
self._socket.destroy();
self._socket = null;
process.nextTick(function(){
process.nextTick(function() {
self._connect(callback);
});
}else{
process.nextTick(function(){
} else {
process.nextTick(function() {
callback();

@@ -144,17 +172,36 @@ });

FluentSender.prototype._flushSendQueue = function(){
FluentSender.prototype._flushSendQueue = function() {
var self = this;
var pos = self._sendQueue.length - self._sendQueueTail - 1;
var item = self._sendQueue[pos];
if( item === undefined ){
if (item === undefined) {
// nothing written;
}else{
} else {
self._sendQueueTail--;
self._sendQueue.shift();
self._socket.write(new Buffer(item.packet), function(){
if (self.requireAckResponse) {
var intervalId = setInterval(function() {
if (self._data) {
var response = msgpack.decode(self._data);
self._data = null;
clearInterval(intervalId);
if (response.ack !== item.options.chunk) {
var error = new FluentLoggerError.ResponseError('ack in response and chunk id in sent data are different',
{ ack: response.ack, chunk: item.options.chunk });
self._handleError(error, 'error', item.callback);
}
}
}, 100);
setTimeout(function() {
var error = new FluentLoggerError.ResponseTimeout('ack response timeout');
self._handleError(error, 'error', item.callback);
clearInterval(intervalId);
}, self.ackResponseTimeout);
}
item.callback && item.callback();
});
process.nextTick(function(){
process.nextTick(function() {
// socket is still available
if( self._socket && self._socket.writable ){
if (self._socket && self._socket.writable) {
self._flushSendQueue();

@@ -167,2 +214,10 @@ }

FluentSender.prototype._handleError = function(error, signal, callback) {
var self = this;
callback && callback(error);
if (self._eventEmitter.listenerCount(signal) > 0) {
self._eventEmitter.emit(signal, error);
}
};
FluentSender.prototype._setupErrorHandler = function() {

@@ -169,0 +224,0 @@ var self = this;

@@ -6,5 +6,6 @@ 'use strict';

function MockFluentdServer(){
function MockFluentdServer(options){
var self = this;
this._port = null;
this._options = options;
this._received = [];

@@ -23,4 +24,12 @@ this._clients = {};

time: m[1],
data: m[2]
data: m[2],
options: m[3]
});
var options = m[3];
if (self._options.requireAckResponse && options && options.chunk) {
var response = {
ack: options.chunk
};
socket.write(msgpack.encode(response));
}
});

@@ -70,4 +79,4 @@ });

module.exports = {
runServer: function(callback){
var server = new MockFluentdServer();
runServer: function(options, callback){
var server = new MockFluentdServer(options);
server.listen(function(){

@@ -74,0 +83,0 @@ callback(server, function(_callback){

{
"name": "fluent-logger",
"version": "1.2.1",
"version": "2.0.0",
"main": "./lib/index.js",

@@ -5,0 +5,0 @@ "scripts": {

@@ -37,3 +37,3 @@ # fluent-logger for Node.js

// The 2nd argument can be omitted. Here is a default value for options.
logger.configure('tag', {
logger.configure('tag_prefix', {
host: 'localhost',

@@ -52,3 +52,3 @@ port: 24224,

```js
var logger = require('fluent-logger').createFluentSender('tag', {
var logger = require('fluent-logger').createFluentSender('tag_prefix', {
host: 'localhost',

@@ -144,5 +144,7 @@ port: 24224,

**tag**
**tag_prefix**
The tag string.
The tag prefix string.
You can specify `null` when you use `FluentSender` directly.
In this case, you must specify `label` when you call `emit`.

@@ -183,2 +185,10 @@ **host**

**requireAckResponse**
Change the protocol to at-least-once. The logger waits the ack from destination.
**ackResponseTimeout**
This option is used when requireAckResponse is true. The default is 190. This default value is based on popular `tcp_syn_retries`.
## License

@@ -185,0 +195,0 @@

@@ -24,3 +24,3 @@ var expect = require('chai').expect;

it('should send log records', function(done){
runServer(function(server, finish){
runServer({}, function(server, finish){
var appender = log4jsSupport.appender('debug', {port: server.port});

@@ -46,3 +46,3 @@ log4js.addAppender(appender);

it('should not add levelTag', function(done){
runServer(function(server, finish){
runServer({}, function(server, finish){
var appender = log4jsSupport.appender('debug', {port: server.port, levelTag:false});

@@ -68,3 +68,3 @@ log4js.addAppender(appender);

it('should not crash when fluentd is not running', function(done){
runServer(function(server, finish){
runServer({}, function(server, finish){
var appender = log4jsSupport.appender('debug', {port: server.port});

@@ -85,3 +85,3 @@ log4js.addAppender(appender);

it('should listen error event when fluentd is down', function(done){
runServer(function(server, finish){
runServer({}, function(server, finish){
var appender = log4jsSupport.appender('debug', {port: server.port});

@@ -88,0 +88,0 @@ appender.on('error', function(err) {

@@ -9,3 +9,3 @@ var expect = require('chai').expect;

it('should send records', function(done){
runServer(function(server, finish){
runServer({}, function(server, finish){
var s1 = new sender.FluentSender('debug', { port: server.port });

@@ -47,3 +47,3 @@ var emits = [];

it('should assure the sequence.', function(done){
runServer(function(server, finish){
runServer({}, function(server, finish){
var s = new sender.FluentSender('debug', {port: server.port});

@@ -67,3 +67,3 @@ s.emit('1st record', '1st data');

it('should allow to emit with a custom timestamp', function(done){
runServer(function(server, finish){
runServer({}, function(server, finish){
var s = new sender.FluentSender('debug', {port: server.port});

@@ -83,3 +83,3 @@ var timestamp = new Date(2222, 12, 04);

it('should allow to emit with a custom numeric timestamp', function(done){
runServer(function(server, finish){
runServer({}, function(server, finish){
var s = new sender.FluentSender('debug', {port: server.port});

@@ -102,3 +102,3 @@ var timestamp = Math.floor(new Date().getTime() / 1000);

expect(err.code).to.be.equal('ECONNREFUSED');
runServer(function(server, finish){
runServer({}, function(server, finish){
s.port = server.port;

@@ -122,3 +122,3 @@ s.emit('2nd record', '2nd data');

it('should reconnect when fluentd close the client socket suddenly', function(done){
runServer(function(server, finish){
runServer({}, function(server, finish){
var s = new sender.FluentSender('debug', {port: server.port});

@@ -131,3 +131,3 @@ s.emit('foo', 'bar', function(){

if( !(s._socket && s._socket.writable) ){
runServer(function(_server2, finish){
runServer({}, function(_server2, finish){
s.port = _server2.port; // in actuall case, s.port does not need to be updated.

@@ -153,2 +153,48 @@ s.emit('bar', 'hoge', function(){

it('should send records with requireAckResponse', function(done) {
runServer({requireAckResponse: true}, function(server, finish) {
var s1 = new sender.FluentSender('debug', {
port: server.port,
requireAckResponse: true
});
var emits = [];
function emit(k){
emits.push(function(done){ s1.emit('record', k, done); });
}
for (var i=0; i<10; i++) {
emit(i);
}
emits.push(function(){
finish(function(data){
expect(data.length).to.be.equal(10);
for(var i=0; i<10; i++){
expect(data[i].tag).to.be.equal("debug.record");
expect(data[i].data).to.be.equal(i);
expect(data[i].options.chunk).to.be.equal(server.messages[i].options.chunk);
}
done();
});
});
async.series(emits);
});
});
it('should send records ackResponseTimeout', function(done) {
runServer({requireAckResponse: false }, function(server, finish) {
var s1 = new sender.FluentSender('debug', {
port: server.port,
requireAckResponse: false,
ackResponseTimeout: 1000
});
s1.on('response-timeout', function(error) {
expect(error).to.be.equal('ack response timeout');
});
s1.emit('record', 1);
finish(function(data) {
expect(data.length).to.be.equal(1);
done();
});
});
});
it('should set error handler', function(done){

@@ -189,3 +235,3 @@ var s = new sender.FluentSender('debug', {

tag: 'debug.foo',
data: { bar: 1 },
data: { bar: 1 }
}

@@ -253,3 +299,3 @@ },

it('should send records with '+testCase.name+' arguments', function(done){
runServer(function(server, finish){
runServer({}, function(server, finish){
var s1 = new sender.FluentSender('debug', { port: server.port });

@@ -278,2 +324,115 @@ s1.emit.apply(s1, testCase.args);

[
{
name: 'tag and record',
args: ['foo', { bar: 1 }],
expect: {
tag: 'foo',
data: { bar: 1 }
}
},
{
name: 'tag, record and time',
args: ['foo', { bar: 1 }, 12345],
expect: {
tag: 'foo',
data: { bar: 1 },
time: 12345
}
},
{
name: 'tag, record and callback',
args: ['foo', { bar: 1 }, function cb() { cb.called = true; }],
expect: {
tag: 'foo',
data: { bar: 1 }
}
},
{
name: 'tag, record, time and callback',
args: ['foo', { bar: 1 }, 12345, function cb() { cb.called = true; }],
expect: {
tag: 'foo',
data: { bar: 1 },
time: 12345
}
}
].forEach(function(testCase) {
it('should send records with '+testCase.name+' arguments without a default tag', function(done){
runServer({}, function(server, finish){
var s1 = new sender.FluentSender(null, { port: server.port });
s1.emit.apply(s1, testCase.args);
finish(function(data){
expect(data[0].tag).to.be.equal(testCase.expect.tag);
expect(data[0].data).to.be.deep.equal(testCase.expect.data);
if (testCase.expect.time) {
expect(data[0].time).to.be.deep.equal(testCase.expect.time);
}
testCase.args.forEach(function(arg) {
if (typeof arg === "function") {
expect(arg.called, "callback must be called").to.be.true;
}
});
done();
});
});
});
});
[
{
name: 'record',
args: [{ bar: 1 }]
},
{
name: 'record and time',
args: [{ bar: 1 }, 12345]
},
{
name: 'record and callback',
args: [{ bar: 1 }, function cb(){ cb.called = true; }]
},
{
name: 'record, time and callback',
args: [{ bar: 1 }, 12345, function cb(){ cb.called = true; }]
},
{
name: 'record and date object',
args: [{ bar: 1 }, new Date(1384434467952)]
}
].forEach(function(testCase) {
it('should not send records with '+testCase.name+' arguments without a default tag', function(done){
runServer({}, function(server, finish){
var s1 = new sender.FluentSender(null, { port: server.port });
s1.on('error', function(error) {
expect(error.name).to.be.equal('MissingTagError');
});
s1.emit.apply(s1, testCase.args);
finish(function(data){
expect(data.length).to.be.equal(0);
testCase.args.forEach(function(arg) {
if (typeof arg === "function") {
expect(arg.called, "callback must be called").to.be.true;
}
});
done();
});
});
});
});
it('should set max listeners', function(done){

@@ -295,3 +454,3 @@ var s = new sender.FluentSender('debug');

it('should not flush queue if existing connection is unavailable.', function(done){
runServer(function(server, finish){
runServer({}, function(server, finish){
var s = new sender.FluentSender('debug', {port: server.port});

@@ -298,0 +457,0 @@ s.emit('1st record', '1st data', function(){

@@ -17,3 +17,3 @@ var expect = require('chai').expect;

it('should send log records', function(done){
runServer(function(server, finish){
runServer({}, function(server, finish){
var logger = new (winston.Logger)({

@@ -20,0 +20,0 @@ transports: [

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc