Socket
Socket
Sign inDemoInstall

keuss

Package Overview
Dependencies
Maintainers
1
Versions
76
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

keuss - npm Package Compare versions

Comparing version 1.0.5 to 1.0.6

.idea/keuss.iml

72

backends/redis-oq.js

@@ -54,3 +54,3 @@ 'use strict';

var mature = entry.mature || Queue.now ();
self._verbose ('insert: id is %s, mature is %s', id, mature);
self._verbose ('insert: id is %s, mature is %s', id, mature);

@@ -62,3 +62,3 @@ this._roq.push (id, mature.getTime (), JSON.stringify (pl), function (err, res) {

self._verbose ('insert: inserted payload %j', pl, {});
self._verbose ('insert: inserted payload %j', pl, {});
callback (null, res);

@@ -88,3 +88,3 @@ });

// TODO check res has 2, and parses as json
// TODO check res has #2, and parses as json

@@ -98,4 +98,70 @@ pl.mature = new Date (parseInt (res[1]));

}
/////////////////////////////////////////
// reserve element: call cb (err, pl) where pl has an id
reserve (callback) {
/////////////////////////////////////////
var self = this;
var delay = this._opts.reserve_delay || 120;
this._roq.reserve (delay*1000, function (err, res) {
if (err) {
return callback (err);
}
// res is [id, mature, text]
self._verbose ('reserve: obtained %j', res, {});
if (!res) {
callback (null, null);
}
else {
var pl = JSON.parse (res[2]);
// TODO check res has #2, and parses as json
pl.mature = new Date (parseInt (res[1]));
pl._id = res[0];
self._verbose ('reserve: final pl to return is %j', pl, {});
callback (null, pl);
}
});
}
/////////////////////////////////////////
// commit previous reserve, by p.id: call cb (err, true|false), true if element committed
commit (id, callback) {
/////////////////////////////////////////
var self = this;
this._roq.commit (id, function (err, res) {
if (err) {
return callback (err);
}
self._verbose ('commit: res is', res, {});
return callback (null, res != null)
});
}
/////////////////////////////////////////
// rollback previous reserve, by p.id: call cb (err, true|false), true if element rolled back
rollback (id, callback) {
/////////////////////////////////////////
var self = this;
this._roq.rollback (id, function (err, res) {
if (err) {
return callback (err);
}
self._verbose ('rollback: res is', res, {});
return callback (null, res != null)
});
}
//////////////////////////////////

@@ -102,0 +168,0 @@ // queue size including non-mature elements

2

