lcherone-task-queue
Advanced tools
Comparing version 1.0.1 to 1.0.3
{ | ||
"name": "lcherone-task-queue", | ||
"version": "1.0.1", | ||
"version": "1.0.3", | ||
"description": "A simple wrapper for fastq", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
160
tasks.js
// Import the `os` module and use it to get the number of CPU cores | ||
const threads = require('os').cpus().length || 1 | ||
// Async sleep function that returns a promise that resolves after the specified duration | ||
const sleep = (duration) => new Promise((resolve) => setTimeout(resolve, duration)); | ||
// Import the `fastq` library | ||
@@ -10,68 +13,123 @@ const fastq = require('fastq') | ||
// The constructor function initializes the `timers` and `queues` properties | ||
constructor() { | ||
this.timers = {} | ||
this.queues = {} | ||
// The constructor function initializes the `timers` and `queues` properties | ||
constructor(options = { | ||
concurrency: threads, | ||
sleep: 0 | ||
}) { | ||
this.options = options | ||
this.stats = {} | ||
this.timers = {} | ||
this.queues = {} | ||
} | ||
// The `add` method adds a new task queue to the `Tasks` instance | ||
add(name, config) { | ||
// Initialize the stats for the task queue | ||
this.stats[name] = { | ||
jobs: 0, | ||
saturated: 0, | ||
empty: 0, | ||
drained: 0, | ||
completed: 0, | ||
errored: 0 | ||
} | ||
// The `add` method adds a new task queue to the `Tasks` instance | ||
add(name, config) { | ||
// Initialize the timer for the task queue | ||
this.timers[name] = 0 | ||
// Initialize the timer for the task queue | ||
this.timers[name] = 0 | ||
// Save the task queue configuration | ||
this.queues[name] = config | ||
// Save the task queue configuration | ||
this.queues[name] = config | ||
// Create the task queue using the `fastq` library | ||
this.queues[name].queue = fastq(config.worker, threads) | ||
// Create the task queue using the `fastq` library | ||
this.queues[name].queue = fastq(config.worker, this.options.concurrency) | ||
} | ||
// The `run` method starts the task queues running | ||
run() { | ||
// Get an array of the names of the task queues | ||
const jobs = Object.keys(this.queues) | ||
// Set the timers for the next iteration | ||
for (let job of jobs) { | ||
this.queues[job].queue.saturated = function () { | ||
console.log('Queue[' + job + ']: saturated') | ||
this.stats[job].saturated++ | ||
}.bind(this) | ||
// | ||
this.queues[job].queue.empty = function () { | ||
console.log('Queue[' + job + ']: empty') | ||
this.stats[job].empty++ | ||
}.bind(this) | ||
// | ||
this.queues[job].queue.drain = function () { | ||
console.log('Queue[' + job + ']: drained') | ||
this.stats[job].drained++ | ||
}.bind(this) | ||
} | ||
// The `run` method starts the task queues running | ||
run() { | ||
// Get an array of the names of the task queues | ||
const jobs = Object.keys(this.queues) | ||
// Define a loop function that will be called repeatedly | ||
const loop = async () => { | ||
// Create an array of Promises that will resolve when the finder function for each task has completed | ||
const finderPromises = jobs.map(async job => { | ||
// Get the list of jobs for the task queue | ||
const items = await this.queues[job].finder() | ||
// Define a loop function that will be called repeatedly | ||
const loop = async () => { | ||
// Create an array of Promises that will resolve when the finder function for each task has completed | ||
const finderPromises = jobs.map(async job => { | ||
// | ||
this.stats[job].jobs = parseInt(this.stats[job].jobs || 0, 10) + parseInt(items.length || 0, 10) | ||
// Get the list of jobs for the task queue | ||
const items = await this.queues[job].finder() | ||
// Loop over the items and add them to the queue | ||
for (let item of items) { | ||
// Add the item to the queue | ||
this.queues[job].queue.push(item, (err, result) => { | ||
if (err) { | ||
// | ||
this.stats[job].errored++ | ||
// Loop over the items and add them to the queue | ||
for (let item of items) { | ||
// Add the item to the queue | ||
this.queues[job].queue.push(item, (err, result) => { | ||
if (err) { | ||
// If there was an error, call the onError handler | ||
this.queues[job].onError(item, err) | ||
} else { | ||
if (result instanceof Error) { | ||
// If the result is an Error object, call the onError handler | ||
this.queues[job].onError(item, result) | ||
} else { | ||
// If the result is not an Error object, call the onSuccess handler | ||
this.queues[job].onSuccess(item, result) | ||
} | ||
} | ||
}) | ||
} | ||
}) | ||
// If there was an error, call the onError handler | ||
this.queues[job].onError(item, err) | ||
} else { | ||
if (result instanceof Error) { | ||
// | ||
this.stats[job].errored++ | ||
// Wait for all of the finder Promises to resolve | ||
await Promise.all(finderPromises) | ||
// If the result is an Error object, call the onError handler | ||
this.queues[job].onError(item, result) | ||
} else { | ||
// | ||
this.stats[job].completed++ | ||
// Set the timers for the next iteration | ||
for (let job of jobs) { | ||
// Clear the timeout for the task queue | ||
clearTimeout(this.timers[job]) | ||
// Set a new timeout to run the loop function again after the specified interval | ||
this.timers[job] = setTimeout(loop, this.queues[job].interval) | ||
// If the result is not an Error object, call the onSuccess handler | ||
this.queues[job].onSuccess(item, result) | ||
} | ||
} | ||
}) | ||
} | ||
}) | ||
// Start the loop function running for the first time | ||
loop() | ||
// Wait for all of the finder Promises to resolve | ||
await Promise.all(finderPromises) | ||
// Set the timers for the next iteration | ||
for (let job of jobs) { | ||
// Clear the timeout for the task queue | ||
clearTimeout(this.timers[job]) | ||
// Set a new timeout to run the loop function again after the specified interval | ||
this.timers[job] = setTimeout(async () => { | ||
// If the sleep option is truthy, pause the execution of the function for the specified duration | ||
if (this.options.sleep) { | ||
await sleep(this.options.sleep) | ||
} | ||
// | ||
loop() | ||
}, this.queues[job].interval) | ||
} | ||
} | ||
// Start the loop function running for the first time | ||
loop() | ||
} | ||
} |
6860
111