Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

beanstalkd

Package Overview
Dependencies
Maintainers
1
Versions
20
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

beanstalkd - npm Package Compare versions

Comparing version 1.1.0 to 2.0.0

lib/commands.js

96

lib/client.js

@@ -11,2 +11,4 @@ 'use strict';

var _slicedToArray = function () { function sliceIterator(arr, i) { var _arr = []; var _n = true; var _d = false; var _e = undefined; try { for (var _i = arr[Symbol.iterator](), _s; !(_n = (_s = _i.next()).done); _n = true) { _arr.push(_s.value); if (i && _arr.length === i) break; } } catch (err) { _d = true; _e = err; } finally { try { if (!_n && _i["return"]) _i["return"](); } finally { if (_d) throw _e; } } return _arr; } return function (arr, i) { if (Array.isArray(arr)) { return arr; } else if (Symbol.iterator in Object(arr)) { return sliceIterator(arr, i); } else { throw new TypeError("Invalid attempt to destructure non-iterable instance"); } }; }();
var _net = require('net');

@@ -28,4 +30,8 @@

var _types = require('./types');
var _commands = require('./commands');
var _lodash = require('lodash.camelcase');
var _lodash2 = _interopRequireDefault(_lodash);
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }

@@ -47,40 +53,14 @@

this.closed = false;
}
this.use = makeCommand(new _writer.BasicWriter('use'), new _reader.BasicReader('USING', new _types.TubeType()));
static addCommand(command, expected) {
BeanstalkdClient.prototype[(0, _lodash2.default)(command)] = makeCommand(command, new _writer.BasicWriter(command), new _reader.BasicReader(expected));
}
this.listTubeUsed = makeCommand(new _writer.BasicWriter('list-tube-used'), new _reader.BasicReader('USING', new _types.TubeType()));
static addYamlCommand(command, expected) {
BeanstalkdClient.prototype[(0, _lodash2.default)(command)] = makeCommand(command, new _writer.BasicWriter(command), new _reader.YamlReader(expected));
}
this.pauseTube = makeCommand(new _writer.BasicWriter('pause-tube'), new _reader.BasicReader('PAUSED'));
this.put = makeCommand(new _writer.BodyWriter('put'), new _reader.BasicReader('INSERTED', new _types.IdType()));
this.watch = makeCommand(new _writer.BasicWriter('watch'), new _reader.BasicReader('WATCHING', new _types.TubeType()));
this.ignore = makeCommand(new _writer.BasicWriter('ignore'), new _reader.BasicReader('WATCHING', new _types.TubeType()));
/* Reserve commands */
this.reserve = makeCommand(new _writer.BasicWriter('reserve'), new _reader.BodyReader('RESERVED', new _types.IdType(), new _types.BodyType()));
this.reserveWithTimeout = makeCommand(new _writer.BasicWriter('reserve-with-timeout'), new _reader.BodyReader('RESERVED', new _types.IdType(), new _types.BodyType()));
/* Job commands */
this.destroy = makeCommand(new _writer.BasicWriter('delete'), new _reader.BasicReader('DELETED'));
this.bury = makeCommand(new _writer.BasicWriter('bury'), new _reader.BasicReader('BURIED'));
this.release = makeCommand(new _writer.BasicWriter('release'), new _reader.BasicReader('RELEASED'));
this.touch = makeCommand(new _writer.BasicWriter('touch'), new _reader.BasicReader('TOUCHED'));
this.kickJob = makeCommand(new _writer.BasicWriter('kick-job'), new _reader.BasicReader('KICKED'));
/* Peek commands */
this.peek = makeCommand(new _writer.BasicWriter('peek'), new _reader.BodyReader('FOUND', new _types.IgnoreType(), new _types.BodyType()));
this.peekReady = makeCommand(new _writer.BasicWriter('peek-ready'), new _reader.BodyReader('FOUND', new _types.IdType(), new _types.BodyType()));
this.peekDelayed = makeCommand(new _writer.BasicWriter('peek-delayed'), new _reader.BodyReader('FOUND', new _types.IdType(), new _types.BodyType()));
this.peekBuried = makeCommand(new _writer.BasicWriter('peek-buried'), new _reader.BodyReader('FOUND', new _types.IdType(), new _types.BodyType()));
/* Commands that returns YAML */
this.listTubesWatched = makeCommand(new _writer.BasicWriter('list-tubes-watched'), new _reader.YamlReader('OK', new _types.YamlBodyType()));
this.listTubes = makeCommand(new _writer.BasicWriter('list-tubes'), new _reader.YamlReader('OK', new _types.YamlBodyType()));
this.statsJob = makeCommand(new _writer.BasicWriter('stats-job'), new _reader.YamlReader('OK', new _types.YamlBodyType()));
this.statsTube = makeCommand(new _writer.BasicWriter('stats-tube'), new _reader.YamlReader('OK', new _types.YamlBodyType()));
this.stats = makeCommand(new _writer.BasicWriter('stats'), new _reader.YamlReader('OK', new _types.YamlBodyType()));
destroy() {
return this['delete'].apply(this, arguments);
}

