New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

imq

Package Overview
Dependencies
Maintainers
1
Versions
36
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

imq - npm Package Compare versions

Comparing version 1.0.3 to 1.1.0

7

benchmark/index.js

@@ -55,2 +55,4 @@ "use strict";

.describe('z', 'use gzip for message encoding/decoding')
.alias('s', 'safe')
.describe('s', 'use safe (guaranteed) message delivery algorithm')
.alias('e', 'example-message')

@@ -61,3 +63,3 @@ .describe('e', 'Path to a file containing JSON of example message to ' +

.describe('t', 'Increase sample message data given number of times')
.boolean(['h', 'z'])
.boolean(['h', 'z', 's'])
.argv;

@@ -75,2 +77,3 @@ const na = require('nodeaffinity');

const MSG_MULTIPLIER = Number(ARGV.t) || 0;
const SAFE_DELIVERY = ARGV.s;
let SAMPLE_MESSAGE;

@@ -301,3 +304,3 @@ if (ARGV.e) {

try {
const data = yield redis_test_1.run(STEPS, MSG_DELAY, USE_GZIP, SAMPLE_MESSAGE);
const data = yield redis_test_1.run(STEPS, MSG_DELAY, USE_GZIP, SAFE_DELIVERY, SAMPLE_MESSAGE);
process.send('data:' + JSON.stringify(data));

@@ -304,0 +307,0 @@ }

@@ -51,2 +51,5 @@ /*!

.alias('s', 'safe')
.describe('s', 'use safe (guaranteed) message delivery algorithm')
.alias('e', 'example-message')

@@ -59,3 +62,3 @@ .describe('e', 'Path to a file containing JSON of example message to ' +

.boolean(['h', 'z'])
.boolean(['h', 'z', 's'])
.argv;

@@ -77,2 +80,3 @@

const MSG_MULTIPLIER = Number(ARGV.t) || 0;
const SAFE_DELIVERY: boolean = ARGV.s;

@@ -364,2 +368,3 @@ let SAMPLE_MESSAGE: IJson;

USE_GZIP,
SAFE_DELIVERY,
SAMPLE_MESSAGE

@@ -366,0 +371,0 @@ );

@@ -16,2 +16,2 @@ import { IJson } from '../index';

*/
export declare function run(STEPS: number, MSG_DELAY?: number, useGzip?: boolean, jsonExample?: IJson): Promise<{}>;
export declare function run(STEPS: number, MSG_DELAY?: number, useGzip?: boolean, safeDelivery?: boolean, jsonExample?: IJson): Promise<{}>;

@@ -99,3 +99,3 @@ "use strict";

*/
function run(STEPS, MSG_DELAY = 0, useGzip = false, jsonExample = JSON_EXAMPLE) {
function run(STEPS, MSG_DELAY = 0, useGzip = false, safeDelivery = false, jsonExample = JSON_EXAMPLE) {
return __awaiter(this, void 0, void 0, function* () {

@@ -108,3 +108,4 @@ const bytesLen = bytes((useGzip ? index_1.pack : JSON.stringify)(jsonExample), useGzip);

vendor: 'Redis',
useGzip
useGzip,
safeDelivery
};

@@ -111,0 +112,0 @@ const mq = yield index_1.default.create(queueName, options).start();

@@ -95,2 +95,3 @@ /*!

useGzip: boolean = false,
safeDelivery: boolean = false,
jsonExample: IJson = JSON_EXAMPLE

@@ -108,3 +109,4 @@ ) {

vendor: 'Redis',
useGzip
useGzip,
safeDelivery
};

@@ -111,0 +113,0 @@ const mq = await IMQ.create(queueName, options).start();

{
"name": "imq",
"version": "1.0.3",
"version": "1.1.0",
"description": "Simple JSON-based messaging queue for inter service communication",

@@ -5,0 +5,0 @@ "scripts": {

# I Message Queue (imq)
[![Build Status](https://travis-ci.org/Mikhus/imq.svg?branch=master)](https://travis-ci.org/Mikhus/imq) [![License](https://img.shields.io/badge/license-ISC-blue.svg)](https://rawgit.com/Mikhus/imq/master/LICENSE)
Simple JSON-based messaging queue for inter service communication

@@ -108,2 +110,2 @@

[ISC](https://github.com/Mikhus/imq/blob/master/LICENSE)
[ISC](https://rawgit.com/Mikhus/imq/master/LICENSE)
/// <reference types="node" />
import { EventEmitter } from 'events';
export { EventEmitter } from 'events';
/**
* JSON-compatible type definition
*
* @type {any}
*/
export interface IJson {
[name: string]: number | string | boolean | null | IJson | any | number[] | string[] | boolean[] | null[] | IJson[] | any[];
}
/**
* Logger interface
*/
export interface ILogger {
/**
* Log level function
*
* @param {...any[]}
*/
log(...args: any[]): void;
/**
* Info level function
*
* @param {...any[]}
*/
info(...args: any[]): void;
/**
* Warning level function
*
* @param {...any[]}
*/
warn(...args: any[]): void;
/**
* Error level function
*
* @param {...any[]}
*/
error(...args: any[]): void;
}
/**
* Message format
*/
export declare type IMessage = {
/**
* Message unique identifier
*
* @type {string}
*/
id: string;
/**
* Message data. Any JSON-compatible data allowed
*
* @type {IJson}
*/
message: IJson;
/**
* Message source queue name
*
* @type {string}
*/
from: string;
/**
* Message delay in milliseconds (for delayed messages). Optional.
*
* @type {number}
*/
delay?: number;
};
/**
* Message queue options
*/
export declare type IMQOptions = {
/**
* Message queue network host
*
* @type {string}
*/
host: string;
/**
* Message queue network port
*
* @type {number}
*/
port: number;
/**
* Message queue vendor
*
* @type {string}
*/
vendor?: string;
/**
* Message queue global key prefix (namespace)
*
* @type {string}
*/
prefix?: string;
/**
* Logger defined to be used within message queue in runtime
*
* @type {ILogger}
*/
logger?: ILogger;
/**
* Watcher check delay period. This is used by a queue watcher
* agent to make sure at least one watcher is available for
* queue operations.
*
* @type {number}
*/
watcherCheckDelay?: number;
useGzip: boolean;
/**
* A way to serialize message using compression. Will increase
* load to worker process but can decrease network traffic between worker
* and queue host application
*
* @type {boolean}
*/
useGzip?: boolean;
/**
* Enables/disables safe message delivery. When safe message delivery
* is turned on it will use more complex algorithm for message handling
* by a worker process, guaranteeing that if worker fails the message will
* be delivered to another possible worker anyway. In most cases it
* is not required unless it is required by a system design.
*
* @type {boolean}
*/
safeDelivery?: boolean;
/**
* Time-to-live of worker queues (after this time messages are back to
* main queue for handling if worker died). Only works if safeDelivery
* option enabled.
*
* @type {number}
*/
safeDeliveryTtl?: number;
};

@@ -30,11 +142,94 @@ export interface IMessageQueueConstructor {

}
/**
* Generic messaging queue implementation interface
*
* @example
* ~~~typescript
* import { IMessageQueue, EventEmitter, uuid } from 'imq';
*
* class SomeMQAdapter implements IMessageQueue extends EventEmitter {
* public async start(): Promise<SomeMQAdapter> {
* // ... implementation goes here
* return this;
* }
* public async stop(): Promise<SomeMQAdapter> {
* // ... implementation goes here
* return this;
* }
* public async send(
* toQueue: string,
* message: IJson,
* delay?: number
* ): Promise<string> {
* const messageId = uuid();
* // ... implementation goes here
* return messageId;
* }
* public async destroy(): Promise<void> {
* // ... implementation goes here
* }
* public async clear(): Promise<SomeMQAdapter> {
* // ... implementation goes here
* return this;
* }
* }
* ~~~
*/
export interface IMessageQueue extends EventEmitter {
/**
* @event message (message: IJson, id: string, from: string)
* Message event. Occurs every time queue got the message.
*
* @event IMessageQueue#message
* @type {IJson} message - message data
* @type {string} id - message identifier
* @type {string} from - source queue produced the message
*/
/**
* Message event. Occurs every time queue got the message.
*
* @event IMessageQueue#error
* @type {Error} err - error caught by message queue
* @type {string} code - message queue error code
*/
/**
* Starts the messaging queue.
* Supposed to be an async function.
*
* @returns {Promise<IMessageQueue>}
*/
start(): Promise<IMessageQueue>;
/**
* Stops the queue (should stop handle queue messages).
* Supposed to be an async function.
*
* @returns {Promise<IMessageQueue>}
*/
stop(): Promise<IMessageQueue>;
/**
* Sends a message to given queue name with the given data.
* Supposed to be an async function.
*
* @param {string} toQueue - queue name to which message should be sent to
* @param {IJson} message - message data
* @param {number} [delay] - if specified, message will be handled in the
* target queue after specified period of time
* in milliseconds.
* @returns {Promise<string>} - message identifier
*/
send(toQueue: string, message: IJson, delay?: number): Promise<string>;
destroy(): void;
/**
* Safely destroys current queue, unregistering all set event
* listeners and connections.
* Supposed to be an async function.
*
* @returns {Promise<void>}
*/
destroy(): Promise<void>;
/**
* Clears queue data in queue host application.
* Supposed to be an async function.
*
* @returns {Promise<IMessageQueue>}
*/
clear(): Promise<IMessageQueue>;
}
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
var events_1 = require("events");
exports.EventEmitter = events_1.EventEmitter;
//# sourceMappingURL=IMessageQueue.js.map

@@ -19,3 +19,9 @@ /*!

import { EventEmitter } from 'events';
export { EventEmitter } from 'events';
/**
* JSON-compatible type definition
*
* @type {any}
*/
export interface IJson {

@@ -26,24 +32,144 @@ [name: string]: number | string | boolean | null | IJson | any |

/**
* Logger interface
*/
export interface ILogger {
/**
* Log level function
*
* @param {...any[]}
*/
log(...args: any[]): void,
/**
* Info level function
*
* @param {...any[]}
*/
info(...args: any[]): void;
/**
* Warning level function
*
* @param {...any[]}
*/
warn(...args: any[]): void;
/**
* Error level function
*
* @param {...any[]}
*/
error(...args: any[]): void;
}
/**
* Message format
*/
export type IMessage = {
/**
* Message unique identifier
*
* @type {string}
*/
id: string,
/**
* Message data. Any JSON-compatible data allowed
*
* @type {IJson}
*/
message: IJson,
/**
* Message source queue name
*
* @type {string}
*/
from: string,
/**
* Message delay in milliseconds (for delayed messages). Optional.
*
* @type {number}
*/
delay?: number
};
/**
* Message queue options
*/
export type IMQOptions = {
/**
* Message queue network host
*
* @type {string}
*/
host: string,
/**
* Message queue network port
*
* @type {number}
*/
port: number,
/**
* Message queue vendor
*
* @type {string}
*/
vendor?: string,
/**
* Message queue global key prefix (namespace)
*
* @type {string}
*/
prefix?: string,
/**
* Logger defined to be used within message queue in runtime
*
* @type {ILogger}
*/
logger?: ILogger,
/**
* Watcher check delay period. This is used by a queue watcher
* agent to make sure at least one watcher is available for
* queue operations.
*
* @type {number}
*/
watcherCheckDelay?: number,
useGzip: boolean
/**
* A way to serialize message using compression. Will increase
* load to worker process but can decrease network traffic between worker
* and queue host application
*
* @type {boolean}
*/
useGzip?: boolean,
/**
* Enables/disables safe message delivery. When safe message delivery
* is turned on it will use more complex algorithm for message handling
* by a worker process, guaranteeing that if worker fails the message will
* be delivered to another possible worker anyway. In most cases it
* is not required unless it is required by a system design.
*
* @type {boolean}
*/
safeDelivery?: boolean,
/**
* Time-to-live of worker queues (after this time messages are back to
* main queue for handling if worker died). Only works if safeDelivery
* option enabled.
*
* @type {number}
*/
safeDeliveryTtl?: number
};

@@ -55,12 +181,100 @@

/**
* Generic messaging queue implementation interface
*
* @example
* ~~~typescript
* import { IMessageQueue, EventEmitter, uuid } from 'imq';
*
* class SomeMQAdapter implements IMessageQueue extends EventEmitter {
* public async start(): Promise<SomeMQAdapter> {
* // ... implementation goes here
* return this;
* }
* public async stop(): Promise<SomeMQAdapter> {
* // ... implementation goes here
* return this;
* }
* public async send(
* toQueue: string,
* message: IJson,
* delay?: number
* ): Promise<string> {
* const messageId = uuid();
* // ... implementation goes here
* return messageId;
* }
* public async destroy(): Promise<void> {
* // ... implementation goes here
* }
* public async clear(): Promise<SomeMQAdapter> {
* // ... implementation goes here
* return this;
* }
* }
* ~~~
*/
export interface IMessageQueue extends EventEmitter {
/**
* @event message (message: IJson, id: string, from: string)
* Message event. Occurs every time queue got the message.
*
* @event IMessageQueue#message
* @type {IJson} message - message data
* @type {string} id - message identifier
* @type {string} from - source queue produced the message
*/
/**
* Message event. Occurs every time queue got the message.
*
* @event IMessageQueue#error
* @type {Error} err - error caught by message queue
* @type {string} code - message queue error code
*/
/**
* Starts the messaging queue.
* Supposed to be an async function.
*
* @returns {Promise<IMessageQueue>}
*/
start(): Promise<IMessageQueue>;
/**
* Stops the queue (should stop handle queue messages).
* Supposed to be an async function.
*
* @returns {Promise<IMessageQueue>}
*/
stop(): Promise<IMessageQueue>;
/**
* Sends a message to given queue name with the given data.
* Supposed to be an async function.
*
* @param {string} toQueue - queue name to which message should be sent to
* @param {IJson} message - message data
* @param {number} [delay] - if specified, message will be handled in the
* target queue after specified period of time
* in milliseconds.
* @returns {Promise<string>} - message identifier
*/
send(toQueue: string, message: IJson, delay?: number): Promise<string>;
destroy(): void;
/**
* Safely destroys current queue, unregistering all set event
* listeners and connections.
* Supposed to be an async function.
*
* @returns {Promise<void>}
*/
destroy(): Promise<void>;
/**
* Clears queue data in queue host application.
* Supposed to be an async function.
*
* @returns {Promise<IMessageQueue>}
*/
clear(): Promise<IMessageQueue>;
}

@@ -25,3 +25,4 @@ "use strict";

__export(require("./redis"));
__export(require("./IMessageQueue"));
__export(require("./RedisQueue"));
//# sourceMappingURL=index.js.map

@@ -42,2 +42,3 @@ /// <reference types="node" />

private signalsInitialized;
private safeCheckInterval;
/**

@@ -116,2 +117,10 @@ * @type {IRedisClient}

/**
* Unreliable but fast way of message handling by the queue
*/
private readUnsafe();
/**
* Reliable but slow method of message handling by message queue
*/
private readSafe();
/**
* Initializes read process on redis message queue

@@ -118,0 +127,0 @@ *

@@ -49,3 +49,5 @@ "use strict";

watcherCheckDelay: 5000,
useGzip: false
useGzip: false,
safeDelivery: false,
safeDeliveryTtl: 5000
};

@@ -218,2 +220,4 @@ /**

// istanbul ignore next
this.emit('error', err, 'OnMessage');
// istanbul ignore next
this.logger.error('RedisQueue message is invalid:', err);

@@ -246,5 +250,4 @@ }

return __awaiter(this, void 0, void 0, function* () {
const self = this;
if (this.scripts.moveDelayed.checksum) {
yield self.writer.evalsha(this.scripts.moveDelayed.checksum, 2, `${key}:delayed`, key, Date.now());
yield this.writer.evalsha(this.scripts.moveDelayed.checksum, 2, `${key}:delayed`, key, Date.now());
}

@@ -261,13 +264,13 @@ });

watch() {
const self = this;
if (!self.watcher || self.watcher.__ready__) {
if (!this.watcher || this.watcher.__ready__) {
return this;
}
try {
self.writer.config('set', 'notify-keyspace-events', 'Ex');
this.writer.config('set', 'notify-keyspace-events', 'Ex');
}
catch (err) {
this.emit('error', err, 'OnConfig');
this.logger.error('RedisQueue events error:', err);
}
self.watcher.on('pmessage', (...args) => __awaiter(this, void 0, void 0, function* () {
this.watcher.on('pmessage', (...args) => __awaiter(this, void 0, void 0, function* () {
try {

@@ -282,22 +285,54 @@ const key = args.pop().split(':');

catch (err) {
this.emit('error', err, 'OnWatch');
this.logger.error('RedisQueue watch error:', err);
}
}));
self.watcher.psubscribe('__keyevent@0__:expired', `${this.options.prefix}:delayed:*`);
self.watcher.__ready__ = true;
this.watcher.psubscribe('__keyevent@0__:expired', `${this.options.prefix}:delayed:*`);
// watch for expired unhandled safe queues
if (this.options.safeDelivery && !this.safeCheckInterval) {
this.safeCheckInterval = setInterval(() => __awaiter(this, void 0, void 0, function* () {
if (!this.writer) {
clearInterval(this.safeCheckInterval);
delete this.safeCheckInterval;
return;
}
const now = Date.now();
let cursor = '0';
while (true) {
try {
const data = yield this.writer.scan(cursor, 'match', `${this.options.prefix}:*:worker:*`, 'count', '1000');
cursor = data.shift();
const keys = data.shift() || [];
if (keys.length) {
for (let key of keys) {
const kp = key.split(':');
if (Number(kp.pop()) >= now) {
const qKey = `${kp.shift()}:${kp.shift()}`;
yield this.writer.rpoplpush(key, qKey);
}
}
}
if (cursor === '0') {
return;
}
}
catch (err) {
this.emit('error', err, 'OnSafeDelivery');
clearInterval(this.safeCheckInterval);
delete this.safeCheckInterval;
return;
}
}
}), this.options.safeDeliveryTtl);
}
this.watcher.__ready__ = true;
return this;
}
/**
* Initializes read process on redis message queue
*
* @returns {RedisQueue}
* Unreliable but fast way of message handling by the queue
*/
read() {
// istanbul ignore next
if (!this.reader) {
this.logger.error('Reader connection is not initialized!');
return this;
}
readUnsafe() {
process.nextTick(() => __awaiter(this, void 0, void 0, function* () {
try {
const key = this.key;
while (true) {

@@ -307,3 +342,3 @@ if (!this.reader) {

}
const msg = yield this.reader.brpop(this.key, 0);
const msg = yield this.reader.brpop(key, 0);
this.process(msg);

@@ -314,5 +349,51 @@ }

// istanbul ignore next
this.logger.error('RedisQueue reader failed:', err);
this.emit('error', err, 'OnReadUnsafe');
// istanbul ignore next
this.logger.error('RedisQueue unsafe reader failed:', err);
}
}));
}
/**
* Reliable but slow method of message handling by message queue
*/
readSafe() {
process.nextTick(() => __awaiter(this, void 0, void 0, function* () {
try {
const key = this.key;
while (true) {
const expire = Date.now() +
Number(this.options.safeDeliveryTtl);
const workerKey = `${key}:worker:${_1.uuid()}:${expire}`;
if (!this.reader || !this.writer) {
break;
}
yield this.reader.brpoplpush(this.key, workerKey, 0);
const msg = yield this.writer.lrange(workerKey, -1, 1);
this.process([key, msg]);
yield this.writer.del(workerKey);
}
}
catch (err) {
// istanbul ignore next
this.emit('error', err, 'OnReadSafe');
// istanbul ignore next
this.logger.error('RedisQueue safe reader failed:', err);
}
}));
}
/**
* Initializes read process on redis message queue
*
* @returns {RedisQueue}
*/
read() {
// istanbul ignore next
if (!this.reader) {
this.logger.error('Reader connection is not initialized!');
return this;
}
const readMethod = this.options.safeDelivery
? 'readSafe'
: 'readUnsafe';
this[readMethod]();
return this;

@@ -372,2 +453,3 @@ }

catch (err) {
this.emit('error', err, 'OnScriptLoad');
this.logger.error('Script load error:', err);

@@ -415,3 +497,2 @@ }

process.on('SIGINT', free);
process.on('exit', free);
this.signalsInitialized = true;

@@ -426,3 +507,5 @@ }

this.read();
this.processDelayed(this.key).catch();
this.processDelayed(this.key).catch(
// istanbul ignore next
(err) => this.emit('error', err, 'OnProcessDelayed'));
this.initialized = true;

@@ -469,5 +552,7 @@ return this;

return __awaiter(this, void 0, void 0, function* () {
const self = this;
self.reader && self.reader.unref();
delete self.reader;
if (this.reader) {
this.reader.removeAllListeners();
this.reader.unref();
delete this.reader;
}
this.initialized = false;

@@ -490,5 +575,9 @@ return this;

}
if (this.safeCheckInterval) {
clearInterval(this.safeCheckInterval);
delete this.safeCheckInterval;
}
if (this.watcher) {
this.watcher.removeAllListeners();
this.watcher.unref();
this.watcher.removeAllListeners();
delete this.watcher;

@@ -499,2 +588,3 @@ }

if (this.writer) {
this.writer.removeAllListeners();
this.writer.unref();

@@ -501,0 +591,0 @@ delete RedisQueue.writer;

@@ -41,3 +41,5 @@ /*!

watcherCheckDelay: 5000,
useGzip: false
useGzip: false,
safeDelivery: false,
safeDeliveryTtl: 5000
};

@@ -103,2 +105,4 @@

private safeCheckInterval: any;
/**

@@ -262,2 +266,4 @@ * @type {IRedisClient}

// istanbul ignore next
this.emit('error', err, 'OnMessage');
// istanbul ignore next
this.logger.error('RedisQueue message is invalid:', err);

@@ -292,6 +298,4 @@ }

private async processDelayed(key: string) {
const self: any = this;
if (this.scripts.moveDelayed.checksum) {
await self.writer.evalsha(
await this.writer.evalsha(
this.scripts.moveDelayed.checksum,

@@ -311,5 +315,3 @@ 2, `${key}:delayed`, key, Date.now()

private watch() {
const self: any = this;
if (!self.watcher || self.watcher.__ready__) {
if (!this.watcher || this.watcher.__ready__) {
return this;

@@ -319,10 +321,11 @@ }

try {
self.writer.config('set', 'notify-keyspace-events', 'Ex');
this.writer.config('set', 'notify-keyspace-events', 'Ex');
}
catch (err) {
this.emit('error', err, 'OnConfig');
this.logger.error('RedisQueue events error:', err);
}
self.watcher.on('pmessage', async (...args: any[]) => {
this.watcher.on('pmessage', async (...args: any[]) => {
try {

@@ -340,2 +343,3 @@ const key = args.pop().split(':');

catch (err) {
this.emit('error', err, 'OnWatch');
this.logger.error('RedisQueue watch error:', err);

@@ -345,3 +349,3 @@ }

self.watcher.psubscribe(
this.watcher.psubscribe(
'__keyevent@0__:expired',

@@ -351,4 +355,54 @@ `${this.options.prefix}:delayed:*`

self.watcher.__ready__ = true;
// watch for expired unhandled safe queues
if (this.options.safeDelivery && !this.safeCheckInterval) {
this.safeCheckInterval = setInterval(async () => {
if (!this.writer) {
clearInterval(this.safeCheckInterval);
delete this.safeCheckInterval;
return ;
}
const now = Date.now();
let cursor: string = '0';
while (true) {
try {
const data: Array<[string, string[]]> =
<any>await this.writer.scan(
cursor, 'match',
`${this.options.prefix}:*:worker:*`,
'count', '1000'
);
cursor = <any>data.shift();
const keys: string[] = <any>data.shift() || [];
if (keys.length) {
for (let key of keys) {
const kp: string[] = key.split(':');
if (Number(kp.pop()) >= now) {
const qKey = `${kp.shift()}:${kp.shift()}`;
await this.writer.rpoplpush(key, qKey);
}
}
}
if (cursor === '0') {
return ;
}
}
catch (err) {
this.emit('error', err, 'OnSafeDelivery');
clearInterval(this.safeCheckInterval);
delete this.safeCheckInterval;
return ;
}
}
}, this.options.safeDeliveryTtl);
}
this.watcher.__ready__ = true;
return this;

@@ -358,15 +412,9 @@ }

/**
* Initializes read process on redis message queue
*
* @returns {RedisQueue}
* Unreliable but fast way of message handling by the queue
*/
private read(): RedisQueue {
// istanbul ignore next
if (!this.reader) {
this.logger.error('Reader connection is not initialized!');
return this;
}
private readUnsafe() {
process.nextTick(async () => {
try {
const key = this.key;
while (true) {

@@ -377,3 +425,3 @@ if (!this.reader) {

const msg: any = await this.reader.brpop(this.key, 0);
const msg: any = await this.reader.brpop(key, 0);
this.process(msg);

@@ -385,6 +433,64 @@ }

// istanbul ignore next
this.logger.error('RedisQueue reader failed:', err);
this.emit('error', err, 'OnReadUnsafe');
// istanbul ignore next
this.logger.error('RedisQueue unsafe reader failed:', err);
}
});
}
/**
* Reliable but slow method of message handling by message queue
*/
private readSafe() {
process.nextTick(async () => {
try {
const key = this.key;
while (true) {
const expire: number = Date.now() +
Number(this.options.safeDeliveryTtl);
const workerKey = `${key}:worker:${uuid()}:${expire}`;
if (!this.reader || !this.writer) {
break;
}
await this.reader.brpoplpush(this.key, workerKey, 0);
const msg: any = await this.writer.lrange(
workerKey, -1, 1
);
this.process([key, msg]);
await this.writer.del(workerKey);
}
}
catch (err) {
// istanbul ignore next
this.emit('error', err, 'OnReadSafe');
// istanbul ignore next
this.logger.error('RedisQueue safe reader failed:', err);
}
});
}
/**
* Initializes read process on redis message queue
*
* @returns {RedisQueue}
*/
private read(): RedisQueue {
// istanbul ignore next
if (!this.reader) {
this.logger.error('Reader connection is not initialized!');
return this;
}
const readMethod = this.options.safeDelivery
? 'readSafe'
: 'readUnsafe';
this[readMethod]();
return this;

@@ -431,2 +537,3 @@ }

const owned = await this.lock();
if (owned) {

@@ -451,2 +558,3 @@ Object.keys(this.scripts).forEach(async (script: string) => {

catch (err) {
this.emit('error', err, 'OnScriptLoad');
this.logger.error('Script load error:', err);

@@ -507,3 +615,2 @@ }

process.on('SIGINT', free);
process.on('exit', free);

@@ -523,3 +630,6 @@ this.signalsInitialized = true;

this.processDelayed(this.key).catch();
this.processDelayed(this.key).catch(
// istanbul ignore next
(err) => this.emit('error', err, 'OnProcessDelayed')
);

@@ -580,7 +690,8 @@ this.initialized = true;

public async stop(): Promise<RedisQueue> {
const self = this;
if (this.reader) {
this.reader.removeAllListeners();
this.reader.unref();
delete this.reader;
}
self.reader && self.reader.unref();
delete self.reader;
this.initialized = false;

@@ -606,5 +717,10 @@

if (this.safeCheckInterval) {
clearInterval(this.safeCheckInterval);
delete this.safeCheckInterval;
}
if (this.watcher) {
this.watcher.removeAllListeners();
this.watcher.unref();
this.watcher.removeAllListeners();
delete this.watcher;

@@ -617,2 +733,3 @@ }

if (this.writer) {
this.writer.removeAllListeners();
this.writer.unref();

@@ -619,0 +736,0 @@ delete RedisQueue.writer;

@@ -10,3 +10,2 @@ /// <reference types="node" />

private __name;
private __zsets;
constructor(...args: any[]);

@@ -17,2 +16,5 @@ set(...args: any[]): void;

brpop(key: string, timeout?: number, cb?: Function): void;
brpoplpush(from: string, to: string, timeout: number, cb?: Function): void;
lrange(key: string, start: number, stop: number, cb?: Function): void;
scan(...args: any[]): (string | string[])[];
script(...args: any[]): void;

@@ -26,2 +28,3 @@ client(...args: any[]): any;

unref(...args: any[]): void;
config(...args: any[]): void;
}

@@ -28,0 +31,0 @@ export declare class RedisMultiMock {

@@ -35,3 +35,2 @@ "use strict";

this.__name = '';
this.__zsets = {};
setTimeout(() => {

@@ -73,5 +72,34 @@ this.emit('ready', this);

else {
cb && cb(null, [key, q.pop()]);
cb && cb(null, [key, q.shift()]);
}
}
brpoplpush(from, to, timeout, cb) {
const fromQ = RedisClientMock.__queues__[from] =
RedisClientMock.__queues__[from] || [];
const toQ = RedisClientMock.__queues__[to] =
RedisClientMock.__queues__[to] || [];
if (!fromQ.length) {
this.__rt && clearTimeout(this.__rt);
this.__rt = setTimeout(() => this.brpoplpush(from, to, timeout, cb), timeout || 100);
}
else {
toQ.push(fromQ.shift());
cb && cb(null, '1');
}
}
lrange(key, start, stop, cb) {
const q = RedisClientMock.__queues__[key] =
RedisClientMock.__queues__[key] || [];
cb && cb(null, q.splice(start, stop)[0]);
}
scan(...args) {
const qs = RedisClientMock.__queues__;
const found = [];
for (let q of Object.keys(qs)) {
if (q.match(/worker/)) {
found.push(q);
}
}
return ['0', found];
}
script(...args) {

@@ -125,2 +153,6 @@ const cmd = args.shift();

}
if (self.__queues__[key] !== undefined) {
delete self.__queues__[key];
count++;
}
}

@@ -144,2 +176,3 @@ cb(null, count);

}
config(...args) { }
}

@@ -146,0 +179,0 @@ RedisClientMock.__queues__ = {};

@@ -35,3 +35,2 @@ /*!

private __name: string = '';
private __zsets: any = {};

@@ -81,6 +80,39 @@ constructor(...args: any[]) {

} else {
cb && cb(null, [key, q.pop()]);
cb && cb(null, [key, q.shift()]);
}
}
brpoplpush(from: string, to: string, timeout: number, cb?: Function) {
const fromQ = RedisClientMock.__queues__[from] =
RedisClientMock.__queues__[from] || [];
const toQ = RedisClientMock.__queues__[to] =
RedisClientMock.__queues__[to] || [];
if (!fromQ.length) {
this.__rt && clearTimeout(this.__rt);
this.__rt = setTimeout(() => this.brpoplpush(
from, to, timeout, cb
), timeout || 100);
} else {
toQ.push(fromQ.shift());
cb && cb(null, '1');
}
}
lrange(key: string, start: number, stop: number, cb?: Function) {
const q = RedisClientMock.__queues__[key] =
RedisClientMock.__queues__[key] || [];
cb && cb(null, q.splice(start, stop)[0]);
}
scan(...args: any[]) {
const qs = RedisClientMock.__queues__;
const found: string[] = [];
for (let q of Object.keys(qs)) {
if (q.match(/worker/)) {
found.push(q);
}
}
return ['0', found];
}
script(...args: any[]) {

@@ -140,2 +172,6 @@ const cmd = args.shift();

}
if (self.__queues__[key] !== undefined) {
delete self.__queues__[key];
count++;
}
}

@@ -161,2 +197,4 @@ cb(null, count);

}
config(...args: any[]) {}
}

@@ -163,0 +201,0 @@

@@ -143,2 +143,17 @@ "use strict";

});
it('should guaranty message delivery if safeDelivery is on', (done) => {
// it is hard to emulate mq crash at a certain time of
// its runtime execution, so we simply assume delivery works itself
// for the moment. dumb test but better than nothing :(
const message = { hello: 'safe delivery' };
const rq = new src_1.RedisQueue('IMQSafe', {
logger: mocks_1.logger, safeDelivery: true
});
rq.on('message', (msg) => {
chai_1.expect(msg).to.deep.equal(message);
rq.destroy();
done();
});
rq.start().then(() => __awaiter(this, void 0, void 0, function* () { return rq.send('IMQSafe', message); }));
});
it('should deliver message with the given delay', (done) => {

@@ -145,0 +160,0 @@ const message = { hello: 'world' };

@@ -139,2 +139,20 @@ /*!

it('should guaranty message delivery if safeDelivery is on', (done) => {
// it is hard to emulate mq crash at a certain time of
// its runtime execution, so we simply assume delivery works itself
// for the moment. dumb test but better than nothing :(
const message: any = { hello: 'safe delivery' };
const rq = new RedisQueue('IMQSafe', {
logger, safeDelivery: true
});
rq.on('message', (msg) => {
expect(msg).to.deep.equal(message);
rq.destroy();
done();
});
rq.start().then(async () => rq.send('IMQSafe', message));
});
it('should deliver message with the given delay', (done) => {

@@ -141,0 +159,0 @@ const message: any = { hello: 'world' };

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc