Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

query-swarm

Package Overview
Dependencies
Maintainers
1
Versions
10
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

query-swarm - npm Package Compare versions

Comparing version 0.0.5 to 0.0.6

42

lib/QuerySwarm.js

@@ -5,2 +5,3 @@ 'use strict';

var EventEmitter = require('events').EventEmitter;
var Redlock = require('redlock');
var async = require('async');

@@ -10,3 +11,3 @@

this.name = 'UndeadLock';
this.message = 'The lock expired before the query returned';
this.message = 'The lock expired before the query returned.';
this.cursor = cursor;

@@ -53,3 +54,3 @@ this.tasks = tasks;

threshold: typeof options.threshold === 'number' ? options.threshold : 10,
// the max duration a query can run before we try again
// the duration a worker waits between between polling an empty queue
retryDelay: typeof options.retryDelay === 'number' ? options.retryDelay : 500,

@@ -74,2 +75,5 @@ // the max duration a query can run before we try again

self.options.concurrency = typeof options.concurrency === 'number' ? options.concurrency : 10;
// create instance or redlock that won't retry for a lock
self.redlock = new Redlock({ retryCount: 0 }, redis);
}

@@ -109,3 +113,3 @@

redis.del(self.id + ':lock', self.id + ':cursor', self.id + ':queue', self.id + ':processing', self.id + ':deadletter', callback);
redis.del(self.id + ':lock', self.id + ':throttle', self.id + ':cursor', self.id + ':queue', self.id + ':processing', self.id + ':deadletter', callback);
});

@@ -130,16 +134,19 @@ return self;

// acquire a redis lock
var timeout = Date.now() + self.options.lockTimeout;
redis.set(self.id + ':lock', null, 'PX', self.options.lockTimeout + 1000, 'NX', function(err, result) {
if(result === null)
self.redlock.lock(self.id + ':lock', self.options.lockTimeout, function(err, lock) {
if(err || !lock)
return;
// acquire the distributed throttle
redis.set(self.id + ':throttle', null, 'PX', self.throttle, 'NX', function(err, result) {
if(result === null)
redis.set(self.id + ':throttle', null, 'PX', self.options.throttle, 'NX', function(err, throttle) {
if(err || !throttle) {
lock.unlock();
return;
}
// get the cursor
redis.get(self.id + ':cursor', function(err, cursor) {
if(err)
if(err) {
lock.unlock();
return self.emit('error', err, 'error getting cursor');
}

@@ -151,7 +158,4 @@ cursor = JSON.parse(cursor);

if(err) {
self.emit('error', err, 'error running the user-provided query');
if(Date.now() >= timeout) return;
return redis.del(self.id + ':lock', function(err) {
if(err) self.emit('error', err, 'error releasing lock');
});
lock.unlock();
return self.emit('error', err, 'error running the user-provided query');
}

@@ -162,5 +166,7 @@

// VERY BAD: we've exceeded our lock timeout
if(Date.now() >= timeout)
if(Date.now() >= lock.expiration)
return self.master.emit('error', new OrphanedQuery(cursor, tasks));
// TODO: extend the lock if its expiration has fallen below a configurable buffer
var multi = redis.multi();

@@ -178,7 +184,7 @@

// release the lock
multi = multi.del(self.id + ':lock');
multi.exec(function(err) {
// release the lock
lock.unlock();
// TODO: this *could* be a bad place to have an error for different reasons

@@ -185,0 +191,0 @@ // we need to check which command threw and act accordingly:

{
"name": "query-swarm",
"version": "0.0.5",
"version": "0.0.6",
"description": "Safely distribute query-driven tasks over a swarm of parallel functions on a single process or across multiple processes/machines.",

@@ -31,9 +31,9 @@ "keywords": "schedule, queue, task, job, worker, redis, distributed, async",

"async": "^0.9.0",
"redis": "^0.11.0"
"redis": "^0.11.0",
"redlock": "^0.2.0"
},
"devDependencies": {
"async": "^0.9.0",
"chai": "^1.9.2",
"chai": "^3.0.0",
"mocha": "^2.0.1"
}
}

@@ -66,4 +66,10 @@ 'use strict';

setTimeout(function(){
// the swarm should query once
assert.equal(q, 1);
assert.ok(w > 9 && w < 12); // can be 10 or 11, because we have 2 workers
// and consume the first 10 jobs before issuing a stop command;
// could have processed 10 or 11 jobs, because we have 2 workers
assert.isAbove(w, 9);
assert.isBelow(w, 12);
done();

@@ -76,4 +82,10 @@ }, 500);

setTimeout(function(){
// the swarm should query one more time
assert.equal(q, 2);
assert.ok(w > 19 && w < 22); // can be 20 or 21, because we have 2 workers
// and consume the next 10 jobs before issuing a stop command;
// could have processed 20 or 21 jobs, because we have 2 workers
assert.isAbove(w, 19);
assert.isBelow(w, 22);
done();

@@ -80,0 +92,0 @@ }, 500);

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc