@smartrent/heretic
Advanced tools
Comparing version 0.5.1 to 0.5.2
@@ -6,3 +6,3 @@ "use strict"; | ||
}); | ||
exports["default"] = void 0; | ||
exports.default = void 0; | ||
@@ -19,59 +19,27 @@ var _bluebird = _interopRequireDefault(require("bluebird")); | ||
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { "default": obj }; } | ||
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; } | ||
function _typeof(obj) { if (typeof Symbol === "function" && typeof Symbol.iterator === "symbol") { _typeof = function _typeof(obj) { return typeof obj; }; } else { _typeof = function _typeof(obj) { return obj && typeof Symbol === "function" && obj.constructor === Symbol && obj !== Symbol.prototype ? "symbol" : typeof obj; }; } return _typeof(obj); } | ||
function asyncGeneratorStep(gen, resolve, reject, _next, _throw, key, arg) { try { var info = gen[key](arg); var value = info.value; } catch (error) { reject(error); return; } if (info.done) { resolve(value); } else { Promise.resolve(value).then(_next, _throw); } } | ||
function _asyncToGenerator(fn) { return function () { var self = this, args = arguments; return new Promise(function (resolve, reject) { var gen = fn.apply(self, args); function _next(value) { asyncGeneratorStep(gen, resolve, reject, _next, _throw, "next", value); } function _throw(err) { asyncGeneratorStep(gen, resolve, reject, _next, _throw, "throw", err); } _next(undefined); }); }; } | ||
function _classCallCheck(instance, Constructor) { if (!(instance instanceof Constructor)) { throw new TypeError("Cannot call a class as a function"); } } | ||
function _defineProperties(target, props) { for (var i = 0; i < props.length; i++) { var descriptor = props[i]; descriptor.enumerable = descriptor.enumerable || false; descriptor.configurable = true; if ("value" in descriptor) descriptor.writable = true; Object.defineProperty(target, descriptor.key, descriptor); } } | ||
function _createClass(Constructor, protoProps, staticProps) { if (protoProps) _defineProperties(Constructor.prototype, protoProps); if (staticProps) _defineProperties(Constructor, staticProps); return Constructor; } | ||
function _possibleConstructorReturn(self, call) { if (call && (_typeof(call) === "object" || typeof call === "function")) { return call; } return _assertThisInitialized(self); } | ||
function _assertThisInitialized(self) { if (self === void 0) { throw new ReferenceError("this hasn't been initialised - super() hasn't been called"); } return self; } | ||
function _getPrototypeOf(o) { _getPrototypeOf = Object.setPrototypeOf ? Object.getPrototypeOf : function _getPrototypeOf(o) { return o.__proto__ || Object.getPrototypeOf(o); }; return _getPrototypeOf(o); } | ||
function _inherits(subClass, superClass) { if (typeof superClass !== "function" && superClass !== null) { throw new TypeError("Super expression must either be null or a function"); } subClass.prototype = Object.create(superClass && superClass.prototype, { constructor: { value: subClass, writable: true, configurable: true } }); if (superClass) _setPrototypeOf(subClass, superClass); } | ||
function _setPrototypeOf(o, p) { _setPrototypeOf = Object.setPrototypeOf || function _setPrototypeOf(o, p) { o.__proto__ = p; return o; }; return _setPrototypeOf(o, p); } | ||
var optionsSchema = _joi["default"].object().keys({ | ||
tableName: _joi["default"].string()["default"]('heretic_jobs'), | ||
jobsExchange: _joi["default"].string()["default"]('heretic.jobs'), | ||
outcomesExchange: _joi["default"].string()["default"]('heretic.outcomes'), | ||
deadJobsExchange: _joi["default"].string()["default"]('heretic.dead'), | ||
jobOutcomesQueue: _joi["default"].string()["default"]('heretic.jobs.outcomes'), | ||
deadJobsQueue: _joi["default"].string()["default"]('heretic.jobs.dead'), | ||
outcomeRoutingKeyPrefix: _joi["default"].string()["default"]('job-outcome'), | ||
socketOptions: _joi["default"].object().optional().keys({ | ||
clientProperties: _joi["default"].object().optional().unknown().keys({ | ||
Application: _joi["default"].string().optional() | ||
const optionsSchema = _joi.default.object().keys({ | ||
tableName: _joi.default.string().default('heretic_jobs'), | ||
jobsExchange: _joi.default.string().default('heretic.jobs'), | ||
outcomesExchange: _joi.default.string().default('heretic.outcomes'), | ||
deadJobsExchange: _joi.default.string().default('heretic.dead'), | ||
jobOutcomesQueue: _joi.default.string().default('heretic.jobs.outcomes'), | ||
deadJobsQueue: _joi.default.string().default('heretic.jobs.dead'), | ||
outcomeRoutingKeyPrefix: _joi.default.string().default('job-outcome'), | ||
socketOptions: _joi.default.object().optional().keys({ | ||
clientProperties: _joi.default.object().optional().unknown().keys({ | ||
Application: _joi.default.string().optional() | ||
}) | ||
}), | ||
writeOutcomes: _joi["default"]["boolean"]()["default"]('true') | ||
writeOutcomes: _joi.default.boolean().default('true') | ||
}); | ||
var Heretic = | ||
/*#__PURE__*/ | ||
function (_EventEmitter) { | ||
_inherits(Heretic, _EventEmitter); | ||
class Heretic extends _events.EventEmitter { | ||
constructor(amqpUrl, knex, options = {}) { | ||
super(); | ||
function Heretic(amqpUrl, knex) { | ||
var _this; | ||
_joi.default.assert(amqpUrl, _joi.default.string().uri().label('amqpUrl')); | ||
var options = arguments.length > 2 && arguments[2] !== undefined ? arguments[2] : {}; | ||
_classCallCheck(this, Heretic); | ||
_this = _possibleConstructorReturn(this, _getPrototypeOf(Heretic).call(this)); | ||
_joi["default"].assert(amqpUrl, _joi["default"].string().uri().label('amqpUrl')); | ||
_joi["default"].validate(options, optionsSchema, function (err, value) { | ||
_joi.default.validate(options, optionsSchema, (err, value) => { | ||
if (err) { | ||
@@ -88,549 +56,198 @@ throw err; | ||
_this.amqpUrl = amqpUrl; | ||
_this.knex = knex; | ||
_this.options = options; | ||
_this.queues = {}; | ||
_this.connection = null; | ||
_this.controlChannel = null; | ||
return _this; | ||
this.amqpUrl = amqpUrl; | ||
this.knex = knex; | ||
this.options = options; | ||
this.queues = {}; | ||
this.connection = null; | ||
this.controlChannel = null; | ||
} | ||
_createClass(Heretic, [{ | ||
key: "init", | ||
value: function () { | ||
var _init = _asyncToGenerator( | ||
/*#__PURE__*/ | ||
regeneratorRuntime.mark(function _callee() { | ||
var _this2 = this; | ||
async init() { | ||
if (this.connection && this.controlChannel) { | ||
return _bluebird.default.resolve(); | ||
} | ||
return regeneratorRuntime.wrap(function _callee$(_context) { | ||
while (1) { | ||
switch (_context.prev = _context.next) { | ||
case 0: | ||
if (!(this.connection && this.controlChannel)) { | ||
_context.next = 2; | ||
break; | ||
} | ||
this.connection = await _amqplib.default.connect(this.amqpUrl, this.socketOptions); | ||
this.controlChannel = await this.connection.createConfirmChannel(); | ||
this.controlChannel.on('error', err => { | ||
this.emit('error', err); | ||
}); | ||
this.connection.on('error', err => { | ||
this.emit('error', err); | ||
}); | ||
} | ||
return _context.abrupt("return", _bluebird["default"].resolve()); | ||
async enqueue(queueName, payload, options = {}) { | ||
await this.init(); | ||
return await this.knex.transaction(async trx => { | ||
let row = { | ||
queue_name: queueName, | ||
payload: payload | ||
}; | ||
case 2: | ||
_context.next = 4; | ||
return _amqplib["default"].connect(this.amqpUrl, this.socketOptions); | ||
if (options.maxAttempts) { | ||
row.max_attempts = options.maxAttempts; | ||
} | ||
case 4: | ||
this.connection = _context.sent; | ||
_context.next = 7; | ||
return this.connection.createConfirmChannel(); | ||
let inserts = await trx(this.options.tableName).insert(row).returning('*'); | ||
let job = inserts[0]; | ||
return job; | ||
}).tap(async job => { | ||
await this.publishConfirm(this.options.jobsExchange, queueName, new Buffer(JSON.stringify({ | ||
id: job.id | ||
}))); | ||
}); | ||
} | ||
case 7: | ||
this.controlChannel = _context.sent; | ||
this.controlChannel.on('error', function (err) { | ||
_this2.emit('error', err); | ||
}); | ||
this.connection.on('error', function (err) { | ||
_this2.emit('error', err); | ||
}); | ||
async retry(id) { | ||
await this.init(); | ||
return await this.knex.transaction(async trx => { | ||
let result = await this.knex(this.options.tableName).update({ | ||
status: 'pending' | ||
}).where({ | ||
id | ||
}).returning('*'); | ||
let job = result[0]; | ||
case 10: | ||
case "end": | ||
return _context.stop(); | ||
} | ||
} | ||
}, _callee, this); | ||
})); | ||
function init() { | ||
return _init.apply(this, arguments); | ||
if (!job) { | ||
throw new Error('Job not found'); | ||
} | ||
return init; | ||
}() | ||
}, { | ||
key: "enqueue", | ||
value: function () { | ||
var _enqueue = _asyncToGenerator( | ||
/*#__PURE__*/ | ||
regeneratorRuntime.mark(function _callee4(queueName, payload) { | ||
var _this3 = this; | ||
await this.publishConfirm(this.options.jobsExchange, job.queue_name, new Buffer(JSON.stringify({ | ||
id: job.id | ||
}))); | ||
}); | ||
} | ||
var options, | ||
_args4 = arguments; | ||
return regeneratorRuntime.wrap(function _callee4$(_context4) { | ||
while (1) { | ||
switch (_context4.prev = _context4.next) { | ||
case 0: | ||
options = _args4.length > 2 && _args4[2] !== undefined ? _args4[2] : {}; | ||
_context4.next = 3; | ||
return this.init(); | ||
process(name, concurrency, handler) { | ||
_joi.default.assert(name, _joi.default.string().label('name')); | ||
case 3: | ||
_context4.next = 5; | ||
return this.knex.transaction( | ||
/*#__PURE__*/ | ||
function () { | ||
var _ref = _asyncToGenerator( | ||
/*#__PURE__*/ | ||
regeneratorRuntime.mark(function _callee2(trx) { | ||
var row, inserts, job; | ||
return regeneratorRuntime.wrap(function _callee2$(_context2) { | ||
while (1) { | ||
switch (_context2.prev = _context2.next) { | ||
case 0: | ||
row = { | ||
queue_name: queueName, | ||
payload: payload | ||
}; | ||
_joi.default.assert(concurrency, _joi.default.number().min(1).label('concurrency')); | ||
if (options.maxAttempts) { | ||
row.max_attempts = options.maxAttempts; | ||
} | ||
_joi.default.assert(handler, _joi.default.func().label('handler')); | ||
_context2.next = 4; | ||
return trx(_this3.options.tableName).insert(row).returning('*'); | ||
if (this.queues[name]) { | ||
throw new Error(`Queue "${name}" already registered`); | ||
} | ||
case 4: | ||
inserts = _context2.sent; | ||
job = inserts[0]; | ||
return _context2.abrupt("return", job); | ||
this.queues[name] = new _Queue.default(this, name, concurrency, handler); | ||
this.queues[name].on('error', err => { | ||
this.emit('jobError', err); | ||
}); | ||
this.queues[name].on('jobSuccess', job => { | ||
this.emit('jobSuccess', job); | ||
}); | ||
this.queues[name].on('jobFailed', (job, err) => { | ||
this.emit('jobFailed', job, err); | ||
}); | ||
this.queues[name].on('jobComplete', () => { | ||
this.emit('jobComplete'); | ||
}); | ||
} | ||
case 7: | ||
case "end": | ||
return _context2.stop(); | ||
} | ||
} | ||
}, _callee2); | ||
})); | ||
async start() { | ||
await this.init(); | ||
return function (_x3) { | ||
return _ref.apply(this, arguments); | ||
}; | ||
}()).tap( | ||
/*#__PURE__*/ | ||
function () { | ||
var _ref2 = _asyncToGenerator( | ||
/*#__PURE__*/ | ||
regeneratorRuntime.mark(function _callee3(job) { | ||
return regeneratorRuntime.wrap(function _callee3$(_context3) { | ||
while (1) { | ||
switch (_context3.prev = _context3.next) { | ||
case 0: | ||
_context3.next = 2; | ||
return _this3.publishConfirm(_this3.options.jobsExchange, queueName, new Buffer(JSON.stringify({ | ||
id: job.id | ||
}))); | ||
try { | ||
await this.assertEnvironment(); | ||
} catch (err) { | ||
await this.controlChannel.close(); | ||
await this.connection.close(); | ||
throw err; | ||
} | ||
case 2: | ||
case "end": | ||
return _context3.stop(); | ||
} | ||
} | ||
}, _callee3); | ||
})); | ||
try { | ||
for (let name in this.queues) { | ||
if (!this.queues.hasOwnProperty(name)) { | ||
continue; | ||
} | ||
return function (_x4) { | ||
return _ref2.apply(this, arguments); | ||
}; | ||
}()); | ||
case 5: | ||
return _context4.abrupt("return", _context4.sent); | ||
case 6: | ||
case "end": | ||
return _context4.stop(); | ||
} | ||
} | ||
}, _callee4, this); | ||
})); | ||
function enqueue(_x, _x2) { | ||
return _enqueue.apply(this, arguments); | ||
await this.queues[name].start(); | ||
} | ||
} catch (err) { | ||
await this.connection.close(); | ||
throw err; | ||
} | ||
} | ||
return enqueue; | ||
}() | ||
}, { | ||
key: "retry", | ||
value: function () { | ||
var _retry = _asyncToGenerator( | ||
/*#__PURE__*/ | ||
regeneratorRuntime.mark(function _callee6(id) { | ||
var _this4 = this; | ||
return regeneratorRuntime.wrap(function _callee6$(_context6) { | ||
while (1) { | ||
switch (_context6.prev = _context6.next) { | ||
case 0: | ||
_context6.next = 2; | ||
return this.init(); | ||
case 2: | ||
_context6.next = 4; | ||
return this.knex.transaction( | ||
/*#__PURE__*/ | ||
function () { | ||
var _ref3 = _asyncToGenerator( | ||
/*#__PURE__*/ | ||
regeneratorRuntime.mark(function _callee5(trx) { | ||
var result, job; | ||
return regeneratorRuntime.wrap(function _callee5$(_context5) { | ||
while (1) { | ||
switch (_context5.prev = _context5.next) { | ||
case 0: | ||
_context5.next = 2; | ||
return _this4.knex(_this4.options.tableName).update({ | ||
status: 'pending' | ||
}).where({ | ||
id: id | ||
}).returning('*'); | ||
case 2: | ||
result = _context5.sent; | ||
job = result[0]; | ||
if (job) { | ||
_context5.next = 6; | ||
break; | ||
} | ||
throw new Error('Job not found'); | ||
case 6: | ||
_context5.next = 8; | ||
return _this4.publishConfirm(_this4.options.jobsExchange, job.queue_name, new Buffer(JSON.stringify({ | ||
id: job.id | ||
}))); | ||
case 8: | ||
case "end": | ||
return _context5.stop(); | ||
} | ||
} | ||
}, _callee5); | ||
})); | ||
return function (_x6) { | ||
return _ref3.apply(this, arguments); | ||
}; | ||
}()); | ||
case 4: | ||
return _context6.abrupt("return", _context6.sent); | ||
case 5: | ||
case "end": | ||
return _context6.stop(); | ||
} | ||
} | ||
}, _callee6, this); | ||
})); | ||
function retry(_x5) { | ||
return _retry.apply(this, arguments); | ||
async assertQueue(queue) { | ||
await this.init(); | ||
return this.controlChannel.assertQueue(queue, { | ||
durable: true, | ||
arguments: { | ||
'x-dead-letter-exchange': this.options.deadJobsExchange, | ||
'x-dead-letter-routing-key': this.options.deadJobsQueue | ||
} | ||
}); | ||
} | ||
return retry; | ||
}() | ||
}, { | ||
key: "process", | ||
value: function process(name, concurrency, handler) { | ||
var _this5 = this; | ||
async assertEnvironment() { | ||
const ch = await this.connection.createChannel(); // error events from this channel will result in promises being rejected | ||
_joi["default"].assert(name, _joi["default"].string().label('name')); | ||
ch.on('error', err => {}); | ||
await ch.assertExchange(this.options.jobsExchange, 'topic', { | ||
durable: true | ||
}); | ||
_joi["default"].assert(concurrency, _joi["default"].number().min(1).label('concurrency')); | ||
if (this.options.writeOutcomes) { | ||
await ch.assertExchange(this.options.outcomesExchange, 'topic', { | ||
durable: true | ||
}); | ||
} | ||
_joi["default"].assert(handler, _joi["default"].func().label('handler')); | ||
await ch.assertExchange(this.options.deadJobsExchange, 'direct', { | ||
durable: true, | ||
internal: true | ||
}); | ||
if (this.queues[name]) { | ||
throw new Error("Queue \"".concat(name, "\" already registered")); | ||
} | ||
this.queues[name] = new _Queue["default"](this, name, concurrency, handler); | ||
this.queues[name].on('error', function (err) { | ||
_this5.emit('jobError', err); | ||
if (this.options.writeOutcomes) { | ||
await ch.assertQueue(this.options.jobOutcomesQueue, { | ||
durable: true, | ||
arguments: { | ||
'x-dead-letter-exchange': this.options.deadJobsExchange, | ||
'x-dead-letter-routing-key': this.options.deadJobsQueue | ||
} | ||
}); | ||
this.queues[name].on('jobSuccess', function (job) { | ||
_this5.emit('jobSuccess', job); | ||
}); | ||
this.queues[name].on('jobFailed', function (job, err) { | ||
_this5.emit('jobFailed', job, err); | ||
}); | ||
this.queues[name].on('jobComplete', function () { | ||
_this5.emit('jobComplete'); | ||
}); | ||
} | ||
}, { | ||
key: "start", | ||
value: function () { | ||
var _start = _asyncToGenerator( | ||
/*#__PURE__*/ | ||
regeneratorRuntime.mark(function _callee7() { | ||
var name; | ||
return regeneratorRuntime.wrap(function _callee7$(_context7) { | ||
while (1) { | ||
switch (_context7.prev = _context7.next) { | ||
case 0: | ||
_context7.next = 2; | ||
return this.init(); | ||
case 2: | ||
_context7.prev = 2; | ||
_context7.next = 5; | ||
return this.assertEnvironment(); | ||
await ch.assertQueue(this.options.deadJobsQueue, { | ||
durable: true | ||
}); | ||
case 5: | ||
_context7.next = 14; | ||
break; | ||
if (this.options.writeOutcomes) { | ||
await ch.bindQueue(this.options.jobOutcomesQueue, this.options.outcomesExchange, this.options.outcomeRoutingKeyPrefix + '.#'); | ||
} | ||
case 7: | ||
_context7.prev = 7; | ||
_context7.t0 = _context7["catch"](2); | ||
_context7.next = 11; | ||
return this.controlChannel.close(); | ||
case 11: | ||
_context7.next = 13; | ||
return this.connection.close(); | ||
case 13: | ||
throw _context7.t0; | ||
case 14: | ||
_context7.prev = 14; | ||
_context7.t1 = regeneratorRuntime.keys(this.queues); | ||
case 16: | ||
if ((_context7.t2 = _context7.t1()).done) { | ||
_context7.next = 24; | ||
break; | ||
} | ||
name = _context7.t2.value; | ||
if (this.queues.hasOwnProperty(name)) { | ||
_context7.next = 20; | ||
break; | ||
} | ||
return _context7.abrupt("continue", 16); | ||
case 20: | ||
_context7.next = 22; | ||
return this.queues[name].start(); | ||
case 22: | ||
_context7.next = 16; | ||
break; | ||
case 24: | ||
_context7.next = 31; | ||
break; | ||
case 26: | ||
_context7.prev = 26; | ||
_context7.t3 = _context7["catch"](14); | ||
_context7.next = 30; | ||
return this.connection.close(); | ||
case 30: | ||
throw _context7.t3; | ||
case 31: | ||
case "end": | ||
return _context7.stop(); | ||
} | ||
} | ||
}, _callee7, this, [[2, 7], [14, 26]]); | ||
})); | ||
function start() { | ||
return _start.apply(this, arguments); | ||
for (let name in this.queues) { | ||
if (!this.queues.hasOwnProperty(name)) { | ||
continue; | ||
} | ||
return start; | ||
}() | ||
}, { | ||
key: "assertQueue", | ||
value: function () { | ||
var _assertQueue = _asyncToGenerator( | ||
/*#__PURE__*/ | ||
regeneratorRuntime.mark(function _callee8(queue) { | ||
return regeneratorRuntime.wrap(function _callee8$(_context8) { | ||
while (1) { | ||
switch (_context8.prev = _context8.next) { | ||
case 0: | ||
_context8.next = 2; | ||
return this.init(); | ||
await ch.assertQueue(name, { | ||
durable: true, | ||
arguments: { | ||
'x-dead-letter-exchange': this.options.deadJobsExchange, | ||
'x-dead-letter-routing-key': this.options.deadJobsQueue | ||
} | ||
}); | ||
await ch.bindQueue(name, this.options.jobsExchange, name); | ||
} | ||
case 2: | ||
return _context8.abrupt("return", this.controlChannel.assertQueue(queue, { | ||
durable: true, | ||
arguments: { | ||
'x-dead-letter-exchange': this.options.deadJobsExchange, | ||
'x-dead-letter-routing-key': this.options.deadJobsQueue | ||
} | ||
})); | ||
await ch.close(); | ||
} | ||
case 3: | ||
case "end": | ||
return _context8.stop(); | ||
} | ||
} | ||
}, _callee8, this); | ||
})); | ||
publishConfirm(exchange, routingKey, content, options = {}) { | ||
return new _bluebird.default((resolve, reject) => { | ||
this.controlChannel.publish(exchange, routingKey, content, options, err => { | ||
if (err) { | ||
return reject(err); | ||
} | ||
function assertQueue(_x7) { | ||
return _assertQueue.apply(this, arguments); | ||
} | ||
return assertQueue; | ||
}() | ||
}, { | ||
key: "assertEnvironment", | ||
value: function () { | ||
var _assertEnvironment = _asyncToGenerator( | ||
/*#__PURE__*/ | ||
regeneratorRuntime.mark(function _callee9() { | ||
var ch, name; | ||
return regeneratorRuntime.wrap(function _callee9$(_context9) { | ||
while (1) { | ||
switch (_context9.prev = _context9.next) { | ||
case 0: | ||
_context9.next = 2; | ||
return this.connection.createChannel(); | ||
case 2: | ||
ch = _context9.sent; | ||
// error events from this channel will result in promises being rejected | ||
ch.on('error', function (err) {}); | ||
_context9.next = 6; | ||
return ch.assertExchange(this.options.jobsExchange, 'topic', { | ||
durable: true | ||
}); | ||
case 6: | ||
if (!this.options.writeOutcomes) { | ||
_context9.next = 9; | ||
break; | ||
} | ||
_context9.next = 9; | ||
return ch.assertExchange(this.options.outcomesExchange, 'topic', { | ||
durable: true | ||
}); | ||
case 9: | ||
_context9.next = 11; | ||
return ch.assertExchange(this.options.deadJobsExchange, 'direct', { | ||
durable: true, | ||
internal: true | ||
}); | ||
case 11: | ||
if (!this.options.writeOutcomes) { | ||
_context9.next = 14; | ||
break; | ||
} | ||
_context9.next = 14; | ||
return ch.assertQueue(this.options.jobOutcomesQueue, { | ||
durable: true, | ||
arguments: { | ||
'x-dead-letter-exchange': this.options.deadJobsExchange, | ||
'x-dead-letter-routing-key': this.options.deadJobsQueue | ||
} | ||
}); | ||
case 14: | ||
_context9.next = 16; | ||
return ch.assertQueue(this.options.deadJobsQueue, { | ||
durable: true | ||
}); | ||
case 16: | ||
if (!this.options.writeOutcomes) { | ||
_context9.next = 19; | ||
break; | ||
} | ||
_context9.next = 19; | ||
return ch.bindQueue(this.options.jobOutcomesQueue, this.options.outcomesExchange, this.options.outcomeRoutingKeyPrefix + '.#'); | ||
case 19: | ||
_context9.t0 = regeneratorRuntime.keys(this.queues); | ||
case 20: | ||
if ((_context9.t1 = _context9.t0()).done) { | ||
_context9.next = 30; | ||
break; | ||
} | ||
name = _context9.t1.value; | ||
if (this.queues.hasOwnProperty(name)) { | ||
_context9.next = 24; | ||
break; | ||
} | ||
return _context9.abrupt("continue", 20); | ||
case 24: | ||
_context9.next = 26; | ||
return ch.assertQueue(name, { | ||
durable: true, | ||
arguments: { | ||
'x-dead-letter-exchange': this.options.deadJobsExchange, | ||
'x-dead-letter-routing-key': this.options.deadJobsQueue | ||
} | ||
}); | ||
case 26: | ||
_context9.next = 28; | ||
return ch.bindQueue(name, this.options.jobsExchange, name); | ||
case 28: | ||
_context9.next = 20; | ||
break; | ||
case 30: | ||
_context9.next = 32; | ||
return ch.close(); | ||
case 32: | ||
case "end": | ||
return _context9.stop(); | ||
} | ||
} | ||
}, _callee9, this); | ||
})); | ||
function assertEnvironment() { | ||
return _assertEnvironment.apply(this, arguments); | ||
} | ||
return assertEnvironment; | ||
}() | ||
}, { | ||
key: "publishConfirm", | ||
value: function publishConfirm(exchange, routingKey, content) { | ||
var _this6 = this; | ||
var options = arguments.length > 3 && arguments[3] !== undefined ? arguments[3] : {}; | ||
return new _bluebird["default"](function (resolve, reject) { | ||
_this6.controlChannel.publish(exchange, routingKey, content, options, function (err) { | ||
if (err) { | ||
return reject(err); | ||
} | ||
return resolve(); | ||
}); | ||
return resolve(); | ||
}); | ||
} | ||
}]); | ||
}); | ||
} | ||
return Heretic; | ||
}(_events.EventEmitter); | ||
} | ||
exports["default"] = Heretic; | ||
exports.default = Heretic; |
@@ -6,3 +6,3 @@ "use strict"; | ||
}); | ||
exports["default"] = void 0; | ||
exports.default = void 0; | ||
@@ -17,499 +17,162 @@ var _events = require("events"); | ||
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { "default": obj }; } | ||
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; } | ||
function _typeof(obj) { if (typeof Symbol === "function" && typeof Symbol.iterator === "symbol") { _typeof = function _typeof(obj) { return typeof obj; }; } else { _typeof = function _typeof(obj) { return obj && typeof Symbol === "function" && obj.constructor === Symbol && obj !== Symbol.prototype ? "symbol" : typeof obj; }; } return _typeof(obj); } | ||
class Queue extends _events.EventEmitter { | ||
constructor(heretic, name, concurrency, handler) { | ||
super(); | ||
this.heretic = heretic; | ||
this.knex = this.heretic.knex; | ||
this.tableName = this.heretic.options.tableName; | ||
this.name = name; | ||
this.concurrency = concurrency; | ||
this.handler = handler; | ||
this.channel = null; | ||
this.consumerTag = null; | ||
} | ||
function asyncGeneratorStep(gen, resolve, reject, _next, _throw, key, arg) { try { var info = gen[key](arg); var value = info.value; } catch (error) { reject(error); return; } if (info.done) { resolve(value); } else { Promise.resolve(value).then(_next, _throw); } } | ||
async start() { | ||
await this.openChannel(); | ||
let { | ||
consumerTag | ||
} = await this.channel.consume(this.name, this.receiveJob.bind(this), { | ||
noAck: false | ||
}); | ||
this.consumerTag = consumerTag; | ||
} | ||
function _asyncToGenerator(fn) { return function () { var self = this, args = arguments; return new Promise(function (resolve, reject) { var gen = fn.apply(self, args); function _next(value) { asyncGeneratorStep(gen, resolve, reject, _next, _throw, "next", value); } function _throw(err) { asyncGeneratorStep(gen, resolve, reject, _next, _throw, "throw", err); } _next(undefined); }); }; } | ||
async pause() { | ||
if (this.channel) { | ||
await this.channel.cancel(this.consumerTag); | ||
} | ||
} | ||
function _classCallCheck(instance, Constructor) { if (!(instance instanceof Constructor)) { throw new TypeError("Cannot call a class as a function"); } } | ||
async openChannel() { | ||
if (this.channel) { | ||
return; | ||
} | ||
function _defineProperties(target, props) { for (var i = 0; i < props.length; i++) { var descriptor = props[i]; descriptor.enumerable = descriptor.enumerable || false; descriptor.configurable = true; if ("value" in descriptor) descriptor.writable = true; Object.defineProperty(target, descriptor.key, descriptor); } } | ||
this.channel = await this.heretic.connection.createChannel(); | ||
this.channel.prefetch(this.concurrency); | ||
} | ||
function _createClass(Constructor, protoProps, staticProps) { if (protoProps) _defineProperties(Constructor.prototype, protoProps); if (staticProps) _defineProperties(Constructor, staticProps); return Constructor; } | ||
async closeChannel() { | ||
await this.ch.close(); | ||
} | ||
function _possibleConstructorReturn(self, call) { if (call && (_typeof(call) === "object" || typeof call === "function")) { return call; } return _assertThisInitialized(self); } | ||
async fetchJob(id) { | ||
let job = await this.knex(this.tableName).select('*').first().where('id', id); | ||
function _assertThisInitialized(self) { if (self === void 0) { throw new ReferenceError("this hasn't been initialised - super() hasn't been called"); } return self; } | ||
if (!job) { | ||
throw new Error('Job not found'); | ||
} | ||
function _getPrototypeOf(o) { _getPrototypeOf = Object.setPrototypeOf ? Object.getPrototypeOf : function _getPrototypeOf(o) { return o.__proto__ || Object.getPrototypeOf(o); }; return _getPrototypeOf(o); } | ||
function _inherits(subClass, superClass) { if (typeof superClass !== "function" && superClass !== null) { throw new TypeError("Super expression must either be null or a function"); } subClass.prototype = Object.create(superClass && superClass.prototype, { constructor: { value: subClass, writable: true, configurable: true } }); if (superClass) _setPrototypeOf(subClass, superClass); } | ||
function _setPrototypeOf(o, p) { _setPrototypeOf = Object.setPrototypeOf || function _setPrototypeOf(o, p) { o.__proto__ = p; return o; }; return _setPrototypeOf(o, p); } | ||
var Queue = | ||
/*#__PURE__*/ | ||
function (_EventEmitter) { | ||
_inherits(Queue, _EventEmitter); | ||
function Queue(heretic, name, concurrency, handler) { | ||
var _this; | ||
_classCallCheck(this, Queue); | ||
_this = _possibleConstructorReturn(this, _getPrototypeOf(Queue).call(this)); | ||
_this.heretic = heretic; | ||
_this.knex = _this.heretic.knex; | ||
_this.tableName = _this.heretic.options.tableName; | ||
_this.name = name; | ||
_this.concurrency = concurrency; | ||
_this.handler = handler; | ||
_this.channel = null; | ||
_this.consumerTag = null; | ||
return _this; | ||
return job; | ||
} | ||
_createClass(Queue, [{ | ||
key: "start", | ||
value: function () { | ||
var _start = _asyncToGenerator( | ||
/*#__PURE__*/ | ||
regeneratorRuntime.mark(function _callee() { | ||
var _ref, consumerTag; | ||
async receiveJob(message) { | ||
let body; | ||
return regeneratorRuntime.wrap(function _callee$(_context) { | ||
while (1) { | ||
switch (_context.prev = _context.next) { | ||
case 0: | ||
_context.next = 2; | ||
return this.openChannel(); | ||
try { | ||
body = JSON.parse(message.content.toString('utf8')); | ||
case 2: | ||
_context.next = 4; | ||
return this.channel.consume(this.name, this.receiveJob.bind(this), { | ||
noAck: false | ||
}); | ||
case 4: | ||
_ref = _context.sent; | ||
consumerTag = _ref.consumerTag; | ||
this.consumerTag = consumerTag; | ||
case 7: | ||
case "end": | ||
return _context.stop(); | ||
} | ||
} | ||
}, _callee, this); | ||
})); | ||
function start() { | ||
return _start.apply(this, arguments); | ||
if (!body.id) { | ||
throw new Error('Decoded message did not contain a job id'); | ||
} | ||
} catch (e) { | ||
// we won't ever be able to handle this message properly, so | ||
this.channel.nack(message, false, false); | ||
this.emit('error', new Error('Unable to decode message content')); | ||
return; | ||
} | ||
return start; | ||
}() | ||
}, { | ||
key: "pause", | ||
value: function () { | ||
var _pause = _asyncToGenerator( | ||
/*#__PURE__*/ | ||
regeneratorRuntime.mark(function _callee2() { | ||
return regeneratorRuntime.wrap(function _callee2$(_context2) { | ||
while (1) { | ||
switch (_context2.prev = _context2.next) { | ||
case 0: | ||
if (!this.channel) { | ||
_context2.next = 3; | ||
break; | ||
} | ||
let job; | ||
_context2.next = 3; | ||
return this.channel.cancel(this.consumerTag); | ||
try { | ||
job = await this.fetchJob(body.id); | ||
} catch (err) { | ||
this.channel.nack(message, false, false); | ||
this.emit('error', err); | ||
return; | ||
} | ||
case 3: | ||
case "end": | ||
return _context2.stop(); | ||
} | ||
} | ||
}, _callee2, this); | ||
})); | ||
let d = _domain.default.create(); | ||
function pause() { | ||
return _pause.apply(this, arguments); | ||
} | ||
return new _bluebird.default((resolve, reject) => { | ||
d.on('error', reject); | ||
d.run(this.handler, job, message, err => { | ||
if (err) { | ||
return reject(err); | ||
} | ||
return pause; | ||
}() | ||
}, { | ||
key: "openChannel", | ||
value: function () { | ||
var _openChannel = _asyncToGenerator( | ||
/*#__PURE__*/ | ||
regeneratorRuntime.mark(function _callee3() { | ||
return regeneratorRuntime.wrap(function _callee3$(_context3) { | ||
while (1) { | ||
switch (_context3.prev = _context3.next) { | ||
case 0: | ||
if (!this.channel) { | ||
_context3.next = 2; | ||
break; | ||
} | ||
return resolve(); | ||
}); | ||
}).then(async result => { | ||
let savedJob = await this.jobSuccess(job.id); | ||
this.emit('jobSuccess', savedJob); | ||
return _context3.abrupt("return"); | ||
case 2: | ||
_context3.next = 4; | ||
return this.heretic.connection.createChannel(); | ||
case 4: | ||
this.channel = _context3.sent; | ||
this.channel.prefetch(this.concurrency); | ||
case 6: | ||
case "end": | ||
return _context3.stop(); | ||
} | ||
} | ||
}, _callee3, this); | ||
})); | ||
function openChannel() { | ||
return _openChannel.apply(this, arguments); | ||
if (this.heretic.options.writeOutcomes) { | ||
await this.publishConfirm(this.heretic.options.outcomesExchange, `${this.heretic.options.outcomeRoutingKeyPrefix}.success`, message.content); | ||
} | ||
return openChannel; | ||
}() | ||
}, { | ||
key: "closeChannel", | ||
value: function () { | ||
var _closeChannel = _asyncToGenerator( | ||
/*#__PURE__*/ | ||
regeneratorRuntime.mark(function _callee4() { | ||
return regeneratorRuntime.wrap(function _callee4$(_context4) { | ||
while (1) { | ||
switch (_context4.prev = _context4.next) { | ||
case 0: | ||
_context4.next = 2; | ||
return this.ch.close(); | ||
this.channel.ack(message, false); | ||
}).catch(async err => { | ||
let savedJob = await this.jobFailed(job.id, err.stack); | ||
this.emit('jobFailed', savedJob, err); | ||
case 2: | ||
case "end": | ||
return _context4.stop(); | ||
} | ||
} | ||
}, _callee4, this); | ||
})); | ||
function closeChannel() { | ||
return _closeChannel.apply(this, arguments); | ||
if (this.heretic.options.writeOutcomes) { | ||
await this.publishConfirm(this.heretic.options.outcomesExchange, `${this.heretic.options.outcomeRoutingKeyPrefix}.failed`, message.content); | ||
} | ||
return closeChannel; | ||
}() | ||
}, { | ||
key: "fetchJob", | ||
value: function () { | ||
var _fetchJob = _asyncToGenerator( | ||
/*#__PURE__*/ | ||
regeneratorRuntime.mark(function _callee5(id) { | ||
var job; | ||
return regeneratorRuntime.wrap(function _callee5$(_context5) { | ||
while (1) { | ||
switch (_context5.prev = _context5.next) { | ||
case 0: | ||
_context5.next = 2; | ||
return this.knex(this.tableName).select('*').first().where('id', id); | ||
this.channel.ack(message, false); | ||
}).finally(() => { | ||
_bluebird.default.delay(1).then(() => this.emit('jobComplete')); | ||
}); | ||
} | ||
case 2: | ||
job = _context5.sent; | ||
async jobFailed(jobId, message) { | ||
let result = await this.knex(this.tableName).where({ | ||
id: jobId | ||
}).update({ | ||
status: 'failed', | ||
attempt_logs: this.knex.raw('attempt_logs || ?::jsonb', JSON.stringify({ | ||
time: new Date(), | ||
status: 'failed', | ||
message: message, | ||
hostname: _os.default.hostname() | ||
})), | ||
last_attempted_at: new Date() | ||
}).returning('*'); | ||
return result[0]; | ||
} | ||
if (job) { | ||
_context5.next = 5; | ||
break; | ||
} | ||
async jobSuccess(jobId) { | ||
let result = await this.knex(this.tableName).where({ | ||
id: jobId | ||
}).update({ | ||
status: 'success', | ||
attempt_logs: this.knex.raw('attempt_logs || ?::jsonb', JSON.stringify({ | ||
time: new Date(), | ||
status: 'success', | ||
message: 'success', | ||
hostname: _os.default.hostname() | ||
})), | ||
last_attempted_at: new Date() | ||
}).returning('*'); | ||
return result[0]; | ||
} | ||
throw new Error('Job not found'); | ||
publishConfirm(exchange, routingKey, content, options = {}) { | ||
return new _bluebird.default((resolve, reject) => { | ||
this.heretic.controlChannel.publish(exchange, routingKey, content, options, err => { | ||
if (err) { | ||
return reject(err); | ||
} | ||
case 5: | ||
return _context5.abrupt("return", job); | ||
case 6: | ||
case "end": | ||
return _context5.stop(); | ||
} | ||
} | ||
}, _callee5, this); | ||
})); | ||
function fetchJob(_x) { | ||
return _fetchJob.apply(this, arguments); | ||
} | ||
return fetchJob; | ||
}() | ||
}, { | ||
key: "receiveJob", | ||
value: function () { | ||
var _receiveJob = _asyncToGenerator( | ||
/*#__PURE__*/ | ||
regeneratorRuntime.mark(function _callee8(message) { | ||
var _this2 = this; | ||
var body, job, d; | ||
return regeneratorRuntime.wrap(function _callee8$(_context8) { | ||
while (1) { | ||
switch (_context8.prev = _context8.next) { | ||
case 0: | ||
_context8.prev = 0; | ||
body = JSON.parse(message.content.toString('utf8')); | ||
if (body.id) { | ||
_context8.next = 4; | ||
break; | ||
} | ||
throw new Error('Decoded message did not contain a job id'); | ||
case 4: | ||
_context8.next = 11; | ||
break; | ||
case 6: | ||
_context8.prev = 6; | ||
_context8.t0 = _context8["catch"](0); | ||
// we won't ever be able to handle this message properly, so | ||
this.channel.nack(message, false, false); | ||
this.emit('error', new Error('Unable to decode message content')); | ||
return _context8.abrupt("return"); | ||
case 11: | ||
_context8.prev = 11; | ||
_context8.next = 14; | ||
return this.fetchJob(body.id); | ||
case 14: | ||
job = _context8.sent; | ||
_context8.next = 22; | ||
break; | ||
case 17: | ||
_context8.prev = 17; | ||
_context8.t1 = _context8["catch"](11); | ||
this.channel.nack(message, false, false); | ||
this.emit('error', _context8.t1); | ||
return _context8.abrupt("return"); | ||
case 22: | ||
d = _domain["default"].create(); | ||
return _context8.abrupt("return", new _bluebird["default"](function (resolve, reject) { | ||
d.on('error', reject); | ||
d.run(_this2.handler, job, message, function (err) { | ||
if (err) { | ||
return reject(err); | ||
} | ||
return resolve(); | ||
}); | ||
}).then( | ||
/*#__PURE__*/ | ||
function () { | ||
var _ref2 = _asyncToGenerator( | ||
/*#__PURE__*/ | ||
regeneratorRuntime.mark(function _callee6(result) { | ||
var savedJob; | ||
return regeneratorRuntime.wrap(function _callee6$(_context6) { | ||
while (1) { | ||
switch (_context6.prev = _context6.next) { | ||
case 0: | ||
_context6.next = 2; | ||
return _this2.jobSuccess(job.id); | ||
case 2: | ||
savedJob = _context6.sent; | ||
_this2.emit('jobSuccess', savedJob); | ||
if (!_this2.heretic.options.writeOutcomes) { | ||
_context6.next = 7; | ||
break; | ||
} | ||
_context6.next = 7; | ||
return _this2.publishConfirm(_this2.heretic.options.outcomesExchange, "".concat(_this2.heretic.options.outcomeRoutingKeyPrefix, ".success"), message.content); | ||
case 7: | ||
_this2.channel.ack(message, false); | ||
case 8: | ||
case "end": | ||
return _context6.stop(); | ||
} | ||
} | ||
}, _callee6); | ||
})); | ||
return function (_x3) { | ||
return _ref2.apply(this, arguments); | ||
}; | ||
}())["catch"]( | ||
/*#__PURE__*/ | ||
function () { | ||
var _ref3 = _asyncToGenerator( | ||
/*#__PURE__*/ | ||
regeneratorRuntime.mark(function _callee7(err) { | ||
var savedJob; | ||
return regeneratorRuntime.wrap(function _callee7$(_context7) { | ||
while (1) { | ||
switch (_context7.prev = _context7.next) { | ||
case 0: | ||
_context7.next = 2; | ||
return _this2.jobFailed(job.id, err.stack); | ||
case 2: | ||
savedJob = _context7.sent; | ||
_this2.emit('jobFailed', savedJob, err); | ||
if (!_this2.heretic.options.writeOutcomes) { | ||
_context7.next = 7; | ||
break; | ||
} | ||
_context7.next = 7; | ||
return _this2.publishConfirm(_this2.heretic.options.outcomesExchange, "".concat(_this2.heretic.options.outcomeRoutingKeyPrefix, ".failed"), message.content); | ||
case 7: | ||
_this2.channel.ack(message, false); | ||
case 8: | ||
case "end": | ||
return _context7.stop(); | ||
} | ||
} | ||
}, _callee7); | ||
})); | ||
return function (_x4) { | ||
return _ref3.apply(this, arguments); | ||
}; | ||
}())["finally"](function () { | ||
_bluebird["default"].delay(1).then(function () { | ||
return _this2.emit('jobComplete'); | ||
}); | ||
})); | ||
case 24: | ||
case "end": | ||
return _context8.stop(); | ||
} | ||
} | ||
}, _callee8, this, [[0, 6], [11, 17]]); | ||
})); | ||
function receiveJob(_x2) { | ||
return _receiveJob.apply(this, arguments); | ||
} | ||
return receiveJob; | ||
}() | ||
}, { | ||
key: "jobFailed", | ||
value: function () { | ||
var _jobFailed = _asyncToGenerator( | ||
/*#__PURE__*/ | ||
regeneratorRuntime.mark(function _callee9(jobId, message) { | ||
var result; | ||
return regeneratorRuntime.wrap(function _callee9$(_context9) { | ||
while (1) { | ||
switch (_context9.prev = _context9.next) { | ||
case 0: | ||
_context9.next = 2; | ||
return this.knex(this.tableName).where({ | ||
id: jobId | ||
}).update({ | ||
status: 'failed', | ||
attempt_logs: this.knex.raw('attempt_logs || ?::jsonb', JSON.stringify({ | ||
time: new Date(), | ||
status: 'failed', | ||
message: message, | ||
hostname: _os["default"].hostname() | ||
})), | ||
last_attempted_at: new Date() | ||
}).returning('*'); | ||
case 2: | ||
result = _context9.sent; | ||
return _context9.abrupt("return", result[0]); | ||
case 4: | ||
case "end": | ||
return _context9.stop(); | ||
} | ||
} | ||
}, _callee9, this); | ||
})); | ||
function jobFailed(_x5, _x6) { | ||
return _jobFailed.apply(this, arguments); | ||
} | ||
return jobFailed; | ||
}() | ||
}, { | ||
key: "jobSuccess", | ||
value: function () { | ||
var _jobSuccess = _asyncToGenerator( | ||
/*#__PURE__*/ | ||
regeneratorRuntime.mark(function _callee10(jobId) { | ||
var result; | ||
return regeneratorRuntime.wrap(function _callee10$(_context10) { | ||
while (1) { | ||
switch (_context10.prev = _context10.next) { | ||
case 0: | ||
_context10.next = 2; | ||
return this.knex(this.tableName).where({ | ||
id: jobId | ||
}).update({ | ||
status: 'success', | ||
attempt_logs: this.knex.raw('attempt_logs || ?::jsonb', JSON.stringify({ | ||
time: new Date(), | ||
status: 'success', | ||
message: 'success', | ||
hostname: _os["default"].hostname() | ||
})), | ||
last_attempted_at: new Date() | ||
}).returning('*'); | ||
case 2: | ||
result = _context10.sent; | ||
return _context10.abrupt("return", result[0]); | ||
case 4: | ||
case "end": | ||
return _context10.stop(); | ||
} | ||
} | ||
}, _callee10, this); | ||
})); | ||
function jobSuccess(_x7) { | ||
return _jobSuccess.apply(this, arguments); | ||
} | ||
return jobSuccess; | ||
}() | ||
}, { | ||
key: "publishConfirm", | ||
value: function publishConfirm(exchange, routingKey, content) { | ||
var _this3 = this; | ||
var options = arguments.length > 3 && arguments[3] !== undefined ? arguments[3] : {}; | ||
return new _bluebird["default"](function (resolve, reject) { | ||
_this3.heretic.controlChannel.publish(exchange, routingKey, content, options, function (err) { | ||
if (err) { | ||
return reject(err); | ||
} | ||
return resolve(); | ||
}); | ||
return resolve(); | ||
}); | ||
} | ||
}]); | ||
}); | ||
} | ||
return Queue; | ||
}(_events.EventEmitter); | ||
} | ||
exports["default"] = Queue; | ||
exports.default = Queue; |
{ | ||
"name": "@smartrent/heretic", | ||
"version": "0.5.1", | ||
"version": "0.5.2", | ||
"main": "dist/Heretic.js", | ||
@@ -5,0 +5,0 @@ "dependencies": { |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
152012
703