Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

bull

Package Overview
Dependencies
Maintainers
1
Versions
198
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

bull - npm Package Compare versions

Comparing version 0.2.7 to 0.3.0

typings/mocha/mocha.d.ts

14

a.js

@@ -19,1 +19,15 @@ "use strict";

videoQueue.add({video: 'http://example.com/video1.mov'});
/**
*
* Tasks
*
*/
queue.task('video', opts, function(input, next){
output = do_something_with_input(input);
next('postprocess', output);
});

@@ -0,1 +1,13 @@

v0.3.0
======
- added support for custom clients.
- added test support for node 0.12.
- timeout improvements.
- unit test improvements.
- added timeout to queue pop blocking call.
- removed when dependency.
[Changes](https://github.com/OptimalBits/bull/compare/v0.2.7...v0.3.0)
v0.2.7

@@ -2,0 +14,0 @@ ======

10

gulpfile.js

@@ -13,3 +13,3 @@ /*eslint-env node */

'./lib/queue.js',
'!./test/**'
'./test/**'
])

@@ -31,3 +31,9 @@ .pipe(eslint({

globals: {
'define': true
'define': true,
'describe': true,
'it': true,
'setTimeout': true,
'afterEach': true,
'beforeEach': true,
'before': true
}

@@ -34,0 +40,0 @@ }))

2

lib/job.js

@@ -146,3 +146,3 @@ /*eslint-env node */

var channel = this.queue.toKey('jobs');
var multi = this.queue.client.multi();
var multi = this.queue.multi();
var _this = this;

@@ -149,0 +149,0 @@

@@ -87,3 +87,3 @@ "use strict";

this.queues.forEach(function (queue, key) {
queue.handler = handler;
queue.setHandler(handler);
});

@@ -94,2 +94,3 @@

