Comparing version 0.3.0 to 0.3.1
@@ -1,2 +0,2 @@ | ||
'use strict'; /*jslint es5: true, node: true, indent: 2 */ /* globals setImmediate */ | ||
'use strict'; /*jslint es5: true, node: true, indent: 2 */ | ||
var util = require('util'); | ||
@@ -7,2 +7,36 @@ | ||
var Queue = module.exports = function(concurrency, worker) { | ||
/** `new Queue(concurrency, worker)` | ||
Queue applies the `worker` function to each piece of data in a | ||
stream.Readable({objectMode: true}). Only `concurrency` tasks will be | ||
processed at a time. | ||
* `concurrency` Number Maximum number of data to process at one time. | ||
* `worker` Function _Asynchronous_ task processor with signature: | ||
- `function(task_data, callback) { ... }` | ||
where `callback` has the signature: | ||
- `function(err, result)` | ||
If the provided `worker` is not _always_ | ||
[async](http://nodejs.org/api/process.html#process_process_nexttick_callback), | ||
your queue might cut short and stop reading before it has reached the end | ||
of the source stream. | ||
For example: | ||
new Queue(10, function(task_obj, callback) { | ||
setTimeout(function() { | ||
var task_json = JSON.stringify(task_obj); | ||
callback(null, 'Task json is ' + json.length + 'characters.\n'); | ||
}, Math.random() * 500); | ||
}); | ||
Queue inherits `streaming.Mapper`, which entails a fully | ||
`{objectMode: true}` stream experience: | ||
* `_writableState.objectMode`: true | ||
* `_readableState.objectMode`: true | ||
*/ | ||
Mapper.call(this); | ||
@@ -26,10 +60,9 @@ // concurrency is an integer | ||
this._in_progress++; | ||
// this._worker must be async. Enforce with setImmediate | ||
setImmediate(function() { | ||
self._worker(chunk, function(err, result) { | ||
self._in_progress--; | ||
// order is not guaranteed | ||
self.push(result); | ||
self._tick(err); | ||
}); | ||
// this._worker MUST be async. We could enforce this with setImmediate but | ||
// look, if you don't follow the rules, you might get some weird behavior. | ||
this._worker(chunk, function(err, result) { | ||
self._in_progress--; | ||
// order is not guaranteed | ||
self.push(result); | ||
self._tick(err); | ||
}); | ||
@@ -36,0 +69,0 @@ |
{ | ||
"name": "streaming", | ||
"version": "0.3.0", | ||
"version": "0.3.1", | ||
"description": "Transforms and other streaming helpers", | ||
@@ -5,0 +5,0 @@ "keywords": [ |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
12965
13
276