Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

breeze

Package Overview
Dependencies
Maintainers
2
Versions
20
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

breeze - npm Package Compare versions

Comparing version 0.4.0 to 1.0.0

readme.md

899

breeze.js

@@ -1,778 +0,175 @@

!function (name, context, definition) {
if (typeof require === "function" && typeof exports === "object" && typeof module === "object")
module.exports = definition(name, context);
else if (typeof define === 'function' && typeof define.amd === 'object') define(definition);
else context[name] = definition(name, context);
}('breeze', this, function (name, context) {
/**
* Initializes breeze class
*/
function Breeze () {
this.steps = []
}
var module = {};
/*!
* Breeze Async Control Utility
* Copyright(c) 2012 Jake Luer <jake@alogicalparadox.com>
* MIT Licensed
*/
/**
* Takes the next (optional) callback, and generates the done callback to be passed as the first argument.
*
* @param {Function} next Next callback in the system to be invoked.
* @return {Function} Completion callback, first argument is error, subsequent arguments are passed down the chain.
*/
Breeze.prototype.createDoneCallback = function _breezeCreateDoneCallback (next) {
var system = this
/*!
* Helpers
*/
return function (err) {
var args = Array.prototype.slice.call(arguments, 1)
var context
var noop = function () {}
, slice = Array.prototype.slice;
// Short circuit the system
if (err) {
if (!this.onError) {
system.err = err
} else {
system.onError(err)
}
/*!
* Define primary export
*/
system.steps = []
return system.check()
}
var exports = module.exports = {};
/*!
* Breeze version
*/
exports.version = '0.4.0';
/**
* ### .nextTick (fn)
*
* Cross-compatible `nextTick` implementation. Uses
* `process.nextTick` for node and `setTimeout(fn, 0)`
* for the browser.
*
* @param {Function} callback
* @name nextTick
* @api public
*/
exports.nextTick = ('undefined' === typeof process || !process.nextTick)
? function (fn) { setTimeout(fn, 0); }
: process.nextTick;
/**
* ### forEach (array, iterator, done)
*
* Apply an iterator to each item in an array
* in parellel. Execute a callback when all items
* have been completed or immediately if there is
* an error provided.
*
* @param {Array} array to iterate
* @param {Function} iterator function
* @param {Function} callback on complete or error
* @cb {Error|null} if error
* @name forEach
* @api public
*/
exports.forEach = function (arr, iterator, cb) {
cb = cb || noop;
if (!arr.length) return cb();
var count = arr.length;
for (var i = 0, l = arr.length; i < l; i++) {
iterator(arr[i], function next (err) {
if (err) return cb(err);
--count || cb(null);
});
// Continue
if (next) {
args.push(system.createDoneCallback(this.steps.shift()))
next.apply(this, args)
} else {
system.args = args
system.context = this
}
};
/**
* ### forEachSeries (array, iterator, done)
*
* Apply an iterator to each item in an array
* serially. Execute a callback when all items
* have been completed or immediately if there is
* is an error provided.
*
* @param {Array} array to iterate
* @param {Function} iterator function
* @param {Function} callback on complete or error
* @cb {Error|null} if error
* @name forEachSeries
* @api public
*/
system.check()
}
}
exports.forEachSeries = function (arr, iterator, cb) {
cb = cb || noop;
if (!arr.length) return cb();
function iterate (i) {
if (i == arr.length) return cb();
iterator(arr[i], function next (err) {
if (err) return cb(err);
iterate(++i);
});
}
iterate(0);
};
/**
* Checks whether the system is running, needs to be ran, or has completed
* running.
*
* @return {void}
*/
Breeze.prototype.check = function _breezeCheck () {
var func
/**
* ### parallel (fns, done)
*
* Execute a collection of functions in parellel
* and execute a callback upon completion or occurance
* of an error. Functions can be provided as either
* an array or an object. Each function will be passed
* a callback to signal completion. The callback accepts
* either an error for the first argument, or null for the
* first argument and results following. The results will be
* provied as the second argument of the callback in-kind,
* maintaining the order of the input array or the keys
* of the input object.
*
* @param {Array|Object} functions to execute
* @param {Function} callback on completion or error
* @cb {Error|null} if error
* @cb {Array|Object} reflecting the results
* @name parellel
* @api public
*/
args = this.args || []
exports.parallel = function (tasks, cb) {
cb = cb || noop;
var keys = prepareKeys(tasks)
, res = Array.isArray(tasks) ? Array(tasks.length) : {}
, iterator = prepareIterator(tasks, keys, res);
if (!keys.length) return cb();
exports.forEach(keys, iterator, function (err) {
if (err) return cb(err);
cb(null, res);
});
};
if (!this.running && this.steps.length) {
this.running = true
func = this.steps.shift()
args.unshift(this.createDoneCallback(this.steps.shift()))
func.apply(this.context, args)
} else if (this.running && !this.steps.length) {
this.running = false
}
}
/**
* ### series (fns, done)
*
* Execute a collection of functions serially
* and execute a callback upon completion or occurance
* of an error. Functions can be provided as either
* an array or an object. Each function will be passed
* a callback to signal completion. The callback accepts
* either an error for the first argument, or null for the
* first argument and results following. The results will be
* provied as the second argument of the callback in-kind,
* maintaining the order of the input array or the keys
* of the input object.
*
* @param {Array|Object} functions to execute
* @param {Function} callback on completion or error
* @cb {Error|null} if error
* @cb {Array|Object} reflecting the results
* @name series
* @api public
*/
/**
* Adds callback to the system stack when first argument passed is of a
* truthy status.
*
* @param {Boolean} arg Argument to be evaluated
* @param {Function} next Callback to be pushed onto the stack
* @return {this}
*/
Breeze.prototype.when = Breeze.prototype.maybe = function _breezeMaybeWhen (arg, next) {
if (arg) {
this.hasMaybeHappened = true
this.steps.push(next)
this.check()
}
exports.series = function (tasks, cb) {
cb = cb || noop;
var keys = prepareKeys(tasks)
, res = Array.isArray(tasks) ? Array(tasks.length) : {}
, iterator = prepareIterator(tasks, keys, res);
if (!keys.length) return cb();
exports.forEachSeries(keys, iterator, function (err) {
if (err) return cb(err);
cb(null, res);
});
};
return this
}
/*!
* parseArgs (args
*
* Figure out how to return the a set of arguments
* as part of a parallel or series set. If length
* is more than one, return as item, else return
* as an array.
*
* @param {Array} arguments to parse
* @returns {Mixed} arguments
* @api private
*/
/**
* Adds callback to the system when first argument is evaluated as true, and
* no none calls have been invoked.
*
* @param {Boolean} arg Argument to be evaluated
* @param {Function} next Callback to be pushed onto stack
* @return {this}
*/
Breeze.prototype.some = function _breezeSome (arg, next) {
this.hasSome = true
function parseArgs (args) {
if (args.length === 0) return null;
return args.length === 1
? args[0]
: args;
if (arg && !this.hasNoneHappened) {
this.hasSomeHappened = true
this.steps.push(next)
this.check()
}
return this
}
/*!
* parseKeys (tasks)
*
* For a parallel or series set, determine the context
* of the request as array or object, and return the
* keys for that context. If working with an array,
* create an array of indexes to lookup when passed
* to the iterator.
*
* @param {Array|Object} tasks
* @returns {Array} keys
* @api private
*/
function prepareKeys (tasks) {
if (Array.isArray(tasks)) {
var keys = [];
for (var i = 0, l = tasks.length; i < l; i++)
keys.push(i);
return keys;
} else {
return Object.keys(tasks);
}
/**
* Adds callback to the system if no some callback was triggered.
*
* @param {Function} next Callback to be pushed onto stack
* @return {this}
*/
Breeze.prototype.none = function _breezeNone (next) {
if (!this.hasSomeHappened && this.hasSome) {
this.hasNoneHappened = true
this.steps.push(next)
this.check()
}
/*!
* prepareIterator (tasks, keys, response)
*
* Create a function to use as an iterator for
* a parallel or series execution. Handles
* writing of the results to the result set
* so that it can be passed back in the final
* callback.
*
* @param {Mixed} array or object of tasks
* @param {Array} keys to use to get current task
* @param {Mixed} res object to write to
* @returns {Function} iterator
* @api private
*/
function prepareIterator (tasks, keys, res) {
return function (key, next) {
var fn = tasks[key];
fn(function done () {
var err = arguments[0];
if (err) return next(err);
var args = slice.call(arguments, 1)
, arg = parseArgs(args);
res[key] = arg;
next();
});
}
if (!this.hasSome) {
throw new Error("Cannot add none callback before some callback")
}
/**
* ### .atomic ()
*
* Provides atomic functions. Each serial set is
* placed in a queue based on a key. All functions within
* a single key are executed serially. Useful if you are
* trying to perform asyncronous operations as a series but
* with multiple data structures.
*
* var atomic = breeze.atomic();
* for (var i = 0; i < 5; i++) {
* atomic('i', function (done, key, n) {
* // key == 'i';
* // n == current i
* done();
* }, i);
* }
*
* @name atomic
* @api public
*/
return this
}
exports.atomic = function () {
var handler = {};
/**
* Adds callback to the system
*
* @param {Function} next Callback to be pushed onto the stack
* @return {this}
*/
Breeze.prototype.then = function _breezeThen (next) {
this.steps.push(next)
this.check()
return this
}
function iterator (task, next) {
task.args.unshift(function () {
next();
}, task.key);
task.fn.apply(this, task.args);
}
function makeQueue (key) {
var queue = exports.queue(iterator, 1);
queue.drain = function () {
delete handler[key];
};
return queue;
}
return function atomic (key, fn) {
var task = { key: key, fn: fn, args: slice.call(arguments, 2) }
, list = handler[key] || (handler[key] = makeQueue(key));
list.push(task, null, true);
};
};
/**
* ### queue (iterator, concurrency)
*
* The queue mechanism allows for a any number of
* data objects to be processed by an iterator
* when they become available. The queue will processes
* items in parellel, up to a given concurrently value,
* then will wait until an item has finished until
* beginning to process the next. The items queued
* can have a callback executed when it has completed
* its iterator. In addition, a queue may also have
* functions attached to listen for specific events.
* On such event is an error. Should any item in the queue
* fail to process and provide an error to it's `done`
* callback, no further items will be processed.
*
* @param {Function} iterator
* @param {Number} concurrency (defaults to 10)
* @returns {Object} queue
* @name queue
* @api public
*/
exports.queue = function (iterator, concurrency) {
var queue = new Queue(iterator, concurrency);
return queue;
};
/*!
* Queue
*
* The Queue constructor will hold all of the necissary
* settings to correctly execute a queue.
*
* @param {Function} iterator
* @param {Number} concurrency (defaults to 10)
* @api private
*/
function Queue (iterator, concurrency) {
this._iterator = iterator;
this._concurrency = concurrency || 10;
this._tasks = [];
this._err = false;
this.workers = 0;
/**
* Checks whether system has an err and invokes callback, or saves callback for later
* invocation.
*
* @param {Function} next Callback to be invoked should err exist
* @return {this}
*/
Breeze.prototype.catch = function _breezeCatch (next) {
if (this.err) {
next(this.err)
return this
}
/**
* #### .length
*
* Property indicating the number of items current
* in the queue. An item is removed from this list
* prior to being processed.
*
* @returns {Number} count of queued items
* @api public
*/
this.onError = next
this.check()
return this
}
Object.defineProperty(Queue.prototype, 'length',
{ get: function () {
return this._tasks.length;
}
});
/**
* Resets system
*/
Breeze.prototype.reset = function _breezeReset () {
this.hasMaybeHappened = undefined
this.hasNoneHappened = undefined
this.hasSomeHappened = undefined
this.hasSome = undefined
this.onError = undefined
this.running = undefined
this.err = undefined
this.steps = []
/**
* #### .push (items[, callback[, autostart]])
*
* You can push an item or an array of items into
* the queue for processing. The callback will be
* called for the completion of each item if the queue
* has not entered into an error state. A `autostart`
* boolean my also be provided if you wish to start
* processing the queue with this push of items. If
* no pushes provide the autostart, then the queue
* must be started manually with `.process()`.
*
* Note that if the queue has already been started but
* has been drained of items, it will not start again
* with another push unless the `autostart` toggle is present.
*
* @param {Array} item or items to be added to the queue
* @param {Function} callback for completion of each item
* @param {Boolean} autostart process (defaults to false)
* @name push
* @api public
*/
return this
}
Queue.prototype.push = function (items, cb, start) {
if ('boolean' === typeof cb) start = cb, cb = noop;
if (!Array.isArray(items)) items = [ items ];
cb = cb || noop;
var cc = this._concurrency
, sat = this.saturated;
for (var i = 0, l = items.length; i < l; i ++) {
var task = items[i];
this._tasks.push({ task: task , cb: cb });
if (sat && this._tasks.length === cc) sat();
if (start) exports.nextTick(this.process.bind(this));
}
};
/**
* #### .process ()
*
* Begin the queue processing cycle.
*
* @name process
* @api public
*/
Queue.prototype.process = function () {
var self = this
, cc = this._concurrency
, iterator = this._iterator
if (this.workers < cc && this.length && !this._err) {
var task = this._tasks.shift();
if (this.empty && !this.length) this.empty();
this.workers++;
iterator(task.task, function next () {
self.workers--;
if (self._err) return;
var err = arguments[0];
if (err) self._err = true;
if (task.cb) task.cb.apply(task, arguments);
if (err && self.onerror) return self.onerror(err);
if (self.drain && self.length + self.workers === 0) self.drain();
self.process();
});
this.process();
}
};
/**
* #### .onerror
*
* Setting this to a function will provide a listener
* should an error occur. It will not be executed otherwise.
*
* @default null
* @expected {Function} on error callback
* @cb {Error} object that was passed as error during iteration
* @api public
*/
Queue.prototype.onerror = null;
/**
* #### .saturated
*
* This listener will be executed when the number of
* queued items exceeds the current concurrency value.
* This will be executed directly after the push of
* said items
*
* @default null
* @expected {Function}
* @api public
*/
Queue.prototype.saturated = null;
/**
* #### .empty
*
* This listener will be executed when the queue is empty.
* In other words, prior to the last item in the queue
* being processed.
*
* @default null
* @expected {Function}
* @api public
*/
Queue.prototype.empty = null;
/**
* #### .drain
*
* This listener will be executed when all queued
* items have been executed through the iterator.
*
* @default null
* @expected {Function}
* @api public
*/
Queue.prototype.drain = null;
/**
* .dag (edges, concurrency, iterator[, done])
*
* DAG, directed-acyclic-graph, is a graph of nodes in
* which there are no cyclic references, and therefor has
* a specific starting and ending point. The `dag` async
* method will take an array of edges and resolve a best
* fit path of execution. It will then iterate over each
* edge in parallel up to a set amount of threads (concurrency).
* Furthermore, an edge will not begin it's processing until
* all of its dependancies have indicated successful execution.
*
* A set of edges is defined as an array, with each element being
* an array of x, y pairs, where `x` must complete before `y`
* can begin.
*
* var edges = [
* [ 'a', 'b' ]
* , [ 'a', 'c' ]
* , [ 'd', 'e' ]
* , [ 'b', 'd' ]
* ];
*
* With the above edges, we expect `a` to start processing. Upon
* completion, `b` and `c` will start. Upon `b` completion, `d`
* will execute, then `e`.
*
* If there are cyclical references in the set of edges, the `done`
* callback will be immediately called with an error indicating
* the problem.
*
* breeze.dag(edges, 2, function (e, next) {
* setTimeout(function () {
* next(); // or next(err);
* }, 1000);
* }, function (err) {
* // our done callback
* });
*
* As with `queue`, if an error occurs the `done` callback will
* be executed immediately. No more items will begin processing,
* but items that have already started will run to completion.
*
* @param {Array} edges
* @param {Number} concurrency
* @param {Function} iterator
* @param {Function} onError or onComplete
* @name dag
* @api public
*/
exports.dag = function (edges, concurrency, iterator, cb) {
cb = cb || noop;
var sorted = tsort(edges)
if (sorted.error) return cb(sorted.error);
if (!sorted.path.length) return cb(null);
// helper: get edge with id
function selectEdge(id) {
return graph.filter(function (e) {
return e.id === id;
})[0];
}
// queue iterator
function action (e, next) {
iterator(e, function done (err) {
var edge = selectEdge(e);
edge.notify.forEach(function (n) {
var notify = selectEdge(n);
notify.fulfilled.push(e);
var wl = notify.waitfor.length
, fl = notify.fulfilled.length;
if (wl === fl) queue.push(n, null, true);
});
next(err);
});
}
// determine where to start
function bootstrap (e) {
e.fulfilled = [];
if (!e.waitfor.length) queue.push(e.id);
}
// begin
var graph = sorted.graph
, queue = new Queue(action, concurrency);
graph.forEach(bootstrap);
queue.onerror = cb;
queue.drain = cb;
queue.process();
};
/**
* ### .dagSeries (edges, iterator[, done])
*
* Similar to `dag`, but will not use concurrency. A best
* fit path of execution will be dermined and then executed
* serially.
*
* @param {Array} edges
* @param {Function} iterator
* @param {Function} onError or onComplete callback
* @name dagSeries
* @api public
*/
exports.dagSeries = function (edges, iterator, cb) {
cb = cb || noop;
var sorted = tsort(edges);
if (sorted.error) return cb(sorted.error);
exports.forEachSeries(sorted.path, function (edge, next) {
iterator(edge, next);
}, cb);
};
/*!
* tsort (edges)
*
* Topological sort utility.
*
* @param {Array} edges
* @returns {Object}
* @api private
*/
function tsort (edges) {
var nodes = {}
, sorted = []
, visited = {};
// node constructor
function N (id) {
this.id = id;
this.notify = [];
this.waitfor = [];
}
// parse edges into nodes
edges.forEach(function (v) {
var from = v[0]
, to = v[1];
if (!nodes[from]) nodes[from] = new N(from);
if (!nodes[to]) nodes[to] = new N(to);
if (!~nodes[to].waitfor.indexOf(from))
nodes[to].waitfor.push(from);
if (!~nodes[from].notify.indexOf(to))
nodes[from].notify.push(to);
});
// recursively visit nodes
function doVisit (idstr, ancestors) {
var node = nodes[idstr]
, id = node.id;
if (visited[idstr]) return;
if (!Array.isArray(ancestors))
ancestors = [];
ancestors.push(id);
visited[idstr] = true;
// deep recursive checking
node.notify.forEach(function (afterId) {
if (ancestors.indexOf(afterId) >= 0)
throw new Error(id + ' can not come before ' + afterId);
var aid = afterId.toString()
, anc = ancestors.map(function (v) { return v });
doVisit(aid, anc);
});
sorted.unshift(id);
}
function doFilter (s) {
return null !== s
&& 'undefined' !== typeof s;
}
// actually do our recursion
// TODO: without try catch
try { Object.keys(nodes).forEach(doVisit); }
catch (ex) { return { error: ex } };
// order our notify object
var critical = sorted.filter(doFilter)
, graph = [];
critical.forEach(function (v) {
var n = nodes[v];
n.notify = n.notify.filter(doFilter);
n.waitfor = n.waitfor.filter(doFilter);
graph.push(n);
});
return { path: critical, graph: graph };
};
/**
* .auto (tasks[, concurreny], callback)
*
* Determines a best-fit concurrency path of execution
* for a set of interdependant named tasks.
*
* Uses `.dag` under the hood.
*
* @param {Object} tasks
* @param {Number} concurrency
* @param {Function} callback
* @name auto
* @api public
*/
exports.auto = function (tasks, cc, cb) {
if ('function' == typeof cc) cb = cc, cc = 10;
cb = cb || noop;
var parsed = autoParseTasks(tasks)
, edges = parsed.edges
, fns = parsed.fns;
function iterator (edge, next) {
var fn = fns[edge];
fn(next);
}
exports.dag(edges, cc, iterator, cb);
};
/**
* .autoSeries (tasks, callback)
*
* Determines a best-fit concurrency path of execution
* for a set of interdependant named tasks.
*
* Uses `.dagSeries` under the hood.
*
* @param {Object} tasks
* @param {Number} concurrency
* @param {Function} callback
* @name auto
* @api public
*/
exports.autoSeries = function (tasks, cb) {
cb = cb || noop;
var parsed = autoParseTasks(tasks)
, edges = parsed.edges
, fns = parsed.fns;
function iterator (edge, next) {
var fn = fns[edge];
fn(next);
}
exports.dagSeries(edges, iterator, cb);
};
/*!
* autoParseTasks (tasks)
*
* Parse the object passed as `tasks` to auto
* or autoSeries. Returns an object indicating
* the graph and functions to call by name.
*
* @param {Object} tasks
* @api private
*/
function autoParseTasks (tasks) {
var edges = []
, fns = {};
for (var key in tasks) {
var task = tasks[key];
if (Array.isArray(task)) {
for (var i = 0; i < task.length; i++) {
var name = task[i];
if ('string' === typeof name) edges.push([ name, key ]);
else if ('function' == typeof name) fns[key] = name;
}
} else {
fns[key] = task;
edges.push([ null, key ]);
}
}
return { edges: edges, fns: fns };
}
return module.exports;
});
/**
* Create new instance of Breeze
*/
module.exports = function breeze () {
return new Breeze()
}
{
"author": "Jake Luer <jake@alogicalparadox.com> (http://alogicalparadox.com)",
"name": "breeze",
"description": "Async flow control utility.",
"version": "0.4.0",
"version": "1.0.0",
"description": "Functional async flow control library",
"main": "breeze.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"repository": {
"type": "git",
"url": "git://github.com/logicalparadox/breeze.git"
"url": "git+https://github.com/Nijikokun/breeze.git"
},
"main": "./index",
"scripts": {
"test": "make test"
"keywords": [
"flow",
"async",
"ocaml",
"functional",
"haskell",
"waterfall",
"try",
"catch"
],
"author": "Nijiko Yonskai",
"license": "MIT",
"bugs": {
"url": "https://github.com/Nijikokun/breeze/issues"
},
"dependencies": {},
"devDependencies": {
"mocha": "*"
, "chai": "*"
, "chai-spies": "*"
, "folio": "0.3.x"
},
"optionalDependencies": {},
"engines": {
"node": "*"
}
"homepage": "https://github.com/Nijikokun/breeze#readme"
}
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc