Comparing version 0.0.1 to 0.0.2
91
index.js
@@ -17,8 +17,13 @@ #!/usr/bin/env node | ||
describe:'Connect to the RabbitMQ at <url>'}) | ||
.options('queue', { | ||
.options('service', { | ||
'default': 'rmqcat', | ||
describe: 'Use the service at <queue>'}) | ||
.options('help', { | ||
describe: 'Print help'}) | ||
describe: 'Print help and exit'}) | ||
.options('send', { | ||
describe: 'Send directly to <queue>'}) | ||
.options('recv', { | ||
describe: 'Receive directly from <queue>'}) | ||
.describe('l', 'Listen for connections') | ||
@@ -53,3 +58,3 @@ .describe('k', 'Keep listening after client disconnections') | ||
var handshakeQ = argv.queue; | ||
var handshakeQ = argv.service; | ||
@@ -61,4 +66,4 @@ var ok = amqp.connect(url); | ||
// It's convenient, since it's passed around, to use the channel | ||
// closing a a signal to clean up and leave. | ||
// It's convenient, since most of the work goes through the | ||
// channel, to use its closure as a signal to clean up and leave. | ||
ch.on('close', function() { | ||
@@ -70,3 +75,30 @@ connection.close().then(function() { | ||
// Always sure the handshake queue exists, since we don't know who | ||
// send and recv don't use a service (handshake) queue, they just | ||
// send to or receive from the queue mentioned. | ||
if (argv.send || argv.recv) { | ||
if (argv.send) { | ||
var dest = argv.send; | ||
ch.assertQueue(dest); | ||
var out = writableQueue(ch, dest); | ||
process.stdin.pipe(out); | ||
out.on('finish', function() { | ||
ch.close(); | ||
}); | ||
} | ||
else if (argv.recv) { | ||
var source = argv.recv; | ||
ch.assertQueue(source); | ||
var reader = readableQueue(ch, source); | ||
reader.on('end', function() { | ||
ch.close(); | ||
}); | ||
reader.pipe(process.stdout); | ||
} | ||
return; // no more options matter | ||
} | ||
// Make sure the handshake queue exists, since we don't know who | ||
// will turn up first | ||
@@ -131,3 +163,3 @@ debug('Asserting handshake queue: %s', handshakeQ); | ||
} | ||
}, {exclusive: true}); | ||
}, {noAck: false}); | ||
@@ -181,2 +213,3 @@ var streams = new QueueStreamServer(ch, stdinQ); | ||
// Create a writable stream that sends chunks to a queue. | ||
function writableQueue(channel, queue) { | ||
@@ -198,2 +231,46 @@ var writable = new Writable(); | ||
// Create a readable stream that gets chunks from a stream. | ||
function readableQueue(channel, queue) { | ||
var readable = new Readable(); | ||
readable._read = function() {}; | ||
// Logically, we want to receive everything up to EOF and no | ||
// more. However, in practice there's no way to switch the tap off | ||
// at an exact message; instead, we will overrun slightly, so we | ||
// need to put those extra messages back in the queue by nacking | ||
// them. | ||
var running = true; | ||
var ok = channel.consume(queue, function(msg) { | ||
if (!running) { | ||
channel.nack(msg); | ||
return; | ||
} | ||
switch (msg && msg.properties.type) { | ||
case null: // cancelled by server | ||
readable.emit('error', new Error('Consume cancelled by server')); | ||
break; | ||
case 'eof': | ||
running = false; | ||
// Don't trigger anything (e.g., closing the channel) until | ||
// we've cancelled. We may get messages in the meantime, which | ||
// is why the nack and early return above. | ||
ok.then(function(consumeOk) { | ||
channel.cancel(consumeOk.consumerTag); | ||
readable.push(null); | ||
}); | ||
break; | ||
case 'data': | ||
readable.push(msg.content); | ||
break; | ||
default: | ||
console.warn('Unknown message type %s', msg.properties.type); | ||
} | ||
channel.ack(msg); | ||
}, {exclusive: true, noAck: false}); | ||
return readable; | ||
} | ||
function QueueStreamServer(channel, queue) { | ||
@@ -200,0 +277,0 @@ EventEmitter.call(this); |
{ | ||
"name": "rmqcat", | ||
"version": "0.0.1", | ||
"version": "0.0.2", | ||
"description": "netcat-like tool for sending data through RabbitMQ", | ||
@@ -5,0 +5,0 @@ "keywords": [ |
@@ -6,3 +6,3 @@ rmqcat | ||
Installation: | ||
## Installation | ||
@@ -14,4 +14,29 @@ ```sh | ||
Use: | ||
## Use | ||
`rmqcat` has two modes of use, one-way and two-way ("duplex"). Duplex | ||
corresponds more or less to how `netcat` works, that is, it | ||
establishes a socket-like connection with a server ('listener') on one | ||
side and a client on the other, which can speak back and forth. | ||
One-way (simplex) either relays stdin to a RabbitMQ queue, *or* from a | ||
RabbitMQ queue to stdout. Sending to a queue doesn't wait for a | ||
receiver; receiving from a queue waits for data in the queue. | ||
### Common to both modes | ||
The option `--url` can be used to address a specific RabbitMQ server, | ||
and to provide connection parameters -- see the [amqplib | ||
documentation][amqplib-doc-url]. By default a RabbitMQ server on | ||
localhost is assumed, so you will probably want to supply `--url` in | ||
practice. | ||
The option `-D` will make rmqcat output a bit of debug information to | ||
stderr. | ||
The option `--help`, if present at all, will make rmqcat output a | ||
usage message to stderr then exit. | ||
### Duplex | ||
```sh | ||
@@ -25,3 +50,3 @@ # Start a listener that will put whatever it gets in a file | ||
In general, `rmqcat` will keep a connection open until it gets | ||
`rmqcat` used this way will keep a connection open until it gets | ||
end-of-file, so you can use it to "chat" back and forth, similar to | ||
@@ -34,15 +59,24 @@ netcat. | ||
The option `-k` in combination with `-l` will keep the listener | ||
accepting successive connections. Otherwise it will close once the | ||
accepting successive connections. Otherwise it will exit once the | ||
first connection closes. | ||
The options `--url` and `--queue` can be used to address a RabbitMQ | ||
server elsewhere (url) with a particular queue used for opening | ||
connections (queue). By default a RabbitMQ server on localhost is | ||
assumed, so you will probably want to at least supply `--url` in | ||
practice. The queue name defaults to `"rmqcat"`. | ||
The option `--service` has a role similar to a TCP port number. It | ||
names a queue to be used by clients and listeners to establish | ||
connections. The default is arbitrarily `"rmqcat"`. | ||
The option `-D` will make rmqcat output a bit of debug information to | ||
stderr. | ||
### One-way | ||
The option `--help`, if present at all, will make rmqcat output a | ||
usage message then exit. | ||
```sh | ||
# Send a file to a queue | ||
rmqcat --send bobbins < bobbins.iso | ||
# Save the file in a queue and output the SHA1 sum | ||
rmqcat --recv bobbins | tee bobbins.iso | shasum | ||
``` | ||
The string following either `--send` or `--recv` names a queue that | ||
will hold the data in transit. More than one file of data can be | ||
present in the queue; `rmqcat --recv <queue>` will read a single file | ||
before exiting, or wait if there is no data yet. | ||
[amqplib-doc-url]: http://squaremo.github.io/amqp.node/doc/channel_api.html |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
13859
262
79
5