
@hpcc-js/dataflow
A small functional library for processing "data flows" in JavaScript (more examples on ObservableHQ). Highlights:
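The underlying motivation for this library is to simplify the processing of data in an efficient way. The analogy we use is one of a "data" pipe, which consists of activities, functions which alter data as it flows through the pipe (map, filter, sort, ...), and observers, functions which watch data as it flows through the pipe without altering it (min, max, quartile, ...).
Some other properties of pipes are: activities (map, filter, sort, ...) can be chained together into a single pipe, and observers (min, max, reduce, ...) can be adapted as sensors inside a pipe or called as regular functions.
Simple example of data flowing through a pipe of activities: filter -> map -> filter -> first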
import { count, filter, first, generate, map, max, pipe, sensor } from "@hpcc-js/dataflow";
const c1 = count();
const c2 = count();
const c3 = count();
const m1 = max(row => row.value);
const p1 = pipe(
sensor(c1), // Keep running count of input
filter(n => n <= 0.5), // Filter out numbers > 0.5
sensor(c2), // Keep running count of filtered rows
map((n, idx) => // Convert to JSON Object
({ index: idx, value: n })),
filter(row => row.index % 2 === 0), // Filter even row indices
sensor(c3), // Keep running count of final rows
sensor(m1), // Track largest value
first(3) // Take first 3 rows
);
console.log(`Counts: ${c1.peek()}, ${c2.peek()}, ${c3.peek()}`);
// [1] => Counts: undefined, undefined, undefined
const outIterable = p1(generate(Math.random, 1000));
console.log(`Counts: ${c1.peek()}, ${c2.peek()}, ${c3.peek()}`);
// [2] => Counts: undefined, undefined, undefined
console.log(JSON.stringify([...outIterable]));
// [3] => [{"index":0,"value":0.19075931906641008},{"index":2,"value":0.4873469062925415},{"index":4,"value":0.4412516774100035}]
console.log(`Counts: ${c1.peek()}, ${c2.peek()}, ${c3.peek()}, ${m1.peek()}`);
// [4] => Counts: 6, 5, 3, 0.4873469062925415
const outArray = [...p1([0.7, 0.5, 0.4, 0.8, 0.3, 1])];
console.log(JSON.stringify(outArray));
// [5] => [{"index":0,"value":0.5},{"index":2,"value":0.3}]
console.log(`Counts: ${c1.peek()}, ${c2.peek()}, ${c3.peek()}, ${m1.peek()}`);
// [6] => Counts: 6, 3, 2, 0.5
Notes:
p1(generate(Math.random, 1000)) only returns an IterableIterator. In other words, no data has flowed through the pipe yet, which is why the counts at [2] are still undefined.
[...outIterable] is a shorthand way to populate an array with data from an iterable.
p1 can be reused with new data; this time the input is a simple array.
Further, the sensors can be observed at any point during the process:
for (const row of p1(generate(Math.random, 1000000))) {
console.log(`${row.index}: ${c1.peek()}, ${c2.peek()}, ${c3.peek()}, ${m1.peek()}`);
}
// => 0: 1, 1, 1, 0.13662528848681
// => 2: 3, 3, 2, 0.13662528848681
// => 4: 7, 5, 3, 0.4328468228869129
Note: Even though up to 1,000,000 rows could be generated, only 7 are actually read in this run.
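Why so few rows are read becomes clearer with a minimal sketch of pull-based evaluation using plain JavaScript generator functions. This does not use @hpcc-js/dataflow; countingSource, lazyFilter and lazyFirst are hypothetical stand-ins that only illustrate the idea that each step pulls rows on demand and stops pulling once the consumer is satisfied.

```javascript
// Hypothetical stand-ins (not the @hpcc-js/dataflow API) illustrating lazy, pull-based evaluation.
function* countingSource(values, stats) {
  for (const v of values) {
    stats.read++; // count every row actually pulled from the source
    yield v;
  }
}

function* lazyFilter(iterable, condition) {
  for (const row of iterable) {
    if (condition(row)) yield row;
  }
}

function* lazyFirst(iterable, n) {
  if (n <= 0) return;
  let taken = 0;
  for (const row of iterable) {
    yield row;
    if (++taken >= n) return; // stop pulling upstream once n rows have been produced
  }
}

const stats = { read: 0 };
const source = [0.9, 0.1, 0.8, 0.2, 0.3, 0.7, 0.4];
const out = lazyFirst(lazyFilter(countingSource(source, stats), n => n <= 0.5), 2);
console.log(stats.read); // => 0: nothing has been pulled yet
const result = [...out];
console.log(result, stats.read); // => result is [0.1, 0.2]; only 4 of the 7 rows were read
```

Returning from lazyFirst closes the upstream generators, so the remaining source rows are never touched.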
Functions which alter data inside the dataflow pipe
# concat(iterable, iterable): iterable
# concat(iterable): (iterable) => iterable
Concatenates two iterables into a single iterable. Similar to Array.concat.
concat(["a", "b", "c"], ["d", "e", "f"]); // => "a", "b", "c", "d", "e", "f"
const concatDEF = concat(["d", "e", "f"]);
concatDEF(["a", "b", "c"]); // => "a", "b", "c", "d", "e", "f"
concatDEF(["1", "2", "3"]); // => "1", "2", "3", "d", "e", "f"
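The two signatures above (data first, or configuration only, returning a reusable activity) follow a common currying pattern. A minimal sketch of a lazy concat written that way, assuming the number of arguments decides which form is meant; the library's actual dispatch logic is not documented here and may differ.

```javascript
// Hypothetical sketch of a dual-signature, lazy concat (not the library source).
function concat(...args) {
  // concat(iterable): return an activity that appends `tail` to whatever it receives
  if (args.length === 1) {
    const [tail] = args;
    return head => concat(head, tail);
  }
  // concat(iterable, iterable): lazily yield every row of both inputs in order
  const [head, tail] = args;
  return (function* () {
    yield* head;
    yield* tail;
  })();
}

const concatDEF = concat(["d", "e", "f"]);
console.log([...concat(["a", "b", "c"], ["d", "e", "f"])]); // => "a", "b", "c", "d", "e", "f"
console.log([...concatDEF(["1", "2", "3"])]); // => "1", "2", "3", "d", "e", "f"
```

Because every call creates a fresh generator, concatDEF can be reused with different inputs.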
# each(iterable, callbackFn): iterable
# each(callbackFn): (iterable) => iterable
Performs a callback for each row in an iterable. The callback cannot alter the values in the iterable. Similar to Array.forEach. Useful for debugging steps in a pipe.
each(["a", "b", "c"], (row, idx) => console.log(row)); // => "a", "b", "c"
const logFlow = each(console.log);
logFlow(["a", "b", "c"]); // => "a", "b", "c"
# entries(iterable): iterable
# entries(): (iterable) => iterable
Returns an [index, row] pair for each row in an iterable. Similar to Array.entries.
entries(["a", "b", "c"]); // => [0, "a"], [1, "b"], [2, "c"]
const calcEntries = entries();
calcEntries(["a", "b", "c"]); // => [0, "a"], [1, "b"], [2, "c"]
# filter(iterable, condition): iterable
# filter(condition): (iterable) => iterable
Filters an iterable based on some condition. Similar to Array.filter.
const words = ["spray", "limit", "elite", "exuberant", "destruction", "present"];
filter(words, word => word.length > 6); // => "exuberant", "destruction", "present"
const smallWords = filter(word => word.length <= 6);
smallWords(words); // => "spray", "limit", "elite"
# first(iterable, number): iterable
# first(number): (iterable) => iterable
Limit the flow to the first N rows of data.
const words = ["spray", "limit", "elite", "exuberant", "destruction", "present"];
first(words, 3); // => "spray", "limit", "elite"
const first2 = first(2);
first2(words); // => "spray", "limit"
# group(iterable, condition): iterable
# group(condition): (iterable) => iterable
Groups data based on some grouping condition. Output is in the form {key: groupCondition, value:[...]}, where the key has to be either a number or a string.
const words = ["one", "two", "three", "four", "five", "six"];
group(words, word => word.length); // => {key: 3, value: ["one", "two", "six"]}, {key: 4, value: ["four", "five"]}, { key: 5, value: ["three"]}
const groupByLength = group(word => word.length);
groupByLength(words); // => {key: 3, value: ["one", "two", "six"]}, {key: 4, value: ["four", "five"]}, { key: 5, value: ["three"]}
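Grouping cannot emit anything until it has seen every row, since any later row might belong to an earlier group. A minimal sketch that buffers rows in a Map keyed by the condition's result; this is an assumption about the approach, not the library source. Note that a Map keeps keys in first-seen order, so for the words above this sketch emits key 5 before key 4, whereas the documented output lists keys in ascending order.

```javascript
// Hypothetical grouping sketch (not the library source): rows are buffered by key.
function* group(iterable, condition) {
  const groups = new Map(); // key (number or string) -> array of rows
  for (const row of iterable) {
    const key = condition(row);
    if (!groups.has(key)) groups.set(key, []);
    groups.get(key).push(row);
  }
  // Only after the whole input is consumed can the groups be emitted.
  for (const [key, value] of groups) {
    yield { key, value };
  }
}

const words = ["one", "two", "three", "four", "five", "six"];
console.log(JSON.stringify([...group(words, word => word.length)]));
// => [{"key":3,"value":["one","two","six"]},{"key":5,"value":["three"]},{"key":4,"value":["four","five"]}]
```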
# histogram(iterable, condition, options): iterable
# histogram(condition, options): (iterable) => iterable
Groups data into buckets (or bins) based on numeric ranges. Output is in the form {from: numeric, to: numeric, value:[...]}.
Available options are:
{ buckets: number } // Specify number of buckets / bins
or
{ min: number, range: number } // Specify starting bucket (min) and size of bucket (range)
const data = [1, 12, 13, 13, 3, 14, 19, 6];
histogram(data, n => n, { buckets: 3 }); // => {"from":1,"to":7,"value":[1,3,6]},{"from":7,"to":13,"value":[12]},{"from":13,"to":19,"value":[13,13,14,19]}
histogram(data, n => n, { min: 0, range: 5 }); // => {"from":0,"to":5,"value":[1,3]},{"from":5,"to":10,"value":[6]},{"from":10,"to":15,"value":[12,13,13,14]},{"from":15,"to":20,"value":[19]}
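The bucket boundaries in the examples above can be reproduced arithmetically: with { buckets: 3 } the data spans 1 to 19, so each bucket is (19 - 1) / 3 = 6 wide, and the maximum value 19 lands in the last bucket. A minimal sketch of that assignment, assuming half-open buckets [from, to) with the maximum clamped into the final bucket; the library's exact edge handling is an assumption here, and this sketch returns an array rather than a lazy iterable for simplicity.

```javascript
// Hypothetical histogram sketch (not the library source); buffers all rows first.
function histogram(iterable, accessor, options) {
  const rows = [...iterable];
  const values = rows.map(accessor);
  const dataMin = Math.min(...values);
  const dataMax = Math.max(...values);
  let min, range, count;
  if (options.buckets !== undefined) {
    // { buckets }: split [dataMin, dataMax] into equal-width bins
    min = dataMin;
    count = options.buckets;
    range = (dataMax - dataMin) / count;
  } else {
    // { min, range }: fixed-width bins starting at min, enough to cover dataMax
    min = options.min;
    range = options.range;
    count = Math.floor((dataMax - min) / range) + 1;
  }
  const bins = Array.from({ length: count }, (_, i) => ({
    from: min + i * range,
    to: min + (i + 1) * range,
    value: []
  }));
  rows.forEach((row, i) => {
    // Clamp so the maximum value lands in the last bin rather than one past it
    const idx = Math.min(Math.floor((values[i] - min) / range), count - 1);
    bins[idx].value.push(row);
  });
  return bins;
}

const data = [1, 12, 13, 13, 3, 14, 19, 6];
console.log(JSON.stringify(histogram(data, n => n, { buckets: 3 })));
console.log(JSON.stringify(histogram(data, n => n, { min: 0, range: 5 })));
```

A range of zero (all values equal) would divide by zero in this sketch and is not handled.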
# map(iterable, callback): iterable
# map(callback): (iterable) => iterable
Map data to a new shape via a callback function. Similar to Array.map.
map([{ n: 22 }, { n: 11 }, { n: 33 }], (row, idx) => ({ ...row, index: idx })); // => { n: 22, index: 0 }, { n: 11, index: 1 }, { n: 33, index: 2 }
const indexData = map((row, idx) => ({ ...row, index: idx + 1 }));
indexData([{ n: 22 }, { n: 11 }, { n: 33 }]); // => { n: 22, index: 1 }, { n: 11, index: 2 }, { n: 33, index: 3 }
# skip(iterable, number): iterable
# skip(number): (iterable) => iterable
Skip a set number of rows.
const words = ["spray", "limit", "elite", "exuberant", "destruction", "present"];
skip(words, 3); // => "exuberant", "destruction", "present"
const skip4 = skip(4);
skip4(words); // => "destruction", "present"
# sort(iterable, compare): iterable
# sort(compare): (iterable) => iterable
Sorts an iterable based on the result of a compare function (which should return a negative number, zero or a positive number). Similar to Array.sort.
const numbers = [4, 2, 5, 1, 3];
sort(numbers, (a, b) => a - b); // => 1, 2, 3, 4, 5
const reverseSort = sort((a, b) => b - a);
reverseSort(numbers); // => 5, 4, 3, 2, 1
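Unlike filter or map, a sort cannot emit its first row until it has seen every row, so inside an otherwise lazy pipe it acts as a buffering step. A minimal sketch of such a step, assuming it copies the input into an array and delegates to Array.prototype.sort; this illustrates the consequence, not the library's actual implementation.

```javascript
// Hypothetical sketch: a sort activity must drain its input before yielding anything.
function* sort(iterable, compare) {
  const buffer = [...iterable]; // consumes the entire upstream iterable
  buffer.sort(compare); // compare returns negative, zero or positive
  yield* buffer;
}

let pulled = 0;
function* source() {
  for (const n of [4, 2, 5, 1, 3]) {
    pulled++; // track how many rows have been read upstream
    yield n;
  }
}

const sorted = sort(source(), (a, b) => a - b);
const firstRow = sorted.next().value;
console.log(firstRow, pulled); // => 1 5: all five rows were read to produce the first one
```

Placing first(n) after sort therefore still reads the whole input, while placing it before sort limits what gets sorted.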
A collection of "Observers" which can be adapted as functions, activities and sensors
export interface Observer<T, U> {
observe(r: T, idx: number): void;
peek(): U;
}
# sensor(_: Observer): (iterable) => iterable
Adapts an observer so it can be used as an activity in a pipe.
# scalar(_: Observer): (iterable) => any
Adapts an observer so it can be called as a regular function.
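The Observer interface above is small enough that both adapters can be sketched in a few lines. This is an illustration under assumptions, not the library's source: sensorSketch and scalarSketch are hypothetical names, and the count observer simply follows the documented interface (observe(row, idx) and peek()), returning undefined before any row is observed as in the earlier example.

```javascript
// Hypothetical sketch of the Observer pattern (not the library source).
// An observer sees rows one at a time via observe() and reports via peek().
function count() {
  let total; // undefined until the first row is observed
  return {
    observe(row, idx) { total = (total ?? 0) + 1; },
    peek() { return total; }
  };
}

// Adapt an observer into a pipe activity: rows pass through unchanged.
function sensorSketch(observer) {
  return function* (iterable) {
    let idx = 0;
    for (const row of iterable) {
      observer.observe(row, idx++);
      yield row;
    }
  };
}

// Adapt an observer into a plain function: feed every row, then return peek().
function scalarSketch(observer) {
  return iterable => {
    let idx = 0;
    for (const row of iterable) observer.observe(row, idx++);
    return observer.peek();
  };
}

const doCount = scalarSketch(count());
console.log(doCount([5, 1, 2, -3, 4])); // => 5

const c = count();
const tapped = sensorSketch(c)(["a", "b", "c"]);
console.log(c.peek()); // => undefined: the generator has not run yet
console.log([...tapped].length, c.peek()); // => 3 3
```

In this sketch the observer is shared, so calling doCount a second time would keep counting from 5; the counts in the second run of the earlier example (6, 3, 2) suggest the library resets observers between runs, which is not modeled here.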
# count(): Observer
Counts the number of "observed" rows:
const s1 = count();
const s2 = count();
const p1 = pipe(
sensor(s1),
filter(r => r.age > 30),
sensor(s2),
);
const data = [...p1(population)];
s1.peek(); // => 1000
s2.peek(); // => 699
const doCount = scalar(count());
doCount([5, 1, 2, -3, 4]); // => 5
# min(): Observer
# min(accessor): Observer
Calculates the minimum value for "observed" rows:
const s1 = min();
const s2 = min();
const p1 = pipe(
sensor(s1),
filter(r => r > 3),
sensor(s2),
);
const data = [...p1([1, 2, 3, 4, 5, 0])];
s1.peek() // => 0
s2.peek() // => 4
const calcMin = scalar(min(row => row.id));
calcMin([{ id: 22 }, { id: 44 }, { id: 33 }]); // => 22
# max(): Observer
# max(accessor): Observer
Calculates the maximum value for "observed" rows:
const s1 = max();
const s2 = max();
const p1 = pipe(
sensor(s1),
filter(r => r < 3),
sensor(s2),
);
const data = [...p1([1, 2, 3, 4, 5, 0])];
s1.peek() // => 5
s2.peek() // => 2
const calcMax = scalar(max(row => row.id));
calcMax([{ id: 22 }, { id: 44 }, { id: 33 }]); // => 44
# extent(): Observer
# extent(accessor): Observer
Calculates the extent (min and max) values for "observed" rows:
const s1 = extent(r => r.age);
const s2 = extent(r => r.age);
const p1 = pipe(
sensor(s1),
filter(r => r.age > 30),
sensor(s2),
);
const data = [...p1(population)];
s1.peek() // => [16, 66]
s2.peek() // => [31, 66]
const calcExtent = scalar(extent(row => row.id));
calcExtent([{ id: 22 }, { id: 44 }, { id: 33 }]); // => [22, 44]
# mean(): Observer
# mean(accessor): Observer
Calculates the mean (average) value for "observed" rows:
const calcMean = scalar(mean());
calcMean([5, -6, 1, 2, -2]); // => 0
# median(): Observer
# median(accessor): Observer
Calculates the median value for "observed" rows:
const calcMedian = scalar(median());
calcMedian([-6, -2, 1, 2, 5]) // => 1
calcMedian([5, -6, 1, 2, -2]) // => 1
calcMedian([-6, -2, 1, 2, 5, 6]) // => 1.5
calcMedian([5, -6, 1, 2, -2, 6]) // => 1.5
calcMedian([9]) // => 9
# quartile(): Observer
# quartile(accessor): Observer
Calculates the quartiles for "observed" rows, returned as [min, lower quartile, median, upper quartile, max]:
const calcQuartile = scalar(quartile());
calcQuartile([6, 7, 15, 36, 39, 40, 41, 42, 43, 47, 49]) // => [6, 15, 40, 43, 49]
calcQuartile([7, 15, 36, 39, 40, 41]) // => [7, 15, 37.5, 40, 41]
calcQuartile([1, 22, 133]) // => [1, 1, 22, 133, 133]
calcQuartile([2, 144, 33]) // => [2, 2, 33, 144, 144]
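The outputs above are consistent with the quartile method that takes the median of the lower and upper halves, excluding the overall median when the count is odd. For the 11-value input the halves are [6, 7, 15, 36, 39] and [41, 42, 43, 47, 49], giving a lower quartile of 15 and an upper quartile of 43. A minimal sketch of that method, inferred from the examples rather than taken from the library source; it assumes at least two values.

```javascript
// Five-number summary using the "exclude the median from each half" method.
// Inferred from the documented outputs; assumes at least two values.
function median(sorted) {
  const mid = Math.floor(sorted.length / 2);
  return sorted.length % 2 ? sorted[mid] : (sorted[mid - 1] + sorted[mid]) / 2;
}

function quartiles(values) {
  const sorted = [...values].sort((a, b) => a - b);
  const n = sorted.length;
  const half = Math.floor(n / 2);
  const lower = sorted.slice(0, half); // values below the median
  const upper = sorted.slice(n - half); // values above the median
  return [sorted[0], median(lower), median(sorted), median(upper), sorted[n - 1]];
}

console.log(quartiles([6, 7, 15, 36, 39, 40, 41, 42, 43, 47, 49])); // => [6, 15, 40, 43, 49]
console.log(quartiles([7, 15, 36, 39, 40, 41])); // => [7, 15, 37.5, 40, 41]
```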
# reduce(reducer[, initialValue]): Observer
Calculates a reduced value for "observed" rows:
const reduceFunc = (prev, row) => prev + row;
const calcReduce1 = scalar(reduce(reduceFunc));
const calcReduce2 = scalar(reduce(reduceFunc, 10)); // initialValue is passed to reduce, not to scalar
calcReduce1([1, 2, 3, 4, 5]) // => 15
calcReduce2([1, 2, 3, 4, 5]) // => 25
# variance(): Observer
# variance(accessor): Observer
Calculates the sample variance for the "observed" rows. If fewer than two rows are observed, returns undefined.
const calcVariance = scalar(variance());
calcVariance([5, 1, 2, 3, 4]) // => 2.5
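The documented result of 2.5 for [5, 1, 2, 3, 4] is the sample variance: the mean is 3, the squared deviations sum to 4 + 4 + 1 + 0 + 1 = 10, and 10 / (5 - 1) = 2.5. Because an observer sees rows one at a time, one natural way to compute this is an online update such as Welford's algorithm. The sketch below uses that technique under the assumption that it fits the Observer shape; the library's actual algorithm is not stated.

```javascript
// Welford's online algorithm, shaped like the documented Observer interface.
// Hypothetical sketch; not taken from the library source.
function varianceObserver() {
  let n = 0;
  let mean = 0;
  let m2 = 0; // running sum of squared differences from the current mean
  return {
    observe(x) {
      n += 1;
      const delta = x - mean;
      mean += delta / n;
      m2 += delta * (x - mean);
    },
    // Sample variance; undefined for fewer than two rows, as documented
    peek() {
      return n < 2 ? undefined : m2 / (n - 1);
    }
  };
}

const v = varianceObserver();
[5, 1, 2, 3, 4].forEach(x => v.observe(x));
console.log(v.peek()); // approximately 2.5 (floating point)
console.log(Math.sqrt(v.peek())); // approximately 1.5811, the standard deviation
```

The update never stores the rows themselves, which suits an observer placed in a pipe over a large or unbounded stream.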
# deviation(): Observer
# deviation(accessor): Observer
Calculates the sample standard deviation for the "observed" rows. If fewer than two rows are observed, returns undefined.
const calcDeviation = scalar(deviation());
calcDeviation([5, 1, 2, 3, 4]) // => 1.58113883008 == sqrt(2.5)
# distribution(): Observer<number, { min: number, mean: number, max: number, deviation: number, variance: number}>
# distribution(accessor): Observer<any, { min: number, mean: number, max: number, deviation: number, variance: number}>
Calculates a "distribution" (a combination of min, max, mean, variance and deviation) of the "observed" rows. If fewer than two rows are observed, returns undefined.
const calcDistribution = scalar(distribution());
calcDistribution([5, 1, 2, 3, 4]); // => { min: 1, mean: 3, max: 5, deviation: Math.sqrt(2.5), variance: 2.5}
Convenience functions
# pipe(iterable, ...iterableActivity): iterable
# pipe(iterable, ...iterableActivity, scalarActivity): scalar
# pipe(...iterableActivity): iterableActivity
# pipe(...iterableActivity, scalarActivity): scalarActivity
Pipes a series of activities into a single process pipeline.
// Iterable output
pipe([0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
filter(n => n <= 5),
map((n, idx) => ({ index: idx, value: n })),
filter(row => row.index % 2 === 0),
sort((l, r) => l.value - r.value),
first(3)
); // => { index: 0, value: 0 }, { index: 2, value: 2 }, { index: 4, value: 4 }
const process = pipe(
filter(n => n <= 5),
map((n, idx) => ({ index: idx, value: n })),
filter(row => row.index % 2 === 0),
sort((l, r) => l.value - r.value),
first(3)
);
process([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); // => { index: 0, value: 0 }, { index: 2, value: 2 }, { index: 4, value: 4 }
// Scalar output
pipe([0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
process,
max(row => row.value)
); // => 4
const process_2 = pipe(
process,
min(row => row.value)
);
process_2([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); // => 0
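Conceptually, the activity-only form of pipe is left-to-right function composition: each activity receives the iterable produced by the previous one. A minimal sketch of that form with hypothetical helper names (pipeSketch, filterBy, mapTo); it omits the library's overloads that take an initial iterable or end in a scalar step.

```javascript
// Hypothetical sketch of left-to-right composition (not the library source).
const pipeSketch = (...activities) => input =>
  activities.reduce((iterable, activity) => activity(iterable), input);

// Minimal curried activities for the demonstration (also hypothetical).
const filterBy = condition => function* (iterable) {
  for (const row of iterable) if (condition(row)) yield row;
};
const mapTo = callback => function* (iterable) {
  let idx = 0;
  for (const row of iterable) yield callback(row, idx++);
};

const evenRows = pipeSketch(
  filterBy(n => n <= 5),
  mapTo((n, idx) => ({ index: idx, value: n })),
  filterBy(row => row.index % 2 === 0)
);
console.log(JSON.stringify([...evenRows([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])]));
// => [{"index":0,"value":0},{"index":2,"value":2},{"index":4,"value":4}]
```

Since each activity creates a new generator per call, the composed evenRows can be applied to new data repeatedly.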
# generate(generatorFn[, maxIterations]): iterable
Generates an iterable data set. Optionally limits the length to maxIterations.
// Iterable output
generate(Math.random); // => Random number iterator
generate(Math.random, 100); // => Random number iterator limited to 100 items
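A generator function fits this description naturally. A minimal sketch, assuming generatorFn is called with no arguments for each row (as Math.random is) and that omitting maxIterations yields an unbounded stream; both points are inferred from the examples above, not from the library source.

```javascript
// Hypothetical sketch of generate (not the library source).
function* generate(generatorFn, maxIterations = Infinity) {
  for (let i = 0; i < maxIterations; i++) {
    yield generatorFn();
  }
}

let counter = 0;
const nextInt = () => ++counter;
console.log([...generate(nextInt, 3)]); // => [1, 2, 3]

// Without a limit the stream is unbounded, so only pull what you need.
const randoms = generate(Math.random);
console.log(typeof randoms.next().value); // => "number"
```

Never spread an unbounded generator; limit it first, for example with first(n) in a pipe.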