Security News
38% of CISOs Fear They’re Not Moving Fast Enough on AI
CISOs are racing to adopt AI for cybersecurity, but hurdles in budgets and governance may leave some falling behind in the fight against cyber threats.
amqp10 is a promise-based, AMQP 1.0 compliant node.js client
The basic usage is to require the module, new up a client with the appropriate policy for the server you're connecting against, connect, and then send/receive as necessary. So a simple example for a local Apache Qpid server would look like:
var AMQPClient = require('amqp10').Client,
Promise = require('bluebird');
var client = new AMQPClient(); // Uses PolicyBase default policy
client.connect('amqp://localhost')
.then(function() {
return Promise.all([
client.createReceiver('amq.topic'),
client.createSender('amq.topic')
]);
})
.spread(function(receiver, sender) {
receiver.on('errorReceived', function(err) { // check for errors });
receiver.on('message', function(message) {
console.log('Rx message: ', message.body);
});
return sender.send({ key: "Value" });
})
.error(function(err) {
console.log("error: ", err);
});
By default send promises are resolved when a disposition frame is received from the remote link for the sent message, at this point the message is considered "settled". To tune this behavior, you can tweak the policy you give to AMQPClient on construction. For instance, to force send promises to be resolved immediately on successful sending of the payload, you would build AMQPClient like so:
var AMQPClient = require('amqp10').Client,
Policy = require('amqp10').Policy;
var client = new AMQPClient(Policy.merge({
senderLinkPolicy: {
callbackPolicy: Policy.Utils.SenderCallbackPolicies.OnSent
}
}, Policy.DefaultPolicy));
In addition to the above, you can also tune how message link credit is doled out (for throttling), as well as most other AMQP behaviors, all through policy overrides. See DefaultPolicy and the policy utilities for more details on altering various behaviors.
Flow control in AMQP occurs at both the Session
and Link
layers. Using our default policy, we start out with some sensible Session windows and Link credits, and renew those every time they get to the half-way point. In addition, receiver links start in "auto-settle" mode, which means that the sender side can consider the message "settled" as soon as it's sent. However, all of those settings are easily tune-able through Policy overrides (Policy.merge(<overrides>, <base policy>)
).
For instance. we've provided a convenience helper for throttling your receiver links to only renew credits on messages they've "settled". To use this with Azure ServiceBus Queues for instance, it would look like:
var AMQPClient = require('amqp10').Client,
Policy = require('amqp10').Policy;
var client = new AMQPClient(Policy.Utils.RenewOnSettle(1, 1, Policy.ServiceBusQueue));
Where the first number is the initial credit, and the second is the threshold - once remaining credit goes below that, we will give out more credit by the number of messages we've settled. In this case we're setting up the client for one-by-one message processing. Behind the scenes, this does the following:
Sets the Link's creditQuantum to the first number (1), which you can do for yourself via the Policy mix-in { receiverLink: { creditQuantum: 1 } }
Sets the Link to not auto-settle messages at the sender, which you can do for yourself via { receiverLink: { attach: { receiverSettleMode: 1 } } }
Where did that magic "1" come from? Well, that's the value from the spec, but you could use the constant we've defined at require('amqp10').Constants.receiverSettleMode.settleOnDisposition
Sets the Link's credit renewal policy to a custom method that renews only when the link credit is below the threshold and we've settled some messages. You can do this yourself by using your own custom method:
{
receiverLink: {
credit: function (link, options) {
// If the receiver link was just connected, set the initial link credit to the quantum. Otherwise, give more credit for every message we've settled.
var creditQuantum = (!!options && options.initial) ? link.policy.creditQuantum : link.settledMessagesSinceLastCredit;
if (creditQuantum > 0 && link.linkCredit < threshold) {
link.addCredits(creditQuantum);
}
}
}
}
Note that once you've set the policy to not auto-settle messages, you'll need to settle them yourself. We've tried to make that easy by providing methods on the receiver link for each of the possible "disposition states" that AMQP allows:
link.accept(message)
will tell the sender that you've accepted and processed the message.link.reject(message, [error])
will reject the message with the given error (if provided). The sender is free to re-deliver, so this can be used to indicate transient errors.link.modify(message, [options])
will tell the sender to modify the message and re-deliver. You can tell it you can't accept the message by using link.modify(message, { undeliverableHere: true })
link.release(message)
will tell the sender that you haven't processed the message and it's free to re-deliver - even back to you.All of these methods accept an array of messages, allowing you to settle many at once.
The amqp10 module now supports pluggable Client behaviors with the exported use
method. Officially supported plugins include:
We are currently actively running integration tests against the following servers:
We have been tested against the following servers, but not exhaustively so issues may remain:
If you find any issues, please report them via GitHub.
Using node's built-in net/tls classes for communicating with the server.
Data from the server is written to a buffer-list based on Rod Vagg's BL.
Outgoing data is encoded using this buffer builder - streaming output won't really work since each outgoing payload needs to be prefixed with its encoded size, however we're working on converting to use as much streaming as possible.
The connection state is managed using Stately.js, with state transitions swapping which callback gets invoked on receipt of new data. (e.g. post-connection, we write the AMQP version header and then install a callback to ensure the correct version. Once incoming data is written to the circular buffer, this callback is invoked, and a comparison vs. the expected version triggers another transition).
Debug output is done via debug with the prefix amqp10:
. The main client's debug
name is amqp10:client
so setting DEBUG=amqp10:client
as an environment variable will get you all top-level debugging output.
bash# export DEBUG=amqp*
C:\> set DEBUG=amqp*
[root@pinguino]# node simple_eventhub_test.js
amqp10:client connecting to: amqps://xxxxxx:xxxxxxxxx@xxxxxxxxxxxx.servicebus.windows.net +0ms
amqp10:connection Connecting to xxxxxx-service-bus-001.servicebus.windows.net:5671 via TLS +72ms
amqp10:connection Transitioning from DISCONNECTED to START due to connect +17ms
amqp10:connection Sending Header 414d515003010000 +405ms
amqp10:connection Transitioning from START to IN_SASL due to connected +6ms
amqp10:connection Rx: 414d515003010000 +128ms
amqp10:sasl Server SASL Version: 414d515003010000 vs 414d515003010000 +1ms
amqp10:connection Rx: 0000003f02010000005340c03201e02f04b3000000074d535342434... +162ms
amqp10:client Reading variable with prefix 0xc0 of length 52 +2ms
amqp10:client Decoding 5340 +0ms
[...]
Many thanks to Gordon Sim for inspiration on the type system, gleaned from his project rhea.
FAQs
Native AMQP-1.0 client for node.js
The npm package amqp10 receives a total of 0 weekly downloads. As such, amqp10 popularity was classified as not popular.
We found that amqp10 demonstrated a not healthy version release cadence and project activity because the last version was released a year ago. It has 5 open source maintainers collaborating on the project.
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Security News
CISOs are racing to adopt AI for cybersecurity, but hurdles in budgets and governance may leave some falling behind in the fight against cyber threats.
Research
Security News
Socket researchers uncovered a backdoored typosquat of BoltDB in the Go ecosystem, exploiting Go Module Proxy caching to persist undetected for years.
Security News
Company News
Socket is joining TC54 to help develop standards for software supply chain security, contributing to the evolution of SBOMs, CycloneDX, and Package URL specifications.