task-queue
Advanced tools
Comparing version 0.0.9 to 0.1.1
60
index.js
@@ -5,3 +5,4 @@ /** | ||
var Buffer = require('CBuffer'); | ||
var Buffer = require('cbuffer-resizable'); | ||
var Heap = require('binaryheap-resizable'); | ||
var objdefined = require('objdefined'); | ||
@@ -15,18 +16,23 @@ var extend = require('extend'); | ||
} | ||
this._opts = options; | ||
this._opts = options || {}; | ||
if(!objdefined(this._opts.capacity)) throw new Error("Buffer capacity must be passed to task-queue."); | ||
this._array = new Buffer(this._opts.capacity); | ||
this._running = objdefined(this._opts.start, false); | ||
this._opts.concurrency = objdefined(this._opts.concurrency, 1); | ||
this._initArray(); | ||
}; | ||
this._exec = function() { | ||
q.prototype = { | ||
constructor: q, | ||
_initArray: function(){ | ||
this._array = new Buffer(this._opts.capacity); | ||
}, | ||
_exec: function() { | ||
if (this._running) { | ||
var i, actual_concurrency = this._opts.concurrency > this._array.size ? this._array.size : this._opts.concurrency; | ||
var i, actual_concurrency = this._opts.concurrency > this.size() ? this.size() : this._opts.concurrency; | ||
for (i = 0; i < actual_concurrency; i++){ | ||
var deq = this._array.shift(); | ||
var deq = this.dequeue(); | ||
if (deq) { | ||
setImmediate(function () { | ||
deq.method.apply(objdefined(deq.context, null), | ||
objdefined(deq.args, null)); | ||
objdefined(deq.args, null)); | ||
actual_concurrency--; | ||
@@ -38,7 +44,3 @@ if (actual_concurrency == 0) this._exec(); | ||
} | ||
}; | ||
}; | ||
q.prototype = { | ||
constructor: q, | ||
}, | ||
size: function(){ | ||
@@ -48,5 +50,2 @@ return this._array.size; | ||
enqueue: function(fn, opts){ // supports fn(args..) arguments in opts.args = [args..] | ||
if(this._array.isFull()){ | ||
return false; | ||
} | ||
var task = objdefined(opts, {}); | ||
@@ -59,4 +58,4 @@ task.method = fn; | ||
dequeue: function(){ | ||
if(this._array.size > 0) | ||
return this._array.shift(); | ||
if(this.size() > 0) | ||
return this._array.pop(); | ||
return null; | ||
@@ -82,2 +81,5 @@ }, | ||
extend(this._opts, opts); | ||
}, | ||
toArray: function(){ | ||
return this._array.toArray(); | ||
} | ||
@@ -90,7 +92,7 @@ }; | ||
} | ||
options = options || {}; | ||
options.comparator = options.comparator || function(a, b){ // Array.sort() comparator | ||
return a.priority > b.priority; // Higher priority comes first | ||
}; | ||
q.prototype.constructor.call(this, options); | ||
this._comparator = null; | ||
this.comparator(function(a, b){ // Array.sort() comparator | ||
return b.priority - a.priority; // Higher priority comes first | ||
}); | ||
}; | ||
@@ -103,9 +105,4 @@ | ||
p.prototype.comparator = function(fn){ | ||
if(!objdefined(fn)) return this._comparator; | ||
this._comparator = fn; | ||
this._safe_comparator = function(a, b){ | ||
if(!objdefined(a)) return 1; | ||
if(!objdefined(b)) return -1; | ||
return fn(a, b); | ||
}; | ||
if(!objdefined(fn)) return this._opts.comparator; | ||
this._opts.comparator = fn; | ||
}; | ||
@@ -116,3 +113,2 @@ | ||
if(size = q.prototype.enqueue.call(this, fn, objdefined(opts, {priority: 1}))){ | ||
this._array.sort(this._safe_comparator); | ||
return size; | ||
@@ -123,2 +119,6 @@ } | ||
p.prototype._initArray = function(){ | ||
this._array = new Heap(this._opts.capacity, this._opts.comparator); | ||
}; | ||
module.exports = { | ||
@@ -125,0 +125,0 @@ Queue: q, |
{ | ||
"author" : "Roberto Sales <robertosalesc@dcc.ufba.br>", | ||
"name": "task-queue", | ||
"version": "0.0.9", | ||
"version": "0.1.1", | ||
"description": "async single worker tasks queue that supports concurrency, priority and provide simple interface for passing task arguments", | ||
@@ -16,3 +16,4 @@ "keywords": ["task", "queue", "worker", "concurrency", "async", "priority"], | ||
{ | ||
"CBuffer": "~0.1.5", | ||
"cbuffer-resizable": "~0.0.4", | ||
"binaryheap-resizable": "~0.0.6", | ||
"objdefined": "~0.0.1", | ||
@@ -19,0 +20,0 @@ "extend": "*" |
19
test.js
@@ -6,5 +6,5 @@ /** | ||
var tq = require('./index.js'); | ||
/* | ||
var q = tq.Queue({capacity: 3}); | ||
var q = tq.Queue({capacity: 64}); | ||
q.enqueue(function(){}); | ||
@@ -16,9 +16,16 @@ q.enqueue(function(){}); | ||
console.log(q._array); | ||
*/ | ||
var p = tq.PriorityQueue({capacity: 64}); | ||
var p = tq.PriorityQueue({capacity: 6, comparator: function(a,b){ | ||
return a.priority > b.priority}}); | ||
p.enqueue(function(){}); | ||
p.enqueue(function(){}, {priority: 0}); | ||
p.enqueue(function(){}, {priority: 5}); | ||
p.enqueue(function(){}, {priority: 8}); | ||
p.enqueue(function(){}, {priority: 4}); | ||
p.enqueue(function(){}, {priority: 11}); | ||
p.enqueue(function(){}, {priority: 3}); | ||
console.log(p._array); | ||
console.log(p.dequeue()); | ||
console.log(p.dequeue()); | ||
console.log(p.toArray()); |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
26188
15
127
4
+ Addedbinaryheap-resizable@~0.0.6
+ Addedcbuffer-resizable@~0.0.4
+ Addedbinaryheap-resizable@0.0.8(transitive)
+ Addedpeek@0.0.2(transitive)
- RemovedCBuffer@~0.1.5
- RemovedCBuffer@0.1.5(transitive)