Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

bull

Package Overview
Dependencies
Maintainers
1
Versions
198
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

bull - npm Package Compare versions

Comparing version 1.1.1 to 1.1.2

9

lib/job.js

@@ -126,4 +126,4 @@ /*eslint-env node */

*/
Job.prototype.takeLock = function(token, renew){
return scripts.takeLock(this.queue, this, token, renew).then(function(res){
Job.prototype.takeLock = function(token, renew, ensureActive){
return scripts.takeLock(this.queue, this, token, renew, ensureActive).then(function(res){
return res === 1; // Indicates successful lock.

@@ -357,2 +357,3 @@ });

if(!finished){
var interval;
function onCompleted(job){

@@ -363,2 +364,3 @@ if(job.jobId === _this.jobId){

removeListeners();
clearInterval(interval);
}

@@ -371,2 +373,3 @@

removeListeners();
clearInterval(interval);
}

@@ -385,3 +388,3 @@

//
var interval = setInterval(function(){
interval = setInterval(function(){
status(resolve, reject).then(function(finished){

@@ -388,0 +391,0 @@ if(finished){

@@ -66,3 +66,3 @@ /*eslint-env node */

if(_.isObject(redisPort)){
if(_.isObject(redisPort)) {
var opts = redisPort;

@@ -72,4 +72,7 @@ var redisOpts = opts.redis || {};

redisHost = redisOpts.host;
redisOptions = redisOpts.opts || {};
redisOptions = redisOpts.opts || {};
redisOptions.db = redisOpts.DB;
} else if(arguments.length == 3) {
redisPort = parseInt(redisPort);
redisOptions = redisOptions || {};
} else if(_.isString(redisPort)) {

@@ -601,3 +604,3 @@ try {

Queue.prototype.processJob = function(job, renew){
Queue.prototype.processJob = function(job){
var _this = this;

@@ -619,4 +622,8 @@ var lockRenewId;

//
var renew = false;
var lockRenewer = function(){
return job.takeLock(_this.token, renew).then(function(locked){
// The first call to lock the job should ensure that the job is in the 'active' state,
// because it might have gotten picked up already by another processor. We don't need
// to do this on subsequent calls.
return job.takeLock(_this.token, renew, !renew).then(function(locked){
if(locked){

@@ -626,2 +633,4 @@ renew = true;

}
// TODO: if we failed to re-acquire the lock while trying to renew, should we let the job
// handler know and cancel the timer?
return locked;

@@ -628,0 +637,0 @@ }, function(err){

@@ -202,18 +202,7 @@ /**

},
moveToCompleted: function(job, token, removeOnComplete){
return scripts.move(job, token, 'active', removeOnComplete ? void 0 : 'completed');
/*
var params = {};
if(isNaN(job.attemptsMade)){
params.attemptsMade = 1;
}else{
params.attemptsMade = job.attemptsMade++;
}
},
if(job.stacktrace){
params.stacktrace = JSON.stringify(job.stacktrace);
}
return this.queue.client.hmsetAsync(this.queue.toKey(this.jobId), params);
*/
},
moveToSet: function(queue, set, jobId, context){

@@ -305,5 +294,37 @@ //

/**
* Takes a lock
* Gets a lock for a job.
*
* @param {Queue} queue The queue for the job
* @param {Job} job The job
* @param {Boolean=false} renew Whether to renew to lock, meaning it will assume the job
* is already locked and just reset the lock expiration.
* @param {Boolean=false} ensureActive Ensures that the job is in the 'active' state.
*/
takeLock: function(queue, job, token, renew){
takeLock: function(queue, job, token, renew, ensureActive){
// Ensures that the lock doesn't exist, or if it does, that we own it.
var ensureOwnershipCall = [
'local prevLock = redis.call("GET", KEYS[1])',
'if (prevLock and prevLock ~= ARGV[1]) then',
' return 0',
'end'
].join('\n');
// Ensures that the lock in the 'active' state.
var ensureActiveCall = [
// Note that while this is inefficient to run a O(n) traversal of the 'active' queue,
// it's highly likely that the job is within the first few elements of the active
// list at the time this call is used.
'local activeJobs = redis.call("LRANGE", KEYS[3], 0, -1)',
'local found = false',
'for _, job in ipairs(activeJobs) do',
' if(job == ARGV[3]) then',
' found = true',
' break',
' end',
'end',
'if (found == false) then',
' return 0',
'end'
].join('\n');
var lockCall;

@@ -317,2 +338,4 @@ if (renew){

var script = [
(renew ? ensureOwnershipCall : ''),
(ensureActive ? ensureActiveCall : ''),
'if(' + lockCall + ') then',

@@ -329,9 +352,11 @@ // Mark the job as having been locked at least once. Used to determine if the job was stalled.

queue.client,
'takeLock' + (renew ? 'Renew' : ''),
'takeLock' + (renew ? 'Renew' : '') + (ensureActive ? 'EnsureActive' : ''),
script,
2,
3,
job.lockKey(),
queue.toKey(job.jobId),
queue.toKey('active'),
token,
queue.LOCK_RENEW_TIME
queue.LOCK_RENEW_TIME,
job.jobId
];

@@ -338,0 +363,0 @@

{
"name": "bull",
"version": "1.1.1",
"version": "1.1.2",
"description": "Job manager",

@@ -22,5 +22,4 @@ "main": "index.js",

"debuglog": "^1.0.0",
"disturbed": "^1.0.3",
"disturbed": "^1.0.6",
"lodash": "^4.16.6",
"mocha": "^3.1.2",
"node-uuid": "^1.4.7",

@@ -27,0 +26,0 @@ "redis": "^2.6.3",

@@ -76,6 +76,7 @@ /*eslint-env node */

queue = buildQueue();
var numJobs = 100;
workerMessageHandler = function(job) {
jobs.push(job.id);
if(jobs.length === 11) {
if(jobs.length === numJobs) {
var counts = {};

@@ -92,3 +93,3 @@ var j = 0;

var i = 0;
for(i; i < 11; i++) {
for(i; i < numJobs; i++) {
queue.add({});

@@ -95,0 +96,0 @@ }

@@ -172,2 +172,20 @@ /*eslint-env node */

it('should create a queue with a port number and a hostname', function (done) {
var queue = new Queue('connstring', '6379', '127.0.0.1');
queue.once('ready', function () {
expect(queue.client.connection_options.host).to.be('127.0.0.1');
expect(queue.bclient.connection_options.host).to.be('127.0.0.1');
expect(queue.client.connection_options.port).to.be(6379);
expect(queue.bclient.connection_options.port).to.be(6379);
expect(queue.client.selected_db).to.be(0);
expect(queue.bclient.selected_db).to.be(0);
queue.close().then(done);
});
});
it('creates a queue using the supplied redis DB', function (done) {

@@ -219,2 +237,19 @@ var queue = new Queue('custom', { redis: { DB: 1 } });

});
it('creates a queue accepting port as a string', function () {
var queue = new Queue('foobar', '6379', 'localhost');
return queue.add({ foo: 'bar' }).then(function (job) {
expect(job.jobId).to.be.ok();
expect(job.data.foo).to.be('bar');
}).then(function () {
queue.process(function (job, jobDone) {
expect(job.data.foo).to.be.equal('bar');
jobDone();
});
}).then(function () {
return queue.close();
});
});
});

@@ -221,0 +256,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc