amqp-stream
This is a flexible streaming interface for the node-amqp RabbitMQ client that aims to fully support node's streaming API. It can be used to create readable, writable, and duplex streams over amqp. This can help in chunking data for transport over amqp, but more importantly can be used to wire amqp listeners/providers into the expanding collection of node stream services.
npm install amqp-stream
Example: A simple chat client using amqp-stream with pipes.
###chat.js
    var amqp = require( 'amqp' ), amqp_stream = require( 'amqp-stream' );
    var connection = amqp.createConnection();
    amqp_stream( {connection:connection, exchange:'chat.exchange'}, function ( err, chat_stream ) {
      process.stdin.pipe( chat_stream ).pipe( process.stdout );
    });
Provided you have a RabbitMQ server running locally with default security settings, you can execute the above script in two separate terminals and begin communicating.
###amqp_stream(options [, callback])
amqp_stream() returns an instantiated amqp stream object. The new operator may also be used, and there is a createStream function that also returns an instantiated object. These three calls are equivalent:

    var stream = amqp_stream( ... );
    var stream = new amqp_stream( ... );
    var stream = amqp_stream.createStream( ... );
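
For readers wondering how one function can be called in all three ways, the following is a minimal sketch of a common JavaScript pattern. `DemoStream` is a hypothetical stand-in, not the library's code, and amqp-stream may implement this differently.

```javascript
// Hypothetical stand-in for the amqp_stream constructor; not the library's code.
function DemoStream(options) {
  // Called without `new`: re-enter as a constructor call.
  if (!(this instanceof DemoStream)) {
    return new DemoStream(options);
  }
  this.options = options || {};
}

// Named factory that simply forwards to the constructor.
DemoStream.createStream = function (options) {
  return new DemoStream(options);
};

var a = DemoStream({ routingKey: 'a' });
var b = new DemoStream({ routingKey: 'b' });
var c = DemoStream.createStream({ routingKey: 'c' });

console.log(a instanceof DemoStream, b instanceof DemoStream, c instanceof DemoStream);
// prints: true true true
```
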
The exact behavior and features of an amqp stream will depend on the options passed at the time it is instantiated. The options are:
connection
: amqp.Connection, no default.
If set, must be an instance of amqp.Connection. The stream will use this connection to declare named AMQP objects.

exchange
: amqp.Exchange || string, no default.
If set, a writable/duplex stream will be returned. It must be either a string or an instance of amqp.Exchange. If it is a string, then a connection must also be provided (see above).

routingKey
: string, default='#'.
The routingKey used for reads/writes (publish/subscribe).

queue
: amqp.Queue || string || false, defaults to an exclusive queue.
If set, a readable/duplex stream will be returned. It must be either a string or an instance of amqp.Queue. If it is a string, then a connection must also be provided (see above). If a queue is not provided, then by default an exclusive queue will be created for reads. To override this behaviour, set the option to false. If a pre-bound amqp.Queue instance is provided and no exchange is provided in the option set, then the exchange that the queue object was previously bound to will be used for writes. To avoid this behaviour, do not bind the queue before creating the stream.

autoBind
: true|false, default=true.
If set, the associated queue will be bound to the specified exchange with the specified routingKey when it is declared. To override this, set autoBind to false.

exchangeOptions
: {}
Options to be used when declaring the exchange.

queueOptions
: {}
Options to be used when declaring the queue.

publishOptions
: {}
Options to be used when publishing (writing) to the exchange.

When creating an amqp stream, the options provided must contain at least one node-amqp object: either an amqp.Connection, amqp.Exchange, or amqp.Queue.
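
To make the option combinations easier to see, here is a small sketch that restates the rules above as code. `describeOptions` is a hypothetical helper, not part of amqp-stream, and the `connection` placeholder stands in for a real amqp.Connection. It does not model the pre-bound amqp.Queue case.

```javascript
// Hypothetical helper, not part of amqp-stream: it only restates the
// documented option rules so the combinations are easier to compare.
function describeOptions(options) {
  var needsConnection =
    typeof options.exchange === 'string' || typeof options.queue === 'string';
  if (needsConnection && !options.connection) {
    throw new Error('a string exchange or queue requires a connection');
  }

  // An exchange makes the stream writable.
  var writable = Boolean(options.exchange);
  // An omitted queue means an exclusive queue is created for reads;
  // only an explicit `false` turns reading off.
  var readable = options.queue !== false;

  var mode = 'none';
  if (readable && writable) mode = 'duplex';
  else if (readable) mode = 'readable';
  else if (writable) mode = 'writable';

  return {
    mode: mode,
    routingKey: options.routingKey || '#',   // documented default
    autoBind: options.autoBind !== false     // documented default is true
  };
}

var connection = {}; // placeholder standing in for amqp.createConnection()

console.log(describeOptions({ connection: connection, exchange: 'chat.exchange' }).mode);
// prints: duplex
console.log(describeOptions({ connection: connection, exchange: 'logs', queue: false }).mode);
// prints: writable
console.log(describeOptions({ connection: connection, queue: 'inbox' }).mode);
// prints: readable
```
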
amqp_stream.write( buff|string [, encoding] )
amqp_stream.error( buff|string [, encoding] )
amqp_stream.end( buff|string [, encoding] )
Writes to an amqp stream are published to the associated exchange / routing key. If publishOptions were provided at initialization time, they are used.
amqp-stream uses two headers: x-stream-origin and x-stream-event. They track message origins and payload types going across the amqp bus, so that the receiver on the other end can emit the message correctly to its listeners.
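
The role of x-stream-event can be illustrated without a broker. The sketch below assumes the header carries the name of the stream event ('data', 'error' or 'end'); the actual header values, and the `envelope` and `deliver` helpers, are assumptions made for illustration rather than the library's internals.

```javascript
var EventEmitter = require('events').EventEmitter;

// Assumption: the sender tags each message with the stream event it represents.
function envelope(origin, event, payload) {
  return {
    headers: { 'x-stream-origin': origin, 'x-stream-event': event },
    body: payload
  };
}

// Receiver side: re-emit the payload under the event named in the header.
function deliver(receiver, message) {
  var event = message.headers['x-stream-event'];
  if (event === 'end') {
    receiver.emit('end');
  } else {
    receiver.emit(event, message.body);
  }
}

var receiver = new EventEmitter();
var seen = [];
receiver.on('data', function (body) { seen.push('data:' + body); });
receiver.on('error', function (body) { seen.push('error:' + body); });
receiver.on('end', function () { seen.push('end'); });

deliver(receiver, envelope('stream-1', 'data', 'hello'));
deliver(receiver, envelope('stream-1', 'error', 'boom'));
deliver(receiver, envelope('stream-1', 'end'));

console.log(seen);
// prints: [ 'data:hello', 'error:boom', 'end' ]
```
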
amqp_stream.on( 'data', function (buf) {})
amqp_stream.on( 'error', function () {})
amqp_stream.on( 'end', function () {})
amqp_stream.pause()
amqp_stream.resume()
All data received by the amqp stream's associated queue will be emitted, provided the message did not originate from the stream object itself (in duplex streams).
Duplex amqp streams provide both readable and writable capabilities described above. One important note is that when writing to a duplex stream, the data being sent will not be emitted locally, even though the local queue will receive the message. The x-stream-origin header is used to identify this situation and discard the data.
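
The origin check can be sketched with an in-memory stand-in for the exchange. `Bus` and `Endpoint` are hypothetical names used only for this illustration. Every endpoint receives every published message, including its own, and drops the ones whose x-stream-origin matches its own id.

```javascript
// Hypothetical in-memory "exchange": every subscriber receives every message,
// including the subscriber that published it.
function Bus() {
  this.subscribers = [];
}
Bus.prototype.publish = function (message) {
  this.subscribers.forEach(function (fn) { fn(message); });
};

// Hypothetical duplex endpoint with its own origin id.
function Endpoint(id, bus) {
  var self = this;
  this.id = id;
  this.bus = bus;
  this.received = [];
  bus.subscribers.push(function (message) {
    // Discard anything this endpoint published itself.
    if (message.headers['x-stream-origin'] === self.id) return;
    self.received.push(message.body);
  });
}
Endpoint.prototype.write = function (body) {
  this.bus.publish({ headers: { 'x-stream-origin': this.id }, body: body });
};

var bus = new Bus();
var alice = new Endpoint('alice', bus);
var bob = new Endpoint('bob', bus);

alice.write('hi bob');
bob.write('hi alice');

console.log(alice.received); // prints: [ 'hi alice' ]
console.log(bob.received);   // prints: [ 'hi bob' ]
```
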
AMQP (at least RabbitMQ) makes use of several options for RPC-like behaviour, and in situations where a response must be matched with a request. Exclusive queues combined with correlationId and replyTo are used to achieve this.
In amqp-stream, this is handled via correlated streams, which are created from an existing amqp stream. When a write occurs on a correlated stream, the message goes out with the appropriate correlationId/replyTo, and a special event is emitted on the receiving end using amqp-stream containing a new "sub stream". Writes to this new stream are sent back to the correlated stream created locally. It's easier in practice than it is to explain, so here's an example:
###amqp-rpc-server.js
    var amqp = require( 'amqp' ), amqp_stream = require( 'amqp-stream' );
    var connection = amqp.createConnection();
    amqp_stream( {connection:connection, exchange:'rpc', routingKey:'upper'}, function ( err, rpc_stream ) {
      rpc_stream.on( 'correlatedRequest', function (correlatedStream) {
        correlatedStream.on( 'data', function (buf) {
          correlatedStream.write( buf.toString().toUpperCase() );
          correlatedStream.end();
        });
      });
    });
###amqp-rpc-client.js
    var amqp = require( 'amqp' ), amqp_stream = require( 'amqp-stream' );
    var connection = amqp.createConnection();
    amqp_stream( {connection:connection, exchange:'rpc', routingKey:'upper'}, function ( err, rpc_stream ) {
      rpc_stream.createCorrelatedRequest(function ( err, upper ) {
        upper.on( 'data', function (buf) { console.log(buf.toString()) });
        upper.on( 'end', connection.end.bind( connection ) );
        upper.write( process.argv[2] );
      });
    });
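
The correlationId/replyTo matching that correlated streams build on can be sketched without a broker. In the sketch below an EventEmitter stands in for RabbitMQ, with event names playing the role of queue names. The `request` helper and the queue names are hypothetical, and this is not how amqp-stream is implemented internally.

```javascript
var EventEmitter = require('events').EventEmitter;

var bus = new EventEmitter(); // stand-in for the broker; event name = queue name
var nextId = 0;

// Client: publish a request carrying correlationId and replyTo, then wait for
// a reply on the private reply queue that carries the same correlationId.
function request(queue, body, callback) {
  var correlationId = 'req-' + (++nextId);
  var replyTo = 'reply-queue-client';
  function onReply(message) {
    if (message.correlationId !== correlationId) return; // someone else's reply
    bus.removeListener(replyTo, onReply);
    callback(message.body);
  }
  bus.on(replyTo, onReply);
  bus.emit(queue, { correlationId: correlationId, replyTo: replyTo, body: body });
}

// Server: answer on the queue named by replyTo, echoing the correlationId.
bus.on('upper', function (message) {
  bus.emit(message.replyTo, {
    correlationId: message.correlationId,
    body: message.body.toUpperCase()
  });
});

var replies = [];
request('upper', 'hello', function (body) { replies.push(body); });
request('upper', 'world', function (body) { replies.push(body); });

console.log(replies);
// prints: [ 'HELLO', 'WORLD' ]
```
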
###amqp_stream.createCorrelatedRequest([correlationId] [, callback])
Sets up a new correlated stream.

correlationId
: string, optional.
A custom correlationId may be provided; otherwise one will be generated automatically.

callback(err, correlated_stream)
: function, optional.
If a callback is provided, it will be executed with (err, stream) arguments once the correlated stream is ready.

###amqp_stream.on('correlatedRequest', callback)
The correlatedRequest event is emitted on amqp_streams receiving the correlated request. The callback is executed and passed a single argument: a duplex stream created via createCorrelatedResponse.
The primary reason for amqp-stream existing is to enable the many great node modules that rely on node's stream api to operate over amqp.
Scuttlebutt is an awesome node module that can be used to replicate data structures between nodes via streams. Here's an example of using crdt (a subclass of scuttlebutt) over amqp-stream. A more comprehensive example (with some basic output), along with other examples, can be found in the examples directory.
###amqp-crdt-replication.js
    var amqp = require( 'amqp' )
      , amqp_stream = require( 'amqp-stream' )
      , Doc = require('crdt').Doc
      ;
    var connection = amqp.createConnection()
      , ReplicatedDoc = new Doc()
      ;
    amqp_stream( {connection:connection, exchange:'scuttlebutt', routingKey:'document.stream'}, function ( err, doc_stream ) {
      var rs;
      (rs = ReplicatedDoc.createStream())
        .pipe(doc_stream)
        .pipe(rs);
    });
Dnode is an RPC client for node that wires clients and servers up via streams (net sockets by default). It is possible to run dnode over amqp-stream, though there are a few things to note.
###amqp-dnode-server.js
    var dnode = require( 'dnode' )
      , amqp = require( 'amqp' )
      , amqp_stream = require( 'amqp-stream' )
      ;
    var connection = amqp.createConnection();
    amqp_stream( {connection:connection, exchange:'dnode'}, function ( err, rpc_stream ) {
      rpc_stream.on( 'correlatedRequest', function (cstream) {
        var d = dnode({
          transform : function (s, cb) {
            cb(s.replace(/[aeiou]{2,}/, 'oo').toUpperCase())
          }
        });
        cstream.pipe(d).pipe(cstream);
      });
    });
###amqp-dnode-client.js
    var dnode = require( 'dnode' )
      , amqp = require( 'amqp' )
      , amqp_stream = require( 'amqp-stream' )
      ;
    var connection = amqp.createConnection()
      , d = dnode();
    amqp_stream( {connection:connection, exchange:'dnode'}, function ( err, rpc_stream ) {
      rpc_stream.createCorrelatedRequest(function (err, cstream) {
        d.on('remote', function (remote) {
          remote.transform('beep', function (s) {
            console.log('beep => ' + s);
            d.end();
          });
        });
        cstream.on( 'end', connection.end.bind(connection) );
        cstream.pipe(d).pipe(cstream);
        /* This write forces the correlated stream to be initialized on remote side and prompts
         * dnode server to send us the remote interface
         */
        cstream.write();
      });
    });
Note that we do an empty write in the client. dnode expects the server to deliver the remote interface upon connection, and amqp doesn't emulate socket connections very well. We therefore use a correlated stream, which does fire an event on the remote side, but not until the first message (stream write) comes through. By doing an empty write, we trigger the correlatedRequest event on the remote side, which sets up our dnode pipes properly.
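
Why the empty write matters can be shown with a small broker-free sketch. It assumes that the receiving side only learns about a correlated stream when the first message carrying a new correlationId arrives. The `onMessage` function and the message shape are hypothetical and only mirror the behaviour described above.

```javascript
var EventEmitter = require('events').EventEmitter;

var server = new EventEmitter();
var known = {}; // correlationId -> true once a sub stream has been announced

// Assumption: a correlated request is announced lazily, on the first message
// that carries a correlationId the server has not seen before.
function onMessage(message) {
  if (!known[message.correlationId]) {
    known[message.correlationId] = true;
    server.emit('correlatedRequest', message.correlationId);
  }
  if (message.body !== undefined) {
    server.emit('data', message.body);
  }
}

var log = [];
server.on('correlatedRequest', function (id) { log.push('request ' + id); });
server.on('data', function (body) { log.push('data ' + body); });

// Nothing has been written yet, so the server has seen no request.
var before = log.length;

onMessage({ correlationId: 'c1' });               // the empty write
onMessage({ correlationId: 'c1', body: 'beep' }); // a later payload

console.log(before); // prints: 0
console.log(log);    // prints: [ 'request c1', 'data beep' ]
```
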