bullmq - npm Package Compare versions

Comparing version 1.9.0 to 1.10.0

.github/workflows/coverage.yml


CHANGELOG.md

@@ -0,1 +1,17 @@

# [1.10.0](https://github.com/taskforcesh/bullmq/compare/v1.9.0...v1.10.0) (2020-10-20)
### Bug Fixes
* **job:** remove listeners before resolving promise ([563ce92](https://github.com/taskforcesh/bullmq/commit/563ce9218f5dd81f2bc836f9e8ccdedc549f09dd))
* **worker:** continue processing if handleFailed fails. fixes [#286](https://github.com/taskforcesh/bullmq/issues/286) ([4ef1cbc](https://github.com/taskforcesh/bullmq/commit/4ef1cbc13d53897b57ae3d271afbaa1b213824aa))
* **worker:** fix memory leak on Promise.race ([#282](https://github.com/taskforcesh/bullmq/issues/282)) ([a78ab2b](https://github.com/taskforcesh/bullmq/commit/a78ab2b362e54f897eec6c8b16f16ecccf7875c2))
* **worker:** setname on worker blocking connection ([#291](https://github.com/taskforcesh/bullmq/issues/291)) ([50a87fc](https://github.com/taskforcesh/bullmq/commit/50a87fcb1dab976a6a0273d2b0cc4b31b63c015f))
* remove async for loop in child pool fixes [#229](https://github.com/taskforcesh/bullmq/issues/229) ([d77505e](https://github.com/taskforcesh/bullmq/commit/d77505e989cd1395465c5222613555f79e4d9720))
### Features
* **sandbox:** kill child workers gracefully ([#243](https://github.com/taskforcesh/bullmq/issues/243)) ([4262837](https://github.com/taskforcesh/bullmq/commit/4262837bc67e007fe44606670dce48ee7fec65cd))
# [1.9.0](https://github.com/taskforcesh/bullmq/compare/v1.8.14...v1.9.0) (2020-07-19)

@@ -2,0 +18,0 @@


dist/classes/child-pool.d.ts

@@ -17,6 +17,6 @@ /// <reference types="node" />

remove(child: ChildProcessExt): void;
kill(child: ChildProcess, signal?: NodeJS.Signals): void;
clean(): void;
kill(child: ChildProcess, signal?: 'SIGTERM' | 'SIGKILL'): Promise<void>;
clean(): Promise<void>;
getFree(id: string): ChildProcessExt[];
getAllFree(): ChildProcessExt[];
}

@@ -10,7 +10,10 @@ "use strict";

const util_1 = require("util");
const process_utils_1 = require("./process-utils");
const stat = util_1.promisify(fs.stat);
const CHILD_KILL_TIMEOUT = 30000;
const convertExecArgv = async (execArgv) => {
const standard = [];
const convertedArgs = [];
lodash_1.forEach(execArgv, async (arg) => {
for (let i = 0; i < execArgv.length; i++) {
const arg = execArgv[i];
if (arg.indexOf('--inspect') === -1) {

@@ -24,10 +27,18 @@ standard.push(arg);

}
});
}
return standard.concat(convertedArgs);
};
const initChild = function (child, processFile) {
return new Promise(resolve => {
child.send({ cmd: 'init', value: processFile }, resolve);
async function initChild(child, processFile) {
const onComplete = new Promise(resolve => {
const onMessageHandler = (msg) => {
if (msg.cmd === 'init-complete') {
resolve();
child.off('message', onMessageHandler);
}
};
child.on('message', onMessageHandler);
});
};
await new Promise(resolve => child.send({ cmd: 'init', value: processFile }, resolve));
await onComplete;
}
class ChildPool {

@@ -77,14 +88,11 @@ constructor() {

}
kill(child, signal) {
child.kill(signal || 'SIGKILL');
async kill(child, signal = 'SIGKILL') {
this.remove(child);
await process_utils_1.killAsync(child, signal, CHILD_KILL_TIMEOUT);
}
clean() {
async clean() {
const children = lodash_1.values(this.retained).concat(this.getAllFree());
children.forEach(child => {
// TODO: We may want to use SIGKILL if the process does not die after some time.
this.kill(child, 'SIGTERM');
});
this.retained = {};
this.free = {};
await Promise.all(children.map(c => this.kill(c, 'SIGTERM')));
}

@@ -91,0 +99,0 @@ getFree(id) {

@@ -254,8 +254,8 @@ "use strict";

function onCompleted(args) {
removeListeners();
resolve(args.returnvalue);
removeListeners();
}
function onFailed(args) {
removeListeners();
reject(new Error(args.failedReason || args));
removeListeners();
}

@@ -262,0 +262,0 @@ const completedEvent = `completed:${jobId}`;

@@ -12,2 +12,8 @@ "use strict";

let processor;
let currentJobPromise;
// same as process.send but waits until the send is complete
// the async version is used below because otherwise
// the termination handler may exit before the parent
// process has received the messages it requires
const processSendAsync = util_1.promisify(process.send.bind(process));
// https://stackoverflow.com/questions/18391212/is-it-not-possible-to-stringify-an-error-using-json-stringify

@@ -28,3 +34,14 @@ if (!('toJSON' in Error.prototype)) {

}
process.on('message', async (msg) => {
async function waitForCurrentJobAndExit() {
status = 'TERMINATING';
try {
await currentJobPromise;
}
finally {
process.exit(process.exitCode || 0);
}
}
process.on('SIGTERM', waitForCurrentJobAndExit);
process.on('SIGINT', waitForCurrentJobAndExit);
process.on('message', msg => {
switch (msg.cmd) {

@@ -52,2 +69,5 @@ case 'init':

status = 'IDLE';
process.send({
cmd: 'init-complete',
});
break;

@@ -62,21 +82,24 @@ case 'start':

status = 'STARTED';
try {
const result = await Promise.resolve(processor(wrapJob(msg.job)) || {});
process.send({
cmd: 'completed',
value: result,
});
}
catch (err) {
if (!err.message) {
err = new Error(err);
currentJobPromise = (async () => {
try {
const result = (await processor(wrapJob(msg.job))) || {};
await processSendAsync({
cmd: 'completed',
value: result,
});
}
process.send({
cmd: 'failed',
value: err,
});
}
finally {
status = 'IDLE';
}
catch (err) {
if (!err.message) {
err = new Error(err);
}
await processSendAsync({
cmd: 'failed',
value: err,
});
}
finally {
status = 'IDLE';
currentJobPromise = undefined;
}
})();
break;

@@ -83,0 +106,0 @@ case 'stop':

@@ -33,2 +33,5 @@ "use strict";

this.isBlocked = false;
if (!this.opts.stalledInterval) {
throw new Error('Stalled interval cannot be zero or undefined');
}
// tslint:disable: no-floating-promises

@@ -35,0 +38,0 @@ this.run();

@@ -0,1 +1,2 @@

import { Redis } from 'ioredis';
import { Processor, WorkerOptions } from '../interfaces';

@@ -18,2 +19,3 @@ import { QueueBase, Repeat } from './';

constructor(name: string, processor: string | Processor, opts?: WorkerOptions);
waitUntilReady(): Promise<Redis>;
get repeat(): Promise<Repeat>;

@@ -20,0 +22,0 @@ private run;

@@ -51,3 +51,8 @@ "use strict";

});
this.on('error', err => console.error(err));
}
async waitUntilReady() {
await super.waitUntilReady();
return this.blockingConnection.client;
}
get repeat() {

@@ -63,3 +68,3 @@ return new Promise(async (resolve) => {

async run() {
const client = await this.client;
const client = await this.blockingConnection.client;
if (this.closing) {

@@ -91,5 +96,6 @@ return;

* Get the first promise that completes
* Explanation https://stackoverflow.com/a/42898229/1848640
*/
const [completed] = await Promise.race([...processing.keys()].map(p => p.then(() => [p])));
const promises = [...processing.keys()];
const completedIdx = await Promise.race(promises.map((p, idx) => p.then(() => idx)));
const completed = promises[completedIdx];
const token = processing.get(completed);

@@ -182,2 +188,4 @@ processing.delete(completed);

//
// TODO: Have only 1 timer that extends all the locks instead of one timer
// per concurrency setting.
let lockRenewId;

@@ -215,6 +223,7 @@ let timerStopped = false;

}
catch (e) {
catch (err) {
this.emit('error', err);
// It probably means that the job has lost the lock before completion
// The QueueScheduler will (or already has) move the job to the waiting list (as stalled)
this.emit('error', e);
// The QueueScheduler will (or already has) moved the job back
// to the waiting list (as stalled)
}

@@ -286,27 +295,31 @@ };

}
async close(force = false) {
if (!this.closing) {
close(force = false) {
if (this.closing) {
return this.closing;
}
this.closing = (async () => {
this.emit('closing', 'closing queue');
this.closing = new Promise(async (resolve, reject) => {
try {
const client = await this.blockingConnection.client;
await this.resume();
if (!force) {
await this.whenCurrentJobsFinished(false);
}
else {
await client.disconnect();
}
const client = await this.blockingConnection.client;
this.resume();
await Promise.resolve()
.finally(() => {
return force || this.whenCurrentJobsFinished(false);
})
.finally(() => {
var _a;
const closePoolPromise = (_a = this.childPool) === null || _a === void 0 ? void 0 : _a.clean();
if (force) {
// since we're not waiting for the job to end attach
// an error handler to avoid crashing the whole process
closePoolPromise === null || closePoolPromise === void 0 ? void 0 : closePoolPromise.catch(err => {
console.error(err);
});
return;
}
catch (err) {
reject(err);
}
finally {
this.timerManager.clearAllTimers();
this.childPool && this.childPool.clean();
}
this.emit('closed');
resolve();
});
}
return closePoolPromise;
})
.finally(() => client.disconnect())
.finally(() => this.timerManager.clearAllTimers())
.finally(() => this.emit('closed'));
})();
return this.closing;

@@ -313,0 +326,0 @@ }

@@ -10,5 +10,3 @@ "use strict";

});
afterEach(() => {
pool.clean();
});
afterEach(() => pool.clean());
it('should return same child if free', async () => {

@@ -47,3 +45,3 @@ const processor = __dirname + '/fixtures/fixture_processor_bar.js';

chai_1.expect(child).to.be.ok;
pool.kill(child);
await pool.kill(child);
chai_1.expect(pool.retained).to.be.empty;

@@ -66,3 +64,3 @@ const newChild = await pool.retain(processor);

chai_1.expect(children).not.to.include(child);
});
}).timeout(10000);
it('should return an old child if many retained and one free', async () => {

@@ -82,4 +80,4 @@ const processor = __dirname + '/fixtures/fixture_processor_bar.js';

chai_1.expect(children).to.include(child);
});
}).timeout(10000);
});
//# sourceMappingURL=test_child-pool.js.map

@@ -116,3 +116,4 @@ "use strict";

const timeDiff = Date.now() - startTime;
chai_1.expect(timeDiff).to.be.gte(numGroups * 1000);
// In some test envs, these timestamps can drift.
chai_1.expect(timeDiff).to.be.gte(numGroups * 990);
chai_1.expect(timeDiff).to.be.below((numGroups + 1) * 1100);

@@ -124,3 +125,3 @@ for (const group in completed) {

chai_1.expect(diff).to.be.below(2000);
chai_1.expect(diff).to.be.gte(1000);
chai_1.expect(diff).to.be.gte(990);
prevTime = completed[group][i];

@@ -127,0 +128,0 @@ }

@@ -274,3 +274,34 @@ "use strict";

});
it('should allow the job to complete and then exit on worker close', async function () {
this.timeout(1500000);
const processFile = __dirname + '/fixtures/fixture_processor_slow.js';
const worker = new classes_1.Worker(queueName, processFile);
// acquire and release a child here so we know it has its full termination handler set up
const initalizedChild = await worker['childPool'].retain(processFile);
await worker['childPool'].release(initalizedChild);
// await this after we've added the job
const onJobActive = new Promise(resolve => {
worker.on('active', resolve);
});
const jobAdd = queue.add('foo', {});
await onJobActive;
chai_1.expect(Object.keys(worker['childPool'].retained)).to.have.lengthOf(1);
chai_1.expect(worker['childPool'].getAllFree()).to.have.lengthOf(0);
const child = Object.values(worker['childPool'].retained)[0];
chai_1.expect(child).to.equal(initalizedChild);
chai_1.expect(child.exitCode).to.equal(null);
chai_1.expect(child.killed).to.equal(false);
// at this point the job should be active and running on the child
// trigger a close while we know it's doing work
await worker.close();
// ensure the child did get cleaned up
chai_1.expect(!!child.killed).to.eql(true);
chai_1.expect(Object.keys(worker['childPool'].retained)).to.have.lengthOf(0);
chai_1.expect(worker['childPool'].getAllFree()).to.have.lengthOf(0);
const job = await jobAdd;
// check that the job did finish successfully
const jobResult = await job.waitUntilFinished(queueEvents);
chai_1.expect(jobResult).to.equal(42);
});
});
//# sourceMappingURL=test_sandboxed_process.js.map

@@ -752,2 +752,41 @@ "use strict";

});
mocha_1.it('continue processing after a worker has stalled', async function () {
let first = true;
this.timeout(10000);
const worker = new classes_1.Worker(queueName, async (job) => {
if (first) {
first = false;
return utils_1.delay(2000);
}
}, {
lockDuration: 1000,
lockRenewTime: 3000,
});
await worker.waitUntilReady();
const queueScheduler = new classes_1.QueueScheduler(queueName, {
stalledInterval: 100,
});
await queueScheduler.waitUntilReady();
const job = await queue.add('test', { bar: 'baz' });
const completed = new Promise(resolve => {
worker.on('completed', resolve);
});
await completed;
await worker.close();
await queueScheduler.close();
});
mocha_1.it('stalled interval cannot be zero', function (done) {
this.timeout(10000);
let queueScheduler;
try {
queueScheduler = new classes_1.QueueScheduler(queueName, {
stalledInterval: 0,
});
// Fail test if we reach here.
done(new Error('Should throw an exception'));
}
catch (err) {
done();
}
});
mocha_1.describe('Concurrency process', () => {

@@ -754,0 +793,0 @@ mocha_1.it('should run job in sequence if I specify a concurrency of 1', async () => {

@@ -11,3 +11,3 @@ ---

![Lifecycle of a job](../.gitbook/assets/image.png)
![Lifecycle of a job](../.gitbook/assets/image%20%281%29.png)

@@ -17,1 +17,2 @@ When a job is added to a queue it can be in one of two states: the “wait” status, which is in fact a waiting list that all jobs must enter before they can be processed, or the “delayed” status, which means the job is waiting for some timeout or to be promoted for processing. A delayed job will not be processed directly; instead, it will be placed at the beginning of the waiting list and processed as soon as a worker is idle.

The next state for a job is the “active” state. The active state is represented by a set and contains the jobs that are currently being processed, i.e. running in the `process` function explained in the previous chapter. A job can stay in the active state for an unlimited amount of time, until the process completes or an exception is thrown, so that the job ends in either the “completed” or the “failed” status.
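For illustration, a minimal sketch that inspects these states with `Job#getState`; the queue and job names are borrowed from snippets elsewhere in this guide:

```typescript
import { Queue } from 'bullmq';

const myQueue = new Queue('Paint');

// A newly added job enters the "wait" list...
const job = await myQueue.add('wall', { color: 'pink' });
console.log(await job.getState()); // 'waiting'

// ...while a job added with a delay enters the "delayed" set first.
const delayed = await myQueue.add('wall', { color: 'blue' }, { delay: 5000 });
console.log(await delayed.getState()); // 'delayed'
```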

@@ -14,3 +14,3 @@ # Connections

const myQueue = new Queue('myqueue', { connection: {
host: myredis.taskforce.run,
host: "myredis.taskforce.run",
port: 32856

@@ -20,3 +20,3 @@ }});

const myWorker = new Worker('myworker', async (job)=>{}, { connection: {
host: myredis.taskforce.run,
host: "myredis.taskforce.run",
port: 32856

@@ -37,4 +37,7 @@ }});

Note that in the second example, even though the ioredis instance is being reused, the worker will create a duplicated connection that it needs internally to make blocking connections.
Note that in the second example, even though the ioredis instance is being reused, the worker will create a duplicated connection that it needs internally to make blocking connections. Please refer to the [ioredis](https://github.com/luin/ioredis/blob/master/API.md) documentation on how to properly create an instance of `IORedis`.
If you can afford many connections, by all means just use them. Redis connections have quite low overhead, so you should not need to worry about reusing connections unless your service provider imposes hard limits on you.
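For reference, a sketch of the reuse pattern described above, using the host and port from the earlier snippets:

```typescript
import IORedis from 'ioredis';
import { Queue, Worker } from 'bullmq';

// One shared ioredis instance; note that the worker will still duplicate it
// internally for the blocking connection it needs.
const connection = new IORedis({ host: 'myredis.taskforce.run', port: 32856 });

const myQueue = new Queue('myqueue', { connection });
const myWorker = new Worker('myqueue', async job => {}, { connection });
```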

@@ -8,3 +8,3 @@ # Events

```typescript
import { Queue } from 'bullmq'
import { Queue } from 'bullmq'

@@ -19,3 +19,3 @@ const myQueue = new Queue('Paint');

```typescript
import { Worker } from 'bullmq'
import { Worker } from 'bullmq'

@@ -40,3 +40,3 @@ const myWorker = new Worker('Paint');

```typescript
import { QueueEvents } from 'bullmq'
import { QueueEvents } from 'bullmq'

@@ -43,0 +43,0 @@ const queueEvents = new QueueEvents('Paint')

@@ -8,1 +8,2 @@ # Introduction

Jobs in BullMQ are basically a user-created data structure that can be stored in the queue. Jobs are processed by _**workers**_. A Worker is the second class you should be aware of. Workers are instances capable of processing jobs. You can have many workers, either running in the same Node.js process, in separate processes, or on different machines. They will all consume jobs from the queue and mark them as completed or failed.
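A minimal worker sketch; the `paintCar` processing function here is hypothetical:

```typescript
import { Worker } from 'bullmq';

// A hypothetical processing function; replace with real work.
async function paintCar(data: { color: string }) {
  return `painted ${data.color}`;
}

// The resolved return value marks the job as completed;
// a thrown error marks it as failed.
const myWorker = new Worker('Paint', async job => paintCar(job.data));
```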

@@ -9,1 +9,24 @@ ---

```typescript
import { Queue } from 'bullmq'
const myQueue = new Queue('Paint');
// Add a job that will be processed before all others
await myQueue.add('wall', { color: 'pink' });
```
When you add jobs to the queue there are several options that you can use. For example, you can specify how many jobs to keep once they have completed or failed:
```typescript
await myQueue.add(
'wall',
{ color: 'pink' },
{ removeOnComplete: true, removeOnFail: 1000 },
);
```
In the example above, all completed jobs will be removed automatically and only the last 1000 failed jobs will be kept in the queue.
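Both options also accept a number, which sets the maximum count of jobs to keep. A short sketch, reusing `myQueue` from the example above:

```typescript
await myQueue.add(
  'wall',
  { color: 'pink' },
  // keep at most the latest 100 completed and 1000 failed jobs
  { removeOnComplete: 100, removeOnFail: 1000 },
);
```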

@@ -1,2 +0,2 @@

# Proritized
# Prioritized

@@ -9,3 +9,3 @@ Jobs can also include a priority option. Using priorities, job's processing order will be affected by the specified priority instead of following a FIFO or LIFO pattern.

Priorities goes from 1 to MAX\_INT, whereas lower number is always higher priority than higher.
Priorities go from 1 to MAX\_INT, where a lower number always means higher priority than a higher number.
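A short sketch of prioritized adds, assuming the `Paint` queue used elsewhere in this guide:

```typescript
import { Queue } from 'bullmq';

const myQueue = new Queue('Paint');

// Priority 1 is served before priority 2, regardless of insertion order.
await myQueue.add('wall', { color: 'pink' }, { priority: 2 });
await myQueue.add('ceiling', { color: 'white' }, { priority: 1 });
```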

@@ -12,0 +12,0 @@ ```typescript

@@ -6,1 +6,2 @@ # Jobs

An important thing to consider is that you can mix the different job types in the same queue, so you can add FIFO jobs, and at any moment add a LIFO or a delayed job.
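For example, a sketch mixing the three types in one queue:

```typescript
import { Queue } from 'bullmq';

const myQueue = new Queue('Paint');

await myQueue.add('wall', { color: 'pink' });                  // FIFO (default)
await myQueue.add('wall', { color: 'blue' }, { lifo: true });  // LIFO
await myQueue.add('wall', { color: 'gray' }, { delay: 5000 }); // delayed
```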

@@ -24,3 +24,2 @@ # Stalled

const worker = new Worker('Paint', painter);
```

@@ -36,1 +35,2 @@ {% endcode %}

{% endcode %}
# Rate limiting
BullMQ provides rate limiting for the queues. It is possible to configure the workers so that they obey a given rate limiting option:
```typescript
import { Worker, QueueScheduler } from "bullmq";
const worker = new Worker('painter', async job => paintCar(job), {
limiter: {
max: 10,
duration: 1000
}
});
const scheduler = new QueueScheduler('painter');
```
{% hint style="warning" %}
Jobs that get rate limited will actually end as delayed jobs, so you need at least one QueueScheduler somewhere in your deployment so that jobs are put back to the wait status.
{% endhint %}
{% hint style="info" %}
The rate limiter is global, so if you have, for example, 10 workers for one queue with the above settings, still only 10 jobs per second will be processed in total.
{% endhint %}
It is also possible to define a rate limiter based on group keys; for example, you may want to have a rate limiter per _customer_ instead of a global rate limiter for all customers:
```typescript
import { Queue, Worker, QueueScheduler } from "bullmq";
const queue = new Queue('painter',
{
limiter: {
groupKey: 'customerId',
}
});
const worker = new Worker('painter', async job => paintCar(job), {
limiter: {
max: 10,
duration: 1000,
groupKey: 'customerId'
}
});
const scheduler = new QueueScheduler('painter');
// jobs will be rate limited by the value of customerId key:
await queue.add('rate limited paint', { customerId: 'my-customer-id' });
```

@@ -5,3 +5,64 @@ # Retrying failing jobs

BullMQ support retries of failed jobs using backoff functions.
BullMQ supports retries of failed jobs using backoff functions. It is possible to use the built-in backoff functions or provide custom ones.
The code below shows how to specify an "exponential" backoff function with a 1 second delay as seed value, so the job will be retried at most 3 times, spaced 1 second, 2 seconds and 4 seconds apart:
```typescript
import { Queue } from 'bullmq';
const myQueue = new Queue('foo');
await myQueue.add(
'test-retry',
{ foo: 'bar' },
{
attempts: 3,
backoff: {
type: 'exponential',
delay: 1000,
},
},
);
```
The current built-in backoff functions are "exponential" and "fixed".
If you want to define a custom backoff you need to define it on the worker:
```typescript
import { Worker } from 'bullmq';
const worker = new Worker(
'foo',
async job => doSomeProcessing(),
{
settings: {
backoffStrategies: {
custom(attemptsMade: number) {
return attemptsMade * 1000;
},
},
},
},
);
```
You can then use your "custom" strategy when adding jobs:
```typescript
import { Queue } from 'bullmq';
const myQueue = new Queue('foo');
await myQueue.add(
'test-retry',
{ foo: 'bar' },
{
attempts: 3,
backoff: {
type: 'custom',
},
},
);
```

@@ -10,5 +10,3 @@ # Queues

```typescript
await queue.add('paint', { colour: 'red' });
```

@@ -25,1 +23,2 @@

The job will now wait **at least** 5 seconds before it is processed. There are many other options available, such as priorities, backoff settings, lifo behaviour, remove-on-complete policies, etc. Please check the remainder of this guide for more information regarding these options.
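A sketch of the delayed add that this paragraph refers to; the `delay` option is in milliseconds:

```typescript
import { Queue } from 'bullmq';

const queue = new Queue('Paint');

// The job will sit in the delayed set for at least 5 seconds
// before becoming eligible for processing.
await queue.add('paint', { colour: 'red' }, { delay: 5000 });
```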

@@ -31,3 +31,3 @@ ---

addJobs();
await addJobs();
```

@@ -110,1 +110,2 @@

{% endhint %}

@@ -18,3 +18,3 @@ # Table of contents

* [Repeatable](guide/jobs/repeatable.md)
* [Proritized](guide/jobs/proritized.md)
* [Prioritized](guide/jobs/proritized.md)
* [Stalled](guide/jobs/stalled.md)

@@ -31,3 +31,3 @@ * [Rate limiting](guide/rate-limiting.md)

* [Producer - Consumer](patterns/producer-consumer.md)
* [Workflows](patterns/chaining-jobs.md)
* [Flows](patterns/flows.md)
* [Real time updates](patterns/real-time-updates.md)

@@ -34,0 +34,0 @@ * [Sender - Receiver](patterns/sender-receiver.md)

@@ -7,3 +7,3 @@ ---

BullMQ is a [Node.js](https://nodejs.org) library that implements a fast and robust queue system based on [Redis](https://redis.io/).
BullMQ is a [Node.js](https://nodejs.org) library that implements a fast and robust queue system built on top of [Redis](https://redis.io/).

@@ -33,1 +33,2 @@ The library is designed so that it will fulfil the following goals:

* [x] **Automatic recovery from process crashes**
{
"name": "bullmq",
"version": "1.9.0",
"version": "1.10.0",
"description": "Queue for messages and jobs based on Redis",

@@ -11,15 +11,15 @@ "main": "dist/index.js",

"scripts": {
"docs": "typedoc --out docs src",
"build": "tsc && yarn copylua",
"copylua": "copyfiles -f ./src/commands/*.lua ./dist/commands",
"lint": "tslint --project tsconfig.json -c tslint.json 'src/**/*.ts'",
"test": "yarn lint && tsc && ts-mocha --paths 'src/**/test_*.ts' --exit",
"test:watch": "yarn lint && ts-mocha --paths 'src/**/test_*.ts' -w --watch-extensions ts",
"coverage": "nyc --reporter=text npm run test",
"coveralls": "nyc report --reporter=text-lcov | coveralls",
"docs": "typedoc --out docs src",
"lint": "tslint --project tsconfig.json -c tslint.json 'src/**/*.ts'",
"precommit": "yarn lint",
"prepare": "yarn build",
"prettier": "prettier --config package.json src/**/*.ts",
"semantic-release": "semantic-release",
"prettier": "prettier --config package.json src/**/*.ts",
"semantic-release-prepare": "ts-node tools/semantic-release-prepare",
"prepublish": "yarn build"
"test": "yarn lint && tsc && ts-mocha --paths 'src/**/test_*.ts' --exit",
"test:watch": "yarn lint && ts-mocha --paths 'src/**/test_*.ts' -w --watch-extensions ts"
},

@@ -26,0 +26,0 @@ "dependencies": {

@@ -32,2 +32,20 @@ <div align="center">

# Poll
Which other languages would you like to see BullMQ ported to?
Please vote here: https://twitter.com/manast/status/1318168684049977345
# Official FrontEnd
[<img src="http://taskforce.sh/assets/logo_square.png" width="100" alt="Taskforce.sh, Inc" style="padding: 100px"/>](https://taskforce.sh)
Supercharge your queues with a professional front end and optional Redis hosting:
- Get a complete overview of all your queues.
- Inspect jobs, search, retry, or promote delayed jobs.
- Metrics and statistics.
- and many more features.
Sign up at [Taskforce.sh](https://taskforce.sh)
# The gist

@@ -34,0 +52,0 @@

@@ -7,5 +7,8 @@ import { ChildProcess, fork } from 'child_process';

import { promisify } from 'util';
import { killAsync } from './process-utils';
const stat = promisify(fs.stat);
const CHILD_KILL_TIMEOUT = 30_000;
export interface ChildProcessExt extends ChildProcess {

@@ -19,3 +22,4 @@ processFile?: string;

forEach(execArgv, async arg => {
for (let i = 0; i < execArgv.length; i++) {
const arg = execArgv[i];
if (arg.indexOf('--inspect') === -1) {

@@ -28,3 +32,3 @@ standard.push(arg);

}
});
}

@@ -34,7 +38,17 @@ return standard.concat(convertedArgs);

const initChild = function(child: ChildProcess, processFile: string) {
return new Promise(resolve => {
child.send({ cmd: 'init', value: processFile }, resolve);
async function initChild(child: ChildProcess, processFile: string) {
const onComplete = new Promise(resolve => {
const onMessageHandler = (msg: any) => {
if (msg.cmd === 'init-complete') {
resolve();
child.off('message', onMessageHandler);
}
};
child.on('message', onMessageHandler);
});
};
await new Promise(resolve =>
child.send({ cmd: 'init', value: processFile }, resolve),
);
await onComplete;
}

@@ -96,17 +110,13 @@ export class ChildPool {

kill(child: ChildProcess, signal?: NodeJS.Signals) {
child.kill(signal || 'SIGKILL');
async kill(child: ChildProcess, signal: 'SIGTERM' | 'SIGKILL' = 'SIGKILL') {
this.remove(child);
await killAsync(child, signal, CHILD_KILL_TIMEOUT);
}
clean() {
async clean() {
const children = values(this.retained).concat(this.getAllFree());
children.forEach(child => {
// TODO: We may want to use SIGKILL if the process does not die after some time.
this.kill(child, 'SIGTERM');
});
this.retained = {};
this.free = {};
await Promise.all(children.map(c => this.kill(c, 'SIGTERM')));
}

@@ -113,0 +123,0 @@

@@ -370,9 +370,9 @@ import { Redis, Pipeline } from 'ioredis';

function onCompleted(args: any) {
removeListeners();
resolve(args.returnvalue);
removeListeners();
}
function onFailed(args: any) {
removeListeners();
reject(new Error(args.failedReason || args));
removeListeners();
}

@@ -379,0 +379,0 @@

@@ -13,3 +13,12 @@ /**

let processor: any;
let currentJobPromise: Promise<unknown> | undefined;
// same as process.send but waits until the send is complete
// the async version is used below because otherwise
// the termination handler may exit before the parent
// process has received the messages it requires
const processSendAsync = promisify(process.send.bind(process)) as (
msg: any,
) => Promise<void>;
// https://stackoverflow.com/questions/18391212/is-it-not-possible-to-stringify-an-error-using-json-stringify

@@ -33,3 +42,15 @@ if (!('toJSON' in Error.prototype)) {

process.on('message', async msg => {
async function waitForCurrentJobAndExit() {
status = 'TERMINATING';
try {
await currentJobPromise;
} finally {
process.exit(process.exitCode || 0);
}
}
process.on('SIGTERM', waitForCurrentJobAndExit);
process.on('SIGINT', waitForCurrentJobAndExit);
process.on('message', msg => {
switch (msg.cmd) {

@@ -55,2 +76,5 @@ case 'init':

status = 'IDLE';
process.send({
cmd: 'init-complete',
});
break;

@@ -66,19 +90,22 @@

status = 'STARTED';
try {
const result = await Promise.resolve(processor(wrapJob(msg.job)) || {});
process.send({
cmd: 'completed',
value: result,
});
} catch (err) {
if (!err.message) {
err = new Error(err);
currentJobPromise = (async () => {
try {
const result = (await processor(wrapJob(msg.job))) || {};
await processSendAsync({
cmd: 'completed',
value: result,
});
} catch (err) {
if (!err.message) {
err = new Error(err);
}
await processSendAsync({
cmd: 'failed',
value: err,
});
} finally {
status = 'IDLE';
currentJobPromise = undefined;
}
process.send({
cmd: 'failed',
value: err,
});
} finally {
status = 'IDLE';
}
})();
break;

@@ -85,0 +112,0 @@ case 'stop':

@@ -43,2 +43,6 @@ import { QueueSchedulerOptions } from '../interfaces';

if (!(this.opts as QueueSchedulerOptions).stalledInterval) {
throw new Error('Stalled interval cannot be zero or undefined');
}
// tslint:disable: no-floating-promises

@@ -45,0 +49,0 @@ this.run();

@@ -84,4 +84,11 @@ import * as fs from 'fs';

});
this.on('error', err => console.error(err));
}
async waitUntilReady() {
await super.waitUntilReady();
return this.blockingConnection.client;
}
get repeat() {

@@ -101,3 +108,3 @@ return new Promise<Repeat>(async resolve => {

private async run() {
const client = await this.client;
const client = await this.blockingConnection.client;

@@ -137,8 +144,10 @@ if (this.closing) {

* Get the first promise that completes
* Explanation https://stackoverflow.com/a/42898229/1848640
*/
const [completed] = await Promise.race(
[...processing.keys()].map(p => p.then(() => [p])),
const promises = [...processing.keys()];
const completedIdx = await Promise.race(
promises.map((p, idx) => p.then(() => idx)),
);
const completed = promises[completedIdx];
const token = processing.get(completed);

@@ -249,2 +258,4 @@ processing.delete(completed);

//
// TODO: Have only 1 timer that extends all the locks instead of one timer
// per concurrency setting.
let lockRenewId: string;

@@ -292,6 +303,7 @@ let timerStopped = false;

this.emit('failed', job, err, 'active');
} catch (e) {
} catch (err) {
this.emit('error', err);
// It probably means that the job has lost the lock before completion
// The QueueScheduler will (or already has) move the job to the waiting list (as stalled)
this.emit('error', e);
// The QueueScheduler will (or already has) moved the job back
// to the waiting list (as stalled)
}

@@ -372,28 +384,35 @@ };

async close(force = false) {
if (!this.closing) {
close(force = false) {
if (this.closing) {
return this.closing;
}
this.closing = (async () => {
this.emit('closing', 'closing queue');
this.closing = new Promise(async (resolve, reject) => {
try {
const client = await this.blockingConnection.client;
await this.resume();
const client = await this.blockingConnection.client;
if (!force) {
await this.whenCurrentJobsFinished(false);
} else {
await client.disconnect();
this.resume();
await Promise.resolve()
.finally(() => {
return force || this.whenCurrentJobsFinished(false);
})
.finally(() => {
const closePoolPromise = this.childPool?.clean();
if (force) {
// since we're not waiting for the job to end attach
// an error handler to avoid crashing the whole process
closePoolPromise?.catch(err => {
console.error(err);
});
return;
}
} catch (err) {
reject(err);
} finally {
this.timerManager.clearAllTimers();
this.childPool && this.childPool.clean();
}
this.emit('closed');
resolve();
});
}
return closePoolPromise;
})
.finally(() => client.disconnect())
.finally(() => this.timerManager.clearAllTimers())
.finally(() => this.emit('closed'));
})();
return this.closing;
}
}

@@ -11,5 +11,3 @@ import { expect } from 'chai';

afterEach(() => {
pool.clean();
});
afterEach(() => pool.clean());

@@ -52,3 +50,3 @@ it('should return same child if free', async () => {

expect(child).to.be.ok;
pool.kill(child);
await pool.kill(child);
expect(pool.retained).to.be.empty;

@@ -72,3 +70,3 @@ const newChild = await pool.retain(processor);

expect(children).not.to.include(child);
});
}).timeout(10000);

@@ -90,3 +88,3 @@ it('should return an old child if many retained and one free', async () => {

expect(children).to.include(child);
});
}).timeout(10000);
});

@@ -139,3 +139,4 @@ import { Queue } from '@src/classes';

const timeDiff = Date.now() - startTime;
expect(timeDiff).to.be.gte(numGroups * 1000);
// In some test envs, these timestamps can drift.
expect(timeDiff).to.be.gte(numGroups * 990);
expect(timeDiff).to.be.below((numGroups + 1) * 1100);

@@ -148,3 +149,3 @@

expect(diff).to.be.below(2000);
expect(diff).to.be.gte(1000);
expect(diff).to.be.gte(990);
prevTime = completed[group][i];

@@ -151,0 +152,0 @@ }

@@ -336,2 +336,42 @@ import { expect } from 'chai';

});
it('should allow the job to complete and then exit on worker close', async function() {
this.timeout(1500000);
const processFile = __dirname + '/fixtures/fixture_processor_slow.js';
const worker = new Worker(queueName, processFile);
// acquire and release a child here so we know it has its full termination handler set up
const initalizedChild = await worker['childPool'].retain(processFile);
await worker['childPool'].release(initalizedChild);
// await this after we've added the job
const onJobActive = new Promise<void>(resolve => {
worker.on('active', resolve);
});
const jobAdd = queue.add('foo', {});
await onJobActive;
expect(Object.keys(worker['childPool'].retained)).to.have.lengthOf(1);
expect(worker['childPool'].getAllFree()).to.have.lengthOf(0);
const child = Object.values(worker['childPool'].retained)[0];
expect(child).to.equal(initalizedChild);
expect(child.exitCode).to.equal(null);
expect(child.killed).to.equal(false);
// at this point the job should be active and running on the child
// trigger a close while we know it's doing work
await worker.close();
// ensure the child did get cleaned up
expect(!!child.killed).to.eql(true);
expect(Object.keys(worker['childPool'].retained)).to.have.lengthOf(0);
expect(worker['childPool'].getAllFree()).to.have.lengthOf(0);
const job = await jobAdd;
// check that the job did finish successfully
const jobResult = await job.waitUntilFinished(queueEvents);
expect(jobResult).to.equal(42);
});
});

@@ -933,2 +933,53 @@ import { Queue, QueueEvents, Job, Worker, QueueScheduler } from '@src/classes';

it('continue processing after a worker has stalled', async function() {
let first = true;
this.timeout(10000);
const worker = new Worker(
queueName,
async job => {
if (first) {
first = false;
return delay(2000);
}
},
{
lockDuration: 1000,
lockRenewTime: 3000, // The lock will not be updated
},
);
await worker.waitUntilReady();
const queueScheduler = new QueueScheduler(queueName, {
stalledInterval: 100,
});
await queueScheduler.waitUntilReady();
const job = await queue.add('test', { bar: 'baz' });
const completed = new Promise(resolve => {
worker.on('completed', resolve);
});
await completed;
await worker.close();
await queueScheduler.close();
});
it('stalled interval cannot be zero', function(done) {
this.timeout(10000);
let queueScheduler;
try {
queueScheduler = new QueueScheduler(queueName, {
stalledInterval: 0,
});
// Fail test if we reach here.
done(new Error('Should throw an exception'));
} catch (err) {
done();
}
});
describe('Concurrency process', () => {

@@ -935,0 +986,0 @@ it('should run job in sequence if I specify a concurrency of 1', async () => {

