@idearium/amqp
Advanced tools
Comparing version 1.0.0-beta.3 to 1.0.0-beta.4
@@ -5,2 +5,7 @@ # @idearium/amqp | ||
## v1.0.0-beta.4 - 2023-03-10 | ||
- @idearium/amqp is no longer a singleton. | ||
- Introduced tests for @idearium/amqp. | ||
## v1.0.0-beta.3 - 2023-03-07 | ||
@@ -7,0 +12,0 @@ |
16
index.js
@@ -6,6 +6,8 @@ const log = require('@idearium/log')(); | ||
const createClient = () => { | ||
const _connection = connect(); | ||
const channels = publishingChannels(_connection.connect); | ||
const amqpUrl = process.env.MQ_URL; | ||
const amqp = async (mqUrl = amqpUrl, opts = {}) => { | ||
const connection = await connect(mqUrl, opts); | ||
const channels = publishingChannels(connection); | ||
const consume = async (name, consumer, options) => { | ||
@@ -24,3 +26,3 @@ if (!name) { | ||
if (!_connection.isConnected()) { | ||
if (!connection.isConnected()) { | ||
throw new Error( | ||
@@ -41,3 +43,2 @@ 'You must connect to a server before using consume' | ||
const connection = await _connection.connect(); | ||
const channel = await connection.createChannel(); | ||
@@ -88,3 +89,3 @@ await channel.assertExchange(exchange, type, { | ||
const publish = async (name, data, options) => { | ||
if (!_connection.isConnected()) { | ||
if (!connection.isConnected()) { | ||
throw new Error( | ||
@@ -99,3 +100,2 @@ 'You must connect to a server before using publish' | ||
return { | ||
connect: _connection.connect, | ||
consume, | ||
@@ -106,2 +106,2 @@ publish, | ||
module.exports = createClient(); | ||
module.exports = amqp; |
@@ -5,4 +5,2 @@ const amqp = require('amqplib'); | ||
const amqpUrl = process.env.MQ_URL; | ||
const redactUrl = (url) => { | ||
@@ -19,59 +17,43 @@ const [protocol, remainder] = url.split('://'); | ||
module.exports = () => { | ||
let _connection; | ||
let _state = 'disconnected'; | ||
module.exports = async (mqUrl, opts = {}) => { | ||
let state = 'disconnected'; | ||
const createConnection = async (mqUrl = amqpUrl, opts = {}) => { | ||
if (!mqUrl) { | ||
throw new Error('mqUrl parameter is required'); | ||
} | ||
if (!mqUrl) { | ||
throw new Error('mqUrl parameter is required'); | ||
} | ||
const url = redactUrl(mqUrl); | ||
const url = redactUrl(mqUrl); | ||
log.info({ url }, 'Connecting to AMQP server.'); | ||
log.info({ url }, 'Connecting to AMQP server.'); | ||
_state = 'connecting'; | ||
state = 'connecting'; | ||
const [err, connection] = await safePromise(amqp.connect(mqUrl, opts)); | ||
const [err, connection] = await safePromise(amqp.connect(mqUrl, opts)); | ||
if (err) { | ||
log.error({ err, url }, 'Could not connect to AMQP server.'); | ||
if (err) { | ||
log.error({ err, url }, 'Could not connect to AMQP server.'); | ||
throw err; | ||
} | ||
throw err; | ||
} | ||
_state = 'connected'; | ||
state = 'connected'; | ||
log.info({ url }, 'Connected to AMQP server.'); | ||
log.info({ url }, 'Connected to AMQP server.'); | ||
// This needs to be async so that the throw causes Node.js to exit. | ||
connection.on('close', async (connectionErr) => { | ||
_state = 'disconnected'; | ||
// This needs to be async so that the throw causes Node.js to exit. | ||
connection.on('close', async (connectionErr) => { | ||
state = 'disconnected'; | ||
log.error( | ||
{ err: connectionErr, url }, | ||
'Connection to the AMQP server closed.' | ||
); | ||
log.error( | ||
{ err: connectionErr, url }, | ||
'Connection to the AMQP server closed.' | ||
); | ||
// This didn't wor for some reason? | ||
throw new Error('The connection to the AMQP server closed.'); | ||
}); | ||
// This didn't wor for some reason? | ||
throw new Error('The connection to the AMQP server closed.'); | ||
}); | ||
return connection; | ||
}; | ||
connection.isConnected = () => state === 'connected'; | ||
const connect = (url, opts) => { | ||
if (!_connection) { | ||
_connection = createConnection(url, opts); | ||
} | ||
// eslint-disable-next-line no-use-before-define | ||
return _connection; | ||
}; | ||
return { | ||
connect, | ||
connection: () => _connection, | ||
isConnected: () => _state === 'connected', | ||
}; | ||
return connection; | ||
}; |
@@ -17,3 +17,3 @@ const multiLog = require('@idearium/log/multi')(); | ||
const channels = (connect) => { | ||
const channels = (connection) => { | ||
const setupDrain = async ({ exchange, name, routingKey, type }) => { | ||
@@ -88,7 +88,3 @@ // eslint-disable-next-line no-use-before-define | ||
publishingChannels[name] = new Promise((resolve) => { | ||
connect().then(async (connection) => { | ||
const channelPromise = connection.createChannel(); | ||
const channel = await channelPromise; | ||
connection.createChannel().then((channel) => { | ||
channel.on('drain', republish); | ||
@@ -95,0 +91,0 @@ |
{ | ||
"name": "@idearium/amqp", | ||
"version": "1.0.0-beta.3", | ||
"version": "1.0.0-beta.4", | ||
"description": "A library for working with AMQP.", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
14765
8
408