Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

imq

Package Overview
Dependencies
Maintainers
1
Versions
36
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

imq - npm Package Compare versions

Comparing version 1.2.16 to 1.2.17

2

package.json
{
"name": "imq",
"version": "1.2.16",
"version": "1.2.17",
"description": "Simple JSON-based messaging queue for inter service communication",

@@ -5,0 +5,0 @@ "keywords": [

# I Message Queue (imq)
[![Build Status](https://travis-ci.org/imqueue/imq.svg?branch=master)](https://travis-ci.org/imqueue/imq) [![Coverage Status](https://coveralls.io/repos/github/imqueue/imq/badge.svg?branch=master)](https://coveralls.io/github/imqueue/imq?branch=master) [![License](https://img.shields.io/badge/license-ISC-blue.svg)](https://rawgit.com/imqueue/imq/master/LICENSE)
[![Build Status](https://travis-ci.org/imqueue/imq.svg?branch=master)](https://travis-ci.org/imqueue/imq)
[![codebeat badge](https://codebeat.co/badges/85bb2a18-4ebb-4e48-a2ce-92b7bf438b1a)](https://codebeat.co/projects/github-com-imqueue-imq-master)
[![Coverage Status](https://coveralls.io/repos/github/imqueue/imq/badge.svg?branch=master)](https://coveralls.io/github/imqueue/imq?branch=master)
[![License](https://img.shields.io/badge/license-ISC-blue.svg)](https://rawgit.com/imqueue/imq/master/LICENSE)
Simple JSON-based messaging queue for inter service communication
**Related packages:**
- [imq-rpc](https://github.com/imqueue/imq-rpc) - RPC-like client/service
implementation over imq.
- [imq-cli](https://github.com/imqueue/imq-cli) - Command Line Interface
for imq and imq-rpc.
# Features

@@ -8,0 +18,0 @@

@@ -22,2 +22,11 @@ /// <reference types="node" />

/**
* Batch imq action processing on all registered imqs at once
*
* @access private
* @param {string} action
* @param {string} message
* @return {Promise<this>}
*/
private batch(action, message);
/**
* Starts the messaging queue.

@@ -24,0 +33,0 @@ * Supposed to be an async function.

@@ -36,3 +36,3 @@ "use strict";

this.queueLength = 0;
this.options = Object.assign({}, _1.DEFAULT_IMQ_OPTIONS, options || {});
this.options = _1.buildOptions(_1.DEFAULT_IMQ_OPTIONS, options);
// istanbul ignore next

@@ -55,12 +55,14 @@ this.logger = this.options.logger || console;

/**
* Starts the messaging queue.
* Supposed to be an async function.
* Batch imq action processing on all registered imqs at once
*
* @returns {Promise<ClusteredRedisQueue>}
* @access private
* @param {string} action
* @param {string} message
* @return {Promise<this>}
*/
async start() {
this.logger.log('Starting clustered redis message queue...');
async batch(action, message) {
this.logger.log(message);
const promises = [];
for (let imq of this.imqs) {
promises.push(imq.start());
promises.push(imq[action]());
}

@@ -71,2 +73,11 @@ await Promise.all(promises);

/**
* Starts the messaging queue.
* Supposed to be an async function.
*
* @returns {Promise<ClusteredRedisQueue>}
*/
async start() {
return await this.batch('start', 'Starting clustered redis message queue...');
}
/**
* Stops the queue (should stop handle queue messages).

@@ -78,9 +89,3 @@ * Supposed to be an async function.

async stop() {
this.logger.log('Stopping clustered redis message queue...');
const promises = [];
for (let imq of this.imqs) {
promises.push(imq.stop());
}
await Promise.all(promises);
return this;
return await this.batch('stop', 'Stopping clustered redis message queue...');
}

@@ -118,9 +123,5 @@ /**

async destroy() {
this.logger.log('Destroying clustered redis message queue...');
const promises = [];
for (let imq of this.imqs) {
promises.push(imq.destroy());
}
await Promise.all(promises);
await this.batch('destroy', 'Destroying clustered redis message queue...');
}
// noinspection JSUnusedGlobalSymbols
/**

@@ -133,9 +134,4 @@ * Clears queue data in queue host application.

async clear() {
this.logger.log('Clearing clustered redis message queue...');
const promises = [];
for (let imq of this.imqs) {
promises.push(imq.clear());
}
await Promise.all(promises);
return this;
return await this.batch('clear', 'Clearing clustered redis message queue...');
;
}

@@ -151,2 +147,3 @@ // EventEmitter interface

// istanbul ignore next
// noinspection JSUnusedGlobalSymbols
off(...args) {

@@ -196,3 +193,3 @@ for (let imq of this.imqs) {

for (let imq of this.imqs) {
imq.prependListener.apply(imq, args);
imq.prependOnceListener.apply(imq, args);
}

@@ -218,7 +215,7 @@ return this;

rawListeners(...args) {
let listeners = [];
let rawListeners = [];
for (let imq of this.imqs) {
listeners = listeners.concat(imq.rawListeners.apply(imq, args));
rawListeners = rawListeners.concat(imq.rawListeners.apply(imq, args));
}
return listeners;
return rawListeners;
}

@@ -225,0 +222,0 @@ // istanbul ignore next

@@ -0,1 +1,17 @@

/*!
* Copyright (c) 2018, Mykhailo Stadnyk <mikhus@gmail.com>
*
* Permission to use, copy, modify, and/or distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
* REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
* AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT,
* INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
* LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR
* OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
* PERFORMANCE OF THIS SOFTWARE.
*/
export declare function buildOptions<T>(defaultOptions: T, givenOptions?: Partial<T>): T;
export * from './profile';

@@ -2,0 +18,0 @@ export * from './uuid';

"use strict";
function __export(m) {
for (var p in m) if (!exports.hasOwnProperty(p)) exports[p] = m[p];
}
Object.defineProperty(exports, "__esModule", { value: true });
/*!

@@ -21,2 +17,12 @@ * Copyright (c) 2018, Mykhailo Stadnyk <mikhus@gmail.com>

*/
function __export(m) {
for (var p in m) if (!exports.hasOwnProperty(p)) exports[p] = m[p];
}
Object.defineProperty(exports, "__esModule", { value: true });
// istanbul ignore next
// noinspection JSUnusedGlobalSymbols
function buildOptions(defaultOptions, givenOptions) {
return Object.assign({}, defaultOptions, givenOptions || {});
}
exports.buildOptions = buildOptions;
__export(require("./profile"));

@@ -23,0 +29,0 @@ __export(require("./uuid"));

@@ -25,2 +25,11 @@ import 'reflect-metadata';

export declare const IMQ_LOG_TIME_FORMAT: AllowedTimeFormat;
export interface DebugInfoOptions {
debugTime: boolean;
debugArgs: boolean;
className: string | symbol;
args: any[];
methodName: string | symbol;
start: number;
logger: ILogger;
}
/**

@@ -37,3 +46,3 @@ * Prints debug information

*/
export declare function printDebugInfo(debugTime: boolean, debugArgs: boolean, className: string, args: any[], methodName: string | symbol, start: number, logger?: ILogger): void;
export declare function logDebugInfo({debugTime, debugArgs, className, args, methodName, start, logger}: DebugInfoOptions): void;
/**

@@ -40,0 +49,0 @@ * Implements '@profile' decorator.

@@ -54,5 +54,3 @@ "use strict";

*/
function printDebugInfo(debugTime, debugArgs, className, args, methodName, start,
/* istanbul ignore next */
logger = console) {
function logDebugInfo({ debugTime, debugArgs, className, args, methodName, start, logger }) {
if (debugTime) {

@@ -79,3 +77,3 @@ const time = mt.now() - start;

}
exports.printDebugInfo = printDebugInfo;
exports.logDebugInfo = logDebugInfo;
/**

@@ -130,2 +128,11 @@ * Implements '@profile' decorator.

const result = original.apply(this, args);
const debugOptions = {
debugTime,
debugArgs,
className,
args,
methodName,
start,
logger: this.logger
};
/* istanbul ignore next */

@@ -135,3 +142,3 @@ if (result && typeof result.then === 'function') {

result.then((res) => {
printDebugInfo(debugTime, debugArgs, className, args, methodName, start, this.logger);
logDebugInfo(debugOptions);
return res;

@@ -141,3 +148,3 @@ });

}
printDebugInfo(debugTime, debugArgs, className, args, methodName, start, this.logger);
logDebugInfo(debugOptions);
return result;

@@ -144,0 +151,0 @@ };

@@ -122,2 +122,23 @@ /// <reference types="node" />

/**
* Watch routine
*
* @access private
* @return {Promise<void>}
*/
private processWatch();
/**
* Watch message processor
*
* @access private
* @param {...any[]} args
* @return {Promise<void>}
*/
private onWatchMessage(...args);
/**
* Clears safe check interval
*
* @access private
*/
private cleanSafeCheckInterval();
/**
* Setups watch process on delayed messages

@@ -165,2 +186,11 @@ *

/**
* Emits error
*
* @access private
* @param {string} eventName
* @param {string} message
* @param {Error} err
*/
private emitError(eventName, message, err);
/**
* Acquires owner for watcher connection to this instance of the queue

@@ -172,2 +202,11 @@ *

/**
* Returns watcher lock resolver function
*
* @access private
* @param {Function} resolve
* @param {Function} reject
* @return {Function}
*/
private watchLockResolver(resolve, reject);
/**
* Initializes single watcher connection across all queues with the same

@@ -174,0 +213,0 @@ * prefix.

@@ -123,4 +123,3 @@ "use strict";

};
this.options = exports.DEFAULT_IMQ_OPTIONS;
this.options = Object.assign({}, exports.DEFAULT_IMQ_OPTIONS, options || {});
this.options = _1.buildOptions(exports.DEFAULT_IMQ_OPTIONS, options);
this.pack = this.options.useGzip ? pack : JSON.stringify;

@@ -240,5 +239,3 @@ this.unpack = this.options.useGzip ? unpack : JSON.parse;

// istanbul ignore next
this.emit('error', err, 'OnMessage');
// istanbul ignore next
this.logger.error(`${this.name}: process error - message is invalid, pid ${process.pid}, redis host ${this.redisKey}:`, err);
this.emitError('OnMessage', 'process error - message is invalid', err);
}

@@ -273,3 +270,70 @@ return this;

}
// istanbul ignore next
/**
* Watch routine
*
* @access private
* @return {Promise<void>}
*/
async processWatch() {
const now = Date.now();
let cursor = '0';
while (true) {
try {
const data = await this.writer.scan(cursor, 'match', `${this.options.prefix}:*:worker:*`, 'count', '1000');
cursor = data.shift();
const keys = data.shift() || [];
if (keys.length) {
for (let key of keys) {
const kp = key.split(':');
if (Number(kp.pop()) >= now) {
const qKey = `${kp.shift()}:${kp.shift()}`;
await this.writer.rpoplpush(key, qKey);
}
}
}
if (cursor === '0') {
return;
}
}
catch (err) {
this.emitError('OnSafeDelivery', 'safe queue message delivery problem', err);
return this.cleanSafeCheckInterval();
}
}
}
// istanbul ignore next
/**
* Watch message processor
*
* @access private
* @param {...any[]} args
* @return {Promise<void>}
*/
async onWatchMessage(...args) {
try {
const key = args.pop().split(':');
if (key.pop() !== 'ttl') {
return;
}
key.pop(); // msg id
await this.processDelayed(key.join(':'));
}
catch (err) {
this.emitError('OnWatch', 'watch error', err);
}
}
// istanbul ignore next
/**
* Clears safe check interval
*
* @access private
*/
cleanSafeCheckInterval() {
if (this.safeCheckInterval) {
clearInterval(this.safeCheckInterval);
delete this.safeCheckInterval;
}
}
/**
* Setups watch process on delayed messages

@@ -289,19 +353,5 @@ *

catch (err) {
this.emit('error', err, 'OnConfig');
this.logger.warn(`${this.name}: events config error, pid ${process.pid} on redis host ${this.redisKey}:`, err);
this.emitError('OnConfig', 'events config error', err);
}
this.watcher.on('pmessage', async (...args) => {
try {
const key = args.pop().split(':');
if (key.pop() !== 'ttl') {
return;
}
key.pop(); // msg id
await this.processDelayed(key.join(':'));
}
catch (err) {
this.emit('error', err, 'OnWatch');
this.logger.error(`${this.name}: watch error, pid ${process.pid} on redis host ${this.redisKey}:`, err);
}
});
this.watcher.on('pmessage', this.onWatchMessage.bind(this));
this.watcher.psubscribe('__keyevent@0__:expired', `${this.options.prefix}:delayed:*`);

@@ -312,33 +362,5 @@ // watch for expired unhandled safe queues

if (!this.writer) {
clearInterval(this.safeCheckInterval);
delete this.safeCheckInterval;
return;
return this.cleanSafeCheckInterval();
}
const now = Date.now();
let cursor = '0';
while (true) {
try {
const data = await this.writer.scan(cursor, 'match', `${this.options.prefix}:*:worker:*`, 'count', '1000');
cursor = data.shift();
const keys = data.shift() || [];
if (keys.length) {
for (let key of keys) {
const kp = key.split(':');
if (Number(kp.pop()) >= now) {
const qKey = `${kp.shift()}:${kp.shift()}`;
await this.writer.rpoplpush(key, qKey);
}
}
}
if (cursor === '0') {
return;
}
}
catch (err) {
this.emit('error', err, 'OnSafeDelivery');
clearInterval(this.safeCheckInterval);
delete this.safeCheckInterval;
return;
}
}
await this.processWatch();
}, this.options.safeDeliveryTtl);

@@ -377,5 +399,3 @@ }

// istanbul ignore next
this.emit('error', err, 'OnReadUnsafe');
// istanbul ignore next
this.logger.error(`${this.name}: unsafe reader failed, pid ${process.pid} on redis host ${this.redisKey}:`, err);
this.emitError('OnReadUnsafe', 'unsafe reader failed', err);
}

@@ -412,5 +432,3 @@ });

// istanbul ignore next
this.emit('error', err, 'OnReadSafe');
// istanbul ignore next
this.logger.error(`${this.name}: safe reader failed, pid ${process.pid} on redis host ${this.redisKey}:`, err);
this.emitError('OnReadSafe', 'safe reader failed', err);
}

@@ -463,3 +481,16 @@ });

}
// istanbul ignore next
/**
* Emits error
*
* @access private
* @param {string} eventName
* @param {string} message
* @param {Error} err
*/
emitError(eventName, message, err) {
this.emit('error', err, eventName);
this.logger.error(`${this.name}: ${message}, pid ${process.pid} on redis host ${this.redisKey}:`, err);
}
/**
* Acquires owner for watcher connection to this instance of the queue

@@ -482,4 +513,3 @@ *

catch (err) {
this.emit('error', err, 'OnScriptLoad');
this.logger.error(`${this.name}: script load error, pid ${process.pid} on redis host ${this.redisKey}:`, err);
this.emitError('OnScriptLoad', 'script load error', err);
}

@@ -492,3 +522,27 @@ });

}
// istanbul ignore next
/**
* Returns watcher lock resolver function
*
* @access private
* @param {Function} resolve
* @param {Function} reject
* @return {Function}
*/
watchLockResolver(resolve, reject) {
return (async () => {
try {
const noWatcher = !await this.watcherCount();
if (await this.isLocked() && noWatcher) {
await this.unlock();
await this.ownWatch();
}
resolve();
}
catch (err) {
reject(err);
}
});
}
/**
* Initializes single watcher connection across all queues with the same

@@ -510,15 +564,3 @@ * prefix.

// check for possible dead-lock to resolve
setTimeout(async () => {
try {
const noWatcher = !await this.watcherCount();
if (await this.isLocked() && noWatcher) {
await this.unlock();
await this.ownWatch();
}
resolve();
}
catch (err) {
reject(err);
}
}, intrand(1, 50));
setTimeout(this.watchLockResolver(resolve, reject), intrand(1, 50));
}

@@ -571,7 +613,5 @@ }

this.read();
this.processDelayed(this.key).catch(
// istanbul ignore next
(err) => {
this.emit('error', err, 'OnProcessDelayed');
this.logger.error(`${this.name}: error processing delayed queue, pid ${process.pid} on redis host ${this.redisKey}`, err);
this.processDelayed(this.key).catch((err) => {
this.emitError('OnProcessDelayed', 'error processing delayed queue', err);
});

@@ -637,6 +677,3 @@ this.initialized = true;

this.removeAllListeners();
if (this.safeCheckInterval) {
clearInterval(this.safeCheckInterval);
delete this.safeCheckInterval;
}
this.cleanSafeCheckInterval();
if (this.watcher) {

@@ -643,0 +680,0 @@ this.watcher.removeAllListeners();

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc