Comparing version 4.12.5 to 4.12.6
@@ -0,1 +1,8 @@ | ||
## [4.12.6](https://github.com/OptimalBits/bull/compare/v4.12.5...v4.12.6) (2024-05-18) | ||
### Bug Fixes | ||
* **stalled:** take in count removeOnFail option ([#2734](https://github.com/OptimalBits/bull/issues/2734)) ([2112269](https://github.com/OptimalBits/bull/commit/21122697461b551055192ff3b9c02c6f37cb331e)) | ||
## [4.12.5](https://github.com/OptimalBits/bull/compare/v4.12.4...v4.12.5) (2024-05-17) | ||
@@ -2,0 +9,0 @@ |
@@ -40,3 +40,3 @@ /** | ||
job.data, | ||
job.opts, | ||
pack(job.opts), | ||
job.timestamp, | ||
@@ -43,0 +43,0 @@ job.delay, |
@@ -49,4 +49,5 @@ 'use strict'; | ||
end | ||
local opts = cmsgpack.unpack(ARGV[5]) | ||
-- Store the job. | ||
rcall("HMSET", jobIdKey, "name", ARGV[3], "data", ARGV[4], "opts", ARGV[5], "timestamp", ARGV[6], "delay", ARGV[7], "priority", ARGV[9]) | ||
rcall("HMSET", jobIdKey, "name", ARGV[3], "data", ARGV[4], "opts", opts, "timestamp", ARGV[6], "delay", ARGV[7], "priority", ARGV[9]) | ||
-- Check if job is delayed | ||
@@ -53,0 +54,0 @@ local delayedTimestamp = tonumber(ARGV[8]) |
@@ -20,2 +20,22 @@ 'use strict'; | ||
local rcall = redis.call | ||
local function removeJob(jobId, baseKey) | ||
local jobKey = baseKey .. jobId | ||
rcall("DEL", jobKey, jobKey .. ':logs') | ||
end | ||
local function removeJobsByMaxAge(timestamp, maxAge, targetSet, prefix) | ||
local start = timestamp - maxAge * 1000 | ||
local jobIds = rcall("ZREVRANGEBYSCORE", targetSet, start, "-inf") | ||
for i, jobId in ipairs(jobIds) do | ||
removeJob(jobId, prefix) | ||
end | ||
rcall("ZREMRANGEBYSCORE", targetSet, "-inf", start) | ||
end | ||
local function removeJobsByMaxCount(maxCount, targetSet, prefix) | ||
local start = maxCount | ||
local jobIds = rcall("ZREVRANGE", targetSet, start, -1) | ||
for i, jobId in ipairs(jobIds) do | ||
removeJob(jobId, prefix) | ||
end | ||
rcall("ZREMRANGEBYRANK", targetSet, 0, -(maxCount + 1)) | ||
end | ||
local function batches(n, batchSize) | ||
@@ -62,5 +82,29 @@ local i = 0 | ||
if(stalledCount > MAX_STALLED_JOB_COUNT) then | ||
local rawOpts = rcall("HGET", jobKey, "opts") | ||
local opts = cjson.decode(rawOpts) | ||
local removeOnFailType = type(opts["removeOnFail"]) | ||
rcall("ZADD", KEYS[4], ARGV[3], jobId) | ||
rcall("HSET", jobKey, "failedReason", "job stalled more than allowable limit") | ||
rcall("HMSET", jobKey, "failedReason", "job stalled more than allowable limit", | ||
"finishedOn", ARGV[3]) | ||
rcall("PUBLISH", KEYS[4], '{"jobId":"' .. jobId .. '", "val": "job stalled more than maxStalledCount"}') | ||
if removeOnFailType == "number" then | ||
removeJobsByMaxCount(opts["removeOnFail"], | ||
KEYS[4], ARGV[2]) | ||
elseif removeOnFailType == "boolean" then | ||
if opts["removeOnFail"] then | ||
removeJob(jobId, ARGV[2]) | ||
rcall("ZREM", KEYS[4], jobId) | ||
end | ||
elseif removeOnFailType ~= "nil" then | ||
local maxAge = opts["removeOnFail"]["age"] | ||
local maxCount = opts["removeOnFail"]["count"] | ||
if maxAge ~= nil then | ||
removeJobsByMaxAge(ARGV[3], maxAge, | ||
KEYS[4], ARGV[2]) | ||
end | ||
if maxCount ~= nil and maxCount > 0 then | ||
removeJobsByMaxCount(maxCount, KEYS[4], | ||
ARGV[2]) | ||
end | ||
end | ||
table.insert(failed, jobId) | ||
@@ -67,0 +111,0 @@ else |
{ | ||
"name": "bull", | ||
"version": "4.12.5", | ||
"version": "4.12.6", | ||
"description": "Job manager", | ||
@@ -5,0 +5,0 @@ "engines": { |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
290512
5945