Socket
Socket
Sign inDemoInstall

fluent-logger

Package Overview
Dependencies
1
Maintainers
4
Versions
46
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 2.8.0 to 3.0.0

3

.eslintrc.js

@@ -60,2 +60,5 @@ module.exports = {

],
"no-var": [
"error"
],
"node/exports-style": [

@@ -62,0 +65,0 @@ "error",

59

lib/event-time.js
'use strict';
var EventTime = function EventTime(epoch, nano) {
this.epoch = epoch;
this.nano = nano;
};
module.exports = class EventTime {
constructor(epoch, nano) {
this.epoch = epoch;
this.nano = nano;
}
EventTime.pack = function eventTimePack(eventTime) {
var b = Buffer.alloc(8);
b.writeUInt32BE(eventTime.epoch, 0);
b.writeUInt32BE(eventTime.nano, 4);
return b;
};
static pack(eventTime) {
const b = Buffer.alloc(8);
b.writeUInt32BE(eventTime.epoch, 0);
b.writeUInt32BE(eventTime.nano, 4);
return b;
}
EventTime.unpack = function eventTimeUnpack(buffer) {
var e = buffer.readUInt32BE(0);
var n = buffer.readUInt32BE(4);
return new EventTime(e, n);
};
static unpack(buffer) {
const e = buffer.readUInt32BE(0);
const n = buffer.readUInt32BE(4);
return new EventTime(e, n);
}
EventTime.now = function now() {
var now = Date.now();
return EventTime.fromTimestamp(now);
};
static now() {
const now = Date.now();
return EventTime.fromTimestamp(now);
}
EventTime.fromDate = function fromDate(date) {
var t = date.getTime();
return EventTime.fromTimestamp(t);
};
static fromDate(date) {
const t = date.getTime();
return EventTime.fromTimestamp(t);
}
EventTime.fromTimestamp = function fromTimestamp(t) {
var epoch = Math.floor(t / 1000);
var nano = (t - epoch * 1000) * 1000000;
return new EventTime(epoch, nano);
static fromTimestamp(t) {
const epoch = Math.floor(t / 1000);
const nano = (t - epoch * 1000) * 1000000;
return new EventTime(epoch, nano);
}
};
module.exports = {};
module.exports.EventTime = EventTime;
'use strict';
var FluentSender = require('./sender').FluentSender;
var sender = new FluentSender('debug');
var EventTime = require('./event-time').EventTime;
const FluentSender = require('./sender');
const EventTime = require('./event-time');
let sender = new FluentSender('debug');
module.exports = {
configure: function(config) {
configure: function(tag, options) {
sender.end();
var tag = config;
var options = arguments[1];
sender = new FluentSender(tag, options);

@@ -17,3 +15,3 @@ sender._setupErrorHandler();

createFluentSender: function(tag, options) {
var _sender = new FluentSender(tag, options);
const _sender = new FluentSender(tag, options);
_sender._setupErrorHandler();

@@ -25,3 +23,3 @@ return _sender;

winstonTransport: function() {
var transport = require('../lib/winston');
const transport = require('../lib/winston');
return transport;

@@ -35,7 +33,7 @@ }

// delegate logger interfaces to default sender object
var methods = ['emit', 'end', 'addListener', 'on', 'once', 'removeListener', 'removeAllListeners', 'setMaxListeners', 'getMaxListeners'];
const methods = ['emit', 'end', 'addListener', 'on', 'once', 'removeListener', 'removeAllListeners', 'setMaxListeners', 'getMaxListeners'];
methods.forEach((attr, i) => {
module.exports[attr] = function() {
if (sender) {
return sender[attr].apply(sender, Array.prototype.slice.call(arguments));
return sender[attr].apply(sender, Array.from(arguments));
}

@@ -42,0 +40,0 @@ return undefined;

'use strict';
var util = require('util');
module.exports = {};
var BaseError = function BaseError(message, options) {
Error.captureStackTrace(this, this.constructor);
this.name = this.constructor.name;
this.message = message;
this.options = options;
};
util.inherits(BaseError, Error);
class BaseError extends Error {
constructor(message, options) {
super();
Error.captureStackTrace(this, this.constructor);
this.name = this.constructor.name;
this.message = message;
this.options = options;
}
}
var ConfigError = function ConfigError(message, options) {
ConfigError.super_.call(this, message, options);
};
util.inherits(ConfigError, BaseError);
class ConfigError extends BaseError {
constructor(message, options) {
super(message, options);
}
}
var MissingTagError = function MissingTagError(message, options) {
MissingTagError.super_.call(this, message, options);
};
util.inherits(MissingTagError, BaseError);
class MissingTagError extends BaseError {
constructor(message, options) {
super(message, options);
}
}
var ResponseError = function ResponseError(message, options) {
MissingTagError.super_.call(this, message, options);
};
util.inherits(ResponseError, BaseError);
class ResponseError extends BaseError {
constructor(message, options) {
super(message, options);
}
}
var ResponseTimeoutError = function ResponseTimeoutError(message, options) {
ResponseTimeoutError.super_.call(this, message, options);
};
util.inherits(ResponseTimeoutError, BaseError);
class ResponseTimeoutError extends BaseError {
constructor(message, options) {
super(message, options);
}
}
var DataTypeError = function DataTypeError(message, options) {
DataTypeError.super_.call(this, message, options);
};
util.inherits(DataTypeError, BaseError);
class DataTypeError extends BaseError {
constructor(message, options) {
super(message, options);
}
}
var HandshakeError = function HandshakeError(message, options) {
HandshakeError.super_.call(this, message, options);
};
util.inherits(HandshakeError, BaseError);
class HandshakeError extends BaseError {
constructor(message, options) {
super(message, options);
}
}

@@ -45,0 +51,0 @@ module.exports = {

'use strict';
var EventEmitter = require('events').EventEmitter;
var msgpack = require('msgpack-lite');
var net = require('net');
var stream = require('stream');
var crypto = require('crypto');
var tls = require('tls');
var zlib = require('zlib');
var FluentLoggerError = require('./logger-error');
var EventTime = require('./event-time').EventTime;
const EventEmitter = require('events').EventEmitter;
const msgpack = require('msgpack-lite');
const net = require('net');
const stream = require('stream');
const crypto = require('crypto');
const tls = require('tls');
const zlib = require('zlib');
const FluentLoggerError = require('./logger-error');
const EventTime = require('./event-time');
var codec = msgpack.createCodec();
const codec = msgpack.createCodec();
codec.addExtPacker(0x00, EventTime, EventTime.pack);
codec.addExtUnpacker(0x00, EventTime.unpack);
function FluentSender(tag_prefix, options) {
options = options || {};
this._eventMode = options.eventMode || 'Message'; // Message, PackedForward, CompressedPackedForward
if (!/^Message|PackedForward|CompressedPackedForward$/.test(this._eventMode)) {
throw new FluentLoggerError.ConfigError('Unknown event mode: ' + this._eventMode);
class FluentSender {
constructor(tag_prefix, options) {
options = options || {};
this._eventMode = options.eventMode || 'Message'; // Message, PackedForward, CompressedPackedForward
if (!/^Message|PackedForward|CompressedPackedForward$/.test(this._eventMode)) {
throw new FluentLoggerError.ConfigError('Unknown event mode: ' + this._eventMode);
}
this.tag_prefix = tag_prefix;
this.host = options.host || 'localhost';
this.port = options.port || 24224;
this.path = options.path;
this.timeout = options.timeout || 3.0;
this.tls = !!options.tls;
this.tlsOptions = options.tlsOptions || {};
this.reconnectInterval = options.reconnectInterval || 600000; // Default is 10 minutes
this.requireAckResponse = options.requireAckResponse;
this.ackResponseTimeout = options.ackResponseTimeout || 190000; // Default is 190 seconds
this.internalLogger = options.internalLogger || console;
this._timeResolution = options.milliseconds ? 1 : 1000;
this._socket = null;
if (this._eventMode === 'Message') {
this._sendQueue = []; // queue for items waiting for being sent.
this._flushInterval = 0;
} else {
this._sendQueue = new Map();
this._flushInterval = options.flushInterval || 100;
this._sendQueueSizeLimit = 8 * 1024 * 1024; // 8MiB
this._sendQueueSize = 0;
this._flushSendQueueTimeoutId = null;
this._compressed = (this._eventMode === 'CompressedPackedForward');
}
this._eventEmitter = new EventEmitter();
// options.security = { clientHostname: "client.localdomain", sharedKey: "very-secret-shared-key" }
this.security = options.security || {
clientHostname: null,
sharedKey: null,
username: '',
password: ''
};
this.sharedKeySalt = crypto.randomBytes(16).toString('hex');
// helo, pingpong, established
this._status = null;
this._connecting = false;
}
this.tag_prefix = tag_prefix;
this.host = options.host || 'localhost';
this.port = options.port || 24224;
this.path = options.path;
this.timeout = options.timeout || 3.0;
this.tls = !!options.tls;
this.tlsOptions = options.tlsOptions || {};
this.reconnectInterval = options.reconnectInterval || 600000; // Default is 10 minutes
this.requireAckResponse = options.requireAckResponse;
this.ackResponseTimeout = options.ackResponseTimeout || 190000; // Default is 190 seconds
this.internalLogger = options.internalLogger || console;
this._timeResolution = options.milliseconds ? 1 : 1000;
this._socket = null;
if (this._eventMode === 'Message') {
this._sendQueue = []; // queue for items waiting for being sent.
this._flushInterval = 0;
} else {
this._sendQueue = new Map();
this._flushInterval = options.flushInterval || 100;
this._sendQueueSizeLimit = 8 * 1024 * 1024; // 8MiB
this._sendQueueSize = 0;
this._flushSendQueueTimeoutId = null;
this._compressed = (this._eventMode === 'CompressedPackedForward');
}
this._eventEmitter = new EventEmitter();
// options.security = { clientHostname: "client.localdomain", sharedKey: "very-secret-shared-key" }
this.security = options.security || {
clientHostname: null,
sharedKey: null,
username: '',
password: ''
};
this.sharedKeySalt = crypto.randomBytes(16).toString('hex');
// helo, pingpong, established
this._status = null;
this._connecting = false;
}
FluentSender.prototype.emit = function(/*[label] <data>, [timestamp], [callback] */) {
var label, data, timestamp, callback;
var args = Array.prototype.slice.call(arguments);
// Label must be string always
if (typeof args[0] === 'string') label = args.shift();
emit(/*[label] <data>, [timestamp], [callback] */) {
let label, data, timestamp, callback;
let args = Array.from(arguments);
// Label must be string always
if (typeof args[0] === 'string') {
label = args.shift();
}
// Data can be almost anything
data = args.shift();
// Data can be almost anything
data = args.shift();
// Date can be either timestamp number or Date object
if (typeof args[0] !== 'function') timestamp = args.shift();
// Date can be either timestamp number or Date object
if (typeof args[0] !== 'function') {
timestamp = args.shift();
}
// Last argument is an optional callback
if (typeof args[0] === 'function') callback = args.shift();
// Last argument is an optional callback
if (typeof args[0] === 'function') {
callback = args.shift();
}
let tag = this._makeTag(label);
let error;
let options;
if (tag === null) {
options = {
tag_prefix: this.tag_prefix,
label: label
};
error = new FluentLoggerError.MissingTag('tag is missing', options);
this._handleEvent('error', error, callback);
return;
}
if (typeof data !== 'object') {
options = {
tag_prefix: this.tag_prefix,
label: label,
record: data
};
error = new FluentLoggerError.DataTypeError('data must be an object', options);
this._handleEvent('error', error, callback);
return;
}
const tag = this._makeTag(label);
let error;
let options;
if (tag === null) {
options = {
tag_prefix: this.tag_prefix,
label: label
};
error = new FluentLoggerError.MissingTag('tag is missing', options);
this._handleEvent('error', error, callback);
return;
}
if (typeof data !== 'object') {
options = {
tag_prefix: this.tag_prefix,
label: label,
record: data
};
error = new FluentLoggerError.DataTypeError('data must be an object', options);
this._handleEvent('error', error, callback);
return;
}
this._push(tag, timestamp, data, callback);
this._connect(() => {
this._flushSendQueue();
});
};
['addListener', 'on', 'once', 'removeListener', 'removeAllListeners', 'setMaxListeners', 'getMaxListeners'].forEach((attr, i) => {
FluentSender.prototype[attr] = function() {
return this._eventEmitter[attr].apply(this._eventEmitter, Array.prototype.slice.call(arguments));
};
});
FluentSender.prototype.end = function(label, data, callback) {
if ((label != null && data != null)) {
this.emit(label, data, (err) => {
this._close();
if (err) {
this._handleEvent('error', err, callback);
} else {
callback && callback();
}
this._push(tag, timestamp, data, callback);
this._connect(() => {
this._flushSendQueue();
});
} else {
process.nextTick(() => {
this._close();
callback && callback();
});
}
};
FluentSender.prototype._close = function() {
if (this._socket) {
this._socket.end();
this._socket = null;
this._status = null;
end(label, data, callback) {
if ((label != null && data != null)) {
this.emit(label, data, (err) => {
this._close();
if (err) {
this._handleEvent('error', err, callback);
} else {
callback && callback();
}
});
} else {
process.nextTick(() => {
this._close();
callback && callback();
});
}
}
};
FluentSender.prototype._makeTag = function(label) {
let tag = null;
if (this.tag_prefix && label) {
tag = [this.tag_prefix, label].join('.');
} else if (this.tag_prefix) {
tag = this.tag_prefix;
} else if (label) {
tag = label;
_close() {
if (this._socket) {
this._socket.end();
this._socket = null;
this._status = null;
}
}
return tag;
};
FluentSender.prototype._makePacketItem = function(tag, time, data) {
if (typeof time !== 'number' && !(time instanceof EventTime)) {
time = Math.floor((time ? time.getTime() : Date.now()) / this._timeResolution);
_makeTag(label) {
let tag = null;
if (this.tag_prefix && label) {
tag = [this.tag_prefix, label].join('.');
} else if (this.tag_prefix) {
tag = this.tag_prefix;
} else if (label) {
tag = label;
}
return tag;
}
var packet = [tag, time, data];
var options = {};
if (this.requireAckResponse) {
options = {
chunk: crypto.randomBytes(16).toString('base64')
_makePacketItem(tag, time, data) {
if (typeof time !== 'number' && !(time instanceof EventTime)) {
time = Math.floor((time ? time.getTime() : Date.now()) / this._timeResolution);
}
const packet = [tag, time, data];
let options = {};
if (this.requireAckResponse) {
options = {
chunk: crypto.randomBytes(16).toString('base64')
};
packet.push(options);
}
return {
packet: msgpack.encode(packet, { codec: codec }),
tag: tag,
time: time,
data: data,
options: options
};
packet.push(options);
}
return {
packet: msgpack.encode(packet, { codec: codec }),
tag: tag,
time: time,
data: data,
options: options
};
};
FluentSender.prototype._makeEventEntry = function(time, data) {
if (typeof time !== 'number' && !(time instanceof EventTime)) {
time = Math.floor((time ? time.getTime() : Date.now()) / this._timeResolution);
_makeEventEntry(time, data) {
if (typeof time !== 'number' && !(time instanceof EventTime)) {
time = Math.floor((time ? time.getTime() : Date.now()) / this._timeResolution);
}
return msgpack.encode([time, data], { codec: codec });
}
return msgpack.encode([time, data], { codec: codec });
};
FluentSender.prototype._push = function(tag, time, data, callback) {
if (this._eventMode === 'Message') {
// Message mode
let item = this._makePacketItem(tag, time, data);
item.callback = callback;
this._sendQueue.push(item);
} else {
// PackedForward mode
let eventEntry = this._makeEventEntry(time, data);
this._sendQueueSize += eventEntry.length;
if (this._sendQueue.has(tag)) {
let eventEntryData = this._sendQueue.get(tag);
eventEntryData.eventEntry = Buffer.concat([eventEntryData.eventEntry, eventEntry]);
eventEntryData.callbacks.push(callback);
this._sendQueue.set(tag, eventEntryData);
_push(tag, time, data, callback) {
if (this._eventMode === 'Message') {
// Message mode
let item = this._makePacketItem(tag, time, data);
item.callback = callback;
this._sendQueue.push(item);
} else {
this._sendQueue.set(tag, { eventEntry: eventEntry, callbacks: [callback] });
// PackedForward mode
const eventEntry = this._makeEventEntry(time, data);
this._sendQueueSize += eventEntry.length;
if (this._sendQueue.has(tag)) {
let eventEntryData = this._sendQueue.get(tag);
eventEntryData.eventEntry = Buffer.concat([eventEntryData.eventEntry, eventEntry]);
eventEntryData.callbacks.push(callback);
this._sendQueue.set(tag, eventEntryData);
} else {
this._sendQueue.set(tag, { eventEntry: eventEntry, callbacks: [callback] });
}
}
}
};
FluentSender.prototype._connect = function(callback) {
if (this._connecting) {
return;
}
_connect(callback) {
if (this._connecting) {
return;
}
this._connecting = true;
process.nextTick(() => {
if (this._socket === null) {
this._doConnect(() => {
this._connecting = false;
callback();
});
} else {
if (!this._socket.writable) {
this._disconnect();
process.nextTick(() => {
this._connecting = true;
process.nextTick(() => {
if (this._socket === null) {
this._doConnect(() => {
this._connecting = false;
this._connect(callback);
callback();
});
} else {
process.nextTick(() => {
this._connecting = false;
callback();
});
if (!this._socket.writable) {
this._disconnect();
process.nextTick(() => {
this._connecting = false;
this._connect(callback);
});
} else {
process.nextTick(() => {
this._connecting = false;
callback();
});
}
}
}
});
};
});
}
FluentSender.prototype._doConnect = function(callback) {
let addHandlers = () => {
let errorHandler = (err) => {
if (this._socket) {
this._disconnect();
this._handleEvent('error', err);
_doConnect(callback) {
let addHandlers = () => {
let errorHandler = (err) => {
if (this._socket) {
this._disconnect();
this._handleEvent('error', err);
}
};
this._socket.on('error', errorHandler);
this._socket.on('connect', () => {
this._handleEvent('connect');
});
if (this.tls) {
this._socket.on('tlsClientError', errorHandler);
this._socket.on('secureConnect', () => {
this._handleEvent('connect');
});
}
};
this._socket.on('error', errorHandler);
this._socket.on('connect', () => {
this._handleEvent('connect');
});
if (this.tls) {
this._socket.on('tlsClientError', errorHandler);
this._socket.on('secureConnect', () => {
this._handleEvent('connect');
});
}
};
if (!this.tls) {
this._socket = new net.Socket();
this._socket.setTimeout(this.timeout);
addHandlers();
}
if (this.path) {
if (this.tls) {
this._socket = tls.connect(Object.assign({}, this.tlsOptions, { path: this.path }), () => {
callback();
});
if (!this.tls) {
this._socket = new net.Socket();
this._socket.setTimeout(this.timeout);
addHandlers();
} else {
this._socket.connect(this.path, () => {
callback();
});
}
} else {
let postConnect = () => {
if (this.security.clientHostname && this.security.sharedKey !== null) {
this._handshake(callback);
if (this.path) {
if (this.tls) {
this._socket = tls.connect(Object.assign({}, this.tlsOptions, { path: this.path }), () => {
callback();
});
addHandlers();
} else {
this._status = 'established';
callback();
this._socket.connect(this.path, () => {
callback();
});
}
};
if (this.tls) {
this._socket = tls.connect(Object.assign({}, this.tlsOptions, { host: this.host, port: this.port }), () => {
postConnect();
});
addHandlers();
} else {
this._socket.connect(this.port, this.host, () => {
postConnect();
});
let postConnect = () => {
if (this.security.clientHostname && this.security.sharedKey !== null) {
this._handshake(callback);
} else {
this._status = 'established';
callback();
}
};
if (this.tls) {
this._socket = tls.connect(Object.assign({}, this.tlsOptions, { host: this.host, port: this.port }), () => {
postConnect();
});
addHandlers();
} else {
this._socket.connect(this.port, this.host, () => {
postConnect();
});
}
}
}
};
FluentSender.prototype._disconnect = function() {
this._socket && this._socket.destroy();
this._socket = null;
this._status = null;
this._connecting = false;
};
_disconnect() {
this._socket && this._socket.destroy();
this._socket = null;
this._status = null;
this._connecting = false;
}
FluentSender.prototype._handshake = function(callback) {
if (this._status === 'established') {
return;
}
this._status = 'helo';
this._socket.once('data', (data) => {
this._socket.pause();
let heloStatus = this._checkHelo(data);
if (!heloStatus.succeeded) {
this.internalLogger.error('Received invalid HELO message from ' + this._socket.remoteAddress);
this._disconnect();
_handshake(callback) {
if (this._status === 'established') {
return;
}
this._status = 'pingpong';
this._socket.write(new Buffer(this._generatePing()), () => {
this._socket.resume();
this._socket.once('data', (data) => {
let pongStatus = this._checkPong(data);
if (!pongStatus.succeeded) {
this.internalLogger.error(pongStatus.message);
let error = new FluentLoggerError.HandshakeError(pongStatus.message);
this._handleEvent('error', error);
this._disconnect();
return;
}
this._status = 'established';
this.internalLogger.info('Established');
callback();
this._status = 'helo';
this._socket.once('data', (data) => {
this._socket.pause();
const heloStatus = this._checkHelo(data);
if (!heloStatus.succeeded) {
this.internalLogger.error('Received invalid HELO message from ' + this._socket.remoteAddress);
this._disconnect();
return;
}
this._status = 'pingpong';
this._socket.write(Buffer.from(this._generatePing()), () => {
this._socket.resume();
this._socket.once('data', (data) => {
const pongStatus = this._checkPong(data);
if (!pongStatus.succeeded) {
this.internalLogger.error(pongStatus.message);
const error = new FluentLoggerError.HandshakeError(pongStatus.message);
this._handleEvent('error', error);
this._disconnect();
return;
}
this._status = 'established';
this.internalLogger.info('Established');
callback();
});
});
});
});
};
}
FluentSender.prototype._flushSendQueue = function() {
if (this._flushingSendQueue)
return;
_flushSendQueue() {
if (this._flushingSendQueue)
return;
this._flushingSendQueue = true;
process.nextTick(() => {
this._waitToWrite();
});
};
FluentSender.prototype._waitToWrite = function() {
if (!this._socket) {
this._flushingSendQueue = false;
return;
this._flushingSendQueue = true;
process.nextTick(() => {
this._waitToWrite();
});
}
if (this._socket.writable) {
if (this._eventMode === 'Message') {
this._doFlushSendQueue();
} else {
if (this._sendQueueSize >= this._sendQueueSizeLimit) {
this._flushSendQueueTimeoutId && clearTimeout(this._flushSendQueueTimeoutId);
_waitToWrite() {
if (!this._socket) {
this._flushingSendQueue = false;
return;
}
if (this._socket.writable) {
if (this._eventMode === 'Message') {
this._doFlushSendQueue();
} else {
this._flushSendQueueTimeoutId && clearTimeout(this._flushSendQueueTimeoutId);
this._flushSendQueueTimeoutId = setTimeout(() => {
if (this._sendQueueSize >= this._sendQueueSizeLimit) {
this._flushSendQueueTimeoutId && clearTimeout(this._flushSendQueueTimeoutId);
this._doFlushSendQueue();
}, this._flushInterval);
} else {
this._flushSendQueueTimeoutId && clearTimeout(this._flushSendQueueTimeoutId);
this._flushSendQueueTimeoutId = setTimeout(() => {
this._doFlushSendQueue();
}, this._flushInterval);
}
}
} else {
process.nextTick(() => {
this._waitToWrite();
});
}
} else {
process.nextTick(() => {
this._waitToWrite();
});
}
}
FluentSender.prototype._doFlushSendQueue = function(timeoutId) {
if (this._eventMode === 'Message') {
let item = this._sendQueue.shift();
if (item === undefined) {
this._flushingSendQueue = false;
// nothing written;
return;
_doFlushSendQueue(timeoutId) {
if (this._eventMode === 'Message') {
let item = this._sendQueue.shift();
if (item === undefined) {
this._flushingSendQueue = false;
// nothing written;
return;
}
this._doWrite(item.packet, item.options, timeoutId, [item.callback]);
} else {
if (this._sendQueue.size === 0) {
this._flushingSendQueue = false;
return;
}
let tag = Array.from(this._sendQueue.keys())[0];
let eventEntryData = this._sendQueue.get(tag);
let entries = eventEntryData.eventEntry;
let size = eventEntryData.eventEntry.length;
this._sendQueue.delete(tag);
if (this._compressed) {
entries = zlib.gzipSync(entries);
size = entries.length;
}
const options = {
chunk: crypto.randomBytes(16).toString('base64'),
size: size,
compressed: this._compressed ? 'gzip' : 'text'
};
const packet = msgpack.encode([tag, entries, options], { codec: codec });
this._doWrite(packet, options, timeoutId, eventEntryData.callbacks);
}
this._doWrite(item.packet, item.options, timeoutId, [item.callback]);
} else {
if (this._sendQueue.size === 0) {
this._flushingSendQueue = false;
return;
}
let tag = Array.from(this._sendQueue.keys())[0];
let eventEntryData = this._sendQueue.get(tag);
let entries = eventEntryData.eventEntry;
let size = eventEntryData.eventEntry.length;
this._sendQueue.delete(tag);
if (this._compressed) {
entries = zlib.gzipSync(entries);
size = entries.length;
}
let options = {
chunk: crypto.randomBytes(16).toString('base64'),
size: size,
compressed: this._compressed ? 'gzip' : 'text'
};
let packet = msgpack.encode([tag, entries, options], { codec: codec });
this._doWrite(packet, options, timeoutId, eventEntryData.callbacks);
}
};
FluentSender.prototype._doWrite = function(packet, options, timeoutId, callbacks) {
const sendQueueSize = this._sendQueueSize;
this._socket.write(new Buffer(packet), () => {
if (this.requireAckResponse) {
this._socket.once('data', (data) => {
timeoutId && clearTimeout(timeoutId);
var response = msgpack.decode(data, { codec: codec });
if (response.ack !== options.chunk) {
var error = new FluentLoggerError.ResponseError(
'ack in response and chunk id in sent data are different',
{ ack: response.ack, chunk: options.chunk }
);
_doWrite(packet, options, timeoutId, callbacks) {
const sendQueueSize = this._sendQueueSize;
this._socket.write(Buffer.from(packet), () => {
if (this.requireAckResponse) {
this._socket.once('data', (data) => {
timeoutId && clearTimeout(timeoutId);
const response = msgpack.decode(data, { codec: codec });
if (response.ack !== options.chunk) {
const error = new FluentLoggerError.ResponseError(
'ack in response and chunk id in sent data are different',
{ ack: response.ack, chunk: options.chunk }
);
callbacks.forEach((callback) => {
this._handleEvent('error', error, callback);
});
}
this._sendQueueSize -= sendQueueSize;
callbacks.forEach((callback) => {
callback && callback();
});
process.nextTick(() => {
this._waitToWrite();
});
});
timeoutId = setTimeout(() => {
const error = new FluentLoggerError.ResponseTimeout('ack response timeout');
callbacks.forEach((callback) => {
this._handleEvent('error', error, callback);
});
}
}, this.ackResponseTimeout);
} else {
this._sendQueueSize -= sendQueueSize;

@@ -418,169 +434,160 @@ callbacks.forEach((callback) => {

});
});
timeoutId = setTimeout(() => {
var error = new FluentLoggerError.ResponseTimeout('ack response timeout');
callbacks.forEach((callback) => {
this._handleEvent('error', error, callback);
});
}, this.ackResponseTimeout);
} else {
this._sendQueueSize -= sendQueueSize;
callbacks.forEach((callback) => {
callback && callback();
});
process.nextTick(() => {
this._waitToWrite();
});
}
});
}
_handleEvent(signal, data, callback) {
callback && callback(data);
if (this._eventEmitter.listenerCount(signal) > 0) {
this._eventEmitter.emit(signal, data);
}
});
};
FluentSender.prototype._handleEvent = function _handleEvent(signal, data, callback) {
callback && callback(data);
if (this._eventEmitter.listenerCount(signal) > 0) {
this._eventEmitter.emit(signal, data);
}
};
FluentSender.prototype._setupErrorHandler = function _setupErrorHandler(callback) {
if (!this.reconnectInterval) {
return;
_setupErrorHandler(callback) {
if (!this.reconnectInterval) {
return;
}
this.on('error', (error) => {
this._flushingSendQueue = false;
this._status = null;
this.internalLogger.error('Fluentd error', error);
this.internalLogger.info('Fluentd will reconnect after ' + this.reconnectInterval / 1000 + ' seconds');
let timeoutId = setTimeout(() => {
this.internalLogger.info('Fluentd is reconnecting...');
this._connect(() => {
this.internalLogger.info('Fluentd reconnection finished!!');
});
}, this.reconnectInterval);
callback && callback(timeoutId);
});
}
this.on('error', (error) => {
this._flushingSendQueue = false;
this._status = null;
this.internalLogger.error('Fluentd error', error);
this.internalLogger.info('Fluentd will reconnect after ' + this.reconnectInterval / 1000 + ' seconds');
let timeoutId = setTimeout(() => {
this.internalLogger.info('Fluentd is reconnecting...');
this._connect(() => {
this.internalLogger.info('Fluentd reconnection finished!!');
});
}, this.reconnectInterval);
callback && callback(timeoutId);
});
};
FluentSender.prototype._checkHelo = function _checkHelo(data) {
// ['HELO', options(hash)]
this.internalLogger.info('Checking HELO...');
var message = msgpack.decode(data);
if (message.length !== 2) {
return { succeeded: false, message: 'Invalid format for HELO message' };
_checkHelo(data) {
// ['HELO', options(hash)]
this.internalLogger.info('Checking HELO...');
const message = msgpack.decode(data);
if (message.length !== 2) {
return { succeeded: false, message: 'Invalid format for HELO message' };
}
if (message[0] !== 'HELO') {
return { succeeded: false, message: 'Invalid format for HELO message' };
}
const options = message[1] || {};
this.sharedKeyNonce = options['nonce'] || '';
this.authentication = options['auth'] || '';
return { succeeded: true };
}
if (message[0] !== 'HELO') {
return { succeeded: false, message: 'Invalid format for HELO message' };
}
var options = message[1] || {};
this.sharedKeyNonce = options['nonce'] || '';
this.authentication = options['auth'] || '';
return { succeeded: true };
};
FluentSender.prototype._generatePing = function _generatePing() {
// [
// 'PING',
// client_hostname,
// shared_key_salt,
// sha512_hex(sharedkey_salt + client_hostname + nonce + shared_key),
// username || '',
// sha512_hex(auth_salt + username + password) || ''
// ]
var sharedKeyHexdigest = crypto.createHash('sha512')
.update(this.sharedKeySalt)
.update(this.security.clientHostname)
.update(this.sharedKeyNonce)
.update(this.security.sharedKey)
.digest('hex');
var ping = ['PING', this.security.clientHostname, this.sharedKeySalt, sharedKeyHexdigest];
if (Buffer.isBuffer(this.authentication) && Buffer.byteLength(this.authentication) !== 0) {
var passwordHexDigest = crypto.createHash('sha512')
.update(this.authentication)
.update(this.security.username || '')
.update(this.security.password || '')
_generatePing() {
// [
// 'PING',
// client_hostname,
// shared_key_salt,
// sha512_hex(sharedkey_salt + client_hostname + nonce + shared_key),
// username || '',
// sha512_hex(auth_salt + username + password) || ''
// ]
const sharedKeyHexdigest = crypto.createHash('sha512')
.update(this.sharedKeySalt)
.update(this.security.clientHostname)
.update(this.sharedKeyNonce)
.update(this.security.sharedKey)
.digest('hex');
ping.push(this.username, passwordHexDigest);
} else {
ping.push('', '');
const ping = ['PING', this.security.clientHostname, this.sharedKeySalt, sharedKeyHexdigest];
if (Buffer.isBuffer(this.authentication) && Buffer.byteLength(this.authentication) !== 0) {
const passwordHexDigest = crypto.createHash('sha512')
.update(this.authentication)
.update(this.security.username || '')
.update(this.security.password || '')
.digest('hex');
ping.push(this.username, passwordHexDigest);
} else {
ping.push('', '');
}
return msgpack.encode(ping);
}
return msgpack.encode(ping);
};
FluentSender.prototype._checkPong = function _checkPong(data) {
// [
// 'PONG',
// bool(authentication result),
// 'reason if authentication failed',
// server_hostname,
// sha512_hex(salt + server_hostname + nonce + sharedkey)
// ]
this.internalLogger.info('Checking PONG...');
var message = msgpack.decode(data);
if (message.length !== 5) {
return false;
_checkPong(data) {
// [
// 'PONG',
// bool(authentication result),
// 'reason if authentication failed',
// server_hostname,
// sha512_hex(salt + server_hostname + nonce + sharedkey)
// ]
this.internalLogger.info('Checking PONG...');
const message = msgpack.decode(data);
if (message.length !== 5) {
return false;
}
if (message[0] !== 'PONG') {
return { succeeded: false, message: 'Invalid format for PONG message' };
}
const _pong = message[0];
const authResult = message[1];
const reason = message[2];
const hostname = message[3];
const sharedKeyHexdigest = message[4];
if (!authResult) {
return { succeeded: false, message: 'Authentication failed: ' + reason };
}
if (hostname === this.security.clientHostname) {
return { succeeded: false, message: 'Same hostname between input and output: invalid configuration' };
}
const clientsideHexdigest = crypto.createHash('sha512')
.update(this.sharedKeySalt)
.update(hostname)
.update(this.sharedKeyNonce)
.update(this.security.sharedKey)
.digest('hex');
if (sharedKeyHexdigest !== clientsideHexdigest) {
return { succeeded: false, message: 'Sharedkey mismatch' };
}
return { succeeded: true };
}
if (message[0] !== 'PONG') {
return { succeeded: false, message: 'Invalid format for PONG message' };
}
var _pong = message[0];
var authResult = message[1];
var reason = message[2];
var hostname = message[3];
var sharedKeyHexdigest = message[4];
if (!authResult) {
return { succeeded: false, message: 'Authentication failed: ' + reason };
}
if (hostname === this.security.clientHostname) {
return { succeeded: false, message: 'Same hostname between input and output: invalid configuration' };
}
var clientsideHexdigest = crypto.createHash('sha512')
.update(this.sharedKeySalt)
.update(hostname)
.update(this.sharedKeyNonce)
.update(this.security.sharedKey)
.digest('hex');
if (sharedKeyHexdigest !== clientsideHexdigest) {
return { succeeded: false, message: 'Sharedkey mismatch' };
}
return { succeeded: true };
};
FluentSender.prototype.toStream = function(options) {
if (typeof options === 'string') {
options = {label: options};
} else {
options = options || {};
}
var label = options.label;
if (!label) {
throw new Error('label is needed');
}
var defaultEncoding = options.encoding || 'UTF-8';
var writable = new stream.Writable();
var dataString = '';
writable._write = (chunk, encoding, callback) => {
var dataArray = chunk.toString(defaultEncoding).split(/\n/);
var next = () => {
if (dataArray.length) {
dataString += dataArray.shift();
}
if (!dataArray.length) {
process.nextTick(callback);
return;
}
this.emit(label, { message: dataString }, (err) => {
if (err) {
this._handleEvent('error', err, callback);
toStream(options) {
if (typeof options === 'string') {
options = {label: options};
} else {
options = options || {};
}
const label = options.label;
if (!label) {
throw new Error('label is needed');
}
const defaultEncoding = options.encoding || 'UTF-8';
const writable = new stream.Writable();
let dataString = '';
writable._write = (chunk, encoding, callback) => {
const dataArray = chunk.toString(defaultEncoding).split(/\n/);
let next = () => {
if (dataArray.length) {
dataString += dataArray.shift();
}
if (!dataArray.length) {
process.nextTick(callback);
return;
}
dataString = '';
next();
});
this.emit(label, { message: dataString }, (err) => {
if (err) {
this._handleEvent('error', err, callback);
return;
}
dataString = '';
next();
});
};
next();
};
next();
return writable;
}
}
['addListener', 'on', 'once', 'removeListener', 'removeAllListeners', 'setMaxListeners', 'getMaxListeners'].forEach((attr, i) => {
FluentSender.prototype[attr] = function() {
return this._eventEmitter[attr].apply(this._eventEmitter, Array.from(arguments));
};
return writable;
};
});
module.exports = {};
module.exports.FluentSender = FluentSender;
module.exports = FluentSender;
'use strict';
var net = require('net');
var tls = require('tls');
var msgpack = require('msgpack-lite');
var crypto = require('crypto');
var zlib = require('zlib');
const net = require('net');
const tls = require('tls');
const msgpack = require('msgpack-lite');
const crypto = require('crypto');
const zlib = require('zlib');
function MockFluentdServer(options, tlsOptions) {
this._port = null;
this._options = options;
this._tlsOptions = tlsOptions || {};
this._received = [];
this._clients = {};
this._state = null;
this._nonce = null;
this._userAuthSalt = null;
let server = (socket) => {
var clientKey = socket.remoteAddress + ':' + socket.remotePort;
this._clients[clientKey] = socket;
socket.on('end', () => {
delete this._clients[clientKey];
});
var stream = msgpack.createDecodeStream();
socket.pipe(stream).on('data', (m) => {
if (this._state === 'pingpong') {
let authResult = this._checkPing(m);
socket.write(msgpack.encode(this._generatePong(authResult, this._nonce, this._options.security.sharedKey)));
if (authResult.succeeded) {
this._state = 'established';
} else {
socket.end();
}
} else if (this._state === 'established') {
let entries = m[1];
let options = null;
if (entries instanceof Buffer) {
options = m[2];
if (options.compressed === 'gzip') {
entries = zlib.gunzipSync(entries);
class MockFluentdServer {
constructor(options, tlsOptions) {
this._port = null;
this._options = options;
this._tlsOptions = tlsOptions || {};
this._received = [];
this._clients = {};
this._state = null;
this._nonce = null;
this._userAuthSalt = null;
let server = (socket) => {
const clientKey = socket.remoteAddress + ':' + socket.remotePort;
this._clients[clientKey] = socket;
socket.on('end', () => {
delete this._clients[clientKey];
});
const stream = msgpack.createDecodeStream();
socket.pipe(stream).on('data', (m) => {
if (this._state === 'pingpong') {
const authResult = this._checkPing(m);
socket.write(msgpack.encode(this._generatePong(authResult, this._nonce, this._options.security.sharedKey)));
if (authResult.succeeded) {
this._state = 'established';
} else {
socket.end();
}
let s = msgpack.createDecodeStream();
s.on('data', (data) => {
let _time = data[0];
let record = data[1];
} else if (this._state === 'established') {
let entries = m[1];
let options = null;
if (entries instanceof Buffer) {
options = m[2];
if (options.compressed === 'gzip') {
entries = zlib.gunzipSync(entries);
}
let s = msgpack.createDecodeStream();
s.on('data', (data) => {
let _time = data[0];
let record = data[1];
this._received.push({
tag: m[0],
data: record,
options: options
});
});
s.write(entries);
} else {
this._received.push({
tag: m[0],
data: record,
options: options
time: m[1],
data: m[2],
options: m[3]
});
});
s.write(entries);
} else {
this._received.push({
tag: m[0],
time: m[1],
data: m[2],
options: m[3]
});
options = m[3];
options = m[3];
}
if (this._options.requireAckResponse && options && options.chunk) {
const response = {
ack: options.chunk
};
socket.write(msgpack.encode(response));
}
}
if (this._options.requireAckResponse && options && options.chunk) {
var response = {
ack: options.chunk
};
socket.write(msgpack.encode(response));
}
});
};
let connectionEventType = 'connection';
if (this._tlsOptions.tls) {
connectionEventType = 'secureConnection';
this._server = tls.createServer(this._tlsOptions, server);
} else {
this._server = net.createServer(server);
}
this._server.on(connectionEventType, (socket) => {
if (this._options.security && this._options.security.sharedKey && this._options.security.serverHostname) {
this._state = 'helo';
this._nonce = crypto.randomBytes(16);
this._userAuthSalt = crypto.randomBytes(16);
} else {
this._state = 'established';
}
if (this._state === 'helo') {
socket.write(msgpack.encode(this._generateHelo(this._nonce, this._userAuthSalt)));
this._state = 'pingpong';
}
});
};
var connectionEventType = 'connection';
if (this._tlsOptions.tls) {
connectionEventType = 'secureConnection';
this._server = tls.createServer(this._tlsOptions, server);
} else {
this._server = net.createServer(server);
}
this._server.on(connectionEventType, (socket) => {
if (this._options.security && this._options.security.sharedKey && this._options.security.serverHostname) {
this._state = 'helo';
this._nonce = crypto.randomBytes(16);
this._userAuthSalt = crypto.randomBytes(16);
} else {
this._state = 'established';
}
if (this._state === 'helo') {
socket.write(msgpack.encode(this._generateHelo(this._nonce, this._userAuthSalt)));
this._state = 'pingpong';
}
});
}
MockFluentdServer.prototype.__defineGetter__('port', function() {
return this._port;
});
get port() {
return this._port;
}
MockFluentdServer.prototype.__defineGetter__('messages', function() {
return this._received;
});
get messages() {
return this._received;
}
MockFluentdServer.prototype._generateHelo = function(nonce, userAuthSalt) {
// ['HELO', options(hash)]
let options = {
'nonce': nonce,
'auth': this._options.security ? userAuthSalt : '',
'keepalive': false
};
return ['HELO', options];
};
_generateHelo(nonce, userAuthSalt) {
// ['HELO', options(hash)]
let options = {
'nonce': nonce,
'auth': this._options.security ? userAuthSalt : '',
'keepalive': false
};
return ['HELO', options];
}
MockFluentdServer.prototype._checkPing = function(m) {
// this._options.checkPing() should return { succeeded: true, reason: 'why', sharedKeySalt: 'salt' }
if (this._options.checkPing) {
return this._options.checkPing(m);
} else {
// ['PING', self_hostname, shared_key_salt, sha512_hex(shared_key_salt + self_hostname + nonce + shared_key), username || '', sha512_hex(auth_salt + username + password) || '']
if (m.length !== 6) {
return { succeeded: false, reason: 'Invalid ping message size' };
}
if (m[0] !== 'PING') {
return { succeeded: false, reason: 'Invalid ping message' };
}
let _ping = m[0];
let hostname = m[1];
let sharedKeySalt = m[2];
let sharedKeyHexDigest = m[3];
let _username = m[4];
let passwordDigest = m[5];
let serverSideDigest = crypto.createHash('sha512')
.update(sharedKeySalt)
.update(hostname)
.update(this._nonce)
.update(this._options.security.sharedKey)
.digest('hex');
if (sharedKeyHexDigest !== serverSideDigest) {
return { succeeded: false, reason: 'shared key mismatch' };
}
if (this._options.security.username && this._options.security.password) {
let serverSidePasswordDigest = crypto.createHash('sha512')
.update(this._userAuthSalt)
.update(this._options.security.username)
.update(this._options.security.password)
_checkPing(m) {
// this._options.checkPing() should return { succeeded: true, reason: 'why', sharedKeySalt: 'salt' }
if (this._options.checkPing) {
return this._options.checkPing(m);
} else {
// ['PING', self_hostname, shared_key_salt, sha512_hex(shared_key_salt + self_hostname + nonce + shared_key), username || '', sha512_hex(auth_salt + username + password) || '']
if (m.length !== 6) {
return { succeeded: false, reason: 'Invalid ping message size' };
}
if (m[0] !== 'PING') {
return { succeeded: false, reason: 'Invalid ping message' };
}
const _ping = m[0];
const hostname = m[1];
const sharedKeySalt = m[2];
const sharedKeyHexDigest = m[3];
const _username = m[4];
const passwordDigest = m[5];
const serverSideDigest = crypto.createHash('sha512')
.update(sharedKeySalt)
.update(hostname)
.update(this._nonce)
.update(this._options.security.sharedKey)
.digest('hex');
if (passwordDigest !== serverSidePasswordDigest) {
return { succeeded: false, reason: 'username/password mismatch' };
if (sharedKeyHexDigest !== serverSideDigest) {
return { succeeded: false, reason: 'shared key mismatch' };
}
if (this._options.security.username && this._options.security.password) {
const serverSidePasswordDigest = crypto.createHash('sha512')
.update(this._userAuthSalt)
.update(this._options.security.username)
.update(this._options.security.password)
.digest('hex');
if (passwordDigest !== serverSidePasswordDigest) {
return { succeeded: false, reason: 'username/password mismatch' };
}
}
return { succeeded: true, sharedKeySalt: sharedKeySalt };
}
return { succeeded: true, sharedKeySalt: sharedKeySalt };
}
};
MockFluentdServer.prototype._generatePong = function(authResult, nonce, sharedKey) {
// this._options.generatePong() should return PONG message
// [
// 'PONG',
// bool(authentication result),
// 'reason if authentication failed',
// serverHostname,
// sha512_hex(salt + serverHostname + nonce + sharedkey)
// ]
if (authResult.succeeded) {
let sharedKeyDigestHex = crypto.createHash('sha512')
.update(authResult.sharedKeySalt)
.update(this._options.security.serverHostname)
.update(nonce)
.update(sharedKey)
.digest('hex');
return ['PONG', true, '', this._options.security.serverHostname, sharedKeyDigestHex];
} else {
return ['PONG', false, authResult.reason, '', ''];
_generatePong(authResult, nonce, sharedKey) {
// this._options.generatePong() should return PONG message
// [
// 'PONG',
// bool(authentication result),
// 'reason if authentication failed',
// serverHostname,
// sha512_hex(salt + serverHostname + nonce + sharedkey)
// ]
if (authResult.succeeded) {
const sharedKeyDigestHex = crypto.createHash('sha512')
.update(authResult.sharedKeySalt)
.update(this._options.security.serverHostname)
.update(nonce)
.update(sharedKey)
.digest('hex');
return ['PONG', true, '', this._options.security.serverHostname, sharedKeyDigestHex];
} else {
return ['PONG', false, authResult.reason, '', ''];
}
}
};
MockFluentdServer.prototype.listen = function(callback) {
this._server.listen(() => {
this._port = this._server.address().port;
callback();
});
};
listen(callback) {
this._server.listen(() => {
this._port = this._server.address().port;
callback();
});
}
MockFluentdServer.prototype.close = function(callback) {
this._server.close(() => {
callback();
});
for (var i in this._clients) {
this._clients[i].end();
// this._clients[i].destroy();
close(callback) {
this._server.close(() => {
callback();
});
for (const i in this._clients) {
this._clients[i].end();
// this._clients[i].destroy();
}
}
};
}
module.exports = {
runServer: function(options, tlsOptions, callback) {
var server = new MockFluentdServer(options, tlsOptions);
const server = new MockFluentdServer(options, tlsOptions);
server.listen(() => {

@@ -197,3 +199,3 @@ callback(server, (_callback) => {

setTimeout(() => {
var messages = server.messages;
const messages = server.messages;
server.close(() => {

@@ -200,0 +202,0 @@ _callback && _callback(messages);

@@ -6,4 +6,4 @@ 'use strict';

/* eslint node/no-unpublished-require: ["error", {"allowModules": ["winston"]}] */
const sender = require('./sender');
const FluentSender = require('./sender');
/* eslint-disable-next-line node/no-extraneous-require */
const Transport = require('winston-transport');

@@ -24,3 +24,3 @@ const DEFAULT_TAG = 'winston';

this.sender = new sender.FluentSender(tag, options);
this.sender = new FluentSender(tag, options);
}

@@ -41,2 +41,2 @@

}
}
};
{
"name": "fluent-logger",
"version": "2.8.0",
"version": "3.0.0",
"main": "./lib/index.js",

@@ -31,3 +31,3 @@ "scripts": {

"engines": {
"node": ">=4"
"node": ">=6"
},

@@ -40,3 +40,3 @@ "dependencies": {

"chai": "",
"eslint": "^4.11.0",
"eslint": "^5.1.0",
"eslint-plugin-node": "*",

@@ -43,0 +43,0 @@ "mocha": "*",

@@ -345,2 +345,2 @@ # fluent-logger for Node.js

This package is compatible with NodeJS versions > 4.
This package is compatible with NodeJS versions >= 6.
'use strict';
/* globals describe, it */
/* eslint node/no-unpublished-require: ["error", {"allowModules": ["chai"]}] */
var expect = require('chai').expect;
var EventTime = require('../lib/event-time').EventTime;
var msgpack = require('msgpack-lite');
const expect = require('chai').expect;
const EventTime = require('../lib/event-time');
const msgpack = require('msgpack-lite');
var codec = msgpack.createCodec();
const codec = msgpack.createCodec();
codec.addExtPacker(0x00, EventTime, EventTime.pack);

@@ -14,5 +14,5 @@ codec.addExtUnpacker(0x00, EventTime.unpack);

it('should equal to decoded value', (done) => {
var eventTime = EventTime.now();
var encoded = msgpack.encode(eventTime, { codec: codec });
var decoded = msgpack.decode(encoded, { codec: codec });
const eventTime = EventTime.now();
const encoded = msgpack.encode(eventTime, { codec: codec });
const decoded = msgpack.decode(encoded, { codec: codec });
expect(JSON.stringify(decoded)).to.equal(JSON.stringify(eventTime));

@@ -22,7 +22,7 @@ done();

it('should equal fromDate and fromTimestamp', (done) => {
var now = new Date(1489543720999); // 2017-03-15T02:08:40.999Z
var timestamp = now.getTime();
var eventTime = JSON.stringify(new EventTime(1489543720, 999000000));
var eventTime1 = JSON.stringify(EventTime.fromDate(now));
var eventTime2 = JSON.stringify(EventTime.fromTimestamp(timestamp));
const now = new Date(1489543720999); // 2017-03-15T02:08:40.999Z
const timestamp = now.getTime();
const eventTime = JSON.stringify(new EventTime(1489543720, 999000000));
const eventTime1 = JSON.stringify(EventTime.fromDate(now));
const eventTime2 = JSON.stringify(EventTime.fromTimestamp(timestamp));
expect(eventTime1).to.equal(eventTime);

@@ -29,0 +29,0 @@ expect(eventTime2).to.equal(eventTime);

@@ -6,12 +6,12 @@ 'use strict';

/* eslint node/no-unpublished-require: ["error", {"allowModules": ["async", "chai"]}] */
var expect = require('chai').expect;
var sender = require('../lib/sender');
var EventTime = require('../lib/event-time').EventTime;
var runServer = require('../lib/testHelper').runServer;
var stream = require('stream');
var async = require('async');
var EventEmitter = require('events').EventEmitter;
var msgpack = require('msgpack-lite');
const expect = require('chai').expect;
const FluentSender = require('../lib/sender');
const EventTime = require('../lib/event-time');
const runServer = require('../lib/testHelper').runServer;
const stream = require('stream');
const async = require('async');
const EventEmitter = require('events').EventEmitter;
const msgpack = require('msgpack-lite');
var codec = msgpack.createCodec();
const codec = msgpack.createCodec();
codec.addExtPacker(0x00, EventTime, EventTime.pack);

@@ -24,5 +24,6 @@ codec.addExtUnpacker(0x00, EventTime.unpack);

if (tls) {
var selfsigned = require('selfsigned');
var attrs = [{ name: 'commonName', value: 'foo.com' }];
var pems = selfsigned.generate(attrs, { days: 365 });
/* eslint-disable-next-line node/no-unpublished-require */
const selfsigned = require('selfsigned');
const attrs = [{ name: 'commonName', value: 'foo.com' }];
const pems = selfsigned.generate(attrs, { days: 365 });
serverOptions = { tls: true, key: pems.private, cert: pems.cert, ca: pems.cert };

@@ -33,3 +34,3 @@ clientOptions = { tls: true, tlsOptions: { rejectUnauthorized: false } };

try {
new sender.FluentSender('debug', Object.assign({}, clientOptions, { eventMode: 'Unknown' }));
new FluentSender('debug', Object.assign({}, clientOptions, { eventMode: 'Unknown' }));
} catch (e) {

@@ -43,8 +44,8 @@ expect(e.message).to.be.equal('Unknown event mode: Unknown');

runServer({}, serverOptions, (server, finish) => {
var s1 = new sender.FluentSender('debug', Object.assign({}, clientOptions, { port: server.port }));
var emits = [];
const s1 = new FluentSender('debug', Object.assign({}, clientOptions, { port: server.port }));
const emits = [];
function emit(k) {
emits.push((done) => { s1.emit('record', k, done); });
}
for (var i = 0; i < 10; i++) {
for (let i = 0; i < 10; i++) {
emit({ number: i });

@@ -55,3 +56,3 @@ }

expect(data.length).to.be.equal(10);
for (var i = 0; i < 10; i++) {
for (let i = 0; i < 10; i++) {
expect(data[i].tag).to.be.equal('debug.record');

@@ -69,4 +70,4 @@ expect(data[i].data.number).to.be.equal(i);

runServer({}, serverOptions, (server, finish) => {
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, {port: server.port}));
var called = false;
const s = new FluentSender('debug', Object.assign({}, clientOptions, {port: server.port}));
let called = false;
s.on('connect', () => {

@@ -85,3 +86,3 @@ called = true;

it('should raise error when connection fails', (done) => {
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, {
const s = new FluentSender('debug', Object.assign({}, clientOptions, {
host: 'localhost',

@@ -98,3 +99,3 @@ port: 65535

it('should log error when connection fails w/ internal logger', (done) => {
var logger = {
const logger = {
buffer: {

@@ -111,3 +112,3 @@ info: [],

};
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, {
const s = new FluentSender('debug', Object.assign({}, clientOptions, {
host: 'localhost',

@@ -131,3 +132,3 @@ port: 65535,

runServer({}, serverOptions, (server, finish) => {
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, {port: server.port}));
const s = new FluentSender('debug', Object.assign({}, clientOptions, {port: server.port}));
s.emit('1st record', { message: '1st data' });

@@ -151,5 +152,5 @@ s.emit('2nd record', { message: '2nd data' });

runServer({}, serverOptions, (server, finish) => {
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, {port: server.port}));
var timestamp = new Date(2222, 12, 4);
var timestamp_seconds_since_epoch = Math.floor(timestamp.getTime() / 1000);
const s = new FluentSender('debug', Object.assign({}, clientOptions, {port: server.port}));
const timestamp = new Date(2222, 12, 4);
const timestamp_seconds_since_epoch = Math.floor(timestamp.getTime() / 1000);

@@ -167,4 +168,4 @@ s.emit('1st record', { message: '1st data' }, timestamp, () => {

runServer({}, serverOptions, (server, finish) => {
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, {port: server.port}));
var timestamp = Math.floor(new Date().getTime() / 1000);
const s = new FluentSender('debug', Object.assign({}, clientOptions, {port: server.port}));
const timestamp = Math.floor(new Date().getTime() / 1000);

@@ -182,8 +183,8 @@ s.emit('1st record', { message: '1st data' }, timestamp, () => {

runServer({}, serverOptions, (server, finish) => {
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, {port: server.port}));
var eventTime = EventTime.now();
const s = new FluentSender('debug', Object.assign({}, clientOptions, {port: server.port}));
const eventTime = EventTime.now();
s.emit('1st record', { message: '1st data' }, eventTime, () => {
finish((data) => {
var decoded = EventTime.unpack(data[0].time.buffer);
const decoded = EventTime.unpack(data[0].time.buffer);
expect(JSON.stringify(decoded)).to.equal(JSON.stringify(eventTime));

@@ -197,3 +198,3 @@ done();

it('should resume the connection automatically and flush the queue', (done) => {
var s = new sender.FluentSender('debug', clientOptions);
const s = new FluentSender('debug', clientOptions);
s.emit('1st record', { message: '1st data' });

@@ -222,3 +223,3 @@ s.on('error', (err) => {

runServer({}, serverOptions, (server, finish) => {
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, {port: server.port}));
const s = new FluentSender('debug', Object.assign({}, clientOptions, {port: server.port}));
s.emit('foo', 'bar', () => {

@@ -253,11 +254,11 @@ // connected

runServer({requireAckResponse: true}, serverOptions, (server, finish) => {
var s1 = new sender.FluentSender('debug', Object.assign({}, clientOptions, {
const s1 = new FluentSender('debug', Object.assign({}, clientOptions, {
port: server.port,
requireAckResponse: true
}));
var emits = [];
const emits = [];
function emit(k) {
emits.push((done) => { s1.emit('record', k, done); });
}
for (var i = 0; i < 10; i++) {
for (let i = 0; i < 10; i++) {
emit({ number: i });

@@ -268,3 +269,3 @@ }

expect(data.length).to.be.equal(10);
for (var i = 0; i < 10; i++) {
for (let i = 0; i < 10; i++) {
expect(data[i].tag).to.be.equal('debug.record');

@@ -283,3 +284,3 @@ expect(data[i].data.number).to.be.equal(i);

runServer({requireAckResponse: false }, serverOptions, (server, finish) => {
var s1 = new sender.FluentSender('debug', Object.assign({}, clientOptions, {
const s1 = new FluentSender('debug', Object.assign({}, clientOptions, {
port: server.port,

@@ -301,3 +302,3 @@ requireAckResponse: false,

it('should set error handler', (done) => {
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, {
const s = new FluentSender('debug', Object.assign({}, clientOptions, {
reconnectInterval: 100

@@ -400,3 +401,3 @@ }));

runServer({}, serverOptions, (server, finish) => {
var s1 = new sender.FluentSender('debug', Object.assign({}, clientOptions, { port: server.port }));
const s1 = new FluentSender('debug', Object.assign({}, clientOptions, { port: server.port }));
s1.emit.apply(s1, testCase.args);

@@ -465,3 +466,3 @@

runServer({}, serverOptions, (server, finish) => {
var s1 = new sender.FluentSender(null, Object.assign({}, clientOptions, { port: server.port }));
const s1 = new FluentSender(null, Object.assign({}, clientOptions, { port: server.port }));
s1.emit.apply(s1, testCase.args);

@@ -517,3 +518,3 @@

runServer({}, serverOptions, (server, finish) => {
var s1 = new sender.FluentSender(null, Object.assign({}, clientOptions, { port: server.port }));
const s1 = new FluentSender(null, Object.assign({}, clientOptions, { port: server.port }));
s1.on('error', (error) => {

@@ -541,3 +542,3 @@ expect(error.name).to.be.equal('MissingTagError');

runServer({}, serverOptions, (server, finish) => {
var s1 = new sender.FluentSender(null, Object.assign({}, clientOptions, { port: server.port }));
const s1 = new FluentSender(null, Object.assign({}, clientOptions, { port: server.port }));
s1.on('error', (error) => {

@@ -555,3 +556,3 @@ expect(error.name).to.be.equal('DataTypeError');

it('should set max listeners', (done) => {
var s = new sender.FluentSender('debug', clientOptions);
const s = new FluentSender('debug', clientOptions);
if (EventEmitter.prototype.getMaxListeners) {

@@ -572,3 +573,3 @@ expect(s.getMaxListeners()).to.be.equal(10);

runServer({}, serverOptions, (server, finish) => {
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, {port: server.port}));
const s = new FluentSender('debug', Object.assign({}, clientOptions, {port: server.port}));
s.emit('1st record', { message: '1st data' }, () => {

@@ -591,5 +592,5 @@ s._disconnect();

runServer({}, serverOptions, (server, finish) => {
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, { port: server.port }));
var ss = s.toStream('record');
var pt = new stream.PassThrough();
const s = new FluentSender('debug', Object.assign({}, clientOptions, { port: server.port }));
const ss = s.toStream('record');
const pt = new stream.PassThrough();
pt.pipe(ss);

@@ -616,3 +617,3 @@ pt.push('data1\n');

runServer({ requireAckResponse: true }, serverOptions, (server, finish) => {
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, {
const s = new FluentSender('debug', Object.assign({}, clientOptions, {
port: server.port,

@@ -623,11 +624,11 @@ timeout: 3.0,

}));
var errors = [];
const errors = [];
s.on('error', (err) => {
errors.push(count + ': ' + err);
});
var maxCount = 20;
var count = 0;
var sendMessage = function() {
var time = Math.round(Date.now() / 1000);
var data = {
const maxCount = 20;
let count = 0;
const sendMessage = function() {
const time = Math.round(Date.now() / 1000);
const data = {
count: count

@@ -644,3 +645,3 @@ };

};
var timer = setInterval(sendMessage, 10);
let timer = setInterval(sendMessage, 10);
});

@@ -651,3 +652,3 @@ });

runServer({}, serverOptions, (server, finish) => {
let loggerOptions = {
const loggerOptions = {
port: server.port,

@@ -660,3 +661,3 @@ eventMode: 'PackedForward',

};
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, loggerOptions));
const s = new FluentSender('debug', Object.assign({}, clientOptions, loggerOptions));
s.emit('test', { message: 'This is test 0' });

@@ -680,3 +681,3 @@ s.end('test', { message: 'This is test 1' });

runServer({}, serverOptions, (server, finish) => {
let loggerOptions = {
const loggerOptions = {
port: server.port,

@@ -689,3 +690,3 @@ eventMode: 'CompressedPackedForward',

};
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, loggerOptions));
const s = new FluentSender('debug', Object.assign({}, clientOptions, loggerOptions));
s.emit('test', { message: 'This is test 0' });

@@ -708,4 +709,4 @@ s.emit('test', { message: 'This is test 1' });

it('should process handshake sahred key', (done) => {
let sharedKey = 'sharedkey';
let options = {
const sharedKey = 'sharedkey';
const options = {
security: {

@@ -717,3 +718,3 @@ serverHostname: 'server.example.com',

runServer(options, serverOptions, (server, finish) => {
let loggerOptions = {
const loggerOptions = {
port: server.port,

@@ -729,3 +730,3 @@ security: {

};
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, loggerOptions));
const s = new FluentSender('debug', Object.assign({}, clientOptions, loggerOptions));
s.emit('test', { message: 'This is test 0' });

@@ -745,4 +746,4 @@ s.emit('test', { message: 'This is test 1' });

it('should process handshake sahred key mismatch', (done) => {
let sharedKey = 'sharedkey';
let options = {
const sharedKey = 'sharedkey';
const options = {
security: {

@@ -754,3 +755,3 @@ serverHostname: 'server.example.com',

runServer(options, serverOptions, (server, finish) => {
let loggerOptions = {
const loggerOptions = {
port: server.port,

@@ -766,3 +767,3 @@ security: {

};
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, loggerOptions));
const s = new FluentSender('debug', Object.assign({}, clientOptions, loggerOptions));
s.on('error', (error) => {

@@ -780,4 +781,4 @@ expect(error.message).to.be.equal('Authentication failed: shared key mismatch');

it('should process handshake user based authentication', (done) => {
let sharedKey = 'sharedkey';
let options = {
const sharedKey = 'sharedkey';
const options = {
security: {

@@ -791,3 +792,3 @@ serverHostname: 'server.example.com',

runServer(options, serverOptions, (server, finish) => {
let loggerOptions = {
const loggerOptions = {
port: server.port,

@@ -805,3 +806,3 @@ security: {

};
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, loggerOptions));
const s = new FluentSender('debug', Object.assign({}, clientOptions, loggerOptions));
s.emit('test', { message: 'This is test 0' });

@@ -821,4 +822,4 @@ s.emit('test', { message: 'This is test 1' });

it('should process handshake user based authentication failed', (done) => {
let sharedKey = 'sharedkey';
let options = {
const sharedKey = 'sharedkey';
const options = {
security: {

@@ -832,3 +833,3 @@ serverHostname: 'server.example.com',

runServer(options, serverOptions, (server, finish) => {
let loggerOptions = {
const loggerOptions = {
port: server.port,

@@ -846,3 +847,3 @@ security: {

};
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, loggerOptions));
const s = new FluentSender('debug', Object.assign({}, clientOptions, loggerOptions));
s.on('error', (error) => {

@@ -860,4 +861,4 @@ expect(error.message).to.be.equal('Authentication failed: username/password mismatch');

it('should process handshake failed', (done) => {
let sharedKey = 'sharedkey';
let options = {
const sharedKey = 'sharedkey';
const options = {
security: {

@@ -870,3 +871,3 @@ serverHostname: 'server.example.com',

runServer(options, serverOptions, (server, finish) => {
let loggerOptions = {
const loggerOptions = {
port: server.port,

@@ -882,3 +883,3 @@ security: {

};
var s = new sender.FluentSender('debug', Object.assign({}, clientOptions, loggerOptions));
const s = new FluentSender('debug', Object.assign({}, clientOptions, loggerOptions));
s.on('error', (err) => {

@@ -885,0 +886,0 @@ expect(err.message).to.be.equal('Authentication failed: reason');

'use strict';
/* globals describe, it */
/* eslint node/no-unpublished-require: ["error", {"allowModules": ["async", "chai", "winston"]}] */
var expect = require('chai').expect;
var winstonSupport = require('../lib/winston');
var winston = require('winston');
var runServer = require('../lib/testHelper').runServer;
const expect = require('chai').expect;
const winstonSupport = require('../lib/winston');
const winston = require('winston');
const runServer = require('../lib/testHelper').runServer;

@@ -21,3 +21,3 @@ describe('winston', () => {

runServer({}, {}, (server, finish) => {
var logger = winston.createLogger({
const logger = winston.createLogger({
format: winston.format.combine(

@@ -24,0 +24,0 @@ winston.format.splat(),

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Packages

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc