Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

axon

Package Overview
Dependencies
Maintainers
1
Versions
23
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

axon - npm Package Compare versions

Comparing version 0.0.3 to 0.1.0

lib/message.js

13

benchmark/pub.js
var ss = require('..')
, sock = ss.socket('pub');
, program = require('commander');
program
.option('-T, --type <name>', 'socket type [pub]', 'pub')
.option('-t, --per-tick <n>', 'messages per tick [1000]', parseInt)
.option('-s, --size <n>', 'message size in bytes [1024]', parseInt)
.parse(process.argv)
var sock = ss.socket(program.type);
sock.bind(3000);
console.log('pub bound');
var perTick = 1000;
var buf = new Buffer(Array(1024).join('a'));
var perTick = program.perTick || 1000;
var buf = new Buffer(Array(program.size || 1024).join('a'));
console.log('sending %d per tick', perTick);

@@ -11,0 +18,0 @@ console.log('sending %d byte messages', buf.length);

var ss = require('..')
, sock = ss.socket('sub');
, program = require('commander');
program
.option('-T, --type <name>', 'socket type [sub]', 'sub')
.option('-s, --size <n>', 'message size in bytes [1024]', parseInt)
.parse(process.argv)
var sock = ss.socket(program.type);
sock.connect(3000);

@@ -10,3 +16,3 @@

var ops = 200;
var bytes = 1024;
var bytes = program.size || 1024;
var t = start = process.hrtime();

@@ -13,0 +19,0 @@ var results = [];

0.1.0 / 2012-09-24
==================
* add router socket [gjohnson]
* add dealer socket [gjohnson]
* add req socket [gjohnson]
* add rep socket [gjohnson]
* add multipart support [gjohnson]
* add `.set()` / `.get()` configuration methods
* add tcp://hostname:port support to .bind() and .connect(). Closes #16
* add `make bm`
* add Batch#empty()
* remove Socket#option()
0.0.3 / 2012-07-14

@@ -3,0 +17,0 @@ ==================

/**
* Expose `Batch`.
*/
module.exports = Batch;
/**
* Initialize a new `Batch`.
*
* The "Batch" is in charge of buffering
* messages which may then be written to
* the socket(s) at once, increasing throughput.
*
* @api private
*/
function Batch() {

@@ -8,2 +22,9 @@ this.clear();

/**
* Add `msg` to the batch.
*
* @param {Buffer} msg
* @api private
*/
Batch.prototype.add = function(msg){

@@ -13,2 +34,20 @@ this.msgs.push(msg);

/**
* Check if the batch is empty.
*
* @return {Boolean}
* @api private
*/
Batch.prototype.empty = function(){
return 0 == this.msgs.length;
};
/**
* Return the total length of all buffers.
*
* @return {String}
* @api private
*/
Batch.prototype.length = function(){

@@ -23,2 +62,8 @@ var ret = 0;

/**
* Clear the batch buffer.
*
* @api private
*/
Batch.prototype.clear = function(){

@@ -28,2 +73,16 @@ this.msgs = [];

/**
* Return a `Buffer` containing all the buffered
* messages as contiguous memory.
*
* TODO:
* look into optimizing this
* and do some better profiling. less .write()s
* really bumps our throughput for small-ish messages,
* however for larger ones these copies are terrible.
*
* @return {Buffer}
* @api private
*/
Batch.prototype.toBuffer = function(){

@@ -30,0 +89,0 @@ var buf = new Buffer(this.length());

/**
* Library version.
*/
exports.version = '0.0.1';
/**
* Constructors.

@@ -13,6 +7,5 @@ */

exports.Batch = require('./batch');
exports.Decoder = require('./decoder');
exports.Encoder = require('./encoder');
exports.Parser = require('./parser');
exports.Message = require('./message');
exports.Socket = require('./sockets/sock');
exports.Queue = require('./sockets/queue');
exports.PubSocket = require('./sockets/pub');

@@ -23,2 +16,6 @@ exports.SubSocket = require('./sockets/sub');

exports.EmitterSocket = require('./sockets/emitter');
exports.RouterSocket = require('./sockets/router');
exports.DealerSocket = require('./sockets/dealer');
exports.ReqSocket = require('./sockets/req');
exports.RepSocket = require('./sockets/rep');

@@ -31,3 +28,2 @@ /**

stream: exports.Socket,
queue: exports.Queue,
pub: exports.PubSocket,

@@ -37,3 +33,7 @@ sub: exports.SubSocket,

pull: exports.PullSocket,
emitter: exports.EmitterSocket
emitter: exports.EmitterSocket,
router: exports.RouterSocket,
dealer: exports.DealerSocket,
req: exports.ReqSocket,
rep: exports.RepSocket
};

@@ -40,0 +40,0 @@

@@ -52,4 +52,4 @@

this.sub = new SubSocket;
this.sub.on('message', function(args){
self.emit.apply(self, args);
this.sub.on('message', function(){
self.emit.apply(self, arguments);
});

@@ -77,4 +77,3 @@ return this.sub.connect.apply(this.sub, arguments);

function emit(event){
var args = [].slice.apply(arguments);
this.pub.send(args);
};
this.pub.send.apply(this.pub, arguments);
}

@@ -6,3 +6,3 @@

var Queue = require('./queue')
var Socket = require('./sock')
, Batch = require('../batch');

@@ -17,3 +17,3 @@

/**
* Initialzie a new `PubSocket`.
* Initialize a new `PubSocket`.
*

@@ -24,16 +24,20 @@ * @api private

function PubSocket() {
Queue.call(this);
Socket.call(this);
var self = this;
this.n = 0;
this.filters = [];
this.batch = new Batch;
this.batchMax = 10;
this.batchTTL = 100;
this.batchTimer = setInterval(this.flushBatch.bind(this), this.batchTTL);
this.n = 0;
this.set('batch max', 10);
this.set('batch ttl', 100);
process.nextTick(function(){
var ttl = self.get('batch ttl');
self.batchTimer = setInterval(self.flushBatch.bind(self), ttl);
});
}
/**
* Inherits from `Queue.prototype`.
* Inherits from `Socket.prototype`.
*/
PubSocket.prototype.__proto__ = Queue.prototype;
PubSocket.prototype.__proto__ = Socket.prototype;

@@ -47,3 +51,4 @@ /**

PubSocket.prototype.flushBatch = function(){
if (!this.batch.msgs.length) return;
if (this.batch.empty()) return;
var socks = this.socks

@@ -66,2 +71,5 @@ , len = socks.length

*
* Messages will be batched rather then sent immediately
* until the batch reaches the option `batch max`.
*
* @param {Mixed} msg

@@ -72,10 +80,13 @@ * @api public

PubSocket.prototype.send = function(msg){
var codec = this.codec;
var msg = this.encoder.pack(codec.encode(msg), codec.id);
if (++this.n == this.batchMax) {
this.flushBatch();
if (++this.n == this.get('batch max')) return this.flushBatch();
if (Array.isArray(msg)) {
this.batch.add(this.pack(msg));
} else {
this.batch.add(msg);
var args = [];
for (var i = 0; i < arguments.length; ++i) {
args[i] = arguments[i];
}
this.batch.add(this.pack(args));
}
return this;
};

@@ -91,3 +102,3 @@

clearInterval(this.batchTimer);
return Queue.prototype.close.call(this);
return Socket.prototype.close.call(this);
};

@@ -6,3 +6,3 @@

var Queue = require('./queue');
var Socket = require('./sock');

@@ -22,3 +22,3 @@ /**

function PullSocket() {
Queue.call(this);
Socket.call(this);
// TODO: selective reception

@@ -28,5 +28,13 @@ }

/**
* Inherits from `Queue.prototype`.
* Inherits from `Socket.prototype`.
*/
PullSocket.prototype.__proto__ = Queue.prototype;
PullSocket.prototype.__proto__ = Socket.prototype;
/**
* Pull sockets should not send messages.
*/
PullSocket.prototype.send = function(){
throw new Error('pull sockets should not send messages');
};

@@ -6,3 +6,5 @@

var Queue = require('./queue');
var Socket = require('./sock')
, queue = require('../plugins/queue')
, roundrobin = require('../plugins/round-robin');

@@ -16,3 +18,3 @@ /**

/**
* Initialzie a new `PushSocket`.
* Initialize a new `PushSocket`.
*

@@ -23,61 +25,11 @@ * @api private

function PushSocket() {
Queue.call(this);
this.n = 0;
this.on('connection', this.flush.bind(this));
Socket.call(this);
this.use(queue());
this.use(roundrobin({ fallback: this.enqueue }));
}
/**
* Inherits from `Queue.prototype`.
* Inherits from `Socket.prototype`.
*/
PushSocket.prototype.__proto__ = Queue.prototype;
/**
* Send `msg` to all established peers.
*
* @param {Mixed} msg
* @api private
*/
PushSocket.prototype.sendToPeers = function(msg){
var socks = this.socks
, len = socks.length
, sock = socks[this.n++ % len]
, codec = this.codec;
if (sock) {
sock.write(this.encoder.pack(codec.encode(msg), codec.id));
} else {
this.buf.push(msg);
}
};
/**
* Send `msg` to established peer.
*
* @param {Mixed} msg
* @api private
*/
PushSocket.prototype.sendToPeer = function(msg){
var sock = this.sock
, codec = this.codec;
if (sock) {
sock.write(this.encoder.pack(codec.encode(msg), codec.id));
} else {
this.buf.push(msg);
}
};
/**
* Send `msg` round-robin to established peers.
*
* @param {Mixed} msg
* @api public
*/
PushSocket.prototype.send = function(msg){
if ('server' == this.type) this.sendToPeers(msg);
else this.sendToPeer(msg);
};
PushSocket.prototype.__proto__ = Socket.prototype;

@@ -7,4 +7,9 @@

var net = require('net')
, Parser = require('../parser')
, Message = require('../message')
, codecs = require('../codecs')
, Emitter = require('events').EventEmitter
, debug = require('debug')('ss:sock');
, debug = require('debug')('axon:sock')
, Configurable = require('configurable')
, url = require('url');

@@ -20,6 +25,10 @@ /**

*
* A super socket "Socket" encapsulates the
* reconnection logic with exponential backoff,
* serving as a base for the `Queue`.
* A "Socket" encapsulates the ability of being
* the "client" or the "server" depending on
* whether `connect()` or `bind()` was called.
*
* Upon sending and recieving messages, the correct codec
* set by `format` will be applied. Both sides must elect
* the same codec, or all hell will break loose on the app.
*
* @api private

@@ -30,7 +39,200 @@ */

var self = this;
var sock = this.sock = new net.Socket;
this.opts = {};
this.server = null;
this.socks = [];
this.map = {};
this.settings = {};
this.format('none');
this.set('identity', '\u0000');
this.set('retry timeout', 100);
this.set('retry max timeout', 2000);
}
this.retryTimeout = this.retry = 100;
this.retryMaxTimeout = 2000;
/**
* Inherit from `Emitter.prototype`.
*/
Socket.prototype.__proto__ = Emitter.prototype;
/**
* Make it configurable `.set()` etc.
*/
Configurable(Socket.prototype);
/**
* Use the given `plugin`.
*
* @param {Function} plugin
* @api private
*/
Socket.prototype.use = function(plugin){
plugin(this);
return this;
};
/**
* Set format to `type`.
*
* @param {String} type
* @return {Socket}
* @api public
*/
Socket.prototype.format = function(type){
var codec = codecs.byName(type);
if (!codec) throw new Error('unknown format "' + type + '"');
this.codec = codec;
return this;
};
/**
* Creates a new `Message` and writes `data` using the
* `codec` set already by `format`.
*
* This will only work for single part messages or multi
* part message that all use the same codec. If you need
* otherwise, use the `Message` constructor to craft
* your own message.
*
* @param {Mixed} data
* @return {Buffer}
* @api private
*/
Socket.prototype.pack = function(data){
var msg = new Message
, codec = this.codec;
if (Array.isArray(data)) {
for (var i = 0; i < data.length; i++) {
msg.write(codec.encode(data[i]), codec.id);
}
} else {
msg.write(codec.encode(data), codec.id);
}
return msg.toBuffer();
};
/**
* Close the socket.
*
* Delegates to the server or clients
* based on the socket `type`.
*
* @api public
*/
Socket.prototype.close = function(){
debug('close');
this.closing = true;
if ('server' == this.type) {
this.server && this.server.close();
} else {
this.socks.forEach(function(sock){
sock.destroy();
});
}
};
/**
* Remove `sock`.
*
* @param {Socket} sock
* @api private
*/
Socket.prototype.removeSocket = function(sock){
var i = this.socks.indexOf(sock);
this.socks.splice(i, 1);
if (sock._axon_id) delete this.map[sock._axon_id];
};
/**
* Add `sock`.
*
* @param {Socket} sock
* @api private
*/
Socket.prototype.addSocket = function(sock){
var self = this
, pid = String(process.pid)
, parser = new Parser
, n = this.socks.push(sock);
// send our greeting
sock.write(this.pack(this.get('identity')));
// parse incoming
sock.on('data', parser.write.bind(parser));
// accept greeting once, emit messages there on out
parser.onmessage = ongreeting;
function ongreeting(msg){
var id = String(msg);
if ('0' === id) id = pid + Date.now() + n;
sock._axon_id = id;
self.map[id] = sock;
parser.onmessage = self.onmessage(sock);
}
};
/**
* Handles framed messages emitted from the parser, by
* default it will go ahead and emit the "message" events on
* the socket. However, if the "higher level" socket needs
* to hook into the messages before they are emitted, it
* should override this method and take care of everything
* it self, including emitted the "message" event.
*
* @param {net.Socket} sock
* @return {Function} closure(msg, mulitpart)
* @api private
*/
Socket.prototype.onmessage = function(sock){
var self = this;
return function(msg, multipart){
if (multipart) {
self.emit.apply(self, ['message'].concat(msg));
} else {
self.emit('message', msg);
}
};
};
/**
* Connect to `port` at `host` and invoke `fn()`.
*
* Defaults `host` to localhost.
*
* @param {Number|String} port
* @param {String} host
* @param {Function} fn
* @return {Socket}
* @api public
*/
Socket.prototype.connect = function(port, host, fn){
var self = this;
if ('server' == this.type) throw new Error('cannot connect() after bind()');
if ('function' == typeof host) fn = host, host = undefined;
if ('string' == typeof port) {
port = url.parse(port);
host = port.hostname;
port = parseInt(port.port, 10);
}
var sock = new net.Socket;
sock.setNoDelay();
this.type = 'client';
port = port;
host = host || '127.0.0.1';
sock.on('error', function(err){

@@ -42,9 +244,8 @@ if ('ECONNREFUSED' != err.code) {

sock.on('data', function(chunk){
self.emit('data', chunk);
});
sock.on('close', function(){
self.connected = false;
self.removeSocket(sock);
if (self.closing) return self.emit('close');
var max = self.get('max retry timeout');
var retry = self.retry || self.get('retry timeout');
setTimeout(function(){

@@ -54,5 +255,5 @@ debug('attempting reconnect');

sock.destroy();
self.connect(self.port, self.host);
self.retry = Math.min(self.retryMaxTimeout, self.retry * 1.5);
}, self.retry);
self.connect(port, host);
self.retry = Math.min(max, retry * 1.5);
}, retry);
});

@@ -63,47 +264,70 @@

self.connected = true;
self.retry = self.retryTimeout;
self.addSocket(sock);
self.retry = self.get('retry timeout');
self.emit('connect');
self.callback && self.callback();
fn && fn();
});
}
debug('connect attempt %s:%s', host, port);
sock.connect(port, host);
return this;
};
/**
* Inherit from `Emitter.prototype`.
* Handle connection.
*
* @param {Socket} sock
* @api private
*/
Socket.prototype.__proto__ = Emitter.prototype;
Socket.prototype.onconnect = function(sock){
var self = this;
var addr = sock.remoteAddress + ':' + sock.remotePort;
debug('connection %s', addr);
this.addSocket(sock);
this.emit('connect', sock);
sock.on('close', function(){
debug('disconnect %s', addr);
self.emit('disconnect', sock);
self.removeSocket(sock);
});
};
/**
* Connect to `port` at `host` and invoke `fn()`.
* Bind to `port` at `host` and invoke `fn()`.
*
* Defaults `host` to localhost.
* Defaults `host` to INADDR_ANY.
*
* @param {Number} port
* @param {String} host
* Emits:
*
* - `connection` when a client connects
* - `disconnect` when a client disconnects
* - `bind` when bound and listening
*
* @param {Number|String} port
* @param {Function} fn
* @return {Socket}
* @api public
*/
Socket.prototype.connect = function(port, host, fn){
if ('function' == host) fn = host, host = undefined;
this.type = 'client';
this.port = port;
this.host = host || '127.0.0.1';
debug('connect %s:%s', this.host, this.port);
this.sock.connect(this.port, this.host);
this.callback = fn;
return this;
};
Socket.prototype.bind = function(port, host, fn){
var self = this;
if ('client' == this.type) throw new Error('cannot bind() after connect()');
if ('function' == typeof host) fn = host, host = undefined;
/**
* Close the socket.
*
* @api public
*/
if ('string' == typeof port) {
port = url.parse(port);
host = port.hostname;
port = parseInt(port.port, 10);
}
Socket.prototype.close = function(){
debug('close');
this.closing = true;
this.sock.destroy();
this.type = 'server';
host = host || '0.0.0.0';
this.server = net.createServer(this.onconnect.bind(this));
debug('bind %s:%s', host, port);
this.server.on('listening', this.emit.bind('bind'));
this.server.listen(port, host, fn);
return this;
};

@@ -6,3 +6,3 @@

var Queue = require('./queue');
var Socket = require('./sock');

@@ -22,3 +22,3 @@ /**

function SubSocket() {
Queue.call(this);
Socket.call(this);
this.subscriptions = [];

@@ -28,5 +28,13 @@ }

/**
* Inherits from `Queue.prototype`.
* Inherits from `Socket.prototype`.
*/
SubSocket.prototype.__proto__ = Queue.prototype;
SubSocket.prototype.__proto__ = Socket.prototype;
/**
* Subscribers should not send messages.
*/
SubSocket.prototype.send = function(){
throw new Error('subscribers should not send messages');
};
{
"name": "axon",
"description": "High-level messaging & socket patterns implemented in pure js",
"version": "0.0.3",
"version": "0.1.0",
"author": "TJ Holowaychuk <tj@vision-media.ca>",
"dependencies": {
"debug": "*"
"debug": "*",
"configurable": "0.0.1"
},
"devDependencies": {
"should": "*",
"mocha": "*"
"mocha": "*",
"commander": "*"
},
"keywords": ["zmq", "zeromq", "pubsub", "socket", "emitter", "ipc", "rpc"]
}

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc