New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

@reactive-js/rx

Package Overview
Dependencies
Maintainers
1
Versions
28
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@reactive-js/rx - npm Package Compare versions

Comparing version 0.0.13 to 0.0.14

8

dist/cjs/internal/abstractSubscriber.d.ts

@@ -6,7 +6,8 @@ import { DisposableLike, DisposableOrTeardown } from "@reactive-js/disposable";

readonly scheduler: SchedulerLike;
readonly subscription: DisposableLike;
constructor(scheduler: SchedulerLike, subscription: DisposableLike);
readonly disposable: DisposableLike;
constructor(scheduler: SchedulerLike, disposable: DisposableLike);
get inScheduledContinuation(): boolean;
abstract get isCompleted(): boolean;
get isDisposed(): boolean;
abstract get isSubscribed(): boolean;
get isDisposed(): boolean;
get now(): number;

@@ -17,2 +18,3 @@ add(disposable: DisposableOrTeardown, ...disposables: DisposableOrTeardown[]): this;

abstract next(data: T): void;
abstract nextUnsafe(data: T): void;
remove(disposable: DisposableOrTeardown, ...disposables: DisposableOrTeardown[]): this;

@@ -19,0 +21,0 @@ schedule(continuation: SchedulerContinuationLike, delay?: number): DisposableLike;

"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
class AbstractSubscriber {
constructor(scheduler, subscription) {
constructor(scheduler, disposable) {
this.scheduler = scheduler;
this.subscription = subscription;
this.disposable = disposable;
}

@@ -12,3 +12,3 @@ get inScheduledContinuation() {

get isDisposed() {
return this.subscription.isDisposed;
return this.disposable.isDisposed;
}

@@ -19,10 +19,10 @@ get now() {

add(disposable, ...disposables) {
this.subscription.add(disposable, ...disposables);
this.disposable.add(disposable, ...disposables);
return this;
}
dispose() {
this.subscription.dispose();
this.disposable.dispose();
}
remove(disposable, ...disposables) {
this.subscription.remove(disposable, ...disposables);
this.disposable.remove(disposable, ...disposables);
return this;

@@ -29,0 +29,0 @@ }

@@ -14,3 +14,3 @@ "use strict";

catch (cause) {
observer.complete({ cause });
observer.onComplete({ cause });
}

@@ -17,0 +17,0 @@ };

@@ -1,6 +0,6 @@

import { ErrorLike, ObserverLike, SubscriberLike } from "./interfaces";
import { ErrorLike, SubscriberLike } from "./interfaces";
import { AbstractSubscriber } from "./abstractSubscriber";
export declare abstract class AbstractDelegatingSubscriber<TA, TB> extends AbstractSubscriber<TA> {
readonly delegate: ObserverLike<TB>;
private isStopped;
readonly delegate: SubscriberLike<TB>;
isCompleted: boolean;
constructor(delegate: SubscriberLike<TB>);

@@ -10,3 +10,3 @@ get isSubscribed(): boolean;

next(data: TA): void;
abstract completeUnsafe(error?: ErrorLike): void;
protected abstract completeUnsafe(error?: ErrorLike): void;
abstract nextUnsafe(data: TA): void;

@@ -13,0 +13,0 @@ private tryComplete;

@@ -7,7 +7,7 @@ "use strict";

constructor(delegate) {
super(delegate.scheduler || delegate, delegate.subscription || delegate);
this.isStopped = false;
super(delegate.scheduler || delegate, delegate.disposable || delegate);
this.isCompleted = false;
this.delegate = delegate;
this.add(() => {
this.isStopped = true;
this.isCompleted = true;
});

@@ -22,4 +22,4 @@ }

}
if (!this.isStopped) {
this.isStopped = true;
if (!this.isCompleted) {
this.isCompleted = true;
this.tryComplete(error);

@@ -32,3 +32,3 @@ }

}
if (!this.isStopped) {
if (!this.isCompleted) {
this.tryNext(data);

@@ -35,0 +35,0 @@ }

@@ -7,8 +7,12 @@ import { DisposableLike } from "@reactive-js/disposable";

export interface ObserverLike<T> {
onComplete(error?: ErrorLike): void;
onNext(data: T): void;
}
export interface SubscriberLike<T> extends SchedulerResourceLike {
readonly isCompleted: boolean;
readonly isSubscribed: boolean;
complete(error?: ErrorLike): void;
next(data: T): void;
nextUnsafe(data: T): void;
}
export interface SubscriberLike<T> extends ObserverLike<T>, DisposableLike, SchedulerResourceLike {
readonly isSubscribed: boolean;
}
export interface ObservableLike<T> {

@@ -15,0 +19,0 @@ subscribe(subscriber: SubscriberLike<T>): void;

@@ -5,19 +5,25 @@ "use strict";

constructor(subscriber) {
this.isComplete = false;
this.isCompleted = false;
this.nextQueue = [];
this.teardown = () => {
this.nextQueue.length = 0;
this.isComplete = true;
this.isCompleted = true;
};
this.drainQueue = shouldYield => {
while (this.nextQueue.length > 0) {
const next = this.nextQueue.shift();
this.subscriber.next(next);
const yieldRequest = shouldYield();
const hasMoreEvents = this.remainingEvents > 0;
if (yieldRequest && hasMoreEvents) {
return this.continuation;
try {
while (this.nextQueue.length > 0 && !this.subscriber.isCompleted) {
const next = this.nextQueue.shift();
this.subscriber.nextUnsafe(next);
const yieldRequest = shouldYield();
const hasMoreEvents = this.remainingEvents > 0;
if (yieldRequest && hasMoreEvents) {
return this.continuation;
}
}
}
if (this.isComplete) {
catch (cause) {
this.isCompleted = true;
this.error = { cause };
}
if (this.isCompleted) {
this.subscriber.remove(this.teardown).complete(this.error);

@@ -32,7 +38,7 @@ }

get remainingEvents() {
return this.nextQueue.length + (this.isComplete ? 1 : 0);
return this.nextQueue.length + (this.isCompleted ? 1 : 0);
}
complete(error) {
if (!this.isComplete) {
this.isComplete = true;
onComplete(error) {
if (!this.isCompleted) {
this.isCompleted = true;
this.error = error;

@@ -42,4 +48,4 @@ this.scheduleDrainQueue();

}
next(data) {
if (!this.isComplete) {
onNext(data) {
if (!this.isCompleted) {
this.nextQueue.push(data);

@@ -46,0 +52,0 @@ this.scheduleDrainQueue();

@@ -8,6 +8,6 @@ "use strict";

case 1:
observer.next(notification[1]);
observer.onNext(notification[1]);
break;
case 2:
observer.complete(notification[1]);
observer.onComplete(notification[1]);
break;

@@ -39,3 +39,6 @@ }

}
complete(error) {
dispose() {
this.disposable.dispose();
}
onComplete(error) {
if (this.isCompleted) {

@@ -46,12 +49,9 @@ return;

this.isCompleted = true;
const subscribers = this.observers.slice();
const observers = this.observers.slice();
this.observers.length = 0;
for (const subscriber of subscribers) {
subscriber.complete(error);
for (const observer of observers) {
observer.onComplete(error);
}
}
dispose() {
this.disposable.dispose();
}
next(data) {
onNext(data) {
if (this.isCompleted) {

@@ -61,5 +61,5 @@ return;

this.pushNotification([1, data]);
const subscribers = this.observers.slice();
for (const subscriber of subscribers) {
subscriber.next(data);
const observers = this.observers.slice();
for (const observer of observers) {
observer.onNext(data);
}

@@ -66,0 +66,0 @@ }

@@ -7,6 +7,9 @@ "use strict";

class AutoDisposingSubscriber extends abstractSubscriber_1.AbstractSubscriber {
constructor(scheduler, subscription) {
super(scheduler, subscription);
constructor(scheduler, disposable) {
super(scheduler, disposable);
this._isSubscribed = false;
}
get isCompleted() {
return this.disposable.isDisposed;
}
get isSubscribed() {

@@ -29,2 +32,3 @@ return this._isSubscribed;

}
nextUnsafe(_) { }
}

@@ -31,0 +35,0 @@ exports.subscribe = (scheduler) => (observable) => {

@@ -6,7 +6,8 @@ import { DisposableLike, DisposableOrTeardown } from "@reactive-js/disposable";

readonly scheduler: SchedulerLike;
readonly subscription: DisposableLike;
constructor(scheduler: SchedulerLike, subscription: DisposableLike);
readonly disposable: DisposableLike;
constructor(scheduler: SchedulerLike, disposable: DisposableLike);
get inScheduledContinuation(): boolean;
abstract get isCompleted(): boolean;
get isDisposed(): boolean;
abstract get isSubscribed(): boolean;
get isDisposed(): boolean;
get now(): number;

@@ -17,2 +18,3 @@ add(disposable: DisposableOrTeardown, ...disposables: DisposableOrTeardown[]): this;

abstract next(data: T): void;
abstract nextUnsafe(data: T): void;
remove(disposable: DisposableOrTeardown, ...disposables: DisposableOrTeardown[]): this;

@@ -19,0 +21,0 @@ schedule(continuation: SchedulerContinuationLike, delay?: number): DisposableLike;

export class AbstractSubscriber {
constructor(scheduler, subscription) {
constructor(scheduler, disposable) {
this.scheduler = scheduler;
this.subscription = subscription;
this.disposable = disposable;
}

@@ -10,3 +10,3 @@ get inScheduledContinuation() {

get isDisposed() {
return this.subscription.isDisposed;
return this.disposable.isDisposed;
}

@@ -17,10 +17,10 @@ get now() {

add(disposable, ...disposables) {
this.subscription.add(disposable, ...disposables);
this.disposable.add(disposable, ...disposables);
return this;
}
dispose() {
this.subscription.dispose();
this.disposable.dispose();
}
remove(disposable, ...disposables) {
this.subscription.remove(disposable, ...disposables);
this.disposable.remove(disposable, ...disposables);
return this;

@@ -27,0 +27,0 @@ }

@@ -12,3 +12,3 @@ import { createSafeObserver } from "./safeObserver";

catch (cause) {
observer.complete({ cause });
observer.onComplete({ cause });
}

@@ -15,0 +15,0 @@ };

@@ -1,6 +0,6 @@

import { ErrorLike, ObserverLike, SubscriberLike } from "./interfaces";
import { ErrorLike, SubscriberLike } from "./interfaces";
import { AbstractSubscriber } from "./abstractSubscriber";
export declare abstract class AbstractDelegatingSubscriber<TA, TB> extends AbstractSubscriber<TA> {
readonly delegate: ObserverLike<TB>;
private isStopped;
readonly delegate: SubscriberLike<TB>;
isCompleted: boolean;
constructor(delegate: SubscriberLike<TB>);

@@ -10,3 +10,3 @@ get isSubscribed(): boolean;

next(data: TA): void;
abstract completeUnsafe(error?: ErrorLike): void;
protected abstract completeUnsafe(error?: ErrorLike): void;
abstract nextUnsafe(data: TA): void;

@@ -13,0 +13,0 @@ private tryComplete;

@@ -5,7 +5,7 @@ import { AbstractSubscriber, checkState } from "./abstractSubscriber";

constructor(delegate) {
super(delegate.scheduler || delegate, delegate.subscription || delegate);
this.isStopped = false;
super(delegate.scheduler || delegate, delegate.disposable || delegate);
this.isCompleted = false;
this.delegate = delegate;
this.add(() => {
this.isStopped = true;
this.isCompleted = true;
});

@@ -20,4 +20,4 @@ }

}
if (!this.isStopped) {
this.isStopped = true;
if (!this.isCompleted) {
this.isCompleted = true;
this.tryComplete(error);

@@ -30,3 +30,3 @@ }

}
if (!this.isStopped) {
if (!this.isCompleted) {
this.tryNext(data);

@@ -33,0 +33,0 @@ }

@@ -7,8 +7,12 @@ import { DisposableLike } from "@reactive-js/disposable";

export interface ObserverLike<T> {
onComplete(error?: ErrorLike): void;
onNext(data: T): void;
}
export interface SubscriberLike<T> extends SchedulerResourceLike {
readonly isCompleted: boolean;
readonly isSubscribed: boolean;
complete(error?: ErrorLike): void;
next(data: T): void;
nextUnsafe(data: T): void;
}
export interface SubscriberLike<T> extends ObserverLike<T>, DisposableLike, SchedulerResourceLike {
readonly isSubscribed: boolean;
}
export interface ObservableLike<T> {

@@ -15,0 +19,0 @@ subscribe(subscriber: SubscriberLike<T>): void;

class SafeObserver {
constructor(subscriber) {
this.isComplete = false;
this.isCompleted = false;
this.nextQueue = [];
this.teardown = () => {
this.nextQueue.length = 0;
this.isComplete = true;
this.isCompleted = true;
};
this.drainQueue = shouldYield => {
while (this.nextQueue.length > 0) {
const next = this.nextQueue.shift();
this.subscriber.next(next);
const yieldRequest = shouldYield();
const hasMoreEvents = this.remainingEvents > 0;
if (yieldRequest && hasMoreEvents) {
return this.continuation;
try {
while (this.nextQueue.length > 0 && !this.subscriber.isCompleted) {
const next = this.nextQueue.shift();
this.subscriber.nextUnsafe(next);
const yieldRequest = shouldYield();
const hasMoreEvents = this.remainingEvents > 0;
if (yieldRequest && hasMoreEvents) {
return this.continuation;
}
}
}
if (this.isComplete) {
catch (cause) {
this.isCompleted = true;
this.error = { cause };
}
if (this.isCompleted) {
this.subscriber.remove(this.teardown).complete(this.error);

@@ -29,7 +35,7 @@ }

get remainingEvents() {
return this.nextQueue.length + (this.isComplete ? 1 : 0);
return this.nextQueue.length + (this.isCompleted ? 1 : 0);
}
complete(error) {
if (!this.isComplete) {
this.isComplete = true;
onComplete(error) {
if (!this.isCompleted) {
this.isCompleted = true;
this.error = error;

@@ -39,4 +45,4 @@ this.scheduleDrainQueue();

}
next(data) {
if (!this.isComplete) {
onNext(data) {
if (!this.isCompleted) {
this.nextQueue.push(data);

@@ -43,0 +49,0 @@ this.scheduleDrainQueue();

@@ -6,6 +6,6 @@ import { createDisposable, } from "@reactive-js/disposable";

case 1:
observer.next(notification[1]);
observer.onNext(notification[1]);
break;
case 2:
observer.complete(notification[1]);
observer.onComplete(notification[1]);
break;

@@ -37,3 +37,6 @@ }

}
complete(error) {
dispose() {
this.disposable.dispose();
}
onComplete(error) {
if (this.isCompleted) {

@@ -44,12 +47,9 @@ return;

this.isCompleted = true;
const subscribers = this.observers.slice();
const observers = this.observers.slice();
this.observers.length = 0;
for (const subscriber of subscribers) {
subscriber.complete(error);
for (const observer of observers) {
observer.onComplete(error);
}
}
dispose() {
this.disposable.dispose();
}
next(data) {
onNext(data) {
if (this.isCompleted) {

@@ -59,5 +59,5 @@ return;

this.pushNotification([1, data]);
const subscribers = this.observers.slice();
for (const subscriber of subscribers) {
subscriber.next(data);
const observers = this.observers.slice();
for (const observer of observers) {
observer.onNext(data);
}

@@ -64,0 +64,0 @@ }

@@ -5,6 +5,9 @@ import { createDisposable } from "@reactive-js/disposable";

class AutoDisposingSubscriber extends AbstractSubscriber {
constructor(scheduler, subscription) {
super(scheduler, subscription);
constructor(scheduler, disposable) {
super(scheduler, disposable);
this._isSubscribed = false;
}
get isCompleted() {
return this.disposable.isDisposed;
}
get isSubscribed() {

@@ -27,2 +30,3 @@ return this._isSubscribed;

}
nextUnsafe(_) { }
}

@@ -29,0 +33,0 @@ export const subscribe = (scheduler) => (observable) => {

@@ -6,7 +6,8 @@ import { DisposableLike, DisposableOrTeardown } from "@reactive-js/disposable";

readonly scheduler: SchedulerLike;
readonly subscription: DisposableLike;
constructor(scheduler: SchedulerLike, subscription: DisposableLike);
readonly disposable: DisposableLike;
constructor(scheduler: SchedulerLike, disposable: DisposableLike);
get inScheduledContinuation(): boolean;
abstract get isCompleted(): boolean;
get isDisposed(): boolean;
abstract get isSubscribed(): boolean;
get isDisposed(): boolean;
get now(): number;

@@ -17,2 +18,3 @@ add(disposable: DisposableOrTeardown, ...disposables: DisposableOrTeardown[]): this;

abstract next(data: T): void;
abstract nextUnsafe(data: T): void;
remove(disposable: DisposableOrTeardown, ...disposables: DisposableOrTeardown[]): this;

@@ -19,0 +21,0 @@ schedule(continuation: SchedulerContinuationLike, delay?: number): DisposableLike;

@@ -1,6 +0,6 @@

import { ErrorLike, ObserverLike, SubscriberLike } from "./interfaces";
import { ErrorLike, SubscriberLike } from "./interfaces";
import { AbstractSubscriber } from "./abstractSubscriber";
export declare abstract class AbstractDelegatingSubscriber<TA, TB> extends AbstractSubscriber<TA> {
readonly delegate: ObserverLike<TB>;
private isStopped;
readonly delegate: SubscriberLike<TB>;
isCompleted: boolean;
constructor(delegate: SubscriberLike<TB>);

@@ -10,3 +10,3 @@ get isSubscribed(): boolean;

next(data: TA): void;
abstract completeUnsafe(error?: ErrorLike): void;
protected abstract completeUnsafe(error?: ErrorLike): void;
abstract nextUnsafe(data: TA): void;

@@ -13,0 +13,0 @@ private tryComplete;

@@ -7,8 +7,12 @@ import { DisposableLike } from "@reactive-js/disposable";

export interface ObserverLike<T> {
onComplete(error?: ErrorLike): void;
onNext(data: T): void;
}
export interface SubscriberLike<T> extends SchedulerResourceLike {
readonly isCompleted: boolean;
readonly isSubscribed: boolean;
complete(error?: ErrorLike): void;
next(data: T): void;
nextUnsafe(data: T): void;
}
export interface SubscriberLike<T> extends ObserverLike<T>, DisposableLike, SchedulerResourceLike {
readonly isSubscribed: boolean;
}
export interface ObservableLike<T> {

@@ -15,0 +19,0 @@ subscribe(subscriber: SubscriberLike<T>): void;

@@ -32,7 +32,7 @@ [@reactive-js/rx](../README.md) › [AbstractDelegatingSubscriber](abstractdelegatingsubscriber.md)

* [delegate](abstractdelegatingsubscriber.md#delegate)
* [isCompleted](abstractdelegatingsubscriber.md#iscompleted)
### Methods
* [completeUnsafe](abstractdelegatingsubscriber.md#abstract-completeunsafe)
* [nextUnsafe](abstractdelegatingsubscriber.md#abstract-nextunsafe)
* [completeUnsafe](abstractdelegatingsubscriber.md#protected-abstract-completeunsafe)

@@ -59,28 +59,21 @@ ## Constructors

• **delegate**: *[ObserverLike](../interfaces/observerlike.md)‹TB›*
• **delegate**: *[SubscriberLike](../interfaces/subscriberlike.md)‹TB›*
## Methods
___
### `Abstract` completeUnsafe
### isCompleted
▸ **completeUnsafe**(`error?`: [ErrorLike](../interfaces/errorlike.md)): *void*
• **isCompleted**: *boolean* = false
Override to handle complete notification. Implementations
may throw errors which will be caught and propogated.
*Implementation of [SubscriberLike](../interfaces/subscriberlike.md).[isCompleted](../interfaces/subscriberlike.md#iscompleted)*
**Parameters:**
*Overrides void*
Name | Type | Description |
------ | ------ | ------ |
`error?` | [ErrorLike](../interfaces/errorlike.md) | |
## Methods
**Returns:** *void*
### `Protected` `Abstract` completeUnsafe
___
▸ **completeUnsafe**(`error?`: [ErrorLike](../interfaces/errorlike.md)): *void*
### `Abstract` nextUnsafe
▸ **nextUnsafe**(`data`: TA): *void*
Overried to handle incoming next notifications. Implementations
Override to handle complete notification. Implementations
may throw errors which will be caught and propogated.

@@ -92,4 +85,4 @@

------ | ------ | ------ |
`data` | TA | |
`error?` | [ErrorLike](../interfaces/errorlike.md) | |
**Returns:** *void*

@@ -15,4 +15,2 @@ [@reactive-js/rx](../README.md) › [ObserverLike](observerlike.md)

↳ [SubscriberLike](subscriberlike.md)
↳ [SubjectLike](subjectlike.md)

@@ -24,10 +22,10 @@

* [complete](observerlike.md#complete)
* [next](observerlike.md#next)
* [onComplete](observerlike.md#oncomplete)
* [onNext](observerlike.md#onnext)
## Methods
### complete
### onComplete
▸ **complete**(`error?`: [ErrorLike](errorlike.md)): *void*
▸ **onComplete**(`error?`: [ErrorLike](errorlike.md)): *void*

@@ -46,5 +44,5 @@ Called by a provider to indicate that it is done sending push-based notifications.

### next
### onNext
▸ **next**(`data`: T): *void*
▸ **onNext**(`data`: T): *void*

@@ -51,0 +49,0 @@ Provides the next item to observe.

@@ -19,6 +19,2 @@ [@reactive-js/rx](../README.md) › [SubscriberLike](subscriberlike.md)

* [ObserverLike](observerlike.md)‹T›
* DisposableLike
* SchedulerResourceLike

@@ -36,6 +32,19 @@

* [isCompleted](subscriberlike.md#iscompleted)
* [isSubscribed](subscriberlike.md#issubscribed)
### Methods
* [complete](subscriberlike.md#complete)
* [next](subscriberlike.md#next)
* [nextUnsafe](subscriberlike.md#nextunsafe)
## Properties
### isCompleted
• **isCompleted**: *boolean*
___
### isSubscribed

@@ -45,2 +54,42 @@

Returns true if the subscriber has been subscribed to an observable.
## Methods
### complete
▸ **complete**(`error?`: [ErrorLike](errorlike.md)): *void*
**Parameters:**
Name | Type |
------ | ------ |
`error?` | [ErrorLike](errorlike.md) |
**Returns:** *void*
___
### next
▸ **next**(`data`: T): *void*
**Parameters:**
Name | Type |
------ | ------ |
`data` | T |
**Returns:** *void*
___
### nextUnsafe
▸ **nextUnsafe**(`data`: T): *void*
**Parameters:**
Name | Type |
------ | ------ |
`data` | T |
**Returns:** *void*
{
"name": "@reactive-js/rx",
"version": "0.0.13",
"version": "0.0.14",
"main": "dist/cjs/index.js",

@@ -41,8 +41,8 @@ "module": "dist/esm5/index.js",

"dependencies": {
"@reactive-js/disposable": "^0.0.13",
"@reactive-js/scheduler": "^0.0.13"
"@reactive-js/disposable": "^0.0.14",
"@reactive-js/scheduler": "^0.0.14"
},
"devDependencies": {
"@reactive-js/pipe": "^0.0.13",
"@reactive-js/schedulers": "^0.0.13",
"@reactive-js/pipe": "^0.0.14",
"@reactive-js/schedulers": "^0.0.14",
"@types/jest": "^24.0.23",

@@ -72,3 +72,3 @@ "jest": "^24.9.0",

},
"gitHead": "86211c5e64b3702b5e3b00dc31d40a79f35bd02f"
"gitHead": "8a86dc5efc38fdfe6683765bf2ff3eb14f3820de"
}

@@ -11,6 +11,7 @@ import { DisposableLike, DisposableOrTeardown } from "@reactive-js/disposable";

readonly scheduler: SchedulerLike;
readonly subscription: DisposableLike;
constructor(scheduler: SchedulerLike, subscription: DisposableLike) {
readonly disposable: DisposableLike;
constructor(scheduler: SchedulerLike, disposable: DisposableLike) {
this.scheduler = scheduler;
this.subscription = subscription;
this.disposable = disposable;
}

@@ -22,8 +23,10 @@

abstract get isSubscribed(): boolean;
abstract get isCompleted(): boolean;
get isDisposed() {
return this.subscription.isDisposed;
return this.disposable.isDisposed;
}
abstract get isSubscribed(): boolean;
get now() {

@@ -37,3 +40,3 @@ return this.scheduler.now;

) {
this.subscription.add(disposable, ...disposables);
this.disposable.add(disposable, ...disposables);
return this;

@@ -45,5 +48,8 @@ }

dispose() {
this.subscription.dispose();
this.disposable.dispose();
}
abstract next(data: T): void;
abstract nextUnsafe(data: T): void;

@@ -54,3 +60,3 @@ remove(

) {
this.subscription.remove(disposable, ...disposables);
this.disposable.remove(disposable, ...disposables);
return this;

@@ -57,0 +63,0 @@ }

@@ -32,3 +32,3 @@ import { DisposableOrTeardown } from "@reactive-js/disposable";

} catch (cause) {
observer.complete({ cause });
observer.onComplete({ cause });
}

@@ -35,0 +35,0 @@ };

@@ -1,2 +0,2 @@

import { ErrorLike, ObserverLike, SubscriberLike } from "./interfaces";
import { ErrorLike, SubscriberLike } from "./interfaces";
import { AbstractSubscriber, checkState } from "./abstractSubscriber";

@@ -15,4 +15,4 @@

> extends AbstractSubscriber<TA> {
readonly delegate: ObserverLike<TB>;
private isStopped = false;
readonly delegate: SubscriberLike<TB>;
isCompleted = false;

@@ -22,3 +22,3 @@ constructor(delegate: SubscriberLike<TB>) {

(delegate as any).scheduler || delegate,
(delegate as any).subscription || delegate,
(delegate as any).disposable || delegate,
);

@@ -29,3 +29,3 @@

this.add(() => {
this.isStopped = true;
this.isCompleted = true;
});

@@ -36,3 +36,3 @@ }

get isSubscribed() {
return (this.delegate as SubscriberLike<unknown>).isSubscribed;
return this.delegate.isSubscribed;
}

@@ -46,4 +46,4 @@

if (!this.isStopped) {
this.isStopped = true;
if (!this.isCompleted) {
this.isCompleted = true;
this.tryComplete(error);

@@ -59,3 +59,3 @@ }

if (!this.isStopped) {
if (!this.isCompleted) {
this.tryNext(data);

@@ -71,10 +71,5 @@ }

*/
abstract completeUnsafe(error?: ErrorLike): void;
protected abstract completeUnsafe(error?: ErrorLike): void;
/**
* Overried to handle incoming next notifications. Implementations
* may throw errors which will be caught and propogated.
*
* @param data
*/
/** @ignore */
abstract nextUnsafe(data: TA): void;

@@ -81,0 +76,0 @@

@@ -21,3 +21,3 @@ import { DisposableLike } from "@reactive-js/disposable";

*/
complete(error?: ErrorLike): void;
onComplete(error?: ErrorLike): void;

@@ -29,3 +29,3 @@ /**

*/
next(data: T): void;
onNext(data: T): void;
}

@@ -45,7 +45,10 @@

export interface SubscriberLike<T>
extends ObserverLike<T>,
DisposableLike,
SchedulerResourceLike {
/** Returns true if the subscriber has been subscribed to an observable. */
extends SchedulerResourceLike {
readonly isCompleted: boolean;
readonly isSubscribed: boolean;
complete(error?: ErrorLike): void;
next(data: T): void;
nextUnsafe(data: T): void;
}

@@ -52,0 +55,0 @@

@@ -6,3 +6,3 @@ import { SchedulerContinuationLike } from "@reactive-js/scheduler";

private error: ErrorLike | undefined;
private isComplete = false;
private isCompleted = false;
private readonly nextQueue: Array<T> = [];

@@ -12,18 +12,23 @@ private readonly subscriber: SubscriberLike<T>;

this.nextQueue.length = 0;
this.isComplete = true;
this.isCompleted = true;
};
private readonly drainQueue: SchedulerContinuationLike = shouldYield => {
while (this.nextQueue.length > 0) {
const next = this.nextQueue.shift() as T;
this.subscriber.next(next);
try {
while (this.nextQueue.length > 0 && !this.subscriber.isCompleted) {
const next = this.nextQueue.shift() as T;
this.subscriber.nextUnsafe(next);
const yieldRequest = shouldYield();
const hasMoreEvents = this.remainingEvents > 0;
const yieldRequest = shouldYield();
const hasMoreEvents = this.remainingEvents > 0;
if (yieldRequest && hasMoreEvents) {
return this.continuation;
if (yieldRequest && hasMoreEvents) {
return this.continuation;
}
}
} catch (cause) {
this.isCompleted = true;
this.error = { cause };
}
if (this.isComplete) {
if (this.isCompleted) {
this.subscriber.remove(this.teardown).complete(this.error);

@@ -34,2 +39,3 @@ }

private readonly continuation = { continuation: this.drainQueue };
constructor(subscriber: SubscriberLike<T>) {

@@ -41,8 +47,8 @@ this.subscriber = subscriber;

private get remainingEvents() {
return this.nextQueue.length + (this.isComplete ? 1 : 0);
return this.nextQueue.length + (this.isCompleted ? 1 : 0);
}
complete(error?: ErrorLike) {
if (!this.isComplete) {
this.isComplete = true;
onComplete(error?: ErrorLike) {
if (!this.isCompleted) {
this.isCompleted = true;
this.error = error;

@@ -53,4 +59,4 @@ this.scheduleDrainQueue();

next(data: T) {
if (!this.isComplete) {
onNext(data: T) {
if (!this.isCompleted) {
this.nextQueue.push(data);

@@ -57,0 +63,0 @@ this.scheduleDrainQueue();

@@ -29,6 +29,6 @@ import {

case NotificationKind.Next:
observer.next(notification[1]);
observer.onNext(notification[1]);
break;
case NotificationKind.Complete:
observer.complete(notification[1]);
observer.onComplete(notification[1]);
break;

@@ -71,3 +71,7 @@ }

complete(error?: ErrorLike) {
dispose() {
this.disposable.dispose();
}
onComplete(error?: ErrorLike) {
if (this.isCompleted) {

@@ -80,15 +84,11 @@ return;

this.isCompleted = true;
const subscribers = this.observers.slice();
const observers = this.observers.slice();
this.observers.length = 0;
for (const subscriber of subscribers) {
subscriber.complete(error);
for (const observer of observers) {
observer.onComplete(error);
}
}
dispose() {
this.disposable.dispose();
}
next(data: T) {
onNext(data: T) {
if (this.isCompleted) {

@@ -100,8 +100,9 @@ return;

const subscribers = this.observers.slice();
for (const subscriber of subscribers) {
subscriber.next(data);
const observers = this.observers.slice();
for (const observer of observers) {
observer.onNext(data);
}
}
remove(

@@ -108,0 +109,0 @@ disposable: DisposableOrTeardown,

@@ -13,5 +13,10 @@ import { createDisposable, DisposableLike } from "@reactive-js/disposable";

constructor(scheduler: SchedulerLike, subscription: DisposableLike) {
super(scheduler, subscription);
constructor(scheduler: SchedulerLike, disposable: DisposableLike) {
super(scheduler, disposable);
}
get isCompleted() {
return this.disposable.isDisposed;
}
get isSubscribed() {

@@ -38,2 +43,4 @@ return this._isSubscribed;

}
nextUnsafe(_: T){}
}

@@ -40,0 +47,0 @@

@@ -21,3 +21,6 @@ import {

readonly isSubscribed = true;
readonly isCompleted = false;
next = jest.fn();
nextUnsafe = jest.fn();
complete = jest.fn();

@@ -56,3 +59,3 @@

test("auto-disposes the subscription on complete", () => {
const observable = createObservable(observer => observer.complete());
const observable = createObservable(observer => observer.onComplete());
const scheduler = createVirtualTimeSchedulerResource();

@@ -104,6 +107,6 @@

subject.next(1);
subject.next(2);
subject.next(3);
subject.complete();
subject.onNext(1);
subject.onNext(2);
subject.onNext(3);
subject.onComplete();

@@ -124,5 +127,5 @@ const scheduler = createVirtualTimeSchedulerResource();

subject.next(1);
subject.next(2);
subject.next(3);
subject.onNext(1);
subject.onNext(2);
subject.onNext(3);

@@ -133,4 +136,4 @@ const scheduler = createVirtualTimeSchedulerResource();

scheduler.schedule(_ => {
subject.next(4);
subject.complete();
subject.onNext(4);
subject.onComplete();
});

@@ -150,5 +153,5 @@

subject.next(1);
subject.next(2);
subject.next(3);
subject.onNext(1);
subject.onNext(2);
subject.onNext(3);

@@ -180,4 +183,4 @@ const scheduler = createVirtualTimeSchedulerResource();

subject.next(1);
subject.complete();
subject.onNext(1);
subject.onComplete();
expect(subscriber.next).toHaveBeenCalledTimes(0);

@@ -184,0 +187,0 @@ expect(subscriber.complete).toHaveBeenCalledTimes(0);

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc