Comparing version 0.0.3 to 0.1.0
var ss = require('..') | ||
, sock = ss.socket('pub'); | ||
, program = require('commander'); | ||
program | ||
.option('-T, --type <name>', 'socket type [pub]', 'pub') | ||
.option('-t, --per-tick <n>', 'messages per tick [1000]', parseInt) | ||
.option('-s, --size <n>', 'message size in bytes [1024]', parseInt) | ||
.parse(process.argv) | ||
var sock = ss.socket(program.type); | ||
sock.bind(3000); | ||
console.log('pub bound'); | ||
var perTick = 1000; | ||
var buf = new Buffer(Array(1024).join('a')); | ||
var perTick = program.perTick || 1000; | ||
var buf = new Buffer(Array(program.size || 1024).join('a')); | ||
console.log('sending %d per tick', perTick); | ||
@@ -11,0 +18,0 @@ console.log('sending %d byte messages', buf.length); |
var ss = require('..') | ||
, sock = ss.socket('sub'); | ||
, program = require('commander'); | ||
program | ||
.option('-T, --type <name>', 'socket type [sub]', 'sub') | ||
.option('-s, --size <n>', 'message size in bytes [1024]', parseInt) | ||
.parse(process.argv) | ||
var sock = ss.socket(program.type); | ||
sock.connect(3000); | ||
@@ -10,3 +16,3 @@ | ||
var ops = 200; | ||
var bytes = 1024; | ||
var bytes = program.size || 1024; | ||
var t = start = process.hrtime(); | ||
@@ -13,0 +19,0 @@ var results = []; |
0.1.0 / 2012-09-24 | ||
================== | ||
* add router socket [gjohnson] | ||
* add dealer socket [gjohnson] | ||
* add req socket [gjohnson] | ||
* add rep socket [gjohnson] | ||
* add multipart support [gjohnson] | ||
* add `.set()` / `.get()` configuration methods | ||
* add tcp://hostname:port support to .bind() and .connect(). Closes #16 | ||
* add `make bm` | ||
* add Batch#empty() | ||
* remove Socket#option() | ||
0.0.3 / 2012-07-14 | ||
@@ -3,0 +17,0 @@ ================== |
/** | ||
* Expose `Batch`. | ||
*/ | ||
module.exports = Batch; | ||
/** | ||
* Initialize a new `Batch`. | ||
* | ||
* The "Batch" is in charge of buffering | ||
* messages which may then be written to | ||
* the socket(s) at once, increasing throughput. | ||
* | ||
* @api private | ||
*/ | ||
function Batch() { | ||
@@ -8,2 +22,9 @@ this.clear(); | ||
/** | ||
* Add `msg` to the batch. | ||
* | ||
* @param {Buffer} msg | ||
* @api private | ||
*/ | ||
Batch.prototype.add = function(msg){ | ||
@@ -13,2 +34,20 @@ this.msgs.push(msg); | ||
/** | ||
* Check if the batch is empty. | ||
* | ||
* @return {Boolean} | ||
* @api private | ||
*/ | ||
Batch.prototype.empty = function(){ | ||
return 0 == this.msgs.length; | ||
}; | ||
/** | ||
* Return the total length of all buffers. | ||
* | ||
* @return {String} | ||
* @api private | ||
*/ | ||
Batch.prototype.length = function(){ | ||
@@ -23,2 +62,8 @@ var ret = 0; | ||
/** | ||
* Clear the batch buffer. | ||
* | ||
* @api private | ||
*/ | ||
Batch.prototype.clear = function(){ | ||
@@ -28,2 +73,16 @@ this.msgs = []; | ||
/** | ||
* Return a `Buffer` containing all the buffered | ||
* messages as contiguous memory. | ||
* | ||
* TODO: | ||
* look into optimizing this | ||
* and do some better profiling. less .write()s | ||
* really bumps our throughput for small-ish messages, | ||
* however for larger ones these copies are terrible. | ||
* | ||
* @return {Buffer} | ||
* @api private | ||
*/ | ||
Batch.prototype.toBuffer = function(){ | ||
@@ -30,0 +89,0 @@ var buf = new Buffer(this.length()); |
/** | ||
* Library version. | ||
*/ | ||
exports.version = '0.0.1'; | ||
/** | ||
* Constructors. | ||
@@ -13,6 +7,5 @@ */ | ||
exports.Batch = require('./batch'); | ||
exports.Decoder = require('./decoder'); | ||
exports.Encoder = require('./encoder'); | ||
exports.Parser = require('./parser'); | ||
exports.Message = require('./message'); | ||
exports.Socket = require('./sockets/sock'); | ||
exports.Queue = require('./sockets/queue'); | ||
exports.PubSocket = require('./sockets/pub'); | ||
@@ -23,2 +16,6 @@ exports.SubSocket = require('./sockets/sub'); | ||
exports.EmitterSocket = require('./sockets/emitter'); | ||
exports.RouterSocket = require('./sockets/router'); | ||
exports.DealerSocket = require('./sockets/dealer'); | ||
exports.ReqSocket = require('./sockets/req'); | ||
exports.RepSocket = require('./sockets/rep'); | ||
@@ -31,3 +28,2 @@ /** | ||
stream: exports.Socket, | ||
queue: exports.Queue, | ||
pub: exports.PubSocket, | ||
@@ -37,3 +33,7 @@ sub: exports.SubSocket, | ||
pull: exports.PullSocket, | ||
emitter: exports.EmitterSocket | ||
emitter: exports.EmitterSocket, | ||
router: exports.RouterSocket, | ||
dealer: exports.DealerSocket, | ||
req: exports.ReqSocket, | ||
rep: exports.RepSocket | ||
}; | ||
@@ -40,0 +40,0 @@ |
@@ -52,4 +52,4 @@ | ||
this.sub = new SubSocket; | ||
this.sub.on('message', function(args){ | ||
self.emit.apply(self, args); | ||
this.sub.on('message', function(){ | ||
self.emit.apply(self, arguments); | ||
}); | ||
@@ -77,4 +77,3 @@ return this.sub.connect.apply(this.sub, arguments); | ||
function emit(event){ | ||
var args = [].slice.apply(arguments); | ||
this.pub.send(args); | ||
}; | ||
this.pub.send.apply(this.pub, arguments); | ||
} |
@@ -6,3 +6,3 @@ | ||
var Queue = require('./queue') | ||
var Socket = require('./sock') | ||
, Batch = require('../batch'); | ||
@@ -17,3 +17,3 @@ | ||
/** | ||
* Initialzie a new `PubSocket`. | ||
* Initialize a new `PubSocket`. | ||
* | ||
@@ -24,16 +24,20 @@ * @api private | ||
function PubSocket() { | ||
Queue.call(this); | ||
Socket.call(this); | ||
var self = this; | ||
this.n = 0; | ||
this.filters = []; | ||
this.batch = new Batch; | ||
this.batchMax = 10; | ||
this.batchTTL = 100; | ||
this.batchTimer = setInterval(this.flushBatch.bind(this), this.batchTTL); | ||
this.n = 0; | ||
this.set('batch max', 10); | ||
this.set('batch ttl', 100); | ||
process.nextTick(function(){ | ||
var ttl = self.get('batch ttl'); | ||
self.batchTimer = setInterval(self.flushBatch.bind(self), ttl); | ||
}); | ||
} | ||
/** | ||
* Inherits from `Queue.prototype`. | ||
* Inherits from `Socket.prototype`. | ||
*/ | ||
PubSocket.prototype.__proto__ = Queue.prototype; | ||
PubSocket.prototype.__proto__ = Socket.prototype; | ||
@@ -47,3 +51,4 @@ /** | ||
PubSocket.prototype.flushBatch = function(){ | ||
if (!this.batch.msgs.length) return; | ||
if (this.batch.empty()) return; | ||
var socks = this.socks | ||
@@ -66,2 +71,5 @@ , len = socks.length | ||
* | ||
* Messages will be batched rather then sent immediately | ||
* until the batch reaches the option `batch max`. | ||
* | ||
* @param {Mixed} msg | ||
@@ -72,10 +80,13 @@ * @api public | ||
PubSocket.prototype.send = function(msg){ | ||
var codec = this.codec; | ||
var msg = this.encoder.pack(codec.encode(msg), codec.id); | ||
if (++this.n == this.batchMax) { | ||
this.flushBatch(); | ||
if (++this.n == this.get('batch max')) return this.flushBatch(); | ||
if (Array.isArray(msg)) { | ||
this.batch.add(this.pack(msg)); | ||
} else { | ||
this.batch.add(msg); | ||
var args = []; | ||
for (var i = 0; i < arguments.length; ++i) { | ||
args[i] = arguments[i]; | ||
} | ||
this.batch.add(this.pack(args)); | ||
} | ||
return this; | ||
}; | ||
@@ -91,3 +102,3 @@ | ||
clearInterval(this.batchTimer); | ||
return Queue.prototype.close.call(this); | ||
return Socket.prototype.close.call(this); | ||
}; |
@@ -6,3 +6,3 @@ | ||
var Queue = require('./queue'); | ||
var Socket = require('./sock'); | ||
@@ -22,3 +22,3 @@ /** | ||
function PullSocket() { | ||
Queue.call(this); | ||
Socket.call(this); | ||
// TODO: selective reception | ||
@@ -28,5 +28,13 @@ } | ||
/** | ||
* Inherits from `Queue.prototype`. | ||
* Inherits from `Socket.prototype`. | ||
*/ | ||
PullSocket.prototype.__proto__ = Queue.prototype; | ||
PullSocket.prototype.__proto__ = Socket.prototype; | ||
/** | ||
* Pull sockets should not send messages. | ||
*/ | ||
PullSocket.prototype.send = function(){ | ||
throw new Error('pull sockets should not send messages'); | ||
}; |
@@ -6,3 +6,5 @@ | ||
var Queue = require('./queue'); | ||
var Socket = require('./sock') | ||
, queue = require('../plugins/queue') | ||
, roundrobin = require('../plugins/round-robin'); | ||
@@ -16,3 +18,3 @@ /** | ||
/** | ||
* Initialzie a new `PushSocket`. | ||
* Initialize a new `PushSocket`. | ||
* | ||
@@ -23,61 +25,11 @@ * @api private | ||
function PushSocket() { | ||
Queue.call(this); | ||
this.n = 0; | ||
this.on('connection', this.flush.bind(this)); | ||
Socket.call(this); | ||
this.use(queue()); | ||
this.use(roundrobin({ fallback: this.enqueue })); | ||
} | ||
/** | ||
* Inherits from `Queue.prototype`. | ||
* Inherits from `Socket.prototype`. | ||
*/ | ||
PushSocket.prototype.__proto__ = Queue.prototype; | ||
/** | ||
* Send `msg` to all established peers. | ||
* | ||
* @param {Mixed} msg | ||
* @api private | ||
*/ | ||
PushSocket.prototype.sendToPeers = function(msg){ | ||
var socks = this.socks | ||
, len = socks.length | ||
, sock = socks[this.n++ % len] | ||
, codec = this.codec; | ||
if (sock) { | ||
sock.write(this.encoder.pack(codec.encode(msg), codec.id)); | ||
} else { | ||
this.buf.push(msg); | ||
} | ||
}; | ||
/** | ||
* Send `msg` to established peer. | ||
* | ||
* @param {Mixed} msg | ||
* @api private | ||
*/ | ||
PushSocket.prototype.sendToPeer = function(msg){ | ||
var sock = this.sock | ||
, codec = this.codec; | ||
if (sock) { | ||
sock.write(this.encoder.pack(codec.encode(msg), codec.id)); | ||
} else { | ||
this.buf.push(msg); | ||
} | ||
}; | ||
/** | ||
* Send `msg` round-robin to established peers. | ||
* | ||
* @param {Mixed} msg | ||
* @api public | ||
*/ | ||
PushSocket.prototype.send = function(msg){ | ||
if ('server' == this.type) this.sendToPeers(msg); | ||
else this.sendToPeer(msg); | ||
}; | ||
PushSocket.prototype.__proto__ = Socket.prototype; |
@@ -7,4 +7,9 @@ | ||
var net = require('net') | ||
, Parser = require('../parser') | ||
, Message = require('../message') | ||
, codecs = require('../codecs') | ||
, Emitter = require('events').EventEmitter | ||
, debug = require('debug')('ss:sock'); | ||
, debug = require('debug')('axon:sock') | ||
, Configurable = require('configurable') | ||
, url = require('url'); | ||
@@ -20,6 +25,10 @@ /** | ||
* | ||
* A super socket "Socket" encapsulates the | ||
* reconnection logic with exponential backoff, | ||
* serving as a base for the `Queue`. | ||
* A "Socket" encapsulates the ability of being | ||
* the "client" or the "server" depending on | ||
* whether `connect()` or `bind()` was called. | ||
* | ||
* Upon sending and recieving messages, the correct codec | ||
* set by `format` will be applied. Both sides must elect | ||
* the same codec, or all hell will break loose on the app. | ||
* | ||
* @api private | ||
@@ -30,7 +39,200 @@ */ | ||
var self = this; | ||
var sock = this.sock = new net.Socket; | ||
this.opts = {}; | ||
this.server = null; | ||
this.socks = []; | ||
this.map = {}; | ||
this.settings = {}; | ||
this.format('none'); | ||
this.set('identity', '\u0000'); | ||
this.set('retry timeout', 100); | ||
this.set('retry max timeout', 2000); | ||
} | ||
this.retryTimeout = this.retry = 100; | ||
this.retryMaxTimeout = 2000; | ||
/** | ||
* Inherit from `Emitter.prototype`. | ||
*/ | ||
Socket.prototype.__proto__ = Emitter.prototype; | ||
/** | ||
* Make it configurable `.set()` etc. | ||
*/ | ||
Configurable(Socket.prototype); | ||
/** | ||
* Use the given `plugin`. | ||
* | ||
* @param {Function} plugin | ||
* @api private | ||
*/ | ||
Socket.prototype.use = function(plugin){ | ||
plugin(this); | ||
return this; | ||
}; | ||
/** | ||
* Set format to `type`. | ||
* | ||
* @param {String} type | ||
* @return {Socket} | ||
* @api public | ||
*/ | ||
Socket.prototype.format = function(type){ | ||
var codec = codecs.byName(type); | ||
if (!codec) throw new Error('unknown format "' + type + '"'); | ||
this.codec = codec; | ||
return this; | ||
}; | ||
/** | ||
* Creates a new `Message` and writes `data` using the | ||
* `codec` set already by `format`. | ||
* | ||
* This will only work for single part messages or multi | ||
* part message that all use the same codec. If you need | ||
* otherwise, use the `Message` constructor to craft | ||
* your own message. | ||
* | ||
* @param {Mixed} data | ||
* @return {Buffer} | ||
* @api private | ||
*/ | ||
Socket.prototype.pack = function(data){ | ||
var msg = new Message | ||
, codec = this.codec; | ||
if (Array.isArray(data)) { | ||
for (var i = 0; i < data.length; i++) { | ||
msg.write(codec.encode(data[i]), codec.id); | ||
} | ||
} else { | ||
msg.write(codec.encode(data), codec.id); | ||
} | ||
return msg.toBuffer(); | ||
}; | ||
/** | ||
* Close the socket. | ||
* | ||
* Delegates to the server or clients | ||
* based on the socket `type`. | ||
* | ||
* @api public | ||
*/ | ||
Socket.prototype.close = function(){ | ||
debug('close'); | ||
this.closing = true; | ||
if ('server' == this.type) { | ||
this.server && this.server.close(); | ||
} else { | ||
this.socks.forEach(function(sock){ | ||
sock.destroy(); | ||
}); | ||
} | ||
}; | ||
/** | ||
* Remove `sock`. | ||
* | ||
* @param {Socket} sock | ||
* @api private | ||
*/ | ||
Socket.prototype.removeSocket = function(sock){ | ||
var i = this.socks.indexOf(sock); | ||
this.socks.splice(i, 1); | ||
if (sock._axon_id) delete this.map[sock._axon_id]; | ||
}; | ||
/** | ||
* Add `sock`. | ||
* | ||
* @param {Socket} sock | ||
* @api private | ||
*/ | ||
Socket.prototype.addSocket = function(sock){ | ||
var self = this | ||
, pid = String(process.pid) | ||
, parser = new Parser | ||
, n = this.socks.push(sock); | ||
// send our greeting | ||
sock.write(this.pack(this.get('identity'))); | ||
// parse incoming | ||
sock.on('data', parser.write.bind(parser)); | ||
// accept greeting once, emit messages there on out | ||
parser.onmessage = ongreeting; | ||
function ongreeting(msg){ | ||
var id = String(msg); | ||
if ('0' === id) id = pid + Date.now() + n; | ||
sock._axon_id = id; | ||
self.map[id] = sock; | ||
parser.onmessage = self.onmessage(sock); | ||
} | ||
}; | ||
/** | ||
* Handles framed messages emitted from the parser, by | ||
* default it will go ahead and emit the "message" events on | ||
* the socket. However, if the "higher level" socket needs | ||
* to hook into the messages before they are emitted, it | ||
* should override this method and take care of everything | ||
* it self, including emitted the "message" event. | ||
* | ||
* @param {net.Socket} sock | ||
* @return {Function} closure(msg, mulitpart) | ||
* @api private | ||
*/ | ||
Socket.prototype.onmessage = function(sock){ | ||
var self = this; | ||
return function(msg, multipart){ | ||
if (multipart) { | ||
self.emit.apply(self, ['message'].concat(msg)); | ||
} else { | ||
self.emit('message', msg); | ||
} | ||
}; | ||
}; | ||
/** | ||
* Connect to `port` at `host` and invoke `fn()`. | ||
* | ||
* Defaults `host` to localhost. | ||
* | ||
* @param {Number|String} port | ||
* @param {String} host | ||
* @param {Function} fn | ||
* @return {Socket} | ||
* @api public | ||
*/ | ||
Socket.prototype.connect = function(port, host, fn){ | ||
var self = this; | ||
if ('server' == this.type) throw new Error('cannot connect() after bind()'); | ||
if ('function' == typeof host) fn = host, host = undefined; | ||
if ('string' == typeof port) { | ||
port = url.parse(port); | ||
host = port.hostname; | ||
port = parseInt(port.port, 10); | ||
} | ||
var sock = new net.Socket; | ||
sock.setNoDelay(); | ||
this.type = 'client'; | ||
port = port; | ||
host = host || '127.0.0.1'; | ||
sock.on('error', function(err){ | ||
@@ -42,9 +244,8 @@ if ('ECONNREFUSED' != err.code) { | ||
sock.on('data', function(chunk){ | ||
self.emit('data', chunk); | ||
}); | ||
sock.on('close', function(){ | ||
self.connected = false; | ||
self.removeSocket(sock); | ||
if (self.closing) return self.emit('close'); | ||
var max = self.get('max retry timeout'); | ||
var retry = self.retry || self.get('retry timeout'); | ||
setTimeout(function(){ | ||
@@ -54,5 +255,5 @@ debug('attempting reconnect'); | ||
sock.destroy(); | ||
self.connect(self.port, self.host); | ||
self.retry = Math.min(self.retryMaxTimeout, self.retry * 1.5); | ||
}, self.retry); | ||
self.connect(port, host); | ||
self.retry = Math.min(max, retry * 1.5); | ||
}, retry); | ||
}); | ||
@@ -63,47 +264,70 @@ | ||
self.connected = true; | ||
self.retry = self.retryTimeout; | ||
self.addSocket(sock); | ||
self.retry = self.get('retry timeout'); | ||
self.emit('connect'); | ||
self.callback && self.callback(); | ||
fn && fn(); | ||
}); | ||
} | ||
debug('connect attempt %s:%s', host, port); | ||
sock.connect(port, host); | ||
return this; | ||
}; | ||
/** | ||
* Inherit from `Emitter.prototype`. | ||
* Handle connection. | ||
* | ||
* @param {Socket} sock | ||
* @api private | ||
*/ | ||
Socket.prototype.__proto__ = Emitter.prototype; | ||
Socket.prototype.onconnect = function(sock){ | ||
var self = this; | ||
var addr = sock.remoteAddress + ':' + sock.remotePort; | ||
debug('connection %s', addr); | ||
this.addSocket(sock); | ||
this.emit('connect', sock); | ||
sock.on('close', function(){ | ||
debug('disconnect %s', addr); | ||
self.emit('disconnect', sock); | ||
self.removeSocket(sock); | ||
}); | ||
}; | ||
/** | ||
* Connect to `port` at `host` and invoke `fn()`. | ||
* Bind to `port` at `host` and invoke `fn()`. | ||
* | ||
* Defaults `host` to localhost. | ||
* Defaults `host` to INADDR_ANY. | ||
* | ||
* @param {Number} port | ||
* @param {String} host | ||
* Emits: | ||
* | ||
* - `connection` when a client connects | ||
* - `disconnect` when a client disconnects | ||
* - `bind` when bound and listening | ||
* | ||
* @param {Number|String} port | ||
* @param {Function} fn | ||
* @return {Socket} | ||
* @api public | ||
*/ | ||
Socket.prototype.connect = function(port, host, fn){ | ||
if ('function' == host) fn = host, host = undefined; | ||
this.type = 'client'; | ||
this.port = port; | ||
this.host = host || '127.0.0.1'; | ||
debug('connect %s:%s', this.host, this.port); | ||
this.sock.connect(this.port, this.host); | ||
this.callback = fn; | ||
return this; | ||
}; | ||
Socket.prototype.bind = function(port, host, fn){ | ||
var self = this; | ||
if ('client' == this.type) throw new Error('cannot bind() after connect()'); | ||
if ('function' == typeof host) fn = host, host = undefined; | ||
/** | ||
* Close the socket. | ||
* | ||
* @api public | ||
*/ | ||
if ('string' == typeof port) { | ||
port = url.parse(port); | ||
host = port.hostname; | ||
port = parseInt(port.port, 10); | ||
} | ||
Socket.prototype.close = function(){ | ||
debug('close'); | ||
this.closing = true; | ||
this.sock.destroy(); | ||
this.type = 'server'; | ||
host = host || '0.0.0.0'; | ||
this.server = net.createServer(this.onconnect.bind(this)); | ||
debug('bind %s:%s', host, port); | ||
this.server.on('listening', this.emit.bind('bind')); | ||
this.server.listen(port, host, fn); | ||
return this; | ||
}; |
@@ -6,3 +6,3 @@ | ||
var Queue = require('./queue'); | ||
var Socket = require('./sock'); | ||
@@ -22,3 +22,3 @@ /** | ||
function SubSocket() { | ||
Queue.call(this); | ||
Socket.call(this); | ||
this.subscriptions = []; | ||
@@ -28,5 +28,13 @@ } | ||
/** | ||
* Inherits from `Queue.prototype`. | ||
* Inherits from `Socket.prototype`. | ||
*/ | ||
SubSocket.prototype.__proto__ = Queue.prototype; | ||
SubSocket.prototype.__proto__ = Socket.prototype; | ||
/** | ||
* Subscribers should not send messages. | ||
*/ | ||
SubSocket.prototype.send = function(){ | ||
throw new Error('subscribers should not send messages'); | ||
}; |
{ | ||
"name": "axon", | ||
"description": "High-level messaging & socket patterns implemented in pure js", | ||
"version": "0.0.3", | ||
"version": "0.1.0", | ||
"author": "TJ Holowaychuk <tj@vision-media.ca>", | ||
"dependencies": { | ||
"debug": "*" | ||
"debug": "*", | ||
"configurable": "0.0.1" | ||
}, | ||
"devDependencies": { | ||
"should": "*", | ||
"mocha": "*" | ||
"mocha": "*", | ||
"commander": "*" | ||
}, | ||
"keywords": ["zmq", "zeromq", "pubsub", "socket", "emitter", "ipc", "rpc"] | ||
} |
Sorry, the diff of this file is not supported yet
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
New author
Supply chain riskA new npm collaborator published a version of the package for the first time. New collaborators are usually benign additions to a project, but do indicate a change to the security surface area of a package.
Found 1 instance in 1 package
Non-existent author
Supply chain riskThe package was published by an npm account that no longer exists.
Found 1 instance in 1 package
No README
QualityPackage does not have a README. This may indicate a failed publish or a low quality package.
Found 1 instance in 1 package
39915
25
1258
1
405
0
2
3
3
+ Addedconfigurable@0.0.1
+ Addedconfigurable@0.0.1(transitive)