Comparing version 1.2.0 to 1.3.0
@@ -55,6 +55,15 @@ import * as amqp from 'amqplib'; | ||
/** | ||
* Immediately tear down the client. Stops consuming replies, closes the | ||
* channel and, if it owns the connection, closes it too. | ||
* Tear down the client, optionally waiting for pending calls to resolve. | ||
* Stops consuming replies, closes the channel and, if it owns the connection, | ||
* closes it too. | ||
* | ||
* When calls are pending and the wait time expired or no wait time was given, | ||
* the calls are rejected with a CallTerminated error. | ||
* | ||
* @param {number} [opts.waitForCalls] How long, in ms, to wait for pending | ||
* calls. Give 0 for indefinitely. | ||
*/ | ||
term(): Promise<void>; | ||
term({waitForCalls}?: { | ||
waitForCalls?: number; | ||
}): Promise<void>; | ||
/** | ||
@@ -61,0 +70,0 @@ * Calls the remote procedure with the given `procedure` and resolves its |
@@ -68,10 +68,28 @@ "use strict"; | ||
/** | ||
* Immediately tear down the client. Stops consuming replies, closes the | ||
* channel and, if it owns the connection, closes it too. | ||
* Tear down the client, optionally waiting for pending calls to resolve. | ||
* Stops consuming replies, closes the channel and, if it owns the connection, | ||
* closes it too. | ||
* | ||
* When calls are pending and the wait time expired or no wait time was given, | ||
* the calls are rejected with a CallTerminated error. | ||
* | ||
* @param {number} [opts.waitForCalls] How long, in ms, to wait for pending | ||
* calls. Give 0 for indefinitely. | ||
*/ | ||
async term() { | ||
await this.amqpClient.term(); | ||
async term({ waitForCalls } = {}) { | ||
if (typeof waitForCalls !== 'undefined' && this.calls.size > 0) { | ||
let waited = 0; | ||
const checkCallsInterval = setInterval(() => { | ||
waited += 50; | ||
if (this.calls.size === 0 || (waitForCalls > 0 && waited > waitForCalls)) { | ||
clearInterval(checkCallsInterval); | ||
return this.term(); | ||
} | ||
}, 50); | ||
return; | ||
} | ||
this.callTimer.clear(); | ||
this.calls.forEach(({ reject }) => reject(new errors_1.CallTerminated())); | ||
this.calls.clear(); | ||
await this.amqpClient.term(); | ||
} | ||
@@ -78,0 +96,0 @@ /** |
@@ -8,6 +8,7 @@ "use strict"; | ||
const errors_1 = require("../../lib/RpcClient/errors"); | ||
const Timer_1 = require("../../lib/Timer"); | ||
ava_1.default.beforeEach(async (t) => { | ||
t.context.client = new RpcClient_1.default({ | ||
amqpClient: { amqpUrl: _config_1.AMQP_URL }, | ||
rpcClient: { idleTimeout: 30 } | ||
rpcClient: { idleTimeout: 30, callTimeout: 100 } | ||
}); | ||
@@ -45,1 +46,6 @@ await t.context.client.init(); | ||
}); | ||
ava_1.default('[unit] #term can wait for pending calls', async (t) => { | ||
const callPromise = t.context.client.call('marco'); | ||
await t.context.client.term({ waitForCalls: 500 }); | ||
await t.throws(callPromise, Timer_1.TimeoutExpired); | ||
}); |
@@ -90,10 +90,32 @@ import * as uuid from 'uuid/v4' | ||
/** | ||
* Immediately tear down the client. Stops consuming replies, closes the | ||
* channel and, if it owns the connection, closes it too. | ||
* Tear down the client, optionally waiting for pending calls to resolve. | ||
* Stops consuming replies, closes the channel and, if it owns the connection, | ||
* closes it too. | ||
* | ||
* When calls are pending and the wait time expired or no wait time was given, | ||
* the calls are rejected with a CallTerminated error. | ||
* | ||
* @param {number} [opts.waitForCalls] How long, in ms, to wait for pending | ||
* calls. Give 0 for indefinitely. | ||
*/ | ||
async term() { | ||
await this.amqpClient.term() | ||
async term({ waitForCalls }: { waitForCalls?: number } = {}) { | ||
if (typeof waitForCalls !== 'undefined' && this.calls.size > 0) { | ||
let waited = 0 | ||
const checkCallsInterval = setInterval(() => { | ||
waited += 50 | ||
if (this.calls.size === 0 || (waitForCalls > 0 && waited > waitForCalls)) { | ||
clearInterval(checkCallsInterval) | ||
return this.term() | ||
} | ||
}, 50) | ||
return | ||
} | ||
this.callTimer.clear() | ||
this.calls.forEach(({ reject }) => reject(new CallTerminated())) | ||
this.calls.clear() | ||
await this.amqpClient.term() | ||
} | ||
@@ -100,0 +122,0 @@ |
{ | ||
"name": "mqrpc", | ||
"version": "1.2.0", | ||
"version": "1.3.0", | ||
"description": "💫 Easy RPC over RabbitMQ", | ||
@@ -5,0 +5,0 @@ "keywords": [ |
@@ -123,6 +123,8 @@ # MQRPC | ||
##### `async client.term()` | ||
##### `async client.term({ waitForCalls: number })` | ||
Neatly shut down the client. Closes the AMQP channel and, if one wasn't provided, the connection as well. | ||
Neatly shut down the client. Terminates any active calls, closes the AMQP channel and, if one wasn't provided, the connection as well. | ||
Will wait up to `waitForCalls` milliseconds for pending calls to resolve, or indefinitely if given 0ms. | ||
## Timeouts | ||
@@ -129,0 +131,0 @@ |
@@ -6,2 +6,3 @@ import test from 'ava' | ||
import { CallTerminated } from '../../lib/RpcClient/errors' | ||
import { TimeoutExpired } from '../../lib/Timer' | ||
@@ -11,3 +12,3 @@ test.beforeEach(async t => { | ||
amqpClient: { amqpUrl: AMQP_URL }, | ||
rpcClient: { idleTimeout: 30 } | ||
rpcClient: { idleTimeout: 30, callTimeout: 100 } | ||
}) | ||
@@ -58,1 +59,9 @@ | ||
}) | ||
test('[unit] #term can wait for pending calls', async t => { | ||
const callPromise = t.context.client.call('marco') | ||
await t.context.client.term({ waitForCalls: 500 }) | ||
await t.throws(callPromise, TimeoutExpired) | ||
}) |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
268894
3563
169
113