Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

nue

Package Overview
Dependencies
Maintainers
1
Versions
17
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

nue - npm Package Compare versions

Comparing version 0.4.0 to 0.5.0

examples/argPassingAsync.js

6

CHANGELOG.md
# Changelog
- 0.5.0 (2012/03/04)
- New Feature - `nue.as` function is introduced to map asynchronous callback arguments to next function ones.
- Change - in a step function, `this.async` accepts arguments mapping definition to pass callback arguments to a next function.
- Change - in a step function, `this.forEach` function is removed and `this.asyncEach` is added instead.
- Change - in a step function, `this.args` property is removed.
- 0.4.0 (2012/02/27)

@@ -4,0 +10,0 @@ - New Feature - `nue.parallel` is available to execute some steps in parallel.

5

examples/dataSharing.js
var flow = require('../index').flow;
var as = require('../index').as;
var fs = require('fs');

@@ -8,4 +9,4 @@

this.data.file2 = file2;
fs.readFile(file1, 'utf8', this.async());
fs.readFile(file2, 'utf8', this.async());
fs.readFile(file1, 'utf8', this.async(as(1)));
fs.readFile(file2, 'utf8', this.async(as(1)));
},

@@ -12,0 +13,0 @@ function concat(data1, data2) {

var flow = require('../index').flow;
var as = require('../index').as;
var fs = require('fs');

@@ -8,4 +9,4 @@

if (!file2) this.endWith(new Error('file2 is illegal.'));
fs.readFile(file1, 'utf8', this.async());
fs.readFile(file2, 'utf8', this.async());
fs.readFile(file1, 'utf8', this.async(as(1)));
fs.readFile(file2, 'utf8', this.async(as(1)));
},

