Comparing version 0.4.1 to 0.4.2
@@ -72,3 +72,3 @@ var Url = require('url') | ||
msg = getArgs(arguments, msg, 2, 1, 3) | ||
var streams = filterStreamsWithTag(this.streams, tag) | ||
var streams = this.getStreamsByTag(tag) | ||
if (streams.length > 0) | ||
@@ -95,3 +95,3 @@ this.queue.pub(streams, event, msg) | ||
callback = arguments[arguments.length - 1] | ||
var streams = filterStreamsWithTag(this.streams, tag) | ||
var streams = this.getStreamsByTag(tag) | ||
if (streams.length > 0) | ||
@@ -107,29 +107,2 @@ this.queue.req(streams, event, msg, callback) | ||
// Tagging support | ||
Socket.prototype.tag = function(stream, tags) { | ||
var _tags = stream.__smq_tags__ || [] | ||
function addTag(tag) { | ||
if (-1 === _tags.indexOf(tag)) | ||
_tags.push(tag) | ||
} | ||
if (Array.isArray(tags)) { | ||
tags.forEach(addTag) | ||
} else if ('string' === typeof tags) { | ||
addTag(tags) | ||
} else { | ||
throw new Error('Tags could only be string or array of strings.') | ||
} | ||
stream.__smq_tags__ = _tags | ||
} | ||
Socket.prototype.hasTag = function(tag) { | ||
return this.streams.some(function(stream) { | ||
return stream.__smq_tags__.indexOf(tag) > -1 | ||
}) | ||
} | ||
// Queue private functions | ||
@@ -150,4 +123,38 @@ | ||
function filterStreamsWithTag(streams, tag) { | ||
return streams.filter(function(stream) { | ||
/** | ||
* Tagging support | ||
*/ | ||
Socket.prototype.tag = function(stream, tags) { | ||
var streams | ||
if ('string' === typeof stream) { | ||
streams = this.getStreamsByEndpoint(stream) | ||
if (0 === streams.length) | ||
return false | ||
} else { | ||
streams = [stream] | ||
} | ||
if (!Array.isArray(tags)) | ||
tags = [tags] | ||
var result = 0 | ||
streams.forEach(function(stream) { | ||
tags.forEach(function(tag) { | ||
result += addTag(tag, stream) | ||
}) | ||
}) | ||
return result | ||
} | ||
Socket.prototype.hasTag = function(tag) { | ||
return this.streams.some(function(stream) { | ||
return stream.__smq_tags__.indexOf(tag) > -1 | ||
}) | ||
} | ||
Socket.prototype.getStreamsByTag = function(tag) { | ||
return this.streams.filter(function(stream) { | ||
return Array.isArray(stream.__smq_tags__) | ||
@@ -158,2 +165,25 @@ && stream.__smq_tags__.indexOf(tag) > -1 | ||
Socket.prototype.getStreamsByEndpoint = function(endpoint) { | ||
return this.streams.filter(function(stream) { | ||
return endpoint === stream.__smq_endpoint__ | ||
}) | ||
} | ||
Socket.prototype.hasConnection = function(endpoint) { | ||
return this.getStreamsByEndpoint(endpoint).length > -1 | ||
} | ||
// Tagging private functions | ||
function addTag(tag, stream) { | ||
var added = 0 | ||
var _tags = stream.__smq_tags__ || [] | ||
if (-1 === _tags.indexOf(tag)) { | ||
_tags.push(tag) | ||
added = 1 | ||
} | ||
stream.__smq_tags__ = _tags | ||
return added | ||
} | ||
/** | ||
@@ -193,2 +223,3 @@ * Transport & stream operations | ||
var client = endpoint.transport.connect(endpoint.target, this, options, cb) | ||
client.__smq_endpoint__ = uri | ||
this.tag(client, uri) | ||
@@ -195,0 +226,0 @@ return client |
{ | ||
"name": "socketmq", | ||
"version": "0.4.1", | ||
"version": "0.4.2", | ||
"description": "Lightweight stream-oriented messaging library for node.", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
var test = require('tape') | ||
module.exports = function(name, T, smqServer, smqClient1, smqClient2) { | ||
T.plan(15) | ||
module.exports = function(name, T, smqServer, smqClient1, smqClient2, endpoint, options) { | ||
T.plan(21) | ||
@@ -27,6 +27,6 @@ var serverStream1 | ||
T.ok(stream.writable, 'stream is writable') | ||
if ('tcp' === name) | ||
T.ok(smqClient1.hasTag('tcp://127.0.0.1:6363'), 'client1 has default tag') | ||
if ('tls' === name) | ||
T.ok(smqClient1.hasTag('tls://localhost:46363'), 'client1 has default tag') | ||
T.ok(smqClient1.hasTag(endpoint), 'client1 has default tag') | ||
T.equal(smqClient1.tag(endpoint, 'tag by uri'), 1, 'tag by endpoint uri') | ||
T.equal(smqClient1.tag(endpoint, 'tag by uri'), 0, 'tag existing tag again') | ||
T.ok(smqClient1.hasTag('tag by uri'), 'client1 has tag tagged by endpoint uri') | ||
} | ||
@@ -36,10 +36,11 @@ | ||
var clientStream2 | ||
smqClient2.on('connect', function(stream) { | ||
clientStream2 = stream | ||
T.pass('client2 connect event') | ||
T.ok(stream.readable, 'stream is readable') | ||
T.ok(stream.writable, 'stream is writable') | ||
if ('tcp' === name) | ||
T.ok(smqClient2.hasTag('tcp://127.0.0.1:6363'), 'client2 has default tag') | ||
if ('tls' === name) | ||
T.ok(smqClient2.hasTag('tls://localhost:46363'), 'client2 has default tag') | ||
T.ok(smqClient2.hasTag(endpoint), 'client2 has default tag') | ||
T.equal(smqClient1.tag(endpoint + '123456', 'tag by non-existent uri'), false, 'tag by non-existent uri') | ||
T.notOk(smqClient1.hasTag('tag by non-existent uri'), 'tag by non-existent uri') | ||
}) | ||
@@ -167,2 +168,3 @@ | ||
}) | ||
smqClient1.removeListener('connect', onClient1Connect) | ||
@@ -185,26 +187,36 @@ smqClient1.addStream(clientStream1) | ||
if ('tcp' === name) { | ||
test(name + ': pending messages', function(t) { | ||
t.plan(3) | ||
test(name + ': streamError', function(t) { | ||
t.plan(4) | ||
smqClient2.on('disconnect', function(stream) { | ||
t.equal(stream, clientStream2, 'disconnect stream match') | ||
}) | ||
smqClient2.on('streamError', function(err, stream) { | ||
t.equal(err, 'some error', 'streamError error match') | ||
t.equal(stream, clientStream2, 'streamError stream match') | ||
}) | ||
clientStream2.emit('error', 'some error') | ||
}) | ||
var reqEvent = name + ' pending req' | ||
smqClient1.req(reqEvent, 'pending req msg', function(data) { | ||
t.equal(data, 'pending req reply', 'pending req reply') | ||
}) | ||
test(name + ': pending messages', function(t) { | ||
t.plan(3) | ||
smqServer.rep(reqEvent, function(msg, reply) { | ||
t.equal(msg, 'pending req msg', 'pending req msg') | ||
reply('pending req reply') | ||
}) | ||
var reqEvent = name + ' pending req' | ||
smqClient1.req(reqEvent, 'pending req msg', function(data) { | ||
t.equal(data, 'pending req reply', 'pending req reply') | ||
}) | ||
var pubEvent = name + ' pending pub' | ||
smqClient1.pub(pubEvent, 'pending pub msg') | ||
smqServer.sub(pubEvent, function(msg) { | ||
t.equal(msg, 'pending pub msg', 'pending pub msg') | ||
}) | ||
smqServer.rep(reqEvent, function(msg, reply) { | ||
t.equal(msg, 'pending req msg', 'pending req msg') | ||
reply('pending req reply') | ||
}) | ||
smqServer.removeListener('connect', onServerConnect) | ||
smqClient1.connect('tcp://127.0.0.1:6363') | ||
var pubEvent = name + ' pending pub' | ||
smqClient1.pub(pubEvent, 'pending pub msg') | ||
smqServer.sub(pubEvent, function(msg) { | ||
t.equal(msg, 'pending pub msg', 'pending pub msg') | ||
}) | ||
} | ||
smqServer.removeListener('connect', onServerConnect) | ||
smqClient1.connect(endpoint, options) | ||
}) | ||
} |
@@ -6,12 +6,15 @@ var test = require('tape') | ||
module.exports = function() { | ||
test('connect event tcp', function(t) { | ||
test('tcp: connect event', function(t) { | ||
var smqServer = socketmq.bind('tcp://127.0.0.1:6363') | ||
var smqClient1 = socketmq.connect('tcp://127.0.0.1:6363', function () { | ||
t.notOk(smqClient1.hasTag('tcp://127.0.0.1:6363'), 'default tag has not been added') | ||
var endpoint = 'tcp://127.0.0.1:6363' | ||
var smqServer = socketmq.bind(endpoint) | ||
var smqClient1 = socketmq.connect(endpoint, function() { | ||
t.notOk(smqClient1.hasTag(endpoint), 'default tag has not been added') | ||
t.ok(smqClient1.hasConnection(endpoint), 'endpoint connected') | ||
}) | ||
var smqClient2 = socketmq.connect('tcp://127.0.0.1:6363') | ||
var smqClient2 = socketmq.connect(endpoint) | ||
testDefault('tcp', t, smqServer, smqClient1, smqClient2) | ||
testDefault('tcp', t, smqServer, smqClient1, smqClient2, endpoint) | ||
}) | ||
} |
@@ -12,3 +12,10 @@ var fs = require('fs') | ||
var smqServer = socketmq.bind('tls://localhost:46363', { | ||
var endpoint = 'tls://localhost:46363' | ||
var clientOptions = { | ||
key: fs.readFileSync(certPath + '/client-key.pem'), | ||
cert: fs.readFileSync(certPath + '/client-cert.pem'), | ||
ca: [fs.readFileSync(certPath + '/server-cert.pem')] | ||
} | ||
var smqServer = socketmq.bind(endpoint, { | ||
key: fs.readFileSync(certPath + '/server-key.pem'), | ||
@@ -19,18 +26,11 @@ cert: fs.readFileSync(certPath + '/server-cert.pem'), | ||
var smqClient1 = socketmq.connect('tls://localhost:46363', { | ||
key: fs.readFileSync(certPath + '/client-key.pem'), | ||
cert: fs.readFileSync(certPath + '/client-cert.pem'), | ||
ca: [fs.readFileSync(certPath + '/server-cert.pem')] | ||
}, function() { | ||
t.notOk(smqClient1.hasTag('tls://localhost:46363'), 'default tag has not been added') | ||
var smqClient1 = socketmq.connect(endpoint, clientOptions, function() { | ||
t.notOk(smqClient1.hasTag(endpoint), 'default tag has not been added') | ||
t.ok(smqClient1.hasConnection(endpoint), 'endpoint connected') | ||
}) | ||
var smqClient2 = socketmq.connect('tls://localhost:46363', { | ||
key: fs.readFileSync(certPath + '/client-key.pem'), | ||
cert: fs.readFileSync(certPath + '/client-cert.pem'), | ||
ca: [fs.readFileSync(certPath + '/server-cert.pem')] | ||
}) | ||
var smqClient2 = socketmq.connect(endpoint, clientOptions) | ||
testDefault('tls', t, smqServer, smqClient1, smqClient2) | ||
testDefault('tls', t, smqServer, smqClient1, smqClient2, endpoint, clientOptions) | ||
}) | ||
} |
26855
719