rxjs-mergemap-array
Advanced tools
Comparing version
@@ -9,4 +9,4 @@ import { share, scan, mergeMap, filter, takeUntil, merge } from "rxjs"; | ||
function mergeMapArray(project, isEqual = (a, b) => a === b) { | ||
return (source) => { | ||
const source$ = source.pipe(share()), state$ = source.pipe( | ||
return (input) => { | ||
const sharedInput = input.pipe(share()), state$ = sharedInput.pipe( | ||
scan((state, next) => { | ||
@@ -24,12 +24,12 @@ const added = next.filter( | ||
}, INITIAL_STATE) | ||
).pipe(share()), added$ = state$.pipe(mergeMap((state) => state.removed)), removed$ = state$.pipe(mergeMap((state) => state.added)), empty = source.pipe( | ||
).pipe(share()), removed$ = state$.pipe(mergeMap((state) => state.removed)), added$ = state$.pipe(mergeMap((state) => state.added)), empty = sharedInput.pipe( | ||
filter((arr) => arr.length === 0), | ||
map(() => EMPTY_ARRAY) | ||
), mapped = removed$.pipe( | ||
), mapped = added$.pipe( | ||
mergeMap( | ||
(item) => project(item).pipe( | ||
takeUntil(added$.pipe(filter((k) => isEqual(k, item)))), | ||
map((projected) => [item, projected]), | ||
map(([item2, projected]) => ({ | ||
item: item2, | ||
(element) => project(element).pipe( | ||
takeUntil(removed$.pipe(filter((k) => isEqual(k, element)))), | ||
map((projected) => [element, projected]), | ||
map(([inputElement, projected]) => ({ | ||
element: inputElement, | ||
projected | ||
@@ -39,5 +39,5 @@ })) | ||
), | ||
withLatestFrom(source$), | ||
withLatestFrom(sharedInput), | ||
scan( | ||
(acc, [next, input]) => input.map((item) => isEqual(item, next.item) ? { item, emitted: !0, value: next.projected } : acc.find((v) => v && isEqual(v.item, item))), | ||
(acc, [next, inputArray]) => inputArray.map((item) => isEqual(item, next.element) ? { item, emitted: !0, value: next.projected } : acc.find((v) => v && isEqual(v.item, item))), | ||
[] | ||
@@ -44,0 +44,0 @@ ), |
{ | ||
"name": "rxjs-mergemap-array", | ||
"version": "0.0.1", | ||
"description": "", | ||
"version": "0.0.2", | ||
"description": "An RxJS map operator that takes an observable of arrays as input and emits arrays where each element represents emissions from the projected observable.", | ||
"keywords": [ | ||
@@ -6,0 +6,0 @@ "rxjs", |
@@ -36,5 +36,5 @@ # rxjs-mergemap-array | ||
```ts | ||
declare function mergeMapArray<T, R>(project: (item: T) => Observable<R>, isEqual?: (a: T, b: T) => boolean): OperatorFunction<T[], R[]> | ||
``` | ||
mergeMapArray<T, R>(project: (item: T) => Observable<R>, isEqual?: (a: T, b: T) => boolean): OperatorFunction<T[], R[]> | ||
``` | ||
@@ -41,0 +41,0 @@ ## License |
@@ -1,13 +0,13 @@ | ||
import {delay, toArray} from "rxjs/operators" | ||
import {concat, firstValueFrom, of, Subject, timer} from "rxjs" | ||
import {concat, firstValueFrom, of, Subject, timer} from 'rxjs' | ||
import {delay, toArray} from 'rxjs/operators' | ||
import {describe, expect, it} from 'vitest' | ||
import {describe, expect, it} from "vitest" | ||
import {mergeMapArray} from "./mergeMapArray" | ||
import {mergeMapArray} from './mergeMapArray' | ||
describe("mergeMapArray()", () => { | ||
it("works with an empty array as input", async () => { | ||
describe('mergeMapArray()', () => { | ||
it('works with an empty array as input', async () => { | ||
const subject = new Subject<{id: number}[]>() | ||
const observable = subject.asObservable().pipe( | ||
mergeMapArray((item) => timer(1000)), | ||
mergeMapArray(() => timer(1000)), | ||
toArray(), | ||
@@ -25,3 +25,3 @@ ) | ||
it("orders the output array based on the input array", async () => { | ||
it('orders the output array based on the input array', async () => { | ||
const one = {id: 1} | ||
@@ -106,3 +106,3 @@ const two = {id: 2} | ||
it("works with duplicate elements", async () => { | ||
it('works with duplicate elements', async () => { | ||
const one = {id: 1} | ||
@@ -116,6 +116,3 @@ const two = {id: 2} | ||
mergeMapArray((item) => { | ||
return concat( | ||
of(`id=${item.id} #1`), | ||
of(`id=${item.id} #2`).pipe(delay(20)), | ||
) | ||
return concat(of(`id=${item.id} #1`), of(`id=${item.id} #2`).pipe(delay(20))) | ||
}), | ||
@@ -151,3 +148,3 @@ toArray(), | ||
it("works with multiple emissions", async () => { | ||
it('works with multiple emissions', async () => { | ||
const one = {id: 1} | ||
@@ -161,6 +158,3 @@ const two = {id: 2} | ||
mergeMapArray((item) => { | ||
return concat( | ||
of(`id=${item.id} #1`), | ||
of(`id=${item.id} #2`).pipe(delay(20)), | ||
) | ||
return concat(of(`id=${item.id} #1`), of(`id=${item.id} #2`).pipe(delay(20))) | ||
}), | ||
@@ -205,3 +199,3 @@ toArray(), | ||
it("supports custom isEqual", async () => { | ||
it('supports custom isEqual', async () => { | ||
const one = {id: 1} | ||
@@ -208,0 +202,0 @@ const anotherone = {id: 1} |
@@ -10,3 +10,3 @@ import {filter, merge, mergeMap, Observable, scan, share, takeUntil} from 'rxjs' | ||
const INITIAL_STATE: State<any> = { | ||
const INITIAL_STATE: State<never> = { | ||
current: [], | ||
@@ -30,5 +30,5 @@ added: [], | ||
): (source: Observable<T[]>) => Observable<R[]> { | ||
return (source: Observable<T[]>): Observable<R[]> => { | ||
const source$ = source.pipe(share()) | ||
const state$ = source | ||
return (input: Observable<T[]>): Observable<R[]> => { | ||
const sharedInput = input.pipe(share()) | ||
const state$ = sharedInput | ||
.pipe( | ||
@@ -54,9 +54,9 @@ scan((state: State<T>, next: T[]) => { | ||
// emits elements as they are added to the input array | ||
const added$ = state$.pipe(mergeMap((state) => state.removed)) | ||
const removed$ = state$.pipe(mergeMap((state) => state.removed)) | ||
// emits elements as they are removed from the input array | ||
const removed$ = state$.pipe(mergeMap((state) => state.added)) | ||
const added$ = state$.pipe(mergeMap((state) => state.added)) | ||
// special case for empty input array since it won't trigger any emission on the "add element" stream | ||
const empty = source.pipe( | ||
const empty = sharedInput.pipe( | ||
filter((arr) => arr.length === 0), | ||
@@ -66,9 +66,9 @@ map(() => EMPTY_ARRAY), | ||
const mapped = removed$.pipe( | ||
mergeMap((item) => | ||
project(item).pipe( | ||
takeUntil(added$.pipe(filter((k) => isEqual(k, item)))), | ||
map((projected): [T, R] => [item, projected]), | ||
map(([item, projected]): {item: T; projected: R} => ({ | ||
item, | ||
const mapped = added$.pipe( | ||
mergeMap((element) => | ||
project(element).pipe( | ||
takeUntil(removed$.pipe(filter((k) => isEqual(k, element)))), | ||
map((projected): [T, R] => [element, projected]), | ||
map(([inputElement, projected]): {element: T; projected: R} => ({ | ||
element: inputElement, | ||
projected, | ||
@@ -78,7 +78,7 @@ })), | ||
), | ||
withLatestFrom(source$), | ||
withLatestFrom(sharedInput), | ||
scan( | ||
(acc: (undefined | {item: T; emitted: boolean; value: R})[], [next, input]) => | ||
input.map((item) => { | ||
if (isEqual(item, next.item)) { | ||
(acc: (undefined | {item: T; emitted: boolean; value: R})[], [next, inputArray]) => | ||
inputArray.map((item) => { | ||
if (isEqual(item, next.element)) { | ||
return {item, emitted: true, value: next.projected} | ||
@@ -85,0 +85,0 @@ } |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
32750
1.28%441
-1.34%