Comparing version 0.5.0 to 0.6.0
# Changelog | ||
- 0.6.0 (2012/03/10) | ||
- New Feature - debugging is supported. `NODE_DEBUG=nue` is available. | ||
- Change - NueAsyncError is improved. Now NueAsyncError is a plain Error object. | ||
- Change - `history` property is removed from the step context. | ||
- Change - result array from `this.asyncEach` is not flatten. | ||
- Change - result array from `nue.parallel` is not flatten. | ||
- 0.5.0 (2012/03/04) | ||
@@ -4,0 +11,0 @@ - New Feature - `nue.as` function is introduced to map asynchronous callback arguments to next function ones. |
@@ -10,3 +10,3 @@ var flow = require('../index').flow; | ||
if (this.err) throw this.err; | ||
console.log(s1 + ' + ' + s2 + ' -> ' + length); // file1 + file2 -> 10 | ||
console.log(s1 + '.length + ' + s2 + '.length -> ' + length); // file1.length + file2.length -> 10 | ||
console.log('done'); | ||
@@ -13,0 +13,0 @@ this.next(); |
@@ -7,4 +7,2 @@ var flow = require('../index').flow; | ||
function readFiles(file1, file2) { | ||
if (!file1) this.endWith(new Error('file1 is illegal.')); | ||
if (!file2) this.endWith(new Error('file2 is illegal.')); | ||
fs.readFile(file1, 'utf8', this.async(as(1))); | ||
@@ -11,0 +9,0 @@ fs.readFile(file2, 'utf8', this.async(as(1))); |
@@ -5,19 +5,43 @@ var flow = require('../index').flow; | ||
var myFlow = flow('main')( | ||
function one() { this.next(); }, | ||
function two() { this.next(); }, | ||
function one() { | ||
console.log(this.stepName); | ||
this.next(); | ||
}, | ||
function two() { | ||
console.log(this.stepName); | ||
this.next(); | ||
}, | ||
parallel('par1')( | ||
flow('par1-1')( | ||
function three() { this.next(); }, | ||
function four() { this.next(); } | ||
function three() { | ||
console.log(this.stepName); | ||
this.next(); | ||
}, | ||
function four() { | ||
console.log(this.stepName); | ||
this.next(); | ||
} | ||
), | ||
flow('par1-2')( | ||
function five() { this.next(); }, | ||
function six() { this.next(); } | ||
function five() { | ||
console.log(this.stepName); | ||
this.next(); | ||
}, | ||
function six() { | ||
console.log(this.stepName); | ||
this.next(); | ||
} | ||
) | ||
), | ||
function seven() { this.next(); }, | ||
function eight() { this.next(); }, | ||
function seven() { | ||
console.log(this.stepName); | ||
this.next(); | ||
}, | ||
function eight() { | ||
console.log(this.stepName); | ||
this.next(); | ||
}, | ||
function allDone() { | ||
if (this.err) throw this.err; | ||
console.log(this.history); | ||
console.log(this.stepName); | ||
this.next(); | ||
@@ -24,0 +48,0 @@ } |
var flow = require('../index').flow; | ||
var as = require('../index').as; | ||
function sleep(ms) { | ||
setTimeout(this.async(), ms); | ||
setTimeout(this.async(as(-1)), ms); | ||
} | ||
@@ -10,5 +11,6 @@ | ||
console.log('wait... ' + new Date()); | ||
this.exec(sleep, 1000, this.async()); | ||
this.exec(sleep, 1000, this.async(as(1))); | ||
}, | ||
function end() { | ||
if (this.err) throw this.err; | ||
console.log('ok... ' + new Date()); | ||
@@ -15,0 +17,0 @@ this.next(); |
525
lib/nue.js
'use strict'; | ||
exports.name = 'nue'; | ||
exports.version = '0.5.0'; | ||
exports.version = '0.6.0'; | ||
exports.flow = flow; | ||
@@ -9,7 +9,7 @@ exports.parallel = parallel; | ||
var util = require('util'); | ||
var assert = require('assert'); | ||
var debugMode = process.env.NODE_DEBUG && /\bnue\b/.test(process.env.NODE_DEBUG); | ||
var flowId = 0; | ||
function flow() { | ||
@@ -19,16 +19,15 @@ if (arguments.length === 1 && typeof arguments[0] === 'string') { | ||
return function () { | ||
return prepareFlow(flowName, normalizeArgs(arguments)); | ||
return deferFlow(flowName, normalizeArgs(arguments)); | ||
}; | ||
} else { | ||
return prepareFlow('', normalizeArgs(arguments)); | ||
return deferFlow('', normalizeArgs(arguments)); | ||
} | ||
function prepareFlow(flowName, functions) { | ||
function deferFlow(flowName, functions) { | ||
function startFlow() { | ||
var isTopLevel = !(this instanceof ContextBase); | ||
var flow = { | ||
flowName: flowName, | ||
flowId: flowId++, | ||
context: this, | ||
isTopLevel: isTopLevel, | ||
history: isTopLevel ? [] : this.history, | ||
isTopLevel: !(this instanceof StepContext), | ||
data: {}, | ||
@@ -38,2 +37,9 @@ err: null | ||
var steps = functions.length > 0 ? functions : [function () { this.next(); }]; | ||
if (debugMode && flow.isTopLevel) { | ||
var location = getLocation(startFlow); | ||
debug('begin TOP_LEVEL_FLOW. flow: %s(%d), calledAt: %s:%d:%d, args:', | ||
flow.flowName || '<anonymous>', flow.flowId, | ||
location.fileName, location.lineNumber, location.columnNumber, | ||
Array.prototype.slice.call(arguments)); | ||
} | ||
var head = chainSteps(flow, steps); | ||
@@ -55,8 +61,9 @@ head.apply(this, arguments); | ||
return steps.reduceRight(function chain(prev, curr, i) { | ||
return steps.reduceRight(function chain(prev, curr, index) { | ||
assert.equal(typeof curr, 'function', 'Each argument for `flow` must be a function.'); | ||
return function step() { | ||
var next = i === len - 2 ? last : prev; | ||
var context = new StepContext(flow, curr.stepName || curr.name, i, next, last); | ||
var next = index === len - 2 ? last : prev; | ||
var context = new StepContext(flow, curr.stepName || curr.name, index, next, last); | ||
try { | ||
curr.apply(context, arguments); | ||
runStep(curr, context, arguments); | ||
} catch (e) { | ||
@@ -68,10 +75,11 @@ if (isThrown) { | ||
} | ||
} | ||
}; | ||
}); | ||
function makeLastStep(fn, index) { | ||
assert.equal(typeof fn, 'function', 'Each argument for `flow` must be a function.'); | ||
return function lastStep() { | ||
var context = new LastStepContext(flow, fn.stepName || fn.name, index); | ||
var context = new StepContext(flow, fn.stepName || fn.name, index, exit, exit); | ||
try { | ||
fn.apply(context, arguments); | ||
runStep(fn, context, arguments, true); | ||
} catch (e) { | ||
@@ -81,4 +89,41 @@ isThrown = true; | ||
} | ||
}; | ||
function exit() { | ||
if (flow.isTopLevel) { | ||
if (flow.err) { | ||
var message = 'An error must be handled in a last step of its flow. ' + | ||
'To indicate error handling completed, set null to `this.err` before exiting the last step. ' + | ||
'cause: ' + util.inspect(flow.err); | ||
var err = new Error(message); | ||
err.name = 'NueUnhandledError'; | ||
throw err; | ||
} | ||
} else { | ||
if (flow.err) { | ||
flow.context.endWith.call(flow.context, flow.err); | ||
} else { | ||
flow.context.next.apply(flow.context, arguments); | ||
} | ||
} | ||
} | ||
} | ||
function runStep(step, context, args) { | ||
if (debugMode) { | ||
debug('begin STEP. flow: %s(%d), step: %s(%d), args:', | ||
context.flowName || '<anonymous>', context._flow.flowId, | ||
context.stepName || '<anonymous>', context.stepIndex, Array.prototype.slice.call(args)); | ||
} | ||
try { | ||
step.apply(context, args); | ||
} catch (e) { | ||
if (debugMode) { | ||
debug('error STEP. flow: , step: %s(%d), err:', | ||
context.flowName || '<anonymous>', context._flow.flowId, | ||
context.stepName || '<anonymous>', context.stepIndex, e); | ||
} | ||
throw e; | ||
} | ||
} | ||
} | ||
@@ -91,19 +136,22 @@ } | ||
return function () { | ||
return prepareParallel(flowName, normalizeArgs(arguments)); | ||
return deferParallel(flowName, normalizeArgs(arguments)); | ||
}; | ||
} else { | ||
return prepareParallel('', normalizeArgs(arguments)); | ||
return deferParallel('', normalizeArgs(arguments)); | ||
} | ||
function prepareParallel(flowName, functions) { | ||
function deferParallel(flowName, functions) { | ||
function startParallel() { | ||
assert.ok(this instanceof ContextBase, 'A `parallel` must be inside a flow or another parallel.'); | ||
assert(this instanceof StepContext, 'A `parallel` must be inside a flow or another parallel.'); | ||
var self = this; | ||
var args = Array.prototype.slice.call(arguments); | ||
self.asyncEach(1)(functions, function (fn, group) { | ||
assert.equal(typeof fn, 'function', 'Each argument must be a function.'); | ||
var callback = group.async(); | ||
assert.equal(typeof fn, 'function', 'Each argument for `parallel` must be a function.'); | ||
var callback = group.async(As.ALL); | ||
var end = function end() { | ||
callback.apply(null, [this.err].concat(Array.prototype.slice.call(arguments))); | ||
this.err = null; | ||
if (this.err) { | ||
self.endWith(this.err); | ||
} else { | ||
callback.apply(null, arguments); | ||
} | ||
}; | ||
@@ -113,3 +161,2 @@ end.stepName = (fn.stepName || fn.name) + '_end'; | ||
}); | ||
process.nextTick(self.async(Async.SIGNAL_UNLOCK)); | ||
} | ||
@@ -126,11 +173,33 @@ startParallel.stepName = flowName; | ||
function normalizeArgs(args) { | ||
if (args.length === 1 && Array.isArray(args[0])) { | ||
return args[0]; | ||
return (args.length === 1 && Array.isArray(args[0])) ? args[0] : Array.prototype.slice.call(args); | ||
} | ||
function getLocation(target) { | ||
var originalPrepareStackTrace = Error.prepareStackTrace; | ||
var originalStackTraceLimit = Error.stackTraceLimit; | ||
Error.prepareStackTrace = prepareStackTrace; | ||
Error.stackTraceLimit = 1; | ||
var err = {}; | ||
Error.captureStackTrace(err, target); | ||
var stack = err.stack; | ||
Error.prepareStackTrace = originalPrepareStackTrace; | ||
Error.stackTraceLimit = originalStackTraceLimit; | ||
return { | ||
functionName: stack ? stack.getFunctionName() : '', | ||
fileName: stack ? stack.getFileName() : '', | ||
lineNumber: stack ? stack.getLineNumber() : -1, | ||
columnNumber: stack ? stack.getColumnNumber() : -1 | ||
}; | ||
function prepareStackTrace() { | ||
return arguments[1][0]; | ||
} | ||
return Array.prototype.slice.call(args); | ||
} | ||
function noop () {} | ||
function debug() { | ||
var args = Array.prototype.slice.call(arguments); | ||
args[0] = 'NUE: ' + args[0]; | ||
console.error.apply(console, args); | ||
} | ||
function As(index) { | ||
@@ -140,82 +209,9 @@ this.index = index; | ||
As.ALL = {description: 'all'}; | ||
function Async(lock, next, endWith) { | ||
this.lock = lock; | ||
this.next = next; | ||
this.endWith = endWith; | ||
this.index = 0; | ||
this.pending = 0; | ||
this.isCanceled = false; | ||
this.results = []; | ||
} | ||
Async.SIGNAL_UNLOCK = {}; | ||
Async.prototype.makeCallback = function makeCallback(mapping, loopIndex, groupAsyncIndex) { | ||
assert.ok(typeof mapping === 'object' || mapping === undefined, 'The argument `mapping` must be an object if specified.'); | ||
this.pending++; | ||
var isFirst = this.index === 0; | ||
var index = mapping === Async.SIGNAL_UNLOCK ? -1 : this.index++; | ||
if (isFirst && !this.lock) { | ||
this.lock = true; | ||
process.nextTick(this.makeCallback(Async.SIGNAL_UNLOCK)); | ||
} | ||
var self = this; | ||
var asyncCallback = function asyncCallback(err) { | ||
self.pending--; | ||
if (!self.isCanceled) { | ||
if (isErrorHandleRequire(mapping) && err) { | ||
self.isCanceled = true; | ||
self.endWith.call(null, err, index, mapping, loopIndex, groupAsyncIndex); | ||
} else { | ||
if (mapping === Async.SIGNAL_UNLOCK) { | ||
self.lock = false; | ||
} else { | ||
self.results[index] = mapping ? mapArguments(mapping, arguments) : Array.prototype.slice.call(arguments, 1); | ||
} | ||
if (self.pending === 0 && !self.lock) { | ||
self.next.apply(null, self.results); | ||
} | ||
} | ||
} | ||
}; | ||
asyncCallback.index = index; | ||
return asyncCallback; | ||
function isErrorHandleRequire(mapping) { | ||
if (!mapping) { | ||
return true; | ||
} | ||
if (mapping instanceof As) { | ||
return mapping.index !== 0; | ||
} | ||
return Object.keys(mapping).every(function (key) { | ||
var value = mapping[key]; | ||
if (value instanceof As) { | ||
return value.index !== 0; | ||
} | ||
return true; | ||
}); | ||
} | ||
function mapArguments(mapping, args) { | ||
if (mapping instanceof As) { | ||
return args[mapping.index]; | ||
} | ||
return Object.keys(mapping).reduce(function (result, key) { | ||
var value = mapping[key]; | ||
if (value instanceof As) { | ||
result[key] = args[value.index]; | ||
} else { | ||
result[key] = value; | ||
} | ||
return result; | ||
}, {}); | ||
} | ||
}; | ||
function ContextBase(flow, name, index, next, last) { | ||
function StepContext(flow, name, index, next, last) { | ||
this._flow = flow; | ||
this._next = next; | ||
this._last = last; | ||
this._async = new Async(false, this.next.bind(this), this.endWith.bind(this)); | ||
this.err = flow.err; | ||
@@ -226,34 +222,45 @@ this.data = flow.data; | ||
this.stepIndex = index; | ||
this.history = flow.history; | ||
this.history.push(new HistoryEntry(flow.flowName, name, index)); | ||
var self = this; | ||
this._asyncObj = new Async(false, this.next.bind(this), function endWith(err, asyncIndex, mapping) { | ||
self.endWith.call(self, new NueAsyncError(err, self.flowName, self.stepName, self.stepIndex, asyncIndex, mapping)); | ||
}); | ||
} | ||
ContextBase.DEFAULT_CONCURRENCY = 10; | ||
StepContext.DEFAULT_CONCURRENCY = 10; | ||
ContextBase.prototype._disable = function _disable() { | ||
this.next = noop; | ||
this.end = noop; | ||
this.endWith = noop; | ||
StepContext.noop = function noop() {}; | ||
StepContext.prototype._disable = function _disable() { | ||
this.next = StepContext.noop; | ||
this.end = StepContext.noop; | ||
this.endWith = StepContext.noop; | ||
}; | ||
ContextBase.prototype.async = function async(mapping) { | ||
return this._asyncObj.makeCallback.call(this._asyncObj, mapping); | ||
StepContext.prototype.next = function next() { | ||
this._disable(); | ||
this._flow.err = this.err; | ||
this._next.apply(null, arguments); | ||
}; | ||
ContextBase.prototype.asyncEach = function asyncEach() { | ||
StepContext.prototype.end = function end() { | ||
this._disable(); | ||
this._flow.err = this.err; | ||
this._last.apply(null, arguments); | ||
}; | ||
StepContext.prototype.endWith = function endWith(err) { | ||
this._disable(); | ||
this._flow.err = err; | ||
this._last.call(null); | ||
}; | ||
StepContext.prototype.async = function async(mapping) { | ||
return this._async.makeCallback.call(this._async, mapping, async); | ||
}; | ||
StepContext.prototype.asyncEach = function asyncEach() { | ||
var self = this; | ||
var callback = self.async(); | ||
var next = function next() { | ||
var args = Array.prototype.slice.call(arguments); | ||
return callback.apply(self, [null].concat(flattenArray(args))); | ||
var callback = self.async(As.ALL); | ||
var asyncObj = new Async(true, callback.bind(self), self.endWith.bind(self)); | ||
var group = { | ||
async: function async(mapping) { | ||
return asyncObj.makeCallback.call(asyncObj, mapping, async); | ||
} | ||
}; | ||
var endWith = function endWith(cause, asyncIndex, mapping, loopIndex, asyncGroupIndex) { | ||
var err = new NueGroupAsyncError(cause, self.flowName, self.stepName, self.stepIndex, callback.index, mapping, loopIndex, asyncGroupIndex); | ||
return self.endWith.call(self, err); | ||
}; | ||
var async = new Async(true, next, endWith); | ||
@@ -263,63 +270,40 @@ if (arguments.length === 1 && typeof arguments[0] === 'number') { | ||
return function () { | ||
validateAndConsume(concurrency, arguments[0], arguments[1]); | ||
validateAndStart(concurrency, arguments[0], arguments[1]); | ||
}; | ||
} else { | ||
validateAndConsume(ContextBase.DEFAULT_CONCURRENCY, arguments[0], arguments[1]); | ||
validateAndStart(StepContext.DEFAULT_CONCURRENCY, arguments[0], arguments[1]); | ||
} | ||
function validateAndConsume(concurrency, array, worker) { | ||
assert.ok(Array.isArray(array), 'An argument `array` must be an array.'); | ||
function validateAndStart(concurrency, array, worker) { | ||
assert(Array.isArray(array), 'An argument `array` must be an array.'); | ||
assert.equal(typeof worker, 'function', 'An argument `worker` must be a function.'); | ||
process.nextTick(function consumeFirst() { | ||
consume(concurrency, array, worker, 0); | ||
process.nextTick(function startEach() { | ||
each(concurrency, array, worker, 0); | ||
}); | ||
} | ||
function consume(concurrency, array, worker, index) { | ||
function each(concurrency, array, worker, index) { | ||
var len = array.length; | ||
for (var i = 0; i < concurrency && index < len; i++, index++) { | ||
(function callWorker(index) { | ||
var count = 0; | ||
var group = { | ||
async: function (mapping) { | ||
return async.makeCallback.call(async, mapping, index, count++); | ||
} | ||
}; | ||
worker.call(null, array[index], group, index, array); | ||
}(index)); | ||
worker.call(self, array[index], group, index, array); | ||
} | ||
if (index === len) { | ||
process.nextTick(async.makeCallback(Async.SIGNAL_UNLOCK)); | ||
process.nextTick(asyncObj.makeCallback(Async.SIGNAL_UNLOCK)); | ||
} else { | ||
process.nextTick(function consumeNext() { | ||
consume(concurrency, array, worker, index); | ||
process.nextTick(function nextEach() { | ||
each(concurrency, array, worker, index); | ||
}); | ||
} | ||
} | ||
function flattenArray(array) { | ||
var results = []; | ||
array.forEach(function (array2) { | ||
if (Array.isArray(array2)) { | ||
array2.forEach(function (e) { | ||
results.push(e); | ||
}); | ||
} else { | ||
results.push(array2); | ||
} | ||
}); | ||
return results; | ||
} | ||
}; | ||
ContextBase.prototype.exec = function exec(fn) { | ||
assert.ok(arguments.length > 1, 'Arguments length must be more than 1.'); | ||
StepContext.prototype.exec = function exec(fn) { | ||
assert(arguments.length > 1, 'Arguments length must be more than 1.'); | ||
assert.equal(typeof fn, 'function', 'The first argument must be a function.'); | ||
assert.equal(typeof arguments[arguments.length - 1], 'function', 'The last argument must be a function.'); | ||
var self = this; | ||
var callback = arguments[arguments.length - 1]; | ||
var args = Array.prototype.slice.call(arguments, 1, arguments.length - 1); | ||
var self = this; | ||
var end = function end() { | ||
callback.apply(null, [this.err].concat(Array.prototype.slice.call(arguments))); | ||
this.err = null; | ||
}; | ||
@@ -332,132 +316,101 @@ end.stepName = (fn.stepName || fn.name) + '_end'; | ||
function StepContext(flow, name, index, next, last) { | ||
ContextBase.call(this, flow, name, index, next, last); | ||
function Async(lock, next, endWith) { | ||
this.lock = lock; | ||
this.next = next; | ||
this.endWith = endWith; | ||
this.index = 0; | ||
this.pending = 0; | ||
this.isCanceled = false; | ||
this.results = []; | ||
} | ||
util.inherits(StepContext, ContextBase); | ||
StepContext.prototype.next = function next() { | ||
this._disable(); | ||
this._flow.err = null; | ||
this._next.apply(null, arguments); | ||
}; | ||
Async.SIGNAL_UNLOCK = {description: 'signal_unlock'}; | ||
StepContext.prototype.end = function end() { | ||
this._disable(); | ||
this._flow.err = null; | ||
this._last.apply(null, arguments); | ||
}; | ||
Async.prototype.makeCallback = function makeCallback(mapping, caller) { | ||
assert(typeof mapping === 'object' && mapping !== null, 'An argument `mapping` must be an object'); | ||
this.pending++; | ||
if (this.index === 0 && !this.lock) { | ||
this.lock = true; | ||
process.nextTick(this.makeCallback(Async.SIGNAL_UNLOCK)); | ||
} | ||
var index = mapping === Async.SIGNAL_UNLOCK ? -1 : this.index++; | ||
var location = getLocation(caller); | ||
var self = this; | ||
StepContext.prototype.endWith = function endWith(err) { | ||
this._disable(); | ||
this._flow.err = err; | ||
this._last.call(null); | ||
}; | ||
return function asyncCallback(err) { | ||
self.pending--; | ||
if (!self.isCanceled) { | ||
if (err && isErrorHandleRequired(mapping)) { | ||
self.isCanceled = true; | ||
self.endWith.call(null, makeAsyncError(err)); | ||
} else { | ||
if (mapping === Async.SIGNAL_UNLOCK) { | ||
self.lock = false; | ||
} else { | ||
self.results[index] = mapArguments(mapping, arguments); | ||
} | ||
if (self.pending === 0 && !self.lock) { | ||
self.next.apply(null, self.results); | ||
} | ||
} | ||
} | ||
}; | ||
function isErrorHandleRequired(mapping) { | ||
if (mapping === Async.SIGNAL_UNLOCK || mapping === As.ALL) { | ||
return false; | ||
} | ||
if (mapping instanceof As) { | ||
return mapping.index !== 0; | ||
} | ||
return Object.keys(mapping).every(function (key) { | ||
var value = mapping[key]; | ||
if (value instanceof As) { | ||
return value.index !== 0; | ||
} | ||
return true; | ||
}); | ||
} | ||
function LastStepContext(flow, name, index) { | ||
ContextBase.call(this, flow, name, index); | ||
} | ||
util.inherits(LastStepContext, ContextBase); | ||
function mapArguments(mapping, args) { | ||
if (mapping === As.ALL) { | ||
return Array.prototype.slice.call(args); | ||
} | ||
if (mapping instanceof As) { | ||
return args[mapping.index]; | ||
} | ||
return Object.keys(mapping).reduce(function (result, key) { | ||
var value = mapping[key]; | ||
if (value instanceof As) { | ||
result[key] = args[value.index]; | ||
} else { | ||
result[key] = value; | ||
} | ||
return result; | ||
}, {}); | ||
} | ||
LastStepContext.prototype.next = function next() { | ||
this._disable(); | ||
this._exit(Array.prototype.slice.call(arguments)); | ||
}; | ||
LastStepContext.prototype.end = function end() { | ||
this._disable(); | ||
this._exit(Array.prototype.slice.call(arguments)); | ||
}; | ||
LastStepContext.prototype.endWith = function endWith(err) { | ||
this._disable(); | ||
this.err = err; | ||
this._exit([]); | ||
}; | ||
LastStepContext.prototype._exit = function _exit(args) { | ||
if (this._flow.isTopLevel) { | ||
if (this.err) { | ||
throw new NueUnhandledError(this.err, this.flowName, this.stepName); | ||
} | ||
} else { | ||
if (this.err) { | ||
this._flow.context.endWith.call(this._flow.context, this.err); | ||
function makeAsyncError(err) { | ||
var log = { | ||
'function': location.functionName, | ||
location: util.format('%s:%d:%d', location.fileName, location.lineNumber, location.columnNumber), | ||
mapping: mapping | ||
}; | ||
var cause; | ||
var history; | ||
if (err.name === 'NueAsyncError') { | ||
cause = err.cause; | ||
history = err.asyncCallHistory.concat([log]); | ||
} else { | ||
this._flow.context.next.apply(this._flow.context, args); | ||
cause = err; | ||
history = [log]; | ||
} | ||
var e = new Error('An error occurred in an async call.'); | ||
e.name = 'NueAsyncError'; | ||
e.cause = cause; | ||
e.asyncCallHistory = history; | ||
e.message += util.format('\ncause stack is ...\n %s\nasync call history is ...\n', | ||
e.cause.stack, history); | ||
return e; | ||
} | ||
}; | ||
function HistoryEntry(flowName, stepName, stepIndex) { | ||
this.flowName = flowName; | ||
this.stepName = stepName; | ||
this.stepIndex = stepIndex; | ||
} | ||
HistoryEntry.prototype.toString = function toString() { | ||
var flowName = this.flowName || '<anonymous>'; | ||
var stepName = this.stepName || '<anonymous>'; | ||
return flowName + '[' + this.stepIndex + ']:' + stepName; | ||
}; | ||
function NueAsyncError(cause, flowName, stepName, stepIndex, asyncIndex, mapping) { | ||
this.cause = cause; | ||
this.flowName = flowName; | ||
this.stepName = stepName; | ||
this.stepIndex = stepIndex; | ||
this.asyncIndex = asyncIndex; | ||
this.name = 'NueAsyncError'; | ||
this.message = "An error in an async callback:" + | ||
"\nflowName = '" + flowName + "'," + | ||
"\nstepName = '" + stepName + "'," + | ||
'\nstepIndex = ' + stepIndex + ', ' + | ||
'\nasyncIndex = ' + asyncIndex + ', ' + | ||
'\nmapping = ' + util.inspect(mapping, false, null) + | ||
'\n+----- BEGIN CAUSE STACK -----+\n' + indent(cause.stack) + '\n+----- END CAUSE STACK -----+'; | ||
Error.captureStackTrace(this, NueAsyncError); | ||
} | ||
util.inherits(NueAsyncError, Error); | ||
function NueGroupAsyncError(cause, flowName, stepName, stepIndex, asyncIndex, mapping, loopIndex, groupAsyncIndex) { | ||
this.cause = cause; | ||
this.flowName = flowName; | ||
this.stepName = stepName; | ||
this.stepIndex = stepIndex; | ||
this.asyncIndex = asyncIndex; | ||
this.loopIndex = loopIndex; | ||
this.groupAsyncIndex = groupAsyncIndex; | ||
this.name = 'NueGroupAsyncError'; | ||
this.message = "An error in an async callback:" + | ||
"\nflowName = '" + flowName + "'," + | ||
"\nstepName = '" + stepName + "'," + | ||
'\nstepIndex = ' + stepIndex + ', ' + | ||
'\nasyncIndex = ' + asyncIndex + ', ' + | ||
'\nloopIndex = ' + loopIndex + ', ' + | ||
'\ngroupAsyncIndex = ' + groupAsyncIndex + ', ' + | ||
'\nmapping = ' + util.inspect(mapping, false, null) + | ||
'\n+----- BEGIN CAUSE STACK -----+\n' + indent(cause.stack) + '\n+----- END CAUSE STACK -----+'; | ||
Error.captureStackTrace(this, NueGroupAsyncError); | ||
} | ||
util.inherits(NueGroupAsyncError, Error); | ||
function NueUnhandledError(cause, flowName, stepName) { | ||
this.cause = cause; | ||
this.name = 'NueUnhandledError'; | ||
this.message = 'The error must be handled in a last step of flow. ' + | ||
'To indicate error handling completed, set null to `this.err` before exiting the last step:' + | ||
"\nflowName = '" + flowName + "'," + | ||
"\nstepName = '" + stepName + "'" + | ||
'\n+----- BEGIN CAUSE STACK -----+\n' + indent(cause.stack) + '\n+----- END CAUSE STACK -----+'; | ||
Error.captureStackTrace(this, NueUnhandledError); | ||
} | ||
util.inherits(NueUnhandledError, Error); | ||
function indent(text) { | ||
return ' ' + text.replace(/\n/g, '\n '); | ||
} |
@@ -12,3 +12,3 @@ { | ||
}, | ||
"version" : "0.5.0" | ||
"version" : "0.6.0" | ||
} |
199
README.md
@@ -40,96 +40,133 @@ nue — An async control-flow library | ||
### flow([Function steps...]) -> Function | ||
### Top Level API | ||
The `nue` module provides following API. | ||
#### flow([Function steps...]) -> Function | ||
Return a function which represents the control-flow. | ||
> Arguments | ||
* `steps`: Optional. Optional functions to execute in series. | ||
> Context | ||
#### flow(String flowName) -> Function | ||
`this` context of each step in a flow has following properties. | ||
Accept a flow name and return another `flow` function. | ||
* `next`: next([Object values...]) -> Void | ||
* A function to execute a next step immediately. | ||
* `flowName`: Required. A flow name to be used for debug. | ||
* `async`: async([Object mapping]) -> Function | ||
* A function to accept an argument mapping definition for a next step and return a callback. | ||
#### parallel([Function steps...]) -> Function | ||
Return a function which represents the parallel control-flow. | ||
The `parallel` must be nested inside a `flow` or another `parallel`. | ||
* `steps`: Optional. Optional functions to execute in parallel. | ||
#### parallel(String flowName) -> Function | ||
Accept a flow name and return another `parallel` function. | ||
* `flowName`: Required. A flow name to be used for debug. | ||
#### as(Number index) -> Object | ||
* `index`: Required. An index to map an asynchronous callback argument to a next step argument. | ||
If the index is zero, an error handling is skipped. | ||
### Step Context API | ||
`flow` and `parallel` API accept functions called `step`s. Each step context object - it means a `this` object in the step function - provides following API. | ||
#### next([Object values...]) -> Void | ||
A function to execute a next step immediately. | ||
* `values`: Optional. Arguments for a next step. | ||
#### async([Object mapping]) -> Function | ||
A function to accept an argument mapping definition for a next step and return a callback. | ||
`async` can be called many times, but all calls are done in same tick. | ||
And all callbacks `async` returns must be called absolutely. | ||
And all callbacks `async` returns must be called. | ||
* `asyncEach`: asyncEach(Array array, Function(element, group, elementIndex, traversedArray)) -> Void | ||
* A function to execute a provided function once per array element asynchronously. | ||
The `group` has a `async` function to accept an argument mapping definition and return a callback. | ||
* `mapping`: Required. An argument mapping definition. | ||
To map single argument, call `as` API and pass its result. | ||
* `asyncEach`: asyncEach(Number concurrency) -> Function | ||
* A function to accept a concurrency number and return another `asyncEach` function which | ||
executes a provided function once per array element asynchronously with the specified cuncurrency. | ||
If you use another `forEach` function directly, default concurrency 10 is used. | ||
```js | ||
fs.readFile('file1', 'utf8', this.async(as(1))); | ||
``` | ||
* `exec`: exec(Function function([values...]), [Object args...], Function callback(err, [values...])) -> Void | ||
* A function to execute a specified `function` with `args` asynchronously. The `callback` are executed when the `function` is completed. | ||
To map multiple arguments, pass an object. | ||
* `end`: end([Object values...]) -> Void | ||
* A function to execute a last step immediately to end a control-flow. | ||
```js | ||
child_process.exec('whoami', this.async({stdout: as(1), stderr: as(2)})); | ||
``` | ||
* `endWith`: endWith(Error err) -> Void | ||
* A function to execute a last step immediately with an error to end a control-flow. | ||
The parameter `err` is referred as `this.err` in a last step. | ||
#### asyncEach(Array array, Function callback(element, group, index, traversedArray)) -> Void | ||
* `data`: Object | ||
* A object to share arbitrary data between steps in a control-flow. | ||
A function to execute a provided function once per array element asynchronously. | ||
* `flowName`: String | ||
* A flow name. | ||
* `array`: Required. An array. | ||
* `callback`: Required. A function being executed once per array element. | ||
The context object in the callback is same with outer step context object. | ||
* `element`: Required. A current element. | ||
* `group`: Required. Provedes `async` function to accept an argument mapping definition and return a callback. | ||
* `index`: Required. An element index. | ||
* `traversedArray`: Required. An array object being traversed. | ||
* `stepName`: String | ||
* A step name. | ||
#### asyncEach(Number concurrency) -> Function | ||
* `history`: Array | ||
* An array to contain information about executed steps. This is an EXPERIMETAL FEATURE. | ||
A function to accept a concurrency number and return another `asyncEach` function which | ||
executes a provided function once per array element asynchronously with the specified cuncurrency. | ||
If you use another `forEach` function directly, default concurrency 10 is used. | ||
In addition to above ones, the context of a last step has a following property. | ||
* `concurrency`: Required. the number of concurrency. | ||
* `err`: Object | ||
* An object represents an error which is thrown with `throw`, passed to `this.endWith` or | ||
passed to an async callback as first argument. | ||
#### exec(Function function, [Object args...], Function callback(err, [values...])) -> Void | ||
### flow(String flowName) -> Function | ||
A function to execute a specified `function` with `args` asynchronously. | ||
Accept a flow name and return another `flow` function. | ||
* `function`: Required. A function to be executed asynchronously. | ||
* `args`: Optional. Arguments for the `function`. | ||
* `callback`: Required. A function to be executed when the `function` is completed. | ||
* `err`: Required. An error in an async call. | ||
* `values`: Required. Results from the `function`. | ||
> Arguments | ||
#### end([Object values...]) -> Void | ||
* `flowName`: Required. A flow name to be used for debug. | ||
A function to execute a last step immediately to end a control-flow. | ||
### parallel([Function steps...]) -> Function | ||
* `values`: Optional. Arguments for a last step. | ||
Return a function which represents the parallel control-flow. | ||
The `parallel` must be nested inside a `flow` or another `parallel`. | ||
#### endWith(Error err) -> Void | ||
> Arguments | ||
A function to execute a last step immediately with an error to end a control-flow. | ||
* `steps`: Optional. Optional functions to execute in parallel. | ||
* `err`: Required. An error object. This object can be referred as `this.err` in a last step. | ||
### parallel(String flowName) -> Function | ||
#### data : Object | ||
Accept a flow name and return another `parallel` function. | ||
A object to share arbitrary data between steps in a control-flow. | ||
> Arguments | ||
#### flowName : String | ||
* `flowName`: Required. A flow name to be used for debug. | ||
A flow name. | ||
### as(Number index) -> Object | ||
#### stepName : String | ||
> Arguments | ||
A step name. | ||
* `index`: Required. An index to map an asynchronous callback argument to a next step argument. | ||
If the index is zero, an error handling is skipped. | ||
#### err : Object | ||
## Arguments Passing Between Functions | ||
An error object, which is thrown with `throw`, passed to `this.endWith` or passed to an async callback as first argument. | ||
This property is accessible in only last steps. | ||
## More Examples | ||
### Arguments Passing Between Functions | ||
Arguments are passed with `this.next` or `this.async`. | ||
### Synchronously | ||
#### Synchronously | ||
@@ -146,3 +183,3 @@ ```js | ||
if (this.err) throw this.err; | ||
console.log(s1 + ' + ' + s2 + ' -> ' + length); // file1 + file2 -> 10 | ||
console.log(s1 + '.length + ' + s2 + '.length -> ' + length); // file1.length + file2.length -> 10 | ||
console.log('done'); | ||
@@ -156,3 +193,3 @@ this.next(); | ||
### Asynchronously | ||
#### Asynchronously | ||
@@ -209,3 +246,3 @@ To pass asynchronous call results to a next function, arguments mapping definition is necessary. | ||
## Asynchronous Loop | ||
### Asynchronous Loop | ||
@@ -249,3 +286,3 @@ `this.asyncEach` executes a provided function once per array element asynchronously. | ||
## Flow Nesting | ||
### Flow Nesting | ||
@@ -281,3 +318,3 @@ A flow is composable. So it can be nested. | ||
## Asynchronous Flow Execution | ||
### Asynchronous Flow Execution | ||
@@ -313,3 +350,3 @@ A flow can be executed asynchronously. | ||
## Parallel Flow | ||
### Parallel Flow | ||
@@ -378,3 +415,3 @@ In following example, the flow `par1-1` and `par1-2` are executed in parallel. | ||
## Data Sharing Between Functions | ||
### Data Sharing Between Functions | ||
@@ -411,3 +448,3 @@ Each step in a flow can share data through `this.data`. | ||
## Error Handling | ||
### Error Handling | ||
@@ -424,4 +461,2 @@ In a last step in a flow, `this.err` represents an error which is thrown with `throw`, passed to `this.endWith` or passed to an async callback as first argument. | ||
function readFiles(file1, file2) { | ||
if (!file1) this.endWith(new Error('file1 is illegal.')); | ||
if (!file2) this.endWith(new Error('file2 is illegal.')); | ||
fs.readFile(file1, 'utf8', this.async(as(1))); | ||
@@ -450,3 +485,3 @@ fs.readFile(file2, 'utf8', this.async(as(1))); | ||
## Unit Test with Mocha | ||
### Unit Test with Mocha | ||
@@ -502,1 +537,33 @@ Following example shows how to test a flow and a function with [Mocha](http://visionmedia.github.com/mocha/). | ||
``` | ||
## Debugging | ||
Use `NODE_DEBUG=nue`. | ||
### Example | ||
> hoge.js | ||
```js | ||
var flow = require('nue').flow; | ||
flow('hoge')( | ||
function add(x, y) { | ||
this.next(x + y); | ||
}, | ||
function done(result) { | ||
if (this.err) throw this.err; | ||
console.log(result); | ||
} | ||
)(10, 20); | ||
``` | ||
> Run and Output | ||
```sh | ||
$ NODE_DEBUG=nue node hoge.js | ||
NUE: begin TOP_LEVEL_FLOW. flow: hoge(0), calledAt: /private/tmp/hoge.js:11:1, args: [ 10, 20 ] | ||
NUE: begin STEP. flow: hoge(0), step: add(0), args: [ 10, 20 ] | ||
NUE: begin STEP. flow: hoge(0), step: done(1), args: [ 30 ] | ||
30 | ||
``` |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Environment variable access
Supply chain riskPackage accesses environment variables, which may be a sign of credential stuffing or data theft.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
38026
557
687
11