node-resque
Advanced tools
@@ -34,2 +34,22 @@ import specHelper from "../utils/specHelper"; | ||
}, | ||
jobWithLockTimeout: { | ||
plugins: [Plugins.QueueLock], | ||
pluginOptions: { | ||
QueueLock: { | ||
lockTimeout: specHelper.timeout, | ||
}, | ||
}, | ||
perform: (a, b) => a + b, | ||
}, | ||
stuckJob: { | ||
plugins: [Plugins.QueueLock], | ||
pluginOptions: { | ||
QueueLock: { | ||
lockTimeout: specHelper.smallTimeout, | ||
}, | ||
}, | ||
perform: (a, b) => { | ||
a + b; | ||
}, | ||
}, | ||
}; | ||
@@ -82,2 +102,56 @@ | ||
test("will enqueue a job with timeout set by QueueLock plugin options and check its ttl", async () => { | ||
let job = "jobWithLockTimeout"; | ||
const enqueue = await queue.enqueue(specHelper.queue, job, [1, 2]); | ||
const length = await queue.length(specHelper.queue); | ||
expect(length).toBe(1); | ||
expect(enqueue).toBe(true); | ||
const result = await specHelper.redis.keys( | ||
specHelper.namespace + ":lock*" | ||
); | ||
expect(result).toHaveLength(1); | ||
const ttl = await specHelper.redis.ttl( | ||
specHelper.namespace + | ||
":lock" + | ||
":" + | ||
job + | ||
":" + | ||
specHelper.queue + | ||
":[1,2]" | ||
); | ||
expect(ttl).toBe(specHelper.timeout); | ||
}); | ||
test("will enqueue a repeated stuck job after another one to overwrite the ttl and the expiration time of the lock", async () => { | ||
let stuckJob = "stuckJob"; | ||
const tryOne = await queue.enqueue(specHelper.queue, stuckJob, [1, 2]); | ||
await new Promise((resolve) => | ||
setTimeout( | ||
resolve, | ||
Math.min((specHelper.smallTimeout + 1) * 1000, 4000) | ||
) | ||
); | ||
const tryTwo = await queue.enqueue(specHelper.queue, stuckJob, [1, 2]); | ||
const length = await queue.length(specHelper.queue); | ||
expect(length).toBe(2); | ||
expect(tryOne).toBe(true); | ||
expect(tryTwo).toBe(true); | ||
const result = await specHelper.redis.keys( | ||
specHelper.namespace + ":lock*" | ||
); | ||
expect(result).toHaveLength(1); | ||
const ttl = await specHelper.redis.ttl( | ||
specHelper.namespace + | ||
":lock" + | ||
":" + | ||
stuckJob + | ||
":" + | ||
specHelper.queue + | ||
":[1,2]" | ||
); | ||
expect(ttl).toBe(specHelper.smallTimeout); | ||
}); | ||
describe("with worker", () => { | ||
@@ -84,0 +158,0 @@ let worker; |
@@ -13,2 +13,3 @@ import * as Redis from "ioredis"; | ||
timeout: 500, | ||
smallTimeout: 3, | ||
redis: null, | ||
@@ -15,0 +16,0 @@ connectionDetails: { |
@@ -14,2 +14,3 @@ "use strict"; | ||
if (set === true || set === 1) { | ||
await this.queueObject.connection.redis.expire(key, this.lockTimeout()); | ||
return true; | ||
@@ -23,3 +24,3 @@ } | ||
await this.queueObject.connection.redis.set(key, timeout); | ||
await this.queueObject.connection.redis.expire(key, timeout); | ||
await this.queueObject.connection.redis.expire(key, this.lockTimeout()); | ||
return true; | ||
@@ -26,0 +27,0 @@ } |
@@ -6,3 +6,3 @@ { | ||
"license": "Apache-2.0", | ||
"version": "8.2.3", | ||
"version": "8.2.4", | ||
"homepage": "http://github.com/actionhero/node-resque", | ||
@@ -9,0 +9,0 @@ "repository": { |
@@ -14,2 +14,3 @@ // If a job with the same name, queue, and args is already in the queue, do not enqueue it again | ||
if (set === true || set === 1) { | ||
await this.queueObject.connection.redis.expire(key, this.lockTimeout()); | ||
return true; | ||
@@ -25,3 +26,3 @@ } | ||
await this.queueObject.connection.redis.set(key, timeout); | ||
await this.queueObject.connection.redis.expire(key, timeout); | ||
await this.queueObject.connection.redis.expire(key, this.lockTimeout()); | ||
return true; | ||
@@ -28,0 +29,0 @@ } |
682455
0.35%8592
0.86%