Comparing version 2.13.0 to 2.14.0
@@ -0,1 +1,8 @@ | ||
2.14.0 / 2015-11-20 | ||
=================== | ||
* A socket.read() method was added to retrieve messages while paused [sshutovskyi] | ||
* socket.send() now takes a callback as 3rd argument which is called once the message is sent [ronkorving] | ||
* Now tested on Node.js 0.8, 0.10, 0.12, 4 and 5 [ronkorving] | ||
2.13.0 / 2015-08-26 | ||
@@ -8,3 +15,2 @@ =================== | ||
2.12.0 / 2015-07-10 | ||
@@ -20,3 +26,2 @@ =================== | ||
2.11.1 / 2015-05-21 | ||
@@ -23,0 +28,0 @@ =================== |
@@ -242,2 +242,22 @@ /** | ||
Socket.prototype.read = function() { | ||
var message = [], flags; | ||
if (this._zmq.state !== zmq.STATE_READY) { | ||
return null; | ||
} | ||
flags = this._zmq.getsockopt(zmq.ZMQ_EVENTS); | ||
if (flags & zmq.ZMQ_POLLIN) { | ||
do { | ||
message.push(this._zmq.recv()); | ||
} while (this._zmq.getsockopt(zmq.ZMQ_RCVMORE)); | ||
return message; | ||
} | ||
return null; | ||
} | ||
/** | ||
@@ -481,3 +501,4 @@ * Set `opt` to `val`. | ||
* @param {String|Buffer|Array} msg | ||
* @param {Number} flags | ||
* @param {Number} [flags] | ||
* @param {Function} [cb] | ||
* @return {Socket} for chaining | ||
@@ -487,3 +508,3 @@ * @api public | ||
Socket.prototype.send = function(msg, flags) { | ||
Socket.prototype.send = function(msg, flags, cb) { | ||
flags = flags | 0; | ||
@@ -494,2 +515,3 @@ | ||
var part = msg[i]; | ||
var isLast = i === len - 1; | ||
@@ -500,3 +522,3 @@ if (!Buffer.isBuffer(part)) { | ||
this._outgoing.push([part, i < len - 1 ? zmq.ZMQ_SNDMORE : flags]); | ||
this._outgoing.push([part, isLast ? flags : zmq.ZMQ_SNDMORE, isLast ? cb : undefined]); | ||
} | ||
@@ -508,3 +530,3 @@ } else { | ||
this._outgoing.push([msg, flags]); | ||
this._outgoing.push([msg, flags, cb]); | ||
} | ||
@@ -525,2 +547,3 @@ | ||
// The workhorse that does actual send and receive operations. | ||
@@ -530,3 +553,3 @@ // This helper is called from `send` above, and in response to | ||
Socket.prototype._flush = function() { | ||
var flags, args, emitArgs; | ||
var flags, args, emitArgs, errorSent; | ||
@@ -551,19 +574,15 @@ // Don't allow recursive flush invocation as it can lead to stack | ||
this._flushing = false; | ||
return | ||
return; | ||
}; | ||
if (flags & zmq.ZMQ_POLLIN) { | ||
emitArgs = ['message']; | ||
do { | ||
emitArgs.push(this._zmq.recv()); | ||
} while (this._zmq.getsockopt(zmq.ZMQ_RCVMORE)); | ||
var emitArgs = this.read(); | ||
if (emitArgs) { | ||
emitArgs.unshift('message'); | ||
// Handle received message immediately to prevent memory leak in driver | ||
this.emit.apply(this, emitArgs); | ||
} | ||
if (this._zmq.state !== zmq.STATE_READY) { | ||
this._flushing = false; | ||
return; | ||
} | ||
if (this._zmq.state !== zmq.STATE_READY) { | ||
this._flushing = false; | ||
return; | ||
} | ||
@@ -586,4 +605,15 @@ | ||
this._zmq.send(args[0], args[1]); | ||
if (args[2]) { | ||
args[2].call(this); | ||
} | ||
} catch (sendError) { | ||
// More chunks were to follow, which we should now drop. | ||
errorSent = false; | ||
if (args[2]) { | ||
errorSent = true; | ||
args[2].call(this, sendError); | ||
} | ||
// If more chunks were to follow, we need to drop all of them. | ||
// This loop will pull off the items up until and including | ||
@@ -594,5 +624,13 @@ // the first item that is not flagged SNDMORE. | ||
args = this._outgoing.shift(); | ||
if (args && args[2]) { | ||
errorSent = true; | ||
args[2].call(this, sendError); | ||
} | ||
} | ||
this._flushing = false; | ||
throw sendError; | ||
if (!errorSent) { | ||
this._flushing = false; | ||
throw sendError; | ||
} | ||
} | ||
@@ -599,0 +637,0 @@ } |
{ | ||
"name": "zmq", | ||
"version": "2.13.0", | ||
"version": "2.14.0", | ||
"description": "Bindings for node.js and io.js to ZeroMQ", | ||
@@ -5,0 +5,0 @@ "main": "index", |
@@ -115,2 +115,28 @@ var zmq = require('..') | ||
it('should call send() callbacks', function(done){ | ||
var received = 0; | ||
var callbacks = 0; | ||
function cb() { | ||
callbacks += 1; | ||
} | ||
pull.on('message', function () { | ||
received += 1; | ||
if (received === 4) { | ||
callbacks.should.equal(received); | ||
done(); | ||
} | ||
}); | ||
pull.bind('inproc://stuff_ssmm', function(){ | ||
push.connect('inproc://stuff_ssmm'); | ||
push.send('hello', null, cb); | ||
push.send('hello', null, cb); | ||
push.send('hello', null, cb); | ||
push.send(['hello', 'world'], null, cb); | ||
}); | ||
}); | ||
}); |
@@ -75,4 +75,30 @@ var zmq = require('..') | ||
it('should be able to read messages after pause()', function(done){ | ||
var push = zmq.socket('push') | ||
, pull = zmq.socket('pull'); | ||
var addr = "inproc://pause_stuff"; | ||
var messages = ['bar', 'foo']; | ||
pull.bind(addr, function(){ | ||
push.connect(addr); | ||
pull.pause() | ||
messages.forEach(function(message){ | ||
push.send(message); | ||
}); | ||
messages.forEach(function(message){ | ||
pull.read().toString().should.eql(message); | ||
}); | ||
}); | ||
setTimeout(function (){ | ||
pull.close(); | ||
push.close(); | ||
done(); | ||
}, 100); | ||
}); | ||
it('should emit messages after resume()', function(done){ | ||
@@ -79,0 +105,0 @@ var push = zmq.socket('push') |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
1206567
58
2618