amqp-delegate
Advanced tools
Comparing version 1.1.1 to 1.1.2
@@ -44,4 +44,4 @@ # How to contribute to this project | ||
6. Respond to any comments as appropriate, making changes and `git push` ing further changes as appropriate. | ||
7. When all comments are dealt and the PR finally gets a :+1: from someone else then merge the PR. _Note we will not be using the `git flow feature finish`_ option as that merges into develop automatically without the option for review. [see this stackexchange for more on that](http://programmers.stackexchange.com/questions/187723/code-review-with-git-flow-and-github). | ||
8. In your command-line `git checkout develop` then `git pull upstream develop` to get the latest code and `git branch -D feature/{branchname}` to delete the old feature branch. | ||
7. When all comments are dealt and the PR finally gets a :+1: from someone else then merge the PR. **Note** we will _not_ be using the `git flow feature finish` option as that merges into develop automatically without the option for review. [see this stackexchange for more on that](http://programmers.stackexchange.com/questions/187723/code-review-with-git-flow-and-github). | ||
8. In your command-line `git checkout develop` then `git pull upstream develop` to get the latest code and `git branch -D feature/{branch_name}` to delete the old feature branch. | ||
@@ -48,0 +48,0 @@ #### Hotfixes and Support branches |
{ | ||
"name": "amqp-delegate", | ||
"version": "1.1.1", | ||
"version": "1.1.2", | ||
"description": "A remote worker system that uses AMQP to coordinate jobs.", | ||
@@ -53,3 +53,3 @@ "author": "Dave Sag <davesag@gmail.com>", | ||
"amqplib": "^0.6.0", | ||
"uuid": "^8.2.0" | ||
"uuid": "^8.3.1" | ||
}, | ||
@@ -61,10 +61,10 @@ "devDependencies": { | ||
"@stryker-mutator/mocha-runner": "^3.3.1", | ||
"ajv": "^6.12.3", | ||
"ajv": "^6.12.5", | ||
"chai": "^4.2.0", | ||
"chai-as-promised": "^7.1.1", | ||
"eslint": "^7.5.0", | ||
"eslint-config-prettier": "^6.11.0", | ||
"eslint": "^7.10.0", | ||
"eslint-config-prettier": "^6.12.0", | ||
"eslint-config-standard": "^14.1.1", | ||
"eslint-plugin-import": "^2.22.0", | ||
"eslint-plugin-mocha": "^7.0.1", | ||
"eslint-plugin-import": "^2.22.1", | ||
"eslint-plugin-mocha": "^8.0.0", | ||
"eslint-plugin-node": "^11.1.0", | ||
@@ -74,13 +74,13 @@ "eslint-plugin-prettier": "^3.1.4", | ||
"eslint-plugin-standard": "^4.0.1", | ||
"faker": "^4.1.0", | ||
"husky": "^4.2.5", | ||
"lint-staged": "^10.2.11", | ||
"mocha": "^8.0.1", | ||
"faker": "^5.1.0", | ||
"husky": "^4.3.0", | ||
"lint-staged": "^10.4.0", | ||
"mocha": "^8.1.3", | ||
"nyc": "^15.1.0", | ||
"prettier": "^2.0.5", | ||
"prettier": "^2.1.2", | ||
"proxyquire": "^2.1.3", | ||
"sinon": "^9.0.2", | ||
"sinon": "^9.2.0", | ||
"sinon-chai": "^3.5.0", | ||
"snyk": "^1.362.1", | ||
"supertest": "^4.0.2", | ||
"snyk": "^1.410.2", | ||
"supertest": "^5.0.0", | ||
"wait-until": "0.0.2" | ||
@@ -93,3 +93,4 @@ }, | ||
"arrowParens": "avoid", | ||
"trailingComma": "none" | ||
"trailingComma": "none", | ||
"printWidth": 100 | ||
}, | ||
@@ -96,0 +97,0 @@ "lint-staged": { |
@@ -78,4 +78,3 @@ # amqp-delegate | ||
const task = (a, b) => | ||
new Promise(resolve => setTimeout(() => resolve(a + b), 10)) | ||
const task = (a, b) => new Promise(resolve => setTimeout(() => resolve(a + b), 10)) | ||
@@ -82,0 +81,0 @@ const worker = makeWorker({ |
/** | ||
* Attaches onError and onClose event handlers | ||
* to the supplied connection. | ||
* | ||
* @param connection - An amqp connection | ||
* @param handers - The onError and onClose event handler functions. | ||
* @param handlers - The onError and onClose event handler functions. | ||
*/ | ||
@@ -7,0 +8,0 @@ const attachEvents = (connection, { onError, onClose }) => { |
@@ -7,4 +7,3 @@ const ERRORS = { | ||
QUEUE_NOT_STARTED: 'Message Queue has not been started', | ||
TASK_MISSING: | ||
'You must provide an async pure function as a task for the worker to perform', | ||
TASK_MISSING: 'You must provide an async pure function as a task for the worker to perform', | ||
WRONG_CORRELATION_ID: 'The provided correlationId is incorrect' | ||
@@ -11,0 +10,0 @@ } |
const amqp = require('amqplib') | ||
const { v4 } = require('uuid') | ||
const { | ||
QUEUE_NOT_STARTED, | ||
QUEUE_ALREADY_STARTED, | ||
NOT_CONNECTED | ||
} = require('./errors') | ||
const { QUEUE_NOT_STARTED, QUEUE_ALREADY_STARTED, NOT_CONNECTED } = require('./errors') | ||
const defaults = require('./defaults') | ||
@@ -15,11 +11,12 @@ const attachEvents = require('./attachEvents') | ||
* Create a Job Delegator with the given options. | ||
* @param options | ||
* | ||
* @param {Object} options | ||
* - exchange The name of the service exchange (optional. Defaults to '') | ||
* - url The url of the AQMP server to use. (Optional. Defaults to 'amqp://localhost') | ||
* - onError a hander to handle connection errors (optional) | ||
* - url The url of the AMQP server to use. (Optional. Defaults to 'amqp://localhost') | ||
* - onError a handler to handle connection errors (optional) | ||
* - onClose a handler to handle connection closed events (optional) | ||
* @return A Delegator | ||
* @return A Delegator function | ||
*/ | ||
const makeDelegator = (options = {}) => { | ||
const _options = { | ||
const opts = { | ||
...defaults, | ||
@@ -29,3 +26,3 @@ ...options | ||
const { url, onError, onClose } = _options | ||
const { url, onError, onClose } = opts | ||
@@ -32,0 +29,0 @@ let connection |
const amqp = require('amqplib') | ||
const { | ||
NAME_MISSING, | ||
NOT_CONNECTED, | ||
QUEUE_ALREADY_STARTED, | ||
TASK_MISSING | ||
} = require('./errors') | ||
const { NAME_MISSING, NOT_CONNECTED, QUEUE_ALREADY_STARTED, TASK_MISSING } = require('./errors') | ||
const defaults = require('./defaults') | ||
@@ -14,7 +9,8 @@ const attachEvents = require('./attachEvents') | ||
* Create a Worker with the given options. | ||
* | ||
* @param options | ||
* - name The name of the worker. (required) | ||
* - task A pure async function that does the work (required) | ||
* - url The url of the AQMP server to use. (optional, defaults to 'amqp://localhost') | ||
* - onError a hander to handle connection errors (optional) | ||
* - url The url of the AMQP server to use. (optional, defaults to 'amqp://localhost') | ||
* - onError a handler to handle connection errors (optional) | ||
* - onClose a handler to handle connection closed events (optional) | ||
@@ -21,0 +17,0 @@ * @return A Worker |
const messageCorrelator = require('./messageCorrelator') | ||
/** | ||
* Creates a function that sends a message to invoke the remote task and waits for a reply. | ||
* | ||
* @param {string} correlationId The ID You expect to get back | ||
* @param {Object} channel The channel to listen on | ||
* @param {string} replyTo The queue to reply to | ||
*/ | ||
const invoker = (correlationId, channel, replyTo) => async (name, params) => | ||
new Promise((resolve, reject) => { | ||
channel.consume( | ||
replyTo, | ||
messageCorrelator(correlationId, resolve, reject), | ||
{ noAck: true } | ||
) | ||
channel.consume(replyTo, messageCorrelator(correlationId, resolve, reject), { noAck: true }) | ||
@@ -11,0 +14,0 @@ channel.sendToQueue(name, Buffer.from(JSON.stringify(params)), { |
const { WRONG_CORRELATION_ID } = require('../errors') | ||
/** | ||
* The messageCorrelator returns a function that checks the message's correlationId to see | ||
* if it matches the one it's expecting. | ||
* | ||
* If they do not match then the function does nothing. | ||
* | ||
* If they do match then the function attempts to parse the message content and resolve with | ||
* the result. If parsing fails it will reject with the error. | ||
* | ||
* @param {string} correlationId The ID you are waiting on | ||
* @param {Function} resolve The a resolve function | ||
* @param {Function} reject The a reject function | ||
* @returns Function that correlates the received message with the curried correlationId. | ||
*/ | ||
const messageCorrelator = (correlationId, resolve, reject) => message => { | ||
@@ -4,0 +18,0 @@ if (message.properties.correlationId === correlationId) { |
@@ -0,1 +1,9 @@ | ||
/** | ||
* Creates a task runner function that receives a message, checks to ensure it's the message | ||
* we want, runs the curried task, then responds to the message's replyTo with the result of the task. | ||
* | ||
* @param {Function} task — An async function to be run when a suitable message is received. | ||
* @param {Object} channel — The channel to use for the reply. | ||
* @returns Function that runs the curried task. | ||
*/ | ||
const taskRunner = (channel, task) => async message => { | ||
@@ -5,7 +13,5 @@ try { | ||
const result = await task(...params) | ||
await channel.sendToQueue( | ||
message.properties.replyTo, | ||
Buffer.from(JSON.stringify(result)), | ||
{ correlationId: message.properties.correlationId } | ||
) | ||
await channel.sendToQueue(message.properties.replyTo, Buffer.from(JSON.stringify(result)), { | ||
correlationId: message.properties.correlationId | ||
}) | ||
channel.ack(message) | ||
@@ -12,0 +18,0 @@ return result |
23112
211
162
Updateduuid@^8.3.1