@reactive-js/observable
Advanced tools
Comparing version 0.0.16 to 0.0.17
@@ -0,1 +1,2 @@ | ||
export { buffer } from "./internal/buffer"; | ||
export { lift } from "./internal/lift"; | ||
@@ -2,0 +3,0 @@ export { observe, onComplete, onError, onNext } from "./internal/observe"; |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
var buffer_1 = require("./internal/buffer"); | ||
exports.buffer = buffer_1.buffer; | ||
var lift_1 = require("./internal/lift"); | ||
@@ -4,0 +6,0 @@ exports.lift = lift_1.lift; |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
const disposable_1 = require("@reactive-js/disposable"); | ||
const rx_1 = require("@reactive-js/rx"); | ||
const fromArray_1 = require("./fromArray"); | ||
const observe_1 = require("./observe"); | ||
const pipe_1 = require("@reactive-js/pipe"); | ||
const defer_1 = require("./defer"); | ||
class ConcatObservable { | ||
constructor(observables) { | ||
class ConcatSubscriber extends rx_1.DelegatingSubscriber { | ||
constructor(delegate, observables) { | ||
super(delegate); | ||
this.observables = observables; | ||
this.innerSubscription = disposable_1.disposed; | ||
this.index = 0; | ||
} | ||
onNext(v) { | ||
this.subscriber.next(v); | ||
} | ||
onComplete(error) { | ||
const subscriber = this.subscriber; | ||
subscriber.remove(this.innerSubscription); | ||
complete(error) { | ||
if (error !== undefined) { | ||
subscriber.complete(error); | ||
this.delegate.complete(error); | ||
} | ||
else if (!this.subscribeNext()) { | ||
subscriber.complete(); | ||
else { | ||
const head = this.observables[this.index]; | ||
const hasNextObservable = head !== undefined; | ||
if (hasNextObservable) { | ||
this.index++; | ||
head.subscribe(this); | ||
} | ||
else { | ||
this.delegate.complete(); | ||
} | ||
} | ||
} | ||
subscribeNext() { | ||
const head = this.observables[this.index]; | ||
const hasNextObservable = head !== undefined; | ||
if (hasNextObservable) { | ||
this.index++; | ||
const subscriber = this.subscriber; | ||
this.innerSubscription = pipe_1.pipe(head, observe_1.observe(this), rx_1.subscribe(subscriber)); | ||
subscriber.add(this.innerSubscription); | ||
} | ||
return hasNextObservable; | ||
next(data) { | ||
this.delegate.next(data); | ||
} | ||
} | ||
class ConcatObservable { | ||
constructor(observables) { | ||
this.observables = observables; | ||
} | ||
subscribe(subscriber) { | ||
this.subscriber = subscriber; | ||
this.subscribeNext(); | ||
const concatSubscriber = new ConcatSubscriber(subscriber, this.observables); | ||
concatSubscriber.complete(); | ||
} | ||
} | ||
function concat(...observables) { | ||
return defer_1.defer(() => new ConcatObservable(observables)); | ||
return new ConcatObservable(observables); | ||
} | ||
@@ -47,0 +43,0 @@ exports.concat = concat; |
@@ -62,13 +62,16 @@ "use strict"; | ||
this.error = undefined; | ||
const observer = { | ||
onNext: (value) => { | ||
this.value = [value]; | ||
}, | ||
onComplete: e => { | ||
this.error = e; | ||
}, | ||
}; | ||
const subscription = pipe_1.pipe(observable, observe_1.observe(observer), rx_1.subscribe(scheduler)); | ||
const subscription = pipe_1.pipe(observable, observe_1.observe(this), rx_1.subscribe(scheduler)); | ||
scheduler.add(subscription); | ||
} | ||
onNext(value) { | ||
if (this.value === undefined) { | ||
this.value = [value]; | ||
} | ||
else { | ||
this.value[0] = value; | ||
} | ||
} | ||
onComplete(error) { | ||
this.error = error; | ||
} | ||
next() { | ||
@@ -75,0 +78,0 @@ disposable_1.throwIfDisposed(this.scheduler); |
@@ -35,11 +35,15 @@ "use strict"; | ||
}); | ||
exports.onError = (onError) => exports.observe({ | ||
onNext: ignore, | ||
onComplete: (error) => { | ||
class OnErrorObserver { | ||
constructor(onError) { | ||
this.onError = onError; | ||
} | ||
onComplete(error) { | ||
if (error !== undefined) { | ||
const { cause } = error; | ||
onError(cause); | ||
this.onError(cause); | ||
} | ||
}, | ||
}); | ||
} | ||
onNext(_) { } | ||
} | ||
exports.onError = (onError) => exports.observe(new OnErrorObserver(onError)); | ||
exports.onNext = (onNext) => exports.observe({ | ||
@@ -46,0 +50,0 @@ onNext, |
@@ -16,20 +16,20 @@ "use strict"; | ||
this.value = undefined; | ||
this.notifyNext = () => { | ||
const value = this.value; | ||
if (value !== undefined) { | ||
this.value = undefined; | ||
const [next] = value; | ||
this.setupDurationSubscription(next); | ||
try { | ||
this.delegate.next(next); | ||
} | ||
catch (cause) { | ||
this.delegate.complete({ cause }); | ||
} | ||
} | ||
}; | ||
this.add(this.durationSubscription); | ||
} | ||
notifyNext() { | ||
const value = this.value; | ||
if (value !== undefined) { | ||
this.value = undefined; | ||
const [next] = value; | ||
this.setupDurationSubscription(next); | ||
try { | ||
this.delegate.next(next); | ||
} | ||
catch (cause) { | ||
this.delegate.complete({ cause }); | ||
} | ||
} | ||
} | ||
setupDurationSubscription(next) { | ||
this.durationSubscription.disposable = pipe_1.pipe(this.durationSelector(next), observe_1.onComplete(this.notifyNext), rx_1.subscribe(this)); | ||
this.durationSubscription.disposable = pipe_1.pipe(this.durationSelector(next), observe_1.observe(this), rx_1.subscribe(this)); | ||
} | ||
@@ -61,2 +61,11 @@ complete(error) { | ||
} | ||
onComplete(error) { | ||
if (error !== undefined) { | ||
this.complete(error); | ||
} | ||
else { | ||
this.notifyNext(); | ||
} | ||
} | ||
onNext(_) { } | ||
} | ||
@@ -63,0 +72,0 @@ const throttleOperator = (durationSelector, mode) => subscriber => new ThrottleSubscriber(subscriber, durationSelector, mode); |
@@ -7,8 +7,8 @@ "use strict"; | ||
this.delay = delay; | ||
this.subscribe = (subscriber) => { | ||
const continuation = (_) => { | ||
subscriber.complete(this.error); | ||
}; | ||
subscriber.schedule(continuation, this.delay); | ||
} | ||
subscribe(subscriber) { | ||
const continuation = (_) => { | ||
subscriber.complete(this.error); | ||
}; | ||
subscriber.schedule(continuation, this.delay); | ||
} | ||
@@ -15,0 +15,0 @@ } |
@@ -19,6 +19,10 @@ "use strict"; | ||
if (!this.isDisposed) { | ||
this.durationSubscription.disposable = pipe_1.pipe(this.duration, observe_1.onComplete(error => this.complete(error)), rx_1.subscribe(this)); | ||
this.durationSubscription.disposable = pipe_1.pipe(this.duration, observe_1.observe(this), rx_1.subscribe(this)); | ||
this.delegate.next(data); | ||
} | ||
} | ||
onComplete(error) { | ||
this.complete(error); | ||
} | ||
onNext(_) { } | ||
} | ||
@@ -25,0 +29,0 @@ const operator = (duration) => subscriber => new TimeoutSubscriber(subscriber, duration); |
@@ -0,1 +1,2 @@ | ||
export { buffer } from "./internal/buffer"; | ||
export { lift } from "./internal/lift"; | ||
@@ -2,0 +3,0 @@ export { observe, onComplete, onError, onNext } from "./internal/observe"; |
@@ -0,1 +1,2 @@ | ||
export { buffer } from "./internal/buffer"; | ||
export { lift } from "./internal/lift"; | ||
@@ -2,0 +3,0 @@ export { observe, onComplete, onError, onNext } from "./internal/observe"; |
@@ -1,44 +0,40 @@ | ||
import { disposed } from "@reactive-js/disposable"; | ||
import { subscribe, } from "@reactive-js/rx"; | ||
import { DelegatingSubscriber, } from "@reactive-js/rx"; | ||
import { fromArray } from "./fromArray"; | ||
import { observe } from "./observe"; | ||
import { pipe } from "@reactive-js/pipe"; | ||
import { defer } from "./defer"; | ||
class ConcatObservable { | ||
constructor(observables) { | ||
class ConcatSubscriber extends DelegatingSubscriber { | ||
constructor(delegate, observables) { | ||
super(delegate); | ||
this.observables = observables; | ||
this.innerSubscription = disposed; | ||
this.index = 0; | ||
} | ||
onNext(v) { | ||
this.subscriber.next(v); | ||
} | ||
onComplete(error) { | ||
const subscriber = this.subscriber; | ||
subscriber.remove(this.innerSubscription); | ||
complete(error) { | ||
if (error !== undefined) { | ||
subscriber.complete(error); | ||
this.delegate.complete(error); | ||
} | ||
else if (!this.subscribeNext()) { | ||
subscriber.complete(); | ||
else { | ||
const head = this.observables[this.index]; | ||
const hasNextObservable = head !== undefined; | ||
if (hasNextObservable) { | ||
this.index++; | ||
head.subscribe(this); | ||
} | ||
else { | ||
this.delegate.complete(); | ||
} | ||
} | ||
} | ||
subscribeNext() { | ||
const head = this.observables[this.index]; | ||
const hasNextObservable = head !== undefined; | ||
if (hasNextObservable) { | ||
this.index++; | ||
const subscriber = this.subscriber; | ||
this.innerSubscription = pipe(head, observe(this), subscribe(subscriber)); | ||
subscriber.add(this.innerSubscription); | ||
} | ||
return hasNextObservable; | ||
next(data) { | ||
this.delegate.next(data); | ||
} | ||
} | ||
class ConcatObservable { | ||
constructor(observables) { | ||
this.observables = observables; | ||
} | ||
subscribe(subscriber) { | ||
this.subscriber = subscriber; | ||
this.subscribeNext(); | ||
const concatSubscriber = new ConcatSubscriber(subscriber, this.observables); | ||
concatSubscriber.complete(); | ||
} | ||
} | ||
export function concat(...observables) { | ||
return defer(() => new ConcatObservable(observables)); | ||
return new ConcatObservable(observables); | ||
} | ||
@@ -45,0 +41,0 @@ export function startWith(...values) { |
@@ -60,13 +60,16 @@ import { subscribe, } from "@reactive-js/rx"; | ||
this.error = undefined; | ||
const observer = { | ||
onNext: (value) => { | ||
this.value = [value]; | ||
}, | ||
onComplete: e => { | ||
this.error = e; | ||
}, | ||
}; | ||
const subscription = pipe(observable, observe(observer), subscribe(scheduler)); | ||
const subscription = pipe(observable, observe(this), subscribe(scheduler)); | ||
scheduler.add(subscription); | ||
} | ||
onNext(value) { | ||
if (this.value === undefined) { | ||
this.value = [value]; | ||
} | ||
else { | ||
this.value[0] = value; | ||
} | ||
} | ||
onComplete(error) { | ||
this.error = error; | ||
} | ||
next() { | ||
@@ -73,0 +76,0 @@ throwIfDisposed(this.scheduler); |
@@ -33,11 +33,15 @@ import { DelegatingSubscriber, } from "@reactive-js/rx"; | ||
}); | ||
export const onError = (onError) => observe({ | ||
onNext: ignore, | ||
onComplete: (error) => { | ||
class OnErrorObserver { | ||
constructor(onError) { | ||
this.onError = onError; | ||
} | ||
onComplete(error) { | ||
if (error !== undefined) { | ||
const { cause } = error; | ||
onError(cause); | ||
this.onError(cause); | ||
} | ||
}, | ||
}); | ||
} | ||
onNext(_) { } | ||
} | ||
export const onError = (onError) => observe(new OnErrorObserver(onError)); | ||
export const onNext = (onNext) => observe({ | ||
@@ -44,0 +48,0 @@ onNext, |
@@ -5,3 +5,3 @@ import { createSerialDisposable, } from "@reactive-js/disposable"; | ||
import { lift } from "./lift"; | ||
import { onComplete } from "./observe"; | ||
import { observe } from "./observe"; | ||
import { pipe } from "@reactive-js/pipe"; | ||
@@ -15,20 +15,20 @@ class ThrottleSubscriber extends DelegatingSubscriber { | ||
this.value = undefined; | ||
this.notifyNext = () => { | ||
const value = this.value; | ||
if (value !== undefined) { | ||
this.value = undefined; | ||
const [next] = value; | ||
this.setupDurationSubscription(next); | ||
try { | ||
this.delegate.next(next); | ||
} | ||
catch (cause) { | ||
this.delegate.complete({ cause }); | ||
} | ||
} | ||
}; | ||
this.add(this.durationSubscription); | ||
} | ||
notifyNext() { | ||
const value = this.value; | ||
if (value !== undefined) { | ||
this.value = undefined; | ||
const [next] = value; | ||
this.setupDurationSubscription(next); | ||
try { | ||
this.delegate.next(next); | ||
} | ||
catch (cause) { | ||
this.delegate.complete({ cause }); | ||
} | ||
} | ||
} | ||
setupDurationSubscription(next) { | ||
this.durationSubscription.disposable = pipe(this.durationSelector(next), onComplete(this.notifyNext), subscribe(this)); | ||
this.durationSubscription.disposable = pipe(this.durationSelector(next), observe(this), subscribe(this)); | ||
} | ||
@@ -60,2 +60,11 @@ complete(error) { | ||
} | ||
onComplete(error) { | ||
if (error !== undefined) { | ||
this.complete(error); | ||
} | ||
else { | ||
this.notifyNext(); | ||
} | ||
} | ||
onNext(_) { } | ||
} | ||
@@ -62,0 +71,0 @@ const throttleOperator = (durationSelector, mode) => subscriber => new ThrottleSubscriber(subscriber, durationSelector, mode); |
@@ -5,8 +5,8 @@ class ThrowsObservable { | ||
this.delay = delay; | ||
this.subscribe = (subscriber) => { | ||
const continuation = (_) => { | ||
subscriber.complete(this.error); | ||
}; | ||
subscriber.schedule(continuation, this.delay); | ||
} | ||
subscribe(subscriber) { | ||
const continuation = (_) => { | ||
subscriber.complete(this.error); | ||
}; | ||
subscriber.schedule(continuation, this.delay); | ||
} | ||
@@ -13,0 +13,0 @@ } |
import { createSerialDisposable, } from "@reactive-js/disposable"; | ||
import { subscribe, DelegatingSubscriber, } from "@reactive-js/rx"; | ||
import { lift } from "./lift"; | ||
import { onComplete } from "./observe"; | ||
import { observe } from "./observe"; | ||
import { pipe } from "@reactive-js/pipe"; | ||
@@ -17,6 +17,10 @@ import { throws } from "./throws"; | ||
if (!this.isDisposed) { | ||
this.durationSubscription.disposable = pipe(this.duration, onComplete(error => this.complete(error)), subscribe(this)); | ||
this.durationSubscription.disposable = pipe(this.duration, observe(this), subscribe(this)); | ||
this.delegate.next(data); | ||
} | ||
} | ||
onComplete(error) { | ||
this.complete(error); | ||
} | ||
onNext(_) { } | ||
} | ||
@@ -23,0 +27,0 @@ const operator = (duration) => subscriber => new TimeoutSubscriber(subscriber, duration); |
@@ -0,1 +1,2 @@ | ||
export { buffer } from "./internal/buffer"; | ||
export { lift } from "./internal/lift"; | ||
@@ -2,0 +3,0 @@ export { observe, onComplete, onError, onNext } from "./internal/observe"; |
@@ -18,2 +18,3 @@ [@reactive-js/observable](README.md) | ||
* [buffer](README.md#const-buffer) | ||
* [combineLatest](README.md#combinelatest) | ||
@@ -67,2 +68,21 @@ * [concat](README.md#concat) | ||
### `Const` buffer | ||
▸ **buffer**<**T**>(`duration`: function | number, `maxBufferSize`: number): *[ObservableOperatorLike](interfaces/observableoperatorlike.md)‹T, keyof T[]›* | ||
**Type parameters:** | ||
▪ **T** | ||
**Parameters:** | ||
Name | Type | Default | | ||
------ | ------ | ------ | | ||
`duration` | function | number | - | | ||
`maxBufferSize` | number | Number.MAX_SAFE_INTEGER | | ||
**Returns:** *[ObservableOperatorLike](interfaces/observableoperatorlike.md)‹T, keyof T[]›* | ||
___ | ||
### combineLatest | ||
@@ -497,3 +517,3 @@ | ||
▸ **fromArray**<**T**>(`values`: ReadonlyArray‹T›, `delay`: number): *ObservableLike‹T›* | ||
▸ **fromArray**<**T**>(`values`: keyof T[], `delay`: number): *ObservableLike‹T›* | ||
@@ -508,3 +528,3 @@ **Type parameters:** | ||
------ | ------ | ------ | | ||
`values` | ReadonlyArray‹T› | - | | ||
`values` | keyof T[] | - | | ||
`delay` | number | 0 | | ||
@@ -1096,3 +1116,3 @@ | ||
▸ **throws**<**T**>(`cause`: unknown, `delay?`: undefined | number): *ObservableLike‹T›* | ||
▸ **throws**<**T**>(`cause`: unknown, `delay`: number): *ObservableLike‹T›* | ||
@@ -1105,6 +1125,6 @@ **Type parameters:** | ||
Name | Type | | ||
------ | ------ | | ||
`cause` | unknown | | ||
`delay?` | undefined | number | | ||
Name | Type | Default | | ||
------ | ------ | ------ | | ||
`cause` | unknown | - | | ||
`delay` | number | 0 | | ||
@@ -1111,0 +1131,0 @@ **Returns:** *ObservableLike‹T›* |
{ | ||
"name": "@reactive-js/observable", | ||
"version": "0.0.16", | ||
"version": "0.0.17", | ||
"main": "dist/cjs/index.js", | ||
@@ -41,7 +41,7 @@ "module": "dist/esm5/index.js", | ||
"dependencies": { | ||
"@reactive-js/disposable": "^0.0.16", | ||
"@reactive-js/pipe": "^0.0.16", | ||
"@reactive-js/rx": "^0.0.16", | ||
"@reactive-js/scheduler": "^0.0.16", | ||
"@reactive-js/schedulers": "^0.0.16" | ||
"@reactive-js/disposable": "^0.0.17", | ||
"@reactive-js/pipe": "^0.0.17", | ||
"@reactive-js/rx": "^0.0.17", | ||
"@reactive-js/scheduler": "^0.0.17", | ||
"@reactive-js/schedulers": "^0.0.17" | ||
}, | ||
@@ -73,3 +73,3 @@ "devDependencies": { | ||
}, | ||
"gitHead": "dcc81a6c83b8b8d8ac95dd07a395f1e5889bdeb8" | ||
"gitHead": "ad2250c08f04d3d48f0d6db2393444719ed21dc7" | ||
} |
@@ -0,1 +1,2 @@ | ||
export { buffer } from "./internal/buffer"; | ||
export { lift } from "./internal/lift"; | ||
@@ -2,0 +3,0 @@ export { observe, onComplete, onError, onNext } from "./internal/observe"; |
@@ -1,2 +0,1 @@ | ||
import { disposed } from "@reactive-js/disposable"; | ||
import { | ||
@@ -6,52 +5,44 @@ ErrorLike, | ||
SubscriberLike, | ||
subscribe, | ||
ObserverLike, | ||
DelegatingSubscriber, | ||
} from "@reactive-js/rx"; | ||
import { fromArray } from "./fromArray"; | ||
import { observe } from "./observe"; | ||
import { pipe } from "@reactive-js/pipe"; | ||
import { ObservableOperatorLike } from "./interfaces"; | ||
import { defer } from "./defer"; | ||
class ConcatObservable<T> implements ObservableLike<T>, ObserverLike<T> { | ||
private subscriber: SubscriberLike<T> | undefined; | ||
private innerSubscription = disposed; | ||
class ConcatSubscriber<T> extends DelegatingSubscriber<T, T> { | ||
private index = 0; | ||
constructor(private readonly observables: readonly ObservableLike<T>[]) {} | ||
onNext(v: T) { | ||
(this.subscriber as SubscriberLike<T>).next(v); | ||
constructor( | ||
delegate: SubscriberLike<T>, | ||
private readonly observables: readonly ObservableLike<T>[], | ||
) { | ||
super(delegate); | ||
} | ||
onComplete(error?: ErrorLike) { | ||
const subscriber = this.subscriber as SubscriberLike<T>; | ||
subscriber.remove(this.innerSubscription); | ||
complete(error?: ErrorLike) { | ||
if (error !== undefined) { | ||
this.delegate.complete(error); | ||
} else { | ||
const head = this.observables[this.index]; | ||
const hasNextObservable = head !== undefined; | ||
if (error !== undefined) { | ||
subscriber.complete(error); | ||
} else if (!this.subscribeNext()) { | ||
subscriber.complete(); | ||
if (hasNextObservable) { | ||
this.index++; | ||
head.subscribe(this); | ||
} else { | ||
this.delegate.complete(); | ||
} | ||
} | ||
} | ||
subscribeNext() { | ||
const head = this.observables[this.index]; | ||
const hasNextObservable = head !== undefined; | ||
next(data: T) { | ||
this.delegate.next(data); | ||
} | ||
} | ||
if (hasNextObservable) { | ||
this.index++; | ||
class ConcatObservable<T> implements ObservableLike<T> { | ||
constructor(private readonly observables: readonly ObservableLike<T>[]) {} | ||
const subscriber = this.subscriber as SubscriberLike<T>; | ||
this.innerSubscription = pipe(head, observe(this), subscribe(subscriber)); | ||
subscriber.add(this.innerSubscription); | ||
} | ||
return hasNextObservable; | ||
} | ||
subscribe(subscriber: SubscriberLike<T>) { | ||
this.subscriber = subscriber; | ||
this.subscribeNext(); | ||
const concatSubscriber = new ConcatSubscriber(subscriber, this.observables); | ||
concatSubscriber.complete(); | ||
} | ||
@@ -68,3 +59,3 @@ } | ||
): ObservableLike<T> { | ||
return defer(() => new ConcatObservable(observables)); | ||
return new ConcatObservable(observables); | ||
} | ||
@@ -71,0 +62,0 @@ |
@@ -12,2 +12,22 @@ import { ObservableLike, SubscriberLike } from "@reactive-js/rx"; | ||
private readonly continuation: SchedulerContinuationLike = shouldYield => { | ||
let error = undefined; | ||
try { | ||
const result = this.loop(shouldYield); | ||
if (result !== undefined) { | ||
return result; | ||
} | ||
} catch (cause) { | ||
error = { cause }; | ||
} | ||
(this.subscriber as SubscriberLike<T>).complete(error); | ||
return; | ||
}; | ||
private readonly continuationResult: SchedulerContinuationResultLike = { | ||
continuation: this.continuation, | ||
delay: this.delay, | ||
}; | ||
constructor( | ||
@@ -40,22 +60,2 @@ private readonly values: readonly T[], | ||
private readonly continuation: SchedulerContinuationLike = shouldYield => { | ||
let error = undefined; | ||
try { | ||
const result = this.loop(shouldYield); | ||
if (result !== undefined) { | ||
return result; | ||
} | ||
} catch (cause) { | ||
error = { cause }; | ||
} | ||
(this.subscriber as SubscriberLike<T>).complete(error); | ||
return; | ||
}; | ||
private readonly continuationResult: SchedulerContinuationResultLike = { | ||
continuation: this.continuation, | ||
delay: this.delay, | ||
}; | ||
subscribe(subscriber: SubscriberLike<T>) { | ||
@@ -62,0 +62,0 @@ this.subscriber = subscriber; |
@@ -11,2 +11,22 @@ import { ObservableLike, SubscriberLike } from "@reactive-js/rx"; | ||
private readonly continuation: SchedulerContinuationLike = shouldYield => { | ||
let error = undefined; | ||
try { | ||
const result = this.loop(shouldYield); | ||
if (result !== undefined) { | ||
return result; | ||
} | ||
} catch (cause) { | ||
error = { cause }; | ||
} | ||
(this.subscriber as SubscriberLike<T>).complete(error); | ||
return; | ||
}; | ||
private readonly continuationResult: SchedulerContinuationResultLike = { | ||
continuation: this.continuation, | ||
delay: this.delay, | ||
}; | ||
constructor( | ||
@@ -37,22 +57,2 @@ private readonly iterator: Iterator<T>, | ||
private readonly continuation: SchedulerContinuationLike = shouldYield => { | ||
let error = undefined; | ||
try { | ||
const result = this.loop(shouldYield); | ||
if (result !== undefined) { | ||
return result; | ||
} | ||
} catch (cause) { | ||
error = { cause }; | ||
} | ||
(this.subscriber as SubscriberLike<T>).complete(error); | ||
return; | ||
}; | ||
private readonly continuationResult: SchedulerContinuationResultLike = { | ||
continuation: this.continuation, | ||
delay: this.delay, | ||
}; | ||
subscribe(subscriber: SubscriberLike<T>) { | ||
@@ -59,0 +59,0 @@ this.subscriber = subscriber; |
@@ -12,2 +12,17 @@ import { ObservableLike, SubscriberLike } from "@reactive-js/rx"; | ||
private readonly continuation: SchedulerContinuationLike = shouldYield => { | ||
let error = undefined; | ||
try { | ||
const result = this.loop(shouldYield); | ||
if (result !== undefined) { | ||
return result; | ||
} | ||
} catch (cause) { | ||
error = { cause }; | ||
} | ||
(this.subscriber as SubscriberLike<T>).complete(error); | ||
return; | ||
}; | ||
constructor(private readonly values: ReadonlyArray<[number, T]>) {} | ||
@@ -39,17 +54,2 @@ | ||
private readonly continuation: SchedulerContinuationLike = shouldYield => { | ||
let error = undefined; | ||
try { | ||
const result = this.loop(shouldYield); | ||
if (result !== undefined) { | ||
return result; | ||
} | ||
} catch (cause) { | ||
error = { cause }; | ||
} | ||
(this.subscriber as SubscriberLike<T>).complete(error); | ||
return; | ||
}; | ||
subscribe(subscriber: SubscriberLike<T>) { | ||
@@ -56,0 +56,0 @@ this.subscriber = subscriber; |
@@ -11,2 +11,20 @@ import { ObservableLike, SubscriberLike } from "@reactive-js/rx"; | ||
private readonly continuation: SchedulerContinuationLike = ( | ||
shouldYield: () => boolean, | ||
) => { | ||
const subscriber = this.subscriber as SubscriberLike<T>; | ||
try { | ||
this.loop(shouldYield); | ||
} catch (cause) { | ||
subscriber.complete({ cause }); | ||
} | ||
return subscriber.isDisposed ? undefined : this.continuationResult; | ||
}; | ||
private readonly continuationResult: SchedulerContinuationResultLike = { | ||
continuation: this.continuation, | ||
delay: this.delay, | ||
}; | ||
constructor( | ||
@@ -31,20 +49,2 @@ private readonly generator: (acc: T) => T, | ||
private readonly continuation: SchedulerContinuationLike = ( | ||
shouldYield: () => boolean, | ||
) => { | ||
const subscriber = this.subscriber as SubscriberLike<T>; | ||
try { | ||
this.loop(shouldYield); | ||
} catch (cause) { | ||
subscriber.complete({ cause }); | ||
} | ||
return subscriber.isDisposed ? undefined : this.continuationResult; | ||
}; | ||
private readonly continuationResult: SchedulerContinuationResultLike = { | ||
continuation: this.continuation, | ||
delay: this.delay, | ||
}; | ||
subscribe(subscriber: SubscriberLike<T>) { | ||
@@ -51,0 +51,0 @@ this.subscriber = subscriber; |
@@ -85,3 +85,3 @@ import { | ||
class ObservableIteratorImpl<T> implements Iterator<T> { | ||
class ObservableIteratorImpl<T> implements Iterator<T>, ObserverLike<T> { | ||
private value: [T] | undefined = undefined; | ||
@@ -94,18 +94,18 @@ private error: ErrorLike | undefined = undefined; | ||
) { | ||
const observer: ObserverLike<T> = { | ||
onNext: (value: T) => { | ||
this.value = [value]; | ||
}, | ||
onComplete: e => { | ||
this.error = e; | ||
}, | ||
}; | ||
const subscription = pipe( | ||
observable, | ||
observe(observer), | ||
subscribe(scheduler), | ||
); | ||
const subscription = pipe(observable, observe(this), subscribe(scheduler)); | ||
scheduler.add(subscription); | ||
} | ||
onNext(value: T) { | ||
if (this.value === undefined) { | ||
this.value = [value]; | ||
} else { | ||
this.value[0] = value; | ||
} | ||
} | ||
onComplete(error?: ErrorLike) { | ||
this.error = error; | ||
} | ||
next(): IteratorResult<T> { | ||
@@ -126,3 +126,2 @@ throwIfDisposed(this.scheduler); | ||
if (done) { | ||
// Cleanup | ||
this.scheduler.dispose(); | ||
@@ -141,3 +140,3 @@ return iteratorDone; | ||
throw(e?: any): IteratorResult<T> { | ||
throw(e?: unknown): IteratorResult<T> { | ||
this.scheduler.dispose; | ||
@@ -144,0 +143,0 @@ if (e !== undefined) { |
@@ -61,14 +61,18 @@ import { | ||
class OnErrorObserver<T> implements ObserverLike<T> { | ||
constructor(private readonly onError: (err: unknown) => void) {} | ||
onComplete(error?: ErrorLike) { | ||
if (error !== undefined) { | ||
const { cause } = error; | ||
this.onError(cause); | ||
} | ||
} | ||
onNext(_: T) {} | ||
} | ||
export const onError = <T>( | ||
onError: (err: unknown) => void, | ||
): ObservableOperatorLike<T, T> => | ||
observe({ | ||
onNext: ignore, | ||
onComplete: (error?: ErrorLike) => { | ||
if (error !== undefined) { | ||
const { cause } = error; | ||
onError(cause); | ||
} | ||
}, | ||
}); | ||
): ObservableOperatorLike<T, T> => observe(new OnErrorObserver(onError)); | ||
@@ -75,0 +79,0 @@ export const onNext = <T>( |
@@ -11,2 +11,3 @@ import { | ||
SubscriberLike, | ||
ObserverLike, | ||
} from "@reactive-js/rx"; | ||
@@ -16,3 +17,3 @@ import { empty } from "./fromArray"; | ||
import { lift } from "./lift"; | ||
import { onComplete } from "./observe"; | ||
import { observe } from "./observe"; | ||
import { pipe } from "@reactive-js/pipe"; | ||
@@ -26,6 +27,18 @@ | ||
class ThrottleSubscriber<T> extends DelegatingSubscriber<T, T> { | ||
class ThrottleSubscriber<T> extends DelegatingSubscriber<T, T> | ||
implements ObserverLike<unknown> { | ||
private readonly durationSubscription: SerialDisposableLike = createSerialDisposable(); | ||
private value: [T] | undefined = undefined; | ||
private readonly notifyNext = () => { | ||
constructor( | ||
delegate: SubscriberLike<T>, | ||
private readonly durationSelector: (next: T) => ObservableLike<unknown>, | ||
private readonly mode: ThrottleMode, | ||
) { | ||
super(delegate); | ||
this.add(this.durationSubscription); | ||
} | ||
private notifyNext() { | ||
const value = this.value; | ||
@@ -44,12 +57,2 @@ if (value !== undefined) { | ||
} | ||
}; | ||
constructor( | ||
delegate: SubscriberLike<T>, | ||
private readonly durationSelector: (next: T) => ObservableLike<unknown>, | ||
private readonly mode: ThrottleMode, | ||
) { | ||
super(delegate); | ||
this.add(this.durationSubscription); | ||
} | ||
@@ -60,3 +63,3 @@ | ||
this.durationSelector(next), | ||
onComplete(this.notifyNext), | ||
observe(this), | ||
subscribe(this), | ||
@@ -95,2 +98,12 @@ ); | ||
} | ||
onComplete(error?: ErrorLike) { | ||
if (error !== undefined) { | ||
this.complete(error); | ||
} else { | ||
this.notifyNext(); | ||
} | ||
} | ||
onNext(_: unknown) {} | ||
} | ||
@@ -97,0 +110,0 @@ |
@@ -9,3 +9,3 @@ import { ObservableLike, SubscriberLike, ErrorLike } from "@reactive-js/rx"; | ||
subscribe = (subscriber: SubscriberLike<T>) => { | ||
subscribe(subscriber: SubscriberLike<T>) { | ||
const continuation = (_: () => boolean) => { | ||
@@ -16,3 +16,3 @@ subscriber.complete(this.error); | ||
subscriber.schedule(continuation, this.delay); | ||
}; | ||
} | ||
} | ||
@@ -19,0 +19,0 @@ |
@@ -10,6 +10,8 @@ import { | ||
ObservableLike, | ||
ObserverLike, | ||
ErrorLike, | ||
} from "@reactive-js/rx"; | ||
import { ObservableOperatorLike, SubscriberOperatorLike } from "./interfaces"; | ||
import { lift } from "./lift"; | ||
import { onComplete } from "./observe"; | ||
import { observe } from "./observe"; | ||
import { pipe } from "@reactive-js/pipe"; | ||
@@ -20,3 +22,4 @@ import { throws } from "./throws"; | ||
class TimeoutSubscriber<T> extends DelegatingSubscriber<T, T> { | ||
class TimeoutSubscriber<T> extends DelegatingSubscriber<T, T> | ||
implements ObserverLike<unknown> { | ||
private readonly durationSubscription: SerialDisposableLike = createSerialDisposable(); | ||
@@ -36,3 +39,3 @@ | ||
this.duration, | ||
onComplete(error => this.complete(error)), | ||
observe(this), | ||
subscribe(this), | ||
@@ -44,2 +47,8 @@ ); | ||
} | ||
onComplete(error?: ErrorLike) { | ||
this.complete(error); | ||
} | ||
onNext(_: unknown) {} | ||
} | ||
@@ -46,0 +55,0 @@ |
@@ -18,2 +18,3 @@ import { DisposableLike } from "@reactive-js/disposable"; | ||
if (Array.isArray(resources)) { | ||
// eslint-disable-next-line prefer-spread | ||
subscriber.add.apply(subscriber, resources as any); | ||
@@ -20,0 +21,0 @@ } else { |
@@ -14,2 +14,3 @@ import { | ||
import { | ||
buffer, | ||
combineLatest, | ||
@@ -108,2 +109,25 @@ concat, | ||
test("buffer", () => { | ||
const result = pipe( | ||
fromScheduledValues( | ||
[0, 1], | ||
[0, 2], | ||
[0, 3], | ||
[0, 4], | ||
[1, 1], | ||
[1, 2], | ||
[1, 3], | ||
[1, 4], | ||
), | ||
buffer(4, 3), | ||
toArray(createVirtualTimeSchedulerResource), | ||
); | ||
expect(result).toEqual([ | ||
[1, 2, 3], | ||
[4, 1, 2], | ||
[3, 4], | ||
]); | ||
}); | ||
test("combineLatest", () => { | ||
@@ -840,2 +864,3 @@ const result = pipe( | ||
expect(() => { | ||
// eslint-disable-next-line @typescript-eslint/no-unused-vars, no-empty | ||
for (const _ of iterable) { | ||
@@ -860,3 +885,3 @@ } | ||
expect((result as any).done).toBeTruthy(); | ||
expect(result.done).toBeTruthy(); | ||
}); | ||
@@ -869,3 +894,3 @@ | ||
expect((result as any).done).toBeTruthy(); | ||
expect(result.done).toBeTruthy(); | ||
}); | ||
@@ -872,0 +897,0 @@ }); |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is too big to display
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
1438482
386
6893
+ Added@reactive-js/disposable@0.0.17(transitive)
+ Added@reactive-js/pipe@0.0.17(transitive)
+ Added@reactive-js/rx@0.0.17(transitive)
+ Added@reactive-js/scheduler@0.0.17(transitive)
+ Added@reactive-js/schedulers@0.0.17(transitive)
- Removed@reactive-js/disposable@0.0.16(transitive)
- Removed@reactive-js/pipe@0.0.16(transitive)
- Removed@reactive-js/rx@0.0.16(transitive)
- Removed@reactive-js/scheduler@0.0.16(transitive)
- Removed@reactive-js/schedulers@0.0.16(transitive)
Updated@reactive-js/pipe@^0.0.17
Updated@reactive-js/rx@^0.0.17