Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

amqp-wrapper

Package Overview
Dependencies
Maintainers
7
Versions
32
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

amqp-wrapper - npm Package Compare versions

Comparing version 5.6.0-requeue.0 to 6.0.0-node10.0

65

amqp.js

@@ -14,5 +14,3 @@ 'use strict';

let connection;
let channel;
let consumerTag;
var connection, channel;

@@ -29,4 +27,8 @@ var prefetch = config.prefetch || 10;

function createChannel (err, conn) {
if (err) return d.reject(err);
debug('createChannel()');
if (err) {
return d.reject(err);
}
connection = conn;
conn.createConfirmChannel(assertExchange);

@@ -36,4 +38,8 @@ }

function assertExchange (err, ch) {
if (err) return d.reject(err);
debug('assertExchange()', ch);
if (err) {
return d.reject(err);
}
channel = ch;
channel.prefetch(prefetch);

@@ -44,5 +50,11 @@ channel.assertExchange(config.exchange, 'topic', {}, assertQueues);

function assertQueues (err) {
if (err) return d.reject(err);
if (!config.queue || !config.queue.name) return d.resolve();
queueSetup.setupForConsume(channel, config, d.resolver(cb));
debug('assertQueues()');
if (err) {
return d.reject(err);
}
if (config.queue && config.queue.name) {
queueSetup.setupForConsume(channel, config, d.resolver(cb));
} else {
d.resolve();
}
}

@@ -52,10 +64,7 @@ return d.nodeify(cb);

function requeueAll () {
channel.cancel(consumerTag);
channel.nackAll(true);
}
function close (cb) {
if (!connection) return cb();
return connection.close(cb);
if (connection) {
return connection.close(cb);
}
cb();
}

@@ -96,8 +105,12 @@

*/
function consume (handleMessage, options, cb) {
const d = Deferred();
function consume (handleMessage, options) {
debug('consume()');
function onMessage (message) {
function callback (message) {
function done (err, requeue) {
if (err) return channel.nack(message, false, requeue || false);
if (requeue === undefined) {
requeue = false;
}
if (err) {
return channel.nack(message, false, requeue);
}
channel.ack(message);

@@ -111,3 +124,3 @@ }

} catch (error) {
console.error(error);
console.log(error);
// Do not requeue on exception - it means something unexpected

@@ -119,10 +132,3 @@ // (and prob. non-transitory) happened.

channel.consume(config.queue.name, onMessage, options, consumeCb);
function consumeCb (err, ok) {
if (err) return d.reject(err);
consumerTag = ok.consumerTag;
return d.resolve();
}
return d.nodeify(cb);
channel.consume(config.queue.name, callback, options);
}

@@ -134,4 +140,3 @@

publish,
consume,
requeueAll
consume
};

@@ -138,0 +143,0 @@ };

{
"name": "amqp-wrapper",
"version": "5.6.0-requeue.0",
"version": "6.0.0-node10.0",
"description": "A wrapper around https://github.com/squaremo/amqp.node to make consuming and publishing dead easy.",
"main": "amqp.js",
"scripts": {
"test": "semistandard && NODE_ENV=test mocha test --recursive",
"jshint": "jshint -c .jshintrc --exclude-path .gitignore .",
"codestyle": "jscs -p google lib/ test/",
"test": "semistandard && NODE_ENV=test mocha test --recursive --exit",
"coverage": "nyc -a -c -r html -r text -r lcov npm test"

@@ -27,15 +25,12 @@ },

"dependencies": {
"amqplib": "^0.4.0",
"async": "^0.9.0",
"amqplib": "^0.5.5",
"async": "^3.1.0",
"debug": "^2.6.8",
"deferential": "^1.0.0",
"json-stringify-safe": "^5.0.0",
"q": "^1.4.1"
"q": "^1.5.1"
},
"devDependencies": {
"expect.js": "^0.3.1",
"jscs": "^1.6.1",
"jshint": "^2.8.0",
"lodash": "^2.4.1",
"mocha": "^1.21.4",
"mocha": "^6.2.0",
"nyc": "^11.4.1",

@@ -42,0 +37,0 @@ "sandboxed-module": "^0.3.0",

@@ -10,3 +10,3 @@ 'use strict';

var amqp = AMQP(config);
return amqp.connect().then(function () {
amqp.connect().then(function () {
done();

@@ -13,0 +13,0 @@ }, done);

@@ -21,3 +21,3 @@ 'use strict';

it('should throw with no url', function (done) {
expect(function () { AMQP({exchange: ''}); }).to
expect(function () { AMQP({ exchange: '' }); }).to
.throwError('amqp-wrapper: Invalid config');

@@ -27,3 +27,3 @@ done();

it('should throw with no exchange', function (done) {
expect(function () { AMQP({url: ''}); }).to
expect(function () { AMQP({ url: '' }); }).to
.throwError('amqp-wrapper: Invalid config');

@@ -127,3 +127,3 @@ done();

}
amqp.publish('myqueue', {woo: 'test'}, {}, done);
amqp.publish('myqueue', { woo: 'test' }, {}, done);
});

@@ -139,3 +139,3 @@ });

var amqpLibMock = require('./amqplibmock')({overrides: {ack: ack}});
var amqpLibMock = require('./amqplibmock')({ overrides: { ack: ack } });

@@ -169,3 +169,3 @@ var mockedAMQP = SandboxedModule.require('../amqp', {

messageToDeliver: 'nonvalidjson',
overrides: {nack: nack}
overrides: { nack: nack }
});

@@ -197,3 +197,3 @@

var amqpLibMock = require('./amqplibmock')({overrides: {nack: nack}});
var amqpLibMock = require('./amqplibmock')({ overrides: { nack: nack } });

@@ -218,24 +218,4 @@ var mockedAMQP = SandboxedModule.require('../amqp', {

});
describe('#requeueAll', function () {
it('returns messages to the queue', function (done) {
var amqp = AMQP(config.good);
amqp.connect(function (err) {
if (err) {
return done(err);
}
amqp.publish('myqueue', 'test', { hi: 'there' }, done);
amqp.consume((msg) => {
expect(msg.hi).to.equal('there');
}).then(() => {
amqp.requeueAll();
amqp.consume((msg) => {
expect(msg.hi).to.equal('there');
});
});
});
});
});
});
// vim: set et sw=2 colorcolumn=80:

@@ -8,3 +8,3 @@ module.exports = {

routingKey: 'myRoutingQueue',
options: {deadLetterExchange: 'wow'}
options: { deadLetterExchange: 'wow' }
}

@@ -26,3 +26,3 @@ },

routingKey: ['myRoutingKey', 'myRoutingKey2'],
options: {deadLetterExchange: 'wow'}
options: { deadLetterExchange: 'wow' }
}

@@ -29,0 +29,0 @@ }

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc