New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

node-resque

Package Overview
Dependencies
Maintainers
2
Versions
181
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

node-resque - npm Package Compare versions

Comparing version 0.1.0 to 0.2.0

25

lib/plugins/delayQueueLock.js

@@ -20,21 +20,8 @@ // If a job with the same name, queue, and args is already in the delayed queue(s), do not enqueue it again

var self = this;
var delayedKeyMatcher = self.worker.connection.key('delayed', "*");
var found = false;
self.worker.connection.redis.keys(delayedKeyMatcher, function(err, timestamps){
var started = 0;
timestamps.forEach(function(timestamp){
started++;
self.worker.connection.redis.lrange(timestamp, 0, -1, function(err, jobs){
for(var i in jobs){
var job = JSON.parse(jobs[i]);
if(job.class == self.func && job.queue == self.queue && JSON.stringify(job.args) == JSON.stringify(self.args)){
found = true;
break;
}
}
started--;
if(started == 0){ callback(null, !found); }
});
});
if(started == 0){ callback(null, !found); }
self.worker.queueObject.scheduledAt(self.queue, self.func, self.args, function(err, timestamps){
if(timestamps.length > 0){
callback(null, false);
}else{
callback(null, true);
}
});

@@ -41,0 +28,0 @@ }

@@ -32,2 +32,10 @@ var util = require('util');

queue.prototype.encode = function(q, func, args){
return JSON.stringify({
"class": func,
queue: q,
args: args || []
});
}
queue.prototype.enqueue = function(q, func, args, callback){

@@ -41,6 +49,3 @@ var self = this;

self.connection.redis.sadd(self.connection.key('queues'), q, function(){
self.connection.redis.rpush(self.connection.key('queue', q), JSON.stringify({
"class": func,
args: args || []
}), function(){
self.connection.redis.rpush(self.connection.key('queue', q), self.encode(q, func, args), function(){
self.runPlugins('after_enqueue', func, q, job, args, function(err, toRun){

@@ -58,11 +63,9 @@ if(typeof callback == "function"){ callback(); }

var self = this;
var item = JSON.stringify({
"class": func,
queue: q,
args: args || []
});
var item = self.encode(q, func, args);
var rTimestamp = Math.round(timestamp / 1000); // assume timestamp is in ms
self.connection.redis.rpush(self.connection.key("delayed:" + rTimestamp), item, function(){
self.connection.redis.zadd(self.connection.key('delayed_queue_schedule'), rTimestamp, rTimestamp, function(){
if(typeof callback == "function"){ callback(); }
self.connection.redis.sadd(self.connection.key("timestamps:" + item), self.connection.key("delayed:" + rTimestamp), function(){
self.connection.redis.zadd(self.connection.key('delayed_queue_schedule'), rTimestamp, rTimestamp, function(){
if(typeof callback == "function"){ callback(); }
});
});

@@ -94,2 +97,49 @@ });

queue.prototype.del = function(q, func, args, count, callback){
var self = this;
if(typeof count == "function" && callback == null){
callback = count;
count = 0; // remove all enqueued items that match
}
self.connection.redis.lrem(self.connection.key('queue', q), count, self.encode(q, func, args), function(err, count){
if(typeof callback == "function"){ callback(err, count); }
});
}
queue.prototype.delDelayed = function(q, func, args, callback){
var self = this;
var search = self.encode(q, func, args);
var timestamps = self.connection.redis.smembers(self.connection.key("timestamps:" + search), function(err, members){
if(members.length == 0 ){ if(typeof callback == "function"){ callback(err, null); } }
else{
var started = 0;
var timestamps = [];
members.forEach(function(key){
started++;
self.connection.redis.lrem(key, 0, search, function(){
self.connection.redis.srem(self.connection.key("timestamps:" + search), key, function(){
timestamps.push(key.split(":")[key.split(":").length - 1]);
started--;
if(started == 0){
if(typeof callback == "function"){ callback(err, timestamps); }
}
})
})
});
}
})
}
queue.prototype.scheduledAt = function(q, func, args, callback){
var self = this;
var search = self.encode(q, func, args);
self.connection.redis.smembers(self.connection.key("timestamps:" + search), function(err, members){
var timestamps = [];
members.forEach(function(key){
timestamps.push(key.split(":")[key.split(":").length - 1]);
})
if(typeof callback == "function"){ callback(err, timestamps); }
});
}
exports.queue = queue;

@@ -108,8 +108,10 @@ // TODO: Locking like ruby does

self.connection.redis.lpop(key, function(err, job){
self.cleanupTimestamp(timestamp, function(){
if (err) {
callback(err);
} else {
callback(false, JSON.parse(job));
}
self.connection.redis.srem(self.connection.key("timestamps:" + job), key, function(){
self.cleanupTimestamp(timestamp, function(){
if (err) {
callback(err);
} else {
callback(false, JSON.parse(job));
}
});
});

@@ -116,0 +118,0 @@ });

@@ -5,3 +5,3 @@ {

"description": "an opinionated implementation of resque in node",
"version": "0.1.0",
"version": "0.2.0",
"homepage": "http://github.com/taskrabbit/node-resque",

@@ -12,3 +12,3 @@ "repository": {

},
"main": "index.js",
"main": "index.js",
"keywords": ["delayed", "queue", "resque", "redis"],

@@ -15,0 +15,0 @@ "engines": {

# node-resque
Delayed Tasks in nodejs. A very opinionated but compatible API with [resque](https://github.com/resque/resque) and [resque scheduler](https://github.com/resque/resque-scheduler)
[Find me on the NPM @ node-resque](https://npmjs.org/package/node-resque)
[![Build Status](https://secure.travis-ci.org/taskrabbit/node-resque.png?branch=master)](http://travis-ci.org/taskrabbit/node-resque)

@@ -137,3 +139,17 @@

```
## Queue Managment
Additonal methods provided on the `queue` object:
- **queue.prototype.queues** = function(callback)
- callback(error, array_of_queues)
- **queue.prototype.length** = function(q, callback)
- callback(error, number_of_elements_in_queue)
- **queue.prototype.del** = function(q, func, args, count, callback)
- callback(error, number_of_items_deleted)
- **queue.prototype.delDelayed** = function(q, func, args, callback)
- callback(error, timestamps_the_job_was_removed_from)
- **queue.prototype.scheduledAt** = function(q, func, args, callback)
- callback(error, timestamps_the_job_is_scheduled_for)
## Plugins

@@ -140,0 +156,0 @@

@@ -149,13 +149,11 @@ describe('plugins', function(){

queue.enqueueIn((10 * 1000) ,specHelper.queue, "uniqueJob", [1,2], function(){
setTimeout(function(){
queue.enqueue(specHelper.queue, "uniqueJob", [1,2], function(){
specHelper.redis.zcount(specHelper.namespace + ":delayed_queue_schedule", '-inf', '+inf', function(err, delayedLen){
queue.length(specHelper.queue, function(err, queueLen){
delayedLen.should.equal(1);
queueLen.should.equal(0);
done();
});
queue.enqueue(specHelper.queue, "uniqueJob", [1,2], function(){
specHelper.redis.zcount(specHelper.namespace + ":delayed_queue_schedule", '-inf', '+inf', function(err, delayedLen){
queue.length(specHelper.queue, function(err, queueLen){
delayedLen.should.equal(1);
queueLen.should.equal(0);
done();
});
});
}, 1001)
});
});

@@ -166,13 +164,11 @@ });

queue.enqueueIn((10 * 1000) ,specHelper.queue, "uniqueJob", [1,2], function(){
setTimeout(function(){
queue.enqueue(specHelper.queue, "uniqueJob", [3,4], function(){
specHelper.redis.zcount(specHelper.namespace + ":delayed_queue_schedule", '-inf', '+inf', function(err, delayedLen){
queue.length(specHelper.queue, function(err, queueLen){
delayedLen.should.equal(1);
queueLen.should.equal(1);
done();
});
queue.enqueue(specHelper.queue, "uniqueJob", [3,4], function(){
specHelper.redis.zcount(specHelper.namespace + ":delayed_queue_schedule", '-inf', '+inf', function(err, delayedLen){
queue.length(specHelper.queue, function(err, queueLen){
delayedLen.should.equal(1);
queueLen.should.equal(1);
done();
});
});
}, 1001)
});
});

@@ -179,0 +175,0 @@ });

@@ -9,8 +9,12 @@ describe('queue', function(){

specHelper.connect(function(){
specHelper.cleanup(function(){
done();
});
done();
});
});
beforeEach(function(done){
specHelper.cleanup(function(){
done();
});
})
after(function(done){

@@ -83,2 +87,36 @@ specHelper.cleanup(function(){

it('can find previously scheduled jobs', function(done){
queue.enqueueAt(10000, specHelper.queue, 'someJob', [1,2,3], function(){
queue.scheduledAt(specHelper.queue, 'someJob', [1,2,3], function(err, timestamps){
timestamps.length.should.equal(1);
timestamps[0].should.equal('10');
done();
});
});
});
it('can deleted an enqued job', function(done){
queue.enqueue(specHelper.queue, 'someJob', [1,2,3], function(){
queue.length(specHelper.queue, function(err, len){
len.should.equal(1);
queue.del(specHelper.queue, 'someJob', [1,2,3], function(){
queue.length(specHelper.queue, function(err, len){
len.should.equal(0);
done();
});
});
});
});
});
it('can deleted a delayed job', function(done){
queue.enqueueAt(10000, specHelper.queue, 'someJob', [1,2,3], function(){
queue.delDelayed(specHelper.queue, 'someJob', [1,2,3], function(err, timestamps){
timestamps.length.should.equal(1);
timestamps[0].should.equal('10');
done();
});
});
});
});
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc