for-emit-of
Advanced tools
Comparing version 1.0.4 to 1.1.0
/// <reference types="node" /> | ||
import { EventEmitter } from "events"; | ||
import { Readable, Writable } from "stream"; | ||
/** | ||
* Options to define AsyncIterable behavior | ||
*/ | ||
interface Options<T = any> { | ||
/** | ||
* The event that generates the AsyncIterable items | ||
*/ | ||
event?: string; | ||
/** | ||
* The event to be listen for errors, default "error" | ||
*/ | ||
error?: string; | ||
/** | ||
* The events to be listen for finalization, default ["end", "close"] | ||
*/ | ||
end?: string[]; | ||
/** | ||
* The timeout for the first event emission. If not informed, the AsyncIterable will wait indefinitely | ||
* for it. If it is informed and the timeout is reached, an error is thrown | ||
*/ | ||
firstEventTimeout?: number; | ||
/** | ||
* The timeout for between each event emission. If not informed, the AsyncIterable will wait indefinitely | ||
* for them. If it is informed and the timeout is reached, an error is thrown | ||
*/ | ||
inBetweenTimeout?: number; | ||
/** | ||
* A transformation to be used for each iterable element before yielding it. If not informed, | ||
* the value will be yield as is. | ||
*/ | ||
transform?: (buffer: Buffer) => T; | ||
@@ -7,0 +35,0 @@ } |
"use strict"; | ||
var _interopRequireWildcard = require("@babel/runtime/helpers/interopRequireWildcard"); | ||
Object.defineProperty(exports, "__esModule", { | ||
@@ -12,14 +10,72 @@ value: true | ||
var util = _interopRequireWildcard(require("util")); | ||
var _timeout = require("./timeout"); | ||
const sleep = util.promisify(setTimeout); | ||
var _sleep = require("./sleep"); | ||
const defaults = { | ||
event: "data", | ||
error: "error" | ||
error: "error", | ||
end: ["close", "end"] | ||
}; | ||
/** | ||
* @param {import('events').EventEmitter} emitter | ||
* @param {{event: string, transform: () => any}} options | ||
* Options to define AsyncIterable behavior | ||
*/ | ||
function waitResponse(emitter, options) { | ||
return new Promise((resolve, reject) => { | ||
emitter.once(options.event, () => { | ||
resolve(); | ||
emitter.removeListener(options.error, reject); | ||
options.end.forEach(event => emitter.removeListener(event, resolve)); | ||
}); | ||
emitter.once(options.error, reject); | ||
options.end.forEach(event => emitter.once(event, resolve)); | ||
}); | ||
} | ||
async function awaitAndResetTimeout(emitter, options, timeoutWrapper) { | ||
const result = await waitResponse(emitter, options); | ||
timeoutWrapper.updateDeadline(); | ||
return result; | ||
} | ||
function getInBetweenTimeoutRace(options, emitter) { | ||
const timeoutWrapper = (0, _timeout.timeout)(options.inBetweenTimeout); | ||
return () => [awaitAndResetTimeout(emitter, options, timeoutWrapper), timeoutWrapper.awaiter]; | ||
} | ||
function getFirstAwaiter(options, emitter) { | ||
if (options.firstEventTimeout) { | ||
const firstTimeout = (0, _timeout.timeout)(options.firstEventTimeout); | ||
return Promise.race([waitResponse(emitter, options), firstTimeout.awaiter]); | ||
} | ||
return waitResponse(emitter, options); | ||
} | ||
function switchRace(options, emitter, getNextRace) { | ||
let timeoutRace; | ||
return () => timeoutRace ? timeoutRace() : [getFirstAwaiter(options, emitter).then(result => { | ||
if (result !== _timeout.timedOut) { | ||
timeoutRace = getNextRace(); | ||
} | ||
return result; | ||
})]; | ||
} | ||
function getTimeoutRace(options, emitter) { | ||
return switchRace(options, emitter, () => getInBetweenTimeoutRace(options, emitter)); | ||
} | ||
function raceFactory(options, emitter) { | ||
if (options.inBetweenTimeout) { | ||
return getTimeoutRace(options, emitter); | ||
} | ||
const getWaitResponse = () => [waitResponse(emitter, options)]; | ||
return options.firstEventTimeout ? switchRace(options, emitter, () => getWaitResponse) : getWaitResponse; | ||
} | ||
function forEmitOf(emitter, options) { | ||
@@ -48,14 +104,26 @@ if (!options) { | ||
if (!Array.isArray(options.end)) { | ||
throw new Error("end must be an array"); | ||
} | ||
let events = []; | ||
let error; | ||
let active = true; | ||
emitter.on(options.event, event => events.push(event)); | ||
emitter.once("error", err => { | ||
const eventListener = event => events.push(event); | ||
const endListener = () => { | ||
active = false; | ||
}; | ||
const errorListener = err => { | ||
error = err; | ||
}; | ||
emitter.on(options.event, eventListener); | ||
emitter.once(options.error, errorListener); | ||
options.end.forEach(event => { | ||
emitter.once(event, endListener); | ||
}); | ||
["close", "end"].forEach(event => { | ||
emitter.once(event, () => { | ||
active = false; | ||
}); | ||
}); | ||
const getRaceItems = raceFactory(options, emitter); | ||
@@ -67,21 +135,24 @@ async function* generator() { | ||
} | ||
/* We do not want to block the process! | ||
This call allows other processes | ||
a chance to execute. | ||
*/ | ||
while (events.length > 0) { | ||
/* We do not want to block the process! | ||
This call allows other processes | ||
a chance to execute. | ||
*/ | ||
await (0, _sleep.sleep)(0); | ||
const [event, ...rest] = events; | ||
events = rest; | ||
yield options.transform ? options.transform(event) : event; | ||
} | ||
await sleep(0); | ||
const [event, ...rest] = events; | ||
events = rest; | ||
if (active && !error) { | ||
const winner = await Promise.race(getRaceItems()); | ||
if (!event) { | ||
continue; | ||
if (winner === _timeout.timedOut) { | ||
emitter.removeListener(options.event, eventListener); | ||
emitter.removeListener(options.error, errorListener); | ||
options.end.forEach(event => emitter.removeListener(event, endListener)); | ||
throw Error("Event timed out"); | ||
} | ||
} | ||
if (options.transform) { | ||
yield options.transform(event); | ||
} else { | ||
yield event; | ||
} | ||
} | ||
@@ -88,0 +159,0 @@ } |
{ | ||
"name": "for-emit-of", | ||
"version": "1.0.4", | ||
"version": "1.1.0", | ||
"description": "Turn Node.js Events into Async Iterables", | ||
@@ -9,3 +9,3 @@ "main": "./dist/index.js", | ||
"test": "cross-env ./node_modules/.bin/nyc ./node_modules/.bin/mocha --recursive --exit --timeout=100000 -r ts-node/register test/**/*.test.ts", | ||
"build": "mkdir dist && cross-env ./node_modules/.bin/babel --extensions .ts src -d dist", | ||
"build": "rm -rf dist && mkdir dist && cross-env ./node_modules/.bin/babel --extensions .ts src -d dist", | ||
"types": "tsc ./src/index.ts -d --emitDeclarationOnly --declarationDir ./dist" | ||
@@ -35,12 +35,12 @@ }, | ||
"devDependencies": { | ||
"@babel/cli": "^7.8.4", | ||
"@babel/core": "^7.9.0", | ||
"@babel/plugin-transform-runtime": "^7.9.0", | ||
"@babel/preset-env": "^7.9.5", | ||
"@babel/preset-typescript": "^7.9.0", | ||
"@types/chai": "^4.2.11", | ||
"@babel/cli": "^7.10.5", | ||
"@babel/core": "^7.11.1", | ||
"@babel/plugin-transform-runtime": "^7.11.0", | ||
"@babel/preset-env": "^7.11.0", | ||
"@babel/preset-typescript": "^7.10.4", | ||
"@types/chai": "^4.2.12", | ||
"@types/mocha": "^7.0.2", | ||
"@types/node": "^13.11.1", | ||
"@typescript-eslint/eslint-plugin": "^2.27.0", | ||
"@typescript-eslint/parser": "^2.27.0", | ||
"@types/node": "^13.13.15", | ||
"@typescript-eslint/eslint-plugin": "^2.34.0", | ||
"@typescript-eslint/parser": "^2.34.0", | ||
"babel-plugin-add-module-exports": "^1.0.2", | ||
@@ -50,12 +50,12 @@ "chai": "^4.2.0", | ||
"eslint": "^6.8.0", | ||
"eslint-config-prettier": "^6.10.1", | ||
"eslint-plugin-prettier": "^3.1.3", | ||
"mocha": "^7.1.1", | ||
"nodemon": "^2.0.3", | ||
"nyc": "^15.0.1", | ||
"prettier": "^2.0.4", | ||
"ts-node": "^8.8.2", | ||
"typescript": "^3.8.3" | ||
"eslint-config-prettier": "^6.11.0", | ||
"eslint-plugin-prettier": "^3.1.4", | ||
"mocha": "^7.2.0", | ||
"nodemon": "^2.0.4", | ||
"nyc": "^15.1.0", | ||
"prettier": "^2.0.5", | ||
"ts-node": "^8.10.2", | ||
"typescript": "^3.9.7" | ||
}, | ||
"dependencies": {} | ||
} |
@@ -75,4 +75,52 @@ # for-emit-of | ||
# FAQ | ||
## When will the iterator end? | ||
`Emitter.on("end")` or `Emitter.on("close")` | ||
# Change the end | ||
```javascript | ||
import forEmitOf from 'for-emit-of'; | ||
import { Cart } from '..'; | ||
const iterator = forEmitOf(Cart, { | ||
end: ["end", "close"] // default | ||
}); | ||
``` | ||
# Timeout | ||
## `firstEventTimeout` | ||
```javascript | ||
import forEmitOf from 'for-emit-of'; | ||
import { EventEmitter } from "events"; | ||
const emitter = new EventEmitter(); | ||
const iterator = forEmitOf(emitter, { | ||
firstEventTimeout: 1000, | ||
}); | ||
setTimeout(() => { | ||
emitter.emit("data", {}); | ||
}, 2000); // greater than firstEventTimeout ERROR! | ||
for await (const msg of iterator) { | ||
console.log(msg); // never get here | ||
} | ||
``` | ||
## `inBetweenTimeout` | ||
```javascript | ||
import forEmitOf from 'for-emit-of'; | ||
import { EventEmitter } from "events"; | ||
const emitter = new EventEmitter(); | ||
const iterator = forEmitOf(emitter, { | ||
inBetweenTimeout: 1000, | ||
}); | ||
setInterval(() => { | ||
emitter.emit("data", {}) | ||
}, 2000) // greater than inBetweenTimeout ERROR! | ||
for await (const msg of iterator) { | ||
console.log(msg); // gets here once | ||
} | ||
``` |
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
12424
9
219
125
1