Comparing version 0.0.1 to 0.0.2
424
lib/nue.js
@@ -1,41 +0,117 @@ | ||
var nue = module.exports = function(tickSize) { | ||
tickSize = typeof tickSize === 'number' ? tickSize : nue.tickSize; | ||
return new Nue(tickSize); | ||
}; | ||
var sentinel = {}; | ||
var noOp = function () {}; | ||
exports.DEFAULT_BATCH_SIZE = 3; | ||
exports.name = 'nue'; | ||
exports.version = '0.0.1'; | ||
exports.batchSize = exports.DEFAULT_BATCH_SIZE; | ||
exports.start = function () { | ||
var args = Array.prototype.slice.apply(arguments); | ||
var callback; | ||
if (args.length === 0) { | ||
return; | ||
} | ||
callback = args.pop(); | ||
callback.apply(null, args); | ||
}; | ||
exports.serialQueue = function () { | ||
var args = Array.prototype.slice.apply(arguments); | ||
return wrapOrApply(Nue.prototype.serialQueue, args); | ||
}; | ||
exports.serial = function () { | ||
var args = Array.prototype.slice.apply(arguments); | ||
return wrapOrApply(Nue.prototype.serial, args); | ||
}; | ||
exports.serialEach = function () { | ||
var args = Array.prototype.slice.apply(arguments); | ||
return wrapOrApply(Nue.prototype.serialEach, args); | ||
}; | ||
exports.parallelQueue = function () { | ||
var args = Array.prototype.slice.apply(arguments); | ||
return wrapOrApply(Nue.prototype.parallelQueue, args); | ||
}; | ||
exports.parallel = function () { | ||
var args = Array.prototype.slice.apply(arguments); | ||
return wrapOrApply(Nue.prototype.parallel, args); | ||
}; | ||
exports.parallelEach = function () { | ||
var args = Array.prototype.slice.apply(arguments); | ||
return wrapOrApply(Nue.prototype.parallelEach, args); | ||
}; | ||
function wrapOrApply (fn, args) { | ||
var batchSize = exports.batchSize; | ||
if (typeof args[0] === 'number') { | ||
batchSize = args[0]; | ||
return function () { | ||
var args = Array.prototype.slice.apply(arguments); | ||
return fn.apply(new Nue(batchSize), args) | ||
}; | ||
} | ||
return fn.apply(new Nue(batchSize), args) | ||
} | ||
/** | ||
* | ||
* @param tickSize | ||
* @param batchSize | ||
* @param worker | ||
* @param callback | ||
*/ | ||
function SeriesQueue(tickSize, worker, callback) { | ||
this.tickSize = tickSize; | ||
function SerialQueue(batchSize, worker, end) { | ||
this.batchSize = batchSize; | ||
this.worker = worker; | ||
this.callback = callback || noOp; | ||
this.end = end || noOp; | ||
this.values = []; | ||
this.isAddingCompleted = false; | ||
this.isPushed = false; | ||
}; | ||
this.pushCount = 0; | ||
this.queueIndex = 0; | ||
this.data = {}; | ||
} | ||
SeriesQueue.prototype.push = function(value) { | ||
SerialQueue.prototype.push = function(value) { | ||
var self = this; | ||
if (this.isAddingCompleted) { | ||
throw new Error('This queue has already been frozen.'); | ||
} | ||
this.values.push(value); | ||
self.pushCount++; | ||
if (!this.isPushed) { | ||
this.isPushed = true; | ||
process.nextTick(function() { | ||
executeBatch([]); | ||
}); | ||
} | ||
function executeBatch (args) { | ||
var values = self.values.splice(0, self.tickSize); | ||
var values = self.batchSize > 0 | ||
? self.values.splice(0, self.batchSize) | ||
: self.values; | ||
if (values.length === 0) { | ||
if (self.isAddingCompleted) { | ||
self.end.apply({data: self.data}, args); | ||
} | ||
return; | ||
} | ||
(function execute(value, args) { | ||
var context; | ||
if (value === sentinel) { | ||
self.callback(null); | ||
return; | ||
} | ||
context = { | ||
(function execute(value, values, args) { | ||
var context = { | ||
index: self.queueIndex, | ||
isFirst: self.queueIndex === 0, | ||
isLast: self.isAddingCompleted && self.queueIndex === self.pushCount - 1, | ||
data: self.data, | ||
next: function () { | ||
var args = Array.prototype.slice.apply(arguments); | ||
context.next = noOp; | ||
self.data = context.data; | ||
self.queueIndex++; | ||
if (values.length) { | ||
execute(values.shift(), args); | ||
execute(values.shift(), values, args); | ||
} else { | ||
@@ -47,21 +123,15 @@ process.nextTick(function () { | ||
}, | ||
end: self.callback | ||
end: function () { | ||
var args = Array.prototype.slice.apply(arguments); | ||
var end = self.end; | ||
self.end = noOp; | ||
end.apply({data: context.data}, args); | ||
} | ||
}; | ||
self.worker.apply(context, [value].concat(args)); | ||
}(values.shift(), args)); | ||
}(values.shift(), values, args)); | ||
} | ||
if (this.isAddingCompleted) { | ||
throw new Error('This queue has already been frozen.'); | ||
} | ||
this.values.push(value); | ||
if (!this.isPushed) { | ||
this.isPushed = true; | ||
process.nextTick(function() { | ||
executeBatch([]); | ||
}); | ||
} | ||
}; | ||
SeriesQueue.prototype.complete = function() { | ||
this.push(sentinel); | ||
SerialQueue.prototype.complete = function() { | ||
this.isAddingCompleted = true; | ||
@@ -72,7 +142,6 @@ }; | ||
* | ||
* @param tickSize | ||
* @param batchSize | ||
* @param tasks | ||
* @param callback | ||
*/ | ||
function Series(tickSize, tasks, callback) { | ||
function Serial(batchSize, tasks, end) { | ||
var worker = function(task) { | ||
@@ -82,3 +151,3 @@ var args = Array.prototype.slice.call(arguments, 1); | ||
}; | ||
var queue = new SeriesQueue(tickSize, worker, callback); | ||
var queue = new SerialQueue(batchSize, worker, end); | ||
tasks.forEach(function (task) { | ||
@@ -88,21 +157,77 @@ queue.push(task); | ||
queue.complete(); | ||
}; | ||
} | ||
function SerialEach(batchSize, worker, begin, end) { | ||
var context = { | ||
data: {}, | ||
begin: function () { | ||
var args = Array.prototype.slice.apply(arguments); | ||
var values; | ||
context.begin = noOp; | ||
if (args.length === 0) { | ||
values = []; | ||
} else if (args.length === 1) { | ||
values = Array.isArray(args[0]) ? args[0] : [args[0]]; | ||
} else { | ||
values = args; | ||
} | ||
var queue = new SerialQueue(batchSize, worker, end); | ||
queue.data = context.data; | ||
values.forEach(function (value) { | ||
queue.push(value); | ||
}); | ||
queue.complete(); | ||
} | ||
}; | ||
begin.apply(context); | ||
} | ||
/** | ||
* | ||
* @param tickSize | ||
* @param batchSize | ||
* @param worker | ||
* @param callback | ||
* @param begin | ||
* @param end | ||
*/ | ||
function ParallelQueue(tickSize, worker, callback) { | ||
this.tickSize = tickSize; | ||
function ParallelQueue(batchSize, worker, begin, end) { | ||
this.batchSize = batchSize; | ||
this.worker = worker; | ||
this.callback = callback || noOp; | ||
this.begin = begin; | ||
this.end = end || noOp; | ||
this.values = []; | ||
this.isAddingCompleted = false; | ||
this.isCanceled = false; | ||
}; | ||
this.isPushed = false; | ||
this.pushCount = 0; | ||
this.taskCount = 0; | ||
this.results = []; | ||
this.args = []; | ||
} | ||
ParallelQueue.prototype.push = function (value) { | ||
var self = this; | ||
if (this.isAddingCompleted) { | ||
throw new Error('This queue has already been frozen.'); | ||
} | ||
if (this.isCanceled) { | ||
return; | ||
} | ||
this.values.push({index : this.pushCount, value: value}); | ||
this.pushCount++; | ||
this.taskCount++; | ||
if (!this.isPushed) { | ||
this.isPushed = true; | ||
(function () { | ||
var context = { | ||
fork: function () { | ||
self.args = Array.prototype.slice.apply(arguments); | ||
context.fork = noOp; | ||
process.nextTick(executeBatch); | ||
} | ||
}; | ||
self.begin.apply(context); | ||
}()); | ||
} else { | ||
process.nextTick(executeBatch); | ||
} | ||
function executeBatch() { | ||
@@ -113,3 +238,3 @@ var context; | ||
var i; | ||
for (i = 0; values.length && i < self.tickSize; i++) { | ||
for (i = 0; values.length && (self.batchSize > 0 && i < self.batchSize || self.batchSize < 0); i++) { | ||
if (self.isCanceled) { | ||
@@ -119,27 +244,30 @@ break; | ||
value = values.shift(); | ||
if (value === sentinel) { | ||
self.isCompleted = true; | ||
self.callback(null); | ||
if (value.value === sentinel) { | ||
process.nextTick(function _end() { | ||
if (self.taskCount === 1) { | ||
self.end.call(null, null, self.results); | ||
} else { | ||
process.nextTick(_end); | ||
} | ||
}); | ||
return; | ||
} | ||
context = { | ||
next: noOp, | ||
end: function () { | ||
var args = Array.prototype.slice.apply(arguments); | ||
self.callback.apply(self, args); | ||
self.callback = noOp; | ||
self.isCanceled = true; | ||
} | ||
context = {}; | ||
context.index = value.index; | ||
context.join = (function(index, context) { | ||
return function (result) { | ||
self.results[index] = result; | ||
self.taskCount--; | ||
context.join = noOp; | ||
}; | ||
}(value.index, context)); | ||
context.end = function () { | ||
var args = Array.prototype.slice.apply(arguments); | ||
self.end.apply(null, args); | ||
self.end = noOp; | ||
self.isCanceled = true; | ||
}; | ||
self.worker.call(context, value); | ||
} | ||
self.worker.call(context, value.value, self.args[value.index]); | ||
} | ||
} | ||
if (this.isAddingCompleted) { | ||
throw new Error('This queue has already been frozen.'); | ||
} | ||
if (this.isCanceled) { | ||
return; | ||
} | ||
this.values.push(value); | ||
process.nextTick(executeBatch); | ||
}; | ||
@@ -154,11 +282,12 @@ | ||
* | ||
* @param tickSize | ||
* @param batchSize | ||
* @param tasks | ||
* @param callback | ||
* @param begin | ||
* @param end | ||
*/ | ||
function Parallel(tickSize, tasks, callback) { | ||
var worker = function (task) { | ||
task.call(this); | ||
function Parallel(batchSize, tasks, begin, end) { | ||
var worker = function (task, arg) { | ||
task.call(this, arg); | ||
}; | ||
var queue = new ParallelQueue(tickSize, worker, callback); | ||
var queue = new ParallelQueue(batchSize, worker, begin, end); | ||
tasks.forEach(function (task) { | ||
@@ -168,45 +297,138 @@ queue.push(task); | ||
queue.complete(); | ||
}; | ||
} | ||
/** | ||
* | ||
* @param tickSize | ||
* @param batchSize | ||
* @param worker | ||
* @param begin | ||
* @param end | ||
*/ | ||
function Nue (tickSize) { | ||
this.tickSize = tickSize; | ||
function ParallelEach(batchSize, worker, begin, end) { | ||
var context = { | ||
begin: function () { | ||
var args = Array.prototype.slice.apply(arguments); | ||
var values; | ||
context.begin = noOp; | ||
if (args.length === 0) { | ||
values = []; | ||
} else if (args.length === 1) { | ||
values = Array.isArray(args[0]) ? args[0] : [args[0]]; | ||
} else { | ||
values = args; | ||
} | ||
var beginBridge = function () { | ||
this.fork(); | ||
}; | ||
var queue = new ParallelQueue(batchSize, worker, beginBridge, end); | ||
values.forEach(function (value) { | ||
queue.push(value); | ||
}); | ||
queue.complete(); | ||
} | ||
}; | ||
begin.apply(context); | ||
} | ||
/** | ||
* | ||
* @param batchSize | ||
*/ | ||
function Nue (batchSize) { | ||
this.batchSize = batchSize; | ||
} | ||
Nue.prototype.serialQueue = function (worker) { | ||
return new SerialQueue(this.batchSize, worker); | ||
}; | ||
Nue.prototype.seriesQueue = function (worker, callback) { | ||
return new SeriesQueue(this.tickSize, worker, callback); | ||
Nue.prototype.serial = function () { | ||
var tasks = Array.prototype.slice.apply(arguments); | ||
var batchSize = this.batchSize; | ||
return function () { | ||
var self = this; | ||
var args = Array.prototype.slice.apply(arguments); | ||
var begin = tasks[0]; | ||
var end; | ||
var endWrapper; | ||
tasks[0] = function () { | ||
begin.apply(this, args); | ||
}; | ||
if (tasks.length > 1) { | ||
end = tasks.pop(); | ||
} | ||
return new Serial(batchSize, tasks, end); | ||
}; | ||
}; | ||
Nue.prototype.series = function (tasks, callback) { | ||
var args = Array.prototype.slice.apply(arguments); | ||
if (!Array.isArray(args[0])) { | ||
tasks = args; | ||
callback = null; | ||
} | ||
return new Series(this.tickSize, tasks, callback); | ||
Nue.prototype.serialEach = function (begin, worker, end) { | ||
var batchSize = this.batchSize; | ||
return function () { | ||
var self = this; | ||
var args = Array.prototype.slice.apply(arguments); | ||
var beginWrapper = function () { | ||
begin.apply(this, args); | ||
}; | ||
var endWrapper; | ||
if (end) { | ||
endWrapper = function() { | ||
var args = Array.prototype.slice.apply(arguments); | ||
end.apply(self, args); | ||
}; | ||
} | ||
return new SerialEach(batchSize, worker, beginWrapper, endWrapper); | ||
}; | ||
}; | ||
Nue.prototype.parallelQueue = function (worker, callback) { | ||
return new ParallelQueue(this.tickSize, worker, callback); | ||
Nue.prototype.parallelQueue = function (worker, end) { | ||
var begin = function () { | ||
this.fork(); | ||
}; | ||
return new ParallelQueue(this.batchSize, worker, begin, end); | ||
}; | ||
Nue.prototype.parallel = function (tasks, callback) { | ||
Nue.prototype.parallel = function (begin, tasks, end) { | ||
var args = Array.prototype.slice.apply(arguments); | ||
if (!Array.isArray(args[0])) { | ||
tasks = args; | ||
callback = null; | ||
if (Array.isArray(args[0])) { | ||
begin = function () { | ||
this.fork(); | ||
}; | ||
tasks = args[0]; | ||
end = typeof args[1] === 'function' ? args[1] : null; | ||
} | ||
return new Parallel(this.tickSize, tasks, callback); | ||
var batchSize = this.batchSize; | ||
return function () { | ||
var self = this; | ||
var args = Array.prototype.slice.apply(arguments); | ||
var beginWrapper = function () { | ||
begin.apply(this, args); | ||
}; | ||
var endWrapper; | ||
if (end) { | ||
endWrapper = function() { | ||
var args = Array.prototype.slice.apply(arguments); | ||
end.apply(self, args); | ||
}; | ||
} | ||
return new Parallel(batchSize, tasks, beginWrapper, endWrapper); | ||
}; | ||
}; | ||
nue.DEFAULT_TICK_SIZE = 3; | ||
nue.name = 'nue'; | ||
nue.version = '0.0.1'; | ||
nue.tickSize = nue.DEFAULT_TICK_SIZE; | ||
nue.seriesQueue = new Nue(nue.tickSize).seriesQueue; | ||
nue.series = new Nue(nue.tickSize).series; | ||
nue.parallelQueue = new Nue(nue.tickSize).parallelQueue; | ||
nue.parallel = new Nue(nue.tickSize).parallel; | ||
Nue.prototype.parallelEach = function (begin, worker, end) { | ||
var batchSize = this.batchSize; | ||
return function () { | ||
var self = this; | ||
var args = Array.prototype.slice.apply(arguments); | ||
var beginWrapper = function () { | ||
begin.apply(this, args); | ||
}; | ||
var endWrapper; | ||
if (end) { | ||
endWrapper = function() { | ||
var args = Array.prototype.slice.apply(arguments); | ||
end.apply(self, args); | ||
}; | ||
} | ||
return new ParallelEach(batchSize, worker, beginWrapper, endWrapper); | ||
}; | ||
}; |
@@ -9,3 +9,6 @@ { | ||
"main" : "./lib/nue.js", | ||
"version" : "0.0.1" | ||
"devDependencies": { | ||
"mocha": "~0.11.0" | ||
}, | ||
"version" : "0.0.2" | ||
} |
210
README.md
@@ -1,116 +0,134 @@ | ||
nue — An async control-flow library suited for the node event loop | ||
=================================================================== | ||
nue — An async control-flow library | ||
=================================== | ||
nue is an async control-flow library. | ||
nue is an async control-flow library suited for the node event loop. | ||
## Examples | ||
## Installing | ||
> JavaScript | ||
``` | ||
$ npm install nue | ||
``` | ||
## Example | ||
### serial | ||
```js | ||
var nue = require('nue'); | ||
var start = nue.start; | ||
var serial = nue.serial; | ||
var fs = require('fs'); | ||
step1(); | ||
start(serial( | ||
function (){ | ||
this.data = []; | ||
fs.readFile('file1', this.next); | ||
}, | ||
function (err, data){ | ||
if (err) throw this.end(err); | ||
this.data.push(data.length); | ||
fs.readFile('file2', this.next); | ||
}, | ||
function (err, data){ | ||
if (err) throw this.end(err); | ||
this.data.push(data.length); | ||
this.next(); | ||
}, | ||
function (err) { | ||
if (err) throw err; | ||
console.log(this.data); | ||
} | ||
)); | ||
``` | ||
function step1() { | ||
console.log('step1 start'); | ||
nue.parallel([ | ||
function(){ | ||
console.log('aaa'); | ||
}, | ||
function(){ | ||
console.log('bbb'); | ||
}], | ||
function(err){ | ||
if (err) throw err; | ||
console.log('step1 end\n'); | ||
step2(); | ||
} | ||
); | ||
} | ||
### serialEach | ||
function step2() { | ||
console.log('step2 start'); | ||
nue.series([ | ||
function () { | ||
console.log('ccc'); | ||
this.next('test', 2); | ||
}, | ||
function (a, b){ | ||
console.log('ddd ' + a + b); | ||
this.next(); | ||
}], | ||
function (err) { | ||
if (err) throw err; | ||
console.log('step2 end\n'); | ||
step3(); | ||
} | ||
); | ||
} | ||
```js | ||
var nue = require('nue'); | ||
var start = nue.start; | ||
var serial = nue.serial; | ||
var serialEach = nue.serialEach; | ||
var fs = require('fs'); | ||
function step3() { | ||
console.log("step3 start"); | ||
var q = nue.parallelQueue( | ||
function (data){ | ||
console.log('data: ' + data); | ||
}, | ||
function (err) { | ||
if (err) throw err; | ||
console.log('step3 end\n'); | ||
step4(); | ||
} | ||
); | ||
for (var i = 0; i < 10; i++) { | ||
q.push(i); | ||
start(serialEach( | ||
function () { | ||
this.data = 0; | ||
this.begin('file1', 'file2', 'file3'); | ||
}, | ||
function (name) { | ||
var self = this; | ||
fs.readFile(name, function (err, data) { | ||
if (err) throw this.end(err); | ||
self.data += data.length; | ||
self.next(null, self.data); | ||
}); | ||
}, | ||
function (err, data) { | ||
if (err) throw err; | ||
console.log(data); | ||
} | ||
q.complete(); | ||
} | ||
)); | ||
``` | ||
function step4() { | ||
console.log("step4 start"); | ||
var q = nue.seriesQueue( | ||
function (data){ | ||
console.log('data: ' + data); | ||
this.next(); | ||
### parallel | ||
```js | ||
var nue = require('nue'); | ||
var start = nue.start; | ||
var serial = nue.serial; | ||
var parallel = nue.parallel; | ||
var fs = require('fs'); | ||
start(parallel( | ||
function () { | ||
this.fork('file1', 'file2'); | ||
}, | ||
[ | ||
function (name) { | ||
var self = this; | ||
fs.readFile(name, function (err, data) { | ||
if (err) this.err(err); | ||
self.join(data.length); | ||
}); | ||
}, | ||
function (err) { | ||
if (err) throw err; | ||
console.log('step4 end\n'); | ||
function (path) { | ||
var self = this; | ||
fs.stat(path, function (err, stats) { | ||
if (err) this.err(err); | ||
self.join(stats.isFile()); | ||
}); | ||
} | ||
); | ||
for (var i = 0; i < 10; i++) { | ||
q.push(i); | ||
], | ||
function (err, results) { | ||
if (err) throw err; | ||
console.log(results); | ||
} | ||
q.complete(); | ||
} | ||
)); | ||
``` | ||
> Result | ||
### parallelEach | ||
``` | ||
step1 start | ||
aaa: | ||
bbb: | ||
step1 end | ||
```js | ||
var nue = require('nue'); | ||
var start = nue.start; | ||
var serial = nue.serial; | ||
var parallelEach = nue.parallelEach; | ||
var fs = require('fs'); | ||
step2 start | ||
ccc: | ||
ddd: test, 2 | ||
step2 end | ||
step3 start | ||
data: 0 | ||
data: 1 | ||
data: 2 | ||
data: 3 | ||
data: 4 | ||
step3 end | ||
step4 start | ||
data: 0 | ||
data: 1 | ||
data: 2 | ||
data: 3 | ||
data: 4 | ||
step4 end | ||
start(parallelEach( | ||
function () { | ||
this.begin('file1', 'file2'); | ||
}, | ||
function (name) { | ||
var self = this; | ||
fs.readFile(name, function (err, data) { | ||
if (err) this.end(err); | ||
self.join(data.length); | ||
}); | ||
}, | ||
function (err, results) { | ||
if (err) throw err; | ||
console.log(results); | ||
} | ||
)); | ||
``` |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
Filesystem access
Supply chain riskAccesses the file system, and could potentially read sensitive data.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
16681
10
485
135
1
4
1