@reactive-js/rx
Advanced tools
Comparing version 0.0.11 to 0.0.12
@@ -5,2 +5,2 @@ export { ErrorLike, MulticastObservableLike, MulticastObservableResourceLike, ObserverLike, ObservableLike, ObservableResourceLike, SubscriberLike, SubjectLike, SubjectResourceLike, } from "./internal/interfaces"; | ||
export { createSubject } from "./internal/subject"; | ||
export { DelegatingSubscriber } from "./internal/delegatingSubscriber"; | ||
export { AbstractDelegatingSubscriber } from "./internal/delegatingSubscriber"; |
@@ -10,3 +10,3 @@ "use strict"; | ||
var delegatingSubscriber_1 = require("./internal/delegatingSubscriber"); | ||
exports.DelegatingSubscriber = delegatingSubscriber_1.DelegatingSubscriber; | ||
exports.AbstractDelegatingSubscriber = delegatingSubscriber_1.AbstractDelegatingSubscriber; | ||
//# sourceMappingURL=index.js.map |
@@ -12,9 +12,9 @@ import { DisposableLike, DisposableOrTeardown } from "@reactive-js/disposable"; | ||
get now(): number; | ||
add(disposable: DisposableOrTeardown, ...disposables: DisposableOrTeardown[]): void; | ||
add(disposable: DisposableOrTeardown, ...disposables: DisposableOrTeardown[]): this; | ||
abstract complete(_error?: ErrorLike): void; | ||
dispose(): void; | ||
abstract next(data: T): void; | ||
remove(disposable: DisposableOrTeardown, ...disposables: DisposableOrTeardown[]): void; | ||
remove(disposable: DisposableOrTeardown, ...disposables: DisposableOrTeardown[]): this; | ||
schedule(continuation: SchedulerContinuationLike, delay?: number): DisposableLike; | ||
} | ||
export declare const checkState: <T>(subscriber: SubscriberLike<T>) => void; |
@@ -19,2 +19,3 @@ "use strict"; | ||
this.subscription.add(disposable, ...disposables); | ||
return this; | ||
} | ||
@@ -26,2 +27,3 @@ dispose() { | ||
this.subscription.remove(disposable, ...disposables); | ||
return this; | ||
} | ||
@@ -28,0 +30,0 @@ schedule(continuation, delay) { |
import { ErrorLike, ObserverLike, SubscriberLike } from "./interfaces"; | ||
import { AbstractSubscriber } from "./abstractSubscriber"; | ||
export declare abstract class DelegatingSubscriber<TA, TB> extends AbstractSubscriber<TA> { | ||
export declare abstract class AbstractDelegatingSubscriber<TA, TB> extends AbstractSubscriber<TA> { | ||
readonly delegate: ObserverLike<TB>; | ||
private isStopped; | ||
private readonly source; | ||
constructor(delegate: SubscriberLike<TB>); | ||
@@ -11,6 +10,6 @@ get isSubscribed(): boolean; | ||
next(data: TA): void; | ||
protected abstract onComplete(error?: ErrorLike): void; | ||
protected abstract onNext(data: TA): void; | ||
private tryOnComplete; | ||
private tryOnNext; | ||
abstract completeUnsafe(error?: ErrorLike): void; | ||
abstract nextUnsafe(data: TA): void; | ||
private tryComplete; | ||
private tryNext; | ||
} |
@@ -5,3 +5,3 @@ "use strict"; | ||
const __DEV__ = process.env.NODE_ENV !== "production"; | ||
class DelegatingSubscriber extends abstractSubscriber_1.AbstractSubscriber { | ||
class AbstractDelegatingSubscriber extends abstractSubscriber_1.AbstractSubscriber { | ||
constructor(delegate) { | ||
@@ -11,4 +11,2 @@ super(delegate.scheduler || delegate, delegate.subscription || delegate); | ||
this.delegate = delegate; | ||
this.source = | ||
delegate instanceof DelegatingSubscriber ? delegate.source : delegate; | ||
this.add(() => { | ||
@@ -19,3 +17,3 @@ this.isStopped = true; | ||
get isSubscribed() { | ||
return this.source.isSubscribed; | ||
return this.delegate.isSubscribed; | ||
} | ||
@@ -28,3 +26,3 @@ complete(error) { | ||
this.isStopped = true; | ||
this.tryOnComplete(error); | ||
this.tryComplete(error); | ||
} | ||
@@ -37,8 +35,8 @@ } | ||
if (!this.isStopped) { | ||
this.tryOnNext(data); | ||
this.tryNext(data); | ||
} | ||
} | ||
tryOnComplete(error) { | ||
tryComplete(error) { | ||
try { | ||
this.onComplete(error); | ||
this.completeUnsafe(error); | ||
} | ||
@@ -49,5 +47,5 @@ catch (cause) { | ||
} | ||
tryOnNext(data) { | ||
tryNext(data) { | ||
try { | ||
this.onNext(data); | ||
this.nextUnsafe(data); | ||
} | ||
@@ -59,3 +57,3 @@ catch (cause) { | ||
} | ||
exports.DelegatingSubscriber = DelegatingSubscriber; | ||
exports.AbstractDelegatingSubscriber = AbstractDelegatingSubscriber; | ||
//# sourceMappingURL=delegatingSubscriber.js.map |
@@ -22,4 +22,3 @@ "use strict"; | ||
if (this.isComplete) { | ||
this.subscriber.remove(this.teardown); | ||
this.subscriber.complete(this.error); | ||
this.subscriber.remove(this.teardown).complete(this.error); | ||
} | ||
@@ -26,0 +25,0 @@ return; |
@@ -36,2 +36,3 @@ "use strict"; | ||
this.disposable.add(disposable, ...disposables); | ||
return this; | ||
} | ||
@@ -65,2 +66,3 @@ complete(error) { | ||
this.disposable.remove(disposable, ...disposables); | ||
return this; | ||
} | ||
@@ -67,0 +69,0 @@ subscribe(subscriber) { |
@@ -5,2 +5,2 @@ export { ErrorLike, MulticastObservableLike, MulticastObservableResourceLike, ObserverLike, ObservableLike, ObservableResourceLike, SubscriberLike, SubjectLike, SubjectResourceLike, } from "./internal/interfaces"; | ||
export { createSubject } from "./internal/subject"; | ||
export { DelegatingSubscriber } from "./internal/delegatingSubscriber"; | ||
export { AbstractDelegatingSubscriber } from "./internal/delegatingSubscriber"; |
export { subscribe } from "./internal/subscribe"; | ||
export { createObservable } from "./internal/createObservable"; | ||
export { createSubject } from "./internal/subject"; | ||
export { DelegatingSubscriber } from "./internal/delegatingSubscriber"; | ||
export { AbstractDelegatingSubscriber } from "./internal/delegatingSubscriber"; | ||
//# sourceMappingURL=index.js.map |
@@ -12,9 +12,9 @@ import { DisposableLike, DisposableOrTeardown } from "@reactive-js/disposable"; | ||
get now(): number; | ||
add(disposable: DisposableOrTeardown, ...disposables: DisposableOrTeardown[]): void; | ||
add(disposable: DisposableOrTeardown, ...disposables: DisposableOrTeardown[]): this; | ||
abstract complete(_error?: ErrorLike): void; | ||
dispose(): void; | ||
abstract next(data: T): void; | ||
remove(disposable: DisposableOrTeardown, ...disposables: DisposableOrTeardown[]): void; | ||
remove(disposable: DisposableOrTeardown, ...disposables: DisposableOrTeardown[]): this; | ||
schedule(continuation: SchedulerContinuationLike, delay?: number): DisposableLike; | ||
} | ||
export declare const checkState: <T>(subscriber: SubscriberLike<T>) => void; |
@@ -17,2 +17,3 @@ export class AbstractSubscriber { | ||
this.subscription.add(disposable, ...disposables); | ||
return this; | ||
} | ||
@@ -24,2 +25,3 @@ dispose() { | ||
this.subscription.remove(disposable, ...disposables); | ||
return this; | ||
} | ||
@@ -26,0 +28,0 @@ schedule(continuation, delay) { |
import { ErrorLike, ObserverLike, SubscriberLike } from "./interfaces"; | ||
import { AbstractSubscriber } from "./abstractSubscriber"; | ||
export declare abstract class DelegatingSubscriber<TA, TB> extends AbstractSubscriber<TA> { | ||
export declare abstract class AbstractDelegatingSubscriber<TA, TB> extends AbstractSubscriber<TA> { | ||
readonly delegate: ObserverLike<TB>; | ||
private isStopped; | ||
private readonly source; | ||
constructor(delegate: SubscriberLike<TB>); | ||
@@ -11,6 +10,6 @@ get isSubscribed(): boolean; | ||
next(data: TA): void; | ||
protected abstract onComplete(error?: ErrorLike): void; | ||
protected abstract onNext(data: TA): void; | ||
private tryOnComplete; | ||
private tryOnNext; | ||
abstract completeUnsafe(error?: ErrorLike): void; | ||
abstract nextUnsafe(data: TA): void; | ||
private tryComplete; | ||
private tryNext; | ||
} |
import { AbstractSubscriber, checkState } from "./abstractSubscriber"; | ||
const __DEV__ = process.env.NODE_ENV !== "production"; | ||
export class DelegatingSubscriber extends AbstractSubscriber { | ||
export class AbstractDelegatingSubscriber extends AbstractSubscriber { | ||
constructor(delegate) { | ||
@@ -8,4 +8,2 @@ super(delegate.scheduler || delegate, delegate.subscription || delegate); | ||
this.delegate = delegate; | ||
this.source = | ||
delegate instanceof DelegatingSubscriber ? delegate.source : delegate; | ||
this.add(() => { | ||
@@ -16,3 +14,3 @@ this.isStopped = true; | ||
get isSubscribed() { | ||
return this.source.isSubscribed; | ||
return this.delegate.isSubscribed; | ||
} | ||
@@ -25,3 +23,3 @@ complete(error) { | ||
this.isStopped = true; | ||
this.tryOnComplete(error); | ||
this.tryComplete(error); | ||
} | ||
@@ -34,8 +32,8 @@ } | ||
if (!this.isStopped) { | ||
this.tryOnNext(data); | ||
this.tryNext(data); | ||
} | ||
} | ||
tryOnComplete(error) { | ||
tryComplete(error) { | ||
try { | ||
this.onComplete(error); | ||
this.completeUnsafe(error); | ||
} | ||
@@ -46,5 +44,5 @@ catch (cause) { | ||
} | ||
tryOnNext(data) { | ||
tryNext(data) { | ||
try { | ||
this.onNext(data); | ||
this.nextUnsafe(data); | ||
} | ||
@@ -51,0 +49,0 @@ catch (cause) { |
@@ -20,4 +20,3 @@ class SafeObserver { | ||
if (this.isComplete) { | ||
this.subscriber.remove(this.teardown); | ||
this.subscriber.complete(this.error); | ||
this.subscriber.remove(this.teardown).complete(this.error); | ||
} | ||
@@ -24,0 +23,0 @@ return; |
@@ -34,2 +34,3 @@ import { createDisposable, } from "@reactive-js/disposable"; | ||
this.disposable.add(disposable, ...disposables); | ||
return this; | ||
} | ||
@@ -63,2 +64,3 @@ complete(error) { | ||
this.disposable.remove(disposable, ...disposables); | ||
return this; | ||
} | ||
@@ -65,0 +67,0 @@ subscribe(subscriber) { |
@@ -5,3 +5,3 @@ export { ErrorLike, MulticastObservableLike, MulticastObservableResourceLike, ObserverLike, ObservableLike, ObservableResourceLike, SubscriberLike, SubjectLike, SubjectResourceLike, } from "./internal/interfaces"; | ||
export { createSubject } from "./internal/subject"; | ||
export { DelegatingSubscriber } from "./internal/delegatingSubscriber"; | ||
export { AbstractDelegatingSubscriber } from "./internal/delegatingSubscriber"; | ||
//# sourceMappingURL=index.d.ts.map |
@@ -12,7 +12,7 @@ import { DisposableLike, DisposableOrTeardown } from "@reactive-js/disposable"; | ||
get now(): number; | ||
add(disposable: DisposableOrTeardown, ...disposables: DisposableOrTeardown[]): void; | ||
add(disposable: DisposableOrTeardown, ...disposables: DisposableOrTeardown[]): this; | ||
abstract complete(_error?: ErrorLike): void; | ||
dispose(): void; | ||
abstract next(data: T): void; | ||
remove(disposable: DisposableOrTeardown, ...disposables: DisposableOrTeardown[]): void; | ||
remove(disposable: DisposableOrTeardown, ...disposables: DisposableOrTeardown[]): this; | ||
schedule(continuation: SchedulerContinuationLike, delay?: number): DisposableLike; | ||
@@ -19,0 +19,0 @@ } |
import { ErrorLike, ObserverLike, SubscriberLike } from "./interfaces"; | ||
import { AbstractSubscriber } from "./abstractSubscriber"; | ||
export declare abstract class DelegatingSubscriber<TA, TB> extends AbstractSubscriber<TA> { | ||
export declare abstract class AbstractDelegatingSubscriber<TA, TB> extends AbstractSubscriber<TA> { | ||
readonly delegate: ObserverLike<TB>; | ||
private isStopped; | ||
private readonly source; | ||
constructor(delegate: SubscriberLike<TB>); | ||
@@ -11,7 +10,7 @@ get isSubscribed(): boolean; | ||
next(data: TA): void; | ||
protected abstract onComplete(error?: ErrorLike): void; | ||
protected abstract onNext(data: TA): void; | ||
private tryOnComplete; | ||
private tryOnNext; | ||
abstract completeUnsafe(error?: ErrorLike): void; | ||
abstract nextUnsafe(data: TA): void; | ||
private tryComplete; | ||
private tryNext; | ||
} | ||
//# sourceMappingURL=delegatingSubscriber.d.ts.map |
@@ -29,3 +29,3 @@ [@reactive-js/rx](../README.md) › [SubscriberLike](subscriberlike.md) | ||
* [DelegatingSubscriber](../classes/delegatingsubscriber.md) | ||
* [AbstractDelegatingSubscriber](../classes/abstractdelegatingsubscriber.md) | ||
@@ -32,0 +32,0 @@ ## Index |
@@ -9,3 +9,3 @@ [@reactive-js/rx](README.md) | ||
* [DelegatingSubscriber](classes/delegatingsubscriber.md) | ||
* [AbstractDelegatingSubscriber](classes/abstractdelegatingsubscriber.md) | ||
@@ -12,0 +12,0 @@ ### Interfaces |
{ | ||
"name": "@reactive-js/rx", | ||
"version": "0.0.11", | ||
"version": "0.0.12", | ||
"main": "dist/cjs/index.js", | ||
@@ -41,8 +41,8 @@ "module": "dist/esm5/index.js", | ||
"dependencies": { | ||
"@reactive-js/disposable": "^0.0.11", | ||
"@reactive-js/scheduler": "^0.0.11" | ||
"@reactive-js/disposable": "^0.0.12", | ||
"@reactive-js/scheduler": "^0.0.12" | ||
}, | ||
"devDependencies": { | ||
"@reactive-js/pipe": "^0.0.11", | ||
"@reactive-js/schedulers": "^0.0.11", | ||
"@reactive-js/pipe": "^0.0.12", | ||
"@reactive-js/schedulers": "^0.0.12", | ||
"@types/jest": "^24.0.23", | ||
@@ -72,3 +72,3 @@ "jest": "^24.9.0", | ||
}, | ||
"gitHead": "894e4863ea037c317967555ebc27ab66167cfc59" | ||
"gitHead": "66ee9bfeeb79e01c9770dc09829bedf292674ff0" | ||
} |
@@ -16,2 +16,2 @@ export { | ||
export { createSubject } from "./internal/subject"; | ||
export { DelegatingSubscriber } from "./internal/delegatingSubscriber"; | ||
export { AbstractDelegatingSubscriber } from "./internal/delegatingSubscriber"; |
@@ -36,3 +36,5 @@ import { DisposableLike, DisposableOrTeardown } from "@reactive-js/disposable"; | ||
this.subscription.add(disposable, ...disposables); | ||
return this; | ||
} | ||
abstract complete(_error?: ErrorLike): void; | ||
@@ -50,2 +52,3 @@ | ||
this.subscription.remove(disposable, ...disposables); | ||
return this; | ||
} | ||
@@ -52,0 +55,0 @@ |
@@ -22,3 +22,3 @@ import { DisposableOrTeardown } from "@reactive-js/disposable"; | ||
// The idea here is that an onSubscribe function may | ||
// call onNext from unscheduled sources such as event handlers. | ||
// call next from unscheduled sources such as event handlers. | ||
// So we marshall those events back to the scheduler. | ||
@@ -25,0 +25,0 @@ const observer = createSafeObserver(subscriber); |
@@ -11,8 +11,8 @@ import { ErrorLike, ObserverLike, SubscriberLike } from "./interfaces"; | ||
*/ | ||
export abstract class DelegatingSubscriber<TA, TB> extends AbstractSubscriber< | ||
TA | ||
export abstract class AbstractDelegatingSubscriber<TA, TB> extends AbstractSubscriber< | ||
TA | ||
> { | ||
readonly delegate: ObserverLike<TB>; | ||
private isStopped = false; | ||
private readonly source: SubscriberLike<any>; | ||
constructor(delegate: SubscriberLike<TB>) { | ||
@@ -26,5 +26,2 @@ super( | ||
this.source = | ||
delegate instanceof DelegatingSubscriber ? delegate.source : delegate; | ||
this.add(() => { | ||
@@ -37,3 +34,3 @@ this.isStopped = true; | ||
get isSubscribed() { | ||
return this.source.isSubscribed; | ||
return (this.delegate as SubscriberLike<unknown>).isSubscribed; | ||
} | ||
@@ -49,3 +46,3 @@ | ||
this.isStopped = true; | ||
this.tryOnComplete(error); | ||
this.tryComplete(error); | ||
} | ||
@@ -61,3 +58,3 @@ } | ||
if (!this.isStopped) { | ||
this.tryOnNext(data); | ||
this.tryNext(data); | ||
} | ||
@@ -72,3 +69,3 @@ } | ||
*/ | ||
protected abstract onComplete(error?: ErrorLike): void; | ||
abstract completeUnsafe(error?: ErrorLike): void; | ||
@@ -81,7 +78,7 @@ /** | ||
*/ | ||
protected abstract onNext(data: TA): void; | ||
abstract nextUnsafe(data: TA): void; | ||
private tryOnComplete(error?: ErrorLike) { | ||
private tryComplete(error?: ErrorLike) { | ||
try { | ||
this.onComplete(error); | ||
this.completeUnsafe(error); | ||
} catch (cause) { | ||
@@ -92,5 +89,5 @@ this.delegate.complete({ cause, parent: error } as ErrorLike); | ||
private tryOnNext(data: TA) { | ||
private tryNext(data: TA) { | ||
try { | ||
this.onNext(data); | ||
this.nextUnsafe(data); | ||
} catch (cause) { | ||
@@ -100,2 +97,2 @@ this.complete({ cause }); | ||
} | ||
} | ||
} |
@@ -27,4 +27,3 @@ import { SchedulerContinuationLike } from "@reactive-js/scheduler"; | ||
if (this.isComplete) { | ||
this.subscriber.remove(this.teardown); | ||
this.subscriber.complete(this.error); | ||
this.subscriber.remove(this.teardown).complete(this.error); | ||
} | ||
@@ -31,0 +30,0 @@ return; |
@@ -67,2 +67,3 @@ import { | ||
this.disposable.add(disposable, ...disposables); | ||
return this; | ||
} | ||
@@ -108,2 +109,3 @@ | ||
this.disposable.remove(disposable, ...disposables); | ||
return this; | ||
} | ||
@@ -114,3 +116,3 @@ | ||
// The idea here is that an onSubscribe function may | ||
// call onNext from unscheduled sources such as event handlers. | ||
// call next from unscheduled sources such as event handlers. | ||
// So we marshall those events back to the scheduler. | ||
@@ -117,0 +119,0 @@ const observer = createSafeObserver(subscriber); |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
434827
1501
+ Added@reactive-js/disposable@0.0.12(transitive)
+ Added@reactive-js/scheduler@0.0.12(transitive)
- Removed@reactive-js/disposable@0.0.11(transitive)
- Removed@reactive-js/scheduler@0.0.11(transitive)