@@ -143,5 +123,20 @@

exports.default = BeanstalkdClient;
function makeCommand(writer, reader) {
let command = (() => {
var _ref = (0, _bluebird.coroutine)(function* () {
_commands.commands.forEach((_ref) => {
var _ref2 = _slicedToArray(_ref, 2);
let command = _ref2[0],
expectation = _ref2[1];
return BeanstalkdClient.addCommand(command, expectation);
});
_commands.yamlCommands.forEach((_ref3) => {
var _ref4 = _slicedToArray(_ref3, 2);
let command = _ref4[0],
expectation = _ref4[1];
return BeanstalkdClient.addYamlCommand(command, expectation);
});
function makeCommand(command, writer, reader) {
let handler = (() => {
var _ref5 = (0, _bluebird.coroutine)(function* () {
var _this = this;

@@ -156,4 +151,11 @@

protocol = this.protocol,
spec = protocol.commandMap[command],
result;
if (spec.args.indexOf('bytes') > -1) {
let data = args.pop();
args.push(data.length);
args.push(data);
}
if (this.closed) throw new Error('Connection is closed');

@@ -166,3 +168,3 @@

onConnectionEnded = function onConnectionEnded(error) {
reject(error || 'CLOSED');
reject(error || new Error('CONNECTION_CLOSED'));
};

@@ -174,3 +176,3 @@

_this.readQueue.push(function (data) {
return reader.handle(data, function (result) {
return reader.handle(protocol, data, function (result) {
connection.removeListener('close', onConnectionEnded);

@@ -206,11 +208,13 @@ connection.removeListener('error', onConnectionEnded);

return function command() {
return _ref.apply(this, arguments);
return function handler() {
return _ref5.apply(this, arguments);
};
})();
command.writer = writer;
command.reader = reader;
/*
handler.writer = writer;
handler.reader = reader;
*/
return command;
return handler;
}

@@ -33,3 +33,3 @@ 'use strict';

this.connection.emit('error', new Error(`No read queue item for item, length: ${ data.length }`));
console.log(data.toString());
console.log('No read queue item' /* , data.toString() */);
this.connection.destroy();

@@ -36,0 +36,0 @@ return;

@@ -6,8 +6,6 @@ 'use strict';

});
exports.YamlReader = exports.BodyReader = exports.BasicReader = undefined;
exports.YamlReader = exports.BasicReader = undefined;
var _slicedToArray = function () { function sliceIterator(arr, i) { var _arr = []; var _n = true; var _d = false; var _e = undefined; try { for (var _i = arr[Symbol.iterator](), _s; !(_n = (_s = _i.next()).done); _n = true) { _arr.push(_s.value); if (i && _arr.length === i) break; } } catch (err) { _d = true; _e = err; } finally { try { if (!_n && _i["return"]) _i["return"](); } finally { if (_d) throw _e; } } return _arr; } return function (arr, i) { if (Array.isArray(arr)) { return arr; } else if (Symbol.iterator in Object(arr)) { return sliceIterator(arr, i); } else { throw new TypeError("Invalid attempt to destructure non-iterable instance"); } }; }();
exports.parseResult = parseResult;
var _jsYaml = require('js-yaml');

@@ -17,144 +15,46 @@

var _types = require('./types');
var _misc = require('./misc');
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }
function extractHeader(data) {
let length = data.indexOf(_misc.CRLF),
header = data.toString('utf8', 0, length),
remainder = data.slice(length + _misc.CRLF.length, data.length);
return [header, remainder];
}
function parseResult(result, types) {
let offset = 0;
result = result.slice(0);
types.forEach(function (type, i) {
if (type instanceof _types.IgnoreType) {
result.splice(i + offset, 1);
offset--;
}
});
return result;
}
class BasicReader {
constructor(expectation) {
this.expectation = expectation;
this.remainder = new Buffer(0);
}
for (var _len = arguments.length, types = Array(_len > 1 ? _len - 1 : 0), _key = 1; _key < _len; _key++) {
types[_key - 1] = arguments[_key];
}
this.types = types;
parseData(data) {
return data;
}
handle(data, resolve, reject) {
var _extractHeader = extractHeader(data),
_extractHeader2 = _slicedToArray(_extractHeader, 2);
handle(protocol, data, resolve, reject) {
var _protocol$parseReply = protocol.parseReply(Buffer.concat([this.remainder, data])),
_protocol$parseReply2 = _slicedToArray(_protocol$parseReply, 2);
let header = _extractHeader2[0],
remainder = _extractHeader2[1],
args = header.split(' '),
response = args.shift(),
result = null;
let remainder = _protocol$parseReply2[0],
result = _protocol$parseReply2[1];
if (response === this.expectation) {
result = args;
result = parseResult(result, this.types);
resolve(result.length > 1 ? result : result[0]);
return remainder;
if (remainder && !result) {
this.remainder = remainder;
return;
} else {
reject(new Error(response));
return remainder;
this.remainder = new Buffer(0);
}
}
}
exports.BasicReader = BasicReader;
class BodyReader extends BasicReader {
constructor(expectation) {
for (var _len2 = arguments.length, types = Array(_len2 > 1 ? _len2 - 1 : 0), _key2 = 1; _key2 < _len2; _key2++) {
types[_key2 - 1] = arguments[_key2];
}
if (result.reply === this.expectation) {
let args = [],
spec = protocol.replyMap[result.reply];
super(expectation, ...types);
this.continue = null;
}
parseBody(body) {
return body;
}
handle(data, resolve, reject) {
if (this.continue) {
var _continue = this.continue;
let length = _continue.length,
result = _continue.result,
body = _continue.body;
body = Buffer.concat([body, data]);
if (body.length - _misc.CRLF.length < length) {
this.continue.body = body;
return false;
if (spec) {
let bytes = spec.args.indexOf('bytes');
args = spec.args.map(arg => result.args[arg]);
if (bytes !== -1) {
args.splice(bytes, 1);
args[args.length - 1] = this.parseData(args[args.length - 1]);
}
}
this.continue = null;
body = body.slice(0, length);
let remainder = body.slice(length + _misc.CRLF.length);
body = this.parseBody(body);
result.push(body);
result = parseResult(result, this.types);
resolve(result.length > 1 ? result : result[0]);
return remainder;
}
var _extractHeader3 = extractHeader(data),
_extractHeader4 = _slicedToArray(_extractHeader3, 2);
let header = _extractHeader4[0],
remainder = _extractHeader4[1],
args = header.split(' '),
response = args.shift(),
length = parseInt(args.pop(), 10),
result = null,
body;
if (response === this.expectation) {
result = args;
if (remainder.length < length) {
this.continue = {
result: result,
body: remainder,
length: length
};
return false;
} else {
body = remainder.slice(0, length);
remainder = remainder.slice(length + _misc.CRLF.length);
}
body = this.parseBody(body);
result.push(body);
result = parseResult(result, this.types);
resolve(result.length > 1 ? result : result[0]);
return remainder;
resolve(args.length > 1 ? args : args[0]);
return remainder || new Buffer(0);
} else {
reject(new Error(response));
return remainder;
reject(new Error(result.reply));
return remainder || new Buffer(0);
}

@@ -164,8 +64,8 @@ }

exports.BodyReader = BodyReader;
class YamlReader extends BodyReader {
parseBody(body) {
return _jsYaml2.default.load(body.toString());
exports.BasicReader = BasicReader;
class YamlReader extends BasicReader {
parseData(data) {
return _jsYaml2.default.load(data.toString());
}
}
exports.YamlReader = YamlReader;

@@ -6,3 +6,3 @@ "use strict";

});
exports.BodyWriter = exports.BasicWriter = exports.Writer = undefined;
exports.BasicWriter = exports.Writer = undefined;

@@ -33,28 +33,2 @@ var _bluebird = require("bluebird");

}
exports.BasicWriter = BasicWriter;
class BodyWriter extends BasicWriter {
handle(protocol, connection) {
for (var _len2 = arguments.length, args = Array(_len2 > 2 ? _len2 - 2 : 0), _key2 = 2; _key2 < _len2; _key2++) {
args[_key2 - 2] = arguments[_key2];
}
var _this2 = this;
return (0, _bluebird.coroutine)(function* () {
let body = args.pop();
if (!Buffer.isBuffer(body)) {
body = new Buffer(body);
}
args.push(body.length);
args.push(body);
yield new Promise(function (resolve) {
connection.write(protocol.buildCommand(_this2.command, args), resolve);
});
})();
}
}
exports.BodyWriter = BodyWriter;
exports.BasicWriter = BasicWriter;
{
"name": "beanstalkd",
"version": "1.1.0",
"version": "2.0.0",
"description": "A beanstalkd client for Node.js with promises",

@@ -8,6 +8,7 @@ "main": "lib/client.js",

"babel-runtime": "^5.8.25",
"beanstalkd-protocol": "^0.1.1",
"beanstalkd-protocol": "^0.2.2",
"bluebird": "^3.4.7",
"debug": "^2.2.0",
"js-yaml": "^3.4.2"
"js-yaml": "^3.4.2",
"lodash.camelcase": "^4.3.0"
},

@@ -14,0 +15,0 @@ "devDependencies": {

@@ -36,3 +36,3 @@ import Client from '../../src/client';

}).then((jobId) => {
return this.client.peek(jobId).then(function (payload) {
return this.client.peek(jobId).spread(function (jobId, payload) {
expect(Buffer.isBuffer(payload)).to.be.ok;

@@ -57,2 +57,8 @@ expect(JSON.parse(payload.toString())).to.deep.equal(values);

it('should return stats', function () {
return this.client.stats().then((stats) => {
expect(stats.hostname).to.be.ok;
});
});
it('should be able to put and reserve a job', function () {

@@ -78,3 +84,3 @@ let worker = new Client(host, port)

return worker.reserveWithTimeout(0).spread((reserveId, body) => {
expect(putId).to.equal(reserveId);
expect(putId.toString()).to.equal(reserveId.toString());
expect(JSON.parse(body.toString())).to.deep.equal(values);

@@ -109,3 +115,3 @@

return worker.reserveWithTimeout(0).spread((reserveId, body) => {
expect(putId).to.equal(reserveId);
expect(putId.toString()).to.equal(reserveId.toString());
expect(JSON.parse(body.toString())).to.deep.equal(values);

@@ -138,3 +144,3 @@ });

return worker.reserveWithTimeout(0).spread((reserveId, body) => {
expect(putId).to.equal(reserveId);
expect(putId.toString()).to.equal(reserveId.toString());
expect(JSON.parse(body.toString())).to.deep.equal(values);

@@ -141,0 +147,0 @@ });

import {BasicReader, BodyReader, YamlReader} from 'reader';
import {CRLF} from 'misc';
import {expect} from 'chai';
import sinon from 'sinon';
import BeanstalkdProtocol from 'beanstalkd-protocol';
const protocol = new BeanstalkdProtocol();
const CRLF = new Buffer([0x0d, 0x0a]);

@@ -16,26 +18,2 @@ describe('reader', function () {

describe('BasicReader', function () {
it('should extract the header and resolve with the rest of arguments', function () {
var expectation = Math.random().toString()
, reader = new BasicReader(expectation, [])
, resolve = this.sinon.stub()
, result = [Math.random().toString(), Math.random().toString()]
, data = Buffer.concat([new Buffer([expectation].concat(result).join(' ')), CRLF]);
reader.handle(data, resolve);
expect(resolve).to.have.been.calledWith(result);
});
it('should resolve with a single argument', function () {
var resolve = this.sinon.stub()
, expectation = Math.random().toString()
, reader = new BasicReader(expectation, [])
, result = [Math.random().toString()]
, data = Buffer.concat([new Buffer([expectation].concat(result).join(' ')), CRLF]);
reader.handle(data, resolve);
expect(resolve).to.have.been.calledWith(result[0]);
});
it('should reject if response does not match expectation', function () {

@@ -47,3 +25,3 @@ var reject = this.sinon.stub()

reader.handle(data, null, reject);
reader.handle(protocol, data, null, reject);

@@ -54,17 +32,2 @@ expect(reject).to.have.been.calledOnce;

});
describe('BodyReader', function () {
it('should resolve with arguments and job body', function () {
var resolve = this.sinon.stub()
, expectation = Math.random().toString()
, reader = new BodyReader(expectation, [])
, body = new Buffer(Math.random().toString()+Math.random().toString()+Math.random().toString()+Math.random().toString())
, result = [Math.random().toString(), body.length]
, data = Buffer.concat([new Buffer([expectation].concat(result).join(' ')), CRLF, body, CRLF]);
reader.handle(data, resolve);
expect(resolve).to.have.been.calledWith([result[0], body]);
});
});
});

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc