@keiii/k-stream
Advanced tools
Comparing version 0.0.32 to 0.0.33
@@ -1,6 +0,6 @@ | ||
import { ksCreateStream, ksShareReplay, noop, } from './core'; | ||
import { ksShareReplay, noop } from './core'; | ||
export const ksBehaviourSubject = (initValue, behaviour = ksShareReplay) => { | ||
const state = { isCompleted: false, current: initValue }; | ||
let subjectObserver = null; | ||
const stream = ksCreateStream(behaviour, observer => { | ||
const stream = behaviour(observer => { | ||
subjectObserver = observer; | ||
@@ -15,5 +15,3 @@ subjectObserver.next(state.current); | ||
state.current = value; | ||
if (subjectObserver !== null) { | ||
subjectObserver.next(value); | ||
} | ||
subjectObserver === null || subjectObserver === void 0 ? void 0 : subjectObserver.next(value); | ||
}; | ||
@@ -34,5 +32,3 @@ return { | ||
state.isCompleted = true; | ||
if (subjectObserver !== null) { | ||
subjectObserver.complete(); | ||
} | ||
subjectObserver === null || subjectObserver === void 0 ? void 0 : subjectObserver.complete(); | ||
}, | ||
@@ -39,0 +35,0 @@ next, |
@@ -28,4 +28,10 @@ export declare type Unsubscribable = { | ||
export declare const asyncScheduler: Scheduler; | ||
export declare const lazySubscription: () => { | ||
resolve: (subscription: Unsubscribable) => Unsubscribable; | ||
/** | ||
* Creates new observable object with `unsubscribe()` method | ||
* what could be called before `subscribe()`. | ||
*/ | ||
export declare const _lazy: <A>(observable: { | ||
subscribe: (observer: A) => Unsubscribable; | ||
}) => { | ||
subscribe: (observer: A) => Unsubscribable; | ||
unsubscribe: () => void; | ||
@@ -45,6 +51,1 @@ }; | ||
export declare const ksShareReplay: KsBehaviour; | ||
export declare const ksCreateStream: <A>(behaviour: KsBehaviour, subscriber: SubscriberRequired<A>) => Stream<A>; | ||
/** | ||
* Combine transformers. | ||
*/ | ||
export declare const ksPipe: <A, B, C>(fab: Transformer<A, B>, fbc: Transformer<B, C>) => Transformer<A, C>; |
@@ -9,3 +9,7 @@ import { some, none, isSome } from './option'; | ||
}; | ||
export const lazySubscription = () => { | ||
/** | ||
* Creates new observable object with `unsubscribe()` method | ||
* what could be called before `subscribe()`. | ||
*/ | ||
export const _lazy = (observable) => { | ||
let resolve; | ||
@@ -16,3 +20,4 @@ const promise = new Promise(r => { | ||
return { | ||
resolve: (subscription) => { | ||
subscribe: (observer) => { | ||
const subscription = observable.subscribe(observer); | ||
resolve(subscription); | ||
@@ -66,6 +71,8 @@ return subscription; | ||
} | ||
observersMap.forEach(observer => { var _a; return (_a = observer.next) === null || _a === void 0 ? void 0 : _a.call(observer, value); }); | ||
// We need to save last value after notify observers | ||
// to prevent duplicates with circular dependencies | ||
if (replay) { | ||
lastValue = some(value); | ||
} | ||
observersMap.forEach(observer => { var _a; return (_a = observer.next) === null || _a === void 0 ? void 0 : _a.call(observer, value); }); | ||
}; | ||
@@ -101,4 +108,7 @@ const onComplete = () => { | ||
observersMap.set(subscribeId, observer); | ||
// NOTE: we need to create subscription after added observer | ||
// We need to create subscription after added observer into observersMap | ||
if (subscription === null) { | ||
// First we need to make `subscription` not equals `null` | ||
// to prevent `Maximum call stack size exceeded` with circular dependencies | ||
subscription = { unsubscribe: noop }; | ||
subscription = subscriber({ | ||
@@ -129,11 +139,2 @@ next: onNext, | ||
export const ksShareReplay = f => createShareStream(f, true); | ||
export const ksCreateStream = (behaviour, subscriber) => { | ||
return behaviour(subscriber); | ||
}; | ||
/** | ||
* Combine transformers. | ||
*/ | ||
export const ksPipe = (fab, fbc) => { | ||
return stream => stream.pipe(fab).pipe(fbc); | ||
}; | ||
//# sourceMappingURL=core.js.map |
@@ -1,2 +0,2 @@ | ||
import { asyncScheduler, ksCold, ksCreateStream, lazySubscription, noop, } from './core'; | ||
import { asyncScheduler, ksCold, _lazy, noop, } from './core'; | ||
import { ksMap } from './transformers'; | ||
@@ -9,3 +9,3 @@ import { isSome, none, some } from './option'; | ||
export const ksEmpty = () => { | ||
return ksCreateStream(ksCold, ({ complete }) => { | ||
return ksCold(({ complete }) => { | ||
complete(); | ||
@@ -19,3 +19,3 @@ return { unsubscribe: noop }; | ||
export const ksOf = (value, behaviour = ksCold) => { | ||
return ksCreateStream(behaviour, ({ next, complete }) => { | ||
return behaviour(({ next, complete }) => { | ||
next(value); | ||
@@ -30,8 +30,8 @@ complete(); | ||
export const ksConcat = (stream_a, stream_b) => { | ||
return ksCreateStream(stream_a.behaviour, ({ next, complete }) => { | ||
const subscription_b = lazySubscription(); | ||
const subscription_a = stream_a.subscribe({ | ||
return stream_a.behaviour(({ next, complete }) => { | ||
const b = _lazy(stream_b); | ||
const a = stream_a.subscribe({ | ||
next, | ||
complete: () => { | ||
subscription_b.resolve(stream_b.subscribe({ next, complete })); | ||
b.subscribe({ next, complete }); | ||
}, | ||
@@ -41,4 +41,4 @@ }); | ||
unsubscribe: () => { | ||
subscription_b.unsubscribe(); | ||
subscription_a.unsubscribe(); | ||
b.unsubscribe(); | ||
a.unsubscribe(); | ||
}, | ||
@@ -52,10 +52,10 @@ }; | ||
export const ksMerge = (stream_a, stream_b) => { | ||
return ksCreateStream(stream_a.behaviour, ({ next, complete }) => { | ||
return stream_a.behaviour(({ next, complete }) => { | ||
let completed_a = false; | ||
let completed_b = false; | ||
const subscription_a = lazySubscription(); | ||
const subscription_b = lazySubscription(); | ||
const a = _lazy(stream_a); | ||
const b = _lazy(stream_b); | ||
const unsubscribe = () => { | ||
subscription_b.unsubscribe(); | ||
subscription_a.unsubscribe(); | ||
b.unsubscribe(); | ||
a.unsubscribe(); | ||
}; | ||
@@ -68,3 +68,3 @@ const tryComplete = () => { | ||
}; | ||
subscription_a.resolve(stream_a.subscribe({ | ||
a.subscribe({ | ||
next, | ||
@@ -75,4 +75,4 @@ complete: () => { | ||
}, | ||
})); | ||
subscription_b.resolve(stream_b.subscribe({ | ||
}); | ||
b.subscribe({ | ||
next, | ||
@@ -83,3 +83,3 @@ complete: () => { | ||
}, | ||
})); | ||
}); | ||
return { unsubscribe }; | ||
@@ -92,3 +92,3 @@ }); | ||
export const ksZip = (stream_a, stream_b) => { | ||
return ksCreateStream(stream_a.behaviour, ({ next, complete }) => { | ||
return stream_a.behaviour(({ next, complete }) => { | ||
let completed_a = false; | ||
@@ -98,7 +98,7 @@ let completed_b = false; | ||
const queue_b = []; | ||
const subscription_a = lazySubscription(); | ||
const subscription_b = lazySubscription(); | ||
const a = _lazy(stream_a); | ||
const b = _lazy(stream_b); | ||
const unsubscribe = () => { | ||
subscription_b.unsubscribe(); | ||
subscription_a.unsubscribe(); | ||
b.unsubscribe(); | ||
a.unsubscribe(); | ||
}; | ||
@@ -117,3 +117,3 @@ const tryNext = () => { | ||
}; | ||
subscription_a.resolve(stream_a.subscribe({ | ||
a.subscribe({ | ||
next: value => { | ||
@@ -127,4 +127,4 @@ queue_a.push(value); | ||
}, | ||
})); | ||
subscription_b.resolve(stream_b.subscribe({ | ||
}); | ||
b.subscribe({ | ||
next: value => { | ||
@@ -138,3 +138,3 @@ queue_b.push(value); | ||
}, | ||
})); | ||
}); | ||
return { unsubscribe }; | ||
@@ -144,3 +144,3 @@ }); | ||
export const ksTimeout = (ms, behaviour = ksCold, scheduler = asyncScheduler) => { | ||
return ksCreateStream(behaviour, ({ next, complete }) => { | ||
return behaviour(({ next, complete }) => { | ||
const handler = () => { | ||
@@ -154,3 +154,3 @@ next(0); | ||
export const ksInterval = (ms, behaviour = ksCold, scheduler = asyncScheduler) => { | ||
return ksCreateStream(behaviour, ({ next }) => { | ||
return behaviour(({ next }) => { | ||
let count = 0; | ||
@@ -176,3 +176,3 @@ let unsubscribe = noop; | ||
export const ksCombineLatest = (stream_a, stream_b) => { | ||
return ksCreateStream(stream_a.behaviour, ({ next, complete }) => { | ||
return stream_a.behaviour(({ next, complete }) => { | ||
let completed_a = false; | ||
@@ -182,7 +182,7 @@ let completed_b = false; | ||
let value_b = none; | ||
const subscription_a = lazySubscription(); | ||
const subscription_b = lazySubscription(); | ||
const a = _lazy(stream_a); | ||
const b = _lazy(stream_b); | ||
const unsubscribe = () => { | ||
subscription_b.unsubscribe(); | ||
subscription_a.unsubscribe(); | ||
b.unsubscribe(); | ||
a.unsubscribe(); | ||
}; | ||
@@ -200,3 +200,3 @@ const tryNext = () => { | ||
}; | ||
subscription_a.resolve(stream_a.subscribe({ | ||
a.subscribe({ | ||
next: value => { | ||
@@ -210,4 +210,4 @@ value_a = some(value); | ||
}, | ||
})); | ||
subscription_b.resolve(stream_b.subscribe({ | ||
}); | ||
b.subscribe({ | ||
next: value => { | ||
@@ -221,3 +221,3 @@ value_b = some(value); | ||
}, | ||
})); | ||
}); | ||
return { unsubscribe }; | ||
@@ -230,3 +230,3 @@ }); | ||
export const ksForkJoin = (stream_a, stream_b) => { | ||
return ksCreateStream(stream_a.behaviour, ({ next, complete }) => { | ||
return stream_a.behaviour(({ next, complete }) => { | ||
let completed_a = false; | ||
@@ -236,7 +236,7 @@ let completed_b = false; | ||
let value_b = none; | ||
const subscription_a = lazySubscription(); | ||
const subscription_b = lazySubscription(); | ||
const a = _lazy(stream_a); | ||
const b = _lazy(stream_b); | ||
const unsubscribe = () => { | ||
subscription_b.unsubscribe(); | ||
subscription_a.unsubscribe(); | ||
b.unsubscribe(); | ||
a.unsubscribe(); | ||
}; | ||
@@ -250,3 +250,3 @@ const tryComplete = () => { | ||
}; | ||
subscription_a.resolve(stream_a.subscribe({ | ||
a.subscribe({ | ||
next: value => (value_a = some(value)), | ||
@@ -257,4 +257,4 @@ complete: () => { | ||
}, | ||
})); | ||
subscription_b.resolve(stream_b.subscribe({ | ||
}); | ||
b.subscribe({ | ||
next: value => (value_b = some(value)), | ||
@@ -265,3 +265,3 @@ complete: () => { | ||
}, | ||
})); | ||
}); | ||
return { unsubscribe }; | ||
@@ -271,3 +271,3 @@ }); | ||
export const ksFromPromise = (promise, behaviour = ksCold) => { | ||
return ksCreateStream(behaviour, ({ next, complete }) => { | ||
return behaviour(({ next, complete }) => { | ||
let on = true; | ||
@@ -274,0 +274,0 @@ promise |
@@ -1,2 +0,2 @@ | ||
import { ksCreateStream, noop, ksShare, } from './core'; | ||
import { noop, ksShare } from './core'; | ||
export const ksSubject = (behaviour = ksShare) => { | ||
@@ -14,3 +14,3 @@ let isCompleted = false; | ||
}; | ||
const stream = ksCreateStream(behaviour, observer => { | ||
const stream = behaviour(observer => { | ||
subjectObserver = observer; | ||
@@ -17,0 +17,0 @@ return { unsubscribe: noop }; |
@@ -1,5 +0,5 @@ | ||
import { ksCreateStream, lazySubscription, } from './core'; | ||
import { _lazy, } from './core'; | ||
import { some, none, isSome } from './option'; | ||
export const ksChangeBehaviour = (behaviour) => { | ||
return stream => ksCreateStream(behaviour, stream.subscribe); | ||
return stream => behaviour(stream.subscribe); | ||
}; | ||
@@ -11,3 +11,3 @@ /** | ||
return stream => { | ||
return ksCreateStream(stream.behaviour, ({ next, complete }) => { | ||
return stream.behaviour(({ next, complete }) => { | ||
return stream.subscribe({ | ||
@@ -25,3 +25,3 @@ next: (value) => next(project(value)), | ||
return stream => { | ||
return ksCreateStream(stream.behaviour, ({ next, complete }) => { | ||
return stream.behaviour(({ next, complete }) => { | ||
return stream.subscribe({ | ||
@@ -39,3 +39,3 @@ next: () => next(value), | ||
return stream => { | ||
return ksCreateStream(stream.behaviour, ({ next, complete }) => { | ||
return stream.behaviour(({ next, complete }) => { | ||
return stream.subscribe({ | ||
@@ -61,3 +61,3 @@ next: value => { | ||
return stream => { | ||
return ksCreateStream(stream.behaviour, ({ next, complete }) => { | ||
return stream.behaviour(({ next, complete }) => { | ||
return stream.subscribe({ | ||
@@ -80,3 +80,3 @@ next: value => { | ||
return stream => { | ||
return ksCreateStream(stream.behaviour, ({ next, complete }) => { | ||
return stream.behaviour(({ next, complete }) => { | ||
let projectSubscription = null; | ||
@@ -98,9 +98,4 @@ let projectCompleted = false; | ||
}; | ||
const tryUnsubscribeProject = () => { | ||
if (projectSubscription !== null) { | ||
projectSubscription.unsubscribe(); | ||
} | ||
}; | ||
const onMainNext = (value) => { | ||
tryUnsubscribeProject(); | ||
projectSubscription === null || projectSubscription === void 0 ? void 0 : projectSubscription.unsubscribe(); | ||
projectCompleted = false; | ||
@@ -118,3 +113,3 @@ projectSubscription = project(value).subscribe({ | ||
unsubscribe: () => { | ||
tryUnsubscribeProject(); | ||
projectSubscription === null || projectSubscription === void 0 ? void 0 : projectSubscription.unsubscribe(); | ||
mainSubscription.unsubscribe(); | ||
@@ -131,3 +126,3 @@ }, | ||
return stream => { | ||
const newStream = ksCreateStream(stream.behaviour, ({ next, complete }) => { | ||
const newStream = stream.behaviour(({ next, complete }) => { | ||
let isCompleted = false; | ||
@@ -145,5 +140,5 @@ const mainSubscription = stream.subscribe({ | ||
let isTerminated = false; | ||
const notifierSubscription = lazySubscription(); | ||
const _notifier = _lazy(notifier); | ||
const unsubscribe = () => { | ||
notifierSubscription.unsubscribe(); | ||
_notifier.unsubscribe(); | ||
mainSubscription.unsubscribe(); | ||
@@ -158,6 +153,6 @@ }; | ||
}; | ||
notifierSubscription.resolve(notifier.subscribe({ | ||
_notifier.subscribe({ | ||
next: terminate, | ||
complete: terminate, | ||
})); | ||
}); | ||
return { unsubscribe }; | ||
@@ -175,8 +170,8 @@ }); | ||
return stream => { | ||
return ksCreateStream(stream.behaviour, ({ next, complete }) => { | ||
return stream.behaviour(({ next, complete }) => { | ||
let counter = 0; | ||
const subscription = lazySubscription(); | ||
const _stream = _lazy(stream); | ||
const onComplete = () => { | ||
complete(); | ||
subscription.unsubscribe(); | ||
_stream.unsubscribe(); | ||
}; | ||
@@ -189,6 +184,6 @@ const onNext = value => { | ||
}; | ||
return subscription.resolve(stream.subscribe({ | ||
return _stream.subscribe({ | ||
next: onNext, | ||
complete: onComplete, | ||
})); | ||
}); | ||
}); | ||
@@ -202,7 +197,7 @@ }; | ||
return stream => { | ||
return ksCreateStream(stream.behaviour, ({ next, complete }) => { | ||
const subscription = lazySubscription(); | ||
return stream.behaviour(({ next, complete }) => { | ||
const _stream = _lazy(stream); | ||
const onComplete = () => { | ||
complete(); | ||
subscription.unsubscribe(); | ||
_stream.unsubscribe(); | ||
}; | ||
@@ -217,6 +212,6 @@ const onNext = value => { | ||
}; | ||
return subscription.resolve(stream.subscribe({ | ||
return _stream.subscribe({ | ||
next: onNext, | ||
complete: onComplete, | ||
})); | ||
}); | ||
}); | ||
@@ -230,3 +225,3 @@ }; | ||
return stream => { | ||
return ksCreateStream(stream.behaviour, ({ next, complete }) => { | ||
return stream.behaviour(({ next, complete }) => { | ||
const timers = new Set(); | ||
@@ -267,3 +262,3 @@ const clearTimers = () => { | ||
return stream => { | ||
return ksCreateStream(stream.behaviour, ({ next, complete }) => { | ||
return stream.behaviour(({ next, complete }) => { | ||
let timeoutId; | ||
@@ -306,3 +301,3 @@ let lastValue = none; | ||
return stream => { | ||
return ksCreateStream(stream.behaviour, ({ next, complete }) => { | ||
return stream.behaviour(({ next, complete }) => { | ||
let executedTime = Number.MIN_SAFE_INTEGER; | ||
@@ -341,3 +336,3 @@ let lastValue = none; | ||
return stream => { | ||
return ksCreateStream(stream.behaviour, ({ next, complete }) => { | ||
return stream.behaviour(({ next, complete }) => { | ||
let prevValue = none; | ||
@@ -361,3 +356,3 @@ return stream.subscribe({ | ||
return stream => { | ||
return ksCreateStream(stream.behaviour, ({ next, complete }) => { | ||
return stream.behaviour(({ next, complete }) => { | ||
let acc = seed; | ||
@@ -364,0 +359,0 @@ return stream.subscribe({ |
{ | ||
"name": "@keiii/k-stream", | ||
"version": "0.0.32", | ||
"version": "0.0.33", | ||
"description": "K-Stream is a functional reactive stream library for TypeScript", | ||
@@ -5,0 +5,0 @@ "main": "./dist/index.js", |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
62010
1036