Socket
Socket
Sign inDemoInstall

lcherone-task-queue

Package Overview
Dependencies
2
Maintainers
1
Versions
3
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 1.0.1 to 1.0.3

2

package.json
{
"name": "lcherone-task-queue",
"version": "1.0.1",
"version": "1.0.3",
"description": "A simple wrapper for fastq",

@@ -5,0 +5,0 @@ "main": "index.js",

// Import the `os` module and use it to get the number of CPU cores
const threads = require('os').cpus().length || 1
// Async sleep function that returns a promise that resolves after the specified duration
const sleep = (duration) => new Promise((resolve) => setTimeout(resolve, duration));
// Import the `fastq` library

@@ -10,68 +13,123 @@ const fastq = require('fastq')

// The constructor function initializes the `timers` and `queues` properties
constructor() {
this.timers = {}
this.queues = {}
// The constructor function initializes the `timers` and `queues` properties
constructor(options = {
concurrency: threads,
sleep: 0
}) {
this.options = options
this.stats = {}
this.timers = {}
this.queues = {}
}
// The `add` method adds a new task queue to the `Tasks` instance
add(name, config) {
// Initialize the stats for the task queue
this.stats[name] = {
jobs: 0,
saturated: 0,
empty: 0,
drained: 0,
completed: 0,
errored: 0
}
// The `add` method adds a new task queue to the `Tasks` instance
add(name, config) {
// Initialize the timer for the task queue
this.timers[name] = 0
// Initialize the timer for the task queue
this.timers[name] = 0
// Save the task queue configuration
this.queues[name] = config
// Save the task queue configuration
this.queues[name] = config
// Create the task queue using the `fastq` library
this.queues[name].queue = fastq(config.worker, threads)
// Create the task queue using the `fastq` library
this.queues[name].queue = fastq(config.worker, this.options.concurrency)
}
// The `run` method starts the task queues running
run() {
// Get an array of the names of the task queues
const jobs = Object.keys(this.queues)
// Set the timers for the next iteration
for (let job of jobs) {
this.queues[job].queue.saturated = function () {
console.log('Queue[' + job + ']: saturated')
this.stats[job].saturated++
}.bind(this)
//
this.queues[job].queue.empty = function () {
console.log('Queue[' + job + ']: empty')
this.stats[job].empty++
}.bind(this)
//
this.queues[job].queue.drain = function () {
console.log('Queue[' + job + ']: drained')
this.stats[job].drained++
}.bind(this)
}
// The `run` method starts the task queues running
run() {
// Get an array of the names of the task queues
const jobs = Object.keys(this.queues)
// Define a loop function that will be called repeatedly
const loop = async () => {
// Create an array of Promises that will resolve when the finder function for each task has completed
const finderPromises = jobs.map(async job => {
// Get the list of jobs for the task queue
const items = await this.queues[job].finder()
// Define a loop function that will be called repeatedly
const loop = async () => {
// Create an array of Promises that will resolve when the finder function for each task has completed
const finderPromises = jobs.map(async job => {
//
this.stats[job].jobs = parseInt(this.stats[job].jobs || 0, 10) + parseInt(items.length || 0, 10)
// Get the list of jobs for the task queue
const items = await this.queues[job].finder()
// Loop over the items and add them to the queue
for (let item of items) {
// Add the item to the queue
this.queues[job].queue.push(item, (err, result) => {
if (err) {
//
this.stats[job].errored++
// Loop over the items and add them to the queue
for (let item of items) {
// Add the item to the queue
this.queues[job].queue.push(item, (err, result) => {
if (err) {
// If there was an error, call the onError handler
this.queues[job].onError(item, err)
} else {
if (result instanceof Error) {
// If the result is an Error object, call the onError handler
this.queues[job].onError(item, result)
} else {
// If the result is not an Error object, call the onSuccess handler
this.queues[job].onSuccess(item, result)
}
}
})
}
})
// If there was an error, call the onError handler
this.queues[job].onError(item, err)
} else {
if (result instanceof Error) {
//
this.stats[job].errored++
// Wait for all of the finder Promises to resolve
await Promise.all(finderPromises)
// If the result is an Error object, call the onError handler
this.queues[job].onError(item, result)
} else {
//
this.stats[job].completed++
// Set the timers for the next iteration
for (let job of jobs) {
// Clear the timeout for the task queue
clearTimeout(this.timers[job])
// Set a new timeout to run the loop function again after the specified interval
this.timers[job] = setTimeout(loop, this.queues[job].interval)
// If the result is not an Error object, call the onSuccess handler
this.queues[job].onSuccess(item, result)
}
}
})
}
})
// Start the loop function running for the first time
loop()
// Wait for all of the finder Promises to resolve
await Promise.all(finderPromises)
// Set the timers for the next iteration
for (let job of jobs) {
// Clear the timeout for the task queue
clearTimeout(this.timers[job])
// Set a new timeout to run the loop function again after the specified interval
this.timers[job] = setTimeout(async () => {
// If the sleep option is truthy, pause the execution of the function for the specified duration
if (this.options.sleep) {
await sleep(this.options.sleep)
}
//
loop()
}, this.queues[job].interval)
}
}
// Start the loop function running for the first time
loop()
}
}
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc