Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@reactive-js/observable

Package Overview
Dependencies
Maintainers
1
Versions
27
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@reactive-js/observable - npm Package Compare versions

Comparing version 0.0.9 to 0.0.10

coverage/clover.xml

169

docs/README.md

@@ -7,5 +7,8 @@ [@reactive-js/observable](README.md)

### Enumerations
* [ThrottleMode](enums/throttlemode.md)
### Interfaces
* [IteratorResource](interfaces/iteratorresource.md)
* [ObservableOperatorLike](interfaces/observableoperatorlike.md)

@@ -29,3 +32,2 @@ * [SubscriberOperatorLike](interfaces/subscriberoperatorlike.md)

* [ignoreElements](README.md#const-ignoreelements)
* [iterate](README.md#const-iterate)
* [keep](README.md#const-keep)

@@ -54,7 +56,2 @@ * [lift](README.md#const-lift)

* [throttle](README.md#const-throttle)
* [throttleFirst](README.md#const-throttlefirst)
* [throttleFirstTime](README.md#const-throttlefirsttime)
* [throttleLast](README.md#const-throttlelast)
* [throttleLastTime](README.md#const-throttlelasttime)
* [throttleTime](README.md#const-throttletime)
* [throws](README.md#const-throws)

@@ -64,3 +61,2 @@ * [timeout](README.md#const-timeout)

* [toIterable](README.md#const-toiterable)
* [toIterator](README.md#const-toiterator)
* [toPromise](README.md#const-topromise)

@@ -519,20 +515,2 @@ * [withLatestFrom](README.md#const-withlatestfrom)

### `Const` iterate
▸ **iterate**<**T**>(`schedulerFactory`: function): *OperatorLike‹ObservableLike‹T›, void›*
**Type parameters:**
▪ **T**
**Parameters:**
▪`Default value` **schedulerFactory**: *function*= createSynchronousSchedulerResource
▸ (): *VirtualTimeSchedulerResourceLike*
**Returns:** *OperatorLike‹ObservableLike‹T›, void›*
___
### `Const` keep

@@ -999,3 +977,3 @@

▸ **throttle**<**T**>(`durationSelector`: function): *[ObservableOperatorLike](interfaces/observableoperatorlike.md)‹T, T›*
▸ **throttle**<**T**>(`duration`: function | number, `mode`: [ThrottleMode](enums/throttlemode.md)): *[ObservableOperatorLike](interfaces/observableoperatorlike.md)‹T, T›*

@@ -1008,12 +986,7 @@ **Type parameters:**

▪ **durationSelector**: *function*
Name | Type | Default |
------ | ------ | ------ |
`duration` | function &#124; number | - |
`mode` | [ThrottleMode](enums/throttlemode.md) | ThrottleMode.Interval |
▸ (`next`: T): *ObservableLike‹unknown›*
**Parameters:**
Name | Type |
------ | ------ |
`next` | T |
**Returns:** *[ObservableOperatorLike](interfaces/observableoperatorlike.md)‹T, T›*

@@ -1023,104 +996,2 @@

### `Const` throttleFirst
▸ **throttleFirst**<**T**>(`durationSelector`: function): *[ObservableOperatorLike](interfaces/observableoperatorlike.md)‹T, T›*
**Type parameters:**
▪ **T**
**Parameters:**
▪ **durationSelector**: *function*
▸ (`next`: T): *ObservableLike‹unknown›*
**Parameters:**
Name | Type |
------ | ------ |
`next` | T |
**Returns:** *[ObservableOperatorLike](interfaces/observableoperatorlike.md)‹T, T›*
___
### `Const` throttleFirstTime
▸ **throttleFirstTime**<**T**>(`duration`: number): *[ObservableOperatorLike](interfaces/observableoperatorlike.md)‹T, T›*
**Type parameters:**
▪ **T**
**Parameters:**
Name | Type |
------ | ------ |
`duration` | number |
**Returns:** *[ObservableOperatorLike](interfaces/observableoperatorlike.md)‹T, T›*
___
### `Const` throttleLast
▸ **throttleLast**<**T**>(`durationSelector`: function): *[ObservableOperatorLike](interfaces/observableoperatorlike.md)‹T, T›*
**Type parameters:**
▪ **T**
**Parameters:**
▪ **durationSelector**: *function*
▸ (`next`: T): *ObservableLike‹unknown›*
**Parameters:**
Name | Type |
------ | ------ |
`next` | T |
**Returns:** *[ObservableOperatorLike](interfaces/observableoperatorlike.md)‹T, T›*
___
### `Const` throttleLastTime
▸ **throttleLastTime**<**T**>(`duration`: number): *[ObservableOperatorLike](interfaces/observableoperatorlike.md)‹T, T›*
**Type parameters:**
▪ **T**
**Parameters:**
Name | Type |
------ | ------ |
`duration` | number |
**Returns:** *[ObservableOperatorLike](interfaces/observableoperatorlike.md)‹T, T›*
___
### `Const` throttleTime
▸ **throttleTime**<**T**>(`duration`: number): *[ObservableOperatorLike](interfaces/observableoperatorlike.md)‹T, T›*
**Type parameters:**
▪ **T**
**Parameters:**
Name | Type |
------ | ------ |
`duration` | number |
**Returns:** *[ObservableOperatorLike](interfaces/observableoperatorlike.md)‹T, T›*
___
### `Const` throws

@@ -1147,3 +1018,3 @@

▸ **timeout**<**T**>(`duration`: number): *[ObservableOperatorLike](interfaces/observableoperatorlike.md)‹T, T›*
▸ **timeout**<**T**>(`duration`: number | ObservableLike‹unknown›): *[ObservableOperatorLike](interfaces/observableoperatorlike.md)‹T, T›*

@@ -1158,3 +1029,3 @@ **Type parameters:**

------ | ------ |
`duration` | number |
`duration` | number &#124; ObservableLike‹unknown› |

@@ -1201,20 +1072,2 @@ **Returns:** *[ObservableOperatorLike](interfaces/observableoperatorlike.md)‹T, T›*

### `Const` toIterator
▸ **toIterator**<**T**>(`schedulerFactory`: function): *OperatorLike‹ObservableLike‹T›, [IteratorResource](interfaces/iteratorresource.md)‹T››*
**Type parameters:**
▪ **T**
**Parameters:**
▪`Default value` **schedulerFactory**: *function*= createSynchronousSchedulerResource
▸ (): *VirtualTimeSchedulerResourceLike*
**Returns:** *OperatorLike‹ObservableLike‹T›, [IteratorResource](interfaces/iteratorresource.md)‹T››*
___
### `Const` toPromise

@@ -1221,0 +1074,0 @@

{
"name": "@reactive-js/observable",
"version": "0.0.9",
"version": "0.0.10",
"main": "dist/cjs/index.js",

@@ -41,7 +41,7 @@ "module": "dist/esm5/index.js",

"dependencies": {
"@reactive-js/disposable": "^0.0.9",
"@reactive-js/pipe": "^0.0.9",
"@reactive-js/rx": "^0.0.9",
"@reactive-js/scheduler": "^0.0.9",
"@reactive-js/schedulers": "^0.0.9"
"@reactive-js/disposable": "^0.0.10",
"@reactive-js/pipe": "^0.0.10",
"@reactive-js/rx": "^0.0.10",
"@reactive-js/scheduler": "^0.0.10",
"@reactive-js/schedulers": "^0.0.10"
},

@@ -73,3 +73,3 @@ "devDependencies": {

},
"gitHead": "24008ff071cc86dfb1eaf3940996e6feb8e69704"
"gitHead": "fef42eb9cbce4260e80cf397f7fbd2a3bf3311d5"
}

@@ -10,3 +10,3 @@ export { lift } from "./internal/lift";

} from "./internal/interfaces";
export { iterate, toArray, toIterable, toIterator } from "./internal/iterate";
export { toArray, toIterable } from "./internal/iterate";
export {

@@ -36,12 +36,5 @@ empty,

export { takeWhile } from "./internal/takeWhile";
export {
throttleFirst,
throttleFirstTime,
throttleLast,
throttleLastTime,
throttle,
throttleTime,
} from "./internal/throttle";
export { ThrottleMode, throttle } from "./internal/throttle";
export { throws } from "./internal/throws";
export { timeout } from "./internal/timeout";
export { withLatestFrom } from "./internal/withLatestFrom";

@@ -15,3 +15,5 @@ import { ObservableLike, SubscriberLike } from "@reactive-js/rx";

const continuation: SchedulerContinuationLike = shouldYield => {
if (index < values.length && delay > 0) {
if (subscriber.isDisposed) {
return;
} else if (index < values.length && delay > 0) {
const value = values[index];

@@ -22,3 +24,3 @@ index++;

} else {
while (index < values.length) {
while (index < values.length && !subscriber.isDisposed) {
const value = values[index];

@@ -67,3 +69,3 @@ index++;

) => {
while (index < values.length) {
while (index < values.length && !subscriber.isDisposed) {
const [, value] = values[index];

@@ -70,0 +72,0 @@ index++;

@@ -22,4 +22,5 @@ import { ObservableLike, SubscriberLike } from "@reactive-js/rx";

let next = iterator.next();
if (next.done) {
if (subscriber.isDisposed) {
return;
} else if (next.done) {
subscriber.complete();

@@ -31,3 +32,3 @@ return;

} else {
for (; !next.done; next = iterator.next()) {
for (; !next.done && !subscriber.isDisposed; next = iterator.next()) {
subscriber.next(next.value);

@@ -34,0 +35,0 @@

@@ -21,2 +21,4 @@ import { ObservableLike, SubscriberLike } from "@reactive-js/rx";

} else if (delay > 0) {
subscriber.next(acc);
try {

@@ -29,6 +31,7 @@ acc = generator(acc);

subscriber.next(acc);
return continuationResult;
} else {
do {
subscriber.next(acc);
try {

@@ -40,6 +43,4 @@ acc = generator(acc);

}
} while (!shouldYield() && !subscriber.isDisposed);
subscriber.next(acc);
} while (!shouldYield());
return continuationResult;

@@ -46,0 +47,0 @@ }

@@ -14,9 +14,5 @@ import {

import { OperatorLike, pipe } from "@reactive-js/pipe";
import {
DisposableLike,
DisposableOrTeardown,
throwIfDisposed,
} from "@reactive-js/disposable";
import { throwIfDisposed } from "@reactive-js/disposable";
export const iterate = <T>(
const iterate = <T>(
schedulerFactory: () => VirtualTimeSchedulerResourceLike = createSynchronousSchedulerResource,

@@ -75,4 +71,2 @@ ): OperatorLike<ObservableLike<T>, void> => observable => {

export interface IteratorResource<T> extends Iterator<T>, DisposableLike {}
const iteratorDone: IteratorReturnResult<any> = {

@@ -83,3 +77,3 @@ done: true,

class ObservableIteratorResourceImpl<T> implements IteratorResource<T> {
class ObservableIteratorImpl<T> implements Iterator<T> {
private readonly scheduler: VirtualTimeSchedulerResourceLike;

@@ -112,19 +106,4 @@

get isDisposed(): boolean {
return this.scheduler.isDisposed;
}
add(
disposable: DisposableOrTeardown,
...disposables: DisposableOrTeardown[]
) {
this.scheduler.add(disposable, ...disposables);
}
dispose() {
this.scheduler.dispose();
}
next(): IteratorResult<T> {
throwIfDisposed(this);
throwIfDisposed(this.scheduler);

@@ -143,2 +122,4 @@ let done = false;

if (done) {
// Cleanup
this.scheduler.dispose();
return iteratorDone;

@@ -152,3 +133,3 @@ } else {

return(): IteratorResult<T> {
this.dispose();
this.scheduler.dispose();
return iteratorDone;

@@ -158,3 +139,3 @@ }

throw(e?: any): IteratorResult<T> {
this.dispose;
this.scheduler.dispose;
if (e !== undefined) {

@@ -165,16 +146,9 @@ throw e;

}
remove(
disposable: DisposableOrTeardown,
...disposables: DisposableOrTeardown[]
) {
this.scheduler.remove(disposable, ...disposables);
}
}
export const toIterator = <T>(
const toIterator = <T>(
schedulerFactory: () => VirtualTimeSchedulerResourceLike = createSynchronousSchedulerResource,
): OperatorLike<ObservableLike<T>, IteratorResource<T>> => observable => {
): OperatorLike<ObservableLike<T>, Iterator<T>> => observable => {
const scheduler = schedulerFactory();
return new ObservableIteratorResourceImpl(scheduler, observable);
return new ObservableIteratorImpl(scheduler, observable);
};

@@ -181,0 +155,0 @@

@@ -18,51 +18,12 @@ import {

class ThrottleFirstSubscriber<T> extends DelegatingSubscriber<T, T> {
private readonly durationSelector: (next: T) => ObservableLike<unknown>;
private readonly durationSubscription: SerialDisposableLike = createSerialDisposable();
constructor(
delegate: SubscriberLike<T>,
durationSelector: (next: T) => ObservableLike<unknown>,
) {
super(delegate);
this.durationSelector = durationSelector;
this.add(this.durationSubscription);
}
protected onComplete(error?: ErrorLike) {
this.remove(this.durationSubscription);
this.delegate.complete(error);
}
protected onNext(data: T) {
if (this.durationSubscription.disposable.isDisposed) {
this.durationSubscription.disposable = pipe(
this.durationSelector(data),
connect(this),
);
this.delegate.next(data);
}
}
export const enum ThrottleMode {
First = 1,
Last = 2,
Interval = 3,
}
const throttleFirstOperator = <T>(
durationSelector: (next: T) => ObservableLike<unknown>,
): SubscriberOperatorLike<T, T> => subscriber =>
new ThrottleFirstSubscriber(subscriber, durationSelector);
export const throttleFirst = <T>(
durationSelector: (next: T) => ObservableLike<unknown>,
): ObservableOperatorLike<T, T> =>
lift(throttleFirstOperator(durationSelector));
export const throttleFirstTime = <T>(
duration: number,
): ObservableOperatorLike<T, T> => {
const durationSelector = (_: T) => empty(duration);
return throttleFirst(durationSelector);
};
class ThrottleLastSubscriber<T> extends DelegatingSubscriber<T, T> {
class ThrottleSubscriber<T> extends DelegatingSubscriber<T, T> {
private readonly durationSelector: (next: T) => ObservableLike<unknown>;
private readonly durationSubscription: SerialDisposableLike = createSerialDisposable();
private readonly mode: ThrottleMode;
private value: [T] | undefined = undefined;

@@ -75,11 +36,16 @@ private readonly notifyNext = () => {

this.setupDurationSubscription(next);
this.delegate.next(next);
}
};
constructor(
delegate: SubscriberLike<T>,
durationSelector: (next: T) => ObservableLike<unknown>,
mode: ThrottleMode,
) {
super(delegate);
this.durationSelector = durationSelector;
this.mode = mode;

@@ -89,5 +55,13 @@ this.add(this.durationSubscription);

private setupDurationSubscription(next: T) {
this.durationSubscription.disposable = pipe(
this.durationSelector(next),
onComplete(this.notifyNext),
connect(this),
);
}
protected onComplete(error?: ErrorLike) {
this.remove(this.durationSubscription);
if (error === undefined) {
if (error === undefined && this.mode !== ThrottleMode.First) {
this.notifyNext();

@@ -99,73 +73,17 @@ }

protected onNext(data: T) {
this.value = [data];
if (this.durationSubscription.disposable.isDisposed) {
this.durationSubscription.disposable = pipe(
this.durationSelector(data),
onComplete(this.notifyNext),
connect(this),
);
if (this.value !== undefined) {
this.value[0] = data;
} else {
this.value = [data];
}
}
}
const throttleLastOperator = <T>(
durationSelector: (next: T) => ObservableLike<unknown>,
): SubscriberOperatorLike<T, T> => subscriber =>
new ThrottleLastSubscriber(subscriber, durationSelector);
export const throttleLast = <T>(
durationSelector: (next: T) => ObservableLike<unknown>,
): ObservableOperatorLike<T, T> => lift(throttleLastOperator(durationSelector));
export const throttleLastTime = <T>(
duration: number,
): ObservableOperatorLike<T, T> => {
const durationSelector = (_: T) => empty(duration);
return throttleLast(durationSelector);
};
class ThrottleSubscriber<T> extends DelegatingSubscriber<T, T> {
private readonly durationSelector: (next: T) => ObservableLike<unknown>;
private readonly durationSubscription: SerialDisposableLike = createSerialDisposable();
private value: [T] | undefined = undefined;
private readonly notifyNext = () => {
const value = this.value;
if (value !== undefined) {
this.value = undefined;
const [next] = value;
this.durationSubscription.disposable = pipe(
this.durationSelector(next),
onComplete(this.notifyNext),
connect(this),
);
this.delegate.next(next);
}
};
constructor(
delegate: SubscriberLike<T>,
durationSelector: (next: T) => ObservableLike<unknown>,
) {
super(delegate);
this.durationSelector = durationSelector;
this.add(this.durationSubscription);
}
protected onComplete(error?: ErrorLike) {
this.remove(this.durationSubscription);
if (error === undefined) {
if (
this.durationSubscription.disposable.isDisposed &&
this.mode !== ThrottleMode.Last
) {
this.notifyNext();
} else if (this.durationSubscription.disposable.isDisposed) {
this.setupDurationSubscription(data);
}
this.delegate.complete(error);
}
protected onNext(data: T) {
this.value = [data];
if (this.durationSubscription.disposable.isDisposed) {
this.notifyNext();
}
}
}

@@ -175,14 +93,15 @@

durationSelector: (next: T) => ObservableLike<unknown>,
mode: ThrottleMode,
): SubscriberOperatorLike<T, T> => subscriber =>
new ThrottleSubscriber(subscriber, durationSelector);
new ThrottleSubscriber(subscriber, durationSelector, mode);
export const throttle = <T>(
durationSelector: (next: T) => ObservableLike<unknown>,
): ObservableOperatorLike<T, T> => lift(throttleOperator(durationSelector));
export const throttleTime = <T>(
duration: number,
): ObservableOperatorLike<T, T> => {
const durationSelector = (_: T) => empty(duration);
return throttle(durationSelector);
};
duration: ((next: T) => ObservableLike<unknown>) | number,
mode: ThrottleMode = ThrottleMode.Interval,
): ObservableOperatorLike<T, T> =>
lift(
throttleOperator(
typeof duration === "number" ? _ => empty(duration) : duration,
mode,
),
);

@@ -10,6 +10,7 @@ import {

DelegatingSubscriber,
ObservableLike,
} from "@reactive-js/rx";
import { ObservableOperatorLike, SubscriberOperatorLike } from "./interfaces";
import { lift } from "./lift";
import { onError } from "./observe";
import { onComplete } from "./observe";
import { pipe } from "@reactive-js/pipe";

@@ -21,12 +22,6 @@ import { throws } from "./throws";

class TimeoutSubscriber<T> extends DelegatingSubscriber<T, T> {
private readonly duration: number;
private readonly duration: ObservableLike<unknown>;
private readonly durationSubscription: SerialDisposableLike = createSerialDisposable();
private readonly setupTimeout = () => {
this.durationSubscription.disposable = pipe(
throws(timeoutError, this.duration),
onError(cause => this.complete({ cause })),
connect(this),
);
};
constructor(delegate: SubscriberLike<T>, duration: number) {
constructor(delegate: SubscriberLike<T>, duration: ObservableLike<unknown>) {
super(delegate);

@@ -44,3 +39,8 @@ this.duration = duration;

protected onNext(data: T) {
this.setupTimeout();
this.durationSubscription.disposable = pipe(
this.duration,
onComplete(error => this.complete(error)),
connect(this),
);
this.delegate.next(data);

@@ -51,7 +51,13 @@ }

const operator = <T>(
duration: number,
duration: ObservableLike<unknown>,
): SubscriberOperatorLike<T, T> => subscriber =>
new TimeoutSubscriber(subscriber, duration);
export const timeout = <T>(duration: number): ObservableOperatorLike<T, T> =>
lift(operator(duration));
export const timeout = <T>(
duration: number | ObservableLike<unknown>,
): ObservableOperatorLike<T, T> =>
lift(
operator(
typeof duration === "number" ? throws(timeoutError, duration) : duration,
),
);

@@ -42,5 +42,9 @@ import {

toArray,
iterate,
toIterable,
fromIterable,
repeat,
timeout,
throttle,
ThrottleMode,
takeWhile,
} from "../src/index";

@@ -111,3 +115,3 @@

i => i + 2,
() => 1,
() => 3,
2,

@@ -120,3 +124,3 @@ ),

i => i + 2,
() => 0,
() => 2,
3,

@@ -152,3 +156,4 @@ ),

onNext(cb),
iterate(),
ignoreElements(),
toArray(),
),

@@ -183,3 +188,5 @@ ).toThrow(cause);

expect(() => pipe(src, concatAll(), onNext(cb), iterate())).toThrow(cause);
expect(() =>
pipe(src, concatAll(), onNext(cb), ignoreElements(), toArray()),
).toThrow(cause);
expect(cb).toHaveBeenNthCalledWith(1, 1);

@@ -197,3 +204,5 @@ expect(cb).toHaveBeenNthCalledWith(2, 2);

expect(() => pipe(src, concatAll(), onNext(cb), iterate())).toThrow(cause);
expect(() =>
pipe(src, concatAll(), onNext(cb), ignoreElements(), toArray()),
).toThrow(cause);
expect(cb).toHaveBeenNthCalledWith(1, 1);

@@ -245,3 +254,4 @@ expect(cb).toHaveBeenNthCalledWith(2, 2);

onNext(cb),
iterate(),
ignoreElements(),
toArray(),
),

@@ -266,6 +276,3 @@ ).toThrow(cause);

const observable = fromArray(src);
const result = pipe(
observable,
toArray(() => createVirtualTimeSchedulerResource(1)),
);
const result = pipe(observable, toArray());
expect(result).toEqual(src);

@@ -297,5 +304,12 @@ });

describe("fromIterable", () => {
test("with no delay", () => {
test("with no delay when scheduler does not request yields", () => {
const src = [1, 2, 3, 4, 5, 6];
const observable = fromIterable(src);
const result = pipe(observable, toArray());
expect(result).toEqual(src);
});
test("with no delay when scheduler requests yields", () => {
const src = [1, 2, 3, 4, 5, 6];
const observable = fromIterable(src);
const result = pipe(

@@ -328,2 +342,21 @@ observable,

});
test("calls iterator.return when disposed", () => {
const iterator = {
next: jest.fn(),
return: jest.fn(),
throw: jest.fn(),
};
const mockIterable = {
[Symbol.iterator](): Iterator<unknown> {
return iterator;
},
};
const scheduler = createVirtualTimeSchedulerResource(1);
const subscription = connect(scheduler)(fromIterable(mockIterable));
subscription.dispose();
expect(mockIterable[Symbol.iterator]().return).toHaveBeenCalledTimes(1);
});
});

@@ -388,6 +421,6 @@

i => i + 1,
() => 0,
() => 1,
),
take(5),
toArray(() => createVirtualTimeSchedulerResource(1)),
toArray(),
);

@@ -412,6 +445,7 @@

pipe(
generate(generator, () => 0),
generate(generator, () => 1),
take(5),
onNext(cb),
iterate(() => createVirtualTimeSchedulerResource(1)),
ignoreElements(),
toArray(),
),

@@ -432,3 +466,3 @@ ).toThrow(cause);

i => i + 1,
() => 0,
() => 1,
5,

@@ -465,3 +499,3 @@ ),

pipe(
generate(generator, () => 0, 5),
generate(generator, () => 1, 5),
map(x => [scheduler.now, x]),

@@ -524,3 +558,3 @@ take(5),

i => i + 2,
() => 1,
() => 3,
2,

@@ -533,3 +567,3 @@ ),

i => i + 2,
() => 0,
() => 2,
3,

@@ -558,3 +592,3 @@ ),

pipe(never(), observe(observer), iterate());
expect(() => pipe(never(), observe(observer), toArray())).toThrow();

@@ -624,13 +658,19 @@ expect(observer.next).toHaveBeenCalledTimes(0);

describe("throws", () => {
test("completes with an exception when subscribed", () => {
const scheduler = createVirtualTimeSchedulerResource();
const observer = createMockObserver();
const cause = new Error();
describe("repeat", () => {
test("repeats the observable n times", () => {
const result = pipe(ofValue(1), repeat(3), toArray());
expect(result).toEqual([1, 1, 1]);
});
pipe(throws(cause), observe(observer), connect(scheduler));
scheduler.run();
expect(observer.next).toBeCalledTimes(0);
expect(observer.complete).toBeCalledWith({ cause });
test("when the repeat functions throws throws", () => {
const error = new Error();
expect(() =>
pipe(
ofValue(1),
repeat(_ => {
throw error;
}),
toArray(),
),
).toThrow(error);
});

@@ -652,3 +692,3 @@ });

observe(observer),
iterate(),
toArray(),
),

@@ -669,10 +709,3 @@ ).toThrow(cause);

expect(() =>
pipe(
src,
switchAll(),
onNext(cb),
iterate(() => createVirtualTimeSchedulerResource()),
),
).toThrow(cause);
expect(() => pipe(src, switchAll(), onNext(cb), toArray())).toThrow(cause);

@@ -689,7 +722,3 @@ expect(cb).toBeCalledTimes(4);

const src = fromArray([1, 2, 3, 4]);
const result = pipe(
src,
takeLast(3),
toArray(() => createVirtualTimeSchedulerResource(2)),
);
const result = pipe(src, takeLast(3), toArray());
expect(result).toEqual([2, 3, 4]);

@@ -708,3 +737,3 @@ });

observe(observer),
iterate(() => createVirtualTimeSchedulerResource()),
toArray(() => createVirtualTimeSchedulerResource()),
),

@@ -716,15 +745,146 @@ ).toThrow(cause);

test("toIterable", () => {
const iterable = pipe(
fromArray([1, 2, 3, 4]),
map(x => x + 1),
toIterable(),
test("takeWhile", () => {
const result = pipe(
generate(
x => x + 1,
() => 0,
),
takeWhile(x => x < 3),
toArray(),
);
const acc = [];
for (const v of iterable) {
acc.push(v);
}
expect(acc).toEqual([2, 3, 4, 5]);
expect(result).toEqual([0, 1, 2]);
});
describe("throttle", () => {
test("first", () => {
const result = pipe(
generate(
x => x + 1,
() => 0,
1,
),
take(100),
throttle(50, ThrottleMode.First),
toArray(() => createVirtualTimeSchedulerResource(1)),
);
expect(result).toEqual([0, 49, 99]);
});
test("last", () => {
const result = pipe(
generate(
x => x + 1,
() => 0,
1,
),
take(200),
throttle(50, ThrottleMode.Last),
toArray(() => createVirtualTimeSchedulerResource(1)),
);
expect(result).toEqual([49, 99, 149, 199]);
});
test("interval", () => {
const result = pipe(
generate(
x => x + 1,
() => 0,
1,
),
take(200),
throttle(75, ThrottleMode.Interval),
toArray(() => createVirtualTimeSchedulerResource(1)),
);
expect(result).toEqual([0, 74, 149, 199]);
});
});
describe("throws", () => {
test("completes with an exception when subscribed", () => {
const scheduler = createVirtualTimeSchedulerResource();
const observer = createMockObserver();
const cause = new Error();
pipe(throws(cause), observe(observer), connect(scheduler));
scheduler.run();
expect(observer.next).toBeCalledTimes(0);
expect(observer.complete).toBeCalledWith({ cause });
});
});
describe("timeout", () => {
test("throws when a timeout occurs", () => {
expect(() =>
pipe(
ofValue(1, 2),
timeout(1),
toArray(() => createVirtualTimeSchedulerResource(2)),
),
).toThrow();
});
test("when timeout is greater than observed time", () => {
const result = pipe(
ofValue(1, 2),
timeout(3),
toArray(() => createVirtualTimeSchedulerResource(2)),
);
expect(result).toEqual([1]);
});
});
describe("toIterable", () => {
test("iterate using a for of loop", () => {
const iterable = pipe(
fromArray([1, 2, 3, 4]),
map(x => x + 1),
toIterable(),
);
const acc = [];
for (const v of iterable) {
acc.push(v);
}
expect(acc).toEqual([2, 3, 4, 5]);
});
test("rethrows an error when the source throws", () => {
const error = new Error();
const iterable = pipe(throws(error), toIterable());
expect(() => {
for (const _ of iterable) {
}
}).toThrowError(error);
});
test("calling throw, throws the error", () => {
const error = new Error();
const iterator = pipe(fromArray([1, 2, 3, 4]), toIterable())[
Symbol.iterator
]();
expect(() => (iterator as any).throw(error)).toThrowError(error);
});
test("calling throw without an error returns done.", () => {
const result = (pipe(fromArray([1, 2, 3, 4]), toIterable())[
Symbol.iterator
]() as any).throw();
expect((result as any).done).toBeTruthy();
});
test("calling return, returns done", () => {
const result = (pipe(fromArray([1, 2, 3, 4]), toIterable())[
Symbol.iterator
]() as any).return();
expect((result as any).done).toBeTruthy();
});
});
describe("toPromise", () => {

@@ -763,3 +923,3 @@ test("when the observable produces no values", () => {

onNext(cb),
iterate(createVirtualTimeSchedulerResource),
toArray(createVirtualTimeSchedulerResource),
),

@@ -766,0 +926,0 @@ ).toThrow(cause);

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc