You're Invited: Meet the Socket team at BSidesSF and RSAC - April 27 - May 1.RSVP
Socket
Sign inDemoInstall
Socket

skein-rpc

Package Overview
Dependencies
Maintainers
1
Versions
2
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

skein-rpc - npm Package Compare versions

Comparing version

to
0.3.1

1

lib/client.js

@@ -30,2 +30,3 @@ // == Imports ===============================================================

// FUTURE: Make these all async and wait on init automatically.
rpc(exchangeName, routingKey, options) {

@@ -32,0 +33,0 @@ return new RPC(exchangeName, routingKey, this.createOptions(options));

23

lib/conduit.js

@@ -28,8 +28,10 @@ // == Imports ===============================================================

this.init = this.connected.init.then(() => {
this.channel = this.connected.channel;
this.init = (async () => {
await this.connected.init;
return this.connected.prepareQueue(this.queueName, this.exchangeName);
}).then(() => {
return this.channel.consume(this.queueName, msg => {
this.channel = await this.connected.createChannel();
await this.connected.prepareQueue(this.queueName, this.exchangeName);
let consumer = await this.channel.consume(this.queueName, msg => {
this.channel.ack(msg);

@@ -39,5 +41,5 @@

});
}).then(consumer => {
this.consumerTag = consumer.consumerTag;
});
})();
}

@@ -51,3 +53,8 @@

if (this.consumerTag) {
this.channel.cancel(this.consumerTag);
try {
this.channel.close();
}
catch (err) {
// Close failed, but this is being shut down anyway.
}
}

@@ -54,0 +61,0 @@

@@ -59,3 +59,3 @@ // == Imports ===============================================================

if (options) {
var data = require(options);
let data = require(options);

@@ -74,3 +74,3 @@ merge(this, data.defaults, data[this.env()]);

uri() {
var uri = 'amqp://';
let uri = 'amqp://';

@@ -77,0 +77,0 @@ if (this.username && this.username != defaults.username)

@@ -21,3 +21,3 @@ // == Imports ===============================================================

static async open(options) {
var connected = new Connected(options);
let connected = new Connected(options);

@@ -34,12 +34,16 @@ await connected.init;

this.init = (init || Promise.resolve()).then(() => {
this.init = (async () => {
if (init) {
await init;
}
if (options && options.connection) {
this.connection = options.connection;
++this.connection.__refs;
}
else {
return RabbitMQ.connect(this.config).then(connection => {
this.connection = connection;
});
this.connection = await RabbitMQ.connect(this.config);
this.connection.__refs = 1;
}
}).then(() => {
if (options && options.channel) {

@@ -49,7 +53,5 @@ this.channel = options.channel;

else {
return this.connection.createChannel().then(channel => {
this.channel = channel;
});
this.channel = await this.connection.createChannel();
}
});
})();

@@ -59,8 +61,30 @@ this.context = new Context(options);

close() {
if (this.channel) {
this.channel.close();
async createChannel() {
return this.connection.createChannel();
}
async close() {
let channel = this.channel;
if (channel) {
delete this.channel;
try {
await channel.close();
}
catch (err) {
/// Close attempt failed, but must carry on.
}
}
--this.connection.__refs;
if (!this.connection.__refs) {
let connection = this.connection;
delete this.connection;
return connection.close();
}
}

@@ -98,3 +122,3 @@

assertQueue(name, options) {
var autoDelete = options && options.autoDelete;
let autoDelete = options && options.autoDelete;

@@ -125,4 +149,4 @@ return this.channel.assertQueue(

publishAsJson(exchangeName, routingKey, data, options) {
return this.channel.publish(
async publishAsJson(exchangeName, routingKey, data, options) {
await this.channel.publish(
exchangeName,

@@ -140,20 +164,25 @@ routingKey,

consumeAsPromise(replyQueue, action) {
return new Promise((resolve, reject) => {
if (!replyQueue) {
replyQueue = this.context.ident();
}
async consumeAsPromise(replyQueue, action) {
return new Promise(async (resolve, reject) => {
try {
if (!replyQueue) {
replyQueue = this.context.ident();
}
await this.channel.assertQueue(replyQueue, {
exclusive: true,
autoDelete: true
});
this.channel.assertQueue(replyQueue, {
exclusive: true,
autoDelete: true
}).then(() => {
this.channel.consume(replyQueue, msg => {
await this.channel.consume(replyQueue, msg => {
resolve(msg);
});
}).then(() => {
if (action) {
action();
}
})
}
catch (err) {
reject(err);
}
});

@@ -163,3 +192,3 @@ }

async readStream(queueName, exchangeName, options) {
var stream = new Readable(this, queueName, exchangeName, options);
let stream = new Readable(this, queueName, exchangeName, options);

@@ -172,3 +201,3 @@ await stream.init;

async writeStream(routingKey, exchangeName, options) {
var stream = new Writable(this, routingKey, exchangeName, options);
let stream = new Writable(this, routingKey, exchangeName, options);

@@ -175,0 +204,0 @@ await stream.init;

@@ -5,3 +5,3 @@ // == Imports ===============================================================

const uuid = require('uuid/v4');
const sprintf = require('sprintf');
const { sprintf } = require('sprintf-js');

@@ -12,3 +12,3 @@ const processName = require('./support').processName;

var sequence = 0;
let sequence = 0;

@@ -15,0 +15,0 @@ class Context {

@@ -16,5 +16,5 @@ // == Imports ===============================================================

handle(json) {
var payload = JSON.parse(json);
let payload = JSON.parse(json);
var method = this[payload.method];
let method = this[payload.method];

@@ -21,0 +21,0 @@ if (typeof(method) != 'function') {

@@ -6,3 +6,3 @@ // == Imports ===============================================================

function connect(config) {
var driver = require(config.driver);
let driver = require(config.driver);

@@ -9,0 +9,0 @@ return driver.connect(config.uri());

@@ -29,16 +29,18 @@ // == Imports ===============================================================

var obj = { };
let obj = { };
var connected = obj.__connected = new Connected(new Config(options.config));
var context = connected.context;
var queueName = options.queueName || connected.context.ident();
var routingKey = routingKey || options.routingKey;
var requests = { };
var persistent = options.persistent;
let connected = obj.__connected = new Connected(options);
let context = connected.context;
let queueName = options.queueName || connected.context.ident();
let persistent = options.persistent;
var handler = function(msg) {
var data = json.fromBuffer(msg.content);
routingKey = routingKey || options.routingKey;
var request = requests[data.id];
let requests = { };
let handler = msg => {
let data = json.fromBuffer(msg.content);
let request = requests[data.id];
if (request) {

@@ -54,19 +56,24 @@ if (request.error) {

obj.init = connected.init.then(() => {
return connected.assertExchange(
// Sets up a Promise that waits for the connection to be fully established
obj.init = (async () => {
await connected.init;
await connected.assertExchange(
exchangeName,
'topic'
);
}).then(_exchange => {
return connected.assertQueue(queueName, {
await connected.assertQueue(queueName, {
exclusive: true,
autoDelete: true
});
}).then(() => {
return connected.channel.consume(queueName, handler);
});
return new Proxy(obj,
await connected.channel.consume(queueName, handler);
})();
// Create a Proxy that converts incoming method calls into the corresponding
// JSON-RPC messages.
let proxy = new Proxy(obj,
{
get: (_obj, prop) => {
get: (_, prop) => {
if (obj[prop]) {

@@ -77,3 +84,3 @@ return obj[prop];

obj[prop] = function() {
var messageId = context.requestId();
let messageId = context.requestId();

@@ -105,3 +112,9 @@ connected.publishAsJson(

}
)
);
proxy.close = () => {
connected.close();
};
return proxy;
}

@@ -108,0 +121,0 @@

@@ -27,18 +27,24 @@ // == Imports ===============================================================

this.autoDelete = options && options.autoDelete;
this.exclusive = options && options.exclusive;
this.durable = options && options.durable;
this.setup();
}
let init = this.init;
setup() {
this.init = this.init.then(() => {
return this.assertExchange(this.exchangeName, this.exchangeType);
}).then(() => {
return this.assertQueue(this.queueName, { autoDelete: this.autoDelete });
}).then(() => {
return this.bindQueue(this.queueName, this.exchangeName, '*');
}).then(() => {
return this.channel.consume(this.queueName, msg => {
this.init = (async () => {
await init;
await this.assertExchange(this.exchangeName, this.exchangeType);
await this.assertQueue(this.queueName, {
durable: this.durable,
exclusive: this.exclusive,
autoDelete: this.autoDelete
});
await this.bindQueue(this.queueName, this.exchangeName, '*');
await this.channel.consume(this.queueName, msg => {
this.consumeMessage(msg);
});
});
})();
}

@@ -55,4 +61,4 @@

async consumeMessage(msg) {
var request;
var replyTo = msg.properties && msg.properties.replyTo;
let request;
let replyTo = msg.properties && msg.properties.replyTo;

@@ -84,3 +90,3 @@ try {

case 'function':
var result;
let result;

@@ -87,0 +93,0 @@ try {

{
"name": "skein-rpc",
"version": "0.2.1",
"version": "0.3.1",
"description": "Client library for doing JSON-RPC over RabbitMQ/AMQP",

@@ -26,13 +26,13 @@ "main": "index.js",

"devDependencies": {
"chai": "^4.1.2",
"mocha": "^4.0.1"
"chai": "^4.2.0",
"mocha": "^6.1.4"
},
"dependencies": {
"amqplib": "^0.5.2",
"bluebird": "^3.5.1",
"commander": "^2.12.2",
"merge": "^1.2.0",
"sprintf": "^0.1.5",
"uuid": "^3.1.0"
"amqplib": "^0.5.3",
"bluebird": "^3.5.5",
"commander": "^2.20.0",
"merge": "^1.2.1",
"sprintf-js": "^1.1.2",
"uuid": "^3.3.2"
}
}

@@ -7,2 +7,4 @@ // == Imports ===============================================================

const exchange = 'test_exchange';
// == Support ===============================================================

@@ -14,7 +16,8 @@

it('can be created with defaults', async () => {
var client = new Client();
let client = new Client();
await client.init;
assert.ok(client);
await client.close();
});

@@ -24,9 +27,12 @@

it('can create an RPC client', async () => {
var client = new Client();
let client = new Client();
await client.init;
var rpc = client.rpc('test-exchange');
let rpc = client.rpc(exchange);
await rpc.init;
await client.init;
assert.ok(rpc);
assert.ok(rpc);
rpc.close();
client.close();
});

@@ -37,11 +43,14 @@ });

it('can create a generic worker instance', async () => {
var client = new Client();
let client = new Client();
await client.init;
var worker = client.worker('test-exchange');
let worker = client.worker(exchange);
await worker.init;
await client.init;
assert.ok(worker);
assert.ok(worker);
worker.close();
client.close();
});
})
});
});

@@ -20,7 +20,7 @@ // == Imports ===============================================================

const streamName = 'q-readable-test';
var connected = await Connected.open();
let connected = await Connected.open();
var readable = new Readable(connected, streamName);
var readableData = helpers.eventCounter(readable, 'data');
var buffer = new WritableArray();
let readable = new Readable(connected, streamName);
let readableData = helpers.eventCounter(readable, 'data');
let buffer = new WritableArray();

@@ -37,3 +37,3 @@ await readable.init;

var received = buffer.pop();
let received = buffer.pop();

@@ -43,2 +43,4 @@ assert.deepEqual({ test: true }, received);

readable.destroy();
connected.close();
});

@@ -50,11 +52,11 @@ });

const streamName = 'q-writable-test';
var connected = await Connected.open();
let connected = await Connected.open();
await connected.assertQueue(streamName);
var writable = new Writable(connected, streamName);
var readable = new Readable(connected, streamName);
let writable = new Writable(connected, streamName);
let readable = new Readable(connected, streamName);
var readableData = helpers.eventCounter(readable, 'data');
var buffer = new WritableArray();
let readableData = helpers.eventCounter(readable, 'data');
let buffer = new WritableArray();

@@ -72,7 +74,11 @@ await writable.init;

var received = buffer.pop();
let received = buffer.pop();
assert.deepEqual({ test: true }, received);
readable.destroy();
writable.destroy();
connected.close();
});
});
});

@@ -15,3 +15,3 @@ // == Imports ===============================================================

it('can provide a default configuration', () => {
var config = new Config();
let config = new Config();

@@ -25,5 +25,5 @@ assert.ok(config);

it('can read an arbitrary JSON file', () => {
var configPath = path.resolve(__dirname, './data/skein-example.json');
let configPath = path.resolve(__dirname, './data/skein-example.json');
var config = new Config(configPath);
let config = new Config(configPath);

@@ -41,3 +41,3 @@ assert.ok(config);

it('can properly URI encode with a minimal config', () => {
var config = new Config({ });
let config = new Config({ });

@@ -50,5 +50,5 @@ assert.equal(config.uri(), 'amqp://localhost/');

it('can properly URI encode values in configuration', () => {
var configPath = path.resolve(__dirname, './data/skein-encoding-example.json');
let configPath = path.resolve(__dirname, './data/skein-encoding-example.json');
var config = new Config(configPath);
let config = new Config(configPath);

@@ -63,3 +63,3 @@ assert.ok(config);

it('has a configuration file it can read', () => {
var configPath = path.resolve(__dirname, './data/skein-example.json');
let configPath = path.resolve(__dirname, './data/skein-example.json');

@@ -66,0 +66,0 @@ assert.isTrue(Config.exist(configPath));

@@ -20,5 +20,7 @@ // == Imports ===============================================================

it('will permit waiting for full initialization', async() => {
var connected = await Connected.open();
let connected = await Connected.open();
assert.ok(connected.channel);
connected.close();
});

@@ -28,5 +30,7 @@ });

it('can provide a default configuration', async () => {
var connected = await Connected.open();
let connected = await Connected.open();
assert.ok(connected.channel);
connected.close();
});

@@ -36,3 +40,3 @@

const queueName = 'q-connected-create-queues';
var connected = await Connected.open();
let connected = await Connected.open();

@@ -44,3 +48,5 @@ await connected.assertQueue(queueName);

// Unconsumed queues are not auto-deleted, so a force-delete is required.
connected.deleteQueue(queueName);
await connected.deleteQueue(queueName);
connected.close();
})

@@ -50,10 +56,10 @@

const streamName = 'q-connected-read-stream';
var connected = await Connected.open();
let connected = await Connected.open();
await connected.assertQueue(streamName);
var buffer = new WritableArray();
let buffer = new WritableArray();
var readable = await connected.readStream(streamName);
var readableData = helpers.eventCounter(readable, 'data');
let readable = await connected.readStream(streamName);
let readableData = helpers.eventCounter(readable, 'data');
readable.pipe(buffer);

@@ -67,5 +73,7 @@

var read = buffer.pop();
let read = buffer.pop();
assert.deepEqual({ stream: 'read' }, read);
connected.close();
});

@@ -75,11 +83,11 @@

const streamName = 'q-connected-write-stream';
var connected = await Connected.open();
let connected = await Connected.open();
await connected.assertQueue(streamName);
var writable = await connected.writeStream(streamName);
var readable = await connected.readStream(streamName);
let writable = await connected.writeStream(streamName);
let readable = await connected.readStream(streamName);
var readableData = helpers.eventCounter(readable, 'data');
var buffer = new WritableArray();
let readableData = helpers.eventCounter(readable, 'data');
let buffer = new WritableArray();
readable.pipe(buffer);

@@ -93,6 +101,8 @@

var read = buffer.pop();
let read = buffer.pop();
assert.deepEqual({ stream: 'write' }, read);
connected.close();
});
});

@@ -13,3 +13,3 @@ // == Imports ===============================================================

it('can work with defaults', () => {
var context = new Context();
let context = new Context();

@@ -24,5 +24,5 @@ assert.ok(context);

it('consistently issues ident values', () => {
var context = new Context();
let context = new Context();
var idents = [
let idents = [
context.ident(),

@@ -36,3 +36,3 @@ context.ident()

it('consistently issues unique ident values per-context', () => {
var idents = [
let idents = [
new Context().ident(),

@@ -39,0 +39,0 @@ new Context().ident()

@@ -27,3 +27,3 @@ // == Imports ===============================================================

it('can wrap synchronous functions that return plain values', () => {
var handler = new ExampleHandler();
let handler = new ExampleHandler();

@@ -35,3 +35,3 @@ return handler.handle(JSON.stringify({

})).then((json) => {
var result = JSON.parse(json);
let result = JSON.parse(json);

@@ -46,40 +46,40 @@ assert.deepEqual(result, {

it('can wrap functions that return a promise', () => {
var handler = new ExampleHandler();
it('can wrap functions that return a promise', async () => {
let handler = new ExampleHandler();
return handler.handle(JSON.stringify({
let json = await handler.handle(JSON.stringify({
id: 'echoPromise-001',
method: 'echoPromise',
params: [ 'value' ]
})).then((json) => {
var result = JSON.parse(json);
}));
assert.deepEqual(result, {
id: 'echoPromise-001',
result: { value: 'value' },
error: null
});
let result = JSON.parse(json);
assert.deepEqual(result, {
id: 'echoPromise-001',
result: { value: 'value' },
error: null
});
});
it('returns an error for an unknown method', () => {
var handler = new ExampleHandler();
it('returns an error for an unknown method', async () => {
let handler = new ExampleHandler();
return handler.handle(JSON.stringify({
let json = await handler.handle(JSON.stringify({
id: 'broken-001',
method: 'broken',
params: [ null ]
})).then((json) => {
var result = JSON.parse(json);
}));
assert.deepEqual(result, {
id: 'broken-001',
result: null,
error: {
code: -32601,
message: "Undefined endpoint 'broken'"
}
});
let result = JSON.parse(json);
assert.deepEqual(result, {
id: 'broken-001',
result: null,
error: {
code: -32601,
message: "Undefined endpoint 'broken'"
}
});
});
});

@@ -15,12 +15,14 @@ // == Imports ===============================================================

describe('connect()', () => {
it('can connect to the default server', () => {
var config = new Config();
it('can connect to the default server', async () => {
let config = new Config();
assert.ok(config.driver);
return rabbitmq.connect(config).then((conn) => {
assert.ok(conn);
});
let conn = await rabbitmq.connect(config);
assert.ok(conn);
conn.close();
});
});
});

@@ -16,14 +16,21 @@ // == Imports ===============================================================

it('can be created with defaults', async () => {
var client = new Client();
var worker = new EchoWorker('test_echo_rpc', 'test_exchange', client);
let client = new Client();
await client.init;
var rpc = new RPC('test_exchange', 'test_echo_rpc', client);
let worker = new EchoWorker('test_echo_rpc', '', client);
let rpc = new RPC('', 'test_echo_rpc', client);
await rpc.init;
await worker.init;
var response = await rpc.echo('test');
let response = await rpc.echo('test');
assert.equal(response, 'test');
rpc.close();
worker.close();
client.close();
});
});

@@ -26,12 +26,18 @@ // == Imports ===============================================================

it('can be created with a simple subclass (EchoWorker)', async () => {
var client = new Client();
let client = new Client();
await client.init;
var worker = client.worker('test_echo_worker', exchange, client, EchoWorker);
var rpc = client.rpc(exchange, 'test_echo_worker');
let worker = client.worker('test_echo_worker', exchange, client, EchoWorker);
await worker.init;
await Promise.all([ worker.init, rpc.init ]);
let rpc = client.rpc(exchange, 'test_echo_worker');
await rpc.init;
var reply = await rpc.echo('with test data');
let reply = await rpc.echo('with test data');
assert.equal(reply, 'with test data');
worker.close();
rpc.close();
client.close();
});

@@ -41,11 +47,14 @@

const queueName = 'q-worker-malformed-json';
var connected = new Connected();
var worker = new Worker(queueName, exchange);
var replyQueue = uuid();
var messageId = uuid();
await Promise.all([ connected.init, worker.init ]);
let connected = new Connected();
await connected.init;
var response = connected.consumeAsPromise(replyQueue);
let worker = new Worker(queueName, exchange);
let replyQueue = uuid();
let messageId = uuid();
await worker.init;
let response = connected.consumeAsPromise(replyQueue);
connected.channel.publish(

@@ -63,5 +72,5 @@ '',

var msg = await response;
let msg = await response;
var reply = json.fromBuffer(msg.content);
let reply = json.fromBuffer(msg.content);

@@ -71,2 +80,5 @@ assert.equal(reply.error.code, -32700);

assert.equal(reply.id, messageId);
connected.close();
worker.close();
});

@@ -76,11 +88,16 @@

const queueName = 'q-worker-incomplete-rpc';
var connected = new Connected();
var worker = new Worker(queueName, exchange);
var replyQueue = uuid();
var messageId = uuid();
let connected = new Connected();
await connected.init;
await Promise.all([ connected.init, worker.init ]);
let worker = new Worker(queueName, exchange);
await worker.init;
return connected.publishAsJson(
'',
let replyQueue = uuid();
let messageId = uuid();
// Listener must be engaged first.
let msg = connected.consumeAsPromise(replyQueue);
await connected.publishAsJson(
exchange,
queueName,

@@ -95,5 +112,6 @@ {

var msg = await connected.consumeAsPromise(replyQueue);
// Message will arrive after publishAsJson is called.
msg = await msg;
var reply = json.fromBuffer(msg.content);
let reply = json.fromBuffer(msg.content);

@@ -103,64 +121,76 @@ assert.equal(reply.error.code, -32600);

assert.equal(reply.id, messageId);
worker.close();
connected.close();
});
it('can deal with an RPC call missing a method', () => {
var connected = new Connected();
var worker = new Worker('test_worker', exchange);
var replyQueue = uuid();
var messageId = uuid();
it('can deal with an RPC call missing a method', async () => {
let connected = new Connected();
await connected.init;
let worker = new Worker('test_worker', exchange);
await worker.init;
return Promise.all([ connected.init, worker.init ]).then(() => {
return connected.consumeAsPromise(replyQueue, () => {
return connected.publishAsJson(
'',
'test_worker',
{
"jsonrpc": "2.0",
"id": messageId
},
{
messageId: messageId,
replyTo: replyQueue
}
);
});
}).then(msg => {
var reply = json.fromBuffer(msg.content);
let replyQueue = uuid();
let messageId = uuid();
assert.equal(reply.error.code, -32600);
assert.equal(reply.error.message, 'Invalid request: Missing or invalid "method" property');
assert.equal(reply.id, messageId);
let msg = await connected.consumeAsPromise(replyQueue, () => {
return connected.publishAsJson(
'',
'test_worker',
{
"jsonrpc": "2.0",
"id": messageId
},
{
messageId: messageId,
replyTo: replyQueue
}
);
});
let reply = json.fromBuffer(msg.content);
assert.equal(reply.error.code, -32600);
assert.equal(reply.error.message, 'Invalid request: Missing or invalid "method" property');
assert.equal(reply.id, messageId);
connected.close();
worker.close();
});
it('catches errors triggered within the worker code', () => {
var connected = new Connected();
var worker = new ErrorWorker('test_error_worker', exchange);
var replyQueue = uuid();
var messageId = uuid();
it('catches errors triggered within the worker code', async () => {
let connected = new Connected();
let worker = new ErrorWorker('test_error_worker', exchange);
let replyQueue = uuid();
let messageId = uuid();
return Promise.all([ connected.init, worker.init ]).then(() => {
return connected.consumeAsPromise(replyQueue, () => {
return connected.publishAsJson(
'',
'test_error_worker',
{
"jsonrpc": "2.0",
"id": messageId,
"method": "error"
},
{
messageId: messageId,
replyTo: replyQueue
}
);
});
}).then(msg => {
var reply = json.fromBuffer(msg.content);
await connected.init;
await worker.init;
assert.equal(reply.error.code, -32603);
assert.equal(reply.error.message, 'Internal error when handling "error"');
assert.equal(reply.id, messageId);
let msg = await connected.consumeAsPromise(replyQueue, () => {
return connected.publishAsJson(
'',
'test_error_worker',
{
"jsonrpc": "2.0",
"id": messageId,
"method": "error"
},
{
messageId: messageId,
replyTo: replyQueue
}
);
});
let reply = json.fromBuffer(msg.content);
assert.equal(reply.error.code, -32603);
assert.equal(reply.error.message, 'Internal error when handling "error"');
assert.equal(reply.id, messageId);
worker.close();
connected.close();
});
});