node-resque
Advanced tools
Comparing version 9.0.2 to 9.0.3
@@ -16,27 +16,2 @@ import * as Ioredis from "ioredis"; | ||
test( | ||
"can provide an error if connection failed", | ||
async (resolve) => { | ||
const connectionDetails = { | ||
pkg: specHelper.connectionDetails.pkg, | ||
host: "wrong-hostname", | ||
password: specHelper.connectionDetails.password, | ||
port: specHelper.connectionDetails.port, | ||
database: specHelper.connectionDetails.database, | ||
namespace: specHelper.connectionDetails.namespace, | ||
}; | ||
const connection = new Connection(connectionDetails); | ||
connection.connect(); | ||
connection.on("error", (error) => { | ||
expect(error.message).toMatch(/ENOTFOUND|ETIMEDOUT|ECONNREFUSED/); | ||
connection.end(); | ||
resolve(); | ||
}); | ||
}, | ||
30 * 1000 | ||
); | ||
test("should stat with no redis keys in the namespace", async () => { | ||
@@ -84,17 +59,15 @@ const keys = await specHelper.redis.keys(specHelper.namespace + "*"); | ||
// seed the DB with keys to test with | ||
await Promise.all( | ||
new Array(25) | ||
.fill(0) | ||
.map((v, i) => i + 1) | ||
.map(async (v) => { | ||
await connection.redis.set(`test-key${v}`, v.toString()); | ||
if (v <= 5) { | ||
await connection.redis.set(`test-not-key${v}`, v.toString()); | ||
} | ||
}) | ||
); | ||
for (const v of new Array(5).fill(0).map((v, i) => i + 1)) { | ||
await connection.redis.set(`test-key${v}`, v.toString()); | ||
await connection.redis.set(`test-not-key${v}`, v.toString()); | ||
} | ||
await connection.redis.set(`test-key2`, 2); | ||
await connection.redis.set(`test-key3`, 3); | ||
await connection.redis.set(`test-key4`, 4); | ||
await connection.redis.set(`test-key5`, 5); | ||
// sanity checks to confirm keys above are set and exist | ||
expect(await connection.redis.get("test-key1")).toBe("1"); | ||
expect(await connection.redis.get("test-key20")).toBe("20"); | ||
expect(await connection.redis.get("test-not-key1")).toBe("1"); | ||
@@ -105,7 +78,5 @@ expect(await connection.redis.get("test-not-key5")).toBe("5"); | ||
expect(foundKeys.length).toBe(25); | ||
expect(foundKeys.length).toBe(5); | ||
expect(foundKeys).toContain("test-key1"); | ||
expect(foundKeys).toContain("test-key5"); | ||
expect(foundKeys).toContain("test-key20"); | ||
expect(foundKeys).toContain("test-key25"); | ||
expect(foundKeys).not.toContain("test-key50"); | ||
@@ -112,0 +83,0 @@ expect(foundKeys).not.toContain("test-not-key1"); |
@@ -132,17 +132,22 @@ import specHelper from "../utils/specHelper"; | ||
test("should pass on all worker emits to the instance of multiWorker", async (resolve) => { | ||
await queue.enqueue(specHelper.queue, "missingJob", []); | ||
test("should pass on all worker emits to the instance of multiWorker", async () => { | ||
await new Promise(async (resolve) => { | ||
await queue.enqueue(specHelper.queue, "missingJob", []); | ||
multiWorker.start(); | ||
multiWorker.start(); | ||
multiWorker.on("failure", async (workerId, queue, job, error, duration) => { | ||
expect(String(error)).toBe( | ||
'Error: No job defined for class "missingJob"' | ||
multiWorker.on( | ||
"failure", | ||
async (workerId, queue, job, error, duration) => { | ||
expect(String(error)).toBe( | ||
'Error: No job defined for class "missingJob"' | ||
); | ||
expect(duration).toBeGreaterThanOrEqual(0); | ||
multiWorker.removeAllListeners("error"); | ||
await multiWorker.end(); | ||
resolve(null); | ||
} | ||
); | ||
expect(duration).toBeGreaterThanOrEqual(0); | ||
multiWorker.removeAllListeners("error"); | ||
await multiWorker.end(); | ||
resolve(); | ||
}); | ||
}); | ||
}); |
@@ -19,30 +19,2 @@ import { Queue, Worker } from "../../src"; | ||
test( | ||
"can provide an error if connection failed", | ||
async (resolve) => { | ||
const connectionDetails = { | ||
pkg: specHelper.connectionDetails.pkg, | ||
host: "wronghostname", | ||
password: specHelper.connectionDetails.password, | ||
port: specHelper.connectionDetails.port, | ||
database: specHelper.connectionDetails.database, | ||
namespace: specHelper.connectionDetails.namespace, | ||
}; | ||
queue = new Queue( | ||
{ connection: connectionDetails, queue: specHelper.queue }, | ||
{} | ||
); | ||
queue.connect(); | ||
queue.on("error", (error) => { | ||
expect(error.message).toMatch(/ENOTFOUND|ETIMEDOUT|ECONNREFUSED/); | ||
queue.end(); | ||
resolve(); | ||
}); | ||
}, | ||
30 * 1000 | ||
); | ||
describe("[with connection]", () => { | ||
@@ -584,107 +556,117 @@ beforeAll(async () => { | ||
test("we can see what workers are working on (active)", async (resolve) => { | ||
queue.enqueue(specHelper.queue, "slowJob"); | ||
workerA.start(); | ||
test("we can see what workers are working on (active)", async () => { | ||
await new Promise(async (resolve) => { | ||
queue.enqueue(specHelper.queue, "slowJob"); | ||
workerA.start(); | ||
workerA.on("job", async () => { | ||
workerA.removeAllListeners("job"); | ||
workerA.on("job", async () => { | ||
workerA.removeAllListeners("job"); | ||
const data = await queue.allWorkingOn(); | ||
expect(data).toHaveProperty("workerB", "started"); | ||
const paylaod = data.workerA.payload; | ||
expect(paylaod.queue).toBe("test_queue"); | ||
expect(paylaod.class).toBe("slowJob"); | ||
const data = await queue.allWorkingOn(); | ||
expect(data).toHaveProperty("workerB", "started"); | ||
const paylaod = data.workerA.payload; | ||
expect(paylaod.queue).toBe("test_queue"); | ||
expect(paylaod.class).toBe("slowJob"); | ||
return resolve(); | ||
return resolve(null); | ||
}); | ||
}); | ||
}); | ||
test("can remove stuck workers and re-enqueue their jobs", async (resolve) => { | ||
const age = 1; | ||
await queue.enqueue(specHelper.queue, "slowJob", [{ a: 1 }]); | ||
await workerA.start(); | ||
test("can remove stuck workers and re-enqueue their jobs", async () => { | ||
await new Promise(async (resolve) => { | ||
const age = 1; | ||
await queue.enqueue(specHelper.queue, "slowJob", [{ a: 1 }]); | ||
await workerA.start(); | ||
// hijack a worker in the middle of working on a job | ||
workerA.on("job", async () => { | ||
workerA.removeAllListeners("job"); | ||
// hijack a worker in the middle of working on a job | ||
workerA.on("job", async () => { | ||
workerA.removeAllListeners("job"); | ||
const workingOnData = await queue.allWorkingOn(); | ||
const paylaod = workingOnData.workerA.payload; | ||
expect(paylaod.queue).toBe("test_queue"); | ||
expect(paylaod.class).toBe("slowJob"); | ||
expect(paylaod.args[0].a).toBe(1); | ||
const workingOnData = await queue.allWorkingOn(); | ||
const paylaod = workingOnData.workerA.payload; | ||
expect(paylaod.queue).toBe("test_queue"); | ||
expect(paylaod.class).toBe("slowJob"); | ||
expect(paylaod.args[0].a).toBe(1); | ||
const runAt = Date.parse(workingOnData.workerA.run_at); | ||
const now = new Date().getTime(); | ||
expect(runAt).toBeGreaterThanOrEqual(now - 1001); | ||
expect(runAt).toBeLessThanOrEqual(now); | ||
const runAt = Date.parse(workingOnData.workerA.run_at); | ||
const now = new Date().getTime(); | ||
expect(runAt).toBeGreaterThanOrEqual(now - 1001); | ||
expect(runAt).toBeLessThanOrEqual(now); | ||
const cleanData = await queue.cleanOldWorkers(age); | ||
expect(Object.keys(cleanData).length).toBe(1); | ||
expect(cleanData.workerA.queue).toBe("test_queue"); | ||
expect(cleanData.workerA.worker).toBe("workerA"); | ||
expect(cleanData.workerA.payload.class).toBe("slowJob"); | ||
expect(cleanData.workerA.payload.args[0].a).toBe(1); | ||
const cleanData = await queue.cleanOldWorkers(age); | ||
expect(Object.keys(cleanData).length).toBe(1); | ||
expect(cleanData.workerA.queue).toBe("test_queue"); | ||
expect(cleanData.workerA.worker).toBe("workerA"); | ||
expect(cleanData.workerA.payload.class).toBe("slowJob"); | ||
expect(cleanData.workerA.payload.args[0].a).toBe(1); | ||
let failedData = await specHelper.redis.rpop( | ||
specHelper.namespace + ":" + "failed" | ||
); | ||
failedData = JSON.parse(failedData); | ||
expect(failedData.queue).toBe(specHelper.queue); | ||
expect(failedData.exception).toBe("Worker Timeout (killed manually)"); | ||
expect(failedData.error).toBe("Worker Timeout (killed manually)"); | ||
expect(failedData.payload.class).toBe("slowJob"); | ||
expect(failedData.payload.args[0].a).toBe(1); | ||
let failedData = await specHelper.redis.rpop( | ||
specHelper.namespace + ":" + "failed" | ||
); | ||
failedData = JSON.parse(failedData); | ||
expect(failedData.queue).toBe(specHelper.queue); | ||
expect(failedData.exception).toBe( | ||
"Worker Timeout (killed manually)" | ||
); | ||
expect(failedData.error).toBe("Worker Timeout (killed manually)"); | ||
expect(failedData.payload.class).toBe("slowJob"); | ||
expect(failedData.payload.args[0].a).toBe(1); | ||
const workingOnDataAgain = await queue.allWorkingOn(); | ||
expect(Object.keys(workingOnDataAgain).length).toBe(1); | ||
expect(workingOnDataAgain.workerB).toBe("started"); | ||
const workingOnDataAgain = await queue.allWorkingOn(); | ||
expect(Object.keys(workingOnDataAgain).length).toBe(1); | ||
expect(workingOnDataAgain.workerB).toBe("started"); | ||
return resolve(); | ||
return resolve(null); | ||
}); | ||
}); | ||
}); | ||
test("will not remove stuck jobs within the time limit", async (resolve) => { | ||
const age = 999; | ||
queue.enqueue(specHelper.queue, "slowJob"); | ||
workerA.start(); | ||
test("will not remove stuck jobs within the time limit", async () => { | ||
await new Promise(async (resolve) => { | ||
const age = 999; | ||
queue.enqueue(specHelper.queue, "slowJob"); | ||
workerA.start(); | ||
// hijack a worker in the middle of working on a job | ||
workerA.on("job", async () => { | ||
workerA.removeAllListeners("job"); | ||
// hijack a worker in the middle of working on a job | ||
workerA.on("job", async () => { | ||
workerA.removeAllListeners("job"); | ||
const data = await queue.cleanOldWorkers(age); | ||
expect(Object.keys(data).length).toBe(0); | ||
const data = await queue.cleanOldWorkers(age); | ||
expect(Object.keys(data).length).toBe(0); | ||
const workingOn = await queue.allWorkingOn(); | ||
const paylaod = workingOn.workerA.payload; | ||
expect(paylaod.queue).toBe("test_queue"); | ||
expect(paylaod.class).toBe("slowJob"); | ||
const workingOn = await queue.allWorkingOn(); | ||
const paylaod = workingOn.workerA.payload; | ||
expect(paylaod.queue).toBe("test_queue"); | ||
expect(paylaod.class).toBe("slowJob"); | ||
return resolve(); | ||
return resolve(null); | ||
}); | ||
}); | ||
}); | ||
test("can forceClean a worker, returning the error payload", async (resolve) => { | ||
queue.enqueue(specHelper.queue, "slowJob"); | ||
workerA.start(); | ||
test("can forceClean a worker, returning the error payload", async () => { | ||
await new Promise(async (resolve) => { | ||
queue.enqueue(specHelper.queue, "slowJob"); | ||
workerA.start(); | ||
// hijack a worker in the middle of working on a job | ||
workerA.on("job", async () => { | ||
workerA.removeAllListeners("job"); | ||
// hijack a worker in the middle of working on a job | ||
workerA.on("job", async () => { | ||
workerA.removeAllListeners("job"); | ||
const errorPayload = await queue.forceCleanWorker(workerA.name); | ||
const errorPayload = await queue.forceCleanWorker(workerA.name); | ||
expect(errorPayload.worker).toBe("workerA"); | ||
expect(errorPayload.queue).toBe("test_queue"); | ||
expect(errorPayload.payload.class).toBe("slowJob"); | ||
expect(errorPayload.exception).toBe( | ||
"Worker Timeout (killed manually)" | ||
); | ||
expect(errorPayload.backtrace[0]).toMatch(/killed by/); | ||
expect(errorPayload.backtrace[1]).toBe("queue#forceCleanWorker"); | ||
expect(errorPayload.backtrace[2]).toBe("node-resque"); | ||
expect(errorPayload.failed_at).toBeTruthy(); | ||
expect(errorPayload.worker).toBe("workerA"); | ||
expect(errorPayload.queue).toBe("test_queue"); | ||
expect(errorPayload.payload.class).toBe("slowJob"); | ||
expect(errorPayload.exception).toBe( | ||
"Worker Timeout (killed manually)" | ||
); | ||
expect(errorPayload.backtrace[0]).toMatch(/killed by/); | ||
expect(errorPayload.backtrace[1]).toBe("queue#forceCleanWorker"); | ||
expect(errorPayload.backtrace[2]).toBe("node-resque"); | ||
expect(errorPayload.failed_at).toBeTruthy(); | ||
return resolve(); | ||
return resolve(null); | ||
}); | ||
}); | ||
@@ -711,20 +693,22 @@ }); | ||
test("retryStuckJobs", async (resolve) => { | ||
queue.enqueue(specHelper.queue, "slowJob"); | ||
workerA.start(); | ||
test("retryStuckJobs", async () => { | ||
await new Promise(async (resolve) => { | ||
queue.enqueue(specHelper.queue, "slowJob"); | ||
workerA.start(); | ||
// hijack a worker in the middle of working on a job | ||
workerA.on("job", async () => { | ||
workerA.removeAllListeners("job"); | ||
// hijack a worker in the middle of working on a job | ||
workerA.on("job", async () => { | ||
workerA.removeAllListeners("job"); | ||
await queue.forceCleanWorker(workerA.name); | ||
let failedJobs = await queue.failed(0, 100); | ||
expect(failedJobs.length).toBe(1); | ||
await queue.forceCleanWorker(workerA.name); | ||
let failedJobs = await queue.failed(0, 100); | ||
expect(failedJobs.length).toBe(1); | ||
await queue.retryStuckJobs(); | ||
await queue.retryStuckJobs(); | ||
failedJobs = await queue.failed(0, 100); | ||
expect(failedJobs.length).toBe(0); | ||
failedJobs = await queue.failed(0, 100); | ||
expect(failedJobs.length).toBe(0); | ||
return resolve(); | ||
return resolve(null); | ||
}); | ||
}); | ||
@@ -731,0 +715,0 @@ }); |
@@ -18,44 +18,5 @@ import { Queue, Scheduler, Worker } from "../../src"; | ||
describe("with specHelper", () => { | ||
beforeAll(async () => { | ||
await specHelper.connect(); | ||
}); | ||
afterAll(async () => { | ||
await specHelper.disconnect(); | ||
}); | ||
beforeAll(async () => await specHelper.connect()); | ||
afterAll(async () => await specHelper.disconnect()); | ||
test( | ||
"can provide an error if connection failed", | ||
async (done) => { | ||
const connectionDetails = { | ||
pkg: specHelper.connectionDetails.pkg, | ||
host: "wronghostname", | ||
password: specHelper.connectionDetails.password, | ||
port: specHelper.connectionDetails.port, | ||
database: specHelper.connectionDetails.database, | ||
namespace: specHelper.connectionDetails.namespace, | ||
}; | ||
const brokenScheduler = new Scheduler({ | ||
connection: connectionDetails, | ||
timeout: specHelper.timeout, | ||
}); | ||
brokenScheduler.on("poll", () => { | ||
throw new Error("Should not emit poll"); | ||
}); | ||
brokenScheduler.on("leader", () => { | ||
throw new Error("Should not emit leader"); | ||
}); | ||
brokenScheduler.on("error", async (error) => { | ||
expect(error.message).toMatch(/ENOTFOUND|ETIMEDOUT|ECONNREFUSED/); | ||
await brokenScheduler.end(); | ||
done(); | ||
}); | ||
brokenScheduler.connect(); | ||
}, | ||
30 * 1000 | ||
); | ||
describe("locking", () => { | ||
@@ -188,43 +149,46 @@ beforeEach(async () => { | ||
await scheduler.end(); | ||
await worker.end(); | ||
}); | ||
test("will remove stuck workers and fail thier jobs", async (done) => { | ||
await scheduler.connect(); | ||
await scheduler.start(); | ||
await worker.start(); | ||
test("will remove stuck workers and fail their jobs", async () => { | ||
await new Promise(async (resolve) => { | ||
await scheduler.connect(); | ||
await scheduler.start(); | ||
await worker.start(); | ||
const workers = await queue.allWorkingOn(); | ||
const h = {}; | ||
h[worker.name] = "started"; | ||
expect(workers).toEqual(h); | ||
const workers = await queue.allWorkingOn(); | ||
const h = {}; | ||
h[worker.name] = "started"; | ||
expect(workers).toEqual(h); | ||
await queue.enqueue("stuckJobs", "stuck", ["oh no!"]); | ||
await queue.enqueue("stuckJobs", "stuck", ["oh no!"]); | ||
scheduler.on( | ||
"cleanStuckWorker", | ||
async (workerName, errorPayload, delta) => { | ||
// response data should contain failure | ||
expect(workerName).toEqual(worker.name); | ||
expect(errorPayload.worker).toEqual(worker.name); | ||
expect(errorPayload.error).toEqual( | ||
"Worker Timeout (killed manually)" | ||
); | ||
scheduler.on( | ||
"cleanStuckWorker", | ||
async (workerName, errorPayload, delta) => { | ||
// response data should contain failure | ||
expect(workerName).toEqual(worker.name); | ||
expect(errorPayload.worker).toEqual(worker.name); | ||
expect(errorPayload.error).toEqual( | ||
"Worker Timeout (killed manually)" | ||
); | ||
// check the workers list, should be empty now | ||
expect(await queue.allWorkingOn()).toEqual({}); | ||
// check the workers list, should be empty now | ||
expect(await queue.allWorkingOn()).toEqual({}); | ||
// check the failed list | ||
let failed = await specHelper.redis.rpop( | ||
specHelper.namespace + ":" + "failed" | ||
); | ||
failed = JSON.parse(failed); | ||
expect(failed.queue).toBe("stuckJobs"); | ||
expect(failed.exception).toBe("Worker Timeout (killed manually)"); | ||
expect(failed.error).toBe("Worker Timeout (killed manually)"); | ||
// check the failed list | ||
let failed = await specHelper.redis.rpop( | ||
specHelper.namespace + ":" + "failed" | ||
); | ||
failed = JSON.parse(failed); | ||
expect(failed.queue).toBe("stuckJobs"); | ||
expect(failed.exception).toBe( | ||
"Worker Timeout (killed manually)" | ||
); | ||
expect(failed.error).toBe("Worker Timeout (killed manually)"); | ||
scheduler.removeAllListeners("cleanStuckWorker"); | ||
done(); | ||
} | ||
); | ||
scheduler.removeAllListeners("cleanStuckWorker"); | ||
resolve(null); | ||
} | ||
); | ||
}); | ||
}); | ||
@@ -231,0 +195,0 @@ }); |
@@ -59,34 +59,2 @@ import { Queue, Worker } from "../../src"; | ||
test( | ||
"can provide an error if connection failed", | ||
async (done) => { | ||
const connectionDetails = { | ||
pkg: specHelper.connectionDetails.pkg, | ||
host: "wronghostname", | ||
password: specHelper.connectionDetails.password, | ||
port: specHelper.connectionDetails.port, | ||
database: specHelper.connectionDetails.database, | ||
namespace: specHelper.connectionDetails.namespace, | ||
}; | ||
const worker = new Worker( | ||
{ | ||
connection: connectionDetails, | ||
timeout: specHelper.timeout, | ||
queues: [specHelper.queue], | ||
}, | ||
{} | ||
); | ||
worker.on("error", async (error) => { | ||
expect(error.message).toMatch(/ENOTFOUND|ETIMEDOUT|ECONNREFUSED/); | ||
await worker.end(); | ||
done(); | ||
}); | ||
worker.connect(); | ||
}, | ||
30 * 1000 | ||
); | ||
describe("performInline", () => { | ||
@@ -169,29 +137,31 @@ beforeAll(() => { | ||
describe("integration", () => { | ||
test("will notice new job queues when started with queues=*", async (done) => { | ||
const wildcardWorker = new Worker( | ||
{ | ||
connection: specHelper.connectionDetails, | ||
timeout: specHelper.timeout, | ||
queues: ["*"], | ||
}, | ||
jobs | ||
); | ||
test("will notice new job queues when started with queues=*", async () => { | ||
await new Promise(async (resolve) => { | ||
const wildcardWorker = new Worker( | ||
{ | ||
connection: specHelper.connectionDetails, | ||
timeout: specHelper.timeout, | ||
queues: ["*"], | ||
}, | ||
jobs | ||
); | ||
await wildcardWorker.connect(); | ||
await wildcardWorker.start(); | ||
await wildcardWorker.connect(); | ||
await wildcardWorker.start(); | ||
setTimeout(async () => { | ||
await queue.enqueue("__newQueue", "add", [1, 2]); | ||
}, 501); | ||
setTimeout(async () => { | ||
await queue.enqueue("__newQueue", "add", [1, 2]); | ||
}, 501); | ||
wildcardWorker.on("success", async (q, job, result, duration) => { | ||
expect(q).toBe("__newQueue"); | ||
expect(job.class).toBe("add"); | ||
expect(result).toBe(3); | ||
expect(wildcardWorker.result).toBe(result); | ||
expect(duration).toBeGreaterThanOrEqual(0); | ||
wildcardWorker.on("success", async (q, job, result, duration) => { | ||
expect(q).toBe("__newQueue"); | ||
expect(job.class).toBe("add"); | ||
expect(result).toBe(3); | ||
expect(wildcardWorker.result).toBe(result); | ||
expect(duration).toBeGreaterThanOrEqual(0); | ||
wildcardWorker.removeAllListeners("success"); | ||
await wildcardWorker.end(); | ||
done(); | ||
wildcardWorker.removeAllListeners("success"); | ||
await wildcardWorker.end(); | ||
resolve(null); | ||
}); | ||
}); | ||
@@ -217,31 +187,35 @@ }); | ||
test("will mark a job as failed", async (done) => { | ||
await queue.enqueue(specHelper.queue, "badAdd", [1, 2]); | ||
test("will mark a job as failed", async () => { | ||
await new Promise(async (resolve) => { | ||
await queue.enqueue(specHelper.queue, "badAdd", [1, 2]); | ||
await worker.start(); | ||
await worker.start(); | ||
worker.on("failure", (q, job, failire) => { | ||
expect(q).toBe(specHelper.queue); | ||
expect(job.class).toBe("badAdd"); | ||
expect(failire.message).toBe("Blue Smoke"); | ||
worker.on("failure", (q, job, failire) => { | ||
expect(q).toBe(specHelper.queue); | ||
expect(job.class).toBe("badAdd"); | ||
expect(failire.message).toBe("Blue Smoke"); | ||
worker.removeAllListeners("failure"); | ||
done(); | ||
worker.removeAllListeners("failure"); | ||
resolve(null); | ||
}); | ||
}); | ||
}); | ||
test("can work a job and return successful things", async (done) => { | ||
await queue.enqueue(specHelper.queue, "add", [1, 2]); | ||
test("can work a job and return successful things", async () => { | ||
await new Promise(async (resolve) => { | ||
await queue.enqueue(specHelper.queue, "add", [1, 2]); | ||
worker.start(); | ||
worker.start(); | ||
worker.on("success", (q, job, result, duration) => { | ||
expect(q).toBe(specHelper.queue); | ||
expect(job.class).toBe("add"); | ||
expect(result).toBe(3); | ||
expect(worker.result).toBe(result); | ||
expect(duration).toBeGreaterThanOrEqual(0); | ||
worker.on("success", (q, job, result, duration) => { | ||
expect(q).toBe(specHelper.queue); | ||
expect(job.class).toBe("add"); | ||
expect(result).toBe(3); | ||
expect(worker.result).toBe(result); | ||
expect(duration).toBeGreaterThanOrEqual(0); | ||
worker.removeAllListeners("success"); | ||
done(); | ||
worker.removeAllListeners("success"); | ||
resolve(null); | ||
}); | ||
}); | ||
@@ -265,29 +239,33 @@ }); | ||
test("can accept jobs that are simple functions", async (done) => { | ||
await queue.enqueue(specHelper.queue, "quickDefine"); | ||
test("can accept jobs that are simple functions", async () => { | ||
await new Promise(async (resolve) => { | ||
await queue.enqueue(specHelper.queue, "quickDefine"); | ||
worker.start(); | ||
worker.start(); | ||
worker.on("success", (q, job, result, duration) => { | ||
expect(result).toBe("ok"); | ||
expect(duration).toBeGreaterThanOrEqual(0); | ||
worker.removeAllListeners("success"); | ||
done(); | ||
worker.on("success", (q, job, result, duration) => { | ||
expect(result).toBe("ok"); | ||
expect(duration).toBeGreaterThanOrEqual(0); | ||
worker.removeAllListeners("success"); | ||
resolve(null); | ||
}); | ||
}); | ||
}); | ||
test("will not work jobs that are not defined", async (done) => { | ||
await queue.enqueue(specHelper.queue, "somethingFake"); | ||
test("will not work jobs that are not defined", async () => { | ||
await new Promise(async (resolve) => { | ||
await queue.enqueue(specHelper.queue, "somethingFake"); | ||
worker.start(); | ||
worker.start(); | ||
worker.on("failure", (q, job, failure, duration) => { | ||
expect(q).toBe(specHelper.queue); | ||
expect(String(failure)).toBe( | ||
'Error: No job defined for class "somethingFake"' | ||
); | ||
expect(duration).toBeGreaterThanOrEqual(0); | ||
worker.on("failure", (q, job, failure, duration) => { | ||
expect(q).toBe(specHelper.queue); | ||
expect(String(failure)).toBe( | ||
'Error: No job defined for class "somethingFake"' | ||
); | ||
expect(duration).toBeGreaterThanOrEqual(0); | ||
worker.removeAllListeners("failure"); | ||
done(); | ||
worker.removeAllListeners("failure"); | ||
resolve(null); | ||
}); | ||
}); | ||
@@ -306,24 +284,36 @@ }); | ||
test("will ping with status even when working a slow job", async (done) => { | ||
const nowInSeconds = Math.round(new Date().getTime() / 1000); | ||
await worker.start(); | ||
await new Promise((resolve) => | ||
setTimeout(resolve, worker.options.timeout * 2) | ||
); | ||
const pingKey = worker.connection.key("worker", "ping", worker.name); | ||
const firstPayload = JSON.parse(await specHelper.redis.get(pingKey)); | ||
expect(firstPayload.name).toEqual(worker.name); | ||
expect(firstPayload.time).toBeGreaterThanOrEqual(nowInSeconds); | ||
test("will ping with status even when working a slow job", async () => { | ||
await new Promise(async (resolve) => { | ||
const nowInSeconds = Math.round(new Date().getTime() / 1000); | ||
await worker.start(); | ||
await new Promise((resolve) => | ||
setTimeout(resolve, worker.options.timeout * 2) | ||
); | ||
const pingKey = worker.connection.key( | ||
"worker", | ||
"ping", | ||
worker.name | ||
); | ||
const firstPayload = JSON.parse( | ||
await specHelper.redis.get(pingKey) | ||
); | ||
expect(firstPayload.name).toEqual(worker.name); | ||
expect(firstPayload.time).toBeGreaterThanOrEqual(nowInSeconds); | ||
await queue.enqueue(specHelper.queue, "twoSeconds"); | ||
await queue.enqueue(specHelper.queue, "twoSeconds"); | ||
worker.on("success", (q, job, result) => { | ||
expect(result).toBe("slow"); | ||
worker.removeAllListeners("success"); | ||
done(); | ||
worker.on("success", (q, job, result) => { | ||
expect(result).toBe("slow"); | ||
worker.removeAllListeners("success"); | ||
resolve(null); | ||
}); | ||
const secondPayload = JSON.parse( | ||
await specHelper.redis.get(pingKey) | ||
); | ||
expect(secondPayload.name).toEqual(worker.name); | ||
expect(secondPayload.time).toBeGreaterThanOrEqual( | ||
firstPayload.time | ||
); | ||
}); | ||
const secondPayload = JSON.parse(await specHelper.redis.get(pingKey)); | ||
expect(secondPayload.name).toEqual(worker.name); | ||
expect(secondPayload.time).toBeGreaterThanOrEqual(firstPayload.time); | ||
}); | ||
@@ -330,0 +320,0 @@ }); |
@@ -74,15 +74,17 @@ import { Queue, Worker, Scheduler } from "../../src"; | ||
test("the worker can work the job", async (done) => { | ||
await worker.start(); | ||
worker.on("success", async (q, job, result, duration) => { | ||
expect(q).toBe("math"); | ||
expect(job.class).toBe("add"); | ||
expect(result).toBe(3); | ||
expect(worker.result).toBe(result); | ||
expect(duration).toBeGreaterThanOrEqual(0); | ||
test("the worker can work the job", async () => { | ||
await new Promise(async (resolve) => { | ||
await worker.start(); | ||
worker.on("success", async (q, job, result, duration) => { | ||
expect(q).toBe("math"); | ||
expect(job.class).toBe("add"); | ||
expect(result).toBe(3); | ||
expect(worker.result).toBe(result); | ||
expect(duration).toBeGreaterThanOrEqual(0); | ||
worker.removeAllListeners("success"); | ||
done(); | ||
worker.removeAllListeners("success"); | ||
resolve(null); | ||
}); | ||
}); | ||
}); | ||
}); |
@@ -76,15 +76,17 @@ import { Queue, Worker, Scheduler } from "../../src"; | ||
test("the worker can work the job", async (done) => { | ||
await worker.start(); | ||
worker.on("success", async (q, job, result, duration) => { | ||
expect(q).toBe("math"); | ||
expect(job.class).toBe("add"); | ||
expect(result).toBe(3); | ||
expect(worker.result).toBe(result); | ||
expect(duration).toBeGreaterThanOrEqual(0); | ||
test("the worker can work the job", async () => { | ||
await new Promise(async (resolve) => { | ||
await worker.start(); | ||
worker.on("success", async (q, job, result, duration) => { | ||
expect(q).toBe("math"); | ||
expect(job.class).toBe("add"); | ||
expect(result).toBe(3); | ||
expect(worker.result).toBe(result); | ||
expect(duration).toBeGreaterThanOrEqual(0); | ||
worker.removeAllListeners("success"); | ||
done(); | ||
worker.removeAllListeners("success"); | ||
resolve(null); | ||
}); | ||
}); | ||
}); | ||
}); |
@@ -115,206 +115,216 @@ import specHelper from "../utils/specHelper"; | ||
test("allows the key to be specified as a function", async (done) => { | ||
let calls = 0; | ||
test("allows the key to be specified as a function", async () => { | ||
await new Promise(async (resolve) => { | ||
let calls = 0; | ||
const functionJobs = { | ||
jobLockAdd: { | ||
plugins: [Plugins.JobLock], | ||
pluginOptions: { | ||
JobLock: { | ||
key: function () { | ||
// Once to create, once to delete | ||
if (++calls === 2) { | ||
worker1.end(); | ||
done(); | ||
} | ||
const key = this.worker.connection.key( | ||
"customKey", | ||
Math.max.apply(Math.max, this.args) | ||
); | ||
return key; | ||
const functionJobs = { | ||
jobLockAdd: { | ||
plugins: [Plugins.JobLock], | ||
pluginOptions: { | ||
JobLock: { | ||
key: function () { | ||
// Once to create, once to delete | ||
if (++calls === 2) { | ||
worker1.end(); | ||
resolve(null); | ||
} | ||
const key = this.worker.connection.key( | ||
"customKey", | ||
Math.max.apply(Math.max, this.args) | ||
); | ||
return key; | ||
}, | ||
}, | ||
}, | ||
perform: (a, b) => { | ||
return a + b; | ||
}, | ||
}, | ||
perform: (a, b) => { | ||
return a + b; | ||
}; | ||
worker1 = new Worker( | ||
{ | ||
connection: specHelper.cleanConnectionDetails(), | ||
timeout: specHelper.timeout, | ||
queues: [specHelper.queue], | ||
}, | ||
}, | ||
}; | ||
worker1 = new Worker( | ||
{ | ||
connection: specHelper.cleanConnectionDetails(), | ||
timeout: specHelper.timeout, | ||
queues: [specHelper.queue], | ||
}, | ||
functionJobs | ||
); | ||
worker1.on("error", (error) => { | ||
throw error; | ||
functionJobs | ||
); | ||
worker1.on("error", (error) => { | ||
throw error; | ||
}); | ||
await worker1.connect(); | ||
await queue.enqueue(specHelper.queue, "jobLockAdd", [1, 2]); | ||
worker1.start(); | ||
}); | ||
await worker1.connect(); | ||
await queue.enqueue(specHelper.queue, "jobLockAdd", [1, 2]); | ||
worker1.start(); | ||
}); | ||
test("will not run 2 jobs with the same args at the same time", async (done) => { | ||
let count = 0; | ||
worker1 = new Worker( | ||
{ | ||
connection: specHelper.cleanConnectionDetails(), | ||
timeout: specHelper.timeout, | ||
queues: [specHelper.queue], | ||
}, | ||
jobs | ||
); | ||
worker2 = new Worker( | ||
{ | ||
connection: specHelper.cleanConnectionDetails(), | ||
timeout: specHelper.timeout, | ||
queues: [specHelper.queue], | ||
}, | ||
jobs | ||
); | ||
test("will not run 2 jobs with the same args at the same time", async () => { | ||
await new Promise(async (resolve) => { | ||
let count = 0; | ||
worker1 = new Worker( | ||
{ | ||
connection: specHelper.cleanConnectionDetails(), | ||
timeout: specHelper.timeout, | ||
queues: [specHelper.queue], | ||
}, | ||
jobs | ||
); | ||
worker2 = new Worker( | ||
{ | ||
connection: specHelper.cleanConnectionDetails(), | ||
timeout: specHelper.timeout, | ||
queues: [specHelper.queue], | ||
}, | ||
jobs | ||
); | ||
worker1.on("error", (error) => { | ||
throw error; | ||
}); | ||
worker2.on("error", (error) => { | ||
throw error; | ||
}); | ||
worker1.on("error", (error) => { | ||
throw error; | ||
}); | ||
worker2.on("error", (error) => { | ||
throw error; | ||
}); | ||
await worker1.connect(); | ||
await worker2.connect(); | ||
await worker1.connect(); | ||
await worker2.connect(); | ||
const onComplete = async () => { | ||
count++; | ||
expect(count).toBe(1); | ||
await worker1.end(); | ||
await worker2.end(); | ||
const onComplete = async () => { | ||
count++; | ||
expect(count).toBe(1); | ||
await worker1.end(); | ||
await worker2.end(); | ||
const timestamps = await queue.timestamps(); | ||
let dealyedJob = await specHelper.redis.lpop( | ||
specHelper.namespace + ":delayed:" + Math.round(timestamps[0] / 1000) | ||
); | ||
expect(dealyedJob).toBeDefined(); | ||
dealyedJob = JSON.parse(dealyedJob); | ||
expect(dealyedJob.class).toBe("slowAdd"); | ||
expect(dealyedJob.args).toEqual([1, 2]); | ||
const timestamps = await queue.timestamps(); | ||
let dealyedJob = await specHelper.redis.lpop( | ||
specHelper.namespace + | ||
":delayed:" + | ||
Math.round(timestamps[0] / 1000) | ||
); | ||
expect(dealyedJob).toBeDefined(); | ||
dealyedJob = JSON.parse(dealyedJob); | ||
expect(dealyedJob.class).toBe("slowAdd"); | ||
expect(dealyedJob.args).toEqual([1, 2]); | ||
done(); | ||
}; | ||
resolve(null); | ||
}; | ||
worker1.on("success", onComplete); | ||
worker2.on("success", onComplete); | ||
worker1.on("success", onComplete); | ||
worker2.on("success", onComplete); | ||
await queue.enqueue(specHelper.queue, "slowAdd", [1, 2]); | ||
await queue.enqueue(specHelper.queue, "slowAdd", [1, 2]); | ||
await queue.enqueue(specHelper.queue, "slowAdd", [1, 2]); | ||
await queue.enqueue(specHelper.queue, "slowAdd", [1, 2]); | ||
worker1.start(); | ||
worker2.start(); | ||
worker1.start(); | ||
worker2.start(); | ||
}); | ||
}); | ||
test("can be configured not to re-enqueue a duplicate task", async (done) => { | ||
let count = 0; | ||
worker1 = new Worker( | ||
{ | ||
connection: specHelper.cleanConnectionDetails(), | ||
timeout: specHelper.timeout, | ||
queues: [specHelper.queue], | ||
}, | ||
jobs | ||
); | ||
worker2 = new Worker( | ||
{ | ||
connection: specHelper.cleanConnectionDetails(), | ||
timeout: specHelper.timeout, | ||
queues: [specHelper.queue], | ||
}, | ||
jobs | ||
); | ||
test("can be configured not to re-enqueue a duplicate task", async () => { | ||
await new Promise(async (resolve) => { | ||
let count = 0; | ||
worker1 = new Worker( | ||
{ | ||
connection: specHelper.cleanConnectionDetails(), | ||
timeout: specHelper.timeout, | ||
queues: [specHelper.queue], | ||
}, | ||
jobs | ||
); | ||
worker2 = new Worker( | ||
{ | ||
connection: specHelper.cleanConnectionDetails(), | ||
timeout: specHelper.timeout, | ||
queues: [specHelper.queue], | ||
}, | ||
jobs | ||
); | ||
worker1.on("error", (error) => { | ||
throw error; | ||
}); | ||
worker2.on("error", (error) => { | ||
throw error; | ||
}); | ||
worker1.on("error", (error) => { | ||
throw error; | ||
}); | ||
worker2.on("error", (error) => { | ||
throw error; | ||
}); | ||
await worker1.connect(); | ||
await worker2.connect(); | ||
await worker1.connect(); | ||
await worker2.connect(); | ||
const onComplete = async () => { | ||
count++; | ||
expect(count).toBe(1); | ||
await worker1.end(); | ||
await worker2.end(); | ||
const onComplete = async () => { | ||
count++; | ||
expect(count).toBe(1); | ||
await worker1.end(); | ||
await worker2.end(); | ||
const timestamps = await queue.timestamps(); | ||
expect(timestamps).toEqual([]); | ||
const timestamps = await queue.timestamps(); | ||
expect(timestamps).toEqual([]); | ||
done(); | ||
}; | ||
resolve(null); | ||
}; | ||
worker1.on("success", onComplete); | ||
worker2.on("success", onComplete); | ||
worker1.on("success", onComplete); | ||
worker2.on("success", onComplete); | ||
await queue.enqueue(specHelper.queue, "withoutReEnqueue"); | ||
await queue.enqueue(specHelper.queue, "withoutReEnqueue"); | ||
await queue.enqueue(specHelper.queue, "withoutReEnqueue"); | ||
await queue.enqueue(specHelper.queue, "withoutReEnqueue"); | ||
worker1.start(); | ||
worker2.start(); | ||
worker1.start(); | ||
worker2.start(); | ||
}); | ||
}); | ||
test("will run 2 jobs with the different args at the same time", async (done) => { | ||
worker1 = new Worker( | ||
{ | ||
connection: specHelper.cleanConnectionDetails(), | ||
timeout: specHelper.timeout, | ||
queues: [specHelper.queue], | ||
}, | ||
jobs | ||
); | ||
worker2 = new Worker( | ||
{ | ||
connection: specHelper.cleanConnectionDetails(), | ||
timeout: specHelper.timeout, | ||
queues: [specHelper.queue], | ||
}, | ||
jobs | ||
); | ||
test("will run 2 jobs with the different args at the same time", async () => { | ||
await new Promise(async (resolve) => { | ||
worker1 = new Worker( | ||
{ | ||
connection: specHelper.cleanConnectionDetails(), | ||
timeout: specHelper.timeout, | ||
queues: [specHelper.queue], | ||
}, | ||
jobs | ||
); | ||
worker2 = new Worker( | ||
{ | ||
connection: specHelper.cleanConnectionDetails(), | ||
timeout: specHelper.timeout, | ||
queues: [specHelper.queue], | ||
}, | ||
jobs | ||
); | ||
worker1.on("error", (error) => { | ||
throw error; | ||
}); | ||
worker2.on("error", (error) => { | ||
throw error; | ||
}); | ||
worker1.on("error", (error) => { | ||
throw error; | ||
}); | ||
worker2.on("error", (error) => { | ||
throw error; | ||
}); | ||
await worker1.connect(); | ||
await worker2.connect(); | ||
await worker1.connect(); | ||
await worker2.connect(); | ||
const startTime = new Date().getTime(); | ||
let completed = 0; | ||
const startTime = new Date().getTime(); | ||
let completed = 0; | ||
const onComplete = async function (q, job, result) { | ||
completed++; | ||
if (completed === 2) { | ||
await worker1.end(); | ||
await worker2.end(); | ||
const delta = new Date().getTime() - startTime; | ||
expect(delta).toBeLessThan(jobDelay * 3); | ||
done(); | ||
} | ||
}; | ||
const onComplete = async function (q, job, result) { | ||
completed++; | ||
if (completed === 2) { | ||
await worker1.end(); | ||
await worker2.end(); | ||
const delta = new Date().getTime() - startTime; | ||
expect(delta).toBeLessThan(jobDelay * 3); | ||
resolve(null); | ||
} | ||
}; | ||
worker1.on("success", onComplete); | ||
worker2.on("success", onComplete); | ||
worker1.on("success", onComplete); | ||
worker2.on("success", onComplete); | ||
await queue.enqueue(specHelper.queue, "slowAdd", [1, 2]); | ||
await queue.enqueue(specHelper.queue, "slowAdd", [3, 4]); | ||
await queue.enqueue(specHelper.queue, "slowAdd", [1, 2]); | ||
await queue.enqueue(specHelper.queue, "slowAdd", [3, 4]); | ||
worker1.start(); | ||
worker2.start(); | ||
worker1.start(); | ||
worker2.start(); | ||
}); | ||
}); | ||
}); | ||
}); |
@@ -72,61 +72,65 @@ import specHelper from "../utils/specHelper"; | ||
test("will work fine with non-crashing jobs", async (done) => { | ||
await queue.enqueue(specHelper.queue, "happyJob", [1, 2]); | ||
const length = await queue.length(specHelper.queue); | ||
expect(length).toBe(1); | ||
test("will work fine with non-crashing jobs", async () => { | ||
await new Promise(async (resolve) => { | ||
await queue.enqueue(specHelper.queue, "happyJob", [1, 2]); | ||
const length = await queue.length(specHelper.queue); | ||
expect(length).toBe(1); | ||
const worker = new Worker( | ||
{ | ||
connection: specHelper.cleanConnectionDetails(), | ||
timeout: specHelper.timeout, | ||
queues: [specHelper.queue], | ||
}, | ||
jobs | ||
); | ||
const worker = new Worker( | ||
{ | ||
connection: specHelper.cleanConnectionDetails(), | ||
timeout: specHelper.timeout, | ||
queues: [specHelper.queue], | ||
}, | ||
jobs | ||
); | ||
worker.on("success", async () => { | ||
expect(loggedErrors.length).toBe(0); | ||
const length = await specHelper.redis.llen("resque_test:failed"); | ||
expect(length).toBe(0); | ||
await worker.end(); | ||
done(); | ||
}); | ||
worker.on("success", async () => { | ||
expect(loggedErrors.length).toBe(0); | ||
const length = await specHelper.redis.llen("resque_test:failed"); | ||
expect(length).toBe(0); | ||
await worker.end(); | ||
resolve(null); | ||
}); | ||
worker.on("failure", () => { | ||
throw new Error("should never get here"); | ||
worker.on("failure", () => { | ||
throw new Error("should never get here"); | ||
}); | ||
await worker.connect(); | ||
await worker.start(); | ||
}); | ||
await worker.connect(); | ||
await worker.start(); | ||
}); | ||
test("will prevent any failed jobs from ending in the failed queue", async (done) => { | ||
await queue.enqueue(specHelper.queue, "brokenJob", [1, 2]); | ||
const length = await queue.length(specHelper.queue); | ||
expect(length).toBe(1); | ||
test("will prevent any failed jobs from ending in the failed queue", async () => { | ||
await new Promise(async (resolve) => { | ||
await queue.enqueue(specHelper.queue, "brokenJob", [1, 2]); | ||
const length = await queue.length(specHelper.queue); | ||
expect(length).toBe(1); | ||
const worker = new Worker( | ||
{ | ||
connection: specHelper.cleanConnectionDetails(), | ||
timeout: specHelper.timeout, | ||
queues: [specHelper.queue], | ||
}, | ||
jobs | ||
); | ||
const worker = new Worker( | ||
{ | ||
connection: specHelper.cleanConnectionDetails(), | ||
timeout: specHelper.timeout, | ||
queues: [specHelper.queue], | ||
}, | ||
jobs | ||
); | ||
worker.on("success", async () => { | ||
expect(loggedErrors.length).toBe(1); | ||
const length = await specHelper.redis.llen("resque_test:failed"); | ||
expect(length).toBe(0); | ||
await worker.end(); | ||
done(); | ||
}); | ||
worker.on("success", async () => { | ||
expect(loggedErrors.length).toBe(1); | ||
const length = await specHelper.redis.llen("resque_test:failed"); | ||
expect(length).toBe(0); | ||
await worker.end(); | ||
resolve(null); | ||
}); | ||
await worker.connect(); | ||
worker.on("failure", () => { | ||
throw new Error("should never get here"); | ||
await worker.connect(); | ||
worker.on("failure", () => { | ||
throw new Error("should never get here"); | ||
}); | ||
worker.start(); | ||
}); | ||
worker.start(); | ||
}); | ||
}); | ||
}); |
@@ -65,286 +65,300 @@ import specHelper from "../utils/specHelper"; | ||
test("will work fine with non-crashing jobs", async (done) => { | ||
await queue.enqueue(specHelper.queue, "happyJob", [1, 2]); | ||
const length = await queue.length(specHelper.queue); | ||
expect(length).toBe(1); | ||
test("will work fine with non-crashing jobs", async () => { | ||
await new Promise(async (resolve) => { | ||
await queue.enqueue(specHelper.queue, "happyJob", [1, 2]); | ||
const length = await queue.length(specHelper.queue); | ||
expect(length).toBe(1); | ||
const worker = new Worker( | ||
{ | ||
connection: specHelper.cleanConnectionDetails(), | ||
timeout: specHelper.timeout, | ||
queues: [specHelper.queue], | ||
}, | ||
jobs | ||
); | ||
const worker = new Worker( | ||
{ | ||
connection: specHelper.cleanConnectionDetails(), | ||
timeout: specHelper.timeout, | ||
queues: [specHelper.queue], | ||
}, | ||
jobs | ||
); | ||
worker.on("failure", () => { | ||
throw new Error("should not get here"); | ||
}); | ||
worker.on("failure", () => { | ||
throw new Error("should not get here"); | ||
}); | ||
await worker.connect(); | ||
await worker.connect(); | ||
worker.on("success", async () => { | ||
const length = await specHelper.redis.llen( | ||
`${specHelper.namespace}:failed` | ||
); | ||
expect(length).toBe(0); | ||
await worker.end(); | ||
done(); | ||
worker.on("success", async () => { | ||
const length = await specHelper.redis.llen( | ||
`${specHelper.namespace}:failed` | ||
); | ||
expect(length).toBe(0); | ||
await worker.end(); | ||
resolve(null); | ||
}); | ||
worker.start(); | ||
}); | ||
worker.start(); | ||
}); | ||
test("will retry the job n times before finally failing", async (done) => { | ||
await queue.enqueue(specHelper.queue, "brokenJob"); | ||
const length = await queue.length(specHelper.queue); | ||
expect(length).toBe(1); | ||
test("will retry the job n times before finally failing", async () => { | ||
await new Promise(async (resolve) => { | ||
await queue.enqueue(specHelper.queue, "brokenJob"); | ||
const length = await queue.length(specHelper.queue); | ||
expect(length).toBe(1); | ||
let failButRetryCount = 0; | ||
let failureCount = 0; | ||
let failButRetryCount = 0; | ||
let failureCount = 0; | ||
const worker = new Worker( | ||
{ | ||
connection: specHelper.cleanConnectionDetails(), | ||
timeout: specHelper.timeout, | ||
queues: [specHelper.queue], | ||
}, | ||
jobs | ||
); | ||
const worker = new Worker( | ||
{ | ||
connection: specHelper.cleanConnectionDetails(), | ||
timeout: specHelper.timeout, | ||
queues: [specHelper.queue], | ||
}, | ||
jobs | ||
); | ||
worker.on("success", () => { | ||
failButRetryCount++; | ||
}); | ||
worker.on("success", () => { | ||
failButRetryCount++; | ||
}); | ||
await worker.connect(); | ||
await worker.connect(); | ||
worker.on("failure", async () => { | ||
failureCount++; | ||
expect(failButRetryCount).toBe(2); | ||
expect(failureCount).toBe(1); | ||
expect(failButRetryCount + failureCount).toBe(3); | ||
worker.on("failure", async () => { | ||
failureCount++; | ||
expect(failButRetryCount).toBe(2); | ||
expect(failureCount).toBe(1); | ||
expect(failButRetryCount + failureCount).toBe(3); | ||
const length = await specHelper.redis.llen( | ||
`${specHelper.namespace}:failed` | ||
); | ||
expect(length).toBe(1); | ||
await worker.end(); | ||
done(); | ||
const length = await specHelper.redis.llen( | ||
`${specHelper.namespace}:failed` | ||
); | ||
expect(length).toBe(1); | ||
await worker.end(); | ||
resolve(null); | ||
}); | ||
worker.start(); | ||
}); | ||
worker.start(); | ||
}); | ||
test("can have a retry count set", async (done) => { | ||
const customJobs = { | ||
jobWithRetryCount: { | ||
plugins: [Plugins.Retry], | ||
pluginOptions: { | ||
Retry: { | ||
retryLimit: 5, | ||
retryDelay: 100, | ||
test("can have a retry count set", async () => { | ||
await new Promise(async (resolve) => { | ||
const customJobs = { | ||
jobWithRetryCount: { | ||
plugins: [Plugins.Retry], | ||
pluginOptions: { | ||
Retry: { | ||
retryLimit: 5, | ||
retryDelay: 100, | ||
}, | ||
}, | ||
perform: () => { | ||
throw new Error("BUSTED"); | ||
}, | ||
}, | ||
perform: () => { | ||
throw new Error("BUSTED"); | ||
}, | ||
}, | ||
}; | ||
}; | ||
await queue.enqueue(specHelper.queue, "jobWithRetryCount", [1, 2]); | ||
const length = await queue.length(specHelper.queue); | ||
expect(length).toBe(1); | ||
await queue.enqueue(specHelper.queue, "jobWithRetryCount", [1, 2]); | ||
const length = await queue.length(specHelper.queue); | ||
expect(length).toBe(1); | ||
let failButRetryCount = 0; | ||
let failureCount = 0; | ||
let failButRetryCount = 0; | ||
let failureCount = 0; | ||
const worker = new Worker( | ||
{ | ||
connection: specHelper.cleanConnectionDetails(), | ||
timeout: specHelper.timeout, | ||
queues: [specHelper.queue], | ||
}, | ||
customJobs | ||
); | ||
const worker = new Worker( | ||
{ | ||
connection: specHelper.cleanConnectionDetails(), | ||
timeout: specHelper.timeout, | ||
queues: [specHelper.queue], | ||
}, | ||
customJobs | ||
); | ||
worker.on("success", () => { | ||
failButRetryCount++; | ||
}); | ||
worker.on("success", () => { | ||
failButRetryCount++; | ||
}); | ||
await worker.connect(); | ||
await worker.connect(); | ||
worker.on("failure", async () => { | ||
failureCount++; | ||
expect(failButRetryCount).toBe(4); | ||
expect(failureCount).toBe(1); | ||
expect(failButRetryCount + failureCount).toBe(5); | ||
worker.on("failure", async () => { | ||
failureCount++; | ||
expect(failButRetryCount).toBe(4); | ||
expect(failureCount).toBe(1); | ||
expect(failButRetryCount + failureCount).toBe(5); | ||
const length = await specHelper.redis.llen( | ||
`${specHelper.namespace}:failed` | ||
); | ||
expect(length).toBe(1); | ||
await worker.end(); | ||
done(); | ||
const length = await specHelper.redis.llen( | ||
`${specHelper.namespace}:failed` | ||
); | ||
expect(length).toBe(1); | ||
await worker.end(); | ||
resolve(null); | ||
}); | ||
worker.start(); | ||
}); | ||
worker.start(); | ||
}); | ||
test("can have custom retry times set", async (done) => { | ||
const customJobs = { | ||
jobWithBackoffStrategy: { | ||
plugins: [Plugins.Retry], | ||
pluginOptions: { | ||
Retry: { | ||
retryLimit: 5, | ||
backoffStrategy: [1, 2, 3, 4, 5], | ||
test("can have custom retry times set", async () => { | ||
await new Promise(async (resolve) => { | ||
const customJobs = { | ||
jobWithBackoffStrategy: { | ||
plugins: [Plugins.Retry], | ||
pluginOptions: { | ||
Retry: { | ||
retryLimit: 5, | ||
backoffStrategy: [1, 2, 3, 4, 5], | ||
}, | ||
}, | ||
perform: function (a, b, callback) { | ||
callback(new Error("BUSTED"), null); | ||
}, | ||
}, | ||
perform: function (a, b, callback) { | ||
callback(new Error("BUSTED"), null); | ||
}, | ||
}, | ||
}; | ||
}; | ||
await queue.enqueue(specHelper.queue, "jobWithBackoffStrategy", [1, 2]); | ||
const length = await queue.length(specHelper.queue); | ||
expect(length).toBe(1); | ||
await queue.enqueue(specHelper.queue, "jobWithBackoffStrategy", [1, 2]); | ||
const length = await queue.length(specHelper.queue); | ||
expect(length).toBe(1); | ||
let failButRetryCount = 0; | ||
let failureCount = 0; | ||
let failButRetryCount = 0; | ||
let failureCount = 0; | ||
const worker = new Worker( | ||
{ | ||
connection: specHelper.cleanConnectionDetails(), | ||
timeout: specHelper.timeout, | ||
queues: [specHelper.queue], | ||
}, | ||
customJobs | ||
); | ||
const worker = new Worker( | ||
{ | ||
connection: specHelper.cleanConnectionDetails(), | ||
timeout: specHelper.timeout, | ||
queues: [specHelper.queue], | ||
}, | ||
customJobs | ||
); | ||
worker.on("success", () => { | ||
failButRetryCount++; | ||
}); | ||
worker.on("success", () => { | ||
failButRetryCount++; | ||
}); | ||
await worker.connect(); | ||
await worker.connect(); | ||
worker.on("failure", async () => { | ||
failureCount++; | ||
expect(failButRetryCount).toBe(4); | ||
expect(failureCount).toBe(1); | ||
expect(failButRetryCount + failureCount).toBe(5); | ||
worker.on("failure", async () => { | ||
failureCount++; | ||
expect(failButRetryCount).toBe(4); | ||
expect(failureCount).toBe(1); | ||
expect(failButRetryCount + failureCount).toBe(5); | ||
const length = await specHelper.redis.llen( | ||
`${specHelper.namespace}:failed` | ||
); | ||
expect(length).toBe(1); | ||
await worker.end(); | ||
done(); | ||
const length = await specHelper.redis.llen( | ||
`${specHelper.namespace}:failed` | ||
); | ||
expect(length).toBe(1); | ||
await worker.end(); | ||
resolve(null); | ||
}); | ||
worker.start(); | ||
}); | ||
worker.start(); | ||
}); | ||
test("when a job fails it should be re-enqueued (and not go to the failure queue)", async (done) => { | ||
await queue.enqueue(specHelper.queue, "brokenJob", [1, 2]); | ||
test("when a job fails it should be re-enqueued (and not go to the failure queue)", async () => { | ||
await new Promise(async (resolve) => { | ||
await queue.enqueue(specHelper.queue, "brokenJob", [1, 2]); | ||
const worker = new Worker( | ||
{ | ||
connection: specHelper.cleanConnectionDetails(), | ||
timeout: specHelper.timeout, | ||
queues: [specHelper.queue], | ||
}, | ||
jobs | ||
); | ||
const worker = new Worker( | ||
{ | ||
connection: specHelper.cleanConnectionDetails(), | ||
timeout: specHelper.timeout, | ||
queues: [specHelper.queue], | ||
}, | ||
jobs | ||
); | ||
await worker.connect(); | ||
worker.on("success", async () => { | ||
const timestamps = await queue.scheduledAt( | ||
specHelper.queue, | ||
"brokenJob", | ||
[1, 2] | ||
); | ||
expect(timestamps.length).toBe(1); | ||
const length = await specHelper.redis.llen( | ||
`${specHelper.namespace}:failed` | ||
); | ||
expect(length).toBe(0); | ||
await worker.end(); | ||
done(); | ||
await worker.connect(); | ||
worker.on("success", async () => { | ||
const timestamps = await queue.scheduledAt( | ||
specHelper.queue, | ||
"brokenJob", | ||
[1, 2] | ||
); | ||
expect(timestamps.length).toBe(1); | ||
const length = await specHelper.redis.llen( | ||
`${specHelper.namespace}:failed` | ||
); | ||
expect(length).toBe(0); | ||
await worker.end(); | ||
resolve(null); | ||
}); | ||
worker.start(); | ||
}); | ||
worker.start(); | ||
}); | ||
test("will handle the stats properly for failing jobs", async (done) => { | ||
await queue.enqueue(specHelper.queue, "brokenJob", [1, 2]); | ||
test("will handle the stats properly for failing jobs", async () => { | ||
await new Promise(async (resolve) => { | ||
await queue.enqueue(specHelper.queue, "brokenJob", [1, 2]); | ||
const worker = new Worker( | ||
{ | ||
connection: specHelper.cleanConnectionDetails(), | ||
timeout: specHelper.timeout, | ||
queues: [specHelper.queue], | ||
}, | ||
jobs | ||
); | ||
const worker = new Worker( | ||
{ | ||
connection: specHelper.cleanConnectionDetails(), | ||
timeout: specHelper.timeout, | ||
queues: [specHelper.queue], | ||
}, | ||
jobs | ||
); | ||
await worker.connect(); | ||
await worker.connect(); | ||
worker.on("success", async () => { | ||
const globalProcessed = await specHelper.redis.get( | ||
`${specHelper.namespace}:stat:processed` | ||
); | ||
const globalFailed = await specHelper.redis.get( | ||
`${specHelper.namespace}:stat:failed` | ||
); | ||
const workerProcessed = await specHelper.redis.get( | ||
`${specHelper.namespace}:stat:processed:${worker.name}` | ||
); | ||
const workerFailed = await specHelper.redis.get( | ||
`${specHelper.namespace}:stat:failed:${worker.name}` | ||
); | ||
expect(String(globalProcessed)).toBe("0"); | ||
expect(String(globalFailed)).toBe("1"); | ||
expect(String(workerProcessed)).toBe("0"); | ||
expect(String(workerFailed)).toBe("1"); | ||
await worker.end(); | ||
done(); | ||
worker.on("success", async () => { | ||
const globalProcessed = await specHelper.redis.get( | ||
`${specHelper.namespace}:stat:processed` | ||
); | ||
const globalFailed = await specHelper.redis.get( | ||
`${specHelper.namespace}:stat:failed` | ||
); | ||
const workerProcessed = await specHelper.redis.get( | ||
`${specHelper.namespace}:stat:processed:${worker.name}` | ||
); | ||
const workerFailed = await specHelper.redis.get( | ||
`${specHelper.namespace}:stat:failed:${worker.name}` | ||
); | ||
expect(String(globalProcessed)).toBe("0"); | ||
expect(String(globalFailed)).toBe("1"); | ||
expect(String(workerProcessed)).toBe("0"); | ||
expect(String(workerFailed)).toBe("1"); | ||
await worker.end(); | ||
resolve(null); | ||
}); | ||
worker.start(); | ||
}); | ||
worker.start(); | ||
}); | ||
test("will set the retry counter & retry data", async (done) => { | ||
await queue.enqueue(specHelper.queue, "brokenJob", [1, 2]); | ||
test("will set the retry counter & retry data", async () => { | ||
await new Promise(async (resolve) => { | ||
await queue.enqueue(specHelper.queue, "brokenJob", [1, 2]); | ||
const worker = new Worker( | ||
{ | ||
connection: specHelper.cleanConnectionDetails(), | ||
timeout: specHelper.timeout, | ||
queues: [specHelper.queue], | ||
}, | ||
jobs | ||
); | ||
const worker = new Worker( | ||
{ | ||
connection: specHelper.cleanConnectionDetails(), | ||
timeout: specHelper.timeout, | ||
queues: [specHelper.queue], | ||
}, | ||
jobs | ||
); | ||
await worker.connect(); | ||
worker.on("success", async () => { | ||
const retryAttempts = await specHelper.redis.get( | ||
`${specHelper.namespace}:resque-retry:brokenJob:1-2` | ||
); | ||
let failureData = await specHelper.redis.get( | ||
`${specHelper.namespace}:failure-resque-retry:brokenJob:1-2` | ||
); | ||
expect(String(retryAttempts)).toBe("0"); | ||
failureData = JSON.parse(failureData); | ||
expect(failureData.payload).toEqual([1, 2]); | ||
expect(failureData.exception).toBe("Error: BUSTED"); | ||
expect(failureData.worker).toBe("brokenJob"); | ||
expect(failureData.queue).toBe("test_queue"); | ||
await worker.end(); | ||
done(); | ||
await worker.connect(); | ||
worker.on("success", async () => { | ||
const retryAttempts = await specHelper.redis.get( | ||
`${specHelper.namespace}:resque-retry:brokenJob:1-2` | ||
); | ||
let failureData = await specHelper.redis.get( | ||
`${specHelper.namespace}:failure-resque-retry:brokenJob:1-2` | ||
); | ||
expect(String(retryAttempts)).toBe("0"); | ||
failureData = JSON.parse(failureData); | ||
expect(failureData.payload).toEqual([1, 2]); | ||
expect(failureData.exception).toBe("Error: BUSTED"); | ||
expect(failureData.worker).toBe("brokenJob"); | ||
expect(failureData.queue).toBe("test_queue"); | ||
await worker.end(); | ||
resolve(null); | ||
}); | ||
worker.start(); | ||
}); | ||
worker.start(); | ||
}); | ||
}); | ||
}); |
@@ -17,3 +17,3 @@ import * as Redis from "ioredis"; | ||
pkg: pkg, | ||
host: "127.0.0.1", | ||
host: process.env.REDIS_HOST || "127.0.0.1", | ||
password: "", | ||
@@ -20,0 +20,0 @@ port: 6379, |
@@ -20,2 +20,4 @@ /// <reference types="node" /> | ||
working: boolean; | ||
pollTimer: NodeJS.Timeout; | ||
endTimer: NodeJS.Timeout; | ||
pingTimer: NodeJS.Timeout; | ||
@@ -22,0 +24,0 @@ job: Job<any>; |
@@ -42,2 +42,4 @@ "use strict"; | ||
this.job = null; | ||
this.pollTimer = null; | ||
this.endTimer = null; | ||
this.pingTimer = null; | ||
@@ -73,3 +75,3 @@ this.started = false; | ||
await new Promise((resolve) => { | ||
setTimeout(() => { | ||
this.endTimer = setTimeout(() => { | ||
resolve(null); | ||
@@ -80,2 +82,5 @@ }, this.options.timeout); | ||
} | ||
clearTimeout(this.pollTimer); | ||
clearTimeout(this.endTimer); | ||
clearInterval(this.pingTimer); | ||
if (this.connection && | ||
@@ -85,3 +90,2 @@ (this.connection.connected === true || | ||
this.connection.connected === null)) { | ||
clearInterval(this.pingTimer); | ||
await this.untrack(); | ||
@@ -250,3 +254,3 @@ } | ||
await new Promise((resolve) => { | ||
setTimeout(() => { | ||
this.pollTimer = setTimeout(() => { | ||
this.poll(); | ||
@@ -285,2 +289,4 @@ resolve(null); | ||
async ping() { | ||
if (!this.running) | ||
return; | ||
const name = this.name; | ||
@@ -287,0 +293,0 @@ const nowSeconds = Math.round(new Date().getTime() / 1000); |
@@ -6,3 +6,3 @@ { | ||
"license": "Apache-2.0", | ||
"version": "9.0.2", | ||
"version": "9.0.3", | ||
"homepage": "http://github.com/actionhero/node-resque", | ||
@@ -30,3 +30,3 @@ "repository": { | ||
"dependencies": { | ||
"ioredis": "^4.27.2" | ||
"ioredis": "^4.27.6" | ||
}, | ||
@@ -36,20 +36,12 @@ "devDependencies": { | ||
"@types/jest": "^26.0.23", | ||
"@types/node": "^15.3.0", | ||
"ioredis-mock": "^5.5.6", | ||
"jest": "^26.6.3", | ||
"@types/node": "^16.0.0", | ||
"ioredis-mock": "^5.6.0", | ||
"jest": "^27.0.6", | ||
"node-schedule": "^2.0.0", | ||
"prettier": "^2.3.0", | ||
"ts-jest": "^26.5.6", | ||
"prettier": "^2.3.2", | ||
"ts-jest": "^27.0.3", | ||
"ts-node": "^10.0.0", | ||
"typedoc": "^0.20.36", | ||
"typescript": "^4.2.4" | ||
"typedoc": "^0.21.2", | ||
"typescript": "^4.3.5" | ||
}, | ||
"jest": { | ||
"testPathIgnorePatterns": [ | ||
"<rootDir>/__tests__/utils" | ||
], | ||
"transform": { | ||
"^.+\\.ts?$": "ts-jest" | ||
} | ||
}, | ||
"scripts": { | ||
@@ -56,0 +48,0 @@ "prepare": "npm run build && npm run docs", |
@@ -9,3 +9,3 @@ # node-resque: The best background jobs in node. | ||
[![CircleCI](https://circleci.com/gh/actionhero/node-resque.svg?style=svg)](https://circleci.com/gh/actionhero/node-resque) | ||
![Test](https://github.com/actionhero/node-resque/workflows/Test/badge.svg) | ||
@@ -12,0 +12,0 @@ ## The Resque Factory (How It Works) |
@@ -30,2 +30,4 @@ import { EventEmitter } from "events"; | ||
working: boolean; | ||
pollTimer: NodeJS.Timeout; | ||
endTimer: NodeJS.Timeout; | ||
pingTimer: NodeJS.Timeout; | ||
@@ -133,2 +135,4 @@ job: Job<any>; | ||
this.job = null; | ||
this.pollTimer = null; | ||
this.endTimer = null; | ||
this.pingTimer = null; | ||
@@ -173,3 +177,3 @@ this.started = false; | ||
await new Promise((resolve) => { | ||
setTimeout(() => { | ||
this.endTimer = setTimeout(() => { | ||
resolve(null); | ||
@@ -181,2 +185,6 @@ }, this.options.timeout); | ||
clearTimeout(this.pollTimer); | ||
clearTimeout(this.endTimer); | ||
clearInterval(this.pingTimer); | ||
if ( | ||
@@ -188,3 +196,2 @@ this.connection && | ||
) { | ||
clearInterval(this.pingTimer); | ||
await this.untrack(); | ||
@@ -413,3 +420,3 @@ } | ||
await new Promise((resolve) => { | ||
setTimeout(() => { | ||
this.pollTimer = setTimeout(() => { | ||
this.poll(); | ||
@@ -469,2 +476,4 @@ resolve(null); | ||
private async ping() { | ||
if (!this.running) return; | ||
const name = this.name; | ||
@@ -471,0 +480,0 @@ const nowSeconds = Math.round(new Date().getTime() / 1000); |
Sorry, the diff of this file is not supported yet
688880
116
8703
Updatedioredis@^4.27.6