a-promise-queue
Advanced tools
Comparing version 1.0.0 to 1.0.1
81
index.js
class PromiseQueue { | ||
constructor(cb, PromiseFlavour) { | ||
this.flushing = false; | ||
this.Promise = PromiseFlavour || Promise; | ||
@@ -9,34 +10,2 @@ this.promise = null; | ||
promised(fn) { | ||
try { | ||
return this.Promise.resolve(fn()); | ||
} catch (e) { | ||
return this.Promise.reject(e); | ||
} | ||
} | ||
next() { | ||
if (this.length > 0) { | ||
const nextFn = this.queue.shift(); | ||
return this.wrap(nextFn.fn, nextFn.resolve, nextFn.reject, nextFn.attempts); | ||
} | ||
this.promise = null; | ||
if (typeof this.callback === 'function') this.callback(); | ||
return true; | ||
} | ||
wrap(fn, resolve, reject, attempts) { | ||
let retryCount = 0; | ||
const retry = (err) => { | ||
if (retryCount >= attempts) { | ||
throw err || new Error('Unknown Error'); | ||
} | ||
retryCount += 1; | ||
return this.promised(fn).catch(retry); | ||
}; | ||
return retry() | ||
.then((r) => { resolve(r); }, (e) => { reject(e); }) | ||
.then(() => this.next()); | ||
} | ||
add(fn, opts) { | ||
@@ -47,3 +16,3 @@ if (typeof fn !== 'function') throw new Error('PromiseQueue.add() expects a function as an argument.'); | ||
if (this.promise === null) { | ||
this.promise = this.wrap(fn, resolve, reject, attempts); | ||
this.promise = this._wrap(fn, resolve, reject, attempts); | ||
} else { | ||
@@ -77,7 +46,53 @@ // shift order based on priority | ||
flush() { | ||
const concurrent = [this.promise, ...this.queue.map(queued => this._promised(queued.fn).then(queued.resolve, queued.reject))]; | ||
this.flushing = true; | ||
this.queue = []; | ||
const flushed = () => { | ||
this.flushing = false; | ||
this._next(); | ||
} | ||
return this.Promise.all(concurrent) | ||
.then(flushed, flushed); | ||
} | ||
get length() { | ||
return this.queue.length; | ||
} | ||
_promised(fn) { | ||
try { | ||
return this.Promise.resolve(fn()); | ||
} catch (e) { | ||
return this.Promise.reject(e); | ||
} | ||
} | ||
_next() { | ||
if (this.flushing) return; | ||
if (this.length > 0) { | ||
const nextFn = this.queue.shift(); | ||
return this._wrap(nextFn.fn, nextFn.resolve, nextFn.reject, nextFn.attempts); | ||
} | ||
this.promise = null; | ||
if (typeof this.callback === 'function') this.callback(); | ||
return true; | ||
} | ||
_wrap(fn, resolve, reject, attempts) { | ||
let retryCount = 0; | ||
const retry = (err) => { | ||
if (retryCount >= attempts) { | ||
throw err || new Error('Unknown Error'); | ||
} | ||
retryCount += 1; | ||
return this._promised(fn).catch(retry); | ||
}; | ||
return retry() | ||
.then((r) => { resolve(r); }, (e) => { reject(e); }) | ||
.then(() => this._next()); | ||
} | ||
} | ||
module.exports = PromiseQueue; |
{ | ||
"name": "a-promise-queue", | ||
"version": "1.0.0", | ||
"version": "1.0.1", | ||
"description": "A native es6 promise queue with optional retry attempts.", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -26,3 +26,3 @@ # a promise queue | ||
Returns number of promises waiting to be executed. | ||
+ `queue.add(Function generator, [Object options])` | ||
+ `var promise = queue.add(Function generator, [Object options])` | ||
Returns a promise which is resolved or rejected when the promise produced by the generator is eventually resolved. | ||
@@ -36,2 +36,6 @@ Example options: | ||
``` | ||
+ `var promise = queue.flush()` | ||
Runs all promises currently in the queue concurrently. | ||
Returns a promise which is resolved when all promises are finished. | ||
Any promises added after `.flush()` will execute after flush is complete. | ||
@@ -38,0 +42,0 @@ ## Example: |
21
test.js
@@ -98,2 +98,23 @@ const Promise = require('bluebird'); | ||
tape('.flush() causes all promises in queue to be run at once and promises added after run after flush is finished', (t) => { | ||
const queue = new PromiseQueue(); | ||
let counter = 0; | ||
let flushed = false; | ||
const count = () => { counter += 1; }; | ||
queue.add(() => Promise.delay(20).then(count)); | ||
queue.add(() => Promise.delay(20).then(count)); | ||
queue.add(() => Promise.delay(20).then(count)); | ||
queue.add(() => Promise.delay(20).then(count)); | ||
queue.flush().then(() => { | ||
flushed = true; | ||
t.same(counter, 4); | ||
}) | ||
queue.add(() => Promise.delay(20).then(count)) | ||
.then(() => { | ||
t.same(counter, 5); | ||
t.ok(flushed); | ||
t.end(); | ||
}); | ||
}) | ||
tape('is reusable', (t) => { | ||
@@ -100,0 +121,0 @@ let doneCounter = 0; |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
9928
205
53