Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

nue

Package Overview
Dependencies
Maintainers
1
Versions
17
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

nue - npm Package Compare versions

Comparing version 0.0.7 to 0.0.8

452

lib/nue.js
'use strict';
// require
var EventEmitter = require('events').EventEmitter;

@@ -9,37 +7,74 @@ var util = require('util');

// exports
exports.DEFAULT_BATCH_SIZE = 10;
exports.name = 'nue';
exports.version = '0.0.7';
exports.batchSize = exports.DEFAULT_BATCH_SIZE;
exports.version = '0.0.8';
exports.flow = flow;
// variables
function flow() {
var functions = Array.prototype.slice.call(arguments);
var deferred = function deferred() {
var args = Array.prototype.slice.call(arguments);
var tasks = functions.length > 0 ? functions : function () {this.next();};
runFlow(tasks, deferred, args, this);
};
attachEmitter(deferred);
return deferred;
}
var noop = function () {};
function runFlow(tasks, caller, callerArgs, callerContext) {
var flow = {
args: callerArgs,
data: {},
err: undefined,
lastStep: null
};
var steps = makeSteps(tasks, caller, callerContext, flow);
flow.lastStep = steps[steps.length - 1];
var context = steps.length > 1
? new StepContext(steps[0], flow)
: new LastStepContext(flow);
context.invoke();
}
// functions
function flow() {
if (typeof arguments[0] === 'number') {
var batchSize = arguments[0];
return function () {
return flowDefer(batchSize, Array.prototype.slice.call(arguments));
function makeSteps(tasks, caller, callerContext, flow) {
var steps = tasks.map(function (task) {
var type = typeof task;
if (type !== 'function') {
throw new Error('The task is a not function. ' + type);
}
if (task.__nue__) {
return task;
}
var deferred = function deferred() {
task.apply(this, arguments);
};
attachEmitter(deferred);
return deferred;
});
var len = steps.length - 1;
for (var i = 0; i < len; i++) {
(function chain(i, step, next) {
if (i < len - 1) {
step.events.once('done', function runNextStep() {
var context = new StepContext(next, flow);
context.invoke();
});
} else {
step.events.once('done', function runLastStep() {
var context = new LastStepContext(flow);
context.invoke();
});
}
}(i, steps[i], steps[i + 1]));
}
return flowDefer(null, Array.prototype.slice.call(arguments));
steps[steps.length - 1].events.once('done', function exitFlow() {
if (callerContext instanceof ContextBase) {
callerContext.next.apply(callerContext, flow.args);
} else {
caller.events.emit('done');
}
});
return steps;
}
function flowDefer(batchSize, tasks) {
var defer = function () {
var args = Array.prototype.slice.call(arguments);
var runner = new FlowRunner(batchSize, tasks, defer, args, this);
runner.run();
};
attachEmitter(defer);
return defer;
}
function attachEmitter(target) {

@@ -50,4 +85,5 @@ target.events = new EventEmitter();

// classes
function noop () {}
function ContextBase() {

@@ -57,4 +93,2 @@ this.next = this.next.bind(this);

this.async = this.async.bind(this);
this.queue = this.queue.bind(this);
this.parallelQueue = this.parallelQueue.bind(this);
this.asyncIndex = 0;

@@ -70,3 +104,3 @@ this.asyncCallCount = 0;

var self = this;
return (function (args, index) {
return (function makeCallback(args, index) {
function flatten(array) {

@@ -98,129 +132,9 @@ var results = [];

ContextBase.prototype.queue = function (worker) {
var self = this;
var batchSize = arguments[0];
if (typeof batchSize === 'number') {
return function (worker) {
return new Queue(batchSize, worker, self)
};
}
return new Queue(this.batchSize, worker, self);
};
ContextBase.prototype.parallelQueue = function (worker) {
var self = this;
var batchSize = arguments[0];
if (typeof batchSize === 'number') {
return function (worker) {
return new ParallelQueue(batchSize, worker, self)
};
}
return new ParallelQueue(this.batchSize, worker, self);
};
function FlowRunner(batchSize, tasks, caller, callerArgs, callerContext) {
this.batchSize = batchSize;
this.tasks = tasks;
this.caller = caller;
this.callerArgs = callerArgs;
this.callerContext = callerContext;
}
FlowRunner.prototype.endStep = function (emitter, flow) {
if (flow.batchSize > 0 && flow.batchSize === flow.callCount) {
flow.callCount = 0;
process.nextTick(function () {
emitter.emit('done', flow);
});
} else {
emitter.emit('done', flow);
}
};
FlowRunner.prototype.startStep = function (context, step, flow) {
flow.callCount++;
step.apply(context, flow.args);
};
FlowRunner.prototype.makeSteps = function () {
var steps = this.tasks.map(function (task, i) {
var fn;
if (typeof task !== 'function') {
throw new Error('not function. ' + typeof task);
}
if (task.__nue__) {
return task;
}
fn = function () {
task.apply(this, arguments);
};
attachEmitter(fn);
return fn;
});
var len = steps.length;
var self = this;
steps.forEach(function (step, i, steps) {
var next = steps[i + 1];
if (i < len - 1) {
(function (C) {
step.events.once('done', function(flow) {
self.startStep(new C(self, next, flow), next, flow);
});
}(i < len - 2 ? StepContext : LastStepContext));
}
});
if (steps.length === 0) {
steps.push(function () {
this.next();
});
attachEmitter(steps[0]);
}
return steps;
};
FlowRunner.prototype.startFlow = function () {
var steps = this.makeSteps();
var firstStep = steps[0];
var lastStep = steps[steps.length - 1];
var self = this;
lastStep.events.once('done', function (flow) {
if (self.callerContext instanceof ContextBase) {
self.callerContext.next.apply(self.callerContext, flow.args);
} else {
self.caller.events.emit('done', flow);
}
});
var flow = {
args: self.callerArgs,
data: {},
err: undefined,
lastStep: lastStep,
batchSize: this.batchSize || exports.batchSize || exports.DEFAULT_BATCH_SIZE,
callCount: 0
};
var context;
if (firstStep === lastStep) {
context = new LastStepContext(this, firstStep, flow)
} else {
context = new StepContext(this, firstStep, flow);
}
this.startStep(context, firstStep, flow);
};
FlowRunner.prototype.run = function () {
this.startFlow();
};
function StepContext(runner, step, flow) {
function StepContext(step, flow) {
ContextBase.call(this);
this.runner = runner;
this.step = step;
this.flow = flow;
this.batchSize = flow.batchSize;
this.data = flow.data;
this.err = flow.err;
}
util.inherits(StepContext, ContextBase);

@@ -232,4 +146,3 @@

this.flow.args = Array.prototype.slice.call(arguments);
this.flow.err = this.err;
this.runner.endStep(this.step.events, this.flow);
this.step.events.emit('done');
};

@@ -240,19 +153,23 @@

this.end = noop;
this.flow.err = arguments[0];
this.flow.args = Array.prototype.slice.call(arguments, 1);
this.flow.err = arguments[0];
this.flow.lastStep.apply(new LastStepContext(this.runner, this.step, this.flow), this.flow.args);
var context= new LastStepContext(this.flow);
context.invoke();
};
StepContext.prototype.invoke = function () {
try {
this.step.apply(this, this.flow.args);
} catch (e) {
StepContext.prototype.end.call(this, e);
}
};
function LastStepContext(runner, step, flow) {
function LastStepContext(flow) {
ContextBase.call(this);
this.runner = runner;
this.step = step;
this.flow = flow;
this.batchSize = flow.batchSize;
this.data = flow.data;
this.err = flow.err;
this.isNextCalled = false;
}
util.inherits(LastStepContext, ContextBase);

@@ -268,212 +185,13 @@

}
this.isNextCalled = true;
this.flow.err = this.err;
this.flow.args = Array.prototype.slice.call(arguments);
this.flow.err = this.err;
this.runner.endStep(this.flow.lastStep.events, this.flow);
this.flow.lastStep.events.emit('done');
};
LastStepContext.prototype.end = function () {
throw new Error('not supported.');
throw new Error('This function is unsupported in the last step.');
};
function Queue(batchSize, worker, callerContext) {
if (!(callerContext instanceof ContextBase)) {
throw new Error('The context is illegal. The function is out of the flow.');
}
this.batchSize = batchSize;
this.worker = worker;
this.callerContext = callerContext;
this.values = [];
this.results = [];
this.isAddingCompleted = false;
this.length = 0;
this.index = 0;
}
Queue.prototype.push = function (value) {
var self = this;
if (this.isAddingCompleted) {
throw new Error('This queue has already been frozen.');
}
this.values.push(value);
this.length++;
if (!this.runner) {
this.runner = new QueueRunner(this);
process.nextTick(function () {
self.runner.processValues();
});
}
};
Queue.prototype.complete = function() {
this.isAddingCompleted = true;
};
function QueueRunner (queue) {
this.queue = queue;
}
QueueRunner.prototype.runWorker = function (value, values) {
this.queue.worker.call(new QueueContext(this, values), value);
};
QueueRunner.prototype.processValues = function () {
var queue = this.queue;
var values = queue.batchSize > 0 ? queue.values.splice(0, queue.batchSize) : queue.values;
var self = this;
if (values.length === 0) {
if (queue.isAddingCompleted) {
queue.callerContext.next.call(queue.callerContext, queue.results);
} else {
process.nextTick(function () {
self.processValues();
});
}
} else {
self.runWorker(values.shift(), values)
}
};
function QueueContext(runner, values) {
ContextBase.call(this);
this.runner = runner;
this.values = values;
this.queue = runner.queue;
this.data = this.queue.data;
this.index = this.queue.index;
}
util.inherits(QueueContext, ContextBase);
QueueContext.prototype.next = function (result) {
this.next = noop;
this.end = noop;
this.queue.results[this.index] = result;
this.queue.index++;
if (this.values.length) {
this.runner.runWorker(this.values.shift(), this.values);
} else {
var self = this;
process.nextTick(function () {
self.runner.processValues();
});
}
};
QueueContext.prototype.end = function () {
this.next = noop;
this.end = noop;
this.queue.callerContext.end.apply(this.queue.callerContext, arguments);
};
function ParallelQueue(batchSize, worker, callerContext) {
if (!(callerContext instanceof ContextBase)) {
throw new Error('The context is illegal. The function is out of the flow.');
}
this.batchSize = batchSize;
this.worker = worker;
this.callerContext = callerContext;
this.values = [];
this.isAddingCompleted = false;
this.isCanceled = false;
this.length = 0;
this.backlog = 0;
this.results = [];
this.runner = new ParallelQueueRunner(this);
}
ParallelQueue.SENTINEL = {};
ParallelQueue.prototype.push = function (value) {
var self = this;
if (this.isAddingCompleted) {
throw new Error('This parallel queue has been already frozen.');
}
if (this.isCanceled) {
return;
}
this.values.push({index : this.length, value: value});
this.length++;
this.backlog++;
process.nextTick(function () {
self.runner.processValues();
});
};
ParallelQueue.prototype.complete = function() {
this.push(ParallelQueue.SENTINEL);
this.isAddingCompleted = true;
};
function ParallelQueueRunner(queue) {
this.queue = queue;
}
ParallelQueueRunner.prototype.endParallelQueue = function () {
var self = this;
if (this.queue.backlog === 1) {
this.queue.callerContext.next.call(this.queue.callerContext, this.queue.results);
} else {
process.nextTick(function () {
self.endParallelQueue();
});
}
};
ParallelQueueRunner.prototype.runWorker = function (value) {
var context = new ParallelQueueContext(this, this.queue.batchSize, value.index);
this.queue.worker.call(context, value.value);
};
ParallelQueueRunner.prototype.processValues = function () {
var value;
var i;
var queue = this.queue;
var self = this;
for (i = 0; queue.values.length && (queue.batchSize > 0 && i < queue.batchSize || queue.batchSize < 0); i++) {
if (queue.isCanceled) {
break;
}
value = queue.values.shift();
if (value.value === ParallelQueue.SENTINEL) {
process.nextTick(function () {
self.endParallelQueue();
});
} else {
this.runWorker(value);
}
}
};
function ParallelQueueContext(runner, batchSize, index) {
ContextBase.call(this);
this.runner = runner;
this.batchSize = batchSize;
this.index = index;
this.queue = runner.queue;
this.data = this.queue.data;
}
util.inherits(ParallelQueueContext, ContextBase);
ParallelQueueContext.prototype.next = function (result) {
this.next = noop;
this.end = noop;
this.queue.results[this.index] = result;
this.queue.backlog--;
};
ParallelQueueContext.prototype.end = function () {
this.next = noop;
this.end = noop;
if (!this.queue.isCanceled) {
this.queue.isCanceled = true;
this.queue.callerContext.end.apply(this.queue.callerContext, arguments);
}
};
LastStepContext.prototype.invoke = function () {
this.flow.lastStep.apply(this, this.flow.args);
};

4

package.json
{
"name" : "nue",
"description" : "An async control-flow library suited for the node event loop.",
"description" : "An async control-flow library suited for node.js.",
"keywords" : ["control-flow", "async"],

@@ -12,3 +12,3 @@ "author" : "Toshihiro Nakamura <toshihiro.nakamura@gmail.com>",

},
"version" : "0.0.7"
"version" : "0.0.8"
}
nue — An async control-flow library
===================================
nue is an async control-flow library suited for the node event loop.
nue is an async control-flow library suited for node.js.

@@ -20,4 +20,4 @@ ## Installing

function (file1, file2) {
fs.readFile(file1, 'utf-8', this.async());
fs.readFile(file2, 'utf-8', this.async());
fs.readFile(file1, 'utf8', this.async());
fs.readFile(file2, 'utf8', this.async());
},

@@ -28,2 +28,3 @@ function (data1, data2) {

function (data) {
if (this.err) throw this.err;
console.log(data);

@@ -41,3 +42,3 @@ console.log('done');

<a name="flow" />
### flow([Function tasks...]) -> Function
### flow([Function functions...]) -> Function

@@ -48,21 +49,115 @@ Return a function which represents the control-flow.

* `tasks`: Optional. Tasks which are executed in series.
* `functions`: Optional. Functions which are executed in series.
> Context
`this` context of the each task has following properties.
`this` context of each function in a flow has following properties.
* `next`: Function. A function to execute a next task.
* `async`: Function. A function to accept parameters for a next task and return a callback.
* `end`: Function. A function to execute a last task to end a control-flow. The first parameter is an error object.
* `queue`: Function. A function to create a serial queue object.
* `parallelQueue`: Function. A function to create a parallel queue object.
* `next`: Function. A function to execute a next function immediately.
* `async`: Function. A function to accept parameters for a next function and return a callback.
* `end`: Function. A function to execute a last function to end a control-flow. The first parameter is an error object.
* `data`: Object : A object to share arbitrary data among functions in a control-flow.
In addition to the above ones, the context of the last task has a following property.
In addition to the above ones, the context of the last function has a following property.
* `err`: Object. An object represents an error which passed from the `end` function.
## Flow Nesting
A flow can be nested.
```js
var flow = require('nue').flow;
var fs = require('fs');
var subFlow = flow(
function (file) {
fs.readFile(file, 'utf8', this.async());
}
);
var mainFlow = flow(
function () {
this.next('file1');
},
subFlow,
function (data) {
if (this.err) throw this.err;
console.log(data);
}
);
mainFlow();
```
## Arguments Passing Between Functions
arguments are passed with `this.next` or `this.async`.
```js
var flow = require('nue').flow;
var fs = require('fs');
var myFlow = flow(
function (file1, file2) {
fs.readFile(file1, 'utf8', this.async(file1));
fs.readFile(file2, 'utf8', this.async(file2));
},
function (file1, data1, file2, data2) {
console.log(file1 + ' and ' + file2 + ' have been red.');
this.next(data1 + data2);
},
function (data) {
if (this.err) throw this.err;
console.log(data);
this.next(data);
}
);
myFlow('file1', 'file2');
```
`this.async` can be called in loop.
Below example produces same results with above example.
```js
var flow = require('nue').flow;
var fs = require('fs');
var myFlow = flow(
function (files) {
files.forEach(function (file) {
fs.readFile(file, 'utf8', this.async(file));
}.bind(this));
},
function () {
var args = Array.prototype.slice.call(arguments);
var files = [];
var data = '';
args.forEach(function (arg, i) {
if (i % 2 === 0) {
files.push(arg);
} else {
data += arg;
}
});
console.log(files.join(' and ') + ' have been red.');
this.next(data);
},
function (data) {
if (this.err) throw this.err;
console.log(data);
this.next(data);
}
);
myFlow(['file1', 'file2']);
```
## Data Sharing Among Functions
Each function in a flow can share data through `this.data`.
`this.data` is shared in a same flow.
A nesting flow and any nested flows can't share `this.data`.
```js

@@ -76,4 +171,4 @@ var flow = require('nue').flow;

this.data.file2 = file2;
fs.readFile(file1, 'utf-8', this.async());
fs.readFile(file2, 'utf-8', this.async());
fs.readFile(file1, 'utf8', this.async());
fs.readFile(file2, 'utf8', this.async());
},

@@ -84,2 +179,3 @@ function (data1, data2) {

function (data) {
if (this.err) throw this.err;
console.log(data);

@@ -96,2 +192,5 @@ console.log(this.data.file1 ' and ' + this.data.file2 ' are concatenated.');

In a last function in a flow, `this.err` represents an error thrown in the flow.
To indicate error handling completion, you must assign `null` to `this.err`.
```js

@@ -103,4 +202,4 @@ var flow = require('nue').flow;

function (file1, file2) {
fs.readFile(file1, 'utf-8', this.async());
fs.readFile(file2, 'utf-8', this.async());
fs.readFile(file1, 'utf8', this.async());
fs.readFile(file2, 'utf8', this.async());
},

@@ -107,0 +206,0 @@ function (data1, data2) {

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc