Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

axon

Package Overview
Dependencies
Maintainers
2
Versions
23
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

axon - npm Package Compare versions

Comparing version 1.0.0 to 2.0.0

LICENSE

2

benchmark/pub.js

@@ -24,5 +24,5 @@

for (var i = 0; i < perTick; ++i) sock.send(buf);
process.nextTick(more);
setImmediate(more);
}
more();
1.0.0 / 2013-08-30
2.0.0 / 2014-02-25
==================
* refactor to use the AMP protocol. Closes #577
* remove old codec support
1.0.0 / 2013-08-30
==================
* change Socket#connect() to use inaddr_any as well
0.6.1 / 2013-04-13
0.6.1 / 2013-04-13
==================

@@ -13,3 +19,3 @@

0.6.0 / 2013-04-13
0.6.0 / 2013-04-13
==================

@@ -20,3 +26,3 @@

0.5.2 / 2013-04-09
0.5.2 / 2013-04-09
==================

@@ -23,0 +29,0 @@

@@ -6,13 +6,11 @@

exports.Parser = require('./parser');
exports.Message = require('./message');
exports.Socket = require('./sockets/sock');
exports.PubEmitterSocket = require('./sockets/pub-emitter');
exports.SubEmitterSocket = require('./sockets/sub-emitter');
exports.PushSocket = require('./sockets/push');
exports.PullSocket = require('./sockets/pull');
exports.PubSocket = require('./sockets/pub');
exports.SubSocket = require('./sockets/sub');
exports.PushSocket = require('./sockets/push');
exports.PullSocket = require('./sockets/pull');
exports.PubEmitterSocket = require('./sockets/pub-emitter');
exports.SubEmitterSocket = require('./sockets/sub-emitter');
exports.ReqSocket = require('./sockets/req');
exports.RepSocket = require('./sockets/rep');
exports.Socket = require('./sockets/sock');

@@ -24,8 +22,8 @@ /**

exports.types = {
'pub-emitter': exports.PubEmitterSocket,
'sub-emitter': exports.SubEmitterSocket,
'push': exports.PushSocket,
'pull': exports.PullSocket,
'pub': exports.PubSocket,
'sub': exports.SubSocket,
'push': exports.PushSocket,
'pull': exports.PullSocket,
'pub-emitter': exports.PubEmitterSocket,
'sub-emitter': exports.SubEmitterSocket,
'req': exports.ReqSocket,

@@ -36,8 +34,2 @@ 'rep': exports.RepSocket

/**
* Codecs.
*/
exports.codec = require('./codecs');
/**
* Return a new socket of the given `type`.

@@ -44,0 +36,0 @@ *

@@ -46,3 +46,3 @@

for (var i = 0; i < len; ++i) {
this.send(prev[i]);
this.send.apply(this, prev[i]);
}

@@ -49,0 +49,0 @@

@@ -40,8 +40,8 @@

sock.send = function(msg){
var socks = this.socks
, len = socks.length
, sock = socks[n++ % len];
sock.send = function(){
var socks = this.socks;
var len = socks.length;
var sock = socks[n++ % len];
if (arguments.length > 1) msg = slice(arguments);
var msg = slice(arguments);

@@ -48,0 +48,0 @@ if (sock && sock.writable) {

@@ -22,3 +22,2 @@

this.sock = new PubSocket;
this.sock.format('json');
this.emit = this.sock.send.bind(this.sock);

@@ -25,0 +24,0 @@ this.bind = this.sock.bind.bind(this.sock);

@@ -6,4 +6,4 @@

var Socket = require('./sock')
, slice = require('../utils').slice;
var Socket = require('./sock');
var slice = require('../utils').slice;

@@ -40,12 +40,11 @@ /**

PubSocket.prototype.send = function(msg){
var socks = this.socks
, len = socks.length
, sock;
var socks = this.socks;
var len = socks.length;
var sock;
if (arguments.length > 1) msg = slice(arguments);
msg = this.pack(msg);
var buf = this.pack(arguments);
for (var i = 0; i < len; i++) {
sock = socks[i];
if (sock.writable) sock.write(msg);
if (sock.writable) sock.write(buf);
}

@@ -52,0 +51,0 @@

@@ -6,5 +6,5 @@

var Socket = require('./sock')
, queue = require('../plugins/queue')
, roundrobin = require('../plugins/round-robin');
var roundrobin = require('../plugins/round-robin');
var queue = require('../plugins/queue');
var Socket = require('./sock');

@@ -11,0 +11,0 @@ /**

@@ -6,4 +6,6 @@

var Socket = require('./sock')
, debug = require('debug')('axon:rep');
var slice = require('../utils').slice;
var debug = require('debug')('axon:rep');
var Message = require('amp-message');
var Socket = require('./sock');

@@ -42,13 +44,15 @@ /**

var self = this;
return function (msg, multipart){
if (!multipart) return debug('expected multipart: %j', msg);
var id = msg.pop();
msg.unshift('message');
msg.push(reply);
self.emit.apply(self, msg);
return function (buf){
var msg = new Message(buf);
var args = msg.args;
var id = args.pop();
args.unshift('message');
args.push(reply);
self.emit.apply(self, args);
function reply() {
var fn = function(){};
var args = [].slice.call(arguments);
var args = slice(arguments);
args[0] = args[0] || null;

@@ -55,0 +59,0 @@

@@ -6,6 +6,7 @@

var Socket = require('./sock')
, queue = require('../plugins/queue')
, slice = require('../utils').slice
, debug = require('debug')('axon:req');
var debug = require('debug')('axon:req');
var queue = require('../plugins/queue');
var slice = require('../utils').slice;
var Message = require('amp-message');
var Socket = require('./sock');

@@ -61,8 +62,8 @@ /**

var self = this;
return function(msg, multipart){
if (!multipart) return debug('expected multipart: %j', msg);
return function(buf){
var msg = new Message(buf);
var id = msg.pop();
var fn = self.callbacks[id];
if (!fn) return debug('missing callback %s', id);
fn.apply(null, msg);
fn.apply(null, msg.args);
delete self.callbacks[id];

@@ -81,8 +82,6 @@ };

ReqSocket.prototype.send = function(msg){
var socks = this.socks
, len = socks.length
, sock = socks[this.n++ % len]
, args = Array.isArray(msg)
? msg
: slice(arguments);
var socks = this.socks;
var len = socks.length;
var sock = socks[this.n++ % len];
var args = slice(arguments);

@@ -89,0 +88,0 @@ if (sock) {

@@ -6,10 +6,10 @@

var Emitter = require('events').EventEmitter;
var Configurable = require('configurable');
var debug = require('debug')('axon:sock');
var Message = require('amp-message');
var Parser = require('amp').Stream;
var url = require('url');
var net = require('net')
, Parser = require('../parser')
, Message = require('../message')
, codecs = require('../codecs')
, Emitter = require('events').EventEmitter
, debug = require('debug')('axon:sock')
, Configurable = require('configurable')
, url = require('url');
var fs = require('fs');

@@ -27,3 +27,4 @@ /**

'ENETDOWN',
'EPIPE'
'EPIPE',
'ENOENT'
];

@@ -44,6 +45,2 @@

*
* Upon sending and recieving messages, the correct codec
* set by `format` will be applied. Both sides must elect
* the same codec, or all hell will break loose on the app.
*
* @api private

@@ -58,3 +55,2 @@ */

this.settings = {};
this.format('none');
this.set('hwm', Infinity);

@@ -91,27 +87,5 @@ this.set('identity', String(process.pid));

/**
* Set format to `type`.
* Creates a new `Message` and write the `args`.
*
* @param {String} type
* @return {Socket}
* @api public
*/
Socket.prototype.format = function(type){
var codec = codecs.byName(type);
if (!codec) throw new Error('unknown format "' + type + '"');
this.codec = codec;
return this;
};
/**
* Creates a new `Message` and writes `data` using the
* `codec` set already by `format`.
*
* This will only work for single part messages or multi
* part message that all use the same codec. If you need
* otherwise, use the `Message` constructor to craft
* your own message.
*
* @param {Mixed} data
* @param {Array} args
* @return {Buffer}

@@ -121,14 +95,4 @@ * @api private

Socket.prototype.pack = function(data){
var msg = new Message
, codec = this.codec;
if (Array.isArray(data)) {
for (var i = 0; i < data.length; i++) {
msg.write(codec.encode(data[i]), codec.id);
}
} else {
msg.write(codec.encode(data), codec.id);
}
Socket.prototype.pack = function(args){
var msg = new Message(args);
return msg.toBuffer();

@@ -220,4 +184,4 @@ };

debug('add socket %d', i);
sock.on('data', parser.write.bind(parser));
parser.onmessage = this.onmessage(sock);
sock.pipe(parser);
parser.on('data', this.onmessage(sock));
};

@@ -265,9 +229,5 @@

var self = this;
return function(msg, multipart){
if (multipart) {
msg.unshift('message');
self.emit.apply(self, msg);
} else {
self.emit('message', msg);
}
return function(buf){
var msg = new Message(buf);
self.emit.apply(self, ['message'].concat(msg.args));
};

@@ -281,2 +241,4 @@ };

*
* TODO: needs big cleanup
*
* @param {Number|String} port

@@ -296,4 +258,13 @@ * @param {String} host

port = url.parse(port);
host = port.hostname;
port = parseInt(port.port, 10);
if (port.protocol == "unix:") {
host = fn;
fn = undefined;
port = port.pathname;
} else {
host = port.hostname || '0.0.0.0';
port = parseInt(port.port, 10);
}
} else {
host = host || '0.0.0.0';
}

@@ -306,3 +277,2 @@

port = port;
host = host || '0.0.0.0';

@@ -382,10 +352,21 @@ this.handleErrors(sock);

var unixSocket = false;
if ('string' == typeof port) {
port = url.parse(port);
host = port.hostname;
port = parseInt(port.port, 10);
if ('unix:' == port.protocol) {
host = fn;
fn = undefined;
port = port.pathname;
unixSocket = true;
} else {
host = port.hostname || '0.0.0.0';
port = parseInt(port.port, 10);
}
} else {
host = host || '0.0.0.0';
}
this.type = 'server';
host = host || '0.0.0.0';

@@ -396,4 +377,34 @@ this.server = net.createServer(this.onconnect.bind(this));

this.server.on('listening', this.emit.bind(this, 'bind'));
if (unixSocket) {
// TODO: move out
this.server.on('error', function(e) {
if (e.code == 'EADDRINUSE') {
// Unix file socket and error EADDRINUSE is the case if
// the file socket exists. We check if other processes
// listen on file socket, otherwise it is a stale socket
// that we could reopen
// We try to connect to socket via plain network socket
var clientSocket = new net.Socket();
clientSocket.on('error', function(e2) {
if (e2.code == 'ECONNREFUSED') {
// No other server listening, so we can delete stale
// socket file and reopen server socket
fs.unlink(port);
self.server.listen(port, host, fn);
}
});
clientSocket.connect({path: port}, function() {
// Connection is possible, so other server is listening
// on this file socket
throw e;
});
}
});
}
this.server.listen(port, host, fn);
return this;
};

@@ -6,2 +6,3 @@

var Message = require('amp-message');
var SubSocket = require('./sub');

@@ -21,5 +22,4 @@

function SubEmitterSocket() {
function SubEmitterSocket() {
this.sock = new SubSocket;
this.sock.format('json');
this.sock.onmessage = this.onmessage.bind(this);

@@ -41,19 +41,16 @@ this.bind = this.sock.bind.bind(this.sock);

SubEmitterSocket.prototype.onmessage = function(){
var listeners = this.listeners;
var self = this;
var listeners = this.listeners;
return function(msg, multipart){
var topic = multipart
? msg[0].toString()
: msg.toString();
return function(buf){
var msg = new Message(buf);
var topic = msg.shift();
for (var i = 0; i < listeners.length; ++i) {
var listener = listeners[i];
var m = listener.re.exec(topic);
if (!m) continue;
if (multipart) {
listener.fn.apply(this, m.slice(1).concat(msg.slice(1)));
} else {
listener.fn.apply(this, m.slice(1));
}
listener.fn.apply(this, m.slice(1).concat(msg.args));
}

@@ -81,1 +78,19 @@ }

};
/**
* Unsubscribe with the given `event`.
*
* @param {String} event
* @return {SubEmitterSocket} self
* @api public
*/
SubEmitterSocket.prototype.off = function(event){
for (var i = 0; i < this.listeners.length; ++i) {
if (this.listeners[i].event === event) {
this.sock.unsubscribe(this.listeners[i].re);
this.listeners.splice(i--, 1);
}
}
return this;
};

@@ -6,5 +6,6 @@

var Socket = require('./sock')
, debug = require('debug')('axon:sub')
, escape = require('escape-regexp');
var debug = require('debug')('axon:sub');
var escape = require('escape-regexp');
var Message = require('amp-message');
var Socket = require('./sock');

@@ -71,17 +72,15 @@ /**

SubSocket.prototype.onmessage = function(sock){
var subs = this.hasSubscriptions();
var self = this;
var patterns = this.subscriptions;
if (this.hasSubscriptions()) {
return function(msg, multipart){
var topic = multipart
? msg[0].toString()
: msg.toString();
return function(buf){
var msg = new Message(buf);
if (subs) {
var topic = msg.args[0];
if (!self.matches(topic)) return debug('not subscribed to "%s"', topic);
self.emit.apply(self, ['message'].concat(msg));
}
}
return Socket.prototype.onmessage.call(this, sock);
self.emit.apply(self, ['message'].concat(msg.args));
};
};

@@ -104,2 +103,19 @@

/**
* Unsubscribe with the given `re`.
*
* @param {RegExp|String} re
* @api public
*/
SubSocket.prototype.unsubscribe = function(re){
debug('unsubscribe from "%s"', re);
re = toRegExp(re);
for (var i = 0; i < this.subscriptions.length; ++i) {
if (this.subscriptions[i].toString() === re.toString()) {
this.subscriptions.splice(i--, 1);
}
}
};
/**
* Clear current subscriptions.

@@ -106,0 +122,0 @@ *

@@ -11,4 +11,4 @@

exports.slice = function(args){
var len = args.length
, ret = new Array(len);
var len = args.length;
var ret = new Array(len);

@@ -15,0 +15,0 @@ for (var i = 0; i < len; i++) {

{
"name": "axon",
"description": "High-level messaging & socket patterns implemented in pure js",
"version": "1.0.0",
"version": "2.0.0",
"author": "TJ Holowaychuk <tj@vision-media.ca>",

@@ -9,3 +9,5 @@ "dependencies": {

"configurable": "0.0.1",
"escape-regexp": "0.0.1"
"escape-regexp": "0.0.1",
"amp-message": "~0.1.1",
"amp": "~0.3.1"
},

@@ -31,3 +33,4 @@ "devDependencies": {

"url": "https://github.com/visionmedia/axon.git"
}
},
"license": "MIT"
}

@@ -17,4 +17,4 @@ # Axon

- light-weight wire protocol
- supports arbitrary binary message (msgpack, json, BLOBS, etc)
- supports JSON messages out of the box
- mixed-type arguments (strings, objects, buffers, etc)
- unix domain socket support
- fast (~800 mb/s ~500,000 messages/s)

@@ -25,3 +25,3 @@

- `close` when server or connection is closed
- `error` (err) when an-handled socket error occurs
- `error` (err) when an un-handled socket error occurs
- `ignored error` (err) when an axon-handled socket error occurs, but is ignored

@@ -43,2 +43,12 @@ - `socket error` (err) emitted regardless of handling, for logging purposes

## Mixed argument types
Backed by [node-amp-message](https://github.com/visionmedia/node-amp-message)
you may pass strings, objects, and buffers as arguments.
```js
push.send('image', { w: 100, h: 200 }, imageBuffer);
pull.on('message', function(type, size, img){});
```
## Push / Pull

@@ -49,4 +59,4 @@

```js
var axon = require('axon')
, sock = axon.socket('push');
var axon = require('axon');
var sock = axon.socket('push');

@@ -64,4 +74,4 @@ sock.bind(3000);

```js
var axon = require('axon')
, sock = axon.socket('pull');
var axon = require('axon');
var sock = axon.socket('pull');

@@ -75,2 +85,3 @@ sock.connect(3000);

Both `PushSocket`s and `PullSocket`s may `.bind()` or `.connect()`. In the

@@ -94,4 +105,4 @@ following configuration the push socket is bound and pull "workers" connect

```js
var axon = require('axon')
, sock = axon.socket('pub');
var axon = require('axon');
var sock = axon.socket('pub');

@@ -109,4 +120,4 @@ sock.bind(3000);

```js
var axon = require('axon')
, sock = axon.socket('sub');
var axon = require('axon');
var sock = axon.socket('sub');

@@ -124,4 +135,4 @@ sock.connect(3000);

```js
var axon = require('axon')
, sock = axon.socket('sub');
var axon = require('axon');
var sock = axon.socket('sub');

@@ -145,4 +156,4 @@ sock.connect(3000);

```js
var axon = require('axon')
, sock = axon.socket('req');
var axon = require('axon');
var sock = axon.socket('req');

@@ -160,4 +171,4 @@ sock.bind(3000);

```js
var axon = require('axon')
, sock = axon.socket('rep');
var axon = require('axon');
var sock = axon.socket('rep');

@@ -177,4 +188,4 @@ sock.connect(3000);

```js
var axon = require('axon')
, sock = axon.socket('req');
var axon = require('axon');
var sock = axon.socket('req');

@@ -191,4 +202,4 @@ sock.bind(3000);

```js
var axon = require('axon')
, sock = axon.socket('rep');
var axon = require('axon');
var sock = axon.socket('rep');

@@ -198,3 +209,3 @@ sock.connect(3000);

sock.on('message', function(task, img, reply){
switch (task.toString()) {
switch (task) {
case 'resize':

@@ -215,4 +226,4 @@ // resize the image

```js
var axon = require('axon')
, sock = axon.socket('pub-emitter');
var axon = require('axon');
var sock = axon.socket('pub-emitter');

@@ -229,4 +240,4 @@ sock.connect(3000);

```js
var axon = require('axon')
, sock = axon.socket('sub-emitter');
var axon = require('axon');
var sock = axon.socket('sub-emitter');

@@ -274,61 +285,13 @@ sock.bind(3000);

## Protocol
You may also use unix domain sockets:
The wire protocol is simple and very much zeromq-like, where `<length>` is
a BE 24 bit unsigned integer representing a maximum length of roughly ~16mb. The `<meta>`
data byte is currently only used to store the codec, for example "json" is simply `1`,
in turn JSON messages received on the client end will then be automatically decoded for
you by selecting this same codec.
```
octet: 0 1 2 3 <length>
+------+------+------+------+------------------...
| meta | <length> | data ...
+------+------+------+------+------------------...
sock.bind('unix:///some/path')
sock.connect('unix:///some/path')
```
Thus 5 bytes is the smallest message axon supports at the moment. Later if
necessary we can use the meta to indicate a small message and ditch octet 2 and 3
allowing for 3 byte messages.
## Protocol
## Codecs
Axon 2.x uses the extremely simple [AMP](https://github.com/visionmedia/node-amp) protocol to send messages on the wire. Codecs are no longer required as they were in Axon 1.x.
To define a codec simply invoke the `axon.codec.define()` method, for example
here is the JSON codec:
```js
var axon = require('axon');
axon.codec.define('json', {
encode: JSON.stringify,
decode: JSON.parse
});
```
__Note:__ codecs must be defined on both the sending and receiving ends, otherwise
axon cannot properly decode the messages. You may of course ignore this
feature all together and simply pass encoded data to `.send()`.
To use a codec in a socket pair, use the `format(<codec name>)` command. For example, to send json over a req/rep socket pair:
```
var axon = require('axon')
, req = axon.socket('req')
, rep = axon.socket('rep')
req.format('json');
req.bind(3000);
rep.format('json');
rep.connect(3000);
rep.on('message', function(obj, reply){
reply(obj);
});
req.send({ hello: 'World' }, function(res){
console.log(res);
});
```
## Performance

@@ -412,3 +375,2 @@

- [Axon RPC](https://github.com/visionmedia/axon-rpc)
- [msgpack codec](https://github.com/visionmedia/axon-msgpack)

@@ -415,0 +377,0 @@ ## License

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc