Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

mqemitter-redis

Package Overview
Dependencies
Maintainers
1
Versions
22
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

mqemitter-redis - npm Package Compare versions

Comparing version 0.2.0 to 0.3.0

.travis.yml

83

mqemitter-redis.js

@@ -0,9 +1,10 @@

'use strict'
var redis = require('redis')
, MQEmitter = require('mqemitter')
, shortid = require('shortid')
, inherits = require('inherits')
, LRU = require("lru-cache")
var redis = require('redis')
var MQEmitter = require('mqemitter')
var shortid = require('shortid')
var inherits = require('inherits')
var LRU = require('lru-cache')
function MQEmitterRedis(opts) {
function MQEmitterRedis (opts) {
if (!(this instanceof MQEmitterRedis)) {

@@ -13,30 +14,30 @@ return new MQEmitterRedis(opts)

opts = opts || {}
opts = opts || {}
this._opts = opts
this._opts = opts
this.subConn = createConn(opts)
this.pubConn = createConn(opts)
this.subConn = createConn(opts)
this.pubConn = createConn(opts)
this._topics = {}
this._topics = {}
this._cache = LRU({
max: 10000,
maxAge: 60 * 1000 // one minute
})
this._cache = LRU({
max: 100000
, maxAge: 60 * 1000 // one minute
})
var that = this
var that = this
function handler(sub, topic, payload) {
function handler (sub, topic, payload) {
var packet = JSON.parse(payload)
if (!that._cache.get(packet.id))
if (!that._cache.get(packet.id)) {
that._emit(packet.msg)
}
that._cache.set(packet.id, true)
}
this.subConn.on("message", function (topic, message) {
this.subConn.on('message', function (topic, message) {
handler(topic, topic, message)
})
this.subConn.on("pmessage", function(sub, topic, message) {
this.subConn.on('pmessage', function (sub, topic, message) {
handler(sub, topic, message)

@@ -50,3 +51,3 @@ })

function createConn(opts) {
function createConn (opts) {
var conn = redis.createClient(opts.port || null,

@@ -66,13 +67,14 @@ opts.host || null,

['emit', 'on', 'removeListener', 'close'].forEach(function(name) {
['emit', 'on', 'removeListener', 'close'].forEach(function (name) {
MQEmitterRedis.prototype['_' + name] = MQEmitterRedis.prototype[name]
})
MQEmitterRedis.prototype.close = function close(cb) {
MQEmitterRedis.prototype.close = function (cb) {
var count = 2
, that = this
var that = this
function onEnd() {
if (--count === 0)
function onEnd () {
if (--count === 0) {
that._close(cb)
}
}

@@ -89,10 +91,10 @@

MQEmitterRedis.prototype._subTopic = function(topic) {
return topic.replace(this._opts.wildcardOne, '*')
.replace(this._opts.wildcardSome, '*')
MQEmitterRedis.prototype._subTopic = function (topic) {
return topic.replace(this._opts.wildcardOne, '*')
.replace(this._opts.wildcardSome, '*')
}
MQEmitterRedis.prototype.on = function on(topic, cb, done) {
MQEmitterRedis.prototype.on = function on (topic, cb, done) {
var subTopic = this._subTopic(topic)
var onFinish = function() {
var onFinish = function () {
if (done) {

@@ -122,12 +124,13 @@ setImmediate(done)

MQEmitterRedis.prototype.emit = function emit(msg, done) {
if (this.closed)
MQEmitterRedis.prototype.emit = function (msg, done) {
if (this.closed) {
return done(new Error('mqemitter-redis is closed'))
}
var packet = {
id: shortid()
, msg: msg
id: shortid(),
msg: msg
}
this.pubConn.publish(msg.topic, JSON.stringify(packet), function() {
this.pubConn.publish(msg.topic, JSON.stringify(packet), function () {
if (done) {

@@ -139,5 +142,5 @@ setImmediate(done)

MQEmitterRedis.prototype.removeListener = function removeListener(topic, cb, done) {
MQEmitterRedis.prototype.removeListener = function (topic, cb, done) {
var subTopic = this._subTopic(topic)
var onFinish = function() {
var onFinish = function () {
if (done) {

@@ -166,3 +169,3 @@ setImmediate(done)

MQEmitterRedis.prototype._containsWildcard = function(topic) {
MQEmitterRedis.prototype._containsWildcard = function (topic) {
return (topic.indexOf(this._opts.wildcardOne) >= 0) ||

@@ -169,0 +172,0 @@ (topic.indexOf(this._opts.wildcardSome) >= 0)

{
"name": "mqemitter-redis",
"version": "0.2.0",
"version": "0.3.0",
"description": "Redis-based MQEmitter",

@@ -8,14 +8,18 @@ "main": "mqemitter-redis.js",

"inherits": "^2.0.1",
"mqemitter": "^0.3.0",
"mqemitter": "^0.4.0",
"redis": "^0.12.1",
"shortid": "^2.0.1",
"lru-cache": "~2.5.0"
"shortid": "^2.2.2",
"lru-cache": "^2.6.4"
},
"devDependencies": {
"faucet": "^0.0.1",
"tape": "^2.14.0"
"pre-commit": "^1.0.7",
"standard": "^4.0.1",
"tape": "^4.0.0"
},
"scripts": {
"lint": "standard",
"test": "tape test.js | faucet"
},
"pre-commit": ["lint", "test"],
"repository": {

@@ -22,0 +26,0 @@ "type": "git",

@@ -1,2 +0,2 @@

mqemitter-redis
mqemitter-redis  [![Build Status](https://travis-ci.org/mcollina/mqemitter-redis.png)](https://travis-ci.org/mcollina/mqemitter-redis)
===============

@@ -9,2 +9,5 @@

[![js-standard-style](https://raw.githubusercontent.com/feross/standard/master/badge.png)](https://github.com/feross/standard)
Install

@@ -22,14 +25,14 @@ -------

var redis = require('mqemitter-redis')
, mq = redis({
port: 12345
, localhost: 12.34.56.78
, password: 'my secret'
, db: 4
})
, msg = {
topic: 'hello world'
, payload: 'or any other fields'
}
var mq = redis({
port: 12345,
localhost: 12.34.56.78,
password: 'my secret',
db: 4
})
var msg = {
topic: 'hello world'
payload: 'or any other fields'
}
mq.on('hello world', function(message, cb) {
mq.on('hello world', function (message, cb) {
// call callback when you are done

@@ -41,3 +44,3 @@ // do not pass any errors, the emitter cannot handle it.

// topic is mandatory
mq.emit(msg, function() {
mq.emit(msg, function () {
// emitter will never return an error

@@ -44,0 +47,0 @@ })

@@ -0,17 +1,18 @@

'use strict'
var redis = require('./')
, test = require('tape').test
, abstractTests = require('mqemitter/abstractTest.js')
var redis = require('./')
var test = require('tape').test
var abstractTests = require('mqemitter/abstractTest.js')
abstractTests({
builder: redis
, test: test
builder: redis,
test: test
})
test('actual unsubscribe from Redis', function(t) {
function noop () {}
test('actual unsubscribe from Redis', function (t) {
var e = redis()
function noop() {}
e.subConn.on('message', function(topic, message) {
e.subConn.on('message', function (topic, message) {
t.fail('the message should not be emitted')

@@ -22,4 +23,4 @@ })

e.removeListener('hello', noop)
e.emit({ topic: 'hello' }, function() {
e.close(function() {
e.emit({ topic: 'hello' }, function () {
e.close(function () {
t.end()

@@ -26,0 +27,0 @@ })

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc