rxjs-exhaustmap-with-trailing
Advanced tools
Comparing version 2.0.0 to 2.1.0
@@ -6,5 +6,4 @@ import { Observable, ObservableInput, OperatorFunction } from "rxjs"; | ||
* Original code adapted from https://github.com/ReactiveX/rxjs/issues/5004 | ||
* @param {function(value: T, ?index: number): ObservableInput} project A function | ||
* that, when applied to an item emitted by the source Observable, returns an | ||
* Observable. | ||
* @param {function<T, K>(value: T, ?index: number): ObservableInput<K>} project - A function that, when applied to an item emitted by the | ||
* source Observable, returns a projected Observable. | ||
*/ | ||
@@ -11,0 +10,0 @@ export declare function exhaustMapWithTrailing<T, R>(project: (value: T, index: number) => ObservableInput<R>): OperatorFunction<T, R>; |
@@ -10,5 +10,4 @@ "use strict"; | ||
* Original code adapted from https://github.com/ReactiveX/rxjs/issues/5004 | ||
* @param {function(value: T, ?index: number): ObservableInput} project A function | ||
* that, when applied to an item emitted by the source Observable, returns an | ||
* Observable. | ||
* @param {function<T, K>(value: T, ?index: number): ObservableInput<K>} project - A function that, when applied to an item emitted by the | ||
* source Observable, returns a projected Observable. | ||
*/ | ||
@@ -21,3 +20,3 @@ function exhaustMapWithTrailing(project) { | ||
trailing: true, | ||
}), operators_1.exhaustMap((value, index) => rxjs_1.from(project(value, index)).pipe(operators_1.finalize(() => { | ||
}), operators_1.exhaustMap((value, index) => rxjs_1.scheduled(project(value, index), rxjs_1.asapScheduler).pipe(operators_1.finalize(() => { | ||
release.next(); | ||
@@ -24,0 +23,0 @@ })))); |
@@ -6,5 +6,4 @@ import { Observable, ObservableInput, OperatorFunction } from "rxjs"; | ||
* Original code adapted from https://github.com/ReactiveX/rxjs/issues/5004 | ||
* @param {function(value: T, ?index: number): ObservableInput} project A function | ||
* that, when applied to an item emitted by the source Observable, returns an | ||
* Observable. | ||
* @param {function<T, K>(value: T, ?index: number): ObservableInput<K>} project - A function that, when applied to an item emitted by the | ||
* source Observable, returns a projected Observable. | ||
*/ | ||
@@ -11,0 +10,0 @@ export declare function exhaustMapWithTrailing<T, R>(project: (value: T, index: number) => ObservableInput<R>): OperatorFunction<T, R>; |
@@ -1,2 +0,2 @@ | ||
import { defer, from, Subject, } from "rxjs"; | ||
import { asapScheduler, defer, scheduled, Subject, } from "rxjs"; | ||
import { exhaustMap, finalize, throttle } from "rxjs/operators"; | ||
@@ -7,5 +7,4 @@ /** | ||
* Original code adapted from https://github.com/ReactiveX/rxjs/issues/5004 | ||
* @param {function(value: T, ?index: number): ObservableInput} project A function | ||
* that, when applied to an item emitted by the source Observable, returns an | ||
* Observable. | ||
* @param {function<T, K>(value: T, ?index: number): ObservableInput<K>} project - A function that, when applied to an item emitted by the | ||
* source Observable, returns a projected Observable. | ||
*/ | ||
@@ -18,3 +17,3 @@ export function exhaustMapWithTrailing(project) { | ||
trailing: true, | ||
}), exhaustMap((value, index) => from(project(value, index)).pipe(finalize(() => { | ||
}), exhaustMap((value, index) => scheduled(project(value, index), asapScheduler).pipe(finalize(() => { | ||
release.next(); | ||
@@ -21,0 +20,0 @@ })))); |
10
index.ts
import { | ||
asapScheduler, | ||
defer, | ||
from, | ||
Observable, | ||
ObservableInput, | ||
OperatorFunction, | ||
scheduled, | ||
Subject, | ||
@@ -15,5 +16,4 @@ } from "rxjs" | ||
* Original code adapted from https://github.com/ReactiveX/rxjs/issues/5004 | ||
* @param {function(value: T, ?index: number): ObservableInput} project A function | ||
* that, when applied to an item emitted by the source Observable, returns an | ||
* Observable. | ||
* @param {function<T, K>(value: T, ?index: number): ObservableInput<K>} project - A function that, when applied to an item emitted by the | ||
* source Observable, returns a projected Observable. | ||
*/ | ||
@@ -32,3 +32,3 @@ export function exhaustMapWithTrailing<T, R>( | ||
exhaustMap((value, index) => | ||
from(project(value, index)).pipe( | ||
scheduled(project(value, index), asapScheduler).pipe( | ||
finalize(() => { | ||
@@ -35,0 +35,0 @@ release.next() |
{ | ||
"name": "rxjs-exhaustmap-with-trailing", | ||
"version": "2.0.0", | ||
"version": "2.1.0", | ||
"main": "dist/cjs/index.js", | ||
@@ -5,0 +5,0 @@ "module": "dist/esm/index.js", |
22
test.ts
import {TestScheduler} from "rxjs/testing" | ||
import {delay} from "rxjs/operators" | ||
import {exhaustMapWithTrailing, exhaustMapToWithTrailing} from "./" | ||
import {of} from "rxjs" | ||
import {delay, mapTo, toArray} from "rxjs/operators" | ||
import {exhaustMapToWithTrailing, exhaustMapWithTrailing} from "./index" | ||
import {concat, EMPTY, firstValueFrom, of, timer} from "rxjs" | ||
import {RunHelpers} from "rxjs/internal/testing/TestScheduler" | ||
@@ -144,2 +144,18 @@ | ||
it("sync projected values", async () => { | ||
const values = concat( | ||
timer(5).pipe(mapTo("async")), | ||
timer(10).pipe(mapTo("sync")), | ||
timer(10).pipe(mapTo("empty")), | ||
timer(15).pipe(mapTo("async")) | ||
).pipe( | ||
exhaustMapWithTrailing((x) => | ||
x === "empty" ? EMPTY : x === "sync" ? of(x) : timer(1).pipe(mapTo(x)) | ||
), | ||
toArray() | ||
) | ||
expect(await firstValueFrom(values)).toEqual(["async", "sync", "async"]) | ||
}) | ||
function runTest(callback: (helpers: RunHelpers) => void) { | ||
@@ -146,0 +162,0 @@ return new TestScheduler((actual, expected) => { |
Sorry, the diff of this file is not supported yet
21761
309