Comparing version 1.4.1 to 1.5.0
@@ -12,2 +12,3 @@ import * as amqp from 'amqplib'; | ||
logger?: StandardLogger; | ||
persistentMessages?: boolean; | ||
} | ||
@@ -25,2 +26,3 @@ export interface RpcClientOptions { | ||
log: StandardLogger; | ||
persistentMessages: boolean; | ||
protected calls: Map<string, PromiseCallbacks>; | ||
@@ -49,2 +51,4 @@ protected callTimer: Timer; | ||
* Default 15 minutes. | ||
* @param {boolean} [opts.rpcClient.persistentMessages] Whether to use persistent messages. | ||
* Default false. | ||
* @param {StandardLogger} [opts.rpcClient.logger] Custom logger for client use. | ||
@@ -70,3 +74,3 @@ */ | ||
*/ | ||
term({waitForCalls}?: { | ||
term({ waitForCalls }?: { | ||
waitForCalls?: number; | ||
@@ -87,3 +91,3 @@ }): Promise<void>; | ||
protected callPayload(procedure: string, ...args: any[]): ClientPayload; | ||
protected makeReplyHandler(): (message: amqp.Message) => any; | ||
protected makeReplyHandler(): (message: amqp.Message | null) => any; | ||
} |
@@ -35,2 +35,4 @@ "use strict"; | ||
* Default 15 minutes. | ||
* @param {boolean} [opts.rpcClient.persistentMessages] Whether to use persistent messages. | ||
* Default false. | ||
* @param {StandardLogger} [opts.rpcClient.logger] Custom logger for client use. | ||
@@ -44,2 +46,3 @@ */ | ||
this.log = logger_1.default; | ||
this.persistentMessages = false; | ||
this.amqpClient = new AmqpClient_1.default(opts.amqpClient); | ||
@@ -58,2 +61,5 @@ this.calls = new Map(); | ||
this.log = opts.rpcClient.logger; | ||
if (typeof opts.rpcClient.persistentMessages !== 'undefined') { | ||
this.persistentMessages = opts.rpcClient.persistentMessages; | ||
} | ||
} | ||
@@ -121,3 +127,3 @@ this.callTimer = new Timer_1.default(); | ||
// TODO: check for publish return, may need to flush | ||
await this.amqpClient.channel.publish(this.rpcExchangeName, 'call', new Buffer(JSON.stringify(this.callPayload(procedure, ...args))), { replyTo: 'amq.rabbitmq.reply-to', correlationId }); | ||
await this.amqpClient.channel.publish(this.rpcExchangeName, 'call', new Buffer(JSON.stringify(this.callPayload(procedure, ...args))), { persistent: this.persistentMessages, replyTo: 'amq.rabbitmq.reply-to', correlationId }); | ||
try { | ||
@@ -154,2 +160,4 @@ return await Promise.race([ | ||
return (message) => { | ||
if (!message) | ||
return; // beats me, but it's in the type definition 🤷♂️ | ||
const correlationId = message.properties.correlationId; | ||
@@ -156,0 +164,0 @@ const callbacks = this.calls.get(correlationId); |
@@ -48,2 +48,4 @@ "use strict"; | ||
const { consumerTag } = await this.amqpClient.channel.consume(`${this.rpcExchangeName}.call`, async (message) => { | ||
if (!message) | ||
return; | ||
let content; | ||
@@ -50,0 +52,0 @@ try { |
@@ -14,4 +14,4 @@ "use strict"; | ||
*/ | ||
const sendMessage = async (channel, { replyTo, correlationId }, response) => { | ||
await channel.publish('', replyTo, new Buffer(JSON.stringify(response)), { correlationId }); | ||
const sendMessage = async (channel, { replyTo, correlationId, deliveryMode }, response) => { | ||
await channel.publish('', replyTo, new Buffer(JSON.stringify(response)), { deliveryMode, correlationId }); | ||
}; | ||
@@ -18,0 +18,0 @@ /** |
@@ -1,1 +0,1 @@ | ||
export declare function delay(ms: number): Promise<{}>; | ||
export declare function delay(ms: number): Promise<unknown>; |
@@ -31,3 +31,3 @@ "use strict"; | ||
const payload = JSON.stringify({ procedure: 'marco', args: ['polo', 42], timeouts }); | ||
sinon.assert.calledWith(t.context.publishStub, 'mqrpc', 'call', sinon.match(buffer => buffer.toString() === payload), { replyTo: 'amq.rabbitmq.reply-to', correlationId: sinon.match.string }); | ||
sinon.assert.calledWith(t.context.publishStub, 'mqrpc', 'call', sinon.match(buffer => buffer.toString() === payload), { persistent: false, replyTo: 'amq.rabbitmq.reply-to', correlationId: sinon.match.string }); | ||
t.pass(); | ||
@@ -34,0 +34,0 @@ }); |
@@ -10,3 +10,3 @@ "use strict"; | ||
fields: { deliveryTag: 1234567890987654321 }, | ||
properties: { replyTo: 'amqp.rabbitmq.reply-to', correlationId: '123456' } | ||
properties: { replyTo: 'amqp.rabbitmq.reply-to', correlationId: '123456', deliveryMode: 2 } | ||
}; | ||
@@ -21,3 +21,3 @@ const amqpClient = new AmqpClient_1.default({ amqpUrl: _config_1.AMQP_URL }); | ||
await comms_1.ack(amqpClient.channel, sampleMessage); | ||
sinon.assert.calledWith(spy, '', 'amqp.rabbitmq.reply-to', new Buffer('{"type":"ack"}'), { correlationId: '123456' }); | ||
sinon.assert.calledWith(spy, '', 'amqp.rabbitmq.reply-to', new Buffer('{"type":"ack"}'), { correlationId: '123456', deliveryMode: 2 }); | ||
t.pass(); | ||
@@ -28,4 +28,4 @@ }); | ||
await comms_1.wait(amqpClient.channel, sampleMessage); | ||
sinon.assert.calledWith(spy, '', 'amqp.rabbitmq.reply-to', new Buffer('{"type":"wait"}'), { correlationId: '123456' }); | ||
sinon.assert.calledWith(spy, '', 'amqp.rabbitmq.reply-to', new Buffer('{"type":"wait"}'), { correlationId: '123456', deliveryMode: 2 }); | ||
t.pass(); | ||
}); |
@@ -27,3 +27,3 @@ "use strict"; | ||
await comms_1.reply(amqpClient.channel, sampleMessage); | ||
sinon.assert.calledWith(spy, '', 'amqp.rabbitmq.reply-to', sinon.match.any, { correlationId: '123456' }); | ||
sinon.assert.calledWith(spy, '', 'amqp.rabbitmq.reply-to', sinon.match.any, { correlationId: '123456', deliveryMode: undefined }); | ||
t.pass(); | ||
@@ -30,0 +30,0 @@ }); |
@@ -12,3 +12,3 @@ "use strict"; | ||
.exactly(times) | ||
.withExactArgs('', 1234, new Buffer(JSON.stringify({ type })), { correlationId: '12345' }) | ||
.withExactArgs('', 1234, new Buffer(JSON.stringify({ type })), { correlationId: '12345', deliveryMode: undefined }) | ||
.resolves(); | ||
@@ -15,0 +15,0 @@ }; |
@@ -14,4 +14,4 @@ import * as amqp from 'amqplib' | ||
socketOptions?: object | ||
connection: amqp.Connection | ||
channel: amqp.Channel | ||
connection!: amqp.Connection | ||
channel!: amqp.Channel | ||
prefetchCount = 100 | ||
@@ -18,0 +18,0 @@ |
@@ -16,2 +16,3 @@ import * as uuid from 'uuid/v4' | ||
logger?: StandardLogger | ||
persistentMessages?: boolean | ||
} | ||
@@ -36,2 +37,3 @@ | ||
log = logger as StandardLogger | ||
persistentMessages = false | ||
@@ -62,2 +64,4 @@ protected calls: Map<string, PromiseCallbacks> | ||
* Default 15 minutes. | ||
* @param {boolean} [opts.rpcClient.persistentMessages] Whether to use persistent messages. | ||
* Default false. | ||
* @param {StandardLogger} [opts.rpcClient.logger] Custom logger for client use. | ||
@@ -75,2 +79,5 @@ */ | ||
if (opts.rpcClient.logger) this.log = opts.rpcClient.logger | ||
if (typeof opts.rpcClient.persistentMessages !== 'undefined') { | ||
this.persistentMessages = opts.rpcClient.persistentMessages | ||
} | ||
} | ||
@@ -159,3 +166,3 @@ | ||
new Buffer(JSON.stringify(this.callPayload(procedure, ...args))), | ||
{ replyTo: 'amq.rabbitmq.reply-to', correlationId } | ||
{ persistent: this.persistentMessages, replyTo: 'amq.rabbitmq.reply-to', correlationId } | ||
) | ||
@@ -189,4 +196,6 @@ | ||
protected makeReplyHandler(): (message: amqp.Message) => any { | ||
return (message: amqp.Message) => { | ||
protected makeReplyHandler(): (message: amqp.Message | null) => any { | ||
return (message: amqp.Message | null) => { | ||
if (!message) return // beats me, but it's in the type definition 🤷♂️ | ||
const correlationId = message.properties.correlationId | ||
@@ -193,0 +202,0 @@ const callbacks = this.calls.get(correlationId) |
@@ -72,3 +72,5 @@ import { Channel, Message } from 'amqplib' | ||
`${this.rpcExchangeName}.call`, | ||
async (message: Message) => { | ||
async (message: Message | null) => { | ||
if (!message) return | ||
let content: ClientPayload | ||
@@ -75,0 +77,0 @@ |
@@ -15,3 +15,3 @@ import { Channel, Message } from 'amqplib' | ||
*/ | ||
const sendMessage = async (channel: Channel, { replyTo, correlationId }: any, response: object) => { | ||
const sendMessage = async (channel: Channel, { replyTo, correlationId, deliveryMode }: any, response: object) => { | ||
await channel.publish( | ||
@@ -21,3 +21,3 @@ '', | ||
new Buffer(JSON.stringify(response)), | ||
{ correlationId } | ||
{ deliveryMode, correlationId } | ||
) | ||
@@ -24,0 +24,0 @@ } |
{ | ||
"name": "mqrpc", | ||
"version": "1.4.1", | ||
"version": "1.5.0", | ||
"description": "💫 Easy RPC over RabbitMQ", | ||
@@ -37,15 +37,15 @@ "keywords": [ | ||
"@types/uuid": "^3.4.2", | ||
"ava": "^0.22.0", | ||
"ava": "^0.25.0", | ||
"should": "^13.1.0", | ||
"sinon": "^4.0.1", | ||
"tslint": "^5.8.0", | ||
"typescript": "^2.5.3" | ||
"typescript": "^3.4.5" | ||
}, | ||
"scripts": { | ||
"build": "tsc --declaration && npm run lint && ava", | ||
"build": "node_modules/.bin/tsc --declaration && yarn run lint && node_modules/.bin/ava", | ||
"clean": "rm -r dist/ || mkdir dist", | ||
"dev": "tsc -w & ava -w", | ||
"dev": "node_modules/.bin/tsc -w & node_modules/.bin/ava -w", | ||
"lint": "node_modules/.bin/tslint -p .", | ||
"test": "tsc && ava", | ||
"prepublish": "npm run clean && npm run build" | ||
"test": "node_modules/.bin/tsc && node_modules/.bin/ava", | ||
"prepublish": "yarn run clean && yarn run build" | ||
}, | ||
@@ -52,0 +52,0 @@ "ava": { |
@@ -44,3 +44,3 @@ import test from 'ava' | ||
sinon.match(buffer => buffer.toString() === payload), | ||
{ replyTo: 'amq.rabbitmq.reply-to', correlationId: sinon.match.string } | ||
{ persistent: false, replyTo: 'amq.rabbitmq.reply-to', correlationId: sinon.match.string } | ||
) | ||
@@ -47,0 +47,0 @@ t.pass() |
@@ -10,3 +10,3 @@ import test from 'ava' | ||
fields: { deliveryTag: 1234567890987654321 }, | ||
properties: { replyTo: 'amqp.rabbitmq.reply-to', correlationId: '123456' } | ||
properties: { replyTo: 'amqp.rabbitmq.reply-to', correlationId: '123456', deliveryMode: 2 } | ||
} as Message | ||
@@ -27,3 +27,3 @@ | ||
sinon.assert.calledWith( | ||
spy, '', 'amqp.rabbitmq.reply-to', new Buffer('{"type":"ack"}'), { correlationId: '123456' } | ||
spy, '', 'amqp.rabbitmq.reply-to', new Buffer('{"type":"ack"}'), { correlationId: '123456', deliveryMode: 2 } | ||
) | ||
@@ -38,5 +38,5 @@ t.pass() | ||
sinon.assert.calledWith( | ||
spy, '', 'amqp.rabbitmq.reply-to', new Buffer('{"type":"wait"}'), { correlationId: '123456' } | ||
spy, '', 'amqp.rabbitmq.reply-to', new Buffer('{"type":"wait"}'), { correlationId: '123456', deliveryMode: 2 } | ||
) | ||
t.pass() | ||
}) |
@@ -37,3 +37,3 @@ import test from 'ava' | ||
sinon.assert.calledWith( | ||
spy, '', 'amqp.rabbitmq.reply-to', sinon.match.any, { correlationId: '123456' } | ||
spy, '', 'amqp.rabbitmq.reply-to', sinon.match.any, { correlationId: '123456', deliveryMode: undefined } | ||
) | ||
@@ -40,0 +40,0 @@ t.pass() |
@@ -12,3 +12,3 @@ import test from 'ava' | ||
.exactly(times) | ||
.withExactArgs('', 1234, new Buffer(JSON.stringify({ type })), { correlationId: '12345' }) | ||
.withExactArgs('', 1234, new Buffer(JSON.stringify({ type })), { correlationId: '12345', deliveryMode: undefined }) | ||
.resolves() | ||
@@ -15,0 +15,0 @@ } |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
New author
Supply chain riskA new npm collaborator published a version of the package for the first time. New collaborators are usually benign additions to a project, but do indicate a change to the security surface area of a package.
Found 1 instance in 1 package
3658
0
169216
112