promise-blocking-queue
Advanced tools
Comparing version 0.0.2 to 0.1.0
@@ -8,2 +8,6 @@ # Changelog | ||
## v0.1.0 | ||
Expose if the enqueued task already started running | ||
## v0.0.2 | ||
@@ -10,0 +14,0 @@ |
@@ -9,3 +9,4 @@ /// <reference types="node" /> | ||
} | ||
declare const BlockingQueue_base: new () => StrictEventEmitter<EventEmitter, IBlockingQueueEvents, IBlockingQueueEvents, "addEventListener" | "removeEventListener", "on" | "addListener" | "removeListener" | "once" | "emit">; | ||
declare type MessageEmitter = StrictEventEmitter<EventEmitter, IBlockingQueueEvents>; | ||
declare const BlockingQueue_base: new () => MessageEmitter; | ||
export declare class BlockingQueue extends BlockingQueue_base { | ||
@@ -18,4 +19,4 @@ private readonly _options; | ||
enqueue<T, P extends any[]>(fn: QueueFn<T, P>, ...args: P): IEnqueueResult<T>; | ||
readonly activeCount: number; | ||
readonly pendingCount: number; | ||
get activeCount(): number; | ||
get pendingCount(): number; | ||
private _next; | ||
@@ -22,0 +23,0 @@ private _run; |
'use strict'; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.BlockingQueue = void 0; | ||
const events_1 = require("events"); | ||
@@ -33,3 +34,5 @@ const LinkedList = require("linked-list"); | ||
}; | ||
let started = false; | ||
if (this.activeCount < this._options.concurrency) { | ||
started = true; | ||
this._run(item); | ||
@@ -41,2 +44,3 @@ } | ||
return { | ||
started, | ||
enqueuePromise: enqueuePromiseParts.promise, | ||
@@ -43,0 +47,0 @@ fnPromise: fnPromiseParts.promise, |
'use strict'; | ||
function __export(m) { | ||
for (var p in m) if (!exports.hasOwnProperty(p)) exports[p] = m[p]; | ||
} | ||
var __createBinding = (this && this.__createBinding) || (Object.create ? (function(o, m, k, k2) { | ||
if (k2 === undefined) k2 = k; | ||
Object.defineProperty(o, k2, { enumerable: true, get: function() { return m[k]; } }); | ||
}) : (function(o, m, k, k2) { | ||
if (k2 === undefined) k2 = k; | ||
o[k2] = m[k]; | ||
})); | ||
var __exportStar = (this && this.__exportStar) || function(m, exports) { | ||
for (var p in m) if (p !== "default" && !Object.prototype.hasOwnProperty.call(exports, p)) __createBinding(exports, m, p); | ||
}; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
__export(require("./BlockingQueue")); | ||
__exportStar(require("./BlockingQueue"), exports); | ||
__exportStar(require("./EventEmitterTypesHelper"), exports); | ||
__exportStar(require("./types"), exports); | ||
//# sourceMappingURL=index.js.map |
@@ -8,2 +8,3 @@ export interface IBlockingQueueOptions { | ||
fnPromise: Promise<T>; | ||
started: boolean; | ||
} |
{ | ||
"name": "promise-blocking-queue", | ||
"version": "0.0.2", | ||
"version": "0.1.0", | ||
"description": "Memory optimized promise blocking queue with concurrency control", | ||
@@ -57,19 +57,19 @@ "main": "dist/index.js", | ||
"devDependencies": { | ||
"@types/chai": "^4.1.7", | ||
"@types/chai-as-promised": "^7.1.0", | ||
"@types/mocha": "^5.2.7", | ||
"@types/node": "^12.6.3", | ||
"@types/chai": "^4.2.15", | ||
"@types/chai-as-promised": "^7.1.3", | ||
"@types/mocha": "^8.2.1", | ||
"@types/node": "^14.14.31", | ||
"chai": "^4.2.0", | ||
"chai-as-promised": "^7.1.1", | ||
"coveralls": "^3.0.5", | ||
"delay": "^4.3.0", | ||
"coveralls": "^3.1.0", | ||
"delay": "^5.0.0", | ||
"dirty-chai": "^2.0.1", | ||
"istanbul": "^0.4.5", | ||
"mocha": "^6.1.4", | ||
"sinon": "^7.3.2", | ||
"sinon-chai": "^3.3.0", | ||
"sleep-promise": "^8.0.1", | ||
"ts-node": "^8.3.0", | ||
"tslint": "^5.18.0", | ||
"typescript": "^3.5.3" | ||
"mocha": "^6.2.3", | ||
"sinon": "^9.2.4", | ||
"sinon-chai": "^3.5.0", | ||
"sleep-promise": "^9.1.0", | ||
"ts-node": "^9.1.1", | ||
"tslint": "^6.1.3", | ||
"typescript": "^4.1.5" | ||
}, | ||
@@ -76,0 +76,0 @@ "dependencies": { |
156
README.md
@@ -24,3 +24,3 @@ [![Npm Version](https://img.shields.io/npm/v/promise-blocking-queue.svg?style=popout)](https://www.npmjs.com/package/promise-blocking-queue) | ||
The solution - a blocking queue that returns a promise that will be resolved when the added item gain an available slot in the | ||
The solution - a blocking queue that returns a promise that will be resolved when the added item gains an available slot in the | ||
queue, thus, allowing us to pause the stream consumption, until there is a _real_ need to consume the next item - keeping us | ||
@@ -35,55 +35,139 @@ memory smart while maintaining concurrency level of data handling. | ||
## Usage | ||
## Usage example | ||
Let's assume we have a very large (a couple of GBs) file called `users.json` which contains a long list of users we want to add to our DB. | ||
Also, let's assume that our DB instance it very cheap, and as such we don't want to load it to much, so we only want to handle | ||
100 concurrent DB insert operations. | ||
We can achieve a short scalable solution like: | ||
Let's assume we have a very large (a couple of GBs) file called `users.json` which contains a long list of users we want to add to our DB. | ||
Also, let's assume that our DB instance is very cheap, and as such we don't want to load it too much, so we only want to handle | ||
2 concurrent DB insert operations. | ||
We can achieve a short scalable solution like so: | ||
```typescript | ||
import * as JSONStream from 'JSONStream'; | ||
import * as fs from 'fs'; | ||
import * as _ from 'underscore'; | ||
import * as es from 'event-stream'; | ||
import * as sleep from 'sleep-promise'; | ||
import { BlockingQueue } from 'promise-blocking-queue'; | ||
const queue = new BlockingQueue({ concurrency: 100 }); | ||
let count = 0; | ||
const queue = new BlockingQueue({ concurrency: 2 }); | ||
let handled = 0; | ||
let failed = 0; | ||
const readStream = fs.createReadStream('./users.json', { flags: 'r', encoding: 'utf-8' }); | ||
const jsonReadStream = JSONStream.parse('*'); | ||
const jsonWriteStream = JSONStream.stringify(); | ||
const writeStream = fs.createWriteStream('./results.json'); | ||
const logFailed = () => { | ||
console.log(`failed ${++failed}`); | ||
}; | ||
const logAddedUser = (username) => () => { | ||
console.log(`added ${username} #${++handled}`); | ||
jsonWriteStream.write(username); | ||
}; | ||
const addUserToDB = (user) => { | ||
console.log(`adding ${user.username}`); | ||
// Simulate long running task | ||
return sleep((handled + 1) * 100).then(logAddedUser(user.username)); | ||
}; | ||
const mapper = (user, cb) => { | ||
queue.enqueue(() => { | ||
// Add user to DB | ||
return Promise.resolve(); | ||
}).enqueuePromise | ||
.then(() => { | ||
console.log('handled', count++); | ||
cb(); | ||
}) | ||
.catch((err) => { | ||
cb(err); | ||
}); | ||
console.log(`streamed ${user.username}`); | ||
const qResult = queue.enqueue(addUserToDB, user); | ||
qResult.fnPromise.catch(logFailed); | ||
// Continue streaming only after current item handling starts | ||
qResult.enqueuePromise.then(cb, cb); | ||
return false; | ||
}; | ||
const readStream = fs.createReadStream('./users.json', { flags: 'r', encoding: 'utf-8' }); | ||
const jsonReadStream = JSONStream.parse('*'); | ||
const mapStream = es.map(mapper); | ||
// tslint:disable-next-line:no-empty | ||
const noop = () => {}; | ||
const onReadEnd = () => { | ||
console.log('done read streaming'); | ||
// Wait until all work is done | ||
queue.on('idle', () => { | ||
jsonWriteStream.end(); | ||
}); | ||
}; | ||
const onWriteEnd = () => { | ||
console.log(`done processing - ${handled} handled, ${failed} failed`); | ||
process.exit(0); | ||
}; | ||
jsonWriteStream | ||
.pipe(writeStream) | ||
.on('error', (err) => { | ||
console.log('error write streaming', err); | ||
process.exit(1); | ||
}) | ||
.on('end', onWriteEnd) | ||
.on('finish', onWriteEnd); | ||
readStream | ||
.pipe(jsonReadStream) | ||
.pipe(mapStream) | ||
.on('data', _.noop) | ||
.pipe(es.map(mapper)) | ||
.on('data', noop) | ||
.on('error', (err) => { | ||
console.log('error streaming', err); | ||
console.log('error read streaming', err); | ||
process.exit(1); | ||
}) | ||
.on('end', () => { | ||
console.log('done streaming'); | ||
queue.on('idle', () => { | ||
console.log('done processing', count); | ||
process.exit(0); | ||
}); | ||
}); | ||
.on('finish', onReadEnd) | ||
.on('end', onReadEnd); | ||
``` | ||
If `users.json` is like: | ||
```json | ||
[ | ||
{ | ||
"username": "a" | ||
}, | ||
{ | ||
"username": "b" | ||
}, | ||
{ | ||
"username": "c" | ||
}, | ||
{ | ||
"username": "d" | ||
} | ||
] | ||
``` | ||
Output will be: | ||
```bash | ||
streamed a | ||
adding a | ||
streamed b | ||
adding b | ||
streamed c // c now waits in line to start and streaming is paused until then | ||
added a #1 | ||
streamed d // d only gets streamed after c has a spot in the queue | ||
adding c // c only gets handled after a is done | ||
added b #2 | ||
adding d // d only gets handled after b is done | ||
done read streaming | ||
added c #3 | ||
added d #4 | ||
done processing - 4 handled, 0 failed | ||
``` | ||
`results.json` will be: | ||
```json | ||
[ | ||
"a" | ||
, | ||
"b" | ||
, | ||
"c" | ||
, | ||
"d" | ||
] | ||
``` | ||
## API | ||
@@ -132,2 +216,8 @@ | ||
###### started | ||
Type: `boolean` | ||
Indicates if the task has already started to run | ||
##### fn | ||
@@ -134,0 +224,0 @@ |
License Policy Violation
License: This package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
License: This package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
35881
174
266