Socket
Socket
Sign inDemoInstall

fluent-logger

Package Overview
Dependencies
1
Maintainers
4
Versions
46
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 2.6.2 to 2.7.0

66

lib/sender.js

@@ -7,2 +7,3 @@ 'use strict';

var crypto = require('crypto');
var tls = require('tls');
var zlib = require('zlib');

@@ -27,2 +28,4 @@ var FluentLoggerError = require('./logger-error');

this.timeout = options.timeout || 3.0;
this.tls = !!options.tls;
this.tlsOptions = options.tlsOptions || {};
this.reconnectInterval = options.reconnectInterval || 600000; // Default is 10 minutes

@@ -51,3 +54,3 @@ this.requireAckResponse = options.requireAckResponse;

username: '',
passwort: ''
password: ''
};

@@ -229,20 +232,39 @@ this.sharedKeySalt = crypto.randomBytes(16).toString('hex');

FluentSender.prototype._doConnect = function(callback) {
this._socket = new net.Socket();
this._socket.setTimeout(this.timeout);
this._socket.on('error', (err) => {
if (this._socket) {
this._disconnect();
this._handleEvent('error', err);
let addHandlers = () => {
let errorHandler = (err) => {
if (this._socket) {
this._disconnect();
this._handleEvent('error', err);
}
};
this._socket.on('error', errorHandler);
this._socket.on('connect', () => {
this._handleEvent('connect');
});
if (this.tls) {
this._socket.on('tlsClientError', errorHandler);
this._socket.on('secureConnect', () => {
this._handleEvent('connect');
});
}
});
this._socket.on('connect', () => {
this._handleEvent('connect');
});
};
if (!this.tls) {
this._socket = new net.Socket();
this._socket.setTimeout(this.timeout);
addHandlers();
}
if (this.path) {
this._socket.connect(this.path, () => {
callback();
});
if (this.tls) {
this._socket = tls.connect(Object.assign({}, this.tlsOptions, { path: this.path }), () => {
callback();
});
addHandlers();
} else {
this._socket.connect(this.path, () => {
callback();
});
}
} else {
this._socket.connect(this.port, this.host, () => {
if (this.security.clientHostname && this.security.sharedKey) {
let postConnect = () => {
if (this.security.clientHostname && this.security.sharedKey !== null) {
this._handshake(callback);

@@ -253,3 +275,13 @@ } else {

}
});
};
if (this.tls) {
this._socket = tls.connect(Object.assign({}, this.tlsOptions, { host: this.host, port: this.port }), () => {
postConnect();
});
addHandlers();
} else {
this._socket.connect(this.port, this.host, () => {
postConnect();
});
}
}

@@ -256,0 +288,0 @@ };

'use strict';
var net = require('net');
var tls = require('tls');
var msgpack = require('msgpack-lite');

@@ -7,5 +8,6 @@ var crypto = require('crypto');

function MockFluentdServer(options) {
function MockFluentdServer(options, tlsOptions) {
this._port = null;
this._options = options;
this._tlsOptions = tlsOptions || {};
this._received = [];

@@ -16,3 +18,3 @@ this._clients = {};

this._userAuthSalt = null;
this._server = net.createServer((socket) => {
let server = (socket) => {
var clientKey = socket.remoteAddress + ':' + socket.remotePort;

@@ -69,4 +71,11 @@ this._clients[clientKey] = socket;

});
});
this._server.on('connection', (socket) => {
};
var connectionEventType = 'connection';
if (this._tlsOptions.tls) {
connectionEventType = 'secureConnection';
this._server = tls.createServer(this._tlsOptions, server);
} else {
this._server = net.createServer(server);
}
this._server.on(connectionEventType, (socket) => {
if (this._options.security && this._options.security.sharedKey && this._options.security.serverHostname) {

@@ -185,4 +194,4 @@ this._state = 'helo';

module.exports = {
runServer: function(options, callback) {
var server = new MockFluentdServer(options);
runServer: function(options, tlsOptions, callback) {
var server = new MockFluentdServer(options, tlsOptions);
server.listen(() => {

@@ -189,0 +198,0 @@ callback(server, (_callback) => {

{
"name": "fluent-logger",
"version": "2.6.2",
"version": "2.7.0",
"main": "./lib/index.js",

@@ -42,3 +42,4 @@ "scripts": {

"eslint": "^4.11.0",
"eslint-plugin-node": ""
"eslint-plugin-node": "",
"selfsigned": ""
},

@@ -45,0 +46,0 @@ "license": "Apache-2.0",

@@ -75,3 +75,3 @@ # fluent-logger for Node.js

```js
var logger = require('fluent-logger').createFluentSender('tag_prefix', {
var logger = require('fluent-logger').createFluentSender('dummy', {
host: 'localhost',

@@ -108,2 +108,50 @@ port: 24224,

### TLS/SSL encryption
Logger configuration:
```js
var logger = require('fluent-logger').createFluentSender('dummy', {
host: 'localhost',
port: 24224,
timeout: 3.0,
reconnectInterval: 600000, // 10 minutes
security: {
clientHostname: "client.localdomain",
sharedKey: "secure_communication_is_awesome"
},
tls: true,
tlsOptions: {
ca: fs.readFileSync('/path/to/ca_cert.pem')
}
});
logger.emit('debug', { message: 'This is a message' });
```
Server configuration:
```aconf
<source>
@type forward
port 24224
<transport tls>
ca_cert_path /path/to/ca_cert.pem
ca_private_key_path /path/to/ca_key.pem
ca_private_key_passphrase very_secret_passphrase
</transport>
<security>
self_hostname input.testing.local
shared_key secure_communication_is_awesome
</security>
</source>
<match dummy.*>
@type stdout
</match>
```
FYI: You can generate certificates using fluent-ca-generate command since Fluentd 1.1.0.
See also [How to enable TLS/SSL encryption](https://docs.fluentd.org/v1.0/articles/in_forward#how-to-enable-tls/ssl-encryption).
### EventTime support

@@ -273,2 +321,14 @@

**tls**
Enable TLS for socket.
**tlsOptions**
Options to pass to tls.connect when tls is true.
For more details, see following documents
* [tls.connect()](https://nodejs.org/api/tls.html#tls_tls_connect_options_callback)
* [tls.createSecureContext()](https://nodejs.org/api/tls.html#tls_tls_createsecurecontext_options)
**internalLogger**

@@ -275,0 +335,0 @@

@@ -19,6 +19,15 @@ 'use strict';

describe('FluentSender', () => {
let doTest = (tls) => {
let serverOptions = {};
let clientOptions = {};
if (tls) {
var selfsigned = require('selfsigned');
var attrs = [{ name: 'commonName', value: 'foo.com' }];
var pems = selfsigned.generate(attrs, { days: 365 });
serverOptions = { tls: true, key: pems.private, cert: pems.cert, ca: pems.cert };
clientOptions = { tls: true, tlsOptions: { rejectUnauthorized: false } };
}
it('should throw error', (done) => {
try {
new sender.FluentSender('debug', { eventMode: 'Unknown' });
new sender.FluentSender('debug', Object.assign({}, clientOptions, { eventMode: 'Unknown' }));
} catch (e) {

@@ -31,4 +40,4 @@ expect(e.message).to.be.equal('Unknown event mode: Unknown');

it('should send records', (done) => {
runServer({}, (server, finish) => {
var s1 = new sender.FluentSender('debug', { port: server.port });
runServer({}, serverOptions, (server, finish) => {
var s1 = new sender.FluentSender('debug', Object.assign({}, clientOptions, { port: server.port }));
var emits = [];

@@ -56,4 +65,4 @@ function emit(k) {

it('should emit connect event', (done) => {
runServer({}, (server, finish) => {
var s = new sender.FluentSender('debug', {port: server.port});
runServer({}, serverOptions, (server, finish) => {
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, {port: server.port}));
var called = false;

@@ -73,6 +82,6 @@ s.on('connect', () => {

it('should raise error when connection fails', (done) => {
var s = new sender.FluentSender('debug', {
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, {
host: 'localhost',
port: 65535
});
}));
s.on('error', (err) => {

@@ -98,7 +107,7 @@ expect(err.code).to.be.equal('ECONNREFUSED');

};
var s = new sender.FluentSender('debug', {
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, {
host: 'localhost',
port: 65535,
internalLogger: logger
});
}));
s._setupErrorHandler((timeoutId) => {

@@ -117,4 +126,4 @@ expect(logger.buffer.info).to.have.lengthOf(1);

it('should assure the sequence.', (done) => {
runServer({}, (server, finish) => {
var s = new sender.FluentSender('debug', {port: server.port});
runServer({}, serverOptions, (server, finish) => {
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, {port: server.port}));
s.emit('1st record', { message: '1st data' });

@@ -137,4 +146,4 @@ s.emit('2nd record', { message: '2nd data' });

it('should allow to emit with a custom timestamp', (done) => {
runServer({}, (server, finish) => {
var s = new sender.FluentSender('debug', {port: server.port});
runServer({}, serverOptions, (server, finish) => {
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, {port: server.port}));
var timestamp = new Date(2222, 12, 4);

@@ -153,4 +162,4 @@ var timestamp_seconds_since_epoch = Math.floor(timestamp.getTime() / 1000);

it('should allow to emit with a custom numeric timestamp', (done) => {
runServer({}, (server, finish) => {
var s = new sender.FluentSender('debug', {port: server.port});
runServer({}, serverOptions, (server, finish) => {
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, {port: server.port}));
var timestamp = Math.floor(new Date().getTime() / 1000);

@@ -168,4 +177,4 @@

it('should allow to emit with a EventTime', (done) => {
runServer({}, (server, finish) => {
var s = new sender.FluentSender('debug', {port: server.port});
runServer({}, serverOptions, (server, finish) => {
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, {port: server.port}));
var eventTime = EventTime.now();

@@ -184,7 +193,7 @@

it('should resume the connection automatically and flush the queue', (done) => {
var s = new sender.FluentSender('debug');
var s = new sender.FluentSender('debug', clientOptions);
s.emit('1st record', { message: '1st data' });
s.on('error', (err) => {
expect(err.code).to.be.equal('ECONNREFUSED');
runServer({}, (server, finish) => {
runServer({}, serverOptions, (server, finish) => {
s.port = server.port;

@@ -208,4 +217,4 @@ s.emit('2nd record', { message: '2nd data' });

it('should reconnect when fluentd close the client socket suddenly', (done) => {
runServer({}, (server, finish) => {
var s = new sender.FluentSender('debug', {port: server.port});
runServer({}, serverOptions, (server, finish) => {
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, {port: server.port}));
s.emit('foo', 'bar', () => {

@@ -217,3 +226,3 @@ // connected

if (!(s._socket && s._socket.writable)) {
runServer({}, (_server2, finish) => {
runServer({}, serverOptions, (_server2, finish) => {
s.port = _server2.port; // in actuall case, s.port does not need to be updated.

@@ -240,7 +249,7 @@ s.emit('bar', { message: 'hoge' }, () => {

it('should send records with requireAckResponse', (done) => {
runServer({requireAckResponse: true}, (server, finish) => {
var s1 = new sender.FluentSender('debug', {
runServer({requireAckResponse: true}, serverOptions, (server, finish) => {
var s1 = new sender.FluentSender('debug', Object.assign({}, clientOptions, {
port: server.port,
requireAckResponse: true
});
}));
var emits = [];

@@ -269,8 +278,8 @@ function emit(k) {

it('should send records ackResponseTimeout', (done) => {
runServer({requireAckResponse: false }, (server, finish) => {
var s1 = new sender.FluentSender('debug', {
runServer({requireAckResponse: false }, serverOptions, (server, finish) => {
var s1 = new sender.FluentSender('debug', Object.assign({}, clientOptions, {
port: server.port,
requireAckResponse: false,
ackResponseTimeout: 1000
});
}));
s1.on('response-timeout', (error) => {

@@ -288,5 +297,5 @@ expect(error).to.be.equal('ack response timeout');

it('should set error handler', (done) => {
var s = new sender.FluentSender('debug', {
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, {
reconnectInterval: 100
});
}));
expect(s._eventEmitter.listeners('error').length).to.be.equal(0);

@@ -386,4 +395,4 @@ s._setupErrorHandler();

it('should send records with ' + testCase.name + ' arguments', (done) => {
runServer({}, (server, finish) => {
var s1 = new sender.FluentSender('debug', { port: server.port });
runServer({}, serverOptions, (server, finish) => {
var s1 = new sender.FluentSender('debug', Object.assign({}, clientOptions, { port: server.port }));
s1.emit.apply(s1, testCase.args);

@@ -451,4 +460,4 @@

it('should send records with ' + testCase.name + ' arguments without a default tag', (done) => {
runServer({}, (server, finish) => {
var s1 = new sender.FluentSender(null, { port: server.port });
runServer({}, serverOptions, (server, finish) => {
var s1 = new sender.FluentSender(null, Object.assign({}, clientOptions, { port: server.port }));
s1.emit.apply(s1, testCase.args);

@@ -503,4 +512,4 @@

it('should not send records with ' + testCase.name + ' arguments without a default tag', (done) => {
runServer({}, (server, finish) => {
var s1 = new sender.FluentSender(null, { port: server.port });
runServer({}, serverOptions, (server, finish) => {
var s1 = new sender.FluentSender(null, Object.assign({}, clientOptions, { port: server.port }));
s1.on('error', (error) => {

@@ -527,4 +536,4 @@ expect(error.name).to.be.equal('MissingTagError');

it('should not send records is not object', (done) => {
runServer({}, (server, finish) => {
var s1 = new sender.FluentSender(null, { port: server.port });
runServer({}, serverOptions, (server, finish) => {
var s1 = new sender.FluentSender(null, Object.assign({}, clientOptions, { port: server.port }));
s1.on('error', (error) => {

@@ -542,3 +551,3 @@ expect(error.name).to.be.equal('DataTypeError');

it('should set max listeners', (done) => {
var s = new sender.FluentSender('debug');
var s = new sender.FluentSender('debug', clientOptions);
if (EventEmitter.prototype.getMaxListeners) {

@@ -558,4 +567,4 @@ expect(s.getMaxListeners()).to.be.equal(10);

it('should not flush queue if existing connection is unavailable.', (done) => {
runServer({}, (server, finish) => {
var s = new sender.FluentSender('debug', {port: server.port});
runServer({}, serverOptions, (server, finish) => {
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, {port: server.port}));
s.emit('1st record', { message: '1st data' }, () => {

@@ -577,4 +586,4 @@ s._disconnect();

it('should write stream.', (done) => {
runServer({}, (server, finish) => {
var s = new sender.FluentSender('debug', { port: server.port });
runServer({}, serverOptions, (server, finish) => {
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, { port: server.port }));
var ss = s.toStream('record');

@@ -602,4 +611,4 @@ var pt = new stream.PassThrough();

it('should process messages step by step on requireAckResponse=true', (done) => {
runServer({ requireAckResponse: true }, (server, finish) => {
var s = new sender.FluentSender('debug', {
runServer({ requireAckResponse: true }, serverOptions, (server, finish) => {
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, {
port: server.port,

@@ -609,3 +618,3 @@ timeout: 3.0,

requireAckResponse: true
});
}));
var errors = [];

@@ -636,3 +645,3 @@ s.on('error', (err) => {

it('should process entries when using PackedForward Mode', (done) => {
runServer({}, (server, finish) => {
runServer({}, serverOptions, (server, finish) => {
let loggerOptions = {

@@ -646,3 +655,3 @@ port: server.port,

};
var s = new sender.FluentSender('debug', loggerOptions);
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, loggerOptions));
s.emit('test', { message: 'This is test 0' });

@@ -664,3 +673,3 @@ s.end('test', { message: 'This is test 1' });

it('should compress entries when using CompressedPackedForward Mode', (done) => {
runServer({}, (server, finish) => {
runServer({}, serverOptions, (server, finish) => {
let loggerOptions = {

@@ -674,3 +683,3 @@ port: server.port,

};
var s = new sender.FluentSender('debug', loggerOptions);
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, loggerOptions));
s.emit('test', { message: 'This is test 0' });

@@ -699,3 +708,3 @@ s.emit('test', { message: 'This is test 1' });

};
runServer(options, (server, finish) => {
runServer(options, serverOptions, (server, finish) => {
let loggerOptions = {

@@ -712,3 +721,3 @@ port: server.port,

};
var s = new sender.FluentSender('debug', loggerOptions);
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, loggerOptions));
s.emit('test', { message: 'This is test 0' });

@@ -735,3 +744,3 @@ s.emit('test', { message: 'This is test 1' });

};
runServer(options, (server, finish) => {
runServer(options, serverOptions, (server, finish) => {
let loggerOptions = {

@@ -748,3 +757,3 @@ port: server.port,

};
var s = new sender.FluentSender('debug', loggerOptions);
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, loggerOptions));
s.on('error', (error) => {

@@ -771,3 +780,3 @@ expect(error.message).to.be.equal('Authentication failed: shared key mismatch');

};
runServer(options, (server, finish) => {
runServer(options, serverOptions, (server, finish) => {
let loggerOptions = {

@@ -786,3 +795,3 @@ port: server.port,

};
var s = new sender.FluentSender('debug', loggerOptions);
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, loggerOptions));
s.emit('test', { message: 'This is test 0' });

@@ -811,3 +820,3 @@ s.emit('test', { message: 'This is test 1' });

};
runServer(options, (server, finish) => {
runServer(options, serverOptions, (server, finish) => {
let loggerOptions = {

@@ -826,3 +835,3 @@ port: server.port,

};
var s = new sender.FluentSender('debug', loggerOptions);
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, loggerOptions));
s.on('error', (error) => {

@@ -848,3 +857,3 @@ expect(error.message).to.be.equal('Authentication failed: username/password mismatch');

};
runServer(options, (server, finish) => {
runServer(options, serverOptions, (server, finish) => {
let loggerOptions = {

@@ -861,3 +870,3 @@ port: server.port,

};
var s = new sender.FluentSender('debug', loggerOptions);
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, loggerOptions));
s.on('error', (err) => {

@@ -873,2 +882,10 @@ expect(err.message).to.be.equal('Authentication failed: reason');

});
};
describe('FluentSender', () => {
doTest();
});
describe('FluentSenderWithTLS', () => {
doTest(true);
});

@@ -20,3 +20,3 @@ 'use strict';

it('should send log records', (done) => {
runServer({}, (server, finish) => {
runServer({}, {}, (server, finish) => {
var logger = new (winston.Logger)({

@@ -23,0 +23,0 @@ transports: [

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Packages

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc