@reactive-js/core
Advanced tools
Comparing version 0.4.0 to 0.5.0
35
node.js
@@ -56,3 +56,5 @@ 'use strict'; | ||
readableValue.pause(); | ||
const modeSubscription = functions.pipe(mode, observable.subscribe(observer, ev => { | ||
disposable.addDisposableDisposeParentOnChildError(observer, readable); | ||
disposable.addDisposable(readable, dispatcher); | ||
const modeSubscription = functions.pipe(mode, observable.subscribe(observer.scheduler, ev => { | ||
switch (ev) { | ||
@@ -67,4 +69,3 @@ case "pause": | ||
})); | ||
disposable.addDisposable(observer, readable); | ||
disposable.addDisposableDisposeParentOnChildError(readable, modeSubscription); | ||
disposable.addDisposableDisposeParentOnChildError(observer, modeSubscription); | ||
const onData = observable.dispatchTo(dispatcher); | ||
@@ -76,7 +77,2 @@ const onEnd = () => { | ||
readableValue.on("end", onEnd); | ||
disposable.addDisposable(readable, dispatcher); | ||
disposable.addTeardown(dispatcher, _ => { | ||
readableValue.removeListener("data", onData); | ||
readableValue.removeListener("end", onEnd); | ||
}); | ||
})); | ||
@@ -90,3 +86,5 @@ const readFileIOSource = (path, options) => createReadableIOSource(() => functions.pipe(fs__default["default"].createReadStream(path, options), createDisposableNodeStream)); | ||
const writableValue = writable.value; | ||
const streamEventsSubscription = functions.pipe(events, observable.subscribe(observer, ev => { | ||
disposable.addDisposableDisposeParentOnChildError(observer, writable); | ||
disposable.addDisposable(writable, dispatcher); | ||
const streamEventsSubscription = functions.pipe(events, observable.subscribe(observer.scheduler, ev => { | ||
// FIXME: when writing to an outgoing node ServerResponse with a UInt8Array | ||
@@ -100,7 +98,6 @@ // node throws a type Error regarding expecting a Buffer, though the docs | ||
})); | ||
disposable.addDisposableDisposeParentOnChildError(observer, streamEventsSubscription); | ||
disposable.addOnDisposedWithoutErrorTeardown(streamEventsSubscription, () => { | ||
writableValue.end(); | ||
}); | ||
disposable.addDisposableDisposeParentOnChildError(writable, streamEventsSubscription); | ||
disposable.addDisposable(observer, writable); | ||
const onDrain = functions.defer("resume", observable.dispatchTo(dispatcher)); | ||
@@ -112,8 +109,2 @@ const onFinish = functions.defer(dispatcher, disposable.dispose()); | ||
writableValue.on(NODE_JS_PAUSE_EVENT, onPause); | ||
disposable.addDisposable(writable, dispatcher); | ||
disposable.addTeardown(dispatcher, _ => { | ||
writableValue.removeListener("drain", onDrain); | ||
writableValue.removeListener("finish", onFinish); | ||
writableValue.removeListener(NODE_JS_PAUSE_EVENT, onPause); | ||
}); | ||
dispatcher.dispatch("resume"); | ||
@@ -131,8 +122,10 @@ })); | ||
}); | ||
const transformReadableStream = functions.pipe(createReadableIOSource(functions.returns(transform)), streamable.stream(observer)); | ||
const sinkSubscription = functions.pipe(streamable.sink(src, transformSink), observable.subscribe(observer)); | ||
const modeSubscription = functions.pipe(modeObs, observable.subscribe(observer, transformReadableStream.dispatch, transformReadableStream)); | ||
const transformReadableStream = functions.pipe(createReadableIOSource(functions.returns(transform)), streamable.stream(observer.scheduler)); | ||
disposable.addDisposableDisposeParentOnChildError(observer, transformReadableStream); | ||
const sinkSubscription = functions.pipe(streamable.sink(src, transformSink), observable.subscribe(observer.scheduler)); | ||
disposable.addDisposableDisposeParentOnChildError(observer, sinkSubscription); | ||
const modeSubscription = functions.pipe(modeObs, observable.subscribe(observer.scheduler, transformReadableStream.dispatch, transformReadableStream)); | ||
disposable.addDisposableDisposeParentOnChildError(observer, modeSubscription); | ||
disposable.addDisposableDisposeParentOnChildError(transformReadableStream, sinkSubscription); | ||
disposable.addDisposableDisposeParentOnChildError(transformReadableStream, modeSubscription); | ||
disposable.addDisposable(observer, transformReadableStream); | ||
functions.pipe(transformReadableStream, source.sinkInto(observer)); | ||
@@ -139,0 +132,0 @@ })); |
import { AbstractDisposableContainer, Concat, FromArray, FromIterator, FromIterable, Using, Map, ConcatAll, Repeat, TakeFirst, Zip, DecodeWithCharset, DistinctUntilChanged, EverySatisfy, Keep, Pairwise, Reduce, Scan, SkipFirst, SomeSatisfy, TakeLast, TakeWhile, ThrowIfEmpty } from './container.js'; | ||
import { DisposableLike, DisposableOrTeardown } from './disposable.js'; | ||
import { SideEffect1, Factory, Function1, Function2, Function3, Function4, Function5, Function6, SideEffect, SideEffect2, SideEffect3, SideEffect4, SideEffect5, SideEffect6, Updater, Predicate, Equality, Reducer } from './functions.js'; | ||
import { SchedulerLike, SchedulerContinuationLike, VirtualTimeSchedulerLike } from './scheduler.js'; | ||
import { SchedulerLike, VirtualTimeSchedulerLike } from './scheduler.js'; | ||
import { SinkLike, AbstractSource, AbstractDisposableSource, SourceLike } from './source.js'; | ||
@@ -13,21 +13,7 @@ import { Option } from './option.js'; | ||
*/ | ||
declare class Observer<T> extends AbstractDisposableContainer implements SinkLike<T>, SchedulerLike { | ||
declare class Observer<T> extends AbstractDisposableContainer implements SinkLike<T> { | ||
readonly scheduler: SchedulerLike; | ||
inContinuation: boolean; | ||
private readonly _scheduler; | ||
constructor(scheduler: SchedulerLike); | ||
/** @ignore */ | ||
get now(): number; | ||
/** @ignore */ | ||
get shouldYield(): boolean; | ||
assertState(this: Observer<T>): void; | ||
notify(_: T): void; | ||
/** @ignore */ | ||
onRunStatusChanged(status: boolean): void; | ||
/** @ignore */ | ||
requestYield(): void; | ||
/** @ignore */ | ||
schedule(continuation: SchedulerContinuationLike, options?: { | ||
readonly delay?: number; | ||
}): void; | ||
} | ||
@@ -34,0 +20,0 @@ |
{ | ||
"name": "@reactive-js/core", | ||
"version": "0.4.0", | ||
"version": "0.5.0", | ||
"keywords": [ | ||
@@ -45,3 +45,3 @@ "asynchronous", | ||
}, | ||
"gitHead": "4c9c6283eaf89d607379894927792a42d7bb3df3" | ||
"gitHead": "9d88535ad822a660cf0bdec11be2b0fb36a11d77" | ||
} |
@@ -70,2 +70,6 @@ 'use strict'; | ||
schedule(continuation, { priority, delay = 0, }) { | ||
disposable.addDisposable(this, continuation); | ||
if (continuation.isDisposed) { | ||
return; | ||
} | ||
const callback = () => { | ||
@@ -72,0 +76,0 @@ functions.pipe(callbackNodeDisposable, disposable.dispose()); |
@@ -47,5 +47,2 @@ import { DisposableLike } from './disposable.js'; | ||
interface SchedulerContinuationRunStatusChangedListenerLike { | ||
onRunStatusChanged(this: SchedulerContinuationRunStatusChangedListenerLike, state: boolean): void; | ||
} | ||
/** | ||
@@ -57,4 +54,2 @@ * A unit of work to be executed by a scheduler. | ||
interface SchedulerContinuationLike extends DisposableLike { | ||
addListener(this: SchedulerContinuationLike, ev: "onRunStatusChanged", listener: SchedulerContinuationRunStatusChangedListenerLike): void; | ||
removeListener(this: SchedulerContinuationLike, ev: "onRunStatusChanged", listener: SchedulerContinuationRunStatusChangedListenerLike): void; | ||
/** | ||
@@ -124,2 +119,2 @@ * Work function to be invoked by the scheduler after the specified delay. | ||
export { PausableSchedulerLike, PrioritySchedulerLike, SchedulerContinuationLike, SchedulerContinuationRunStatusChangedListenerLike, SchedulerLike, VirtualTimeSchedulerLike, YieldError, __yield, createHostScheduler, createVirtualTimeScheduler, run, schedule, toPausableScheduler, toPriorityScheduler, toSchedulerWithPriority }; | ||
export { PausableSchedulerLike, PrioritySchedulerLike, SchedulerContinuationLike, SchedulerLike, VirtualTimeSchedulerLike, YieldError, __yield, createHostScheduler, createVirtualTimeScheduler, run, schedule, toPausableScheduler, toPriorityScheduler, toSchedulerWithPriority }; |
@@ -88,7 +88,2 @@ 'use strict'; | ||
const notifyListeners = (listeners, state) => { | ||
for (const listener of listeners) { | ||
listener.onRunStatusChanged(state); | ||
} | ||
}; | ||
const isYieldError = (e) => e instanceof YieldError; | ||
@@ -101,5 +96,2 @@ class YieldError { | ||
let currentScheduler = option.none; | ||
function clearListeners() { | ||
this.listeners = option.none; | ||
} | ||
class SchedulerContinuationImpl extends disposable.AbstractDisposable { | ||
@@ -110,27 +102,7 @@ constructor(scheduler, f) { | ||
this.f = f; | ||
this.listeners = option.none; | ||
} | ||
addListener(_ev, listener) { | ||
if (!this.isDisposed) { | ||
let { listeners } = this; | ||
if (option.isNone(listeners)) { | ||
this.listeners = new Set(); | ||
} | ||
this.listeners.add(listener); | ||
} | ||
} | ||
removeListener(_ev, listener) { | ||
let { listeners } = this; | ||
if (option.isSome(listeners)) { | ||
listeners.delete(listener); | ||
} | ||
} | ||
continue() { | ||
if (!this.isDisposed) { | ||
const { listeners } = this; | ||
let error = option.none; | ||
let yieldError = option.none; | ||
if (option.isSome(listeners)) { | ||
notifyListeners(listeners, true); | ||
} | ||
const oldCurrentScheduler = currentScheduler; | ||
@@ -150,5 +122,2 @@ currentScheduler = this.scheduler; | ||
currentScheduler = oldCurrentScheduler; | ||
if (option.isSome(listeners)) { | ||
notifyListeners(listeners, false); | ||
} | ||
if (option.isSome(yieldError)) { | ||
@@ -176,3 +145,2 @@ this.scheduler.schedule(this, yieldError); | ||
const continuation = new SchedulerContinuationImpl(scheduler, f); | ||
disposable.addTeardown(continuation, clearListeners); | ||
scheduler.schedule(continuation, options); | ||
@@ -179,0 +147,0 @@ return continuation; |
@@ -35,4 +35,4 @@ 'use strict'; | ||
const stream = new StreamImpl(subject, observable$1); | ||
disposable.addDisposable(observable$1, stream); | ||
disposable.addDisposable(stream, subject); | ||
disposable.addDisposableDisposeParentOnChildError(observable$1, stream); | ||
disposable.addDisposableDisposeParentOnChildError(stream, subject); | ||
return stream; | ||
@@ -61,6 +61,7 @@ }; | ||
const op = requests => observable.createObservableUnsafe(observer => { | ||
const srcStream = functions.pipe(src, stream(observer)); | ||
const requestSubscription = functions.pipe(requests, observable.map(functions.compose(...reqOps)), observable.subscribe(observer, srcStream.dispatch, srcStream)); | ||
const srcStream = functions.pipe(src, stream(observer.scheduler)); | ||
disposable.addDisposableDisposeParentOnChildError(observer, srcStream); | ||
const requestSubscription = functions.pipe(requests, observable.map(functions.compose(...reqOps)), observable.subscribe(observer.scheduler, srcStream.dispatch, srcStream)); | ||
disposable.addDisposableDisposeParentOnChildError(observer, requestSubscription); | ||
disposable.bindDisposables(srcStream, requestSubscription); | ||
disposable.addDisposable(observer, srcStream); | ||
functions.pipe(srcStream, functions.compose(...obsOps), source.sinkInto(observer)); | ||
@@ -139,6 +140,6 @@ }); | ||
const toStateStore = () => streamable => createStreamable(updates => observable.createObservableUnsafe(observer => { | ||
const stream$1 = functions.pipe(streamable, stream(observer)); | ||
const updatesSubscription = functions.pipe(updates, observable.zipWithLatestFrom(stream$1, (updateState, prev) => updateState(prev)), observable.subscribe(observer, stream$1.dispatch, stream$1)); | ||
const stream$1 = functions.pipe(streamable, stream(observer.scheduler)); | ||
disposable.addDisposableDisposeParentOnChildError(observer, stream$1); | ||
const updatesSubscription = functions.pipe(updates, observable.zipWithLatestFrom(stream$1, (updateState, prev) => updateState(prev)), observable.subscribe(observer.scheduler, stream$1.dispatch, stream$1)); | ||
disposable.bindDisposables(updatesSubscription, stream$1); | ||
disposable.addDisposable(observer, stream$1); | ||
functions.pipe(stream$1, source.sinkInto(observer)); | ||
@@ -159,3 +160,4 @@ })); | ||
const op = (modeObs) => observable.createObservableUnsafe(observer => { | ||
const pausableScheduler = scheduler.toPausableScheduler(scheduler$1 !== null && scheduler$1 !== void 0 ? scheduler$1 : observer); | ||
const pausableScheduler = scheduler.toPausableScheduler(scheduler$1 !== null && scheduler$1 !== void 0 ? scheduler$1 : observer.scheduler); | ||
disposable.addDisposableDisposeParentOnChildError(observer, pausableScheduler); | ||
const onModeChange = (mode) => { | ||
@@ -171,5 +173,5 @@ switch (mode) { | ||
}; | ||
const modeSubscription = functions.pipe(modeObs, observable.subscribe(observer, onModeChange)); | ||
const modeSubscription = functions.pipe(modeObs, observable.subscribe(observer.scheduler, onModeChange)); | ||
disposable.addDisposableDisposeParentOnChildError(observer, modeSubscription); | ||
disposable.bindDisposables(modeSubscription, pausableScheduler); | ||
disposable.addDisposable(observer, pausableScheduler); | ||
functions.pipe(observable$1, observable.subscribeOn(pausableScheduler), functions.pipe(pausableScheduler, observable.fromDisposable, observable.takeUntil), source.sinkInto(observer)); | ||
@@ -176,0 +178,0 @@ }); |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is too big to display
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
497666
12552