fluentd-node
Advanced tools
Comparing version 0.0.7 to 0.0.8
@@ -218,3 +218,11 @@ "use strict"; | ||
finally { | ||
this.clearAcks(); | ||
// We want to client to be in a state where nothing is pending that isn't in the sendQueue, now that the socket is unflushable. | ||
// This means nothing is pending acknowledgemnets, and nothing is pending to retry. | ||
// As a result, we can drop all the pending events, or send them once we're connected again | ||
// Drop the acks first, as they can queue up retries which we need to short circuit | ||
await this.clearAcks(); | ||
if (this.retrier) { | ||
// Short circuit all retries, so they requeue immediately | ||
await this.retrier.shortCircuit(); | ||
} | ||
} | ||
@@ -430,8 +438,11 @@ } | ||
*/ | ||
clearAcks() { | ||
async clearAcks() { | ||
for (const data of this.ackQueue.values()) { | ||
clearTimeout(data.timeoutId); | ||
data.deferred.reject(new error_1.ShutdownError("ack queue emptied")); | ||
data.deferred.reject(new error_1.AckShutdownError("ack queue emptied")); | ||
} | ||
this.ackQueue = new Map(); | ||
// We want this to resolve on the next tick, once handlers depending on the ack result have fully resolved | ||
// i.e we have emptied PromiseJobs | ||
return new Promise(r => process.nextTick(r)); | ||
} | ||
@@ -438,0 +449,0 @@ } |
@@ -47,3 +47,3 @@ declare class BaseError extends Error { | ||
/** | ||
* Thrown when an event is dropped for any reason | ||
* Thrown when an event is dropped from the queue for any reason | ||
*/ | ||
@@ -54,21 +54,31 @@ export declare class DroppedError extends BaseError { | ||
/** | ||
* Thrown when an event is dropped as a result of clearing out the queue | ||
* Thrown when an event is dropped as a result of clearing out the queue on shutdown | ||
* | ||
* Extends DroppedError since the message was dropped from the queue | ||
*/ | ||
export declare class ClearDroppedError extends DroppedError { | ||
export declare class QueueShutdownError extends DroppedError { | ||
constructor(message: string); | ||
} | ||
/** | ||
* Thrown when authentication fails on either the client or the server | ||
* Thrown into the ack queue on shutdown, terminating all promises waiting for an ack | ||
*/ | ||
export declare class AuthError extends BaseError { | ||
export declare class AckShutdownError extends BaseError { | ||
constructor(message: string); | ||
} | ||
/** | ||
* Thrown into the ack queue on shutdown, terminating all promises waiting for an ack | ||
* Thrown when the EventRetrier is shut down | ||
*/ | ||
export declare class ShutdownError extends BaseError { | ||
export declare class RetryShutdownError extends BaseError { | ||
constructor(message: string); | ||
} | ||
/** | ||
* Thrown when authentication fails on either the client or the server | ||
*/ | ||
export declare class AuthError extends BaseError { | ||
constructor(message: string); | ||
} | ||
/** | ||
* Thrown when the shared key doesn't match | ||
* | ||
* Extends AuthError, since the key was incorrect | ||
*/ | ||
@@ -75,0 +85,0 @@ export declare class SharedKeyMismatchError extends AuthError { |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.FatalSocketError = exports.DecodeError = exports.SharedKeyMismatchError = exports.ShutdownError = exports.AuthError = exports.ClearDroppedError = exports.DroppedError = exports.SocketNotWritableError = exports.SocketTimeoutError = exports.DataTypeError = exports.AckTimeoutError = exports.UnexpectedMessageError = exports.MissingTagError = exports.ConfigError = void 0; | ||
exports.FatalSocketError = exports.DecodeError = exports.SharedKeyMismatchError = exports.AuthError = exports.RetryShutdownError = exports.AckShutdownError = exports.QueueShutdownError = exports.DroppedError = exports.SocketNotWritableError = exports.SocketTimeoutError = exports.DataTypeError = exports.AckTimeoutError = exports.UnexpectedMessageError = exports.MissingTagError = exports.ConfigError = void 0; | ||
class BaseError extends Error { | ||
@@ -76,3 +76,3 @@ constructor(message) { | ||
/** | ||
* Thrown when an event is dropped for any reason | ||
* Thrown when an event is dropped from the queue for any reason | ||
*/ | ||
@@ -86,5 +86,7 @@ class DroppedError extends BaseError { | ||
/** | ||
* Thrown when an event is dropped as a result of clearing out the queue | ||
* Thrown when an event is dropped as a result of clearing out the queue on shutdown | ||
* | ||
* Extends DroppedError since the message was dropped from the queue | ||
*/ | ||
class ClearDroppedError extends DroppedError { | ||
class QueueShutdownError extends DroppedError { | ||
constructor(message) { | ||
@@ -94,7 +96,7 @@ super(message); | ||
} | ||
exports.ClearDroppedError = ClearDroppedError; | ||
exports.QueueShutdownError = QueueShutdownError; | ||
/** | ||
* Thrown when authentication fails on either the client or the server | ||
* Thrown into the ack queue on shutdown, terminating all promises waiting for an ack | ||
*/ | ||
class AuthError extends BaseError { | ||
class AckShutdownError extends BaseError { | ||
constructor(message) { | ||
@@ -104,7 +106,7 @@ super(message); | ||
} | ||
exports.AuthError = AuthError; | ||
exports.AckShutdownError = AckShutdownError; | ||
/** | ||
* Thrown into the ack queue on shutdown, terminating all promises waiting for an ack | ||
* Thrown when the EventRetrier is shut down | ||
*/ | ||
class ShutdownError extends BaseError { | ||
class RetryShutdownError extends BaseError { | ||
constructor(message) { | ||
@@ -114,5 +116,16 @@ super(message); | ||
} | ||
exports.ShutdownError = ShutdownError; | ||
exports.RetryShutdownError = RetryShutdownError; | ||
/** | ||
* Thrown when authentication fails on either the client or the server | ||
*/ | ||
class AuthError extends BaseError { | ||
constructor(message) { | ||
super(message); | ||
} | ||
} | ||
exports.AuthError = AuthError; | ||
/** | ||
* Thrown when the shared key doesn't match | ||
* | ||
* Extends AuthError, since the key was incorrect | ||
*/ | ||
@@ -119,0 +132,0 @@ class SharedKeyMismatchError extends AuthError { |
@@ -46,4 +46,16 @@ /** | ||
private options; | ||
private cancelWait; | ||
constructor(opts?: Partial<EventRetryOptions>); | ||
/** | ||
* Causes ongoing retry handlers to cancel their timeout and immediately retry | ||
* @returns a Promise which completes after all handlers have retried once. | ||
*/ | ||
shortCircuit(): Promise<void>; | ||
/** | ||
* Exits all ongoing retry handlers with an error | ||
* | ||
* @returns A promise which completes once all retry handlers have exited. | ||
*/ | ||
shutdown(): Promise<void>; | ||
/** | ||
* Retry the promise | ||
@@ -50,0 +62,0 @@ * |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.EventRetrier = void 0; | ||
const pDefer = require("p-defer"); | ||
const error_1 = require("./error"); | ||
@@ -19,4 +20,36 @@ /** | ||
}; | ||
this.cancelWait = pDefer(); | ||
} | ||
/** | ||
* Causes ongoing retry handlers to cancel their timeout and immediately retry | ||
* @returns a Promise which completes after all handlers have retried once. | ||
*/ | ||
async shortCircuit() { | ||
const { resolve } = this.cancelWait; | ||
// Reinitialize the promise so this class can continue to be reused | ||
this.cancelWait = pDefer(); | ||
resolve(); | ||
// The Promise.resolve PromiseJob will be enqueued after all the resolve functions on cancelWait. | ||
// Promise.race([a,b]) then enqueues another PromiseJob since it needs to resolve its promise. | ||
// We want the shutdown promise to resolve after the Promise.race.then call, meaning we enqueue a | ||
// new PromiseJob as well, which is enqueued after Promise.race enqueues it's jobs | ||
await Promise.resolve(); | ||
} | ||
/** | ||
* Exits all ongoing retry handlers with an error | ||
* | ||
* @returns A promise which completes once all retry handlers have exited. | ||
*/ | ||
async shutdown() { | ||
const { reject } = this.cancelWait; | ||
// Reinitialize the promise so this class can continue to be reused | ||
this.cancelWait = pDefer(); | ||
reject(new error_1.RetryShutdownError("Retries were shut down")); | ||
// The Promise.resolve PromiseJob will be enqueued after all the resolve functions on cancelWait. | ||
// Promise.race([a,b]) then enqueues another PromiseJob since it needs to resolve its promise. | ||
// We want the shutdown promise to resolve after the Promise.race.then call, meaning we enqueue a | ||
// new PromiseJob as well, which is enqueued after Promise.race enqueues it's jobs | ||
await Promise.resolve(); | ||
} | ||
/** | ||
* Retry the promise | ||
@@ -35,3 +68,3 @@ * | ||
catch (e) { | ||
// Ignore DroppedError by default, this prevents us from requeuing on shutdown | ||
// Ignore DroppedError by default, this prevents us from requeuing on shutdown or queue clear | ||
if (e instanceof error_1.DroppedError) { | ||
@@ -46,3 +79,17 @@ throw e; | ||
retryAttempts += 1; | ||
await new Promise(r => setTimeout(r, retryInterval)); | ||
let timeoutId = null; | ||
const waitPromise = new Promise(resolve => { | ||
timeoutId = setTimeout(() => { | ||
timeoutId = null; | ||
resolve(); | ||
}, retryInterval); | ||
}); | ||
try { | ||
await Promise.race([waitPromise, this.cancelWait.promise]); | ||
} | ||
finally { | ||
if (timeoutId) { | ||
clearTimeout(timeoutId); | ||
} | ||
} | ||
} | ||
@@ -49,0 +96,0 @@ // eslint-disable-next-line no-constant-condition |
@@ -30,3 +30,3 @@ "use strict"; | ||
while ((entryData = this.pop()) !== null) { | ||
entryData.deferred.reject(new error_1.ClearDroppedError("Message dropped due to queue shutdown")); | ||
entryData.deferred.reject(new error_1.QueueShutdownError("Message dropped due to queue shutdown")); | ||
} | ||
@@ -33,0 +33,0 @@ } |
{ | ||
"name": "fluentd-node", | ||
"version": "0.0.7", | ||
"version": "0.0.8", | ||
"description": "A fluent protocol implementation in node", | ||
@@ -5,0 +5,0 @@ "main": "./build/src/index.js", |
@@ -6,3 +6,3 @@ import { | ||
AckTimeoutError, | ||
ShutdownError, | ||
AckShutdownError, | ||
} from "./error"; | ||
@@ -443,3 +443,11 @@ import EventTime from "./event_time"; | ||
} finally { | ||
this.clearAcks(); | ||
// We want to client to be in a state where nothing is pending that isn't in the sendQueue, now that the socket is unflushable. | ||
// This means nothing is pending acknowledgemnets, and nothing is pending to retry. | ||
// As a result, we can drop all the pending events, or send them once we're connected again | ||
// Drop the acks first, as they can queue up retries which we need to short circuit | ||
await this.clearAcks(); | ||
if (this.retrier) { | ||
// Short circuit all retries, so they requeue immediately | ||
await this.retrier.shortCircuit(); | ||
} | ||
} | ||
@@ -681,9 +689,13 @@ } | ||
*/ | ||
private clearAcks(): void { | ||
private async clearAcks(): Promise<void> { | ||
for (const data of this.ackQueue.values()) { | ||
clearTimeout(data.timeoutId); | ||
data.deferred.reject(new ShutdownError("ack queue emptied")); | ||
data.deferred.reject(new AckShutdownError("ack queue emptied")); | ||
} | ||
this.ackQueue = new Map(); | ||
// We want this to resolve on the next tick, once handlers depending on the ack result have fully resolved | ||
// i.e we have emptied PromiseJobs | ||
return new Promise(r => process.nextTick(r)); | ||
} | ||
} |
@@ -74,3 +74,3 @@ class BaseError extends Error { | ||
/** | ||
* Thrown when an event is dropped for any reason | ||
* Thrown when an event is dropped from the queue for any reason | ||
*/ | ||
@@ -84,5 +84,7 @@ export class DroppedError extends BaseError { | ||
/** | ||
* Thrown when an event is dropped as a result of clearing out the queue | ||
* Thrown when an event is dropped as a result of clearing out the queue on shutdown | ||
* | ||
* Extends DroppedError since the message was dropped from the queue | ||
*/ | ||
export class ClearDroppedError extends DroppedError { | ||
export class QueueShutdownError extends DroppedError { | ||
constructor(message: string) { | ||
@@ -94,5 +96,5 @@ super(message); | ||
/** | ||
* Thrown when authentication fails on either the client or the server | ||
* Thrown into the ack queue on shutdown, terminating all promises waiting for an ack | ||
*/ | ||
export class AuthError extends BaseError { | ||
export class AckShutdownError extends BaseError { | ||
constructor(message: string) { | ||
@@ -104,5 +106,5 @@ super(message); | ||
/** | ||
* Thrown into the ack queue on shutdown, terminating all promises waiting for an ack | ||
* Thrown when the EventRetrier is shut down | ||
*/ | ||
export class ShutdownError extends BaseError { | ||
export class RetryShutdownError extends BaseError { | ||
constructor(message: string) { | ||
@@ -114,3 +116,14 @@ super(message); | ||
/** | ||
* Thrown when authentication fails on either the client or the server | ||
*/ | ||
export class AuthError extends BaseError { | ||
constructor(message: string) { | ||
super(message); | ||
} | ||
} | ||
/** | ||
* Thrown when the shared key doesn't match | ||
* | ||
* Extends AuthError, since the key was incorrect | ||
*/ | ||
@@ -117,0 +130,0 @@ export class SharedKeyMismatchError extends AuthError { |
@@ -1,2 +0,3 @@ | ||
import {DroppedError} from "./error"; | ||
import * as pDefer from "p-defer"; | ||
import {DroppedError, RetryShutdownError} from "./error"; | ||
@@ -49,2 +50,3 @@ /** | ||
private options: EventRetryOptions; | ||
private cancelWait: pDefer.DeferredPromise<void>; | ||
constructor(opts: Partial<EventRetryOptions> = {}) { | ||
@@ -60,5 +62,43 @@ this.options = { | ||
}; | ||
this.cancelWait = pDefer<void>(); | ||
} | ||
/** | ||
* Causes ongoing retry handlers to cancel their timeout and immediately retry | ||
* @returns a Promise which completes after all handlers have retried once. | ||
*/ | ||
public async shortCircuit(): Promise<void> { | ||
const {resolve} = this.cancelWait; | ||
// Reinitialize the promise so this class can continue to be reused | ||
this.cancelWait = pDefer<void>(); | ||
resolve(); | ||
// The Promise.resolve PromiseJob will be enqueued after all the resolve functions on cancelWait. | ||
// Promise.race([a,b]) then enqueues another PromiseJob since it needs to resolve its promise. | ||
// We want the shutdown promise to resolve after the Promise.race.then call, meaning we enqueue a | ||
// new PromiseJob as well, which is enqueued after Promise.race enqueues it's jobs | ||
await Promise.resolve(); | ||
} | ||
/** | ||
* Exits all ongoing retry handlers with an error | ||
* | ||
* @returns A promise which completes once all retry handlers have exited. | ||
*/ | ||
public async shutdown(): Promise<void> { | ||
const {reject} = this.cancelWait; | ||
// Reinitialize the promise so this class can continue to be reused | ||
this.cancelWait = pDefer<void>(); | ||
reject(new RetryShutdownError("Retries were shut down")); | ||
// The Promise.resolve PromiseJob will be enqueued after all the resolve functions on cancelWait. | ||
// Promise.race([a,b]) then enqueues another PromiseJob since it needs to resolve its promise. | ||
// We want the shutdown promise to resolve after the Promise.race.then call, meaning we enqueue a | ||
// new PromiseJob as well, which is enqueued after Promise.race enqueues it's jobs | ||
await Promise.resolve(); | ||
} | ||
/** | ||
* Retry the promise | ||
@@ -76,3 +116,3 @@ * | ||
} catch (e) { | ||
// Ignore DroppedError by default, this prevents us from requeuing on shutdown | ||
// Ignore DroppedError by default, this prevents us from requeuing on shutdown or queue clear | ||
if (e instanceof DroppedError) { | ||
@@ -95,3 +135,18 @@ throw e; | ||
retryAttempts += 1; | ||
await new Promise(r => setTimeout(r, retryInterval)); | ||
let timeoutId: NodeJS.Timeout | null = null; | ||
const waitPromise = new Promise<void>(resolve => { | ||
timeoutId = setTimeout(() => { | ||
timeoutId = null; | ||
resolve(); | ||
}, retryInterval); | ||
}); | ||
try { | ||
await Promise.race([waitPromise, this.cancelWait.promise]); | ||
} finally { | ||
if (timeoutId) { | ||
clearTimeout(timeoutId); | ||
} | ||
} | ||
} | ||
@@ -98,0 +153,0 @@ // eslint-disable-next-line no-constant-condition |
import * as protocol from "../protocol"; | ||
import {DeferredPromise} from "p-defer"; | ||
import {ClearDroppedError, DroppedError} from "../error"; | ||
import {DroppedError, QueueShutdownError} from "../error"; | ||
@@ -99,3 +99,3 @@ /** | ||
entryData.deferred.reject( | ||
new ClearDroppedError("Message dropped due to queue shutdown") | ||
new QueueShutdownError("Message dropped due to queue shutdown") | ||
); | ||
@@ -102,0 +102,0 @@ } |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
295334
6937