Comparing version 1.8.5 to 1.8.6
@@ -0,1 +1,11 @@ | ||
## [1.8.6](https://github.com/taskforcesh/bullmq/compare/v1.8.5...v1.8.6) (2020-04-10) | ||
### Bug Fixes | ||
* **workers:** do not call super.close() ([ebd2ae1](https://github.com/taskforcesh/bullmq/commit/ebd2ae1a5613d71643c5a7ba3f685d77585de68e)) | ||
* make sure closing is returned in every close call ([88c5948](https://github.com/taskforcesh/bullmq/commit/88c5948d33a9a7b7a4f4f64f3183727b87d80207)) | ||
* **scheduler:** duplicate connections fixes [#174](https://github.com/taskforcesh/bullmq/issues/174) ([011b8ac](https://github.com/taskforcesh/bullmq/commit/011b8acfdec54737d94a9fead2423e060e3364db)) | ||
* **worker:** return this.closing when calling close ([06d3d4f](https://github.com/taskforcesh/bullmq/commit/06d3d4f476444a2d2af8538d60cb2561a1915868)) | ||
## [1.8.5](https://github.com/taskforcesh/bullmq/compare/v1.8.4...v1.8.5) (2020-04-05) | ||
@@ -2,0 +12,0 @@ |
@@ -57,3 +57,6 @@ "use strict"; | ||
close() { | ||
return (this.closing = this.connection.close()); | ||
if (!this.closing) { | ||
this.closing = this.connection.close(); | ||
} | ||
return this.closing; | ||
} | ||
@@ -60,0 +63,0 @@ disconnect() { |
@@ -52,3 +52,6 @@ "use strict"; | ||
async close() { | ||
return (this.closing = this.disconnect()); | ||
if (!this.closing) { | ||
this.closing = this.disconnect(); | ||
} | ||
return this.closing; | ||
} | ||
@@ -55,0 +58,0 @@ } |
@@ -22,3 +22,3 @@ import { QueueSchedulerOptions } from '../interfaces'; | ||
private isBlocked; | ||
constructor(name: string, opts?: QueueSchedulerOptions); | ||
constructor(name: string, { connection, ...opts }?: QueueSchedulerOptions); | ||
private run; | ||
@@ -25,0 +25,0 @@ private readDelayedData; |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
const tslib_1 = require("tslib"); | ||
const utils_1 = require("../utils"); | ||
@@ -24,4 +25,7 @@ const _1 = require("./"); | ||
class QueueScheduler extends _1.QueueBase { | ||
constructor(name, opts = {}) { | ||
super(name, Object.assign({ maxStalledCount: 1, stalledInterval: 30000 }, opts)); | ||
constructor(name, _a = {}) { | ||
var { connection } = _a, opts = tslib_1.__rest(_a, ["connection"]); | ||
super(name, Object.assign(Object.assign({ maxStalledCount: 1, stalledInterval: 30000 }, opts), { connection: utils_1.isRedisInstance(connection) | ||
? connection.duplicate() | ||
: connection })); | ||
this.nextTimestamp = Number.MAX_VALUE; | ||
@@ -121,2 +125,5 @@ this.isBlocked = false; | ||
async close() { | ||
if (this.closing) { | ||
return this.closing; | ||
} | ||
if (this.isBlocked) { | ||
@@ -126,3 +133,3 @@ this.closing = this.disconnect(); | ||
else { | ||
super.close(); | ||
this.closing = super.close(); | ||
} | ||
@@ -129,0 +136,0 @@ return this.closing; |
@@ -293,4 +293,2 @@ "use strict"; | ||
} | ||
// await this.disconnect(); | ||
await super.close(); | ||
} | ||
@@ -308,2 +306,3 @@ catch (err) { | ||
} | ||
return this.closing; | ||
} | ||
@@ -310,0 +309,0 @@ } |
@@ -98,8 +98,8 @@ "use strict"; | ||
mocha_1.it('jobs not stalled while lock is extended', async function () { | ||
this.timeout(6000); | ||
this.timeout(5000); | ||
const concurrency = 4; | ||
const worker = new classes_1.Worker(queueName, async (job) => { | ||
return utils_1.delay(10000); | ||
return utils_1.delay(4000); | ||
}, { | ||
lockDuration: 1000, | ||
lockDuration: 100, | ||
concurrency, | ||
@@ -110,3 +110,2 @@ }); | ||
}); | ||
await worker.waitUntilReady(); | ||
await Promise.all([ | ||
@@ -120,9 +119,6 @@ queue.add('test', { bar: 'baz' }), | ||
const queueScheduler = new classes_1.QueueScheduler(queueName, { | ||
stalledInterval: 100, | ||
stalledInterval: 50, | ||
}); | ||
await queueScheduler.waitUntilReady(); | ||
const allStalled = new Promise(resolve => { | ||
queueScheduler.on('stalled', lodash_1.after(concurrency, resolve)); | ||
}); | ||
await utils_1.delay(2000); // let worker to extend lock for several times | ||
const allStalled = new Promise(resolve => queueScheduler.on('stalled', lodash_1.after(concurrency, resolve))); | ||
await utils_1.delay(500); // Wait for jobs to become active | ||
const active = await queue.getActiveCount(); | ||
@@ -129,0 +125,0 @@ chai_1.expect(active).to.be.equal(4); |
{ | ||
"name": "bullmq", | ||
"version": "1.8.5", | ||
"version": "1.8.6", | ||
"description": "Queue for messages and jobs based on Redis", | ||
@@ -5,0 +5,0 @@ "main": "dist/index.js", |
@@ -75,3 +75,6 @@ import { EventEmitter } from 'events'; | ||
close() { | ||
return (this.closing = this.connection.close()); | ||
if (!this.closing) { | ||
this.closing = this.connection.close(); | ||
} | ||
return this.closing; | ||
} | ||
@@ -78,0 +81,0 @@ |
@@ -71,4 +71,7 @@ import { QueueEventsOptions } from '../interfaces'; | ||
async close() { | ||
return (this.closing = this.disconnect()); | ||
if (!this.closing) { | ||
this.closing = this.disconnect(); | ||
} | ||
return this.closing; | ||
} | ||
} |
import { QueueSchedulerOptions } from '../interfaces'; | ||
import { array2obj } from '../utils'; | ||
import { array2obj, isRedisInstance } from '../utils'; | ||
import { QueueBase } from './'; | ||
@@ -29,4 +29,14 @@ import { Scripts } from './scripts'; | ||
constructor(name: string, opts: QueueSchedulerOptions = {}) { | ||
super(name, { maxStalledCount: 1, stalledInterval: 30000, ...opts }); | ||
constructor( | ||
name: string, | ||
{ connection, ...opts }: QueueSchedulerOptions = {}, | ||
) { | ||
super(name, { | ||
maxStalledCount: 1, | ||
stalledInterval: 30000, | ||
...opts, | ||
connection: isRedisInstance(connection) | ||
? (<IORedis.Redis>connection).duplicate() | ||
: connection, | ||
}); | ||
@@ -164,6 +174,9 @@ // tslint:disable: no-floating-promises | ||
async close() { | ||
if (this.closing) { | ||
return this.closing; | ||
} | ||
if (this.isBlocked) { | ||
this.closing = this.disconnect(); | ||
} else { | ||
super.close(); | ||
this.closing = super.close(); | ||
} | ||
@@ -170,0 +183,0 @@ return this.closing; |
@@ -375,2 +375,3 @@ import * as fs from 'fs'; | ||
await this.resume(); | ||
if (!force) { | ||
@@ -381,4 +382,2 @@ await this.whenCurrentJobsFinished(false); | ||
} | ||
// await this.disconnect(); | ||
await super.close(); | ||
} catch (err) { | ||
@@ -394,3 +393,4 @@ reject(err); | ||
} | ||
return this.closing; | ||
} | ||
} |
@@ -138,3 +138,3 @@ import { Queue, QueueScheduler, Worker, QueueEvents } from '@src/classes'; | ||
it('jobs not stalled while lock is extended', async function() { | ||
this.timeout(6000); | ||
this.timeout(5000); | ||
@@ -146,6 +146,6 @@ const concurrency = 4; | ||
async job => { | ||
return delay(10000); | ||
return delay(4000); | ||
}, | ||
{ | ||
lockDuration: 1000, // lockRenewTime would be half of it i.e. 500 | ||
lockDuration: 100, // lockRenewTime would be half of it i.e. 500 | ||
concurrency, | ||
@@ -159,4 +159,2 @@ }, | ||
await worker.waitUntilReady(); | ||
await Promise.all([ | ||
@@ -172,11 +170,10 @@ queue.add('test', { bar: 'baz' }), | ||
const queueScheduler = new QueueScheduler(queueName, { | ||
stalledInterval: 100, | ||
stalledInterval: 50, | ||
}); | ||
await queueScheduler.waitUntilReady(); | ||
const allStalled = new Promise(resolve => { | ||
queueScheduler.on('stalled', after(concurrency, resolve)); | ||
}); | ||
const allStalled = new Promise(resolve => | ||
queueScheduler.on('stalled', after(concurrency, resolve)), | ||
); | ||
await delay(2000); // let worker to extend lock for several times | ||
await delay(500); // Wait for jobs to become active | ||
@@ -183,0 +180,0 @@ const active = await queue.getActiveCount(); |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
1012331
17165