Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

arque

Package Overview
Dependencies
Maintainers
1
Versions
72
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

arque - npm Package Compare versions

Comparing version 0.1.4 to 0.2.0

yarn.lock

7

package.json
{
"name": "arque",
"version": "0.1.4",
"version": "0.2.0",
"description": "RabbitMQ based microservice framework.",

@@ -36,3 +36,4 @@ "main": "build/index.js",

"eslint-plugin-promise": "^3.5.0",
"eslint-plugin-standard": "^2.1.1"
"eslint-plugin-standard": "^2.1.1",
"rand-token": "^0.3.0"
},

@@ -44,3 +45,3 @@ "ava": {

],
"timeout": "30s",
"timeout": "10s",
"concurrency": 5,

@@ -47,0 +48,0 @@ "failFast": true,

# `arque`
A simple microservice framework based on RabbitMQ.
## `Worker`
```js

@@ -13,14 +11,6 @@ import Arque from 'arque';

});
```
## `Client`
```js
import Arque from 'arque';
import assert from 'assert';
const echo = arque.createClient('echo');
const arque = new Arque();
arque
.createClient('echo')
.exec('Hello World!')
echo('Hello World!')
.then(message => {

@@ -30,16 +20,59 @@ assert.equal(message, 'Hello World!');

```
## `Arque`
```js
const arque = new Arque();
```
```js
const arque = new Arque('amqp://localhost');
```
```js
const arque = new Arque({
uri: 'amqp://localhost',
prefix: null
});
```
### Options
* `uri` - RabbitMQ URI. Default value is `amqp://localhost`.
* `prefix` - Queue name prefix. Default value is `null`.
## Options
### `arque.createWorker()`
Creates a worker object.
```js
const worker = arque.createWorker('echo', async message => {
return message;
});
```
```js
const worker = arque.createWorker({
job: 'echo',
concurrency: 1
}, async message => {
return message;
});
```
#### Options
* `job` - Job name. `Required`
* `concurrency` - Maximum number of jobs that can be executed concurrently. Default value is `1`.
### `Arque`
* `url` - RabbitMQ URL
* `prefix` - Queue name prefix
### `Worker`
### `arque.createClient()`
Creates a client object.
```js
const client = arque.createClient('echo');
```
```js
const client = arque.createClient({
job: 'echo',
timeout: 60000
});
```
#### Options
* `job` - Job name
* `concurrency` - Maximum number of jobs that can be executed concurrently
* `timeout` - Timeout time in milliseconds. Default value is `60000`;
### `Client`
* `name` - Job name
* `return` - Flag to indicate wether to expect a return value
* `timeout`
## `Worker`
### `worker.close()`
Gracefully shut down the `arque` worker.
## `Client`
### `client.close()`
Gracefully shut down the `arque` client.

@@ -8,2 +8,3 @@ import assert from 'assert';

* @param {string} options.job
* @param {string} [options.timeout]
* @param {function} handler

@@ -15,7 +16,9 @@ */

}
assert(options.job, 'Job name not specified');
this._job = options.job;
this._timeout = options.timeout || 50000;
this._id = uuid.v4().replace(/-/g, '');
this._callbacks = {};
this.reset();
}

@@ -31,5 +34,10 @@

reset () {
delete this._assertChannel;
this._callbacks = new Map();
}
setConnection (connection) {
this._connection = connection;
delete this._assertChannel;
this.reset();
}

@@ -45,5 +53,6 @@

