Comparing version 0.1.0 to 0.2.0
197
breeze.js
@@ -38,3 +38,3 @@ /*! | ||
exports.version = '0.1.0'; | ||
exports.version = '0.2.0'; | ||
@@ -444,4 +444,199 @@ /** | ||
/** | ||
* .dag (edges, concurrency, iterator[, done]) | ||
* | ||
* DAG, directed-acyclic-graph, is a graph of nodes in | ||
* which there are no cyclic references, and therefor has | ||
* a specific starting and ending point. The `dag` async | ||
* method will take an array of edges and resolve a best | ||
* fit path of execution. It will then iterate over each | ||
* edge in parallel up to a set amount of threads (concurrency). | ||
* Furthermore, an edge will not begin it's processing until | ||
* all of its dependancies have indicated successful execution. | ||
* | ||
* A set of edges is defined as an array, with each element being | ||
* an array of x, y pairs, where `x` must complete before `y` | ||
* can begin. | ||
* | ||
* var edges = [ | ||
* [ 'a', 'b' ] | ||
* , [ 'a', 'c' ] | ||
* , [ 'd', 'e' ] | ||
* , [ 'b', 'd' ] | ||
* ]; | ||
* | ||
* With the above edges, we expect `a` to start processing. Upon | ||
* completion, `b` and `c` will start. Upon `b` completion, `d` | ||
* will execute, then `e`. | ||
* | ||
* If there are cyclical references in the set of edges, the `done` | ||
* callback will be immediately called with an error indicating | ||
* the problem. | ||
* | ||
* breeze.dag(edges, 2, function (e, next) { | ||
* setTimeout(function () { | ||
* next(); // or next(err); | ||
* }, 1000); | ||
* }, function (err) { | ||
* // our done callback | ||
* }); | ||
* | ||
* As with `queue`, if an error occurs the `done` callback will | ||
* be executed immediately. No more items will begin processing, | ||
* but items that have already started will run to completion. | ||
* | ||
* @param {Array} edges | ||
* @param {Number} concurrency | ||
* @param {Function} iterator | ||
* @param {Function} onError or onComplete | ||
* @name dag | ||
* @api public | ||
*/ | ||
exports.dag = function (edges, concurrency, iterator, cb) { | ||
cb = cb || noop; | ||
var sorted = tsort(edges) | ||
if (sorted.error) return cb(sorted.error); | ||
// helper: get edge with id | ||
function selectEdge(id) { | ||
return graph.filter(function (e) { | ||
return e.id === id; | ||
})[0]; | ||
} | ||
// queue iterator | ||
function action (e, next) { | ||
iterator(e, function done (err) { | ||
var edge = selectEdge(e); | ||
edge.notify.forEach(function (n) { | ||
var notify = selectEdge(n); | ||
notify.fulfilled.push(e); | ||
var wl = notify.waitfor.length | ||
, fl = notify.fulfilled.length; | ||
if (wl === fl) queue.push(n, null, true); | ||
}); | ||
next(err); | ||
}); | ||
} | ||
// determine where to start | ||
function bootstrap (e) { | ||
e.fulfilled = []; | ||
if (!e.waitfor.length) queue.push(e.id); | ||
} | ||
// begin | ||
var graph = sorted.graph | ||
, queue = new Queue(action, concurrency); | ||
graph.forEach(bootstrap); | ||
queue.onerror = cb; | ||
queue.drain = cb; | ||
queue.process(); | ||
}; | ||
/** | ||
* ### .dagSeries (edges, iterator[, done]) | ||
* | ||
* Similar to `dag`, but will not use concurrency. A best | ||
* fit path of execution will be dermined and then executed | ||
* serially. | ||
* | ||
* @param {Array} edges | ||
* @param {Function} iterator | ||
* @param {Function} onError or onComplete callback | ||
* @name dagSeries | ||
* @api public | ||
*/ | ||
exports.dagSeries = function (edges, iterator, cb) { | ||
cb = cb || noop; | ||
var sorted = tsort(edges); | ||
if (sorted.error) return cb(sorted.error); | ||
exports.forEachSeries(sorted.path, function (edge, next) { | ||
iterator(edge, next); | ||
}, cb); | ||
}; | ||
/*! | ||
* tsort (edges) | ||
* | ||
* Topological sort utility. | ||
* | ||
* @param {Array} edges | ||
* @returns {Object} | ||
* @api private | ||
*/ | ||
function tsort (edges) { | ||
var nodes = {} | ||
, sorted = [] | ||
, visited = {}; | ||
// node constructor | ||
function N (id) { | ||
this.id = id; | ||
this.notify = []; | ||
this.waitfor = []; | ||
} | ||
// parse edges into nodes | ||
edges.forEach(function (v) { | ||
var from = v[0] | ||
, to = v[1]; | ||
if (!nodes[from]) nodes[from] = new N(from); | ||
if (!nodes[to]) nodes[to] = new N(to); | ||
if (!~nodes[to].waitfor.indexOf(from)) | ||
nodes[to].waitfor.push(from); | ||
if (!~nodes[from].notify.indexOf(to)) | ||
nodes[from].notify.push(to); | ||
}); | ||
// recursively visit nodes | ||
function doVisit (idstr, ancestors) { | ||
var node = nodes[idstr] | ||
, id = node.id; | ||
if (visited[idstr]) return; | ||
if (!Array.isArray(ancestors)) | ||
ancestors = []; | ||
ancestors.push(id); | ||
visited[idstr] = true; | ||
// deep recursive checking | ||
node.notify.forEach(function (afterId) { | ||
if (ancestors.indexOf(afterId) >= 0) | ||
throw new Error(id + ' can not come before ' + afterId); | ||
var aid = afterId.toString() | ||
, anc = ancestors.map(function (v) { return v }); | ||
doVisit(aid, anc); | ||
}); | ||
sorted.unshift(id); | ||
} | ||
function doFilter (s) { | ||
return null !== s | ||
&& 'undefined' !== typeof s; | ||
} | ||
// actually do our recursion | ||
// TODO: without try catch | ||
try { Object.keys(nodes).forEach(doVisit); } | ||
catch (ex) { return { error: ex } }; | ||
// order our notify object | ||
var critical = sorted.filter(doFilter) | ||
, graph = []; | ||
critical.forEach(function (v) { | ||
var n = nodes[v]; | ||
graph.push(n); | ||
}); | ||
return { path: critical, graph: graph }; | ||
}; | ||
return module.exports; | ||
}); |
@@ -7,2 +7,2 @@ /*! | ||
!function(a,b){typeof module!="undefined"?module.exports=b():typeof define=="function"&&typeof define.amd=="object"?define(b):this[a]=b()}("breeze",function(){function e(a){return a.length===0?null:a.length===1?a[0]:a}function f(a){if(Array.isArray(a)){var b=[];for(var c=0,d=a.length;c<d;c++)b.push(c);return b}return Object.keys(a)}function g(a,b,d){return function(b,f){var g=a[b];g(function(){var g=arguments[0];if(g)return f(g);var h=c.call(arguments,1),i=e(h);d[b]=i,f()})}}function h(a,b){this._iterator=a,this._concurrency=b||10,this._tasks=[],this._err=!1,this.workers=0}var a={},b=function(){},c=Array.prototype.slice,d=a.exports={};return d.version="0.1.0",d.nextTick="undefined"==typeof process||!process.nextTick?function(a){setTimeout(a,0)}:process.nextTick,d.forEach=function(a,c,d){d=d||b;if(!a.length)return d();var e=a.length;for(var f=0,g=a.length;f<g;f++)c(a[f],function(b){if(b)return d(b);--e||d(null)})},d.forEachSeries=function(a,c,d){function e(b){if(b==a.length)return d();c(a[b],function(c){if(c)return d(c);e(++b)})}d=d||b;if(!a.length)return d();e(0)},d.parallel=function(a,c){c=c||b;var e=f(a),h=Array.isArray(a)?Array(a.length):{},i=g(a,e,h);if(!e.length)return c();d.forEach(e,i,function(a){if(a)return c(a);c(null,h)})},d.series=function(a,c){c=c||b;var e=f(a),h=Array.isArray(a)?Array(a.length):{},i=g(a,e,h);if(!e.length)return c();d.forEachSeries(e,i,function(a){if(a)return c(a);c(null,h)})},d.queue=function(a,b){var c=new h(a,b);return c},Object.defineProperty(h.prototype,"length",{get:function(){return this._tasks.length}}),h.prototype.push=function(a,c,e){c=c||b,Array.isArray(a)||(a=[a]);var f=this._concurrency,g=this.saturated;for(var h=0,i=a.length;h<i;h++){var j=a[h];this._tasks.push({task:j,cb:c}),g&&this._tasks.length===f&&g(),e&&d.nextTick(this.process.bind(this))}},h.prototype.process=function(){var a=this,b=this._concurrency,c=this._iterator;if(this.workers<b&&this.length){if(this._err)return;var d=this._tasks.shift();this.empty&&!this.length&&this.empty(),this.workers++,c(d.task,function(){a.workers--;if(a._err)return;d.cb&&d.cb.apply(d,arguments);var c=arguments[0];c&&(a._err=!0);if(c&&a.onerror)return a.onerror(c);a.drain&&a.length+a.workers===0&&a.drain(),a.process()})}},h.prototype.onerror=null,h.prototype.saturated=null,h.prototype.empty=null,h.prototype.drain=null,a.exports}) | ||
!function(a,b){typeof module!="undefined"?module.exports=b():typeof define=="function"&&typeof define.amd=="object"?define(b):this[a]=b()}("breeze",function(){function e(a){return a.length===0?null:a.length===1?a[0]:a}function f(a){if(Array.isArray(a)){var b=[];for(var c=0,d=a.length;c<d;c++)b.push(c);return b}return Object.keys(a)}function g(a,b,d){return function(b,f){var g=a[b];g(function(){var g=arguments[0];if(g)return f(g);var h=c.call(arguments,1),i=e(h);d[b]=i,f()})}}function h(a,b){this._iterator=a,this._concurrency=b||10,this._tasks=[],this._err=!1,this.workers=0}function i(a){function e(a){this.id=a,this.notify=[],this.waitfor=[]}function f(a,e){var g=b[a],h=g.id;if(d[a])return;Array.isArray(e)||(e=[]),e.push(h),d[a]=!0,g.notify.forEach(function(a){if(e.indexOf(a)>=0)throw new Error(h+" can not come before "+a);var b=a.toString(),c=e.map(function(a){return a});f(b,c)}),c.unshift(h)}function g(a){return null!==a&&"undefined"!=typeof a}var b={},c=[],d={};a.forEach(function(a){var c=a[0],d=a[1];b[c]||(b[c]=new e(c)),b[d]||(b[d]=new e(d)),~b[d].waitfor.indexOf(c)||b[d].waitfor.push(c),~b[c].notify.indexOf(d)||b[c].notify.push(d)});try{Object.keys(b).forEach(f)}catch(h){return{error:h}}var i=c.filter(g),j=[];return i.forEach(function(a){var c=b[a];j.push(c)}),{path:i,graph:j}}var a={},b=function(){},c=Array.prototype.slice,d=a.exports={};return d.version="0.2.0",d.nextTick="undefined"==typeof process||!process.nextTick?function(a){setTimeout(a,0)}:process.nextTick,d.forEach=function(a,c,d){d=d||b;if(!a.length)return d();var e=a.length;for(var f=0,g=a.length;f<g;f++)c(a[f],function(b){if(b)return d(b);--e||d(null)})},d.forEachSeries=function(a,c,d){function e(b){if(b==a.length)return d();c(a[b],function(c){if(c)return d(c);e(++b)})}d=d||b;if(!a.length)return d();e(0)},d.parallel=function(a,c){c=c||b;var e=f(a),h=Array.isArray(a)?Array(a.length):{},i=g(a,e,h);if(!e.length)return c();d.forEach(e,i,function(a){if(a)return c(a);c(null,h)})},d.series=function(a,c){c=c||b;var e=f(a),h=Array.isArray(a)?Array(a.length):{},i=g(a,e,h);if(!e.length)return c();d.forEachSeries(e,i,function(a){if(a)return c(a);c(null,h)})},d.queue=function(a,b){var c=new h(a,b);return c},Object.defineProperty(h.prototype,"length",{get:function(){return this._tasks.length}}),h.prototype.push=function(a,c,e){c=c||b,Array.isArray(a)||(a=[a]);var f=this._concurrency,g=this.saturated;for(var h=0,i=a.length;h<i;h++){var j=a[h];this._tasks.push({task:j,cb:c}),g&&this._tasks.length===f&&g(),e&&d.nextTick(this.process.bind(this))}},h.prototype.process=function(){var a=this,b=this._concurrency,c=this._iterator;if(this.workers<b&&this.length){if(this._err)return;var d=this._tasks.shift();this.empty&&!this.length&&this.empty(),this.workers++,c(d.task,function(){a.workers--;if(a._err)return;d.cb&&d.cb.apply(d,arguments);var c=arguments[0];c&&(a._err=!0);if(c&&a.onerror)return a.onerror(c);a.drain&&a.length+a.workers===0&&a.drain(),a.process()})}},h.prototype.onerror=null,h.prototype.saturated=null,h.prototype.empty=null,h.prototype.drain=null,d.dag=function(a,c,d,e){function g(a){return l.filter(function(b){return b.id===a})[0]}function j(a,b){d(a,function(d){var e=g(a);e.notify.forEach(function(b){var c=g(b);c.fulfilled.push(a);var d=c.waitfor.length,e=c.fulfilled.length;d===e&&m.push(b,null,!0)}),b(d)})}function k(a){a.fulfilled=[],a.waitfor.length||m.push(a.id)}e=e||b;var f=i(a);if(f.error)return e(f.error);var l=f.graph,m=new h(j,c);l.forEach(k),m.onerror=e,m.drain=e,m.process()},d.dagSeries=function(a,c,e){e=e||b;var f=i(a);if(f.error)return e(f.error);d.forEachSeries(f.path,function(a,b){c(a,b)},e)},a.exports}) |
0.2.0 / 2012-06-16 | ||
================== | ||
* browser build | ||
* dag and dagSeries comments | ||
* Merge branch 'feature/pdag' | ||
* add tsort algorithm and dag/dagSeries async flows + tests | ||
* fix wrong version in package | ||
0.1.0 / 2012-06-14 | ||
@@ -3,0 +12,0 @@ ================== |
@@ -24,3 +24,3 @@ /*! | ||
exports.version = '0.1.0'; | ||
exports.version = '0.2.0'; | ||
@@ -429,1 +429,196 @@ /** | ||
Queue.prototype.drain = null; | ||
/** | ||
* .dag (edges, concurrency, iterator[, done]) | ||
* | ||
* DAG, directed-acyclic-graph, is a graph of nodes in | ||
* which there are no cyclic references, and therefor has | ||
* a specific starting and ending point. The `dag` async | ||
* method will take an array of edges and resolve a best | ||
* fit path of execution. It will then iterate over each | ||
* edge in parallel up to a set amount of threads (concurrency). | ||
* Furthermore, an edge will not begin it's processing until | ||
* all of its dependancies have indicated successful execution. | ||
* | ||
* A set of edges is defined as an array, with each element being | ||
* an array of x, y pairs, where `x` must complete before `y` | ||
* can begin. | ||
* | ||
* var edges = [ | ||
* [ 'a', 'b' ] | ||
* , [ 'a', 'c' ] | ||
* , [ 'd', 'e' ] | ||
* , [ 'b', 'd' ] | ||
* ]; | ||
* | ||
* With the above edges, we expect `a` to start processing. Upon | ||
* completion, `b` and `c` will start. Upon `b` completion, `d` | ||
* will execute, then `e`. | ||
* | ||
* If there are cyclical references in the set of edges, the `done` | ||
* callback will be immediately called with an error indicating | ||
* the problem. | ||
* | ||
* breeze.dag(edges, 2, function (e, next) { | ||
* setTimeout(function () { | ||
* next(); // or next(err); | ||
* }, 1000); | ||
* }, function (err) { | ||
* // our done callback | ||
* }); | ||
* | ||
* As with `queue`, if an error occurs the `done` callback will | ||
* be executed immediately. No more items will begin processing, | ||
* but items that have already started will run to completion. | ||
* | ||
* @param {Array} edges | ||
* @param {Number} concurrency | ||
* @param {Function} iterator | ||
* @param {Function} onError or onComplete | ||
* @name dag | ||
* @api public | ||
*/ | ||
exports.dag = function (edges, concurrency, iterator, cb) { | ||
cb = cb || noop; | ||
var sorted = tsort(edges) | ||
if (sorted.error) return cb(sorted.error); | ||
// helper: get edge with id | ||
function selectEdge(id) { | ||
return graph.filter(function (e) { | ||
return e.id === id; | ||
})[0]; | ||
} | ||
// queue iterator | ||
function action (e, next) { | ||
iterator(e, function done (err) { | ||
var edge = selectEdge(e); | ||
edge.notify.forEach(function (n) { | ||
var notify = selectEdge(n); | ||
notify.fulfilled.push(e); | ||
var wl = notify.waitfor.length | ||
, fl = notify.fulfilled.length; | ||
if (wl === fl) queue.push(n, null, true); | ||
}); | ||
next(err); | ||
}); | ||
} | ||
// determine where to start | ||
function bootstrap (e) { | ||
e.fulfilled = []; | ||
if (!e.waitfor.length) queue.push(e.id); | ||
} | ||
// begin | ||
var graph = sorted.graph | ||
, queue = new Queue(action, concurrency); | ||
graph.forEach(bootstrap); | ||
queue.onerror = cb; | ||
queue.drain = cb; | ||
queue.process(); | ||
}; | ||
/** | ||
* ### .dagSeries (edges, iterator[, done]) | ||
* | ||
* Similar to `dag`, but will not use concurrency. A best | ||
* fit path of execution will be dermined and then executed | ||
* serially. | ||
* | ||
* @param {Array} edges | ||
* @param {Function} iterator | ||
* @param {Function} onError or onComplete callback | ||
* @name dagSeries | ||
* @api public | ||
*/ | ||
exports.dagSeries = function (edges, iterator, cb) { | ||
cb = cb || noop; | ||
var sorted = tsort(edges); | ||
if (sorted.error) return cb(sorted.error); | ||
exports.forEachSeries(sorted.path, function (edge, next) { | ||
iterator(edge, next); | ||
}, cb); | ||
}; | ||
/*! | ||
* tsort (edges) | ||
* | ||
* Topological sort utility. | ||
* | ||
* @param {Array} edges | ||
* @returns {Object} | ||
* @api private | ||
*/ | ||
function tsort (edges) { | ||
var nodes = {} | ||
, sorted = [] | ||
, visited = {}; | ||
// node constructor | ||
function N (id) { | ||
this.id = id; | ||
this.notify = []; | ||
this.waitfor = []; | ||
} | ||
// parse edges into nodes | ||
edges.forEach(function (v) { | ||
var from = v[0] | ||
, to = v[1]; | ||
if (!nodes[from]) nodes[from] = new N(from); | ||
if (!nodes[to]) nodes[to] = new N(to); | ||
if (!~nodes[to].waitfor.indexOf(from)) | ||
nodes[to].waitfor.push(from); | ||
if (!~nodes[from].notify.indexOf(to)) | ||
nodes[from].notify.push(to); | ||
}); | ||
// recursively visit nodes | ||
function doVisit (idstr, ancestors) { | ||
var node = nodes[idstr] | ||
, id = node.id; | ||
if (visited[idstr]) return; | ||
if (!Array.isArray(ancestors)) | ||
ancestors = []; | ||
ancestors.push(id); | ||
visited[idstr] = true; | ||
// deep recursive checking | ||
node.notify.forEach(function (afterId) { | ||
if (ancestors.indexOf(afterId) >= 0) | ||
throw new Error(id + ' can not come before ' + afterId); | ||
var aid = afterId.toString() | ||
, anc = ancestors.map(function (v) { return v }); | ||
doVisit(aid, anc); | ||
}); | ||
sorted.unshift(id); | ||
} | ||
function doFilter (s) { | ||
return null !== s | ||
&& 'undefined' !== typeof s; | ||
} | ||
// actually do our recursion | ||
// TODO: without try catch | ||
try { Object.keys(nodes).forEach(doVisit); } | ||
catch (ex) { return { error: ex } }; | ||
// order our notify object | ||
var critical = sorted.filter(doFilter) | ||
, graph = []; | ||
critical.forEach(function (v) { | ||
var n = nodes[v]; | ||
graph.push(n); | ||
}); | ||
return { path: critical, graph: graph }; | ||
}; |
@@ -5,3 +5,3 @@ { | ||
"description": "Async flow control utility.", | ||
"version": "0.1.0", | ||
"version": "0.2.0", | ||
"repository": { | ||
@@ -8,0 +8,0 @@ "type": "git", |
37276
1143