New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

@idearium/amqp

Package Overview
Dependencies
Maintainers
4
Versions
5
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@idearium/amqp - npm Package Compare versions

Comparing version 1.0.0-beta.3 to 1.0.0-beta.4

lib/__mocks__/connect.js

5

CHANGELOG.md

@@ -5,2 +5,7 @@ # @idearium/amqp

## v1.0.0-beta.4 - 2023-03-10
- @idearium/amqp is no longer a singleton.
- Introduced tests for @idearium/amqp.
## v1.0.0-beta.3 - 2023-03-07

@@ -7,0 +12,0 @@

16

index.js

@@ -6,6 +6,8 @@ const log = require('@idearium/log')();

const createClient = () => {
const _connection = connect();
const channels = publishingChannels(_connection.connect);
const amqpUrl = process.env.MQ_URL;
const amqp = async (mqUrl = amqpUrl, opts = {}) => {
const connection = await connect(mqUrl, opts);
const channels = publishingChannels(connection);
const consume = async (name, consumer, options) => {

@@ -24,3 +26,3 @@ if (!name) {

if (!_connection.isConnected()) {
if (!connection.isConnected()) {
throw new Error(

@@ -41,3 +43,2 @@ 'You must connect to a server before using consume'

const connection = await _connection.connect();
const channel = await connection.createChannel();

@@ -88,3 +89,3 @@ await channel.assertExchange(exchange, type, {

const publish = async (name, data, options) => {
if (!_connection.isConnected()) {
if (!connection.isConnected()) {
throw new Error(

@@ -99,3 +100,2 @@ 'You must connect to a server before using publish'

return {
connect: _connection.connect,
consume,

@@ -106,2 +106,2 @@ publish,

module.exports = createClient();
module.exports = amqp;

@@ -5,4 +5,2 @@ const amqp = require('amqplib');

const amqpUrl = process.env.MQ_URL;
const redactUrl = (url) => {

@@ -19,59 +17,43 @@ const [protocol, remainder] = url.split('://');

module.exports = () => {
let _connection;
let _state = 'disconnected';
module.exports = async (mqUrl, opts = {}) => {
let state = 'disconnected';
const createConnection = async (mqUrl = amqpUrl, opts = {}) => {
if (!mqUrl) {
throw new Error('mqUrl parameter is required');
}
if (!mqUrl) {
throw new Error('mqUrl parameter is required');
}
const url = redactUrl(mqUrl);
const url = redactUrl(mqUrl);
log.info({ url }, 'Connecting to AMQP server.');
log.info({ url }, 'Connecting to AMQP server.');
_state = 'connecting';
state = 'connecting';
const [err, connection] = await safePromise(amqp.connect(mqUrl, opts));
const [err, connection] = await safePromise(amqp.connect(mqUrl, opts));
if (err) {
log.error({ err, url }, 'Could not connect to AMQP server.');
if (err) {
log.error({ err, url }, 'Could not connect to AMQP server.');
throw err;
}
throw err;
}
_state = 'connected';
state = 'connected';
log.info({ url }, 'Connected to AMQP server.');
log.info({ url }, 'Connected to AMQP server.');
// This needs to be async so that the throw causes Node.js to exit.
connection.on('close', async (connectionErr) => {
_state = 'disconnected';
// This needs to be async so that the throw causes Node.js to exit.
connection.on('close', async (connectionErr) => {
state = 'disconnected';
log.error(
{ err: connectionErr, url },
'Connection to the AMQP server closed.'
);
log.error(
{ err: connectionErr, url },
'Connection to the AMQP server closed.'
);
// This didn't wor for some reason?
throw new Error('The connection to the AMQP server closed.');
});
// This didn't wor for some reason?
throw new Error('The connection to the AMQP server closed.');
});
return connection;
};
connection.isConnected = () => state === 'connected';
const connect = (url, opts) => {
if (!_connection) {
_connection = createConnection(url, opts);
}
// eslint-disable-next-line no-use-before-define
return _connection;
};
return {
connect,
connection: () => _connection,
isConnected: () => _state === 'connected',
};
return connection;
};

@@ -17,3 +17,3 @@ const multiLog = require('@idearium/log/multi')();

const channels = (connect) => {
const channels = (connection) => {
const setupDrain = async ({ exchange, name, routingKey, type }) => {

@@ -88,7 +88,3 @@ // eslint-disable-next-line no-use-before-define

publishingChannels[name] = new Promise((resolve) => {
connect().then(async (connection) => {
const channelPromise = connection.createChannel();
const channel = await channelPromise;
connection.createChannel().then((channel) => {
channel.on('drain', republish);

@@ -95,0 +91,0 @@

{
"name": "@idearium/amqp",
"version": "1.0.0-beta.3",
"version": "1.0.0-beta.4",
"description": "A library for working with AMQP.",

@@ -5,0 +5,0 @@ "main": "index.js",

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc