Socket
Socket
Sign inDemoInstall

rsmq

Package Overview
Dependencies
Maintainers
1
Versions
44
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

rsmq - npm Package Compare versions

Comparing version 0.10.0 to 0.11.0

7

CHANGELOG.md
# CHANGELOG for rsmq
## 0.11.0
* Refactored all code from coffeescript to typescript.
* Added tests for #90
* Removed support for Node 4
* Support for `password` option when connecting to Redis (#70)
## 0.10.0

@@ -4,0 +11,0 @@

1162

index.js

@@ -1,635 +0,557 @@

// Generated by CoffeeScript 1.12.7
/*
rsmq
A Really Simple Message Queue based on Redis
The MIT License (MIT)
Copyright © 2013-2018 Patrick Liess, http://www.tcs.de
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
var EventEmitter, RedisInst, RedisSMQ, _, crypto,
bind = function(fn, me){ return function(){ return fn.apply(me, arguments); }; },
extend = function(child, parent) { for (var key in parent) { if (hasProp.call(parent, key)) child[key] = parent[key]; } function ctor() { this.constructor = child; } ctor.prototype = parent.prototype; child.prototype = new ctor(); child.__super__ = parent.prototype; return child; },
hasProp = {}.hasOwnProperty,
slice = [].slice;
crypto = require("crypto");
_ = require("lodash");
RedisInst = require("redis");
EventEmitter = require("events").EventEmitter;
RedisSMQ = (function(superClass) {
extend(RedisSMQ, superClass);
function RedisSMQ(options) {
var opts, ref, ref1;
if (options == null) {
options = {};
}
this._initErrors = bind(this._initErrors, this);
this._handleError = bind(this._handleError, this);
this.setQueueAttributes = bind(this.setQueueAttributes, this);
this.sendMessage = bind(this.sendMessage, this);
this._receiveMessage = bind(this._receiveMessage, this);
this._popMessage = bind(this._popMessage, this);
this.receiveMessage = bind(this.receiveMessage, this);
this.popMessage = bind(this.popMessage, this);
this.listQueues = bind(this.listQueues, this);
this.getQueueAttributes = bind(this.getQueueAttributes, this);
this.deleteQueue = bind(this.deleteQueue, this);
this.deleteMessage = bind(this.deleteMessage, this);
this.createQueue = bind(this.createQueue, this);
this._changeMessageVisibility = bind(this._changeMessageVisibility, this);
this.changeMessageVisibility = bind(this.changeMessageVisibility, this);
this._getQueue = bind(this._getQueue, this);
this.quit = bind(this.quit, this);
this.asyncify = bind(this.asyncify, this);
if (Promise) {
_.forEach(["changeMessageVisibility", "createQueue", "deleteMessage", "deleteQueue", "getQueueAttributes", "listQueues", "popMessage", "receiveMessage", "sendMessage", "setQueueAttributes"], this.asyncify);
}
opts = _.extend({
host: "127.0.0.1",
port: 6379,
options: {},
client: null,
ns: "rsmq",
realtime: false
}, options);
this.realtime = opts.realtime;
this.redisns = opts.ns + ":";
if (((ref = opts.client) != null ? (ref1 = ref.constructor) != null ? ref1.name : void 0 : void 0) === "RedisClient") {
this.redis = opts.client;
} else {
this.redis = RedisInst.createClient(opts.port, opts.host, opts.options);
}
this.connected = this.redis.connected || false;
if (this.connected) {
this.emit("connect");
this.initScript();
}
this.redis.on("connect", (function(_this) {
return function() {
_this.connected = true;
_this.emit("connect");
_this.initScript();
};
})(this));
this.redis.on("error", (function(_this) {
return function(err) {
if (err.message.indexOf("ECONNREFUSED")) {
_this.connected = false;
_this.emit("disconnect");
} else {
console.error("Redis ERROR", err);
_this.emit("error");
}
};
})(this));
this._initErrors();
return;
}
RedisSMQ.prototype.asyncify = function(methodKey) {
var asyncMethodKey;
asyncMethodKey = methodKey + "Async";
this[asyncMethodKey] = (function(_this) {
return function() {
var args;
args = 1 <= arguments.length ? slice.call(arguments, 0) : [];
return new Promise(function(resolve, reject) {
return _this[methodKey].apply(_this, slice.call(args).concat([function(err, result) {
if (err) {
reject(err);
return;
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const _ = require("lodash");
const RedisInst = require("redis");
const EventEmitter = require("events").EventEmitter;
class RedisSMQ extends EventEmitter {
constructor(options = {}) {
super(options);
this.asyncify = (methodKey) => {
const asyncMethodKey = methodKey + "Async";
this[asyncMethodKey] = (...args) => {
return new Promise((resolve, reject) => {
this[methodKey](...args, (err, result) => {
if (err) {
reject(err);
return;
}
resolve(result);
});
});
};
};
this.quit = () => {
this.redis.quit();
};
this._getQueue = (qname, uid, cb) => {
const mc = [
["hmget", `${this.redisns}${qname}:Q`, "vt", "delay", "maxsize"],
["time"]
];
this.redis.multi(mc).exec((err, resp) => {
if (err) {
this._handleError(cb, err);
return;
}
if (resp[0][0] === null || resp[0][1] === null || resp[0][2] === null) {
this._handleError(cb, "queueNotFound");
return;
}
const ms = this._formatZeroPad(Number(resp[1][1]), 6);
const ts = Number(resp[1][0] + ms.toString(10).slice(0, 3));
const q = {
vt: parseInt(resp[0][0], 10),
delay: parseInt(resp[0][1], 10),
maxsize: parseInt(resp[0][2], 10),
ts: ts
};
if (uid) {
uid = this._makeid(22);
q.uid = Number(resp[1][0] + ms).toString(36) + uid;
}
cb(null, q);
});
};
this.changeMessageVisibility = (options, cb) => {
if (this._validate(options, ["qname", "id", "vt"], cb) === false)
return;
this._getQueue(options.qname, false, (err, q) => {
if (err) {
this._handleError(cb, err);
return;
}
if (this.changeMessageVisibility_sha1) {
this._changeMessageVisibility(options, q, cb);
return;
}
this.on("scriptload:changeMessageVisibility", () => {
this._changeMessageVisibility(options, q, cb);
});
});
};
this._changeMessageVisibility = (options, q, cb) => {
this.redis.evalsha(this.changeMessageVisibility_sha1, 3, `${this.redisns}${options.qname}`, options.id, q.ts + options.vt * 1000, (err, resp) => {
if (err) {
this._handleError(cb, err);
return;
}
cb(null, resp);
});
};
this.createQueue = (options, cb) => {
const key = `${this.redisns}${options.qname}:Q`;
options.vt = options.vt != null ? options.vt : 30;
options.delay = options.delay != null ? options.delay : 0;
options.maxsize = options.maxsize != null ? options.maxsize : 65536;
if (this._validate(options, ["qname", "vt", "delay", "maxsize"], cb) === false)
return;
this.redis.time((err, resp) => {
if (err) {
this._handleError(cb, err);
return;
}
const mc = [
["hsetnx", key, "vt", options.vt],
["hsetnx", key, "delay", options.delay],
["hsetnx", key, "maxsize", options.maxsize],
["hsetnx", key, "created", resp[0]],
["hsetnx", key, "modified", resp[0]],
];
this.redis.multi(mc).exec((err, resp) => {
if (err) {
this._handleError(cb, err);
return;
}
if (resp[0] === 0) {
this._handleError(cb, "queueExists");
return;
}
this.redis.sadd(`${this.redisns}QUEUES`, options.qname, (err, resp) => {
if (err) {
this._handleError(cb, err);
return;
}
cb(null, 1);
});
});
});
};
this.deleteMessage = (options, cb) => {
if (this._validate(options, ["qname", "id"], cb) === false)
return;
const key = `${this.redisns}${options.qname}`;
const mc = [
["zrem", key, options.id],
["hdel", `${key}:Q`, `${options.id}`, `${options.id}:rc`, `${options.id}:fr`]
];
this.redis.multi(mc).exec((err, resp) => {
if (err) {
this._handleError(cb, err);
return;
}
if (resp[0] === 1 && resp[1] > 0) {
cb(null, 1);
}
else {
cb(null, 0);
}
});
};
this.deleteQueue = (options, cb) => {
if (this._validate(options, ["qname"], cb) === false)
return;
const key = `${this.redisns}${options.qname}`;
const mc = [
["del", `${key}:Q`, key],
["srem", `${this.redisns}QUEUES`, options.qname]
];
this.redis.multi(mc).exec((err, resp) => {
if (err) {
this._handleError(cb, err);
return;
}
if (resp[0] === 0) {
this._handleError(cb, "queueNotFound");
return;
}
cb(null, 1);
});
};
this.getQueueAttributes = (options, cb) => {
if (this._validate(options, ["qname"], cb) === false)
return;
const key = `${this.redisns}${options.qname}`;
this.redis.time((err, resp) => {
if (err) {
this._handleError(cb, err);
return;
}
const mc = [
["hmget", `${key}:Q`, "vt", "delay", "maxsize", "totalrecv", "totalsent", "created", "modified"],
["zcard", key],
["zcount", key, resp[0] + "000", "+inf"]
];
this.redis.multi(mc).exec((err, resp) => {
if (err) {
this._handleError(cb, err);
return;
}
if (resp[0][0] === null) {
this._handleError(cb, "queueNotFound");
return;
}
const o = {
vt: parseInt(resp[0][0], 10),
delay: parseInt(resp[0][1], 10),
maxsize: parseInt(resp[0][2], 10),
totalrecv: parseInt(resp[0][3], 10) || 0,
totalsent: parseInt(resp[0][4], 10) || 0,
created: parseInt(resp[0][5], 10),
modified: parseInt(resp[0][6], 10),
msgs: resp[1],
hiddenmsgs: resp[2]
};
cb(null, o);
});
});
};
this._handleReceivedMessage = (cb) => {
return (err, resp) => {
if (err) {
this._handleError(cb, err);
return;
}
if (!resp.length) {
cb(null, {});
return;
}
const o = {
id: resp[0],
message: resp[1],
rc: resp[2],
fr: Number(resp[3]),
sent: Number(parseInt(resp[0].slice(0, 10), 36) / 1000)
};
cb(null, o);
};
};
this.initScript = () => {
const script_popMessage = `local msg = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", KEYS[2], "LIMIT", "0", "1")
if #msg == 0 then
return {}
end
redis.call("HINCRBY", KEYS[1] .. ":Q", "totalrecv", 1)
local mbody = redis.call("HGET", KEYS[1] .. ":Q", msg[1])
local rc = redis.call("HINCRBY", KEYS[1] .. ":Q", msg[1] .. ":rc", 1)
local o = {msg[1], mbody, rc}
if rc==1 then
table.insert(o, KEYS[2])
else
local fr = redis.call("HGET", KEYS[1] .. ":Q", msg[1] .. ":fr")
table.insert(o, fr)
end
redis.call("ZREM", KEYS[1], msg[1])
redis.call("HDEL", KEYS[1] .. ":Q", msg[1], msg[1] .. ":rc", msg[1] .. ":fr")
return o`;
const script_receiveMessage = `local msg = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", KEYS[2], "LIMIT", "0", "1")
if #msg == 0 then
return {}
end
redis.call("ZADD", KEYS[1], KEYS[3], msg[1])
redis.call("HINCRBY", KEYS[1] .. ":Q", "totalrecv", 1)
local mbody = redis.call("HGET", KEYS[1] .. ":Q", msg[1])
local rc = redis.call("HINCRBY", KEYS[1] .. ":Q", msg[1] .. ":rc", 1)
local o = {msg[1], mbody, rc}
if rc==1 then
redis.call("HSET", KEYS[1] .. ":Q", msg[1] .. ":fr", KEYS[2])
table.insert(o, KEYS[2])
else
local fr = redis.call("HGET", KEYS[1] .. ":Q", msg[1] .. ":fr")
table.insert(o, fr)
end
return o`;
const script_changeMessageVisibility = `local msg = redis.call("ZSCORE", KEYS[1], KEYS[2])
if not msg then
return 0
end
redis.call("ZADD", KEYS[1], KEYS[3], KEYS[2])
return 1`;
this.redis.script("load", script_popMessage, (err, resp) => {
if (err) {
console.log(err);
return;
}
this.popMessage_sha1 = resp;
this.emit("scriptload:popMessage");
});
this.redis.script("load", script_receiveMessage, (err, resp) => {
if (err) {
console.log(err);
return;
}
this.receiveMessage_sha1 = resp;
this.emit("scriptload:receiveMessage");
});
this.redis.script("load", script_changeMessageVisibility, (err, resp) => {
if (err) {
console.log(err);
return;
}
this.changeMessageVisibility_sha1 = resp;
this.emit('scriptload:changeMessageVisibility');
});
};
this.listQueues = (cb) => {
this.redis.smembers(`${this.redisns}QUEUES`, (err, resp) => {
if (err) {
this._handleError(cb, err);
return;
}
cb(null, resp);
});
};
this.popMessage = (options, cb) => {
if (this._validate(options, ["qname"], cb) === false)
return;
this._getQueue(options.qname, false, (err, q) => {
if (err) {
this._handleError(cb, err);
return;
}
if (this.popMessage_sha1) {
this._popMessage(options, q, cb);
return;
}
this.on("scriptload:popMessage", () => {
this._popMessage(options, q, cb);
});
});
};
this.receiveMessage = (options, cb) => {
if (this._validate(options, ["qname"], cb) === false)
return;
this._getQueue(options.qname, false, (err, q) => {
if (err) {
this._handleError(cb, err);
return;
}
options.vt = options.vt != null ? options.vt : q.vt;
if (this._validate(options, ["vt"], cb) === false)
return;
if (this.receiveMessage_sha1) {
this._receiveMessage(options, q, cb);
return;
}
this.on("scriptload:receiveMessage", () => {
this._receiveMessage(options, q, cb);
});
});
};
this._popMessage = (options, q, cb) => {
this.redis.evalsha(this.popMessage_sha1, 2, `${this.redisns}${options.qname}`, q.ts, this._handleReceivedMessage(cb));
};
this._receiveMessage = (options, q, cb) => {
this.redis.evalsha(this.receiveMessage_sha1, 3, `${this.redisns}${options.qname}`, q.ts, q.ts + options.vt * 1000, this._handleReceivedMessage(cb));
};
this.sendMessage = (options, cb) => {
if (this._validate(options, ["qname"], cb) === false)
return;
this._getQueue(options.qname, true, (err, q) => {
if (err) {
this._handleError(cb, err);
return;
}
options.delay = options.delay != null ? options.delay : q.delay;
if (this._validate(options, ["delay"], cb) === false)
return;
if (typeof options.message !== "string") {
this._handleError(cb, "messageNotString");
return;
}
if (q.maxsize !== -1 && options.message.length > q.maxsize) {
this._handleError(cb, "messageTooLong");
return;
}
const key = `${this.redisns}${options.qname}`;
const mc = [
["zadd", key, q.ts + options.delay * 1000, q.uid],
["hset", `${key}:Q`, q.uid, options.message],
["hincrby", `${key}:Q`, "totalsent", 1]
];
if (this.realtime) {
mc.push(["zcard", key]);
}
this.redis.multi(mc).exec((err, resp) => {
if (err) {
this._handleError(cb, err);
return;
}
if (this.realtime) {
this.redis.publish(`${this.redisns}rt:${options.qname}`, resp[3]);
}
cb(null, q.uid);
});
});
};
this.setQueueAttributes = (options, cb) => {
const props = ["vt", "maxsize", "delay"];
let k = [];
for (let item of props) {
if (options[item] != null) {
k.push(item);
}
}
resolve(result);
}]));
});
};
})(this);
};
RedisSMQ.prototype.quit = function() {
this.redis.quit();
};
RedisSMQ.prototype._getQueue = function(qname, uid, cb) {
var mc;
mc = [["hmget", "" + this.redisns + qname + ":Q", "vt", "delay", "maxsize"], ["time"]];
this.redis.multi(mc).exec((function(_this) {
return function(err, resp) {
var ms, q, ts;
if (err) {
_this._handleError(cb, err);
return;
}
if (resp[0][0] === null || resp[0][1] === null || resp[0][2] === null) {
_this._handleError(cb, "queueNotFound");
return;
}
ms = _this._formatZeroPad(Number(resp[1][1]), 6);
ts = Number(resp[1][0] + ms.toString(10).slice(0, 3));
q = {
vt: parseInt(resp[0][0], 10),
delay: parseInt(resp[0][1], 10),
maxsize: parseInt(resp[0][2], 10),
ts: ts
if (k.length === 0) {
this._handleError(cb, "noAttributeSupplied");
return;
}
if (this._validate(options, ["qname"].concat(k), cb) === false)
return;
const key = `${this.redisns}${options.qname}`;
this._getQueue(options.qname, false, (err, q) => {
if (err) {
this._handleError(cb, err);
return;
}
this.redis.time((err, resp) => {
if (err) {
this._handleError(cb, err);
return;
}
const mc = [
["hset", `${this.redisns}${options.qname}:Q`, "modified", resp[0]]
];
for (let item of k) {
mc.push(["hset", `${this.redisns}${options.qname}:Q`, item, options[item]]);
}
;
this.redis.multi(mc).exec((err) => {
if (err) {
this._handleError(cb, err);
return;
}
this.getQueueAttributes(options, cb);
});
});
});
};
if (uid) {
uid = _this._makeid(22);
q.uid = Number(resp[1][0] + ms).toString(36) + uid;
}
cb(null, q);
};
})(this));
};
RedisSMQ.prototype.changeMessageVisibility = function(options, cb) {
if (this._validate(options, ["qname", "id", "vt"], cb) === false) {
return;
}
this._getQueue(options.qname, false, (function(_this) {
return function(err, q) {
if (err) {
_this._handleError(cb, err);
return;
}
if (_this.changeMessageVisibility_sha1) {
_this._changeMessageVisibility(options, q, cb);
return;
}
_this.on('scriptload:changeMessageVisibility', function() {
_this._changeMessageVisibility(options, q, cb);
});
};
})(this));
};
RedisSMQ.prototype._changeMessageVisibility = function(options, q, cb) {
this.redis.evalsha(this.changeMessageVisibility_sha1, 3, "" + this.redisns + options.qname, options.id, q.ts + options.vt * 1000, (function(_this) {
return function(err, resp) {
if (err) {
_this._handleError(cb, err);
return;
}
cb(null, resp);
};
})(this));
};
RedisSMQ.prototype.createQueue = function(options, cb) {
var ref, ref1, ref2;
options.vt = (ref = options.vt) != null ? ref : 30;
options.delay = (ref1 = options.delay) != null ? ref1 : 0;
options.maxsize = (ref2 = options.maxsize) != null ? ref2 : 65536;
if (this._validate(options, ["qname", "vt", "delay", "maxsize"], cb) === false) {
return;
}
this.redis.time((function(_this) {
return function(err, resp) {
var mc;
if (err) {
_this._handleError(cb, err);
return;
}
mc = [["hsetnx", "" + _this.redisns + options.qname + ":Q", "vt", options.vt], ["hsetnx", "" + _this.redisns + options.qname + ":Q", "delay", options.delay], ["hsetnx", "" + _this.redisns + options.qname + ":Q", "maxsize", options.maxsize], ["hsetnx", "" + _this.redisns + options.qname + ":Q", "created", resp[0]], ["hsetnx", "" + _this.redisns + options.qname + ":Q", "modified", resp[0]]];
_this.redis.multi(mc).exec(function(err, resp) {
if (err) {
_this._handleError(cb, err);
return;
}
if (resp[0] === 0) {
_this._handleError(cb, "queueExists");
return;
}
_this.redis.sadd(_this.redisns + "QUEUES", options.qname, function(err, resp) {
if (err) {
_this._handleError(cb, err);
return;
this._handleError = (cb, err, data = {}) => {
let _err = null;
if (_.isString(err)) {
_err = new Error();
_err.name = err;
let ref = null;
_err.message = ((ref = this._ERRORS) != null ? typeof ref[err] === "function" ? ref[err](data) : void 0 : void 0) || "unkown";
}
cb(null, 1);
});
});
};
})(this));
};
RedisSMQ.prototype.deleteMessage = function(options, cb) {
var key, mc;
if (this._validate(options, ["qname", "id"], cb) === false) {
return;
}
key = "" + this.redisns + options.qname;
mc = [["zrem", key, options.id], ["hdel", key + ":Q", "" + options.id, options.id + ":rc", options.id + ":fr"]];
this.redis.multi(mc).exec((function(_this) {
return function(err, resp) {
if (err) {
_this._handleError(cb, err);
return;
}
if (resp[0] === 1 && resp[1] > 0) {
cb(null, 1);
} else {
cb(null, 0);
}
};
})(this));
};
RedisSMQ.prototype.deleteQueue = function(options, cb) {
var key, mc;
if (this._validate(options, ["qname"], cb) === false) {
return;
}
key = "" + this.redisns + options.qname;
mc = [["del", key + ":Q"], ["del", key], ["srem", this.redisns + "QUEUES", options.qname]];
this.redis.multi(mc).exec((function(_this) {
return function(err, resp) {
if (err) {
_this._handleError(cb, err);
return;
}
if (resp[0] === 0) {
_this._handleError(cb, "queueNotFound");
return;
}
cb(null, 1);
};
})(this));
};
RedisSMQ.prototype.getQueueAttributes = function(options, cb) {
var key;
if (this._validate(options, ["qname"], cb) === false) {
return;
}
key = "" + this.redisns + options.qname;
this.redis.time((function(_this) {
return function(err, resp) {
var mc;
if (err) {
_this._handleError(cb, err);
return;
}
mc = [["hmget", key + ":Q", "vt", "delay", "maxsize", "totalrecv", "totalsent", "created", "modified"], ["zcard", key], ["zcount", key, resp[0] + "000", "+inf"]];
_this.redis.multi(mc).exec(function(err, resp) {
var o;
if (err) {
_this._handleError(cb, err);
return;
}
if (resp[0][0] === null) {
_this._handleError(cb, "queueNotFound");
return;
}
o = {
vt: parseInt(resp[0][0], 10),
delay: parseInt(resp[0][1], 10),
maxsize: parseInt(resp[0][2], 10),
totalrecv: parseInt(resp[0][3], 10) || 0,
totalsent: parseInt(resp[0][4], 10) || 0,
created: parseInt(resp[0][5], 10),
modified: parseInt(resp[0][6], 10),
msgs: resp[1],
hiddenmsgs: resp[2]
};
cb(null, o);
});
};
})(this));
};
RedisSMQ.prototype._handleReceivedMessage = function(cb) {
return (function(_this) {
return function(err, resp) {
var o;
if (err) {
_this._handleError(cb, err);
return;
}
if (!resp.length) {
cb(null, {});
return;
}
o = {
id: resp[0],
message: resp[1],
rc: resp[2],
fr: Number(resp[3]),
sent: parseInt(parseInt(resp[0].slice(0, 10), 36) / 1000)
else {
_err = err;
}
cb(_err);
};
cb(null, o);
};
})(this);
};
RedisSMQ.prototype.initScript = function(cb) {
var script_changeMessageVisibility, script_popMessage, script_receiveMessage;
script_popMessage = 'local msg = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", KEYS[2], "LIMIT", "0", "1") if #msg == 0 then return {} end redis.call("HINCRBY", KEYS[1] .. ":Q", "totalrecv", 1) local mbody = redis.call("HGET", KEYS[1] .. ":Q", msg[1]) local rc = redis.call("HINCRBY", KEYS[1] .. ":Q", msg[1] .. ":rc", 1) local o = {msg[1], mbody, rc} if rc==1 then table.insert(o, KEYS[2]) else local fr = redis.call("HGET", KEYS[1] .. ":Q", msg[1] .. ":fr") table.insert(o, fr) end redis.call("ZREM", KEYS[1], msg[1]) redis.call("HDEL", KEYS[1] .. ":Q", msg[1], msg[1] .. ":rc", msg[1] .. ":fr") return o';
script_receiveMessage = 'local msg = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", KEYS[2], "LIMIT", "0", "1") if #msg == 0 then return {} end redis.call("ZADD", KEYS[1], KEYS[3], msg[1]) redis.call("HINCRBY", KEYS[1] .. ":Q", "totalrecv", 1) local mbody = redis.call("HGET", KEYS[1] .. ":Q", msg[1]) local rc = redis.call("HINCRBY", KEYS[1] .. ":Q", msg[1] .. ":rc", 1) local o = {msg[1], mbody, rc} if rc==1 then redis.call("HSET", KEYS[1] .. ":Q", msg[1] .. ":fr", KEYS[2]) table.insert(o, KEYS[2]) else local fr = redis.call("HGET", KEYS[1] .. ":Q", msg[1] .. ":fr") table.insert(o, fr) end return o';
script_changeMessageVisibility = 'local msg = redis.call("ZSCORE", KEYS[1], KEYS[2]) if not msg then return 0 end redis.call("ZADD", KEYS[1], KEYS[3], KEYS[2]) return 1';
this.redis.script("load", script_popMessage, (function(_this) {
return function(err, resp) {
if (err) {
console.log(err);
return;
this._initErrors = () => {
this._ERRORS = {};
for (let key in this.ERRORS) {
this._ERRORS[key] = _.template(this.ERRORS[key]);
}
};
this._VALID = {
qname: /^([a-zA-Z0-9_-]){1,160}$/,
id: /^([a-zA-Z0-9:]){32}$/
};
this._validate = (o, items, cb) => {
for (let item of items) {
switch (item) {
case "qname":
case "id":
if (!o[item]) {
this._handleError(cb, "missingParameter", { item: item });
return false;
}
o[item] = o[item].toString();
if (!this._VALID[item].test(o[item])) {
this._handleError(cb, "invalidFormat", { item: item });
return false;
}
break;
case "vt":
case "delay":
o[item] = parseInt(o[item], 10);
if (_.isNaN(o[item]) || !_.isNumber(o[item]) || o[item] < 0 || o[item] > 9999999) {
this._handleError(cb, "invalidValue", { item: item, min: 0, max: 9999999 });
return false;
}
break;
case "maxsize":
o[item] = parseInt(o[item], 10);
if (_.isNaN(o[item]) || !_.isNumber(o[item]) || o[item] < 1024 || o[item] > 65536) {
if (o[item] !== -1) {
this._handleError(cb, "invalidValue", { item: item, min: 1024, max: 65536 });
return false;
}
}
break;
}
}
;
return o;
};
this.ERRORS = {
"noAttributeSupplied": "No attribute was supplied",
"missingParameter": "No <%= item %> supplied",
"invalidFormat": "Invalid <%= item %> format",
"invalidValue": "<%= item %> must be between <%= min %> and <%= max %>",
"messageNotString": "Message must be a string",
"messageTooLong": "Message too long",
"queueNotFound": "Queue not found",
"queueExists": "Queue exists"
};
if (Promise) {
_.forEach([
"changeMessageVisibility",
"createQueue",
"deleteMessage",
"deleteQueue",
"getQueueAttributes",
"listQueues",
"popMessage",
"receiveMessage",
"sendMessage",
"setQueueAttributes",
], this.asyncify);
}
_this.popMessage_sha1 = resp;
_this.emit('scriptload:popMessage');
};
})(this));
this.redis.script("load", script_receiveMessage, (function(_this) {
return function(err, resp) {
if (err) {
console.log(err);
return;
const opts = _.extend({
host: "127.0.0.1",
port: 6379,
options: {
password: options.password || null
},
client: null,
ns: "rsmq",
realtime: false
}, options);
opts.options.host = opts.host;
opts.options.port = opts.port;
this.realtime = opts.realtime;
this.redisns = opts.ns + ":";
if (opts.client && options.client.constructor.name === "RedisClient") {
this.redis = opts.client;
}
_this.receiveMessage_sha1 = resp;
_this.emit('scriptload:receiveMessage');
};
})(this));
this.redis.script("load", script_changeMessageVisibility, (function(_this) {
return function(err, resp) {
_this.changeMessageVisibility_sha1 = resp;
_this.emit('scriptload:changeMessageVisibility');
};
})(this));
};
RedisSMQ.prototype.listQueues = function(cb) {
this.redis.smembers(this.redisns + "QUEUES", (function(_this) {
return function(err, resp) {
if (err) {
_this._handleError(cb, err);
return;
else {
this.redis = RedisInst.createClient(opts);
}
cb(null, resp);
};
})(this));
};
RedisSMQ.prototype.popMessage = function(options, cb) {
if (this._validate(options, ["qname"], cb) === false) {
return;
}
this._getQueue(options.qname, false, (function(_this) {
return function(err, q) {
if (err) {
_this._handleError(cb, err);
return;
this.connected = this.redis.connected || false;
if (this.connected) {
this.emit("connect");
this.initScript();
}
if (_this.popMessage_sha1) {
_this._popMessage(options, q, cb);
return;
}
_this.on('scriptload:popMessage', function() {
_this._popMessage(options, q, cb);
this.redis.on("connect", () => {
this.connected = true;
this.emit("connect");
this.initScript();
});
};
})(this));
};
RedisSMQ.prototype.receiveMessage = function(options, cb) {
if (this._validate(options, ["qname"], cb) === false) {
return;
}
this._getQueue(options.qname, false, (function(_this) {
return function(err, q) {
var ref;
if (err) {
_this._handleError(cb, err);
return;
}
options.vt = (ref = options.vt) != null ? ref : q.vt;
if (_this._validate(options, ["vt"], cb) === false) {
return;
}
if (_this.receiveMessage_sha1) {
_this._receiveMessage(options, q, cb);
return;
}
_this.on('scriptload:receiveMessage', function() {
_this._receiveMessage(options, q, cb);
this.redis.on("error", (err) => {
if (err.message.indexOf("ECONNREFUSED")) {
this.connected = false;
this.emit("disconnect");
}
else {
console.error("Redis ERROR", err);
this.emit("error");
}
});
};
})(this));
};
RedisSMQ.prototype._popMessage = function(options, q, cb) {
this.redis.evalsha(this.popMessage_sha1, 2, "" + this.redisns + options.qname, q.ts, this._handleReceivedMessage(cb));
};
RedisSMQ.prototype._receiveMessage = function(options, q, cb) {
this.redis.evalsha(this.receiveMessage_sha1, 3, "" + this.redisns + options.qname, q.ts, q.ts + options.vt * 1000, this._handleReceivedMessage(cb));
};
RedisSMQ.prototype.sendMessage = function(options, cb) {
if (this._validate(options, ["qname"], cb) === false) {
return;
this._initErrors();
}
this._getQueue(options.qname, true, (function(_this) {
return function(err, q) {
var key, mc, ref;
if (err) {
_this._handleError(cb, err);
return;
}
options.delay = (ref = options.delay) != null ? ref : q.delay;
if (_this._validate(options, ["delay"], cb) === false) {
return;
}
if (typeof options.message !== "string") {
_this._handleError(cb, "messageNotString");
return;
}
if (q.maxsize !== -1 && options.message.length > q.maxsize) {
_this._handleError(cb, "messageTooLong");
return;
}
key = "" + _this.redisns + options.qname;
mc = [["zadd", key, q.ts + options.delay * 1000, q.uid], ["hset", key + ":Q", q.uid, options.message], ["hincrby", key + ":Q", "totalsent", 1]];
if (_this.realtime) {
mc.push(["zcard", key]);
}
_this.redis.multi(mc).exec(function(err, resp) {
if (err) {
_this._handleError(cb, err);
return;
}
if (_this.realtime) {
_this.redis.publish(_this.redisns + "rt:" + options.qname, resp[3]);
}
cb(null, q.uid);
});
};
})(this));
};
RedisSMQ.prototype.setQueueAttributes = function(options, cb) {
var item, j, k, key, len1, props;
props = ["vt", "maxsize", "delay"];
k = [];
for (j = 0, len1 = props.length; j < len1; j++) {
item = props[j];
if (options[item] != null) {
k.push(item);
}
_formatZeroPad(num, count) {
return ((Math.pow(10, count) + num) + "").substr(1);
}
if (!k.length) {
this._handleError(cb, "noAttributeSupplied");
return;
}
if (this._validate(options, ["qname"].concat(k), cb) === false) {
return;
}
key = "" + this.redisns + options.qname;
this._getQueue(options.qname, false, (function(_this) {
return function(err, q) {
if (err) {
_this._handleError(cb, err);
return;
_makeid(len) {
let text = "";
const possible = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
let i = 0;
for (i = 0; i < len; i++) {
text += possible.charAt(Math.floor(Math.random() * possible.length));
}
_this.redis.time(function(err, resp) {
var l, len2, mc;
if (err) {
_this._handleError(cb, err);
return;
}
mc = [["hset", "" + _this.redisns + options.qname + ":Q", "modified", resp[0]]];
for (l = 0, len2 = k.length; l < len2; l++) {
item = k[l];
mc.push(["hset", "" + _this.redisns + options.qname + ":Q", item, options[item]]);
}
_this.redis.multi(mc).exec(function(err, resp) {
if (err) {
_this._handleError(cb, err);
return;
}
_this.getQueueAttributes(options, cb);
});
});
};
})(this));
};
RedisSMQ.prototype._formatZeroPad = function(num, count) {
return ((Math.pow(10, count) + num) + "").substr(1);
};
RedisSMQ.prototype._handleError = function(cb, err, data) {
var _err, ref;
if (data == null) {
data = {};
return text;
}
if (_.isString(err)) {
_err = new Error();
_err.name = err;
_err.message = ((ref = this._ERRORS) != null ? typeof ref[err] === "function" ? ref[err](data) : void 0 : void 0) || "unkown";
} else {
_err = err;
}
cb(_err);
};
RedisSMQ.prototype._initErrors = function() {
var key, msg, ref;
this._ERRORS = {};
ref = this.ERRORS;
for (key in ref) {
msg = ref[key];
this._ERRORS[key] = _.template(msg);
}
};
RedisSMQ.prototype._makeid = function(len) {
var i, j, possible, ref, text;
text = "";
possible = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
for (i = j = 0, ref = len; 0 <= ref ? j < ref : j > ref; i = 0 <= ref ? ++j : --j) {
text += possible.charAt(Math.floor(Math.random() * possible.length));
}
return text;
};
RedisSMQ.prototype._VALID = {
qname: /^([a-zA-Z0-9_-]){1,160}$/,
id: /^([a-zA-Z0-9:]){32}$/
};
RedisSMQ.prototype._validate = function(o, items, cb) {
var item, j, len1;
for (j = 0, len1 = items.length; j < len1; j++) {
item = items[j];
switch (item) {
case "qname":
case "id":
if (!o[item]) {
this._handleError(cb, "missingParameter", {
item: item
});
return false;
}
o[item] = o[item].toString();
if (!this._VALID[item].test(o[item])) {
this._handleError(cb, "invalidFormat", {
item: item
});
return false;
}
break;
case "vt":
case "delay":
o[item] = parseInt(o[item], 10);
if (_.isNaN(o[item]) || !_.isNumber(o[item]) || o[item] < 0 || o[item] > 9999999) {
this._handleError(cb, "invalidValue", {
item: item,
min: 0,
max: 9999999
});
return false;
}
break;
case "maxsize":
o[item] = parseInt(o[item], 10);
if (_.isNaN(o[item]) || !_.isNumber(o[item]) || o[item] < 1024 || o[item] > 65536) {
if (o[item] !== -1) {
this._handleError(cb, "invalidValue", {
item: item,
min: 1024,
max: 65536
});
return false;
}
}
}
}
return o;
};
RedisSMQ.prototype.ERRORS = {
"noAttributeSupplied": "No attribute was supplied",
"missingParameter": "No <%= item %> supplied",
"invalidFormat": "Invalid <%= item %> format",
"invalidValue": "<%= item %> must be between <%= min %> and <%= max %>",
"messageNotString": "Message must be a string",
"messageTooLong": "Message too long",
"queueNotFound": "Queue not found",
"queueExists": "Queue exists"
};
return RedisSMQ;
})(EventEmitter);
}
module.exports = RedisSMQ;
The MIT License (MIT)
Copyright (c) 2013-2018 Patrick Liess http://www.tcs.de
Copyright (c) 2013-2019 Patrick Liess http://www.tcs.de

@@ -5,0 +5,0 @@ Permission is hereby granted, free of charge, to any person obtaining a copy

{
"name": "rsmq",
"description": "A really simple message queue based on Redis",
"version": "0.10.0",
"version": "0.11.0",
"license": "MIT",
"author": "P. Liess <smrchy+npm@gmail.com>",
"engines": {
"node": "> 0.10.20"
"node": "> 6.0.0"
},
"scripts": {
"build": "./node_modules/.bin/coffee -cb index.coffee test/test.coffee",
"build": "tsc",
"watch": "tsc -w",
"test": "mocha ./test/test.js"

@@ -16,10 +17,12 @@ },

"@types/redis": "^2.8.0",
"lodash": "^4.17.10",
"lodash": "^4.17.11",
"redis": "^2.8.0"
},
"devDependencies": {
"async": "^2.6.1",
"coffee-script": "^1.12.7",
"async": "^2.6.2",
"coffeescript": "^2.4.1",
"mocha": "^4.0.1",
"should": "^13.1.3"
"should": "^13.1.3",
"ts-loader": "^5.4.3",
"typescript": "3.4.5"
},

@@ -26,0 +29,0 @@ "keywords": [

@@ -15,3 +15,3 @@ ![RSMQ: Redis Simple Message Queue for Node.js](https://img.webmart.de/rsmq_wide.png)

* Lightweight: **Just Redis** and ~500 lines of javascript.
* Speed: Send and receive 5000+ messages per second on an average machine. It's **just Redis**.
* Speed: Send/receive 10000+ messages per second on an average machine. It's **just Redis**.
* Guaranteed **delivery of a message to exactly one recipient** within a messages visibility timeout.

@@ -78,2 +78,3 @@ * Received messages that are not deleted will reappear after the visibility timeout.

* `realtime` (Boolean): *optional (Default: false)* Enable realtime PUBLISH of new messages (see the [Realtime section](#realtime))
* `password` (String): *optional (Default: null)* If your Redis server requires a password supply it here

@@ -96,3 +97,3 @@

```javascript
rsmq.createQueueAsync({qname:"myqueue"}.then(function (resp) {
rsmq.createQueueAsync({qname:"myqueue"}).then(function (resp) {
if (resp===1) {

@@ -394,3 +395,2 @@ console.log("queue created")

|[**rsmq-worker**](https://github.com/mpneuried/rsmq-worker)|Helper to implement a worker based on [RSMQ (Redis Simple Message Queue)](https://github.com/smrchy/rsmq).|
|[**redis-notifications**](https://github.com/mpneuried/redis-notifications)|A Redis based notification engine. It implements the rsmq-worker to safely create notifications and recurring reports.|
|[**connect-redis-sessions**](https://github.com/mpneuried/connect-redis-sessions)|A connect or express middleware to use [redis sessions](https://github.com/smrchy/redis-sessions) that lets you handle multiple sessions per user_id.|

@@ -397,0 +397,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc