Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

rhea

Package Overview
Dependencies
Maintainers
1
Versions
83
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

rhea - npm Package Compare versions

Comparing version 0.1.2 to 0.1.3

examples/direct_send.js

8

examples/client.js

@@ -25,4 +25,4 @@ /*

function next_request(context) {
if (context.receiver.remote.attach.source.address) {
sender.send({properties:{reply_to:context.receiver.remote.attach.source.address}, body:requests[0]})
if (context.receiver.source.address) {
sender.send({properties:{reply_to:context.receiver.source.address}, body:requests[0]})
}

@@ -32,4 +32,4 @@ }

container.on('connection_open', function (context) {
sender = context.connection.attach_sender('examples');
context.connection.attach_receiver({source:{dynamic:true}});
sender = context.connection.open_sender('examples');
context.connection.open_receiver({source:{dynamic:true}});
});

@@ -36,0 +36,0 @@ container.on('receiver_open', function (context) {

@@ -28,3 +28,3 @@ /*

server.once('listening', function () {
container.connect({'port':8888}).attach_sender('examples');
container.connect({'port':8888}).open_sender('examples');
});

@@ -31,0 +31,0 @@ server.once('connection', function () {

@@ -18,4 +18,4 @@ /*

container.on('connection_open', function (context) {
context.connection.attach_receiver('examples');
context.connection.attach_sender('examples');
context.connection.open_receiver('examples');
context.connection.open_sender('examples');
});

@@ -22,0 +22,0 @@ container.on('message', function (context) {

@@ -37,2 +37,2 @@ /*

container.connect({'port':args.port}).attach_receiver({source:{address:args.node,distribution_mode:'copy'}});
container.connect({'port':args.port}).open_receiver({source:{address:args.node,distribution_mode:'copy'}});

@@ -96,3 +96,3 @@ /*

var connection = container.connect(connect_options);
sender = connection.attach_sender('examples');
connection.attach_receiver('examples');
sender = connection.open_sender('examples');
connection.open_receiver('examples');

@@ -41,12 +41,12 @@ /*

container.on('sender_open', function (context) {
subscribe(context.connection.remote.open.container_id, context.sender);
subscribe(context.connection.container_id, context.sender);
});
container.on('sender_close', function (context) {
unsubscribe(context.connection.remote.open.container_id);
unsubscribe(context.connection.container_id);
});
container.on('connection_close', function (context) {
unsubscribe(context.connection.remote.open.container_id);
unsubscribe(context.connection.container_id);
});
container.on('disconnected', function (context) {
unsubscribe(context.connection.remote.open.container_id);
unsubscribe(context.connection.container_id);
});

@@ -56,3 +56,3 @@

if (expected === 0 || received < expected) {
var name = context.connection.remote.open.container_id;
var name = context.connection.container_id;
console.log('echoed ' + context.message.body + ' to ' + name);

@@ -59,0 +59,0 @@ listeners[name].send(context.message);

@@ -22,3 +22,3 @@ /*

container.on('connection_open', function (context) {
context.connection.attach_receiver('examples');
context.connection.open_receiver('examples');
});

@@ -25,0 +25,0 @@

@@ -41,2 +41,2 @@ /*

container.connect({'port':args.port}).attach_receiver(args.node);
container.connect({'port':args.port}).open_receiver(args.node);

@@ -44,2 +44,2 @@ /*

container.connect({'port':args.port}).attach_sender(args.node);
container.connect({'port':args.port}).open_sender(args.node);

@@ -73,3 +73,5 @@ /*

} else {
if (signal === null) assert.equal(code, 0);
if (signal === null && !process.version.match(/v0\.10\.\d+/)) {
assert.equal(code, 0);
}
if (prog.verifier) {

@@ -76,0 +78,0 @@ prog.verifier(prog.actual_output);

@@ -34,2 +34,12 @@ /*

function ConnectionError(message, condition, connection) {
Error.call(this);
Error.captureStackTrace(this, this.constructor);
this.message = message;
this.condition = condition;
this.description = message;
this.connection = connection;
}
require('util').inherits(ConnectionError, Error);
function get_socket_id(socket) {

@@ -104,2 +114,18 @@ if (socket.get_id_string) return socket.get_id_string();

var aliases = ["container_id",
"hostname",
"max_frame_size",
"channel_max",
"idle_time_out",
"outgoing_locales",
"incoming_locales",
"offered_capabilities",
"desired_capabilities",
"properties"];
function remote_property_shortcut(name) {
return function() { return this.remote.open ? this.remote.open[name] : undefined; };
}
var conn_counter = 1;

@@ -153,2 +179,8 @@

this.default_sender = undefined;
for (var i in aliases) {
var f = aliases[i];
Object.defineProperty(this, f, { get: remote_property_shortcut(f) });
}
Object.defineProperty(this, 'error', { get: function() { return this.remote.close ? this.remote.close.error : undefined; }});
};

@@ -226,3 +258,3 @@

this.socket.on('data', this.input.bind(this));
this.socket.on('error', this.error.bind(this));
this.socket.on('error', this.on_error.bind(this));
this.socket.on('end', this.eof.bind(this));

@@ -302,8 +334,7 @@

if (error) {
//TODO: invoke connection_close regardless of whether connection_error is handled
//TODO: example for error handling
if (!this.dispatch('connection_error', this._context())) {
if (!this.dispatch('connection_close', this._context())) {
console.log('error: ' + JSON.stringify(error));
}
var handled = this.dispatch('connection_error', this._context());
handled = this.dispatch('connection_close', this._context()) || handled;
if (!handled) {
EventEmitter.prototype.emit.call(this.container, 'error', new ConnectionError(error.description, error.condition, this));
}

@@ -349,3 +380,7 @@ return true;

if (this.local.open.idle_time_out) this.heartbeat_in = setTimeout(this.idle.bind(this), this.local.open.idle_time_out);
if (this.transport.has_writes_pending()) this.output();
if (this.transport.has_writes_pending()) {
this.output();
} else if (this.is_closed() && this.state.has_settled()) {
this.socket.end();
}
};

@@ -361,3 +396,3 @@

Connection.prototype.error = function (e) {
Connection.prototype.on_error = function (e) {
console.log('[' + this.options.id + '] error: ' + e);

@@ -372,2 +407,4 @@ this._disconnected();

Connection.prototype._disconnected = function () {
if (this.heartbeat_out) clearTimeout(this.heartbeat_out);
if (this.heartbeat_in) clearTimeout(this.heartbeat_in);
if (!this.is_closed() && this.scheduled_reconnect == undefined) {

@@ -392,3 +429,4 @@ if (!this.dispatch('disconnected', this._context())) {

};
Connection.prototype.close = function () {
Connection.prototype.close = function (error) {
if (error) this.local.close.error = error;
if (this.state.close()) {

@@ -395,0 +433,0 @@ this._register();

@@ -38,5 +38,15 @@ /*

function auto_accept(context) {
context.delivery.update(true, message.accepted().described());
context.delivery.update(undefined, message.accepted().described());
};
function LinkError(message, condition, link) {
Error.call(this);
Error.captureStackTrace(this, this.constructor);
this.message = message;
this.condition = condition;
this.description = message;
this.link = link;
}
require('util').inherits(LinkError, Error);
var EventEmitter = require('events').EventEmitter;

@@ -50,4 +60,5 @@

EventEmitter.prototype.emit.apply(this, arguments);
return true;
} else {
this.session.dispatch.apply(this.session, arguments);
return this.session.dispatch.apply(this.session, arguments);
}

@@ -75,3 +86,4 @@ };

};
link.close = function() {
link.close = function(error) {
if (error) this.local.detach.error = error;
this.local.detach.closed = true;

@@ -123,2 +135,6 @@ if (this.state.close()) {

link.prefix_event = function (event) {
return (this.local.attach.role ? 'receiver_' : 'sender_') + event;
}
link.on_detach = function (frame) {

@@ -128,3 +144,12 @@ if (this.state.remote_closed()) {

this.close();
this.dispatch(this.local.attach.role ? 'receiver_close' : 'sender_close', this._context());
var error = this.remote.detach.error;
if (error) {
var handled = this.dispatch(this.prefix_event('error'), this._context());
handled = this.dispatch(this.prefix_event('close'), this._context()) || handled;
if (!handled) {
EventEmitter.prototype.emit.call(this.connection.container, 'error', new LinkError(error.description, error.condition, this));
}
} else {
this.dispatch(this.prefix_event('close'), this._context());
}
} else {

@@ -137,2 +162,3 @@ throw Error('Detach already received');

switch (name) {
case 'name':
case 'handle':

@@ -147,2 +173,16 @@ case 'role':

var aliases = ["snd_settle_mode",
"rcv_settle_mode",
"source",
"target",
"max_message_size",
"offered_capabilities",
"desired_capabilities",
"properties"];
function remote_property_shortcut(name) {
return function() { return this.remote.attach ? this.remote.attach[name] : undefined; }
}
link.init = function (session, name, local_handle, opts, is_receiver) {

@@ -154,3 +194,3 @@ this.session = session;

this.state = new EndpointState();
this.issue_flow = false;//currently only used by receiver
this.issue_flow = false;
this.local = {'handle': local_handle};

@@ -168,2 +208,7 @@ this.local.attach = frames.attach({'handle':local_handle,'name':name, role:is_receiver});

this.observers = new EventEmitter();
for (var i in aliases) {
var f = aliases[i];
Object.defineProperty(this, f, { get: remote_property_shortcut(f) });
}
Object.defineProperty(this, 'error', { get: function() { return this.remote.detach ? this.remote.detach.error : undefined; }});
};

@@ -199,2 +244,4 @@ link.reset = function() {

this.init(session, name, local_handle, opts, false);
this._draining = false;
this._drained = false;
this.local.attach.initial_delivery_count = 0;

@@ -205,5 +252,28 @@ this.tag = 0;

}
var sender = this;
if (this.get_option('treat_modified_as_released', true)) {
this.observers.on('modified', function (context) {
sender.dispatch('released', context);
});
}
};
Sender.prototype = Object.create(link);
Sender.prototype.constructor = Sender;
Sender.prototype._get_drain = function () {
if (this._draining && this._drained && this.credit) {
while (this.credit) {
++this.delivery_count;
--this.credit;
}
return true;
} else {
return false;
}
}
Sender.prototype.set_drained = function (drained) {
this._drained = drained;
if (this._draining && this._drained) {
this.issue_flow = true;
}
}
Sender.prototype.next_tag = function () {

@@ -218,4 +288,9 @@ return new String(this.tag++);

this.credit = flow.delivery_count + flow.link_credit - this.delivery_count;
this._draining = flow.drain;
this._drained = this.credit > 0;
if (this.is_open()) {
this.dispatch('sender_flow', this._context());
if (this._draining) {
this.dispatch('sender_draining', this._context());
}
if (this.sendable()) {

@@ -230,3 +305,7 @@ this.dispatch('sendable', this._context());

Sender.prototype.send = function (msg, tag) {
return this.session.send(this, tag ? tag : this.next_tag(), message.encode(msg), 0);
var delivery = this.session.send(this, tag ? tag : this.next_tag(), message.encode(msg), 0);
if (this.local.attach.snd_settle_mode === 1) {
delivery.settled = true;
}
return delivery;
};

@@ -237,3 +316,4 @@

this.init(session, name, local_handle, opts, true);
this.set_prefetch(this.get_option('prefetch', 100));
this.drain = false;
this.set_credit_window(this.get_option('credit_window', 100));
if (this.get_option('autoaccept', true)) {

@@ -247,2 +327,6 @@ this.observers.on('message', auto_accept);

this.dispatch('receiver_flow', this._context());
if (frame.performative.drain) {
if (frame.performative.link_credit > 0) console.log('ERROR: received flow with drain set, but non zero credit');
else this.dispatch('receiver_drained', this._context());
}
};

@@ -256,6 +340,10 @@ Receiver.prototype.flow = function(credit) {

};
Receiver.prototype.add_credit = Receiver.prototype.flow;//alias
Receiver.prototype._get_drain = function () {
return this.drain;
}
Receiver.prototype.set_prefetch = function(prefetch) {
if (prefetch > 0) {
var flow_controller = new FlowController(prefetch);
Receiver.prototype.set_credit_window = function(credit_window) {
if (credit_window > 0) {
var flow_controller = new FlowController(credit_window);
var listener = flow_controller.update.bind(flow_controller);

@@ -262,0 +350,0 @@ this.observers.on('message', listener);

@@ -110,6 +110,112 @@ /*

function reverse_lookup(map) {
var reversed = {};
for (var key in map) {
var values = map[key];
for (var i in values) {
reversed[values[i]] = key;
}
}
return reversed;
}
var special_sections = {"properties": ["message_id",
"user_id",
"to",
"subject",
"reply_to",
"correlation_id",
"content_type",
"content_encoding",
"absolute_expiry_time",
"creation_time",
"group_id",
"group_sequence",
"reply_to_group_id"],
"header": ["durable",
"priority",
"ttl",
"first_acquirer",
"delivery_count"]
};
var special_fields = reverse_lookup(special_sections);
var section_names = ["header", "delivery_annotations", "message_annotations", "properties", "application_properties", "body", "footer"];
function copy(src, tgt) {
for (var k in src) {
var v = src[k];
if (typeof v === "object") {
copy(v, tgt[k]);
} else {
tgt[k] = v;
}
}
}
function add_property_shortcut(o, group, name) {
Object.defineProperty(o, name,
{
get: function() { return this[group] ? this[group][name] : undefined; },
set: function(v) { if (this[group] === undefined) { this[group] = {}; } this[group][name] = v; }
});
}
function create_message(m) {
var msg = {
toJSON: function () {
var o = {};
for (var i in section_names) {
var key = section_names[i];
if (typeof this[key] === 'function') continue;
var fields = special_sections[key];
if (fields) {
for (var j in fields) {
if (this[fields[j]] !== undefined) {
if (o[key] === undefined) o[key] = {};
o[key][fields[j]] = this[fields[j]];
}
}
} else if (this[key]) {
o[key] = this[key];
}
}
return o;
},
inspect: function() {
return JSON.stringify(this.toJSON());
},
toString: function () {
return JSON.stringify(this.toJSON());
}
};
for (var field in special_fields) {
add_property_shortcut(msg, special_fields[field], field);
}
if (m) copy(m, msg);
return msg;
}
function massage(msg) {
var out = undefined;
for (var key in msg) {
if (typeof msg[key] !== 'function' && section_names.indexOf(key) < 0) {
if (out === undefined) out = {};
var group = special_fields[key] || 'application_properties';
if (out[group] === undefined) {
out[group] = {};
}
out[group][key] = msg[key];
} else {
if (out !== undefined) {
out[key] = msg[key];
}
}
}
return out || msg;
}
message.encode = function(obj) {
var sections = [];
wrappers.forEach(function (wrapper_fn) { wrapper_fn(sections, obj); });
var msg = massage(obj);
wrappers.forEach(function (wrapper_fn) { wrapper_fn(sections, msg); });
var writer = new types.Writer();

@@ -126,3 +232,3 @@ for (var i = 0; i < sections.length; i++) {

message.decode = function(buffer) {
var msg = {};
var msg = create_message();
var reader = new types.Reader(buffer);

@@ -143,2 +249,7 @@ while (reader.remaining()) {

}
if (msg.application_properties) {
for (var key in msg.application_properties) {
add_property_shortcut(msg, 'application_properties', key);
}
}
return msg;

@@ -170,6 +281,6 @@ }

if (c) {
return new c(outcome);
return new c(types.unwrap(outcome));
}
}
console.log('unrecognised outcome');
console.log('unrecognised outcome: ' + JSON.stringify(outcome));
return outcome;

@@ -186,4 +297,4 @@ };

fields:[
{name:"section-number", type:"uint", mandatory:true},
{name:"section-offset", type:"ulong", mandatory:true}
{name:"section_number", type:"uint", mandatory:true},
{name:"section_offset", type:"ulong", mandatory:true}
]});

@@ -196,7 +307,7 @@ define_outcome({name:"accepted", code:0x24, fields:[]});

fields:[
{name:"delivery-failed", type:"boolean"},
{name:"undeliverable-here", type:"boolean"},
{name:"message-annotations", type:"fields"}
{name:"delivery_failed", type:"boolean"},
{name:"undeliverable_here", type:"boolean"},
{name:"message_annotations", type:"map"}
]});
module.exports = message;

@@ -76,5 +76,37 @@ /*

function write_dispositions(deliveries) {
var first;
var last;
var next_id;
for (var i = 0; i < deliveries.length; i++) {
var delivery = deliveries[i];
if (first === undefined) {
first = delivery;
last = delivery;
next_id = delivery.id;
}
if (!message.are_outcomes_equivalent(last.state, delivery.state) || last.settled !== delivery.settled || next_id !== delivery.id) {
first.link.session.output(frames.disposition({'role':first.link.is_receiver(),'first':first.id,'last':last.id, 'state':first.state, 'settled':first.settled}).described());
first = delivery;
last = delivery;
next_id = delivery.id;
} else {
if (last.id !== delivery.id) {
last = delivery;
}
next_id++;
}
}
if (first !== undefined && last !== undefined) {
first.link.session.output(frames.disposition({'role':first.link.is_receiver(),'first':first.id,'last':last.id, 'state':first.state, 'settled':first.settled}).described());
}
}
var Outgoing = function () {
this.deliveries = new CircularBuffer(2048/*TODO= configurable?*/);
this.updated = [];
this.pending_dispositions = [];
this.next_delivery_id = 0;

@@ -103,2 +135,6 @@ this.next_pending_delivery = 0;

'remote_state': undefined};
var self = this;
d.update = function (settled, state) {
self.update(d, settled, state);
};
this.deliveries.push(d);

@@ -121,5 +157,2 @@ return d;

var d = this.deliveries.by_id(i);
if (!d) {
console.log('Could not find delivery for ' + i + ' [' + JSON.stringify(fields) + ']');
}
if (d && !d.remote_settled) {

@@ -132,3 +165,3 @@ var updated = false;

if (fields.state && fields.state !== d.remote_state) {
d.remote_state = fields.state;
d.remote_state = message.unwrap_outcome(fields.state);
updated = true;

@@ -143,2 +176,13 @@ }

Outgoing.prototype.update = function (delivery, settled, state) {
if (delivery) {
delivery.settled = settled;
if (state !== undefined) delivery.state = state;
if (!delivery.remote_settled) {
this.pending_dispositions.push(delivery);
}
delivery.link.connection._register();
}
};
Outgoing.prototype.transfer_window = function() {

@@ -162,3 +206,3 @@ if (this.remote_window) {

this.window -= d.transfers_required;
d.link.session.output(frames.transfer({'handle':d.link.local.handle,'message_format':d.format,'delivery_id':d.id, 'delivery_tag':d.tag}).described(), d.data);
d.link.session.output(frames.transfer({'handle':d.link.local.handle,'message_format':d.format,'delivery_id':d.id, 'delivery_tag':d.tag, 'settled':d.settled}).described(), d.data);
d.link.credit--;

@@ -184,7 +228,4 @@ this.next_pending_delivery++;

var d = this.updated[i];
if (d.remote_state) {
d.remote_state = message.unwrap_outcome(d.remote_state);
if (d.remote_state && d.remote_state.constructor.composite_type) {
d.link.dispatch(d.remote_state.constructor.composite_type, d.link._context({'delivery':d}));
}
if (d.remote_state && d.remote_state.constructor.composite_type) {
d.link.dispatch(d.remote_state.constructor.composite_type, d.link._context({'delivery':d}));
}

@@ -195,2 +236,7 @@ if (d.remote_settled) d.link.dispatch('settled', d.link._context({'delivery':d}));

if (this.pending_dispositions.length) {
write_dispositions(this.pending_dispositions);
this.pending_dispositions = [];
}
// remove any fully settled deliveries:

@@ -243,6 +289,23 @@ this.deliveries.pop_if(function (d) { return d.settled && d.remote_settled; });

'state': undefined,
'remote_settled': frame.performative.settled,
'remote_state': undefined};
'remote_settled': frame.performative.settled === undefined ? false : frame.performative.settled,
'remote_state': frame.performative.state};
var self = this;
current.update = function (settled, state) { self.update(current, settled, state); };
current.update = function (settled, state) {
var settled_ = settled;
if (settled_ === undefined) {
settled_ = receiver.local.attach.rcv_settle_mode !== 1;
}
self.update(current, settled_, state);
};
current.accept = function () { this.update(undefined, message.accepted().described()); };
current.release = function (params) {
if (params) {
this.update(undefined, message.modified(params).described());
} else {
this.update(undefined, message.released().described());
}
};
current.reject = function (error) { this.update(true, message.rejected({'error':error}).described()); };
current.modified = function (params) { this.update(true, message.modified(params).described()); };
this.deliveries.push(current);

@@ -268,30 +331,3 @@ data = frame.payload;

if (this.updated.length > 0) {
var first;
var last;
var next_id;
for (var i = 0; i < this.updated.length; i++) {
var delivery = this.updated[i];
if (first === undefined) {
first = delivery;
last = delivery;
next_id = delivery.id;
}
if (!message.are_outcomes_equivalent(last.state, delivery.state) || last.settled !== delivery.settled || next_id !== delivery.id) {
first.link.session.output(frames.disposition({'role':true,'first':first.id,'last':last.id, 'state':first.state, 'settled':first.settled}).described());
first = delivery;
last = delivery;
next_id = delivery.id;
} else {
if (last.id !== delivery.id) {
last = delivery;
}
next_id++;
}
}
if (first !== undefined && last !== undefined) {
first.link.session.output(frames.disposition({'role':true,'first':first.id,'last':last.id, 'state':first.state, 'settled':first.settled}).described());
}
write_dispositions(this.updated);
this.updated = [];

@@ -317,5 +353,2 @@ }

var d = this.deliveries.by_id(i);
if (!d) {
console.log('Could not find delivery for ' + i);
}
if (d && !d.remote_settled) {

@@ -325,15 +358,6 @@ var updated = false;

d.remote_settled = fields.settled;
updated = true;
d.link.dispatch('settled', d.link._context({'delivery':d}));
}
if (fields.state && fields.state !== d.remote_state) {
d.remote_state = fields.state;
updated = true;
}
if (updated) {
console.log(d.link.connection.options.id + ' added delivery to updated list following receipt of disposition for incoming deliveries');
this.updated.push(d);
}
}
}
};

@@ -370,4 +394,5 @@

EventEmitter.prototype.emit.apply(this, arguments);
return true;
} else {
this.connection.dispatch.apply(this.connection, arguments);
return this.connection.dispatch.apply(this.connection, arguments);
}

@@ -438,3 +463,4 @@ };

Session.prototype.end = function () {
Session.prototype.end = function (error) {
if (error) this.local.end.error = error;
if (this.state.close()) {

@@ -485,2 +511,3 @@ this.connection._register();

if (link) {
if (link._get_drain()) fields.drain = true;
fields.delivery_count = link.delivery_count;

@@ -487,0 +514,0 @@ fields.handle = link.local.handle;

@@ -236,3 +236,3 @@ /*

define_type('CharUTF32', 0x73);
define_type('Timestamp', 0x83);//TODO: convert to/from Date
define_type('Timestamp', 0x83, {'write':write_long, 'read':read_long});//TODO: convert to/from Date
define_type('Uuid', 0x98);//TODO: convert to/from stringified form?

@@ -276,3 +276,3 @@ define_type('Vbin8', 0xa0);

types.wrap_boolean = function(v) {
return new types.Boolean(v);
return v ? new types.True() : new types.False();
};

@@ -279,0 +279,0 @@ types.wrap_ulong = function(l) {

{
"name" : "rhea",
"version": "0.1.2",
"version": "0.1.3",
"description": "reactive AMQP 1.0 library",

@@ -5,0 +5,0 @@ "homepage": "http://github.com/grs/rhea",

@@ -310,3 +310,4 @@ # rhea

Closes a connection.
Closes a connection (may take an error object which is an object
that consists of condition and description fields).

@@ -332,12 +333,6 @@ #### events:

##### source_address()
Returns the address of the source from which this receiver is
receiving messages. This can be useful when the source name is
generated by the peer, i.e. for so-called dyanmic nodes (like
temporary queues) used in some request-response patterns.
##### close()
Closes a receiving link (i.e. cancels the subscription).
Closes a receiving link (i.e. cancels the subscription). (May take an error object which is an object
that consists of condition and description fields).

@@ -349,3 +344,3 @@ ##### detach()

##### flow(n)
##### add_credit(n)

@@ -355,4 +350,5 @@ By default, receivers have a prefetch window that is moved

set the prefecth to zero and manage credit itself. Each invocation of
flow() method issues credit for a further 'n' messages to be sent by
the peer over this receiving link.
add_credit() method issues credit for a further 'n' messages to be
sent by the peer over this receiving link. [Note: flow()is an alias
for add_credit()]

@@ -405,5 +401,12 @@ ##### credit()

The header, properties and application_properties can be set on the
message itself, i.e. the nesting is not necessary (it reflects the
AMQP specification however). So
e.g. {subject:'abc',colour:'green',body:'foo'} is equivalent to
{properties:{subject:'abc'},application_properties:{colour:'green'},body:'foo'}
##### close()
Closes a sending link.
Closes a sending link (may take an error object which is an object
that consists of condition and description fields).

@@ -410,0 +413,0 @@ ##### detach()

@@ -57,2 +57,15 @@ /*

}
function close_test_simple(error, verification) {
return function(done) {
container.on('connection_close', function(context) {
verification(context.connection);
done();
});
var c = container.connect(listener.address());
c.on('connection_open', function(context) {
context.connection.close(error);
});
c.on('connection_close', function(context) {});
};
}

@@ -80,18 +93,33 @@ afterEach(function() {

}));
it('hostname', open_test({hostname:'my-virtual-host'}, function(connection) {
it('hostname explicit', open_test({hostname:'my-virtual-host'}, function(connection) {
assert.equal(connection.remote.open.hostname, 'my-virtual-host');
}));
it('container_id', open_test({container_id:'this-is-me'}, function(connection) {
it('hostname aliased', open_test({hostname:'my-virtual-host'}, function(connection) {
assert.equal(connection.hostname, 'my-virtual-host');
}));
it('container_id explicit', open_test({container_id:'this-is-me'}, function(connection) {
assert.equal(connection.remote.open.container_id, 'this-is-me');
}));
it('max frame size', open_test({max_frame_size:5432}, function(connection) {
it('container_id aliased', open_test({container_id:'this-is-me'}, function(connection) {
assert.equal(connection.container_id, 'this-is-me');
}));
it('max frame size explicit', open_test({max_frame_size:5432}, function(connection) {
assert.equal(connection.remote.open.max_frame_size, 5432);
}));
it('channel max', open_test({channel_max:10}, function(connection) {
it('max frame size aliased', open_test({max_frame_size:5432}, function(connection) {
assert.equal(connection.max_frame_size, 5432);
}));
it('channel max explicit', open_test({channel_max:10}, function(connection) {
assert.equal(connection.remote.open.channel_max, 10);
}));
it('idle time out', open_test({idle_time_out:1000}, function(connection) {
it('channel max aliased', open_test({channel_max:10}, function(connection) {
assert.equal(connection.channel_max, 10);
}));
it('idle time out explicit', open_test({idle_time_out:1000}, function(connection) {
assert.equal(connection.remote.open.idle_time_out, 1000);
}));
it('properties', open_test({properties:{flavour:'vanilla', scoops:2, cone:true}}, function(connection) {
it('idle time out aliased', open_test({idle_time_out:1000}, function(connection) {
assert.equal(connection.idle_time_out, 1000);
}));
it('properties explicit', open_test({properties:{flavour:'vanilla', scoops:2, cone:true}}, function(connection) {
assert.equal(connection.remote.open.properties.flavour, 'vanilla');

@@ -101,2 +129,7 @@ assert.equal(connection.remote.open.properties.scoops, 2);

}));
it('properties aliased', open_test({properties:{flavour:'vanilla', scoops:2, cone:true}}, function(connection) {
assert.equal(connection.properties.flavour, 'vanilla');
assert.equal(connection.properties.scoops, 2);
assert.equal(connection.properties.cone, true);
}));
it('error on close', close_test({condition:'amqp:connection:forced', description:'testing error on close'}, function(connection) {

@@ -107,4 +140,82 @@ var error = connection.remote.close.error;

}));
it('pass error to close', close_test_simple({condition:'amqp:connection:forced', description:'testing error on close'}, function(connection) {
var error = connection.remote.close.error;
assert.equal(error.condition, 'amqp:connection:forced');
assert.equal(error.description, 'testing error on close');
}));
});
describe('connection error handling', function() {
var container, listener;
beforeEach(function(done) {
container = rhea.create_container();
listener = container.listen({port:0});
listener.on('listening', function() {
done();
});
});
afterEach(function() {
listener.close();
});
it('error and close handled', function (done) {
var error_handler_called;
var close_handler_called;
container.on('connection_open', function(context) {
context.connection.close({condition:'amqp:connection:forced', description:'testing error on close'});
});
container.on('connection_close', function(context) {
assert.equal(error_handler_called, true);
assert.equal(close_handler_called, true);
done();
});
var c = container.connect(listener.address());
c.on('connection_error', function(context) {
error_handler_called = true;
var error = context.connection.error;
assert.equal(error.condition, 'amqp:connection:forced');
assert.equal(error.description, 'testing error on close');
});
c.on('connection_close', function(context) {
close_handler_called = true;
var error = context.connection.error;
assert.equal(error.condition, 'amqp:connection:forced');
assert.equal(error.description, 'testing error on close');
});
});
it('error handled', function (done) {
var error_handler_called;
container.on('connection_open', function(context) {
context.connection.close({condition:'amqp:connection:forced', description:'testing error on close'});
});
container.on('connection_close', function(context) {
assert.equal(error_handler_called, true);
done();
});
var c = rhea.create_container().connect(listener.address());
c.on('connection_error', function(context) {
error_handler_called = true;
var error = context.connection.error;
assert.equal(error.condition, 'amqp:connection:forced');
assert.equal(error.description, 'testing error on close');
});
});
it('unhandled error', function (done) {
var error_handler_called;
container.on('connection_open', function(context) {
context.connection.close({condition:'amqp:connection:forced', description:'testing error on close'});
});
container.on('connection_close', function(context) {
done();
});
var container2 = rhea.create_container();
container2.on('error', function (error) {
assert.equal(error.condition, 'amqp:connection:forced');
assert.equal(error.description, 'testing error on close');
});
var c = container2.connect(listener.address());
});
});
describe('connection events', function() {

@@ -111,0 +222,0 @@ var listener;

@@ -70,2 +70,17 @@ /*

}
function close_test_simple(local_role, error, verification) {
var remote_role = local_role === 'sender' ? 'receiver' : 'sender';
return function(done) {
container.on(remote_role + '_close', function(context) {
verification(context[remote_role]);
done();
});
var c = container.connect(listener.address());
c.on(local_role + '_open', function(context) {
context[local_role].close(error);
});
c.on(local_role + '_close', function(context) {});
c['open_' + local_role]();
};
}
function close_sender_test(error, verification) {

@@ -88,5 +103,8 @@ return close_test('sender', error, verification);

}));
it('single offered ' + t + ' capability', open_test(t, {offered_capabilities:'foo'}, function(link) {
it('single offered ' + t + ' capability explicit', open_test(t, {offered_capabilities:'foo'}, function(link) {
assert.equal(link.remote.attach.offered_capabilities, 'foo');
}));
it('single offered ' + t + ' capability aliased', open_test(t, {offered_capabilities:'foo'}, function(link) {
assert.equal(link.offered_capabilities, 'foo');
}));
it('multiple offered ' + t + ' capabilities', open_test(t, {offered_capabilities:['foo', 'bar']}, function(link) {

@@ -116,2 +134,7 @@ assert.equal(link.remote.attach.offered_capabilities.length, 2);

}));
it('pass error to ' + t + ' close', close_test_simple(t, {condition:'amqp:link:detach-forced', description:'testing error on close'}, function(link) {
var error = link.remote.detach.error;
assert.equal(error.condition, 'amqp:link:detach-forced');
assert.equal(error.description, 'testing error on close');
}));
}

@@ -121,2 +144,5 @@ it('source address as simple string', open_receiver_test('my-source', function (link) {

}));
it('source address aliased', open_receiver_test('my-source', function (link) {
assert.equal(link.source.address, 'my-source');
}));
it('source address as single nested value', open_receiver_test({source:'my-source'}, function (link) {

@@ -171,5 +197,12 @@ assert.equal(link.remote.attach.source.address, 'my-source');

}));
it('dynamic source aliased', open_receiver_test({source:{dynamic:true, dynamic_node_properties:{foo:'bar'}}}, function (link) {
assert.equal(link.source.dynamic, true);
assert.equal(link.source.dynamic_node_properties.foo, 'bar');
}));
it('target address as simple string', open_sender_test('my-target', function (link) {
assert.equal(link.remote.attach.target.address, 'my-target');
}));
it('target address aliased', open_sender_test('my-target', function (link) {
assert.equal(link.target.address, 'my-target');
}));
it('target address as single nested value', open_sender_test({target:'my-target'}, function (link) {

@@ -265,2 +298,220 @@ assert.equal(link.remote.attach.target.address, 'my-target');

});
describe(local_role + ' error handling', function() {
var container, listener;
var remote_role;
beforeEach(function(done) {
remote_role = roles[local_role];
container = rhea.create_container();
listener = container.listen({port:0});
listener.on('listening', function() {
done();
});
});
afterEach(function() {
listener.close();
});
it('error and close handled', function (done) {
var error_handler_called;
var close_handler_called;
container.on(remote_role + '_open', function(context) {
context[remote_role].close({condition:'amqp:link:detach-forced', description:'testing error on close'});
});
container.on('connection_close', function(context) {
assert.equal(error_handler_called, true);
assert.equal(close_handler_called, true);
done();
});
var c = rhea.create_container().connect(listener.address());
c.on(local_role + '_error', function(context) {
error_handler_called = true;
var error = context[local_role].error;
assert.equal(error.condition, 'amqp:link:detach-forced');
assert.equal(error.description, 'testing error on close');
});
c.on(local_role + '_close', function(context) {
close_handler_called = true;
var error = context[local_role].error;
assert.equal(error.condition, 'amqp:link:detach-forced');
assert.equal(error.description, 'testing error on close');
c.close();
});
c['open_' + local_role]('foo');
});
it('error handled', function (done) {
var error_handler_called;
container.on(remote_role + '_open', function(context) {
context[remote_role].close({condition:'amqp:link:detach-forced', description:'testing error on close'});
});
container.on('connection_close', function(context) {
assert.equal(error_handler_called, true);
done();
});
var c = rhea.create_container().connect(listener.address());
c.on(local_role + '_error', function(context) {
error_handler_called = true;
var error = context[local_role].error;
assert.equal(error.condition, 'amqp:link:detach-forced');
assert.equal(error.description, 'testing error on close');
c.close();
});
c['open_' + local_role]();
});
it('unhandled error', function (done) {
var error_handler_called;
container.on(remote_role + '_open', function(context) {
context[remote_role].close({condition:'amqp:link:detach-forced', description:'testing error on close'});
});
container.on('connection_close', function(context) {
done();
});
var container2 = rhea.create_container();
var c = container2.connect(listener.address());
container2.on('error', function (error) {
assert.equal(error.condition, 'amqp:link:detach-forced');
assert.equal(error.description, 'testing error on close');
c.close();
});
c['open_' + local_role]();
});
});
}
describe('settlement modes', function() {
var server, client, listener;
beforeEach(function(done) {
server = rhea.create_container();
client = rhea.create_container();
listener = server.listen({port:0});
listener.on('listening', function() {
done();
});
});
afterEach(function() {
listener.close();
});
it('sender sends unsettled', function(done) {
server.on('receiver_open', function(context) {
assert.equal(context.receiver.snd_settle_mode, 0);
});
server.on('message', function(context) {
assert.equal(context.message.body, 'settle-me');
assert.equal(context.delivery.remote_settled, false);
});
client.on('settled', function (context) {
context.connection.close();
});
client.once('sendable', function (context) {
context.sender.send({body:'settle-me'});
});
client.on('connection_close', function (context) {
done();
});
client.connect(listener.address()).attach_sender({snd_settle_mode:0});
});
it('sender sends settled', function(done) {
server.on('receiver_open', function(context) {
assert.equal(context.receiver.snd_settle_mode, 1);
});
server.on('message', function(context) {
assert.equal(context.message.body, 'already-settled');
assert.equal(context.delivery.remote_settled, true);
context.connection.close();
});
client.once('sendable', function (context) {
context.sender.send({body:'already-settled'});
});
client.on('connection_close', function (context) {
done();
});
client.connect(listener.address()).attach_sender({snd_settle_mode:1});
});
it('receiver requests send unsettled', function(done) {
server.on('sender_open', function(context) {
assert.equal(context.sender.snd_settle_mode, 0);
context.sender.local.attach.snd_settle_mode = context.sender.snd_settle_mode;
});
server.on('settled', function (context) {
context.connection.close();
});
server.once('sendable', function (context) {
context.sender.send({body:'settle-me'});
});
client.on('message', function(context) {
assert.equal(context.message.body, 'settle-me');
assert.equal(context.delivery.remote_settled, false);
});
client.on('connection_close', function (context) {
done();
});
client.connect(listener.address()).attach_receiver({snd_settle_mode:0});
});
it('receiver requests send settled', function(done) {
server.on('sender_open', function(context) {
assert.equal(context.sender.snd_settle_mode, 1);
context.sender.local.attach.snd_settle_mode = context.sender.snd_settle_mode;
});
server.once('sendable', function (context) {
context.sender.send({body:'already-settled'});
});
client.on('message', function(context) {
assert.equal(context.message.body, 'already-settled');
assert.equal(context.delivery.remote_settled, true);
context.connection.close();
});
client.on('connection_close', function (context) {
done();
});
client.connect(listener.address()).attach_receiver({snd_settle_mode:1});
});
it('receiver settles first', function(done) {
server.on('sender_open', function(context) {
assert.equal(context.sender.rcv_settle_mode, 0);
});
server.once('sendable', function (context) {
context.sender.send({body:'settle-me'});
});
server.once('accepted', function (context) {
assert.equal(context.delivery.remote_settled, true);
context.connection.close();
});
client.on('message', function(context) {
assert.equal(context.message.body, 'settle-me');
assert.equal(context.delivery.remote_settled, false);
});
client.on('connection_close', function (context) {
done();
});
client.connect(listener.address()).attach_receiver({rcv_settle_mode:0});
});
it('receiver settles second', function(done) {
server.on('sender_open', function(context) {
assert.equal(context.sender.rcv_settle_mode, 1);
});
server.once('sendable', function (context) {
context.sender.send({body:'settle-me'});
});
server.once('accepted', function (context) {
assert.equal(context.delivery.remote_settled, false);
context.delivery.update(true);
});
client.on('message', function(context) {
assert.equal(context.message.body, 'settle-me');
assert.equal(context.delivery.remote_settled, false);
});
client.on('settled', function (context) {
assert.equal(context.delivery.remote_settled, true);
context.connection.close();
});
client.on('connection_close', function (context) {
done();
});
client.connect(listener.address()).attach_receiver({rcv_settle_mode:1});
});
});

@@ -79,5 +79,8 @@ /*

}));
it('sends and receives map body', transfer_test({body:{colour:'green',age:8}}, function(message) {
it('sends and receives map body', transfer_test({body:{colour:'green',age:8,happy:true, sad:false}}, function(message) {
assert.equal(message.body.colour, 'green');
assert.equal(message.body.age, 8);
assert.equal(message.body.happy, true);
assert.equal(message.body.sad, false);
assert.equal(message.body.indifferent, undefined);
}));

@@ -124,3 +127,243 @@ it('sends and receives map with ulongs', transfer_test({body:{age:amqp_types.wrap_ulong(888), max:amqp_types.wrap_ulong(9007199254740992),

}));
it('get header and properties directly', transfer_test({properties:{
message_id:'my-id',
user_id:'my-user',
to:'my-to',
subject:'my-subject',
reply_to:'my-reply-to',
correlation_id:'correlate-me',
content_type:'text',
content_encoding:'ascii',
absolute_expiry_time:123456789,
creation_time:987654321,
group_id:'my-group',
group_sequence:77,
reply_to_group_id:'still-my-group'
}, header:{
durable:true,
priority:3,
ttl:123456789,
first_acquirer:false,
delivery_count:8
}}, function(message) {
assert.equal(message.message_id, 'my-id');
assert.equal(message.user_id, 'my-user');
assert.equal(message.to, 'my-to');
assert.equal(message.subject, 'my-subject');
assert.equal(message.reply_to, 'my-reply-to');
assert.equal(message.correlation_id, 'correlate-me');
assert.equal(message.content_type, 'text');
assert.equal(message.content_encoding, 'ascii');
assert.equal(message.absolute_expiry_time, 123456789);
assert.equal(message.creation_time, 987654321);
assert.equal(message.group_id, 'my-group');
assert.equal(message.group_sequence, 77);
assert.equal(message.reply_to_group_id, 'still-my-group');
assert.equal(message.durable, true);
assert.equal(message.priority, 3);
assert.equal(message.ttl, 123456789);
assert.equal(message.first_acquirer, false);
assert.equal(message.delivery_count, 8);
}));
it('set header and properties directly', transfer_test({
message_id:'my-id',
user_id:'my-user',
to:'my-to',
subject:'my-subject',
reply_to:'my-reply-to',
correlation_id:'correlate-me',
content_type:'text',
content_encoding:'ascii',
absolute_expiry_time:123456789,
creation_time:987654321,
group_id:'my-group',
group_sequence:77,
reply_to_group_id:'still-my-group',
durable:true,
priority:3,
ttl:123456789,
first_acquirer:false,
delivery_count:8
}, function(message) {
assert.equal(message.properties.message_id, 'my-id');
assert.equal(message.properties.user_id, 'my-user');
assert.equal(message.properties.to, 'my-to');
assert.equal(message.properties.subject, 'my-subject');
assert.equal(message.properties.reply_to, 'my-reply-to');
assert.equal(message.properties.correlation_id, 'correlate-me');
assert.equal(message.properties.content_type, 'text');
assert.equal(message.properties.content_encoding, 'ascii');
assert.equal(message.properties.absolute_expiry_time, 123456789);
assert.equal(message.properties.creation_time, 987654321);
assert.equal(message.properties.group_id, 'my-group');
assert.equal(message.properties.group_sequence, 77);
assert.equal(message.properties.reply_to_group_id, 'still-my-group');
assert.equal(message.header.durable, true);
assert.equal(message.header.priority, 3);
assert.equal(message.header.ttl, 123456789);
assert.equal(message.header.first_acquirer, false);
assert.equal(message.header.delivery_count, 8);
}));
it('get application property directly', transfer_test({application_properties:{colour:'red'}}, function(message) {
assert.equal(message.colour, 'red');
}));
it('set application property directly', transfer_test({colour:'red'}, function(message) {
assert.equal(message.application_properties.colour, 'red');
}));
it('test undefined properties and headers directly', transfer_test({body:'hello world!'}, function(message) {
assert.equal(message.body, 'hello world!');
assert.equal(message.message_id, undefined);
assert.equal(message.user_id, undefined);
assert.equal(message.to, undefined);
assert.equal(message.subject, undefined);
assert.equal(message.reply_to, undefined);
assert.equal(message.correlation_id, undefined);
assert.equal(message.content_type, undefined);
assert.equal(message.content_encoding, undefined);
assert.equal(message.absolute_expiry_time, undefined);
assert.equal(message.creation_time, undefined);
assert.equal(message.group_id, undefined);
assert.equal(message.group_sequence, undefined);
assert.equal(message.reply_to_group_id, undefined);
assert.equal(message.durable, undefined);
assert.equal(message.priority, undefined);
assert.equal(message.ttl, undefined);
assert.equal(message.first_acquirer, undefined);
assert.equal(message.delivery_count, undefined);
}));
});
describe('acknowledgement', function() {
var server, client, listener;
var outcome;
beforeEach(function(done) {
outcome = {};
server = rhea.create_container();
server.on('accepted', function (context) {
outcome.state = 'accepted';
});
server.on('released', function (context) {
outcome.state = 'released';
outcome.delivery_failed = context.delivery.remote_state.delivery_failed;
outcome.undeliverable_here = context.delivery.remote_state.undeliverable_here;
});
server.on('rejected', function (context) {
outcome.state = 'rejected';
outcome.error = context.delivery.remote_state.error;
});
server.on('settled', function (context) {
context.connection.close();
});
client = rhea.create_container();
listener = server.listen({port:0});
listener.on('listening', function() {
done();
});
});
afterEach(function() {
listener.close();
});
it('auto-accept', function(done) {
server.once('sendable', function (context) {
context.sender.send({body:'accept-me'});
});
client.on('message', function(context) {
assert.equal(context.message.body, 'accept-me');
});
client.on('connection_close', function (context) {
assert.equal(outcome.state, 'accepted');
done();
});
client.connect(listener.address()).attach_receiver();
});
it('explicit accept', function(done) {
server.once('sendable', function (context) {
context.sender.send({body:'accept-me'});
});
client.on('message', function(context) {
assert.equal(context.message.body, 'accept-me');
context.delivery.accept();
});
client.on('connection_close', function (context) {
assert.equal(outcome.state, 'accepted');
done();
});
client.connect(listener.address()).attach_receiver({autoaccept: false});
});
it('explicit release', function(done) {
server.once('sendable', function (context) {
context.sender.send({body:'release-me'});
});
client.on('message', function(context) {
assert.equal(context.message.body, 'release-me');
context.delivery.release();
});
client.on('connection_close', function (context) {
assert.equal(outcome.state, 'released');
assert.equal(outcome.delivery_failed, undefined);
assert.equal(outcome.undeliverable_here, undefined);
done();
});
client.connect(listener.address()).attach_receiver({autoaccept: false});
});
it('explicit reject', function(done) {
server.once('sendable', function (context) {
context.sender.send({body:'reject-me'});
});
client.on('message', function(context) {
assert.equal(context.message.body, 'reject-me');
context.delivery.reject({condition:'rhea:oops:string',description:'something bad occurred'});
});
client.on('connection_close', function (context) {
assert.equal(outcome.state, 'rejected');
assert.equal(outcome.error.condition, 'rhea:oops:string');
assert.equal(outcome.modified, undefined);
done();
});
client.connect(listener.address()).attach_receiver({autoaccept: false});
});
it('explicit modify', function(done) {
server.options.treat_modified_as_released = false;
server.on('modified', function (context) {
assert.equal(outcome.state, undefined);
outcome.state = 'modified';
outcome.delivery_failed = context.delivery.remote_state.delivery_failed;
outcome.undeliverable_here = context.delivery.remote_state.undeliverable_here;
});
server.once('sendable', function (context) {
context.sender.send({body:'modify-me'});
});
client.on('message', function(context) {
assert.equal(context.message.body, 'modify-me');
context.delivery.modified({delivery_failed:true, undeliverable_here: true});
});
client.on('connection_close', function (context) {
assert.equal(outcome.state, 'modified');
assert.equal(outcome.delivery_failed, true);
assert.equal(outcome.undeliverable_here, true);
done();
});
client.connect(listener.address()).attach_receiver({autoaccept: false});
});
it('modified as released', function(done) {
server.once('sendable', function (context) {
context.sender.send({body:'try-again'});
});
client.on('message', function(context) {
assert.equal(context.message.body, 'try-again');
context.delivery.release({delivery_failed:true, undeliverable_here: true});
});
client.on('connection_close', function (context) {
assert.equal(outcome.state, 'released');
assert.equal(outcome.delivery_failed, true);
assert.equal(outcome.undeliverable_here, true);
done();
});
client.connect(listener.address()).attach_receiver({autoaccept: false});
});
});
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc