fluent-logger
Advanced tools
Comparing version 2.6.2 to 2.7.0
@@ -7,2 +7,3 @@ 'use strict'; | ||
var crypto = require('crypto'); | ||
var tls = require('tls'); | ||
var zlib = require('zlib'); | ||
@@ -27,2 +28,4 @@ var FluentLoggerError = require('./logger-error'); | ||
this.timeout = options.timeout || 3.0; | ||
this.tls = !!options.tls; | ||
this.tlsOptions = options.tlsOptions || {}; | ||
this.reconnectInterval = options.reconnectInterval || 600000; // Default is 10 minutes | ||
@@ -51,3 +54,3 @@ this.requireAckResponse = options.requireAckResponse; | ||
username: '', | ||
passwort: '' | ||
password: '' | ||
}; | ||
@@ -229,20 +232,39 @@ this.sharedKeySalt = crypto.randomBytes(16).toString('hex'); | ||
FluentSender.prototype._doConnect = function(callback) { | ||
this._socket = new net.Socket(); | ||
this._socket.setTimeout(this.timeout); | ||
this._socket.on('error', (err) => { | ||
if (this._socket) { | ||
this._disconnect(); | ||
this._handleEvent('error', err); | ||
let addHandlers = () => { | ||
let errorHandler = (err) => { | ||
if (this._socket) { | ||
this._disconnect(); | ||
this._handleEvent('error', err); | ||
} | ||
}; | ||
this._socket.on('error', errorHandler); | ||
this._socket.on('connect', () => { | ||
this._handleEvent('connect'); | ||
}); | ||
if (this.tls) { | ||
this._socket.on('tlsClientError', errorHandler); | ||
this._socket.on('secureConnect', () => { | ||
this._handleEvent('connect'); | ||
}); | ||
} | ||
}); | ||
this._socket.on('connect', () => { | ||
this._handleEvent('connect'); | ||
}); | ||
}; | ||
if (!this.tls) { | ||
this._socket = new net.Socket(); | ||
this._socket.setTimeout(this.timeout); | ||
addHandlers(); | ||
} | ||
if (this.path) { | ||
this._socket.connect(this.path, () => { | ||
callback(); | ||
}); | ||
if (this.tls) { | ||
this._socket = tls.connect(Object.assign({}, this.tlsOptions, { path: this.path }), () => { | ||
callback(); | ||
}); | ||
addHandlers(); | ||
} else { | ||
this._socket.connect(this.path, () => { | ||
callback(); | ||
}); | ||
} | ||
} else { | ||
this._socket.connect(this.port, this.host, () => { | ||
if (this.security.clientHostname && this.security.sharedKey) { | ||
let postConnect = () => { | ||
if (this.security.clientHostname && this.security.sharedKey !== null) { | ||
this._handshake(callback); | ||
@@ -253,3 +275,13 @@ } else { | ||
} | ||
}); | ||
}; | ||
if (this.tls) { | ||
this._socket = tls.connect(Object.assign({}, this.tlsOptions, { host: this.host, port: this.port }), () => { | ||
postConnect(); | ||
}); | ||
addHandlers(); | ||
} else { | ||
this._socket.connect(this.port, this.host, () => { | ||
postConnect(); | ||
}); | ||
} | ||
} | ||
@@ -256,0 +288,0 @@ }; |
'use strict'; | ||
var net = require('net'); | ||
var tls = require('tls'); | ||
var msgpack = require('msgpack-lite'); | ||
@@ -7,5 +8,6 @@ var crypto = require('crypto'); | ||
function MockFluentdServer(options) { | ||
function MockFluentdServer(options, tlsOptions) { | ||
this._port = null; | ||
this._options = options; | ||
this._tlsOptions = tlsOptions || {}; | ||
this._received = []; | ||
@@ -16,3 +18,3 @@ this._clients = {}; | ||
this._userAuthSalt = null; | ||
this._server = net.createServer((socket) => { | ||
let server = (socket) => { | ||
var clientKey = socket.remoteAddress + ':' + socket.remotePort; | ||
@@ -69,4 +71,11 @@ this._clients[clientKey] = socket; | ||
}); | ||
}); | ||
this._server.on('connection', (socket) => { | ||
}; | ||
var connectionEventType = 'connection'; | ||
if (this._tlsOptions.tls) { | ||
connectionEventType = 'secureConnection'; | ||
this._server = tls.createServer(this._tlsOptions, server); | ||
} else { | ||
this._server = net.createServer(server); | ||
} | ||
this._server.on(connectionEventType, (socket) => { | ||
if (this._options.security && this._options.security.sharedKey && this._options.security.serverHostname) { | ||
@@ -185,4 +194,4 @@ this._state = 'helo'; | ||
module.exports = { | ||
runServer: function(options, callback) { | ||
var server = new MockFluentdServer(options); | ||
runServer: function(options, tlsOptions, callback) { | ||
var server = new MockFluentdServer(options, tlsOptions); | ||
server.listen(() => { | ||
@@ -189,0 +198,0 @@ callback(server, (_callback) => { |
{ | ||
"name": "fluent-logger", | ||
"version": "2.6.2", | ||
"version": "2.7.0", | ||
"main": "./lib/index.js", | ||
@@ -42,3 +42,4 @@ "scripts": { | ||
"eslint": "^4.11.0", | ||
"eslint-plugin-node": "" | ||
"eslint-plugin-node": "", | ||
"selfsigned": "" | ||
}, | ||
@@ -45,0 +46,0 @@ "license": "Apache-2.0", |
@@ -75,3 +75,3 @@ # fluent-logger for Node.js | ||
```js | ||
var logger = require('fluent-logger').createFluentSender('tag_prefix', { | ||
var logger = require('fluent-logger').createFluentSender('dummy', { | ||
host: 'localhost', | ||
@@ -108,2 +108,50 @@ port: 24224, | ||
### TLS/SSL encryption | ||
Logger configuration: | ||
```js | ||
var logger = require('fluent-logger').createFluentSender('dummy', { | ||
host: 'localhost', | ||
port: 24224, | ||
timeout: 3.0, | ||
reconnectInterval: 600000, // 10 minutes | ||
security: { | ||
clientHostname: "client.localdomain", | ||
sharedKey: "secure_communication_is_awesome" | ||
}, | ||
tls: true, | ||
tlsOptions: { | ||
ca: fs.readFileSync('/path/to/ca_cert.pem') | ||
} | ||
}); | ||
logger.emit('debug', { message: 'This is a message' }); | ||
``` | ||
Server configuration: | ||
```aconf | ||
<source> | ||
@type forward | ||
port 24224 | ||
<transport tls> | ||
ca_cert_path /path/to/ca_cert.pem | ||
ca_private_key_path /path/to/ca_key.pem | ||
ca_private_key_passphrase very_secret_passphrase | ||
</transport> | ||
<security> | ||
self_hostname input.testing.local | ||
shared_key secure_communication_is_awesome | ||
</security> | ||
</source> | ||
<match dummy.*> | ||
@type stdout | ||
</match> | ||
``` | ||
FYI: You can generate certificates using fluent-ca-generate command since Fluentd 1.1.0. | ||
See also [How to enable TLS/SSL encryption](https://docs.fluentd.org/v1.0/articles/in_forward#how-to-enable-tls/ssl-encryption). | ||
### EventTime support | ||
@@ -273,2 +321,14 @@ | ||
**tls** | ||
Enable TLS for socket. | ||
**tlsOptions** | ||
Options to pass to tls.connect when tls is true. | ||
For more details, see following documents | ||
* [tls.connect()](https://nodejs.org/api/tls.html#tls_tls_connect_options_callback) | ||
* [tls.createSecureContext()](https://nodejs.org/api/tls.html#tls_tls_createsecurecontext_options) | ||
**internalLogger** | ||
@@ -275,0 +335,0 @@ |
@@ -19,6 +19,15 @@ 'use strict'; | ||
describe('FluentSender', () => { | ||
let doTest = (tls) => { | ||
let serverOptions = {}; | ||
let clientOptions = {}; | ||
if (tls) { | ||
var selfsigned = require('selfsigned'); | ||
var attrs = [{ name: 'commonName', value: 'foo.com' }]; | ||
var pems = selfsigned.generate(attrs, { days: 365 }); | ||
serverOptions = { tls: true, key: pems.private, cert: pems.cert, ca: pems.cert }; | ||
clientOptions = { tls: true, tlsOptions: { rejectUnauthorized: false } }; | ||
} | ||
it('should throw error', (done) => { | ||
try { | ||
new sender.FluentSender('debug', { eventMode: 'Unknown' }); | ||
new sender.FluentSender('debug', Object.assign({}, clientOptions, { eventMode: 'Unknown' })); | ||
} catch (e) { | ||
@@ -31,4 +40,4 @@ expect(e.message).to.be.equal('Unknown event mode: Unknown'); | ||
it('should send records', (done) => { | ||
runServer({}, (server, finish) => { | ||
var s1 = new sender.FluentSender('debug', { port: server.port }); | ||
runServer({}, serverOptions, (server, finish) => { | ||
var s1 = new sender.FluentSender('debug', Object.assign({}, clientOptions, { port: server.port })); | ||
var emits = []; | ||
@@ -56,4 +65,4 @@ function emit(k) { | ||
it('should emit connect event', (done) => { | ||
runServer({}, (server, finish) => { | ||
var s = new sender.FluentSender('debug', {port: server.port}); | ||
runServer({}, serverOptions, (server, finish) => { | ||
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, {port: server.port})); | ||
var called = false; | ||
@@ -73,6 +82,6 @@ s.on('connect', () => { | ||
it('should raise error when connection fails', (done) => { | ||
var s = new sender.FluentSender('debug', { | ||
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, { | ||
host: 'localhost', | ||
port: 65535 | ||
}); | ||
})); | ||
s.on('error', (err) => { | ||
@@ -98,7 +107,7 @@ expect(err.code).to.be.equal('ECONNREFUSED'); | ||
}; | ||
var s = new sender.FluentSender('debug', { | ||
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, { | ||
host: 'localhost', | ||
port: 65535, | ||
internalLogger: logger | ||
}); | ||
})); | ||
s._setupErrorHandler((timeoutId) => { | ||
@@ -117,4 +126,4 @@ expect(logger.buffer.info).to.have.lengthOf(1); | ||
it('should assure the sequence.', (done) => { | ||
runServer({}, (server, finish) => { | ||
var s = new sender.FluentSender('debug', {port: server.port}); | ||
runServer({}, serverOptions, (server, finish) => { | ||
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, {port: server.port})); | ||
s.emit('1st record', { message: '1st data' }); | ||
@@ -137,4 +146,4 @@ s.emit('2nd record', { message: '2nd data' }); | ||
it('should allow to emit with a custom timestamp', (done) => { | ||
runServer({}, (server, finish) => { | ||
var s = new sender.FluentSender('debug', {port: server.port}); | ||
runServer({}, serverOptions, (server, finish) => { | ||
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, {port: server.port})); | ||
var timestamp = new Date(2222, 12, 4); | ||
@@ -153,4 +162,4 @@ var timestamp_seconds_since_epoch = Math.floor(timestamp.getTime() / 1000); | ||
it('should allow to emit with a custom numeric timestamp', (done) => { | ||
runServer({}, (server, finish) => { | ||
var s = new sender.FluentSender('debug', {port: server.port}); | ||
runServer({}, serverOptions, (server, finish) => { | ||
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, {port: server.port})); | ||
var timestamp = Math.floor(new Date().getTime() / 1000); | ||
@@ -168,4 +177,4 @@ | ||
it('should allow to emit with a EventTime', (done) => { | ||
runServer({}, (server, finish) => { | ||
var s = new sender.FluentSender('debug', {port: server.port}); | ||
runServer({}, serverOptions, (server, finish) => { | ||
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, {port: server.port})); | ||
var eventTime = EventTime.now(); | ||
@@ -184,7 +193,7 @@ | ||
it('should resume the connection automatically and flush the queue', (done) => { | ||
var s = new sender.FluentSender('debug'); | ||
var s = new sender.FluentSender('debug', clientOptions); | ||
s.emit('1st record', { message: '1st data' }); | ||
s.on('error', (err) => { | ||
expect(err.code).to.be.equal('ECONNREFUSED'); | ||
runServer({}, (server, finish) => { | ||
runServer({}, serverOptions, (server, finish) => { | ||
s.port = server.port; | ||
@@ -208,4 +217,4 @@ s.emit('2nd record', { message: '2nd data' }); | ||
it('should reconnect when fluentd close the client socket suddenly', (done) => { | ||
runServer({}, (server, finish) => { | ||
var s = new sender.FluentSender('debug', {port: server.port}); | ||
runServer({}, serverOptions, (server, finish) => { | ||
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, {port: server.port})); | ||
s.emit('foo', 'bar', () => { | ||
@@ -217,3 +226,3 @@ // connected | ||
if (!(s._socket && s._socket.writable)) { | ||
runServer({}, (_server2, finish) => { | ||
runServer({}, serverOptions, (_server2, finish) => { | ||
s.port = _server2.port; // in actuall case, s.port does not need to be updated. | ||
@@ -240,7 +249,7 @@ s.emit('bar', { message: 'hoge' }, () => { | ||
it('should send records with requireAckResponse', (done) => { | ||
runServer({requireAckResponse: true}, (server, finish) => { | ||
var s1 = new sender.FluentSender('debug', { | ||
runServer({requireAckResponse: true}, serverOptions, (server, finish) => { | ||
var s1 = new sender.FluentSender('debug', Object.assign({}, clientOptions, { | ||
port: server.port, | ||
requireAckResponse: true | ||
}); | ||
})); | ||
var emits = []; | ||
@@ -269,8 +278,8 @@ function emit(k) { | ||
it('should send records ackResponseTimeout', (done) => { | ||
runServer({requireAckResponse: false }, (server, finish) => { | ||
var s1 = new sender.FluentSender('debug', { | ||
runServer({requireAckResponse: false }, serverOptions, (server, finish) => { | ||
var s1 = new sender.FluentSender('debug', Object.assign({}, clientOptions, { | ||
port: server.port, | ||
requireAckResponse: false, | ||
ackResponseTimeout: 1000 | ||
}); | ||
})); | ||
s1.on('response-timeout', (error) => { | ||
@@ -288,5 +297,5 @@ expect(error).to.be.equal('ack response timeout'); | ||
it('should set error handler', (done) => { | ||
var s = new sender.FluentSender('debug', { | ||
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, { | ||
reconnectInterval: 100 | ||
}); | ||
})); | ||
expect(s._eventEmitter.listeners('error').length).to.be.equal(0); | ||
@@ -386,4 +395,4 @@ s._setupErrorHandler(); | ||
it('should send records with ' + testCase.name + ' arguments', (done) => { | ||
runServer({}, (server, finish) => { | ||
var s1 = new sender.FluentSender('debug', { port: server.port }); | ||
runServer({}, serverOptions, (server, finish) => { | ||
var s1 = new sender.FluentSender('debug', Object.assign({}, clientOptions, { port: server.port })); | ||
s1.emit.apply(s1, testCase.args); | ||
@@ -451,4 +460,4 @@ | ||
it('should send records with ' + testCase.name + ' arguments without a default tag', (done) => { | ||
runServer({}, (server, finish) => { | ||
var s1 = new sender.FluentSender(null, { port: server.port }); | ||
runServer({}, serverOptions, (server, finish) => { | ||
var s1 = new sender.FluentSender(null, Object.assign({}, clientOptions, { port: server.port })); | ||
s1.emit.apply(s1, testCase.args); | ||
@@ -503,4 +512,4 @@ | ||
it('should not send records with ' + testCase.name + ' arguments without a default tag', (done) => { | ||
runServer({}, (server, finish) => { | ||
var s1 = new sender.FluentSender(null, { port: server.port }); | ||
runServer({}, serverOptions, (server, finish) => { | ||
var s1 = new sender.FluentSender(null, Object.assign({}, clientOptions, { port: server.port })); | ||
s1.on('error', (error) => { | ||
@@ -527,4 +536,4 @@ expect(error.name).to.be.equal('MissingTagError'); | ||
it('should not send records is not object', (done) => { | ||
runServer({}, (server, finish) => { | ||
var s1 = new sender.FluentSender(null, { port: server.port }); | ||
runServer({}, serverOptions, (server, finish) => { | ||
var s1 = new sender.FluentSender(null, Object.assign({}, clientOptions, { port: server.port })); | ||
s1.on('error', (error) => { | ||
@@ -542,3 +551,3 @@ expect(error.name).to.be.equal('DataTypeError'); | ||
it('should set max listeners', (done) => { | ||
var s = new sender.FluentSender('debug'); | ||
var s = new sender.FluentSender('debug', clientOptions); | ||
if (EventEmitter.prototype.getMaxListeners) { | ||
@@ -558,4 +567,4 @@ expect(s.getMaxListeners()).to.be.equal(10); | ||
it('should not flush queue if existing connection is unavailable.', (done) => { | ||
runServer({}, (server, finish) => { | ||
var s = new sender.FluentSender('debug', {port: server.port}); | ||
runServer({}, serverOptions, (server, finish) => { | ||
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, {port: server.port})); | ||
s.emit('1st record', { message: '1st data' }, () => { | ||
@@ -577,4 +586,4 @@ s._disconnect(); | ||
it('should write stream.', (done) => { | ||
runServer({}, (server, finish) => { | ||
var s = new sender.FluentSender('debug', { port: server.port }); | ||
runServer({}, serverOptions, (server, finish) => { | ||
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, { port: server.port })); | ||
var ss = s.toStream('record'); | ||
@@ -602,4 +611,4 @@ var pt = new stream.PassThrough(); | ||
it('should process messages step by step on requireAckResponse=true', (done) => { | ||
runServer({ requireAckResponse: true }, (server, finish) => { | ||
var s = new sender.FluentSender('debug', { | ||
runServer({ requireAckResponse: true }, serverOptions, (server, finish) => { | ||
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, { | ||
port: server.port, | ||
@@ -609,3 +618,3 @@ timeout: 3.0, | ||
requireAckResponse: true | ||
}); | ||
})); | ||
var errors = []; | ||
@@ -636,3 +645,3 @@ s.on('error', (err) => { | ||
it('should process entries when using PackedForward Mode', (done) => { | ||
runServer({}, (server, finish) => { | ||
runServer({}, serverOptions, (server, finish) => { | ||
let loggerOptions = { | ||
@@ -646,3 +655,3 @@ port: server.port, | ||
}; | ||
var s = new sender.FluentSender('debug', loggerOptions); | ||
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, loggerOptions)); | ||
s.emit('test', { message: 'This is test 0' }); | ||
@@ -664,3 +673,3 @@ s.end('test', { message: 'This is test 1' }); | ||
it('should compress entries when using CompressedPackedForward Mode', (done) => { | ||
runServer({}, (server, finish) => { | ||
runServer({}, serverOptions, (server, finish) => { | ||
let loggerOptions = { | ||
@@ -674,3 +683,3 @@ port: server.port, | ||
}; | ||
var s = new sender.FluentSender('debug', loggerOptions); | ||
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, loggerOptions)); | ||
s.emit('test', { message: 'This is test 0' }); | ||
@@ -699,3 +708,3 @@ s.emit('test', { message: 'This is test 1' }); | ||
}; | ||
runServer(options, (server, finish) => { | ||
runServer(options, serverOptions, (server, finish) => { | ||
let loggerOptions = { | ||
@@ -712,3 +721,3 @@ port: server.port, | ||
}; | ||
var s = new sender.FluentSender('debug', loggerOptions); | ||
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, loggerOptions)); | ||
s.emit('test', { message: 'This is test 0' }); | ||
@@ -735,3 +744,3 @@ s.emit('test', { message: 'This is test 1' }); | ||
}; | ||
runServer(options, (server, finish) => { | ||
runServer(options, serverOptions, (server, finish) => { | ||
let loggerOptions = { | ||
@@ -748,3 +757,3 @@ port: server.port, | ||
}; | ||
var s = new sender.FluentSender('debug', loggerOptions); | ||
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, loggerOptions)); | ||
s.on('error', (error) => { | ||
@@ -771,3 +780,3 @@ expect(error.message).to.be.equal('Authentication failed: shared key mismatch'); | ||
}; | ||
runServer(options, (server, finish) => { | ||
runServer(options, serverOptions, (server, finish) => { | ||
let loggerOptions = { | ||
@@ -786,3 +795,3 @@ port: server.port, | ||
}; | ||
var s = new sender.FluentSender('debug', loggerOptions); | ||
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, loggerOptions)); | ||
s.emit('test', { message: 'This is test 0' }); | ||
@@ -811,3 +820,3 @@ s.emit('test', { message: 'This is test 1' }); | ||
}; | ||
runServer(options, (server, finish) => { | ||
runServer(options, serverOptions, (server, finish) => { | ||
let loggerOptions = { | ||
@@ -826,3 +835,3 @@ port: server.port, | ||
}; | ||
var s = new sender.FluentSender('debug', loggerOptions); | ||
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, loggerOptions)); | ||
s.on('error', (error) => { | ||
@@ -848,3 +857,3 @@ expect(error.message).to.be.equal('Authentication failed: username/password mismatch'); | ||
}; | ||
runServer(options, (server, finish) => { | ||
runServer(options, serverOptions, (server, finish) => { | ||
let loggerOptions = { | ||
@@ -861,3 +870,3 @@ port: server.port, | ||
}; | ||
var s = new sender.FluentSender('debug', loggerOptions); | ||
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, loggerOptions)); | ||
s.on('error', (err) => { | ||
@@ -873,2 +882,10 @@ expect(err.message).to.be.equal('Authentication failed: reason'); | ||
}); | ||
}; | ||
describe('FluentSender', () => { | ||
doTest(); | ||
}); | ||
describe('FluentSenderWithTLS', () => { | ||
doTest(true); | ||
}); |
@@ -20,3 +20,3 @@ 'use strict'; | ||
it('should send log records', (done) => { | ||
runServer({}, (server, finish) => { | ||
runServer({}, {}, (server, finish) => { | ||
var logger = new (winston.Logger)({ | ||
@@ -23,0 +23,0 @@ transports: [ |
Sorry, the diff of this file is not supported yet
Network access
Supply chain riskThis module accesses the network.
Found 1 instance in 1 package
112612
1816
346
7
4