New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

nue

Package Overview
Dependencies
Maintainers
1
Versions
17
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

nue - npm Package Compare versions

Comparing version 0.0.2 to 0.0.3

examples/each.js

51

examples/parallel.js
var nue = require('../lib/nue');
var start = nue.start;
var serial = nue.serial;
var flow = nue.flow;
var parallel = nue.parallel;
var fs = require('fs');
start(parallel(
function () {
this.fork('LICENSE', 'README.md');
},
[
function (name) {
var self = this;
fs.readFile(name, function (err, data) {
if (err) this.err(err);
self.join(data.length);
});
flow(
parallel(
function () {
this.fork('LICENSE', 'README.md');
},
function (path) {
var self = this;
fs.stat(path, function (err, stats) {
if (err) this.err(err);
self.join(stats.isFile());
});
[
function (name) {
var self = this;
fs.readFile(name, function (err, data) {
if (err) this.end(err);
self.join(data.length);
});
},
function (path) {
var self = this;
fs.stat(path, function (err, stats) {
if (err) this.end(err);
self.join(stats.isFile());
});
}
],
function (err, results) {
if (err) throw err;
console.log(results);
}
],
function (err, results) {
if (err) throw err;
console.log(results);
}
));
)
)();
var nue = require('../lib/nue');
var start = nue.start;
var serial = nue.serial;
var flow = nue.flow;
var parallelEach = nue.parallelEach;
var fs = require('fs');
start(parallelEach(
function () {
this.begin('LICENSE', 'README.md');
},
function (name) {
var self = this;
fs.readFile(name, function (err, data) {
if (err) this.end(err);
self.join(data.length);
});
},
function (err, results) {
if (err) throw err;
console.log(results);
}
));
flow(
parallelEach(
function () {
this.fork('LICENSE', 'README.md');
},
function (name) {
var self = this;
fs.readFile(name, function (err, data) {
if (err) this.end(err);
self.join(data.length);
});
},
function (err, results) {
if (err) throw err;
console.log(results);
}
)
)();

@@ -0,1 +1,2 @@

var EventEmitter = require('events').EventEmitter;
var sentinel = {};

@@ -8,31 +9,21 @@ var noOp = function () {};

exports.version = '0.0.1';
exports.version = '0.0.3';
exports.batchSize = exports.DEFAULT_BATCH_SIZE;
exports.start = function () {
exports.queue = function () {
var args = Array.prototype.slice.apply(arguments);
var callback;
if (args.length === 0) {
return;
}
callback = args.pop();
callback.apply(null, args);
return wrapOrApply(Nue.prototype.queue, args);
};
exports.serialQueue = function () {
exports.flow = function () {
var args = Array.prototype.slice.apply(arguments);
return wrapOrApply(Nue.prototype.serialQueue, args);
return wrapOrApply(Nue.prototype.flow, args);
};
exports.serial = function () {
exports.each = function () {
var args = Array.prototype.slice.apply(arguments);
return wrapOrApply(Nue.prototype.serial, args);
return wrapOrApply(Nue.prototype.each, args);
};
exports.serialEach = function () {
var args = Array.prototype.slice.apply(arguments);
return wrapOrApply(Nue.prototype.serialEach, args);
};
exports.parallelQueue = function () {

@@ -58,4 +49,3 @@ var args = Array.prototype.slice.apply(arguments);

return function () {
var args = Array.prototype.slice.apply(arguments);
return fn.apply(new Nue(batchSize), args)
return fn.apply(new Nue(batchSize), arguments)
};

@@ -66,8 +56,348 @@ }

function Nue(batchSize) {
this.batchSize = batchSize;
}
Nue.prototype.flow = function () {
var tasks = Array.prototype.slice.apply(arguments);
var batchSize = this.batchSize;
var fn = function () {
var args = Array.prototype.slice.apply(arguments);
runFlow(batchSize, tasks, fn, args, this && this.data);
};
Nue.eventify(fn);
return fn;
};
Nue.prototype.each = function (begin, worker, end) {
var batchSize = this.batchSize;
var fn = function () {
var args = Array.prototype.slice.apply(arguments);
runEach(batchSize, begin, worker, end, args, this);
};
Nue.eventify(fn);
return fn;
};
Nue.prototype.queue = function (worker, end) {
return new Queue(this.batchSize, worker, end);
};
Nue.prototype.parallel = function (begin, tasks, end) {
var batchSize = this.batchSize;
var fn = function () {
var args = Array.prototype.slice.apply(arguments);
runParallel(batchSize, begin, tasks, end, args, this);
};
Nue.eventify(fn);
return fn;
};
Nue.prototype.parallelEach = function (begin, worker, end) {
var batchSize = this.batchSize;
var fn = function () {
var args = Array.prototype.slice.apply(arguments);
runParallelEach(batchSize, begin, worker, end, args, this);
};
Nue.eventify(fn);
return fn;
};
Nue.prototype.parallelQueue = function (worker, end) {
var begin = function () {
this.fork();
};
return new ParallelQueue(this.batchSize, begin, worker, end);
};
Nue.eventify = function (fn) {
fn.events = new EventEmitter();
fn.on = function (type, handler) {
if (type === 'done') {
fn.events.on('done', function (context, args) {
handler.apply(context, args);
});
}
return fn;
};
fn.once = function (type, handler) {
if (type === 'done') {
fn.events.once('done', function (context, args) {
handler.apply(context, args);
});
}
return fn;
};
fn.__nue__ = true;
};
function runFlow(batchSize, tasks, caller, callerArgs, data) {
var begin;
var end;
var taskWrappers = tasks.map(function (task) {
var fn;
if (typeof task !== 'function') {
throw new Error('not function');
}
if (task.__nue__) {
return task;
}
fn = function () {
return task.apply(this, arguments);
};
Nue.eventify(fn);
return fn;
});
taskWrappers.forEach(function (task, i, tasks) {
var next = tasks[i + 1];
if (i < tasks.length - 1) {
if (task.__nue__) {
task.events.once('__done__', function(flow) {
flow.callback.call(null, next, flow);
});
}
}
});
begin = taskWrappers[0];
end = taskWrappers[taskWrappers.length - 1];
end.events.on('__done__', function (flow) {
caller.events.emit('__done__', flow);
caller.events.emit('done', {data: flow.data}, flow.args);
});
executeTask(begin, {
args: callerArgs,
callback: executeTask,
data: typeof data !== 'undefined' ? data : {},
endCallback: end,
batchSize: batchSize,
callCount: 0
});
}
function executeTask(task, flow) {
flow.callCount++;
var context = {
data: flow.data,
next: function () {
context.next = noOp;
flow.args = Array.prototype.slice.apply(arguments);
flow.data = context.data;
done(task.events, flow);
},
end: function () {
var endContext = {
data: context.data,
next: function () {
endContext.end = noOp;
flow.args = Array.prototype.slice.apply(arguments);
flow.data = endContext.data;
done(flow.endCallback.events, flow);
},
end: noOp
};
flow.endCallback.apply(context, arguments);
}
};
task.apply(context, flow.args);
function done(emitter, flow) {
if (flow.batchSize > 0 && flow.batchSize === flow.callCount) {
flow.callCount = 0;
process.nextTick(function () {
emitter.emit('__done__', flow);
});
} else {
emitter.emit('__done__', flow);
}
}
}
function runEach(batchSize, begin, worker, end, callerArgs, callerContext) {
var valueIndex = 0;
var valueLength = 0;
var dataAvailable = callerContext && typeof callerContext.data !== 'undefined';
var data = dataAvailable ? callerContext.data : {};
var results = [];
var executeEnd = function () {
if (dataAvailable) {
callerContext.data = data;
}
if (end) {
end.apply(callerContext, Array.prototype.slice.apply(arguments));
}
};
var context = {
data: data,
next: function () {
var args = Array.prototype.slice.apply(arguments);
var values;
context.next = noOp;
if (args.length === 1 && Array.isArray(args[0])) {
values = args[0];
} else {
values = args;
}
valueLength = values.length;
data = context.data;
(function executeBatch () {
var tasks = batchSize > 0 ? values.splice(0, batchSize) : values;
if (tasks.length === 0) {
var fn = executeEnd;
executeEnd = noOp;
fn(null, results);
return;
}
(function execute(task, tasks) {
var context = {
isFirst: valueIndex === 0,
isLast: valueIndex === valueLength - 1,
index: valueIndex,
data: data,
next: function () {
var args = Array.prototype.slice.apply(arguments);
context.next = noOp;
data = context.data;
if (args.length === 0) {
results[valueIndex] = undefined;
} else if (args.length === 1) {
results[valueIndex] = args[0];
} else {
results[valueIndex] = args;
}
valueIndex++;
if (tasks.length) {
execute(tasks.shift(), tasks);
} else {
process.nextTick(function () {
executeBatch();
});
}
},
end: function () {
var fn = executeEnd;
executeEnd = noOp;
data = context.data;
fn.apply(null, arguments);
}
};
worker.call(context, task);
}(tasks.shift(), tasks));
}());
}
};
begin.apply(context, callerArgs);
}
function runParallel(batchSize, begin, tasks, end, callerArgs, callerContext) {
var taskWrappers = tasks.map(function (task, i) {
return { value:task, index:i };
});
var dataAvailable = callerContext && typeof callerContext.data !== 'undefined';
var data = dataAvailable ? callerContext.data : {};
var parallelArgs = [];
var results = [];
var taskCount = tasks.length;
var beginContext = {
fork: function () {
var args = Array.prototype.slice.apply(arguments);
if (args.length === 1 && Array.isArray(args[0])) {
parallelArgs = args[0]
} else {
parallelArgs = args;
}
beginContext.fork = noOp;
process.nextTick(function () {
executeBatch(data, false);
});
}
};
begin.apply(beginContext, callerArgs);
function executeBatch (data, isCanceled) {
var tasks;
var task;
var i;
var len;
var context;
var executeEnd = function () {
if (dataAvailable) {
callerContext.data = data;
}
if (end) {
end.apply(callerContext, arguments);
}
};
if (taskCount === 0) {
executeEnd(null, results);
return;
}
tasks = batchSize > 0 ? taskWrappers.splice(0, batchSize) : taskWrappers;
len = tasks.length;
for (i = 0; i < len; i++) {
if (isCanceled) {
break;
}
task = tasks[i];
context = {};
context.data = data;
context.join = (function (index, context) {
return function (result) {
data = context.data;
results[index] = result;
taskCount--;
context.join = noOp;
};
}(task.index, context));
context.end = (function (context) {
return function () {
data = context.data;
isCanceled = true;
executeEnd.apply(null, arguments);
}
}(context));
task.value.call(context, parallelArgs[i]);
}
process.nextTick(function () {
executeBatch(data, isCanceled);
});
}
}
function runParallelEach(batchSize, begin, worker, end, callerArgs, callerContext) {
var context = {
fork: function () {
var args = Array.prototype.slice.apply(arguments);
var values;
var beginBridge;
var tasks;
context.fork = noOp;
if (args.length === 1 && Array.isArray(args[0])) {
values = args[0];
} else {
values = args;
}
beginBridge = function () {
this.fork();
};
tasks = values.map(function (value) {
return function () {
worker.call(this, value);
}
});
runParallel(batchSize, beginBridge, tasks, end, [], callerContext);
}
};
begin.apply(context, callerArgs);
}
/**
*
*
* @param batchSize
* @param worker
* @param end
*/
function SerialQueue(batchSize, worker, end) {
function Queue(batchSize, worker, end) {
this.batchSize = batchSize;

@@ -77,37 +407,40 @@ this.worker = worker;

this.values = [];
this.results = [];
this.isAddingCompleted = false;
this.isPushed = false;
this.pushCount = 0;
this.queueIndex = 0;
this.length = 0;
this.index = 0;
this.data = {};
}
SerialQueue.prototype.push = function(value) {
Queue.prototype.push = function(value) {
var self = this;
if (this.isAddingCompleted) {
throw new Error('This queue has already been frozen.');
throw new Error('This queue has been already frozen.');
}
this.values.push(value);
self.pushCount++;
self.length++;
if (!this.isPushed) {
this.isPushed = true;
process.nextTick(function() {
executeBatch([]);
executeBatch();
});
}
function executeBatch (args) {
var values = self.batchSize > 0
? self.values.splice(0, self.batchSize)
: self.values;
function executeBatch () {
var values = self.batchSize > 0 ? self.values.splice(0, self.batchSize) : self.values;
if (values.length === 0) {
if (self.isAddingCompleted) {
self.end.apply({data: self.data}, args);
self.end.call({data: self.data}, null, self.results);
} else {
process.nextTick(function () {
executeBatch();
});
}
return;
}
(function execute(value, values, args) {
(function execute(value, values) {
var context = {
index: self.queueIndex,
isFirst: self.queueIndex === 0,
isLast: self.isAddingCompleted && self.queueIndex === self.pushCount - 1,
index: self.index,
isFirst: self.index === 0,
isLast: self.isAddingCompleted && self.index === self.length - 1,
data: self.data,

@@ -118,8 +451,15 @@ next: function () {

self.data = context.data;
self.queueIndex++;
if (args.length === 0) {
self.results[self.index] = undefined;
} else if (args.length === 1) {
self.results[self.index] = args[0];
} else {
self.results[self.index] = args;
}
self.index++;
if (values.length) {
execute(values.shift(), values, args);
execute(values.shift(), values);
} else {
process.nextTick(function () {
executeBatch(args);
executeBatch();
});

@@ -129,14 +469,13 @@ }

end: function () {
var args = Array.prototype.slice.apply(arguments);
var end = self.end;
self.end = noOp;
end.apply({data: context.data}, args);
end.apply({data: context.data}, arguments);
}
};
self.worker.apply(context, [value].concat(args));
}(values.shift(), values, args));
self.worker.call(context, value);
}(values.shift(), values));
}
};
SerialQueue.prototype.complete = function() {
Queue.prototype.complete = function() {
this.isAddingCompleted = true;

@@ -146,54 +485,12 @@ };

/**
*
*
* @param batchSize
* @param tasks
*/
function Serial(batchSize, tasks, end) {
var worker = function(task) {
var args = Array.prototype.slice.call(arguments, 1);
task.apply(this, args);
};
var queue = new SerialQueue(batchSize, worker, end);
tasks.forEach(function (task) {
queue.push(task);
});
queue.complete();
}
function SerialEach(batchSize, worker, begin, end) {
var context = {
data: {},
begin: function () {
var args = Array.prototype.slice.apply(arguments);
var values;
context.begin = noOp;
if (args.length === 0) {
values = [];
} else if (args.length === 1) {
values = Array.isArray(args[0]) ? args[0] : [args[0]];
} else {
values = args;
}
var queue = new SerialQueue(batchSize, worker, end);
queue.data = context.data;
values.forEach(function (value) {
queue.push(value);
});
queue.complete();
}
};
begin.apply(context);
}
/**
*
* @param batchSize
* @param begin
* @param worker
* @param begin
* @param end
*/
function ParallelQueue(batchSize, worker, begin, end) {
function ParallelQueue(batchSize, begin, worker, end) {
this.batchSize = batchSize;
this.begin = begin;
this.worker = worker;
this.begin = begin;
this.end = end || noOp;

@@ -204,3 +501,3 @@ this.values = [];

this.isPushed = false;
this.pushCount = 0;
this.length = 0;
this.taskCount = 0;

@@ -214,3 +511,3 @@ this.results = [];

if (this.isAddingCompleted) {
throw new Error('This queue has already been frozen.');
throw new Error('This queue has been already frozen.');
}

@@ -220,4 +517,4 @@ if (this.isCanceled) {

}
this.values.push({index : this.pushCount, value: value});
this.pushCount++;
this.values.push({index : this.length, value: value});
this.length++;
this.taskCount++;

@@ -237,3 +534,3 @@ if (!this.isPushed) {

} else {
process.nextTick(executeBatch);
process.nextTick(executeBatch);
}

@@ -275,3 +572,3 @@ function executeBatch() {

};
self.worker.call(context, value.value, self.args[value.index]);
self.worker.call(context, value.value, self.args[value.index]);
}

@@ -285,155 +582,1 @@ }

};
/**
*
* @param batchSize
* @param tasks
* @param begin
* @param end
*/
function Parallel(batchSize, tasks, begin, end) {
var worker = function (task, arg) {
task.call(this, arg);
};
var queue = new ParallelQueue(batchSize, worker, begin, end);
tasks.forEach(function (task) {
queue.push(task);
});
queue.complete();
}
/**
*
* @param batchSize
* @param worker
* @param begin
* @param end
*/
function ParallelEach(batchSize, worker, begin, end) {
var context = {
begin: function () {
var args = Array.prototype.slice.apply(arguments);
var values;
context.begin = noOp;
if (args.length === 0) {
values = [];
} else if (args.length === 1) {
values = Array.isArray(args[0]) ? args[0] : [args[0]];
} else {
values = args;
}
var beginBridge = function () {
this.fork();
};
var queue = new ParallelQueue(batchSize, worker, beginBridge, end);
values.forEach(function (value) {
queue.push(value);
});
queue.complete();
}
};
begin.apply(context);
}
/**
*
* @param batchSize
*/
function Nue (batchSize) {
this.batchSize = batchSize;
}
Nue.prototype.serialQueue = function (worker) {
return new SerialQueue(this.batchSize, worker);
};
Nue.prototype.serial = function () {
var tasks = Array.prototype.slice.apply(arguments);
var batchSize = this.batchSize;
return function () {
var self = this;
var args = Array.prototype.slice.apply(arguments);
var begin = tasks[0];
var end;
var endWrapper;
tasks[0] = function () {
begin.apply(this, args);
};
if (tasks.length > 1) {
end = tasks.pop();
}
return new Serial(batchSize, tasks, end);
};
};
Nue.prototype.serialEach = function (begin, worker, end) {
var batchSize = this.batchSize;
return function () {
var self = this;
var args = Array.prototype.slice.apply(arguments);
var beginWrapper = function () {
begin.apply(this, args);
};
var endWrapper;
if (end) {
endWrapper = function() {
var args = Array.prototype.slice.apply(arguments);
end.apply(self, args);
};
}
return new SerialEach(batchSize, worker, beginWrapper, endWrapper);
};
};
Nue.prototype.parallelQueue = function (worker, end) {
var begin = function () {
this.fork();
};
return new ParallelQueue(this.batchSize, worker, begin, end);
};
Nue.prototype.parallel = function (begin, tasks, end) {
var args = Array.prototype.slice.apply(arguments);
if (Array.isArray(args[0])) {
begin = function () {
this.fork();
};
tasks = args[0];
end = typeof args[1] === 'function' ? args[1] : null;
}
var batchSize = this.batchSize;
return function () {
var self = this;
var args = Array.prototype.slice.apply(arguments);
var beginWrapper = function () {
begin.apply(this, args);
};
var endWrapper;
if (end) {
endWrapper = function() {
var args = Array.prototype.slice.apply(arguments);
end.apply(self, args);
};
}
return new Parallel(batchSize, tasks, beginWrapper, endWrapper);
};
};
Nue.prototype.parallelEach = function (begin, worker, end) {
var batchSize = this.batchSize;
return function () {
var self = this;
var args = Array.prototype.slice.apply(arguments);
var beginWrapper = function () {
begin.apply(this, args);
};
var endWrapper;
if (end) {
endWrapper = function() {
var args = Array.prototype.slice.apply(arguments);
end.apply(self, args);
};
}
return new ParallelEach(batchSize, worker, beginWrapper, endWrapper);
};
};

@@ -12,3 +12,3 @@ {

},
"version" : "0.0.2"
"version" : "0.0.3"
}

@@ -14,16 +14,50 @@ nue — An async control-flow library

### serial
```js
var nue = require('nue');
var flow = nue.flow;
var fs = require('fs');
var myFlow = flow(
function (file) {
fs.readFile(file, 'utf-8', this.next);
},
function (err, data) {
console.log(data);
}
);
myFlow('file1');
```
## API
### flow([Function tasks...]) -> Function
Return a function which represents the control-flow.
> Arguments
* `tasks`: Optional. Tasks which are executed in series.
> Context
`this` context of the `each task` has following properties.
* `data`: Object. An object shared among control-flow.
* `next`: Function. A function to execute the next task.
* `end`: Function. A function to execute the last task.
> Example
```js
var nue = require('nue');
var start = nue.start;
var serial = nue.serial;
var flow = nue.flow;
var fs = require('fs');
start(serial(
function (){
var myFlow = flow(
function () {
this.data = [];
fs.readFile('file1', this.next);
},
function (err, data){
function (err, data) {
if (err) throw this.end(err);

@@ -33,3 +67,3 @@ this.data.push(data.length);

},
function (err, data){
function (err, data) {
if (err) throw this.end(err);

@@ -44,94 +78,193 @@ this.data.push(data.length);

));
myFlow();
```
### serialEach
### each(Function begin(beginArg), Function process(processArg), Function end(err, results)) -> Function
Return a function to process each value in series.
> Arguments
* `begin`: Required. Function. A callback to prepare values.
* `beginArg`: Optional. Object. A value passed from the previous task.
* `process`: Required. Function. A callback to process values.
* `processArg`: Optional. Object. An each value passed from the begin callback.
* `end`: Optional. Function. An optional callback to handle error and results.
* `err`: Required. Error. An error passed from the process callback.
* `results`: Optional. Array. Values passed from the process callback.
> Context
`this` context of the `begin callback` has following properties.
* `data`: Object. An object shared in control-flow.
* `next`: Function. A function to execute the process callback in series.
`this` context of the `process callback` has following properties.
* `data`: Object. An object shared in control-flow.
* `next`: Function. A function to execute the process callback with next value or the end callback.
* `end`: Function. A function to execute the end callback.
* `isFirst`: Boolean. Indicate whether the first process or not.
* `isLast`: Boolean. Indicate whether the last process or not.
* `index`: Number. A process index.
`this` context of the `end callback` is same with the previous task one.
> Example
```js
var nue = require('nue');
var start = nue.start;
var serial = nue.serial;
var serialEach = nue.serialEach;
var flow = nue.flow;
var each = nue.each;
var fs = require('fs');
start(serialEach(
function () {
this.data = 0;
this.begin('file1', 'file2', 'file3');
},
function (name) {
var self = this;
fs.readFile(name, function (err, data) {
if (err) throw this.end(err);
self.data += data.length;
self.next(null, self.data);
});
},
function (err, data) {
if (err) throw err;
console.log(data);
}
));
var myFlow = flow(
each(
function () {
this.next('file1', 'file2', 'file3');
},
function (name) {
var self = this;
fs.readFile(name, function (err, data) {
if (err) throw this.end(err);
self.next(data.length);
});
},
function (err, results) {
if (err) throw err;
console.log(results);
}
)
);
myFlow();
```
### parallel
### parallel(Function begin(beginArg), Array tasks, Function end(err, results)) -> Function
Return a function to process tasks in parallel.
> Arguments
* `begin`: Required. Function. A callback to prepare values.
* `beginArg`: Optional. Object. A value passed from the previous task.
* `tasks`: Required. Array. An array of function, which are executed in parallel.
* `end`: Optional. Function. An optional callback to handle error and results.
* `err`: Required. Error. An error object passed from the process callback.
* `results`: Optional. Array. Values passed from the tasks.
> Context
`this` context of the `begin callback` has following properties.
* `data`: Object. An object shared in control-flow.
* `fork`: Function. A function to execute the tasks in parallel.
`this` context of the `each task` has following properties.
* `data`: Object. An object shared in control-flow.
* `join`: Function. A function to end the task and wait other tasks to complete.
* `end`: Function. A function to execute the end callback.
`this` context of the `end callback` is same with the previous task one.
> Example
```js
var nue = require('nue');
var start = nue.start;
var serial = nue.serial;
var flow = nue.flow;
var parallel = nue.parallel;
var fs = require('fs');
start(parallel(
function () {
this.fork('file1', 'file2');
},
[
function (name) {
var self = this;
fs.readFile(name, function (err, data) {
if (err) this.err(err);
self.join(data.length);
});
var myFlow = flow(
parallel(
function () {
this.fork('file1', 'file2');
},
function (path) {
var self = this;
fs.stat(path, function (err, stats) {
if (err) this.err(err);
self.join(stats.isFile());
});
[
function (name) {
var self = this;
fs.readFile(name, function (err, data) {
if (err) this.err(err);
self.join(data.length);
});
},
function (path) {
var self = this;
fs.stat(path, function (err, stats) {
if (err) this.err(err);
self.join(stats.isFile());
});
}
],
function (err, results) {
if (err) throw err;
console.log(results);
}
],
function (err, results) {
if (err) throw err;
console.log(results);
}
));
)
);
myFlow();
```
### parallelEach
### parallelEach(Function begin(beginArg), Function process(processArg), Function end(err, results)) -> Function
Return a function to process each value in parallel.
> Arguments
* `begin`: Required. Function. A callback to prepare values.
* `beginArg`: Optional. Object. A value passed from the previous task.
* `process`: Required. Function. A callback to process values.
* `processArg`: Optional. Object. An each value passed from the begin callback.
* `end`: Optional. Function. An optional callback to handle error and results.
* `err`: Required. Error. An error object passed from the process callback.
* `results`: Optional. Array. Values passed from the process callback.
> Context
`this` context of the `begin callback` has following properties.
* `data`: Object. An object shared in control-flow.
* `fork`: Function. A function to execute the process callback in parallel.
`this` context of the `process callback` has following properties.
* `data`: Object. An object shared in control-flow.
* `join`: Function. A function to end the process callback and wait other process callbacks to complete.
* `end`: Function. A function to execute the end callback.
`this` context of the `end callback` is same with the previous task one.
> Example
```js
var nue = require('nue');
var start = nue.start;
var serial = nue.serial;
var flow = nue.flow;
var parallelEach = nue.parallelEach;
var fs = require('fs');
start(parallelEach(
function () {
this.begin('file1', 'file2');
},
function (name) {
var self = this;
fs.readFile(name, function (err, data) {
if (err) this.end(err);
self.join(data.length);
});
},
function (err, results) {
if (err) throw err;
console.log(results);
}
));
var myFlow = flow(
parallelEach(
function () {
this.fork('file1', 'file2');
},
function (name) {
var self = this;
fs.readFile(name, function (err, data) {
if (err) this.end(err);
self.join(data.length);
});
},
function (err, results) {
if (err) throw err;
console.log(results);
}
)
);
myFlow();
```
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc