Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

bull

Package Overview
Dependencies
Maintainers
1
Versions
198
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

bull - npm Package Compare versions

Comparing version 0.1.9 to 0.1.10

.editorconfig

26

lib/job.js

@@ -98,3 +98,3 @@ "use strict";

'end'].join('\n');
return this.queue.client.evalAsync(script, 1, this.lockKey(), token).then(function(result){

@@ -105,8 +105,8 @@ return result === 1;

Job.prototype.completed = function(){
return this._done('completed');
Job.prototype.moveToCompleted = function(){
return this._moveToSet('completed');
}
Job.prototype.failed = function(err){
return this._done('failed');
Job.prototype.moveToFailed = function(err){
return this._moveToSet('failed');
}

@@ -149,3 +149,3 @@

return this.queue.client.evalAsync(
script,
script,
keys.length,

@@ -167,16 +167,16 @@ keys[0],

.sismemberAsync(this.queue.toKey(list), this.jobId).then(function(isMember){
return isMember === 1;
});
return isMember === 1;
});
}
Job.prototype._done = function(list){
Job.prototype._moveToSet = function(set){
var queue = this.queue;
var activeList = queue.toKey('active');
var completedList = queue.toKey(list);
var destinationSet = queue.toKey(set);
var multi = queue.multi();
return multi
.lrem(activeList, 0, this.jobId)
.sadd(completedList, this.jobId)
.sadd(destinationSet, this.jobId)
.execAsync();

@@ -183,0 +183,0 @@ }

@@ -29,2 +29,3 @@ "use strict";

var LOCK_RENEW_TIME = 5000; // 5 seconds is the renew time.
var CLIENT_CLOSE_TIMEOUT_MS = 5000;

@@ -84,4 +85,12 @@ var Queue = function Queue(name, redisPort, redisHost, redisOptions){

Queue.prototype.close = function(){
this.client.end();
this.bclient.end();
var _this = this;
var timeoutMsg = 'Timed out while waiting for redis clients to close';
return new Promise(function(resolve, reject) {
var triggerEvent = _.after(2, resolve);
_this.client.end();
_this.bclient.end();
_this.client.stream.on('close', triggerEvent);
_this.client.stream.on('close', triggerEvent);
}).timeout(CLIENT_CLOSE_TIMEOUT_MS, timeoutMsg);
}

@@ -98,7 +107,7 @@

this.handler = handler;
this.run().catch(function(err){
console.log(err);
});
this.handler = handler;
};

@@ -141,4 +150,4 @@

var multi = this.multi();
multi.llen(this.toKey('wait'))
multi.llen(this.toKey('paused'))
multi.llen(this.toKey('wait'));
multi.llen(this.toKey('paused'));

@@ -173,14 +182,5 @@ return multi.execAsync().then(function(res){

return multi.execAsync().then(function(res){
var waiting = res[0];
var paused = res[1];
return multi.execAsync().spread(function(waiting, paused){
var jobKeys = (paused.concat(waiting)).map(_this.toKey, _this);
var jobKeys = _.map(waiting, function(jobId){
return _this.toKey(jobId);
});
jobKeys = jobKeys.concat(_.map(paused, function(jobId){
return _this.toKey(jobId);
}));
if(jobKeys.length){

@@ -213,11 +213,7 @@ var multi = _this.multi();

if(_this.processing){
_this.once('completed', function(){
_this.emit('paused');
resolve();
});
_this.once('completed', resolve);
}else{
_this.emit('paused');
resolve();
}
});
}).then(this.emit.bind(this, 'paused'));

@@ -242,6 +238,3 @@ return this.paused;

Queue.prototype.run = function(){
var _this = this;
return this.processStalledJobs().then(function(){
return _this.processJobs();
});
return this.processStalledJobs().then(this.processJobs.bind(this));
}

@@ -259,8 +252,8 @@

return Job.fromId(_this, jobId);
})).then(function(jobs){
var tasks = jobs.map(function(job){
return _.bind(_this.processStalledJob, _this, job);
});
return sequence(tasks);
}))
}).then(function(jobs){
var tasks = jobs.map(function(job){
return _this.processStalledJob.bind(_this, job);
});
return sequence(tasks);
});

