Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

query-swarm

Package Overview
Dependencies
Maintainers
4
Versions
10
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

query-swarm - npm Package Compare versions

Comparing version 0.2.0 to 0.3.0

4

lib/QuerySwarm.js

@@ -56,2 +56,4 @@ 'use strict';

lockTimeout: typeof options.lockTimeout === 'number' ? options.lockTimeout : 20000,
// the max number of attempts to process a task
maxProcessingRetries: typeof options.maxProcessingRetries === 'number' ? options.maxProcessingRetries : 0,
// maximum concurrent workers per process

@@ -138,3 +140,3 @@ get concurrency() { return concurrency; },

// acquire the distributed throttle
redis.set(self.id + ':throttle', null, 'PX', self.options.throttle, 'NX', function(err, throttle) {
self.redlock.lock(self.id + ':throttle', self.options.throttle, function(err, throttle) {
if(err || !throttle) {

@@ -141,0 +143,0 @@ lock.unlock();

@@ -23,2 +23,11 @@ 'use strict';

function RequeueError(task) {
this.name = 'RequeueError';
this.message = 'Unable to requeue task. Retrying.';
this.task = task;
}
util.inherits(RequeueError, Error);
module.exports = function(redis) {

@@ -33,2 +42,3 @@

// null || setTimeoutObject from waiting job
this.immediate = null;
this.timeout = null;

@@ -56,6 +66,7 @@ }

// if we have a timout set, just clear it and return
if(self.timeout !== null){
clearImmediate(self.timeout);
// if we have a timeout or immediate set, just clear it and return
if(self.timeout !== null || self.immediate !== null){
clearImmediate(self.immediate);
clearTimeout(self.timeout);
self.immediate = null;
self.timeout = null;

@@ -86,4 +97,5 @@ self.active = false;

// we're done with the timeout, so remove it
clearImmediate(self.timeout);
clearImmediate(self.immediate);
clearTimeout(self.timeout);
self.immediate = null;
self.timeout = null;

@@ -101,3 +113,3 @@

return setImmediate(self.next.bind(self), method, false);
return (self.immediate = setImmediate(self.next.bind(self), method, false));
};

@@ -133,6 +145,8 @@

// run the user-provided worker
self.master.worker(JSON.parse(task), function(err, result){
self.master.worker(JSON.parse(task), function(err, result, forceDeadletter){
if(err) {
self.master.emit('error', err, 'error running the user-proviced worker');
return self.deadletter(task);
self.master.emit('error', err, 'error running the user-provided worker');
// checking maxProcessingRetries here prevents an unnecessary zsore in the requeue method if retries are not enabled
return self.master.options.maxProcessingRetries && !forceDeadletter ? self.requeue(task) : self.deadletter(task);
}

@@ -158,2 +172,5 @@

// remove from retry list
.zrem(self.master.id + ':retries', task)
.exec(function(err){

@@ -166,2 +183,4 @@ if(err){

self.master.emit('error', new DeadletterError(task));
// save a reference to this timer somewhere so it can be cleared on stop?
setTimeout(self.deadletter.bind(self), self.master.options.retryDelay, task);

@@ -179,11 +198,54 @@ }

// remove from processing list
redis.lrem(self.master.id + ':processing', 1, task, function(err){
if(err){
self.master.emit('error', err, 'error acknowledging task');
self.master.emit('error', new AcknowledgeError(task));
setTimeout(self.acknowledge.bind(self), self.master.options.retryDelay, task);
redis.multi()
// remove from processing list
.lrem(self.master.id + ':processing', 1, task)
// remove from retry list
.zrem(self.master.id + ':retries', task)
.exec(function(err){
if(err){
self.master.emit('error', err, 'error acknowledging task');
self.master.emit('error', new AcknowledgeError(task));
// save a reference to this timer somewhere so it can be cleared on stop?
setTimeout(self.acknowledge.bind(self), self.master.options.retryDelay, task);
}
self.master.emit('acknowledge', task);
return self.next('consume');
});
};
Worker.prototype.requeue = function(task){
var self = this;
redis.zscore(self.master.id + ':retries', task, function(err, score) {
if (err) {
self.master.emit('error', err, 'error requeueing task');
self.master.emit('error', new RequeueError(task));
// save a reference to this timer somewhere so it can be cleared on stop?
return setTimeout(self.requeue.bind(self), self.master.options.retryDelay, task);
}
self.master.emit('acknowledge', task);
return self.next('consume');
if (score >= self.master.options.maxProcessingRetries) {
return self.deadletter(task);
}
// move from processing list back to queue and increment retry score
redis.multi()
.lrem(self.master.id + ':processing', 1, task)
.lpush(self.master.id + ':queue', task)
.zincrby(self.master.id + ':retries', 1, task)
.exec(function(err, replies) {
if(err){
self.master.emit('error', err, 'error requeueing task');
self.master.emit('error', new RequeueError(task));
// save a reference to this timer somewhere so it can be cleared on stop?
return setTimeout(self.requeue.bind(self), self.master.options.retryDelay, task);
}
self.master.emit('requeue', task);
return self.next('consume');
});
});

@@ -190,0 +252,0 @@ };

{
"name": "query-swarm",
"version": "0.2.0",
"version": "0.3.0",
"description": "Safely distribute query-driven tasks over a swarm of parallel functions on a single process or across multiple processes/machines.",

@@ -5,0 +5,0 @@ "keywords": "schedule, queue, task, job, worker, redis, distributed, async",

@@ -75,1 +75,6 @@ Query Swarm [![Build Status](https://travis-ci.org/the-control-group/query-swarm.svg?branch=master)](https://travis-ci.org/the-control-group/query-swarm)

- acknowledge: task
- requeue: task
Caveats
-------
Processing retry functionality via the `maxProcessingRetries` option, assumes your tasks are all unique at any given point in time.

@@ -21,2 +21,14 @@ 'use strict';

after(function(done) {
redis.del('QuerySwarm:test:cursor', done);
});
before(function(done) {
redis.scan('0',function(err, results) {
if (err) return done(err);
if (results[1].length !== 0) return done(new Error('Redis is not empty'));
done();
})
})
describe('Start/Stop', function(){

@@ -203,2 +215,109 @@ var q = 0;

describe('Requeue', function(){
it('should requeue up to maxProcessingRetries', function(done) {
var r = 0;
var d = 0;
var e = 0;
var q = 0;
var w = 0;
var tasks = [1,1];
var expectedWorkerRuns = tasks.length * 2;
var expectedRequeues = tasks.length;
var expectedDeadletters = tasks.length;
var expectedErrors = tasks.length * 2;
var swarm = new QuerySwarm(
'QuerySwarm:test:'+this.test.fullTitle(),
function(cursor, callback) {
q++;
callback(null, cursor, tasks.splice(0,1));
},
function(task, callback) {
w++;
callback(new Error('test'))
},
{
throttle: 10,
threshold: 4,
retryDelay: 50,
lockTimeout: 200,
concurrency: 2,
maxProcessingRetries: 1,
}
);
swarm.on('error', function(){
e++;
});
swarm.on('requeue', function(task) {
r++;
assert.equal(task, 1);
});
swarm.on('deadletter', function(task) {
d++;
assert.equal(task, 1);
})
swarm.start();
setTimeout(function(){
swarm.destroy(function(){
assert.equal(w, expectedWorkerRuns);
assert.equal(r, expectedRequeues);
assert.equal(d, expectedDeadletters);
assert.equal(e, expectedErrors);
done();
});
},500)
});
it('should requeue up to maxProcessingRetries unless the worker specifies force_deadletter', function(done) {
var r = 0;
var d = 0;
var e = 0;
var q = 0;
var w = 0;
var tasks = [1,1];
var expectedWorkerRuns = tasks.length;
var expectedRequeues = 0;
var expectedDeadletters = tasks.length;
var expectedErrors = tasks.length;
var swarm = new QuerySwarm(
'QuerySwarm:test:'+this.test.fullTitle(),
function(cursor, callback) {
q++;
callback(null, cursor, tasks.splice(0,1));
},
function(task, callback) {
w++;
callback(new Error('test'), null, true)
},
{
throttle: 10,
threshold: 4,
retryDelay: 50,
lockTimeout: 200,
concurrency: 2,
maxProcessingRetries: 1,
}
);
swarm.on('error', function(){
e++;
});
swarm.on('requeue', function(task) {
r++;
assert.equal(task, 1);
});
swarm.on('deadletter', function(task) {
d++;
assert.equal(task, 1);
})
swarm.start();
setTimeout(function(){
swarm.destroy(function(){
assert.equal(w, expectedWorkerRuns);
assert.equal(r, expectedRequeues);
assert.equal(d, expectedDeadletters);
assert.equal(e, expectedErrors);
done();
});
},500)
});
});
describe('Errors', function(){

@@ -205,0 +324,0 @@

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc