@keiii/k-stream
Advanced tools
Comparing version 0.0.31 to 0.0.32
@@ -1,7 +0,7 @@ | ||
import { CompleteFn, NextFn, Stream } from './core'; | ||
export declare type BehaviourSubject<T> = Stream<T> & { | ||
value: T; | ||
readonly next: NextFn<T>; | ||
readonly complete: CompleteFn; | ||
import { Complete, Next, Stream } from './core'; | ||
export declare type BehaviourSubject<A> = Stream<A> & { | ||
value: A; | ||
readonly next: Next<A>; | ||
readonly complete: Complete; | ||
}; | ||
export declare const ksBehaviourSubject: <T>(initValue: T, behaviour?: import("./core").KsBehaviour) => BehaviourSubject<T>; | ||
export declare const ksBehaviourSubject: <A>(initValue: A, behaviour?: import("./core").KsBehaviour) => BehaviourSubject<A>; |
@@ -10,11 +10,9 @@ import { ksCreateStream, ksShareReplay, noop, } from './core'; | ||
}); | ||
const next = (value) => { | ||
const next = value => { | ||
if (state.isCompleted) { | ||
console.warn('Logic error: Ignore call next on completed stream.'); | ||
return console.warn('Logic error: Ignore call `next` on completed stream.'); | ||
} | ||
else { | ||
state.current = value; | ||
if (subjectObserver !== null) { | ||
subjectObserver.next(value); | ||
} | ||
state.current = value; | ||
if (subjectObserver !== null) { | ||
subjectObserver.next(value); | ||
} | ||
@@ -30,5 +28,3 @@ }; | ||
} | ||
else { | ||
return stream.subscribe(observer); | ||
} | ||
return stream.subscribe(observer); | ||
}, | ||
@@ -35,0 +31,0 @@ pipe: stream.pipe, |
export declare type Unsubscribable = { | ||
readonly unsubscribe: () => void; | ||
}; | ||
export declare type NextFn<T> = (value: T) => void; | ||
export declare type CompleteFn = () => void; | ||
export declare type Observer<T> = { | ||
readonly next: NextFn<T>; | ||
readonly complete: CompleteFn; | ||
export declare type Next<A> = (value: A) => void; | ||
export declare type Complete = () => void; | ||
export declare type Observer<A> = { | ||
readonly next?: Next<A>; | ||
readonly complete?: Complete; | ||
}; | ||
export declare type SubscribeFn<T> = (observer: Observer<T>) => Unsubscribable; | ||
export declare type SubscribePartialFn<T> = (partialObserver: Partial<Observer<T>>) => Unsubscribable; | ||
export declare type TransformFn<T, O> = (stream: Stream<T>) => Stream<O>; | ||
export declare type PipeFn<T> = <O>(transformFn: TransformFn<T, O>) => Stream<O>; | ||
export declare type Observable<T> = { | ||
readonly subscribe: SubscribePartialFn<T>; | ||
export declare type Subscriber<A> = (observer: Observer<A>) => Unsubscribable; | ||
export declare type SubscriberRequired<A> = (observer: Required<Observer<A>>) => Unsubscribable; | ||
export declare type Transformer<A, B> = (stream: Stream<A>) => Stream<B>; | ||
export declare type Pipe<A> = <B>(transformer: Transformer<A, B>) => Stream<B>; | ||
export declare type Observable<A> = { | ||
readonly subscribe: Subscriber<A>; | ||
}; | ||
export declare type Stream<T> = Observable<T> & { | ||
readonly pipe: PipeFn<T>; | ||
export declare type Stream<A> = Observable<A> & { | ||
readonly pipe: Pipe<A>; | ||
readonly behaviour: KsBehaviour; | ||
readonly lastValue?: T; | ||
readonly lastValue?: A; | ||
}; | ||
export declare type KsBehaviour = <T>(subscribeFn: SubscribeFn<T>) => Stream<T>; | ||
export declare type KsBehaviour = <A>(subscriber: SubscriberRequired<A>) => Stream<A>; | ||
export declare const noop: () => void; | ||
export declare type Scheduler = { | ||
schedule: (handler: () => void, ms: number) => Unsubscribable; | ||
}; | ||
export declare const asyncScheduler: Scheduler; | ||
export declare const lazySubscription: () => { | ||
resolve: (subscription: Unsubscribable) => Unsubscribable; | ||
unsubscribe: () => void; | ||
}; | ||
/** | ||
@@ -36,6 +44,6 @@ * Create source on each subscription. | ||
export declare const ksShareReplay: KsBehaviour; | ||
export declare const ksCreateStream: <T>(b: KsBehaviour, f: SubscribeFn<T>) => Stream<T>; | ||
export declare const ksCreateStream: <A>(behaviour: KsBehaviour, subscriber: SubscriberRequired<A>) => Stream<A>; | ||
/** | ||
* Combine transformers. | ||
*/ | ||
export declare const ksPipe: <A, B, C>(t1: TransformFn<A, B>, t2: TransformFn<B, C>) => TransformFn<A, C>; | ||
export declare const ksPipe: <A, B, C>(fab: Transformer<A, B>, fbc: Transformer<B, C>) => Transformer<A, C>; |
105
dist/core.js
@@ -1,18 +0,37 @@ | ||
import { Some, None } from './ts-option'; | ||
export const noop = () => { }; | ||
import { some, none, isSome } from './option'; | ||
export const noop = () => void 0; | ||
export const asyncScheduler = { | ||
schedule: (handler, ms) => { | ||
const t = setTimeout(handler, ms); | ||
return { unsubscribe: () => clearTimeout(t) }; | ||
}, | ||
}; | ||
export const lazySubscription = () => { | ||
let resolve; | ||
const promise = new Promise(r => { | ||
resolve = r; | ||
}); | ||
return { | ||
resolve: (subscription) => { | ||
resolve(subscription); | ||
return subscription; | ||
}, | ||
unsubscribe: () => { | ||
promise.then(subscription => subscription.unsubscribe()); | ||
}, | ||
}; | ||
}; | ||
/** | ||
* Create source on each subscription. | ||
*/ | ||
export const ksCold = (subscribeFn) => { | ||
const subscribe = (observer) => { | ||
export const ksCold = (subscriber) => { | ||
const subscribe = observer => { | ||
let isCompleted = false; | ||
return subscribeFn({ | ||
return subscriber({ | ||
next: value => { | ||
var _a; | ||
if (isCompleted) { | ||
console.warn('Logic error: Ignore call next on completed stream.'); | ||
return console.warn('Logic error: Ignore call `next` on completed stream.'); | ||
} | ||
else { | ||
(_a = observer.next) === null || _a === void 0 ? void 0 : _a.call(observer, value); | ||
} | ||
(_a = observer.next) === null || _a === void 0 ? void 0 : _a.call(observer, value); | ||
}, | ||
@@ -22,8 +41,6 @@ complete: () => { | ||
if (isCompleted) { | ||
console.warn('Logic error: Ignore call complete on completed stream.'); | ||
return console.warn('Logic error: Ignore call `complete` on completed stream.'); | ||
} | ||
else { | ||
isCompleted = true; | ||
(_a = observer.complete) === null || _a === void 0 ? void 0 : _a.call(observer); | ||
} | ||
isCompleted = true; | ||
(_a = observer.complete) === null || _a === void 0 ? void 0 : _a.call(observer); | ||
}, | ||
@@ -34,3 +51,3 @@ }); | ||
subscribe, | ||
pipe: transformFn => transformFn(stream), | ||
pipe: transformer => transformer(stream), | ||
behaviour: ksCold, | ||
@@ -40,32 +57,24 @@ }; | ||
}; | ||
const createShareStream = (subscribeFn, replay) => { | ||
const createShareStream = (subscriber, replay) => { | ||
let isCompleted = false; | ||
let lastValue = None(); | ||
let lastValue = none; | ||
let subscription = null; | ||
const observersMap = new Map(); | ||
const onNext = (value) => { | ||
const onNext = value => { | ||
if (isCompleted) { | ||
console.warn('Logic error: Ignore call next on completed stream.'); | ||
return console.warn('Logic error: Ignore call `next` on completed stream.'); | ||
} | ||
else { | ||
if (replay) { | ||
lastValue = Some(value); | ||
} | ||
for (const { next } of observersMap.values()) { | ||
next === null || next === void 0 ? void 0 : next(value); | ||
} | ||
if (replay) { | ||
lastValue = some(value); | ||
} | ||
observersMap.forEach(observer => { var _a; return (_a = observer.next) === null || _a === void 0 ? void 0 : _a.call(observer, value); }); | ||
}; | ||
const onComplete = () => { | ||
if (isCompleted) { | ||
console.warn('Logic error: Ignore call complete on completed stream.'); | ||
return console.warn('Logic error: Ignore call `complete` on completed stream.'); | ||
} | ||
else { | ||
isCompleted = true; | ||
for (const { complete } of observersMap.values()) { | ||
complete === null || complete === void 0 ? void 0 : complete(); | ||
} | ||
} | ||
isCompleted = true; | ||
observersMap.forEach(observer => { var _a; return (_a = observer.complete) === null || _a === void 0 ? void 0 : _a.call(observer); }); | ||
}; | ||
const subscribe = (observer) => { | ||
const subscribe = observer => { | ||
var _a; | ||
@@ -75,6 +84,6 @@ if (isCompleted) { | ||
} | ||
if (replay && lastValue._tag === 'Some') { | ||
(_a = observer.next) === null || _a === void 0 ? void 0 : _a.call(observer, lastValue.some); | ||
if (replay && isSome(lastValue)) { | ||
(_a = observer.next) === null || _a === void 0 ? void 0 : _a.call(observer, lastValue.value); | ||
} | ||
const subscribeId = Object.freeze({}); | ||
const subscribeId = Symbol(); | ||
const unsubscribe = () => { | ||
@@ -84,3 +93,3 @@ observersMap.delete(subscribeId); | ||
if (replay) { | ||
lastValue = None(); | ||
lastValue = none; | ||
} | ||
@@ -96,3 +105,3 @@ if (subscription !== null) { | ||
if (subscription === null) { | ||
subscription = subscribeFn({ | ||
subscription = subscriber({ | ||
next: onNext, | ||
@@ -106,6 +115,6 @@ complete: onComplete, | ||
subscribe, | ||
pipe: transformFn => transformFn(stream), | ||
pipe: transformer => transformer(stream), | ||
behaviour: replay ? ksShareReplay : ksShare, | ||
get lastValue() { | ||
return lastValue._tag === 'Some' ? lastValue.some : undefined; | ||
return isSome(lastValue) ? lastValue.value : undefined; | ||
}, | ||
@@ -118,18 +127,16 @@ }; | ||
*/ | ||
export const ksShare = (f) => { | ||
return createShareStream(f, false); | ||
}; | ||
export const ksShare = f => createShareStream(f, false); | ||
/** | ||
* Share source and replay last emissions on subscription. | ||
*/ | ||
export const ksShareReplay = (f) => { | ||
return createShareStream(f, true); | ||
export const ksShareReplay = f => createShareStream(f, true); | ||
export const ksCreateStream = (behaviour, subscriber) => { | ||
return behaviour(subscriber); | ||
}; | ||
export const ksCreateStream = (b, f) => b(f); | ||
/** | ||
* Combine transformers. | ||
*/ | ||
export const ksPipe = (t1, t2) => { | ||
return (s) => s.pipe(t1).pipe(t2); | ||
export const ksPipe = (fab, fbc) => { | ||
return stream => stream.pipe(fab).pipe(fbc); | ||
}; | ||
//# sourceMappingURL=core.js.map |
import { Observable, Stream } from './core'; | ||
import { Option } from './ts-option'; | ||
import { Result } from './ts-result'; | ||
import { Option } from './option'; | ||
import { Either } from './either'; | ||
/** | ||
* Observable that immediately completes. | ||
*/ | ||
export declare const ksEmpty: <T>() => Stream<T>; | ||
export declare const ksEmpty: <A>() => Stream<A>; | ||
/** | ||
* Emit variable amount of values in a sequence and then emits a complete notification. | ||
*/ | ||
export declare const ksOf: <T>(value: T, behaviour?: import("./core").KsBehaviour) => Stream<T>; | ||
export declare const ksOf: <A>(value: A, behaviour?: import("./core").KsBehaviour) => Stream<A>; | ||
/** | ||
* Subscribe to observables in order as previous completes. | ||
*/ | ||
export declare const ksConcat: <T1, T2>(stream1: Stream<T1>, stream2: Stream<T2>) => Stream<T1 | T2>; | ||
export declare const ksConcat: <A, B>(stream_a: Stream<A>, stream_b: Stream<B>) => Stream<A | B>; | ||
/** | ||
* Turn multiple observables into a single observable. | ||
*/ | ||
export declare const ksMerge: <T1, T2>(stream1: Stream<T1>, stream2: Stream<T2>) => Stream<T1 | T2>; | ||
export declare const ksMerge: <A, B>(stream_a: Stream<A>, stream_b: Stream<B>) => Stream<A | B>; | ||
/** | ||
* After all observables emit, emit values as an array. | ||
*/ | ||
export declare const ksZip: <T1, T2>(stream1: Stream<T1>, stream2: Stream<T2>) => Stream<[T1, T2]>; | ||
export declare const ksTimeout: (ms: number, behaviour?: import("./core").KsBehaviour) => Stream<number>; | ||
export declare const ksInterval: (ms: number, behaviour?: import("./core").KsBehaviour) => Stream<number>; | ||
export declare const ksZip: <A, B>(stream_a: Stream<A>, stream_b: Stream<B>) => Stream<[A, B]>; | ||
export declare const ksTimeout: (ms: number, behaviour?: import("./core").KsBehaviour, scheduler?: import("./core").Scheduler) => Stream<number>; | ||
export declare const ksInterval: (ms: number, behaviour?: import("./core").KsBehaviour, scheduler?: import("./core").Scheduler) => Stream<number>; | ||
export declare const ksPeriodic: (ms: number, behaviour?: import("./core").KsBehaviour) => Stream<number>; | ||
@@ -30,8 +30,8 @@ /** | ||
*/ | ||
export declare const ksCombineLatest: <T1, T2>(stream1: Stream<T1>, stream2: Stream<T2>) => Stream<[T1, T2]>; | ||
export declare const ksCombineLatest: <A, B>(stream_a: Stream<A>, stream_b: Stream<B>) => Stream<[A, B]>; | ||
/** | ||
* When all observables complete, emit the last emitted value from each. | ||
*/ | ||
export declare const ksForkJoin: <T1, T2>(stream1: Stream<T1>, stream2: Stream<T2>) => Stream<[T1, T2]>; | ||
export declare const ksFromPromise: <T, E>(promise: Promise<T>, behaviour?: import("./core").KsBehaviour) => Stream<Result<T, E>>; | ||
export declare const ksToPromise: <T>(o: Observable<T>) => Promise<Option<T>>; | ||
export declare const ksForkJoin: <A, B>(stream_a: Stream<A>, stream_b: Stream<B>) => Stream<[A, B]>; | ||
export declare const ksFromPromise: <A, E>(promise: Promise<A>, behaviour?: import("./core").KsBehaviour) => Stream<Either<E, A>>; | ||
export declare const ksToPromise: <A>(observable: Observable<A>) => Promise<Option<A>>; |
@@ -1,5 +0,5 @@ | ||
import { ksCold, ksCreateStream, noop, } from './core'; | ||
import { asyncScheduler, ksCold, ksCreateStream, lazySubscription, noop, } from './core'; | ||
import { ksMap } from './transformers'; | ||
import { None, Some } from './ts-option'; | ||
import { Err, Ok } from './ts-result'; | ||
import { isSome, none, some } from './option'; | ||
import { left, right } from './either'; | ||
/** | ||
@@ -27,20 +27,15 @@ * Observable that immediately completes. | ||
*/ | ||
export const ksConcat = (stream1, stream2) => { | ||
return ksCreateStream(stream1.behaviour, ({ next, complete }) => { | ||
let subscription2 = null; | ||
const subscription1 = stream1.subscribe({ | ||
export const ksConcat = (stream_a, stream_b) => { | ||
return ksCreateStream(stream_a.behaviour, ({ next, complete }) => { | ||
const subscription_b = lazySubscription(); | ||
const subscription_a = stream_a.subscribe({ | ||
next, | ||
complete: () => { | ||
subscription2 = stream2.subscribe({ next, complete }); | ||
subscription_b.resolve(stream_b.subscribe({ next, complete })); | ||
}, | ||
}); | ||
const tryUnsubscribeSecond = () => { | ||
if (subscription2 !== null) { | ||
subscription2.unsubscribe(); | ||
} | ||
}; | ||
return { | ||
unsubscribe: () => { | ||
subscription1.unsubscribe(); | ||
tryUnsubscribeSecond(); | ||
subscription_b.unsubscribe(); | ||
subscription_a.unsubscribe(); | ||
}, | ||
@@ -53,31 +48,33 @@ }; | ||
*/ | ||
export const ksMerge = (stream1, stream2) => { | ||
return ksCreateStream(stream1.behaviour, ({ next, complete }) => { | ||
let completed1 = false; | ||
let completed2 = false; | ||
export const ksMerge = (stream_a, stream_b) => { | ||
return ksCreateStream(stream_a.behaviour, ({ next, complete }) => { | ||
let completed_a = false; | ||
let completed_b = false; | ||
const subscription_a = lazySubscription(); | ||
const subscription_b = lazySubscription(); | ||
const unsubscribe = () => { | ||
subscription_b.unsubscribe(); | ||
subscription_a.unsubscribe(); | ||
}; | ||
const tryComplete = () => { | ||
if (completed1 && completed2) { | ||
if (completed_a && completed_b) { | ||
complete(); | ||
unsubscribe(); | ||
} | ||
}; | ||
const subscription1 = stream1.subscribe({ | ||
subscription_a.resolve(stream_a.subscribe({ | ||
next, | ||
complete: () => { | ||
completed1 = true; | ||
completed_a = true; | ||
tryComplete(); | ||
}, | ||
}); | ||
const subscription2 = stream2.subscribe({ | ||
})); | ||
subscription_b.resolve(stream_b.subscribe({ | ||
next, | ||
complete: () => { | ||
completed2 = true; | ||
completed_b = true; | ||
tryComplete(); | ||
}, | ||
}); | ||
return { | ||
unsubscribe: () => { | ||
subscription1.unsubscribe(); | ||
subscription2.unsubscribe(); | ||
}, | ||
}; | ||
})); | ||
return { unsubscribe }; | ||
}); | ||
@@ -88,48 +85,50 @@ }; | ||
*/ | ||
export const ksZip = (stream1, stream2) => { | ||
return ksCreateStream(stream1.behaviour, ({ next, complete }) => { | ||
let completed1 = false; | ||
let completed2 = false; | ||
const queue1 = []; | ||
const queue2 = []; | ||
export const ksZip = (stream_a, stream_b) => { | ||
return ksCreateStream(stream_a.behaviour, ({ next, complete }) => { | ||
let completed_a = false; | ||
let completed_b = false; | ||
const queue_a = []; | ||
const queue_b = []; | ||
const subscription_a = lazySubscription(); | ||
const subscription_b = lazySubscription(); | ||
const unsubscribe = () => { | ||
subscription_b.unsubscribe(); | ||
subscription_a.unsubscribe(); | ||
}; | ||
const tryNext = () => { | ||
if (queue1.length > 0 && queue2.length > 0) { | ||
next([queue1.shift(), queue2.shift()]); | ||
if (queue_a.length > 0 && queue_b.length > 0) { | ||
next([queue_a.shift(), queue_b.shift()]); | ||
} | ||
}; | ||
const tryComplete = () => { | ||
if ((completed1 && queue1.length === 0) || | ||
(completed2 && queue2.length === 0)) { | ||
if ((completed_a && queue_a.length === 0) || | ||
(completed_b && queue_b.length === 0)) { | ||
complete(); | ||
unsubscribe(); | ||
} | ||
}; | ||
const subscription1 = stream1.subscribe({ | ||
subscription_a.resolve(stream_a.subscribe({ | ||
next: value => { | ||
queue1.push(value); | ||
queue_a.push(value); | ||
tryNext(); | ||
}, | ||
complete: () => { | ||
completed1 = true; | ||
completed_a = true; | ||
tryComplete(); | ||
}, | ||
}); | ||
const subscription2 = stream2.subscribe({ | ||
})); | ||
subscription_b.resolve(stream_b.subscribe({ | ||
next: value => { | ||
queue2.push(value); | ||
queue_b.push(value); | ||
tryNext(); | ||
}, | ||
complete: () => { | ||
completed2 = true; | ||
completed_b = true; | ||
tryComplete(); | ||
}, | ||
}); | ||
return { | ||
unsubscribe: () => { | ||
subscription1.unsubscribe(); | ||
subscription2.unsubscribe(); | ||
}, | ||
}; | ||
})); | ||
return { unsubscribe }; | ||
}); | ||
}; | ||
export const ksTimeout = (ms, behaviour = ksCold) => { | ||
export const ksTimeout = (ms, behaviour = ksCold, scheduler = asyncScheduler) => { | ||
return ksCreateStream(behaviour, ({ next, complete }) => { | ||
@@ -140,12 +139,18 @@ const handler = () => { | ||
}; | ||
const timeoutId = setTimeout(handler, ms); | ||
return { unsubscribe: () => clearTimeout(timeoutId) }; | ||
return scheduler.schedule(handler, ms); | ||
}); | ||
}; | ||
export const ksInterval = (ms, behaviour = ksCold) => { | ||
export const ksInterval = (ms, behaviour = ksCold, scheduler = asyncScheduler) => { | ||
return ksCreateStream(behaviour, ({ next }) => { | ||
let count = 0; | ||
const handler = () => next(count++); | ||
const intervalId = setInterval(handler, ms); | ||
return { unsubscribe: () => clearInterval(intervalId) }; | ||
let unsubscribe = noop; | ||
const tick = () => { | ||
unsubscribe = scheduler.schedule(handler, ms).unsubscribe; | ||
}; | ||
const handler = () => { | ||
next(count++); | ||
tick(); | ||
}; | ||
tick(); | ||
return { unsubscribe: () => unsubscribe() }; | ||
}); | ||
@@ -159,44 +164,46 @@ }; | ||
*/ | ||
export const ksCombineLatest = (stream1, stream2) => { | ||
return ksCreateStream(stream1.behaviour, ({ next, complete }) => { | ||
let completed1 = false; | ||
let completed2 = false; | ||
let value1 = None(); | ||
let value2 = None(); | ||
export const ksCombineLatest = (stream_a, stream_b) => { | ||
return ksCreateStream(stream_a.behaviour, ({ next, complete }) => { | ||
let completed_a = false; | ||
let completed_b = false; | ||
let value_a = none; | ||
let value_b = none; | ||
const subscription_a = lazySubscription(); | ||
const subscription_b = lazySubscription(); | ||
const unsubscribe = () => { | ||
subscription_b.unsubscribe(); | ||
subscription_a.unsubscribe(); | ||
}; | ||
const tryNext = () => { | ||
if (value1._tag === 'Some' && value2._tag === 'Some') { | ||
return next([value1.some, value2.some]); | ||
if (isSome(value_a) && isSome(value_b)) { | ||
return next([value_a.value, value_b.value]); | ||
} | ||
}; | ||
const tryComplete = () => { | ||
if (completed1 && completed2) { | ||
if (completed_a && completed_b) { | ||
complete(); | ||
unsubscribe(); | ||
} | ||
}; | ||
const subscription1 = stream1.subscribe({ | ||
subscription_a.resolve(stream_a.subscribe({ | ||
next: value => { | ||
value1 = Some(value); | ||
value_a = some(value); | ||
tryNext(); | ||
}, | ||
complete: () => { | ||
completed1 = true; | ||
completed_a = true; | ||
tryComplete(); | ||
}, | ||
}); | ||
const subscription2 = stream2.subscribe({ | ||
})); | ||
subscription_b.resolve(stream_b.subscribe({ | ||
next: value => { | ||
value2 = Some(value); | ||
value_b = some(value); | ||
tryNext(); | ||
}, | ||
complete: () => { | ||
completed2 = true; | ||
completed_b = true; | ||
tryComplete(); | ||
}, | ||
}); | ||
return { | ||
unsubscribe: () => { | ||
subscription1.unsubscribe(); | ||
subscription2.unsubscribe(); | ||
}, | ||
}; | ||
})); | ||
return { unsubscribe }; | ||
}); | ||
@@ -207,37 +214,36 @@ }; | ||
*/ | ||
export const ksForkJoin = (stream1, stream2) => { | ||
return ksCreateStream(stream1.behaviour, ({ next, complete }) => { | ||
let completed1 = false; | ||
let completed2 = false; | ||
let value1 = None(); | ||
let value2 = None(); | ||
export const ksForkJoin = (stream_a, stream_b) => { | ||
return ksCreateStream(stream_a.behaviour, ({ next, complete }) => { | ||
let completed_a = false; | ||
let completed_b = false; | ||
let value_a = none; | ||
let value_b = none; | ||
const subscription_a = lazySubscription(); | ||
const subscription_b = lazySubscription(); | ||
const unsubscribe = () => { | ||
subscription_b.unsubscribe(); | ||
subscription_a.unsubscribe(); | ||
}; | ||
const tryComplete = () => { | ||
if (completed1 && | ||
completed2 && | ||
value1._tag === 'Some' && | ||
value2._tag === 'Some') { | ||
next([value1.some, value2.some]); | ||
if (completed_a && completed_b && isSome(value_a) && isSome(value_b)) { | ||
next([value_a.value, value_b.value]); | ||
complete(); | ||
unsubscribe(); | ||
} | ||
}; | ||
const subscription1 = stream1.subscribe({ | ||
next: value => (value1 = Some(value)), | ||
subscription_a.resolve(stream_a.subscribe({ | ||
next: value => (value_a = some(value)), | ||
complete: () => { | ||
completed1 = true; | ||
completed_a = true; | ||
tryComplete(); | ||
}, | ||
}); | ||
const subscription2 = stream2.subscribe({ | ||
next: value => (value2 = Some(value)), | ||
})); | ||
subscription_b.resolve(stream_b.subscribe({ | ||
next: value => (value_b = some(value)), | ||
complete: () => { | ||
completed2 = true; | ||
completed_b = true; | ||
tryComplete(); | ||
}, | ||
}); | ||
return { | ||
unsubscribe: () => { | ||
subscription1.unsubscribe(); | ||
subscription2.unsubscribe(); | ||
}, | ||
}; | ||
})); | ||
return { unsubscribe }; | ||
}); | ||
@@ -251,3 +257,3 @@ }; | ||
if (on) { | ||
next(Ok(value)); | ||
next(right(value)); | ||
complete(); | ||
@@ -258,3 +264,3 @@ } | ||
if (on) { | ||
next(Err(err)); | ||
next(left(err)); | ||
complete(); | ||
@@ -266,11 +272,8 @@ } | ||
}; | ||
export const ksToPromise = (o) => { | ||
export const ksToPromise = (observable) => { | ||
return new Promise(resolve => { | ||
let result = None(); | ||
const s = o.subscribe({ | ||
next: value => (result = Some(value)), | ||
complete: () => { | ||
resolve(result); | ||
setTimeout(() => s.unsubscribe()); | ||
}, | ||
let result = none; | ||
observable.subscribe({ | ||
next: value => (result = some(value)), | ||
complete: () => resolve(result), | ||
}); | ||
@@ -277,0 +280,0 @@ }); |
@@ -6,3 +6,3 @@ export * from './core'; | ||
export * from './transformers'; | ||
export * from './ts-option'; | ||
export * from './ts-result'; | ||
export * from './option'; | ||
export * from './either'; |
@@ -6,4 +6,4 @@ export * from './core'; | ||
export * from './transformers'; | ||
export * from './ts-option'; | ||
export * from './ts-result'; | ||
export * from './option'; | ||
export * from './either'; | ||
//# sourceMappingURL=index.js.map |
@@ -1,6 +0,6 @@ | ||
import { CompleteFn, Stream } from './core'; | ||
export declare type Subject<T> = Stream<T> & { | ||
readonly next: (value: T) => void; | ||
readonly complete: CompleteFn; | ||
import { Complete, Stream } from './core'; | ||
export declare type Subject<A> = Stream<A> & { | ||
readonly next: (value: A) => void; | ||
readonly complete: Complete; | ||
}; | ||
export declare const ksSubject: <T>(behaviour?: import("./core").KsBehaviour) => Subject<T>; | ||
export declare const ksSubject: <A>(behaviour?: import("./core").KsBehaviour) => Subject<A>; |
@@ -5,6 +5,6 @@ import { ksCreateStream, noop, ksShare, } from './core'; | ||
let subjectObserver = null; | ||
const next = (value) => { | ||
if (!isCompleted) { | ||
subjectObserver === null || subjectObserver === void 0 ? void 0 : subjectObserver.next(value); | ||
} | ||
const next = value => { | ||
if (isCompleted) | ||
return; | ||
subjectObserver === null || subjectObserver === void 0 ? void 0 : subjectObserver.next(value); | ||
}; | ||
@@ -15,4 +15,4 @@ const complete = () => { | ||
}; | ||
const stream = ksCreateStream(behaviour, o => { | ||
subjectObserver = o; | ||
const stream = ksCreateStream(behaviour, observer => { | ||
subjectObserver = observer; | ||
return { unsubscribe: noop }; | ||
@@ -27,5 +27,3 @@ }); | ||
} | ||
else { | ||
return stream.subscribe(observer); | ||
} | ||
return stream.subscribe(observer); | ||
}, | ||
@@ -32,0 +30,0 @@ pipe: stream.pipe, |
@@ -1,55 +0,55 @@ | ||
import { KsBehaviour, Observer, Stream, TransformFn } from './core'; | ||
import { Option } from './ts-option'; | ||
export declare const ksChangeBehaviour: <T>(b: KsBehaviour) => TransformFn<T, T>; | ||
import { KsBehaviour, Observer, Stream, Transformer } from './core'; | ||
import { Option } from './option'; | ||
export declare const ksChangeBehaviour: <A>(behaviour: KsBehaviour) => Transformer<A, A>; | ||
/** | ||
* Apply projection with each value from source. | ||
*/ | ||
export declare const ksMap: <T, O>(project: (value: T) => O) => TransformFn<T, O>; | ||
export declare const ksMap: <A, B>(project: (value: A) => B) => Transformer<A, B>; | ||
/** | ||
* Map emissions to constant value. | ||
*/ | ||
export declare const ksMapTo: <T, O>(value: O) => TransformFn<T, O>; | ||
export declare const ksMapTo: <A, B>(value: B) => Transformer<A, B>; | ||
/** | ||
* Transparently perform actions or side-effects, such as logging. | ||
*/ | ||
export declare const ksTap: <T>(tapObserver: Partial<Observer<T>>) => TransformFn<T, T>; | ||
export declare const ksTap: <A>(observer: Observer<A>) => Transformer<A, A>; | ||
/** | ||
* Emit values that pass the provided condition. | ||
*/ | ||
export declare const ksFilter: <T, O>(select: (value: T) => Option<O>) => TransformFn<T, O>; | ||
export declare const ksFilterMap: <A, B>(select: (value: A) => Option<B>) => Transformer<A, B>; | ||
/** | ||
* Map to observable, complete previous inner observable, emit values. | ||
*/ | ||
export declare const ksSwitch: <T, O>(project: (value: T) => Stream<O>) => TransformFn<T, O>; | ||
export declare const ksSwitch: <A, B>(project: (value: A) => Stream<B>) => Transformer<A, B>; | ||
/** | ||
* Emit values until provided observable emits or completes. | ||
*/ | ||
export declare const ksTakeUntil: <T>(notifier: Stream<unknown>) => TransformFn<T, T>; | ||
export declare const ksTakeUntil: <A>(notifier: Stream<unknown>) => Transformer<A, A>; | ||
/** | ||
* Emit provided number of values before completing. | ||
*/ | ||
export declare const ksTake: <T>(count: number) => TransformFn<T, T>; | ||
export declare const ksTake: <A>(count: number) => Transformer<A, A>; | ||
/** | ||
* Emit values until provided expression is false. | ||
*/ | ||
export declare const ksTakeWhile: <T>(predicate: (value: T) => boolean) => TransformFn<T, T>; | ||
export declare const ksTakeWhile: <A>(predicate: (value: A) => boolean) => Transformer<A, A>; | ||
/** | ||
* Delay emitted values by given time. | ||
*/ | ||
export declare const ksDelay: <T>(delay: number) => TransformFn<T, T>; | ||
export declare const ksDelay: <A>(ms: number) => Transformer<A, A>; | ||
/** | ||
* Discard emitted values that take less than the specified time between output. | ||
*/ | ||
export declare const ksDebounce: <T>(dueTime: number) => TransformFn<T, T>; | ||
export declare const ksDebounce: <A>(dueTime: number) => Transformer<A, A>; | ||
/** | ||
* Emit first value then ignore for specified duration. | ||
*/ | ||
export declare const ksThrottle: <T>(duration: number) => TransformFn<T, T>; | ||
export declare const ksThrottle: <A>(duration: number) => Transformer<A, A>; | ||
/** | ||
* Emit the previous and current values as an array. | ||
*/ | ||
export declare const ksPairwise: <T>() => TransformFn<T, [T, T]>; | ||
export declare const ksPairwise: <A>() => Transformer<A, [A, A]>; | ||
/** | ||
* Reduce over time. | ||
*/ | ||
export declare const ksScan: <T, O>(accumulator: (acc: O, curr: T) => O, seed: O) => TransformFn<T, O>; | ||
export declare const ksScan: <A, B>(accumulator: (acc: B, curr: A) => B, seed: B) => Transformer<A, B>; |
@@ -1,5 +0,5 @@ | ||
import { ksCreateStream, } from './core'; | ||
import { Some, None } from './ts-option'; | ||
export const ksChangeBehaviour = (b) => { | ||
return s => ksCreateStream(b, s.subscribe); | ||
import { ksCreateStream, lazySubscription, } from './core'; | ||
import { some, none, isSome } from './option'; | ||
export const ksChangeBehaviour = (behaviour) => { | ||
return stream => ksCreateStream(behaviour, stream.subscribe); | ||
}; | ||
@@ -10,3 +10,3 @@ /** | ||
export const ksMap = (project) => { | ||
return (stream) => { | ||
return stream => { | ||
return ksCreateStream(stream.behaviour, ({ next, complete }) => { | ||
@@ -24,3 +24,3 @@ return stream.subscribe({ | ||
export const ksMapTo = (value) => { | ||
return (stream) => { | ||
return stream => { | ||
return ksCreateStream(stream.behaviour, ({ next, complete }) => { | ||
@@ -37,4 +37,4 @@ return stream.subscribe({ | ||
*/ | ||
export const ksTap = (tapObserver) => { | ||
return (stream) => { | ||
export const ksTap = (observer) => { | ||
return stream => { | ||
return ksCreateStream(stream.behaviour, ({ next, complete }) => { | ||
@@ -44,3 +44,3 @@ return stream.subscribe({ | ||
var _a; | ||
(_a = tapObserver.next) === null || _a === void 0 ? void 0 : _a.call(tapObserver, value); | ||
(_a = observer.next) === null || _a === void 0 ? void 0 : _a.call(observer, value); | ||
next(value); | ||
@@ -50,3 +50,3 @@ }, | ||
var _a; | ||
(_a = tapObserver.complete) === null || _a === void 0 ? void 0 : _a.call(tapObserver); | ||
(_a = observer.complete) === null || _a === void 0 ? void 0 : _a.call(observer); | ||
complete(); | ||
@@ -61,10 +61,10 @@ }, | ||
*/ | ||
export const ksFilter = (select) => { | ||
return (stream) => { | ||
export const ksFilterMap = (select) => { | ||
return stream => { | ||
return ksCreateStream(stream.behaviour, ({ next, complete }) => { | ||
return stream.subscribe({ | ||
next: (value) => { | ||
const o = select(value); | ||
if (o._tag === 'Some') { | ||
next(o.some); | ||
next: value => { | ||
const valueOptional = select(value); | ||
if (isSome(valueOptional)) { | ||
next(valueOptional.value); | ||
} | ||
@@ -81,3 +81,3 @@ }, | ||
export const ksSwitch = (project) => { | ||
return (stream) => { | ||
return stream => { | ||
return ksCreateStream(stream.behaviour, ({ next, complete }) => { | ||
@@ -130,5 +130,4 @@ let projectSubscription = null; | ||
export const ksTakeUntil = (notifier) => { | ||
return (stream) => { | ||
return stream => { | ||
const newStream = ksCreateStream(stream.behaviour, ({ next, complete }) => { | ||
let notified = false; | ||
let isCompleted = false; | ||
@@ -145,25 +144,23 @@ const mainSubscription = stream.subscribe({ | ||
} | ||
else { | ||
const terminate = () => { | ||
if (!notified) { | ||
notified = true; | ||
complete(); | ||
mainSubscription.unsubscribe(); | ||
setTimeout(() => notifierSubscription.unsubscribe()); | ||
} | ||
}; | ||
const notifierSubscription = notifier.subscribe({ | ||
next: terminate, | ||
complete: terminate, | ||
}); | ||
return { | ||
unsubscribe: () => { | ||
notifierSubscription.unsubscribe(); | ||
mainSubscription.unsubscribe(); | ||
}, | ||
}; | ||
} | ||
let isTerminated = false; | ||
const notifierSubscription = lazySubscription(); | ||
const unsubscribe = () => { | ||
notifierSubscription.unsubscribe(); | ||
mainSubscription.unsubscribe(); | ||
}; | ||
const terminate = () => { | ||
if (isTerminated) | ||
return; | ||
isTerminated = true; | ||
complete(); | ||
unsubscribe(); | ||
}; | ||
notifierSubscription.resolve(notifier.subscribe({ | ||
next: terminate, | ||
complete: terminate, | ||
})); | ||
return { unsubscribe }; | ||
}); | ||
return Object.assign(Object.assign({}, newStream), { pipe: () => { | ||
throw 'Disallows the application of operators after takeUntil. Operators placed after takeUntil can effect subscription leaks.'; | ||
throw 'Disallows the application of operators after `takeUntil`. Operators placed after `takeUntil` can effect subscription leaks.'; | ||
} }); | ||
@@ -176,14 +173,20 @@ }; | ||
export const ksTake = (count) => { | ||
return (stream) => { | ||
return stream => { | ||
return ksCreateStream(stream.behaviour, ({ next, complete }) => { | ||
let counter = 0; | ||
const tryNext = value => { | ||
const subscription = lazySubscription(); | ||
const onComplete = () => { | ||
complete(); | ||
subscription.unsubscribe(); | ||
}; | ||
const onNext = value => { | ||
next(value); | ||
if (++counter >= count) { | ||
complete(); | ||
setTimeout(() => subscription.unsubscribe()); | ||
onComplete(); | ||
} | ||
}; | ||
const subscription = stream.subscribe({ next: tryNext, complete }); | ||
return subscription; | ||
return subscription.resolve(stream.subscribe({ | ||
next: onNext, | ||
complete: onComplete, | ||
})); | ||
}); | ||
@@ -196,15 +199,21 @@ }; | ||
export const ksTakeWhile = (predicate) => { | ||
return (stream) => { | ||
return stream => { | ||
return ksCreateStream(stream.behaviour, ({ next, complete }) => { | ||
return stream.subscribe({ | ||
next: value => { | ||
if (predicate(value)) { | ||
next(value); | ||
} | ||
else { | ||
complete(); | ||
} | ||
}, | ||
complete, | ||
}); | ||
const subscription = lazySubscription(); | ||
const onComplete = () => { | ||
complete(); | ||
subscription.unsubscribe(); | ||
}; | ||
const onNext = value => { | ||
if (predicate(value)) { | ||
next(value); | ||
} | ||
else { | ||
onComplete(); | ||
} | ||
}; | ||
return subscription.resolve(stream.subscribe({ | ||
next: onNext, | ||
complete: onComplete, | ||
})); | ||
}); | ||
@@ -216,10 +225,8 @@ }; | ||
*/ | ||
export const ksDelay = (delay) => { | ||
return (stream) => { | ||
export const ksDelay = (ms) => { | ||
return stream => { | ||
return ksCreateStream(stream.behaviour, ({ next, complete }) => { | ||
const timers = new Map(); | ||
const timers = new Set(); | ||
const clearTimers = () => { | ||
for (const t of timers.keys()) { | ||
clearTimeout(t); | ||
} | ||
timers.forEach(t => clearTimeout(t)); | ||
timers.clear(); | ||
@@ -232,4 +239,4 @@ }; | ||
timers.delete(t); | ||
}, delay); | ||
timers.set(t); | ||
}, ms); | ||
timers.add(t); | ||
}, | ||
@@ -240,4 +247,4 @@ complete: () => { | ||
timers.delete(t); | ||
}, delay); | ||
timers.set(t); | ||
}, ms); | ||
timers.add(t); | ||
}, | ||
@@ -258,14 +265,14 @@ }); | ||
export const ksDebounce = (dueTime) => { | ||
return (stream) => { | ||
return stream => { | ||
return ksCreateStream(stream.behaviour, ({ next, complete }) => { | ||
let timeoutId; | ||
let lastValue = None(); | ||
let lastValue = none; | ||
const tryNext = () => { | ||
if (lastValue._tag === 'Some') { | ||
next(lastValue.some); | ||
lastValue = None(); | ||
if (isSome(lastValue)) { | ||
next(lastValue.value); | ||
lastValue = none; | ||
} | ||
}; | ||
const debounceNext = value => { | ||
lastValue = Some(value); | ||
lastValue = some(value); | ||
clearTimeout(timeoutId); | ||
@@ -297,14 +304,14 @@ timeoutId = setTimeout(tryNext, dueTime); | ||
export const ksThrottle = (duration) => { | ||
return (stream) => { | ||
return stream => { | ||
return ksCreateStream(stream.behaviour, ({ next, complete }) => { | ||
let executedTime = Number.MIN_SAFE_INTEGER; | ||
let lastValue = None(); | ||
let lastValue = none; | ||
const tryNext = () => { | ||
if (lastValue._tag === 'Some') { | ||
next(lastValue.some); | ||
lastValue = None(); | ||
if (isSome(lastValue)) { | ||
next(lastValue.value); | ||
lastValue = none; | ||
} | ||
}; | ||
const throttleNext = value => { | ||
lastValue = Some(value); | ||
lastValue = some(value); | ||
const now = Date.now(); | ||
@@ -332,11 +339,11 @@ const diff = now - executedTime; | ||
export const ksPairwise = () => { | ||
return (o) => { | ||
return ksCreateStream(o.behaviour, ({ next, complete }) => { | ||
let prevValue = None(); | ||
return o.subscribe({ | ||
return stream => { | ||
return ksCreateStream(stream.behaviour, ({ next, complete }) => { | ||
let prevValue = none; | ||
return stream.subscribe({ | ||
next: value => { | ||
if (prevValue._tag === 'Some') { | ||
next([prevValue.some, value]); | ||
if (isSome(prevValue)) { | ||
next([prevValue.value, value]); | ||
} | ||
prevValue = Some(value); | ||
prevValue = some(value); | ||
}, | ||
@@ -352,6 +359,6 @@ complete, | ||
export const ksScan = (accumulator, seed) => { | ||
return (o) => { | ||
return ksCreateStream(o.behaviour, ({ next, complete }) => { | ||
return stream => { | ||
return ksCreateStream(stream.behaviour, ({ next, complete }) => { | ||
let acc = seed; | ||
return o.subscribe({ | ||
return stream.subscribe({ | ||
next: value => { | ||
@@ -358,0 +365,0 @@ acc = accumulator(acc, value); |
{ | ||
"name": "@keiii/k-stream", | ||
"version": "0.0.31", | ||
"version": "0.0.32", | ||
"description": "K-Stream is a functional reactive stream library for TypeScript", | ||
@@ -13,3 +13,2 @@ "main": "./dist/index.js", | ||
"test": "jest", | ||
"coveralls": "jest --coverage && cat ./coverage/lcov.info | ./node_modules/coveralls/bin/coveralls.js", | ||
"prettier": "prettier --write ./src/*.ts", | ||
@@ -32,10 +31,10 @@ "version-patch": "npm version patch" | ||
"devDependencies": { | ||
"@types/jest": "^25.2.3", | ||
"@types/jest": "^26.0.20", | ||
"coveralls": "^3.1.0", | ||
"jest": "^25.5.4", | ||
"prettier": "^2.0.5", | ||
"rxjs": "^6.6.2", | ||
"ts-jest": "^25.5.1", | ||
"typescript": "^3.9.7" | ||
"jest": "^26.6.3", | ||
"prettier": "^2.2.1", | ||
"rxjs": "^6.6.3", | ||
"ts-jest": "^26.4.4", | ||
"typescript": "^4.1.3" | ||
} | ||
} |
@@ -5,3 +5,2 @@ # Functional reactive stream library for TypeScript | ||
[](https://www.npmjs.com/package/@keiii/k-stream) | ||
[](https://travis-ci.com/KEIII/k-stream) | ||
[](https://coveralls.io/github/KEIII/k-stream?branch=master) | ||
@@ -15,11 +14,11 @@ | ||
```typescript | ||
import { ksPeriodic as periodic, ksShare as share, ksFilter as filter, ksTake as take, Some, None } from "@keiii/k-stream"; | ||
import { ksPeriodic, ksShare, ksFilterMap, ksTake, some, none } from "@keiii/k-stream"; | ||
const stream = periodic(100, share) | ||
.pipe(filter(n => n % 2 === 0 ? Some(n) : None(n))) | ||
.pipe(take(10)); | ||
const stream = ksPeriodic(100, ksShare) | ||
.pipe(ksFilterMap(n => (n % 2 === 0 ? some(n) : none))) | ||
.pipe(ksTake(10)); | ||
stream.subscribe({ | ||
next: console.log, | ||
complete: () => console.log("complete!"), | ||
complete: () => console.log('complete!'), | ||
}); | ||
@@ -29,6 +28,6 @@ ``` | ||
```typescript | ||
const stream = ksCreateStream<MouseEvent>(ksShare, ({ next, complete }) => { | ||
const handler = (e: MouseEvent) => next(e); | ||
document.addEventListener("click", handler); | ||
return { unsubscribe: () => document.removeEventListener("click", handler) }; | ||
const stream = ksCreateStream<MouseEvent>(ksShare, observer => { | ||
const handler = (e: MouseEvent) => observer.next(e); | ||
document.addEventListener('click', handler); | ||
return { unsubscribe: () => document.removeEventListener('click', handler) }; | ||
}); | ||
@@ -39,4 +38,4 @@ ``` | ||
- “Hot” streams stay “hot” after pipe usage (https://github.com/ReactiveX/rxjs/issues/1148) | ||
- Do not lose type information | ||
- RxJS similar | ||
- Type safe, no “any” | ||
- Looks like RxJS syntax | ||
@@ -43,0 +42,0 @@ ## ToDo |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
63663
1043
44