Socket
Socket
Sign inDemoInstall

bullmq

Package Overview
Dependencies
Maintainers
1
Versions
531
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

bullmq - npm Package Compare versions

Comparing version 1.12.3 to 1.13.0

docs/gitbook/CHANGELOG.md

6

dist/classes/queue-base.d.ts

@@ -1,2 +0,1 @@

/// <reference types="ioredis" />
/// <reference types="node" />

@@ -6,2 +5,3 @@ import { EventEmitter } from 'events';

import { RedisConnection } from './redis-connection';
import * as IORedis from 'ioredis';
export declare class QueueBase extends EventEmitter {

@@ -17,4 +17,4 @@ readonly name: string;

toKey(type: string): string;
get client(): Promise<import("ioredis").Redis>;
waitUntilReady(): Promise<import("ioredis").Redis>;
get client(): Promise<IORedis.Redis>;
waitUntilReady(): Promise<IORedis.Redis>;
protected base64Name(): string;

@@ -21,0 +21,0 @@ protected clientName(): string;

@@ -47,3 +47,2 @@ "use strict";

}
// TO BE DEPRECATED
async waitUntilReady() {

@@ -50,0 +49,0 @@ return this.client;

@@ -18,3 +18,3 @@ import { Redis } from 'ioredis';

private processing;
constructor(name: string, processor: string | Processor<T, R, N>, opts?: WorkerOptions);
constructor(name: string, processor?: string | Processor<T, R, N>, opts?: WorkerOptions);
waitUntilReady(): Promise<Redis>;

@@ -21,0 +21,0 @@ get repeat(): Promise<Repeat>;

@@ -31,22 +31,24 @@ "use strict";

this.blockingConnection.on('error', this.emit.bind(this, 'error'));
if (typeof processor === 'function') {
this.processFn = processor;
}
else {
// SANDBOXED
const supportedFileTypes = ['.js', '.ts', '.flow'];
const processorFile = processor +
(supportedFileTypes.includes(path.extname(processor)) ? '' : '.js');
if (!fs.existsSync(processorFile)) {
// TODO are we forced to use sync api here?
throw new Error(`File ${processorFile} does not exist`);
if (processor) {
if (typeof processor === 'function') {
this.processFn = processor;
}
this.childPool = this.childPool || new child_pool_1.ChildPool();
this.processFn = sandbox_1.default(processor, this.childPool).bind(this);
else {
// SANDBOXED
const supportedFileTypes = ['.js', '.ts', '.flow'];
const processorFile = processor +
(supportedFileTypes.includes(path.extname(processor)) ? '' : '.js');
if (!fs.existsSync(processorFile)) {
// TODO are we forced to use sync api here?
throw new Error(`File ${processorFile} does not exist`);
}
this.childPool = this.childPool || new child_pool_1.ChildPool();
this.processFn = sandbox_1.default(processor, this.childPool).bind(this);
}
this.timerManager = new timer_manager_1.TimerManager();
/* tslint:disable: no-floating-promises */
this.run().catch(error => {
console.error(error);
});
}
this.timerManager = new timer_manager_1.TimerManager();
/* tslint:disable: no-floating-promises */
this.run().catch(error => {
console.error(error);
});
this.on('error', err => console.error(err));

@@ -318,3 +320,3 @@ }

.finally(() => client.disconnect())
.finally(() => this.timerManager.clearAllTimers())
.finally(() => this.timerManager && this.timerManager.clearAllTimers())
.finally(() => this.emit('closed'));

@@ -321,0 +323,0 @@ })();

@@ -1272,3 +1272,31 @@ "use strict";

});
mocha_1.describe('Manually process jobs', () => {
mocha_1.it('should allow to complete jobs manually', async () => {
const worker = new classes_1.Worker(queueName);
const token = 'my-token';
await queue.add('test', { foo: 'bar' });
const job = (await worker.getNextJob(token));
const isActive = await job.isActive();
chai_1.expect(isActive).to.be.equal(true);
await job.moveToCompleted('return value', token);
const isCompleted = await job.isCompleted();
chai_1.expect(isCompleted).to.be.equal(true);
await worker.close();
});
mocha_1.it('should allow to fail jobs manually', async () => {
const worker = new classes_1.Worker(queueName);
const token = 'my-token';
await queue.add('test', { foo: 'bar' });
const job = (await worker.getNextJob(token));
const isActive = await job.isActive();
chai_1.expect(isActive).to.be.equal(true);
await job.moveToFailed(new Error('job failed for some reason'), token);
const isCompleted = await job.isCompleted();
const isFailed = await job.isFailed();
chai_1.expect(isCompleted).to.be.equal(false);
chai_1.expect(isFailed).to.be.equal(true);
await worker.close();
});
});
});
//# sourceMappingURL=test_worker.js.map

@@ -10,3 +10,3 @@ ---

```typescript
import { Queue } from 'bullmq'
import { Queue } from 'bullmq'

@@ -22,3 +22,2 @@ const myQueue = new Queue('Paint');

```typescript
await myQueue.add(

@@ -29,3 +28,2 @@ 'wall',

);
```

@@ -35,3 +33,3 @@

### Default job options
## Default job options

@@ -46,3 +44,1 @@ Quite often you will want to provide the same job options to all the jobs that you add to the Queue. In this case you can use the "defaultJobOptions" option when instantiating the Queue class:

@@ -11,3 +11,3 @@ # Repeatable

There are two ways to specify a repeatable's job repetition pattern, either with a [cron expression](https://www.freeformatter.com/cron-expression-generator-quartz.html), or specifying a fix amount of milliseconds between repetitions.
There are two ways to specify a repeatable's job repetition pattern, either with a cron expression (using [cron-parser](https://www.npmjs.com/package/cron-parser)'s "unix cron w/ optional seconds" format), or specifying a fix amount of milliseconds between repetitions.

@@ -14,0 +14,0 @@ ```typescript

@@ -6,3 +6,3 @@ # Table of contents

* [API Reference](api-reference.md)
* [Changelog](changelog.md)
* [Changelog](CHANGELOG.md)

@@ -34,2 +34,3 @@ ## Guide

* [Manually fetching jobs](patterns/manually-fetching-jobs.md)
* [Producer - Consumer](patterns/producer-consumer.md)

@@ -36,0 +37,0 @@ * [Flows](patterns/flows.md)

{
"name": "bullmq",
"version": "1.12.3",
"version": "1.13.0",
"description": "Queue for messages and jobs based on Redis",

@@ -15,3 +15,3 @@ "main": "dist/index.js",

"coveralls": "nyc report --reporter=text-lcov | coveralls",
"docs": "typedoc --out docs src",
"docs": "(api-extractor run || true) && api-documenter markdown -i ./temp -o docs/gitbook/api",
"lint": "tslint --project tsconfig.json -c tslint.json 'src/**/*.ts'",

@@ -40,4 +40,7 @@ "precommit": "yarn lint",

"@istanbuljs/nyc-config-typescript": "^0.1.3",
"@microsoft/api-documenter": "^7.12.1",
"@microsoft/api-extractor": "^7.12.1",
"@semantic-release/changelog": "^3.0.4",
"@semantic-release/commit-analyzer": "^6.3.0",
"@semantic-release/exec": "^5.0.0",
"@semantic-release/git": "^7.0.16",

@@ -64,3 +67,3 @@ "@semantic-release/github": "^5.5.4",

"prettier": "^1.18.2",
"semantic-release": "^15.13.2",
"semantic-release": "^17.2.3",
"sinon": "^7.2.2",

@@ -73,3 +76,2 @@ "ts-mocha": "^6.0.0",

"tslint-plugin-prettier": "^2.0.1",
"typedoc": "^0.15.0",
"typescript": "^3.2.2"

@@ -117,6 +119,17 @@ },

],
[
"@semantic-release/exec",
{
"prepareCmd": "yarn docs"
}
],
"@semantic-release/github"
],
"prepare": [
"@semantic-release/changelog",
[
"@semantic-release/changelog",
{
"changelogFile": "docs/gitbook/CHANGELOG.md"
}
],
"@semantic-release/npm",

@@ -128,3 +141,4 @@ {

"yarn.lock",
"CHANGELOG.md"
"docs/gitbook/CHANGELOG.md",
"docs/gitbook/api/**"
],

@@ -131,0 +145,0 @@ "message": "chore(release): ${nextRelease.version} [skip ci]nn${nextRelease.notes}"

import { EventEmitter } from 'events';
import { QueueBaseOptions } from '../interfaces';
import { RedisConnection } from './redis-connection';
import * as IORedis from 'ioredis';

@@ -57,7 +58,6 @@ export class QueueBase extends EventEmitter {

get client() {
get client(): Promise<IORedis.Redis> {
return this.connection.client;
}
// TO BE DEPRECATED
async waitUntilReady() {

@@ -64,0 +64,0 @@ return this.client;

@@ -42,3 +42,3 @@ import * as fs from 'fs';

name: string,
processor: string | Processor<T, R, N>,
processor?: string | Processor<T, R, N>,
opts: WorkerOptions = {},

@@ -66,26 +66,28 @@ ) {

if (typeof processor === 'function') {
this.processFn = processor;
} else {
// SANDBOXED
const supportedFileTypes = ['.js', '.ts', '.flow'];
const processorFile =
processor +
(supportedFileTypes.includes(path.extname(processor)) ? '' : '.js');
if (processor) {
if (typeof processor === 'function') {
this.processFn = processor;
} else {
// SANDBOXED
const supportedFileTypes = ['.js', '.ts', '.flow'];
const processorFile =
processor +
(supportedFileTypes.includes(path.extname(processor)) ? '' : '.js');
if (!fs.existsSync(processorFile)) {
// TODO are we forced to use sync api here?
throw new Error(`File ${processorFile} does not exist`);
if (!fs.existsSync(processorFile)) {
// TODO are we forced to use sync api here?
throw new Error(`File ${processorFile} does not exist`);
}
this.childPool = this.childPool || new ChildPool();
this.processFn = sandbox<T, R, N>(processor, this.childPool).bind(this);
}
this.timerManager = new TimerManager();
this.childPool = this.childPool || new ChildPool();
this.processFn = sandbox<T, R, N>(processor, this.childPool).bind(this);
/* tslint:disable: no-floating-promises */
this.run().catch(error => {
console.error(error);
});
}
this.timerManager = new TimerManager();
/* tslint:disable: no-floating-promises */
this.run().catch(error => {
console.error(error);
});
this.on('error', err => console.error(err));

@@ -417,3 +419,3 @@ }

.finally(() => client.disconnect())
.finally(() => this.timerManager.clearAllTimers())
.finally(() => this.timerManager && this.timerManager.clearAllTimers())
.finally(() => this.emit('closed'));

@@ -420,0 +422,0 @@ })();

@@ -1633,2 +1633,46 @@ import { Queue, QueueEvents, Job, Worker, QueueScheduler } from '@src/classes';

});
describe('Manually process jobs', () => {
it('should allow to complete jobs manually', async () => {
const worker = new Worker(queueName);
const token = 'my-token';
await queue.add('test', { foo: 'bar' });
const job = (await worker.getNextJob(token)) as Job;
const isActive = await job.isActive();
expect(isActive).to.be.equal(true);
await job.moveToCompleted('return value', token);
const isCompleted = await job.isCompleted();
expect(isCompleted).to.be.equal(true);
await worker.close();
});
it('should allow to fail jobs manually', async () => {
const worker = new Worker(queueName);
const token = 'my-token';
await queue.add('test', { foo: 'bar' });
const job = (await worker.getNextJob(token)) as Job;
const isActive = await job.isActive();
expect(isActive).to.be.equal(true);
await job.moveToFailed(new Error('job failed for some reason'), token);
const isCompleted = await job.isCompleted();
const isFailed = await job.isFailed();
expect(isCompleted).to.be.equal(false);
expect(isFailed).to.be.equal(true);
await worker.close();
});
});
});

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc