Socket
Socket
Sign inDemoInstall

promise-blocking-queue

Package Overview
Dependencies
Maintainers
1
Versions
4
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

promise-blocking-queue - npm Package Compare versions

Comparing version 0.0.2 to 0.1.0

4

CHANGELOG.md

@@ -8,2 +8,6 @@ # Changelog

## v0.1.0
Expose if the enqueued task already started running
## v0.0.2

@@ -10,0 +14,0 @@

7

dist/BlockingQueue.d.ts

@@ -9,3 +9,4 @@ /// <reference types="node" />

}
declare const BlockingQueue_base: new () => StrictEventEmitter<EventEmitter, IBlockingQueueEvents, IBlockingQueueEvents, "addEventListener" | "removeEventListener", "on" | "addListener" | "removeListener" | "once" | "emit">;
declare type MessageEmitter = StrictEventEmitter<EventEmitter, IBlockingQueueEvents>;
declare const BlockingQueue_base: new () => MessageEmitter;
export declare class BlockingQueue extends BlockingQueue_base {

@@ -18,4 +19,4 @@ private readonly _options;

enqueue<T, P extends any[]>(fn: QueueFn<T, P>, ...args: P): IEnqueueResult<T>;
readonly activeCount: number;
readonly pendingCount: number;
get activeCount(): number;
get pendingCount(): number;
private _next;

@@ -22,0 +23,0 @@ private _run;

'use strict';
Object.defineProperty(exports, "__esModule", { value: true });
exports.BlockingQueue = void 0;
const events_1 = require("events");

@@ -33,3 +34,5 @@ const LinkedList = require("linked-list");

};
let started = false;
if (this.activeCount < this._options.concurrency) {
started = true;
this._run(item);

@@ -41,2 +44,3 @@ }

return {
started,
enqueuePromise: enqueuePromiseParts.promise,

@@ -43,0 +47,0 @@ fnPromise: fnPromiseParts.promise,

'use strict';
function __export(m) {
for (var p in m) if (!exports.hasOwnProperty(p)) exports[p] = m[p];
}
var __createBinding = (this && this.__createBinding) || (Object.create ? (function(o, m, k, k2) {
if (k2 === undefined) k2 = k;
Object.defineProperty(o, k2, { enumerable: true, get: function() { return m[k]; } });
}) : (function(o, m, k, k2) {
if (k2 === undefined) k2 = k;
o[k2] = m[k];
}));
var __exportStar = (this && this.__exportStar) || function(m, exports) {
for (var p in m) if (p !== "default" && !Object.prototype.hasOwnProperty.call(exports, p)) __createBinding(exports, m, p);
};
Object.defineProperty(exports, "__esModule", { value: true });
__export(require("./BlockingQueue"));
__exportStar(require("./BlockingQueue"), exports);
__exportStar(require("./EventEmitterTypesHelper"), exports);
__exportStar(require("./types"), exports);
//# sourceMappingURL=index.js.map

@@ -8,2 +8,3 @@ export interface IBlockingQueueOptions {

fnPromise: Promise<T>;
started: boolean;
}
{
"name": "promise-blocking-queue",
"version": "0.0.2",
"version": "0.1.0",
"description": "Memory optimized promise blocking queue with concurrency control",

@@ -57,19 +57,19 @@ "main": "dist/index.js",

"devDependencies": {
"@types/chai": "^4.1.7",
"@types/chai-as-promised": "^7.1.0",
"@types/mocha": "^5.2.7",
"@types/node": "^12.6.3",
"@types/chai": "^4.2.15",
"@types/chai-as-promised": "^7.1.3",
"@types/mocha": "^8.2.1",
"@types/node": "^14.14.31",
"chai": "^4.2.0",
"chai-as-promised": "^7.1.1",
"coveralls": "^3.0.5",
"delay": "^4.3.0",
"coveralls": "^3.1.0",
"delay": "^5.0.0",
"dirty-chai": "^2.0.1",
"istanbul": "^0.4.5",
"mocha": "^6.1.4",
"sinon": "^7.3.2",
"sinon-chai": "^3.3.0",
"sleep-promise": "^8.0.1",
"ts-node": "^8.3.0",
"tslint": "^5.18.0",
"typescript": "^3.5.3"
"mocha": "^6.2.3",
"sinon": "^9.2.4",
"sinon-chai": "^3.5.0",
"sleep-promise": "^9.1.0",
"ts-node": "^9.1.1",
"tslint": "^6.1.3",
"typescript": "^4.1.5"
},

@@ -76,0 +76,0 @@ "dependencies": {

@@ -24,3 +24,3 @@ [![Npm Version](https://img.shields.io/npm/v/promise-blocking-queue.svg?style=popout)](https://www.npmjs.com/package/promise-blocking-queue)

The solution - a blocking queue that returns a promise that will be resolved when the added item gain an available slot in the
The solution - a blocking queue that returns a promise that will be resolved when the added item gains an available slot in the
queue, thus, allowing us to pause the stream consumption, until there is a _real_ need to consume the next item - keeping us

@@ -35,55 +35,139 @@ memory smart while maintaining concurrency level of data handling.

## Usage
## Usage example
Let's assume we have a very large (a couple of GBs) file called `users.json` which contains a long list of users we want to add to our DB.
Also, let's assume that our DB instance it very cheap, and as such we don't want to load it to much, so we only want to handle
100 concurrent DB insert operations.
We can achieve a short scalable solution like:
Let's assume we have a very large (a couple of GBs) file called `users.json` which contains a long list of users we want to add to our DB.
Also, let's assume that our DB instance it very cheap, and as such we don't want to load it too much, so we only want to handle
2 concurrent DB insert operations.
We can achieve a short scalable solution like so:
```typescript
import * as JSONStream from 'JSONStream';
import * as fs from 'fs';
import * as _ from 'underscore';
import * as es from 'event-stream';
import * as sleep from 'sleep-promise';
import { BlockingQueue } from 'promise-blocking-queue';
const queue = new BlockingQueue({ concurrency: 100 });
let count = 0;
const queue = new BlockingQueue({ concurrency: 2 });
let handled = 0;
let failed = 0;
const readStream = fs.createReadStream('./users.json', { flags: 'r', encoding: 'utf-8' });
const jsonReadStream = JSONStream.parse('*');
const jsonWriteStream = JSONStream.stringify();
const writeStream = fs.createWriteStream('./results.json');
const logFailed = () => {
console.log(`failed ${++failed}`);
};
const logAddedUser = (username) => () => {
console.log(`added ${username} #${++handled}`);
jsonWriteStream.write(username);
};
const addUserToDB = (user) => {
console.log(`adding ${user.username}`);
// Simulate long running task
return sleep((handled + 1) * 100).then(logAddedUser(user.username));
};
const mapper = (user, cb) => {
queue.enqueue(() => {
// Add user to DB
return Promise.resolve();
}).enqueuePromise
.then(() => {
console.log('handled', count++);
cb();
})
.catch((err) => {
cb(err);
});
console.log(`streamed ${user.username}`);
const qResult = queue.enqueue(addUserToDB, user);
qResult.fnPromise.catch(logFailed);
// Continue streaming only after current item handling starts
qResult.enqueuePromise.then(cb, cb);
return false;
};
const readStream = fs.createReadStream('./users.json', { flags: 'r', encoding: 'utf-8' });
const jsonReadStream = JSONStream.parse('*');
const mapStream = es.map(mapper);
// tslint:disable-next-line:no-empty
const noop = () => {};
const onReadEnd = () => {
console.log('done read streaming');
// Wait until all work is done
queue.on('idle', () => {
jsonWriteStream.end();
});
};
const onWriteEnd = () => {
console.log(`done processing - ${handled} handled, ${failed} failed`);
process.exit(0);
};
jsonWriteStream
.pipe(writeStream)
.on('error', (err) => {
console.log('error wrtie streaming', err);
process.exit(1);
})
.on('end', onWriteEnd)
.on('finish', onWriteEnd);
readStream
.pipe(jsonReadStream)
.pipe(mapStream)
.on('data', _.noop)
.pipe(es.map(mapper))
.on('data', noop)
.on('error', (err) => {
console.log('error streaming', err);
console.log('error read streaming', err);
process.exit(1);
})
.on('end', () => {
console.log('done streaming');
queue.on('idle', () => {
console.log('done processing', count);
process.exit(0);
});
});
.on('finish', onReadEnd)
.on('end', onReadEnd);
```
If `users.json` is like:
```json
[
{
"username": "a"
},
{
"username": "b"
},
{
"username": "c"
},
{
"username": "d"
}
]
```
Output will be:
```bash
streamed a
adding a
streamed b
adding b
streamed c // c now waits in line to start and streaming is paused until then
added a #1
streamed d // d only get streamed after c has a spot in the queue
adding c // c only gets handled after a is done
added b #2
adding d // d only gets handled after b is done
done read streaming
added c #3
added d #4
done processing - 4 handled, 0 failed
```
`results.json` will be:
```json
[
"a"
,
"b"
,
"c"
,
"d"
]
```
## API

@@ -132,2 +216,8 @@

###### started
Type: `boolean`
Indicates if the task has already started to run
##### fn

@@ -134,0 +224,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc