@react-rxjs/utils
Advanced tools
Comparing version
@@ -1,4 +0,8 @@ | ||
export { useSubscribe } from "./useSubscribe"; | ||
export { Subscribe } from "./Subscribe"; | ||
export { groupInMap } from "./groupInMap"; | ||
export { collectValues } from "./collectValues"; | ||
export { collect } from "./collect"; | ||
export { mergeWithKey } from "./mergeWithKey"; | ||
export { split } from "./split"; | ||
export { suspend } from "./suspend"; | ||
export { suspended } from "./suspended"; | ||
export { switchMapSuspended } from "./switchMapSuspended"; | ||
export { selfDependant } from "./selfDependant"; |
@@ -5,65 +5,144 @@ 'use strict'; | ||
function _interopDefault (ex) { return (ex && (typeof ex === 'object') && 'default' in ex) ? ex['default'] : ex; } | ||
var React = require('react'); | ||
var React__default = _interopDefault(React); | ||
var operators = require('rxjs/operators'); | ||
var rxjs = require('rxjs'); | ||
var operators = require('rxjs/operators'); | ||
var core = require('@react-rxjs/core'); | ||
var defaultStart = function defaultStart(value) { | ||
return function (source$) { | ||
return new rxjs.Observable(function (observer) { | ||
var emitted = false; | ||
var subscription = source$.subscribe(function (x) { | ||
emitted = true; | ||
observer.next(x); | ||
}, function (e) { | ||
return observer.error(e); | ||
}, function () { | ||
return observer.complete(); | ||
}); | ||
if (!emitted) { | ||
observer.next(value); | ||
} | ||
return subscription; | ||
}); | ||
}; | ||
}; | ||
var scanWithDefaultValue = function scanWithDefaultValue(accumulator, getSeed) { | ||
return function (source) { | ||
return rxjs.defer(function () { | ||
var seed = getSeed(); | ||
return source.pipe(operators.scan(accumulator, seed), defaultStart(seed)); | ||
}); | ||
}; | ||
}; | ||
var set = "s"; | ||
var del = "d"; | ||
var complete = "c"; | ||
var collector = function collector(source, enhancer) { | ||
return source.pipe(operators.publish(function (x) { | ||
return x.pipe(operators.mergeMap(enhancer), operators.takeUntil(operators.takeLast(1)(x))); | ||
}), operators.endWith({ | ||
t: complete | ||
}), scanWithDefaultValue(function (acc, val) { | ||
if (val.t === set) { | ||
acc.set(val.k, val.v); | ||
} else if (val.t === del) { | ||
acc["delete"](val.k); | ||
} else { | ||
acc.clear(); | ||
} | ||
return acc; | ||
}, function () { | ||
return new Map(); | ||
}), core.shareLatest()); | ||
}; | ||
/** | ||
* A React hook that creates a subscription to the provided observable once the | ||
* component mounts and it unsubscribes when the component unmounts. | ||
* A pipeable operator that collects all the GroupedObservables emitted by | ||
* the source and emits a Map with the latest values of the inner observables. | ||
*/ | ||
var collectValues = function collectValues() { | ||
return function (source$) { | ||
return collector(source$, function (inner$) { | ||
return inner$.pipe(operators.map(function (v) { | ||
return { | ||
t: set, | ||
k: inner$.key, | ||
v: v | ||
}; | ||
}), operators.endWith({ | ||
t: del, | ||
k: inner$.key | ||
})); | ||
}); | ||
}; | ||
}; | ||
var defaultFilter = function defaultFilter(source$) { | ||
return source$.pipe(operators.ignoreElements(), operators.startWith(true), operators.endWith(false)); | ||
}; | ||
/** | ||
* A pipeable operator that collects all the GroupedObservables emitted by | ||
* the source and emits a Map with the active inner observables | ||
* | ||
* @param source$ Source observable that the hook will subscribe to. | ||
* @param unsubscribeGraceTime (= 200): Amount of time in ms that the hook | ||
* should wait before unsubscribing from the source observable after it unmounts. | ||
* @returns void | ||
* | ||
* @remarks This hook doesn't trigger any updates. | ||
* @param filter? A function that receives the inner Observable and returns an | ||
* Observable of boolean values, which indicates whether the inner observable | ||
* should be collected. | ||
*/ | ||
var useSubscribe = function useSubscribe(source$, unsubscribeGraceTime) { | ||
if (unsubscribeGraceTime === void 0) { | ||
unsubscribeGraceTime = 200; | ||
} | ||
React.useEffect(function () { | ||
var subscription = source$.subscribe(); | ||
return function () { | ||
if (unsubscribeGraceTime === 0) { | ||
return subscription.unsubscribe(); | ||
} | ||
setTimeout(function () { | ||
subscription.unsubscribe(); | ||
}, unsubscribeGraceTime); | ||
}; | ||
}, [source$, unsubscribeGraceTime]); | ||
var collect = function collect(filter) { | ||
var enhancer = filter ? function (source$) { | ||
return filter(source$).pipe(operators.endWith(false), operators.skipWhile(function (x) { | ||
return !x; | ||
}), operators.distinctUntilChanged()); | ||
} : defaultFilter; | ||
return function (source$) { | ||
return collector(source$, function (o) { | ||
return operators.map(function (x) { | ||
return { | ||
t: x ? set : del, | ||
k: o.key, | ||
v: o | ||
}; | ||
})(enhancer(o)); | ||
}); | ||
}; | ||
}; | ||
/** | ||
* A React Component that creates a subscription to the provided observable once | ||
* the component mounts and it unsubscribes when the component unmounts. | ||
* Emits the values from all the streams of the provided object, in a result | ||
* which provides the key of the stream of that emission. | ||
* | ||
* @param source$ Source observable that the Component will subscribe to. | ||
* @param graceTime (= 200): Amount of time in ms that the Component should wait | ||
* before unsubscribing from the source observable after it unmounts. | ||
* | ||
* @remarks This Component doesn't trigger any updates. | ||
* @param input object of streams | ||
*/ | ||
var Subscribe = function Subscribe(_ref) { | ||
var source$ = _ref.source$, | ||
graceTime = _ref.graceTime, | ||
children = _ref.children; | ||
useSubscribe(source$, graceTime); | ||
return React__default.createElement(React__default.Fragment, null, children); | ||
var mergeWithKey = function mergeWithKey(input) { | ||
for (var _len = arguments.length, optionalArgs = new Array(_len > 1 ? _len - 1 : 0), _key = 1; _key < _len; _key++) { | ||
optionalArgs[_key - 1] = arguments[_key]; | ||
} | ||
return rxjs.merge.apply(void 0, Object.entries(input).map(function (_ref) { | ||
var type = _ref[0], | ||
stream = _ref[1]; | ||
return rxjs.from(stream).pipe(operators.map(function (payload) { | ||
return { | ||
type: type, | ||
payload: payload | ||
}; | ||
})); | ||
}).concat(optionalArgs)); | ||
}; | ||
var continuousGroupBy = function continuousGroupBy(mapper) { | ||
var emptyError = {}; | ||
function split(keySelector, streamSelector) { | ||
return function (stream) { | ||
return new rxjs.Observable(function (subscriber) { | ||
var groups = new Map(); | ||
return stream.subscribe(function (x) { | ||
var key = mapper(x); | ||
var error = emptyError; | ||
var sub = stream.subscribe(function (x) { | ||
var key = keySelector(x); | ||
@@ -74,55 +153,59 @@ if (groups.has(key)) { | ||
var subject = new rxjs.BehaviorSubject(x); | ||
var subject = streamSelector ? new rxjs.Subject() : new rxjs.ReplaySubject(1); | ||
groups.set(key, subject); | ||
var res = subject.pipe(operators.finalize(function () { | ||
return groups["delete"](key); | ||
}), operators.share()); | ||
var res = streamSelector ? streamSelector(subject, key).pipe(operators.shareReplay(1)) : subject.asObservable(); | ||
res.key = key; | ||
res.subscribe({ | ||
complete: function complete() { | ||
groups["delete"](key); | ||
} | ||
}); | ||
subject.next(x); | ||
subscriber.next(res); | ||
}, function (e) { | ||
subscriber.error(e); | ||
/* istanbul ignore next */ | ||
groups.forEach(function (g) { | ||
return g.error(e); | ||
}); | ||
subscriber.error(error = e); | ||
}, function () { | ||
subscriber.complete(); | ||
/* istanbul ignore next */ | ||
groups.forEach(function (g) { | ||
}); | ||
return function () { | ||
sub.unsubscribe(); | ||
groups.forEach(error === emptyError ? function (g) { | ||
return g.complete(); | ||
} : function (g) { | ||
return g.error(error); | ||
}); | ||
}); | ||
}; | ||
}); | ||
}; | ||
} | ||
/** | ||
* A RxJS creation operator that prepends a SUSPENSE on the source observable. | ||
* | ||
* @param source$ Source observable | ||
*/ | ||
var suspend = function suspend(source$) { | ||
return rxjs.from(source$).pipe(operators.startWith(core.SUSPENSE)); | ||
}; | ||
var DELETE = /*#__PURE__*/Symbol("DELETE"); | ||
/** | ||
* A pipeable operator that groups all values by key and emits a Map that holds | ||
* the latest value for each key. | ||
* A RxJS pipeable operator that prepends a SUSPENSE on the source observable. | ||
*/ | ||
var suspended = function suspended() { | ||
return suspend; | ||
}; | ||
/** | ||
* Same behaviour as rxjs' `switchMap`, but prepending every new event with | ||
* SUSPENSE. | ||
* | ||
* @param keyGetter A function that extracts the key for each item. | ||
* @param projection Projection function for each group. | ||
* @param fn Projection function | ||
*/ | ||
var groupInMap = function groupInMap(keyGetter, projection) { | ||
return function (source$) { | ||
var res = new Map(); | ||
return rxjs.concat(source$.pipe(continuousGroupBy(keyGetter), operators.publish(function (multicasted$) { | ||
return multicasted$.pipe(operators.mergeMap(function (inner$) { | ||
return rxjs.concat(projection(inner$).pipe(operators.map(function (v) { | ||
return [inner$.key, v]; | ||
})), rxjs.of([inner$.key, DELETE])); | ||
}), operators.takeUntil(multicasted$.pipe(operators.takeLast(1)))); | ||
}), operators.scan(function (acc, _ref) { | ||
var key = _ref[0], | ||
value = _ref[1]; | ||
if (value !== DELETE) return acc.set(key, value); | ||
acc["delete"](key); | ||
return acc; | ||
}, res)), rxjs.defer(function () { | ||
res.clear(); | ||
return rxjs.of(res); | ||
var switchMapSuspended = function switchMapSuspended(fn) { | ||
return function (src$) { | ||
return src$.pipe(operators.switchMap(function (x) { | ||
return suspend(fn(x)); | ||
})); | ||
@@ -133,29 +216,25 @@ }; | ||
/** | ||
* Emits the values from all the streams of the provided object, in a result | ||
* which provides the key of the stream of that emission. | ||
* A creation operator that helps at creating observables that have circular | ||
* dependencies | ||
* | ||
* @param input object of streams | ||
* @returns [1, 2] | ||
* 1. The inner subject as an Observable | ||
* 2. A pipable operator that taps into the inner Subject | ||
*/ | ||
var mergeWithKey = function mergeWithKey(input) { | ||
for (var _len = arguments.length, optionalArgs = new Array(_len > 1 ? _len - 1 : 0), _key = 1; _key < _len; _key++) { | ||
optionalArgs[_key - 1] = arguments[_key]; | ||
} | ||
return rxjs.merge.apply(void 0, Object.entries(input).map(function (_ref) { | ||
var type = _ref[0], | ||
stream = _ref[1]; | ||
return rxjs.from(stream).pipe(operators.map(function (payload) { | ||
return { | ||
type: type, | ||
payload: payload | ||
}; | ||
})); | ||
}).concat(optionalArgs)); | ||
var selfDependant = function selfDependant() { | ||
var mirrored$ = new rxjs.Subject(); | ||
return [mirrored$.asObservable(), function () { | ||
return operators.tap(mirrored$); | ||
}]; | ||
}; | ||
exports.Subscribe = Subscribe; | ||
exports.groupInMap = groupInMap; | ||
exports.collect = collect; | ||
exports.collectValues = collectValues; | ||
exports.mergeWithKey = mergeWithKey; | ||
exports.useSubscribe = useSubscribe; | ||
exports.selfDependant = selfDependant; | ||
exports.split = split; | ||
exports.suspend = suspend; | ||
exports.suspended = suspended; | ||
exports.switchMapSuspended = switchMapSuspended; | ||
//# sourceMappingURL=utils.cjs.development.js.map |
@@ -1,2 +0,2 @@ | ||
"use strict";Object.defineProperty(exports,"__esModule",{value:!0});var e,r=require("react"),n=(e=r)&&"object"==typeof e&&"default"in e?e.default:e,t=require("rxjs"),u=require("rxjs/operators"),i=function(e,n){void 0===n&&(n=200),r.useEffect((function(){var r=e.subscribe();return function(){if(0===n)return r.unsubscribe();setTimeout((function(){r.unsubscribe()}),n)}}),[e,n])},o=Symbol("DELETE");exports.Subscribe=function(e){var r=e.children;return i(e.source$,e.graceTime),n.createElement(n.Fragment,null,r)},exports.groupInMap=function(e,r){return function(n){var i,c=new Map;return t.concat(n.pipe((i=e,function(e){return new t.Observable((function(r){var n=new Map;return e.subscribe((function(e){var o=i(e);if(n.has(o))return n.get(o).next(e);var c=new t.BehaviorSubject(e);n.set(o,c);var a=c.pipe(u.finalize((function(){return n.delete(o)})),u.share());a.key=o,r.next(a)}),(function(e){r.error(e),n.forEach((function(r){return r.error(e)}))}),(function(){r.complete(),n.forEach((function(e){return e.complete()}))}))}))}),u.publish((function(e){return e.pipe(u.mergeMap((function(e){return t.concat(r(e).pipe(u.map((function(r){return[e.key,r]}))),t.of([e.key,o]))})),u.takeUntil(e.pipe(u.takeLast(1))))})),u.scan((function(e,r){var n=r[0],t=r[1];return t!==o?e.set(n,t):(e.delete(n),e)}),c)),t.defer((function(){return c.clear(),t.of(c)})))}},exports.mergeWithKey=function(e){for(var r=arguments.length,n=new Array(r>1?r-1:0),i=1;i<r;i++)n[i-1]=arguments[i];return t.merge.apply(void 0,Object.entries(e).map((function(e){var r=e[0];return t.from(e[1]).pipe(u.map((function(e){return{type:r,payload:e}})))})).concat(n))},exports.useSubscribe=i; | ||
"use strict";Object.defineProperty(exports,"__esModule",{value:!0});var e=require("rxjs/operators"),n=require("rxjs"),t=require("@react-rxjs/core"),r=function(r,u){return r.pipe(e.publish((function(n){return n.pipe(e.mergeMap(u),e.takeUntil(e.takeLast(1)(n)))})),e.endWith({t:"c"}),(i=function(e,n){return"s"===n.t?e.set(n.k,n.v):"d"===n.t?e.delete(n.k):e.clear(),e},o=function(){return new Map},function(t){return n.defer((function(){var r,u=o();return t.pipe(e.scan(i,u),(r=u,function(e){return new n.Observable((function(n){var t=!1,u=e.subscribe((function(e){t=!0,n.next(e)}),(function(e){return n.error(e)}),(function(){return n.complete()}));return t||n.next(r),u}))}))}))}),t.shareLatest());var i,o},u=function(n){return n.pipe(e.ignoreElements(),e.startWith(!0),e.endWith(!1))},i={},o=function(r){return n.from(r).pipe(e.startWith(t.SUSPENSE))};exports.collect=function(n){var t=n?function(t){return n(t).pipe(e.endWith(!1),e.skipWhile((function(e){return!e})),e.distinctUntilChanged())}:u;return function(n){return r(n,(function(n){return e.map((function(e){return{t:e?"s":"d",k:n.key,v:n}}))(t(n))}))}},exports.collectValues=function(){return function(n){return r(n,(function(n){return n.pipe(e.map((function(e){return{t:"s",k:n.key,v:e}})),e.endWith({t:"d",k:n.key}))}))}},exports.mergeWithKey=function(t){for(var r=arguments.length,u=new Array(r>1?r-1:0),i=1;i<r;i++)u[i-1]=arguments[i];return n.merge.apply(void 0,Object.entries(t).map((function(t){var r=t[0];return n.from(t[1]).pipe(e.map((function(e){return{type:r,payload:e}})))})).concat(u))},exports.selfDependant=function(){var t=new n.Subject;return[t.asObservable(),function(){return e.tap(t)}]},exports.split=function(t,r){return function(u){return new n.Observable((function(o){var c=new Map,p=i,s=u.subscribe((function(u){var i=t(u);if(c.has(i))return c.get(i).next(u);var p=r?new n.Subject:new n.ReplaySubject(1);c.set(i,p);var s=r?r(p,i).pipe(e.shareReplay(1)):p.asObservable();s.key=i,s.subscribe({complete:function(){c.delete(i)}}),p.next(u),o.next(s)}),(function(e){o.error(p=e)}),(function(){o.complete()}));return function(){s.unsubscribe(),c.forEach(p===i?function(e){return e.complete()}:function(e){return e.error(p)})}}))}},exports.suspend=o,exports.suspended=function(){return o},exports.switchMapSuspended=function(n){return function(t){return t.pipe(e.switchMap((function(e){return o(n(e))})))}}; | ||
//# sourceMappingURL=utils.cjs.production.min.js.map |
@@ -1,61 +0,143 @@ | ||
import React, { useEffect } from 'react'; | ||
import { Observable, BehaviorSubject, concat, of, defer, merge, from } from 'rxjs'; | ||
import { finalize, share, publish, mergeMap, map, takeUntil, takeLast, scan } from 'rxjs/operators'; | ||
import { publish, mergeMap, takeUntil, takeLast, endWith, scan, map, skipWhile, distinctUntilChanged, ignoreElements, startWith, shareReplay, switchMap, tap } from 'rxjs/operators'; | ||
import { defer, Observable, merge, from, Subject, ReplaySubject } from 'rxjs'; | ||
import { shareLatest, SUSPENSE } from '@react-rxjs/core'; | ||
var defaultStart = function defaultStart(value) { | ||
return function (source$) { | ||
return new Observable(function (observer) { | ||
var emitted = false; | ||
var subscription = source$.subscribe(function (x) { | ||
emitted = true; | ||
observer.next(x); | ||
}, function (e) { | ||
return observer.error(e); | ||
}, function () { | ||
return observer.complete(); | ||
}); | ||
if (!emitted) { | ||
observer.next(value); | ||
} | ||
return subscription; | ||
}); | ||
}; | ||
}; | ||
var scanWithDefaultValue = function scanWithDefaultValue(accumulator, getSeed) { | ||
return function (source) { | ||
return defer(function () { | ||
var seed = getSeed(); | ||
return source.pipe(scan(accumulator, seed), defaultStart(seed)); | ||
}); | ||
}; | ||
}; | ||
var set = "s"; | ||
var del = "d"; | ||
var complete = "c"; | ||
var collector = function collector(source, enhancer) { | ||
return source.pipe(publish(function (x) { | ||
return x.pipe(mergeMap(enhancer), takeUntil(takeLast(1)(x))); | ||
}), endWith({ | ||
t: complete | ||
}), scanWithDefaultValue(function (acc, val) { | ||
if (val.t === set) { | ||
acc.set(val.k, val.v); | ||
} else if (val.t === del) { | ||
acc["delete"](val.k); | ||
} else { | ||
acc.clear(); | ||
} | ||
return acc; | ||
}, function () { | ||
return new Map(); | ||
}), shareLatest()); | ||
}; | ||
/** | ||
* A React hook that creates a subscription to the provided observable once the | ||
* component mounts and it unsubscribes when the component unmounts. | ||
* A pipeable operator that collects all the GroupedObservables emitted by | ||
* the source and emits a Map with the latest values of the inner observables. | ||
*/ | ||
var collectValues = function collectValues() { | ||
return function (source$) { | ||
return collector(source$, function (inner$) { | ||
return inner$.pipe(map(function (v) { | ||
return { | ||
t: set, | ||
k: inner$.key, | ||
v: v | ||
}; | ||
}), endWith({ | ||
t: del, | ||
k: inner$.key | ||
})); | ||
}); | ||
}; | ||
}; | ||
var defaultFilter = function defaultFilter(source$) { | ||
return source$.pipe(ignoreElements(), startWith(true), endWith(false)); | ||
}; | ||
/** | ||
* A pipeable operator that collects all the GroupedObservables emitted by | ||
* the source and emits a Map with the active inner observables | ||
* | ||
* @param source$ Source observable that the hook will subscribe to. | ||
* @param unsubscribeGraceTime (= 200): Amount of time in ms that the hook | ||
* should wait before unsubscribing from the source observable after it unmounts. | ||
* @returns void | ||
* | ||
* @remarks This hook doesn't trigger any updates. | ||
* @param filter? A function that receives the inner Observable and returns an | ||
* Observable of boolean values, which indicates whether the inner observable | ||
* should be collected. | ||
*/ | ||
var useSubscribe = function useSubscribe(source$, unsubscribeGraceTime) { | ||
if (unsubscribeGraceTime === void 0) { | ||
unsubscribeGraceTime = 200; | ||
} | ||
useEffect(function () { | ||
var subscription = source$.subscribe(); | ||
return function () { | ||
if (unsubscribeGraceTime === 0) { | ||
return subscription.unsubscribe(); | ||
} | ||
setTimeout(function () { | ||
subscription.unsubscribe(); | ||
}, unsubscribeGraceTime); | ||
}; | ||
}, [source$, unsubscribeGraceTime]); | ||
var collect = function collect(filter) { | ||
var enhancer = filter ? function (source$) { | ||
return filter(source$).pipe(endWith(false), skipWhile(function (x) { | ||
return !x; | ||
}), distinctUntilChanged()); | ||
} : defaultFilter; | ||
return function (source$) { | ||
return collector(source$, function (o) { | ||
return map(function (x) { | ||
return { | ||
t: x ? set : del, | ||
k: o.key, | ||
v: o | ||
}; | ||
})(enhancer(o)); | ||
}); | ||
}; | ||
}; | ||
/** | ||
* A React Component that creates a subscription to the provided observable once | ||
* the component mounts and it unsubscribes when the component unmounts. | ||
* Emits the values from all the streams of the provided object, in a result | ||
* which provides the key of the stream of that emission. | ||
* | ||
* @param source$ Source observable that the Component will subscribe to. | ||
* @param graceTime (= 200): Amount of time in ms that the Component should wait | ||
* before unsubscribing from the source observable after it unmounts. | ||
* | ||
* @remarks This Component doesn't trigger any updates. | ||
* @param input object of streams | ||
*/ | ||
var Subscribe = function Subscribe(_ref) { | ||
var source$ = _ref.source$, | ||
graceTime = _ref.graceTime, | ||
children = _ref.children; | ||
useSubscribe(source$, graceTime); | ||
return React.createElement(React.Fragment, null, children); | ||
var mergeWithKey = function mergeWithKey(input) { | ||
for (var _len = arguments.length, optionalArgs = new Array(_len > 1 ? _len - 1 : 0), _key = 1; _key < _len; _key++) { | ||
optionalArgs[_key - 1] = arguments[_key]; | ||
} | ||
return merge.apply(void 0, Object.entries(input).map(function (_ref) { | ||
var type = _ref[0], | ||
stream = _ref[1]; | ||
return from(stream).pipe(map(function (payload) { | ||
return { | ||
type: type, | ||
payload: payload | ||
}; | ||
})); | ||
}).concat(optionalArgs)); | ||
}; | ||
var continuousGroupBy = function continuousGroupBy(mapper) { | ||
var emptyError = {}; | ||
function split(keySelector, streamSelector) { | ||
return function (stream) { | ||
return new Observable(function (subscriber) { | ||
var groups = new Map(); | ||
return stream.subscribe(function (x) { | ||
var key = mapper(x); | ||
var error = emptyError; | ||
var sub = stream.subscribe(function (x) { | ||
var key = keySelector(x); | ||
@@ -66,55 +148,59 @@ if (groups.has(key)) { | ||
var subject = new BehaviorSubject(x); | ||
var subject = streamSelector ? new Subject() : new ReplaySubject(1); | ||
groups.set(key, subject); | ||
var res = subject.pipe(finalize(function () { | ||
return groups["delete"](key); | ||
}), share()); | ||
var res = streamSelector ? streamSelector(subject, key).pipe(shareReplay(1)) : subject.asObservable(); | ||
res.key = key; | ||
res.subscribe({ | ||
complete: function complete() { | ||
groups["delete"](key); | ||
} | ||
}); | ||
subject.next(x); | ||
subscriber.next(res); | ||
}, function (e) { | ||
subscriber.error(e); | ||
/* istanbul ignore next */ | ||
groups.forEach(function (g) { | ||
return g.error(e); | ||
}); | ||
subscriber.error(error = e); | ||
}, function () { | ||
subscriber.complete(); | ||
/* istanbul ignore next */ | ||
groups.forEach(function (g) { | ||
}); | ||
return function () { | ||
sub.unsubscribe(); | ||
groups.forEach(error === emptyError ? function (g) { | ||
return g.complete(); | ||
} : function (g) { | ||
return g.error(error); | ||
}); | ||
}); | ||
}; | ||
}); | ||
}; | ||
} | ||
/** | ||
* A RxJS creation operator that prepends a SUSPENSE on the source observable. | ||
* | ||
* @param source$ Source observable | ||
*/ | ||
var suspend = function suspend(source$) { | ||
return from(source$).pipe(startWith(SUSPENSE)); | ||
}; | ||
var DELETE = /*#__PURE__*/Symbol("DELETE"); | ||
/** | ||
* A pipeable operator that groups all values by key and emits a Map that holds | ||
* the latest value for each key. | ||
* A RxJS pipeable operator that prepends a SUSPENSE on the source observable. | ||
*/ | ||
var suspended = function suspended() { | ||
return suspend; | ||
}; | ||
/** | ||
* Same behaviour as rxjs' `switchMap`, but prepending every new event with | ||
* SUSPENSE. | ||
* | ||
* @param keyGetter A function that extracts the key for each item. | ||
* @param projection Projection function for each group. | ||
* @param fn Projection function | ||
*/ | ||
var groupInMap = function groupInMap(keyGetter, projection) { | ||
return function (source$) { | ||
var res = new Map(); | ||
return concat(source$.pipe(continuousGroupBy(keyGetter), publish(function (multicasted$) { | ||
return multicasted$.pipe(mergeMap(function (inner$) { | ||
return concat(projection(inner$).pipe(map(function (v) { | ||
return [inner$.key, v]; | ||
})), of([inner$.key, DELETE])); | ||
}), takeUntil(multicasted$.pipe(takeLast(1)))); | ||
}), scan(function (acc, _ref) { | ||
var key = _ref[0], | ||
value = _ref[1]; | ||
if (value !== DELETE) return acc.set(key, value); | ||
acc["delete"](key); | ||
return acc; | ||
}, res)), defer(function () { | ||
res.clear(); | ||
return of(res); | ||
var switchMapSuspended = function switchMapSuspended(fn) { | ||
return function (src$) { | ||
return src$.pipe(switchMap(function (x) { | ||
return suspend(fn(x)); | ||
})); | ||
@@ -125,26 +211,18 @@ }; | ||
/** | ||
* Emits the values from all the streams of the provided object, in a result | ||
* which provides the key of the stream of that emission. | ||
* A creation operator that helps at creating observables that have circular | ||
* dependencies | ||
* | ||
* @param input object of streams | ||
* @returns [1, 2] | ||
* 1. The inner subject as an Observable | ||
* 2. A pipable operator that taps into the inner Subject | ||
*/ | ||
var mergeWithKey = function mergeWithKey(input) { | ||
for (var _len = arguments.length, optionalArgs = new Array(_len > 1 ? _len - 1 : 0), _key = 1; _key < _len; _key++) { | ||
optionalArgs[_key - 1] = arguments[_key]; | ||
} | ||
return merge.apply(void 0, Object.entries(input).map(function (_ref) { | ||
var type = _ref[0], | ||
stream = _ref[1]; | ||
return from(stream).pipe(map(function (payload) { | ||
return { | ||
type: type, | ||
payload: payload | ||
}; | ||
})); | ||
}).concat(optionalArgs)); | ||
var selfDependant = function selfDependant() { | ||
var mirrored$ = new Subject(); | ||
return [mirrored$.asObservable(), function () { | ||
return tap(mirrored$); | ||
}]; | ||
}; | ||
export { Subscribe, groupInMap, mergeWithKey, useSubscribe }; | ||
export { collect, collectValues, mergeWithKey, selfDependant, split, suspend, suspended, switchMapSuspended }; | ||
//# sourceMappingURL=utils.esm.js.map |
{ | ||
"version": "0.1.1", | ||
"version": "0.2.0", | ||
"repository": { | ||
@@ -19,7 +19,8 @@ "type": "git", | ||
"test": "tsdx test --coverage", | ||
"test:ci": "cross-env CI=true tsdx test --coverage && codecov --disable=gcov", | ||
"lint": "tsdx lint", | ||
"lint": "prettier --check README.md && tsdx lint src", | ||
"format": "prettier --write README.md \"src/**/*.{js,jsx,ts,tsx,json,md}\"", | ||
"prepare": "tsdx build" | ||
}, | ||
"peerDependencies": { | ||
"@react-rxjs/core": ">=0.1.0", | ||
"react": ">=16.8.0", | ||
@@ -36,8 +37,8 @@ "rxjs": ">=6" | ||
"devDependencies": { | ||
"@testing-library/react": "^10.4.6", | ||
"@testing-library/react-hooks": "^3.3.0", | ||
"@types/jest": "^26.0.5", | ||
"@types/react": "^16.9.43", | ||
"@react-rxjs/core": "0.2.0", | ||
"@testing-library/react": "^10.4.9", | ||
"@testing-library/react-hooks": "^3.4.1", | ||
"@types/jest": "^26.0.10", | ||
"@types/react": "^16.9.48", | ||
"@types/react-dom": "^16.9.8", | ||
"codecov": "3.7.2", | ||
"jest-marbles": "^2.5.1", | ||
@@ -47,7 +48,7 @@ "react": "^16.13.1", | ||
"react-test-renderer": "^16.13.1", | ||
"rxjs": "^6.6.0", | ||
"tsdx": "^0.13.2", | ||
"tslib": "^2.0.0", | ||
"rxjs": "^6.6.2", | ||
"tsdx": "^0.13.3", | ||
"tslib": "^2.0.1", | ||
"typescript": "^3.9.7" | ||
} | ||
} |
143
README.md
# @react-rxjs/utils | ||
## Installation | ||
npm install @react-rxjs/utils | ||
@@ -8,65 +9,54 @@ | ||
### useSubscribe | ||
### split | ||
A React hook that creates a subscription to the provided observable once the | ||
component mounts and it unsubscribes when the component unmounts. | ||
A RxJS operator that groups the items emitted by the source based on the | ||
keySelector function, emitting one Observable for each group. | ||
Arguments: | ||
- `source$`: Source observable that the hook will subscribe to. | ||
- `unsubscribeGraceTime`: Amount of time in ms that the hook should wait before | ||
unsubscribing from the source observable after it unmounts (default = 200). | ||
Important: This hook doesn't trigger any updates. | ||
- `keySelector`: A function that receives an item and returns the key of that | ||
item's group. | ||
- `streamSelector`: (optional) The function to apply to each group observable | ||
(default = identity). | ||
### Subscribe | ||
`split` will subscribe to each group observable and share the result to every | ||
inner subscriber of that group. This inner observable can be mapped to another | ||
observable through the `streamSelector` argument. | ||
A React Component that creates a subscription to the provided observable once | ||
the component mounts and it unsubscribes from it when the component unmounts. | ||
### collectValues | ||
Properties: | ||
- `source$`: Source observable that the Component will subscribe to. | ||
- `graceTime`: an optional property that describes the amount of time in ms | ||
that the Component should wait before unsubscribing from the source observable | ||
after it unmounts (default = 200). | ||
A pipeable operator that collects all the GroupedObservables emitted by | ||
the source and emits a Map with the latest values of the inner observables. | ||
Important: This Component doesn't trigger any updates. | ||
### groupInMap | ||
A RxJS pipeable operator which groups all values by key and emits a Map that | ||
holds the latest value for each key | ||
Arguments: | ||
- `keyGetter`: A function that extracts the key for each item. | ||
- `projection`: Projection function for each group. | ||
```ts | ||
const votesByKey$ = new Subject<{key: string}>() | ||
const votesByKey$ = new Subject<{ key: string }>() | ||
const counters$ = votesByKey$.pipe( | ||
groupInMap( | ||
vote => vote.key, | ||
votes$ => votes$.pipe( | ||
mapTo(1), | ||
scan(count => count + 1), | ||
takeWhile(count => count < 3) | ||
) | ||
) | ||
split( | ||
(vote) => vote.key, | ||
(votes$) => | ||
votes$.pipe( | ||
mapTo(1), | ||
scan((count) => count + 1), | ||
takeWhile((count) => count < 3), | ||
), | ||
), | ||
collectValues(), | ||
) | ||
counters$.subscribe(counters => { | ||
console.log('counters$:') | ||
counters$.subscribe((counters) => { | ||
console.log("counters$:") | ||
counters.forEach((value, key) => { | ||
console.log(`${key}: ${value}`); | ||
console.log(`${key}: ${value}`) | ||
}) | ||
}) | ||
votesByKey$.next({key: 'foo'}) | ||
votesByKey$.next({ key: "foo" }) | ||
// > counters$: | ||
// > foo: 1 | ||
votesByKey$.next({key: 'foo'}) | ||
votesByKey$.next({ key: "foo" }) | ||
// > counters$: | ||
// > foo: 2 | ||
votesByKey$.next({key: 'bar'}) | ||
votesByKey$.next({ key: "bar" }) | ||
// > counters$: | ||
@@ -76,14 +66,25 @@ // > foo: 2 | ||
votesByKey$.next({key: 'foo'}) | ||
votesByKey$.next({ key: "foo" }) | ||
// > counters$: | ||
// > bar: 1 | ||
votesByKey$.next({key: 'bar'}) | ||
votesByKey$.next({ key: "bar" }) | ||
// > counters$: | ||
// > bar: 2 | ||
// | ||
votesByKey$.next({key: 'bar'}) | ||
votesByKey$.next({ key: "bar" }) | ||
// > counters$: | ||
``` | ||
### collect | ||
A pipeable operator that collects all the GroupedObservables emitted by | ||
the source and emits a Map with the active inner observables. | ||
Arguments: | ||
- `filter?`: A function that receives the inner Observable and returns an | ||
Observable of boolean values, which indicates whether the inner observable | ||
should be collected. | ||
### mergeWithKey | ||
@@ -95,4 +96,5 @@ | ||
Arguments: | ||
- `inputObject`: Object of streams | ||
- `inputObject`: Object of streams | ||
```ts | ||
@@ -123,1 +125,52 @@ const inc$ = new Subject() | ||
``` | ||
### selfDependant | ||
A creation operator that helps at creating observables that have circular | ||
dependencies | ||
```ts | ||
const [_resetableCounter$, connectResetableCounter] = selfDependant<number>() | ||
const clicks$ = new Subject() | ||
const inc$ = clicks$.pipe( | ||
withLatestFrom(_resetableCounter$), | ||
map((_, x) => x + 1), | ||
share(), | ||
) | ||
const delayedZero$ = of(0).pipe(delay(10_000)) | ||
const reset$ = inc$.pipe(switchMapTo(delayedZero$)) | ||
const resetableCounter$ = merge(inc$, reset$, of(0)).pipe( | ||
connectResetableCounter(), | ||
) | ||
``` | ||
### suspend | ||
```ts | ||
const story$ = selectedStoryId$.pipe( | ||
switchMap(id => suspend(getStory$(id)) | ||
) | ||
``` | ||
A RxJS creation operator that prepends a `SUSPENSE` on the source observable. | ||
### suspended | ||
```ts | ||
const story$ = selectedStoryId$.pipe( | ||
switchMap((id) => getStory$(id).pipe(suspended())), | ||
) | ||
``` | ||
The pipeable version of `suspend` | ||
### switchMapSuspended | ||
```ts | ||
const story$ = selectedStoryId$.pipe(switchMapSuspended(getStory$)) | ||
``` | ||
Like `switchMap` but applying a `startWith(SUSPENSE)` to the inner observable. |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
66237
52.05%19
18.75%529
53.78%173
44.17%3
50%1
Infinity%