promise-blocking-queue
Advanced tools
Comparing version 0.1.0 to 1.0.0
@@ -9,3 +9,3 @@ /// <reference types="node" /> | ||
} | ||
declare type MessageEmitter = StrictEventEmitter<EventEmitter, IBlockingQueueEvents>; | ||
type MessageEmitter = StrictEventEmitter<EventEmitter, IBlockingQueueEvents>; | ||
declare const BlockingQueue_base: new () => MessageEmitter; | ||
@@ -12,0 +12,0 @@ export declare class BlockingQueue extends BlockingQueue_base { |
@@ -7,5 +7,5 @@ declare const assignmentCompatibilityHack: unique symbol; | ||
} | ||
declare type InnerEEMethodReturnType<T, TValue, FValue> = T extends (...args: any[]) => any ? ReturnType<T> extends void | undefined ? FValue : TValue : FValue; | ||
declare type EEMethodReturnType<T, S extends string, TValue, FValue = void> = S extends keyof T ? InnerEEMethodReturnType<T[S], TValue, FValue> : FValue; | ||
declare type ListenerType<T> = [T] extends [(...args: infer U) => any] ? U : [T] extends [void] ? [] : [T]; | ||
type InnerEEMethodReturnType<T, TValue, FValue> = T extends (...args: any[]) => any ? ReturnType<T> extends void | undefined ? FValue : TValue : FValue; | ||
type EEMethodReturnType<T, S extends string, TValue, FValue = void> = S extends keyof T ? InnerEEMethodReturnType<T[S], TValue, FValue> : FValue; | ||
type ListenerType<T> = [T] extends [(...args: infer U) => any] ? U : [T] extends [void] ? [] : [T]; | ||
interface OverriddenMethods<TEmitter, TEventRecord, TEmitRecord = TEventRecord> { | ||
@@ -27,4 +27,4 @@ on<P extends keyof TEventRecord, T>(this: T, event: P, listener: (...args: ListenerType<TEventRecord[P]>) => void): EEMethodReturnType<TEmitter, 'on', T>; | ||
} | ||
declare type OverriddenKeys = keyof OverriddenMethods<any, any, any>; | ||
export declare type StrictEventEmitter<TEmitterType, TEventRecord, TEmitRecord = TEventRecord, UnneededMethods extends Exclude<OverriddenKeys, keyof TEmitterType> = Exclude<OverriddenKeys, keyof TEmitterType>, NeededMethods extends Exclude<OverriddenKeys, UnneededMethods> = Exclude<OverriddenKeys, UnneededMethods>> = TypeRecord<TEmitterType, TEventRecord, TEmitRecord> & Pick<TEmitterType, Exclude<keyof TEmitterType, OverriddenKeys>> & Pick<OverriddenMethods<TEmitterType, TEventRecord, TEmitRecord>, NeededMethods>; | ||
type OverriddenKeys = keyof OverriddenMethods<any, any, any>; | ||
export type StrictEventEmitter<TEmitterType, TEventRecord, TEmitRecord = TEventRecord, UnneededMethods extends Exclude<OverriddenKeys, keyof TEmitterType> = Exclude<OverriddenKeys, keyof TEmitterType>, NeededMethods extends Exclude<OverriddenKeys, UnneededMethods> = Exclude<OverriddenKeys, UnneededMethods>> = TypeRecord<TEmitterType, TEventRecord, TEmitRecord> & Pick<TEmitterType, Exclude<keyof TEmitterType, OverriddenKeys>> & Pick<OverriddenMethods<TEmitterType, TEventRecord, TEmitRecord>, NeededMethods>; | ||
export {}; |
'use strict'; | ||
var __createBinding = (this && this.__createBinding) || (Object.create ? (function(o, m, k, k2) { | ||
if (k2 === undefined) k2 = k; | ||
Object.defineProperty(o, k2, { enumerable: true, get: function() { return m[k]; } }); | ||
var desc = Object.getOwnPropertyDescriptor(m, k); | ||
if (!desc || ("get" in desc ? !m.__esModule : desc.writable || desc.configurable)) { | ||
desc = { enumerable: true, get: function() { return m[k]; } }; | ||
} | ||
Object.defineProperty(o, k2, desc); | ||
}) : (function(o, m, k, k2) { | ||
@@ -6,0 +10,0 @@ if (k2 === undefined) k2 = k; |
export interface IBlockingQueueOptions { | ||
concurrency: number; | ||
} | ||
export declare type QueueFn<T, P extends any[]> = ((...args: P) => Promise<T> | T); | ||
export type QueueFn<T, P extends any[]> = ((...args: P) => Promise<T> | T); | ||
export interface IEnqueueResult<T> { | ||
@@ -6,0 +6,0 @@ enqueuePromise: Promise<void>; |
{ | ||
"name": "promise-blocking-queue", | ||
"version": "0.1.0", | ||
"version": "1.0.0", | ||
"description": "Memory optimized promise blocking queue with concurrency control", | ||
@@ -9,3 +9,3 @@ "main": "dist/index.js", | ||
"engines": { | ||
"node": ">=6" | ||
"node": ">=14" | ||
}, | ||
@@ -17,4 +17,4 @@ "scripts": { | ||
"test": "npm run lint && npm run mocha", | ||
"mocha": "mocha test/**/*.ts --reporter spec --opts test/mocha.opts", | ||
"cover": "istanbul cover -x dist/index.js node_modules/mocha/bin/_mocha test/**/*.ts -- -R spec --opts test/mocha.opts", | ||
"mocha": "mocha --config test/.mocharc.json", | ||
"cover": "nyc --reporter=lcov --reporter=text-summary mocha --config test/.mocharc.json", | ||
"lint": "tslint -c tslint.json 'src/**/*.ts' 'test/**/*.ts'" | ||
@@ -53,3 +53,3 @@ }, | ||
"author": "Regev Brody <regevbr@gmail.com>", | ||
"license": "ISC", | ||
"license": "MIT", | ||
"bugs": { | ||
@@ -60,19 +60,21 @@ "url": "https://github.com/PruvoNet/promise-blocking-queue/issues" | ||
"devDependencies": { | ||
"@types/chai": "^4.2.15", | ||
"@types/chai-as-promised": "^7.1.3", | ||
"@types/mocha": "^8.2.1", | ||
"@types/node": "^14.14.31", | ||
"chai": "^4.2.0", | ||
"@istanbuljs/nyc-config-typescript": "^1.0.2", | ||
"@types/chai": "^4.3.5", | ||
"@types/chai-as-promised": "^7.1.5", | ||
"@types/mocha": "^10.0.1", | ||
"@types/node": "^18.16.18", | ||
"chai": "^4.3.7", | ||
"chai-as-promised": "^7.1.1", | ||
"coveralls": "^3.1.0", | ||
"delay": "^5.0.0", | ||
"coveralls": "^3.1.1", | ||
"delay": "^6.0.0", | ||
"dirty-chai": "^2.0.1", | ||
"istanbul": "^0.4.5", | ||
"mocha": "^6.2.3", | ||
"sinon": "^9.2.4", | ||
"sinon-chai": "^3.5.0", | ||
"mocha": "^10.2.0", | ||
"nyc": "^15.1.0", | ||
"sinon": "^15.1.2", | ||
"sinon-chai": "^3.7.0", | ||
"sleep-promise": "^9.1.0", | ||
"ts-node": "^9.1.1", | ||
"source-map-support": "^0.5.21", | ||
"ts-node": "^10.9.1", | ||
"tslint": "^6.1.3", | ||
"typescript": "^4.1.5" | ||
"typescript": "^5.1.3" | ||
}, | ||
@@ -79,0 +81,0 @@ "dependencies": { |
116
README.md
[![Npm Version](https://img.shields.io/npm/v/promise-blocking-queue.svg?style=popout)](https://www.npmjs.com/package/promise-blocking-queue) | ||
[![Build Status](https://travis-ci.org/PruvoNet/promise-blocking-queue.svg?branch=master)](https://travis-ci.org/PruvoNet/promise-blocking-queue) | ||
[![Coverage Status](https://coveralls.io/repos/github/PruvoNet/promise-blocking-queue/badge.svg?branch=master)](https://coveralls.io/github/PruvoNet/promise-blocking-queue?branch=master) | ||
[![Codacy Badge](https://api.codacy.com/project/badge/Grade/58abd1713b064f4c9af7dc88d7178ebe)](https://www.codacy.com/app/regevbr/promise-blocking-queue?utm_source=github.com&utm_medium=referral&utm_content=PruvoNet/promise-blocking-queue&utm_campaign=Badge_Grade) | ||
[![node](https://img.shields.io/node/v-lts/promise-blocking-queue)](https://www.npmjs.com/package/promise-blocking-queue) | ||
[![Build status](https://github.com/PruvoNet/promise-blocking-queue/actions/workflows/ci.yml/badge.svg?branch=master)](https://www.npmjs.com/package/promise-blocking-queue) | ||
[![Test Coverage](https://api.codeclimate.com/v1/badges/5cc9e9fe4871a315f2aa/test_coverage)](https://codeclimate.com/github/PruvoNet/promise-blocking-queue/test_coverage) | ||
[![Maintainability](https://api.codeclimate.com/v1/badges/5cc9e9fe4871a315f2aa/maintainability)](https://codeclimate.com/github/PruvoNet/promise-blocking-queue/maintainability) | ||
[![Known Vulnerabilities](https://snyk.io/test/github/PruvoNet/promise-blocking-queue/badge.svg?targetFile=package.json)](https://snyk.io/test/github/PruvoNet/promise-blocking-queue?targetFile=package.json) | ||
[![dependencies Status](https://david-dm.org/PruvoNet/promise-blocking-queue/status.svg)](https://david-dm.org/PruvoNet/promise-blocking-queue) | ||
[![devDependencies Status](https://david-dm.org/PruvoNet/promise-blocking-queue/dev-status.svg)](https://david-dm.org/PruvoNet/promise-blocking-queue?type=dev) | ||
@@ -52,2 +51,3 @@ # Promise Blocking Queue | ||
let failed = 0; | ||
let awaitDrain: Promise<void> | undefined; | ||
@@ -59,61 +59,79 @@ const readStream = fs.createReadStream('./users.json', { flags: 'r', encoding: 'utf-8' }); | ||
const logFailed = () => { | ||
console.log(`failed ${++failed}`); | ||
const addUserToDB = async (user) => { | ||
try { | ||
console.log(`adding ${user.username}`); | ||
// Simulate long running task | ||
await sleep((handled + 1) * 100); | ||
console.log(`added ${user.username} #${++handled}`); | ||
const writePaused = !jsonWriteStream.write(user.username); | ||
if (writePaused && !awaitDrain) { | ||
// Down stream asked to pause the writes for now | ||
awaitDrain = new Promise((resolve) => { | ||
jsonWriteStream.once('drain', resolve); | ||
}); | ||
} | ||
} catch (err) { | ||
console.log(`failed ${++failed}`, err); | ||
} | ||
}; | ||
const logAddedUser = (username) => () => { | ||
console.log(`added ${username} #${++handled}`); | ||
jsonWriteStream.write(username); | ||
const handleUser = async (user) => { | ||
// Wait until the down stream is ready to receive more data without increasing the memory footprint | ||
if (awaitDrain) { | ||
await awaitDrain; | ||
awaitDrain = undefined; | ||
} | ||
return queue.enqueue(addUserToDB, user).enqueuePromise; | ||
}; | ||
const addUserToDB = (user) => { | ||
console.log(`adding ${user.username}`); | ||
// Simulate long running task | ||
return sleep((handled + 1) * 100).then(logAddedUser(user.username)); | ||
}; | ||
// Do not use async! | ||
const mapper = (user, cb) => { | ||
console.log(`streamed ${user.username}`); | ||
const qResult = queue.enqueue(addUserToDB, user); | ||
qResult.fnPromise.catch(logFailed); | ||
// Continue streaming only after current item handling starts | ||
qResult.enqueuePromise.then(cb, cb); | ||
return false; | ||
console.log(`streamed ${user.username}`); | ||
handleUser(user) | ||
.then(() => { | ||
cb(); | ||
}); | ||
// Pause the read stream until we are ready to handle more data | ||
return false; | ||
}; | ||
// tslint:disable-next-line:no-empty | ||
const noop = () => {}; | ||
const onReadEnd = () => { | ||
console.log('done read streaming'); | ||
// Wait until all work is done | ||
queue.on('idle', () => { | ||
jsonWriteStream.end(); | ||
}); | ||
console.log('done read streaming'); | ||
// If nothing was written, idle event will not be fired | ||
if (queue.pendingCount === 0 && queue.activeCount === 0) { | ||
jsonWriteStream.end(); | ||
} else { | ||
// Wait until all work is done | ||
queue.on('idle', () => { | ||
jsonWriteStream.end(); | ||
}); | ||
} | ||
}; | ||
const onWriteEnd = () => { | ||
console.log(`done processing - ${handled} handled, ${failed} failed`); | ||
process.exit(0); | ||
console.log(`done processing - ${handled} handled, ${failed} failed`); | ||
process.exit(0); | ||
}; | ||
jsonWriteStream | ||
.pipe(writeStream) | ||
.on('error', (err) => { | ||
console.log('error wrtie streaming', err); | ||
process.exit(1); | ||
}) | ||
.on('end', onWriteEnd) | ||
.on('finish', onWriteEnd); | ||
.pipe(writeStream) | ||
.on('error', (err) => { | ||
console.log('error wrtie streaming', err); | ||
process.exit(1); | ||
}) | ||
.on('end', onWriteEnd) | ||
.on('finish', onWriteEnd); | ||
readStream | ||
.pipe(jsonReadStream) | ||
.pipe(es.map(mapper)) | ||
.on('data', noop) | ||
.on('error', (err) => { | ||
console.log('error read streaming', err); | ||
process.exit(1); | ||
}) | ||
.on('finish', onReadEnd) | ||
.on('end', onReadEnd); | ||
.pipe(jsonReadStream) | ||
.pipe(es.map(mapper)) | ||
.on('data', () => { | ||
// Do nothing | ||
}) | ||
.on('error', (err) => { | ||
console.log('error read streaming', err); | ||
process.exit(1); | ||
}) | ||
.on('finish', onReadEnd) | ||
.on('end', onReadEnd); | ||
``` | ||
@@ -149,4 +167,4 @@ | ||
added a #1 | ||
adding c // c only gets handled after a is done | ||
streamed d // d only get streamed after c has a spot in the queue | ||
adding c // c only gets handled after a is done | ||
added b #2 | ||
@@ -153,0 +171,0 @@ adding d // d only gets handled after b is done |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Copyleft License
License(Experimental) Copyleft license information was found.
Found 1 instance in 1 package
Mixed license
License(Experimental) Package contains multiple licenses.
Found 1 instance in 1 package
No v1
QualityPackage is not semver >=1. This means it is not stable and does not support ^ ranges.
Found 1 instance in 1 package
Non-permissive License
License(Experimental) A license not known to be considered permissive was found.
Found 1 instance in 1 package
0
100
178
0
284
19237
19
11