@chromaui/bee-queue
Advanced tools
Comparing version 1.2.3-chromaui.0 to 1.4.0-chromaui.0
@@ -0,1 +1,40 @@ | ||
## [1.4.0](https://github.com/bee-queue/bee-queue/compare/v1.3.1...v1.4.0) (2021-05-19) | ||
### Features | ||
- **job:** store progress with the whole job data ([#339](https://github.com/bee-queue/bee-queue/issues/339)) ([6df8b5a](https://github.com/bee-queue/bee-queue/commit/6df8b5a6d126d8a9e06300451481f57eab3a7d89)) | ||
### Bug Fixes | ||
- typo fix in README ([#412](https://github.com/bee-queue/bee-queue/issues/412)) ([850d1d1](https://github.com/bee-queue/bee-queue/commit/850d1d1bd1c43552ad312bfcd41fe2741994f78a)) | ||
### [1.3.1](https://github.com/bee-queue/bee-queue/compare/v1.3.0...v1.3.1) (2020-11-04) | ||
### Bug Fixes | ||
- **scale:** bound unpack arguments count ([#297](https://github.com/bee-queue/bee-queue/issues/297)) ([4108e5e](https://github.com/bee-queue/bee-queue/commit/4108e5e4e97dcf2d8df44dd0114ba31edab511ca)) | ||
- **types:** fix typescript definitions errors ([#311](https://github.com/bee-queue/bee-queue/issues/311)) ([3bc3f31](https://github.com/bee-queue/bee-queue/commit/3bc3f317d09a04b1165fc9f3aa46e17e82d3606f)) | ||
## [1.3.0](https://github.com/bee-queue/bee-queue/compare/v1.2.3...v1.3.0) (2020-11-03) | ||
### Features | ||
- implement Queue#saveAll feature ([#198](https://github.com/bee-queue/bee-queue/issues/198)) ([851f09d](https://github.com/bee-queue/bee-queue/commit/851f09df65d144adc7d6798c91c8d665e52400d5)) | ||
- support custom strategies on a queue ([#134](https://github.com/bee-queue/bee-queue/issues/134)) ([926de9d](https://github.com/bee-queue/bee-queue/commit/926de9dc016541cdfdd6139b780ecf4058a6e709)) | ||
- **types:** add generics to type definitions ([a565d3d](https://github.com/bee-queue/bee-queue/commit/a565d3dc2acf120cb7a91cdfbbf2840d67267ae7)) | ||
- **types:** add isRunning ([c488385](https://github.com/bee-queue/bee-queue/commit/c4883859fbbdc0eb0ee894d77097c67e528e2d31)) | ||
- **types:** add ready handler ([32c4b1e](https://github.com/bee-queue/bee-queue/commit/32c4b1ef45e82a7bd5698445f30e7460ece71484)) | ||
### Bug Fixes | ||
- **backoff:** allow no delay arg when setting immediate strategy ([#154](https://github.com/bee-queue/bee-queue/issues/154)) ([6f1d62f](https://github.com/bee-queue/bee-queue/commit/6f1d62fc493c5f2ad20eae5a8122fc316c092451)) | ||
- **queue:** remove error event listener on close ([#231](https://github.com/bee-queue/bee-queue/issues/231)) ([36b4904](https://github.com/bee-queue/bee-queue/commit/36b4904b363cae1a84bb38f1d8c40f3e7f930c44)) | ||
- **removeJob:** remove job from stored jobs ([#230](https://github.com/bee-queue/bee-queue/issues/230)) ([a8c9d87](https://github.com/bee-queue/bee-queue/commit/a8c9d87f106cf4e0f51b5a6b1000cc1e1e19e6ad)) | ||
- **types:** support progress events using arbitrary data ([#140](https://github.com/bee-queue/bee-queue/issues/140)) ([bc8aa52](https://github.com/bee-queue/bee-queue/commit/bc8aa522f66ed7038bdea03629c0f3244ea8a55f)) | ||
- **types:** update createJob handler for consistency ([b71a993](https://github.com/bee-queue/bee-queue/commit/b71a9930c1cbf3de3c191073b6c1c41e7bcde1d8)) | ||
- **types:** update redis option type ([#290](https://github.com/bee-queue/bee-queue/issues/290)) ([e80c51d](https://github.com/bee-queue/bee-queue/commit/e80c51db100f9a0e7fc596b27bcd9c8a5e78791f)) | ||
- **types:** update type declaration ([#252](https://github.com/bee-queue/bee-queue/issues/252)) ([1dce7ca](https://github.com/bee-queue/bee-queue/commit/1dce7ca9cc90da328d5943a3f77483849d3dd816)) | ||
- misc edge case fixes ([a2df983](https://github.com/bee-queue/bee-queue/commit/a2df9836dbeeef7458c09d6b7aa2f674e5d0efeb)) | ||
- update typescript declarations and add documentation ([#187](https://github.com/bee-queue/bee-queue/issues/187)) ([cec1498](https://github.com/bee-queue/bee-queue/commit/cec1498ecc486c26c6a0b882daf360ad8c0d0402)), closes [#138](https://github.com/bee-queue/bee-queue/issues/138) | ||
# 1.2.3 / 2020-01-28 | ||
@@ -2,0 +41,0 @@ |
@@ -5,3 +5,3 @@ /// <reference types="node" /> | ||
import {EventEmitter} from 'events'; | ||
import {ClientOpts} from 'redis'; | ||
import {ClientOpts, RedisClient} from 'redis'; | ||
@@ -14,2 +14,3 @@ declare class BeeQueue<T = any> extends EventEmitter { | ||
settings: any; | ||
backoffStrategies: Map<string, (job: BeeQueue.Job<T>) => number>; | ||
@@ -92,3 +93,3 @@ constructor(name: string, settings?: BeeQueue.QueueSettings); | ||
delayedDebounce?: number; | ||
redis?: ClientOpts; | ||
redis?: ClientOpts | RedisClient; | ||
isWorker?: boolean; | ||
@@ -112,2 +113,3 @@ getEvents?: boolean; | ||
progress: any; | ||
status: 'created' | 'succeeded' | 'failed' | 'retrying'; | ||
@@ -121,6 +123,3 @@ on(ev: 'succeeded', fn: (result: any) => void): this; | ||
retries(n: number): this; | ||
backoff( | ||
strategy: 'immediate' | 'fixed' | 'exponential', | ||
delayFactor?: number | ||
): this; | ||
backoff(strategy: string, delayFactor?: number): this; | ||
delayUntil(dateOrTimestamp: Date | number): this; | ||
@@ -127,0 +126,0 @@ timeout(milliseconds: number): this; |
@@ -21,4 +21,2 @@ 'use strict'; | ||
redisScanCount: 100, | ||
// first retry period after a brpoplpush failure, backoff is exponential | ||
initialRedisFailureRetryDelay: 1000, | ||
@@ -25,0 +23,0 @@ // quitCommandClient is dependent on whether the redis setting was an actual |
'use strict'; | ||
const Emitter = require('events').EventEmitter; | ||
const helpers = require('./helpers'); | ||
const strategies = require('./backoff'); | ||
@@ -30,7 +28,3 @@ class Job extends Emitter { | ||
) | ||
.then((data) => | ||
data | ||
? Job.fromData(queue, jobId, data) | ||
: queue.removeJob(jobId).then(() => null) | ||
); | ||
.then((data) => (data ? Job.fromData(queue, jobId, data) : null)); | ||
@@ -46,2 +40,3 @@ if (cb) helpers.asCallback(promise, cb); | ||
job.status = data.status; | ||
job.progress = data.progress; | ||
return job; | ||
@@ -55,6 +50,9 @@ } | ||
status: this.status, | ||
progress: this.progress, | ||
}); | ||
} | ||
save(cb) { | ||
// For Queue#saveAll, this method is guaranteed to invoke evalScript | ||
// synchronously. | ||
_save(evalScript) { | ||
const toKey = this.queue.toKey.bind(this.queue); | ||
@@ -64,3 +62,3 @@ | ||
if (this.options.delay) { | ||
promise = this.queue._evalScript( | ||
promise = evalScript([ | ||
'addDelayedJob', | ||
@@ -74,4 +72,4 @@ 4, | ||
this.toData(), | ||
this.options.delay | ||
); | ||
this.options.delay, | ||
]); | ||
@@ -88,3 +86,3 @@ if (this.queue.settings.activateDelayedJobs) { | ||
} else { | ||
promise = this.queue._evalScript( | ||
promise = evalScript([ | ||
'addJob', | ||
@@ -96,7 +94,7 @@ 3, | ||
this.id || '', | ||
this.toData() | ||
); | ||
this.toData(), | ||
]); | ||
} | ||
promise = promise.then((jobId) => { | ||
return promise.then((jobId) => { | ||
this.id = jobId; | ||
@@ -109,3 +107,9 @@ // If the jobId is not null, then store the job in the job map. | ||
}); | ||
} | ||
save(cb) { | ||
const promise = this._save((args) => | ||
this.queue._evalScript.apply(this.queue, args) | ||
); | ||
if (cb) helpers.asCallback(promise, cb); | ||
@@ -153,3 +157,3 @@ return promise; | ||
backoff(strategy, delay) { | ||
if (!strategies.has(strategy)) { | ||
if (!this.queue.backoffStrategies.has(strategy)) { | ||
throw new Error('unknown strategy'); | ||
@@ -156,0 +160,0 @@ } |
111
lib/queue.js
@@ -23,2 +23,3 @@ 'use strict'; | ||
this.checkTimer = null; | ||
this.backoffStrategies = new Map(backoff); | ||
@@ -496,22 +497,4 @@ this._closed = null; | ||
Job.fromId(this, jobId), | ||
(err) => { | ||
if (redis.isAbortError(err) && this.paused) { | ||
return null; | ||
} | ||
this.emit('error', err); | ||
// Retry the brpoplpush after a delay | ||
if (!this._redisFailureRetryDelay) { | ||
// prettier-ignore | ||
this._redisFailureRetryDelay | ||
= this.settings.initialRedisFailureRetryDelay; | ||
} else { | ||
this._redisFailureRetryDelay = this._redisFailureRetryDelay * 2; | ||
} | ||
return helpers | ||
.delay(this._redisFailureRetryDelay) | ||
.then(() => this._waitForJob()); | ||
} | ||
(err) => | ||
redis.isAbortError(err) && this.paused ? null : Promise.reject(err) | ||
); | ||
@@ -521,3 +504,2 @@ } | ||
_getNextJob() { | ||
this._redisFailureRetryDelay = 0; | ||
// Under normal calling conditions, commandable will not reject because we | ||
@@ -606,3 +588,5 @@ // will have just checked paused in Queue#process. | ||
const strategy = | ||
job.options.retries > 0 ? backoff.get(strategyName) : null; | ||
job.options.retries > 0 | ||
? this.backoffStrategies.get(strategyName) | ||
: null; | ||
const delay = strategy ? strategy(job) : -1; | ||
@@ -699,2 +683,9 @@ if (delay < 0) { | ||
this.running += 1; | ||
this.queued -= 1; | ||
if (this.running + this.queued < this.concurrency) { | ||
this.queued += 1; | ||
setImmediate(jobTick); | ||
} | ||
if (!job) { | ||
@@ -707,9 +698,2 @@ // Per comment in Queue#_waitForJob, this branch is possible when | ||
this.running += 1; | ||
this.queued -= 1; | ||
if (this.running + this.queued < this.concurrency) { | ||
this.queued += 1; | ||
setImmediate(jobTick); | ||
} | ||
return this._runJob(job).then((results) => { | ||
@@ -809,2 +793,34 @@ this.running -= 1; | ||
/** | ||
* Save all the provided jobs, without waiting for each job to be created. | ||
* This pipelines the requests which avoids the waiting 2N*RTT for N jobs - | ||
* the client waits to receive each command result before sending the next | ||
* command. Note that this method does not support a callback parameter - you | ||
* must use the returned Promise. | ||
* | ||
* @param {Iterable<Job>} jobs The jobs to save. Jobs that have no ID will be | ||
* assigned one by mutation. | ||
* @return {Promise<Map<Job, Error>>} The errors that occurred when saving | ||
* jobs. Will be empty if no errors occurred. Will reject if there was an | ||
* exception executing the batch or readying the connection. | ||
* @modifies {arguments} | ||
*/ | ||
saveAll(jobs) { | ||
return this._commandable().then((client) => { | ||
const batch = client.batch(), | ||
errors = new Map(); | ||
for (const job of jobs) { | ||
try { | ||
job | ||
._save((evalArgs) => this._evalScriptOn(batch, evalArgs)) | ||
.catch((err) => void errors.set(job, err)); | ||
} catch (err) { | ||
errors.set(job, err); | ||
} | ||
} | ||
return helpers.callAsync((done) => batch.exec(done)).then(() => errors); | ||
}); | ||
} | ||
_activateDelayed() { | ||
@@ -845,26 +861,35 @@ if (!this.settings.activateDelayedJobs) return; | ||
/** | ||
* Evaluate the named script on the given commandable object, which might be a | ||
* RedisClient or a Batch or Multi object. This exists to facilitate | ||
* command pipelining. | ||
* | ||
* @modifies {arguments} | ||
*/ | ||
_evalScriptOn(commandable, args) { | ||
// Precondition: Queue is ready - otherwise lua.shas may not have loaded. | ||
args[0] = lua.shas[args[0]]; | ||
return helpers.callAsync((done) => { | ||
args.push(done); | ||
commandable.evalsha.apply(commandable, args); | ||
}); | ||
} | ||
/** | ||
* Evaluate the named script, return a promise with its results. | ||
* | ||
* Same parameter list/syntax as evalsha, except for the name. | ||
* | ||
* @param {string} name | ||
*/ | ||
_evalScript(name) { | ||
_evalScript() { | ||
// Avoid deoptimization by leaking arguments: store them directly in an | ||
// array instead of passing them to a helper. | ||
const args = new Array(arguments.length); | ||
// Skip the first because it's just the name, and it'll get filled in within | ||
// the promise. | ||
for (let i = 1; i < arguments.length; ++i) { | ||
for (let i = 0; i < arguments.length; ++i) { | ||
args[i] = arguments[i]; | ||
} | ||
return this._commandable().then((client) => { | ||
// Get the sha for the script after we're ready to avoid a race condition | ||
// with the lua script loader. | ||
args[0] = lua.shas[name]; | ||
const promise = helpers.deferred(); | ||
args.push(promise.defer()); | ||
client.evalsha.apply(client, args); | ||
return promise; | ||
}); | ||
return this._commandable().then((client) => | ||
this._evalScriptOn(client, args) | ||
); | ||
} | ||
@@ -871,0 +896,0 @@ } |
{ | ||
"name": "@chromaui/bee-queue", | ||
"version": "1.2.3-chromaui.0", | ||
"version": "1.4.0-chromaui.0", | ||
"description": "A simple, fast, robust job/task queue, backed by Redis.", | ||
@@ -12,5 +12,14 @@ "main": "index.js", | ||
"devDependencies": { | ||
"@commitlint/cli": "^11.0.0", | ||
"@commitlint/config-conventional": "^11.0.0", | ||
"@semantic-release/changelog": "^5.0.1", | ||
"@semantic-release/commit-analyzer": "^8.0.1", | ||
"@semantic-release/exec": "^5.0.0", | ||
"@semantic-release/git": "^9.0.0", | ||
"@semantic-release/github": "^7.1.1", | ||
"@semantic-release/npm": "^7.0.6", | ||
"@semantic-release/release-notes-generator": "^9.0.1", | ||
"ava": "^0.25.0", | ||
"ava-spec": "^1.1.0", | ||
"coveralls": "^3.0.11", | ||
"conventional-changelog-conventionalcommits": "^4.4.0", | ||
"eslint": "^7.8.1", | ||
@@ -23,2 +32,3 @@ "eslint-config-prettier": "^6.10.1", | ||
"sandboxed-module": "^2.0.3", | ||
"semantic-release": "^17.2.2", | ||
"semver": "^7.2.1", | ||
@@ -37,5 +47,6 @@ "sinon": "^7.5.0" | ||
"scripts": { | ||
"ci": "if node -e '({...0})' 2>/dev/null && node -e '\"\".trimEnd()' 2>/dev/null; then npm run lint && npm run ci:coverage; else ava; fi", | ||
"ci:coverage": "if [ -z \"$CI\" ]; then npm run coverage; else npm run coverage-and-publish; fi", | ||
"coverage-and-publish": "npm run coverage && nyc report --reporter=text-lcov | coveralls", | ||
"ci": "if node -e '({...0})' 2>/dev/null && node -e '\"\".trimEnd()' 2>/dev/null; then npm run lint && npm run ci:coverage && if [ -z \"$CI\" ]; then npm run ci:commitlint; fi; else ava; fi", | ||
"ci:commitlint": "if node -e 'async()=>{for await(var a of a);}'; then commitlint --from \"origin/${GITHUB_BASE_REF:-master}\"; fi", | ||
"ci:coverage": "if [ -z \"$CI\" ]; then npm run coverage; else npm run coverage-and-report; fi", | ||
"coverage-and-report": "npm run coverage && mkdir -p coverage && nyc report --reporter=text-lcov > coverage/lcov.info", | ||
"coverage": "nyc ava", | ||
@@ -60,2 +71,5 @@ "lint": "eslint . && prettier --check '**/*.(html|json|md|sublime-project|ts|yml)'", | ||
"author": "Lewis J Ellis <me@lewisjellis.com>", | ||
"contributors": [ | ||
"Eli Skeggs <dev@eliskeggs.com> (https://eli.skeg.gs)" | ||
], | ||
"license": "MIT", | ||
@@ -62,0 +76,0 @@ "repository": { |
<a name="top"></a> | ||
![bee-queue logo](https://raw.githubusercontent.com/bee-queue/bee-queue/master/bee-queue.png) | ||
[![NPM Version][npm-image]][npm-url] [![Build Status][travis-image]][travis-url] [![Coverage Status][coveralls-image]][coveralls-url] | ||
[![npm Version][npm-image]][npm-url] [![Node.js CI](https://github.com/bee-queue/bee-queue/actions/workflows/node.js.yml/badge.svg)](https://github.com/bee-queue/bee-queue/actions/workflows/node.js.yml) [![Coverage Status][coveralls-image]][coveralls-url] | ||
@@ -32,3 +32,3 @@ A simple, fast, robust job/task queue for Node.js, backed by Redis. | ||
Thanks to the folks at [Mixmax](https://mixmax.com), Bee-Queue is once again being regularly [maintained](https://mixmax.com/blog/bee-queue-v1-node-redis-queue)! | ||
Thanks to the folks at [Mixmax](https://mixmax.com), Bee-Queue is once again being regularly [maintained](https://www.mixmax.com/engineering/bee-queue-v1-node-redis-queue/)! | ||
@@ -158,2 +158,16 @@ [Celery](http://www.celeryproject.org/), [Resque](https://github.com/resque/resque), [Kue](https://github.com/Automattic/kue), and [Bull](https://github.com/OptimalBits/bull) operate similarly, but are generally designed for longer background jobs, supporting things like job prioritization and repeatable jobs, which Bee-Queue [currently does not](#contributing). Bee-Queue can handle longer background jobs just fine, but they aren't [the primary focus](#motivation). | ||
### Advanced: Bulk-Creating Jobs | ||
Normally, creating and saving jobs blocks the underlying redis client for the full duration of an RTT to the Redis server. This can reduce throughput in cases where many operations should occur without delay - particularly when there are many jobs that need to be created quickly. Use `Queue#saveAll` to save an iterable (e.g. an Array) containing jobs in a pipelined network request, thus pushing all the work out on the wire before hearing back from the Redis server. | ||
```js | ||
addQueue | ||
.saveAll([addQueue.createJob({x: 3, y: 4}), addQueue.createJob({x: 4, y: 5})]) | ||
.then((errors) => { | ||
// The errors value is a Map associating Jobs with Errors. This will often be an empty Map. | ||
}); | ||
``` | ||
Each job in the array provided to saveAll will be mutated with the ID it gets assigned. | ||
## Processing Jobs | ||
@@ -320,3 +334,3 @@ | ||
- `stallInterval`: number, ms; the length of the window in which workers must report that they aren't stalling. Higher values will reduce Redis/network overhead, but if a worker stalls, it will take longer before its stalled job(s) will be retried. A higher value will also result in a lower probability of false-positives during stall detection. | ||
- `nearTermWindow`: number, ms; the window during which delayed jobs will be specifically scheduled using `setTimeout` - if all delayed jobs are further out that this window, the Queue will double-check that it hasn't missed any jobs after the window elapses. | ||
- `nearTermWindow`: number, ms; the window during which delayed jobs will be specifically scheduled using `setTimeout` - if all delayed jobs are further out than this window, the Queue will double-check that it hasn't missed any jobs after the window elapses. | ||
- `delayedDebounce`: number, ms; to avoid unnecessary churn for several jobs in short succession, the Queue may delay individual jobs by up to this amount. | ||
@@ -572,3 +586,3 @@ - `redis`: object or string, specifies how to connect to Redis. See [`redis.createClient()`](https://github.com/NodeRedis/node_redis#rediscreateclient) for the full set of options. | ||
#### Queue#close([cb]) | ||
#### Queue#close([timeout], [cb]) | ||
@@ -730,3 +744,3 @@ Closes the queue's connections to Redis. Idempotent. | ||
Explicitly sets the ID of the job. If a job with the given ID already exists, the Job will not be created, and `job.id` will be set to `null`. This method can be used to run a once for each of an external resource by passing that resource's ID. For instance, you might run the setup job for a user only once by setting the job ID to the ID of the user. | ||
Explicitly sets the ID of the job. If a job with the given ID already exists, the Job will not be created, and `job.id` will be set to `null`. This method can be used to run a job once for each of an external resource by passing that resource's ID. For instance, you might run the setup job for a user only once by setting the job ID to the ID of the user. Furthermore, when this feature is used with queue settings `removeOnSuccess: true` and `removeOnFailure: true`, it will allow that job to be re-run again, effectively ensuring that jobId will have a global concurrency of 1. | ||
@@ -894,5 +908,5 @@ Avoid passing a numeric job ID, as it may conflict with an auto-generated ID. | ||
[npm-url]: https://www.npmjs.com/package/bee-queue | ||
[travis-image]: https://img.shields.io/travis/bee-queue/bee-queue.svg?style=flat | ||
[travis-url]: https://travis-ci.org/bee-queue/bee-queue | ||
[travis-image]: https://github.com/bee-queue/bee-queue/workflows/Node.js%20CI/badge.svg | ||
[travis-url]: https://github.com/bee-queue/bee-queue/actions | ||
[coveralls-image]: https://coveralls.io/repos/bee-queue/bee-queue/badge.svg?branch=master | ||
[coveralls-url]: https://coveralls.io/r/bee-queue/bee-queue?branch=master |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
100622
1370
907
22