Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

breeze

Package Overview
Dependencies
Maintainers
1
Versions
20
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

breeze - npm Package Compare versions

Comparing version 0.3.0 to 0.4.0

1304

breeze.js

@@ -1,690 +0,778 @@

/*!
* breeze - async flow control
* Copyright(c) 2012 Jake Luer <jake@alogicalparadox.com>
* MIT Licensed
*/
!function (name, context, definition) {
if (typeof require === "function" && typeof exports === "object" && typeof module === "object")
module.exports = definition(name, context);
else if (typeof define === 'function' && typeof define.amd === 'object') define(definition);
else context[name] = definition(name, context);
}('breeze', this, function (name, context) {
!function (name, definition) {
if (typeof module != 'undefined') module.exports = definition();
else if (typeof define == 'function' && typeof define.amd == 'object') define(definition);
else this[name] = definition();
}('breeze', function () {
var module = {};
/*!
* Breeze Async Control Utility
* Copyright(c) 2012 Jake Luer <jake@alogicalparadox.com>
* MIT Licensed
*/
/*!
* Helpers
*/
3/*!
* Breeze Async Control Utility
* Copyright(c) 2012 Jake Luer <jake@alogicalparadox.com>
* MIT Licensed
*/
var noop = function () {}
, slice = Array.prototype.slice;
/*!
* Helpers
*/
/*!
* Define primary export
*/
var noop = function () {}
, slice = Array.prototype.slice;
var exports = module.exports = {};
/*!
* Define primary export
*/
/*!
* Breeze version
*/
var exports = module.exports = {};
exports.version = '0.4.0';
/*!
* Breeze version
*/
/**
* ### .nextTick (fn)
*
* Cross-compatible `nextTick` implementation. Uses
* `process.nextTick` for node and `setTimeout(fn, 0)`
* for the browser.
*
* @param {Function} callback
* @name nextTick
* @api public
*/
exports.version = '0.3.0';
exports.nextTick = ('undefined' === typeof process || !process.nextTick)
? function (fn) { setTimeout(fn, 0); }
: process.nextTick;
/**
* ### .nextTick (fn)
*
* Cross-compatible `nextTick` implementation. Uses
* `process.nextTick` for node and `setTimeout(fn, 0)`
* for the browser.
*
* @param {Function} callback
* @name nextTick
* @api public
*/
/**
* ### forEach (array, iterator, done)
*
* Apply an iterator to each item in an array
* in parellel. Execute a callback when all items
* have been completed or immediately if there is
* an error provided.
*
* @param {Array} array to iterate
* @param {Function} iterator function
* @param {Function} callback on complete or error
* @cb {Error|null} if error
* @name forEach
* @api public
*/
exports.nextTick = ('undefined' === typeof process || !process.nextTick)
? function (fn) { setTimeout(fn, 0); }
: process.nextTick;
exports.forEach = function (arr, iterator, cb) {
cb = cb || noop;
if (!arr.length) return cb();
var count = arr.length;
for (var i = 0, l = arr.length; i < l; i++) {
iterator(arr[i], function next (err) {
if (err) return cb(err);
--count || cb(null);
});
}
};
/**
* ### forEach (array, iterator, done)
*
* Apply an iterator to each item in an array
* in parellel. Execute a callback when all items
* have been completed or immediately if there is
* an error provided.
*
* @param {Array} array to iterate
* @param {Function} iterator function
* @param {Function} callback on complete or error
* @cb {Error|null} if error
* @name forEach
* @api public
*/
/**
* ### forEachSeries (array, iterator, done)
*
* Apply an iterator to each item in an array
* serially. Execute a callback when all items
* have been completed or immediately if there is
* is an error provided.
*
* @param {Array} array to iterate
* @param {Function} iterator function
* @param {Function} callback on complete or error
* @cb {Error|null} if error
* @name forEachSeries
* @api public
*/
exports.forEach = function (arr, iterator, cb) {
cb = cb || noop;
if (!arr.length) return cb();
var count = arr.length;
for (var i = 0, l = arr.length; i < l; i++) {
iterator(arr[i], function next (err) {
exports.forEachSeries = function (arr, iterator, cb) {
cb = cb || noop;
if (!arr.length) return cb();
function iterate (i) {
if (i == arr.length) return cb();
iterator(arr[i], function next (err) {
if (err) return cb(err);
iterate(++i);
});
}
iterate(0);
};
/**
* ### parallel (fns, done)
*
* Execute a collection of functions in parellel
* and execute a callback upon completion or occurance
* of an error. Functions can be provided as either
* an array or an object. Each function will be passed
* a callback to signal completion. The callback accepts
* either an error for the first argument, or null for the
* first argument and results following. The results will be
* provied as the second argument of the callback in-kind,
* maintaining the order of the input array or the keys
* of the input object.
*
* @param {Array|Object} functions to execute
* @param {Function} callback on completion or error
* @cb {Error|null} if error
* @cb {Array|Object} reflecting the results
* @name parellel
* @api public
*/
exports.parallel = function (tasks, cb) {
cb = cb || noop;
var keys = prepareKeys(tasks)
, res = Array.isArray(tasks) ? Array(tasks.length) : {}
, iterator = prepareIterator(tasks, keys, res);
if (!keys.length) return cb();
exports.forEach(keys, iterator, function (err) {
if (err) return cb(err);
--count || cb(null);
cb(null, res);
});
}
};
};
/**
* ### forEachSeries (array, iterator, done)
*
* Apply an iterator to each item in an array
* serially. Execute a callback when all items
* have been completed or immediately if there is
* is an error provided.
*
* @param {Array} array to iterate
* @param {Function} iterator function
* @param {Function} callback on complete or error
* @cb {Error|null} if error
* @name forEachSeries
* @api public
*/
/**
* ### series (fns, done)
*
* Execute a collection of functions serially
* and execute a callback upon completion or occurance
* of an error. Functions can be provided as either
* an array or an object. Each function will be passed
* a callback to signal completion. The callback accepts
* either an error for the first argument, or null for the
* first argument and results following. The results will be
* provied as the second argument of the callback in-kind,
* maintaining the order of the input array or the keys
* of the input object.
*
* @param {Array|Object} functions to execute
* @param {Function} callback on completion or error
* @cb {Error|null} if error
* @cb {Array|Object} reflecting the results
* @name series
* @api public
*/
exports.forEachSeries = function (arr, iterator, cb) {
cb = cb || noop;
if (!arr.length) return cb();
function iterate (i) {
if (i == arr.length) return cb();
iterator(arr[i], function next (err) {
exports.series = function (tasks, cb) {
cb = cb || noop;
var keys = prepareKeys(tasks)
, res = Array.isArray(tasks) ? Array(tasks.length) : {}
, iterator = prepareIterator(tasks, keys, res);
if (!keys.length) return cb();
exports.forEachSeries(keys, iterator, function (err) {
if (err) return cb(err);
iterate(++i);
cb(null, res);
});
};
/*!
* parseArgs (args
*
* Figure out how to return the a set of arguments
* as part of a parallel or series set. If length
* is more than one, return as item, else return
* as an array.
*
* @param {Array} arguments to parse
* @returns {Mixed} arguments
* @api private
*/
function parseArgs (args) {
if (args.length === 0) return null;
return args.length === 1
? args[0]
: args;
}
iterate(0);
};
/**
* ### parallel (fns, done)
*
* Execute a collection of functions in parellel
* and execute a callback upon completion or occurance
* of an error. Functions can be provided as either
* an array or an object. Each function will be passed
* a callback to signal completion. The callback accepts
* either an error for the first argument, or null for the
* first argument and results following. The results will be
* provied as the second argument of the callback in-kind,
* maintaining the order of the input array or the keys
* of the input object.
*
* @param {Array|Object} functions to execute
* @param {Function} callback on completion or error
* @cb {Error|null} if error
* @cb {Array|Object} reflecting the results
* @name parellel
* @api public
*/
/*!
* parseKeys (tasks)
*
* For a parallel or series set, determine the context
* of the request as array or object, and return the
* keys for that context. If working with an array,
* create an array of indexes to lookup when passed
* to the iterator.
*
* @param {Array|Object} tasks
* @returns {Array} keys
* @api private
*/
exports.parallel = function (tasks, cb) {
cb = cb || noop;
var keys = prepareKeys(tasks)
, res = Array.isArray(tasks) ? Array(tasks.length) : {}
, iterator = prepareIterator(tasks, keys, res);
if (!keys.length) return cb();
exports.forEach(keys, iterator, function (err) {
if (err) return cb(err);
cb(null, res);
});
};
function prepareKeys (tasks) {
if (Array.isArray(tasks)) {
var keys = [];
for (var i = 0, l = tasks.length; i < l; i++)
keys.push(i);
return keys;
} else {
return Object.keys(tasks);
}
}
/**
* ### series (fns, done)
*
* Execute a collection of functions serially
* and execute a callback upon completion or occurance
* of an error. Functions can be provided as either
* an array or an object. Each function will be passed
* a callback to signal completion. The callback accepts
* either an error for the first argument, or null for the
* first argument and results following. The results will be
* provied as the second argument of the callback in-kind,
* maintaining the order of the input array or the keys
* of the input object.
*
* @param {Array|Object} functions to execute
* @param {Function} callback on completion or error
* @cb {Error|null} if error
* @cb {Array|Object} reflecting the results
* @name series
* @api public
*/
/*!
* prepareIterator (tasks, keys, response)
*
* Create a function to use as an iterator for
* a parallel or series execution. Handles
* writing of the results to the result set
* so that it can be passed back in the final
* callback.
*
* @param {Mixed} array or object of tasks
* @param {Array} keys to use to get current task
* @param {Mixed} res object to write to
* @returns {Function} iterator
* @api private
*/
exports.series = function (tasks, cb) {
cb = cb || noop;
var keys = prepareKeys(tasks)
, res = Array.isArray(tasks) ? Array(tasks.length) : {}
, iterator = prepareIterator(tasks, keys, res);
if (!keys.length) return cb();
exports.forEachSeries(keys, iterator, function (err) {
if (err) return cb(err);
cb(null, res);
});
};
function prepareIterator (tasks, keys, res) {
return function (key, next) {
var fn = tasks[key];
fn(function done () {
var err = arguments[0];
if (err) return next(err);
var args = slice.call(arguments, 1)
, arg = parseArgs(args);
res[key] = arg;
next();
});
}
}
/*!
* parseArgs (args
*
* Figure out how to return the a set of arguments
* as part of a parallel or series set. If length
* is more than one, return as item, else return
* as an array.
*
* @param {Array} arguments to parse
* @returns {Mixed} arguments
* @api private
*/
/**
* ### .atomic ()
*
* Provides atomic functions. Each serial set is
* placed in a queue based on a key. All functions within
* a single key are executed serially. Useful if you are
* trying to perform asyncronous operations as a series but
* with multiple data structures.
*
* var atomic = breeze.atomic();
* for (var i = 0; i < 5; i++) {
* atomic('i', function (done, key, n) {
* // key == 'i';
* // n == current i
* done();
* }, i);
* }
*
* @name atomic
* @api public
*/
function parseArgs (args) {
if (args.length === 0) return null;
return args.length === 1
? args[0]
: args;
}
exports.atomic = function () {
var handler = {};
/*!
* parseKeys (tasks)
*
* For a parallel or series set, determine the context
* of the request as array or object, and return the
* keys for that context. If working with an array,
* create an array of indexes to lookup when passed
* to the iterator.
*
* @param {Array|Object} tasks
* @returns {Array} keys
* @api private
*/
function iterator (task, next) {
task.args.unshift(function () {
next();
}, task.key);
task.fn.apply(this, task.args);
}
function prepareKeys (tasks) {
if (Array.isArray(tasks)) {
var keys = [];
for (var i = 0, l = tasks.length; i < l; i++)
keys.push(i);
return keys;
} else {
return Object.keys(tasks);
}
}
function makeQueue (key) {
var queue = exports.queue(iterator, 1);
queue.drain = function () {
delete handler[key];
};
return queue;
}
/*!
* prepareIterator (tasks, keys, response)
*
* Create a function to use as an iterator for
* a parallel or series execution. Handles
* writing of the results to the result set
* so that it can be passed back in the final
* callback.
*
* @param {Mixed} array or object of tasks
* @param {Array} keys to use to get current task
* @param {Mixed} res object to write to
* @returns {Function} iterator
* @api private
*/
return function atomic (key, fn) {
var task = { key: key, fn: fn, args: slice.call(arguments, 2) }
, list = handler[key] || (handler[key] = makeQueue(key));
list.push(task, null, true);
};
};
function prepareIterator (tasks, keys, res) {
return function (key, next) {
var fn = tasks[key];
fn(function done () {
var err = arguments[0];
if (err) return next(err);
var args = slice.call(arguments, 1)
, arg = parseArgs(args);
res[key] = arg;
next();
});
}
}
/**
* ### queue (iterator, concurrency)
*
* The queue mechanism allows for a any number of
* data objects to be processed by an iterator
* when they become available. The queue will processes
* items in parellel, up to a given concurrently value,
* then will wait until an item has finished until
* beginning to process the next. The items queued
* can have a callback executed when it has completed
* its iterator. In addition, a queue may also have
* functions attached to listen for specific events.
* On such event is an error. Should any item in the queue
* fail to process and provide an error to it's `done`
* callback, no further items will be processed.
*
* @param {Function} iterator
* @param {Number} concurrency (defaults to 10)
* @returns {Object} queue
* @name queue
* @api public
*/
/**
* ### .atomic ()
*
* Provides atomic functions. Each serial set is
* placed in a queue based on a key. All functions within
* a single key are executed serially. Useful if you are
* trying to perform asyncronous operations as a series but
* with multiple data structures.
*
* var atomic = breeze.atomic();
* for (var i = 0; i < 5; i++) {
* atomic('i', function (done, key, n) {
* // key == 'i';
* // n == current i
* done();
* }, i);
* });
*
* @name atomic
* @api public
*/
exports.queue = function (iterator, concurrency) {
var queue = new Queue(iterator, concurrency);
return queue;
};
exports.atomic = function () {
var handler = {};
/*!
* Queue
*
* The Queue constructor will hold all of the necissary
* settings to correctly execute a queue.
*
* @param {Function} iterator
* @param {Number} concurrency (defaults to 10)
* @api private
*/
function iterator (task, next) {
task.args.unshift(function () {
next();
}, task.key);
task.fn.apply(this, task.args);
function Queue (iterator, concurrency) {
this._iterator = iterator;
this._concurrency = concurrency || 10;
this._tasks = [];
this._err = false;
this.workers = 0;
}
function makeQueue (key) {
var queue = exports.queue(iterator, 1);
queue.drain = function () {
delete handler[key];
};
return queue;
}
/**
* #### .length
*
* Property indicating the number of items current
* in the queue. An item is removed from this list
* prior to being processed.
*
* @returns {Number} count of queued items
* @api public
*/
return function atomic (key, fn) {
var task = { key: key, fn: fn, args: slice.call(arguments, 2) }
, list = handler[key] || (handler[key] = makeQueue(key));
list.push(task, null, true);
Object.defineProperty(Queue.prototype, 'length',
{ get: function () {
return this._tasks.length;
}
});
/**
* #### .push (items[, callback[, autostart]])
*
* You can push an item or an array of items into
* the queue for processing. The callback will be
* called for the completion of each item if the queue
* has not entered into an error state. A `autostart`
* boolean my also be provided if you wish to start
* processing the queue with this push of items. If
* no pushes provide the autostart, then the queue
* must be started manually with `.process()`.
*
* Note that if the queue has already been started but
* has been drained of items, it will not start again
* with another push unless the `autostart` toggle is present.
*
* @param {Array} item or items to be added to the queue
* @param {Function} callback for completion of each item
* @param {Boolean} autostart process (defaults to false)
* @name push
* @api public
*/
Queue.prototype.push = function (items, cb, start) {
if ('boolean' === typeof cb) start = cb, cb = noop;
if (!Array.isArray(items)) items = [ items ];
cb = cb || noop;
var cc = this._concurrency
, sat = this.saturated;
for (var i = 0, l = items.length; i < l; i ++) {
var task = items[i];
this._tasks.push({ task: task , cb: cb });
if (sat && this._tasks.length === cc) sat();
if (start) exports.nextTick(this.process.bind(this));
}
};
};
/**
* ### queue (iterator, concurrency)
*
* The queue mechanism allows for a any number of
* data objects to be processed by an iterator
* when they become available. The queue will processes
* items in parellel, up to a given concurrently value,
* then will wait until an item has finished until
* beginning to process the next. The items queued
* can have a callback executed when it has completed
* its iterator. In addition, a queue may also have
* functions attached to listen for specific events.
* On such event is an error. Should any item in the queue
* fail to process and provide an error to it's `done`
* callback, no further items will be processed.
*
* @param {Function} iterator
* @param {Number} concurrency (defaults to 10)
* @returns {Object} queue
* @name queue
* @api public
*/
/**
* #### .process ()
*
* Begin the queue processing cycle.
*
* @name process
* @api public
*/
exports.queue = function (iterator, concurrency) {
var queue = new Queue(iterator, concurrency);
return queue;
};
Queue.prototype.process = function () {
var self = this
, cc = this._concurrency
, iterator = this._iterator
if (this.workers < cc && this.length && !this._err) {
var task = this._tasks.shift();
if (this.empty && !this.length) this.empty();
this.workers++;
iterator(task.task, function next () {
self.workers--;
if (self._err) return;
var err = arguments[0];
if (err) self._err = true;
if (task.cb) task.cb.apply(task, arguments);
if (err && self.onerror) return self.onerror(err);
if (self.drain && self.length + self.workers === 0) self.drain();
self.process();
});
this.process();
}
};
/*!
* Queue
*
* The Queue constructor will hold all of the necissary
* settings to correctly execute a queue.
*
* @param {Function} iterator
* @param {Number} concurrency (defaults to 10)
* @api private
*/
/**
* #### .onerror
*
* Setting this to a function will provide a listener
* should an error occur. It will not be executed otherwise.
*
* @default null
* @expected {Function} on error callback
* @cb {Error} object that was passed as error during iteration
* @api public
*/
function Queue (iterator, concurrency) {
this._iterator = iterator;
this._concurrency = concurrency || 10;
this._tasks = [];
this._err = false;
this.workers = 0;
}
Queue.prototype.onerror = null;
/**
* #### .length
*
* Property indicating the number of items current
* in the queue. An item is removed from this list
* prior to being processed.
*
* @returns {Number} count of queued items
* @api public
*/
/**
* #### .saturated
*
* This listener will be executed when the number of
* queued items exceeds the current concurrency value.
* This will be executed directly after the push of
* said items
*
* @default null
* @expected {Function}
* @api public
*/
Object.defineProperty(Queue.prototype, 'length',
{ get: function () {
return this._tasks.length;
}
});
Queue.prototype.saturated = null;
/**
* #### .push (items, callback, autostart)
*
* You can push an item or an array of items into
* the queue for processing. The callback will be
* called for the completion of each item if the queue
* has not entered into an error state. A `autostart`
* boolean my also be provided if you wish to start
* processing the queue with this push of items. If
* no pushes provide the autostart, then the queue
* must be started manually with `.process()`.
*
* Note, that if the queue has already been started but
* has been drained of items, it will not start again
* with another push unless the `autostart` toggle is present.
*
* @param {Array} item or items to be added to the queue
* @param {Function} callback for completion of each item
* @param {Boolean} autostart process (defaults to false)
* @name push
* @api public
*/
/**
* #### .empty
*
* This listener will be executed when the queue is empty.
* In other words, prior to the last item in the queue
* being processed.
*
* @default null
* @expected {Function}
* @api public
*/
Queue.prototype.push = function (items, cb, start) {
cb = cb || noop;
if (!Array.isArray(items)) items = [ items ];
var cc = this._concurrency
, sat = this.saturated;
for (var i = 0, l = items.length; i < l; i ++) {
var task = items[i];
this._tasks.push({ task: task , cb: cb });
if (sat && this._tasks.length === cc) sat();
if (start) exports.nextTick(this.process.bind(this));
}
};
Queue.prototype.empty = null;
/**
* #### .process ()
*
* Begin the queue processing cycle.
*
* @name process
* @api public
*/
/**
* #### .drain
*
* This listener will be executed when all queued
* items have been executed through the iterator.
*
* @default null
* @expected {Function}
* @api public
*/
Queue.prototype.process = function () {
var self = this
, cc = this._concurrency
, iterator = this._iterator
if (this.workers < cc && this.length && !this._err) {
var task = this._tasks.shift();
if (this.empty && !this.length) this.empty();
this.workers++;
iterator(task.task, function next () {
self.workers--;
if (self._err) return;
var err = arguments[0];
if (err) self._err = true;
if (task.cb) task.cb.apply(task, arguments);
if (err && self.onerror) return self.onerror(err);
if (self.drain && self.length + self.workers === 0) self.drain();
self.process();
});
this.process();
}
};
Queue.prototype.drain = null;
/**
* #### .onerror
*
* Setting this to a function will provide a listener
* should an error occur. It will not be executed otherwise.
*
* @default null
* @expected {Function} on error callback
* @cb {Error} object that was passed as error during iteration
* @api public
*/
/**
* .dag (edges, concurrency, iterator[, done])
*
* DAG, directed-acyclic-graph, is a graph of nodes in
* which there are no cyclic references, and therefor has
* a specific starting and ending point. The `dag` async
* method will take an array of edges and resolve a best
* fit path of execution. It will then iterate over each
* edge in parallel up to a set amount of threads (concurrency).
* Furthermore, an edge will not begin it's processing until
* all of its dependancies have indicated successful execution.
*
* A set of edges is defined as an array, with each element being
* an array of x, y pairs, where `x` must complete before `y`
* can begin.
*
* var edges = [
* [ 'a', 'b' ]
* , [ 'a', 'c' ]
* , [ 'd', 'e' ]
* , [ 'b', 'd' ]
* ];
*
* With the above edges, we expect `a` to start processing. Upon
* completion, `b` and `c` will start. Upon `b` completion, `d`
* will execute, then `e`.
*
* If there are cyclical references in the set of edges, the `done`
* callback will be immediately called with an error indicating
* the problem.
*
* breeze.dag(edges, 2, function (e, next) {
* setTimeout(function () {
* next(); // or next(err);
* }, 1000);
* }, function (err) {
* // our done callback
* });
*
* As with `queue`, if an error occurs the `done` callback will
* be executed immediately. No more items will begin processing,
* but items that have already started will run to completion.
*
* @param {Array} edges
* @param {Number} concurrency
* @param {Function} iterator
* @param {Function} onError or onComplete
* @name dag
* @api public
*/
Queue.prototype.onerror = null;
exports.dag = function (edges, concurrency, iterator, cb) {
cb = cb || noop;
var sorted = tsort(edges)
if (sorted.error) return cb(sorted.error);
if (!sorted.path.length) return cb(null);
/**
* #### .saturated
*
* This listener will be executed when the number of
* queued items exceeds the current concurrency value.
* This will be executed directly after the push of
* said items
*
* @default null
* @expected {Function}
* @api public
*/
// helper: get edge with id
function selectEdge(id) {
return graph.filter(function (e) {
return e.id === id;
})[0];
}
Queue.prototype.saturated = null;
// queue iterator
function action (e, next) {
iterator(e, function done (err) {
var edge = selectEdge(e);
edge.notify.forEach(function (n) {
var notify = selectEdge(n);
notify.fulfilled.push(e);
var wl = notify.waitfor.length
, fl = notify.fulfilled.length;
if (wl === fl) queue.push(n, null, true);
});
next(err);
});
}
/**
* #### .empty
*
* This listener will be executed when the queue is empty.
* In other words, prior to the last item in the queue
* being processed.
*
* @default null
* @expected {Function}
* @api public
*/
// determine where to start
function bootstrap (e) {
e.fulfilled = [];
if (!e.waitfor.length) queue.push(e.id);
}
Queue.prototype.empty = null;
// begin
var graph = sorted.graph
, queue = new Queue(action, concurrency);
graph.forEach(bootstrap);
queue.onerror = cb;
queue.drain = cb;
queue.process();
};
/**
* #### .drain
*
* This listener will be executed when all queued
* items have been executed through the iterator.
*
* @default null
* @expected {Function}
* @api public
*/
/**
* ### .dagSeries (edges, iterator[, done])
*
* Similar to `dag`, but will not use concurrency. A best
* fit path of execution will be dermined and then executed
* serially.
*
* @param {Array} edges
* @param {Function} iterator
* @param {Function} onError or onComplete callback
* @name dagSeries
* @api public
*/
Queue.prototype.drain = null;
exports.dagSeries = function (edges, iterator, cb) {
cb = cb || noop;
var sorted = tsort(edges);
if (sorted.error) return cb(sorted.error);
exports.forEachSeries(sorted.path, function (edge, next) {
iterator(edge, next);
}, cb);
};
/**
* .dag (edges, concurrency, iterator[, done])
*
* DAG, directed-acyclic-graph, is a graph of nodes in
* which there are no cyclic references, and therefor has
* a specific starting and ending point. The `dag` async
* method will take an array of edges and resolve a best
* fit path of execution. It will then iterate over each
* edge in parallel up to a set amount of threads (concurrency).
* Furthermore, an edge will not begin it's processing until
* all of its dependancies have indicated successful execution.
*
* A set of edges is defined as an array, with each element being
* an array of x, y pairs, where `x` must complete before `y`
* can begin.
*
* var edges = [
* [ 'a', 'b' ]
* , [ 'a', 'c' ]
* , [ 'd', 'e' ]
* , [ 'b', 'd' ]
* ];
*
* With the above edges, we expect `a` to start processing. Upon
* completion, `b` and `c` will start. Upon `b` completion, `d`
* will execute, then `e`.
*
* If there are cyclical references in the set of edges, the `done`
* callback will be immediately called with an error indicating
* the problem.
*
* breeze.dag(edges, 2, function (e, next) {
* setTimeout(function () {
* next(); // or next(err);
* }, 1000);
* }, function (err) {
* // our done callback
* });
*
* As with `queue`, if an error occurs the `done` callback will
* be executed immediately. No more items will begin processing,
* but items that have already started will run to completion.
*
* @param {Array} edges
* @param {Number} concurrency
* @param {Function} iterator
* @param {Function} onError or onComplete
* @name dag
* @api public
*/
/*!
* tsort (edges)
*
* Topological sort utility.
*
* @param {Array} edges
* @returns {Object}
* @api private
*/
exports.dag = function (edges, concurrency, iterator, cb) {
cb = cb || noop;
var sorted = tsort(edges)
if (sorted.error) return cb(sorted.error);
if (!sorted.path.length) return cb(null);
function tsort (edges) {
var nodes = {}
, sorted = []
, visited = {};
// helper: get edge with id
function selectEdge(id) {
return graph.filter(function (e) {
return e.id === id;
})[0];
}
// node constructor
function N (id) {
this.id = id;
this.notify = [];
this.waitfor = [];
}
// queue iterator
function action (e, next) {
iterator(e, function done (err) {
var edge = selectEdge(e);
edge.notify.forEach(function (n) {
var notify = selectEdge(n);
notify.fulfilled.push(e);
var wl = notify.waitfor.length
, fl = notify.fulfilled.length;
if (wl === fl) queue.push(n, null, true);
});
next(err);
// parse edges into nodes
edges.forEach(function (v) {
var from = v[0]
, to = v[1];
if (!nodes[from]) nodes[from] = new N(from);
if (!nodes[to]) nodes[to] = new N(to);
if (!~nodes[to].waitfor.indexOf(from))
nodes[to].waitfor.push(from);
if (!~nodes[from].notify.indexOf(to))
nodes[from].notify.push(to);
});
}
// determine where to start
function bootstrap (e) {
e.fulfilled = [];
if (!e.waitfor.length) queue.push(e.id);
}
// recursively visit nodes
function doVisit (idstr, ancestors) {
var node = nodes[idstr]
, id = node.id;
// begin
var graph = sorted.graph
, queue = new Queue(action, concurrency);
graph.forEach(bootstrap);
queue.onerror = cb;
queue.drain = cb;
queue.process();
};
if (visited[idstr]) return;
if (!Array.isArray(ancestors))
ancestors = [];
/**
* ### .dagSeries (edges, iterator[, done])
*
* Similar to `dag`, but will not use concurrency. A best
* fit path of execution will be dermined and then executed
* serially.
*
* @param {Array} edges
* @param {Function} iterator
* @param {Function} onError or onComplete callback
* @name dagSeries
* @api public
*/
ancestors.push(id);
visited[idstr] = true;
exports.dagSeries = function (edges, iterator, cb) {
cb = cb || noop;
var sorted = tsort(edges);
if (sorted.error) return cb(sorted.error);
exports.forEachSeries(sorted.path, function (edge, next) {
iterator(edge, next);
}, cb);
};
// deep recursive checking
node.notify.forEach(function (afterId) {
if (ancestors.indexOf(afterId) >= 0)
throw new Error(id + ' can not come before ' + afterId);
var aid = afterId.toString()
, anc = ancestors.map(function (v) { return v });
doVisit(aid, anc);
});
/*!
* tsort (edges)
*
* Topological sort utility.
*
* @param {Array} edges
* @returns {Object}
* @api private
*/
sorted.unshift(id);
}
function tsort (edges) {
var nodes = {}
, sorted = []
, visited = {};
function doFilter (s) {
return null !== s
&& 'undefined' !== typeof s;
}
// node constructor
function N (id) {
this.id = id;
this.notify = [];
this.waitfor = [];
}
// actually do our recursion
// TODO: without try catch
try { Object.keys(nodes).forEach(doVisit); }
catch (ex) { return { error: ex } };
// parse edges into nodes
edges.forEach(function (v) {
var from = v[0]
, to = v[1];
if (!nodes[from]) nodes[from] = new N(from);
if (!nodes[to]) nodes[to] = new N(to);
if (!~nodes[to].waitfor.indexOf(from))
nodes[to].waitfor.push(from);
if (!~nodes[from].notify.indexOf(to))
nodes[from].notify.push(to);
});
// order our notify object
var critical = sorted.filter(doFilter)
, graph = [];
critical.forEach(function (v) {
var n = nodes[v];
n.notify = n.notify.filter(doFilter);
n.waitfor = n.waitfor.filter(doFilter);
graph.push(n);
});
// recursively visit nodes
function doVisit (idstr, ancestors) {
var node = nodes[idstr]
, id = node.id;
return { path: critical, graph: graph };
};
if (visited[idstr]) return;
if (!Array.isArray(ancestors))
ancestors = [];
/**
* .auto (tasks[, concurreny], callback)
*
* Determines a best-fit concurrency path of execution
* for a set of interdependant named tasks.
*
* Uses `.dag` under the hood.
*
* @param {Object} tasks
* @param {Number} concurrency
* @param {Function} callback
* @name auto
* @api public
*/
ancestors.push(id);
visited[idstr] = true;
exports.auto = function (tasks, cc, cb) {
if ('function' == typeof cc) cb = cc, cc = 10;
cb = cb || noop;
// deep recursive checking
node.notify.forEach(function (afterId) {
if (ancestors.indexOf(afterId) >= 0)
throw new Error(id + ' can not come before ' + afterId);
var aid = afterId.toString()
, anc = ancestors.map(function (v) { return v });
doVisit(aid, anc);
});
var parsed = autoParseTasks(tasks)
, edges = parsed.edges
, fns = parsed.fns;
sorted.unshift(id);
}
function iterator (edge, next) {
var fn = fns[edge];
fn(next);
}
function doFilter (s) {
return null !== s
&& 'undefined' !== typeof s;
}
exports.dag(edges, cc, iterator, cb);
};
// actually do our recursion
// TODO: without try catch
try { Object.keys(nodes).forEach(doVisit); }
catch (ex) { return { error: ex } };
/**
* .autoSeries (tasks, callback)
*
* Determines a best-fit concurrency path of execution
* for a set of interdependant named tasks.
*
* Uses `.dagSeries` under the hood.
*
* @param {Object} tasks
* @param {Number} concurrency
* @param {Function} callback
* @name auto
* @api public
*/
// order our notify object
var critical = sorted.filter(doFilter)
, graph = [];
critical.forEach(function (v) {
var n = nodes[v];
n.notify = n.notify.filter(doFilter);
n.waitfor = n.waitfor.filter(doFilter);
graph.push(n);
});
exports.autoSeries = function (tasks, cb) {
cb = cb || noop;
return { path: critical, graph: graph };
};
var parsed = autoParseTasks(tasks)
, edges = parsed.edges
, fns = parsed.fns;
function iterator (edge, next) {
var fn = fns[edge];
fn(next);
}
exports.dagSeries(edges, iterator, cb);
};
/*!
* autoParseTasks (tasks)
*
* Parse the object passed as `tasks` to auto
* or autoSeries. Returns an object indicating
* the graph and functions to call by name.
*
* @param {Object} tasks
* @api private
*/
function autoParseTasks (tasks) {
var edges = []
, fns = {};
for (var key in tasks) {
var task = tasks[key];
if (Array.isArray(task)) {
for (var i = 0; i < task.length; i++) {
var name = task[i];
if ('string' === typeof name) edges.push([ name, key ]);
else if ('function' == typeof name) fns[key] = name;
}
} else {
fns[key] = task;
edges.push([ null, key ]);
}
}
return { edges: edges, fns: fns };
}
return module.exports;
});
/*!
* breeze - async flow control
* Copyright(c) 2012 Jake Luer <jake@alogicalparadox.com>
* Breeze - Async Flow Control
* Copyright (c) 2012 Jake Luer <jake@alogicalparadox.com>
* MIT Licensed
*
* @website https://github.com/logicalparadox/breeze/
*/
!function(a,b){typeof module!="undefined"?module.exports=b():typeof define=="function"&&typeof define.amd=="object"?define(b):this[a]=b()}("breeze",function(){function e(a){return a.length===0?null:a.length===1?a[0]:a}function f(a){if(Array.isArray(a)){var b=[];for(var c=0,d=a.length;c<d;c++)b.push(c);return b}return Object.keys(a)}function g(a,b,d){return function(b,f){var g=a[b];g(function(){var g=arguments[0];if(g)return f(g);var h=c.call(arguments,1),i=e(h);d[b]=i,f()})}}function h(a,b){this._iterator=a,this._concurrency=b||10,this._tasks=[],this._err=!1,this.workers=0}function i(a){function e(a){this.id=a,this.notify=[],this.waitfor=[]}function f(a,e){var g=b[a],h=g.id;if(d[a])return;Array.isArray(e)||(e=[]),e.push(h),d[a]=!0,g.notify.forEach(function(a){if(e.indexOf(a)>=0)throw new Error(h+" can not come before "+a);var b=a.toString(),c=e.map(function(a){return a});f(b,c)}),c.unshift(h)}function g(a){return null!==a&&"undefined"!=typeof a}var b={},c=[],d={};a.forEach(function(a){var c=a[0],d=a[1];b[c]||(b[c]=new e(c)),b[d]||(b[d]=new e(d)),~b[d].waitfor.indexOf(c)||b[d].waitfor.push(c),~b[c].notify.indexOf(d)||b[c].notify.push(d)});try{Object.keys(b).forEach(f)}catch(h){return{error:h}}var i=c.filter(g),j=[];return i.forEach(function(a){var c=b[a];c.notify=c.notify.filter(g),c.waitfor=c.waitfor.filter(g),j.push(c)}),{path:i,graph:j}}var a={};3;var b=function(){},c=Array.prototype.slice,d=a.exports={};return d.version="0.3.0",d.nextTick="undefined"==typeof process||!process.nextTick?function(a){setTimeout(a,0)}:process.nextTick,d.forEach=function(a,c,d){d=d||b;if(!a.length)return d();var e=a.length;for(var f=0,g=a.length;f<g;f++)c(a[f],function(b){if(b)return d(b);--e||d(null)})},d.forEachSeries=function(a,c,d){function e(b){if(b==a.length)return d();c(a[b],function(c){if(c)return d(c);e(++b)})}d=d||b;if(!a.length)return d();e(0)},d.parallel=function(a,c){c=c||b;var e=f(a),h=Array.isArray(a)?Array(a.length):{},i=g(a,e,h);if(!e.length)return c();d.forEach(e,i,function(a){if(a)return c(a);c(null,h)})},d.series=function(a,c){c=c||b;var e=f(a),h=Array.isArray(a)?Array(a.length):{},i=g(a,e,h);if(!e.length)return c();d.forEachSeries(e,i,function(a){if(a)return c(a);c(null,h)})},d.atomic=function(){function b(a,b){a.args.unshift(function(){b()},a.key),a.fn.apply(this,a.args)}function e(c){var e=d.queue(b,1);return e.drain=function(){delete a[c]},e}var a={};return function(d,f){var g={key:d,fn:f,args:c.call(arguments,2)},h=a[d]||(a[d]=e(d));h.push(g,null,!0)}},d.queue=function(a,b){var c=new h(a,b);return c},Object.defineProperty(h.prototype,"length",{get:function(){return this._tasks.length}}),h.prototype.push=function(a,c,e){c=c||b,Array.isArray(a)||(a=[a]);var f=this._concurrency,g=this.saturated;for(var h=0,i=a.length;h<i;h++){var j=a[h];this._tasks.push({task:j,cb:c}),g&&this._tasks.length===f&&g(),e&&d.nextTick(this.process.bind(this))}},h.prototype.process=function(){var a=this,b=this._concurrency,c=this._iterator;if(this.workers<b&&this.length&&!this._err){var d=this._tasks.shift();this.empty&&!this.length&&this.empty(),this.workers++,c(d.task,function(){a.workers--;if(a._err)return;var c=arguments[0];c&&(a._err=!0),d.cb&&d.cb.apply(d,arguments);if(c&&a.onerror)return a.onerror(c);a.drain&&a.length+a.workers===0&&a.drain(),a.process()}),this.process()}},h.prototype.onerror=null,h.prototype.saturated=null,h.prototype.empty=null,h.prototype.drain=null,d.dag=function(a,c,d,e){function g(a){return l.filter(function(b){return b.id===a})[0]}function j(a,b){d(a,function(d){var e=g(a);e.notify.forEach(function(b){var c=g(b);c.fulfilled.push(a);var d=c.waitfor.length,e=c.fulfilled.length;d===e&&m.push(b,null,!0)}),b(d)})}function k(a){a.fulfilled=[],a.waitfor.length||m.push(a.id)}e=e||b;var f=i(a);if(f.error)return e(f.error);if(!f.path.length)return e(null);var l=f.graph,m=new h(j,c);l.forEach(k),m.onerror=e,m.drain=e,m.process()},d.dagSeries=function(a,c,e){e=e||b;var f=i(a);if(f.error)return e(f.error);d.forEachSeries(f.path,function(a,b){c(a,b)},e)},a.exports})
!function(e,t,n){typeof require=="function"&&typeof exports=="object"&&typeof module=="object"?module.exports=n(e,t):typeof define=="function"&&typeof define.amd=="object"?define(n):t[e]=n(e,t)}("breeze",this,function(e,t){function o(e){return e.length===0?null:e.length===1?e[0]:e}function u(e){if(Array.isArray(e)){var t=[];for(var n=0,r=e.length;n<r;n++)t.push(n);return t}return Object.keys(e)}function a(e,t,n){return function(t,r){var s=e[t];s(function(){var s=arguments[0];if(s)return r(s);var u=i.call(arguments,1),a=o(u);n[t]=a,r()})}}function f(e,t){this._iterator=e,this._concurrency=t||10,this._tasks=[],this._err=!1,this.workers=0}function l(e){function i(e){this.id=e,this.notify=[],this.waitfor=[]}function s(e,i){var o=t[e],u=o.id;if(r[e])return;Array.isArray(i)||(i=[]),i.push(u),r[e]=!0,o.notify.forEach(function(e){if(i.indexOf(e)>=0)throw new Error(u+" can not come before "+e);var t=e.toString(),n=i.map(function(e){return e});s(t,n)}),n.unshift(u)}function o(e){return null!==e&&"undefined"!=typeof e}var t={},n=[],r={};e.forEach(function(e){var n=e[0],r=e[1];t[n]||(t[n]=new i(n)),t[r]||(t[r]=new i(r)),~t[r].waitfor.indexOf(n)||t[r].waitfor.push(n),~t[n].notify.indexOf(r)||t[n].notify.push(r)});try{Object.keys(t).forEach(s)}catch(u){return{error:u}}var a=n.filter(o),f=[];return a.forEach(function(e){var n=t[e];n.notify=n.notify.filter(o),n.waitfor=n.waitfor.filter(o),f.push(n)}),{path:a,graph:f}}function c(e){var t=[],n={};for(var r in e){var i=e[r];if(Array.isArray(i))for(var s=0;s<i.length;s++){var o=i[s];"string"==typeof o?t.push([o,r]):"function"==typeof o&&(n[r]=o)}else n[r]=i,t.push([null,r])}return{edges:t,fns:n}}var n={},r=function(){},i=Array.prototype.slice,s=n.exports={};return s.version="0.4.0",s.nextTick="undefined"==typeof process||!process.nextTick?function(e){setTimeout(e,0)}:process.nextTick,s.forEach=function(e,t,n){n=n||r;if(!e.length)return n();var i=e.length;for(var s=0,o=e.length;s<o;s++)t(e[s],function(t){if(t)return n(t);--i||n(null)})},s.forEachSeries=function(e,t,n){function i(r){if(r==e.length)return n();t(e[r],function(t){if(t)return n(t);i(++r)})}n=n||r;if(!e.length)return n();i(0)},s.parallel=function(e,t){t=t||r;var n=u(e),i=Array.isArray(e)?Array(e.length):{},o=a(e,n,i);if(!n.length)return t();s.forEach(n,o,function(e){if(e)return t(e);t(null,i)})},s.series=function(e,t){t=t||r;var n=u(e),i=Array.isArray(e)?Array(e.length):{},o=a(e,n,i);if(!n.length)return t();s.forEachSeries(n,o,function(e){if(e)return t(e);t(null,i)})},s.atomic=function(){function t(e,t){e.args.unshift(function(){t()},e.key),e.fn.apply(this,e.args)}function n(n){var r=s.queue(t,1);return r.drain=function(){delete e[n]},r}var e={};return function(r,s){var o={key:r,fn:s,args:i.call(arguments,2)},u=e[r]||(e[r]=n(r));u.push(o,null,!0)}},s.queue=function(e,t){var n=new f(e,t);return n},Object.defineProperty(f.prototype,"length",{get:function(){return this._tasks.length}}),f.prototype.push=function(e,t,n){"boolean"==typeof t&&(n=t,t=r),Array.isArray(e)||(e=[e]),t=t||r;var i=this._concurrency,o=this.saturated;for(var u=0,a=e.length;u<a;u++){var f=e[u];this._tasks.push({task:f,cb:t}),o&&this._tasks.length===i&&o(),n&&s.nextTick(this.process.bind(this))}},f.prototype.process=function(){var e=this,t=this._concurrency,n=this._iterator;if(this.workers<t&&this.length&&!this._err){var r=this._tasks.shift();this.empty&&!this.length&&this.empty(),this.workers++,n(r.task,function(){e.workers--;if(e._err)return;var n=arguments[0];n&&(e._err=!0),r.cb&&r.cb.apply(r,arguments);if(n&&e.onerror)return e.onerror(n);e.drain&&e.length+e.workers===0&&e.drain(),e.process()}),this.process()}},f.prototype.onerror=null,f.prototype.saturated=null,f.prototype.empty=null,f.prototype.drain=null,s.dag=function(e,t,n,i){function o(e){return c.filter(function(t){return t.id===e})[0]}function u(e,t){n(e,function(r){var i=o(e);i.notify.forEach(function(t){var n=o(t);n.fulfilled.push(e);var r=n.waitfor.length,i=n.fulfilled.length;r===i&&h.push(t,null,!0)}),t(r)})}function a(e){e.fulfilled=[],e.waitfor.length||h.push(e.id)}i=i||r;var s=l(e);if(s.error)return i(s.error);if(!s.path.length)return i(null);var c=s.graph,h=new f(u,t);c.forEach(a),h.onerror=i,h.drain=i,h.process()},s.dagSeries=function(e,t,n){n=n||r;var i=l(e);if(i.error)return n(i.error);s.forEachSeries(i.path,function(e,n){t(e,n)},n)},s.auto=function(e,t,n){function a(e,t){var n=u[e];n(t)}"function"==typeof t&&(n=t,t=10),n=n||r;var i=c(e),o=i.edges,u=i.fns;s.dag(o,t,a,n)},s.autoSeries=function(e,t){function u(e,t){var n=o[e];n(t)}t=t||r;var n=c(e),i=n.edges,o=n.fns;s.dagSeries(i,u,t)},n.exports})
0.4.0 / 2012-07-25
==================
* update browser build to folio 0.3.x
* Merge branch 'feature/auto'
* add tests for `auto` and `autoSeries`
* add `.auto` and `.autoSeries` methods
* queue.push does not require (null) for callback to use autostart
* vim typo
0.3.0 / 2012-06-28

@@ -3,0 +13,0 @@ ==================

@@ -1,2 +0,2 @@

3/*!
/*!
* Breeze Async Control Utility

@@ -24,3 +24,3 @@ * Copyright(c) 2012 Jake Luer <jake@alogicalparadox.com>

exports.version = '0.3.0';
exports.version = '0.4.0';

@@ -259,3 +259,3 @@ /**

* }, i);
* });
* }
*

@@ -356,3 +356,3 @@ * @name atomic

/**
* #### .push (items, callback, autostart)
* #### .push (items[, callback[, autostart]])
*

@@ -368,3 +368,3 @@ * You can push an item or an array of items into

*
* Note, that if the queue has already been started but
* Note that if the queue has already been started but
* has been drained of items, it will not start again

@@ -381,4 +381,5 @@ * with another push unless the `autostart` toggle is present.

Queue.prototype.push = function (items, cb, start) {
if ('boolean' === typeof cb) start = cb, cb = noop;
if (!Array.isArray(items)) items = [ items ];
cb = cb || noop;
if (!Array.isArray(items)) items = [ items ];
var cc = this._concurrency

@@ -678,1 +679,95 @@ , sat = this.saturated;

};
/**
* .auto (tasks[, concurreny], callback)
*
* Determines a best-fit concurrency path of execution
* for a set of interdependant named tasks.
*
* Uses `.dag` under the hood.
*
* @param {Object} tasks
* @param {Number} concurrency
* @param {Function} callback
* @name auto
* @api public
*/
exports.auto = function (tasks, cc, cb) {
if ('function' == typeof cc) cb = cc, cc = 10;
cb = cb || noop;
var parsed = autoParseTasks(tasks)
, edges = parsed.edges
, fns = parsed.fns;
function iterator (edge, next) {
var fn = fns[edge];
fn(next);
}
exports.dag(edges, cc, iterator, cb);
};
/**
* .autoSeries (tasks, callback)
*
* Determines a best-fit concurrency path of execution
* for a set of interdependant named tasks.
*
* Uses `.dagSeries` under the hood.
*
* @param {Object} tasks
* @param {Number} concurrency
* @param {Function} callback
* @name auto
* @api public
*/
exports.autoSeries = function (tasks, cb) {
cb = cb || noop;
var parsed = autoParseTasks(tasks)
, edges = parsed.edges
, fns = parsed.fns;
function iterator (edge, next) {
var fn = fns[edge];
fn(next);
}
exports.dagSeries(edges, iterator, cb);
};
/*!
* autoParseTasks (tasks)
*
* Parse the object passed as `tasks` to auto
* or autoSeries. Returns an object indicating
* the graph and functions to call by name.
*
* @param {Object} tasks
* @api private
*/
function autoParseTasks (tasks) {
var edges = []
, fns = {};
for (var key in tasks) {
var task = tasks[key];
if (Array.isArray(task)) {
for (var i = 0; i < task.length; i++) {
var name = task[i];
if ('string' === typeof name) edges.push([ name, key ]);
else if ('function' == typeof name) fns[key] = name;
}
} else {
fns[key] = task;
edges.push([ null, key ]);
}
}
return { edges: edges, fns: fns };
}

@@ -5,3 +5,3 @@ {

"description": "Async flow control utility.",
"version": "0.3.0",
"version": "0.4.0",
"repository": {

@@ -20,3 +20,3 @@ "type": "git",

, "chai-spies": "*"
, "folio": "0.2.x"
, "folio": "0.3.x"
},

@@ -23,0 +23,0 @@ "optionalDependencies": {},

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc