Comparing version 0.4.0 to 0.5.0
# Changelog | ||
- 0.5.0 (2012/03/04) | ||
- New Feature - `nue.as` function is introduced to map asynchronous callback arguments to next function ones. | ||
- Change - in a step function, `this.async` accepts arguments mapping definition to pass callback arguments to a next function. | ||
- Change - in a step function, `this.forEach` function is removed and `this.asyncEach` is added instead. | ||
- Change - in a step function, `this.args` property is removed. | ||
- 0.4.0 (2012/02/27) | ||
@@ -4,0 +10,0 @@ - New Feature - `nue.parallel` is available to execute some steps in parallel. |
var flow = require('../index').flow; | ||
var as = require('../index').as; | ||
var fs = require('fs'); | ||
@@ -8,4 +9,4 @@ | ||
this.data.file2 = file2; | ||
fs.readFile(file1, 'utf8', this.async()); | ||
fs.readFile(file2, 'utf8', this.async()); | ||
fs.readFile(file1, 'utf8', this.async(as(1))); | ||
fs.readFile(file2, 'utf8', this.async(as(1))); | ||
}, | ||
@@ -12,0 +13,0 @@ function concat(data1, data2) { |
var flow = require('../index').flow; | ||
var as = require('../index').as; | ||
var fs = require('fs'); | ||
@@ -8,4 +9,4 @@ | ||
if (!file2) this.endWith(new Error('file2 is illegal.')); | ||
fs.readFile(file1, 'utf8', this.async()); | ||
fs.readFile(file2, 'utf8', this.async()); | ||
fs.readFile(file1, 'utf8', this.async(as(1))); | ||
fs.readFile(file2, 'utf8', this.async(as(1))); | ||
}, | ||
@@ -12,0 +13,0 @@ function concat(data1, data2) { |
var flow = require('../index').flow; | ||
var as = require('../index').as; | ||
var fs = require('fs'); | ||
var myFlow = flow( | ||
var myFlow = flow('myFlow')( | ||
function readFiles(file1, file2) { | ||
fs.readFile(file1, 'utf8', this.async()); | ||
fs.readFile(file2, 'utf8', this.async()); | ||
fs.readFile(file1, 'utf8', this.async(as(1))); | ||
fs.readFile(file2, 'utf8', this.async(as(1))); | ||
}, | ||
@@ -9,0 +10,0 @@ function concat(data1, data2) { |
var flow = require('../index').flow; | ||
var as = require('../index').as; | ||
var fs = require('fs'); | ||
@@ -6,3 +7,3 @@ | ||
function readFile(file) { | ||
fs.readFile(file, 'utf8', this.async()); | ||
fs.readFile(file, 'utf8', this.async(as(1))); | ||
} | ||
@@ -16,5 +17,5 @@ ); | ||
subFlow, | ||
function end(data) { | ||
function end(result) { | ||
if (this.err) throw this.err; | ||
console.log(data); | ||
console.log(result); | ||
console.log('done'); | ||
@@ -21,0 +22,0 @@ this.next(); |
var flow = require('../index').flow; | ||
var as = require('../index').as; | ||
var fs = require('fs'); | ||
@@ -6,3 +7,3 @@ | ||
function readFile(file) { | ||
fs.readFile(file, 'utf8', this.async()); | ||
fs.readFile(file, 'utf8', this.async(as(1))); | ||
} | ||
@@ -13,4 +14,4 @@ ); | ||
function start() { | ||
this.exec(subFlow, 'file1', this.async()); | ||
this.exec(subFlow, 'file2', this.async()); | ||
this.exec(subFlow, 'file1', this.async(as(1))); | ||
this.exec(subFlow, 'file2', this.async(as(1))); | ||
}, | ||
@@ -17,0 +18,0 @@ function end(data1, data2) { |
var flow = require('../index').flow; | ||
function sleep(flow, ms) { | ||
setTimeout(function () { | ||
flow.next(); | ||
}, ms); | ||
function sleep(ms) { | ||
setTimeout(this.async(), ms); | ||
} | ||
@@ -12,3 +10,3 @@ | ||
console.log('wait... ' + new Date()); | ||
sleep(this, 1000); | ||
this.exec(sleep, 1000, this.async()); | ||
}, | ||
@@ -15,0 +13,0 @@ function end() { |
var flow = require('../../index').flow; | ||
var as = require('../../index').as; | ||
var fs = require('fs'); | ||
@@ -6,4 +7,4 @@ | ||
function (file1, file2) { | ||
fs.readFile(file1, 'utf8', this.async()); | ||
fs.readFile(file2, 'utf8', this.async()); | ||
fs.readFile(file1, 'utf8', this.async(as(1))); | ||
fs.readFile(file2, 'utf8', this.async(as(1))); | ||
}, | ||
@@ -16,3 +17,3 @@ function (data1, data2) { | ||
function read(file) { | ||
fs.readFile(file, 'utf8', this.async()); | ||
fs.readFile(file, 'utf8', this.async(as(1))); | ||
} | ||
@@ -19,0 +20,0 @@ |
490
lib/nue.js
'use strict'; | ||
exports.name = 'nue'; | ||
exports.version = '0.4.0'; | ||
exports.version = '0.5.0'; | ||
exports.flow = flow; | ||
exports.parallel = parallel; | ||
exports.as = as; | ||
var EventEmitter = require('events').EventEmitter; | ||
var util = require('util'); | ||
@@ -23,80 +23,60 @@ var assert = require('assert'); | ||
} | ||
} | ||
function normalizeArgs(args) { | ||
if (args.length === 1 && Array.isArray(args[0])) { | ||
return args[0]; | ||
function prepareFlow(flowName, functions) { | ||
function startFlow() { | ||
var isTopLevel = !(this instanceof ContextBase); | ||
var flow = { | ||
flowName: flowName, | ||
context: this, | ||
isTopLevel: isTopLevel, | ||
history: isTopLevel ? [] : this.history, | ||
data: {}, | ||
err: null | ||
}; | ||
var steps = functions.length > 0 ? functions : [function () { this.next(); }]; | ||
var head = chainSteps(flow, steps); | ||
head.apply(this, arguments); | ||
} | ||
startFlow.stepName = flowName; | ||
return startFlow; | ||
} | ||
return Array.prototype.slice.call(args); | ||
} | ||
function prepareFlow(flowName, functions) { | ||
function startFlow() { | ||
var steps = functions.length > 0 ? functions : function () { this.next(); }; | ||
var args = Array.prototype.slice.call(arguments); | ||
runFlow(flowName, chainSteps(steps), args, this); | ||
} | ||
startFlow.stepName = flowName; | ||
return startFlow; | ||
} | ||
function chainSteps(flow, steps) { | ||
var len = steps.length; | ||
var lastIndex = len - 1; | ||
var last = makeLastStep(steps[lastIndex], lastIndex); | ||
var isThrown = false; | ||
if (len === 1) { | ||
return last; | ||
} | ||
function runFlow(flowName, steps, callerArgs, callerContext) { | ||
var flow = new Flow(flowName, callerArgs, callerContext, steps[steps.length - 1]); | ||
if (steps.length > 1) { | ||
runStep(flow, steps[0]); | ||
} else { | ||
runLastStep(flow); | ||
} | ||
} | ||
return steps.reduceRight(function chain(prev, curr, i) { | ||
return function step() { | ||
var next = i === len - 2 ? last : prev; | ||
var context = new StepContext(flow, curr.stepName || curr.name, i, next, last); | ||
try { | ||
curr.apply(context, arguments); | ||
} catch (e) { | ||
if (isThrown) { | ||
throw e; | ||
} | ||
StepContext.prototype.endWith.call(context, e); | ||
} | ||
} | ||
}); | ||
function chainSteps(steps) { | ||
steps = steps.map(function (step, i) { | ||
assert.equal(typeof step, 'function', 'Each argument must be a function.'); | ||
var fn = function applyStep() { | ||
step.apply(this, arguments); | ||
}; | ||
fn.stepName = step.stepName || step.name; | ||
fn.stepIndex = i; | ||
attachEmitter(fn); | ||
return fn; | ||
}); | ||
var len = steps.length - 1; | ||
for (var i = 0; i < len; i++) { | ||
(function chain(i, step, next) { | ||
if (i < len - 1) { | ||
step.events.once('done', function startStep(flow) { | ||
runStep(flow, next); | ||
}); | ||
} else { | ||
step.events.once('done', function startLastStep(flow) { | ||
runLastStep(flow); | ||
}); | ||
function makeLastStep(fn, index) { | ||
return function lastStep() { | ||
var context = new LastStepContext(flow, fn.stepName || fn.name, index); | ||
try { | ||
fn.apply(context, arguments); | ||
} catch (e) { | ||
isThrown = true; | ||
throw e; | ||
} | ||
} | ||
}(i, steps[i], steps[i + 1])); | ||
} | ||
return steps; | ||
} | ||
function runStep(flow, step) { | ||
var context = new StepContext(flow, step); | ||
try { | ||
step.apply(context, flow.args); | ||
} catch (e) { | ||
if (flow.isErrThrown) { | ||
throw e; | ||
} | ||
StepContext.prototype.endWith.call(context, e); | ||
} | ||
} | ||
function runLastStep(flow) { | ||
var context = new LastStepContext(flow); | ||
try { | ||
flow.lastStep.apply(context, flow.args); | ||
} catch (e) { | ||
flow.isErrThrown = true; | ||
throw e; | ||
} | ||
} | ||
function parallel() { | ||
@@ -111,32 +91,34 @@ if (arguments.length === 1 && typeof arguments[0] === 'string') { | ||
} | ||
} | ||
function prepareParallel(flowName, functions) { | ||
function startParallel() { | ||
assert.ok(this instanceof ContextBase, 'A `parallel` must be inside a flow or another parallel.'); | ||
var args = Array.prototype.slice.call(arguments); | ||
runParallel(flowName, functions, args, this); | ||
function prepareParallel(flowName, functions) { | ||
function startParallel() { | ||
assert.ok(this instanceof ContextBase, 'A `parallel` must be inside a flow or another parallel.'); | ||
var self = this; | ||
var args = Array.prototype.slice.call(arguments); | ||
self.asyncEach(1)(functions, function (fn, group) { | ||
assert.equal(typeof fn, 'function', 'Each argument must be a function.'); | ||
var callback = group.async(); | ||
var end = function end() { | ||
callback.apply(null, [this.err].concat(Array.prototype.slice.call(arguments))); | ||
this.err = null; | ||
}; | ||
end.stepName = (fn.stepName || fn.name) + '_end'; | ||
flow(flowName)(fn, end).apply(self, args); | ||
}); | ||
process.nextTick(self.async(Async.SIGNAL_UNLOCK)); | ||
} | ||
startParallel.stepName = flowName; | ||
return startParallel; | ||
} | ||
startParallel.stepName = flowName; | ||
return startParallel; | ||
} | ||
function runParallel(flowName, functions, callerArgs, callerContext) { | ||
callerContext.forEach(1)(functions, function (fn) { | ||
assert.equal(typeof fn, 'function', 'Each argument must be a function.'); | ||
var callback = callerContext.async(); | ||
var end = function end() { | ||
callback.apply(null, [this.err].concat(this.args)); | ||
this.err = null; | ||
}; | ||
end.stepName = (fn.stepName || fn.name) + '_end'; | ||
flow(flowName)(fn, end).apply(callerContext, callerArgs); | ||
}); | ||
function as(index) { | ||
return new As(index); | ||
} | ||
function attachEmitter(target) { | ||
if (!target.__nue__) { | ||
target.events = new EventEmitter(); | ||
target.__nue__ = true; | ||
function normalizeArgs(args) { | ||
if (args.length === 1 && Array.isArray(args[0])) { | ||
return args[0]; | ||
} | ||
return Array.prototype.slice.call(args); | ||
} | ||
@@ -147,106 +129,139 @@ | ||
function Flow(flowName, args, callerContext, lastStep){ | ||
this.flowName = flowName; | ||
this.args = args; | ||
this.callerContext = callerContext; | ||
this.lastStep = lastStep; | ||
this.isTopLevel = !(callerContext instanceof ContextBase); | ||
this.history = this.isTopLevel ? [] : callerContext.history; | ||
this.data = {}; | ||
this.err = null; | ||
this.isErrThrown = false; | ||
function As(index) { | ||
this.index = index; | ||
} | ||
Flow.prototype.exit = function exit(err, args) { | ||
if (this.isTopLevel) { | ||
if (err) { | ||
throw new NueUnhandledError(err); | ||
function Async(lock, next, endWith) { | ||
this.lock = lock; | ||
this.next = next; | ||
this.endWith = endWith; | ||
this.index = 0; | ||
this.pending = 0; | ||
this.isCanceled = false; | ||
this.results = []; | ||
} | ||
Async.SIGNAL_UNLOCK = {}; | ||
Async.prototype.makeCallback = function makeCallback(mapping, loopIndex, groupAsyncIndex) { | ||
assert.ok(typeof mapping === 'object' || mapping === undefined, 'The argument `mapping` must be an object if specified.'); | ||
this.pending++; | ||
var isFirst = this.index === 0; | ||
var index = mapping === Async.SIGNAL_UNLOCK ? -1 : this.index++; | ||
if (isFirst && !this.lock) { | ||
this.lock = true; | ||
process.nextTick(this.makeCallback(Async.SIGNAL_UNLOCK)); | ||
} | ||
var self = this; | ||
var asyncCallback = function asyncCallback(err) { | ||
self.pending--; | ||
if (!self.isCanceled) { | ||
if (isErrorHandleRequire(mapping) && err) { | ||
self.isCanceled = true; | ||
self.endWith.call(null, err, index, mapping, loopIndex, groupAsyncIndex); | ||
} else { | ||
if (mapping === Async.SIGNAL_UNLOCK) { | ||
self.lock = false; | ||
} else { | ||
self.results[index] = mapping ? mapArguments(mapping, arguments) : Array.prototype.slice.call(arguments, 1); | ||
} | ||
if (self.pending === 0 && !self.lock) { | ||
self.next.apply(null, self.results); | ||
} | ||
} | ||
} | ||
} else { | ||
if (err) { | ||
this.callerContext.endWith.call(this.callerContext, err); | ||
} else { | ||
this.callerContext.next.apply(this.callerContext, args); | ||
}; | ||
asyncCallback.index = index; | ||
return asyncCallback; | ||
function isErrorHandleRequire(mapping) { | ||
if (!mapping) { | ||
return true; | ||
} | ||
if (mapping instanceof As) { | ||
return mapping.index !== 0; | ||
} | ||
return Object.keys(mapping).every(function (key) { | ||
var value = mapping[key]; | ||
if (value instanceof As) { | ||
return value.index !== 0; | ||
} | ||
return true; | ||
}); | ||
} | ||
function mapArguments(mapping, args) { | ||
if (mapping instanceof As) { | ||
return args[mapping.index]; | ||
} | ||
return Object.keys(mapping).reduce(function (result, key) { | ||
var value = mapping[key]; | ||
if (value instanceof As) { | ||
result[key] = args[value.index]; | ||
} else { | ||
result[key] = value; | ||
} | ||
return result; | ||
}, {}); | ||
} | ||
}; | ||
function ContextBase(flow, step) { | ||
function ContextBase(flow, name, index, next, last) { | ||
this._flow = flow; | ||
this._step = step; | ||
this._stepIndex = step.stepIndex; | ||
this._asyncIndex = 0; | ||
this._asyncCallCount = 0; | ||
this._isAsyncCanceled = false; | ||
this._results = []; | ||
this._wait = false; | ||
this._next = next; | ||
this._last = last; | ||
this.err = flow.err; | ||
this.args = flow.args; | ||
this.data = flow.data; | ||
this.flowName = flow.flowName; | ||
this.stepName = step.stepName; | ||
this.stepName = name; | ||
this.stepIndex = index; | ||
this.history = flow.history; | ||
this.history.push(new HistoryEntry(this.flowName, this.stepName, this._stepIndex)); | ||
this.next = this.next.bind(this); | ||
this.end = this.end.bind(this); | ||
this.endWith = this.endWith.bind(this); | ||
this.async = this.async.bind(this); | ||
this.forEach = this.forEach.bind(this); | ||
this.exec = this.exec.bind(this); | ||
this.history.push(new HistoryEntry(flow.flowName, name, index)); | ||
var self = this; | ||
this._asyncObj = new Async(false, this.next.bind(this), function endWith(err, asyncIndex, mapping) { | ||
self.endWith.call(self, new NueAsyncError(err, self.flowName, self.stepName, self.stepIndex, asyncIndex, mapping)); | ||
}); | ||
} | ||
ContextBase.SIGNAL_WAIT_CANCEL = {}; | ||
ContextBase.DEFAULT_CONCURRENCY = 10; | ||
ContextBase.prototype.async = function async() { | ||
this._asyncCallCount++; | ||
var args = Array.prototype.slice.call(arguments); | ||
var self = this; | ||
return (function makeCallback(args, asyncIndex) { | ||
function callback(err) { | ||
self._asyncCallCount--; | ||
if (!self._isAsyncCanceled) { | ||
if (err) { | ||
self._isAsyncCanceled = true; | ||
self.endWith.call(self, new NueAsyncError(err, self.flowName, self.stepName, self._stepIndex, asyncIndex)); | ||
} else { | ||
if (args[0] === ContextBase.SIGNAL_WAIT_CANCEL) { | ||
self._wait = false; | ||
} else { | ||
self._results[asyncIndex] = args.concat(Array.prototype.slice.call(arguments, 1)); | ||
} | ||
if (self._asyncCallCount === 0 && !self._wait) { | ||
self.next.apply(self, self._results.map(function (array) { | ||
switch(array.length) { | ||
case 0: return undefined; | ||
case 1: return array[0]; | ||
default: return array; | ||
} | ||
})); | ||
} | ||
} | ||
} | ||
} | ||
return callback; | ||
}(args, this._asyncIndex++)); | ||
ContextBase.prototype._disable = function _disable() { | ||
this.next = noop; | ||
this.end = noop; | ||
this.endWith = noop; | ||
}; | ||
ContextBase.prototype.forEach = function forEach() { | ||
ContextBase.prototype.async = function async(mapping) { | ||
return this._asyncObj.makeCallback.call(this._asyncObj, mapping); | ||
}; | ||
ContextBase.prototype.asyncEach = function asyncEach() { | ||
var self = this; | ||
var callback = self.async(); | ||
var next = function next() { | ||
var args = Array.prototype.slice.call(arguments); | ||
return callback.apply(self, [null].concat(flattenArray(args))); | ||
}; | ||
var endWith = function endWith(cause, asyncIndex, mapping, loopIndex, asyncGroupIndex) { | ||
var err = new NueGroupAsyncError(cause, self.flowName, self.stepName, self.stepIndex, callback.index, mapping, loopIndex, asyncGroupIndex); | ||
return self.endWith.call(self, err); | ||
}; | ||
var async = new Async(true, next, endWith); | ||
if (arguments.length === 1 && typeof arguments[0] === 'number') { | ||
var concurrency = arguments[0]; | ||
return function () { | ||
waitAndConsume(concurrency, arguments[0], arguments[1]); | ||
validateAndConsume(concurrency, arguments[0], arguments[1]); | ||
}; | ||
} else { | ||
waitAndConsume(ContextBase.DEFAULT_CONCURRENCY, arguments[0], arguments[1]); | ||
validateAndConsume(ContextBase.DEFAULT_CONCURRENCY, arguments[0], arguments[1]); | ||
} | ||
function waitAndConsume(concurrency, array, worker) { | ||
function validateAndConsume(concurrency, array, worker) { | ||
assert.ok(Array.isArray(array), 'An argument `array` must be an array.'); | ||
assert.equal(typeof worker, 'function', 'An argument `worker` must be a function.'); | ||
self._wait = true; | ||
consume(concurrency, array, worker, 0); | ||
process.nextTick(function consumeFirst() { | ||
consume(concurrency, array, worker, 0); | ||
}); | ||
} | ||
@@ -257,6 +272,14 @@ | ||
for (var i = 0; i < concurrency && index < len; i++, index++) { | ||
worker.call(self, array[index], index, array); | ||
(function callWorker(index) { | ||
var count = 0; | ||
var group = { | ||
async: function (mapping) { | ||
return async.makeCallback.call(async, mapping, index, count++); | ||
} | ||
}; | ||
worker.call(null, array[index], group, index, array); | ||
}(index)); | ||
} | ||
if (index === len) { | ||
process.nextTick(self.async(ContextBase.SIGNAL_WAIT_CANCEL)); | ||
process.nextTick(async.makeCallback(Async.SIGNAL_UNLOCK)); | ||
} else { | ||
@@ -268,2 +291,16 @@ process.nextTick(function consumeNext() { | ||
} | ||
function flattenArray(array) { | ||
var results = []; | ||
array.forEach(function (array2) { | ||
if (Array.isArray(array2)) { | ||
array2.forEach(function (e) { | ||
results.push(e); | ||
}); | ||
} else { | ||
results.push(array2); | ||
} | ||
}); | ||
return results; | ||
} | ||
}; | ||
@@ -279,3 +316,3 @@ | ||
var end = function end() { | ||
callback.apply(null, [this.err].concat(this.args)); | ||
callback.apply(null, [this.err].concat(Array.prototype.slice.call(arguments))); | ||
this.err = null; | ||
@@ -290,4 +327,4 @@ }; | ||
function StepContext(flow, step) { | ||
ContextBase.call(this, flow, step); | ||
function StepContext(flow, name, index, next, last) { | ||
ContextBase.call(this, flow, name, index, next, last); | ||
} | ||
@@ -297,25 +334,22 @@ util.inherits(StepContext, ContextBase); | ||
StepContext.prototype.next = function next() { | ||
disableContextMethods(this); | ||
this._disable(); | ||
this._flow.err = null; | ||
this._flow.args = Array.prototype.slice.call(arguments); | ||
this._step.events.emit('done', this._flow); | ||
this._next.apply(null, arguments); | ||
}; | ||
StepContext.prototype.end = function end() { | ||
disableContextMethods(this); | ||
this._disable(); | ||
this._flow.err = null; | ||
this._flow.args = Array.prototype.slice.call(arguments); | ||
runLastStep(this._flow); | ||
this._last.apply(null, arguments); | ||
}; | ||
StepContext.prototype.endWith = function endWith(err) { | ||
disableContextMethods(this); | ||
this._disable(); | ||
this._flow.err = err; | ||
this._flow.args = []; | ||
runLastStep(this._flow); | ||
this._last.call(null); | ||
}; | ||
function LastStepContext(flow) { | ||
ContextBase.call(this, flow, flow.lastStep); | ||
function LastStepContext(flow, name, index) { | ||
ContextBase.call(this, flow, name, index); | ||
} | ||
@@ -325,24 +359,31 @@ util.inherits(LastStepContext, ContextBase); | ||
LastStepContext.prototype.next = function next() { | ||
disableContextMethods(this); | ||
this._flow.exit(this.err, Array.prototype.slice.call(arguments)); | ||
this._disable(); | ||
this._exit(Array.prototype.slice.call(arguments)); | ||
}; | ||
LastStepContext.prototype.end = function end() { | ||
disableContextMethods(this); | ||
this._flow.exit(this.err, Array.prototype.slice.call(arguments)); | ||
this._disable(); | ||
this._exit(Array.prototype.slice.call(arguments)); | ||
}; | ||
LastStepContext.prototype.endWith = function endWith(err) { | ||
disableContextMethods(this); | ||
this._flow.exit(err, []); | ||
this._disable(); | ||
this.err = err; | ||
this._exit([]); | ||
}; | ||
LastStepContext.prototype._exit = function _exit(args) { | ||
if (this._flow.isTopLevel) { | ||
if (this.err) { | ||
throw new NueUnhandledError(this.err, this.flowName, this.stepName); | ||
} | ||
} else { | ||
if (this.err) { | ||
this._flow.context.endWith.call(this._flow.context, this.err); | ||
} else { | ||
this._flow.context.next.apply(this._flow.context, args); | ||
} | ||
} | ||
}; | ||
function disableContextMethods(context) { | ||
context.next = noop; | ||
context.end = noop; | ||
context.endWith = noop; | ||
} | ||
function HistoryEntry(flowName, stepName, stepIndex) { | ||
@@ -361,3 +402,3 @@ this.flowName = flowName; | ||
function NueAsyncError(cause, flowName, stepName, stepIndex, asyncIndex) { | ||
function NueAsyncError(cause, flowName, stepName, stepIndex, asyncIndex, mapping) { | ||
this.cause = cause; | ||
@@ -369,7 +410,9 @@ this.flowName = flowName; | ||
this.name = 'NueAsyncError'; | ||
this.message = "An error in an async callback: " + | ||
"flowName = '" + flowName + "', stepName = '" + stepName + | ||
"', stepIndex = " + stepIndex + ', asyncIndex = ' + asyncIndex + '\n' + | ||
'+----- BEGIN CAUSE STACK -----+\n' + cause.stack + '\n' + | ||
'+----- END CAUSE STACK -----+'; | ||
this.message = "An error in an async callback:" + | ||
"\nflowName = '" + flowName + "'," + | ||
"\nstepName = '" + stepName + "'," + | ||
'\nstepIndex = ' + stepIndex + ', ' + | ||
'\nasyncIndex = ' + asyncIndex + ', ' + | ||
'\nmapping = ' + util.inspect(mapping, false, null) + | ||
'\n+----- BEGIN CAUSE STACK -----+\n' + indent(cause.stack) + '\n+----- END CAUSE STACK -----+'; | ||
Error.captureStackTrace(this, NueAsyncError); | ||
@@ -380,11 +423,40 @@ } | ||
function NueUnhandledError(cause) { | ||
function NueGroupAsyncError(cause, flowName, stepName, stepIndex, asyncIndex, mapping, loopIndex, groupAsyncIndex) { | ||
this.cause = cause; | ||
this.flowName = flowName; | ||
this.stepName = stepName; | ||
this.stepIndex = stepIndex; | ||
this.asyncIndex = asyncIndex; | ||
this.loopIndex = loopIndex; | ||
this.groupAsyncIndex = groupAsyncIndex; | ||
this.name = 'NueGroupAsyncError'; | ||
this.message = "An error in an async callback:" + | ||
"\nflowName = '" + flowName + "'," + | ||
"\nstepName = '" + stepName + "'," + | ||
'\nstepIndex = ' + stepIndex + ', ' + | ||
'\nasyncIndex = ' + asyncIndex + ', ' + | ||
'\nloopIndex = ' + loopIndex + ', ' + | ||
'\ngroupAsyncIndex = ' + groupAsyncIndex + ', ' + | ||
'\nmapping = ' + util.inspect(mapping, false, null) + | ||
'\n+----- BEGIN CAUSE STACK -----+\n' + indent(cause.stack) + '\n+----- END CAUSE STACK -----+'; | ||
Error.captureStackTrace(this, NueGroupAsyncError); | ||
} | ||
util.inherits(NueGroupAsyncError, Error); | ||
function NueUnhandledError(cause, flowName, stepName) { | ||
this.cause = cause; | ||
this.name = 'NueUnhandledError'; | ||
this.message = 'The error must be handled in a last step. ' + | ||
'To indicate error handling completed, set null to `this.err` before exiting the last step.\n' + | ||
'+----- BEGIN CAUSE STACK -----+\n' + cause.stack + '\n' + | ||
'+----- END CAUSE STACK -----+'; | ||
this.message = 'The error must be handled in a last step of flow. ' + | ||
'To indicate error handling completed, set null to `this.err` before exiting the last step:' + | ||
"\nflowName = '" + flowName + "'," + | ||
"\nstepName = '" + stepName + "'" + | ||
'\n+----- BEGIN CAUSE STACK -----+\n' + indent(cause.stack) + '\n+----- END CAUSE STACK -----+'; | ||
Error.captureStackTrace(this, NueUnhandledError); | ||
} | ||
util.inherits(NueUnhandledError, Error); | ||
function indent(text) { | ||
return ' ' + text.replace(/\n/g, '\n '); | ||
} |
@@ -12,3 +12,3 @@ { | ||
}, | ||
"version" : "0.4.0" | ||
"version" : "0.5.0" | ||
} |
263
README.md
@@ -16,8 +16,9 @@ nue — An async control-flow library | ||
var flow = require('nue').flow; | ||
var as = require('as').as; | ||
var fs = require('fs'); | ||
var myFlow = flow( | ||
var myFlow = flow('myFlow')( | ||
function readFiles(file1, file2) { | ||
fs.readFile(file1, 'utf8', this.async()); | ||
fs.readFile(file2, 'utf8', this.async()); | ||
fs.readFile(file1, 'utf8', this.async(as(1))); | ||
fs.readFile(file2, 'utf8', this.async(as(1))); | ||
}, | ||
@@ -55,11 +56,14 @@ function concat(data1, data2) { | ||
* `async`: async([Object values...]) -> Function | ||
* A function to accept parameters for a next step and return a callback. | ||
* `async`: async([Object mapping]) -> Function | ||
* A function to accept an argument mapping definition for a next step and return a callback. | ||
`async` can be called many times, but all calls are done in same tick. | ||
And all callbacks `async` returns must be called absolutely. | ||
* `forEach`: forEach(Array array, Function(element, elementIndex, traversedArray)) -> Void | ||
* A function to execute a provided function once per array element in parallel. | ||
* `asyncEach`: asyncEach(Array array, Function(element, group, elementIndex, traversedArray)) -> Void | ||
* A function to execute a provided function once per array element asynchronously. | ||
The `group` has a `async` function to accept an argument mapping definition and return a callback. | ||
* `forEach`: forEach(Number concurrency) -> Function | ||
* A function to accept a concurrency number and return another `forEach` function which | ||
executes a provided function once per array element in prallel with the specified cuncurrency. | ||
* `asyncEach`: asyncEach(Number concurrency) -> Function | ||
* A function to accept a concurrency number and return another `asyncEach` function which | ||
executes a provided function once per array element asynchronously with the specified cuncurrency. | ||
If you use another `forEach` function directly, default concurrency 10 is used. | ||
@@ -80,10 +84,7 @@ | ||
* `args`: Array | ||
* An array equivalent to `arguments` for a step except this is real Array. | ||
* `flowName`: String | ||
* flow name. | ||
* A flow name. | ||
* `stepName`: String | ||
* step name. | ||
* A step name. | ||
@@ -105,3 +106,3 @@ * `history`: Array | ||
* `flowName`: Required. Flow name to be used for debug. | ||
* `flowName`: Required. A flow name to be used for debug. | ||
@@ -123,10 +124,134 @@ ### parallel([Function steps...]) -> Function | ||
* `flowName`: Required. Flow name to be used for debug. | ||
* `flowName`: Required. A flow name to be used for debug. | ||
### as(Number index) -> Object | ||
> Arguments | ||
* `index`: Required. An index to map an asynchronous callback argument to a next step argument. | ||
If the index is zero, an error handling is skipped. | ||
## Arguments Passing Between Functions | ||
Arguments are passed with `this.next` or `this.async`. | ||
### Synchronously | ||
```js | ||
var flow = require('nue').flow; | ||
var myFlow = flow('myFlow')( | ||
function concat(s1, s2) { | ||
var length = s1.length + s2.length | ||
this.next(s1, s2, length); | ||
}, | ||
function end(s1, s2, length) { | ||
if (this.err) throw this.err; | ||
console.log(s1 + ' + ' + s2 + ' -> ' + length); // file1 + file2 -> 10 | ||
console.log('done'); | ||
this.next(); | ||
} | ||
); | ||
myFlow('file1', 'file2'); | ||
``` | ||
### Asynchronously | ||
To pass asynchronous call results to a next function, arguments mapping definition is necessary. | ||
The function `as` accepts an index to specify a callback argument and returns arguments mapping definition. | ||
The function `this.async` accepts the mapping definition and return a callback. | ||
When all callbacks are completed, the next function is called with specific arguments. | ||
```js | ||
var flow = require('nue').flow; | ||
var as = require('as').as; | ||
var fs = require('fs'); | ||
var myFlow = flow('myFlow')( | ||
function readFiles(file1, file2) { | ||
fs.readFile(file1, 'utf8', this.async(as(1))); | ||
fs.readFile(file2, 'utf8', this.async(as(1))); | ||
}, | ||
function end(data1, data2) { | ||
if (this.err) throw this.err; | ||
console.log(data1 + data2); // FILE1FILE2 | ||
console.log('done'); | ||
this.next(); | ||
} | ||
); | ||
myFlow('file1', 'file2'); | ||
``` | ||
Arguments mapping definition can contain arbitrary values. | ||
```js | ||
var flow = require('nue').flow; | ||
var as = require('as').as; | ||
var fs = require('fs'); | ||
var myFlow = flow('myFlow')( | ||
function readFiles(file1, file2) { | ||
fs.readFile(file1, 'utf8', this.async({name: file1, data: as(1)})); | ||
fs.readFile(file2, 'utf8', this.async({name: file2, data: as(1)})); | ||
}, | ||
function end(f1, f2) { | ||
if (this.err) throw this.err; | ||
console.log(f1.name + ' and ' + f2.name + ' have been read.'); // file1 and file2 have been read. | ||
console.log(f1.data + f2.data); // FILE1FILE2 | ||
console.log('done'); | ||
this.next(); | ||
} | ||
); | ||
myFlow('file1', 'file2'); | ||
``` | ||
## Asynchronous Loop | ||
`this.asyncEach` executes a provided function once per array element asynchronously. | ||
By default, the number of concurrency is 10. | ||
```js | ||
var flow = require('nue').flow; | ||
var as = require('as').as; | ||
var fs = require('fs'); | ||
var myFlow = flow('myFlow')( | ||
function readFiles(files) { | ||
this.asyncEach(files, function (file, group) { | ||
fs.readFile(file, 'utf8', group.async({name: file, data: as(1)})); | ||
}); | ||
}, | ||
function end(files) { | ||
if (this.err) throw this.err; | ||
var names = files.map(function (f) { return f.name; }); | ||
var contents = files.map(function (f) { return f.data}); | ||
console.log(names.join(' and ') + ' have been read.'); // file1 and file2 have been read. | ||
console.log(contents.join('')); // FILE1FILE2 | ||
this.next(); | ||
} | ||
); | ||
myFlow(['file1', 'file2']); | ||
``` | ||
To change the number of concurrency, specify the number as below. | ||
```js | ||
function readFiles(files) { | ||
this.asyncEach(5)(files, function (file, group) { | ||
... | ||
}); | ||
}, | ||
``` | ||
## Flow Nesting | ||
A flow can be nested. | ||
A flow is composable. So it can be nested. | ||
```js | ||
var flow = require('nue').flow; | ||
var as = require('as').as; | ||
var fs = require('fs'); | ||
@@ -136,3 +261,3 @@ | ||
function readFile(file) { | ||
fs.readFile(file, 'utf8', this.async()); | ||
fs.readFile(file, 'utf8', this.async(as(1))); | ||
} | ||
@@ -146,5 +271,5 @@ ); | ||
subFlow, | ||
function end(data) { | ||
function end(result) { | ||
if (this.err) throw this.err; | ||
console.log(data); | ||
console.log(result); | ||
console.log('done'); | ||
@@ -158,8 +283,9 @@ this.next(); | ||
## Flow Nesting and Asynchronous Execution | ||
## Asynchronous Flow Execution | ||
A nested sub-flow can be executed asynchronously. | ||
A flow can be executed asynchronously. | ||
```js | ||
var flow = require('nue').flow; | ||
var as = require('as').as; | ||
var fs = require('fs'); | ||
@@ -169,3 +295,3 @@ | ||
function readFile(file) { | ||
fs.readFile(file, 'utf8', this.async()); | ||
fs.readFile(file, 'utf8', this.async(as(1))); | ||
} | ||
@@ -176,4 +302,4 @@ ); | ||
function start() { | ||
this.exec(subFlow, 'file1', this.async()); | ||
this.exec(subFlow, 'file2', this.async()); | ||
this.exec(subFlow, 'file1', this.async(as(1))); | ||
this.exec(subFlow, 'file2', this.async(as(1))); | ||
}, | ||
@@ -193,3 +319,3 @@ function end(data1, data2) { | ||
In Following example, the flow `par1-1` and `par1-2` are executed in parallel. | ||
In following example, the flow `par1-1` and `par1-2` are executed in parallel. | ||
@@ -225,23 +351,26 @@ ```js | ||
## Arguments Passing Between Functions | ||
Arguments to a parallel flow are passed to every forked functions. | ||
Parallel flow results are passed to a next funtion as an array. | ||
The array contains the results of forked functions. | ||
arguments are passed with `this.next` or `this.async`. | ||
```js | ||
var flow = require('nue').flow; | ||
var fs = require('fs'); | ||
var parallel = require('nue').parallel; | ||
var myFlow = flow('myFlow')( | ||
function readFiles(file1, file2) { | ||
fs.readFile(file1, 'utf8', this.async(file1)); | ||
fs.readFile(file2, 'utf8', this.async(file2)); | ||
var myFlow = flow('main')( | ||
function start() { | ||
this.next(10, 20); | ||
}, | ||
function concat(data1, data2) { | ||
console.log(data1[0] + ' and ' + data2[0] + ' have been read.'); | ||
this.next(data1[1] + data2[1]); | ||
}, | ||
function end(data) { | ||
parallel('parallel')( | ||
function add(x, y) { | ||
this.next(x + y); | ||
}, | ||
function sub(x, y) { | ||
this.next(x - y); | ||
} | ||
), | ||
function end(results) { | ||
if (this.err) throw this.err; | ||
console.log(data); | ||
console.log('done'); | ||
console.log('add result: ' + results[0]); // add result: 30 | ||
console.log('sub result: ' + results[1]); // sub result: -10 | ||
this.next(); | ||
@@ -251,34 +380,5 @@ } | ||
myFlow('file1', 'file2'); | ||
myFlow(); | ||
``` | ||
`this.async` can be called in loop. | ||
Following example produces same results with above example. | ||
```js | ||
var flow = require('nue').flow; | ||
var fs = require('fs'); | ||
var myFlow = flow('myFlow')( | ||
function readFiles(files) { | ||
process.nextTick(this.async(files)); | ||
this.forEach(files, function (file) { | ||
fs.readFile(file, 'utf8', this.async()); | ||
}); | ||
}, | ||
function concat(files) { | ||
console.log(files.join(' and ') + ' have been read.'); | ||
this.next(this.args.slice(1).join('')); | ||
}, | ||
function end(data) { | ||
if (this.err) throw this.err; | ||
console.log(data); | ||
console.log('done'); | ||
this.next(); | ||
} | ||
); | ||
myFlow(['file1', 'file2']); | ||
``` | ||
## Data Sharing Between Functions | ||
@@ -292,2 +392,3 @@ | ||
var flow = require('nue').flow; | ||
var as = require('as').as; | ||
var fs = require('fs'); | ||
@@ -299,4 +400,4 @@ | ||
this.data.file2 = file2; | ||
fs.readFile(file1, 'utf8', this.async()); | ||
fs.readFile(file2, 'utf8', this.async()); | ||
fs.readFile(file1, 'utf8', this.async(as(1))); | ||
fs.readFile(file2, 'utf8', this.async(as(1))); | ||
}, | ||
@@ -320,6 +421,7 @@ function concat(data1, data2) { | ||
In a last step in a flow, `this.err` represents an error which is thrown with `throw`, passed to `this.endWith` or passed to an async callback as first argument. | ||
To indicate error handling completion, you must assign `null` to `this.err`. | ||
To indicate error handling is completed, you must assign `null` to `this.err`. | ||
```js | ||
var flow = require('nue').flow; | ||
var as = require('as').as; | ||
var fs = require('fs'); | ||
@@ -331,4 +433,4 @@ | ||
if (!file2) this.endWith(new Error('file2 is illegal.')); | ||
fs.readFile(file1, 'utf8', this.async()); | ||
fs.readFile(file2, 'utf8', this.async()); | ||
fs.readFile(file1, 'utf8', this.async(as(1))); | ||
fs.readFile(file2, 'utf8', this.async(as(1))); | ||
}, | ||
@@ -361,2 +463,3 @@ function concat(data1, data2) { | ||
var flow = require('nue').flow; | ||
var as = require('as').as; | ||
var fs = require('fs'); | ||
@@ -366,4 +469,4 @@ | ||
function (file1, file2) { | ||
fs.readFile(file1, 'utf8', this.async()); | ||
fs.readFile(file2, 'utf8', this.async()); | ||
fs.readFile(file1, 'utf8', this.async(as(1))); | ||
fs.readFile(file2, 'utf8', this.async(as(1))); | ||
}, | ||
@@ -376,3 +479,3 @@ function (data1, data2) { | ||
function read(file) { | ||
fs.readFile(file, 'utf8', this.async()); | ||
fs.readFile(file, 'utf8', this.async(as(1))); | ||
} | ||
@@ -379,0 +482,0 @@ |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
37209
24
694
491
4311
9