Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@reactive-js/core

Package Overview
Dependencies
Maintainers
1
Versions
146
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@reactive-js/core - npm Package Compare versions

Comparing version 0.4.0 to 0.5.0

35

node.js

@@ -56,3 +56,5 @@ 'use strict';

readableValue.pause();
const modeSubscription = functions.pipe(mode, observable.subscribe(observer, ev => {
disposable.addDisposableDisposeParentOnChildError(observer, readable);
disposable.addDisposable(readable, dispatcher);
const modeSubscription = functions.pipe(mode, observable.subscribe(observer.scheduler, ev => {
switch (ev) {

@@ -67,4 +69,3 @@ case "pause":

}));
disposable.addDisposable(observer, readable);
disposable.addDisposableDisposeParentOnChildError(readable, modeSubscription);
disposable.addDisposableDisposeParentOnChildError(observer, modeSubscription);
const onData = observable.dispatchTo(dispatcher);

@@ -76,7 +77,2 @@ const onEnd = () => {

readableValue.on("end", onEnd);
disposable.addDisposable(readable, dispatcher);
disposable.addTeardown(dispatcher, _ => {
readableValue.removeListener("data", onData);
readableValue.removeListener("end", onEnd);
});
}));

@@ -90,3 +86,5 @@ const readFileIOSource = (path, options) => createReadableIOSource(() => functions.pipe(fs__default["default"].createReadStream(path, options), createDisposableNodeStream));

const writableValue = writable.value;
const streamEventsSubscription = functions.pipe(events, observable.subscribe(observer, ev => {
disposable.addDisposableDisposeParentOnChildError(observer, writable);
disposable.addDisposable(writable, dispatcher);
const streamEventsSubscription = functions.pipe(events, observable.subscribe(observer.scheduler, ev => {
// FIXME: when writing to an outgoing node ServerResponse with a UInt8Array

@@ -100,7 +98,6 @@ // node throws a type Error regarding expecting a Buffer, though the docs

}));
disposable.addDisposableDisposeParentOnChildError(observer, streamEventsSubscription);
disposable.addOnDisposedWithoutErrorTeardown(streamEventsSubscription, () => {
writableValue.end();
});
disposable.addDisposableDisposeParentOnChildError(writable, streamEventsSubscription);
disposable.addDisposable(observer, writable);
const onDrain = functions.defer("resume", observable.dispatchTo(dispatcher));

@@ -112,8 +109,2 @@ const onFinish = functions.defer(dispatcher, disposable.dispose());

writableValue.on(NODE_JS_PAUSE_EVENT, onPause);
disposable.addDisposable(writable, dispatcher);
disposable.addTeardown(dispatcher, _ => {
writableValue.removeListener("drain", onDrain);
writableValue.removeListener("finish", onFinish);
writableValue.removeListener(NODE_JS_PAUSE_EVENT, onPause);
});
dispatcher.dispatch("resume");

@@ -131,8 +122,10 @@ }));

});
const transformReadableStream = functions.pipe(createReadableIOSource(functions.returns(transform)), streamable.stream(observer));
const sinkSubscription = functions.pipe(streamable.sink(src, transformSink), observable.subscribe(observer));
const modeSubscription = functions.pipe(modeObs, observable.subscribe(observer, transformReadableStream.dispatch, transformReadableStream));
const transformReadableStream = functions.pipe(createReadableIOSource(functions.returns(transform)), streamable.stream(observer.scheduler));
disposable.addDisposableDisposeParentOnChildError(observer, transformReadableStream);
const sinkSubscription = functions.pipe(streamable.sink(src, transformSink), observable.subscribe(observer.scheduler));
disposable.addDisposableDisposeParentOnChildError(observer, sinkSubscription);
const modeSubscription = functions.pipe(modeObs, observable.subscribe(observer.scheduler, transformReadableStream.dispatch, transformReadableStream));
disposable.addDisposableDisposeParentOnChildError(observer, modeSubscription);
disposable.addDisposableDisposeParentOnChildError(transformReadableStream, sinkSubscription);
disposable.addDisposableDisposeParentOnChildError(transformReadableStream, modeSubscription);
disposable.addDisposable(observer, transformReadableStream);
functions.pipe(transformReadableStream, source.sinkInto(observer));

@@ -139,0 +132,0 @@ }));

18

observable.d.ts
import { AbstractDisposableContainer, Concat, FromArray, FromIterator, FromIterable, Using, Map, ConcatAll, Repeat, TakeFirst, Zip, DecodeWithCharset, DistinctUntilChanged, EverySatisfy, Keep, Pairwise, Reduce, Scan, SkipFirst, SomeSatisfy, TakeLast, TakeWhile, ThrowIfEmpty } from './container.js';
import { DisposableLike, DisposableOrTeardown } from './disposable.js';
import { SideEffect1, Factory, Function1, Function2, Function3, Function4, Function5, Function6, SideEffect, SideEffect2, SideEffect3, SideEffect4, SideEffect5, SideEffect6, Updater, Predicate, Equality, Reducer } from './functions.js';
import { SchedulerLike, SchedulerContinuationLike, VirtualTimeSchedulerLike } from './scheduler.js';
import { SchedulerLike, VirtualTimeSchedulerLike } from './scheduler.js';
import { SinkLike, AbstractSource, AbstractDisposableSource, SourceLike } from './source.js';

@@ -13,21 +13,7 @@ import { Option } from './option.js';

*/
declare class Observer<T> extends AbstractDisposableContainer implements SinkLike<T>, SchedulerLike {
declare class Observer<T> extends AbstractDisposableContainer implements SinkLike<T> {
readonly scheduler: SchedulerLike;
inContinuation: boolean;
private readonly _scheduler;
constructor(scheduler: SchedulerLike);
/** @ignore */
get now(): number;
/** @ignore */
get shouldYield(): boolean;
assertState(this: Observer<T>): void;
notify(_: T): void;
/** @ignore */
onRunStatusChanged(status: boolean): void;
/** @ignore */
requestYield(): void;
/** @ignore */
schedule(continuation: SchedulerContinuationLike, options?: {
readonly delay?: number;
}): void;
}

@@ -34,0 +20,0 @@

{
"name": "@reactive-js/core",
"version": "0.4.0",
"version": "0.5.0",
"keywords": [

@@ -45,3 +45,3 @@ "asynchronous",

},
"gitHead": "4c9c6283eaf89d607379894927792a42d7bb3df3"
"gitHead": "9d88535ad822a660cf0bdec11be2b0fb36a11d77"
}

@@ -70,2 +70,6 @@ 'use strict';

schedule(continuation, { priority, delay = 0, }) {
disposable.addDisposable(this, continuation);
if (continuation.isDisposed) {
return;
}
const callback = () => {

@@ -72,0 +76,0 @@ functions.pipe(callbackNodeDisposable, disposable.dispose());

@@ -47,5 +47,2 @@ import { DisposableLike } from './disposable.js';

interface SchedulerContinuationRunStatusChangedListenerLike {
onRunStatusChanged(this: SchedulerContinuationRunStatusChangedListenerLike, state: boolean): void;
}
/**

@@ -57,4 +54,2 @@ * A unit of work to be executed by a scheduler.

interface SchedulerContinuationLike extends DisposableLike {
addListener(this: SchedulerContinuationLike, ev: "onRunStatusChanged", listener: SchedulerContinuationRunStatusChangedListenerLike): void;
removeListener(this: SchedulerContinuationLike, ev: "onRunStatusChanged", listener: SchedulerContinuationRunStatusChangedListenerLike): void;
/**

@@ -124,2 +119,2 @@ * Work function to be invoked by the scheduler after the specified delay.

export { PausableSchedulerLike, PrioritySchedulerLike, SchedulerContinuationLike, SchedulerContinuationRunStatusChangedListenerLike, SchedulerLike, VirtualTimeSchedulerLike, YieldError, __yield, createHostScheduler, createVirtualTimeScheduler, run, schedule, toPausableScheduler, toPriorityScheduler, toSchedulerWithPriority };
export { PausableSchedulerLike, PrioritySchedulerLike, SchedulerContinuationLike, SchedulerLike, VirtualTimeSchedulerLike, YieldError, __yield, createHostScheduler, createVirtualTimeScheduler, run, schedule, toPausableScheduler, toPriorityScheduler, toSchedulerWithPriority };

@@ -88,7 +88,2 @@ 'use strict';

const notifyListeners = (listeners, state) => {
for (const listener of listeners) {
listener.onRunStatusChanged(state);
}
};
const isYieldError = (e) => e instanceof YieldError;

@@ -101,5 +96,2 @@ class YieldError {

let currentScheduler = option.none;
function clearListeners() {
this.listeners = option.none;
}
class SchedulerContinuationImpl extends disposable.AbstractDisposable {

@@ -110,27 +102,7 @@ constructor(scheduler, f) {

this.f = f;
this.listeners = option.none;
}
addListener(_ev, listener) {
if (!this.isDisposed) {
let { listeners } = this;
if (option.isNone(listeners)) {
this.listeners = new Set();
}
this.listeners.add(listener);
}
}
removeListener(_ev, listener) {
let { listeners } = this;
if (option.isSome(listeners)) {
listeners.delete(listener);
}
}
continue() {
if (!this.isDisposed) {
const { listeners } = this;
let error = option.none;
let yieldError = option.none;
if (option.isSome(listeners)) {
notifyListeners(listeners, true);
}
const oldCurrentScheduler = currentScheduler;

@@ -150,5 +122,2 @@ currentScheduler = this.scheduler;

currentScheduler = oldCurrentScheduler;
if (option.isSome(listeners)) {
notifyListeners(listeners, false);
}
if (option.isSome(yieldError)) {

@@ -176,3 +145,2 @@ this.scheduler.schedule(this, yieldError);

const continuation = new SchedulerContinuationImpl(scheduler, f);
disposable.addTeardown(continuation, clearListeners);
scheduler.schedule(continuation, options);

@@ -179,0 +147,0 @@ return continuation;

@@ -35,4 +35,4 @@ 'use strict';

const stream = new StreamImpl(subject, observable$1);
disposable.addDisposable(observable$1, stream);
disposable.addDisposable(stream, subject);
disposable.addDisposableDisposeParentOnChildError(observable$1, stream);
disposable.addDisposableDisposeParentOnChildError(stream, subject);
return stream;

@@ -61,6 +61,7 @@ };

const op = requests => observable.createObservableUnsafe(observer => {
const srcStream = functions.pipe(src, stream(observer));
const requestSubscription = functions.pipe(requests, observable.map(functions.compose(...reqOps)), observable.subscribe(observer, srcStream.dispatch, srcStream));
const srcStream = functions.pipe(src, stream(observer.scheduler));
disposable.addDisposableDisposeParentOnChildError(observer, srcStream);
const requestSubscription = functions.pipe(requests, observable.map(functions.compose(...reqOps)), observable.subscribe(observer.scheduler, srcStream.dispatch, srcStream));
disposable.addDisposableDisposeParentOnChildError(observer, requestSubscription);
disposable.bindDisposables(srcStream, requestSubscription);
disposable.addDisposable(observer, srcStream);
functions.pipe(srcStream, functions.compose(...obsOps), source.sinkInto(observer));

@@ -139,6 +140,6 @@ });

const toStateStore = () => streamable => createStreamable(updates => observable.createObservableUnsafe(observer => {
const stream$1 = functions.pipe(streamable, stream(observer));
const updatesSubscription = functions.pipe(updates, observable.zipWithLatestFrom(stream$1, (updateState, prev) => updateState(prev)), observable.subscribe(observer, stream$1.dispatch, stream$1));
const stream$1 = functions.pipe(streamable, stream(observer.scheduler));
disposable.addDisposableDisposeParentOnChildError(observer, stream$1);
const updatesSubscription = functions.pipe(updates, observable.zipWithLatestFrom(stream$1, (updateState, prev) => updateState(prev)), observable.subscribe(observer.scheduler, stream$1.dispatch, stream$1));
disposable.bindDisposables(updatesSubscription, stream$1);
disposable.addDisposable(observer, stream$1);
functions.pipe(stream$1, source.sinkInto(observer));

@@ -159,3 +160,4 @@ }));

const op = (modeObs) => observable.createObservableUnsafe(observer => {
const pausableScheduler = scheduler.toPausableScheduler(scheduler$1 !== null && scheduler$1 !== void 0 ? scheduler$1 : observer);
const pausableScheduler = scheduler.toPausableScheduler(scheduler$1 !== null && scheduler$1 !== void 0 ? scheduler$1 : observer.scheduler);
disposable.addDisposableDisposeParentOnChildError(observer, pausableScheduler);
const onModeChange = (mode) => {

@@ -171,5 +173,5 @@ switch (mode) {

};
const modeSubscription = functions.pipe(modeObs, observable.subscribe(observer, onModeChange));
const modeSubscription = functions.pipe(modeObs, observable.subscribe(observer.scheduler, onModeChange));
disposable.addDisposableDisposeParentOnChildError(observer, modeSubscription);
disposable.bindDisposables(modeSubscription, pausableScheduler);
disposable.addDisposable(observer, pausableScheduler);
functions.pipe(observable$1, observable.subscribeOn(pausableScheduler), functions.pipe(pausableScheduler, observable.fromDisposable, observable.takeUntil), source.sinkInto(observer));

@@ -176,0 +178,0 @@ });

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc