Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@reactive-js/observable

Package Overview
Dependencies
Maintainers
1
Versions
27
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@reactive-js/observable - npm Package Compare versions

Comparing version 0.0.16 to 0.0.17

coverage/lcov-report/src/internal/buffer.ts.html

1

dist/cjs/index.d.ts

@@ -0,1 +1,2 @@

export { buffer } from "./internal/buffer";
export { lift } from "./internal/lift";

@@ -2,0 +3,0 @@ export { observe, onComplete, onError, onNext } from "./internal/observe";

"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
var buffer_1 = require("./internal/buffer");
exports.buffer = buffer_1.buffer;
var lift_1 = require("./internal/lift");

@@ -4,0 +6,0 @@ exports.lift = lift_1.lift;

54

dist/cjs/internal/concat.js
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const disposable_1 = require("@reactive-js/disposable");
const rx_1 = require("@reactive-js/rx");
const fromArray_1 = require("./fromArray");
const observe_1 = require("./observe");
const pipe_1 = require("@reactive-js/pipe");
const defer_1 = require("./defer");
class ConcatObservable {
constructor(observables) {
class ConcatSubscriber extends rx_1.DelegatingSubscriber {
constructor(delegate, observables) {
super(delegate);
this.observables = observables;
this.innerSubscription = disposable_1.disposed;
this.index = 0;
}
onNext(v) {
this.subscriber.next(v);
}
onComplete(error) {
const subscriber = this.subscriber;
subscriber.remove(this.innerSubscription);
complete(error) {
if (error !== undefined) {
subscriber.complete(error);
this.delegate.complete(error);
}
else if (!this.subscribeNext()) {
subscriber.complete();
else {
const head = this.observables[this.index];
const hasNextObservable = head !== undefined;
if (hasNextObservable) {
this.index++;
head.subscribe(this);
}
else {
this.delegate.complete();
}
}
}
subscribeNext() {
const head = this.observables[this.index];
const hasNextObservable = head !== undefined;
if (hasNextObservable) {
this.index++;
const subscriber = this.subscriber;
this.innerSubscription = pipe_1.pipe(head, observe_1.observe(this), rx_1.subscribe(subscriber));
subscriber.add(this.innerSubscription);
}
return hasNextObservable;
next(data) {
this.delegate.next(data);
}
}
class ConcatObservable {
constructor(observables) {
this.observables = observables;
}
subscribe(subscriber) {
this.subscriber = subscriber;
this.subscribeNext();
const concatSubscriber = new ConcatSubscriber(subscriber, this.observables);
concatSubscriber.complete();
}
}
function concat(...observables) {
return defer_1.defer(() => new ConcatObservable(observables));
return new ConcatObservable(observables);
}

@@ -47,0 +43,0 @@ exports.concat = concat;

@@ -62,13 +62,16 @@ "use strict";

this.error = undefined;
const observer = {
onNext: (value) => {
this.value = [value];
},
onComplete: e => {
this.error = e;
},
};
const subscription = pipe_1.pipe(observable, observe_1.observe(observer), rx_1.subscribe(scheduler));
const subscription = pipe_1.pipe(observable, observe_1.observe(this), rx_1.subscribe(scheduler));
scheduler.add(subscription);
}
onNext(value) {
if (this.value === undefined) {
this.value = [value];
}
else {
this.value[0] = value;
}
}
onComplete(error) {
this.error = error;
}
next() {

@@ -75,0 +78,0 @@ disposable_1.throwIfDisposed(this.scheduler);

@@ -35,11 +35,15 @@ "use strict";

});
exports.onError = (onError) => exports.observe({
onNext: ignore,
onComplete: (error) => {
class OnErrorObserver {
constructor(onError) {
this.onError = onError;
}
onComplete(error) {
if (error !== undefined) {
const { cause } = error;
onError(cause);
this.onError(cause);
}
},
});
}
onNext(_) { }
}
exports.onError = (onError) => exports.observe(new OnErrorObserver(onError));
exports.onNext = (onNext) => exports.observe({

@@ -46,0 +50,0 @@ onNext,

@@ -16,20 +16,20 @@ "use strict";

this.value = undefined;
this.notifyNext = () => {
const value = this.value;
if (value !== undefined) {
this.value = undefined;
const [next] = value;
this.setupDurationSubscription(next);
try {
this.delegate.next(next);
}
catch (cause) {
this.delegate.complete({ cause });
}
}
};
this.add(this.durationSubscription);
}
notifyNext() {
const value = this.value;
if (value !== undefined) {
this.value = undefined;
const [next] = value;
this.setupDurationSubscription(next);
try {
this.delegate.next(next);
}
catch (cause) {
this.delegate.complete({ cause });
}
}
}
setupDurationSubscription(next) {
this.durationSubscription.disposable = pipe_1.pipe(this.durationSelector(next), observe_1.onComplete(this.notifyNext), rx_1.subscribe(this));
this.durationSubscription.disposable = pipe_1.pipe(this.durationSelector(next), observe_1.observe(this), rx_1.subscribe(this));
}

@@ -61,2 +61,11 @@ complete(error) {

}
onComplete(error) {
if (error !== undefined) {
this.complete(error);
}
else {
this.notifyNext();
}
}
onNext(_) { }
}

@@ -63,0 +72,0 @@ const throttleOperator = (durationSelector, mode) => subscriber => new ThrottleSubscriber(subscriber, durationSelector, mode);

@@ -7,8 +7,8 @@ "use strict";

this.delay = delay;
this.subscribe = (subscriber) => {
const continuation = (_) => {
subscriber.complete(this.error);
};
subscriber.schedule(continuation, this.delay);
}
subscribe(subscriber) {
const continuation = (_) => {
subscriber.complete(this.error);
};
subscriber.schedule(continuation, this.delay);
}

@@ -15,0 +15,0 @@ }

@@ -19,6 +19,10 @@ "use strict";

if (!this.isDisposed) {
this.durationSubscription.disposable = pipe_1.pipe(this.duration, observe_1.onComplete(error => this.complete(error)), rx_1.subscribe(this));
this.durationSubscription.disposable = pipe_1.pipe(this.duration, observe_1.observe(this), rx_1.subscribe(this));
this.delegate.next(data);
}
}
onComplete(error) {
this.complete(error);
}
onNext(_) { }
}

