Comparing version 0.1.2 to 0.1.3
@@ -25,4 +25,4 @@ /* | ||
function next_request(context) { | ||
if (context.receiver.remote.attach.source.address) { | ||
sender.send({properties:{reply_to:context.receiver.remote.attach.source.address}, body:requests[0]}) | ||
if (context.receiver.source.address) { | ||
sender.send({properties:{reply_to:context.receiver.source.address}, body:requests[0]}) | ||
} | ||
@@ -32,4 +32,4 @@ } | ||
container.on('connection_open', function (context) { | ||
sender = context.connection.attach_sender('examples'); | ||
context.connection.attach_receiver({source:{dynamic:true}}); | ||
sender = context.connection.open_sender('examples'); | ||
context.connection.open_receiver({source:{dynamic:true}}); | ||
}); | ||
@@ -36,0 +36,0 @@ container.on('receiver_open', function (context) { |
@@ -28,3 +28,3 @@ /* | ||
server.once('listening', function () { | ||
container.connect({'port':8888}).attach_sender('examples'); | ||
container.connect({'port':8888}).open_sender('examples'); | ||
}); | ||
@@ -31,0 +31,0 @@ server.once('connection', function () { |
@@ -18,4 +18,4 @@ /* | ||
container.on('connection_open', function (context) { | ||
context.connection.attach_receiver('examples'); | ||
context.connection.attach_sender('examples'); | ||
context.connection.open_receiver('examples'); | ||
context.connection.open_sender('examples'); | ||
}); | ||
@@ -22,0 +22,0 @@ container.on('message', function (context) { |
@@ -37,2 +37,2 @@ /* | ||
container.connect({'port':args.port}).attach_receiver({source:{address:args.node,distribution_mode:'copy'}}); | ||
container.connect({'port':args.port}).open_receiver({source:{address:args.node,distribution_mode:'copy'}}); |
@@ -96,3 +96,3 @@ /* | ||
var connection = container.connect(connect_options); | ||
sender = connection.attach_sender('examples'); | ||
connection.attach_receiver('examples'); | ||
sender = connection.open_sender('examples'); | ||
connection.open_receiver('examples'); |
@@ -41,12 +41,12 @@ /* | ||
container.on('sender_open', function (context) { | ||
subscribe(context.connection.remote.open.container_id, context.sender); | ||
subscribe(context.connection.container_id, context.sender); | ||
}); | ||
container.on('sender_close', function (context) { | ||
unsubscribe(context.connection.remote.open.container_id); | ||
unsubscribe(context.connection.container_id); | ||
}); | ||
container.on('connection_close', function (context) { | ||
unsubscribe(context.connection.remote.open.container_id); | ||
unsubscribe(context.connection.container_id); | ||
}); | ||
container.on('disconnected', function (context) { | ||
unsubscribe(context.connection.remote.open.container_id); | ||
unsubscribe(context.connection.container_id); | ||
}); | ||
@@ -56,3 +56,3 @@ | ||
if (expected === 0 || received < expected) { | ||
var name = context.connection.remote.open.container_id; | ||
var name = context.connection.container_id; | ||
console.log('echoed ' + context.message.body + ' to ' + name); | ||
@@ -59,0 +59,0 @@ listeners[name].send(context.message); |
@@ -22,3 +22,3 @@ /* | ||
container.on('connection_open', function (context) { | ||
context.connection.attach_receiver('examples'); | ||
context.connection.open_receiver('examples'); | ||
}); | ||
@@ -25,0 +25,0 @@ |
@@ -41,2 +41,2 @@ /* | ||
container.connect({'port':args.port}).attach_receiver(args.node); | ||
container.connect({'port':args.port}).open_receiver(args.node); |
@@ -44,2 +44,2 @@ /* | ||
container.connect({'port':args.port}).attach_sender(args.node); | ||
container.connect({'port':args.port}).open_sender(args.node); |
@@ -73,3 +73,5 @@ /* | ||
} else { | ||
if (signal === null) assert.equal(code, 0); | ||
if (signal === null && !process.version.match(/v0\.10\.\d+/)) { | ||
assert.equal(code, 0); | ||
} | ||
if (prog.verifier) { | ||
@@ -76,0 +78,0 @@ prog.verifier(prog.actual_output); |
@@ -34,2 +34,12 @@ /* | ||
function ConnectionError(message, condition, connection) { | ||
Error.call(this); | ||
Error.captureStackTrace(this, this.constructor); | ||
this.message = message; | ||
this.condition = condition; | ||
this.description = message; | ||
this.connection = connection; | ||
} | ||
require('util').inherits(ConnectionError, Error); | ||
function get_socket_id(socket) { | ||
@@ -104,2 +114,18 @@ if (socket.get_id_string) return socket.get_id_string(); | ||
var aliases = ["container_id", | ||
"hostname", | ||
"max_frame_size", | ||
"channel_max", | ||
"idle_time_out", | ||
"outgoing_locales", | ||
"incoming_locales", | ||
"offered_capabilities", | ||
"desired_capabilities", | ||
"properties"]; | ||
function remote_property_shortcut(name) { | ||
return function() { return this.remote.open ? this.remote.open[name] : undefined; }; | ||
} | ||
var conn_counter = 1; | ||
@@ -153,2 +179,8 @@ | ||
this.default_sender = undefined; | ||
for (var i in aliases) { | ||
var f = aliases[i]; | ||
Object.defineProperty(this, f, { get: remote_property_shortcut(f) }); | ||
} | ||
Object.defineProperty(this, 'error', { get: function() { return this.remote.close ? this.remote.close.error : undefined; }}); | ||
}; | ||
@@ -226,3 +258,3 @@ | ||
this.socket.on('data', this.input.bind(this)); | ||
this.socket.on('error', this.error.bind(this)); | ||
this.socket.on('error', this.on_error.bind(this)); | ||
this.socket.on('end', this.eof.bind(this)); | ||
@@ -302,8 +334,7 @@ | ||
if (error) { | ||
//TODO: invoke connection_close regardless of whether connection_error is handled | ||
//TODO: example for error handling | ||
if (!this.dispatch('connection_error', this._context())) { | ||
if (!this.dispatch('connection_close', this._context())) { | ||
console.log('error: ' + JSON.stringify(error)); | ||
} | ||
var handled = this.dispatch('connection_error', this._context()); | ||
handled = this.dispatch('connection_close', this._context()) || handled; | ||
if (!handled) { | ||
EventEmitter.prototype.emit.call(this.container, 'error', new ConnectionError(error.description, error.condition, this)); | ||
} | ||
@@ -349,3 +380,7 @@ return true; | ||
if (this.local.open.idle_time_out) this.heartbeat_in = setTimeout(this.idle.bind(this), this.local.open.idle_time_out); | ||
if (this.transport.has_writes_pending()) this.output(); | ||
if (this.transport.has_writes_pending()) { | ||
this.output(); | ||
} else if (this.is_closed() && this.state.has_settled()) { | ||
this.socket.end(); | ||
} | ||
}; | ||
@@ -361,3 +396,3 @@ | ||
Connection.prototype.error = function (e) { | ||
Connection.prototype.on_error = function (e) { | ||
console.log('[' + this.options.id + '] error: ' + e); | ||
@@ -372,2 +407,4 @@ this._disconnected(); | ||
Connection.prototype._disconnected = function () { | ||
if (this.heartbeat_out) clearTimeout(this.heartbeat_out); | ||
if (this.heartbeat_in) clearTimeout(this.heartbeat_in); | ||
if (!this.is_closed() && this.scheduled_reconnect == undefined) { | ||
@@ -392,3 +429,4 @@ if (!this.dispatch('disconnected', this._context())) { | ||
}; | ||
Connection.prototype.close = function () { | ||
Connection.prototype.close = function (error) { | ||
if (error) this.local.close.error = error; | ||
if (this.state.close()) { | ||
@@ -395,0 +433,0 @@ this._register(); |
108
lib/link.js
@@ -38,5 +38,15 @@ /* | ||
function auto_accept(context) { | ||
context.delivery.update(true, message.accepted().described()); | ||
context.delivery.update(undefined, message.accepted().described()); | ||
}; | ||
function LinkError(message, condition, link) { | ||
Error.call(this); | ||
Error.captureStackTrace(this, this.constructor); | ||
this.message = message; | ||
this.condition = condition; | ||
this.description = message; | ||
this.link = link; | ||
} | ||
require('util').inherits(LinkError, Error); | ||
var EventEmitter = require('events').EventEmitter; | ||
@@ -50,4 +60,5 @@ | ||
EventEmitter.prototype.emit.apply(this, arguments); | ||
return true; | ||
} else { | ||
this.session.dispatch.apply(this.session, arguments); | ||
return this.session.dispatch.apply(this.session, arguments); | ||
} | ||
@@ -75,3 +86,4 @@ }; | ||
}; | ||
link.close = function() { | ||
link.close = function(error) { | ||
if (error) this.local.detach.error = error; | ||
this.local.detach.closed = true; | ||
@@ -123,2 +135,6 @@ if (this.state.close()) { | ||
link.prefix_event = function (event) { | ||
return (this.local.attach.role ? 'receiver_' : 'sender_') + event; | ||
} | ||
link.on_detach = function (frame) { | ||
@@ -128,3 +144,12 @@ if (this.state.remote_closed()) { | ||
this.close(); | ||
this.dispatch(this.local.attach.role ? 'receiver_close' : 'sender_close', this._context()); | ||
var error = this.remote.detach.error; | ||
if (error) { | ||
var handled = this.dispatch(this.prefix_event('error'), this._context()); | ||
handled = this.dispatch(this.prefix_event('close'), this._context()) || handled; | ||
if (!handled) { | ||
EventEmitter.prototype.emit.call(this.connection.container, 'error', new LinkError(error.description, error.condition, this)); | ||
} | ||
} else { | ||
this.dispatch(this.prefix_event('close'), this._context()); | ||
} | ||
} else { | ||
@@ -137,2 +162,3 @@ throw Error('Detach already received'); | ||
switch (name) { | ||
case 'name': | ||
case 'handle': | ||
@@ -147,2 +173,16 @@ case 'role': | ||
var aliases = ["snd_settle_mode", | ||
"rcv_settle_mode", | ||
"source", | ||
"target", | ||
"max_message_size", | ||
"offered_capabilities", | ||
"desired_capabilities", | ||
"properties"]; | ||
function remote_property_shortcut(name) { | ||
return function() { return this.remote.attach ? this.remote.attach[name] : undefined; } | ||
} | ||
link.init = function (session, name, local_handle, opts, is_receiver) { | ||
@@ -154,3 +194,3 @@ this.session = session; | ||
this.state = new EndpointState(); | ||
this.issue_flow = false;//currently only used by receiver | ||
this.issue_flow = false; | ||
this.local = {'handle': local_handle}; | ||
@@ -168,2 +208,7 @@ this.local.attach = frames.attach({'handle':local_handle,'name':name, role:is_receiver}); | ||
this.observers = new EventEmitter(); | ||
for (var i in aliases) { | ||
var f = aliases[i]; | ||
Object.defineProperty(this, f, { get: remote_property_shortcut(f) }); | ||
} | ||
Object.defineProperty(this, 'error', { get: function() { return this.remote.detach ? this.remote.detach.error : undefined; }}); | ||
}; | ||
@@ -199,2 +244,4 @@ link.reset = function() { | ||
this.init(session, name, local_handle, opts, false); | ||
this._draining = false; | ||
this._drained = false; | ||
this.local.attach.initial_delivery_count = 0; | ||
@@ -205,5 +252,28 @@ this.tag = 0; | ||
} | ||
var sender = this; | ||
if (this.get_option('treat_modified_as_released', true)) { | ||
this.observers.on('modified', function (context) { | ||
sender.dispatch('released', context); | ||
}); | ||
} | ||
}; | ||
Sender.prototype = Object.create(link); | ||
Sender.prototype.constructor = Sender; | ||
Sender.prototype._get_drain = function () { | ||
if (this._draining && this._drained && this.credit) { | ||
while (this.credit) { | ||
++this.delivery_count; | ||
--this.credit; | ||
} | ||
return true; | ||
} else { | ||
return false; | ||
} | ||
} | ||
Sender.prototype.set_drained = function (drained) { | ||
this._drained = drained; | ||
if (this._draining && this._drained) { | ||
this.issue_flow = true; | ||
} | ||
} | ||
Sender.prototype.next_tag = function () { | ||
@@ -218,4 +288,9 @@ return new String(this.tag++); | ||
this.credit = flow.delivery_count + flow.link_credit - this.delivery_count; | ||
this._draining = flow.drain; | ||
this._drained = this.credit > 0; | ||
if (this.is_open()) { | ||
this.dispatch('sender_flow', this._context()); | ||
if (this._draining) { | ||
this.dispatch('sender_draining', this._context()); | ||
} | ||
if (this.sendable()) { | ||
@@ -230,3 +305,7 @@ this.dispatch('sendable', this._context()); | ||
Sender.prototype.send = function (msg, tag) { | ||
return this.session.send(this, tag ? tag : this.next_tag(), message.encode(msg), 0); | ||
var delivery = this.session.send(this, tag ? tag : this.next_tag(), message.encode(msg), 0); | ||
if (this.local.attach.snd_settle_mode === 1) { | ||
delivery.settled = true; | ||
} | ||
return delivery; | ||
}; | ||
@@ -237,3 +316,4 @@ | ||
this.init(session, name, local_handle, opts, true); | ||
this.set_prefetch(this.get_option('prefetch', 100)); | ||
this.drain = false; | ||
this.set_credit_window(this.get_option('credit_window', 100)); | ||
if (this.get_option('autoaccept', true)) { | ||
@@ -247,2 +327,6 @@ this.observers.on('message', auto_accept); | ||
this.dispatch('receiver_flow', this._context()); | ||
if (frame.performative.drain) { | ||
if (frame.performative.link_credit > 0) console.log('ERROR: received flow with drain set, but non zero credit'); | ||
else this.dispatch('receiver_drained', this._context()); | ||
} | ||
}; | ||
@@ -256,6 +340,10 @@ Receiver.prototype.flow = function(credit) { | ||
}; | ||
Receiver.prototype.add_credit = Receiver.prototype.flow;//alias | ||
Receiver.prototype._get_drain = function () { | ||
return this.drain; | ||
} | ||
Receiver.prototype.set_prefetch = function(prefetch) { | ||
if (prefetch > 0) { | ||
var flow_controller = new FlowController(prefetch); | ||
Receiver.prototype.set_credit_window = function(credit_window) { | ||
if (credit_window > 0) { | ||
var flow_controller = new FlowController(credit_window); | ||
var listener = flow_controller.update.bind(flow_controller); | ||
@@ -262,0 +350,0 @@ this.observers.on('message', listener); |
@@ -110,6 +110,112 @@ /* | ||
function reverse_lookup(map) { | ||
var reversed = {}; | ||
for (var key in map) { | ||
var values = map[key]; | ||
for (var i in values) { | ||
reversed[values[i]] = key; | ||
} | ||
} | ||
return reversed; | ||
} | ||
var special_sections = {"properties": ["message_id", | ||
"user_id", | ||
"to", | ||
"subject", | ||
"reply_to", | ||
"correlation_id", | ||
"content_type", | ||
"content_encoding", | ||
"absolute_expiry_time", | ||
"creation_time", | ||
"group_id", | ||
"group_sequence", | ||
"reply_to_group_id"], | ||
"header": ["durable", | ||
"priority", | ||
"ttl", | ||
"first_acquirer", | ||
"delivery_count"] | ||
}; | ||
var special_fields = reverse_lookup(special_sections); | ||
var section_names = ["header", "delivery_annotations", "message_annotations", "properties", "application_properties", "body", "footer"]; | ||
function copy(src, tgt) { | ||
for (var k in src) { | ||
var v = src[k]; | ||
if (typeof v === "object") { | ||
copy(v, tgt[k]); | ||
} else { | ||
tgt[k] = v; | ||
} | ||
} | ||
} | ||
function add_property_shortcut(o, group, name) { | ||
Object.defineProperty(o, name, | ||
{ | ||
get: function() { return this[group] ? this[group][name] : undefined; }, | ||
set: function(v) { if (this[group] === undefined) { this[group] = {}; } this[group][name] = v; } | ||
}); | ||
} | ||
function create_message(m) { | ||
var msg = { | ||
toJSON: function () { | ||
var o = {}; | ||
for (var i in section_names) { | ||
var key = section_names[i]; | ||
if (typeof this[key] === 'function') continue; | ||
var fields = special_sections[key]; | ||
if (fields) { | ||
for (var j in fields) { | ||
if (this[fields[j]] !== undefined) { | ||
if (o[key] === undefined) o[key] = {}; | ||
o[key][fields[j]] = this[fields[j]]; | ||
} | ||
} | ||
} else if (this[key]) { | ||
o[key] = this[key]; | ||
} | ||
} | ||
return o; | ||
}, | ||
inspect: function() { | ||
return JSON.stringify(this.toJSON()); | ||
}, | ||
toString: function () { | ||
return JSON.stringify(this.toJSON()); | ||
} | ||
}; | ||
for (var field in special_fields) { | ||
add_property_shortcut(msg, special_fields[field], field); | ||
} | ||
if (m) copy(m, msg); | ||
return msg; | ||
} | ||
function massage(msg) { | ||
var out = undefined; | ||
for (var key in msg) { | ||
if (typeof msg[key] !== 'function' && section_names.indexOf(key) < 0) { | ||
if (out === undefined) out = {}; | ||
var group = special_fields[key] || 'application_properties'; | ||
if (out[group] === undefined) { | ||
out[group] = {}; | ||
} | ||
out[group][key] = msg[key]; | ||
} else { | ||
if (out !== undefined) { | ||
out[key] = msg[key]; | ||
} | ||
} | ||
} | ||
return out || msg; | ||
} | ||
message.encode = function(obj) { | ||
var sections = []; | ||
wrappers.forEach(function (wrapper_fn) { wrapper_fn(sections, obj); }); | ||
var msg = massage(obj); | ||
wrappers.forEach(function (wrapper_fn) { wrapper_fn(sections, msg); }); | ||
var writer = new types.Writer(); | ||
@@ -126,3 +232,3 @@ for (var i = 0; i < sections.length; i++) { | ||
message.decode = function(buffer) { | ||
var msg = {}; | ||
var msg = create_message(); | ||
var reader = new types.Reader(buffer); | ||
@@ -143,2 +249,7 @@ while (reader.remaining()) { | ||
} | ||
if (msg.application_properties) { | ||
for (var key in msg.application_properties) { | ||
add_property_shortcut(msg, 'application_properties', key); | ||
} | ||
} | ||
return msg; | ||
@@ -170,6 +281,6 @@ } | ||
if (c) { | ||
return new c(outcome); | ||
return new c(types.unwrap(outcome)); | ||
} | ||
} | ||
console.log('unrecognised outcome'); | ||
console.log('unrecognised outcome: ' + JSON.stringify(outcome)); | ||
return outcome; | ||
@@ -186,4 +297,4 @@ }; | ||
fields:[ | ||
{name:"section-number", type:"uint", mandatory:true}, | ||
{name:"section-offset", type:"ulong", mandatory:true} | ||
{name:"section_number", type:"uint", mandatory:true}, | ||
{name:"section_offset", type:"ulong", mandatory:true} | ||
]}); | ||
@@ -196,7 +307,7 @@ define_outcome({name:"accepted", code:0x24, fields:[]}); | ||
fields:[ | ||
{name:"delivery-failed", type:"boolean"}, | ||
{name:"undeliverable-here", type:"boolean"}, | ||
{name:"message-annotations", type:"fields"} | ||
{name:"delivery_failed", type:"boolean"}, | ||
{name:"undeliverable_here", type:"boolean"}, | ||
{name:"message_annotations", type:"map"} | ||
]}); | ||
module.exports = message; |
@@ -76,5 +76,37 @@ /* | ||
function write_dispositions(deliveries) { | ||
var first; | ||
var last; | ||
var next_id; | ||
for (var i = 0; i < deliveries.length; i++) { | ||
var delivery = deliveries[i]; | ||
if (first === undefined) { | ||
first = delivery; | ||
last = delivery; | ||
next_id = delivery.id; | ||
} | ||
if (!message.are_outcomes_equivalent(last.state, delivery.state) || last.settled !== delivery.settled || next_id !== delivery.id) { | ||
first.link.session.output(frames.disposition({'role':first.link.is_receiver(),'first':first.id,'last':last.id, 'state':first.state, 'settled':first.settled}).described()); | ||
first = delivery; | ||
last = delivery; | ||
next_id = delivery.id; | ||
} else { | ||
if (last.id !== delivery.id) { | ||
last = delivery; | ||
} | ||
next_id++; | ||
} | ||
} | ||
if (first !== undefined && last !== undefined) { | ||
first.link.session.output(frames.disposition({'role':first.link.is_receiver(),'first':first.id,'last':last.id, 'state':first.state, 'settled':first.settled}).described()); | ||
} | ||
} | ||
var Outgoing = function () { | ||
this.deliveries = new CircularBuffer(2048/*TODO= configurable?*/); | ||
this.updated = []; | ||
this.pending_dispositions = []; | ||
this.next_delivery_id = 0; | ||
@@ -103,2 +135,6 @@ this.next_pending_delivery = 0; | ||
'remote_state': undefined}; | ||
var self = this; | ||
d.update = function (settled, state) { | ||
self.update(d, settled, state); | ||
}; | ||
this.deliveries.push(d); | ||
@@ -121,5 +157,2 @@ return d; | ||
var d = this.deliveries.by_id(i); | ||
if (!d) { | ||
console.log('Could not find delivery for ' + i + ' [' + JSON.stringify(fields) + ']'); | ||
} | ||
if (d && !d.remote_settled) { | ||
@@ -132,3 +165,3 @@ var updated = false; | ||
if (fields.state && fields.state !== d.remote_state) { | ||
d.remote_state = fields.state; | ||
d.remote_state = message.unwrap_outcome(fields.state); | ||
updated = true; | ||
@@ -143,2 +176,13 @@ } | ||
Outgoing.prototype.update = function (delivery, settled, state) { | ||
if (delivery) { | ||
delivery.settled = settled; | ||
if (state !== undefined) delivery.state = state; | ||
if (!delivery.remote_settled) { | ||
this.pending_dispositions.push(delivery); | ||
} | ||
delivery.link.connection._register(); | ||
} | ||
}; | ||
Outgoing.prototype.transfer_window = function() { | ||
@@ -162,3 +206,3 @@ if (this.remote_window) { | ||
this.window -= d.transfers_required; | ||
d.link.session.output(frames.transfer({'handle':d.link.local.handle,'message_format':d.format,'delivery_id':d.id, 'delivery_tag':d.tag}).described(), d.data); | ||
d.link.session.output(frames.transfer({'handle':d.link.local.handle,'message_format':d.format,'delivery_id':d.id, 'delivery_tag':d.tag, 'settled':d.settled}).described(), d.data); | ||
d.link.credit--; | ||
@@ -184,7 +228,4 @@ this.next_pending_delivery++; | ||
var d = this.updated[i]; | ||
if (d.remote_state) { | ||
d.remote_state = message.unwrap_outcome(d.remote_state); | ||
if (d.remote_state && d.remote_state.constructor.composite_type) { | ||
d.link.dispatch(d.remote_state.constructor.composite_type, d.link._context({'delivery':d})); | ||
} | ||
if (d.remote_state && d.remote_state.constructor.composite_type) { | ||
d.link.dispatch(d.remote_state.constructor.composite_type, d.link._context({'delivery':d})); | ||
} | ||
@@ -195,2 +236,7 @@ if (d.remote_settled) d.link.dispatch('settled', d.link._context({'delivery':d})); | ||
if (this.pending_dispositions.length) { | ||
write_dispositions(this.pending_dispositions); | ||
this.pending_dispositions = []; | ||
} | ||
// remove any fully settled deliveries: | ||
@@ -243,6 +289,23 @@ this.deliveries.pop_if(function (d) { return d.settled && d.remote_settled; }); | ||
'state': undefined, | ||
'remote_settled': frame.performative.settled, | ||
'remote_state': undefined}; | ||
'remote_settled': frame.performative.settled === undefined ? false : frame.performative.settled, | ||
'remote_state': frame.performative.state}; | ||
var self = this; | ||
current.update = function (settled, state) { self.update(current, settled, state); }; | ||
current.update = function (settled, state) { | ||
var settled_ = settled; | ||
if (settled_ === undefined) { | ||
settled_ = receiver.local.attach.rcv_settle_mode !== 1; | ||
} | ||
self.update(current, settled_, state); | ||
}; | ||
current.accept = function () { this.update(undefined, message.accepted().described()); }; | ||
current.release = function (params) { | ||
if (params) { | ||
this.update(undefined, message.modified(params).described()); | ||
} else { | ||
this.update(undefined, message.released().described()); | ||
} | ||
}; | ||
current.reject = function (error) { this.update(true, message.rejected({'error':error}).described()); }; | ||
current.modified = function (params) { this.update(true, message.modified(params).described()); }; | ||
this.deliveries.push(current); | ||
@@ -268,30 +331,3 @@ data = frame.payload; | ||
if (this.updated.length > 0) { | ||
var first; | ||
var last; | ||
var next_id; | ||
for (var i = 0; i < this.updated.length; i++) { | ||
var delivery = this.updated[i]; | ||
if (first === undefined) { | ||
first = delivery; | ||
last = delivery; | ||
next_id = delivery.id; | ||
} | ||
if (!message.are_outcomes_equivalent(last.state, delivery.state) || last.settled !== delivery.settled || next_id !== delivery.id) { | ||
first.link.session.output(frames.disposition({'role':true,'first':first.id,'last':last.id, 'state':first.state, 'settled':first.settled}).described()); | ||
first = delivery; | ||
last = delivery; | ||
next_id = delivery.id; | ||
} else { | ||
if (last.id !== delivery.id) { | ||
last = delivery; | ||
} | ||
next_id++; | ||
} | ||
} | ||
if (first !== undefined && last !== undefined) { | ||
first.link.session.output(frames.disposition({'role':true,'first':first.id,'last':last.id, 'state':first.state, 'settled':first.settled}).described()); | ||
} | ||
write_dispositions(this.updated); | ||
this.updated = []; | ||
@@ -317,5 +353,2 @@ } | ||
var d = this.deliveries.by_id(i); | ||
if (!d) { | ||
console.log('Could not find delivery for ' + i); | ||
} | ||
if (d && !d.remote_settled) { | ||
@@ -325,15 +358,6 @@ var updated = false; | ||
d.remote_settled = fields.settled; | ||
updated = true; | ||
d.link.dispatch('settled', d.link._context({'delivery':d})); | ||
} | ||
if (fields.state && fields.state !== d.remote_state) { | ||
d.remote_state = fields.state; | ||
updated = true; | ||
} | ||
if (updated) { | ||
console.log(d.link.connection.options.id + ' added delivery to updated list following receipt of disposition for incoming deliveries'); | ||
this.updated.push(d); | ||
} | ||
} | ||
} | ||
}; | ||
@@ -370,4 +394,5 @@ | ||
EventEmitter.prototype.emit.apply(this, arguments); | ||
return true; | ||
} else { | ||
this.connection.dispatch.apply(this.connection, arguments); | ||
return this.connection.dispatch.apply(this.connection, arguments); | ||
} | ||
@@ -438,3 +463,4 @@ }; | ||
Session.prototype.end = function () { | ||
Session.prototype.end = function (error) { | ||
if (error) this.local.end.error = error; | ||
if (this.state.close()) { | ||
@@ -485,2 +511,3 @@ this.connection._register(); | ||
if (link) { | ||
if (link._get_drain()) fields.drain = true; | ||
fields.delivery_count = link.delivery_count; | ||
@@ -487,0 +514,0 @@ fields.handle = link.local.handle; |
@@ -236,3 +236,3 @@ /* | ||
define_type('CharUTF32', 0x73); | ||
define_type('Timestamp', 0x83);//TODO: convert to/from Date | ||
define_type('Timestamp', 0x83, {'write':write_long, 'read':read_long});//TODO: convert to/from Date | ||
define_type('Uuid', 0x98);//TODO: convert to/from stringified form? | ||
@@ -276,3 +276,3 @@ define_type('Vbin8', 0xa0); | ||
types.wrap_boolean = function(v) { | ||
return new types.Boolean(v); | ||
return v ? new types.True() : new types.False(); | ||
}; | ||
@@ -279,0 +279,0 @@ types.wrap_ulong = function(l) { |
{ | ||
"name" : "rhea", | ||
"version": "0.1.2", | ||
"version": "0.1.3", | ||
"description": "reactive AMQP 1.0 library", | ||
@@ -5,0 +5,0 @@ "homepage": "http://github.com/grs/rhea", |
@@ -310,3 +310,4 @@ # rhea | ||
Closes a connection. | ||
Closes a connection (may take an error object which is an object | ||
that consists of condition and description fields). | ||
@@ -332,12 +333,6 @@ #### events: | ||
##### source_address() | ||
Returns the address of the source from which this receiver is | ||
receiving messages. This can be useful when the source name is | ||
generated by the peer, i.e. for so-called dyanmic nodes (like | ||
temporary queues) used in some request-response patterns. | ||
##### close() | ||
Closes a receiving link (i.e. cancels the subscription). | ||
Closes a receiving link (i.e. cancels the subscription). (May take an error object which is an object | ||
that consists of condition and description fields). | ||
@@ -349,3 +344,3 @@ ##### detach() | ||
##### flow(n) | ||
##### add_credit(n) | ||
@@ -355,4 +350,5 @@ By default, receivers have a prefetch window that is moved | ||
set the prefecth to zero and manage credit itself. Each invocation of | ||
flow() method issues credit for a further 'n' messages to be sent by | ||
the peer over this receiving link. | ||
add_credit() method issues credit for a further 'n' messages to be | ||
sent by the peer over this receiving link. [Note: flow()is an alias | ||
for add_credit()] | ||
@@ -405,5 +401,12 @@ ##### credit() | ||
The header, properties and application_properties can be set on the | ||
message itself, i.e. the nesting is not necessary (it reflects the | ||
AMQP specification however). So | ||
e.g. {subject:'abc',colour:'green',body:'foo'} is equivalent to | ||
{properties:{subject:'abc'},application_properties:{colour:'green'},body:'foo'} | ||
##### close() | ||
Closes a sending link. | ||
Closes a sending link (may take an error object which is an object | ||
that consists of condition and description fields). | ||
@@ -410,0 +413,0 @@ ##### detach() |
@@ -57,2 +57,15 @@ /* | ||
} | ||
function close_test_simple(error, verification) { | ||
return function(done) { | ||
container.on('connection_close', function(context) { | ||
verification(context.connection); | ||
done(); | ||
}); | ||
var c = container.connect(listener.address()); | ||
c.on('connection_open', function(context) { | ||
context.connection.close(error); | ||
}); | ||
c.on('connection_close', function(context) {}); | ||
}; | ||
} | ||
@@ -80,18 +93,33 @@ afterEach(function() { | ||
})); | ||
it('hostname', open_test({hostname:'my-virtual-host'}, function(connection) { | ||
it('hostname explicit', open_test({hostname:'my-virtual-host'}, function(connection) { | ||
assert.equal(connection.remote.open.hostname, 'my-virtual-host'); | ||
})); | ||
it('container_id', open_test({container_id:'this-is-me'}, function(connection) { | ||
it('hostname aliased', open_test({hostname:'my-virtual-host'}, function(connection) { | ||
assert.equal(connection.hostname, 'my-virtual-host'); | ||
})); | ||
it('container_id explicit', open_test({container_id:'this-is-me'}, function(connection) { | ||
assert.equal(connection.remote.open.container_id, 'this-is-me'); | ||
})); | ||
it('max frame size', open_test({max_frame_size:5432}, function(connection) { | ||
it('container_id aliased', open_test({container_id:'this-is-me'}, function(connection) { | ||
assert.equal(connection.container_id, 'this-is-me'); | ||
})); | ||
it('max frame size explicit', open_test({max_frame_size:5432}, function(connection) { | ||
assert.equal(connection.remote.open.max_frame_size, 5432); | ||
})); | ||
it('channel max', open_test({channel_max:10}, function(connection) { | ||
it('max frame size aliased', open_test({max_frame_size:5432}, function(connection) { | ||
assert.equal(connection.max_frame_size, 5432); | ||
})); | ||
it('channel max explicit', open_test({channel_max:10}, function(connection) { | ||
assert.equal(connection.remote.open.channel_max, 10); | ||
})); | ||
it('idle time out', open_test({idle_time_out:1000}, function(connection) { | ||
it('channel max aliased', open_test({channel_max:10}, function(connection) { | ||
assert.equal(connection.channel_max, 10); | ||
})); | ||
it('idle time out explicit', open_test({idle_time_out:1000}, function(connection) { | ||
assert.equal(connection.remote.open.idle_time_out, 1000); | ||
})); | ||
it('properties', open_test({properties:{flavour:'vanilla', scoops:2, cone:true}}, function(connection) { | ||
it('idle time out aliased', open_test({idle_time_out:1000}, function(connection) { | ||
assert.equal(connection.idle_time_out, 1000); | ||
})); | ||
it('properties explicit', open_test({properties:{flavour:'vanilla', scoops:2, cone:true}}, function(connection) { | ||
assert.equal(connection.remote.open.properties.flavour, 'vanilla'); | ||
@@ -101,2 +129,7 @@ assert.equal(connection.remote.open.properties.scoops, 2); | ||
})); | ||
it('properties aliased', open_test({properties:{flavour:'vanilla', scoops:2, cone:true}}, function(connection) { | ||
assert.equal(connection.properties.flavour, 'vanilla'); | ||
assert.equal(connection.properties.scoops, 2); | ||
assert.equal(connection.properties.cone, true); | ||
})); | ||
it('error on close', close_test({condition:'amqp:connection:forced', description:'testing error on close'}, function(connection) { | ||
@@ -107,4 +140,82 @@ var error = connection.remote.close.error; | ||
})); | ||
it('pass error to close', close_test_simple({condition:'amqp:connection:forced', description:'testing error on close'}, function(connection) { | ||
var error = connection.remote.close.error; | ||
assert.equal(error.condition, 'amqp:connection:forced'); | ||
assert.equal(error.description, 'testing error on close'); | ||
})); | ||
}); | ||
describe('connection error handling', function() { | ||
var container, listener; | ||
beforeEach(function(done) { | ||
container = rhea.create_container(); | ||
listener = container.listen({port:0}); | ||
listener.on('listening', function() { | ||
done(); | ||
}); | ||
}); | ||
afterEach(function() { | ||
listener.close(); | ||
}); | ||
it('error and close handled', function (done) { | ||
var error_handler_called; | ||
var close_handler_called; | ||
container.on('connection_open', function(context) { | ||
context.connection.close({condition:'amqp:connection:forced', description:'testing error on close'}); | ||
}); | ||
container.on('connection_close', function(context) { | ||
assert.equal(error_handler_called, true); | ||
assert.equal(close_handler_called, true); | ||
done(); | ||
}); | ||
var c = container.connect(listener.address()); | ||
c.on('connection_error', function(context) { | ||
error_handler_called = true; | ||
var error = context.connection.error; | ||
assert.equal(error.condition, 'amqp:connection:forced'); | ||
assert.equal(error.description, 'testing error on close'); | ||
}); | ||
c.on('connection_close', function(context) { | ||
close_handler_called = true; | ||
var error = context.connection.error; | ||
assert.equal(error.condition, 'amqp:connection:forced'); | ||
assert.equal(error.description, 'testing error on close'); | ||
}); | ||
}); | ||
it('error handled', function (done) { | ||
var error_handler_called; | ||
container.on('connection_open', function(context) { | ||
context.connection.close({condition:'amqp:connection:forced', description:'testing error on close'}); | ||
}); | ||
container.on('connection_close', function(context) { | ||
assert.equal(error_handler_called, true); | ||
done(); | ||
}); | ||
var c = rhea.create_container().connect(listener.address()); | ||
c.on('connection_error', function(context) { | ||
error_handler_called = true; | ||
var error = context.connection.error; | ||
assert.equal(error.condition, 'amqp:connection:forced'); | ||
assert.equal(error.description, 'testing error on close'); | ||
}); | ||
}); | ||
it('unhandled error', function (done) { | ||
var error_handler_called; | ||
container.on('connection_open', function(context) { | ||
context.connection.close({condition:'amqp:connection:forced', description:'testing error on close'}); | ||
}); | ||
container.on('connection_close', function(context) { | ||
done(); | ||
}); | ||
var container2 = rhea.create_container(); | ||
container2.on('error', function (error) { | ||
assert.equal(error.condition, 'amqp:connection:forced'); | ||
assert.equal(error.description, 'testing error on close'); | ||
}); | ||
var c = container2.connect(listener.address()); | ||
}); | ||
}); | ||
describe('connection events', function() { | ||
@@ -111,0 +222,0 @@ var listener; |
@@ -70,2 +70,17 @@ /* | ||
} | ||
function close_test_simple(local_role, error, verification) { | ||
var remote_role = local_role === 'sender' ? 'receiver' : 'sender'; | ||
return function(done) { | ||
container.on(remote_role + '_close', function(context) { | ||
verification(context[remote_role]); | ||
done(); | ||
}); | ||
var c = container.connect(listener.address()); | ||
c.on(local_role + '_open', function(context) { | ||
context[local_role].close(error); | ||
}); | ||
c.on(local_role + '_close', function(context) {}); | ||
c['open_' + local_role](); | ||
}; | ||
} | ||
function close_sender_test(error, verification) { | ||
@@ -88,5 +103,8 @@ return close_test('sender', error, verification); | ||
})); | ||
it('single offered ' + t + ' capability', open_test(t, {offered_capabilities:'foo'}, function(link) { | ||
it('single offered ' + t + ' capability explicit', open_test(t, {offered_capabilities:'foo'}, function(link) { | ||
assert.equal(link.remote.attach.offered_capabilities, 'foo'); | ||
})); | ||
it('single offered ' + t + ' capability aliased', open_test(t, {offered_capabilities:'foo'}, function(link) { | ||
assert.equal(link.offered_capabilities, 'foo'); | ||
})); | ||
it('multiple offered ' + t + ' capabilities', open_test(t, {offered_capabilities:['foo', 'bar']}, function(link) { | ||
@@ -116,2 +134,7 @@ assert.equal(link.remote.attach.offered_capabilities.length, 2); | ||
})); | ||
it('pass error to ' + t + ' close', close_test_simple(t, {condition:'amqp:link:detach-forced', description:'testing error on close'}, function(link) { | ||
var error = link.remote.detach.error; | ||
assert.equal(error.condition, 'amqp:link:detach-forced'); | ||
assert.equal(error.description, 'testing error on close'); | ||
})); | ||
} | ||
@@ -121,2 +144,5 @@ it('source address as simple string', open_receiver_test('my-source', function (link) { | ||
})); | ||
it('source address aliased', open_receiver_test('my-source', function (link) { | ||
assert.equal(link.source.address, 'my-source'); | ||
})); | ||
it('source address as single nested value', open_receiver_test({source:'my-source'}, function (link) { | ||
@@ -171,5 +197,12 @@ assert.equal(link.remote.attach.source.address, 'my-source'); | ||
})); | ||
it('dynamic source aliased', open_receiver_test({source:{dynamic:true, dynamic_node_properties:{foo:'bar'}}}, function (link) { | ||
assert.equal(link.source.dynamic, true); | ||
assert.equal(link.source.dynamic_node_properties.foo, 'bar'); | ||
})); | ||
it('target address as simple string', open_sender_test('my-target', function (link) { | ||
assert.equal(link.remote.attach.target.address, 'my-target'); | ||
})); | ||
it('target address aliased', open_sender_test('my-target', function (link) { | ||
assert.equal(link.target.address, 'my-target'); | ||
})); | ||
it('target address as single nested value', open_sender_test({target:'my-target'}, function (link) { | ||
@@ -265,2 +298,220 @@ assert.equal(link.remote.attach.target.address, 'my-target'); | ||
}); | ||
describe(local_role + ' error handling', function() { | ||
var container, listener; | ||
var remote_role; | ||
beforeEach(function(done) { | ||
remote_role = roles[local_role]; | ||
container = rhea.create_container(); | ||
listener = container.listen({port:0}); | ||
listener.on('listening', function() { | ||
done(); | ||
}); | ||
}); | ||
afterEach(function() { | ||
listener.close(); | ||
}); | ||
it('error and close handled', function (done) { | ||
var error_handler_called; | ||
var close_handler_called; | ||
container.on(remote_role + '_open', function(context) { | ||
context[remote_role].close({condition:'amqp:link:detach-forced', description:'testing error on close'}); | ||
}); | ||
container.on('connection_close', function(context) { | ||
assert.equal(error_handler_called, true); | ||
assert.equal(close_handler_called, true); | ||
done(); | ||
}); | ||
var c = rhea.create_container().connect(listener.address()); | ||
c.on(local_role + '_error', function(context) { | ||
error_handler_called = true; | ||
var error = context[local_role].error; | ||
assert.equal(error.condition, 'amqp:link:detach-forced'); | ||
assert.equal(error.description, 'testing error on close'); | ||
}); | ||
c.on(local_role + '_close', function(context) { | ||
close_handler_called = true; | ||
var error = context[local_role].error; | ||
assert.equal(error.condition, 'amqp:link:detach-forced'); | ||
assert.equal(error.description, 'testing error on close'); | ||
c.close(); | ||
}); | ||
c['open_' + local_role]('foo'); | ||
}); | ||
it('error handled', function (done) { | ||
var error_handler_called; | ||
container.on(remote_role + '_open', function(context) { | ||
context[remote_role].close({condition:'amqp:link:detach-forced', description:'testing error on close'}); | ||
}); | ||
container.on('connection_close', function(context) { | ||
assert.equal(error_handler_called, true); | ||
done(); | ||
}); | ||
var c = rhea.create_container().connect(listener.address()); | ||
c.on(local_role + '_error', function(context) { | ||
error_handler_called = true; | ||
var error = context[local_role].error; | ||
assert.equal(error.condition, 'amqp:link:detach-forced'); | ||
assert.equal(error.description, 'testing error on close'); | ||
c.close(); | ||
}); | ||
c['open_' + local_role](); | ||
}); | ||
it('unhandled error', function (done) { | ||
var error_handler_called; | ||
container.on(remote_role + '_open', function(context) { | ||
context[remote_role].close({condition:'amqp:link:detach-forced', description:'testing error on close'}); | ||
}); | ||
container.on('connection_close', function(context) { | ||
done(); | ||
}); | ||
var container2 = rhea.create_container(); | ||
var c = container2.connect(listener.address()); | ||
container2.on('error', function (error) { | ||
assert.equal(error.condition, 'amqp:link:detach-forced'); | ||
assert.equal(error.description, 'testing error on close'); | ||
c.close(); | ||
}); | ||
c['open_' + local_role](); | ||
}); | ||
}); | ||
} | ||
describe('settlement modes', function() { | ||
var server, client, listener; | ||
beforeEach(function(done) { | ||
server = rhea.create_container(); | ||
client = rhea.create_container(); | ||
listener = server.listen({port:0}); | ||
listener.on('listening', function() { | ||
done(); | ||
}); | ||
}); | ||
afterEach(function() { | ||
listener.close(); | ||
}); | ||
it('sender sends unsettled', function(done) { | ||
server.on('receiver_open', function(context) { | ||
assert.equal(context.receiver.snd_settle_mode, 0); | ||
}); | ||
server.on('message', function(context) { | ||
assert.equal(context.message.body, 'settle-me'); | ||
assert.equal(context.delivery.remote_settled, false); | ||
}); | ||
client.on('settled', function (context) { | ||
context.connection.close(); | ||
}); | ||
client.once('sendable', function (context) { | ||
context.sender.send({body:'settle-me'}); | ||
}); | ||
client.on('connection_close', function (context) { | ||
done(); | ||
}); | ||
client.connect(listener.address()).attach_sender({snd_settle_mode:0}); | ||
}); | ||
it('sender sends settled', function(done) { | ||
server.on('receiver_open', function(context) { | ||
assert.equal(context.receiver.snd_settle_mode, 1); | ||
}); | ||
server.on('message', function(context) { | ||
assert.equal(context.message.body, 'already-settled'); | ||
assert.equal(context.delivery.remote_settled, true); | ||
context.connection.close(); | ||
}); | ||
client.once('sendable', function (context) { | ||
context.sender.send({body:'already-settled'}); | ||
}); | ||
client.on('connection_close', function (context) { | ||
done(); | ||
}); | ||
client.connect(listener.address()).attach_sender({snd_settle_mode:1}); | ||
}); | ||
it('receiver requests send unsettled', function(done) { | ||
server.on('sender_open', function(context) { | ||
assert.equal(context.sender.snd_settle_mode, 0); | ||
context.sender.local.attach.snd_settle_mode = context.sender.snd_settle_mode; | ||
}); | ||
server.on('settled', function (context) { | ||
context.connection.close(); | ||
}); | ||
server.once('sendable', function (context) { | ||
context.sender.send({body:'settle-me'}); | ||
}); | ||
client.on('message', function(context) { | ||
assert.equal(context.message.body, 'settle-me'); | ||
assert.equal(context.delivery.remote_settled, false); | ||
}); | ||
client.on('connection_close', function (context) { | ||
done(); | ||
}); | ||
client.connect(listener.address()).attach_receiver({snd_settle_mode:0}); | ||
}); | ||
it('receiver requests send settled', function(done) { | ||
server.on('sender_open', function(context) { | ||
assert.equal(context.sender.snd_settle_mode, 1); | ||
context.sender.local.attach.snd_settle_mode = context.sender.snd_settle_mode; | ||
}); | ||
server.once('sendable', function (context) { | ||
context.sender.send({body:'already-settled'}); | ||
}); | ||
client.on('message', function(context) { | ||
assert.equal(context.message.body, 'already-settled'); | ||
assert.equal(context.delivery.remote_settled, true); | ||
context.connection.close(); | ||
}); | ||
client.on('connection_close', function (context) { | ||
done(); | ||
}); | ||
client.connect(listener.address()).attach_receiver({snd_settle_mode:1}); | ||
}); | ||
it('receiver settles first', function(done) { | ||
server.on('sender_open', function(context) { | ||
assert.equal(context.sender.rcv_settle_mode, 0); | ||
}); | ||
server.once('sendable', function (context) { | ||
context.sender.send({body:'settle-me'}); | ||
}); | ||
server.once('accepted', function (context) { | ||
assert.equal(context.delivery.remote_settled, true); | ||
context.connection.close(); | ||
}); | ||
client.on('message', function(context) { | ||
assert.equal(context.message.body, 'settle-me'); | ||
assert.equal(context.delivery.remote_settled, false); | ||
}); | ||
client.on('connection_close', function (context) { | ||
done(); | ||
}); | ||
client.connect(listener.address()).attach_receiver({rcv_settle_mode:0}); | ||
}); | ||
it('receiver settles second', function(done) { | ||
server.on('sender_open', function(context) { | ||
assert.equal(context.sender.rcv_settle_mode, 1); | ||
}); | ||
server.once('sendable', function (context) { | ||
context.sender.send({body:'settle-me'}); | ||
}); | ||
server.once('accepted', function (context) { | ||
assert.equal(context.delivery.remote_settled, false); | ||
context.delivery.update(true); | ||
}); | ||
client.on('message', function(context) { | ||
assert.equal(context.message.body, 'settle-me'); | ||
assert.equal(context.delivery.remote_settled, false); | ||
}); | ||
client.on('settled', function (context) { | ||
assert.equal(context.delivery.remote_settled, true); | ||
context.connection.close(); | ||
}); | ||
client.on('connection_close', function (context) { | ||
done(); | ||
}); | ||
client.connect(listener.address()).attach_receiver({rcv_settle_mode:1}); | ||
}); | ||
}); |
@@ -79,5 +79,8 @@ /* | ||
})); | ||
it('sends and receives map body', transfer_test({body:{colour:'green',age:8}}, function(message) { | ||
it('sends and receives map body', transfer_test({body:{colour:'green',age:8,happy:true, sad:false}}, function(message) { | ||
assert.equal(message.body.colour, 'green'); | ||
assert.equal(message.body.age, 8); | ||
assert.equal(message.body.happy, true); | ||
assert.equal(message.body.sad, false); | ||
assert.equal(message.body.indifferent, undefined); | ||
})); | ||
@@ -124,3 +127,243 @@ it('sends and receives map with ulongs', transfer_test({body:{age:amqp_types.wrap_ulong(888), max:amqp_types.wrap_ulong(9007199254740992), | ||
})); | ||
it('get header and properties directly', transfer_test({properties:{ | ||
message_id:'my-id', | ||
user_id:'my-user', | ||
to:'my-to', | ||
subject:'my-subject', | ||
reply_to:'my-reply-to', | ||
correlation_id:'correlate-me', | ||
content_type:'text', | ||
content_encoding:'ascii', | ||
absolute_expiry_time:123456789, | ||
creation_time:987654321, | ||
group_id:'my-group', | ||
group_sequence:77, | ||
reply_to_group_id:'still-my-group' | ||
}, header:{ | ||
durable:true, | ||
priority:3, | ||
ttl:123456789, | ||
first_acquirer:false, | ||
delivery_count:8 | ||
}}, function(message) { | ||
assert.equal(message.message_id, 'my-id'); | ||
assert.equal(message.user_id, 'my-user'); | ||
assert.equal(message.to, 'my-to'); | ||
assert.equal(message.subject, 'my-subject'); | ||
assert.equal(message.reply_to, 'my-reply-to'); | ||
assert.equal(message.correlation_id, 'correlate-me'); | ||
assert.equal(message.content_type, 'text'); | ||
assert.equal(message.content_encoding, 'ascii'); | ||
assert.equal(message.absolute_expiry_time, 123456789); | ||
assert.equal(message.creation_time, 987654321); | ||
assert.equal(message.group_id, 'my-group'); | ||
assert.equal(message.group_sequence, 77); | ||
assert.equal(message.reply_to_group_id, 'still-my-group'); | ||
assert.equal(message.durable, true); | ||
assert.equal(message.priority, 3); | ||
assert.equal(message.ttl, 123456789); | ||
assert.equal(message.first_acquirer, false); | ||
assert.equal(message.delivery_count, 8); | ||
})); | ||
it('set header and properties directly', transfer_test({ | ||
message_id:'my-id', | ||
user_id:'my-user', | ||
to:'my-to', | ||
subject:'my-subject', | ||
reply_to:'my-reply-to', | ||
correlation_id:'correlate-me', | ||
content_type:'text', | ||
content_encoding:'ascii', | ||
absolute_expiry_time:123456789, | ||
creation_time:987654321, | ||
group_id:'my-group', | ||
group_sequence:77, | ||
reply_to_group_id:'still-my-group', | ||
durable:true, | ||
priority:3, | ||
ttl:123456789, | ||
first_acquirer:false, | ||
delivery_count:8 | ||
}, function(message) { | ||
assert.equal(message.properties.message_id, 'my-id'); | ||
assert.equal(message.properties.user_id, 'my-user'); | ||
assert.equal(message.properties.to, 'my-to'); | ||
assert.equal(message.properties.subject, 'my-subject'); | ||
assert.equal(message.properties.reply_to, 'my-reply-to'); | ||
assert.equal(message.properties.correlation_id, 'correlate-me'); | ||
assert.equal(message.properties.content_type, 'text'); | ||
assert.equal(message.properties.content_encoding, 'ascii'); | ||
assert.equal(message.properties.absolute_expiry_time, 123456789); | ||
assert.equal(message.properties.creation_time, 987654321); | ||
assert.equal(message.properties.group_id, 'my-group'); | ||
assert.equal(message.properties.group_sequence, 77); | ||
assert.equal(message.properties.reply_to_group_id, 'still-my-group'); | ||
assert.equal(message.header.durable, true); | ||
assert.equal(message.header.priority, 3); | ||
assert.equal(message.header.ttl, 123456789); | ||
assert.equal(message.header.first_acquirer, false); | ||
assert.equal(message.header.delivery_count, 8); | ||
})); | ||
it('get application property directly', transfer_test({application_properties:{colour:'red'}}, function(message) { | ||
assert.equal(message.colour, 'red'); | ||
})); | ||
it('set application property directly', transfer_test({colour:'red'}, function(message) { | ||
assert.equal(message.application_properties.colour, 'red'); | ||
})); | ||
it('test undefined properties and headers directly', transfer_test({body:'hello world!'}, function(message) { | ||
assert.equal(message.body, 'hello world!'); | ||
assert.equal(message.message_id, undefined); | ||
assert.equal(message.user_id, undefined); | ||
assert.equal(message.to, undefined); | ||
assert.equal(message.subject, undefined); | ||
assert.equal(message.reply_to, undefined); | ||
assert.equal(message.correlation_id, undefined); | ||
assert.equal(message.content_type, undefined); | ||
assert.equal(message.content_encoding, undefined); | ||
assert.equal(message.absolute_expiry_time, undefined); | ||
assert.equal(message.creation_time, undefined); | ||
assert.equal(message.group_id, undefined); | ||
assert.equal(message.group_sequence, undefined); | ||
assert.equal(message.reply_to_group_id, undefined); | ||
assert.equal(message.durable, undefined); | ||
assert.equal(message.priority, undefined); | ||
assert.equal(message.ttl, undefined); | ||
assert.equal(message.first_acquirer, undefined); | ||
assert.equal(message.delivery_count, undefined); | ||
})); | ||
}); | ||
describe('acknowledgement', function() { | ||
var server, client, listener; | ||
var outcome; | ||
beforeEach(function(done) { | ||
outcome = {}; | ||
server = rhea.create_container(); | ||
server.on('accepted', function (context) { | ||
outcome.state = 'accepted'; | ||
}); | ||
server.on('released', function (context) { | ||
outcome.state = 'released'; | ||
outcome.delivery_failed = context.delivery.remote_state.delivery_failed; | ||
outcome.undeliverable_here = context.delivery.remote_state.undeliverable_here; | ||
}); | ||
server.on('rejected', function (context) { | ||
outcome.state = 'rejected'; | ||
outcome.error = context.delivery.remote_state.error; | ||
}); | ||
server.on('settled', function (context) { | ||
context.connection.close(); | ||
}); | ||
client = rhea.create_container(); | ||
listener = server.listen({port:0}); | ||
listener.on('listening', function() { | ||
done(); | ||
}); | ||
}); | ||
afterEach(function() { | ||
listener.close(); | ||
}); | ||
it('auto-accept', function(done) { | ||
server.once('sendable', function (context) { | ||
context.sender.send({body:'accept-me'}); | ||
}); | ||
client.on('message', function(context) { | ||
assert.equal(context.message.body, 'accept-me'); | ||
}); | ||
client.on('connection_close', function (context) { | ||
assert.equal(outcome.state, 'accepted'); | ||
done(); | ||
}); | ||
client.connect(listener.address()).attach_receiver(); | ||
}); | ||
it('explicit accept', function(done) { | ||
server.once('sendable', function (context) { | ||
context.sender.send({body:'accept-me'}); | ||
}); | ||
client.on('message', function(context) { | ||
assert.equal(context.message.body, 'accept-me'); | ||
context.delivery.accept(); | ||
}); | ||
client.on('connection_close', function (context) { | ||
assert.equal(outcome.state, 'accepted'); | ||
done(); | ||
}); | ||
client.connect(listener.address()).attach_receiver({autoaccept: false}); | ||
}); | ||
it('explicit release', function(done) { | ||
server.once('sendable', function (context) { | ||
context.sender.send({body:'release-me'}); | ||
}); | ||
client.on('message', function(context) { | ||
assert.equal(context.message.body, 'release-me'); | ||
context.delivery.release(); | ||
}); | ||
client.on('connection_close', function (context) { | ||
assert.equal(outcome.state, 'released'); | ||
assert.equal(outcome.delivery_failed, undefined); | ||
assert.equal(outcome.undeliverable_here, undefined); | ||
done(); | ||
}); | ||
client.connect(listener.address()).attach_receiver({autoaccept: false}); | ||
}); | ||
it('explicit reject', function(done) { | ||
server.once('sendable', function (context) { | ||
context.sender.send({body:'reject-me'}); | ||
}); | ||
client.on('message', function(context) { | ||
assert.equal(context.message.body, 'reject-me'); | ||
context.delivery.reject({condition:'rhea:oops:string',description:'something bad occurred'}); | ||
}); | ||
client.on('connection_close', function (context) { | ||
assert.equal(outcome.state, 'rejected'); | ||
assert.equal(outcome.error.condition, 'rhea:oops:string'); | ||
assert.equal(outcome.modified, undefined); | ||
done(); | ||
}); | ||
client.connect(listener.address()).attach_receiver({autoaccept: false}); | ||
}); | ||
it('explicit modify', function(done) { | ||
server.options.treat_modified_as_released = false; | ||
server.on('modified', function (context) { | ||
assert.equal(outcome.state, undefined); | ||
outcome.state = 'modified'; | ||
outcome.delivery_failed = context.delivery.remote_state.delivery_failed; | ||
outcome.undeliverable_here = context.delivery.remote_state.undeliverable_here; | ||
}); | ||
server.once('sendable', function (context) { | ||
context.sender.send({body:'modify-me'}); | ||
}); | ||
client.on('message', function(context) { | ||
assert.equal(context.message.body, 'modify-me'); | ||
context.delivery.modified({delivery_failed:true, undeliverable_here: true}); | ||
}); | ||
client.on('connection_close', function (context) { | ||
assert.equal(outcome.state, 'modified'); | ||
assert.equal(outcome.delivery_failed, true); | ||
assert.equal(outcome.undeliverable_here, true); | ||
done(); | ||
}); | ||
client.connect(listener.address()).attach_receiver({autoaccept: false}); | ||
}); | ||
it('modified as released', function(done) { | ||
server.once('sendable', function (context) { | ||
context.sender.send({body:'try-again'}); | ||
}); | ||
client.on('message', function(context) { | ||
assert.equal(context.message.body, 'try-again'); | ||
context.delivery.release({delivery_failed:true, undeliverable_here: true}); | ||
}); | ||
client.on('connection_close', function (context) { | ||
assert.equal(outcome.state, 'released'); | ||
assert.equal(outcome.delivery_failed, true); | ||
assert.equal(outcome.undeliverable_here, true); | ||
done(); | ||
}); | ||
client.connect(listener.address()).attach_receiver({autoaccept: false}); | ||
}); | ||
}); |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
306126
73
6531
439