Comparing version 1.0.5 to 1.0.6
@@ -54,3 +54,3 @@ 'use strict'; | ||
var mature = entry.mature || Queue.now (); | ||
self._verbose ('insert: id is %s, mature is %s', id, mature); | ||
self._verbose ('insert: id is %s, mature is %s', id, mature); | ||
@@ -62,3 +62,3 @@ this._roq.push (id, mature.getTime (), JSON.stringify (pl), function (err, res) { | ||
self._verbose ('insert: inserted payload %j', pl, {}); | ||
self._verbose ('insert: inserted payload %j', pl, {}); | ||
callback (null, res); | ||
@@ -88,3 +88,3 @@ }); | ||
// TODO check res has 2, and parses as json | ||
// TODO check res has #2, and parses as json | ||
@@ -98,4 +98,70 @@ pl.mature = new Date (parseInt (res[1])); | ||
} | ||
///////////////////////////////////////// | ||
// reserve element: call cb (err, pl) where pl has an id | ||
reserve (callback) { | ||
///////////////////////////////////////// | ||
var self = this; | ||
var delay = this._opts.reserve_delay || 120; | ||
this._roq.reserve (delay*1000, function (err, res) { | ||
if (err) { | ||
return callback (err); | ||
} | ||
// res is [id, mature, text] | ||
self._verbose ('reserve: obtained %j', res, {}); | ||
if (!res) { | ||
callback (null, null); | ||
} | ||
else { | ||
var pl = JSON.parse (res[2]); | ||
// TODO check res has #2, and parses as json | ||
pl.mature = new Date (parseInt (res[1])); | ||
pl._id = res[0]; | ||
self._verbose ('reserve: final pl to return is %j', pl, {}); | ||
callback (null, pl); | ||
} | ||
}); | ||
} | ||
///////////////////////////////////////// | ||
// commit previous reserve, by p.id: call cb (err, true|false), true if element committed | ||
commit (id, callback) { | ||
///////////////////////////////////////// | ||
var self = this; | ||
this._roq.commit (id, function (err, res) { | ||
if (err) { | ||
return callback (err); | ||
} | ||
self._verbose ('commit: res is', res, {}); | ||
return callback (null, res != null) | ||
}); | ||
} | ||
///////////////////////////////////////// | ||
// rollback previous reserve, by p.id: call cb (err, true|false), true if element rolled back | ||
rollback (id, callback) { | ||
///////////////////////////////////////// | ||
var self = this; | ||
this._roq.rollback (id, function (err, res) { | ||
if (err) { | ||
return callback (err); | ||
} | ||
self._verbose ('rollback: res is', res, {}); | ||
return callback (null, res != null) | ||
}); | ||
} | ||
////////////////////////////////// | ||
@@ -102,0 +168,0 @@ // queue size including non-mature elements |
{ | ||
"name": "keuss", | ||
"version": "1.0.5", | ||
"version": "1.0.6", | ||
"keywords": ["queue", "job"], | ||
@@ -5,0 +5,0 @@ "homepage":"https://github.com/pepmartinez/keuss", |
@@ -8,2 +8,3 @@ | ||
var factory = null; | ||
@@ -19,3 +20,3 @@ | ||
new (winston.transports.Console)({ | ||
level: 'info', | ||
level: 'verbose', | ||
timestamp: function() {return new Date ();}, | ||
@@ -318,2 +319,188 @@ formatter: function (options) { | ||
it ('should do raw reserve & commit as expected', function (done){ | ||
var q = factory.queue('test_queue'); | ||
var id = null; | ||
async.series([ | ||
function (cb) {q.push ({elem:1, pl:'twetrwte'}, cb)}, | ||
function (cb) {q.size (function (err, size) { | ||
size.should.equal (1); | ||
cb(); | ||
})}, | ||
function (cb) {q.next_t (function (err, res) { | ||
res.getTime().should.be.approximately (new Date().getTime (), 500); | ||
cb(); | ||
})}, | ||
function (cb) {q.reserve (function (err, res) { | ||
id = res._id; | ||
res.payload.should.eql ({elem:1, pl:'twetrwte'}); | ||
res.tries.should.equal (0); | ||
cb (); | ||
})}, | ||
function (cb) {q.size (function (err, size) { | ||
size.should.equal (0); | ||
cb(); | ||
})}, | ||
function (cb) {q.totalSize (function (err, size) { | ||
size.should.equal (1); | ||
cb(); | ||
})}, | ||
function (cb) {q.next_t (function (err, res) { | ||
res.getTime().should.be.approximately (new Date().getTime () + 120000, 500); | ||
cb(); | ||
})}, | ||
function (cb) {q.commit (id, function (err, res) { | ||
res.should.equal (true); | ||
cb (); | ||
})}, | ||
function (cb) {q.size (function (err, size) { | ||
size.should.equal (0); | ||
cb(); | ||
})}, | ||
function (cb) {q.totalSize (function (err, size) { | ||
size.should.equal (0); | ||
cb(); | ||
})}, | ||
function (cb) {q.next_t (function (err, res) { | ||
should.equal (res, null); | ||
cb(); | ||
})} | ||
], function(err, results) { | ||
done(); | ||
}); | ||
}); | ||
it ('should do raw reserve & rollback as expected', function (done){ | ||
var q = factory.queue('test_queue'); | ||
var id = null; | ||
async.series([ | ||
function (cb) {q.push ({elem:1, pl:'twetrwte'}, cb)}, | ||
function (cb) {q.size (function (err, size) { | ||
size.should.equal (1); | ||
cb(); | ||
})}, | ||
function (cb) {q.next_t (function (err, res) { | ||
res.getTime().should.be.approximately (new Date().getTime (), 500); | ||
cb(); | ||
})}, | ||
function (cb) {q.reserve (function (err, res) { | ||
id = res._id; | ||
res.payload.should.eql ({elem:1, pl:'twetrwte'}); | ||
res.tries.should.equal (0); | ||
cb (); | ||
})}, | ||
function (cb) {q.size (function (err, size) { | ||
size.should.equal (0); | ||
cb(); | ||
})}, | ||
function (cb) {q.totalSize (function (err, size) { | ||
size.should.equal (1); | ||
cb(); | ||
})}, | ||
function (cb) {q.next_t (function (err, res) { | ||
res.getTime().should.be.approximately (new Date().getTime () + 120000, 500); | ||
cb(); | ||
})}, | ||
function (cb) {q.rollback (id, function (err, res) { | ||
res.should.equal (true); | ||
cb (); | ||
})}, | ||
function (cb) {q.size (function (err, size) { | ||
size.should.equal (1); | ||
cb(); | ||
})}, | ||
function (cb) {q.totalSize (function (err, size) { | ||
size.should.equal (1); | ||
cb(); | ||
})}, | ||
function (cb) {q.next_t (function (err, res) { | ||
res.getTime().should.be.approximately (new Date().getTime (), 500); | ||
cb(); | ||
})}, | ||
function (cb) {q.commit (id, function (err, res) { | ||
res.should.equal (false); | ||
cb (); | ||
})}, | ||
function (cb) {q.size (function (err, size) { | ||
size.should.equal (1); | ||
cb(); | ||
})}, | ||
function (cb) {q.totalSize (function (err, size) { | ||
size.should.equal (1); | ||
cb(); | ||
})}, | ||
function (cb) {q.next_t (function (err, res) { | ||
res.getTime().should.be.approximately (new Date().getTime (), 500); | ||
cb(); | ||
})}, | ||
function (cb) {q.get (function (err, res) { | ||
res._id.should.eql (id); | ||
res.payload.should.eql ({elem:1, pl:'twetrwte'}); | ||
// res.tries.should.equal (1); | ||
cb (); | ||
})}, | ||
function (cb) {q.size (function (err, size) { | ||
size.should.equal (0); | ||
cb(); | ||
})}, | ||
function (cb) {q.totalSize (function (err, size) { | ||
size.should.equal (0); | ||
cb(); | ||
})}, | ||
function (cb) {q.next_t (function (err, res) { | ||
should.equal (res, null); | ||
cb(); | ||
})} | ||
], function(err, results) { | ||
done(); | ||
}); | ||
}); | ||
it ('should do get.reserve & ok as expected', function (done){ | ||
var q = factory.queue('test_queue'); | ||
var id = null; | ||
async.series([ | ||
function (cb) {q.push ({elem:1, pl:'twetrwte'}, cb)}, | ||
function (cb) {q.pop ('c1', {reserve: true}, function (err, res) { | ||
id = res._id; | ||
res.payload.should.eql ({elem:1, pl:'twetrwte'}); | ||
res.tries.should.equal (0); | ||
cb (); | ||
})}, | ||
function (cb) {q.size (function (err, size) { | ||
size.should.equal (0); | ||
cb(); | ||
})}, | ||
function (cb) {q.totalSize (function (err, size) { | ||
size.should.equal (1); | ||
cb(); | ||
})}, | ||
function (cb) {q.next_t (function (err, res) { | ||
res.getTime().should.be.approximately (new Date().getTime () + 120000, 500); | ||
cb(); | ||
})}, | ||
function (cb) {q.ok (id, function (err, res) { | ||
res.should.equal (true); | ||
cb (); | ||
})}, | ||
function (cb) {q.size (function (err, size) { | ||
size.should.equal (0); | ||
cb(); | ||
})}, | ||
function (cb) {q.totalSize (function (err, size) { | ||
size.should.equal (0); | ||
cb(); | ||
})}, | ||
function (cb) {q.next_t (function (err, res) { | ||
should.equal (res, null); | ||
cb(); | ||
})} | ||
], function(err, results) { | ||
done(); | ||
}); | ||
}); | ||
}); |
@@ -37,3 +37,3 @@ 'use strict'; | ||
local val = redis.call ('HGET', 'keuss:q:ordered_queue:hash:' .. KEYS[1], id) | ||
-- delete from index, hash | ||
@@ -46,3 +46,95 @@ redis.call ('ZREM', 'keuss:q:ordered_queue:index:' .. KEYS[1], id) | ||
const _s_lua_code_reserve = ` | ||
-- qname in KEYS[1] | ||
-- mature_mark in ARGV[1] | ||
-- incr_t in ARGV[2] | ||
-- get older (lower mature) id from index | ||
local z_res = redis.call ('ZRANGE', 'keuss:q:ordered_queue:index:' .. KEYS[1], 0, 0, 'WITHSCORES') | ||
if (z_res[1] == nil) then | ||
return nil | ||
end | ||
local id = z_res[1] | ||
local mature = z_res[2] | ||
if (mature > ARGV[1]) then | ||
-- head is not mature, just end | ||
return nil | ||
end | ||
-- increment score | ||
redis.call ('ZINCRBY', 'keuss:q:ordered_queue:index:' .. KEYS[1], ARGV[2], id) | ||
-- get val by id from hash | ||
local val = redis.call ('HGET', 'keuss:q:ordered_queue:hash:' .. KEYS[1], id) | ||
local obj_val = cjson.decode (val) | ||
if (not (obj_val.reserved)) then | ||
-- mark as reserved | ||
obj_val.reserved = true | ||
redis.call ('HSET', 'keuss:q:ordered_queue:hash:' .. KEYS[1], id, cjson.encode (obj_val)) | ||
end | ||
return { id, z_res[2], val } | ||
`; | ||
const _s_lua_code_commit = ` | ||
-- qname in KEYS[1] | ||
-- id in ARGV[1] | ||
local id = ARGV[1] | ||
local val = redis.call ('HGET', 'keuss:q:ordered_queue:hash:' .. KEYS[1], id) | ||
if (val == nil) then | ||
return nil | ||
end | ||
-- check if it was reserved | ||
local obj_val = cjson.decode (val) | ||
if (not (obj_val.reserved)) then | ||
-- not a reserved one | ||
return nil | ||
end | ||
-- delete from index, hash | ||
redis.call ('ZREM', 'keuss:q:ordered_queue:index:' .. KEYS[1], id) | ||
redis.call ('HDEL', 'keuss:q:ordered_queue:hash:' .. KEYS[1], id) | ||
return id | ||
`; | ||
const _s_lua_code_rollback = ` | ||
-- qname in KEYS[1] | ||
-- id in ARGV[1] | ||
-- new_t in ARGV[2] | ||
local id = ARGV[1] | ||
local val = redis.call ('HGET', 'keuss:q:ordered_queue:hash:' .. KEYS[1], id) | ||
if (val == nil) then | ||
return nil | ||
end | ||
local obj_val = cjson.decode (val) | ||
if (not (obj_val.reserved)) then | ||
-- not a reserved one | ||
return nil | ||
end | ||
-- reset score | ||
redis.call ('ZADD', 'keuss:q:ordered_queue:index:' .. KEYS[1], 'XX', ARGV[2], id) | ||
-- reset val | ||
obj_val.reserved = false | ||
redis.call ('HSET', 'keuss:q:ordered_queue:hash:' .. KEYS[1], id, cjson.encode (obj_val)) | ||
return id | ||
`; | ||
class RedisOrderedQueue { | ||
@@ -55,3 +147,5 @@ constructor (name, factory) { | ||
////////////////////////////////// | ||
push (id, mature, obj, done) { | ||
////////////////////////////////// | ||
this._rediscl.roq_push (this._name, id, mature, obj, function (err, res) { | ||
@@ -65,3 +159,5 @@ if (err) return done (err); | ||
////////////////////////////////// | ||
pop (done) { | ||
////////////////////////////////// | ||
this._rediscl.roq_pop (this._name, new Date().getTime (), function (err, res) { | ||
@@ -76,2 +172,35 @@ if (err) return done (err); | ||
////////////////////////////////// | ||
reserve (incr, done) { | ||
////////////////////////////////// | ||
this._rediscl.roq_reserve (this._name, new Date().getTime (), incr, function (err, res) { | ||
if (err) return done (err); | ||
// res is [id, mature, text] | ||
done (null, res); | ||
}); | ||
} | ||
////////////////////////////////// | ||
commit (id, done) { | ||
////////////////////////////////// | ||
this._rediscl.roq_commit (this._name, id, function (err, res) { | ||
if (err) return done (err); | ||
// res is id | ||
done (null, res); | ||
}); | ||
} | ||
////////////////////////////////// | ||
rollback (id, done) { | ||
////////////////////////////////// | ||
this._rediscl.roq_rollback (this._name, id, new Date().getTime (), function (err, res) { | ||
if (err) return done (err); | ||
// res is id | ||
done (null, res); | ||
}); | ||
} | ||
////////////////////////////////// | ||
// queue size including non-mature elements | ||
@@ -113,11 +242,7 @@ totalSize (callback) { | ||
this._rediscl.defineCommand('roq_push', { | ||
numberOfKeys: 1, | ||
lua: _s_lua_code_push | ||
}); | ||
this._rediscl.defineCommand('roq_pop', { | ||
numberOfKeys: 1, | ||
lua: _s_lua_code_pop | ||
}); | ||
this._rediscl.defineCommand('roq_push', {numberOfKeys: 1, lua: _s_lua_code_push}); | ||
this._rediscl.defineCommand('roq_pop', {numberOfKeys: 1, lua: _s_lua_code_pop}); | ||
this._rediscl.defineCommand('roq_reserve', {numberOfKeys: 1, lua: _s_lua_code_reserve}); | ||
this._rediscl.defineCommand('roq_commit', {numberOfKeys: 1, lua: _s_lua_code_commit}); | ||
this._rediscl.defineCommand('roq_rollback', {numberOfKeys: 1, lua: _s_lua_code_rollback}); | ||
} | ||
@@ -124,0 +249,0 @@ |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
177604
38
3433