node-resque
Advanced tools
Comparing version 6.0.7 to 6.0.8
@@ -215,3 +215,3 @@ import specHelper from "../utils/specHelper"; | ||
// TODO: Typescript seems to have troble with frozen objects | ||
// TODO: Typescript seems to have trouble with frozen objects | ||
// test('job arguments are immutable', async (done) => { | ||
@@ -218,0 +218,0 @@ // await queue.enqueue(specHelper.queue, 'messWithData', { a: 'starting value' }) |
@@ -70,5 +70,8 @@ "use strict"; | ||
} | ||
await this.connection.redis.rpush(this.connection.key("delayed:" + rTimestamp), item); | ||
await this.connection.redis.sadd(this.connection.key("timestamps:" + item), "delayed:" + rTimestamp); | ||
await this.connection.redis.zadd(this.connection.key("delayed_queue_schedule"), rTimestamp.toString(), rTimestamp.toString()); | ||
await this.connection.redis | ||
.multi() | ||
.rpush(this.connection.key("delayed:" + rTimestamp), item) | ||
.sadd(this.connection.key("timestamps:" + item), "delayed:" + rTimestamp) | ||
.zadd(this.connection.key("delayed_queue_schedule"), rTimestamp.toString(), rTimestamp.toString()) | ||
.exec(); | ||
} | ||
@@ -94,4 +97,8 @@ /** | ||
async delQueue(q) { | ||
await this.connection.redis.del(this.connection.key("queue", q)); | ||
await this.connection.redis.srem(this.connection.key("queues"), q); | ||
const { redis } = this.connection; | ||
await redis | ||
.multi() | ||
.del(this.connection.key("queue", q)) | ||
.srem(this.connection.key("queues"), q) | ||
.exec(); | ||
} | ||
@@ -98,0 +105,0 @@ /** |
@@ -170,7 +170,12 @@ "use strict"; | ||
const key = this.connection.key("delayed:" + timestamp); | ||
await this.connection.redis.watch(key); | ||
const length = await this.connection.redis.llen(key); | ||
if (length === 0) { | ||
await this.connection.redis.del(key); | ||
await this.connection.redis.zrem(this.connection.key("delayed_queue_schedule"), timestamp); | ||
await this.connection.redis | ||
.multi() | ||
.del(key) | ||
.zrem(this.connection.key("delayed_queue_schedule"), timestamp) | ||
.exec(); | ||
} | ||
await this.connection.redis.unwatch(); | ||
} | ||
@@ -177,0 +182,0 @@ async checkStuckWorkers() { |
@@ -6,3 +6,3 @@ { | ||
"license": "Apache-2.0", | ||
"version": "6.0.7", | ||
"version": "6.0.8", | ||
"homepage": "http://github.com/actionhero/node-resque", | ||
@@ -9,0 +9,0 @@ "repository": { |
@@ -102,15 +102,12 @@ import * as os from "os"; | ||
await this.connection.redis.rpush( | ||
this.connection.key("delayed:" + rTimestamp), | ||
item | ||
); | ||
await this.connection.redis.sadd( | ||
this.connection.key("timestamps:" + item), | ||
"delayed:" + rTimestamp | ||
); | ||
await this.connection.redis.zadd( | ||
this.connection.key("delayed_queue_schedule"), | ||
rTimestamp.toString(), | ||
rTimestamp.toString() | ||
); | ||
await this.connection.redis | ||
.multi() | ||
.rpush(this.connection.key("delayed:" + rTimestamp), item) | ||
.sadd(this.connection.key("timestamps:" + item), "delayed:" + rTimestamp) | ||
.zadd( | ||
this.connection.key("delayed_queue_schedule"), | ||
rTimestamp.toString(), | ||
rTimestamp.toString() | ||
) | ||
.exec(); | ||
} | ||
@@ -143,4 +140,8 @@ /** | ||
async delQueue(q: string) { | ||
await this.connection.redis.del(this.connection.key("queue", q)); | ||
await this.connection.redis.srem(this.connection.key("queues"), q); | ||
const { redis } = this.connection; | ||
await redis | ||
.multi() | ||
.del(this.connection.key("queue", q)) | ||
.srem(this.connection.key("queues"), q) | ||
.exec(); | ||
} | ||
@@ -147,0 +148,0 @@ |
@@ -268,10 +268,12 @@ // To read notes about the master locking scheme, check out: | ||
const key = this.connection.key("delayed:" + timestamp); | ||
await this.connection.redis.watch(key); | ||
const length = await this.connection.redis.llen(key); | ||
if (length === 0) { | ||
await this.connection.redis.del(key); | ||
await this.connection.redis.zrem( | ||
this.connection.key("delayed_queue_schedule"), | ||
timestamp | ||
); | ||
await this.connection.redis | ||
.multi() | ||
.del(key) | ||
.zrem(this.connection.key("delayed_queue_schedule"), timestamp) | ||
.exec(); | ||
} | ||
await this.connection.redis.unwatch(); | ||
} | ||
@@ -278,0 +280,0 @@ |
640469
7557