@mangosteen/rate-limiter
Advanced tools
Comparing version 1.1.0 to 1.1.1
@@ -11,2 +11,4 @@ "use strict"; | ||
const models_1 = require("./models"); | ||
const cancellationError = {}; | ||
function noop() { } | ||
/** Rate limiter scheduling tasks using priority and FIFO queue. */ | ||
@@ -31,5 +33,6 @@ class PrioritizedFifoRateLimiter { | ||
const promise = new Promise((resolve, reject) => { | ||
this._addAwaiter(priority, { tokenCount, resolve, reject }); | ||
const newAwaiter = { tokenCount, resolve, reject }; | ||
this._addAwaiter(priority, newAwaiter); | ||
cancel = () => { | ||
if (this._removeFirstAwaiter(priority, awaiter => awaiter.reject === reject)) { | ||
if (this._removeFirstAwaiter(priority, awaiter => awaiter === newAwaiter)) { | ||
this._recreateWatcher(); | ||
@@ -52,8 +55,7 @@ reject(new models_1.CanceledError()); | ||
// Get previous watcher | ||
const prevWatcher = this._watcher; | ||
let prevWatcher = this._watcher; | ||
// Create a new promise, used to signal watcher cancellation | ||
let isCanceled = false; | ||
let rejectCancelPromise; | ||
let rejectCancel; | ||
const cancelPromise = new Promise((_resolve, reject) => { | ||
rejectCancelPromise = () => reject(new models_1.CanceledError()); | ||
rejectCancel = reject; | ||
}); | ||
@@ -71,4 +73,4 @@ // Create watcher promise, used to propagate errors and signal completion | ||
cancel: () => { | ||
isCanceled = true; | ||
rejectCancelPromise(); | ||
// Avoid error allocation to ease memory pressure | ||
rejectCancel(cancellationError); | ||
}, | ||
@@ -79,19 +81,31 @@ }; | ||
if (prevWatcher) { | ||
// Cancel previous watcher | ||
prevWatcher.cancel(); | ||
// Wait for the watcher to finish | ||
try { | ||
// Cancel previous watcher | ||
prevWatcher.cancel(); | ||
// Wait for the watcher to finish | ||
await prevWatcher.promise; | ||
} | ||
catch (error) { | ||
if (!(error instanceof models_1.CanceledError)) { | ||
if (error !== cancellationError && !(error instanceof models_1.CanceledError)) { | ||
throw error; | ||
} | ||
} | ||
finally { | ||
// Release the captured reference so it can be garbage-collected | ||
prevWatcher = undefined; | ||
} | ||
} | ||
// Create a stagger delay | ||
const { promise: delayPromise, cancel: cancelDelay } = this._delay(this._lastConsumed + this._minStaggerTime - just_performance_1.performance.now()); | ||
// Stagger awaiters | ||
await Promise.race([ | ||
cancelPromise, | ||
this._delay(this._lastConsumed + this._minStaggerTime - just_performance_1.performance.now()), | ||
]); | ||
try { | ||
await Promise.race([ | ||
cancelPromise, | ||
delayPromise, | ||
]); | ||
} | ||
finally { | ||
// Aid GC by removing the timer | ||
cancelDelay(); | ||
} | ||
// Find the first highest-priority awaiter | ||
@@ -112,3 +126,3 @@ const { key: priority, value: queue } = this._awaiters.end; | ||
catch (error) { | ||
if (isCanceled && error instanceof models_1.CanceledError) { | ||
if (error === cancellationError || error instanceof models_1.CanceledError) { | ||
cancelConsume(); | ||
@@ -133,3 +147,3 @@ await consumePromise; | ||
// Propagate the error to awaiters | ||
this._rejectAllAwaiters(error); | ||
this._rejectAllAwaiters(error === cancellationError ? new models_1.CanceledError() : error); | ||
} | ||
@@ -141,3 +155,2 @@ rejectWatcher(error); | ||
_rejectAllAwaiters(error) { | ||
let someoneRejected = false; | ||
// Clear the collection first | ||
@@ -149,6 +162,4 @@ const awaiters = this._awaiters; | ||
queue.peekAt(i).reject(error); | ||
someoneRejected = true; | ||
} | ||
}); | ||
return someoneRejected; | ||
} | ||
@@ -165,27 +176,39 @@ _addAwaiter(priority, awaiter) { | ||
_removeFirstAwaiter(priority, predicate) { | ||
const queueIter = this._awaiters.find(priority); | ||
const queue = queueIter.value; | ||
let success = false; | ||
const iter = this._awaiters.find(priority); | ||
const queue = iter === null || iter === void 0 ? void 0 : iter.value; | ||
if (queue) { | ||
for (let i = 0; i < queue.length; i++) { | ||
if (predicate(queue.peekAt(i))) { | ||
queue.removeOne(i); | ||
success = true; | ||
break; | ||
try { | ||
for (let i = 0; i < queue.length; i++) { | ||
if (predicate(queue.peekAt(i))) { | ||
queue.removeOne(i); | ||
return true; | ||
} | ||
} | ||
} | ||
// Remove empty queue | ||
if (queue.length <= 0) { | ||
this._awaiters = queueIter.remove(); | ||
finally { | ||
// Remove empty queue | ||
if (queue.length <= 0) { | ||
this._awaiters = iter.remove(); | ||
} | ||
} | ||
} | ||
return success; | ||
return false; | ||
} | ||
_delay(ms) { | ||
let promise; | ||
let cancel; | ||
if (ms <= 0) { | ||
return Promise.resolve(); | ||
promise = Promise.resolve(); | ||
cancel = noop; | ||
} | ||
else { | ||
return new Promise(resolve => this._scheduler.setTimeout(resolve, ms)); | ||
promise = new Promise((resolve, reject) => { | ||
const t = this._scheduler.setTimeout(resolve, ms); | ||
cancel = () => { | ||
this._scheduler.clearTimeout(t); | ||
reject(cancellationError); | ||
}; | ||
}); | ||
} | ||
return { promise, cancel }; | ||
} | ||
@@ -192,0 +215,0 @@ } |
{ | ||
"name": "@mangosteen/rate-limiter", | ||
"version": "1.1.0", | ||
"version": "1.1.1", | ||
"description": "A rate limiter that helps you limit your client from making excessive API requests.", | ||
@@ -16,3 +16,4 @@ "main": "dist/index.js", | ||
"prepublish": "npm run build", | ||
"start": "ts-node ./src/test.ts" | ||
"start": "ts-node ./src/test.ts", | ||
"start:debugger": "node --inspect -r ts-node/register ./src/test.ts" | ||
}, | ||
@@ -19,0 +20,0 @@ "repository": { |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
73962
904