@reactive-js/observable
Advanced tools
Comparing version 0.0.30 to 0.0.31
export { MulticastObservableLike, ObserverLike, ObservableLike, ObservableOperatorLike, SafeSubscriberLike, SubjectLike, SubscriberLike, SubscriberOperatorLike, } from "./internal/interfaces"; | ||
export { observableMixin } from "./internal/observable"; | ||
export { AbstractSubscriber, AbstractDelegatingSubscriber, } from "./internal/subscriber"; | ||
export { enumerate } from "./internal/observable"; | ||
export { AbstractDelegatingSubscriber } from "./internal/subscriber"; | ||
export { combineLatest } from "./internal/combineLatest"; | ||
@@ -5,0 +5,0 @@ export { createObservable } from "./internal/createObservable"; |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
var observable_1 = require("./internal/observable"); | ||
exports.observableMixin = observable_1.observableMixin; | ||
exports.enumerate = observable_1.enumerate; | ||
var subscriber_1 = require("./internal/subscriber"); | ||
exports.AbstractSubscriber = subscriber_1.AbstractSubscriber; | ||
exports.AbstractDelegatingSubscriber = subscriber_1.AbstractDelegatingSubscriber; | ||
@@ -8,0 +7,0 @@ var combineLatest_1 = require("./internal/combineLatest"); |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
const disposable_1 = require("@reactive-js/disposable"); | ||
const observable_1 = require("./observable"); | ||
const producer_1 = require("./producer"); | ||
const subscriber_1 = require("./subscriber"); | ||
class CombineLatestSubscriber extends subscriber_1.AbstractDelegatingSubscriber { | ||
constructor(delegate, ctx, index) { | ||
super(delegate); | ||
constructor(ctx, index) { | ||
super(ctx.subscriber); | ||
this.ctx = ctx; | ||
this.index = index; | ||
this.hasProducedValue = false; | ||
this.ready = false; | ||
this.add(error => { | ||
@@ -16,3 +16,3 @@ const ctx = this.ctx; | ||
if (error !== undefined || ctx.completedCount === ctx.totalCount) { | ||
delegate.dispose(error); | ||
this.delegate.dispose(error); | ||
} | ||
@@ -25,7 +25,7 @@ }); | ||
latest[this.index] = next; | ||
if (!this.hasProducedValue) { | ||
ctx.producedCount++; | ||
this.hasProducedValue = true; | ||
if (!this.ready) { | ||
ctx.readyCount++; | ||
this.ready = true; | ||
} | ||
if (ctx.producedCount === ctx.totalCount) { | ||
if (ctx.readyCount === ctx.totalCount) { | ||
const result = ctx.selector(...latest); | ||
@@ -36,3 +36,3 @@ this.delegate.notify(result); | ||
} | ||
class CombineLatestProducer { | ||
class CombineLatestSchedulerContinuation { | ||
constructor(subscriber, observables, selector) { | ||
@@ -43,33 +43,29 @@ this.subscriber = subscriber; | ||
this.completedCount = 0; | ||
this.producedCount = 0; | ||
this.run = producer_1.producerMixin.run; | ||
this.readyCount = 0; | ||
this.add = disposable_1.add; | ||
this.delay = 0; | ||
this.disposable = disposable_1.createDisposable(_ => { | ||
this.isDisposed = true; | ||
}); | ||
this.dispose = disposable_1.dispose; | ||
this.isDisposed = false; | ||
subscriber.add(this); | ||
this.totalCount = observables.length; | ||
this.latest = new Array(this.totalCount); | ||
} | ||
produce(_) { | ||
run(_) { | ||
const observables = this.observables; | ||
const totalCount = this.totalCount; | ||
const subscriber = this.subscriber; | ||
for (let index = 0; index < totalCount; index++) { | ||
const innerSubscriber = new CombineLatestSubscriber(subscriber, this, index); | ||
const innerSubscriber = new CombineLatestSubscriber(this, index); | ||
observables[index].subscribe(innerSubscriber); | ||
} | ||
this.dispose(); | ||
} | ||
} | ||
class CombineLatestObservable { | ||
constructor(observables, selector) { | ||
this.observables = observables; | ||
this.selector = selector; | ||
this.enumerate = observable_1.observableMixin.enumerate; | ||
this.isSynchronous = observables.every(obs => obs.isSynchronous); | ||
} | ||
subscribe(subscriber) { | ||
const producer = new CombineLatestProducer(subscriber, this.observables, this.selector); | ||
subscriber.schedule(producer); | ||
} | ||
} | ||
function combineLatest(observables, selector) { | ||
return new CombineLatestObservable(observables, selector); | ||
const factory = (subscriber) => new CombineLatestSchedulerContinuation(subscriber, observables, selector); | ||
return observable_1.createScheduledObservable(factory, observables.every(obs => obs.isSynchronous)); | ||
} | ||
exports.combineLatest = combineLatest; | ||
//# sourceMappingURL=combineLatest.js.map |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
const subscriber_1 = require("./subscriber"); | ||
const fromArray_1 = require("./fromArray"); | ||
const observable_1 = require("./observable"); | ||
class ConcatSubscriber extends subscriber_1.AbstractDelegatingSubscriber { | ||
constructor(delegate, enumerator) { | ||
constructor(delegate, observables, next) { | ||
super(delegate); | ||
this.enumerator = enumerator; | ||
this.observables = observables; | ||
this.next = next; | ||
this.add(error => { | ||
const observables = this.observables; | ||
const next = this.next; | ||
if (error !== undefined) { | ||
delegate.dispose(error); | ||
} | ||
else if (next < observables.length) { | ||
const concatSubscriber = new ConcatSubscriber(delegate, observables, next + 1); | ||
observables[next].subscribe(concatSubscriber); | ||
} | ||
else { | ||
const enumerator = this.enumerator; | ||
if (enumerator.move()) { | ||
const concatSubscriber = new ConcatSubscriber(delegate, enumerator); | ||
enumerator.current.subscribe(concatSubscriber); | ||
} | ||
else { | ||
delegate.dispose(); | ||
} | ||
delegate.dispose(); | ||
} | ||
@@ -33,11 +32,10 @@ }); | ||
this.observables = observables; | ||
this.enumerate = observable_1.observableMixin.enumerate; | ||
this.enumerate = observable_1.enumerate; | ||
this.isSynchronous = observables.every(obs => obs.isSynchronous); | ||
} | ||
subscribe(subscriber) { | ||
const enumerator = fromArray_1.fromArray(this.observables).enumerate(); | ||
subscriber.add(enumerator); | ||
const concatSubscriber = new ConcatSubscriber(subscriber, enumerator); | ||
if (enumerator.move()) { | ||
enumerator.current.subscribe(concatSubscriber); | ||
const observables = this.observables; | ||
if (observables.length > 0) { | ||
const concatSubscriber = new ConcatSubscriber(subscriber, observables, 1); | ||
observables[0].subscribe(concatSubscriber); | ||
} | ||
@@ -44,0 +42,0 @@ else { |
@@ -8,3 +8,3 @@ "use strict"; | ||
this.onSubscribe = onSubscribe; | ||
this.enumerate = observable_1.observableMixin.enumerate; | ||
this.enumerate = observable_1.enumerate; | ||
this.isSynchronous = false; | ||
@@ -11,0 +11,0 @@ } |
import { OperatorLike } from "@reactive-js/pipe"; | ||
import { VirtualTimeSchedulerLike } from "@reactive-js/scheduler"; | ||
import { ObservableLike } from "./interfaces"; | ||
export declare const forEach: (schedulerFactory: () => VirtualTimeSchedulerLike) => <T>(onNotify: (next: T) => void) => OperatorLike<ObservableLike<T>, void>; | ||
export declare const forEach: <T>(schedulerFactory: () => VirtualTimeSchedulerLike, onNotify: (next: T) => void) => OperatorLike<ObservableLike<T>, void>; |
@@ -15,3 +15,3 @@ "use strict"; | ||
} | ||
exports.forEach = (schedulerFactory) => (onNotify) => observable => { | ||
exports.forEach = (schedulerFactory, onNotify) => observable => { | ||
const scheduler = schedulerFactory(); | ||
@@ -18,0 +18,0 @@ const observer = new ForEachObserver(onNotify); |
@@ -5,5 +5,6 @@ "use strict"; | ||
const producer_1 = require("./producer"); | ||
class FromArrayProducer { | ||
const alwaysTrue = () => true; | ||
class FromArrayProducer extends producer_1.AbstractProducer { | ||
constructor(subscriber, values, startIndex, delay) { | ||
this.subscriber = subscriber; | ||
super(subscriber); | ||
this.values = values; | ||
@@ -13,25 +14,18 @@ this.startIndex = startIndex; | ||
this.index = this.startIndex; | ||
this.run = producer_1.producerMixin.run; | ||
} | ||
produce(shouldYield) { | ||
const delay = this.delay; | ||
const values = this.values; | ||
const length = values.length; | ||
const subscriber = this.subscriber; | ||
let index = this.index; | ||
if (this.delay > 0 && index <= length && !subscriber.isDisposed) { | ||
if (index < length) { | ||
const value = values[index]; | ||
subscriber.notify(value); | ||
} | ||
this.index++; | ||
return this; | ||
} | ||
else if (shouldYield !== undefined) { | ||
while (index < length && !subscriber.isDisposed) { | ||
const value = values[index]; | ||
if (delay > 0 || shouldYield !== undefined) { | ||
let isDisposed = this.isDisposed; | ||
shouldYield = shouldYield || alwaysTrue; | ||
while (index < length && !isDisposed) { | ||
this.notify(values[index]); | ||
index++; | ||
subscriber.notify(value); | ||
if (shouldYield()) { | ||
isDisposed = this.isDisposed; | ||
if (index < length && !isDisposed && (delay > 0 || shouldYield())) { | ||
this.index = index; | ||
return this; | ||
return; | ||
} | ||
@@ -41,25 +35,10 @@ } | ||
else { | ||
while (index < length && !subscriber.isDisposed) { | ||
const value = values[index]; | ||
while (index < length && !this.isDisposed) { | ||
this.notify(values[index]); | ||
index++; | ||
subscriber.notify(value); | ||
} | ||
} | ||
subscriber.dispose(); | ||
return; | ||
this.dispose(); | ||
} | ||
} | ||
class FromArrayObservable { | ||
constructor(values, startIndex, delay) { | ||
this.values = values; | ||
this.startIndex = startIndex; | ||
this.delay = delay; | ||
this.enumerate = observable_1.observableMixin.enumerate; | ||
this.isSynchronous = delay === 0; | ||
} | ||
subscribe(subscriber) { | ||
const producer = new FromArrayProducer(subscriber, this.values, this.startIndex, this.delay); | ||
subscriber.schedule(producer); | ||
} | ||
} | ||
function fromArray(values, options = {}) { | ||
@@ -69,5 +48,6 @@ var _a, _b; | ||
const startIndex = Math.min((_b = options.startIndex, (_b !== null && _b !== void 0 ? _b : 0)), values.length); | ||
return new FromArrayObservable(values, startIndex, delay); | ||
const factory = (subscriber) => new FromArrayProducer(subscriber, values, startIndex, delay); | ||
return observable_1.createScheduledObservable(factory, delay === 0); | ||
} | ||
exports.fromArray = fromArray; | ||
//# sourceMappingURL=fromArray.js.map |
@@ -5,71 +5,47 @@ "use strict"; | ||
const producer_1 = require("./producer"); | ||
class FromIteratorProducer { | ||
const alwaysTrue = () => true; | ||
class FromIteratorProducer extends producer_1.AbstractProducer { | ||
constructor(subscriber, iterator, delay) { | ||
this.subscriber = subscriber; | ||
super(subscriber); | ||
this.iterator = iterator; | ||
this.delay = delay; | ||
this.run = producer_1.producerMixin.run; | ||
this.next = iterator.next(); | ||
} | ||
produce(shouldYield) { | ||
const delay = this.delay; | ||
const iterator = this.iterator; | ||
const subscriber = this.subscriber; | ||
if (this.delay > 0 && !subscriber.isDisposed) { | ||
const next = iterator.next(); | ||
if (!next.done) { | ||
subscriber.notify(next.value); | ||
return this; | ||
} | ||
} | ||
else if (shouldYield !== undefined) { | ||
let done = false; | ||
while (!subscriber.isDisposed && !done) { | ||
const next = iterator.next(); | ||
done = next.done || false; | ||
if (!done) { | ||
subscriber.notify(next.value); | ||
let next = this.next; | ||
let done = next.done; | ||
if (delay > 0 || shouldYield !== undefined) { | ||
let isDisposed = this.isDisposed; | ||
shouldYield = shouldYield || alwaysTrue; | ||
while (!done && !isDisposed) { | ||
this.notify(next.value); | ||
next = iterator.next(); | ||
done = next.done; | ||
isDisposed = this.isDisposed; | ||
if (!done && !isDisposed && (delay > 0 || shouldYield())) { | ||
this.next = next; | ||
return; | ||
} | ||
if (!done && shouldYield()) { | ||
return this; | ||
} | ||
} | ||
} | ||
else { | ||
let done = false; | ||
while (!subscriber.isDisposed && !done) { | ||
const next = iterator.next(); | ||
done = next.done || false; | ||
if (!done) { | ||
subscriber.notify(next.value); | ||
} | ||
while (!this.isDisposed && !done) { | ||
this.notify(next.value); | ||
next = iterator.next(); | ||
done = next.done; | ||
} | ||
} | ||
subscriber.dispose(); | ||
return; | ||
this.dispose(); | ||
} | ||
} | ||
class FromIteratorObservable { | ||
constructor(iterator, delay) { | ||
this.iterator = iterator; | ||
this.delay = delay; | ||
this.enumerate = observable_1.observableMixin.enumerate; | ||
this.isSynchronous = delay === 0; | ||
} | ||
subscribe(subscriber) { | ||
const producer = new FromIteratorProducer(subscriber, this.iterator, this.delay); | ||
subscriber.schedule(producer); | ||
} | ||
} | ||
function fromIterator(iterator, delay = 0) { | ||
return new FromIteratorObservable(iterator, delay); | ||
const factory = (subscriber) => new FromIteratorProducer(subscriber, iterator, delay); | ||
return observable_1.createScheduledObservable(factory, delay === 0); | ||
} | ||
exports.fromIterator = fromIterator; | ||
class FromIterableObservable { | ||
constructor(iterable, delay) { | ||
this.iterable = iterable; | ||
this.delay = delay; | ||
this.enumerate = observable_1.observableMixin.enumerate; | ||
this.isSynchronous = delay === 0; | ||
} | ||
subscribe(subscriber) { | ||
const iterator = this.iterable[Symbol.iterator](); | ||
function fromIterable(iterable, delay = 0) { | ||
const factory = (subscriber) => { | ||
const iterator = iterable[Symbol.iterator](); | ||
subscriber.add(error => { | ||
@@ -84,10 +60,7 @@ if (error !== undefined && iterator.throw !== undefined) { | ||
}); | ||
const producer = new FromIteratorProducer(subscriber, iterator, this.delay); | ||
subscriber.schedule(producer); | ||
} | ||
return new FromIteratorProducer(subscriber, iterator, delay); | ||
}; | ||
return observable_1.createScheduledObservable(factory, delay === 0); | ||
} | ||
function fromIterable(iterable, delay = 0) { | ||
return new FromIterableObservable(iterable, delay); | ||
} | ||
exports.fromIterable = fromIterable; | ||
//# sourceMappingURL=fromIterable.js.map |
@@ -5,8 +5,8 @@ "use strict"; | ||
const producer_1 = require("./producer"); | ||
class FromScheduledValuesProducer { | ||
const alwaysTrue = () => true; | ||
class FromScheduledValuesProducer extends producer_1.AbstractProducer { | ||
constructor(subscriber, values) { | ||
this.subscriber = subscriber; | ||
super(subscriber); | ||
this.values = values; | ||
this.index = 0; | ||
this.run = producer_1.producerMixin.run; | ||
const [delay] = this.values[0]; | ||
@@ -16,37 +16,29 @@ this.delay = delay; | ||
produce(shouldYield) { | ||
const subscriber = this.subscriber; | ||
const values = this.values; | ||
const length = values.length; | ||
let index = this.index; | ||
while (index < values.length && !subscriber.isDisposed) { | ||
let isDisposed = this.isDisposed; | ||
shouldYield = shouldYield || alwaysTrue; | ||
while (index < length && !isDisposed) { | ||
const [, value] = values[index]; | ||
this.notify(value); | ||
index++; | ||
subscriber.notify(value); | ||
if (index < values.length) { | ||
const delay = values[index][0] || 0; | ||
if (delay > 0 || (shouldYield !== undefined && shouldYield())) { | ||
this.index = index; | ||
this.delay = delay; | ||
return this; | ||
} | ||
const shouldYieldDueToDelay = index < length && values[index][0] > 0; | ||
isDisposed = this.isDisposed; | ||
if (index < length && | ||
!isDisposed && | ||
(shouldYieldDueToDelay || shouldYield())) { | ||
this.index = index; | ||
this.delay = values[index][0]; | ||
return; | ||
} | ||
} | ||
subscriber.dispose(); | ||
return; | ||
this.dispose(); | ||
} | ||
} | ||
class FromScheduledValuesObservable { | ||
constructor(values) { | ||
this.values = values; | ||
this.enumerate = observable_1.observableMixin.enumerate; | ||
this.isSynchronous = false; | ||
} | ||
subscribe(subscriber) { | ||
const producer = new FromScheduledValuesProducer(subscriber, this.values); | ||
subscriber.schedule(producer); | ||
} | ||
} | ||
function fromScheduledValues(...values) { | ||
return new FromScheduledValuesObservable(values); | ||
const factory = (subscriber) => new FromScheduledValuesProducer(subscriber, values); | ||
return observable_1.createScheduledObservable(factory, false); | ||
} | ||
exports.fromScheduledValues = fromScheduledValues; | ||
//# sourceMappingURL=fromScheduledValues.js.map |
@@ -5,28 +5,26 @@ "use strict"; | ||
const producer_1 = require("./producer"); | ||
class GenerateProducer { | ||
const alwaysTrue = () => true; | ||
class GenerateProducer extends producer_1.AbstractProducer { | ||
constructor(subscriber, generator, acc, delay) { | ||
this.subscriber = subscriber; | ||
super(subscriber); | ||
this.generator = generator; | ||
this.acc = acc; | ||
this.delay = delay; | ||
this.run = producer_1.producerMixin.run; | ||
} | ||
produce(shouldYield) { | ||
const generator = this.generator; | ||
const subscriber = this.subscriber; | ||
const delay = this.delay; | ||
let acc = this.acc; | ||
let result = undefined; | ||
if (this.delay > 0 && !subscriber.isDisposed) { | ||
subscriber.notify(acc); | ||
this.acc = generator(acc); | ||
result = this; | ||
} | ||
else if (shouldYield !== undefined) { | ||
while (!subscriber.isDisposed) { | ||
subscriber.notify(acc); | ||
acc = generator(acc); | ||
if (shouldYield()) { | ||
let isDisposed = this.isDisposed; | ||
if (delay > 0 || shouldYield !== undefined) { | ||
shouldYield = shouldYield || alwaysTrue; | ||
while (!isDisposed) { | ||
this.notify(acc); | ||
isDisposed = this.isDisposed; | ||
if (!isDisposed) { | ||
acc = generator(acc); | ||
} | ||
if (!isDisposed && (delay > 0 || shouldYield())) { | ||
this.acc = acc; | ||
result = this; | ||
break; | ||
return; | ||
} | ||
@@ -36,27 +34,17 @@ } | ||
else { | ||
while (!subscriber.isDisposed) { | ||
subscriber.notify(acc); | ||
acc = generator(acc); | ||
while (!isDisposed) { | ||
this.notify(acc); | ||
isDisposed = this.isDisposed; | ||
if (!isDisposed) { | ||
acc = generator(acc); | ||
} | ||
} | ||
} | ||
return result; | ||
} | ||
} | ||
class GenerateObservable { | ||
constructor(generator, acc, delay) { | ||
this.generator = generator; | ||
this.acc = acc; | ||
this.delay = delay; | ||
this.enumerate = observable_1.observableMixin.enumerate; | ||
this.isSynchronous = delay === 0; | ||
} | ||
subscribe(subscriber) { | ||
const producer = new GenerateProducer(subscriber, this.generator, this.acc, this.delay); | ||
subscriber.schedule(producer); | ||
} | ||
} | ||
function generate(generator, initialValue, delay = 0) { | ||
return new GenerateObservable(generator, initialValue(), delay); | ||
const factory = (subscriber) => new GenerateProducer(subscriber, generator, initialValue(), delay); | ||
return observable_1.createScheduledObservable(factory, delay === 0); | ||
} | ||
exports.generate = generate; | ||
//# sourceMappingURL=generate.js.map |
@@ -9,3 +9,3 @@ "use strict"; | ||
this.isSynchronous = isSynchronous; | ||
this.enumerate = observable_1.observableMixin.enumerate; | ||
this.enumerate = observable_1.enumerate; | ||
} | ||
@@ -12,0 +12,0 @@ subscribe(subscriber) { |
@@ -24,3 +24,3 @@ "use strict"; | ||
this.observables = observables; | ||
this.enumerate = observable_1.observableMixin.enumerate; | ||
this.enumerate = observable_1.enumerate; | ||
this.isSynchronous = false; | ||
@@ -27,0 +27,0 @@ } |
@@ -6,3 +6,3 @@ "use strict"; | ||
constructor() { | ||
this.enumerate = observable_1.observableMixin.enumerate; | ||
this.enumerate = observable_1.enumerate; | ||
this.isSynchronous = false; | ||
@@ -9,0 +9,0 @@ } |
import { EnumeratorLike } from "@reactive-js/enumerable"; | ||
import { ObservableLike } from "./interfaces"; | ||
export declare const observableMixin: { | ||
enumerate<T>(this: ObservableLike<T>): EnumeratorLike<void, T>; | ||
}; | ||
import { SchedulerContinuationLike } from "@reactive-js/scheduler"; | ||
import { ObservableLike, SubscriberLike } from "./interfaces"; | ||
export declare function enumerate<T>(this: ObservableLike<T>): EnumeratorLike<void, T>; | ||
export declare const createScheduledObservable: <T>(factory: (subscriber: SubscriberLike<T>) => SchedulerContinuationLike, isSynchronous: boolean) => ObservableLike<T>; |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
const enumerable_1 = require("@reactive-js/enumerable"); | ||
const disposable_1 = require("@reactive-js/disposable"); | ||
const alwaysTrue = () => true; | ||
class EnumeratorSubscriber extends enumerable_1.AbstractEnumerator { | ||
class EnumeratorSubscriber { | ||
constructor() { | ||
super(); | ||
this.continuations = []; | ||
this.add = disposable_1.add; | ||
this.disposable = disposable_1.createDisposable(() => { this.isDisposed = true; }); | ||
this.dispose = disposable_1.dispose; | ||
this.error = undefined; | ||
this.hasCurrent = false; | ||
this.isDisposed = false; | ||
this.now = 0; | ||
@@ -16,2 +20,3 @@ this.add(error => { | ||
move() { | ||
const continuations = this.continuations; | ||
this.hasCurrent = false; | ||
@@ -21,6 +26,10 @@ this.current = undefined; | ||
while (!this.hasCurrent) { | ||
if (this.isDisposed || this.continuation === undefined) { | ||
const continuation = continuations.shift(); | ||
if (continuation === undefined || continuation.isDisposed) { | ||
break; | ||
} | ||
this.continuation = this.continuation.run(alwaysTrue) || undefined; | ||
continuation.run(alwaysTrue); | ||
if (!continuation.isDisposed) { | ||
continuations.push(continuation); | ||
} | ||
const error = this.error; | ||
@@ -39,15 +48,26 @@ if (error !== undefined) { | ||
schedule(continuation, delay = 0) { | ||
if (!this.isDisposed && delay === 0) { | ||
this.continuation = continuation; | ||
this.add(continuation); | ||
if (!continuation.isDisposed && delay === 0) { | ||
this.continuations.push(continuation); | ||
} | ||
return this; | ||
} | ||
} | ||
exports.observableMixin = { | ||
enumerate() { | ||
const subscriber = new EnumeratorSubscriber(); | ||
this.subscribe(subscriber); | ||
return subscriber; | ||
}, | ||
}; | ||
function enumerate() { | ||
const subscriber = new EnumeratorSubscriber(); | ||
this.subscribe(subscriber); | ||
return subscriber; | ||
} | ||
exports.enumerate = enumerate; | ||
class ScheduledObservable { | ||
constructor(factory, isSynchronous) { | ||
this.factory = factory; | ||
this.isSynchronous = isSynchronous; | ||
this.enumerate = enumerate; | ||
} | ||
subscribe(subscriber) { | ||
const schedulerContinuation = this.factory(subscriber); | ||
subscriber.schedule(schedulerContinuation); | ||
} | ||
} | ||
exports.createScheduledObservable = (factory, isSynchronous) => new ScheduledObservable(factory, isSynchronous); | ||
//# sourceMappingURL=observable.js.map |
@@ -0,10 +1,15 @@ | ||
import { add, dispose } from "@reactive-js/disposable"; | ||
import { SchedulerContinuationLike } from "@reactive-js/scheduler"; | ||
import { SubscriberLike } from "./interfaces"; | ||
interface ProducerLike<T> extends SchedulerContinuationLike { | ||
readonly subscriber: SubscriberLike<T>; | ||
produce(shouldYield?: () => boolean): SchedulerContinuationLike | void; | ||
export declare abstract class AbstractProducer<T> implements SchedulerContinuationLike { | ||
private readonly subscriber; | ||
readonly add: typeof add; | ||
readonly disposable: import("@reactive-js/disposable").DisposableLike; | ||
abstract readonly delay: number; | ||
readonly dispose: typeof dispose; | ||
isDisposed: boolean; | ||
constructor(subscriber: SubscriberLike<T>); | ||
notify(next: T): void; | ||
abstract produce(shouldYield?: () => boolean): void; | ||
run(shouldYield?: () => boolean): void; | ||
} | ||
export declare const producerMixin: { | ||
run: <T>(this: ProducerLike<T>, shouldYield?: (() => boolean) | undefined) => SchedulerContinuationLike | undefined; | ||
}; | ||
export {}; |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.producerMixin = { | ||
run: function run(shouldYield) { | ||
try { | ||
const result = this.produce(shouldYield); | ||
if (result !== undefined) { | ||
return result; | ||
const disposable_1 = require("@reactive-js/disposable"); | ||
class AbstractProducer { | ||
constructor(subscriber) { | ||
this.subscriber = subscriber; | ||
this.add = disposable_1.add; | ||
this.disposable = disposable_1.createDisposable(_ => { | ||
this.isDisposed = true; | ||
}); | ||
this.dispose = disposable_1.dispose; | ||
this.isDisposed = false; | ||
this.add(subscriber); | ||
} | ||
notify(next) { | ||
this.subscriber.notify(next); | ||
} | ||
run(shouldYield) { | ||
if (!this.isDisposed) { | ||
try { | ||
this.produce(shouldYield); | ||
} | ||
catch (cause) { | ||
const error = { cause }; | ||
this.dispose(error); | ||
} | ||
} | ||
catch (cause) { | ||
const error = { cause }; | ||
this.subscriber.dispose(error); | ||
} | ||
return; | ||
}, | ||
}; | ||
} | ||
} | ||
exports.AbstractProducer = AbstractProducer; | ||
//# sourceMappingURL=producer.js.map |
@@ -16,3 +16,3 @@ "use strict"; | ||
this.initialValue = initialValue; | ||
this.enumerate = observable_1.observableMixin.enumerate; | ||
this.enumerate = observable_1.enumerate; | ||
this.isSynchronous = false; | ||
@@ -19,0 +19,0 @@ } |
@@ -17,3 +17,3 @@ "use strict"; | ||
}; | ||
this.enumerate = observable_1.observableMixin.enumerate; | ||
this.enumerate = observable_1.enumerate; | ||
this.isSynchronous = false; | ||
@@ -20,0 +20,0 @@ } |
@@ -12,3 +12,3 @@ "use strict"; | ||
this.replayed = []; | ||
this.enumerate = observable_1.observableMixin.enumerate; | ||
this.enumerate = observable_1.enumerate; | ||
this.isSynchronous = false; | ||
@@ -15,0 +15,0 @@ } |
@@ -1,12 +0,8 @@ | ||
import { DisposableLike } from "@reactive-js/disposable"; | ||
import { add, dispose } from "@reactive-js/disposable"; | ||
import { SchedulerContinuationLike, SchedulerLike } from "@reactive-js/scheduler"; | ||
import { SubscriberLike } from "./interfaces"; | ||
export declare abstract class AbstractSubscriber<T> implements SubscriberLike<T> { | ||
readonly add: <This extends DisposableLike>(this: { | ||
disposable: DisposableLike; | ||
} & This, disposable: DisposableLike | ((error?: import("@reactive-js/disposable").ErrorLike | undefined) => void)) => This; | ||
readonly disposable: DisposableLike; | ||
readonly dispose: (this: { | ||
disposable: DisposableLike; | ||
}, error?: import("@reactive-js/disposable").ErrorLike | undefined) => void; | ||
readonly add: typeof add; | ||
readonly disposable: import("@reactive-js/disposable").DisposableLike; | ||
readonly dispose: typeof dispose; | ||
isDisposed: boolean; | ||
@@ -19,3 +15,3 @@ private readonly scheduler; | ||
scheduler: SchedulerLike; | ||
}, continuation: SchedulerContinuationLike): DisposableLike; | ||
}, continuation: SchedulerContinuationLike): void; | ||
} | ||
@@ -22,0 +18,0 @@ export declare abstract class AbstractDelegatingSubscriber<TA, TB> extends AbstractSubscriber<TA> { |
@@ -6,7 +6,7 @@ "use strict"; | ||
constructor(scheduler) { | ||
this.add = disposable_1.disposableMixin.add; | ||
this.add = disposable_1.add; | ||
this.disposable = disposable_1.createDisposable(_ => { | ||
this.isDisposed = true; | ||
}); | ||
this.dispose = disposable_1.disposableMixin.dispose; | ||
this.dispose = disposable_1.dispose; | ||
this.isDisposed = false; | ||
@@ -19,5 +19,4 @@ this.scheduler = scheduler.scheduler || scheduler; | ||
schedule(continuation) { | ||
const schedulerSubscription = this.scheduler.schedule(continuation); | ||
this.add(schedulerSubscription); | ||
return schedulerSubscription; | ||
this.add(continuation); | ||
this.scheduler.schedule(continuation); | ||
} | ||
@@ -24,0 +23,0 @@ } |
import { ObservableLike } from "./interfaces"; | ||
export declare const throws: <T>(factory: () => unknown, delay?: number) => ObservableLike<T>; | ||
export declare const throws: <T>(errorFactory: () => unknown, delay?: number) => ObservableLike<T>; |
@@ -5,26 +5,16 @@ "use strict"; | ||
const producer_1 = require("./producer"); | ||
class ThrowsProducer { | ||
class ThrowsProducer extends producer_1.AbstractProducer { | ||
constructor(subscriber, error, delay) { | ||
this.subscriber = subscriber; | ||
super(subscriber); | ||
this.error = error; | ||
this.delay = delay; | ||
this.run = producer_1.producerMixin.run; | ||
} | ||
produce(_) { | ||
this.subscriber.dispose(this.error); | ||
this.dispose(this.error); | ||
} | ||
} | ||
class ThrowsObservable { | ||
constructor(factory, delay) { | ||
this.factory = factory; | ||
this.delay = delay; | ||
this.enumerate = observable_1.observableMixin.enumerate; | ||
this.isSynchronous = delay === 0; | ||
} | ||
subscribe(subscriber) { | ||
const producer = new ThrowsProducer(subscriber, { cause: this.factory() }, this.delay); | ||
subscriber.schedule(producer); | ||
} | ||
} | ||
exports.throws = (factory, delay = 0) => new ThrowsObservable(factory, delay); | ||
exports.throws = (errorFactory, delay = 0) => { | ||
const factory = (subscriber) => new ThrowsProducer(subscriber, { cause: errorFactory() }, delay); | ||
return observable_1.createScheduledObservable(factory, delay === 0); | ||
}; | ||
//# sourceMappingURL=throws.js.map |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
const producer_1 = require("./producer"); | ||
const disposable_1 = require("@reactive-js/disposable"); | ||
const subscriber_1 = require("./subscriber"); | ||
class SafeSubscriberSchedulerContinuation { | ||
constructor(subscriber) { | ||
this.subscriber = subscriber; | ||
this.add = disposable_1.add; | ||
this.delay = 0; | ||
this.disposable = disposable_1.createDisposable(_ => { | ||
this.isDisposed = true; | ||
}); | ||
this.dispose = disposable_1.dispose; | ||
this.isDisposed = false; | ||
} | ||
produce(shouldYield) { | ||
const subscriber = this.subscriber; | ||
const nextQueue = subscriber.nextQueue; | ||
const delegate = subscriber.delegate; | ||
if (shouldYield !== undefined) { | ||
while (nextQueue.length > 0 && !delegate.isDisposed) { | ||
const next = nextQueue.shift(); | ||
delegate.notify(next); | ||
const hasRemainingEvents = subscriber.nextQueue.length > 0 || subscriber.isDisposed; | ||
if (hasRemainingEvents && shouldYield()) { | ||
return; | ||
} | ||
} | ||
} | ||
else { | ||
while (nextQueue.length > 0 && !delegate.isDisposed) { | ||
const next = nextQueue.shift(); | ||
delegate.notify(next); | ||
} | ||
} | ||
if (subscriber.isDisposed) { | ||
delegate.dispose(subscriber.error); | ||
} | ||
this.dispose(); | ||
} | ||
run(shouldYield) { | ||
if (!this.isDisposed) { | ||
try { | ||
this.produce(shouldYield); | ||
} | ||
catch (cause) { | ||
const error = { cause }; | ||
this.subscriber.dispose(error); | ||
this.dispose(); | ||
} | ||
} | ||
} | ||
} | ||
const scheduleDrainQueue = (subscriber) => { | ||
if (subscriber.remainingEvents === 1) { | ||
subscriber.delegate.schedule(subscriber); | ||
const remainingEvents = subscriber.nextQueue.length + (subscriber.isDisposed ? 1 : 0); | ||
if (remainingEvents === 1) { | ||
const producer = new SafeSubscriberSchedulerContinuation(subscriber); | ||
subscriber.delegate.schedule(producer); | ||
} | ||
@@ -15,3 +66,2 @@ }; | ||
this.nextQueue = []; | ||
this.run = producer_1.producerMixin.run; | ||
this.add(error => { | ||
@@ -22,5 +72,2 @@ this.error = error; | ||
} | ||
get remainingEvents() { | ||
return this.nextQueue.length + (this.isDisposed ? 1 : 0); | ||
} | ||
notify(next) { | ||
@@ -35,26 +82,4 @@ this.delegate.notify(next); | ||
} | ||
produce(shouldYield) { | ||
const delegate = this.delegate; | ||
const nextQueue = this.nextQueue; | ||
if (shouldYield !== undefined) { | ||
while (nextQueue.length > 0 && !delegate.isDisposed) { | ||
const next = nextQueue.shift(); | ||
delegate.notify(next); | ||
if (shouldYield() && this.remainingEvents > 0) { | ||
return this; | ||
} | ||
} | ||
} | ||
else { | ||
while (nextQueue.length > 0 && !delegate.isDisposed) { | ||
const next = nextQueue.shift(); | ||
delegate.notify(next); | ||
} | ||
} | ||
if (this.isDisposed) { | ||
delegate.dispose(this.error); | ||
} | ||
} | ||
} | ||
exports.toSafeSubscriber = (subscriber) => new SafeSubscriberImpl(subscriber); | ||
//# sourceMappingURL=toSafeSubscriber.js.map |
@@ -8,3 +8,3 @@ "use strict"; | ||
this.observableFactory = observableFactory; | ||
this.enumerate = observable_1.observableMixin.enumerate; | ||
this.enumerate = observable_1.enumerate; | ||
this.isSynchronous = false; | ||
@@ -11,0 +11,0 @@ } |
@@ -83,9 +83,9 @@ "use strict"; | ||
} | ||
class ZipProducer { | ||
class ZipProducer extends producer_1.AbstractProducer { | ||
constructor(subscriber, enumerators, selector) { | ||
this.subscriber = subscriber; | ||
super(subscriber); | ||
this.enumerators = enumerators; | ||
this.selector = selector; | ||
this.delay = 0; | ||
this.hasCurrent = false; | ||
this.run = producer_1.producerMixin.run; | ||
} | ||
@@ -95,12 +95,15 @@ produce(shouldYield) { | ||
const selector = this.selector; | ||
const subscriber = this.subscriber; | ||
if (shouldYield !== undefined) { | ||
while (shouldEmit(enumerators) && !subscriber.isDisposed) { | ||
let isDisposed = this.isDisposed; | ||
let shouldEmitNext = shouldEmit(enumerators); | ||
while (shouldEmitNext && !isDisposed) { | ||
const next = selector(...enumerators.map(getCurrent)); | ||
subscriber.notify(next); | ||
this.notify(next); | ||
isDisposed = this.isDisposed; | ||
for (const buffer of enumerators) { | ||
buffer.move(); | ||
} | ||
if (shouldYield()) { | ||
return this; | ||
shouldEmitNext = shouldEmit(enumerators); | ||
if (shouldEmitNext && !isDisposed && shouldYield()) { | ||
return; | ||
} | ||
@@ -110,3 +113,3 @@ } | ||
else { | ||
while (shouldEmit(enumerators) && !subscriber.isDisposed) { | ||
while (shouldEmit(enumerators) && !this.isDisposed) { | ||
const next = selector(...enumerators.map(getCurrent)); | ||
@@ -116,7 +119,6 @@ for (const enumerator of enumerators) { | ||
} | ||
subscriber.notify(next); | ||
this.notify(next); | ||
} | ||
} | ||
subscriber.dispose(); | ||
return; | ||
this.dispose(); | ||
} | ||
@@ -128,3 +130,3 @@ } | ||
this.selector = selector; | ||
this.enumerate = observable_1.observableMixin.enumerate; | ||
this.enumerate = observable_1.enumerate; | ||
this.isSynchronous = observables.every(obs => obs.isSynchronous); | ||
@@ -131,0 +133,0 @@ } |
export { MulticastObservableLike, ObserverLike, ObservableLike, ObservableOperatorLike, SafeSubscriberLike, SubjectLike, SubscriberLike, SubscriberOperatorLike, } from "./internal/interfaces"; | ||
export { observableMixin } from "./internal/observable"; | ||
export { AbstractSubscriber, AbstractDelegatingSubscriber, } from "./internal/subscriber"; | ||
export { enumerate } from "./internal/observable"; | ||
export { AbstractDelegatingSubscriber } from "./internal/subscriber"; | ||
export { combineLatest } from "./internal/combineLatest"; | ||
@@ -5,0 +5,0 @@ export { createObservable } from "./internal/createObservable"; |
@@ -1,3 +0,3 @@ | ||
export { observableMixin } from "./internal/observable"; | ||
export { AbstractSubscriber, AbstractDelegatingSubscriber, } from "./internal/subscriber"; | ||
export { enumerate } from "./internal/observable"; | ||
export { AbstractDelegatingSubscriber } from "./internal/subscriber"; | ||
export { combineLatest } from "./internal/combineLatest"; | ||
@@ -4,0 +4,0 @@ export { createObservable } from "./internal/createObservable"; |
@@ -1,10 +0,10 @@ | ||
import { observableMixin } from "./observable"; | ||
import { producerMixin } from "./producer"; | ||
import { add, createDisposable, dispose } from "@reactive-js/disposable"; | ||
import { createScheduledObservable } from "./observable"; | ||
import { AbstractDelegatingSubscriber } from "./subscriber"; | ||
class CombineLatestSubscriber extends AbstractDelegatingSubscriber { | ||
constructor(delegate, ctx, index) { | ||
super(delegate); | ||
constructor(ctx, index) { | ||
super(ctx.subscriber); | ||
this.ctx = ctx; | ||
this.index = index; | ||
this.hasProducedValue = false; | ||
this.ready = false; | ||
this.add(error => { | ||
@@ -14,3 +14,3 @@ const ctx = this.ctx; | ||
if (error !== undefined || ctx.completedCount === ctx.totalCount) { | ||
delegate.dispose(error); | ||
this.delegate.dispose(error); | ||
} | ||
@@ -23,7 +23,7 @@ }); | ||
latest[this.index] = next; | ||
if (!this.hasProducedValue) { | ||
ctx.producedCount++; | ||
this.hasProducedValue = true; | ||
if (!this.ready) { | ||
ctx.readyCount++; | ||
this.ready = true; | ||
} | ||
if (ctx.producedCount === ctx.totalCount) { | ||
if (ctx.readyCount === ctx.totalCount) { | ||
const result = ctx.selector(...latest); | ||
@@ -34,3 +34,3 @@ this.delegate.notify(result); | ||
} | ||
class CombineLatestProducer { | ||
class CombineLatestSchedulerContinuation { | ||
constructor(subscriber, observables, selector) { | ||
@@ -41,32 +41,28 @@ this.subscriber = subscriber; | ||
this.completedCount = 0; | ||
this.producedCount = 0; | ||
this.run = producerMixin.run; | ||
this.readyCount = 0; | ||
this.add = add; | ||
this.delay = 0; | ||
this.disposable = createDisposable(_ => { | ||
this.isDisposed = true; | ||
}); | ||
this.dispose = dispose; | ||
this.isDisposed = false; | ||
subscriber.add(this); | ||
this.totalCount = observables.length; | ||
this.latest = new Array(this.totalCount); | ||
} | ||
produce(_) { | ||
run(_) { | ||
const observables = this.observables; | ||
const totalCount = this.totalCount; | ||
const subscriber = this.subscriber; | ||
for (let index = 0; index < totalCount; index++) { | ||
const innerSubscriber = new CombineLatestSubscriber(subscriber, this, index); | ||
const innerSubscriber = new CombineLatestSubscriber(this, index); | ||
observables[index].subscribe(innerSubscriber); | ||
} | ||
this.dispose(); | ||
} | ||
} | ||
class CombineLatestObservable { | ||
constructor(observables, selector) { | ||
this.observables = observables; | ||
this.selector = selector; | ||
this.enumerate = observableMixin.enumerate; | ||
this.isSynchronous = observables.every(obs => obs.isSynchronous); | ||
} | ||
subscribe(subscriber) { | ||
const producer = new CombineLatestProducer(subscriber, this.observables, this.selector); | ||
subscriber.schedule(producer); | ||
} | ||
} | ||
export function combineLatest(observables, selector) { | ||
return new CombineLatestObservable(observables, selector); | ||
const factory = (subscriber) => new CombineLatestSchedulerContinuation(subscriber, observables, selector); | ||
return createScheduledObservable(factory, observables.every(obs => obs.isSynchronous)); | ||
} | ||
//# sourceMappingURL=combineLatest.js.map |
import { AbstractDelegatingSubscriber } from "./subscriber"; | ||
import { fromArray } from "./fromArray"; | ||
import { observableMixin } from "./observable"; | ||
import { enumerate } from "./observable"; | ||
class ConcatSubscriber extends AbstractDelegatingSubscriber { | ||
constructor(delegate, enumerator) { | ||
constructor(delegate, observables, next) { | ||
super(delegate); | ||
this.enumerator = enumerator; | ||
this.observables = observables; | ||
this.next = next; | ||
this.add(error => { | ||
const observables = this.observables; | ||
const next = this.next; | ||
if (error !== undefined) { | ||
delegate.dispose(error); | ||
} | ||
else if (next < observables.length) { | ||
const concatSubscriber = new ConcatSubscriber(delegate, observables, next + 1); | ||
observables[next].subscribe(concatSubscriber); | ||
} | ||
else { | ||
const enumerator = this.enumerator; | ||
if (enumerator.move()) { | ||
const concatSubscriber = new ConcatSubscriber(delegate, enumerator); | ||
enumerator.current.subscribe(concatSubscriber); | ||
} | ||
else { | ||
delegate.dispose(); | ||
} | ||
delegate.dispose(); | ||
} | ||
@@ -31,11 +30,10 @@ }); | ||
this.observables = observables; | ||
this.enumerate = observableMixin.enumerate; | ||
this.enumerate = enumerate; | ||
this.isSynchronous = observables.every(obs => obs.isSynchronous); | ||
} | ||
subscribe(subscriber) { | ||
const enumerator = fromArray(this.observables).enumerate(); | ||
subscriber.add(enumerator); | ||
const concatSubscriber = new ConcatSubscriber(subscriber, enumerator); | ||
if (enumerator.move()) { | ||
enumerator.current.subscribe(concatSubscriber); | ||
const observables = this.observables; | ||
if (observables.length > 0) { | ||
const concatSubscriber = new ConcatSubscriber(subscriber, observables, 1); | ||
observables[0].subscribe(concatSubscriber); | ||
} | ||
@@ -42,0 +40,0 @@ else { |
@@ -1,2 +0,2 @@ | ||
import { observableMixin } from "./observable"; | ||
import { enumerate } from "./observable"; | ||
import { toSafeSubscriber } from "./toSafeSubscriber"; | ||
@@ -6,3 +6,3 @@ class CreateObservable { | ||
this.onSubscribe = onSubscribe; | ||
this.enumerate = observableMixin.enumerate; | ||
this.enumerate = enumerate; | ||
this.isSynchronous = false; | ||
@@ -9,0 +9,0 @@ } |
import { OperatorLike } from "@reactive-js/pipe"; | ||
import { VirtualTimeSchedulerLike } from "@reactive-js/scheduler"; | ||
import { ObservableLike } from "./interfaces"; | ||
export declare const forEach: (schedulerFactory: () => VirtualTimeSchedulerLike) => <T>(onNotify: (next: T) => void) => OperatorLike<ObservableLike<T>, void>; | ||
export declare const forEach: <T>(schedulerFactory: () => VirtualTimeSchedulerLike, onNotify: (next: T) => void) => OperatorLike<ObservableLike<T>, void>; |
@@ -13,3 +13,3 @@ import { pipe } from "@reactive-js/pipe"; | ||
} | ||
export const forEach = (schedulerFactory) => (onNotify) => observable => { | ||
export const forEach = (schedulerFactory, onNotify) => observable => { | ||
const scheduler = schedulerFactory(); | ||
@@ -16,0 +16,0 @@ const observer = new ForEachObserver(onNotify); |
@@ -1,6 +0,7 @@ | ||
import { observableMixin } from "./observable"; | ||
import { producerMixin } from "./producer"; | ||
class FromArrayProducer { | ||
import { createScheduledObservable } from "./observable"; | ||
import { AbstractProducer } from "./producer"; | ||
const alwaysTrue = () => true; | ||
class FromArrayProducer extends AbstractProducer { | ||
constructor(subscriber, values, startIndex, delay) { | ||
this.subscriber = subscriber; | ||
super(subscriber); | ||
this.values = values; | ||
@@ -10,25 +11,18 @@ this.startIndex = startIndex; | ||
this.index = this.startIndex; | ||
this.run = producerMixin.run; | ||
} | ||
produce(shouldYield) { | ||
const delay = this.delay; | ||
const values = this.values; | ||
const length = values.length; | ||
const subscriber = this.subscriber; | ||
let index = this.index; | ||
if (this.delay > 0 && index <= length && !subscriber.isDisposed) { | ||
if (index < length) { | ||
const value = values[index]; | ||
subscriber.notify(value); | ||
} | ||
this.index++; | ||
return this; | ||
} | ||
else if (shouldYield !== undefined) { | ||
while (index < length && !subscriber.isDisposed) { | ||
const value = values[index]; | ||
if (delay > 0 || shouldYield !== undefined) { | ||
let isDisposed = this.isDisposed; | ||
shouldYield = shouldYield || alwaysTrue; | ||
while (index < length && !isDisposed) { | ||
this.notify(values[index]); | ||
index++; | ||
subscriber.notify(value); | ||
if (shouldYield()) { | ||
isDisposed = this.isDisposed; | ||
if (index < length && !isDisposed && (delay > 0 || shouldYield())) { | ||
this.index = index; | ||
return this; | ||
return; | ||
} | ||
@@ -38,25 +32,10 @@ } | ||
else { | ||
while (index < length && !subscriber.isDisposed) { | ||
const value = values[index]; | ||
while (index < length && !this.isDisposed) { | ||
this.notify(values[index]); | ||
index++; | ||
subscriber.notify(value); | ||
} | ||
} | ||
subscriber.dispose(); | ||
return; | ||
this.dispose(); | ||
} | ||
} | ||
class FromArrayObservable { | ||
constructor(values, startIndex, delay) { | ||
this.values = values; | ||
this.startIndex = startIndex; | ||
this.delay = delay; | ||
this.enumerate = observableMixin.enumerate; | ||
this.isSynchronous = delay === 0; | ||
} | ||
subscribe(subscriber) { | ||
const producer = new FromArrayProducer(subscriber, this.values, this.startIndex, this.delay); | ||
subscriber.schedule(producer); | ||
} | ||
} | ||
export function fromArray(values, options = {}) { | ||
@@ -66,4 +45,5 @@ var _a, _b; | ||
const startIndex = Math.min((_b = options.startIndex, (_b !== null && _b !== void 0 ? _b : 0)), values.length); | ||
return new FromArrayObservable(values, startIndex, delay); | ||
const factory = (subscriber) => new FromArrayProducer(subscriber, values, startIndex, delay); | ||
return createScheduledObservable(factory, delay === 0); | ||
} | ||
//# sourceMappingURL=fromArray.js.map |
@@ -1,71 +0,47 @@ | ||
import { observableMixin } from "./observable"; | ||
import { producerMixin } from "./producer"; | ||
class FromIteratorProducer { | ||
import { createScheduledObservable } from "./observable"; | ||
import { AbstractProducer } from "./producer"; | ||
const alwaysTrue = () => true; | ||
class FromIteratorProducer extends AbstractProducer { | ||
constructor(subscriber, iterator, delay) { | ||
this.subscriber = subscriber; | ||
super(subscriber); | ||
this.iterator = iterator; | ||
this.delay = delay; | ||
this.run = producerMixin.run; | ||
this.next = iterator.next(); | ||
} | ||
produce(shouldYield) { | ||
const delay = this.delay; | ||
const iterator = this.iterator; | ||
const subscriber = this.subscriber; | ||
if (this.delay > 0 && !subscriber.isDisposed) { | ||
const next = iterator.next(); | ||
if (!next.done) { | ||
subscriber.notify(next.value); | ||
return this; | ||
} | ||
} | ||
else if (shouldYield !== undefined) { | ||
let done = false; | ||
while (!subscriber.isDisposed && !done) { | ||
const next = iterator.next(); | ||
done = next.done || false; | ||
if (!done) { | ||
subscriber.notify(next.value); | ||
let next = this.next; | ||
let done = next.done; | ||
if (delay > 0 || shouldYield !== undefined) { | ||
let isDisposed = this.isDisposed; | ||
shouldYield = shouldYield || alwaysTrue; | ||
while (!done && !isDisposed) { | ||
this.notify(next.value); | ||
next = iterator.next(); | ||
done = next.done; | ||
isDisposed = this.isDisposed; | ||
if (!done && !isDisposed && (delay > 0 || shouldYield())) { | ||
this.next = next; | ||
return; | ||
} | ||
if (!done && shouldYield()) { | ||
return this; | ||
} | ||
} | ||
} | ||
else { | ||
let done = false; | ||
while (!subscriber.isDisposed && !done) { | ||
const next = iterator.next(); | ||
done = next.done || false; | ||
if (!done) { | ||
subscriber.notify(next.value); | ||
} | ||
while (!this.isDisposed && !done) { | ||
this.notify(next.value); | ||
next = iterator.next(); | ||
done = next.done; | ||
} | ||
} | ||
subscriber.dispose(); | ||
return; | ||
this.dispose(); | ||
} | ||
} | ||
class FromIteratorObservable { | ||
constructor(iterator, delay) { | ||
this.iterator = iterator; | ||
this.delay = delay; | ||
this.enumerate = observableMixin.enumerate; | ||
this.isSynchronous = delay === 0; | ||
} | ||
subscribe(subscriber) { | ||
const producer = new FromIteratorProducer(subscriber, this.iterator, this.delay); | ||
subscriber.schedule(producer); | ||
} | ||
} | ||
export function fromIterator(iterator, delay = 0) { | ||
return new FromIteratorObservable(iterator, delay); | ||
const factory = (subscriber) => new FromIteratorProducer(subscriber, iterator, delay); | ||
return createScheduledObservable(factory, delay === 0); | ||
} | ||
class FromIterableObservable { | ||
constructor(iterable, delay) { | ||
this.iterable = iterable; | ||
this.delay = delay; | ||
this.enumerate = observableMixin.enumerate; | ||
this.isSynchronous = delay === 0; | ||
} | ||
subscribe(subscriber) { | ||
const iterator = this.iterable[Symbol.iterator](); | ||
export function fromIterable(iterable, delay = 0) { | ||
const factory = (subscriber) => { | ||
const iterator = iterable[Symbol.iterator](); | ||
subscriber.add(error => { | ||
@@ -80,9 +56,6 @@ if (error !== undefined && iterator.throw !== undefined) { | ||
}); | ||
const producer = new FromIteratorProducer(subscriber, iterator, this.delay); | ||
subscriber.schedule(producer); | ||
} | ||
return new FromIteratorProducer(subscriber, iterator, delay); | ||
}; | ||
return createScheduledObservable(factory, delay === 0); | ||
} | ||
export function fromIterable(iterable, delay = 0) { | ||
return new FromIterableObservable(iterable, delay); | ||
} | ||
//# sourceMappingURL=fromIterable.js.map |
@@ -1,9 +0,9 @@ | ||
import { observableMixin } from "./observable"; | ||
import { producerMixin } from "./producer"; | ||
class FromScheduledValuesProducer { | ||
import { createScheduledObservable } from "./observable"; | ||
import { AbstractProducer } from "./producer"; | ||
const alwaysTrue = () => true; | ||
class FromScheduledValuesProducer extends AbstractProducer { | ||
constructor(subscriber, values) { | ||
this.subscriber = subscriber; | ||
super(subscriber); | ||
this.values = values; | ||
this.index = 0; | ||
this.run = producerMixin.run; | ||
const [delay] = this.values[0]; | ||
@@ -13,36 +13,28 @@ this.delay = delay; | ||
produce(shouldYield) { | ||
const subscriber = this.subscriber; | ||
const values = this.values; | ||
const length = values.length; | ||
let index = this.index; | ||
while (index < values.length && !subscriber.isDisposed) { | ||
let isDisposed = this.isDisposed; | ||
shouldYield = shouldYield || alwaysTrue; | ||
while (index < length && !isDisposed) { | ||
const [, value] = values[index]; | ||
this.notify(value); | ||
index++; | ||
subscriber.notify(value); | ||
if (index < values.length) { | ||
const delay = values[index][0] || 0; | ||
if (delay > 0 || (shouldYield !== undefined && shouldYield())) { | ||
this.index = index; | ||
this.delay = delay; | ||
return this; | ||
} | ||
const shouldYieldDueToDelay = index < length && values[index][0] > 0; | ||
isDisposed = this.isDisposed; | ||
if (index < length && | ||
!isDisposed && | ||
(shouldYieldDueToDelay || shouldYield())) { | ||
this.index = index; | ||
this.delay = values[index][0]; | ||
return; | ||
} | ||
} | ||
subscriber.dispose(); | ||
return; | ||
this.dispose(); | ||
} | ||
} | ||
class FromScheduledValuesObservable { | ||
constructor(values) { | ||
this.values = values; | ||
this.enumerate = observableMixin.enumerate; | ||
this.isSynchronous = false; | ||
} | ||
subscribe(subscriber) { | ||
const producer = new FromScheduledValuesProducer(subscriber, this.values); | ||
subscriber.schedule(producer); | ||
} | ||
} | ||
export function fromScheduledValues(...values) { | ||
return new FromScheduledValuesObservable(values); | ||
const factory = (subscriber) => new FromScheduledValuesProducer(subscriber, values); | ||
return createScheduledObservable(factory, false); | ||
} | ||
//# sourceMappingURL=fromScheduledValues.js.map |
@@ -1,29 +0,27 @@ | ||
import { observableMixin } from "./observable"; | ||
import { producerMixin } from "./producer"; | ||
class GenerateProducer { | ||
import { createScheduledObservable } from "./observable"; | ||
import { AbstractProducer } from "./producer"; | ||
const alwaysTrue = () => true; | ||
class GenerateProducer extends AbstractProducer { | ||
constructor(subscriber, generator, acc, delay) { | ||
this.subscriber = subscriber; | ||
super(subscriber); | ||
this.generator = generator; | ||
this.acc = acc; | ||
this.delay = delay; | ||
this.run = producerMixin.run; | ||
} | ||
produce(shouldYield) { | ||
const generator = this.generator; | ||
const subscriber = this.subscriber; | ||
const delay = this.delay; | ||
let acc = this.acc; | ||
let result = undefined; | ||
if (this.delay > 0 && !subscriber.isDisposed) { | ||
subscriber.notify(acc); | ||
this.acc = generator(acc); | ||
result = this; | ||
} | ||
else if (shouldYield !== undefined) { | ||
while (!subscriber.isDisposed) { | ||
subscriber.notify(acc); | ||
acc = generator(acc); | ||
if (shouldYield()) { | ||
let isDisposed = this.isDisposed; | ||
if (delay > 0 || shouldYield !== undefined) { | ||
shouldYield = shouldYield || alwaysTrue; | ||
while (!isDisposed) { | ||
this.notify(acc); | ||
isDisposed = this.isDisposed; | ||
if (!isDisposed) { | ||
acc = generator(acc); | ||
} | ||
if (!isDisposed && (delay > 0 || shouldYield())) { | ||
this.acc = acc; | ||
result = this; | ||
break; | ||
return; | ||
} | ||
@@ -33,26 +31,16 @@ } | ||
else { | ||
while (!subscriber.isDisposed) { | ||
subscriber.notify(acc); | ||
acc = generator(acc); | ||
while (!isDisposed) { | ||
this.notify(acc); | ||
isDisposed = this.isDisposed; | ||
if (!isDisposed) { | ||
acc = generator(acc); | ||
} | ||
} | ||
} | ||
return result; | ||
} | ||
} | ||
class GenerateObservable { | ||
constructor(generator, acc, delay) { | ||
this.generator = generator; | ||
this.acc = acc; | ||
this.delay = delay; | ||
this.enumerate = observableMixin.enumerate; | ||
this.isSynchronous = delay === 0; | ||
} | ||
subscribe(subscriber) { | ||
const producer = new GenerateProducer(subscriber, this.generator, this.acc, this.delay); | ||
subscriber.schedule(producer); | ||
} | ||
} | ||
export function generate(generator, initialValue, delay = 0) { | ||
return new GenerateObservable(generator, initialValue(), delay); | ||
const factory = (subscriber) => new GenerateProducer(subscriber, generator, initialValue(), delay); | ||
return createScheduledObservable(factory, delay === 0); | ||
} | ||
//# sourceMappingURL=generate.js.map |
@@ -1,2 +0,2 @@ | ||
import { observableMixin } from "./observable"; | ||
import { enumerate } from "./observable"; | ||
class LiftedObservable { | ||
@@ -7,3 +7,3 @@ constructor(source, operators, isSynchronous) { | ||
this.isSynchronous = isSynchronous; | ||
this.enumerate = observableMixin.enumerate; | ||
this.enumerate = enumerate; | ||
} | ||
@@ -10,0 +10,0 @@ subscribe(subscriber) { |
@@ -1,2 +0,2 @@ | ||
import { observableMixin } from "./observable"; | ||
import { enumerate } from "./observable"; | ||
import { AbstractDelegatingSubscriber } from "./subscriber"; | ||
@@ -22,3 +22,3 @@ class MergeSubscriber extends AbstractDelegatingSubscriber { | ||
this.observables = observables; | ||
this.enumerate = observableMixin.enumerate; | ||
this.enumerate = enumerate; | ||
this.isSynchronous = false; | ||
@@ -25,0 +25,0 @@ } |
@@ -1,5 +0,5 @@ | ||
import { observableMixin } from "./observable"; | ||
import { enumerate } from "./observable"; | ||
class NeverObservable { | ||
constructor() { | ||
this.enumerate = observableMixin.enumerate; | ||
this.enumerate = enumerate; | ||
this.isSynchronous = false; | ||
@@ -6,0 +6,0 @@ } |
import { EnumeratorLike } from "@reactive-js/enumerable"; | ||
import { ObservableLike } from "./interfaces"; | ||
export declare const observableMixin: { | ||
enumerate<T>(this: ObservableLike<T>): EnumeratorLike<void, T>; | ||
}; | ||
import { SchedulerContinuationLike } from "@reactive-js/scheduler"; | ||
import { ObservableLike, SubscriberLike } from "./interfaces"; | ||
export declare function enumerate<T>(this: ObservableLike<T>): EnumeratorLike<void, T>; | ||
export declare const createScheduledObservable: <T>(factory: (subscriber: SubscriberLike<T>) => SchedulerContinuationLike, isSynchronous: boolean) => ObservableLike<T>; |
@@ -1,8 +0,12 @@ | ||
import { AbstractEnumerator } from "@reactive-js/enumerable"; | ||
import { add, createDisposable, dispose, } from "@reactive-js/disposable"; | ||
const alwaysTrue = () => true; | ||
class EnumeratorSubscriber extends AbstractEnumerator { | ||
class EnumeratorSubscriber { | ||
constructor() { | ||
super(); | ||
this.continuations = []; | ||
this.add = add; | ||
this.disposable = createDisposable(() => { this.isDisposed = true; }); | ||
this.dispose = dispose; | ||
this.error = undefined; | ||
this.hasCurrent = false; | ||
this.isDisposed = false; | ||
this.now = 0; | ||
@@ -14,2 +18,3 @@ this.add(error => { | ||
move() { | ||
const continuations = this.continuations; | ||
this.hasCurrent = false; | ||
@@ -19,6 +24,10 @@ this.current = undefined; | ||
while (!this.hasCurrent) { | ||
if (this.isDisposed || this.continuation === undefined) { | ||
const continuation = continuations.shift(); | ||
if (continuation === undefined || continuation.isDisposed) { | ||
break; | ||
} | ||
this.continuation = this.continuation.run(alwaysTrue) || undefined; | ||
continuation.run(alwaysTrue); | ||
if (!continuation.isDisposed) { | ||
continuations.push(continuation); | ||
} | ||
const error = this.error; | ||
@@ -37,15 +46,25 @@ if (error !== undefined) { | ||
schedule(continuation, delay = 0) { | ||
if (!this.isDisposed && delay === 0) { | ||
this.continuation = continuation; | ||
this.add(continuation); | ||
if (!continuation.isDisposed && delay === 0) { | ||
this.continuations.push(continuation); | ||
} | ||
return this; | ||
} | ||
} | ||
export const observableMixin = { | ||
enumerate() { | ||
const subscriber = new EnumeratorSubscriber(); | ||
this.subscribe(subscriber); | ||
return subscriber; | ||
}, | ||
}; | ||
export function enumerate() { | ||
const subscriber = new EnumeratorSubscriber(); | ||
this.subscribe(subscriber); | ||
return subscriber; | ||
} | ||
class ScheduledObservable { | ||
constructor(factory, isSynchronous) { | ||
this.factory = factory; | ||
this.isSynchronous = isSynchronous; | ||
this.enumerate = enumerate; | ||
} | ||
subscribe(subscriber) { | ||
const schedulerContinuation = this.factory(subscriber); | ||
subscriber.schedule(schedulerContinuation); | ||
} | ||
} | ||
export const createScheduledObservable = (factory, isSynchronous) => new ScheduledObservable(factory, isSynchronous); | ||
//# sourceMappingURL=observable.js.map |
@@ -0,10 +1,15 @@ | ||
import { add, dispose } from "@reactive-js/disposable"; | ||
import { SchedulerContinuationLike } from "@reactive-js/scheduler"; | ||
import { SubscriberLike } from "./interfaces"; | ||
interface ProducerLike<T> extends SchedulerContinuationLike { | ||
readonly subscriber: SubscriberLike<T>; | ||
produce(shouldYield?: () => boolean): SchedulerContinuationLike | void; | ||
export declare abstract class AbstractProducer<T> implements SchedulerContinuationLike { | ||
private readonly subscriber; | ||
readonly add: typeof add; | ||
readonly disposable: import("@reactive-js/disposable").DisposableLike; | ||
abstract readonly delay: number; | ||
readonly dispose: typeof dispose; | ||
isDisposed: boolean; | ||
constructor(subscriber: SubscriberLike<T>); | ||
notify(next: T): void; | ||
abstract produce(shouldYield?: () => boolean): void; | ||
run(shouldYield?: () => boolean): void; | ||
} | ||
export declare const producerMixin: { | ||
run: <T>(this: ProducerLike<T>, shouldYield?: (() => boolean) | undefined) => SchedulerContinuationLike | undefined; | ||
}; | ||
export {}; |
@@ -1,16 +0,28 @@ | ||
export const producerMixin = { | ||
run: function run(shouldYield) { | ||
try { | ||
const result = this.produce(shouldYield); | ||
if (result !== undefined) { | ||
return result; | ||
import { add, createDisposable, dispose } from "@reactive-js/disposable"; | ||
export class AbstractProducer { | ||
constructor(subscriber) { | ||
this.subscriber = subscriber; | ||
this.add = add; | ||
this.disposable = createDisposable(_ => { | ||
this.isDisposed = true; | ||
}); | ||
this.dispose = dispose; | ||
this.isDisposed = false; | ||
this.add(subscriber); | ||
} | ||
notify(next) { | ||
this.subscriber.notify(next); | ||
} | ||
run(shouldYield) { | ||
if (!this.isDisposed) { | ||
try { | ||
this.produce(shouldYield); | ||
} | ||
catch (cause) { | ||
const error = { cause }; | ||
this.dispose(error); | ||
} | ||
} | ||
catch (cause) { | ||
const error = { cause }; | ||
this.subscriber.dispose(error); | ||
} | ||
return; | ||
}, | ||
}; | ||
} | ||
} | ||
//# sourceMappingURL=producer.js.map |
@@ -6,3 +6,3 @@ import { pipe } from "@reactive-js/pipe"; | ||
import { merge } from "./merge"; | ||
import { observableMixin } from "./observable"; | ||
import { enumerate } from "./observable"; | ||
import { ofValue } from "./ofValue"; | ||
@@ -15,3 +15,3 @@ import { skipFirst } from "./skipFirst"; | ||
this.initialValue = initialValue; | ||
this.enumerate = observableMixin.enumerate; | ||
this.enumerate = enumerate; | ||
this.isSynchronous = false; | ||
@@ -18,0 +18,0 @@ } |
@@ -1,2 +0,2 @@ | ||
import { observableMixin } from "./observable"; | ||
import { enumerate } from "./observable"; | ||
import { createSubject } from "./subject"; | ||
@@ -15,3 +15,3 @@ class SharedObservable { | ||
}; | ||
this.enumerate = observableMixin.enumerate; | ||
this.enumerate = enumerate; | ||
this.isSynchronous = false; | ||
@@ -18,0 +18,0 @@ } |
@@ -1,2 +0,2 @@ | ||
import { observableMixin } from "./observable"; | ||
import { enumerate } from "./observable"; | ||
import { AbstractSubscriber } from "./subscriber"; | ||
@@ -10,3 +10,3 @@ import { toSafeSubscriber } from "./toSafeSubscriber"; | ||
this.replayed = []; | ||
this.enumerate = observableMixin.enumerate; | ||
this.enumerate = enumerate; | ||
this.isSynchronous = false; | ||
@@ -13,0 +13,0 @@ } |
@@ -1,12 +0,8 @@ | ||
import { DisposableLike } from "@reactive-js/disposable"; | ||
import { add, dispose } from "@reactive-js/disposable"; | ||
import { SchedulerContinuationLike, SchedulerLike } from "@reactive-js/scheduler"; | ||
import { SubscriberLike } from "./interfaces"; | ||
export declare abstract class AbstractSubscriber<T> implements SubscriberLike<T> { | ||
readonly add: <This extends DisposableLike>(this: { | ||
disposable: DisposableLike; | ||
} & This, disposable: DisposableLike | ((error?: import("@reactive-js/disposable").ErrorLike | undefined) => void)) => This; | ||
readonly disposable: DisposableLike; | ||
readonly dispose: (this: { | ||
disposable: DisposableLike; | ||
}, error?: import("@reactive-js/disposable").ErrorLike | undefined) => void; | ||
readonly add: typeof add; | ||
readonly disposable: import("@reactive-js/disposable").DisposableLike; | ||
readonly dispose: typeof dispose; | ||
isDisposed: boolean; | ||
@@ -19,3 +15,3 @@ private readonly scheduler; | ||
scheduler: SchedulerLike; | ||
}, continuation: SchedulerContinuationLike): DisposableLike; | ||
}, continuation: SchedulerContinuationLike): void; | ||
} | ||
@@ -22,0 +18,0 @@ export declare abstract class AbstractDelegatingSubscriber<TA, TB> extends AbstractSubscriber<TA> { |
@@ -1,9 +0,9 @@ | ||
import { createDisposable, disposableMixin, } from "@reactive-js/disposable"; | ||
import { add, createDisposable, dispose } from "@reactive-js/disposable"; | ||
export class AbstractSubscriber { | ||
constructor(scheduler) { | ||
this.add = disposableMixin.add; | ||
this.add = add; | ||
this.disposable = createDisposable(_ => { | ||
this.isDisposed = true; | ||
}); | ||
this.dispose = disposableMixin.dispose; | ||
this.dispose = dispose; | ||
this.isDisposed = false; | ||
@@ -16,5 +16,4 @@ this.scheduler = scheduler.scheduler || scheduler; | ||
schedule(continuation) { | ||
const schedulerSubscription = this.scheduler.schedule(continuation); | ||
this.add(schedulerSubscription); | ||
return schedulerSubscription; | ||
this.add(continuation); | ||
this.scheduler.schedule(continuation); | ||
} | ||
@@ -21,0 +20,0 @@ } |
import { ObservableLike } from "./interfaces"; | ||
export declare const throws: <T>(factory: () => unknown, delay?: number) => ObservableLike<T>; | ||
export declare const throws: <T>(errorFactory: () => unknown, delay?: number) => ObservableLike<T>; |
@@ -1,27 +0,17 @@ | ||
import { observableMixin } from "./observable"; | ||
import { producerMixin } from "./producer"; | ||
class ThrowsProducer { | ||
import { createScheduledObservable } from "./observable"; | ||
import { AbstractProducer } from "./producer"; | ||
class ThrowsProducer extends AbstractProducer { | ||
constructor(subscriber, error, delay) { | ||
this.subscriber = subscriber; | ||
super(subscriber); | ||
this.error = error; | ||
this.delay = delay; | ||
this.run = producerMixin.run; | ||
} | ||
produce(_) { | ||
this.subscriber.dispose(this.error); | ||
this.dispose(this.error); | ||
} | ||
} | ||
class ThrowsObservable { | ||
constructor(factory, delay) { | ||
this.factory = factory; | ||
this.delay = delay; | ||
this.enumerate = observableMixin.enumerate; | ||
this.isSynchronous = delay === 0; | ||
} | ||
subscribe(subscriber) { | ||
const producer = new ThrowsProducer(subscriber, { cause: this.factory() }, this.delay); | ||
subscriber.schedule(producer); | ||
} | ||
} | ||
export const throws = (factory, delay = 0) => new ThrowsObservable(factory, delay); | ||
export const throws = (errorFactory, delay = 0) => { | ||
const factory = (subscriber) => new ThrowsProducer(subscriber, { cause: errorFactory() }, delay); | ||
return createScheduledObservable(factory, delay === 0); | ||
}; | ||
//# sourceMappingURL=throws.js.map |
@@ -1,6 +0,57 @@ | ||
import { producerMixin } from "./producer"; | ||
import { add, createDisposable, dispose, } from "@reactive-js/disposable"; | ||
import { AbstractDelegatingSubscriber } from "./subscriber"; | ||
class SafeSubscriberSchedulerContinuation { | ||
constructor(subscriber) { | ||
this.subscriber = subscriber; | ||
this.add = add; | ||
this.delay = 0; | ||
this.disposable = createDisposable(_ => { | ||
this.isDisposed = true; | ||
}); | ||
this.dispose = dispose; | ||
this.isDisposed = false; | ||
} | ||
produce(shouldYield) { | ||
const subscriber = this.subscriber; | ||
const nextQueue = subscriber.nextQueue; | ||
const delegate = subscriber.delegate; | ||
if (shouldYield !== undefined) { | ||
while (nextQueue.length > 0 && !delegate.isDisposed) { | ||
const next = nextQueue.shift(); | ||
delegate.notify(next); | ||
const hasRemainingEvents = subscriber.nextQueue.length > 0 || subscriber.isDisposed; | ||
if (hasRemainingEvents && shouldYield()) { | ||
return; | ||
} | ||
} | ||
} | ||
else { | ||
while (nextQueue.length > 0 && !delegate.isDisposed) { | ||
const next = nextQueue.shift(); | ||
delegate.notify(next); | ||
} | ||
} | ||
if (subscriber.isDisposed) { | ||
delegate.dispose(subscriber.error); | ||
} | ||
this.dispose(); | ||
} | ||
run(shouldYield) { | ||
if (!this.isDisposed) { | ||
try { | ||
this.produce(shouldYield); | ||
} | ||
catch (cause) { | ||
const error = { cause }; | ||
this.subscriber.dispose(error); | ||
this.dispose(); | ||
} | ||
} | ||
} | ||
} | ||
const scheduleDrainQueue = (subscriber) => { | ||
if (subscriber.remainingEvents === 1) { | ||
subscriber.delegate.schedule(subscriber); | ||
const remainingEvents = subscriber.nextQueue.length + (subscriber.isDisposed ? 1 : 0); | ||
if (remainingEvents === 1) { | ||
const producer = new SafeSubscriberSchedulerContinuation(subscriber); | ||
subscriber.delegate.schedule(producer); | ||
} | ||
@@ -13,3 +64,2 @@ }; | ||
this.nextQueue = []; | ||
this.run = producerMixin.run; | ||
this.add(error => { | ||
@@ -20,5 +70,2 @@ this.error = error; | ||
} | ||
get remainingEvents() { | ||
return this.nextQueue.length + (this.isDisposed ? 1 : 0); | ||
} | ||
notify(next) { | ||
@@ -33,26 +80,4 @@ this.delegate.notify(next); | ||
} | ||
produce(shouldYield) { | ||
const delegate = this.delegate; | ||
const nextQueue = this.nextQueue; | ||
if (shouldYield !== undefined) { | ||
while (nextQueue.length > 0 && !delegate.isDisposed) { | ||
const next = nextQueue.shift(); | ||
delegate.notify(next); | ||
if (shouldYield() && this.remainingEvents > 0) { | ||
return this; | ||
} | ||
} | ||
} | ||
else { | ||
while (nextQueue.length > 0 && !delegate.isDisposed) { | ||
const next = nextQueue.shift(); | ||
delegate.notify(next); | ||
} | ||
} | ||
if (this.isDisposed) { | ||
delegate.dispose(this.error); | ||
} | ||
} | ||
} | ||
export const toSafeSubscriber = (subscriber) => new SafeSubscriberImpl(subscriber); | ||
//# sourceMappingURL=toSafeSubscriber.js.map |
@@ -1,2 +0,2 @@ | ||
import { observableMixin } from "./observable"; | ||
import { enumerate } from "./observable"; | ||
class UsingObservable { | ||
@@ -6,3 +6,3 @@ constructor(resourceFactory, observableFactory) { | ||
this.observableFactory = observableFactory; | ||
this.enumerate = observableMixin.enumerate; | ||
this.enumerate = enumerate; | ||
this.isSynchronous = false; | ||
@@ -9,0 +9,0 @@ } |
@@ -1,4 +0,4 @@ | ||
import { producerMixin } from "./producer"; | ||
import { AbstractProducer } from "./producer"; | ||
import { AbstractDelegatingSubscriber } from "./subscriber"; | ||
import { observableMixin } from "./observable"; | ||
import { enumerate } from "./observable"; | ||
const shouldEmit = (enumerators) => { | ||
@@ -81,9 +81,9 @@ for (const enumerator of enumerators) { | ||
} | ||
class ZipProducer { | ||
class ZipProducer extends AbstractProducer { | ||
constructor(subscriber, enumerators, selector) { | ||
this.subscriber = subscriber; | ||
super(subscriber); | ||
this.enumerators = enumerators; | ||
this.selector = selector; | ||
this.delay = 0; | ||
this.hasCurrent = false; | ||
this.run = producerMixin.run; | ||
} | ||
@@ -93,12 +93,15 @@ produce(shouldYield) { | ||
const selector = this.selector; | ||
const subscriber = this.subscriber; | ||
if (shouldYield !== undefined) { | ||
while (shouldEmit(enumerators) && !subscriber.isDisposed) { | ||
let isDisposed = this.isDisposed; | ||
let shouldEmitNext = shouldEmit(enumerators); | ||
while (shouldEmitNext && !isDisposed) { | ||
const next = selector(...enumerators.map(getCurrent)); | ||
subscriber.notify(next); | ||
this.notify(next); | ||
isDisposed = this.isDisposed; | ||
for (const buffer of enumerators) { | ||
buffer.move(); | ||
} | ||
if (shouldYield()) { | ||
return this; | ||
shouldEmitNext = shouldEmit(enumerators); | ||
if (shouldEmitNext && !isDisposed && shouldYield()) { | ||
return; | ||
} | ||
@@ -108,3 +111,3 @@ } | ||
else { | ||
while (shouldEmit(enumerators) && !subscriber.isDisposed) { | ||
while (shouldEmit(enumerators) && !this.isDisposed) { | ||
const next = selector(...enumerators.map(getCurrent)); | ||
@@ -114,7 +117,6 @@ for (const enumerator of enumerators) { | ||
} | ||
subscriber.notify(next); | ||
this.notify(next); | ||
} | ||
} | ||
subscriber.dispose(); | ||
return; | ||
this.dispose(); | ||
} | ||
@@ -126,3 +128,3 @@ } | ||
this.selector = selector; | ||
this.enumerate = observableMixin.enumerate; | ||
this.enumerate = enumerate; | ||
this.isSynchronous = observables.every(obs => obs.isSynchronous); | ||
@@ -129,0 +131,0 @@ } |
export { MulticastObservableLike, ObserverLike, ObservableLike, ObservableOperatorLike, SafeSubscriberLike, SubjectLike, SubscriberLike, SubscriberOperatorLike, } from "./internal/interfaces"; | ||
export { observableMixin } from "./internal/observable"; | ||
export { AbstractSubscriber, AbstractDelegatingSubscriber, } from "./internal/subscriber"; | ||
export { enumerate } from "./internal/observable"; | ||
export { AbstractDelegatingSubscriber } from "./internal/subscriber"; | ||
export { combineLatest } from "./internal/combineLatest"; | ||
@@ -5,0 +5,0 @@ export { createObservable } from "./internal/createObservable"; |
import { OperatorLike } from "@reactive-js/pipe"; | ||
import { VirtualTimeSchedulerLike } from "@reactive-js/scheduler"; | ||
import { ObservableLike } from "./interfaces"; | ||
export declare const forEach: (schedulerFactory: () => VirtualTimeSchedulerLike) => <T>(onNotify: (next: T) => void) => OperatorLike<ObservableLike<T>, void>; | ||
export declare const forEach: <T>(schedulerFactory: () => VirtualTimeSchedulerLike, onNotify: (next: T) => void) => OperatorLike<ObservableLike<T>, void>; | ||
//# sourceMappingURL=forEach.d.ts.map |
import { EnumeratorLike } from "@reactive-js/enumerable"; | ||
import { ObservableLike } from "./interfaces"; | ||
export declare const observableMixin: { | ||
enumerate<T>(this: ObservableLike<T>): EnumeratorLike<void, T>; | ||
}; | ||
import { SchedulerContinuationLike } from "@reactive-js/scheduler"; | ||
import { ObservableLike, SubscriberLike } from "./interfaces"; | ||
export declare function enumerate<T>(this: ObservableLike<T>): EnumeratorLike<void, T>; | ||
export declare const createScheduledObservable: <T>(factory: (subscriber: SubscriberLike<T>) => SchedulerContinuationLike, isSynchronous: boolean) => ObservableLike<T>; | ||
//# sourceMappingURL=observable.d.ts.map |
@@ -0,11 +1,16 @@ | ||
import { add, dispose } from "@reactive-js/disposable"; | ||
import { SchedulerContinuationLike } from "@reactive-js/scheduler"; | ||
import { SubscriberLike } from "./interfaces"; | ||
interface ProducerLike<T> extends SchedulerContinuationLike { | ||
readonly subscriber: SubscriberLike<T>; | ||
produce(shouldYield?: () => boolean): SchedulerContinuationLike | void; | ||
export declare abstract class AbstractProducer<T> implements SchedulerContinuationLike { | ||
private readonly subscriber; | ||
readonly add: typeof add; | ||
readonly disposable: import("@reactive-js/disposable").DisposableLike; | ||
abstract readonly delay: number; | ||
readonly dispose: typeof dispose; | ||
isDisposed: boolean; | ||
constructor(subscriber: SubscriberLike<T>); | ||
notify(next: T): void; | ||
abstract produce(shouldYield?: () => boolean): void; | ||
run(shouldYield?: () => boolean): void; | ||
} | ||
export declare const producerMixin: { | ||
run: <T>(this: ProducerLike<T>, shouldYield?: (() => boolean) | undefined) => SchedulerContinuationLike | undefined; | ||
}; | ||
export {}; | ||
//# sourceMappingURL=producer.d.ts.map |
@@ -1,12 +0,8 @@ | ||
import { DisposableLike } from "@reactive-js/disposable"; | ||
import { add, dispose } from "@reactive-js/disposable"; | ||
import { SchedulerContinuationLike, SchedulerLike } from "@reactive-js/scheduler"; | ||
import { SubscriberLike } from "./interfaces"; | ||
export declare abstract class AbstractSubscriber<T> implements SubscriberLike<T> { | ||
readonly add: <This extends DisposableLike>(this: { | ||
disposable: DisposableLike; | ||
} & This, disposable: DisposableLike | ((error?: import("@reactive-js/disposable").ErrorLike | undefined) => void)) => This; | ||
readonly disposable: DisposableLike; | ||
readonly dispose: (this: { | ||
disposable: DisposableLike; | ||
}, error?: import("@reactive-js/disposable").ErrorLike | undefined) => void; | ||
readonly add: typeof add; | ||
readonly disposable: import("@reactive-js/disposable").DisposableLike; | ||
readonly dispose: typeof dispose; | ||
isDisposed: boolean; | ||
@@ -19,3 +15,3 @@ private readonly scheduler; | ||
scheduler: SchedulerLike; | ||
}, continuation: SchedulerContinuationLike): DisposableLike; | ||
}, continuation: SchedulerContinuationLike): void; | ||
} | ||
@@ -22,0 +18,0 @@ export declare abstract class AbstractDelegatingSubscriber<TA, TB> extends AbstractSubscriber<TA> { |
import { ObservableLike } from "./interfaces"; | ||
export declare const throws: <T>(factory: () => unknown, delay?: number) => ObservableLike<T>; | ||
export declare const throws: <T>(errorFactory: () => unknown, delay?: number) => ObservableLike<T>; | ||
//# sourceMappingURL=throws.d.ts.map |
{ | ||
"name": "@reactive-js/observable", | ||
"version": "0.0.30", | ||
"version": "0.0.31", | ||
"main": "dist/cjs/index.js", | ||
@@ -41,9 +41,9 @@ "module": "dist/esm5/index.js", | ||
"dependencies": { | ||
"@reactive-js/disposable": "^0.0.30", | ||
"@reactive-js/enumerable": "^0.0.30", | ||
"@reactive-js/pipe": "^0.0.30", | ||
"@reactive-js/scheduler": "^0.0.30" | ||
"@reactive-js/disposable": "^0.0.31", | ||
"@reactive-js/enumerable": "^0.0.31", | ||
"@reactive-js/pipe": "^0.0.31", | ||
"@reactive-js/scheduler": "^0.0.31" | ||
}, | ||
"devDependencies": { | ||
"@reactive-js/schedulers": "^0.0.30", | ||
"@reactive-js/schedulers": "^0.0.31", | ||
"@types/jest": "^24.0.23", | ||
@@ -74,3 +74,3 @@ "jest": "^24.9.0", | ||
}, | ||
"gitHead": "f231e2dd36697e368790180dddb4eb16eb713856" | ||
"gitHead": "c950e60d18f5bb13f5890123c05728b4edf9c3c0" | ||
} |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
382776
459
4905
+ Added@reactive-js/disposable@0.0.31(transitive)
+ Added@reactive-js/enumerable@0.0.31(transitive)
+ Added@reactive-js/pipe@0.0.31(transitive)
+ Added@reactive-js/scheduler@0.0.31(transitive)
- Removed@reactive-js/disposable@0.0.30(transitive)
- Removed@reactive-js/enumerable@0.0.30(transitive)
- Removed@reactive-js/pipe@0.0.30(transitive)
- Removed@reactive-js/scheduler@0.0.30(transitive)
Updated@reactive-js/pipe@^0.0.31