New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

larvitamintercom

Package Overview
Dependencies
Maintainers
1
Versions
136
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

larvitamintercom - npm Package Compare versions

Comparing version 0.1.5 to 0.1.6

77

index.js

@@ -188,3 +188,4 @@ 'use strict';

Intercom.prototype.consume = function(options, msgCb, cb) {
const tasks = [],
const returnObj = {},
tasks = [],
that = this;

@@ -216,2 +217,29 @@

returnObj.cancel = function cancel(cb) {
if (returnObj.data === undefined || returnObj.data['consumer-tag'] === undefined) {
const err = new Error('No consumer tag is defined, consume have probably not been started yet.');
log.warn('larvitamintercom: consume() - ' + err.message);
cb(err);
return;
}
that.handle.basic.cancel(returnObj.data['consumer-tag'], function(err) {
if (err) {
log.warn('larvitamintercom: consume() - end() - Could not canceled consuming. consumer-tag: "' + returnObj.data['consumer-tag'] + '", err: ' + err.message);
} else {
log.verbose('larvitamintercom: consume() - cancel() - Canceled consuming. consumer-tag: "' + returnObj.data['consumer-tag'] + '"');
}
cb(err);
});
/* We could not get this to work :( // Lilleman and gagge 2016-12-27
that.handle.once(that.channelName + ':basic.cancel-ok', function(channel, method, data) {
log.verbose('larvitamintercom: consume() - cancel() - Canceled consuming.');
log.debug('larvitamintercom: consume() - cancel() - Canceled consuming. channel: ' + JSON.stringify(channel));
log.debug('larvitamintercom: consume() - cancel() - Canceled consuming. method: ' + JSON.stringify(method));
log.debug('larvitamintercom: consume() - cancel() - Canceled consuming. data: ' + JSON.stringify(data));
cb();
});*/
};
// Declare exchange

@@ -250,2 +278,5 @@ tasks.push(function(cb) {

that.handle.once(that.channelName + ':basic.consume-ok', function(channel, method, data) {
returnObj.channel = channel;
returnObj.method = method;
returnObj.data = data;
log.verbose('larvitamintercom: consume() - Started consuming with consumer tag: "' + data['consumer-tag'] + '"');

@@ -282,3 +313,7 @@ cb();

async.series(tasks, cb);
async.series(tasks, function(err) {
if (err) { cb(err); return; }
cb(err, returnObj);
});
};

@@ -531,3 +566,4 @@

Intercom.prototype.subscribe = function(options, msgCb, cb) {
const tasks = [],
const returnObj = {},
tasks = [],
that = this;

@@ -553,2 +589,30 @@

returnObj.cancel = function cancel(cb) {
if (returnObj.data === undefined || returnObj.data['consumer-tag'] === undefined) {
const err = new Error('No consumer tag is defined, consume have probably not been started yet.');
log.warn('larvitamintercom: subscribe() - ' + err.message);
cb(err);
return;
}
that.handle.basic.cancel(returnObj.data['consumer-tag'], function(err) {
if (err) {
log.warn('larvitamintercom: subscribe() - end() - Could not canceled consuming. consumer-tag: "' + returnObj.data['consumer-tag'] + '", err: ' + err.message);
} else {
log.verbose('larvitamintercom: subscribe() - cancel() - Canceled consuming. consumer-tag: "' + returnObj.data['consumer-tag'] + '"');
}
cb(err);
});
/* We could not get this to work :( // Lilleman and gagge 2016-12-27
that.handle.once(that.channelName + ':basic.cancel-ok', function(channel, method, data) {
log.verbose('larvitamintercom: subscribe() - cancel() - Canceled consuming.');
log.debug('larvitamintercom: subscribe() - cancel() - Canceled consuming. channel: ' + JSON.stringify(channel));
log.debug('larvitamintercom: subscribe() - cancel() - Canceled consuming. method: ' + JSON.stringify(method));
log.debug('larvitamintercom: subscribe() - cancel() - Canceled consuming. data: ' + JSON.stringify(data));
cb();
});*/
};
// Declare exchange

@@ -592,2 +656,5 @@ tasks.push(function(cb) {

that.handle.once(that.channelName + ':basic.consume-ok', function(channel, method, data) {
returnObj.channel = channel;
returnObj.method = method;
returnObj.data = data;
log.verbose('larvitamintercom: subscribe() - Started consuming with consumer tag: "' + data['consumer-tag'] + '", queueName: "' + queueName + '"');

@@ -624,5 +691,7 @@ cb();

async.series(tasks, cb);
async.series(tasks, function(err) {
cb(err, returnObj);
});
};
exports = module.exports = Intercom;

2

package.json
{
"name": "larvitamintercom",
"version": "0.1.5",
"version": "0.1.6",
"description": "",

@@ -5,0 +5,0 @@ "main": "index.js",

@@ -57,3 +57,3 @@ [![Build Status](https://travis-ci.org/larvit/larvitamintercom.svg?branch=master)](https://travis-ci.org/larvit/larvitamintercom) [![Dependencies](https://david-dm.org/larvit/larvitamintercom.svg)](https://david-dm.org/larvit/larvitamintercom.svg)

let options = {'exchange': 'foo'}; // Will default to "default" if options is omitted
let options = {'exchange': 'foo'}; // Will default to "default" if options is omitted

@@ -69,6 +69,13 @@ intercom.consume(options, function(message, ack, deliveryTag) {

ack(new Error('Something was wrong with the message'));
}, function(err) {
}, function(err, consumeInstance) {
// Callback from established consume connection
// Stop consuming when your application is not interested any more
setTimeout(function() {
consumeInstance.cancel(function(err) {
if (err) throw err;
// IMPORTANT!!! This callback is syncronous and does NOT guarantee no more messages comes on the consume()
});
}, 3600000);
});
```

@@ -102,4 +109,12 @@

ack(new Error('Something was wrong with the message'));
}, function(err) {
}, function(err, subscribeInstance) {
// Callback from established subscribe connection
// Stop consuming when your application is not interested any more
setTimeout(function() {
subscribeInstance.cancel(function(err) {
if (err) throw err;
// IMPORTANT!!! This callback is syncronous and does NOT guarantee no more messages comes on the subscribe()
});
}, 3600000);
});

@@ -106,0 +121,0 @@ ```

@@ -29,3 +29,3 @@ 'use strict';

for (let i = 0; i < 14; i ++) {
for (let i = 0; i < 18; i ++) {
tasks.push(function(cb) {

@@ -71,3 +71,3 @@ const intercom = new Intercom(config);

after(function(done) {
/*after(function(done) {
const tasks = [];

@@ -80,3 +80,9 @@

tasks.push(function(cb) {
intercom.close(cb);
console.log('Closing intercom ' + i);
intercom.close(function(err) {
if (err) throw err;
console.log('Closed intercom: ' + i);
cb(err);
});
});

@@ -86,3 +92,3 @@ }

async.parallel(tasks, done);
});
});*/

@@ -347,4 +353,4 @@ describe('Send and receive', function() {

this.timeout(500);
this.slow(420); // > 1050 is shown in yellow, 1000ms is setTimeout()
this.timeout(2000);
this.slow(700);

@@ -369,2 +375,96 @@ intercoms[0].send(orgMsg, {'exchange': exchange, 'forceConsumeQueue': true}, function(err) {

});
it('should not receive after the consumation is cancelled', function(done) {
const consumeIntercom = intercoms[14],
sendIntercom = intercoms[15],
exchangeName = 'notReceiveAfterConsumeCancel';
let receivedMessages = 0,
consumeInstance;
this.slow(1000);
// Handle a message from queue
function handleMsg(message, ack) {
ack();
receivedMessages ++;
if (receivedMessages === 1) {
consumeInstance.cancel(function(err) {
if (err) throw err;
// The callback is sadly not trustworthy. Instead wait a bit and try again
setTimeout(function() {
sendAgain();
}, 200);
});
}
}
consumeIntercom.consume({'exchange': exchangeName}, handleMsg, function(err, result) {
consumeInstance = result;
sendIntercom.send({'foo': 'bar1'}, {'exchange': exchangeName}, function(err) {
if (err) throw err;
});
});
function sendAgain() {
sendIntercom.send({'foo': 'bar2'}, {'exchange': exchangeName}, function(err) {
if (err) throw err;
// Wait a while, and then make sure we have not gotten a second message
setTimeout(function() {
assert.deepEqual(receivedMessages, 1);
done();
}, 200);
});
}
});
it('should not receive after the subscription is cancelled', function(done) {
const subscribeIntercom = intercoms[16],
sendIntercom = intercoms[17],
exchangeName = 'notReceiveAfterSubscribeCancel';
let receivedMessages = 0,
subscribeInstance;
this.slow(1000);
// Handle a message from queue
function handleMsg(message, ack) {
ack();
receivedMessages ++;
if (receivedMessages === 1) {
subscribeInstance.cancel(function(err) {
if (err) throw err;
// The callback is sadly not trustworthy. Instead wait a bit and try again
setTimeout(function() {
sendAgain();
}, 200);
});
}
}
subscribeIntercom.subscribe({'exchange': exchangeName}, handleMsg, function(err, result) {
subscribeInstance = result;
sendIntercom.send({'foo': 'bar1'}, {'exchange': exchangeName}, function(err) {
if (err) throw err;
});
});
function sendAgain() {
sendIntercom.send({'foo': 'bar2'}, {'exchange': exchangeName}, function(err) {
if (err) throw err;
// Wait a while, and then make sure we have not gotten a second message
setTimeout(function() {
assert.deepEqual(receivedMessages, 1);
done();
}, 200);
});
}
});
});
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc