Comparing version 0.3.1 to 0.3.2
@@ -33,7 +33,7 @@ var $__Object = Object, $__getOwnPropertyNames = $__Object.getOwnPropertyNames, $__getOwnPropertyDescriptor = $__Object.getOwnPropertyDescriptor, $__getDescriptors = function(object) { | ||
var Q = require('q'); | ||
var RESERVE_TIMEOUT = ms('50s'); | ||
var RESERVE_TIMEOUT = ms('30s'); | ||
var RESERVE_BACKOFF = ms('30s'); | ||
var TIMEOUT_REQUEST = RESERVE_TIMEOUT + ms('10s'); | ||
var PROCESSING_TIMEOUT = ms('2m'); | ||
var PROCESSING_TIMEOUT = ms('10m'); | ||
var RELEASE_DELAY = ms('1m'); | ||
var ERROR_BACKOFF = ms('30s'); | ||
var Configuration = function() { | ||
@@ -166,5 +166,5 @@ 'use strict'; | ||
} catch (requestTimeout) { | ||
requestTimeout = setTimeout((function() { | ||
return outcome.reject('TIMED_OUT'); | ||
}), TIMEOUT_REQUEST); | ||
requestTimeout = setTimeout(function() { | ||
outcome.reject('TIMED_OUT'); | ||
}, TIMEOUT_REQUEST); | ||
($__4 = client[command]).call.apply($__4, $__spread([client], args, [function(error) { | ||
@@ -263,3 +263,3 @@ for (var results = [], $__2 = 1; $__2 < arguments.length; $__2++) results[$__2 - 1] = arguments[$__2]; | ||
var payload = JSON.stringify(job); | ||
var promise = this._put.request('put', 0, 0, PROCESSING_TIMEOUT / 1000, payload); | ||
var promise = this._put.request('put', priority = 0, delay = 0, timeToRun = Math.floor(PROCESSING_TIMEOUT / 1000), payload); | ||
if (callback) { | ||
@@ -317,3 +317,3 @@ promise.then((function() { | ||
var outcome = Q.defer(); | ||
session.request('reserve_with_timeout', 0).then((function($__3) { | ||
session.request('reserve_with_timeout', timeout = 0).then((function($__3) { | ||
var jobID = $__3[0], payload = $__3[1]; | ||
@@ -334,3 +334,3 @@ return this._runAndDestroy(session, jobID, payload); | ||
this.notify.debug("Waiting for jobs on queue %s", this.name); | ||
session.request('reserve_with_timeout', RESERVE_TIMEOUT / 1000).then((function($__3) { | ||
session.request('reserve_with_timeout', timeout = RESERVE_TIMEOUT / 1000).then((function($__3) { | ||
var jobID = $__3[0], payload = $__3[1]; | ||
@@ -341,3 +341,3 @@ return this._runAndDestroy(session, jobID, payload); | ||
this.notify.error(error); | ||
setTimeout(repeat, ERROR_BACKOFF); | ||
setTimeout(repeat, RESERVE_BACKOFF); | ||
} | ||
@@ -352,4 +352,4 @@ }).bind(this)); | ||
var delay = (process.env.NODE_ENV == 'test' ? 0: RELEASE_DELAY); | ||
session.request('release', jobID, 0, delay / 1000). catch ((function(error) { | ||
return this.notify.error(error.stack); | ||
session.request('release', jobID, priority = 0, delay = Math.floor(delay / 1000)). catch ((function(error) { | ||
return this.notify.error(error); | ||
}).bind(this)); | ||
@@ -366,8 +366,8 @@ }).bind(this)); | ||
var errorOnTimeout = setTimeout(function() { | ||
outcome.reject(new Error("Timeout processing job")); | ||
}, PROCESSING_TIMEOUT); | ||
outcome.reject(new Error("Timeout processing job " + jobID)); | ||
}, PROCESSING_TIMEOUT - ms('1s')); | ||
domain.add(errorOnTimeout); | ||
domain.run((function() { | ||
this.notify.info("Picked up job %s from queue %s", jobID, this.name); | ||
this.notify.debug("Processing %s: %s", jobID, payload); | ||
this.notify.debug("Processing job %s", jobID, payload); | ||
var job; | ||
@@ -374,0 +374,0 @@ try { |
{ | ||
"name": "ironium", | ||
"version": "0.3.1", | ||
"version": "0.3.2", | ||
"scripts": { | ||
@@ -5,0 +5,0 @@ "test": "./node_modules/.bin/mocha", |
@@ -12,4 +12,8 @@ const _ = require('lodash'); | ||
// reopen connctions. | ||
const RESERVE_TIMEOUT = ms('50s'); | ||
const RESERVE_TIMEOUT = ms('30s'); | ||
// Back-off in case of connection error, prevents continously failing to | ||
// reserve a job. | ||
const RESERVE_BACKOFF = ms('30s'); | ||
// How long before we consider a request failed due to timeout. | ||
@@ -21,12 +25,9 @@ // Should be longer than RESERVE_TIMEOUT. | ||
// to the queue. | ||
const PROCESSING_TIMEOUT = ms('2m'); | ||
const PROCESSING_TIMEOUT = ms('10m'); | ||
// Delay before a released job is available for pickup again (in seconds). | ||
// This is our primary mechanism for dealing with back-pressure. Ignored in | ||
// test environment. | ||
// This is our primary mechanism for dealing with load during failures. | ||
// Ignored in test environment. | ||
const RELEASE_DELAY = ms('1m'); | ||
// Back-off in case of connection error, prevents continously failing to | ||
// reserve a job. | ||
const ERROR_BACKOFF = ms('30s'); | ||
@@ -213,3 +214,5 @@ | ||
// way we can handle this condition is with a timeout. | ||
let requestTimeout = setTimeout(()=> outcome.reject('TIMED_OUT'), TIMEOUT_REQUEST); | ||
let requestTimeout = setTimeout(function() { | ||
outcome.reject('TIMED_OUT'); | ||
}, TIMEOUT_REQUEST); | ||
@@ -361,3 +364,3 @@ client[command].call(client, ...args, function(error, ...results) { | ||
let payload = JSON.stringify(job); | ||
let promise = this._put.request('put', 0, 0, PROCESSING_TIMEOUT / 1000, payload); | ||
let promise = this._put.request('put', priority = 0, delay = 0, timeToRun = Math.floor(PROCESSING_TIMEOUT / 1000), payload); | ||
if (callback) { | ||
@@ -411,3 +414,3 @@ // Don't pass jobID to callback, easy to use in test before hook, like | ||
let outcome = Q.defer(); | ||
session.request('reserve_with_timeout', 0) | ||
session.request('reserve_with_timeout', timeout = 0) | ||
// If we reserved a job, this will run the job and delete it. | ||
@@ -441,3 +444,3 @@ .then(([jobID, payload])=> this._runAndDestroy(session, jobID, payload) ) | ||
this.notify.debug("Waiting for jobs on queue %s", this.name); | ||
session.request('reserve_with_timeout', RESERVE_TIMEOUT / 1000) | ||
session.request('reserve_with_timeout', timeout = RESERVE_TIMEOUT / 1000) | ||
// If we reserved a job, this will run the job and delete it. | ||
@@ -453,3 +456,3 @@ .then(([jobID, payload])=> this._runAndDestroy(session, jobID, payload) ) | ||
this.notify.error(error); | ||
setTimeout(repeat, ERROR_BACKOFF); | ||
setTimeout(repeat, RESERVE_BACKOFF); | ||
} | ||
@@ -474,4 +477,4 @@ }); | ||
let delay = (process.env.NODE_ENV == 'test' ? 0 : RELEASE_DELAY); | ||
session.request('release', jobID, 0, delay / 1000) | ||
.catch((error)=> this.notify.error(error.stack) ); | ||
session.request('release', jobID, priority = 0, delay = Math.floor(delay / 1000)) | ||
.catch((error)=> this.notify.error(error) ); | ||
}); | ||
@@ -498,6 +501,6 @@ | ||
// This timer trigger if the job doesn't complete in time and rejects | ||
// the promise. | ||
// the promise. Server gets a longer timeout than we do. | ||
let errorOnTimeout = setTimeout(function() { | ||
outcome.reject(new Error("Timeout processing job")); | ||
}, PROCESSING_TIMEOUT); | ||
outcome.reject(new Error("Timeout processing job " + jobID)); | ||
}, PROCESSING_TIMEOUT - ms('1s')); | ||
domain.add(errorOnTimeout); | ||
@@ -511,3 +514,3 @@ | ||
this.notify.info("Picked up job %s from queue %s", jobID, this.name); | ||
this.notify.debug("Processing %s: %s", jobID, payload); | ||
this.notify.debug("Processing job %s", jobID, payload); | ||
// Typically we queue JSON objects, but the payload may be just a | ||
@@ -514,0 +517,0 @@ // string, e.g. some services send URL encoded name/value pairs, or MIME |
58885
1343