New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

larvitamintercom

Package Overview
Dependencies
Maintainers
6
Versions
136
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

larvitamintercom - npm Package Compare versions

Comparing version 0.3.7 to 0.4.0

466

index.js

@@ -7,8 +7,7 @@ 'use strict';

bramqp = require('bramqp'),
LUtils = require('larvitutils'),
lUtils = new LUtils(),
{ Log, Utils } = require('larvitutils'),
lUtils = new Utils(),
async = require('async'),
url = require('url'),
net = require('net'),
_ = require('lodash');
net = require('net');

@@ -30,4 +29,3 @@ /**

function Intercom(options) {
const tasks = [],
that = this;
const that = this;

@@ -42,3 +40,3 @@ let logPrefix = topLogPrefix + 'Intercom() - ',

if ( ! options.log) {
options.log = new lUtils.Log();
options.log = new Log();
}

@@ -69,2 +67,4 @@

that.log.verbose(logPrefix + 'Initializing on loopback interface');
that.initializeListeners();
} else {

@@ -75,271 +75,317 @@ that.loopback = false;

that.socket = net.connect({
'port': that.port,
'host': that.host
});
function openSocket() {
let connectionOptions = {
'port': that.port,
'host': that.host
};
that.log.verbose(logPrefix + 'Initializing on ' + that.host + ':' + that.port);
that.log.info(logPrefix + 'Initializing socket on ' + that.host + ':' + that.port);
that.socket.on('error', function (err) {
if (that.expectingClose !== false) {
that.log.verbose(logPrefix + 'expected socket close, but also got socket error: ' + err.message);
} else {
that.log.error(logPrefix + 'socket error: ' + err.message);
}
});
that.socket.on('close', function (err) {
that.log.verbose(logPrefix + 'socket closed');
if (err) {
that.socket = net.connect(connectionOptions);
that.socket.setKeepAlive = true;
that.socket.on('connect', function () {
that.log.verbose(logPrefix + 'Socket connected to ' + that.host + ':' + that.port);
onSocketConnect(function (err) {
if (err) {
that.log.error(logPrefix + ' Couldn\'t Initialize connection to rabbitmq');
}
that.initializeListeners();
});
});
that.socket.on('error', function (err) {
if (that.expectingClose !== false) {
that.log.verbose(logPrefix + 'expected socket close, but also got socket error: ' + err.message);
} else {
that.log.error(logPrefix + 'socket error: ' + err.message);
}
});
that.socket.on('close', function (err) {
that.socket.destroy();
that.socket.unref();
if (that.expectingClose !== false) {
that.log.verbose(logPrefix + 'socket closed with error, err: ' + err.message);
} else {
that.log.error(logPrefix + 'socket closed with error, err: ' + err.message);
setTimeout(openSocket, 1000);
}
}
});
});
that.socket.on('end', function () {
that.log.info(logPrefix + 'socket connection ended by remote');
});
that.socket.on('end', function () {
that.log.info(logPrefix + 'socket connection ended by remote');
});
}
// Create handle by socket connect to rabbitmq
tasks.push(function (cb) {
bramqp.initialize(that.socket, 'rabbitmq/full/amqp0-9-1.stripped.extended', function (err, result) {
if (err) {
that.log.error(logPrefix + 'Error connecting to ' + that.host + ':' + that.port + ' err: ' + err.message);
that.emit('error', err);
}
function onSocketConnect(cb) {
const tasks = [];
// log.silly(logPrefix + 'bramqp.initialize() ran on ' + that.host + ':' + that.port);
// Create handle by socket connect to rabbitmq
tasks.push(function (cb) {
bramqp.initialize(that.socket, 'rabbitmq/full/amqp0-9-1.stripped.extended', function (err, result) {
if (err) {
that.log.error(logPrefix + 'Error connecting to ' + that.host + ':' + that.port + ' err: ' + err.message);
that.emit('error', err);
}
that.handle = result;
// log.silly(logPrefix + 'bramqp.initialize() ran on ' + that.host + ':' + that.port);
cb(err);
that.handle = result;
cb(err);
});
});
});
// Open AMQP communication
tasks.push(function (cb) {
const heartBeat = true,
auth = parsedConStr.auth;
// Open AMQP communication
tasks.push(function (cb) {
const heartBeat = true,
auth = parsedConStr.auth;
let username,
password;
let username,
password;
if (auth) {
username = parsedConStr.auth.split(':')[0];
password = parsedConStr.auth.split(':')[1];
}
if (auth) {
username = parsedConStr.auth.split(':')[0];
password = parsedConStr.auth.split(':')[1];
}
that.log.debug(logPrefix + 'openAMQPCommunication running on ' + that.host + ':' + that.port + ' with username: ' + username);
that.log.debug(logPrefix + 'openAMQPCommunication running on ' + that.host + ':' + that.port + ' with username: ' + username);
that.handle.openAMQPCommunication(username, password, heartBeat, function (err) {
if (err) {
that.log.error(logPrefix + 'Error opening AMQP communication: ' + err.message);
that.emit('error', err);
}
that.handle.openAMQPCommunication(username, password, heartBeat, function (err) {
if (err) {
that.log.error(logPrefix + 'Error opening AMQP communication: ' + err.message);
that.emit('error', err);
}
cb(err);
cb(err);
});
});
});
async.series(tasks, function (err) {
if (err) return cb(err);
cb();
});
};
openSocket();
}
// Register listener for incoming messages
tasks.push(function (cb) {
that.handle.on(that.channelName + ':basic.deliver', function (channel, method, data) {
const exchange = data.exchange,
deliveryTag = data['delivery-tag'],
consumerTag = data['consumer-tag'];
// log.silly(logPrefix + 'Incoming message. exchange: "' + exchange + '", consumerTag: "' + consumerTag + '", deliveryTag: "' + deliveryTag + '"');
Intercom.prototype.initializeListeners = function () {
const tasks = [],
that = this;
that.handle.once('content', function (channel, className, properties, content) {
let message;
// Register listener for incoming messages
tasks.push(function (cb) {
that.handle.on(that.channelName + ':basic.deliver', function (channel, method, data) {
const exchange = data.exchange,
deliveryTag = data['delivery-tag'],
consumerTag = data['consumer-tag'];
// log.silly(logPrefix + 'Incoming message content. exchange: "' + exchange + '", consumerTag: "' + consumerTag + '", deliveryTag: "' + deliveryTag + '", content: "' + content.toString() + '"');
// log.silly(logPrefix + 'Incoming message. exchange: "' + exchange + '", consumerTag: "' + consumerTag + '", deliveryTag: "' + deliveryTag + '"');
try {
message = JSON.parse(content.toString());
} catch (err) {
that.log.warn(logPrefix + 'subscribe() - Could not parse incoming message. exchange: "' + exchange + '", consumerTag: "' + consumerTag + '", deliveryTag: "' + deliveryTag + '", content: "' + content.toString() + '"');
return;
}
that.handle.once('content', function (channel, className, properties, content) {
let message;
if (lUtils.formatUuid(message.uuid) === false) {
that.log.warn(logPrefix + 'consume() - Message does not contain uuid. exchange: "' + exchange + '", consumerTag: "' + consumerTag + '", deliveryTag: "' + deliveryTag + '", content: "' + content.toString() + '"');
}
// log.silly(logPrefix + 'Incoming message content. exchange: "' + exchange + '", consumerTag: "' + consumerTag + '", deliveryTag: "' + deliveryTag + '", content: "' + content.toString() + '"');
that.emit('incoming_msg_' + exchange, message, deliveryTag);
try {
message = JSON.parse(content.toString());
} catch (err) {
that.log.warn(logPrefix + 'subscribe() - Could not parse incoming message. exchange: "' + exchange + '", consumerTag: "' + consumerTag + '", deliveryTag: "' + deliveryTag + '", content: "' + content.toString() + '"');
return;
}
if (lUtils.formatUuid(message.uuid) === false) {
that.log.warn(logPrefix + 'consume() - Message does not contain uuid. exchange: "' + exchange + '", consumerTag: "' + consumerTag + '", deliveryTag: "' + deliveryTag + '", content: "' + content.toString() + '"');
}
that.emit('incoming_msg_' + exchange, message, deliveryTag);
});
});
cb();
});
cb();
});
// Register listener for close events
tasks.push(function (cb) {
that.handle.on('connection.close', function (channel, method, data) {
if (that.expectingClose === false) {
that.log.error(logPrefix + 'Unexpected connection.close! channel: "' + channel + '" data: "' + JSON.stringify(data) + '"');
} else {
that.log.info(logPrefix + 'Expected connetion.close. channel: "' + channel + '" data: "' + JSON.stringify(data) + '"');
}
tasks.push(function (cb) {
that.handle.on('error', function (err) {
that.log.error(logPrefix + 'RabbitMQ connection error :' + err.message);
});
cb();
});
cb();
});
// Log all handle events
// Should be disabled in production code and only manually enabled while debugging due to it being expensive
/** /tasks.push(function (cb) {
const oldEmitter = that.handle.emit;
// Register listener for close events
tasks.push(function (cb) {
that.handle.on('connection.close', function (channel, method, data) {
if (that.expectingClose === false) {
that.log.error(logPrefix + 'Unexpected connection.close! channel: "' + channel + '" data: "' + JSON.stringify(data) + '"');
} else {
that.log.info(logPrefix + 'Expected connetion.close. channel: "' + channel + '" data: "' + JSON.stringify(data) + '"');
}
});
cb();
});
that.handle.emit = function () {
const emitArgs = arguments;
// Log all handle events
// Should be disabled in production code and only manually enabled while debugging due to it being expensive
/** /tasks.push(function (cb) {
const oldEmitter = that.handle.emit;
that.log.silly(topLogPrefix + 'handle.on("' + arguments[0] + '"), all arguments: "' + JSON.stringify(arguments) + '"');
that.handle.emit = function () {
const emitArgs = arguments;
oldEmitter.apply(that.handle, arguments);
}
that.log.silly(topLogPrefix + 'handle.on("' + arguments[0] + '"), all arguments: "' + JSON.stringify(arguments) + '"');
cb();
});/**/
oldEmitter.apply(that.handle, arguments);
}
// Construct generic handle comms
tasks.push(function (cb) {
const cmdStrsWithoutOk = ['basic.publish', 'content', 'closeAMQPCommunication', 'basic.nack', 'basic.ack', 'basic.qos'];
cb();
});/**/
that.handle.cmd = function cmd(cmdStr, params, cb) {
if (typeof cb !== 'function') {
cb = function () {};
}
// Construct generic handle comms
tasks.push(function (cb) {
const cmdStrsWithoutOk = ['basic.publish', 'content', 'closeAMQPCommunication', 'basic.nack', 'basic.ack', 'basic.qos'];
that.cmdQueue.push({'cmdStr': cmdStr, 'params': params, 'cb': cb});
that.handle.cmd = function cmd(cmdStr, params, cb) {
if (typeof cb !== 'function') {
cb = function () {};
}
// log.silly(logPrefix + 'handle.cmd() - cmdStr: "' + cmdStr + '" added to run queue. params: "' + JSON.stringify(params) + '"');
that.cmdQueue.push({'cmdStr': cmdStr, 'params': params, 'cb': cb});
if (that.cmdInProgress === true) {
// log.silly(logPrefix + 'handle.cmd() - cmdStr: "' + cmdStr + '" cmdInProgress === true');
return;
}
// log.silly(logPrefix + 'handle.cmd() - cmdStr: "' + cmdStr + '" added to run queue. params: "' + JSON.stringify(params) + '"');
// log.silly(logPrefix + 'handle.cmd() - cmdStr: "' + cmdStr + '" cmdInProgress !== true');
if (that.cmdInProgress === true) {
// log.silly(logPrefix + 'handle.cmd() - cmdStr: "' + cmdStr + '" cmdInProgress === true');
return;
}
that.cmdInProgress = true;
// log.silly(logPrefix + 'handle.cmd() - cmdStr: "' + cmdStr + '" cmdInProgress !== true');
function readFromQueue() {
const mainParams = that.cmdQueue.shift(),
cmdStr = mainParams.cmdStr,
tasks = [],
cb = mainParams.cb;
that.cmdInProgress = true;
let params = mainParams.params,
channel,
method,
data;
function readFromQueue() {
const mainParams = that.cmdQueue.shift(),
cmdStr = mainParams.cmdStr,
tasks = [],
cb = mainParams.cb;
if ( ! Array.isArray(params)) {
params = [];
}
let params = mainParams.params,
channel,
method,
data;
// Register the callback
tasks.push(function (cb) {
const cmdGroupName = cmdStr.split('.')[0],
cmdName = cmdStr.split('.')[1];
if ( ! Array.isArray(params)) {
params = [];
}
let callCb = true,
okTimeout;
// Register the callback
tasks.push(function (cb) {
const cmdGroupName = cmdStr.split('.')[0],
cmdName = cmdStr.split('.')[1];
function cmdCb(err) {
if (err) {
that.log.error(logPrefix + 'handle.cmd() - readFromQueue() - cmdStr: "' + cmdStr + '" failed, err: ' + err.message);
callCb = false;
return cb(err);
let callCb = true,
okTimeout;
function cmdCb(err) {
if (err) {
that.log.error(logPrefix + 'handle.cmd() - readFromQueue() - cmdStr: "' + cmdStr + '" failed, err: ' + err.message);
callCb = false;
return cb(err);
}
// log.silly(logPrefix + 'handle.cmd() - readFromQueue() - cmdStr: "' + cmdStr + '" succeeded');
if (cmdStrsWithoutOk.indexOf(cmdStr) !== - 1) {
return cb();
}
}
// log.silly(logPrefix + 'handle.cmd() - readFromQueue() - cmdStr: "' + cmdStr + '" succeeded');
if (cmdStrsWithoutOk.indexOf(cmdStr) !== - 1) {
if (that.loopback === true) {
return cb();
}
}
if (that.loopback === true) {
return cb();
}
if (cmdStrsWithoutOk.indexOf(cmdStr) === - 1 && that.loopback === false) {
okTimeout = setTimeout(function () {
const err = new Error('no answer received from queue within 10s');
that.log.error(topLogPrefix + 'handle.cmd() - readFromQueue() - cmdStr: "' + cmdStr + '", ' + err.message);
callCb = false;
cb(err);
}, 10000);
if (cmdStrsWithoutOk.indexOf(cmdStr) === - 1 && that.loopback === false) {
okTimeout = setTimeout(function () {
const err = new Error('no answer received from queue within 10s');
that.log.error(topLogPrefix + 'handle.cmd() - readFromQueue() - cmdStr: "' + cmdStr + '", ' + err.message);
callCb = false;
cb(err);
}, 10000);
that.handle.once(that.channelName + ':' + cmdStr + '-ok', function (x, y, z) {
// We want these in the outer scope, thats why the weird naming
channel = x;
method = y;
data = z;
that.handle.once(that.channelName + ':' + cmdStr + '-ok', function (x, y, z) {
// We want these in the outer scope, thats why the weird naming
channel = x;
method = y;
data = z;
// log.silly(topLogPrefix + 'handle.cmd() - readFromQueue() - cmdStr: "' + cmdStr + '", answer received from queue');
if (callCb === false) {
that.log.warn(topLogPrefix + 'handle.cmd() - readFromQueue() - cmdStr: "' + cmdStr + '", answer received but to late; timeout have already happened');
return;
}
clearTimeout(okTimeout);
cb();
});
}
// log.silly(topLogPrefix + 'handle.cmd() - readFromQueue() - cmdStr: "' + cmdStr + '", answer received from queue');
if (callCb === false) {
that.log.warn(topLogPrefix + 'handle.cmd() - readFromQueue() - cmdStr: "' + cmdStr + '", answer received but to late; timeout have already happened');
return;
}
clearTimeout(okTimeout);
cb();
});
}
params.push(cmdCb);
params.push(cmdCb);
if (cmdName) {
that.handle[cmdGroupName][cmdName].apply(that.handle, params);
} else {
that.handle[cmdGroupName].apply(that.handle, params);
}
});
if (cmdName) {
that.handle[cmdGroupName][cmdName].apply(that.handle, params);
} else {
that.handle[cmdGroupName].apply(that.handle, params);
}
});
async.series(tasks, function (err) {
cb(err, channel, method, data);
async.series(tasks, function (err) {
cb(err, channel, method, data);
if (that.cmdQueue.length === 0) {
// log.silly(topLogPrefix + 'handle.cmd() - readFromQueue() - cmdStr: "' + cmdStr + '" cmdQueue.length === 0');
that.cmdInProgress = false;
} else {
// log.silly(topLogPrefix + 'handle.cmd() - readFromQueue() - cmdStr: "' + cmdStr + '" readFromQueue() rerunning');
readFromQueue();
}
});
}
readFromQueue();
};
cb();
});
if (that.cmdQueue.length === 0) {
// log.silly(topLogPrefix + 'handle.cmd() - readFromQueue() - cmdStr: "' + cmdStr + '" cmdQueue.length === 0');
that.cmdInProgress = false;
} else {
// log.silly(topLogPrefix + 'handle.cmd() - readFromQueue() - cmdStr: "' + cmdStr + '" readFromQueue() rerunning');
readFromQueue();
}
});
}
readFromQueue();
};
cb();
});
// Set QoS to 10
tasks.push(function (cb) {
const prefetchSize = 0,
prefetchCount = 10,
global = true;
// Set QoS to 10
tasks.push(function (cb) {
const prefetchSize = 0,
prefetchCount = 10,
global = true;
that.handle.cmd('basic.qos', [that.channelName, prefetchSize, prefetchCount, global], function (err) {
that.log.verbose(logPrefix + 'basic.qos set to: "' + prefetchCount + '"');
cb(err);
that.handle.cmd('basic.qos', [that.channelName, prefetchSize, prefetchCount, global], function (err) {
that.log.verbose(logPrefix + 'basic.qos set to: "' + prefetchCount + '"');
cb(err);
});
});
});
async.series(tasks, function (err) {
if ( ! err) {
if (that.loopback === true) {
that.log.verbose(logPrefix + 'Initialized on loopback interface');
} else {
that.log.verbose(logPrefix + 'Initialized on ' + that.host + ':' + that.port);
async.series(tasks, function (err) {
if ( ! err) {
if (that.loopback === true) {
that.log.verbose(logPrefix + 'Initialized on loopback interface');
} else {
that.log.verbose(logPrefix + 'Initialized on ' + that.host + ':' + that.port);
}
that.queueReady = true;
setImmediate(function () {
that.emit('ready');
});
}
that.queueReady = true;
setImmediate(function () {
that.emit('ready');
});
}
});
});
};
}

@@ -811,3 +857,3 @@

) {
const newOrgMsg = _.cloneDeep(orgMsg);
const newOrgMsg = JSON.parse(JSON.stringify(orgMsg));

@@ -814,0 +860,0 @@ if (that.loopbackConQueue[options.exchange] === undefined) {

{
"name": "larvitamintercom",
"version": "0.3.7",
"version": "0.4.0",
"description": "",
"main": "index.js",
"scripts": {
"test": "mocha"
"lint": "eslint *.js test/*.js",
"test:unit": "mocha --exit --bail 'test/**/*.js'",
"test": "npm run lint && npm run test:unit"
},

@@ -27,16 +29,14 @@ "repository": {

"dependencies": {
"async": "^2.0.1",
"bramqp": "^0.6.0",
"larvitutils": "^2.0.0",
"lodash": "^4.17.4",
"uuid": "^3.0.0"
"async": "^3.2.3",
"bramqp": "^0.6.1",
"larvitutils": "^5.0.0",
"uuid": "^8.3.2"
},
"devDependencies": {
"eslint": "5.16.0",
"eslint-config-google": "0.11.0",
"eslint-plugin-html": "5.0.3",
"eslint-plugin-import": "2.17.1",
"mocha": "6.1.3",
"mocha-eslint": "5.0.0"
"eslint": "8.9.0",
"eslint-config-google": "0.14.0",
"eslint-plugin-html": "6.2.0",
"eslint-plugin-import": "2.25.4",
"mocha": "9.2.0"
}
}

@@ -9,2 +9,5 @@ [![Build Status](https://travis-ci.org/larvit/larvitamintercom.svg?branch=master)](https://travis-ci.org/larvit/larvitamintercom) [![Dependencies](https://david-dm.org/larvit/larvitamintercom.svg)](https://david-dm.org/larvit/larvitamintercom.svg)

### Connection
When instantiating a new intercom it will try to connect instantly and on connection error or connection lost it will try to reconnect an infinite number of times every 1sec.
### Send

@@ -11,0 +14,0 @@

@@ -6,6 +6,6 @@ 'use strict';

assert = require('assert'),
LUtils = require('larvitutils'),
lUtils = new LUtils(),
{ Log, Utils } = require('larvitutils'),
lUtils = new Utils(),
async = require('async'),
log = new lUtils.Log(),
log = new Log(),
fs = require('fs');

@@ -12,0 +12,0 @@

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc