Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

amqper

Package Overview
Dependencies
Maintainers
1
Versions
24
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

amqper - npm Package Compare versions

Comparing version 0.2.0 to 0.2.1

43

lib/client.js
"use strict";
var debug = require('debug')('amqper:client');
var deprecate = require('depd')('amqper');
var EventEmitter = require('events').EventEmitter;
var _ = require('lodash');
var util = require('util');
var when = require('when');
var Promise = require('bluebird');
var codecs = require('./codecs');

@@ -69,3 +69,3 @@

var codec = that.codec;
return when.try(function () {
return Promise.try(function () {
content = bufferify(codec.encode(content));

@@ -89,2 +89,16 @@ return channel.publish(exchange, routingKey, content);

}
if (handler && handler.length > 1) {
deprecate('route handler arguments > 1');
}
function fn(message) {
if (!handler) return;
if (handler.length > 1) {
handler(null, message);
} else {
handler(message);
}
}
var that = this;

@@ -94,5 +108,5 @@ return this.$promise.then(function () {

var router = that.context.route(route, options, function (message) {
when.try(function () {
Promise.try(function () {
message.payload = codec.decode(message.content);
return handler(null, message);
return fn(message);
}).then(function () {

@@ -103,3 +117,2 @@ return message.ack();

that.emit('error', err);
handler(err, message);
});

@@ -117,3 +130,3 @@ });

if (cb) cb();
return when.resolve();
return Promise.resolve();
}

@@ -123,3 +136,3 @@ this.closing = true;

return close_connection(this.conn).then(function () {
return when.all(when.map(that.routers, function (router) {
return Promise.all(Promise.map(that.routers, function (router) {
return router.connection.then(function (conn) {

@@ -139,10 +152,10 @@ if (conn === that.conn) return;

function close_connection(conn) {
if (!conn || conn.cloing || conn.closed) return when.resolve();
return when.try(function () {
var d = when.defer();
conn.once('close', function () {
d.resolve();
if (!conn || conn.cloing || conn.closed) return Promise.resolve();
return Promise.try(function () {
return new Promise(function (resolve) {
conn.once('close', function () {
resolve();
});
conn.close();
});
conn.close();
return d.promise;
}).catch(function (err) {

@@ -149,0 +162,0 @@ console.error(err.stack);

{
"name": "amqper",
"version": "0.2.0",
"version": "0.2.1",
"description": "A simple and elegant AMQP client for node based on amqplib.",

@@ -19,3 +19,5 @@ "homepage": "https://github.com/taoyuan/amqper",

"amqplib": "^0.3.2",
"bluebird": "^2.9.34",
"debug": "^2.2.0",
"depd": "^1.0.1",
"houkou": "^0.2.2",

@@ -22,0 +24,0 @@ "lodash": "^3.10.0",

@@ -34,3 +34,3 @@ 'use strict';

client.$promise.then(function () {
client.route('test1.:arg', {queue: 'this_is_queue_name_1'}, function (err, message) {
client.route('test1.:arg', {queue: 'this_is_queue_name_1'}, function (message) {
t.deepEqual(message.payload, data);

@@ -54,3 +54,3 @@ delayCloseClient(client, done);

client.format('msgpack');
client.route('test2.:arg', {queue: 'this_is_queue_name_2'}, function (err, message) {
client.route('test2.:arg', {queue: 'this_is_queue_name_2'}, function (message) {
t.deepEqual(message.payload, data);

@@ -57,0 +57,0 @@ delayCloseClient(client, done);

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc