@emartech/rabbitmq-client
Advanced tools
Comparing version 3.1.3 to 3.2.0
@@ -24,7 +24,7 @@ { | ||
"chai-subset": "1.6.0", | ||
"eslint": "6.8.0", | ||
"eslint": "7.2.0", | ||
"eslint-config-emarsys": "5.1.0", | ||
"eslint-plugin-mocha": "6.3.0", | ||
"mocha": "7.1.2", | ||
"semantic-release": "17.0.7", | ||
"eslint-plugin-mocha": "7.0.1", | ||
"mocha": "7.2.0", | ||
"semantic-release": "17.0.8", | ||
"sinon": "9.0.2", | ||
@@ -40,3 +40,3 @@ "sinon-chai": "3.5.0" | ||
}, | ||
"version": "3.1.3" | ||
"version": "3.2.0" | ||
} |
@@ -41,2 +41,6 @@ 'use strict'; | ||
}); | ||
process.once('SIGTERM', (function() { | ||
this._rabbitMqChannel.cancel(this._consumer.consumerTag); | ||
}).bind(this)); | ||
} catch (error) { | ||
@@ -43,0 +47,0 @@ this._logger.fromError('Consumer initialization error', error); |
@@ -174,2 +174,3 @@ 'use strict'; | ||
channel: channelName, | ||
cancel: () => {}, | ||
onMessages: async function() {} | ||
@@ -176,0 +177,0 @@ }, options); |
@@ -43,8 +43,7 @@ 'use strict'; | ||
await channel.consume(this._channel, async message => { | ||
const { consumerTag } = await channel.consume(this._channel, async message => { | ||
let autoNackTime; | ||
message._status = 'autonack init'; | ||
if (typeof this._autoNackTime === 'number') { | ||
autoNackTime = setTimeout(() => { | ||
logger.error('Consumer auto nack', { status: message._status, content: message.content.toString() }); | ||
logger.error('Consumer auto nack', { content: message.content.toString() }); | ||
channel.nack(message); | ||
@@ -55,12 +54,9 @@ }, this._autoNackTime); | ||
let content = {}; | ||
message._status = 'try'; | ||
try { | ||
content = JSON.parse(message.content.toString()); | ||
message._status = 'JSON parsed'; | ||
await this._onMessage(content, message); | ||
message._status = 'after onMessage'; | ||
if (autoNackTime) clearTimeout(autoNackTime); | ||
await channel.ack(message); | ||
} catch (error) { | ||
message._status = 'error'; | ||
if (autoNackTime) clearTimeout(autoNackTime); | ||
@@ -85,2 +81,6 @@ | ||
}); | ||
process.once('SIGTERM', () => { | ||
channel.cancel(consumerTag); | ||
}); | ||
} catch (error) { | ||
@@ -87,0 +87,0 @@ logger.fromError('Consumer initialization error', error); |
@@ -27,3 +27,3 @@ 'use strict'; | ||
describe('RabbitMQ Consumer', function() { | ||
describe('RabbitMQ Consumer', function () { | ||
let sandbox = sinon.createSandbox(); | ||
@@ -35,4 +35,5 @@ let clock; | ||
let prefetchStub; | ||
let cancelStub; | ||
beforeEach(async function() { | ||
beforeEach(async function () { | ||
clock = sandbox.useFakeTimers(); | ||
@@ -43,7 +44,10 @@ startProcess = null; | ||
prefetchStub = sandbox.stub(); | ||
cancelStub = sandbox.stub(); | ||
const connectionMock = { | ||
createChannel: () => Promise.resolve({ | ||
assertQueue: () => {} | ||
}) | ||
createChannel: () => | ||
Promise.resolve({ | ||
assertQueue: () => {}, | ||
on: () => {} | ||
}) | ||
}; | ||
@@ -55,16 +59,20 @@ sandbox.stub(amqp, 'connect').resolves(connectionMock); | ||
prefetch: prefetchStub, | ||
cancel: cancelStub, | ||
on: () => {}, | ||
consume: async (channelName, consumeFn) => { startProcess = consumeFn; return Promise.resolve(); } | ||
consume: async (channelName, consumeFn) => { | ||
startProcess = consumeFn; | ||
return Promise.resolve({ consumerTag: 'test-tag' }); | ||
} | ||
}); | ||
}); | ||
afterEach(function() { | ||
afterEach(function () { | ||
sandbox.restore(); | ||
}); | ||
it('should create RabbitMqConsumer', async function() { | ||
it('should create RabbitMqConsumer', async function () { | ||
const configuration = { | ||
logger: loggerName, | ||
channel: channelName, | ||
onMessage: async function() {} | ||
onMessage: async function () {} | ||
}; | ||
@@ -77,7 +85,7 @@ | ||
it('should create a RabbitMQ connection', async function() { | ||
it('should create a RabbitMQ connection', async function () { | ||
const configuration = { | ||
logger: loggerName, | ||
channel: channelName, | ||
onMessage: async function() {} | ||
onMessage: async function () {} | ||
}; | ||
@@ -93,3 +101,3 @@ | ||
it('should call onChannelEstablished with channel', async function() { | ||
it('should call onChannelEstablished with channel', async function () { | ||
const options = { | ||
@@ -107,3 +115,3 @@ logger: loggerName, | ||
it('should not retry when message is not parsable as JSON', async function() { | ||
it('should not retry when message is not parsable as JSON', async function () { | ||
const message = { content: Buffer.from('Not a JSON') }; | ||
@@ -113,3 +121,3 @@ const configuration = { | ||
channel: channelName, | ||
onMessage: async function() {} | ||
onMessage: async function () {} | ||
}; | ||
@@ -124,3 +132,3 @@ | ||
it('should not retry when onMessage throws non-retryable error', async function() { | ||
it('should not retry when onMessage throws non-retryable error', async function () { | ||
const message = { content: Buffer.from('{}') }; | ||
@@ -130,3 +138,5 @@ const configuration = { | ||
channel: channelName, | ||
onMessage: async function() { throw Error('test error'); } | ||
onMessage: async function () { | ||
throw Error('test error'); | ||
} | ||
}; | ||
@@ -141,3 +151,3 @@ | ||
it('should retry when onMessage throws retryable error', async function() { | ||
it('should retry when onMessage throws retryable error', async function () { | ||
const logFromErrorSpy = sandbox.spy(Logger.prototype, 'fromError'); | ||
@@ -149,3 +159,5 @@ | ||
channel: channelName, | ||
onMessage: async function() { throw new RetryableError('test error'); } | ||
onMessage: async function () { | ||
throw new RetryableError('test error'); | ||
} | ||
}; | ||
@@ -164,3 +176,3 @@ | ||
it('should log retriable error content if configured', async function() { | ||
it('should log retriable error content if configured', async function () { | ||
const logFromErrorSpy = sandbox.spy(Logger.prototype, 'fromError'); | ||
@@ -172,3 +184,5 @@ | ||
channel: channelName, | ||
onMessage: async function() { throw new RetryableError('test error'); }, | ||
onMessage: async function () { | ||
throw new RetryableError('test error'); | ||
}, | ||
logRetriableErrorContent: true | ||
@@ -188,3 +202,3 @@ }; | ||
it('should autonack if message processing takes too much time', async function() { | ||
it('should autonack if message processing takes too much time', async function () { | ||
const message = { content: Buffer.from('{}') }; | ||
@@ -195,4 +209,4 @@ const configuration = { | ||
autoNackTime: 500, | ||
onMessage: async function() { | ||
await new Promise(resolve => setTimeout(resolve, 1000)); | ||
onMessage: async function () { | ||
await new Promise((resolve) => setTimeout(resolve, 1000)); | ||
} | ||
@@ -209,3 +223,2 @@ }; | ||
}); | ||
}); |
Sorry, the diff of this file is not supported yet
40183
18
989