channel.consume(this.callbackQueue, async message => {
const correlationId = message.properties.correlationId;
channel.ack(message);
const payload = JSON.parse(message.content);
const callback = this._callbacks[message.properties.correlationId];
const callback = this._callbacks.get(correlationId);
if (callback) {

@@ -62,2 +71,5 @@ callback(payload);

async exec () {
if (this._closeCallback) {
throw new Error('Client is closing');
}
let correlationId = uuid.v4().replace(/-/g, '');

@@ -69,3 +81,3 @@ let payload = {

const channel = await this.assertChannel();
channel.sendToQueue(
await channel.sendToQueue(
this.queue,

@@ -77,12 +89,44 @@ new Buffer(JSON.stringify(payload)),

return await new Promise((resolve, reject) => {
this._callbacks[correlationId] = result => {
if (result.result) {
resolve(result.result);
const timeout = setTimeout(() => {
this.deleteCallback(correlationId);
const error = new Error('Job timeout.');
error.code = 'TIMEOUT';
error.correlationId = correlationId;
error.job = this._job;
reject(error);
}, this._timeout);
this._callbacks.set(correlationId, payload => {
clearTimeout(timeout);
if (payload.result) {
resolve(payload.result);
} else {
reject(new Error(result.error));
const error = new Error(payload.error.message);
for (const key in payload.error) {
if (key === 'message') {
continue;
}
error[key] = payload.error[key];
}
reject(error);
}
delete this._callbacks[correlationId];
};
this.deleteCallback(correlationId);
});
});
}
deleteCallback (correlationId) {
delete this._callbacks.delete(correlationId);
if (this._closeCallback && this._callbacks.size === 0) {
this._closeCallback();
}
}
async close () {
await new Promise(resolve => {
this._closeCallback = resolve;
});
const channel = await this.assertChannel();
await channel.close();
}
}

@@ -8,8 +8,11 @@ import amqp from 'amqplib';

export default class Arque {
constructor () {
let options;
if (typeof arguments[0] === 'string') {
options = {url: arguments[0]};
} else {
options = arguments[0] || {};
/**
* Constructor
* @param {Object} [options]
* @param {string} [options.uri]
* @param {string} [options.prefix]
*/
constructor (options = {}) {
if (typeof options === 'string') {
options = {uri: options};
}

@@ -26,3 +29,3 @@

this._assertConnection = amqp
.connect(this._url)
.connect(this._uri)
.then(connection => {

@@ -59,5 +62,11 @@ connection._maxListeners = MAX_CONNECTION_LISTENERS;

client.setConnection(connection);
return async function () {
const clientFunc = async function () {
return await client.exec.apply(client, arguments);
};
clientFunc.close = async function () {
await client.close.apply(client);
};
return clientFunc;
})();

@@ -64,0 +73,0 @@ return client;

@@ -20,2 +20,3 @@ import assert from 'assert';

this._handler = handler;
this._jobs = new Map();
}

@@ -35,22 +36,48 @@

channel.prefetch(this._concurrency);
await channel.consume(this.queue, async message => {
await channel.ack(message);
this._channel = channel;
const {consumerTag} = await channel.consume(this.queue, async message => {
const correlationId = message.properties.correlationId;
const payload = JSON.parse(message.content);
let result;
this._jobs.set(correlationId, payload);
let response;
try {
result = {
result: await this._handler.apply(this._handler, payload.arguments)
};
const result = await this._handler.apply(this._handler, payload.arguments);
response = {result};
} catch (err) {
result = {
error: err.message
};
const error = {message: err.message};
for (const key in err) {
error[key] = err[key];
}
response = {error};
} finally {
await channel.ack(message);
}
channel.sendToQueue(
await channel.sendToQueue(
message.properties.replyTo,
new Buffer(JSON.stringify(result)),
{correlationId: message.properties.correlationId});
new Buffer(JSON.stringify(response)),
{correlationId});
this._jobs.delete(correlationId);
if (this._closeCallback && this._jobs.size === 0) {
this._closeCallback();
}
});
this._consumerTag = consumerTag;
}
async close () {
if (this._consumerTag) {
const channel = this._channel;
await channel.cancel(this._consumerTag);
await new Promise(resolve => {
this._closeCallback = resolve;
});
await channel.close();
}
}
}
import test from 'ava';
import _ from 'lodash';
import {generate as randString} from 'rand-token';
import Arque from '../src';
import {delay} from './helpers';
const arque = new Arque();
test('Should execute job and return result.', async t => {
const JOB_NAME = 'echo';
const DELAY = 200;
const JOB_NAME = 'echo' + randString(8);
const MESSAGE = 'Hello World!';
const arque = new Arque();
await arque.createWorker(JOB_NAME, async message => {
await delay(DELAY);
return message;

@@ -16,1 +22,152 @@ });

});
test('Should execute jobs sequentially', async t => {
const DELAY = 200;
const JOB_NAME = 'echo' + randString(8);
await arque.createWorker(JOB_NAME, async message => {
await delay(DELAY);
return message;
});
const echo = await arque.createClient(JOB_NAME);
const timestamp = Date.now();
const result = await Promise.all(_.times(3, async index => {
return await echo({index});
}));
t.truthy(Date.now() - timestamp >= DELAY * 3);
t.truthy(Date.now() - timestamp < DELAY * 4);
t.deepEqual(result, _.times(3, index => {
return {index};
}));
});
test('Should execute jobs in parallel', async t => {
const DELAY = 200;
const JOB_NAME = 'echo' + randString(8);
await arque.createWorker({
job: JOB_NAME,
concurrency: 10
}, async message => {
await delay(DELAY);
return message;
});
const echo = await arque.createClient(JOB_NAME);
const timestamp = Date.now();
const result = await Promise.all(_.times(10, async index => {
return await echo({index});
}));
t.truthy(Date.now() - timestamp >= DELAY);
t.truthy(Date.now() - timestamp < DELAY * 2);
t.deepEqual(result, _.times(10, index => {
return {index};
}));
});
test('Should distribute jobs to multiple workers', async t => {
const DELAY = 200;
const JOB_NAME = 'echo' + randString(8);
const counts = [];
await Promise.all(_.times(5, async index => {
await arque.createWorker({
job: JOB_NAME,
concurrency: 4
}, async message => {
counts[index] = (counts[index] || 0) + 1;
await delay(DELAY);
return message;
});
}));
const echo = await arque.createClient(JOB_NAME);
const timestamp = Date.now();
const result = await Promise.all(_.times(20, async index => {
return await echo({index});
}));
t.truthy(Date.now() - timestamp >= DELAY);
t.truthy(Date.now() - timestamp < DELAY * 2);
t.deepEqual(result, _.times(20, index => {
return {index};
}));
t.deepEqual(counts, [4, 4, 4, 4, 4]);
});
test('Should close client gracefully.', async t => {
const DELAY = 200;
const JOB_NAME = 'echo' + randString(8);
await arque.createWorker({
concurrency: 5,
job: JOB_NAME
}, async message => {
await delay(DELAY);
return message;
});
const echo = await arque.createClient(JOB_NAME);
const promise = Promise.all(_.times(5, async index => {
return await echo({index});
}));
const timestamp = Date.now();
await echo.close();
t.truthy(Date.now() - timestamp >= DELAY);
t.truthy(Date.now() - timestamp < DELAY * 2);
t.deepEqual(await promise, _.times(5, index => {
return {index};
}));
});
test('Should close worker gracefully.', async t => {
const DELAY = 200;
const JOB_NAME = 'echo' + randString(8);
let count = 0;
let receiveCallback;
const worker = await arque.createWorker({
concurrency: 5,
job: JOB_NAME
}, async message => {
count++;
if (count >= 5) {
receiveCallback();
}
await delay(DELAY);
return message;
});
const echo = await arque.createClient(JOB_NAME);
const promise = Promise.all(_.times(5, async index => {
return await echo({index});
}));
await new Promise(resolve => {
receiveCallback = resolve;
});
const timestamp = Date.now();
await worker.close();
t.truthy(Date.now() - timestamp >= DELAY);
t.truthy(Date.now() - timestamp < DELAY * 2);
t.deepEqual(await promise, _.times(5, index => {
return {index};
}));
});
test('Should handle error correctly', async t => {
const JOB_NAME = 'echo' + randString(8);
await arque.createWorker(JOB_NAME, async () => {
const error = new Error('Error');
error.code = 'ERROR';
throw error;
});
const echo = await arque.createClient(JOB_NAME);
let error = await t.throws(echo());
t.is(error.message, 'Error');
t.is(error.code, 'ERROR');
});
test('Should handle timeout correctly', async t => {
const DELAY = 1000;
const JOB_NAME = 'echo' + randString(8);
await arque.createWorker(JOB_NAME, async () => {
await delay(DELAY);
});
const echo = await arque.createClient({
job: JOB_NAME,
timeout: 500
});
let error = await t.throws(echo());
t.is(error.code, 'TIMEOUT');
});
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc