sparql-engine
Advanced tools
Comparing version 0.5.5 to 0.5.6
@@ -0,1 +1,2 @@ | ||
import { SPARQL_OPERATION } from './engine/plan-builder'; | ||
import AggregateStageBuilder from './engine/stages/aggregate-stage-builder'; | ||
@@ -14,14 +15,2 @@ import BGPStageBuilder from './engine/stages/bgp-stage-builder'; | ||
import UpdateStageBuilder from './engine/stages/update-stage-builder'; | ||
export { default as Dataset } from './rdf/dataset'; | ||
export { BindingBase } from './rdf/bindings'; | ||
export { default as HashMapDataset } from './rdf/hashmap-dataset'; | ||
export { default as Graph } from './rdf/graph'; | ||
export { default as ExecutionContext } from './engine/context/execution-context'; | ||
export { PlanBuilder } from './engine/plan-builder'; | ||
import { SPARQL_OPERATION } from './engine/plan-builder'; | ||
export { Pipeline } from './engine/pipeline/pipeline'; | ||
export { PipelineEngine, PipelineInput } from './engine/pipeline/pipeline-engine'; | ||
export { default as RxjsPipeline } from './engine/pipeline/rxjs-pipeline'; | ||
export { default as VectorPipeline } from './engine/pipeline/vector-pipeline'; | ||
export { terms } from './rdf-terms'; | ||
declare const stages: { | ||
@@ -43,2 +32,13 @@ SPARQL_OPERATION: typeof SPARQL_OPERATION; | ||
}; | ||
export { default as Dataset } from './rdf/dataset'; | ||
export { Bindings, BindingBase } from './rdf/bindings'; | ||
export { default as HashMapDataset } from './rdf/hashmap-dataset'; | ||
export { default as Graph } from './rdf/graph'; | ||
export { default as ExecutionContext } from './engine/context/execution-context'; | ||
export { PlanBuilder } from './engine/plan-builder'; | ||
export { Pipeline } from './engine/pipeline/pipeline'; | ||
export { PipelineEngine, PipelineInput, PipelineStage, StreamPipelineInput } from './engine/pipeline/pipeline-engine'; | ||
export { default as RxjsPipeline } from './engine/pipeline/rxjs-pipeline'; | ||
export { default as VectorPipeline } from './engine/pipeline/vector-pipeline'; | ||
export { terms } from './rdf-terms'; | ||
export { stages }; |
@@ -26,2 +26,4 @@ /* file : api.ts | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
// stages builders | ||
var plan_builder_1 = require("./engine/plan-builder"); | ||
var aggregate_stage_builder_1 = require("./engine/stages/aggregate-stage-builder"); | ||
@@ -40,5 +42,24 @@ var bgp_stage_builder_1 = require("./engine/stages/bgp-stage-builder"); | ||
var update_stage_builder_1 = require("./engine/stages/update-stage-builder"); | ||
var stages = { | ||
SPARQL_OPERATION: plan_builder_1.SPARQL_OPERATION, | ||
AggregateStageBuilder: aggregate_stage_builder_1.default, | ||
BGPStageBuilder: bgp_stage_builder_1.default, | ||
BindStageBuilder: bind_stage_builder_1.default, | ||
DistinctStageBuilder: distinct_stage_builder_1.default, | ||
FilterStageBuilder: filter_stage_builder_1.default, | ||
GlushkovStageBuilder: glushkov_stage_builder_1.default, | ||
GraphStageBuilder: graph_stage_builder_1.default, | ||
MinusStageBuilder: minus_stage_builder_1.default, | ||
ServiceStageBuilder: service_stage_builder_1.default, | ||
OptionalStageBuilder: optional_stage_builder_1.default, | ||
OrderByStageBuilder: orderby_stage_builder_1.default, | ||
UnionStageBuilder: union_stage_builder_1.default, | ||
UpdateStageBuilder: update_stage_builder_1.default | ||
}; | ||
exports.stages = stages; | ||
// base types | ||
var dataset_1 = require("./rdf/dataset"); | ||
exports.Dataset = dataset_1.default; | ||
var bindings_1 = require("./rdf/bindings"); | ||
exports.Bindings = bindings_1.Bindings; | ||
exports.BindingBase = bindings_1.BindingBase; | ||
@@ -51,5 +72,4 @@ var hashmap_dataset_1 = require("./rdf/hashmap-dataset"); | ||
exports.ExecutionContext = execution_context_1.default; | ||
var plan_builder_1 = require("./engine/plan-builder"); | ||
exports.PlanBuilder = plan_builder_1.PlanBuilder; | ||
var plan_builder_2 = require("./engine/plan-builder"); | ||
exports.PlanBuilder = plan_builder_2.PlanBuilder; | ||
// pipeline | ||
@@ -67,19 +87,1 @@ var pipeline_1 = require("./engine/pipeline/pipeline"); | ||
exports.terms = rdf_terms_1.terms; | ||
// stages builders | ||
var stages = { | ||
SPARQL_OPERATION: plan_builder_2.SPARQL_OPERATION, | ||
AggregateStageBuilder: aggregate_stage_builder_1.default, | ||
BGPStageBuilder: bgp_stage_builder_1.default, | ||
BindStageBuilder: bind_stage_builder_1.default, | ||
DistinctStageBuilder: distinct_stage_builder_1.default, | ||
FilterStageBuilder: filter_stage_builder_1.default, | ||
GlushkovStageBuilder: glushkov_stage_builder_1.default, | ||
GraphStageBuilder: graph_stage_builder_1.default, | ||
MinusStageBuilder: minus_stage_builder_1.default, | ||
ServiceStageBuilder: service_stage_builder_1.default, | ||
OptionalStageBuilder: optional_stage_builder_1.default, | ||
OrderByStageBuilder: orderby_stage_builder_1.default, | ||
UnionStageBuilder: union_stage_builder_1.default, | ||
UpdateStageBuilder: update_stage_builder_1.default | ||
}; | ||
exports.stages = stages; |
@@ -110,2 +110,9 @@ /** | ||
/** | ||
* Do something after the PipelineStage has produced all its results | ||
* @param input - Input PipelineStage | ||
* @param callback - Function invoked after the PipelineStage has produced all its results | ||
* @return Output PipelineStage | ||
*/ | ||
abstract finalize<T>(input: PipelineStage<T>, callback: () => void): PipelineStage<T>; | ||
/** | ||
* Maps each source value to an array of values which is merged in the output PipelineStage. | ||
@@ -146,9 +153,2 @@ * @param input - Input PipelineStage | ||
/** | ||
* Returns a PipelineStage that emits all items emitted by the source PipelineStage that are distinct by comparison from previous items. | ||
* @param input - Input PipelineStage | ||
* @param selector - Optional function to select which value you want to check as distinct. | ||
* @return A PipelineStage that emits items from the source PipelineStage with distinct values. | ||
*/ | ||
abstract distinct<T, K>(input: PipelineStage<T>, selector?: (value: T) => K): PipelineStage<T>; | ||
/** | ||
* Apply a callback on every item emitted by the source PipelineStage | ||
@@ -180,2 +180,9 @@ * @param input - Input PipelineStage | ||
/** | ||
* Returns a PipelineStage that emits all items emitted by the source PipelineStage that are distinct by comparison from previous items. | ||
* @param input - Input PipelineStage | ||
* @param selector - Optional function to select which value you want to check as distinct. | ||
* @return A PipelineStage that emits items from the source PipelineStage with distinct values. | ||
*/ | ||
distinct<T, K>(input: PipelineStage<T>, selector?: (value: T) => T | K): PipelineStage<T>; | ||
/** | ||
* Emits only the first value (or the first value that meets some condition) emitted by the source PipelineStage. | ||
@@ -200,2 +207,30 @@ * @param input - Input PipelineStage | ||
tap<T>(input: PipelineStage<T>, cb: (value: T) => void): PipelineStage<T>; | ||
/** | ||
* Find the smallest value produced by a pipeline of iterators. | ||
* It takes a ranking function as input, which is invoked with (x, y) | ||
* and must returns True if x < y and False otherwise. | ||
* Warning: this function needs to materialize all values of the pipeline. | ||
* @param input - Input PipelineStage | ||
* @param comparator - (optional) Ranking function | ||
* @return A pipeline stage that emits the lowest value found | ||
*/ | ||
min<T>(input: PipelineStage<T>, ranking?: (x: T, y: T) => boolean): PipelineStage<T>; | ||
/** | ||
* Find the smallest value produced by a pipeline of iterators. | ||
* It takes a ranking function as input, which is invoked with (x, y) | ||
* and must returns True if x > y and False otherwise. | ||
* Warning: this function needs to materialize all values of the pipeline. | ||
* @param input - Input PipelineStage | ||
* @param comparator - (optional) Ranking function | ||
* @return A pipeline stage that emits the highest value found | ||
*/ | ||
max<T>(input: PipelineStage<T>, ranking?: (x: T, y: T) => boolean): PipelineStage<T>; | ||
/** | ||
* Groups the items produced by a pipeline according to a specified criterion, | ||
* and emits the resulting groups | ||
* @param input - Input PipelineStage | ||
* @param keySelector - A function that extracts the grouping key for each item | ||
* @param elementSelector - (optional) A function that transforms items before inserting them in a group | ||
*/ | ||
groupBy<T, K, R>(input: PipelineStage<T>, keySelector: (value: T) => K, elementSelector?: (value: T) => R): PipelineStage<[K, R[]]>; | ||
} |
@@ -46,2 +46,3 @@ /* file : pipeline-engine.ts | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
var lodash_1 = require("lodash"); | ||
/** | ||
@@ -67,2 +68,14 @@ * Abstract representation used to apply transformations on a pipeline of iterators. | ||
/** | ||
* Returns a PipelineStage that emits all items emitted by the source PipelineStage that are distinct by comparison from previous items. | ||
* @param input - Input PipelineStage | ||
* @param selector - Optional function to select which value you want to check as distinct. | ||
* @return A PipelineStage that emits items from the source PipelineStage with distinct values. | ||
*/ | ||
PipelineEngine.prototype.distinct = function (input, selector) { | ||
if (lodash_1.isUndefined(selector)) { | ||
selector = lodash_1.identity; | ||
} | ||
return this.flatMap(this.collect(input), function (values) { return lodash_1.uniqBy(values, selector); }); | ||
}; | ||
/** | ||
* Emits only the first value (or the first value that meets some condition) emitted by the source PipelineStage. | ||
@@ -96,4 +109,85 @@ * @param input - Input PipelineStage | ||
}; | ||
/** | ||
* Find the smallest value produced by a pipeline of iterators. | ||
* It takes a ranking function as input, which is invoked with (x, y) | ||
* and must returns True if x < y and False otherwise. | ||
* Warning: this function needs to materialize all values of the pipeline. | ||
* @param input - Input PipelineStage | ||
* @param comparator - (optional) Ranking function | ||
* @return A pipeline stage that emits the lowest value found | ||
*/ | ||
PipelineEngine.prototype.min = function (input, ranking) { | ||
if (lodash_1.isUndefined(ranking)) { | ||
ranking = function (x, y) { return x < y; }; | ||
} | ||
return this.map(this.collect(input), function (values) { | ||
var minValue = values[0]; | ||
for (var i = 1; i < values.length - 1; i++) { | ||
if (ranking(values[i], minValue)) { | ||
minValue = values[i]; | ||
} | ||
} | ||
return minValue; | ||
}); | ||
}; | ||
/** | ||
* Find the smallest value produced by a pipeline of iterators. | ||
* It takes a ranking function as input, which is invoked with (x, y) | ||
* and must returns True if x > y and False otherwise. | ||
* Warning: this function needs to materialize all values of the pipeline. | ||
* @param input - Input PipelineStage | ||
* @param comparator - (optional) Ranking function | ||
* @return A pipeline stage that emits the highest value found | ||
*/ | ||
PipelineEngine.prototype.max = function (input, ranking) { | ||
if (lodash_1.isUndefined(ranking)) { | ||
ranking = function (x, y) { return x > y; }; | ||
} | ||
return this.map(this.collect(input), function (values) { | ||
var maxValue = values[0]; | ||
for (var i = 1; i < values.length - 1; i++) { | ||
if (ranking(values[i], maxValue)) { | ||
maxValue = values[i]; | ||
} | ||
} | ||
return maxValue; | ||
}); | ||
}; | ||
/** | ||
* Groups the items produced by a pipeline according to a specified criterion, | ||
* and emits the resulting groups | ||
* @param input - Input PipelineStage | ||
* @param keySelector - A function that extracts the grouping key for each item | ||
* @param elementSelector - (optional) A function that transforms items before inserting them in a group | ||
*/ | ||
PipelineEngine.prototype.groupBy = function (input, keySelector, elementSelector) { | ||
var _this = this; | ||
if (lodash_1.isUndefined(elementSelector)) { | ||
elementSelector = lodash_1.identity; | ||
} | ||
var groups = new Map(); | ||
var stage = this.map(input, function (value) { | ||
return { | ||
key: keySelector(value), | ||
value: elementSelector(value) | ||
}; | ||
}); | ||
return this.mergeMap(this.collect(stage), function (subgroups) { | ||
// build groups | ||
subgroups.forEach(function (g) { | ||
if (!groups.has(g.key)) { | ||
groups.set(g.key, [g.value]); | ||
} | ||
else { | ||
groups.set(g.key, groups.get(g.key).concat([g.value])); | ||
} | ||
}); | ||
// inject groups into the pipeline | ||
return _this.fromAsync(function (input) { | ||
groups.forEach(function (value, key) { return input.next([key, value]); }); | ||
}); | ||
}); | ||
}; | ||
return PipelineEngine; | ||
}()); | ||
exports.PipelineEngine = PipelineEngine; |
@@ -30,6 +30,7 @@ import { Observable, Subscriber } from 'rxjs'; | ||
filter<T>(input: Observable<T>, predicate: (value: T) => boolean): Observable<T>; | ||
finalize<T>(input: Observable<T>, callback: () => void): Observable<T>; | ||
reduce<F, T>(input: Observable<F>, reducer: (acc: T, value: F) => T, initial: T): Observable<T>; | ||
limit<T>(input: Observable<T>, stopAfter: number): Observable<T>; | ||
skip<T>(input: Observable<T>, toSkip: number): Observable<T>; | ||
distinct<T, K>(input: Observable<T>, selector?: (value: T) => K): Observable<T>; | ||
distinct<T, K>(input: Observable<T>, selector?: (value: T) => T | K): Observable<T>; | ||
defaultValues<T>(input: Observable<T>, ...values: T[]): Observable<T>; | ||
@@ -36,0 +37,0 @@ bufferCount<T>(input: Observable<T>, count: number): Observable<T[]>; |
@@ -139,2 +139,5 @@ /* file : rxjs-pipeline.ts | ||
}; | ||
RxjsPipeline.prototype.finalize = function (input, callback) { | ||
return input.pipe(operators_1.finalize(callback)); | ||
}; | ||
RxjsPipeline.prototype.reduce = function (input, reducer, initial) { | ||
@@ -141,0 +144,0 @@ return input.pipe(operators_1.reduce(reducer, initial)); |
@@ -41,6 +41,6 @@ import { PipelineInput, StreamPipelineInput, PipelineStage, PipelineEngine } from './pipeline-engine'; | ||
filter<T>(input: VectorStage<T>, predicate: (value: T) => boolean): VectorStage<T>; | ||
finalize<T>(input: VectorStage<T>, callback: () => void): VectorStage<T>; | ||
reduce<F, T>(input: VectorStage<F>, reducer: (acc: T, value: F) => T, initial: T): VectorStage<T>; | ||
limit<T>(input: VectorStage<T>, stopAfter: number): VectorStage<T>; | ||
skip<T>(input: VectorStage<T>, toSkip: number): VectorStage<T>; | ||
distinct<T, K>(input: VectorStage<T>, selector?: (value: T) => K): VectorStage<T>; | ||
defaultValues<T>(input: VectorStage<T>, ...values: T[]): VectorStage<T>; | ||
@@ -47,0 +47,0 @@ bufferCount<T>(input: VectorStage<T>, count: number): VectorStage<T[]>; |
@@ -178,2 +178,8 @@ /* file : vector-pipeline.ts | ||
}; | ||
VectorPipeline.prototype.finalize = function (input, callback) { | ||
return new VectorStage(input.getContent().then(function (c) { | ||
callback(); | ||
return c; | ||
})); | ||
}; | ||
VectorPipeline.prototype.reduce = function (input, reducer, initial) { | ||
@@ -188,8 +194,2 @@ return new VectorStage(input.getContent().then(function (c) { return [c.reduce(reducer, initial)]; })); | ||
}; | ||
VectorPipeline.prototype.distinct = function (input, selector) { | ||
if (lodash_1.isUndefined(selector)) { | ||
return new VectorStage(input.getContent().then(function (c) { return lodash_1.uniq(c); })); | ||
} | ||
return new VectorStage(input.getContent().then(function (c) { return lodash_1.uniqBy(c, selector); })); | ||
}; | ||
VectorPipeline.prototype.defaultValues = function (input) { | ||
@@ -196,0 +196,0 @@ var values = []; |
{ | ||
"name": "sparql-engine", | ||
"version": "0.5.5", | ||
"version": "0.5.6", | ||
"description": "A framework for building SPARQL query engines in Javascript", | ||
@@ -5,0 +5,0 @@ "main": "dist/api.js", |
@@ -27,2 +27,4 @@ /* file : api.ts | ||
// stages builders | ||
import { SPARQL_OPERATION } from './engine/plan-builder' | ||
import AggregateStageBuilder from './engine/stages/aggregate-stage-builder' | ||
@@ -42,17 +44,2 @@ import BGPStageBuilder from './engine/stages/bgp-stage-builder' | ||
export { default as Dataset } from './rdf/dataset' | ||
export { BindingBase } from './rdf/bindings' | ||
export { default as HashMapDataset } from './rdf/hashmap-dataset' | ||
export { default as Graph } from './rdf/graph' | ||
export { default as ExecutionContext } from './engine/context/execution-context' | ||
export { PlanBuilder } from './engine/plan-builder' | ||
import { SPARQL_OPERATION } from './engine/plan-builder' | ||
// pipeline | ||
export { Pipeline } from './engine/pipeline/pipeline' | ||
export { PipelineEngine, PipelineInput } from './engine/pipeline/pipeline-engine' | ||
export { default as RxjsPipeline } from './engine/pipeline/rxjs-pipeline' | ||
export { default as VectorPipeline } from './engine/pipeline/vector-pipeline' | ||
// RDF terms Utilities | ||
export { terms } from './rdf-terms' | ||
// stages builders | ||
const stages = { | ||
@@ -74,2 +61,18 @@ SPARQL_OPERATION, | ||
} | ||
// base types | ||
export { default as Dataset } from './rdf/dataset' | ||
export { Bindings, BindingBase } from './rdf/bindings' | ||
export { default as HashMapDataset } from './rdf/hashmap-dataset' | ||
export { default as Graph } from './rdf/graph' | ||
export { default as ExecutionContext } from './engine/context/execution-context' | ||
export { PlanBuilder } from './engine/plan-builder' | ||
// pipeline | ||
export { Pipeline } from './engine/pipeline/pipeline' | ||
export { PipelineEngine, PipelineInput, PipelineStage, StreamPipelineInput } from './engine/pipeline/pipeline-engine' | ||
export { default as RxjsPipeline } from './engine/pipeline/rxjs-pipeline' | ||
export { default as VectorPipeline } from './engine/pipeline/vector-pipeline' | ||
// RDF terms Utilities | ||
export { terms } from './rdf-terms' | ||
export { stages } |
@@ -27,2 +27,4 @@ /* file : pipeline-engine.ts | ||
import { identity, isUndefined, uniqBy } from 'lodash' | ||
/** | ||
@@ -33,2 +35,7 @@ * The input of a {@link PipelineStage}, either another {@link PipelineStage}, an array, an iterable or a promise. | ||
interface SubGroup<K, R> { | ||
key: K, | ||
value: R | ||
} | ||
/** | ||
@@ -154,2 +161,10 @@ * An input of a {@link PipelineStage} which produces items in stream/async way. | ||
/** | ||
* Do something after the PipelineStage has produced all its results | ||
* @param input - Input PipelineStage | ||
* @param callback - Function invoked after the PipelineStage has produced all its results | ||
* @return Output PipelineStage | ||
*/ | ||
abstract finalize<T>(input: PipelineStage<T>, callback: () => void): PipelineStage<T>; | ||
/** | ||
* Maps each source value to an array of values which is merged in the output PipelineStage. | ||
@@ -197,10 +212,2 @@ * @param input - Input PipelineStage | ||
/** | ||
* Returns a PipelineStage that emits all items emitted by the source PipelineStage that are distinct by comparison from previous items. | ||
* @param input - Input PipelineStage | ||
* @param selector - Optional function to select which value you want to check as distinct. | ||
* @return A PipelineStage that emits items from the source PipelineStage with distinct values. | ||
*/ | ||
abstract distinct<T, K>(input: PipelineStage<T>, selector?: (value: T) => K): PipelineStage<T>; | ||
/** | ||
* Apply a callback on every item emitted by the source PipelineStage | ||
@@ -236,2 +243,15 @@ * @param input - Input PipelineStage | ||
/** | ||
* Returns a PipelineStage that emits all items emitted by the source PipelineStage that are distinct by comparison from previous items. | ||
* @param input - Input PipelineStage | ||
* @param selector - Optional function to select which value you want to check as distinct. | ||
* @return A PipelineStage that emits items from the source PipelineStage with distinct values. | ||
*/ | ||
distinct<T, K>(input: PipelineStage<T>, selector?: (value: T) => T | K): PipelineStage<T> { | ||
if (isUndefined(selector)) { | ||
selector = identity | ||
} | ||
return this.flatMap(this.collect(input), (values: T[])=> uniqBy(values, selector!)) | ||
} | ||
/** | ||
* Emits only the first value (or the first value that meets some condition) emitted by the source PipelineStage. | ||
@@ -267,2 +287,84 @@ * @param input - Input PipelineStage | ||
} | ||
/** | ||
* Find the smallest value produced by a pipeline of iterators. | ||
* It takes a ranking function as input, which is invoked with (x, y) | ||
* and must returns True if x < y and False otherwise. | ||
* Warning: this function needs to materialize all values of the pipeline. | ||
* @param input - Input PipelineStage | ||
* @param comparator - (optional) Ranking function | ||
* @return A pipeline stage that emits the lowest value found | ||
*/ | ||
min<T>(input: PipelineStage<T>, ranking?: (x: T, y: T) => boolean): PipelineStage<T> { | ||
if (isUndefined(ranking)) { | ||
ranking = (x: T, y: T) => x < y | ||
} | ||
return this.map(this.collect(input), (values: T[]) => { | ||
let minValue = values[0] | ||
for(let i = 1; i < values.length - 1; i++) { | ||
if (ranking!(values[i], minValue)) { | ||
minValue = values[i] | ||
} | ||
} | ||
return minValue | ||
}) | ||
} | ||
/** | ||
* Find the smallest value produced by a pipeline of iterators. | ||
* It takes a ranking function as input, which is invoked with (x, y) | ||
* and must returns True if x > y and False otherwise. | ||
* Warning: this function needs to materialize all values of the pipeline. | ||
* @param input - Input PipelineStage | ||
* @param comparator - (optional) Ranking function | ||
* @return A pipeline stage that emits the highest value found | ||
*/ | ||
max<T>(input: PipelineStage<T>, ranking?: (x: T, y: T) => boolean): PipelineStage<T> { | ||
if (isUndefined(ranking)) { | ||
ranking = (x: T, y: T) => x > y | ||
} | ||
return this.map(this.collect(input), (values: T[]) => { | ||
let maxValue = values[0] | ||
for(let i = 1; i < values.length - 1; i++) { | ||
if (ranking!(values[i], maxValue)) { | ||
maxValue = values[i] | ||
} | ||
} | ||
return maxValue | ||
}) | ||
} | ||
/** | ||
* Groups the items produced by a pipeline according to a specified criterion, | ||
* and emits the resulting groups | ||
* @param input - Input PipelineStage | ||
* @param keySelector - A function that extracts the grouping key for each item | ||
* @param elementSelector - (optional) A function that transforms items before inserting them in a group | ||
*/ | ||
groupBy<T, K, R>(input: PipelineStage<T>, keySelector: (value: T) => K, elementSelector?: (value: T) => R): PipelineStage<[K, R[]]> { | ||
if (isUndefined(elementSelector)) { | ||
elementSelector = identity | ||
} | ||
const groups: Map<K, R[]> = new Map() | ||
let stage: PipelineStage<SubGroup<K, R>> = this.map(input, value => { | ||
return { | ||
key: keySelector(value), | ||
value: elementSelector!(value) | ||
} | ||
}) | ||
return this.mergeMap(this.collect(stage), (subgroups: SubGroup<K, R>[]) => { | ||
// build groups | ||
subgroups.forEach(g => { | ||
if (!groups.has(g.key)) { | ||
groups.set(g.key, [ g.value ]) | ||
} else { | ||
groups.set(g.key, groups.get(g.key)!.concat([g.value])) | ||
} | ||
}) | ||
// inject groups into the pipeline | ||
return this.fromAsync(input => { | ||
groups.forEach((value, key) => input.next([key, value])) | ||
}) | ||
}) | ||
} | ||
} |
@@ -35,2 +35,3 @@ /* file : rxjs-pipeline.ts | ||
filter, | ||
finalize, | ||
first, | ||
@@ -129,2 +130,6 @@ flatMap, | ||
finalize<T> (input: Observable<T>, callback: () => void): Observable<T> { | ||
return input.pipe(finalize(callback)) | ||
} | ||
reduce<F,T>(input: Observable<F>, reducer: (acc: T, value: F) => T, initial: T): Observable<T> { | ||
@@ -142,3 +147,3 @@ return input.pipe(reduce(reducer, initial)) | ||
distinct<T, K>(input: Observable<T>, selector?: (value: T) => K): Observable<T> { | ||
distinct<T, K>(input: Observable<T>, selector?: (value: T) => T | K): Observable<T> { | ||
return input.pipe(distinct(selector)) | ||
@@ -145,0 +150,0 @@ } |
@@ -175,2 +175,9 @@ /* file : vector-pipeline.ts | ||
finalize<T> (input: VectorStage<T>, callback: () => void): VectorStage<T> { | ||
return new VectorStage<T>(input.getContent().then(c => { | ||
callback() | ||
return c | ||
})) | ||
} | ||
reduce<F,T>(input: VectorStage<F>, reducer: (acc: T, value: F) => T, initial: T): VectorStage<T> { | ||
@@ -188,9 +195,2 @@ return new VectorStage<T>(input.getContent().then(c => [c.reduce(reducer, initial)])) | ||
distinct<T, K>(input: VectorStage<T>, selector?: (value: T) => K): VectorStage<T> { | ||
if (isUndefined(selector)) { | ||
return new VectorStage<T>(input.getContent().then(c => uniq(c))) | ||
} | ||
return new VectorStage<T>(input.getContent().then(c => uniqBy(c, selector))) | ||
} | ||
defaultValues<T>(input: VectorStage<T>, ...values: T[]): VectorStage<T> { | ||
@@ -197,0 +197,0 @@ return new VectorStage<T>(input.getContent().then(content => { |
798886
18634