Socket
Socket
Sign inDemoInstall

bullmq

Package Overview
Dependencies
Maintainers
1
Versions
531
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

bullmq - npm Package Compare versions

Comparing version 1.10.0 to 1.11.0

12

CHANGELOG.md

@@ -0,1 +1,13 @@

# [1.11.0](https://github.com/taskforcesh/bullmq/compare/v1.10.0...v1.11.0) (2020-11-24)
### Bug Fixes
* add generic type to processor ([d4f6501](https://github.com/taskforcesh/bullmq/commit/d4f650120804bd6161f0eeda5162ad5a96811a05))
### Features
* add name and return types to queue, worker and processor ([4879715](https://github.com/taskforcesh/bullmq/commit/4879715ec7c917f11e3a0ac3c5f5126029340ed3))
# [1.10.0](https://github.com/taskforcesh/bullmq/compare/v1.9.0...v1.10.0) (2020-10-20)

@@ -2,0 +14,0 @@

2

dist/classes/compat.d.ts

@@ -46,3 +46,3 @@ /// <reference types="node" />

*/
process(processor: string | Processor): Promise<void>;
process(processor: string | Processor<T>): Promise<void>;
add(jobName: string, data: any, opts?: JobsOptions): Promise<Job>;

@@ -49,0 +49,0 @@ /**

@@ -17,5 +17,5 @@ import { JobsOptions } from '../interfaces';

}
export declare class Job<T = any, R = any> {
export declare class Job<T = any, R = any, N extends string = string> {
private queue;
name: string;
name: N;
data: T;

@@ -34,10 +34,10 @@ opts: JobsOptions;

private discarded;
constructor(queue: QueueBase, name: string, data: T, opts?: JobsOptions, id?: string);
static create<T = any, R = any>(queue: QueueBase, name: string, data: T, opts?: JobsOptions): Promise<Job<T, R>>;
static createBulk<T = any, R = any>(queue: QueueBase, jobs: {
name: string;
constructor(queue: QueueBase, name: N, data: T, opts?: JobsOptions, id?: string);
static create<T = any, R = any, N extends string = string>(queue: QueueBase, name: N, data: T, opts?: JobsOptions): Promise<Job<T, R, N>>;
static createBulk<T = any, R = any, N extends string = string>(queue: QueueBase, jobs: {
name: N;
data: T;
opts?: JobsOptions;
}[]): Promise<Job<T, R>[]>;
static fromJSON(queue: QueueBase, json: any, jobId?: string): Job<any, any>;
}[]): Promise<Job<T, R, N>[]>;
static fromJSON(queue: QueueBase, json: any, jobId?: string): Job<any, any, any>;
static fromId(queue: QueueBase, jobId: string): Promise<Job | undefined>;

@@ -44,0 +44,0 @@ toJSON(): Pick<this, Exclude<keyof this, "queue">>;

@@ -23,9 +23,9 @@ import { QueueBase } from './queue-base';

getWaitingCount(): Promise<number>;
getWaiting(start?: number, end?: number): Promise<Job<any, any>[]>;
getActive(start?: number, end?: number): Promise<Job<any, any>[]>;
getDelayed(start?: number, end?: number): Promise<Job<any, any>[]>;
getCompleted(start?: number, end?: number): Promise<Job<any, any>[]>;
getFailed(start?: number, end?: number): Promise<Job<any, any>[]>;
getWaiting(start?: number, end?: number): Promise<Job<any, any, string>[]>;
getActive(start?: number, end?: number): Promise<Job<any, any, string>[]>;
getDelayed(start?: number, end?: number): Promise<Job<any, any, string>[]>;
getCompleted(start?: number, end?: number): Promise<Job<any, any, string>[]>;
getFailed(start?: number, end?: number): Promise<Job<any, any, string>[]>;
getRanges(types: string[], start?: number, end?: number, asc?: boolean): Promise<any[]>;
getJobs(types: string[] | string, start?: number, end?: number, asc?: boolean): Promise<Job<any, any>[]>;
getJobs(types: string[] | string, start?: number, end?: number, asc?: boolean): Promise<Job<any, any, string>[]>;
getJobLogs(jobId: string, start?: number, end?: number): Promise<{

@@ -32,0 +32,0 @@ logs: any;

import { JobsOptions, QueueOptions, RepeatOptions } from '../interfaces';
import { Job, QueueGetters, Repeat } from './';
export declare class Queue<T = any> extends QueueGetters {
export declare class Queue<T = any, R = any, N extends string = string> extends QueueGetters {
token: string;

@@ -13,3 +13,3 @@ jobsOpts: JobsOptions;

get repeat(): Promise<Repeat>;
add(name: string, data: T, opts?: JobsOptions): Promise<Job<any, any>>;
add(name: N, data: T, opts?: JobsOptions): Promise<Job<any, any, string>>;
private jobIdForGroup;

@@ -23,6 +23,6 @@ /**

addBulk(jobs: {
name: string;
name: N;
data: T;
opts?: JobsOptions;
}[]): Promise<Job<T, any>[]>;
}[]): Promise<Job<T, any, N>[]>;
/**

@@ -50,3 +50,3 @@ Pauses the processing of this queue globally.

}[]>;
removeRepeatable(name: string, repeatOpts: RepeatOptions, jobId?: string): Promise<any>;
removeRepeatable(name: N, repeatOpts: RepeatOptions, jobId?: string): Promise<any>;
removeRepeatableByKey(key: string): Promise<any>;

@@ -53,0 +53,0 @@ /**

import { JobsOptions, RepeatOptions } from '../interfaces';
import { Job, QueueBase } from './';
export declare class Repeat extends QueueBase {
addNextRepeatableJob(name: string, data: any, opts: JobsOptions, skipCheckExists?: boolean): Promise<Job<any, any>>;
addNextRepeatableJob(name: string, data: any, opts: JobsOptions, skipCheckExists?: boolean): Promise<Job<any, any, string>>;
private createNextJob;

@@ -6,0 +6,0 @@ removeRepeatable(name: string, repeat: RepeatOptions, jobId?: string): Promise<any>;

@@ -1,2 +0,3 @@

declare const sandbox: (processFile: any, childPool: any) => (job: any) => Promise<unknown>;
import { Job } from './job';
declare const sandbox: <T, R, N extends string>(processFile: any, childPool: any) => (job: Job<T, R, N>) => Promise<R>;
export default sandbox;

@@ -13,3 +13,3 @@ "use strict";

const done = new Promise((resolve, reject) => {
msgHandler = (msg) => {
msgHandler = async (msg) => {
switch (msg.cmd) {

@@ -27,6 +27,6 @@ case 'completed':

case 'progress':
job.updateProgress(msg.value);
await job.updateProgress(msg.value);
break;
case 'log':
job.log(msg.value);
await job.log(msg.value);
break;

@@ -33,0 +33,0 @@ }

@@ -13,3 +13,3 @@ /**

static remove(queue: QueueBase, jobId: string): Promise<any>;
static extendLock(worker: Worker, jobId: string, token: string): Promise<any>;
static extendLock<T, R, N extends string>(worker: Worker<T, R, N>, jobId: string, token: string): Promise<any>;
static updateProgress(queue: QueueBase, job: Job, progress: number | object): Promise<void>;

@@ -41,3 +41,3 @@ static moveToFinishedArgs(queue: QueueBase, job: Job, val: any, propVal: string, shouldRemove: boolean | number, target: string, token: string, fetchNext?: boolean): string[];

static reprocessJob(queue: QueueBase, job: Job, state: 'failed' | 'completed'): Promise<any>;
static moveToActive(worker: Worker, token: string, jobId?: string): Promise<[] | [JobJson, string]>;
static moveToActive<T, R, N extends string>(worker: Worker<T, R, N>, token: string, jobId?: string): Promise<[] | [JobJson, string]>;
static updateDelaySet(queue: QueueBase, delayedTimestamp: number): Promise<any>;

@@ -44,0 +44,0 @@ static promote(queue: QueueBase, jobId: string): Promise<any>;

@@ -6,3 +6,3 @@ import { Redis } from 'ioredis';

export declare const clientCommandMessageReg: RegExp;
export declare class Worker<T = any> extends QueueBase {
export declare class Worker<T = any, R = any, N extends string = string> extends QueueBase {
opts: WorkerOptions;

@@ -19,3 +19,3 @@ private drained;

private processing;
constructor(name: string, processor: string | Processor, opts?: WorkerOptions);
constructor(name: string, processor: string | Processor<T, R, N>, opts?: WorkerOptions);
waitUntilReady(): Promise<Redis>;

@@ -28,7 +28,7 @@ get repeat(): Promise<Repeat>;

*/
getNextJob(token: string): Promise<Job | void>;
getNextJob(token: string): Promise<Job<T, R, N> | void>;
private moveToActive;
private waitForJob;
private nextJobFromJobData;
processJob(job: Job, token: string): Promise<Job | void>;
processJob(job: Job<T, R, N>, token: string): Promise<Job<T, R, N> | void>;
/**

@@ -35,0 +35,0 @@ * Pauses the processing of this queue only for this worker.

import { Job } from '../classes';
import { AdvancedOptions, QueueBaseOptions, RateLimiterOptions } from './';
export declare type Processor = (job: Job) => Promise<any>;
export declare type Processor<T = any, R = any, N extends string = string> = (job: Job<T, R, N>) => Promise<R>;
export interface WorkerOptions extends QueueBaseOptions {

@@ -5,0 +5,0 @@ concurrency?: number;

@@ -46,3 +46,11 @@ # Repeatable

#### Slow repeatable jobs
It is worth to mention the case where the repeatable frequency is larger than the time it takes to process a job.
For instance, let's say that you have a job that is repeated every second, but the process of the job itself takes 5 seconds. As explained above, repeatable jobs are just delayed jobs, so this means that the next repeatable job will be added as soon as the next job is starting to be processed.
In this particular example, the worker will pick up the next job and also add the next repeatable job delayed 1 second since that is the repeatable interval. The worker will require 5 seconds to process the job, and if there is only 1 worker available then the next job will need to wait a full 5 seconds before it can be processed.
On the other hand, if there were 5 workers available, then they will most likely be able to process all the repeatable jobs with the desired frequency of one job per second.

@@ -7,2 +7,4 @@ # Retrying failing jobs

For BullMQ to reschedule failed jobs, make sure you create a `QueueScheduler` for your queue.
The code below shows how to specify a "exponential" backoff function with a 1 second delay as seed value, so it will retry at most 3 times spaced after 1 second, 2 seconds and 4 seconds:

@@ -9,0 +11,0 @@

# Returning job data
When a worker is done processing, sometimes it is convenient to return some data. This data can then be accessed for example by listening to the "completed" event. This return data is available at the job's "returnvalue" property.
Imagine a simple worker that performs some async processing:
```typescript
import { Queue, Worker } from 'bullmq';
const myWorker = new Worker('AsyncProc', async (job)=>{
const result = await doSomeAsyncProcessing();
return result;
});
```
{% hint style="info" %}
Note, in the example above we could just return directly doSomeAsyncProcessing, we just use a temporal variable to make the example more explicit.
{% endhint %}
We can now listen to the completed event in order to get the result value:
```typescript
import { Job, QueueEvents } from 'bullmq'
const queueEvents = new QueueEvents('AsyncProc')
queueEvents.on('completed', async (jobId: string) => {
const job = await Job.fromId(jobId);
console.log(job.returnvalue);
});
```
If you want to store the result of the processing function it is still much more robust to do it in the process function itself, that will guarantee that if the job is completed the return value would be stored as well. Storing data on the completed event on the other hand could fail and still the job would complete without detecting the error.
{
"name": "bullmq",
"version": "1.10.0",
"version": "1.11.0",
"description": "Queue for messages and jobs based on Redis",

@@ -5,0 +5,0 @@ "main": "dist/index.js",

@@ -34,3 +34,3 @@ <div align="center">

Which other languages would you like to see BullMQ ported to?
Which other languages would you like to see BullMQ ported to, Python, PHP, Java or C#?

@@ -37,0 +37,0 @@ Please vote here: https://twitter.com/manast/status/1318168684049977345

@@ -97,3 +97,3 @@ // Type definitions for bull 3.10

*/
async process(processor: string | Processor) {
async process(processor: string | Processor<T>) {
if (this.worker) {

@@ -100,0 +100,0 @@ throw new Error('Queue3.process() cannot be called twice');

@@ -26,3 +26,3 @@ import { Redis, Pipeline } from 'ioredis';

export class Job<T = any, R = any> {
export class Job<T = any, R = any, N extends string = string> {
progress: number | object = 0;

@@ -44,3 +44,3 @@ returnvalue: R = null;

private queue: QueueBase,
public name: string,
public name: N,
public data: T,

@@ -65,5 +65,5 @@ public opts: JobsOptions = {},

static async create<T = any, R = any>(
static async create<T = any, R = any, N extends string = string>(
queue: QueueBase,
name: string,
name: N,
data: T,

@@ -74,3 +74,3 @@ opts?: JobsOptions,

const job = new Job<T, R>(queue, name, data, opts, opts && opts.jobId);
const job = new Job<T, R, N>(queue, name, data, opts, opts && opts.jobId);

@@ -82,6 +82,6 @@ job.id = await job.addJob(client);

static async createBulk<T = any, R = any>(
static async createBulk<T = any, R = any, N extends string = string>(
queue: QueueBase,
jobs: {
name: string;
name: N;
data: T;

@@ -94,3 +94,3 @@ opts?: JobsOptions;

const jobInstances = jobs.map(
job => new Job<T, R>(queue, job.name, job.data, job.opts),
job => new Job<T, R, N>(queue, job.name, job.data, job.opts),
);

@@ -97,0 +97,0 @@

@@ -7,3 +7,7 @@ import { get } from 'lodash';

export class Queue<T = any> extends QueueGetters {
export class Queue<
T = any,
R = any,
N extends string = string
> extends QueueGetters {
token = v4();

@@ -48,3 +52,3 @@ jobsOpts: JobsOptions;

async add(name: string, data: T, opts?: JobsOptions) {
async add(name: N, data: T, opts?: JobsOptions) {
if (opts && opts.repeat) {

@@ -60,3 +64,3 @@ return (await this.repeat).addNextRepeatableJob(

const job = await Job.create(this, name, data, {
const job = await Job.create<T, R, N>(this, name, data, {
...this.jobsOpts,

@@ -86,3 +90,3 @@ ...opts,

*/
async addBulk(jobs: { name: string; data: T; opts?: JobsOptions }[]) {
async addBulk(jobs: { name: N; data: T; opts?: JobsOptions }[]) {
return Job.createBulk(

@@ -127,7 +131,3 @@ this,

async removeRepeatable(
name: string,
repeatOpts: RepeatOptions,
jobId?: string,
) {
async removeRepeatable(name: N, repeatOpts: RepeatOptions, jobId?: string) {
return (await this.repeat).removeRepeatable(name, repeatOpts, jobId);

@@ -134,0 +134,0 @@ }

@@ -1,3 +0,5 @@

const sandbox = (processFile: any, childPool: any) => {
return async function process(job: any) {
import { Job } from './job';
const sandbox = <T, R, N extends string>(processFile: any, childPool: any) => {
return async function process(job: Job<T, R, N>): Promise<R> {
const child = await childPool.retain(processFile);

@@ -12,4 +14,4 @@ let msgHandler: any;

const done = new Promise((resolve, reject) => {
msgHandler = (msg: any) => {
const done: Promise<R> = new Promise((resolve, reject) => {
msgHandler = async (msg: any) => {
switch (msg.cmd) {

@@ -27,6 +29,6 @@ case 'completed':

case 'progress':
job.updateProgress(msg.value);
await job.updateProgress(msg.value);
break;
case 'log':
job.log(msg.value);
await job.log(msg.value);
break;

@@ -33,0 +35,0 @@ }

@@ -94,3 +94,7 @@ /**

static async extendLock(worker: Worker, jobId: string, token: string) {
static async extendLock<T, R, N extends string>(
worker: Worker<T, R, N>,
jobId: string,
token: string,
) {
const client = await worker.client;

@@ -366,3 +370,7 @@ const opts: WorkerOptions = worker.opts;

static async moveToActive(worker: Worker, token: string, jobId?: string) {
static async moveToActive<T, R, N extends string>(
worker: Worker<T, R, N>,
token: string,
jobId?: string,
) {
const client = await worker.client;

@@ -369,0 +377,0 @@ const opts = worker.opts;

@@ -20,3 +20,7 @@ import * as fs from 'fs';

export class Worker<T = any> extends QueueBase {
export class Worker<
T = any,
R = any,
N extends string = string
> extends QueueBase {
opts: WorkerOptions;

@@ -26,3 +30,3 @@

private waiting = false;
private processFn: Processor;
private processFn: Processor<T, R, N>;

@@ -37,6 +41,6 @@ private resumeWorker: () => void;

private processing: Map<Promise<Job<T> | string>, string>; // { [index: number]: Promise<Job | void> } = {};
private processing: Map<Promise<Job<T, R, N> | string>, string>; // { [index: number]: Promise<Job | void> } = {};
constructor(
name: string,
processor: string | Processor,
processor: string | Processor<T, R, N>,
opts: WorkerOptions = {},

@@ -79,3 +83,3 @@ ) {

this.childPool = this.childPool || new ChildPool();
this.processFn = sandbox(processor, this.childPool).bind(this);
this.processFn = sandbox<T, R, N>(processor, this.childPool).bind(this);
}

@@ -172,3 +176,3 @@ this.timerManager = new TimerManager();

*/
async getNextJob(token: string): Promise<Job | void> {
async getNextJob(token: string): Promise<Job<T, R, N> | void> {
if (this.paused) {

@@ -203,3 +207,3 @@ await this.paused;

jobId?: string,
): Promise<Job | void> {
): Promise<Job<T, R, N> | void> {
const [jobData, id] = await Scripts.moveToActive(this, token, jobId);

@@ -231,3 +235,3 @@ return this.nextJobFromJobData(jobData, id);

jobId?: string,
): Promise<Job | void> {
): Promise<Job<T, R, N> | void> {
if (jobData) {

@@ -247,3 +251,6 @@ this.drained = false;

async processJob(job: Job, token: string): Promise<Job | void> {
async processJob(
job: Job<T, R, N>,
token: string,
): Promise<Job<T, R, N> | void> {
if (!job || this.closing || this.paused) {

@@ -293,3 +300,3 @@ return;

const handleCompleted = async (result: any): Promise<Job | void> => {
const handleCompleted = async (result: R): Promise<Job<T, R, N> | void> => {
const jobData = await job.moveToCompleted(

@@ -296,0 +303,0 @@ result,

import { Job } from '../classes';
import { AdvancedOptions, QueueBaseOptions, RateLimiterOptions } from './';
export type Processor = (job: Job) => Promise<any>;
export type Processor<T = any, R = any, N extends string = string> = (
job: Job<T, R, N>,
) => Promise<R>;

@@ -6,0 +8,0 @@ export interface WorkerOptions extends QueueBaseOptions {

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc