@reactive-js/rx
Advanced tools
Comparing version 0.0.15 to 0.0.16
@@ -5,2 +5,2 @@ export { ErrorLike, MulticastObservableLike, MulticastObservableResourceLike, ObserverLike, ObservableLike, ObservableResourceLike, SubscriberLike, SubjectLike, SubjectResourceLike, } from "./internal/interfaces"; | ||
export { createSubject } from "./internal/subject"; | ||
export { AbstractDelegatingSubscriber } from "./internal/abstractSubscriber"; | ||
export { DelegatingSubscriber } from "./internal/subscriber"; |
@@ -9,4 +9,4 @@ "use strict"; | ||
exports.createSubject = subject_1.createSubject; | ||
var abstractSubscriber_1 = require("./internal/abstractSubscriber"); | ||
exports.AbstractDelegatingSubscriber = abstractSubscriber_1.AbstractDelegatingSubscriber; | ||
var subscriber_1 = require("./internal/subscriber"); | ||
exports.DelegatingSubscriber = subscriber_1.DelegatingSubscriber; | ||
//# sourceMappingURL=index.js.map |
import { DisposableLike, DisposableOrTeardown } from "@reactive-js/disposable"; | ||
import { SchedulerContinuationLike, SchedulerLike } from "@reactive-js/scheduler"; | ||
import { ErrorLike, SubscriberLike } from "./interfaces"; | ||
export declare abstract class AbstractSubscriber<T> implements SubscriberLike<T> { | ||
readonly scheduler: SchedulerLike; | ||
readonly disposable: DisposableLike; | ||
export declare class Subscriber<T> implements SubscriberLike<T> { | ||
private readonly scheduler; | ||
private readonly disposable; | ||
isDisposed: boolean; | ||
@@ -12,9 +12,9 @@ constructor(scheduler: SchedulerLike); | ||
add(disposable: DisposableOrTeardown, ...disposables: DisposableOrTeardown[]): this; | ||
abstract complete(error?: ErrorLike): void; | ||
complete(_?: ErrorLike): void; | ||
dispose(): void; | ||
abstract next(data: T): void; | ||
next(_: T): void; | ||
remove(disposable: DisposableOrTeardown, ...disposables: DisposableOrTeardown[]): this; | ||
schedule(continuation: SchedulerContinuationLike, delay?: number): DisposableLike; | ||
} | ||
export declare abstract class AbstractDelegatingSubscriber<TA, TB> extends AbstractSubscriber<TA> { | ||
export declare class DelegatingSubscriber<TA, TB> extends Subscriber<TA> { | ||
readonly delegate: SubscriberLike<TB>; | ||
@@ -21,0 +21,0 @@ constructor(delegate: SubscriberLike<TB>); |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
const disposable_1 = require("@reactive-js/disposable"); | ||
class AbstractSubscriber { | ||
class Subscriber { | ||
constructor(scheduler) { | ||
@@ -23,5 +23,9 @@ this.scheduler = scheduler; | ||
} | ||
complete(_) { | ||
this.dispose(); | ||
} | ||
dispose() { | ||
this.disposable.dispose(); | ||
} | ||
next(_) { } | ||
remove(disposable, ...disposables) { | ||
@@ -32,10 +36,15 @@ this.disposable.remove(disposable, ...disposables); | ||
schedule(continuation, delay) { | ||
const schedulerSubscription = this.scheduler.schedule(continuation, delay); | ||
this.add(schedulerSubscription); | ||
schedulerSubscription.add(() => this.remove(schedulerSubscription)); | ||
return schedulerSubscription; | ||
if (!this.isDisposed) { | ||
const schedulerSubscription = this.scheduler.schedule(continuation, delay); | ||
this.add(schedulerSubscription); | ||
schedulerSubscription.add(() => this.remove(schedulerSubscription)); | ||
return schedulerSubscription; | ||
} | ||
else { | ||
return disposable_1.disposed; | ||
} | ||
} | ||
} | ||
exports.AbstractSubscriber = AbstractSubscriber; | ||
class AbstractDelegatingSubscriber extends AbstractSubscriber { | ||
exports.Subscriber = Subscriber; | ||
class DelegatingSubscriber extends Subscriber { | ||
constructor(delegate) { | ||
@@ -53,3 +62,3 @@ super(delegate.scheduler || delegate); | ||
} | ||
exports.AbstractDelegatingSubscriber = AbstractDelegatingSubscriber; | ||
exports.DelegatingSubscriber = DelegatingSubscriber; | ||
//# sourceMappingURL=abstractSubscriber.js.map |
@@ -32,2 +32,4 @@ "use strict"; | ||
this.nextQueue.length = 0; | ||
this.isCompleted = false; | ||
this.error = undefined; | ||
}); | ||
@@ -39,13 +41,15 @@ } | ||
onComplete(error) { | ||
if (!(this.isCompleted && this.subscriber.isDisposed)) { | ||
this.isCompleted = true; | ||
this.error = error; | ||
this.scheduleDrainQueue(); | ||
if (this.isCompleted || this.subscriber.isDisposed) { | ||
return; | ||
} | ||
this.isCompleted = true; | ||
this.error = error; | ||
this.scheduleDrainQueue(); | ||
} | ||
onNext(data) { | ||
if (!(this.isCompleted && this.subscriber.isDisposed)) { | ||
this.nextQueue.push(data); | ||
this.scheduleDrainQueue(); | ||
if (this.isCompleted || this.subscriber.isDisposed) { | ||
return; | ||
} | ||
this.nextQueue.push(data); | ||
this.scheduleDrainQueue(); | ||
} | ||
@@ -52,0 +56,0 @@ scheduleDrainQueue() { |
@@ -16,4 +16,4 @@ "use strict"; | ||
class SubjectImpl { | ||
constructor(count) { | ||
this.count = count; | ||
constructor(replayCount) { | ||
this.replayCount = replayCount; | ||
this.disposable = disposable_1.createDisposable(); | ||
@@ -23,5 +23,3 @@ this.isCompleted = false; | ||
this.replayed = []; | ||
this.count = count; | ||
this.add(() => { | ||
this.isCompleted = true; | ||
this.observers.length = 0; | ||
@@ -45,6 +43,8 @@ this.replayed.length = 0; | ||
onComplete(error) { | ||
if (this.isCompleted) { | ||
if (this.isCompleted || this.isDisposed) { | ||
return; | ||
} | ||
this.pushNotification([2, error]); | ||
if (this.replayCount > 0) { | ||
this.pushNotification(2, error); | ||
} | ||
this.isCompleted = true; | ||
@@ -58,6 +58,8 @@ const observers = this.observers.slice(); | ||
onNext(data) { | ||
if (this.isCompleted) { | ||
if (this.isCompleted || this.isDisposed) { | ||
return; | ||
} | ||
this.pushNotification([1, data]); | ||
if (this.replayCount > 0) { | ||
this.pushNotification(1, data); | ||
} | ||
const observers = this.observers.slice(); | ||
@@ -94,5 +96,5 @@ for (const observer of observers) { | ||
} | ||
pushNotification(notif) { | ||
this.replayed.push(notif); | ||
if (this.replayed.length > this.count) { | ||
pushNotification(notif, value) { | ||
this.replayed.push([notif, value]); | ||
if (this.replayed.length > this.replayCount) { | ||
this.replayed.shift(); | ||
@@ -99,0 +101,0 @@ } |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
const abstractSubscriber_1 = require("./abstractSubscriber"); | ||
class AutoDisposingSubscriber extends abstractSubscriber_1.AbstractSubscriber { | ||
constructor(scheduler) { | ||
super(scheduler); | ||
} | ||
complete(_) { | ||
this.dispose(); | ||
} | ||
next(_) { } | ||
} | ||
const subscriber_1 = require("./subscriber"); | ||
exports.subscribe = (scheduler) => (observable) => { | ||
const subscriber = new AutoDisposingSubscriber(scheduler); | ||
const subscriber = new subscriber_1.Subscriber(scheduler); | ||
observable.subscribe(subscriber); | ||
return subscriber.disposable; | ||
return subscriber; | ||
}; | ||
//# sourceMappingURL=subscribe.js.map |
@@ -5,2 +5,2 @@ export { ErrorLike, MulticastObservableLike, MulticastObservableResourceLike, ObserverLike, ObservableLike, ObservableResourceLike, SubscriberLike, SubjectLike, SubjectResourceLike, } from "./internal/interfaces"; | ||
export { createSubject } from "./internal/subject"; | ||
export { AbstractDelegatingSubscriber } from "./internal/abstractSubscriber"; | ||
export { DelegatingSubscriber } from "./internal/subscriber"; |
export { subscribe } from "./internal/subscribe"; | ||
export { createObservable } from "./internal/createObservable"; | ||
export { createSubject } from "./internal/subject"; | ||
export { AbstractDelegatingSubscriber } from "./internal/abstractSubscriber"; | ||
export { DelegatingSubscriber } from "./internal/subscriber"; | ||
//# sourceMappingURL=index.js.map |
import { DisposableLike, DisposableOrTeardown } from "@reactive-js/disposable"; | ||
import { SchedulerContinuationLike, SchedulerLike } from "@reactive-js/scheduler"; | ||
import { ErrorLike, SubscriberLike } from "./interfaces"; | ||
export declare abstract class AbstractSubscriber<T> implements SubscriberLike<T> { | ||
readonly scheduler: SchedulerLike; | ||
readonly disposable: DisposableLike; | ||
export declare class Subscriber<T> implements SubscriberLike<T> { | ||
private readonly scheduler; | ||
private readonly disposable; | ||
isDisposed: boolean; | ||
@@ -12,9 +12,9 @@ constructor(scheduler: SchedulerLike); | ||
add(disposable: DisposableOrTeardown, ...disposables: DisposableOrTeardown[]): this; | ||
abstract complete(error?: ErrorLike): void; | ||
complete(_?: ErrorLike): void; | ||
dispose(): void; | ||
abstract next(data: T): void; | ||
next(_: T): void; | ||
remove(disposable: DisposableOrTeardown, ...disposables: DisposableOrTeardown[]): this; | ||
schedule(continuation: SchedulerContinuationLike, delay?: number): DisposableLike; | ||
} | ||
export declare abstract class AbstractDelegatingSubscriber<TA, TB> extends AbstractSubscriber<TA> { | ||
export declare class DelegatingSubscriber<TA, TB> extends Subscriber<TA> { | ||
readonly delegate: SubscriberLike<TB>; | ||
@@ -21,0 +21,0 @@ constructor(delegate: SubscriberLike<TB>); |
@@ -1,3 +0,3 @@ | ||
import { createDisposable, } from "@reactive-js/disposable"; | ||
export class AbstractSubscriber { | ||
import { createDisposable, disposed, } from "@reactive-js/disposable"; | ||
export class Subscriber { | ||
constructor(scheduler) { | ||
@@ -21,5 +21,9 @@ this.scheduler = scheduler; | ||
} | ||
complete(_) { | ||
this.dispose(); | ||
} | ||
dispose() { | ||
this.disposable.dispose(); | ||
} | ||
next(_) { } | ||
remove(disposable, ...disposables) { | ||
@@ -30,9 +34,14 @@ this.disposable.remove(disposable, ...disposables); | ||
schedule(continuation, delay) { | ||
const schedulerSubscription = this.scheduler.schedule(continuation, delay); | ||
this.add(schedulerSubscription); | ||
schedulerSubscription.add(() => this.remove(schedulerSubscription)); | ||
return schedulerSubscription; | ||
if (!this.isDisposed) { | ||
const schedulerSubscription = this.scheduler.schedule(continuation, delay); | ||
this.add(schedulerSubscription); | ||
schedulerSubscription.add(() => this.remove(schedulerSubscription)); | ||
return schedulerSubscription; | ||
} | ||
else { | ||
return disposed; | ||
} | ||
} | ||
} | ||
export class AbstractDelegatingSubscriber extends AbstractSubscriber { | ||
export class DelegatingSubscriber extends Subscriber { | ||
constructor(delegate) { | ||
@@ -39,0 +48,0 @@ super(delegate.scheduler || delegate); |
@@ -30,2 +30,4 @@ class SafeObserver { | ||
this.nextQueue.length = 0; | ||
this.isCompleted = false; | ||
this.error = undefined; | ||
}); | ||
@@ -37,13 +39,15 @@ } | ||
onComplete(error) { | ||
if (!(this.isCompleted && this.subscriber.isDisposed)) { | ||
this.isCompleted = true; | ||
this.error = error; | ||
this.scheduleDrainQueue(); | ||
if (this.isCompleted || this.subscriber.isDisposed) { | ||
return; | ||
} | ||
this.isCompleted = true; | ||
this.error = error; | ||
this.scheduleDrainQueue(); | ||
} | ||
onNext(data) { | ||
if (!(this.isCompleted && this.subscriber.isDisposed)) { | ||
this.nextQueue.push(data); | ||
this.scheduleDrainQueue(); | ||
if (this.isCompleted || this.subscriber.isDisposed) { | ||
return; | ||
} | ||
this.nextQueue.push(data); | ||
this.scheduleDrainQueue(); | ||
} | ||
@@ -50,0 +54,0 @@ scheduleDrainQueue() { |
@@ -14,4 +14,4 @@ import { createDisposable, } from "@reactive-js/disposable"; | ||
class SubjectImpl { | ||
constructor(count) { | ||
this.count = count; | ||
constructor(replayCount) { | ||
this.replayCount = replayCount; | ||
this.disposable = createDisposable(); | ||
@@ -21,5 +21,3 @@ this.isCompleted = false; | ||
this.replayed = []; | ||
this.count = count; | ||
this.add(() => { | ||
this.isCompleted = true; | ||
this.observers.length = 0; | ||
@@ -43,6 +41,8 @@ this.replayed.length = 0; | ||
onComplete(error) { | ||
if (this.isCompleted) { | ||
if (this.isCompleted || this.isDisposed) { | ||
return; | ||
} | ||
this.pushNotification([2, error]); | ||
if (this.replayCount > 0) { | ||
this.pushNotification(2, error); | ||
} | ||
this.isCompleted = true; | ||
@@ -56,6 +56,8 @@ const observers = this.observers.slice(); | ||
onNext(data) { | ||
if (this.isCompleted) { | ||
if (this.isCompleted || this.isDisposed) { | ||
return; | ||
} | ||
this.pushNotification([1, data]); | ||
if (this.replayCount > 0) { | ||
this.pushNotification(1, data); | ||
} | ||
const observers = this.observers.slice(); | ||
@@ -92,5 +94,5 @@ for (const observer of observers) { | ||
} | ||
pushNotification(notif) { | ||
this.replayed.push(notif); | ||
if (this.replayed.length > this.count) { | ||
pushNotification(notif, value) { | ||
this.replayed.push([notif, value]); | ||
if (this.replayed.length > this.replayCount) { | ||
this.replayed.shift(); | ||
@@ -97,0 +99,0 @@ } |
@@ -1,16 +0,7 @@ | ||
import { AbstractSubscriber } from "./abstractSubscriber"; | ||
class AutoDisposingSubscriber extends AbstractSubscriber { | ||
constructor(scheduler) { | ||
super(scheduler); | ||
} | ||
complete(_) { | ||
this.dispose(); | ||
} | ||
next(_) { } | ||
} | ||
import { Subscriber } from "./subscriber"; | ||
export const subscribe = (scheduler) => (observable) => { | ||
const subscriber = new AutoDisposingSubscriber(scheduler); | ||
const subscriber = new Subscriber(scheduler); | ||
observable.subscribe(subscriber); | ||
return subscriber.disposable; | ||
return subscriber; | ||
}; | ||
//# sourceMappingURL=subscribe.js.map |
@@ -5,3 +5,3 @@ export { ErrorLike, MulticastObservableLike, MulticastObservableResourceLike, ObserverLike, ObservableLike, ObservableResourceLike, SubscriberLike, SubjectLike, SubjectResourceLike, } from "./internal/interfaces"; | ||
export { createSubject } from "./internal/subject"; | ||
export { AbstractDelegatingSubscriber } from "./internal/abstractSubscriber"; | ||
export { DelegatingSubscriber } from "./internal/subscriber"; | ||
//# sourceMappingURL=index.d.ts.map |
import { DisposableLike, DisposableOrTeardown } from "@reactive-js/disposable"; | ||
import { SchedulerContinuationLike, SchedulerLike } from "@reactive-js/scheduler"; | ||
import { ErrorLike, SubscriberLike } from "./interfaces"; | ||
export declare abstract class AbstractSubscriber<T> implements SubscriberLike<T> { | ||
readonly scheduler: SchedulerLike; | ||
readonly disposable: DisposableLike; | ||
export declare class Subscriber<T> implements SubscriberLike<T> { | ||
private readonly scheduler; | ||
private readonly disposable; | ||
isDisposed: boolean; | ||
@@ -12,9 +12,9 @@ constructor(scheduler: SchedulerLike); | ||
add(disposable: DisposableOrTeardown, ...disposables: DisposableOrTeardown[]): this; | ||
abstract complete(error?: ErrorLike): void; | ||
complete(_?: ErrorLike): void; | ||
dispose(): void; | ||
abstract next(data: T): void; | ||
next(_: T): void; | ||
remove(disposable: DisposableOrTeardown, ...disposables: DisposableOrTeardown[]): this; | ||
schedule(continuation: SchedulerContinuationLike, delay?: number): DisposableLike; | ||
} | ||
export declare abstract class AbstractDelegatingSubscriber<TA, TB> extends AbstractSubscriber<TA> { | ||
export declare class DelegatingSubscriber<TA, TB> extends Subscriber<TA> { | ||
readonly delegate: SubscriberLike<TB>; | ||
@@ -21,0 +21,0 @@ constructor(delegate: SubscriberLike<TB>); |
@@ -23,3 +23,3 @@ [@reactive-js/rx](../README.md) › [SubscriberLike](subscriberlike.md) | ||
* [AbstractDelegatingSubscriber](../classes/abstractdelegatingsubscriber.md) | ||
* [DelegatingSubscriber](../classes/delegatingsubscriber.md) | ||
@@ -26,0 +26,0 @@ ## Index |
@@ -9,3 +9,3 @@ [@reactive-js/rx](README.md) | ||
* [AbstractDelegatingSubscriber](classes/abstractdelegatingsubscriber.md) | ||
* [DelegatingSubscriber](classes/delegatingsubscriber.md) | ||
@@ -12,0 +12,0 @@ ### Interfaces |
{ | ||
"name": "@reactive-js/rx", | ||
"version": "0.0.15", | ||
"version": "0.0.16", | ||
"main": "dist/cjs/index.js", | ||
@@ -41,8 +41,8 @@ "module": "dist/esm5/index.js", | ||
"dependencies": { | ||
"@reactive-js/disposable": "^0.0.15", | ||
"@reactive-js/scheduler": "^0.0.15" | ||
"@reactive-js/disposable": "^0.0.16", | ||
"@reactive-js/scheduler": "^0.0.16" | ||
}, | ||
"devDependencies": { | ||
"@reactive-js/pipe": "^0.0.15", | ||
"@reactive-js/schedulers": "^0.0.15", | ||
"@reactive-js/pipe": "^0.0.16", | ||
"@reactive-js/schedulers": "^0.0.16", | ||
"@types/jest": "^24.0.23", | ||
@@ -72,3 +72,3 @@ "jest": "^24.9.0", | ||
}, | ||
"gitHead": "bb6f825c233fcf3516c7b6ee146d262ac836e5f2" | ||
"gitHead": "dcc81a6c83b8b8d8ac95dd07a395f1e5889bdeb8" | ||
} |
@@ -16,2 +16,2 @@ export { | ||
export { createSubject } from "./internal/subject"; | ||
export { AbstractDelegatingSubscriber } from "./internal/abstractSubscriber"; | ||
export { DelegatingSubscriber } from "./internal/subscriber"; |
@@ -36,2 +36,4 @@ import { SchedulerContinuationLike } from "@reactive-js/scheduler"; | ||
this.nextQueue.length = 0; | ||
this.isCompleted = false; | ||
this.error = undefined; | ||
}); | ||
@@ -45,14 +47,18 @@ } | ||
onComplete(error?: ErrorLike) { | ||
if (!(this.isCompleted && this.subscriber.isDisposed)) { | ||
this.isCompleted = true; | ||
this.error = error; | ||
this.scheduleDrainQueue(); | ||
if (this.isCompleted || this.subscriber.isDisposed) { | ||
return; | ||
} | ||
this.isCompleted = true; | ||
this.error = error; | ||
this.scheduleDrainQueue(); | ||
} | ||
onNext(data: T) { | ||
if (!(this.isCompleted && this.subscriber.isDisposed)) { | ||
this.nextQueue.push(data); | ||
this.scheduleDrainQueue(); | ||
if (this.isCompleted || this.subscriber.isDisposed) { | ||
return; | ||
} | ||
this.nextQueue.push(data); | ||
this.scheduleDrainQueue(); | ||
} | ||
@@ -59,0 +65,0 @@ |
@@ -42,6 +42,4 @@ import { | ||
constructor(private readonly count: number) { | ||
this.count = count; | ||
constructor(private readonly replayCount: number) { | ||
this.add(() => { | ||
this.isCompleted = true; | ||
this.observers.length = 0; | ||
@@ -73,12 +71,14 @@ this.replayed.length = 0; | ||
onComplete(error?: ErrorLike) { | ||
if (this.isCompleted) { | ||
if (this.isCompleted || this.isDisposed) { | ||
return; | ||
} | ||
this.pushNotification([NotificationKind.Complete, error]); | ||
if (this.replayCount > 0) { | ||
this.pushNotification(NotificationKind.Complete, error); | ||
} | ||
this.isCompleted = true; | ||
const observers = this.observers.slice(); | ||
this.observers.length = 0; | ||
for (const observer of observers) { | ||
@@ -90,7 +90,9 @@ observer.onComplete(error); | ||
onNext(data: T) { | ||
if (this.isCompleted) { | ||
if (this.isCompleted || this.isDisposed) { | ||
return; | ||
} | ||
this.pushNotification([NotificationKind.Next, data]); | ||
if (this.replayCount > 0) { | ||
this.pushNotification(NotificationKind.Next, data); | ||
} | ||
@@ -143,5 +145,10 @@ const observers = this.observers.slice(); | ||
private pushNotification(notif: Notification<T>) { | ||
this.replayed.push(notif); | ||
if (this.replayed.length > this.count) { | ||
private pushNotification( | ||
notif: NotificationKind.Complete, | ||
error?: ErrorLike, | ||
): void; | ||
private pushNotification(notif: NotificationKind.Next, value: T): void; | ||
private pushNotification(notif: NotificationKind, value: any) { | ||
this.replayed.push([notif, value] as Notification<T>); | ||
if (this.replayed.length > this.replayCount) { | ||
this.replayed.shift(); | ||
@@ -148,0 +155,0 @@ } |
import { DisposableLike } from "@reactive-js/disposable"; | ||
import { OperatorLike } from "@reactive-js/pipe"; | ||
import { SchedulerLike } from "@reactive-js/scheduler"; | ||
import { ErrorLike, ObservableLike, SubscriberLike } from "./interfaces"; | ||
import { AbstractSubscriber } from "./abstractSubscriber"; | ||
import { ObservableLike } from "./interfaces"; | ||
import { Subscriber } from "./subscriber"; | ||
class AutoDisposingSubscriber<T> extends AbstractSubscriber<T> | ||
implements SubscriberLike<T> { | ||
constructor(scheduler: SchedulerLike) { | ||
super(scheduler); | ||
} | ||
complete(_?: ErrorLike) { | ||
this.dispose(); | ||
} | ||
next(_: T) {} | ||
} | ||
/** | ||
@@ -30,5 +17,5 @@ * Safely subscribes an ObservableLike to a SubscriberLike, | ||
): DisposableLike => { | ||
const subscriber = new AutoDisposingSubscriber(scheduler); | ||
const subscriber = new Subscriber(scheduler); | ||
observable.subscribe(subscriber); | ||
return subscriber.disposable; | ||
return subscriber; | ||
}; |
import { subscribe, createObservable, createSubject } from "../src/index"; | ||
import { createDisposable } from "@reactive-js/disposable"; | ||
import { pipe } from "@reactive-js/pipe"; | ||
import { SchedulerLike } from "@reactive-js/scheduler"; | ||
import { createVirtualTimeSchedulerResource } from "@reactive-js/schedulers"; | ||
import { AbstractSubscriber } from "../src/internal/abstractSubscriber"; | ||
import { Subscriber } from "../src/internal/subscriber"; | ||
import { ObserverLike } from "../dist/types"; | ||
class MockSubscriber<T> extends AbstractSubscriber<T> { | ||
readonly isSubscribed = true; | ||
class MockSubscriber<T> extends Subscriber<T> { | ||
next = jest.fn(); | ||
complete = jest.fn(); | ||
constructor(scheduler: SchedulerLike) { | ||
super(scheduler); | ||
} | ||
} | ||
@@ -69,2 +57,24 @@ | ||
}); | ||
test("when subscriber throws", () => { | ||
const cause = new Error(); | ||
class ThrowingSubscriber<T> extends Subscriber<T> { | ||
complete = jest.fn(); | ||
next(_: T) { | ||
throw cause; | ||
} | ||
} | ||
const observable = createObservable(observer => { | ||
observer.onNext(1); | ||
}); | ||
const scheduler = createVirtualTimeSchedulerResource(); | ||
const subscriber = new ThrowingSubscriber(scheduler); | ||
observable.subscribe(subscriber); | ||
scheduler.run(); | ||
expect(subscriber.complete).toBeCalledWith({ cause }); | ||
}); | ||
}); | ||
@@ -81,3 +91,3 @@ | ||
const scheduler = createVirtualTimeSchedulerResource(); | ||
const scheduler = createVirtualTimeSchedulerResource(1); | ||
const subscriber = new MockSubscriber(scheduler); | ||
@@ -84,0 +94,0 @@ subject.subscribe(subscriber); |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
605143
121
2212
+ Added@reactive-js/disposable@0.0.16(transitive)
+ Added@reactive-js/scheduler@0.0.16(transitive)
- Removed@reactive-js/disposable@0.0.15(transitive)
- Removed@reactive-js/scheduler@0.0.15(transitive)