@@ -25,0 +29,0 @@ const operator = (duration) => subscriber => new TimeoutSubscriber(subscriber, duration);

@@ -0,1 +1,2 @@

export { buffer } from "./internal/buffer";
export { lift } from "./internal/lift";

@@ -2,0 +3,0 @@ export { observe, onComplete, onError, onNext } from "./internal/observe";

@@ -0,1 +1,2 @@

export { buffer } from "./internal/buffer";
export { lift } from "./internal/lift";

@@ -2,0 +3,0 @@ export { observe, onComplete, onError, onNext } from "./internal/observe";

@@ -1,44 +0,40 @@

import { disposed } from "@reactive-js/disposable";
import { subscribe, } from "@reactive-js/rx";
import { DelegatingSubscriber, } from "@reactive-js/rx";
import { fromArray } from "./fromArray";
import { observe } from "./observe";
import { pipe } from "@reactive-js/pipe";
import { defer } from "./defer";
class ConcatObservable {
constructor(observables) {
class ConcatSubscriber extends DelegatingSubscriber {
constructor(delegate, observables) {
super(delegate);
this.observables = observables;
this.innerSubscription = disposed;
this.index = 0;
}
onNext(v) {
this.subscriber.next(v);
}
onComplete(error) {
const subscriber = this.subscriber;
subscriber.remove(this.innerSubscription);
complete(error) {
if (error !== undefined) {
subscriber.complete(error);
this.delegate.complete(error);
}
else if (!this.subscribeNext()) {
subscriber.complete();
else {
const head = this.observables[this.index];
const hasNextObservable = head !== undefined;
if (hasNextObservable) {
this.index++;
head.subscribe(this);
}
else {
this.delegate.complete();
}
}
}
subscribeNext() {
const head = this.observables[this.index];
const hasNextObservable = head !== undefined;
if (hasNextObservable) {
this.index++;
const subscriber = this.subscriber;
this.innerSubscription = pipe(head, observe(this), subscribe(subscriber));
subscriber.add(this.innerSubscription);
}
return hasNextObservable;
next(data) {
this.delegate.next(data);
}
}
class ConcatObservable {
constructor(observables) {
this.observables = observables;
}
subscribe(subscriber) {
this.subscriber = subscriber;
this.subscribeNext();
const concatSubscriber = new ConcatSubscriber(subscriber, this.observables);
concatSubscriber.complete();
}
}
export function concat(...observables) {
return defer(() => new ConcatObservable(observables));
return new ConcatObservable(observables);
}

@@ -45,0 +41,0 @@ export function startWith(...values) {

@@ -60,13 +60,16 @@ import { subscribe, } from "@reactive-js/rx";

this.error = undefined;
const observer = {
onNext: (value) => {
this.value = [value];
},
onComplete: e => {
this.error = e;
},
};
const subscription = pipe(observable, observe(observer), subscribe(scheduler));
const subscription = pipe(observable, observe(this), subscribe(scheduler));
scheduler.add(subscription);
}
onNext(value) {
if (this.value === undefined) {
this.value = [value];
}
else {
this.value[0] = value;
}
}
onComplete(error) {
this.error = error;
}
next() {

@@ -73,0 +76,0 @@ throwIfDisposed(this.scheduler);

@@ -33,11 +33,15 @@ import { DelegatingSubscriber, } from "@reactive-js/rx";

});
export const onError = (onError) => observe({
onNext: ignore,
onComplete: (error) => {
class OnErrorObserver {
constructor(onError) {
this.onError = onError;
}
onComplete(error) {
if (error !== undefined) {
const { cause } = error;
onError(cause);
this.onError(cause);
}
},
});
}
onNext(_) { }
}
export const onError = (onError) => observe(new OnErrorObserver(onError));
export const onNext = (onNext) => observe({

@@ -44,0 +48,0 @@ onNext,

@@ -5,3 +5,3 @@ import { createSerialDisposable, } from "@reactive-js/disposable";

import { lift } from "./lift";
import { onComplete } from "./observe";
import { observe } from "./observe";
import { pipe } from "@reactive-js/pipe";

@@ -15,20 +15,20 @@ class ThrottleSubscriber extends DelegatingSubscriber {

this.value = undefined;
this.notifyNext = () => {
const value = this.value;
if (value !== undefined) {
this.value = undefined;
const [next] = value;
this.setupDurationSubscription(next);
try {
this.delegate.next(next);
}
catch (cause) {
this.delegate.complete({ cause });
}
}
};
this.add(this.durationSubscription);
}
notifyNext() {
const value = this.value;
if (value !== undefined) {
this.value = undefined;
const [next] = value;
this.setupDurationSubscription(next);
try {
this.delegate.next(next);
}
catch (cause) {
this.delegate.complete({ cause });
}
}
}
setupDurationSubscription(next) {
this.durationSubscription.disposable = pipe(this.durationSelector(next), onComplete(this.notifyNext), subscribe(this));
this.durationSubscription.disposable = pipe(this.durationSelector(next), observe(this), subscribe(this));
}

@@ -60,2 +60,11 @@ complete(error) {

}
onComplete(error) {
if (error !== undefined) {
this.complete(error);
}
else {
this.notifyNext();
}
}
onNext(_) { }
}

@@ -62,0 +71,0 @@ const throttleOperator = (durationSelector, mode) => subscriber => new ThrottleSubscriber(subscriber, durationSelector, mode);

@@ -5,8 +5,8 @@ class ThrowsObservable {

this.delay = delay;
this.subscribe = (subscriber) => {
const continuation = (_) => {
subscriber.complete(this.error);
};
subscriber.schedule(continuation, this.delay);
}
subscribe(subscriber) {
const continuation = (_) => {
subscriber.complete(this.error);
};
subscriber.schedule(continuation, this.delay);
}

@@ -13,0 +13,0 @@ }

import { createSerialDisposable, } from "@reactive-js/disposable";
import { subscribe, DelegatingSubscriber, } from "@reactive-js/rx";
import { lift } from "./lift";
import { onComplete } from "./observe";
import { observe } from "./observe";
import { pipe } from "@reactive-js/pipe";

@@ -17,6 +17,10 @@ import { throws } from "./throws";

if (!this.isDisposed) {
this.durationSubscription.disposable = pipe(this.duration, onComplete(error => this.complete(error)), subscribe(this));
this.durationSubscription.disposable = pipe(this.duration, observe(this), subscribe(this));
this.delegate.next(data);
}
}
onComplete(error) {
this.complete(error);
}
onNext(_) { }
}

@@ -23,0 +27,0 @@ const operator = (duration) => subscriber => new TimeoutSubscriber(subscriber, duration);

@@ -0,1 +1,2 @@

export { buffer } from "./internal/buffer";
export { lift } from "./internal/lift";

@@ -2,0 +3,0 @@ export { observe, onComplete, onError, onNext } from "./internal/observe";

@@ -18,2 +18,3 @@ [@reactive-js/observable](README.md)

* [buffer](README.md#const-buffer)
* [combineLatest](README.md#combinelatest)

@@ -67,2 +68,21 @@ * [concat](README.md#concat)

### `Const` buffer
▸ **buffer**<**T**>(`duration`: function | number, `maxBufferSize`: number): *[ObservableOperatorLike](interfaces/observableoperatorlike.md)‹T, keyof T[]›*
**Type parameters:**
▪ **T**
**Parameters:**
Name | Type | Default |
------ | ------ | ------ |
`duration` | function &#124; number | - |
`maxBufferSize` | number | Number.MAX_SAFE_INTEGER |
**Returns:** *[ObservableOperatorLike](interfaces/observableoperatorlike.md)‹T, keyof T[]›*
___
### combineLatest

@@ -497,3 +517,3 @@

▸ **fromArray**<**T**>(`values`: ReadonlyArray‹T›, `delay`: number): *ObservableLike‹T›*
▸ **fromArray**<**T**>(`values`: keyof T[], `delay`: number): *ObservableLike‹T›*

@@ -508,3 +528,3 @@ **Type parameters:**

------ | ------ | ------ |
`values` | ReadonlyArray‹T› | - |
`values` | keyof T[] | - |
`delay` | number | 0 |

@@ -1096,3 +1116,3 @@

▸ **throws**<**T**>(`cause`: unknown, `delay?`: undefined | number): *ObservableLike‹T›*
▸ **throws**<**T**>(`cause`: unknown, `delay`: number): *ObservableLike‹T›*

@@ -1105,6 +1125,6 @@ **Type parameters:**

Name | Type |
------ | ------ |
`cause` | unknown |
`delay?` | undefined &#124; number |
Name | Type | Default |
------ | ------ | ------ |
`cause` | unknown | - |
`delay` | number | 0 |

@@ -1111,0 +1131,0 @@ **Returns:** *ObservableLike‹T›*

{
"name": "@reactive-js/observable",
"version": "0.0.16",
"version": "0.0.17",
"main": "dist/cjs/index.js",

@@ -41,7 +41,7 @@ "module": "dist/esm5/index.js",

"dependencies": {
"@reactive-js/disposable": "^0.0.16",
"@reactive-js/pipe": "^0.0.16",
"@reactive-js/rx": "^0.0.16",
"@reactive-js/scheduler": "^0.0.16",
"@reactive-js/schedulers": "^0.0.16"
"@reactive-js/disposable": "^0.0.17",
"@reactive-js/pipe": "^0.0.17",
"@reactive-js/rx": "^0.0.17",
"@reactive-js/scheduler": "^0.0.17",
"@reactive-js/schedulers": "^0.0.17"
},

@@ -73,3 +73,3 @@ "devDependencies": {

},
"gitHead": "dcc81a6c83b8b8d8ac95dd07a395f1e5889bdeb8"
"gitHead": "ad2250c08f04d3d48f0d6db2393444719ed21dc7"
}

@@ -0,1 +1,2 @@

export { buffer } from "./internal/buffer";
export { lift } from "./internal/lift";

@@ -2,0 +3,0 @@ export { observe, onComplete, onError, onNext } from "./internal/observe";

@@ -1,2 +0,1 @@

import { disposed } from "@reactive-js/disposable";
import {

@@ -6,52 +5,44 @@ ErrorLike,

SubscriberLike,
subscribe,
ObserverLike,
DelegatingSubscriber,
} from "@reactive-js/rx";
import { fromArray } from "./fromArray";
import { observe } from "./observe";
import { pipe } from "@reactive-js/pipe";
import { ObservableOperatorLike } from "./interfaces";
import { defer } from "./defer";
class ConcatObservable<T> implements ObservableLike<T>, ObserverLike<T> {
private subscriber: SubscriberLike<T> | undefined;
private innerSubscription = disposed;
class ConcatSubscriber<T> extends DelegatingSubscriber<T, T> {
private index = 0;
constructor(private readonly observables: readonly ObservableLike<T>[]) {}
onNext(v: T) {
(this.subscriber as SubscriberLike<T>).next(v);
constructor(
delegate: SubscriberLike<T>,
private readonly observables: readonly ObservableLike<T>[],
) {
super(delegate);
}
onComplete(error?: ErrorLike) {
const subscriber = this.subscriber as SubscriberLike<T>;
subscriber.remove(this.innerSubscription);
complete(error?: ErrorLike) {
if (error !== undefined) {
this.delegate.complete(error);
} else {
const head = this.observables[this.index];
const hasNextObservable = head !== undefined;
if (error !== undefined) {
subscriber.complete(error);
} else if (!this.subscribeNext()) {
subscriber.complete();
if (hasNextObservable) {
this.index++;
head.subscribe(this);
} else {
this.delegate.complete();
}
}
}
subscribeNext() {
const head = this.observables[this.index];
const hasNextObservable = head !== undefined;
next(data: T) {
this.delegate.next(data);
}
}
if (hasNextObservable) {
this.index++;
class ConcatObservable<T> implements ObservableLike<T> {
constructor(private readonly observables: readonly ObservableLike<T>[]) {}
const subscriber = this.subscriber as SubscriberLike<T>;
this.innerSubscription = pipe(head, observe(this), subscribe(subscriber));
subscriber.add(this.innerSubscription);
}
return hasNextObservable;
}
subscribe(subscriber: SubscriberLike<T>) {
this.subscriber = subscriber;
this.subscribeNext();
const concatSubscriber = new ConcatSubscriber(subscriber, this.observables);
concatSubscriber.complete();
}

@@ -68,3 +59,3 @@ }

): ObservableLike<T> {
return defer(() => new ConcatObservable(observables));
return new ConcatObservable(observables);
}

@@ -71,0 +62,0 @@

@@ -12,2 +12,22 @@ import { ObservableLike, SubscriberLike } from "@reactive-js/rx";

private readonly continuation: SchedulerContinuationLike = shouldYield => {
let error = undefined;
try {
const result = this.loop(shouldYield);
if (result !== undefined) {
return result;
}
} catch (cause) {
error = { cause };
}
(this.subscriber as SubscriberLike<T>).complete(error);
return;
};
private readonly continuationResult: SchedulerContinuationResultLike = {
continuation: this.continuation,
delay: this.delay,
};
constructor(

@@ -40,22 +60,2 @@ private readonly values: readonly T[],

private readonly continuation: SchedulerContinuationLike = shouldYield => {
let error = undefined;
try {
const result = this.loop(shouldYield);
if (result !== undefined) {
return result;
}
} catch (cause) {
error = { cause };
}
(this.subscriber as SubscriberLike<T>).complete(error);
return;
};
private readonly continuationResult: SchedulerContinuationResultLike = {
continuation: this.continuation,
delay: this.delay,
};
subscribe(subscriber: SubscriberLike<T>) {

@@ -62,0 +62,0 @@ this.subscriber = subscriber;

@@ -11,2 +11,22 @@ import { ObservableLike, SubscriberLike } from "@reactive-js/rx";

private readonly continuation: SchedulerContinuationLike = shouldYield => {
let error = undefined;
try {
const result = this.loop(shouldYield);
if (result !== undefined) {
return result;
}
} catch (cause) {
error = { cause };
}
(this.subscriber as SubscriberLike<T>).complete(error);
return;
};
private readonly continuationResult: SchedulerContinuationResultLike = {
continuation: this.continuation,
delay: this.delay,
};
constructor(

@@ -37,22 +57,2 @@ private readonly iterator: Iterator<T>,

private readonly continuation: SchedulerContinuationLike = shouldYield => {
let error = undefined;
try {
const result = this.loop(shouldYield);
if (result !== undefined) {
return result;
}
} catch (cause) {
error = { cause };
}
(this.subscriber as SubscriberLike<T>).complete(error);
return;
};
private readonly continuationResult: SchedulerContinuationResultLike = {
continuation: this.continuation,
delay: this.delay,
};
subscribe(subscriber: SubscriberLike<T>) {

@@ -59,0 +59,0 @@ this.subscriber = subscriber;

@@ -12,2 +12,17 @@ import { ObservableLike, SubscriberLike } from "@reactive-js/rx";

private readonly continuation: SchedulerContinuationLike = shouldYield => {
let error = undefined;
try {
const result = this.loop(shouldYield);
if (result !== undefined) {
return result;
}
} catch (cause) {
error = { cause };
}
(this.subscriber as SubscriberLike<T>).complete(error);
return;
};
constructor(private readonly values: ReadonlyArray<[number, T]>) {}

@@ -39,17 +54,2 @@

private readonly continuation: SchedulerContinuationLike = shouldYield => {
let error = undefined;
try {
const result = this.loop(shouldYield);
if (result !== undefined) {
return result;
}
} catch (cause) {
error = { cause };
}
(this.subscriber as SubscriberLike<T>).complete(error);
return;
};
subscribe(subscriber: SubscriberLike<T>) {

@@ -56,0 +56,0 @@ this.subscriber = subscriber;

@@ -11,2 +11,20 @@ import { ObservableLike, SubscriberLike } from "@reactive-js/rx";

private readonly continuation: SchedulerContinuationLike = (
shouldYield: () => boolean,
) => {
const subscriber = this.subscriber as SubscriberLike<T>;
try {
this.loop(shouldYield);
} catch (cause) {
subscriber.complete({ cause });
}
return subscriber.isDisposed ? undefined : this.continuationResult;
};
private readonly continuationResult: SchedulerContinuationResultLike = {
continuation: this.continuation,
delay: this.delay,
};
constructor(

@@ -31,20 +49,2 @@ private readonly generator: (acc: T) => T,

private readonly continuation: SchedulerContinuationLike = (
shouldYield: () => boolean,
) => {
const subscriber = this.subscriber as SubscriberLike<T>;
try {
this.loop(shouldYield);
} catch (cause) {
subscriber.complete({ cause });
}
return subscriber.isDisposed ? undefined : this.continuationResult;
};
private readonly continuationResult: SchedulerContinuationResultLike = {
continuation: this.continuation,
delay: this.delay,
};
subscribe(subscriber: SubscriberLike<T>) {

@@ -51,0 +51,0 @@ this.subscriber = subscriber;

@@ -85,3 +85,3 @@ import {

class ObservableIteratorImpl<T> implements Iterator<T> {
class ObservableIteratorImpl<T> implements Iterator<T>, ObserverLike<T> {
private value: [T] | undefined = undefined;

@@ -94,18 +94,18 @@ private error: ErrorLike | undefined = undefined;

) {
const observer: ObserverLike<T> = {
onNext: (value: T) => {
this.value = [value];
},
onComplete: e => {
this.error = e;
},
};
const subscription = pipe(
observable,
observe(observer),
subscribe(scheduler),
);
const subscription = pipe(observable, observe(this), subscribe(scheduler));
scheduler.add(subscription);
}
onNext(value: T) {
if (this.value === undefined) {
this.value = [value];
} else {
this.value[0] = value;
}
}
onComplete(error?: ErrorLike) {
this.error = error;
}
next(): IteratorResult<T> {

@@ -126,3 +126,2 @@ throwIfDisposed(this.scheduler);

if (done) {
// Cleanup
this.scheduler.dispose();

@@ -141,3 +140,3 @@ return iteratorDone;

throw(e?: any): IteratorResult<T> {
throw(e?: unknown): IteratorResult<T> {
this.scheduler.dispose;

@@ -144,0 +143,0 @@ if (e !== undefined) {

@@ -61,14 +61,18 @@ import {

class OnErrorObserver<T> implements ObserverLike<T> {
constructor(private readonly onError: (err: unknown) => void) {}
onComplete(error?: ErrorLike) {
if (error !== undefined) {
const { cause } = error;
this.onError(cause);
}
}
onNext(_: T) {}
}
export const onError = <T>(
onError: (err: unknown) => void,
): ObservableOperatorLike<T, T> =>
observe({
onNext: ignore,
onComplete: (error?: ErrorLike) => {
if (error !== undefined) {
const { cause } = error;
onError(cause);
}
},
});
): ObservableOperatorLike<T, T> => observe(new OnErrorObserver(onError));

@@ -75,0 +79,0 @@ export const onNext = <T>(

@@ -11,2 +11,3 @@ import {

SubscriberLike,
ObserverLike,
} from "@reactive-js/rx";

@@ -16,3 +17,3 @@ import { empty } from "./fromArray";

import { lift } from "./lift";
import { onComplete } from "./observe";
import { observe } from "./observe";
import { pipe } from "@reactive-js/pipe";

@@ -26,6 +27,18 @@

class ThrottleSubscriber<T> extends DelegatingSubscriber<T, T> {
class ThrottleSubscriber<T> extends DelegatingSubscriber<T, T>
implements ObserverLike<unknown> {
private readonly durationSubscription: SerialDisposableLike = createSerialDisposable();
private value: [T] | undefined = undefined;
private readonly notifyNext = () => {
constructor(
delegate: SubscriberLike<T>,
private readonly durationSelector: (next: T) => ObservableLike<unknown>,
private readonly mode: ThrottleMode,
) {
super(delegate);
this.add(this.durationSubscription);
}
private notifyNext() {
const value = this.value;

@@ -44,12 +57,2 @@ if (value !== undefined) {

}
};
constructor(
delegate: SubscriberLike<T>,
private readonly durationSelector: (next: T) => ObservableLike<unknown>,
private readonly mode: ThrottleMode,
) {
super(delegate);
this.add(this.durationSubscription);
}

@@ -60,3 +63,3 @@

this.durationSelector(next),
onComplete(this.notifyNext),
observe(this),
subscribe(this),

@@ -95,2 +98,12 @@ );

}
onComplete(error?: ErrorLike) {
if (error !== undefined) {
this.complete(error);
} else {
this.notifyNext();
}
}
onNext(_: unknown) {}
}

@@ -97,0 +110,0 @@

@@ -9,3 +9,3 @@ import { ObservableLike, SubscriberLike, ErrorLike } from "@reactive-js/rx";

subscribe = (subscriber: SubscriberLike<T>) => {
subscribe(subscriber: SubscriberLike<T>) {
const continuation = (_: () => boolean) => {

@@ -16,3 +16,3 @@ subscriber.complete(this.error);

subscriber.schedule(continuation, this.delay);
};
}
}

@@ -19,0 +19,0 @@

@@ -10,6 +10,8 @@ import {

ObservableLike,
ObserverLike,
ErrorLike,
} from "@reactive-js/rx";
import { ObservableOperatorLike, SubscriberOperatorLike } from "./interfaces";
import { lift } from "./lift";
import { onComplete } from "./observe";
import { observe } from "./observe";
import { pipe } from "@reactive-js/pipe";

@@ -20,3 +22,4 @@ import { throws } from "./throws";

class TimeoutSubscriber<T> extends DelegatingSubscriber<T, T> {
class TimeoutSubscriber<T> extends DelegatingSubscriber<T, T>
implements ObserverLike<unknown> {
private readonly durationSubscription: SerialDisposableLike = createSerialDisposable();

@@ -36,3 +39,3 @@

this.duration,
onComplete(error => this.complete(error)),
observe(this),
subscribe(this),

@@ -44,2 +47,8 @@ );

}
onComplete(error?: ErrorLike) {
this.complete(error);
}
onNext(_: unknown) {}
}

@@ -46,0 +55,0 @@

@@ -18,2 +18,3 @@ import { DisposableLike } from "@reactive-js/disposable";

if (Array.isArray(resources)) {
// eslint-disable-next-line prefer-spread
subscriber.add.apply(subscriber, resources as any);

@@ -20,0 +21,0 @@ } else {

@@ -14,2 +14,3 @@ import {

import {
buffer,
combineLatest,

@@ -108,2 +109,25 @@ concat,

test("buffer", () => {
const result = pipe(
fromScheduledValues(
[0, 1],
[0, 2],
[0, 3],
[0, 4],
[1, 1],
[1, 2],
[1, 3],
[1, 4],
),
buffer(4, 3),
toArray(createVirtualTimeSchedulerResource),
);
expect(result).toEqual([
[1, 2, 3],
[4, 1, 2],
[3, 4],
]);
});
test("combineLatest", () => {

@@ -840,2 +864,3 @@ const result = pipe(

expect(() => {
// eslint-disable-next-line @typescript-eslint/no-unused-vars, no-empty
for (const _ of iterable) {

@@ -860,3 +885,3 @@ }

expect((result as any).done).toBeTruthy();
expect(result.done).toBeTruthy();
});

@@ -869,3 +894,3 @@

expect((result as any).done).toBeTruthy();
expect(result.done).toBeTruthy();
});

@@ -872,0 +897,0 @@ });

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc