New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

node-resque

Package Overview
Dependencies
Maintainers
3
Versions
181
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

node-resque - npm Package Compare versions

Comparing version 5.2.0 to 5.3.0

__tests__/core/connection.js

1

examples/customPluginExample.js

@@ -66,3 +66,2 @@ const path = require('path')

await worker.connect()
await worker.workerCleanup() // optional: cleanup any previous improperly shutdown workers on this host
worker.start()

@@ -69,0 +68,0 @@

@@ -55,3 +55,2 @@ const path = require('path')

await worker.connect()
await worker.workerCleanup() // optional: cleanup any previous improperly shutdown workers on this host
worker.start()

@@ -58,0 +57,0 @@

@@ -69,3 +69,2 @@ const path = require('path')

await worker.connect()
await worker.workerCleanup() // optional: cleanup any previous improperly shutdown workers on this host
worker.start()

@@ -89,2 +88,3 @@

worker.on('poll', (queue) => { console.log(`worker polling ${queue}`) })
worker.on('ping', (time) => { console.log(`worker check in @ ${time}`) })
worker.on('job', (queue, job) => { console.log(`working job ${queue} ${JSON.stringify(job)}`) })

@@ -102,2 +102,3 @@ worker.on('reEnqueue', (queue, job, plugin) => { console.log(`reEnqueue job (${plugin}) ${queue} ${JSON.stringify(job)}`) })

scheduler.on('error', (error) => { console.log(`scheduler error >> ${error}`) })
scheduler.on('cleanStuckWorker', (workerName, errorPayload, delta) => { console.log(`failing ${workerName} (stuck for ${delta}s) and failing job ${errorPayload}`) })
scheduler.on('workingTimestamp', (timestamp) => { console.log(`scheduler working timestamp ${timestamp}`) })

@@ -104,0 +105,0 @@ scheduler.on('transferredJob', (timestamp, job) => { console.log(`scheduler enquing job ${timestamp} >> ${JSON.stringify(job)}`) })

@@ -51,3 +51,2 @@ const path = require('path')

await worker.connect()
await worker.workerCleanup() // optional: cleanup any previous improperly shutdown workers on this host
worker.start()

@@ -54,0 +53,0 @@

@@ -42,3 +42,2 @@ const path = require('path')

await worker.connect()
await worker.workerCleanup() // optional: cleanup any previous improperly shutdown workers on this host
worker.start()

@@ -45,0 +44,0 @@

@@ -70,2 +70,3 @@ const os = require('os')

worker.on('poll', (queue) => { this.emit('poll', worker.id, queue) })
worker.on('ping', (time) => { this.emit('ping', worker.id, time) })
worker.on('job', (queue, job) => { this.emit('job', worker.id, queue, job) })

@@ -81,3 +82,2 @@ worker.on('reEnqueue', (queue, job, plugin) => { this.emit('reEnqueue', worker.id, queue, job, plugin) })

await worker.connect()
await worker.workerCleanup()
await worker.start()

@@ -150,2 +150,3 @@ }

'poll',
'ping',
'job',

@@ -152,0 +153,0 @@ 'reEnqueue',

const path = require('path')
const os = require('os')
const EventEmitter = require('events').EventEmitter

@@ -252,2 +253,3 @@ const Connection = require(path.join(__dirname, 'connection.js')).Connection

let workingOn = await this.workingOn(workerName, queues)
let message = 'Worker Timeout (killed manually)'
if (workingOn) {

@@ -258,6 +260,10 @@ workingOn = JSON.parse(workingOn)

queue: workingOn.queue,
payload: workingOn.payload,
exception: 'Worker Timeout (killed manually)',
error: 'Worker Timeout (killed manually)',
backtrace: null,
payload: workingOn.payload || [],
exception: message,
error: message,
backtrace: [
`killed by ${os.hostname} at ${new Date()}`,
'queue#forceCleanWorker',
'node-resque'
],
failed_at: (new Date()).toString()

@@ -273,2 +279,3 @@ }

await this.connection.redis.del(this.connection.key('stat', 'processed', workerName))
await this.connection.redis.del(this.connection.key('worker', 'ping', workerName))
await this.connection.redis.del(this.connection.key('worker', workerName))

@@ -275,0 +282,0 @@ await this.connection.redis.srem(this.connection.key('workers'), workerName + ':' + queues)

@@ -16,2 +16,3 @@ // To read notes about the master locking scheme, check out:

timeout: 5000, // in ms
stuckWorkerTimeout: 60 * 60 * 1000, // 60 minutes in ms
masterLockTimeout: 60 * 3, // in seconds

@@ -92,2 +93,4 @@ name: os.hostname() + ':' + process.pid // assumes only one worker per node process

await this.checkStuckWorkers()
this.emit('poll')

@@ -189,4 +192,29 @@ let timestamp = await this.nextDelayedTimestamp()

}
async checkStuckWorkers () {
if (!this.options.stuckWorkerTimeout) { return }
const keys = await this.connection.redis.keys(this.connection.key('worker', 'ping', '*'))
const payloads = await Promise.all(keys.map(async (k) => {
return JSON.parse(await this.connection.redis.get(k))
}))
const nowInSeconds = Math.round(new Date().getTime() / 1000)
const stuckWorkerTimeoutInSeconds = Math.round(this.options.stuckWorkerTimeout / 1000)
for (let i in payloads) {
const {name, time} = payloads[i]
const delta = nowInSeconds - time
if (delta > stuckWorkerTimeoutInSeconds) {
await this.forceCleanWorker(name, delta)
}
i++
}
}
async forceCleanWorker (workerName, delta) {
const errorPayload = await this.queue.forceCleanWorker(workerName)
this.emit('cleanStuckWorker', workerName, errorPayload, delta)
}
}
exports.Scheduler = Scheduler
const os = require('os')
const path = require('path')
const exec = require('child_process').exec
const EventEmitter = require('events').EventEmitter

@@ -42,2 +41,3 @@ const Queue = require(path.join(__dirname, 'queue.js')).Queue

this.job = null
this.pingTimer = null

@@ -68,2 +68,4 @@ this.queueObject = new Queue({connection: options.connection}, this.jobs)

)
await this.ping()
this.pingTimer = setInterval(this.ping.bind(this), this.options.timeout)
}

@@ -80,3 +82,4 @@

if (this.connection.connected === true || this.connection.connected === undefined || this.connection.connected === null) {
await this.untrack(this.name, this.stringQueues())
clearInterval(this.pingTimer)
await this.untrack()
}

@@ -273,8 +276,23 @@

async untrack (name, queues) {
async ping () {
const name = this.name
const nowSeconds = Math.round(new Date().getTime() / 1000)
this.emit('ping', nowSeconds)
const payload = JSON.stringify({
time: nowSeconds,
name: name,
queues: this.stringQueues()
})
await this.connection.redis.set(this.connection.key('worker', 'ping', name), payload)
}
async untrack () {
const name = this.name
const queues = this.stringQueues()
if (!this.connection || !this.connection.redis) { return }
await this.connection.redis.srem(this.connection.key('workers'), (name + ':' + queues))
await this.connection.redis.del(this.connection.key('worker', name, this.stringQueues()))
await this.connection.redis.del(this.connection.key('worker', name, this.stringQueues(), 'started'))
await this.connection.redis.del(this.connection.key('worker', 'ping', name))
await this.connection.redis.del(this.connection.key('worker', name, queues))
await this.connection.redis.del(this.connection.key('worker', name, queues, 'started'))
await this.connection.redis.del(this.connection.key('stat', 'failed', name))

@@ -284,47 +302,2 @@ await this.connection.redis.del(this.connection.key('stat', 'processed', name))

async workerCleanup () {
let pids = await this.getPids()
let workers = await this.connection.redis.smembers(this.connection.key('workers'))
for (let i in workers) {
let worker = workers[i]
let parts = worker.split(':')
let host = parts[0]
let pid = parseInt(parts[1])
let queues = parts.splice(-1, 1)
let pureName = parts.join(':')
if (host === os.hostname() && pids.indexOf(pid) < 0) {
this.emit('cleaning_worker', worker, pid)
await this.untrack(pureName, queues)
}
}
}
async getPids () {
let cmd
if (process.platform === 'win32') {
cmd = 'powershell.exe -command "Get-Process | select Id"'
} else {
cmd = 'ps -eo pid='
}
return new Promise((resolve, reject) => {
exec(cmd, (error, stdout, stderr) => {
if (!error && stderr) { error = new Error(stderr) }
if (error) { return reject(error) }
let pids = []
stdout.split('\n').forEach((line) => {
line = line.trim()
if (line.length > 0) {
var pid = parseInt(line.split(' ')[0])
if (!isNaN(pid)) { pids.push(pid) }
}
})
return resolve(pids)
})
})
}
async checkQueues () {

@@ -342,3 +315,3 @@ if (typeof this.queues === 'string') {

this.originalQueue = '*'
await this.untrack(this.name, this.stringQueues())
await this.untrack()
let response = await this.connection.redis.smembers(this.connection.key('queues'))

@@ -345,0 +318,0 @@ this.queues = (response ? response.sort() : [])

@@ -6,3 +6,3 @@ {

"license": "Apache-2.0",
"version": "5.2.0",
"version": "5.3.0",
"homepage": "http://github.com/taskrabbit/node-resque",

@@ -32,14 +32,18 @@ "repository": {

"devDependencies": {
"fakeredis": "latest",
"mocha": "^5.0.1",
"jest": "^22.4.3",
"node-schedule": "^1.3.0",
"should": "^13.2.1",
"standard": "^11.0.0"
},
"jest": {
"testPathIgnorePatterns": [
"<rootDir>/__tests__/utils"
]
},
"standard": {
"globals": [
"it",
"after",
"expect",
"test",
"afterAll",
"afterEach",
"before",
"beforeAll",
"beforeEach",

@@ -51,4 +55,4 @@ "describe"

"pretest": "standard",
"test": "mocha"
"test": "jest"
}
}

@@ -18,4 +18,3 @@ # node-resque: The best background jobs in node.

const path = require('path')
const NodeResque = require(path.join(__dirname, '..', 'index.js'))
// In your projects: const NR = require('node-resque');
const NodeResque = require('node-resque')

@@ -70,3 +69,2 @@ async function boot () {

await worker.connect()
await worker.workerCleanup() // optional: cleanup any previous improperly shutdown workers on this host
worker.start()

@@ -90,2 +88,3 @@

worker.on('poll', (queue) => { console.log(`worker polling ${queue}`) })
worker.on('ping', (time) => { console.log(`worker check in @ ${time}`) })
worker.on('job', (queue, job) => { console.log(`working job ${queue} ${JSON.stringify(job)}`) })

@@ -102,2 +101,3 @@ worker.on('reEnqueue', (queue, job, plugin) => { console.log(`reEnqueue job (${plugin}) ${queue} ${JSON.stringify(job)}`) })

scheduler.on('master', (state) => { console.log('scheduler became master') })
scheduler.on('cleanStuckWorker', (workerName, errorPayload, delta) => { console.log(`failing ${workerName} (stuck for ${delta}s) and failing job ${errorPayload}`) })
scheduler.on('error', (error) => { console.log(`scheduler error >> ${error}`) })

@@ -284,2 +284,4 @@ scheduler.on('workingTimestamp', (timestamp) => { console.log(`scheduler working timestamp ${timestamp}`) })

- `failedJobs` is an array listing the data of the failed jobs. Each element looks like:
`{"worker": "host:pid", "queue": "test_queue", "payload": {"class":"slowJob", "queue":"test_queue", "args":[null]}, "exception": "TypeError", "error": "MyImport is not a function", "backtrace": [' at Worker.perform (/path/to/worker:111:24)', ' at <anonymous>'], "failed_at": "Fri Dec 12 2014 14:01:16 GMT-0800 (PST)"}`
- To retrieve all failed jobs, use arguments: `await queue.failed(0, -1)`

@@ -308,2 +310,26 @@ ### Failing a Job

### Automatically
By default, the scheduler will check for workers which haven't pinged redis in 60 minutes. If this happens, we will assume the process crashed, and remove it from redis. If this worker was working on a job, we will place it in the failed queue for later inspection. Every worker has a timer running in which it then updates a key in redis every `timeout` (default: 5 seconds). If your job is slow, but async, there should be no problem. However, if your job consumes 100% of the CPU of the process, this timer might not fire.
To modify the 60 minute check, change `stuckWorkerTimeout` when configuring your scheudler, ie:
```js
const scheduler = new NodeResque.Scheduler({
stuckWorkerTimeout: (1000 * 60 * 60) // 1 hour, in ms
connection: connectionDetails
})
```
Set your scheduler's `stuckWorkerTimeout = false` to disable this behavior.
```js
const scheduler = new NodeResque.Scheduler({
stuckWorkerTimeout: false // will not fail jobs which haven't pinged redis
connection: connectionDetails
})
```
### Manually
Sometimes a worker crashes is a *severe* way, and it doesn't get the time/chance to notify redis that it is leaving the pool (this happens all the time on PAAS providers like Heroku). When this happens, you will not only need to extract the job from the now-zombie worker's "working on" status, but also remove the stuck worker. To aid you in these edge cases, `await queue.cleanOldWorkers(age)` is available.

@@ -444,2 +470,3 @@

multiWorker.on('poll', (workerId, queue) => { console.log("worker["+workerId+"] polling " + queue); })
multiWorker.on('ping', (workerId, time) => { console.log("worker["+workerId+"] check in @ " + time); })
multiWorker.on('job', (workerId, queue, job) => { console.log("worker["+workerId+"] working job " + queue + " " + JSON.stringify(job)); })

@@ -446,0 +473,0 @@ multiWorker.on('reEnqueue', (workerId, queue, job, plugin) => { console.log("worker["+workerId+"] reEnqueue job (" + plugin + ") " + queue + " " + JSON.stringify(job)); })

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc