New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

for-emit-of

Package Overview
Dependencies
Maintainers
1
Versions
13
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

for-emit-of - npm Package Compare versions

Comparing version 1.0.4 to 1.1.0

dist/sleep.d.ts

28

dist/index.d.ts
/// <reference types="node" />
import { EventEmitter } from "events";
import { Readable, Writable } from "stream";
/**
* Options to define AsyncIterable behavior
*/
interface Options<T = any> {
/**
* The event that generates the AsyncIterable items
*/
event?: string;
/**
* The event to be listen for errors, default "error"
*/
error?: string;
/**
* The events to be listen for finalization, default ["end", "close"]
*/
end?: string[];
/**
* The timeout for the first event emission. If not informed, the AsyncIterable will wait indefinitely
* for it. If it is informed and the timeout is reached, an error is thrown
*/
firstEventTimeout?: number;
/**
* The timeout for between each event emission. If not informed, the AsyncIterable will wait indefinitely
* for them. If it is informed and the timeout is reached, an error is thrown
*/
inBetweenTimeout?: number;
/**
* A transformation to be used for each iterable element before yielding it. If not informed,
* the value will be yield as is.
*/
transform?: (buffer: Buffer) => T;

@@ -7,0 +35,0 @@ }

131

dist/index.js
"use strict";
var _interopRequireWildcard = require("@babel/runtime/helpers/interopRequireWildcard");
Object.defineProperty(exports, "__esModule", {

@@ -12,14 +10,72 @@ value: true

var util = _interopRequireWildcard(require("util"));
var _timeout = require("./timeout");
const sleep = util.promisify(setTimeout);
var _sleep = require("./sleep");
const defaults = {
event: "data",
error: "error"
error: "error",
end: ["close", "end"]
};
/**
* @param {import('events').EventEmitter} emitter
* @param {{event: string, transform: () => any}} options
* Options to define AsyncIterable behavior
*/
function waitResponse(emitter, options) {
return new Promise((resolve, reject) => {
emitter.once(options.event, () => {
resolve();
emitter.removeListener(options.error, reject);
options.end.forEach(event => emitter.removeListener(event, resolve));
});
emitter.once(options.error, reject);
options.end.forEach(event => emitter.once(event, resolve));
});
}
async function awaitAndResetTimeout(emitter, options, timeoutWrapper) {
const result = await waitResponse(emitter, options);
timeoutWrapper.updateDeadline();
return result;
}
function getInBetweenTimeoutRace(options, emitter) {
const timeoutWrapper = (0, _timeout.timeout)(options.inBetweenTimeout);
return () => [awaitAndResetTimeout(emitter, options, timeoutWrapper), timeoutWrapper.awaiter];
}
function getFirstAwaiter(options, emitter) {
if (options.firstEventTimeout) {
const firstTimeout = (0, _timeout.timeout)(options.firstEventTimeout);
return Promise.race([waitResponse(emitter, options), firstTimeout.awaiter]);
}
return waitResponse(emitter, options);
}
function switchRace(options, emitter, getNextRace) {
let timeoutRace;
return () => timeoutRace ? timeoutRace() : [getFirstAwaiter(options, emitter).then(result => {
if (result !== _timeout.timedOut) {
timeoutRace = getNextRace();
}
return result;
})];
}
function getTimeoutRace(options, emitter) {
return switchRace(options, emitter, () => getInBetweenTimeoutRace(options, emitter));
}
function raceFactory(options, emitter) {
if (options.inBetweenTimeout) {
return getTimeoutRace(options, emitter);
}
const getWaitResponse = () => [waitResponse(emitter, options)];
return options.firstEventTimeout ? switchRace(options, emitter, () => getWaitResponse) : getWaitResponse;
}
function forEmitOf(emitter, options) {

@@ -48,14 +104,26 @@ if (!options) {

if (!Array.isArray(options.end)) {
throw new Error("end must be an array");
}
let events = [];
let error;
let active = true;
emitter.on(options.event, event => events.push(event));
emitter.once("error", err => {
const eventListener = event => events.push(event);
const endListener = () => {
active = false;
};
const errorListener = err => {
error = err;
};
emitter.on(options.event, eventListener);
emitter.once(options.error, errorListener);
options.end.forEach(event => {
emitter.once(event, endListener);
});
["close", "end"].forEach(event => {
emitter.once(event, () => {
active = false;
});
});
const getRaceItems = raceFactory(options, emitter);

@@ -67,21 +135,24 @@ async function* generator() {

}
/* We do not want to block the process!
This call allows other processes
a chance to execute.
*/
while (events.length > 0) {
/* We do not want to block the process!
This call allows other processes
a chance to execute.
*/
await (0, _sleep.sleep)(0);
const [event, ...rest] = events;
events = rest;
yield options.transform ? options.transform(event) : event;
}
await sleep(0);
const [event, ...rest] = events;
events = rest;
if (active && !error) {
const winner = await Promise.race(getRaceItems());
if (!event) {
continue;
if (winner === _timeout.timedOut) {
emitter.removeListener(options.event, eventListener);
emitter.removeListener(options.error, errorListener);
options.end.forEach(event => emitter.removeListener(event, endListener));
throw Error("Event timed out");
}
}
if (options.transform) {
yield options.transform(event);
} else {
yield event;
}
}

@@ -88,0 +159,0 @@ }

{
"name": "for-emit-of",
"version": "1.0.4",
"version": "1.1.0",
"description": "Turn Node.js Events into Async Iterables",

@@ -9,3 +9,3 @@ "main": "./dist/index.js",

"test": "cross-env ./node_modules/.bin/nyc ./node_modules/.bin/mocha --recursive --exit --timeout=100000 -r ts-node/register test/**/*.test.ts",
"build": "mkdir dist && cross-env ./node_modules/.bin/babel --extensions .ts src -d dist",
"build": "rm -rf dist && mkdir dist && cross-env ./node_modules/.bin/babel --extensions .ts src -d dist",
"types": "tsc ./src/index.ts -d --emitDeclarationOnly --declarationDir ./dist"

@@ -35,12 +35,12 @@ },

"devDependencies": {
"@babel/cli": "^7.8.4",
"@babel/core": "^7.9.0",
"@babel/plugin-transform-runtime": "^7.9.0",
"@babel/preset-env": "^7.9.5",
"@babel/preset-typescript": "^7.9.0",
"@types/chai": "^4.2.11",
"@babel/cli": "^7.10.5",
"@babel/core": "^7.11.1",
"@babel/plugin-transform-runtime": "^7.11.0",
"@babel/preset-env": "^7.11.0",
"@babel/preset-typescript": "^7.10.4",
"@types/chai": "^4.2.12",
"@types/mocha": "^7.0.2",
"@types/node": "^13.11.1",
"@typescript-eslint/eslint-plugin": "^2.27.0",
"@typescript-eslint/parser": "^2.27.0",
"@types/node": "^13.13.15",
"@typescript-eslint/eslint-plugin": "^2.34.0",
"@typescript-eslint/parser": "^2.34.0",
"babel-plugin-add-module-exports": "^1.0.2",

@@ -50,12 +50,12 @@ "chai": "^4.2.0",

"eslint": "^6.8.0",
"eslint-config-prettier": "^6.10.1",
"eslint-plugin-prettier": "^3.1.3",
"mocha": "^7.1.1",
"nodemon": "^2.0.3",
"nyc": "^15.0.1",
"prettier": "^2.0.4",
"ts-node": "^8.8.2",
"typescript": "^3.8.3"
"eslint-config-prettier": "^6.11.0",
"eslint-plugin-prettier": "^3.1.4",
"mocha": "^7.2.0",
"nodemon": "^2.0.4",
"nyc": "^15.1.0",
"prettier": "^2.0.5",
"ts-node": "^8.10.2",
"typescript": "^3.9.7"
},
"dependencies": {}
}

@@ -75,4 +75,52 @@ # for-emit-of

# FAQ
## When will the iterator end?
`Emitter.on("end")` or `Emitter.on("close")`
# Change the end
```javascript
import forEmitOf from 'for-emit-of';
import { Cart } from '..';
const iterator = forEmitOf(Cart, {
end: ["end", "close"] // default
});
```
# Timeout
## `firstEventTimeout`
```javascript
import forEmitOf from 'for-emit-of';
import { EventEmitter } from "events";
const emitter = new EventEmitter();
const iterator = forEmitOf(emitter, {
firstEventTimeout: 1000,
});
setTimeout(() => {
emitter.emit("data", {});
}, 2000); // greater than firstEventTimeout ERROR!
for await (const msg of iterator) {
console.log(msg); // never get here
}
```
## `inBetweenTimeout`
```javascript
import forEmitOf from 'for-emit-of';
import { EventEmitter } from "events";
const emitter = new EventEmitter();
const iterator = forEmitOf(emitter, {
inBetweenTimeout: 1000,
});
setInterval(() => {
emitter.emit("data", {})
}, 2000) // greater than inBetweenTimeout ERROR!
for await (const msg of iterator) {
console.log(msg); // gets here once
}
```
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc