🚀 Big News: Socket Acquires Coana to Bring Reachability Analysis to Every Appsec Team.Learn more
Socket
DemoInstallSign in
Socket

rxjs-mergemap-array

Package Overview
Dependencies
Maintainers
0
Versions
3
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

rxjs-mergemap-array - npm Package Compare versions

Comparing version

to
0.0.2

22

dist/index.js

@@ -9,4 +9,4 @@ import { share, scan, mergeMap, filter, takeUntil, merge } from "rxjs";

function mergeMapArray(project, isEqual = (a, b) => a === b) {
return (source) => {
const source$ = source.pipe(share()), state$ = source.pipe(
return (input) => {
const sharedInput = input.pipe(share()), state$ = sharedInput.pipe(
scan((state, next) => {

@@ -24,12 +24,12 @@ const added = next.filter(

}, INITIAL_STATE)
).pipe(share()), added$ = state$.pipe(mergeMap((state) => state.removed)), removed$ = state$.pipe(mergeMap((state) => state.added)), empty = source.pipe(
).pipe(share()), removed$ = state$.pipe(mergeMap((state) => state.removed)), added$ = state$.pipe(mergeMap((state) => state.added)), empty = sharedInput.pipe(
filter((arr) => arr.length === 0),
map(() => EMPTY_ARRAY)
), mapped = removed$.pipe(
), mapped = added$.pipe(
mergeMap(
(item) => project(item).pipe(
takeUntil(added$.pipe(filter((k) => isEqual(k, item)))),
map((projected) => [item, projected]),
map(([item2, projected]) => ({
item: item2,
(element) => project(element).pipe(
takeUntil(removed$.pipe(filter((k) => isEqual(k, element)))),
map((projected) => [element, projected]),
map(([inputElement, projected]) => ({
element: inputElement,
projected

@@ -39,5 +39,5 @@ }))

),
withLatestFrom(source$),
withLatestFrom(sharedInput),
scan(
(acc, [next, input]) => input.map((item) => isEqual(item, next.item) ? { item, emitted: !0, value: next.projected } : acc.find((v) => v && isEqual(v.item, item))),
(acc, [next, inputArray]) => inputArray.map((item) => isEqual(item, next.element) ? { item, emitted: !0, value: next.projected } : acc.find((v) => v && isEqual(v.item, item))),
[]

@@ -44,0 +44,0 @@ ),

{
"name": "rxjs-mergemap-array",
"version": "0.0.1",
"description": "",
"version": "0.0.2",
"description": "An RxJS map operator that takes an observable of arrays as input and emits arrays where each element represents emissions from the projected observable.",
"keywords": [

@@ -6,0 +6,0 @@ "rxjs",

@@ -36,5 +36,5 @@ # rxjs-mergemap-array

```ts
declare function mergeMapArray<T, R>(project: (item: T) => Observable<R>, isEqual?: (a: T, b: T) => boolean): OperatorFunction<T[], R[]>
```
mergeMapArray<T, R>(project: (item: T) => Observable<R>, isEqual?: (a: T, b: T) => boolean): OperatorFunction<T[], R[]>
```

@@ -41,0 +41,0 @@ ## License

@@ -1,13 +0,13 @@

import {delay, toArray} from "rxjs/operators"
import {concat, firstValueFrom, of, Subject, timer} from "rxjs"
import {concat, firstValueFrom, of, Subject, timer} from 'rxjs'
import {delay, toArray} from 'rxjs/operators'
import {describe, expect, it} from 'vitest'
import {describe, expect, it} from "vitest"
import {mergeMapArray} from "./mergeMapArray"
import {mergeMapArray} from './mergeMapArray'
describe("mergeMapArray()", () => {
it("works with an empty array as input", async () => {
describe('mergeMapArray()', () => {
it('works with an empty array as input', async () => {
const subject = new Subject<{id: number}[]>()
const observable = subject.asObservable().pipe(
mergeMapArray((item) => timer(1000)),
mergeMapArray(() => timer(1000)),
toArray(),

@@ -25,3 +25,3 @@ )

it("orders the output array based on the input array", async () => {
it('orders the output array based on the input array', async () => {
const one = {id: 1}

@@ -106,3 +106,3 @@ const two = {id: 2}

it("works with duplicate elements", async () => {
it('works with duplicate elements', async () => {
const one = {id: 1}

@@ -116,6 +116,3 @@ const two = {id: 2}

mergeMapArray((item) => {
return concat(
of(`id=${item.id} #1`),
of(`id=${item.id} #2`).pipe(delay(20)),
)
return concat(of(`id=${item.id} #1`), of(`id=${item.id} #2`).pipe(delay(20)))
}),

@@ -151,3 +148,3 @@ toArray(),

it("works with multiple emissions", async () => {
it('works with multiple emissions', async () => {
const one = {id: 1}

@@ -161,6 +158,3 @@ const two = {id: 2}

mergeMapArray((item) => {
return concat(
of(`id=${item.id} #1`),
of(`id=${item.id} #2`).pipe(delay(20)),
)
return concat(of(`id=${item.id} #1`), of(`id=${item.id} #2`).pipe(delay(20)))
}),

@@ -205,3 +199,3 @@ toArray(),

it("supports custom isEqual", async () => {
it('supports custom isEqual', async () => {
const one = {id: 1}

@@ -208,0 +202,0 @@ const anotherone = {id: 1}

@@ -10,3 +10,3 @@ import {filter, merge, mergeMap, Observable, scan, share, takeUntil} from 'rxjs'

const INITIAL_STATE: State<any> = {
const INITIAL_STATE: State<never> = {
current: [],

@@ -30,5 +30,5 @@ added: [],

): (source: Observable<T[]>) => Observable<R[]> {
return (source: Observable<T[]>): Observable<R[]> => {
const source$ = source.pipe(share())
const state$ = source
return (input: Observable<T[]>): Observable<R[]> => {
const sharedInput = input.pipe(share())
const state$ = sharedInput
.pipe(

@@ -54,9 +54,9 @@ scan((state: State<T>, next: T[]) => {

// emits elements as they are added to the input array
const added$ = state$.pipe(mergeMap((state) => state.removed))
const removed$ = state$.pipe(mergeMap((state) => state.removed))
// emits elements as they are removed from the input array
const removed$ = state$.pipe(mergeMap((state) => state.added))
const added$ = state$.pipe(mergeMap((state) => state.added))
// special case for empty input array since it won't trigger any emission on the "add element" stream
const empty = source.pipe(
const empty = sharedInput.pipe(
filter((arr) => arr.length === 0),

@@ -66,9 +66,9 @@ map(() => EMPTY_ARRAY),

const mapped = removed$.pipe(
mergeMap((item) =>
project(item).pipe(
takeUntil(added$.pipe(filter((k) => isEqual(k, item)))),
map((projected): [T, R] => [item, projected]),
map(([item, projected]): {item: T; projected: R} => ({
item,
const mapped = added$.pipe(
mergeMap((element) =>
project(element).pipe(
takeUntil(removed$.pipe(filter((k) => isEqual(k, element)))),
map((projected): [T, R] => [element, projected]),
map(([inputElement, projected]): {element: T; projected: R} => ({
element: inputElement,
projected,

@@ -78,7 +78,7 @@ })),

),
withLatestFrom(source$),
withLatestFrom(sharedInput),
scan(
(acc: (undefined | {item: T; emitted: boolean; value: R})[], [next, input]) =>
input.map((item) => {
if (isEqual(item, next.item)) {
(acc: (undefined | {item: T; emitted: boolean; value: R})[], [next, inputArray]) =>
inputArray.map((item) => {
if (isEqual(item, next.element)) {
return {item, emitted: true, value: next.projected}

@@ -85,0 +85,0 @@ }

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet