Comparing version 0.6.1 to 0.6.3
{ | ||
"name": "aurumjs", | ||
"version": "0.6.1", | ||
"version": "0.6.3", | ||
"description": "Stream based declarative DOM rendering library for javascript", | ||
@@ -39,5 +39,4 @@ "main": "prebuilt/cjs/aurumjs.js", | ||
"sinon": "^9.0.3", | ||
"typedoc": "^0.15.4", | ||
"typescript": "^4.1.5" | ||
"typescript": "^4.2.2" | ||
} | ||
} |
@@ -13,2 +13,3 @@ export * from './rendering/webcomponent'; | ||
export * from './stream/data_source_operators'; | ||
export * from './stream/duplex_data_source_operators'; | ||
export * from './stream/operator_model'; | ||
@@ -19,2 +20,3 @@ export * from './stream/stream'; | ||
export * from './utilities/event_emitter'; | ||
export * from './rendering/classname'; | ||
export * from './stream/emitters'; | ||
@@ -21,0 +23,0 @@ export * from './nodes/string_adapter'; |
@@ -26,2 +26,3 @@ "use strict"; | ||
__exportStar(require("./stream/data_source_operators"), exports); | ||
__exportStar(require("./stream/duplex_data_source_operators"), exports); | ||
__exportStar(require("./stream/operator_model"), exports); | ||
@@ -32,2 +33,3 @@ __exportStar(require("./stream/stream"), exports); | ||
__exportStar(require("./utilities/event_emitter"), exports); | ||
__exportStar(require("./rendering/classname"), exports); | ||
__exportStar(require("./stream/emitters"), exports); | ||
@@ -34,0 +36,0 @@ __exportStar(require("./nodes/string_adapter"), exports); |
@@ -192,2 +192,5 @@ "use strict"; | ||
const value = data.reduce((p, c) => { | ||
if (!c) { | ||
return p; | ||
} | ||
if (typeof c === 'string') { | ||
@@ -194,0 +197,0 @@ return `${p} ${c}`; |
@@ -6,3 +6,3 @@ import { AurumComponentAPI, Renderable } from '../rendering/aurum_element'; | ||
} | ||
export declare function Suspense(props: SuspenseProps, children: Renderable[], api: AurumComponentAPI): DataSource<string | number | HTMLElement | Text | import("../rendering/aurum_element").AurumElement | import("../rendering/aurum_element").AurumElementModel<any> | Promise<Renderable> | DataSource<Renderable> | import("../stream/data_source").ArrayDataSource<Renderable> | import("../aurumjs").DuplexDataSource<Renderable> | Renderable[]>; | ||
export declare function Suspense(props: SuspenseProps, children: Renderable[], api: AurumComponentAPI): DataSource<Renderable | Renderable[]>; | ||
//# sourceMappingURL=suspense.d.ts.map |
@@ -10,3 +10,4 @@ import { DataSource, ArrayDataSource, ReadOnlyDataSource } from '../stream/data_source'; | ||
} | ||
export declare type Renderable = AurumElement | HTMLElement | Text | string | number | AurumElementModel<any> | Promise<Renderable> | DataSource<Renderable> | ArrayDataSource<Renderable> | DuplexDataSource<Renderable>; | ||
declare type ResolvedRenderable = AurumElement | HTMLElement | Text | string | number | AurumElementModel<any> | DataSource<Renderable> | ArrayDataSource<Renderable> | DuplexDataSource<Renderable>; | ||
export declare type Renderable = ResolvedRenderable | Promise<ResolvedRenderable>; | ||
export declare type Rendered = AurumElement | HTMLElement | Text; | ||
@@ -101,2 +102,3 @@ export interface ComponentLifeCycle { | ||
} | ||
export {}; | ||
//# sourceMappingURL=aurum_element.d.ts.map |
@@ -121,3 +121,5 @@ "use strict"; | ||
listenAndRepeat(callback, cancellationToken) { | ||
callback(this.value); | ||
if (this.primed) { | ||
callback(this.value); | ||
} | ||
return this.listen(callback, cancellationToken); | ||
@@ -659,3 +661,2 @@ } | ||
break; | ||
break; | ||
} | ||
@@ -662,0 +663,0 @@ }, cancellationToken); |
import { AurumServerInfo } from '../aurum_server/aurum_server_client'; | ||
import { CancellationToken } from '../utilities/cancellation_token'; | ||
import { Callback } from '../utilities/common'; | ||
import { EventEmitter } from '../utilities/event_emitter'; | ||
import { DataSource, GenericDataSource, ReadOnlyDataSource } from './data_source'; | ||
import { DataSourceOperator } from './operator_model'; | ||
export declare enum DataFlow { | ||
UPSTREAM = 0, | ||
DOWNSTREAM = 1 | ||
} | ||
import { DataFlow } from './duplex_data_source_operators'; | ||
import { DataSourceOperator, DuplexDataSourceOperator } from './operator_model'; | ||
/** | ||
@@ -19,2 +17,4 @@ * Same as DataSource except data can flow in both directions | ||
private primed; | ||
protected errorHandler: (error: any) => T; | ||
protected errorEvent: EventEmitter<Error>; | ||
private updatingUpstream; | ||
@@ -29,5 +29,5 @@ private updatingDownstream; | ||
* @param initialValue | ||
* @param propagateWritesToReadStream If a write is done propagate this update back down to all the consumers. Useful at the root node | ||
* @param rootNode If a write is done propagate this update back down to all the consumers. Useful at the root node | ||
*/ | ||
constructor(initialValue?: T, propagateWritesToReadStream?: boolean, name?: string); | ||
constructor(initialValue?: T, rootNode?: boolean, name?: string); | ||
/** | ||
@@ -88,2 +88,9 @@ * Connects to an aurum-server exposed datasource view https://github.com/CyberPhoenix90/aurum-server for more information | ||
/** | ||
* Subscribes exclusively to updates of the data stream that occur due to an update flowing upstream | ||
* @param callback Callback to call when value is updated | ||
* @param cancellationToken Optional token to control the cancellation of the subscription | ||
* @returns Cancellation callback, can be used to cancel subscription without a cancellation token | ||
*/ | ||
listenUpstreamAndRepeat(callback: Callback<T>, cancellationToken?: CancellationToken): Callback<void>; | ||
/** | ||
* Subscribes exclusively to one update of the data stream that occur due to an update flowing upstream | ||
@@ -162,9 +169,3 @@ * @param callback Callback to call when value is updated | ||
], combinator?: (self: T, second: A, third: B, fourth: C, fifth: D, sixth: E, seventh: F, eigth: G, ninth: H, tenth: I) => R, cancellationToken?: CancellationToken): DataSource<R>; | ||
/** | ||
* Creates a new datasource that listenes to updates of this datasource but only propagates the updates from this source if they pass a predicate check | ||
* @param callback predicate check to decide if the update from the parent data source is passed down or not | ||
* @param cancellationToken Cancellation token to cancel the subscriptions added to the datasources by this operation | ||
*/ | ||
filter(downStreamFilter: (value: T, oldValue: T) => boolean, cancellationToken?: CancellationToken): DataSource<T>; | ||
filter(downStreamFilter: (value: T, oldValue: T) => boolean, upstreamFilter?: (value: T) => boolean, cancellationToken?: CancellationToken): DuplexDataSource<T>; | ||
transformDuplex<A, B = A, C = B, D = C, E = D, F = E, G = F, H = G, I = H, J = I, K = J>(operationA: DuplexDataSourceOperator<T, A>, operationB?: DuplexDataSourceOperator<A, B> | CancellationToken, operationC?: DuplexDataSourceOperator<B, C> | CancellationToken, operationD?: DuplexDataSourceOperator<C, D> | CancellationToken, operationE?: DuplexDataSourceOperator<D, E> | CancellationToken, operationF?: DuplexDataSourceOperator<E, F> | CancellationToken, operationG?: DuplexDataSourceOperator<F, G> | CancellationToken, operationH?: DuplexDataSourceOperator<G, H> | CancellationToken, operationI?: DuplexDataSourceOperator<H, I> | CancellationToken, operationJ?: DuplexDataSourceOperator<I, J> | CancellationToken, operationK?: DuplexDataSourceOperator<J, K> | CancellationToken, cancellationToken?: CancellationToken): DuplexDataSource<K>; | ||
transform<A, B = A, C = B, D = C, E = D, F = E, G = F, H = G, I = H, J = I, K = J>(operationA: DataSourceOperator<T, A>, operationB?: DataSourceOperator<A, B> | CancellationToken, operationC?: DataSourceOperator<B, C> | CancellationToken, operationD?: DataSourceOperator<C, D> | CancellationToken, operationE?: DataSourceOperator<D, E> | CancellationToken, operationF?: DataSourceOperator<E, F> | CancellationToken, operationG?: DataSourceOperator<F, G> | CancellationToken, operationH?: DataSourceOperator<G, H> | CancellationToken, operationI?: DataSourceOperator<H, I> | CancellationToken, operationJ?: DataSourceOperator<I, J> | CancellationToken, operationK?: DataSourceOperator<J, K> | CancellationToken, cancellationToken?: CancellationToken): DataSource<K>; | ||
@@ -177,10 +178,2 @@ /** | ||
pipe(targetDataSource: DuplexDataSource<T>, cancellationToken?: CancellationToken): this; | ||
/** | ||
* Creates a new datasource that is listening to updates from this datasource and transforms them with a mapper function before fowarding them to itself | ||
* @param mapper mapper function that transforms the data when it flows downwards | ||
* @param reverseMapper mapper function that transforms the data when it flows upwards | ||
* @param cancellationToken Cancellation token to cancel the subscriptions added to the datasources by this operation | ||
*/ | ||
map<D>(mapper: (value: T) => D, cancellationToken?: CancellationToken): DataSource<D>; | ||
map<D>(mapper: (value: T) => D, reverseMapper: (value: D) => T, cancellationToken?: CancellationToken): DuplexDataSource<D>; | ||
listenOnce(callback: Callback<T>, cancellationToken?: CancellationToken): Callback<void>; | ||
@@ -192,23 +185,3 @@ /** | ||
awaitNextUpdate(cancellationToken?: CancellationToken): Promise<T>; | ||
debounceUpstream(time: number, cancellationToken?: CancellationToken): DuplexDataSource<T>; | ||
debounceDownstream(time: number, cancellationToken?: CancellationToken): DuplexDataSource<T>; | ||
/** | ||
* Creates a new datasource that listens to this one and forwards updates if they are not the same as the last update | ||
* @param cancellationToken Cancellation token to cancel the subscription the new datasource has to this datasource | ||
*/ | ||
unique(cancellationToken?: CancellationToken): DuplexDataSource<T>; | ||
/** | ||
* Allows flow of data only in one direction | ||
* @param direction direction of the dataflow that is allowed | ||
* @param cancellationToken Cancellation token to cancel the subscriptions the new datasource has to the two parent datasources | ||
*/ | ||
oneWayFlow(direction?: DataFlow, cancellationToken?: CancellationToken): DuplexDataSource<T>; | ||
/** | ||
* Creates a new datasource that listens to this source and combines all updates into a single value | ||
* @param reducer function that aggregates an update with the previous result of aggregation | ||
* @param initialValue initial value given to the new source | ||
* @param cancellationToken Cancellation token to cancel the subscription the new datasource has to this datasource | ||
*/ | ||
reduce(reducer: (p: T, c: T) => T, initialValue: T, cancellationToken?: CancellationToken): DataSource<T>; | ||
/** | ||
* Remove all listeners | ||
@@ -219,3 +192,10 @@ */ | ||
cancelAllUpstream(): void; | ||
/** | ||
* Assign a function to handle errors and map them back to regular values. Rethrow the error in case you want to fallback to emitting error | ||
*/ | ||
handleErrors(callback: (error: any) => T): this; | ||
onError(callback: (error: any) => void, cancellationToken?: CancellationToken): this; | ||
emitError(e: Error, direction: DataFlow): void; | ||
} | ||
export declare function processTransformDuplex<I, O>(operations: DuplexDataSourceOperator<any, any>[], result: DuplexDataSource<O>, direction: DataFlow): Callback<I>; | ||
//# sourceMappingURL=duplex_data_source.d.ts.map |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.DuplexDataSource = exports.DataFlow = void 0; | ||
exports.processTransformDuplex = exports.DuplexDataSource = void 0; | ||
const aurum_server_client_1 = require("../aurum_server/aurum_server_client"); | ||
@@ -8,7 +8,4 @@ const cancellation_token_1 = require("../utilities/cancellation_token"); | ||
const data_source_1 = require("./data_source"); | ||
var DataFlow; | ||
(function (DataFlow) { | ||
DataFlow[DataFlow["UPSTREAM"] = 0] = "UPSTREAM"; | ||
DataFlow[DataFlow["DOWNSTREAM"] = 1] = "DOWNSTREAM"; | ||
})(DataFlow = exports.DataFlow || (exports.DataFlow = {})); | ||
const duplex_data_source_operators_1 = require("./duplex_data_source_operators"); | ||
const operator_model_1 = require("./operator_model"); | ||
/** | ||
@@ -21,5 +18,5 @@ * Same as DataSource except data can flow in both directions | ||
* @param initialValue | ||
* @param propagateWritesToReadStream If a write is done propagate this update back down to all the consumers. Useful at the root node | ||
* @param rootNode If a write is done propagate this update back down to all the consumers. Useful at the root node | ||
*/ | ||
constructor(initialValue, propagateWritesToReadStream = true, name = 'RootDuplexDataSource') { | ||
constructor(initialValue, rootNode = true, name = 'RootDuplexDataSource') { | ||
this.name = name; | ||
@@ -30,3 +27,3 @@ this.value = initialValue; | ||
this.updateUpstreamEvent = new event_emitter_1.EventEmitter(); | ||
this.propagateWritesToReadStream = propagateWritesToReadStream; | ||
this.propagateWritesToReadStream = rootNode; | ||
} | ||
@@ -70,4 +67,4 @@ /** | ||
*/ | ||
static createOneWay(direction = DataFlow.DOWNSTREAM, initialValue) { | ||
return new DuplexDataSource(initialValue, false).oneWayFlow(direction); | ||
static createOneWay(direction = duplex_data_source_operators_1.DataFlow.DOWNSTREAM, initialValue) { | ||
return new DuplexDataSource(initialValue, false).transformDuplex(duplex_data_source_operators_1.ddsOneWayFlow(direction)); | ||
} | ||
@@ -112,3 +109,5 @@ /** | ||
listenAndRepeat(callback, cancellationToken) { | ||
callback(this.value); | ||
if (this.primed) { | ||
callback(this.value); | ||
} | ||
return this.listen(callback, cancellationToken); | ||
@@ -138,2 +137,14 @@ } | ||
/** | ||
* Subscribes exclusively to updates of the data stream that occur due to an update flowing upstream | ||
* @param callback Callback to call when value is updated | ||
* @param cancellationToken Optional token to control the cancellation of the subscription | ||
* @returns Cancellation callback, can be used to cancel subscription without a cancellation token | ||
*/ | ||
listenUpstreamAndRepeat(callback, cancellationToken) { | ||
if (this.primed) { | ||
callback(this.value); | ||
} | ||
return this.updateUpstreamEvent.subscribe(callback, cancellationToken).cancel; | ||
} | ||
/** | ||
* Subscribes exclusively to one update of the data stream that occur due to an update flowing upstream | ||
@@ -174,26 +185,24 @@ * @param callback Callback to call when value is updated | ||
} | ||
filter(downStreamFilter, upstreamFilter, cancellationToken) { | ||
if (typeof upstreamFilter === 'function') { | ||
const filteredSource = new DuplexDataSource(undefined, false); | ||
this.listenDownstream((newVal) => { | ||
if (downStreamFilter(newVal, filteredSource.value)) { | ||
filteredSource.updateDownstream(newVal); | ||
} | ||
}, cancellationToken); | ||
filteredSource.listenUpstream((newVal) => { | ||
if (upstreamFilter(newVal, this.value)) { | ||
this.updateUpstream(newVal); | ||
} | ||
}, cancellationToken); | ||
return filteredSource; | ||
transformDuplex(operationA, operationB, operationC, operationD, operationE, operationF, operationG, operationH, operationI, operationJ, operationK, cancellationToken) { | ||
let token; | ||
const operations = [ | ||
operationA, | ||
operationB, | ||
operationC, | ||
operationD, | ||
operationE, | ||
operationF, | ||
operationG, | ||
operationH, | ||
operationI, | ||
operationJ, | ||
operationK | ||
].filter((e) => e && (e instanceof cancellation_token_1.CancellationToken ? ((token = e), false) : true)); | ||
if (cancellationToken) { | ||
token = cancellationToken; | ||
} | ||
else { | ||
const filteredSource = new data_source_1.DataSource(); | ||
this.listenDownstream((newVal) => { | ||
if (downStreamFilter(newVal, filteredSource.value)) { | ||
filteredSource.update(newVal); | ||
} | ||
}, upstreamFilter); | ||
return filteredSource; | ||
} | ||
const result = new DuplexDataSource(undefined, false, this.name + ' ' + operations.map((v) => v.name).join(' ')); | ||
(this.primed ? this.listenAndRepeat : this.listen).call(this, processTransformDuplex(operations, result, duplex_data_source_operators_1.DataFlow.DOWNSTREAM), token); | ||
result.listenUpstream.call(result, processTransformDuplex(operations, this, duplex_data_source_operators_1.DataFlow.UPSTREAM), token); | ||
return result; | ||
} | ||
@@ -232,27 +241,2 @@ transform(operationA, operationB, operationC, operationD, operationE, operationF, operationG, operationH, operationI, operationJ, operationK, cancellationToken) { | ||
} | ||
map(mapper, reverseMapper, cancellationToken) { | ||
if (typeof reverseMapper === 'function') { | ||
let mappedSource; | ||
if (this.primed) { | ||
mappedSource = new DuplexDataSource(mapper(this.value), false); | ||
} | ||
else { | ||
mappedSource = new DuplexDataSource(undefined, false); | ||
} | ||
this.listenDownstream((v) => mappedSource.updateDownstream(mapper(v)), cancellationToken); | ||
mappedSource.listenUpstream((v) => this.updateUpstream(reverseMapper(v)), cancellationToken); | ||
return mappedSource; | ||
} | ||
else { | ||
let mappedSource; | ||
if (this.primed) { | ||
mappedSource = new data_source_1.DataSource(mapper(this.value)); | ||
} | ||
else { | ||
mappedSource = new data_source_1.DataSource(); | ||
} | ||
this.listenDownstream((v) => mappedSource.update(mapper(v)), reverseMapper); | ||
return mappedSource; | ||
} | ||
} | ||
listenOnce(callback, cancellationToken) { | ||
@@ -270,81 +254,3 @@ return this.updateDownstreamEvent.subscribeOnce(callback, cancellationToken).cancel; | ||
} | ||
debounceUpstream(time, cancellationToken) { | ||
const debouncedDataSource = new DuplexDataSource(this.value); | ||
let timeout; | ||
debouncedDataSource.listenUpstream((v) => { | ||
clearTimeout(timeout); | ||
timeout = setTimeout(() => { | ||
this.updateUpstream(v); | ||
}, time); | ||
}, cancellationToken); | ||
this.listenDownstream((v) => { | ||
debouncedDataSource.updateDownstream(v); | ||
}, cancellationToken); | ||
return debouncedDataSource; | ||
} | ||
debounceDownstream(time, cancellationToken) { | ||
const debouncedDataSource = new DuplexDataSource(this.value); | ||
let timeout; | ||
this.listenDownstream((v) => { | ||
clearTimeout(timeout); | ||
timeout = setTimeout(() => { | ||
debouncedDataSource.updateDownstream(v); | ||
}, time); | ||
}, cancellationToken); | ||
debouncedDataSource.listenUpstream((v) => { | ||
this.updateUpstream(v); | ||
}, cancellationToken); | ||
return debouncedDataSource; | ||
} | ||
/** | ||
* Creates a new datasource that listens to this one and forwards updates if they are not the same as the last update | ||
* @param cancellationToken Cancellation token to cancel the subscription the new datasource has to this datasource | ||
*/ | ||
unique(cancellationToken) { | ||
const uniqueSource = new DuplexDataSource(this.value, false); | ||
let upstreamValue = this.value; | ||
let downStreamValue = this.value; | ||
this.listenDownstream((v) => { | ||
if (downStreamValue !== v) { | ||
downStreamValue = v; | ||
uniqueSource.updateDownstream(v); | ||
} | ||
}, cancellationToken); | ||
uniqueSource.listenUpstream((v) => { | ||
if (upstreamValue !== v) { | ||
upstreamValue = v; | ||
this.updateUpstream(v); | ||
} | ||
}, cancellationToken); | ||
return uniqueSource; | ||
} | ||
/** | ||
* Allows flow of data only in one direction | ||
* @param direction direction of the dataflow that is allowed | ||
* @param cancellationToken Cancellation token to cancel the subscriptions the new datasource has to the two parent datasources | ||
*/ | ||
oneWayFlow(direction = DataFlow.DOWNSTREAM, cancellationToken) { | ||
const oneWaySource = new DuplexDataSource(this.value, false); | ||
if (direction === DataFlow.DOWNSTREAM) { | ||
this.listenDownstream((v) => oneWaySource.updateDownstream(v), cancellationToken); | ||
oneWaySource.updateUpstream = () => void 0; | ||
} | ||
else { | ||
oneWaySource.listenUpstream((v) => this.updateUpstream(v)); | ||
oneWaySource.updateDownstream = () => void 0; | ||
} | ||
return oneWaySource; | ||
} | ||
/** | ||
* Creates a new datasource that listens to this source and combines all updates into a single value | ||
* @param reducer function that aggregates an update with the previous result of aggregation | ||
* @param initialValue initial value given to the new source | ||
* @param cancellationToken Cancellation token to cancel the subscription the new datasource has to this datasource | ||
*/ | ||
reduce(reducer, initialValue, cancellationToken) { | ||
const reduceSource = new data_source_1.DataSource(initialValue); | ||
this.listen((v) => reduceSource.update(reducer(reduceSource.value, v)), cancellationToken); | ||
return reduceSource; | ||
} | ||
/** | ||
* Remove all listeners | ||
@@ -362,4 +268,95 @@ */ | ||
} | ||
/** | ||
* Assign a function to handle errors and map them back to regular values. Rethrow the error in case you want to fallback to emitting error | ||
*/ | ||
handleErrors(callback) { | ||
this.errorHandler = callback; | ||
return this; | ||
} | ||
onError(callback, cancellationToken) { | ||
this.errorEvent.subscribe(callback, cancellationToken); | ||
return this; | ||
} | ||
emitError(e, direction) { | ||
if (this.errorHandler) { | ||
try { | ||
if (direction === duplex_data_source_operators_1.DataFlow.DOWNSTREAM) { | ||
return this.updateDownstream(this.errorHandler(e)); | ||
} | ||
else { | ||
return this.updateUpstream(this.errorHandler(e)); | ||
} | ||
} | ||
catch (newError) { | ||
e = newError; | ||
} | ||
} | ||
if (this.errorEvent.hasSubscriptions()) { | ||
this.errorEvent.fire(e); | ||
} | ||
else { | ||
throw e; | ||
} | ||
} | ||
} | ||
exports.DuplexDataSource = DuplexDataSource; | ||
function processTransformDuplex(operations, result, direction) { | ||
return async (v) => { | ||
try { | ||
for (const operation of operations) { | ||
switch (operation.operationType) { | ||
case operator_model_1.OperationType.NOOP: | ||
case operator_model_1.OperationType.MAP: | ||
v = | ||
direction === duplex_data_source_operators_1.DataFlow.DOWNSTREAM | ||
? operation.operationDown(v) | ||
: operation.operationUp(v); | ||
break; | ||
case operator_model_1.OperationType.MAP_DELAY_FILTER: | ||
const tmp = direction === duplex_data_source_operators_1.DataFlow.DOWNSTREAM | ||
? await operation.operationDown(v) | ||
: await operation.operationUp(v); | ||
if (tmp.cancelled) { | ||
return; | ||
} | ||
else { | ||
v = await tmp.item; | ||
} | ||
break; | ||
case operator_model_1.OperationType.DELAY: | ||
case operator_model_1.OperationType.MAP_DELAY: | ||
v = | ||
direction === duplex_data_source_operators_1.DataFlow.DOWNSTREAM | ||
? await operation.operationDown(v) | ||
: await operation.operationUp(v); | ||
break; | ||
case operator_model_1.OperationType.DELAY_FILTER: | ||
if (!(direction === duplex_data_source_operators_1.DataFlow.DOWNSTREAM | ||
? await operation.operationDown(v) | ||
: await operation.operationUp(v))) { | ||
return; | ||
} | ||
break; | ||
case operator_model_1.OperationType.FILTER: | ||
if (!(direction === duplex_data_source_operators_1.DataFlow.DOWNSTREAM | ||
? operation.operationDown(v) | ||
: operation.operationUp(v))) { | ||
return; | ||
} | ||
break; | ||
} | ||
} | ||
if (direction === duplex_data_source_operators_1.DataFlow.DOWNSTREAM) { | ||
result.updateDownstream(v); | ||
} | ||
else { | ||
result.updateUpstream(v); | ||
} | ||
} | ||
catch (e) { | ||
result.emitError(e, direction); | ||
} | ||
}; | ||
} | ||
exports.processTransformDuplex = processTransformDuplex; | ||
//# sourceMappingURL=duplex_data_source.js.map |
@@ -10,7 +10,12 @@ export declare enum OperationType { | ||
} | ||
export interface DataSourceOperator<T, M> { | ||
interface SourceOperator { | ||
operationType: OperationType; | ||
typescriptLimitationWorkaround?: (value: T) => M; | ||
name: string; | ||
} | ||
export interface DataSourceOperator<T, M> extends SourceOperator { | ||
typescriptLimitationWorkaround?: (value: T) => M; | ||
} | ||
export interface DuplexDataSourceOperator<T, M> extends SourceOperator { | ||
typescriptLimitationWorkaround?: (value: T) => M; | ||
} | ||
export interface DataSourceFilterOperator<T> extends DataSourceOperator<T, T> { | ||
@@ -20,2 +25,12 @@ operationType: OperationType.FILTER; | ||
} | ||
export interface DuplexDataSourceFilterOperator<T> extends DuplexDataSourceOperator<T, T> { | ||
operationType: OperationType.FILTER; | ||
operationDown: (value: T) => boolean; | ||
operationUp: (value: T) => boolean; | ||
} | ||
export interface DuplexDataSourceMapOperator<T, M> extends DuplexDataSourceOperator<T, M> { | ||
operationType: OperationType.MAP; | ||
operationDown: (value: T) => M; | ||
operationUp: (value: M) => T; | ||
} | ||
export interface DataSourceMapOperator<T, M> extends DataSourceOperator<T, M> { | ||
@@ -44,2 +59,13 @@ operationType: OperationType.MAP; | ||
} | ||
export interface DuplexDataSourceMapDelayFilterOperator<T, M> extends DuplexDataSourceOperator<T, M> { | ||
operationType: OperationType.MAP_DELAY_FILTER; | ||
operationDown: (value: T) => Promise<{ | ||
item: M; | ||
cancelled: boolean; | ||
}>; | ||
operationUp: (value: T) => Promise<{ | ||
item: M; | ||
cancelled: boolean; | ||
}>; | ||
} | ||
export interface DataSourceDelayFilterOperator<T> extends DataSourceOperator<T, T> { | ||
@@ -49,2 +75,8 @@ operationType: OperationType.DELAY_FILTER; | ||
} | ||
export interface DuplexDataSourceDelayFilterOperator<T> extends DuplexDataSourceOperator<T, T> { | ||
operationType: OperationType.DELAY_FILTER; | ||
operationDown: (value: T) => Promise<boolean>; | ||
operationUp: (value: T) => Promise<boolean>; | ||
} | ||
export {}; | ||
//# sourceMappingURL=operator_model.d.ts.map |
@@ -13,2 +13,3 @@ export * from './rendering/webcomponent'; | ||
export * from './stream/data_source_operators'; | ||
export * from './stream/duplex_data_source_operators'; | ||
export * from './stream/operator_model'; | ||
@@ -19,2 +20,3 @@ export * from './stream/stream'; | ||
export * from './utilities/event_emitter'; | ||
export * from './rendering/classname'; | ||
export * from './stream/emitters'; | ||
@@ -21,0 +23,0 @@ export * from './nodes/string_adapter'; |
@@ -13,2 +13,3 @@ export * from './rendering/webcomponent'; | ||
export * from './stream/data_source_operators'; | ||
export * from './stream/duplex_data_source_operators'; | ||
export * from './stream/operator_model'; | ||
@@ -19,2 +20,3 @@ export * from './stream/stream'; | ||
export * from './utilities/event_emitter'; | ||
export * from './rendering/classname'; | ||
export * from './stream/emitters'; | ||
@@ -21,0 +23,0 @@ export * from './nodes/string_adapter'; |
@@ -186,2 +186,5 @@ import { DataSource } from '../stream/data_source'; | ||
const value = data.reduce((p, c) => { | ||
if (!c) { | ||
return p; | ||
} | ||
if (typeof c === 'string') { | ||
@@ -188,0 +191,0 @@ return `${p} ${c}`; |
@@ -6,3 +6,3 @@ import { AurumComponentAPI, Renderable } from '../rendering/aurum_element'; | ||
} | ||
export declare function Suspense(props: SuspenseProps, children: Renderable[], api: AurumComponentAPI): DataSource<string | number | HTMLElement | Text | import("../rendering/aurum_element").AurumElement | import("../rendering/aurum_element").AurumElementModel<any> | Promise<Renderable> | DataSource<Renderable> | import("../stream/data_source").ArrayDataSource<Renderable> | import("../aurumjs").DuplexDataSource<Renderable> | Renderable[]>; | ||
export declare function Suspense(props: SuspenseProps, children: Renderable[], api: AurumComponentAPI): DataSource<Renderable | Renderable[]>; | ||
//# sourceMappingURL=suspense.d.ts.map |
@@ -10,3 +10,4 @@ import { DataSource, ArrayDataSource, ReadOnlyDataSource } from '../stream/data_source'; | ||
} | ||
export declare type Renderable = AurumElement | HTMLElement | Text | string | number | AurumElementModel<any> | Promise<Renderable> | DataSource<Renderable> | ArrayDataSource<Renderable> | DuplexDataSource<Renderable>; | ||
declare type ResolvedRenderable = AurumElement | HTMLElement | Text | string | number | AurumElementModel<any> | DataSource<Renderable> | ArrayDataSource<Renderable> | DuplexDataSource<Renderable>; | ||
export declare type Renderable = ResolvedRenderable | Promise<ResolvedRenderable>; | ||
export declare type Rendered = AurumElement | HTMLElement | Text; | ||
@@ -101,2 +102,3 @@ export interface ComponentLifeCycle { | ||
} | ||
export {}; | ||
//# sourceMappingURL=aurum_element.d.ts.map |
@@ -118,3 +118,5 @@ import { syncArrayDataSource, syncDataSource } from '../aurum_server/aurum_server_client'; | ||
listenAndRepeat(callback, cancellationToken) { | ||
callback(this.value); | ||
if (this.primed) { | ||
callback(this.value); | ||
} | ||
return this.listen(callback, cancellationToken); | ||
@@ -653,3 +655,2 @@ } | ||
break; | ||
break; | ||
} | ||
@@ -656,0 +657,0 @@ }, cancellationToken); |
import { AurumServerInfo } from '../aurum_server/aurum_server_client'; | ||
import { CancellationToken } from '../utilities/cancellation_token'; | ||
import { Callback } from '../utilities/common'; | ||
import { EventEmitter } from '../utilities/event_emitter'; | ||
import { DataSource, GenericDataSource, ReadOnlyDataSource } from './data_source'; | ||
import { DataSourceOperator } from './operator_model'; | ||
export declare enum DataFlow { | ||
UPSTREAM = 0, | ||
DOWNSTREAM = 1 | ||
} | ||
import { DataFlow } from './duplex_data_source_operators'; | ||
import { DataSourceOperator, DuplexDataSourceOperator } from './operator_model'; | ||
/** | ||
@@ -19,2 +17,4 @@ * Same as DataSource except data can flow in both directions | ||
private primed; | ||
protected errorHandler: (error: any) => T; | ||
protected errorEvent: EventEmitter<Error>; | ||
private updatingUpstream; | ||
@@ -29,5 +29,5 @@ private updatingDownstream; | ||
* @param initialValue | ||
* @param propagateWritesToReadStream If a write is done propagate this update back down to all the consumers. Useful at the root node | ||
* @param rootNode If a write is done propagate this update back down to all the consumers. Useful at the root node | ||
*/ | ||
constructor(initialValue?: T, propagateWritesToReadStream?: boolean, name?: string); | ||
constructor(initialValue?: T, rootNode?: boolean, name?: string); | ||
/** | ||
@@ -88,2 +88,9 @@ * Connects to an aurum-server exposed datasource view https://github.com/CyberPhoenix90/aurum-server for more information | ||
/** | ||
* Subscribes exclusively to updates of the data stream that occur due to an update flowing upstream | ||
* @param callback Callback to call when value is updated | ||
* @param cancellationToken Optional token to control the cancellation of the subscription | ||
* @returns Cancellation callback, can be used to cancel subscription without a cancellation token | ||
*/ | ||
listenUpstreamAndRepeat(callback: Callback<T>, cancellationToken?: CancellationToken): Callback<void>; | ||
/** | ||
* Subscribes exclusively to one update of the data stream that occur due to an update flowing upstream | ||
@@ -162,9 +169,3 @@ * @param callback Callback to call when value is updated | ||
], combinator?: (self: T, second: A, third: B, fourth: C, fifth: D, sixth: E, seventh: F, eigth: G, ninth: H, tenth: I) => R, cancellationToken?: CancellationToken): DataSource<R>; | ||
/** | ||
* Creates a new datasource that listenes to updates of this datasource but only propagates the updates from this source if they pass a predicate check | ||
* @param callback predicate check to decide if the update from the parent data source is passed down or not | ||
* @param cancellationToken Cancellation token to cancel the subscriptions added to the datasources by this operation | ||
*/ | ||
filter(downStreamFilter: (value: T, oldValue: T) => boolean, cancellationToken?: CancellationToken): DataSource<T>; | ||
filter(downStreamFilter: (value: T, oldValue: T) => boolean, upstreamFilter?: (value: T) => boolean, cancellationToken?: CancellationToken): DuplexDataSource<T>; | ||
transformDuplex<A, B = A, C = B, D = C, E = D, F = E, G = F, H = G, I = H, J = I, K = J>(operationA: DuplexDataSourceOperator<T, A>, operationB?: DuplexDataSourceOperator<A, B> | CancellationToken, operationC?: DuplexDataSourceOperator<B, C> | CancellationToken, operationD?: DuplexDataSourceOperator<C, D> | CancellationToken, operationE?: DuplexDataSourceOperator<D, E> | CancellationToken, operationF?: DuplexDataSourceOperator<E, F> | CancellationToken, operationG?: DuplexDataSourceOperator<F, G> | CancellationToken, operationH?: DuplexDataSourceOperator<G, H> | CancellationToken, operationI?: DuplexDataSourceOperator<H, I> | CancellationToken, operationJ?: DuplexDataSourceOperator<I, J> | CancellationToken, operationK?: DuplexDataSourceOperator<J, K> | CancellationToken, cancellationToken?: CancellationToken): DuplexDataSource<K>; | ||
transform<A, B = A, C = B, D = C, E = D, F = E, G = F, H = G, I = H, J = I, K = J>(operationA: DataSourceOperator<T, A>, operationB?: DataSourceOperator<A, B> | CancellationToken, operationC?: DataSourceOperator<B, C> | CancellationToken, operationD?: DataSourceOperator<C, D> | CancellationToken, operationE?: DataSourceOperator<D, E> | CancellationToken, operationF?: DataSourceOperator<E, F> | CancellationToken, operationG?: DataSourceOperator<F, G> | CancellationToken, operationH?: DataSourceOperator<G, H> | CancellationToken, operationI?: DataSourceOperator<H, I> | CancellationToken, operationJ?: DataSourceOperator<I, J> | CancellationToken, operationK?: DataSourceOperator<J, K> | CancellationToken, cancellationToken?: CancellationToken): DataSource<K>; | ||
@@ -177,10 +178,2 @@ /** | ||
pipe(targetDataSource: DuplexDataSource<T>, cancellationToken?: CancellationToken): this; | ||
/** | ||
* Creates a new datasource that is listening to updates from this datasource and transforms them with a mapper function before fowarding them to itself | ||
* @param mapper mapper function that transforms the data when it flows downwards | ||
* @param reverseMapper mapper function that transforms the data when it flows upwards | ||
* @param cancellationToken Cancellation token to cancel the subscriptions added to the datasources by this operation | ||
*/ | ||
map<D>(mapper: (value: T) => D, cancellationToken?: CancellationToken): DataSource<D>; | ||
map<D>(mapper: (value: T) => D, reverseMapper: (value: D) => T, cancellationToken?: CancellationToken): DuplexDataSource<D>; | ||
listenOnce(callback: Callback<T>, cancellationToken?: CancellationToken): Callback<void>; | ||
@@ -192,23 +185,3 @@ /** | ||
awaitNextUpdate(cancellationToken?: CancellationToken): Promise<T>; | ||
debounceUpstream(time: number, cancellationToken?: CancellationToken): DuplexDataSource<T>; | ||
debounceDownstream(time: number, cancellationToken?: CancellationToken): DuplexDataSource<T>; | ||
/** | ||
* Creates a new datasource that listens to this one and forwards updates if they are not the same as the last update | ||
* @param cancellationToken Cancellation token to cancel the subscription the new datasource has to this datasource | ||
*/ | ||
unique(cancellationToken?: CancellationToken): DuplexDataSource<T>; | ||
/** | ||
* Allows flow of data only in one direction | ||
* @param direction direction of the dataflow that is allowed | ||
* @param cancellationToken Cancellation token to cancel the subscriptions the new datasource has to the two parent datasources | ||
*/ | ||
oneWayFlow(direction?: DataFlow, cancellationToken?: CancellationToken): DuplexDataSource<T>; | ||
/** | ||
* Creates a new datasource that listens to this source and combines all updates into a single value | ||
* @param reducer function that aggregates an update with the previous result of aggregation | ||
* @param initialValue initial value given to the new source | ||
* @param cancellationToken Cancellation token to cancel the subscription the new datasource has to this datasource | ||
*/ | ||
reduce(reducer: (p: T, c: T) => T, initialValue: T, cancellationToken?: CancellationToken): DataSource<T>; | ||
/** | ||
* Remove all listeners | ||
@@ -219,3 +192,10 @@ */ | ||
cancelAllUpstream(): void; | ||
/** | ||
* Assign a function to handle errors and map them back to regular values. Rethrow the error in case you want to fallback to emitting error | ||
*/ | ||
handleErrors(callback: (error: any) => T): this; | ||
onError(callback: (error: any) => void, cancellationToken?: CancellationToken): this; | ||
emitError(e: Error, direction: DataFlow): void; | ||
} | ||
export declare function processTransformDuplex<I, O>(operations: DuplexDataSourceOperator<any, any>[], result: DuplexDataSource<O>, direction: DataFlow): Callback<I>; | ||
//# sourceMappingURL=duplex_data_source.d.ts.map |
@@ -5,7 +5,4 @@ import { syncDuplexDataSource } from '../aurum_server/aurum_server_client'; | ||
import { DataSource, processTransform } from './data_source'; | ||
export var DataFlow; | ||
(function (DataFlow) { | ||
DataFlow[DataFlow["UPSTREAM"] = 0] = "UPSTREAM"; | ||
DataFlow[DataFlow["DOWNSTREAM"] = 1] = "DOWNSTREAM"; | ||
})(DataFlow || (DataFlow = {})); | ||
import { DataFlow, ddsOneWayFlow } from './duplex_data_source_operators'; | ||
import { OperationType } from './operator_model'; | ||
/** | ||
@@ -18,5 +15,5 @@ * Same as DataSource except data can flow in both directions | ||
* @param initialValue | ||
* @param propagateWritesToReadStream If a write is done propagate this update back down to all the consumers. Useful at the root node | ||
* @param rootNode If a write is done propagate this update back down to all the consumers. Useful at the root node | ||
*/ | ||
constructor(initialValue, propagateWritesToReadStream = true, name = 'RootDuplexDataSource') { | ||
constructor(initialValue, rootNode = true, name = 'RootDuplexDataSource') { | ||
this.name = name; | ||
@@ -27,3 +24,3 @@ this.value = initialValue; | ||
this.updateUpstreamEvent = new EventEmitter(); | ||
this.propagateWritesToReadStream = propagateWritesToReadStream; | ||
this.propagateWritesToReadStream = rootNode; | ||
} | ||
@@ -68,3 +65,3 @@ /** | ||
static createOneWay(direction = DataFlow.DOWNSTREAM, initialValue) { | ||
return new DuplexDataSource(initialValue, false).oneWayFlow(direction); | ||
return new DuplexDataSource(initialValue, false).transformDuplex(ddsOneWayFlow(direction)); | ||
} | ||
@@ -109,3 +106,5 @@ /** | ||
listenAndRepeat(callback, cancellationToken) { | ||
callback(this.value); | ||
if (this.primed) { | ||
callback(this.value); | ||
} | ||
return this.listen(callback, cancellationToken); | ||
@@ -135,2 +134,14 @@ } | ||
/** | ||
* Subscribes exclusively to updates of the data stream that occur due to an update flowing upstream | ||
* @param callback Callback to call when value is updated | ||
* @param cancellationToken Optional token to control the cancellation of the subscription | ||
* @returns Cancellation callback, can be used to cancel subscription without a cancellation token | ||
*/ | ||
listenUpstreamAndRepeat(callback, cancellationToken) { | ||
if (this.primed) { | ||
callback(this.value); | ||
} | ||
return this.updateUpstreamEvent.subscribe(callback, cancellationToken).cancel; | ||
} | ||
/** | ||
* Subscribes exclusively to one update of the data stream that occur due to an update flowing upstream | ||
@@ -171,26 +182,24 @@ * @param callback Callback to call when value is updated | ||
} | ||
filter(downStreamFilter, upstreamFilter, cancellationToken) { | ||
if (typeof upstreamFilter === 'function') { | ||
const filteredSource = new DuplexDataSource(undefined, false); | ||
this.listenDownstream((newVal) => { | ||
if (downStreamFilter(newVal, filteredSource.value)) { | ||
filteredSource.updateDownstream(newVal); | ||
} | ||
}, cancellationToken); | ||
filteredSource.listenUpstream((newVal) => { | ||
if (upstreamFilter(newVal, this.value)) { | ||
this.updateUpstream(newVal); | ||
} | ||
}, cancellationToken); | ||
return filteredSource; | ||
transformDuplex(operationA, operationB, operationC, operationD, operationE, operationF, operationG, operationH, operationI, operationJ, operationK, cancellationToken) { | ||
let token; | ||
const operations = [ | ||
operationA, | ||
operationB, | ||
operationC, | ||
operationD, | ||
operationE, | ||
operationF, | ||
operationG, | ||
operationH, | ||
operationI, | ||
operationJ, | ||
operationK | ||
].filter((e) => e && (e instanceof CancellationToken ? ((token = e), false) : true)); | ||
if (cancellationToken) { | ||
token = cancellationToken; | ||
} | ||
else { | ||
const filteredSource = new DataSource(); | ||
this.listenDownstream((newVal) => { | ||
if (downStreamFilter(newVal, filteredSource.value)) { | ||
filteredSource.update(newVal); | ||
} | ||
}, upstreamFilter); | ||
return filteredSource; | ||
} | ||
const result = new DuplexDataSource(undefined, false, this.name + ' ' + operations.map((v) => v.name).join(' ')); | ||
(this.primed ? this.listenAndRepeat : this.listen).call(this, processTransformDuplex(operations, result, DataFlow.DOWNSTREAM), token); | ||
result.listenUpstream.call(result, processTransformDuplex(operations, this, DataFlow.UPSTREAM), token); | ||
return result; | ||
} | ||
@@ -229,27 +238,2 @@ transform(operationA, operationB, operationC, operationD, operationE, operationF, operationG, operationH, operationI, operationJ, operationK, cancellationToken) { | ||
} | ||
map(mapper, reverseMapper, cancellationToken) { | ||
if (typeof reverseMapper === 'function') { | ||
let mappedSource; | ||
if (this.primed) { | ||
mappedSource = new DuplexDataSource(mapper(this.value), false); | ||
} | ||
else { | ||
mappedSource = new DuplexDataSource(undefined, false); | ||
} | ||
this.listenDownstream((v) => mappedSource.updateDownstream(mapper(v)), cancellationToken); | ||
mappedSource.listenUpstream((v) => this.updateUpstream(reverseMapper(v)), cancellationToken); | ||
return mappedSource; | ||
} | ||
else { | ||
let mappedSource; | ||
if (this.primed) { | ||
mappedSource = new DataSource(mapper(this.value)); | ||
} | ||
else { | ||
mappedSource = new DataSource(); | ||
} | ||
this.listenDownstream((v) => mappedSource.update(mapper(v)), reverseMapper); | ||
return mappedSource; | ||
} | ||
} | ||
listenOnce(callback, cancellationToken) { | ||
@@ -267,81 +251,3 @@ return this.updateDownstreamEvent.subscribeOnce(callback, cancellationToken).cancel; | ||
} | ||
debounceUpstream(time, cancellationToken) { | ||
const debouncedDataSource = new DuplexDataSource(this.value); | ||
let timeout; | ||
debouncedDataSource.listenUpstream((v) => { | ||
clearTimeout(timeout); | ||
timeout = setTimeout(() => { | ||
this.updateUpstream(v); | ||
}, time); | ||
}, cancellationToken); | ||
this.listenDownstream((v) => { | ||
debouncedDataSource.updateDownstream(v); | ||
}, cancellationToken); | ||
return debouncedDataSource; | ||
} | ||
debounceDownstream(time, cancellationToken) { | ||
const debouncedDataSource = new DuplexDataSource(this.value); | ||
let timeout; | ||
this.listenDownstream((v) => { | ||
clearTimeout(timeout); | ||
timeout = setTimeout(() => { | ||
debouncedDataSource.updateDownstream(v); | ||
}, time); | ||
}, cancellationToken); | ||
debouncedDataSource.listenUpstream((v) => { | ||
this.updateUpstream(v); | ||
}, cancellationToken); | ||
return debouncedDataSource; | ||
} | ||
/** | ||
* Creates a new datasource that listens to this one and forwards updates if they are not the same as the last update | ||
* @param cancellationToken Cancellation token to cancel the subscription the new datasource has to this datasource | ||
*/ | ||
unique(cancellationToken) { | ||
const uniqueSource = new DuplexDataSource(this.value, false); | ||
let upstreamValue = this.value; | ||
let downStreamValue = this.value; | ||
this.listenDownstream((v) => { | ||
if (downStreamValue !== v) { | ||
downStreamValue = v; | ||
uniqueSource.updateDownstream(v); | ||
} | ||
}, cancellationToken); | ||
uniqueSource.listenUpstream((v) => { | ||
if (upstreamValue !== v) { | ||
upstreamValue = v; | ||
this.updateUpstream(v); | ||
} | ||
}, cancellationToken); | ||
return uniqueSource; | ||
} | ||
/** | ||
* Allows flow of data only in one direction | ||
* @param direction direction of the dataflow that is allowed | ||
* @param cancellationToken Cancellation token to cancel the subscriptions the new datasource has to the two parent datasources | ||
*/ | ||
oneWayFlow(direction = DataFlow.DOWNSTREAM, cancellationToken) { | ||
const oneWaySource = new DuplexDataSource(this.value, false); | ||
if (direction === DataFlow.DOWNSTREAM) { | ||
this.listenDownstream((v) => oneWaySource.updateDownstream(v), cancellationToken); | ||
oneWaySource.updateUpstream = () => void 0; | ||
} | ||
else { | ||
oneWaySource.listenUpstream((v) => this.updateUpstream(v)); | ||
oneWaySource.updateDownstream = () => void 0; | ||
} | ||
return oneWaySource; | ||
} | ||
/** | ||
* Creates a new datasource that listens to this source and combines all updates into a single value | ||
* @param reducer function that aggregates an update with the previous result of aggregation | ||
* @param initialValue initial value given to the new source | ||
* @param cancellationToken Cancellation token to cancel the subscription the new datasource has to this datasource | ||
*/ | ||
reduce(reducer, initialValue, cancellationToken) { | ||
const reduceSource = new DataSource(initialValue); | ||
this.listen((v) => reduceSource.update(reducer(reduceSource.value, v)), cancellationToken); | ||
return reduceSource; | ||
} | ||
/** | ||
* Remove all listeners | ||
@@ -359,3 +265,93 @@ */ | ||
} | ||
/** | ||
* Assign a function to handle errors and map them back to regular values. Rethrow the error in case you want to fallback to emitting error | ||
*/ | ||
handleErrors(callback) { | ||
this.errorHandler = callback; | ||
return this; | ||
} | ||
onError(callback, cancellationToken) { | ||
this.errorEvent.subscribe(callback, cancellationToken); | ||
return this; | ||
} | ||
emitError(e, direction) { | ||
if (this.errorHandler) { | ||
try { | ||
if (direction === DataFlow.DOWNSTREAM) { | ||
return this.updateDownstream(this.errorHandler(e)); | ||
} | ||
else { | ||
return this.updateUpstream(this.errorHandler(e)); | ||
} | ||
} | ||
catch (newError) { | ||
e = newError; | ||
} | ||
} | ||
if (this.errorEvent.hasSubscriptions()) { | ||
this.errorEvent.fire(e); | ||
} | ||
else { | ||
throw e; | ||
} | ||
} | ||
} | ||
export function processTransformDuplex(operations, result, direction) { | ||
return async (v) => { | ||
try { | ||
for (const operation of operations) { | ||
switch (operation.operationType) { | ||
case OperationType.NOOP: | ||
case OperationType.MAP: | ||
v = | ||
direction === DataFlow.DOWNSTREAM | ||
? operation.operationDown(v) | ||
: operation.operationUp(v); | ||
break; | ||
case OperationType.MAP_DELAY_FILTER: | ||
const tmp = direction === DataFlow.DOWNSTREAM | ||
? await operation.operationDown(v) | ||
: await operation.operationUp(v); | ||
if (tmp.cancelled) { | ||
return; | ||
} | ||
else { | ||
v = await tmp.item; | ||
} | ||
break; | ||
case OperationType.DELAY: | ||
case OperationType.MAP_DELAY: | ||
v = | ||
direction === DataFlow.DOWNSTREAM | ||
? await operation.operationDown(v) | ||
: await operation.operationUp(v); | ||
break; | ||
case OperationType.DELAY_FILTER: | ||
if (!(direction === DataFlow.DOWNSTREAM | ||
? await operation.operationDown(v) | ||
: await operation.operationUp(v))) { | ||
return; | ||
} | ||
break; | ||
case OperationType.FILTER: | ||
if (!(direction === DataFlow.DOWNSTREAM | ||
? operation.operationDown(v) | ||
: operation.operationUp(v))) { | ||
return; | ||
} | ||
break; | ||
} | ||
} | ||
if (direction === DataFlow.DOWNSTREAM) { | ||
result.updateDownstream(v); | ||
} | ||
else { | ||
result.updateUpstream(v); | ||
} | ||
} | ||
catch (e) { | ||
result.emitError(e, direction); | ||
} | ||
}; | ||
} | ||
//# sourceMappingURL=duplex_data_source.js.map |
@@ -10,7 +10,12 @@ export declare enum OperationType { | ||
} | ||
export interface DataSourceOperator<T, M> { | ||
interface SourceOperator { | ||
operationType: OperationType; | ||
typescriptLimitationWorkaround?: (value: T) => M; | ||
name: string; | ||
} | ||
export interface DataSourceOperator<T, M> extends SourceOperator { | ||
typescriptLimitationWorkaround?: (value: T) => M; | ||
} | ||
export interface DuplexDataSourceOperator<T, M> extends SourceOperator { | ||
typescriptLimitationWorkaround?: (value: T) => M; | ||
} | ||
export interface DataSourceFilterOperator<T> extends DataSourceOperator<T, T> { | ||
@@ -20,2 +25,12 @@ operationType: OperationType.FILTER; | ||
} | ||
export interface DuplexDataSourceFilterOperator<T> extends DuplexDataSourceOperator<T, T> { | ||
operationType: OperationType.FILTER; | ||
operationDown: (value: T) => boolean; | ||
operationUp: (value: T) => boolean; | ||
} | ||
export interface DuplexDataSourceMapOperator<T, M> extends DuplexDataSourceOperator<T, M> { | ||
operationType: OperationType.MAP; | ||
operationDown: (value: T) => M; | ||
operationUp: (value: M) => T; | ||
} | ||
export interface DataSourceMapOperator<T, M> extends DataSourceOperator<T, M> { | ||
@@ -44,2 +59,13 @@ operationType: OperationType.MAP; | ||
} | ||
export interface DuplexDataSourceMapDelayFilterOperator<T, M> extends DuplexDataSourceOperator<T, M> { | ||
operationType: OperationType.MAP_DELAY_FILTER; | ||
operationDown: (value: T) => Promise<{ | ||
item: M; | ||
cancelled: boolean; | ||
}>; | ||
operationUp: (value: T) => Promise<{ | ||
item: M; | ||
cancelled: boolean; | ||
}>; | ||
} | ||
export interface DataSourceDelayFilterOperator<T> extends DataSourceOperator<T, T> { | ||
@@ -49,2 +75,8 @@ operationType: OperationType.DELAY_FILTER; | ||
} | ||
export interface DuplexDataSourceDelayFilterOperator<T> extends DuplexDataSourceOperator<T, T> { | ||
operationType: OperationType.DELAY_FILTER; | ||
operationDown: (value: T) => Promise<boolean>; | ||
operationUp: (value: T) => Promise<boolean>; | ||
} | ||
export {}; | ||
//# sourceMappingURL=operator_model.d.ts.map |
Sorry, the diff of this file is too big to display
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is too big to display
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
3368242
10
263
32897
12