node-resque
Advanced tools
Comparing version 8.0.4 to 8.1.0
@@ -1,6 +0,7 @@ | ||
const Redis = require("ioredis"); | ||
import * as Redis from "ioredis"; | ||
import * as NodeResque from "../../src/index"; | ||
const namespace = `resque-test-${process.env.JEST_WORKER_ID || 0}`; | ||
const queue = "test_queue"; | ||
const pkg = "ioredis"; | ||
const NodeResque = require("../../src/index"); | ||
@@ -24,3 +25,5 @@ const SpecHelper = { | ||
connect: async function () { | ||
this.redis = Redis.createClient( | ||
if (!this.connectionDetails.options) this.connectionDetails.options = {}; | ||
this.connectionDetails.options.db = this.connectionDetails?.options?.database; | ||
this.redis = new Redis( | ||
this.connectionDetails.port, | ||
@@ -30,2 +33,3 @@ this.connectionDetails.host, | ||
); | ||
this.redis.setMaxListeners(0); | ||
@@ -32,0 +36,0 @@ if ( |
@@ -6,3 +6,2 @@ "use strict"; | ||
const events_1 = require("events"); | ||
const IORedis = require("ioredis"); | ||
const fs = require("fs"); | ||
@@ -52,11 +51,11 @@ const path = require("path"); | ||
else { | ||
if (this.options.pkg === "ioredis") { | ||
const Pkg = IORedis; | ||
const Pkg = require(this.options.pkg); | ||
if (typeof Pkg.createClient === "function" && | ||
this.options.pkg !== "ioredis") { | ||
this.redis = Pkg.createClient(this.options.port, this.options.host, this.options.options); | ||
} | ||
else { | ||
this.options.options.db = this.options.database; | ||
this.redis = new Pkg(this.options.port, this.options.host, this.options.options); | ||
} | ||
else { | ||
const Pkg = require(this.options.pkg); | ||
this.redis = Pkg.createClient(this.options.port, this.options.host, this.options.options); | ||
} | ||
} | ||
@@ -71,3 +70,3 @@ this.eventListeners.error = (error) => { | ||
this.redis.on("end", () => this.eventListeners.end()); | ||
if (!this.options.redis) { | ||
if (!this.options.redis && typeof this.redis.select === "function") { | ||
await this.redis.select(this.options.database); | ||
@@ -78,2 +77,5 @@ } | ||
loadLua() { | ||
// even though ioredis-mock can run LUA, cjson is not available | ||
if (this.options.pkg === "ioredis-mock") | ||
return; | ||
const luaDir = path.join(__dirname, "..", "..", "lua"); | ||
@@ -80,0 +82,0 @@ const files = fs.readdirSync(luaDir); |
@@ -49,2 +49,4 @@ /// <reference types="node" /> | ||
forceCleanWorker(workerName: any, delta: any): Promise<void>; | ||
private watchIfPossible; | ||
private unwatchIfPossible; | ||
} |
@@ -174,4 +174,4 @@ "use strict"; | ||
const key = this.connection.key("delayed:" + timestamp); | ||
await this.connection.redis.watch(key); | ||
await this.connection.redis.watch(this.connection.key("delayed_queue_schedule")); | ||
await this.watchIfPossible(key); | ||
await this.watchIfPossible(this.connection.key("delayed_queue_schedule")); | ||
const length = await this.connection.redis.llen(key); | ||
@@ -185,3 +185,3 @@ if (length === 0) { | ||
} | ||
await this.connection.redis.unwatch(); | ||
await this.unwatchIfPossible(); | ||
} | ||
@@ -215,3 +215,13 @@ async checkStuckWorkers() { | ||
} | ||
async watchIfPossible(key) { | ||
if (typeof this.connection.redis.watch === "function") { | ||
return this.connection.redis.watch(key); | ||
} | ||
} | ||
async unwatchIfPossible() { | ||
if (typeof this.connection.redis.unwatch === "function") { | ||
return this.connection.redis.unwatch(); | ||
} | ||
} | ||
} | ||
exports.Scheduler = Scheduler; |
@@ -256,3 +256,17 @@ "use strict"; | ||
const workerKey = this.connection.key("worker", this.name, this.stringQueues()); | ||
const encodedJob = await this.connection.redis["popAndStoreJob"](queueKey, workerKey, new Date().toString(), this.queue, this.name); | ||
let encodedJob; | ||
if (this.connection.redis["popAndStoreJob"]) { | ||
encodedJob = await this.connection.redis["popAndStoreJob"](queueKey, workerKey, new Date().toString(), this.queue, this.name); | ||
} | ||
else { | ||
encodedJob = await this.connection.redis.lpop(queueKey); | ||
if (encodedJob) { | ||
await this.connection.redis.set(workerKey, JSON.stringify({ | ||
run_at: new Date().toString(), | ||
queue: this.queue, | ||
worker: this.name, | ||
payload: JSON.parse(encodedJob), | ||
})); | ||
} | ||
} | ||
if (encodedJob) | ||
@@ -259,0 +273,0 @@ currentJob = JSON.parse(encodedJob); |
@@ -6,3 +6,3 @@ { | ||
"license": "Apache-2.0", | ||
"version": "8.0.4", | ||
"version": "8.1.0", | ||
"homepage": "http://github.com/actionhero/node-resque", | ||
@@ -36,2 +36,3 @@ "repository": { | ||
"@types/node": "^14.14.6", | ||
"ioredis-mock": "^5.1.0", | ||
"jest": "^26.6.3", | ||
@@ -55,3 +56,3 @@ "node-schedule": "^1.3.2", | ||
"prepare": "npm run build && npm run docs", | ||
"pretest": "npm run lint", | ||
"pretest": "npm run lint && npm run build", | ||
"lint": "prettier --check src __tests__ examples \"*.md\"", | ||
@@ -58,0 +59,0 @@ "pretty": "prettier --write src __tests__ examples \"**/*.md\"", |
@@ -6,3 +6,2 @@ /// <reference path="./../../node_modules/@types/ioredis/index.d.ts" /> | ||
import * as fs from "fs"; | ||
import * as os from "os"; | ||
import * as path from "path"; | ||
@@ -65,6 +64,8 @@ import { ConnectionOptions } from "../types/options"; | ||
} else { | ||
if (this.options.pkg === "ioredis") { | ||
const Pkg = IORedis; | ||
this.options.options.db = this.options.database; | ||
this.redis = new Pkg( | ||
const Pkg = require(this.options.pkg); | ||
if ( | ||
typeof Pkg.createClient === "function" && | ||
this.options.pkg !== "ioredis" | ||
) { | ||
this.redis = Pkg.createClient( | ||
this.options.port, | ||
@@ -75,4 +76,4 @@ this.options.host, | ||
} else { | ||
const Pkg = require(this.options.pkg); | ||
this.redis = Pkg.createClient( | ||
this.options.options.db = this.options.database; | ||
this.redis = new Pkg( | ||
this.options.port, | ||
@@ -94,5 +95,6 @@ this.options.host, | ||
if (!this.options.redis) { | ||
if (!this.options.redis && typeof this.redis.select === "function") { | ||
await this.redis.select(this.options.database); | ||
} | ||
await connectionTestAndLoadLua(); | ||
@@ -102,2 +104,5 @@ } | ||
loadLua() { | ||
// even though ioredis-mock can run LUA, cjson is not available | ||
if (this.options.pkg === "ioredis-mock") return; | ||
const luaDir = path.join(__dirname, "..", "..", "lua"); | ||
@@ -104,0 +109,0 @@ |
@@ -271,6 +271,4 @@ // To read notes about the leader locking scheme, check out: | ||
const key = this.connection.key("delayed:" + timestamp); | ||
await this.connection.redis.watch(key); | ||
await this.connection.redis.watch( | ||
this.connection.key("delayed_queue_schedule") | ||
); | ||
await this.watchIfPossible(key); | ||
await this.watchIfPossible(this.connection.key("delayed_queue_schedule")); | ||
const length = await this.connection.redis.llen(key); | ||
@@ -284,3 +282,3 @@ if (length === 0) { | ||
} | ||
await this.connection.redis.unwatch(); | ||
await this.unwatchIfPossible(); | ||
} | ||
@@ -330,2 +328,14 @@ | ||
} | ||
private async watchIfPossible(key: string) { | ||
if (typeof this.connection.redis.watch === "function") { | ||
return this.connection.redis.watch(key); | ||
} | ||
} | ||
private async unwatchIfPossible() { | ||
if (typeof this.connection.redis.unwatch === "function") { | ||
return this.connection.redis.unwatch(); | ||
} | ||
} | ||
} |
@@ -425,10 +425,27 @@ import { EventEmitter } from "events"; | ||
const encodedJob: string = await this.connection.redis["popAndStoreJob"]( | ||
queueKey, | ||
workerKey, | ||
new Date().toString(), | ||
this.queue, | ||
this.name | ||
); | ||
let encodedJob: string; | ||
if (this.connection.redis["popAndStoreJob"]) { | ||
encodedJob = await this.connection.redis["popAndStoreJob"]( | ||
queueKey, | ||
workerKey, | ||
new Date().toString(), | ||
this.queue, | ||
this.name | ||
); | ||
} else { | ||
encodedJob = await this.connection.redis.lpop(queueKey); | ||
if (encodedJob) { | ||
await this.connection.redis.set( | ||
workerKey, | ||
JSON.stringify({ | ||
run_at: new Date().toString(), | ||
queue: this.queue, | ||
worker: this.name, | ||
payload: JSON.parse(encodedJob), | ||
}) | ||
); | ||
} | ||
} | ||
if (encodedJob) currentJob = JSON.parse(encodedJob); | ||
@@ -435,0 +452,0 @@ |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
677687
106
8464
11