Socket
Socket
Sign inDemoInstall

promise-blocking-queue

Package Overview
Dependencies
1
Maintainers
1
Versions
4
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 0.1.0 to 1.0.0

2

dist/BlockingQueue.d.ts

@@ -9,3 +9,3 @@ /// <reference types="node" />

}
declare type MessageEmitter = StrictEventEmitter<EventEmitter, IBlockingQueueEvents>;
type MessageEmitter = StrictEventEmitter<EventEmitter, IBlockingQueueEvents>;
declare const BlockingQueue_base: new () => MessageEmitter;

@@ -12,0 +12,0 @@ export declare class BlockingQueue extends BlockingQueue_base {

@@ -7,5 +7,5 @@ declare const assignmentCompatibilityHack: unique symbol;

}
declare type InnerEEMethodReturnType<T, TValue, FValue> = T extends (...args: any[]) => any ? ReturnType<T> extends void | undefined ? FValue : TValue : FValue;
declare type EEMethodReturnType<T, S extends string, TValue, FValue = void> = S extends keyof T ? InnerEEMethodReturnType<T[S], TValue, FValue> : FValue;
declare type ListenerType<T> = [T] extends [(...args: infer U) => any] ? U : [T] extends [void] ? [] : [T];
type InnerEEMethodReturnType<T, TValue, FValue> = T extends (...args: any[]) => any ? ReturnType<T> extends void | undefined ? FValue : TValue : FValue;
type EEMethodReturnType<T, S extends string, TValue, FValue = void> = S extends keyof T ? InnerEEMethodReturnType<T[S], TValue, FValue> : FValue;
type ListenerType<T> = [T] extends [(...args: infer U) => any] ? U : [T] extends [void] ? [] : [T];
interface OverriddenMethods<TEmitter, TEventRecord, TEmitRecord = TEventRecord> {

@@ -27,4 +27,4 @@ on<P extends keyof TEventRecord, T>(this: T, event: P, listener: (...args: ListenerType<TEventRecord[P]>) => void): EEMethodReturnType<TEmitter, 'on', T>;

}
declare type OverriddenKeys = keyof OverriddenMethods<any, any, any>;
export declare type StrictEventEmitter<TEmitterType, TEventRecord, TEmitRecord = TEventRecord, UnneededMethods extends Exclude<OverriddenKeys, keyof TEmitterType> = Exclude<OverriddenKeys, keyof TEmitterType>, NeededMethods extends Exclude<OverriddenKeys, UnneededMethods> = Exclude<OverriddenKeys, UnneededMethods>> = TypeRecord<TEmitterType, TEventRecord, TEmitRecord> & Pick<TEmitterType, Exclude<keyof TEmitterType, OverriddenKeys>> & Pick<OverriddenMethods<TEmitterType, TEventRecord, TEmitRecord>, NeededMethods>;
type OverriddenKeys = keyof OverriddenMethods<any, any, any>;
export type StrictEventEmitter<TEmitterType, TEventRecord, TEmitRecord = TEventRecord, UnneededMethods extends Exclude<OverriddenKeys, keyof TEmitterType> = Exclude<OverriddenKeys, keyof TEmitterType>, NeededMethods extends Exclude<OverriddenKeys, UnneededMethods> = Exclude<OverriddenKeys, UnneededMethods>> = TypeRecord<TEmitterType, TEventRecord, TEmitRecord> & Pick<TEmitterType, Exclude<keyof TEmitterType, OverriddenKeys>> & Pick<OverriddenMethods<TEmitterType, TEventRecord, TEmitRecord>, NeededMethods>;
export {};
'use strict';
var __createBinding = (this && this.__createBinding) || (Object.create ? (function(o, m, k, k2) {
if (k2 === undefined) k2 = k;
Object.defineProperty(o, k2, { enumerable: true, get: function() { return m[k]; } });
var desc = Object.getOwnPropertyDescriptor(m, k);
if (!desc || ("get" in desc ? !m.__esModule : desc.writable || desc.configurable)) {
desc = { enumerable: true, get: function() { return m[k]; } };
}
Object.defineProperty(o, k2, desc);
}) : (function(o, m, k, k2) {

@@ -6,0 +10,0 @@ if (k2 === undefined) k2 = k;

export interface IBlockingQueueOptions {
concurrency: number;
}
export declare type QueueFn<T, P extends any[]> = ((...args: P) => Promise<T> | T);
export type QueueFn<T, P extends any[]> = ((...args: P) => Promise<T> | T);
export interface IEnqueueResult<T> {

@@ -6,0 +6,0 @@ enqueuePromise: Promise<void>;

{
"name": "promise-blocking-queue",
"version": "0.1.0",
"version": "1.0.0",
"description": "Memory optimized promise blocking queue with concurrency control",

@@ -9,3 +9,3 @@ "main": "dist/index.js",

"engines": {
"node": ">=6"
"node": ">=14"
},

@@ -17,4 +17,4 @@ "scripts": {

"test": "npm run lint && npm run mocha",
"mocha": "mocha test/**/*.ts --reporter spec --opts test/mocha.opts",
"cover": "istanbul cover -x dist/index.js node_modules/mocha/bin/_mocha test/**/*.ts -- -R spec --opts test/mocha.opts",
"mocha": "mocha --config test/.mocharc.json",
"cover": "nyc --reporter=lcov --reporter=text-summary mocha --config test/.mocharc.json",
"lint": "tslint -c tslint.json 'src/**/*.ts' 'test/**/*.ts'"

@@ -53,3 +53,3 @@ },

"author": "Regev Brody <regevbr@gmail.com>",
"license": "ISC",
"license": "MIT",
"bugs": {

@@ -60,19 +60,21 @@ "url": "https://github.com/PruvoNet/promise-blocking-queue/issues"

"devDependencies": {
"@types/chai": "^4.2.15",
"@types/chai-as-promised": "^7.1.3",
"@types/mocha": "^8.2.1",
"@types/node": "^14.14.31",
"chai": "^4.2.0",
"@istanbuljs/nyc-config-typescript": "^1.0.2",
"@types/chai": "^4.3.5",
"@types/chai-as-promised": "^7.1.5",
"@types/mocha": "^10.0.1",
"@types/node": "^18.16.18",
"chai": "^4.3.7",
"chai-as-promised": "^7.1.1",
"coveralls": "^3.1.0",
"delay": "^5.0.0",
"coveralls": "^3.1.1",
"delay": "^6.0.0",
"dirty-chai": "^2.0.1",
"istanbul": "^0.4.5",
"mocha": "^6.2.3",
"sinon": "^9.2.4",
"sinon-chai": "^3.5.0",
"mocha": "^10.2.0",
"nyc": "^15.1.0",
"sinon": "^15.1.2",
"sinon-chai": "^3.7.0",
"sleep-promise": "^9.1.0",
"ts-node": "^9.1.1",
"source-map-support": "^0.5.21",
"ts-node": "^10.9.1",
"tslint": "^6.1.3",
"typescript": "^4.1.5"
"typescript": "^5.1.3"
},

@@ -79,0 +81,0 @@ "dependencies": {

[![Npm Version](https://img.shields.io/npm/v/promise-blocking-queue.svg?style=popout)](https://www.npmjs.com/package/promise-blocking-queue)
[![Build Status](https://travis-ci.org/PruvoNet/promise-blocking-queue.svg?branch=master)](https://travis-ci.org/PruvoNet/promise-blocking-queue)
[![Coverage Status](https://coveralls.io/repos/github/PruvoNet/promise-blocking-queue/badge.svg?branch=master)](https://coveralls.io/github/PruvoNet/promise-blocking-queue?branch=master)
[![Codacy Badge](https://api.codacy.com/project/badge/Grade/58abd1713b064f4c9af7dc88d7178ebe)](https://www.codacy.com/app/regevbr/promise-blocking-queue?utm_source=github.com&amp;utm_medium=referral&amp;utm_content=PruvoNet/promise-blocking-queue&amp;utm_campaign=Badge_Grade)
[![node](https://img.shields.io/node/v-lts/promise-blocking-queue)](https://www.npmjs.com/package/promise-blocking-queue)
[![Build status](https://github.com/PruvoNet/promise-blocking-queue/actions/workflows/ci.yml/badge.svg?branch=master)](https://www.npmjs.com/package/promise-blocking-queue)
[![Test Coverage](https://api.codeclimate.com/v1/badges/5cc9e9fe4871a315f2aa/test_coverage)](https://codeclimate.com/github/PruvoNet/promise-blocking-queue/test_coverage)
[![Maintainability](https://api.codeclimate.com/v1/badges/5cc9e9fe4871a315f2aa/maintainability)](https://codeclimate.com/github/PruvoNet/promise-blocking-queue/maintainability)
[![Known Vulnerabilities](https://snyk.io/test/github/PruvoNet/promise-blocking-queue/badge.svg?targetFile=package.json)](https://snyk.io/test/github/PruvoNet/promise-blocking-queue?targetFile=package.json)
[![dependencies Status](https://david-dm.org/PruvoNet/promise-blocking-queue/status.svg)](https://david-dm.org/PruvoNet/promise-blocking-queue)
[![devDependencies Status](https://david-dm.org/PruvoNet/promise-blocking-queue/dev-status.svg)](https://david-dm.org/PruvoNet/promise-blocking-queue?type=dev)

@@ -52,2 +51,3 @@ # Promise Blocking Queue

let failed = 0;
let awaitDrain: Promise<void> | undefined;

@@ -59,61 +59,79 @@ const readStream = fs.createReadStream('./users.json', { flags: 'r', encoding: 'utf-8' });

const logFailed = () => {
console.log(`failed ${++failed}`);
const addUserToDB = async (user) => {
try {
console.log(`adding ${user.username}`);
// Simulate long running task
await sleep((handled + 1) * 100);
console.log(`added ${user.username} #${++handled}`);
const writePaused = !jsonWriteStream.write(user.username);
if (writePaused && !awaitDrain) {
// Down stream asked to pause the writes for now
awaitDrain = new Promise((resolve) => {
jsonWriteStream.once('drain', resolve);
});
}
} catch (err) {
console.log(`failed ${++failed}`, err);
}
};
const logAddedUser = (username) => () => {
console.log(`added ${username} #${++handled}`);
jsonWriteStream.write(username);
const handleUser = async (user) => {
// Wait until the down stream is ready to receive more data without increasing the memory footprint
if (awaitDrain) {
await awaitDrain;
awaitDrain = undefined;
}
return queue.enqueue(addUserToDB, user).enqueuePromise;
};
const addUserToDB = (user) => {
console.log(`adding ${user.username}`);
// Simulate long running task
return sleep((handled + 1) * 100).then(logAddedUser(user.username));
};
// Do not use async!
const mapper = (user, cb) => {
console.log(`streamed ${user.username}`);
const qResult = queue.enqueue(addUserToDB, user);
qResult.fnPromise.catch(logFailed);
// Continue streaming only after current item handling starts
qResult.enqueuePromise.then(cb, cb);
return false;
console.log(`streamed ${user.username}`);
handleUser(user)
.then(() => {
cb();
});
// Pause the read stream until we are ready to handle more data
return false;
};
// tslint:disable-next-line:no-empty
const noop = () => {};
const onReadEnd = () => {
console.log('done read streaming');
// Wait until all work is done
queue.on('idle', () => {
jsonWriteStream.end();
});
console.log('done read streaming');
// If nothing was written, idle event will not be fired
if (queue.pendingCount === 0 && queue.activeCount === 0) {
jsonWriteStream.end();
} else {
// Wait until all work is done
queue.on('idle', () => {
jsonWriteStream.end();
});
}
};
const onWriteEnd = () => {
console.log(`done processing - ${handled} handled, ${failed} failed`);
process.exit(0);
console.log(`done processing - ${handled} handled, ${failed} failed`);
process.exit(0);
};
jsonWriteStream
.pipe(writeStream)
.on('error', (err) => {
console.log('error wrtie streaming', err);
process.exit(1);
})
.on('end', onWriteEnd)
.on('finish', onWriteEnd);
.pipe(writeStream)
.on('error', (err) => {
console.log('error wrtie streaming', err);
process.exit(1);
})
.on('end', onWriteEnd)
.on('finish', onWriteEnd);
readStream
.pipe(jsonReadStream)
.pipe(es.map(mapper))
.on('data', noop)
.on('error', (err) => {
console.log('error read streaming', err);
process.exit(1);
})
.on('finish', onReadEnd)
.on('end', onReadEnd);
.pipe(jsonReadStream)
.pipe(es.map(mapper))
.on('data', () => {
// Do nothing
})
.on('error', (err) => {
console.log('error read streaming', err);
process.exit(1);
})
.on('finish', onReadEnd)
.on('end', onReadEnd);
```

@@ -149,4 +167,4 @@

added a #1
adding c // c only gets handled after a is done
streamed d // d only get streamed after c has a spot in the queue
adding c // c only gets handled after a is done
added b #2

@@ -153,0 +171,0 @@ adding d // d only gets handled after b is done

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc