Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

axon

Package Overview
Dependencies
Maintainers
1
Versions
23
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

axon - npm Package Compare versions

Comparing version 0.3.2 to 0.4.0

lib/sockets/pub-emitter.js

1

benchmark/sub.js
var ss = require('..')

@@ -4,0 +3,0 @@ , program = require('commander');

0.4.0 / 2012-10-12
==================
* add emitter wildcard support
* add sub socket subscription support
* add `pub-emitter`
* add `sub-emitter`
* perf: remove `.concat()` usage, ~10% gain
* remove greetings
0.3.2 / 2012-10-08

@@ -3,0 +13,0 @@ ==================

@@ -14,3 +14,4 @@

exports.PullSocket = require('./sockets/pull');
exports.EmitterSocket = require('./sockets/emitter');
exports.PubEmitterSocket = require('./sockets/pub-emitter');
exports.SubEmitterSocket = require('./sockets/sub-emitter');
exports.ReqSocket = require('./sockets/req');

@@ -24,10 +25,10 @@ exports.RepSocket = require('./sockets/rep');

exports.types = {
stream: exports.Socket,
pub: exports.PubSocket,
sub: exports.SubSocket,
push: exports.PushSocket,
pull: exports.PullSocket,
emitter: exports.EmitterSocket,
req: exports.ReqSocket,
rep: exports.RepSocket
'pub': exports.PubSocket,
'sub': exports.SubSocket,
'push': exports.PushSocket,
'pull': exports.PullSocket,
'pub-emitter': exports.PubEmitterSocket,
'sub-emitter': exports.SubEmitterSocket,
'req': exports.ReqSocket,
'rep': exports.RepSocket
};

@@ -34,0 +35,0 @@

@@ -42,8 +42,10 @@

return function (msg, multipart){
if (!multipart) return debug('rep expects multipart');
if (!multipart) return debug('expected multipart: %j', msg);
var id = msg.pop();
self.emit.apply(self, ['message'].concat(msg, reply));
msg.unshift('message');
msg.push(reply);
self.emit.apply(self, msg);
function reply(){
function reply() {
var args = [].slice.call(arguments);

@@ -50,0 +52,0 @@ args[0] = args[0] || null;

@@ -60,3 +60,3 @@

return function(msg, multipart){
if (!multipart) return debug('expected multipart');
if (!multipart) return debug('expected multipart: %j', msg);
var id = msg.pop();

@@ -63,0 +63,0 @@ var fn = self.callbacks[id];

@@ -40,3 +40,2 @@

this.socks = [];
this.map = {};
this.settings = {};

@@ -119,2 +118,15 @@ this.format('none');

/**
* Close all open underlying sockets.
*
* @api private
*/
Socket.prototype.closeSockets = function(){
debug('closing %d connections', this.socks.length);
this.socks.forEach(function(sock){
sock.destroy();
});
};
/**
* Close the socket.

@@ -129,10 +141,9 @@ *

Socket.prototype.close = function(){
debug('close');
debug('closing');
this.closing = true;
if ('server' == this.type) {
this.server && this.server.close();
} else {
this.socks.forEach(function(sock){
sock.destroy();
});
this.closeSockets();
if (this.server) {
debug('closing server');
this.server.on('close', this.emit.bind(this, 'close'));
this.server.close();
}

@@ -149,7 +160,6 @@ };

Socket.prototype.address = function(){
if (this.server) {
var addr = this.server.address();
addr.string = 'tcp://' + addr.address + ':' + addr.port;
return addr;
}
if (!this.server) return;
var addr = this.server.address();
addr.string = 'tcp://' + addr.address + ':' + addr.port;
return addr;
};

@@ -167,3 +177,2 @@

this.socks.splice(i, 1);
if (sock._axon_id) delete this.map[sock._axon_id];
};

@@ -179,23 +188,6 @@

Socket.prototype.addSocket = function(sock){
var self = this
, pid = String(process.pid)
, parser = new Parser
, n = this.socks.push(sock);
// send our greeting
sock.write(this.pack(this.get('identity')));
// parse incoming
var parser = new Parser;
this.socks.push(sock);
sock.on('data', parser.write.bind(parser));
// accept greeting once, emit messages there on out
parser.onmessage = ongreeting;
function ongreeting(msg){
var id = String(msg);
if ('0' === id) id = pid + Date.now() + n;
sock._axon_id = id;
self.map[id] = sock;
parser.onmessage = self.onmessage(sock);
}
parser.onmessage = this.onmessage(sock);
};

@@ -220,3 +212,4 @@

if (multipart) {
self.emit.apply(self, ['message'].concat(msg));
msg.unshift('message');
self.emit.apply(self, msg);
} else {

@@ -258,2 +251,3 @@ self.emit('message', msg);

sock.on('error', function(err){
debug('error %s', err.code);
if ('ECONNREFUSED' != err.code) {

@@ -260,0 +254,0 @@ self.emit('error', err);

@@ -6,3 +6,4 @@

var Socket = require('./sock');
var Socket = require('./sock')
, debug = require('debug')('axon:sub');

@@ -33,2 +34,78 @@ /**

/**
* Check if this socket has subscriptions.
*
* @return {Boolean}
* @api public
*/
SubSocket.prototype.hasSubscriptions = function(){
return !! this.subscriptions.length;
};
/**
* Check if any subscriptions match `topic`.
*
* @param {String} topic
* @return {Boolean}
* @api public
*/
SubSocket.prototype.matches = function(topic){
for (var i = 0; i < this.subscriptions.length; ++i) {
if (this.subscriptions[i].test(topic)) {
return true;
}
}
return false;
};
/**
* Incoming.
*
* @param {net.Socket} sock
* @return {Function} closure(msg, mulitpart)
* @api private
*/
SubSocket.prototype.onmessage = function(sock){
var self = this;
var patterns = this.subscriptions;
if (this.hasSubscriptions()) {
return function(msg, multipart){
var topic = msg[0].toString();
if (!self.matches(topic)) return debug('not subscribed to "%s"', topic);
self.emit.apply(self, ['message'].concat(msg));
}
}
return Socket.prototype.onmessage.call(this, sock);
};
/**
* Subscribe with the given `re`.
*
* @param {RegExp|String} re
* @return {RegExp}
* @api public
*/
SubSocket.prototype.subscribe = function(re){
debug('subscribe to "%s"', re);
this.subscriptions.push(re = toRegExp(re));
return re;
};
/**
* Clear current subscriptions.
*
* @api public
*/
SubSocket.prototype.clearSubscriptions = function(){
this.subscriptions = [];
};
/**
* Subscribers should not send messages.

@@ -38,3 +115,17 @@ */

SubSocket.prototype.send = function(){
throw new Error('subscribers should not send messages');
throw new Error('subscribers cannot send messages');
};
/**
* Convert `str` to a `RegExp`.
*
* @param {String} str
* @return {RegExp}
* @api private
*/
function toRegExp(str) {
if (str instanceof RegExp) return str;
str = str.replace(/\*/g, '(.+)');
return new RegExp('^' + str + '$');
}
{
"name": "axon",
"description": "High-level messaging & socket patterns implemented in pure js",
"version": "0.3.2",
"version": "0.4.0",
"author": "TJ Holowaychuk <tj@vision-media.ca>",

@@ -11,2 +11,3 @@ "dependencies": {

"devDependencies": {
"better-assert": "*",
"should": "*",

@@ -13,0 +14,0 @@ "mocha": "*",

@@ -22,3 +22,3 @@ # Axon

- req / rep
- emitter
- pub-emitter / sub-emitter

@@ -83,3 +83,3 @@ ## Push / Pull

`SubSocket` simply recieves any messages from a `PubSocket`:
`SubSocket` simply receives any messages from a `PubSocket`:

@@ -97,2 +97,18 @@ ```js

`SubSocket`s may optionally `.subscribe()` to one or more "topics" (the first multipart value),
using string patterns or regular expressions:
```js
var axon = require('axon')
, sock = axon.socket('sub');
sock.connect(3000);
sock.subscribe('user:login');
sock.subscribe('upload:*:progress');
sock.on('message', function(topic, msg){
});
```
## Req / Rep

@@ -165,16 +181,13 @@

## EmitterSocket
## PubEmitter / SubEmitter
`EmitterSocket`'s send and receive messages behaving like regular node `EventEmitter`s.
This is achieved by using pub / sub sockets behind the scenes and automatically formatting
messages with the "json" codec. Currently we simply define the `EmitterSocket` as a `PubSocket` if you `.bind()`, and `SubSocket` if you `.connect()`, providing the natural API you're used to:
`PubEmitter` and `SubEmitter` are higher-level `Pub` / `Sub` sockets, using the "json" codec to behave much like node's `EventEmitter`. When a `SubEmitter`'s `.on()` method is invoked, the event name is `.subscribe()`d for you. Each wildcard (`*`) or regexp capture group is passed to the callback along with regular message arguments.
server.js:
app.js:
```js
var axon = require('axon')
, sock = axon.socket('emitter');
, sock = axon.socket('pub-emitter');
sock.bind(3000);
console.log('pub server started');
sock.connect(3000);

@@ -186,14 +199,21 @@ setInterval(function(){

client.js:
logger.js:
```js
var axon = require('axon')
, sock = axon.socket('emitter');
, sock = axon.socket('sub-emitter');
sock.connect(3000);
console.log('sub client connected');
sock.bind(3000);
sock.on('login', function(user){
sock.on('user:login', function(user){
console.log('%s signed in', user.name);
});
sock.on('user:*', function(action, user){
console.log('%s %s', user.name, action);
});
sock.on('*', function(event){
console.log(arguments);
});
```

@@ -308,2 +328,13 @@

15 byte messages:
```
min: 280 ops/s
mean: 472,109 ops/s
median: 477,309 ops/s
total: 10,758,780 ops in 24.633s
through: 6.753573417663574 mb/s
```
64 byte messages:

@@ -313,7 +344,7 @@

min: 22,085 ops/s
mean: 585,944 ops/s
median: 606,176 ops/s
total: 326,7126 ops in 6.5s
through: 35.76318359375 mb/s
min: 218 ops/s
mean: 462,286 ops/s
median: 461,512 ops/s
total: 6,455,160 ops in 15.488s
through: 28.2156982421875 mb/s

@@ -326,10 +357,22 @@ ```

min: 1,851 ops/s
mean: 34,0156 ops/s
median: 449,660 ops/s
total: 329,831 ops in 4.241s
through: 332.18359375 mb/s
min: 280 ops/s
mean: 382,829 ops/s
median: 382,764 ops/s
total: 3,333,581 ops in 15.126s
through: 373.8564453125 mb/s
```
8k messages:
```
min: 392 ops/s
mean: 92,778 ops/s
median: 87,943 ops/s
total: 1,257,430 ops in 21.735s
through: 724.828125 mb/s
````
## What's it good for?

@@ -358,13 +401,2 @@

## Todo
- more tests
- code cov
- weighted fair queuing
- cap batch size
- zero-copy for batches...
- make batching configurable... disable for lower latency
- subscriptions
- ...
## License

@@ -371,0 +403,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc