mqemitter-redis
Advanced tools
Comparing version 0.2.0 to 0.3.0
@@ -0,9 +1,10 @@ | ||
'use strict' | ||
var redis = require('redis') | ||
, MQEmitter = require('mqemitter') | ||
, shortid = require('shortid') | ||
, inherits = require('inherits') | ||
, LRU = require("lru-cache") | ||
var redis = require('redis') | ||
var MQEmitter = require('mqemitter') | ||
var shortid = require('shortid') | ||
var inherits = require('inherits') | ||
var LRU = require('lru-cache') | ||
function MQEmitterRedis(opts) { | ||
function MQEmitterRedis (opts) { | ||
if (!(this instanceof MQEmitterRedis)) { | ||
@@ -13,30 +14,30 @@ return new MQEmitterRedis(opts) | ||
opts = opts || {} | ||
opts = opts || {} | ||
this._opts = opts | ||
this._opts = opts | ||
this.subConn = createConn(opts) | ||
this.pubConn = createConn(opts) | ||
this.subConn = createConn(opts) | ||
this.pubConn = createConn(opts) | ||
this._topics = {} | ||
this._topics = {} | ||
this._cache = LRU({ | ||
max: 10000, | ||
maxAge: 60 * 1000 // one minute | ||
}) | ||
this._cache = LRU({ | ||
max: 100000 | ||
, maxAge: 60 * 1000 // one minute | ||
}) | ||
var that = this | ||
var that = this | ||
function handler(sub, topic, payload) { | ||
function handler (sub, topic, payload) { | ||
var packet = JSON.parse(payload) | ||
if (!that._cache.get(packet.id)) | ||
if (!that._cache.get(packet.id)) { | ||
that._emit(packet.msg) | ||
} | ||
that._cache.set(packet.id, true) | ||
} | ||
this.subConn.on("message", function (topic, message) { | ||
this.subConn.on('message', function (topic, message) { | ||
handler(topic, topic, message) | ||
}) | ||
this.subConn.on("pmessage", function(sub, topic, message) { | ||
this.subConn.on('pmessage', function (sub, topic, message) { | ||
handler(sub, topic, message) | ||
@@ -50,3 +51,3 @@ }) | ||
function createConn(opts) { | ||
function createConn (opts) { | ||
var conn = redis.createClient(opts.port || null, | ||
@@ -66,13 +67,14 @@ opts.host || null, | ||
['emit', 'on', 'removeListener', 'close'].forEach(function(name) { | ||
['emit', 'on', 'removeListener', 'close'].forEach(function (name) { | ||
MQEmitterRedis.prototype['_' + name] = MQEmitterRedis.prototype[name] | ||
}) | ||
MQEmitterRedis.prototype.close = function close(cb) { | ||
MQEmitterRedis.prototype.close = function (cb) { | ||
var count = 2 | ||
, that = this | ||
var that = this | ||
function onEnd() { | ||
if (--count === 0) | ||
function onEnd () { | ||
if (--count === 0) { | ||
that._close(cb) | ||
} | ||
} | ||
@@ -89,10 +91,10 @@ | ||
MQEmitterRedis.prototype._subTopic = function(topic) { | ||
return topic.replace(this._opts.wildcardOne, '*') | ||
.replace(this._opts.wildcardSome, '*') | ||
MQEmitterRedis.prototype._subTopic = function (topic) { | ||
return topic.replace(this._opts.wildcardOne, '*') | ||
.replace(this._opts.wildcardSome, '*') | ||
} | ||
MQEmitterRedis.prototype.on = function on(topic, cb, done) { | ||
MQEmitterRedis.prototype.on = function on (topic, cb, done) { | ||
var subTopic = this._subTopic(topic) | ||
var onFinish = function() { | ||
var onFinish = function () { | ||
if (done) { | ||
@@ -122,12 +124,13 @@ setImmediate(done) | ||
MQEmitterRedis.prototype.emit = function emit(msg, done) { | ||
if (this.closed) | ||
MQEmitterRedis.prototype.emit = function (msg, done) { | ||
if (this.closed) { | ||
return done(new Error('mqemitter-redis is closed')) | ||
} | ||
var packet = { | ||
id: shortid() | ||
, msg: msg | ||
id: shortid(), | ||
msg: msg | ||
} | ||
this.pubConn.publish(msg.topic, JSON.stringify(packet), function() { | ||
this.pubConn.publish(msg.topic, JSON.stringify(packet), function () { | ||
if (done) { | ||
@@ -139,5 +142,5 @@ setImmediate(done) | ||
MQEmitterRedis.prototype.removeListener = function removeListener(topic, cb, done) { | ||
MQEmitterRedis.prototype.removeListener = function (topic, cb, done) { | ||
var subTopic = this._subTopic(topic) | ||
var onFinish = function() { | ||
var onFinish = function () { | ||
if (done) { | ||
@@ -166,3 +169,3 @@ setImmediate(done) | ||
MQEmitterRedis.prototype._containsWildcard = function(topic) { | ||
MQEmitterRedis.prototype._containsWildcard = function (topic) { | ||
return (topic.indexOf(this._opts.wildcardOne) >= 0) || | ||
@@ -169,0 +172,0 @@ (topic.indexOf(this._opts.wildcardSome) >= 0) |
{ | ||
"name": "mqemitter-redis", | ||
"version": "0.2.0", | ||
"version": "0.3.0", | ||
"description": "Redis-based MQEmitter", | ||
@@ -8,14 +8,18 @@ "main": "mqemitter-redis.js", | ||
"inherits": "^2.0.1", | ||
"mqemitter": "^0.3.0", | ||
"mqemitter": "^0.4.0", | ||
"redis": "^0.12.1", | ||
"shortid": "^2.0.1", | ||
"lru-cache": "~2.5.0" | ||
"shortid": "^2.2.2", | ||
"lru-cache": "^2.6.4" | ||
}, | ||
"devDependencies": { | ||
"faucet": "^0.0.1", | ||
"tape": "^2.14.0" | ||
"pre-commit": "^1.0.7", | ||
"standard": "^4.0.1", | ||
"tape": "^4.0.0" | ||
}, | ||
"scripts": { | ||
"lint": "standard", | ||
"test": "tape test.js | faucet" | ||
}, | ||
"pre-commit": ["lint", "test"], | ||
"repository": { | ||
@@ -22,0 +26,0 @@ "type": "git", |
@@ -1,2 +0,2 @@ | ||
mqemitter-redis | ||
mqemitter-redis [![Build Status](https://travis-ci.org/mcollina/mqemitter-redis.png)](https://travis-ci.org/mcollina/mqemitter-redis) | ||
=============== | ||
@@ -9,2 +9,5 @@ | ||
[![js-standard-style](https://raw.githubusercontent.com/feross/standard/master/badge.png)](https://github.com/feross/standard) | ||
Install | ||
@@ -22,14 +25,14 @@ ------- | ||
var redis = require('mqemitter-redis') | ||
, mq = redis({ | ||
port: 12345 | ||
, localhost: 12.34.56.78 | ||
, password: 'my secret' | ||
, db: 4 | ||
}) | ||
, msg = { | ||
topic: 'hello world' | ||
, payload: 'or any other fields' | ||
} | ||
var mq = redis({ | ||
port: 12345, | ||
localhost: 12.34.56.78, | ||
password: 'my secret', | ||
db: 4 | ||
}) | ||
var msg = { | ||
topic: 'hello world' | ||
payload: 'or any other fields' | ||
} | ||
mq.on('hello world', function(message, cb) { | ||
mq.on('hello world', function (message, cb) { | ||
// call callback when you are done | ||
@@ -41,3 +44,3 @@ // do not pass any errors, the emitter cannot handle it. | ||
// topic is mandatory | ||
mq.emit(msg, function() { | ||
mq.emit(msg, function () { | ||
// emitter will never return an error | ||
@@ -44,0 +47,0 @@ }) |
23
test.js
@@ -0,17 +1,18 @@ | ||
'use strict' | ||
var redis = require('./') | ||
, test = require('tape').test | ||
, abstractTests = require('mqemitter/abstractTest.js') | ||
var redis = require('./') | ||
var test = require('tape').test | ||
var abstractTests = require('mqemitter/abstractTest.js') | ||
abstractTests({ | ||
builder: redis | ||
, test: test | ||
builder: redis, | ||
test: test | ||
}) | ||
test('actual unsubscribe from Redis', function(t) { | ||
function noop () {} | ||
test('actual unsubscribe from Redis', function (t) { | ||
var e = redis() | ||
function noop() {} | ||
e.subConn.on('message', function(topic, message) { | ||
e.subConn.on('message', function (topic, message) { | ||
t.fail('the message should not be emitted') | ||
@@ -22,4 +23,4 @@ }) | ||
e.removeListener('hello', noop) | ||
e.emit({ topic: 'hello' }, function() { | ||
e.close(function() { | ||
e.emit({ topic: 'hello' }, function () { | ||
e.close(function () { | ||
t.end() | ||
@@ -26,0 +27,0 @@ }) |
7820
7
150
56
4
+ Addedfastparallel@1.7.2(transitive)
+ Addedlru-cache@2.7.3(transitive)
+ Addedmqemitter@0.4.0(transitive)
+ Addedreusify@1.0.4(transitive)
+ Addedxtend@4.0.2(transitive)
- Removedlru-cache@2.5.2(transitive)
- Removedmqemitter@0.3.1(transitive)
Updatedlru-cache@^2.6.4
Updatedmqemitter@^0.4.0
Updatedshortid@^2.2.2