fluentd-node
Advanced tools
Comparing version 0.0.13 to 0.0.14
@@ -97,8 +97,2 @@ import EventTime from "./event_time"; | ||
/** | ||
* How long to wait to flush the queued events | ||
* | ||
* Defaults to 0 | ||
*/ | ||
flushInterval?: number; | ||
/** | ||
* The timestamp resolution of events passed to FluentD. | ||
@@ -113,10 +107,30 @@ * | ||
/** | ||
* How long to wait to flush the queued events | ||
* | ||
* If this is 0, we don't wait at all | ||
* | ||
* Defaults to 0 | ||
*/ | ||
flushInterval?: number; | ||
/** | ||
* The limit at which the queue needs to be flushed. | ||
* | ||
* Useful with flushInterval to limit the size of the queue | ||
* Used when flushInterval is > 0 to say "flush after flushInterval ms, or when the queue reaches X size" | ||
* | ||
* See the subtype for defaults | ||
*/ | ||
sendQueueFlushLimit?: Partial<SendQueueLimit>; | ||
sendQueueIntervalFlushLimit?: Partial<SendQueueLimit>; | ||
/** | ||
* The limit at which we flush synchronously. By default, we flush asynchronously, | ||
* which can be bad if we're sending 30k+ events at a time. | ||
* | ||
* This sets a size limit at which we flush synchronously within emit(), which makes | ||
* sure we're flushing as quickly as possible | ||
* | ||
* Defaults to null (no limit) | ||
* | ||
* See the subtype for defaults | ||
*/ | ||
sendQueueSyncFlushLimit?: Partial<SendQueueLimit>; | ||
/** | ||
* The limit at which we start dropping events | ||
@@ -189,3 +203,4 @@ * | ||
private flushInterval; | ||
private sendQueueFlushLimit; | ||
private sendQueueIntervalFlushLimit; | ||
private sendQueueSyncFlushLimit; | ||
private sendQueueMaxLimit; | ||
@@ -289,8 +304,9 @@ private sendQueueNotFlushableLimit; | ||
/** | ||
* Flushes to the socket internally | ||
* Flushes to the socket synchronously | ||
* | ||
* Managed by `flush` to not be called multiple times | ||
* Prefer calling `.flush` which will flush on the next tick, allowing events from this tick to queue up. | ||
* | ||
* @returns true if there are more events in the queue to flush, false otherwise | ||
*/ | ||
private innerFlush; | ||
syncFlush(): boolean; | ||
/** | ||
@@ -318,2 +334,7 @@ * Flushes the event queue. Queues up the flushes for the next tick, preventing multiple flushes at the same time. | ||
/** | ||
* Checks if the sendQueue hits this limit | ||
* @param limit the limit to check | ||
*/ | ||
private shouldLimit; | ||
/** | ||
* Send the front item of the queue to the socket | ||
@@ -320,0 +341,0 @@ * @returns True if there was something to send |
@@ -11,2 +11,9 @@ "use strict"; | ||
const event_retrier_1 = require("./event_retrier"); | ||
const defaultLimit = (limit) => { | ||
return { | ||
size: +Infinity, | ||
length: +Infinity, | ||
...(limit || {}), | ||
}; | ||
}; | ||
/** | ||
@@ -60,18 +67,13 @@ * A Fluent Client. Connects to a FluentD server using the [Forward protocol](https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1). | ||
this.flushInterval = options.flushInterval || 0; | ||
this.sendQueueFlushLimit = { | ||
size: +Infinity, | ||
length: +Infinity, | ||
...(options.sendQueueFlushLimit || {}), | ||
}; | ||
this.sendQueueMaxLimit = { | ||
size: +Infinity, | ||
length: +Infinity, | ||
...(options.sendQueueMaxLimit || {}), | ||
}; | ||
this.sendQueueSyncFlushLimit = options.sendQueueSyncFlushLimit | ||
? defaultLimit(options.sendQueueSyncFlushLimit) | ||
: null; | ||
this.sendQueueIntervalFlushLimit = options.sendQueueIntervalFlushLimit | ||
? defaultLimit(options.sendQueueIntervalFlushLimit) | ||
: null; | ||
this.sendQueueMaxLimit = options.sendQueueMaxLimit | ||
? defaultLimit(options.sendQueueMaxLimit) | ||
: null; | ||
this.sendQueueNotFlushableLimit = options.sendQueueNotFlushableLimit | ||
? { | ||
size: +Infinity, | ||
length: +Infinity, | ||
...options.sendQueueNotFlushableLimit, | ||
} | ||
? defaultLimit(options.sendQueueNotFlushableLimit) | ||
: null; | ||
@@ -194,3 +196,5 @@ this.sendQueueNotFlushableLimitDelay = | ||
const promise = this.sendQueue.push(tag, time, data); | ||
this.dropLimit(this.sendQueueMaxLimit); | ||
if (this.sendQueueMaxLimit) { | ||
this.dropLimit(this.sendQueueMaxLimit); | ||
} | ||
this.maybeFlush(); | ||
@@ -284,8 +288,9 @@ return promise; | ||
/** | ||
* Flushes to the socket internally | ||
* Flushes to the socket synchronously | ||
* | ||
* Managed by `flush` to not be called multiple times | ||
* Prefer calling `.flush` which will flush on the next tick, allowing events from this tick to queue up. | ||
* | ||
* @returns true if there are more events in the queue to flush, false otherwise | ||
*/ | ||
innerFlush() { | ||
syncFlush() { | ||
if (this.sendQueue.queueLength === 0) { | ||
@@ -322,3 +327,3 @@ return false; | ||
this.willFlushNextTick = null; | ||
resolve(this.innerFlush()); | ||
resolve(this.syncFlush()); | ||
})); | ||
@@ -366,15 +371,12 @@ } | ||
} | ||
// flush on an interval | ||
if (this.flushInterval > 0) { | ||
const limit = this.sendQueueFlushLimit; | ||
if (this.sendQueue.queueSize !== -1 && | ||
this.sendQueue.queueSize >= limit.size) { | ||
// If the queue has hit the memory flush limit | ||
// If we've hit a blocking limit | ||
if (this.sendQueueSyncFlushLimit && | ||
this.shouldLimit(this.sendQueueSyncFlushLimit)) { | ||
this.syncFlush(); | ||
} | ||
else if (this.flushInterval > 0) { | ||
if (this.sendQueueIntervalFlushLimit && | ||
this.shouldLimit(this.sendQueueIntervalFlushLimit)) { | ||
this.flush(); | ||
} | ||
else if (this.sendQueue.queueLength !== -1 && | ||
this.sendQueue.queueLength >= limit.length) { | ||
// If the queue has hit the length flush limit | ||
this.flush(); | ||
} | ||
else if (this.nextFlushTimeoutId === null) { | ||
@@ -413,2 +415,19 @@ // Otherwise, schedule the next flush interval | ||
/** | ||
* Checks if the sendQueue hits this limit | ||
* @param limit the limit to check | ||
*/ | ||
shouldLimit(limit) { | ||
if (this.sendQueue.queueSize !== -1 && | ||
this.sendQueue.queueSize >= limit.size) { | ||
// If the queue has hit the memory flush limit | ||
return true; | ||
} | ||
else if (this.sendQueue.queueLength !== -1 && | ||
this.sendQueue.queueLength >= limit.length) { | ||
// If the queue has hit the length flush limit | ||
return true; | ||
} | ||
return false; | ||
} | ||
/** | ||
* Send the front item of the queue to the socket | ||
@@ -415,0 +434,0 @@ * @returns True if there was something to send |
{ | ||
"name": "fluentd-node", | ||
"version": "0.0.13", | ||
"version": "0.0.14", | ||
"description": "A fluent protocol implementation in node", | ||
@@ -5,0 +5,0 @@ "main": "./build/src/index.js", |
@@ -79,2 +79,9 @@ import { | ||
}; | ||
const defaultLimit = (limit?: Partial<SendQueueLimit>): SendQueueLimit => { | ||
return { | ||
size: +Infinity, | ||
length: +Infinity, | ||
...(limit || {}), | ||
}; | ||
}; | ||
@@ -129,8 +136,2 @@ export type DisconnectOptions = { | ||
/** | ||
* How long to wait to flush the queued events | ||
* | ||
* Defaults to 0 | ||
*/ | ||
flushInterval?: number; | ||
/** | ||
* The timestamp resolution of events passed to FluentD. | ||
@@ -145,11 +146,30 @@ * | ||
/** | ||
* How long to wait to flush the queued events | ||
* | ||
* If this is 0, we don't wait at all | ||
* | ||
* Defaults to 0 | ||
*/ | ||
flushInterval?: number; | ||
/** | ||
* The limit at which the queue needs to be flushed. | ||
* | ||
* Useful with flushInterval to limit the size of the queue | ||
* Used when flushInterval is > 0 to say "flush after flushInterval ms, or when the queue reaches X size" | ||
* | ||
* See the subtype for defaults | ||
*/ | ||
sendQueueFlushLimit?: Partial<SendQueueLimit>; | ||
sendQueueIntervalFlushLimit?: Partial<SendQueueLimit>; | ||
/** | ||
* The limit at which we flush synchronously. By default, we flush asynchronously, | ||
* which can be bad if we're sending 30k+ events at a time. | ||
* | ||
* This sets a size limit at which we flush synchronously within emit(), which makes | ||
* sure we're flushing as quickly as possible | ||
* | ||
* Defaults to null (no limit) | ||
* | ||
* See the subtype for defaults | ||
*/ | ||
sendQueueSyncFlushLimit?: Partial<SendQueueLimit>; | ||
/** | ||
* The limit at which we start dropping events | ||
@@ -225,4 +245,5 @@ * | ||
private flushInterval: number; | ||
private sendQueueFlushLimit: SendQueueLimit; | ||
private sendQueueMaxLimit: SendQueueLimit; | ||
private sendQueueIntervalFlushLimit: SendQueueLimit | null; | ||
private sendQueueSyncFlushLimit: SendQueueLimit | null; | ||
private sendQueueMaxLimit: SendQueueLimit | null; | ||
private sendQueueNotFlushableLimit: SendQueueLimit | null; | ||
@@ -274,18 +295,13 @@ private sendQueueNotFlushableLimitDelay: number; | ||
this.flushInterval = options.flushInterval || 0; | ||
this.sendQueueFlushLimit = { | ||
size: +Infinity, | ||
length: +Infinity, | ||
...(options.sendQueueFlushLimit || {}), | ||
}; | ||
this.sendQueueMaxLimit = { | ||
size: +Infinity, | ||
length: +Infinity, | ||
...(options.sendQueueMaxLimit || {}), | ||
}; | ||
this.sendQueueSyncFlushLimit = options.sendQueueSyncFlushLimit | ||
? defaultLimit(options.sendQueueSyncFlushLimit) | ||
: null; | ||
this.sendQueueIntervalFlushLimit = options.sendQueueIntervalFlushLimit | ||
? defaultLimit(options.sendQueueIntervalFlushLimit) | ||
: null; | ||
this.sendQueueMaxLimit = options.sendQueueMaxLimit | ||
? defaultLimit(options.sendQueueMaxLimit) | ||
: null; | ||
this.sendQueueNotFlushableLimit = options.sendQueueNotFlushableLimit | ||
? { | ||
size: +Infinity, | ||
length: +Infinity, | ||
...options.sendQueueNotFlushableLimit, | ||
} | ||
? defaultLimit(options.sendQueueNotFlushableLimit) | ||
: null; | ||
@@ -471,3 +487,5 @@ this.sendQueueNotFlushableLimitDelay = | ||
const promise = this.sendQueue.push(tag, time, data); | ||
this.dropLimit(this.sendQueueMaxLimit); | ||
if (this.sendQueueMaxLimit) { | ||
this.dropLimit(this.sendQueueMaxLimit); | ||
} | ||
this.maybeFlush(); | ||
@@ -566,8 +584,9 @@ return promise; | ||
/** | ||
* Flushes to the socket internally | ||
* Flushes to the socket synchronously | ||
* | ||
* Managed by `flush` to not be called multiple times | ||
* Prefer calling `.flush` which will flush on the next tick, allowing events from this tick to queue up. | ||
* | ||
* @returns true if there are more events in the queue to flush, false otherwise | ||
*/ | ||
private innerFlush(): boolean { | ||
public syncFlush(): boolean { | ||
if (this.sendQueue.queueLength === 0) { | ||
@@ -609,3 +628,3 @@ return false; | ||
this.willFlushNextTick = null; | ||
resolve(this.innerFlush()); | ||
resolve(this.syncFlush()); | ||
}) | ||
@@ -655,17 +674,14 @@ ); | ||
} | ||
// flush on an interval | ||
if (this.flushInterval > 0) { | ||
const limit = this.sendQueueFlushLimit; | ||
// If we've hit a blocking limit | ||
if ( | ||
this.sendQueueSyncFlushLimit && | ||
this.shouldLimit(this.sendQueueSyncFlushLimit) | ||
) { | ||
this.syncFlush(); | ||
} else if (this.flushInterval > 0) { | ||
if ( | ||
this.sendQueue.queueSize !== -1 && | ||
this.sendQueue.queueSize >= limit.size | ||
this.sendQueueIntervalFlushLimit && | ||
this.shouldLimit(this.sendQueueIntervalFlushLimit) | ||
) { | ||
// If the queue has hit the memory flush limit | ||
this.flush(); | ||
} else if ( | ||
this.sendQueue.queueLength !== -1 && | ||
this.sendQueue.queueLength >= limit.length | ||
) { | ||
// If the queue has hit the length flush limit | ||
this.flush(); | ||
} else if (this.nextFlushTimeoutId === null) { | ||
@@ -709,2 +725,23 @@ // Otherwise, schedule the next flush interval | ||
/** | ||
* Checks if the sendQueue hits this limit | ||
* @param limit the limit to check | ||
*/ | ||
private shouldLimit(limit: SendQueueLimit): boolean { | ||
if ( | ||
this.sendQueue.queueSize !== -1 && | ||
this.sendQueue.queueSize >= limit.size | ||
) { | ||
// If the queue has hit the memory flush limit | ||
return true; | ||
} else if ( | ||
this.sendQueue.queueLength !== -1 && | ||
this.sendQueue.queueLength >= limit.length | ||
) { | ||
// If the queue has hit the length flush limit | ||
return true; | ||
} | ||
return false; | ||
} | ||
/** | ||
* Send the front item of the queue to the socket | ||
@@ -711,0 +748,0 @@ * @returns True if there was something to send |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
311661
7302