fluent-logger
Advanced tools
Comparing version 3.3.1 to 3.4.0
# v3.x | ||
## v3.4.0 - 2019-11-20 | ||
### Improvements | ||
* Gracefully free resources on `.end()` #144 | ||
* Update type definitions for TypeScript #145, #147 | ||
* Add new option messageQueueSizeLimit #152 | ||
### Fixes | ||
* Fix packets on multiple tags get corrput and multiple calls of callbacks on error #155 | ||
## v3.3.1 - 2019-02-19 | ||
@@ -4,0 +16,0 @@ |
@@ -8,3 +8,2 @@ // Type definitions for fluent-logger-node | ||
import { Transform } from 'winston-transport'; | ||
import { Writable } from 'stream'; | ||
@@ -28,2 +27,3 @@ | ||
security?: Security; | ||
internalLogger?: Logger; | ||
} | ||
@@ -43,8 +43,18 @@ | ||
interface Logger { | ||
info: LogFunction; | ||
error: LogFunction; | ||
[other: string]: any; | ||
} | ||
interface LogFunction { | ||
(message: any, data?: any, ...extra: any[]): any | ||
} | ||
type Timestamp = number | Date; | ||
type Callback = (err?: Error) => void; | ||
class FluentSender<T> { | ||
constructor(tagPrefix: string, options: Options); | ||
emit(data: T, callback?: Callback): void; | ||
@@ -59,8 +69,2 @@ emit(data: T, timestamp: Timestamp, callback?: Callback): void; | ||
class FluentTransport extends Transform { | ||
constructor(opt: Options); | ||
public log(info: string, callback: (err: Error, b: boolean) => void): any; | ||
} | ||
class InnerEventTime { | ||
@@ -83,8 +87,4 @@ epoch: number; | ||
let support: { | ||
winstonTransport: () => Constructable<FluentTransport, Options> | ||
}; | ||
let EventTime: InnerEventTime; | ||
let EventTime: InnerEventTime; | ||
function configure(tag: string, options: Options): void; | ||
@@ -91,0 +91,0 @@ function createFluentSender<T>(tag: string, options: Options): FluentSender<T>; |
@@ -40,2 +40,3 @@ 'use strict'; | ||
this._flushInterval = 0; | ||
this._messageQueueSizeLimit = options.messageQueueSizeLimit || 0; | ||
} else { | ||
@@ -163,7 +164,5 @@ this._sendQueue = new Map(); | ||
const packet = [tag, time, data]; | ||
let options = {}; | ||
const options = {}; | ||
if (this.requireAckResponse) { | ||
options = { | ||
chunk: crypto.randomBytes(16).toString('base64') | ||
}; | ||
options.chunk = crypto.randomBytes(16).toString('base64'); | ||
packet.push(options); | ||
@@ -191,4 +190,7 @@ } | ||
// Message mode | ||
let item = this._makePacketItem(tag, time, data); | ||
const item = this._makePacketItem(tag, time, data); | ||
item.callback = callback; | ||
if (this._messageQueueSizeLimit && this._sendQueue.length === this._messageQueueSizeLimit) { | ||
this._sendQueue.shift(); | ||
} | ||
this._sendQueue.push(item); | ||
@@ -200,8 +202,13 @@ } else { | ||
if (this._sendQueue.has(tag)) { | ||
let eventEntryData = this._sendQueue.get(tag); | ||
const eventEntryData = this._sendQueue.get(tag); | ||
eventEntryData.eventEntries.push(eventEntry); | ||
eventEntryData.size += eventEntry.length; | ||
if (callback) eventEntryData.callbacks.push(callback); | ||
} else { | ||
const callbacks = callback ? [callback] : []; | ||
this._sendQueue.set(tag, { eventEntries: [eventEntry], callbacks: callbacks }); | ||
this._sendQueue.set(tag, { | ||
eventEntries: [eventEntry], | ||
size: eventEntry.length, | ||
callbacks: callbacks | ||
}); | ||
} | ||
@@ -232,4 +239,4 @@ } | ||
_doConnect(callback) { | ||
let addHandlers = () => { | ||
let errorHandler = (err) => { | ||
const addHandlers = () => { | ||
const errorHandler = (err) => { | ||
if (this._socket) { | ||
@@ -268,3 +275,3 @@ this._disconnect(); | ||
} else { | ||
let postConnect = () => { | ||
const postConnect = () => { | ||
if (this.security.clientHostname && this.security.sharedKey !== null) { | ||
@@ -354,2 +361,6 @@ this._handshake(callback); | ||
this._flushSendQueueTimeoutId = setTimeout(() => { | ||
if (!this._socket) { | ||
this._flushingSendQueue = false; | ||
return; | ||
} | ||
this._doFlushSendQueue(); | ||
@@ -368,3 +379,3 @@ }, this._flushInterval); | ||
if (this._eventMode === 'Message') { | ||
let item = this._sendQueue.shift(); | ||
const item = this._sendQueue.shift(); | ||
if (item === undefined) { | ||
@@ -381,6 +392,6 @@ this._flushingSendQueue = false; | ||
} | ||
let first = this._sendQueue.entries().next().value; | ||
let tag = first[0]; | ||
let eventEntryData = first[1]; | ||
let entries = Buffer.concat(eventEntryData.eventEntries, this._sendQueueSize); | ||
const first = this._sendQueue.entries().next().value; | ||
const tag = first[0]; | ||
const eventEntryData = first[1]; | ||
let entries = Buffer.concat(eventEntryData.eventEntries, eventEntryData.size); | ||
let size = entries.length; | ||
@@ -395,3 +406,4 @@ this._sendQueue.delete(tag); | ||
size: size, | ||
compressed: this._compressed ? 'gzip' : 'text' | ||
compressed: this._compressed ? 'gzip' : 'text', | ||
eventEntryDataSize: eventEntryData.size | ||
}; | ||
@@ -404,3 +416,3 @@ const packet = msgpack.encode([tag, entries, options], { codec: codec }); | ||
_doWrite(packet, options, timeoutId, callbacks) { | ||
const sendQueueSize = this._sendQueueSize; | ||
const sendPacketSize = (options && options.eventEntryDataSize) || this._sendQueueSize; | ||
this._socket.write(packet, () => { | ||
@@ -419,7 +431,8 @@ if (this.requireAckResponse) { | ||
}); | ||
} else { // no error on ack | ||
callbacks.forEach((callback) => { | ||
callback && callback(); | ||
}); | ||
} | ||
this._sendQueueSize -= sendQueueSize; | ||
callbacks.forEach((callback) => { | ||
callback && callback(); | ||
}); | ||
this._sendQueueSize -= sendPacketSize; | ||
process.nextTick(() => { | ||
@@ -436,3 +449,3 @@ this._waitToWrite(); | ||
} else { | ||
this._sendQueueSize -= sendQueueSize; | ||
this._sendQueueSize -= sendPacketSize; | ||
callbacks.forEach((callback) => { | ||
@@ -464,6 +477,6 @@ callback && callback(); | ||
this.internalLogger.info('Fluentd will reconnect after ' + this.reconnectInterval / 1000 + ' seconds'); | ||
let timeoutId = setTimeout(() => { | ||
const timeoutId = setTimeout(() => { | ||
this.internalLogger.info('Fluentd is reconnecting...'); | ||
this._connect(() => { | ||
this._flushSendQueue() | ||
this._flushSendQueue(); | ||
this.internalLogger.info('Fluentd reconnection finished!!'); | ||
@@ -575,3 +588,3 @@ }); | ||
const dataArray = chunk.toString(defaultEncoding).split(/\n/); | ||
let next = () => { | ||
const next = () => { | ||
if (dataArray.length) { | ||
@@ -578,0 +591,0 @@ dataString += dataArray.shift(); |
@@ -51,2 +51,11 @@ 'use strict'; | ||
} | ||
_final(callback) { | ||
if (!this.sender) return process.nextTick(callback); | ||
this.sender.end(null, null, () => { | ||
this.sender = null; | ||
callback(); | ||
}); | ||
} | ||
}; |
{ | ||
"name": "fluent-logger", | ||
"version": "3.3.1", | ||
"version": "3.4.0", | ||
"main": "./lib/index.js", | ||
@@ -5,0 +5,0 @@ "scripts": { |
@@ -164,3 +164,3 @@ # fluent-logger for Node.js | ||
See also [How to enable TLS/SSL encryption](https://docs.fluentd.org/v1.0/articles/in_forward#how-to-enable-tls-encryption). | ||
See also [How to enable TLS/SSL encryption](https://docs.fluentd.org/input/forward#how-to-enable-tls-encryption). | ||
@@ -267,10 +267,14 @@ ### Mutual TLS Authentication | ||
var fluentTransport = require('fluent-logger').support.winstonTransport(); | ||
var fluent = new fluentTransport('mytag', config); | ||
var logger = winston.createLogger({ | ||
transports: [new fluentTransport('mytag', config), new (winston.transports.Console)()] | ||
transports: [fluent, new (winston.transports.Console)()] | ||
}); | ||
logger.on('logging', (transport, level, message, meta) => { | ||
if (meta.end && transport.sender && transport.sender.end) { | ||
transport.sender.end(); | ||
} | ||
logger.on('flush', () => { | ||
console.log("flush"); | ||
}) | ||
logger.on('finish', () => { | ||
console.log("finish"); | ||
fluent.sender.end("end", {}, () => {}) | ||
}); | ||
@@ -280,3 +284,4 @@ | ||
logger.info('this log record is sent to fluent daemon'); | ||
logger.info('end of log message', { end: true }); | ||
logger.info('end of log message'); | ||
logger.end(); | ||
``` | ||
@@ -368,2 +373,6 @@ | ||
**messageQueueSizeLimit** | ||
Maximum number of messages that can be in queue at the same time. If a new message is received and it overflows the queue then the oldest message will be removed before adding the new item. This option has effect only in `Message` mode. No limit by default. | ||
**security.clientHostname** | ||
@@ -370,0 +379,0 @@ |
@@ -319,2 +319,37 @@ 'use strict'; | ||
it('should send messages with different tags correctly in PackedForward', (done) => { | ||
runServer({}, serverOptions, (server, finish) => { | ||
const s1 = new FluentSender('debug', Object.assign({}, clientOptions, { | ||
port: server.port, | ||
eventMode: 'PackedForward' | ||
})); | ||
const emits = []; | ||
const total = 4; | ||
function emit(messageData) { | ||
emits.push((asyncDone) => { | ||
if (messageData.number === total) { // end | ||
s1.emit(`multi-${messageData.number}`, { text: messageData.text}, asyncDone); // wait for send | ||
} else { | ||
s1.emit(`multi-${messageData.number}`, { text: messageData.text}); | ||
asyncDone(); // run immediately do not wait for ack | ||
} | ||
}); | ||
} | ||
for (let i = 0; i <= total; i++) { | ||
emit({ number: i, text: `This is text No ${i}` }); | ||
} | ||
emits.push(() => { | ||
finish((data) => { | ||
expect(data.length).to.be.equal(5); | ||
data.forEach((element, index) => { | ||
expect(element.tag).to.be.equal(`debug.multi-${index}`); | ||
expect(element.data.text).to.be.equal(`This is text No ${index}`); | ||
}); | ||
done(); | ||
}); | ||
}); | ||
async.series(emits); | ||
}); | ||
}); | ||
[ | ||
@@ -916,2 +951,22 @@ { | ||
}); | ||
it('should limit messages stored in queue if server is not available', (done) => { | ||
runServer({}, serverOptions, (server, finish) => { | ||
finish(() => { | ||
const s = new FluentSender('debug', Object.assign({}, clientOptions, { | ||
port: server.port, | ||
messageQueueSizeLimit: 3 | ||
})); | ||
s.emit('message1', {}); | ||
s.emit('message2', {}); | ||
s.emit('message3', {}); | ||
s.emit('message4', {}); | ||
expect(s._sendQueue.length).to.be.equal(3); | ||
expect(s._sendQueue[0].tag).to.be.equal('debug.message2'); | ||
expect(s._sendQueue[1].tag).to.be.equal('debug.message3'); | ||
expect(s._sendQueue[2].tag).to.be.equal('debug.message4'); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
}; | ||
@@ -918,0 +973,0 @@ |
2095
427
95099
20