fastq
Advanced tools
Comparing version 1.10.1 to 1.11.0
10
bench.js
@@ -5,2 +5,5 @@ 'use strict' | ||
var fastqueue = require('./')(worker, 1) | ||
var { promisify } = require('util') | ||
var immediate = promisify(setImmediate) | ||
var qPromise = require('./').promise(immediate, 1) | ||
var async = require('async') | ||
@@ -50,2 +53,6 @@ var neo = require('neo-async') | ||
function benchFastQPromise (done) { | ||
qPromise.push(42).then(function () { done() }, done) | ||
} | ||
function runBench (done) { | ||
@@ -56,3 +63,4 @@ async.eachSeries([ | ||
benchNeoQueue, | ||
benchAsyncQueue | ||
benchAsyncQueue, | ||
benchFastQPromise | ||
], bench, done) | ||
@@ -59,0 +67,0 @@ } |
@@ -6,2 +6,3 @@ declare function fastq<C, T = any, R = any>(context: C, worker: fastq.worker<C, T, R>, concurrency: number): fastq.queue<T, R> | ||
type worker<C, T = any, R = any> = (this: C, task: T, cb: fastq.done<R>) => void | ||
type asyncWorker<C, T = any, R = any> = (this: C, task: T) => Promise<R> | ||
type done<R = any> = (err: Error | null, result?: R) => void | ||
@@ -26,4 +27,11 @@ type errorHandler<T = any> = (err: Error, task: T) => void | ||
} | ||
interface queueAsPromised<T = any, R = any> extends queue<T, R> { | ||
push(task: T): Promise<R> | ||
} | ||
function promise<C, T = any, R = any>(context: C, worker: fastq.asyncWorker<C, T, R>, concurrency: number): fastq.queueAsPromised<T, R> | ||
function promise<C, T = any, R = any>(worker: fastq.asyncWorker<C, T, R>, concurrency: number): fastq.queueAsPromised<T, R> | ||
} | ||
export = fastq |
{ | ||
"name": "fastq", | ||
"version": "1.10.1", | ||
"version": "1.11.0", | ||
"description": "Fast, in memory work queue", | ||
@@ -8,4 +8,4 @@ "main": "queue.js", | ||
"lint": "standard --verbose | snazzy", | ||
"unit": "nyc --lines 100 --branches 100 --functions 100 --check-coverage --reporter=text tape test/test.js", | ||
"coverage": "nyc --reporter=html --reporter=cobertura --reporter=text tape test/test.js", | ||
"unit": "nyc --lines 100 --branches 100 --functions 100 --check-coverage --reporter=text tape test/test.js test/promise.js", | ||
"coverage": "nyc --reporter=html --reporter=cobertura --reporter=text tape test/test.js test/promise.js", | ||
"test:report": "npm run lint && npm run unit:report", | ||
@@ -12,0 +12,0 @@ "test": "npm run lint && npm run unit && npm run typescript", |
50
queue.js
@@ -203,2 +203,52 @@ 'use strict' | ||
function queueAsPromised (context, worker, concurrency) { | ||
if (typeof context === 'function') { | ||
concurrency = worker | ||
worker = context | ||
context = null | ||
} | ||
function asyncWrapper (arg, cb) { | ||
worker.call(this, arg) | ||
.then(function (res) { | ||
cb(null, res) | ||
}, cb) | ||
} | ||
var queue = fastqueue(context, asyncWrapper, concurrency) | ||
var pushCb = queue.push | ||
var unshiftCb = queue.unshift | ||
queue.push = push | ||
queue.unshift = unshift | ||
return queue | ||
function push (value) { | ||
return new Promise(function (resolve, reject) { | ||
pushCb(value, function (err, result) { | ||
if (err) { | ||
reject(err) | ||
return | ||
} | ||
resolve(result) | ||
}) | ||
}) | ||
} | ||
function unshift (value) { | ||
return new Promise(function (resolve, reject) { | ||
unshiftCb(value, function (err, result) { | ||
if (err) { | ||
reject(err) | ||
return | ||
} | ||
resolve(result) | ||
}) | ||
}) | ||
} | ||
} | ||
module.exports = fastqueue | ||
module.exports.promise = queueAsPromised |
@@ -50,2 +50,20 @@ # fastq | ||
or | ||
```js | ||
var queue = require('fastq').promise(worker, 1) | ||
async function worker (arg) { | ||
return 42 * 2 | ||
} | ||
async function run () { | ||
const result = await queue.push(42) | ||
console.log('the result is', result) | ||
}) | ||
} | ||
run() | ||
``` | ||
### Setting this | ||
@@ -88,2 +106,3 @@ | ||
* <a href="#saturated"><code>queue#<b>saturated</b></code></a> | ||
* <a href="#promise"><code>fastqueue.promise()</code></a> | ||
@@ -201,2 +220,31 @@ ------------------------------------------------------- | ||
------------------------------------------------------- | ||
<a name="promise"></a> | ||
### fastqueue.promise([that], worker(arg), concurrency) | ||
Creates a new queue with `Promise` apis. It also offers all the methods | ||
and properties of the object returned by [`fastqueue`](#fastqueue) with the modified | ||
[`push`](#pushPromise) and [`unshift`](#unshiftPromise) methods. | ||
Node v10+ is required to use the promisified version. | ||
Arguments: | ||
* `that`, optional context of the `worker` function. | ||
* `worker`, worker function, it would be called with `that` as `this`, | ||
if that is specified. It MUST return a `Promise`. | ||
* `concurrency`, number of concurrent tasks that could be executed in | ||
parallel. | ||
<a name="pushPromise"></a> | ||
#### queue.push(task) => Promise | ||
Add a task at the end of the queue. The returned `Promise` will be fulfilled | ||
when the task is processed. | ||
<a name="unshiftPromise"></a> | ||
#### queue.unshift(task) => Promise | ||
Add a task at the beginning of the queue. The returned `Promise` will be fulfilled | ||
when the task is processed. | ||
## License | ||
@@ -203,0 +251,0 @@ |
import * as fastq from '../' | ||
import { promise as queueAsPromised } from '../' | ||
@@ -68,1 +69,14 @@ // Basic example | ||
} | ||
const queue2 = queueAsPromised(asyncWorker, 1) | ||
async function asyncWorker(task: any) { | ||
return 'hello ' + task | ||
} | ||
async function run () { | ||
await queue.push(42) | ||
await queue.unshift(42) | ||
} | ||
run() |
33687
12
901
256