Comparing version 0.0.7 to 0.0.8
452
lib/nue.js
'use strict'; | ||
// require | ||
var EventEmitter = require('events').EventEmitter; | ||
@@ -9,37 +7,74 @@ var util = require('util'); | ||
// exports | ||
exports.DEFAULT_BATCH_SIZE = 10; | ||
exports.name = 'nue'; | ||
exports.version = '0.0.7'; | ||
exports.batchSize = exports.DEFAULT_BATCH_SIZE; | ||
exports.version = '0.0.8'; | ||
exports.flow = flow; | ||
// variables | ||
function flow() { | ||
var functions = Array.prototype.slice.call(arguments); | ||
var deferred = function deferred() { | ||
var args = Array.prototype.slice.call(arguments); | ||
var tasks = functions.length > 0 ? functions : function () {this.next();}; | ||
runFlow(tasks, deferred, args, this); | ||
}; | ||
attachEmitter(deferred); | ||
return deferred; | ||
} | ||
var noop = function () {}; | ||
function runFlow(tasks, caller, callerArgs, callerContext) { | ||
var flow = { | ||
args: callerArgs, | ||
data: {}, | ||
err: undefined, | ||
lastStep: null | ||
}; | ||
var steps = makeSteps(tasks, caller, callerContext, flow); | ||
flow.lastStep = steps[steps.length - 1]; | ||
var context = steps.length > 1 | ||
? new StepContext(steps[0], flow) | ||
: new LastStepContext(flow); | ||
context.invoke(); | ||
} | ||
// functions | ||
function flow() { | ||
if (typeof arguments[0] === 'number') { | ||
var batchSize = arguments[0]; | ||
return function () { | ||
return flowDefer(batchSize, Array.prototype.slice.call(arguments)); | ||
function makeSteps(tasks, caller, callerContext, flow) { | ||
var steps = tasks.map(function (task) { | ||
var type = typeof task; | ||
if (type !== 'function') { | ||
throw new Error('The task is a not function. ' + type); | ||
} | ||
if (task.__nue__) { | ||
return task; | ||
} | ||
var deferred = function deferred() { | ||
task.apply(this, arguments); | ||
}; | ||
attachEmitter(deferred); | ||
return deferred; | ||
}); | ||
var len = steps.length - 1; | ||
for (var i = 0; i < len; i++) { | ||
(function chain(i, step, next) { | ||
if (i < len - 1) { | ||
step.events.once('done', function runNextStep() { | ||
var context = new StepContext(next, flow); | ||
context.invoke(); | ||
}); | ||
} else { | ||
step.events.once('done', function runLastStep() { | ||
var context = new LastStepContext(flow); | ||
context.invoke(); | ||
}); | ||
} | ||
}(i, steps[i], steps[i + 1])); | ||
} | ||
return flowDefer(null, Array.prototype.slice.call(arguments)); | ||
steps[steps.length - 1].events.once('done', function exitFlow() { | ||
if (callerContext instanceof ContextBase) { | ||
callerContext.next.apply(callerContext, flow.args); | ||
} else { | ||
caller.events.emit('done'); | ||
} | ||
}); | ||
return steps; | ||
} | ||
function flowDefer(batchSize, tasks) { | ||
var defer = function () { | ||
var args = Array.prototype.slice.call(arguments); | ||
var runner = new FlowRunner(batchSize, tasks, defer, args, this); | ||
runner.run(); | ||
}; | ||
attachEmitter(defer); | ||
return defer; | ||
} | ||
function attachEmitter(target) { | ||
@@ -50,4 +85,5 @@ target.events = new EventEmitter(); | ||
// classes | ||
function noop () {} | ||
function ContextBase() { | ||
@@ -57,4 +93,2 @@ this.next = this.next.bind(this); | ||
this.async = this.async.bind(this); | ||
this.queue = this.queue.bind(this); | ||
this.parallelQueue = this.parallelQueue.bind(this); | ||
this.asyncIndex = 0; | ||
@@ -70,3 +104,3 @@ this.asyncCallCount = 0; | ||
var self = this; | ||
return (function (args, index) { | ||
return (function makeCallback(args, index) { | ||
function flatten(array) { | ||
@@ -98,129 +132,9 @@ var results = []; | ||
ContextBase.prototype.queue = function (worker) { | ||
var self = this; | ||
var batchSize = arguments[0]; | ||
if (typeof batchSize === 'number') { | ||
return function (worker) { | ||
return new Queue(batchSize, worker, self) | ||
}; | ||
} | ||
return new Queue(this.batchSize, worker, self); | ||
}; | ||
ContextBase.prototype.parallelQueue = function (worker) { | ||
var self = this; | ||
var batchSize = arguments[0]; | ||
if (typeof batchSize === 'number') { | ||
return function (worker) { | ||
return new ParallelQueue(batchSize, worker, self) | ||
}; | ||
} | ||
return new ParallelQueue(this.batchSize, worker, self); | ||
}; | ||
function FlowRunner(batchSize, tasks, caller, callerArgs, callerContext) { | ||
this.batchSize = batchSize; | ||
this.tasks = tasks; | ||
this.caller = caller; | ||
this.callerArgs = callerArgs; | ||
this.callerContext = callerContext; | ||
} | ||
FlowRunner.prototype.endStep = function (emitter, flow) { | ||
if (flow.batchSize > 0 && flow.batchSize === flow.callCount) { | ||
flow.callCount = 0; | ||
process.nextTick(function () { | ||
emitter.emit('done', flow); | ||
}); | ||
} else { | ||
emitter.emit('done', flow); | ||
} | ||
}; | ||
FlowRunner.prototype.startStep = function (context, step, flow) { | ||
flow.callCount++; | ||
step.apply(context, flow.args); | ||
}; | ||
FlowRunner.prototype.makeSteps = function () { | ||
var steps = this.tasks.map(function (task, i) { | ||
var fn; | ||
if (typeof task !== 'function') { | ||
throw new Error('not function. ' + typeof task); | ||
} | ||
if (task.__nue__) { | ||
return task; | ||
} | ||
fn = function () { | ||
task.apply(this, arguments); | ||
}; | ||
attachEmitter(fn); | ||
return fn; | ||
}); | ||
var len = steps.length; | ||
var self = this; | ||
steps.forEach(function (step, i, steps) { | ||
var next = steps[i + 1]; | ||
if (i < len - 1) { | ||
(function (C) { | ||
step.events.once('done', function(flow) { | ||
self.startStep(new C(self, next, flow), next, flow); | ||
}); | ||
}(i < len - 2 ? StepContext : LastStepContext)); | ||
} | ||
}); | ||
if (steps.length === 0) { | ||
steps.push(function () { | ||
this.next(); | ||
}); | ||
attachEmitter(steps[0]); | ||
} | ||
return steps; | ||
}; | ||
FlowRunner.prototype.startFlow = function () { | ||
var steps = this.makeSteps(); | ||
var firstStep = steps[0]; | ||
var lastStep = steps[steps.length - 1]; | ||
var self = this; | ||
lastStep.events.once('done', function (flow) { | ||
if (self.callerContext instanceof ContextBase) { | ||
self.callerContext.next.apply(self.callerContext, flow.args); | ||
} else { | ||
self.caller.events.emit('done', flow); | ||
} | ||
}); | ||
var flow = { | ||
args: self.callerArgs, | ||
data: {}, | ||
err: undefined, | ||
lastStep: lastStep, | ||
batchSize: this.batchSize || exports.batchSize || exports.DEFAULT_BATCH_SIZE, | ||
callCount: 0 | ||
}; | ||
var context; | ||
if (firstStep === lastStep) { | ||
context = new LastStepContext(this, firstStep, flow) | ||
} else { | ||
context = new StepContext(this, firstStep, flow); | ||
} | ||
this.startStep(context, firstStep, flow); | ||
}; | ||
FlowRunner.prototype.run = function () { | ||
this.startFlow(); | ||
}; | ||
function StepContext(runner, step, flow) { | ||
function StepContext(step, flow) { | ||
ContextBase.call(this); | ||
this.runner = runner; | ||
this.step = step; | ||
this.flow = flow; | ||
this.batchSize = flow.batchSize; | ||
this.data = flow.data; | ||
this.err = flow.err; | ||
} | ||
util.inherits(StepContext, ContextBase); | ||
@@ -232,4 +146,3 @@ | ||
this.flow.args = Array.prototype.slice.call(arguments); | ||
this.flow.err = this.err; | ||
this.runner.endStep(this.step.events, this.flow); | ||
this.step.events.emit('done'); | ||
}; | ||
@@ -240,19 +153,23 @@ | ||
this.end = noop; | ||
this.flow.err = arguments[0]; | ||
this.flow.args = Array.prototype.slice.call(arguments, 1); | ||
this.flow.err = arguments[0]; | ||
this.flow.lastStep.apply(new LastStepContext(this.runner, this.step, this.flow), this.flow.args); | ||
var context= new LastStepContext(this.flow); | ||
context.invoke(); | ||
}; | ||
StepContext.prototype.invoke = function () { | ||
try { | ||
this.step.apply(this, this.flow.args); | ||
} catch (e) { | ||
StepContext.prototype.end.call(this, e); | ||
} | ||
}; | ||
function LastStepContext(runner, step, flow) { | ||
function LastStepContext(flow) { | ||
ContextBase.call(this); | ||
this.runner = runner; | ||
this.step = step; | ||
this.flow = flow; | ||
this.batchSize = flow.batchSize; | ||
this.data = flow.data; | ||
this.err = flow.err; | ||
this.isNextCalled = false; | ||
} | ||
util.inherits(LastStepContext, ContextBase); | ||
@@ -268,212 +185,13 @@ | ||
} | ||
this.isNextCalled = true; | ||
this.flow.err = this.err; | ||
this.flow.args = Array.prototype.slice.call(arguments); | ||
this.flow.err = this.err; | ||
this.runner.endStep(this.flow.lastStep.events, this.flow); | ||
this.flow.lastStep.events.emit('done'); | ||
}; | ||
LastStepContext.prototype.end = function () { | ||
throw new Error('not supported.'); | ||
throw new Error('This function is unsupported in the last step.'); | ||
}; | ||
function Queue(batchSize, worker, callerContext) { | ||
if (!(callerContext instanceof ContextBase)) { | ||
throw new Error('The context is illegal. The function is out of the flow.'); | ||
} | ||
this.batchSize = batchSize; | ||
this.worker = worker; | ||
this.callerContext = callerContext; | ||
this.values = []; | ||
this.results = []; | ||
this.isAddingCompleted = false; | ||
this.length = 0; | ||
this.index = 0; | ||
} | ||
Queue.prototype.push = function (value) { | ||
var self = this; | ||
if (this.isAddingCompleted) { | ||
throw new Error('This queue has already been frozen.'); | ||
} | ||
this.values.push(value); | ||
this.length++; | ||
if (!this.runner) { | ||
this.runner = new QueueRunner(this); | ||
process.nextTick(function () { | ||
self.runner.processValues(); | ||
}); | ||
} | ||
}; | ||
Queue.prototype.complete = function() { | ||
this.isAddingCompleted = true; | ||
}; | ||
function QueueRunner (queue) { | ||
this.queue = queue; | ||
} | ||
QueueRunner.prototype.runWorker = function (value, values) { | ||
this.queue.worker.call(new QueueContext(this, values), value); | ||
}; | ||
QueueRunner.prototype.processValues = function () { | ||
var queue = this.queue; | ||
var values = queue.batchSize > 0 ? queue.values.splice(0, queue.batchSize) : queue.values; | ||
var self = this; | ||
if (values.length === 0) { | ||
if (queue.isAddingCompleted) { | ||
queue.callerContext.next.call(queue.callerContext, queue.results); | ||
} else { | ||
process.nextTick(function () { | ||
self.processValues(); | ||
}); | ||
} | ||
} else { | ||
self.runWorker(values.shift(), values) | ||
} | ||
}; | ||
function QueueContext(runner, values) { | ||
ContextBase.call(this); | ||
this.runner = runner; | ||
this.values = values; | ||
this.queue = runner.queue; | ||
this.data = this.queue.data; | ||
this.index = this.queue.index; | ||
} | ||
util.inherits(QueueContext, ContextBase); | ||
QueueContext.prototype.next = function (result) { | ||
this.next = noop; | ||
this.end = noop; | ||
this.queue.results[this.index] = result; | ||
this.queue.index++; | ||
if (this.values.length) { | ||
this.runner.runWorker(this.values.shift(), this.values); | ||
} else { | ||
var self = this; | ||
process.nextTick(function () { | ||
self.runner.processValues(); | ||
}); | ||
} | ||
}; | ||
QueueContext.prototype.end = function () { | ||
this.next = noop; | ||
this.end = noop; | ||
this.queue.callerContext.end.apply(this.queue.callerContext, arguments); | ||
}; | ||
function ParallelQueue(batchSize, worker, callerContext) { | ||
if (!(callerContext instanceof ContextBase)) { | ||
throw new Error('The context is illegal. The function is out of the flow.'); | ||
} | ||
this.batchSize = batchSize; | ||
this.worker = worker; | ||
this.callerContext = callerContext; | ||
this.values = []; | ||
this.isAddingCompleted = false; | ||
this.isCanceled = false; | ||
this.length = 0; | ||
this.backlog = 0; | ||
this.results = []; | ||
this.runner = new ParallelQueueRunner(this); | ||
} | ||
ParallelQueue.SENTINEL = {}; | ||
ParallelQueue.prototype.push = function (value) { | ||
var self = this; | ||
if (this.isAddingCompleted) { | ||
throw new Error('This parallel queue has been already frozen.'); | ||
} | ||
if (this.isCanceled) { | ||
return; | ||
} | ||
this.values.push({index : this.length, value: value}); | ||
this.length++; | ||
this.backlog++; | ||
process.nextTick(function () { | ||
self.runner.processValues(); | ||
}); | ||
}; | ||
ParallelQueue.prototype.complete = function() { | ||
this.push(ParallelQueue.SENTINEL); | ||
this.isAddingCompleted = true; | ||
}; | ||
function ParallelQueueRunner(queue) { | ||
this.queue = queue; | ||
} | ||
ParallelQueueRunner.prototype.endParallelQueue = function () { | ||
var self = this; | ||
if (this.queue.backlog === 1) { | ||
this.queue.callerContext.next.call(this.queue.callerContext, this.queue.results); | ||
} else { | ||
process.nextTick(function () { | ||
self.endParallelQueue(); | ||
}); | ||
} | ||
}; | ||
ParallelQueueRunner.prototype.runWorker = function (value) { | ||
var context = new ParallelQueueContext(this, this.queue.batchSize, value.index); | ||
this.queue.worker.call(context, value.value); | ||
}; | ||
ParallelQueueRunner.prototype.processValues = function () { | ||
var value; | ||
var i; | ||
var queue = this.queue; | ||
var self = this; | ||
for (i = 0; queue.values.length && (queue.batchSize > 0 && i < queue.batchSize || queue.batchSize < 0); i++) { | ||
if (queue.isCanceled) { | ||
break; | ||
} | ||
value = queue.values.shift(); | ||
if (value.value === ParallelQueue.SENTINEL) { | ||
process.nextTick(function () { | ||
self.endParallelQueue(); | ||
}); | ||
} else { | ||
this.runWorker(value); | ||
} | ||
} | ||
}; | ||
function ParallelQueueContext(runner, batchSize, index) { | ||
ContextBase.call(this); | ||
this.runner = runner; | ||
this.batchSize = batchSize; | ||
this.index = index; | ||
this.queue = runner.queue; | ||
this.data = this.queue.data; | ||
} | ||
util.inherits(ParallelQueueContext, ContextBase); | ||
ParallelQueueContext.prototype.next = function (result) { | ||
this.next = noop; | ||
this.end = noop; | ||
this.queue.results[this.index] = result; | ||
this.queue.backlog--; | ||
}; | ||
ParallelQueueContext.prototype.end = function () { | ||
this.next = noop; | ||
this.end = noop; | ||
if (!this.queue.isCanceled) { | ||
this.queue.isCanceled = true; | ||
this.queue.callerContext.end.apply(this.queue.callerContext, arguments); | ||
} | ||
}; | ||
LastStepContext.prototype.invoke = function () { | ||
this.flow.lastStep.apply(this, this.flow.args); | ||
}; |
{ | ||
"name" : "nue", | ||
"description" : "An async control-flow library suited for the node event loop.", | ||
"description" : "An async control-flow library suited for node.js.", | ||
"keywords" : ["control-flow", "async"], | ||
@@ -12,3 +12,3 @@ "author" : "Toshihiro Nakamura <toshihiro.nakamura@gmail.com>", | ||
}, | ||
"version" : "0.0.7" | ||
"version" : "0.0.8" | ||
} |
131
README.md
nue — An async control-flow library | ||
=================================== | ||
nue is an async control-flow library suited for the node event loop. | ||
nue is an async control-flow library suited for node.js. | ||
@@ -20,4 +20,4 @@ ## Installing | ||
function (file1, file2) { | ||
fs.readFile(file1, 'utf-8', this.async()); | ||
fs.readFile(file2, 'utf-8', this.async()); | ||
fs.readFile(file1, 'utf8', this.async()); | ||
fs.readFile(file2, 'utf8', this.async()); | ||
}, | ||
@@ -28,2 +28,3 @@ function (data1, data2) { | ||
function (data) { | ||
if (this.err) throw this.err; | ||
console.log(data); | ||
@@ -41,3 +42,3 @@ console.log('done'); | ||
<a name="flow" /> | ||
### flow([Function tasks...]) -> Function | ||
### flow([Function functions...]) -> Function | ||
@@ -48,21 +49,115 @@ Return a function which represents the control-flow. | ||
* `tasks`: Optional. Tasks which are executed in series. | ||
* `functions`: Optional. Functions which are executed in series. | ||
> Context | ||
`this` context of the each task has following properties. | ||
`this` context of each function in a flow has following properties. | ||
* `next`: Function. A function to execute a next task. | ||
* `async`: Function. A function to accept parameters for a next task and return a callback. | ||
* `end`: Function. A function to execute a last task to end a control-flow. The first parameter is an error object. | ||
* `queue`: Function. A function to create a serial queue object. | ||
* `parallelQueue`: Function. A function to create a parallel queue object. | ||
* `next`: Function. A function to execute a next function immediately. | ||
* `async`: Function. A function to accept parameters for a next function and return a callback. | ||
* `end`: Function. A function to execute a last function to end a control-flow. The first parameter is an error object. | ||
* `data`: Object : A object to share arbitrary data among functions in a control-flow. | ||
In addition to the above ones, the context of the last task has a following property. | ||
In addition to the above ones, the context of the last function has a following property. | ||
* `err`: Object. An object represents an error which passed from the `end` function. | ||
## Flow Nesting | ||
A flow can be nested. | ||
```js | ||
var flow = require('nue').flow; | ||
var fs = require('fs'); | ||
var subFlow = flow( | ||
function (file) { | ||
fs.readFile(file, 'utf8', this.async()); | ||
} | ||
); | ||
var mainFlow = flow( | ||
function () { | ||
this.next('file1'); | ||
}, | ||
subFlow, | ||
function (data) { | ||
if (this.err) throw this.err; | ||
console.log(data); | ||
} | ||
); | ||
mainFlow(); | ||
``` | ||
## Arguments Passing Between Functions | ||
arguments are passed with `this.next` or `this.async`. | ||
```js | ||
var flow = require('nue').flow; | ||
var fs = require('fs'); | ||
var myFlow = flow( | ||
function (file1, file2) { | ||
fs.readFile(file1, 'utf8', this.async(file1)); | ||
fs.readFile(file2, 'utf8', this.async(file2)); | ||
}, | ||
function (file1, data1, file2, data2) { | ||
console.log(file1 + ' and ' + file2 + ' have been red.'); | ||
this.next(data1 + data2); | ||
}, | ||
function (data) { | ||
if (this.err) throw this.err; | ||
console.log(data); | ||
this.next(data); | ||
} | ||
); | ||
myFlow('file1', 'file2'); | ||
``` | ||
`this.async` can be called in loop. | ||
Below example produces same results with above example. | ||
```js | ||
var flow = require('nue').flow; | ||
var fs = require('fs'); | ||
var myFlow = flow( | ||
function (files) { | ||
files.forEach(function (file) { | ||
fs.readFile(file, 'utf8', this.async(file)); | ||
}.bind(this)); | ||
}, | ||
function () { | ||
var args = Array.prototype.slice.call(arguments); | ||
var files = []; | ||
var data = ''; | ||
args.forEach(function (arg, i) { | ||
if (i % 2 === 0) { | ||
files.push(arg); | ||
} else { | ||
data += arg; | ||
} | ||
}); | ||
console.log(files.join(' and ') + ' have been red.'); | ||
this.next(data); | ||
}, | ||
function (data) { | ||
if (this.err) throw this.err; | ||
console.log(data); | ||
this.next(data); | ||
} | ||
); | ||
myFlow(['file1', 'file2']); | ||
``` | ||
## Data Sharing Among Functions | ||
Each function in a flow can share data through `this.data`. | ||
`this.data` is shared in a same flow. | ||
A nesting flow and any nested flows can't share `this.data`. | ||
```js | ||
@@ -76,4 +171,4 @@ var flow = require('nue').flow; | ||
this.data.file2 = file2; | ||
fs.readFile(file1, 'utf-8', this.async()); | ||
fs.readFile(file2, 'utf-8', this.async()); | ||
fs.readFile(file1, 'utf8', this.async()); | ||
fs.readFile(file2, 'utf8', this.async()); | ||
}, | ||
@@ -84,2 +179,3 @@ function (data1, data2) { | ||
function (data) { | ||
if (this.err) throw this.err; | ||
console.log(data); | ||
@@ -96,2 +192,5 @@ console.log(this.data.file1 ' and ' + this.data.file2 ' are concatenated.'); | ||
In a last function in a flow, `this.err` represents an error thrown in the flow. | ||
To indicate error handling completion, you must assign `null` to `this.err`. | ||
```js | ||
@@ -103,4 +202,4 @@ var flow = require('nue').flow; | ||
function (file1, file2) { | ||
fs.readFile(file1, 'utf-8', this.async()); | ||
fs.readFile(file2, 'utf-8', this.async()); | ||
fs.readFile(file1, 'utf8', this.async()); | ||
fs.readFile(file2, 'utf8', this.async()); | ||
}, | ||
@@ -107,0 +206,0 @@ function (data1, data2) { |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
217
12266
222