fluent-logger
Advanced tools
Comparing version 2.8.0 to 3.0.0
@@ -60,2 +60,5 @@ module.exports = { | ||
], | ||
"no-var": [ | ||
"error" | ||
], | ||
"node/exports-style": [ | ||
@@ -62,0 +65,0 @@ "error", |
'use strict'; | ||
var EventTime = function EventTime(epoch, nano) { | ||
this.epoch = epoch; | ||
this.nano = nano; | ||
}; | ||
module.exports = class EventTime { | ||
constructor(epoch, nano) { | ||
this.epoch = epoch; | ||
this.nano = nano; | ||
} | ||
EventTime.pack = function eventTimePack(eventTime) { | ||
var b = Buffer.alloc(8); | ||
b.writeUInt32BE(eventTime.epoch, 0); | ||
b.writeUInt32BE(eventTime.nano, 4); | ||
return b; | ||
}; | ||
static pack(eventTime) { | ||
const b = Buffer.alloc(8); | ||
b.writeUInt32BE(eventTime.epoch, 0); | ||
b.writeUInt32BE(eventTime.nano, 4); | ||
return b; | ||
} | ||
EventTime.unpack = function eventTimeUnpack(buffer) { | ||
var e = buffer.readUInt32BE(0); | ||
var n = buffer.readUInt32BE(4); | ||
return new EventTime(e, n); | ||
}; | ||
static unpack(buffer) { | ||
const e = buffer.readUInt32BE(0); | ||
const n = buffer.readUInt32BE(4); | ||
return new EventTime(e, n); | ||
} | ||
EventTime.now = function now() { | ||
var now = Date.now(); | ||
return EventTime.fromTimestamp(now); | ||
}; | ||
static now() { | ||
const now = Date.now(); | ||
return EventTime.fromTimestamp(now); | ||
} | ||
EventTime.fromDate = function fromDate(date) { | ||
var t = date.getTime(); | ||
return EventTime.fromTimestamp(t); | ||
}; | ||
static fromDate(date) { | ||
const t = date.getTime(); | ||
return EventTime.fromTimestamp(t); | ||
} | ||
EventTime.fromTimestamp = function fromTimestamp(t) { | ||
var epoch = Math.floor(t / 1000); | ||
var nano = (t - epoch * 1000) * 1000000; | ||
return new EventTime(epoch, nano); | ||
static fromTimestamp(t) { | ||
const epoch = Math.floor(t / 1000); | ||
const nano = (t - epoch * 1000) * 1000000; | ||
return new EventTime(epoch, nano); | ||
} | ||
}; | ||
module.exports = {}; | ||
module.exports.EventTime = EventTime; |
'use strict'; | ||
var FluentSender = require('./sender').FluentSender; | ||
var sender = new FluentSender('debug'); | ||
var EventTime = require('./event-time').EventTime; | ||
const FluentSender = require('./sender'); | ||
const EventTime = require('./event-time'); | ||
let sender = new FluentSender('debug'); | ||
module.exports = { | ||
configure: function(config) { | ||
configure: function(tag, options) { | ||
sender.end(); | ||
var tag = config; | ||
var options = arguments[1]; | ||
sender = new FluentSender(tag, options); | ||
@@ -17,3 +15,3 @@ sender._setupErrorHandler(); | ||
createFluentSender: function(tag, options) { | ||
var _sender = new FluentSender(tag, options); | ||
const _sender = new FluentSender(tag, options); | ||
_sender._setupErrorHandler(); | ||
@@ -25,3 +23,3 @@ return _sender; | ||
winstonTransport: function() { | ||
var transport = require('../lib/winston'); | ||
const transport = require('../lib/winston'); | ||
return transport; | ||
@@ -35,7 +33,7 @@ } | ||
// delegate logger interfaces to default sender object | ||
var methods = ['emit', 'end', 'addListener', 'on', 'once', 'removeListener', 'removeAllListeners', 'setMaxListeners', 'getMaxListeners']; | ||
const methods = ['emit', 'end', 'addListener', 'on', 'once', 'removeListener', 'removeAllListeners', 'setMaxListeners', 'getMaxListeners']; | ||
methods.forEach((attr, i) => { | ||
module.exports[attr] = function() { | ||
if (sender) { | ||
return sender[attr].apply(sender, Array.prototype.slice.call(arguments)); | ||
return sender[attr].apply(sender, Array.from(arguments)); | ||
} | ||
@@ -42,0 +40,0 @@ return undefined; |
'use strict'; | ||
var util = require('util'); | ||
module.exports = {}; | ||
var BaseError = function BaseError(message, options) { | ||
Error.captureStackTrace(this, this.constructor); | ||
this.name = this.constructor.name; | ||
this.message = message; | ||
this.options = options; | ||
}; | ||
util.inherits(BaseError, Error); | ||
class BaseError extends Error { | ||
constructor(message, options) { | ||
super(); | ||
Error.captureStackTrace(this, this.constructor); | ||
this.name = this.constructor.name; | ||
this.message = message; | ||
this.options = options; | ||
} | ||
} | ||
var ConfigError = function ConfigError(message, options) { | ||
ConfigError.super_.call(this, message, options); | ||
}; | ||
util.inherits(ConfigError, BaseError); | ||
class ConfigError extends BaseError { | ||
constructor(message, options) { | ||
super(message, options); | ||
} | ||
} | ||
var MissingTagError = function MissingTagError(message, options) { | ||
MissingTagError.super_.call(this, message, options); | ||
}; | ||
util.inherits(MissingTagError, BaseError); | ||
class MissingTagError extends BaseError { | ||
constructor(message, options) { | ||
super(message, options); | ||
} | ||
} | ||
var ResponseError = function ResponseError(message, options) { | ||
MissingTagError.super_.call(this, message, options); | ||
}; | ||
util.inherits(ResponseError, BaseError); | ||
class ResponseError extends BaseError { | ||
constructor(message, options) { | ||
super(message, options); | ||
} | ||
} | ||
var ResponseTimeoutError = function ResponseTimeoutError(message, options) { | ||
ResponseTimeoutError.super_.call(this, message, options); | ||
}; | ||
util.inherits(ResponseTimeoutError, BaseError); | ||
class ResponseTimeoutError extends BaseError { | ||
constructor(message, options) { | ||
super(message, options); | ||
} | ||
} | ||
var DataTypeError = function DataTypeError(message, options) { | ||
DataTypeError.super_.call(this, message, options); | ||
}; | ||
util.inherits(DataTypeError, BaseError); | ||
class DataTypeError extends BaseError { | ||
constructor(message, options) { | ||
super(message, options); | ||
} | ||
} | ||
var HandshakeError = function HandshakeError(message, options) { | ||
HandshakeError.super_.call(this, message, options); | ||
}; | ||
util.inherits(HandshakeError, BaseError); | ||
class HandshakeError extends BaseError { | ||
constructor(message, options) { | ||
super(message, options); | ||
} | ||
} | ||
@@ -45,0 +51,0 @@ module.exports = { |
'use strict'; | ||
var EventEmitter = require('events').EventEmitter; | ||
var msgpack = require('msgpack-lite'); | ||
var net = require('net'); | ||
var stream = require('stream'); | ||
var crypto = require('crypto'); | ||
var tls = require('tls'); | ||
var zlib = require('zlib'); | ||
var FluentLoggerError = require('./logger-error'); | ||
var EventTime = require('./event-time').EventTime; | ||
const EventEmitter = require('events').EventEmitter; | ||
const msgpack = require('msgpack-lite'); | ||
const net = require('net'); | ||
const stream = require('stream'); | ||
const crypto = require('crypto'); | ||
const tls = require('tls'); | ||
const zlib = require('zlib'); | ||
const FluentLoggerError = require('./logger-error'); | ||
const EventTime = require('./event-time'); | ||
var codec = msgpack.createCodec(); | ||
const codec = msgpack.createCodec(); | ||
codec.addExtPacker(0x00, EventTime, EventTime.pack); | ||
codec.addExtUnpacker(0x00, EventTime.unpack); | ||
function FluentSender(tag_prefix, options) { | ||
options = options || {}; | ||
this._eventMode = options.eventMode || 'Message'; // Message, PackedForward, CompressedPackedForward | ||
if (!/^Message|PackedForward|CompressedPackedForward$/.test(this._eventMode)) { | ||
throw new FluentLoggerError.ConfigError('Unknown event mode: ' + this._eventMode); | ||
class FluentSender { | ||
constructor(tag_prefix, options) { | ||
options = options || {}; | ||
this._eventMode = options.eventMode || 'Message'; // Message, PackedForward, CompressedPackedForward | ||
if (!/^Message|PackedForward|CompressedPackedForward$/.test(this._eventMode)) { | ||
throw new FluentLoggerError.ConfigError('Unknown event mode: ' + this._eventMode); | ||
} | ||
this.tag_prefix = tag_prefix; | ||
this.host = options.host || 'localhost'; | ||
this.port = options.port || 24224; | ||
this.path = options.path; | ||
this.timeout = options.timeout || 3.0; | ||
this.tls = !!options.tls; | ||
this.tlsOptions = options.tlsOptions || {}; | ||
this.reconnectInterval = options.reconnectInterval || 600000; // Default is 10 minutes | ||
this.requireAckResponse = options.requireAckResponse; | ||
this.ackResponseTimeout = options.ackResponseTimeout || 190000; // Default is 190 seconds | ||
this.internalLogger = options.internalLogger || console; | ||
this._timeResolution = options.milliseconds ? 1 : 1000; | ||
this._socket = null; | ||
if (this._eventMode === 'Message') { | ||
this._sendQueue = []; // queue for items waiting for being sent. | ||
this._flushInterval = 0; | ||
} else { | ||
this._sendQueue = new Map(); | ||
this._flushInterval = options.flushInterval || 100; | ||
this._sendQueueSizeLimit = 8 * 1024 * 1024; // 8MiB | ||
this._sendQueueSize = 0; | ||
this._flushSendQueueTimeoutId = null; | ||
this._compressed = (this._eventMode === 'CompressedPackedForward'); | ||
} | ||
this._eventEmitter = new EventEmitter(); | ||
// options.security = { clientHostname: "client.localdomain", sharedKey: "very-secret-shared-key" } | ||
this.security = options.security || { | ||
clientHostname: null, | ||
sharedKey: null, | ||
username: '', | ||
password: '' | ||
}; | ||
this.sharedKeySalt = crypto.randomBytes(16).toString('hex'); | ||
// helo, pingpong, established | ||
this._status = null; | ||
this._connecting = false; | ||
} | ||
this.tag_prefix = tag_prefix; | ||
this.host = options.host || 'localhost'; | ||
this.port = options.port || 24224; | ||
this.path = options.path; | ||
this.timeout = options.timeout || 3.0; | ||
this.tls = !!options.tls; | ||
this.tlsOptions = options.tlsOptions || {}; | ||
this.reconnectInterval = options.reconnectInterval || 600000; // Default is 10 minutes | ||
this.requireAckResponse = options.requireAckResponse; | ||
this.ackResponseTimeout = options.ackResponseTimeout || 190000; // Default is 190 seconds | ||
this.internalLogger = options.internalLogger || console; | ||
this._timeResolution = options.milliseconds ? 1 : 1000; | ||
this._socket = null; | ||
if (this._eventMode === 'Message') { | ||
this._sendQueue = []; // queue for items waiting for being sent. | ||
this._flushInterval = 0; | ||
} else { | ||
this._sendQueue = new Map(); | ||
this._flushInterval = options.flushInterval || 100; | ||
this._sendQueueSizeLimit = 8 * 1024 * 1024; // 8MiB | ||
this._sendQueueSize = 0; | ||
this._flushSendQueueTimeoutId = null; | ||
this._compressed = (this._eventMode === 'CompressedPackedForward'); | ||
} | ||
this._eventEmitter = new EventEmitter(); | ||
// options.security = { clientHostname: "client.localdomain", sharedKey: "very-secret-shared-key" } | ||
this.security = options.security || { | ||
clientHostname: null, | ||
sharedKey: null, | ||
username: '', | ||
password: '' | ||
}; | ||
this.sharedKeySalt = crypto.randomBytes(16).toString('hex'); | ||
// helo, pingpong, established | ||
this._status = null; | ||
this._connecting = false; | ||
} | ||
FluentSender.prototype.emit = function(/*[label] <data>, [timestamp], [callback] */) { | ||
var label, data, timestamp, callback; | ||
var args = Array.prototype.slice.call(arguments); | ||
// Label must be string always | ||
if (typeof args[0] === 'string') label = args.shift(); | ||
emit(/*[label] <data>, [timestamp], [callback] */) { | ||
let label, data, timestamp, callback; | ||
let args = Array.from(arguments); | ||
// Label must be string always | ||
if (typeof args[0] === 'string') { | ||
label = args.shift(); | ||
} | ||
// Data can be almost anything | ||
data = args.shift(); | ||
// Data can be almost anything | ||
data = args.shift(); | ||
// Date can be either timestamp number or Date object | ||
if (typeof args[0] !== 'function') timestamp = args.shift(); | ||
// Date can be either timestamp number or Date object | ||
if (typeof args[0] !== 'function') { | ||
timestamp = args.shift(); | ||
} | ||
// Last argument is an optional callback | ||
if (typeof args[0] === 'function') callback = args.shift(); | ||
// Last argument is an optional callback | ||
if (typeof args[0] === 'function') { | ||
callback = args.shift(); | ||
} | ||
let tag = this._makeTag(label); | ||
let error; | ||
let options; | ||
if (tag === null) { | ||
options = { | ||
tag_prefix: this.tag_prefix, | ||
label: label | ||
}; | ||
error = new FluentLoggerError.MissingTag('tag is missing', options); | ||
this._handleEvent('error', error, callback); | ||
return; | ||
} | ||
if (typeof data !== 'object') { | ||
options = { | ||
tag_prefix: this.tag_prefix, | ||
label: label, | ||
record: data | ||
}; | ||
error = new FluentLoggerError.DataTypeError('data must be an object', options); | ||
this._handleEvent('error', error, callback); | ||
return; | ||
} | ||
const tag = this._makeTag(label); | ||
let error; | ||
let options; | ||
if (tag === null) { | ||
options = { | ||
tag_prefix: this.tag_prefix, | ||
label: label | ||
}; | ||
error = new FluentLoggerError.MissingTag('tag is missing', options); | ||
this._handleEvent('error', error, callback); | ||
return; | ||
} | ||
if (typeof data !== 'object') { | ||
options = { | ||
tag_prefix: this.tag_prefix, | ||
label: label, | ||
record: data | ||
}; | ||
error = new FluentLoggerError.DataTypeError('data must be an object', options); | ||
this._handleEvent('error', error, callback); | ||
return; | ||
} | ||
this._push(tag, timestamp, data, callback); | ||
this._connect(() => { | ||
this._flushSendQueue(); | ||
}); | ||
}; | ||
['addListener', 'on', 'once', 'removeListener', 'removeAllListeners', 'setMaxListeners', 'getMaxListeners'].forEach((attr, i) => { | ||
FluentSender.prototype[attr] = function() { | ||
return this._eventEmitter[attr].apply(this._eventEmitter, Array.prototype.slice.call(arguments)); | ||
}; | ||
}); | ||
FluentSender.prototype.end = function(label, data, callback) { | ||
if ((label != null && data != null)) { | ||
this.emit(label, data, (err) => { | ||
this._close(); | ||
if (err) { | ||
this._handleEvent('error', err, callback); | ||
} else { | ||
callback && callback(); | ||
} | ||
this._push(tag, timestamp, data, callback); | ||
this._connect(() => { | ||
this._flushSendQueue(); | ||
}); | ||
} else { | ||
process.nextTick(() => { | ||
this._close(); | ||
callback && callback(); | ||
}); | ||
} | ||
}; | ||
FluentSender.prototype._close = function() { | ||
if (this._socket) { | ||
this._socket.end(); | ||
this._socket = null; | ||
this._status = null; | ||
end(label, data, callback) { | ||
if ((label != null && data != null)) { | ||
this.emit(label, data, (err) => { | ||
this._close(); | ||
if (err) { | ||
this._handleEvent('error', err, callback); | ||
} else { | ||
callback && callback(); | ||
} | ||
}); | ||
} else { | ||
process.nextTick(() => { | ||
this._close(); | ||
callback && callback(); | ||
}); | ||
} | ||
} | ||
}; | ||
FluentSender.prototype._makeTag = function(label) { | ||
let tag = null; | ||
if (this.tag_prefix && label) { | ||
tag = [this.tag_prefix, label].join('.'); | ||
} else if (this.tag_prefix) { | ||
tag = this.tag_prefix; | ||
} else if (label) { | ||
tag = label; | ||
_close() { | ||
if (this._socket) { | ||
this._socket.end(); | ||
this._socket = null; | ||
this._status = null; | ||
} | ||
} | ||
return tag; | ||
}; | ||
FluentSender.prototype._makePacketItem = function(tag, time, data) { | ||
if (typeof time !== 'number' && !(time instanceof EventTime)) { | ||
time = Math.floor((time ? time.getTime() : Date.now()) / this._timeResolution); | ||
_makeTag(label) { | ||
let tag = null; | ||
if (this.tag_prefix && label) { | ||
tag = [this.tag_prefix, label].join('.'); | ||
} else if (this.tag_prefix) { | ||
tag = this.tag_prefix; | ||
} else if (label) { | ||
tag = label; | ||
} | ||
return tag; | ||
} | ||
var packet = [tag, time, data]; | ||
var options = {}; | ||
if (this.requireAckResponse) { | ||
options = { | ||
chunk: crypto.randomBytes(16).toString('base64') | ||
_makePacketItem(tag, time, data) { | ||
if (typeof time !== 'number' && !(time instanceof EventTime)) { | ||
time = Math.floor((time ? time.getTime() : Date.now()) / this._timeResolution); | ||
} | ||
const packet = [tag, time, data]; | ||
let options = {}; | ||
if (this.requireAckResponse) { | ||
options = { | ||
chunk: crypto.randomBytes(16).toString('base64') | ||
}; | ||
packet.push(options); | ||
} | ||
return { | ||
packet: msgpack.encode(packet, { codec: codec }), | ||
tag: tag, | ||
time: time, | ||
data: data, | ||
options: options | ||
}; | ||
packet.push(options); | ||
} | ||
return { | ||
packet: msgpack.encode(packet, { codec: codec }), | ||
tag: tag, | ||
time: time, | ||
data: data, | ||
options: options | ||
}; | ||
}; | ||
FluentSender.prototype._makeEventEntry = function(time, data) { | ||
if (typeof time !== 'number' && !(time instanceof EventTime)) { | ||
time = Math.floor((time ? time.getTime() : Date.now()) / this._timeResolution); | ||
_makeEventEntry(time, data) { | ||
if (typeof time !== 'number' && !(time instanceof EventTime)) { | ||
time = Math.floor((time ? time.getTime() : Date.now()) / this._timeResolution); | ||
} | ||
return msgpack.encode([time, data], { codec: codec }); | ||
} | ||
return msgpack.encode([time, data], { codec: codec }); | ||
}; | ||
FluentSender.prototype._push = function(tag, time, data, callback) { | ||
if (this._eventMode === 'Message') { | ||
// Message mode | ||
let item = this._makePacketItem(tag, time, data); | ||
item.callback = callback; | ||
this._sendQueue.push(item); | ||
} else { | ||
// PackedForward mode | ||
let eventEntry = this._makeEventEntry(time, data); | ||
this._sendQueueSize += eventEntry.length; | ||
if (this._sendQueue.has(tag)) { | ||
let eventEntryData = this._sendQueue.get(tag); | ||
eventEntryData.eventEntry = Buffer.concat([eventEntryData.eventEntry, eventEntry]); | ||
eventEntryData.callbacks.push(callback); | ||
this._sendQueue.set(tag, eventEntryData); | ||
_push(tag, time, data, callback) { | ||
if (this._eventMode === 'Message') { | ||
// Message mode | ||
let item = this._makePacketItem(tag, time, data); | ||
item.callback = callback; | ||
this._sendQueue.push(item); | ||
} else { | ||
this._sendQueue.set(tag, { eventEntry: eventEntry, callbacks: [callback] }); | ||
// PackedForward mode | ||
const eventEntry = this._makeEventEntry(time, data); | ||
this._sendQueueSize += eventEntry.length; | ||
if (this._sendQueue.has(tag)) { | ||
let eventEntryData = this._sendQueue.get(tag); | ||
eventEntryData.eventEntry = Buffer.concat([eventEntryData.eventEntry, eventEntry]); | ||
eventEntryData.callbacks.push(callback); | ||
this._sendQueue.set(tag, eventEntryData); | ||
} else { | ||
this._sendQueue.set(tag, { eventEntry: eventEntry, callbacks: [callback] }); | ||
} | ||
} | ||
} | ||
}; | ||
FluentSender.prototype._connect = function(callback) { | ||
if (this._connecting) { | ||
return; | ||
} | ||
_connect(callback) { | ||
if (this._connecting) { | ||
return; | ||
} | ||
this._connecting = true; | ||
process.nextTick(() => { | ||
if (this._socket === null) { | ||
this._doConnect(() => { | ||
this._connecting = false; | ||
callback(); | ||
}); | ||
} else { | ||
if (!this._socket.writable) { | ||
this._disconnect(); | ||
process.nextTick(() => { | ||
this._connecting = true; | ||
process.nextTick(() => { | ||
if (this._socket === null) { | ||
this._doConnect(() => { | ||
this._connecting = false; | ||
this._connect(callback); | ||
callback(); | ||
}); | ||
} else { | ||
process.nextTick(() => { | ||
this._connecting = false; | ||
callback(); | ||
}); | ||
if (!this._socket.writable) { | ||
this._disconnect(); | ||
process.nextTick(() => { | ||
this._connecting = false; | ||
this._connect(callback); | ||
}); | ||
} else { | ||
process.nextTick(() => { | ||
this._connecting = false; | ||
callback(); | ||
}); | ||
} | ||
} | ||
} | ||
}); | ||
}; | ||
}); | ||
} | ||
FluentSender.prototype._doConnect = function(callback) { | ||
let addHandlers = () => { | ||
let errorHandler = (err) => { | ||
if (this._socket) { | ||
this._disconnect(); | ||
this._handleEvent('error', err); | ||
_doConnect(callback) { | ||
let addHandlers = () => { | ||
let errorHandler = (err) => { | ||
if (this._socket) { | ||
this._disconnect(); | ||
this._handleEvent('error', err); | ||
} | ||
}; | ||
this._socket.on('error', errorHandler); | ||
this._socket.on('connect', () => { | ||
this._handleEvent('connect'); | ||
}); | ||
if (this.tls) { | ||
this._socket.on('tlsClientError', errorHandler); | ||
this._socket.on('secureConnect', () => { | ||
this._handleEvent('connect'); | ||
}); | ||
} | ||
}; | ||
this._socket.on('error', errorHandler); | ||
this._socket.on('connect', () => { | ||
this._handleEvent('connect'); | ||
}); | ||
if (this.tls) { | ||
this._socket.on('tlsClientError', errorHandler); | ||
this._socket.on('secureConnect', () => { | ||
this._handleEvent('connect'); | ||
}); | ||
} | ||
}; | ||
if (!this.tls) { | ||
this._socket = new net.Socket(); | ||
this._socket.setTimeout(this.timeout); | ||
addHandlers(); | ||
} | ||
if (this.path) { | ||
if (this.tls) { | ||
this._socket = tls.connect(Object.assign({}, this.tlsOptions, { path: this.path }), () => { | ||
callback(); | ||
}); | ||
if (!this.tls) { | ||
this._socket = new net.Socket(); | ||
this._socket.setTimeout(this.timeout); | ||
addHandlers(); | ||
} else { | ||
this._socket.connect(this.path, () => { | ||
callback(); | ||
}); | ||
} | ||
} else { | ||
let postConnect = () => { | ||
if (this.security.clientHostname && this.security.sharedKey !== null) { | ||
this._handshake(callback); | ||
if (this.path) { | ||
if (this.tls) { | ||
this._socket = tls.connect(Object.assign({}, this.tlsOptions, { path: this.path }), () => { | ||
callback(); | ||
}); | ||
addHandlers(); | ||
} else { | ||
this._status = 'established'; | ||
callback(); | ||
this._socket.connect(this.path, () => { | ||
callback(); | ||
}); | ||
} | ||
}; | ||
if (this.tls) { | ||
this._socket = tls.connect(Object.assign({}, this.tlsOptions, { host: this.host, port: this.port }), () => { | ||
postConnect(); | ||
}); | ||
addHandlers(); | ||
} else { | ||
this._socket.connect(this.port, this.host, () => { | ||
postConnect(); | ||
}); | ||
let postConnect = () => { | ||
if (this.security.clientHostname && this.security.sharedKey !== null) { | ||
this._handshake(callback); | ||
} else { | ||
this._status = 'established'; | ||
callback(); | ||
} | ||
}; | ||
if (this.tls) { | ||
this._socket = tls.connect(Object.assign({}, this.tlsOptions, { host: this.host, port: this.port }), () => { | ||
postConnect(); | ||
}); | ||
addHandlers(); | ||
} else { | ||
this._socket.connect(this.port, this.host, () => { | ||
postConnect(); | ||
}); | ||
} | ||
} | ||
} | ||
}; | ||
FluentSender.prototype._disconnect = function() { | ||
this._socket && this._socket.destroy(); | ||
this._socket = null; | ||
this._status = null; | ||
this._connecting = false; | ||
}; | ||
_disconnect() { | ||
this._socket && this._socket.destroy(); | ||
this._socket = null; | ||
this._status = null; | ||
this._connecting = false; | ||
} | ||
FluentSender.prototype._handshake = function(callback) { | ||
if (this._status === 'established') { | ||
return; | ||
} | ||
this._status = 'helo'; | ||
this._socket.once('data', (data) => { | ||
this._socket.pause(); | ||
let heloStatus = this._checkHelo(data); | ||
if (!heloStatus.succeeded) { | ||
this.internalLogger.error('Received invalid HELO message from ' + this._socket.remoteAddress); | ||
this._disconnect(); | ||
_handshake(callback) { | ||
if (this._status === 'established') { | ||
return; | ||
} | ||
this._status = 'pingpong'; | ||
this._socket.write(new Buffer(this._generatePing()), () => { | ||
this._socket.resume(); | ||
this._socket.once('data', (data) => { | ||
let pongStatus = this._checkPong(data); | ||
if (!pongStatus.succeeded) { | ||
this.internalLogger.error(pongStatus.message); | ||
let error = new FluentLoggerError.HandshakeError(pongStatus.message); | ||
this._handleEvent('error', error); | ||
this._disconnect(); | ||
return; | ||
} | ||
this._status = 'established'; | ||
this.internalLogger.info('Established'); | ||
callback(); | ||
this._status = 'helo'; | ||
this._socket.once('data', (data) => { | ||
this._socket.pause(); | ||
const heloStatus = this._checkHelo(data); | ||
if (!heloStatus.succeeded) { | ||
this.internalLogger.error('Received invalid HELO message from ' + this._socket.remoteAddress); | ||
this._disconnect(); | ||
return; | ||
} | ||
this._status = 'pingpong'; | ||
this._socket.write(Buffer.from(this._generatePing()), () => { | ||
this._socket.resume(); | ||
this._socket.once('data', (data) => { | ||
const pongStatus = this._checkPong(data); | ||
if (!pongStatus.succeeded) { | ||
this.internalLogger.error(pongStatus.message); | ||
const error = new FluentLoggerError.HandshakeError(pongStatus.message); | ||
this._handleEvent('error', error); | ||
this._disconnect(); | ||
return; | ||
} | ||
this._status = 'established'; | ||
this.internalLogger.info('Established'); | ||
callback(); | ||
}); | ||
}); | ||
}); | ||
}); | ||
}; | ||
} | ||
FluentSender.prototype._flushSendQueue = function() { | ||
if (this._flushingSendQueue) | ||
return; | ||
_flushSendQueue() { | ||
if (this._flushingSendQueue) | ||
return; | ||
this._flushingSendQueue = true; | ||
process.nextTick(() => { | ||
this._waitToWrite(); | ||
}); | ||
}; | ||
FluentSender.prototype._waitToWrite = function() { | ||
if (!this._socket) { | ||
this._flushingSendQueue = false; | ||
return; | ||
this._flushingSendQueue = true; | ||
process.nextTick(() => { | ||
this._waitToWrite(); | ||
}); | ||
} | ||
if (this._socket.writable) { | ||
if (this._eventMode === 'Message') { | ||
this._doFlushSendQueue(); | ||
} else { | ||
if (this._sendQueueSize >= this._sendQueueSizeLimit) { | ||
this._flushSendQueueTimeoutId && clearTimeout(this._flushSendQueueTimeoutId); | ||
_waitToWrite() { | ||
if (!this._socket) { | ||
this._flushingSendQueue = false; | ||
return; | ||
} | ||
if (this._socket.writable) { | ||
if (this._eventMode === 'Message') { | ||
this._doFlushSendQueue(); | ||
} else { | ||
this._flushSendQueueTimeoutId && clearTimeout(this._flushSendQueueTimeoutId); | ||
this._flushSendQueueTimeoutId = setTimeout(() => { | ||
if (this._sendQueueSize >= this._sendQueueSizeLimit) { | ||
this._flushSendQueueTimeoutId && clearTimeout(this._flushSendQueueTimeoutId); | ||
this._doFlushSendQueue(); | ||
}, this._flushInterval); | ||
} else { | ||
this._flushSendQueueTimeoutId && clearTimeout(this._flushSendQueueTimeoutId); | ||
this._flushSendQueueTimeoutId = setTimeout(() => { | ||
this._doFlushSendQueue(); | ||
}, this._flushInterval); | ||
} | ||
} | ||
} else { | ||
process.nextTick(() => { | ||
this._waitToWrite(); | ||
}); | ||
} | ||
} else { | ||
process.nextTick(() => { | ||
this._waitToWrite(); | ||
}); | ||
} | ||
} | ||
FluentSender.prototype._doFlushSendQueue = function(timeoutId) { | ||
if (this._eventMode === 'Message') { | ||
let item = this._sendQueue.shift(); | ||
if (item === undefined) { | ||
this._flushingSendQueue = false; | ||
// nothing written; | ||
return; | ||
_doFlushSendQueue(timeoutId) { | ||
if (this._eventMode === 'Message') { | ||
let item = this._sendQueue.shift(); | ||
if (item === undefined) { | ||
this._flushingSendQueue = false; | ||
// nothing written; | ||
return; | ||
} | ||
this._doWrite(item.packet, item.options, timeoutId, [item.callback]); | ||
} else { | ||
if (this._sendQueue.size === 0) { | ||
this._flushingSendQueue = false; | ||
return; | ||
} | ||
let tag = Array.from(this._sendQueue.keys())[0]; | ||
let eventEntryData = this._sendQueue.get(tag); | ||
let entries = eventEntryData.eventEntry; | ||
let size = eventEntryData.eventEntry.length; | ||
this._sendQueue.delete(tag); | ||
if (this._compressed) { | ||
entries = zlib.gzipSync(entries); | ||
size = entries.length; | ||
} | ||
const options = { | ||
chunk: crypto.randomBytes(16).toString('base64'), | ||
size: size, | ||
compressed: this._compressed ? 'gzip' : 'text' | ||
}; | ||
const packet = msgpack.encode([tag, entries, options], { codec: codec }); | ||
this._doWrite(packet, options, timeoutId, eventEntryData.callbacks); | ||
} | ||
this._doWrite(item.packet, item.options, timeoutId, [item.callback]); | ||
} else { | ||
if (this._sendQueue.size === 0) { | ||
this._flushingSendQueue = false; | ||
return; | ||
} | ||
let tag = Array.from(this._sendQueue.keys())[0]; | ||
let eventEntryData = this._sendQueue.get(tag); | ||
let entries = eventEntryData.eventEntry; | ||
let size = eventEntryData.eventEntry.length; | ||
this._sendQueue.delete(tag); | ||
if (this._compressed) { | ||
entries = zlib.gzipSync(entries); | ||
size = entries.length; | ||
} | ||
let options = { | ||
chunk: crypto.randomBytes(16).toString('base64'), | ||
size: size, | ||
compressed: this._compressed ? 'gzip' : 'text' | ||
}; | ||
let packet = msgpack.encode([tag, entries, options], { codec: codec }); | ||
this._doWrite(packet, options, timeoutId, eventEntryData.callbacks); | ||
} | ||
}; | ||
FluentSender.prototype._doWrite = function(packet, options, timeoutId, callbacks) { | ||
const sendQueueSize = this._sendQueueSize; | ||
this._socket.write(new Buffer(packet), () => { | ||
if (this.requireAckResponse) { | ||
this._socket.once('data', (data) => { | ||
timeoutId && clearTimeout(timeoutId); | ||
var response = msgpack.decode(data, { codec: codec }); | ||
if (response.ack !== options.chunk) { | ||
var error = new FluentLoggerError.ResponseError( | ||
'ack in response and chunk id in sent data are different', | ||
{ ack: response.ack, chunk: options.chunk } | ||
); | ||
_doWrite(packet, options, timeoutId, callbacks) { | ||
const sendQueueSize = this._sendQueueSize; | ||
this._socket.write(Buffer.from(packet), () => { | ||
if (this.requireAckResponse) { | ||
this._socket.once('data', (data) => { | ||
timeoutId && clearTimeout(timeoutId); | ||
const response = msgpack.decode(data, { codec: codec }); | ||
if (response.ack !== options.chunk) { | ||
const error = new FluentLoggerError.ResponseError( | ||
'ack in response and chunk id in sent data are different', | ||
{ ack: response.ack, chunk: options.chunk } | ||
); | ||
callbacks.forEach((callback) => { | ||
this._handleEvent('error', error, callback); | ||
}); | ||
} | ||
this._sendQueueSize -= sendQueueSize; | ||
callbacks.forEach((callback) => { | ||
callback && callback(); | ||
}); | ||
process.nextTick(() => { | ||
this._waitToWrite(); | ||
}); | ||
}); | ||
timeoutId = setTimeout(() => { | ||
const error = new FluentLoggerError.ResponseTimeout('ack response timeout'); | ||
callbacks.forEach((callback) => { | ||
this._handleEvent('error', error, callback); | ||
}); | ||
} | ||
}, this.ackResponseTimeout); | ||
} else { | ||
this._sendQueueSize -= sendQueueSize; | ||
@@ -418,169 +434,160 @@ callbacks.forEach((callback) => { | ||
}); | ||
}); | ||
timeoutId = setTimeout(() => { | ||
var error = new FluentLoggerError.ResponseTimeout('ack response timeout'); | ||
callbacks.forEach((callback) => { | ||
this._handleEvent('error', error, callback); | ||
}); | ||
}, this.ackResponseTimeout); | ||
} else { | ||
this._sendQueueSize -= sendQueueSize; | ||
callbacks.forEach((callback) => { | ||
callback && callback(); | ||
}); | ||
process.nextTick(() => { | ||
this._waitToWrite(); | ||
}); | ||
} | ||
}); | ||
} | ||
_handleEvent(signal, data, callback) { | ||
callback && callback(data); | ||
if (this._eventEmitter.listenerCount(signal) > 0) { | ||
this._eventEmitter.emit(signal, data); | ||
} | ||
}); | ||
}; | ||
FluentSender.prototype._handleEvent = function _handleEvent(signal, data, callback) { | ||
callback && callback(data); | ||
if (this._eventEmitter.listenerCount(signal) > 0) { | ||
this._eventEmitter.emit(signal, data); | ||
} | ||
}; | ||
FluentSender.prototype._setupErrorHandler = function _setupErrorHandler(callback) { | ||
if (!this.reconnectInterval) { | ||
return; | ||
_setupErrorHandler(callback) { | ||
if (!this.reconnectInterval) { | ||
return; | ||
} | ||
this.on('error', (error) => { | ||
this._flushingSendQueue = false; | ||
this._status = null; | ||
this.internalLogger.error('Fluentd error', error); | ||
this.internalLogger.info('Fluentd will reconnect after ' + this.reconnectInterval / 1000 + ' seconds'); | ||
let timeoutId = setTimeout(() => { | ||
this.internalLogger.info('Fluentd is reconnecting...'); | ||
this._connect(() => { | ||
this.internalLogger.info('Fluentd reconnection finished!!'); | ||
}); | ||
}, this.reconnectInterval); | ||
callback && callback(timeoutId); | ||
}); | ||
} | ||
this.on('error', (error) => { | ||
this._flushingSendQueue = false; | ||
this._status = null; | ||
this.internalLogger.error('Fluentd error', error); | ||
this.internalLogger.info('Fluentd will reconnect after ' + this.reconnectInterval / 1000 + ' seconds'); | ||
let timeoutId = setTimeout(() => { | ||
this.internalLogger.info('Fluentd is reconnecting...'); | ||
this._connect(() => { | ||
this.internalLogger.info('Fluentd reconnection finished!!'); | ||
}); | ||
}, this.reconnectInterval); | ||
callback && callback(timeoutId); | ||
}); | ||
}; | ||
FluentSender.prototype._checkHelo = function _checkHelo(data) { | ||
// ['HELO', options(hash)] | ||
this.internalLogger.info('Checking HELO...'); | ||
var message = msgpack.decode(data); | ||
if (message.length !== 2) { | ||
return { succeeded: false, message: 'Invalid format for HELO message' }; | ||
_checkHelo(data) { | ||
// ['HELO', options(hash)] | ||
this.internalLogger.info('Checking HELO...'); | ||
const message = msgpack.decode(data); | ||
if (message.length !== 2) { | ||
return { succeeded: false, message: 'Invalid format for HELO message' }; | ||
} | ||
if (message[0] !== 'HELO') { | ||
return { succeeded: false, message: 'Invalid format for HELO message' }; | ||
} | ||
const options = message[1] || {}; | ||
this.sharedKeyNonce = options['nonce'] || ''; | ||
this.authentication = options['auth'] || ''; | ||
return { succeeded: true }; | ||
} | ||
if (message[0] !== 'HELO') { | ||
return { succeeded: false, message: 'Invalid format for HELO message' }; | ||
} | ||
var options = message[1] || {}; | ||
this.sharedKeyNonce = options['nonce'] || ''; | ||
this.authentication = options['auth'] || ''; | ||
return { succeeded: true }; | ||
}; | ||
FluentSender.prototype._generatePing = function _generatePing() { | ||
// [ | ||
// 'PING', | ||
// client_hostname, | ||
// shared_key_salt, | ||
// sha512_hex(sharedkey_salt + client_hostname + nonce + shared_key), | ||
// username || '', | ||
// sha512_hex(auth_salt + username + password) || '' | ||
// ] | ||
var sharedKeyHexdigest = crypto.createHash('sha512') | ||
.update(this.sharedKeySalt) | ||
.update(this.security.clientHostname) | ||
.update(this.sharedKeyNonce) | ||
.update(this.security.sharedKey) | ||
.digest('hex'); | ||
var ping = ['PING', this.security.clientHostname, this.sharedKeySalt, sharedKeyHexdigest]; | ||
if (Buffer.isBuffer(this.authentication) && Buffer.byteLength(this.authentication) !== 0) { | ||
var passwordHexDigest = crypto.createHash('sha512') | ||
.update(this.authentication) | ||
.update(this.security.username || '') | ||
.update(this.security.password || '') | ||
_generatePing() { | ||
// [ | ||
// 'PING', | ||
// client_hostname, | ||
// shared_key_salt, | ||
// sha512_hex(sharedkey_salt + client_hostname + nonce + shared_key), | ||
// username || '', | ||
// sha512_hex(auth_salt + username + password) || '' | ||
// ] | ||
const sharedKeyHexdigest = crypto.createHash('sha512') | ||
.update(this.sharedKeySalt) | ||
.update(this.security.clientHostname) | ||
.update(this.sharedKeyNonce) | ||
.update(this.security.sharedKey) | ||
.digest('hex'); | ||
ping.push(this.username, passwordHexDigest); | ||
} else { | ||
ping.push('', ''); | ||
const ping = ['PING', this.security.clientHostname, this.sharedKeySalt, sharedKeyHexdigest]; | ||
if (Buffer.isBuffer(this.authentication) && Buffer.byteLength(this.authentication) !== 0) { | ||
const passwordHexDigest = crypto.createHash('sha512') | ||
.update(this.authentication) | ||
.update(this.security.username || '') | ||
.update(this.security.password || '') | ||
.digest('hex'); | ||
ping.push(this.username, passwordHexDigest); | ||
} else { | ||
ping.push('', ''); | ||
} | ||
return msgpack.encode(ping); | ||
} | ||
return msgpack.encode(ping); | ||
}; | ||
FluentSender.prototype._checkPong = function _checkPong(data) { | ||
// [ | ||
// 'PONG', | ||
// bool(authentication result), | ||
// 'reason if authentication failed', | ||
// server_hostname, | ||
// sha512_hex(salt + server_hostname + nonce + sharedkey) | ||
// ] | ||
this.internalLogger.info('Checking PONG...'); | ||
var message = msgpack.decode(data); | ||
if (message.length !== 5) { | ||
return false; | ||
_checkPong(data) { | ||
// [ | ||
// 'PONG', | ||
// bool(authentication result), | ||
// 'reason if authentication failed', | ||
// server_hostname, | ||
// sha512_hex(salt + server_hostname + nonce + sharedkey) | ||
// ] | ||
this.internalLogger.info('Checking PONG...'); | ||
const message = msgpack.decode(data); | ||
if (message.length !== 5) { | ||
return false; | ||
} | ||
if (message[0] !== 'PONG') { | ||
return { succeeded: false, message: 'Invalid format for PONG message' }; | ||
} | ||
const _pong = message[0]; | ||
const authResult = message[1]; | ||
const reason = message[2]; | ||
const hostname = message[3]; | ||
const sharedKeyHexdigest = message[4]; | ||
if (!authResult) { | ||
return { succeeded: false, message: 'Authentication failed: ' + reason }; | ||
} | ||
if (hostname === this.security.clientHostname) { | ||
return { succeeded: false, message: 'Same hostname between input and output: invalid configuration' }; | ||
} | ||
const clientsideHexdigest = crypto.createHash('sha512') | ||
.update(this.sharedKeySalt) | ||
.update(hostname) | ||
.update(this.sharedKeyNonce) | ||
.update(this.security.sharedKey) | ||
.digest('hex'); | ||
if (sharedKeyHexdigest !== clientsideHexdigest) { | ||
return { succeeded: false, message: 'Sharedkey mismatch' }; | ||
} | ||
return { succeeded: true }; | ||
} | ||
if (message[0] !== 'PONG') { | ||
return { succeeded: false, message: 'Invalid format for PONG message' }; | ||
} | ||
var _pong = message[0]; | ||
var authResult = message[1]; | ||
var reason = message[2]; | ||
var hostname = message[3]; | ||
var sharedKeyHexdigest = message[4]; | ||
if (!authResult) { | ||
return { succeeded: false, message: 'Authentication failed: ' + reason }; | ||
} | ||
if (hostname === this.security.clientHostname) { | ||
return { succeeded: false, message: 'Same hostname between input and output: invalid configuration' }; | ||
} | ||
var clientsideHexdigest = crypto.createHash('sha512') | ||
.update(this.sharedKeySalt) | ||
.update(hostname) | ||
.update(this.sharedKeyNonce) | ||
.update(this.security.sharedKey) | ||
.digest('hex'); | ||
if (sharedKeyHexdigest !== clientsideHexdigest) { | ||
return { succeeded: false, message: 'Sharedkey mismatch' }; | ||
} | ||
return { succeeded: true }; | ||
}; | ||
FluentSender.prototype.toStream = function(options) { | ||
if (typeof options === 'string') { | ||
options = {label: options}; | ||
} else { | ||
options = options || {}; | ||
} | ||
var label = options.label; | ||
if (!label) { | ||
throw new Error('label is needed'); | ||
} | ||
var defaultEncoding = options.encoding || 'UTF-8'; | ||
var writable = new stream.Writable(); | ||
var dataString = ''; | ||
writable._write = (chunk, encoding, callback) => { | ||
var dataArray = chunk.toString(defaultEncoding).split(/\n/); | ||
var next = () => { | ||
if (dataArray.length) { | ||
dataString += dataArray.shift(); | ||
} | ||
if (!dataArray.length) { | ||
process.nextTick(callback); | ||
return; | ||
} | ||
this.emit(label, { message: dataString }, (err) => { | ||
if (err) { | ||
this._handleEvent('error', err, callback); | ||
toStream(options) { | ||
if (typeof options === 'string') { | ||
options = {label: options}; | ||
} else { | ||
options = options || {}; | ||
} | ||
const label = options.label; | ||
if (!label) { | ||
throw new Error('label is needed'); | ||
} | ||
const defaultEncoding = options.encoding || 'UTF-8'; | ||
const writable = new stream.Writable(); | ||
let dataString = ''; | ||
writable._write = (chunk, encoding, callback) => { | ||
const dataArray = chunk.toString(defaultEncoding).split(/\n/); | ||
let next = () => { | ||
if (dataArray.length) { | ||
dataString += dataArray.shift(); | ||
} | ||
if (!dataArray.length) { | ||
process.nextTick(callback); | ||
return; | ||
} | ||
dataString = ''; | ||
next(); | ||
}); | ||
this.emit(label, { message: dataString }, (err) => { | ||
if (err) { | ||
this._handleEvent('error', err, callback); | ||
return; | ||
} | ||
dataString = ''; | ||
next(); | ||
}); | ||
}; | ||
next(); | ||
}; | ||
next(); | ||
return writable; | ||
} | ||
} | ||
['addListener', 'on', 'once', 'removeListener', 'removeAllListeners', 'setMaxListeners', 'getMaxListeners'].forEach((attr, i) => { | ||
FluentSender.prototype[attr] = function() { | ||
return this._eventEmitter[attr].apply(this._eventEmitter, Array.from(arguments)); | ||
}; | ||
return writable; | ||
}; | ||
}); | ||
module.exports = {}; | ||
module.exports.FluentSender = FluentSender; | ||
module.exports = FluentSender; |
'use strict'; | ||
var net = require('net'); | ||
var tls = require('tls'); | ||
var msgpack = require('msgpack-lite'); | ||
var crypto = require('crypto'); | ||
var zlib = require('zlib'); | ||
const net = require('net'); | ||
const tls = require('tls'); | ||
const msgpack = require('msgpack-lite'); | ||
const crypto = require('crypto'); | ||
const zlib = require('zlib'); | ||
function MockFluentdServer(options, tlsOptions) { | ||
this._port = null; | ||
this._options = options; | ||
this._tlsOptions = tlsOptions || {}; | ||
this._received = []; | ||
this._clients = {}; | ||
this._state = null; | ||
this._nonce = null; | ||
this._userAuthSalt = null; | ||
let server = (socket) => { | ||
var clientKey = socket.remoteAddress + ':' + socket.remotePort; | ||
this._clients[clientKey] = socket; | ||
socket.on('end', () => { | ||
delete this._clients[clientKey]; | ||
}); | ||
var stream = msgpack.createDecodeStream(); | ||
socket.pipe(stream).on('data', (m) => { | ||
if (this._state === 'pingpong') { | ||
let authResult = this._checkPing(m); | ||
socket.write(msgpack.encode(this._generatePong(authResult, this._nonce, this._options.security.sharedKey))); | ||
if (authResult.succeeded) { | ||
this._state = 'established'; | ||
} else { | ||
socket.end(); | ||
} | ||
} else if (this._state === 'established') { | ||
let entries = m[1]; | ||
let options = null; | ||
if (entries instanceof Buffer) { | ||
options = m[2]; | ||
if (options.compressed === 'gzip') { | ||
entries = zlib.gunzipSync(entries); | ||
class MockFluentdServer { | ||
constructor(options, tlsOptions) { | ||
this._port = null; | ||
this._options = options; | ||
this._tlsOptions = tlsOptions || {}; | ||
this._received = []; | ||
this._clients = {}; | ||
this._state = null; | ||
this._nonce = null; | ||
this._userAuthSalt = null; | ||
let server = (socket) => { | ||
const clientKey = socket.remoteAddress + ':' + socket.remotePort; | ||
this._clients[clientKey] = socket; | ||
socket.on('end', () => { | ||
delete this._clients[clientKey]; | ||
}); | ||
const stream = msgpack.createDecodeStream(); | ||
socket.pipe(stream).on('data', (m) => { | ||
if (this._state === 'pingpong') { | ||
const authResult = this._checkPing(m); | ||
socket.write(msgpack.encode(this._generatePong(authResult, this._nonce, this._options.security.sharedKey))); | ||
if (authResult.succeeded) { | ||
this._state = 'established'; | ||
} else { | ||
socket.end(); | ||
} | ||
let s = msgpack.createDecodeStream(); | ||
s.on('data', (data) => { | ||
let _time = data[0]; | ||
let record = data[1]; | ||
} else if (this._state === 'established') { | ||
let entries = m[1]; | ||
let options = null; | ||
if (entries instanceof Buffer) { | ||
options = m[2]; | ||
if (options.compressed === 'gzip') { | ||
entries = zlib.gunzipSync(entries); | ||
} | ||
let s = msgpack.createDecodeStream(); | ||
s.on('data', (data) => { | ||
let _time = data[0]; | ||
let record = data[1]; | ||
this._received.push({ | ||
tag: m[0], | ||
data: record, | ||
options: options | ||
}); | ||
}); | ||
s.write(entries); | ||
} else { | ||
this._received.push({ | ||
tag: m[0], | ||
data: record, | ||
options: options | ||
time: m[1], | ||
data: m[2], | ||
options: m[3] | ||
}); | ||
}); | ||
s.write(entries); | ||
} else { | ||
this._received.push({ | ||
tag: m[0], | ||
time: m[1], | ||
data: m[2], | ||
options: m[3] | ||
}); | ||
options = m[3]; | ||
options = m[3]; | ||
} | ||
if (this._options.requireAckResponse && options && options.chunk) { | ||
const response = { | ||
ack: options.chunk | ||
}; | ||
socket.write(msgpack.encode(response)); | ||
} | ||
} | ||
if (this._options.requireAckResponse && options && options.chunk) { | ||
var response = { | ||
ack: options.chunk | ||
}; | ||
socket.write(msgpack.encode(response)); | ||
} | ||
}); | ||
}; | ||
let connectionEventType = 'connection'; | ||
if (this._tlsOptions.tls) { | ||
connectionEventType = 'secureConnection'; | ||
this._server = tls.createServer(this._tlsOptions, server); | ||
} else { | ||
this._server = net.createServer(server); | ||
} | ||
this._server.on(connectionEventType, (socket) => { | ||
if (this._options.security && this._options.security.sharedKey && this._options.security.serverHostname) { | ||
this._state = 'helo'; | ||
this._nonce = crypto.randomBytes(16); | ||
this._userAuthSalt = crypto.randomBytes(16); | ||
} else { | ||
this._state = 'established'; | ||
} | ||
if (this._state === 'helo') { | ||
socket.write(msgpack.encode(this._generateHelo(this._nonce, this._userAuthSalt))); | ||
this._state = 'pingpong'; | ||
} | ||
}); | ||
}; | ||
var connectionEventType = 'connection'; | ||
if (this._tlsOptions.tls) { | ||
connectionEventType = 'secureConnection'; | ||
this._server = tls.createServer(this._tlsOptions, server); | ||
} else { | ||
this._server = net.createServer(server); | ||
} | ||
this._server.on(connectionEventType, (socket) => { | ||
if (this._options.security && this._options.security.sharedKey && this._options.security.serverHostname) { | ||
this._state = 'helo'; | ||
this._nonce = crypto.randomBytes(16); | ||
this._userAuthSalt = crypto.randomBytes(16); | ||
} else { | ||
this._state = 'established'; | ||
} | ||
if (this._state === 'helo') { | ||
socket.write(msgpack.encode(this._generateHelo(this._nonce, this._userAuthSalt))); | ||
this._state = 'pingpong'; | ||
} | ||
}); | ||
} | ||
MockFluentdServer.prototype.__defineGetter__('port', function() { | ||
return this._port; | ||
}); | ||
get port() { | ||
return this._port; | ||
} | ||
MockFluentdServer.prototype.__defineGetter__('messages', function() { | ||
return this._received; | ||
}); | ||
get messages() { | ||
return this._received; | ||
} | ||
MockFluentdServer.prototype._generateHelo = function(nonce, userAuthSalt) { | ||
// ['HELO', options(hash)] | ||
let options = { | ||
'nonce': nonce, | ||
'auth': this._options.security ? userAuthSalt : '', | ||
'keepalive': false | ||
}; | ||
return ['HELO', options]; | ||
}; | ||
_generateHelo(nonce, userAuthSalt) { | ||
// ['HELO', options(hash)] | ||
let options = { | ||
'nonce': nonce, | ||
'auth': this._options.security ? userAuthSalt : '', | ||
'keepalive': false | ||
}; | ||
return ['HELO', options]; | ||
} | ||
MockFluentdServer.prototype._checkPing = function(m) { | ||
// this._options.checkPing() should return { succeeded: true, reason: 'why', sharedKeySalt: 'salt' } | ||
if (this._options.checkPing) { | ||
return this._options.checkPing(m); | ||
} else { | ||
// ['PING', self_hostname, shared_key_salt, sha512_hex(shared_key_salt + self_hostname + nonce + shared_key), username || '', sha512_hex(auth_salt + username + password) || ''] | ||
if (m.length !== 6) { | ||
return { succeeded: false, reason: 'Invalid ping message size' }; | ||
} | ||
if (m[0] !== 'PING') { | ||
return { succeeded: false, reason: 'Invalid ping message' }; | ||
} | ||
let _ping = m[0]; | ||
let hostname = m[1]; | ||
let sharedKeySalt = m[2]; | ||
let sharedKeyHexDigest = m[3]; | ||
let _username = m[4]; | ||
let passwordDigest = m[5]; | ||
let serverSideDigest = crypto.createHash('sha512') | ||
.update(sharedKeySalt) | ||
.update(hostname) | ||
.update(this._nonce) | ||
.update(this._options.security.sharedKey) | ||
.digest('hex'); | ||
if (sharedKeyHexDigest !== serverSideDigest) { | ||
return { succeeded: false, reason: 'shared key mismatch' }; | ||
} | ||
if (this._options.security.username && this._options.security.password) { | ||
let serverSidePasswordDigest = crypto.createHash('sha512') | ||
.update(this._userAuthSalt) | ||
.update(this._options.security.username) | ||
.update(this._options.security.password) | ||
_checkPing(m) { | ||
// this._options.checkPing() should return { succeeded: true, reason: 'why', sharedKeySalt: 'salt' } | ||
if (this._options.checkPing) { | ||
return this._options.checkPing(m); | ||
} else { | ||
// ['PING', self_hostname, shared_key_salt, sha512_hex(shared_key_salt + self_hostname + nonce + shared_key), username || '', sha512_hex(auth_salt + username + password) || ''] | ||
if (m.length !== 6) { | ||
return { succeeded: false, reason: 'Invalid ping message size' }; | ||
} | ||
if (m[0] !== 'PING') { | ||
return { succeeded: false, reason: 'Invalid ping message' }; | ||
} | ||
const _ping = m[0]; | ||
const hostname = m[1]; | ||
const sharedKeySalt = m[2]; | ||
const sharedKeyHexDigest = m[3]; | ||
const _username = m[4]; | ||
const passwordDigest = m[5]; | ||
const serverSideDigest = crypto.createHash('sha512') | ||
.update(sharedKeySalt) | ||
.update(hostname) | ||
.update(this._nonce) | ||
.update(this._options.security.sharedKey) | ||
.digest('hex'); | ||
if (passwordDigest !== serverSidePasswordDigest) { | ||
return { succeeded: false, reason: 'username/password mismatch' }; | ||
if (sharedKeyHexDigest !== serverSideDigest) { | ||
return { succeeded: false, reason: 'shared key mismatch' }; | ||
} | ||
if (this._options.security.username && this._options.security.password) { | ||
const serverSidePasswordDigest = crypto.createHash('sha512') | ||
.update(this._userAuthSalt) | ||
.update(this._options.security.username) | ||
.update(this._options.security.password) | ||
.digest('hex'); | ||
if (passwordDigest !== serverSidePasswordDigest) { | ||
return { succeeded: false, reason: 'username/password mismatch' }; | ||
} | ||
} | ||
return { succeeded: true, sharedKeySalt: sharedKeySalt }; | ||
} | ||
return { succeeded: true, sharedKeySalt: sharedKeySalt }; | ||
} | ||
}; | ||
MockFluentdServer.prototype._generatePong = function(authResult, nonce, sharedKey) { | ||
// this._options.generatePong() should return PONG message | ||
// [ | ||
// 'PONG', | ||
// bool(authentication result), | ||
// 'reason if authentication failed', | ||
// serverHostname, | ||
// sha512_hex(salt + serverHostname + nonce + sharedkey) | ||
// ] | ||
if (authResult.succeeded) { | ||
let sharedKeyDigestHex = crypto.createHash('sha512') | ||
.update(authResult.sharedKeySalt) | ||
.update(this._options.security.serverHostname) | ||
.update(nonce) | ||
.update(sharedKey) | ||
.digest('hex'); | ||
return ['PONG', true, '', this._options.security.serverHostname, sharedKeyDigestHex]; | ||
} else { | ||
return ['PONG', false, authResult.reason, '', '']; | ||
_generatePong(authResult, nonce, sharedKey) { | ||
// this._options.generatePong() should return PONG message | ||
// [ | ||
// 'PONG', | ||
// bool(authentication result), | ||
// 'reason if authentication failed', | ||
// serverHostname, | ||
// sha512_hex(salt + serverHostname + nonce + sharedkey) | ||
// ] | ||
if (authResult.succeeded) { | ||
const sharedKeyDigestHex = crypto.createHash('sha512') | ||
.update(authResult.sharedKeySalt) | ||
.update(this._options.security.serverHostname) | ||
.update(nonce) | ||
.update(sharedKey) | ||
.digest('hex'); | ||
return ['PONG', true, '', this._options.security.serverHostname, sharedKeyDigestHex]; | ||
} else { | ||
return ['PONG', false, authResult.reason, '', '']; | ||
} | ||
} | ||
}; | ||
MockFluentdServer.prototype.listen = function(callback) { | ||
this._server.listen(() => { | ||
this._port = this._server.address().port; | ||
callback(); | ||
}); | ||
}; | ||
listen(callback) { | ||
this._server.listen(() => { | ||
this._port = this._server.address().port; | ||
callback(); | ||
}); | ||
} | ||
MockFluentdServer.prototype.close = function(callback) { | ||
this._server.close(() => { | ||
callback(); | ||
}); | ||
for (var i in this._clients) { | ||
this._clients[i].end(); | ||
// this._clients[i].destroy(); | ||
close(callback) { | ||
this._server.close(() => { | ||
callback(); | ||
}); | ||
for (const i in this._clients) { | ||
this._clients[i].end(); | ||
// this._clients[i].destroy(); | ||
} | ||
} | ||
}; | ||
} | ||
module.exports = { | ||
runServer: function(options, tlsOptions, callback) { | ||
var server = new MockFluentdServer(options, tlsOptions); | ||
const server = new MockFluentdServer(options, tlsOptions); | ||
server.listen(() => { | ||
@@ -197,3 +199,3 @@ callback(server, (_callback) => { | ||
setTimeout(() => { | ||
var messages = server.messages; | ||
const messages = server.messages; | ||
server.close(() => { | ||
@@ -200,0 +202,0 @@ _callback && _callback(messages); |
@@ -6,4 +6,4 @@ 'use strict'; | ||
/* eslint node/no-unpublished-require: ["error", {"allowModules": ["winston"]}] */ | ||
const sender = require('./sender'); | ||
const FluentSender = require('./sender'); | ||
/* eslint-disable-next-line node/no-extraneous-require */ | ||
const Transport = require('winston-transport'); | ||
@@ -24,3 +24,3 @@ const DEFAULT_TAG = 'winston'; | ||
this.sender = new sender.FluentSender(tag, options); | ||
this.sender = new FluentSender(tag, options); | ||
} | ||
@@ -41,2 +41,2 @@ | ||
} | ||
} | ||
}; |
{ | ||
"name": "fluent-logger", | ||
"version": "2.8.0", | ||
"version": "3.0.0", | ||
"main": "./lib/index.js", | ||
@@ -31,3 +31,3 @@ "scripts": { | ||
"engines": { | ||
"node": ">=4" | ||
"node": ">=6" | ||
}, | ||
@@ -40,3 +40,3 @@ "dependencies": { | ||
"chai": "", | ||
"eslint": "^4.11.0", | ||
"eslint": "^5.1.0", | ||
"eslint-plugin-node": "*", | ||
@@ -43,0 +43,0 @@ "mocha": "*", |
@@ -345,2 +345,2 @@ # fluent-logger for Node.js | ||
This package is compatible with NodeJS versions > 4. | ||
This package is compatible with NodeJS versions >= 6. |
'use strict'; | ||
/* globals describe, it */ | ||
/* eslint node/no-unpublished-require: ["error", {"allowModules": ["chai"]}] */ | ||
var expect = require('chai').expect; | ||
var EventTime = require('../lib/event-time').EventTime; | ||
var msgpack = require('msgpack-lite'); | ||
const expect = require('chai').expect; | ||
const EventTime = require('../lib/event-time'); | ||
const msgpack = require('msgpack-lite'); | ||
var codec = msgpack.createCodec(); | ||
const codec = msgpack.createCodec(); | ||
codec.addExtPacker(0x00, EventTime, EventTime.pack); | ||
@@ -14,5 +14,5 @@ codec.addExtUnpacker(0x00, EventTime.unpack); | ||
it('should equal to decoded value', (done) => { | ||
var eventTime = EventTime.now(); | ||
var encoded = msgpack.encode(eventTime, { codec: codec }); | ||
var decoded = msgpack.decode(encoded, { codec: codec }); | ||
const eventTime = EventTime.now(); | ||
const encoded = msgpack.encode(eventTime, { codec: codec }); | ||
const decoded = msgpack.decode(encoded, { codec: codec }); | ||
expect(JSON.stringify(decoded)).to.equal(JSON.stringify(eventTime)); | ||
@@ -22,7 +22,7 @@ done(); | ||
it('should equal fromDate and fromTimestamp', (done) => { | ||
var now = new Date(1489543720999); // 2017-03-15T02:08:40.999Z | ||
var timestamp = now.getTime(); | ||
var eventTime = JSON.stringify(new EventTime(1489543720, 999000000)); | ||
var eventTime1 = JSON.stringify(EventTime.fromDate(now)); | ||
var eventTime2 = JSON.stringify(EventTime.fromTimestamp(timestamp)); | ||
const now = new Date(1489543720999); // 2017-03-15T02:08:40.999Z | ||
const timestamp = now.getTime(); | ||
const eventTime = JSON.stringify(new EventTime(1489543720, 999000000)); | ||
const eventTime1 = JSON.stringify(EventTime.fromDate(now)); | ||
const eventTime2 = JSON.stringify(EventTime.fromTimestamp(timestamp)); | ||
expect(eventTime1).to.equal(eventTime); | ||
@@ -29,0 +29,0 @@ expect(eventTime2).to.equal(eventTime); |
@@ -6,12 +6,12 @@ 'use strict'; | ||
/* eslint node/no-unpublished-require: ["error", {"allowModules": ["async", "chai"]}] */ | ||
var expect = require('chai').expect; | ||
var sender = require('../lib/sender'); | ||
var EventTime = require('../lib/event-time').EventTime; | ||
var runServer = require('../lib/testHelper').runServer; | ||
var stream = require('stream'); | ||
var async = require('async'); | ||
var EventEmitter = require('events').EventEmitter; | ||
var msgpack = require('msgpack-lite'); | ||
const expect = require('chai').expect; | ||
const FluentSender = require('../lib/sender'); | ||
const EventTime = require('../lib/event-time'); | ||
const runServer = require('../lib/testHelper').runServer; | ||
const stream = require('stream'); | ||
const async = require('async'); | ||
const EventEmitter = require('events').EventEmitter; | ||
const msgpack = require('msgpack-lite'); | ||
var codec = msgpack.createCodec(); | ||
const codec = msgpack.createCodec(); | ||
codec.addExtPacker(0x00, EventTime, EventTime.pack); | ||
@@ -24,5 +24,6 @@ codec.addExtUnpacker(0x00, EventTime.unpack); | ||
if (tls) { | ||
var selfsigned = require('selfsigned'); | ||
var attrs = [{ name: 'commonName', value: 'foo.com' }]; | ||
var pems = selfsigned.generate(attrs, { days: 365 }); | ||
/* eslint-disable-next-line node/no-unpublished-require */ | ||
const selfsigned = require('selfsigned'); | ||
const attrs = [{ name: 'commonName', value: 'foo.com' }]; | ||
const pems = selfsigned.generate(attrs, { days: 365 }); | ||
serverOptions = { tls: true, key: pems.private, cert: pems.cert, ca: pems.cert }; | ||
@@ -33,3 +34,3 @@ clientOptions = { tls: true, tlsOptions: { rejectUnauthorized: false } }; | ||
try { | ||
new sender.FluentSender('debug', Object.assign({}, clientOptions, { eventMode: 'Unknown' })); | ||
new FluentSender('debug', Object.assign({}, clientOptions, { eventMode: 'Unknown' })); | ||
} catch (e) { | ||
@@ -43,8 +44,8 @@ expect(e.message).to.be.equal('Unknown event mode: Unknown'); | ||
runServer({}, serverOptions, (server, finish) => { | ||
var s1 = new sender.FluentSender('debug', Object.assign({}, clientOptions, { port: server.port })); | ||
var emits = []; | ||
const s1 = new FluentSender('debug', Object.assign({}, clientOptions, { port: server.port })); | ||
const emits = []; | ||
function emit(k) { | ||
emits.push((done) => { s1.emit('record', k, done); }); | ||
} | ||
for (var i = 0; i < 10; i++) { | ||
for (let i = 0; i < 10; i++) { | ||
emit({ number: i }); | ||
@@ -55,3 +56,3 @@ } | ||
expect(data.length).to.be.equal(10); | ||
for (var i = 0; i < 10; i++) { | ||
for (let i = 0; i < 10; i++) { | ||
expect(data[i].tag).to.be.equal('debug.record'); | ||
@@ -69,4 +70,4 @@ expect(data[i].data.number).to.be.equal(i); | ||
runServer({}, serverOptions, (server, finish) => { | ||
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, {port: server.port})); | ||
var called = false; | ||
const s = new FluentSender('debug', Object.assign({}, clientOptions, {port: server.port})); | ||
let called = false; | ||
s.on('connect', () => { | ||
@@ -85,3 +86,3 @@ called = true; | ||
it('should raise error when connection fails', (done) => { | ||
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, { | ||
const s = new FluentSender('debug', Object.assign({}, clientOptions, { | ||
host: 'localhost', | ||
@@ -98,3 +99,3 @@ port: 65535 | ||
it('should log error when connection fails w/ internal logger', (done) => { | ||
var logger = { | ||
const logger = { | ||
buffer: { | ||
@@ -111,3 +112,3 @@ info: [], | ||
}; | ||
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, { | ||
const s = new FluentSender('debug', Object.assign({}, clientOptions, { | ||
host: 'localhost', | ||
@@ -131,3 +132,3 @@ port: 65535, | ||
runServer({}, serverOptions, (server, finish) => { | ||
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, {port: server.port})); | ||
const s = new FluentSender('debug', Object.assign({}, clientOptions, {port: server.port})); | ||
s.emit('1st record', { message: '1st data' }); | ||
@@ -151,5 +152,5 @@ s.emit('2nd record', { message: '2nd data' }); | ||
runServer({}, serverOptions, (server, finish) => { | ||
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, {port: server.port})); | ||
var timestamp = new Date(2222, 12, 4); | ||
var timestamp_seconds_since_epoch = Math.floor(timestamp.getTime() / 1000); | ||
const s = new FluentSender('debug', Object.assign({}, clientOptions, {port: server.port})); | ||
const timestamp = new Date(2222, 12, 4); | ||
const timestamp_seconds_since_epoch = Math.floor(timestamp.getTime() / 1000); | ||
@@ -167,4 +168,4 @@ s.emit('1st record', { message: '1st data' }, timestamp, () => { | ||
runServer({}, serverOptions, (server, finish) => { | ||
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, {port: server.port})); | ||
var timestamp = Math.floor(new Date().getTime() / 1000); | ||
const s = new FluentSender('debug', Object.assign({}, clientOptions, {port: server.port})); | ||
const timestamp = Math.floor(new Date().getTime() / 1000); | ||
@@ -182,8 +183,8 @@ s.emit('1st record', { message: '1st data' }, timestamp, () => { | ||
runServer({}, serverOptions, (server, finish) => { | ||
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, {port: server.port})); | ||
var eventTime = EventTime.now(); | ||
const s = new FluentSender('debug', Object.assign({}, clientOptions, {port: server.port})); | ||
const eventTime = EventTime.now(); | ||
s.emit('1st record', { message: '1st data' }, eventTime, () => { | ||
finish((data) => { | ||
var decoded = EventTime.unpack(data[0].time.buffer); | ||
const decoded = EventTime.unpack(data[0].time.buffer); | ||
expect(JSON.stringify(decoded)).to.equal(JSON.stringify(eventTime)); | ||
@@ -197,3 +198,3 @@ done(); | ||
it('should resume the connection automatically and flush the queue', (done) => { | ||
var s = new sender.FluentSender('debug', clientOptions); | ||
const s = new FluentSender('debug', clientOptions); | ||
s.emit('1st record', { message: '1st data' }); | ||
@@ -222,3 +223,3 @@ s.on('error', (err) => { | ||
runServer({}, serverOptions, (server, finish) => { | ||
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, {port: server.port})); | ||
const s = new FluentSender('debug', Object.assign({}, clientOptions, {port: server.port})); | ||
s.emit('foo', 'bar', () => { | ||
@@ -253,11 +254,11 @@ // connected | ||
runServer({requireAckResponse: true}, serverOptions, (server, finish) => { | ||
var s1 = new sender.FluentSender('debug', Object.assign({}, clientOptions, { | ||
const s1 = new FluentSender('debug', Object.assign({}, clientOptions, { | ||
port: server.port, | ||
requireAckResponse: true | ||
})); | ||
var emits = []; | ||
const emits = []; | ||
function emit(k) { | ||
emits.push((done) => { s1.emit('record', k, done); }); | ||
} | ||
for (var i = 0; i < 10; i++) { | ||
for (let i = 0; i < 10; i++) { | ||
emit({ number: i }); | ||
@@ -268,3 +269,3 @@ } | ||
expect(data.length).to.be.equal(10); | ||
for (var i = 0; i < 10; i++) { | ||
for (let i = 0; i < 10; i++) { | ||
expect(data[i].tag).to.be.equal('debug.record'); | ||
@@ -283,3 +284,3 @@ expect(data[i].data.number).to.be.equal(i); | ||
runServer({requireAckResponse: false }, serverOptions, (server, finish) => { | ||
var s1 = new sender.FluentSender('debug', Object.assign({}, clientOptions, { | ||
const s1 = new FluentSender('debug', Object.assign({}, clientOptions, { | ||
port: server.port, | ||
@@ -301,3 +302,3 @@ requireAckResponse: false, | ||
it('should set error handler', (done) => { | ||
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, { | ||
const s = new FluentSender('debug', Object.assign({}, clientOptions, { | ||
reconnectInterval: 100 | ||
@@ -400,3 +401,3 @@ })); | ||
runServer({}, serverOptions, (server, finish) => { | ||
var s1 = new sender.FluentSender('debug', Object.assign({}, clientOptions, { port: server.port })); | ||
const s1 = new FluentSender('debug', Object.assign({}, clientOptions, { port: server.port })); | ||
s1.emit.apply(s1, testCase.args); | ||
@@ -465,3 +466,3 @@ | ||
runServer({}, serverOptions, (server, finish) => { | ||
var s1 = new sender.FluentSender(null, Object.assign({}, clientOptions, { port: server.port })); | ||
const s1 = new FluentSender(null, Object.assign({}, clientOptions, { port: server.port })); | ||
s1.emit.apply(s1, testCase.args); | ||
@@ -517,3 +518,3 @@ | ||
runServer({}, serverOptions, (server, finish) => { | ||
var s1 = new sender.FluentSender(null, Object.assign({}, clientOptions, { port: server.port })); | ||
const s1 = new FluentSender(null, Object.assign({}, clientOptions, { port: server.port })); | ||
s1.on('error', (error) => { | ||
@@ -541,3 +542,3 @@ expect(error.name).to.be.equal('MissingTagError'); | ||
runServer({}, serverOptions, (server, finish) => { | ||
var s1 = new sender.FluentSender(null, Object.assign({}, clientOptions, { port: server.port })); | ||
const s1 = new FluentSender(null, Object.assign({}, clientOptions, { port: server.port })); | ||
s1.on('error', (error) => { | ||
@@ -555,3 +556,3 @@ expect(error.name).to.be.equal('DataTypeError'); | ||
it('should set max listeners', (done) => { | ||
var s = new sender.FluentSender('debug', clientOptions); | ||
const s = new FluentSender('debug', clientOptions); | ||
if (EventEmitter.prototype.getMaxListeners) { | ||
@@ -572,3 +573,3 @@ expect(s.getMaxListeners()).to.be.equal(10); | ||
runServer({}, serverOptions, (server, finish) => { | ||
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, {port: server.port})); | ||
const s = new FluentSender('debug', Object.assign({}, clientOptions, {port: server.port})); | ||
s.emit('1st record', { message: '1st data' }, () => { | ||
@@ -591,5 +592,5 @@ s._disconnect(); | ||
runServer({}, serverOptions, (server, finish) => { | ||
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, { port: server.port })); | ||
var ss = s.toStream('record'); | ||
var pt = new stream.PassThrough(); | ||
const s = new FluentSender('debug', Object.assign({}, clientOptions, { port: server.port })); | ||
const ss = s.toStream('record'); | ||
const pt = new stream.PassThrough(); | ||
pt.pipe(ss); | ||
@@ -616,3 +617,3 @@ pt.push('data1\n'); | ||
runServer({ requireAckResponse: true }, serverOptions, (server, finish) => { | ||
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, { | ||
const s = new FluentSender('debug', Object.assign({}, clientOptions, { | ||
port: server.port, | ||
@@ -623,11 +624,11 @@ timeout: 3.0, | ||
})); | ||
var errors = []; | ||
const errors = []; | ||
s.on('error', (err) => { | ||
errors.push(count + ': ' + err); | ||
}); | ||
var maxCount = 20; | ||
var count = 0; | ||
var sendMessage = function() { | ||
var time = Math.round(Date.now() / 1000); | ||
var data = { | ||
const maxCount = 20; | ||
let count = 0; | ||
const sendMessage = function() { | ||
const time = Math.round(Date.now() / 1000); | ||
const data = { | ||
count: count | ||
@@ -644,3 +645,3 @@ }; | ||
}; | ||
var timer = setInterval(sendMessage, 10); | ||
let timer = setInterval(sendMessage, 10); | ||
}); | ||
@@ -651,3 +652,3 @@ }); | ||
runServer({}, serverOptions, (server, finish) => { | ||
let loggerOptions = { | ||
const loggerOptions = { | ||
port: server.port, | ||
@@ -660,3 +661,3 @@ eventMode: 'PackedForward', | ||
}; | ||
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, loggerOptions)); | ||
const s = new FluentSender('debug', Object.assign({}, clientOptions, loggerOptions)); | ||
s.emit('test', { message: 'This is test 0' }); | ||
@@ -680,3 +681,3 @@ s.end('test', { message: 'This is test 1' }); | ||
runServer({}, serverOptions, (server, finish) => { | ||
let loggerOptions = { | ||
const loggerOptions = { | ||
port: server.port, | ||
@@ -689,3 +690,3 @@ eventMode: 'CompressedPackedForward', | ||
}; | ||
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, loggerOptions)); | ||
const s = new FluentSender('debug', Object.assign({}, clientOptions, loggerOptions)); | ||
s.emit('test', { message: 'This is test 0' }); | ||
@@ -708,4 +709,4 @@ s.emit('test', { message: 'This is test 1' }); | ||
it('should process handshake sahred key', (done) => { | ||
let sharedKey = 'sharedkey'; | ||
let options = { | ||
const sharedKey = 'sharedkey'; | ||
const options = { | ||
security: { | ||
@@ -717,3 +718,3 @@ serverHostname: 'server.example.com', | ||
runServer(options, serverOptions, (server, finish) => { | ||
let loggerOptions = { | ||
const loggerOptions = { | ||
port: server.port, | ||
@@ -729,3 +730,3 @@ security: { | ||
}; | ||
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, loggerOptions)); | ||
const s = new FluentSender('debug', Object.assign({}, clientOptions, loggerOptions)); | ||
s.emit('test', { message: 'This is test 0' }); | ||
@@ -745,4 +746,4 @@ s.emit('test', { message: 'This is test 1' }); | ||
it('should process handshake sahred key mismatch', (done) => { | ||
let sharedKey = 'sharedkey'; | ||
let options = { | ||
const sharedKey = 'sharedkey'; | ||
const options = { | ||
security: { | ||
@@ -754,3 +755,3 @@ serverHostname: 'server.example.com', | ||
runServer(options, serverOptions, (server, finish) => { | ||
let loggerOptions = { | ||
const loggerOptions = { | ||
port: server.port, | ||
@@ -766,3 +767,3 @@ security: { | ||
}; | ||
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, loggerOptions)); | ||
const s = new FluentSender('debug', Object.assign({}, clientOptions, loggerOptions)); | ||
s.on('error', (error) => { | ||
@@ -780,4 +781,4 @@ expect(error.message).to.be.equal('Authentication failed: shared key mismatch'); | ||
it('should process handshake user based authentication', (done) => { | ||
let sharedKey = 'sharedkey'; | ||
let options = { | ||
const sharedKey = 'sharedkey'; | ||
const options = { | ||
security: { | ||
@@ -791,3 +792,3 @@ serverHostname: 'server.example.com', | ||
runServer(options, serverOptions, (server, finish) => { | ||
let loggerOptions = { | ||
const loggerOptions = { | ||
port: server.port, | ||
@@ -805,3 +806,3 @@ security: { | ||
}; | ||
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, loggerOptions)); | ||
const s = new FluentSender('debug', Object.assign({}, clientOptions, loggerOptions)); | ||
s.emit('test', { message: 'This is test 0' }); | ||
@@ -821,4 +822,4 @@ s.emit('test', { message: 'This is test 1' }); | ||
it('should process handshake user based authentication failed', (done) => { | ||
let sharedKey = 'sharedkey'; | ||
let options = { | ||
const sharedKey = 'sharedkey'; | ||
const options = { | ||
security: { | ||
@@ -832,3 +833,3 @@ serverHostname: 'server.example.com', | ||
runServer(options, serverOptions, (server, finish) => { | ||
let loggerOptions = { | ||
const loggerOptions = { | ||
port: server.port, | ||
@@ -846,3 +847,3 @@ security: { | ||
}; | ||
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, loggerOptions)); | ||
const s = new FluentSender('debug', Object.assign({}, clientOptions, loggerOptions)); | ||
s.on('error', (error) => { | ||
@@ -860,4 +861,4 @@ expect(error.message).to.be.equal('Authentication failed: username/password mismatch'); | ||
it('should process handshake failed', (done) => { | ||
let sharedKey = 'sharedkey'; | ||
let options = { | ||
const sharedKey = 'sharedkey'; | ||
const options = { | ||
security: { | ||
@@ -870,3 +871,3 @@ serverHostname: 'server.example.com', | ||
runServer(options, serverOptions, (server, finish) => { | ||
let loggerOptions = { | ||
const loggerOptions = { | ||
port: server.port, | ||
@@ -882,3 +883,3 @@ security: { | ||
}; | ||
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, loggerOptions)); | ||
const s = new FluentSender('debug', Object.assign({}, clientOptions, loggerOptions)); | ||
s.on('error', (err) => { | ||
@@ -885,0 +886,0 @@ expect(err.message).to.be.equal('Authentication failed: reason'); |
'use strict'; | ||
/* globals describe, it */ | ||
/* eslint node/no-unpublished-require: ["error", {"allowModules": ["async", "chai", "winston"]}] */ | ||
var expect = require('chai').expect; | ||
var winstonSupport = require('../lib/winston'); | ||
var winston = require('winston'); | ||
var runServer = require('../lib/testHelper').runServer; | ||
const expect = require('chai').expect; | ||
const winstonSupport = require('../lib/winston'); | ||
const winston = require('winston'); | ||
const runServer = require('../lib/testHelper').runServer; | ||
@@ -21,3 +21,3 @@ describe('winston', () => { | ||
runServer({}, {}, (server, finish) => { | ||
var logger = winston.createLogger({ | ||
const logger = winston.createLogger({ | ||
format: winston.format.combine( | ||
@@ -24,0 +24,0 @@ winston.format.splat(), |
Sorry, the diff of this file is not supported yet
1833
119297