@@ -286,9 +279,9 @@ }

return this.getNextJob().then(function(job){
return _this.processJob(job);
}).then(function(){
if(!_this.paused){
return _this.processJobs();
}
});
return this.getNextJob()
.then(this.processJob.bind(this))
.then(function(){
if(!_this.paused){
return _this.processJobs();
}
});
}

@@ -298,51 +291,41 @@

var _this = this;
var deferred = Promise.defer();
var lockRenewTimeout;
var lockRenewer = function(){
job.takeLock(_this.token, true);
job.renewLock();
lockRenewTimeout = setTimeout(lockRenewer, _this.LOCK_RENEW_TIME/2);
};
var runHandler = Promise.promisify(this.handler.bind(this));
if(!this.paused){
this.processing = true;
try{
lockRenewer();
_this.handler(job, function(err, data){
if(err){
failed(err);
}else{
completed(data);
}
});
} catch(err){
failed(err)
}
}else{
deferred.resolve();
function finishProcessing(){
clearTimeout(lockRenewTimeout);
_this.processing = false;
}
function completed(data){
var promise = job.completed();
promise.then(function(){
clearTimeout(lockRenewTimeout);
_this.processing = false;
_this.emit('completed', job, data);
function handleCompleted(data){
return job.moveToCompleted().then(function(){
_this.emit('completed', job, data);
});
deferred.resolve(promise);
}
function failed(err){
var promise = job.failed(err);
promise.then(function(){
job.releaseLock(_this.token).then(function(){
clearTimeout(lockRenewTimeout);
_this.processing = false;
_this.emit('failed', job, err);
});
});
deferred.resolve(promise);
function handleFailed(err){
var error = err.cause || err; //Handle explicit rejection
return job.moveToFailed(err)
.then(job.releaseLock.bind(job, _this.token))
.then(function(){
_this.emit('failed', job, error);
});
}
return deferred.promise;
return new Promise(function (resolve, reject) {
if(_this.paused){
return resolve();
}
_this.processing = true;
lockRenewer();
return runHandler(job)
.then(handleCompleted, handleFailed)
.then(finishProcessing)
.then(resolve, reject);
});
}

@@ -354,6 +337,4 @@

Queue.prototype.getNextJob = function(){
var _this = this;
return this.moveJob('wait', 'active').then(function(jobId){
return Job.fromId(_this, jobId);
});
var getJobFromId = Job.fromId.bind(null, this); //should this be a queue method?
return this.moveJob('wait', 'active').then(getJobFromId);
}

@@ -398,2 +379,4 @@

var _this = this;
var key = this.toKey(queueType);
var jobs;

@@ -403,8 +386,3 @@ start = _.isUndefined(start) ? 0 : start;

var key = this.toKey(queueType);
var jobs;
if(isList){
start = _.isUndefined(start) ? 0 : start;
end = _.isUndefined(end) ? -1 : end;
jobs = this.client.lrangeAsync(key, start, end);

@@ -416,7 +394,4 @@ }else{

return jobs.then(function(jobIds){
if(jobIds.length){
return Promise.all(_.map(jobIds, function(jobId){
return Job.fromId(_this, jobId);
}));
}
var jobsFromId = jobIds.map(Job.fromId.bind(null, _this));
return Promise.all(jobsFromId);
});

@@ -423,0 +398,0 @@ }

{
"name": "bull",
"version": "0.1.9",
"version": "0.1.10",
"description": "Job manager",

@@ -20,3 +20,3 @@ "main": "index.js",

"dependencies": {
"bluebird": "^2.3.0",
"bluebird": "~2.3.0",
"lodash": "~2.2.1",

@@ -23,0 +23,0 @@ "node-uuid": "~1.4.1",

@@ -17,2 +17,3 @@ Bull Job Manager

[![BuildStatus](https://secure.travis-ci.org/OptimalBits/bull.png?branch=master)](http://travis-ci.org/OptimalBits/bull)
[![NPM version](https://badge.fury.io/js/bull.svg)](http://badge.fury.io/js/bull)

@@ -19,0 +20,0 @@ Follow [manast](http://twitter.com/manast) for news and updates regarding this library.

@@ -8,3 +8,3 @@ var Job = require('../lib/job');

var queue;
before(function(done){

@@ -27,9 +27,9 @@ queue = new Queue('test', 6379, '127.0.0.1');

expect(job).to.have.property('data');
expect(job.data.foo).to.be.equal('bar');
Job.fromId(queue, job.jobId).then(function(storedJob){
expect(storedJob).to.have.property('jobId');
expect(storedJob).to.have.property('data');
expect(storedJob.data.foo).to.be.equal('bar');

@@ -46,3 +46,3 @@ done();

});
it('remove', function(done){

@@ -52,5 +52,5 @@ Job.create(queue, 1, {foo: 'bar'}).then(function(job){

expect(job).to.have.property('data');
expect(job.data.foo).to.be.equal('bar');
job.remove().then(function(){

@@ -70,4 +70,4 @@ Job.fromId(queue, job.jobId).then(function(storedJob){

})
describe('Locking', function(){

@@ -78,3 +78,3 @@ it('take a lock', function(done){

expect(job).to.have.property('data');
return job.takeLock('123').then(function(lockTaken){

@@ -88,3 +88,3 @@ expect(lockTaken).to.be(true);

});
it('take an already taken lock', function(done){

@@ -94,3 +94,3 @@ Job.create(queue, 2, {foo: 'bar'}).then(function(job){

expect(job).to.have.property('data');
return job.takeLock('123').then(function(lockTaken){

@@ -108,3 +108,3 @@ expect(lockTaken).to.be(true);

});
it('renew a taken lock', function(done){

@@ -114,3 +114,3 @@ Job.create(queue, 3, {foo: 'bar'}).then(function(job){

expect(job).to.have.property('data');
return job.takeLock('123').then(function(lockTaken){

@@ -128,3 +128,3 @@ expect(lockTaken).to.be(true);

});
it('release a lock', function(done){

@@ -134,3 +134,3 @@ Job.create(queue, 4, {foo: 'bar'}).then(function(job){

expect(job).to.have.property('data');
return job.takeLock('123').then(function(lockTaken){

@@ -153,4 +153,4 @@ expect(lockTaken).to.be(true);

})
it('report progress', function(done){

@@ -175,4 +175,4 @@ Job.create(queue, 2, {foo: 'bar'}).then(function(job){

});
it('completed', function(done){
it('moveToCompleted', function(done){
Job.create(queue, 3, {foo: 'bar'}).then(function(job){

@@ -182,3 +182,3 @@ return job.isCompleted().then(function(isCompleted){

}).then(function(){
return job.completed();
return job.moveToCompleted();
}).then(function(){

@@ -194,4 +194,4 @@ return job.isCompleted().then(function(isCompleted){

});
it('failed', function(done){
it('moveToFailed', function(done){
Job.create(queue, 4, {foo: 'bar'}).then(function(job){

@@ -201,3 +201,3 @@ return job.isFailed().then(function(isFailed){

}).then(function(){
return job.failed(Error("test error"));
return job.moveToFailed(Error("test error"));
}).then(function(){

@@ -212,5 +212,5 @@ return job.isFailed().then(function(isFailed){

});
});
});

@@ -8,16 +8,31 @@ var Job = require('../lib/job');

function buildQueue() {
return new Queue(STD_QUEUE_NAME, 6379, '127.0.0.1');
}
function cleanupQueue(queue, done){
queue.empty()
.then(queue.close.bind(queue))
.finally(done)
}
describe('Queue', function(){
var queue;
beforeEach(function(done){
queue = Queue(STD_QUEUE_NAME, 6379, '127.0.0.1');
done();
afterEach(function(done){
if(queue){
cleanupQueue(queue, done);
queue = undefined;
} else {
done();
}
});
afterEach(function(done){
queue.empty().then(function(){
queue.close();
done();
describe('.close', function () {
it('should return a promise', function (done) {
var testQueue = new Queue('test');
var closePromise = testQueue.close().finally(done);
expect(closePromise).to.be.a(Promise);
});
})
});

@@ -108,2 +123,3 @@ it('create a queue with standard redis opts', function(done){

it('process a job', function(done){
queue = buildQueue();
queue.process(function(job, jobDone){

@@ -124,2 +140,3 @@ expect(job.data.foo).to.be.equal('bar')

it('process a job that updates progress', function(done){
queue = buildQueue();
queue.process(function(job, jobDone){

@@ -146,2 +163,3 @@ expect(job.data.foo).to.be.equal('bar')

it('process a job that returns data in the process handler', function(done){
queue = buildQueue();
queue.process(function(job, jobDone){

@@ -270,34 +288,28 @@ expect(job.data.foo).to.be.equal('bar')

it('does not process a job that is being processed when a new queue starts', function(done){
var jobId;
queue.add({foo: 'bar'}).then(function(job){
jobId = job.jobId;
});
var err = null;
var anotherQueue;
queue.process(function(job, jobDone){
expect(job.data.foo).to.be.equal('bar')
queue = buildQueue();
if(jobId !== job.jobId){
done(Error("Missmatch job ids"));
}
queue.add({foo: 'bar'}).then(function(addedJob){
queue.process(function(job, jobDone){
expect(job.data.foo).to.be.equal('bar')
setTimeout(function(){
jobDone();
}, 100);
});
if(addedJob.jobId !== job.jobId){
err = new Error('Processed job id does not match that of added job');
}
queue.on('completed', function(job){
anotherQueue.close();
done();
});
setTimeout(jobDone, 100);
});
var anotherQueue = Queue(STD_QUEUE_NAME, 6379, '127.0.0.1');
setTimeout(function(){
anotherQueue = Queue(STD_QUEUE_NAME, 6379, '127.0.0.1');
anotherQueue.process(function(job, jobDone){
if(job.jobId === jobId){
done(Error("SHOULD NOT PROCESS"));
}
err = new Error('The second queue should not have received a job to process');
jobDone();
});
}, 50);
queue.on('completed', function(){
cleanupQueue(anotherQueue, done.bind(null, err));
});
});
});

@@ -309,6 +321,8 @@

var jobError = Error("Job Failed");
queue = buildQueue();
queue.process(function(job, jobDone){
expect(job.data.foo).to.be.equal('bar')
jobDone(jobError);
})
});

@@ -332,2 +346,5 @@ queue.add({foo: 'bar'}).then(function(job){

var jobError = new Error("Job Failed");
queue = buildQueue();
queue.process(function(job, jobDone){

@@ -379,2 +396,4 @@ expect(job.data.foo).to.be.equal('bar')

queue = buildQueue();
queue.process(function(job, jobDone){

@@ -398,2 +417,4 @@ expect(job.data.num).to.be.equal(counter);

queue = buildQueue();
for(var i=1; i<=maxJobs; i++){

@@ -420,2 +441,4 @@ added.push(queue.add({foo: 'bar', num: i}));

queue = buildQueue();
queue.process(function(job, jobDone){

@@ -446,2 +469,4 @@ expect(ispaused).to.be(false);

queue = buildQueue();
queue.process(function(job, jobDone){

@@ -513,2 +538,3 @@ expect(ispaused).to.be(false);

it('should get waitting jobs', function(done){
queue = buildQueue();
Promise.join(queue.add({foo: 'bar'}), queue.add({baz: 'qux'})).then(function(){

@@ -528,2 +554,3 @@ queue.getWaiting().then(function(jobs){

queue = buildQueue();
queue.process(function(job, jobDone){

@@ -545,2 +572,3 @@ queue.getActive().then(function(jobs){

queue = buildQueue();
queue.add(data).then(function(job) {

@@ -558,2 +586,3 @@ queue.getJob(job.jobId).then(function(returnedJob) {

queue = buildQueue();
queue.process(function(job, jobDone){

@@ -583,2 +612,4 @@ jobDone();

queue = buildQueue();
queue.process(function(job, jobDone){

@@ -603,5 +634,3 @@ jobDone(Error("Forced error"));

});
});

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc