Comparing version
@@ -0,1 +1,8 @@ | ||
## [4.12.6](https://github.com/OptimalBits/bull/compare/v4.12.5...v4.12.6) (2024-05-18) | ||
### Bug Fixes | ||
* **stalled:** take in count removeOnFail option ([#2734](https://github.com/OptimalBits/bull/issues/2734)) ([2112269](https://github.com/OptimalBits/bull/commit/21122697461b551055192ff3b9c02c6f37cb331e)) | ||
## [4.12.5](https://github.com/OptimalBits/bull/compare/v4.12.4...v4.12.5) (2024-05-17) | ||
@@ -2,0 +9,0 @@ |
@@ -40,3 +40,3 @@ /** | ||
job.data, | ||
job.opts, | ||
pack(job.opts), | ||
job.timestamp, | ||
@@ -43,0 +43,0 @@ job.delay, |
@@ -49,4 +49,5 @@ 'use strict'; | ||
end | ||
local opts = cmsgpack.unpack(ARGV[5]) | ||
-- Store the job. | ||
rcall("HMSET", jobIdKey, "name", ARGV[3], "data", ARGV[4], "opts", ARGV[5], "timestamp", ARGV[6], "delay", ARGV[7], "priority", ARGV[9]) | ||
rcall("HMSET", jobIdKey, "name", ARGV[3], "data", ARGV[4], "opts", opts, "timestamp", ARGV[6], "delay", ARGV[7], "priority", ARGV[9]) | ||
-- Check if job is delayed | ||
@@ -53,0 +54,0 @@ local delayedTimestamp = tonumber(ARGV[8]) |
@@ -20,2 +20,22 @@ 'use strict'; | ||
local rcall = redis.call | ||
local function removeJob(jobId, baseKey) | ||
local jobKey = baseKey .. jobId | ||
rcall("DEL", jobKey, jobKey .. ':logs') | ||
end | ||
local function removeJobsByMaxAge(timestamp, maxAge, targetSet, prefix) | ||
local start = timestamp - maxAge * 1000 | ||
local jobIds = rcall("ZREVRANGEBYSCORE", targetSet, start, "-inf") | ||
for i, jobId in ipairs(jobIds) do | ||
removeJob(jobId, prefix) | ||
end | ||
rcall("ZREMRANGEBYSCORE", targetSet, "-inf", start) | ||
end | ||
local function removeJobsByMaxCount(maxCount, targetSet, prefix) | ||
local start = maxCount | ||
local jobIds = rcall("ZREVRANGE", targetSet, start, -1) | ||
for i, jobId in ipairs(jobIds) do | ||
removeJob(jobId, prefix) | ||
end | ||
rcall("ZREMRANGEBYRANK", targetSet, 0, -(maxCount + 1)) | ||
end | ||
local function batches(n, batchSize) | ||
@@ -62,5 +82,29 @@ local i = 0 | ||
if(stalledCount > MAX_STALLED_JOB_COUNT) then | ||
local rawOpts = rcall("HGET", jobKey, "opts") | ||
local opts = cjson.decode(rawOpts) | ||
local removeOnFailType = type(opts["removeOnFail"]) | ||
rcall("ZADD", KEYS[4], ARGV[3], jobId) | ||
rcall("HSET", jobKey, "failedReason", "job stalled more than allowable limit") | ||
rcall("HMSET", jobKey, "failedReason", "job stalled more than allowable limit", | ||
"finishedOn", ARGV[3]) | ||
rcall("PUBLISH", KEYS[4], '{"jobId":"' .. jobId .. '", "val": "job stalled more than maxStalledCount"}') | ||
if removeOnFailType == "number" then | ||
removeJobsByMaxCount(opts["removeOnFail"], | ||
KEYS[4], ARGV[2]) | ||
elseif removeOnFailType == "boolean" then | ||
if opts["removeOnFail"] then | ||
removeJob(jobId, ARGV[2]) | ||
rcall("ZREM", KEYS[4], jobId) | ||
end | ||
elseif removeOnFailType ~= "nil" then | ||
local maxAge = opts["removeOnFail"]["age"] | ||
local maxCount = opts["removeOnFail"]["count"] | ||
if maxAge ~= nil then | ||
removeJobsByMaxAge(ARGV[3], maxAge, | ||
KEYS[4], ARGV[2]) | ||
end | ||
if maxCount ~= nil and maxCount > 0 then | ||
removeJobsByMaxCount(maxCount, KEYS[4], | ||
ARGV[2]) | ||
end | ||
end | ||
table.insert(failed, jobId) | ||
@@ -67,0 +111,0 @@ else |
{ | ||
"name": "bull", | ||
"version": "4.12.5", | ||
"version": "4.12.6", | ||
"description": "Job manager", | ||
@@ -5,0 +5,0 @@ "engines": { |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
290512
1.32%5945
0.76%