@n1ru4l/push-pull-async-iterable-iterator
Advanced tools
Comparing version 3.1.0 to 3.2.0
@@ -6,1 +6,5 @@ export * from "./makePushPullAsyncIterableIterator"; | ||
export * from "./isAsyncIterable"; | ||
export * from "./withHandlers"; | ||
export * from "./withHandlersFrom"; | ||
export * from "./operators/filter"; | ||
export * from "./operators/map"; |
139
index.js
@@ -5,2 +5,30 @@ 'use strict'; | ||
/** | ||
* Attaches a cleanup handler to a AsyncIterable. | ||
* | ||
* @param source The source that should have a return handler attached | ||
* @param onReturn The return handler that should be attached | ||
* @returns | ||
*/ | ||
function withHandlers(source, onReturn, onThrow) { | ||
const stream = (async function* withReturnSource() { | ||
yield* source; | ||
})(); | ||
const originalReturn = stream.return.bind(stream); | ||
if (onReturn) { | ||
stream.return = (...args) => { | ||
onReturn(); | ||
return originalReturn(...args); | ||
}; | ||
} | ||
if (onThrow) { | ||
const originalThrow = stream.throw.bind(stream); | ||
stream.throw = (err) => { | ||
onThrow(err); | ||
return originalThrow(err); | ||
}; | ||
} | ||
return stream; | ||
} | ||
function createDeferred() { | ||
@@ -14,4 +42,2 @@ const d = {}; | ||
} | ||
const SYMBOL_FINISHED = Symbol(); | ||
const SYMBOL_NEW_VALUE = Symbol(); | ||
/** | ||
@@ -25,7 +51,16 @@ * makePushPullAsyncIterableIterator | ||
function makePushPullAsyncIterableIterator() { | ||
let isRunning = true; | ||
let state = { | ||
type: "running" /* running */ | ||
}; | ||
let next = createDeferred(); | ||
const values = []; | ||
let newValueD = createDeferred(); | ||
const finishedD = createDeferred(); | ||
const asyncIterableIterator = (async function* PushPullAsyncIterableIterator() { | ||
function pushValue(value) { | ||
if (state.type !== "running" /* running */) { | ||
return; | ||
} | ||
values.push(value); | ||
next.resolve(); | ||
next = createDeferred(); | ||
} | ||
const source = (async function* PushPullAsyncIterableIterator() { | ||
while (true) { | ||
@@ -37,44 +72,33 @@ if (values.length > 0) { | ||
else { | ||
const result = await Promise.race([ | ||
newValueD.promise, | ||
finishedD.promise | ||
]); | ||
if (result === SYMBOL_FINISHED) { | ||
break; | ||
if (state.type === "error" /* error */) { | ||
throw state.error; | ||
} | ||
if (result !== SYMBOL_NEW_VALUE) { | ||
throw result; | ||
if (state.type === "finished" /* finished */) { | ||
return; | ||
} | ||
await next.promise; | ||
} | ||
} | ||
})(); | ||
function pushValue(value) { | ||
if (isRunning === false) { | ||
// TODO: Should this throw? | ||
const stream = withHandlers(source, () => { | ||
if (state.type !== "running" /* running */) { | ||
return; | ||
} | ||
values.push(value); | ||
newValueD.resolve(SYMBOL_NEW_VALUE); | ||
newValueD = createDeferred(); | ||
} | ||
// We monkey patch the original generator for clean-up | ||
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion | ||
const originalReturn = asyncIterableIterator.return.bind(asyncIterableIterator); | ||
asyncIterableIterator.return = ( | ||
// eslint-disable-next-line @typescript-eslint/no-explicit-any | ||
...args) => { | ||
isRunning = false; | ||
finishedD.resolve(SYMBOL_FINISHED); | ||
return originalReturn(...args); | ||
}; | ||
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion | ||
const originalThrow = asyncIterableIterator.throw.bind(asyncIterableIterator); | ||
asyncIterableIterator.throw = (err) => { | ||
isRunning = false; | ||
finishedD.resolve(err); | ||
return originalThrow(err); | ||
}; | ||
state = { | ||
type: "finished" /* finished */ | ||
}; | ||
next.resolve(); | ||
}, (error) => { | ||
if (state.type !== "running" /* running */) { | ||
return; | ||
} | ||
state = { | ||
type: "error" /* error */, | ||
error | ||
}; | ||
next.resolve(); | ||
}); | ||
return { | ||
pushValue, | ||
asyncIterableIterator | ||
asyncIterableIterator: stream | ||
}; | ||
@@ -141,5 +165,42 @@ } | ||
/** | ||
* Attaches a cleanup handler from and AsyncIterable to an AsyncIterable. | ||
* | ||
* @param source | ||
* @param target | ||
*/ | ||
function withHandlersFrom( | ||
/** The source that should be returned with attached handlers. */ | ||
source, | ||
/**The target on which the return and throw methods should be called. */ | ||
target) { | ||
return withHandlers(source, () => { var _a; return (_a = target.return) === null || _a === void 0 ? void 0 : _a.call(target); }, err => { var _a; return (_a = target.throw) === null || _a === void 0 ? void 0 : _a.call(target, err); }); | ||
} | ||
function filter(filter) { | ||
return async function* filterGenerator(asyncIterable) { | ||
for await (const value of asyncIterable) { | ||
if (filter(value)) { | ||
yield value; | ||
} | ||
} | ||
}; | ||
} | ||
/** | ||
* Map the events published by an AsyncIterable. | ||
*/ | ||
const map = (map) => async function* mapGenerator(asyncIterable) { | ||
for await (const value of asyncIterable) { | ||
yield map(value); | ||
} | ||
}; | ||
exports.applyAsyncIterableIteratorToSink = applyAsyncIterableIteratorToSink; | ||
exports.filter = filter; | ||
exports.isAsyncIterable = isAsyncIterable; | ||
exports.makeAsyncIterableIteratorFromSink = makeAsyncIterableIteratorFromSink; | ||
exports.makePushPullAsyncIterableIterator = makePushPullAsyncIterableIterator; | ||
exports.map = map; | ||
exports.withHandlers = withHandlers; | ||
exports.withHandlersFrom = withHandlersFrom; |
@@ -0,1 +1,2 @@ | ||
import { withHandlers } from "./withHandlers"; | ||
function createDeferred() { | ||
@@ -9,4 +10,2 @@ const d = {}; | ||
} | ||
const SYMBOL_FINISHED = Symbol(); | ||
const SYMBOL_NEW_VALUE = Symbol(); | ||
/** | ||
@@ -20,7 +19,16 @@ * makePushPullAsyncIterableIterator | ||
export function makePushPullAsyncIterableIterator() { | ||
let isRunning = true; | ||
let state = { | ||
type: "running" /* running */ | ||
}; | ||
let next = createDeferred(); | ||
const values = []; | ||
let newValueD = createDeferred(); | ||
const finishedD = createDeferred(); | ||
const asyncIterableIterator = (async function* PushPullAsyncIterableIterator() { | ||
function pushValue(value) { | ||
if (state.type !== "running" /* running */) { | ||
return; | ||
} | ||
values.push(value); | ||
next.resolve(); | ||
next = createDeferred(); | ||
} | ||
const source = (async function* PushPullAsyncIterableIterator() { | ||
while (true) { | ||
@@ -32,46 +40,35 @@ if (values.length > 0) { | ||
else { | ||
const result = await Promise.race([ | ||
newValueD.promise, | ||
finishedD.promise | ||
]); | ||
if (result === SYMBOL_FINISHED) { | ||
break; | ||
if (state.type === "error" /* error */) { | ||
throw state.error; | ||
} | ||
if (result !== SYMBOL_NEW_VALUE) { | ||
throw result; | ||
if (state.type === "finished" /* finished */) { | ||
return; | ||
} | ||
await next.promise; | ||
} | ||
} | ||
})(); | ||
function pushValue(value) { | ||
if (isRunning === false) { | ||
// TODO: Should this throw? | ||
const stream = withHandlers(source, () => { | ||
if (state.type !== "running" /* running */) { | ||
return; | ||
} | ||
values.push(value); | ||
newValueD.resolve(SYMBOL_NEW_VALUE); | ||
newValueD = createDeferred(); | ||
} | ||
// We monkey patch the original generator for clean-up | ||
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion | ||
const originalReturn = asyncIterableIterator.return.bind(asyncIterableIterator); | ||
asyncIterableIterator.return = ( | ||
// eslint-disable-next-line @typescript-eslint/no-explicit-any | ||
...args) => { | ||
isRunning = false; | ||
finishedD.resolve(SYMBOL_FINISHED); | ||
return originalReturn(...args); | ||
}; | ||
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion | ||
const originalThrow = asyncIterableIterator.throw.bind(asyncIterableIterator); | ||
asyncIterableIterator.throw = (err) => { | ||
isRunning = false; | ||
finishedD.resolve(err); | ||
return originalThrow(err); | ||
}; | ||
state = { | ||
type: "finished" /* finished */ | ||
}; | ||
next.resolve(); | ||
}, (error) => { | ||
if (state.type !== "running" /* running */) { | ||
return; | ||
} | ||
state = { | ||
type: "error" /* error */, | ||
error | ||
}; | ||
next.resolve(); | ||
}); | ||
return { | ||
pushValue, | ||
asyncIterableIterator | ||
asyncIterableIterator: stream | ||
}; | ||
} | ||
//# sourceMappingURL=makePushPullAsyncIterableIterator.js.map |
{ | ||
"name": "@n1ru4l/push-pull-async-iterable-iterator", | ||
"version": "3.1.0", | ||
"version": "3.2.0", | ||
"repository": "https://github.com/n1ru4l/push-pull-async-iterable-iterator", | ||
@@ -5,0 +5,0 @@ "author": { |
@@ -155,1 +155,75 @@ # `@n1ru4l/push-pull-async-iterable-iterator` | ||
``` | ||
## Operators | ||
This package also ships a few utilities that make your life easier! | ||
### `map` | ||
Map a source | ||
```ts | ||
import { map } from "@n1ru4l/push-pull-async-iterable-iterator"; | ||
async function* source() { | ||
yield 1; | ||
yield 2; | ||
yield 3; | ||
} | ||
const square = map((value: number): number => value * value); | ||
for await (const value of square(source())) { | ||
console.log(value); | ||
} | ||
// logs 1, 4, 9 | ||
``` | ||
### `filter` | ||
Filter a source | ||
```ts | ||
import { filter } from "@n1ru4l/push-pull-async-iterable-iterator"; | ||
async function* source() { | ||
yield 1; | ||
yield 2; | ||
yield 3; | ||
} | ||
const biggerThan1 = filter((value: number): number => value > 1); | ||
for await (const value of biggerThan1(source())) { | ||
console.log(value); | ||
} | ||
// logs 2, 3 | ||
``` | ||
## Other helpers | ||
### `withHandlers` | ||
Attach a return and throw handler to a source. | ||
```ts | ||
import { withReturn } from "@n1ru4l/push-pull-async-iterable-iterator"; | ||
async function* source() { | ||
yield 1; | ||
yield 2; | ||
yield 3; | ||
} | ||
const sourceInstance = source(); | ||
const newSourceWithHandlers = withHandlers( | ||
sourceInstance, | ||
() => sourceInstance.return(), | ||
err => sourceInstance.throw(err) | ||
); | ||
for await (const value of stream) { | ||
// ... | ||
} | ||
``` |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
35685
34
617
229