@@ -12,0 +13,0 @@ function concat(data1, data2) {

7

examples/flow.js
var flow = require('../index').flow;
var as = require('../index').as;
var fs = require('fs');
var myFlow = flow(
var myFlow = flow('myFlow')(
function readFiles(file1, file2) {
fs.readFile(file1, 'utf8', this.async());
fs.readFile(file2, 'utf8', this.async());
fs.readFile(file1, 'utf8', this.async(as(1)));
fs.readFile(file2, 'utf8', this.async(as(1)));
},

@@ -9,0 +10,0 @@ function concat(data1, data2) {

var flow = require('../index').flow;
var as = require('../index').as;
var fs = require('fs');

@@ -6,3 +7,3 @@

function readFile(file) {
fs.readFile(file, 'utf8', this.async());
fs.readFile(file, 'utf8', this.async(as(1)));
}

@@ -16,5 +17,5 @@ );

subFlow,
function end(data) {
function end(result) {
if (this.err) throw this.err;
console.log(data);
console.log(result);
console.log('done');

@@ -21,0 +22,0 @@ this.next();

var flow = require('../index').flow;
var as = require('../index').as;
var fs = require('fs');

@@ -6,3 +7,3 @@

function readFile(file) {
fs.readFile(file, 'utf8', this.async());
fs.readFile(file, 'utf8', this.async(as(1)));
}

@@ -13,4 +14,4 @@ );

function start() {
this.exec(subFlow, 'file1', this.async());
this.exec(subFlow, 'file2', this.async());
this.exec(subFlow, 'file1', this.async(as(1)));
this.exec(subFlow, 'file2', this.async(as(1)));
},

@@ -17,0 +18,0 @@ function end(data1, data2) {

var flow = require('../index').flow;
function sleep(flow, ms) {
setTimeout(function () {
flow.next();
}, ms);
function sleep(ms) {
setTimeout(this.async(), ms);
}

@@ -12,3 +10,3 @@

console.log('wait... ' + new Date());
sleep(this, 1000);
this.exec(sleep, 1000, this.async());
},

@@ -15,0 +13,0 @@ function end() {

var flow = require('../../index').flow;
var as = require('../../index').as;
var fs = require('fs');

@@ -6,4 +7,4 @@

function (file1, file2) {
fs.readFile(file1, 'utf8', this.async());
fs.readFile(file2, 'utf8', this.async());
fs.readFile(file1, 'utf8', this.async(as(1)));
fs.readFile(file2, 'utf8', this.async(as(1)));
},

@@ -16,3 +17,3 @@ function (data1, data2) {

function read(file) {
fs.readFile(file, 'utf8', this.async());
fs.readFile(file, 'utf8', this.async(as(1)));
}

@@ -19,0 +20,0 @@

'use strict';
exports.name = 'nue';
exports.version = '0.4.0';
exports.version = '0.5.0';
exports.flow = flow;
exports.parallel = parallel;
exports.as = as;
var EventEmitter = require('events').EventEmitter;
var util = require('util');

@@ -23,80 +23,60 @@ var assert = require('assert');

}
}
function normalizeArgs(args) {
if (args.length === 1 && Array.isArray(args[0])) {
return args[0];
function prepareFlow(flowName, functions) {
function startFlow() {
var isTopLevel = !(this instanceof ContextBase);
var flow = {
flowName: flowName,
context: this,
isTopLevel: isTopLevel,
history: isTopLevel ? [] : this.history,
data: {},
err: null
};
var steps = functions.length > 0 ? functions : [function () { this.next(); }];
var head = chainSteps(flow, steps);
head.apply(this, arguments);
}
startFlow.stepName = flowName;
return startFlow;
}
return Array.prototype.slice.call(args);
}
function prepareFlow(flowName, functions) {
function startFlow() {
var steps = functions.length > 0 ? functions : function () { this.next(); };
var args = Array.prototype.slice.call(arguments);
runFlow(flowName, chainSteps(steps), args, this);
}
startFlow.stepName = flowName;
return startFlow;
}
function chainSteps(flow, steps) {
var len = steps.length;
var lastIndex = len - 1;
var last = makeLastStep(steps[lastIndex], lastIndex);
var isThrown = false;
if (len === 1) {
return last;
}
function runFlow(flowName, steps, callerArgs, callerContext) {
var flow = new Flow(flowName, callerArgs, callerContext, steps[steps.length - 1]);
if (steps.length > 1) {
runStep(flow, steps[0]);
} else {
runLastStep(flow);
}
}
return steps.reduceRight(function chain(prev, curr, i) {
return function step() {
var next = i === len - 2 ? last : prev;
var context = new StepContext(flow, curr.stepName || curr.name, i, next, last);
try {
curr.apply(context, arguments);
} catch (e) {
if (isThrown) {
throw e;
}
StepContext.prototype.endWith.call(context, e);
}
}
});
function chainSteps(steps) {
steps = steps.map(function (step, i) {
assert.equal(typeof step, 'function', 'Each argument must be a function.');
var fn = function applyStep() {
step.apply(this, arguments);
};
fn.stepName = step.stepName || step.name;
fn.stepIndex = i;
attachEmitter(fn);
return fn;
});
var len = steps.length - 1;
for (var i = 0; i < len; i++) {
(function chain(i, step, next) {
if (i < len - 1) {
step.events.once('done', function startStep(flow) {
runStep(flow, next);
});
} else {
step.events.once('done', function startLastStep(flow) {
runLastStep(flow);
});
function makeLastStep(fn, index) {
return function lastStep() {
var context = new LastStepContext(flow, fn.stepName || fn.name, index);
try {
fn.apply(context, arguments);
} catch (e) {
isThrown = true;
throw e;
}
}
}(i, steps[i], steps[i + 1]));
}
return steps;
}
function runStep(flow, step) {
var context = new StepContext(flow, step);
try {
step.apply(context, flow.args);
} catch (e) {
if (flow.isErrThrown) {
throw e;
}
StepContext.prototype.endWith.call(context, e);
}
}
function runLastStep(flow) {
var context = new LastStepContext(flow);
try {
flow.lastStep.apply(context, flow.args);
} catch (e) {
flow.isErrThrown = true;
throw e;
}
}
function parallel() {

@@ -111,32 +91,34 @@ if (arguments.length === 1 && typeof arguments[0] === 'string') {

}
}
function prepareParallel(flowName, functions) {
function startParallel() {
assert.ok(this instanceof ContextBase, 'A `parallel` must be inside a flow or another parallel.');
var args = Array.prototype.slice.call(arguments);
runParallel(flowName, functions, args, this);
function prepareParallel(flowName, functions) {
function startParallel() {
assert.ok(this instanceof ContextBase, 'A `parallel` must be inside a flow or another parallel.');
var self = this;
var args = Array.prototype.slice.call(arguments);
self.asyncEach(1)(functions, function (fn, group) {
assert.equal(typeof fn, 'function', 'Each argument must be a function.');
var callback = group.async();
var end = function end() {
callback.apply(null, [this.err].concat(Array.prototype.slice.call(arguments)));
this.err = null;
};
end.stepName = (fn.stepName || fn.name) + '_end';
flow(flowName)(fn, end).apply(self, args);
});
process.nextTick(self.async(Async.SIGNAL_UNLOCK));
}
startParallel.stepName = flowName;
return startParallel;
}
startParallel.stepName = flowName;
return startParallel;
}
function runParallel(flowName, functions, callerArgs, callerContext) {
callerContext.forEach(1)(functions, function (fn) {
assert.equal(typeof fn, 'function', 'Each argument must be a function.');
var callback = callerContext.async();
var end = function end() {
callback.apply(null, [this.err].concat(this.args));
this.err = null;
};
end.stepName = (fn.stepName || fn.name) + '_end';
flow(flowName)(fn, end).apply(callerContext, callerArgs);
});
function as(index) {
return new As(index);
}
function attachEmitter(target) {
if (!target.__nue__) {
target.events = new EventEmitter();
target.__nue__ = true;
function normalizeArgs(args) {
if (args.length === 1 && Array.isArray(args[0])) {
return args[0];
}
return Array.prototype.slice.call(args);
}

@@ -147,106 +129,139 @@

function Flow(flowName, args, callerContext, lastStep){
this.flowName = flowName;
this.args = args;
this.callerContext = callerContext;
this.lastStep = lastStep;
this.isTopLevel = !(callerContext instanceof ContextBase);
this.history = this.isTopLevel ? [] : callerContext.history;
this.data = {};
this.err = null;
this.isErrThrown = false;
function As(index) {
this.index = index;
}
Flow.prototype.exit = function exit(err, args) {
if (this.isTopLevel) {
if (err) {
throw new NueUnhandledError(err);
function Async(lock, next, endWith) {
this.lock = lock;
this.next = next;
this.endWith = endWith;
this.index = 0;
this.pending = 0;
this.isCanceled = false;
this.results = [];
}
Async.SIGNAL_UNLOCK = {};
Async.prototype.makeCallback = function makeCallback(mapping, loopIndex, groupAsyncIndex) {
assert.ok(typeof mapping === 'object' || mapping === undefined, 'The argument `mapping` must be an object if specified.');
this.pending++;
var isFirst = this.index === 0;
var index = mapping === Async.SIGNAL_UNLOCK ? -1 : this.index++;
if (isFirst && !this.lock) {
this.lock = true;
process.nextTick(this.makeCallback(Async.SIGNAL_UNLOCK));
}
var self = this;
var asyncCallback = function asyncCallback(err) {
self.pending--;
if (!self.isCanceled) {
if (isErrorHandleRequire(mapping) && err) {
self.isCanceled = true;
self.endWith.call(null, err, index, mapping, loopIndex, groupAsyncIndex);
} else {
if (mapping === Async.SIGNAL_UNLOCK) {
self.lock = false;
} else {
self.results[index] = mapping ? mapArguments(mapping, arguments) : Array.prototype.slice.call(arguments, 1);
}
if (self.pending === 0 && !self.lock) {
self.next.apply(null, self.results);
}
}
}
} else {
if (err) {
this.callerContext.endWith.call(this.callerContext, err);
} else {
this.callerContext.next.apply(this.callerContext, args);
};
asyncCallback.index = index;
return asyncCallback;
function isErrorHandleRequire(mapping) {
if (!mapping) {
return true;
}
if (mapping instanceof As) {
return mapping.index !== 0;
}
return Object.keys(mapping).every(function (key) {
var value = mapping[key];
if (value instanceof As) {
return value.index !== 0;
}
return true;
});
}
function mapArguments(mapping, args) {
if (mapping instanceof As) {
return args[mapping.index];
}
return Object.keys(mapping).reduce(function (result, key) {
var value = mapping[key];
if (value instanceof As) {
result[key] = args[value.index];
} else {
result[key] = value;
}
return result;
}, {});
}
};
function ContextBase(flow, step) {
function ContextBase(flow, name, index, next, last) {
this._flow = flow;
this._step = step;
this._stepIndex = step.stepIndex;
this._asyncIndex = 0;
this._asyncCallCount = 0;
this._isAsyncCanceled = false;
this._results = [];
this._wait = false;
this._next = next;
this._last = last;
this.err = flow.err;
this.args = flow.args;
this.data = flow.data;
this.flowName = flow.flowName;
this.stepName = step.stepName;
this.stepName = name;
this.stepIndex = index;
this.history = flow.history;
this.history.push(new HistoryEntry(this.flowName, this.stepName, this._stepIndex));
this.next = this.next.bind(this);
this.end = this.end.bind(this);
this.endWith = this.endWith.bind(this);
this.async = this.async.bind(this);
this.forEach = this.forEach.bind(this);
this.exec = this.exec.bind(this);
this.history.push(new HistoryEntry(flow.flowName, name, index));
var self = this;
this._asyncObj = new Async(false, this.next.bind(this), function endWith(err, asyncIndex, mapping) {
self.endWith.call(self, new NueAsyncError(err, self.flowName, self.stepName, self.stepIndex, asyncIndex, mapping));
});
}
ContextBase.SIGNAL_WAIT_CANCEL = {};
ContextBase.DEFAULT_CONCURRENCY = 10;
ContextBase.prototype.async = function async() {
this._asyncCallCount++;
var args = Array.prototype.slice.call(arguments);
var self = this;
return (function makeCallback(args, asyncIndex) {
function callback(err) {
self._asyncCallCount--;
if (!self._isAsyncCanceled) {
if (err) {
self._isAsyncCanceled = true;
self.endWith.call(self, new NueAsyncError(err, self.flowName, self.stepName, self._stepIndex, asyncIndex));
} else {
if (args[0] === ContextBase.SIGNAL_WAIT_CANCEL) {
self._wait = false;
} else {
self._results[asyncIndex] = args.concat(Array.prototype.slice.call(arguments, 1));
}
if (self._asyncCallCount === 0 && !self._wait) {
self.next.apply(self, self._results.map(function (array) {
switch(array.length) {
case 0: return undefined;
case 1: return array[0];
default: return array;
}
}));
}
}
}
}
return callback;
}(args, this._asyncIndex++));
ContextBase.prototype._disable = function _disable() {
this.next = noop;
this.end = noop;
this.endWith = noop;
};
ContextBase.prototype.forEach = function forEach() {
ContextBase.prototype.async = function async(mapping) {
return this._asyncObj.makeCallback.call(this._asyncObj, mapping);
};
ContextBase.prototype.asyncEach = function asyncEach() {
var self = this;
var callback = self.async();
var next = function next() {
var args = Array.prototype.slice.call(arguments);
return callback.apply(self, [null].concat(flattenArray(args)));
};
var endWith = function endWith(cause, asyncIndex, mapping, loopIndex, asyncGroupIndex) {
var err = new NueGroupAsyncError(cause, self.flowName, self.stepName, self.stepIndex, callback.index, mapping, loopIndex, asyncGroupIndex);
return self.endWith.call(self, err);
};
var async = new Async(true, next, endWith);
if (arguments.length === 1 && typeof arguments[0] === 'number') {
var concurrency = arguments[0];
return function () {
waitAndConsume(concurrency, arguments[0], arguments[1]);
validateAndConsume(concurrency, arguments[0], arguments[1]);
};
} else {
waitAndConsume(ContextBase.DEFAULT_CONCURRENCY, arguments[0], arguments[1]);
validateAndConsume(ContextBase.DEFAULT_CONCURRENCY, arguments[0], arguments[1]);
}
function waitAndConsume(concurrency, array, worker) {
function validateAndConsume(concurrency, array, worker) {
assert.ok(Array.isArray(array), 'An argument `array` must be an array.');
assert.equal(typeof worker, 'function', 'An argument `worker` must be a function.');
self._wait = true;
consume(concurrency, array, worker, 0);
process.nextTick(function consumeFirst() {
consume(concurrency, array, worker, 0);
});
}

@@ -257,6 +272,14 @@

for (var i = 0; i < concurrency && index < len; i++, index++) {
worker.call(self, array[index], index, array);
(function callWorker(index) {
var count = 0;
var group = {
async: function (mapping) {
return async.makeCallback.call(async, mapping, index, count++);
}
};
worker.call(null, array[index], group, index, array);
}(index));
}
if (index === len) {
process.nextTick(self.async(ContextBase.SIGNAL_WAIT_CANCEL));
process.nextTick(async.makeCallback(Async.SIGNAL_UNLOCK));
} else {

@@ -268,2 +291,16 @@ process.nextTick(function consumeNext() {

}
function flattenArray(array) {
var results = [];
array.forEach(function (array2) {
if (Array.isArray(array2)) {
array2.forEach(function (e) {
results.push(e);
});
} else {
results.push(array2);
}
});
return results;
}
};

@@ -279,3 +316,3 @@

var end = function end() {
callback.apply(null, [this.err].concat(this.args));
callback.apply(null, [this.err].concat(Array.prototype.slice.call(arguments)));
this.err = null;

@@ -290,4 +327,4 @@ };

function StepContext(flow, step) {
ContextBase.call(this, flow, step);
function StepContext(flow, name, index, next, last) {
ContextBase.call(this, flow, name, index, next, last);
}

@@ -297,25 +334,22 @@ util.inherits(StepContext, ContextBase);

StepContext.prototype.next = function next() {
disableContextMethods(this);
this._disable();
this._flow.err = null;
this._flow.args = Array.prototype.slice.call(arguments);
this._step.events.emit('done', this._flow);
this._next.apply(null, arguments);
};
StepContext.prototype.end = function end() {
disableContextMethods(this);
this._disable();
this._flow.err = null;
this._flow.args = Array.prototype.slice.call(arguments);
runLastStep(this._flow);
this._last.apply(null, arguments);
};
StepContext.prototype.endWith = function endWith(err) {
disableContextMethods(this);
this._disable();
this._flow.err = err;
this._flow.args = [];
runLastStep(this._flow);
this._last.call(null);
};
function LastStepContext(flow) {
ContextBase.call(this, flow, flow.lastStep);
function LastStepContext(flow, name, index) {
ContextBase.call(this, flow, name, index);
}

@@ -325,24 +359,31 @@ util.inherits(LastStepContext, ContextBase);

LastStepContext.prototype.next = function next() {
disableContextMethods(this);
this._flow.exit(this.err, Array.prototype.slice.call(arguments));
this._disable();
this._exit(Array.prototype.slice.call(arguments));
};
LastStepContext.prototype.end = function end() {
disableContextMethods(this);
this._flow.exit(this.err, Array.prototype.slice.call(arguments));
this._disable();
this._exit(Array.prototype.slice.call(arguments));
};
LastStepContext.prototype.endWith = function endWith(err) {
disableContextMethods(this);
this._flow.exit(err, []);
this._disable();
this.err = err;
this._exit([]);
};
LastStepContext.prototype._exit = function _exit(args) {
if (this._flow.isTopLevel) {
if (this.err) {
throw new NueUnhandledError(this.err, this.flowName, this.stepName);
}
} else {
if (this.err) {
this._flow.context.endWith.call(this._flow.context, this.err);
} else {
this._flow.context.next.apply(this._flow.context, args);
}
}
};
function disableContextMethods(context) {
context.next = noop;
context.end = noop;
context.endWith = noop;
}
function HistoryEntry(flowName, stepName, stepIndex) {

@@ -361,3 +402,3 @@ this.flowName = flowName;

function NueAsyncError(cause, flowName, stepName, stepIndex, asyncIndex) {
function NueAsyncError(cause, flowName, stepName, stepIndex, asyncIndex, mapping) {
this.cause = cause;

@@ -369,7 +410,9 @@ this.flowName = flowName;

this.name = 'NueAsyncError';
this.message = "An error in an async callback: " +
"flowName = '" + flowName + "', stepName = '" + stepName +
"', stepIndex = " + stepIndex + ', asyncIndex = ' + asyncIndex + '\n' +
'+----- BEGIN CAUSE STACK -----+\n' + cause.stack + '\n' +
'+----- END CAUSE STACK -----+';
this.message = "An error in an async callback:" +
"\nflowName = '" + flowName + "'," +
"\nstepName = '" + stepName + "'," +
'\nstepIndex = ' + stepIndex + ', ' +
'\nasyncIndex = ' + asyncIndex + ', ' +
'\nmapping = ' + util.inspect(mapping, false, null) +
'\n+----- BEGIN CAUSE STACK -----+\n' + indent(cause.stack) + '\n+----- END CAUSE STACK -----+';
Error.captureStackTrace(this, NueAsyncError);

@@ -380,11 +423,40 @@ }

function NueUnhandledError(cause) {
function NueGroupAsyncError(cause, flowName, stepName, stepIndex, asyncIndex, mapping, loopIndex, groupAsyncIndex) {
this.cause = cause;
this.flowName = flowName;
this.stepName = stepName;
this.stepIndex = stepIndex;
this.asyncIndex = asyncIndex;
this.loopIndex = loopIndex;
this.groupAsyncIndex = groupAsyncIndex;
this.name = 'NueGroupAsyncError';
this.message = "An error in an async callback:" +
"\nflowName = '" + flowName + "'," +
"\nstepName = '" + stepName + "'," +
'\nstepIndex = ' + stepIndex + ', ' +
'\nasyncIndex = ' + asyncIndex + ', ' +
'\nloopIndex = ' + loopIndex + ', ' +
'\ngroupAsyncIndex = ' + groupAsyncIndex + ', ' +
'\nmapping = ' + util.inspect(mapping, false, null) +
'\n+----- BEGIN CAUSE STACK -----+\n' + indent(cause.stack) + '\n+----- END CAUSE STACK -----+';
Error.captureStackTrace(this, NueGroupAsyncError);
}
util.inherits(NueGroupAsyncError, Error);
function NueUnhandledError(cause, flowName, stepName) {
this.cause = cause;
this.name = 'NueUnhandledError';
this.message = 'The error must be handled in a last step. ' +
'To indicate error handling completed, set null to `this.err` before exiting the last step.\n' +
'+----- BEGIN CAUSE STACK -----+\n' + cause.stack + '\n' +
'+----- END CAUSE STACK -----+';
this.message = 'The error must be handled in a last step of flow. ' +
'To indicate error handling completed, set null to `this.err` before exiting the last step:' +
"\nflowName = '" + flowName + "'," +
"\nstepName = '" + stepName + "'" +
'\n+----- BEGIN CAUSE STACK -----+\n' + indent(cause.stack) + '\n+----- END CAUSE STACK -----+';
Error.captureStackTrace(this, NueUnhandledError);
}
util.inherits(NueUnhandledError, Error);
function indent(text) {
return ' ' + text.replace(/\n/g, '\n ');
}

@@ -12,3 +12,3 @@ {

},
"version" : "0.4.0"
"version" : "0.5.0"
}

@@ -16,8 +16,9 @@ nue — An async control-flow library

var flow = require('nue').flow;
var as = require('as').as;
var fs = require('fs');
var myFlow = flow(
var myFlow = flow('myFlow')(
function readFiles(file1, file2) {
fs.readFile(file1, 'utf8', this.async());
fs.readFile(file2, 'utf8', this.async());
fs.readFile(file1, 'utf8', this.async(as(1)));
fs.readFile(file2, 'utf8', this.async(as(1)));
},

@@ -55,11 +56,14 @@ function concat(data1, data2) {

* `async`: async([Object values...]) -> Function
* A function to accept parameters for a next step and return a callback.
* `async`: async([Object mapping]) -> Function
* A function to accept an argument mapping definition for a next step and return a callback.
`async` can be called many times, but all calls are done in same tick.
And all callbacks `async` returns must be called absolutely.
* `forEach`: forEach(Array array, Function(element, elementIndex, traversedArray)) -> Void
* A function to execute a provided function once per array element in parallel.
* `asyncEach`: asyncEach(Array array, Function(element, group, elementIndex, traversedArray)) -> Void
* A function to execute a provided function once per array element asynchronously.
The `group` has a `async` function to accept an argument mapping definition and return a callback.
* `forEach`: forEach(Number concurrency) -> Function
* A function to accept a concurrency number and return another `forEach` function which
executes a provided function once per array element in prallel with the specified cuncurrency.
* `asyncEach`: asyncEach(Number concurrency) -> Function
* A function to accept a concurrency number and return another `asyncEach` function which
executes a provided function once per array element asynchronously with the specified cuncurrency.
If you use another `forEach` function directly, default concurrency 10 is used.

@@ -80,10 +84,7 @@

* `args`: Array
* An array equivalent to `arguments` for a step except this is real Array.
* `flowName`: String
* flow name.
* A flow name.
* `stepName`: String
* step name.
* A step name.

@@ -105,3 +106,3 @@ * `history`: Array

* `flowName`: Required. Flow name to be used for debug.
* `flowName`: Required. A flow name to be used for debug.

@@ -123,10 +124,134 @@ ### parallel([Function steps...]) -> Function

* `flowName`: Required. Flow name to be used for debug.
* `flowName`: Required. A flow name to be used for debug.
### as(Number index) -> Object
> Arguments
* `index`: Required. An index to map an asynchronous callback argument to a next step argument.
If the index is zero, an error handling is skipped.
## Arguments Passing Between Functions
Arguments are passed with `this.next` or `this.async`.
### Synchronously
```js
var flow = require('nue').flow;
var myFlow = flow('myFlow')(
function concat(s1, s2) {
var length = s1.length + s2.length
this.next(s1, s2, length);
},
function end(s1, s2, length) {
if (this.err) throw this.err;
console.log(s1 + ' + ' + s2 + ' -> ' + length); // file1 + file2 -> 10
console.log('done');
this.next();
}
);
myFlow('file1', 'file2');
```
### Asynchronously
To pass asynchronous call results to a next function, arguments mapping definition is necessary.
The function `as` accepts an index to specify a callback argument and returns arguments mapping definition.
The function `this.async` accepts the mapping definition and return a callback.
When all callbacks are completed, the next function is called with specific arguments.
```js
var flow = require('nue').flow;
var as = require('as').as;
var fs = require('fs');
var myFlow = flow('myFlow')(
function readFiles(file1, file2) {
fs.readFile(file1, 'utf8', this.async(as(1)));
fs.readFile(file2, 'utf8', this.async(as(1)));
},
function end(data1, data2) {
if (this.err) throw this.err;
console.log(data1 + data2); // FILE1FILE2
console.log('done');
this.next();
}
);
myFlow('file1', 'file2');
```
Arguments mapping definition can contain arbitrary values.
```js
var flow = require('nue').flow;
var as = require('as').as;
var fs = require('fs');
var myFlow = flow('myFlow')(
function readFiles(file1, file2) {
fs.readFile(file1, 'utf8', this.async({name: file1, data: as(1)}));
fs.readFile(file2, 'utf8', this.async({name: file2, data: as(1)}));
},
function end(f1, f2) {
if (this.err) throw this.err;
console.log(f1.name + ' and ' + f2.name + ' have been read.'); // file1 and file2 have been read.
console.log(f1.data + f2.data); // FILE1FILE2
console.log('done');
this.next();
}
);
myFlow('file1', 'file2');
```
## Asynchronous Loop
`this.asyncEach` executes a provided function once per array element asynchronously.
By default, the number of concurrency is 10.
```js
var flow = require('nue').flow;
var as = require('as').as;
var fs = require('fs');
var myFlow = flow('myFlow')(
function readFiles(files) {
this.asyncEach(files, function (file, group) {
fs.readFile(file, 'utf8', group.async({name: file, data: as(1)}));
});
},
function end(files) {
if (this.err) throw this.err;
var names = files.map(function (f) { return f.name; });
var contents = files.map(function (f) { return f.data});
console.log(names.join(' and ') + ' have been read.'); // file1 and file2 have been read.
console.log(contents.join('')); // FILE1FILE2
this.next();
}
);
myFlow(['file1', 'file2']);
```
To change the number of concurrency, specify the number as below.
```js
function readFiles(files) {
this.asyncEach(5)(files, function (file, group) {
...
});
},
```
## Flow Nesting
A flow can be nested.
A flow is composable. So it can be nested.
```js
var flow = require('nue').flow;
var as = require('as').as;
var fs = require('fs');

@@ -136,3 +261,3 @@

function readFile(file) {
fs.readFile(file, 'utf8', this.async());
fs.readFile(file, 'utf8', this.async(as(1)));
}

@@ -146,5 +271,5 @@ );

subFlow,
function end(data) {
function end(result) {
if (this.err) throw this.err;
console.log(data);
console.log(result);
console.log('done');

@@ -158,8 +283,9 @@ this.next();

## Flow Nesting and Asynchronous Execution
## Asynchronous Flow Execution
A nested sub-flow can be executed asynchronously.
A flow can be executed asynchronously.
```js
var flow = require('nue').flow;
var as = require('as').as;
var fs = require('fs');

@@ -169,3 +295,3 @@

function readFile(file) {
fs.readFile(file, 'utf8', this.async());
fs.readFile(file, 'utf8', this.async(as(1)));
}

@@ -176,4 +302,4 @@ );

function start() {
this.exec(subFlow, 'file1', this.async());
this.exec(subFlow, 'file2', this.async());
this.exec(subFlow, 'file1', this.async(as(1)));
this.exec(subFlow, 'file2', this.async(as(1)));
},

@@ -193,3 +319,3 @@ function end(data1, data2) {

In Following example, the flow `par1-1` and `par1-2` are executed in parallel.
In following example, the flow `par1-1` and `par1-2` are executed in parallel.

@@ -225,23 +351,26 @@ ```js

## Arguments Passing Between Functions
Arguments to a parallel flow are passed to every forked functions.
Parallel flow results are passed to a next funtion as an array.
The array contains the results of forked functions.
arguments are passed with `this.next` or `this.async`.
```js
var flow = require('nue').flow;
var fs = require('fs');
var parallel = require('nue').parallel;
var myFlow = flow('myFlow')(
function readFiles(file1, file2) {
fs.readFile(file1, 'utf8', this.async(file1));
fs.readFile(file2, 'utf8', this.async(file2));
var myFlow = flow('main')(
function start() {
this.next(10, 20);
},
function concat(data1, data2) {
console.log(data1[0] + ' and ' + data2[0] + ' have been read.');
this.next(data1[1] + data2[1]);
},
function end(data) {
parallel('parallel')(
function add(x, y) {
this.next(x + y);
},
function sub(x, y) {
this.next(x - y);
}
),
function end(results) {
if (this.err) throw this.err;
console.log(data);
console.log('done');
console.log('add result: ' + results[0]); // add result: 30
console.log('sub result: ' + results[1]); // sub result: -10
this.next();

@@ -251,34 +380,5 @@ }

myFlow('file1', 'file2');
myFlow();
```
`this.async` can be called in loop.
Following example produces same results with above example.
```js
var flow = require('nue').flow;
var fs = require('fs');
var myFlow = flow('myFlow')(
function readFiles(files) {
process.nextTick(this.async(files));
this.forEach(files, function (file) {
fs.readFile(file, 'utf8', this.async());
});
},
function concat(files) {
console.log(files.join(' and ') + ' have been read.');
this.next(this.args.slice(1).join(''));
},
function end(data) {
if (this.err) throw this.err;
console.log(data);
console.log('done');
this.next();
}
);
myFlow(['file1', 'file2']);
```
## Data Sharing Between Functions

@@ -292,2 +392,3 @@

var flow = require('nue').flow;
var as = require('as').as;
var fs = require('fs');

@@ -299,4 +400,4 @@

this.data.file2 = file2;
fs.readFile(file1, 'utf8', this.async());
fs.readFile(file2, 'utf8', this.async());
fs.readFile(file1, 'utf8', this.async(as(1)));
fs.readFile(file2, 'utf8', this.async(as(1)));
},

@@ -320,6 +421,7 @@ function concat(data1, data2) {

In a last step in a flow, `this.err` represents an error which is thrown with `throw`, passed to `this.endWith` or passed to an async callback as first argument.
To indicate error handling completion, you must assign `null` to `this.err`.
To indicate error handling is completed, you must assign `null` to `this.err`.
```js
var flow = require('nue').flow;
var as = require('as').as;
var fs = require('fs');

@@ -331,4 +433,4 @@

if (!file2) this.endWith(new Error('file2 is illegal.'));
fs.readFile(file1, 'utf8', this.async());
fs.readFile(file2, 'utf8', this.async());
fs.readFile(file1, 'utf8', this.async(as(1)));
fs.readFile(file2, 'utf8', this.async(as(1)));
},

@@ -361,2 +463,3 @@ function concat(data1, data2) {

var flow = require('nue').flow;
var as = require('as').as;
var fs = require('fs');

@@ -366,4 +469,4 @@

function (file1, file2) {
fs.readFile(file1, 'utf8', this.async());
fs.readFile(file2, 'utf8', this.async());
fs.readFile(file1, 'utf8', this.async(as(1)));
fs.readFile(file2, 'utf8', this.async(as(1)));
},

@@ -376,3 +479,3 @@ function (data1, data2) {

function read(file) {
fs.readFile(file, 'utf8', this.async());
fs.readFile(file, 'utf8', this.async(as(1)));
}

@@ -379,0 +482,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc