Socket
Socket
Sign inDemoInstall

bullmq

Package Overview
Dependencies
Maintainers
1
Versions
531
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

bullmq - npm Package Compare versions

Comparing version 1.17.0 to 1.18.0

dist/commands/removeJob-1.lua

19

dist/bullmq.d.ts

@@ -258,3 +258,3 @@ /// <reference types="node" />

declare interface JobNode {
export declare interface JobNode {
job: Job;

@@ -359,2 +359,3 @@ children?: JobNode[];

parentDependenciesKey?: string;
parentKey?: string;
};

@@ -408,3 +409,8 @@

id: string;
endDate: number;
endDate: number; /**
* Drains the queue, i.e., removes all jobs that are waiting
* or delayed, but not active, completed or failed.
*
* TODO: Convert to an atomic LUA script.
*/
tz: string;

@@ -417,2 +423,11 @@ cron: string;

/**
* Removes the given job from the queue as well as all its
* dependencies.
*
* @param jobId The if of the job to remove
* @returns 1 if it managed to remove the job or -1 if the job or
* any of its dependencies was locked.
*/
remove(jobId: string): Promise<any>;
/**
* Drains the queue, i.e., removes all jobs that are waiting

@@ -419,0 +434,0 @@ * or delayed, but not active, completed or failed.

3

dist/classes/flow.d.ts

@@ -8,3 +8,3 @@ /// <reference types="node" />

import { Job } from './job';
interface JobNode {
export interface JobNode {
job: Job;

@@ -66,2 +66,1 @@ children?: JobNode[];

}
export {};

@@ -75,2 +75,3 @@ "use strict";

const job = new job_1.Job(queue, node.name, node.data, Object.assign(Object.assign({}, node.opts), { parent: parent === null || parent === void 0 ? void 0 : parent.parentOpts }), ((_a = node.opts) === null || _a === void 0 ? void 0 : _a.jobId) || parentId);
const parentKey = getParentKey(parent === null || parent === void 0 ? void 0 : parent.parentOpts);
if (node.children) {

@@ -83,2 +84,3 @@ // Create parent job, will be a job in status "waiting-children".

waitChildrenKey,
parentKey,
});

@@ -98,2 +100,3 @@ const parentDependenciesKey = `${queueKeysParent.toKey(node.queueName, parentId)}:dependencies`;

parentDependenciesKey: parent === null || parent === void 0 ? void 0 : parent.parentDependenciesKey,
parentKey,
});

@@ -140,2 +143,7 @@ return { job };

exports.Flow = Flow;
function getParentKey(opts) {
if (opts) {
return `${opts.queue}:${opts.id}`;
}
}
//# sourceMappingURL=flow.js.map

@@ -44,3 +44,8 @@ import { JobsOptions, QueueOptions, RepeatOptions } from '../interfaces';

id: string;
endDate: number;
endDate: number; /**
* Drains the queue, i.e., removes all jobs that are waiting
* or delayed, but not active, completed or failed.
*
* TODO: Convert to an atomic LUA script.
*/
tz: string;

@@ -53,2 +58,11 @@ cron: string;

/**
* Removes the given job from the queue as well as all its
* dependencies.
*
* @param jobId The if of the job to remove
* @returns 1 if it managed to remove the job or -1 if the job or
* any of its dependencies was locked.
*/
remove(jobId: string): Promise<any>;
/**
* Drains the queue, i.e., removes all jobs that are waiting

@@ -55,0 +69,0 @@ * or delayed, but not active, completed or failed.

@@ -98,2 +98,13 @@ "use strict";

/**
* Removes the given job from the queue as well as all its
* dependencies.
*
* @param jobId The if of the job to remove
* @returns 1 if it managed to remove the job or -1 if the job or
* any of its dependencies was locked.
*/
async remove(jobId) {
return scripts_1.Scripts.remove(this, jobId);
}
/**
* Drains the queue, i.e., removes all jobs that are waiting

@@ -100,0 +111,0 @@ * or delayed, but not active, completed or failed.

@@ -12,2 +12,3 @@ /**

parentDependenciesKey?: string;
parentKey?: string;
};

@@ -14,0 +15,0 @@ export declare class Scripts {

@@ -25,2 +25,3 @@ /**

parentDependenciesKey: null,
parentKey: null,
}) {

@@ -51,2 +52,3 @@ const queueKeys = queue.keys;

opts.lifo ? 'RPUSH' : 'LPUSH',
parentOpts.parentKey,
];

@@ -69,14 +71,4 @@ keys = keys.concat(args);

const client = await queue.client;
const keys = [
'active',
'wait',
'delayed',
'paused',
'completed',
'failed',
'priority',
jobId,
`${jobId}:logs`,
].map(name => queue.toKey(name));
return client.removeJob(keys.concat([queue.keys.events, jobId]));
const keys = [jobId].map(name => queue.toKey(name));
return client.removeJob(keys.concat([jobId]));
}

@@ -83,0 +75,0 @@ static async extendLock(queue, jobId, token, duration) {

@@ -258,3 +258,133 @@ "use strict";

});
mocha_1.describe('remove', () => {
mocha_1.it('should remove all children when removing a parent', async () => {
const parentQueueName = 'parent-queue';
const name = 'child-job';
const flow = new classes_1.Flow();
const tree = await flow.add({
name: 'parent-job',
queueName: parentQueueName,
data: {},
children: [
{ name, data: { idx: 0, foo: 'bar' }, queueName },
{
name,
data: { idx: 0, foo: 'baz' },
queueName,
children: [{ name, data: { idx: 0, foo: 'qux' }, queueName }],
},
],
});
chai_1.expect(await tree.job.getState()).to.be.equal('waiting-children');
chai_1.expect(await tree.children[0].job.getState()).to.be.equal('waiting');
chai_1.expect(await tree.children[1].job.getState()).to.be.equal('waiting-children');
chai_1.expect(await tree.children[1].children[0].job.getState()).to.be.equal('waiting');
await tree.job.remove();
const parentQueue = new classes_1.Queue(parentQueueName);
const parentJob = await classes_1.Job.fromId(parentQueue, tree.job.id);
chai_1.expect(parentJob).to.be.undefined;
for (let i = 0; i < tree.children.length; i++) {
const child = tree.children[i];
const childJob = await classes_1.Job.fromId(queue, child.job.id);
chai_1.expect(childJob).to.be.undefined;
}
chai_1.expect(await tree.children[0].job.getState()).to.be.equal('unknown');
chai_1.expect(await tree.children[1].job.getState()).to.be.equal('unknown');
chai_1.expect(await tree.job.getState()).to.be.equal('unknown');
await flow.close();
await utils_1.removeAllQueueData(new IORedis(), parentQueueName);
});
mocha_1.it('should not remove anything if there is a locked job in the tree', async () => {
const parentQueueName = 'parent-queue';
const name = 'child-job';
const worker = new classes_1.Worker(queueName);
const flow = new classes_1.Flow();
const tree = await flow.add({
name: 'parent-job',
queueName: parentQueueName,
data: {},
children: [
{ name, data: { idx: 0, foo: 'bar' }, queueName },
{ name, data: { idx: 0, foo: 'baz' }, queueName },
],
});
// Get job so that it gets locked.
const nextJob = await worker.getNextJob('1234');
chai_1.expect(nextJob).to.not.be.undefined;
chai_1.expect(await nextJob.getState()).to.be.equal('active');
try {
await tree.job.remove();
}
catch (err) { }
chai_1.expect(await tree.job.getState()).to.be.equal('waiting-children');
chai_1.expect(await tree.children[0].job.getState()).to.be.equal('active');
chai_1.expect(await tree.children[1].job.getState()).to.be.equal('waiting');
await flow.close();
await utils_1.removeAllQueueData(new IORedis(), parentQueueName);
});
mocha_1.it('should remove from parent dependencies and move parent to wait', async () => {
const parentQueueName = 'parent-queue';
const name = 'child-job';
const flow = new classes_1.Flow();
const tree = await flow.add({
name: 'root-job',
queueName: parentQueueName,
data: {},
children: [
{
name,
data: { idx: 0, foo: 'bar' },
queueName,
children: [
{
name,
data: { idx: 1, foo: 'baz' },
queueName,
children: [{ name, data: { idx: 2, foo: 'qux' }, queueName }],
},
],
},
],
});
// We remove from deepest child and upwards to check if jobs
// are moved to the wait status correctly
const parentQueue = new classes_1.Queue(parentQueueName);
await removeChildJob(tree.children[0].children[0]);
await removeChildJob(tree.children[0]);
await removeChildJob(tree);
async function removeChildJob(node) {
chai_1.expect(await node.job.getState()).to.be.equal('waiting-children');
await node.children[0].job.remove();
chai_1.expect(await node.job.getState()).to.be.equal('waiting');
}
await flow.close();
await parentQueue.close();
await utils_1.removeAllQueueData(new IORedis(), parentQueueName);
});
mocha_1.it(`should only move parent to wait when all children have been removed`, async () => {
const parentQueueName = 'parent-queue';
const name = 'child-job';
const flow = new classes_1.Flow();
const tree = await flow.add({
name: 'parent-job',
queueName: parentQueueName,
data: {},
children: [
{ name, data: { idx: 0, foo: 'bar' }, queueName },
{ name, data: { idx: 0, foo: 'baz' }, queueName },
],
});
chai_1.expect(await tree.job.getState()).to.be.equal('waiting-children');
chai_1.expect(await tree.children[0].job.getState()).to.be.equal('waiting');
await tree.children[0].job.remove();
chai_1.expect(await tree.children[0].job.getState()).to.be.equal('unknown');
chai_1.expect(await tree.job.getState()).to.be.equal('waiting-children');
await tree.children[1].job.remove();
chai_1.expect(await tree.children[1].job.getState()).to.be.equal('unknown');
chai_1.expect(await tree.job.getState()).to.be.equal('waiting');
await flow.close();
await utils_1.removeAllQueueData(new IORedis(), parentQueueName);
});
});
});
//# sourceMappingURL=test_flow.js.map

@@ -117,2 +117,3 @@ /*eslint-env node */

});
// TODO: Add more remove tests
mocha_1.describe('.progress', function () {

@@ -119,0 +120,0 @@ mocha_1.it('can set and get progress as number', async function () {

@@ -0,1 +1,8 @@

# [1.18.0](https://github.com/taskforcesh/bullmq/compare/v1.17.0...v1.18.0) (2021-04-16)
### Features
* add remove support for flows ([4e8a7ef](https://github.com/taskforcesh/bullmq/commit/4e8a7efd53f918937478ae13f5da7dee9ea9d8b3))
# [1.17.0](https://github.com/taskforcesh/bullmq/compare/v1.16.2...v1.17.0) (2021-04-16)

@@ -2,0 +9,0 @@

{
"name": "bullmq",
"version": "1.17.0",
"version": "1.18.0",
"description": "Queue for messages and jobs based on Redis",

@@ -5,0 +5,0 @@ "main": "dist/index.js",

@@ -11,4 +11,5 @@ import uuid = require('uuid');

import { Job } from './job';
import { ParentOpts } from './scripts';
interface JobNode {
export interface JobNode {
job: Job;

@@ -127,2 +128,4 @@ children?: JobNode[];

const parentKey = getParentKey(parent?.parentOpts);
if (node.children) {

@@ -139,2 +142,3 @@ // Create parent job, will be a job in status "waiting-children".

waitChildrenKey,
parentKey,
});

@@ -159,2 +163,3 @@

parentDependenciesKey: parent?.parentDependenciesKey,
parentKey,
});

@@ -215,1 +220,7 @@

}
function getParentKey(opts: { id: string; queue: string }) {
if (opts) {
return `${opts.queue}:${opts.id}`;
}
}

@@ -146,2 +146,14 @@ import { Parent } from '../interfaces/parent';

/**
* Removes the given job from the queue as well as all its
* dependencies.
*
* @param jobId The if of the job to remove
* @returns 1 if it managed to remove the job or -1 if the job or
* any of its dependencies was locked.
*/
async remove(jobId: string) {
return Scripts.remove(this, jobId);
}
/**
* Drains the queue, i.e., removes all jobs that are waiting

@@ -148,0 +160,0 @@ * or delayed, but not active, completed or failed.

@@ -36,2 +36,3 @@ /**

parentDependenciesKey?: string;
parentKey?: string;
};

@@ -64,2 +65,3 @@

parentDependenciesKey: null,
parentKey: null,
},

@@ -92,2 +94,3 @@ ) {

opts.lifo ? 'RPUSH' : 'LPUSH',
parentOpts.parentKey,
];

@@ -120,14 +123,4 @@

const keys = [
'active',
'wait',
'delayed',
'paused',
'completed',
'failed',
'priority',
jobId,
`${jobId}:logs`,
].map(name => queue.toKey(name));
return (<any>client).removeJob(keys.concat([queue.keys.events, jobId]));
const keys = [jobId].map(name => queue.toKey(name));
return (<any>client).removeJob(keys.concat([jobId]));
}

@@ -134,0 +127,0 @@

@@ -1,2 +0,2 @@

import { Queue, Worker, Job, Flow } from '../classes';
import { Queue, Worker, Job, Flow, JobNode } from '../classes';
import { expect } from 'chai';

@@ -330,2 +330,170 @@ import * as IORedis from 'ioredis';

});
describe('remove', () => {
it('should remove all children when removing a parent', async () => {
const parentQueueName = 'parent-queue';
const name = 'child-job';
const flow = new Flow();
const tree = await flow.add({
name: 'parent-job',
queueName: parentQueueName,
data: {},
children: [
{ name, data: { idx: 0, foo: 'bar' }, queueName },
{
name,
data: { idx: 0, foo: 'baz' },
queueName,
children: [{ name, data: { idx: 0, foo: 'qux' }, queueName }],
},
],
});
expect(await tree.job.getState()).to.be.equal('waiting-children');
expect(await tree.children[0].job.getState()).to.be.equal('waiting');
expect(await tree.children[1].job.getState()).to.be.equal(
'waiting-children',
);
expect(await tree.children[1].children[0].job.getState()).to.be.equal(
'waiting',
);
await tree.job.remove();
const parentQueue = new Queue(parentQueueName);
const parentJob = await Job.fromId(parentQueue, tree.job.id);
expect(parentJob).to.be.undefined;
for (let i = 0; i < tree.children.length; i++) {
const child = tree.children[i];
const childJob = await Job.fromId(queue, child.job.id);
expect(childJob).to.be.undefined;
}
expect(await tree.children[0].job.getState()).to.be.equal('unknown');
expect(await tree.children[1].job.getState()).to.be.equal('unknown');
expect(await tree.job.getState()).to.be.equal('unknown');
await flow.close();
await removeAllQueueData(new IORedis(), parentQueueName);
});
it('should not remove anything if there is a locked job in the tree', async () => {
const parentQueueName = 'parent-queue';
const name = 'child-job';
const worker = new Worker(queueName);
const flow = new Flow();
const tree = await flow.add({
name: 'parent-job',
queueName: parentQueueName,
data: {},
children: [
{ name, data: { idx: 0, foo: 'bar' }, queueName },
{ name, data: { idx: 0, foo: 'baz' }, queueName },
],
});
// Get job so that it gets locked.
const nextJob = await worker.getNextJob('1234');
expect(nextJob).to.not.be.undefined;
expect(await (nextJob as Job).getState()).to.be.equal('active');
try {
await tree.job.remove();
} catch (err) {}
expect(await tree.job.getState()).to.be.equal('waiting-children');
expect(await tree.children[0].job.getState()).to.be.equal('active');
expect(await tree.children[1].job.getState()).to.be.equal('waiting');
await flow.close();
await removeAllQueueData(new IORedis(), parentQueueName);
});
it('should remove from parent dependencies and move parent to wait', async () => {
const parentQueueName = 'parent-queue';
const name = 'child-job';
const flow = new Flow();
const tree = await flow.add({
name: 'root-job',
queueName: parentQueueName,
data: {},
children: [
{
name,
data: { idx: 0, foo: 'bar' },
queueName,
children: [
{
name,
data: { idx: 1, foo: 'baz' },
queueName,
children: [{ name, data: { idx: 2, foo: 'qux' }, queueName }],
},
],
},
],
});
// We remove from deepest child and upwards to check if jobs
// are moved to the wait status correctly
const parentQueue = new Queue(parentQueueName);
await removeChildJob(tree.children[0].children[0]);
await removeChildJob(tree.children[0]);
await removeChildJob(tree);
async function removeChildJob(node: JobNode) {
expect(await node.job.getState()).to.be.equal('waiting-children');
await node.children[0].job.remove();
expect(await node.job.getState()).to.be.equal('waiting');
}
await flow.close();
await parentQueue.close();
await removeAllQueueData(new IORedis(), parentQueueName);
});
it(`should only move parent to wait when all children have been removed`, async () => {
const parentQueueName = 'parent-queue';
const name = 'child-job';
const flow = new Flow();
const tree = await flow.add({
name: 'parent-job',
queueName: parentQueueName,
data: {},
children: [
{ name, data: { idx: 0, foo: 'bar' }, queueName },
{ name, data: { idx: 0, foo: 'baz' }, queueName },
],
});
expect(await tree.job.getState()).to.be.equal('waiting-children');
expect(await tree.children[0].job.getState()).to.be.equal('waiting');
await tree.children[0].job.remove();
expect(await tree.children[0].job.getState()).to.be.equal('unknown');
expect(await tree.job.getState()).to.be.equal('waiting-children');
await tree.children[1].job.remove();
expect(await tree.children[1].job.getState()).to.be.equal('unknown');
expect(await tree.job.getState()).to.be.equal('waiting');
await flow.close();
await removeAllQueueData(new IORedis(), parentQueueName);
});
});
});

@@ -139,2 +139,4 @@ /*eslint-env node */

// TODO: Add more remove tests
describe('.progress', function() {

@@ -141,0 +143,0 @@ it('can set and get progress as number', async function() {

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc