Comparing version 0.2.2 to 0.2.3
@@ -5,2 +5,8 @@ # Changelog | ||
## 0.2.3 | ||
### Added | ||
- mode option | ||
## 0.2.2 | ||
@@ -7,0 +13,0 @@ |
@@ -17,2 +17,3 @@ 'use strict'; | ||
* @param {number} options.timeout job will run after this time (in case of too long previous tasks processing) | ||
* @param {string} options.mode mode {string} ('limiter' - rate limiter (no order guarantee); 'queue' - real queue (keep order)) | ||
* @memberof Coordinator | ||
@@ -23,3 +24,3 @@ */ | ||
const {jobId, client, concurrency, keyPending, keyProcessing, timeout, lock} = options; | ||
const {jobId, client, concurrency, keyPending, keyProcessing, timeout, lock, mode} = options; | ||
@@ -36,2 +37,3 @@ this._jobId = jobId; | ||
this._lock = lock; | ||
this._mode = mode; | ||
this._startTime = null; | ||
@@ -155,4 +157,11 @@ this._canRun = new Promise(resolve => this._resolve = resolve); | ||
if (processingCount < this._concurrency && nextJobId === this._jobId) { | ||
this._resolve(); | ||
if (processingCount < this._concurrency) { | ||
switch (this._mode) { | ||
case 'limiter': | ||
this._resolve(); | ||
break; | ||
case 'queue': | ||
nextJobId === this._jobId && this._resolve(); | ||
break; | ||
} | ||
} | ||
@@ -159,0 +168,0 @@ } |
29
index.js
@@ -26,2 +26,3 @@ 'use strict'; | ||
* @param {number} options.timeout job will run after this time (in case of too long previous tasks processing) | ||
* @param {string} options.mode mode {string} ('limiter' - rate limiter (no order guarantee); 'queue' - real queue (keep order)) | ||
* @memberof Oraq | ||
@@ -36,5 +37,7 @@ */ | ||
timeout = 2 * 60 * 60 * 1000, // 2 hours | ||
concurrency = 1 | ||
concurrency = 1, | ||
mode = 'queue' | ||
} = options || {}; | ||
this._mode = mode; | ||
this._ping = ping; | ||
@@ -104,3 +107,4 @@ this._timeout = timeout; | ||
timeout: this._timeout, | ||
lock: this._lock | ||
lock: this._lock, | ||
mode: this._mode | ||
}); | ||
@@ -127,8 +131,19 @@ const onKeyEvent = this._getOnKeyEvent(coordinator); | ||
await coordinator.keepAlive(this._ping); | ||
// move job from pending to processing queue | ||
await this._client | ||
.multi() | ||
.brpoplpush(this._keyPending, this._keyProcessing, 0) | ||
.del(`${this._keyPending}:${jobId}${this._lock}`) | ||
.exec(); | ||
const pipe = this._client.multi(); | ||
switch (this._mode) { | ||
case 'limiter': | ||
pipe.lrem(this._keyPending, 1, jobId); | ||
pipe.lpush(this._keyProcessing, jobId); | ||
break; | ||
case 'queue': | ||
pipe.brpoplpush(this._keyPending, this._keyProcessing, 0); | ||
break; | ||
} | ||
pipe.del(`${this._keyPending}:${jobId}${this._lock}`); | ||
await pipe.exec(); | ||
// run job | ||
@@ -135,0 +150,0 @@ result = await job(jobData); |
{ | ||
"name": "oraq", | ||
"version": "0.2.2", | ||
"version": "0.2.3", | ||
"description": "Ordered redis asynchronous queue", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -53,3 +53,4 @@ # Oraq | ||
timeout, // job will run after this time {integer} (in case of too long previous tasks processing, 2 * 60 * 60 * 1000 (2 hours) by default) | ||
concurrency // jobs concurrency {integer} (1 by default) | ||
concurrency, // jobs concurrency {integer} (1 by default) | ||
mode // mode {string} ("limiter" - rate limiter (no order guarantee) or "queue" - real queue (keep order), "queue" by default) | ||
}); | ||
@@ -56,0 +57,0 @@ ``` |
18743
492
88