@reactive-js/observable
Advanced tools
Comparing version 0.0.9 to 0.0.10
@@ -7,5 +7,8 @@ [@reactive-js/observable](README.md) | ||
### Enumerations | ||
* [ThrottleMode](enums/throttlemode.md) | ||
### Interfaces | ||
* [IteratorResource](interfaces/iteratorresource.md) | ||
* [ObservableOperatorLike](interfaces/observableoperatorlike.md) | ||
@@ -29,3 +32,2 @@ * [SubscriberOperatorLike](interfaces/subscriberoperatorlike.md) | ||
* [ignoreElements](README.md#const-ignoreelements) | ||
* [iterate](README.md#const-iterate) | ||
* [keep](README.md#const-keep) | ||
@@ -54,7 +56,2 @@ * [lift](README.md#const-lift) | ||
* [throttle](README.md#const-throttle) | ||
* [throttleFirst](README.md#const-throttlefirst) | ||
* [throttleFirstTime](README.md#const-throttlefirsttime) | ||
* [throttleLast](README.md#const-throttlelast) | ||
* [throttleLastTime](README.md#const-throttlelasttime) | ||
* [throttleTime](README.md#const-throttletime) | ||
* [throws](README.md#const-throws) | ||
@@ -64,3 +61,2 @@ * [timeout](README.md#const-timeout) | ||
* [toIterable](README.md#const-toiterable) | ||
* [toIterator](README.md#const-toiterator) | ||
* [toPromise](README.md#const-topromise) | ||
@@ -519,20 +515,2 @@ * [withLatestFrom](README.md#const-withlatestfrom) | ||
### `Const` iterate | ||
▸ **iterate**<**T**>(`schedulerFactory`: function): *OperatorLike‹ObservableLike‹T›, void›* | ||
**Type parameters:** | ||
▪ **T** | ||
**Parameters:** | ||
▪`Default value` **schedulerFactory**: *function*= createSynchronousSchedulerResource | ||
▸ (): *VirtualTimeSchedulerResourceLike* | ||
**Returns:** *OperatorLike‹ObservableLike‹T›, void›* | ||
___ | ||
### `Const` keep | ||
@@ -999,3 +977,3 @@ | ||
▸ **throttle**<**T**>(`durationSelector`: function): *[ObservableOperatorLike](interfaces/observableoperatorlike.md)‹T, T›* | ||
▸ **throttle**<**T**>(`duration`: function | number, `mode`: [ThrottleMode](enums/throttlemode.md)): *[ObservableOperatorLike](interfaces/observableoperatorlike.md)‹T, T›* | ||
@@ -1008,12 +986,7 @@ **Type parameters:** | ||
▪ **durationSelector**: *function* | ||
Name | Type | Default | | ||
------ | ------ | ------ | | ||
`duration` | function | number | - | | ||
`mode` | [ThrottleMode](enums/throttlemode.md) | ThrottleMode.Interval | | ||
▸ (`next`: T): *ObservableLike‹unknown›* | ||
**Parameters:** | ||
Name | Type | | ||
------ | ------ | | ||
`next` | T | | ||
**Returns:** *[ObservableOperatorLike](interfaces/observableoperatorlike.md)‹T, T›* | ||
@@ -1023,104 +996,2 @@ | ||
### `Const` throttleFirst | ||
▸ **throttleFirst**<**T**>(`durationSelector`: function): *[ObservableOperatorLike](interfaces/observableoperatorlike.md)‹T, T›* | ||
**Type parameters:** | ||
▪ **T** | ||
**Parameters:** | ||
▪ **durationSelector**: *function* | ||
▸ (`next`: T): *ObservableLike‹unknown›* | ||
**Parameters:** | ||
Name | Type | | ||
------ | ------ | | ||
`next` | T | | ||
**Returns:** *[ObservableOperatorLike](interfaces/observableoperatorlike.md)‹T, T›* | ||
___ | ||
### `Const` throttleFirstTime | ||
▸ **throttleFirstTime**<**T**>(`duration`: number): *[ObservableOperatorLike](interfaces/observableoperatorlike.md)‹T, T›* | ||
**Type parameters:** | ||
▪ **T** | ||
**Parameters:** | ||
Name | Type | | ||
------ | ------ | | ||
`duration` | number | | ||
**Returns:** *[ObservableOperatorLike](interfaces/observableoperatorlike.md)‹T, T›* | ||
___ | ||
### `Const` throttleLast | ||
▸ **throttleLast**<**T**>(`durationSelector`: function): *[ObservableOperatorLike](interfaces/observableoperatorlike.md)‹T, T›* | ||
**Type parameters:** | ||
▪ **T** | ||
**Parameters:** | ||
▪ **durationSelector**: *function* | ||
▸ (`next`: T): *ObservableLike‹unknown›* | ||
**Parameters:** | ||
Name | Type | | ||
------ | ------ | | ||
`next` | T | | ||
**Returns:** *[ObservableOperatorLike](interfaces/observableoperatorlike.md)‹T, T›* | ||
___ | ||
### `Const` throttleLastTime | ||
▸ **throttleLastTime**<**T**>(`duration`: number): *[ObservableOperatorLike](interfaces/observableoperatorlike.md)‹T, T›* | ||
**Type parameters:** | ||
▪ **T** | ||
**Parameters:** | ||
Name | Type | | ||
------ | ------ | | ||
`duration` | number | | ||
**Returns:** *[ObservableOperatorLike](interfaces/observableoperatorlike.md)‹T, T›* | ||
___ | ||
### `Const` throttleTime | ||
▸ **throttleTime**<**T**>(`duration`: number): *[ObservableOperatorLike](interfaces/observableoperatorlike.md)‹T, T›* | ||
**Type parameters:** | ||
▪ **T** | ||
**Parameters:** | ||
Name | Type | | ||
------ | ------ | | ||
`duration` | number | | ||
**Returns:** *[ObservableOperatorLike](interfaces/observableoperatorlike.md)‹T, T›* | ||
___ | ||
### `Const` throws | ||
@@ -1147,3 +1018,3 @@ | ||
▸ **timeout**<**T**>(`duration`: number): *[ObservableOperatorLike](interfaces/observableoperatorlike.md)‹T, T›* | ||
▸ **timeout**<**T**>(`duration`: number | ObservableLike‹unknown›): *[ObservableOperatorLike](interfaces/observableoperatorlike.md)‹T, T›* | ||
@@ -1158,3 +1029,3 @@ **Type parameters:** | ||
------ | ------ | | ||
`duration` | number | | ||
`duration` | number | ObservableLike‹unknown› | | ||
@@ -1201,20 +1072,2 @@ **Returns:** *[ObservableOperatorLike](interfaces/observableoperatorlike.md)‹T, T›* | ||
### `Const` toIterator | ||
▸ **toIterator**<**T**>(`schedulerFactory`: function): *OperatorLike‹ObservableLike‹T›, [IteratorResource](interfaces/iteratorresource.md)‹T››* | ||
**Type parameters:** | ||
▪ **T** | ||
**Parameters:** | ||
▪`Default value` **schedulerFactory**: *function*= createSynchronousSchedulerResource | ||
▸ (): *VirtualTimeSchedulerResourceLike* | ||
**Returns:** *OperatorLike‹ObservableLike‹T›, [IteratorResource](interfaces/iteratorresource.md)‹T››* | ||
___ | ||
### `Const` toPromise | ||
@@ -1221,0 +1074,0 @@ |
{ | ||
"name": "@reactive-js/observable", | ||
"version": "0.0.9", | ||
"version": "0.0.10", | ||
"main": "dist/cjs/index.js", | ||
@@ -41,7 +41,7 @@ "module": "dist/esm5/index.js", | ||
"dependencies": { | ||
"@reactive-js/disposable": "^0.0.9", | ||
"@reactive-js/pipe": "^0.0.9", | ||
"@reactive-js/rx": "^0.0.9", | ||
"@reactive-js/scheduler": "^0.0.9", | ||
"@reactive-js/schedulers": "^0.0.9" | ||
"@reactive-js/disposable": "^0.0.10", | ||
"@reactive-js/pipe": "^0.0.10", | ||
"@reactive-js/rx": "^0.0.10", | ||
"@reactive-js/scheduler": "^0.0.10", | ||
"@reactive-js/schedulers": "^0.0.10" | ||
}, | ||
@@ -73,3 +73,3 @@ "devDependencies": { | ||
}, | ||
"gitHead": "24008ff071cc86dfb1eaf3940996e6feb8e69704" | ||
"gitHead": "fef42eb9cbce4260e80cf397f7fbd2a3bf3311d5" | ||
} |
@@ -10,3 +10,3 @@ export { lift } from "./internal/lift"; | ||
} from "./internal/interfaces"; | ||
export { iterate, toArray, toIterable, toIterator } from "./internal/iterate"; | ||
export { toArray, toIterable } from "./internal/iterate"; | ||
export { | ||
@@ -36,12 +36,5 @@ empty, | ||
export { takeWhile } from "./internal/takeWhile"; | ||
export { | ||
throttleFirst, | ||
throttleFirstTime, | ||
throttleLast, | ||
throttleLastTime, | ||
throttle, | ||
throttleTime, | ||
} from "./internal/throttle"; | ||
export { ThrottleMode, throttle } from "./internal/throttle"; | ||
export { throws } from "./internal/throws"; | ||
export { timeout } from "./internal/timeout"; | ||
export { withLatestFrom } from "./internal/withLatestFrom"; |
@@ -15,3 +15,5 @@ import { ObservableLike, SubscriberLike } from "@reactive-js/rx"; | ||
const continuation: SchedulerContinuationLike = shouldYield => { | ||
if (index < values.length && delay > 0) { | ||
if (subscriber.isDisposed) { | ||
return; | ||
} else if (index < values.length && delay > 0) { | ||
const value = values[index]; | ||
@@ -22,3 +24,3 @@ index++; | ||
} else { | ||
while (index < values.length) { | ||
while (index < values.length && !subscriber.isDisposed) { | ||
const value = values[index]; | ||
@@ -67,3 +69,3 @@ index++; | ||
) => { | ||
while (index < values.length) { | ||
while (index < values.length && !subscriber.isDisposed) { | ||
const [, value] = values[index]; | ||
@@ -70,0 +72,0 @@ index++; |
@@ -22,4 +22,5 @@ import { ObservableLike, SubscriberLike } from "@reactive-js/rx"; | ||
let next = iterator.next(); | ||
if (next.done) { | ||
if (subscriber.isDisposed) { | ||
return; | ||
} else if (next.done) { | ||
subscriber.complete(); | ||
@@ -31,3 +32,3 @@ return; | ||
} else { | ||
for (; !next.done; next = iterator.next()) { | ||
for (; !next.done && !subscriber.isDisposed; next = iterator.next()) { | ||
subscriber.next(next.value); | ||
@@ -34,0 +35,0 @@ |
@@ -21,2 +21,4 @@ import { ObservableLike, SubscriberLike } from "@reactive-js/rx"; | ||
} else if (delay > 0) { | ||
subscriber.next(acc); | ||
try { | ||
@@ -29,6 +31,7 @@ acc = generator(acc); | ||
subscriber.next(acc); | ||
return continuationResult; | ||
} else { | ||
do { | ||
subscriber.next(acc); | ||
try { | ||
@@ -40,6 +43,4 @@ acc = generator(acc); | ||
} | ||
} while (!shouldYield() && !subscriber.isDisposed); | ||
subscriber.next(acc); | ||
} while (!shouldYield()); | ||
return continuationResult; | ||
@@ -46,0 +47,0 @@ } |
@@ -14,9 +14,5 @@ import { | ||
import { OperatorLike, pipe } from "@reactive-js/pipe"; | ||
import { | ||
DisposableLike, | ||
DisposableOrTeardown, | ||
throwIfDisposed, | ||
} from "@reactive-js/disposable"; | ||
import { throwIfDisposed } from "@reactive-js/disposable"; | ||
export const iterate = <T>( | ||
const iterate = <T>( | ||
schedulerFactory: () => VirtualTimeSchedulerResourceLike = createSynchronousSchedulerResource, | ||
@@ -75,4 +71,2 @@ ): OperatorLike<ObservableLike<T>, void> => observable => { | ||
export interface IteratorResource<T> extends Iterator<T>, DisposableLike {} | ||
const iteratorDone: IteratorReturnResult<any> = { | ||
@@ -83,3 +77,3 @@ done: true, | ||
class ObservableIteratorResourceImpl<T> implements IteratorResource<T> { | ||
class ObservableIteratorImpl<T> implements Iterator<T> { | ||
private readonly scheduler: VirtualTimeSchedulerResourceLike; | ||
@@ -112,19 +106,4 @@ | ||
get isDisposed(): boolean { | ||
return this.scheduler.isDisposed; | ||
} | ||
add( | ||
disposable: DisposableOrTeardown, | ||
...disposables: DisposableOrTeardown[] | ||
) { | ||
this.scheduler.add(disposable, ...disposables); | ||
} | ||
dispose() { | ||
this.scheduler.dispose(); | ||
} | ||
next(): IteratorResult<T> { | ||
throwIfDisposed(this); | ||
throwIfDisposed(this.scheduler); | ||
@@ -143,2 +122,4 @@ let done = false; | ||
if (done) { | ||
// Cleanup | ||
this.scheduler.dispose(); | ||
return iteratorDone; | ||
@@ -152,3 +133,3 @@ } else { | ||
return(): IteratorResult<T> { | ||
this.dispose(); | ||
this.scheduler.dispose(); | ||
return iteratorDone; | ||
@@ -158,3 +139,3 @@ } | ||
throw(e?: any): IteratorResult<T> { | ||
this.dispose; | ||
this.scheduler.dispose; | ||
if (e !== undefined) { | ||
@@ -165,16 +146,9 @@ throw e; | ||
} | ||
remove( | ||
disposable: DisposableOrTeardown, | ||
...disposables: DisposableOrTeardown[] | ||
) { | ||
this.scheduler.remove(disposable, ...disposables); | ||
} | ||
} | ||
export const toIterator = <T>( | ||
const toIterator = <T>( | ||
schedulerFactory: () => VirtualTimeSchedulerResourceLike = createSynchronousSchedulerResource, | ||
): OperatorLike<ObservableLike<T>, IteratorResource<T>> => observable => { | ||
): OperatorLike<ObservableLike<T>, Iterator<T>> => observable => { | ||
const scheduler = schedulerFactory(); | ||
return new ObservableIteratorResourceImpl(scheduler, observable); | ||
return new ObservableIteratorImpl(scheduler, observable); | ||
}; | ||
@@ -181,0 +155,0 @@ |
@@ -18,51 +18,12 @@ import { | ||
class ThrottleFirstSubscriber<T> extends DelegatingSubscriber<T, T> { | ||
private readonly durationSelector: (next: T) => ObservableLike<unknown>; | ||
private readonly durationSubscription: SerialDisposableLike = createSerialDisposable(); | ||
constructor( | ||
delegate: SubscriberLike<T>, | ||
durationSelector: (next: T) => ObservableLike<unknown>, | ||
) { | ||
super(delegate); | ||
this.durationSelector = durationSelector; | ||
this.add(this.durationSubscription); | ||
} | ||
protected onComplete(error?: ErrorLike) { | ||
this.remove(this.durationSubscription); | ||
this.delegate.complete(error); | ||
} | ||
protected onNext(data: T) { | ||
if (this.durationSubscription.disposable.isDisposed) { | ||
this.durationSubscription.disposable = pipe( | ||
this.durationSelector(data), | ||
connect(this), | ||
); | ||
this.delegate.next(data); | ||
} | ||
} | ||
export const enum ThrottleMode { | ||
First = 1, | ||
Last = 2, | ||
Interval = 3, | ||
} | ||
const throttleFirstOperator = <T>( | ||
durationSelector: (next: T) => ObservableLike<unknown>, | ||
): SubscriberOperatorLike<T, T> => subscriber => | ||
new ThrottleFirstSubscriber(subscriber, durationSelector); | ||
export const throttleFirst = <T>( | ||
durationSelector: (next: T) => ObservableLike<unknown>, | ||
): ObservableOperatorLike<T, T> => | ||
lift(throttleFirstOperator(durationSelector)); | ||
export const throttleFirstTime = <T>( | ||
duration: number, | ||
): ObservableOperatorLike<T, T> => { | ||
const durationSelector = (_: T) => empty(duration); | ||
return throttleFirst(durationSelector); | ||
}; | ||
class ThrottleLastSubscriber<T> extends DelegatingSubscriber<T, T> { | ||
class ThrottleSubscriber<T> extends DelegatingSubscriber<T, T> { | ||
private readonly durationSelector: (next: T) => ObservableLike<unknown>; | ||
private readonly durationSubscription: SerialDisposableLike = createSerialDisposable(); | ||
private readonly mode: ThrottleMode; | ||
private value: [T] | undefined = undefined; | ||
@@ -75,11 +36,16 @@ private readonly notifyNext = () => { | ||
this.setupDurationSubscription(next); | ||
this.delegate.next(next); | ||
} | ||
}; | ||
constructor( | ||
delegate: SubscriberLike<T>, | ||
durationSelector: (next: T) => ObservableLike<unknown>, | ||
mode: ThrottleMode, | ||
) { | ||
super(delegate); | ||
this.durationSelector = durationSelector; | ||
this.mode = mode; | ||
@@ -89,5 +55,13 @@ this.add(this.durationSubscription); | ||
private setupDurationSubscription(next: T) { | ||
this.durationSubscription.disposable = pipe( | ||
this.durationSelector(next), | ||
onComplete(this.notifyNext), | ||
connect(this), | ||
); | ||
} | ||
protected onComplete(error?: ErrorLike) { | ||
this.remove(this.durationSubscription); | ||
if (error === undefined) { | ||
if (error === undefined && this.mode !== ThrottleMode.First) { | ||
this.notifyNext(); | ||
@@ -99,73 +73,17 @@ } | ||
protected onNext(data: T) { | ||
this.value = [data]; | ||
if (this.durationSubscription.disposable.isDisposed) { | ||
this.durationSubscription.disposable = pipe( | ||
this.durationSelector(data), | ||
onComplete(this.notifyNext), | ||
connect(this), | ||
); | ||
if (this.value !== undefined) { | ||
this.value[0] = data; | ||
} else { | ||
this.value = [data]; | ||
} | ||
} | ||
} | ||
const throttleLastOperator = <T>( | ||
durationSelector: (next: T) => ObservableLike<unknown>, | ||
): SubscriberOperatorLike<T, T> => subscriber => | ||
new ThrottleLastSubscriber(subscriber, durationSelector); | ||
export const throttleLast = <T>( | ||
durationSelector: (next: T) => ObservableLike<unknown>, | ||
): ObservableOperatorLike<T, T> => lift(throttleLastOperator(durationSelector)); | ||
export const throttleLastTime = <T>( | ||
duration: number, | ||
): ObservableOperatorLike<T, T> => { | ||
const durationSelector = (_: T) => empty(duration); | ||
return throttleLast(durationSelector); | ||
}; | ||
class ThrottleSubscriber<T> extends DelegatingSubscriber<T, T> { | ||
private readonly durationSelector: (next: T) => ObservableLike<unknown>; | ||
private readonly durationSubscription: SerialDisposableLike = createSerialDisposable(); | ||
private value: [T] | undefined = undefined; | ||
private readonly notifyNext = () => { | ||
const value = this.value; | ||
if (value !== undefined) { | ||
this.value = undefined; | ||
const [next] = value; | ||
this.durationSubscription.disposable = pipe( | ||
this.durationSelector(next), | ||
onComplete(this.notifyNext), | ||
connect(this), | ||
); | ||
this.delegate.next(next); | ||
} | ||
}; | ||
constructor( | ||
delegate: SubscriberLike<T>, | ||
durationSelector: (next: T) => ObservableLike<unknown>, | ||
) { | ||
super(delegate); | ||
this.durationSelector = durationSelector; | ||
this.add(this.durationSubscription); | ||
} | ||
protected onComplete(error?: ErrorLike) { | ||
this.remove(this.durationSubscription); | ||
if (error === undefined) { | ||
if ( | ||
this.durationSubscription.disposable.isDisposed && | ||
this.mode !== ThrottleMode.Last | ||
) { | ||
this.notifyNext(); | ||
} else if (this.durationSubscription.disposable.isDisposed) { | ||
this.setupDurationSubscription(data); | ||
} | ||
this.delegate.complete(error); | ||
} | ||
protected onNext(data: T) { | ||
this.value = [data]; | ||
if (this.durationSubscription.disposable.isDisposed) { | ||
this.notifyNext(); | ||
} | ||
} | ||
} | ||
@@ -175,14 +93,15 @@ | ||
durationSelector: (next: T) => ObservableLike<unknown>, | ||
mode: ThrottleMode, | ||
): SubscriberOperatorLike<T, T> => subscriber => | ||
new ThrottleSubscriber(subscriber, durationSelector); | ||
new ThrottleSubscriber(subscriber, durationSelector, mode); | ||
export const throttle = <T>( | ||
durationSelector: (next: T) => ObservableLike<unknown>, | ||
): ObservableOperatorLike<T, T> => lift(throttleOperator(durationSelector)); | ||
export const throttleTime = <T>( | ||
duration: number, | ||
): ObservableOperatorLike<T, T> => { | ||
const durationSelector = (_: T) => empty(duration); | ||
return throttle(durationSelector); | ||
}; | ||
duration: ((next: T) => ObservableLike<unknown>) | number, | ||
mode: ThrottleMode = ThrottleMode.Interval, | ||
): ObservableOperatorLike<T, T> => | ||
lift( | ||
throttleOperator( | ||
typeof duration === "number" ? _ => empty(duration) : duration, | ||
mode, | ||
), | ||
); |
@@ -10,6 +10,7 @@ import { | ||
DelegatingSubscriber, | ||
ObservableLike, | ||
} from "@reactive-js/rx"; | ||
import { ObservableOperatorLike, SubscriberOperatorLike } from "./interfaces"; | ||
import { lift } from "./lift"; | ||
import { onError } from "./observe"; | ||
import { onComplete } from "./observe"; | ||
import { pipe } from "@reactive-js/pipe"; | ||
@@ -21,12 +22,6 @@ import { throws } from "./throws"; | ||
class TimeoutSubscriber<T> extends DelegatingSubscriber<T, T> { | ||
private readonly duration: number; | ||
private readonly duration: ObservableLike<unknown>; | ||
private readonly durationSubscription: SerialDisposableLike = createSerialDisposable(); | ||
private readonly setupTimeout = () => { | ||
this.durationSubscription.disposable = pipe( | ||
throws(timeoutError, this.duration), | ||
onError(cause => this.complete({ cause })), | ||
connect(this), | ||
); | ||
}; | ||
constructor(delegate: SubscriberLike<T>, duration: number) { | ||
constructor(delegate: SubscriberLike<T>, duration: ObservableLike<unknown>) { | ||
super(delegate); | ||
@@ -44,3 +39,8 @@ this.duration = duration; | ||
protected onNext(data: T) { | ||
this.setupTimeout(); | ||
this.durationSubscription.disposable = pipe( | ||
this.duration, | ||
onComplete(error => this.complete(error)), | ||
connect(this), | ||
); | ||
this.delegate.next(data); | ||
@@ -51,7 +51,13 @@ } | ||
const operator = <T>( | ||
duration: number, | ||
duration: ObservableLike<unknown>, | ||
): SubscriberOperatorLike<T, T> => subscriber => | ||
new TimeoutSubscriber(subscriber, duration); | ||
export const timeout = <T>(duration: number): ObservableOperatorLike<T, T> => | ||
lift(operator(duration)); | ||
export const timeout = <T>( | ||
duration: number | ObservableLike<unknown>, | ||
): ObservableOperatorLike<T, T> => | ||
lift( | ||
operator( | ||
typeof duration === "number" ? throws(timeoutError, duration) : duration, | ||
), | ||
); |
@@ -42,5 +42,9 @@ import { | ||
toArray, | ||
iterate, | ||
toIterable, | ||
fromIterable, | ||
repeat, | ||
timeout, | ||
throttle, | ||
ThrottleMode, | ||
takeWhile, | ||
} from "../src/index"; | ||
@@ -111,3 +115,3 @@ | ||
i => i + 2, | ||
() => 1, | ||
() => 3, | ||
2, | ||
@@ -120,3 +124,3 @@ ), | ||
i => i + 2, | ||
() => 0, | ||
() => 2, | ||
3, | ||
@@ -152,3 +156,4 @@ ), | ||
onNext(cb), | ||
iterate(), | ||
ignoreElements(), | ||
toArray(), | ||
), | ||
@@ -183,3 +188,5 @@ ).toThrow(cause); | ||
expect(() => pipe(src, concatAll(), onNext(cb), iterate())).toThrow(cause); | ||
expect(() => | ||
pipe(src, concatAll(), onNext(cb), ignoreElements(), toArray()), | ||
).toThrow(cause); | ||
expect(cb).toHaveBeenNthCalledWith(1, 1); | ||
@@ -197,3 +204,5 @@ expect(cb).toHaveBeenNthCalledWith(2, 2); | ||
expect(() => pipe(src, concatAll(), onNext(cb), iterate())).toThrow(cause); | ||
expect(() => | ||
pipe(src, concatAll(), onNext(cb), ignoreElements(), toArray()), | ||
).toThrow(cause); | ||
expect(cb).toHaveBeenNthCalledWith(1, 1); | ||
@@ -245,3 +254,4 @@ expect(cb).toHaveBeenNthCalledWith(2, 2); | ||
onNext(cb), | ||
iterate(), | ||
ignoreElements(), | ||
toArray(), | ||
), | ||
@@ -266,6 +276,3 @@ ).toThrow(cause); | ||
const observable = fromArray(src); | ||
const result = pipe( | ||
observable, | ||
toArray(() => createVirtualTimeSchedulerResource(1)), | ||
); | ||
const result = pipe(observable, toArray()); | ||
expect(result).toEqual(src); | ||
@@ -297,5 +304,12 @@ }); | ||
describe("fromIterable", () => { | ||
test("with no delay", () => { | ||
test("with no delay when scheduler does not request yields", () => { | ||
const src = [1, 2, 3, 4, 5, 6]; | ||
const observable = fromIterable(src); | ||
const result = pipe(observable, toArray()); | ||
expect(result).toEqual(src); | ||
}); | ||
test("with no delay when scheduler requests yields", () => { | ||
const src = [1, 2, 3, 4, 5, 6]; | ||
const observable = fromIterable(src); | ||
const result = pipe( | ||
@@ -328,2 +342,21 @@ observable, | ||
}); | ||
test("calls iterator.return when disposed", () => { | ||
const iterator = { | ||
next: jest.fn(), | ||
return: jest.fn(), | ||
throw: jest.fn(), | ||
}; | ||
const mockIterable = { | ||
[Symbol.iterator](): Iterator<unknown> { | ||
return iterator; | ||
}, | ||
}; | ||
const scheduler = createVirtualTimeSchedulerResource(1); | ||
const subscription = connect(scheduler)(fromIterable(mockIterable)); | ||
subscription.dispose(); | ||
expect(mockIterable[Symbol.iterator]().return).toHaveBeenCalledTimes(1); | ||
}); | ||
}); | ||
@@ -388,6 +421,6 @@ | ||
i => i + 1, | ||
() => 0, | ||
() => 1, | ||
), | ||
take(5), | ||
toArray(() => createVirtualTimeSchedulerResource(1)), | ||
toArray(), | ||
); | ||
@@ -412,6 +445,7 @@ | ||
pipe( | ||
generate(generator, () => 0), | ||
generate(generator, () => 1), | ||
take(5), | ||
onNext(cb), | ||
iterate(() => createVirtualTimeSchedulerResource(1)), | ||
ignoreElements(), | ||
toArray(), | ||
), | ||
@@ -432,3 +466,3 @@ ).toThrow(cause); | ||
i => i + 1, | ||
() => 0, | ||
() => 1, | ||
5, | ||
@@ -465,3 +499,3 @@ ), | ||
pipe( | ||
generate(generator, () => 0, 5), | ||
generate(generator, () => 1, 5), | ||
map(x => [scheduler.now, x]), | ||
@@ -524,3 +558,3 @@ take(5), | ||
i => i + 2, | ||
() => 1, | ||
() => 3, | ||
2, | ||
@@ -533,3 +567,3 @@ ), | ||
i => i + 2, | ||
() => 0, | ||
() => 2, | ||
3, | ||
@@ -558,3 +592,3 @@ ), | ||
pipe(never(), observe(observer), iterate()); | ||
expect(() => pipe(never(), observe(observer), toArray())).toThrow(); | ||
@@ -624,13 +658,19 @@ expect(observer.next).toHaveBeenCalledTimes(0); | ||
describe("throws", () => { | ||
test("completes with an exception when subscribed", () => { | ||
const scheduler = createVirtualTimeSchedulerResource(); | ||
const observer = createMockObserver(); | ||
const cause = new Error(); | ||
describe("repeat", () => { | ||
test("repeats the observable n times", () => { | ||
const result = pipe(ofValue(1), repeat(3), toArray()); | ||
expect(result).toEqual([1, 1, 1]); | ||
}); | ||
pipe(throws(cause), observe(observer), connect(scheduler)); | ||
scheduler.run(); | ||
expect(observer.next).toBeCalledTimes(0); | ||
expect(observer.complete).toBeCalledWith({ cause }); | ||
test("when the repeat functions throws throws", () => { | ||
const error = new Error(); | ||
expect(() => | ||
pipe( | ||
ofValue(1), | ||
repeat(_ => { | ||
throw error; | ||
}), | ||
toArray(), | ||
), | ||
).toThrow(error); | ||
}); | ||
@@ -652,3 +692,3 @@ }); | ||
observe(observer), | ||
iterate(), | ||
toArray(), | ||
), | ||
@@ -669,10 +709,3 @@ ).toThrow(cause); | ||
expect(() => | ||
pipe( | ||
src, | ||
switchAll(), | ||
onNext(cb), | ||
iterate(() => createVirtualTimeSchedulerResource()), | ||
), | ||
).toThrow(cause); | ||
expect(() => pipe(src, switchAll(), onNext(cb), toArray())).toThrow(cause); | ||
@@ -689,7 +722,3 @@ expect(cb).toBeCalledTimes(4); | ||
const src = fromArray([1, 2, 3, 4]); | ||
const result = pipe( | ||
src, | ||
takeLast(3), | ||
toArray(() => createVirtualTimeSchedulerResource(2)), | ||
); | ||
const result = pipe(src, takeLast(3), toArray()); | ||
expect(result).toEqual([2, 3, 4]); | ||
@@ -708,3 +737,3 @@ }); | ||
observe(observer), | ||
iterate(() => createVirtualTimeSchedulerResource()), | ||
toArray(() => createVirtualTimeSchedulerResource()), | ||
), | ||
@@ -716,15 +745,146 @@ ).toThrow(cause); | ||
test("toIterable", () => { | ||
const iterable = pipe( | ||
fromArray([1, 2, 3, 4]), | ||
map(x => x + 1), | ||
toIterable(), | ||
test("takeWhile", () => { | ||
const result = pipe( | ||
generate( | ||
x => x + 1, | ||
() => 0, | ||
), | ||
takeWhile(x => x < 3), | ||
toArray(), | ||
); | ||
const acc = []; | ||
for (const v of iterable) { | ||
acc.push(v); | ||
} | ||
expect(acc).toEqual([2, 3, 4, 5]); | ||
expect(result).toEqual([0, 1, 2]); | ||
}); | ||
describe("throttle", () => { | ||
test("first", () => { | ||
const result = pipe( | ||
generate( | ||
x => x + 1, | ||
() => 0, | ||
1, | ||
), | ||
take(100), | ||
throttle(50, ThrottleMode.First), | ||
toArray(() => createVirtualTimeSchedulerResource(1)), | ||
); | ||
expect(result).toEqual([0, 49, 99]); | ||
}); | ||
test("last", () => { | ||
const result = pipe( | ||
generate( | ||
x => x + 1, | ||
() => 0, | ||
1, | ||
), | ||
take(200), | ||
throttle(50, ThrottleMode.Last), | ||
toArray(() => createVirtualTimeSchedulerResource(1)), | ||
); | ||
expect(result).toEqual([49, 99, 149, 199]); | ||
}); | ||
test("interval", () => { | ||
const result = pipe( | ||
generate( | ||
x => x + 1, | ||
() => 0, | ||
1, | ||
), | ||
take(200), | ||
throttle(75, ThrottleMode.Interval), | ||
toArray(() => createVirtualTimeSchedulerResource(1)), | ||
); | ||
expect(result).toEqual([0, 74, 149, 199]); | ||
}); | ||
}); | ||
describe("throws", () => { | ||
test("completes with an exception when subscribed", () => { | ||
const scheduler = createVirtualTimeSchedulerResource(); | ||
const observer = createMockObserver(); | ||
const cause = new Error(); | ||
pipe(throws(cause), observe(observer), connect(scheduler)); | ||
scheduler.run(); | ||
expect(observer.next).toBeCalledTimes(0); | ||
expect(observer.complete).toBeCalledWith({ cause }); | ||
}); | ||
}); | ||
describe("timeout", () => { | ||
test("throws when a timeout occurs", () => { | ||
expect(() => | ||
pipe( | ||
ofValue(1, 2), | ||
timeout(1), | ||
toArray(() => createVirtualTimeSchedulerResource(2)), | ||
), | ||
).toThrow(); | ||
}); | ||
test("when timeout is greater than observed time", () => { | ||
const result = pipe( | ||
ofValue(1, 2), | ||
timeout(3), | ||
toArray(() => createVirtualTimeSchedulerResource(2)), | ||
); | ||
expect(result).toEqual([1]); | ||
}); | ||
}); | ||
describe("toIterable", () => { | ||
test("iterate using a for of loop", () => { | ||
const iterable = pipe( | ||
fromArray([1, 2, 3, 4]), | ||
map(x => x + 1), | ||
toIterable(), | ||
); | ||
const acc = []; | ||
for (const v of iterable) { | ||
acc.push(v); | ||
} | ||
expect(acc).toEqual([2, 3, 4, 5]); | ||
}); | ||
test("rethrows an error when the source throws", () => { | ||
const error = new Error(); | ||
const iterable = pipe(throws(error), toIterable()); | ||
expect(() => { | ||
for (const _ of iterable) { | ||
} | ||
}).toThrowError(error); | ||
}); | ||
test("calling throw, throws the error", () => { | ||
const error = new Error(); | ||
const iterator = pipe(fromArray([1, 2, 3, 4]), toIterable())[ | ||
Symbol.iterator | ||
](); | ||
expect(() => (iterator as any).throw(error)).toThrowError(error); | ||
}); | ||
test("calling throw without an error returns done.", () => { | ||
const result = (pipe(fromArray([1, 2, 3, 4]), toIterable())[ | ||
Symbol.iterator | ||
]() as any).throw(); | ||
expect((result as any).done).toBeTruthy(); | ||
}); | ||
test("calling return, returns done", () => { | ||
const result = (pipe(fromArray([1, 2, 3, 4]), toIterable())[ | ||
Symbol.iterator | ||
]() as any).return(); | ||
expect((result as any).done).toBeTruthy(); | ||
}); | ||
}); | ||
describe("toPromise", () => { | ||
@@ -763,3 +923,3 @@ test("when the observable produces no values", () => { | ||
onNext(cb), | ||
iterate(createVirtualTimeSchedulerResource), | ||
toArray(createVirtualTimeSchedulerResource), | ||
), | ||
@@ -766,0 +926,0 @@ ).toThrow(cause); |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
1366935
337
6346
+ Added@reactive-js/disposable@0.0.10(transitive)
+ Added@reactive-js/pipe@0.0.10(transitive)
+ Added@reactive-js/rx@0.0.10(transitive)
+ Added@reactive-js/scheduler@0.0.10(transitive)
+ Added@reactive-js/schedulers@0.0.10(transitive)
- Removed@reactive-js/disposable@0.0.9(transitive)
- Removed@reactive-js/pipe@0.0.9(transitive)
- Removed@reactive-js/rx@0.0.9(transitive)
- Removed@reactive-js/scheduler@0.0.9(transitive)
- Removed@reactive-js/schedulers@0.0.9(transitive)
Updated@reactive-js/pipe@^0.0.10
Updated@reactive-js/rx@^0.0.10