node-resque
Comparing version 5.2.0 to 5.3.0
@@ -66,3 +66,2 @@ const path = require('path')
await worker.connect()
await worker.workerCleanup() // optional: cleanup any previous improperly shutdown workers on this host
worker.start()
@@ -55,3 +55,2 @@ const path = require('path')
await worker.connect()
await worker.workerCleanup() // optional: cleanup any previous improperly shutdown workers on this host
worker.start()
@@ -69,3 +69,2 @@ const path = require('path')
await worker.connect()
await worker.workerCleanup() // optional: cleanup any previous improperly shutdown workers on this host
worker.start()
@@ -89,2 +88,3 @@
worker.on('poll', (queue) => { console.log(`worker polling ${queue}`) })
worker.on('ping', (time) => { console.log(`worker check in @ ${time}`) })
worker.on('job', (queue, job) => { console.log(`working job ${queue} ${JSON.stringify(job)}`) })
@@ -102,2 +102,3 @@ worker.on('reEnqueue', (queue, job, plugin) => { console.log(`reEnqueue job (${plugin}) ${queue} ${JSON.stringify(job)}`) })
scheduler.on('error', (error) => { console.log(`scheduler error >> ${error}`) })
scheduler.on('cleanStuckWorker', (workerName, errorPayload, delta) => { console.log(`failing ${workerName} (stuck for ${delta}s) and failing job ${errorPayload}`) })
scheduler.on('workingTimestamp', (timestamp) => { console.log(`scheduler working timestamp ${timestamp}`) })
scheduler.on('transferredJob', (timestamp, job) => { console.log(`scheduler enquing job ${timestamp} >> ${JSON.stringify(job)}`) })
@@ -51,3 +51,2 @@ const path = require('path')
await worker.connect()
await worker.workerCleanup() // optional: cleanup any previous improperly shutdown workers on this host
worker.start()
@@ -42,3 +42,2 @@ const path = require('path')
await worker.connect()
await worker.workerCleanup() // optional: cleanup any previous improperly shutdown workers on this host
worker.start()
@@ -70,2 +70,3 @@ const os = require('os')
worker.on('poll', (queue) => { this.emit('poll', worker.id, queue) })
worker.on('ping', (time) => { this.emit('ping', worker.id, time) })
worker.on('job', (queue, job) => { this.emit('job', worker.id, queue, job) })
@@ -81,3 +82,2 @@ worker.on('reEnqueue', (queue, job, plugin) => { this.emit('reEnqueue', worker.id, queue, job, plugin) })
await worker.connect()
await worker.workerCleanup()
await worker.start()
@@ -150,2 +150,3 @@ }
'poll',
'ping',
'job',
'reEnqueue',
const path = require('path')
const os = require('os')
const EventEmitter = require('events').EventEmitter
@@ -252,2 +253,3 @@ const Connection = require(path.join(__dirname, 'connection.js')).Connection
let workingOn = await this.workingOn(workerName, queues)
let message = 'Worker Timeout (killed manually)'
if (workingOn) {
@@ -258,6 +260,10 @@ workingOn = JSON.parse(workingOn)
queue: workingOn.queue,
payload: workingOn.payload,
exception: 'Worker Timeout (killed manually)',
error: 'Worker Timeout (killed manually)',
backtrace: null,
payload: workingOn.payload || [],
exception: message,
error: message,
backtrace: [
  `killed by ${os.hostname} at ${new Date()}`,
  'queue#forceCleanWorker',
  'node-resque'
],
failed_at: (new Date()).toString()
@@ -273,2 +279,3 @@ }
await this.connection.redis.del(this.connection.key('stat', 'processed', workerName))
await this.connection.redis.del(this.connection.key('worker', 'ping', workerName))
await this.connection.redis.del(this.connection.key('worker', workerName))
await this.connection.redis.srem(this.connection.key('workers'), workerName + ':' + queues)
@@ -16,2 +16,3 @@ // To read notes about the master locking scheme, check out:
timeout: 5000, // in ms
stuckWorkerTimeout: 60 * 60 * 1000, // 60 minutes in ms
masterLockTimeout: 60 * 3, // in seconds
@@ -92,2 +93,4 @@ name: os.hostname() + ':' + process.pid // assumes only one worker per node process
await this.checkStuckWorkers()
this.emit('poll')
@@ -189,4 +192,29 @@ let timestamp = await this.nextDelayedTimestamp()
}
async checkStuckWorkers () {
  if (!this.options.stuckWorkerTimeout) { return }
  const keys = await this.connection.redis.keys(this.connection.key('worker', 'ping', '*'))
  const payloads = await Promise.all(keys.map(async (k) => {
    return JSON.parse(await this.connection.redis.get(k))
  }))
  const nowInSeconds = Math.round(new Date().getTime() / 1000)
  const stuckWorkerTimeoutInSeconds = Math.round(this.options.stuckWorkerTimeout / 1000)
  for (let i in payloads) {
    const {name, time} = payloads[i]
    const delta = nowInSeconds - time
    if (delta > stuckWorkerTimeoutInSeconds) {
      await this.forceCleanWorker(name, delta)
    }
    i++
  }
}
async forceCleanWorker (workerName, delta) {
  const errorPayload = await this.queue.forceCleanWorker(workerName)
  this.emit('cleanStuckWorker', workerName, errorPayload, delta)
}
}
exports.Scheduler = Scheduler
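For context, a minimal consumer-side sketch of the new `stuckWorkerTimeout` option and `cleanStuckWorker` event shown above (illustrative, not part of this diff; it assumes a `connectionDetails` object as in the README examples):

```js
const NodeResque = require('node-resque')

async function bootScheduler (connectionDetails) {
  const scheduler = new NodeResque.Scheduler({
    connection: connectionDetails,
    stuckWorkerTimeout: 1000 * 60 * 60 // fail workers which have not pinged redis within 1 hour
  })

  // emitted by the scheduler when it force-cleans a worker that stopped pinging redis
  scheduler.on('cleanStuckWorker', (workerName, errorPayload, delta) => {
    console.log(`cleaned stuck worker ${workerName} (stuck for ${delta}s)`, errorPayload)
  })

  await scheduler.connect()
  scheduler.start()
}
```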
const os = require('os')
const path = require('path')
const exec = require('child_process').exec
const EventEmitter = require('events').EventEmitter
@@ -42,2 +41,3 @@ const Queue = require(path.join(__dirname, 'queue.js')).Queue
this.job = null
this.pingTimer = null
@@ -68,2 +68,4 @@ this.queueObject = new Queue({connection: options.connection}, this.jobs)
)
await this.ping()
this.pingTimer = setInterval(this.ping.bind(this), this.options.timeout)
}
@@ -80,3 +82,4 @@
if (this.connection.connected === true || this.connection.connected === undefined || this.connection.connected === null) {
  await this.untrack(this.name, this.stringQueues())
  clearInterval(this.pingTimer)
  await this.untrack()
}
@@ -273,8 +276,23 @@
async untrack (name, queues) {
async ping () {
  const name = this.name
  const nowSeconds = Math.round(new Date().getTime() / 1000)
  this.emit('ping', nowSeconds)
  const payload = JSON.stringify({
    time: nowSeconds,
    name: name,
    queues: this.stringQueues()
  })
  await this.connection.redis.set(this.connection.key('worker', 'ping', name), payload)
}
async untrack () {
  const name = this.name
  const queues = this.stringQueues()
  if (!this.connection || !this.connection.redis) { return }
  await this.connection.redis.srem(this.connection.key('workers'), (name + ':' + queues))
  await this.connection.redis.del(this.connection.key('worker', name, this.stringQueues()))
  await this.connection.redis.del(this.connection.key('worker', name, this.stringQueues(), 'started'))
  await this.connection.redis.del(this.connection.key('worker', 'ping', name))
  await this.connection.redis.del(this.connection.key('worker', name, queues))
  await this.connection.redis.del(this.connection.key('worker', name, queues, 'started'))
  await this.connection.redis.del(this.connection.key('stat', 'failed', name))
@@ -284,47 +302,2 @@ await this.connection.redis.del(this.connection.key('stat', 'processed', name))
async workerCleanup () {
  let pids = await this.getPids()
  let workers = await this.connection.redis.smembers(this.connection.key('workers'))
  for (let i in workers) {
    let worker = workers[i]
    let parts = worker.split(':')
    let host = parts[0]
    let pid = parseInt(parts[1])
    let queues = parts.splice(-1, 1)
    let pureName = parts.join(':')
    if (host === os.hostname() && pids.indexOf(pid) < 0) {
      this.emit('cleaning_worker', worker, pid)
      await this.untrack(pureName, queues)
    }
  }
}
async getPids () {
  let cmd
  if (process.platform === 'win32') {
    cmd = 'powershell.exe -command "Get-Process | select Id"'
  } else {
    cmd = 'ps -eo pid='
  }
  return new Promise((resolve, reject) => {
    exec(cmd, (error, stdout, stderr) => {
      if (!error && stderr) { error = new Error(stderr) }
      if (error) { return reject(error) }
      let pids = []
      stdout.split('\n').forEach((line) => {
        line = line.trim()
        if (line.length > 0) {
          var pid = parseInt(line.split(' ')[0])
          if (!isNaN(pid)) { pids.push(pid) }
        }
      })
      return resolve(pids)
    })
  })
}
async checkQueues () {
@@ -342,3 +315,3 @@ if (typeof this.queues === 'string') {
this.originalQueue = '*'
await this.untrack(this.name, this.stringQueues())
await this.untrack()
let response = await this.connection.redis.smembers(this.connection.key('queues'))
this.queues = (response ? response.sort() : [])
@@ -6,3 +6,3 @@ {
"license": "Apache-2.0",
"version": "5.2.0",
"version": "5.3.0",
"homepage": "http://github.com/taskrabbit/node-resque",
@@ -32,14 +32,18 @@ "repository": {
"devDependencies": {
  "fakeredis": "latest",
  "mocha": "^5.0.1",
  "jest": "^22.4.3",
  "node-schedule": "^1.3.0",
  "should": "^13.2.1",
  "standard": "^11.0.0"
},
"jest": {
  "testPathIgnorePatterns": [
    "<rootDir>/__tests__/utils"
  ]
},
"standard": {
  "globals": [
    "it",
    "after",
    "expect",
    "test",
    "afterAll",
    "afterEach",
    "before",
    "beforeAll",
    "beforeEach",
@@ -51,4 +55,4 @@ "describe"
"pretest": "standard",
"test": "mocha"
"test": "jest"
}
}
@@ -18,4 +18,3 @@ # node-resque: The best background jobs in node.
const path = require('path')
const NodeResque = require(path.join(__dirname, '..', 'index.js'))
// In your projects: const NR = require('node-resque');
const NodeResque = require('node-resque')
@@ -70,3 +69,2 @@ async function boot () {
await worker.connect()
await worker.workerCleanup() // optional: cleanup any previous improperly shutdown workers on this host
worker.start()
@@ -90,2 +88,3 @@
worker.on('poll', (queue) => { console.log(`worker polling ${queue}`) })
worker.on('ping', (time) => { console.log(`worker check in @ ${time}`) })
worker.on('job', (queue, job) => { console.log(`working job ${queue} ${JSON.stringify(job)}`) })
@@ -102,2 +101,3 @@ worker.on('reEnqueue', (queue, job, plugin) => { console.log(`reEnqueue job (${plugin}) ${queue} ${JSON.stringify(job)}`) })
scheduler.on('master', (state) => { console.log('scheduler became master') })
scheduler.on('cleanStuckWorker', (workerName, errorPayload, delta) => { console.log(`failing ${workerName} (stuck for ${delta}s) and failing job ${errorPayload}`) })
scheduler.on('error', (error) => { console.log(`scheduler error >> ${error}`) })
- `failedJobs` is an array listing the data of the failed jobs. Each element looks like: | ||
`{"worker": "host:pid", "queue": "test_queue", "payload": {"class":"slowJob", "queue":"test_queue", "args":[null]}, "exception": "TypeError", "error": "MyImport is not a function", "backtrace": [' at Worker.perform (/path/to/worker:111:24)', ' at <anonymous>'], "failed_at": "Fri Dec 12 2014 14:01:16 GMT-0800 (PST)"}` | ||
- To retrieve all failed jobs, use arguments: `await queue.failed(0, -1)` | ||
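A minimal usage sketch of the calls above (illustrative, not from the package's docs; it assumes a connected `queue` instance as in the README examples):

```js
// assumes `queue` is a connected NodeResque.Queue instance
const failedJobs = await queue.failed(0, -1) // all failed jobs
failedJobs.forEach((failedJob) => {
  // each element has the failed-job shape documented above
  console.log(`${failedJob.failed_at} | ${failedJob.queue} | ${failedJob.error}`)
})
```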
@@ -308,2 +310,26 @@ ### Failing a Job
### Automatically
By default, the scheduler checks for workers which haven't pinged redis in the last 60 minutes. If a worker has gone silent for that long, we assume its process crashed and remove it from redis. If that worker was working on a job, the job is placed in the failed queue for later inspection. Every worker runs a timer which updates a key in redis every `timeout` ms (default: 5 seconds). If your job is slow but async, there should be no problem; however, if your job consumes 100% of the process's CPU, this timer might not fire.
To modify the 60 minute check, change `stuckWorkerTimeout` when configuring your scheduler, e.g.:
```js
const scheduler = new NodeResque.Scheduler({
  stuckWorkerTimeout: (1000 * 60 * 60), // 1 hour, in ms
  connection: connectionDetails
})
```
Set your scheduler's `stuckWorkerTimeout = false` to disable this behavior.
```js
const scheduler = new NodeResque.Scheduler({
  stuckWorkerTimeout: false, // will not fail workers which haven't pinged redis
  connection: connectionDetails
})
```
### Manually
Sometimes a worker crashes in a *severe* way, and it doesn't get the time/chance to notify redis that it is leaving the pool (this happens all the time on PaaS providers like Heroku). When this happens, you will not only need to extract the job from the now-zombie worker's "working on" status, but also remove the stuck worker. To aid you in these edge cases, `await queue.cleanOldWorkers(age)` is available.
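A minimal usage sketch (illustrative, not from the package's docs; it assumes a connected `queue` instance, and the exact return shape is an assumption):

```js
// assumes `queue` is a connected NodeResque.Queue instance
const oneHour = 1000 * 60 * 60
// remove workers which look stale beyond `age` (in ms), failing whatever job they were working on
const cleanedWorkers = await queue.cleanOldWorkers(oneHour)
if (Object.keys(cleanedWorkers).length > 0) {
  console.log('removed stuck workers:', cleanedWorkers)
}
```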
@@ -444,2 +470,3 @@
multiWorker.on('poll', (workerId, queue) => { console.log("worker["+workerId+"] polling " + queue); })
multiWorker.on('ping', (workerId, time) => { console.log("worker["+workerId+"] check in @ " + time); })
multiWorker.on('job', (workerId, queue, job) => { console.log("worker["+workerId+"] working job " + queue + " " + JSON.stringify(job)); })
multiWorker.on('reEnqueue', (workerId, queue, job, plugin) => { console.log("worker["+workerId+"] reEnqueue job (" + plugin + ") " + queue + " " + JSON.stringify(job)); })
Supply chain risk alerts:
- Dynamic require: dynamic require can indicate the package is performing dangerous or unsafe dynamic code execution. Found 1 instance in 1 package.
- Environment variable access: the package accesses environment variables, which may be a sign of credential stuffing or data theft. Found 1 instance in 1 package.
- Shell access: this module accesses the system shell, which increases the risk of executing arbitrary code. Found 1 instance in 1 package.