better-queue
Advanced tools
Comparing version 3.7.5 to 3.7.6
@@ -85,3 +85,3 @@ var uuid = require('node-uuid'); | ||
// Initialize Storage | ||
self.use(opts.store || 'memory'); | ||
self.use(opts.store || 'sql'); | ||
if (!self._store) { | ||
@@ -88,0 +88,0 @@ throw new Error('Queue cannot continue without a valid store.') |
@@ -24,23 +24,44 @@ var _ = require('lodash'); | ||
self.disconnect = done; | ||
cb(null, client); | ||
self.initialize(function (err) { | ||
if (err) return cb(err); | ||
cb(null, client); | ||
}); | ||
}); | ||
}; | ||
// http://stackoverflow.com/questions/1109061/insert-on-duplicate-update-in-postgresql | ||
PostgresAdapter.prototype.initialize = function (cb) { | ||
var sql = ' \n\ | ||
CREATE FUNCTION upsert_' + this.tableName + '(_id TEXT, _lock TEXT, _task TEXT, _priority NUMERIC) RETURNS VOID AS \n\ | ||
$$ \n\ | ||
BEGIN \n\ | ||
LOOP \n\ | ||
-- first try to update the key \n\ | ||
-- note that "id" must be unique \n\ | ||
UPDATE ' + this.tableName + ' SET lock=_lock, task=_task, priority=_priority WHERE id=_id; \n\ | ||
IF found THEN \n\ | ||
RETURN; \n\ | ||
END IF; \n\ | ||
-- not there, so try to insert the key \n\ | ||
-- if someone else inserts the same key concurrently, \n\ | ||
-- we could get a unique-key failure \n\ | ||
BEGIN \n\ | ||
INSERT INTO ' + this.tableName + ' (id, lock, task, priority) VALUES (_id, _lock, _task, _priority); \n\ | ||
RETURN; \n\ | ||
EXCEPTION WHEN unique_violation THEN \n\ | ||
-- do nothing, and loop to try the UPDATE again \n\ | ||
END; \n\ | ||
END LOOP; \n\ | ||
END; \n\ | ||
$$ \n\ | ||
LANGUAGE plpgsql; \n\ | ||
'; | ||
this.run(sql, function (err) { | ||
if (err) console.error('Error initializing: ', err); | ||
cb(err); | ||
}); | ||
}; | ||
PostgresAdapter.prototype.upsert = function (properties, cb) { | ||
var keys = Object.keys(properties); | ||
var values = keys.map(function (k) { | ||
var value = properties[k]; | ||
return typeof(value) === 'string' ? util.format("'%s'", value) : value; | ||
}); | ||
var sql = util.format('INSERT INTO %s (%s)', this.tableName, keys.join(',')); | ||
sql += util.format(' VALUES (%s)', values.join(',')); | ||
sql += ' ON CONFLICT (id) DO UPDATE SET '; | ||
var updates = []; | ||
_.zip(keys, values).forEach(function (kv) { | ||
var key = kv[0]; | ||
var value = kv[1]; | ||
if (key === 'id') return; | ||
updates.push(util.format('%s=%s', key, value)); | ||
}); | ||
sql += updates.join(','); | ||
var sql = util.format("SELECT upsert_%s('%s', '%s', '%s', %s)", this.tableName, properties.id, properties.lock, properties.task, properties.priority); | ||
this.run(sql, cb); | ||
@@ -78,5 +99,9 @@ }; | ||
var self = this; | ||
self.adapter.query(util.format('DROP TABLE IF EXISTS %s', this.tableName), cb); | ||
if (!self.adapter) return cb(); | ||
self.adapter.query(util.format('DROP FUNCTION IF EXISTS upsert_%s(text, text, text, numeric)', self.tableName), function (err) { | ||
if (err) return cb(err); | ||
self.adapter.query(util.format('DROP TABLE IF EXISTS %s', self.tableName), cb); | ||
}); | ||
}; | ||
module.exports = PostgresAdapter; |
@@ -14,3 +14,3 @@ var _ = require('lodash'); | ||
var dialect = opts.dialect || 'sqlite'; | ||
var dialect = opts.dialect || 'postgres'; | ||
if (dialect === 'sqlite') this.adapter = new SqliteAdapter(opts); | ||
@@ -17,0 +17,0 @@ else if (dialect === 'postgres') this.adapter = new PostgresAdapter(opts); |
{ | ||
"name": "better-queue", | ||
"version": "3.7.5", | ||
"version": "3.7.6", | ||
"description": "Better Queue for NodeJS", | ||
@@ -5,0 +5,0 @@ "main": "lib/queue.js", |
@@ -17,3 +17,3 @@ var PostgresAdapter = require('../../lib/stores/PostgresAdapter'); | ||
this.adapter = g_client; | ||
return cb(); | ||
return PostgresAdapter.prototype.initialize.call(this, cb); | ||
} | ||
@@ -20,0 +20,0 @@ |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Long strings
Supply chain riskContains long string literals, which may be a sign of obfuscated or packed code.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
90486
2338
2