Socket
Socket
Sign inDemoInstall

better-queue

Package Overview
Dependencies
3
Maintainers
2
Versions
59
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 3.7.21 to 3.8.2

61

lib/queue.js

@@ -8,8 +8,2 @@ var uuid = require('node-uuid');

var stores = {
memory: './stores/memory',
sqlite: './stores/sqlite',
sql: './stores/SqlStore',
}
function Queue(process, opts) {

@@ -143,35 +137,23 @@ var self = this;

var self = this;
if (typeof store === 'string' && stores[store]) {
var loadStore = function (store) {
if (store === 'sqlite') {
// For backwards compatibility, we're adding this deprecated hack.
console.log('DEPRECATED: better-queue sqlite store is renamed to sql with dialect.')
store = 'sql';
}
var Store;
try {
var Store;
if (store === 'memory') {
Store = require('./stores/memory');
} else if (store === 'sql') {
Store = require('./stores/SqlStore');
} else if (store === 'sqlite') {
Store = require('./stores/sqlite');
} else {
Store = require(stores[store]);
}
self._store = new Store(opts);
} catch (e) { throw e }
} else if (typeof store === 'object' && typeof store.type === 'string' && stores[store.type]) {
try {
var Store;
if (store.type === 'memory') {
Store = require('./stores/memory');
} else if (store.type === 'sql') {
Store = require('./stores/SqlStore');
} else if (store.type === 'sqlite') {
Store = require('./stores/sqlite');
} else {
Store = require(stores[store.type]);
}
self._store = new Store(store);
} catch (e) { throw e }
} else if (typeof store === 'object' &&
store.putTask &&
store.getTask &&
((self.filo && store.takeLastN) ||
(!self.filo && store.takeFirstN))) {
Store = require('better-queue-' + store);
} catch (e) {
throw new Error('Attempting to require better-queue-' + store + ', but failed.\nPlease ensure you have this store installed via npm install --save better-queue-' + store)
}
return Store;
}
if (typeof store === 'string') {
var Store = loadStore(store);
self._store = new Store(opts);
} else if (typeof store === 'object' && typeof store.type === 'string') {
var Store = loadStore(store.type);
self._store = new Store(store);
} else if (typeof store === 'object' && store.putTask && store.getTask && ((self.filo && store.takeLastN) || (!self.filo && store.takeFirstN))) {
self._store = store;

@@ -261,3 +243,2 @@ } else {

}
if (!self._connected) {

@@ -565,3 +546,3 @@ self._tasksWaitingForConnect.push({ input: input, ticket: ticket });

self._fetching--;
if (err || !lockId) return;
if (err || lockId === undefined) return;
self._store.getLock(lockId, function (err, batch) {

@@ -568,0 +549,0 @@ if (err || !batch) return;

{
"name": "better-queue",
"version": "3.7.21",
"version": "3.8.2",
"description": "Better Queue for NodeJS",

@@ -30,12 +30,6 @@ "main": "lib/queue.js",

"devDependencies": {
"async": "^2.0.0-rc.4",
"extend": "^3.0.0",
"fs-extra": "^0.30.0",
"knex": "^0.11.4",
"mocha": "^2.3.4",
"mockery": "^1.7.0",
"pg": "^4.5.5",
"sqlite3": "^3.1.4"
"mocha": "^2.3.4"
},
"dependencies": {
"better-queue-memory": "^1.0.1",
"node-eta": "^0.9.0",

@@ -42,0 +36,0 @@ "node-uuid": "^1.4.7"

var assert = require('assert');
var helper = require('./lib/helper');
var fs = require('fs-extra');
var Queue = require('../lib/queue');
var MemoryStore = require('../lib/stores/memory');
var SQLiteStore = require('../lib/stores/sqlite');
var MemoryStore = require('better-queue-memory');

@@ -288,30 +286,2 @@ describe('Complex Queue', function() {

})
it('should release lock after running (sqlite)', function (done) {
var s = new SQLiteStore();
var q = new Queue(function (n, cb) {
cb();
setTimeout(function () {
s.getRunningTasks(function (err, tasks) {
assert.ok(!Object.keys(tasks).length)
done();
})
}, 1)
}, { store: s, autoResume: true })
q.push(1);
this.q = q;
})
it('should resume running task (sqlite)', function (done) {
var self = this;
var s = new SQLiteStore();
var q1 = new Queue(function () {
var q2 = new Queue(function () {
done();
}, { store: s })
self.q2 = q2;
}, { store: s })
q1.push(1);
this.q1 = q1;
})

@@ -335,59 +305,59 @@ it('failed task should not stack overflow', function (done) {

it('drain should still work with persistent queues', function (done) {
var q = new Queue(function (n, cb) {
setTimeout(cb, 1);
}, {
store: {
type: 'sql',
dialect: 'sqlite',
path: 'testqueue.sql'
}
})
var drained = false;
q.on('drain', function () {
drained = true;
done();
});
q.push(1);
this.q = q;
})
// it('drain should still work with persistent queues', function (done) {
// var q = new Queue(function (n, cb) {
// setTimeout(cb, 1);
// }, {
// store: {
// type: 'sql',
// dialect: 'sqlite',
// path: 'testqueue.sql'
// }
// })
// var drained = false;
// q.on('drain', function () {
// drained = true;
// done();
// });
// q.push(1);
// this.q = q;
// })
it('drain should still work when there are persisted items at load time', function (done) {
var initialQueue = new Queue(function (n, cb) {
setTimeout(cb, 100);
}, {
store: {
type: 'sql',
dialect: 'sqlite',
path: 'testqueue.sql'
}
});
initialQueue.push('' + 1);
initialQueue.push('' + 2);
setTimeout(function () {
// This effectively captures the queue in a state where there were unprocessed items
fs.copySync('testqueue.sql', 'testqueue2.sql');
initialQueue.destroy();
var persistedQueue = new Queue(function (n, cb) {
setTimeout(cb, 1);
}, {
store: {
type: 'sql',
dialect: 'sqlite',
path: 'testqueue2.sql'
}
})
var drained = false;
persistedQueue.on('drain', function () {
drained = true;
});
persistedQueue.push(2);
setTimeout(function () {
persistedQueue.destroy();
// it('drain should still work when there are persisted items at load time', function (done) {
// var initialQueue = new Queue(function (n, cb) {
// setTimeout(cb, 100);
// }, {
// store: {
// type: 'sql',
// dialect: 'sqlite',
// path: 'testqueue.sql'
// }
// });
// initialQueue.push('' + 1);
// initialQueue.push('' + 2);
// setTimeout(function () {
// // This effectively captures the queue in a state where there were unprocessed items
// fs.copySync('testqueue.sql', 'testqueue2.sql');
// initialQueue.destroy();
// var persistedQueue = new Queue(function (n, cb) {
// setTimeout(cb, 1);
// }, {
// store: {
// type: 'sql',
// dialect: 'sqlite',
// path: 'testqueue2.sql'
// }
// })
// var drained = false;
// persistedQueue.on('drain', function () {
// drained = true;
// });
// persistedQueue.push(2);
// setTimeout(function () {
// persistedQueue.destroy();
assert.ok(drained);
done();
}, 140)
}, 40)
})
// assert.ok(drained);
// done();
// }, 140)
// }, 40)
// })
})

@@ -1,17 +0,9 @@

var async = require('async');
var mockery = require('mockery');
mockery.enable({ warnOnReplace: false, warnOnUnregistered: false });
mockery.registerMock('./PostgresAdapter', require('../fixtures/PostgresAdapter'));
mockery.registerMock('./SqliteAdapter', require('../fixtures/SqliteAdapter'));
exports.destroyQueues = function (done) {
async.each([this.q, this.q1, this.q2], function (q, qCB) {
if (!q) return qCB();
exports.destroyQueues = function () {
[this.q, this.q1, this.q2].forEach(function (q) {
if (!q) return;
setTimeout(function () {
q.destroy(qCB);
q.destroy();
}, 15);
}, function (err) {
if (err) console.error(err);
done();
});
};

@@ -89,4 +89,3 @@ var assert = require('assert');

// TODO: Test progress
// TODO: Test the actual stores
})
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc