Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

nue

Package Overview
Dependencies
Maintainers
1
Versions
17
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

nue - npm Package Compare versions

Comparing version 0.0.1 to 0.0.2

.idea/.name

424

lib/nue.js

@@ -1,41 +0,117 @@

var nue = module.exports = function(tickSize) {
tickSize = typeof tickSize === 'number' ? tickSize : nue.tickSize;
return new Nue(tickSize);
};
var sentinel = {};
var noOp = function () {};
exports.DEFAULT_BATCH_SIZE = 3;
exports.name = 'nue';
exports.version = '0.0.1';
exports.batchSize = exports.DEFAULT_BATCH_SIZE;
exports.start = function () {
var args = Array.prototype.slice.apply(arguments);
var callback;
if (args.length === 0) {
return;
}
callback = args.pop();
callback.apply(null, args);
};
exports.serialQueue = function () {
var args = Array.prototype.slice.apply(arguments);
return wrapOrApply(Nue.prototype.serialQueue, args);
};
exports.serial = function () {
var args = Array.prototype.slice.apply(arguments);
return wrapOrApply(Nue.prototype.serial, args);
};
exports.serialEach = function () {
var args = Array.prototype.slice.apply(arguments);
return wrapOrApply(Nue.prototype.serialEach, args);
};
exports.parallelQueue = function () {
var args = Array.prototype.slice.apply(arguments);
return wrapOrApply(Nue.prototype.parallelQueue, args);
};
exports.parallel = function () {
var args = Array.prototype.slice.apply(arguments);
return wrapOrApply(Nue.prototype.parallel, args);
};
exports.parallelEach = function () {
var args = Array.prototype.slice.apply(arguments);
return wrapOrApply(Nue.prototype.parallelEach, args);
};
function wrapOrApply (fn, args) {
var batchSize = exports.batchSize;
if (typeof args[0] === 'number') {
batchSize = args[0];
return function () {
var args = Array.prototype.slice.apply(arguments);
return fn.apply(new Nue(batchSize), args)
};
}
return fn.apply(new Nue(batchSize), args)
}
/**
*
* @param tickSize
* @param batchSize
* @param worker
* @param callback
*/
function SeriesQueue(tickSize, worker, callback) {
this.tickSize = tickSize;
function SerialQueue(batchSize, worker, end) {
this.batchSize = batchSize;
this.worker = worker;
this.callback = callback || noOp;
this.end = end || noOp;
this.values = [];
this.isAddingCompleted = false;
this.isPushed = false;
};
this.pushCount = 0;
this.queueIndex = 0;
this.data = {};
}
SeriesQueue.prototype.push = function(value) {
SerialQueue.prototype.push = function(value) {
var self = this;
if (this.isAddingCompleted) {
throw new Error('This queue has already been frozen.');
}
this.values.push(value);
self.pushCount++;
if (!this.isPushed) {
this.isPushed = true;
process.nextTick(function() {
executeBatch([]);
});
}
function executeBatch (args) {
var values = self.values.splice(0, self.tickSize);
var values = self.batchSize > 0
? self.values.splice(0, self.batchSize)
: self.values;
if (values.length === 0) {
if (self.isAddingCompleted) {
self.end.apply({data: self.data}, args);
}
return;
}
(function execute(value, args) {
var context;
if (value === sentinel) {
self.callback(null);
return;
}
context = {
(function execute(value, values, args) {
var context = {
index: self.queueIndex,
isFirst: self.queueIndex === 0,
isLast: self.isAddingCompleted && self.queueIndex === self.pushCount - 1,
data: self.data,
next: function () {
var args = Array.prototype.slice.apply(arguments);
context.next = noOp;
self.data = context.data;
self.queueIndex++;
if (values.length) {
execute(values.shift(), args);
execute(values.shift(), values, args);
} else {

@@ -47,21 +123,15 @@ process.nextTick(function () {

},
end: self.callback
end: function () {
var args = Array.prototype.slice.apply(arguments);
var end = self.end;
self.end = noOp;
end.apply({data: context.data}, args);
}
};
self.worker.apply(context, [value].concat(args));
}(values.shift(), args));
}(values.shift(), values, args));
}
if (this.isAddingCompleted) {
throw new Error('This queue has already been frozen.');
}
this.values.push(value);
if (!this.isPushed) {
this.isPushed = true;
process.nextTick(function() {
executeBatch([]);
});
}
};
SeriesQueue.prototype.complete = function() {
this.push(sentinel);
SerialQueue.prototype.complete = function() {
this.isAddingCompleted = true;

@@ -72,7 +142,6 @@ };

*
* @param tickSize
* @param batchSize
* @param tasks
* @param callback
*/
function Series(tickSize, tasks, callback) {
function Serial(batchSize, tasks, end) {
var worker = function(task) {

@@ -82,3 +151,3 @@ var args = Array.prototype.slice.call(arguments, 1);

};
var queue = new SeriesQueue(tickSize, worker, callback);
var queue = new SerialQueue(batchSize, worker, end);
tasks.forEach(function (task) {

@@ -88,21 +157,77 @@ queue.push(task);

queue.complete();
};
}
function SerialEach(batchSize, worker, begin, end) {
var context = {
data: {},
begin: function () {
var args = Array.prototype.slice.apply(arguments);
var values;
context.begin = noOp;
if (args.length === 0) {
values = [];
} else if (args.length === 1) {
values = Array.isArray(args[0]) ? args[0] : [args[0]];
} else {
values = args;
}
var queue = new SerialQueue(batchSize, worker, end);
queue.data = context.data;
values.forEach(function (value) {
queue.push(value);
});
queue.complete();
}
};
begin.apply(context);
}
/**
*
* @param tickSize
* @param batchSize
* @param worker
* @param callback
* @param begin
* @param end
*/
function ParallelQueue(tickSize, worker, callback) {
this.tickSize = tickSize;
function ParallelQueue(batchSize, worker, begin, end) {
this.batchSize = batchSize;
this.worker = worker;
this.callback = callback || noOp;
this.begin = begin;
this.end = end || noOp;
this.values = [];
this.isAddingCompleted = false;
this.isCanceled = false;
};
this.isPushed = false;
this.pushCount = 0;
this.taskCount = 0;
this.results = [];
this.args = [];
}
ParallelQueue.prototype.push = function (value) {
var self = this;
if (this.isAddingCompleted) {
throw new Error('This queue has already been frozen.');
}
if (this.isCanceled) {
return;
}
this.values.push({index : this.pushCount, value: value});
this.pushCount++;
this.taskCount++;
if (!this.isPushed) {
this.isPushed = true;
(function () {
var context = {
fork: function () {
self.args = Array.prototype.slice.apply(arguments);
context.fork = noOp;
process.nextTick(executeBatch);
}
};
self.begin.apply(context);
}());
} else {
process.nextTick(executeBatch);
}
function executeBatch() {

@@ -113,3 +238,3 @@ var context;

var i;
for (i = 0; values.length && i < self.tickSize; i++) {
for (i = 0; values.length && (self.batchSize > 0 && i < self.batchSize || self.batchSize < 0); i++) {
if (self.isCanceled) {

@@ -119,27 +244,30 @@ break;

value = values.shift();
if (value === sentinel) {
self.isCompleted = true;
self.callback(null);
if (value.value === sentinel) {
process.nextTick(function _end() {
if (self.taskCount === 1) {
self.end.call(null, null, self.results);
} else {
process.nextTick(_end);
}
});
return;
}
context = {
next: noOp,
end: function () {
var args = Array.prototype.slice.apply(arguments);
self.callback.apply(self, args);
self.callback = noOp;
self.isCanceled = true;
}
context = {};
context.index = value.index;
context.join = (function(index, context) {
return function (result) {
self.results[index] = result;
self.taskCount--;
context.join = noOp;
};
}(value.index, context));
context.end = function () {
var args = Array.prototype.slice.apply(arguments);
self.end.apply(null, args);
self.end = noOp;
self.isCanceled = true;
};
self.worker.call(context, value);
}
self.worker.call(context, value.value, self.args[value.index]);
}
}
if (this.isAddingCompleted) {
throw new Error('This queue has already been frozen.');
}
if (this.isCanceled) {
return;
}
this.values.push(value);
process.nextTick(executeBatch);
};

@@ -154,11 +282,12 @@

*
* @param tickSize
* @param batchSize
* @param tasks
* @param callback
* @param begin
* @param end
*/
function Parallel(tickSize, tasks, callback) {
var worker = function (task) {
task.call(this);
function Parallel(batchSize, tasks, begin, end) {
var worker = function (task, arg) {
task.call(this, arg);
};
var queue = new ParallelQueue(tickSize, worker, callback);
var queue = new ParallelQueue(batchSize, worker, begin, end);
tasks.forEach(function (task) {

@@ -168,45 +297,138 @@ queue.push(task);

queue.complete();
};
}
/**
*
* @param tickSize
* @param batchSize
* @param worker
* @param begin
* @param end
*/
function Nue (tickSize) {
this.tickSize = tickSize;
function ParallelEach(batchSize, worker, begin, end) {
var context = {
begin: function () {
var args = Array.prototype.slice.apply(arguments);
var values;
context.begin = noOp;
if (args.length === 0) {
values = [];
} else if (args.length === 1) {
values = Array.isArray(args[0]) ? args[0] : [args[0]];
} else {
values = args;
}
var beginBridge = function () {
this.fork();
};
var queue = new ParallelQueue(batchSize, worker, beginBridge, end);
values.forEach(function (value) {
queue.push(value);
});
queue.complete();
}
};
begin.apply(context);
}
/**
*
* @param batchSize
*/
function Nue (batchSize) {
this.batchSize = batchSize;
}
Nue.prototype.serialQueue = function (worker) {
return new SerialQueue(this.batchSize, worker);
};
Nue.prototype.seriesQueue = function (worker, callback) {
return new SeriesQueue(this.tickSize, worker, callback);
Nue.prototype.serial = function () {
var tasks = Array.prototype.slice.apply(arguments);
var batchSize = this.batchSize;
return function () {
var self = this;
var args = Array.prototype.slice.apply(arguments);
var begin = tasks[0];
var end;
var endWrapper;
tasks[0] = function () {
begin.apply(this, args);
};
if (tasks.length > 1) {
end = tasks.pop();
}
return new Serial(batchSize, tasks, end);
};
};
Nue.prototype.series = function (tasks, callback) {
var args = Array.prototype.slice.apply(arguments);
if (!Array.isArray(args[0])) {
tasks = args;
callback = null;
}
return new Series(this.tickSize, tasks, callback);
Nue.prototype.serialEach = function (begin, worker, end) {
var batchSize = this.batchSize;
return function () {
var self = this;
var args = Array.prototype.slice.apply(arguments);
var beginWrapper = function () {
begin.apply(this, args);
};
var endWrapper;
if (end) {
endWrapper = function() {
var args = Array.prototype.slice.apply(arguments);
end.apply(self, args);
};
}
return new SerialEach(batchSize, worker, beginWrapper, endWrapper);
};
};
Nue.prototype.parallelQueue = function (worker, callback) {
return new ParallelQueue(this.tickSize, worker, callback);
Nue.prototype.parallelQueue = function (worker, end) {
var begin = function () {
this.fork();
};
return new ParallelQueue(this.batchSize, worker, begin, end);
};
Nue.prototype.parallel = function (tasks, callback) {
Nue.prototype.parallel = function (begin, tasks, end) {
var args = Array.prototype.slice.apply(arguments);
if (!Array.isArray(args[0])) {
tasks = args;
callback = null;
if (Array.isArray(args[0])) {
begin = function () {
this.fork();
};
tasks = args[0];
end = typeof args[1] === 'function' ? args[1] : null;
}
return new Parallel(this.tickSize, tasks, callback);
var batchSize = this.batchSize;
return function () {
var self = this;
var args = Array.prototype.slice.apply(arguments);
var beginWrapper = function () {
begin.apply(this, args);
};
var endWrapper;
if (end) {
endWrapper = function() {
var args = Array.prototype.slice.apply(arguments);
end.apply(self, args);
};
}
return new Parallel(batchSize, tasks, beginWrapper, endWrapper);
};
};
nue.DEFAULT_TICK_SIZE = 3;
nue.name = 'nue';
nue.version = '0.0.1';
nue.tickSize = nue.DEFAULT_TICK_SIZE;
nue.seriesQueue = new Nue(nue.tickSize).seriesQueue;
nue.series = new Nue(nue.tickSize).series;
nue.parallelQueue = new Nue(nue.tickSize).parallelQueue;
nue.parallel = new Nue(nue.tickSize).parallel;
Nue.prototype.parallelEach = function (begin, worker, end) {
var batchSize = this.batchSize;
return function () {
var self = this;
var args = Array.prototype.slice.apply(arguments);
var beginWrapper = function () {
begin.apply(this, args);
};
var endWrapper;
if (end) {
endWrapper = function() {
var args = Array.prototype.slice.apply(arguments);
end.apply(self, args);
};
}
return new ParallelEach(batchSize, worker, beginWrapper, endWrapper);
};
};

@@ -9,3 +9,6 @@ {

"main" : "./lib/nue.js",
"version" : "0.0.1"
"devDependencies": {
"mocha": "~0.11.0"
},
"version" : "0.0.2"
}

@@ -1,116 +0,134 @@

nue — An async control-flow library suited for the node event loop
===================================================================
nue — An async control-flow library
===================================
nue is an async control-flow library.
nue is an async control-flow library suited for the node event loop.
## Examples
## Installing
> JavaScript
```
$ npm install nue
```
## Example
### serial
```js
var nue = require('nue');
var start = nue.start;
var serial = nue.serial;
var fs = require('fs');
step1();
start(serial(
function (){
this.data = [];
fs.readFile('file1', this.next);
},
function (err, data){
if (err) throw this.end(err);
this.data.push(data.length);
fs.readFile('file2', this.next);
},
function (err, data){
if (err) throw this.end(err);
this.data.push(data.length);
this.next();
},
function (err) {
if (err) throw err;
console.log(this.data);
}
));
```
function step1() {
console.log('step1 start');
nue.parallel([
function(){
console.log('aaa');
},
function(){
console.log('bbb');
}],
function(err){
if (err) throw err;
console.log('step1 end\n');
step2();
}
);
}
### serialEach
function step2() {
console.log('step2 start');
nue.series([
function () {
console.log('ccc');
this.next('test', 2);
},
function (a, b){
console.log('ddd ' + a + b);
this.next();
}],
function (err) {
if (err) throw err;
console.log('step2 end\n');
step3();
}
);
}
```js
var nue = require('nue');
var start = nue.start;
var serial = nue.serial;
var serialEach = nue.serialEach;
var fs = require('fs');
function step3() {
console.log("step3 start");
var q = nue.parallelQueue(
function (data){
console.log('data: ' + data);
},
function (err) {
if (err) throw err;
console.log('step3 end\n');
step4();
}
);
for (var i = 0; i < 10; i++) {
q.push(i);
start(serialEach(
function () {
this.data = 0;
this.begin('file1', 'file2', 'file3');
},
function (name) {
var self = this;
fs.readFile(name, function (err, data) {
if (err) throw this.end(err);
self.data += data.length;
self.next(null, self.data);
});
},
function (err, data) {
if (err) throw err;
console.log(data);
}
q.complete();
}
));
```
function step4() {
console.log("step4 start");
var q = nue.seriesQueue(
function (data){
console.log('data: ' + data);
this.next();
### parallel
```js
var nue = require('nue');
var start = nue.start;
var serial = nue.serial;
var parallel = nue.parallel;
var fs = require('fs');
start(parallel(
function () {
this.fork('file1', 'file2');
},
[
function (name) {
var self = this;
fs.readFile(name, function (err, data) {
if (err) this.err(err);
self.join(data.length);
});
},
function (err) {
if (err) throw err;
console.log('step4 end\n');
function (path) {
var self = this;
fs.stat(path, function (err, stats) {
if (err) this.err(err);
self.join(stats.isFile());
});
}
);
for (var i = 0; i < 10; i++) {
q.push(i);
],
function (err, results) {
if (err) throw err;
console.log(results);
}
q.complete();
}
));
```
> Result
### parallelEach
```
step1 start
aaa:
bbb:
step1 end
```js
var nue = require('nue');
var start = nue.start;
var serial = nue.serial;
var parallelEach = nue.parallelEach;
var fs = require('fs');
step2 start
ccc:
ddd: test, 2
step2 end
step3 start
data: 0
data: 1
data: 2
data: 3
data: 4
step3 end
step4 start
data: 0
data: 1
data: 2
data: 3
data: 4
step4 end
start(parallelEach(
function () {
this.begin('file1', 'file2');
},
function (name) {
var self = this;
fs.readFile(name, function (err, data) {
if (err) this.end(err);
self.join(data.length);
});
},
function (err, results) {
if (err) throw err;
console.log(results);
}
));
```
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc