Comparing version 0.4.6 to 0.5.0
@@ -9,2 +9,3 @@ | ||
.option('-s, --size <n>', 'message size in bytes [1024]', parseInt) | ||
.option('-d, --duration <n>', 'duration of test [5000]', parseInt) | ||
.parse(process.argv) | ||
@@ -14,2 +15,3 @@ | ||
sock.bind(3000); | ||
sock.on('disconnect', process.exit); | ||
console.log('pub bound'); | ||
@@ -27,2 +29,2 @@ | ||
more(); | ||
more(); |
@@ -9,2 +9,3 @@ | ||
.option('-s, --size <n>', 'message size in bytes [1024]', parseInt) | ||
.option('-d, --duration <n>', 'duration of test [5000]', parseInt) | ||
.parse(process.argv) | ||
@@ -53,3 +54,3 @@ | ||
process.on('SIGINT', function(){ | ||
function done(){ | ||
var ms = Date.now() - start; | ||
@@ -65,2 +66,5 @@ var avg = n / (ms / 1000); | ||
process.exit(); | ||
}); | ||
} | ||
process.on('SIGINT', done); | ||
setTimeout(done, program.duration); |
0.4.6 / 2012-11-15 | ||
0.5.0 / 2013-01-01 | ||
================== | ||
* add HWM support. Closes #19 | ||
* add ability to pass a callback in to the Socket.close method. | ||
* update benchmarks. Closes #72 | ||
* remove batching | ||
0.4.6 / 2012-11-15 | ||
================== | ||
* fix round-robin write to unwritable socket | ||
0.4.5 / 2012-10-30 | ||
0.4.5 / 2012-10-30 | ||
================== | ||
@@ -15,3 +23,3 @@ | ||
0.4.4 / 2012-10-29 | ||
0.4.4 / 2012-10-29 | ||
================== | ||
@@ -21,3 +29,3 @@ | ||
0.4.3 / 2012-10-27 | ||
0.4.3 / 2012-10-27 | ||
================== | ||
@@ -28,3 +36,3 @@ | ||
0.4.2 / 2012-10-18 | ||
0.4.2 / 2012-10-18 | ||
================== | ||
@@ -36,3 +44,3 @@ | ||
0.4.1 / 2012-10-16 | ||
0.4.1 / 2012-10-16 | ||
================== | ||
@@ -47,3 +55,3 @@ | ||
0.4.0 / 2012-10-12 | ||
0.4.0 / 2012-10-12 | ||
================== | ||
@@ -58,3 +66,3 @@ | ||
0.3.2 / 2012-10-08 | ||
0.3.2 / 2012-10-08 | ||
================== | ||
@@ -64,3 +72,3 @@ | ||
0.3.1 / 2012-10-08 | ||
0.3.1 / 2012-10-08 | ||
================== | ||
@@ -70,3 +78,3 @@ | ||
0.3.0 / 2012-10-05 | ||
0.3.0 / 2012-10-05 | ||
================== | ||
@@ -78,3 +86,3 @@ | ||
0.2.0 / 2012-09-27 | ||
0.2.0 / 2012-09-27 | ||
================== | ||
@@ -87,3 +95,3 @@ | ||
0.1.0 / 2012-09-24 | ||
0.1.0 / 2012-09-24 | ||
================== | ||
@@ -102,3 +110,3 @@ | ||
0.0.3 / 2012-07-14 | ||
0.0.3 / 2012-07-14 | ||
================== | ||
@@ -105,0 +113,0 @@ |
@@ -6,3 +6,2 @@ | ||
exports.Batch = require('./batch'); | ||
exports.Parser = require('./parser'); | ||
@@ -54,2 +53,2 @@ exports.Message = require('./message'); | ||
return new fn(options); | ||
}; | ||
}; |
@@ -11,8 +11,11 @@ | ||
* | ||
* Provides an `enqueue` method to the `sock`. Messages | ||
* Provides an `.enqueue()` method to the `sock`. Messages | ||
* passed to `enqueue` will be buffered until the next | ||
* `connect` event is emitted. | ||
* | ||
* TODO: HWM via `opts`? | ||
* Emits: | ||
* | ||
* - `drop` (msg) when a message is dropped | ||
* - `flush` (msgs) when the queue is flushed | ||
* | ||
* @param {Object} options | ||
@@ -40,6 +43,10 @@ * @api private | ||
debug('flush %d messages', len); | ||
for (var i = 0; i < len; ++i) { | ||
this.send(buf[i]); | ||
} | ||
var prev = buf; | ||
buf = []; | ||
sock.emit('flush', prev); | ||
}); | ||
@@ -52,6 +59,16 @@ | ||
sock.enqueue = function(msg){ | ||
var hwm = sock.settings.hwm; | ||
if (buf.length >= hwm) return drop(msg); | ||
buf.push(msg); | ||
}; | ||
/** | ||
* Drop the given `msg`. | ||
*/ | ||
function drop(msg) { | ||
debug('drop'); | ||
sock.emit('drop', msg); | ||
} | ||
}; | ||
}; | ||
}; |
/** | ||
* Deps. | ||
*/ | ||
var slice = require('../utils').slice; | ||
/** | ||
* Round-robin plugin. | ||
@@ -31,3 +37,3 @@ * | ||
/** | ||
* Sends `msg` to all connected peers. | ||
* Sends `msg` to all connected peers round-robin. | ||
*/ | ||
@@ -38,15 +44,8 @@ | ||
, len = socks.length | ||
, sock = socks[n++ % len] | ||
, args = []; | ||
, sock = socks[n++ % len]; | ||
if (Array.isArray(msg)) { | ||
args = msg; | ||
} else { | ||
for (var i = 0; i < arguments.length; ++i) { | ||
args[i] = arguments[i]; | ||
} | ||
} | ||
if (arguments.length > 1) msg = slice(arguments); | ||
if (sock && sock.writable) { | ||
sock.write(this.pack(args)); | ||
sock.write(this.pack(msg)); | ||
} else { | ||
@@ -58,2 +57,2 @@ fallback(msg); | ||
}; | ||
}; | ||
}; |
@@ -7,3 +7,3 @@ | ||
var Socket = require('./sock') | ||
, Batch = require('../batch'); | ||
, slice = require('../utils').slice; | ||
@@ -24,12 +24,2 @@ /** | ||
Socket.call(this); | ||
var self = this; | ||
this.n = 0; | ||
this.filters = []; | ||
this.batch = new Batch; | ||
this.set('batch max', 10); | ||
this.set('batch ttl', 100); | ||
process.nextTick(function(){ | ||
var ttl = self.get('batch ttl'); | ||
self.batchTimer = setInterval(self.flushBatch.bind(self), ttl); | ||
}); | ||
} | ||
@@ -44,16 +34,17 @@ | ||
/** | ||
* Flush the batch. | ||
* Send `msg` to all established peers. | ||
* | ||
* @api private | ||
* @param {Mixed} msg | ||
* @api public | ||
*/ | ||
PubSocket.prototype.flushBatch = function(){ | ||
if (this.batch.empty()) return; | ||
PubSocket.prototype.send = function(msg){ | ||
var socks = this.socks | ||
, len = socks.length | ||
, msg = this.batch.toBuffer() | ||
, sock; | ||
for (var i = 0; i < len; ++i) { | ||
if (arguments.length > 1) msg = slice(arguments); | ||
msg = this.pack(msg); | ||
for (var i = 0; i < len; i++) { | ||
sock = socks[i]; | ||
@@ -63,39 +54,3 @@ if (sock.writable) sock.write(msg); | ||
this.batch.clear(); | ||
this.n = 0; | ||
}; | ||
/** | ||
* Send `msg` to all established peers. | ||
* | ||
* Messages will be batched rather then sent immediately | ||
* until the batch reaches the option `batch max`. | ||
* | ||
* @param {Mixed} msg | ||
* @api public | ||
*/ | ||
PubSocket.prototype.send = function(msg){ | ||
if (++this.n == this.get('batch max')) return this.flushBatch(); | ||
if (Array.isArray(msg)) { | ||
this.batch.add(this.pack(msg)); | ||
} else { | ||
var args = []; | ||
for (var i = 0; i < arguments.length; ++i) { | ||
args[i] = arguments[i]; | ||
} | ||
this.batch.add(this.pack(args)); | ||
} | ||
return this; | ||
}; | ||
/** | ||
* Close the pub socket. | ||
* | ||
* @api public | ||
*/ | ||
PubSocket.prototype.close = function(){ | ||
clearInterval(this.batchTimer); | ||
return Socket.prototype.close.call(this); | ||
}; |
@@ -8,2 +8,3 @@ | ||
, queue = require('../plugins/queue') | ||
, slice = require('../utils').slice | ||
, debug = require('debug')('axon:req'); | ||
@@ -82,12 +83,6 @@ | ||
, sock = socks[this.n++ % len] | ||
, args = []; | ||
, args = Array.isArray(msg) | ||
? msg | ||
: slice(arguments); | ||
if (Array.isArray(msg)) { | ||
args = msg; | ||
} else { | ||
for (var i = 0; i < arguments.length; ++i) { | ||
args[i] = arguments[i]; | ||
} | ||
} | ||
if (sock) { | ||
@@ -94,0 +89,0 @@ if ('function' == typeof args[args.length - 1]) { |
@@ -56,2 +56,3 @@ | ||
this.format('none'); | ||
this.set('hwm', Infinity); | ||
this.set('identity', String(process.pid)); | ||
@@ -153,3 +154,3 @@ this.set('retry timeout', 100); | ||
Socket.prototype.close = function(){ | ||
Socket.prototype.close = function(fn){ | ||
debug('closing'); | ||
@@ -161,3 +162,3 @@ this.closing = true; | ||
this.server.on('close', this.emit.bind(this, 'close')); | ||
this.server.close(); | ||
this.server.close(fn); | ||
} | ||
@@ -164,0 +165,0 @@ }; |
{ | ||
"name": "axon", | ||
"description": "High-level messaging & socket patterns implemented in pure js", | ||
"version": "0.4.6", | ||
"version": "0.5.0", | ||
"author": "TJ Holowaychuk <tj@vision-media.ca>", | ||
@@ -6,0 +6,0 @@ "dependencies": { |
@@ -28,2 +28,4 @@ # Axon | ||
- `bind` when the server is bound | ||
- `drop` (msg) when a message is dropped due to the HWM | ||
- `flush` (msgs) queued when messages are flushed on connection | ||
@@ -138,3 +140,3 @@ ## Patterns | ||
sock.send(img, function(res){ | ||
}); | ||
@@ -169,3 +171,3 @@ ``` | ||
sock.send('resize', img, function(res){ | ||
}); | ||
@@ -237,8 +239,4 @@ ``` | ||
- `retry max timeout` - the cap for retry timeout length in milliseconds [5000] | ||
- `hwm` - the high water mark threshold for queues [Infinity] | ||
PubSockets additionally have options for batching: | ||
- `batch max` - Max amount of messages to buffer in memory [10]. | ||
- `batch ttl` - Amount of time in milliesconds to buffer messages before sending [100]. | ||
## Binding / Connecting | ||
@@ -300,50 +298,54 @@ | ||
Preliminary benchmarks on my Macbook Pro: | ||
Preliminary benchmarks on my Macbook Pro based on 10 messages | ||
per tick as a realistic production application would likely have | ||
even less than this. "better" numbers may be acheived with batching | ||
and a larger messages/tick count however this is not realistic. | ||
15 byte messages: | ||
64 byte messages: | ||
``` | ||
min: 280 ops/s | ||
mean: 472,109 ops/s | ||
median: 477,309 ops/s | ||
total: 10,758,780 ops in 24.633s | ||
through: 6.75 mb/s | ||
min: 47,169 ops/s | ||
mean: 465,127 ops/s | ||
median: 500,000 ops/s | ||
total: 2,325,636 ops in 5s | ||
through: 28.39 mb/s | ||
``` | ||
64 byte messages: | ||
1k messages: | ||
``` | ||
min: 218 ops/s | ||
mean: 462,286 ops/s | ||
median: 461,512 ops/s | ||
total: 6,455,160 ops in 15.488s | ||
through: 28.21 mb/s | ||
min: 48,076 ops/s | ||
mean: 120,253 ops/s | ||
median: 121,951 ops/s | ||
total: 601,386 ops in 5.001s | ||
through: 117.43 mb/s | ||
``` | ||
1k messages: | ||
8k messages: | ||
``` | ||
min: 280 ops/s | ||
mean: 382,829 ops/s | ||
median: 382,764 ops/s | ||
total: 3,333,581 ops in 15.126s | ||
through: 373.85 mb/s | ||
min: 36,496 ops/s | ||
mean: 53,194 ops/s | ||
median: 50,505 ops/s | ||
total: 266,506 ops in 5.01s | ||
through: 405.84 mb/s | ||
``` | ||
```` | ||
8k messages: | ||
32k messages: | ||
``` | ||
min: 392 ops/s | ||
mean: 92,778 ops/s | ||
median: 87,943 ops/s | ||
total: 1,257,430 ops in 21.735s | ||
through: 724.82 mb/s | ||
min: 12,077 ops/s | ||
mean: 14,792 ops/s | ||
median: 16,233 ops/s | ||
total: 74,186 ops in 5.015s | ||
through: 462.28 mb/s | ||
```` | ||
``` | ||
@@ -376,24 +378,3 @@ ## What's it good for? | ||
(The MIT License) | ||
MIT | ||
Copyright (c) 2012 TJ Holowaychuk <tj@vision-media.ca> | ||
Permission is hereby granted, free of charge, to any person obtaining | ||
a copy of this software and associated documentation files (the | ||
'Software'), to deal in the Software without restriction, including | ||
without limitation the rights to use, copy, modify, merge, publish, | ||
distribute, sublicense, and/or sell copies of the Software, and to | ||
permit persons to whom the Software is furnished to do so, subject to | ||
the following conditions: | ||
The above copyright notice and this permission notice shall be | ||
included in all copies or substantial portions of the Software. | ||
THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND, | ||
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF | ||
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. | ||
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY | ||
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, | ||
TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE | ||
SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. | ||
Sorry, the diff of this file is not supported yet
41318
1256
375