Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

promise-blocking-queue

Package Overview
Dependencies
Maintainers
1
Versions
4
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

promise-blocking-queue

Memory optimized promise blocking queue with concurrency control

  • 0.0.1
  • Source
  • npm
  • Socket score

Version published
Maintainers
1
Created
Source

Npm Version Build Status Coverage Status Codacy Badge Known Vulnerabilities dependencies Status devDependencies Status

Promise Blocking Queue

Memory optimized promise blocking queue with concurrency control, specially designed to handle large data sets that must be consumed using streams.

Useful for rate-limiting async (or sync) operations that consume large data sets. For example, when interacting with a REST API or when doing CPU/memory intensive tasks.

Why

If we use Bluebird.map() for example, we are forced to load all the data in memory, before being able to consume it - Out Of Memory Exception is right around the corner.

If we use p-queue (by the amazing sindresorhus) for example, we can utilize streams to avoid memory bloat, but we have no (easy) way to control the stream flow without hitting that Out Of Memory Exception.

The solution - a blocking queue that returns a promise that will be resolved when the added item gain an available slot in the queue, thus, allowing us to pause the stream consumption, until there is a real need to consume the next item - keeping us memory smart while maintaining concurrency level of data handling.

Install

npm install promise-blocking-queue

Usage

Let's assume we have a very large (a couple of GBs) file called users.json which contains a long list of users we want to add to our DB. Also, let's assume that our DB instance it very cheap, and as such we don't want to load it to much, so we only want to handle 100 concurrent DB insert operations. We can achieve a short scalable solution like:

import * as JSONStream from 'JSONStream';
import * as fs from 'fs';
import * as _ from 'underscore';
import * as es from 'event-stream';
import { BlockingQueue } from 'promise-blocking-queue';

const queue = new BlockingQueue({ concurrency: 100 });
let count = 0;

const mapper = (user, cb) => {
  queue.enqueue(() => {
    // Add user to DB
    return Promise.resolve();
  }).enqueuePromise
    .then(() => {
      console.log('handled', count++);
      cb();
    })
    .catch((err) => {
      cb(err);
    });
  return false;
};

const readStream = fs.createReadStream('./users.json', { flags: 'r', encoding: 'utf-8' });
const jsonReadStream = JSONStream.parse('*');
const mapStream = es.map(mapper);

readStream
  .pipe(jsonReadStream)
  .pipe(mapStream)
  .on('data', _.noop)
  .on('error', (err) => {
    console.log('error streaming', err);
    process.exit(1);
  })
  .on('end', () => {
    console.log('done streaming');
    queue.on('idle', () => {
      console.log('done processing', count);
      process.exit(0);
    });
  });

API

BlockingQueue(options)

Returns a new queue instance, which is an EventEmitter subclass.

options

Type: object

concurrency

Type: number
Default: Infinity
Minimum: 1

Concurrency limit.

queue

BlockingQueue instance.

.enqueue(fn, ...args)

Adds a sync or async task to the queue

Return value

Type: object

enqueuePromise

Type: Promise<void>

A promise that will be resolved when the queue has an available slot to run the task.
Used to realize that it is a good time to add another task to the queue.

fnPromise

Type: Promise<T>

A promise that will be resolved with the result of fn.

fn

Type: Function

Promise/Value returning function.

args

Type: any[]

The arguments to pass to the function

activeCount

The number of promises that are currently running.

pendingCount

The number of promises that are waiting to run.

Events

empty

Emitted when the queue becomes empty. Useful if, for example, you add additional items at a later time.

idle

Emitted when the queue becomes empty, and all promises have completed: queue.activeCount === 0 && queue.pendingCount === 0.

The difference with empty is that idle guarantees that all work from the queue has finished. empty merely signals that the queue is empty, but it could mean that some promises haven't completed yet.

Credits

The library is based on p-limit and p-queue (by the amazing sindresorhus)

Versions

Promise Blocking Queue supports Node 6 LTS and higher.

Contributing

All contributions are happily welcomed!
Please make all pull requests to the master branch from your fork and ensure tests pass locally.

Keywords

FAQs

Package last updated on 15 Jul 2019

Did you know?

Socket

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Install

Related posts

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc