Comparing version
@@ -30,2 +30,3 @@ // == Imports =============================================================== | ||
// FUTURE: Make these all async and wait on init automatically. | ||
rpc(exchangeName, routingKey, options) { | ||
@@ -32,0 +33,0 @@ return new RPC(exchangeName, routingKey, this.createOptions(options)); |
@@ -28,8 +28,10 @@ // == Imports =============================================================== | ||
this.init = this.connected.init.then(() => { | ||
this.channel = this.connected.channel; | ||
this.init = (async () => { | ||
await this.connected.init; | ||
return this.connected.prepareQueue(this.queueName, this.exchangeName); | ||
}).then(() => { | ||
return this.channel.consume(this.queueName, msg => { | ||
this.channel = await this.connected.createChannel(); | ||
await this.connected.prepareQueue(this.queueName, this.exchangeName); | ||
let consumer = await this.channel.consume(this.queueName, msg => { | ||
this.channel.ack(msg); | ||
@@ -39,5 +41,5 @@ | ||
}); | ||
}).then(consumer => { | ||
this.consumerTag = consumer.consumerTag; | ||
}); | ||
})(); | ||
} | ||
@@ -51,3 +53,8 @@ | ||
if (this.consumerTag) { | ||
this.channel.cancel(this.consumerTag); | ||
try { | ||
this.channel.close(); | ||
} | ||
catch (err) { | ||
// Close failed, but this is being shut down anyway. | ||
} | ||
} | ||
@@ -54,0 +61,0 @@ |
@@ -59,3 +59,3 @@ // == Imports =============================================================== | ||
if (options) { | ||
var data = require(options); | ||
let data = require(options); | ||
@@ -74,3 +74,3 @@ merge(this, data.defaults, data[this.env()]); | ||
uri() { | ||
var uri = 'amqp://'; | ||
let uri = 'amqp://'; | ||
@@ -77,0 +77,0 @@ if (this.username && this.username != defaults.username) |
@@ -21,3 +21,3 @@ // == Imports =============================================================== | ||
static async open(options) { | ||
var connected = new Connected(options); | ||
let connected = new Connected(options); | ||
@@ -34,12 +34,16 @@ await connected.init; | ||
this.init = (init || Promise.resolve()).then(() => { | ||
this.init = (async () => { | ||
if (init) { | ||
await init; | ||
} | ||
if (options && options.connection) { | ||
this.connection = options.connection; | ||
++this.connection.__refs; | ||
} | ||
else { | ||
return RabbitMQ.connect(this.config).then(connection => { | ||
this.connection = connection; | ||
}); | ||
this.connection = await RabbitMQ.connect(this.config); | ||
this.connection.__refs = 1; | ||
} | ||
}).then(() => { | ||
if (options && options.channel) { | ||
@@ -49,7 +53,5 @@ this.channel = options.channel; | ||
else { | ||
return this.connection.createChannel().then(channel => { | ||
this.channel = channel; | ||
}); | ||
this.channel = await this.connection.createChannel(); | ||
} | ||
}); | ||
})(); | ||
@@ -59,8 +61,30 @@ this.context = new Context(options); | ||
close() { | ||
if (this.channel) { | ||
this.channel.close(); | ||
async createChannel() { | ||
return this.connection.createChannel(); | ||
} | ||
async close() { | ||
let channel = this.channel; | ||
if (channel) { | ||
delete this.channel; | ||
try { | ||
await channel.close(); | ||
} | ||
catch (err) { | ||
/// Close attempt failed, but must carry on. | ||
} | ||
} | ||
--this.connection.__refs; | ||
if (!this.connection.__refs) { | ||
let connection = this.connection; | ||
delete this.connection; | ||
return connection.close(); | ||
} | ||
} | ||
@@ -98,3 +122,3 @@ | ||
assertQueue(name, options) { | ||
var autoDelete = options && options.autoDelete; | ||
let autoDelete = options && options.autoDelete; | ||
@@ -125,4 +149,4 @@ return this.channel.assertQueue( | ||
publishAsJson(exchangeName, routingKey, data, options) { | ||
return this.channel.publish( | ||
async publishAsJson(exchangeName, routingKey, data, options) { | ||
await this.channel.publish( | ||
exchangeName, | ||
@@ -140,20 +164,25 @@ routingKey, | ||
consumeAsPromise(replyQueue, action) { | ||
return new Promise((resolve, reject) => { | ||
if (!replyQueue) { | ||
replyQueue = this.context.ident(); | ||
} | ||
async consumeAsPromise(replyQueue, action) { | ||
return new Promise(async (resolve, reject) => { | ||
try { | ||
if (!replyQueue) { | ||
replyQueue = this.context.ident(); | ||
} | ||
await this.channel.assertQueue(replyQueue, { | ||
exclusive: true, | ||
autoDelete: true | ||
}); | ||
this.channel.assertQueue(replyQueue, { | ||
exclusive: true, | ||
autoDelete: true | ||
}).then(() => { | ||
this.channel.consume(replyQueue, msg => { | ||
await this.channel.consume(replyQueue, msg => { | ||
resolve(msg); | ||
}); | ||
}).then(() => { | ||
if (action) { | ||
action(); | ||
} | ||
}) | ||
} | ||
catch (err) { | ||
reject(err); | ||
} | ||
}); | ||
@@ -163,3 +192,3 @@ } | ||
async readStream(queueName, exchangeName, options) { | ||
var stream = new Readable(this, queueName, exchangeName, options); | ||
let stream = new Readable(this, queueName, exchangeName, options); | ||
@@ -172,3 +201,3 @@ await stream.init; | ||
async writeStream(routingKey, exchangeName, options) { | ||
var stream = new Writable(this, routingKey, exchangeName, options); | ||
let stream = new Writable(this, routingKey, exchangeName, options); | ||
@@ -175,0 +204,0 @@ await stream.init; |
@@ -5,3 +5,3 @@ // == Imports =============================================================== | ||
const uuid = require('uuid/v4'); | ||
const sprintf = require('sprintf'); | ||
const { sprintf } = require('sprintf-js'); | ||
@@ -12,3 +12,3 @@ const processName = require('./support').processName; | ||
var sequence = 0; | ||
let sequence = 0; | ||
@@ -15,0 +15,0 @@ class Context { |
@@ -16,5 +16,5 @@ // == Imports =============================================================== | ||
handle(json) { | ||
var payload = JSON.parse(json); | ||
let payload = JSON.parse(json); | ||
var method = this[payload.method]; | ||
let method = this[payload.method]; | ||
@@ -21,0 +21,0 @@ if (typeof(method) != 'function') { |
@@ -6,3 +6,3 @@ // == Imports =============================================================== | ||
function connect(config) { | ||
var driver = require(config.driver); | ||
let driver = require(config.driver); | ||
@@ -9,0 +9,0 @@ return driver.connect(config.uri()); |
@@ -29,16 +29,18 @@ // == Imports =============================================================== | ||
var obj = { }; | ||
let obj = { }; | ||
var connected = obj.__connected = new Connected(new Config(options.config)); | ||
var context = connected.context; | ||
var queueName = options.queueName || connected.context.ident(); | ||
var routingKey = routingKey || options.routingKey; | ||
var requests = { }; | ||
var persistent = options.persistent; | ||
let connected = obj.__connected = new Connected(options); | ||
let context = connected.context; | ||
let queueName = options.queueName || connected.context.ident(); | ||
let persistent = options.persistent; | ||
var handler = function(msg) { | ||
var data = json.fromBuffer(msg.content); | ||
routingKey = routingKey || options.routingKey; | ||
var request = requests[data.id]; | ||
let requests = { }; | ||
let handler = msg => { | ||
let data = json.fromBuffer(msg.content); | ||
let request = requests[data.id]; | ||
if (request) { | ||
@@ -54,19 +56,24 @@ if (request.error) { | ||
obj.init = connected.init.then(() => { | ||
return connected.assertExchange( | ||
// Sets up a Promise that waits for the connection to be fully established | ||
obj.init = (async () => { | ||
await connected.init; | ||
await connected.assertExchange( | ||
exchangeName, | ||
'topic' | ||
); | ||
}).then(_exchange => { | ||
return connected.assertQueue(queueName, { | ||
await connected.assertQueue(queueName, { | ||
exclusive: true, | ||
autoDelete: true | ||
}); | ||
}).then(() => { | ||
return connected.channel.consume(queueName, handler); | ||
}); | ||
return new Proxy(obj, | ||
await connected.channel.consume(queueName, handler); | ||
})(); | ||
// Create a Proxy that converts incoming method calls into the corresponding | ||
// JSON-RPC messages. | ||
let proxy = new Proxy(obj, | ||
{ | ||
get: (_obj, prop) => { | ||
get: (_, prop) => { | ||
if (obj[prop]) { | ||
@@ -77,3 +84,3 @@ return obj[prop]; | ||
obj[prop] = function() { | ||
var messageId = context.requestId(); | ||
let messageId = context.requestId(); | ||
@@ -105,3 +112,9 @@ connected.publishAsJson( | ||
} | ||
) | ||
); | ||
proxy.close = () => { | ||
connected.close(); | ||
}; | ||
return proxy; | ||
} | ||
@@ -108,0 +121,0 @@ |
@@ -27,18 +27,24 @@ // == Imports =============================================================== | ||
this.autoDelete = options && options.autoDelete; | ||
this.exclusive = options && options.exclusive; | ||
this.durable = options && options.durable; | ||
this.setup(); | ||
} | ||
let init = this.init; | ||
setup() { | ||
this.init = this.init.then(() => { | ||
return this.assertExchange(this.exchangeName, this.exchangeType); | ||
}).then(() => { | ||
return this.assertQueue(this.queueName, { autoDelete: this.autoDelete }); | ||
}).then(() => { | ||
return this.bindQueue(this.queueName, this.exchangeName, '*'); | ||
}).then(() => { | ||
return this.channel.consume(this.queueName, msg => { | ||
this.init = (async () => { | ||
await init; | ||
await this.assertExchange(this.exchangeName, this.exchangeType); | ||
await this.assertQueue(this.queueName, { | ||
durable: this.durable, | ||
exclusive: this.exclusive, | ||
autoDelete: this.autoDelete | ||
}); | ||
await this.bindQueue(this.queueName, this.exchangeName, '*'); | ||
await this.channel.consume(this.queueName, msg => { | ||
this.consumeMessage(msg); | ||
}); | ||
}); | ||
})(); | ||
} | ||
@@ -55,4 +61,4 @@ | ||
async consumeMessage(msg) { | ||
var request; | ||
var replyTo = msg.properties && msg.properties.replyTo; | ||
let request; | ||
let replyTo = msg.properties && msg.properties.replyTo; | ||
@@ -84,3 +90,3 @@ try { | ||
case 'function': | ||
var result; | ||
let result; | ||
@@ -87,0 +93,0 @@ try { |
{ | ||
"name": "skein-rpc", | ||
"version": "0.2.1", | ||
"version": "0.3.1", | ||
"description": "Client library for doing JSON-RPC over RabbitMQ/AMQP", | ||
@@ -26,13 +26,13 @@ "main": "index.js", | ||
"devDependencies": { | ||
"chai": "^4.1.2", | ||
"mocha": "^4.0.1" | ||
"chai": "^4.2.0", | ||
"mocha": "^6.1.4" | ||
}, | ||
"dependencies": { | ||
"amqplib": "^0.5.2", | ||
"bluebird": "^3.5.1", | ||
"commander": "^2.12.2", | ||
"merge": "^1.2.0", | ||
"sprintf": "^0.1.5", | ||
"uuid": "^3.1.0" | ||
"amqplib": "^0.5.3", | ||
"bluebird": "^3.5.5", | ||
"commander": "^2.20.0", | ||
"merge": "^1.2.1", | ||
"sprintf-js": "^1.1.2", | ||
"uuid": "^3.3.2" | ||
} | ||
} |
@@ -7,2 +7,4 @@ // == Imports =============================================================== | ||
const exchange = 'test_exchange'; | ||
// == Support =============================================================== | ||
@@ -14,7 +16,8 @@ | ||
it('can be created with defaults', async () => { | ||
var client = new Client(); | ||
let client = new Client(); | ||
await client.init; | ||
assert.ok(client); | ||
await client.close(); | ||
}); | ||
@@ -24,9 +27,12 @@ | ||
it('can create an RPC client', async () => { | ||
var client = new Client(); | ||
let client = new Client(); | ||
await client.init; | ||
var rpc = client.rpc('test-exchange'); | ||
let rpc = client.rpc(exchange); | ||
await rpc.init; | ||
await client.init; | ||
assert.ok(rpc); | ||
assert.ok(rpc); | ||
rpc.close(); | ||
client.close(); | ||
}); | ||
@@ -37,11 +43,14 @@ }); | ||
it('can create a generic worker instance', async () => { | ||
var client = new Client(); | ||
let client = new Client(); | ||
await client.init; | ||
var worker = client.worker('test-exchange'); | ||
let worker = client.worker(exchange); | ||
await worker.init; | ||
await client.init; | ||
assert.ok(worker); | ||
assert.ok(worker); | ||
worker.close(); | ||
client.close(); | ||
}); | ||
}) | ||
}); | ||
}); |
@@ -20,7 +20,7 @@ // == Imports =============================================================== | ||
const streamName = 'q-readable-test'; | ||
var connected = await Connected.open(); | ||
let connected = await Connected.open(); | ||
var readable = new Readable(connected, streamName); | ||
var readableData = helpers.eventCounter(readable, 'data'); | ||
var buffer = new WritableArray(); | ||
let readable = new Readable(connected, streamName); | ||
let readableData = helpers.eventCounter(readable, 'data'); | ||
let buffer = new WritableArray(); | ||
@@ -37,3 +37,3 @@ await readable.init; | ||
var received = buffer.pop(); | ||
let received = buffer.pop(); | ||
@@ -43,2 +43,4 @@ assert.deepEqual({ test: true }, received); | ||
readable.destroy(); | ||
connected.close(); | ||
}); | ||
@@ -50,11 +52,11 @@ }); | ||
const streamName = 'q-writable-test'; | ||
var connected = await Connected.open(); | ||
let connected = await Connected.open(); | ||
await connected.assertQueue(streamName); | ||
var writable = new Writable(connected, streamName); | ||
var readable = new Readable(connected, streamName); | ||
let writable = new Writable(connected, streamName); | ||
let readable = new Readable(connected, streamName); | ||
var readableData = helpers.eventCounter(readable, 'data'); | ||
var buffer = new WritableArray(); | ||
let readableData = helpers.eventCounter(readable, 'data'); | ||
let buffer = new WritableArray(); | ||
@@ -72,7 +74,11 @@ await writable.init; | ||
var received = buffer.pop(); | ||
let received = buffer.pop(); | ||
assert.deepEqual({ test: true }, received); | ||
readable.destroy(); | ||
writable.destroy(); | ||
connected.close(); | ||
}); | ||
}); | ||
}); |
@@ -15,3 +15,3 @@ // == Imports =============================================================== | ||
it('can provide a default configuration', () => { | ||
var config = new Config(); | ||
let config = new Config(); | ||
@@ -25,5 +25,5 @@ assert.ok(config); | ||
it('can read an arbitrary JSON file', () => { | ||
var configPath = path.resolve(__dirname, './data/skein-example.json'); | ||
let configPath = path.resolve(__dirname, './data/skein-example.json'); | ||
var config = new Config(configPath); | ||
let config = new Config(configPath); | ||
@@ -41,3 +41,3 @@ assert.ok(config); | ||
it('can properly URI encode with a minimal config', () => { | ||
var config = new Config({ }); | ||
let config = new Config({ }); | ||
@@ -50,5 +50,5 @@ assert.equal(config.uri(), 'amqp://localhost/'); | ||
it('can properly URI encode values in configuration', () => { | ||
var configPath = path.resolve(__dirname, './data/skein-encoding-example.json'); | ||
let configPath = path.resolve(__dirname, './data/skein-encoding-example.json'); | ||
var config = new Config(configPath); | ||
let config = new Config(configPath); | ||
@@ -63,3 +63,3 @@ assert.ok(config); | ||
it('has a configuration file it can read', () => { | ||
var configPath = path.resolve(__dirname, './data/skein-example.json'); | ||
let configPath = path.resolve(__dirname, './data/skein-example.json'); | ||
@@ -66,0 +66,0 @@ assert.isTrue(Config.exist(configPath)); |
@@ -20,5 +20,7 @@ // == Imports =============================================================== | ||
it('will permit waiting for full initialization', async() => { | ||
var connected = await Connected.open(); | ||
let connected = await Connected.open(); | ||
assert.ok(connected.channel); | ||
connected.close(); | ||
}); | ||
@@ -28,5 +30,7 @@ }); | ||
it('can provide a default configuration', async () => { | ||
var connected = await Connected.open(); | ||
let connected = await Connected.open(); | ||
assert.ok(connected.channel); | ||
connected.close(); | ||
}); | ||
@@ -36,3 +40,3 @@ | ||
const queueName = 'q-connected-create-queues'; | ||
var connected = await Connected.open(); | ||
let connected = await Connected.open(); | ||
@@ -44,3 +48,5 @@ await connected.assertQueue(queueName); | ||
// Unconsumed queues are not auto-deleted, so a force-delete is required. | ||
connected.deleteQueue(queueName); | ||
await connected.deleteQueue(queueName); | ||
connected.close(); | ||
}) | ||
@@ -50,10 +56,10 @@ | ||
const streamName = 'q-connected-read-stream'; | ||
var connected = await Connected.open(); | ||
let connected = await Connected.open(); | ||
await connected.assertQueue(streamName); | ||
var buffer = new WritableArray(); | ||
let buffer = new WritableArray(); | ||
var readable = await connected.readStream(streamName); | ||
var readableData = helpers.eventCounter(readable, 'data'); | ||
let readable = await connected.readStream(streamName); | ||
let readableData = helpers.eventCounter(readable, 'data'); | ||
readable.pipe(buffer); | ||
@@ -67,5 +73,7 @@ | ||
var read = buffer.pop(); | ||
let read = buffer.pop(); | ||
assert.deepEqual({ stream: 'read' }, read); | ||
connected.close(); | ||
}); | ||
@@ -75,11 +83,11 @@ | ||
const streamName = 'q-connected-write-stream'; | ||
var connected = await Connected.open(); | ||
let connected = await Connected.open(); | ||
await connected.assertQueue(streamName); | ||
var writable = await connected.writeStream(streamName); | ||
var readable = await connected.readStream(streamName); | ||
let writable = await connected.writeStream(streamName); | ||
let readable = await connected.readStream(streamName); | ||
var readableData = helpers.eventCounter(readable, 'data'); | ||
var buffer = new WritableArray(); | ||
let readableData = helpers.eventCounter(readable, 'data'); | ||
let buffer = new WritableArray(); | ||
readable.pipe(buffer); | ||
@@ -93,6 +101,8 @@ | ||
var read = buffer.pop(); | ||
let read = buffer.pop(); | ||
assert.deepEqual({ stream: 'write' }, read); | ||
connected.close(); | ||
}); | ||
}); |
@@ -13,3 +13,3 @@ // == Imports =============================================================== | ||
it('can work with defaults', () => { | ||
var context = new Context(); | ||
let context = new Context(); | ||
@@ -24,5 +24,5 @@ assert.ok(context); | ||
it('consistently issues ident values', () => { | ||
var context = new Context(); | ||
let context = new Context(); | ||
var idents = [ | ||
let idents = [ | ||
context.ident(), | ||
@@ -36,3 +36,3 @@ context.ident() | ||
it('consistently issues unique ident values per-context', () => { | ||
var idents = [ | ||
let idents = [ | ||
new Context().ident(), | ||
@@ -39,0 +39,0 @@ new Context().ident() |
@@ -27,3 +27,3 @@ // == Imports =============================================================== | ||
it('can wrap synchronous functions that return plain values', () => { | ||
var handler = new ExampleHandler(); | ||
let handler = new ExampleHandler(); | ||
@@ -35,3 +35,3 @@ return handler.handle(JSON.stringify({ | ||
})).then((json) => { | ||
var result = JSON.parse(json); | ||
let result = JSON.parse(json); | ||
@@ -46,40 +46,40 @@ assert.deepEqual(result, { | ||
it('can wrap functions that return a promise', () => { | ||
var handler = new ExampleHandler(); | ||
it('can wrap functions that return a promise', async () => { | ||
let handler = new ExampleHandler(); | ||
return handler.handle(JSON.stringify({ | ||
let json = await handler.handle(JSON.stringify({ | ||
id: 'echoPromise-001', | ||
method: 'echoPromise', | ||
params: [ 'value' ] | ||
})).then((json) => { | ||
var result = JSON.parse(json); | ||
})); | ||
assert.deepEqual(result, { | ||
id: 'echoPromise-001', | ||
result: { value: 'value' }, | ||
error: null | ||
}); | ||
let result = JSON.parse(json); | ||
assert.deepEqual(result, { | ||
id: 'echoPromise-001', | ||
result: { value: 'value' }, | ||
error: null | ||
}); | ||
}); | ||
it('returns an error for an unknown method', () => { | ||
var handler = new ExampleHandler(); | ||
it('returns an error for an unknown method', async () => { | ||
let handler = new ExampleHandler(); | ||
return handler.handle(JSON.stringify({ | ||
let json = await handler.handle(JSON.stringify({ | ||
id: 'broken-001', | ||
method: 'broken', | ||
params: [ null ] | ||
})).then((json) => { | ||
var result = JSON.parse(json); | ||
})); | ||
assert.deepEqual(result, { | ||
id: 'broken-001', | ||
result: null, | ||
error: { | ||
code: -32601, | ||
message: "Undefined endpoint 'broken'" | ||
} | ||
}); | ||
let result = JSON.parse(json); | ||
assert.deepEqual(result, { | ||
id: 'broken-001', | ||
result: null, | ||
error: { | ||
code: -32601, | ||
message: "Undefined endpoint 'broken'" | ||
} | ||
}); | ||
}); | ||
}); |
@@ -15,12 +15,14 @@ // == Imports =============================================================== | ||
describe('connect()', () => { | ||
it('can connect to the default server', () => { | ||
var config = new Config(); | ||
it('can connect to the default server', async () => { | ||
let config = new Config(); | ||
assert.ok(config.driver); | ||
return rabbitmq.connect(config).then((conn) => { | ||
assert.ok(conn); | ||
}); | ||
let conn = await rabbitmq.connect(config); | ||
assert.ok(conn); | ||
conn.close(); | ||
}); | ||
}); | ||
}); |
@@ -16,14 +16,21 @@ // == Imports =============================================================== | ||
it('can be created with defaults', async () => { | ||
var client = new Client(); | ||
var worker = new EchoWorker('test_echo_rpc', 'test_exchange', client); | ||
let client = new Client(); | ||
await client.init; | ||
var rpc = new RPC('test_exchange', 'test_echo_rpc', client); | ||
let worker = new EchoWorker('test_echo_rpc', '', client); | ||
let rpc = new RPC('', 'test_echo_rpc', client); | ||
await rpc.init; | ||
await worker.init; | ||
var response = await rpc.echo('test'); | ||
let response = await rpc.echo('test'); | ||
assert.equal(response, 'test'); | ||
rpc.close(); | ||
worker.close(); | ||
client.close(); | ||
}); | ||
}); |
@@ -26,12 +26,18 @@ // == Imports =============================================================== | ||
it('can be created with a simple subclass (EchoWorker)', async () => { | ||
var client = new Client(); | ||
let client = new Client(); | ||
await client.init; | ||
var worker = client.worker('test_echo_worker', exchange, client, EchoWorker); | ||
var rpc = client.rpc(exchange, 'test_echo_worker'); | ||
let worker = client.worker('test_echo_worker', exchange, client, EchoWorker); | ||
await worker.init; | ||
await Promise.all([ worker.init, rpc.init ]); | ||
let rpc = client.rpc(exchange, 'test_echo_worker'); | ||
await rpc.init; | ||
var reply = await rpc.echo('with test data'); | ||
let reply = await rpc.echo('with test data'); | ||
assert.equal(reply, 'with test data'); | ||
worker.close(); | ||
rpc.close(); | ||
client.close(); | ||
}); | ||
@@ -41,11 +47,14 @@ | ||
const queueName = 'q-worker-malformed-json'; | ||
var connected = new Connected(); | ||
var worker = new Worker(queueName, exchange); | ||
var replyQueue = uuid(); | ||
var messageId = uuid(); | ||
await Promise.all([ connected.init, worker.init ]); | ||
let connected = new Connected(); | ||
await connected.init; | ||
var response = connected.consumeAsPromise(replyQueue); | ||
let worker = new Worker(queueName, exchange); | ||
let replyQueue = uuid(); | ||
let messageId = uuid(); | ||
await worker.init; | ||
let response = connected.consumeAsPromise(replyQueue); | ||
connected.channel.publish( | ||
@@ -63,5 +72,5 @@ '', | ||
var msg = await response; | ||
let msg = await response; | ||
var reply = json.fromBuffer(msg.content); | ||
let reply = json.fromBuffer(msg.content); | ||
@@ -71,2 +80,5 @@ assert.equal(reply.error.code, -32700); | ||
assert.equal(reply.id, messageId); | ||
connected.close(); | ||
worker.close(); | ||
}); | ||
@@ -76,11 +88,16 @@ | ||
const queueName = 'q-worker-incomplete-rpc'; | ||
var connected = new Connected(); | ||
var worker = new Worker(queueName, exchange); | ||
var replyQueue = uuid(); | ||
var messageId = uuid(); | ||
let connected = new Connected(); | ||
await connected.init; | ||
await Promise.all([ connected.init, worker.init ]); | ||
let worker = new Worker(queueName, exchange); | ||
await worker.init; | ||
return connected.publishAsJson( | ||
'', | ||
let replyQueue = uuid(); | ||
let messageId = uuid(); | ||
// Listener must be engaged first. | ||
let msg = connected.consumeAsPromise(replyQueue); | ||
await connected.publishAsJson( | ||
exchange, | ||
queueName, | ||
@@ -95,5 +112,6 @@ { | ||
var msg = await connected.consumeAsPromise(replyQueue); | ||
// Message will arrive after publishAsJson is called. | ||
msg = await msg; | ||
var reply = json.fromBuffer(msg.content); | ||
let reply = json.fromBuffer(msg.content); | ||
@@ -103,64 +121,76 @@ assert.equal(reply.error.code, -32600); | ||
assert.equal(reply.id, messageId); | ||
worker.close(); | ||
connected.close(); | ||
}); | ||
it('can deal with an RPC call missing a method', () => { | ||
var connected = new Connected(); | ||
var worker = new Worker('test_worker', exchange); | ||
var replyQueue = uuid(); | ||
var messageId = uuid(); | ||
it('can deal with an RPC call missing a method', async () => { | ||
let connected = new Connected(); | ||
await connected.init; | ||
let worker = new Worker('test_worker', exchange); | ||
await worker.init; | ||
return Promise.all([ connected.init, worker.init ]).then(() => { | ||
return connected.consumeAsPromise(replyQueue, () => { | ||
return connected.publishAsJson( | ||
'', | ||
'test_worker', | ||
{ | ||
"jsonrpc": "2.0", | ||
"id": messageId | ||
}, | ||
{ | ||
messageId: messageId, | ||
replyTo: replyQueue | ||
} | ||
); | ||
}); | ||
}).then(msg => { | ||
var reply = json.fromBuffer(msg.content); | ||
let replyQueue = uuid(); | ||
let messageId = uuid(); | ||
assert.equal(reply.error.code, -32600); | ||
assert.equal(reply.error.message, 'Invalid request: Missing or invalid "method" property'); | ||
assert.equal(reply.id, messageId); | ||
let msg = await connected.consumeAsPromise(replyQueue, () => { | ||
return connected.publishAsJson( | ||
'', | ||
'test_worker', | ||
{ | ||
"jsonrpc": "2.0", | ||
"id": messageId | ||
}, | ||
{ | ||
messageId: messageId, | ||
replyTo: replyQueue | ||
} | ||
); | ||
}); | ||
let reply = json.fromBuffer(msg.content); | ||
assert.equal(reply.error.code, -32600); | ||
assert.equal(reply.error.message, 'Invalid request: Missing or invalid "method" property'); | ||
assert.equal(reply.id, messageId); | ||
connected.close(); | ||
worker.close(); | ||
}); | ||
it('catches errors triggered within the worker code', () => { | ||
var connected = new Connected(); | ||
var worker = new ErrorWorker('test_error_worker', exchange); | ||
var replyQueue = uuid(); | ||
var messageId = uuid(); | ||
it('catches errors triggered within the worker code', async () => { | ||
let connected = new Connected(); | ||
let worker = new ErrorWorker('test_error_worker', exchange); | ||
let replyQueue = uuid(); | ||
let messageId = uuid(); | ||
return Promise.all([ connected.init, worker.init ]).then(() => { | ||
return connected.consumeAsPromise(replyQueue, () => { | ||
return connected.publishAsJson( | ||
'', | ||
'test_error_worker', | ||
{ | ||
"jsonrpc": "2.0", | ||
"id": messageId, | ||
"method": "error" | ||
}, | ||
{ | ||
messageId: messageId, | ||
replyTo: replyQueue | ||
} | ||
); | ||
}); | ||
}).then(msg => { | ||
var reply = json.fromBuffer(msg.content); | ||
await connected.init; | ||
await worker.init; | ||
assert.equal(reply.error.code, -32603); | ||
assert.equal(reply.error.message, 'Internal error when handling "error"'); | ||
assert.equal(reply.id, messageId); | ||
let msg = await connected.consumeAsPromise(replyQueue, () => { | ||
return connected.publishAsJson( | ||
'', | ||
'test_error_worker', | ||
{ | ||
"jsonrpc": "2.0", | ||
"id": messageId, | ||
"method": "error" | ||
}, | ||
{ | ||
messageId: messageId, | ||
replyTo: replyQueue | ||
} | ||
); | ||
}); | ||
let reply = json.fromBuffer(msg.content); | ||
assert.equal(reply.error.code, -32603); | ||
assert.equal(reply.error.message, 'Internal error when handling "error"'); | ||
assert.equal(reply.id, messageId); | ||
worker.close(); | ||
connected.close(); | ||
}); | ||
}); |
44369
3.74%1283
5.42%+ Added
+ Added
- Removed
- Removed
Updated
Updated
Updated
Updated
Updated