@condor-labs/bullmq
Advanced tools
Comparing version 0.0.1 to 0.0.2
{ | ||
"name": "@condor-labs/bullmq", | ||
"version": "0.0.1", | ||
"version": "0.0.2", | ||
"description": "This module provide and useful helper to use bullmq library", | ||
@@ -41,2 +41,2 @@ "engines": { | ||
} | ||
} | ||
} |
@@ -1,32 +0,32 @@ | ||
const bullmq = require('bullmq') | ||
const logger = require('@condor-labs/logger') | ||
const joi = require('joi') | ||
const bullmq = require("bullmq"); | ||
const logger = require("@condor-labs/logger"); | ||
const joi = require("joi"); | ||
const queues = [] | ||
const queues = []; | ||
const self = { | ||
_settings: null, | ||
_errorMsgInvalidSetting: 'INVALID_SETTINGS', | ||
_errorMsgUndefinedSetting: 'SETTINGS_NOT_DEFINED', | ||
_errorMsgInvalidQueue: 'QUEUE_NAME_NOT_DEFINED', | ||
_errorMsgInvalidSetting: "INVALID_SETTINGS", | ||
_errorMsgUndefinedSetting: "SETTINGS_NOT_DEFINED", | ||
_errorMsgInvalidQueue: "QUEUE_NAME_NOT_DEFINED", | ||
_errorCallback: (processType) => (_job, error) => { | ||
logger.error(`Error at worker ${processType}`, error) | ||
logger.error(`Error at worker ${processType}`, error); | ||
}, | ||
_upperAndTrim: (queueName) => { | ||
if (!queueName) { | ||
throw new Error(self._errorMsgInvalidQueue) | ||
throw new Error(self._errorMsgInvalidQueue); | ||
} | ||
return queueName.toUpperCase().trim() | ||
return queueName.toUpperCase().trim(); | ||
}, | ||
_findQueue: (queueName) => { | ||
const foundQueue = queues.find((q) => { | ||
return q.name === queueName | ||
}) | ||
return q.name === queueName; | ||
}); | ||
return foundQueue | ||
return foundQueue; | ||
}, | ||
_validateSettings: (_settings) => { | ||
if (!_settings) { | ||
logger.error(self._errorMsgUndefinedSetting) | ||
return false | ||
logger.error(self._errorMsgUndefinedSetting); | ||
return false; | ||
} | ||
@@ -37,15 +37,15 @@ | ||
port: joi.number().default(6379), | ||
host: joi.string().default('127.0.0.1').required(), | ||
password: joi.string().default(null).allow('') | ||
host: joi.string().default("127.0.0.1").required(), | ||
password: joi.string().default(null).allow(""), | ||
}, | ||
options: joi.object() | ||
}) | ||
options: joi.object(), | ||
}); | ||
const { error, value } = schema.validate(_settings) | ||
const { error, value } = schema.validate(_settings); | ||
if (!error) { | ||
return true | ||
return true; | ||
} else { | ||
logger.error(`${error} - ${value}`) | ||
return false | ||
logger.error(`${error} - ${value}`); | ||
return false; | ||
} | ||
@@ -55,34 +55,35 @@ }, | ||
if (!self._validateSettings(settings)) { | ||
throw new Error(self._errorMsgInvalidSetting) | ||
throw new Error(self._errorMsgInvalidSetting); | ||
} | ||
self._settings = settings | ||
self._settings = settings; | ||
}, | ||
getQueue: (queueName) => { | ||
const name = self._upperAndTrim(queueName) | ||
const foundQueue = self._findQueue(name) | ||
const name = self._upperAndTrim(queueName); | ||
const foundQueue = self._findQueue(name); | ||
if (foundQueue) { | ||
return foundQueue.queue | ||
return foundQueue.queue; | ||
} | ||
const newQueue = new bullmq.Queue(name, { | ||
connection: self._settings.redis | ||
}) | ||
connection: self._settings.redis, | ||
}); | ||
queues.push({ name, queue: newQueue }) | ||
return newQueue | ||
queues.push({ name, queue: newQueue }); | ||
return newQueue; | ||
}, | ||
add: async (queueName, data) => { | ||
const queue = self.getQueue(queueName) | ||
await queue.add(queueName, data) | ||
const queue = self.getQueue(queueName); | ||
await queue.add(queueName, data); | ||
}, | ||
process: (queueName, handler, options) => { | ||
const worker = new bullmq.Worker(queueName, handler, { | ||
const name = self._upperAndTrim(queueName); | ||
const worker = new bullmq.Worker(name, handler, { | ||
connection: self._settings.redis, | ||
...options | ||
}) | ||
...options, | ||
}); | ||
worker.on('failed', self._errorCallback(queueName)) | ||
return worker | ||
worker.on("failed", self._errorCallback(name)); | ||
return worker; | ||
}, | ||
@@ -93,7 +94,7 @@ pushBulk: (queueName, jobs = []) => { | ||
name: queueName, | ||
...job | ||
} | ||
}) | ||
const queue = self.getQueue(self._upperAndTrim(queueName)) | ||
return queue.addBulk(_jobs) | ||
...job, | ||
}; | ||
}); | ||
const queue = self.getQueue(self._upperAndTrim(queueName)); | ||
return queue.addBulk(_jobs); | ||
}, | ||
@@ -104,6 +105,6 @@ pause: (queueName) => self.getQueue(queueName).pause(), | ||
default: () => { | ||
return bullmq | ||
} | ||
} | ||
return bullmq; | ||
}, | ||
}; | ||
module.exports = self | ||
module.exports = self; |
8405
198