@reactive-js/core
Advanced tools
Comparing version 0.0.54 to 0.1.0
@@ -7,3 +7,3 @@ /// <reference types="node" /> | ||
import { DisposableValueLike } from './disposable.js'; | ||
import { IOSourceLike, IOSinkLike, IOSourceOperator } from './io.js'; | ||
import { StreamableLike, FlowMode, NotifyEvent, DoneEvent, StreamableOperator } from './streamable.js'; | ||
import { BrotliOptions, ZlibOptions } from 'zlib'; | ||
@@ -24,3 +24,3 @@ | ||
declare const createReadableIOSource: (factory: Factory<DisposableValueLike<Readable>>) => IOSourceLike<Uint8Array>; | ||
declare const createReadableIOSource: (factory: Factory<DisposableValueLike<Readable>>) => StreamableLike<FlowMode, NotifyEvent<Uint8Array> | DoneEvent>; | ||
declare const readFileIOSource: (path: fs.PathLike, options?: { | ||
@@ -32,16 +32,16 @@ readonly flags?: string; | ||
readonly highWaterMark?: number; | ||
}) => IOSourceLike<Uint8Array>; | ||
}) => StreamableLike<FlowMode, DoneEvent | NotifyEvent<Uint8Array>>; | ||
declare const createWritableIOSink: (factory: Factory<DisposableValueLike<Writable>>) => IOSinkLike<Uint8Array>; | ||
declare const createWritableIOSink: (factory: Factory<DisposableValueLike<Writable>>) => StreamableLike<NotifyEvent<Uint8Array> | DoneEvent, FlowMode>; | ||
declare const createDisposableNodeStream: <T extends Readable | Writable | Transform>(stream: T) => DisposableValueLike<T>; | ||
declare const transform: (factory: Factory<DisposableValueLike<Transform>>) => IOSourceOperator<Uint8Array, Uint8Array>; | ||
declare const brotliDecompress: (options?: BrotliOptions) => IOSourceOperator<Uint8Array, Uint8Array>; | ||
declare const gunzip: (options?: ZlibOptions) => IOSourceOperator<Uint8Array, Uint8Array>; | ||
declare const inflate: (options?: ZlibOptions) => IOSourceOperator<Uint8Array, Uint8Array>; | ||
declare const brotliCompress: (options?: BrotliOptions) => IOSourceOperator<Uint8Array, Uint8Array>; | ||
declare const gzip: (options?: ZlibOptions) => IOSourceOperator<Uint8Array, Uint8Array>; | ||
declare const deflate: (options?: ZlibOptions) => IOSourceOperator<Uint8Array, Uint8Array>; | ||
declare const transform: (factory: Factory<DisposableValueLike<Transform>>) => StreamableOperator<FlowMode, NotifyEvent<Uint8Array> | DoneEvent, FlowMode, NotifyEvent<Uint8Array> | DoneEvent>; | ||
declare const brotliDecompress: (options?: BrotliOptions) => StreamableOperator<FlowMode, NotifyEvent<Uint8Array> | DoneEvent, FlowMode, NotifyEvent<Uint8Array> | DoneEvent>; | ||
declare const gunzip: (options?: ZlibOptions) => StreamableOperator<FlowMode, NotifyEvent<Uint8Array> | DoneEvent, FlowMode, NotifyEvent<Uint8Array> | DoneEvent>; | ||
declare const inflate: (options?: ZlibOptions) => StreamableOperator<FlowMode, NotifyEvent<Uint8Array> | DoneEvent, FlowMode, NotifyEvent<Uint8Array> | DoneEvent>; | ||
declare const brotliCompress: (options?: BrotliOptions) => StreamableOperator<FlowMode, NotifyEvent<Uint8Array> | DoneEvent, FlowMode, NotifyEvent<Uint8Array> | DoneEvent>; | ||
declare const gzip: (options?: ZlibOptions) => StreamableOperator<FlowMode, NotifyEvent<Uint8Array> | DoneEvent, FlowMode, NotifyEvent<Uint8Array> | DoneEvent>; | ||
declare const deflate: (options?: ZlibOptions) => StreamableOperator<FlowMode, NotifyEvent<Uint8Array> | DoneEvent, FlowMode, NotifyEvent<Uint8Array> | DoneEvent>; | ||
export { bindNodeCallback, brotliCompress, brotliDecompress, createDisposableNodeStream, createReadableIOSource, createWritableIOSink, deflate, gunzip, gzip, inflate, readFileIOSource, transform }; |
@@ -9,3 +9,2 @@ 'use strict'; | ||
var fs = require('fs'); | ||
var io = require('./io.js'); | ||
var streamable = require('./streamable.js'); | ||
@@ -54,6 +53,6 @@ var zlib = require('zlib'); | ||
const readableValue = readable.value; | ||
const onData = functions.compose(io.notify, observable.dispatchTo(dispatcher)); | ||
const onData = functions.compose(streamable.notifyEvent, observable.dispatchTo(dispatcher)); | ||
readableValue.on("data", onData); | ||
const onEnd = () => { | ||
dispatcher.dispatch(io.done()); | ||
dispatcher.dispatch(streamable.doneEvent); | ||
functions.pipe(dispatcher, disposable.dispose()); | ||
@@ -60,0 +59,0 @@ }; |
{ | ||
"name": "@reactive-js/core", | ||
"version": "0.0.54", | ||
"version": "0.1.0", | ||
"keywords": [ | ||
@@ -45,3 +45,3 @@ "asynchronous", | ||
}, | ||
"gitHead": "d91eeafb726d2e2e628fbdb574367614d81ed5b3" | ||
"gitHead": "348535c19ca68e980b2fbae5da0df69a678f4dc5" | ||
} |
@@ -1,4 +0,5 @@ | ||
import { Reducer, Factory, Equality, Updater, Function1, SideEffect1, Function2 } from './functions.js'; | ||
import { ObservableOperator, StreamLike, ObservableLike } from './observable.js'; | ||
import { Reducer, Factory, Equality, Updater, Function1, Function2 } from './functions.js'; | ||
import { ObservableOperator, StreamLike, ObservableLike, MulticastObservableLike } from './observable.js'; | ||
import { SchedulerLike } from './scheduler.js'; | ||
import { EnumerableLike } from './enumerable.js'; | ||
@@ -58,8 +59,2 @@ /** | ||
declare const map: <TReq, TA, TB>(mapper: Function1<TA, TB>) => StreamableOperator<TReq, TA, TReq, TB>; | ||
declare const mapTo: <TReq, TA, TB>(v: TB) => StreamableOperator<TReq, TA, TReq, TB>; | ||
declare const onNotify: <TReq, T>(onNotify: SideEffect1<T>) => StreamableOperator<TReq, T, TReq, T>; | ||
declare const scan: <TReq, T, TAcc>(scanner: Reducer<T, TAcc>, initalValue: Factory<TAcc>) => StreamableOperator<TReq, T, TReq, TAcc>; | ||
declare const withLatestFrom: <TReq, TA, TB, T>(other: ObservableLike<TB>, selector: Function2<TA, TB, T>) => StreamableOperator<TReq, TA, TReq, T>; | ||
declare const flow: <T>({ scheduler, }?: { | ||
@@ -71,2 +66,55 @@ scheduler?: SchedulerLike | undefined; | ||
declare type IOEvent<T> = NotifyEvent<T> | DoneEvent; | ||
declare const decodeWithCharset: (charset?: string, options?: TextDecoderOptions) => StreamableOperator<FlowMode, NotifyEvent<ArrayBuffer> | DoneEvent, FlowMode, NotifyEvent<string> | DoneEvent>; | ||
declare const encodeUtf8: StreamableOperator<FlowMode, IOEvent<string>, FlowMode, IOEvent<Uint8Array>>; | ||
declare const mapIOEventStream: <TA, TB>(mapper: Function1<TA, TB>) => Function1<StreamableLike<FlowMode, IOEvent<TA>>, StreamableLike<FlowMode, IOEvent<TB>>>; | ||
declare const flowIOEvents: <T>() => Function1<ObservableLike<T>, StreamableLike<FlowMode, IOEvent<T>>>; | ||
/** @experimental */ | ||
declare const createIOSinkAccumulator: <T, TAcc>(reducer: Reducer<T, TAcc>, initialValue: Factory<TAcc>, options?: { | ||
readonly replay?: number; | ||
}) => IOSinkAccumulatorLike<T, TAcc>; | ||
/** | ||
* Returns an `AsyncEnumerableLike` from the provided array. | ||
* | ||
* @param values The array. | ||
*/ | ||
declare const fromArray: <T>(options?: { | ||
readonly delay?: number; | ||
readonly startIndex?: number; | ||
readonly endIndex?: number; | ||
}) => Function1<readonly T[], StreamableLike<void, T>>; | ||
/** | ||
* Returns an `AsyncEnumerableLike` from the provided iterable. | ||
* | ||
* @param iterable | ||
*/ | ||
declare const fromEnumerable: <T>() => Function1<EnumerableLike<T>, StreamableLike<void, T>>; | ||
/** | ||
* Returns an `AsyncEnumerableLike` from the provided iterable. | ||
* | ||
* @param iterable | ||
*/ | ||
declare const fromIterable: <T>() => Function1<Iterable<T>, StreamableLike<void, T>>; | ||
/** | ||
* Generates an `AsyncEnumerableLike` sequence from a generator function | ||
* that is applied to an accumulator value. | ||
* | ||
* @param generator The generator function. | ||
* @param initialValue Factory function to generate the initial accumulator. | ||
*/ | ||
declare const generate: <T>(generator: Updater<T>, initialValue: Factory<T>, options?: { | ||
readonly delay?: number; | ||
}) => StreamableLike<void, T>; | ||
declare const consume: <T, TAcc>(consumer: Function2<TAcc, T, NotifyEvent<TAcc> | DoneEventWithData<TAcc>>, initial: Factory<TAcc>) => Function1<StreamableLike<void, T>, ObservableLike<TAcc>>; | ||
declare const consumeAsync: <T, TAcc>(consumer: Function2<TAcc, T, ObservableLike<NotifyEvent<TAcc> | DoneEventWithData<TAcc>>>, initial: Factory<TAcc>) => Function1<StreamableLike<void, T>, ObservableLike<TAcc>>; | ||
declare const notifyEvent: <T>(data: T) => NotifyEvent<T>; | ||
declare const doneEventWithData: <T>(data: T) => DoneEventWithData<T>; | ||
declare const doneEvent: DoneEvent; | ||
interface StreamableLike<TReq, T> { | ||
@@ -78,4 +126,21 @@ stream(this: StreamableLike<TReq, T>, scheduler: SchedulerLike, options?: { | ||
declare type StreamableOperator<TSrcReq, TSrc, TReq, T> = Function1<StreamableLike<TSrcReq, TSrc>, StreamableLike<TReq, T>>; | ||
/** | ||
* @experimental | ||
* @noInheritDoc | ||
* */ | ||
interface IOSinkAccumulatorLike<T, TAcc> extends StreamableLike<NotifyEvent<T> | DoneEvent, FlowMode>, MulticastObservableLike<TAcc> { | ||
} | ||
declare type FlowMode = "resume" | "pause"; | ||
declare type NotifyEvent<T> = { | ||
readonly type: "notify"; | ||
readonly data: T; | ||
}; | ||
declare type DoneEvent = { | ||
readonly type: "done"; | ||
}; | ||
declare type DoneEventWithData<T> = { | ||
readonly type: "done"; | ||
readonly data: T; | ||
}; | ||
export { FlowMode, StreamableLike, StreamableOperator, __stream, createActionReducer, createStateStore, createStreamable, empty, flow, identity, lift, map, mapReq, mapTo, onNotify, scan, sink, stream, toStateStore, withLatestFrom }; | ||
export { DoneEvent, DoneEventWithData, FlowMode, IOSinkAccumulatorLike, NotifyEvent, StreamableLike, StreamableOperator, __stream, consume, consumeAsync, createActionReducer, createIOSinkAccumulator, createStateStore, createStreamable, decodeWithCharset, doneEvent, doneEventWithData, empty, encodeUtf8, flow, flowIOEvents, fromArray, fromEnumerable, fromIterable, generate, identity, lift, mapIOEventStream, mapReq, notifyEvent, sink, stream, toStateStore }; |
@@ -11,2 +11,3 @@ 'use strict'; | ||
var scheduler = require('./scheduler.js'); | ||
var enumerable = require('./enumerable.js'); | ||
@@ -154,8 +155,2 @@ class StreamImpl extends disposable.AbstractDisposable { | ||
const map = (mapper) => lift(observable.map(mapper)); | ||
const mapTo = (v) => lift(container.mapTo(observable.mapT, v)); | ||
const onNotify = (onNotify) => lift(observable.onNotify(onNotify)); | ||
const scan = (scanner, initalValue) => lift(observable.scan(scanner, initalValue)); | ||
const withLatestFrom = (other, selector) => lift(observable.withLatestFrom(other, selector)); | ||
const flow = ({ scheduler: scheduler$1, } = {}) => observable$1 => { | ||
@@ -193,18 +188,187 @@ const createScheduler = (modeObs) => (modeScheduler) => { | ||
const notifyEvent = (data) => ({ | ||
type: "notify", | ||
data, | ||
}); | ||
const doneEventWithData = (data) => ({ | ||
type: "done", | ||
data, | ||
}); | ||
const doneEvent = { type: "done" }; | ||
const decodeWithCharset = (charset = "utf-8", options) => functions.pipe(observable.withLatestFrom(container.compute({ | ||
...observable.fromArrayT, | ||
...observable.mapT, | ||
})(() => new TextDecoder(charset, options)), function* (ev, decoder) { | ||
switch (ev.type) { | ||
case "notify": { | ||
const data = decoder.decode(ev.data, { stream: true }); | ||
if (data.length > 0) { | ||
yield notifyEvent(data); | ||
} | ||
break; | ||
} | ||
case "done": { | ||
const data = decoder.decode(); | ||
if (data.length > 0) { | ||
yield notifyEvent(data); | ||
} | ||
yield doneEvent; | ||
break; | ||
} | ||
} | ||
}), functions.composeWith(observable.map(functions.returns)), functions.composeWith(container.concatMap({ ...observable.concatAllT, ...observable.mapT }, observable.fromIterator())), lift); | ||
const _encodeUtf8 = lift(observable.withLatestFrom(container.compute({ | ||
...observable.fromArrayT, | ||
...observable.mapT, | ||
})(() => new TextEncoder()), (ev, textEncoder) => { | ||
switch (ev.type) { | ||
case "notify": { | ||
const data = textEncoder.encode(ev.data); | ||
return notifyEvent(data); | ||
} | ||
case "done": { | ||
return ev; | ||
} | ||
} | ||
})); | ||
const encodeUtf8 = _encodeUtf8; | ||
const mapIOEventStream = (mapper) => lift(observable.map((ev) => ev.type === "notify" ? functions.pipe(ev.data, mapper, notifyEvent) : ev)); | ||
const _flowIOEvents = functions.compose(observable.map(notifyEvent), container.endWith({ ...observable.fromArrayT, ...observable.concatT }, doneEvent), flow()); | ||
const flowIOEvents = () => _flowIOEvents; | ||
const isNotify = (ev) => ev.type === "notify"; | ||
class IOSinkAccumulatorImpl extends disposable.AbstractDisposable { | ||
constructor(reducer, initialValue, options) { | ||
super(); | ||
this.isSynchronous = false; | ||
const subject = observable.createSubject(options); | ||
disposable.addDisposableDisposeParentOnChildError(this, subject); | ||
const op = (events) => observable.using(scheduler => functions.pipe(events, observable.takeWhile(isNotify), container.keepType(observable.keepT, isNotify), observable.map(ev => ev.data), observable.reduce(reducer, initialValue), observable.subscribe(scheduler, subject.dispatch, subject)), eventsSubscription => observable.createObservable(dispatcher => { | ||
dispatcher.dispatch("pause"); | ||
dispatcher.dispatch("resume"); | ||
disposable.addDisposable(eventsSubscription, dispatcher); | ||
})); | ||
this.streamable = createStreamable(op); | ||
this.subject = subject; | ||
} | ||
get type() { | ||
return this; | ||
} | ||
get T() { | ||
return undefined; | ||
} | ||
get observerCount() { | ||
return this.subject.observerCount; | ||
} | ||
observe(observer) { | ||
this.subject.observe(observer); | ||
} | ||
stream(scheduler, options) { | ||
const result = functions.pipe(this.streamable, stream(scheduler, options)); | ||
disposable.addDisposableDisposeParentOnChildError(this, result); | ||
return result; | ||
} | ||
} | ||
/** @experimental */ | ||
const createIOSinkAccumulator = (reducer, initialValue, options) => new IOSinkAccumulatorImpl(reducer, initialValue, options); | ||
const fromArrayScanner = (acc, _) => acc + 1; | ||
/** | ||
* Returns an `AsyncEnumerableLike` from the provided array. | ||
* | ||
* @param values The array. | ||
*/ | ||
const fromArray = (options = {}) => values => { | ||
var _a, _b; | ||
const valuesLength = values.length; | ||
const startIndex = Math.min((_a = options.startIndex) !== null && _a !== void 0 ? _a : 0, valuesLength); | ||
const endIndex = Math.max(Math.min((_b = options.endIndex) !== null && _b !== void 0 ? _b : valuesLength, valuesLength), 0); | ||
const fromValueWithDelay = container.fromValue(observable.fromArrayT, options); | ||
return createStreamable(functions.compose(observable.scan(fromArrayScanner, functions.returns(startIndex - 1)), container.concatMap({ ...observable.mapT, ...observable.concatAllT }, (i) => fromValueWithDelay(values[i])), observable.takeFirst({ count: endIndex - startIndex }))); | ||
}; | ||
const _fromEnumerable = (enumerable$1) => createStreamable(functions.compose(observable.withLatestFrom(observable.using(_ => enumerable.enumerate(enumerable$1), functions.compose(container.fromValue(observable.fromArrayT), container.concatWith(observable.concatT, observable.never()))), (_, enumerator) => enumerator), observable.onNotify(enumerable.move), observable.takeWhile(enumerable.hasCurrent), observable.map(enumerable.current))); | ||
/** | ||
* Returns an `AsyncEnumerableLike` from the provided iterable. | ||
* | ||
* @param iterable | ||
*/ | ||
const fromEnumerable = () => _fromEnumerable; | ||
/** | ||
* Returns an `AsyncEnumerableLike` from the provided iterable. | ||
* | ||
* @param iterable | ||
*/ | ||
const _fromIterable = (iterable) => functions.pipe(iterable, enumerable.fromIterable(), fromEnumerable()); | ||
/** | ||
* Returns an `AsyncEnumerableLike` from the provided iterable. | ||
* | ||
* @param iterable | ||
*/ | ||
const fromIterable = () => _fromIterable; | ||
const generateScanner = (generator) => (acc, _) => generator(acc); | ||
const asyncGeneratorScanner = (generator, options) => { | ||
const fromValueWithDelay = container.fromValue(observable.fromArrayT, options); | ||
return (acc, _) => functions.pipe(acc, generator, fromValueWithDelay); | ||
}; | ||
/** | ||
* Generates an `AsyncEnumerableLike` sequence from a generator function | ||
* that is applied to an accumulator value. | ||
* | ||
* @param generator The generator function. | ||
* @param initialValue Factory function to generate the initial accumulator. | ||
*/ | ||
const generate = (generator, initialValue, options = {}) => { | ||
const { delay = 0 } = options; | ||
const op = delay > 0 | ||
? observable.scanAsync(asyncGeneratorScanner(generator, options), initialValue) | ||
: observable.scan(generateScanner(generator), initialValue); | ||
return createStreamable(op); | ||
}; | ||
const consumeImpl = (consumer, initial) => enumerable => observable.using(scheduler => { | ||
const enumerator = functions.pipe(enumerable, stream(scheduler)); | ||
const accFeedback = observable.createSubject(); | ||
return [accFeedback, enumerator]; | ||
}, (accFeedback, enumerator) => functions.pipe(enumerator, consumer(accFeedback), observable.onNotify(ev => { | ||
switch (ev.type) { | ||
case "notify": | ||
accFeedback.dispatch(ev.data); | ||
enumerator.dispatch(option.none); | ||
break; | ||
} | ||
}), observable.map(ev => ev.data), observable.onSubscribe(() => { | ||
accFeedback.dispatch(initial()); | ||
enumerator.dispatch(option.none); | ||
}))); | ||
const consume = (consumer, initial) => consumeImpl(accObs => observable.zipWithLatestFrom(accObs, functions.flip(consumer)), initial); | ||
const consumeAsync = (consumer, initial) => consumeImpl(accObs => functions.compose(observable.zipWithLatestFrom(accObs, (next, acc) => functions.pipe(consumer(acc, next), observable.takeFirst())), observable.switchAll()), initial); | ||
exports.__stream = __stream; | ||
exports.consume = consume; | ||
exports.consumeAsync = consumeAsync; | ||
exports.createActionReducer = createActionReducer; | ||
exports.createIOSinkAccumulator = createIOSinkAccumulator; | ||
exports.createStateStore = createStateStore; | ||
exports.createStreamable = createStreamable; | ||
exports.decodeWithCharset = decodeWithCharset; | ||
exports.doneEvent = doneEvent; | ||
exports.doneEventWithData = doneEventWithData; | ||
exports.empty = empty; | ||
exports.encodeUtf8 = encodeUtf8; | ||
exports.flow = flow; | ||
exports.flowIOEvents = flowIOEvents; | ||
exports.fromArray = fromArray; | ||
exports.fromEnumerable = fromEnumerable; | ||
exports.fromIterable = fromIterable; | ||
exports.generate = generate; | ||
exports.identity = identity; | ||
exports.lift = lift; | ||
exports.map = map; | ||
exports.mapIOEventStream = mapIOEventStream; | ||
exports.mapReq = mapReq; | ||
exports.mapTo = mapTo; | ||
exports.onNotify = onNotify; | ||
exports.scan = scan; | ||
exports.notifyEvent = notifyEvent; | ||
exports.sink = sink; | ||
exports.stream = stream; | ||
exports.toStateStore = toStateStore; | ||
exports.withLatestFrom = withLatestFrom; |
@@ -89,3 +89,3 @@ 'use strict'; | ||
uri: getCurrentWindowLocationURI(), | ||
}), streamable.createStateStore, streamable.onNotify(({ uri }) => { | ||
}), streamable.createStateStore, streamable.lift(observable.onNotify(({ uri }) => { | ||
// Initialize the history state on page load | ||
@@ -97,3 +97,3 @@ const isInitialPageLoad = this.historyCounter === -1; | ||
} | ||
}), streamable.lift(observable.keep(({ uri }) => { | ||
})), streamable.lift(observable.keep(({ uri }) => { | ||
const { title } = uri; | ||
@@ -104,3 +104,3 @@ const uriString = windowLocationURIToString(uri); | ||
return titleChanged || uriChanged; | ||
})), streamable.lift(observable.throttle(300)), streamable.onNotify(({ replace, uri }) => { | ||
})), streamable.lift(observable.throttle(300)), streamable.lift(observable.onNotify(({ replace, uri }) => { | ||
const { title } = uri; | ||
@@ -116,3 +116,3 @@ const uriString = windowLocationURIToString(uri); | ||
updateHistoryState(this, uri); | ||
}), streamable.map(({ uri }) => uri), streamable.stream(scheduler, options)); | ||
})), streamable.lift(observable.map(({ uri }) => uri)), streamable.stream(scheduler, options)); | ||
const historySubscription = functions.pipe(fromEvent(window, "popstate", (e) => { | ||
@@ -119,0 +119,0 @@ const { counter, title } = e.state; |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is too big to display
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
465731
54
11995