Comparing version 0.0.2 to 0.0.3
293
index.js
@@ -9,2 +9,3 @@ #!/usr/bin/env node | ||
var inherits = require('util').inherits; | ||
var spawn = require('child_process').spawn; | ||
@@ -29,2 +30,7 @@ var options = require('yargs') | ||
.options('exec', { | ||
describe: 'Spawn a process and use stdin and stdout from that process', | ||
alias: 'e' | ||
}) | ||
.describe('l', 'Listen for connections') | ||
@@ -48,15 +54,50 @@ .describe('k', 'Keep listening after client disconnections') | ||
// queue, which is the common knowledge between the client and the | ||
// server (the "connection point", like a port); then is a stdin queue | ||
// and a stdout queue, both named from the point of view of the | ||
// server. | ||
// server (the "connection point", like a port); then there is an in | ||
// queue and an out queue, both named from the point of view of the | ||
// process. | ||
// The stdin queue is that over which the client sends data to the | ||
// server, and the stdout queue is that over which the server sends | ||
// data to the client. The client creates the stdin queue and annouces | ||
// it to the server (as the 'replyTo' of an empty message sent to the | ||
// handshake queue); the server creates the stdout queue and announces | ||
// it to the client (likewise, sent to the stdout queue). | ||
// The stdin queue is that over which the remote sends data to the | ||
// this process, and the out queue is that over which the process | ||
// sends data to the remote. This process creates the in queue and | ||
// annouces it to the remote (as the 'replyTo' of an empty message); | ||
// the remote creates the out queue and announces it to this process. | ||
// The difference between a client and a server (listener) is that the | ||
// client sends an open message to the handshake queue, and the | ||
// listener responds with an open message to the client's in queue | ||
// (which becomes the server's out queue). | ||
var handshakeQ = argv.service; | ||
function closeLatch(done) { | ||
function either(which) { | ||
switch (which) { | ||
case 'in': | ||
return only('out'); | ||
case 'out': | ||
return only('in'); | ||
default: | ||
throw new Error('Unknown stream ' + which); | ||
} | ||
} | ||
function only(s) { | ||
return function(which) { | ||
switch (which) { | ||
case s: | ||
return done(); | ||
default: | ||
throw new Error('Close on stream other than expected ' + s); | ||
} | ||
} | ||
} | ||
return either; | ||
} | ||
function neither(which) { | ||
throw new Error('Attempted to close ' + which + '; both already closed'); | ||
} | ||
var ok = amqp.connect(url); | ||
@@ -67,2 +108,4 @@ ok.then(function(connection) { | ||
// Simplex | ||
// It's convenient, since most of the work goes through the | ||
@@ -85,2 +128,8 @@ // channel, to use its closure as a signal to clean up and leave. | ||
process.stdin.pipe(out); | ||
process.on('SIGINT', function() { | ||
process.stdin.unpipe(); | ||
out.end(); | ||
}); | ||
out.on('finish', function() { | ||
@@ -95,2 +144,26 @@ ch.close(); | ||
var reader = readableQueue(ch, source); | ||
var torndown = false; | ||
function teardown() { | ||
if (torndown) return; | ||
torndown = true; | ||
debug("Tearing down pipe to stdout"); | ||
reader.stop(); | ||
reader.unpipe(); | ||
ch.close(); | ||
} | ||
process.on('SIGINT', teardown); | ||
// If we're being piped into another process, and that process | ||
// terminates or otherwise closes its input, we can get an | ||
// EPIPE exception here, possibly more than once. | ||
process.stdout.on('error', function(err) { | ||
if (err.code === 'EPIPE') { | ||
debug(err); | ||
teardown(); } | ||
else | ||
throw err; | ||
}); | ||
reader.on('end', function() { | ||
@@ -105,2 +178,4 @@ ch.close(); | ||
// Duplex | ||
// Make sure the handshake queue exists, since we don't know who | ||
@@ -111,6 +186,28 @@ // will turn up first | ||
var stdin; | ||
var stdout; | ||
var child; | ||
function setup() { | ||
if (argv.e) { | ||
debug('Starting process %s', argv.e); | ||
var args = argv.e.split(' '); | ||
child = spawn(args[0], args.slice(1)); | ||
stdin = child.stdout; | ||
stdin.on('end', function() { | ||
debug('Child process output ended'); | ||
}); | ||
stdout = child.stdin; | ||
} | ||
else { | ||
stdin = process.stdin; | ||
stdout = process.stdout; | ||
} | ||
} | ||
if (argv.l) { // act as server | ||
ch.assertQueue('', {exclusive: true}).then(function(ok) { | ||
var stdinQ = ok.queue; | ||
debug('Created stdin queue: %s', stdinQ); | ||
return ch.assertQueue('', {exclusive: true}).then(function(ok) { | ||
var inQ = ok.queue; | ||
debug('Created in queue: %s', inQ); | ||
@@ -124,9 +221,32 @@ // I need a channel on which to accept connections. Why | ||
var accepted = null; | ||
var current = null; | ||
var writable = null; | ||
process.on('SIGINT', function() { | ||
if (writable !== null) { | ||
writable.end(); | ||
} | ||
ch.close(); | ||
}); | ||
function next() { | ||
process.stdin.unpipe(); | ||
stdin.unpipe(); | ||
acceptCh.ack(accepted); | ||
} | ||
var latch; | ||
if (argv.k) { | ||
var freshLatch = closeLatch(function() { | ||
next(); | ||
return freshLatch; | ||
}); | ||
latch = freshLatch; | ||
} | ||
else { | ||
latch = closeLatch(function() { | ||
next(); | ||
ch.close(); | ||
return neither; | ||
}); | ||
} | ||
acceptCh.prefetch(1); | ||
@@ -140,2 +260,6 @@ // Any returned messages are a result of the 'open' not | ||
' assuming dead connection'); | ||
if (child != null) { | ||
debug('Killing child process'); | ||
child.kill(); | ||
} | ||
next(); | ||
@@ -148,17 +272,15 @@ }); | ||
accepted = msg; | ||
var stdoutQ = msg.properties.replyTo; | ||
debug('Recv open: stdout is %s', stdoutQ); | ||
acceptCh.sendToQueue(stdoutQ, new Buffer(0), | ||
var outQ = msg.properties.replyTo; | ||
debug('Recv open: out queue is %s', outQ); | ||
acceptCh.sendToQueue(outQ, new Buffer(0), | ||
{type: 'open', | ||
mandatory: true, | ||
replyTo: stdinQ}); | ||
debug('Sent open to %s: stdin is %s', stdoutQ, stdinQ); | ||
current = writableQueue(ch, stdoutQ); | ||
current.on('finish', function() { | ||
next(); | ||
if (!argv.k) { | ||
ch.close(); | ||
} | ||
replyTo: inQ}); | ||
debug('Sent open to out queue %s: in queue is %s', outQ, inQ); | ||
writable = writableQueue(ch, outQ); | ||
writable.on('finish', function() { | ||
latch = latch('out'); | ||
}); | ||
process.stdin.pipe(current, {end: true}); | ||
setup(); | ||
stdin.pipe(writable, {end: true}); | ||
break; | ||
@@ -172,8 +294,21 @@ default: | ||
var streams = new QueueStreamServer(ch, stdinQ); | ||
streams.on('connection', function(stream) { | ||
stream.pipe(process.stdout, {end: !argv.k}); | ||
stream.on('end', function() { | ||
current.end(); | ||
var streams = new QueueStreamServer(ch, inQ); | ||
streams.on('connection', function(readable) { | ||
// process.stdout doesn't like to have `#end` called on | ||
// it'; however, pipe appears to know not to do so, and I | ||
// *do* want it called if stdout is the input to an | ||
// `--exec`. | ||
readable.pipe(stdout, {end: true}); | ||
readable.on('end', function() { | ||
latch = latch('in'); | ||
}); | ||
// The special case for closing client streams: if we're | ||
// accepting input on stdin, treat the server closing as us | ||
// closing. | ||
if (stdin === process.stdin) { | ||
readable.on('end', function() { | ||
writable.end(); | ||
}); | ||
} | ||
}); | ||
@@ -185,31 +320,41 @@ }); | ||
else { // act as client | ||
var latch = closeLatch(function() { | ||
ch.close(); | ||
return neither; | ||
}); | ||
ch.assertQueue('', {exclusive: true}).then(function(ok) { | ||
var stdoutQ = ok.queue; | ||
debug('Created stdout queue %s', stdoutQ); | ||
var outQ = ok.queue; | ||
debug('Created out queue %s', outQ); | ||
ch.consume(stdoutQ, function(msg) { | ||
switch (msg.properties.type) { | ||
case 'open': | ||
var stdinQ = msg.properties.replyTo; | ||
debug('Recv open: stdin is %s', stdinQ); | ||
var relay = writableQueue(ch, stdinQ); | ||
process.stdin.pipe(relay, {end: true}); | ||
break; | ||
case 'data': | ||
debug('Recv %d bytes on stdout', msg.content.length); | ||
process.stdout.write(msg.content); | ||
break; | ||
case 'eof': | ||
debug('Recv eof on stdout (%s)', stdoutQ); | ||
ch.close(); | ||
break; | ||
default: | ||
console.warn('Unknown message type %s', | ||
msg.properties.type, | ||
' received on stdout queue'); | ||
var readable = readableQueue(ch, outQ, function(inQ) { | ||
setup(); | ||
readable.pipe(stdout); | ||
var writable = writableQueue(ch, inQ); | ||
stdin.pipe(writable, {end: true}); | ||
writable.on('finish', function() { | ||
latch = latch('out'); | ||
}); | ||
process.on('SIGINT', function() { | ||
if (writable !== null) { | ||
writable.end(); | ||
} | ||
}); | ||
// The special case for closing client streams: if we're | ||
// accepting input on stdin, treat the server closing as us | ||
// closing. | ||
if (stdin === process.stdin) { | ||
readable.on('end', function() { | ||
writable.end(); | ||
}); | ||
} | ||
}, {noAck: true, exclusive: true}); | ||
}); | ||
readable.on('end', function() { | ||
latch = latch('in'); | ||
}); | ||
ch.sendToQueue(handshakeQ, new Buffer(0), | ||
{type: 'open', replyTo: stdoutQ}); | ||
{type: 'open', replyTo: outQ}); | ||
debug('Sent open to handshake queue %s', handshakeQ); | ||
@@ -239,3 +384,3 @@ }); | ||
// Create a readable stream that gets chunks from a stream. | ||
function readableQueue(channel, queue) { | ||
function readableQueue(channel, queue, openCb) { | ||
var readable = new Readable(); | ||
@@ -251,2 +396,15 @@ readable._read = function() {}; | ||
// Don't trigger anything (e.g., closing the channel) until | ||
// we've cancelled. We may get messages in the meantime, which | ||
// is why the nack and early return above. | ||
function stop() { | ||
running = false; | ||
ok.then(function(consumeOk) { | ||
channel.cancel(consumeOk.consumerTag); | ||
readable.push(null); | ||
}); | ||
} | ||
readable.stop = stop; | ||
var ok = channel.consume(queue, function(msg) { | ||
@@ -263,14 +421,13 @@ if (!running) { | ||
case 'eof': | ||
running = false; | ||
// Don't trigger anything (e.g., closing the channel) until | ||
// we've cancelled. We may get messages in the meantime, which | ||
// is why the nack and early return above. | ||
ok.then(function(consumeOk) { | ||
channel.cancel(consumeOk.consumerTag); | ||
readable.push(null); | ||
}); | ||
stop(); | ||
break; | ||
case 'data': | ||
debug('Recv %d bytes', msg.content.length); | ||
readable.push(msg.content); | ||
break; | ||
case 'open': | ||
var inQ = msg.properties.replyTo; | ||
debug('Recv open: in queue is %s', inQ); | ||
openCb(inQ); | ||
break; | ||
default: | ||
@@ -302,5 +459,6 @@ console.warn('Unknown message type %s', msg.properties.type); | ||
setImmediate(function() { | ||
self.emit('error', new Error('Input queue deleted'))}); // fall-through | ||
self.emit('error', new Error('In queue deleted'))}); | ||
// fall-through | ||
case 'eof': | ||
debug('Recv eof on %s', queue); | ||
debug('Recv eof on in queue %s', queue); | ||
current.push(null); | ||
@@ -311,3 +469,4 @@ current = null; | ||
debug('Recv %d bytes on %s', msg.content.length, queue); | ||
current.push(msg.content); break; | ||
current.push(msg.content); | ||
break; | ||
default: | ||
@@ -314,0 +473,0 @@ console.warn('Unknown message type %s', msg.properties.type); |
{ | ||
"name": "rmqcat", | ||
"version": "0.0.2", | ||
"version": "0.0.3", | ||
"description": "netcat-like tool for sending data through RabbitMQ", | ||
@@ -5,0 +5,0 @@ "keywords": [ |
@@ -59,2 +59,14 @@ rmqcat | ||
The option `-e` or `--exec` causes rmqcat to spawn a child process | ||
using the argument following and redirect stdin and stdout of that | ||
process to the queue. For example, | ||
```js | ||
rmqcat -l --exec "grep -n foo" | ||
``` | ||
If the option `-k` is used in combination with `-e`, the child process | ||
will be run for each connection made. In a client, the process is run | ||
once the connection is accepted. | ||
The option `--service` has a role similar to a TCP port number. It | ||
@@ -61,0 +73,0 @@ names a queue to be used by clients and listeners to establish |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Shell access
Supply chain riskThis module accesses the system shell. Accessing the system shell increases the risk of executing arbitrary code.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
17955
395
91
1