Socket
Socket
Sign inDemoInstall

bullmq

Package Overview
Dependencies
Maintainers
0
Versions
531
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

bullmq - npm Package Compare versions

Comparing version 5.8.2 to 5.8.3

69

dist/cjs/scripts/changePriority-6.js

@@ -27,11 +27,4 @@ "use strict";

--[[
Function to check for the meta.paused key to decide if we are paused or not
(since an empty list and !EXISTS are not really the same).
Function to add job in target list and add marker if needed.
]]
local function isQueuePaused(queueMetaKey)
return rcall("HEXISTS", queueMetaKey, "paused") == 1
end
--[[
Function to add job considering priority.
]]
-- Includes

@@ -46,2 +39,10 @@ --[[

end
local function addJobInTargetList(targetKey, markerKey, pushCmd, isPaused, jobId)
rcall(pushCmd, targetKey, jobId)
addBaseMarkerIfNeeded(markerKey, isPaused)
end
--[[
Function to add job considering priority.
]]
-- Includes
local function addJobWithPriority(markerKey, prioritizedKey, priority, jobId, priorityCounterKey, isPaused)

@@ -53,5 +54,39 @@ local prioCounter = rcall("INCR", priorityCounterKey)

end
--[[
Function to check for the meta.paused key to decide if we are paused or not
(since an empty list and !EXISTS are not really the same).
]]
local function getTargetQueueList(queueMetaKey, waitKey, pausedKey)
if rcall("HEXISTS", queueMetaKey, "paused") ~= 1 then
return waitKey, false
else
return pausedKey, true
end
end
--[[
Function to push back job considering priority in front of same prioritized jobs.
]]
local function pushBackJobWithPriority(prioritizedKey, priority, jobId)
-- in order to put it at front of same prioritized jobs
-- we consider prioritized counter as 0
local score = priority * 0x100000000
rcall("ZADD", prioritizedKey, score, jobId)
end
local function reAddJobWithNewPriority( prioritizedKey, markerKey, targetKey,
priorityCounter, lifo, priority, jobId, paused)
if priority == 0 then
local pushCmd = lifo and 'RPUSH' or 'LPUSH'
addJobInTargetList(targetKey, markerKey, pushCmd, paused, jobId)
else
if lifo then
pushBackJobWithPriority(prioritizedKey, priority, jobId)
else
addJobWithPriority(markerKey, prioritizedKey, priority, jobId,
priorityCounter, paused)
end
end
end
if rcall("EXISTS", jobKey) == 1 then
local metaKey = KEYS[3]
local isPaused = isQueuePaused(metaKey)
local target, isPaused = getTargetQueueList(metaKey, KEYS[1], KEYS[2])
local markerKey = KEYS[6]

@@ -61,13 +96,7 @@ local prioritizedKey = KEYS[4]

if rcall("ZREM", KEYS[4], jobId) > 0 then
addJobWithPriority(markerKey, prioritizedKey, priority, jobId, KEYS[5],
isPaused)
-- If the new priority is 0, then just leave the job where it is in the wait list.
elseif priority > 0 then
-- Job is already in the wait list, we need to re-add it with the new priority.
local target = isPaused and KEYS[2] or KEYS[1]
local numRemovedElements = rcall("LREM", target, -1, jobId)
if numRemovedElements > 0 then
addJobWithPriority(markerKey, prioritizedKey, priority, jobId,
KEYS[5], isPaused)
end
reAddJobWithNewPriority( prioritizedKey, markerKey, target,
KEYS[5], ARGV[4] == '1', priority, jobId, isPaused)
elseif rcall("LREM", target, -1, jobId) > 0 then
reAddJobWithNewPriority( prioritizedKey, markerKey, target,
KEYS[5], ARGV[4] == '1', priority, jobId, isPaused)
end

@@ -74,0 +103,0 @@ rcall("HSET", jobKey, "priority", priority)

@@ -58,3 +58,3 @@ /**

saveStacktraceArgs(jobId: string, stacktrace: string, failedReason: string): string[];
moveToWaitingChildrenArgs(jobId: string, token: string, opts?: MoveToWaitingChildrenOpts): string[];
moveToWaitingChildrenArgs(jobId: string, token: string, opts?: MoveToWaitingChildrenOpts): (string | number)[];
moveToDelayed(jobId: string, timestamp: number, delay: number, token?: string, opts?: MoveToDelayedOpts): Promise<void>;

@@ -61,0 +61,0 @@ /**

@@ -24,11 +24,4 @@ const content = `--[[

--[[
Function to check for the meta.paused key to decide if we are paused or not
(since an empty list and !EXISTS are not really the same).
Function to add job in target list and add marker if needed.
]]
local function isQueuePaused(queueMetaKey)
return rcall("HEXISTS", queueMetaKey, "paused") == 1
end
--[[
Function to add job considering priority.
]]
-- Includes

@@ -43,2 +36,10 @@ --[[

end
local function addJobInTargetList(targetKey, markerKey, pushCmd, isPaused, jobId)
rcall(pushCmd, targetKey, jobId)
addBaseMarkerIfNeeded(markerKey, isPaused)
end
--[[
Function to add job considering priority.
]]
-- Includes
local function addJobWithPriority(markerKey, prioritizedKey, priority, jobId, priorityCounterKey, isPaused)

@@ -50,5 +51,39 @@ local prioCounter = rcall("INCR", priorityCounterKey)

end
--[[
Function to check for the meta.paused key to decide if we are paused or not
(since an empty list and !EXISTS are not really the same).
]]
local function getTargetQueueList(queueMetaKey, waitKey, pausedKey)
if rcall("HEXISTS", queueMetaKey, "paused") ~= 1 then
return waitKey, false
else
return pausedKey, true
end
end
--[[
Function to push back job considering priority in front of same prioritized jobs.
]]
local function pushBackJobWithPriority(prioritizedKey, priority, jobId)
-- in order to put it at front of same prioritized jobs
-- we consider prioritized counter as 0
local score = priority * 0x100000000
rcall("ZADD", prioritizedKey, score, jobId)
end
local function reAddJobWithNewPriority( prioritizedKey, markerKey, targetKey,
priorityCounter, lifo, priority, jobId, paused)
if priority == 0 then
local pushCmd = lifo and 'RPUSH' or 'LPUSH'
addJobInTargetList(targetKey, markerKey, pushCmd, paused, jobId)
else
if lifo then
pushBackJobWithPriority(prioritizedKey, priority, jobId)
else
addJobWithPriority(markerKey, prioritizedKey, priority, jobId,
priorityCounter, paused)
end
end
end
if rcall("EXISTS", jobKey) == 1 then
local metaKey = KEYS[3]
local isPaused = isQueuePaused(metaKey)
local target, isPaused = getTargetQueueList(metaKey, KEYS[1], KEYS[2])
local markerKey = KEYS[6]

@@ -58,13 +93,7 @@ local prioritizedKey = KEYS[4]

if rcall("ZREM", KEYS[4], jobId) > 0 then
addJobWithPriority(markerKey, prioritizedKey, priority, jobId, KEYS[5],
isPaused)
-- If the new priority is 0, then just leave the job where it is in the wait list.
elseif priority > 0 then
-- Job is already in the wait list, we need to re-add it with the new priority.
local target = isPaused and KEYS[2] or KEYS[1]
local numRemovedElements = rcall("LREM", target, -1, jobId)
if numRemovedElements > 0 then
addJobWithPriority(markerKey, prioritizedKey, priority, jobId,
KEYS[5], isPaused)
end
reAddJobWithNewPriority( prioritizedKey, markerKey, target,
KEYS[5], ARGV[4] == '1', priority, jobId, isPaused)
elseif rcall("LREM", target, -1, jobId) > 0 then
reAddJobWithNewPriority( prioritizedKey, markerKey, target,
KEYS[5], ARGV[4] == '1', priority, jobId, isPaused)
end

@@ -71,0 +100,0 @@ rcall("HSET", jobKey, "priority", priority)

{
"name": "bullmq",
"version": "5.8.2",
"version": "5.8.3",
"description": "Queue for messages and jobs based on Redis",

@@ -71,3 +71,3 @@ "homepage": "https://bullmq.io/",

"@semantic-release/git": "^10.0.1",
"@semantic-release/github": "^8.0.5",
"@semantic-release/github": "^8.1.0",
"@semantic-release/npm": "^9.0.1",

@@ -74,0 +74,0 @@ "@semantic-release/release-notes-generator": "^10.0.3",

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc