New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

@reactive-js/rx

Package Overview
Dependencies
Maintainers
1
Versions
28
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@reactive-js/rx - npm Package Compare versions

Comparing version 0.0.15 to 0.0.16

coverage/clover.xml

2

dist/cjs/index.d.ts

@@ -5,2 +5,2 @@ export { ErrorLike, MulticastObservableLike, MulticastObservableResourceLike, ObserverLike, ObservableLike, ObservableResourceLike, SubscriberLike, SubjectLike, SubjectResourceLike, } from "./internal/interfaces";

export { createSubject } from "./internal/subject";
export { AbstractDelegatingSubscriber } from "./internal/abstractSubscriber";
export { DelegatingSubscriber } from "./internal/subscriber";

@@ -9,4 +9,4 @@ "use strict";

exports.createSubject = subject_1.createSubject;
var abstractSubscriber_1 = require("./internal/abstractSubscriber");
exports.AbstractDelegatingSubscriber = abstractSubscriber_1.AbstractDelegatingSubscriber;
var subscriber_1 = require("./internal/subscriber");
exports.DelegatingSubscriber = subscriber_1.DelegatingSubscriber;
//# sourceMappingURL=index.js.map
import { DisposableLike, DisposableOrTeardown } from "@reactive-js/disposable";
import { SchedulerContinuationLike, SchedulerLike } from "@reactive-js/scheduler";
import { ErrorLike, SubscriberLike } from "./interfaces";
export declare abstract class AbstractSubscriber<T> implements SubscriberLike<T> {
readonly scheduler: SchedulerLike;
readonly disposable: DisposableLike;
export declare class Subscriber<T> implements SubscriberLike<T> {
private readonly scheduler;
private readonly disposable;
isDisposed: boolean;

@@ -12,9 +12,9 @@ constructor(scheduler: SchedulerLike);

add(disposable: DisposableOrTeardown, ...disposables: DisposableOrTeardown[]): this;
abstract complete(error?: ErrorLike): void;
complete(_?: ErrorLike): void;
dispose(): void;
abstract next(data: T): void;
next(_: T): void;
remove(disposable: DisposableOrTeardown, ...disposables: DisposableOrTeardown[]): this;
schedule(continuation: SchedulerContinuationLike, delay?: number): DisposableLike;
}
export declare abstract class AbstractDelegatingSubscriber<TA, TB> extends AbstractSubscriber<TA> {
export declare class DelegatingSubscriber<TA, TB> extends Subscriber<TA> {
readonly delegate: SubscriberLike<TB>;

@@ -21,0 +21,0 @@ constructor(delegate: SubscriberLike<TB>);

"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const disposable_1 = require("@reactive-js/disposable");
class AbstractSubscriber {
class Subscriber {
constructor(scheduler) {

@@ -23,5 +23,9 @@ this.scheduler = scheduler;

}
complete(_) {
this.dispose();
}
dispose() {
this.disposable.dispose();
}
next(_) { }
remove(disposable, ...disposables) {

@@ -32,10 +36,15 @@ this.disposable.remove(disposable, ...disposables);

schedule(continuation, delay) {
const schedulerSubscription = this.scheduler.schedule(continuation, delay);
this.add(schedulerSubscription);
schedulerSubscription.add(() => this.remove(schedulerSubscription));
return schedulerSubscription;
if (!this.isDisposed) {
const schedulerSubscription = this.scheduler.schedule(continuation, delay);
this.add(schedulerSubscription);
schedulerSubscription.add(() => this.remove(schedulerSubscription));
return schedulerSubscription;
}
else {
return disposable_1.disposed;
}
}
}
exports.AbstractSubscriber = AbstractSubscriber;
class AbstractDelegatingSubscriber extends AbstractSubscriber {
exports.Subscriber = Subscriber;
class DelegatingSubscriber extends Subscriber {
constructor(delegate) {

@@ -53,3 +62,3 @@ super(delegate.scheduler || delegate);

}
exports.AbstractDelegatingSubscriber = AbstractDelegatingSubscriber;
exports.DelegatingSubscriber = DelegatingSubscriber;
//# sourceMappingURL=abstractSubscriber.js.map

@@ -32,2 +32,4 @@ "use strict";

this.nextQueue.length = 0;
this.isCompleted = false;
this.error = undefined;
});

@@ -39,13 +41,15 @@ }

onComplete(error) {
if (!(this.isCompleted && this.subscriber.isDisposed)) {
this.isCompleted = true;
this.error = error;
this.scheduleDrainQueue();
if (this.isCompleted || this.subscriber.isDisposed) {
return;
}
this.isCompleted = true;
this.error = error;
this.scheduleDrainQueue();
}
onNext(data) {
if (!(this.isCompleted && this.subscriber.isDisposed)) {
this.nextQueue.push(data);
this.scheduleDrainQueue();
if (this.isCompleted || this.subscriber.isDisposed) {
return;
}
this.nextQueue.push(data);
this.scheduleDrainQueue();
}

@@ -52,0 +56,0 @@ scheduleDrainQueue() {

@@ -16,4 +16,4 @@ "use strict";

class SubjectImpl {
constructor(count) {
this.count = count;
constructor(replayCount) {
this.replayCount = replayCount;
this.disposable = disposable_1.createDisposable();

@@ -23,5 +23,3 @@ this.isCompleted = false;

this.replayed = [];
this.count = count;
this.add(() => {
this.isCompleted = true;
this.observers.length = 0;

@@ -45,6 +43,8 @@ this.replayed.length = 0;

onComplete(error) {
if (this.isCompleted) {
if (this.isCompleted || this.isDisposed) {
return;
}
this.pushNotification([2, error]);
if (this.replayCount > 0) {
this.pushNotification(2, error);
}
this.isCompleted = true;

@@ -58,6 +58,8 @@ const observers = this.observers.slice();

onNext(data) {
if (this.isCompleted) {
if (this.isCompleted || this.isDisposed) {
return;
}
this.pushNotification([1, data]);
if (this.replayCount > 0) {
this.pushNotification(1, data);
}
const observers = this.observers.slice();

@@ -94,5 +96,5 @@ for (const observer of observers) {

}
pushNotification(notif) {
this.replayed.push(notif);
if (this.replayed.length > this.count) {
pushNotification(notif, value) {
this.replayed.push([notif, value]);
if (this.replayed.length > this.replayCount) {
this.replayed.shift();

@@ -99,0 +101,0 @@ }

"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const abstractSubscriber_1 = require("./abstractSubscriber");
class AutoDisposingSubscriber extends abstractSubscriber_1.AbstractSubscriber {
constructor(scheduler) {
super(scheduler);
}
complete(_) {
this.dispose();
}
next(_) { }
}
const subscriber_1 = require("./subscriber");
exports.subscribe = (scheduler) => (observable) => {
const subscriber = new AutoDisposingSubscriber(scheduler);
const subscriber = new subscriber_1.Subscriber(scheduler);
observable.subscribe(subscriber);
return subscriber.disposable;
return subscriber;
};
//# sourceMappingURL=subscribe.js.map

@@ -5,2 +5,2 @@ export { ErrorLike, MulticastObservableLike, MulticastObservableResourceLike, ObserverLike, ObservableLike, ObservableResourceLike, SubscriberLike, SubjectLike, SubjectResourceLike, } from "./internal/interfaces";

export { createSubject } from "./internal/subject";
export { AbstractDelegatingSubscriber } from "./internal/abstractSubscriber";
export { DelegatingSubscriber } from "./internal/subscriber";
export { subscribe } from "./internal/subscribe";
export { createObservable } from "./internal/createObservable";
export { createSubject } from "./internal/subject";
export { AbstractDelegatingSubscriber } from "./internal/abstractSubscriber";
export { DelegatingSubscriber } from "./internal/subscriber";
//# sourceMappingURL=index.js.map
import { DisposableLike, DisposableOrTeardown } from "@reactive-js/disposable";
import { SchedulerContinuationLike, SchedulerLike } from "@reactive-js/scheduler";
import { ErrorLike, SubscriberLike } from "./interfaces";
export declare abstract class AbstractSubscriber<T> implements SubscriberLike<T> {
readonly scheduler: SchedulerLike;
readonly disposable: DisposableLike;
export declare class Subscriber<T> implements SubscriberLike<T> {
private readonly scheduler;
private readonly disposable;
isDisposed: boolean;

@@ -12,9 +12,9 @@ constructor(scheduler: SchedulerLike);

add(disposable: DisposableOrTeardown, ...disposables: DisposableOrTeardown[]): this;
abstract complete(error?: ErrorLike): void;
complete(_?: ErrorLike): void;
dispose(): void;
abstract next(data: T): void;
next(_: T): void;
remove(disposable: DisposableOrTeardown, ...disposables: DisposableOrTeardown[]): this;
schedule(continuation: SchedulerContinuationLike, delay?: number): DisposableLike;
}
export declare abstract class AbstractDelegatingSubscriber<TA, TB> extends AbstractSubscriber<TA> {
export declare class DelegatingSubscriber<TA, TB> extends Subscriber<TA> {
readonly delegate: SubscriberLike<TB>;

@@ -21,0 +21,0 @@ constructor(delegate: SubscriberLike<TB>);

@@ -1,3 +0,3 @@

import { createDisposable, } from "@reactive-js/disposable";
export class AbstractSubscriber {
import { createDisposable, disposed, } from "@reactive-js/disposable";
export class Subscriber {
constructor(scheduler) {

@@ -21,5 +21,9 @@ this.scheduler = scheduler;

}
complete(_) {
this.dispose();
}
dispose() {
this.disposable.dispose();
}
next(_) { }
remove(disposable, ...disposables) {

@@ -30,9 +34,14 @@ this.disposable.remove(disposable, ...disposables);

schedule(continuation, delay) {
const schedulerSubscription = this.scheduler.schedule(continuation, delay);
this.add(schedulerSubscription);
schedulerSubscription.add(() => this.remove(schedulerSubscription));
return schedulerSubscription;
if (!this.isDisposed) {
const schedulerSubscription = this.scheduler.schedule(continuation, delay);
this.add(schedulerSubscription);
schedulerSubscription.add(() => this.remove(schedulerSubscription));
return schedulerSubscription;
}
else {
return disposed;
}
}
}
export class AbstractDelegatingSubscriber extends AbstractSubscriber {
export class DelegatingSubscriber extends Subscriber {
constructor(delegate) {

@@ -39,0 +48,0 @@ super(delegate.scheduler || delegate);

@@ -30,2 +30,4 @@ class SafeObserver {

this.nextQueue.length = 0;
this.isCompleted = false;
this.error = undefined;
});

@@ -37,13 +39,15 @@ }

onComplete(error) {
if (!(this.isCompleted && this.subscriber.isDisposed)) {
this.isCompleted = true;
this.error = error;
this.scheduleDrainQueue();
if (this.isCompleted || this.subscriber.isDisposed) {
return;
}
this.isCompleted = true;
this.error = error;
this.scheduleDrainQueue();
}
onNext(data) {
if (!(this.isCompleted && this.subscriber.isDisposed)) {
this.nextQueue.push(data);
this.scheduleDrainQueue();
if (this.isCompleted || this.subscriber.isDisposed) {
return;
}
this.nextQueue.push(data);
this.scheduleDrainQueue();
}

@@ -50,0 +54,0 @@ scheduleDrainQueue() {

@@ -14,4 +14,4 @@ import { createDisposable, } from "@reactive-js/disposable";

class SubjectImpl {
constructor(count) {
this.count = count;
constructor(replayCount) {
this.replayCount = replayCount;
this.disposable = createDisposable();

@@ -21,5 +21,3 @@ this.isCompleted = false;

this.replayed = [];
this.count = count;
this.add(() => {
this.isCompleted = true;
this.observers.length = 0;

@@ -43,6 +41,8 @@ this.replayed.length = 0;

onComplete(error) {
if (this.isCompleted) {
if (this.isCompleted || this.isDisposed) {
return;
}
this.pushNotification([2, error]);
if (this.replayCount > 0) {
this.pushNotification(2, error);
}
this.isCompleted = true;

@@ -56,6 +56,8 @@ const observers = this.observers.slice();

onNext(data) {
if (this.isCompleted) {
if (this.isCompleted || this.isDisposed) {
return;
}
this.pushNotification([1, data]);
if (this.replayCount > 0) {
this.pushNotification(1, data);
}
const observers = this.observers.slice();

@@ -92,5 +94,5 @@ for (const observer of observers) {

}
pushNotification(notif) {
this.replayed.push(notif);
if (this.replayed.length > this.count) {
pushNotification(notif, value) {
this.replayed.push([notif, value]);
if (this.replayed.length > this.replayCount) {
this.replayed.shift();

@@ -97,0 +99,0 @@ }

@@ -1,16 +0,7 @@

import { AbstractSubscriber } from "./abstractSubscriber";
class AutoDisposingSubscriber extends AbstractSubscriber {
constructor(scheduler) {
super(scheduler);
}
complete(_) {
this.dispose();
}
next(_) { }
}
import { Subscriber } from "./subscriber";
export const subscribe = (scheduler) => (observable) => {
const subscriber = new AutoDisposingSubscriber(scheduler);
const subscriber = new Subscriber(scheduler);
observable.subscribe(subscriber);
return subscriber.disposable;
return subscriber;
};
//# sourceMappingURL=subscribe.js.map

@@ -5,3 +5,3 @@ export { ErrorLike, MulticastObservableLike, MulticastObservableResourceLike, ObserverLike, ObservableLike, ObservableResourceLike, SubscriberLike, SubjectLike, SubjectResourceLike, } from "./internal/interfaces";

export { createSubject } from "./internal/subject";
export { AbstractDelegatingSubscriber } from "./internal/abstractSubscriber";
export { DelegatingSubscriber } from "./internal/subscriber";
//# sourceMappingURL=index.d.ts.map
import { DisposableLike, DisposableOrTeardown } from "@reactive-js/disposable";
import { SchedulerContinuationLike, SchedulerLike } from "@reactive-js/scheduler";
import { ErrorLike, SubscriberLike } from "./interfaces";
export declare abstract class AbstractSubscriber<T> implements SubscriberLike<T> {
readonly scheduler: SchedulerLike;
readonly disposable: DisposableLike;
export declare class Subscriber<T> implements SubscriberLike<T> {
private readonly scheduler;
private readonly disposable;
isDisposed: boolean;

@@ -12,9 +12,9 @@ constructor(scheduler: SchedulerLike);

add(disposable: DisposableOrTeardown, ...disposables: DisposableOrTeardown[]): this;
abstract complete(error?: ErrorLike): void;
complete(_?: ErrorLike): void;
dispose(): void;
abstract next(data: T): void;
next(_: T): void;
remove(disposable: DisposableOrTeardown, ...disposables: DisposableOrTeardown[]): this;
schedule(continuation: SchedulerContinuationLike, delay?: number): DisposableLike;
}
export declare abstract class AbstractDelegatingSubscriber<TA, TB> extends AbstractSubscriber<TA> {
export declare class DelegatingSubscriber<TA, TB> extends Subscriber<TA> {
readonly delegate: SubscriberLike<TB>;

@@ -21,0 +21,0 @@ constructor(delegate: SubscriberLike<TB>);

@@ -23,3 +23,3 @@ [@reactive-js/rx](../README.md) › [SubscriberLike](subscriberlike.md)

* [AbstractDelegatingSubscriber](../classes/abstractdelegatingsubscriber.md)
* [DelegatingSubscriber](../classes/delegatingsubscriber.md)

@@ -26,0 +26,0 @@ ## Index

@@ -9,3 +9,3 @@ [@reactive-js/rx](README.md)

* [AbstractDelegatingSubscriber](classes/abstractdelegatingsubscriber.md)
* [DelegatingSubscriber](classes/delegatingsubscriber.md)

@@ -12,0 +12,0 @@ ### Interfaces

{
"name": "@reactive-js/rx",
"version": "0.0.15",
"version": "0.0.16",
"main": "dist/cjs/index.js",

@@ -41,8 +41,8 @@ "module": "dist/esm5/index.js",

"dependencies": {
"@reactive-js/disposable": "^0.0.15",
"@reactive-js/scheduler": "^0.0.15"
"@reactive-js/disposable": "^0.0.16",
"@reactive-js/scheduler": "^0.0.16"
},
"devDependencies": {
"@reactive-js/pipe": "^0.0.15",
"@reactive-js/schedulers": "^0.0.15",
"@reactive-js/pipe": "^0.0.16",
"@reactive-js/schedulers": "^0.0.16",
"@types/jest": "^24.0.23",

@@ -72,3 +72,3 @@ "jest": "^24.9.0",

},
"gitHead": "bb6f825c233fcf3516c7b6ee146d262ac836e5f2"
"gitHead": "dcc81a6c83b8b8d8ac95dd07a395f1e5889bdeb8"
}

@@ -16,2 +16,2 @@ export {

export { createSubject } from "./internal/subject";
export { AbstractDelegatingSubscriber } from "./internal/abstractSubscriber";
export { DelegatingSubscriber } from "./internal/subscriber";

@@ -36,2 +36,4 @@ import { SchedulerContinuationLike } from "@reactive-js/scheduler";

this.nextQueue.length = 0;
this.isCompleted = false;
this.error = undefined;
});

@@ -45,14 +47,18 @@ }

onComplete(error?: ErrorLike) {
if (!(this.isCompleted && this.subscriber.isDisposed)) {
this.isCompleted = true;
this.error = error;
this.scheduleDrainQueue();
if (this.isCompleted || this.subscriber.isDisposed) {
return;
}
this.isCompleted = true;
this.error = error;
this.scheduleDrainQueue();
}
onNext(data: T) {
if (!(this.isCompleted && this.subscriber.isDisposed)) {
this.nextQueue.push(data);
this.scheduleDrainQueue();
if (this.isCompleted || this.subscriber.isDisposed) {
return;
}
this.nextQueue.push(data);
this.scheduleDrainQueue();
}

@@ -59,0 +65,0 @@

@@ -42,6 +42,4 @@ import {

constructor(private readonly count: number) {
this.count = count;
constructor(private readonly replayCount: number) {
this.add(() => {
this.isCompleted = true;
this.observers.length = 0;

@@ -73,12 +71,14 @@ this.replayed.length = 0;

onComplete(error?: ErrorLike) {
if (this.isCompleted) {
if (this.isCompleted || this.isDisposed) {
return;
}
this.pushNotification([NotificationKind.Complete, error]);
if (this.replayCount > 0) {
this.pushNotification(NotificationKind.Complete, error);
}
this.isCompleted = true;
const observers = this.observers.slice();
this.observers.length = 0;
for (const observer of observers) {

@@ -90,7 +90,9 @@ observer.onComplete(error);

onNext(data: T) {
if (this.isCompleted) {
if (this.isCompleted || this.isDisposed) {
return;
}
this.pushNotification([NotificationKind.Next, data]);
if (this.replayCount > 0) {
this.pushNotification(NotificationKind.Next, data);
}

@@ -143,5 +145,10 @@ const observers = this.observers.slice();

private pushNotification(notif: Notification<T>) {
this.replayed.push(notif);
if (this.replayed.length > this.count) {
private pushNotification(
notif: NotificationKind.Complete,
error?: ErrorLike,
): void;
private pushNotification(notif: NotificationKind.Next, value: T): void;
private pushNotification(notif: NotificationKind, value: any) {
this.replayed.push([notif, value] as Notification<T>);
if (this.replayed.length > this.replayCount) {
this.replayed.shift();

@@ -148,0 +155,0 @@ }

import { DisposableLike } from "@reactive-js/disposable";
import { OperatorLike } from "@reactive-js/pipe";
import { SchedulerLike } from "@reactive-js/scheduler";
import { ErrorLike, ObservableLike, SubscriberLike } from "./interfaces";
import { AbstractSubscriber } from "./abstractSubscriber";
import { ObservableLike } from "./interfaces";
import { Subscriber } from "./subscriber";
class AutoDisposingSubscriber<T> extends AbstractSubscriber<T>
implements SubscriberLike<T> {
constructor(scheduler: SchedulerLike) {
super(scheduler);
}
complete(_?: ErrorLike) {
this.dispose();
}
next(_: T) {}
}
/**

@@ -30,5 +17,5 @@ * Safely subscribes an ObservableLike to a SubscriberLike,

): DisposableLike => {
const subscriber = new AutoDisposingSubscriber(scheduler);
const subscriber = new Subscriber(scheduler);
observable.subscribe(subscriber);
return subscriber.disposable;
return subscriber;
};
import { subscribe, createObservable, createSubject } from "../src/index";
import { createDisposable } from "@reactive-js/disposable";
import { pipe } from "@reactive-js/pipe";
import { SchedulerLike } from "@reactive-js/scheduler";
import { createVirtualTimeSchedulerResource } from "@reactive-js/schedulers";
import { AbstractSubscriber } from "../src/internal/abstractSubscriber";
import { Subscriber } from "../src/internal/subscriber";
import { ObserverLike } from "../dist/types";
class MockSubscriber<T> extends AbstractSubscriber<T> {
readonly isSubscribed = true;
class MockSubscriber<T> extends Subscriber<T> {
next = jest.fn();
complete = jest.fn();
constructor(scheduler: SchedulerLike) {
super(scheduler);
}
}

@@ -69,2 +57,24 @@

});
test("when subscriber throws", () => {
const cause = new Error();
class ThrowingSubscriber<T> extends Subscriber<T> {
complete = jest.fn();
next(_: T) {
throw cause;
}
}
const observable = createObservable(observer => {
observer.onNext(1);
});
const scheduler = createVirtualTimeSchedulerResource();
const subscriber = new ThrowingSubscriber(scheduler);
observable.subscribe(subscriber);
scheduler.run();
expect(subscriber.complete).toBeCalledWith({ cause });
});
});

@@ -81,3 +91,3 @@

const scheduler = createVirtualTimeSchedulerResource();
const scheduler = createVirtualTimeSchedulerResource(1);
const subscriber = new MockSubscriber(scheduler);

@@ -84,0 +94,0 @@ subject.subscribe(subscriber);

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc