Comparing version 0.1.4 to 0.2.0
{ | ||
"name": "arque", | ||
"version": "0.1.4", | ||
"version": "0.2.0", | ||
"description": "RabbitMQ based microservice framework.", | ||
@@ -36,3 +36,4 @@ "main": "build/index.js", | ||
"eslint-plugin-promise": "^3.5.0", | ||
"eslint-plugin-standard": "^2.1.1" | ||
"eslint-plugin-standard": "^2.1.1", | ||
"rand-token": "^0.3.0" | ||
}, | ||
@@ -44,3 +45,3 @@ "ava": { | ||
], | ||
"timeout": "30s", | ||
"timeout": "10s", | ||
"concurrency": 5, | ||
@@ -47,0 +48,0 @@ "failFast": true, |
# `arque` | ||
A simple microservice framework based on RabbitMQ. | ||
## `Worker` | ||
```js | ||
@@ -13,14 +11,6 @@ import Arque from 'arque'; | ||
}); | ||
``` | ||
## `Client` | ||
```js | ||
import Arque from 'arque'; | ||
import assert from 'assert'; | ||
const echo = arque.createClient('echo'); | ||
const arque = new Arque(); | ||
arque | ||
.createClient('echo') | ||
.exec('Hello World!') | ||
echo('Hello World!') | ||
.then(message => { | ||
@@ -30,16 +20,59 @@ assert.equal(message, 'Hello World!'); | ||
``` | ||
## `Arque` | ||
```js | ||
const arque = new Arque(); | ||
``` | ||
```js | ||
const arque = new Arque('amqp://localhost'); | ||
``` | ||
```js | ||
const arque = new Arque({ | ||
uri: 'amqp://localhost', | ||
prefix: null | ||
}); | ||
``` | ||
### Options | ||
* `uri` - RabbitMQ URI. Default value is `amqp://localhost`. | ||
* `prefix` - Queue name prefix. Default value is `null`. | ||
## Options | ||
### `arque.createWorker()` | ||
Creates a worker object. | ||
```js | ||
const worker = arque.createWorker('echo', async message => { | ||
return message; | ||
}); | ||
``` | ||
```js | ||
const worker = arque.createWorker({ | ||
job: 'echo', | ||
concurrency: 1 | ||
}, async message => { | ||
return message; | ||
}); | ||
``` | ||
#### Options | ||
* `job` - Job name. `Required` | ||
* `concurrency` - Maximum number of jobs that can be executed concurrently. Default value is `1`. | ||
### `Arque` | ||
* `url` - RabbitMQ URL | ||
* `prefix` - Queue name prefix | ||
### `Worker` | ||
### `arque.createClient()` | ||
Creates a client object. | ||
```js | ||
const client = arque.createClient('echo'); | ||
``` | ||
```js | ||
const client = arque.createClient({ | ||
job: 'echo', | ||
timeout: 60000 | ||
}); | ||
``` | ||
#### Options | ||
* `job` - Job name | ||
* `concurrency` - Maximum number of jobs that can be executed concurrently | ||
* `timeout` - Timeout time in milliseconds. Default value is `60000`; | ||
### `Client` | ||
* `name` - Job name | ||
* `return` - Flag to indicate wether to expect a return value | ||
* `timeout` | ||
## `Worker` | ||
### `worker.close()` | ||
Gracefully shut down the `arque` worker. | ||
## `Client` | ||
### `client.close()` | ||
Gracefully shut down the `arque` client. |
@@ -8,2 +8,3 @@ import assert from 'assert'; | ||
* @param {string} options.job | ||
* @param {string} [options.timeout] | ||
* @param {function} handler | ||
@@ -15,7 +16,9 @@ */ | ||
} | ||
assert(options.job, 'Job name not specified'); | ||
this._job = options.job; | ||
this._timeout = options.timeout || 50000; | ||
this._id = uuid.v4().replace(/-/g, ''); | ||
this._callbacks = {}; | ||
this.reset(); | ||
} | ||
@@ -31,5 +34,10 @@ | ||
reset () { | ||
delete this._assertChannel; | ||
this._callbacks = new Map(); | ||
} | ||
setConnection (connection) { | ||
this._connection = connection; | ||
delete this._assertChannel; | ||
this.reset(); | ||
} | ||
@@ -45,5 +53,6 @@ | ||
channel.consume(this.callbackQueue, async message => { | ||
const correlationId = message.properties.correlationId; | ||
channel.ack(message); | ||
const payload = JSON.parse(message.content); | ||
const callback = this._callbacks[message.properties.correlationId]; | ||
const callback = this._callbacks.get(correlationId); | ||
if (callback) { | ||
@@ -62,2 +71,5 @@ callback(payload); | ||
async exec () { | ||
if (this._closeCallback) { | ||
throw new Error('Client is closing'); | ||
} | ||
let correlationId = uuid.v4().replace(/-/g, ''); | ||
@@ -69,3 +81,3 @@ let payload = { | ||
const channel = await this.assertChannel(); | ||
channel.sendToQueue( | ||
await channel.sendToQueue( | ||
this.queue, | ||
@@ -77,12 +89,44 @@ new Buffer(JSON.stringify(payload)), | ||
return await new Promise((resolve, reject) => { | ||
this._callbacks[correlationId] = result => { | ||
if (result.result) { | ||
resolve(result.result); | ||
const timeout = setTimeout(() => { | ||
this.deleteCallback(correlationId); | ||
const error = new Error('Job timeout.'); | ||
error.code = 'TIMEOUT'; | ||
error.correlationId = correlationId; | ||
error.job = this._job; | ||
reject(error); | ||
}, this._timeout); | ||
this._callbacks.set(correlationId, payload => { | ||
clearTimeout(timeout); | ||
if (payload.result) { | ||
resolve(payload.result); | ||
} else { | ||
reject(new Error(result.error)); | ||
const error = new Error(payload.error.message); | ||
for (const key in payload.error) { | ||
if (key === 'message') { | ||
continue; | ||
} | ||
error[key] = payload.error[key]; | ||
} | ||
reject(error); | ||
} | ||
delete this._callbacks[correlationId]; | ||
}; | ||
this.deleteCallback(correlationId); | ||
}); | ||
}); | ||
} | ||
deleteCallback (correlationId) { | ||
delete this._callbacks.delete(correlationId); | ||
if (this._closeCallback && this._callbacks.size === 0) { | ||
this._closeCallback(); | ||
} | ||
} | ||
async close () { | ||
await new Promise(resolve => { | ||
this._closeCallback = resolve; | ||
}); | ||
const channel = await this.assertChannel(); | ||
await channel.close(); | ||
} | ||
} |
@@ -8,8 +8,11 @@ import amqp from 'amqplib'; | ||
export default class Arque { | ||
constructor () { | ||
let options; | ||
if (typeof arguments[0] === 'string') { | ||
options = {url: arguments[0]}; | ||
} else { | ||
options = arguments[0] || {}; | ||
/** | ||
* Constructor | ||
* @param {Object} [options] | ||
* @param {string} [options.uri] | ||
* @param {string} [options.prefix] | ||
*/ | ||
constructor (options = {}) { | ||
if (typeof options === 'string') { | ||
options = {uri: options}; | ||
} | ||
@@ -26,3 +29,3 @@ | ||
this._assertConnection = amqp | ||
.connect(this._url) | ||
.connect(this._uri) | ||
.then(connection => { | ||
@@ -59,5 +62,11 @@ connection._maxListeners = MAX_CONNECTION_LISTENERS; | ||
client.setConnection(connection); | ||
return async function () { | ||
const clientFunc = async function () { | ||
return await client.exec.apply(client, arguments); | ||
}; | ||
clientFunc.close = async function () { | ||
await client.close.apply(client); | ||
}; | ||
return clientFunc; | ||
})(); | ||
@@ -64,0 +73,0 @@ return client; |
@@ -20,2 +20,3 @@ import assert from 'assert'; | ||
this._handler = handler; | ||
this._jobs = new Map(); | ||
} | ||
@@ -35,22 +36,48 @@ | ||
channel.prefetch(this._concurrency); | ||
await channel.consume(this.queue, async message => { | ||
await channel.ack(message); | ||
this._channel = channel; | ||
const {consumerTag} = await channel.consume(this.queue, async message => { | ||
const correlationId = message.properties.correlationId; | ||
const payload = JSON.parse(message.content); | ||
let result; | ||
this._jobs.set(correlationId, payload); | ||
let response; | ||
try { | ||
result = { | ||
result: await this._handler.apply(this._handler, payload.arguments) | ||
}; | ||
const result = await this._handler.apply(this._handler, payload.arguments); | ||
response = {result}; | ||
} catch (err) { | ||
result = { | ||
error: err.message | ||
}; | ||
const error = {message: err.message}; | ||
for (const key in err) { | ||
error[key] = err[key]; | ||
} | ||
response = {error}; | ||
} finally { | ||
await channel.ack(message); | ||
} | ||
channel.sendToQueue( | ||
await channel.sendToQueue( | ||
message.properties.replyTo, | ||
new Buffer(JSON.stringify(result)), | ||
{correlationId: message.properties.correlationId}); | ||
new Buffer(JSON.stringify(response)), | ||
{correlationId}); | ||
this._jobs.delete(correlationId); | ||
if (this._closeCallback && this._jobs.size === 0) { | ||
this._closeCallback(); | ||
} | ||
}); | ||
this._consumerTag = consumerTag; | ||
} | ||
async close () { | ||
if (this._consumerTag) { | ||
const channel = this._channel; | ||
await channel.cancel(this._consumerTag); | ||
await new Promise(resolve => { | ||
this._closeCallback = resolve; | ||
}); | ||
await channel.close(); | ||
} | ||
} | ||
} |
161
test/rpc.js
import test from 'ava'; | ||
import _ from 'lodash'; | ||
import {generate as randString} from 'rand-token'; | ||
import Arque from '../src'; | ||
import {delay} from './helpers'; | ||
const arque = new Arque(); | ||
test('Should execute job and return result.', async t => { | ||
const JOB_NAME = 'echo'; | ||
const DELAY = 200; | ||
const JOB_NAME = 'echo' + randString(8); | ||
const MESSAGE = 'Hello World!'; | ||
const arque = new Arque(); | ||
await arque.createWorker(JOB_NAME, async message => { | ||
await delay(DELAY); | ||
return message; | ||
@@ -16,1 +22,152 @@ }); | ||
}); | ||
test('Should execute jobs sequentially', async t => { | ||
const DELAY = 200; | ||
const JOB_NAME = 'echo' + randString(8); | ||
await arque.createWorker(JOB_NAME, async message => { | ||
await delay(DELAY); | ||
return message; | ||
}); | ||
const echo = await arque.createClient(JOB_NAME); | ||
const timestamp = Date.now(); | ||
const result = await Promise.all(_.times(3, async index => { | ||
return await echo({index}); | ||
})); | ||
t.truthy(Date.now() - timestamp >= DELAY * 3); | ||
t.truthy(Date.now() - timestamp < DELAY * 4); | ||
t.deepEqual(result, _.times(3, index => { | ||
return {index}; | ||
})); | ||
}); | ||
test('Should execute jobs in parallel', async t => { | ||
const DELAY = 200; | ||
const JOB_NAME = 'echo' + randString(8); | ||
await arque.createWorker({ | ||
job: JOB_NAME, | ||
concurrency: 10 | ||
}, async message => { | ||
await delay(DELAY); | ||
return message; | ||
}); | ||
const echo = await arque.createClient(JOB_NAME); | ||
const timestamp = Date.now(); | ||
const result = await Promise.all(_.times(10, async index => { | ||
return await echo({index}); | ||
})); | ||
t.truthy(Date.now() - timestamp >= DELAY); | ||
t.truthy(Date.now() - timestamp < DELAY * 2); | ||
t.deepEqual(result, _.times(10, index => { | ||
return {index}; | ||
})); | ||
}); | ||
test('Should distribute jobs to multiple workers', async t => { | ||
const DELAY = 200; | ||
const JOB_NAME = 'echo' + randString(8); | ||
const counts = []; | ||
await Promise.all(_.times(5, async index => { | ||
await arque.createWorker({ | ||
job: JOB_NAME, | ||
concurrency: 4 | ||
}, async message => { | ||
counts[index] = (counts[index] || 0) + 1; | ||
await delay(DELAY); | ||
return message; | ||
}); | ||
})); | ||
const echo = await arque.createClient(JOB_NAME); | ||
const timestamp = Date.now(); | ||
const result = await Promise.all(_.times(20, async index => { | ||
return await echo({index}); | ||
})); | ||
t.truthy(Date.now() - timestamp >= DELAY); | ||
t.truthy(Date.now() - timestamp < DELAY * 2); | ||
t.deepEqual(result, _.times(20, index => { | ||
return {index}; | ||
})); | ||
t.deepEqual(counts, [4, 4, 4, 4, 4]); | ||
}); | ||
test('Should close client gracefully.', async t => { | ||
const DELAY = 200; | ||
const JOB_NAME = 'echo' + randString(8); | ||
await arque.createWorker({ | ||
concurrency: 5, | ||
job: JOB_NAME | ||
}, async message => { | ||
await delay(DELAY); | ||
return message; | ||
}); | ||
const echo = await arque.createClient(JOB_NAME); | ||
const promise = Promise.all(_.times(5, async index => { | ||
return await echo({index}); | ||
})); | ||
const timestamp = Date.now(); | ||
await echo.close(); | ||
t.truthy(Date.now() - timestamp >= DELAY); | ||
t.truthy(Date.now() - timestamp < DELAY * 2); | ||
t.deepEqual(await promise, _.times(5, index => { | ||
return {index}; | ||
})); | ||
}); | ||
test('Should close worker gracefully.', async t => { | ||
const DELAY = 200; | ||
const JOB_NAME = 'echo' + randString(8); | ||
let count = 0; | ||
let receiveCallback; | ||
const worker = await arque.createWorker({ | ||
concurrency: 5, | ||
job: JOB_NAME | ||
}, async message => { | ||
count++; | ||
if (count >= 5) { | ||
receiveCallback(); | ||
} | ||
await delay(DELAY); | ||
return message; | ||
}); | ||
const echo = await arque.createClient(JOB_NAME); | ||
const promise = Promise.all(_.times(5, async index => { | ||
return await echo({index}); | ||
})); | ||
await new Promise(resolve => { | ||
receiveCallback = resolve; | ||
}); | ||
const timestamp = Date.now(); | ||
await worker.close(); | ||
t.truthy(Date.now() - timestamp >= DELAY); | ||
t.truthy(Date.now() - timestamp < DELAY * 2); | ||
t.deepEqual(await promise, _.times(5, index => { | ||
return {index}; | ||
})); | ||
}); | ||
test('Should handle error correctly', async t => { | ||
const JOB_NAME = 'echo' + randString(8); | ||
await arque.createWorker(JOB_NAME, async () => { | ||
const error = new Error('Error'); | ||
error.code = 'ERROR'; | ||
throw error; | ||
}); | ||
const echo = await arque.createClient(JOB_NAME); | ||
let error = await t.throws(echo()); | ||
t.is(error.message, 'Error'); | ||
t.is(error.code, 'ERROR'); | ||
}); | ||
test('Should handle timeout correctly', async t => { | ||
const DELAY = 1000; | ||
const JOB_NAME = 'echo' + randString(8); | ||
await arque.createWorker(JOB_NAME, async () => { | ||
await delay(DELAY); | ||
}); | ||
const echo = await arque.createClient({ | ||
job: JOB_NAME, | ||
timeout: 500 | ||
}); | ||
let error = await t.throws(echo()); | ||
t.is(error.code, 'TIMEOUT'); | ||
}); |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
135389
17
571
77
10