package.json
{
"name": "keuss",
"version": "1.0.5",
"version": "1.0.6",
"keywords": ["queue", "job"],

@@ -5,0 +5,0 @@ "homepage":"https://github.com/pepmartinez/keuss",

@@ -8,2 +8,3 @@

var factory = null;

@@ -19,3 +20,3 @@

new (winston.transports.Console)({
level: 'info',
level: 'verbose',
timestamp: function() {return new Date ();},

@@ -318,2 +319,188 @@ formatter: function (options) {

it ('should do raw reserve & commit as expected', function (done){
var q = factory.queue('test_queue');
var id = null;
async.series([
function (cb) {q.push ({elem:1, pl:'twetrwte'}, cb)},
function (cb) {q.size (function (err, size) {
size.should.equal (1);
cb();
})},
function (cb) {q.next_t (function (err, res) {
res.getTime().should.be.approximately (new Date().getTime (), 500);
cb();
})},
function (cb) {q.reserve (function (err, res) {
id = res._id;
res.payload.should.eql ({elem:1, pl:'twetrwte'});
res.tries.should.equal (0);
cb ();
})},
function (cb) {q.size (function (err, size) {
size.should.equal (0);
cb();
})},
function (cb) {q.totalSize (function (err, size) {
size.should.equal (1);
cb();
})},
function (cb) {q.next_t (function (err, res) {
res.getTime().should.be.approximately (new Date().getTime () + 120000, 500);
cb();
})},
function (cb) {q.commit (id, function (err, res) {
res.should.equal (true);
cb ();
})},
function (cb) {q.size (function (err, size) {
size.should.equal (0);
cb();
})},
function (cb) {q.totalSize (function (err, size) {
size.should.equal (0);
cb();
})},
function (cb) {q.next_t (function (err, res) {
should.equal (res, null);
cb();
})}
], function(err, results) {
done();
});
});
it ('should do raw reserve & rollback as expected', function (done){
var q = factory.queue('test_queue');
var id = null;
async.series([
function (cb) {q.push ({elem:1, pl:'twetrwte'}, cb)},
function (cb) {q.size (function (err, size) {
size.should.equal (1);
cb();
})},
function (cb) {q.next_t (function (err, res) {
res.getTime().should.be.approximately (new Date().getTime (), 500);
cb();
})},
function (cb) {q.reserve (function (err, res) {
id = res._id;
res.payload.should.eql ({elem:1, pl:'twetrwte'});
res.tries.should.equal (0);
cb ();
})},
function (cb) {q.size (function (err, size) {
size.should.equal (0);
cb();
})},
function (cb) {q.totalSize (function (err, size) {
size.should.equal (1);
cb();
})},
function (cb) {q.next_t (function (err, res) {
res.getTime().should.be.approximately (new Date().getTime () + 120000, 500);
cb();
})},
function (cb) {q.rollback (id, function (err, res) {
res.should.equal (true);
cb ();
})},
function (cb) {q.size (function (err, size) {
size.should.equal (1);
cb();
})},
function (cb) {q.totalSize (function (err, size) {
size.should.equal (1);
cb();
})},
function (cb) {q.next_t (function (err, res) {
res.getTime().should.be.approximately (new Date().getTime (), 500);
cb();
})},
function (cb) {q.commit (id, function (err, res) {
res.should.equal (false);
cb ();
})},
function (cb) {q.size (function (err, size) {
size.should.equal (1);
cb();
})},
function (cb) {q.totalSize (function (err, size) {
size.should.equal (1);
cb();
})},
function (cb) {q.next_t (function (err, res) {
res.getTime().should.be.approximately (new Date().getTime (), 500);
cb();
})},
function (cb) {q.get (function (err, res) {
res._id.should.eql (id);
res.payload.should.eql ({elem:1, pl:'twetrwte'});
// res.tries.should.equal (1);
cb ();
})},
function (cb) {q.size (function (err, size) {
size.should.equal (0);
cb();
})},
function (cb) {q.totalSize (function (err, size) {
size.should.equal (0);
cb();
})},
function (cb) {q.next_t (function (err, res) {
should.equal (res, null);
cb();
})}
], function(err, results) {
done();
});
});
it ('should do get.reserve & ok as expected', function (done){
var q = factory.queue('test_queue');
var id = null;
async.series([
function (cb) {q.push ({elem:1, pl:'twetrwte'}, cb)},
function (cb) {q.pop ('c1', {reserve: true}, function (err, res) {
id = res._id;
res.payload.should.eql ({elem:1, pl:'twetrwte'});
res.tries.should.equal (0);
cb ();
})},
function (cb) {q.size (function (err, size) {
size.should.equal (0);
cb();
})},
function (cb) {q.totalSize (function (err, size) {
size.should.equal (1);
cb();
})},
function (cb) {q.next_t (function (err, res) {
res.getTime().should.be.approximately (new Date().getTime () + 120000, 500);
cb();
})},
function (cb) {q.ok (id, function (err, res) {
res.should.equal (true);
cb ();
})},
function (cb) {q.size (function (err, size) {
size.should.equal (0);
cb();
})},
function (cb) {q.totalSize (function (err, size) {
size.should.equal (0);
cb();
})},
function (cb) {q.next_t (function (err, res) {
should.equal (res, null);
cb();
})}
], function(err, results) {
done();
});
});
});

@@ -37,3 +37,3 @@ 'use strict';

local val = redis.call ('HGET', 'keuss:q:ordered_queue:hash:' .. KEYS[1], id)
-- delete from index, hash

@@ -46,3 +46,95 @@ redis.call ('ZREM', 'keuss:q:ordered_queue:index:' .. KEYS[1], id)

const _s_lua_code_reserve = `
-- qname in KEYS[1]
-- mature_mark in ARGV[1]
-- incr_t in ARGV[2]
-- get older (lower mature) id from index
local z_res = redis.call ('ZRANGE', 'keuss:q:ordered_queue:index:' .. KEYS[1], 0, 0, 'WITHSCORES')
if (z_res[1] == nil) then
return nil
end
local id = z_res[1]
local mature = z_res[2]
if (mature > ARGV[1]) then
-- head is not mature, just end
return nil
end
-- increment score
redis.call ('ZINCRBY', 'keuss:q:ordered_queue:index:' .. KEYS[1], ARGV[2], id)
-- get val by id from hash
local val = redis.call ('HGET', 'keuss:q:ordered_queue:hash:' .. KEYS[1], id)
local obj_val = cjson.decode (val)
if (not (obj_val.reserved)) then
-- mark as reserved
obj_val.reserved = true
redis.call ('HSET', 'keuss:q:ordered_queue:hash:' .. KEYS[1], id, cjson.encode (obj_val))
end
return { id, z_res[2], val }
`;
const _s_lua_code_commit = `
-- qname in KEYS[1]
-- id in ARGV[1]
local id = ARGV[1]
local val = redis.call ('HGET', 'keuss:q:ordered_queue:hash:' .. KEYS[1], id)
if (val == nil) then
return nil
end
-- check if it was reserved
local obj_val = cjson.decode (val)
if (not (obj_val.reserved)) then
-- not a reserved one
return nil
end
-- delete from index, hash
redis.call ('ZREM', 'keuss:q:ordered_queue:index:' .. KEYS[1], id)
redis.call ('HDEL', 'keuss:q:ordered_queue:hash:' .. KEYS[1], id)
return id
`;
const _s_lua_code_rollback = `
-- qname in KEYS[1]
-- id in ARGV[1]
-- new_t in ARGV[2]
local id = ARGV[1]
local val = redis.call ('HGET', 'keuss:q:ordered_queue:hash:' .. KEYS[1], id)
if (val == nil) then
return nil
end
local obj_val = cjson.decode (val)
if (not (obj_val.reserved)) then
-- not a reserved one
return nil
end
-- reset score
redis.call ('ZADD', 'keuss:q:ordered_queue:index:' .. KEYS[1], 'XX', ARGV[2], id)
-- reset val
obj_val.reserved = false
redis.call ('HSET', 'keuss:q:ordered_queue:hash:' .. KEYS[1], id, cjson.encode (obj_val))
return id
`;
class RedisOrderedQueue {

@@ -55,3 +147,5 @@ constructor (name, factory) {

//////////////////////////////////
push (id, mature, obj, done) {
//////////////////////////////////
this._rediscl.roq_push (this._name, id, mature, obj, function (err, res) {

@@ -65,3 +159,5 @@ if (err) return done (err);

//////////////////////////////////
pop (done) {
//////////////////////////////////
this._rediscl.roq_pop (this._name, new Date().getTime (), function (err, res) {

@@ -76,2 +172,35 @@ if (err) return done (err);

//////////////////////////////////
reserve (incr, done) {
//////////////////////////////////
this._rediscl.roq_reserve (this._name, new Date().getTime (), incr, function (err, res) {
if (err) return done (err);
// res is [id, mature, text]
done (null, res);
});
}
//////////////////////////////////
commit (id, done) {
//////////////////////////////////
this._rediscl.roq_commit (this._name, id, function (err, res) {
if (err) return done (err);
// res is id
done (null, res);
});
}
//////////////////////////////////
rollback (id, done) {
//////////////////////////////////
this._rediscl.roq_rollback (this._name, id, new Date().getTime (), function (err, res) {
if (err) return done (err);
// res is id
done (null, res);
});
}
//////////////////////////////////
// queue size including non-mature elements

@@ -113,11 +242,7 @@ totalSize (callback) {

this._rediscl.defineCommand('roq_push', {
numberOfKeys: 1,
lua: _s_lua_code_push
});
this._rediscl.defineCommand('roq_pop', {
numberOfKeys: 1,
lua: _s_lua_code_pop
});
this._rediscl.defineCommand('roq_push', {numberOfKeys: 1, lua: _s_lua_code_push});
this._rediscl.defineCommand('roq_pop', {numberOfKeys: 1, lua: _s_lua_code_pop});
this._rediscl.defineCommand('roq_reserve', {numberOfKeys: 1, lua: _s_lua_code_reserve});
this._rediscl.defineCommand('roq_commit', {numberOfKeys: 1, lua: _s_lua_code_commit});
this._rediscl.defineCommand('roq_rollback', {numberOfKeys: 1, lua: _s_lua_code_rollback});
}

@@ -124,0 +249,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc