Comparing version 0.3.2 to 0.4.0
var ss = require('..') | ||
@@ -4,0 +3,0 @@ , program = require('commander'); |
0.4.0 / 2012-10-12 | ||
================== | ||
* add emitter wildcard support | ||
* add sub socket subscription support | ||
* add `pub-emitter` | ||
* add `sub-emitter` | ||
* perf: remove `.concat()` usage, ~10% gain | ||
* remove greetings | ||
0.3.2 / 2012-10-08 | ||
@@ -3,0 +13,0 @@ ================== |
@@ -14,3 +14,4 @@ | ||
exports.PullSocket = require('./sockets/pull'); | ||
exports.EmitterSocket = require('./sockets/emitter'); | ||
exports.PubEmitterSocket = require('./sockets/pub-emitter'); | ||
exports.SubEmitterSocket = require('./sockets/sub-emitter'); | ||
exports.ReqSocket = require('./sockets/req'); | ||
@@ -24,10 +25,10 @@ exports.RepSocket = require('./sockets/rep'); | ||
exports.types = { | ||
stream: exports.Socket, | ||
pub: exports.PubSocket, | ||
sub: exports.SubSocket, | ||
push: exports.PushSocket, | ||
pull: exports.PullSocket, | ||
emitter: exports.EmitterSocket, | ||
req: exports.ReqSocket, | ||
rep: exports.RepSocket | ||
'pub': exports.PubSocket, | ||
'sub': exports.SubSocket, | ||
'push': exports.PushSocket, | ||
'pull': exports.PullSocket, | ||
'pub-emitter': exports.PubEmitterSocket, | ||
'sub-emitter': exports.SubEmitterSocket, | ||
'req': exports.ReqSocket, | ||
'rep': exports.RepSocket | ||
}; | ||
@@ -34,0 +35,0 @@ |
@@ -42,8 +42,10 @@ | ||
return function (msg, multipart){ | ||
if (!multipart) return debug('rep expects multipart'); | ||
if (!multipart) return debug('expected multipart: %j', msg); | ||
var id = msg.pop(); | ||
self.emit.apply(self, ['message'].concat(msg, reply)); | ||
msg.unshift('message'); | ||
msg.push(reply); | ||
self.emit.apply(self, msg); | ||
function reply(){ | ||
function reply() { | ||
var args = [].slice.call(arguments); | ||
@@ -50,0 +52,0 @@ args[0] = args[0] || null; |
@@ -60,3 +60,3 @@ | ||
return function(msg, multipart){ | ||
if (!multipart) return debug('expected multipart'); | ||
if (!multipart) return debug('expected multipart: %j', msg); | ||
var id = msg.pop(); | ||
@@ -63,0 +63,0 @@ var fn = self.callbacks[id]; |
@@ -40,3 +40,2 @@ | ||
this.socks = []; | ||
this.map = {}; | ||
this.settings = {}; | ||
@@ -119,2 +118,15 @@ this.format('none'); | ||
/** | ||
* Close all open underlying sockets. | ||
* | ||
* @api private | ||
*/ | ||
Socket.prototype.closeSockets = function(){ | ||
debug('closing %d connections', this.socks.length); | ||
this.socks.forEach(function(sock){ | ||
sock.destroy(); | ||
}); | ||
}; | ||
/** | ||
* Close the socket. | ||
@@ -129,10 +141,9 @@ * | ||
Socket.prototype.close = function(){ | ||
debug('close'); | ||
debug('closing'); | ||
this.closing = true; | ||
if ('server' == this.type) { | ||
this.server && this.server.close(); | ||
} else { | ||
this.socks.forEach(function(sock){ | ||
sock.destroy(); | ||
}); | ||
this.closeSockets(); | ||
if (this.server) { | ||
debug('closing server'); | ||
this.server.on('close', this.emit.bind(this, 'close')); | ||
this.server.close(); | ||
} | ||
@@ -149,7 +160,6 @@ }; | ||
Socket.prototype.address = function(){ | ||
if (this.server) { | ||
var addr = this.server.address(); | ||
addr.string = 'tcp://' + addr.address + ':' + addr.port; | ||
return addr; | ||
} | ||
if (!this.server) return; | ||
var addr = this.server.address(); | ||
addr.string = 'tcp://' + addr.address + ':' + addr.port; | ||
return addr; | ||
}; | ||
@@ -167,3 +177,2 @@ | ||
this.socks.splice(i, 1); | ||
if (sock._axon_id) delete this.map[sock._axon_id]; | ||
}; | ||
@@ -179,23 +188,6 @@ | ||
Socket.prototype.addSocket = function(sock){ | ||
var self = this | ||
, pid = String(process.pid) | ||
, parser = new Parser | ||
, n = this.socks.push(sock); | ||
// send our greeting | ||
sock.write(this.pack(this.get('identity'))); | ||
// parse incoming | ||
var parser = new Parser; | ||
this.socks.push(sock); | ||
sock.on('data', parser.write.bind(parser)); | ||
// accept greeting once, emit messages there on out | ||
parser.onmessage = ongreeting; | ||
function ongreeting(msg){ | ||
var id = String(msg); | ||
if ('0' === id) id = pid + Date.now() + n; | ||
sock._axon_id = id; | ||
self.map[id] = sock; | ||
parser.onmessage = self.onmessage(sock); | ||
} | ||
parser.onmessage = this.onmessage(sock); | ||
}; | ||
@@ -220,3 +212,4 @@ | ||
if (multipart) { | ||
self.emit.apply(self, ['message'].concat(msg)); | ||
msg.unshift('message'); | ||
self.emit.apply(self, msg); | ||
} else { | ||
@@ -258,2 +251,3 @@ self.emit('message', msg); | ||
sock.on('error', function(err){ | ||
debug('error %s', err.code); | ||
if ('ECONNREFUSED' != err.code) { | ||
@@ -260,0 +254,0 @@ self.emit('error', err); |
@@ -6,3 +6,4 @@ | ||
var Socket = require('./sock'); | ||
var Socket = require('./sock') | ||
, debug = require('debug')('axon:sub'); | ||
@@ -33,2 +34,78 @@ /** | ||
/** | ||
* Check if this socket has subscriptions. | ||
* | ||
* @return {Boolean} | ||
* @api public | ||
*/ | ||
SubSocket.prototype.hasSubscriptions = function(){ | ||
return !! this.subscriptions.length; | ||
}; | ||
/** | ||
* Check if any subscriptions match `topic`. | ||
* | ||
* @param {String} topic | ||
* @return {Boolean} | ||
* @api public | ||
*/ | ||
SubSocket.prototype.matches = function(topic){ | ||
for (var i = 0; i < this.subscriptions.length; ++i) { | ||
if (this.subscriptions[i].test(topic)) { | ||
return true; | ||
} | ||
} | ||
return false; | ||
}; | ||
/** | ||
* Incoming. | ||
* | ||
* @param {net.Socket} sock | ||
* @return {Function} closure(msg, mulitpart) | ||
* @api private | ||
*/ | ||
SubSocket.prototype.onmessage = function(sock){ | ||
var self = this; | ||
var patterns = this.subscriptions; | ||
if (this.hasSubscriptions()) { | ||
return function(msg, multipart){ | ||
var topic = msg[0].toString(); | ||
if (!self.matches(topic)) return debug('not subscribed to "%s"', topic); | ||
self.emit.apply(self, ['message'].concat(msg)); | ||
} | ||
} | ||
return Socket.prototype.onmessage.call(this, sock); | ||
}; | ||
/** | ||
* Subscribe with the given `re`. | ||
* | ||
* @param {RegExp|String} re | ||
* @return {RegExp} | ||
* @api public | ||
*/ | ||
SubSocket.prototype.subscribe = function(re){ | ||
debug('subscribe to "%s"', re); | ||
this.subscriptions.push(re = toRegExp(re)); | ||
return re; | ||
}; | ||
/** | ||
* Clear current subscriptions. | ||
* | ||
* @api public | ||
*/ | ||
SubSocket.prototype.clearSubscriptions = function(){ | ||
this.subscriptions = []; | ||
}; | ||
/** | ||
* Subscribers should not send messages. | ||
@@ -38,3 +115,17 @@ */ | ||
SubSocket.prototype.send = function(){ | ||
throw new Error('subscribers should not send messages'); | ||
throw new Error('subscribers cannot send messages'); | ||
}; | ||
/** | ||
* Convert `str` to a `RegExp`. | ||
* | ||
* @param {String} str | ||
* @return {RegExp} | ||
* @api private | ||
*/ | ||
function toRegExp(str) { | ||
if (str instanceof RegExp) return str; | ||
str = str.replace(/\*/g, '(.+)'); | ||
return new RegExp('^' + str + '$'); | ||
} |
{ | ||
"name": "axon", | ||
"description": "High-level messaging & socket patterns implemented in pure js", | ||
"version": "0.3.2", | ||
"version": "0.4.0", | ||
"author": "TJ Holowaychuk <tj@vision-media.ca>", | ||
@@ -11,2 +11,3 @@ "dependencies": { | ||
"devDependencies": { | ||
"better-assert": "*", | ||
"should": "*", | ||
@@ -13,0 +14,0 @@ "mocha": "*", |
104
Readme.md
@@ -22,3 +22,3 @@ # Axon | ||
- req / rep | ||
- emitter | ||
- pub-emitter / sub-emitter | ||
@@ -83,3 +83,3 @@ ## Push / Pull | ||
`SubSocket` simply recieves any messages from a `PubSocket`: | ||
`SubSocket` simply receives any messages from a `PubSocket`: | ||
@@ -97,2 +97,18 @@ ```js | ||
`SubSocket`s may optionally `.subscribe()` to one or more "topics" (the first multipart value), | ||
using string patterns or regular expressions: | ||
```js | ||
var axon = require('axon') | ||
, sock = axon.socket('sub'); | ||
sock.connect(3000); | ||
sock.subscribe('user:login'); | ||
sock.subscribe('upload:*:progress'); | ||
sock.on('message', function(topic, msg){ | ||
}); | ||
``` | ||
## Req / Rep | ||
@@ -165,16 +181,13 @@ | ||
## EmitterSocket | ||
## PubEmitter / SubEmitter | ||
`EmitterSocket`'s send and receive messages behaving like regular node `EventEmitter`s. | ||
This is achieved by using pub / sub sockets behind the scenes and automatically formatting | ||
messages with the "json" codec. Currently we simply define the `EmitterSocket` as a `PubSocket` if you `.bind()`, and `SubSocket` if you `.connect()`, providing the natural API you're used to: | ||
`PubEmitter` and `SubEmitter` are higher-level `Pub` / `Sub` sockets, using the "json" codec to behave much like node's `EventEmitter`. When a `SubEmitter`'s `.on()` method is invoked, the event name is `.subscribe()`d for you. Each wildcard (`*`) or regexp capture group is passed to the callback along with regular message arguments. | ||
server.js: | ||
app.js: | ||
```js | ||
var axon = require('axon') | ||
, sock = axon.socket('emitter'); | ||
, sock = axon.socket('pub-emitter'); | ||
sock.bind(3000); | ||
console.log('pub server started'); | ||
sock.connect(3000); | ||
@@ -186,14 +199,21 @@ setInterval(function(){ | ||
client.js: | ||
logger.js: | ||
```js | ||
var axon = require('axon') | ||
, sock = axon.socket('emitter'); | ||
, sock = axon.socket('sub-emitter'); | ||
sock.connect(3000); | ||
console.log('sub client connected'); | ||
sock.bind(3000); | ||
sock.on('login', function(user){ | ||
sock.on('user:login', function(user){ | ||
console.log('%s signed in', user.name); | ||
}); | ||
sock.on('user:*', function(action, user){ | ||
console.log('%s %s', user.name, action); | ||
}); | ||
sock.on('*', function(event){ | ||
console.log(arguments); | ||
}); | ||
``` | ||
@@ -308,2 +328,13 @@ | ||
15 byte messages: | ||
``` | ||
min: 280 ops/s | ||
mean: 472,109 ops/s | ||
median: 477,309 ops/s | ||
total: 10,758,780 ops in 24.633s | ||
through: 6.753573417663574 mb/s | ||
``` | ||
64 byte messages: | ||
@@ -313,7 +344,7 @@ | ||
min: 22,085 ops/s | ||
mean: 585,944 ops/s | ||
median: 606,176 ops/s | ||
total: 326,7126 ops in 6.5s | ||
through: 35.76318359375 mb/s | ||
min: 218 ops/s | ||
mean: 462,286 ops/s | ||
median: 461,512 ops/s | ||
total: 6,455,160 ops in 15.488s | ||
through: 28.2156982421875 mb/s | ||
@@ -326,10 +357,22 @@ ``` | ||
min: 1,851 ops/s | ||
mean: 34,0156 ops/s | ||
median: 449,660 ops/s | ||
total: 329,831 ops in 4.241s | ||
through: 332.18359375 mb/s | ||
min: 280 ops/s | ||
mean: 382,829 ops/s | ||
median: 382,764 ops/s | ||
total: 3,333,581 ops in 15.126s | ||
through: 373.8564453125 mb/s | ||
``` | ||
8k messages: | ||
``` | ||
min: 392 ops/s | ||
mean: 92,778 ops/s | ||
median: 87,943 ops/s | ||
total: 1,257,430 ops in 21.735s | ||
through: 724.828125 mb/s | ||
```` | ||
## What's it good for? | ||
@@ -358,13 +401,2 @@ | ||
## Todo | ||
- more tests | ||
- code cov | ||
- weighted fair queuing | ||
- cap batch size | ||
- zero-copy for batches... | ||
- make batching configurable... disable for lower latency | ||
- subscriptions | ||
- ... | ||
## License | ||
@@ -371,0 +403,0 @@ |
42529
24
1352
418
4