Comparing version 0.0.2 to 0.0.3
var nue = require('../lib/nue'); | ||
var start = nue.start; | ||
var serial = nue.serial; | ||
var flow = nue.flow; | ||
var parallel = nue.parallel; | ||
var fs = require('fs'); | ||
start(parallel( | ||
function () { | ||
this.fork('LICENSE', 'README.md'); | ||
}, | ||
[ | ||
function (name) { | ||
var self = this; | ||
fs.readFile(name, function (err, data) { | ||
if (err) this.err(err); | ||
self.join(data.length); | ||
}); | ||
flow( | ||
parallel( | ||
function () { | ||
this.fork('LICENSE', 'README.md'); | ||
}, | ||
function (path) { | ||
var self = this; | ||
fs.stat(path, function (err, stats) { | ||
if (err) this.err(err); | ||
self.join(stats.isFile()); | ||
}); | ||
[ | ||
function (name) { | ||
var self = this; | ||
fs.readFile(name, function (err, data) { | ||
if (err) this.end(err); | ||
self.join(data.length); | ||
}); | ||
}, | ||
function (path) { | ||
var self = this; | ||
fs.stat(path, function (err, stats) { | ||
if (err) this.end(err); | ||
self.join(stats.isFile()); | ||
}); | ||
} | ||
], | ||
function (err, results) { | ||
if (err) throw err; | ||
console.log(results); | ||
} | ||
], | ||
function (err, results) { | ||
if (err) throw err; | ||
console.log(results); | ||
} | ||
)); | ||
) | ||
)(); |
var nue = require('../lib/nue'); | ||
var start = nue.start; | ||
var serial = nue.serial; | ||
var flow = nue.flow; | ||
var parallelEach = nue.parallelEach; | ||
var fs = require('fs'); | ||
start(parallelEach( | ||
function () { | ||
this.begin('LICENSE', 'README.md'); | ||
}, | ||
function (name) { | ||
var self = this; | ||
fs.readFile(name, function (err, data) { | ||
if (err) this.end(err); | ||
self.join(data.length); | ||
}); | ||
}, | ||
function (err, results) { | ||
if (err) throw err; | ||
console.log(results); | ||
} | ||
)); | ||
flow( | ||
parallelEach( | ||
function () { | ||
this.fork('LICENSE', 'README.md'); | ||
}, | ||
function (name) { | ||
var self = this; | ||
fs.readFile(name, function (err, data) { | ||
if (err) this.end(err); | ||
self.join(data.length); | ||
}); | ||
}, | ||
function (err, results) { | ||
if (err) throw err; | ||
console.log(results); | ||
} | ||
) | ||
)(); |
643
lib/nue.js
@@ -0,1 +1,2 @@ | ||
var EventEmitter = require('events').EventEmitter; | ||
var sentinel = {}; | ||
@@ -8,31 +9,21 @@ var noOp = function () {}; | ||
exports.version = '0.0.1'; | ||
exports.version = '0.0.3'; | ||
exports.batchSize = exports.DEFAULT_BATCH_SIZE; | ||
exports.start = function () { | ||
exports.queue = function () { | ||
var args = Array.prototype.slice.apply(arguments); | ||
var callback; | ||
if (args.length === 0) { | ||
return; | ||
} | ||
callback = args.pop(); | ||
callback.apply(null, args); | ||
return wrapOrApply(Nue.prototype.queue, args); | ||
}; | ||
exports.serialQueue = function () { | ||
exports.flow = function () { | ||
var args = Array.prototype.slice.apply(arguments); | ||
return wrapOrApply(Nue.prototype.serialQueue, args); | ||
return wrapOrApply(Nue.prototype.flow, args); | ||
}; | ||
exports.serial = function () { | ||
exports.each = function () { | ||
var args = Array.prototype.slice.apply(arguments); | ||
return wrapOrApply(Nue.prototype.serial, args); | ||
return wrapOrApply(Nue.prototype.each, args); | ||
}; | ||
exports.serialEach = function () { | ||
var args = Array.prototype.slice.apply(arguments); | ||
return wrapOrApply(Nue.prototype.serialEach, args); | ||
}; | ||
exports.parallelQueue = function () { | ||
@@ -58,4 +49,3 @@ var args = Array.prototype.slice.apply(arguments); | ||
return function () { | ||
var args = Array.prototype.slice.apply(arguments); | ||
return fn.apply(new Nue(batchSize), args) | ||
return fn.apply(new Nue(batchSize), arguments) | ||
}; | ||
@@ -66,8 +56,348 @@ } | ||
function Nue(batchSize) { | ||
this.batchSize = batchSize; | ||
} | ||
Nue.prototype.flow = function () { | ||
var tasks = Array.prototype.slice.apply(arguments); | ||
var batchSize = this.batchSize; | ||
var fn = function () { | ||
var args = Array.prototype.slice.apply(arguments); | ||
runFlow(batchSize, tasks, fn, args, this && this.data); | ||
}; | ||
Nue.eventify(fn); | ||
return fn; | ||
}; | ||
Nue.prototype.each = function (begin, worker, end) { | ||
var batchSize = this.batchSize; | ||
var fn = function () { | ||
var args = Array.prototype.slice.apply(arguments); | ||
runEach(batchSize, begin, worker, end, args, this); | ||
}; | ||
Nue.eventify(fn); | ||
return fn; | ||
}; | ||
Nue.prototype.queue = function (worker, end) { | ||
return new Queue(this.batchSize, worker, end); | ||
}; | ||
Nue.prototype.parallel = function (begin, tasks, end) { | ||
var batchSize = this.batchSize; | ||
var fn = function () { | ||
var args = Array.prototype.slice.apply(arguments); | ||
runParallel(batchSize, begin, tasks, end, args, this); | ||
}; | ||
Nue.eventify(fn); | ||
return fn; | ||
}; | ||
Nue.prototype.parallelEach = function (begin, worker, end) { | ||
var batchSize = this.batchSize; | ||
var fn = function () { | ||
var args = Array.prototype.slice.apply(arguments); | ||
runParallelEach(batchSize, begin, worker, end, args, this); | ||
}; | ||
Nue.eventify(fn); | ||
return fn; | ||
}; | ||
Nue.prototype.parallelQueue = function (worker, end) { | ||
var begin = function () { | ||
this.fork(); | ||
}; | ||
return new ParallelQueue(this.batchSize, begin, worker, end); | ||
}; | ||
Nue.eventify = function (fn) { | ||
fn.events = new EventEmitter(); | ||
fn.on = function (type, handler) { | ||
if (type === 'done') { | ||
fn.events.on('done', function (context, args) { | ||
handler.apply(context, args); | ||
}); | ||
} | ||
return fn; | ||
}; | ||
fn.once = function (type, handler) { | ||
if (type === 'done') { | ||
fn.events.once('done', function (context, args) { | ||
handler.apply(context, args); | ||
}); | ||
} | ||
return fn; | ||
}; | ||
fn.__nue__ = true; | ||
}; | ||
function runFlow(batchSize, tasks, caller, callerArgs, data) { | ||
var begin; | ||
var end; | ||
var taskWrappers = tasks.map(function (task) { | ||
var fn; | ||
if (typeof task !== 'function') { | ||
throw new Error('not function'); | ||
} | ||
if (task.__nue__) { | ||
return task; | ||
} | ||
fn = function () { | ||
return task.apply(this, arguments); | ||
}; | ||
Nue.eventify(fn); | ||
return fn; | ||
}); | ||
taskWrappers.forEach(function (task, i, tasks) { | ||
var next = tasks[i + 1]; | ||
if (i < tasks.length - 1) { | ||
if (task.__nue__) { | ||
task.events.once('__done__', function(flow) { | ||
flow.callback.call(null, next, flow); | ||
}); | ||
} | ||
} | ||
}); | ||
begin = taskWrappers[0]; | ||
end = taskWrappers[taskWrappers.length - 1]; | ||
end.events.on('__done__', function (flow) { | ||
caller.events.emit('__done__', flow); | ||
caller.events.emit('done', {data: flow.data}, flow.args); | ||
}); | ||
executeTask(begin, { | ||
args: callerArgs, | ||
callback: executeTask, | ||
data: typeof data !== 'undefined' ? data : {}, | ||
endCallback: end, | ||
batchSize: batchSize, | ||
callCount: 0 | ||
}); | ||
} | ||
function executeTask(task, flow) { | ||
flow.callCount++; | ||
var context = { | ||
data: flow.data, | ||
next: function () { | ||
context.next = noOp; | ||
flow.args = Array.prototype.slice.apply(arguments); | ||
flow.data = context.data; | ||
done(task.events, flow); | ||
}, | ||
end: function () { | ||
var endContext = { | ||
data: context.data, | ||
next: function () { | ||
endContext.end = noOp; | ||
flow.args = Array.prototype.slice.apply(arguments); | ||
flow.data = endContext.data; | ||
done(flow.endCallback.events, flow); | ||
}, | ||
end: noOp | ||
}; | ||
flow.endCallback.apply(context, arguments); | ||
} | ||
}; | ||
task.apply(context, flow.args); | ||
function done(emitter, flow) { | ||
if (flow.batchSize > 0 && flow.batchSize === flow.callCount) { | ||
flow.callCount = 0; | ||
process.nextTick(function () { | ||
emitter.emit('__done__', flow); | ||
}); | ||
} else { | ||
emitter.emit('__done__', flow); | ||
} | ||
} | ||
} | ||
function runEach(batchSize, begin, worker, end, callerArgs, callerContext) { | ||
var valueIndex = 0; | ||
var valueLength = 0; | ||
var dataAvailable = callerContext && typeof callerContext.data !== 'undefined'; | ||
var data = dataAvailable ? callerContext.data : {}; | ||
var results = []; | ||
var executeEnd = function () { | ||
if (dataAvailable) { | ||
callerContext.data = data; | ||
} | ||
if (end) { | ||
end.apply(callerContext, Array.prototype.slice.apply(arguments)); | ||
} | ||
}; | ||
var context = { | ||
data: data, | ||
next: function () { | ||
var args = Array.prototype.slice.apply(arguments); | ||
var values; | ||
context.next = noOp; | ||
if (args.length === 1 && Array.isArray(args[0])) { | ||
values = args[0]; | ||
} else { | ||
values = args; | ||
} | ||
valueLength = values.length; | ||
data = context.data; | ||
(function executeBatch () { | ||
var tasks = batchSize > 0 ? values.splice(0, batchSize) : values; | ||
if (tasks.length === 0) { | ||
var fn = executeEnd; | ||
executeEnd = noOp; | ||
fn(null, results); | ||
return; | ||
} | ||
(function execute(task, tasks) { | ||
var context = { | ||
isFirst: valueIndex === 0, | ||
isLast: valueIndex === valueLength - 1, | ||
index: valueIndex, | ||
data: data, | ||
next: function () { | ||
var args = Array.prototype.slice.apply(arguments); | ||
context.next = noOp; | ||
data = context.data; | ||
if (args.length === 0) { | ||
results[valueIndex] = undefined; | ||
} else if (args.length === 1) { | ||
results[valueIndex] = args[0]; | ||
} else { | ||
results[valueIndex] = args; | ||
} | ||
valueIndex++; | ||
if (tasks.length) { | ||
execute(tasks.shift(), tasks); | ||
} else { | ||
process.nextTick(function () { | ||
executeBatch(); | ||
}); | ||
} | ||
}, | ||
end: function () { | ||
var fn = executeEnd; | ||
executeEnd = noOp; | ||
data = context.data; | ||
fn.apply(null, arguments); | ||
} | ||
}; | ||
worker.call(context, task); | ||
}(tasks.shift(), tasks)); | ||
}()); | ||
} | ||
}; | ||
begin.apply(context, callerArgs); | ||
} | ||
function runParallel(batchSize, begin, tasks, end, callerArgs, callerContext) { | ||
var taskWrappers = tasks.map(function (task, i) { | ||
return { value:task, index:i }; | ||
}); | ||
var dataAvailable = callerContext && typeof callerContext.data !== 'undefined'; | ||
var data = dataAvailable ? callerContext.data : {}; | ||
var parallelArgs = []; | ||
var results = []; | ||
var taskCount = tasks.length; | ||
var beginContext = { | ||
fork: function () { | ||
var args = Array.prototype.slice.apply(arguments); | ||
if (args.length === 1 && Array.isArray(args[0])) { | ||
parallelArgs = args[0] | ||
} else { | ||
parallelArgs = args; | ||
} | ||
beginContext.fork = noOp; | ||
process.nextTick(function () { | ||
executeBatch(data, false); | ||
}); | ||
} | ||
}; | ||
begin.apply(beginContext, callerArgs); | ||
function executeBatch (data, isCanceled) { | ||
var tasks; | ||
var task; | ||
var i; | ||
var len; | ||
var context; | ||
var executeEnd = function () { | ||
if (dataAvailable) { | ||
callerContext.data = data; | ||
} | ||
if (end) { | ||
end.apply(callerContext, arguments); | ||
} | ||
}; | ||
if (taskCount === 0) { | ||
executeEnd(null, results); | ||
return; | ||
} | ||
tasks = batchSize > 0 ? taskWrappers.splice(0, batchSize) : taskWrappers; | ||
len = tasks.length; | ||
for (i = 0; i < len; i++) { | ||
if (isCanceled) { | ||
break; | ||
} | ||
task = tasks[i]; | ||
context = {}; | ||
context.data = data; | ||
context.join = (function (index, context) { | ||
return function (result) { | ||
data = context.data; | ||
results[index] = result; | ||
taskCount--; | ||
context.join = noOp; | ||
}; | ||
}(task.index, context)); | ||
context.end = (function (context) { | ||
return function () { | ||
data = context.data; | ||
isCanceled = true; | ||
executeEnd.apply(null, arguments); | ||
} | ||
}(context)); | ||
task.value.call(context, parallelArgs[i]); | ||
} | ||
process.nextTick(function () { | ||
executeBatch(data, isCanceled); | ||
}); | ||
} | ||
} | ||
function runParallelEach(batchSize, begin, worker, end, callerArgs, callerContext) { | ||
var context = { | ||
fork: function () { | ||
var args = Array.prototype.slice.apply(arguments); | ||
var values; | ||
var beginBridge; | ||
var tasks; | ||
context.fork = noOp; | ||
if (args.length === 1 && Array.isArray(args[0])) { | ||
values = args[0]; | ||
} else { | ||
values = args; | ||
} | ||
beginBridge = function () { | ||
this.fork(); | ||
}; | ||
tasks = values.map(function (value) { | ||
return function () { | ||
worker.call(this, value); | ||
} | ||
}); | ||
runParallel(batchSize, beginBridge, tasks, end, [], callerContext); | ||
} | ||
}; | ||
begin.apply(context, callerArgs); | ||
} | ||
/** | ||
* | ||
* | ||
* @param batchSize | ||
* @param worker | ||
* @param end | ||
*/ | ||
function SerialQueue(batchSize, worker, end) { | ||
function Queue(batchSize, worker, end) { | ||
this.batchSize = batchSize; | ||
@@ -77,37 +407,40 @@ this.worker = worker; | ||
this.values = []; | ||
this.results = []; | ||
this.isAddingCompleted = false; | ||
this.isPushed = false; | ||
this.pushCount = 0; | ||
this.queueIndex = 0; | ||
this.length = 0; | ||
this.index = 0; | ||
this.data = {}; | ||
} | ||
SerialQueue.prototype.push = function(value) { | ||
Queue.prototype.push = function(value) { | ||
var self = this; | ||
if (this.isAddingCompleted) { | ||
throw new Error('This queue has already been frozen.'); | ||
throw new Error('This queue has been already frozen.'); | ||
} | ||
this.values.push(value); | ||
self.pushCount++; | ||
self.length++; | ||
if (!this.isPushed) { | ||
this.isPushed = true; | ||
process.nextTick(function() { | ||
executeBatch([]); | ||
executeBatch(); | ||
}); | ||
} | ||
function executeBatch (args) { | ||
var values = self.batchSize > 0 | ||
? self.values.splice(0, self.batchSize) | ||
: self.values; | ||
function executeBatch () { | ||
var values = self.batchSize > 0 ? self.values.splice(0, self.batchSize) : self.values; | ||
if (values.length === 0) { | ||
if (self.isAddingCompleted) { | ||
self.end.apply({data: self.data}, args); | ||
self.end.call({data: self.data}, null, self.results); | ||
} else { | ||
process.nextTick(function () { | ||
executeBatch(); | ||
}); | ||
} | ||
return; | ||
} | ||
(function execute(value, values, args) { | ||
(function execute(value, values) { | ||
var context = { | ||
index: self.queueIndex, | ||
isFirst: self.queueIndex === 0, | ||
isLast: self.isAddingCompleted && self.queueIndex === self.pushCount - 1, | ||
index: self.index, | ||
isFirst: self.index === 0, | ||
isLast: self.isAddingCompleted && self.index === self.length - 1, | ||
data: self.data, | ||
@@ -118,8 +451,15 @@ next: function () { | ||
self.data = context.data; | ||
self.queueIndex++; | ||
if (args.length === 0) { | ||
self.results[self.index] = undefined; | ||
} else if (args.length === 1) { | ||
self.results[self.index] = args[0]; | ||
} else { | ||
self.results[self.index] = args; | ||
} | ||
self.index++; | ||
if (values.length) { | ||
execute(values.shift(), values, args); | ||
execute(values.shift(), values); | ||
} else { | ||
process.nextTick(function () { | ||
executeBatch(args); | ||
executeBatch(); | ||
}); | ||
@@ -129,14 +469,13 @@ } | ||
end: function () { | ||
var args = Array.prototype.slice.apply(arguments); | ||
var end = self.end; | ||
self.end = noOp; | ||
end.apply({data: context.data}, args); | ||
end.apply({data: context.data}, arguments); | ||
} | ||
}; | ||
self.worker.apply(context, [value].concat(args)); | ||
}(values.shift(), values, args)); | ||
self.worker.call(context, value); | ||
}(values.shift(), values)); | ||
} | ||
}; | ||
SerialQueue.prototype.complete = function() { | ||
Queue.prototype.complete = function() { | ||
this.isAddingCompleted = true; | ||
@@ -146,54 +485,12 @@ }; | ||
/** | ||
* | ||
* | ||
* @param batchSize | ||
* @param tasks | ||
*/ | ||
function Serial(batchSize, tasks, end) { | ||
var worker = function(task) { | ||
var args = Array.prototype.slice.call(arguments, 1); | ||
task.apply(this, args); | ||
}; | ||
var queue = new SerialQueue(batchSize, worker, end); | ||
tasks.forEach(function (task) { | ||
queue.push(task); | ||
}); | ||
queue.complete(); | ||
} | ||
function SerialEach(batchSize, worker, begin, end) { | ||
var context = { | ||
data: {}, | ||
begin: function () { | ||
var args = Array.prototype.slice.apply(arguments); | ||
var values; | ||
context.begin = noOp; | ||
if (args.length === 0) { | ||
values = []; | ||
} else if (args.length === 1) { | ||
values = Array.isArray(args[0]) ? args[0] : [args[0]]; | ||
} else { | ||
values = args; | ||
} | ||
var queue = new SerialQueue(batchSize, worker, end); | ||
queue.data = context.data; | ||
values.forEach(function (value) { | ||
queue.push(value); | ||
}); | ||
queue.complete(); | ||
} | ||
}; | ||
begin.apply(context); | ||
} | ||
/** | ||
* | ||
* @param batchSize | ||
* @param begin | ||
* @param worker | ||
* @param begin | ||
* @param end | ||
*/ | ||
function ParallelQueue(batchSize, worker, begin, end) { | ||
function ParallelQueue(batchSize, begin, worker, end) { | ||
this.batchSize = batchSize; | ||
this.begin = begin; | ||
this.worker = worker; | ||
this.begin = begin; | ||
this.end = end || noOp; | ||
@@ -204,3 +501,3 @@ this.values = []; | ||
this.isPushed = false; | ||
this.pushCount = 0; | ||
this.length = 0; | ||
this.taskCount = 0; | ||
@@ -214,3 +511,3 @@ this.results = []; | ||
if (this.isAddingCompleted) { | ||
throw new Error('This queue has already been frozen.'); | ||
throw new Error('This queue has been already frozen.'); | ||
} | ||
@@ -220,4 +517,4 @@ if (this.isCanceled) { | ||
} | ||
this.values.push({index : this.pushCount, value: value}); | ||
this.pushCount++; | ||
this.values.push({index : this.length, value: value}); | ||
this.length++; | ||
this.taskCount++; | ||
@@ -237,3 +534,3 @@ if (!this.isPushed) { | ||
} else { | ||
process.nextTick(executeBatch); | ||
process.nextTick(executeBatch); | ||
} | ||
@@ -275,3 +572,3 @@ function executeBatch() { | ||
}; | ||
self.worker.call(context, value.value, self.args[value.index]); | ||
self.worker.call(context, value.value, self.args[value.index]); | ||
} | ||
@@ -285,155 +582,1 @@ } | ||
}; | ||
/** | ||
* | ||
* @param batchSize | ||
* @param tasks | ||
* @param begin | ||
* @param end | ||
*/ | ||
function Parallel(batchSize, tasks, begin, end) { | ||
var worker = function (task, arg) { | ||
task.call(this, arg); | ||
}; | ||
var queue = new ParallelQueue(batchSize, worker, begin, end); | ||
tasks.forEach(function (task) { | ||
queue.push(task); | ||
}); | ||
queue.complete(); | ||
} | ||
/** | ||
* | ||
* @param batchSize | ||
* @param worker | ||
* @param begin | ||
* @param end | ||
*/ | ||
function ParallelEach(batchSize, worker, begin, end) { | ||
var context = { | ||
begin: function () { | ||
var args = Array.prototype.slice.apply(arguments); | ||
var values; | ||
context.begin = noOp; | ||
if (args.length === 0) { | ||
values = []; | ||
} else if (args.length === 1) { | ||
values = Array.isArray(args[0]) ? args[0] : [args[0]]; | ||
} else { | ||
values = args; | ||
} | ||
var beginBridge = function () { | ||
this.fork(); | ||
}; | ||
var queue = new ParallelQueue(batchSize, worker, beginBridge, end); | ||
values.forEach(function (value) { | ||
queue.push(value); | ||
}); | ||
queue.complete(); | ||
} | ||
}; | ||
begin.apply(context); | ||
} | ||
/** | ||
* | ||
* @param batchSize | ||
*/ | ||
function Nue (batchSize) { | ||
this.batchSize = batchSize; | ||
} | ||
Nue.prototype.serialQueue = function (worker) { | ||
return new SerialQueue(this.batchSize, worker); | ||
}; | ||
Nue.prototype.serial = function () { | ||
var tasks = Array.prototype.slice.apply(arguments); | ||
var batchSize = this.batchSize; | ||
return function () { | ||
var self = this; | ||
var args = Array.prototype.slice.apply(arguments); | ||
var begin = tasks[0]; | ||
var end; | ||
var endWrapper; | ||
tasks[0] = function () { | ||
begin.apply(this, args); | ||
}; | ||
if (tasks.length > 1) { | ||
end = tasks.pop(); | ||
} | ||
return new Serial(batchSize, tasks, end); | ||
}; | ||
}; | ||
Nue.prototype.serialEach = function (begin, worker, end) { | ||
var batchSize = this.batchSize; | ||
return function () { | ||
var self = this; | ||
var args = Array.prototype.slice.apply(arguments); | ||
var beginWrapper = function () { | ||
begin.apply(this, args); | ||
}; | ||
var endWrapper; | ||
if (end) { | ||
endWrapper = function() { | ||
var args = Array.prototype.slice.apply(arguments); | ||
end.apply(self, args); | ||
}; | ||
} | ||
return new SerialEach(batchSize, worker, beginWrapper, endWrapper); | ||
}; | ||
}; | ||
Nue.prototype.parallelQueue = function (worker, end) { | ||
var begin = function () { | ||
this.fork(); | ||
}; | ||
return new ParallelQueue(this.batchSize, worker, begin, end); | ||
}; | ||
Nue.prototype.parallel = function (begin, tasks, end) { | ||
var args = Array.prototype.slice.apply(arguments); | ||
if (Array.isArray(args[0])) { | ||
begin = function () { | ||
this.fork(); | ||
}; | ||
tasks = args[0]; | ||
end = typeof args[1] === 'function' ? args[1] : null; | ||
} | ||
var batchSize = this.batchSize; | ||
return function () { | ||
var self = this; | ||
var args = Array.prototype.slice.apply(arguments); | ||
var beginWrapper = function () { | ||
begin.apply(this, args); | ||
}; | ||
var endWrapper; | ||
if (end) { | ||
endWrapper = function() { | ||
var args = Array.prototype.slice.apply(arguments); | ||
end.apply(self, args); | ||
}; | ||
} | ||
return new Parallel(batchSize, tasks, beginWrapper, endWrapper); | ||
}; | ||
}; | ||
Nue.prototype.parallelEach = function (begin, worker, end) { | ||
var batchSize = this.batchSize; | ||
return function () { | ||
var self = this; | ||
var args = Array.prototype.slice.apply(arguments); | ||
var beginWrapper = function () { | ||
begin.apply(this, args); | ||
}; | ||
var endWrapper; | ||
if (end) { | ||
endWrapper = function() { | ||
var args = Array.prototype.slice.apply(arguments); | ||
end.apply(self, args); | ||
}; | ||
} | ||
return new ParallelEach(batchSize, worker, beginWrapper, endWrapper); | ||
}; | ||
}; |
@@ -12,3 +12,3 @@ { | ||
}, | ||
"version" : "0.0.2" | ||
"version" : "0.0.3" | ||
} |
281
README.md
@@ -14,16 +14,50 @@ nue — An async control-flow library | ||
### serial | ||
```js | ||
var nue = require('nue'); | ||
var flow = nue.flow; | ||
var fs = require('fs'); | ||
var myFlow = flow( | ||
function (file) { | ||
fs.readFile(file, 'utf-8', this.next); | ||
}, | ||
function (err, data) { | ||
console.log(data); | ||
} | ||
); | ||
myFlow('file1'); | ||
``` | ||
## API | ||
### flow([Function tasks...]) -> Function | ||
Return a function which represents the control-flow. | ||
> Arguments | ||
* `tasks`: Optional. Tasks which are executed in series. | ||
> Context | ||
`this` context of the `each task` has following properties. | ||
* `data`: Object. An object shared among control-flow. | ||
* `next`: Function. A function to execute the next task. | ||
* `end`: Function. A function to execute the last task. | ||
> Example | ||
```js | ||
var nue = require('nue'); | ||
var start = nue.start; | ||
var serial = nue.serial; | ||
var flow = nue.flow; | ||
var fs = require('fs'); | ||
start(serial( | ||
function (){ | ||
var myFlow = flow( | ||
function () { | ||
this.data = []; | ||
fs.readFile('file1', this.next); | ||
}, | ||
function (err, data){ | ||
function (err, data) { | ||
if (err) throw this.end(err); | ||
@@ -33,3 +67,3 @@ this.data.push(data.length); | ||
}, | ||
function (err, data){ | ||
function (err, data) { | ||
if (err) throw this.end(err); | ||
@@ -44,94 +78,193 @@ this.data.push(data.length); | ||
)); | ||
myFlow(); | ||
``` | ||
### serialEach | ||
### each(Function begin(beginArg), Function process(processArg), Function end(err, results)) -> Function | ||
Return a function to process each value in series. | ||
> Arguments | ||
* `begin`: Required. Function. A callback to prepare values. | ||
* `beginArg`: Optional. Object. A value passed from the previous task. | ||
* `process`: Required. Function. A callback to process values. | ||
* `processArg`: Optional. Object. An each value passed from the begin callback. | ||
* `end`: Optional. Function. An optional callback to handle error and results. | ||
* `err`: Required. Error. An error passed from the process callback. | ||
* `results`: Optional. Array. Values passed from the process callback. | ||
> Context | ||
`this` context of the `begin callback` has following properties. | ||
* `data`: Object. An object shared in control-flow. | ||
* `next`: Function. A function to execute the process callback in series. | ||
`this` context of the `process callback` has following properties. | ||
* `data`: Object. An object shared in control-flow. | ||
* `next`: Function. A function to execute the process callback with next value or the end callback. | ||
* `end`: Function. A function to execute the end callback. | ||
* `isFirst`: Boolean. Indicate whether the first process or not. | ||
* `isLast`: Boolean. Indicate whether the last process or not. | ||
* `index`: Number. A process index. | ||
`this` context of the `end callback` is same with the previous task one. | ||
> Example | ||
```js | ||
var nue = require('nue'); | ||
var start = nue.start; | ||
var serial = nue.serial; | ||
var serialEach = nue.serialEach; | ||
var flow = nue.flow; | ||
var each = nue.each; | ||
var fs = require('fs'); | ||
start(serialEach( | ||
function () { | ||
this.data = 0; | ||
this.begin('file1', 'file2', 'file3'); | ||
}, | ||
function (name) { | ||
var self = this; | ||
fs.readFile(name, function (err, data) { | ||
if (err) throw this.end(err); | ||
self.data += data.length; | ||
self.next(null, self.data); | ||
}); | ||
}, | ||
function (err, data) { | ||
if (err) throw err; | ||
console.log(data); | ||
} | ||
)); | ||
var myFlow = flow( | ||
each( | ||
function () { | ||
this.next('file1', 'file2', 'file3'); | ||
}, | ||
function (name) { | ||
var self = this; | ||
fs.readFile(name, function (err, data) { | ||
if (err) throw this.end(err); | ||
self.next(data.length); | ||
}); | ||
}, | ||
function (err, results) { | ||
if (err) throw err; | ||
console.log(results); | ||
} | ||
) | ||
); | ||
myFlow(); | ||
``` | ||
### parallel | ||
### parallel(Function begin(beginArg), Array tasks, Function end(err, results)) -> Function | ||
Return a function to process tasks in parallel. | ||
> Arguments | ||
* `begin`: Required. Function. A callback to prepare values. | ||
* `beginArg`: Optional. Object. A value passed from the previous task. | ||
* `tasks`: Required. Array. An array of function, which are executed in parallel. | ||
* `end`: Optional. Function. An optional callback to handle error and results. | ||
* `err`: Required. Error. An error object passed from the process callback. | ||
* `results`: Optional. Array. Values passed from the tasks. | ||
> Context | ||
`this` context of the `begin callback` has following properties. | ||
* `data`: Object. An object shared in control-flow. | ||
* `fork`: Function. A function to execute the tasks in parallel. | ||
`this` context of the `each task` has following properties. | ||
* `data`: Object. An object shared in control-flow. | ||
* `join`: Function. A function to end the task and wait other tasks to complete. | ||
* `end`: Function. A function to execute the end callback. | ||
`this` context of the `end callback` is same with the previous task one. | ||
> Example | ||
```js | ||
var nue = require('nue'); | ||
var start = nue.start; | ||
var serial = nue.serial; | ||
var flow = nue.flow; | ||
var parallel = nue.parallel; | ||
var fs = require('fs'); | ||
start(parallel( | ||
function () { | ||
this.fork('file1', 'file2'); | ||
}, | ||
[ | ||
function (name) { | ||
var self = this; | ||
fs.readFile(name, function (err, data) { | ||
if (err) this.err(err); | ||
self.join(data.length); | ||
}); | ||
var myFlow = flow( | ||
parallel( | ||
function () { | ||
this.fork('file1', 'file2'); | ||
}, | ||
function (path) { | ||
var self = this; | ||
fs.stat(path, function (err, stats) { | ||
if (err) this.err(err); | ||
self.join(stats.isFile()); | ||
}); | ||
[ | ||
function (name) { | ||
var self = this; | ||
fs.readFile(name, function (err, data) { | ||
if (err) this.err(err); | ||
self.join(data.length); | ||
}); | ||
}, | ||
function (path) { | ||
var self = this; | ||
fs.stat(path, function (err, stats) { | ||
if (err) this.err(err); | ||
self.join(stats.isFile()); | ||
}); | ||
} | ||
], | ||
function (err, results) { | ||
if (err) throw err; | ||
console.log(results); | ||
} | ||
], | ||
function (err, results) { | ||
if (err) throw err; | ||
console.log(results); | ||
} | ||
)); | ||
) | ||
); | ||
myFlow(); | ||
``` | ||
### parallelEach | ||
### parallelEach(Function begin(beginArg), Function process(processArg), Function end(err, results)) -> Function | ||
Return a function to process each value in parallel. | ||
> Arguments | ||
* `begin`: Required. Function. A callback to prepare values. | ||
* `beginArg`: Optional. Object. A value passed from the previous task. | ||
* `process`: Required. Function. A callback to process values. | ||
* `processArg`: Optional. Object. An each value passed from the begin callback. | ||
* `end`: Optional. Function. An optional callback to handle error and results. | ||
* `err`: Required. Error. An error object passed from the process callback. | ||
* `results`: Optional. Array. Values passed from the process callback. | ||
> Context | ||
`this` context of the `begin callback` has following properties. | ||
* `data`: Object. An object shared in control-flow. | ||
* `fork`: Function. A function to execute the process callback in parallel. | ||
`this` context of the `process callback` has following properties. | ||
* `data`: Object. An object shared in control-flow. | ||
* `join`: Function. A function to end the process callback and wait other process callbacks to complete. | ||
* `end`: Function. A function to execute the end callback. | ||
`this` context of the `end callback` is same with the previous task one. | ||
> Example | ||
```js | ||
var nue = require('nue'); | ||
var start = nue.start; | ||
var serial = nue.serial; | ||
var flow = nue.flow; | ||
var parallelEach = nue.parallelEach; | ||
var fs = require('fs'); | ||
start(parallelEach( | ||
function () { | ||
this.begin('file1', 'file2'); | ||
}, | ||
function (name) { | ||
var self = this; | ||
fs.readFile(name, function (err, data) { | ||
if (err) this.end(err); | ||
self.join(data.length); | ||
}); | ||
}, | ||
function (err, results) { | ||
if (err) throw err; | ||
console.log(results); | ||
} | ||
)); | ||
var myFlow = flow( | ||
parallelEach( | ||
function () { | ||
this.fork('file1', 'file2'); | ||
}, | ||
function (name) { | ||
var self = this; | ||
fs.readFile(name, function (err, data) { | ||
if (err) this.end(err); | ||
self.join(data.length); | ||
}); | ||
}, | ||
function (err, results) { | ||
if (err) throw err; | ||
console.log(results); | ||
} | ||
) | ||
); | ||
myFlow(); | ||
``` |
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
25492
628
268