Comparing version 0.7.1 to 0.8.0
@@ -34,6 +34,10 @@ var type = require('../message/type') | ||
if (socket.isUntrusted(stream)) { | ||
if (stream.id) | ||
if (stream.id) { | ||
socket.tag(stream, prefixTags(SID, stream.id)) | ||
else | ||
socket.close(stream) | ||
} else { | ||
socket.close(stream, { | ||
type: socket.ERR_GATEWAY_NOID, | ||
stream: stream | ||
}) | ||
} | ||
} | ||
@@ -40,0 +44,0 @@ }) |
@@ -22,2 +22,4 @@ var que = require('./que') | ||
this.on('disconnect', this.ondisconnect) | ||
if (socket) | ||
socket.on('disconnect', this.ondisconnect) | ||
} | ||
@@ -24,0 +26,0 @@ |
@@ -10,6 +10,11 @@ var Url = require('url') | ||
exports.send = function(stream, buf) { | ||
if (stream.writable) | ||
if (stream.writable) { | ||
stream.write(buf) | ||
else | ||
this.emit('error', this.ERR_UNWRITABLE, stream) | ||
} else { | ||
this.close(stream, { | ||
data: buf, | ||
type: this.ERR_UNWRITABLE, | ||
stream: stream | ||
}) | ||
} | ||
} | ||
@@ -63,8 +68,13 @@ | ||
exports.close = function(stream) { | ||
exports.close = function(stream, error) { | ||
this.removeStream(stream) | ||
stream.destroy && stream.destroy() | ||
if (error) | ||
this.emit('error', error) | ||
} | ||
exports.addStream = function(stream) { | ||
if (-1 !== this.streams.indexOf(stream)) | ||
return | ||
var parser = new StreamParser() | ||
@@ -83,6 +93,9 @@ stream.pipe(parser) | ||
exports.removeStream = function(stream) { | ||
this.streams = this.streams.filter(function(s) { | ||
return stream !== s | ||
}) | ||
this.emit('disconnect', stream) | ||
var streams = this.streams | ||
var idx = streams.indexOf(stream) | ||
if (idx >= 0) { | ||
streams.splice(idx, 1) | ||
this.emit('disconnect', stream) | ||
return true | ||
} | ||
} |
@@ -22,5 +22,7 @@ var Queue = require('../queue/index') | ||
Socket.ERR_TIMEOUT = Socket.prototype.ERR_TIMEOUT = 'timeout' | ||
Socket.ERR_UNWRITABLE = Socket.prototype.ERR_UNWRITABLE = 'unwritable' | ||
Socket.ERR_NO_TAGGED_STREAM = Socket.prototype.ERR_NO_TAGGED_STREAM = 'no stream for tag' | ||
Socket.ERR_STREAM = Socket.prototype.ERR_STREAM = 'ERR_STREAM' | ||
Socket.ERR_TIMEOUT = Socket.prototype.ERR_TIMEOUT = 'ERR_TIMEOUT' | ||
Socket.ERR_UNWRITABLE = Socket.prototype.ERR_UNWRITABLE = 'ERR_UNWRITABLE' | ||
Socket.ERR_GATEWAY_NOID = Socket.prototype.ERR_GATEWAY_NOID = 'ERR_GATEWAY_NOID' | ||
Socket.ERR_NO_TAGGED_STREAM = Socket.prototype.ERR_NO_TAGGED_STREAM = 'ERR_NO_TAGGED_STREAM' | ||
@@ -27,0 +29,0 @@ /** |
@@ -27,3 +27,8 @@ var Buffer = require('../message/type').Buffer | ||
} else { | ||
this.emit('error', this.ERR_NO_TAGGED_STREAM, tag, event, msg) | ||
this.emit('error', { | ||
tag: tag, | ||
type: this.ERR_NO_TAGGED_STREAM, | ||
event: event, | ||
message: msg | ||
}) | ||
} | ||
@@ -55,3 +60,8 @@ } | ||
} else { | ||
this.emit('error', this.ERR_NO_TAGGED_STREAM, tag, event, msg) | ||
this.emit('error', { | ||
tag: tag, | ||
type: this.ERR_NO_TAGGED_STREAM, | ||
event: event, | ||
message: msg | ||
}) | ||
} | ||
@@ -58,0 +68,0 @@ } |
@@ -7,3 +7,3 @@ | ||
stream.on('close', function() { | ||
stream.once('close', function() { | ||
smq.close(stream) | ||
@@ -13,4 +13,7 @@ }) | ||
stream.on('error', function(err) { | ||
smq.close(stream) | ||
smq.emit('stream error', err, stream) | ||
smq.close(stream, { | ||
type: smq.ERR_STREAM, | ||
error: err, | ||
stream: stream | ||
}) | ||
}) | ||
@@ -17,0 +20,0 @@ |
{ | ||
"name": "socketmq", | ||
"version": "0.7.1", | ||
"version": "0.8.0", | ||
"description": "Lightweight stream-oriented messaging library for node.", | ||
@@ -11,12 +11,12 @@ "main": "index.js", | ||
"amp": "0.3.1", | ||
"inherits": "2.0.1" | ||
"inherits": "2.0.3" | ||
}, | ||
"devDependencies": { | ||
"coveralls": "2.11.12", | ||
"engine.io": "1.6.11", | ||
"engine.io-client": "1.6.11", | ||
"coveralls": "2.11.14", | ||
"engine.io": "1.7.2", | ||
"engine.io-client": "1.7.2", | ||
"humanize-number": "0.0.2", | ||
"istanbul": "0.4.4", | ||
"nodemon": "1.10.0", | ||
"tape": "4.6.0" | ||
"istanbul": "0.4.5", | ||
"nodemon": "1.11.0", | ||
"tape": "4.6.2" | ||
}, | ||
@@ -23,0 +23,0 @@ "scripts": { |
@@ -5,3 +5,3 @@ var test = require('tape') | ||
// 4 were in the tcp/tls/eio transport tests | ||
T.plan(23) | ||
T.plan(24) | ||
@@ -206,3 +206,3 @@ var serverStream1 | ||
t.plan(1) | ||
smqClient1.on('disconnect', function(stream) { | ||
smqClient1.once('disconnect', function(stream) { | ||
t.equal(stream, clientStream1, 'clientStream1 disconnected') | ||
@@ -216,4 +216,4 @@ }) | ||
smqClient1.on('error', function(error) { | ||
t.equal(error, smqClient1.ERR_UNWRITABLE, 'emit unwritable error') | ||
smqClient1.on('error', function(event) { | ||
t.equal(event.type, smqClient1.ERR_UNWRITABLE, 'emit unwritable error') | ||
}) | ||
@@ -227,9 +227,9 @@ | ||
smqClient2.on('error', function(error) { | ||
t.equal(error, smqClient2.ERR_NO_TAGGED_STREAM, 'client emit no stream for tag error') | ||
smqClient2.once('error', function handle(event) { | ||
t.equal(event.type, smqClient2.ERR_NO_TAGGED_STREAM, 'client emit no stream for tag error') | ||
}) | ||
smqClient2.reqTag('no such tag', 'event', 'message', function() {}) | ||
smqServer.on('error', function(error) { | ||
t.equal(error, smqClient2.ERR_NO_TAGGED_STREAM, 'server emit no stream for tag error') | ||
smqServer.on('error', function(event) { | ||
t.equal(event.type, smqClient2.ERR_NO_TAGGED_STREAM, 'server emit no stream for tag error') | ||
}) | ||
@@ -239,3 +239,3 @@ smqServer.pubTag('no such tag', 'event', 'message') | ||
test(name + ': stream error', function(t) { | ||
test(name + ': error ERR_STREAM', function(t) { | ||
t.plan(4) | ||
@@ -245,5 +245,6 @@ smqClient2.on('disconnect', function(stream) { | ||
}) | ||
smqClient2.on('stream error', function(err, stream) { | ||
t.equal(err, 'some error', 'stream error match') | ||
t.equal(stream, clientStream2, 'stream error stream match') | ||
smqClient2.on('error', function(event) { | ||
t.equal(event.type, smqClient2.ERR_STREAM, 'error type match') | ||
t.equal(event.error, 'some error', 'error match') | ||
t.equal(event.stream, clientStream2, 'error stream match') | ||
}) | ||
@@ -322,2 +323,24 @@ clientStream2.emit('error', 'some error') | ||
}) | ||
test(name + ': channel disconnect', function(t) { | ||
t.plan(5) | ||
var serverChannel = smqServer.channel('namespace') | ||
var clientChannel = smqClient1.channel('namespace', 'room') | ||
serverChannel.sub('message', function(channel, msg) { | ||
t.equal('room', channel, 'channel name is correct') | ||
t.equal('some message', msg, 'message is correct') | ||
setTimeout(function() { | ||
smqClient1.close(smqClient1.streams[0]) | ||
}, 100) | ||
}) | ||
clientChannel.on('leave', function(reason, ns, chn) { | ||
t.equal(reason, 'DISCON', 'leave reason DISCON') | ||
t.equal(ns, 'namespace', 'ns is namespace') | ||
t.equal(chn, 'room', 'channel is room') | ||
}) | ||
clientChannel.pub('message', 'some message') | ||
}) | ||
} |
@@ -18,5 +18,6 @@ var test = require('tape') | ||
var smqErrClient = socketmq.connect('eio://127.0.0.1:9090') | ||
smqErrClient.on('stream error', function(err, socket) { | ||
t.equal(err.type, 'TransportError', 'eio get stream connection error') | ||
t.ok(socket, 'eio has socket instance in error event') | ||
smqErrClient.on('error', function(event) { | ||
t.equal(event.type, smqErrClient.ERR_STREAM, 'error type match') | ||
t.equal(event.error.type, 'TransportError', 'eio get stream connection error') | ||
t.ok(event.stream, 'eio has stream instance in error event') | ||
}) | ||
@@ -23,0 +24,0 @@ |
@@ -18,5 +18,6 @@ var test = require('tape') | ||
var smqErrClient = socketmq.connect('tcp://127.0.0.1:3636') | ||
smqErrClient.on('stream error', function(err, socket) { | ||
t.equal(err.code, 'ECONNREFUSED', 'tcp get stream connection error') | ||
t.ok(socket, 'tcp has socket instance in error event') | ||
smqErrClient.on('error', function(event) { | ||
t.equal(event.type, smqErrClient.ERR_STREAM, 'error type match') | ||
t.equal(event.error.code, 'ECONNREFUSED', 'tcp get stream connection error') | ||
t.ok(event.stream, 'tcp has stream instance in error event') | ||
}) | ||
@@ -23,0 +24,0 @@ |
@@ -33,5 +33,6 @@ var fs = require('fs') | ||
var smqErrClient = socketmq.connect('tls://localhost:43636', clientOptions) | ||
smqErrClient.on('stream error', function(err, socket) { | ||
t.equal(err.code, 'ECONNREFUSED', 'tls get stream connection error') | ||
t.ok(socket, 'tls has socket instance in error event') | ||
smqErrClient.on('error', function(event) { | ||
t.equal(event.type, smqErrClient.ERR_STREAM, 'error type match') | ||
t.equal(event.error.code, 'ECONNREFUSED', 'tls get stream connection error') | ||
t.ok(event.stream, 'tls has stream instance in error event') | ||
}) | ||
@@ -38,0 +39,0 @@ |
87046
2550
+ Addedinherits@2.0.3(transitive)
- Removedinherits@2.0.1(transitive)
Updatedinherits@2.0.3