Socket
Socket
Sign inDemoInstall

fluent-logger

Package Overview
Dependencies
5
Maintainers
4
Versions
46
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 3.3.1 to 3.4.0

12

CHANGELOG.md
# v3.x
## v3.4.0 - 2019-11-20
### Improvements
* Gracefully free resources on `.end()` #144
* Update type definitions for TypeScript #145, #147
* Add new option messageQueueSizeLimit #152
### Fixes
* Fix packets on multiple tags get corrput and multiple calls of callbacks on error #155
## v3.3.1 - 2019-02-19

@@ -4,0 +16,0 @@

28

lib/index.d.ts

@@ -8,3 +8,2 @@ // Type definitions for fluent-logger-node

import { Transform } from 'winston-transport';
import { Writable } from 'stream';

@@ -28,2 +27,3 @@

security?: Security;
internalLogger?: Logger;
}

@@ -43,8 +43,18 @@

interface Logger {
info: LogFunction;
error: LogFunction;
[other: string]: any;
}
interface LogFunction {
(message: any, data?: any, ...extra: any[]): any
}
type Timestamp = number | Date;
type Callback = (err?: Error) => void;
class FluentSender<T> {
constructor(tagPrefix: string, options: Options);
emit(data: T, callback?: Callback): void;

@@ -59,8 +69,2 @@ emit(data: T, timestamp: Timestamp, callback?: Callback): void;

class FluentTransport extends Transform {
constructor(opt: Options);
public log(info: string, callback: (err: Error, b: boolean) => void): any;
}
class InnerEventTime {

@@ -83,8 +87,4 @@ epoch: number;

let support: {
winstonTransport: () => Constructable<FluentTransport, Options>
};
let EventTime: InnerEventTime;
let EventTime: InnerEventTime;
function configure(tag: string, options: Options): void;

@@ -91,0 +91,0 @@ function createFluentSender<T>(tag: string, options: Options): FluentSender<T>;

@@ -40,2 +40,3 @@ 'use strict';

this._flushInterval = 0;
this._messageQueueSizeLimit = options.messageQueueSizeLimit || 0;
} else {

@@ -163,7 +164,5 @@ this._sendQueue = new Map();

const packet = [tag, time, data];
let options = {};
const options = {};
if (this.requireAckResponse) {
options = {
chunk: crypto.randomBytes(16).toString('base64')
};
options.chunk = crypto.randomBytes(16).toString('base64');
packet.push(options);

@@ -191,4 +190,7 @@ }

// Message mode
let item = this._makePacketItem(tag, time, data);
const item = this._makePacketItem(tag, time, data);
item.callback = callback;
if (this._messageQueueSizeLimit && this._sendQueue.length === this._messageQueueSizeLimit) {
this._sendQueue.shift();
}
this._sendQueue.push(item);

@@ -200,8 +202,13 @@ } else {

if (this._sendQueue.has(tag)) {
let eventEntryData = this._sendQueue.get(tag);
const eventEntryData = this._sendQueue.get(tag);
eventEntryData.eventEntries.push(eventEntry);
eventEntryData.size += eventEntry.length;
if (callback) eventEntryData.callbacks.push(callback);
} else {
const callbacks = callback ? [callback] : [];
this._sendQueue.set(tag, { eventEntries: [eventEntry], callbacks: callbacks });
this._sendQueue.set(tag, {
eventEntries: [eventEntry],
size: eventEntry.length,
callbacks: callbacks
});
}

@@ -232,4 +239,4 @@ }

_doConnect(callback) {
let addHandlers = () => {
let errorHandler = (err) => {
const addHandlers = () => {
const errorHandler = (err) => {
if (this._socket) {

@@ -268,3 +275,3 @@ this._disconnect();

} else {
let postConnect = () => {
const postConnect = () => {
if (this.security.clientHostname && this.security.sharedKey !== null) {

@@ -354,2 +361,6 @@ this._handshake(callback);

this._flushSendQueueTimeoutId = setTimeout(() => {
if (!this._socket) {
this._flushingSendQueue = false;
return;
}
this._doFlushSendQueue();

@@ -368,3 +379,3 @@ }, this._flushInterval);

if (this._eventMode === 'Message') {
let item = this._sendQueue.shift();
const item = this._sendQueue.shift();
if (item === undefined) {

@@ -381,6 +392,6 @@ this._flushingSendQueue = false;

}
let first = this._sendQueue.entries().next().value;
let tag = first[0];
let eventEntryData = first[1];
let entries = Buffer.concat(eventEntryData.eventEntries, this._sendQueueSize);
const first = this._sendQueue.entries().next().value;
const tag = first[0];
const eventEntryData = first[1];
let entries = Buffer.concat(eventEntryData.eventEntries, eventEntryData.size);
let size = entries.length;

@@ -395,3 +406,4 @@ this._sendQueue.delete(tag);

size: size,
compressed: this._compressed ? 'gzip' : 'text'
compressed: this._compressed ? 'gzip' : 'text',
eventEntryDataSize: eventEntryData.size
};

@@ -404,3 +416,3 @@ const packet = msgpack.encode([tag, entries, options], { codec: codec });

_doWrite(packet, options, timeoutId, callbacks) {
const sendQueueSize = this._sendQueueSize;
const sendPacketSize = (options && options.eventEntryDataSize) || this._sendQueueSize;
this._socket.write(packet, () => {

@@ -419,7 +431,8 @@ if (this.requireAckResponse) {

});
} else { // no error on ack
callbacks.forEach((callback) => {
callback && callback();
});
}
this._sendQueueSize -= sendQueueSize;
callbacks.forEach((callback) => {
callback && callback();
});
this._sendQueueSize -= sendPacketSize;
process.nextTick(() => {

@@ -436,3 +449,3 @@ this._waitToWrite();

} else {
this._sendQueueSize -= sendQueueSize;
this._sendQueueSize -= sendPacketSize;
callbacks.forEach((callback) => {

@@ -464,6 +477,6 @@ callback && callback();

this.internalLogger.info('Fluentd will reconnect after ' + this.reconnectInterval / 1000 + ' seconds');
let timeoutId = setTimeout(() => {
const timeoutId = setTimeout(() => {
this.internalLogger.info('Fluentd is reconnecting...');
this._connect(() => {
this._flushSendQueue()
this._flushSendQueue();
this.internalLogger.info('Fluentd reconnection finished!!');

@@ -575,3 +588,3 @@ });

const dataArray = chunk.toString(defaultEncoding).split(/\n/);
let next = () => {
const next = () => {
if (dataArray.length) {

@@ -578,0 +591,0 @@ dataString += dataArray.shift();

@@ -51,2 +51,11 @@ 'use strict';

}
_final(callback) {
if (!this.sender) return process.nextTick(callback);
this.sender.end(null, null, () => {
this.sender = null;
callback();
});
}
};
{
"name": "fluent-logger",
"version": "3.3.1",
"version": "3.4.0",
"main": "./lib/index.js",

@@ -5,0 +5,0 @@ "scripts": {

@@ -164,3 +164,3 @@ # fluent-logger for Node.js

See also [How to enable TLS/SSL encryption](https://docs.fluentd.org/v1.0/articles/in_forward#how-to-enable-tls-encryption).
See also [How to enable TLS/SSL encryption](https://docs.fluentd.org/input/forward#how-to-enable-tls-encryption).

@@ -267,10 +267,14 @@ ### Mutual TLS Authentication

var fluentTransport = require('fluent-logger').support.winstonTransport();
var fluent = new fluentTransport('mytag', config);
var logger = winston.createLogger({
transports: [new fluentTransport('mytag', config), new (winston.transports.Console)()]
transports: [fluent, new (winston.transports.Console)()]
});
logger.on('logging', (transport, level, message, meta) => {
if (meta.end && transport.sender && transport.sender.end) {
transport.sender.end();
}
logger.on('flush', () => {
console.log("flush");
})
logger.on('finish', () => {
console.log("finish");
fluent.sender.end("end", {}, () => {})
});

@@ -280,3 +284,4 @@

logger.info('this log record is sent to fluent daemon');
logger.info('end of log message', { end: true });
logger.info('end of log message');
logger.end();
```

@@ -368,2 +373,6 @@

**messageQueueSizeLimit**
Maximum number of messages that can be in queue at the same time. If a new message is received and it overflows the queue then the oldest message will be removed before adding the new item. This option has effect only in `Message` mode. No limit by default.
**security.clientHostname**

@@ -370,0 +379,0 @@

@@ -319,2 +319,37 @@ 'use strict';

it('should send messages with different tags correctly in PackedForward', (done) => {
runServer({}, serverOptions, (server, finish) => {
const s1 = new FluentSender('debug', Object.assign({}, clientOptions, {
port: server.port,
eventMode: 'PackedForward'
}));
const emits = [];
const total = 4;
function emit(messageData) {
emits.push((asyncDone) => {
if (messageData.number === total) { // end
s1.emit(`multi-${messageData.number}`, { text: messageData.text}, asyncDone); // wait for send
} else {
s1.emit(`multi-${messageData.number}`, { text: messageData.text});
asyncDone(); // run immediately do not wait for ack
}
});
}
for (let i = 0; i <= total; i++) {
emit({ number: i, text: `This is text No ${i}` });
}
emits.push(() => {
finish((data) => {
expect(data.length).to.be.equal(5);
data.forEach((element, index) => {
expect(element.tag).to.be.equal(`debug.multi-${index}`);
expect(element.data.text).to.be.equal(`This is text No ${index}`);
});
done();
});
});
async.series(emits);
});
});
[

@@ -916,2 +951,22 @@ {

});
it('should limit messages stored in queue if server is not available', (done) => {
runServer({}, serverOptions, (server, finish) => {
finish(() => {
const s = new FluentSender('debug', Object.assign({}, clientOptions, {
port: server.port,
messageQueueSizeLimit: 3
}));
s.emit('message1', {});
s.emit('message2', {});
s.emit('message3', {});
s.emit('message4', {});
expect(s._sendQueue.length).to.be.equal(3);
expect(s._sendQueue[0].tag).to.be.equal('debug.message2');
expect(s._sendQueue[1].tag).to.be.equal('debug.message3');
expect(s._sendQueue[2].tag).to.be.equal('debug.message4');
done();
});
});
});
};

@@ -918,0 +973,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Packages

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc