Comparing version 5.8.2 to 5.8.3
@@ -27,11 +27,4 @@ "use strict"; | ||
--[[ | ||
Function to check for the meta.paused key to decide if we are paused or not | ||
(since an empty list and !EXISTS are not really the same). | ||
Function to add job in target list and add marker if needed. | ||
]] | ||
local function isQueuePaused(queueMetaKey) | ||
return rcall("HEXISTS", queueMetaKey, "paused") == 1 | ||
end | ||
--[[ | ||
Function to add job considering priority. | ||
]] | ||
-- Includes | ||
@@ -46,2 +39,10 @@ --[[ | ||
end | ||
local function addJobInTargetList(targetKey, markerKey, pushCmd, isPaused, jobId) | ||
rcall(pushCmd, targetKey, jobId) | ||
addBaseMarkerIfNeeded(markerKey, isPaused) | ||
end | ||
--[[ | ||
Function to add job considering priority. | ||
]] | ||
-- Includes | ||
local function addJobWithPriority(markerKey, prioritizedKey, priority, jobId, priorityCounterKey, isPaused) | ||
@@ -53,5 +54,39 @@ local prioCounter = rcall("INCR", priorityCounterKey) | ||
end | ||
--[[ | ||
Function to check for the meta.paused key to decide if we are paused or not | ||
(since an empty list and !EXISTS are not really the same). | ||
]] | ||
local function getTargetQueueList(queueMetaKey, waitKey, pausedKey) | ||
if rcall("HEXISTS", queueMetaKey, "paused") ~= 1 then | ||
return waitKey, false | ||
else | ||
return pausedKey, true | ||
end | ||
end | ||
--[[ | ||
Function to push back job considering priority in front of same prioritized jobs. | ||
]] | ||
local function pushBackJobWithPriority(prioritizedKey, priority, jobId) | ||
-- in order to put it at front of same prioritized jobs | ||
-- we consider prioritized counter as 0 | ||
local score = priority * 0x100000000 | ||
rcall("ZADD", prioritizedKey, score, jobId) | ||
end | ||
local function reAddJobWithNewPriority( prioritizedKey, markerKey, targetKey, | ||
priorityCounter, lifo, priority, jobId, paused) | ||
if priority == 0 then | ||
local pushCmd = lifo and 'RPUSH' or 'LPUSH' | ||
addJobInTargetList(targetKey, markerKey, pushCmd, paused, jobId) | ||
else | ||
if lifo then | ||
pushBackJobWithPriority(prioritizedKey, priority, jobId) | ||
else | ||
addJobWithPriority(markerKey, prioritizedKey, priority, jobId, | ||
priorityCounter, paused) | ||
end | ||
end | ||
end | ||
if rcall("EXISTS", jobKey) == 1 then | ||
local metaKey = KEYS[3] | ||
local isPaused = isQueuePaused(metaKey) | ||
local target, isPaused = getTargetQueueList(metaKey, KEYS[1], KEYS[2]) | ||
local markerKey = KEYS[6] | ||
@@ -61,13 +96,7 @@ local prioritizedKey = KEYS[4] | ||
if rcall("ZREM", KEYS[4], jobId) > 0 then | ||
addJobWithPriority(markerKey, prioritizedKey, priority, jobId, KEYS[5], | ||
isPaused) | ||
-- If the new priority is 0, then just leave the job where it is in the wait list. | ||
elseif priority > 0 then | ||
-- Job is already in the wait list, we need to re-add it with the new priority. | ||
local target = isPaused and KEYS[2] or KEYS[1] | ||
local numRemovedElements = rcall("LREM", target, -1, jobId) | ||
if numRemovedElements > 0 then | ||
addJobWithPriority(markerKey, prioritizedKey, priority, jobId, | ||
KEYS[5], isPaused) | ||
end | ||
reAddJobWithNewPriority( prioritizedKey, markerKey, target, | ||
KEYS[5], ARGV[4] == '1', priority, jobId, isPaused) | ||
elseif rcall("LREM", target, -1, jobId) > 0 then | ||
reAddJobWithNewPriority( prioritizedKey, markerKey, target, | ||
KEYS[5], ARGV[4] == '1', priority, jobId, isPaused) | ||
end | ||
@@ -74,0 +103,0 @@ rcall("HSET", jobKey, "priority", priority) |
@@ -58,3 +58,3 @@ /** | ||
saveStacktraceArgs(jobId: string, stacktrace: string, failedReason: string): string[]; | ||
moveToWaitingChildrenArgs(jobId: string, token: string, opts?: MoveToWaitingChildrenOpts): string[]; | ||
moveToWaitingChildrenArgs(jobId: string, token: string, opts?: MoveToWaitingChildrenOpts): (string | number)[]; | ||
moveToDelayed(jobId: string, timestamp: number, delay: number, token?: string, opts?: MoveToDelayedOpts): Promise<void>; | ||
@@ -61,0 +61,0 @@ /** |
@@ -24,11 +24,4 @@ const content = `--[[ | ||
--[[ | ||
Function to check for the meta.paused key to decide if we are paused or not | ||
(since an empty list and !EXISTS are not really the same). | ||
Function to add job in target list and add marker if needed. | ||
]] | ||
local function isQueuePaused(queueMetaKey) | ||
return rcall("HEXISTS", queueMetaKey, "paused") == 1 | ||
end | ||
--[[ | ||
Function to add job considering priority. | ||
]] | ||
-- Includes | ||
@@ -43,2 +36,10 @@ --[[ | ||
end | ||
local function addJobInTargetList(targetKey, markerKey, pushCmd, isPaused, jobId) | ||
rcall(pushCmd, targetKey, jobId) | ||
addBaseMarkerIfNeeded(markerKey, isPaused) | ||
end | ||
--[[ | ||
Function to add job considering priority. | ||
]] | ||
-- Includes | ||
local function addJobWithPriority(markerKey, prioritizedKey, priority, jobId, priorityCounterKey, isPaused) | ||
@@ -50,5 +51,39 @@ local prioCounter = rcall("INCR", priorityCounterKey) | ||
end | ||
--[[ | ||
Function to check for the meta.paused key to decide if we are paused or not | ||
(since an empty list and !EXISTS are not really the same). | ||
]] | ||
local function getTargetQueueList(queueMetaKey, waitKey, pausedKey) | ||
if rcall("HEXISTS", queueMetaKey, "paused") ~= 1 then | ||
return waitKey, false | ||
else | ||
return pausedKey, true | ||
end | ||
end | ||
--[[ | ||
Function to push back job considering priority in front of same prioritized jobs. | ||
]] | ||
local function pushBackJobWithPriority(prioritizedKey, priority, jobId) | ||
-- in order to put it at front of same prioritized jobs | ||
-- we consider prioritized counter as 0 | ||
local score = priority * 0x100000000 | ||
rcall("ZADD", prioritizedKey, score, jobId) | ||
end | ||
local function reAddJobWithNewPriority( prioritizedKey, markerKey, targetKey, | ||
priorityCounter, lifo, priority, jobId, paused) | ||
if priority == 0 then | ||
local pushCmd = lifo and 'RPUSH' or 'LPUSH' | ||
addJobInTargetList(targetKey, markerKey, pushCmd, paused, jobId) | ||
else | ||
if lifo then | ||
pushBackJobWithPriority(prioritizedKey, priority, jobId) | ||
else | ||
addJobWithPriority(markerKey, prioritizedKey, priority, jobId, | ||
priorityCounter, paused) | ||
end | ||
end | ||
end | ||
if rcall("EXISTS", jobKey) == 1 then | ||
local metaKey = KEYS[3] | ||
local isPaused = isQueuePaused(metaKey) | ||
local target, isPaused = getTargetQueueList(metaKey, KEYS[1], KEYS[2]) | ||
local markerKey = KEYS[6] | ||
@@ -58,13 +93,7 @@ local prioritizedKey = KEYS[4] | ||
if rcall("ZREM", KEYS[4], jobId) > 0 then | ||
addJobWithPriority(markerKey, prioritizedKey, priority, jobId, KEYS[5], | ||
isPaused) | ||
-- If the new priority is 0, then just leave the job where it is in the wait list. | ||
elseif priority > 0 then | ||
-- Job is already in the wait list, we need to re-add it with the new priority. | ||
local target = isPaused and KEYS[2] or KEYS[1] | ||
local numRemovedElements = rcall("LREM", target, -1, jobId) | ||
if numRemovedElements > 0 then | ||
addJobWithPriority(markerKey, prioritizedKey, priority, jobId, | ||
KEYS[5], isPaused) | ||
end | ||
reAddJobWithNewPriority( prioritizedKey, markerKey, target, | ||
KEYS[5], ARGV[4] == '1', priority, jobId, isPaused) | ||
elseif rcall("LREM", target, -1, jobId) > 0 then | ||
reAddJobWithNewPriority( prioritizedKey, markerKey, target, | ||
KEYS[5], ARGV[4] == '1', priority, jobId, isPaused) | ||
end | ||
@@ -71,0 +100,0 @@ rcall("HSET", jobKey, "priority", priority) |
{ | ||
"name": "bullmq", | ||
"version": "5.8.2", | ||
"version": "5.8.3", | ||
"description": "Queue for messages and jobs based on Redis", | ||
@@ -71,3 +71,3 @@ "homepage": "https://bullmq.io/", | ||
"@semantic-release/git": "^10.0.1", | ||
"@semantic-release/github": "^8.0.5", | ||
"@semantic-release/github": "^8.1.0", | ||
"@semantic-release/npm": "^9.0.1", | ||
@@ -74,0 +74,0 @@ "@semantic-release/release-notes-generator": "^10.0.3", |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
1520158
24514