Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

nue

Package Overview
Dependencies
Maintainers
1
Versions
17
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

nue - npm Package Compare versions

Comparing version 0.5.0 to 0.6.0

7

CHANGELOG.md
# Changelog
- 0.6.0 (2012/03/10)
- New Feature - debugging is supported. `NODE_DEBUG=nue` is available.
- Change - NueAsyncError is improved. Now NueAsyncError is a plain Error object.
- Change - `history` property is removed from the step context.
- Change - result array from `this.asyncEach` is not flatten.
- Change - result array from `nue.parallel` is not flatten.
- 0.5.0 (2012/03/04)

@@ -4,0 +11,0 @@ - New Feature - `nue.as` function is introduced to map asynchronous callback arguments to next function ones.

2

examples/argPassingSync.js

@@ -10,3 +10,3 @@ var flow = require('../index').flow;

if (this.err) throw this.err;
console.log(s1 + ' + ' + s2 + ' -> ' + length); // file1 + file2 -> 10
console.log(s1 + '.length + ' + s2 + '.length -> ' + length); // file1.length + file2.length -> 10
console.log('done');

@@ -13,0 +13,0 @@ this.next();

@@ -7,4 +7,2 @@ var flow = require('../index').flow;

function readFiles(file1, file2) {
if (!file1) this.endWith(new Error('file1 is illegal.'));
if (!file2) this.endWith(new Error('file2 is illegal.'));
fs.readFile(file1, 'utf8', this.async(as(1)));

@@ -11,0 +9,0 @@ fs.readFile(file2, 'utf8', this.async(as(1)));

@@ -5,19 +5,43 @@ var flow = require('../index').flow;

var myFlow = flow('main')(
function one() { this.next(); },
function two() { this.next(); },
function one() {
console.log(this.stepName);
this.next();
},
function two() {
console.log(this.stepName);
this.next();
},
parallel('par1')(
flow('par1-1')(
function three() { this.next(); },
function four() { this.next(); }
function three() {
console.log(this.stepName);
this.next();
},
function four() {
console.log(this.stepName);
this.next();
}
),
flow('par1-2')(
function five() { this.next(); },
function six() { this.next(); }
function five() {
console.log(this.stepName);
this.next();
},
function six() {
console.log(this.stepName);
this.next();
}
)
),
function seven() { this.next(); },
function eight() { this.next(); },
function seven() {
console.log(this.stepName);
this.next();
},
function eight() {
console.log(this.stepName);
this.next();
},
function allDone() {
if (this.err) throw this.err;
console.log(this.history);
console.log(this.stepName);
this.next();

@@ -24,0 +48,0 @@ }

var flow = require('../index').flow;
var as = require('../index').as;
function sleep(ms) {
setTimeout(this.async(), ms);
setTimeout(this.async(as(-1)), ms);
}

@@ -10,5 +11,6 @@

console.log('wait... ' + new Date());
this.exec(sleep, 1000, this.async());
this.exec(sleep, 1000, this.async(as(1)));
},
function end() {
if (this.err) throw this.err;
console.log('ok... ' + new Date());

@@ -15,0 +17,0 @@ this.next();

'use strict';
exports.name = 'nue';
exports.version = '0.5.0';
exports.version = '0.6.0';
exports.flow = flow;

@@ -9,7 +9,7 @@ exports.parallel = parallel;

var util = require('util');
var assert = require('assert');
var debugMode = process.env.NODE_DEBUG && /\bnue\b/.test(process.env.NODE_DEBUG);
var flowId = 0;
function flow() {

@@ -19,16 +19,15 @@ if (arguments.length === 1 && typeof arguments[0] === 'string') {

return function () {
return prepareFlow(flowName, normalizeArgs(arguments));
return deferFlow(flowName, normalizeArgs(arguments));
};
} else {
return prepareFlow('', normalizeArgs(arguments));
return deferFlow('', normalizeArgs(arguments));
}
function prepareFlow(flowName, functions) {
function deferFlow(flowName, functions) {
function startFlow() {
var isTopLevel = !(this instanceof ContextBase);
var flow = {
flowName: flowName,
flowId: flowId++,
context: this,
isTopLevel: isTopLevel,
history: isTopLevel ? [] : this.history,
isTopLevel: !(this instanceof StepContext),
data: {},

@@ -38,2 +37,9 @@ err: null

var steps = functions.length > 0 ? functions : [function () { this.next(); }];
if (debugMode && flow.isTopLevel) {
var location = getLocation(startFlow);
debug('begin TOP_LEVEL_FLOW. flow: %s(%d), calledAt: %s:%d:%d, args:',
flow.flowName || '<anonymous>', flow.flowId,
location.fileName, location.lineNumber, location.columnNumber,
Array.prototype.slice.call(arguments));
}
var head = chainSteps(flow, steps);

@@ -55,8 +61,9 @@ head.apply(this, arguments);

return steps.reduceRight(function chain(prev, curr, i) {
return steps.reduceRight(function chain(prev, curr, index) {
assert.equal(typeof curr, 'function', 'Each argument for `flow` must be a function.');
return function step() {
var next = i === len - 2 ? last : prev;
var context = new StepContext(flow, curr.stepName || curr.name, i, next, last);
var next = index === len - 2 ? last : prev;
var context = new StepContext(flow, curr.stepName || curr.name, index, next, last);
try {
curr.apply(context, arguments);
runStep(curr, context, arguments);
} catch (e) {

@@ -68,10 +75,11 @@ if (isThrown) {

}
}
};
});
function makeLastStep(fn, index) {
assert.equal(typeof fn, 'function', 'Each argument for `flow` must be a function.');
return function lastStep() {
var context = new LastStepContext(flow, fn.stepName || fn.name, index);
var context = new StepContext(flow, fn.stepName || fn.name, index, exit, exit);
try {
fn.apply(context, arguments);
runStep(fn, context, arguments, true);
} catch (e) {

@@ -81,4 +89,41 @@ isThrown = true;

}
};
function exit() {
if (flow.isTopLevel) {
if (flow.err) {
var message = 'An error must be handled in a last step of its flow. ' +
'To indicate error handling completed, set null to `this.err` before exiting the last step. ' +
'cause: ' + util.inspect(flow.err);
var err = new Error(message);
err.name = 'NueUnhandledError';
throw err;
}
} else {
if (flow.err) {
flow.context.endWith.call(flow.context, flow.err);
} else {
flow.context.next.apply(flow.context, arguments);
}
}
}
}
function runStep(step, context, args) {
if (debugMode) {
debug('begin STEP. flow: %s(%d), step: %s(%d), args:',
context.flowName || '<anonymous>', context._flow.flowId,
context.stepName || '<anonymous>', context.stepIndex, Array.prototype.slice.call(args));
}
try {
step.apply(context, args);
} catch (e) {
if (debugMode) {
debug('error STEP. flow: , step: %s(%d), err:',
context.flowName || '<anonymous>', context._flow.flowId,
context.stepName || '<anonymous>', context.stepIndex, e);
}
throw e;
}
}
}

@@ -91,19 +136,22 @@ }

return function () {
return prepareParallel(flowName, normalizeArgs(arguments));
return deferParallel(flowName, normalizeArgs(arguments));
};
} else {
return prepareParallel('', normalizeArgs(arguments));
return deferParallel('', normalizeArgs(arguments));
}
function prepareParallel(flowName, functions) {
function deferParallel(flowName, functions) {
function startParallel() {
assert.ok(this instanceof ContextBase, 'A `parallel` must be inside a flow or another parallel.');
assert(this instanceof StepContext, 'A `parallel` must be inside a flow or another parallel.');
var self = this;
var args = Array.prototype.slice.call(arguments);
self.asyncEach(1)(functions, function (fn, group) {
assert.equal(typeof fn, 'function', 'Each argument must be a function.');
var callback = group.async();
assert.equal(typeof fn, 'function', 'Each argument for `parallel` must be a function.');
var callback = group.async(As.ALL);
var end = function end() {
callback.apply(null, [this.err].concat(Array.prototype.slice.call(arguments)));
this.err = null;
if (this.err) {
self.endWith(this.err);
} else {
callback.apply(null, arguments);
}
};

@@ -113,3 +161,2 @@ end.stepName = (fn.stepName || fn.name) + '_end';

});
process.nextTick(self.async(Async.SIGNAL_UNLOCK));
}

@@ -126,11 +173,33 @@ startParallel.stepName = flowName;

function normalizeArgs(args) {
if (args.length === 1 && Array.isArray(args[0])) {
return args[0];
return (args.length === 1 && Array.isArray(args[0])) ? args[0] : Array.prototype.slice.call(args);
}
function getLocation(target) {
var originalPrepareStackTrace = Error.prepareStackTrace;
var originalStackTraceLimit = Error.stackTraceLimit;
Error.prepareStackTrace = prepareStackTrace;
Error.stackTraceLimit = 1;
var err = {};
Error.captureStackTrace(err, target);
var stack = err.stack;
Error.prepareStackTrace = originalPrepareStackTrace;
Error.stackTraceLimit = originalStackTraceLimit;
return {
functionName: stack ? stack.getFunctionName() : '',
fileName: stack ? stack.getFileName() : '',
lineNumber: stack ? stack.getLineNumber() : -1,
columnNumber: stack ? stack.getColumnNumber() : -1
};
function prepareStackTrace() {
return arguments[1][0];
}
return Array.prototype.slice.call(args);
}
function noop () {}
function debug() {
var args = Array.prototype.slice.call(arguments);
args[0] = 'NUE: ' + args[0];
console.error.apply(console, args);
}
function As(index) {

@@ -140,82 +209,9 @@ this.index = index;

As.ALL = {description: 'all'};
function Async(lock, next, endWith) {
this.lock = lock;
this.next = next;
this.endWith = endWith;
this.index = 0;
this.pending = 0;
this.isCanceled = false;
this.results = [];
}
Async.SIGNAL_UNLOCK = {};
Async.prototype.makeCallback = function makeCallback(mapping, loopIndex, groupAsyncIndex) {
assert.ok(typeof mapping === 'object' || mapping === undefined, 'The argument `mapping` must be an object if specified.');
this.pending++;
var isFirst = this.index === 0;
var index = mapping === Async.SIGNAL_UNLOCK ? -1 : this.index++;
if (isFirst && !this.lock) {
this.lock = true;
process.nextTick(this.makeCallback(Async.SIGNAL_UNLOCK));
}
var self = this;
var asyncCallback = function asyncCallback(err) {
self.pending--;
if (!self.isCanceled) {
if (isErrorHandleRequire(mapping) && err) {
self.isCanceled = true;
self.endWith.call(null, err, index, mapping, loopIndex, groupAsyncIndex);
} else {
if (mapping === Async.SIGNAL_UNLOCK) {
self.lock = false;
} else {
self.results[index] = mapping ? mapArguments(mapping, arguments) : Array.prototype.slice.call(arguments, 1);
}
if (self.pending === 0 && !self.lock) {
self.next.apply(null, self.results);
}
}
}
};
asyncCallback.index = index;
return asyncCallback;
function isErrorHandleRequire(mapping) {
if (!mapping) {
return true;
}
if (mapping instanceof As) {
return mapping.index !== 0;
}
return Object.keys(mapping).every(function (key) {
var value = mapping[key];
if (value instanceof As) {
return value.index !== 0;
}
return true;
});
}
function mapArguments(mapping, args) {
if (mapping instanceof As) {
return args[mapping.index];
}
return Object.keys(mapping).reduce(function (result, key) {
var value = mapping[key];
if (value instanceof As) {
result[key] = args[value.index];
} else {
result[key] = value;
}
return result;
}, {});
}
};
function ContextBase(flow, name, index, next, last) {
function StepContext(flow, name, index, next, last) {
this._flow = flow;
this._next = next;
this._last = last;
this._async = new Async(false, this.next.bind(this), this.endWith.bind(this));
this.err = flow.err;

@@ -226,34 +222,45 @@ this.data = flow.data;

this.stepIndex = index;
this.history = flow.history;
this.history.push(new HistoryEntry(flow.flowName, name, index));
var self = this;
this._asyncObj = new Async(false, this.next.bind(this), function endWith(err, asyncIndex, mapping) {
self.endWith.call(self, new NueAsyncError(err, self.flowName, self.stepName, self.stepIndex, asyncIndex, mapping));
});
}
ContextBase.DEFAULT_CONCURRENCY = 10;
StepContext.DEFAULT_CONCURRENCY = 10;
ContextBase.prototype._disable = function _disable() {
this.next = noop;
this.end = noop;
this.endWith = noop;
StepContext.noop = function noop() {};
StepContext.prototype._disable = function _disable() {
this.next = StepContext.noop;
this.end = StepContext.noop;
this.endWith = StepContext.noop;
};
ContextBase.prototype.async = function async(mapping) {
return this._asyncObj.makeCallback.call(this._asyncObj, mapping);
StepContext.prototype.next = function next() {
this._disable();
this._flow.err = this.err;
this._next.apply(null, arguments);
};
ContextBase.prototype.asyncEach = function asyncEach() {
StepContext.prototype.end = function end() {
this._disable();
this._flow.err = this.err;
this._last.apply(null, arguments);
};
StepContext.prototype.endWith = function endWith(err) {
this._disable();
this._flow.err = err;
this._last.call(null);
};
StepContext.prototype.async = function async(mapping) {
return this._async.makeCallback.call(this._async, mapping, async);
};
StepContext.prototype.asyncEach = function asyncEach() {
var self = this;
var callback = self.async();
var next = function next() {
var args = Array.prototype.slice.call(arguments);
return callback.apply(self, [null].concat(flattenArray(args)));
var callback = self.async(As.ALL);
var asyncObj = new Async(true, callback.bind(self), self.endWith.bind(self));
var group = {
async: function async(mapping) {
return asyncObj.makeCallback.call(asyncObj, mapping, async);
}
};
var endWith = function endWith(cause, asyncIndex, mapping, loopIndex, asyncGroupIndex) {
var err = new NueGroupAsyncError(cause, self.flowName, self.stepName, self.stepIndex, callback.index, mapping, loopIndex, asyncGroupIndex);
return self.endWith.call(self, err);
};
var async = new Async(true, next, endWith);

@@ -263,63 +270,40 @@ if (arguments.length === 1 && typeof arguments[0] === 'number') {

return function () {
validateAndConsume(concurrency, arguments[0], arguments[1]);
validateAndStart(concurrency, arguments[0], arguments[1]);
};
} else {
validateAndConsume(ContextBase.DEFAULT_CONCURRENCY, arguments[0], arguments[1]);
validateAndStart(StepContext.DEFAULT_CONCURRENCY, arguments[0], arguments[1]);
}
function validateAndConsume(concurrency, array, worker) {
assert.ok(Array.isArray(array), 'An argument `array` must be an array.');
function validateAndStart(concurrency, array, worker) {
assert(Array.isArray(array), 'An argument `array` must be an array.');
assert.equal(typeof worker, 'function', 'An argument `worker` must be a function.');
process.nextTick(function consumeFirst() {
consume(concurrency, array, worker, 0);
process.nextTick(function startEach() {
each(concurrency, array, worker, 0);
});
}
function consume(concurrency, array, worker, index) {
function each(concurrency, array, worker, index) {
var len = array.length;
for (var i = 0; i < concurrency && index < len; i++, index++) {
(function callWorker(index) {
var count = 0;
var group = {
async: function (mapping) {
return async.makeCallback.call(async, mapping, index, count++);
}
};
worker.call(null, array[index], group, index, array);
}(index));
worker.call(self, array[index], group, index, array);
}
if (index === len) {
process.nextTick(async.makeCallback(Async.SIGNAL_UNLOCK));
process.nextTick(asyncObj.makeCallback(Async.SIGNAL_UNLOCK));
} else {
process.nextTick(function consumeNext() {
consume(concurrency, array, worker, index);
process.nextTick(function nextEach() {
each(concurrency, array, worker, index);
});
}
}
function flattenArray(array) {
var results = [];
array.forEach(function (array2) {
if (Array.isArray(array2)) {
array2.forEach(function (e) {
results.push(e);
});
} else {
results.push(array2);
}
});
return results;
}
};
ContextBase.prototype.exec = function exec(fn) {
assert.ok(arguments.length > 1, 'Arguments length must be more than 1.');
StepContext.prototype.exec = function exec(fn) {
assert(arguments.length > 1, 'Arguments length must be more than 1.');
assert.equal(typeof fn, 'function', 'The first argument must be a function.');
assert.equal(typeof arguments[arguments.length - 1], 'function', 'The last argument must be a function.');
var self = this;
var callback = arguments[arguments.length - 1];
var args = Array.prototype.slice.call(arguments, 1, arguments.length - 1);
var self = this;
var end = function end() {
callback.apply(null, [this.err].concat(Array.prototype.slice.call(arguments)));
this.err = null;
};

@@ -332,132 +316,101 @@ end.stepName = (fn.stepName || fn.name) + '_end';

function StepContext(flow, name, index, next, last) {
ContextBase.call(this, flow, name, index, next, last);
function Async(lock, next, endWith) {
this.lock = lock;
this.next = next;
this.endWith = endWith;
this.index = 0;
this.pending = 0;
this.isCanceled = false;
this.results = [];
}
util.inherits(StepContext, ContextBase);
StepContext.prototype.next = function next() {
this._disable();
this._flow.err = null;
this._next.apply(null, arguments);
};
Async.SIGNAL_UNLOCK = {description: 'signal_unlock'};
StepContext.prototype.end = function end() {
this._disable();
this._flow.err = null;
this._last.apply(null, arguments);
};
Async.prototype.makeCallback = function makeCallback(mapping, caller) {
assert(typeof mapping === 'object' && mapping !== null, 'An argument `mapping` must be an object');
this.pending++;
if (this.index === 0 && !this.lock) {
this.lock = true;
process.nextTick(this.makeCallback(Async.SIGNAL_UNLOCK));
}
var index = mapping === Async.SIGNAL_UNLOCK ? -1 : this.index++;
var location = getLocation(caller);
var self = this;
StepContext.prototype.endWith = function endWith(err) {
this._disable();
this._flow.err = err;
this._last.call(null);
};
return function asyncCallback(err) {
self.pending--;
if (!self.isCanceled) {
if (err && isErrorHandleRequired(mapping)) {
self.isCanceled = true;
self.endWith.call(null, makeAsyncError(err));
} else {
if (mapping === Async.SIGNAL_UNLOCK) {
self.lock = false;
} else {
self.results[index] = mapArguments(mapping, arguments);
}
if (self.pending === 0 && !self.lock) {
self.next.apply(null, self.results);
}
}
}
};
function isErrorHandleRequired(mapping) {
if (mapping === Async.SIGNAL_UNLOCK || mapping === As.ALL) {
return false;
}
if (mapping instanceof As) {
return mapping.index !== 0;
}
return Object.keys(mapping).every(function (key) {
var value = mapping[key];
if (value instanceof As) {
return value.index !== 0;
}
return true;
});
}
function LastStepContext(flow, name, index) {
ContextBase.call(this, flow, name, index);
}
util.inherits(LastStepContext, ContextBase);
function mapArguments(mapping, args) {
if (mapping === As.ALL) {
return Array.prototype.slice.call(args);
}
if (mapping instanceof As) {
return args[mapping.index];
}
return Object.keys(mapping).reduce(function (result, key) {
var value = mapping[key];
if (value instanceof As) {
result[key] = args[value.index];
} else {
result[key] = value;
}
return result;
}, {});
}
LastStepContext.prototype.next = function next() {
this._disable();
this._exit(Array.prototype.slice.call(arguments));
};
LastStepContext.prototype.end = function end() {
this._disable();
this._exit(Array.prototype.slice.call(arguments));
};
LastStepContext.prototype.endWith = function endWith(err) {
this._disable();
this.err = err;
this._exit([]);
};
LastStepContext.prototype._exit = function _exit(args) {
if (this._flow.isTopLevel) {
if (this.err) {
throw new NueUnhandledError(this.err, this.flowName, this.stepName);
}
} else {
if (this.err) {
this._flow.context.endWith.call(this._flow.context, this.err);
function makeAsyncError(err) {
var log = {
'function': location.functionName,
location: util.format('%s:%d:%d', location.fileName, location.lineNumber, location.columnNumber),
mapping: mapping
};
var cause;
var history;
if (err.name === 'NueAsyncError') {
cause = err.cause;
history = err.asyncCallHistory.concat([log]);
} else {
this._flow.context.next.apply(this._flow.context, args);
cause = err;
history = [log];
}
var e = new Error('An error occurred in an async call.');
e.name = 'NueAsyncError';
e.cause = cause;
e.asyncCallHistory = history;
e.message += util.format('\ncause stack is ...\n %s\nasync call history is ...\n',
e.cause.stack, history);
return e;
}
};
function HistoryEntry(flowName, stepName, stepIndex) {
this.flowName = flowName;
this.stepName = stepName;
this.stepIndex = stepIndex;
}
HistoryEntry.prototype.toString = function toString() {
var flowName = this.flowName || '<anonymous>';
var stepName = this.stepName || '<anonymous>';
return flowName + '[' + this.stepIndex + ']:' + stepName;
};
function NueAsyncError(cause, flowName, stepName, stepIndex, asyncIndex, mapping) {
this.cause = cause;
this.flowName = flowName;
this.stepName = stepName;
this.stepIndex = stepIndex;
this.asyncIndex = asyncIndex;
this.name = 'NueAsyncError';
this.message = "An error in an async callback:" +
"\nflowName = '" + flowName + "'," +
"\nstepName = '" + stepName + "'," +
'\nstepIndex = ' + stepIndex + ', ' +
'\nasyncIndex = ' + asyncIndex + ', ' +
'\nmapping = ' + util.inspect(mapping, false, null) +
'\n+----- BEGIN CAUSE STACK -----+\n' + indent(cause.stack) + '\n+----- END CAUSE STACK -----+';
Error.captureStackTrace(this, NueAsyncError);
}
util.inherits(NueAsyncError, Error);
function NueGroupAsyncError(cause, flowName, stepName, stepIndex, asyncIndex, mapping, loopIndex, groupAsyncIndex) {
this.cause = cause;
this.flowName = flowName;
this.stepName = stepName;
this.stepIndex = stepIndex;
this.asyncIndex = asyncIndex;
this.loopIndex = loopIndex;
this.groupAsyncIndex = groupAsyncIndex;
this.name = 'NueGroupAsyncError';
this.message = "An error in an async callback:" +
"\nflowName = '" + flowName + "'," +
"\nstepName = '" + stepName + "'," +
'\nstepIndex = ' + stepIndex + ', ' +
'\nasyncIndex = ' + asyncIndex + ', ' +
'\nloopIndex = ' + loopIndex + ', ' +
'\ngroupAsyncIndex = ' + groupAsyncIndex + ', ' +
'\nmapping = ' + util.inspect(mapping, false, null) +
'\n+----- BEGIN CAUSE STACK -----+\n' + indent(cause.stack) + '\n+----- END CAUSE STACK -----+';
Error.captureStackTrace(this, NueGroupAsyncError);
}
util.inherits(NueGroupAsyncError, Error);
function NueUnhandledError(cause, flowName, stepName) {
this.cause = cause;
this.name = 'NueUnhandledError';
this.message = 'The error must be handled in a last step of flow. ' +
'To indicate error handling completed, set null to `this.err` before exiting the last step:' +
"\nflowName = '" + flowName + "'," +
"\nstepName = '" + stepName + "'" +
'\n+----- BEGIN CAUSE STACK -----+\n' + indent(cause.stack) + '\n+----- END CAUSE STACK -----+';
Error.captureStackTrace(this, NueUnhandledError);
}
util.inherits(NueUnhandledError, Error);
function indent(text) {
return ' ' + text.replace(/\n/g, '\n ');
}

@@ -12,3 +12,3 @@ {

},
"version" : "0.5.0"
"version" : "0.6.0"
}

@@ -40,96 +40,133 @@ nue — An async control-flow library

### flow([Function steps...]) -> Function
### Top Level API
The `nue` module provides following API.
#### flow([Function steps...]) -> Function
Return a function which represents the control-flow.
> Arguments
* `steps`: Optional. Optional functions to execute in series.
> Context
#### flow(String flowName) -> Function
`this` context of each step in a flow has following properties.
Accept a flow name and return another `flow` function.
* `next`: next([Object values...]) -> Void
* A function to execute a next step immediately.
* `flowName`: Required. A flow name to be used for debug.
* `async`: async([Object mapping]) -> Function
* A function to accept an argument mapping definition for a next step and return a callback.
#### parallel([Function steps...]) -> Function
Return a function which represents the parallel control-flow.
The `parallel` must be nested inside a `flow` or another `parallel`.
* `steps`: Optional. Optional functions to execute in parallel.
#### parallel(String flowName) -> Function
Accept a flow name and return another `parallel` function.
* `flowName`: Required. A flow name to be used for debug.
#### as(Number index) -> Object
* `index`: Required. An index to map an asynchronous callback argument to a next step argument.
If the index is zero, an error handling is skipped.
### Step Context API
`flow` and `parallel` API accept functions called `step`s. Each step context object - it means a `this` object in the step function - provides following API.
#### next([Object values...]) -> Void
A function to execute a next step immediately.
* `values`: Optional. Arguments for a next step.
#### async([Object mapping]) -> Function
A function to accept an argument mapping definition for a next step and return a callback.
`async` can be called many times, but all calls are done in same tick.
And all callbacks `async` returns must be called absolutely.
And all callbacks `async` returns must be called.
* `asyncEach`: asyncEach(Array array, Function(element, group, elementIndex, traversedArray)) -> Void
* A function to execute a provided function once per array element asynchronously.
The `group` has a `async` function to accept an argument mapping definition and return a callback.
* `mapping`: Required. An argument mapping definition.
To map single argument, call `as` API and pass its result.
* `asyncEach`: asyncEach(Number concurrency) -> Function
* A function to accept a concurrency number and return another `asyncEach` function which
executes a provided function once per array element asynchronously with the specified cuncurrency.
If you use another `forEach` function directly, default concurrency 10 is used.
```js
fs.readFile('file1', 'utf8', this.async(as(1)));
```
* `exec`: exec(Function function([values...]), [Object args...], Function callback(err, [values...])) -> Void
* A function to execute a specified `function` with `args` asynchronously. The `callback` are executed when the `function` is completed.
To map multiple arguments, pass an object.
* `end`: end([Object values...]) -> Void
* A function to execute a last step immediately to end a control-flow.
```js
child_process.exec('whoami', this.async({stdout: as(1), stderr: as(2)}));
```
* `endWith`: endWith(Error err) -> Void
* A function to execute a last step immediately with an error to end a control-flow.
The parameter `err` is referred as `this.err` in a last step.
#### asyncEach(Array array, Function callback(element, group, index, traversedArray)) -> Void
* `data`: Object
* A object to share arbitrary data between steps in a control-flow.
A function to execute a provided function once per array element asynchronously.
* `flowName`: String
* A flow name.
* `array`: Required. An array.
* `callback`: Required. A function being executed once per array element.
The context object in the callback is same with outer step context object.
* `element`: Required. A current element.
* `group`: Required. Provedes `async` function to accept an argument mapping definition and return a callback.
* `index`: Required. An element index.
* `traversedArray`: Required. An array object being traversed.
* `stepName`: String
* A step name.
#### asyncEach(Number concurrency) -> Function
* `history`: Array
* An array to contain information about executed steps. This is an EXPERIMETAL FEATURE.
A function to accept a concurrency number and return another `asyncEach` function which
executes a provided function once per array element asynchronously with the specified cuncurrency.
If you use another `forEach` function directly, default concurrency 10 is used.
In addition to above ones, the context of a last step has a following property.
* `concurrency`: Required. the number of concurrency.
* `err`: Object
* An object represents an error which is thrown with `throw`, passed to `this.endWith` or
passed to an async callback as first argument.
#### exec(Function function, [Object args...], Function callback(err, [values...])) -> Void
### flow(String flowName) -> Function
A function to execute a specified `function` with `args` asynchronously.
Accept a flow name and return another `flow` function.
* `function`: Required. A function to be executed asynchronously.
* `args`: Optional. Arguments for the `function`.
* `callback`: Required. A function to be executed when the `function` is completed.
* `err`: Required. An error in an async call.
* `values`: Required. Results from the `function`.
> Arguments
#### end([Object values...]) -> Void
* `flowName`: Required. A flow name to be used for debug.
A function to execute a last step immediately to end a control-flow.
### parallel([Function steps...]) -> Function
* `values`: Optional. Arguments for a last step.
Return a function which represents the parallel control-flow.
The `parallel` must be nested inside a `flow` or another `parallel`.
#### endWith(Error err) -> Void
> Arguments
A function to execute a last step immediately with an error to end a control-flow.
* `steps`: Optional. Optional functions to execute in parallel.
* `err`: Required. An error object. This object can be referred as `this.err` in a last step.
### parallel(String flowName) -> Function
#### data : Object
Accept a flow name and return another `parallel` function.
A object to share arbitrary data between steps in a control-flow.
> Arguments
#### flowName : String
* `flowName`: Required. A flow name to be used for debug.
A flow name.
### as(Number index) -> Object
#### stepName : String
> Arguments
A step name.
* `index`: Required. An index to map an asynchronous callback argument to a next step argument.
If the index is zero, an error handling is skipped.
#### err : Object
## Arguments Passing Between Functions
An error object, which is thrown with `throw`, passed to `this.endWith` or passed to an async callback as first argument.
This property is accessible in only last steps.
## More Examples
### Arguments Passing Between Functions
Arguments are passed with `this.next` or `this.async`.
### Synchronously
#### Synchronously

@@ -146,3 +183,3 @@ ```js

if (this.err) throw this.err;
console.log(s1 + ' + ' + s2 + ' -> ' + length); // file1 + file2 -> 10
console.log(s1 + '.length + ' + s2 + '.length -> ' + length); // file1.length + file2.length -> 10
console.log('done');

@@ -156,3 +193,3 @@ this.next();

### Asynchronously
#### Asynchronously

@@ -209,3 +246,3 @@ To pass asynchronous call results to a next function, arguments mapping definition is necessary.

## Asynchronous Loop
### Asynchronous Loop

@@ -249,3 +286,3 @@ `this.asyncEach` executes a provided function once per array element asynchronously.

## Flow Nesting
### Flow Nesting

@@ -281,3 +318,3 @@ A flow is composable. So it can be nested.

## Asynchronous Flow Execution
### Asynchronous Flow Execution

@@ -313,3 +350,3 @@ A flow can be executed asynchronously.

## Parallel Flow
### Parallel Flow

@@ -378,3 +415,3 @@ In following example, the flow `par1-1` and `par1-2` are executed in parallel.

## Data Sharing Between Functions
### Data Sharing Between Functions

@@ -411,3 +448,3 @@ Each step in a flow can share data through `this.data`.

## Error Handling
### Error Handling

@@ -424,4 +461,2 @@ In a last step in a flow, `this.err` represents an error which is thrown with `throw`, passed to `this.endWith` or passed to an async callback as first argument.

function readFiles(file1, file2) {
if (!file1) this.endWith(new Error('file1 is illegal.'));
if (!file2) this.endWith(new Error('file2 is illegal.'));
fs.readFile(file1, 'utf8', this.async(as(1)));

@@ -450,3 +485,3 @@ fs.readFile(file2, 'utf8', this.async(as(1)));

## Unit Test with Mocha
### Unit Test with Mocha

@@ -502,1 +537,33 @@ Following example shows how to test a flow and a function with [Mocha](http://visionmedia.github.com/mocha/).

```
## Debugging
Use `NODE_DEBUG=nue`.
### Example
> hoge.js
```js
var flow = require('nue').flow;
flow('hoge')(
function add(x, y) {
this.next(x + y);
},
function done(result) {
if (this.err) throw this.err;
console.log(result);
}
)(10, 20);
```
> Run and Output
```sh
$ NODE_DEBUG=nue node hoge.js
NUE: begin TOP_LEVEL_FLOW. flow: hoge(0), calledAt: /private/tmp/hoge.js:11:1, args: [ 10, 20 ]
NUE: begin STEP. flow: hoge(0), step: add(0), args: [ 10, 20 ]
NUE: begin STEP. flow: hoge(0), step: done(1), args: [ 30 ]
30
```
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc