fluentd-node
Advanced tools
Comparing version 0.0.14 to 0.0.15
@@ -52,17 +52,13 @@ import EventTime from "./event_time"; | ||
/** | ||
* The amount of time to wait between flush attempts on disconnection (after making at least one attempt) | ||
* If to wait for all pending events to finish sending to the fluent server before disconnecting | ||
* | ||
* Useful to wait if the fluent server is unavailable right when we're disconnecting | ||
* | ||
* Defaults to 0 | ||
* Defaults to false (does not wait) | ||
*/ | ||
flushDelay: number; | ||
waitForPending: boolean; | ||
/** | ||
* The number of times to attempt a flush on disconnection | ||
* The maximum amount of time to wait for pending events to finish sending to the fluent server before disconnecting | ||
* | ||
* Useful to empty the send queue before disconnecting | ||
* | ||
* Defaults to 1 | ||
* Defaults to 0 (no maximum time) | ||
*/ | ||
maxFlushAttempts: number; | ||
waitForPendingDelay: number; | ||
/** | ||
@@ -198,2 +194,3 @@ * The amount of time to wait before disconnecting the socket on disconnection | ||
private sendQueue; | ||
private emitQueue; | ||
private milliseconds; | ||
@@ -369,3 +366,3 @@ private retrier; | ||
*/ | ||
get queueLength(): number; | ||
get sendQueueLength(): number; | ||
/** | ||
@@ -377,2 +374,14 @@ * Returns whether or not the socket is writable | ||
get writable(): boolean; | ||
/** | ||
* Returns the number of events that have been queued, but haven't resolved yet | ||
* | ||
* This includes acknowledgements and retries if enabled. | ||
*/ | ||
get queueLength(): number; | ||
/** | ||
* Waits for all currently pending events to successfully resolve or reject | ||
* | ||
* @returns A Promise which resolves once all the pending events have successfully been emitted | ||
*/ | ||
waitForPending(): Promise<void>; | ||
} |
@@ -11,2 +11,3 @@ "use strict"; | ||
const event_retrier_1 = require("./event_retrier"); | ||
const util_1 = require("./util"); | ||
const defaultLimit = (limit) => { | ||
@@ -32,2 +33,3 @@ return { | ||
this.ackQueue = new Map(); | ||
this.emitQueue = new Set(); | ||
this.notFlushableLimitTimeoutId = null; | ||
@@ -83,4 +85,4 @@ this.nextFlushTimeoutId = null; | ||
this.disconnectOptions = { | ||
flushDelay: 0, | ||
maxFlushAttempts: 1, | ||
waitForPending: false, | ||
waitForPendingDelay: 0, | ||
socketDisconnectDelay: 0, | ||
@@ -178,8 +180,14 @@ ...(options.disconnect || {}), | ||
} | ||
let emitPromise; | ||
if (this.retrier !== null) { | ||
return this.retrier.retryPromise(() => this.pushEvent(tag, time, data)); | ||
emitPromise = this.retrier.retryPromise(() => this.pushEvent(tag, time, data)); | ||
} | ||
else { | ||
return this.pushEvent(tag, time, data); | ||
emitPromise = this.pushEvent(tag, time, data); | ||
} | ||
if (!this.emitQueue.has(emitPromise)) { | ||
this.emitQueue.add(emitPromise); | ||
emitPromise.finally(() => this.emitQueue.delete(emitPromise)); | ||
} | ||
return emitPromise; | ||
} | ||
@@ -236,13 +244,12 @@ /** | ||
try { | ||
let flushCount = 0; | ||
while (flushCount < this.disconnectOptions.maxFlushAttempts) { | ||
// Only delay after making one flush attempt | ||
if (flushCount > 0 && this.disconnectOptions.flushDelay > 0) { | ||
await new Promise(r => setTimeout(r, this.disconnectOptions.flushDelay)); | ||
// Flush before awaiting | ||
await this.flush(); | ||
if (this.disconnectOptions.waitForPending) { | ||
const flushPromise = this.waitForPending(); | ||
if (this.disconnectOptions.waitForPendingDelay > 0) { | ||
await util_1.awaitAtMost(flushPromise, this.disconnectOptions.waitForPendingDelay); | ||
} | ||
// Exit if flush returns false - queue is empty | ||
if (!(await this.flush())) { | ||
break; | ||
else { | ||
await flushPromise; | ||
} | ||
flushCount += 1; | ||
} | ||
@@ -252,3 +259,3 @@ } | ||
if (this.disconnectOptions.socketDisconnectDelay > 0) { | ||
await new Promise(r => setTimeout(r, this.disconnectOptions.socketDisconnectDelay)); | ||
await util_1.awaitTimeout(this.disconnectOptions.socketDisconnectDelay); | ||
} | ||
@@ -521,3 +528,3 @@ try { | ||
// i.e we have emptied PromiseJobs | ||
return new Promise(r => process.nextTick(r)); | ||
return util_1.awaitNextTick(); | ||
} | ||
@@ -529,3 +536,3 @@ /** | ||
*/ | ||
get queueLength() { | ||
get sendQueueLength() { | ||
return this.sendQueue.queueLength; | ||
@@ -541,4 +548,21 @@ } | ||
} | ||
/** | ||
* Returns the number of events that have been queued, but haven't resolved yet | ||
* | ||
* This includes acknowledgements and retries if enabled. | ||
*/ | ||
get queueLength() { | ||
return this.emitQueue.size; | ||
} | ||
/** | ||
* Waits for all currently pending events to successfully resolve or reject | ||
* | ||
* @returns A Promise which resolves once all the pending events have successfully been emitted | ||
*/ | ||
async waitForPending() { | ||
// Clone the emitQueue, to ignore emit calls made while waiting | ||
await Promise.allSettled(Array.from(this.emitQueue)); | ||
} | ||
} | ||
exports.FluentClient = FluentClient; | ||
//# sourceMappingURL=client.js.map |
@@ -6,2 +6,3 @@ "use strict"; | ||
const error_1 = require("./error"); | ||
const util_1 = require("./util"); | ||
/** | ||
@@ -78,17 +79,4 @@ * Provides retry logic for a promise, with failure cases | ||
retryAttempts += 1; | ||
let timeoutId = null; | ||
const waitPromise = new Promise(resolve => { | ||
timeoutId = setTimeout(() => { | ||
timeoutId = null; | ||
resolve(); | ||
}, retryInterval); | ||
}); | ||
try { | ||
await Promise.race([waitPromise, this.cancelWait.promise]); | ||
} | ||
finally { | ||
if (timeoutId) { | ||
clearTimeout(timeoutId); | ||
} | ||
} | ||
// Await the retry promise, but short circuiting is OK | ||
await util_1.awaitAtMost(this.cancelWait.promise, retryInterval); | ||
} | ||
@@ -95,0 +83,0 @@ // eslint-disable-next-line no-constant-condition |
{ | ||
"name": "fluentd-node", | ||
"version": "0.0.14", | ||
"version": "0.0.15", | ||
"description": "A fluent protocol implementation in node", | ||
@@ -30,3 +30,3 @@ "main": "./build/src/index.js", | ||
"engines": { | ||
"node": ">=12" | ||
"node": ">=12.9.0" | ||
}, | ||
@@ -33,0 +33,0 @@ "dependencies": { |
@@ -22,2 +22,3 @@ import { | ||
import {EventRetrier, EventRetryOptions} from "./event_retrier"; | ||
import {awaitAtMost, awaitNextTick, awaitTimeout} from "./util"; | ||
@@ -90,17 +91,13 @@ type AckData = { | ||
/** | ||
* The amount of time to wait between flush attempts on disconnection (after making at least one attempt) | ||
* If to wait for all pending events to finish sending to the fluent server before disconnecting | ||
* | ||
* Useful to wait if the fluent server is unavailable right when we're disconnecting | ||
* | ||
* Defaults to 0 | ||
* Defaults to false (does not wait) | ||
*/ | ||
flushDelay: number; | ||
waitForPending: boolean; | ||
/** | ||
* The number of times to attempt a flush on disconnection | ||
* The maximum amount of time to wait for pending events to finish sending to the fluent server before disconnecting | ||
* | ||
* Useful to empty the send queue before disconnecting | ||
* | ||
* Defaults to 1 | ||
* Defaults to 0 (no maximum time) | ||
*/ | ||
maxFlushAttempts: number; | ||
waitForPendingDelay: number; | ||
/** | ||
@@ -238,2 +235,3 @@ * The amount of time to wait before disconnecting the socket on disconnection | ||
private sendQueue: Queue; | ||
private emitQueue: Set<Promise<void>> = new Set(); | ||
private milliseconds: boolean; | ||
@@ -310,4 +308,4 @@ private retrier: EventRetrier | null; | ||
this.disconnectOptions = { | ||
flushDelay: 0, | ||
maxFlushAttempts: 1, | ||
waitForPending: false, | ||
waitForPendingDelay: 0, | ||
socketDisconnectDelay: 0, | ||
@@ -463,7 +461,15 @@ ...(options.disconnect || {}), | ||
} | ||
let emitPromise: Promise<void>; | ||
if (this.retrier !== null) { | ||
return this.retrier.retryPromise(() => this.pushEvent(tag, time, data)); | ||
emitPromise = this.retrier.retryPromise(() => | ||
this.pushEvent(tag, time, data) | ||
); | ||
} else { | ||
return this.pushEvent(tag, time, data); | ||
emitPromise = this.pushEvent(tag, time, data); | ||
} | ||
if (!this.emitQueue.has(emitPromise)) { | ||
this.emitQueue.add(emitPromise); | ||
emitPromise.finally(() => this.emitQueue.delete(emitPromise)); | ||
} | ||
return emitPromise; | ||
} | ||
@@ -528,21 +534,18 @@ | ||
try { | ||
let flushCount = 0; | ||
while (flushCount < this.disconnectOptions.maxFlushAttempts) { | ||
// Only delay after making one flush attempt | ||
if (flushCount > 0 && this.disconnectOptions.flushDelay > 0) { | ||
await new Promise(r => | ||
setTimeout(r, this.disconnectOptions.flushDelay) | ||
// Flush before awaiting | ||
await this.flush(); | ||
if (this.disconnectOptions.waitForPending) { | ||
const flushPromise = this.waitForPending(); | ||
if (this.disconnectOptions.waitForPendingDelay > 0) { | ||
await awaitAtMost( | ||
flushPromise, | ||
this.disconnectOptions.waitForPendingDelay | ||
); | ||
} else { | ||
await flushPromise; | ||
} | ||
// Exit if flush returns false - queue is empty | ||
if (!(await this.flush())) { | ||
break; | ||
} | ||
flushCount += 1; | ||
} | ||
} finally { | ||
if (this.disconnectOptions.socketDisconnectDelay > 0) { | ||
await new Promise(r => | ||
setTimeout(r, this.disconnectOptions.socketDisconnectDelay) | ||
); | ||
await awaitTimeout(this.disconnectOptions.socketDisconnectDelay); | ||
} | ||
@@ -845,3 +848,3 @@ try { | ||
// i.e we have emptied PromiseJobs | ||
return new Promise(r => process.nextTick(r)); | ||
return awaitNextTick(); | ||
} | ||
@@ -854,3 +857,3 @@ | ||
*/ | ||
get queueLength(): number { | ||
get sendQueueLength(): number { | ||
return this.sendQueue.queueLength; | ||
@@ -867,2 +870,21 @@ } | ||
} | ||
/** | ||
* Returns the number of events that have been queued, but haven't resolved yet | ||
* | ||
* This includes acknowledgements and retries if enabled. | ||
*/ | ||
get queueLength(): number { | ||
return this.emitQueue.size; | ||
} | ||
/** | ||
* Waits for all currently pending events to successfully resolve or reject | ||
* | ||
* @returns A Promise which resolves once all the pending events have successfully been emitted | ||
*/ | ||
public async waitForPending(): Promise<void> { | ||
// Clone the emitQueue, to ignore emit calls made while waiting | ||
await Promise.allSettled(Array.from(this.emitQueue)); | ||
} | ||
} |
import * as pDefer from "p-defer"; | ||
import {DroppedError, RetryShutdownError} from "./error"; | ||
import {awaitAtMost} from "./util"; | ||
@@ -133,17 +134,4 @@ /** | ||
let timeoutId: NodeJS.Timeout | null = null; | ||
const waitPromise = new Promise<void>(resolve => { | ||
timeoutId = setTimeout(() => { | ||
timeoutId = null; | ||
resolve(); | ||
}, retryInterval); | ||
}); | ||
try { | ||
await Promise.race([waitPromise, this.cancelWait.promise]); | ||
} finally { | ||
if (timeoutId) { | ||
clearTimeout(timeoutId); | ||
} | ||
} | ||
// Await the retry promise, but short circuiting is OK | ||
await awaitAtMost(this.cancelWait.promise, retryInterval); | ||
} | ||
@@ -150,0 +138,0 @@ // eslint-disable-next-line no-constant-condition |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
315747
63
7383