Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@smartrent/heretic

Package Overview
Dependencies
Maintainers
6
Versions
3
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@smartrent/heretic - npm Package Compare versions

Comparing version 0.5.1 to 0.5.2

735

dist/Heretic.js

@@ -6,3 +6,3 @@ "use strict";

});
exports["default"] = void 0;
exports.default = void 0;

@@ -19,59 +19,27 @@ var _bluebird = _interopRequireDefault(require("bluebird"));

function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { "default": obj }; }
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }
function _typeof(obj) { if (typeof Symbol === "function" && typeof Symbol.iterator === "symbol") { _typeof = function _typeof(obj) { return typeof obj; }; } else { _typeof = function _typeof(obj) { return obj && typeof Symbol === "function" && obj.constructor === Symbol && obj !== Symbol.prototype ? "symbol" : typeof obj; }; } return _typeof(obj); }
function asyncGeneratorStep(gen, resolve, reject, _next, _throw, key, arg) { try { var info = gen[key](arg); var value = info.value; } catch (error) { reject(error); return; } if (info.done) { resolve(value); } else { Promise.resolve(value).then(_next, _throw); } }
function _asyncToGenerator(fn) { return function () { var self = this, args = arguments; return new Promise(function (resolve, reject) { var gen = fn.apply(self, args); function _next(value) { asyncGeneratorStep(gen, resolve, reject, _next, _throw, "next", value); } function _throw(err) { asyncGeneratorStep(gen, resolve, reject, _next, _throw, "throw", err); } _next(undefined); }); }; }
function _classCallCheck(instance, Constructor) { if (!(instance instanceof Constructor)) { throw new TypeError("Cannot call a class as a function"); } }
function _defineProperties(target, props) { for (var i = 0; i < props.length; i++) { var descriptor = props[i]; descriptor.enumerable = descriptor.enumerable || false; descriptor.configurable = true; if ("value" in descriptor) descriptor.writable = true; Object.defineProperty(target, descriptor.key, descriptor); } }
function _createClass(Constructor, protoProps, staticProps) { if (protoProps) _defineProperties(Constructor.prototype, protoProps); if (staticProps) _defineProperties(Constructor, staticProps); return Constructor; }
function _possibleConstructorReturn(self, call) { if (call && (_typeof(call) === "object" || typeof call === "function")) { return call; } return _assertThisInitialized(self); }
function _assertThisInitialized(self) { if (self === void 0) { throw new ReferenceError("this hasn't been initialised - super() hasn't been called"); } return self; }
function _getPrototypeOf(o) { _getPrototypeOf = Object.setPrototypeOf ? Object.getPrototypeOf : function _getPrototypeOf(o) { return o.__proto__ || Object.getPrototypeOf(o); }; return _getPrototypeOf(o); }
function _inherits(subClass, superClass) { if (typeof superClass !== "function" && superClass !== null) { throw new TypeError("Super expression must either be null or a function"); } subClass.prototype = Object.create(superClass && superClass.prototype, { constructor: { value: subClass, writable: true, configurable: true } }); if (superClass) _setPrototypeOf(subClass, superClass); }
function _setPrototypeOf(o, p) { _setPrototypeOf = Object.setPrototypeOf || function _setPrototypeOf(o, p) { o.__proto__ = p; return o; }; return _setPrototypeOf(o, p); }
var optionsSchema = _joi["default"].object().keys({
tableName: _joi["default"].string()["default"]('heretic_jobs'),
jobsExchange: _joi["default"].string()["default"]('heretic.jobs'),
outcomesExchange: _joi["default"].string()["default"]('heretic.outcomes'),
deadJobsExchange: _joi["default"].string()["default"]('heretic.dead'),
jobOutcomesQueue: _joi["default"].string()["default"]('heretic.jobs.outcomes'),
deadJobsQueue: _joi["default"].string()["default"]('heretic.jobs.dead'),
outcomeRoutingKeyPrefix: _joi["default"].string()["default"]('job-outcome'),
socketOptions: _joi["default"].object().optional().keys({
clientProperties: _joi["default"].object().optional().unknown().keys({
Application: _joi["default"].string().optional()
const optionsSchema = _joi.default.object().keys({
tableName: _joi.default.string().default('heretic_jobs'),
jobsExchange: _joi.default.string().default('heretic.jobs'),
outcomesExchange: _joi.default.string().default('heretic.outcomes'),
deadJobsExchange: _joi.default.string().default('heretic.dead'),
jobOutcomesQueue: _joi.default.string().default('heretic.jobs.outcomes'),
deadJobsQueue: _joi.default.string().default('heretic.jobs.dead'),
outcomeRoutingKeyPrefix: _joi.default.string().default('job-outcome'),
socketOptions: _joi.default.object().optional().keys({
clientProperties: _joi.default.object().optional().unknown().keys({
Application: _joi.default.string().optional()
})
}),
writeOutcomes: _joi["default"]["boolean"]()["default"]('true')
writeOutcomes: _joi.default.boolean().default('true')
});
var Heretic =
/*#__PURE__*/
function (_EventEmitter) {
_inherits(Heretic, _EventEmitter);
class Heretic extends _events.EventEmitter {
constructor(amqpUrl, knex, options = {}) {
super();
function Heretic(amqpUrl, knex) {
var _this;
_joi.default.assert(amqpUrl, _joi.default.string().uri().label('amqpUrl'));
var options = arguments.length > 2 && arguments[2] !== undefined ? arguments[2] : {};
_classCallCheck(this, Heretic);
_this = _possibleConstructorReturn(this, _getPrototypeOf(Heretic).call(this));
_joi["default"].assert(amqpUrl, _joi["default"].string().uri().label('amqpUrl'));
_joi["default"].validate(options, optionsSchema, function (err, value) {
_joi.default.validate(options, optionsSchema, (err, value) => {
if (err) {

@@ -88,549 +56,198 @@ throw err;

_this.amqpUrl = amqpUrl;
_this.knex = knex;
_this.options = options;
_this.queues = {};
_this.connection = null;
_this.controlChannel = null;
return _this;
this.amqpUrl = amqpUrl;
this.knex = knex;
this.options = options;
this.queues = {};
this.connection = null;
this.controlChannel = null;
}
_createClass(Heretic, [{
key: "init",
value: function () {
var _init = _asyncToGenerator(
/*#__PURE__*/
regeneratorRuntime.mark(function _callee() {
var _this2 = this;
async init() {
if (this.connection && this.controlChannel) {
return _bluebird.default.resolve();
}
return regeneratorRuntime.wrap(function _callee$(_context) {
while (1) {
switch (_context.prev = _context.next) {
case 0:
if (!(this.connection && this.controlChannel)) {
_context.next = 2;
break;
}
this.connection = await _amqplib.default.connect(this.amqpUrl, this.socketOptions);
this.controlChannel = await this.connection.createConfirmChannel();
this.controlChannel.on('error', err => {
this.emit('error', err);
});
this.connection.on('error', err => {
this.emit('error', err);
});
}
return _context.abrupt("return", _bluebird["default"].resolve());
async enqueue(queueName, payload, options = {}) {
await this.init();
return await this.knex.transaction(async trx => {
let row = {
queue_name: queueName,
payload: payload
};
case 2:
_context.next = 4;
return _amqplib["default"].connect(this.amqpUrl, this.socketOptions);
if (options.maxAttempts) {
row.max_attempts = options.maxAttempts;
}
case 4:
this.connection = _context.sent;
_context.next = 7;
return this.connection.createConfirmChannel();
let inserts = await trx(this.options.tableName).insert(row).returning('*');
let job = inserts[0];
return job;
}).tap(async job => {
await this.publishConfirm(this.options.jobsExchange, queueName, new Buffer(JSON.stringify({
id: job.id
})));
});
}
case 7:
this.controlChannel = _context.sent;
this.controlChannel.on('error', function (err) {
_this2.emit('error', err);
});
this.connection.on('error', function (err) {
_this2.emit('error', err);
});
async retry(id) {
await this.init();
return await this.knex.transaction(async trx => {
let result = await this.knex(this.options.tableName).update({
status: 'pending'
}).where({
id
}).returning('*');
let job = result[0];
case 10:
case "end":
return _context.stop();
}
}
}, _callee, this);
}));
function init() {
return _init.apply(this, arguments);
if (!job) {
throw new Error('Job not found');
}
return init;
}()
}, {
key: "enqueue",
value: function () {
var _enqueue = _asyncToGenerator(
/*#__PURE__*/
regeneratorRuntime.mark(function _callee4(queueName, payload) {
var _this3 = this;
await this.publishConfirm(this.options.jobsExchange, job.queue_name, new Buffer(JSON.stringify({
id: job.id
})));
});
}
var options,
_args4 = arguments;
return regeneratorRuntime.wrap(function _callee4$(_context4) {
while (1) {
switch (_context4.prev = _context4.next) {
case 0:
options = _args4.length > 2 && _args4[2] !== undefined ? _args4[2] : {};
_context4.next = 3;
return this.init();
process(name, concurrency, handler) {
_joi.default.assert(name, _joi.default.string().label('name'));
case 3:
_context4.next = 5;
return this.knex.transaction(
/*#__PURE__*/
function () {
var _ref = _asyncToGenerator(
/*#__PURE__*/
regeneratorRuntime.mark(function _callee2(trx) {
var row, inserts, job;
return regeneratorRuntime.wrap(function _callee2$(_context2) {
while (1) {
switch (_context2.prev = _context2.next) {
case 0:
row = {
queue_name: queueName,
payload: payload
};
_joi.default.assert(concurrency, _joi.default.number().min(1).label('concurrency'));
if (options.maxAttempts) {
row.max_attempts = options.maxAttempts;
}
_joi.default.assert(handler, _joi.default.func().label('handler'));
_context2.next = 4;
return trx(_this3.options.tableName).insert(row).returning('*');
if (this.queues[name]) {
throw new Error(`Queue "${name}" already registered`);
}
case 4:
inserts = _context2.sent;
job = inserts[0];
return _context2.abrupt("return", job);
this.queues[name] = new _Queue.default(this, name, concurrency, handler);
this.queues[name].on('error', err => {
this.emit('jobError', err);
});
this.queues[name].on('jobSuccess', job => {
this.emit('jobSuccess', job);
});
this.queues[name].on('jobFailed', (job, err) => {
this.emit('jobFailed', job, err);
});
this.queues[name].on('jobComplete', () => {
this.emit('jobComplete');
});
}
case 7:
case "end":
return _context2.stop();
}
}
}, _callee2);
}));
async start() {
await this.init();
return function (_x3) {
return _ref.apply(this, arguments);
};
}()).tap(
/*#__PURE__*/
function () {
var _ref2 = _asyncToGenerator(
/*#__PURE__*/
regeneratorRuntime.mark(function _callee3(job) {
return regeneratorRuntime.wrap(function _callee3$(_context3) {
while (1) {
switch (_context3.prev = _context3.next) {
case 0:
_context3.next = 2;
return _this3.publishConfirm(_this3.options.jobsExchange, queueName, new Buffer(JSON.stringify({
id: job.id
})));
try {
await this.assertEnvironment();
} catch (err) {
await this.controlChannel.close();
await this.connection.close();
throw err;
}
case 2:
case "end":
return _context3.stop();
}
}
}, _callee3);
}));
try {
for (let name in this.queues) {
if (!this.queues.hasOwnProperty(name)) {
continue;
}
return function (_x4) {
return _ref2.apply(this, arguments);
};
}());
case 5:
return _context4.abrupt("return", _context4.sent);
case 6:
case "end":
return _context4.stop();
}
}
}, _callee4, this);
}));
function enqueue(_x, _x2) {
return _enqueue.apply(this, arguments);
await this.queues[name].start();
}
} catch (err) {
await this.connection.close();
throw err;
}
}
return enqueue;
}()
}, {
key: "retry",
value: function () {
var _retry = _asyncToGenerator(
/*#__PURE__*/
regeneratorRuntime.mark(function _callee6(id) {
var _this4 = this;
return regeneratorRuntime.wrap(function _callee6$(_context6) {
while (1) {
switch (_context6.prev = _context6.next) {
case 0:
_context6.next = 2;
return this.init();
case 2:
_context6.next = 4;
return this.knex.transaction(
/*#__PURE__*/
function () {
var _ref3 = _asyncToGenerator(
/*#__PURE__*/
regeneratorRuntime.mark(function _callee5(trx) {
var result, job;
return regeneratorRuntime.wrap(function _callee5$(_context5) {
while (1) {
switch (_context5.prev = _context5.next) {
case 0:
_context5.next = 2;
return _this4.knex(_this4.options.tableName).update({
status: 'pending'
}).where({
id: id
}).returning('*');
case 2:
result = _context5.sent;
job = result[0];
if (job) {
_context5.next = 6;
break;
}
throw new Error('Job not found');
case 6:
_context5.next = 8;
return _this4.publishConfirm(_this4.options.jobsExchange, job.queue_name, new Buffer(JSON.stringify({
id: job.id
})));
case 8:
case "end":
return _context5.stop();
}
}
}, _callee5);
}));
return function (_x6) {
return _ref3.apply(this, arguments);
};
}());
case 4:
return _context6.abrupt("return", _context6.sent);
case 5:
case "end":
return _context6.stop();
}
}
}, _callee6, this);
}));
function retry(_x5) {
return _retry.apply(this, arguments);
async assertQueue(queue) {
await this.init();
return this.controlChannel.assertQueue(queue, {
durable: true,
arguments: {
'x-dead-letter-exchange': this.options.deadJobsExchange,
'x-dead-letter-routing-key': this.options.deadJobsQueue
}
});
}
return retry;
}()
}, {
key: "process",
value: function process(name, concurrency, handler) {
var _this5 = this;
async assertEnvironment() {
const ch = await this.connection.createChannel(); // error events from this channel will result in promises being rejected
_joi["default"].assert(name, _joi["default"].string().label('name'));
ch.on('error', err => {});
await ch.assertExchange(this.options.jobsExchange, 'topic', {
durable: true
});
_joi["default"].assert(concurrency, _joi["default"].number().min(1).label('concurrency'));
if (this.options.writeOutcomes) {
await ch.assertExchange(this.options.outcomesExchange, 'topic', {
durable: true
});
}
_joi["default"].assert(handler, _joi["default"].func().label('handler'));
await ch.assertExchange(this.options.deadJobsExchange, 'direct', {
durable: true,
internal: true
});
if (this.queues[name]) {
throw new Error("Queue \"".concat(name, "\" already registered"));
}
this.queues[name] = new _Queue["default"](this, name, concurrency, handler);
this.queues[name].on('error', function (err) {
_this5.emit('jobError', err);
if (this.options.writeOutcomes) {
await ch.assertQueue(this.options.jobOutcomesQueue, {
durable: true,
arguments: {
'x-dead-letter-exchange': this.options.deadJobsExchange,
'x-dead-letter-routing-key': this.options.deadJobsQueue
}
});
this.queues[name].on('jobSuccess', function (job) {
_this5.emit('jobSuccess', job);
});
this.queues[name].on('jobFailed', function (job, err) {
_this5.emit('jobFailed', job, err);
});
this.queues[name].on('jobComplete', function () {
_this5.emit('jobComplete');
});
}
}, {
key: "start",
value: function () {
var _start = _asyncToGenerator(
/*#__PURE__*/
regeneratorRuntime.mark(function _callee7() {
var name;
return regeneratorRuntime.wrap(function _callee7$(_context7) {
while (1) {
switch (_context7.prev = _context7.next) {
case 0:
_context7.next = 2;
return this.init();
case 2:
_context7.prev = 2;
_context7.next = 5;
return this.assertEnvironment();
await ch.assertQueue(this.options.deadJobsQueue, {
durable: true
});
case 5:
_context7.next = 14;
break;
if (this.options.writeOutcomes) {
await ch.bindQueue(this.options.jobOutcomesQueue, this.options.outcomesExchange, this.options.outcomeRoutingKeyPrefix + '.#');
}
case 7:
_context7.prev = 7;
_context7.t0 = _context7["catch"](2);
_context7.next = 11;
return this.controlChannel.close();
case 11:
_context7.next = 13;
return this.connection.close();
case 13:
throw _context7.t0;
case 14:
_context7.prev = 14;
_context7.t1 = regeneratorRuntime.keys(this.queues);
case 16:
if ((_context7.t2 = _context7.t1()).done) {
_context7.next = 24;
break;
}
name = _context7.t2.value;
if (this.queues.hasOwnProperty(name)) {
_context7.next = 20;
break;
}
return _context7.abrupt("continue", 16);
case 20:
_context7.next = 22;
return this.queues[name].start();
case 22:
_context7.next = 16;
break;
case 24:
_context7.next = 31;
break;
case 26:
_context7.prev = 26;
_context7.t3 = _context7["catch"](14);
_context7.next = 30;
return this.connection.close();
case 30:
throw _context7.t3;
case 31:
case "end":
return _context7.stop();
}
}
}, _callee7, this, [[2, 7], [14, 26]]);
}));
function start() {
return _start.apply(this, arguments);
for (let name in this.queues) {
if (!this.queues.hasOwnProperty(name)) {
continue;
}
return start;
}()
}, {
key: "assertQueue",
value: function () {
var _assertQueue = _asyncToGenerator(
/*#__PURE__*/
regeneratorRuntime.mark(function _callee8(queue) {
return regeneratorRuntime.wrap(function _callee8$(_context8) {
while (1) {
switch (_context8.prev = _context8.next) {
case 0:
_context8.next = 2;
return this.init();
await ch.assertQueue(name, {
durable: true,
arguments: {
'x-dead-letter-exchange': this.options.deadJobsExchange,
'x-dead-letter-routing-key': this.options.deadJobsQueue
}
});
await ch.bindQueue(name, this.options.jobsExchange, name);
}
case 2:
return _context8.abrupt("return", this.controlChannel.assertQueue(queue, {
durable: true,
arguments: {
'x-dead-letter-exchange': this.options.deadJobsExchange,
'x-dead-letter-routing-key': this.options.deadJobsQueue
}
}));
await ch.close();
}
case 3:
case "end":
return _context8.stop();
}
}
}, _callee8, this);
}));
publishConfirm(exchange, routingKey, content, options = {}) {
return new _bluebird.default((resolve, reject) => {
this.controlChannel.publish(exchange, routingKey, content, options, err => {
if (err) {
return reject(err);
}
function assertQueue(_x7) {
return _assertQueue.apply(this, arguments);
}
return assertQueue;
}()
}, {
key: "assertEnvironment",
value: function () {
var _assertEnvironment = _asyncToGenerator(
/*#__PURE__*/
regeneratorRuntime.mark(function _callee9() {
var ch, name;
return regeneratorRuntime.wrap(function _callee9$(_context9) {
while (1) {
switch (_context9.prev = _context9.next) {
case 0:
_context9.next = 2;
return this.connection.createChannel();
case 2:
ch = _context9.sent;
// error events from this channel will result in promises being rejected
ch.on('error', function (err) {});
_context9.next = 6;
return ch.assertExchange(this.options.jobsExchange, 'topic', {
durable: true
});
case 6:
if (!this.options.writeOutcomes) {
_context9.next = 9;
break;
}
_context9.next = 9;
return ch.assertExchange(this.options.outcomesExchange, 'topic', {
durable: true
});
case 9:
_context9.next = 11;
return ch.assertExchange(this.options.deadJobsExchange, 'direct', {
durable: true,
internal: true
});
case 11:
if (!this.options.writeOutcomes) {
_context9.next = 14;
break;
}
_context9.next = 14;
return ch.assertQueue(this.options.jobOutcomesQueue, {
durable: true,
arguments: {
'x-dead-letter-exchange': this.options.deadJobsExchange,
'x-dead-letter-routing-key': this.options.deadJobsQueue
}
});
case 14:
_context9.next = 16;
return ch.assertQueue(this.options.deadJobsQueue, {
durable: true
});
case 16:
if (!this.options.writeOutcomes) {
_context9.next = 19;
break;
}
_context9.next = 19;
return ch.bindQueue(this.options.jobOutcomesQueue, this.options.outcomesExchange, this.options.outcomeRoutingKeyPrefix + '.#');
case 19:
_context9.t0 = regeneratorRuntime.keys(this.queues);
case 20:
if ((_context9.t1 = _context9.t0()).done) {
_context9.next = 30;
break;
}
name = _context9.t1.value;
if (this.queues.hasOwnProperty(name)) {
_context9.next = 24;
break;
}
return _context9.abrupt("continue", 20);
case 24:
_context9.next = 26;
return ch.assertQueue(name, {
durable: true,
arguments: {
'x-dead-letter-exchange': this.options.deadJobsExchange,
'x-dead-letter-routing-key': this.options.deadJobsQueue
}
});
case 26:
_context9.next = 28;
return ch.bindQueue(name, this.options.jobsExchange, name);
case 28:
_context9.next = 20;
break;
case 30:
_context9.next = 32;
return ch.close();
case 32:
case "end":
return _context9.stop();
}
}
}, _callee9, this);
}));
function assertEnvironment() {
return _assertEnvironment.apply(this, arguments);
}
return assertEnvironment;
}()
}, {
key: "publishConfirm",
value: function publishConfirm(exchange, routingKey, content) {
var _this6 = this;
var options = arguments.length > 3 && arguments[3] !== undefined ? arguments[3] : {};
return new _bluebird["default"](function (resolve, reject) {
_this6.controlChannel.publish(exchange, routingKey, content, options, function (err) {
if (err) {
return reject(err);
}
return resolve();
});
return resolve();
});
}
}]);
});
}
return Heretic;
}(_events.EventEmitter);
}
exports["default"] = Heretic;
exports.default = Heretic;

@@ -6,3 +6,3 @@ "use strict";

});
exports["default"] = void 0;
exports.default = void 0;

@@ -17,499 +17,162 @@ var _events = require("events");

function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { "default": obj }; }
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }
function _typeof(obj) { if (typeof Symbol === "function" && typeof Symbol.iterator === "symbol") { _typeof = function _typeof(obj) { return typeof obj; }; } else { _typeof = function _typeof(obj) { return obj && typeof Symbol === "function" && obj.constructor === Symbol && obj !== Symbol.prototype ? "symbol" : typeof obj; }; } return _typeof(obj); }
class Queue extends _events.EventEmitter {
constructor(heretic, name, concurrency, handler) {
super();
this.heretic = heretic;
this.knex = this.heretic.knex;
this.tableName = this.heretic.options.tableName;
this.name = name;
this.concurrency = concurrency;
this.handler = handler;
this.channel = null;
this.consumerTag = null;
}
function asyncGeneratorStep(gen, resolve, reject, _next, _throw, key, arg) { try { var info = gen[key](arg); var value = info.value; } catch (error) { reject(error); return; } if (info.done) { resolve(value); } else { Promise.resolve(value).then(_next, _throw); } }
async start() {
await this.openChannel();
let {
consumerTag
} = await this.channel.consume(this.name, this.receiveJob.bind(this), {
noAck: false
});
this.consumerTag = consumerTag;
}
function _asyncToGenerator(fn) { return function () { var self = this, args = arguments; return new Promise(function (resolve, reject) { var gen = fn.apply(self, args); function _next(value) { asyncGeneratorStep(gen, resolve, reject, _next, _throw, "next", value); } function _throw(err) { asyncGeneratorStep(gen, resolve, reject, _next, _throw, "throw", err); } _next(undefined); }); }; }
async pause() {
if (this.channel) {
await this.channel.cancel(this.consumerTag);
}
}
function _classCallCheck(instance, Constructor) { if (!(instance instanceof Constructor)) { throw new TypeError("Cannot call a class as a function"); } }
async openChannel() {
if (this.channel) {
return;
}
function _defineProperties(target, props) { for (var i = 0; i < props.length; i++) { var descriptor = props[i]; descriptor.enumerable = descriptor.enumerable || false; descriptor.configurable = true; if ("value" in descriptor) descriptor.writable = true; Object.defineProperty(target, descriptor.key, descriptor); } }
this.channel = await this.heretic.connection.createChannel();
this.channel.prefetch(this.concurrency);
}
function _createClass(Constructor, protoProps, staticProps) { if (protoProps) _defineProperties(Constructor.prototype, protoProps); if (staticProps) _defineProperties(Constructor, staticProps); return Constructor; }
async closeChannel() {
await this.ch.close();
}
function _possibleConstructorReturn(self, call) { if (call && (_typeof(call) === "object" || typeof call === "function")) { return call; } return _assertThisInitialized(self); }
async fetchJob(id) {
let job = await this.knex(this.tableName).select('*').first().where('id', id);
function _assertThisInitialized(self) { if (self === void 0) { throw new ReferenceError("this hasn't been initialised - super() hasn't been called"); } return self; }
if (!job) {
throw new Error('Job not found');
}
function _getPrototypeOf(o) { _getPrototypeOf = Object.setPrototypeOf ? Object.getPrototypeOf : function _getPrototypeOf(o) { return o.__proto__ || Object.getPrototypeOf(o); }; return _getPrototypeOf(o); }
function _inherits(subClass, superClass) { if (typeof superClass !== "function" && superClass !== null) { throw new TypeError("Super expression must either be null or a function"); } subClass.prototype = Object.create(superClass && superClass.prototype, { constructor: { value: subClass, writable: true, configurable: true } }); if (superClass) _setPrototypeOf(subClass, superClass); }
function _setPrototypeOf(o, p) { _setPrototypeOf = Object.setPrototypeOf || function _setPrototypeOf(o, p) { o.__proto__ = p; return o; }; return _setPrototypeOf(o, p); }
var Queue =
/*#__PURE__*/
function (_EventEmitter) {
_inherits(Queue, _EventEmitter);
function Queue(heretic, name, concurrency, handler) {
var _this;
_classCallCheck(this, Queue);
_this = _possibleConstructorReturn(this, _getPrototypeOf(Queue).call(this));
_this.heretic = heretic;
_this.knex = _this.heretic.knex;
_this.tableName = _this.heretic.options.tableName;
_this.name = name;
_this.concurrency = concurrency;
_this.handler = handler;
_this.channel = null;
_this.consumerTag = null;
return _this;
return job;
}
_createClass(Queue, [{
key: "start",
value: function () {
var _start = _asyncToGenerator(
/*#__PURE__*/
regeneratorRuntime.mark(function _callee() {
var _ref, consumerTag;
async receiveJob(message) {
let body;
return regeneratorRuntime.wrap(function _callee$(_context) {
while (1) {
switch (_context.prev = _context.next) {
case 0:
_context.next = 2;
return this.openChannel();
try {
body = JSON.parse(message.content.toString('utf8'));
case 2:
_context.next = 4;
return this.channel.consume(this.name, this.receiveJob.bind(this), {
noAck: false
});
case 4:
_ref = _context.sent;
consumerTag = _ref.consumerTag;
this.consumerTag = consumerTag;
case 7:
case "end":
return _context.stop();
}
}
}, _callee, this);
}));
function start() {
return _start.apply(this, arguments);
if (!body.id) {
throw new Error('Decoded message did not contain a job id');
}
} catch (e) {
// we won't ever be able to handle this message properly, so
this.channel.nack(message, false, false);
this.emit('error', new Error('Unable to decode message content'));
return;
}
return start;
}()
}, {
key: "pause",
value: function () {
var _pause = _asyncToGenerator(
/*#__PURE__*/
regeneratorRuntime.mark(function _callee2() {
return regeneratorRuntime.wrap(function _callee2$(_context2) {
while (1) {
switch (_context2.prev = _context2.next) {
case 0:
if (!this.channel) {
_context2.next = 3;
break;
}
let job;
_context2.next = 3;
return this.channel.cancel(this.consumerTag);
try {
job = await this.fetchJob(body.id);
} catch (err) {
this.channel.nack(message, false, false);
this.emit('error', err);
return;
}
case 3:
case "end":
return _context2.stop();
}
}
}, _callee2, this);
}));
let d = _domain.default.create();
function pause() {
return _pause.apply(this, arguments);
}
return new _bluebird.default((resolve, reject) => {
d.on('error', reject);
d.run(this.handler, job, message, err => {
if (err) {
return reject(err);
}
return pause;
}()
}, {
key: "openChannel",
value: function () {
var _openChannel = _asyncToGenerator(
/*#__PURE__*/
regeneratorRuntime.mark(function _callee3() {
return regeneratorRuntime.wrap(function _callee3$(_context3) {
while (1) {
switch (_context3.prev = _context3.next) {
case 0:
if (!this.channel) {
_context3.next = 2;
break;
}
return resolve();
});
}).then(async result => {
let savedJob = await this.jobSuccess(job.id);
this.emit('jobSuccess', savedJob);
return _context3.abrupt("return");
case 2:
_context3.next = 4;
return this.heretic.connection.createChannel();
case 4:
this.channel = _context3.sent;
this.channel.prefetch(this.concurrency);
case 6:
case "end":
return _context3.stop();
}
}
}, _callee3, this);
}));
function openChannel() {
return _openChannel.apply(this, arguments);
if (this.heretic.options.writeOutcomes) {
await this.publishConfirm(this.heretic.options.outcomesExchange, `${this.heretic.options.outcomeRoutingKeyPrefix}.success`, message.content);
}
return openChannel;
}()
}, {
key: "closeChannel",
value: function () {
var _closeChannel = _asyncToGenerator(
/*#__PURE__*/
regeneratorRuntime.mark(function _callee4() {
return regeneratorRuntime.wrap(function _callee4$(_context4) {
while (1) {
switch (_context4.prev = _context4.next) {
case 0:
_context4.next = 2;
return this.ch.close();
this.channel.ack(message, false);
}).catch(async err => {
let savedJob = await this.jobFailed(job.id, err.stack);
this.emit('jobFailed', savedJob, err);
case 2:
case "end":
return _context4.stop();
}
}
}, _callee4, this);
}));
function closeChannel() {
return _closeChannel.apply(this, arguments);
if (this.heretic.options.writeOutcomes) {
await this.publishConfirm(this.heretic.options.outcomesExchange, `${this.heretic.options.outcomeRoutingKeyPrefix}.failed`, message.content);
}
return closeChannel;
}()
}, {
key: "fetchJob",
value: function () {
var _fetchJob = _asyncToGenerator(
/*#__PURE__*/
regeneratorRuntime.mark(function _callee5(id) {
var job;
return regeneratorRuntime.wrap(function _callee5$(_context5) {
while (1) {
switch (_context5.prev = _context5.next) {
case 0:
_context5.next = 2;
return this.knex(this.tableName).select('*').first().where('id', id);
this.channel.ack(message, false);
}).finally(() => {
_bluebird.default.delay(1).then(() => this.emit('jobComplete'));
});
}
case 2:
job = _context5.sent;
async jobFailed(jobId, message) {
let result = await this.knex(this.tableName).where({
id: jobId
}).update({
status: 'failed',
attempt_logs: this.knex.raw('attempt_logs || ?::jsonb', JSON.stringify({
time: new Date(),
status: 'failed',
message: message,
hostname: _os.default.hostname()
})),
last_attempted_at: new Date()
}).returning('*');
return result[0];
}
if (job) {
_context5.next = 5;
break;
}
async jobSuccess(jobId) {
let result = await this.knex(this.tableName).where({
id: jobId
}).update({
status: 'success',
attempt_logs: this.knex.raw('attempt_logs || ?::jsonb', JSON.stringify({
time: new Date(),
status: 'success',
message: 'success',
hostname: _os.default.hostname()
})),
last_attempted_at: new Date()
}).returning('*');
return result[0];
}
throw new Error('Job not found');
publishConfirm(exchange, routingKey, content, options = {}) {
return new _bluebird.default((resolve, reject) => {
this.heretic.controlChannel.publish(exchange, routingKey, content, options, err => {
if (err) {
return reject(err);
}
case 5:
return _context5.abrupt("return", job);
case 6:
case "end":
return _context5.stop();
}
}
}, _callee5, this);
}));
function fetchJob(_x) {
return _fetchJob.apply(this, arguments);
}
return fetchJob;
}()
}, {
key: "receiveJob",
value: function () {
var _receiveJob = _asyncToGenerator(
/*#__PURE__*/
regeneratorRuntime.mark(function _callee8(message) {
var _this2 = this;
var body, job, d;
return regeneratorRuntime.wrap(function _callee8$(_context8) {
while (1) {
switch (_context8.prev = _context8.next) {
case 0:
_context8.prev = 0;
body = JSON.parse(message.content.toString('utf8'));
if (body.id) {
_context8.next = 4;
break;
}
throw new Error('Decoded message did not contain a job id');
case 4:
_context8.next = 11;
break;
case 6:
_context8.prev = 6;
_context8.t0 = _context8["catch"](0);
// we won't ever be able to handle this message properly, so
this.channel.nack(message, false, false);
this.emit('error', new Error('Unable to decode message content'));
return _context8.abrupt("return");
case 11:
_context8.prev = 11;
_context8.next = 14;
return this.fetchJob(body.id);
case 14:
job = _context8.sent;
_context8.next = 22;
break;
case 17:
_context8.prev = 17;
_context8.t1 = _context8["catch"](11);
this.channel.nack(message, false, false);
this.emit('error', _context8.t1);
return _context8.abrupt("return");
case 22:
d = _domain["default"].create();
return _context8.abrupt("return", new _bluebird["default"](function (resolve, reject) {
d.on('error', reject);
d.run(_this2.handler, job, message, function (err) {
if (err) {
return reject(err);
}
return resolve();
});
}).then(
/*#__PURE__*/
function () {
var _ref2 = _asyncToGenerator(
/*#__PURE__*/
regeneratorRuntime.mark(function _callee6(result) {
var savedJob;
return regeneratorRuntime.wrap(function _callee6$(_context6) {
while (1) {
switch (_context6.prev = _context6.next) {
case 0:
_context6.next = 2;
return _this2.jobSuccess(job.id);
case 2:
savedJob = _context6.sent;
_this2.emit('jobSuccess', savedJob);
if (!_this2.heretic.options.writeOutcomes) {
_context6.next = 7;
break;
}
_context6.next = 7;
return _this2.publishConfirm(_this2.heretic.options.outcomesExchange, "".concat(_this2.heretic.options.outcomeRoutingKeyPrefix, ".success"), message.content);
case 7:
_this2.channel.ack(message, false);
case 8:
case "end":
return _context6.stop();
}
}
}, _callee6);
}));
return function (_x3) {
return _ref2.apply(this, arguments);
};
}())["catch"](
/*#__PURE__*/
function () {
var _ref3 = _asyncToGenerator(
/*#__PURE__*/
regeneratorRuntime.mark(function _callee7(err) {
var savedJob;
return regeneratorRuntime.wrap(function _callee7$(_context7) {
while (1) {
switch (_context7.prev = _context7.next) {
case 0:
_context7.next = 2;
return _this2.jobFailed(job.id, err.stack);
case 2:
savedJob = _context7.sent;
_this2.emit('jobFailed', savedJob, err);
if (!_this2.heretic.options.writeOutcomes) {
_context7.next = 7;
break;
}
_context7.next = 7;
return _this2.publishConfirm(_this2.heretic.options.outcomesExchange, "".concat(_this2.heretic.options.outcomeRoutingKeyPrefix, ".failed"), message.content);
case 7:
_this2.channel.ack(message, false);
case 8:
case "end":
return _context7.stop();
}
}
}, _callee7);
}));
return function (_x4) {
return _ref3.apply(this, arguments);
};
}())["finally"](function () {
_bluebird["default"].delay(1).then(function () {
return _this2.emit('jobComplete');
});
}));
case 24:
case "end":
return _context8.stop();
}
}
}, _callee8, this, [[0, 6], [11, 17]]);
}));
function receiveJob(_x2) {
return _receiveJob.apply(this, arguments);
}
return receiveJob;
}()
}, {
key: "jobFailed",
value: function () {
var _jobFailed = _asyncToGenerator(
/*#__PURE__*/
regeneratorRuntime.mark(function _callee9(jobId, message) {
var result;
return regeneratorRuntime.wrap(function _callee9$(_context9) {
while (1) {
switch (_context9.prev = _context9.next) {
case 0:
_context9.next = 2;
return this.knex(this.tableName).where({
id: jobId
}).update({
status: 'failed',
attempt_logs: this.knex.raw('attempt_logs || ?::jsonb', JSON.stringify({
time: new Date(),
status: 'failed',
message: message,
hostname: _os["default"].hostname()
})),
last_attempted_at: new Date()
}).returning('*');
case 2:
result = _context9.sent;
return _context9.abrupt("return", result[0]);
case 4:
case "end":
return _context9.stop();
}
}
}, _callee9, this);
}));
function jobFailed(_x5, _x6) {
return _jobFailed.apply(this, arguments);
}
return jobFailed;
}()
}, {
key: "jobSuccess",
value: function () {
var _jobSuccess = _asyncToGenerator(
/*#__PURE__*/
regeneratorRuntime.mark(function _callee10(jobId) {
var result;
return regeneratorRuntime.wrap(function _callee10$(_context10) {
while (1) {
switch (_context10.prev = _context10.next) {
case 0:
_context10.next = 2;
return this.knex(this.tableName).where({
id: jobId
}).update({
status: 'success',
attempt_logs: this.knex.raw('attempt_logs || ?::jsonb', JSON.stringify({
time: new Date(),
status: 'success',
message: 'success',
hostname: _os["default"].hostname()
})),
last_attempted_at: new Date()
}).returning('*');
case 2:
result = _context10.sent;
return _context10.abrupt("return", result[0]);
case 4:
case "end":
return _context10.stop();
}
}
}, _callee10, this);
}));
function jobSuccess(_x7) {
return _jobSuccess.apply(this, arguments);
}
return jobSuccess;
}()
}, {
key: "publishConfirm",
value: function publishConfirm(exchange, routingKey, content) {
var _this3 = this;
var options = arguments.length > 3 && arguments[3] !== undefined ? arguments[3] : {};
return new _bluebird["default"](function (resolve, reject) {
_this3.heretic.controlChannel.publish(exchange, routingKey, content, options, function (err) {
if (err) {
return reject(err);
}
return resolve();
});
return resolve();
});
}
}]);
});
}
return Queue;
}(_events.EventEmitter);
}
exports["default"] = Queue;
exports.default = Queue;
{
"name": "@smartrent/heretic",
"version": "0.5.1",
"version": "0.5.2",
"main": "dist/Heretic.js",

@@ -5,0 +5,0 @@ "dependencies": {

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc