oxen-queue
Advanced tools
Comparing version 0.1.9 to 0.1.10-beta-0
@@ -29,3 +29,3 @@ const crypto = require('crypto') | ||
onJobSuccess = async () => {}, | ||
onJobError = async () => {} | ||
onJobError = async () => {}, | ||
}) { | ||
@@ -170,3 +170,3 @@ if (!mysql_config) { | ||
setTimeout(function() { | ||
setTimeout(function () { | ||
if (!_this.processing) { | ||
@@ -182,5 +182,5 @@ return | ||
this.job_recovery_interval = setInterval(function() { | ||
this.job_recovery_interval = setInterval(function () { | ||
if (recover_stuck_jobs) { | ||
_this.recoverStuckJobs().catch(function(error) { | ||
_this.recoverStuckJobs().catch(function (error) { | ||
_this.log('Unable to recover stuck jobs:') | ||
@@ -190,3 +190,3 @@ _this.log(error) | ||
} else { | ||
_this.markStuckJobs().catch(function(error) { | ||
_this.markStuckJobs().catch(function (error) { | ||
_this.log('Unable to mark stuck jobs:') | ||
@@ -227,3 +227,3 @@ _this.log(error) | ||
.then(async job_result => { | ||
return _this.handleSuccess({ job_id: job.id, job_result, job_body:job.body }) | ||
return _this.handleSuccess({ job_id: job.id, job_result, job_body: job.body }) | ||
}) | ||
@@ -274,8 +274,4 @@ .catch(async error => { | ||
const next_jobs = await this.dbQry( | ||
`SELECT id, body FROM ${this.db_table} WHERE ? ORDER BY priority ASC LIMIT ${ | ||
this.batch_size | ||
}`, | ||
{ | ||
batch_id, | ||
} | ||
`SELECT id, body FROM ${this.db_table} WHERE ? ORDER BY priority ASC LIMIT ${this.batch_size}`, | ||
{ batch_id } | ||
) | ||
@@ -379,4 +375,30 @@ | ||
async dbQry(query, params) { | ||
const [result] = await this.db.query(query, params) | ||
return result | ||
const errors_to_retry = [ | ||
'ER_LOCK_WAIT_TIMEOUT', | ||
'ER_LOCK_DEADLOCK', | ||
'ETIMEDOUT', | ||
'ECONNREFUSED', | ||
'try restarting transaction', | ||
] | ||
let retries = 5 | ||
while (retries > 0) { | ||
try { | ||
const connection = await this.db.getConnection() | ||
try { | ||
const [result] = await connection.query(query, params) | ||
return result | ||
} finally { | ||
connection.release() | ||
} | ||
} catch (e) { | ||
retries-- | ||
if (retries === 0 || !errors_to_retry.some(msg => e.message.includes(msg))) { | ||
throw e | ||
} | ||
} | ||
await Promise.delay(Math.random() * 500 + 500) | ||
} | ||
} | ||
@@ -383,0 +405,0 @@ |
{ | ||
"name": "oxen-queue", | ||
"version": "0.1.9", | ||
"version": "0.1.10-beta-0", | ||
"description": "A resilient worker queue backed by MySQL.", | ||
@@ -5,0 +5,0 @@ "engines": { |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
58093
742