Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

rmqcat

Package Overview
Dependencies
Maintainers
1
Versions
3
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

rmqcat - npm Package Compare versions

Comparing version 0.0.2 to 0.0.3

293

index.js

@@ -9,2 +9,3 @@ #!/usr/bin/env node

var inherits = require('util').inherits;
var spawn = require('child_process').spawn;

@@ -29,2 +30,7 @@ var options = require('yargs')

.options('exec', {
describe: 'Spawn a process and use stdin and stdout from that process',
alias: 'e'
})
.describe('l', 'Listen for connections')

@@ -48,15 +54,50 @@ .describe('k', 'Keep listening after client disconnections')

// queue, which is the common knowledge between the client and the
// server (the "connection point", like a port); then is a stdin queue
// and a stdout queue, both named from the point of view of the
// server.
// server (the "connection point", like a port); then there is an in
// queue and an out queue, both named from the point of view of the
// process.
// The stdin queue is that over which the client sends data to the
// server, and the stdout queue is that over which the server sends
// data to the client. The client creates the stdin queue and annouces
// it to the server (as the 'replyTo' of an empty message sent to the
// handshake queue); the server creates the stdout queue and announces
// it to the client (likewise, sent to the stdout queue).
// The stdin queue is that over which the remote sends data to the
// this process, and the out queue is that over which the process
// sends data to the remote. This process creates the in queue and
// annouces it to the remote (as the 'replyTo' of an empty message);
// the remote creates the out queue and announces it to this process.
// The difference between a client and a server (listener) is that the
// client sends an open message to the handshake queue, and the
// listener responds with an open message to the client's in queue
// (which becomes the server's out queue).
var handshakeQ = argv.service;
function closeLatch(done) {
function either(which) {
switch (which) {
case 'in':
return only('out');
case 'out':
return only('in');
default:
throw new Error('Unknown stream ' + which);
}
}
function only(s) {
return function(which) {
switch (which) {
case s:
return done();
default:
throw new Error('Close on stream other than expected ' + s);
}
}
}
return either;
}
function neither(which) {
throw new Error('Attempted to close ' + which + '; both already closed');
}
var ok = amqp.connect(url);

@@ -67,2 +108,4 @@ ok.then(function(connection) {

// Simplex
// It's convenient, since most of the work goes through the

@@ -85,2 +128,8 @@ // channel, to use its closure as a signal to clean up and leave.

process.stdin.pipe(out);
process.on('SIGINT', function() {
process.stdin.unpipe();
out.end();
});
out.on('finish', function() {

@@ -95,2 +144,26 @@ ch.close();

var reader = readableQueue(ch, source);
var torndown = false;
function teardown() {
if (torndown) return;
torndown = true;
debug("Tearing down pipe to stdout");
reader.stop();
reader.unpipe();
ch.close();
}
process.on('SIGINT', teardown);
// If we're being piped into another process, and that process
// terminates or otherwise closes its input, we can get an
// EPIPE exception here, possibly more than once.
process.stdout.on('error', function(err) {
if (err.code === 'EPIPE') {
debug(err);
teardown(); }
else
throw err;
});
reader.on('end', function() {

@@ -105,2 +178,4 @@ ch.close();

// Duplex
// Make sure the handshake queue exists, since we don't know who

@@ -111,6 +186,28 @@ // will turn up first

var stdin;
var stdout;
var child;
function setup() {
if (argv.e) {
debug('Starting process %s', argv.e);
var args = argv.e.split(' ');
child = spawn(args[0], args.slice(1));
stdin = child.stdout;
stdin.on('end', function() {
debug('Child process output ended');
});
stdout = child.stdin;
}
else {
stdin = process.stdin;
stdout = process.stdout;
}
}
if (argv.l) { // act as server
ch.assertQueue('', {exclusive: true}).then(function(ok) {
var stdinQ = ok.queue;
debug('Created stdin queue: %s', stdinQ);
return ch.assertQueue('', {exclusive: true}).then(function(ok) {
var inQ = ok.queue;
debug('Created in queue: %s', inQ);

@@ -124,9 +221,32 @@ // I need a channel on which to accept connections. Why

var accepted = null;
var current = null;
var writable = null;
process.on('SIGINT', function() {
if (writable !== null) {
writable.end();
}
ch.close();
});
function next() {
process.stdin.unpipe();
stdin.unpipe();
acceptCh.ack(accepted);
}
var latch;
if (argv.k) {
var freshLatch = closeLatch(function() {
next();
return freshLatch;
});
latch = freshLatch;
}
else {
latch = closeLatch(function() {
next();
ch.close();
return neither;
});
}
acceptCh.prefetch(1);

@@ -140,2 +260,6 @@ // Any returned messages are a result of the 'open' not

' assuming dead connection');
if (child != null) {
debug('Killing child process');
child.kill();
}
next();

@@ -148,17 +272,15 @@ });

accepted = msg;
var stdoutQ = msg.properties.replyTo;
debug('Recv open: stdout is %s', stdoutQ);
acceptCh.sendToQueue(stdoutQ, new Buffer(0),
var outQ = msg.properties.replyTo;
debug('Recv open: out queue is %s', outQ);
acceptCh.sendToQueue(outQ, new Buffer(0),
{type: 'open',
mandatory: true,
replyTo: stdinQ});
debug('Sent open to %s: stdin is %s', stdoutQ, stdinQ);
current = writableQueue(ch, stdoutQ);
current.on('finish', function() {
next();
if (!argv.k) {
ch.close();
}
replyTo: inQ});
debug('Sent open to out queue %s: in queue is %s', outQ, inQ);
writable = writableQueue(ch, outQ);
writable.on('finish', function() {
latch = latch('out');
});
process.stdin.pipe(current, {end: true});
setup();
stdin.pipe(writable, {end: true});
break;

@@ -172,8 +294,21 @@ default:

var streams = new QueueStreamServer(ch, stdinQ);
streams.on('connection', function(stream) {
stream.pipe(process.stdout, {end: !argv.k});
stream.on('end', function() {
current.end();
var streams = new QueueStreamServer(ch, inQ);
streams.on('connection', function(readable) {
// process.stdout doesn't like to have `#end` called on
// it'; however, pipe appears to know not to do so, and I
// *do* want it called if stdout is the input to an
// `--exec`.
readable.pipe(stdout, {end: true});
readable.on('end', function() {
latch = latch('in');
});
// The special case for closing client streams: if we're
// accepting input on stdin, treat the server closing as us
// closing.
if (stdin === process.stdin) {
readable.on('end', function() {
writable.end();
});
}
});

@@ -185,31 +320,41 @@ });

else { // act as client
var latch = closeLatch(function() {
ch.close();
return neither;
});
ch.assertQueue('', {exclusive: true}).then(function(ok) {
var stdoutQ = ok.queue;
debug('Created stdout queue %s', stdoutQ);
var outQ = ok.queue;
debug('Created out queue %s', outQ);
ch.consume(stdoutQ, function(msg) {
switch (msg.properties.type) {
case 'open':
var stdinQ = msg.properties.replyTo;
debug('Recv open: stdin is %s', stdinQ);
var relay = writableQueue(ch, stdinQ);
process.stdin.pipe(relay, {end: true});
break;
case 'data':
debug('Recv %d bytes on stdout', msg.content.length);
process.stdout.write(msg.content);
break;
case 'eof':
debug('Recv eof on stdout (%s)', stdoutQ);
ch.close();
break;
default:
console.warn('Unknown message type %s',
msg.properties.type,
' received on stdout queue');
var readable = readableQueue(ch, outQ, function(inQ) {
setup();
readable.pipe(stdout);
var writable = writableQueue(ch, inQ);
stdin.pipe(writable, {end: true});
writable.on('finish', function() {
latch = latch('out');
});
process.on('SIGINT', function() {
if (writable !== null) {
writable.end();
}
});
// The special case for closing client streams: if we're
// accepting input on stdin, treat the server closing as us
// closing.
if (stdin === process.stdin) {
readable.on('end', function() {
writable.end();
});
}
}, {noAck: true, exclusive: true});
});
readable.on('end', function() {
latch = latch('in');
});
ch.sendToQueue(handshakeQ, new Buffer(0),
{type: 'open', replyTo: stdoutQ});
{type: 'open', replyTo: outQ});
debug('Sent open to handshake queue %s', handshakeQ);

@@ -239,3 +384,3 @@ });

// Create a readable stream that gets chunks from a stream.
function readableQueue(channel, queue) {
function readableQueue(channel, queue, openCb) {
var readable = new Readable();

@@ -251,2 +396,15 @@ readable._read = function() {};

// Don't trigger anything (e.g., closing the channel) until
// we've cancelled. We may get messages in the meantime, which
// is why the nack and early return above.
function stop() {
running = false;
ok.then(function(consumeOk) {
channel.cancel(consumeOk.consumerTag);
readable.push(null);
});
}
readable.stop = stop;
var ok = channel.consume(queue, function(msg) {

@@ -263,14 +421,13 @@ if (!running) {

case 'eof':
running = false;
// Don't trigger anything (e.g., closing the channel) until
// we've cancelled. We may get messages in the meantime, which
// is why the nack and early return above.
ok.then(function(consumeOk) {
channel.cancel(consumeOk.consumerTag);
readable.push(null);
});
stop();
break;
case 'data':
debug('Recv %d bytes', msg.content.length);
readable.push(msg.content);
break;
case 'open':
var inQ = msg.properties.replyTo;
debug('Recv open: in queue is %s', inQ);
openCb(inQ);
break;
default:

@@ -302,5 +459,6 @@ console.warn('Unknown message type %s', msg.properties.type);

setImmediate(function() {
self.emit('error', new Error('Input queue deleted'))}); // fall-through
self.emit('error', new Error('In queue deleted'))});
// fall-through
case 'eof':
debug('Recv eof on %s', queue);
debug('Recv eof on in queue %s', queue);
current.push(null);

@@ -311,3 +469,4 @@ current = null;

debug('Recv %d bytes on %s', msg.content.length, queue);
current.push(msg.content); break;
current.push(msg.content);
break;
default:

@@ -314,0 +473,0 @@ console.warn('Unknown message type %s', msg.properties.type);

{
"name": "rmqcat",
"version": "0.0.2",
"version": "0.0.3",
"description": "netcat-like tool for sending data through RabbitMQ",

@@ -5,0 +5,0 @@ "keywords": [

@@ -59,2 +59,14 @@ rmqcat

The option `-e` or `--exec` causes rmqcat to spawn a child process
using the argument following and redirect stdin and stdout of that
process to the queue. For example,
```js
rmqcat -l --exec "grep -n foo"
```
If the option `-k` is used in combination with `-e`, the child process
will be run for each connection made. In a client, the process is run
once the connection is accepted.
The option `--service` has a role similar to a TCP port number. It

@@ -61,0 +73,0 @@ names a queue to be used by clients and listeners to establish

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc