Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

node-resque

Package Overview
Dependencies
Maintainers
4
Versions
181
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

node-resque - npm Package Compare versions

Comparing version 9.0.2 to 9.0.3

__tests__/core/connectionError.ts

51

__tests__/core/connection.ts

@@ -16,27 +16,2 @@ import * as Ioredis from "ioredis";

test(
"can provide an error if connection failed",
async (resolve) => {
const connectionDetails = {
pkg: specHelper.connectionDetails.pkg,
host: "wrong-hostname",
password: specHelper.connectionDetails.password,
port: specHelper.connectionDetails.port,
database: specHelper.connectionDetails.database,
namespace: specHelper.connectionDetails.namespace,
};
const connection = new Connection(connectionDetails);
connection.connect();
connection.on("error", (error) => {
expect(error.message).toMatch(/ENOTFOUND|ETIMEDOUT|ECONNREFUSED/);
connection.end();
resolve();
});
},
30 * 1000
);
test("should stat with no redis keys in the namespace", async () => {

@@ -84,17 +59,15 @@ const keys = await specHelper.redis.keys(specHelper.namespace + "*");

// seed the DB with keys to test with
await Promise.all(
new Array(25)
.fill(0)
.map((v, i) => i + 1)
.map(async (v) => {
await connection.redis.set(`test-key${v}`, v.toString());
if (v <= 5) {
await connection.redis.set(`test-not-key${v}`, v.toString());
}
})
);
for (const v of new Array(5).fill(0).map((v, i) => i + 1)) {
await connection.redis.set(`test-key${v}`, v.toString());
await connection.redis.set(`test-not-key${v}`, v.toString());
}
await connection.redis.set(`test-key2`, 2);
await connection.redis.set(`test-key3`, 3);
await connection.redis.set(`test-key4`, 4);
await connection.redis.set(`test-key5`, 5);
// sanity checks to confirm keys above are set and exist
expect(await connection.redis.get("test-key1")).toBe("1");
expect(await connection.redis.get("test-key20")).toBe("20");
expect(await connection.redis.get("test-not-key1")).toBe("1");

@@ -105,7 +78,5 @@ expect(await connection.redis.get("test-not-key5")).toBe("5");

expect(foundKeys.length).toBe(25);
expect(foundKeys.length).toBe(5);
expect(foundKeys).toContain("test-key1");
expect(foundKeys).toContain("test-key5");
expect(foundKeys).toContain("test-key20");
expect(foundKeys).toContain("test-key25");
expect(foundKeys).not.toContain("test-key50");

@@ -112,0 +83,0 @@ expect(foundKeys).not.toContain("test-not-key1");

@@ -132,17 +132,22 @@ import specHelper from "../utils/specHelper";

test("should pass on all worker emits to the instance of multiWorker", async (resolve) => {
await queue.enqueue(specHelper.queue, "missingJob", []);
test("should pass on all worker emits to the instance of multiWorker", async () => {
await new Promise(async (resolve) => {
await queue.enqueue(specHelper.queue, "missingJob", []);
multiWorker.start();
multiWorker.start();
multiWorker.on("failure", async (workerId, queue, job, error, duration) => {
expect(String(error)).toBe(
'Error: No job defined for class "missingJob"'
multiWorker.on(
"failure",
async (workerId, queue, job, error, duration) => {
expect(String(error)).toBe(
'Error: No job defined for class "missingJob"'
);
expect(duration).toBeGreaterThanOrEqual(0);
multiWorker.removeAllListeners("error");
await multiWorker.end();
resolve(null);
}
);
expect(duration).toBeGreaterThanOrEqual(0);
multiWorker.removeAllListeners("error");
await multiWorker.end();
resolve();
});
});
});

@@ -19,30 +19,2 @@ import { Queue, Worker } from "../../src";

test(
"can provide an error if connection failed",
async (resolve) => {
const connectionDetails = {
pkg: specHelper.connectionDetails.pkg,
host: "wronghostname",
password: specHelper.connectionDetails.password,
port: specHelper.connectionDetails.port,
database: specHelper.connectionDetails.database,
namespace: specHelper.connectionDetails.namespace,
};
queue = new Queue(
{ connection: connectionDetails, queue: specHelper.queue },
{}
);
queue.connect();
queue.on("error", (error) => {
expect(error.message).toMatch(/ENOTFOUND|ETIMEDOUT|ECONNREFUSED/);
queue.end();
resolve();
});
},
30 * 1000
);
describe("[with connection]", () => {

@@ -584,107 +556,117 @@ beforeAll(async () => {

test("we can see what workers are working on (active)", async (resolve) => {
queue.enqueue(specHelper.queue, "slowJob");
workerA.start();
test("we can see what workers are working on (active)", async () => {
await new Promise(async (resolve) => {
queue.enqueue(specHelper.queue, "slowJob");
workerA.start();
workerA.on("job", async () => {
workerA.removeAllListeners("job");
workerA.on("job", async () => {
workerA.removeAllListeners("job");
const data = await queue.allWorkingOn();
expect(data).toHaveProperty("workerB", "started");
const paylaod = data.workerA.payload;
expect(paylaod.queue).toBe("test_queue");
expect(paylaod.class).toBe("slowJob");
const data = await queue.allWorkingOn();
expect(data).toHaveProperty("workerB", "started");
const paylaod = data.workerA.payload;
expect(paylaod.queue).toBe("test_queue");
expect(paylaod.class).toBe("slowJob");
return resolve();
return resolve(null);
});
});
});
test("can remove stuck workers and re-enqueue their jobs", async (resolve) => {
const age = 1;
await queue.enqueue(specHelper.queue, "slowJob", [{ a: 1 }]);
await workerA.start();
test("can remove stuck workers and re-enqueue their jobs", async () => {
await new Promise(async (resolve) => {
const age = 1;
await queue.enqueue(specHelper.queue, "slowJob", [{ a: 1 }]);
await workerA.start();
// hijack a worker in the middle of working on a job
workerA.on("job", async () => {
workerA.removeAllListeners("job");
// hijack a worker in the middle of working on a job
workerA.on("job", async () => {
workerA.removeAllListeners("job");
const workingOnData = await queue.allWorkingOn();
const paylaod = workingOnData.workerA.payload;
expect(paylaod.queue).toBe("test_queue");
expect(paylaod.class).toBe("slowJob");
expect(paylaod.args[0].a).toBe(1);
const workingOnData = await queue.allWorkingOn();
const paylaod = workingOnData.workerA.payload;
expect(paylaod.queue).toBe("test_queue");
expect(paylaod.class).toBe("slowJob");
expect(paylaod.args[0].a).toBe(1);
const runAt = Date.parse(workingOnData.workerA.run_at);
const now = new Date().getTime();
expect(runAt).toBeGreaterThanOrEqual(now - 1001);
expect(runAt).toBeLessThanOrEqual(now);
const runAt = Date.parse(workingOnData.workerA.run_at);
const now = new Date().getTime();
expect(runAt).toBeGreaterThanOrEqual(now - 1001);
expect(runAt).toBeLessThanOrEqual(now);
const cleanData = await queue.cleanOldWorkers(age);
expect(Object.keys(cleanData).length).toBe(1);
expect(cleanData.workerA.queue).toBe("test_queue");
expect(cleanData.workerA.worker).toBe("workerA");
expect(cleanData.workerA.payload.class).toBe("slowJob");
expect(cleanData.workerA.payload.args[0].a).toBe(1);
const cleanData = await queue.cleanOldWorkers(age);
expect(Object.keys(cleanData).length).toBe(1);
expect(cleanData.workerA.queue).toBe("test_queue");
expect(cleanData.workerA.worker).toBe("workerA");
expect(cleanData.workerA.payload.class).toBe("slowJob");
expect(cleanData.workerA.payload.args[0].a).toBe(1);
let failedData = await specHelper.redis.rpop(
specHelper.namespace + ":" + "failed"
);
failedData = JSON.parse(failedData);
expect(failedData.queue).toBe(specHelper.queue);
expect(failedData.exception).toBe("Worker Timeout (killed manually)");
expect(failedData.error).toBe("Worker Timeout (killed manually)");
expect(failedData.payload.class).toBe("slowJob");
expect(failedData.payload.args[0].a).toBe(1);
let failedData = await specHelper.redis.rpop(
specHelper.namespace + ":" + "failed"
);
failedData = JSON.parse(failedData);
expect(failedData.queue).toBe(specHelper.queue);
expect(failedData.exception).toBe(
"Worker Timeout (killed manually)"
);
expect(failedData.error).toBe("Worker Timeout (killed manually)");
expect(failedData.payload.class).toBe("slowJob");
expect(failedData.payload.args[0].a).toBe(1);
const workingOnDataAgain = await queue.allWorkingOn();
expect(Object.keys(workingOnDataAgain).length).toBe(1);
expect(workingOnDataAgain.workerB).toBe("started");
const workingOnDataAgain = await queue.allWorkingOn();
expect(Object.keys(workingOnDataAgain).length).toBe(1);
expect(workingOnDataAgain.workerB).toBe("started");
return resolve();
return resolve(null);
});
});
});
test("will not remove stuck jobs within the time limit", async (resolve) => {
const age = 999;
queue.enqueue(specHelper.queue, "slowJob");
workerA.start();
test("will not remove stuck jobs within the time limit", async () => {
await new Promise(async (resolve) => {
const age = 999;
queue.enqueue(specHelper.queue, "slowJob");
workerA.start();
// hijack a worker in the middle of working on a job
workerA.on("job", async () => {
workerA.removeAllListeners("job");
// hijack a worker in the middle of working on a job
workerA.on("job", async () => {
workerA.removeAllListeners("job");
const data = await queue.cleanOldWorkers(age);
expect(Object.keys(data).length).toBe(0);
const data = await queue.cleanOldWorkers(age);
expect(Object.keys(data).length).toBe(0);
const workingOn = await queue.allWorkingOn();
const paylaod = workingOn.workerA.payload;
expect(paylaod.queue).toBe("test_queue");
expect(paylaod.class).toBe("slowJob");
const workingOn = await queue.allWorkingOn();
const paylaod = workingOn.workerA.payload;
expect(paylaod.queue).toBe("test_queue");
expect(paylaod.class).toBe("slowJob");
return resolve();
return resolve(null);
});
});
});
test("can forceClean a worker, returning the error payload", async (resolve) => {
queue.enqueue(specHelper.queue, "slowJob");
workerA.start();
test("can forceClean a worker, returning the error payload", async () => {
await new Promise(async (resolve) => {
queue.enqueue(specHelper.queue, "slowJob");
workerA.start();
// hijack a worker in the middle of working on a job
workerA.on("job", async () => {
workerA.removeAllListeners("job");
// hijack a worker in the middle of working on a job
workerA.on("job", async () => {
workerA.removeAllListeners("job");
const errorPayload = await queue.forceCleanWorker(workerA.name);
const errorPayload = await queue.forceCleanWorker(workerA.name);
expect(errorPayload.worker).toBe("workerA");
expect(errorPayload.queue).toBe("test_queue");
expect(errorPayload.payload.class).toBe("slowJob");
expect(errorPayload.exception).toBe(
"Worker Timeout (killed manually)"
);
expect(errorPayload.backtrace[0]).toMatch(/killed by/);
expect(errorPayload.backtrace[1]).toBe("queue#forceCleanWorker");
expect(errorPayload.backtrace[2]).toBe("node-resque");
expect(errorPayload.failed_at).toBeTruthy();
expect(errorPayload.worker).toBe("workerA");
expect(errorPayload.queue).toBe("test_queue");
expect(errorPayload.payload.class).toBe("slowJob");
expect(errorPayload.exception).toBe(
"Worker Timeout (killed manually)"
);
expect(errorPayload.backtrace[0]).toMatch(/killed by/);
expect(errorPayload.backtrace[1]).toBe("queue#forceCleanWorker");
expect(errorPayload.backtrace[2]).toBe("node-resque");
expect(errorPayload.failed_at).toBeTruthy();
return resolve();
return resolve(null);
});
});

@@ -711,20 +693,22 @@ });

test("retryStuckJobs", async (resolve) => {
queue.enqueue(specHelper.queue, "slowJob");
workerA.start();
test("retryStuckJobs", async () => {
await new Promise(async (resolve) => {
queue.enqueue(specHelper.queue, "slowJob");
workerA.start();
// hijack a worker in the middle of working on a job
workerA.on("job", async () => {
workerA.removeAllListeners("job");
// hijack a worker in the middle of working on a job
workerA.on("job", async () => {
workerA.removeAllListeners("job");
await queue.forceCleanWorker(workerA.name);
let failedJobs = await queue.failed(0, 100);
expect(failedJobs.length).toBe(1);
await queue.forceCleanWorker(workerA.name);
let failedJobs = await queue.failed(0, 100);
expect(failedJobs.length).toBe(1);
await queue.retryStuckJobs();
await queue.retryStuckJobs();
failedJobs = await queue.failed(0, 100);
expect(failedJobs.length).toBe(0);
failedJobs = await queue.failed(0, 100);
expect(failedJobs.length).toBe(0);
return resolve();
return resolve(null);
});
});

@@ -731,0 +715,0 @@ });

@@ -18,44 +18,5 @@ import { Queue, Scheduler, Worker } from "../../src";

describe("with specHelper", () => {
beforeAll(async () => {
await specHelper.connect();
});
afterAll(async () => {
await specHelper.disconnect();
});
beforeAll(async () => await specHelper.connect());
afterAll(async () => await specHelper.disconnect());
test(
"can provide an error if connection failed",
async (done) => {
const connectionDetails = {
pkg: specHelper.connectionDetails.pkg,
host: "wronghostname",
password: specHelper.connectionDetails.password,
port: specHelper.connectionDetails.port,
database: specHelper.connectionDetails.database,
namespace: specHelper.connectionDetails.namespace,
};
const brokenScheduler = new Scheduler({
connection: connectionDetails,
timeout: specHelper.timeout,
});
brokenScheduler.on("poll", () => {
throw new Error("Should not emit poll");
});
brokenScheduler.on("leader", () => {
throw new Error("Should not emit leader");
});
brokenScheduler.on("error", async (error) => {
expect(error.message).toMatch(/ENOTFOUND|ETIMEDOUT|ECONNREFUSED/);
await brokenScheduler.end();
done();
});
brokenScheduler.connect();
},
30 * 1000
);
describe("locking", () => {

@@ -188,43 +149,46 @@ beforeEach(async () => {

await scheduler.end();
await worker.end();
});
test("will remove stuck workers and fail thier jobs", async (done) => {
await scheduler.connect();
await scheduler.start();
await worker.start();
test("will remove stuck workers and fail their jobs", async () => {
await new Promise(async (resolve) => {
await scheduler.connect();
await scheduler.start();
await worker.start();
const workers = await queue.allWorkingOn();
const h = {};
h[worker.name] = "started";
expect(workers).toEqual(h);
const workers = await queue.allWorkingOn();
const h = {};
h[worker.name] = "started";
expect(workers).toEqual(h);
await queue.enqueue("stuckJobs", "stuck", ["oh no!"]);
await queue.enqueue("stuckJobs", "stuck", ["oh no!"]);
scheduler.on(
"cleanStuckWorker",
async (workerName, errorPayload, delta) => {
// response data should contain failure
expect(workerName).toEqual(worker.name);
expect(errorPayload.worker).toEqual(worker.name);
expect(errorPayload.error).toEqual(
"Worker Timeout (killed manually)"
);
scheduler.on(
"cleanStuckWorker",
async (workerName, errorPayload, delta) => {
// response data should contain failure
expect(workerName).toEqual(worker.name);
expect(errorPayload.worker).toEqual(worker.name);
expect(errorPayload.error).toEqual(
"Worker Timeout (killed manually)"
);
// check the workers list, should be empty now
expect(await queue.allWorkingOn()).toEqual({});
// check the workers list, should be empty now
expect(await queue.allWorkingOn()).toEqual({});
// check the failed list
let failed = await specHelper.redis.rpop(
specHelper.namespace + ":" + "failed"
);
failed = JSON.parse(failed);
expect(failed.queue).toBe("stuckJobs");
expect(failed.exception).toBe("Worker Timeout (killed manually)");
expect(failed.error).toBe("Worker Timeout (killed manually)");
// check the failed list
let failed = await specHelper.redis.rpop(
specHelper.namespace + ":" + "failed"
);
failed = JSON.parse(failed);
expect(failed.queue).toBe("stuckJobs");
expect(failed.exception).toBe(
"Worker Timeout (killed manually)"
);
expect(failed.error).toBe("Worker Timeout (killed manually)");
scheduler.removeAllListeners("cleanStuckWorker");
done();
}
);
scheduler.removeAllListeners("cleanStuckWorker");
resolve(null);
}
);
});
});

@@ -231,0 +195,0 @@ });

@@ -59,34 +59,2 @@ import { Queue, Worker } from "../../src";

test(
"can provide an error if connection failed",
async (done) => {
const connectionDetails = {
pkg: specHelper.connectionDetails.pkg,
host: "wronghostname",
password: specHelper.connectionDetails.password,
port: specHelper.connectionDetails.port,
database: specHelper.connectionDetails.database,
namespace: specHelper.connectionDetails.namespace,
};
const worker = new Worker(
{
connection: connectionDetails,
timeout: specHelper.timeout,
queues: [specHelper.queue],
},
{}
);
worker.on("error", async (error) => {
expect(error.message).toMatch(/ENOTFOUND|ETIMEDOUT|ECONNREFUSED/);
await worker.end();
done();
});
worker.connect();
},
30 * 1000
);
describe("performInline", () => {

@@ -169,29 +137,31 @@ beforeAll(() => {

describe("integration", () => {
test("will notice new job queues when started with queues=*", async (done) => {
const wildcardWorker = new Worker(
{
connection: specHelper.connectionDetails,
timeout: specHelper.timeout,
queues: ["*"],
},
jobs
);
test("will notice new job queues when started with queues=*", async () => {
await new Promise(async (resolve) => {
const wildcardWorker = new Worker(
{
connection: specHelper.connectionDetails,
timeout: specHelper.timeout,
queues: ["*"],
},
jobs
);
await wildcardWorker.connect();
await wildcardWorker.start();
await wildcardWorker.connect();
await wildcardWorker.start();
setTimeout(async () => {
await queue.enqueue("__newQueue", "add", [1, 2]);
}, 501);
setTimeout(async () => {
await queue.enqueue("__newQueue", "add", [1, 2]);
}, 501);
wildcardWorker.on("success", async (q, job, result, duration) => {
expect(q).toBe("__newQueue");
expect(job.class).toBe("add");
expect(result).toBe(3);
expect(wildcardWorker.result).toBe(result);
expect(duration).toBeGreaterThanOrEqual(0);
wildcardWorker.on("success", async (q, job, result, duration) => {
expect(q).toBe("__newQueue");
expect(job.class).toBe("add");
expect(result).toBe(3);
expect(wildcardWorker.result).toBe(result);
expect(duration).toBeGreaterThanOrEqual(0);
wildcardWorker.removeAllListeners("success");
await wildcardWorker.end();
done();
wildcardWorker.removeAllListeners("success");
await wildcardWorker.end();
resolve(null);
});
});

@@ -217,31 +187,35 @@ });

test("will mark a job as failed", async (done) => {
await queue.enqueue(specHelper.queue, "badAdd", [1, 2]);
test("will mark a job as failed", async () => {
await new Promise(async (resolve) => {
await queue.enqueue(specHelper.queue, "badAdd", [1, 2]);
await worker.start();
await worker.start();
worker.on("failure", (q, job, failire) => {
expect(q).toBe(specHelper.queue);
expect(job.class).toBe("badAdd");
expect(failire.message).toBe("Blue Smoke");
worker.on("failure", (q, job, failire) => {
expect(q).toBe(specHelper.queue);
expect(job.class).toBe("badAdd");
expect(failire.message).toBe("Blue Smoke");
worker.removeAllListeners("failure");
done();
worker.removeAllListeners("failure");
resolve(null);
});
});
});
test("can work a job and return successful things", async (done) => {
await queue.enqueue(specHelper.queue, "add", [1, 2]);
test("can work a job and return successful things", async () => {
await new Promise(async (resolve) => {
await queue.enqueue(specHelper.queue, "add", [1, 2]);
worker.start();
worker.start();
worker.on("success", (q, job, result, duration) => {
expect(q).toBe(specHelper.queue);
expect(job.class).toBe("add");
expect(result).toBe(3);
expect(worker.result).toBe(result);
expect(duration).toBeGreaterThanOrEqual(0);
worker.on("success", (q, job, result, duration) => {
expect(q).toBe(specHelper.queue);
expect(job.class).toBe("add");
expect(result).toBe(3);
expect(worker.result).toBe(result);
expect(duration).toBeGreaterThanOrEqual(0);
worker.removeAllListeners("success");
done();
worker.removeAllListeners("success");
resolve(null);
});
});

@@ -265,29 +239,33 @@ });

test("can accept jobs that are simple functions", async (done) => {
await queue.enqueue(specHelper.queue, "quickDefine");
test("can accept jobs that are simple functions", async () => {
await new Promise(async (resolve) => {
await queue.enqueue(specHelper.queue, "quickDefine");
worker.start();
worker.start();
worker.on("success", (q, job, result, duration) => {
expect(result).toBe("ok");
expect(duration).toBeGreaterThanOrEqual(0);
worker.removeAllListeners("success");
done();
worker.on("success", (q, job, result, duration) => {
expect(result).toBe("ok");
expect(duration).toBeGreaterThanOrEqual(0);
worker.removeAllListeners("success");
resolve(null);
});
});
});
test("will not work jobs that are not defined", async (done) => {
await queue.enqueue(specHelper.queue, "somethingFake");
test("will not work jobs that are not defined", async () => {
await new Promise(async (resolve) => {
await queue.enqueue(specHelper.queue, "somethingFake");
worker.start();
worker.start();
worker.on("failure", (q, job, failure, duration) => {
expect(q).toBe(specHelper.queue);
expect(String(failure)).toBe(
'Error: No job defined for class "somethingFake"'
);
expect(duration).toBeGreaterThanOrEqual(0);
worker.on("failure", (q, job, failure, duration) => {
expect(q).toBe(specHelper.queue);
expect(String(failure)).toBe(
'Error: No job defined for class "somethingFake"'
);
expect(duration).toBeGreaterThanOrEqual(0);
worker.removeAllListeners("failure");
done();
worker.removeAllListeners("failure");
resolve(null);
});
});

@@ -306,24 +284,36 @@ });

test("will ping with status even when working a slow job", async (done) => {
const nowInSeconds = Math.round(new Date().getTime() / 1000);
await worker.start();
await new Promise((resolve) =>
setTimeout(resolve, worker.options.timeout * 2)
);
const pingKey = worker.connection.key("worker", "ping", worker.name);
const firstPayload = JSON.parse(await specHelper.redis.get(pingKey));
expect(firstPayload.name).toEqual(worker.name);
expect(firstPayload.time).toBeGreaterThanOrEqual(nowInSeconds);
test("will ping with status even when working a slow job", async () => {
await new Promise(async (resolve) => {
const nowInSeconds = Math.round(new Date().getTime() / 1000);
await worker.start();
await new Promise((resolve) =>
setTimeout(resolve, worker.options.timeout * 2)
);
const pingKey = worker.connection.key(
"worker",
"ping",
worker.name
);
const firstPayload = JSON.parse(
await specHelper.redis.get(pingKey)
);
expect(firstPayload.name).toEqual(worker.name);
expect(firstPayload.time).toBeGreaterThanOrEqual(nowInSeconds);
await queue.enqueue(specHelper.queue, "twoSeconds");
await queue.enqueue(specHelper.queue, "twoSeconds");
worker.on("success", (q, job, result) => {
expect(result).toBe("slow");
worker.removeAllListeners("success");
done();
worker.on("success", (q, job, result) => {
expect(result).toBe("slow");
worker.removeAllListeners("success");
resolve(null);
});
const secondPayload = JSON.parse(
await specHelper.redis.get(pingKey)
);
expect(secondPayload.name).toEqual(worker.name);
expect(secondPayload.time).toBeGreaterThanOrEqual(
firstPayload.time
);
});
const secondPayload = JSON.parse(await specHelper.redis.get(pingKey));
expect(secondPayload.name).toEqual(worker.name);
expect(secondPayload.time).toBeGreaterThanOrEqual(firstPayload.time);
});

@@ -330,0 +320,0 @@ });

@@ -74,15 +74,17 @@ import { Queue, Worker, Scheduler } from "../../src";

test("the worker can work the job", async (done) => {
await worker.start();
worker.on("success", async (q, job, result, duration) => {
expect(q).toBe("math");
expect(job.class).toBe("add");
expect(result).toBe(3);
expect(worker.result).toBe(result);
expect(duration).toBeGreaterThanOrEqual(0);
test("the worker can work the job", async () => {
await new Promise(async (resolve) => {
await worker.start();
worker.on("success", async (q, job, result, duration) => {
expect(q).toBe("math");
expect(job.class).toBe("add");
expect(result).toBe(3);
expect(worker.result).toBe(result);
expect(duration).toBeGreaterThanOrEqual(0);
worker.removeAllListeners("success");
done();
worker.removeAllListeners("success");
resolve(null);
});
});
});
});

@@ -76,15 +76,17 @@ import { Queue, Worker, Scheduler } from "../../src";

test("the worker can work the job", async (done) => {
await worker.start();
worker.on("success", async (q, job, result, duration) => {
expect(q).toBe("math");
expect(job.class).toBe("add");
expect(result).toBe(3);
expect(worker.result).toBe(result);
expect(duration).toBeGreaterThanOrEqual(0);
test("the worker can work the job", async () => {
await new Promise(async (resolve) => {
await worker.start();
worker.on("success", async (q, job, result, duration) => {
expect(q).toBe("math");
expect(job.class).toBe("add");
expect(result).toBe(3);
expect(worker.result).toBe(result);
expect(duration).toBeGreaterThanOrEqual(0);
worker.removeAllListeners("success");
done();
worker.removeAllListeners("success");
resolve(null);
});
});
});
});

@@ -115,206 +115,216 @@ import specHelper from "../utils/specHelper";

test("allows the key to be specified as a function", async (done) => {
let calls = 0;
test("allows the key to be specified as a function", async () => {
await new Promise(async (resolve) => {
let calls = 0;
const functionJobs = {
jobLockAdd: {
plugins: [Plugins.JobLock],
pluginOptions: {
JobLock: {
key: function () {
// Once to create, once to delete
if (++calls === 2) {
worker1.end();
done();
}
const key = this.worker.connection.key(
"customKey",
Math.max.apply(Math.max, this.args)
);
return key;
const functionJobs = {
jobLockAdd: {
plugins: [Plugins.JobLock],
pluginOptions: {
JobLock: {
key: function () {
// Once to create, once to delete
if (++calls === 2) {
worker1.end();
resolve(null);
}
const key = this.worker.connection.key(
"customKey",
Math.max.apply(Math.max, this.args)
);
return key;
},
},
},
perform: (a, b) => {
return a + b;
},
},
perform: (a, b) => {
return a + b;
};
worker1 = new Worker(
{
connection: specHelper.cleanConnectionDetails(),
timeout: specHelper.timeout,
queues: [specHelper.queue],
},
},
};
worker1 = new Worker(
{
connection: specHelper.cleanConnectionDetails(),
timeout: specHelper.timeout,
queues: [specHelper.queue],
},
functionJobs
);
worker1.on("error", (error) => {
throw error;
functionJobs
);
worker1.on("error", (error) => {
throw error;
});
await worker1.connect();
await queue.enqueue(specHelper.queue, "jobLockAdd", [1, 2]);
worker1.start();
});
await worker1.connect();
await queue.enqueue(specHelper.queue, "jobLockAdd", [1, 2]);
worker1.start();
});
test("will not run 2 jobs with the same args at the same time", async (done) => {
let count = 0;
worker1 = new Worker(
{
connection: specHelper.cleanConnectionDetails(),
timeout: specHelper.timeout,
queues: [specHelper.queue],
},
jobs
);
worker2 = new Worker(
{
connection: specHelper.cleanConnectionDetails(),
timeout: specHelper.timeout,
queues: [specHelper.queue],
},
jobs
);
test("will not run 2 jobs with the same args at the same time", async () => {
await new Promise(async (resolve) => {
let count = 0;
worker1 = new Worker(
{
connection: specHelper.cleanConnectionDetails(),
timeout: specHelper.timeout,
queues: [specHelper.queue],
},
jobs
);
worker2 = new Worker(
{
connection: specHelper.cleanConnectionDetails(),
timeout: specHelper.timeout,
queues: [specHelper.queue],
},
jobs
);
worker1.on("error", (error) => {
throw error;
});
worker2.on("error", (error) => {
throw error;
});
worker1.on("error", (error) => {
throw error;
});
worker2.on("error", (error) => {
throw error;
});
await worker1.connect();
await worker2.connect();
await worker1.connect();
await worker2.connect();
const onComplete = async () => {
count++;
expect(count).toBe(1);
await worker1.end();
await worker2.end();
const onComplete = async () => {
count++;
expect(count).toBe(1);
await worker1.end();
await worker2.end();
const timestamps = await queue.timestamps();
let dealyedJob = await specHelper.redis.lpop(
specHelper.namespace + ":delayed:" + Math.round(timestamps[0] / 1000)
);
expect(dealyedJob).toBeDefined();
dealyedJob = JSON.parse(dealyedJob);
expect(dealyedJob.class).toBe("slowAdd");
expect(dealyedJob.args).toEqual([1, 2]);
const timestamps = await queue.timestamps();
let dealyedJob = await specHelper.redis.lpop(
specHelper.namespace +
":delayed:" +
Math.round(timestamps[0] / 1000)
);
expect(dealyedJob).toBeDefined();
dealyedJob = JSON.parse(dealyedJob);
expect(dealyedJob.class).toBe("slowAdd");
expect(dealyedJob.args).toEqual([1, 2]);
done();
};
resolve(null);
};
worker1.on("success", onComplete);
worker2.on("success", onComplete);
worker1.on("success", onComplete);
worker2.on("success", onComplete);
await queue.enqueue(specHelper.queue, "slowAdd", [1, 2]);
await queue.enqueue(specHelper.queue, "slowAdd", [1, 2]);
await queue.enqueue(specHelper.queue, "slowAdd", [1, 2]);
await queue.enqueue(specHelper.queue, "slowAdd", [1, 2]);
worker1.start();
worker2.start();
worker1.start();
worker2.start();
});
});
test("can be configured not to re-enqueue a duplicate task", async (done) => {
let count = 0;
worker1 = new Worker(
{
connection: specHelper.cleanConnectionDetails(),
timeout: specHelper.timeout,
queues: [specHelper.queue],
},
jobs
);
worker2 = new Worker(
{
connection: specHelper.cleanConnectionDetails(),
timeout: specHelper.timeout,
queues: [specHelper.queue],
},
jobs
);
test("can be configured not to re-enqueue a duplicate task", async () => {
await new Promise(async (resolve) => {
let count = 0;
worker1 = new Worker(
{
connection: specHelper.cleanConnectionDetails(),
timeout: specHelper.timeout,
queues: [specHelper.queue],
},
jobs
);
worker2 = new Worker(
{
connection: specHelper.cleanConnectionDetails(),
timeout: specHelper.timeout,
queues: [specHelper.queue],
},
jobs
);
worker1.on("error", (error) => {
throw error;
});
worker2.on("error", (error) => {
throw error;
});
worker1.on("error", (error) => {
throw error;
});
worker2.on("error", (error) => {
throw error;
});
await worker1.connect();
await worker2.connect();
await worker1.connect();
await worker2.connect();
const onComplete = async () => {
count++;
expect(count).toBe(1);
await worker1.end();
await worker2.end();
const onComplete = async () => {
count++;
expect(count).toBe(1);
await worker1.end();
await worker2.end();
const timestamps = await queue.timestamps();
expect(timestamps).toEqual([]);
const timestamps = await queue.timestamps();
expect(timestamps).toEqual([]);
done();
};
resolve(null);
};
worker1.on("success", onComplete);
worker2.on("success", onComplete);
worker1.on("success", onComplete);
worker2.on("success", onComplete);
await queue.enqueue(specHelper.queue, "withoutReEnqueue");
await queue.enqueue(specHelper.queue, "withoutReEnqueue");
await queue.enqueue(specHelper.queue, "withoutReEnqueue");
await queue.enqueue(specHelper.queue, "withoutReEnqueue");
worker1.start();
worker2.start();
worker1.start();
worker2.start();
});
});
test("will run 2 jobs with the different args at the same time", async (done) => {
worker1 = new Worker(
{
connection: specHelper.cleanConnectionDetails(),
timeout: specHelper.timeout,
queues: [specHelper.queue],
},
jobs
);
worker2 = new Worker(
{
connection: specHelper.cleanConnectionDetails(),
timeout: specHelper.timeout,
queues: [specHelper.queue],
},
jobs
);
test("will run 2 jobs with the different args at the same time", async () => {
await new Promise(async (resolve) => {
worker1 = new Worker(
{
connection: specHelper.cleanConnectionDetails(),
timeout: specHelper.timeout,
queues: [specHelper.queue],
},
jobs
);
worker2 = new Worker(
{
connection: specHelper.cleanConnectionDetails(),
timeout: specHelper.timeout,
queues: [specHelper.queue],
},
jobs
);
worker1.on("error", (error) => {
throw error;
});
worker2.on("error", (error) => {
throw error;
});
worker1.on("error", (error) => {
throw error;
});
worker2.on("error", (error) => {
throw error;
});
await worker1.connect();
await worker2.connect();
await worker1.connect();
await worker2.connect();
const startTime = new Date().getTime();
let completed = 0;
const startTime = new Date().getTime();
let completed = 0;
const onComplete = async function (q, job, result) {
completed++;
if (completed === 2) {
await worker1.end();
await worker2.end();
const delta = new Date().getTime() - startTime;
expect(delta).toBeLessThan(jobDelay * 3);
done();
}
};
const onComplete = async function (q, job, result) {
completed++;
if (completed === 2) {
await worker1.end();
await worker2.end();
const delta = new Date().getTime() - startTime;
expect(delta).toBeLessThan(jobDelay * 3);
resolve(null);
}
};
worker1.on("success", onComplete);
worker2.on("success", onComplete);
worker1.on("success", onComplete);
worker2.on("success", onComplete);
await queue.enqueue(specHelper.queue, "slowAdd", [1, 2]);
await queue.enqueue(specHelper.queue, "slowAdd", [3, 4]);
await queue.enqueue(specHelper.queue, "slowAdd", [1, 2]);
await queue.enqueue(specHelper.queue, "slowAdd", [3, 4]);
worker1.start();
worker2.start();
worker1.start();
worker2.start();
});
});
});
});

@@ -72,61 +72,65 @@ import specHelper from "../utils/specHelper";

test("will work fine with non-crashing jobs", async (done) => {
await queue.enqueue(specHelper.queue, "happyJob", [1, 2]);
const length = await queue.length(specHelper.queue);
expect(length).toBe(1);
test("will work fine with non-crashing jobs", async () => {
await new Promise(async (resolve) => {
await queue.enqueue(specHelper.queue, "happyJob", [1, 2]);
const length = await queue.length(specHelper.queue);
expect(length).toBe(1);
const worker = new Worker(
{
connection: specHelper.cleanConnectionDetails(),
timeout: specHelper.timeout,
queues: [specHelper.queue],
},
jobs
);
const worker = new Worker(
{
connection: specHelper.cleanConnectionDetails(),
timeout: specHelper.timeout,
queues: [specHelper.queue],
},
jobs
);
worker.on("success", async () => {
expect(loggedErrors.length).toBe(0);
const length = await specHelper.redis.llen("resque_test:failed");
expect(length).toBe(0);
await worker.end();
done();
});
worker.on("success", async () => {
expect(loggedErrors.length).toBe(0);
const length = await specHelper.redis.llen("resque_test:failed");
expect(length).toBe(0);
await worker.end();
resolve(null);
});
worker.on("failure", () => {
throw new Error("should never get here");
worker.on("failure", () => {
throw new Error("should never get here");
});
await worker.connect();
await worker.start();
});
await worker.connect();
await worker.start();
});
test("will prevent any failed jobs from ending in the failed queue", async (done) => {
await queue.enqueue(specHelper.queue, "brokenJob", [1, 2]);
const length = await queue.length(specHelper.queue);
expect(length).toBe(1);
test("will prevent any failed jobs from ending in the failed queue", async () => {
await new Promise(async (resolve) => {
await queue.enqueue(specHelper.queue, "brokenJob", [1, 2]);
const length = await queue.length(specHelper.queue);
expect(length).toBe(1);
const worker = new Worker(
{
connection: specHelper.cleanConnectionDetails(),
timeout: specHelper.timeout,
queues: [specHelper.queue],
},
jobs
);
const worker = new Worker(
{
connection: specHelper.cleanConnectionDetails(),
timeout: specHelper.timeout,
queues: [specHelper.queue],
},
jobs
);
worker.on("success", async () => {
expect(loggedErrors.length).toBe(1);
const length = await specHelper.redis.llen("resque_test:failed");
expect(length).toBe(0);
await worker.end();
done();
});
worker.on("success", async () => {
expect(loggedErrors.length).toBe(1);
const length = await specHelper.redis.llen("resque_test:failed");
expect(length).toBe(0);
await worker.end();
resolve(null);
});
await worker.connect();
worker.on("failure", () => {
throw new Error("should never get here");
await worker.connect();
worker.on("failure", () => {
throw new Error("should never get here");
});
worker.start();
});
worker.start();
});
});
});

@@ -65,286 +65,300 @@ import specHelper from "../utils/specHelper";

test("will work fine with non-crashing jobs", async (done) => {
await queue.enqueue(specHelper.queue, "happyJob", [1, 2]);
const length = await queue.length(specHelper.queue);
expect(length).toBe(1);
test("will work fine with non-crashing jobs", async () => {
await new Promise(async (resolve) => {
await queue.enqueue(specHelper.queue, "happyJob", [1, 2]);
const length = await queue.length(specHelper.queue);
expect(length).toBe(1);
const worker = new Worker(
{
connection: specHelper.cleanConnectionDetails(),
timeout: specHelper.timeout,
queues: [specHelper.queue],
},
jobs
);
const worker = new Worker(
{
connection: specHelper.cleanConnectionDetails(),
timeout: specHelper.timeout,
queues: [specHelper.queue],
},
jobs
);
worker.on("failure", () => {
throw new Error("should not get here");
});
worker.on("failure", () => {
throw new Error("should not get here");
});
await worker.connect();
await worker.connect();
worker.on("success", async () => {
const length = await specHelper.redis.llen(
`${specHelper.namespace}:failed`
);
expect(length).toBe(0);
await worker.end();
done();
worker.on("success", async () => {
const length = await specHelper.redis.llen(
`${specHelper.namespace}:failed`
);
expect(length).toBe(0);
await worker.end();
resolve(null);
});
worker.start();
});
worker.start();
});
test("will retry the job n times before finally failing", async (done) => {
await queue.enqueue(specHelper.queue, "brokenJob");
const length = await queue.length(specHelper.queue);
expect(length).toBe(1);
test("will retry the job n times before finally failing", async () => {
await new Promise(async (resolve) => {
await queue.enqueue(specHelper.queue, "brokenJob");
const length = await queue.length(specHelper.queue);
expect(length).toBe(1);
let failButRetryCount = 0;
let failureCount = 0;
let failButRetryCount = 0;
let failureCount = 0;
const worker = new Worker(
{
connection: specHelper.cleanConnectionDetails(),
timeout: specHelper.timeout,
queues: [specHelper.queue],
},
jobs
);
const worker = new Worker(
{
connection: specHelper.cleanConnectionDetails(),
timeout: specHelper.timeout,
queues: [specHelper.queue],
},
jobs
);
worker.on("success", () => {
failButRetryCount++;
});
worker.on("success", () => {
failButRetryCount++;
});
await worker.connect();
await worker.connect();
worker.on("failure", async () => {
failureCount++;
expect(failButRetryCount).toBe(2);
expect(failureCount).toBe(1);
expect(failButRetryCount + failureCount).toBe(3);
worker.on("failure", async () => {
failureCount++;
expect(failButRetryCount).toBe(2);
expect(failureCount).toBe(1);
expect(failButRetryCount + failureCount).toBe(3);
const length = await specHelper.redis.llen(
`${specHelper.namespace}:failed`
);
expect(length).toBe(1);
await worker.end();
done();
const length = await specHelper.redis.llen(
`${specHelper.namespace}:failed`
);
expect(length).toBe(1);
await worker.end();
resolve(null);
});
worker.start();
});
worker.start();
});
test("can have a retry count set", async (done) => {
const customJobs = {
jobWithRetryCount: {
plugins: [Plugins.Retry],
pluginOptions: {
Retry: {
retryLimit: 5,
retryDelay: 100,
test("can have a retry count set", async () => {
await new Promise(async (resolve) => {
const customJobs = {
jobWithRetryCount: {
plugins: [Plugins.Retry],
pluginOptions: {
Retry: {
retryLimit: 5,
retryDelay: 100,
},
},
perform: () => {
throw new Error("BUSTED");
},
},
perform: () => {
throw new Error("BUSTED");
},
},
};
};
await queue.enqueue(specHelper.queue, "jobWithRetryCount", [1, 2]);
const length = await queue.length(specHelper.queue);
expect(length).toBe(1);
await queue.enqueue(specHelper.queue, "jobWithRetryCount", [1, 2]);
const length = await queue.length(specHelper.queue);
expect(length).toBe(1);
let failButRetryCount = 0;
let failureCount = 0;
let failButRetryCount = 0;
let failureCount = 0;
const worker = new Worker(
{
connection: specHelper.cleanConnectionDetails(),
timeout: specHelper.timeout,
queues: [specHelper.queue],
},
customJobs
);
const worker = new Worker(
{
connection: specHelper.cleanConnectionDetails(),
timeout: specHelper.timeout,
queues: [specHelper.queue],
},
customJobs
);
worker.on("success", () => {
failButRetryCount++;
});
worker.on("success", () => {
failButRetryCount++;
});
await worker.connect();
await worker.connect();
worker.on("failure", async () => {
failureCount++;
expect(failButRetryCount).toBe(4);
expect(failureCount).toBe(1);
expect(failButRetryCount + failureCount).toBe(5);
worker.on("failure", async () => {
failureCount++;
expect(failButRetryCount).toBe(4);
expect(failureCount).toBe(1);
expect(failButRetryCount + failureCount).toBe(5);
const length = await specHelper.redis.llen(
`${specHelper.namespace}:failed`
);
expect(length).toBe(1);
await worker.end();
done();
const length = await specHelper.redis.llen(
`${specHelper.namespace}:failed`
);
expect(length).toBe(1);
await worker.end();
resolve(null);
});
worker.start();
});
worker.start();
});
test("can have custom retry times set", async (done) => {
const customJobs = {
jobWithBackoffStrategy: {
plugins: [Plugins.Retry],
pluginOptions: {
Retry: {
retryLimit: 5,
backoffStrategy: [1, 2, 3, 4, 5],
test("can have custom retry times set", async () => {
await new Promise(async (resolve) => {
const customJobs = {
jobWithBackoffStrategy: {
plugins: [Plugins.Retry],
pluginOptions: {
Retry: {
retryLimit: 5,
backoffStrategy: [1, 2, 3, 4, 5],
},
},
perform: function (a, b, callback) {
callback(new Error("BUSTED"), null);
},
},
perform: function (a, b, callback) {
callback(new Error("BUSTED"), null);
},
},
};
};
await queue.enqueue(specHelper.queue, "jobWithBackoffStrategy", [1, 2]);
const length = await queue.length(specHelper.queue);
expect(length).toBe(1);
await queue.enqueue(specHelper.queue, "jobWithBackoffStrategy", [1, 2]);
const length = await queue.length(specHelper.queue);
expect(length).toBe(1);
let failButRetryCount = 0;
let failureCount = 0;
let failButRetryCount = 0;
let failureCount = 0;
const worker = new Worker(
{
connection: specHelper.cleanConnectionDetails(),
timeout: specHelper.timeout,
queues: [specHelper.queue],
},
customJobs
);
const worker = new Worker(
{
connection: specHelper.cleanConnectionDetails(),
timeout: specHelper.timeout,
queues: [specHelper.queue],
},
customJobs
);
worker.on("success", () => {
failButRetryCount++;
});
worker.on("success", () => {
failButRetryCount++;
});
await worker.connect();
await worker.connect();
worker.on("failure", async () => {
failureCount++;
expect(failButRetryCount).toBe(4);
expect(failureCount).toBe(1);
expect(failButRetryCount + failureCount).toBe(5);
worker.on("failure", async () => {
failureCount++;
expect(failButRetryCount).toBe(4);
expect(failureCount).toBe(1);
expect(failButRetryCount + failureCount).toBe(5);
const length = await specHelper.redis.llen(
`${specHelper.namespace}:failed`
);
expect(length).toBe(1);
await worker.end();
done();
const length = await specHelper.redis.llen(
`${specHelper.namespace}:failed`
);
expect(length).toBe(1);
await worker.end();
resolve(null);
});
worker.start();
});
worker.start();
});
test("when a job fails it should be re-enqueued (and not go to the failure queue)", async (done) => {
await queue.enqueue(specHelper.queue, "brokenJob", [1, 2]);
test("when a job fails it should be re-enqueued (and not go to the failure queue)", async () => {
await new Promise(async (resolve) => {
await queue.enqueue(specHelper.queue, "brokenJob", [1, 2]);
const worker = new Worker(
{
connection: specHelper.cleanConnectionDetails(),
timeout: specHelper.timeout,
queues: [specHelper.queue],
},
jobs
);
const worker = new Worker(
{
connection: specHelper.cleanConnectionDetails(),
timeout: specHelper.timeout,
queues: [specHelper.queue],
},
jobs
);
await worker.connect();
worker.on("success", async () => {
const timestamps = await queue.scheduledAt(
specHelper.queue,
"brokenJob",
[1, 2]
);
expect(timestamps.length).toBe(1);
const length = await specHelper.redis.llen(
`${specHelper.namespace}:failed`
);
expect(length).toBe(0);
await worker.end();
done();
await worker.connect();
worker.on("success", async () => {
const timestamps = await queue.scheduledAt(
specHelper.queue,
"brokenJob",
[1, 2]
);
expect(timestamps.length).toBe(1);
const length = await specHelper.redis.llen(
`${specHelper.namespace}:failed`
);
expect(length).toBe(0);
await worker.end();
resolve(null);
});
worker.start();
});
worker.start();
});
test("will handle the stats properly for failing jobs", async (done) => {
await queue.enqueue(specHelper.queue, "brokenJob", [1, 2]);
test("will handle the stats properly for failing jobs", async () => {
await new Promise(async (resolve) => {
await queue.enqueue(specHelper.queue, "brokenJob", [1, 2]);
const worker = new Worker(
{
connection: specHelper.cleanConnectionDetails(),
timeout: specHelper.timeout,
queues: [specHelper.queue],
},
jobs
);
const worker = new Worker(
{
connection: specHelper.cleanConnectionDetails(),
timeout: specHelper.timeout,
queues: [specHelper.queue],
},
jobs
);
await worker.connect();
await worker.connect();
worker.on("success", async () => {
const globalProcessed = await specHelper.redis.get(
`${specHelper.namespace}:stat:processed`
);
const globalFailed = await specHelper.redis.get(
`${specHelper.namespace}:stat:failed`
);
const workerProcessed = await specHelper.redis.get(
`${specHelper.namespace}:stat:processed:${worker.name}`
);
const workerFailed = await specHelper.redis.get(
`${specHelper.namespace}:stat:failed:${worker.name}`
);
expect(String(globalProcessed)).toBe("0");
expect(String(globalFailed)).toBe("1");
expect(String(workerProcessed)).toBe("0");
expect(String(workerFailed)).toBe("1");
await worker.end();
done();
worker.on("success", async () => {
const globalProcessed = await specHelper.redis.get(
`${specHelper.namespace}:stat:processed`
);
const globalFailed = await specHelper.redis.get(
`${specHelper.namespace}:stat:failed`
);
const workerProcessed = await specHelper.redis.get(
`${specHelper.namespace}:stat:processed:${worker.name}`
);
const workerFailed = await specHelper.redis.get(
`${specHelper.namespace}:stat:failed:${worker.name}`
);
expect(String(globalProcessed)).toBe("0");
expect(String(globalFailed)).toBe("1");
expect(String(workerProcessed)).toBe("0");
expect(String(workerFailed)).toBe("1");
await worker.end();
resolve(null);
});
worker.start();
});
worker.start();
});
test("will set the retry counter & retry data", async (done) => {
await queue.enqueue(specHelper.queue, "brokenJob", [1, 2]);
test("will set the retry counter & retry data", async () => {
await new Promise(async (resolve) => {
await queue.enqueue(specHelper.queue, "brokenJob", [1, 2]);
const worker = new Worker(
{
connection: specHelper.cleanConnectionDetails(),
timeout: specHelper.timeout,
queues: [specHelper.queue],
},
jobs
);
const worker = new Worker(
{
connection: specHelper.cleanConnectionDetails(),
timeout: specHelper.timeout,
queues: [specHelper.queue],
},
jobs
);
await worker.connect();
worker.on("success", async () => {
const retryAttempts = await specHelper.redis.get(
`${specHelper.namespace}:resque-retry:brokenJob:1-2`
);
let failureData = await specHelper.redis.get(
`${specHelper.namespace}:failure-resque-retry:brokenJob:1-2`
);
expect(String(retryAttempts)).toBe("0");
failureData = JSON.parse(failureData);
expect(failureData.payload).toEqual([1, 2]);
expect(failureData.exception).toBe("Error: BUSTED");
expect(failureData.worker).toBe("brokenJob");
expect(failureData.queue).toBe("test_queue");
await worker.end();
done();
await worker.connect();
worker.on("success", async () => {
const retryAttempts = await specHelper.redis.get(
`${specHelper.namespace}:resque-retry:brokenJob:1-2`
);
let failureData = await specHelper.redis.get(
`${specHelper.namespace}:failure-resque-retry:brokenJob:1-2`
);
expect(String(retryAttempts)).toBe("0");
failureData = JSON.parse(failureData);
expect(failureData.payload).toEqual([1, 2]);
expect(failureData.exception).toBe("Error: BUSTED");
expect(failureData.worker).toBe("brokenJob");
expect(failureData.queue).toBe("test_queue");
await worker.end();
resolve(null);
});
worker.start();
});
worker.start();
});
});
});

@@ -17,3 +17,3 @@ import * as Redis from "ioredis";

pkg: pkg,
host: "127.0.0.1",
host: process.env.REDIS_HOST || "127.0.0.1",
password: "",

@@ -20,0 +20,0 @@ port: 6379,

@@ -20,2 +20,4 @@ /// <reference types="node" />

working: boolean;
pollTimer: NodeJS.Timeout;
endTimer: NodeJS.Timeout;
pingTimer: NodeJS.Timeout;

@@ -22,0 +24,0 @@ job: Job<any>;

@@ -42,2 +42,4 @@ "use strict";

this.job = null;
this.pollTimer = null;
this.endTimer = null;
this.pingTimer = null;

@@ -73,3 +75,3 @@ this.started = false;

await new Promise((resolve) => {
setTimeout(() => {
this.endTimer = setTimeout(() => {
resolve(null);

@@ -80,2 +82,5 @@ }, this.options.timeout);

}
clearTimeout(this.pollTimer);
clearTimeout(this.endTimer);
clearInterval(this.pingTimer);
if (this.connection &&

@@ -85,3 +90,2 @@ (this.connection.connected === true ||

this.connection.connected === null)) {
clearInterval(this.pingTimer);
await this.untrack();

@@ -250,3 +254,3 @@ }

await new Promise((resolve) => {
setTimeout(() => {
this.pollTimer = setTimeout(() => {
this.poll();

@@ -285,2 +289,4 @@ resolve(null);

async ping() {
if (!this.running)
return;
const name = this.name;

@@ -287,0 +293,0 @@ const nowSeconds = Math.round(new Date().getTime() / 1000);

@@ -6,3 +6,3 @@ {

"license": "Apache-2.0",
"version": "9.0.2",
"version": "9.0.3",
"homepage": "http://github.com/actionhero/node-resque",

@@ -30,3 +30,3 @@ "repository": {

"dependencies": {
"ioredis": "^4.27.2"
"ioredis": "^4.27.6"
},

@@ -36,20 +36,12 @@ "devDependencies": {

"@types/jest": "^26.0.23",
"@types/node": "^15.3.0",
"ioredis-mock": "^5.5.6",
"jest": "^26.6.3",
"@types/node": "^16.0.0",
"ioredis-mock": "^5.6.0",
"jest": "^27.0.6",
"node-schedule": "^2.0.0",
"prettier": "^2.3.0",
"ts-jest": "^26.5.6",
"prettier": "^2.3.2",
"ts-jest": "^27.0.3",
"ts-node": "^10.0.0",
"typedoc": "^0.20.36",
"typescript": "^4.2.4"
"typedoc": "^0.21.2",
"typescript": "^4.3.5"
},
"jest": {
"testPathIgnorePatterns": [
"<rootDir>/__tests__/utils"
],
"transform": {
"^.+\\.ts?$": "ts-jest"
}
},
"scripts": {

@@ -56,0 +48,0 @@ "prepare": "npm run build && npm run docs",

@@ -9,3 +9,3 @@ # node-resque: The best background jobs in node.

[![CircleCI](https://circleci.com/gh/actionhero/node-resque.svg?style=svg)](https://circleci.com/gh/actionhero/node-resque)
![Test](https://github.com/actionhero/node-resque/workflows/Test/badge.svg)

@@ -12,0 +12,0 @@ ## The Resque Factory (How It Works)

@@ -30,2 +30,4 @@ import { EventEmitter } from "events";

working: boolean;
pollTimer: NodeJS.Timeout;
endTimer: NodeJS.Timeout;
pingTimer: NodeJS.Timeout;

@@ -133,2 +135,4 @@ job: Job<any>;

this.job = null;
this.pollTimer = null;
this.endTimer = null;
this.pingTimer = null;

@@ -173,3 +177,3 @@ this.started = false;

await new Promise((resolve) => {
setTimeout(() => {
this.endTimer = setTimeout(() => {
resolve(null);

@@ -181,2 +185,6 @@ }, this.options.timeout);

clearTimeout(this.pollTimer);
clearTimeout(this.endTimer);
clearInterval(this.pingTimer);
if (

@@ -188,3 +196,2 @@ this.connection &&

) {
clearInterval(this.pingTimer);
await this.untrack();

@@ -413,3 +420,3 @@ }

await new Promise((resolve) => {
setTimeout(() => {
this.pollTimer = setTimeout(() => {
this.poll();

@@ -469,2 +476,4 @@ resolve(null);

private async ping() {
if (!this.running) return;
const name = this.name;

@@ -471,0 +480,0 @@ const nowSeconds = Math.round(new Date().getTime() / 1000);

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc