Comparing version
@@ -41,2 +41,11 @@ var type = require('../message/type') | ||
QueueChannel.prototype.join = function(chn) { | ||
this.chn = chn | ||
this.socket.on('connect', this.onconnect) | ||
this.socket.on('message', this.onmessage) | ||
var streams = this.socket.streams | ||
if (streams.length > 0) | ||
this.ack(streams) | ||
} | ||
QueueChannel.prototype.leave = function() { | ||
@@ -52,4 +61,3 @@ // Unbind event listeners first | ||
} | ||
// Remove reference of socket | ||
delete this.socket | ||
this.emitter.emit('leave', this.ns, this.chn) | ||
} | ||
@@ -82,3 +90,3 @@ | ||
if (chn && INF === pack.type && AIN === pack.event) | ||
emitter.emit('join', chn) | ||
emitter.emit('join', meta[MNS], chn) | ||
dispatch(pack, stream) | ||
@@ -85,0 +93,0 @@ }) |
@@ -41,3 +41,3 @@ var type = require('../message/type') | ||
var meta = pack.meta | ||
// PUB message could only come from trusted stream -> unstrusted | ||
// PUB message could only come from trusted stream -> untrusted | ||
// We select streams by tag, which means the PUB message could be sent to many | ||
@@ -63,2 +63,5 @@ // streams as long as the stream has correct tag no matter trusted or untrusted. | ||
// Clean up senstive data before sending to untrusted streams | ||
cleanMeta(meta) | ||
buf = wire.encode(pack) | ||
this.all(_streams, { | ||
@@ -189,4 +192,3 @@ buf: buf | ||
sidStream.__smq_ssn__ = meta[SSN] | ||
delete meta[SID] | ||
delete meta[SSN] | ||
cleanMeta(meta) | ||
this.one(sidStreams, { | ||
@@ -303,3 +305,3 @@ type: INF, | ||
reply = function(repPack) { | ||
delete meta[SID] | ||
cleanMeta(meta) | ||
repPack.type = REP | ||
@@ -325,1 +327,10 @@ repPack.meta[MID] = msgId | ||
} | ||
/** | ||
* Private functions | ||
*/ | ||
function cleanMeta(meta) { | ||
delete meta[SID] | ||
delete meta[SSN] | ||
} |
@@ -14,2 +14,4 @@ var que = require('./que') | ||
} | ||
this.ns = ns | ||
this.chn = chn | ||
} | ||
@@ -50,9 +52,21 @@ | ||
SocketChannel.prototype.join = function(chn) { | ||
if (!chn) | ||
throw new Error('`join` requires channel name') | ||
if (this.chn) | ||
throw new Error('Already in channel "' + this.chn + '", leave it first.') | ||
this.chn = chn | ||
this.queue.join(chn) | ||
} | ||
SocketChannel.prototype.leave = function() { | ||
// Leave queue and remove reference | ||
if (!this.chn) | ||
return false | ||
this.queue.leave() | ||
delete this.queue | ||
// Unbind listeners | ||
this.removeAllListeners('join') | ||
// Remove chn so we know we are not in any channel. | ||
delete this.chn | ||
return true | ||
} | ||
{ | ||
"name": "socketmq", | ||
"version": "0.6.4", | ||
"version": "0.6.5", | ||
"description": "Lightweight stream-oriented messaging library for node.", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -182,9 +182,19 @@ var fs = require('fs') | ||
test('gateway: leave', function(t) { | ||
t.plan(1) | ||
t.plan(4) | ||
eioClient.on('leave', function(ns, chn) { | ||
t.equal(ns, '/chat', 'leave namepace') | ||
t.equal(chn, 'my room', 'leave channel') | ||
}) | ||
t.throws(function() { | ||
eioClient.join() | ||
}, /`join` requires channel name/) | ||
t.throws(function() { | ||
eioClient.join('new room') | ||
}, /Already in channel "my room", leave it first/) | ||
eioClient.leave() | ||
tcpClient.pubChn('my room', 'eio sub', 'message should not be delivered') | ||
t.throws(function() { | ||
eioClient.req('without callback', 'message should not be delivered') | ||
}, /TypeError: Cannot read property 'req' of undefined/, 'throw exception when req after leave') | ||
}) | ||
} |
74960
1.15%2194
1.57%