Comparing version 0.1.3 to 0.1.4
// pub/sub benchmark script credit: https://github.com/tj/axon/tree/master/benchmark | ||
var fs = require('fs'); | ||
var duplexify = require('duplexify') | ||
var socketmq = require('../') | ||
@@ -5,0 +4,0 @@ humanize = require('humanize-number'); |
@@ -6,2 +6,6 @@ // pub/sub benchmark script credit: https://github.com/tj/axon/tree/master/benchmark | ||
var smq = socketmq.bind('tcp://0.0.0.0:6363') | ||
// var msgpack = require('msgpack') | ||
// smq.setMsgEncoder(msgpack) | ||
smq.on('bind', function() { | ||
@@ -8,0 +12,0 @@ console.log('pub bound'); |
@@ -8,2 +8,5 @@ // pub/sub benchmark script credit: https://github.com/tj/axon/tree/master/benchmark | ||
// var msgpack = require('msgpack') | ||
// smq.setMsgEncoder(msgpack) | ||
var n = 0; | ||
@@ -10,0 +13,0 @@ var ops = 5000; |
@@ -7,39 +7,70 @@ var amp = require('amp') | ||
exports.B_PUB = new Buffer(exports.PUB) | ||
exports.B_REQ = new Buffer(exports.REQ) | ||
exports.B_REP = new Buffer(exports.REP) | ||
var TYPES = exports.TYPES = {} | ||
TYPES[exports.PUB] = Buffer(exports.PUB) | ||
TYPES[exports.REQ] = Buffer(exports.REQ) | ||
TYPES[exports.REP] = Buffer(exports.REP) | ||
var jsonEncode = JSON.stringify | ||
var jsonDecode = JSON.parse | ||
exports.json = { | ||
encode: function(type, event, msg, id) { | ||
var args = [TYPES[type], Buffer(event)] | ||
if (msg) { | ||
args[2] = Buffer(jsonEncode(msg)) | ||
} | ||
if (id) { | ||
args[3] = Buffer(id) | ||
} | ||
return amp.encode(args) | ||
}, | ||
decode: function(buf) { | ||
var decoded = amp.decode(buf) | ||
// message type | ||
decoded[0] = decoded[0].toString() | ||
// event name | ||
decoded[1] = decoded[1].toString() | ||
// message if any | ||
if (decoded[2]) { | ||
decoded[2] = jsonDecode(decoded[2]) | ||
} | ||
// message id | ||
if (decoded[3]) { | ||
decoded[3] = decoded[3].toString() | ||
} | ||
return decoded | ||
} | ||
} | ||
exports.encode = function(msgEncode, type, event, msg, id) { | ||
var args = [type, new Buffer(event)] | ||
var args = [type, event] | ||
if (msg) { | ||
args[2] = msgEncode(msg) | ||
args[2] = msg | ||
} | ||
if (id) { | ||
args[3] = new Buffer(id) | ||
args[3] = id | ||
} | ||
return amp.encode(args) | ||
return amp.encode([msgEncode(args)]) | ||
} | ||
exports.decode = function(msgDecode, buf) { | ||
var decoded = amp.decode(buf) | ||
// message type | ||
decoded[0] = decoded[0].toString() | ||
// event name | ||
decoded[1] = decoded[1].toString() | ||
// message if any | ||
if (decoded[2]) { | ||
decoded[2] = msgDecode(decoded[2]) | ||
if (decoded) { | ||
return msgDecode(decoded[0]) | ||
} else { | ||
return null | ||
} | ||
} | ||
// message id | ||
if (decoded[3]) { | ||
decoded[3] = decoded[3].toString() | ||
} | ||
return decoded | ||
} | ||
exports.StreamParser = amp.Stream |
@@ -10,5 +10,4 @@ var util = require('util') | ||
var B_PUB = Message.B_PUB | ||
var B_REQ = Message.B_REQ | ||
var B_REP = Message.B_REP | ||
var wireEncode = Message.encode | ||
var wireDecode = Message.decode | ||
@@ -41,18 +40,23 @@ /** | ||
Socket.prototype.setMsgEncoder = function(encode, decode) { | ||
var type = typeof encode | ||
if (encode && 'function' !== type && 'object' === type) { | ||
// an encoder object | ||
decode = encode.decode || encode.unpack || decode | ||
encode = encode.encode || encode.pack || encode | ||
} | ||
if (encode) { | ||
this.msgEncode = encode | ||
this.encode = function(type, event, msg, id) { | ||
return wireEncode(encode, type, event, msg, id) | ||
} | ||
} else { | ||
encode = JSON.stringify | ||
this.msgEncode = function(msg) { | ||
return Buffer(encode(msg)) | ||
} | ||
this.encode = Message.json.encode | ||
} | ||
if (decode) { | ||
this.msgDecode = decode | ||
this.decode = function(buf) { | ||
return wireDecode(decode, buf) | ||
} | ||
} else { | ||
decode = JSON.parse | ||
this.msgDecode = function(buf) { | ||
return decode(buf) | ||
} | ||
this.decode = Message.json.decode | ||
} | ||
@@ -62,3 +66,3 @@ } | ||
Socket.prototype.pub = function(event, msg) { | ||
var buf = Message.encode(this.msgEncode, B_PUB, event, msg) | ||
var buf = this.encode(PUB, event, msg) | ||
this.queue.pub(send, this.streams, buf) | ||
@@ -73,3 +77,3 @@ } | ||
var q = this.queue.req(this.streams, callback) | ||
var buf = Message.encode(this.msgEncode, B_REQ, event, msg, q.id) | ||
var buf = this.encode(REQ, event, msg, q.id) | ||
send(q.stream, buf) | ||
@@ -83,3 +87,3 @@ } | ||
Socket.prototype.onMessage = function(data, stream) { | ||
var decoded = Message.decode(this.msgDecode, data) | ||
var decoded = this.decode(data) | ||
var type = decoded[0] | ||
@@ -97,5 +101,5 @@ var event = decoded[1] | ||
// Make reply function | ||
var encode = this.msgEncode | ||
var encode = this.encode | ||
var reply = function(msg) { | ||
var buf = Message.encode(encode, B_REP, event, msg, inboxId) | ||
var buf = encode(REP, event, msg, inboxId) | ||
send(stream, buf) | ||
@@ -102,0 +106,0 @@ } |
{ | ||
"name": "socketmq", | ||
"version": "0.1.3", | ||
"version": "0.1.4", | ||
"description": "Lightweight stream-oriented messaging library for node.", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
32209
678