Comparing version 0.7.0 to 0.7.1
@@ -25,2 +25,3 @@ var type = require('../message/type') | ||
this.emitter = emitter || socket | ||
this.joined = false | ||
} | ||
@@ -77,2 +78,3 @@ | ||
this.joined = false | ||
this.emitter.emit('leave', reason, this.ns, this.chn) | ||
@@ -84,7 +86,6 @@ } | ||
if (reason) { | ||
if (this.chn) { | ||
if (this.chn) | ||
this.emitter.leave(reason) | ||
} else if (this.left && (EXITED === reason || DISCON === reason)) { | ||
else if (this.left && (EXITED === reason || DISCON === reason)) | ||
this.left(pack, stream) | ||
} | ||
} | ||
@@ -120,6 +121,10 @@ } | ||
var self = this | ||
var emitter = this.emitter | ||
this.allow(pack, stream, function(pack, stream) { | ||
if (chn && INF === pack.type && JON === pack.event) | ||
if (chn && INF === pack.type && JON === pack.event) { | ||
self.joined = true | ||
self._flush() | ||
emitter.emit('join', JOINED, meta[MNS], chn) | ||
} | ||
dispatch(pack, stream) | ||
@@ -135,3 +140,32 @@ }) | ||
QueueChannel.prototype._pub = Queue.prototype.pub | ||
QueueChannel.prototype._flushAll = Queue.prototype._flush | ||
QueueChannel.prototype._flush = function() { | ||
var len = this._pendings.length | ||
if (0 < len) { | ||
if (this.chn && true !== this.joined) | ||
// Only flush INF pack if we have channel name and not joined yet. | ||
// Reset pendings array to what left. | ||
this.pendings = flushInf(this, len) | ||
else | ||
this._flushAll() | ||
} | ||
} | ||
function flushInf(queue, len) { | ||
var pendings = queue._pendings | ||
var newPendings = [] | ||
var i = 0 | ||
while (i < len) { | ||
var _p = pendings[i++] | ||
if ('inf' === _p[0]) | ||
queue[_p[0]](_p[1], _p[2], _p[3], _p[4], _p[5], _p[6]) | ||
else | ||
newPendings.push(_p) | ||
} | ||
return newPendings | ||
} | ||
function getChannelMeta(queue, pack) { | ||
@@ -138,0 +172,0 @@ var meta = pack.meta || {} |
@@ -38,7 +38,7 @@ var inf = require('./inf') | ||
var queue = this | ||
setTimeout(function() { | ||
// Acknowledge existing streams | ||
if (socket.streams.length > 0) | ||
if (socket.streams && socket.streams.length > 0) { | ||
setTimeout(function() { | ||
queue.ack(socket.streams) | ||
}, 0) | ||
}, 0) | ||
} | ||
} | ||
@@ -45,0 +45,0 @@ |
{ | ||
"name": "socketmq", | ||
"version": "0.7.0", | ||
"version": "0.7.1", | ||
"description": "Lightweight stream-oriented messaging library for node.", | ||
@@ -14,9 +14,9 @@ "main": "index.js", | ||
"devDependencies": { | ||
"coveralls": "2.11.9", | ||
"engine.io": "1.6.9", | ||
"engine.io-client": "1.6.9", | ||
"coveralls": "2.11.12", | ||
"engine.io": "1.6.11", | ||
"engine.io-client": "1.6.11", | ||
"humanize-number": "0.0.2", | ||
"istanbul": "0.4.3", | ||
"nodemon": "1.9.2", | ||
"tape": "4.5.1" | ||
"istanbul": "0.4.4", | ||
"nodemon": "1.10.0", | ||
"tape": "4.6.0" | ||
}, | ||
@@ -23,0 +23,0 @@ "scripts": { |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
85334
2495