// TODO: Remove the polling mechanism using pub/sub.
PriorityQueue.prototype.run = function() {

@@ -176,3 +177,3 @@ var _this = this;

results.forEach(function (val) {
sum += val
sum += val;
});

@@ -214,3 +215,2 @@ return sum;

// ---------------------------------------------------------------------
PriorityQueue.prototype.getQueue = function(priority) {

@@ -217,0 +217,0 @@ if (!PriorityQueue.priorities[priority]) {

@@ -12,3 +12,2 @@ /*eslint-env node */

var uuid = require('node-uuid');
var sequence = require('when/sequence');
var semver = require('semver');

@@ -43,5 +42,2 @@

*/
Promise.promisifyAll(redis);
var MINIMUM_REDIS_VERSION = '2.8.11';

@@ -66,2 +62,12 @@ var LOCK_RENEW_TIME = 5000; // 5 seconds is the renew time.

function createClient() {
var client;
if(redisOptions !== undefined && redisOptions.createClient !== undefined){
client = redisOptions.createClient();
}else{
client = redis.createClient(redisPort, redisHost, redisOptions);
}
return Promise.promisifyAll(client);
}
redisPort = redisPort || 6379;

@@ -77,3 +83,3 @@ redisHost = redisHost || '127.0.0.1';

//
this.client = redis.createClient(redisPort, redisHost, redisOptions);
this.client = createClient();

@@ -91,3 +97,3 @@ getRedisVersion(this.client).then(function(version){

//
this.bclient = redis.createClient(redisPort, redisHost, redisOptions);
this.bclient = createClient();

@@ -97,3 +103,3 @@ //

//
this.eclient = redis.createClient(redisPort, redisHost, redisOptions);
this.eclient = createClient();

@@ -175,17 +181,6 @@ this.delayTimer = null;

Queue.prototype.process = function(concurrency, handler){
var _this = this;
if(typeof concurrency === 'function'){
handler = concurrency;
concurrency = 1;
}
if(this.handler) {
throw new Error('Cannot define a handler more than once per Queue instance');
}
this.setHandler(concurrency, handler);
this.concurrency = concurrency;
this.handler = handler;
var _this = this;
var runQueueWhenReady = function(){

@@ -206,2 +201,23 @@ _this.bclient.once('ready', _this.run.bind(_this));

Queue.prototype.setHandler = function(concurrency, handler){
if(typeof concurrency === 'function'){
handler = concurrency;
concurrency = 1;
}
if(this.handler) {
throw new Error('Cannot define a handler more than once per Queue instance');
}
this.concurrency = concurrency;
handler = handler.bind(this);
if(handler.length > 1){
this.handler = Promise.promisify(handler);
}else{
this.handler = Promise.method(handler);
}
};
/**

@@ -446,12 +462,6 @@ interface JobOptions

var _this = this;
return this.client.lrangeAsync(this.toKey('active'), 0, -1).then(function(active){
return Promise.all(active.map(function(jobId){
return Job.fromId(_this, jobId);
}));
}).then(function(jobs){
var tasks = jobs.map(function(job){
return _this.processStalledJob.bind(_this, job);
return this.client.lrangeAsync(this.toKey('active'), 0, -1).then(function(jobs){
return Promise.each(jobs, function(jobId) {
return Job.fromId(_this, jobId).then(_this.processStalledJob.bind(_this));
});
return sequence(tasks);
});

@@ -481,3 +491,5 @@ };

return this.getNextJob().then(function (job) {
return this.processStalledJobs().then(function() {
return _this.getNextJob();
}).then(function (job) {
return job.delayIfNeeded().then(function(delayed) {

@@ -503,3 +515,3 @@ return !delayed && job.takeLock(_this.token);

};
var runHandler = Promise.promisify(this.handler.bind(this));
var timeoutMs = job.opts.timeout;

@@ -531,7 +543,7 @@

this.processing++;
this.emit('active', job);
lockRenewer();
var jobPromise = runHandler(job);
var jobPromise = Promise.resolve(this.handler(job));
if(timeoutMs){

@@ -541,2 +553,4 @@ jobPromise = jobPromise.timeout(timeoutMs);

this.emit('active', job, jobPromise);
return jobPromise.then(handleCompleted, handleFailed).finally(finishProcessing);

@@ -569,3 +583,10 @@ };

}else{
return this.bclient.brpoplpushAsync(this.toKey(src), this.toKey(dst), 0);
return this.bclient.brpoplpushAsync(this.toKey(src), this.toKey(dst),
Math.floor(this.LOCK_RENEW_TIME / 1000)).then(function(jobId) {
if(jobId){
return jobId;
}else{
return Promise.reject();
}
});
}

@@ -572,0 +593,0 @@ };

{
"name": "bull",
"version": "0.2.7",
"version": "0.3.0",
"description": "Job manager",

@@ -20,15 +20,14 @@ "main": "index.js",

"dependencies": {
"bluebird": "~2.3.0",
"lodash": "~2.2.1",
"node-uuid": "~1.4.1",
"bluebird": "^2.9.27",
"lodash": "^3.9.3",
"node-uuid": "^1.4.3",
"redis": "^0.12.1",
"semver": "^4.2.0",
"when": "~2.1.1"
"semver": "^4.2.0"
},
"devDependencies": {
"expect.js": "~0.2.0",
"expect.js": "^0.3.1",
"gulp": "^3.8.11",
"gulp-eslint": "^0.7.0",
"mocha": "~1.21.4",
"sinon": "~1.12.1"
"gulp-eslint": "^0.13.2",
"mocha": "^2.2.5",
"sinon": "^1.14.1"
},

@@ -35,0 +34,0 @@ "scripts": {

@@ -109,2 +109,9 @@ Bull Job Manager

```javascript
.on('active', function(job, jobPromise){
// Job started
// You can use jobPromise.cancel() to abort this job.
})
.on('progress', function(job, progress){
// Job progress updated!
})
queue.on('completed', function(job){

@@ -116,5 +123,2 @@ // Job completed!

})
.on('progress', function(job, progress){
// Job progress updated!
})
.on('paused', function(){

@@ -177,3 +181,3 @@ // The queue has been paused

Bull can also be used for persistent messsage queues. This is a quite useful
Bull can also be used for persistent message queues. This is a quite useful
feature in some usecases. For example, you can have two servers that need to

@@ -239,2 +243,3 @@ communicate with each other. By using a queue the servers do not need to be online

* [Queue##empty](#empty)
* [Queue##close](#close)
* [Job](#job)

@@ -336,5 +341,5 @@ * [Job##remove](#remove)

Returns a promise that resolves when the queue is resumed after being paused.
The resume is global, meaning that all workers in all queue instances for
a given queue will be resumed.
Returns a promise that resolves when the queue is resumed after being paused.
The resume is global, meaning that all workers in all queue instances for
a given queue will be resumed.

@@ -380,2 +385,15 @@ Resuming a queue that is not paused does nothing.

<a name="close"/>
#### Queue##close()
Closes the underlying redis client. Use this if you are performing a graceful
shutdown.
__Arguments__
```javascript
returns {Promise} A promise that resolves when the redis client closes.
```
---------------------------------------
<a name="getJob"/>

@@ -382,0 +400,0 @@ #### Queue##getJob(jobId)

@@ -1,21 +0,17 @@

var Job = require('../lib/job');
/*eslint-env node */
/*global Promise:true */
'use strict';
//var Job = require('../lib/job');
var Queue = require('../');
var expect = require('expect.js');
//var expect = require('expect.js');
describe('Queue in a cluster', function(){
var queue;
before(function(done){
queue = Queue('test concurrent queue', 6379, '127.0.0.1');
queue = new Queue('test concurrent queue', 6379, '127.0.0.1');
done();
});
it('process several jobs in parallel', function(done){
done();
});
});
it('process several jobs in parallel');
});

@@ -0,1 +1,5 @@

/*eslint-env node */
/*global Promise:true */
'use strict';
var Job = require('../lib/job');

@@ -13,7 +17,7 @@ var Queue = require('../lib/queue');

if(keys.length){
queue.client.del(keys, function(err){
done(err);
queue.client.del(keys, function(err2){
done(err2);
});
}else{
done();
done(err);
}

@@ -34,3 +38,3 @@ });

.then(function(createdJob){
job = createdJob
job = createdJob;
});

@@ -57,24 +61,24 @@ });

describe('.remove', function () {
it('removes the job from redis', function(){
return Job.create(queue, 1, {foo: 'bar'})
.tap(function(job){
return job.remove();
})
.then(function(job){
return Job.fromId(queue, job.jobId)
})
.then(function(storedJob){
expect(storedJob).to.be(null);
});
})
it('removes the job from redis', function(){
return Job.create(queue, 1, {foo: 'bar'})
.tap(function(job){
return job.remove();
})
.then(function(job){
return Job.fromId(queue, job.jobId);
})
.then(function(storedJob){
expect(storedJob).to.be(null);
});
});
it('emits removed event', function (cb) {
queue.once('removed', function (job) {
expect(job.data.foo).to.be.equal('bar');
cb();
});
Job.create(queue, 1, {foo: 'bar'}).then(function(job){
job.remove();
});
it('emits removed event', function (cb) {
queue.once('removed', function (job) {
expect(job.data.foo).to.be.equal('bar');
cb();
});
Job.create(queue, 1, {foo: 'bar'}).then(function(job){
job.remove();
});
});
});

@@ -89,4 +93,4 @@

queue.once('failed', function (job) {
queue.once('waiting', function (job) {
expect(job.data.foo).to.be.equal('bar');
queue.once('waiting', function (job2) {
expect(job2.data.foo).to.be.equal('bar');
cb();

@@ -158,9 +162,9 @@ });

it('can set and get progress', function () {
return Job.create(queue, 2, {foo: 'bar'}).then(function(job){
return job.progress(42).then(function(){
return Job.fromId(queue, job.jobId).then(function(storedJob){
expect(storedJob.progress()).to.be(42);
});
return Job.create(queue, 2, {foo: 'bar'}).then(function(job){
return job.progress(42).then(function(){
return Job.fromId(queue, job.jobId).then(function(storedJob){
expect(storedJob.progress()).to.be(42);
});
});
});
});

@@ -170,34 +174,34 @@ });

describe('.moveToCompleted', function () {
it('marks the job as completed', function(){
return Job.create(queue, 3, {foo: 'bar'}).then(function(job){
it('marks the job as completed', function(){
return Job.create(queue, 3, {foo: 'bar'}).then(function(job){
return job.isCompleted().then(function(isCompleted){
expect(isCompleted).to.be(false);
}).then(function(){
return job.moveToCompleted();
}).then(function(){
return job.isCompleted().then(function(isCompleted){
expect(isCompleted).to.be(false);
}).then(function(){
return job.moveToCompleted();
}).then(function(){
return job.isCompleted().then(function(isCompleted){
expect(isCompleted).to.be(true);
});
expect(isCompleted).to.be(true);
});
});
});
});
});
describe('.moveToFailed', function () {
it('marks the job as failed', function(){
return Job.create(queue, 4, {foo: 'bar'}).then(function(job){
it('marks the job as failed', function(){
return Job.create(queue, 4, {foo: 'bar'}).then(function(job){
return job.isFailed().then(function(isFailed){
expect(isFailed).to.be(false);
}).then(function(){
return job.moveToFailed(new Error('test error'));
}).then(function(){
return job.isFailed().then(function(isFailed){
expect(isFailed).to.be(false);
}).then(function(){
return job.moveToFailed(Error("test error"));
}).then(function(){
return job.isFailed().then(function(isFailed){
expect(isFailed).to.be(true);
expect(job.stacktrace).not.be(null);
});
expect(isFailed).to.be(true);
expect(job.stacktrace).not.be(null);
});
});
});
});
});
});

@@ -1,10 +0,13 @@

"use strict";
/// <reference path='../typings/mocha/mocha.d.ts'/>
/*eslint-env node */
/*global Promise:true */
'use strict';
var Job = require('../lib/job');
var Queue = require('../lib/priority-queue');
var expect = require('expect.js');
var Promise = require('bluebird');
var redis = require('redis');
var sinon = require('sinon');
var _ = require('lodash');
var uuid = require('node-uuid');
var redis = require('redis');

@@ -30,3 +33,3 @@ var STD_QUEUE_NAME = 'test queue';

queue = undefined;
})
});
}

@@ -36,2 +39,11 @@ sandbox.restore();

it('allow custom clients', function(){
var clients = 0;
queue = new Queue(STD_QUEUE_NAME, {redis: {opts: {createClient: function(){
clients++;
return redis.createClient();
}}}});
expect(clients).to.be(15);
});
describe('.close', function () {

@@ -41,3 +53,3 @@ var testQueue;

beforeEach(function () {
testQueue = buildQueue('test');
testQueue = buildQueue('test');
});

@@ -53,11 +65,11 @@

it('creates a queue with dots in its name', function(){
queue = Queue('using. dots. in.name.');
queue = new Queue('using. dots. in.name.');
return queue.add({foo: 'bar'}).then(function(job){
expect(job.jobId).to.be.ok()
expect(job.data.foo).to.be('bar')
expect(job.jobId).to.be.ok();
expect(job.data.foo).to.be('bar');
})
.then(function(){
queue.process(function(job, jobDone){
expect(job.data.foo).to.be.equal('bar')
expect(job.data.foo).to.be.equal('bar');
jobDone();

@@ -77,4 +89,4 @@ });

queue.add({foo: 'bar'}).then(function(job){
expect(job.jobId).to.be.ok()
expect(job.data.foo).to.be('bar')
expect(job.jobId).to.be.ok();
expect(job.data.foo).to.be('bar');
}).catch(done);

@@ -134,3 +146,3 @@ });

Promise.all(jobs).then(function(){
return queueStalled.process(function(job) {
return queueStalled.process(function() {
// instead of completing we just close the queue to simulate a crash.

@@ -142,3 +154,4 @@ return queueStalled.close().then(function() {

});
queue2.on('completed', function (job) {
queue2.on('completed', function () {
doneAfterFour();

@@ -152,4 +165,4 @@ });

});
}).catch(done)
})
}).catch(done);
});
});

@@ -175,3 +188,3 @@

return new Promise(function(resolve, reject){
return new Promise(function(resolve){
var resolveAfterAllJobs = _.after(jobs.length, resolve);

@@ -189,9 +202,9 @@ queue.on('completed', resolveAfterAllJobs);

for(var i=0; i<NUM_QUEUES; i++){
var queue = buildQueue('test queue stalled 2');
stalledQueues.push(queue);
queue.setLockRenewTime(10);
for(var i = 0; i < NUM_QUEUES; i++){
var stalledQueue = buildQueue('test queue stalled 2');
stalledQueues.push(stalledQueue);
stalledQueue.setLockRenewTime(10);
for(var j=0; j<NUM_JOBS_PER_QUEUE; j++){
jobs.push(queue.add({job: j}));
for(var j = 0; j < NUM_JOBS_PER_QUEUE; j++){
jobs.push(stalledQueue.add({job: j}));
}

@@ -202,27 +215,29 @@ }

var processed = 0;
for(var k=0; k<stalledQueues.length; k++){
stalledQueues[k].process(function(job){
// instead of completing we just close the queue to simulate a crash.
this.close().then(function() {
processed ++;
if(processed === stalledQueues.length){
setTimeout(function(){
var queue2 = buildQueue('test queue stalled 2');
queue2.process(function(job, jobDone){
jobDone();
});
var procFn = function(){
// instead of completing we just close the queue to simulate a crash.
this.close().then(function() {
processed++;
if(processed === stalledQueues.length){
setTimeout(function(){
var queue2 = buildQueue('test queue stalled 2');
queue2.process(function(job2, jobDone){
jobDone();
});
var counter = 0;
queue2.on('completed', function(job){
counter ++;
if(counter === NUM_QUEUES * NUM_JOBS_PER_QUEUE) {
queue2.close().then(function(){
done();
});
}
});
}, 100);
}
});
var counter = 0;
queue2.on('completed', function(){
counter++;
if(counter === NUM_QUEUES * NUM_JOBS_PER_QUEUE) {
queue2.close().then(function(){
done();
});
}
});
}, 100);
}
});
};
for(var k = 0; k < stalledQueues.length; k++){
stalledQueues[k].process(procFn);
}

@@ -233,8 +248,8 @@ });

it('does not process a job that is being processed when a new queue starts', function(done){
this.timeout(5000)
this.timeout(5000);
var err = null;
var anotherQueue;
var queueName = uuid();
queue = buildQueue(queueName);
queue = buildQueue();
queue.add({foo: 'bar'}).then(function(addedJob){

@@ -247,11 +262,12 @@ queue.process(function(job, jobDone){

}
anotherQueue = buildQueue(queueName);
anotherQueue.process(function(job2, jobDone2){
err = new Error('The second queue should not have received a job to process');
jobDone2();
});
setTimeout(jobDone, 100);
});
anotherQueue = buildQueue();
anotherQueue.process(function(job, jobDone){
err = new Error('The second queue should not have received a job to process');
jobDone();
});
queue.on('completed', function(){

@@ -266,3 +282,3 @@ cleanupQueue(anotherQueue).then(done.bind(null, err));

it('process a job that fails', function(done){
var jobError = Error("Job Failed");
var jobError = new Error('Job Failed');
queue = buildQueue();

@@ -291,7 +307,7 @@

it('process a job that throws an exception', function(done){
var jobError = new Error("Job Failed");
var jobError = new Error('Job Failed');
queue = buildQueue();
queue.process(function(job, jobDone){
queue.process(function(job){
expect(job.data.foo).to.be.equal('bar');

@@ -326,7 +342,9 @@ throw jobError;

jobDone();
if(counter == maxJobs) done();
if(counter === maxJobs){
done();
}
counter++;
});
for(var i=1; i<=maxJobs; i++){
for(var i = 1; i <= maxJobs; i++){
queue.add({foo: 'bar', num: i});

@@ -337,3 +355,2 @@ }

it('count added, unprocessed jobs', function(){
var counter = 1;
var maxJobs = 100;

@@ -344,3 +361,3 @@ var added = [];

for(var i=1; i<=maxJobs; i++){
for(var i = 1; i <= maxJobs; i++){
added.push(queue.add({foo: 'bar', num: i}));

@@ -371,3 +388,5 @@ }

counter--;
if(counter === 0) done();
if(counter === 0){
done();
}
});

@@ -426,3 +445,3 @@

var currentValue = 0, first = true;
queue = Queue('test lifo');
queue = new Queue('test lifo');

@@ -446,3 +465,3 @@ queue.once('ready', function(){

queue.add({'count': 0}).then(function(){
Promise.delay(100).then(function() {
Promise.delay(500).then(function() {
queue.pause().then(function(){

@@ -457,3 +476,3 @@ // Add a series of jobs in a predictable order

});
})
});
});

@@ -463,3 +482,3 @@ });

describe("Jobs getters", function(){
describe('Jobs getters', function(){
it('should get waitting jobs', function(done){

@@ -479,4 +498,2 @@ queue = buildQueue();

it('should get active jobs', function(done){
var counter = 2;
queue = buildQueue();

@@ -496,3 +513,3 @@ queue.process(function(job, jobDone){

it('should get completed jobs', function(){
it('should get completed jobs', function(done){
var counter = 2;

@@ -506,3 +523,3 @@

queue.on('completed', function(){
counter --;
counter--;

@@ -512,3 +529,3 @@ if(counter === 0){

expect(jobs).to.be.a('array');
// We need a "empty completed" kind of function.
// We need a 'empty completed' kind of function.
//expect(jobs.length).to.be.equal(2);

@@ -530,7 +547,7 @@ done();

queue.process(function(job, jobDone){
jobDone(Error("Forced error"));
jobDone(new Error('Forced error'));
});
queue.on('failed', function(){
counter --;
counter--;

@@ -537,0 +554,0 @@ if(counter === 0){

@@ -1,4 +0,5 @@

"use strict";
/*eslint-env node */
/*global Promise:true */
'use strict';
var Job = require('../lib/job');
var Queue = require('../');

@@ -10,2 +11,3 @@ var expect = require('expect.js');

var _ = require('lodash');
var uuid = require('node-uuid');

@@ -19,7 +21,7 @@ var STD_QUEUE_NAME = 'test queue';

function cleanupQueue(queue){
function cleanupQueue(queue) {
return queue.empty().then(queue.close.bind(queue));
}
describe('Queue', function(){
describe('Queue', function () {
var queue;

@@ -30,5 +32,5 @@ var sandbox = sinon.sandbox.create();

if(queue){
return cleanupQueue(queue).then(function(){
return cleanupQueue(queue).then(function () {
queue = undefined;
})
});
}

@@ -40,5 +42,4 @@ sandbox.restore();

var testQueue;
beforeEach(function () {
testQueue = new Queue('test');
testQueue = new Queue('test');
});

@@ -68,3 +69,3 @@

it('should return a promise', function () {
var closePromise = testQueue.close().then(function(){
var closePromise = testQueue.close().then(function () {
expect(closePromise).to.be.a(Promise);

@@ -75,7 +76,7 @@ });

describe('instantiation', function(){
it('should create a queue with standard redis opts', function(done){
queue = Queue('standard');
describe('instantiation', function () {
it('should create a queue with standard redis opts', function (done) {
queue = new Queue('standard');
queue.once('ready', function(){
queue.once('ready', function () {
expect(queue.client.connectionOption.host).to.be('127.0.0.1');

@@ -94,6 +95,6 @@ expect(queue.bclient.connectionOption.host).to.be('127.0.0.1');

it('creates a queue using the supplied redis DB', function(done){
queue = Queue('custom', {redis: {DB: 1}});
it('creates a queue using the supplied redis DB', function (done) {
queue = new Queue('custom', { redis: { DB: 1 } });
queue.once('ready', function(){
queue.once('ready', function () {
expect(queue.client.connectionOption.host).to.be('127.0.0.1');

@@ -112,6 +113,6 @@ expect(queue.bclient.connectionOption.host).to.be('127.0.0.1');

it('creates a queue using custom the supplied redis host', function(done){
queue = Queue('custom', {redis: {host: 'localhost'}});
it('creates a queue using custom the supplied redis host', function (done) {
queue = new Queue('custom', { redis: { host: 'localhost' } });
queue.once('ready', function(){
queue.once('ready', function () {
expect(queue.client.connectionOption.host).to.be('localhost');

@@ -126,11 +127,11 @@ expect(queue.bclient.connectionOption.host).to.be('localhost');

it('creates a queue with dots in its name', function(){
queue = Queue('using. dots. in.name.');
it('creates a queue with dots in its name', function () {
queue = new Queue('using. dots. in.name.');
return queue.add({foo: 'bar'}).then(function(job){
expect(job.jobId).to.be.ok()
expect(job.data.foo).to.be('bar')
}).then(function(){
queue.process(function(job, jobDone){
expect(job.data.foo).to.be.equal('bar')
return queue.add({ foo: 'bar' }).then(function (job) {
expect(job.jobId).to.be.ok();
expect(job.data.foo).to.be('bar');
}).then(function () {
queue.process(function (job, jobDone) {
expect(job.data.foo).to.be.equal('bar');
jobDone();

@@ -142,8 +143,8 @@ });

describe('connection', function(){
it('should recover from a connection loss', function(done){
queue = Queue('test connection loss');
queue.on('error', function(err){
describe('connection', function () {
it('should recover from a connection loss', function (done) {
queue = new Queue('test connection loss');
queue.on('error', function () {
// error event has to be observed or the exception will bubble up
}).process(function(job, jobDone){
}).process(function (job, jobDone) {
expect(job.data.foo).to.be.equal('bar');

@@ -159,3 +160,3 @@ jobDone();

// add something to the queue
queue.add({'foo': 'bar'});
queue.add({ 'foo': 'bar' });
});

@@ -168,5 +169,5 @@

queue.process(function (job, jobDone) {
expect(runSpy.callCount).to.be(2);
jobDone();
done();
expect(runSpy.callCount).to.be(2);
jobDone();
done();
});

@@ -176,3 +177,3 @@

queue.add({'foo': 'bar'});
queue.add({ 'foo': 'bar' });
queue.bclient.emit('end');

@@ -188,111 +189,146 @@ });

setTimeout(function() {
setTimeout(function () {
expect(runSpy.callCount).to.be(0);
done()
}, 100)
done();
}, 100);
});
});
describe(' a worker', function(){
it('should process a job', function(done){
queue = buildQueue();
queue.process(function(job, jobDone){
expect(job.data.foo).to.be.equal('bar');
jobDone();
done();
describe(' a worker', function () {
it('should process a job', function (done) {
queue = buildQueue();
queue.process(function (job, jobDone) {
expect(job.data.foo).to.be.equal('bar');
jobDone();
done();
});
queue.add({ foo: 'bar' }).then(function (job) {
expect(job.jobId).to.be.ok();
expect(job.data.foo).to.be('bar');
}).catch(done);
});
queue.add({foo: 'bar'}).then(function(job){
expect(job.jobId).to.be.ok()
expect(job.data.foo).to.be('bar')
}).catch(done);
});
it('process a job that updates progress', function (done) {
queue = buildQueue();
queue.process(function (job, jobDone) {
expect(job.data.foo).to.be.equal('bar');
job.progress(42);
jobDone();
});
it('process a job that updates progress', function(done){
queue = buildQueue();
queue.process(function(job, jobDone){
expect(job.data.foo).to.be.equal('bar');
job.progress(42);
jobDone();
queue.add({ foo: 'bar' }).then(function (job) {
expect(job.jobId).to.be.ok();
expect(job.data.foo).to.be('bar');
}).catch(done);
queue.on('progress', function (job, progress) {
expect(job).to.be.ok();
expect(progress).to.be.eql(42);
done();
});
});
queue.add({foo: 'bar'}).then(function(job){
expect(job.jobId).to.be.ok();
expect(job.data.foo).to.be('bar');
}).catch(done);
it('process a job that returns data in the process handler', function (done) {
queue = buildQueue();
queue.process(function (job, jobDone) {
expect(job.data.foo).to.be.equal('bar');
jobDone(null, 37);
});
queue.on('progress', function(job, progress){
expect(job).to.be.ok();
expect(progress).to.be.eql(42);
done();
queue.add({ foo: 'bar' }).then(function (job) {
expect(job.jobId).to.be.ok();
expect(job.data.foo).to.be('bar');
}).catch(done);
queue.on('completed', function (job, data) {
expect(job).to.be.ok();
expect(data).to.be.eql(37);
done();
});
});
});
it('process a job that returns data in the process handler', function(done){
queue = buildQueue();
queue.process(function(job, jobDone){
expect(job.data.foo).to.be.equal('bar');
jobDone(null, 37);
it('process a job that returns a promise', function (done) {
queue = buildQueue();
queue.process(function (job) {
expect(job.data.foo).to.be.equal('bar');
return Promise.delay(250);
});
queue.add({ foo: 'bar' }).then(function (job) {
expect(job.jobId).to.be.ok();
expect(job.data.foo).to.be('bar');
}).catch(done);
queue.on('completed', function (job) {
expect(job).to.be.ok();
done();
});
});
queue.add({foo: 'bar'}).then(function(job){
expect(job.jobId).to.be.ok();
expect(job.data.foo).to.be('bar');
}).catch(done);
it('process a synchronous job', function (done) {
queue = buildQueue();
queue.process(function (job) {
expect(job.data.foo).to.be.equal('bar');
});
queue.on('completed', function(job, data){
expect(job).to.be.ok();
expect(data).to.be.eql(37);
done();
queue.add({ foo: 'bar' }).then(function (job) {
expect(job.jobId).to.be.ok();
expect(job.data.foo).to.be('bar');
}).catch(done);
queue.on('completed', function (job) {
expect(job).to.be.ok();
done();
});
});
});
it('process stalled jobs when starting a queue', function(done){
var queueStalled = Queue('test queue stalled', 6379, '127.0.0.1');
queueStalled.LOCK_RENEW_TIME = 10;
var jobs = [
queueStalled.add({bar: 'baz'}),
queueStalled.add({bar1: 'baz1'}),
queueStalled.add({bar2: 'baz2'}),
queueStalled.add({bar3: 'baz3'})
];
it('process stalled jobs when starting a queue', function (done) {
var queueStalled = new Queue('test queue stalled', 6379, '127.0.0.1');
queueStalled.LOCK_RENEW_TIME = 10;
var jobs = [
queueStalled.add({ bar: 'baz' }),
queueStalled.add({ bar1: 'baz1' }),
queueStalled.add({ bar2: 'baz2' }),
queueStalled.add({ bar3: 'baz3' })
];
Promise.all(jobs).then(function(){
queueStalled.process(function(job){
// instead of completing we just close the queue to simulate a crash.
queueStalled.close();
setTimeout(function(){
var queue2 = Queue('test queue stalled', 6379, '127.0.0.1');
var doneAfterFour = _.after(4, function(){
done();
});
queue2.on('completed', doneAfterFour);
Promise.all(jobs).then(function () {
queueStalled.process(function () {
// instead of completing we just close the queue to simulate a crash.
queueStalled.close();
setTimeout(function () {
var queue2 = new Queue('test queue stalled', 6379, '127.0.0.1');
var doneAfterFour = _.after(4, function () {
done();
});
queue2.on('completed', doneAfterFour);
queue2.process(function(job, jobDone){
jobDone();
});
}, 100);
queue2.process(function (job, jobDone) {
jobDone();
});
}, 100);
});
});
});
});
it('processes jobs that were added before the queue backend started', function(){
var queueStalled = Queue('test queue added before', 6379, '127.0.0.1');
queueStalled.LOCK_RENEW_TIME = 10;
var jobs = [
queueStalled.add({bar: 'baz'}),
queueStalled.add({bar1: 'baz1'}),
queueStalled.add({bar2: 'baz2'}),
queueStalled.add({bar3: 'baz3'})
];
it('processes jobs that were added before the queue backend started', function () {
var queueStalled = new Queue('test queue added before', 6379, '127.0.0.1');
queueStalled.LOCK_RENEW_TIME = 10;
var jobs = [
queueStalled.add({ bar: 'baz' }),
queueStalled.add({ bar1: 'baz1' }),
queueStalled.add({ bar2: 'baz2' }),
queueStalled.add({ bar3: 'baz3' })
];
return Promise.all(jobs)
.then(queueStalled.close.bind(queueStalled))
.then(function(){
queue = Queue('test queue added before', 6379, '127.0.0.1');
queue.process(function(job, jobDone){
return Promise.all(jobs)
.then(queueStalled.close.bind(queueStalled))
.then(function () {
queue = new Queue('test queue added before', 6379, '127.0.0.1');
queue.process(function (job, jobDone) {
jobDone();
});
return new Promise(function(resolve, reject){
return new Promise(function (resolve) {
var resolveAfterAllJobs = _.after(jobs.length, resolve);

@@ -302,32 +338,31 @@ queue.on('completed', resolveAfterAllJobs);

});
});
});
it('processes several stalled jobs when starting several queues', function(done){
var NUM_QUEUES = 10;
var NUM_JOBS_PER_QUEUE = 20;
var stalledQueues = [];
var jobs = [];
it('processes several stalled jobs when starting several queues', function (done) {
var NUM_QUEUES = 10;
var NUM_JOBS_PER_QUEUE = 20;
var stalledQueues = [];
var jobs = [];
for(var i=0; i<NUM_QUEUES; i++){
var queue = Queue('test queue stalled 2', 6379, '127.0.0.1');
stalledQueues.push(queue);
queue.LOCK_RENEW_TIME = 10;
for(var i = 0; i < NUM_QUEUES; i++) {
var queueStalled2 = new Queue('test queue stalled 2', 6379, '127.0.0.1');
stalledQueues.push(queueStalled2);
queueStalled2.LOCK_RENEW_TIME = 10;
for(var j=0; j<NUM_JOBS_PER_QUEUE; j++){
jobs.push(queue.add({job: j}));
for(var j = 0; j < NUM_JOBS_PER_QUEUE; j++) {
jobs.push(queueStalled2.add({ job: j }));
}
}
}
Promise.all(jobs).then(function(){
var processed = 0;
for(var k=0; k<stalledQueues.length; k++){
stalledQueues[k].process(function(job){
Promise.all(jobs).then(function () {
var processed = 0;
var procFn = function () {
// instead of completing we just close the queue to simulate a crash.
this.close();
processed ++;
if(processed === stalledQueues.length){
setTimeout(function(){
var queue2 = Queue('test queue stalled 2', 6379, '127.0.0.1');
queue2.process(function(job, jobDone){
processed++;
if(processed === stalledQueues.length) {
setTimeout(function () {
var queue2 = new Queue('test queue stalled 2', 6379, '127.0.0.1');
queue2.process(function (job2, jobDone) {
jobDone();

@@ -337,7 +372,7 @@ });

var counter = 0;
queue2.on('completed', function(job){
counter ++;
queue2.on('completed', function () {
counter++;
if(counter === NUM_QUEUES * NUM_JOBS_PER_QUEUE) {
queue2.close().then(function(){
done();
queue2.close().then(function () {
done();
});

@@ -348,184 +383,214 @@ }

}
});
}
};
for(var k = 0; k < stalledQueues.length; k++) {
stalledQueues[k].process(procFn);
}
});
});
});
it('does not process a job that is being processed when a new queue starts', function(done){
this.timeout(5000)
var err = null;
var anotherQueue;
it('does not process a job that is being processed when a new queue starts', function (done) {
this.timeout(5000);
var err = null;
var anotherQueue;
queue = buildQueue();
queue = buildQueue();
queue.add({foo: 'bar'}).then(function(addedJob){
queue.process(function(job, jobDone){
expect(job.data.foo).to.be.equal('bar');
queue.add({ foo: 'bar' }).then(function (addedJob) {
queue.process(function (job, jobDone) {
expect(job.data.foo).to.be.equal('bar');
if(addedJob.jobId !== job.jobId){
err = new Error('Processed job id does not match that of added job');
}
setTimeout(jobDone, 100);
});
setTimeout(function() {
anotherQueue = buildQueue();
anotherQueue.process(function(job, jobDone){
err = new Error('The second queue should not have received a job to process');
jobDone();
if(addedJob.jobId !== job.jobId) {
err = new Error('Processed job id does not match that of added job');
}
setTimeout(jobDone, 100);
});
setTimeout(function () {
anotherQueue = buildQueue();
anotherQueue.process(function (job, jobDone) {
err = new Error('The second queue should not have received a job to process');
jobDone();
});
queue.on('completed', function(){
cleanupQueue(anotherQueue).then(done.bind(null, err));
});
}, 10);
queue.on('completed', function () {
cleanupQueue(anotherQueue).then(done.bind(null, err));
});
}, 10);
});
});
});
it('process stalled jobs without requiring a queue restart');
it('process stalled jobs without requiring a queue restart', function (done) {
this.timeout(5000);
var collect = _.after(2, done);
it('process a job that fails', function(done){
var jobError = Error("Job Failed");
queue = buildQueue();
queue = buildQueue('running-stalled-job-' + uuid());
queue.process(function(job, jobDone){
expect(job.data.foo).to.be.equal('bar');
jobDone(jobError);
});
queue.LOCK_RENEW_TIME = 1000;
queue.add({foo: 'bar'}).then(function(job){
expect(job.jobId).to.be.ok();
expect(job.data.foo).to.be('bar');
}, function(err){
done(err);
});
queue.on('completed', function () {
collect();
});
queue.once('failed', function(job, err){
expect(job.jobId).to.be.ok();
expect(job.data.foo).to.be('bar');
expect(err).to.be.eql(jobError);
done();
queue.process(function (job, jobDone) {
expect(job.data.foo).to.be.equal('bar');
jobDone();
var client = redis.createClient();
client.srem(queue.toKey('completed'), 1);
client.lpush(queue.toKey('active'), 1);
});
queue.add({ foo: 'bar' }).then(function (job) {
expect(job.jobId).to.be.ok();
expect(job.data.foo).to.be('bar');
}).catch(done);
});
});
it('process a job that throws an exception', function(done){
var jobError = new Error("Job Failed");
it('process a job that fails', function (done) {
var jobError = new Error('Job Failed');
queue = buildQueue();
queue = buildQueue();
queue.process(function (job, jobDone) {
expect(job.data.foo).to.be.equal('bar');
jobDone(jobError);
});
queue.process(function(job, jobDone){
expect(job.data.foo).to.be.equal('bar');
throw jobError;
});
queue.add({ foo: 'bar' }).then(function (job) {
expect(job.jobId).to.be.ok();
expect(job.data.foo).to.be('bar');
}, function (err) {
done(err);
});
queue.add({foo: 'bar'}).then(function(job){
expect(job.jobId).to.be.ok();
expect(job.data.foo).to.be('bar');
}, function(err){
done(err);
queue.once('failed', function (job, err) {
expect(job.jobId).to.be.ok();
expect(job.data.foo).to.be('bar');
expect(err).to.be.eql(jobError);
done();
});
});
queue.once('failed', function(job, err){
expect(job.jobId).to.be.ok();
expect(job.data.foo).to.be('bar');
expect(err).to.be.eql(jobError);
done();
});
});
it('process a job that throws an exception', function (done) {
var jobError = new Error('Job Failed');
it('retry a job that fails', function(done){
var called = 0
var messages = 0;
var failedOnce = false;
queue = buildQueue();
var queue = buildQueue('retry-test-queue');
var client = redis.createClient(6379, '127.0.0.1', {});
client.select(0);
client.on('ready', function () {
client.on("message", function(channel, message) {
expect(channel).to.be.equal(queue.toKey("jobs"));
expect(parseInt(message, 10)).to.be.a('number');
messages++;
queue.process(function (job) {
expect(job.data.foo).to.be.equal('bar');
throw jobError;
});
client.subscribe(queue.toKey("jobs"));
queue.add({foo: 'bar'}).then(function(job){
queue.add({ foo: 'bar' }).then(function (job) {
expect(job.jobId).to.be.ok();
expect(job.data.foo).to.be('bar');
}, function (err) {
done(err);
});
queue.once('failed', function (job, err) {
expect(job.jobId).to.be.ok();
expect(job.data.foo).to.be('bar');
expect(err).to.be.eql(jobError);
done();
});
});
queue.process(function(job, jobDone){
called++;
if (called % 2 !== 0){
throw new Error("Not even!")
}
jobDone();
});
it('retry a job that fails', function (done) {
var called = 0;
var messages = 0;
var failedOnce = false;
queue.once('failed', function(job, err){
expect(job.jobId).to.be.ok();
expect(job.data.foo).to.be('bar');
expect(err.message).to.be.eql("Not even!");
failedOnce = true
queue.retryJob(job);
});
var retryQueue = buildQueue('retry-test-queue');
var client = redis.createClient(6379, '127.0.0.1', {});
queue.once('completed', function(){
expect(failedOnce).to.be(true);
expect(messages).to.eql(2);
done();
});
});
client.select(0);
it('process several jobs serially', function(done){
var counter = 1;
var maxJobs = 100;
client.on('ready', function () {
client.on('message', function (channel, message) {
expect(channel).to.be.equal(retryQueue.toKey('jobs'));
expect(parseInt(message, 10)).to.be.a('number');
messages++;
});
client.subscribe(retryQueue.toKey('jobs'));
queue = buildQueue();
retryQueue.add({ foo: 'bar' }).then(function (job) {
expect(job.jobId).to.be.ok();
expect(job.data.foo).to.be('bar');
});
});
queue.process(function(job, jobDone){
expect(job.data.num).to.be.equal(counter);
expect(job.data.foo).to.be.equal('bar');
jobDone();
if(counter == maxJobs) done();
counter++;
retryQueue.process(function (job, jobDone) {
called++;
if(called % 2 !== 0) {
throw new Error('Not even!');
}
jobDone();
});
retryQueue.once('failed', function (job, err) {
expect(job.jobId).to.be.ok();
expect(job.data.foo).to.be('bar');
expect(err.message).to.be.eql('Not even!');
failedOnce = true;
retryQueue.retryJob(job);
});
retryQueue.once('completed', function () {
expect(failedOnce).to.be(true);
expect(messages).to.eql(2);
done();
});
});
for(var i=1; i<=maxJobs; i++){
queue.add({foo: 'bar', num: i});
}
});
it('process several jobs serially', function (done) {
var counter = 1;
var maxJobs = 100;
it('process a lifo queue', function(done){
var currentValue = 0, first = true;
queue = Queue('test lifo');
queue = buildQueue();
queue.once('ready', function(){
queue.process(function(job, jobDone){
// Catching the job before the pause
if(first){
expect(job.data.count).to.be.equal(0);
first = false;
return jobDone();
}
expect(job.data.count).to.be.equal(currentValue--);
queue.process(function (job, jobDone) {
expect(job.data.num).to.be.equal(counter);
expect(job.data.foo).to.be.equal('bar');
jobDone();
if(currentValue === 0){
if(counter === maxJobs) {
done();
}
counter++;
});
// Add a job to pend proccessing
queue.add({'count': 0}).then(function(){
queue.pause().then(function(){
// Add a series of jobs in a predictable order
var fn = function(cb){
queue.add({'count': ++currentValue}, {'lifo': true}).then(cb);
};
fn(fn(fn(fn(function(){
queue.resume();
}))));
for(var i = 1; i <= maxJobs; i++) {
queue.add({ foo: 'bar', num: i });
}
});
it('process a lifo queue', function (done) {
var currentValue = 0, first = true;
queue = new Queue('test lifo');
queue.once('ready', function () {
queue.process(function (job, jobDone) {
// Catching the job before the pause
if(first) {
expect(job.data.count).to.be.equal(0);
first = false;
return jobDone();
}
expect(job.data.count).to.be.equal(currentValue--);
jobDone();
if(currentValue === 0) {
done();
}
});
// Add a job to pend proccessing
queue.add({ 'count': 0 }).then(function () {
queue.pause().then(function () {
// Add a series of jobs in a predictable order
var fn = function (cb) {
queue.add({ 'count': ++currentValue }, { 'lifo': true }).then(cb);
};
fn(fn(fn(fn(function () {
queue.resume();
}))));
});
});
});

@@ -535,7 +600,3 @@ });

});
it('count added, unprocessed jobs', function(){
var counter = 1;
it('count added, unprocessed jobs', function () {
var maxJobs = 100;

@@ -546,4 +607,4 @@ var added = [];

for(var i=1; i<=maxJobs; i++){
added.push(queue.add({foo: 'bar', num: i}));
for(var i = 1; i <= maxJobs; i++) {
added.push(queue.add({ foo: 'bar', num: i }));
}

@@ -553,10 +614,10 @@

.then(queue.count.bind(queue))
.then(function(count){
expect(count).to.be(100);
})
.then(function (count) {
expect(count).to.be(100);
})
.then(queue.empty.bind(queue))
.then(queue.count.bind(queue))
.then(function(count){
expect(count).to.be(0);
});
.then(function (count) {
expect(count).to.be(0);
});
});

@@ -566,3 +627,3 @@

queue = buildQueue();
queue.add({foo: 'bar'});
queue.add({ foo: 'bar' });
queue.once('waiting', function (job) {

@@ -574,4 +635,4 @@ expect(job.data.foo).to.be.equal('bar');

describe(".pause", function(){
it('should pause a queue until resumed', function(){
describe('.pause', function () {
it('should pause a queue until resumed', function () {
var ispaused = false, counter = 2;

@@ -581,4 +642,4 @@

var resultPromise = new Promise(function(resolve, reject){
queue.process(function(job, jobDone){
var resultPromise = new Promise(function (resolve) {
queue.process(function (job, jobDone) {
expect(ispaused).to.be(false);

@@ -588,3 +649,3 @@ expect(job.data.foo).to.be.equal('paused');

counter--;
if(counter === 0){
if(counter === 0) {
resolve();

@@ -595,14 +656,14 @@ }

return Promise.join(queue.pause().then(function(){
ispaused = true;
return queue.add({foo: 'paused'});
}).then(function(){
return queue.add({foo: 'paused'});
}).then(function(){
ispaused = false;
queue.resume();
}), resultPromise);;
})
return Promise.join(queue.pause().then(function () {
ispaused = true;
return queue.add({ foo: 'paused' });
}).then(function () {
return queue.add({ foo: 'paused' });
}).then(function () {
ispaused = false;
queue.resume();
}), resultPromise);
});
it('should be able to pause a running queue and emit relevant events', function(done){
it('should be able to pause a running queue and emit relevant events', function (done) {
var ispaused = false, isresumed = true, first = true;

@@ -612,4 +673,4 @@

queue.empty().then(function(){
queue.process(function(job, jobDone){
queue.empty().then(function () {
queue.process(function (job, jobDone) {
expect(ispaused).to.be(false);

@@ -619,3 +680,3 @@ expect(job.data.foo).to.be.equal('paused');

if(first){
if(first) {
first = false;

@@ -630,6 +691,6 @@ ispaused = true;

queue.add({foo: 'paused'});
queue.add({foo: 'paused'});
queue.add({ foo: 'paused' });
queue.add({ foo: 'paused' });
queue.on('paused', function(){
queue.on('paused', function () {
ispaused = false;

@@ -639,3 +700,3 @@ queue.resume();

queue.on('resumed', function(){
queue.on('resumed', function () {
isresumed = true;

@@ -648,25 +709,25 @@ });

it('should publish a message when a new message is added to the queue', function(done) {
it('should publish a message when a new message is added to the queue', function (done) {
var client = redis.createClient(6379, '127.0.0.1', {});
client.select(0);
queue = Queue('test pub sub');
queue = new Queue('test pub sub');
client.on('ready', function () {
client.on("message", function(channel, message) {
expect(channel).to.be.equal(queue.toKey("jobs"));
client.on('message', function (channel, message) {
expect(channel).to.be.equal(queue.toKey('jobs'));
expect(parseInt(message, 10)).to.be.a('number');
done();
});
client.subscribe(queue.toKey("jobs"));
queue.add({test: "stuff"});
client.subscribe(queue.toKey('jobs'));
queue.add({ test: 'stuff' });
});
});
it("should emit an event when a job becomes active", function (done) {
it('should emit an event when a job becomes active', function (done) {
queue = buildQueue();
queue.process(function(job, jobDone){
queue.process(function (job, jobDone) {
jobDone();
});
queue.add({});
queue.once('active', function (job) {
queue.once('completed', function (job) {
queue.once('active', function () {
queue.once('completed', function () {
done();

@@ -677,6 +738,6 @@ });

describe("Delayed jobs", function(){
it("should process a delayed job only after delayed time", function(done){
describe('Delayed jobs', function () {
it('should process a delayed job only after delayed time', function (done) {
var delay = 500;
queue = Queue("delayed queue simple");
queue = new Queue('delayed queue simple');
var client = redis.createClient(6379, '127.0.0.1', {});

@@ -686,22 +747,22 @@ var timestamp = Date.now();

client.on('ready', function () {
client.on("message", function(channel, message) {
client.on('message', function (channel, message) {
expect(parseInt(message, 10)).to.be.a('number');
publishHappened = true;
});
client.subscribe(queue.toKey("jobs"));
client.subscribe(queue.toKey('jobs'));
});
queue.process(function(job, jobDone){
queue.process(function (job, jobDone) {
jobDone();
});
queue.on('completed', function(){
queue.on('completed', function () {
expect(Date.now() > timestamp + delay);
queue.getWaiting().then(function(jobs){
queue.getWaiting().then(function (jobs) {
expect(jobs.length).to.be.equal(0);
}).then(function(){
return queue.getDelayed().then(function(jobs){
}).then(function () {
return queue.getDelayed().then(function (jobs) {
expect(jobs.length).to.be.equal(0);
})
}).then(function(){
});
}).then(function () {
expect(publishHappened).to.be(true);

@@ -713,6 +774,6 @@ done();

queue.on('ready', function () {
queue.add({delayed: 'foobar'}, {delay: delay}).then(function(job){
expect(job.jobId).to.be.ok()
expect(job.data.delayed).to.be('foobar')
expect(job.delay).to.be(delay)
queue.add({ delayed: 'foobar' }, { delay: delay }).then(function (job) {
expect(job.jobId).to.be.ok();
expect(job.data.delayed).to.be('foobar');
expect(job.delay).to.be(delay);
});

@@ -722,7 +783,7 @@ });

it("should process delayed jobs in correct order", function(done){
it('should process delayed jobs in correct order', function (done) {
var order = 0;
queue = Queue("delayed queue multiple");
queue = new Queue('delayed queue multiple');
queue.process(function(job, jobDone){
queue.process(function (job, jobDone) {
expect(order).to.be.below(job.data.order);

@@ -732,3 +793,3 @@ order = job.data.order;

jobDone();
if(order === 10){
if(order === 10) {
done();

@@ -738,25 +799,25 @@ }

queue.add({order: 1}, {delay: 100});
queue.add({order: 6}, {delay: 600});
queue.add({order: 10}, {delay: 1000});
queue.add({order: 2}, {delay: 200});
queue.add({order: 9}, {delay: 900});
queue.add({order: 5}, {delay: 500});
queue.add({order: 3}, {delay: 300});
queue.add({order: 7}, {delay: 700});
queue.add({order: 4}, {delay: 400});
queue.add({order: 8}, {delay: 800});
queue.add({ order: 1 }, { delay: 100 });
queue.add({ order: 6 }, { delay: 600 });
queue.add({ order: 10 }, { delay: 1000 });
queue.add({ order: 2 }, { delay: 200 });
queue.add({ order: 9 }, { delay: 900 });
queue.add({ order: 5 }, { delay: 500 });
queue.add({ order: 3 }, { delay: 300 });
queue.add({ order: 7 }, { delay: 700 });
queue.add({ order: 4 }, { delay: 400 });
queue.add({ order: 8 }, { delay: 800 });
});
it("should process delayed jobs in correct order even in case of restart", function(done){
var QUEUE_NAME = "delayed queue multiple";
it('should process delayed jobs in correct order even in case of restart', function (done) {
var QUEUE_NAME = 'delayed queue multiple';
var order = 1;
queue = Queue(QUEUE_NAME);
queue = new Queue(QUEUE_NAME);
var fn = function(job, jobDone){
var fn = function (job, jobDone) {
expect(order).to.be.equal(job.data.order);
jobDone();
if (order === 4 ) {
if(order === 4) {
done();

@@ -769,7 +830,6 @@ }

Promise.join(
queue.add({order: 2}, {delay: 300}),
queue.add({order: 4}, {delay: 500}),
queue.add({order: 1}, {delay: 200}),
queue.add({order: 3}, {delay: 400})).then(function(){
queue.add({ order: 2 }, { delay: 300 }),
queue.add({ order: 4 }, { delay: 500 }),
queue.add({ order: 1 }, { delay: 200 }),
queue.add({ order: 3 }, { delay: 400 })).then(function () {
//

@@ -779,17 +839,16 @@ // Start processing so that jobs get into the delay set.

queue.process(fn);
}).delay(20).then(function(){
}).delay(20).then(function () {
//We simulate a restart
return queue.close().then(function() {
return Promise.delay(100).then(function() {
queue = Queue(QUEUE_NAME);
return queue.close().then(function () {
return Promise.delay(100).then(function () {
queue = new Queue(QUEUE_NAME);
queue.process(fn);
})
});
});
});
});
});
describe("Concurrency process", function() {
it("should run job in sequence if I specify a concurrency of 1", function (done) {
describe('Concurrency process', function () {
it('should run job in sequence if I specify a concurrency of 1', function (done) {
queue = buildQueue();

@@ -799,3 +858,3 @@

queue.process(1, function (job, done) {
queue.process(1, function (job, jobDone) {
expect(processing).to.be.equal(false);

@@ -805,3 +864,3 @@ processing = true;

processing = false;
done();
jobDone();
});

@@ -819,5 +878,5 @@ });

//Due to time to get new jobs and call process, false negative can appear.
it("should process job respecting the concurrency set", function (done) {
queue = buildQueue("test concurrency");
queue.empty().then(function() {
it('should process job respecting the concurrency set', function (done) {
queue = buildQueue('test concurrency');
queue.empty().then(function () {
var nbProcessing = 0;

@@ -827,3 +886,3 @@ var pendingMessageToProcess = 8;

queue.process(4, function (job, done) {
queue.process(4, function (job, jobDone) {
nbProcessing++;

@@ -841,3 +900,3 @@ expect(nbProcessing).to.be.lessThan(5);

nbProcessing--;
done();
jobDone();
});

@@ -860,3 +919,3 @@ }).catch(done);

it("should wait for all concurrent processing in case of pause", function (done) {
it('should wait for all concurrent processing in case of pause', function (done) {
queue = buildQueue();

@@ -867,11 +926,11 @@

queue.process(3, function (job, done) {
queue.process(3, function (job, jobDone) {
var error = null;
if (++i === 4) {
if(++i === 4){
queue.pause().then(function () {
Promise.delay(500).then(function(){ // Wait for all the active jobs to finalize.
Promise.delay(500).then(function () { // Wait for all the active jobs to finalize.
expect(nbJobFinish).to.be.above(3);
queue.resume();
})
});
});

@@ -882,3 +941,3 @@ }

// They had a bug in pause() with this special case.
if (i % 3 == 0) {
if(i % 3 === 0){
error = new Error();

@@ -888,5 +947,5 @@ }

//100 - i*20 is to force to finish job n°4 before lower job that will wait longer
Promise.delay(100 - i*10).then(function () {
Promise.delay(100 - i * 10).then(function () {
nbJobFinish++;
done(error);
jobDone(error);
});

@@ -911,7 +970,7 @@ }).catch(done);

describe("Jobs getters", function(){
it('should get waitting jobs', function(done){
describe('Jobs getters', function () {
it('should get waitting jobs', function (done) {
queue = buildQueue();
Promise.join(queue.add({foo: 'bar'}), queue.add({baz: 'qux'})).then(function(){
queue.getWaiting().then(function(jobs){
Promise.join(queue.add({ foo: 'bar' }), queue.add({ baz: 'qux' })).then(function () {
queue.getWaiting().then(function (jobs) {
expect(jobs).to.be.a('array');

@@ -926,8 +985,6 @@ expect(jobs.length).to.be.equal(2);

it('should get active jobs', function(done){
var counter = 2;
it('should get active jobs', function (done) {
queue = buildQueue();
queue.process(function(job, jobDone){
queue.getActive().then(function(jobs){
queue.process(function (job, jobDone) {
queue.getActive().then(function (jobs) {
expect(jobs).to.be.a('array');

@@ -941,11 +998,11 @@ expect(jobs.length).to.be.equal(1);

queue.add({foo: 'bar'});
queue.add({ foo: 'bar' });
});
it('should get a specific job', function(done){
var data = {foo: 'sup!'};
it('should get a specific job', function (done) {
var data = { foo: 'sup!' };
queue = buildQueue();
queue.add(data).then(function(job) {
queue.getJob(job.jobId).then(function(returnedJob) {
queue.add(data).then(function (job) {
queue.getJob(job.jobId).then(function (returnedJob) {
expect(returnedJob.data).to.eql(data);

@@ -955,18 +1012,18 @@ expect(returnedJob.jobId).to.be(job.jobId);

});
})
});
});
it('should get completed jobs', function(){
it('should get completed jobs', function (done) {
var counter = 2;
queue = buildQueue();
queue.process(function(job, jobDone){
queue.process(function (job, jobDone) {
jobDone();
});
queue.on('completed', function(){
counter --;
queue.on('completed', function () {
counter--;
if(counter === 0){
queue.getCompleted().then(function(jobs){
queue.getCompleted().then(function (jobs) {
expect(jobs).to.be.a('array');

@@ -980,7 +1037,7 @@ // We need a "empty completed" kind of function.

queue.add({foo: 'bar'});
queue.add({baz: 'qux'});
queue.add({ foo: 'bar' });
queue.add({ baz: 'qux' });
});
it('should get failed jobs', function(done){
it('should get failed jobs', function (done) {
var counter = 2;

@@ -990,11 +1047,11 @@

queue.process(function(job, jobDone){
jobDone(Error("Forced error"));
queue.process(function (job, jobDone) {
jobDone(new Error('Forced error'));
});
queue.on('failed', function(){
counter --;
queue.on('failed', function () {
counter--;
if(counter === 0){
queue.getFailed().then(function(jobs){
queue.getFailed().then(function (jobs) {
expect(jobs).to.be.a('array');

@@ -1006,14 +1063,14 @@ done();

queue.add({foo: 'bar'});
queue.add({baz: 'qux'});
queue.add({ foo: 'bar' });
queue.add({ baz: 'qux' });
});
it('fails jobs that exceed their specified timeout', function(done){
it('fails jobs that exceed their specified timeout', function (done) {
queue = buildQueue();
queue.process(function(job, jobDone){
queue.process(function (job, jobDone) {
setTimeout(jobDone, 150);
});
queue.on('failed', function(job, error){
queue.on('failed', function (job, error) {
expect(error).to.be.a(Promise.TimeoutError);

@@ -1023,3 +1080,3 @@ done();

queue.on('completed', function(){
queue.on('completed', function () {
var error = new Error('The job should have timed out');

@@ -1029,3 +1086,3 @@ done(error);

queue.add({some: 'data'}, {
queue.add({ some: 'data' }, {
timeout: 100

@@ -1032,0 +1089,0 @@ });

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc