sequential-task-queue
Advanced tools
Comparing version 1.1.2 to 1.2.0
"use strict"; | ||
/** | ||
* Standard cancellation reasons. {@link SequentialTaskQueue} sets {@link CancellationToken#reason} | ||
* Standard cancellation reasons. {@link SequentialTaskQueue} sets {@link CancellationToken.reason} | ||
* to one of these values when cancelling a task for a reason other than the user code calling | ||
* {@link CancellationToken#cancel}. | ||
* {@link CancellationToken.cancel}. | ||
*/ | ||
exports.cancellationTokenReasons = { | ||
/** Used when the task was cancelled in response to a call to {@link SequentialTaskQueue#cancel} */ | ||
/** Used when the task was cancelled in response to a call to {@link SequentialTaskQueue.cancel} */ | ||
cancel: Object.create(null), | ||
@@ -27,3 +27,3 @@ /** Used when the task was cancelled after its timeout has passed */ | ||
* Creates a new instance of {@link SequentialTaskQueue} | ||
* @param {TaskQueueOptions} options - Configuration options for the task queue. | ||
* @param options - Configuration options for the task queue. | ||
*/ | ||
@@ -40,3 +40,3 @@ constructor(options) { | ||
} | ||
/** Indicates if the queue has been closed. Calling {@link SequentialTaskQueue#push} on a closed queue will result in an exception. */ | ||
/** Indicates if the queue has been closed. Calling {@link SequentialTaskQueue.push} on a closed queue will result in an exception. */ | ||
get isClosed() { | ||
@@ -47,4 +47,4 @@ return this._isClosed; | ||
* Adds a new task to the queue. | ||
* @param {Function} task - The function to call when the task is run | ||
* @param {number} timeout - An optional timeout (in milliseconds) for the task, after which it should be cancelled to avoid hanging tasks clogging up the queue. | ||
* @param task - The function to call when the task is run | ||
* @param timeout - An optional timeout (in milliseconds) for the task, after which it should be cancelled to avoid hanging tasks clogging up the queue. | ||
* @returns A {@link CancellationToken} that may be used to cancel the task before it completes. | ||
@@ -61,3 +61,5 @@ */ | ||
cancel: (reason) => this.cancelTask(taskEntry, reason) | ||
} | ||
}, | ||
resolve: undefined, | ||
reject: undefined | ||
}; | ||
@@ -67,3 +69,8 @@ taskEntry.args.push(taskEntry.cancellationToken); | ||
this.scheduler.schedule(() => this.next()); | ||
return taskEntry.cancellationToken; | ||
var result = new Promise((resolve, reject) => { | ||
taskEntry.resolve = resolve; | ||
taskEntry.reject = reject; | ||
}); | ||
result.cancel = (reason) => taskEntry.cancellationToken.cancel(reason); | ||
return result; | ||
} | ||
@@ -77,5 +84,8 @@ /** | ||
this.cancelTask(this.currentTask, exports.cancellationTokenReasons.cancel); | ||
// emit a drained event if there were tasks waiting in the queue | ||
if (this.queue.splice(0).length) | ||
var queue = this.queue.splice(0); | ||
// Cancel all and emit a drained event if there were tasks waiting in the queue | ||
if (queue.length) { | ||
queue.forEach(task => this.cancelTask(task, exports.cancellationTokenReasons.cancel)); | ||
this.emit(exports.sequentialTaskQueueEvents.drained); | ||
} | ||
return this.wait(); | ||
@@ -85,3 +95,3 @@ } | ||
* Closes the queue, preventing new tasks to be added. | ||
* Any calls to {@link SequentialTaskQueue#push} after closing the queue will result in an exception. | ||
* Any calls to {@link SequentialTaskQueue.push} after closing the queue will result in an exception. | ||
* @param {boolean} cancel - Indicates that the queue should also be cancelled. | ||
@@ -125,3 +135,3 @@ * @returns {Promise} A Promise that is fulfilled when the queue has finished executing remaining tasks. | ||
var cb = (...args) => { | ||
this.off(evt, cb); | ||
this.removeListener(evt, cb); | ||
handler.apply(this, args); | ||
@@ -136,3 +146,3 @@ }; | ||
*/ | ||
off(evt, handler) { | ||
removeListener(evt, handler) { | ||
if (this.events) { | ||
@@ -151,2 +161,6 @@ var list = this.events[evt]; | ||
} | ||
/** @see {@link SequentialTaskQueue.removeListener} */ | ||
off(evt, handler) { | ||
return this.removeListener(evt, handler); | ||
} | ||
emit(evt, ...args) { | ||
@@ -179,3 +193,4 @@ if (this.events && this.events[evt]) | ||
if (res && isPromise(res)) { | ||
res.then(() => { | ||
res.then(result => { | ||
task.result = result; | ||
this.doneTask(task); | ||
@@ -186,4 +201,6 @@ }, err => { | ||
} | ||
else | ||
else { | ||
task.result = res; | ||
this.doneTask(task); | ||
} | ||
} | ||
@@ -209,4 +226,10 @@ catch (e) { | ||
task.cancellationToken.cancel = noop; | ||
if (error) | ||
if (error) { | ||
this.emit(exports.sequentialTaskQueueEvents.error, error); | ||
task.reject.call(undefined, error); | ||
} | ||
else if (task.cancellationToken.cancelled) | ||
task.reject.call(undefined, task.cancellationToken.reason); | ||
else | ||
task.resolve.call(undefined, task.result); | ||
if (this.currentTask === task) { | ||
@@ -241,3 +264,1 @@ this.currentTask = undefined; | ||
}; | ||
//# sourceMappingURL=data:application/json;base64,{"version":3,"sources":["sequential-task-queue.ts"],"names":[],"mappings":";AA2EA;;;;GAIG;AACQ,gCAAwB,GAAG;IAClC,mGAAmG;IACnG,MAAM,EAAE,MAAM,CAAC,MAAM,CAAC,IAAI,CAAC;IAC3B,oEAAoE;IACpE,OAAO,EAAE,MAAM,CAAC,MAAM,CAAC,IAAI,CAAC;CAC/B,CAAA;AAED;;GAEG;AACQ,iCAAyB,GAAG;IACnC,OAAO,EAAE,SAAS;IAClB,KAAK,EAAE,OAAO;IACd,OAAO,EAAE,SAAS;CACrB,CAAA;AAED;;GAEG;AACH;IAqBI;;;MAGE;IACF,YAAY,OAAoC;QAnBxC,UAAK,GAAgB,EAAE,CAAC;QACxB,cAAS,GAAY,KAAK,CAAC;QAC3B,YAAO,GAAe,EAAE,CAAC;QAkB7B,EAAE,CAAC,CAAC,CAAC,OAAO,CAAC;YACT,OAAO,GAAG,EAAE,CAAC;QACjB,IAAI,CAAC,cAAc,GAAG,OAAO,CAAC,OAAO,CAAC;QACtC,IAAI,CAAC,IAAI,GAAG,OAAO,CAAC,IAAI,IAAI,qBAAqB,CAAC;QAClD,IAAI,CAAC,SAAS,GAAG,OAAO,CAAC,SAAS,IAAI,mBAAmB,CAAC,gBAAgB,CAAC;IAC/E,CAAC;IAfD,sIAAsI;IACtI,IAAI,QAAQ;QACR,MAAM,CAAC,IAAI,CAAC,SAAS,CAAC;IAC1B,CAAC;IAcD;;;;;OAKG;IACH,IAAI,CAAC,IAAc,EAAE,OAAqB;QACtC,EAAE,CAAC,CAAC,IAAI,CAAC,SAAS,CAAC;YACf,MAAM,IAAI,KAAK,CAAC,GAAG,IAAI,CAAC,IAAI,6BAA6B,CAAC,CAAC;QAC/D,IAAI,SAAS,GAAc;YACvB,QAAQ,EAAE,IAAI;YACd,IAAI,EAAE,OAAO,IAAI,OAAO,CAAC,IAAI,GAAG,CAAC,KAAK,CAAC,OAAO,CAAC,OAAO,CAAC,IAAI,CAAC,GAAG,OAAO,CAAC,IAAI,CAAC,KAAK,EAAE,GAAG,CAAC,OAAO,CAAC,IAAI,CAAC,CAAC,GAAG,EAAE;YAC1G,OAAO,EAAE,OAAO,IAAI,OAAO,CAAC,OAAO,KAAK,SAAS,GAAG,OAAO,CAAC,OAAO,GAAG,IAAI,CAAC,cAAc;YACzF,iBAAiB,EAAE;gBACf,MAAM,EAAE,CAAC,MAAO,KAAK,IAAI,CAAC,UAAU,CAAC,SAAS,EAAE,MAAM,CAAC;aAC1D;SACJ,CAAC;QACF,SAAS,CAAC,IAAI,CAAC,IAAI,CAAC,SAAS,CAAC,iBAAiB,CAAC,CAAC;QACjD,IAAI,CAAC,KAAK,CAAC,IAAI,CAAC,SAAS,CAAC,CAAC;QAC3B,IAAI,CAAC,SAAS,CAAC,QAAQ,CAAC,MAAM,IAAI,CAAC,IAAI,EAAE,CAAC,CAAC;QAC3C,MAAM,CAAC,SAAS,CAAC,iBAAiB,CAAC;IACvC,CAAC;IAED;;;OAGG;IACH,MAAM;QACF,EAAE,CAAC,CAAC,IAAI,CAAC,WAAW,CAAC;YACjB,IAAI,CAAC,UAAU,CAAC,IAAI,CAAC,WAAW,EAAE,gCAAwB,CAAC,MAAM,CAAC,CAAC;QACvE,gEAAgE;QAChE,EAAE,CAAC,CAAC,IAAI,CAAC,KAAK,CAAC,MAAM,CAAC,CAAC,CAAC,CAAC,MAAM,CAAC;YAC5B,IAAI,CAAC,IAAI,CAAC,iCAAyB,CAAC,OAAO,CAAC,CAAC;QACjD,MAAM,CAAC,IAAI,CAAC,IAAI,EAAE,CAAC;IACvB,CAAC;IAED;;;;;OAKG;IACH,KAAK,CAAC,MAAgB;QAClB,EAAE,CAAC,CAAC,CAAC,IAAI,CAAC,SAAS,CAAC,CAAC,CAAC;YAClB,IAAI,CAAC,SAAS,GAAG,IAAI,CAAC;YACtB,EAAE,CAAC,CAAC,MAAM,CAAC;gBACP,MAAM,CAAC,IAAI,CAAC,MAAM,EAAE,CAAC;QAC7B,CAAC;QACD,MAAM,CAAC,IAAI,CAAC,IAAI,EAAE,CAAC;IACvB,CAAC;IAED;;;OAGG;IACH,IAAI;QACA,EAAE,CAAC,CAAC,CAAC,IAAI,CAAC,WAAW,IAAI,IAAI,CAAC,KAAK,CAAC,MAAM,KAAK,CAAC,CAAC;YAC7C,MAAM,CAAC,OAAO,CAAC,OAAO,EAAE,CAAC;QAC7B,MAAM,CAAC,IAAI,OAAO,CAAC,OAAO;YACtB,IAAI,CAAC,OAAO,CAAC,IAAI,CAAC,OAAO,CAAC,CAAC;QAC/B,CAAC,CAAC,CAAC;IACP,CAAC;IAED;;;;OAIG;IACH,EAAE,CAAC,GAAW,EAAE,OAAiB;QAC7B,IAAI,CAAC,MAAM,GAAG,IAAI,CAAC,MAAM,IAAI,EAAE,CAAC;QAChC,CAAC,IAAI,CAAC,MAAM,CAAC,GAAG,CAAC,IAAI,CAAC,IAAI,CAAC,MAAM,CAAC,GAAG,CAAC,GAAG,EAAE,CAAC,CAAC,CAAC,IAAI,CAAC,OAAO,CAAC,CAAC;IAChE,CAAC;IAED;;;;OAIG;IACH,IAAI,CAAC,GAAW,EAAE,OAAiB;QAC/B,IAAI,EAAE,GAAG,CAAC,GAAG,IAAW;YACpB,IAAI,CAAC,GAAG,CAAC,GAAG,EAAE,EAAE,CAAC,CAAC;YAClB,OAAO,CAAC,KAAK,CAAC,IAAI,EAAE,IAAI,CAAC,CAAC;QAC9B,CAAC,CAAC;QACF,IAAI,CAAC,EAAE,CAAC,GAAG,EAAE,EAAE,CAAC,CAAC;IACrB,CAAC;IAED;;;;OAIG;IACH,GAAG,CAAC,GAAW,EAAE,OAAiB;QAC9B,EAAE,CAAC,CAAC,IAAI,CAAC,MAAM,CAAC,CAAC,CAAC;YACd,IAAI,IAAI,GAAG,IAAI,CAAC,MAAM,CAAC,GAAG,CAAC,CAAC;YAC5B,EAAE,CAAC,CAAC,IAAI,CAAC,CAAC,CAAC;gBACP,IAAI,CAAC,GAAG,CAAC,CAAC;gBACV,OAAO,CAAC,GAAG,IAAI,CAAC,MAAM,EAAE,CAAC;oBACrB,EAAE,CAAC,CAAC,IAAI,CAAC,CAAC,CAAC,KAAK,OAAO,CAAC;wBACpB,IAAI,CAAC,MAAM,CAAC,CAAC,EAAE,CAAC,CAAC,CAAC;oBACtB,IAAI;wBACA,CAAC,EAAE,CAAC;gBACZ,CAAC;YACL,CAAC;QACL,CAAC;IACL,CAAC;IAEO,IAAI,CAAC,GAAW,EAAE,GAAG,IAAW;QACpC,EAAE,CAAC,CAAC,IAAI,CAAC,MAAM,IAAI,IAAI,CAAC,MAAM,CAAC,GAAG,CAAC,CAAC;YAChC,IAAI,CAAC;gBACD,IAAI,CAAC,MAAM,CAAC,GAAG,CAAC,CAAC,OAAO,CAAC,EAAE,IAAI,EAAE,CAAC,KAAK,CAAC,IAAI,EAAE,IAAI,CAAC,CAAC,CAAC;YACzD,CAAE;YAAA,KAAK,CAAC,CAAC,CAAC,CAAC,CAAC,CAAC;gBACT,OAAO,CAAC,KAAK,CAAC,GAAG,IAAI,CAAC,IAAI,mBAAmB,GAAG,iBAAiB,EAAE,CAAC,CAAC,CAAC;YAC1E,CAAC;IACT,CAAC;IAEO,IAAI;QACR,2DAA2D;QAC3D,EAAE,CAAC,CAAC,CAAC,IAAI,CAAC,WAAW,CAAC,CAAC,CAAC;YACpB,IAAI,IAAI,GAAG,IAAI,CAAC,KAAK,CAAC,KAAK,EAAE,CAAC;YAC9B,uBAAuB;YACvB,OAAO,IAAI,IAAI,IAAI,CAAC,iBAAiB,CAAC,SAAS;gBAC3C,IAAI,GAAG,IAAI,CAAC,KAAK,CAAC,KAAK,EAAE,CAAC;YAC9B,EAAE,CAAC,CAAC,IAAI,CAAC,CAAC,CAAC;gBACP,IAAI,CAAC;oBACD,IAAI,CAAC,WAAW,GAAG,IAAI,CAAC;oBACxB,EAAE,CAAC,CAAC,IAAI,CAAC,OAAO,CAAC,CAAC,CAAC;wBACf,IAAI,CAAC,aAAa,GAAG,UAAU,CAC3B;4BACI,IAAI,CAAC,IAAI,CAAC,iCAAyB,CAAC,OAAO,CAAC,CAAC;4BAC7C,IAAI,CAAC,UAAU,CAAC,IAAI,EAAE,gCAAwB,CAAC,OAAO,CAAC,CAAC;wBAC5D,CAAC,EACD,IAAI,CAAC,OAAO,CAAC,CAAC;oBACtB,CAAC;oBACD,IAAI,GAAG,GAAG,IAAI,CAAC,QAAQ,CAAC,KAAK,CAAC,SAAS,EAAE,IAAI,CAAC,IAAI,CAAC,CAAC;oBACpD,EAAE,CAAC,CAAC,GAAG,IAAI,SAAS,CAAC,GAAG,CAAC,CAAC,CAAC,CAAC;wBACxB,GAAG,CAAC,IAAI,CAAC;4BACD,IAAI,CAAC,QAAQ,CAAC,IAAI,CAAC,CAAC;wBACxB,CAAC,EACD,GAAG;4BACC,IAAI,CAAC,QAAQ,CAAC,IAAI,EAAE,GAAG,CAAC,CAAC;wBAC7B,CAAC,CAAC,CAAC;oBACX,CAAC;oBAAC,IAAI;wBACF,IAAI,CAAC,QAAQ,CAAC,IAAI,CAAC,CAAC;gBAE5B,CAAE;gBAAA,KAAK,CAAC,CAAC,CAAC,CAAC,CAAC,CAAC;oBACT,IAAI,CAAC,QAAQ,CAAC,IAAI,EAAE,CAAC,CAAC,CAAC;gBAC3B,CAAC;YACL,CAAC;YAAC,IAAI,CAAC,CAAC;gBACJ,+BAA+B;gBAC/B,IAAI,CAAC,WAAW,EAAE,CAAC;YACvB,CAAC;QACL,CAAC;IACL,CAAC;IAEO,UAAU,CAAC,IAAe,EAAE,MAAY;QAC5C,IAAI,CAAC,iBAAiB,CAAC,SAAS,GAAG,IAAI,CAAC;QACxC,IAAI,CAAC,iBAAiB,CAAC,MAAM,GAAG,MAAM,CAAC;QACvC,IAAI,CAAC,QAAQ,CAAC,IAAI,CAAC,CAAC;IACxB,CAAC;IAEO,QAAQ,CAAC,IAAe,EAAE,KAAW;QACzC,EAAE,CAAC,CAAC,IAAI,CAAC,aAAa,CAAC;YACnB,YAAY,CAAC,IAAI,CAAC,aAAa,CAAC,CAAC;QACrC,IAAI,CAAC,iBAAiB,CAAC,MAAM,GAAG,IAAI,CAAC;QACrC,EAAE,CAAC,CAAC,KAAK,CAAC;YACN,IAAI,CAAC,IAAI,CAAC,iCAAyB,CAAC,KAAK,EAAE,KAAK,CAAC,CAAC;QACtD,EAAE,CAAC,CAAC,IAAI,CAAC,WAAW,KAAK,IAAI,CAAC,CAAC,CAAC;YAC5B,IAAI,CAAC,WAAW,GAAG,SAAS,CAAC;YAC7B,EAAE,CAAC,CAAC,CAAC,IAAI,CAAC,KAAK,CAAC,MAAM,CAAC,CAAC,CAAC;gBACrB,IAAI,CAAC,IAAI,CAAC,iCAAyB,CAAC,OAAO,CAAC,CAAC;gBAC7C,IAAI,CAAC,WAAW,EAAE,CAAC;YACvB,CAAC;YACD,IAAI;gBACA,IAAI,CAAC,SAAS,CAAC,QAAQ,CAAC,MAAM,IAAI,CAAC,IAAI,EAAE,CAAC,CAAC;QACnD,CAAC;IACL,CAAC;IAEO,WAAW;QACf,IAAI,OAAO,GAAG,IAAI,CAAC,OAAO,CAAC,MAAM,CAAC,CAAC,CAAC,CAAC;QACrC,OAAO,CAAC,OAAO,CAAC,MAAM,IAAI,MAAM,EAAE,CAAC,CAAC;IACxC,CAAC;AACL,CAAC;AApNU,oCAAgB,GAAc;IACjC,QAAQ,EAAE,QAAQ,IAAI,UAAU,CAAM,QAAQ,EAAE,CAAC,CAAC;CACrD,CAAC;AAJO,2BAAmB,sBAsN/B,CAAA;AAUD;AACA,CAAC;AAED,mBAAmB,GAAQ;IACvB,MAAM,CAAC,CAAC,GAAG,IAAI,OAAO,GAAG,CAAC,IAAI,KAAK,UAAU,CAAC,CAAC;AACnD,CAAC;AAED,mBAAmB,CAAC,gBAAgB,GAAG;IACnC,QAAQ,EAAE,OAAO,YAAY,KAAK,UAAU;UACtC,QAAQ,IAAI,YAAY,CAA2B,QAAQ,CAAC;UAC5D,QAAQ,IAAI,UAAU,CAA2B,QAAQ,EAAE,CAAC,CAAC;CACtE,CAAC","file":"sequential-task-queue.js","sourcesContent":["/** \r\n * Represents an object that schedules a function for asynchronous execution.\r\n * The default implementation used by {@link SequentialTaskQueue} calls {@link setImmediate} when available,\r\n * and {@link setTimeout} otherwise.\r\n * @see {@link SequentialTaskQueue.defaultScheduler}\r\n * @see {@link TaskQueueOptions#scheduler}\r\n */\r\nexport interface Scheduler {\r\n    /**\r\n     * Schedules a callback for asynchronous execution.\r\n     */\r\n    schedule(callback: Function): void;\r\n}\r\n\r\n/**\r\n * Object used for passing configuration options to the {@link SequentialTaskQueue} constructor.\r\n */\r\nexport interface SequentialTaskQueueOptions {\r\n    /**\r\n     * Assigns a name to the task queue for diagnostic purposes. The name does not need to be unique.\r\n     */\r\n    name?: string;\r\n    /**\r\n     * Default timeout (in milliseconds) for tasks pushed to the queue. Default is 0 (no timeout).\r\n     *  */    \r\n    timeout?: number;\r\n    /**\r\n     * Scheduler used by the queue. Defaults to {@link SequentialTaskQueue.defaultScheduler}. \r\n     */\r\n    scheduler?: Scheduler;\r\n}\r\n\r\n/**\r\n * Options object for individual tasks.\r\n */\r\nexport interface TaskOptions {\r\n    /**\r\n     * Timeout for the task, in milliseconds. \r\n     * */\r\n    timeout?: number;\r\n\r\n    /**\r\n     * Arguments to pass to the task. Useful for minimalising the number of Function objects and closures created \r\n     * when pushing the same task multiple times, with different arguments.  \r\n     * \r\n     * @example\r\n     * // The following code creates a single Function object and no closures:\r\n     * for (let i = 0; i < 100; i++)\r\n     *     queue.push(process, {args: [i]});\r\n     * function process(n) {\r\n     *     console.log(n);\r\n     * }\r\n     */\r\n    args?: any;    \r\n}\r\n\r\n/**\r\n * Provides the API for querying and invoking task cancellation.\r\n */\r\nexport interface CancellationToken {\r\n    /**\r\n     * When `true`, indicates that the task has been cancelled. \r\n     */\r\n    cancelled?: boolean;\r\n    /**\r\n     * An arbitrary object representing the reason of the cancellation. Can be a member of the {@link cancellationTokenReasons} object or an `Error`, etc.  \r\n     */\r\n    reason?: any;\r\n    /**\r\n     * Cancels the task for which the cancellation token was created.\r\n     * @param reason - The reason of the cancellation, see {@link CancellationToken#reason} \r\n     */\r\n    cancel(reason?: any);\r\n}\r\n\r\n/**\r\n * Standard cancellation reasons. {@link SequentialTaskQueue} sets {@link CancellationToken#reason} \r\n * to one of these values when cancelling a task for a reason other than the user code calling\r\n * {@link CancellationToken#cancel}.    \r\n */\r\nexport var cancellationTokenReasons = {\r\n    /** Used when the task was cancelled in response to a call to {@link SequentialTaskQueue#cancel} */\r\n    cancel: Object.create(null),\r\n    /** Used when the task was cancelled after its timeout has passed */\r\n    timeout: Object.create(null)\r\n}\r\n\r\n/**\r\n * Standard event names used by {@link SequentialTaskQueue}\r\n */\r\nexport var sequentialTaskQueueEvents = {\r\n    drained: \"drained\",\r\n    error: \"error\",\r\n    timeout: \"timeout\"\r\n}\r\n\r\n/**\r\n * FIFO task queue to run tasks in predictable order, without concurrency.\r\n */\r\nexport class SequentialTaskQueue {\r\n\r\n    static defaultScheduler: Scheduler = {\r\n        schedule: callback => setTimeout(<any>callback, 0)\r\n    };\r\n\r\n    private queue: TaskEntry[] = [];\r\n    private _isClosed: boolean = false;\r\n    private waiters: Function[] = [];\r\n    private defaultTimeout: number;\r\n    private currentTask: TaskEntry;\r\n    private scheduler: Scheduler;\r\n    private events: { [key: string]: Function[] };\r\n\r\n    name: string;\r\n\r\n    /** Indicates if the queue has been closed. Calling {@link SequentialTaskQueue#push} on a closed queue will result in an exception. */\r\n    get isClosed() {\r\n        return this._isClosed;\r\n    }\r\n\r\n    /** \r\n     * Creates a new instance of {@link SequentialTaskQueue}\r\n     * @param {TaskQueueOptions} options - Configuration options for the task queue.\r\n    */\r\n    constructor(options?: SequentialTaskQueueOptions) {\r\n        if (!options)\r\n            options = {};\r\n        this.defaultTimeout = options.timeout;\r\n        this.name = options.name || \"SequentialTaskQueue\";\r\n        this.scheduler = options.scheduler || SequentialTaskQueue.defaultScheduler;\r\n    }\r\n\r\n    /**\r\n     * Adds a new task to the queue.\r\n     * @param {Function} task - The function to call when the task is run\r\n     * @param {number} timeout - An optional timeout (in milliseconds) for the task, after which it should be cancelled to avoid hanging tasks clogging up the queue. \r\n     * @returns A {@link CancellationToken} that may be used to cancel the task before it completes.\r\n     */\r\n    push(task: Function, options?: TaskOptions): CancellationToken {\r\n        if (this._isClosed)\r\n            throw new Error(`${this.name} has been previously closed`);\r\n        var taskEntry: TaskEntry = {\r\n            callback: task,\r\n            args: options && options.args ? (Array.isArray(options.args) ? options.args.slice() : [options.args]) : [],\r\n            timeout: options && options.timeout !== undefined ? options.timeout : this.defaultTimeout,\r\n            cancellationToken: {\r\n                cancel: (reason?) => this.cancelTask(taskEntry, reason)\r\n            }\r\n        };\r\n        taskEntry.args.push(taskEntry.cancellationToken);\r\n        this.queue.push(taskEntry);\r\n        this.scheduler.schedule(() => this.next());\r\n        return taskEntry.cancellationToken;\r\n    }\r\n\r\n    /**\r\n     * Cancels the currently running task (if any), and clears the queue.\r\n     * @returns {Promise} A Promise that is fulfilled when the queue is empty and the current task has been cancelled.\r\n     */\r\n    cancel(): PromiseLike<any> {\r\n        if (this.currentTask) \r\n            this.cancelTask(this.currentTask, cancellationTokenReasons.cancel);\r\n        // emit a drained event if there were tasks waiting in the queue\r\n        if (this.queue.splice(0).length)\r\n            this.emit(sequentialTaskQueueEvents.drained);\r\n        return this.wait();\r\n    }\r\n\r\n    /**\r\n     * Closes the queue, preventing new tasks to be added. \r\n     * Any calls to {@link SequentialTaskQueue#push} after closing the queue will result in an exception.\r\n     * @param {boolean} cancel - Indicates that the queue should also be cancelled.\r\n     * @returns {Promise} A Promise that is fulfilled when the queue has finished executing remaining tasks.  \r\n     */\r\n    close(cancel?: boolean): PromiseLike<any> {\r\n        if (!this._isClosed) {\r\n            this._isClosed = true;\r\n            if (cancel)\r\n                return this.cancel();\r\n        }\r\n        return this.wait();\r\n    }\r\n\r\n    /**\r\n     * Returns a promise that is fulfilled when the queue is empty.\r\n     * @returns {Promise}\r\n     */\r\n    wait(): PromiseLike<any> {\r\n        if (!this.currentTask && this.queue.length === 0)\r\n            return Promise.resolve();\r\n        return new Promise(resolve => {\r\n            this.waiters.push(resolve);\r\n        });\r\n    }\r\n\r\n    /**\r\n     * Adds an event handler for a named event.\r\n     * @param {string} evt - Event name. See the readme for a list of valid events.\r\n     * @param {Function} handler - Event handler. When invoking the handler, the queue will set itself as the `this` argument of the call. \r\n     */\r\n    on(evt: string, handler: Function) {\r\n        this.events = this.events || {};\r\n        (this.events[evt] || (this.events[evt] = [])).push(handler);     \r\n    }\r\n\r\n    /**\r\n     * Adds a single-shot event handler for a named event.\r\n     * @param {string} evt - Event name. See the readme for a list of valid events.\r\n     * @param {Function} handler - Event handler. When invoking the handler, the queue will set itself as the `this` argument of the call. \r\n     */\r\n    once(evt: string, handler: Function) {\r\n        var cb = (...args: any[]) => {\r\n            this.off(evt, cb);\r\n            handler.apply(this, args);\r\n        };\r\n        this.on(evt, cb);\r\n    }\r\n\r\n    /**\r\n     * Removes an event handler.\r\n     * @param {string} evt - Event name\r\n     * @param {Function} handler - Event handler to be removed\r\n     */\r\n    off(evt: string, handler: Function) {\r\n        if (this.events) {\r\n            var list = this.events[evt];\r\n            if (list) {\r\n                var i = 0;\r\n                while (i < list.length) {\r\n                    if (list[i] === handler)\r\n                        list.splice(i, 1);\r\n                    else\r\n                        i++;\r\n                }\r\n            }\r\n        }\r\n    }\r\n\r\n    private emit(evt: string, ...args: any[]) {\r\n        if (this.events && this.events[evt])\r\n            try { \r\n                this.events[evt].forEach(fn => fn.apply(this, args));\r\n            } catch (e) {\r\n                console.error(`${this.name}: Exception in '${evt}' event handler`, e);\r\n            }\r\n    }\r\n\r\n    private next() {\r\n        // Try running the next task, if not currently running one \r\n        if (!this.currentTask) {\r\n            var task = this.queue.shift();\r\n            // skip cancelled tasks\r\n            while (task && task.cancellationToken.cancelled)\r\n                task = this.queue.shift();\r\n            if (task) {\r\n                try {\r\n                    this.currentTask = task;\r\n                    if (task.timeout) {\r\n                        task.timeoutHandle = setTimeout(\r\n                            () => {\r\n                                this.emit(sequentialTaskQueueEvents.timeout);\r\n                                this.cancelTask(task, cancellationTokenReasons.timeout);\r\n                            }, \r\n                            task.timeout);\r\n                    }\r\n                    let res = task.callback.apply(undefined, task.args);\r\n                    if (res && isPromise(res)) {\r\n                        res.then(() => {\r\n                                this.doneTask(task);\r\n                            },\r\n                            err => {\r\n                                this.doneTask(task, err);\r\n                            });\r\n                    } else\r\n                        this.doneTask(task);\r\n\r\n                } catch (e) {\r\n                    this.doneTask(task, e);\r\n                }\r\n            } else {\r\n                // queue is empty, call waiters\r\n                this.callWaiters(); \r\n            }\r\n        }\r\n    }\r\n\r\n    private cancelTask(task: TaskEntry, reason?: any) {\r\n        task.cancellationToken.cancelled = true;\r\n        task.cancellationToken.reason = reason;\r\n        this.doneTask(task);\r\n    }\r\n\r\n    private doneTask(task: TaskEntry, error?: any) {\r\n        if (task.timeoutHandle)\r\n            clearTimeout(task.timeoutHandle);\r\n        task.cancellationToken.cancel = noop;\r\n        if (error)\r\n            this.emit(sequentialTaskQueueEvents.error, error);\r\n        if (this.currentTask === task) {\r\n            this.currentTask = undefined;\r\n            if (!this.queue.length) {\r\n                this.emit(sequentialTaskQueueEvents.drained);\r\n                this.callWaiters();\r\n            }\r\n            else\r\n                this.scheduler.schedule(() => this.next());\r\n        }\r\n    }\r\n\r\n    private callWaiters() {\r\n        let waiters = this.waiters.splice(0);\r\n        waiters.forEach(waiter => waiter());\r\n    }\r\n}\r\n\r\ninterface TaskEntry {\r\n    args: any[];\r\n    callback: Function;\r\n    timeout?: number;\r\n    timeoutHandle?: any;\r\n    cancellationToken: CancellationToken;\r\n}\r\n\r\nfunction noop() {\r\n}\r\n\r\nfunction isPromise(obj: any): obj is PromiseLike<any> {\r\n    return (obj && typeof obj.then === \"function\");\r\n}\r\n\r\nSequentialTaskQueue.defaultScheduler = {\r\n    schedule: typeof setImmediate === \"function\" \r\n        ? callback => setImmediate(<(...args: any[]) => void>callback)\r\n        : callback => setTimeout(<(...args: any[]) => void>callback, 0)\r\n};\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n\r\n"],"sourceRoot":"/source/"} |
@@ -6,3 +6,3 @@ /** | ||
* @see {@link SequentialTaskQueue.defaultScheduler} | ||
* @see {@link TaskQueueOptions#scheduler} | ||
* @see {@link TaskQueueOptions.scheduler} | ||
*/ | ||
@@ -68,3 +68,3 @@ export interface Scheduler { | ||
* Cancels the task for which the cancellation token was created. | ||
* @param reason - The reason of the cancellation, see {@link CancellationToken#reason} | ||
* @param reason - The reason of the cancellation, see {@link CancellationToken.reason} | ||
*/ | ||
@@ -74,5 +74,5 @@ cancel(reason?: any): any; | ||
/** | ||
* Standard cancellation reasons. {@link SequentialTaskQueue} sets {@link CancellationToken#reason} | ||
* Standard cancellation reasons. {@link SequentialTaskQueue} sets {@link CancellationToken.reason} | ||
* to one of these values when cancelling a task for a reason other than the user code calling | ||
* {@link CancellationToken#cancel}. | ||
* {@link CancellationToken.cancel}. | ||
*/ | ||
@@ -92,2 +92,12 @@ export declare var cancellationTokenReasons: { | ||
/** | ||
* Promise interface with the ability to cancel. | ||
*/ | ||
export interface CancellablePromiseLike<T> extends PromiseLike<T> { | ||
/** | ||
* Cancels (and consequently, rejects) the task associated with the Promise. | ||
* @param reason - Reason of the cancellation. This value will be passed when rejecting this Promise. | ||
*/ | ||
cancel(reason?: any): void; | ||
} | ||
/** | ||
* FIFO task queue to run tasks in predictable order, without concurrency. | ||
@@ -105,7 +115,7 @@ */ | ||
name: string; | ||
/** Indicates if the queue has been closed. Calling {@link SequentialTaskQueue#push} on a closed queue will result in an exception. */ | ||
/** Indicates if the queue has been closed. Calling {@link SequentialTaskQueue.push} on a closed queue will result in an exception. */ | ||
readonly isClosed: boolean; | ||
/** | ||
* Creates a new instance of {@link SequentialTaskQueue} | ||
* @param {TaskQueueOptions} options - Configuration options for the task queue. | ||
* @param options - Configuration options for the task queue. | ||
*/ | ||
@@ -115,7 +125,7 @@ constructor(options?: SequentialTaskQueueOptions); | ||
* Adds a new task to the queue. | ||
* @param {Function} task - The function to call when the task is run | ||
* @param {number} timeout - An optional timeout (in milliseconds) for the task, after which it should be cancelled to avoid hanging tasks clogging up the queue. | ||
* @param task - The function to call when the task is run | ||
* @param timeout - An optional timeout (in milliseconds) for the task, after which it should be cancelled to avoid hanging tasks clogging up the queue. | ||
* @returns A {@link CancellationToken} that may be used to cancel the task before it completes. | ||
*/ | ||
push(task: Function, options?: TaskOptions): CancellationToken; | ||
push(task: Function, options?: TaskOptions): CancellablePromiseLike<any>; | ||
/** | ||
@@ -128,3 +138,3 @@ * Cancels the currently running task (if any), and clears the queue. | ||
* Closes the queue, preventing new tasks to be added. | ||
* Any calls to {@link SequentialTaskQueue#push} after closing the queue will result in an exception. | ||
* Any calls to {@link SequentialTaskQueue.push} after closing the queue will result in an exception. | ||
* @param {boolean} cancel - Indicates that the queue should also be cancelled. | ||
@@ -156,2 +166,4 @@ * @returns {Promise} A Promise that is fulfilled when the queue has finished executing remaining tasks. | ||
*/ | ||
removeListener(evt: string, handler: Function): void; | ||
/** @see {@link SequentialTaskQueue.removeListener} */ | ||
off(evt: string, handler: Function): void; | ||
@@ -158,0 +170,0 @@ private emit(evt, ...args); |
{ | ||
"name": "sequential-task-queue", | ||
"version": "1.1.2", | ||
"version": "1.2.0", | ||
"description": "FIFO task queue for node and the browser", | ||
@@ -19,5 +19,5 @@ "author": { | ||
"devDependencies": { | ||
"@types/mocha": "^2.2.32", | ||
"@types/node": "^6.0.41", | ||
"@types/sinon": "^1.16.31", | ||
"@types/mocha": "^2.2.39", | ||
"@types/node": "^7.0.5", | ||
"@types/sinon": "^1.16.35", | ||
"del": "^2.2.2", | ||
@@ -27,14 +27,15 @@ "gulp": "^3.9.1", | ||
"gulp-rename": "^1.2.2", | ||
"gulp-sourcemaps": "^1.6.0", | ||
"gulp-sourcemaps": "^2.4.1", | ||
"gulp-template": "^4.0.0", | ||
"gulp-typedoc": "^2.0.0", | ||
"gulp-typescript": "^3.0.1", | ||
"mocha": "^3.0.2", | ||
"gulp-typedoc": "^2.0.2", | ||
"gulp-typescript": "^3.1.5", | ||
"mocha": "^3.2.0", | ||
"run-sequence": "^1.2.2", | ||
"sinon": "^1.17.5", | ||
"sinon": "^1.17.7", | ||
"snip-text": "^1.0.0", | ||
"typedoc": "^0.4.5", | ||
"typedoc": "^0.5.6", | ||
"typedoc-markdown-theme": "0.0.4", | ||
"typescript": "^2.0.3" | ||
"typescript": "^2.1.6", | ||
"vinyl-paths": "^2.1.0" | ||
} | ||
} |
@@ -10,3 +10,3 @@ # SequentialTaskQueue | ||
Use `push` to add tasks to the queue: | ||
Use `push` to add tasks to the queue. The method returns a `Promise` that will fulfill when the task has been executed or cancelled. | ||
@@ -25,3 +25,3 @@ ```js | ||
If the function passed to `push` returns a promise, the queue will wait for that promise to fulfill before moving to the next task. | ||
If the function passed to `push` returns a `Promise`, the queue will wait for it to fulfill before moving to the next task. | ||
Rejected promises don't cause the queue to stop executing tasks, but are reported in the `error` event (see below). | ||
@@ -66,7 +66,7 @@ | ||
for every task pushed to the queue, and passing it (to the task function) as the last argument. The task can then query the token's `cancelled` property to check if it | ||
has been cancelled: | ||
has been cancelled. The `Promise` returned by `push` is extended with a `cancel` method so that individual tasks can be cancelled. | ||
```js | ||
var queue = new SequentialTaskQueue(); | ||
var ct = queue.push(token => { | ||
var task = queue.push(token => { | ||
return new Promise((resolve, reject) => { | ||
@@ -84,3 +84,3 @@ setTimeout(resolve, 100); | ||
setTimeout(() => { | ||
ct.cancel(); | ||
task.cancel(); | ||
}, 50); | ||
@@ -91,10 +91,9 @@ ``` | ||
When cancelling the current task, the queue will immediately schedule the next one. | ||
When cancelling the current task, the queue will immediately schedule the next one, without waiting for the task to finish. | ||
It is the task's responsibility to abort when the cancellation token is set, thus avoiding invalid application state. | ||
Remember, the primary goal of the task queue is to run asynchronous tasks in a predictable order, without concurrency. | ||
The basic assumption is that if a task has been cancelled, it will not mutate the application state. | ||
When a task is cancelled, the corresponding `Promise` is rejected with the cancellation reason, regardless of where the task currently is in the execution chain (running, scheduled or queued). | ||
## Timeouts | ||
Tasks can be pushed into the queue with a timeout, after which the queue will cancel the task (the timer starts when the task is run). | ||
Tasks can be pushed into the queue with a timeout, after which the queue will cancel the task (the timer starts when the task is run, not when queued). | ||
The timeout value is supplied to `push` in the second argument, which is interpreted as an options object for the task: | ||
@@ -182,3 +181,3 @@ | ||
`SequentialTaskQueue` implements the `on`, `off` and `once` methods of node's `EventEmitter` pattern. | ||
`SequentialTaskQueue` implements the `on`, `removeListener` (`off`) and `once` methods of node's `EventEmitter` pattern. | ||
@@ -198,2 +197,12 @@ The following events are defined: | ||
The `timeout` event is emitted when a task is cancelled due to an expired timeout. The event is emitted before calling `cancel` on the task's cancellation token. | ||
The `timeout` event is emitted when a task is cancelled due to an expired timeout. The event is emitted before calling `cancel` on the task's cancellation token. | ||
--- | ||
## Changelog | ||
### 1.2.0 | ||
`SequentialTaskQueue.push` now returns a `Promise`. Earlier versions only returned a cancellation token. | ||
--- | ||
This file was generated using [gulp-template](http://github.com/sindresorhus/gulp-template) and [snip-text](http://github.com/BalassaMarton/snip-text) |
201
23576
19
417